diff --git a/.drone.yml b/.drone.yml index feb1374..1611d05 100644 --- a/.drone.yml +++ b/.drone.yml @@ -5,7 +5,7 @@ name: linux-test steps: - name: fetch - image: golang:1.17.5 + image: golang:1.19.1 volumes: - name: deps path: /go diff --git a/app/app.go b/app/app.go index a4d9cf1..af87955 100644 --- a/app/app.go +++ b/app/app.go @@ -1,57 +1,57 @@ package app import ( - "cwtch.im/cwtch/app/plugins" - "cwtch.im/cwtch/event" - "cwtch.im/cwtch/model" - "cwtch.im/cwtch/model/attr" - "cwtch.im/cwtch/model/constants" - "cwtch.im/cwtch/peer" - "cwtch.im/cwtch/protocol/connections" - "cwtch.im/cwtch/storage" - "git.openprivacy.ca/openprivacy/connectivity" - "git.openprivacy.ca/openprivacy/log" - "os" - path "path/filepath" - "strconv" - "sync" + "cwtch.im/cwtch/app/plugins" + "cwtch.im/cwtch/event" + "cwtch.im/cwtch/model" + "cwtch.im/cwtch/model/attr" + "cwtch.im/cwtch/model/constants" + "cwtch.im/cwtch/peer" + "cwtch.im/cwtch/protocol/connections" + "cwtch.im/cwtch/storage" + "git.openprivacy.ca/openprivacy/connectivity" + "git.openprivacy.ca/openprivacy/log" + "os" + path "path/filepath" + "strconv" + "sync" ) type application struct { - eventBuses map[string]event.Manager - directory string + eventBuses map[string]event.Manager + directory string - peerLock sync.Mutex - peers map[string]peer.CwtchPeer - acn connectivity.ACN - plugins sync.Map //map[string] []plugins.Plugin + peerLock sync.Mutex + peers map[string]peer.CwtchPeer + acn connectivity.ACN + plugins sync.Map //map[string] []plugins.Plugin - engines map[string]connections.Engine - appBus event.Manager - appmutex sync.Mutex + engines map[string]connections.Engine + appBus event.Manager + appmutex sync.Mutex } // Application is a full cwtch peer application. It allows management, usage and storage of multiple peers type Application interface { - LoadProfiles(password string) - CreateTaggedPeer(name string, password string, tag string) - ImportProfile(exportedCwtchFile string, password string) (peer.CwtchPeer, error) - DeletePeer(onion string, currentPassword string) - AddPeerPlugin(onion string, pluginID plugins.PluginID) + LoadProfiles(password string) + CreateTaggedPeer(name string, password string, tag string) + ImportProfile(exportedCwtchFile string, password string) (peer.CwtchPeer, error) + DeletePeer(onion string, currentPassword string) + AddPeerPlugin(onion string, pluginID plugins.PluginID) - GetPrimaryBus() event.Manager - GetEventBus(onion string) event.Manager - QueryACNStatus() - QueryACNVersion() + GetPrimaryBus() event.Manager + GetEventBus(onion string) event.Manager + QueryACNStatus() + QueryACNVersion() - ActivePeerEngine(onion string, doListen, doPeers, doServers bool) - DeactivatePeerEngine(onion string) + ActivePeerEngine(onion string, doListen, doPeers, doServers bool) + DeactivatePeerEngine(onion string) - ShutdownPeer(string) - Shutdown() + ShutdownPeer(string) + Shutdown() - GetPeer(onion string) peer.CwtchPeer - ListProfiles() []string + GetPeer(onion string) peer.CwtchPeer + ListProfiles() []string } // LoadProfileFn is the function signature for a function in an app that loads a profile @@ -59,295 +59,295 @@ type LoadProfileFn func(profile peer.CwtchPeer) // NewApp creates a new app with some environment awareness and initializes a Tor Manager func NewApp(acn connectivity.ACN, appDirectory string) Application { - log.Debugf("NewApp(%v)\n", appDirectory) - os.MkdirAll(path.Join(appDirectory, "profiles"), 0700) + log.Debugf("NewApp(%v)\n", appDirectory) + os.MkdirAll(path.Join(appDirectory, "profiles"), 0700) - app := &application{engines: make(map[string]connections.Engine), eventBuses: make(map[string]event.Manager), directory: appDirectory, appBus: event.NewEventManager()} - app.peers = make(map[string]peer.CwtchPeer) + app := &application{engines: make(map[string]connections.Engine), eventBuses: make(map[string]event.Manager), directory: appDirectory, appBus: event.NewEventManager()} + app.peers = make(map[string]peer.CwtchPeer) - app.acn = acn - statusHandler := app.getACNStatusHandler() - acn.SetStatusCallback(statusHandler) - acn.SetVersionCallback(app.getACNVersionHandler()) - prog, status := acn.GetBootstrapStatus() - statusHandler(prog, status) + app.acn = acn + statusHandler := app.getACNStatusHandler() + acn.SetStatusCallback(statusHandler) + acn.SetVersionCallback(app.getACNVersionHandler()) + prog, status := acn.GetBootstrapStatus() + statusHandler(prog, status) - return app + return app } // ListProfiles returns a map of onions to their profile's Name func (app *application) ListProfiles() []string { - var keys []string + var keys []string - app.peerLock.Lock() - defer app.peerLock.Unlock() - for handle := range app.peers { - keys = append(keys, handle) - } - return keys + app.peerLock.Lock() + defer app.peerLock.Unlock() + for handle := range app.peers { + keys = append(keys, handle) + } + return keys } // GetPeer returns a cwtchPeer for a given onion address func (app *application) GetPeer(onion string) peer.CwtchPeer { - if peer, ok := app.peers[onion]; ok { - return peer - } - return nil + if peer, ok := app.peers[onion]; ok { + return peer + } + return nil } func (ap *application) AddPlugin(peerid string, id plugins.PluginID, bus event.Manager, acn connectivity.ACN) { - if _, exists := ap.plugins.Load(peerid); !exists { - ap.plugins.Store(peerid, []plugins.Plugin{}) - } + if _, exists := ap.plugins.Load(peerid); !exists { + ap.plugins.Store(peerid, []plugins.Plugin{}) + } - pluginsinf, _ := ap.plugins.Load(peerid) - peerPlugins := pluginsinf.([]plugins.Plugin) + pluginsinf, _ := ap.plugins.Load(peerid) + peerPlugins := pluginsinf.([]plugins.Plugin) - newp, err := plugins.Get(id, bus, acn, peerid) - if err == nil { - newp.Start() - peerPlugins = append(peerPlugins, newp) - log.Debugf("storing plugin for %v %v", peerid, peerPlugins) - ap.plugins.Store(peerid, peerPlugins) - } else { - log.Errorf("error adding plugin: %v", err) - } + newp, err := plugins.Get(id, bus, acn, peerid) + if err == nil { + newp.Start() + peerPlugins = append(peerPlugins, newp) + log.Debugf("storing plugin for %v %v", peerid, peerPlugins) + ap.plugins.Store(peerid, peerPlugins) + } else { + log.Errorf("error adding plugin: %v", err) + } } func (app *application) CreateTaggedPeer(name string, password string, tag string) { - app.appmutex.Lock() - defer app.appmutex.Unlock() + app.appmutex.Lock() + defer app.appmutex.Unlock() - profileDirectory := path.Join(app.directory, "profiles", model.GenerateRandomID()) + profileDirectory := path.Join(app.directory, "profiles", model.GenerateRandomID()) - profile, err := peer.CreateEncryptedStorePeer(profileDirectory, name, password) - if err != nil { - log.Errorf("Error Creating Peer: %v", err) - app.appBus.Publish(event.NewEventList(event.PeerError, event.Error, err.Error())) - return - } + profile, err := peer.CreateEncryptedStorePeer(profileDirectory, name, password) + if err != nil { + log.Errorf("Error Creating Peer: %v", err) + app.appBus.Publish(event.NewEventList(event.PeerError, event.Error, err.Error())) + return + } - eventBus := event.NewEventManager() - app.eventBuses[profile.GetOnion()] = eventBus - profile.Init(app.eventBuses[profile.GetOnion()]) - app.peers[profile.GetOnion()] = profile + eventBus := event.NewEventManager() + app.eventBuses[profile.GetOnion()] = eventBus + profile.Init(app.eventBuses[profile.GetOnion()]) + app.peers[profile.GetOnion()] = profile - if tag != "" { - profile.SetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.Tag, tag) - } + if tag != "" { + profile.SetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.Tag, tag) + } - app.appBus.Publish(event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.GetOnion(), event.Created: event.True})) + app.appBus.Publish(event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.GetOnion(), event.Created: event.True})) } func (app *application) DeletePeer(onion string, password string) { - log.Infof("DeletePeer called on %v\n", onion) - app.appmutex.Lock() - defer app.appmutex.Unlock() + log.Infof("DeletePeer called on %v\n", onion) + app.appmutex.Lock() + defer app.appmutex.Unlock() - if app.peers[onion].CheckPassword(password) { - app.shutdownPeer(onion) - app.peers[onion].Delete() + if app.peers[onion].CheckPassword(password) { + app.shutdownPeer(onion) + app.peers[onion].Delete() - // Shutdown and Remove the Engine + // Shutdown and Remove the Engine - log.Debugf("Delete peer for %v Done\n", onion) - app.appBus.Publish(event.NewEventList(event.PeerDeleted, event.Identity, onion)) - return - } - app.appBus.Publish(event.NewEventList(event.AppError, event.Error, event.PasswordMatchError, event.Identity, onion)) + log.Debugf("Delete peer for %v Done\n", onion) + app.appBus.Publish(event.NewEventList(event.PeerDeleted, event.Identity, onion)) + return + } + app.appBus.Publish(event.NewEventList(event.AppError, event.Error, event.PasswordMatchError, event.Identity, onion)) } func (app *application) AddPeerPlugin(onion string, pluginID plugins.PluginID) { - app.AddPlugin(onion, pluginID, app.eventBuses[onion], app.acn) + app.AddPlugin(onion, pluginID, app.eventBuses[onion], app.acn) } func (app *application) ImportProfile(exportedCwtchFile string, password string) (peer.CwtchPeer, error) { - profileDirectory := path.Join(app.directory, "profiles") - profile, err := peer.ImportProfile(exportedCwtchFile, profileDirectory, password) - if profile != nil || err == nil { - app.installProfile(profile) - } - return profile, err + profileDirectory := path.Join(app.directory, "profiles") + profile, err := peer.ImportProfile(exportedCwtchFile, profileDirectory, password) + if profile != nil || err == nil { + app.installProfile(profile) + } + return profile, err } // LoadProfiles takes a password and attempts to load any profiles it can from storage with it and create Peers for them func (app *application) LoadProfiles(password string) { - count := 0 - migrating := false + count := 0 + migrating := false - files, err := os.ReadDir(path.Join(app.directory, "profiles")) - if err != nil { - log.Errorf("error: cannot read profiles directory: %v", err) - return - } + files, err := os.ReadDir(path.Join(app.directory, "profiles")) + if err != nil { + log.Errorf("error: cannot read profiles directory: %v", err) + return + } - for _, file := range files { - // Attempt to load an encrypted database - profileDirectory := path.Join(app.directory, "profiles", file.Name()) - profile, err := peer.FromEncryptedDatabase(profileDirectory, password) - loaded := false - if err == nil { - // return the load the profile... - log.Infof("loading profile from new-type storage database...") - loaded = app.installProfile(profile) - } else { // On failure attempt to load a legacy profile - profileStore, err := storage.LoadProfileWriterStore(profileDirectory, password) - if err != nil { - continue - } - log.Infof("found legacy profile. importing to new database structure...") - legacyProfile := profileStore.GetProfileCopy(true) - if !migrating { - migrating = true - app.appBus.Publish(event.NewEventList(event.StartingStorageMiragtion)) - } + for _, file := range files { + // Attempt to load an encrypted database + profileDirectory := path.Join(app.directory, "profiles", file.Name()) + profile, err := peer.FromEncryptedDatabase(profileDirectory, password) + loaded := false + if err == nil { + // return the load the profile... + log.Infof("loading profile from new-type storage database...") + loaded = app.installProfile(profile) + } else { // On failure attempt to load a legacy profile + profileStore, err := storage.LoadProfileWriterStore(profileDirectory, password) + if err != nil { + continue + } + log.Infof("found legacy profile. importing to new database structure...") + legacyProfile := profileStore.GetProfileCopy(true) + if !migrating { + migrating = true + app.appBus.Publish(event.NewEventList(event.StartingStorageMiragtion)) + } - cps, err := peer.CreateEncryptedStore(profileDirectory, password) - if err != nil { - log.Errorf("error creating encrypted store: %v", err) - } - profile := peer.ImportLegacyProfile(legacyProfile, cps) - loaded = app.installProfile(profile) - } - if loaded { - count++ - } - } - if count == 0 { - message := event.NewEventList(event.AppError, event.Error, event.AppErrLoaded0) - app.appBus.Publish(message) - } - if migrating { - app.appBus.Publish(event.NewEventList(event.DoneStorageMigration)) - } + cps, err := peer.CreateEncryptedStore(profileDirectory, password) + if err != nil { + log.Errorf("error creating encrypted store: %v", err) + } + profile := peer.ImportLegacyProfile(legacyProfile, cps) + loaded = app.installProfile(profile) + } + if loaded { + count++ + } + } + if count == 0 { + message := event.NewEventList(event.AppError, event.Error, event.AppErrLoaded0) + app.appBus.Publish(message) + } + if migrating { + app.appBus.Publish(event.NewEventList(event.DoneStorageMigration)) + } } // installProfile takes a profile and if it isn't loaded in the app, installs it and returns true func (app *application) installProfile(profile peer.CwtchPeer) bool { - app.appmutex.Lock() - defer app.appmutex.Unlock() + app.appmutex.Lock() + defer app.appmutex.Unlock() - // Only attempt to finalize the profile if we don't have one loaded... - if app.peers[profile.GetOnion()] == nil { - eventBus := event.NewEventManager() - app.eventBuses[profile.GetOnion()] = eventBus - profile.Init(app.eventBuses[profile.GetOnion()]) - app.peers[profile.GetOnion()] = profile - app.appBus.Publish(event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.GetOnion(), event.Created: event.False})) - return true - } - // Otherwise shutdown the connections - profile.Shutdown() - return false + // Only attempt to finalize the profile if we don't have one loaded... + if app.peers[profile.GetOnion()] == nil { + eventBus := event.NewEventManager() + app.eventBuses[profile.GetOnion()] = eventBus + profile.Init(app.eventBuses[profile.GetOnion()]) + app.peers[profile.GetOnion()] = profile + app.appBus.Publish(event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.GetOnion(), event.Created: event.False})) + return true + } + // Otherwise shutdown the connections + profile.Shutdown() + return false } // ActivePeerEngine creates a peer engine for use with an ACN, should be called once the underlying ACN is online func (app *application) ActivePeerEngine(onion string, doListen, doPeers, doServers bool) { - profile := app.GetPeer(onion) - if profile != nil { - app.engines[profile.GetOnion()], _ = profile.GenerateProtocolEngine(app.acn, app.eventBuses[profile.GetOnion()]) - if doListen { - profile.Listen() - } - if doPeers { - profile.StartPeersConnections() - } - if doServers { - profile.StartServerConnections() - } - } + profile := app.GetPeer(onion) + if profile != nil { + app.engines[profile.GetOnion()], _ = profile.GenerateProtocolEngine(app.acn, app.eventBuses[profile.GetOnion()]) + if doListen { + profile.Listen() + } + if doPeers { + profile.StartPeersConnections() + } + if doServers { + profile.StartServerConnections() + } + } } // DeactivatePeerEngine shutsdown and cleans up a peer engine, should be called when an underlying ACN goes offline func (app *application) DeactivatePeerEngine(onion string) { - if engine, exists := app.engines[onion]; exists { - engine.Shutdown() - delete(app.engines, onion) - } + if engine, exists := app.engines[onion]; exists { + engine.Shutdown() + delete(app.engines, onion) + } } // GetPrimaryBus returns the bus the Application uses for events that aren't peer specific func (app *application) GetPrimaryBus() event.Manager { - return app.appBus + return app.appBus } // GetEventBus returns a cwtchPeer's event bus func (app *application) GetEventBus(onion string) event.Manager { - if manager, ok := app.eventBuses[onion]; ok { - return manager - } - return nil + if manager, ok := app.eventBuses[onion]; ok { + return manager + } + return nil } func (app *application) getACNStatusHandler() func(int, string) { - return func(progress int, status string) { - progStr := strconv.Itoa(progress) - app.appmutex.Lock() - app.appBus.Publish(event.NewEventList(event.ACNStatus, event.Progress, progStr, event.Status, status)) - for _, bus := range app.eventBuses { - bus.Publish(event.NewEventList(event.ACNStatus, event.Progress, progStr, event.Status, status)) - } - app.appmutex.Unlock() - } + return func(progress int, status string) { + progStr := strconv.Itoa(progress) + app.appmutex.Lock() + app.appBus.Publish(event.NewEventList(event.ACNStatus, event.Progress, progStr, event.Status, status)) + for _, bus := range app.eventBuses { + bus.Publish(event.NewEventList(event.ACNStatus, event.Progress, progStr, event.Status, status)) + } + app.appmutex.Unlock() + } } func (app *application) getACNVersionHandler() func(string) { - return func(version string) { - app.appmutex.Lock() - defer app.appmutex.Unlock() - app.appBus.Publish(event.NewEventList(event.ACNVersion, event.Data, version)) - } + return func(version string) { + app.appmutex.Lock() + defer app.appmutex.Unlock() + app.appBus.Publish(event.NewEventList(event.ACNVersion, event.Data, version)) + } } func (app *application) QueryACNStatus() { - prog, status := app.acn.GetBootstrapStatus() - app.getACNStatusHandler()(prog, status) + prog, status := app.acn.GetBootstrapStatus() + app.getACNStatusHandler()(prog, status) } func (app *application) QueryACNVersion() { - version := app.acn.GetVersion() - app.appBus.Publish(event.NewEventList(event.ACNVersion, event.Data, version)) + version := app.acn.GetVersion() + app.appBus.Publish(event.NewEventList(event.ACNVersion, event.Data, version)) } // ShutdownPeer shuts down a peer and removes it from the app's management func (app *application) ShutdownPeer(onion string) { - app.appmutex.Lock() - defer app.appmutex.Unlock() - app.shutdownPeer(onion) + app.appmutex.Lock() + defer app.appmutex.Unlock() + app.shutdownPeer(onion) } // shutdownPeer mutex unlocked helper shutdown peer func (app *application) shutdownPeer(onion string) { - app.eventBuses[onion].Publish(event.NewEventList(event.ShutdownPeer, event.Identity, onion)) - app.eventBuses[onion].Shutdown() - delete(app.eventBuses, onion) - app.peers[onion].Shutdown() - delete(app.peers, onion) - if _, ok := app.engines[onion]; ok { - app.engines[onion].Shutdown() - delete(app.engines, onion) - } - log.Debugf("shutting down plugins for %v", onion) - pluginsI, ok := app.plugins.Load(onion) - if ok { - plugins := pluginsI.([]plugins.Plugin) - for _, plugin := range plugins { - log.Debugf("shutting down plugin: %v", plugin) - plugin.Shutdown() - } - } - app.plugins.Delete(onion) + app.eventBuses[onion].Publish(event.NewEventList(event.ShutdownPeer, event.Identity, onion)) + app.eventBuses[onion].Shutdown() + delete(app.eventBuses, onion) + app.peers[onion].Shutdown() + delete(app.peers, onion) + if _, ok := app.engines[onion]; ok { + app.engines[onion].Shutdown() + delete(app.engines, onion) + } + log.Debugf("shutting down plugins for %v", onion) + pluginsI, ok := app.plugins.Load(onion) + if ok { + plugins := pluginsI.([]plugins.Plugin) + for _, plugin := range plugins { + log.Debugf("shutting down plugin: %v", plugin) + plugin.Shutdown() + } + } + app.plugins.Delete(onion) } // Shutdown shutsdown all peers of an app func (app *application) Shutdown() { - app.appmutex.Lock() - defer app.appmutex.Unlock() - for id := range app.peers { - log.Debugf("Shutting Down Peer %v", id) - app.shutdownPeer(id) - } - log.Debugf("Shutting Down App") - app.appBus.Shutdown() - log.Debugf("Shut Down Complete") + app.appmutex.Lock() + defer app.appmutex.Unlock() + for id := range app.peers { + log.Debugf("Shutting Down Peer %v", id) + app.shutdownPeer(id) + } + log.Debugf("Shutting Down App") + app.appBus.Shutdown() + log.Debugf("Shut Down Complete") } diff --git a/protocol/connections/engine.go b/protocol/connections/engine.go index 40600f6..860ab02 100644 --- a/protocol/connections/engine.go +++ b/protocol/connections/engine.go @@ -1,70 +1,70 @@ package connections import ( - "encoding/base64" - "encoding/json" - "fmt" - "strconv" - "strings" - "sync" - "sync/atomic" - "time" + "encoding/base64" + "encoding/json" + "fmt" + "strconv" + "strings" + "sync" + "sync/atomic" + "time" - "cwtch.im/cwtch/event" - "cwtch.im/cwtch/model" - "cwtch.im/cwtch/protocol/files" - "cwtch.im/cwtch/protocol/groups" - pmodel "cwtch.im/cwtch/protocol/model" - "git.openprivacy.ca/cwtch.im/tapir" - "git.openprivacy.ca/cwtch.im/tapir/applications" - "git.openprivacy.ca/cwtch.im/tapir/networks/tor" - "git.openprivacy.ca/cwtch.im/tapir/primitives" - "git.openprivacy.ca/openprivacy/connectivity" - torProvider "git.openprivacy.ca/openprivacy/connectivity/tor" - "git.openprivacy.ca/openprivacy/log" - "github.com/gtank/ristretto255" - "golang.org/x/crypto/ed25519" + "cwtch.im/cwtch/event" + "cwtch.im/cwtch/model" + "cwtch.im/cwtch/protocol/files" + "cwtch.im/cwtch/protocol/groups" + pmodel "cwtch.im/cwtch/protocol/model" + "git.openprivacy.ca/cwtch.im/tapir" + "git.openprivacy.ca/cwtch.im/tapir/applications" + "git.openprivacy.ca/cwtch.im/tapir/networks/tor" + "git.openprivacy.ca/cwtch.im/tapir/primitives" + "git.openprivacy.ca/openprivacy/connectivity" + torProvider "git.openprivacy.ca/openprivacy/connectivity/tor" + "git.openprivacy.ca/openprivacy/log" + "github.com/gtank/ristretto255" + "golang.org/x/crypto/ed25519" ) type connectionLockedService struct { - service tapir.Service - connectingLock sync.Mutex + service tapir.Service + connectingLock sync.Mutex } type engine struct { - queue event.Queue + queue event.Queue - // Engine Attributes - identity primitives.Identity - acn connectivity.ACN + // Engine Attributes + identity primitives.Identity + acn connectivity.ACN - // Authorization list of contacts to authorization status - authorizations sync.Map // string(onion) => model.Authorization + // Authorization list of contacts to authorization status + authorizations sync.Map // string(onion) => model.Authorization - // Block Unknown Contacts - blockUnknownContacts bool + // Block Unknown Contacts + blockUnknownContacts bool - // Pointer to the Global Event Manager - eventManager event.Manager + // Pointer to the Global Event Manager + eventManager event.Manager - // Nextgen Tapir Service - service tapir.Service + // Nextgen Tapir Service + service tapir.Service - getValRequests sync.Map // [string]string eventID:Data + getValRequests sync.Map // [string]string eventID:Data - // Nextgen Tapir Service - ephemeralServices map[string]*connectionLockedService //sync.Map // string(onion) => tapir.Service - ephemeralServicesLock sync.Mutex + // Nextgen Tapir Service + ephemeralServices map[string]*connectionLockedService //sync.Map // string(onion) => tapir.Service + ephemeralServicesLock sync.Mutex - // Required for listen(), inaccessible from identity - privateKey ed25519.PrivateKey + // Required for listen(), inaccessible from identity + privateKey ed25519.PrivateKey - // file sharing subsystem is responsible for maintaining active shares and downloads - filesharingSubSystem files.FileSharingSubSystem + // file sharing subsystem is responsible for maintaining active shares and downloads + filesharingSubSystem files.FileSharingSubSystem - tokenManagers sync.Map // [tokenService][]TokenManager + tokenManagers sync.Map // [tokenService][]TokenManager - shuttingDown atomic.Bool + shuttingDown atomic.Bool } // Engine (ProtocolEngine) encapsulates the logic necessary to make and receive Cwtch connections. @@ -72,678 +72,678 @@ type engine struct { // Protocol Engine *can* associate Group Identifiers with Group Servers, although we don't currently make use of this fact // other than to route errors back to the UI. type Engine interface { - ACN() connectivity.ACN - EventManager() event.Manager - Shutdown() + ACN() connectivity.ACN + EventManager() event.Manager + Shutdown() } // NewProtocolEngine initializes a new engine that runs Cwtch using the given parameters func NewProtocolEngine(identity primitives.Identity, privateKey ed25519.PrivateKey, acn connectivity.ACN, eventManager event.Manager, peerAuthorizations map[string]model.Authorization) Engine { - engine := new(engine) - engine.identity = identity - engine.privateKey = privateKey - engine.ephemeralServices = make(map[string]*connectionLockedService) - engine.queue = event.NewQueue() - go engine.eventHandler() + engine := new(engine) + engine.identity = identity + engine.privateKey = privateKey + engine.ephemeralServices = make(map[string]*connectionLockedService) + engine.queue = event.NewQueue() + go engine.eventHandler() - engine.acn = acn + engine.acn = acn - // Init the Server running the Simple App. - engine.service = new(tor.BaseOnionService) - engine.service.Init(acn, privateKey, &identity) + // Init the Server running the Simple App. + engine.service = new(tor.BaseOnionService) + engine.service.Init(acn, privateKey, &identity) - engine.eventManager = eventManager + engine.eventManager = eventManager - engine.eventManager.Subscribe(event.ProtocolEngineStartListen, engine.queue) - engine.eventManager.Subscribe(event.ProtocolEngineShutdown, engine.queue) - engine.eventManager.Subscribe(event.PeerRequest, engine.queue) - engine.eventManager.Subscribe(event.RetryPeerRequest, engine.queue) - engine.eventManager.Subscribe(event.InvitePeerToGroup, engine.queue) - engine.eventManager.Subscribe(event.JoinServer, engine.queue) - engine.eventManager.Subscribe(event.LeaveServer, engine.queue) - engine.eventManager.Subscribe(event.SendMessageToGroup, engine.queue) - engine.eventManager.Subscribe(event.SendMessageToPeer, engine.queue) - engine.eventManager.Subscribe(event.SendGetValMessageToPeer, engine.queue) - engine.eventManager.Subscribe(event.SendRetValMessageToPeer, engine.queue) - engine.eventManager.Subscribe(event.DeleteContact, engine.queue) + engine.eventManager.Subscribe(event.ProtocolEngineStartListen, engine.queue) + engine.eventManager.Subscribe(event.ProtocolEngineShutdown, engine.queue) + engine.eventManager.Subscribe(event.PeerRequest, engine.queue) + engine.eventManager.Subscribe(event.RetryPeerRequest, engine.queue) + engine.eventManager.Subscribe(event.InvitePeerToGroup, engine.queue) + engine.eventManager.Subscribe(event.JoinServer, engine.queue) + engine.eventManager.Subscribe(event.LeaveServer, engine.queue) + engine.eventManager.Subscribe(event.SendMessageToGroup, engine.queue) + engine.eventManager.Subscribe(event.SendMessageToPeer, engine.queue) + engine.eventManager.Subscribe(event.SendGetValMessageToPeer, engine.queue) + engine.eventManager.Subscribe(event.SendRetValMessageToPeer, engine.queue) + engine.eventManager.Subscribe(event.DeleteContact, engine.queue) - engine.eventManager.Subscribe(event.UpdateConversationAuthorization, engine.queue) - engine.eventManager.Subscribe(event.BlockUnknownPeers, engine.queue) - engine.eventManager.Subscribe(event.AllowUnknownPeers, engine.queue) + engine.eventManager.Subscribe(event.UpdateConversationAuthorization, engine.queue) + engine.eventManager.Subscribe(event.BlockUnknownPeers, engine.queue) + engine.eventManager.Subscribe(event.AllowUnknownPeers, engine.queue) - // File Handling - engine.eventManager.Subscribe(event.ShareManifest, engine.queue) - engine.eventManager.Subscribe(event.StopFileShare, engine.queue) - engine.eventManager.Subscribe(event.StopAllFileShares, engine.queue) - engine.eventManager.Subscribe(event.ManifestSizeReceived, engine.queue) - engine.eventManager.Subscribe(event.ManifestSaved, engine.queue) + // File Handling + engine.eventManager.Subscribe(event.ShareManifest, engine.queue) + engine.eventManager.Subscribe(event.StopFileShare, engine.queue) + engine.eventManager.Subscribe(event.StopAllFileShares, engine.queue) + engine.eventManager.Subscribe(event.ManifestSizeReceived, engine.queue) + engine.eventManager.Subscribe(event.ManifestSaved, engine.queue) - // Token Server - engine.eventManager.Subscribe(event.MakeAntispamPayment, engine.queue) + // Token Server + engine.eventManager.Subscribe(event.MakeAntispamPayment, engine.queue) - for peer, authorization := range peerAuthorizations { - engine.authorizations.Store(peer, authorization) - } - return engine + for peer, authorization := range peerAuthorizations { + engine.authorizations.Store(peer, authorization) + } + return engine } func (e *engine) ACN() connectivity.ACN { - return e.acn + return e.acn } func (e *engine) EventManager() event.Manager { - return e.eventManager + return e.eventManager } // eventHandler process events from other subsystems func (e *engine) eventHandler() { - for { - ev := e.queue.Next() - // optimistic shutdown... - if e.shuttingDown.Load() { - return - } - switch ev.EventType { - case event.StatusRequest: - e.eventManager.Publish(event.Event{EventType: event.ProtocolEngineStatus, EventID: ev.EventID}) - case event.PeerRequest: - if torProvider.IsValidHostname(ev.Data[event.RemotePeer]) { - go e.peerWithOnion(ev.Data[event.RemotePeer]) - } - case event.RetryPeerRequest: - // This event allows engine to treat (automated) retry peering requests differently to user-specified - // peer events - if torProvider.IsValidHostname(ev.Data[event.RemotePeer]) { - log.Debugf("Retrying Peer Request: %v", ev.Data[event.RemotePeer]) - go e.peerWithOnion(ev.Data[event.RemotePeer]) - } - case event.InvitePeerToGroup: - err := e.sendPeerMessage(ev.Data[event.RemotePeer], pmodel.PeerMessage{ID: ev.EventID, Context: event.ContextInvite, Data: []byte(ev.Data[event.GroupInvite])}) - if err != nil { - e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.EventContext: string(event.InvitePeerToGroup), event.RemotePeer: ev.Data[event.RemotePeer], event.EventID: ev.EventID, event.Error: "peer is offline or the connection has yet to finalize"})) - } - case event.JoinServer: - signature, err := base64.StdEncoding.DecodeString(ev.Data[event.Signature]) - if err != nil { - // will result in a full sync - signature = []byte{} - } - go e.peerWithTokenServer(ev.Data[event.GroupServer], ev.Data[event.ServerTokenOnion], ev.Data[event.ServerTokenY], signature) - case event.MakeAntispamPayment: - go e.makeAntispamPayment(ev.Data[event.GroupServer]) - case event.LeaveServer: - e.leaveServer(ev.Data[event.GroupServer]) - case event.DeleteContact: - onion := ev.Data[event.RemotePeer] - // We remove this peer from out blocklist which will prevent them from contacting us if we have "block unknown peers" turned on. - e.authorizations.Delete(ev.Data[event.RemotePeer]) - e.deleteConnection(onion) - case event.SendMessageToGroup: - ciphertext, _ := base64.StdEncoding.DecodeString(ev.Data[event.Ciphertext]) - signature, _ := base64.StdEncoding.DecodeString(ev.Data[event.Signature]) - go e.sendMessageToGroup(ev.Data[event.GroupID], ev.Data[event.GroupServer], ciphertext, signature, 0) - case event.SendMessageToPeer: - // TODO: remove this passthrough once the UI is integrated. - context, ok := ev.Data[event.EventContext] - if !ok { - context = event.ContextRaw - } - if err := e.sendPeerMessage(ev.Data[event.RemotePeer], pmodel.PeerMessage{ID: ev.EventID, Context: context, Data: []byte(ev.Data[event.Data])}); err != nil { - e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.EventContext: string(event.SendMessageToPeer), event.RemotePeer: ev.Data[event.RemotePeer], event.EventID: ev.EventID, event.Error: "peer is offline or the connection has yet to finalize"})) - } - case event.SendGetValMessageToPeer: - if err := e.sendGetValToPeer(ev.EventID, ev.Data[event.RemotePeer], ev.Data[event.Scope], ev.Data[event.Path]); err != nil { - e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.EventContext: string(event.SendGetValMessageToPeer), event.RemotePeer: ev.Data[event.RemotePeer], event.EventID: ev.EventID, event.Error: err.Error()})) - } - case event.SendRetValMessageToPeer: - if err := e.sendRetValToPeer(ev.EventID, ev.Data[event.RemotePeer], ev.Data[event.Data], ev.Data[event.Exists]); err != nil { - e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.EventContext: string(event.SendRetValMessageToPeer), event.RemotePeer: ev.Data[event.RemotePeer], event.EventID: ev.EventID, event.Error: err.Error()})) - } - case event.UpdateConversationAuthorization: - accepted, _ := strconv.ParseBool(ev.Data[event.Accepted]) - blocked, _ := strconv.ParseBool(ev.Data[event.Blocked]) - auth := model.AuthUnknown - if blocked { - auth = model.AuthBlocked - } else if accepted { - auth = model.AuthApproved - } - e.authorizations.Store(ev.Data[event.RemotePeer], auth) - if auth == model.AuthBlocked { - connection, err := e.service.GetConnection(ev.Data[event.RemotePeer]) - if connection != nil && err == nil { - connection.Close() - } - // Explicitly send a disconnected event (if we don't do this here then the UI can wait for a while before - // an ongoing Open() connection fails and so the user will see a blocked peer as still connecting (because - // there isn't an active connection and we are stuck waiting for tor to time out) - e.peerDisconnected(ev.Data[event.RemotePeer]) - } - case event.AllowUnknownPeers: - log.Debugf("%v now allows unknown connections", e.identity.Hostname()) - e.blockUnknownContacts = false - case event.BlockUnknownPeers: - log.Debugf("%v now forbids unknown connections", e.identity.Hostname()) - e.blockUnknownContacts = true - case event.ProtocolEngineStartListen: - go e.listenFn() - case event.ShareManifest: - e.filesharingSubSystem.ShareFile(ev.Data[event.FileKey], ev.Data[event.SerializedManifest]) - case event.StopFileShare: - e.filesharingSubSystem.StopFileShare(ev.Data[event.FileKey]) - case event.StopAllFileShares: - e.filesharingSubSystem.StopAllFileShares() - case event.ManifestSizeReceived: - handle := ev.Data[event.Handle] - key := ev.Data[event.FileKey] - size, _ := strconv.Atoi(ev.Data[event.ManifestSize]) - if err := e.sendPeerMessage(handle, e.filesharingSubSystem.FetchManifest(key, uint64(size))); err != nil { - e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.RemotePeer: ev.Data[event.RemotePeer], event.EventID: ev.EventID, event.Error: err.Error()})) - } - case event.ManifestSaved: - handle := ev.Data[event.Handle] - key := ev.Data[event.FileKey] - serializedManifest := ev.Data[event.SerializedManifest] - tempFile := ev.Data[event.TempFile] - title := ev.Data[event.NameSuggestion] - // NOTE: for now there will probably only ever be a single chunk request. When we enable group - // sharing and rehosting then this loop will serve as a a way of splitting the request among multiple - // contacts - for _, message := range e.filesharingSubSystem.CompileChunkRequests(key, serializedManifest, tempFile, title) { - if err := e.sendPeerMessage(handle, message); err != nil { - e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.RemotePeer: ev.Data[event.RemotePeer], event.EventID: ev.EventID, event.Error: err.Error()})) - } - } - case event.ProtocolEngineShutdown: - return - default: - return - } - } + for { + ev := e.queue.Next() + // optimistic shutdown... + if e.shuttingDown.Load() { + return + } + switch ev.EventType { + case event.StatusRequest: + e.eventManager.Publish(event.Event{EventType: event.ProtocolEngineStatus, EventID: ev.EventID}) + case event.PeerRequest: + if torProvider.IsValidHostname(ev.Data[event.RemotePeer]) { + go e.peerWithOnion(ev.Data[event.RemotePeer]) + } + case event.RetryPeerRequest: + // This event allows engine to treat (automated) retry peering requests differently to user-specified + // peer events + if torProvider.IsValidHostname(ev.Data[event.RemotePeer]) { + log.Debugf("Retrying Peer Request: %v", ev.Data[event.RemotePeer]) + go e.peerWithOnion(ev.Data[event.RemotePeer]) + } + case event.InvitePeerToGroup: + err := e.sendPeerMessage(ev.Data[event.RemotePeer], pmodel.PeerMessage{ID: ev.EventID, Context: event.ContextInvite, Data: []byte(ev.Data[event.GroupInvite])}) + if err != nil { + e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.EventContext: string(event.InvitePeerToGroup), event.RemotePeer: ev.Data[event.RemotePeer], event.EventID: ev.EventID, event.Error: "peer is offline or the connection has yet to finalize"})) + } + case event.JoinServer: + signature, err := base64.StdEncoding.DecodeString(ev.Data[event.Signature]) + if err != nil { + // will result in a full sync + signature = []byte{} + } + go e.peerWithTokenServer(ev.Data[event.GroupServer], ev.Data[event.ServerTokenOnion], ev.Data[event.ServerTokenY], signature) + case event.MakeAntispamPayment: + go e.makeAntispamPayment(ev.Data[event.GroupServer]) + case event.LeaveServer: + e.leaveServer(ev.Data[event.GroupServer]) + case event.DeleteContact: + onion := ev.Data[event.RemotePeer] + // We remove this peer from out blocklist which will prevent them from contacting us if we have "block unknown peers" turned on. + e.authorizations.Delete(ev.Data[event.RemotePeer]) + e.deleteConnection(onion) + case event.SendMessageToGroup: + ciphertext, _ := base64.StdEncoding.DecodeString(ev.Data[event.Ciphertext]) + signature, _ := base64.StdEncoding.DecodeString(ev.Data[event.Signature]) + go e.sendMessageToGroup(ev.Data[event.GroupID], ev.Data[event.GroupServer], ciphertext, signature, 0) + case event.SendMessageToPeer: + // TODO: remove this passthrough once the UI is integrated. + context, ok := ev.Data[event.EventContext] + if !ok { + context = event.ContextRaw + } + if err := e.sendPeerMessage(ev.Data[event.RemotePeer], pmodel.PeerMessage{ID: ev.EventID, Context: context, Data: []byte(ev.Data[event.Data])}); err != nil { + e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.EventContext: string(event.SendMessageToPeer), event.RemotePeer: ev.Data[event.RemotePeer], event.EventID: ev.EventID, event.Error: "peer is offline or the connection has yet to finalize"})) + } + case event.SendGetValMessageToPeer: + if err := e.sendGetValToPeer(ev.EventID, ev.Data[event.RemotePeer], ev.Data[event.Scope], ev.Data[event.Path]); err != nil { + e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.EventContext: string(event.SendGetValMessageToPeer), event.RemotePeer: ev.Data[event.RemotePeer], event.EventID: ev.EventID, event.Error: err.Error()})) + } + case event.SendRetValMessageToPeer: + if err := e.sendRetValToPeer(ev.EventID, ev.Data[event.RemotePeer], ev.Data[event.Data], ev.Data[event.Exists]); err != nil { + e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.EventContext: string(event.SendRetValMessageToPeer), event.RemotePeer: ev.Data[event.RemotePeer], event.EventID: ev.EventID, event.Error: err.Error()})) + } + case event.UpdateConversationAuthorization: + accepted, _ := strconv.ParseBool(ev.Data[event.Accepted]) + blocked, _ := strconv.ParseBool(ev.Data[event.Blocked]) + auth := model.AuthUnknown + if blocked { + auth = model.AuthBlocked + } else if accepted { + auth = model.AuthApproved + } + e.authorizations.Store(ev.Data[event.RemotePeer], auth) + if auth == model.AuthBlocked { + connection, err := e.service.GetConnection(ev.Data[event.RemotePeer]) + if connection != nil && err == nil { + connection.Close() + } + // Explicitly send a disconnected event (if we don't do this here then the UI can wait for a while before + // an ongoing Open() connection fails and so the user will see a blocked peer as still connecting (because + // there isn't an active connection and we are stuck waiting for tor to time out) + e.peerDisconnected(ev.Data[event.RemotePeer]) + } + case event.AllowUnknownPeers: + log.Debugf("%v now allows unknown connections", e.identity.Hostname()) + e.blockUnknownContacts = false + case event.BlockUnknownPeers: + log.Debugf("%v now forbids unknown connections", e.identity.Hostname()) + e.blockUnknownContacts = true + case event.ProtocolEngineStartListen: + go e.listenFn() + case event.ShareManifest: + e.filesharingSubSystem.ShareFile(ev.Data[event.FileKey], ev.Data[event.SerializedManifest]) + case event.StopFileShare: + e.filesharingSubSystem.StopFileShare(ev.Data[event.FileKey]) + case event.StopAllFileShares: + e.filesharingSubSystem.StopAllFileShares() + case event.ManifestSizeReceived: + handle := ev.Data[event.Handle] + key := ev.Data[event.FileKey] + size, _ := strconv.Atoi(ev.Data[event.ManifestSize]) + if err := e.sendPeerMessage(handle, e.filesharingSubSystem.FetchManifest(key, uint64(size))); err != nil { + e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.RemotePeer: ev.Data[event.RemotePeer], event.EventID: ev.EventID, event.Error: err.Error()})) + } + case event.ManifestSaved: + handle := ev.Data[event.Handle] + key := ev.Data[event.FileKey] + serializedManifest := ev.Data[event.SerializedManifest] + tempFile := ev.Data[event.TempFile] + title := ev.Data[event.NameSuggestion] + // NOTE: for now there will probably only ever be a single chunk request. When we enable group + // sharing and rehosting then this loop will serve as a a way of splitting the request among multiple + // contacts + for _, message := range e.filesharingSubSystem.CompileChunkRequests(key, serializedManifest, tempFile, title) { + if err := e.sendPeerMessage(handle, message); err != nil { + e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.RemotePeer: ev.Data[event.RemotePeer], event.EventID: ev.EventID, event.Error: err.Error()})) + } + } + case event.ProtocolEngineShutdown: + return + default: + return + } + } } func (e *engine) isBlocked(onion string) bool { - authorization, known := e.authorizations.Load(onion) - if !known { - // if we block unknown peers we will block this contact - return e.blockUnknownContacts - } - return authorization.(model.Authorization) == model.AuthBlocked + authorization, known := e.authorizations.Load(onion) + if !known { + // if we block unknown peers we will block this contact + return e.blockUnknownContacts + } + return authorization.(model.Authorization) == model.AuthBlocked } func (e *engine) isAllowed(onion string) bool { - authorization, known := e.authorizations.Load(onion) - if !known { - log.Errorf("attempted to lookup authorization of onion not in map...that should never happen") - return false - } - if e.blockUnknownContacts { - return authorization.(model.Authorization) == model.AuthApproved - } - return authorization.(model.Authorization) != model.AuthBlocked + authorization, known := e.authorizations.Load(onion) + if !known { + log.Errorf("attempted to lookup authorization of onion not in map...that should never happen") + return false + } + if e.blockUnknownContacts { + return authorization.(model.Authorization) == model.AuthApproved + } + return authorization.(model.Authorization) != model.AuthBlocked } func (e *engine) createPeerTemplate() *PeerApp { - peerAppTemplate := new(PeerApp) - peerAppTemplate.IsBlocked = e.isBlocked - peerAppTemplate.IsAllowed = e.isAllowed - peerAppTemplate.MessageHandler = e.handlePeerMessage - peerAppTemplate.OnAcknowledgement = e.ignoreOnShutdown2(e.peerAck) - peerAppTemplate.OnAuth = e.ignoreOnShutdown(e.peerAuthed) - peerAppTemplate.OnConnecting = e.ignoreOnShutdown(e.peerConnecting) - peerAppTemplate.OnClose = e.ignoreOnShutdown(e.peerDisconnected) - return peerAppTemplate + peerAppTemplate := new(PeerApp) + peerAppTemplate.IsBlocked = e.isBlocked + peerAppTemplate.IsAllowed = e.isAllowed + peerAppTemplate.MessageHandler = e.handlePeerMessage + peerAppTemplate.OnAcknowledgement = e.ignoreOnShutdown2(e.peerAck) + peerAppTemplate.OnAuth = e.ignoreOnShutdown(e.peerAuthed) + peerAppTemplate.OnConnecting = e.ignoreOnShutdown(e.peerConnecting) + peerAppTemplate.OnClose = e.ignoreOnShutdown(e.peerDisconnected) + return peerAppTemplate } // Listen sets up an onion listener to process incoming cwtch messages func (e *engine) listenFn() { - err := e.service.Listen(e.createPeerTemplate()) - if !e.shuttingDown.Load() { - e.eventManager.Publish(event.NewEvent(event.ProtocolEngineStopped, map[event.Field]string{event.Identity: e.identity.Hostname(), event.Error: err.Error()})) - } + err := e.service.Listen(e.createPeerTemplate()) + if !e.shuttingDown.Load() { + e.eventManager.Publish(event.NewEvent(event.ProtocolEngineStopped, map[event.Field]string{event.Identity: e.identity.Hostname(), event.Error: err.Error()})) + } } // Shutdown tears down the eventHandler goroutine func (e *engine) Shutdown() { - // don't accept any more events... - e.queue.Publish(event.NewEvent(event.ProtocolEngineShutdown, map[event.Field]string{})) - e.service.Shutdown() + // don't accept any more events... + e.queue.Publish(event.NewEvent(event.ProtocolEngineShutdown, map[event.Field]string{})) + e.service.Shutdown() - e.shuttingDown.Store(true) + e.shuttingDown.Store(true) - e.ephemeralServicesLock.Lock() - defer e.ephemeralServicesLock.Unlock() - for _, connection := range e.ephemeralServices { - log.Infof("shutting down ephemeral service") - connection.connectingLock.Lock() - connection.service.Shutdown() - connection.connectingLock.Unlock() - } + e.ephemeralServicesLock.Lock() + defer e.ephemeralServicesLock.Unlock() + for _, connection := range e.ephemeralServices { + log.Infof("shutting down ephemeral service") + connection.connectingLock.Lock() + connection.service.Shutdown() + connection.connectingLock.Unlock() + } - e.queue.Shutdown() + e.queue.Shutdown() } // peerWithOnion is the entry point for cwtchPeer relationships // needs to be run in a goroutine as will block on Open. func (e *engine) peerWithOnion(onion string) { - log.Debugf("Called PeerWithOnion for %v", onion) - if !e.isBlocked(onion) { - e.ignoreOnShutdown(e.peerConnecting)(onion) - connected, err := e.service.Connect(onion, e.createPeerTemplate()) + log.Debugf("Called PeerWithOnion for %v", onion) + if !e.isBlocked(onion) { + e.ignoreOnShutdown(e.peerConnecting)(onion) + connected, err := e.service.Connect(onion, e.createPeerTemplate()) - // If we are already connected...check if we are authed and issue an auth event - // (This allows the ui to be stateless) - if connected && err != nil { - conn, err := e.service.GetConnection(onion) - if err == nil { - if conn.HasCapability(cwtchCapability) { - e.ignoreOnShutdown(e.peerAuthed)(onion) - return - } - } - } + // If we are already connected...check if we are authed and issue an auth event + // (This allows the ui to be stateless) + if connected && err != nil { + conn, err := e.service.GetConnection(onion) + if err == nil { + if conn.HasCapability(cwtchCapability) { + e.ignoreOnShutdown(e.peerAuthed)(onion) + return + } + } + } - // Only issue a disconnected error if we are disconnected (Connect will fail if a connection already exists) - if !connected && err != nil { - e.ignoreOnShutdown(e.peerDisconnected)(onion) - } - } + // Only issue a disconnected error if we are disconnected (Connect will fail if a connection already exists) + if !connected && err != nil { + e.ignoreOnShutdown(e.peerDisconnected)(onion) + } + } } func (e *engine) makeAntispamPayment(onion string) { - log.Debugf("making antispam payment") - e.ephemeralServicesLock.Lock() - ephemeralService, ok := e.ephemeralServices[onion] - e.ephemeralServicesLock.Unlock() + log.Debugf("making antispam payment") + e.ephemeralServicesLock.Lock() + ephemeralService, ok := e.ephemeralServices[onion] + e.ephemeralServicesLock.Unlock() - if ephemeralService == nil || !ok { - log.Debugf("could not find associated group for antispam payment") - return - } + if ephemeralService == nil || !ok { + log.Debugf("could not find associated group for antispam payment") + return + } - conn, err := ephemeralService.service.GetConnection(onion) - if err == nil { - tokenApp, ok := (conn.App()).(*TokenBoardClient) - if ok { - tokenManagerPointer, _ := e.tokenManagers.LoadOrStore(tokenApp.tokenServiceOnion, new(TokenManager)) - tokenManager := tokenManagerPointer.(*TokenManager) - log.Debugf("checking antispam tokens %v", tokenManager.NumTokens()) - if tokenManager.NumTokens() < 5 { - go tokenApp.PurchaseTokens() - } - } - } + conn, err := ephemeralService.service.GetConnection(onion) + if err == nil { + tokenApp, ok := (conn.App()).(*TokenBoardClient) + if ok { + tokenManagerPointer, _ := e.tokenManagers.LoadOrStore(tokenApp.tokenServiceOnion, new(TokenManager)) + tokenManager := tokenManagerPointer.(*TokenManager) + log.Debugf("checking antispam tokens %v", tokenManager.NumTokens()) + if tokenManager.NumTokens() < 5 { + go tokenApp.PurchaseTokens() + } + } + } } // peerWithTokenServer is the entry point for cwtchPeer - server relationships // needs to be run in a goroutine as will block on Open. func (e *engine) peerWithTokenServer(onion string, tokenServerOnion string, tokenServerY string, lastKnownSignature []byte) { - e.ephemeralServicesLock.Lock() - _, exists := e.ephemeralServices[onion] + e.ephemeralServicesLock.Lock() + _, exists := e.ephemeralServices[onion] - if exists { - e.ephemeralServicesLock.Unlock() - log.Debugf("attempted to join a server with an active connection") - return - } + if exists { + e.ephemeralServicesLock.Unlock() + log.Debugf("attempted to join a server with an active connection") + return + } - connectionService := &connectionLockedService{service: new(tor.BaseOnionService)} - e.ephemeralServices[onion] = connectionService + connectionService := &connectionLockedService{service: new(tor.BaseOnionService)} + e.ephemeralServices[onion] = connectionService - connectionService.connectingLock.Lock() - defer connectionService.connectingLock.Unlock() - e.ephemeralServicesLock.Unlock() + connectionService.connectingLock.Lock() + defer connectionService.connectingLock.Unlock() + e.ephemeralServicesLock.Unlock() - log.Debugf("Peering with Token Server %v %v", onion, tokenServerOnion) - e.ignoreOnShutdown(e.serverConnecting)(onion) - // Create a new ephemeral service for this connection - eid, epk := primitives.InitializeEphemeralIdentity() - connectionService.service.Init(e.acn, epk, &eid) + log.Debugf("Peering with Token Server %v %v", onion, tokenServerOnion) + e.ignoreOnShutdown(e.serverConnecting)(onion) + // Create a new ephemeral service for this connection + eid, epk := primitives.InitializeEphemeralIdentity() + connectionService.service.Init(e.acn, epk, &eid) - Y := new(ristretto255.Element) - Y.UnmarshalText([]byte(tokenServerY)) - connected, err := connectionService.service.Connect(onion, NewTokenBoardClient(e.acn, Y, tokenServerOnion, lastKnownSignature, e)) - // If we are already connected...check if we are authed and issue an auth event - // (This allows the ui to be stateless) - if connected && err != nil { - conn, err := connectionService.service.GetConnection(onion) - if err == nil { + Y := new(ristretto255.Element) + Y.UnmarshalText([]byte(tokenServerY)) + connected, err := connectionService.service.Connect(onion, NewTokenBoardClient(e.acn, Y, tokenServerOnion, lastKnownSignature, e)) + // If we are already connected...check if we are authed and issue an auth event + // (This allows the ui to be stateless) + if connected && err != nil { + conn, err := connectionService.service.GetConnection(onion) + if err == nil { - // If the server is synced, resend the synced status update - if conn.HasCapability(groups.CwtchServerSyncedCapability) { - e.ignoreOnShutdown(e.serverSynced)(onion) - return - } + // If the server is synced, resend the synced status update + if conn.HasCapability(groups.CwtchServerSyncedCapability) { + e.ignoreOnShutdown(e.serverSynced)(onion) + return + } - // If the server is authed, resend the auth status update - if conn.HasCapability(applications.AuthCapability) { - // Resend the authed event... - e.ignoreOnShutdown(e.serverAuthed)(onion) - return - } - } - } + // If the server is authed, resend the auth status update + if conn.HasCapability(applications.AuthCapability) { + // Resend the authed event... + e.ignoreOnShutdown(e.serverAuthed)(onion) + return + } + } + } - // Only issue a disconnected error if we are disconnected (Connect will fail if a connection already exists) - if !connected && err != nil { - e.ignoreOnShutdown(e.serverDisconnected)(onion) - } + // Only issue a disconnected error if we are disconnected (Connect will fail if a connection already exists) + if !connected && err != nil { + e.ignoreOnShutdown(e.serverDisconnected)(onion) + } } func (e *engine) ignoreOnShutdown(f func(string)) func(string) { - return func(x string) { - if !e.shuttingDown.Load() { - f(x) - } - } + return func(x string) { + if !e.shuttingDown.Load() { + f(x) + } + } } func (e *engine) ignoreOnShutdown2(f func(string, string)) func(string, string) { - return func(x, y string) { - if !e.shuttingDown.Load() { - f(x, y) - } - } + return func(x, y string) { + if !e.shuttingDown.Load() { + f(x, y) + } + } } func (e *engine) peerAuthed(onion string) { - _, known := e.authorizations.Load(onion) - if !known { - e.authorizations.Store(onion, model.AuthUnknown) - } + _, known := e.authorizations.Load(onion) + if !known { + e.authorizations.Store(onion, model.AuthUnknown) + } - // FIXME: This call uses WAY too much memory, and was responsible for the vast majority - // of allocations in the UI - // This is because Bine ends up reading the entire response into memory and then passes that back - // into Connectivity which eventually extracts just what it needs. - // Ideally we would just read from the control stream directly into reusable buffers. + // FIXME: This call uses WAY too much memory, and was responsible for the vast majority + // of allocations in the UI + // This is because Bine ends up reading the entire response into memory and then passes that back + // into Connectivity which eventually extracts just what it needs. + // Ideally we would just read from the control stream directly into reusable buffers. - //details, err := e.acn.GetInfo(onion) - //if err == nil { - // if hops, exists := details["circuit"]; exists { - // e.eventManager.Publish(event.NewEvent(event.ACNInfo, map[event.Field]string{ - // event.Handle: onion, - // event.Key: "circuit", - // event.Data: hops, - // })) - // } - //} else { - // log.Errorf("error getting info for onion %v", err) - //} + //details, err := e.acn.GetInfo(onion) + //if err == nil { + // if hops, exists := details["circuit"]; exists { + // e.eventManager.Publish(event.NewEvent(event.ACNInfo, map[event.Field]string{ + // event.Handle: onion, + // event.Key: "circuit", + // event.Data: hops, + // })) + // } + //} else { + // log.Errorf("error getting info for onion %v", err) + //} - e.eventManager.Publish(event.NewEvent(event.PeerStateChange, map[event.Field]string{ - event.RemotePeer: string(onion), - event.ConnectionState: ConnectionStateName[AUTHENTICATED], - })) + e.eventManager.Publish(event.NewEvent(event.PeerStateChange, map[event.Field]string{ + event.RemotePeer: string(onion), + event.ConnectionState: ConnectionStateName[AUTHENTICATED], + })) } func (e *engine) peerConnecting(onion string) { - e.eventManager.Publish(event.NewEvent(event.PeerStateChange, map[event.Field]string{ - event.RemotePeer: string(onion), - event.ConnectionState: ConnectionStateName[CONNECTING], - })) + e.eventManager.Publish(event.NewEvent(event.PeerStateChange, map[event.Field]string{ + event.RemotePeer: string(onion), + event.ConnectionState: ConnectionStateName[CONNECTING], + })) } func (e *engine) serverConnecting(onion string) { - e.eventManager.Publish(event.NewEvent(event.ServerStateChange, map[event.Field]string{ - event.GroupServer: string(onion), - event.ConnectionState: ConnectionStateName[CONNECTING], - })) + e.eventManager.Publish(event.NewEvent(event.ServerStateChange, map[event.Field]string{ + event.GroupServer: string(onion), + event.ConnectionState: ConnectionStateName[CONNECTING], + })) } func (e *engine) serverAuthed(onion string) { - e.eventManager.Publish(event.NewEvent(event.ServerStateChange, map[event.Field]string{ - event.GroupServer: onion, - event.ConnectionState: ConnectionStateName[AUTHENTICATED], - })) + e.eventManager.Publish(event.NewEvent(event.ServerStateChange, map[event.Field]string{ + event.GroupServer: onion, + event.ConnectionState: ConnectionStateName[AUTHENTICATED], + })) } func (e *engine) serverSynced(onion string) { - e.eventManager.Publish(event.NewEvent(event.ServerStateChange, map[event.Field]string{ - event.GroupServer: onion, - event.ConnectionState: ConnectionStateName[SYNCED], - })) + e.eventManager.Publish(event.NewEvent(event.ServerStateChange, map[event.Field]string{ + event.GroupServer: onion, + event.ConnectionState: ConnectionStateName[SYNCED], + })) } func (e *engine) serverDisconnected(onion string) { - e.leaveServer(onion) + e.leaveServer(onion) - e.eventManager.Publish(event.NewEvent(event.ServerStateChange, map[event.Field]string{ - event.GroupServer: onion, - event.ConnectionState: ConnectionStateName[DISCONNECTED], - })) + e.eventManager.Publish(event.NewEvent(event.ServerStateChange, map[event.Field]string{ + event.GroupServer: onion, + event.ConnectionState: ConnectionStateName[DISCONNECTED], + })) } func (e *engine) peerAck(onion string, eventID string) { - e.eventManager.Publish(event.NewEvent(event.PeerAcknowledgement, map[event.Field]string{ - event.EventID: eventID, - event.RemotePeer: onion, - })) + e.eventManager.Publish(event.NewEvent(event.PeerAcknowledgement, map[event.Field]string{ + event.EventID: eventID, + event.RemotePeer: onion, + })) } func (e *engine) peerDisconnected(onion string) { - // Clean up any existing get value requests... - e.getValRequests.Range(func(key, value interface{}) bool { - keyString := key.(string) - if strings.HasPrefix(keyString, onion) { - e.getValRequests.Delete(keyString) - } - return true - }) + // Clean up any existing get value requests... + e.getValRequests.Range(func(key, value interface{}) bool { + keyString := key.(string) + if strings.HasPrefix(keyString, onion) { + e.getValRequests.Delete(keyString) + } + return true + }) - // Purge circuit information... - e.eventManager.Publish(event.NewEvent(event.ACNInfo, map[event.Field]string{ - event.Handle: onion, - event.Key: "circuit", - event.Data: "", - })) + // Purge circuit information... + e.eventManager.Publish(event.NewEvent(event.ACNInfo, map[event.Field]string{ + event.Handle: onion, + event.Key: "circuit", + event.Data: "", + })) - e.eventManager.Publish(event.NewEvent(event.PeerStateChange, map[event.Field]string{ - event.RemotePeer: string(onion), - event.ConnectionState: ConnectionStateName[DISCONNECTED], - })) + e.eventManager.Publish(event.NewEvent(event.PeerStateChange, map[event.Field]string{ + event.RemotePeer: string(onion), + event.ConnectionState: ConnectionStateName[DISCONNECTED], + })) } func (e *engine) sendGetValToPeer(eventID, onion, scope, path string) error { - log.Debugf("sendGetValMessage to peer %v %v.%v\n", onion, scope, path) - getVal := peerGetVal{Scope: scope, Path: path} - message, err := json.Marshal(getVal) - if err != nil { - return err - } + log.Debugf("sendGetValMessage to peer %v %v.%v\n", onion, scope, path) + getVal := peerGetVal{Scope: scope, Path: path} + message, err := json.Marshal(getVal) + if err != nil { + return err + } - key := onion + eventID - e.getValRequests.Store(key, message) - err = e.sendPeerMessage(onion, pmodel.PeerMessage{ID: eventID, Context: event.ContextGetVal, Data: message}) - if err != nil { - e.getValRequests.Delete(key) - } - return err + key := onion + eventID + e.getValRequests.Store(key, message) + err = e.sendPeerMessage(onion, pmodel.PeerMessage{ID: eventID, Context: event.ContextGetVal, Data: message}) + if err != nil { + e.getValRequests.Delete(key) + } + return err } func (e *engine) sendRetValToPeer(eventID, onion, val, existsStr string) error { - log.Debugf("sendRetValMessage to peer %v (%v) %v %v\n", onion, eventID, val, existsStr) - exists, _ := strconv.ParseBool(existsStr) - retVal := peerRetVal{Val: val, Exists: exists} - message, err := json.Marshal(retVal) - if err != nil { - return err - } - return e.sendPeerMessage(onion, pmodel.PeerMessage{ID: eventID, Context: event.ContextRetVal, Data: message}) + log.Debugf("sendRetValMessage to peer %v (%v) %v %v\n", onion, eventID, val, existsStr) + exists, _ := strconv.ParseBool(existsStr) + retVal := peerRetVal{Val: val, Exists: exists} + message, err := json.Marshal(retVal) + if err != nil { + return err + } + return e.sendPeerMessage(onion, pmodel.PeerMessage{ID: eventID, Context: event.ContextRetVal, Data: message}) } func (e *engine) deleteConnection(id string) { - conn, err := e.service.GetConnection(id) - if err == nil { - conn.Close() - } + conn, err := e.service.GetConnection(id) + if err == nil { + conn.Close() + } } // receiveGroupMessage is a callback function that processes GroupMessages from a given server func (e *engine) receiveGroupMessage(server string, gm *groups.EncryptedGroupMessage) { - // Publish Event so that a Profile Engine can deal with it. - // Note: This technically means that *multiple* Profile Engines could listen to the same ProtocolEngine! - e.eventManager.Publish(event.NewEvent(event.EncryptedGroupMessage, map[event.Field]string{event.GroupServer: server, event.Ciphertext: base64.StdEncoding.EncodeToString(gm.Ciphertext), event.Signature: base64.StdEncoding.EncodeToString(gm.Signature)})) + // Publish Event so that a Profile Engine can deal with it. + // Note: This technically means that *multiple* Profile Engines could listen to the same ProtocolEngine! + e.eventManager.Publish(event.NewEvent(event.EncryptedGroupMessage, map[event.Field]string{event.GroupServer: server, event.Ciphertext: base64.StdEncoding.EncodeToString(gm.Ciphertext), event.Signature: base64.StdEncoding.EncodeToString(gm.Signature)})) } // sendMessageToGroup attempts to sent the given message to the given group id. func (e *engine) sendMessageToGroup(groupID string, server string, ct []byte, sig []byte, attempts int) { - // sending to groups can fail for a few reasons (slow server, not enough tokens, etc.) - // rather than trying to keep all that logic in method we simply back-off and try again - // but if we fail more than 5 times then we report back to the client so they can investigate other options. - // Note: This flow only applies to online-and-connected servers (this method will return faster if the server is not - // online) - if attempts >= 5 { - log.Errorf("failed to post a message to a group after %v attempts", attempts) - e.eventManager.Publish(event.NewEvent(event.SendMessageToGroupError, map[event.Field]string{event.GroupID: groupID, event.GroupServer: server, event.Error: "could not make payment to server", event.Signature: base64.StdEncoding.EncodeToString(sig)})) - return - } + // sending to groups can fail for a few reasons (slow server, not enough tokens, etc.) + // rather than trying to keep all that logic in method we simply back-off and try again + // but if we fail more than 5 times then we report back to the client so they can investigate other options. + // Note: This flow only applies to online-and-connected servers (this method will return faster if the server is not + // online) + if attempts >= 5 { + log.Errorf("failed to post a message to a group after %v attempts", attempts) + e.eventManager.Publish(event.NewEvent(event.SendMessageToGroupError, map[event.Field]string{event.GroupID: groupID, event.GroupServer: server, event.Error: "could not make payment to server", event.Signature: base64.StdEncoding.EncodeToString(sig)})) + return + } - e.ephemeralServicesLock.Lock() - ephemeralService, ok := e.ephemeralServices[server] - e.ephemeralServicesLock.Unlock() + e.ephemeralServicesLock.Lock() + ephemeralService, ok := e.ephemeralServices[server] + e.ephemeralServicesLock.Unlock() - if ephemeralService == nil || !ok { - log.Debugf("could not send message to group: serve not found") - e.eventManager.Publish(event.NewEvent(event.SendMessageToGroupError, map[event.Field]string{event.GroupID: groupID, event.GroupServer: server, event.Error: "server-not-found", event.Signature: base64.StdEncoding.EncodeToString(sig)})) - return - } + if ephemeralService == nil || !ok { + log.Debugf("could not send message to group: serve not found") + e.eventManager.Publish(event.NewEvent(event.SendMessageToGroupError, map[event.Field]string{event.GroupID: groupID, event.GroupServer: server, event.Error: "server-not-found", event.Signature: base64.StdEncoding.EncodeToString(sig)})) + return + } - conn, err := ephemeralService.service.WaitForCapabilityOrClose(server, groups.CwtchServerSyncedCapability) - if err == nil { - tokenApp, ok := (conn.App()).(*TokenBoardClient) - if ok { - if spent, numtokens := tokenApp.Post(ct, sig); !spent { - // we failed to post, probably because we ran out of tokens... so make a payment - go tokenApp.PurchaseTokens() - // backoff - time.Sleep(time.Second * 5) - // try again - log.Debugf("sending message to group error attempt: %v", attempts) - e.sendMessageToGroup(groupID, server, ct, sig, attempts+1) - } else { - if numtokens < 5 { - go tokenApp.PurchaseTokens() - } - } - // regardless we return.... - return - } - } - log.Debugf("could not send message to group") - e.eventManager.Publish(event.NewEvent(event.SendMessageToGroupError, map[event.Field]string{event.GroupID: groupID, event.GroupServer: server, event.Error: "server-connection-not-valid", event.Signature: base64.StdEncoding.EncodeToString(sig)})) + conn, err := ephemeralService.service.WaitForCapabilityOrClose(server, groups.CwtchServerSyncedCapability) + if err == nil { + tokenApp, ok := (conn.App()).(*TokenBoardClient) + if ok { + if spent, numtokens := tokenApp.Post(ct, sig); !spent { + // we failed to post, probably because we ran out of tokens... so make a payment + go tokenApp.PurchaseTokens() + // backoff + time.Sleep(time.Second * 5) + // try again + log.Debugf("sending message to group error attempt: %v", attempts) + e.sendMessageToGroup(groupID, server, ct, sig, attempts+1) + } else { + if numtokens < 5 { + go tokenApp.PurchaseTokens() + } + } + // regardless we return.... + return + } + } + log.Debugf("could not send message to group") + e.eventManager.Publish(event.NewEvent(event.SendMessageToGroupError, map[event.Field]string{event.GroupID: groupID, event.GroupServer: server, event.Error: "server-connection-not-valid", event.Signature: base64.StdEncoding.EncodeToString(sig)})) } // TODO this is becoming cluttered func (e *engine) handlePeerMessage(hostname string, eventID string, context string, message []byte) { - log.Debugf("New message from peer: %v %v", hostname, context) + log.Debugf("New message from peer: %v %v", hostname, context) - if context == event.ContextAck { - e.peerAck(hostname, eventID) - } else if context == event.ContextRetVal { - req, ok := e.getValRequests.Load(hostname + eventID) - if ok { - reqStr := req.([]byte) - e.handlePeerRetVal(hostname, reqStr, message) - e.getValRequests.Delete(hostname + eventID) - } else { - log.Errorf("could not find val request for %v %s", hostname, eventID) - } - } else if context == event.ContextGetVal { - var getVal peerGetVal - err := json.Unmarshal(message, &getVal) - if err == nil { - ev := event.NewEventList(event.NewGetValMessageFromPeer, event.RemotePeer, hostname, event.Scope, getVal.Scope, event.Path, getVal.Path) - ev.EventID = eventID - e.eventManager.Publish(ev) - } - } else if context == event.ContextRequestManifest { - for _, message := range e.filesharingSubSystem.RequestManifestParts(eventID) { - if err := e.sendPeerMessage(hostname, message); err != nil { - e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.RemotePeer: hostname, event.EventID: eventID, event.Error: err.Error()})) - } - } - } else if context == event.ContextSendManifest { - if fileKey, manifest := e.filesharingSubSystem.ReceiveManifestPart(eventID, message); len(manifest) != 0 { - // We have a valid manifest - e.eventManager.Publish(event.NewEvent(event.ManifestReceived, map[event.Field]string{event.Handle: hostname, event.FileKey: fileKey, event.SerializedManifest: manifest})) - } - } else if context == event.ContextRequestFile { - chunks := e.filesharingSubSystem.ProcessChunkRequest(eventID, message) - go func() { - for _, message := range chunks { - if err := e.sendPeerMessage(hostname, message); err != nil { - e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.RemotePeer: hostname, event.EventID: eventID, event.Error: err.Error()})) - } - } - }() - } else if context == event.ContextSendFile { - fileKey, progress, totalChunks, _, title := e.filesharingSubSystem.ProcessChunk(eventID, message) - if len(fileKey) != 0 { - e.eventManager.Publish(event.NewEvent(event.FileDownloadProgressUpdate, map[event.Field]string{event.FileKey: fileKey, event.Progress: strconv.Itoa(int(progress)), event.FileSizeInChunks: strconv.Itoa(int(totalChunks)), event.NameSuggestion: title})) - if progress == totalChunks { - if tempFile, filePath, success := e.filesharingSubSystem.VerifyFile(fileKey); success { - log.Debugf("file verified and downloaded!") - e.eventManager.Publish(event.NewEvent(event.FileDownloaded, map[event.Field]string{event.FileKey: fileKey, event.FilePath: filePath, event.TempFile: tempFile})) - } else { - log.Debugf("file failed to verify!") - e.eventManager.Publish(event.NewEvent(event.FileVerificationFailed, map[event.Field]string{event.FileKey: fileKey})) - } - } - } - } else { - // Fall through handler for the default text conversation. - e.eventManager.Publish(event.NewEvent(event.NewMessageFromPeerEngine, map[event.Field]string{event.TimestampReceived: time.Now().Format(time.RFC3339Nano), event.RemotePeer: hostname, event.Data: string(message)})) + if context == event.ContextAck { + e.peerAck(hostname, eventID) + } else if context == event.ContextRetVal { + req, ok := e.getValRequests.Load(hostname + eventID) + if ok { + reqStr := req.([]byte) + e.handlePeerRetVal(hostname, reqStr, message) + e.getValRequests.Delete(hostname + eventID) + } else { + log.Errorf("could not find val request for %v %s", hostname, eventID) + } + } else if context == event.ContextGetVal { + var getVal peerGetVal + err := json.Unmarshal(message, &getVal) + if err == nil { + ev := event.NewEventList(event.NewGetValMessageFromPeer, event.RemotePeer, hostname, event.Scope, getVal.Scope, event.Path, getVal.Path) + ev.EventID = eventID + e.eventManager.Publish(ev) + } + } else if context == event.ContextRequestManifest { + for _, message := range e.filesharingSubSystem.RequestManifestParts(eventID) { + if err := e.sendPeerMessage(hostname, message); err != nil { + e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.RemotePeer: hostname, event.EventID: eventID, event.Error: err.Error()})) + } + } + } else if context == event.ContextSendManifest { + if fileKey, manifest := e.filesharingSubSystem.ReceiveManifestPart(eventID, message); len(manifest) != 0 { + // We have a valid manifest + e.eventManager.Publish(event.NewEvent(event.ManifestReceived, map[event.Field]string{event.Handle: hostname, event.FileKey: fileKey, event.SerializedManifest: manifest})) + } + } else if context == event.ContextRequestFile { + chunks := e.filesharingSubSystem.ProcessChunkRequest(eventID, message) + go func() { + for _, message := range chunks { + if err := e.sendPeerMessage(hostname, message); err != nil { + e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.RemotePeer: hostname, event.EventID: eventID, event.Error: err.Error()})) + } + } + }() + } else if context == event.ContextSendFile { + fileKey, progress, totalChunks, _, title := e.filesharingSubSystem.ProcessChunk(eventID, message) + if len(fileKey) != 0 { + e.eventManager.Publish(event.NewEvent(event.FileDownloadProgressUpdate, map[event.Field]string{event.FileKey: fileKey, event.Progress: strconv.Itoa(int(progress)), event.FileSizeInChunks: strconv.Itoa(int(totalChunks)), event.NameSuggestion: title})) + if progress == totalChunks { + if tempFile, filePath, success := e.filesharingSubSystem.VerifyFile(fileKey); success { + log.Debugf("file verified and downloaded!") + e.eventManager.Publish(event.NewEvent(event.FileDownloaded, map[event.Field]string{event.FileKey: fileKey, event.FilePath: filePath, event.TempFile: tempFile})) + } else { + log.Debugf("file failed to verify!") + e.eventManager.Publish(event.NewEvent(event.FileVerificationFailed, map[event.Field]string{event.FileKey: fileKey})) + } + } + } + } else { + // Fall through handler for the default text conversation. + e.eventManager.Publish(event.NewEvent(event.NewMessageFromPeerEngine, map[event.Field]string{event.TimestampReceived: time.Now().Format(time.RFC3339Nano), event.RemotePeer: hostname, event.Data: string(message)})) - // Send an explicit acknowledgement - // Every other protocol should have an explicit acknowledgement message e.g. value lookups have responses, and file handling has an explicit flow - if err := e.sendPeerMessage(hostname, pmodel.PeerMessage{ID: eventID, Context: event.ContextAck, Data: []byte{}}); err != nil { - e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.RemotePeer: hostname, event.EventID: eventID, event.Error: err.Error()})) - } - } + // Send an explicit acknowledgement + // Every other protocol should have an explicit acknowledgement message e.g. value lookups have responses, and file handling has an explicit flow + if err := e.sendPeerMessage(hostname, pmodel.PeerMessage{ID: eventID, Context: event.ContextAck, Data: []byte{}}); err != nil { + e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.RemotePeer: hostname, event.EventID: eventID, event.Error: err.Error()})) + } + } } func (e *engine) handlePeerRetVal(hostname string, getValData, retValData []byte) { - var getVal peerGetVal - var retVal peerRetVal + var getVal peerGetVal + var retVal peerRetVal - err := json.Unmarshal(getValData, &getVal) - if err != nil { - log.Errorf("Unmarshalling our own getVal request: %v\n", err) - return - } - err = json.Unmarshal(retValData, &retVal) - if err != nil { - log.Errorf("Unmarshalling peer response to getVal request") - return - } + err := json.Unmarshal(getValData, &getVal) + if err != nil { + log.Errorf("Unmarshalling our own getVal request: %v\n", err) + return + } + err = json.Unmarshal(retValData, &retVal) + if err != nil { + log.Errorf("Unmarshalling peer response to getVal request") + return + } - e.eventManager.Publish(event.NewEventList(event.NewRetValMessageFromPeer, event.RemotePeer, hostname, event.Scope, getVal.Scope, event.Path, getVal.Path, event.Exists, strconv.FormatBool(retVal.Exists), event.Data, retVal.Val)) + e.eventManager.Publish(event.NewEventList(event.NewRetValMessageFromPeer, event.RemotePeer, hostname, event.Scope, getVal.Scope, event.Path, getVal.Path, event.Exists, strconv.FormatBool(retVal.Exists), event.Data, retVal.Val)) } // leaveServer disconnects from a server and deletes the ephemeral service func (e *engine) leaveServer(server string) { - e.ephemeralServicesLock.Lock() - defer e.ephemeralServicesLock.Unlock() - ephemeralService, ok := e.ephemeralServices[server] - if ok { - ephemeralService.service.Shutdown() - delete(e.ephemeralServices, server) - } + e.ephemeralServicesLock.Lock() + defer e.ephemeralServicesLock.Unlock() + ephemeralService, ok := e.ephemeralServices[server] + if ok { + ephemeralService.service.Shutdown() + delete(e.ephemeralServices, server) + } } func (e *engine) sendPeerMessage(handle string, message pmodel.PeerMessage) error { - conn, err := e.service.WaitForCapabilityOrClose(handle, cwtchCapability) - if err == nil { - peerApp, ok := (conn.App()).(*PeerApp) - if ok { - return peerApp.SendMessage(message) - } - log.Debugf("could not derive peer app: %v", err) - return fmt.Errorf("could not find peer app to send message to: %v", handle) - } - log.Debugf("could not send peer message: %v", err) - return err + conn, err := e.service.WaitForCapabilityOrClose(handle, cwtchCapability) + if err == nil { + peerApp, ok := (conn.App()).(*PeerApp) + if ok { + return peerApp.SendMessage(message) + } + log.Debugf("could not derive peer app: %v", err) + return fmt.Errorf("could not find peer app to send message to: %v", handle) + } + log.Debugf("could not send peer message: %v", err) + return err }