Browse Source

make cwtch responsible for firing joinServer after joining a group; fire a fetch done event

pull/242/head
Dan Ballard 11 months ago
parent
commit
2b2dcb9f6b
8 changed files with 40 additions and 6 deletions
  1. +2
    -0
      event/common.go
  2. +7
    -1
      peer/cwtch_peer.go
  3. +3
    -2
      protocol/connections/connectionsmanager.go
  4. +7
    -2
      protocol/connections/engine.go
  5. +2
    -1
      protocol/connections/fetch/peer_fetch_channel.go
  6. +5
    -0
      protocol/connections/fetch/peer_fetch_channel_test.go
  7. +6
    -0
      protocol/connections/peerserverconnection.go
  8. +8
    -0
      protocol/connections/peerserverconnection_test.go

+ 2
- 0
event/common.go View File

@@ -11,6 +11,8 @@ const (
PeerRequest = Type("PeerRequest")
BlockPeer = Type("BlockPeer")
JoinServer = Type("JoinServer")
// attributes: GroupServer
FinishedFetch = Type("FinishedFetch")

ProtocolEngineStartListen = Type("ProtocolEngineStartListen")
ProtocolEngineStopped = Type("ProtocolEngineStopped")


+ 7
- 1
peer/cwtch_peer.go View File

@@ -283,8 +283,14 @@ func (cp *cwtchPeer) BlockPeer(peer string) error {

// AcceptInvite accepts a given existing group invite
func (cp *cwtchPeer) AcceptInvite(groupID string) error {
err := cp.Profile.AcceptInvite(groupID)
if err != nil {
return err
}
cp.eventBus.Publish(event.NewEvent(event.AcceptGroupInvite, map[event.Field]string{event.GroupID: groupID}))
return cp.Profile.AcceptInvite(groupID)
cp.JoinServer(cp.Profile.Groups[groupID].GroupServer)

return nil
}

// RejectInvite rejects a given group invite.


+ 3
- 2
protocol/connections/connectionsmanager.go View File

@@ -42,13 +42,14 @@ func (m *Manager) ManagePeerConnection(host string, engine *Engine) *PeerPeerCon
}

// ManageServerConnection creates a new ServerConnection for Host with the given callback handler.
func (m *Manager) ManageServerConnection(host string, handler func(string, *protocol.GroupMessage)) {
func (m *Manager) ManageServerConnection(host string, messageHandler func(string, *protocol.GroupMessage), closedHandler func(string)) {
m.lock.Lock()

psc, exists := m.serverConnections[host]

newPsc := NewPeerServerConnection(m.acn, host)
newPsc.GroupMessageHandler = handler
newPsc.GroupMessageHandler = messageHandler
newPsc.CloseHandler = closedHandler
go newPsc.Run()
m.serverConnections[host] = newPsc



+ 7
- 2
protocol/connections/engine.go View File

@@ -91,7 +91,7 @@ func (e *Engine) eventHandler() {
e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.RemotePeer: ev.Data[event.RemotePeer], event.Signature: ev.EventID, event.Error: err.Error()}))
}
} else {
e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.RemotePeer: ev.Data[event.RemotePeer], event.Signature: ev.EventID, event.Error: "peer is offline or the connection has yet to finalize"}))
e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.RemotePeer: ev.Data[event.RemotePeer], event.Signature: ev.EventID, event.Error: "peer is offline or the connection has yet to finalize"}))
}
case event.BlockPeer:
e.blocked.Store(ev.Data[event.RemotePeer], true)
@@ -210,9 +210,14 @@ func (e *Engine) ReceiveGroupMessage(server string, gm *protocol.GroupMessage) {
e.eventManager.Publish(event.NewEvent(event.EncryptedGroupMessage, map[event.Field]string{event.Ciphertext: string(gm.GetCiphertext()), event.Signature: string(gm.GetSignature())}))
}

// FinishedFetch is a callback function the processes the termination of a fetch channel from a given server
func (e *Engine) FinishedFetch(server string) {
e.eventManager.Publish(event.NewEvent(event.FinishedFetch, map[event.Field]string{event.GroupServer: server}))
}

// JoinServer manages a new server connection with the given onion address
func (e *Engine) JoinServer(onion string) {
e.connectionsManager.ManageServerConnection(onion, e.ReceiveGroupMessage)
e.connectionsManager.ManageServerConnection(onion, e.ReceiveGroupMessage, e.FinishedFetch)
}

// SendMessageToGroup attempts to sent the given message to the given group id.


+ 2
- 1
protocol/connections/fetch/peer_fetch_channel.go View File

@@ -19,6 +19,7 @@ type CwtchPeerFetchChannel struct {
// CwtchPeerFetchChannelHandler should be implemented by peers to receive new messages.
type CwtchPeerFetchChannelHandler interface {
HandleGroupMessage(*protocol.GroupMessage)
HandleFetchDone()
}

// Type returns the type string for this channel, e.g. "im.ricochet.server.fetch)
@@ -28,7 +29,7 @@ func (cpfc *CwtchPeerFetchChannel) Type() string {

// Closed is called when the channel is closed for any reason.
func (cpfc *CwtchPeerFetchChannel) Closed(err error) {
cpfc.Handler.HandleFetchDone()
}

// OnlyClientCanOpen - for Cwtch server channels only client can open


+ 5
- 0
protocol/connections/fetch/peer_fetch_channel_test.go View File

@@ -11,12 +11,17 @@ import (

type TestHandler struct {
Received bool
Closed bool
}

func (th *TestHandler) HandleGroupMessage(m *protocol.GroupMessage) {
th.Received = true
}

func (th *TestHandler) HandleFetchDone() {
th.Closed = true
}

func TestPeerFetchChannelAttributes(t *testing.T) {
cssc := new(CwtchPeerFetchChannel)
if cssc.Type() != "im.cwtch.server.fetch" {


+ 6
- 0
protocol/connections/peerserverconnection.go View File

@@ -26,6 +26,7 @@ type PeerServerConnection struct {
acn connectivity.ACN

GroupMessageHandler func(string, *protocol.GroupMessage)
CloseHandler func(string)
}

// NewPeerServerConnection creates a new Peer->Server outbound connection
@@ -137,3 +138,8 @@ func (psc *PeerServerConnection) Close() {
func (psc *PeerServerConnection) HandleGroupMessage(gm *protocol.GroupMessage) {
psc.GroupMessageHandler(psc.Server, gm)
}

// HandleFetchDone calls the supplied callback for when a fetch connection is closed
func (psc *PeerServerConnection) HandleFetchDone() {
psc.CloseHandler(psc.Server)
}

+ 8
- 0
protocol/connections/peerserverconnection_test.go View File

@@ -80,6 +80,10 @@ func TestPeerServerConnection(t *testing.T) {
psc.GroupMessageHandler = func(s string, gm *protocol.GroupMessage) {
numcalls++
}
closedCalls := 0
psc.CloseHandler = func(s string) {
closedCalls++
}
state := psc.GetState()
if state != DISCONNECTED {
t.Errorf("new connections should start in disconnected state")
@@ -103,4 +107,8 @@ func TestPeerServerConnection(t *testing.T) {
t.Errorf("Should have received 2 calls from fetch request, instead received %v", numcalls)
}

if closedCalls != 1 {
t.Errorf("Should have closed connection 1 time, instead of %v", closedCalls)
}

}

Loading…
Cancel
Save