diff --git a/app/app.go b/app/app.go index a477e6b..dbe0177 100644 --- a/app/app.go +++ b/app/app.go @@ -3,8 +3,10 @@ package app import ( "cwtch.im/cwtch/event" "cwtch.im/cwtch/peer" + "cwtch.im/cwtch/protocol/connections" "cwtch.im/cwtch/storage" "fmt" + "git.openprivacy.ca/openprivacy/libricochet-go/identity" "git.openprivacy.ca/openprivacy/libricochet-go/connectivity" "git.openprivacy.ca/openprivacy/libricochet-go/log" @@ -16,12 +18,13 @@ import ( type application struct { peers map[string]peer.CwtchPeer + storage map[string]storage.ProfileStore + engines map[string]connections.Engine + eventBuses map[string]*event.Manager acn connectivity.ACN directory string mutex sync.Mutex primaryonion string - storage map[string]storage.ProfileStore - eventBus *event.Manager } // Application is a full cwtch peer application. It allows management, usage and storage of multiple peers @@ -32,20 +35,18 @@ type Application interface { PrimaryIdentity() peer.CwtchPeer GetPeer(onion string) peer.CwtchPeer ListPeers() map[string]string + GetEventBus(onion string) *event.Manager LaunchPeers() - EventBus() *event.Manager - + ShutdownPeer(string) Shutdown() } // NewApp creates a new app with some environment awareness and initializes a Tor Manager func NewApp(acn connectivity.ACN, appDirectory string) Application { log.Debugf("NewApp(%v)\n", appDirectory) - app := &application{peers: make(map[string]peer.CwtchPeer), storage: make(map[string]storage.ProfileStore), directory: appDirectory, acn: acn} + app := &application{peers: make(map[string]peer.CwtchPeer), storage: make(map[string]storage.ProfileStore), engines: make(map[string]connections.Engine), eventBuses: make(map[string]*event.Manager), directory: appDirectory, acn: acn} os.Mkdir(path.Join(app.directory, "profiles"), 0700) - app.eventBus = new(event.Manager) - app.eventBus.Initialize() return app } @@ -53,19 +54,31 @@ func NewApp(acn connectivity.ACN, appDirectory string) Application { func (app *application) CreatePeer(name string, password string) (peer.CwtchPeer, error) { log.Debugf("CreatePeer(%v)\n", name) + eventBus := new(event.Manager) + eventBus.Initialize() + profile := storage.NewProfile(name) - profileStore := storage.NewProfileStore(app.eventBus, path.Join(app.directory, "profiles", profile.LocalID), password, profile) + profileStore := storage.NewProfileWriterStore(eventBus, path.Join(app.directory, "profiles", profile.LocalID), password, profile) pc := profileStore.GetProfileCopy() p := peer.FromProfile(pc) - p.Init(app.acn, app.eventBus) _, exists := app.peers[p.GetProfile().Onion] if exists { - p.Shutdown() + profileStore.Shutdown() + eventBus.Shutdown() return nil, fmt.Errorf("Error: profile for onion %v already exists", p.GetProfile().Onion) } + p.Init(app.acn, eventBus) + + blockedPeers := profile.BlockedPeers() + // TODO: Would be nice if ProtocolEngine did not need to explicitly be given the Private Key. + identity := identity.InitializeV3(profile.Name, &profile.Ed25519PrivateKey, &profile.Ed25519PublicKey) + engine := connections.NewProtocolEngine(identity, profile.Ed25519PrivateKey, app.acn, eventBus, blockedPeers) + app.mutex.Lock() app.peers[p.GetProfile().Onion] = p app.storage[p.GetProfile().Onion] = profileStore + app.engines[p.GetProfile().Onion] = engine + app.eventBuses[p.GetProfile().Onion] = eventBus app.mutex.Unlock() return p, nil @@ -79,8 +92,10 @@ func (app *application) LoadProfiles(password string) error { for _, file := range files { - // TODO: Per profile eventBus - profileStore := storage.NewProfileStore(app.eventBus, path.Join(app.directory, "profiles", file.Name()), password, nil) + eventBus := new(event.Manager) + eventBus.Initialize() + + profileStore := storage.NewProfileWriterStore(eventBus, path.Join(app.directory, "profiles", file.Name()), password, nil) err = profileStore.Load() if err != nil { continue @@ -91,16 +106,23 @@ func (app *application) LoadProfiles(password string) error { _, exists := app.peers[profile.Onion] if exists { profileStore.Shutdown() + eventBus.Shutdown() log.Errorf("profile for onion %v already exists", profile.Onion) continue } peer := peer.FromProfile(profile) - peer.Init(app.acn, app.eventBus) + peer.Init(app.acn, eventBus) + + blockedPeers := profile.BlockedPeers() + identity := identity.InitializeV3(profile.Name, &profile.Ed25519PrivateKey, &profile.Ed25519PublicKey) + engine := connections.NewProtocolEngine(identity, profile.Ed25519PrivateKey, app.acn, eventBus, blockedPeers) app.mutex.Lock() app.peers[profile.Onion] = peer app.storage[profile.Onion] = profileStore + app.engines[profile.Onion] = engine + app.eventBuses[profile.Onion] = eventBus if app.primaryonion == "" { app.primaryonion = profile.Onion } @@ -109,10 +131,13 @@ func (app *application) LoadProfiles(password string) error { return nil } +// LaunchPeers starts each peer Listening and connecting to peers and groups func (app *application) LaunchPeers() { for _, p := range app.peers { if !p.IsStarted() { p.Listen() + p.StartPeersConnections() + p.StartGroupConnections() } } } @@ -139,20 +164,34 @@ func (app *application) GetPeer(onion string) peer.CwtchPeer { return nil } -/* -// GetTorStatus returns tor control port bootstrap-phase status info in a map -func (app *application) GetTorStatus() (map[string]string, error) { - return app.torManager.GetStatus() -}*/ +// GetEventBus returns a cwtchPeer's event bus +func (app *application) GetEventBus(onion string) *event.Manager { + if manager, ok := app.eventBuses[onion]; ok { + return manager + } + return nil +} -// Fetch the app's event manager -func (app *application) EventBus() *event.Manager { - return app.eventBus +// ShutdownPeer shuts down a peer and removes it from the app's management +func (app *application) ShutdownPeer(onion string) { + app.mutex.Lock() + defer app.mutex.Unlock() + app.eventBuses[onion].Shutdown() + delete(app.eventBuses, onion) + app.peers[onion].Shutdown() + delete(app.peers, onion) + app.engines[onion].Shutdown() + delete(app.engines, onion) + app.storage[onion].Shutdown() + delete(app.storage, onion) } // Shutdown shutsdown all peers of an app and then the tormanager func (app *application) Shutdown() { - for _, peer := range app.peers { + for id, peer := range app.peers { peer.Shutdown() + app.engines[id].Shutdown() + app.storage[id].Shutdown() + app.eventBuses[id].Shutdown() } } diff --git a/app/bots/servermon/main.go b/app/bots/servermon/main.go index a8e4252..e82d2e5 100644 --- a/app/bots/servermon/main.go +++ b/app/bots/servermon/main.go @@ -11,16 +11,16 @@ import ( "time" ) -func waitForPeerServerConnection(peer peer.CwtchPeer, server string) error { +func waitForPeerGroupConnection(peer peer.CwtchPeer, groupID string) error { for { - servers := peer.GetServers() - state, ok := servers[server] + _, ok := peer.GetProfile().Groups[groupID] if ok { + state := peer.GetGroupState(groupID) if state == connections.FAILED { - return errors.New("Connection to server " + server + " failed!") + return errors.New("Connection to group " + groupID + " failed!") } if state != connections.AUTHENTICATED { - fmt.Printf("peer %v waiting to authenticate with server %v, current state: %v\n", peer.GetProfile().Onion, server, connections.ConnectionStateName[state]) + fmt.Printf("peer %v waiting to authenticate with group %v 's server, current state: %v\n", peer.GetProfile().Onion, groupID, connections.ConnectionStateName[state]) time.Sleep(time.Second * 10) continue } @@ -57,7 +57,7 @@ func main() { os.Exit(1) } - err = waitForPeerServerConnection(botPeer, serverAddr) + err = waitForPeerGroupConnection(botPeer, groupID) if err != nil { fmt.Printf("Could not connect to server %v: %v\n", serverAddr, err) os.Exit(1) diff --git a/app/cli/main.go b/app/cli/main.go index a0cbc7c..3fed1d5 100644 --- a/app/cli/main.go +++ b/app/cli/main.go @@ -6,7 +6,6 @@ import ( "bytes" "cwtch.im/cwtch/model" - "cwtch.im/cwtch/protocol/connections" "fmt" "git.openprivacy.ca/openprivacy/libricochet-go/connectivity" "git.openprivacy.ca/openprivacy/libricochet-go/log" @@ -45,8 +44,8 @@ var suggestionsSelectedProfile = []prompt.Suggest{ {Text: "/invite", Description: "invite a new contact"}, {Text: "/invite-to-group", Description: "invite an existing contact to join an existing group"}, {Text: "/accept-invite", Description: "accept the invite of a group"}, - {Text: "/list-servers", Description: "retrieve a list of servers and their connection status"}, - {Text: "/list-peers", Description: "retrieve a list of peers and their connection status"}, + /*{Text: "/list-servers", Description: "retrieve a list of servers and their connection status"}, + {Text: "/list-peers", Description: "retrieve a list of peers and their connection status"},*/ {Text: "/export-group", Description: "export a group invite: prints as a string"}, {Text: "/trust", Description: "trust a peer"}, {Text: "/block", Description: "block a peer - you will no longer see messages or connect to this peer"}, @@ -55,13 +54,13 @@ var suggestionsSelectedProfile = []prompt.Suggest{ var suggestions = suggestionsBase var usages = map[string]string{ - "/new-profile": "/new-profile [name]", - "/load-profiles": "/load-profiles", - "/list-profiles": "", - "/select-profile": "/select-profile [onion]", - "/quit": "", - "/list-servers": "", - "/list-peers": "", + "/new-profile": "/new-profile [name]", + "/load-profiles": "/load-profiles", + "/list-profiles": "", + "/select-profile": "/select-profile [onion]", + "/quit": "", + /* "/list-servers": "", + "/list-peers": "",*/ "/list-contacts": "", "/list-groups": "", "/select-group": "/select-group [groupid]", @@ -424,7 +423,7 @@ func main() { } else { fmt.Printf("Error inviting peer, usage: %s\n", usages[commands[0]]) } - case "/list-peers": + /*case "/list-peers": peers := peer.GetPeers() for p, s := range peers { fmt.Printf("Name: %v Status: %v\n", p, connections.ConnectionStateName[s]) @@ -433,7 +432,7 @@ func main() { servers := peer.GetServers() for s, st := range servers { fmt.Printf("Name: %v Status: %v\n", s, connections.ConnectionStateName[st]) - } + }*/ case "/list-contacts": contacts := peer.GetContacts() for _, onion := range contacts { diff --git a/event/common.go b/event/common.go index f8d68e9..7e87e37 100644 --- a/event/common.go +++ b/event/common.go @@ -84,6 +84,14 @@ const ( // Key [eg "nick"] // Data [eg "open privacy board"] SetGroupAttribute = Type("SetGroupAttribute") + + // RemotePeer + // ConnectionState + PeerStateChange = Type("PeerStateChange") + + // GroupServer + // ConnectionState + ServerStateChange = Type("GroupStateChange") ) // Field defines common event attributes @@ -106,6 +114,8 @@ const ( ProfileName = Field("ProfileName") + ConnectionState = Field("ConnectionState") + Key = Field("Key") Data = Field("Data") diff --git a/event/eventmanager.go b/event/eventmanager.go index 58b1462..3334226 100644 --- a/event/eventmanager.go +++ b/event/eventmanager.go @@ -24,6 +24,7 @@ type Manager struct { events chan Event mapMutex sync.Mutex internal chan bool + closed bool } // Initialize sets up the Manager. @@ -31,6 +32,7 @@ func (em *Manager) Initialize() { em.subscribers = make(map[Type][]chan Event) em.events = make(chan Event) em.internal = make(chan bool) + em.closed = false go em.eventBus() } @@ -44,7 +46,7 @@ func (em *Manager) Subscribe(eventType Type, eventChannel chan Event) { // Publish takes an Event and sends it to the internal eventBus where it is distributed to all Subscribers func (em *Manager) Publish(event Event) { - if event.EventType != "" { + if event.EventType != "" && em.closed != true { em.events <- event } } @@ -83,6 +85,7 @@ func (em *Manager) eventBus() { // Shutdown triggers, and waits for, the internal eventBus goroutine to finish func (em *Manager) Shutdown() { em.events <- Event{} + em.closed = true // wait for eventBus to finish <-em.internal close(em.events) diff --git a/model/group.go b/model/group.go index f91ec63..6d7d7ac 100644 --- a/model/group.go +++ b/model/group.go @@ -29,6 +29,7 @@ type Group struct { Attributes map[string]string lock sync.Mutex LocalID string + State string `json:"-"` unacknowledgedMessages []Message } diff --git a/model/profile.go b/model/profile.go index 661adb5..b3109a0 100644 --- a/model/profile.go +++ b/model/profile.go @@ -27,6 +27,7 @@ type PublicProfile struct { Attributes map[string]string Timeline Timeline LocalID string // used by storage engine + State string `json:"-"` lock sync.Mutex } @@ -186,6 +187,18 @@ func (p *Profile) BlockPeer(onion string) (err error) { return } +// BlockedPeers calculates a list of Peers who have been Blocked. +func (p *Profile) BlockedPeers() []string { + blockedPeers := []string{} + for _, contact := range p.GetContacts() { + c, _ := p.GetContact(contact) + if c.Blocked { + blockedPeers = append(blockedPeers, c.Onion) + } + } + return blockedPeers +} + // TrustPeer sets a contact to trusted func (p *Profile) TrustPeer(onion string) (err error) { p.lock.Lock() diff --git a/peer/cwtch_peer.go b/peer/cwtch_peer.go index f2516e5..05475d5 100644 --- a/peer/cwtch_peer.go +++ b/peer/cwtch_peer.go @@ -5,11 +5,11 @@ import ( "cwtch.im/cwtch/model" "cwtch.im/cwtch/protocol" "cwtch.im/cwtch/protocol/connections" + "encoding/base32" "encoding/base64" "encoding/json" "errors" "git.openprivacy.ca/openprivacy/libricochet-go/connectivity" - "git.openprivacy.ca/openprivacy/libricochet-go/identity" "git.openprivacy.ca/openprivacy/libricochet-go/log" "github.com/golang/protobuf/proto" "strings" @@ -24,7 +24,6 @@ type cwtchPeer struct { shutdown bool started bool - engine connections.Engine queue *event.Queue eventBus *event.Manager } @@ -47,9 +46,7 @@ type CwtchPeer interface { SendMessageToGroupTracked(string, string) (string, error) GetProfile() *model.Profile - - GetPeers() map[string]connections.ConnectionState - GetServers() map[string]connections.ConnectionState + GetPeerState(string) connections.ConnectionState StartGroup(string) (string, []byte, error) @@ -57,13 +54,16 @@ type CwtchPeer interface { ExportGroup(string) (string, error) GetGroup(string) *model.Group + GetGroupState(string) connections.ConnectionState GetGroups() []string - AddContact(nick, onion string, publickey []byte, trusted bool) + AddContact(nick, onion string, trusted bool) GetContacts() []string GetContact(string) *model.PublicProfile IsStarted() bool Listen() + StartPeersConnections() + StartGroupConnections() Shutdown() } @@ -90,19 +90,8 @@ func (cp *cwtchPeer) Init(acn connectivity.ACN, eventBus *event.Manager) { cp.eventBus = eventBus cp.eventBus.Subscribe(event.EncryptedGroupMessage, cp.queue.EventChannel) cp.eventBus.Subscribe(event.NewGroupInvite, cp.queue.EventChannel) - - // Calculate a list of Peers who have been Blocked. - blockedPeers := []string{} - for _, contact := range cp.Profile.GetContacts() { - c, _ := cp.Profile.GetContact(contact) - if c.Blocked { - blockedPeers = append(blockedPeers, c.Onion) - } - } - - // TODO: Would be nice if ProtocolEngine did not need to explicitly be given the Private Key. - identity := identity.InitializeV3(cp.Profile.Name, &cp.Profile.Ed25519PrivateKey, &cp.Profile.Ed25519PublicKey) - cp.engine = connections.NewProtocolEngine(identity, cp.Profile.Ed25519PrivateKey, acn, eventBus, blockedPeers) + cp.eventBus.Subscribe(event.ServerStateChange, cp.queue.EventChannel) + cp.eventBus.Subscribe(event.PeerStateChange, cp.queue.EventChannel) } // ImportGroup intializes a group from an imported source rather than a peer invite @@ -175,8 +164,9 @@ func (cp *cwtchPeer) GetGroup(groupID string) *model.Group { return cp.Profile.GetGroupByGroupID(groupID) } -func (cp *cwtchPeer) AddContact(nick, onion string, publickey []byte, trusted bool) { - pp := &model.PublicProfile{Name: nick, Ed25519PublicKey: publickey, Trusted: trusted, Blocked: false, Onion: onion, Attributes: map[string]string{"nick": nick}} +func (cp *cwtchPeer) AddContact(nick, onion string, trusted bool) { + decodedPub, _ := base32.StdEncoding.DecodeString(strings.ToUpper(onion)) + pp := &model.PublicProfile{Name: nick, Ed25519PublicKey: decodedPub, Trusted: trusted, Blocked: false, Onion: onion, Attributes: map[string]string{"nick": nick}} cp.Profile.AddContact(onion, pp) pd, _ := json.Marshal(pp) cp.eventBus.Publish(event.NewEvent(event.PeerCreated, map[event.Field]string{ @@ -201,6 +191,14 @@ func (cp *cwtchPeer) GetProfile() *model.Profile { return cp.Profile } +func (cp *cwtchPeer) GetPeerState(onion string) connections.ConnectionState { + return connections.ConnectionStateType[cp.Profile.Contacts[onion].State] +} + +func (cp *cwtchPeer) GetGroupState(groupid string) connections.ConnectionState { + return connections.ConnectionStateType[cp.Profile.Groups[groupid].State] +} + // PeerWithOnion is the entry point for cwtchPeer relationships func (cp *cwtchPeer) PeerWithOnion(onion string) *connections.PeerPeerConnection { cp.eventBus.Publish(event.NewEvent(event.PeerRequest, map[event.Field]string{event.RemotePeer: onion})) @@ -255,16 +253,6 @@ func (cp *cwtchPeer) SendMessageToPeer(onion string, message string) string { return event.EventID } -// GetPeers returns a list of peer connections. -func (cp *cwtchPeer) GetPeers() map[string]connections.ConnectionState { - return cp.engine.GetPeers() -} - -// GetServers returns a list of server connections -func (cp *cwtchPeer) GetServers() map[string]connections.ConnectionState { - return cp.engine.GetServers() -} - // TrustPeer sets an existing peer relationship to trusted func (cp *cwtchPeer) TrustPeer(peer string) error { err := cp.Profile.TrustPeer(peer) @@ -298,14 +286,34 @@ func (cp *cwtchPeer) RejectInvite(groupID string) { cp.Profile.RejectInvite(groupID) } +// Listen makes the peer open a listening port to accept incoming connections (and be detactably online) func (cp *cwtchPeer) Listen() { cp.eventBus.Publish(event.NewEvent(event.ProtocolEngineStartListen, map[event.Field]string{})) } +// StartGroupConnections attempts to connect to all group servers (thus initiating reconnect attempts in the conectionsmanager) +func (cp *cwtchPeer) StartPeersConnections() { + for _, contact := range cp.GetContacts() { + cp.PeerWithOnion(contact) + } +} + +// StartPeerConnections attempts to connect to all peers (thus initiating reconnect attempts in the conectionsmanager) +func (cp *cwtchPeer) StartGroupConnections() { + joinedServers := map[string]bool{} + for _, groupID := range cp.GetGroups() { + // Only send a join server packet if we haven't joined this server yet... + group := cp.GetGroup(groupID) + if joined := joinedServers[groupID]; group.Accepted && !joined { + cp.JoinServer(group.GroupServer) + joinedServers[group.GroupServer] = true + } + } +} + // Shutdown kills all connections and cleans up all goroutines for the peer func (cp *cwtchPeer) Shutdown() { cp.shutdown = true - cp.engine.Shutdown() cp.queue.Shutdown() } @@ -328,6 +336,16 @@ func (cp *cwtchPeer) eventHandler() { var groupInvite protocol.GroupChatInvite proto.Unmarshal([]byte(ev.Data[event.GroupInvite]), &groupInvite) cp.Profile.ProcessInvite(&groupInvite, ev.Data[event.RemotePeer]) + case event.PeerStateChange: + if _, exists := cp.Profile.Contacts[ev.Data[event.RemotePeer]]; exists { + cp.Profile.Contacts[ev.Data[event.RemotePeer]].State = ev.Data[event.ConnectionState] + } + case event.ServerStateChange: + for _, group := range cp.Profile.Groups { + if group.GroupServer == ev.Data[event.GroupServer] { + group.State = ev.Data[event.ConnectionState] + } + } default: if ev.EventType != "" { log.Errorf("peer event handler received an event it was not subscribed for: %v", ev.EventType) diff --git a/protocol/connections/connectionsmanager.go b/protocol/connections/connectionsmanager.go index 27b2b25..bb7730b 100644 --- a/protocol/connections/connectionsmanager.go +++ b/protocol/connections/connectionsmanager.go @@ -42,12 +42,12 @@ func (m *Manager) ManagePeerConnection(host string, engine Engine) *PeerPeerConn } // ManageServerConnection creates a new ServerConnection for Host with the given callback handler. -func (m *Manager) ManageServerConnection(host string, messageHandler func(string, *protocol.GroupMessage), closedHandler func(string)) { +func (m *Manager) ManageServerConnection(host string, engine Engine, messageHandler func(string, *protocol.GroupMessage), closedHandler func(string)) { m.lock.Lock() psc, exists := m.serverConnections[host] - newPsc := NewPeerServerConnection(m.acn, host) + newPsc := NewPeerServerConnection(engine, host) newPsc.GroupMessageHandler = messageHandler newPsc.CloseHandler = closedHandler go newPsc.Run() @@ -60,28 +60,6 @@ func (m *Manager) ManageServerConnection(host string, messageHandler func(string m.lock.Unlock() } -// GetPeers returns a map of all peer connections with their state -func (m *Manager) GetPeers() map[string]ConnectionState { - rm := make(map[string]ConnectionState) - m.lock.Lock() - for onion, ppc := range m.peerConnections { - rm[onion] = ppc.GetState() - } - m.lock.Unlock() - return rm -} - -// GetServers returns a map of all server connections with their state. -func (m *Manager) GetServers() map[string]ConnectionState { - rm := make(map[string]ConnectionState) - m.lock.Lock() - for onion, psc := range m.serverConnections { - rm[onion] = psc.GetState() - } - m.lock.Unlock() - return rm -} - // GetPeerPeerConnectionForOnion safely returns a given peer connection func (m *Manager) GetPeerPeerConnectionForOnion(host string) (ppc *PeerPeerConnection) { m.lock.Lock() @@ -103,7 +81,6 @@ func (m *Manager) AttemptReconnections() { maxTimeout := time.Minute * 5 // nearly instant first run, next few runs will prolly be too quick to have any FAILED and will gracefully slow to MAX after that timeout := time.Millisecond * 500 - for { select { case <-time.After(timeout): diff --git a/protocol/connections/engine.go b/protocol/connections/engine.go index 1c1be8c..62728fe 100644 --- a/protocol/connections/engine.go +++ b/protocol/connections/engine.go @@ -45,6 +45,7 @@ type engine struct { type Engine interface { Identity() identity.Identity ACN() connectivity.ACN + EventManager() *event.Manager GetPeerHandler(string) *CwtchPeerHandler ContactRequest(string, string) string @@ -52,9 +53,6 @@ type Engine interface { LookupContact(string, rsa.PublicKey) (bool, bool) LookupContactV3(string, ed25519.PublicKey) (bool, bool) - GetPeers() map[string]ConnectionState - GetServers() map[string]ConnectionState - Shutdown() } @@ -94,6 +92,10 @@ func (e *engine) Identity() identity.Identity { return e.identity } +func (e *engine) EventManager() *event.Manager { + return e.eventManager +} + // eventHandler process events from other subsystems func (e *engine) eventHandler() { for { @@ -244,7 +246,7 @@ func (e *engine) finishedFetch(server string) { // joinServer manages a new server connection with the given onion address func (e *engine) joinServer(onion string) { - e.connectionsManager.ManageServerConnection(onion, e.receiveGroupMessage, e.finishedFetch) + e.connectionsManager.ManageServerConnection(onion, e, e.receiveGroupMessage, e.finishedFetch) } // sendMessageToGroup attempts to sent the given message to the given group id. @@ -264,16 +266,6 @@ func (e *engine) sendMessageToGroup(server string, ct []byte, sig []byte) { } } -// GetPeers returns a list of peer connections. -func (e *engine) GetPeers() map[string]ConnectionState { - return e.connectionsManager.GetPeers() -} - -// GetServers returns a list of server connections -func (e *engine) GetServers() map[string]ConnectionState { - return e.connectionsManager.GetServers() -} - // CwtchPeerInstance encapsulates incoming peer connections type CwtchPeerInstance struct { rai *application.Instance diff --git a/protocol/connections/peerpeerconnection.go b/protocol/connections/peerpeerconnection.go index 3644fb5..684ef6d 100644 --- a/protocol/connections/peerpeerconnection.go +++ b/protocol/connections/peerpeerconnection.go @@ -1,6 +1,7 @@ package connections import ( + "cwtch.im/cwtch/event" "cwtch.im/cwtch/protocol/connections/peer" "errors" "git.openprivacy.ca/openprivacy/libricochet-go" @@ -28,6 +29,14 @@ func NewPeerPeerConnection(peerhostname string, protocolEngine Engine) *PeerPeer return ppc } +func (ppc *PeerPeerConnection) setState(state ConnectionState) { + ppc.state = state + ppc.protocolEngine.EventManager().Publish(event.NewEvent(event.PeerStateChange, map[event.Field]string{ + event.RemotePeer: string(ppc.PeerHostname), + event.ConnectionState: ConnectionStateName[state], + })) +} + // GetState returns the current connection state func (ppc *PeerPeerConnection) GetState() ConnectionState { return ppc.state @@ -78,14 +87,14 @@ func (ppc *PeerPeerConnection) WaitTilAuthenticated() { // Run manages the setup and teardown of a peer->peer connection func (ppc *PeerPeerConnection) Run() error { - ppc.state = CONNECTING + ppc.setState(CONNECTING) rc, err := goricochet.Open(ppc.protocolEngine.ACN(), ppc.PeerHostname) if err == nil { ppc.connection = rc - ppc.state = CONNECTED + ppc.setState(CONNECTED) _, err := connection.HandleOutboundConnection(ppc.connection).ProcessAuthAsV3Client(ppc.protocolEngine.Identity()) if err == nil { - ppc.state = AUTHENTICATED + ppc.setState(AUTHENTICATED) go func() { ppc.connection.Do(func() error { ppc.connection.RequestOpenChannel("im.cwtch.peer", &peer.CwtchPeerChannel{Handler: ppc.protocolEngine.GetPeerHandler(ppc.PeerHostname)}) @@ -101,13 +110,13 @@ func (ppc *PeerPeerConnection) Run() error { ppc.connection.Process(ppc) } } - ppc.state = FAILED + ppc.setState(FAILED) return err } // Close closes the connection func (ppc *PeerPeerConnection) Close() { - ppc.state = KILLED + ppc.setState(KILLED) if ppc.connection != nil { ppc.connection.Close() } diff --git a/protocol/connections/peerserverconnection.go b/protocol/connections/peerserverconnection.go index 3b3094b..6325820 100644 --- a/protocol/connections/peerserverconnection.go +++ b/protocol/connections/peerserverconnection.go @@ -2,6 +2,7 @@ package connections import ( "crypto/rand" + "cwtch.im/cwtch/event" "cwtch.im/cwtch/protocol" "cwtch.im/cwtch/protocol/connections/fetch" "cwtch.im/cwtch/protocol/connections/listen" @@ -10,7 +11,6 @@ import ( "git.openprivacy.ca/openprivacy/libricochet-go" "git.openprivacy.ca/openprivacy/libricochet-go/channels" "git.openprivacy.ca/openprivacy/libricochet-go/connection" - "git.openprivacy.ca/openprivacy/libricochet-go/connectivity" "git.openprivacy.ca/openprivacy/libricochet-go/identity" "git.openprivacy.ca/openprivacy/libricochet-go/log" "golang.org/x/crypto/ed25519" @@ -20,19 +20,19 @@ import ( // PeerServerConnection encapsulates a single Peer->Server connection type PeerServerConnection struct { connection.AutoConnectionHandler - Server string - state ConnectionState - connection *connection.Connection - acn connectivity.ACN + Server string + state ConnectionState + connection *connection.Connection + protocolEngine Engine GroupMessageHandler func(string, *protocol.GroupMessage) CloseHandler func(string) } // NewPeerServerConnection creates a new Peer->Server outbound connection -func NewPeerServerConnection(acn connectivity.ACN, serverhostname string) *PeerServerConnection { +func NewPeerServerConnection(engine Engine, serverhostname string) *PeerServerConnection { psc := new(PeerServerConnection) - psc.acn = acn + psc.protocolEngine = engine psc.Server = serverhostname psc.Init() return psc @@ -43,6 +43,14 @@ func (psc *PeerServerConnection) GetState() ConnectionState { return psc.state } +func (psc *PeerServerConnection) setState(state ConnectionState) { + psc.state = state + psc.protocolEngine.EventManager().Publish(event.NewEvent(event.ServerStateChange, map[event.Field]string{ + event.GroupServer: string(psc.Server), + event.ConnectionState: ConnectionStateName[state], + })) +} + // WaitTilAuthenticated waits until the underlying connection is authenticated func (psc *PeerServerConnection) WaitTilAuthenticated() { for { @@ -56,15 +64,15 @@ func (psc *PeerServerConnection) WaitTilAuthenticated() { // Run manages the setup and teardown of a peer server connection func (psc *PeerServerConnection) Run() error { log.Infof("Connecting to %v", psc.Server) - rc, err := goricochet.Open(psc.acn, psc.Server) + rc, err := goricochet.Open(psc.protocolEngine.ACN(), psc.Server) if err == nil { psc.connection = rc - psc.state = CONNECTED + psc.setState(CONNECTED) pub, priv, err := ed25519.GenerateKey(rand.Reader) if err == nil { _, err := connection.HandleOutboundConnection(psc.connection).ProcessAuthAsV3Client(identity.InitializeV3("cwtchpeer", &priv, &pub)) if err == nil { - psc.state = AUTHENTICATED + psc.setState(AUTHENTICATED) go func() { psc.connection.Do(func() error { @@ -81,7 +89,7 @@ func (psc *PeerServerConnection) Run() error { } } } - psc.state = FAILED + psc.setState(FAILED) return err } @@ -128,7 +136,7 @@ func (psc *PeerServerConnection) SendGroupMessage(gm *protocol.GroupMessage) err // Close shuts down the connection (freeing the handler goroutines) func (psc *PeerServerConnection) Close() { - psc.state = KILLED + psc.setState(KILLED) if psc.connection != nil { psc.connection.Close() } diff --git a/protocol/connections/peerserverconnection_test.go b/protocol/connections/peerserverconnection_test.go index d86d444..13b9fe6 100644 --- a/protocol/connections/peerserverconnection_test.go +++ b/protocol/connections/peerserverconnection_test.go @@ -2,6 +2,7 @@ package connections import ( "crypto/rand" + "cwtch.im/cwtch/event" "cwtch.im/cwtch/protocol" "cwtch.im/cwtch/server/fetch" "cwtch.im/cwtch/server/send" @@ -75,7 +76,11 @@ func TestPeerServerConnection(t *testing.T) { <-listenChan onionAddr := identity.Hostname() - psc := NewPeerServerConnection(connectivity.LocalProvider(), "127.0.0.1:5451|"+onionAddr) + manager := &event.Manager{} + manager.Initialize() + engine := NewProtocolEngine(identity, priv, connectivity.LocalProvider(), manager, nil) + + psc := NewPeerServerConnection(engine, "127.0.0.1:5451|"+onionAddr) numcalls := 0 psc.GroupMessageHandler = func(s string, gm *protocol.GroupMessage) { numcalls++ diff --git a/protocol/connections/state.go b/protocol/connections/state.go index 79e1594..f8a6d9c 100644 --- a/protocol/connections/state.go +++ b/protocol/connections/state.go @@ -7,7 +7,7 @@ type ConnectionState int // DISCONNECTED - No existing connection has been made, or all attempts have failed // CONNECTING - We are in the process of attempting to connect to a given endpoint // CONNECTED - We have connected but not yet authenticated -// AUTHENTICATED - im.ricochet.auth-hidden-server has succeeded on thec onnection. +// AUTHENTICATED - im.ricochet.auth-hidden-server has succeeded on the connection. const ( DISCONNECTED ConnectionState = iota CONNECTING @@ -20,4 +20,8 @@ const ( var ( // ConnectionStateName allows conversion of states to their string representations ConnectionStateName = []string{"Disconnected", "Connecting", "Connected", "Authenticated", "Failed", "Killed"} + + // ConnectionStateType allows conversion of strings to their state type + ConnectionStateType = map[string]ConnectionState{"Disconnected": DISCONNECTED, "Connecting": CONNECTING, + "Connected": CONNECTED, "Authenticated": AUTHENTICATED, "Failed": FAILED, "Killed": KILLED} ) diff --git a/storage/profile_store.go b/storage/profile_store.go index ba09624..0247be1 100644 --- a/storage/profile_store.go +++ b/storage/profile_store.go @@ -21,6 +21,7 @@ type profileStore struct { profile *model.Profile eventManager *event.Manager queue *event.Queue + writer bool } // ProfileStore is an interface to managing the storage of Cwtch Profiles @@ -30,11 +31,11 @@ type ProfileStore interface { GetProfileCopy() *model.Profile } -// NewProfileStore returns a profile store backed by a filestore listening for events and saving them +// NewProfileWriterStore returns a profile store backed by a filestore listening for events and saving them // directory should be $appDir/profiles/$rand -func NewProfileStore(eventManager *event.Manager, directory, password string, profile *model.Profile) ProfileStore { +func NewProfileWriterStore(eventManager *event.Manager, directory, password string, profile *model.Profile) ProfileStore { os.Mkdir(directory, 0700) - ps := &profileStore{fs: NewFileStore(directory, profileFilename, password), password: password, directory: directory, profile: profile, eventManager: eventManager, streamStores: map[string]StreamStore{}} + ps := &profileStore{fs: NewFileStore(directory, profileFilename, password), password: password, directory: directory, profile: profile, eventManager: eventManager, streamStores: map[string]StreamStore{}, writer: true} ps.queue = event.NewEventQueue(100) if profile != nil { ps.save() @@ -55,6 +56,15 @@ func NewProfileStore(eventManager *event.Manager, directory, password string, pr return ps } +// NewProfileReaderStore returns a profile store backed by a filestore +// directory should be $appDir/profiles/$rand +func NewProfileReaderStore(directory, password string, profile *model.Profile) ProfileStore { + os.Mkdir(directory, 0700) + ps := &profileStore{fs: NewFileStore(directory, profileFilename, password), password: password, directory: directory, profile: profile, eventManager: nil, streamStores: map[string]StreamStore{}, writer: true} + + return ps +} + // NewProfile creates a new profile for use in the profile store. func NewProfile(name string) *model.Profile { profile := model.GenerateNewProfile(name) @@ -62,8 +72,11 @@ func NewProfile(name string) *model.Profile { } func (ps *profileStore) save() error { - bytes, _ := json.Marshal(ps.profile) - return ps.fs.Save(bytes) + if ps.writer { + bytes, _ := json.Marshal(ps.profile) + return ps.fs.Save(bytes) + } + return nil } // Load instantiates a cwtchPeer from the file store @@ -181,5 +194,7 @@ func (ps *profileStore) eventHandler() { } func (ps *profileStore) Shutdown() { - ps.queue.Shutdown() + if ps.queue != nil { + ps.queue.Shutdown() + } } diff --git a/storage/profile_store_test.go b/storage/profile_store_test.go index 8dc17f8..3466b8b 100644 --- a/storage/profile_store_test.go +++ b/storage/profile_store_test.go @@ -20,7 +20,7 @@ func TestProfileStoreWriteRead(t *testing.T) { eventBus := new(event.Manager) eventBus.Initialize() profile := NewProfile(testProfileName) - ps1 := NewProfileStore(eventBus, testingDir, password, profile) + ps1 := NewProfileWriterStore(eventBus, testingDir, password, profile) eventBus.Publish(event.NewEvent(event.SetAttribute, map[event.Field]string{event.Key: testKey, event.Data: testVal})) time.Sleep(1 * time.Second) @@ -50,7 +50,7 @@ func TestProfileStoreWriteRead(t *testing.T) { ps1.Shutdown() - ps2 := NewProfileStore(eventBus, testingDir, password, nil) + ps2 := NewProfileWriterStore(eventBus, testingDir, password, nil) err = ps2.Load() if err != nil { t.Errorf("Error createing profileStore: %v\n", err) diff --git a/testing/cwtch_peer_server_integration_test.go b/testing/cwtch_peer_server_integration_test.go index b49b7a5..bdcbe44 100644 --- a/testing/cwtch_peer_server_integration_test.go +++ b/testing/cwtch_peer_server_integration_test.go @@ -1,7 +1,7 @@ package testing import ( - "cwtch.im/cwtch/event" + app2 "cwtch.im/cwtch/app" "cwtch.im/cwtch/model" "cwtch.im/cwtch/peer" "cwtch.im/cwtch/protocol/connections" @@ -55,16 +55,17 @@ func serverCheck(t *testing.T, serverAddr string) bool { return true } -func waitForPeerServerConnection(t *testing.T, peer peer.CwtchPeer, server string) { +func waitForPeerGroupConnection(t *testing.T, peer peer.CwtchPeer, groupID string) { for { - servers := peer.GetServers() - state, ok := servers[server] + _, ok := peer.GetProfile().Groups[groupID] if ok { + state := peer.GetGroupState(groupID) + //log.Infof("Waiting for Peer %v to join group %v - state: %v\n", peer.GetProfile().Name, groupID, state) if state == connections.FAILED { - t.Fatalf("%v could not connect to %v", peer.GetProfile().Onion, server) + t.Fatalf("%v could not connect to %v", peer.GetProfile().Onion, groupID) } if state != connections.AUTHENTICATED { - fmt.Printf("peer %v waiting connect to server %v, currently: %v\n", peer.GetProfile().Onion, server, connections.ConnectionStateName[state]) + fmt.Printf("peer %v waiting connect to group %v, currently: %v\n", peer.GetProfile().Onion, groupID, connections.ConnectionStateName[state]) time.Sleep(time.Second * 5) continue } else { @@ -78,9 +79,11 @@ func waitForPeerServerConnection(t *testing.T, peer peer.CwtchPeer, server strin func waitForPeerPeerConnection(t *testing.T, peera peer.CwtchPeer, peerb peer.CwtchPeer) { for { - peers := peera.GetPeers() - state, ok := peers[peerb.GetProfile().Onion] + //peers := peera.GetPeers() + _, ok := peera.GetProfile().Contacts[peerb.GetProfile().Onion] if ok { + state := peera.GetPeerState(peerb.GetProfile().Onion) + //log.Infof("Waiting for Peer %v to peer with peer: %v - state: %v\n", peera.GetProfile().Name, peerb.GetProfile().Name, state) if state == connections.FAILED { t.Fatalf("%v could not connect to %v", peera.GetProfile().Onion, peerb.GetProfile().Onion) } @@ -132,35 +135,33 @@ func TestCwtchPeerIntegration(t *testing.T) { numGoRoutinesPostServer := runtime.NumGoroutine() + app := app2.NewApp(acn, "./storage") + // ***** cwtchPeer setup ***** // It's important that each Peer have their own EventBus - aliceEventBus := new(event.Manager) + /*aliceEventBus := new(event.Manager) aliceEventBus.Initialize() bobEventBus := new(event.Manager) bobEventBus.Initialize() carolEventBus := new(event.Manager) - carolEventBus.Initialize() + carolEventBus.Initialize()*/ fmt.Println("Creating Alice...") - alice := peer.NewCwtchPeer("Alice") - alice.Init(acn, aliceEventBus) - alice.Listen() + alice, _ := app.CreatePeer("alice", "asdfasdf") fmt.Println("Alice created:", alice.GetProfile().Onion) fmt.Println("Creating Bob...") - bob := peer.NewCwtchPeer("Bob") - bob.Init(acn, bobEventBus) - bob.Listen() + bob, _ := app.CreatePeer("bob", "asdfasdf") fmt.Println("Bob created:", bob.GetProfile().Onion) fmt.Println("Creating Carol...") - carol := peer.NewCwtchPeer("Carol") - carol.Init(acn, carolEventBus) - carol.Listen() + carol, _ := app.CreatePeer("Carol", "asdfasdf") fmt.Println("Carol created:", carol.GetProfile().Onion) - fmt.Println("Waiting for Alice, Bob, and Carol to connection with onion network...") + app.LaunchPeers() + + fmt.Println("Waiting for Alice, Bob, and Carol to connect with onion network...") time.Sleep(time.Second * 90) numGoRoutinesPostPeerStart := runtime.NumGoroutine() @@ -175,8 +176,10 @@ func TestCwtchPeerIntegration(t *testing.T) { } fmt.Println("Alice peering with Bob...") + alice.AddContact("Bob", bob.GetProfile().Onion, false) // Add contact so we can track connection state alice.PeerWithOnion(bob.GetProfile().Onion) fmt.Println("Alice peering with Carol...") + alice.AddContact("Carol", carol.GetProfile().Onion, false) alice.PeerWithOnion(carol.GetProfile().Onion) fmt.Println("Alice joining server...") @@ -185,10 +188,10 @@ func TestCwtchPeerIntegration(t *testing.T) { bob.JoinServer(serverAddr) fmt.Println("Waiting for alice to join server...") - waitForPeerServerConnection(t, alice, serverAddr) + waitForPeerGroupConnection(t, alice, groupID) - fmt.Println("Waiting for bob to join server...") - waitForPeerServerConnection(t, bob, serverAddr) + //fmt.Println("Waiting for bob to join server...") + //waitForPeerGroupConnection(t, bob, groupID) fmt.Println("Waiting for alice and Bob to peer...") waitForPeerPeerConnection(t, alice, bob) @@ -217,35 +220,9 @@ func TestCwtchPeerIntegration(t *testing.T) { numGoRoutinesPostServerConnect := runtime.NumGoroutine() - // ***** Fill up message history of server ****** - - /* - // filler group will be used to fill up the servers message history a bit to stress test fetch later for carol - fillerGroupId, _, err := alice.Profile.StartGroup(serverAddr) - if err != nil { - t.Errorf("Failed to init filler group: %v", err) - return - } - - fmt.Println("Alice filling message history of server...") - for i := 0; i < 100; i++ { - - go func (x int) { - time.Sleep(time.Second * time.Duration(x)) - err := alice.SendMessageToGroup(fillerGroupId, aliceLines[0]) - if err != nil { - fmt.Println("SEND", x, "ERROR:", err) - } else { - fmt.Println("SEND", x, " SUCCESS!") - } - }(i) - } - - time.Sleep(time.Second * 110) - */ // Wait for them to join the server - waitForPeerServerConnection(t, alice, serverAddr) - waitForPeerServerConnection(t, bob, serverAddr) + waitForPeerGroupConnection(t, alice, groupID) + waitForPeerGroupConnection(t, bob, groupID) //numGouRoutinesPostServerConnect := runtime.NumGoroutine() // ***** Conversation ***** @@ -291,14 +268,13 @@ func TestCwtchPeerIntegration(t *testing.T) { } fmt.Println("Shutting down Alice...") - alice.Shutdown() - aliceEventBus.Shutdown() + app.ShutdownPeer(alice.GetProfile().Onion) time.Sleep(time.Second * 5) numGoRoutinesPostAlice := runtime.NumGoroutine() fmt.Println("Carol joining server...") carol.JoinServer(serverAddr) - waitForPeerServerConnection(t, carol, serverAddr) + waitForPeerGroupConnection(t, carol, groupID) numGoRotinesPostCarolConnect := runtime.NumGoroutine() fmt.Println("Bob> ", bobLines[2]) @@ -380,8 +356,7 @@ func TestCwtchPeerIntegration(t *testing.T) { } fmt.Println("Shutting down Bob...") - bob.Shutdown() - bobEventBus.Shutdown() + app.ShutdownPeer(bob.GetProfile().Onion) time.Sleep(time.Second * 3) numGoRoutinesPostBob := runtime.NumGoroutine() if server != nil { @@ -392,8 +367,7 @@ func TestCwtchPeerIntegration(t *testing.T) { numGoRoutinesPostServerShutdown := runtime.NumGoroutine() fmt.Println("Shutting down Carol...") - carol.Shutdown() - carolEventBus.Shutdown() + app.ShutdownPeer(carol.GetProfile().Onion) time.Sleep(time.Second * 3) numGoRoutinesPostCarol := runtime.NumGoroutine()