package connections import ( "cwtch.im/cwtch/event" "cwtch.im/cwtch/model" "cwtch.im/cwtch/protocol/groups" "cwtch.im/tapir" "cwtch.im/tapir/networks/tor" "cwtch.im/tapir/primitives" "encoding/base64" "encoding/json" "errors" "fmt" "git.openprivacy.ca/openprivacy/connectivity" torProvider "git.openprivacy.ca/openprivacy/connectivity/tor" "git.openprivacy.ca/openprivacy/log" "github.com/gtank/ristretto255" "golang.org/x/crypto/ed25519" "strconv" "sync" "time" ) type engine struct { queue event.Queue // Engine Attributes identity primitives.Identity acn connectivity.ACN // Engine State started bool // Authorization list of contacts to authorization status authorizations sync.Map // string(onion) => model.Authorization // Block Unknown Contacts blockUnknownContacts bool // Pointer to the Global Event Manager eventManager event.Manager // Nextgen Tapir Service service tapir.Service // Nextgen Tapir Service ephemeralServices sync.Map // string(onion) => tapir.Service // Required for listen(), inaccessible from identity privateKey ed25519.PrivateKey shuttingDown bool } // Engine (ProtocolEngine) encapsulates the logic necessary to make and receive Cwtch connections. // Note: ProtocolEngine doesn't have access to any information necessary to encrypt or decrypt GroupMessages type Engine interface { ACN() connectivity.ACN EventManager() event.Manager Shutdown() } // NewProtocolEngine initializes a new engine that runs Cwtch using the given parameters func NewProtocolEngine(identity primitives.Identity, privateKey ed25519.PrivateKey, acn connectivity.ACN, eventManager event.Manager, peerAuthorizations map[string]model.Authorization) Engine { engine := new(engine) engine.identity = identity engine.privateKey = privateKey engine.queue = event.NewQueue() go engine.eventHandler() engine.acn = acn // Init the Server running the Simple App. engine.service = new(tor.BaseOnionService) engine.service.Init(acn, privateKey, &identity) engine.eventManager = eventManager engine.eventManager.Subscribe(event.ProtocolEngineStartListen, engine.queue) engine.eventManager.Subscribe(event.PeerRequest, engine.queue) engine.eventManager.Subscribe(event.RetryPeerRequest, engine.queue) engine.eventManager.Subscribe(event.InvitePeerToGroup, engine.queue) engine.eventManager.Subscribe(event.JoinServer, engine.queue) engine.eventManager.Subscribe(event.LeaveServer, engine.queue) engine.eventManager.Subscribe(event.SendMessageToGroup, engine.queue) engine.eventManager.Subscribe(event.SendMessageToPeer, engine.queue) engine.eventManager.Subscribe(event.SendGetValMessageToPeer, engine.queue) engine.eventManager.Subscribe(event.SendRetValMessageToPeer, engine.queue) engine.eventManager.Subscribe(event.DeleteContact, engine.queue) engine.eventManager.Subscribe(event.DeleteGroup, engine.queue) engine.eventManager.Subscribe(event.SetPeerAuthorization, engine.queue) engine.eventManager.Subscribe(event.BlockUnknownPeers, engine.queue) engine.eventManager.Subscribe(event.AllowUnknownPeers, engine.queue) for peer, authorization := range peerAuthorizations { engine.authorizations.Store(peer, authorization) } return engine } func (e *engine) ACN() connectivity.ACN { return e.acn } func (e *engine) EventManager() event.Manager { return e.eventManager } // eventHandler process events from other subsystems func (e *engine) eventHandler() { for { ev := e.queue.Next() switch ev.EventType { case event.StatusRequest: e.eventManager.Publish(event.Event{EventType: event.ProtocolEngineStatus, EventID: ev.EventID}) case event.PeerRequest: if torProvider.IsValidHostname(ev.Data[event.RemotePeer]) { go e.peerWithOnion(ev.Data[event.RemotePeer]) } case event.RetryPeerRequest: // This event allows engine to treat (automated) retry peering requests differently to user-specified // peer events if torProvider.IsValidHostname(ev.Data[event.RemotePeer]) { log.Debugf("Retrying Peer Request: %v", ev.Data[event.RemotePeer]) go e.peerWithOnion(ev.Data[event.RemotePeer]) } case event.InvitePeerToGroup: e.sendMessageToPeer(ev.EventID, ev.Data[event.RemotePeer], event.ContextInvite, []byte(ev.Data[event.GroupInvite])) case event.JoinServer: e.peerWithTokenServer(ev.Data[event.GroupServer], ev.Data[event.ServerTokenOnion], ev.Data[event.ServerTokenY]) case event.LeaveServer: es, ok := e.ephemeralServices.Load(ev.Data[event.GroupServer]) if ok { ephemeralService := es.(tapir.Service) ephemeralService.Shutdown() e.ephemeralServices.Delete(ev.Data[event.GroupServer]) } case event.DeleteContact: onion := ev.Data[event.RemotePeer] // We remove this peer from out blocklist which will prevent them from contacting us if we have "block unknown peers" turned on. e.authorizations.Delete(ev.Data[event.RemotePeer]) e.deleteConnection(onion) case event.DeleteGroup: // TODO: There isn't a way here to determine if other Groups are using a server connection... case event.SendMessageToGroup: ciphertext,_ := base64.StdEncoding.DecodeString(ev.Data[event.Ciphertext]) signature,_ := base64.StdEncoding.DecodeString(ev.Data[event.Signature]) err := e.sendMessageToGroup(ev.Data[event.GroupServer],ciphertext, signature) if err != nil { e.eventManager.Publish(event.NewEvent(event.SendMessageToGroupError, map[event.Field]string{event.GroupServer: ev.Data[event.GroupServer], event.EventID: ev.EventID, event.Error: err.Error()})) } case event.SendMessageToPeer: // TODO: remove this passthrough once the UI is integrated. context, ok := ev.Data[event.EventContext] if !ok { context = event.ContextRaw } err := e.sendMessageToPeer(ev.EventID, ev.Data[event.RemotePeer], context, []byte(ev.Data[event.Data])) if err != nil { e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.RemotePeer: ev.Data[event.RemotePeer], event.EventID: ev.EventID, event.Error: "peer is offline or the connection has yet to finalize"})) } case event.SendGetValMessageToPeer: e.sendGetValToPeer(ev.EventID, ev.Data[event.RemotePeer], ev.Data[event.Scope], ev.Data[event.Path]) case event.SendRetValMessageToPeer: e.sendRetValToPeer(ev.EventID, ev.Data[event.RemotePeer], ev.Data[event.Data], ev.Data[event.Exists]) case event.SetPeerAuthorization: auth := model.Authorization(ev.Data[event.Authorization]) e.authorizations.Store(ev.Data[event.RemotePeer], auth) if auth == model.AuthBlocked { connection, err := e.service.GetConnection(ev.Data[event.RemotePeer]) if connection != nil && err == nil { connection.Close() } // Explicitly send a disconnected event (if we don't do this here then the UI can wait for a while before // an ongoing Open() connection fails and so the user will see a blocked peer as still connecting (because // there isn't an active connection and we are stuck waiting for tor to time out) e.peerDisconnected(ev.Data[event.RemotePeer]) } case event.AllowUnknownPeers: e.blockUnknownContacts = false case event.BlockUnknownPeers: e.blockUnknownContacts = true case event.ProtocolEngineStartListen: go e.listenFn() default: return } } } func (e *engine) isBlocked(onion string) bool { authorization, known := e.authorizations.Load(onion) if !known { // if we block unknown peers we will block this contact return e.blockUnknownContacts } return authorization.(model.Authorization) == model.AuthBlocked } func (e *engine) isAllowed(onion string) bool { authorization, known := e.authorizations.Load(onion) if !known { log.Errorf("attempted to lookup authorization of onion not in map...that should never happen") return false } if e.blockUnknownContacts { return authorization.(model.Authorization) == model.AuthApproved } return authorization.(model.Authorization) != model.AuthBlocked } func (e *engine) createPeerTemplate() *PeerApp { peerAppTemplate := new(PeerApp) peerAppTemplate.IsBlocked = e.isBlocked peerAppTemplate.IsAllowed = e.isAllowed peerAppTemplate.MessageHandler = e.handlePeerMessage peerAppTemplate.OnAcknowledgement = e.ignoreOnShutdown2(e.peerAck) peerAppTemplate.OnAuth = e.ignoreOnShutdown(e.peerAuthed) peerAppTemplate.OnConnecting = e.ignoreOnShutdown(e.peerConnecting) peerAppTemplate.OnClose = e.ignoreOnShutdown(e.peerDisconnected) peerAppTemplate.RetValHandler = e.handlePeerRetVal return peerAppTemplate } // Listen sets up an onion listener to process incoming cwtch messages func (e *engine) listenFn() { err := e.service.Listen(e.createPeerTemplate()) if !e.shuttingDown { e.eventManager.Publish(event.NewEvent(event.ProtocolEngineStopped, map[event.Field]string{event.Identity: e.identity.Hostname(), event.Error: err.Error()})) } return } // Shutdown tears down the eventHandler goroutine func (e *engine) Shutdown() { e.shuttingDown = true e.service.Shutdown() e.queue.Shutdown() } // peerWithOnion is the entry point for cwtchPeer relationships // needs to be run in a goroutine as will block on Open. func (e *engine) peerWithOnion(onion string) { log.Debugf("Called PeerWithOnion for %v", onion) if !e.isBlocked(onion) { e.ignoreOnShutdown(e.peerConnecting)(onion) connected, err := e.service.Connect(onion, e.createPeerTemplate()) // If we are already connected...check if we are authed and issue an auth event // (This allows the ui to be stateless) if connected && err != nil { conn, err := e.service.GetConnection(onion) if err == nil { if conn.HasCapability(cwtchCapability) { e.ignoreOnShutdown(e.peerAuthed)(onion) return } } } // Only issue a disconnected error if we are disconnected (Connect will fail if a connection already exists) if !connected && err != nil { e.ignoreOnShutdown(e.peerDisconnected)(onion) } } } // peerWithTokenServer is the entry point for cwtchPeer - server relationships // needs to be run in a goroutine as will block on Open. func (e *engine) peerWithTokenServer(onion string, tokenServerOnion string, tokenServerY string) { service, exists := e.ephemeralServices.Load(onion) if exists { connection := service.(*tor.BaseOnionService) if conn, err := connection.GetConnection(onion); err == nil { if conn.IsClosed() == false { return } } // Otherwise...let's reconnect } log.Debugf("Peering with Token Server %v %v", onion, tokenServerOnion) e.ignoreOnShutdown(e.serverConnecting)(onion) // Create a new ephemeral service for this connection ephemeralService := new(tor.BaseOnionService) eid, epk := primitives.InitializeEphemeralIdentity() ephemeralService.Init(e.acn, epk, &eid) Y := ristretto255.NewElement() Y.UnmarshalText([]byte(tokenServerY)) connected, err := ephemeralService.Connect(onion, NewTokenBoardClient(e.acn, Y, tokenServerOnion, e.receiveGroupMessage, e.serverSynced)) e.ephemeralServices.Store(onion, ephemeralService) // If we are already connected...check if we are authed and issue an auth event // (This allows the ui to be stateless) if connected && err != nil { conn, err := ephemeralService.GetConnection(onion) if err == nil { if conn.HasCapability(groups.CwtchServerSyncedCapability) { e.ignoreOnShutdown(e.serverConnected)(onion) return } } } // Only issue a disconnected error if we are disconnected (Connect will fail if a connection already exists) if !connected && err != nil { e.ignoreOnShutdown(e.serverDisconnected)(onion) } } func (e *engine) ignoreOnShutdown(f func(string)) func(string) { return func(x string) { if !e.shuttingDown { f(x) } } } func (e *engine) ignoreOnShutdown2(f func(string, string)) func(string, string) { return func(x, y string) { if !e.shuttingDown { f(x, y) } } } func (e *engine) peerAuthed(onion string) { _, known := e.authorizations.Load(onion) if !known { e.authorizations.Store(onion, model.AuthUnknown) } e.eventManager.Publish(event.NewEvent(event.PeerStateChange, map[event.Field]string{ event.RemotePeer: string(onion), event.ConnectionState: ConnectionStateName[AUTHENTICATED], })) } func (e *engine) peerConnecting(onion string) { e.eventManager.Publish(event.NewEvent(event.PeerStateChange, map[event.Field]string{ event.RemotePeer: string(onion), event.ConnectionState: ConnectionStateName[CONNECTING], })) } func (e *engine) serverConnecting(onion string) { e.eventManager.Publish(event.NewEvent(event.ServerStateChange, map[event.Field]string{ event.GroupServer: string(onion), event.ConnectionState: ConnectionStateName[CONNECTING], })) } func (e *engine) serverConnected(onion string) { e.eventManager.Publish(event.NewEvent(event.ServerStateChange, map[event.Field]string{ event.GroupServer: onion, event.ConnectionState: ConnectionStateName[CONNECTED], })) } func (e *engine) serverSynced(onion string) { log.Debugf("SERVER SYNCED: %v", onion) e.eventManager.Publish(event.NewEvent(event.ServerStateChange, map[event.Field]string{ event.GroupServer: onion, event.ConnectionState: ConnectionStateName[SYNCED], })) } func (e *engine) serverDisconnected(onion string) { e.eventManager.Publish(event.NewEvent(event.ServerStateChange, map[event.Field]string{ event.GroupServer: onion, event.ConnectionState: ConnectionStateName[DISCONNECTED], })) } func (e *engine) peerAck(onion string, eventID string) { e.eventManager.Publish(event.NewEvent(event.PeerAcknowledgement, map[event.Field]string{ event.EventID: eventID, event.RemotePeer: onion, })) } func (e *engine) peerDisconnected(onion string) { e.eventManager.Publish(event.NewEvent(event.PeerStateChange, map[event.Field]string{ event.RemotePeer: string(onion), event.ConnectionState: ConnectionStateName[DISCONNECTED], })) } // sendMessageToPeer sends a message to a peer under a given context func (e *engine) sendMessageToPeer(eventID string, onion string, context string, message []byte) error { conn, err := e.service.WaitForCapabilityOrClose(onion, cwtchCapability) if err == nil { peerApp, ok := (conn.App()).(*PeerApp) if ok { peerApp.SendMessage(PeerMessage{eventID, context, message}) return nil } return errors.New("failed type assertion conn.App != PeerApp") } return err } func (e *engine) sendGetValToPeer(eventID, onion, scope, path string) error { log.Debugf("sendGetValMessage to peer %v %v%v\n", onion, scope, path) getVal := peerGetVal{Scope: scope, Path: path} message, err := json.Marshal(getVal) if err != nil { return err } return e.sendMessageToPeer(eventID, onion, event.ContextGetVal, message) } func (e *engine) sendRetValToPeer(eventID, onion, val, existsStr string) error { log.Debugf("sendRetValMessage to peer %v (%v) %v %v\n", onion, eventID, val, existsStr) exists, _ := strconv.ParseBool(existsStr) retVal := peerRetVal{Val: val, Exists: exists} message, err := json.Marshal(retVal) if err != nil { return err } return e.sendMessageToPeer(eventID, onion, event.ContextRetVal, message) } func (e *engine) deleteConnection(id string) { conn, err := e.service.GetConnection(id) if err == nil { conn.Close() } } // receiveGroupMessage is a callback function that processes GroupMessages from a given server func (e *engine) receiveGroupMessage(server string, gm *groups.EncryptedGroupMessage) { // Publish Event so that a Profile Engine can deal with it. // Note: This technically means that *multiple* Profile Engines could listen to the same ProtocolEngine! e.eventManager.Publish(event.NewEvent(event.EncryptedGroupMessage, map[event.Field]string{event.Ciphertext: base64.StdEncoding.EncodeToString(gm.Ciphertext), event.Signature: base64.StdEncoding.EncodeToString(gm.Signature)})) } // sendMessageToGroup attempts to sent the given message to the given group id. func (e *engine) sendMessageToGroup(server string, ct []byte, sig []byte) error { es, ok := e.ephemeralServices.Load(server) if !ok { return fmt.Errorf("no service exists for group %v", server) } ephemeralService := es.(tapir.Service) conn, err := ephemeralService.WaitForCapabilityOrClose(server, groups.CwtchServerSyncedCapability) if err == nil { tokenApp, ok := (conn.App()).(*TokenBoardClient) if ok { attempts := 0 for tokenApp.Post(ct, sig) == false { // TODO This should eventually be wired back into the UI to allow it to error tokenApp.MakePayment() time.Sleep(time.Second * 5) attempts++ if attempts == 5 { return errors.New("failed to post to token board") } } return nil } return errors.New("failed type assertion conn.App != TokenBoardClientApp") } return err } func (e *engine) handlePeerMessage(hostname string, eventID string, context string, message []byte) { log.Debugf("New message from peer: %v %v", hostname, context) if context == event.ContextInvite { e.eventManager.Publish(event.NewEvent(event.NewGroupInvite, map[event.Field]string{event.TimestampReceived: time.Now().Format(time.RFC3339Nano), event.RemotePeer: hostname, event.GroupInvite: string(message)})) } else if context == event.ContextGetVal { var getVal peerGetVal err := json.Unmarshal(message, &getVal) if err == nil { ev := event.NewEventList(event.NewGetValMessageFromPeer, event.RemotePeer, hostname, event.Scope, getVal.Scope, event.Path, getVal.Path) ev.EventID = eventID e.eventManager.Publish(ev) } } else { e.eventManager.Publish(event.NewEvent(event.NewMessageFromPeer, map[event.Field]string{event.TimestampReceived: time.Now().Format(time.RFC3339Nano), event.RemotePeer: hostname, event.Data: string(message)})) } } func (e *engine) handlePeerRetVal(hostname string, getValData, retValData []byte) { var getVal peerGetVal var retVal peerRetVal err := json.Unmarshal(getValData, &getVal) if err != nil { log.Errorf("Unmarshalling our own getVal request: %v\n", err) return } err = json.Unmarshal(retValData, &retVal) if err != nil { log.Errorf("Unmarshalling peer response to getVal request") return } e.eventManager.Publish(event.NewEventList(event.NewRetValMessageFromPeer, event.RemotePeer, hostname, event.Scope, getVal.Scope, event.Path, getVal.Path, event.Exists, strconv.FormatBool(retVal.Exists), event.Data, retVal.Val)) }