Tapir Server Refactor #317
|
@ -58,8 +58,6 @@ func (ap *appletPeers) LaunchPeers() {
|
|||
log.Debugf("done Listen() for %v\n", pid)
|
||||
p.StartPeersConnections()
|
||||
log.Debugf("done StartPeersConnections() for %v\n", pid)
|
||||
p.StartGroupConnections()
|
||||
log.Debugf("done StartGroupConnections() for %v\n", pid)
|
||||
}
|
||||
ap.launched = true
|
||||
}
|
||||
|
|
|
@ -3,6 +3,7 @@ package plugins
|
|||
import (
|
||||
"cwtch.im/cwtch/event"
|
||||
"cwtch.im/cwtch/protocol/connections"
|
||||
"git.openprivacy.ca/openprivacy/log"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
@ -56,11 +57,13 @@ func (cr *contactRetry) run() {
|
|||
case e := <-cr.queue.OutChan():
|
||||
switch e.EventType {
|
||||
case event.PeerStateChange:
|
||||
log.Errorf("PEER STATE CHANGE: %v", e)
|
||||
state := connections.ConnectionStateToType[e.Data[event.ConnectionState]]
|
||||
peer := e.Data[event.RemotePeer]
|
||||
cr.handleEvent(peer, state, peerConn)
|
||||
|
||||
case event.ServerStateChange:
|
||||
log.Errorf("SERVER STATE CHANGE: %v", e)
|
||||
state := connections.ConnectionStateToType[e.Data[event.ConnectionState]]
|
||||
server := e.Data[event.GroupServer]
|
||||
cr.handleEvent(server, state, serverConn)
|
||||
|
@ -73,7 +76,9 @@ func (cr *contactRetry) run() {
|
|||
p := v.(*contact)
|
||||
p.ticks = 0
|
||||
p.backoff = 1
|
||||
cr.bus.Publish(event.NewEvent(event.RetryPeerRequest, map[event.Field]string{event.RemotePeer: p.id}))
|
||||
if p.ctype == peerConn {
|
||||
cr.bus.Publish(event.NewEvent(event.RetryPeerRequest, map[event.Field]string{event.RemotePeer: p.id}))
|
||||
}
|
||||
return true
|
||||
})
|
||||
} else if prog != "100" {
|
||||
|
@ -93,7 +98,7 @@ func (cr *contactRetry) run() {
|
|||
if p.ctype == peerConn {
|
||||
cr.bus.Publish(event.NewEvent(event.RetryPeerRequest, map[event.Field]string{event.RemotePeer: p.id}))
|
||||
} else {
|
||||
cr.bus.Publish(event.NewEventList(event.JoinServer, event.GroupServer, p.id))
|
||||
//cr.bus.Publish(event.NewEventList(event.JoinServer, event.GroupServer, p.id))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -40,6 +40,7 @@ const (
|
|||
// TimestampReceived [eg time.Now().Format(time.RFC3339Nano)]
|
||||
// RemotePeer: [eg "chpr7qm6op5vfcg2pi4vllco3h6aa7exexc4rqwnlupqhoogx2zgd6qd"]
|
||||
// GroupInvite: [eg "torv3....."]
|
||||
// Imported
|
||||
NewGroupInvite = Type("NewGroupInvite")
|
||||
|
||||
// GroupID
|
||||
|
@ -247,6 +248,11 @@ const (
|
|||
EventContext = Field("EventContext")
|
||||
|
||||
Authorization = Field("Authorization")
|
||||
|
||||
KeyBundle = Field("KeyBundle")
|
||||
|
||||
// Indicate whether an event was triggered by a user import
|
||||
Imported = Field("Imported")
|
||||
)
|
||||
|
||||
// Defining Common errors
|
||||
|
|
|
@ -10,6 +10,7 @@ import (
|
|||
"encoding/json"
|
||||
"errors"
|
||||
"git.openprivacy.ca/openprivacy/log"
|
||||
"runtime/debug"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
|
@ -49,7 +50,7 @@ type CwtchPeer interface {
|
|||
DeleteGroup(string)
|
||||
|
||||
AddServer(string) error
|
||||
JoinServer(string)
|
||||
JoinServer(string) error
|
||||
SendMessageToGroup(string, string) error
|
||||
SendMessageToGroupTracked(string, string) (string, error)
|
||||
|
||||
|
@ -80,7 +81,6 @@ type CwtchPeer interface {
|
|||
|
||||
Listen()
|
||||
StartPeersConnections()
|
||||
StartGroupConnections()
|
||||
Shutdown()
|
||||
}
|
||||
|
||||
|
@ -106,7 +106,7 @@ func (cp *cwtchPeer) Init(eventBus event.Manager) {
|
|||
go cp.eventHandler()
|
||||
|
||||
cp.eventBus = eventBus
|
||||
cp.AutoHandleEvents([]event.Type{event.EncryptedGroupMessage, event.NewMessageFromPeer, event.PeerAcknowledgement,
|
||||
cp.AutoHandleEvents([]event.Type{event.EncryptedGroupMessage, event.NewMessageFromPeer, event.PeerAcknowledgement, event.NewGroupInvite,
|
||||
event.PeerError, event.SendMessageToGroupError, event.NewGetValMessageFromPeer})
|
||||
}
|
||||
|
||||
|
@ -128,6 +128,7 @@ func (cp *cwtchPeer) ImportGroup(exportedInvite string) (err error) {
|
|||
if err == nil {
|
||||
cp.eventBus.Publish(event.NewEvent(event.NewGroupInvite, map[event.Field]string{
|
||||
event.GroupInvite: string(data),
|
||||
event.Imported: "true",
|
||||
}))
|
||||
} else {
|
||||
log.Errorf("error decoding group invite: %v", err)
|
||||
|
@ -298,7 +299,12 @@ func (cp *cwtchPeer) GetGroupState(groupid string) (connections.ConnectionState,
|
|||
func (cp *cwtchPeer) PeerWithOnion(onion string) {
|
||||
cp.mutex.Lock()
|
||||
defer cp.mutex.Unlock()
|
||||
if _, exists := cp.Profile.GetContact(onion); !exists {
|
||||
if contact, exists := cp.Profile.GetContact(onion); !exists {
|
||||
if contact.IsServer() {
|
||||
log.Debugf("Tried to peer with a server...should never happen so logging it to try and trace the path:")
|
||||
debug.PrintStack()
|
||||
return // ABORT
|
||||
}
|
||||
cp.AddContact(onion, onion, model.AuthApproved)
|
||||
}
|
||||
cp.eventBus.Publish(event.NewEvent(event.PeerRequest, map[event.Field]string{event.RemotePeer: onion}))
|
||||
|
@ -337,15 +343,16 @@ func (cp *cwtchPeer) InviteOnionToGroup(onion string, groupid string) error {
|
|||
}
|
||||
|
||||
// JoinServer manages a new server connection with the given onion address
|
||||
func (cp *cwtchPeer) JoinServer(onion string) {
|
||||
func (cp *cwtchPeer) JoinServer(onion string) error {
|
||||
if cp.GetContact(onion) != nil {
|
||||
tokenY, yExists := cp.GetContact(onion).GetAttribute(string(model.KeyTypePrivacyPass))
|
||||
tokenOnion, onionExists := cp.GetContact(onion).GetAttribute(string(model.KeyTypeTokenOnion))
|
||||
if yExists && onionExists {
|
||||
cp.eventBus.Publish(event.NewEvent(event.JoinServer, map[event.Field]string{event.GroupServer: onion, event.ServerTokenY: tokenY, event.ServerTokenOnion: tokenOnion}))
|
||||
return nil
|
||||
}
|
||||
}
|
||||
// TODO HANDLE ERROR
|
||||
return errors.New("no keys found for server connection")
|
||||
}
|
||||
|
||||
// SendMessageToGroup attempts to sent the given message to the given group id.
|
||||
|
@ -436,23 +443,11 @@ func (cp *cwtchPeer) Listen() {
|
|||
// StartGroupConnections attempts to connect to all group servers (thus initiating reconnect attempts in the conectionsmanager)
|
||||
func (cp *cwtchPeer) StartPeersConnections() {
|
||||
for _, contact := range cp.GetContacts() {
|
||||
cp.PeerWithOnion(contact)
|
||||
}
|
||||
}
|
||||
|
||||
// StartPeerConnections attempts to connect to all peers (thus initiating reconnect attempts in the conectionsmanager)
|
||||
func (cp *cwtchPeer) StartGroupConnections() {
|
||||
joinedServers := map[string]bool{}
|
||||
for _, groupID := range cp.GetGroups() {
|
||||
// Only send a join server packet if we haven't joined this server yet...
|
||||
group := cp.GetGroup(groupID)
|
||||
cp.mutex.Lock()
|
||||
if joined := joinedServers[groupID]; group.Accepted && !joined {
|
||||
log.Infof("Join Server %v (%v)\n", group.GroupServer, joined)
|
||||
cp.JoinServer(group.GroupServer)
|
||||
joinedServers[group.GroupServer] = true
|
||||
if cp.GetContact(contact).IsServer() {
|
||||
cp.JoinServer(contact)
|
||||
} else {
|
||||
cp.PeerWithOnion(contact)
|
||||
}
|
||||
cp.mutex.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -611,7 +606,18 @@ func (cp *cwtchPeer) eventHandler() {
|
|||
}
|
||||
case event.NewGroupInvite:
|
||||
cp.mutex.Lock()
|
||||
cp.Profile.ProcessInvite(ev.Data[event.GroupInvite], ev.Data[event.RemotePeer])
|
||||
group, err := cp.Profile.ProcessInvite(ev.Data[event.GroupInvite], ev.Data[event.RemotePeer])
|
||||
if err == nil {
|
||||
if ev.Data[event.Imported] == "true" {
|
||||
cp.Profile.GetGroup(group).Accepted = true
|
||||
cp.mutex.Unlock() // TODO...seriously need a better way of handling these cases
|
||||
err = cp.JoinServer(cp.Profile.GetGroup(group).GroupServer)
|
||||
cp.mutex.Lock()
|
||||
if err != nil {
|
||||
log.Errorf("Joining Server should have worked %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
cp.mutex.Unlock()
|
||||
case event.PeerStateChange:
|
||||
cp.mutex.Lock()
|
||||
|
|
|
@ -11,7 +11,7 @@ import (
|
|||
"errors"
|
||||
"fmt"
|
||||
"git.openprivacy.ca/openprivacy/connectivity"
|
||||
torProdider "git.openprivacy.ca/openprivacy/connectivity/tor"
|
||||
torProvider "git.openprivacy.ca/openprivacy/connectivity/tor"
|
||||
"git.openprivacy.ca/openprivacy/log"
|
||||
"github.com/gtank/ristretto255"
|
||||
"golang.org/x/crypto/ed25519"
|
||||
|
@ -113,13 +113,14 @@ func (e *engine) eventHandler() {
|
|||
case event.StatusRequest:
|
||||
e.eventManager.Publish(event.Event{EventType: event.ProtocolEngineStatus, EventID: ev.EventID})
|
||||
case event.PeerRequest:
|
||||
if torProdider.IsValidHostname(ev.Data[event.RemotePeer]) {
|
||||
if torProvider.IsValidHostname(ev.Data[event.RemotePeer]) {
|
||||
go e.peerWithOnion(ev.Data[event.RemotePeer])
|
||||
}
|
||||
case event.RetryPeerRequest:
|
||||
// This event allows engine to treat (automated) retry peering requests differently to user-specified
|
||||
// peer events
|
||||
if torProdider.IsValidHostname(ev.Data[event.RemotePeer]) {
|
||||
if torProvider.IsValidHostname(ev.Data[event.RemotePeer]) {
|
||||
log.Debugf("Retrying Peer Request: %v", ev.Data[event.RemotePeer])
|
||||
go e.peerWithOnion(ev.Data[event.RemotePeer])
|
||||
}
|
||||
case event.InvitePeerToGroup:
|
||||
|
@ -134,7 +135,10 @@ func (e *engine) eventHandler() {
|
|||
case event.DeleteGroup:
|
||||
// TODO: There isn't a way here to determine if other Groups are using a server connection...
|
||||
case event.SendMessageToGroup:
|
||||
e.sendMessageToGroup(ev.Data[event.GroupServer], []byte(ev.Data[event.Ciphertext]), []byte(ev.Data[event.Signature]))
|
||||
err := e.sendMessageToGroup(ev.Data[event.GroupServer], []byte(ev.Data[event.Ciphertext]), []byte(ev.Data[event.Signature]))
|
||||
if err != nil {
|
||||
e.eventManager.Publish(event.NewEvent(event.SendMessageToGroupError, map[event.Field]string{event.GroupServer: ev.Data[event.GroupServer], event.EventID: ev.EventID, event.Error: err.Error()}))
|
||||
}
|
||||
case event.SendMessageToPeer:
|
||||
// TODO: remove this passthrough once the UI is integrated.
|
||||
context, ok := ev.Data[event.EventContext]
|
||||
|
@ -223,6 +227,7 @@ func (e *engine) Shutdown() {
|
|||
// peerWithOnion is the entry point for cwtchPeer relationships
|
||||
// needs to be run in a goroutine as will block on Open.
|
||||
func (e *engine) peerWithOnion(onion string) {
|
||||
log.Debugf("Called PeerWithOnion for %v", onion)
|
||||
if !e.isBlocked(onion) {
|
||||
e.ignoreOnShutdown(e.peerConnecting)(onion)
|
||||
connected, err := e.service.Connect(onion, e.createPeerTemplate())
|
||||
|
@ -249,6 +254,7 @@ func (e *engine) peerWithOnion(onion string) {
|
|||
// peerWithTokenServer is the entry point for cwtchPeer - server relationships
|
||||
// needs to be run in a goroutine as will block on Open.
|
||||
func (e *engine) peerWithTokenServer(onion string, tokenServerOnion string, tokenServerY string) {
|
||||
log.Debugf("Peering with Token Server %v %v", onion, tokenServerOnion)
|
||||
e.ignoreOnShutdown(e.serverConnecting)(onion)
|
||||
|
||||
// Create a new ephemeral service for this connection
|
||||
|
@ -415,8 +421,15 @@ func (e *engine) sendMessageToGroup(server string, ct []byte, sig []byte) error
|
|||
if err == nil {
|
||||
tokenApp, ok := (conn.App()).(*TokenBoardClient)
|
||||
if ok {
|
||||
attempts := 0
|
||||
for tokenApp.Post(ct, sig) == false {
|
||||
// TODO This should eventually be wired back into the UI to allow it to error
|
||||
tokenApp.MakePayment()
|
||||
time.Sleep(time.Second * 5)
|
||||
attempts++
|
||||
if attempts == 5 {
|
||||
return errors.New("failed to post to token board")
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -145,6 +145,7 @@ func (ta *TokenBoardClient) Post(ct []byte, sig []byte) bool {
|
|||
|
||||
// MakePayment uses the PoW based token protocol to obtain more tokens
|
||||
func (ta *TokenBoardClient) MakePayment() {
|
||||
log.Debugf("Making a Payment %v", ta)
|
||||
id, sk := primitives.InitializeEphemeralIdentity()
|
||||
var client tapir.Service
|
||||
client = new(tor.BaseOnionService)
|
||||
|
@ -164,7 +165,7 @@ func (ta *TokenBoardClient) MakePayment() {
|
|||
conn.Close()
|
||||
return
|
||||
}
|
||||
log.Debugf("Error making payment: %v", err)
|
||||
log.Debugf("Error making payment: to %v %v", ta.tokenServiceOnion, err)
|
||||
}
|
||||
|
||||
// NextToken retrieves the next token
|
||||
|
|
|
@ -1,7 +1,12 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"cwtch.im/cwtch/model"
|
||||
cwtchserver "cwtch.im/cwtch/server"
|
||||
"cwtch.im/tapir/primitives"
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"git.openprivacy.ca/openprivacy/connectivity/tor"
|
||||
"git.openprivacy.ca/openprivacy/log"
|
||||
"os"
|
||||
|
@ -16,6 +21,24 @@ func main() {
|
|||
log.AddEverythingFromPattern("server/server")
|
||||
configDir := os.Getenv("CWTCH_CONFIG_DIR")
|
||||
|
||||
if len(os.Args) == 2 && os.Args[1] == "gen1" {
|
||||
config := new(cwtchserver.Config)
|
||||
id, pk := primitives.InitializeEphemeralIdentity()
|
||||
tid, tpk := primitives.InitializeEphemeralIdentity()
|
||||
config.PrivateKey = pk
|
||||
config.PublicKey = id.PublicKey()
|
||||
config.TokenServerPrivateKey = tpk
|
||||
config.TokenServerPublicKey = tid.PublicKey()
|
||||
config.MaxBufferLines = 100000
|
||||
config.ServerReporting = cwtchserver.Reporting{
|
||||
LogMetricsToFile: true,
|
||||
ReportingGroupID: "",
|
||||
ReportingServerAddr: "",
|
||||
}
|
||||
config.Save(".", "serverConfig.json")
|
||||
return
|
||||
}
|
||||
|
||||
serverConfig := cwtchserver.LoadConfig(configDir, serverConfigFile)
|
||||
|
||||
acn, err := tor.NewTorACNWithAuth(".", "", 9051, tor.HashedPasswordAuthenticator{Password: "examplehashedpassword"})
|
||||
|
@ -31,5 +54,17 @@ func main() {
|
|||
// TODO load params from .cwtch/server.conf or command line flag
|
||||
// TODO: respond to HUP so t.Close is gracefully called
|
||||
server.Setup(serverConfig)
|
||||
|
||||
// TODO create a random group for testing
|
||||
group, _ := model.NewGroup(tor.GetTorV3Hostname(serverConfig.PublicKey))
|
||||
group.SignGroup([]byte{})
|
||||
invite, err := group.Invite([]byte{})
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
fmt.Printf("%v", "torv3"+base64.StdEncoding.EncodeToString(invite))
|
||||
|
||||
bundle, _ := json.Marshal(server.KeyBundle())
|
||||
log.Infof("Server Config: server:%s", base64.StdEncoding.EncodeToString(bundle))
|
||||
server.Run(acn)
|
||||
}
|
||||
|
|
|
@ -8,6 +8,7 @@ import (
|
|||
"cwtch.im/tapir"
|
||||
"cwtch.im/tapir/applications"
|
||||
tor2 "cwtch.im/tapir/networks/tor"
|
||||
"cwtch.im/tapir/persistence"
|
||||
"cwtch.im/tapir/primitives"
|
||||
"cwtch.im/tapir/primitives/privacypass"
|
||||
"git.openprivacy.ca/openprivacy/connectivity"
|
||||
|
@ -32,8 +33,11 @@ type Server struct {
|
|||
// Setup initialized a server from a given configuration
|
||||
func (s *Server) Setup(serverConfig Config) {
|
||||
s.config = serverConfig
|
||||
s.tokenServer = privacypass.NewTokenServer()
|
||||
s.tokenService, s.tokenServicePrivKey = primitives.InitializeEphemeralIdentity()
|
||||
bs := new(persistence.BoltPersistence)
|
||||
bs.Open("./tokens.db")
|
||||
s.tokenServer = privacypass.NewTokenServerFromStore(bs)
|
||||
s.tokenService = s.config.TokenServiceIdentity()
|
||||
s.tokenServicePrivKey = s.config.TokenServerPrivateKey
|
||||
}
|
||||
|
||||
// Run starts a server with the given privateKey
|
||||
|
|
|
@ -5,7 +5,6 @@ import (
|
|||
"cwtch.im/tapir/primitives"
|
||||
"encoding/json"
|
||||
"git.openprivacy.ca/openprivacy/log"
|
||||
"github.com/gtank/ristretto255"
|
||||
"golang.org/x/crypto/ed25519"
|
||||
"io/ioutil"
|
||||
"path"
|
||||
|
@ -20,20 +19,25 @@ type Reporting struct {
|
|||
|
||||
// Config is a struct for storing basic server configuration
|
||||
type Config struct {
|
||||
ConfigDir string `json:"-"`
|
||||
MaxBufferLines int `json:"maxBufferLines"`
|
||||
PublicKey ed25519.PublicKey `json:"publicKey"`
|
||||
PrivateKey ed25519.PrivateKey `json:"privateKey"`
|
||||
PrivacyPassPublicKey ristretto255.Element `json:"privacyPassPublicKey"`
|
||||
PrivacyPassPrivateKey ristretto255.Scalar `json:"privacyPassPrivateKey"`
|
||||
ServerReporting Reporting `json:"serverReporting"`
|
||||
ConfigDir string `json:"-"`
|
||||
MaxBufferLines int `json:"maxBufferLines"`
|
||||
PublicKey ed25519.PublicKey `json:"publicKey"`
|
||||
PrivateKey ed25519.PrivateKey `json:"privateKey"`
|
||||
TokenServerPublicKey ed25519.PublicKey `json:"tokenServerPublicKey"`
|
||||
TokenServerPrivateKey ed25519.PrivateKey `json:"tokenServerPrivateKey"`
|
||||
ServerReporting Reporting `json:"serverReporting"`
|
||||
}
|
||||
|
||||
// Identity returns an encapsulation of the servers keys for running ricochet
|
||||
// Identity returns an encapsulation of the servers keys
|
||||
func (config *Config) Identity() primitives.Identity {
|
||||
return primitives.InitializeIdentity("", &config.PrivateKey, &config.PublicKey)
|
||||
}
|
||||
|
||||
// TokenServiceIdentity returns an encapsulation of the servers token server (experimental)
|
||||
func (config *Config) TokenServiceIdentity() primitives.Identity {
|
||||
return primitives.InitializeIdentity("", &config.TokenServerPrivateKey, &config.TokenServerPublicKey)
|
||||
}
|
||||
|
||||
// Save dumps the latest version of the config to a json file given by filename
|
||||
func (config *Config) Save(dir, filename string) {
|
||||
log.Infof("Saving config to %s\n", path.Join(dir, filename))
|
||||
|
|
Loading…
Reference in New Issue