Client Fetch Logic

This commit is contained in:
Sarah Jamie Lewis 2018-03-10 14:05:48 -08:00
parent 5f94e82347
commit fbf42b3a79
4 changed files with 81 additions and 42 deletions

View File

@ -1,13 +1,14 @@
package connections package connections
import ( import (
"github.com/s-rah/go-ricochet" "git.mascherari.press/cwtch/client/fetch"
"github.com/s-rah/go-ricochet/connection"
"github.com/s-rah/go-ricochet/channels"
"github.com/s-rah/go-ricochet/identity"
"github.com/s-rah/go-ricochet/utils"
"git.mascherari.press/cwtch/client/send" "git.mascherari.press/cwtch/client/send"
"git.mascherari.press/cwtch/protocol" "git.mascherari.press/cwtch/protocol"
"github.com/s-rah/go-ricochet"
"github.com/s-rah/go-ricochet/channels"
"github.com/s-rah/go-ricochet/connection"
"github.com/s-rah/go-ricochet/identity"
"github.com/s-rah/go-ricochet/utils"
"time" "time"
) )
@ -16,6 +17,8 @@ type PeerServerConnection struct {
Server string Server string
state ConnectionState state ConnectionState
connection connection.Connection connection connection.Connection
GroupMessageHandler func(string, *protocol.GroupMessage)
} }
func NewPeerServerConnection(serverhostname string) *PeerServerConnection { func NewPeerServerConnection(serverhostname string) *PeerServerConnection {
@ -34,6 +37,7 @@ func (psc *PeerServerConnection) GetState() ConnectionState {
func (psc *PeerServerConnection) Run() error { func (psc *PeerServerConnection) Run() error {
rc, err := goricochet.Open(psc.Server) rc, err := goricochet.Open(psc.Server)
if err == nil { if err == nil {
rc.TraceLog(true)
psc.connection = *rc psc.connection = *rc
psc.state = CONNECTED psc.state = CONNECTED
pk, err := utils.GeneratePrivateKey() pk, err := utils.GeneratePrivateKey()
@ -41,6 +45,14 @@ func (psc *PeerServerConnection) Run() error {
_, err := connection.HandleOutboundConnection(&psc.connection).ProcessAuthAsClient(identity.Initialize("cwtchpeer", pk)) _, err := connection.HandleOutboundConnection(&psc.connection).ProcessAuthAsClient(identity.Initialize("cwtchpeer", pk))
if err == nil { if err == nil {
psc.state = AUTHENTICATED psc.state = AUTHENTICATED
go func() {
psc.connection.Do(func() error {
psc.connection.RequestOpenChannel("im.cwtch.server.fetch", &fetch.CwtchPeerFetchChannel{Handler: psc})
return nil
})
}()
psc.connection.Process(psc) psc.connection.Process(psc)
} }
} }
@ -48,32 +60,31 @@ func (psc *PeerServerConnection) Run() error {
return err return err
} }
// Break makes Run() return and prevents processing, but doesn't close the connection. // Break makes Run() return and prevents processing, but doesn't close the connection.
func (psc *PeerServerConnection) Break() error { func (psc *PeerServerConnection) Break() error {
return psc.connection.Break() return psc.connection.Break()
} }
func (psc *PeerServerConnection) SendGroupMessage(gm *protocol.GroupMessage) { func (psc *PeerServerConnection) SendGroupMessage(gm *protocol.GroupMessage) {
psc.connection.Do(func() error { psc.connection.Do(func() error {
psc.connection.RequestOpenChannel("im.cwtch.server.send", &send.CwtchPeerSendChannel{}) psc.connection.RequestOpenChannel("im.cwtch.server.send", &send.CwtchPeerSendChannel{})
return nil return nil
}) })
// TODO We have to wait to receive the channel result before we can continue // TODO We have to wait to receive the channel result before we can continue
// We should have a better mechanism for this kindof interaction // We should have a better mechanism for this kindof interaction
time.Sleep(time.Second * 1) time.Sleep(time.Second * 1)
psc.connection.Do(func() error { psc.connection.Do(func() error {
channel := psc.connection.Channel("im.cwtch.server.send", channels.Outbound) channel := psc.connection.Channel("im.cwtch.server.send", channels.Outbound)
if channel != nil { if channel != nil {
sendchannel, ok := channel.Handler.(*send.CwtchPeerSendChannel) sendchannel, ok := channel.Handler.(*send.CwtchPeerSendChannel)
if ok { if ok {
sendchannel.SendGroupMessage(gm) sendchannel.SendGroupMessage(gm)
} }
} }
return nil return nil
}) })
} }
func (psc *PeerServerConnection) HandleGroupMessage(*protocol.GroupMessage) { func (psc *PeerServerConnection) HandleGroupMessage(gm *protocol.GroupMessage) {
psc.GroupMessageHandler(psc.Server, gm)
} }

View File

@ -2,13 +2,14 @@ package connections
import ( import (
"crypto/rsa" "crypto/rsa"
"git.mascherari.press/cwtch/protocol"
"git.mascherari.press/cwtch/server/fetch"
"git.mascherari.press/cwtch/server/send"
"github.com/s-rah/go-ricochet" "github.com/s-rah/go-ricochet"
"github.com/s-rah/go-ricochet/channels"
"github.com/s-rah/go-ricochet/connection" "github.com/s-rah/go-ricochet/connection"
"github.com/s-rah/go-ricochet/identity" "github.com/s-rah/go-ricochet/identity"
"github.com/s-rah/go-ricochet/utils" "github.com/s-rah/go-ricochet/utils"
"github.com/s-rah/go-ricochet/channels"
"git.mascherari.press/cwtch/server/send"
"git.mascherari.press/cwtch/protocol"
"net" "net"
"testing" "testing"
"time" "time"
@ -19,12 +20,16 @@ func ServerAuthValid(hostname string, publicKey rsa.PublicKey) (allowed, known b
} }
type TestServer struct { type TestServer struct {
connection.AutoConnectionHandler connection.AutoConnectionHandler
Received bool Received bool
} }
func (ts *TestServer) HandleGroupMessage(gm *protocol.GroupMessage) { func (ts *TestServer) HandleGroupMessage(gm *protocol.GroupMessage) {
ts.Received = true ts.Received = true
}
func (ts *TestServer) HandleFetchRequest() []*protocol.GroupMessage {
return []*protocol.GroupMessage{{Ciphertext: []byte("hello"), Spamguard: []byte{}}, {Ciphertext: []byte("hello"), Spamguard: []byte{}}}
} }
func runtestserver(t *testing.T, ts *TestServer) { func runtestserver(t *testing.T, ts *TestServer) {
@ -39,19 +44,25 @@ func runtestserver(t *testing.T, ts *TestServer) {
if err != nil { if err != nil {
t.Errorf("Negotiate Version Error: %v", err) t.Errorf("Negotiate Version Error: %v", err)
} }
rc.TraceLog(true)
err = connection.HandleInboundConnection(rc).ProcessAuthAsServer(identity.Initialize("", privateKey), ServerAuthValid) err = connection.HandleInboundConnection(rc).ProcessAuthAsServer(identity.Initialize("", privateKey), ServerAuthValid)
if err != nil { if err != nil {
t.Errorf("ServerAuth Error: %v", err) t.Errorf("ServerAuth Error: %v", err)
} }
ts.RegisterChannelHandler("im.cwtch.server.send", func() channels.Handler { ts.RegisterChannelHandler("im.cwtch.server.send", func() channels.Handler {
server := new(send.CwtchServerSendChannel) server := new(send.CwtchServerSendChannel)
server.Handler = ts server.Handler = ts
return server return server
}) })
rc.Process(ts) ts.RegisterChannelHandler("im.cwtch.server.fetch", func() channels.Handler {
server := new(fetch.CwtchServerFetchChannel)
server.Handler = ts
return server
})
rc.Process(ts)
} }
func TestPeerServerConnection(t *testing.T) { func TestPeerServerConnection(t *testing.T) {
@ -59,6 +70,10 @@ func TestPeerServerConnection(t *testing.T) {
ts.Init() ts.Init()
go runtestserver(t, ts) go runtestserver(t, ts)
psc := NewPeerServerConnection("127.0.0.1:5451|kwke2hntvyfqm7dr") psc := NewPeerServerConnection("127.0.0.1:5451|kwke2hntvyfqm7dr")
numcalls := 0
psc.GroupMessageHandler = func(s string, gm *protocol.GroupMessage) {
numcalls += 1
}
state := psc.GetState() state := psc.GetState()
if state != DISCONNECTED { if state != DISCONNECTED {
t.Errorf("new connections should start in disconnected state") t.Errorf("new connections should start in disconnected state")
@ -75,7 +90,11 @@ func TestPeerServerConnection(t *testing.T) {
psc.SendGroupMessage(gm) psc.SendGroupMessage(gm)
time.Sleep(time.Second * 2) time.Sleep(time.Second * 2)
if ts.Received == false { if ts.Received == false {
t.Errorf("Should have received a group message in test server") t.Errorf("Should have received a group message in test server")
} }
if numcalls != 2 {
t.Errorf("Should have received 2 calls from fetch request, instead received %v", numcalls)
}
} }

View File

@ -68,10 +68,20 @@ func (cpfc *CwtchPeerFetchChannel) OpenOutboundResult(err error, crm *Protocol_D
if err == nil { if err == nil {
if crm.GetOpened() { if crm.GetOpened() {
cpfc.channel.Pending = false cpfc.channel.Pending = false
cpfc.FetchRequest()
} }
} }
} }
func (cpfc *CwtchPeerFetchChannel) FetchRequest() {
fm := &protocol.FetchMessage{}
csp := &protocol.CwtchServerPacket{
FetchMessage: fm,
}
packet, _ := proto.Marshal(csp)
cpfc.channel.SendMessage(packet)
}
// Packet is called for each raw packet received on this channel. // Packet is called for each raw packet received on this channel.
func (cpfc *CwtchPeerFetchChannel) Packet(data []byte) { func (cpfc *CwtchPeerFetchChannel) Packet(data []byte) {
csp := &protocol.CwtchServerPacket{} csp := &protocol.CwtchServerPacket{}

View File

@ -80,9 +80,8 @@ func (cc *CwtchServerFetchChannel) OpenOutboundResult(err error, crm *Protocol_D
// SendGroupMessage // SendGroupMessage
func (cc *CwtchServerFetchChannel) SendGroupMessages(gm []*protocol.GroupMessage) { func (cc *CwtchServerFetchChannel) SendGroupMessages(gm []*protocol.GroupMessage) {
csp := &protocol.CwtchServerPacket{ csp := &protocol.CwtchServerPacket{}
GroupMessages: gm, csp.GroupMessages = gm
}
packet, _ := proto.Marshal(csp) packet, _ := proto.Marshal(csp)
cc.channel.SendMessage(packet) cc.channel.SendMessage(packet)
} }