diff --git a/go.mod b/go.mod index 31672ef..43dc8b8 100644 --- a/go.mod +++ b/go.mod @@ -3,9 +3,9 @@ module cwtch.im/cwtch go 1.14 require ( - git.openprivacy.ca/cwtch.im/tapir v0.4.5 + git.openprivacy.ca/cwtch.im/tapir v0.4.6 git.openprivacy.ca/openprivacy/connectivity v1.4.5 - git.openprivacy.ca/openprivacy/log v1.0.2 + git.openprivacy.ca/openprivacy/log v1.0.3 github.com/gtank/ristretto255 v0.1.2 github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect github.com/struCoder/pidusage v0.1.3 diff --git a/go.sum b/go.sum index 4c313c5..7ed9cbc 100644 --- a/go.sum +++ b/go.sum @@ -20,6 +20,8 @@ git.openprivacy.ca/cwtch.im/tapir v0.4.4 h1:KyuTVmr9GYptTCeR7JDODjmhBBbnIBf9V3NS git.openprivacy.ca/cwtch.im/tapir v0.4.4/go.mod h1:qMFTdmDZITc1BLP1jSW0gVpLmvpg+Zjsh5ek8StwbFE= git.openprivacy.ca/cwtch.im/tapir v0.4.5 h1:CTMFdH2qBt4dkiyzNbxuQpK52iKerZmFnG2SfLLSmS8= git.openprivacy.ca/cwtch.im/tapir v0.4.5/go.mod h1:qMFTdmDZITc1BLP1jSW0gVpLmvpg+Zjsh5ek8StwbFE= +git.openprivacy.ca/cwtch.im/tapir v0.4.6 h1:hoKIXNfsWjTiLry9NYTBc9VJYSST3WydHv+Wrt+bn8s= +git.openprivacy.ca/cwtch.im/tapir v0.4.6/go.mod h1:qMFTdmDZITc1BLP1jSW0gVpLmvpg+Zjsh5ek8StwbFE= git.openprivacy.ca/openprivacy/bine v0.0.4 h1:CO7EkGyz+jegZ4ap8g5NWRuDHA/56KKvGySR6OBPW+c= git.openprivacy.ca/openprivacy/bine v0.0.4/go.mod h1:13ZqhKyqakDsN/ZkQkIGNULsmLyqtXc46XBcnuXm/mU= git.openprivacy.ca/openprivacy/connectivity v1.4.0 h1:c7AANUCrlA4hIqXxIGDOWMtSe8CpDleD1877PShScbM= @@ -37,6 +39,8 @@ git.openprivacy.ca/openprivacy/connectivity v1.4.5/go.mod h1:JVRCIdL+lAG6ohBFWiK git.openprivacy.ca/openprivacy/log v1.0.1/go.mod h1:gGYK8xHtndRLDymFtmjkG26GaMQNgyhioNS82m812Iw= git.openprivacy.ca/openprivacy/log v1.0.2 h1:HLP4wsw4ljczFAelYnbObIs821z+jgMPCe8uODPnGQM= git.openprivacy.ca/openprivacy/log v1.0.2/go.mod h1:gGYK8xHtndRLDymFtmjkG26GaMQNgyhioNS82m812Iw= +git.openprivacy.ca/openprivacy/log v1.0.3 h1:E/PMm4LY+Q9s3aDpfySfEDq/vYQontlvNj/scrPaga0= +git.openprivacy.ca/openprivacy/log v1.0.3/go.mod h1:gGYK8xHtndRLDymFtmjkG26GaMQNgyhioNS82m812Iw= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= diff --git a/protocol/connections/engine.go b/protocol/connections/engine.go index c1f27c9..a80c936 100644 --- a/protocol/connections/engine.go +++ b/protocol/connections/engine.go @@ -160,7 +160,7 @@ func (e *engine) eventHandler() { case event.SendMessageToGroup: ciphertext, _ := base64.StdEncoding.DecodeString(ev.Data[event.Ciphertext]) signature, _ := base64.StdEncoding.DecodeString(ev.Data[event.Signature]) - go e.sendMessageToGroup(ev.Data[event.GroupID], ev.Data[event.GroupServer], ciphertext, signature) + go e.sendMessageToGroup(ev.Data[event.GroupID], ev.Data[event.GroupServer], ciphertext, signature, 0) case event.SendMessageToPeer: // TODO: remove this passthrough once the UI is integrated. context, ok := ev.Data[event.EventContext] @@ -450,7 +450,7 @@ func (e *engine) sendGetValToPeer(eventID, onion, scope, path string) error { return err } - e.getValRequests.Store(onion + eventID, message) + e.getValRequests.Store(onion+eventID, message) return e.sendPeerMessage(onion, model3.PeerMessage{ID: eventID, Context: event.ContextGetVal, Data: message}) } @@ -480,7 +480,18 @@ func (e *engine) receiveGroupMessage(server string, gm *groups.EncryptedGroupMes } // sendMessageToGroup attempts to sent the given message to the given group id. -func (e *engine) sendMessageToGroup(groupID string, server string, ct []byte, sig []byte) { +func (e *engine) sendMessageToGroup(groupID string, server string, ct []byte, sig []byte, attempts int) { + + // sending to groups can fail for a few reasons (slow server, not enough tokens, etc.) + // rather than trying to keep all that logic in method we simply back-off and try again + // but if we fail more than 5 times then we report back to the client so they can investigate other options. + // Note: This flow only applies to online-and-connected servers (this method will return faster if the server is not + // online) + if attempts >= 5 { + log.Errorf("failed to post a message to a group after %v attempts", attempts) + e.eventManager.Publish(event.NewEvent(event.SendMessageToGroupError, map[event.Field]string{event.GroupID: groupID, event.GroupServer: server, event.Error: "could not make payment to server", event.Signature: base64.StdEncoding.EncodeToString(sig)})) + return + } es, ok := e.ephemeralServices.Load(server) if es == nil || !ok { @@ -494,19 +505,16 @@ func (e *engine) sendMessageToGroup(groupID string, server string, ct []byte, si tokenApp, ok := (conn.App()).(*TokenBoardClient) if ok { if spent, numtokens := tokenApp.Post(ct, sig); !spent { - // TODO: while this works for the spam guard, it won't work for other forms of payment... - // Make an -inline- payment, this will hold the goroutine - if err := tokenApp.MakePayment(); err == nil { - // This really shouldn't fail since we now know we have the required tokens... - if spent, _ := tokenApp.Post(ct, sig); !spent { - e.eventManager.Publish(event.NewEvent(event.SendMessageToGroupError, map[event.Field]string{event.GroupID: groupID, event.GroupServer: server, event.Error: err.Error(), event.Signature: base64.StdEncoding.EncodeToString(sig)})) - } - } else { - // Broadcast the token error - e.eventManager.Publish(event.NewEvent(event.SendMessageToGroupError, map[event.Field]string{event.GroupID: groupID, event.GroupServer: server, event.Error: err.Error(), event.Signature: base64.StdEncoding.EncodeToString(sig)})) - } - } else if numtokens < 5 { + // we failed to post, probably because we ran out of tokens... so make a payment go tokenApp.MakePayment() + // backoff + time.Sleep(time.Second) + // try again + e.sendMessageToGroup(groupID, server, ct, sig, attempts+1) + } else { + if numtokens < 5 { + go tokenApp.MakePayment() + } } // regardless we return.... return @@ -519,16 +527,16 @@ func (e *engine) sendMessageToGroup(groupID string, server string, ct []byte, si func (e *engine) handlePeerMessage(hostname string, eventID string, context string, message []byte) { log.Debugf("New message from peer: %v %v", hostname, context) - if context == event.ContextAck { + if context == event.ContextAck { e.peerAck(hostname, eventID) } else if context == event.ContextRetVal { - req, ok := e.getValRequests.Load(hostname + eventID) + req, ok := e.getValRequests.Load(hostname + eventID) if ok { reqStr := req.([]byte) e.handlePeerRetVal(hostname, reqStr, message) - e.getValRequests.Delete(hostname + eventID) + e.getValRequests.Delete(hostname + eventID) } else { - log.Errorf("could not find val request for %v %s",hostname, eventID) + log.Errorf("could not find val request for %v %s", hostname, eventID) } } else if context == event.ContextGetVal { var getVal peerGetVal diff --git a/protocol/connections/peerapp.go b/protocol/connections/peerapp.go index 4f707b8..ec3deb5 100644 --- a/protocol/connections/peerapp.go +++ b/protocol/connections/peerapp.go @@ -79,9 +79,9 @@ func (pa *PeerApp) listen() { var peerMessage model2.PeerMessage err := json.Unmarshal(message, &peerMessage) if err == nil { - if pa.IsAllowed(pa.connection.Hostname()) { - pa.MessageHandler(pa.connection.Hostname(), peerMessage.ID, peerMessage.Context, peerMessage.Data) - } + if pa.IsAllowed(pa.connection.Hostname()) { + pa.MessageHandler(pa.connection.Hostname(), peerMessage.ID, peerMessage.Context, peerMessage.Data) + } } else { log.Errorf("Error unmarshalling PeerMessage package: %x %v", message, err) } diff --git a/protocol/connections/tokenboardclientapp.go b/protocol/connections/tokenboardclientapp.go index cb36cc8..1ca69fe 100644 --- a/protocol/connections/tokenboardclientapp.go +++ b/protocol/connections/tokenboardclientapp.go @@ -12,7 +12,9 @@ import ( "git.openprivacy.ca/openprivacy/connectivity" "git.openprivacy.ca/openprivacy/log" "github.com/gtank/ristretto255" + "reflect" "sync" + "time" ) // NewTokenBoardClient generates a new Client for Token Board @@ -179,23 +181,38 @@ func (ta *TokenBoardClient) MakePayment() error { powTokenApp := new(applications.ApplicationChain). ChainApplication(new(applications.ProofOfWorkApplication), applications.SuccessfulProofOfWorkCapability). ChainApplication(tokenApplication, applications.HasTokensCapability) - client.Connect(ta.tokenServiceOnion, powTokenApp) + log.Debugf("Waiting for successful PoW Auth...") - conn, err := client.WaitForCapabilityOrClose(ta.tokenServiceOnion, applications.HasTokensCapability) - if err == nil { - powtapp, _ := conn.App().(*applications.TokenApplication) - // Update tokens...we need a lock here to prevent SpendToken from modifying the tokens - // during this process.. - log.Debugf("Updating Tokens") - ta.tokenLock.Lock() - ta.tokens = append(ta.tokens, powtapp.Tokens...) - ta.tokenLock.Unlock() - log.Debugf("Transcript: %v", powtapp.Transcript().OutputTranscriptToAudit()) - conn.Close() - return nil + + connected, err := client.Connect(ta.tokenServiceOnion, powTokenApp) + if connected == true && err == nil { + log.Debugf("Waiting for successful Token Acquisition...") + conn, err := client.WaitForCapabilityOrClose(ta.tokenServiceOnion, applications.HasTokensCapability) + if err == nil { + powtapp, ok := conn.App().(*applications.TokenApplication) + if ok { + // Update tokens...we need a lock here to prevent SpendToken from modifying the tokens + // during this process.. + log.Debugf("Updating Tokens") + ta.tokenLock.Lock() + ta.tokens = append(ta.tokens, powtapp.Tokens...) + ta.tokenLock.Unlock() + log.Debugf("Transcript: %v", powtapp.Transcript().OutputTranscriptToAudit()) + conn.Close() + return nil + } + log.Errorf("invalid cast of powapp. this should not happen %v %v", powtapp, reflect.TypeOf(conn.App())) + return errors.New("invalid cast of powapp. this should never happen") + } + log.Debugf("could not connect to payment server %v..trying again") + return ta.MakePayment() + } else if connected && err != nil { + log.Debugf("inexplicable error: %v", err) } - log.Debugf("Error making payment: to %v %v", ta.tokenServiceOnion, err) - return err + log.Debugf("failed to make a connection. trying again...") + // it doesn't actually take that long to make a payment, so waiting a small amount of time should suffice + time.Sleep(time.Second) + return ta.MakePayment() } // NextToken retrieves the next token diff --git a/testing/file_sharing_integration_test.go b/testing/filesharing/file_sharing_integration_test.go similarity index 81% rename from testing/file_sharing_integration_test.go rename to testing/filesharing/file_sharing_integration_test.go index 5d9e1d3..e02d3be 100644 --- a/testing/file_sharing_integration_test.go +++ b/testing/filesharing/file_sharing_integration_test.go @@ -1,4 +1,4 @@ -package testing +package filesharing import ( "crypto/rand" @@ -7,6 +7,9 @@ import ( "cwtch.im/cwtch/event" "cwtch.im/cwtch/functionality/filesharing" "cwtch.im/cwtch/model" + "cwtch.im/cwtch/model/attr" + "cwtch.im/cwtch/peer" + "cwtch.im/cwtch/protocol/connections" "cwtch.im/cwtch/protocol/files" "encoding/base64" "encoding/hex" @@ -24,6 +27,29 @@ import ( "time" ) +func waitForPeerPeerConnection(t *testing.T, peera peer.CwtchPeer, peerb peer.CwtchPeer) { + for { + state, ok := peera.GetPeerState(peerb.GetOnion()) + if ok { + //log.Infof("Waiting for Peer %v to peer with peer: %v - state: %v\n", peera.GetProfile().Name, peerb.GetProfile().Name, state) + if state == connections.FAILED { + t.Fatalf("%v could not connect to %v", peera.GetOnion(), peerb.GetOnion()) + } + if state != connections.AUTHENTICATED { + fmt.Printf("peer %v waiting connect to peer %v, currently: %v\n", peera.GetOnion(), peerb.GetOnion(), connections.ConnectionStateName[state]) + time.Sleep(time.Second * 5) + continue + } else { + peerAName, _ := peera.GetAttribute(attr.GetLocalScope("name")) + peerBName, _ := peerb.GetAttribute(attr.GetLocalScope("name")) + fmt.Printf("%v CONNECTED and AUTHED to %v\n", peerAName, peerBName) + break + } + } + } + return +} + func TestFileSharing(t *testing.T) { numGoRoutinesStart := runtime.NumGoroutine()