cwtch/protocol/connections/engine.go

516 lines
19 KiB
Go
Raw Normal View History

2019-01-04 21:44:21 +00:00
package connections
import (
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/model"
2020-07-14 00:46:05 +00:00
"cwtch.im/cwtch/protocol/groups"
"encoding/base64"
"encoding/json"
2019-07-23 17:57:04 +00:00
"errors"
2021-04-09 01:22:08 +00:00
"git.openprivacy.ca/cwtch.im/tapir"
"git.openprivacy.ca/cwtch.im/tapir/networks/tor"
"git.openprivacy.ca/cwtch.im/tapir/primitives"
"git.openprivacy.ca/openprivacy/connectivity"
2020-09-21 21:26:28 +00:00
torProvider "git.openprivacy.ca/openprivacy/connectivity/tor"
"git.openprivacy.ca/openprivacy/log"
2020-07-14 00:46:05 +00:00
"github.com/gtank/ristretto255"
2019-01-04 21:44:21 +00:00
"golang.org/x/crypto/ed25519"
"strconv"
2019-01-08 18:58:01 +00:00
"sync"
"time"
2019-01-04 21:44:21 +00:00
)
type engine struct {
2020-07-14 00:46:05 +00:00
queue event.Queue
2019-01-04 21:44:21 +00:00
// Engine Attributes
2019-08-08 18:39:38 +00:00
identity primitives.Identity
acn connectivity.ACN
2019-01-04 21:44:21 +00:00
// Authorization list of contacts to authorization status
authorizations sync.Map // string(onion) => model.Authorization
2019-01-08 18:58:01 +00:00
2019-08-21 19:25:26 +00:00
// Block Unknown Contacts
blockUnknownContacts bool
2019-01-04 21:44:21 +00:00
// Pointer to the Global Event Manager
eventManager event.Manager
2019-07-17 19:10:52 +00:00
// Nextgen Tapir Service
service tapir.Service
2020-07-14 00:46:05 +00:00
// Nextgen Tapir Service
ephemeralServices sync.Map // string(onion) => tapir.Service
// Required for listen(), inaccessible from identity
privateKey ed25519.PrivateKey
2019-07-24 18:49:01 +00:00
shuttingDown bool
}
// Engine (ProtocolEngine) encapsulates the logic necessary to make and receive Cwtch connections.
// Note: ProtocolEngine doesn't have access to any information necessary to encrypt or decrypt GroupMessages
// Protocol Engine *can* associate Group Identifiers with Group Servers, although we don't currently make use of this fact
// other than to route errors back to the UI.
type Engine interface {
ACN() connectivity.ACN
EventManager() event.Manager
Shutdown()
2019-01-04 21:44:21 +00:00
}
// NewProtocolEngine initializes a new engine that runs Cwtch using the given parameters
func NewProtocolEngine(identity primitives.Identity, privateKey ed25519.PrivateKey, acn connectivity.ACN, eventManager event.Manager, peerAuthorizations map[string]model.Authorization) Engine {
engine := new(engine)
engine.identity = identity
2019-01-04 21:44:21 +00:00
engine.privateKey = privateKey
engine.queue = event.NewQueue()
2019-01-04 21:44:21 +00:00
go engine.eventHandler()
engine.acn = acn
2019-01-04 21:44:21 +00:00
2019-07-17 19:10:52 +00:00
// Init the Server running the Simple App.
engine.service = new(tor.BaseOnionService)
2019-08-08 18:39:38 +00:00
engine.service.Init(acn, privateKey, &identity)
2019-07-17 19:10:52 +00:00
2019-01-04 21:44:21 +00:00
engine.eventManager = eventManager
2019-01-08 18:58:01 +00:00
engine.eventManager.Subscribe(event.ProtocolEngineStartListen, engine.queue)
engine.eventManager.Subscribe(event.PeerRequest, engine.queue)
engine.eventManager.Subscribe(event.RetryPeerRequest, engine.queue)
engine.eventManager.Subscribe(event.InvitePeerToGroup, engine.queue)
engine.eventManager.Subscribe(event.JoinServer, engine.queue)
2020-10-29 21:51:59 +00:00
engine.eventManager.Subscribe(event.LeaveServer, engine.queue)
engine.eventManager.Subscribe(event.SendMessageToGroup, engine.queue)
engine.eventManager.Subscribe(event.SendMessageToPeer, engine.queue)
engine.eventManager.Subscribe(event.SendGetValMessageToPeer, engine.queue)
engine.eventManager.Subscribe(event.SendRetValMessageToPeer, engine.queue)
engine.eventManager.Subscribe(event.DeleteContact, engine.queue)
engine.eventManager.Subscribe(event.DeleteGroup, engine.queue)
engine.eventManager.Subscribe(event.SetPeerAuthorization, engine.queue)
2019-08-21 19:25:26 +00:00
engine.eventManager.Subscribe(event.BlockUnknownPeers, engine.queue)
engine.eventManager.Subscribe(event.AllowUnknownPeers, engine.queue)
for peer, authorization := range peerAuthorizations {
engine.authorizations.Store(peer, authorization)
2019-01-08 18:58:01 +00:00
}
2019-01-04 21:44:21 +00:00
return engine
}
// ACN returns the connectivity.ACN held by the engine.
func (e *engine) ACN() connectivity.ACN {
	return e.acn
}
// EventManager returns the event.Manager held by the engine.
func (e *engine) EventManager() event.Manager {
	return e.eventManager
}
2019-01-04 21:44:21 +00:00
// eventHandler process events from other subsystems
func (e *engine) eventHandler() {
2019-01-04 21:44:21 +00:00
for {
ev := e.queue.Next()
switch ev.EventType {
case event.StatusRequest:
e.eventManager.Publish(event.Event{EventType: event.ProtocolEngineStatus, EventID: ev.EventID})
case event.PeerRequest:
2020-09-21 21:26:28 +00:00
if torProvider.IsValidHostname(ev.Data[event.RemotePeer]) {
go e.peerWithOnion(ev.Data[event.RemotePeer])
}
case event.RetryPeerRequest:
// This event allows engine to treat (automated) retry peering requests differently to user-specified
// peer events
2020-09-21 21:26:28 +00:00
if torProvider.IsValidHostname(ev.Data[event.RemotePeer]) {
log.Debugf("Retrying Peer Request: %v", ev.Data[event.RemotePeer])
go e.peerWithOnion(ev.Data[event.RemotePeer])
}
2019-01-04 21:44:21 +00:00
case event.InvitePeerToGroup:
2019-07-23 17:57:04 +00:00
e.sendMessageToPeer(ev.EventID, ev.Data[event.RemotePeer], event.ContextInvite, []byte(ev.Data[event.GroupInvite]))
2019-01-04 21:44:21 +00:00
case event.JoinServer:
signature, err := base64.StdEncoding.DecodeString(ev.Data[event.Signature])
if err != nil {
// will result in a full sync
signature = []byte{}
}
go e.peerWithTokenServer(ev.Data[event.GroupServer], ev.Data[event.ServerTokenOnion], ev.Data[event.ServerTokenY], signature)
2020-10-29 21:51:59 +00:00
case event.LeaveServer:
2021-06-01 18:34:35 +00:00
e.leaveServer(ev.Data[event.GroupServer])
case event.DeleteContact:
onion := ev.Data[event.RemotePeer]
2019-08-21 19:25:26 +00:00
// We remove this peer from out blocklist which will prevent them from contacting us if we have "block unknown peers" turned on.
e.authorizations.Delete(ev.Data[event.RemotePeer])
e.deleteConnection(onion)
case event.DeleteGroup:
// TODO: There isn't a way here to determine if other Groups are using a server connection...
2019-01-04 21:44:21 +00:00
case event.SendMessageToGroup:
ciphertext, _ := base64.StdEncoding.DecodeString(ev.Data[event.Ciphertext])
signature, _ := base64.StdEncoding.DecodeString(ev.Data[event.Signature])
go e.sendMessageToGroup(ev.Data[event.GroupID], ev.Data[event.GroupServer], ciphertext, signature)
2019-01-04 21:44:21 +00:00
case event.SendMessageToPeer:
// TODO: remove this passthrough once the UI is integrated.
context, ok := ev.Data[event.EventContext]
if !ok {
2019-07-23 17:57:04 +00:00
context = event.ContextRaw
}
2019-07-30 23:47:12 +00:00
err := e.sendMessageToPeer(ev.EventID, ev.Data[event.RemotePeer], context, []byte(ev.Data[event.Data]))
2019-07-17 19:10:52 +00:00
if err != nil {
e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.RemotePeer: ev.Data[event.RemotePeer], event.EventID: ev.EventID, event.Error: "peer is offline or the connection has yet to finalize"}))
2019-01-04 21:44:21 +00:00
}
case event.SendGetValMessageToPeer:
e.sendGetValToPeer(ev.EventID, ev.Data[event.RemotePeer], ev.Data[event.Scope], ev.Data[event.Path])
case event.SendRetValMessageToPeer:
e.sendRetValToPeer(ev.EventID, ev.Data[event.RemotePeer], ev.Data[event.Data], ev.Data[event.Exists])
case event.SetPeerAuthorization:
auth := model.Authorization(ev.Data[event.Authorization])
e.authorizations.Store(ev.Data[event.RemotePeer], auth)
if auth == model.AuthBlocked {
connection, err := e.service.GetConnection(ev.Data[event.RemotePeer])
if connection != nil && err == nil {
connection.Close()
}
// Explicitly send a disconnected event (if we don't do this here then the UI can wait for a while before
// an ongoing Open() connection fails and so the user will see a blocked peer as still connecting (because
// there isn't an active connection and we are stuck waiting for tor to time out)
e.peerDisconnected(ev.Data[event.RemotePeer])
}
2019-08-21 19:25:26 +00:00
case event.AllowUnknownPeers:
log.Debugf("%v now allows unknown connections", e.identity.Hostname())
2019-08-21 19:25:26 +00:00
e.blockUnknownContacts = false
case event.BlockUnknownPeers:
log.Debugf("%v now forbids unknown connections", e.identity.Hostname())
2019-08-21 19:25:26 +00:00
e.blockUnknownContacts = true
2019-01-04 21:44:21 +00:00
case event.ProtocolEngineStartListen:
go e.listenFn()
default:
return
}
}
}
// isBlocked reports whether onion must be treated as blocked. A known onion is
// blocked only when its stored authorization is model.AuthBlocked; an unknown
// onion is blocked exactly when the engine is blocking unknown contacts.
func (e *engine) isBlocked(onion string) bool {
	if stored, found := e.authorizations.Load(onion); found {
		return stored.(model.Authorization) == model.AuthBlocked
	}
	// if we block unknown peers we will block this contact
	return e.blockUnknownContacts
}
// isAllowed reports whether onion may interact with this engine. An onion with
// no stored authorization is never allowed (and is logged as an error). When
// unknown contacts are blocked only model.AuthApproved is allowed; otherwise
// anything other than model.AuthBlocked is allowed.
func (e *engine) isAllowed(onion string) bool {
	stored, found := e.authorizations.Load(onion)
	if !found {
		log.Errorf("attempted to lookup authorization of onion not in map...that should never happen")
		return false
	}

	status := stored.(model.Authorization)
	if e.blockUnknownContacts {
		return status == model.AuthApproved
	}
	return status != model.AuthBlocked
}
2019-07-23 17:57:04 +00:00
func (e *engine) createPeerTemplate() *PeerApp {
2019-07-17 19:10:52 +00:00
peerAppTemplate := new(PeerApp)
peerAppTemplate.IsBlocked = e.isBlocked
peerAppTemplate.IsAllowed = e.isAllowed
2019-07-17 19:10:52 +00:00
peerAppTemplate.MessageHandler = e.handlePeerMessage
2019-07-29 20:21:58 +00:00
peerAppTemplate.OnAcknowledgement = e.ignoreOnShutdown2(e.peerAck)
peerAppTemplate.OnAuth = e.ignoreOnShutdown(e.peerAuthed)
peerAppTemplate.OnConnecting = e.ignoreOnShutdown(e.peerConnecting)
peerAppTemplate.OnClose = e.ignoreOnShutdown(e.peerDisconnected)
peerAppTemplate.RetValHandler = e.handlePeerRetVal
2019-07-23 17:57:04 +00:00
return peerAppTemplate
}
// Listen sets up an onion listener to process incoming cwtch messages
func (e *engine) listenFn() {
err := e.service.Listen(e.createPeerTemplate())
2019-07-24 19:26:44 +00:00
if !e.shuttingDown {
e.eventManager.Publish(event.NewEvent(event.ProtocolEngineStopped, map[event.Field]string{event.Identity: e.identity.Hostname(), event.Error: err.Error()}))
}
2019-01-04 21:44:21 +00:00
}
// Shutdown tears down the eventHandler goroutine
func (e *engine) Shutdown() {
2019-07-24 18:49:01 +00:00
e.shuttingDown = true
2019-07-24 19:26:44 +00:00
e.service.Shutdown()
2019-01-04 21:44:21 +00:00
e.queue.Shutdown()
}
// peerWithOnion is the entry point for cwtchPeer relationships
2019-07-29 19:48:12 +00:00
// needs to be run in a goroutine as will block on Open.
2019-07-17 19:10:52 +00:00
func (e *engine) peerWithOnion(onion string) {
2020-09-21 21:26:28 +00:00
log.Debugf("Called PeerWithOnion for %v", onion)
if !e.isBlocked(onion) {
e.ignoreOnShutdown(e.peerConnecting)(onion)
connected, err := e.service.Connect(onion, e.createPeerTemplate())
// If we are already connected...check if we are authed and issue an auth event
// (This allows the ui to be stateless)
if connected && err != nil {
conn, err := e.service.GetConnection(onion)
if err == nil {
if conn.HasCapability(cwtchCapability) {
e.ignoreOnShutdown(e.peerAuthed)(onion)
return
}
}
}
// Only issue a disconnected error if we are disconnected (Connect will fail if a connection already exists)
if !connected && err != nil {
e.ignoreOnShutdown(e.peerDisconnected)(onion)
}
2019-07-29 19:48:12 +00:00
}
2019-07-17 19:10:52 +00:00
}
2020-07-14 00:46:05 +00:00
// peerWithTokenServer is the entry point for cwtchPeer - server relationships
// needs to be run in a goroutine as will block on Open.
func (e *engine) peerWithTokenServer(onion string, tokenServerOnion string, tokenServerY string, lastKnownSignature []byte) {
2020-11-02 23:53:13 +00:00
service, exists := e.ephemeralServices.Load(onion)
if exists {
connection := service.(*tor.BaseOnionService)
if conn, err := connection.GetConnection(onion); err == nil {
2021-06-01 18:34:35 +00:00
// We are already peered and synced so return...
// This will only not-trigger it lastKnownSignature has been wiped, which only happens when ResyncServer is called
// in CwtchPeer.
2021-06-02 18:34:57 +00:00
if !conn.IsClosed() && len(lastKnownSignature) != 0 {
2020-11-02 23:53:13 +00:00
return
}
2021-06-02 18:34:57 +00:00
// Otherwise...we are going to rebuild the connection(which will result in a bandwidth heavy resync)...
e.leaveServer(onion)
2020-11-02 23:53:13 +00:00
}
// Otherwise...let's reconnect
}
2020-09-21 21:26:28 +00:00
log.Debugf("Peering with Token Server %v %v", onion, tokenServerOnion)
2020-07-14 00:46:05 +00:00
e.ignoreOnShutdown(e.serverConnecting)(onion)
// Create a new ephemeral service for this connection
ephemeralService := new(tor.BaseOnionService)
eid, epk := primitives.InitializeEphemeralIdentity()
ephemeralService.Init(e.acn, epk, &eid)
Y := ristretto255.NewElement()
Y.UnmarshalText([]byte(tokenServerY))
connected, err := ephemeralService.Connect(onion, NewTokenBoardClient(e.acn, Y, tokenServerOnion, lastKnownSignature, e.receiveGroupMessage, e.serverSynced, e.serverDisconnected))
2020-07-14 00:46:05 +00:00
e.ephemeralServices.Store(onion, ephemeralService)
// If we are already connected...check if we are authed and issue an auth event
// (This allows the ui to be stateless)
if connected && err != nil {
conn, err := ephemeralService.GetConnection(onion)
if err == nil {
if conn.HasCapability(groups.CwtchServerSyncedCapability) {
e.ignoreOnShutdown(e.serverConnected)(onion)
return
}
}
}
// Only issue a disconnected error if we are disconnected (Connect will fail if a connection already exists)
if !connected && err != nil {
e.ignoreOnShutdown(e.serverDisconnected)(onion)
}
}
2019-07-29 20:21:58 +00:00
// ignoreOnShutdown wraps a single-argument callback so that it is skipped
// whenever the engine is shutting down at the time of the call.
func (e *engine) ignoreOnShutdown(f func(string)) func(string) {
	return func(x string) {
		if e.shuttingDown {
			return
		}
		f(x)
	}
}
func (e *engine) ignoreOnShutdown2(f func(string, string)) func(string, string) {
return func(x, y string) {
if !e.shuttingDown {
f(x, y)
}
}
2019-07-24 18:49:01 +00:00
}
2019-07-17 19:10:52 +00:00
func (e *engine) peerAuthed(onion string) {
_, known := e.authorizations.Load(onion)
if !known {
e.authorizations.Store(onion, model.AuthUnknown)
}
2019-07-17 19:10:52 +00:00
e.eventManager.Publish(event.NewEvent(event.PeerStateChange, map[event.Field]string{
event.RemotePeer: string(onion),
event.ConnectionState: ConnectionStateName[AUTHENTICATED],
}))
}
2019-07-23 17:57:04 +00:00
// peerConnecting publishes a PeerStateChange event marking onion as CONNECTING.
func (e *engine) peerConnecting(onion string) {
	state := map[event.Field]string{
		event.RemotePeer:      onion,
		event.ConnectionState: ConnectionStateName[CONNECTING],
	}
	e.eventManager.Publish(event.NewEvent(event.PeerStateChange, state))
}
2020-07-14 00:46:05 +00:00
// serverConnecting publishes a ServerStateChange event marking the group server
// onion as CONNECTING.
func (e *engine) serverConnecting(onion string) {
	state := map[event.Field]string{
		event.GroupServer:     onion,
		event.ConnectionState: ConnectionStateName[CONNECTING],
	}
	e.eventManager.Publish(event.NewEvent(event.ServerStateChange, state))
}
// serverConnected publishes a ServerStateChange event marking the group server
// onion as CONNECTED.
func (e *engine) serverConnected(onion string) {
	state := map[event.Field]string{
		event.GroupServer:     onion,
		event.ConnectionState: ConnectionStateName[CONNECTED],
	}
	e.eventManager.Publish(event.NewEvent(event.ServerStateChange, state))
}
// serverSynced publishes a ServerStateChange event marking the group server
// onion as SYNCED.
func (e *engine) serverSynced(onion string) {
	state := map[event.Field]string{
		event.GroupServer:     onion,
		event.ConnectionState: ConnectionStateName[SYNCED],
	}
	e.eventManager.Publish(event.NewEvent(event.ServerStateChange, state))
}
// serverDisconnected publishes a ServerStateChange event marking the group
// server onion as DISCONNECTED.
func (e *engine) serverDisconnected(onion string) {
	state := map[event.Field]string{
		event.GroupServer:     onion,
		event.ConnectionState: ConnectionStateName[DISCONNECTED],
	}
	e.eventManager.Publish(event.NewEvent(event.ServerStateChange, state))
}
2019-07-29 20:21:58 +00:00
func (e *engine) peerAck(onion string, eventID string) {
e.eventManager.Publish(event.NewEvent(event.PeerAcknowledgement, map[event.Field]string{
2019-07-29 20:21:58 +00:00
event.EventID: eventID,
event.RemotePeer: onion,
}))
}
2019-07-17 19:10:52 +00:00
func (e *engine) peerDisconnected(onion string) {
e.eventManager.Publish(event.NewEvent(event.PeerStateChange, map[event.Field]string{
event.RemotePeer: string(onion),
event.ConnectionState: ConnectionStateName[DISCONNECTED],
}))
2019-01-04 21:44:21 +00:00
}
// sendMessageToPeer sends a message to a peer under a given context
func (e *engine) sendMessageToPeer(eventID string, onion string, context string, message []byte) error {
conn, err := e.service.WaitForCapabilityOrClose(onion, cwtchCapability)
2019-07-17 19:10:52 +00:00
if err == nil {
2019-08-08 18:39:38 +00:00
peerApp, ok := (conn.App()).(*PeerApp)
2019-07-17 19:10:52 +00:00
if ok {
peerApp.SendMessage(PeerMessage{eventID, context, message})
2019-07-17 19:10:52 +00:00
return nil
}
2019-07-23 17:57:04 +00:00
return errors.New("failed type assertion conn.App != PeerApp")
2019-01-04 21:44:21 +00:00
}
2019-07-17 19:10:52 +00:00
return err
2019-01-04 21:44:21 +00:00
}
// sendGetValToPeer JSON-encodes a peerGetVal request for scope and path and
// sends it to onion under the GetVal context. It returns the encoding error or
// the error from sendMessageToPeer.
func (e *engine) sendGetValToPeer(eventID, onion, scope, path string) error {
	log.Debugf("sendGetValMessage to peer %v %v%v\n", onion, scope, path)

	payload, err := json.Marshal(peerGetVal{Scope: scope, Path: path})
	if err != nil {
		return err
	}
	return e.sendMessageToPeer(eventID, onion, event.ContextGetVal, payload)
}
// sendRetValToPeer JSON-encodes a peerRetVal response carrying val and the
// parsed existsStr flag and sends it to onion under the RetVal context. An
// existsStr that does not parse as a boolean is treated as false. It returns
// the encoding error or the error from sendMessageToPeer.
func (e *engine) sendRetValToPeer(eventID, onion, val, existsStr string) error {
	log.Debugf("sendRetValMessage to peer %v (%v) %v %v\n", onion, eventID, val, existsStr)

	// The parse error is deliberately discarded: ParseBool yields false on failure.
	found, _ := strconv.ParseBool(existsStr)

	payload, err := json.Marshal(peerRetVal{Val: val, Exists: found})
	if err != nil {
		return err
	}
	return e.sendMessageToPeer(eventID, onion, event.ContextRetVal, payload)
}
// deleteConnection closes the peer service's connection to id, if one can be
// looked up; lookup failures are ignored.
func (e *engine) deleteConnection(id string) {
	if conn, err := e.service.GetConnection(id); err == nil {
		conn.Close()
	}
}
// receiveGroupMessage is a callback function that processes GroupMessages from a given server
2020-07-14 00:46:05 +00:00
func (e *engine) receiveGroupMessage(server string, gm *groups.EncryptedGroupMessage) {
2019-01-04 21:44:21 +00:00
// Publish Event so that a Profile Engine can deal with it.
// Note: This technically means that *multiple* Profile Engines could listen to the same ProtocolEngine!
e.eventManager.Publish(event.NewEvent(event.EncryptedGroupMessage, map[event.Field]string{event.GroupServer: server, event.Ciphertext: base64.StdEncoding.EncodeToString(gm.Ciphertext), event.Signature: base64.StdEncoding.EncodeToString(gm.Signature)}))
2019-01-04 21:44:21 +00:00
}
// sendMessageToGroup attempts to sent the given message to the given group id.
func (e *engine) sendMessageToGroup(groupID string, server string, ct []byte, sig []byte) {
2020-07-14 00:46:05 +00:00
es, ok := e.ephemeralServices.Load(server)
if es == nil || !ok {
e.eventManager.Publish(event.NewEvent(event.SendMessageToGroupError, map[event.Field]string{event.GroupID: groupID, event.GroupServer: server, event.Error: "server-not-found", event.Signature: base64.StdEncoding.EncodeToString(sig)}))
return
2019-01-04 21:44:21 +00:00
}
2020-07-14 00:46:05 +00:00
ephemeralService := es.(tapir.Service)
2020-07-14 00:46:05 +00:00
conn, err := ephemeralService.WaitForCapabilityOrClose(server, groups.CwtchServerSyncedCapability)
if err == nil {
tokenApp, ok := (conn.App()).(*TokenBoardClient)
if ok {
2021-06-02 18:34:57 +00:00
if spent, numtokens := tokenApp.Post(ct, sig); !spent {
// TODO: while this works for the spam guard, it won't work for other forms of payment...
// Make an -inline- payment, this will hold the goroutine
if err := tokenApp.MakePayment(); err == nil {
// This really shouldn't fail since we now know we have the required tokens...
2021-06-02 18:34:57 +00:00
if spent, _ := tokenApp.Post(ct, sig); !spent {
e.eventManager.Publish(event.NewEvent(event.SendMessageToGroupError, map[event.Field]string{event.GroupID: groupID, event.GroupServer: server, event.Error: err.Error(), event.Signature: base64.StdEncoding.EncodeToString(sig)}))
}
} else {
// Broadast the token error
e.eventManager.Publish(event.NewEvent(event.SendMessageToGroupError, map[event.Field]string{event.GroupID: groupID, event.GroupServer: server, event.Error: err.Error(), event.Signature: base64.StdEncoding.EncodeToString(sig)}))
2020-09-21 21:26:28 +00:00
}
} else if numtokens < 5 {
go tokenApp.MakePayment()
2020-07-14 00:46:05 +00:00
}
// regardless we return....
return
2020-07-14 00:46:05 +00:00
}
2019-02-20 20:58:05 +00:00
}
e.eventManager.Publish(event.NewEvent(event.SendMessageToGroupError, map[event.Field]string{event.GroupID: groupID, event.GroupServer: server, event.Error: "server-connection-not-valid", event.Signature: base64.StdEncoding.EncodeToString(sig)}))
2019-01-04 21:44:21 +00:00
}
func (e *engine) handlePeerMessage(hostname string, eventID string, context string, message []byte) {
2019-07-30 23:47:12 +00:00
log.Debugf("New message from peer: %v %v", hostname, context)
2021-05-03 23:32:48 +00:00
if context == event.ContextGetVal {
var getVal peerGetVal
err := json.Unmarshal(message, &getVal)
if err == nil {
ev := event.NewEventList(event.NewGetValMessageFromPeer, event.RemotePeer, hostname, event.Scope, getVal.Scope, event.Path, getVal.Path)
ev.EventID = eventID
e.eventManager.Publish(ev)
}
2019-07-23 17:57:04 +00:00
} else {
e.eventManager.Publish(event.NewEvent(event.NewMessageFromPeer, map[event.Field]string{event.TimestampReceived: time.Now().Format(time.RFC3339Nano), event.RemotePeer: hostname, event.Data: string(message)}))
2019-01-04 21:44:21 +00:00
}
}
// handlePeerRetVal pairs our original getVal request (getValData) with the
// peer's response (retValData) and publishes the result as a
// NewRetValMessageFromPeer event. If either payload fails to decode, the
// failure is logged and nothing is published.
func (e *engine) handlePeerRetVal(hostname string, getValData, retValData []byte) {
	var request peerGetVal
	if err := json.Unmarshal(getValData, &request); err != nil {
		log.Errorf("Unmarshalling our own getVal request: %v\n", err)
		return
	}

	var response peerRetVal
	if err := json.Unmarshal(retValData, &response); err != nil {
		log.Errorf("Unmarshalling peer response to getVal request")
		return
	}

	e.eventManager.Publish(event.NewEventList(event.NewRetValMessageFromPeer, event.RemotePeer, hostname, event.Scope, request.Scope, event.Path, request.Path, event.Exists, strconv.FormatBool(response.Exists), event.Data, response.Val))
}
2021-06-01 18:34:35 +00:00
// leaveServer shuts down the ephemeral service stored for server, if any, and
// removes it from the engine's map of ephemeral services.
func (e *engine) leaveServer(server string) {
	stored, found := e.ephemeralServices.Load(server)
	if !found {
		return
	}
	stored.(tapir.Service).Shutdown()
	e.ephemeralServices.Delete(server)
}