diff --git a/client/connections/peerserverconnection.go b/client/connections/peerserverconnection.go index bcb2492..85f9fe9 100644 --- a/client/connections/peerserverconnection.go +++ b/client/connections/peerserverconnection.go @@ -1,13 +1,14 @@ package connections import ( - "github.com/s-rah/go-ricochet" - "github.com/s-rah/go-ricochet/connection" - "github.com/s-rah/go-ricochet/channels" - "github.com/s-rah/go-ricochet/identity" - "github.com/s-rah/go-ricochet/utils" + "git.mascherari.press/cwtch/client/fetch" "git.mascherari.press/cwtch/client/send" "git.mascherari.press/cwtch/protocol" + "github.com/s-rah/go-ricochet" + "github.com/s-rah/go-ricochet/channels" + "github.com/s-rah/go-ricochet/connection" + "github.com/s-rah/go-ricochet/identity" + "github.com/s-rah/go-ricochet/utils" "time" ) @@ -16,6 +17,8 @@ type PeerServerConnection struct { Server string state ConnectionState connection connection.Connection + + GroupMessageHandler func(string, *protocol.GroupMessage) } func NewPeerServerConnection(serverhostname string) *PeerServerConnection { @@ -34,6 +37,7 @@ func (psc *PeerServerConnection) GetState() ConnectionState { func (psc *PeerServerConnection) Run() error { rc, err := goricochet.Open(psc.Server) if err == nil { + rc.TraceLog(true) psc.connection = *rc psc.state = CONNECTED pk, err := utils.GeneratePrivateKey() @@ -41,6 +45,14 @@ func (psc *PeerServerConnection) Run() error { _, err := connection.HandleOutboundConnection(&psc.connection).ProcessAuthAsClient(identity.Initialize("cwtchpeer", pk)) if err == nil { psc.state = AUTHENTICATED + + go func() { + psc.connection.Do(func() error { + psc.connection.RequestOpenChannel("im.cwtch.server.fetch", &fetch.CwtchPeerFetchChannel{Handler: psc}) + return nil + }) + }() + psc.connection.Process(psc) } } @@ -48,32 +60,31 @@ func (psc *PeerServerConnection) Run() error { return err } - // Break makes Run() return and prevents processing, but doesn't close the connection. func (psc *PeerServerConnection) Break() error { return psc.connection.Break() } func (psc *PeerServerConnection) SendGroupMessage(gm *protocol.GroupMessage) { - psc.connection.Do(func() error { - psc.connection.RequestOpenChannel("im.cwtch.server.send", &send.CwtchPeerSendChannel{}) - return nil - }) - // TODO We have to wait to receive the channel result before we can continue - // We should have a better mechanism for this kindof interaction - time.Sleep(time.Second * 1) - psc.connection.Do(func() error { - channel := psc.connection.Channel("im.cwtch.server.send", channels.Outbound) - if channel != nil { - sendchannel, ok := channel.Handler.(*send.CwtchPeerSendChannel) - if ok { - sendchannel.SendGroupMessage(gm) - } - } - return nil - }) + psc.connection.Do(func() error { + psc.connection.RequestOpenChannel("im.cwtch.server.send", &send.CwtchPeerSendChannel{}) + return nil + }) + // TODO We have to wait to receive the channel result before we can continue + // We should have a better mechanism for this kindof interaction + time.Sleep(time.Second * 1) + psc.connection.Do(func() error { + channel := psc.connection.Channel("im.cwtch.server.send", channels.Outbound) + if channel != nil { + sendchannel, ok := channel.Handler.(*send.CwtchPeerSendChannel) + if ok { + sendchannel.SendGroupMessage(gm) + } + } + return nil + }) } -func (psc *PeerServerConnection) HandleGroupMessage(*protocol.GroupMessage) { - +func (psc *PeerServerConnection) HandleGroupMessage(gm *protocol.GroupMessage) { + psc.GroupMessageHandler(psc.Server, gm) } diff --git a/client/connections/peerserverconnection_test.go b/client/connections/peerserverconnection_test.go index 121af53..35d0b73 100644 --- a/client/connections/peerserverconnection_test.go +++ b/client/connections/peerserverconnection_test.go @@ -2,13 +2,14 @@ package connections import ( "crypto/rsa" + "git.mascherari.press/cwtch/protocol" + "git.mascherari.press/cwtch/server/fetch" + "git.mascherari.press/cwtch/server/send" "github.com/s-rah/go-ricochet" + "github.com/s-rah/go-ricochet/channels" "github.com/s-rah/go-ricochet/connection" "github.com/s-rah/go-ricochet/identity" "github.com/s-rah/go-ricochet/utils" - "github.com/s-rah/go-ricochet/channels" - "git.mascherari.press/cwtch/server/send" - "git.mascherari.press/cwtch/protocol" "net" "testing" "time" @@ -19,12 +20,16 @@ func ServerAuthValid(hostname string, publicKey rsa.PublicKey) (allowed, known b } type TestServer struct { - connection.AutoConnectionHandler - Received bool + connection.AutoConnectionHandler + Received bool } func (ts *TestServer) HandleGroupMessage(gm *protocol.GroupMessage) { - ts.Received = true + ts.Received = true +} + +func (ts *TestServer) HandleFetchRequest() []*protocol.GroupMessage { + return []*protocol.GroupMessage{{Ciphertext: []byte("hello"), Spamguard: []byte{}}, {Ciphertext: []byte("hello"), Spamguard: []byte{}}} } func runtestserver(t *testing.T, ts *TestServer) { @@ -39,19 +44,25 @@ func runtestserver(t *testing.T, ts *TestServer) { if err != nil { t.Errorf("Negotiate Version Error: %v", err) } + rc.TraceLog(true) err = connection.HandleInboundConnection(rc).ProcessAuthAsServer(identity.Initialize("", privateKey), ServerAuthValid) if err != nil { t.Errorf("ServerAuth Error: %v", err) } - ts.RegisterChannelHandler("im.cwtch.server.send", func() channels.Handler { - server := new(send.CwtchServerSendChannel) - server.Handler = ts - return server + server := new(send.CwtchServerSendChannel) + server.Handler = ts + return server }) - - rc.Process(ts) + + ts.RegisterChannelHandler("im.cwtch.server.fetch", func() channels.Handler { + server := new(fetch.CwtchServerFetchChannel) + server.Handler = ts + return server + }) + + rc.Process(ts) } func TestPeerServerConnection(t *testing.T) { @@ -59,6 +70,10 @@ func TestPeerServerConnection(t *testing.T) { ts.Init() go runtestserver(t, ts) psc := NewPeerServerConnection("127.0.0.1:5451|kwke2hntvyfqm7dr") + numcalls := 0 + psc.GroupMessageHandler = func(s string, gm *protocol.GroupMessage) { + numcalls += 1 + } state := psc.GetState() if state != DISCONNECTED { t.Errorf("new connections should start in disconnected state") @@ -75,7 +90,11 @@ func TestPeerServerConnection(t *testing.T) { psc.SendGroupMessage(gm) time.Sleep(time.Second * 2) if ts.Received == false { - t.Errorf("Should have received a group message in test server") + t.Errorf("Should have received a group message in test server") } - + + if numcalls != 2 { + t.Errorf("Should have received 2 calls from fetch request, instead received %v", numcalls) + } + } diff --git a/client/fetch/peer_fetch_channel.go b/client/fetch/peer_fetch_channel.go index 4ac56a1..a7a1f5b 100644 --- a/client/fetch/peer_fetch_channel.go +++ b/client/fetch/peer_fetch_channel.go @@ -68,10 +68,20 @@ func (cpfc *CwtchPeerFetchChannel) OpenOutboundResult(err error, crm *Protocol_D if err == nil { if crm.GetOpened() { cpfc.channel.Pending = false + cpfc.FetchRequest() } } } +func (cpfc *CwtchPeerFetchChannel) FetchRequest() { + fm := &protocol.FetchMessage{} + csp := &protocol.CwtchServerPacket{ + FetchMessage: fm, + } + packet, _ := proto.Marshal(csp) + cpfc.channel.SendMessage(packet) +} + // Packet is called for each raw packet received on this channel. func (cpfc *CwtchPeerFetchChannel) Packet(data []byte) { csp := &protocol.CwtchServerPacket{} diff --git a/server/fetch/server_fetch_channel.go b/server/fetch/server_fetch_channel.go index f83376a..9119343 100644 --- a/server/fetch/server_fetch_channel.go +++ b/server/fetch/server_fetch_channel.go @@ -80,9 +80,8 @@ func (cc *CwtchServerFetchChannel) OpenOutboundResult(err error, crm *Protocol_D // SendGroupMessage func (cc *CwtchServerFetchChannel) SendGroupMessages(gm []*protocol.GroupMessage) { - csp := &protocol.CwtchServerPacket{ - GroupMessages: gm, - } + csp := &protocol.CwtchServerPacket{} + csp.GroupMessages = gm packet, _ := proto.Marshal(csp) cc.channel.SendMessage(packet) }