Merge pull request 'Wire up SendMessageToGroupError' (#355) from groupwiring into master
continuous-integration/drone/tag Build is passing Details
continuous-integration/drone/push Build is failing Details

Reviewed-on: #355
This commit is contained in:
erinn 2021-05-10 16:52:00 -07:00
commit a4064b1872
4 changed files with 60 additions and 39 deletions

View File

@ -216,13 +216,12 @@ func (p *Profile) AckSentMessageToPeer(onion string, eventID string) int {
} }
// AddGroupSentMessageError searches matching groups for the message by sig and marks it as an error // AddGroupSentMessageError searches matching groups for the message by sig and marks it as an error
func (p *Profile) AddGroupSentMessageError(groupServer string, signature string, error string) { func (p *Profile) AddGroupSentMessageError(groupID string, signature []byte, error string) {
for _, group := range p.Groups { p.lock.Lock()
if group.GroupServer == groupServer { defer p.lock.Unlock()
if group.ErrorSentMessage([]byte(signature), error) { group, exists := p.Groups[groupID]
break if exists {
} group.ErrorSentMessage(signature, error)
}
} }
} }

View File

@ -473,8 +473,8 @@ func (cp *cwtchPeer) JoinServer(onion string) error {
// It returns the signature of the message which can be used to identify it in any UX layer. // It returns the signature of the message which can be used to identify it in any UX layer.
func (cp *cwtchPeer) SendMessageToGroupTracked(groupid string, message string) (string, error) { func (cp *cwtchPeer) SendMessageToGroupTracked(groupid string, message string) (string, error) {
cp.mutex.Lock() cp.mutex.Lock()
group := cp.Profile.GetGroup(groupid)
defer cp.mutex.Unlock() defer cp.mutex.Unlock()
group := cp.Profile.GetGroup(groupid)
if group == nil { if group == nil {
return "", errors.New("invalid group id") return "", errors.New("invalid group id")
@ -482,10 +482,10 @@ func (cp *cwtchPeer) SendMessageToGroupTracked(groupid string, message string) (
ct, sig, err := cp.Profile.EncryptMessageToGroup(message, groupid) ct, sig, err := cp.Profile.EncryptMessageToGroup(message, groupid)
if err == nil { if err == nil {
cp.eventBus.Publish(event.NewEvent(event.SendMessageToGroup, map[event.Field]string{event.GroupServer: group.GroupServer, event.Ciphertext: base64.StdEncoding.EncodeToString(ct), event.Signature: base64.StdEncoding.EncodeToString(sig)})) cp.eventBus.Publish(event.NewEvent(event.SendMessageToGroup, map[event.Field]string{event.GroupID: groupid, event.GroupServer: group.GroupServer, event.Ciphertext: base64.StdEncoding.EncodeToString(ct), event.Signature: base64.StdEncoding.EncodeToString(sig)}))
} }
return string(sig), err return base64.StdEncoding.EncodeToString(sig), err
} }
func (cp *cwtchPeer) SendMessageToPeer(onion string, message string) string { func (cp *cwtchPeer) SendMessageToPeer(onion string, message string) string {
@ -702,7 +702,8 @@ func (cp *cwtchPeer) eventHandler() {
case event.SendMessageToGroupError: case event.SendMessageToGroupError:
cp.mutex.Lock() cp.mutex.Lock()
cp.Profile.AddGroupSentMessageError(ev.Data[event.GroupServer], ev.Data[event.Signature], ev.Data[event.Error]) signature, _ := base64.StdEncoding.DecodeString(ev.Data[event.Signature])
cp.Profile.AddGroupSentMessageError(ev.Data[event.GroupID], signature, ev.Data[event.Error])
cp.mutex.Unlock() cp.mutex.Unlock()
case event.SendMessageToPeerError: case event.SendMessageToPeerError:

View File

@ -7,7 +7,6 @@ import (
"encoding/base64" "encoding/base64"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt"
"git.openprivacy.ca/cwtch.im/tapir" "git.openprivacy.ca/cwtch.im/tapir"
"git.openprivacy.ca/cwtch.im/tapir/networks/tor" "git.openprivacy.ca/cwtch.im/tapir/networks/tor"
"git.openprivacy.ca/cwtch.im/tapir/primitives" "git.openprivacy.ca/cwtch.im/tapir/primitives"
@ -54,6 +53,8 @@ type engine struct {
// Engine (ProtocolEngine) encapsulates the logic necessary to make and receive Cwtch connections. // Engine (ProtocolEngine) encapsulates the logic necessary to make and receive Cwtch connections.
// Note: ProtocolEngine doesn't have access to any information necessary to encrypt or decrypt GroupMessages // Note: ProtocolEngine doesn't have access to any information necessary to encrypt or decrypt GroupMessages
// Protocol Engine *can* associate Group Identifiers with Group Servers, although we don't currently make use of this fact
// other than to route errors back to the UI.
type Engine interface { type Engine interface {
ACN() connectivity.ACN ACN() connectivity.ACN
EventManager() event.Manager EventManager() event.Manager
@ -133,7 +134,7 @@ func (e *engine) eventHandler() {
// will result in a full sync // will result in a full sync
signature = []byte{} signature = []byte{}
} }
e.peerWithTokenServer(ev.Data[event.GroupServer], ev.Data[event.ServerTokenOnion], ev.Data[event.ServerTokenY], signature) go e.peerWithTokenServer(ev.Data[event.GroupServer], ev.Data[event.ServerTokenOnion], ev.Data[event.ServerTokenY], signature)
case event.LeaveServer: case event.LeaveServer:
es, ok := e.ephemeralServices.Load(ev.Data[event.GroupServer]) es, ok := e.ephemeralServices.Load(ev.Data[event.GroupServer])
if ok { if ok {
@ -151,10 +152,7 @@ func (e *engine) eventHandler() {
case event.SendMessageToGroup: case event.SendMessageToGroup:
ciphertext, _ := base64.StdEncoding.DecodeString(ev.Data[event.Ciphertext]) ciphertext, _ := base64.StdEncoding.DecodeString(ev.Data[event.Ciphertext])
signature, _ := base64.StdEncoding.DecodeString(ev.Data[event.Signature]) signature, _ := base64.StdEncoding.DecodeString(ev.Data[event.Signature])
err := e.sendMessageToGroup(ev.Data[event.GroupServer], ciphertext, signature) go e.sendMessageToGroup(ev.Data[event.GroupID], ev.Data[event.GroupServer], ciphertext, signature)
if err != nil {
e.eventManager.Publish(event.NewEvent(event.SendMessageToGroupError, map[event.Field]string{event.GroupServer: ev.Data[event.GroupServer], event.EventID: ev.EventID, event.Error: err.Error()}))
}
case event.SendMessageToPeer: case event.SendMessageToPeer:
// TODO: remove this passthrough once the UI is integrated. // TODO: remove this passthrough once the UI is integrated.
context, ok := ev.Data[event.EventContext] context, ok := ev.Data[event.EventContext]
@ -443,31 +441,38 @@ func (e *engine) receiveGroupMessage(server string, gm *groups.EncryptedGroupMes
} }
// sendMessageToGroup attempts to sent the given message to the given group id. // sendMessageToGroup attempts to sent the given message to the given group id.
func (e *engine) sendMessageToGroup(server string, ct []byte, sig []byte) error { func (e *engine) sendMessageToGroup(groupID string, server string, ct []byte, sig []byte) {
es, ok := e.ephemeralServices.Load(server) es, ok := e.ephemeralServices.Load(server)
if !ok { if !ok {
return fmt.Errorf("no service exists for group %v", server) e.eventManager.Publish(event.NewEvent(event.SendMessageToGroupError, map[event.Field]string{event.GroupID: groupID, event.GroupServer: server, event.Error: "server-not-found", event.Signature: base64.StdEncoding.EncodeToString(sig)}))
} }
ephemeralService := es.(tapir.Service) ephemeralService := es.(tapir.Service)
conn, err := ephemeralService.WaitForCapabilityOrClose(server, groups.CwtchServerSyncedCapability) conn, err := ephemeralService.WaitForCapabilityOrClose(server, groups.CwtchServerSyncedCapability)
if err == nil { if err == nil {
tokenApp, ok := (conn.App()).(*TokenBoardClient) tokenApp, ok := (conn.App()).(*TokenBoardClient)
if ok { if ok {
attempts := 0 if spent, numtokens := tokenApp.Post(ct, sig); spent == false {
for tokenApp.Post(ct, sig) == false { // TODO: while this works for the spam guard, it won't work for other forms of payment...
// TODO This should eventually be wired back into the UI to allow it to error // Make an -inline- payment, this will hold the goroutine
tokenApp.MakePayment() if err := tokenApp.MakePayment(); err == nil {
time.Sleep(time.Second * 5) // This really shouldn't fail since we now know we have the required tokens...
attempts++ if spent, _ := tokenApp.Post(ct, sig); spent == false {
if attempts == 5 { e.eventManager.Publish(event.NewEvent(event.SendMessageToGroupError, map[event.Field]string{event.GroupID: groupID, event.GroupServer: server, event.Error: err.Error(), event.Signature: base64.StdEncoding.EncodeToString(sig)}))
return errors.New("failed to post to token board") }
} else {
// Broadast the token error
e.eventManager.Publish(event.NewEvent(event.SendMessageToGroupError, map[event.Field]string{event.GroupID: groupID, event.GroupServer: server, event.Error: err.Error(), event.Signature: base64.StdEncoding.EncodeToString(sig)}))
} }
} else if numtokens < 5 {
go tokenApp.MakePayment()
} }
return nil // regardless we return....
return
} }
return errors.New("failed type assertion conn.App != TokenBoardClientApp")
} }
return err e.eventManager.Publish(event.NewEvent(event.SendMessageToGroupError, map[event.Field]string{event.GroupID: groupID, event.GroupServer: server, event.Error: "server-connection-not-valid", event.Signature: base64.StdEncoding.EncodeToString(sig)}))
} }
func (e *engine) handlePeerMessage(hostname string, eventID string, context string, message []byte) { func (e *engine) handlePeerMessage(hostname string, eventID string, context string, message []byte) {

View File

@ -12,6 +12,7 @@ import (
"git.openprivacy.ca/openprivacy/connectivity" "git.openprivacy.ca/openprivacy/connectivity"
"git.openprivacy.ca/openprivacy/log" "git.openprivacy.ca/openprivacy/log"
"github.com/gtank/ristretto255" "github.com/gtank/ristretto255"
"sync"
) )
// NewTokenBoardClient generates a new Client for Token Board // NewTokenBoardClient generates a new Client for Token Board
@ -39,6 +40,7 @@ type TokenBoardClient struct {
// Token service handling // Token service handling
acn connectivity.ACN acn connectivity.ACN
tokens []*privacypass.Token tokens []*privacypass.Token
tokenLock sync.Mutex
tokenService *privacypass.TokenServer tokenService *privacypass.TokenServer
tokenServiceOnion string tokenServiceOnion string
lastKnownSignature []byte lastKnownSignature []byte
@ -65,6 +67,9 @@ func (ta *TokenBoardClient) Init(connection tapir.Connection) {
ta.connection.SetCapability(groups.CwtchServerSyncedCapability) ta.connection.SetCapability(groups.CwtchServerSyncedCapability)
log.Debugf("Successfully Initialized Connection") log.Debugf("Successfully Initialized Connection")
go ta.Listen() go ta.Listen()
// Optimistically acquire many tokens for this server...
go ta.MakePayment()
go ta.MakePayment()
ta.Replay() ta.Replay()
} else { } else {
connection.Close() connection.Close()
@ -145,21 +150,21 @@ func (ta *TokenBoardClient) PurchaseTokens() {
} }
// Post sends a Post Request to the server // Post sends a Post Request to the server
func (ta *TokenBoardClient) Post(ct []byte, sig []byte) bool { func (ta *TokenBoardClient) Post(ct []byte, sig []byte) (bool, int) {
egm := groups.EncryptedGroupMessage{Ciphertext: ct, Signature: sig} egm := groups.EncryptedGroupMessage{Ciphertext: ct, Signature: sig}
token, err := ta.NextToken(egm.ToBytes(), ta.connection.Hostname()) token, numTokens, err := ta.NextToken(egm.ToBytes(), ta.connection.Hostname())
if err == nil { if err == nil {
data, _ := json.Marshal(groups.Message{MessageType: groups.PostRequestMessage, PostRequest: &groups.PostRequest{EGM: egm, Token: token}}) data, _ := json.Marshal(groups.Message{MessageType: groups.PostRequestMessage, PostRequest: &groups.PostRequest{EGM: egm, Token: token}})
log.Debugf("Message Length: %s %v", data, len(data)) log.Debugf("Message Length: %s %v", data, len(data))
ta.connection.Send(data) ta.connection.Send(data)
return true return true, numTokens
} }
log.Debugf("No Valid Tokens: %v", err) log.Debugf("No Valid Tokens: %v", err)
return false return false, numTokens
} }
// MakePayment uses the PoW based token protocol to obtain more tokens // MakePayment uses the PoW based token protocol to obtain more tokens
func (ta *TokenBoardClient) MakePayment() { func (ta *TokenBoardClient) MakePayment() error {
log.Debugf("Making a Payment %v", ta) log.Debugf("Making a Payment %v", ta)
id, sk := primitives.InitializeEphemeralIdentity() id, sk := primitives.InitializeEphemeralIdentity()
var client tapir.Service var client tapir.Service
@ -172,23 +177,34 @@ func (ta *TokenBoardClient) MakePayment() {
ChainApplication(new(applications.ProofOfWorkApplication), applications.SuccessfulProofOfWorkCapability). ChainApplication(new(applications.ProofOfWorkApplication), applications.SuccessfulProofOfWorkCapability).
ChainApplication(tokenApplication, applications.HasTokensCapability) ChainApplication(tokenApplication, applications.HasTokensCapability)
client.Connect(ta.tokenServiceOnion, powTokenApp) client.Connect(ta.tokenServiceOnion, powTokenApp)
log.Debugf("Waiting for successful PoW Auth...")
conn, err := client.WaitForCapabilityOrClose(ta.tokenServiceOnion, applications.HasTokensCapability) conn, err := client.WaitForCapabilityOrClose(ta.tokenServiceOnion, applications.HasTokensCapability)
if err == nil { if err == nil {
powtapp, _ := conn.App().(*applications.TokenApplication) powtapp, _ := conn.App().(*applications.TokenApplication)
// Update tokens...we need a lock here to prevent SpendToken from modifying the tokens
// during this process..
log.Debugf("Updating Tokens")
ta.tokenLock.Lock()
ta.tokens = append(ta.tokens, powtapp.Tokens...) ta.tokens = append(ta.tokens, powtapp.Tokens...)
ta.tokenLock.Unlock()
log.Debugf("Transcript: %v", powtapp.Transcript().OutputTranscriptToAudit()) log.Debugf("Transcript: %v", powtapp.Transcript().OutputTranscriptToAudit())
conn.Close() conn.Close()
return return nil
} }
log.Debugf("Error making payment: to %v %v", ta.tokenServiceOnion, err) log.Debugf("Error making payment: to %v %v", ta.tokenServiceOnion, err)
return err
} }
// NextToken retrieves the next token // NextToken retrieves the next token
func (ta *TokenBoardClient) NextToken(data []byte, hostname string) (privacypass.SpentToken, error) { func (ta *TokenBoardClient) NextToken(data []byte, hostname string) (privacypass.SpentToken, int, error) {
// Taken the first new token, we need a lock here because tokens can be appended by MakePayment
// which could result in weird behaviour...
ta.tokenLock.Lock()
defer ta.tokenLock.Unlock()
if len(ta.tokens) == 0 { if len(ta.tokens) == 0 {
return privacypass.SpentToken{}, errors.New("No more tokens") return privacypass.SpentToken{}, len(ta.tokens), errors.New("No more tokens")
} }
token := ta.tokens[0] token := ta.tokens[0]
ta.tokens = ta.tokens[1:] ta.tokens = ta.tokens[1:]
return token.SpendToken(append(data, hostname...)), nil return token.SpendToken(append(data, hostname...)), len(ta.tokens), nil
} }