Merge branch 'filesharing' of git.openprivacy.ca:cwtch.im/cwtch into erinnnewfilesharing

This commit is contained in:
erinn 2021-09-15 13:35:22 -07:00
commit f0c4c4a92e
6 changed files with 95 additions and 40 deletions

4
go.mod
View File

@ -3,9 +3,9 @@ module cwtch.im/cwtch
go 1.14
require (
git.openprivacy.ca/cwtch.im/tapir v0.4.5
git.openprivacy.ca/cwtch.im/tapir v0.4.6
git.openprivacy.ca/openprivacy/connectivity v1.4.5
git.openprivacy.ca/openprivacy/log v1.0.2
git.openprivacy.ca/openprivacy/log v1.0.3
github.com/gtank/ristretto255 v0.1.2
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
github.com/struCoder/pidusage v0.1.3

4
go.sum
View File

@ -20,6 +20,8 @@ git.openprivacy.ca/cwtch.im/tapir v0.4.4 h1:KyuTVmr9GYptTCeR7JDODjmhBBbnIBf9V3NS
git.openprivacy.ca/cwtch.im/tapir v0.4.4/go.mod h1:qMFTdmDZITc1BLP1jSW0gVpLmvpg+Zjsh5ek8StwbFE=
git.openprivacy.ca/cwtch.im/tapir v0.4.5 h1:CTMFdH2qBt4dkiyzNbxuQpK52iKerZmFnG2SfLLSmS8=
git.openprivacy.ca/cwtch.im/tapir v0.4.5/go.mod h1:qMFTdmDZITc1BLP1jSW0gVpLmvpg+Zjsh5ek8StwbFE=
git.openprivacy.ca/cwtch.im/tapir v0.4.6 h1:hoKIXNfsWjTiLry9NYTBc9VJYSST3WydHv+Wrt+bn8s=
git.openprivacy.ca/cwtch.im/tapir v0.4.6/go.mod h1:qMFTdmDZITc1BLP1jSW0gVpLmvpg+Zjsh5ek8StwbFE=
git.openprivacy.ca/openprivacy/bine v0.0.4 h1:CO7EkGyz+jegZ4ap8g5NWRuDHA/56KKvGySR6OBPW+c=
git.openprivacy.ca/openprivacy/bine v0.0.4/go.mod h1:13ZqhKyqakDsN/ZkQkIGNULsmLyqtXc46XBcnuXm/mU=
git.openprivacy.ca/openprivacy/connectivity v1.4.0 h1:c7AANUCrlA4hIqXxIGDOWMtSe8CpDleD1877PShScbM=
@ -37,6 +39,8 @@ git.openprivacy.ca/openprivacy/connectivity v1.4.5/go.mod h1:JVRCIdL+lAG6ohBFWiK
git.openprivacy.ca/openprivacy/log v1.0.1/go.mod h1:gGYK8xHtndRLDymFtmjkG26GaMQNgyhioNS82m812Iw=
git.openprivacy.ca/openprivacy/log v1.0.2 h1:HLP4wsw4ljczFAelYnbObIs821z+jgMPCe8uODPnGQM=
git.openprivacy.ca/openprivacy/log v1.0.2/go.mod h1:gGYK8xHtndRLDymFtmjkG26GaMQNgyhioNS82m812Iw=
git.openprivacy.ca/openprivacy/log v1.0.3 h1:E/PMm4LY+Q9s3aDpfySfEDq/vYQontlvNj/scrPaga0=
git.openprivacy.ca/openprivacy/log v1.0.3/go.mod h1:gGYK8xHtndRLDymFtmjkG26GaMQNgyhioNS82m812Iw=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=

View File

@ -160,7 +160,7 @@ func (e *engine) eventHandler() {
case event.SendMessageToGroup:
ciphertext, _ := base64.StdEncoding.DecodeString(ev.Data[event.Ciphertext])
signature, _ := base64.StdEncoding.DecodeString(ev.Data[event.Signature])
go e.sendMessageToGroup(ev.Data[event.GroupID], ev.Data[event.GroupServer], ciphertext, signature)
go e.sendMessageToGroup(ev.Data[event.GroupID], ev.Data[event.GroupServer], ciphertext, signature, 0)
case event.SendMessageToPeer:
// TODO: remove this passthrough once the UI is integrated.
context, ok := ev.Data[event.EventContext]
@ -450,7 +450,7 @@ func (e *engine) sendGetValToPeer(eventID, onion, scope, path string) error {
return err
}
e.getValRequests.Store(onion + eventID, message)
e.getValRequests.Store(onion+eventID, message)
return e.sendPeerMessage(onion, model3.PeerMessage{ID: eventID, Context: event.ContextGetVal, Data: message})
}
@ -480,7 +480,18 @@ func (e *engine) receiveGroupMessage(server string, gm *groups.EncryptedGroupMes
}
// sendMessageToGroup attempts to sent the given message to the given group id.
func (e *engine) sendMessageToGroup(groupID string, server string, ct []byte, sig []byte) {
func (e *engine) sendMessageToGroup(groupID string, server string, ct []byte, sig []byte, attempts int) {
// sending to groups can fail for a few reasons (slow server, not enough tokens, etc.)
// rather than trying to keep all that logic in method we simply back-off and try again
// but if we fail more than 5 times then we report back to the client so they can investigate other options.
// Note: This flow only applies to online-and-connected servers (this method will return faster if the server is not
// online)
if attempts >= 5 {
log.Errorf("failed to post a message to a group after %v attempts", attempts)
e.eventManager.Publish(event.NewEvent(event.SendMessageToGroupError, map[event.Field]string{event.GroupID: groupID, event.GroupServer: server, event.Error: "could not make payment to server", event.Signature: base64.StdEncoding.EncodeToString(sig)}))
return
}
es, ok := e.ephemeralServices.Load(server)
if es == nil || !ok {
@ -494,19 +505,16 @@ func (e *engine) sendMessageToGroup(groupID string, server string, ct []byte, si
tokenApp, ok := (conn.App()).(*TokenBoardClient)
if ok {
if spent, numtokens := tokenApp.Post(ct, sig); !spent {
// TODO: while this works for the spam guard, it won't work for other forms of payment...
// Make an -inline- payment, this will hold the goroutine
if err := tokenApp.MakePayment(); err == nil {
// This really shouldn't fail since we now know we have the required tokens...
if spent, _ := tokenApp.Post(ct, sig); !spent {
e.eventManager.Publish(event.NewEvent(event.SendMessageToGroupError, map[event.Field]string{event.GroupID: groupID, event.GroupServer: server, event.Error: err.Error(), event.Signature: base64.StdEncoding.EncodeToString(sig)}))
}
} else {
// Broadcast the token error
e.eventManager.Publish(event.NewEvent(event.SendMessageToGroupError, map[event.Field]string{event.GroupID: groupID, event.GroupServer: server, event.Error: err.Error(), event.Signature: base64.StdEncoding.EncodeToString(sig)}))
}
} else if numtokens < 5 {
// we failed to post, probably because we ran out of tokens... so make a payment
go tokenApp.MakePayment()
// backoff
time.Sleep(time.Second)
// try again
e.sendMessageToGroup(groupID, server, ct, sig, attempts+1)
} else {
if numtokens < 5 {
go tokenApp.MakePayment()
}
}
// regardless we return....
return
@ -519,16 +527,16 @@ func (e *engine) sendMessageToGroup(groupID string, server string, ct []byte, si
func (e *engine) handlePeerMessage(hostname string, eventID string, context string, message []byte) {
log.Debugf("New message from peer: %v %v", hostname, context)
if context == event.ContextAck {
if context == event.ContextAck {
e.peerAck(hostname, eventID)
} else if context == event.ContextRetVal {
req, ok := e.getValRequests.Load(hostname + eventID)
req, ok := e.getValRequests.Load(hostname + eventID)
if ok {
reqStr := req.([]byte)
e.handlePeerRetVal(hostname, reqStr, message)
e.getValRequests.Delete(hostname + eventID)
e.getValRequests.Delete(hostname + eventID)
} else {
log.Errorf("could not find val request for %v %s",hostname, eventID)
log.Errorf("could not find val request for %v %s", hostname, eventID)
}
} else if context == event.ContextGetVal {
var getVal peerGetVal

View File

@ -79,9 +79,9 @@ func (pa *PeerApp) listen() {
var peerMessage model2.PeerMessage
err := json.Unmarshal(message, &peerMessage)
if err == nil {
if pa.IsAllowed(pa.connection.Hostname()) {
pa.MessageHandler(pa.connection.Hostname(), peerMessage.ID, peerMessage.Context, peerMessage.Data)
}
if pa.IsAllowed(pa.connection.Hostname()) {
pa.MessageHandler(pa.connection.Hostname(), peerMessage.ID, peerMessage.Context, peerMessage.Data)
}
} else {
log.Errorf("Error unmarshalling PeerMessage package: %x %v", message, err)
}

View File

@ -12,7 +12,9 @@ import (
"git.openprivacy.ca/openprivacy/connectivity"
"git.openprivacy.ca/openprivacy/log"
"github.com/gtank/ristretto255"
"reflect"
"sync"
"time"
)
// NewTokenBoardClient generates a new Client for Token Board
@ -179,23 +181,38 @@ func (ta *TokenBoardClient) MakePayment() error {
powTokenApp := new(applications.ApplicationChain).
ChainApplication(new(applications.ProofOfWorkApplication), applications.SuccessfulProofOfWorkCapability).
ChainApplication(tokenApplication, applications.HasTokensCapability)
client.Connect(ta.tokenServiceOnion, powTokenApp)
log.Debugf("Waiting for successful PoW Auth...")
conn, err := client.WaitForCapabilityOrClose(ta.tokenServiceOnion, applications.HasTokensCapability)
if err == nil {
powtapp, _ := conn.App().(*applications.TokenApplication)
// Update tokens...we need a lock here to prevent SpendToken from modifying the tokens
// during this process..
log.Debugf("Updating Tokens")
ta.tokenLock.Lock()
ta.tokens = append(ta.tokens, powtapp.Tokens...)
ta.tokenLock.Unlock()
log.Debugf("Transcript: %v", powtapp.Transcript().OutputTranscriptToAudit())
conn.Close()
return nil
connected, err := client.Connect(ta.tokenServiceOnion, powTokenApp)
if connected == true && err == nil {
log.Debugf("Waiting for successful Token Acquisition...")
conn, err := client.WaitForCapabilityOrClose(ta.tokenServiceOnion, applications.HasTokensCapability)
if err == nil {
powtapp, ok := conn.App().(*applications.TokenApplication)
if ok {
// Update tokens...we need a lock here to prevent SpendToken from modifying the tokens
// during this process..
log.Debugf("Updating Tokens")
ta.tokenLock.Lock()
ta.tokens = append(ta.tokens, powtapp.Tokens...)
ta.tokenLock.Unlock()
log.Debugf("Transcript: %v", powtapp.Transcript().OutputTranscriptToAudit())
conn.Close()
return nil
}
log.Errorf("invalid cast of powapp. this should not happen %v %v", powtapp, reflect.TypeOf(conn.App()))
return errors.New("invalid cast of powapp. this should never happen")
}
log.Debugf("could not connect to payment server %v..trying again")
return ta.MakePayment()
} else if connected && err != nil {
log.Debugf("inexplicable error: %v", err)
}
log.Debugf("Error making payment: to %v %v", ta.tokenServiceOnion, err)
return err
log.Debugf("failed to make a connection. trying again...")
// it doesn't actually take that long to make a payment, so waiting a small amount of time should suffice
time.Sleep(time.Second)
return ta.MakePayment()
}
// NextToken retrieves the next token

View File

@ -1,4 +1,4 @@
package testing
package filesharing
import (
"crypto/rand"
@ -7,6 +7,9 @@ import (
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/functionality/filesharing"
"cwtch.im/cwtch/model"
"cwtch.im/cwtch/model/attr"
"cwtch.im/cwtch/peer"
"cwtch.im/cwtch/protocol/connections"
"cwtch.im/cwtch/protocol/files"
"encoding/base64"
"encoding/hex"
@ -24,6 +27,29 @@ import (
"time"
)
func waitForPeerPeerConnection(t *testing.T, peera peer.CwtchPeer, peerb peer.CwtchPeer) {
for {
state, ok := peera.GetPeerState(peerb.GetOnion())
if ok {
//log.Infof("Waiting for Peer %v to peer with peer: %v - state: %v\n", peera.GetProfile().Name, peerb.GetProfile().Name, state)
if state == connections.FAILED {
t.Fatalf("%v could not connect to %v", peera.GetOnion(), peerb.GetOnion())
}
if state != connections.AUTHENTICATED {
fmt.Printf("peer %v waiting connect to peer %v, currently: %v\n", peera.GetOnion(), peerb.GetOnion(), connections.ConnectionStateName[state])
time.Sleep(time.Second * 5)
continue
} else {
peerAName, _ := peera.GetAttribute(attr.GetLocalScope("name"))
peerBName, _ := peerb.GetAttribute(attr.GetLocalScope("name"))
fmt.Printf("%v CONNECTED and AUTHED to %v\n", peerAName, peerBName)
break
}
}
}
return
}
func TestFileSharing(t *testing.T) {
numGoRoutinesStart := runtime.NumGoroutine()