Merge branch 'fetchEvent' of dan/cwtch into master

This commit is contained in:
Sarah Jamie Lewis 2019-04-24 13:32:15 -07:00 committed by Gogs
commit 475073f9f6
8 changed files with 40 additions and 6 deletions

View File

@ -11,6 +11,8 @@ const (
PeerRequest = Type("PeerRequest") PeerRequest = Type("PeerRequest")
BlockPeer = Type("BlockPeer") BlockPeer = Type("BlockPeer")
JoinServer = Type("JoinServer") JoinServer = Type("JoinServer")
// attributes: GroupServer
FinishedFetch = Type("FinishedFetch")
ProtocolEngineStartListen = Type("ProtocolEngineStartListen") ProtocolEngineStartListen = Type("ProtocolEngineStartListen")
ProtocolEngineStopped = Type("ProtocolEngineStopped") ProtocolEngineStopped = Type("ProtocolEngineStopped")

View File

@ -283,8 +283,14 @@ func (cp *cwtchPeer) BlockPeer(peer string) error {
// AcceptInvite accepts a given existing group invite // AcceptInvite accepts a given existing group invite
func (cp *cwtchPeer) AcceptInvite(groupID string) error { func (cp *cwtchPeer) AcceptInvite(groupID string) error {
err := cp.Profile.AcceptInvite(groupID)
if err != nil {
return err
}
cp.eventBus.Publish(event.NewEvent(event.AcceptGroupInvite, map[event.Field]string{event.GroupID: groupID})) cp.eventBus.Publish(event.NewEvent(event.AcceptGroupInvite, map[event.Field]string{event.GroupID: groupID}))
return cp.Profile.AcceptInvite(groupID) cp.JoinServer(cp.Profile.Groups[groupID].GroupServer)
return nil
} }
// RejectInvite rejects a given group invite. // RejectInvite rejects a given group invite.

View File

@ -42,13 +42,14 @@ func (m *Manager) ManagePeerConnection(host string, engine *Engine) *PeerPeerCon
} }
// ManageServerConnection creates a new ServerConnection for Host with the given callback handler. // ManageServerConnection creates a new ServerConnection for Host with the given callback handler.
func (m *Manager) ManageServerConnection(host string, handler func(string, *protocol.GroupMessage)) { func (m *Manager) ManageServerConnection(host string, messageHandler func(string, *protocol.GroupMessage), closedHandler func(string)) {
m.lock.Lock() m.lock.Lock()
psc, exists := m.serverConnections[host] psc, exists := m.serverConnections[host]
newPsc := NewPeerServerConnection(m.acn, host) newPsc := NewPeerServerConnection(m.acn, host)
newPsc.GroupMessageHandler = handler newPsc.GroupMessageHandler = messageHandler
newPsc.CloseHandler = closedHandler
go newPsc.Run() go newPsc.Run()
m.serverConnections[host] = newPsc m.serverConnections[host] = newPsc

View File

@ -91,7 +91,7 @@ func (e *Engine) eventHandler() {
e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.RemotePeer: ev.Data[event.RemotePeer], event.Signature: ev.EventID, event.Error: err.Error()})) e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.RemotePeer: ev.Data[event.RemotePeer], event.Signature: ev.EventID, event.Error: err.Error()}))
} }
} else { } else {
e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.RemotePeer: ev.Data[event.RemotePeer], event.Signature: ev.EventID, event.Error: "peer is offline or the connection has yet to finalize"})) e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.RemotePeer: ev.Data[event.RemotePeer], event.Signature: ev.EventID, event.Error: "peer is offline or the connection has yet to finalize"}))
} }
case event.BlockPeer: case event.BlockPeer:
e.blocked.Store(ev.Data[event.RemotePeer], true) e.blocked.Store(ev.Data[event.RemotePeer], true)
@ -210,9 +210,14 @@ func (e *Engine) ReceiveGroupMessage(server string, gm *protocol.GroupMessage) {
e.eventManager.Publish(event.NewEvent(event.EncryptedGroupMessage, map[event.Field]string{event.Ciphertext: string(gm.GetCiphertext()), event.Signature: string(gm.GetSignature())})) e.eventManager.Publish(event.NewEvent(event.EncryptedGroupMessage, map[event.Field]string{event.Ciphertext: string(gm.GetCiphertext()), event.Signature: string(gm.GetSignature())}))
} }
// FinishedFetch is a callback function the processes the termination of a fetch channel from a given server
func (e *Engine) FinishedFetch(server string) {
e.eventManager.Publish(event.NewEvent(event.FinishedFetch, map[event.Field]string{event.GroupServer: server}))
}
// JoinServer manages a new server connection with the given onion address // JoinServer manages a new server connection with the given onion address
func (e *Engine) JoinServer(onion string) { func (e *Engine) JoinServer(onion string) {
e.connectionsManager.ManageServerConnection(onion, e.ReceiveGroupMessage) e.connectionsManager.ManageServerConnection(onion, e.ReceiveGroupMessage, e.FinishedFetch)
} }
// SendMessageToGroup attempts to sent the given message to the given group id. // SendMessageToGroup attempts to sent the given message to the given group id.

