diff --git a/app/app.go b/app/app.go index dd02147..85bc997 100644 --- a/app/app.go +++ b/app/app.go @@ -26,7 +26,8 @@ type applicationCore struct { } type appletPeers struct { - peers map[string]peer.CwtchPeer + peers map[string]peer.CwtchPeer + launched bool // bit hacky, place holder while we transition to full multi peer support and a better api } type appletACN struct { @@ -69,6 +70,7 @@ func newAppCore(appDirectory string) *applicationCore { func (ap *appletPeers) init() { ap.peers = make(map[string]peer.CwtchPeer) + ap.launched = false } func (a *appletACN) init(acn connectivity.ACN, publish func(int, string)) { @@ -127,7 +129,7 @@ func (app *application) CreatePeer(name string, password string) { profileStore := storage.NewProfileWriterStore(app.eventBuses[profile.Onion], path.Join(app.directory, "profiles", profile.LocalID), password, profile) app.storage[profile.Onion] = profileStore - pc := app.storage[profile.Onion].GetProfileCopy() + pc := app.storage[profile.Onion].GetProfileCopy(true) peer := peer.FromProfile(pc) peer.Init(app.eventBuses[profile.Onion]) @@ -143,7 +145,7 @@ func (app *application) CreatePeer(name string, password string) { } // LoadProfiles takes a password and attempts to load any profiles it can from storage with it and create Peers for them -func (ac *applicationCore) LoadProfiles(password string, loadProfileFn LoadProfileFn) error { +func (ac *applicationCore) LoadProfiles(password string, timeline bool, loadProfileFn LoadProfileFn) error { files, err := ioutil.ReadDir(path.Join(ac.directory, "profiles")) if err != nil { return fmt.Errorf("Error: cannot read profiles directory: %v", err) @@ -157,7 +159,7 @@ func (ac *applicationCore) LoadProfiles(password string, loadProfileFn LoadProfi continue } - profile := profileStore.GetProfileCopy() + profile := profileStore.GetProfileCopy(timeline) _, exists := ac.eventBuses[profile.Onion] if exists { @@ -179,7 +181,7 @@ func (ac *applicationCore) LoadProfiles(password string, loadProfileFn LoadProfi // LoadProfiles takes a password and attempts to load any profiles it can from storage with it and create Peers for them func (app *application) LoadProfiles(password string) { count := 0 - app.applicationCore.LoadProfiles(password, func(profile *model.Profile, profileStore storage.ProfileStore) { + app.applicationCore.LoadProfiles(password, true, func(profile *model.Profile, profileStore storage.ProfileStore) { peer := peer.FromProfile(profile) peer.Init(app.eventBuses[profile.Onion]) @@ -207,13 +209,16 @@ func (app *application) GetPrimaryBus() event.Manager { // LaunchPeers starts each peer Listening and connecting to peers and groups func (ap *appletPeers) LaunchPeers() { - for _, p := range ap.peers { - if !p.IsStarted() { - p.Listen() - p.StartPeersConnections() - p.StartGroupConnections() - } + log.Debugf("appletPeers LaunchPeers\n") + if ap.launched { + return } + for _, p := range ap.peers { + p.Listen() + p.StartPeersConnections() + p.StartGroupConnections() + } + ap.launched = true } // ListPeers returns a map of onions to their profile's Name diff --git a/app/appBridge.go b/app/appBridge.go index 6a68945..28235a5 100644 --- a/app/appBridge.go +++ b/app/appBridge.go @@ -3,6 +3,11 @@ package app import "cwtch.im/cwtch/event" import "git.openprivacy.ca/openprivacy/libricochet-go/log" +const ( + // DestApp should be used as a destination for IPC messages that are for the application itself an not a peer + DestApp = "app" +) + type applicationBridge struct { applicationCore @@ -16,6 +21,7 @@ func (ab *applicationBridge) listen() { ipcMessage, ok := ab.bridge.Read() log.Debugf("listen() got %v for %v\n", ipcMessage.Message.EventType, ipcMessage.Dest) if !ok { + log.Debugln("exiting appBridge.listen()") return } @@ -30,5 +36,4 @@ func (ab *applicationBridge) listen() { } func (ab *applicationBridge) Shutdown() { - ab.bridge.Shutdown() } diff --git a/app/appClient.go b/app/appClient.go index 9b0da8c..04ba9cc 100644 --- a/app/appClient.go +++ b/app/appClient.go @@ -24,6 +24,8 @@ func NewAppClient(appDirectory string, bridge event.IPCBridge) Application { go appClient.listen() + appClient.bridge.Write(&event.IPCMessage{Dest: DestApp, Message: event.NewEventList(event.ReloadClient)}) + log.Infoln("Created new App Client") return appClient } @@ -38,17 +40,20 @@ func (ac *applicationClient) handleEvent(ev *event.Event) { case event.NewPeer: localID := ev.Data[event.Identity] password := ev.Data[event.Password] - ac.newPeer(localID, password) + reload := ev.Data[event.Status] == "running" + ac.newPeer(localID, password, reload) case event.PeerError: ac.appBus.Publish(*ev) case event.AppError: ac.appBus.Publish(*ev) case event.ACNStatus: ac.appBus.Publish(*ev) + case event.ReloadDone: + ac.appBus.Publish(*ev) } } -func (ac *applicationClient) newPeer(localID, password string) { +func (ac *applicationClient) newPeer(localID, password string, reload bool) { profile, err := storage.ReadProfile(path.Join(ac.directory, "profiles", localID), password) if err != nil { log.Errorf("Could not read profile for NewPeer event: %v\n", err) @@ -71,7 +76,16 @@ func (ac *applicationClient) newPeer(localID, password string) { defer ac.mutex.Unlock() ac.peers[profile.Onion] = peer ac.eventBuses[profile.Onion] = eventBus - ac.appBus.Publish(event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.Onion})) + npEvent := event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.Onion}) + if reload { + npEvent.Data[event.Status] = "running" + } + ac.appBus.Publish(npEvent) + + if reload { + ac.bridge.Write(&event.IPCMessage{Dest: DestApp, Message: event.NewEventList(event.ReloadPeer, event.Identity, profile.Onion)}) + } + } // CreatePeer messages the service to create a new Peer with the given name diff --git a/app/appService.go b/app/appService.go index 13b1028..48414b0 100644 --- a/app/appService.go +++ b/app/appService.go @@ -12,11 +12,6 @@ import ( "strconv" ) -const ( - // DestApp should be used as a destination for IPC messages that are for the application itself an not a peer - DestApp = "app" -) - type applicationService struct { applicationBridge appletACN @@ -33,6 +28,7 @@ type ApplicationService interface { // NewAppService returns an ApplicationService that runs the backend of an app and communicates with a client by the supplied IPCBridge func NewAppService(acn connectivity.ACN, appDirectory string, bridge event.IPCBridge) ApplicationService { appService := &applicationService{storage: make(map[string]storage.ProfileStore), engines: make(map[string]connections.Engine), applicationBridge: applicationBridge{applicationCore: *newAppCore(appDirectory), bridge: bridge}} + fn := func(progress int, status string) { progStr := strconv.Itoa(progress) appService.bridge.Write(&event.IPCMessage{Dest: DestApp, Message: event.NewEventList(event.ACNStatus, event.Progreess, progStr, event.Status, status)}) @@ -56,6 +52,25 @@ func (as *applicationService) handleEvent(ev *event.Event) { case event.LoadProfiles: password := ev.Data[event.Password] as.loadProfiles(password) + case event.ReloadClient: + for _, storage := range as.storage { + message := event.IPCMessage{Dest: DestApp, Message: *storage.GetNewPeerMessage()} + as.bridge.Write(&message) + } + + message := event.IPCMessage{Dest: DestApp, Message: event.NewEventList(event.ReloadDone)} + as.bridge.Write(&message) + case event.ReloadPeer: + onion := ev.Data[event.Identity] + events := as.storage[onion].GetStatusMessages() + + for _, ev := range events { + message := event.IPCMessage{Dest: onion, Message: *ev} + as.bridge.Write(&message) + } + case event.ShutdownPeer: + onion := ev.Data[event.Identity] + as.ShutdownPeer(onion) } } @@ -86,8 +101,9 @@ func (as *applicationService) createPeer(name, password string) { func (as *applicationService) loadProfiles(password string) { count := 0 - as.applicationCore.LoadProfiles(password, func(profile *model.Profile, profileStore storage.ProfileStore) { + as.applicationCore.LoadProfiles(password, false, func(profile *model.Profile, profileStore storage.ProfileStore) { as.eventBuses[profile.Onion] = event.IPCEventManagerFrom(as.bridge, profile.Onion, as.eventBuses[profile.Onion]) + blockedPeers := profile.BlockedPeers() identity := identity.InitializeV3(profile.Name, &profile.Ed25519PrivateKey, &profile.Ed25519PublicKey) engine := connections.NewProtocolEngine(identity, profile.Ed25519PrivateKey, as.acn, as.eventBuses[profile.Onion], blockedPeers) @@ -105,11 +121,18 @@ func (as *applicationService) loadProfiles(password string) { } } +func (as *applicationService) ShutdownPeer(onion string) { + as.engines[onion].Shutdown() + delete(as.engines, onion) + as.storage[onion].Shutdown() + delete(as.storage, onion) + as.eventBuses[onion].Shutdown() + delete(as.eventBuses, onion) +} + // Shutdown shuts down the application Service and all peer related backend parts func (as *applicationService) Shutdown() { for id := range as.engines { - as.engines[id].Shutdown() - as.storage[id].Shutdown() - as.eventBuses[id].Shutdown() + as.ShutdownPeer(id) } } diff --git a/event/bridge/pipeBridge.go b/event/bridge/pipeBridge.go index 6db91e8..bd97c42 100644 --- a/event/bridge/pipeBridge.go +++ b/event/bridge/pipeBridge.go @@ -26,6 +26,9 @@ type pipeBridge struct { closedChan chan bool state connections.ConnectionState lock sync.Mutex + + // For logging / debugging purposes + name string } func newPipeBridge(inFilename, outFilename string) *pipeBridge { @@ -41,66 +44,50 @@ func newPipeBridge(inFilename, outFilename string) *pipeBridge { func NewPipeBridgeClient(inFilename, outFilename string) event.IPCBridge { log.Debugf("Making new PipeBridge Client...\n") pb := newPipeBridge(inFilename, outFilename) - go pb.clientConnectionManager() + pb.name = "client" + go pb.connectionManager() return pb } // NewPipeBridgeService returns a pipe backed IPCBridge for a service -func NewPipeBridgeService(inFilename, outFilename string) (event.IPCBridge, error) { +func NewPipeBridgeService(inFilename, outFilename string) event.IPCBridge { log.Debugf("Making new PipeBridge Service...\n") pb := newPipeBridge(inFilename, outFilename) + pb.name = "service" - go pb.serviceConnectionManager() + go pb.connectionManager() log.Debugf("Successfully created new PipeBridge Service!\n") - return pb, nil + return pb } -func (pb *pipeBridge) clientConnectionManager() { +func (pb *pipeBridge) connectionManager() { for pb.state != connections.KILLED { + log.Debugf("clientConnManager loop start init\n") pb.state = connections.CONNECTING var err error - pb.in, err = os.OpenFile(pb.infile, os.O_RDONLY, 0600) + log.Debugf("%v open file infile\n", pb.name) + pb.in, err = os.OpenFile(pb.infile, os.O_RDWR, 0600) if err != nil { pb.state = connections.DISCONNECTED continue } - pb.out, err = os.OpenFile(pb.outfile, os.O_WRONLY, 0600) + log.Debugf("%v open file outfile\n", pb.name) + pb.out, err = os.OpenFile(pb.outfile, os.O_RDWR, 0600) if err != nil { pb.state = connections.DISCONNECTED continue } - log.Debugf("Successfully connected PipeBridge Client!\n") + log.Debugf("Successfully connected PipeBridge %v!\n", pb.name) pb.handleConns() } -} + log.Debugf("exiting %v ConnectionManager\n", pb.name) -func (pb *pipeBridge) serviceConnectionManager() { - for pb.state != connections.KILLED { - pb.state = connections.CONNECTING - - var err error - pb.out, err = os.OpenFile(pb.outfile, os.O_WRONLY, 0600) - if err != nil { - pb.state = connections.DISCONNECTED - continue - } - - pb.in, err = os.OpenFile(pb.infile, os.O_RDONLY, 0600) - if err != nil { - pb.state = connections.DISCONNECTED - continue - } - - log.Debugf("Successfully connected PipeBridge Service!\n") - - pb.handleConns() - } } func (pb *pipeBridge) handleConns() { @@ -110,30 +97,41 @@ func (pb *pipeBridge) handleConns() { pb.closedChan = make(chan bool, 5) - log.Debugf("handleConns authed, 2xgo\n") + log.Debugf("handleConns authed, %v 2xgo\n", pb.name) go pb.handleRead() go pb.handleWrite() <-pb.closedChan - log.Debugf("handleConns CLOSEDCHAN!!!!\n") + log.Debugf("handleConns <-closedChan (%v)\n", pb.name) if pb.state != connections.KILLED { pb.state = connections.FAILED } + pb.closeReset() + log.Debugf("handleConns done for %v, exit\n", pb.name) +} + +func (pb *pipeBridge) closeReset() { pb.in.Close() pb.out.Close() close(pb.write) close(pb.read) pb.read = make(chan event.IPCMessage, maxBufferSize) pb.write = make(chan event.IPCMessage, maxBufferSize) - log.Debugf("handleConns done, exit\n") } func (pb *pipeBridge) handleWrite() { + log.Debugf("handleWrite() %v\n", pb.name) + defer log.Debugf("exiting handleWrite() %v\n", pb.name) + for { select { case message := <-pb.write: - log.Debugf("handleWrite <- message: %v\n", message) + if message.Message.EventType == event.EncryptedGroupMessage || message.Message.EventType == event.SendMessageToGroup || message.Message.EventType == event.NewMessageFromGroup { + log.Debugf("handleWrite <- message: %v %v ...\n", message.Dest, message.Message.EventType) + } else { + log.Debugf("handleWrite <- message: %v\n", message) + } if pb.state == connections.AUTHENTICATED { encMessage := &event.IPCMessage{Dest: message.Dest, Message: event.Event{EventType: message.Message.EventType, EventID: message.Message.EventID, Data: make(map[event.Field]string)}} for k, v := range message.Message.Data { @@ -162,6 +160,9 @@ func (pb *pipeBridge) handleWrite() { } func (pb *pipeBridge) handleRead() { + log.Debugf("handleRead() %v\n", pb.name) + defer log.Debugf("exiting handleRead() %v", pb.name) + var n int size := make([]byte, 2) var err error @@ -198,26 +199,41 @@ func (pb *pipeBridge) handleRead() { val, _ := base64.StdEncoding.DecodeString(v) message.Message.Data[k] = string(val) } - log.Debugf("handleRead read<-: %v\n", message) + if message.Message.EventType == event.EncryptedGroupMessage || message.Message.EventType == event.SendMessageToGroup || message.Message.EventType == event.NewMessageFromGroup { + log.Debugf("handleRead read<-: %v %v ...\n", message.Dest, message.Message.EventType) + } else { + log.Debugf("handleRead read<-: %v\n", message) + } pb.read <- message log.Debugf("handleRead wrote\n") } } func (pb *pipeBridge) Read() (*event.IPCMessage, bool) { - log.Debugf("Read()...\n") + log.Debugf("Read() %v...\n", pb.name) + message := <-pb.read - log.Debugf("Read: %v\n", message) - return &message, true + if message.Message.EventType == event.EncryptedGroupMessage || message.Message.EventType == event.SendMessageToGroup || message.Message.EventType == event.NewMessageFromGroup { + log.Debugf("Read %v: %v %v ...\n", pb.name, message.Dest, message.Message.EventType) + } else { + log.Debugf("Read %v: %v\n", pb.name, message) + } + return &message, pb.state != connections.KILLED } func (pb *pipeBridge) Write(message *event.IPCMessage) { - log.Debugf("Write: %v\n", message) + if message.Message.EventType == event.EncryptedGroupMessage || message.Message.EventType == event.SendMessageToGroup || message.Message.EventType == event.NewMessageFromGroup { + log.Debugf("Write %v: %v %v ...\n", pb.name, message.Dest, message.Message.EventType) + } else { + log.Debugf("Write %v: %v\n", pb.name, message) + } pb.write <- *message log.Debugf("Wrote\n") } func (pb *pipeBridge) Shutdown() { + log.Debugf("pb.Shutdown() for %v currently in state: %v\n", pb.name, connections.ConnectionStateName[pb.state]) pb.state = connections.KILLED pb.closedChan <- true + log.Debugf("Done Shutdown for %v\n", pb.name) } diff --git a/event/bridge/pipeBridge_test.go b/event/bridge/pipeBridge_test.go index 6317196..da0c500 100644 --- a/event/bridge/pipeBridge_test.go +++ b/event/bridge/pipeBridge_test.go @@ -36,12 +36,7 @@ func clientHelper(t *testing.T, in, out string, messageOrig *event.IPCMessage, d } func serviceHelper(t *testing.T, in, out string, messageOrig *event.IPCMessage, done chan bool) { - service, err := NewPipeBridgeService(in, out) - if err != nil { - t.Errorf("Error opening %v pipe: %v", in, err) - done <- true - return - } + service := NewPipeBridgeService(in, out) service.Write(messageOrig) diff --git a/event/common.go b/event/common.go index 2c9f275..b1b5e0e 100644 --- a/event/common.go +++ b/event/common.go @@ -11,8 +11,6 @@ const ( PeerRequest = Type("PeerRequest") BlockPeer = Type("BlockPeer") JoinServer = Type("JoinServer") - // attributes: GroupServer - FinishedFetch = Type("FinishedFetch") ProtocolEngineStartListen = Type("ProtocolEngineStartListen") ProtocolEngineStopped = Type("ProtocolEngineStopped") @@ -91,20 +89,28 @@ const ( // GroupServer // ConnectionState - ServerStateChange = Type("GroupStateChange") + ServerStateChange = Type("ServerStateChange") /***** Application client / service messages *****/ // ProfileName, Password CreatePeer = Type("CreatePeer") - // service -> client: Identity(localId), Password + // service -> client: Identity(localId), Password, [Status(new/default=blank || from reload='running')] // app -> Identity(onion) NewPeer = Type("NewPeer") // Password LoadProfiles = Type("LoadProfiles") + // Client has reloaded, triggers NewPeer s then ReloadDone + ReloadClient = Type("ReloadClient") + + ReloadDone = Type("ReloadDone") + + // Identity - Ask service to resend all connection states + ReloadPeer = Type("ReloadPeer") + // Identity(onion) ShutdownPeer = Type("ShutdownPeer") diff --git a/model/group.go b/model/group.go index 6d7d7ac..9978582 100644 --- a/model/group.go +++ b/model/group.go @@ -21,7 +21,7 @@ type Group struct { SignedGroupID []byte GroupKey [32]byte GroupServer string - Timeline Timeline + Timeline Timeline `json:"-"` Accepted bool Owner string IsCompromised bool diff --git a/model/message.go b/model/message.go index f22d68b..e5b2f4a 100644 --- a/model/message.go +++ b/model/message.go @@ -1,6 +1,7 @@ package model import ( + "encoding/json" "sort" "sync" "time" @@ -48,6 +49,16 @@ func (t *Timeline) GetMessages() []Message { return messages } +// GetCopy returns a duplicate of the Timeline +func (t *Timeline) GetCopy() *Timeline { + t.lock.Lock() + defer t.lock.Unlock() + bytes, _ := json.Marshal(t) + newt := &Timeline{} + json.Unmarshal(bytes, newt) + return newt +} + // SetMessages sets the Messages of this timeline. Only to be used in loading/initialization func (t *Timeline) SetMessages(messages []Message) { t.lock.Lock() diff --git a/model/profile.go b/model/profile.go index b3109a0..02c5d0c 100644 --- a/model/profile.go +++ b/model/profile.go @@ -25,10 +25,10 @@ type PublicProfile struct { Blocked bool Onion string Attributes map[string]string - Timeline Timeline - LocalID string // used by storage engine - State string `json:"-"` - lock sync.Mutex + //Timeline Timeline `json:"-"` // TODO: cache recent messages for client + LocalID string // used by storage engine + State string `json:"-"` + lock sync.Mutex } // Profile encapsulates all the attributes necessary to be a Cwtch Peer. @@ -119,6 +119,7 @@ func (p *Profile) RejectInvite(groupID string) { p.lock.Unlock() } +/* // AddMessageToContactTimeline allows the saving of a message sent via a direct connection chat to the profile. func (p *Profile) AddMessageToContactTimeline(onion string, fromMe bool, message string, sent time.Time) { p.lock.Lock() @@ -136,6 +137,7 @@ func (p *Profile) AddMessageToContactTimeline(onion string, fromMe bool, message } } } +*/ // AcceptInvite accepts a group invite func (p *Profile) AcceptInvite(groupID string) (err error) { @@ -384,13 +386,20 @@ func (p *Profile) EncryptMessageToGroup(message string, groupID string) ([]byte, return nil, nil, errors.New("group does not exist") } -// GetCopy returns a full deep copy of the Profile struct and its members -func (p *Profile) GetCopy() *Profile { +// GetCopy returns a full deep copy of the Profile struct and its members (timeline inclusion control by arg) +func (p *Profile) GetCopy(timeline bool) *Profile { p.lock.Lock() defer p.lock.Unlock() newp := new(Profile) bytes, _ := json.Marshal(p) json.Unmarshal(bytes, &newp) + + if timeline { + for groupID := range newp.Groups { + newp.Groups[groupID].Timeline = *p.Groups[groupID].Timeline.GetCopy() + } + } + return newp } diff --git a/model/profile_test.go b/model/profile_test.go index c056655..7071b4c 100644 --- a/model/profile_test.go +++ b/model/profile_test.go @@ -4,31 +4,8 @@ import ( "cwtch.im/cwtch/protocol" "github.com/golang/protobuf/proto" "testing" - "time" ) -func TestP2P(t *testing.T) { - sarah := GenerateNewProfile("Sarah") - alice := GenerateNewProfile("Alice") - - sarah.AddContact(alice.Onion, &alice.PublicProfile) - - sarah.AddMessageToContactTimeline(alice.Onion, false, "hello", time.Now()) - sarah.AddMessageToContactTimeline(alice.Onion, true, "world", time.Now()) - - contact, _ := sarah.GetContact(alice.Onion) - for i, m := range contact.Timeline.GetMessages() { - if i == 0 && (m.Message != "hello" || m.PeerID != alice.Onion) { - t.Fatalf("Timeline is invalid: %v", m) - } - - if i == 1 && (m.Message != "world" || m.PeerID != sarah.Onion) { - t.Fatalf("Timeline is invalid: %v", m) - } - t.Logf("Message: %v", m) - } -} - func TestProfileIdentity(t *testing.T) { sarah := GenerateNewProfile("Sarah") alice := GenerateNewProfile("Alice") diff --git a/peer/cwtch_peer.go b/peer/cwtch_peer.go index d8c2c2b..8dcd33a 100644 --- a/peer/cwtch_peer.go +++ b/peer/cwtch_peer.go @@ -21,7 +21,6 @@ type cwtchPeer struct { Profile *model.Profile mutex sync.Mutex shutdown bool - started bool queue *event.Queue eventBus event.Manager @@ -59,7 +58,6 @@ type CwtchPeer interface { GetContacts() []string GetContact(string) *model.PublicProfile - IsStarted() bool Listen() StartPeersConnections() StartGroupConnections() @@ -287,6 +285,7 @@ func (cp *cwtchPeer) RejectInvite(groupID string) { // Listen makes the peer open a listening port to accept incoming connections (and be detactably online) func (cp *cwtchPeer) Listen() { + log.Debugf("cwtchPeer Listen sending ProtocolEngineStartListen\n") cp.eventBus.Publish(event.NewEvent(event.ProtocolEngineStartListen, map[event.Field]string{})) } @@ -317,11 +316,6 @@ func (cp *cwtchPeer) Shutdown() { cp.queue.Shutdown() } -// IsStarted returns true if Listen() has successfully been run before on this connection (ever). TODO: we will need to properly unset this flag on error if we want to support resumption in the future -func (cp *cwtchPeer) IsStarted() bool { - return cp.started -} - // eventHandler process events from other subsystems func (cp *cwtchPeer) eventHandler() { for { diff --git a/protocol/connections/connectionsmanager.go b/protocol/connections/connectionsmanager.go index 4856982..9300836 100644 --- a/protocol/connections/connectionsmanager.go +++ b/protocol/connections/connectionsmanager.go @@ -46,7 +46,7 @@ func (m *Manager) ManagePeerConnection(host string, engine Engine) *PeerPeerConn // ManageServerConnection creates a new ServerConnection for Host with the given callback handler. // If there is an establish connection, it is replaced with a new one, assuming this came from // a new JoinServer from a new Group being joined. If it is still connecting to a server, the second request will be abandonded -func (m *Manager) ManageServerConnection(host string, engine Engine, messageHandler func(string, *protocol.GroupMessage), closedHandler func(string)) { +func (m *Manager) ManageServerConnection(host string, engine Engine, messageHandler func(string, *protocol.GroupMessage)) { m.lock.Lock() defer m.lock.Unlock() @@ -61,7 +61,6 @@ func (m *Manager) ManageServerConnection(host string, engine Engine, messageHand newPsc := NewPeerServerConnection(engine, host) newPsc.GroupMessageHandler = messageHandler - newPsc.CloseHandler = closedHandler go newPsc.Run() m.serverConnections[host] = newPsc @@ -71,6 +70,11 @@ func (m *Manager) ManageServerConnection(host string, engine Engine, messageHand } } +// SetServerSynced is a helper for peerserver connections and engine to call when a Fetch is done to set the state of the connection to SYNCED +func (m *Manager) SetServerSynced(onion string) { + m.serverConnections[onion].setState(SYNCED) +} + // GetPeerPeerConnectionForOnion safely returns a given peer connection func (m *Manager) GetPeerPeerConnectionForOnion(host string) (ppc *PeerPeerConnection) { m.lock.Lock() diff --git a/protocol/connections/engine.go b/protocol/connections/engine.go index 2cb9959..c568d05 100644 --- a/protocol/connections/engine.go +++ b/protocol/connections/engine.go @@ -239,15 +239,9 @@ func (e *engine) receiveGroupMessage(server string, gm *protocol.GroupMessage) { e.eventManager.Publish(event.NewEvent(event.EncryptedGroupMessage, map[event.Field]string{event.Ciphertext: string(gm.GetCiphertext()), event.Signature: string(gm.GetSignature())})) } -// finishedFetch is a callback function the processes the termination of a fetch channel from a given server -func (e *engine) finishedFetch(server string) { - log.Debugf("Finished Fetch for %v\n", server) - e.eventManager.Publish(event.NewEvent(event.FinishedFetch, map[event.Field]string{event.GroupServer: server})) -} - // joinServer manages a new server connection with the given onion address func (e *engine) joinServer(onion string) { - e.connectionsManager.ManageServerConnection(onion, e, e.receiveGroupMessage, e.finishedFetch) + e.connectionsManager.ManageServerConnection(onion, e, e.receiveGroupMessage) } // sendMessageToGroup attempts to sent the given message to the given group id. diff --git a/protocol/connections/peerserverconnection.go b/protocol/connections/peerserverconnection.go index e6bad78..86b527c 100644 --- a/protocol/connections/peerserverconnection.go +++ b/protocol/connections/peerserverconnection.go @@ -26,7 +26,6 @@ type PeerServerConnection struct { protocolEngine Engine GroupMessageHandler func(string, *protocol.GroupMessage) - CloseHandler func(string) } // NewPeerServerConnection creates a new Peer->Server outbound connection @@ -53,10 +52,10 @@ func (psc *PeerServerConnection) setState(state ConnectionState) { })) } -// WaitTilAuthenticated waits until the underlying connection is authenticated -func (psc *PeerServerConnection) WaitTilAuthenticated() { +// WaitTilSynced waits until the underlying connection is authenticated +func (psc *PeerServerConnection) WaitTilSynced() { for { - if psc.GetState() == AUTHENTICATED { + if psc.GetState() == SYNCED { break } time.Sleep(time.Second * 1) @@ -110,8 +109,8 @@ func (psc *PeerServerConnection) Break() error { // SendGroupMessage sends the given protocol message to the Server. func (psc *PeerServerConnection) SendGroupMessage(gm *protocol.GroupMessage) error { - if psc.state != AUTHENTICATED { - return errors.New("peer is not yet connected & authenticated to server cannot send message") + if psc.state != SYNCED { + return errors.New("peer is not yet connected & authenticated & synced to server cannot send message") } err := psc.connection.Do(func() error { @@ -159,5 +158,5 @@ func (psc *PeerServerConnection) HandleGroupMessage(gm *protocol.GroupMessage) { // HandleFetchDone calls the supplied callback for when a fetch connection is closed func (psc *PeerServerConnection) HandleFetchDone() { - psc.CloseHandler(psc.Server) + psc.setState(SYNCED) } diff --git a/protocol/connections/peerserverconnection_test.go b/protocol/connections/peerserverconnection_test.go index cc8946b..c15cd20 100644 --- a/protocol/connections/peerserverconnection_test.go +++ b/protocol/connections/peerserverconnection_test.go @@ -27,7 +27,7 @@ type TestServer struct { } func (ts *TestServer) HandleGroupMessage(gm *protocol.GroupMessage) { - ts.Received<-true + ts.Received <- true } func (ts *TestServer) HandleFetchRequest() []*protocol.GroupMessage { @@ -68,7 +68,6 @@ func TestPeerServerConnection(t *testing.T) { pub, priv, _ := ed25519.GenerateKey(rand.Reader) identity := identity.InitializeV3("", &priv, &pub) - t.Logf("Launching Server....\n") ts := new(TestServer) ts.Init() @@ -86,17 +85,13 @@ func TestPeerServerConnection(t *testing.T) { psc.GroupMessageHandler = func(s string, gm *protocol.GroupMessage) { numcalls++ } - closedCalls := 0 - psc.CloseHandler = func(s string) { - closedCalls++ - } state := psc.GetState() if state != DISCONNECTED { t.Errorf("new connections should start in disconnected state") } time.Sleep(time.Second * 1) go psc.Run() - psc.WaitTilAuthenticated() + psc.WaitTilSynced() gm := &protocol.GroupMessage{Ciphertext: []byte("hello"), Signature: []byte{}} psc.SendGroupMessage(gm) @@ -107,9 +102,4 @@ func TestPeerServerConnection(t *testing.T) { if numcalls != 2 { t.Errorf("Should have received 2 calls from fetch request, instead received %v", numcalls) } - - if closedCalls != 1 { - t.Errorf("Should have closed connection 1 time, instead of %v", closedCalls) - } - } diff --git a/protocol/connections/state.go b/protocol/connections/state.go index f8a6d9c..7b5037f 100644 --- a/protocol/connections/state.go +++ b/protocol/connections/state.go @@ -8,20 +8,22 @@ type ConnectionState int // CONNECTING - We are in the process of attempting to connect to a given endpoint // CONNECTED - We have connected but not yet authenticated // AUTHENTICATED - im.ricochet.auth-hidden-server has succeeded on the connection. +// SYNCED - we have pulled all the messages for groups from the server and are ready to send const ( DISCONNECTED ConnectionState = iota CONNECTING CONNECTED AUTHENTICATED + SYNCED FAILED KILLED ) var ( // ConnectionStateName allows conversion of states to their string representations - ConnectionStateName = []string{"Disconnected", "Connecting", "Connected", "Authenticated", "Failed", "Killed"} + ConnectionStateName = []string{"Disconnected", "Connecting", "Connected", "Authenticated", "Synced", "Failed", "Killed"} // ConnectionStateType allows conversion of strings to their state type ConnectionStateType = map[string]ConnectionState{"Disconnected": DISCONNECTED, "Connecting": CONNECTING, - "Connected": CONNECTED, "Authenticated": AUTHENTICATED, "Failed": FAILED, "Killed": KILLED} + "Connected": CONNECTED, "Authenticated": AUTHENTICATED, "Synced": SYNCED, "Failed": FAILED, "Killed": KILLED} ) diff --git a/storage/profile_store.go b/storage/profile_store.go index 7d9fe05..78ec173 100644 --- a/storage/profile_store.go +++ b/storage/profile_store.go @@ -28,7 +28,9 @@ type profileStore struct { type ProfileStore interface { Load() error Shutdown() - GetProfileCopy() *model.Profile + GetProfileCopy(timeline bool) *model.Profile + GetNewPeerMessage() *event.Event + GetStatusMessages() []*event.Event } // NewProfileWriterStore returns a profile store backed by a filestore listening for events and saving them @@ -52,6 +54,8 @@ func NewProfileWriterStore(eventManager event.Manager, directory, password strin ps.eventManager.Subscribe(event.AcceptGroupInvite, ps.queue.EventChannel) ps.eventManager.Subscribe(event.NewGroupInvite, ps.queue.EventChannel) ps.eventManager.Subscribe(event.NewMessageFromGroup, ps.queue.EventChannel) + ps.eventManager.Subscribe(event.PeerStateChange, ps.queue.EventChannel) + ps.eventManager.Subscribe(event.ServerStateChange, ps.queue.EventChannel) return ps } @@ -67,7 +71,7 @@ func ReadProfile(directory, password string) (*model.Profile, error) { return nil, err } - profile := ps.GetProfileCopy() + profile := ps.GetProfileCopy(true) return profile, nil } @@ -78,6 +82,37 @@ func NewProfile(name string) *model.Profile { return profile } +// GetNewPeerMessage is for AppService to call on Reload events, to reseed the AppClient with the loaded peers +func (ps *profileStore) GetNewPeerMessage() *event.Event { + message := event.NewEventList(event.NewPeer, event.Identity, ps.profile.LocalID, event.Password, ps.password, event.Status, "running") + return &message +} + +func (ps *profileStore) GetStatusMessages() []*event.Event { + messages := []*event.Event{} + for _, contact := range ps.profile.Contacts { + message := event.NewEvent(event.PeerStateChange, map[event.Field]string{ + event.RemotePeer: string(contact.Onion), + event.ConnectionState: contact.State, + }) + messages = append(messages, &message) + } + + doneServers := make(map[string]bool) + for _, group := range ps.profile.Groups { + if _, exists := doneServers[group.GroupServer]; !exists { + message := event.NewEvent(event.ServerStateChange, map[event.Field]string{ + event.GroupServer: string(group.GroupServer), + event.ConnectionState: group.State, + }) + messages = append(messages, &message) + doneServers[group.GroupServer] = true + } + } + + return messages +} + func (ps *profileStore) save() error { if ps.writer { bytes, _ := json.Marshal(ps.profile) @@ -97,15 +132,9 @@ func (ps *profileStore) Load() error { if err == nil { ps.profile = cp - for _, profile := range cp.Contacts { - ss := NewStreamStore(ps.directory, profile.LocalID, ps.password) - profile.Timeline.SetMessages(ss.Read()) - ps.streamStores[profile.Onion] = ss - } - - for _, group := range cp.Groups { + for gid, group := range cp.Groups { ss := NewStreamStore(ps.directory, group.LocalID, ps.password) - group.Timeline.SetMessages(ss.Read()) + cp.Groups[gid].Timeline.SetMessages(ss.Read()) ps.streamStores[group.GroupID] = ss } } @@ -113,8 +142,8 @@ func (ps *profileStore) Load() error { return err } -func (ps *profileStore) GetProfileCopy() *model.Profile { - return ps.profile.GetCopy() +func (ps *profileStore) GetProfileCopy(timeline bool) *model.Profile { + return ps.profile.GetCopy(timeline) } func (ps *profileStore) eventHandler() { @@ -131,10 +160,11 @@ func (ps *profileStore) eventHandler() { var pp *model.PublicProfile json.Unmarshal([]byte(ev.Data[event.Data]), &pp) ps.profile.AddContact(ev.Data[event.RemotePeer], pp) - ss := NewStreamStore(ps.directory, pp.LocalID, ps.password) + // TODO: configure - allow peers to be configured to turn on limited storage + /*ss := NewStreamStore(ps.directory, pp.LocalID, ps.password) pp.Timeline.SetMessages(ss.Read()) ps.streamStores[pp.Onion] = ss - ps.save() + ps.save()*/ case event.GroupCreated: var group *model.Group json.Unmarshal([]byte(ev.Data[event.Data]), &group) @@ -193,6 +223,16 @@ func (ps *profileStore) eventHandler() { } else { log.Errorf("error storing new group message: %v stream store does not exist", ev) } + case event.PeerStateChange: + if _, exists := ps.profile.Contacts[ev.Data[event.RemotePeer]]; exists { + ps.profile.Contacts[ev.Data[event.RemotePeer]].State = ev.Data[event.ConnectionState] + } + case event.ServerStateChange: + for _, group := range ps.profile.Groups { + if group.GroupServer == ev.Data[event.GroupServer] { + group.State = ev.Data[event.ConnectionState] + } + } default: return } diff --git a/storage/profile_store_test.go b/storage/profile_store_test.go index dbb6479..9e19908 100644 --- a/storage/profile_store_test.go +++ b/storage/profile_store_test.go @@ -35,14 +35,14 @@ func TestProfileStoreWriteRead(t *testing.T) { packet := protocol.CwtchPeerPacket{} proto.Unmarshal(invite, &packet) invite, _ = proto.Marshal(packet.GetGroupChatInvite()) - eventBus.Publish(event.NewEvent(event.NewGroupInvite, map[event.Field]string{event.TimestampReceived: time.Now().Format(time.RFC3339Nano), event.RemotePeer: ps1.GetProfileCopy().Onion, event.GroupInvite: string(invite)})) + eventBus.Publish(event.NewEvent(event.NewGroupInvite, map[event.Field]string{event.TimestampReceived: time.Now().Format(time.RFC3339Nano), event.RemotePeer: ps1.GetProfileCopy(true).Onion, event.GroupInvite: string(invite)})) time.Sleep(1 * time.Second) eventBus.Publish(event.NewEvent(event.NewMessageFromGroup, map[event.Field]string{ event.GroupID: groupid, event.TimestampSent: time.Now().Format(time.RFC3339Nano), event.TimestampReceived: time.Now().Format(time.RFC3339Nano), - event.RemotePeer: ps1.GetProfileCopy().Onion, + event.RemotePeer: ps1.GetProfileCopy(true).Onion, event.Data: testMessage, })) time.Sleep(1 * time.Second) @@ -55,7 +55,7 @@ func TestProfileStoreWriteRead(t *testing.T) { t.Errorf("Error createing profileStore: %v\n", err) } - profile = ps2.GetProfileCopy() + profile = ps2.GetProfileCopy(true) if profile.Name != testProfileName { t.Errorf("Profile name from loaded profile incorrect. Expected: '%v' Actual: '%v'\n", testProfileName, profile.Name) } @@ -65,7 +65,7 @@ func TestProfileStoreWriteRead(t *testing.T) { t.Errorf("Profile attribute '%v' inccorect. Expected: '%v' Actual: '%v'\n", testKey, testVal, v) } - group2 := ps2.GetProfileCopy().Groups[groupid] + group2 := ps2.GetProfileCopy(true).Groups[groupid] if group2 == nil { t.Errorf("Group not loaded\n") } diff --git a/storage/stream_store.go b/storage/stream_store.go index 0e38adc..15a98fa 100644 --- a/storage/stream_store.go +++ b/storage/stream_store.go @@ -23,6 +23,7 @@ type streamStore struct { storeDirectory string filenameBase string + // Buffer is used just for current file to write to messages []model.Message bufferByteCount int @@ -100,7 +101,7 @@ func (ss *streamStore) rotateFileStore() { } } -// FetchMessages returns all messages from the backing file. +// Read returns all messages from the backing file (not the buffer, which is jsut for writing to the current file) func (ss *streamStore) Read() (messages []model.Message) { ss.lock.Lock() defer ss.lock.Unlock() @@ -116,12 +117,8 @@ func (ss *streamStore) Read() (messages []model.Message) { } msgs := []model.Message{} - err = json.Unmarshal([]byte(bytes), &msgs) - if err == nil { - resp = append(resp, msgs...) - } else { - log.Debugf("Failed to unmarshal messages: %v", err) - } + json.Unmarshal([]byte(bytes), &msgs) + resp = append(resp, msgs...) } return resp diff --git a/testing/cwtch_peer_server_integration_test.go b/testing/cwtch_peer_server_integration_test.go index fcdc246..117d59e 100644 --- a/testing/cwtch_peer_server_integration_test.go +++ b/testing/cwtch_peer_server_integration_test.go @@ -66,7 +66,7 @@ func waitForPeerGroupConnection(t *testing.T, peer peer.CwtchPeer, groupID strin if state == connections.FAILED { t.Fatalf("%v could not connect to %v", peer.GetProfile().Onion, groupID) } - if state != connections.AUTHENTICATED { + if state != connections.SYNCED { fmt.Printf("peer %v waiting connect to group %v, currently: %v\n", peer.GetProfile().Onion, groupID, connections.ConnectionStateName[state]) time.Sleep(time.Second * 5) continue @@ -110,6 +110,7 @@ func TestCwtchPeerIntegration(t *testing.T) { log.ExcludeFromPattern("connection/connection") log.ExcludeFromPattern("outbound/3dhauthchannel") log.ExcludeFromPattern("event/eventmanager") + log.ExcludeFromPattern("pipeBridge") acn, err := connectivity.StartTor(".", "") if err != nil { t.Fatalf("Could not start Tor: %v", err) @@ -142,9 +143,10 @@ func TestCwtchPeerIntegration(t *testing.T) { app := app2.NewApp(acn, "./storage") - bridge1, bridge2 := bridge.MakeGoChanBridge() - appClient := app2.NewAppClient("./storage", bridge1) - appService := app2.NewAppService(acn, "./storage", bridge2) + bridgeClient := bridge.NewPipeBridgeClient("./clientPipe", "./servicePipe") + bridgeService := bridge.NewPipeBridgeService("./servicePipe", "./clientPipe") + appClient := app2.NewAppClient("./storage", bridgeClient) + appService := app2.NewAppService(acn, "./storage", bridgeService) numGoRoutinesPostAppStart := runtime.NumGoroutine() @@ -168,8 +170,6 @@ func TestCwtchPeerIntegration(t *testing.T) { carol := utils.WaitGetPeer(appClient, "carol") fmt.Println("Carol created:", carol.GetProfile().Onion) - //fmt.Println("Carol created:", carol.GetProfile().Onion) - app.LaunchPeers() appClient.LaunchPeers() @@ -384,12 +384,22 @@ func TestCwtchPeerIntegration(t *testing.T) { numGoRoutinesPostCarol := runtime.NumGoroutine() fmt.Println("Shutting down apps...") + fmt.Printf("app Shutdown: %v\n", runtime.NumGoroutine()) app.Shutdown() + fmt.Printf("appClientShutdown: %v\n", runtime.NumGoroutine()) appClient.Shutdown() + fmt.Printf("appServiceShutdown: %v\n", runtime.NumGoroutine()) appService.Shutdown() - bridge1.Shutdown() - bridge2.Shutdown() + fmt.Printf("bridgeClientShutdown: %v\n", runtime.NumGoroutine()) + bridgeClient.Shutdown() + time.Sleep(2 * time.Second) + + fmt.Printf("brideServiceShutdown: %v\n", runtime.NumGoroutine()) + bridgeService.Shutdown() + time.Sleep(2 * time.Second) + + fmt.Printf("Done shutdown: %v\n", runtime.NumGoroutine()) numGoRoutinesPostAppShutdown := runtime.NumGoroutine() fmt.Println("Shutting down ACN...")