@ -1,12 +1,18 @@
package connections
import (
"encoding/base64"
"encoding/json"
"fmt"
"strconv"
"sync"
"time"
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/model"
"cwtch.im/cwtch/protocol/files"
"cwtch.im/cwtch/protocol/groups"
"encoding/base64"
"encoding/json"
"errors"
pmodel "cwtch.im/cwtch/protocol/model"
"git.openprivacy.ca/cwtch.im/tapir"
"git.openprivacy.ca/cwtch.im/tapir/applications"
"git.openprivacy.ca/cwtch.im/tapir/networks/tor"
@ -16,9 +22,6 @@ import (
"git.openprivacy.ca/openprivacy/log"
"github.com/gtank/ristretto255"
"golang.org/x/crypto/ed25519"
"strconv"
"sync"
"time"
)
type engine struct {
@ -40,12 +43,17 @@ type engine struct {
// Nextgen Tapir Service
service tapir . Service
getValRequests sync . Map // [string]string eventID:Data
// Nextgen Tapir Service
ephemeralServices sync . Map // string(onion) => tapir.Service
// Required for listen(), inaccessible from identity
privateKey ed25519 . PrivateKey
// file sharing subsystem is responsible for maintaining active shares and downloads
filesharingSubSystem files . FileSharingSubSystem
shuttingDown bool
}
@ -92,6 +100,11 @@ func NewProtocolEngine(identity primitives.Identity, privateKey ed25519.PrivateK
engine . eventManager . Subscribe ( event . BlockUnknownPeers , engine . queue )
engine . eventManager . Subscribe ( event . AllowUnknownPeers , engine . queue )
// File Handling
engine . eventManager . Subscribe ( event . ShareManifest , engine . queue )
engine . eventManager . Subscribe ( event . ManifestSizeReceived , engine . queue )
engine . eventManager . Subscribe ( event . ManifestSaved , engine . queue )
for peer , authorization := range peerAuthorizations {
engine . authorizations . Store ( peer , authorization )
}
@ -125,7 +138,10 @@ func (e *engine) eventHandler() {
go e . peerWithOnion ( ev . Data [ event . RemotePeer ] )
}
case event . InvitePeerToGroup :
e . sendMessageToPeer ( ev . EventID , ev . Data [ event . RemotePeer ] , event . ContextInvite , [ ] byte ( ev . Data [ event . GroupInvite ] ) )
err := e . sendPeerMessage ( ev . Data [ event . RemotePeer ] , pmodel . PeerMessage { ID : ev . EventID , Context : event . ContextInvite , Data : [ ] byte ( ev . Data [ event . GroupInvite ] ) } )
if err != nil {
}
case event . JoinServer :
signature , err := base64 . StdEncoding . DecodeString ( ev . Data [ event . Signature ] )
if err != nil {
@ -145,21 +161,24 @@ func (e *engine) eventHandler() {
case event . SendMessageToGroup :
ciphertext , _ := base64 . StdEncoding . DecodeString ( ev . Data [ event . Ciphertext ] )
signature , _ := base64 . StdEncoding . DecodeString ( ev . Data [ event . Signature ] )
go e . sendMessageToGroup ( ev . Data [ event . GroupID ] , ev . Data [ event . GroupServer ] , ciphertext , signature )
go e . sendMessageToGroup ( ev . Data [ event . GroupID ] , ev . Data [ event . GroupServer ] , ciphertext , signature , 0 )
case event . SendMessageToPeer :
// TODO: remove this passthrough once the UI is integrated.
context , ok := ev . Data [ event . EventContext ]
if ! ok {
context = event . ContextRaw
}
err := e . sendMessageToPeer ( ev . EventID , ev . Data [ event . RemotePeer ] , context , [ ] byte ( ev . Data [ event . Data ] ) )
if err != nil {
if err := e . sendPeerMessage ( ev . Data [ event . RemotePeer ] , pmodel . PeerMessage { ID : ev . EventID , Context : context , Data : [ ] byte ( ev . Data [ event . Data ] ) } ) ; err != nil {
e . eventManager . Publish ( event . NewEvent ( event . SendMessageToPeerError , map [ event . Field ] string { event . RemotePeer : ev . Data [ event . RemotePeer ] , event . EventID : ev . EventID , event . Error : "peer is offline or the connection has yet to finalize" } ) )
}
case event . SendGetValMessageToPeer :
e . sendGetValToPeer ( ev . EventID , ev . Data [ event . RemotePeer ] , ev . Data [ event . Scope ] , ev . Data [ event . Path ] )
if err := e . sendGetValToPeer ( ev . EventID , ev . Data [ event . RemotePeer ] , ev . Data [ event . Scope ] , ev . Data [ event . Path ] ) ; err != nil {
e . eventManager . Publish ( event . NewEvent ( event . SendMessageToPeerError , map [ event . Field ] string { event . RemotePeer : ev . Data [ event . RemotePeer ] , event . EventID : ev . EventID , event . Error : err . Error ( ) } ) )
}
case event . SendRetValMessageToPeer :
e . sendRetValToPeer ( ev . EventID , ev . Data [ event . RemotePeer ] , ev . Data [ event . Data ] , ev . Data [ event . Exists ] )
if err := e . sendRetValToPeer ( ev . EventID , ev . Data [ event . RemotePeer ] , ev . Data [ event . Data ] , ev . Data [ event . Exists ] ) ; err != nil {
e . eventManager . Publish ( event . NewEvent ( event . SendMessageToPeerError , map [ event . Field ] string { event . RemotePeer : ev . Data [ event . RemotePeer ] , event . EventID : ev . EventID , event . Error : err . Error ( ) } ) )
}
case event . SetPeerAuthorization :
auth := model . Authorization ( ev . Data [ event . Authorization ] )
e . authorizations . Store ( ev . Data [ event . RemotePeer ] , auth )
@ -181,6 +200,29 @@ func (e *engine) eventHandler() {
e . blockUnknownContacts = true
case event . ProtocolEngineStartListen :
go e . listenFn ( )
case event . ShareManifest :
e . filesharingSubSystem . ShareFile ( ev . Data [ event . FileKey ] , ev . Data [ event . SerializedManifest ] )
case event . ManifestSizeReceived :
handle := ev . Data [ event . Handle ]
key := ev . Data [ event . FileKey ]
size , _ := strconv . Atoi ( ev . Data [ event . ManifestSize ] )
if err := e . sendPeerMessage ( handle , e . filesharingSubSystem . FetchManifest ( key , uint64 ( size ) ) ) ; err != nil {
e . eventManager . Publish ( event . NewEvent ( event . SendMessageToPeerError , map [ event . Field ] string { event . RemotePeer : ev . Data [ event . RemotePeer ] , event . EventID : ev . EventID , event . Error : err . Error ( ) } ) )
}
case event . ManifestSaved :
handle := ev . Data [ event . Handle ]
key := ev . Data [ event . FileKey ]
serializedManifest := ev . Data [ event . SerializedManifest ]
tempFile := ev . Data [ event . TempFile ]
title := ev . Data [ event . NameSuggestion ]
// NOTE: for now there will probably only ever be a single chunk request. When we enable group
// sharing and rehosting then this loop will serve as a a way of splitting the request among multiple
// contacts
for _ , message := range e . filesharingSubSystem . CompileChunkRequests ( key , serializedManifest , tempFile , title ) {
if err := e . sendPeerMessage ( handle , message ) ; err != nil {
e . eventManager . Publish ( event . NewEvent ( event . SendMessageToPeerError , map [ event . Field ] string { event . RemotePeer : ev . Data [ event . RemotePeer ] , event . EventID : ev . EventID , event . Error : err . Error ( ) } ) )
}
}
default :
return
}
@ -217,7 +259,6 @@ func (e *engine) createPeerTemplate() *PeerApp {
peerAppTemplate . OnAuth = e . ignoreOnShutdown ( e . peerAuthed )
peerAppTemplate . OnConnecting = e . ignoreOnShutdown ( e . peerConnecting )
peerAppTemplate . OnClose = e . ignoreOnShutdown ( e . peerDisconnected )
peerAppTemplate . RetValHandler = e . handlePeerRetVal
return peerAppTemplate
}
@ -404,28 +445,16 @@ func (e *engine) peerDisconnected(onion string) {
} ) )
}
// sendMessageToPeer sends a message to a peer under a given context
func ( e * engine ) sendMessageToPeer ( eventID string , onion string , context string , message [ ] byte ) error {
conn , err := e . service . WaitForCapabilityOrClose ( onion , cwtchCapability )
if err == nil {
peerApp , ok := ( conn . App ( ) ) . ( * PeerApp )
if ok {
peerApp . SendMessage ( PeerMessage { eventID , context , message } )
return nil
}
return errors . New ( "failed type assertion conn.App != PeerApp" )
}
return err
}
func ( e * engine ) sendGetValToPeer ( eventID , onion , scope , path string ) error {
log . Debugf ( "sendGetValMessage to peer %v %v %v\n", onion , scope , path )
log . Debugf ( "sendGetValMessage to peer %v %v.%v\n" , onion , scope , path )
getVal := peerGetVal { Scope : scope , Path : path }
message , err := json . Marshal ( getVal )
if err != nil {
return err
}
return e . sendMessageToPeer ( eventID , onion , event . ContextGetVal , message )
e . getValRequests . Store ( onion + eventID , message )
return e . sendPeerMessage ( onion , pmodel . PeerMessage { ID : eventID , Context : event . ContextGetVal , Data : message } )
}
func ( e * engine ) sendRetValToPeer ( eventID , onion , val , existsStr string ) error {
@ -436,7 +465,7 @@ func (e *engine) sendRetValToPeer(eventID, onion, val, existsStr string) error {
if err != nil {
return err
}
return e . send MessageToPeer( eventID , onion , event . ContextRetVal , message)
return e . send PeerMessage( onion , pmodel . PeerMessage { ID : eventID , Context : event . ContextRetVal , Data: message} )
}
func ( e * engine ) deleteConnection ( id string ) {
@ -454,7 +483,18 @@ func (e *engine) receiveGroupMessage(server string, gm *groups.EncryptedGroupMes
}
// sendMessageToGroup attempts to sent the given message to the given group id.
func ( e * engine ) sendMessageToGroup ( groupID string , server string , ct [ ] byte , sig [ ] byte ) {
func ( e * engine ) sendMessageToGroup ( groupID string , server string , ct [ ] byte , sig [ ] byte , attempts int ) {
// sending to groups can fail for a few reasons (slow server, not enough tokens, etc.)
// rather than trying to keep all that logic in method we simply back-off and try again
// but if we fail more than 5 times then we report back to the client so they can investigate other options.
// Note: This flow only applies to online-and-connected servers (this method will return faster if the server is not
// online)
if attempts >= 5 {
log . Errorf ( "failed to post a message to a group after %v attempts" , attempts )
e . eventManager . Publish ( event . NewEvent ( event . SendMessageToGroupError , map [ event . Field ] string { event . GroupID : groupID , event . GroupServer : server , event . Error : "could not make payment to server" , event . Signature : base64 . StdEncoding . EncodeToString ( sig ) } ) )
return
}
es , ok := e . ephemeralServices . Load ( server )
if es == nil || ! ok {
@ -468,19 +508,16 @@ func (e *engine) sendMessageToGroup(groupID string, server string, ct []byte, si
tokenApp , ok := ( conn . App ( ) ) . ( * TokenBoardClient )
if ok {
if spent , numtokens := tokenApp . Post ( ct , sig ) ; ! spent {
// TODO: while this works for the spam guard, it won't work for other forms of payment...
// Make an -inline- payment, this will hold the goroutine
if err := tokenApp . MakePayment ( ) ; err == nil {
// This really shouldn't fail since we now know we have the required tokens...
if spent , _ := tokenApp . Post ( ct , sig ) ; ! spent {
e . eventManager . Publish ( event . NewEvent ( event . SendMessageToGroupError , map [ event . Field ] string { event . GroupID : groupID , event . GroupServer : server , event . Error : err . Error ( ) , event . Signature : base64 . StdEncoding . EncodeToString ( sig ) } ) )
}
} else {
// Broadast the token error
e . eventManager . Publish ( event . NewEvent ( event . SendMessageToGroupError , map [ event . Field ] string { event . GroupID : groupID , event . GroupServer : server , event . Error : err . Error ( ) , event . Signature : base64 . StdEncoding . EncodeToString ( sig ) } ) )
}
} else if numtokens < 5 {
// we failed to post, probably because we ran out of tokens... so make a payment
go tokenApp . MakePayment ( )
// backoff
time . Sleep ( time . Second * 5 )
// try again
e . sendMessageToGroup ( groupID , server , ct , sig , attempts + 1 )
} else {
if numtokens < 5 {
go tokenApp . MakePayment ( )
}
}
// regardless we return....
return
@ -489,9 +526,22 @@ func (e *engine) sendMessageToGroup(groupID string, server string, ct []byte, si
e . eventManager . Publish ( event . NewEvent ( event . SendMessageToGroupError , map [ event . Field ] string { event . GroupID : groupID , event . GroupServer : server , event . Error : "server-connection-not-valid" , event . Signature : base64 . StdEncoding . EncodeToString ( sig ) } ) )
}
// TODO this is becoming cluttered
func ( e * engine ) handlePeerMessage ( hostname string , eventID string , context string , message [ ] byte ) {
log . Debugf ( "New message from peer: %v %v" , hostname , context )
if context == event . ContextGetVal {
if context == event . ContextAck {
e . peerAck ( hostname , eventID )
} else if context == event . ContextRetVal {
req , ok := e . getValRequests . Load ( hostname + eventID )
if ok {
reqStr := req . ( [ ] byte )
e . handlePeerRetVal ( hostname , reqStr , message )
e . getValRequests . Delete ( hostname + eventID )
} else {
log . Errorf ( "could not find val request for %v %s" , hostname , eventID )
}
} else if context == event . ContextGetVal {
var getVal peerGetVal
err := json . Unmarshal ( message , & getVal )
if err == nil {
@ -499,8 +549,46 @@ func (e *engine) handlePeerMessage(hostname string, eventID string, context stri
ev . EventID = eventID
e . eventManager . Publish ( ev )
}
} else if context == event . ContextRequestManifest {
for _ , message := range e . filesharingSubSystem . RequestManifestParts ( eventID ) {
if err := e . sendPeerMessage ( hostname , message ) ; err != nil {
e . eventManager . Publish ( event . NewEvent ( event . SendMessageToPeerError , map [ event . Field ] string { event . RemotePeer : hostname , event . EventID : eventID , event . Error : err . Error ( ) } ) )
}
}
} else if context == event . ContextSendManifest {
if fileKey , manifest := e . filesharingSubSystem . ReceiveManifestPart ( eventID , message ) ; len ( manifest ) != 0 {
// We have a valid manifest
e . eventManager . Publish ( event . NewEvent ( event . ManifestReceived , map [ event . Field ] string { event . Handle : hostname , event . FileKey : fileKey , event . SerializedManifest : manifest } ) )
}
} else if context == event . ContextRequestFile {
for _ , message := range e . filesharingSubSystem . ProcessChunkRequest ( eventID , message ) {
if err := e . sendPeerMessage ( hostname , message ) ; err != nil {
e . eventManager . Publish ( event . NewEvent ( event . SendMessageToPeerError , map [ event . Field ] string { event . RemotePeer : hostname , event . EventID : eventID , event . Error : err . Error ( ) } ) )
}
}
} else if context == event . ContextSendFile {
fileKey , progress , totalChunks , _ , title := e . filesharingSubSystem . ProcessChunk ( eventID , message )
if len ( fileKey ) != 0 {
e . eventManager . Publish ( event . NewEvent ( event . FileDownloadProgressUpdate , map [ event . Field ] string { event . FileKey : fileKey , event . Progress : strconv . Itoa ( int ( progress ) ) , event . FileSizeInChunks : strconv . Itoa ( int ( totalChunks ) ) , event . NameSuggestion : title } ) )
if progress == totalChunks {
if tempFile , filePath , success := e . filesharingSubSystem . VerifyFile ( fileKey ) ; success {
log . Debugf ( "file verified and downloaded!" )
e . eventManager . Publish ( event . NewEvent ( event . FileDownloaded , map [ event . Field ] string { event . FileKey : fileKey , event . FilePath : filePath , event . TempFile : tempFile } ) )
} else {
log . Debugf ( "file failed to verify!" )
e . eventManager . Publish ( event . NewEvent ( event . FileVerificationFailed , map [ event . Field ] string { event . FileKey : fileKey } ) )
}
}
}
} else {
// Fall through handler for the default text conversation.
e . eventManager . Publish ( event . NewEvent ( event . NewMessageFromPeer , map [ event . Field ] string { event . TimestampReceived : time . Now ( ) . Format ( time . RFC3339Nano ) , event . RemotePeer : hostname , event . Data : string ( message ) } ) )
// Send an explicit acknowledgement
// Every other protocol should have a explicit acknowledgement message e.g. value lookups have responses, and file handling has an explicit flow
if err := e . sendPeerMessage ( hostname , pmodel . PeerMessage { ID : eventID , Context : event . ContextAck , Data : [ ] byte { } } ) ; err != nil {
e . eventManager . Publish ( event . NewEvent ( event . SendMessageToPeerError , map [ event . Field ] string { event . RemotePeer : hostname , event . EventID : eventID , event . Error : err . Error ( ) } ) )
}
}
}
@ -530,3 +618,17 @@ func (e *engine) leaveServer(server string) {
e . ephemeralServices . Delete ( server )
}
}
func ( e * engine ) sendPeerMessage ( handle string , message pmodel . PeerMessage ) error {
conn , err := e . service . WaitForCapabilityOrClose ( handle , cwtchCapability )
if err == nil {
peerApp , ok := ( conn . App ( ) ) . ( * PeerApp )
if ok {
return peerApp . SendMessage ( message )
}
log . Debugf ( "could not derive peer app: %v" , err )
return fmt . Errorf ( "could not find peer app to send message to: %v" , handle )
}
log . Debugf ( "could not send peer message: %v" , err )
return err
}