View File

@ -19,6 +19,7 @@ type CwtchPeerFetchChannel struct {
// CwtchPeerFetchChannelHandler should be implemented by peers to receive new messages. // CwtchPeerFetchChannelHandler should be implemented by peers to receive new messages.
type CwtchPeerFetchChannelHandler interface { type CwtchPeerFetchChannelHandler interface {
HandleGroupMessage(*protocol.GroupMessage) HandleGroupMessage(*protocol.GroupMessage)
HandleFetchDone()
} }
// Type returns the type string for this channel, e.g. "im.ricochet.server.fetch) // Type returns the type string for this channel, e.g. "im.ricochet.server.fetch)
@ -28,7 +29,7 @@ func (cpfc *CwtchPeerFetchChannel) Type() string {
// Closed is called when the channel is closed for any reason. // Closed is called when the channel is closed for any reason.
func (cpfc *CwtchPeerFetchChannel) Closed(err error) { func (cpfc *CwtchPeerFetchChannel) Closed(err error) {
cpfc.Handler.HandleFetchDone()
} }
// OnlyClientCanOpen - for Cwtch server channels only client can open // OnlyClientCanOpen - for Cwtch server channels only client can open

View File

@ -11,12 +11,17 @@ import (
type TestHandler struct { type TestHandler struct {
Received bool Received bool
Closed bool
} }
func (th *TestHandler) HandleGroupMessage(m *protocol.GroupMessage) { func (th *TestHandler) HandleGroupMessage(m *protocol.GroupMessage) {
th.Received = true th.Received = true
} }
func (th *TestHandler) HandleFetchDone() {
th.Closed = true
}
func TestPeerFetchChannelAttributes(t *testing.T) { func TestPeerFetchChannelAttributes(t *testing.T) {
cssc := new(CwtchPeerFetchChannel) cssc := new(CwtchPeerFetchChannel)
if cssc.Type() != "im.cwtch.server.fetch" { if cssc.Type() != "im.cwtch.server.fetch" {

View File

@ -26,6 +26,7 @@ type PeerServerConnection struct {
acn connectivity.ACN acn connectivity.ACN
GroupMessageHandler func(string, *protocol.GroupMessage) GroupMessageHandler func(string, *protocol.GroupMessage)
CloseHandler func(string)
} }
// NewPeerServerConnection creates a new Peer->Server outbound connection // NewPeerServerConnection creates a new Peer->Server outbound connection
@ -137,3 +138,8 @@ func (psc *PeerServerConnection) Close() {
func (psc *PeerServerConnection) HandleGroupMessage(gm *protocol.GroupMessage) { func (psc *PeerServerConnection) HandleGroupMessage(gm *protocol.GroupMessage) {
psc.GroupMessageHandler(psc.Server, gm) psc.GroupMessageHandler(psc.Server, gm)
} }
// HandleFetchDone calls the supplied callback for when a fetch connection is closed
func (psc *PeerServerConnection) HandleFetchDone() {
psc.CloseHandler(psc.Server)
}

View File

@ -80,6 +80,10 @@ func TestPeerServerConnection(t *testing.T) {
psc.GroupMessageHandler = func(s string, gm *protocol.GroupMessage) { psc.GroupMessageHandler = func(s string, gm *protocol.GroupMessage) {
numcalls++ numcalls++
} }
closedCalls := 0
psc.CloseHandler = func(s string) {
closedCalls++
}
state := psc.GetState() state := psc.GetState()
if state != DISCONNECTED { if state != DISCONNECTED {
t.Errorf("new connections should start in disconnected state") t.Errorf("new connections should start in disconnected state")
@ -103,4 +107,8 @@ func TestPeerServerConnection(t *testing.T) {
t.Errorf("Should have received 2 calls from fetch request, instead received %v", numcalls) t.Errorf("Should have received 2 calls from fetch request, instead received %v", numcalls)
} }
if closedCalls != 1 {
t.Errorf("Should have closed connection 1 time, instead of %v", closedCalls)
}
} }