engineRefine #416

Merged
sarah merged 4 commits from engineRefine into master 2021-12-18 21:23:29 +00:00
3 changed files with 87 additions and 79 deletions

View File

@ -9,7 +9,6 @@ import (
"cwtch.im/cwtch/peer" "cwtch.im/cwtch/peer"
"cwtch.im/cwtch/protocol/connections" "cwtch.im/cwtch/protocol/connections"
"cwtch.im/cwtch/storage" "cwtch.im/cwtch/storage"
"fmt"
"git.openprivacy.ca/openprivacy/connectivity" "git.openprivacy.ca/openprivacy/connectivity"
"git.openprivacy.ca/openprivacy/log" "git.openprivacy.ca/openprivacy/log"
"io/ioutil" "io/ioutil"
@ -19,15 +18,10 @@ import (
"sync" "sync"
) )
type applicationCore struct {
eventBuses map[string]event.Manager
directory string
coremutex sync.Mutex
}
type application struct { type application struct {
applicationCore eventBuses map[string]event.Manager
directory string
coremutex sync.Mutex
appletPeers appletPeers
appletACN appletACN
appletPlugins appletPlugins
@ -60,30 +54,18 @@ type Application interface {
// LoadProfileFn is the function signature for a function in an app that loads a profile // LoadProfileFn is the function signature for a function in an app that loads a profile
type LoadProfileFn func(profile peer.CwtchPeer) type LoadProfileFn func(profile peer.CwtchPeer)
func newAppCore(appDirectory string) *applicationCore {
appCore := &applicationCore{eventBuses: make(map[string]event.Manager), directory: appDirectory}
os.MkdirAll(path.Join(appCore.directory, "profiles"), 0700)
return appCore
}
// NewApp creates a new app with some environment awareness and initializes a Tor Manager // NewApp creates a new app with some environment awareness and initializes a Tor Manager
func NewApp(acn connectivity.ACN, appDirectory string) Application { func NewApp(acn connectivity.ACN, appDirectory string) Application {
log.Debugf("NewApp(%v)\n", appDirectory) log.Debugf("NewApp(%v)\n", appDirectory)
app := &application{engines: make(map[string]connections.Engine), applicationCore: *newAppCore(appDirectory), appBus: event.NewEventManager()} os.MkdirAll(path.Join(appDirectory, "profiles"), 0700)
app := &application{engines: make(map[string]connections.Engine), eventBuses: make(map[string]event.Manager), directory: appDirectory, appBus: event.NewEventManager()}
app.appletPeers.init() app.appletPeers.init()
app.appletACN.init(acn, app.getACNStatusHandler()) app.appletACN.init(acn, app.getACNStatusHandler())
return app return app
} }
func (ac *applicationCore) DeletePeer(onion string) {
ac.coremutex.Lock()
defer ac.coremutex.Unlock()
ac.eventBuses[onion].Shutdown()
delete(ac.eventBuses, onion)
}
func (app *application) CreateTaggedPeer(name string, password string, tag string) { func (app *application) CreateTaggedPeer(name string, password string, tag string) {
app.appmutex.Lock() app.appmutex.Lock()
defer app.appmutex.Unlock() defer app.appmutex.Unlock()
@ -127,7 +109,11 @@ func (app *application) DeletePeer(onion string, password string) {
app.peers[onion].Delete() app.peers[onion].Delete()
delete(app.peers, onion) delete(app.peers, onion)
app.eventBuses[onion].Publish(event.NewEventList(event.ShutdownPeer, event.Identity, onion)) app.eventBuses[onion].Publish(event.NewEventList(event.ShutdownPeer, event.Identity, onion))
app.applicationCore.DeletePeer(onion)
app.coremutex.Lock()
defer app.coremutex.Unlock()
app.eventBuses[onion].Shutdown()
delete(app.eventBuses, onion)
log.Debugf("Delete peer for %v Done\n", onion) log.Debugf("Delete peer for %v Done\n", onion)
app.appBus.Publish(event.NewEventList(event.PeerDeleted, event.Identity, onion)) app.appBus.Publish(event.NewEventList(event.PeerDeleted, event.Identity, onion))
@ -145,63 +131,75 @@ func (app *application) AddPeerPlugin(onion string, pluginID plugins.PluginID) {
} }
// LoadProfiles takes a password and attempts to load any profiles it can from storage with it and create Peers for them // LoadProfiles takes a password and attempts to load any profiles it can from storage with it and create Peers for them
func (ac *applicationCore) LoadProfiles(password string, timeline bool, loadProfileFn LoadProfileFn) error { func (app *application) LoadProfiles(password string) {
files, err := ioutil.ReadDir(path.Join(ac.directory, "profiles")) count := 0
migrating := false
files, err := ioutil.ReadDir(path.Join(app.directory, "profiles"))
if err != nil { if err != nil {
return fmt.Errorf("error: cannot read profiles directory: %v", err) log.Errorf("error: cannot read profiles directory: %v", err)
return
} }
for _, file := range files { for _, file := range files {
// Attempt to load an encrypted database // Attempt to load an encrypted database
profileDirectory := path.Join(ac.directory, "profiles", file.Name()) profileDirectory := path.Join(app.directory, "profiles", file.Name())
profile, err := peer.FromEncryptedDatabase(profileDirectory, password) profile, err := peer.FromEncryptedDatabase(profileDirectory, password)
loaded := false
if err == nil { if err == nil {
// return the load the profile... // return the load the profile...
log.Infof("loading profile from new-type storage database...") log.Infof("loading profile from new-type storage database...")
loadProfileFn(profile) loaded = app.installProfile(profile)
} else { // On failure attempt to load a legacy profile } else { // On failure attempt to load a legacy profile
profileStore, err := storage.LoadProfileWriterStore(profileDirectory, password) profileStore, err := storage.LoadProfileWriterStore(profileDirectory, password)
if err != nil { if err != nil {
continue continue
} }
log.Infof("found legacy profile. importing to new database structure...") log.Infof("found legacy profile. importing to new database structure...")
legacyProfile := profileStore.GetProfileCopy(timeline) legacyProfile := profileStore.GetProfileCopy(true)
if !migrating {
migrating = true
app.appBus.Publish(event.NewEventList(event.StartingStorageMiragtion))
}
cps, err := peer.CreateEncryptedStore(profileDirectory, password) cps, err := peer.CreateEncryptedStore(profileDirectory, password)
if err != nil { if err != nil {
log.Errorf("error creating encrypted store: %v", err) log.Errorf("error creating encrypted store: %v", err)
} }
profile := peer.ImportLegacyProfile(legacyProfile, cps) profile := peer.ImportLegacyProfile(legacyProfile, cps)
loadProfileFn(profile) loaded = app.installProfile(profile)
}
if loaded {
count++
} }
} }
return nil
}
// LoadProfiles takes a password and attempts to load any profiles it can from storage with it and create Peers for them
func (app *application) LoadProfiles(password string) {
count := 0
app.applicationCore.LoadProfiles(password, true, func(profile peer.CwtchPeer) {
app.appmutex.Lock()
// Only attempt to finalize the profile if we don't have one loaded...
if app.peers[profile.GetOnion()] == nil {
eventBus := event.NewEventManager()
app.eventBuses[profile.GetOnion()] = eventBus
profile.Init(app.eventBuses[profile.GetOnion()])
app.peers[profile.GetOnion()] = profile
app.engines[profile.GetOnion()], _ = profile.GenerateProtocolEngine(app.acn, app.eventBuses[profile.GetOnion()])
app.appBus.Publish(event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.GetOnion(), event.Created: event.False}))
count++
} else {
// Otherwise shutdown the connections
profile.Shutdown()
}
app.appmutex.Unlock()
})
if count == 0 { if count == 0 {
message := event.NewEventList(event.AppError, event.Error, event.AppErrLoaded0) message := event.NewEventList(event.AppError, event.Error, event.AppErrLoaded0)
app.appBus.Publish(message) app.appBus.Publish(message)
} }
if migrating {
app.appBus.Publish(event.NewEventList(event.DoneStorageMigration))
}
}
// installProfile takes a profile and if it isn't loaded in the app, installs it and returns true
func (app *application) installProfile(profile peer.CwtchPeer) bool {
app.appmutex.Lock()
defer app.appmutex.Unlock()
// Only attempt to finalize the profile if we don't have one loaded...
if app.peers[profile.GetOnion()] == nil {
eventBus := event.NewEventManager()
app.eventBuses[profile.GetOnion()] = eventBus
profile.Init(app.eventBuses[profile.GetOnion()])
app.peers[profile.GetOnion()] = profile
app.engines[profile.GetOnion()], _ = profile.GenerateProtocolEngine(app.acn, app.eventBuses[profile.GetOnion()])
app.appBus.Publish(event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.GetOnion(), event.Created: event.False}))
return true
}
// Otherwise shutdown the connections
profile.Shutdown()
return false
} }
// GetPrimaryBus returns the bus the Application uses for events that aren't peer specific // GetPrimaryBus returns the bus the Application uses for events that aren't peer specific
@ -210,8 +208,8 @@ func (app *application) GetPrimaryBus() event.Manager {
} }
// GetEventBus returns a cwtchPeer's event bus // GetEventBus returns a cwtchPeer's event bus
func (ac *applicationCore) GetEventBus(onion string) event.Manager { func (app *application) GetEventBus(onion string) event.Manager {
if manager, ok := ac.eventBuses[onion]; ok { if manager, ok := app.eventBuses[onion]; ok {
return manager return manager
} }
return nil return nil

View File

@ -202,6 +202,9 @@ const (
// Profile Attribute Event // Profile Attribute Event
UpdatedProfileAttribute = Type("UpdatedProfileAttribute") UpdatedProfileAttribute = Type("UpdatedProfileAttribute")
StartingStorageMiragtion = Type("StartingStorageMigration")
DoneStorageMigration = Type("DoneStorageMigration")
) )
// Field defines common event attributes // Field defines common event attributes
@ -284,12 +287,6 @@ const (
PasswordMatchError = "Password did not match" PasswordMatchError = "Password did not match"
) )
// Values to be suplied in event.NewPeer for Status
const (
StorageRunning = "running"
StorageNew = "new"
)
// Defining Protocol Contexts // Defining Protocol Contexts
const ( const (
ContextAck = "im.cwtch.acknowledgement" ContextAck = "im.cwtch.acknowledgement"

View File

@ -24,6 +24,11 @@ import (
"golang.org/x/crypto/ed25519" "golang.org/x/crypto/ed25519"
) )
type connectionLockedService struct {
service tapir.Service
connectingLock sync.Mutex
}
type engine struct { type engine struct {
queue event.Queue queue event.Queue
@ -46,7 +51,7 @@ type engine struct {
getValRequests sync.Map // [string]string eventID:Data getValRequests sync.Map // [string]string eventID:Data
// Nextgen Tapir Service // Nextgen Tapir Service
ephemeralServices map[string]tapir.Service //sync.Map // string(onion) => tapir.Service ephemeralServices map[string]*connectionLockedService //sync.Map // string(onion) => tapir.Service
ephemeralServicesLock sync.Mutex ephemeralServicesLock sync.Mutex
// Required for listen(), inaccessible from identity // Required for listen(), inaccessible from identity
@ -73,7 +78,7 @@ func NewProtocolEngine(identity primitives.Identity, privateKey ed25519.PrivateK
engine := new(engine) engine := new(engine)
engine.identity = identity engine.identity = identity
engine.privateKey = privateKey engine.privateKey = privateKey
engine.ephemeralServices = make(map[string]tapir.Service) engine.ephemeralServices = make(map[string]*connectionLockedService)
engine.queue = event.NewQueue() engine.queue = event.NewQueue()
go engine.eventHandler() go engine.eventHandler()
@ -152,9 +157,7 @@ func (e *engine) eventHandler() {
} }
go e.peerWithTokenServer(ev.Data[event.GroupServer], ev.Data[event.ServerTokenOnion], ev.Data[event.ServerTokenY], signature) go e.peerWithTokenServer(ev.Data[event.GroupServer], ev.Data[event.ServerTokenOnion], ev.Data[event.ServerTokenY], signature)
case event.LeaveServer: case event.LeaveServer:
e.ephemeralServicesLock.Lock()
e.leaveServer(ev.Data[event.GroupServer]) e.leaveServer(ev.Data[event.GroupServer])
e.ephemeralServicesLock.Unlock()
case event.DeleteContact: case event.DeleteContact:
onion := ev.Data[event.RemotePeer] onion := ev.Data[event.RemotePeer]
// We remove this peer from out blocklist which will prevent them from contacting us if we have "block unknown peers" turned on. // We remove this peer from out blocklist which will prevent them from contacting us if we have "block unknown peers" turned on.
@ -283,7 +286,7 @@ func (e *engine) Shutdown() {
defer e.ephemeralServicesLock.Unlock() defer e.ephemeralServicesLock.Unlock()
for _, connection := range e.ephemeralServices { for _, connection := range e.ephemeralServices {
log.Infof("shutting down ephemeral service") log.Infof("shutting down ephemeral service")
connection.Shutdown() connection.service.Shutdown()
} }
e.queue.Shutdown() e.queue.Shutdown()
} }
@ -319,22 +322,30 @@ func (e *engine) peerWithOnion(onion string) {
// needs to be run in a goroutine as will block on Open. // needs to be run in a goroutine as will block on Open.
func (e *engine) peerWithTokenServer(onion string, tokenServerOnion string, tokenServerY string, lastKnownSignature []byte) { func (e *engine) peerWithTokenServer(onion string, tokenServerOnion string, tokenServerY string, lastKnownSignature []byte) {
e.ephemeralServicesLock.Lock() e.ephemeralServicesLock.Lock()
defer e.ephemeralServicesLock.Unlock() connectionService, exists := e.ephemeralServices[onion]
connection, exists := e.ephemeralServices[onion]
if exists { if exists && connectionService.service != nil {
if conn, err := connection.GetConnection(onion); err == nil { if conn, err := connectionService.service.GetConnection(onion); err == nil {
// We are already peered and synced so return... // We are already peered and synced so return...
// This will only not-trigger it lastKnownSignature has been wiped, which only happens when ResyncServer is called // This will only not-trigger if lastKnownSignature has been wiped, which only happens when ResyncServer is called
// in CwtchPeer. // in CwtchPeer.
if !conn.IsClosed() && len(lastKnownSignature) != 0 { if !conn.IsClosed() && len(lastKnownSignature) != 0 {
e.ephemeralServicesLock.Unlock()
return return
} }
// Otherwise...we are going to rebuild the connection(which will result in a bandwidth heavy resync)... // Otherwise...we are going to rebuild the connection(which will result in a bandwidth heavy resync)...
e.leaveServer(onion) connectionService.service.Shutdown()
} }
// Otherwise...let's reconnect // Otherwise...let's reconnect
} }
connectionService = &connectionLockedService{}
e.ephemeralServices[onion] = connectionService
connectionService.connectingLock.Lock()
defer connectionService.connectingLock.Unlock()
e.ephemeralServicesLock.Unlock()
log.Debugf("Peering with Token Server %v %v", onion, tokenServerOnion) log.Debugf("Peering with Token Server %v %v", onion, tokenServerOnion)
e.ignoreOnShutdown(e.serverConnecting)(onion) e.ignoreOnShutdown(e.serverConnecting)(onion)
// Create a new ephemeral service for this connection // Create a new ephemeral service for this connection
@ -345,7 +356,9 @@ func (e *engine) peerWithTokenServer(onion string, tokenServerOnion string, toke
Y := ristretto255.NewElement() Y := ristretto255.NewElement()
Y.UnmarshalText([]byte(tokenServerY)) Y.UnmarshalText([]byte(tokenServerY))
connected, err := ephemeralService.Connect(onion, NewTokenBoardClient(e.acn, Y, tokenServerOnion, lastKnownSignature, e.receiveGroupMessage, e.serverAuthed, e.serverSynced, e.ignoreOnShutdown(e.serverDisconnected))) connected, err := ephemeralService.Connect(onion, NewTokenBoardClient(e.acn, Y, tokenServerOnion, lastKnownSignature, e.receiveGroupMessage, e.serverAuthed, e.serverSynced, e.ignoreOnShutdown(e.serverDisconnected)))
e.ephemeralServices[onion] = ephemeralService e.ephemeralServicesLock.Lock()
e.ephemeralServices[onion].service = ephemeralService
e.ephemeralServicesLock.Unlock()
// If we are already connected...check if we are authed and issue an auth event // If we are already connected...check if we are authed and issue an auth event
// (This allows the ui to be stateless) // (This allows the ui to be stateless)
if connected && err != nil { if connected && err != nil {
@ -508,7 +521,7 @@ func (e *engine) sendMessageToGroup(groupID string, server string, ct []byte, si
return return
} }
conn, err := ephemeralService.WaitForCapabilityOrClose(server, groups.CwtchServerSyncedCapability) conn, err := ephemeralService.service.WaitForCapabilityOrClose(server, groups.CwtchServerSyncedCapability)
if err == nil { if err == nil {
tokenApp, ok := (conn.App()).(*TokenBoardClient) tokenApp, ok := (conn.App()).(*TokenBoardClient)
if ok { if ok {
@ -619,12 +632,12 @@ func (e *engine) handlePeerRetVal(hostname string, getValData, retValData []byte
} }
// leaveServer disconnects from a server and deletes the ephemeral service // leaveServer disconnects from a server and deletes the ephemeral service
// REQUIREMENTS: must be called inside a block with e.ephemeralServicesLock.Lock()
// can't do it iself because is called from inside peerWithTokenServer which holds the lock
func (e *engine) leaveServer(server string) { func (e *engine) leaveServer(server string) {
e.ephemeralServicesLock.Lock()
defer e.ephemeralServicesLock.Unlock()
ephemeralService, ok := e.ephemeralServices[server] ephemeralService, ok := e.ephemeralServices[server]
if ok { if ok {
ephemeralService.Shutdown() ephemeralService.service.Shutdown()
delete(e.ephemeralServices, server) delete(e.ephemeralServices, server)
} }
} }