forked from cwtch.im/cwtch
Don't inline group server connections...
This commit is contained in:
parent
00dc2e60e5
commit
e7fc228cfa
|
@ -473,8 +473,8 @@ func (cp *cwtchPeer) JoinServer(onion string) error {
|
||||||
// It returns the signature of the message which can be used to identify it in any UX layer.
|
// It returns the signature of the message which can be used to identify it in any UX layer.
|
||||||
func (cp *cwtchPeer) SendMessageToGroupTracked(groupid string, message string) (string, error) {
|
func (cp *cwtchPeer) SendMessageToGroupTracked(groupid string, message string) (string, error) {
|
||||||
cp.mutex.Lock()
|
cp.mutex.Lock()
|
||||||
group := cp.Profile.GetGroup(groupid)
|
|
||||||
defer cp.mutex.Unlock()
|
defer cp.mutex.Unlock()
|
||||||
|
group := cp.Profile.GetGroup(groupid)
|
||||||
|
|
||||||
if group == nil {
|
if group == nil {
|
||||||
return "", errors.New("invalid group id")
|
return "", errors.New("invalid group id")
|
||||||
|
|
|
@ -7,7 +7,6 @@ import (
|
||||||
"encoding/base64"
|
"encoding/base64"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
|
||||||
"git.openprivacy.ca/cwtch.im/tapir"
|
"git.openprivacy.ca/cwtch.im/tapir"
|
||||||
"git.openprivacy.ca/cwtch.im/tapir/networks/tor"
|
"git.openprivacy.ca/cwtch.im/tapir/networks/tor"
|
||||||
"git.openprivacy.ca/cwtch.im/tapir/primitives"
|
"git.openprivacy.ca/cwtch.im/tapir/primitives"
|
||||||
|
@ -135,7 +134,7 @@ func (e *engine) eventHandler() {
|
||||||
// will result in a full sync
|
// will result in a full sync
|
||||||
signature = []byte{}
|
signature = []byte{}
|
||||||
}
|
}
|
||||||
e.peerWithTokenServer(ev.Data[event.GroupServer], ev.Data[event.ServerTokenOnion], ev.Data[event.ServerTokenY], signature)
|
go e.peerWithTokenServer(ev.Data[event.GroupServer], ev.Data[event.ServerTokenOnion], ev.Data[event.ServerTokenY], signature)
|
||||||
case event.LeaveServer:
|
case event.LeaveServer:
|
||||||
es, ok := e.ephemeralServices.Load(ev.Data[event.GroupServer])
|
es, ok := e.ephemeralServices.Load(ev.Data[event.GroupServer])
|
||||||
if ok {
|
if ok {
|
||||||
|
@ -153,10 +152,7 @@ func (e *engine) eventHandler() {
|
||||||
case event.SendMessageToGroup:
|
case event.SendMessageToGroup:
|
||||||
ciphertext, _ := base64.StdEncoding.DecodeString(ev.Data[event.Ciphertext])
|
ciphertext, _ := base64.StdEncoding.DecodeString(ev.Data[event.Ciphertext])
|
||||||
signature, _ := base64.StdEncoding.DecodeString(ev.Data[event.Signature])
|
signature, _ := base64.StdEncoding.DecodeString(ev.Data[event.Signature])
|
||||||
err := e.sendMessageToGroup(ev.Data[event.GroupServer], ciphertext, signature)
|
go e.sendMessageToGroup(ev.Data[event.GroupID], ev.Data[event.GroupServer], ciphertext, signature)
|
||||||
if err != nil {
|
|
||||||
e.eventManager.Publish(event.NewEvent(event.SendMessageToGroupError, map[event.Field]string{event.GroupID: ev.Data[event.GroupID], event.GroupServer: ev.Data[event.GroupServer], event.EventID: ev.EventID, event.Error: err.Error(), event.Signature: ev.Data[event.Signature]}))
|
|
||||||
}
|
|
||||||
case event.SendMessageToPeer:
|
case event.SendMessageToPeer:
|
||||||
// TODO: remove this passthrough once the UI is integrated.
|
// TODO: remove this passthrough once the UI is integrated.
|
||||||
context, ok := ev.Data[event.EventContext]
|
context, ok := ev.Data[event.EventContext]
|
||||||
|
@ -445,31 +441,38 @@ func (e *engine) receiveGroupMessage(server string, gm *groups.EncryptedGroupMes
|
||||||
}
|
}
|
||||||
|
|
||||||
// sendMessageToGroup attempts to sent the given message to the given group id.
|
// sendMessageToGroup attempts to sent the given message to the given group id.
|
||||||
func (e *engine) sendMessageToGroup(server string, ct []byte, sig []byte) error {
|
func (e *engine) sendMessageToGroup(groupID string, server string, ct []byte, sig []byte) {
|
||||||
|
|
||||||
es, ok := e.ephemeralServices.Load(server)
|
es, ok := e.ephemeralServices.Load(server)
|
||||||
if !ok {
|
if !ok {
|
||||||
return fmt.Errorf("no service exists for group %v", server)
|
e.eventManager.Publish(event.NewEvent(event.SendMessageToGroupError, map[event.Field]string{event.GroupID: groupID, event.GroupServer: server, event.Error: "server-not-found", event.Signature: base64.StdEncoding.EncodeToString(sig)}))
|
||||||
}
|
}
|
||||||
ephemeralService := es.(tapir.Service)
|
ephemeralService := es.(tapir.Service)
|
||||||
|
|
||||||
conn, err := ephemeralService.WaitForCapabilityOrClose(server, groups.CwtchServerSyncedCapability)
|
conn, err := ephemeralService.WaitForCapabilityOrClose(server, groups.CwtchServerSyncedCapability)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
tokenApp, ok := (conn.App()).(*TokenBoardClient)
|
tokenApp, ok := (conn.App()).(*TokenBoardClient)
|
||||||
if ok {
|
if ok {
|
||||||
attempts := 0
|
if spent, numtokens := tokenApp.Post(ct, sig); spent == false {
|
||||||
for tokenApp.Post(ct, sig) == false {
|
// TODO: while this works for the spam guard, it won't work for other forms of payment...
|
||||||
// TODO This should eventually be wired back into the UI to allow it to error
|
// Make an -inline- payment, this will hold the goroutine
|
||||||
tokenApp.MakePayment()
|
if err := tokenApp.MakePayment(); err == nil {
|
||||||
time.Sleep(time.Second * 5)
|
// This really shouldn't fail since we now know we have the required tokens...
|
||||||
attempts++
|
if spent, _ := tokenApp.Post(ct, sig); spent == false {
|
||||||
if attempts == 5 {
|
e.eventManager.Publish(event.NewEvent(event.SendMessageToGroupError, map[event.Field]string{event.GroupID: groupID, event.GroupServer: server, event.Error: err.Error(), event.Signature: base64.StdEncoding.EncodeToString(sig)}))
|
||||||
return errors.New("failed to post to token board")
|
}
|
||||||
|
} else {
|
||||||
|
// Broadast the token error
|
||||||
|
e.eventManager.Publish(event.NewEvent(event.SendMessageToGroupError, map[event.Field]string{event.GroupID: groupID, event.GroupServer: server, event.Error: err.Error(), event.Signature: base64.StdEncoding.EncodeToString(sig)}))
|
||||||
}
|
}
|
||||||
|
} else if numtokens < 5 {
|
||||||
|
go tokenApp.MakePayment()
|
||||||
}
|
}
|
||||||
return nil
|
// regardless we return....
|
||||||
|
return
|
||||||
}
|
}
|
||||||
return errors.New("failed type assertion conn.App != TokenBoardClientApp")
|
|
||||||
}
|
}
|
||||||
return err
|
e.eventManager.Publish(event.NewEvent(event.SendMessageToGroupError, map[event.Field]string{event.GroupID: groupID, event.GroupServer: server, event.Error: "server-connection-not-valid", event.Signature: base64.StdEncoding.EncodeToString(sig)}))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *engine) handlePeerMessage(hostname string, eventID string, context string, message []byte) {
|
func (e *engine) handlePeerMessage(hostname string, eventID string, context string, message []byte) {
|
||||||
|
|
|
@ -12,6 +12,7 @@ import (
|
||||||
"git.openprivacy.ca/openprivacy/connectivity"
|
"git.openprivacy.ca/openprivacy/connectivity"
|
||||||
"git.openprivacy.ca/openprivacy/log"
|
"git.openprivacy.ca/openprivacy/log"
|
||||||
"github.com/gtank/ristretto255"
|
"github.com/gtank/ristretto255"
|
||||||
|
"sync"
|
||||||
)
|
)
|
||||||
|
|
||||||
// NewTokenBoardClient generates a new Client for Token Board
|
// NewTokenBoardClient generates a new Client for Token Board
|
||||||
|
@ -39,6 +40,7 @@ type TokenBoardClient struct {
|
||||||
// Token service handling
|
// Token service handling
|
||||||
acn connectivity.ACN
|
acn connectivity.ACN
|
||||||
tokens []*privacypass.Token
|
tokens []*privacypass.Token
|
||||||
|
tokenLock sync.Mutex
|
||||||
tokenService *privacypass.TokenServer
|
tokenService *privacypass.TokenServer
|
||||||
tokenServiceOnion string
|
tokenServiceOnion string
|
||||||
lastKnownSignature []byte
|
lastKnownSignature []byte
|
||||||
|
@ -65,6 +67,9 @@ func (ta *TokenBoardClient) Init(connection tapir.Connection) {
|
||||||
ta.connection.SetCapability(groups.CwtchServerSyncedCapability)
|
ta.connection.SetCapability(groups.CwtchServerSyncedCapability)
|
||||||
log.Debugf("Successfully Initialized Connection")
|
log.Debugf("Successfully Initialized Connection")
|
||||||
go ta.Listen()
|
go ta.Listen()
|
||||||
|
// Optimistically acquire many tokens for this server...
|
||||||
|
go ta.MakePayment()
|
||||||
|
go ta.MakePayment()
|
||||||
ta.Replay()
|
ta.Replay()
|
||||||
} else {
|
} else {
|
||||||
connection.Close()
|
connection.Close()
|
||||||
|
@ -145,21 +150,21 @@ func (ta *TokenBoardClient) PurchaseTokens() {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Post sends a Post Request to the server
|
// Post sends a Post Request to the server
|
||||||
func (ta *TokenBoardClient) Post(ct []byte, sig []byte) bool {
|
func (ta *TokenBoardClient) Post(ct []byte, sig []byte) (bool, int) {
|
||||||
egm := groups.EncryptedGroupMessage{Ciphertext: ct, Signature: sig}
|
egm := groups.EncryptedGroupMessage{Ciphertext: ct, Signature: sig}
|
||||||
token, err := ta.NextToken(egm.ToBytes(), ta.connection.Hostname())
|
token, numTokens, err := ta.NextToken(egm.ToBytes(), ta.connection.Hostname())
|
||||||
if err == nil {
|
if err == nil {
|
||||||
data, _ := json.Marshal(groups.Message{MessageType: groups.PostRequestMessage, PostRequest: &groups.PostRequest{EGM: egm, Token: token}})
|
data, _ := json.Marshal(groups.Message{MessageType: groups.PostRequestMessage, PostRequest: &groups.PostRequest{EGM: egm, Token: token}})
|
||||||
log.Debugf("Message Length: %s %v", data, len(data))
|
log.Debugf("Message Length: %s %v", data, len(data))
|
||||||
ta.connection.Send(data)
|
ta.connection.Send(data)
|
||||||
return true
|
return true, numTokens
|
||||||
}
|
}
|
||||||
log.Debugf("No Valid Tokens: %v", err)
|
log.Debugf("No Valid Tokens: %v", err)
|
||||||
return false
|
return false, numTokens
|
||||||
}
|
}
|
||||||
|
|
||||||
// MakePayment uses the PoW based token protocol to obtain more tokens
|
// MakePayment uses the PoW based token protocol to obtain more tokens
|
||||||
func (ta *TokenBoardClient) MakePayment() {
|
func (ta *TokenBoardClient) MakePayment() error {
|
||||||
log.Debugf("Making a Payment %v", ta)
|
log.Debugf("Making a Payment %v", ta)
|
||||||
id, sk := primitives.InitializeEphemeralIdentity()
|
id, sk := primitives.InitializeEphemeralIdentity()
|
||||||
var client tapir.Service
|
var client tapir.Service
|
||||||
|
@ -172,23 +177,34 @@ func (ta *TokenBoardClient) MakePayment() {
|
||||||
ChainApplication(new(applications.ProofOfWorkApplication), applications.SuccessfulProofOfWorkCapability).
|
ChainApplication(new(applications.ProofOfWorkApplication), applications.SuccessfulProofOfWorkCapability).
|
||||||
ChainApplication(tokenApplication, applications.HasTokensCapability)
|
ChainApplication(tokenApplication, applications.HasTokensCapability)
|
||||||
client.Connect(ta.tokenServiceOnion, powTokenApp)
|
client.Connect(ta.tokenServiceOnion, powTokenApp)
|
||||||
|
log.Debugf("Waiting for successful PoW Auth...")
|
||||||
conn, err := client.WaitForCapabilityOrClose(ta.tokenServiceOnion, applications.HasTokensCapability)
|
conn, err := client.WaitForCapabilityOrClose(ta.tokenServiceOnion, applications.HasTokensCapability)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
powtapp, _ := conn.App().(*applications.TokenApplication)
|
powtapp, _ := conn.App().(*applications.TokenApplication)
|
||||||
|
// Update tokens...we need a lock here to prevent SpendToken from modifying the tokens
|
||||||
|
// during this process..
|
||||||
|
log.Debugf("Updating Tokens")
|
||||||
|
ta.tokenLock.Lock()
|
||||||
ta.tokens = append(ta.tokens, powtapp.Tokens...)
|
ta.tokens = append(ta.tokens, powtapp.Tokens...)
|
||||||
|
ta.tokenLock.Unlock()
|
||||||
log.Debugf("Transcript: %v", powtapp.Transcript().OutputTranscriptToAudit())
|
log.Debugf("Transcript: %v", powtapp.Transcript().OutputTranscriptToAudit())
|
||||||
conn.Close()
|
conn.Close()
|
||||||
return
|
return nil
|
||||||
}
|
}
|
||||||
log.Debugf("Error making payment: to %v %v", ta.tokenServiceOnion, err)
|
log.Debugf("Error making payment: to %v %v", ta.tokenServiceOnion, err)
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// NextToken retrieves the next token
|
// NextToken retrieves the next token
|
||||||
func (ta *TokenBoardClient) NextToken(data []byte, hostname string) (privacypass.SpentToken, error) {
|
func (ta *TokenBoardClient) NextToken(data []byte, hostname string) (privacypass.SpentToken, int, error) {
|
||||||
|
// Taken the first new token, we need a lock here because tokens can be appended by MakePayment
|
||||||
|
// which could result in weird behaviour...
|
||||||
|
ta.tokenLock.Lock()
|
||||||
|
defer ta.tokenLock.Unlock()
|
||||||
if len(ta.tokens) == 0 {
|
if len(ta.tokens) == 0 {
|
||||||
return privacypass.SpentToken{}, errors.New("No more tokens")
|
return privacypass.SpentToken{}, len(ta.tokens), errors.New("No more tokens")
|
||||||
}
|
}
|
||||||
token := ta.tokens[0]
|
token := ta.tokens[0]
|
||||||
ta.tokens = ta.tokens[1:]
|
ta.tokens = ta.tokens[1:]
|
||||||
return token.SpendToken(append(data, hostname...)), nil
|
return token.SpendToken(append(data, hostname...)), len(ta.tokens), nil
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue