engineRefine #416
90
app/app.go
90
app/app.go
|
@ -9,7 +9,6 @@ import (
|
||||||
"cwtch.im/cwtch/peer"
|
"cwtch.im/cwtch/peer"
|
||||||
"cwtch.im/cwtch/protocol/connections"
|
"cwtch.im/cwtch/protocol/connections"
|
||||||
"cwtch.im/cwtch/storage"
|
"cwtch.im/cwtch/storage"
|
||||||
"fmt"
|
|
||||||
"git.openprivacy.ca/openprivacy/connectivity"
|
"git.openprivacy.ca/openprivacy/connectivity"
|
||||||
"git.openprivacy.ca/openprivacy/log"
|
"git.openprivacy.ca/openprivacy/log"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
|
@ -19,15 +18,10 @@ import (
|
||||||
"sync"
|
"sync"
|
||||||
)
|
)
|
||||||
|
|
||||||
type applicationCore struct {
|
type application struct {
|
||||||
eventBuses map[string]event.Manager
|
eventBuses map[string]event.Manager
|
||||||
|
|
||||||
directory string
|
directory string
|
||||||
coremutex sync.Mutex
|
coremutex sync.Mutex
|
||||||
}
|
|
||||||
|
|
||||||
type application struct {
|
|
||||||
applicationCore
|
|
||||||
appletPeers
|
appletPeers
|
||||||
appletACN
|
appletACN
|
||||||
appletPlugins
|
appletPlugins
|
||||||
|
@ -60,30 +54,18 @@ type Application interface {
|
||||||
// LoadProfileFn is the function signature for a function in an app that loads a profile
|
// LoadProfileFn is the function signature for a function in an app that loads a profile
|
||||||
type LoadProfileFn func(profile peer.CwtchPeer)
|
type LoadProfileFn func(profile peer.CwtchPeer)
|
||||||
|
|
||||||
func newAppCore(appDirectory string) *applicationCore {
|
|
||||||
appCore := &applicationCore{eventBuses: make(map[string]event.Manager), directory: appDirectory}
|
|
||||||
os.MkdirAll(path.Join(appCore.directory, "profiles"), 0700)
|
|
||||||
return appCore
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewApp creates a new app with some environment awareness and initializes a Tor Manager
|
// NewApp creates a new app with some environment awareness and initializes a Tor Manager
|
||||||
func NewApp(acn connectivity.ACN, appDirectory string) Application {
|
func NewApp(acn connectivity.ACN, appDirectory string) Application {
|
||||||
log.Debugf("NewApp(%v)\n", appDirectory)
|
log.Debugf("NewApp(%v)\n", appDirectory)
|
||||||
app := &application{engines: make(map[string]connections.Engine), applicationCore: *newAppCore(appDirectory), appBus: event.NewEventManager()}
|
os.MkdirAll(path.Join(appDirectory, "profiles"), 0700)
|
||||||
|
|
||||||
|
app := &application{engines: make(map[string]connections.Engine), eventBuses: make(map[string]event.Manager), directory: appDirectory, appBus: event.NewEventManager()}
|
||||||
app.appletPeers.init()
|
app.appletPeers.init()
|
||||||
|
|
||||||
app.appletACN.init(acn, app.getACNStatusHandler())
|
app.appletACN.init(acn, app.getACNStatusHandler())
|
||||||
return app
|
return app
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ac *applicationCore) DeletePeer(onion string) {
|
|
||||||
ac.coremutex.Lock()
|
|
||||||
defer ac.coremutex.Unlock()
|
|
||||||
|
|
||||||
ac.eventBuses[onion].Shutdown()
|
|
||||||
delete(ac.eventBuses, onion)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (app *application) CreateTaggedPeer(name string, password string, tag string) {
|
func (app *application) CreateTaggedPeer(name string, password string, tag string) {
|
||||||
app.appmutex.Lock()
|
app.appmutex.Lock()
|
||||||
defer app.appmutex.Unlock()
|
defer app.appmutex.Unlock()
|
||||||
|
@ -127,7 +109,11 @@ func (app *application) DeletePeer(onion string, password string) {
|
||||||
app.peers[onion].Delete()
|
app.peers[onion].Delete()
|
||||||
delete(app.peers, onion)
|
delete(app.peers, onion)
|
||||||
app.eventBuses[onion].Publish(event.NewEventList(event.ShutdownPeer, event.Identity, onion))
|
app.eventBuses[onion].Publish(event.NewEventList(event.ShutdownPeer, event.Identity, onion))
|
||||||
app.applicationCore.DeletePeer(onion)
|
|
||||||
|
app.coremutex.Lock()
|
||||||
|
defer app.coremutex.Unlock()
|
||||||
|
app.eventBuses[onion].Shutdown()
|
||||||
|
delete(app.eventBuses, onion)
|
||||||
|
|
||||||
log.Debugf("Delete peer for %v Done\n", onion)
|
log.Debugf("Delete peer for %v Done\n", onion)
|
||||||
app.appBus.Publish(event.NewEventList(event.PeerDeleted, event.Identity, onion))
|
app.appBus.Publish(event.NewEventList(event.PeerDeleted, event.Identity, onion))
|
||||||
|
@ -145,44 +131,62 @@ func (app *application) AddPeerPlugin(onion string, pluginID plugins.PluginID) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// LoadProfiles takes a password and attempts to load any profiles it can from storage with it and create Peers for them
|
// LoadProfiles takes a password and attempts to load any profiles it can from storage with it and create Peers for them
|
||||||
func (ac *applicationCore) LoadProfiles(password string, timeline bool, loadProfileFn LoadProfileFn) error {
|
func (app *application) LoadProfiles(password string) {
|
||||||
files, err := ioutil.ReadDir(path.Join(ac.directory, "profiles"))
|
count := 0
|
||||||
|
migrating := false
|
||||||
|
|
||||||
|
files, err := ioutil.ReadDir(path.Join(app.directory, "profiles"))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error: cannot read profiles directory: %v", err)
|
log.Errorf("error: cannot read profiles directory: %v", err)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, file := range files {
|
for _, file := range files {
|
||||||
// Attempt to load an encrypted database
|
// Attempt to load an encrypted database
|
||||||
profileDirectory := path.Join(ac.directory, "profiles", file.Name())
|
profileDirectory := path.Join(app.directory, "profiles", file.Name())
|
||||||
profile, err := peer.FromEncryptedDatabase(profileDirectory, password)
|
profile, err := peer.FromEncryptedDatabase(profileDirectory, password)
|
||||||
|
loaded := false
|
||||||
if err == nil {
|
if err == nil {
|
||||||
// return the load the profile...
|
// return the load the profile...
|
||||||
log.Infof("loading profile from new-type storage database...")
|
log.Infof("loading profile from new-type storage database...")
|
||||||
loadProfileFn(profile)
|
loaded = app.installProfile(profile)
|
||||||
} else { // On failure attempt to load a legacy profile
|
} else { // On failure attempt to load a legacy profile
|
||||||
profileStore, err := storage.LoadProfileWriterStore(profileDirectory, password)
|
profileStore, err := storage.LoadProfileWriterStore(profileDirectory, password)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
log.Infof("found legacy profile. importing to new database structure...")
|
log.Infof("found legacy profile. importing to new database structure...")
|
||||||
legacyProfile := profileStore.GetProfileCopy(timeline)
|
legacyProfile := profileStore.GetProfileCopy(true)
|
||||||
|
if !migrating {
|
||||||
|
migrating = true
|
||||||
|
app.appBus.Publish(event.NewEventList(event.StartingStorageMiragtion))
|
||||||
|
}
|
||||||
|
|
||||||
cps, err := peer.CreateEncryptedStore(profileDirectory, password)
|
cps, err := peer.CreateEncryptedStore(profileDirectory, password)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("error creating encrypted store: %v", err)
|
log.Errorf("error creating encrypted store: %v", err)
|
||||||
}
|
}
|
||||||
profile := peer.ImportLegacyProfile(legacyProfile, cps)
|
profile := peer.ImportLegacyProfile(legacyProfile, cps)
|
||||||
loadProfileFn(profile)
|
loaded = app.installProfile(profile)
|
||||||
|
}
|
||||||
|
if loaded {
|
||||||
|
count++
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
if count == 0 {
|
||||||
|
message := event.NewEventList(event.AppError, event.Error, event.AppErrLoaded0)
|
||||||
|
app.appBus.Publish(message)
|
||||||
|
}
|
||||||
|
if migrating {
|
||||||
|
app.appBus.Publish(event.NewEventList(event.DoneStorageMigration))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// LoadProfiles takes a password and attempts to load any profiles it can from storage with it and create Peers for them
|
// installProfile takes a profile and if it isn't loaded in the app, installs it and returns true
|
||||||
func (app *application) LoadProfiles(password string) {
|
func (app *application) installProfile(profile peer.CwtchPeer) bool {
|
||||||
count := 0
|
|
||||||
app.applicationCore.LoadProfiles(password, true, func(profile peer.CwtchPeer) {
|
|
||||||
app.appmutex.Lock()
|
app.appmutex.Lock()
|
||||||
|
defer app.appmutex.Unlock()
|
||||||
|
|
||||||
// Only attempt to finalize the profile if we don't have one loaded...
|
// Only attempt to finalize the profile if we don't have one loaded...
|
||||||
if app.peers[profile.GetOnion()] == nil {
|
if app.peers[profile.GetOnion()] == nil {
|
||||||
eventBus := event.NewEventManager()
|
eventBus := event.NewEventManager()
|
||||||
|
@ -191,17 +195,11 @@ func (app *application) LoadProfiles(password string) {
|
||||||
app.peers[profile.GetOnion()] = profile
|
app.peers[profile.GetOnion()] = profile
|
||||||
app.engines[profile.GetOnion()], _ = profile.GenerateProtocolEngine(app.acn, app.eventBuses[profile.GetOnion()])
|
app.engines[profile.GetOnion()], _ = profile.GenerateProtocolEngine(app.acn, app.eventBuses[profile.GetOnion()])
|
||||||
app.appBus.Publish(event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.GetOnion(), event.Created: event.False}))
|
app.appBus.Publish(event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.GetOnion(), event.Created: event.False}))
|
||||||
count++
|
return true
|
||||||
} else {
|
}
|
||||||
// Otherwise shutdown the connections
|
// Otherwise shutdown the connections
|
||||||
profile.Shutdown()
|
profile.Shutdown()
|
||||||
}
|
return false
|
||||||
app.appmutex.Unlock()
|
|
||||||
})
|
|
||||||
if count == 0 {
|
|
||||||
message := event.NewEventList(event.AppError, event.Error, event.AppErrLoaded0)
|
|
||||||
app.appBus.Publish(message)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetPrimaryBus returns the bus the Application uses for events that aren't peer specific
|
// GetPrimaryBus returns the bus the Application uses for events that aren't peer specific
|
||||||
|
@ -210,8 +208,8 @@ func (app *application) GetPrimaryBus() event.Manager {
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetEventBus returns a cwtchPeer's event bus
|
// GetEventBus returns a cwtchPeer's event bus
|
||||||
func (ac *applicationCore) GetEventBus(onion string) event.Manager {
|
func (app *application) GetEventBus(onion string) event.Manager {
|
||||||
if manager, ok := ac.eventBuses[onion]; ok {
|
if manager, ok := app.eventBuses[onion]; ok {
|
||||||
return manager
|
return manager
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
|
|
@ -202,6 +202,9 @@ const (
|
||||||
|
|
||||||
// Profile Attribute Event
|
// Profile Attribute Event
|
||||||
UpdatedProfileAttribute = Type("UpdatedProfileAttribute")
|
UpdatedProfileAttribute = Type("UpdatedProfileAttribute")
|
||||||
|
|
||||||
|
StartingStorageMiragtion = Type("StartingStorageMigration")
|
||||||
|
DoneStorageMigration = Type("DoneStorageMigration")
|
||||||
)
|
)
|
||||||
|
|
||||||
// Field defines common event attributes
|
// Field defines common event attributes
|
||||||
|
@ -284,12 +287,6 @@ const (
|
||||||
PasswordMatchError = "Password did not match"
|
PasswordMatchError = "Password did not match"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Values to be suplied in event.NewPeer for Status
|
|
||||||
const (
|
|
||||||
StorageRunning = "running"
|
|
||||||
StorageNew = "new"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Defining Protocol Contexts
|
// Defining Protocol Contexts
|
||||||
const (
|
const (
|
||||||
ContextAck = "im.cwtch.acknowledgement"
|
ContextAck = "im.cwtch.acknowledgement"
|
||||||
|
|
|
@ -24,6 +24,11 @@ import (
|
||||||
"golang.org/x/crypto/ed25519"
|
"golang.org/x/crypto/ed25519"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type connectionLockedService struct {
|
||||||
|
service tapir.Service
|
||||||
|
connectingLock sync.Mutex
|
||||||
|
}
|
||||||
|
|
||||||
type engine struct {
|
type engine struct {
|
||||||
queue event.Queue
|
queue event.Queue
|
||||||
|
|
||||||
|
@ -46,7 +51,7 @@ type engine struct {
|
||||||
getValRequests sync.Map // [string]string eventID:Data
|
getValRequests sync.Map // [string]string eventID:Data
|
||||||
|
|
||||||
// Nextgen Tapir Service
|
// Nextgen Tapir Service
|
||||||
ephemeralServices map[string]tapir.Service //sync.Map // string(onion) => tapir.Service
|
ephemeralServices map[string]*connectionLockedService //sync.Map // string(onion) => tapir.Service
|
||||||
ephemeralServicesLock sync.Mutex
|
ephemeralServicesLock sync.Mutex
|
||||||
|
|
||||||
// Required for listen(), inaccessible from identity
|
// Required for listen(), inaccessible from identity
|
||||||
|
@ -73,7 +78,7 @@ func NewProtocolEngine(identity primitives.Identity, privateKey ed25519.PrivateK
|
||||||
engine := new(engine)
|
engine := new(engine)
|
||||||
engine.identity = identity
|
engine.identity = identity
|
||||||
engine.privateKey = privateKey
|
engine.privateKey = privateKey
|
||||||
engine.ephemeralServices = make(map[string]tapir.Service)
|
engine.ephemeralServices = make(map[string]*connectionLockedService)
|
||||||
engine.queue = event.NewQueue()
|
engine.queue = event.NewQueue()
|
||||||
go engine.eventHandler()
|
go engine.eventHandler()
|
||||||
|
|
||||||
|
@ -152,9 +157,7 @@ func (e *engine) eventHandler() {
|
||||||
}
|
}
|
||||||
go e.peerWithTokenServer(ev.Data[event.GroupServer], ev.Data[event.ServerTokenOnion], ev.Data[event.ServerTokenY], signature)
|
go e.peerWithTokenServer(ev.Data[event.GroupServer], ev.Data[event.ServerTokenOnion], ev.Data[event.ServerTokenY], signature)
|
||||||
case event.LeaveServer:
|
case event.LeaveServer:
|
||||||
e.ephemeralServicesLock.Lock()
|
|
||||||
e.leaveServer(ev.Data[event.GroupServer])
|
e.leaveServer(ev.Data[event.GroupServer])
|
||||||
e.ephemeralServicesLock.Unlock()
|
|
||||||
case event.DeleteContact:
|
case event.DeleteContact:
|
||||||
onion := ev.Data[event.RemotePeer]
|
onion := ev.Data[event.RemotePeer]
|
||||||
// We remove this peer from out blocklist which will prevent them from contacting us if we have "block unknown peers" turned on.
|
// We remove this peer from out blocklist which will prevent them from contacting us if we have "block unknown peers" turned on.
|
||||||
|
@ -283,7 +286,7 @@ func (e *engine) Shutdown() {
|
||||||
defer e.ephemeralServicesLock.Unlock()
|
defer e.ephemeralServicesLock.Unlock()
|
||||||
for _, connection := range e.ephemeralServices {
|
for _, connection := range e.ephemeralServices {
|
||||||
log.Infof("shutting down ephemeral service")
|
log.Infof("shutting down ephemeral service")
|
||||||
connection.Shutdown()
|
connection.service.Shutdown()
|
||||||
}
|
}
|
||||||
e.queue.Shutdown()
|
e.queue.Shutdown()
|
||||||
}
|
}
|
||||||
|
@ -319,22 +322,30 @@ func (e *engine) peerWithOnion(onion string) {
|
||||||
// needs to be run in a goroutine as will block on Open.
|
// needs to be run in a goroutine as will block on Open.
|
||||||
func (e *engine) peerWithTokenServer(onion string, tokenServerOnion string, tokenServerY string, lastKnownSignature []byte) {
|
func (e *engine) peerWithTokenServer(onion string, tokenServerOnion string, tokenServerY string, lastKnownSignature []byte) {
|
||||||
e.ephemeralServicesLock.Lock()
|
e.ephemeralServicesLock.Lock()
|
||||||
defer e.ephemeralServicesLock.Unlock()
|
connectionService, exists := e.ephemeralServices[onion]
|
||||||
connection, exists := e.ephemeralServices[onion]
|
|
||||||
if exists {
|
if exists && connectionService.service != nil {
|
||||||
if conn, err := connection.GetConnection(onion); err == nil {
|
if conn, err := connectionService.service.GetConnection(onion); err == nil {
|
||||||
// We are already peered and synced so return...
|
// We are already peered and synced so return...
|
||||||
// This will only not-trigger it lastKnownSignature has been wiped, which only happens when ResyncServer is called
|
// This will only not-trigger if lastKnownSignature has been wiped, which only happens when ResyncServer is called
|
||||||
// in CwtchPeer.
|
// in CwtchPeer.
|
||||||
if !conn.IsClosed() && len(lastKnownSignature) != 0 {
|
if !conn.IsClosed() && len(lastKnownSignature) != 0 {
|
||||||
|
e.ephemeralServicesLock.Unlock()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
// Otherwise...we are going to rebuild the connection(which will result in a bandwidth heavy resync)...
|
// Otherwise...we are going to rebuild the connection(which will result in a bandwidth heavy resync)...
|
||||||
e.leaveServer(onion)
|
connectionService.service.Shutdown()
|
||||||
}
|
}
|
||||||
// Otherwise...let's reconnect
|
// Otherwise...let's reconnect
|
||||||
}
|
}
|
||||||
|
|
||||||
|
connectionService = &connectionLockedService{}
|
||||||
|
e.ephemeralServices[onion] = connectionService
|
||||||
|
|
||||||
|
connectionService.connectingLock.Lock()
|
||||||
|
defer connectionService.connectingLock.Unlock()
|
||||||
|
e.ephemeralServicesLock.Unlock()
|
||||||
|
|
||||||
log.Debugf("Peering with Token Server %v %v", onion, tokenServerOnion)
|
log.Debugf("Peering with Token Server %v %v", onion, tokenServerOnion)
|
||||||
e.ignoreOnShutdown(e.serverConnecting)(onion)
|
e.ignoreOnShutdown(e.serverConnecting)(onion)
|
||||||
// Create a new ephemeral service for this connection
|
// Create a new ephemeral service for this connection
|
||||||
|
@ -345,7 +356,9 @@ func (e *engine) peerWithTokenServer(onion string, tokenServerOnion string, toke
|
||||||
Y := ristretto255.NewElement()
|
Y := ristretto255.NewElement()
|
||||||
Y.UnmarshalText([]byte(tokenServerY))
|
Y.UnmarshalText([]byte(tokenServerY))
|
||||||
connected, err := ephemeralService.Connect(onion, NewTokenBoardClient(e.acn, Y, tokenServerOnion, lastKnownSignature, e.receiveGroupMessage, e.serverAuthed, e.serverSynced, e.ignoreOnShutdown(e.serverDisconnected)))
|
connected, err := ephemeralService.Connect(onion, NewTokenBoardClient(e.acn, Y, tokenServerOnion, lastKnownSignature, e.receiveGroupMessage, e.serverAuthed, e.serverSynced, e.ignoreOnShutdown(e.serverDisconnected)))
|
||||||
e.ephemeralServices[onion] = ephemeralService
|
e.ephemeralServicesLock.Lock()
|
||||||
|
e.ephemeralServices[onion].service = ephemeralService
|
||||||
|
e.ephemeralServicesLock.Unlock()
|
||||||
// If we are already connected...check if we are authed and issue an auth event
|
// If we are already connected...check if we are authed and issue an auth event
|
||||||
// (This allows the ui to be stateless)
|
// (This allows the ui to be stateless)
|
||||||
if connected && err != nil {
|
if connected && err != nil {
|
||||||
|
@ -508,7 +521,7 @@ func (e *engine) sendMessageToGroup(groupID string, server string, ct []byte, si
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
conn, err := ephemeralService.WaitForCapabilityOrClose(server, groups.CwtchServerSyncedCapability)
|
conn, err := ephemeralService.service.WaitForCapabilityOrClose(server, groups.CwtchServerSyncedCapability)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
tokenApp, ok := (conn.App()).(*TokenBoardClient)
|
tokenApp, ok := (conn.App()).(*TokenBoardClient)
|
||||||
if ok {
|
if ok {
|
||||||
|
@ -619,12 +632,12 @@ func (e *engine) handlePeerRetVal(hostname string, getValData, retValData []byte
|
||||||
}
|
}
|
||||||
|
|
||||||
// leaveServer disconnects from a server and deletes the ephemeral service
|
// leaveServer disconnects from a server and deletes the ephemeral service
|
||||||
// REQUIREMENTS: must be called inside a block with e.ephemeralServicesLock.Lock()
|
|
||||||
// can't do it iself because is called from inside peerWithTokenServer which holds the lock
|
|
||||||
func (e *engine) leaveServer(server string) {
|
func (e *engine) leaveServer(server string) {
|
||||||
|
e.ephemeralServicesLock.Lock()
|
||||||
|
defer e.ephemeralServicesLock.Unlock()
|
||||||
ephemeralService, ok := e.ephemeralServices[server]
|
ephemeralService, ok := e.ephemeralServices[server]
|
||||||
if ok {
|
if ok {
|
||||||
ephemeralService.Shutdown()
|
ephemeralService.service.Shutdown()
|
||||||
delete(e.ephemeralServices, server)
|
delete(e.ephemeralServices, server)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue