From 0c4bbe9ad131272d794799d5a850a6204817506b Mon Sep 17 00:00:00 2001 From: Dan Ballard Date: Wed, 15 May 2019 13:12:11 -0700 Subject: [PATCH] Refactor: engine and peer decoupled, engine and eventbus now per peer and stored top level in app. Storage has read only mode. Peer and group state now event based and stored in profiles. --- app/app.go | 83 ++++++++++++----- app/bots/servermon/main.go | 12 +-- app/cli/main.go | 23 +++-- event/common.go | 10 +++ event/eventmanager.go | 5 +- model/group.go | 1 + model/profile.go | 13 +++ peer/cwtch_peer.go | 82 ++++++++++------- protocol/connections/connectionsmanager.go | 27 +----- protocol/connections/engine.go | 20 ++--- protocol/connections/peerpeerconnection.go | 19 ++-- protocol/connections/peerserverconnection.go | 32 ++++--- .../connections/peerserverconnection_test.go | 7 +- protocol/connections/state.go | 6 +- storage/profile_store.go | 27 ++++-- storage/profile_store_test.go | 4 +- testing/cwtch_peer_server_integration_test.go | 90 +++++++------------ 17 files changed, 264 insertions(+), 197 deletions(-) diff --git a/app/app.go b/app/app.go index a477e6b..dbe0177 100644 --- a/app/app.go +++ b/app/app.go @@ -3,8 +3,10 @@ package app import ( "cwtch.im/cwtch/event" "cwtch.im/cwtch/peer" + "cwtch.im/cwtch/protocol/connections" "cwtch.im/cwtch/storage" "fmt" + "git.openprivacy.ca/openprivacy/libricochet-go/identity" "git.openprivacy.ca/openprivacy/libricochet-go/connectivity" "git.openprivacy.ca/openprivacy/libricochet-go/log" @@ -16,12 +18,13 @@ import ( type application struct { peers map[string]peer.CwtchPeer + storage map[string]storage.ProfileStore + engines map[string]connections.Engine + eventBuses map[string]*event.Manager acn connectivity.ACN directory string mutex sync.Mutex primaryonion string - storage map[string]storage.ProfileStore - eventBus *event.Manager } // Application is a full cwtch peer application. It allows management, usage and storage of multiple peers @@ -32,20 +35,18 @@ type Application interface { PrimaryIdentity() peer.CwtchPeer GetPeer(onion string) peer.CwtchPeer ListPeers() map[string]string + GetEventBus(onion string) *event.Manager LaunchPeers() - EventBus() *event.Manager - + ShutdownPeer(string) Shutdown() } // NewApp creates a new app with some environment awareness and initializes a Tor Manager func NewApp(acn connectivity.ACN, appDirectory string) Application { log.Debugf("NewApp(%v)\n", appDirectory) - app := &application{peers: make(map[string]peer.CwtchPeer), storage: make(map[string]storage.ProfileStore), directory: appDirectory, acn: acn} + app := &application{peers: make(map[string]peer.CwtchPeer), storage: make(map[string]storage.ProfileStore), engines: make(map[string]connections.Engine), eventBuses: make(map[string]*event.Manager), directory: appDirectory, acn: acn} os.Mkdir(path.Join(app.directory, "profiles"), 0700) - app.eventBus = new(event.Manager) - app.eventBus.Initialize() return app } @@ -53,19 +54,31 @@ func NewApp(acn connectivity.ACN, appDirectory string) Application { func (app *application) CreatePeer(name string, password string) (peer.CwtchPeer, error) { log.Debugf("CreatePeer(%v)\n", name) + eventBus := new(event.Manager) + eventBus.Initialize() + profile := storage.NewProfile(name) - profileStore := storage.NewProfileStore(app.eventBus, path.Join(app.directory, "profiles", profile.LocalID), password, profile) + profileStore := storage.NewProfileWriterStore(eventBus, path.Join(app.directory, "profiles", profile.LocalID), password, profile) pc := profileStore.GetProfileCopy() p := peer.FromProfile(pc) - p.Init(app.acn, app.eventBus) _, exists := app.peers[p.GetProfile().Onion] if exists { - p.Shutdown() + profileStore.Shutdown() + eventBus.Shutdown() return nil, fmt.Errorf("Error: profile for onion %v already exists", p.GetProfile().Onion) } + p.Init(app.acn, eventBus) + + blockedPeers := profile.BlockedPeers() + // TODO: Would be nice if ProtocolEngine did not need to explicitly be given the Private Key. + identity := identity.InitializeV3(profile.Name, &profile.Ed25519PrivateKey, &profile.Ed25519PublicKey) + engine := connections.NewProtocolEngine(identity, profile.Ed25519PrivateKey, app.acn, eventBus, blockedPeers) + app.mutex.Lock() app.peers[p.GetProfile().Onion] = p app.storage[p.GetProfile().Onion] = profileStore + app.engines[p.GetProfile().Onion] = engine + app.eventBuses[p.GetProfile().Onion] = eventBus app.mutex.Unlock() return p, nil @@ -79,8 +92,10 @@ func (app *application) LoadProfiles(password string) error { for _, file := range files { - // TODO: Per profile eventBus - profileStore := storage.NewProfileStore(app.eventBus, path.Join(app.directory, "profiles", file.Name()), password, nil) + eventBus := new(event.Manager) + eventBus.Initialize() + + profileStore := storage.NewProfileWriterStore(eventBus, path.Join(app.directory, "profiles", file.Name()), password, nil) err = profileStore.Load() if err != nil { continue @@ -91,16 +106,23 @@ func (app *application) LoadProfiles(password string) error { _, exists := app.peers[profile.Onion] if exists { profileStore.Shutdown() + eventBus.Shutdown() log.Errorf("profile for onion %v already exists", profile.Onion) continue } peer := peer.FromProfile(profile) - peer.Init(app.acn, app.eventBus) + peer.Init(app.acn, eventBus) + + blockedPeers := profile.BlockedPeers() + identity := identity.InitializeV3(profile.Name, &profile.Ed25519PrivateKey, &profile.Ed25519PublicKey) + engine := connections.NewProtocolEngine(identity, profile.Ed25519PrivateKey, app.acn, eventBus, blockedPeers) app.mutex.Lock() app.peers[profile.Onion] = peer app.storage[profile.Onion] = profileStore + app.engines[profile.Onion] = engine + app.eventBuses[profile.Onion] = eventBus if app.primaryonion == "" { app.primaryonion = profile.Onion } @@ -109,10 +131,13 @@ func (app *application) LoadProfiles(password string) error { return nil } +// LaunchPeers starts each peer Listening and connecting to peers and groups func (app *application) LaunchPeers() { for _, p := range app.peers { if !p.IsStarted() { p.Listen() + p.StartPeersConnections() + p.StartGroupConnections() } } } @@ -139,20 +164,34 @@ func (app *application) GetPeer(onion string) peer.CwtchPeer { return nil } -/* -// GetTorStatus returns tor control port bootstrap-phase status info in a map -func (app *application) GetTorStatus() (map[string]string, error) { - return app.torManager.GetStatus() -}*/ +// GetEventBus returns a cwtchPeer's event bus +func (app *application) GetEventBus(onion string) *event.Manager { + if manager, ok := app.eventBuses[onion]; ok { + return manager + } + return nil +} -// Fetch the app's event manager -func (app *application) EventBus() *event.Manager { - return app.eventBus +// ShutdownPeer shuts down a peer and removes it from the app's management +func (app *application) ShutdownPeer(onion string) { + app.mutex.Lock() + defer app.mutex.Unlock() + app.eventBuses[onion].Shutdown() + delete(app.eventBuses, onion) + app.peers[onion].Shutdown() + delete(app.peers, onion) + app.engines[onion].Shutdown() + delete(app.engines, onion) + app.storage[onion].Shutdown() + delete(app.storage, onion) } // Shutdown shutsdown all peers of an app and then the tormanager func (app *application) Shutdown() { - for _, peer := range app.peers { + for id, peer := range app.peers { peer.Shutdown() + app.engines[id].Shutdown() + app.storage[id].Shutdown() + app.eventBuses[id].Shutdown() } } diff --git a/app/bots/servermon/main.go b/app/bots/servermon/main.go index a8e4252..e82d2e5 100644 --- a/app/bots/servermon/main.go +++ b/app/bots/servermon/main.go @@ -11,16 +11,16 @@ import ( "time" ) -func waitForPeerServerConnection(peer peer.CwtchPeer, server string) error { +func waitForPeerGroupConnection(peer peer.CwtchPeer, groupID string) error { for { - servers := peer.GetServers() - state, ok := servers[server] + _, ok := peer.GetProfile().Groups[groupID] if ok { + state := peer.GetGroupState(groupID) if state == connections.FAILED { - return errors.New("Connection to server " + server + " failed!") + return errors.New("Connection to group " + groupID + " failed!") } if state != connections.AUTHENTICATED { - fmt.Printf("peer %v waiting to authenticate with server %v, current state: %v\n", peer.GetProfile().Onion, server, connections.ConnectionStateName[state]) + fmt.Printf("peer %v waiting to authenticate with group %v 's server, current state: %v\n", peer.GetProfile().Onion, groupID, connections.ConnectionStateName[state]) time.Sleep(time.Second * 10) continue } @@ -57,7 +57,7 @@ func main() { os.Exit(1) } - err = waitForPeerServerConnection(botPeer, serverAddr) + err = waitForPeerGroupConnection(botPeer, groupID) if err != nil { fmt.Printf("Could not connect to server %v: %v\n", serverAddr, err) os.Exit(1) diff --git a/app/cli/main.go b/app/cli/main.go index a0cbc7c..3fed1d5 100644 --- a/app/cli/main.go +++ b/app/cli/main.go @@ -6,7 +6,6 @@ import ( "bytes" "cwtch.im/cwtch/model" - "cwtch.im/cwtch/protocol/connections" "fmt" "git.openprivacy.ca/openprivacy/libricochet-go/connectivity" "git.openprivacy.ca/openprivacy/libricochet-go/log" @@ -45,8 +44,8 @@ var suggestionsSelectedProfile = []prompt.Suggest{ {Text: "/invite", Description: "invite a new contact"}, {Text: "/invite-to-group", Description: "invite an existing contact to join an existing group"}, {Text: "/accept-invite", Description: "accept the invite of a group"}, - {Text: "/list-servers", Description: "retrieve a list of servers and their connection status"}, - {Text: "/list-peers", Description: "retrieve a list of peers and their connection status"}, + /*{Text: "/list-servers", Description: "retrieve a list of servers and their connection status"}, + {Text: "/list-peers", Description: "retrieve a list of peers and their connection status"},*/ {Text: "/export-group", Description: "export a group invite: prints as a string"}, {Text: "/trust", Description: "trust a peer"}, {Text: "/block", Description: "block a peer - you will no longer see messages or connect to this peer"}, @@ -55,13 +54,13 @@ var suggestionsSelectedProfile = []prompt.Suggest{ var suggestions = suggestionsBase var usages = map[string]string{ - "/new-profile": "/new-profile [name]", - "/load-profiles": "/load-profiles", - "/list-profiles": "", - "/select-profile": "/select-profile [onion]", - "/quit": "", - "/list-servers": "", - "/list-peers": "", + "/new-profile": "/new-profile [name]", + "/load-profiles": "/load-profiles", + "/list-profiles": "", + "/select-profile": "/select-profile [onion]", + "/quit": "", + /* "/list-servers": "", + "/list-peers": "",*/ "/list-contacts": "", "/list-groups": "", "/select-group": "/select-group [groupid]", @@ -424,7 +423,7 @@ func main() { } else { fmt.Printf("Error inviting peer, usage: %s\n", usages[commands[0]]) } - case "/list-peers": + /*case "/list-peers": peers := peer.GetPeers() for p, s := range peers { fmt.Printf("Name: %v Status: %v\n", p, connections.ConnectionStateName[s]) @@ -433,7 +432,7 @@ func main() { servers := peer.GetServers() for s, st := range servers { fmt.Printf("Name: %v Status: %v\n", s, connections.ConnectionStateName[st]) - } + }*/ case "/list-contacts": contacts := peer.GetContacts() for _, onion := range contacts { diff --git a/event/common.go b/event/common.go index f8d68e9..7e87e37 100644 --- a/event/common.go +++ b/event/common.go @@ -84,6 +84,14 @@ const ( // Key [eg "nick"] // Data [eg "open privacy board"] SetGroupAttribute = Type("SetGroupAttribute") + + // RemotePeer + // ConnectionState + PeerStateChange = Type("PeerStateChange") + + // GroupServer + // ConnectionState + ServerStateChange = Type("GroupStateChange") ) // Field defines common event attributes @@ -106,6 +114,8 @@ const ( ProfileName = Field("ProfileName") + ConnectionState = Field("ConnectionState") + Key = Field("Key") Data = Field("Data") diff --git a/event/eventmanager.go b/event/eventmanager.go index 58b1462..3334226 100644 --- a/event/eventmanager.go +++ b/event/eventmanager.go @@ -24,6 +24,7 @@ type Manager struct { events chan Event mapMutex sync.Mutex internal chan bool + closed bool } // Initialize sets up the Manager. @@ -31,6 +32,7 @@ func (em *Manager) Initialize() { em.subscribers = make(map[Type][]chan Event) em.events = make(chan Event) em.internal = make(chan bool) + em.closed = false go em.eventBus() } @@ -44,7 +46,7 @@ func (em *Manager) Subscribe(eventType Type, eventChannel chan Event) { // Publish takes an Event and sends it to the internal eventBus where it is distributed to all Subscribers func (em *Manager) Publish(event Event) { - if event.EventType != "" { + if event.EventType != "" && em.closed != true { em.events <- event } } @@ -83,6 +85,7 @@ func (em *Manager) eventBus() { // Shutdown triggers, and waits for, the internal eventBus goroutine to finish func (em *Manager) Shutdown() { em.events <- Event{} + em.closed = true // wait for eventBus to finish <-em.internal close(em.events) diff --git a/model/group.go b/model/group.go index f91ec63..6d7d7ac 100644 --- a/model/group.go +++ b/model/group.go @@ -29,6 +29,7 @@ type Group struct { Attributes map[string]string lock sync.Mutex LocalID string + State string `json:"-"` unacknowledgedMessages []Message } diff --git a/model/profile.go b/model/profile.go index 661adb5..b3109a0 100644 --- a/model/profile.go +++ b/model/profile.go @@ -27,6 +27,7 @@ type PublicProfile struct { Attributes map[string]string Timeline Timeline LocalID string // used by storage engine + State string `json:"-"` lock sync.Mutex } @@ -186,6 +187,18 @@ func (p *Profile) BlockPeer(onion string) (err error) { return } +// BlockedPeers calculates a list of Peers who have been Blocked. +func (p *Profile) BlockedPeers() []string { + blockedPeers := []string{} + for _, contact := range p.GetContacts() { + c, _ := p.GetContact(contact) + if c.Blocked { + blockedPeers = append(blockedPeers, c.Onion) + } + } + return blockedPeers +} + // TrustPeer sets a contact to trusted func (p *Profile) TrustPeer(onion string) (err error) { p.lock.Lock() diff --git a/peer/cwtch_peer.go b/peer/cwtch_peer.go index f2516e5..05475d5 100644 --- a/peer/cwtch_peer.go +++ b/peer/cwtch_peer.go @@ -5,11 +5,11 @@ import ( "cwtch.im/cwtch/model" "cwtch.im/cwtch/protocol" "cwtch.im/cwtch/protocol/connections" + "encoding/base32" "encoding/base64" "encoding/json" "errors" "git.openprivacy.ca/openprivacy/libricochet-go/connectivity" - "git.openprivacy.ca/openprivacy/libricochet-go/identity" "git.openprivacy.ca/openprivacy/libricochet-go/log" "github.com/golang/protobuf/proto" "strings" @@ -24,7 +24,6 @@ type cwtchPeer struct { shutdown bool started bool - engine connections.Engine queue *event.Queue eventBus *event.Manager } @@ -47,9 +46,7 @@ type CwtchPeer interface { SendMessageToGroupTracked(string, string) (string, error) GetProfile() *model.Profile - - GetPeers() map[string]connections.ConnectionState - GetServers() map[string]connections.ConnectionState + GetPeerState(string) connections.ConnectionState StartGroup(string) (string, []byte, error) @@ -57,13 +54,16 @@ type CwtchPeer interface { ExportGroup(string) (string, error) GetGroup(string) *model.Group + GetGroupState(string) connections.ConnectionState GetGroups() []string - AddContact(nick, onion string, publickey []byte, trusted bool) + AddContact(nick, onion string, trusted bool) GetContacts() []string GetContact(string) *model.PublicProfile IsStarted() bool Listen() + StartPeersConnections() + StartGroupConnections() Shutdown() } @@ -90,19 +90,8 @@ func (cp *cwtchPeer) Init(acn connectivity.ACN, eventBus *event.Manager) { cp.eventBus = eventBus cp.eventBus.Subscribe(event.EncryptedGroupMessage, cp.queue.EventChannel) cp.eventBus.Subscribe(event.NewGroupInvite, cp.queue.EventChannel) - - // Calculate a list of Peers who have been Blocked. - blockedPeers := []string{} - for _, contact := range cp.Profile.GetContacts() { - c, _ := cp.Profile.GetContact(contact) - if c.Blocked { - blockedPeers = append(blockedPeers, c.Onion) - } - } - - // TODO: Would be nice if ProtocolEngine did not need to explicitly be given the Private Key. - identity := identity.InitializeV3(cp.Profile.Name, &cp.Profile.Ed25519PrivateKey, &cp.Profile.Ed25519PublicKey) - cp.engine = connections.NewProtocolEngine(identity, cp.Profile.Ed25519PrivateKey, acn, eventBus, blockedPeers) + cp.eventBus.Subscribe(event.ServerStateChange, cp.queue.EventChannel) + cp.eventBus.Subscribe(event.PeerStateChange, cp.queue.EventChannel) } // ImportGroup intializes a group from an imported source rather than a peer invite @@ -175,8 +164,9 @@ func (cp *cwtchPeer) GetGroup(groupID string) *model.Group { return cp.Profile.GetGroupByGroupID(groupID) } -func (cp *cwtchPeer) AddContact(nick, onion string, publickey []byte, trusted bool) { - pp := &model.PublicProfile{Name: nick, Ed25519PublicKey: publickey, Trusted: trusted, Blocked: false, Onion: onion, Attributes: map[string]string{"nick": nick}} +func (cp *cwtchPeer) AddContact(nick, onion string, trusted bool) { + decodedPub, _ := base32.StdEncoding.DecodeString(strings.ToUpper(onion)) + pp := &model.PublicProfile{Name: nick, Ed25519PublicKey: decodedPub, Trusted: trusted, Blocked: false, Onion: onion, Attributes: map[string]string{"nick": nick}} cp.Profile.AddContact(onion, pp) pd, _ := json.Marshal(pp) cp.eventBus.Publish(event.NewEvent(event.PeerCreated, map[event.Field]string{ @@ -201,6 +191,14 @@ func (cp *cwtchPeer) GetProfile() *model.Profile { return cp.Profile } +func (cp *cwtchPeer) GetPeerState(onion string) connections.ConnectionState { + return connections.ConnectionStateType[cp.Profile.Contacts[onion].State] +} + +func (cp *cwtchPeer) GetGroupState(groupid string) connections.ConnectionState { + return connections.ConnectionStateType[cp.Profile.Groups[groupid].State] +} + // PeerWithOnion is the entry point for cwtchPeer relationships func (cp *cwtchPeer) PeerWithOnion(onion string) *connections.PeerPeerConnection { cp.eventBus.Publish(event.NewEvent(event.PeerRequest, map[event.Field]string{event.RemotePeer: onion})) @@ -255,16 +253,6 @@ func (cp *cwtchPeer) SendMessageToPeer(onion string, message string) string { return event.EventID } -// GetPeers returns a list of peer connections. -func (cp *cwtchPeer) GetPeers() map[string]connections.ConnectionState { - return cp.engine.GetPeers() -} - -// GetServers returns a list of server connections -func (cp *cwtchPeer) GetServers() map[string]connections.ConnectionState { - return cp.engine.GetServers() -} - // TrustPeer sets an existing peer relationship to trusted func (cp *cwtchPeer) TrustPeer(peer string) error { err := cp.Profile.TrustPeer(peer) @@ -298,14 +286,34 @@ func (cp *cwtchPeer) RejectInvite(groupID string) { cp.Profile.RejectInvite(groupID) } +// Listen makes the peer open a listening port to accept incoming connections (and be detactably online) func (cp *cwtchPeer) Listen() { cp.eventBus.Publish(event.NewEvent(event.ProtocolEngineStartListen, map[event.Field]string{})) } +// StartGroupConnections attempts to connect to all group servers (thus initiating reconnect attempts in the conectionsmanager) +func (cp *cwtchPeer) StartPeersConnections() { + for _, contact := range cp.GetContacts() { + cp.PeerWithOnion(contact) + } +} + +// StartPeerConnections attempts to connect to all peers (thus initiating reconnect attempts in the conectionsmanager) +func (cp *cwtchPeer) StartGroupConnections() { + joinedServers := map[string]bool{} + for _, groupID := range cp.GetGroups() { + // Only send a join server packet if we haven't joined this server yet... + group := cp.GetGroup(groupID) + if joined := joinedServers[groupID]; group.Accepted && !joined { + cp.JoinServer(group.GroupServer) + joinedServers[group.GroupServer] = true + } + } +} + // Shutdown kills all connections and cleans up all goroutines for the peer func (cp *cwtchPeer) Shutdown() { cp.shutdown = true - cp.engine.Shutdown() cp.queue.Shutdown() } @@ -328,6 +336,16 @@ func (cp *cwtchPeer) eventHandler() { var groupInvite protocol.GroupChatInvite proto.Unmarshal([]byte(ev.Data[event.GroupInvite]), &groupInvite) cp.Profile.ProcessInvite(&groupInvite, ev.Data[event.RemotePeer]) + case event.PeerStateChange: + if _, exists := cp.Profile.Contacts[ev.Data[event.RemotePeer]]; exists { + cp.Profile.Contacts[ev.Data[event.RemotePeer]].State = ev.Data[event.ConnectionState] + } + case event.ServerStateChange: + for _, group := range cp.Profile.Groups { + if group.GroupServer == ev.Data[event.GroupServer] { + group.State = ev.Data[event.ConnectionState] + } + } default: if ev.EventType != "" { log.Errorf("peer event handler received an event it was not subscribed for: %v", ev.EventType) diff --git a/protocol/connections/connectionsmanager.go b/protocol/connections/connectionsmanager.go index 27b2b25..bb7730b 100644 --- a/protocol/connections/connectionsmanager.go +++ b/protocol/connections/connectionsmanager.go @@ -42,12 +42,12 @@ func (m *Manager) ManagePeerConnection(host string, engine Engine) *PeerPeerConn } // ManageServerConnection creates a new ServerConnection for Host with the given callback handler. -func (m *Manager) ManageServerConnection(host string, messageHandler func(string, *protocol.GroupMessage), closedHandler func(string)) { +func (m *Manager) ManageServerConnection(host string, engine Engine, messageHandler func(string, *protocol.GroupMessage), closedHandler func(string)) { m.lock.Lock() psc, exists := m.serverConnections[host] - newPsc := NewPeerServerConnection(m.acn, host) + newPsc := NewPeerServerConnection(engine, host) newPsc.GroupMessageHandler = messageHandler newPsc.CloseHandler = closedHandler go newPsc.Run() @@ -60,28 +60,6 @@ func (m *Manager) ManageServerConnection(host string, messageHandler func(string m.lock.Unlock() } -// GetPeers returns a map of all peer connections with their state -func (m *Manager) GetPeers() map[string]ConnectionState { - rm := make(map[string]ConnectionState) - m.lock.Lock() - for onion, ppc := range m.peerConnections { - rm[onion] = ppc.GetState() - } - m.lock.Unlock() - return rm -} - -// GetServers returns a map of all server connections with their state. -func (m *Manager) GetServers() map[string]ConnectionState { - rm := make(map[string]ConnectionState) - m.lock.Lock() - for onion, psc := range m.serverConnections { - rm[onion] = psc.GetState() - } - m.lock.Unlock() - return rm -} - // GetPeerPeerConnectionForOnion safely returns a given peer connection func (m *Manager) GetPeerPeerConnectionForOnion(host string) (ppc *PeerPeerConnection) { m.lock.Lock() @@ -103,7 +81,6 @@ func (m *Manager) AttemptReconnections() { maxTimeout := time.Minute * 5 // nearly instant first run, next few runs will prolly be too quick to have any FAILED and will gracefully slow to MAX after that timeout := time.Millisecond * 500 - for { select { case <-time.After(timeout): diff --git a/protocol/connections/engine.go b/protocol/connections/engine.go index 1c1be8c..62728fe 100644 --- a/protocol/connections/engine.go +++ b/protocol/connections/engine.go @@ -45,6 +45,7 @@ type engine struct { type Engine interface { Identity() identity.Identity ACN() connectivity.ACN + EventManager() *event.Manager GetPeerHandler(string) *CwtchPeerHandler ContactRequest(string, string) string @@ -52,9 +53,6 @@ type Engine interface { LookupContact(string, rsa.PublicKey) (bool, bool) LookupContactV3(string, ed25519.PublicKey) (bool, bool) - GetPeers() map[string]ConnectionState - GetServers() map[string]ConnectionState - Shutdown() } @@ -94,6 +92,10 @@ func (e *engine) Identity() identity.Identity { return e.identity } +func (e *engine) EventManager() *event.Manager { + return e.eventManager +} + // eventHandler process events from other subsystems func (e *engine) eventHandler() { for { @@ -244,7 +246,7 @@ func (e *engine) finishedFetch(server string) { // joinServer manages a new server connection with the given onion address func (e *engine) joinServer(onion string) { - e.connectionsManager.ManageServerConnection(onion, e.receiveGroupMessage, e.finishedFetch) + e.connectionsManager.ManageServerConnection(onion, e, e.receiveGroupMessage, e.finishedFetch) } // sendMessageToGroup attempts to sent the given message to the given group id. @@ -264,16 +266,6 @@ func (e *engine) sendMessageToGroup(server string, ct []byte, sig []byte) { } } -// GetPeers returns a list of peer connections. -func (e *engine) GetPeers() map[string]ConnectionState { - return e.connectionsManager.GetPeers() -} - -// GetServers returns a list of server connections -func (e *engine) GetServers() map[string]ConnectionState { - return e.connectionsManager.GetServers() -} - // CwtchPeerInstance encapsulates incoming peer connections type CwtchPeerInstance struct { rai *application.Instance diff --git a/protocol/connections/peerpeerconnection.go b/protocol/connections/peerpeerconnection.go index 3644fb5..684ef6d 100644 --- a/protocol/connections/peerpeerconnection.go +++ b/protocol/connections/peerpeerconnection.go @@ -1,6 +1,7 @@ package connections import ( + "cwtch.im/cwtch/event" "cwtch.im/cwtch/protocol/connections/peer" "errors" "git.openprivacy.ca/openprivacy/libricochet-go" @@ -28,6 +29,14 @@ func NewPeerPeerConnection(peerhostname string, protocolEngine Engine) *PeerPeer return ppc } +func (ppc *PeerPeerConnection) setState(state ConnectionState) { + ppc.state = state + ppc.protocolEngine.EventManager().Publish(event.NewEvent(event.PeerStateChange, map[event.Field]string{ + event.RemotePeer: string(ppc.PeerHostname), + event.ConnectionState: ConnectionStateName[state], + })) +} + // GetState returns the current connection state func (ppc *PeerPeerConnection) GetState() ConnectionState { return ppc.state @@ -78,14 +87,14 @@ func (ppc *PeerPeerConnection) WaitTilAuthenticated() { // Run manages the setup and teardown of a peer->peer connection func (ppc *PeerPeerConnection) Run() error { - ppc.state = CONNECTING + ppc.setState(CONNECTING) rc, err := goricochet.Open(ppc.protocolEngine.ACN(), ppc.PeerHostname) if err == nil { ppc.connection = rc - ppc.state = CONNECTED + ppc.setState(CONNECTED) _, err := connection.HandleOutboundConnection(ppc.connection).ProcessAuthAsV3Client(ppc.protocolEngine.Identity()) if err == nil { - ppc.state = AUTHENTICATED + ppc.setState(AUTHENTICATED) go func() { ppc.connection.Do(func() error { ppc.connection.RequestOpenChannel("im.cwtch.peer", &peer.CwtchPeerChannel{Handler: ppc.protocolEngine.GetPeerHandler(ppc.PeerHostname)}) @@ -101,13 +110,13 @@ func (ppc *PeerPeerConnection) Run() error { ppc.connection.Process(ppc) } } - ppc.state = FAILED + ppc.setState(FAILED) return err } // Close closes the connection func (ppc *PeerPeerConnection) Close() { - ppc.state = KILLED + ppc.setState(KILLED) if ppc.connection != nil { ppc.connection.Close() } diff --git a/protocol/connections/peerserverconnection.go b/protocol/connections/peerserverconnection.go index 3b3094b..6325820 100644 --- a/protocol/connections/peerserverconnection.go +++ b/protocol/connections/peerserverconnection.go @@ -2,6 +2,7 @@ package connections import ( "crypto/rand" + "cwtch.im/cwtch/event" "cwtch.im/cwtch/protocol" "cwtch.im/cwtch/protocol/connections/fetch" "cwtch.im/cwtch/protocol/connections/listen" @@ -10,7 +11,6 @@ import ( "git.openprivacy.ca/openprivacy/libricochet-go" "git.openprivacy.ca/openprivacy/libricochet-go/channels" "git.openprivacy.ca/openprivacy/libricochet-go/connection" - "git.openprivacy.ca/openprivacy/libricochet-go/connectivity" "git.openprivacy.ca/openprivacy/libricochet-go/identity" "git.openprivacy.ca/openprivacy/libricochet-go/log" "golang.org/x/crypto/ed25519" @@ -20,19 +20,19 @@ import ( // PeerServerConnection encapsulates a single Peer->Server connection type PeerServerConnection struct { connection.AutoConnectionHandler - Server string - state ConnectionState - connection *connection.Connection - acn connectivity.ACN + Server string + state ConnectionState + connection *connection.Connection + protocolEngine Engine GroupMessageHandler func(string, *protocol.GroupMessage) CloseHandler func(string) } // NewPeerServerConnection creates a new Peer->Server outbound connection -func NewPeerServerConnection(acn connectivity.ACN, serverhostname string) *PeerServerConnection { +func NewPeerServerConnection(engine Engine, serverhostname string) *PeerServerConnection { psc := new(PeerServerConnection) - psc.acn = acn + psc.protocolEngine = engine psc.Server = serverhostname psc.Init() return psc @@ -43,6 +43,14 @@ func (psc *PeerServerConnection) GetState() ConnectionState { return psc.state } +func (psc *PeerServerConnection) setState(state ConnectionState) { + psc.state = state + psc.protocolEngine.EventManager().Publish(event.NewEvent(event.ServerStateChange, map[event.Field]string{ + event.GroupServer: string(psc.Server), + event.ConnectionState: ConnectionStateName[state], + })) +} + // WaitTilAuthenticated waits until the underlying connection is authenticated func (psc *PeerServerConnection) WaitTilAuthenticated() { for { @@ -56,15 +64,15 @@ func (psc *PeerServerConnection) WaitTilAuthenticated() { // Run manages the setup and teardown of a peer server connection func (psc *PeerServerConnection) Run() error { log.Infof("Connecting to %v", psc.Server) - rc, err := goricochet.Open(psc.acn, psc.Server) + rc, err := goricochet.Open(psc.protocolEngine.ACN(), psc.Server) if err == nil { psc.connection = rc - psc.state = CONNECTED + psc.setState(CONNECTED) pub, priv, err := ed25519.GenerateKey(rand.Reader) if err == nil { _, err := connection.HandleOutboundConnection(psc.connection).ProcessAuthAsV3Client(identity.InitializeV3("cwtchpeer", &priv, &pub)) if err == nil { - psc.state = AUTHENTICATED + psc.setState(AUTHENTICATED) go func() { psc.connection.Do(func() error { @@ -81,7 +89,7 @@ func (psc *PeerServerConnection) Run() error { } } } - psc.state = FAILED + psc.setState(FAILED) return err } @@ -128,7 +136,7 @@ func (psc *PeerServerConnection) SendGroupMessage(gm *protocol.GroupMessage) err // Close shuts down the connection (freeing the handler goroutines) func (psc *PeerServerConnection) Close() { - psc.state = KILLED + psc.setState(KILLED) if psc.connection != nil { psc.connection.Close() } diff --git a/protocol/connections/peerserverconnection_test.go b/protocol/connections/peerserverconnection_test.go index d86d444..13b9fe6 100644 --- a/protocol/connections/peerserverconnection_test.go +++ b/protocol/connections/peerserverconnection_test.go @@ -2,6 +2,7 @@ package connections import ( "crypto/rand" + "cwtch.im/cwtch/event" "cwtch.im/cwtch/protocol" "cwtch.im/cwtch/server/fetch" "cwtch.im/cwtch/server/send" @@ -75,7 +76,11 @@ func TestPeerServerConnection(t *testing.T) { <-listenChan onionAddr := identity.Hostname() - psc := NewPeerServerConnection(connectivity.LocalProvider(), "127.0.0.1:5451|"+onionAddr) + manager := &event.Manager{} + manager.Initialize() + engine := NewProtocolEngine(identity, priv, connectivity.LocalProvider(), manager, nil) + + psc := NewPeerServerConnection(engine, "127.0.0.1:5451|"+onionAddr) numcalls := 0 psc.GroupMessageHandler = func(s string, gm *protocol.GroupMessage) { numcalls++ diff --git a/protocol/connections/state.go b/protocol/connections/state.go index 79e1594..f8a6d9c 100644 --- a/protocol/connections/state.go +++ b/protocol/connections/state.go @@ -7,7 +7,7 @@ type ConnectionState int // DISCONNECTED - No existing connection has been made, or all attempts have failed // CONNECTING - We are in the process of attempting to connect to a given endpoint // CONNECTED - We have connected but not yet authenticated -// AUTHENTICATED - im.ricochet.auth-hidden-server has succeeded on thec onnection. +// AUTHENTICATED - im.ricochet.auth-hidden-server has succeeded on the connection. const ( DISCONNECTED ConnectionState = iota CONNECTING @@ -20,4 +20,8 @@ const ( var ( // ConnectionStateName allows conversion of states to their string representations ConnectionStateName = []string{"Disconnected", "Connecting", "Connected", "Authenticated", "Failed", "Killed"} + + // ConnectionStateType allows conversion of strings to their state type + ConnectionStateType = map[string]ConnectionState{"Disconnected": DISCONNECTED, "Connecting": CONNECTING, + "Connected": CONNECTED, "Authenticated": AUTHENTICATED, "Failed": FAILED, "Killed": KILLED} ) diff --git a/storage/profile_store.go b/storage/profile_store.go index ba09624..0247be1 100644 --- a/storage/profile_store.go +++ b/storage/profile_store.go @@ -21,6 +21,7 @@ type profileStore struct { profile *model.Profile eventManager *event.Manager queue *event.Queue + writer bool } // ProfileStore is an interface to managing the storage of Cwtch Profiles @@ -30,11 +31,11 @@ type ProfileStore interface { GetProfileCopy() *model.Profile } -// NewProfileStore returns a profile store backed by a filestore listening for events and saving them +// NewProfileWriterStore returns a profile store backed by a filestore listening for events and saving them // directory should be $appDir/profiles/$rand -func NewProfileStore(eventManager *event.Manager, directory, password string, profile *model.Profile) ProfileStore { +func NewProfileWriterStore(eventManager *event.Manager, directory, password string, profile *model.Profile) ProfileStore { os.Mkdir(directory, 0700) - ps := &profileStore{fs: NewFileStore(directory, profileFilename, password), password: password, directory: directory, profile: profile, eventManager: eventManager, streamStores: map[string]StreamStore{}} + ps := &profileStore{fs: NewFileStore(directory, profileFilename, password), password: password, directory: directory, profile: profile, eventManager: eventManager, streamStores: map[string]StreamStore{}, writer: true} ps.queue = event.NewEventQueue(100) if profile != nil { ps.save() @@ -55,6 +56,15 @@ func NewProfileStore(eventManager *event.Manager, directory, password string, pr return ps } +// NewProfileReaderStore returns a profile store backed by a filestore +// directory should be $appDir/profiles/$rand +func NewProfileReaderStore(directory, password string, profile *model.Profile) ProfileStore { + os.Mkdir(directory, 0700) + ps := &profileStore{fs: NewFileStore(directory, profileFilename, password), password: password, directory: directory, profile: profile, eventManager: nil, streamStores: map[string]StreamStore{}, writer: true} + + return ps +} + // NewProfile creates a new profile for use in the profile store. func NewProfile(name string) *model.Profile { profile := model.GenerateNewProfile(name) @@ -62,8 +72,11 @@ func NewProfile(name string) *model.Profile { } func (ps *profileStore) save() error { - bytes, _ := json.Marshal(ps.profile) - return ps.fs.Save(bytes) + if ps.writer { + bytes, _ := json.Marshal(ps.profile) + return ps.fs.Save(bytes) + } + return nil } // Load instantiates a cwtchPeer from the file store @@ -181,5 +194,7 @@ func (ps *profileStore) eventHandler() { } func (ps *profileStore) Shutdown() { - ps.queue.Shutdown() + if ps.queue != nil { + ps.queue.Shutdown() + } } diff --git a/storage/profile_store_test.go b/storage/profile_store_test.go index 8dc17f8..3466b8b 100644 --- a/storage/profile_store_test.go +++ b/storage/profile_store_test.go @@ -20,7 +20,7 @@ func TestProfileStoreWriteRead(t *testing.T) { eventBus := new(event.Manager) eventBus.Initialize() profile := NewProfile(testProfileName) - ps1 := NewProfileStore(eventBus, testingDir, password, profile) + ps1 := NewProfileWriterStore(eventBus, testingDir, password, profile) eventBus.Publish(event.NewEvent(event.SetAttribute, map[event.Field]string{event.Key: testKey, event.Data: testVal})) time.Sleep(1 * time.Second) @@ -50,7 +50,7 @@ func TestProfileStoreWriteRead(t *testing.T) { ps1.Shutdown() - ps2 := NewProfileStore(eventBus, testingDir, password, nil) + ps2 := NewProfileWriterStore(eventBus, testingDir, password, nil) err = ps2.Load() if err != nil { t.Errorf("Error createing profileStore: %v\n", err) diff --git a/testing/cwtch_peer_server_integration_test.go b/testing/cwtch_peer_server_integration_test.go index b49b7a5..bdcbe44 100644 --- a/testing/cwtch_peer_server_integration_test.go +++ b/testing/cwtch_peer_server_integration_test.go @@ -1,7 +1,7 @@ package testing import ( - "cwtch.im/cwtch/event" + app2 "cwtch.im/cwtch/app" "cwtch.im/cwtch/model" "cwtch.im/cwtch/peer" "cwtch.im/cwtch/protocol/connections" @@ -55,16 +55,17 @@ func serverCheck(t *testing.T, serverAddr string) bool { return true } -func waitForPeerServerConnection(t *testing.T, peer peer.CwtchPeer, server string) { +func waitForPeerGroupConnection(t *testing.T, peer peer.CwtchPeer, groupID string) { for { - servers := peer.GetServers() - state, ok := servers[server] + _, ok := peer.GetProfile().Groups[groupID] if ok { + state := peer.GetGroupState(groupID) + //log.Infof("Waiting for Peer %v to join group %v - state: %v\n", peer.GetProfile().Name, groupID, state) if state == connections.FAILED { - t.Fatalf("%v could not connect to %v", peer.GetProfile().Onion, server) + t.Fatalf("%v could not connect to %v", peer.GetProfile().Onion, groupID) } if state != connections.AUTHENTICATED { - fmt.Printf("peer %v waiting connect to server %v, currently: %v\n", peer.GetProfile().Onion, server, connections.ConnectionStateName[state]) + fmt.Printf("peer %v waiting connect to group %v, currently: %v\n", peer.GetProfile().Onion, groupID, connections.ConnectionStateName[state]) time.Sleep(time.Second * 5) continue } else { @@ -78,9 +79,11 @@ func waitForPeerServerConnection(t *testing.T, peer peer.CwtchPeer, server strin func waitForPeerPeerConnection(t *testing.T, peera peer.CwtchPeer, peerb peer.CwtchPeer) { for { - peers := peera.GetPeers() - state, ok := peers[peerb.GetProfile().Onion] + //peers := peera.GetPeers() + _, ok := peera.GetProfile().Contacts[peerb.GetProfile().Onion] if ok { + state := peera.GetPeerState(peerb.GetProfile().Onion) + //log.Infof("Waiting for Peer %v to peer with peer: %v - state: %v\n", peera.GetProfile().Name, peerb.GetProfile().Name, state) if state == connections.FAILED { t.Fatalf("%v could not connect to %v", peera.GetProfile().Onion, peerb.GetProfile().Onion) } @@ -132,35 +135,33 @@ func TestCwtchPeerIntegration(t *testing.T) { numGoRoutinesPostServer := runtime.NumGoroutine() + app := app2.NewApp(acn, "./storage") + // ***** cwtchPeer setup ***** // It's important that each Peer have their own EventBus - aliceEventBus := new(event.Manager) + /*aliceEventBus := new(event.Manager) aliceEventBus.Initialize() bobEventBus := new(event.Manager) bobEventBus.Initialize() carolEventBus := new(event.Manager) - carolEventBus.Initialize() + carolEventBus.Initialize()*/ fmt.Println("Creating Alice...") - alice := peer.NewCwtchPeer("Alice") - alice.Init(acn, aliceEventBus) - alice.Listen() + alice, _ := app.CreatePeer("alice", "asdfasdf") fmt.Println("Alice created:", alice.GetProfile().Onion) fmt.Println("Creating Bob...") - bob := peer.NewCwtchPeer("Bob") - bob.Init(acn, bobEventBus) - bob.Listen() + bob, _ := app.CreatePeer("bob", "asdfasdf") fmt.Println("Bob created:", bob.GetProfile().Onion) fmt.Println("Creating Carol...") - carol := peer.NewCwtchPeer("Carol") - carol.Init(acn, carolEventBus) - carol.Listen() + carol, _ := app.CreatePeer("Carol", "asdfasdf") fmt.Println("Carol created:", carol.GetProfile().Onion) - fmt.Println("Waiting for Alice, Bob, and Carol to connection with onion network...") + app.LaunchPeers() + + fmt.Println("Waiting for Alice, Bob, and Carol to connect with onion network...") time.Sleep(time.Second * 90) numGoRoutinesPostPeerStart := runtime.NumGoroutine() @@ -175,8 +176,10 @@ func TestCwtchPeerIntegration(t *testing.T) { } fmt.Println("Alice peering with Bob...") + alice.AddContact("Bob", bob.GetProfile().Onion, false) // Add contact so we can track connection state alice.PeerWithOnion(bob.GetProfile().Onion) fmt.Println("Alice peering with Carol...") + alice.AddContact("Carol", carol.GetProfile().Onion, false) alice.PeerWithOnion(carol.GetProfile().Onion) fmt.Println("Alice joining server...") @@ -185,10 +188,10 @@ func TestCwtchPeerIntegration(t *testing.T) { bob.JoinServer(serverAddr) fmt.Println("Waiting for alice to join server...") - waitForPeerServerConnection(t, alice, serverAddr) + waitForPeerGroupConnection(t, alice, groupID) - fmt.Println("Waiting for bob to join server...") - waitForPeerServerConnection(t, bob, serverAddr) + //fmt.Println("Waiting for bob to join server...") + //waitForPeerGroupConnection(t, bob, groupID) fmt.Println("Waiting for alice and Bob to peer...") waitForPeerPeerConnection(t, alice, bob) @@ -217,35 +220,9 @@ func TestCwtchPeerIntegration(t *testing.T) { numGoRoutinesPostServerConnect := runtime.NumGoroutine() - // ***** Fill up message history of server ****** - - /* - // filler group will be used to fill up the servers message history a bit to stress test fetch later for carol - fillerGroupId, _, err := alice.Profile.StartGroup(serverAddr) - if err != nil { - t.Errorf("Failed to init filler group: %v", err) - return - } - - fmt.Println("Alice filling message history of server...") - for i := 0; i < 100; i++ { - - go func (x int) { - time.Sleep(time.Second * time.Duration(x)) - err := alice.SendMessageToGroup(fillerGroupId, aliceLines[0]) - if err != nil { - fmt.Println("SEND", x, "ERROR:", err) - } else { - fmt.Println("SEND", x, " SUCCESS!") - } - }(i) - } - - time.Sleep(time.Second * 110) - */ // Wait for them to join the server - waitForPeerServerConnection(t, alice, serverAddr) - waitForPeerServerConnection(t, bob, serverAddr) + waitForPeerGroupConnection(t, alice, groupID) + waitForPeerGroupConnection(t, bob, groupID) //numGouRoutinesPostServerConnect := runtime.NumGoroutine() // ***** Conversation ***** @@ -291,14 +268,13 @@ func TestCwtchPeerIntegration(t *testing.T) { } fmt.Println("Shutting down Alice...") - alice.Shutdown() - aliceEventBus.Shutdown() + app.ShutdownPeer(alice.GetProfile().Onion) time.Sleep(time.Second * 5) numGoRoutinesPostAlice := runtime.NumGoroutine() fmt.Println("Carol joining server...") carol.JoinServer(serverAddr) - waitForPeerServerConnection(t, carol, serverAddr) + waitForPeerGroupConnection(t, carol, groupID) numGoRotinesPostCarolConnect := runtime.NumGoroutine() fmt.Println("Bob> ", bobLines[2]) @@ -380,8 +356,7 @@ func TestCwtchPeerIntegration(t *testing.T) { } fmt.Println("Shutting down Bob...") - bob.Shutdown() - bobEventBus.Shutdown() + app.ShutdownPeer(bob.GetProfile().Onion) time.Sleep(time.Second * 3) numGoRoutinesPostBob := runtime.NumGoroutine() if server != nil { @@ -392,8 +367,7 @@ func TestCwtchPeerIntegration(t *testing.T) { numGoRoutinesPostServerShutdown := runtime.NumGoroutine() fmt.Println("Shutting down Carol...") - carol.Shutdown() - carolEventBus.Shutdown() + app.ShutdownPeer(carol.GetProfile().Onion) time.Sleep(time.Second * 3) numGoRoutinesPostCarol := runtime.NumGoroutine()