From 00dc2e60e5317133d40748f132c5bf44c2142ab9 Mon Sep 17 00:00:00 2001 From: Sarah Jamie Lewis Date: Sat, 8 May 2021 12:03:09 -0700 Subject: [PATCH 1/2] Wire up SendMessageToGroupError (Also makes this flow much more efficient by including groupId in the round trip) --- model/profile.go | 13 ++++++------- peer/cwtch_peer.go | 7 ++++--- protocol/connections/engine.go | 4 +++- 3 files changed, 13 insertions(+), 11 deletions(-) diff --git a/model/profile.go b/model/profile.go index 99ceada..fceac0a 100644 --- a/model/profile.go +++ b/model/profile.go @@ -216,13 +216,12 @@ func (p *Profile) AckSentMessageToPeer(onion string, eventID string) int { } // AddGroupSentMessageError searches matching groups for the message by sig and marks it as an error -func (p *Profile) AddGroupSentMessageError(groupServer string, signature string, error string) { - for _, group := range p.Groups { - if group.GroupServer == groupServer { - if group.ErrorSentMessage([]byte(signature), error) { - break - } - } +func (p *Profile) AddGroupSentMessageError(groupID string, signature []byte, error string) { + p.lock.Lock() + defer p.lock.Unlock() + group, exists := p.Groups[groupID] + if exists { + group.ErrorSentMessage(signature, error) } } diff --git a/peer/cwtch_peer.go b/peer/cwtch_peer.go index e5abba4..d9f83a6 100644 --- a/peer/cwtch_peer.go +++ b/peer/cwtch_peer.go @@ -482,10 +482,10 @@ func (cp *cwtchPeer) SendMessageToGroupTracked(groupid string, message string) ( ct, sig, err := cp.Profile.EncryptMessageToGroup(message, groupid) if err == nil { - cp.eventBus.Publish(event.NewEvent(event.SendMessageToGroup, map[event.Field]string{event.GroupServer: group.GroupServer, event.Ciphertext: base64.StdEncoding.EncodeToString(ct), event.Signature: base64.StdEncoding.EncodeToString(sig)})) + cp.eventBus.Publish(event.NewEvent(event.SendMessageToGroup, map[event.Field]string{event.GroupID: groupid, event.GroupServer: group.GroupServer, event.Ciphertext: base64.StdEncoding.EncodeToString(ct), event.Signature: base64.StdEncoding.EncodeToString(sig)})) } - return string(sig), err + return base64.StdEncoding.EncodeToString(sig), err } func (cp *cwtchPeer) SendMessageToPeer(onion string, message string) string { @@ -702,7 +702,8 @@ func (cp *cwtchPeer) eventHandler() { case event.SendMessageToGroupError: cp.mutex.Lock() - cp.Profile.AddGroupSentMessageError(ev.Data[event.GroupServer], ev.Data[event.Signature], ev.Data[event.Error]) + signature, _ := base64.StdEncoding.DecodeString(ev.Data[event.Signature]) + cp.Profile.AddGroupSentMessageError(ev.Data[event.GroupID], signature, ev.Data[event.Error]) cp.mutex.Unlock() case event.SendMessageToPeerError: diff --git a/protocol/connections/engine.go b/protocol/connections/engine.go index 15b43c6..dbeaf70 100644 --- a/protocol/connections/engine.go +++ b/protocol/connections/engine.go @@ -54,6 +54,8 @@ type engine struct { // Engine (ProtocolEngine) encapsulates the logic necessary to make and receive Cwtch connections. // Note: ProtocolEngine doesn't have access to any information necessary to encrypt or decrypt GroupMessages +// Protocol Engine *can* associate Group Identifiers with Group Servers, although we don't currently make use of this fact +// other than to route errors back to the UI. type Engine interface { ACN() connectivity.ACN EventManager() event.Manager @@ -153,7 +155,7 @@ func (e *engine) eventHandler() { signature, _ := base64.StdEncoding.DecodeString(ev.Data[event.Signature]) err := e.sendMessageToGroup(ev.Data[event.GroupServer], ciphertext, signature) if err != nil { - e.eventManager.Publish(event.NewEvent(event.SendMessageToGroupError, map[event.Field]string{event.GroupServer: ev.Data[event.GroupServer], event.EventID: ev.EventID, event.Error: err.Error()})) + e.eventManager.Publish(event.NewEvent(event.SendMessageToGroupError, map[event.Field]string{event.GroupID: ev.Data[event.GroupID], event.GroupServer: ev.Data[event.GroupServer], event.EventID: ev.EventID, event.Error: err.Error(), event.Signature: ev.Data[event.Signature]})) } case event.SendMessageToPeer: // TODO: remove this passthrough once the UI is integrated. From e7fc228cfa2327bda497e7ac44f8c003598f753a Mon Sep 17 00:00:00 2001 From: Sarah Jamie Lewis Date: Mon, 10 May 2021 16:37:20 -0700 Subject: [PATCH 2/2] Don't inline group server connections... --- peer/cwtch_peer.go | 2 +- protocol/connections/engine.go | 41 +++++++++++---------- protocol/connections/tokenboardclientapp.go | 34 ++++++++++++----- 3 files changed, 48 insertions(+), 29 deletions(-) diff --git a/peer/cwtch_peer.go b/peer/cwtch_peer.go index d9f83a6..3e4d16a 100644 --- a/peer/cwtch_peer.go +++ b/peer/cwtch_peer.go @@ -473,8 +473,8 @@ func (cp *cwtchPeer) JoinServer(onion string) error { // It returns the signature of the message which can be used to identify it in any UX layer. func (cp *cwtchPeer) SendMessageToGroupTracked(groupid string, message string) (string, error) { cp.mutex.Lock() - group := cp.Profile.GetGroup(groupid) defer cp.mutex.Unlock() + group := cp.Profile.GetGroup(groupid) if group == nil { return "", errors.New("invalid group id") diff --git a/protocol/connections/engine.go b/protocol/connections/engine.go index dbeaf70..fb54b0e 100644 --- a/protocol/connections/engine.go +++ b/protocol/connections/engine.go @@ -7,7 +7,6 @@ import ( "encoding/base64" "encoding/json" "errors" - "fmt" "git.openprivacy.ca/cwtch.im/tapir" "git.openprivacy.ca/cwtch.im/tapir/networks/tor" "git.openprivacy.ca/cwtch.im/tapir/primitives" @@ -135,7 +134,7 @@ func (e *engine) eventHandler() { // will result in a full sync signature = []byte{} } - e.peerWithTokenServer(ev.Data[event.GroupServer], ev.Data[event.ServerTokenOnion], ev.Data[event.ServerTokenY], signature) + go e.peerWithTokenServer(ev.Data[event.GroupServer], ev.Data[event.ServerTokenOnion], ev.Data[event.ServerTokenY], signature) case event.LeaveServer: es, ok := e.ephemeralServices.Load(ev.Data[event.GroupServer]) if ok { @@ -153,10 +152,7 @@ func (e *engine) eventHandler() { case event.SendMessageToGroup: ciphertext, _ := base64.StdEncoding.DecodeString(ev.Data[event.Ciphertext]) signature, _ := base64.StdEncoding.DecodeString(ev.Data[event.Signature]) - err := e.sendMessageToGroup(ev.Data[event.GroupServer], ciphertext, signature) - if err != nil { - e.eventManager.Publish(event.NewEvent(event.SendMessageToGroupError, map[event.Field]string{event.GroupID: ev.Data[event.GroupID], event.GroupServer: ev.Data[event.GroupServer], event.EventID: ev.EventID, event.Error: err.Error(), event.Signature: ev.Data[event.Signature]})) - } + go e.sendMessageToGroup(ev.Data[event.GroupID], ev.Data[event.GroupServer], ciphertext, signature) case event.SendMessageToPeer: // TODO: remove this passthrough once the UI is integrated. context, ok := ev.Data[event.EventContext] @@ -445,31 +441,38 @@ func (e *engine) receiveGroupMessage(server string, gm *groups.EncryptedGroupMes } // sendMessageToGroup attempts to sent the given message to the given group id. -func (e *engine) sendMessageToGroup(server string, ct []byte, sig []byte) error { +func (e *engine) sendMessageToGroup(groupID string, server string, ct []byte, sig []byte) { + es, ok := e.ephemeralServices.Load(server) if !ok { - return fmt.Errorf("no service exists for group %v", server) + e.eventManager.Publish(event.NewEvent(event.SendMessageToGroupError, map[event.Field]string{event.GroupID: groupID, event.GroupServer: server, event.Error: "server-not-found", event.Signature: base64.StdEncoding.EncodeToString(sig)})) } ephemeralService := es.(tapir.Service) + conn, err := ephemeralService.WaitForCapabilityOrClose(server, groups.CwtchServerSyncedCapability) if err == nil { tokenApp, ok := (conn.App()).(*TokenBoardClient) if ok { - attempts := 0 - for tokenApp.Post(ct, sig) == false { - // TODO This should eventually be wired back into the UI to allow it to error - tokenApp.MakePayment() - time.Sleep(time.Second * 5) - attempts++ - if attempts == 5 { - return errors.New("failed to post to token board") + if spent, numtokens := tokenApp.Post(ct, sig); spent == false { + // TODO: while this works for the spam guard, it won't work for other forms of payment... + // Make an -inline- payment, this will hold the goroutine + if err := tokenApp.MakePayment(); err == nil { + // This really shouldn't fail since we now know we have the required tokens... + if spent, _ := tokenApp.Post(ct, sig); spent == false { + e.eventManager.Publish(event.NewEvent(event.SendMessageToGroupError, map[event.Field]string{event.GroupID: groupID, event.GroupServer: server, event.Error: err.Error(), event.Signature: base64.StdEncoding.EncodeToString(sig)})) + } + } else { + // Broadast the token error + e.eventManager.Publish(event.NewEvent(event.SendMessageToGroupError, map[event.Field]string{event.GroupID: groupID, event.GroupServer: server, event.Error: err.Error(), event.Signature: base64.StdEncoding.EncodeToString(sig)})) } + } else if numtokens < 5 { + go tokenApp.MakePayment() } - return nil + // regardless we return.... + return } - return errors.New("failed type assertion conn.App != TokenBoardClientApp") } - return err + e.eventManager.Publish(event.NewEvent(event.SendMessageToGroupError, map[event.Field]string{event.GroupID: groupID, event.GroupServer: server, event.Error: "server-connection-not-valid", event.Signature: base64.StdEncoding.EncodeToString(sig)})) } func (e *engine) handlePeerMessage(hostname string, eventID string, context string, message []byte) { diff --git a/protocol/connections/tokenboardclientapp.go b/protocol/connections/tokenboardclientapp.go index f4b9cb0..b24215a 100644 --- a/protocol/connections/tokenboardclientapp.go +++ b/protocol/connections/tokenboardclientapp.go @@ -12,6 +12,7 @@ import ( "git.openprivacy.ca/openprivacy/connectivity" "git.openprivacy.ca/openprivacy/log" "github.com/gtank/ristretto255" + "sync" ) // NewTokenBoardClient generates a new Client for Token Board @@ -39,6 +40,7 @@ type TokenBoardClient struct { // Token service handling acn connectivity.ACN tokens []*privacypass.Token + tokenLock sync.Mutex tokenService *privacypass.TokenServer tokenServiceOnion string lastKnownSignature []byte @@ -65,6 +67,9 @@ func (ta *TokenBoardClient) Init(connection tapir.Connection) { ta.connection.SetCapability(groups.CwtchServerSyncedCapability) log.Debugf("Successfully Initialized Connection") go ta.Listen() + // Optimistically acquire many tokens for this server... + go ta.MakePayment() + go ta.MakePayment() ta.Replay() } else { connection.Close() @@ -145,21 +150,21 @@ func (ta *TokenBoardClient) PurchaseTokens() { } // Post sends a Post Request to the server -func (ta *TokenBoardClient) Post(ct []byte, sig []byte) bool { +func (ta *TokenBoardClient) Post(ct []byte, sig []byte) (bool, int) { egm := groups.EncryptedGroupMessage{Ciphertext: ct, Signature: sig} - token, err := ta.NextToken(egm.ToBytes(), ta.connection.Hostname()) + token, numTokens, err := ta.NextToken(egm.ToBytes(), ta.connection.Hostname()) if err == nil { data, _ := json.Marshal(groups.Message{MessageType: groups.PostRequestMessage, PostRequest: &groups.PostRequest{EGM: egm, Token: token}}) log.Debugf("Message Length: %s %v", data, len(data)) ta.connection.Send(data) - return true + return true, numTokens } log.Debugf("No Valid Tokens: %v", err) - return false + return false, numTokens } // MakePayment uses the PoW based token protocol to obtain more tokens -func (ta *TokenBoardClient) MakePayment() { +func (ta *TokenBoardClient) MakePayment() error { log.Debugf("Making a Payment %v", ta) id, sk := primitives.InitializeEphemeralIdentity() var client tapir.Service @@ -172,23 +177,34 @@ func (ta *TokenBoardClient) MakePayment() { ChainApplication(new(applications.ProofOfWorkApplication), applications.SuccessfulProofOfWorkCapability). ChainApplication(tokenApplication, applications.HasTokensCapability) client.Connect(ta.tokenServiceOnion, powTokenApp) + log.Debugf("Waiting for successful PoW Auth...") conn, err := client.WaitForCapabilityOrClose(ta.tokenServiceOnion, applications.HasTokensCapability) if err == nil { powtapp, _ := conn.App().(*applications.TokenApplication) + // Update tokens...we need a lock here to prevent SpendToken from modifying the tokens + // during this process.. + log.Debugf("Updating Tokens") + ta.tokenLock.Lock() ta.tokens = append(ta.tokens, powtapp.Tokens...) + ta.tokenLock.Unlock() log.Debugf("Transcript: %v", powtapp.Transcript().OutputTranscriptToAudit()) conn.Close() - return + return nil } log.Debugf("Error making payment: to %v %v", ta.tokenServiceOnion, err) + return err } // NextToken retrieves the next token -func (ta *TokenBoardClient) NextToken(data []byte, hostname string) (privacypass.SpentToken, error) { +func (ta *TokenBoardClient) NextToken(data []byte, hostname string) (privacypass.SpentToken, int, error) { + // Taken the first new token, we need a lock here because tokens can be appended by MakePayment + // which could result in weird behaviour... + ta.tokenLock.Lock() + defer ta.tokenLock.Unlock() if len(ta.tokens) == 0 { - return privacypass.SpentToken{}, errors.New("No more tokens") + return privacypass.SpentToken{}, len(ta.tokens), errors.New("No more tokens") } token := ta.tokens[0] ta.tokens = ta.tokens[1:] - return token.SpendToken(append(data, hostname...)), nil + return token.SpendToken(append(data, hostname...)), len(ta.tokens), nil }