From e7fc228cfa2327bda497e7ac44f8c003598f753a Mon Sep 17 00:00:00 2001 From: Sarah Jamie Lewis Date: Mon, 10 May 2021 16:37:20 -0700 Subject: [PATCH] Don't inline group server connections... --- peer/cwtch_peer.go | 2 +- protocol/connections/engine.go | 41 +++++++++++---------- protocol/connections/tokenboardclientapp.go | 34 ++++++++++++----- 3 files changed, 48 insertions(+), 29 deletions(-) diff --git a/peer/cwtch_peer.go b/peer/cwtch_peer.go index d9f83a6..3e4d16a 100644 --- a/peer/cwtch_peer.go +++ b/peer/cwtch_peer.go @@ -473,8 +473,8 @@ func (cp *cwtchPeer) JoinServer(onion string) error { // It returns the signature of the message which can be used to identify it in any UX layer. func (cp *cwtchPeer) SendMessageToGroupTracked(groupid string, message string) (string, error) { cp.mutex.Lock() - group := cp.Profile.GetGroup(groupid) defer cp.mutex.Unlock() + group := cp.Profile.GetGroup(groupid) if group == nil { return "", errors.New("invalid group id") diff --git a/protocol/connections/engine.go b/protocol/connections/engine.go index dbeaf70..fb54b0e 100644 --- a/protocol/connections/engine.go +++ b/protocol/connections/engine.go @@ -7,7 +7,6 @@ import ( "encoding/base64" "encoding/json" "errors" - "fmt" "git.openprivacy.ca/cwtch.im/tapir" "git.openprivacy.ca/cwtch.im/tapir/networks/tor" "git.openprivacy.ca/cwtch.im/tapir/primitives" @@ -135,7 +134,7 @@ func (e *engine) eventHandler() { // will result in a full sync signature = []byte{} } - e.peerWithTokenServer(ev.Data[event.GroupServer], ev.Data[event.ServerTokenOnion], ev.Data[event.ServerTokenY], signature) + go e.peerWithTokenServer(ev.Data[event.GroupServer], ev.Data[event.ServerTokenOnion], ev.Data[event.ServerTokenY], signature) case event.LeaveServer: es, ok := e.ephemeralServices.Load(ev.Data[event.GroupServer]) if ok { @@ -153,10 +152,7 @@ func (e *engine) eventHandler() { case event.SendMessageToGroup: ciphertext, _ := base64.StdEncoding.DecodeString(ev.Data[event.Ciphertext]) signature, _ := base64.StdEncoding.DecodeString(ev.Data[event.Signature]) - err := e.sendMessageToGroup(ev.Data[event.GroupServer], ciphertext, signature) - if err != nil { - e.eventManager.Publish(event.NewEvent(event.SendMessageToGroupError, map[event.Field]string{event.GroupID: ev.Data[event.GroupID], event.GroupServer: ev.Data[event.GroupServer], event.EventID: ev.EventID, event.Error: err.Error(), event.Signature: ev.Data[event.Signature]})) - } + go e.sendMessageToGroup(ev.Data[event.GroupID], ev.Data[event.GroupServer], ciphertext, signature) case event.SendMessageToPeer: // TODO: remove this passthrough once the UI is integrated. context, ok := ev.Data[event.EventContext] @@ -445,31 +441,38 @@ func (e *engine) receiveGroupMessage(server string, gm *groups.EncryptedGroupMes } // sendMessageToGroup attempts to sent the given message to the given group id. -func (e *engine) sendMessageToGroup(server string, ct []byte, sig []byte) error { +func (e *engine) sendMessageToGroup(groupID string, server string, ct []byte, sig []byte) { + es, ok := e.ephemeralServices.Load(server) if !ok { - return fmt.Errorf("no service exists for group %v", server) + e.eventManager.Publish(event.NewEvent(event.SendMessageToGroupError, map[event.Field]string{event.GroupID: groupID, event.GroupServer: server, event.Error: "server-not-found", event.Signature: base64.StdEncoding.EncodeToString(sig)})) } ephemeralService := es.(tapir.Service) + conn, err := ephemeralService.WaitForCapabilityOrClose(server, groups.CwtchServerSyncedCapability) if err == nil { tokenApp, ok := (conn.App()).(*TokenBoardClient) if ok { - attempts := 0 - for tokenApp.Post(ct, sig) == false { - // TODO This should eventually be wired back into the UI to allow it to error - tokenApp.MakePayment() - time.Sleep(time.Second * 5) - attempts++ - if attempts == 5 { - return errors.New("failed to post to token board") + if spent, numtokens := tokenApp.Post(ct, sig); spent == false { + // TODO: while this works for the spam guard, it won't work for other forms of payment... + // Make an -inline- payment, this will hold the goroutine + if err := tokenApp.MakePayment(); err == nil { + // This really shouldn't fail since we now know we have the required tokens... + if spent, _ := tokenApp.Post(ct, sig); spent == false { + e.eventManager.Publish(event.NewEvent(event.SendMessageToGroupError, map[event.Field]string{event.GroupID: groupID, event.GroupServer: server, event.Error: err.Error(), event.Signature: base64.StdEncoding.EncodeToString(sig)})) + } + } else { + // Broadast the token error + e.eventManager.Publish(event.NewEvent(event.SendMessageToGroupError, map[event.Field]string{event.GroupID: groupID, event.GroupServer: server, event.Error: err.Error(), event.Signature: base64.StdEncoding.EncodeToString(sig)})) } + } else if numtokens < 5 { + go tokenApp.MakePayment() } - return nil + // regardless we return.... + return } - return errors.New("failed type assertion conn.App != TokenBoardClientApp") } - return err + e.eventManager.Publish(event.NewEvent(event.SendMessageToGroupError, map[event.Field]string{event.GroupID: groupID, event.GroupServer: server, event.Error: "server-connection-not-valid", event.Signature: base64.StdEncoding.EncodeToString(sig)})) } func (e *engine) handlePeerMessage(hostname string, eventID string, context string, message []byte) { diff --git a/protocol/connections/tokenboardclientapp.go b/protocol/connections/tokenboardclientapp.go index f4b9cb0..b24215a 100644 --- a/protocol/connections/tokenboardclientapp.go +++ b/protocol/connections/tokenboardclientapp.go @@ -12,6 +12,7 @@ import ( "git.openprivacy.ca/openprivacy/connectivity" "git.openprivacy.ca/openprivacy/log" "github.com/gtank/ristretto255" + "sync" ) // NewTokenBoardClient generates a new Client for Token Board @@ -39,6 +40,7 @@ type TokenBoardClient struct { // Token service handling acn connectivity.ACN tokens []*privacypass.Token + tokenLock sync.Mutex tokenService *privacypass.TokenServer tokenServiceOnion string lastKnownSignature []byte @@ -65,6 +67,9 @@ func (ta *TokenBoardClient) Init(connection tapir.Connection) { ta.connection.SetCapability(groups.CwtchServerSyncedCapability) log.Debugf("Successfully Initialized Connection") go ta.Listen() + // Optimistically acquire many tokens for this server... + go ta.MakePayment() + go ta.MakePayment() ta.Replay() } else { connection.Close() @@ -145,21 +150,21 @@ func (ta *TokenBoardClient) PurchaseTokens() { } // Post sends a Post Request to the server -func (ta *TokenBoardClient) Post(ct []byte, sig []byte) bool { +func (ta *TokenBoardClient) Post(ct []byte, sig []byte) (bool, int) { egm := groups.EncryptedGroupMessage{Ciphertext: ct, Signature: sig} - token, err := ta.NextToken(egm.ToBytes(), ta.connection.Hostname()) + token, numTokens, err := ta.NextToken(egm.ToBytes(), ta.connection.Hostname()) if err == nil { data, _ := json.Marshal(groups.Message{MessageType: groups.PostRequestMessage, PostRequest: &groups.PostRequest{EGM: egm, Token: token}}) log.Debugf("Message Length: %s %v", data, len(data)) ta.connection.Send(data) - return true + return true, numTokens } log.Debugf("No Valid Tokens: %v", err) - return false + return false, numTokens } // MakePayment uses the PoW based token protocol to obtain more tokens -func (ta *TokenBoardClient) MakePayment() { +func (ta *TokenBoardClient) MakePayment() error { log.Debugf("Making a Payment %v", ta) id, sk := primitives.InitializeEphemeralIdentity() var client tapir.Service @@ -172,23 +177,34 @@ func (ta *TokenBoardClient) MakePayment() { ChainApplication(new(applications.ProofOfWorkApplication), applications.SuccessfulProofOfWorkCapability). ChainApplication(tokenApplication, applications.HasTokensCapability) client.Connect(ta.tokenServiceOnion, powTokenApp) + log.Debugf("Waiting for successful PoW Auth...") conn, err := client.WaitForCapabilityOrClose(ta.tokenServiceOnion, applications.HasTokensCapability) if err == nil { powtapp, _ := conn.App().(*applications.TokenApplication) + // Update tokens...we need a lock here to prevent SpendToken from modifying the tokens + // during this process.. + log.Debugf("Updating Tokens") + ta.tokenLock.Lock() ta.tokens = append(ta.tokens, powtapp.Tokens...) + ta.tokenLock.Unlock() log.Debugf("Transcript: %v", powtapp.Transcript().OutputTranscriptToAudit()) conn.Close() - return + return nil } log.Debugf("Error making payment: to %v %v", ta.tokenServiceOnion, err) + return err } // NextToken retrieves the next token -func (ta *TokenBoardClient) NextToken(data []byte, hostname string) (privacypass.SpentToken, error) { +func (ta *TokenBoardClient) NextToken(data []byte, hostname string) (privacypass.SpentToken, int, error) { + // Taken the first new token, we need a lock here because tokens can be appended by MakePayment + // which could result in weird behaviour... + ta.tokenLock.Lock() + defer ta.tokenLock.Unlock() if len(ta.tokens) == 0 { - return privacypass.SpentToken{}, errors.New("No more tokens") + return privacypass.SpentToken{}, len(ta.tokens), errors.New("No more tokens") } token := ta.tokens[0] ta.tokens = ta.tokens[1:] - return token.SpendToken(append(data, hostname...)), nil + return token.SpendToken(append(data, hostname...)), len(ta.tokens), nil }