From 8250c04c52318d2c0eed96de66eca80bd2ae758d Mon Sep 17 00:00:00 2001 From: Dan Ballard Date: Fri, 17 Dec 2021 14:42:55 -0500 Subject: [PATCH 1/4] refactor out appCore and add migration start and done notification events --- app/app.go | 113 ++++++++++++++++++++++++------------------------ event/common.go | 9 ++-- 2 files changed, 59 insertions(+), 63 deletions(-) diff --git a/app/app.go b/app/app.go index 7c487fc..78c7136 100644 --- a/app/app.go +++ b/app/app.go @@ -9,7 +9,6 @@ import ( "cwtch.im/cwtch/peer" "cwtch.im/cwtch/protocol/connections" "cwtch.im/cwtch/storage" - "fmt" "git.openprivacy.ca/openprivacy/connectivity" "git.openprivacy.ca/openprivacy/log" "io/ioutil" @@ -19,15 +18,10 @@ import ( "sync" ) -type applicationCore struct { - eventBuses map[string]event.Manager - - directory string - coremutex sync.Mutex -} - type application struct { - applicationCore + eventBuses map[string]event.Manager + directory string + coremutex sync.Mutex appletPeers appletACN appletPlugins @@ -60,30 +54,18 @@ type Application interface { // LoadProfileFn is the function signature for a function in an app that loads a profile type LoadProfileFn func(profile peer.CwtchPeer) -func newAppCore(appDirectory string) *applicationCore { - appCore := &applicationCore{eventBuses: make(map[string]event.Manager), directory: appDirectory} - os.MkdirAll(path.Join(appCore.directory, "profiles"), 0700) - return appCore -} - // NewApp creates a new app with some environment awareness and initializes a Tor Manager func NewApp(acn connectivity.ACN, appDirectory string) Application { log.Debugf("NewApp(%v)\n", appDirectory) - app := &application{engines: make(map[string]connections.Engine), applicationCore: *newAppCore(appDirectory), appBus: event.NewEventManager()} + os.MkdirAll(path.Join(appDirectory, "profiles"), 0700) + + app := &application{engines: make(map[string]connections.Engine), eventBuses: make(map[string]event.Manager), directory: appDirectory, appBus: event.NewEventManager()} app.appletPeers.init() app.appletACN.init(acn, app.getACNStatusHandler()) return app } -func (ac *applicationCore) DeletePeer(onion string) { - ac.coremutex.Lock() - defer ac.coremutex.Unlock() - - ac.eventBuses[onion].Shutdown() - delete(ac.eventBuses, onion) -} - func (app *application) CreateTaggedPeer(name string, password string, tag string) { app.appmutex.Lock() defer app.appmutex.Unlock() @@ -127,7 +109,11 @@ func (app *application) DeletePeer(onion string, password string) { app.peers[onion].Delete() delete(app.peers, onion) app.eventBuses[onion].Publish(event.NewEventList(event.ShutdownPeer, event.Identity, onion)) - app.applicationCore.DeletePeer(onion) + + app.coremutex.Lock() + defer app.coremutex.Unlock() + app.eventBuses[onion].Shutdown() + delete(app.eventBuses, onion) log.Debugf("Delete peer for %v Done\n", onion) app.appBus.Publish(event.NewEventList(event.PeerDeleted, event.Identity, onion)) @@ -145,63 +131,76 @@ func (app *application) AddPeerPlugin(onion string, pluginID plugins.PluginID) { } // LoadProfiles takes a password and attempts to load any profiles it can from storage with it and create Peers for them -func (ac *applicationCore) LoadProfiles(password string, timeline bool, loadProfileFn LoadProfileFn) error { - files, err := ioutil.ReadDir(path.Join(ac.directory, "profiles")) +func (app *application) LoadProfiles(password string) { + count := 0 + migrating := false + + files, err := ioutil.ReadDir(path.Join(app.directory, "profiles")) if err != nil { - return fmt.Errorf("error: cannot read profiles directory: %v", err) + log.Errorf("error: cannot read profiles directory: %v", err) + return } for _, file := range files { // Attempt to load an encrypted database - profileDirectory := path.Join(ac.directory, "profiles", file.Name()) + profileDirectory := path.Join(app.directory, "profiles", file.Name()) profile, err := peer.FromEncryptedDatabase(profileDirectory, password) + loaded := false if err == nil { // return the load the profile... log.Infof("loading profile from new-type storage database...") - loadProfileFn(profile) + loaded = app.installProfile(profile) } else { // On failure attempt to load a legacy profile profileStore, err := storage.LoadProfileWriterStore(profileDirectory, password) if err != nil { continue } log.Infof("found legacy profile. importing to new database structure...") - legacyProfile := profileStore.GetProfileCopy(timeline) + legacyProfile := profileStore.GetProfileCopy(true) + if !migrating { + migrating = true + app.appBus.Publish(event.NewEventList(event.StartingStorageMiragtion)) + } cps, err := peer.CreateEncryptedStore(profileDirectory, password) if err != nil { log.Errorf("error creating encrypted store: %v", err) } profile := peer.ImportLegacyProfile(legacyProfile, cps) - loadProfileFn(profile) + loaded = app.installProfile(profile) + } + if loaded { + count++ } } - return nil -} - -// LoadProfiles takes a password and attempts to load any profiles it can from storage with it and create Peers for them -func (app *application) LoadProfiles(password string) { - count := 0 - app.applicationCore.LoadProfiles(password, true, func(profile peer.CwtchPeer) { - app.appmutex.Lock() - // Only attempt to finalize the profile if we don't have one loaded... - if app.peers[profile.GetOnion()] == nil { - eventBus := event.NewEventManager() - app.eventBuses[profile.GetOnion()] = eventBus - profile.Init(app.eventBuses[profile.GetOnion()]) - app.peers[profile.GetOnion()] = profile - app.engines[profile.GetOnion()], _ = profile.GenerateProtocolEngine(app.acn, app.eventBuses[profile.GetOnion()]) - app.appBus.Publish(event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.GetOnion(), event.Created: event.False})) - count++ - } else { - // Otherwise shutdown the connections - profile.Shutdown() - } - app.appmutex.Unlock() - }) if count == 0 { message := event.NewEventList(event.AppError, event.Error, event.AppErrLoaded0) app.appBus.Publish(message) } + if migrating { + app.appBus.Publish(event.NewEventList(event.DoneStorageMigration)) + } +} + +// installProfile takes a profile and if it isn't loaded in the app, installs it and returns true +func (app *application) installProfile(profile peer.CwtchPeer) bool { + app.appmutex.Lock() + defer app.appmutex.Unlock() + + // Only attempt to finalize the profile if we don't have one loaded... + if app.peers[profile.GetOnion()] == nil { + eventBus := event.NewEventManager() + app.eventBuses[profile.GetOnion()] = eventBus + profile.Init(app.eventBuses[profile.GetOnion()]) + app.peers[profile.GetOnion()] = profile + app.engines[profile.GetOnion()], _ = profile.GenerateProtocolEngine(app.acn, app.eventBuses[profile.GetOnion()]) + app.appBus.Publish(event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.GetOnion(), event.Created: event.False})) + return true + } else { + // Otherwise shutdown the connections + profile.Shutdown() + return false + } } // GetPrimaryBus returns the bus the Application uses for events that aren't peer specific @@ -210,8 +209,8 @@ func (app *application) GetPrimaryBus() event.Manager { } // GetEventBus returns a cwtchPeer's event bus -func (ac *applicationCore) GetEventBus(onion string) event.Manager { - if manager, ok := ac.eventBuses[onion]; ok { +func (app *application) GetEventBus(onion string) event.Manager { + if manager, ok := app.eventBuses[onion]; ok { return manager } return nil diff --git a/event/common.go b/event/common.go index 1fe4ad5..28eed5a 100644 --- a/event/common.go +++ b/event/common.go @@ -202,6 +202,9 @@ const ( // Profile Attribute Event UpdatedProfileAttribute = Type("UpdatedProfileAttribute") + + StartingStorageMiragtion = Type("StartingStorageMigration") + DoneStorageMigration = Type("DoneStorageMigration") ) // Field defines common event attributes @@ -284,12 +287,6 @@ const ( PasswordMatchError = "Password did not match" ) -// Values to be suplied in event.NewPeer for Status -const ( - StorageRunning = "running" - StorageNew = "new" -) - // Defining Protocol Contexts const ( ContextAck = "im.cwtch.acknowledgement" -- 2.25.1 From 1d220381ebd7654316ef1c235891ec45b486f8bf Mon Sep 17 00:00:00 2001 From: Dan Ballard Date: Fri, 17 Dec 2021 22:55:25 -0500 Subject: [PATCH 2/4] fix govet --- app/app.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/app/app.go b/app/app.go index 78c7136..6c68e71 100644 --- a/app/app.go +++ b/app/app.go @@ -196,11 +196,10 @@ func (app *application) installProfile(profile peer.CwtchPeer) bool { app.engines[profile.GetOnion()], _ = profile.GenerateProtocolEngine(app.acn, app.eventBuses[profile.GetOnion()]) app.appBus.Publish(event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.GetOnion(), event.Created: event.False})) return true - } else { - // Otherwise shutdown the connections - profile.Shutdown() - return false } + // Otherwise shutdown the connections + profile.Shutdown() + return false } // GetPrimaryBus returns the bus the Application uses for events that aren't peer specific -- 2.25.1 From ff012313be4843a80ac90fe27ae7b53ed0a051d9 Mon Sep 17 00:00:00 2001 From: Dan Ballard Date: Fri, 17 Dec 2021 20:27:56 -0500 Subject: [PATCH 3/4] engine: add more granular locking around ephemeral token services --- protocol/connections/engine.go | 37 ++++++++++++++++++++++++---------- 1 file changed, 26 insertions(+), 11 deletions(-) diff --git a/protocol/connections/engine.go b/protocol/connections/engine.go index 8e66e68..0fdd219 100644 --- a/protocol/connections/engine.go +++ b/protocol/connections/engine.go @@ -24,6 +24,11 @@ import ( "golang.org/x/crypto/ed25519" ) +type connectionLockedService struct { + service tapir.Service + connectingLock sync.Mutex +} + type engine struct { queue event.Queue @@ -46,7 +51,7 @@ type engine struct { getValRequests sync.Map // [string]string eventID:Data // Nextgen Tapir Service - ephemeralServices map[string]tapir.Service //sync.Map // string(onion) => tapir.Service + ephemeralServices map[string]*connectionLockedService //sync.Map // string(onion) => tapir.Service ephemeralServicesLock sync.Mutex // Required for listen(), inaccessible from identity @@ -73,7 +78,7 @@ func NewProtocolEngine(identity primitives.Identity, privateKey ed25519.PrivateK engine := new(engine) engine.identity = identity engine.privateKey = privateKey - engine.ephemeralServices = make(map[string]tapir.Service) + engine.ephemeralServices = make(map[string]*connectionLockedService) engine.queue = event.NewQueue() go engine.eventHandler() @@ -283,7 +288,7 @@ func (e *engine) Shutdown() { defer e.ephemeralServicesLock.Unlock() for _, connection := range e.ephemeralServices { log.Infof("shutting down ephemeral service") - connection.Shutdown() + connection.service.Shutdown() } e.queue.Shutdown() } @@ -319,14 +324,15 @@ func (e *engine) peerWithOnion(onion string) { // needs to be run in a goroutine as will block on Open. func (e *engine) peerWithTokenServer(onion string, tokenServerOnion string, tokenServerY string, lastKnownSignature []byte) { e.ephemeralServicesLock.Lock() - defer e.ephemeralServicesLock.Unlock() - connection, exists := e.ephemeralServices[onion] - if exists { - if conn, err := connection.GetConnection(onion); err == nil { + connectionService, exists := e.ephemeralServices[onion] + + if exists && connectionService.service != nil { + if conn, err := connectionService.service.GetConnection(onion); err == nil { // We are already peered and synced so return... - // This will only not-trigger it lastKnownSignature has been wiped, which only happens when ResyncServer is called + // This will only not-trigger if lastKnownSignature has been wiped, which only happens when ResyncServer is called // in CwtchPeer. if !conn.IsClosed() && len(lastKnownSignature) != 0 { + e.ephemeralServicesLock.Unlock() return } // Otherwise...we are going to rebuild the connection(which will result in a bandwidth heavy resync)... @@ -335,6 +341,13 @@ func (e *engine) peerWithTokenServer(onion string, tokenServerOnion string, toke // Otherwise...let's reconnect } + connectionService = &connectionLockedService{} + e.ephemeralServices[onion] = connectionService + + connectionService.connectingLock.Lock() + defer connectionService.connectingLock.Unlock() + e.ephemeralServicesLock.Unlock() + log.Debugf("Peering with Token Server %v %v", onion, tokenServerOnion) e.ignoreOnShutdown(e.serverConnecting)(onion) // Create a new ephemeral service for this connection @@ -345,7 +358,9 @@ func (e *engine) peerWithTokenServer(onion string, tokenServerOnion string, toke Y := ristretto255.NewElement() Y.UnmarshalText([]byte(tokenServerY)) connected, err := ephemeralService.Connect(onion, NewTokenBoardClient(e.acn, Y, tokenServerOnion, lastKnownSignature, e.receiveGroupMessage, e.serverAuthed, e.serverSynced, e.ignoreOnShutdown(e.serverDisconnected))) - e.ephemeralServices[onion] = ephemeralService + e.ephemeralServicesLock.Lock() + e.ephemeralServices[onion].service = ephemeralService + e.ephemeralServicesLock.Unlock() // If we are already connected...check if we are authed and issue an auth event // (This allows the ui to be stateless) if connected && err != nil { @@ -508,7 +523,7 @@ func (e *engine) sendMessageToGroup(groupID string, server string, ct []byte, si return } - conn, err := ephemeralService.WaitForCapabilityOrClose(server, groups.CwtchServerSyncedCapability) + conn, err := ephemeralService.service.WaitForCapabilityOrClose(server, groups.CwtchServerSyncedCapability) if err == nil { tokenApp, ok := (conn.App()).(*TokenBoardClient) if ok { @@ -624,7 +639,7 @@ func (e *engine) handlePeerRetVal(hostname string, getValData, retValData []byte func (e *engine) leaveServer(server string) { ephemeralService, ok := e.ephemeralServices[server] if ok { - ephemeralService.Shutdown() + ephemeralService.service.Shutdown() delete(e.ephemeralServices, server) } } -- 2.25.1 From 13811def941ab165dd58f8ce3aa97aa5b0576a74 Mon Sep 17 00:00:00 2001 From: Dan Ballard Date: Fri, 17 Dec 2021 22:52:53 -0500 Subject: [PATCH 4/4] peerwithTokenService no longer uses Leave so as to preserve lock --- protocol/connections/engine.go | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/protocol/connections/engine.go b/protocol/connections/engine.go index 0fdd219..067c9f2 100644 --- a/protocol/connections/engine.go +++ b/protocol/connections/engine.go @@ -157,9 +157,7 @@ func (e *engine) eventHandler() { } go e.peerWithTokenServer(ev.Data[event.GroupServer], ev.Data[event.ServerTokenOnion], ev.Data[event.ServerTokenY], signature) case event.LeaveServer: - e.ephemeralServicesLock.Lock() e.leaveServer(ev.Data[event.GroupServer]) - e.ephemeralServicesLock.Unlock() case event.DeleteContact: onion := ev.Data[event.RemotePeer] // We remove this peer from out blocklist which will prevent them from contacting us if we have "block unknown peers" turned on. @@ -336,7 +334,7 @@ func (e *engine) peerWithTokenServer(onion string, tokenServerOnion string, toke return } // Otherwise...we are going to rebuild the connection(which will result in a bandwidth heavy resync)... - e.leaveServer(onion) + connectionService.service.Shutdown() } // Otherwise...let's reconnect } @@ -634,9 +632,9 @@ func (e *engine) handlePeerRetVal(hostname string, getValData, retValData []byte } // leaveServer disconnects from a server and deletes the ephemeral service -// REQUIREMENTS: must be called inside a block with e.ephemeralServicesLock.Lock() -// can't do it iself because is called from inside peerWithTokenServer which holds the lock func (e *engine) leaveServer(server string) { + e.ephemeralServicesLock.Lock() + defer e.ephemeralServicesLock.Unlock() ephemeralService, ok := e.ephemeralServices[server] if ok { ephemeralService.service.Shutdown() -- 2.25.1