2019-01-04 21:44:21 +00:00
package connections
import (
2022-10-03 20:05:36 +00:00
"encoding/base64"
"encoding/json"
"fmt"
2022-10-11 17:58:10 +00:00
"git.openprivacy.ca/cwtch.im/tapir/primitives/privacypass"
2022-10-03 20:05:36 +00:00
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/model"
"cwtch.im/cwtch/protocol/files"
"cwtch.im/cwtch/protocol/groups"
pmodel "cwtch.im/cwtch/protocol/model"
"git.openprivacy.ca/cwtch.im/tapir"
"git.openprivacy.ca/cwtch.im/tapir/applications"
"git.openprivacy.ca/cwtch.im/tapir/networks/tor"
"git.openprivacy.ca/cwtch.im/tapir/primitives"
"git.openprivacy.ca/openprivacy/connectivity"
torProvider "git.openprivacy.ca/openprivacy/connectivity/tor"
"git.openprivacy.ca/openprivacy/log"
"github.com/gtank/ristretto255"
"golang.org/x/crypto/ed25519"
2019-01-04 21:44:21 +00:00
)
2022-12-03 00:23:11 +00:00
// 32 from tor/src/app/config/config.c MaxClientCircuitsPending
// we lower a bit because there's a lot of spillage
// - just cus we get a SOCKS timeout doesn't mean tor has stopped trying as a huge sorce
// - potential multiple profiles as a huge source
// - second order connections like token service's second servers aren't tracked in our system adding a few extra periodically
const TorMaxPendingConns = 28
2022-09-26 01:28:04 +00:00
2021-12-18 01:27:56 +00:00
type connectionLockedService struct {
2022-10-03 20:05:36 +00:00
service tapir . Service
connectingLock sync . Mutex
2021-12-18 01:27:56 +00:00
}
2019-05-15 19:58:57 +00:00
type engine struct {
2022-10-03 20:05:36 +00:00
queue event . Queue
2019-01-04 21:44:21 +00:00
2022-10-03 20:05:36 +00:00
// Engine Attributes
identity primitives . Identity
acn connectivity . ACN
2019-01-04 21:44:21 +00:00
2022-10-03 20:05:36 +00:00
// Authorization list of contacts to authorization status
authorizations sync . Map // string(onion) => model.Authorization
2019-01-08 18:58:01 +00:00
2022-10-03 20:05:36 +00:00
// Block Unknown Contacts
blockUnknownContacts bool
2019-08-21 19:25:26 +00:00
2022-10-03 20:05:36 +00:00
// Pointer to the Global Event Manager
eventManager event . Manager
2019-05-15 19:58:57 +00:00
2022-10-03 20:05:36 +00:00
// Nextgen Tapir Service
service tapir . Service
2019-07-17 19:10:52 +00:00
2022-10-03 20:05:36 +00:00
getValRequests sync . Map // [string]string eventID:Data
2021-09-30 00:57:13 +00:00
2022-10-03 20:05:36 +00:00
// Nextgen Tapir Service
ephemeralServices map [ string ] * connectionLockedService //sync.Map // string(onion) => tapir.Service
ephemeralServicesLock sync . Mutex
2020-07-14 00:46:05 +00:00
2022-10-03 20:05:36 +00:00
// Required for listen(), inaccessible from identity
privateKey ed25519 . PrivateKey
2019-07-24 18:49:01 +00:00
2022-10-03 20:05:36 +00:00
// file sharing subsystem is responsible for maintaining active shares and downloads
filesharingSubSystem files . FileSharingSubSystem
2021-09-30 00:57:13 +00:00
2022-10-03 20:05:36 +00:00
tokenManagers sync . Map // [tokenService][]TokenManager
2022-09-06 19:13:32 +00:00
2022-10-03 20:05:36 +00:00
shuttingDown atomic . Bool
2019-05-15 19:58:57 +00:00
}
// Engine (ProtocolEngine) encapsulates the logic necessary to make and receive Cwtch connections.
// Note: ProtocolEngine doesn't have access to any information necessary to encrypt or decrypt GroupMessages
2021-05-08 19:03:09 +00:00
// Protocol Engine *can* associate Group Identifiers with Group Servers, although we don't currently make use of this fact
// other than to route errors back to the UI.
2019-05-15 19:58:57 +00:00
type Engine interface {
2022-10-03 20:05:36 +00:00
ACN ( ) connectivity . ACN
EventManager ( ) event . Manager
Shutdown ( )
2019-01-04 21:44:21 +00:00
}
// NewProtocolEngine initializes a new engine that runs Cwtch using the given parameters
2020-06-16 00:16:04 +00:00
func NewProtocolEngine ( identity primitives . Identity , privateKey ed25519 . PrivateKey , acn connectivity . ACN , eventManager event . Manager , peerAuthorizations map [ string ] model . Authorization ) Engine {
2022-10-03 20:05:36 +00:00
engine := new ( engine )
engine . identity = identity
engine . privateKey = privateKey
engine . ephemeralServices = make ( map [ string ] * connectionLockedService )
engine . queue = event . NewQueue ( )
go engine . eventHandler ( )
engine . acn = acn
// Init the Server running the Simple App.
engine . service = new ( tor . BaseOnionService )
engine . service . Init ( acn , privateKey , & identity )
engine . eventManager = eventManager
engine . eventManager . Subscribe ( event . ProtocolEngineStartListen , engine . queue )
engine . eventManager . Subscribe ( event . ProtocolEngineShutdown , engine . queue )
engine . eventManager . Subscribe ( event . PeerRequest , engine . queue )
engine . eventManager . Subscribe ( event . RetryPeerRequest , engine . queue )
engine . eventManager . Subscribe ( event . InvitePeerToGroup , engine . queue )
engine . eventManager . Subscribe ( event . JoinServer , engine . queue )
engine . eventManager . Subscribe ( event . LeaveServer , engine . queue )
engine . eventManager . Subscribe ( event . SendMessageToGroup , engine . queue )
engine . eventManager . Subscribe ( event . SendMessageToPeer , engine . queue )
engine . eventManager . Subscribe ( event . SendGetValMessageToPeer , engine . queue )
engine . eventManager . Subscribe ( event . SendRetValMessageToPeer , engine . queue )
engine . eventManager . Subscribe ( event . DeleteContact , engine . queue )
engine . eventManager . Subscribe ( event . UpdateConversationAuthorization , engine . queue )
engine . eventManager . Subscribe ( event . BlockUnknownPeers , engine . queue )
engine . eventManager . Subscribe ( event . AllowUnknownPeers , engine . queue )
// File Handling
engine . eventManager . Subscribe ( event . ShareManifest , engine . queue )
engine . eventManager . Subscribe ( event . StopFileShare , engine . queue )
engine . eventManager . Subscribe ( event . StopAllFileShares , engine . queue )
engine . eventManager . Subscribe ( event . ManifestSizeReceived , engine . queue )
engine . eventManager . Subscribe ( event . ManifestSaved , engine . queue )
// Token Server
engine . eventManager . Subscribe ( event . MakeAntispamPayment , engine . queue )
for peer , authorization := range peerAuthorizations {
engine . authorizations . Store ( peer , authorization )
}
return engine
2019-01-04 21:44:21 +00:00
}
2019-05-15 19:58:57 +00:00
func ( e * engine ) ACN ( ) connectivity . ACN {
2022-10-03 20:05:36 +00:00
return e . acn
2019-05-15 19:58:57 +00:00
}
2019-06-05 20:40:55 +00:00
func ( e * engine ) EventManager ( ) event . Manager {
2022-10-03 20:05:36 +00:00
return e . eventManager
2019-05-15 20:12:11 +00:00
}
2019-01-04 21:44:21 +00:00
// eventHandler process events from other subsystems
2019-05-15 19:58:57 +00:00
func ( e * engine ) eventHandler ( ) {
2022-10-03 20:05:36 +00:00
for {
ev := e . queue . Next ( )
// optimistic shutdown...
if e . shuttingDown . Load ( ) {
return
}
switch ev . EventType {
case event . StatusRequest :
e . eventManager . Publish ( event . Event { EventType : event . ProtocolEngineStatus , EventID : ev . EventID } )
case event . PeerRequest :
if torProvider . IsValidHostname ( ev . Data [ event . RemotePeer ] ) {
go e . peerWithOnion ( ev . Data [ event . RemotePeer ] )
}
case event . RetryPeerRequest :
// This event allows engine to treat (automated) retry peering requests differently to user-specified
// peer events
if torProvider . IsValidHostname ( ev . Data [ event . RemotePeer ] ) {
log . Debugf ( "Retrying Peer Request: %v" , ev . Data [ event . RemotePeer ] )
go e . peerWithOnion ( ev . Data [ event . RemotePeer ] )
}
case event . InvitePeerToGroup :
err := e . sendPeerMessage ( ev . Data [ event . RemotePeer ] , pmodel . PeerMessage { ID : ev . EventID , Context : event . ContextInvite , Data : [ ] byte ( ev . Data [ event . GroupInvite ] ) } )
if err != nil {
e . eventManager . Publish ( event . NewEvent ( event . SendMessageToPeerError , map [ event . Field ] string { event . EventContext : string ( event . InvitePeerToGroup ) , event . RemotePeer : ev . Data [ event . RemotePeer ] , event . EventID : ev . EventID , event . Error : "peer is offline or the connection has yet to finalize" } ) )
}
case event . JoinServer :
signature , err := base64 . StdEncoding . DecodeString ( ev . Data [ event . Signature ] )
if err != nil {
// will result in a full sync
signature = [ ] byte { }
}
2022-11-23 16:01:22 +00:00
// if we have been sent cached tokens, also deserialize them
cachedTokensJson := ev . Data [ event . CachedTokens ]
var cachedTokens [ ] * privacypass . Token
if len ( cachedTokensJson ) != 0 {
json . Unmarshal ( [ ] byte ( cachedTokensJson ) , & cachedTokens )
}
// create a new token handler...
e . NewTokenHandler ( ev . Data [ event . ServerTokenOnion ] , cachedTokens )
go e . peerWithTokenServer ( ev . Data [ event . GroupServer ] , ev . Data [ event . ServerTokenOnion ] , ev . Data [ event . ServerTokenY ] , signature , cachedTokens )
2022-10-03 20:05:36 +00:00
case event . MakeAntispamPayment :
go e . makeAntispamPayment ( ev . Data [ event . GroupServer ] )
case event . LeaveServer :
e . leaveServer ( ev . Data [ event . GroupServer ] )
case event . DeleteContact :
onion := ev . Data [ event . RemotePeer ]
// We remove this peer from out blocklist which will prevent them from contacting us if we have "block unknown peers" turned on.
e . authorizations . Delete ( ev . Data [ event . RemotePeer ] )
e . deleteConnection ( onion )
case event . SendMessageToGroup :
ciphertext , _ := base64 . StdEncoding . DecodeString ( ev . Data [ event . Ciphertext ] )
signature , _ := base64 . StdEncoding . DecodeString ( ev . Data [ event . Signature ] )
2022-10-11 17:58:10 +00:00
// launch a goroutine to post to the server
2022-11-23 16:01:22 +00:00
go e . sendMessageToGroup ( ev . Data [ event . GroupID ] , ev . Data [ event . GroupServer ] , ciphertext , signature , 0 )
2022-10-03 20:05:36 +00:00
case event . SendMessageToPeer :
// TODO: remove this passthrough once the UI is integrated.
context , ok := ev . Data [ event . EventContext ]
if ! ok {
context = event . ContextRaw
}
if err := e . sendPeerMessage ( ev . Data [ event . RemotePeer ] , pmodel . PeerMessage { ID : ev . EventID , Context : context , Data : [ ] byte ( ev . Data [ event . Data ] ) } ) ; err != nil {
e . eventManager . Publish ( event . NewEvent ( event . SendMessageToPeerError , map [ event . Field ] string { event . EventContext : string ( event . SendMessageToPeer ) , event . RemotePeer : ev . Data [ event . RemotePeer ] , event . EventID : ev . EventID , event . Error : "peer is offline or the connection has yet to finalize" } ) )
}
case event . SendGetValMessageToPeer :
if err := e . sendGetValToPeer ( ev . EventID , ev . Data [ event . RemotePeer ] , ev . Data [ event . Scope ] , ev . Data [ event . Path ] ) ; err != nil {
e . eventManager . Publish ( event . NewEvent ( event . SendMessageToPeerError , map [ event . Field ] string { event . EventContext : string ( event . SendGetValMessageToPeer ) , event . RemotePeer : ev . Data [ event . RemotePeer ] , event . EventID : ev . EventID , event . Error : err . Error ( ) } ) )
}
case event . SendRetValMessageToPeer :
if err := e . sendRetValToPeer ( ev . EventID , ev . Data [ event . RemotePeer ] , ev . Data [ event . Data ] , ev . Data [ event . Exists ] ) ; err != nil {
e . eventManager . Publish ( event . NewEvent ( event . SendMessageToPeerError , map [ event . Field ] string { event . EventContext : string ( event . SendRetValMessageToPeer ) , event . RemotePeer : ev . Data [ event . RemotePeer ] , event . EventID : ev . EventID , event . Error : err . Error ( ) } ) )
}
case event . UpdateConversationAuthorization :
accepted , _ := strconv . ParseBool ( ev . Data [ event . Accepted ] )
blocked , _ := strconv . ParseBool ( ev . Data [ event . Blocked ] )
auth := model . AuthUnknown
if blocked {
auth = model . AuthBlocked
} else if accepted {
auth = model . AuthApproved
}
e . authorizations . Store ( ev . Data [ event . RemotePeer ] , auth )
if auth == model . AuthBlocked {
connection , err := e . service . GetConnection ( ev . Data [ event . RemotePeer ] )
if connection != nil && err == nil {
connection . Close ( )
}
// Explicitly send a disconnected event (if we don't do this here then the UI can wait for a while before
// an ongoing Open() connection fails and so the user will see a blocked peer as still connecting (because
// there isn't an active connection and we are stuck waiting for tor to time out)
e . peerDisconnected ( ev . Data [ event . RemotePeer ] )
}
case event . AllowUnknownPeers :
log . Debugf ( "%v now allows unknown connections" , e . identity . Hostname ( ) )
e . blockUnknownContacts = false
case event . BlockUnknownPeers :
log . Debugf ( "%v now forbids unknown connections" , e . identity . Hostname ( ) )
e . blockUnknownContacts = true
case event . ProtocolEngineStartListen :
go e . listenFn ( )
case event . ShareManifest :
e . filesharingSubSystem . ShareFile ( ev . Data [ event . FileKey ] , ev . Data [ event . SerializedManifest ] )
case event . StopFileShare :
e . filesharingSubSystem . StopFileShare ( ev . Data [ event . FileKey ] )
case event . StopAllFileShares :
e . filesharingSubSystem . StopAllFileShares ( )
case event . ManifestSizeReceived :
handle := ev . Data [ event . Handle ]
key := ev . Data [ event . FileKey ]
size , _ := strconv . Atoi ( ev . Data [ event . ManifestSize ] )
if err := e . sendPeerMessage ( handle , e . filesharingSubSystem . FetchManifest ( key , uint64 ( size ) ) ) ; err != nil {
e . eventManager . Publish ( event . NewEvent ( event . SendMessageToPeerError , map [ event . Field ] string { event . RemotePeer : ev . Data [ event . RemotePeer ] , event . EventID : ev . EventID , event . Error : err . Error ( ) } ) )
}
case event . ManifestSaved :
handle := ev . Data [ event . Handle ]
key := ev . Data [ event . FileKey ]
serializedManifest := ev . Data [ event . SerializedManifest ]
tempFile := ev . Data [ event . TempFile ]
title := ev . Data [ event . NameSuggestion ]
// NOTE: for now there will probably only ever be a single chunk request. When we enable group
// sharing and rehosting then this loop will serve as a a way of splitting the request among multiple
// contacts
for _ , message := range e . filesharingSubSystem . CompileChunkRequests ( key , serializedManifest , tempFile , title ) {
if err := e . sendPeerMessage ( handle , message ) ; err != nil {
e . eventManager . Publish ( event . NewEvent ( event . SendMessageToPeerError , map [ event . Field ] string { event . RemotePeer : ev . Data [ event . RemotePeer ] , event . EventID : ev . EventID , event . Error : err . Error ( ) } ) )
}
}
case event . ProtocolEngineShutdown :
return
default :
return
}
}
2019-01-04 21:44:21 +00:00
}
2020-06-16 00:16:04 +00:00
func ( e * engine ) isBlocked ( onion string ) bool {
2022-10-03 20:05:36 +00:00
authorization , known := e . authorizations . Load ( onion )
if ! known {
// if we block unknown peers we will block this contact
return e . blockUnknownContacts
}
return authorization . ( model . Authorization ) == model . AuthBlocked
2020-06-16 00:16:04 +00:00
}
2020-11-12 21:17:06 +00:00
func ( e * engine ) isAllowed ( onion string ) bool {
2022-10-03 20:05:36 +00:00
authorization , known := e . authorizations . Load ( onion )
if ! known {
log . Errorf ( "attempted to lookup authorization of onion not in map...that should never happen" )
return false
}
if e . blockUnknownContacts {
return authorization . ( model . Authorization ) == model . AuthApproved
}
return authorization . ( model . Authorization ) != model . AuthBlocked
2020-06-16 00:16:04 +00:00
}
2019-07-23 17:57:04 +00:00
func ( e * engine ) createPeerTemplate ( ) * PeerApp {
2022-10-03 20:05:36 +00:00
peerAppTemplate := new ( PeerApp )
peerAppTemplate . IsBlocked = e . isBlocked
peerAppTemplate . IsAllowed = e . isAllowed
peerAppTemplate . MessageHandler = e . handlePeerMessage
peerAppTemplate . OnAcknowledgement = e . ignoreOnShutdown2 ( e . peerAck )
peerAppTemplate . OnAuth = e . ignoreOnShutdown ( e . peerAuthed )
peerAppTemplate . OnConnecting = e . ignoreOnShutdown ( e . peerConnecting )
peerAppTemplate . OnClose = e . ignoreOnShutdown ( e . peerDisconnected )
return peerAppTemplate
2019-07-23 17:57:04 +00:00
}
// Listen sets up an onion listener to process incoming cwtch messages
func ( e * engine ) listenFn ( ) {
2022-10-03 20:05:36 +00:00
err := e . service . Listen ( e . createPeerTemplate ( ) )
if ! e . shuttingDown . Load ( ) {
e . eventManager . Publish ( event . NewEvent ( event . ProtocolEngineStopped , map [ event . Field ] string { event . Identity : e . identity . Hostname ( ) , event . Error : err . Error ( ) } ) )
}
2019-01-04 21:44:21 +00:00
}
// Shutdown tears down the eventHandler goroutine
2019-05-15 19:58:57 +00:00
func ( e * engine ) Shutdown ( ) {
2022-10-03 20:05:36 +00:00
// don't accept any more events...
e . queue . Publish ( event . NewEvent ( event . ProtocolEngineShutdown , map [ event . Field ] string { } ) )
e . service . Shutdown ( )
2022-10-03 18:20:44 +00:00
2022-10-03 20:05:36 +00:00
e . shuttingDown . Store ( true )
2021-11-19 22:04:43 +00:00
2022-10-03 20:05:36 +00:00
e . ephemeralServicesLock . Lock ( )
defer e . ephemeralServicesLock . Unlock ( )
for _ , connection := range e . ephemeralServices {
log . Infof ( "shutting down ephemeral service" )
connection . connectingLock . Lock ( )
connection . service . Shutdown ( )
connection . connectingLock . Unlock ( )
}
2022-04-21 22:12:58 +00:00
2022-10-03 20:05:36 +00:00
e . queue . Shutdown ( )
2019-01-04 21:44:21 +00:00
}
2019-05-15 19:58:57 +00:00
// peerWithOnion is the entry point for cwtchPeer relationships
2019-07-29 19:48:12 +00:00
// needs to be run in a goroutine as will block on Open.
2019-07-17 19:10:52 +00:00
func ( e * engine ) peerWithOnion ( onion string ) {
2022-10-03 20:05:36 +00:00
log . Debugf ( "Called PeerWithOnion for %v" , onion )
if ! e . isBlocked ( onion ) {
e . ignoreOnShutdown ( e . peerConnecting ) ( onion )
connected , err := e . service . Connect ( onion , e . createPeerTemplate ( ) )
// If we are already connected...check if we are authed and issue an auth event
// (This allows the ui to be stateless)
if connected && err != nil {
conn , err := e . service . GetConnection ( onion )
if err == nil {
if conn . HasCapability ( cwtchCapability ) {
e . ignoreOnShutdown ( e . peerAuthed ) ( onion )
return
}
}
}
// Only issue a disconnected error if we are disconnected (Connect will fail if a connection already exists)
if ! connected && err != nil {
e . ignoreOnShutdown ( e . peerDisconnected ) ( onion )
}
}
2019-07-17 19:10:52 +00:00
}
2022-09-10 17:15:32 +00:00
func ( e * engine ) makeAntispamPayment ( onion string ) {
2022-10-03 20:05:36 +00:00
log . Debugf ( "making antispam payment" )
e . ephemeralServicesLock . Lock ( )
ephemeralService , ok := e . ephemeralServices [ onion ]
e . ephemeralServicesLock . Unlock ( )
if ephemeralService == nil || ! ok {
log . Debugf ( "could not find associated group for antispam payment" )
return
}
conn , err := ephemeralService . service . GetConnection ( onion )
if err == nil {
tokenApp , ok := ( conn . App ( ) ) . ( * TokenBoardClient )
if ok {
2022-10-11 17:58:10 +00:00
tokenManagerPointer , _ := e . tokenManagers . LoadOrStore ( tokenApp . tokenServiceOnion , NewTokenManager ( ) )
2022-10-03 20:05:36 +00:00
tokenManager := tokenManagerPointer . ( * TokenManager )
log . Debugf ( "checking antispam tokens %v" , tokenManager . NumTokens ( ) )
if tokenManager . NumTokens ( ) < 5 {
go tokenApp . PurchaseTokens ( )
}
}
}
2022-09-10 17:15:32 +00:00
}
2020-07-14 00:46:05 +00:00
// peerWithTokenServer is the entry point for cwtchPeer - server relationships
// needs to be run in a goroutine as will block on Open.
2022-11-23 16:01:22 +00:00
func ( e * engine ) peerWithTokenServer ( onion string , tokenServerOnion string , tokenServerY string , lastKnownSignature [ ] byte , cachedTokens [ ] * privacypass . Token ) {
2022-10-03 20:05:36 +00:00
e . ephemeralServicesLock . Lock ( )
_ , exists := e . ephemeralServices [ onion ]
if exists {
e . ephemeralServicesLock . Unlock ( )
log . Debugf ( "attempted to join a server with an active connection" )
return
}
connectionService := & connectionLockedService { service : new ( tor . BaseOnionService ) }
e . ephemeralServices [ onion ] = connectionService
connectionService . connectingLock . Lock ( )
defer connectionService . connectingLock . Unlock ( )
e . ephemeralServicesLock . Unlock ( )
log . Debugf ( "Peering with Token Server %v %v" , onion , tokenServerOnion )
e . ignoreOnShutdown ( e . serverConnecting ) ( onion )
// Create a new ephemeral service for this connection
eid , epk := primitives . InitializeEphemeralIdentity ( )
connectionService . service . Init ( e . acn , epk , & eid )
Y := new ( ristretto255 . Element )
Y . UnmarshalText ( [ ] byte ( tokenServerY ) )
connected , err := connectionService . service . Connect ( onion , NewTokenBoardClient ( e . acn , Y , tokenServerOnion , lastKnownSignature , e ) )
// If we are already connected...check if we are authed and issue an auth event
// (This allows the ui to be stateless)
if connected && err != nil {
conn , err := connectionService . service . GetConnection ( onion )
if err == nil {
// If the server is synced, resend the synced status update
if conn . HasCapability ( groups . CwtchServerSyncedCapability ) {
e . ignoreOnShutdown ( e . serverSynced ) ( onion )
return
}
// If the server is authed, resend the auth status update
if conn . HasCapability ( applications . AuthCapability ) {
// Resend the authed event...
e . ignoreOnShutdown ( e . serverAuthed ) ( onion )
return
}
}
}
// Only issue a disconnected error if we are disconnected (Connect will fail if a connection already exists)
if ! connected && err != nil {
e . ignoreOnShutdown ( e . serverDisconnected ) ( onion )
}
2020-07-14 00:46:05 +00:00
}
2019-07-29 20:21:58 +00:00
func ( e * engine ) ignoreOnShutdown ( f func ( string ) ) func ( string ) {
2022-10-03 20:05:36 +00:00
return func ( x string ) {
if ! e . shuttingDown . Load ( ) {
f ( x )
}
}
2019-07-29 20:21:58 +00:00
}
func ( e * engine ) ignoreOnShutdown2 ( f func ( string , string ) ) func ( string , string ) {
2022-10-03 20:05:36 +00:00
return func ( x , y string ) {
if ! e . shuttingDown . Load ( ) {
f ( x , y )
}
}
2019-07-24 18:49:01 +00:00
}
2019-07-17 19:10:52 +00:00
func ( e * engine ) peerAuthed ( onion string ) {
2022-10-03 20:05:36 +00:00
_ , known := e . authorizations . Load ( onion )
if ! known {
e . authorizations . Store ( onion , model . AuthUnknown )
}
// FIXME: This call uses WAY too much memory, and was responsible for the vast majority
// of allocations in the UI
// This is because Bine ends up reading the entire response into memory and then passes that back
// into Connectivity which eventually extracts just what it needs.
// Ideally we would just read from the control stream directly into reusable buffers.
//details, err := e.acn.GetInfo(onion)
//if err == nil {
// if hops, exists := details["circuit"]; exists {
// e.eventManager.Publish(event.NewEvent(event.ACNInfo, map[event.Field]string{
// event.Handle: onion,
// event.Key: "circuit",
// event.Data: hops,
// }))
// }
//} else {
// log.Errorf("error getting info for onion %v", err)
//}
e . eventManager . Publish ( event . NewEvent ( event . PeerStateChange , map [ event . Field ] string {
event . RemotePeer : string ( onion ) ,
event . ConnectionState : ConnectionStateName [ AUTHENTICATED ] ,
} ) )
2019-07-17 19:10:52 +00:00
}
2019-07-23 17:57:04 +00:00
func ( e * engine ) peerConnecting ( onion string ) {
2022-10-03 20:05:36 +00:00
e . eventManager . Publish ( event . NewEvent ( event . PeerStateChange , map [ event . Field ] string {
2022-10-25 20:59:05 +00:00
event . RemotePeer : onion ,
2022-10-03 20:05:36 +00:00
event . ConnectionState : ConnectionStateName [ CONNECTING ] ,
} ) )
2019-07-23 17:57:04 +00:00
}
2020-07-14 00:46:05 +00:00
func ( e * engine ) serverConnecting ( onion string ) {
2022-10-03 20:05:36 +00:00
e . eventManager . Publish ( event . NewEvent ( event . ServerStateChange , map [ event . Field ] string {
2022-10-25 20:59:05 +00:00
event . GroupServer : onion ,
2022-10-03 20:05:36 +00:00
event . ConnectionState : ConnectionStateName [ CONNECTING ] ,
} ) )
2020-07-14 00:46:05 +00:00
}
2021-06-29 21:38:53 +00:00
func ( e * engine ) serverAuthed ( onion string ) {
2022-10-03 20:05:36 +00:00
e . eventManager . Publish ( event . NewEvent ( event . ServerStateChange , map [ event . Field ] string {
event . GroupServer : onion ,
event . ConnectionState : ConnectionStateName [ AUTHENTICATED ] ,
} ) )
2021-06-29 21:38:53 +00:00
}
2020-07-14 00:46:05 +00:00
func ( e * engine ) serverSynced ( onion string ) {
2022-10-03 20:05:36 +00:00
e . eventManager . Publish ( event . NewEvent ( event . ServerStateChange , map [ event . Field ] string {
event . GroupServer : onion ,
event . ConnectionState : ConnectionStateName [ SYNCED ] ,
} ) )
2020-07-14 00:46:05 +00:00
}
func ( e * engine ) serverDisconnected ( onion string ) {
2022-10-03 20:05:36 +00:00
e . leaveServer ( onion )
2022-04-20 20:10:07 +00:00
2022-10-03 20:05:36 +00:00
e . eventManager . Publish ( event . NewEvent ( event . ServerStateChange , map [ event . Field ] string {
event . GroupServer : onion ,
event . ConnectionState : ConnectionStateName [ DISCONNECTED ] ,
} ) )
2020-07-14 00:46:05 +00:00
}
2019-07-29 20:21:58 +00:00
func ( e * engine ) peerAck ( onion string , eventID string ) {
2022-10-03 20:05:36 +00:00
e . eventManager . Publish ( event . NewEvent ( event . PeerAcknowledgement , map [ event . Field ] string {
event . EventID : eventID ,
event . RemotePeer : onion ,
} ) )
2019-07-22 20:49:23 +00:00
}
2019-07-17 19:10:52 +00:00
func ( e * engine ) peerDisconnected ( onion string ) {
2022-01-17 20:09:29 +00:00
2022-10-03 20:05:36 +00:00
// Clean up any existing get value requests...
e . getValRequests . Range ( func ( key , value interface { } ) bool {
keyString := key . ( string )
if strings . HasPrefix ( keyString , onion ) {
e . getValRequests . Delete ( keyString )
}
return true
} )
2022-04-21 22:12:58 +00:00
2022-10-03 20:05:36 +00:00
// Purge circuit information...
e . eventManager . Publish ( event . NewEvent ( event . ACNInfo , map [ event . Field ] string {
event . Handle : onion ,
event . Key : "circuit" ,
event . Data : "" ,
} ) )
2022-01-17 20:09:29 +00:00
2022-10-03 20:05:36 +00:00
e . eventManager . Publish ( event . NewEvent ( event . PeerStateChange , map [ event . Field ] string {
event . RemotePeer : string ( onion ) ,
event . ConnectionState : ConnectionStateName [ DISCONNECTED ] ,
} ) )
2019-01-04 21:44:21 +00:00
}
2020-03-07 07:41:00 +00:00
func ( e * engine ) sendGetValToPeer ( eventID , onion , scope , path string ) error {
2022-10-03 20:05:36 +00:00
log . Debugf ( "sendGetValMessage to peer %v %v.%v\n" , onion , scope , path )
getVal := peerGetVal { Scope : scope , Path : path }
message , err := json . Marshal ( getVal )
if err != nil {
return err
}
key := onion + eventID
e . getValRequests . Store ( key , message )
err = e . sendPeerMessage ( onion , pmodel . PeerMessage { ID : eventID , Context : event . ContextGetVal , Data : message } )
if err != nil {
e . getValRequests . Delete ( key )
}
return err
2020-03-07 07:41:00 +00:00
}
func ( e * engine ) sendRetValToPeer ( eventID , onion , val , existsStr string ) error {
2022-10-03 20:05:36 +00:00
log . Debugf ( "sendRetValMessage to peer %v (%v) %v %v\n" , onion , eventID , val , existsStr )
exists , _ := strconv . ParseBool ( existsStr )
retVal := peerRetVal { Val : val , Exists : exists }
message , err := json . Marshal ( retVal )
if err != nil {
return err
}
return e . sendPeerMessage ( onion , pmodel . PeerMessage { ID : eventID , Context : event . ContextRetVal , Data : message } )
2020-03-07 07:41:00 +00:00
}
2022-07-30 23:05:39 +00:00
func ( e * engine ) deleteConnection ( id string ) {
2022-10-03 20:05:36 +00:00
conn , err := e . service . GetConnection ( id )
if err == nil {
conn . Close ( )
}
2022-07-30 23:05:39 +00:00
}
2019-05-15 19:58:57 +00:00
// receiveGroupMessage is a callback function that processes GroupMessages from a given server
2020-07-14 00:46:05 +00:00
func ( e * engine ) receiveGroupMessage ( server string , gm * groups . EncryptedGroupMessage ) {
2022-10-03 20:05:36 +00:00
// Publish Event so that a Profile Engine can deal with it.
// Note: This technically means that *multiple* Profile Engines could listen to the same ProtocolEngine!
e . eventManager . Publish ( event . NewEvent ( event . EncryptedGroupMessage , map [ event . Field ] string { event . GroupServer : server , event . Ciphertext : base64 . StdEncoding . EncodeToString ( gm . Ciphertext ) , event . Signature : base64 . StdEncoding . EncodeToString ( gm . Signature ) } ) )
2019-01-04 21:44:21 +00:00
}
2019-05-15 19:58:57 +00:00
// sendMessageToGroup attempts to sent the given message to the given group id.
2022-11-23 16:01:22 +00:00
func ( e * engine ) sendMessageToGroup ( groupID string , server string , ct [ ] byte , sig [ ] byte , attempts int ) {
2022-10-03 20:05:36 +00:00
// sending to groups can fail for a few reasons (slow server, not enough tokens, etc.)
// rather than trying to keep all that logic in method we simply back-off and try again
// but if we fail more than 5 times then we report back to the client so they can investigate other options.
// Note: This flow only applies to online-and-connected servers (this method will return faster if the server is not
// online)
if attempts >= 5 {
log . Errorf ( "failed to post a message to a group after %v attempts" , attempts )
e . eventManager . Publish ( event . NewEvent ( event . SendMessageToGroupError , map [ event . Field ] string { event . GroupID : groupID , event . GroupServer : server , event . Error : "could not make payment to server" , event . Signature : base64 . StdEncoding . EncodeToString ( sig ) } ) )
return
}
e . ephemeralServicesLock . Lock ( )
ephemeralService , ok := e . ephemeralServices [ server ]
e . ephemeralServicesLock . Unlock ( )
if ephemeralService == nil || ! ok {
log . Debugf ( "could not send message to group: serve not found" )
e . eventManager . Publish ( event . NewEvent ( event . SendMessageToGroupError , map [ event . Field ] string { event . GroupID : groupID , event . GroupServer : server , event . Error : "server-not-found" , event . Signature : base64 . StdEncoding . EncodeToString ( sig ) } ) )
return
}
conn , err := ephemeralService . service . WaitForCapabilityOrClose ( server , groups . CwtchServerSyncedCapability )
if err == nil {
tokenApp , ok := ( conn . App ( ) ) . ( * TokenBoardClient )
if ok {
2022-10-11 17:58:10 +00:00
if spent , numtokens := tokenApp . Post ( groupID , ct , sig ) ; ! spent {
2022-10-03 20:05:36 +00:00
// we failed to post, probably because we ran out of tokens... so make a payment
go tokenApp . PurchaseTokens ( )
// backoff
time . Sleep ( time . Second * 5 )
// try again
log . Debugf ( "sending message to group error attempt: %v" , attempts )
2022-11-23 16:01:22 +00:00
e . sendMessageToGroup ( groupID , server , ct , sig , attempts + 1 )
2022-10-03 20:05:36 +00:00
} else {
if numtokens < 5 {
go tokenApp . PurchaseTokens ( )
}
}
// regardless we return....
return
}
}
log . Debugf ( "could not send message to group" )
e . eventManager . Publish ( event . NewEvent ( event . SendMessageToGroupError , map [ event . Field ] string { event . GroupID : groupID , event . GroupServer : server , event . Error : "server-connection-not-valid" , event . Signature : base64 . StdEncoding . EncodeToString ( sig ) } ) )
2019-01-04 21:44:21 +00:00
}
2021-09-30 00:57:13 +00:00
// TODO this is becoming cluttered
2020-03-07 07:41:00 +00:00
func ( e * engine ) handlePeerMessage ( hostname string , eventID string , context string , message [ ] byte ) {
2022-10-03 20:05:36 +00:00
log . Debugf ( "New message from peer: %v %v" , hostname , context )
if context == event . ContextAck {
e . peerAck ( hostname , eventID )
} else if context == event . ContextRetVal {
req , ok := e . getValRequests . Load ( hostname + eventID )
if ok {
reqStr := req . ( [ ] byte )
e . handlePeerRetVal ( hostname , reqStr , message )
e . getValRequests . Delete ( hostname + eventID )
} else {
log . Errorf ( "could not find val request for %v %s" , hostname , eventID )
}
} else if context == event . ContextGetVal {
var getVal peerGetVal
err := json . Unmarshal ( message , & getVal )
if err == nil {
ev := event . NewEventList ( event . NewGetValMessageFromPeer , event . RemotePeer , hostname , event . Scope , getVal . Scope , event . Path , getVal . Path )
ev . EventID = eventID
e . eventManager . Publish ( ev )
}
} else if context == event . ContextRequestManifest {
for _ , message := range e . filesharingSubSystem . RequestManifestParts ( eventID ) {
if err := e . sendPeerMessage ( hostname , message ) ; err != nil {
e . eventManager . Publish ( event . NewEvent ( event . SendMessageToPeerError , map [ event . Field ] string { event . RemotePeer : hostname , event . EventID : eventID , event . Error : err . Error ( ) } ) )
}
}
} else if context == event . ContextSendManifest {
if fileKey , manifest := e . filesharingSubSystem . ReceiveManifestPart ( eventID , message ) ; len ( manifest ) != 0 {
// We have a valid manifest
e . eventManager . Publish ( event . NewEvent ( event . ManifestReceived , map [ event . Field ] string { event . Handle : hostname , event . FileKey : fileKey , event . SerializedManifest : manifest } ) )
}
} else if context == event . ContextRequestFile {
chunks := e . filesharingSubSystem . ProcessChunkRequest ( eventID , message )
go func ( ) {
for _ , message := range chunks {
if err := e . sendPeerMessage ( hostname , message ) ; err != nil {
e . eventManager . Publish ( event . NewEvent ( event . SendMessageToPeerError , map [ event . Field ] string { event . RemotePeer : hostname , event . EventID : eventID , event . Error : err . Error ( ) } ) )
}
}
} ( )
} else if context == event . ContextSendFile {
fileKey , progress , totalChunks , _ , title := e . filesharingSubSystem . ProcessChunk ( eventID , message )
if len ( fileKey ) != 0 {
e . eventManager . Publish ( event . NewEvent ( event . FileDownloadProgressUpdate , map [ event . Field ] string { event . FileKey : fileKey , event . Progress : strconv . Itoa ( int ( progress ) ) , event . FileSizeInChunks : strconv . Itoa ( int ( totalChunks ) ) , event . NameSuggestion : title } ) )
if progress == totalChunks {
if tempFile , filePath , success := e . filesharingSubSystem . VerifyFile ( fileKey ) ; success {
log . Debugf ( "file verified and downloaded!" )
e . eventManager . Publish ( event . NewEvent ( event . FileDownloaded , map [ event . Field ] string { event . FileKey : fileKey , event . FilePath : filePath , event . TempFile : tempFile } ) )
} else {
log . Debugf ( "file failed to verify!" )
e . eventManager . Publish ( event . NewEvent ( event . FileVerificationFailed , map [ event . Field ] string { event . FileKey : fileKey } ) )
}
}
}
} else {
// Fall through handler for the default text conversation.
e . eventManager . Publish ( event . NewEvent ( event . NewMessageFromPeerEngine , map [ event . Field ] string { event . TimestampReceived : time . Now ( ) . Format ( time . RFC3339Nano ) , event . RemotePeer : hostname , event . Data : string ( message ) } ) )
// Send an explicit acknowledgement
// Every other protocol should have an explicit acknowledgement message e.g. value lookups have responses, and file handling has an explicit flow
if err := e . sendPeerMessage ( hostname , pmodel . PeerMessage { ID : eventID , Context : event . ContextAck , Data : [ ] byte { } } ) ; err != nil {
e . eventManager . Publish ( event . NewEvent ( event . SendMessageToPeerError , map [ event . Field ] string { event . RemotePeer : hostname , event . EventID : eventID , event . Error : err . Error ( ) } ) )
}
}
2019-01-04 21:44:21 +00:00
}
2020-03-07 07:41:00 +00:00
func ( e * engine ) handlePeerRetVal ( hostname string , getValData , retValData [ ] byte ) {
2022-10-03 20:05:36 +00:00
var getVal peerGetVal
var retVal peerRetVal
2020-03-07 07:41:00 +00:00
2022-10-03 20:05:36 +00:00
err := json . Unmarshal ( getValData , & getVal )
if err != nil {
log . Errorf ( "Unmarshalling our own getVal request: %v\n" , err )
return
}
err = json . Unmarshal ( retValData , & retVal )
if err != nil {
log . Errorf ( "Unmarshalling peer response to getVal request" )
return
}
2020-03-07 07:41:00 +00:00
2022-10-03 20:05:36 +00:00
e . eventManager . Publish ( event . NewEventList ( event . NewRetValMessageFromPeer , event . RemotePeer , hostname , event . Scope , getVal . Scope , event . Path , getVal . Path , event . Exists , strconv . FormatBool ( retVal . Exists ) , event . Data , retVal . Val ) )
2020-03-07 07:41:00 +00:00
}
2021-06-01 18:34:35 +00:00
2021-12-17 00:07:08 +00:00
// leaveServer disconnects from a server and deletes the ephemeral service
2021-06-01 18:34:35 +00:00
func ( e * engine ) leaveServer ( server string ) {
2022-10-03 20:05:36 +00:00
e . ephemeralServicesLock . Lock ( )
defer e . ephemeralServicesLock . Unlock ( )
ephemeralService , ok := e . ephemeralServices [ server ]
if ok {
ephemeralService . service . Shutdown ( )
delete ( e . ephemeralServices , server )
}
2021-06-01 18:34:35 +00:00
}
2021-09-30 00:57:13 +00:00
func ( e * engine ) sendPeerMessage ( handle string , message pmodel . PeerMessage ) error {
2022-10-03 20:05:36 +00:00
conn , err := e . service . WaitForCapabilityOrClose ( handle , cwtchCapability )
if err == nil {
peerApp , ok := ( conn . App ( ) ) . ( * PeerApp )
if ok {
return peerApp . SendMessage ( message )
}
log . Debugf ( "could not derive peer app: %v" , err )
return fmt . Errorf ( "could not find peer app to send message to: %v" , handle )
}
log . Debugf ( "could not send peer message: %v" , err )
return err
2021-09-30 00:57:13 +00:00
}