Merge pull request 'engineRefine' (#416) from engineRefine into master

Reviewed-on: #416
Sarah Jamie Lewis 2021-12-18 21:23:29 +00:00
commit fc73ee46fc
3 changed files with 87 additions and 79 deletions


@@ -9,7 +9,6 @@ import (
"cwtch.im/cwtch/peer"
"cwtch.im/cwtch/protocol/connections"
"cwtch.im/cwtch/storage"
"fmt"
"git.openprivacy.ca/openprivacy/connectivity"
"git.openprivacy.ca/openprivacy/log"
"io/ioutil"
@@ -19,15 +18,10 @@ import (
"sync"
)
type applicationCore struct {
eventBuses map[string]event.Manager
directory string
coremutex sync.Mutex
}
type application struct {
applicationCore
eventBuses map[string]event.Manager
directory string
coremutex sync.Mutex
appletPeers
appletACN
appletPlugins
@@ -60,30 +54,18 @@ type Application interface {
// LoadProfileFn is the function signature for a function in an app that loads a profile
type LoadProfileFn func(profile peer.CwtchPeer)
func newAppCore(appDirectory string) *applicationCore {
appCore := &applicationCore{eventBuses: make(map[string]event.Manager), directory: appDirectory}
os.MkdirAll(path.Join(appCore.directory, "profiles"), 0700)
return appCore
}
// NewApp creates a new app with some environment awareness and initializes a Tor Manager
func NewApp(acn connectivity.ACN, appDirectory string) Application {
log.Debugf("NewApp(%v)\n", appDirectory)
app := &application{engines: make(map[string]connections.Engine), applicationCore: *newAppCore(appDirectory), appBus: event.NewEventManager()}
os.MkdirAll(path.Join(appDirectory, "profiles"), 0700)
app := &application{engines: make(map[string]connections.Engine), eventBuses: make(map[string]event.Manager), directory: appDirectory, appBus: event.NewEventManager()}
app.appletPeers.init()
app.appletACN.init(acn, app.getACNStatusHandler())
return app
}
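
For orientation: constructing an app now takes only an ACN and a data directory, with profile loading as a separate step. A minimal usage sketch (startACN is a hypothetical helper standing in for whatever connectivity.ACN implementation the caller supplies, e.g. a Tor-backed one; error handling elided):

	// Sketch: typical startup, assuming startACN returns a connectivity.ACN.
	acn := startACN() // hypothetical helper, not part of this codebase
	app := NewApp(acn, appDirectory)
	app.LoadProfiles(password) // decrypts and installs any matching profiles
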
func (ac *applicationCore) DeletePeer(onion string) {
ac.coremutex.Lock()
defer ac.coremutex.Unlock()
ac.eventBuses[onion].Shutdown()
delete(ac.eventBuses, onion)
}
func (app *application) CreateTaggedPeer(name string, password string, tag string) {
app.appmutex.Lock()
defer app.appmutex.Unlock()
@@ -127,7 +109,11 @@ func (app *application) DeletePeer(onion string, password string) {
app.peers[onion].Delete()
delete(app.peers, onion)
app.eventBuses[onion].Publish(event.NewEventList(event.ShutdownPeer, event.Identity, onion))
app.applicationCore.DeletePeer(onion)
app.coremutex.Lock()
defer app.coremutex.Unlock()
app.eventBuses[onion].Shutdown()
delete(app.eventBuses, onion)
log.Debugf("Delete peer for %v Done\n", onion)
app.appBus.Publish(event.NewEventList(event.PeerDeleted, event.Identity, onion))
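
The old applicationCore.DeletePeer indirection is now inlined above under coremutex. A caller that needs to know when deletion has fully finished can watch the primary bus for PeerDeleted; a sketch, assuming the Subscribe(Type, Queue) shape used elsewhere in this codebase:

	// Sketch: block until the app confirms the peer is gone.
	q := event.NewQueue()
	app.GetPrimaryBus().Subscribe(event.PeerDeleted, q) // assumed subscription API
	app.DeletePeer(onion, password)
	ev := q.Next() // returns once PeerDeleted is published
	log.Debugf("deleted %v", ev.Data[event.Identity])
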
@@ -145,63 +131,75 @@ func (app *application) AddPeerPlugin(onion string, pluginID plugins.PluginID) {
}
// LoadProfiles takes a password and attempts to load any profiles it can decrypt from storage, creating Peers for them
func (ac *applicationCore) LoadProfiles(password string, timeline bool, loadProfileFn LoadProfileFn) error {
files, err := ioutil.ReadDir(path.Join(ac.directory, "profiles"))
func (app *application) LoadProfiles(password string) {
count := 0
migrating := false
files, err := ioutil.ReadDir(path.Join(app.directory, "profiles"))
if err != nil {
return fmt.Errorf("error: cannot read profiles directory: %v", err)
log.Errorf("error: cannot read profiles directory: %v", err)
return
}
for _, file := range files {
// Attempt to load an encrypted database
profileDirectory := path.Join(ac.directory, "profiles", file.Name())
profileDirectory := path.Join(app.directory, "profiles", file.Name())
profile, err := peer.FromEncryptedDatabase(profileDirectory, password)
loaded := false
if err == nil {
// load the profile...
log.Infof("loading profile from new-type storage database...")
loadProfileFn(profile)
loaded = app.installProfile(profile)
} else { // On failure, attempt to load a legacy profile
profileStore, err := storage.LoadProfileWriterStore(profileDirectory, password)
if err != nil {
continue
}
log.Infof("found legacy profile. importing to new database structure...")
legacyProfile := profileStore.GetProfileCopy(timeline)
legacyProfile := profileStore.GetProfileCopy(true)
if !migrating {
migrating = true
app.appBus.Publish(event.NewEventList(event.StartingStorageMigration))
}
cps, err := peer.CreateEncryptedStore(profileDirectory, password)
if err != nil {
log.Errorf("error creating encrypted store: %v", err)
}
profile := peer.ImportLegacyProfile(legacyProfile, cps)
loadProfileFn(profile)
loaded = app.installProfile(profile)
}
if loaded {
count++
}
}
return nil
}
// LoadProfiles takes a password and attempts to load any profiles it can decrypt from storage, creating Peers for them
func (app *application) LoadProfiles(password string) {
count := 0
app.applicationCore.LoadProfiles(password, true, func(profile peer.CwtchPeer) {
app.appmutex.Lock()
// Only attempt to finalize the profile if we don't have one loaded...
if app.peers[profile.GetOnion()] == nil {
eventBus := event.NewEventManager()
app.eventBuses[profile.GetOnion()] = eventBus
profile.Init(app.eventBuses[profile.GetOnion()])
app.peers[profile.GetOnion()] = profile
app.engines[profile.GetOnion()], _ = profile.GenerateProtocolEngine(app.acn, app.eventBuses[profile.GetOnion()])
app.appBus.Publish(event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.GetOnion(), event.Created: event.False}))
count++
} else {
// Otherwise shutdown the connections
profile.Shutdown()
}
app.appmutex.Unlock()
})
if count == 0 {
message := event.NewEventList(event.AppError, event.Error, event.AppErrLoaded0)
app.appBus.Publish(message)
}
if migrating {
app.appBus.Publish(event.NewEventList(event.DoneStorageMigration))
}
}
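
The StartingStorageMigration/DoneStorageMigration pair lets a front end show progress while legacy profiles are rewritten into the new encrypted store. A listening sketch (same assumed Subscribe(Type, Queue) pattern; the EventType field name is taken from the event package):

	// Sketch: surface storage-migration progress in a UI.
	q := event.NewQueue()
	bus := app.GetPrimaryBus()
	bus.Subscribe(event.StartingStorageMigration, q)
	bus.Subscribe(event.DoneStorageMigration, q)
	go func() {
		for {
			ev := q.Next()
			log.Infof("storage migration: %v", ev.EventType)
		}
	}()
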
// installProfile takes a profile and, if it isn't already loaded in the app, installs it and returns true
func (app *application) installProfile(profile peer.CwtchPeer) bool {
app.appmutex.Lock()
defer app.appmutex.Unlock()
// Only attempt to finalize the profile if we don't have one loaded...
if app.peers[profile.GetOnion()] == nil {
eventBus := event.NewEventManager()
app.eventBuses[profile.GetOnion()] = eventBus
profile.Init(app.eventBuses[profile.GetOnion()])
app.peers[profile.GetOnion()] = profile
app.engines[profile.GetOnion()], _ = profile.GenerateProtocolEngine(app.acn, app.eventBuses[profile.GetOnion()])
app.appBus.Publish(event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.GetOnion(), event.Created: event.False}))
return true
}
// Otherwise shutdown the connections
profile.Shutdown()
return false
}
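
Because installProfile holds appmutex and rejects onions that are already present, LoadProfiles is idempotent per profile: a repeated load shuts down the duplicate instead of installing it, and the count of newly installed profiles stays accurate. Sketch:

	// Sketch: a second load with the same password installs nothing new;
	// every onion is already present, so count stays 0 and AppErrLoaded0 fires.
	app.LoadProfiles(password)
	app.LoadProfiles(password)
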
// GetPrimaryBus returns the bus the Application uses for events that aren't peer specific
@@ -210,8 +208,8 @@ func (app *application) GetPrimaryBus() event.Manager {
}
// GetEventBus returns a cwtchPeer's event bus
func (ac *applicationCore) GetEventBus(onion string) event.Manager {
if manager, ok := ac.eventBuses[onion]; ok {
func (app *application) GetEventBus(onion string) event.Manager {
if manager, ok := app.eventBuses[onion]; ok {
return manager
}
return nil


@@ -202,6 +202,9 @@ const (
// Profile Attribute Event
UpdatedProfileAttribute = Type("UpdatedProfileAttribute")
StartingStorageMigration = Type("StartingStorageMigration")
DoneStorageMigration = Type("DoneStorageMigration")
)
// Field defines common event attributes
@@ -284,12 +287,6 @@ const (
PasswordMatchError = "Password did not match"
)
// Values to be supplied in event.NewPeer for Status
const (
StorageRunning = "running"
StorageNew = "new"
)
// Defining Protocol Contexts
const (
ContextAck = "im.cwtch.acknowledgement"


@@ -24,6 +24,11 @@ import (
"golang.org/x/crypto/ed25519"
)
type connectionLockedService struct {
service tapir.Service
connectingLock sync.Mutex
}
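
The per-entry mutex lets the engine-wide ephemeralServicesLock guard only map access, while the slow work of dialing a token server is serialized per entry. Condensed, the shape used in peerWithTokenServer below (a sketch, not the literal engine code):

	// Sketch: hold the map lock briefly, then the per-entry lock for the dial.
	e.ephemeralServicesLock.Lock()
	cls, ok := e.ephemeralServices[onion]
	if !ok {
		cls = &connectionLockedService{}
		e.ephemeralServices[onion] = cls
	}
	e.ephemeralServicesLock.Unlock()

	cls.connectingLock.Lock() // blocks only concurrent dials to the same server
	defer cls.connectingLock.Unlock()
	// ...dial, then store the tapir.Service back under ephemeralServicesLock
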
type engine struct {
queue event.Queue
@@ -46,7 +51,7 @@ type engine struct {
getValRequests sync.Map // [string]string eventID:Data
// Nextgen Tapir Service
ephemeralServices map[string]tapir.Service //sync.Map // string(onion) => tapir.Service
ephemeralServices map[string]*connectionLockedService //sync.Map // string(onion) => tapir.Service
ephemeralServicesLock sync.Mutex
// Required for listen(), inaccessible from identity
@@ -73,7 +78,7 @@ func NewProtocolEngine(identity primitives.Identity, privateKey ed25519.PrivateK
engine := new(engine)
engine.identity = identity
engine.privateKey = privateKey
engine.ephemeralServices = make(map[string]tapir.Service)
engine.ephemeralServices = make(map[string]*connectionLockedService)
engine.queue = event.NewQueue()
go engine.eventHandler()
@@ -152,9 +157,7 @@ func (e *engine) eventHandler() {
}
go e.peerWithTokenServer(ev.Data[event.GroupServer], ev.Data[event.ServerTokenOnion], ev.Data[event.ServerTokenY], signature)
case event.LeaveServer:
e.ephemeralServicesLock.Lock()
e.leaveServer(ev.Data[event.GroupServer])
e.ephemeralServicesLock.Unlock()
case event.DeleteContact:
onion := ev.Data[event.RemotePeer]
// We remove this peer from our blocklist, which will prevent them from contacting us if we have "block unknown peers" turned on.
@@ -283,7 +286,7 @@ func (e *engine) Shutdown() {
defer e.ephemeralServicesLock.Unlock()
for _, connection := range e.ephemeralServices {
log.Infof("shutting down ephemeral service")
connection.Shutdown()
connection.service.Shutdown()
}
e.queue.Shutdown()
}
@@ -319,22 +322,30 @@ func (e *engine) peerWithOnion(onion string) {
// needs to be run in a goroutine as it will block on Open.
func (e *engine) peerWithTokenServer(onion string, tokenServerOnion string, tokenServerY string, lastKnownSignature []byte) {
e.ephemeralServicesLock.Lock()
defer e.ephemeralServicesLock.Unlock()
connection, exists := e.ephemeralServices[onion]
if exists {
if conn, err := connection.GetConnection(onion); err == nil {
connectionService, exists := e.ephemeralServices[onion]
if exists && connectionService.service != nil {
if conn, err := connectionService.service.GetConnection(onion); err == nil {
// We are already peered and synced so return...
// This will only not-trigger it lastKnownSignature has been wiped, which only happens when ResyncServer is called
// This will only not-trigger if lastKnownSignature has been wiped, which only happens when ResyncServer is called
// in CwtchPeer.
if !conn.IsClosed() && len(lastKnownSignature) != 0 {
e.ephemeralServicesLock.Unlock()
return
}
// Otherwise...we are going to rebuild the connection (which will result in a bandwidth-heavy resync)...
e.leaveServer(onion)
connectionService.service.Shutdown()
}
// Otherwise...let's reconnect
}
connectionService = &connectionLockedService{}
e.ephemeralServices[onion] = connectionService
connectionService.connectingLock.Lock()
defer connectionService.connectingLock.Unlock()
e.ephemeralServicesLock.Unlock()
log.Debugf("Peering with Token Server %v %v", onion, tokenServerOnion)
e.ignoreOnShutdown(e.serverConnecting)(onion)
// Create a new ephemeral service for this connection
@@ -345,7 +356,9 @@ func (e *engine) peerWithTokenServer(onion string, tokenServerOnion string, toke
Y := ristretto255.NewElement()
Y.UnmarshalText([]byte(tokenServerY))
connected, err := ephemeralService.Connect(onion, NewTokenBoardClient(e.acn, Y, tokenServerOnion, lastKnownSignature, e.receiveGroupMessage, e.serverAuthed, e.serverSynced, e.ignoreOnShutdown(e.serverDisconnected)))
e.ephemeralServices[onion] = ephemeralService
e.ephemeralServicesLock.Lock()
e.ephemeralServices[onion].service = ephemeralService
e.ephemeralServicesLock.Unlock()
// If we are already connected...check if we are authed and issue an auth event
// (This allows the ui to be stateless)
if connected && err != nil {
@@ -508,7 +521,7 @@ func (e *engine) sendMessageToGroup(groupID string, server string, ct []byte, si
return
}
conn, err := ephemeralService.WaitForCapabilityOrClose(server, groups.CwtchServerSyncedCapability)
conn, err := ephemeralService.service.WaitForCapabilityOrClose(server, groups.CwtchServerSyncedCapability)
if err == nil {
tokenApp, ok := (conn.App()).(*TokenBoardClient)
if ok {
@@ -619,12 +632,12 @@ func (e *engine) handlePeerRetVal(hostname string, getValData, retValData []byte
}
// leaveServer disconnects from a server and deletes the ephemeral service
// REQUIREMENTS: must be called inside a block with e.ephemeralServicesLock.Lock()
// can't take the lock itself because it is called from inside peerWithTokenServer, which already holds the lock
func (e *engine) leaveServer(server string) {
e.ephemeralServicesLock.Lock()
defer e.ephemeralServicesLock.Unlock()
ephemeralService, ok := e.ephemeralServices[server]
if ok {
ephemeralService.Shutdown()
ephemeralService.service.Shutdown()
delete(e.ephemeralServices, server)
}
}
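
With the locking contract moved to callers, the LeaveServer event handler above brackets the call itself. Restated as code (sketch):

	// Sketch: callers now own the lock around leaveServer.
	e.ephemeralServicesLock.Lock()
	e.leaveServer(server)
	e.ephemeralServicesLock.Unlock()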