diff --git a/event/common.go b/event/common.go index f658788..f8d68e9 100644 --- a/event/common.go +++ b/event/common.go @@ -11,6 +11,8 @@ const ( PeerRequest = Type("PeerRequest") BlockPeer = Type("BlockPeer") JoinServer = Type("JoinServer") + // attributes: GroupServer + FinishedFetch = Type("FinishedFetch") ProtocolEngineStartListen = Type("ProtocolEngineStartListen") ProtocolEngineStopped = Type("ProtocolEngineStopped") diff --git a/peer/cwtch_peer.go b/peer/cwtch_peer.go index f04e442..44e5b19 100644 --- a/peer/cwtch_peer.go +++ b/peer/cwtch_peer.go @@ -283,8 +283,14 @@ func (cp *cwtchPeer) BlockPeer(peer string) error { // AcceptInvite accepts a given existing group invite func (cp *cwtchPeer) AcceptInvite(groupID string) error { + err := cp.Profile.AcceptInvite(groupID) + if err != nil { + return err + } cp.eventBus.Publish(event.NewEvent(event.AcceptGroupInvite, map[event.Field]string{event.GroupID: groupID})) - return cp.Profile.AcceptInvite(groupID) + cp.JoinServer(cp.Profile.Groups[groupID].GroupServer) + + return nil } // RejectInvite rejects a given group invite. diff --git a/protocol/connections/connectionsmanager.go b/protocol/connections/connectionsmanager.go index 8c8b9e2..100e2d7 100644 --- a/protocol/connections/connectionsmanager.go +++ b/protocol/connections/connectionsmanager.go @@ -42,13 +42,14 @@ func (m *Manager) ManagePeerConnection(host string, engine *Engine) *PeerPeerCon } // ManageServerConnection creates a new ServerConnection for Host with the given callback handler. -func (m *Manager) ManageServerConnection(host string, handler func(string, *protocol.GroupMessage)) { +func (m *Manager) ManageServerConnection(host string, messageHandler func(string, *protocol.GroupMessage), closedHandler func(string)) { m.lock.Lock() psc, exists := m.serverConnections[host] newPsc := NewPeerServerConnection(m.acn, host) - newPsc.GroupMessageHandler = handler + newPsc.GroupMessageHandler = messageHandler + newPsc.CloseHandler = closedHandler go newPsc.Run() m.serverConnections[host] = newPsc diff --git a/protocol/connections/engine.go b/protocol/connections/engine.go index a22faee..5aa89a7 100644 --- a/protocol/connections/engine.go +++ b/protocol/connections/engine.go @@ -91,7 +91,7 @@ func (e *Engine) eventHandler() { e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.RemotePeer: ev.Data[event.RemotePeer], event.Signature: ev.EventID, event.Error: err.Error()})) } } else { - e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.RemotePeer: ev.Data[event.RemotePeer], event.Signature: ev.EventID, event.Error: "peer is offline or the connection has yet to finalize"})) + e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.RemotePeer: ev.Data[event.RemotePeer], event.Signature: ev.EventID, event.Error: "peer is offline or the connection has yet to finalize"})) } case event.BlockPeer: e.blocked.Store(ev.Data[event.RemotePeer], true) @@ -210,9 +210,14 @@ func (e *Engine) ReceiveGroupMessage(server string, gm *protocol.GroupMessage) { e.eventManager.Publish(event.NewEvent(event.EncryptedGroupMessage, map[event.Field]string{event.Ciphertext: string(gm.GetCiphertext()), event.Signature: string(gm.GetSignature())})) } +// FinishedFetch is a callback function the processes the termination of a fetch channel from a given server +func (e *Engine) FinishedFetch(server string) { + e.eventManager.Publish(event.NewEvent(event.FinishedFetch, map[event.Field]string{event.GroupServer: server})) +} + // JoinServer manages a new server connection with the given onion address func (e *Engine) JoinServer(onion string) { - e.connectionsManager.ManageServerConnection(onion, e.ReceiveGroupMessage) + e.connectionsManager.ManageServerConnection(onion, e.ReceiveGroupMessage, e.FinishedFetch) } // SendMessageToGroup attempts to sent the given message to the given group id. diff --git a/protocol/connections/fetch/peer_fetch_channel.go b/protocol/connections/fetch/peer_fetch_channel.go index 50c9c92..62f3835 100644 --- a/protocol/connections/fetch/peer_fetch_channel.go +++ b/protocol/connections/fetch/peer_fetch_channel.go @@ -19,6 +19,7 @@ type CwtchPeerFetchChannel struct { // CwtchPeerFetchChannelHandler should be implemented by peers to receive new messages. type CwtchPeerFetchChannelHandler interface { HandleGroupMessage(*protocol.GroupMessage) + HandleFetchDone() } // Type returns the type string for this channel, e.g. "im.ricochet.server.fetch) @@ -28,7 +29,7 @@ func (cpfc *CwtchPeerFetchChannel) Type() string { // Closed is called when the channel is closed for any reason. func (cpfc *CwtchPeerFetchChannel) Closed(err error) { - + cpfc.Handler.HandleFetchDone() } // OnlyClientCanOpen - for Cwtch server channels only client can open diff --git a/protocol/connections/fetch/peer_fetch_channel_test.go b/protocol/connections/fetch/peer_fetch_channel_test.go index 7f63593..1d9652a 100644 --- a/protocol/connections/fetch/peer_fetch_channel_test.go +++ b/protocol/connections/fetch/peer_fetch_channel_test.go @@ -11,12 +11,17 @@ import ( type TestHandler struct { Received bool + Closed bool } func (th *TestHandler) HandleGroupMessage(m *protocol.GroupMessage) { th.Received = true } +func (th *TestHandler) HandleFetchDone() { + th.Closed = true +} + func TestPeerFetchChannelAttributes(t *testing.T) { cssc := new(CwtchPeerFetchChannel) if cssc.Type() != "im.cwtch.server.fetch" { diff --git a/protocol/connections/peerserverconnection.go b/protocol/connections/peerserverconnection.go index 2900ecb..3b3094b 100644 --- a/protocol/connections/peerserverconnection.go +++ b/protocol/connections/peerserverconnection.go @@ -26,6 +26,7 @@ type PeerServerConnection struct { acn connectivity.ACN GroupMessageHandler func(string, *protocol.GroupMessage) + CloseHandler func(string) } // NewPeerServerConnection creates a new Peer->Server outbound connection @@ -137,3 +138,8 @@ func (psc *PeerServerConnection) Close() { func (psc *PeerServerConnection) HandleGroupMessage(gm *protocol.GroupMessage) { psc.GroupMessageHandler(psc.Server, gm) } + +// HandleFetchDone calls the supplied callback for when a fetch connection is closed +func (psc *PeerServerConnection) HandleFetchDone() { + psc.CloseHandler(psc.Server) +} diff --git a/protocol/connections/peerserverconnection_test.go b/protocol/connections/peerserverconnection_test.go index 9777292..d86d444 100644 --- a/protocol/connections/peerserverconnection_test.go +++ b/protocol/connections/peerserverconnection_test.go @@ -80,6 +80,10 @@ func TestPeerServerConnection(t *testing.T) { psc.GroupMessageHandler = func(s string, gm *protocol.GroupMessage) { numcalls++ } + closedCalls := 0 + psc.CloseHandler = func(s string) { + closedCalls++ + } state := psc.GetState() if state != DISCONNECTED { t.Errorf("new connections should start in disconnected state") @@ -103,4 +107,8 @@ func TestPeerServerConnection(t *testing.T) { t.Errorf("Should have received 2 calls from fetch request, instead received %v", numcalls) } + if closedCalls != 1 { + t.Errorf("Should have closed connection 1 time, instead of %v", closedCalls) + } + }