diff --git a/app/app.go b/app/app.go index 85bc997..5a9c5df 100644 --- a/app/app.go +++ b/app/app.go @@ -1,20 +1,20 @@ package app import ( + "cwtch.im/cwtch/app/plugins" "cwtch.im/cwtch/event" "cwtch.im/cwtch/model" "cwtch.im/cwtch/peer" "cwtch.im/cwtch/protocol/connections" "cwtch.im/cwtch/storage" "fmt" - "git.openprivacy.ca/openprivacy/libricochet-go/identity" - "strconv" - "git.openprivacy.ca/openprivacy/libricochet-go/connectivity" + "git.openprivacy.ca/openprivacy/libricochet-go/identity" "git.openprivacy.ca/openprivacy/libricochet-go/log" "io/ioutil" "os" "path" + "strconv" "sync" ) @@ -25,19 +25,11 @@ type applicationCore struct { mutex sync.Mutex } -type appletPeers struct { - peers map[string]peer.CwtchPeer - launched bool // bit hacky, place holder while we transition to full multi peer support and a better api -} - -type appletACN struct { - acn connectivity.ACN -} - type application struct { applicationCore appletPeers appletACN + appletPlugins storage map[string]storage.ProfileStore engines map[string]connections.Engine appBus event.Manager @@ -47,6 +39,7 @@ type application struct { type Application interface { LoadProfiles(password string) CreatePeer(name string, password string) + AddPeerPlugin(onion string, pluginID plugins.PluginID) LaunchPeers() GetPrimaryBus() event.Manager @@ -68,22 +61,6 @@ func newAppCore(appDirectory string) *applicationCore { return appCore } -func (ap *appletPeers) init() { - ap.peers = make(map[string]peer.CwtchPeer) - ap.launched = false -} - -func (a *appletACN) init(acn connectivity.ACN, publish func(int, string)) { - a.acn = acn - acn.SetStatusCallback(publish) - prog, status := acn.GetBootstrapStatus() - publish(prog, status) -} - -func (a *appletACN) Shutdown() { - a.acn.Close() -} - // NewApp creates a new app with some environment awareness and initializes a Tor Manager func NewApp(acn connectivity.ACN, appDirectory string) Application { log.Debugf("NewApp(%v)\n", appDirectory) @@ -144,6 +121,10 @@ func (app *application) CreatePeer(name string, password string) { app.appBus.Publish(event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.Onion})) } +func (app *application) AddPeerPlugin(onion string, pluginID plugins.PluginID) { + app.AddPlugin(onion, pluginID, app.eventBuses[onion]) +} + // LoadProfiles takes a password and attempts to load any profiles it can from storage with it and create Peers for them func (ac *applicationCore) LoadProfiles(password string, timeline bool, loadProfileFn LoadProfileFn) error { files, err := ioutil.ReadDir(path.Join(ac.directory, "profiles")) @@ -207,37 +188,6 @@ func (app *application) GetPrimaryBus() event.Manager { return app.appBus } -// LaunchPeers starts each peer Listening and connecting to peers and groups -func (ap *appletPeers) LaunchPeers() { - log.Debugf("appletPeers LaunchPeers\n") - if ap.launched { - return - } - for _, p := range ap.peers { - p.Listen() - p.StartPeersConnections() - p.StartGroupConnections() - } - ap.launched = true -} - -// ListPeers returns a map of onions to their profile's Name -func (ap *appletPeers) ListPeers() map[string]string { - keys := map[string]string{} - for k, p := range ap.peers { - keys[k] = p.GetProfile().Name - } - return keys -} - -// GetPeer returns a cwtchPeer for a given onion address -func (ap *appletPeers) GetPeer(onion string) peer.CwtchPeer { - if peer, ok := ap.peers[onion]; ok { - return peer - } - return nil -} - // GetEventBus returns a cwtchPeer's event bus func (ac *applicationCore) GetEventBus(onion string) event.Manager { if manager, ok := ac.eventBuses[onion]; ok { @@ -266,6 +216,7 @@ func (app *application) Shutdown() { peer.Shutdown() app.engines[id].Shutdown() app.storage[id].Shutdown() + app.appletPlugins.Shutdown() app.eventBuses[id].Shutdown() } app.appBus.Shutdown() diff --git a/app/appClient.go b/app/appClient.go index bd2e488..a54325a 100644 --- a/app/appClient.go +++ b/app/appClient.go @@ -1,12 +1,14 @@ package app import ( + "cwtch.im/cwtch/app/plugins" "cwtch.im/cwtch/event" "cwtch.im/cwtch/peer" "cwtch.im/cwtch/storage" "fmt" "git.openprivacy.ca/openprivacy/libricochet-go/log" "path" + "strconv" ) type applicationClient struct { @@ -94,6 +96,11 @@ func (ac *applicationClient) CreatePeer(name string, password string) { ac.bridge.Write(&message) } +func (ac *applicationClient) AddPeerPlugin(onion string, pluginID plugins.PluginID) { + message := event.IPCMessage{Dest: DestApp, Message: event.NewEvent(event.AddPeerPlugin, map[event.Field]string{event.Identity: onion, event.Data: strconv.Itoa(int(pluginID))})} + ac.bridge.Write(&message) +} + // LoadProfiles messages the service to load any profiles for the given password func (ac *applicationClient) LoadProfiles(password string) { message := event.IPCMessage{Dest: DestApp, Message: event.NewEvent(event.LoadProfiles, map[event.Field]string{event.Password: password})} diff --git a/app/appService.go b/app/appService.go index 48414b0..57853aa 100644 --- a/app/appService.go +++ b/app/appService.go @@ -1,6 +1,7 @@ package app import ( + "cwtch.im/cwtch/app/plugins" "cwtch.im/cwtch/event" "cwtch.im/cwtch/model" "cwtch.im/cwtch/protocol/connections" @@ -15,6 +16,7 @@ import ( type applicationService struct { applicationBridge appletACN + appletPlugins storage map[string]storage.ProfileStore engines map[string]connections.Engine @@ -49,6 +51,10 @@ func (as *applicationService) handleEvent(ev *event.Event) { profileName := ev.Data[event.ProfileName] password := ev.Data[event.Password] as.createPeer(profileName, password) + case event.AddPeerPlugin: + onion := ev.Data[event.Identity] + pluginID, _ := strconv.Atoi(ev.Data[event.Data]) + as.AddPlugin(onion, plugins.PluginID(pluginID), as.eventBuses[onion]) case event.LoadProfiles: password := ev.Data[event.Password] as.loadProfiles(password) @@ -133,6 +139,7 @@ func (as *applicationService) ShutdownPeer(onion string) { // Shutdown shuts down the application Service and all peer related backend parts func (as *applicationService) Shutdown() { for id := range as.engines { + as.appletPlugins.Shutdown() as.ShutdownPeer(id) } } diff --git a/app/applets.go b/app/applets.go new file mode 100644 index 0000000..507c023 --- /dev/null +++ b/app/applets.go @@ -0,0 +1,101 @@ +package app + +import ( + "cwtch.im/cwtch/event" + "git.openprivacy.ca/openprivacy/libricochet-go/connectivity" + "git.openprivacy.ca/openprivacy/libricochet-go/log" + "sync" + + "cwtch.im/cwtch/app/plugins" + "cwtch.im/cwtch/peer" +) + +type appletPeers struct { + peers map[string]peer.CwtchPeer + launched bool // bit hacky, place holder while we transition to full multi peer support and a better api +} + +type appletACN struct { + acn connectivity.ACN +} + +type appletPlugins struct { + plugins sync.Map //map[string] []plugins.Plugin +} + +// ***** applet ACN + +func (a *appletACN) init(acn connectivity.ACN, publish func(int, string)) { + a.acn = acn + acn.SetStatusCallback(publish) + prog, status := acn.GetBootstrapStatus() + publish(prog, status) +} + +func (a *appletACN) Shutdown() { + a.acn.Close() +} + +// ***** appletPeers + +func (ap *appletPeers) init() { + ap.peers = make(map[string]peer.CwtchPeer) + ap.launched = false +} + +// LaunchPeers starts each peer Listening and connecting to peers and groups +func (ap *appletPeers) LaunchPeers() { + log.Debugf("appletPeers LaunchPeers\n") + if ap.launched { + return + } + for _, p := range ap.peers { + p.Listen() + p.StartPeersConnections() + p.StartGroupConnections() + } + ap.launched = true +} + +// ListPeers returns a map of onions to their profile's Name +func (ap *appletPeers) ListPeers() map[string]string { + keys := map[string]string{} + for k, p := range ap.peers { + keys[k] = p.GetProfile().Name + } + return keys +} + +// GetPeer returns a cwtchPeer for a given onion address +func (ap *appletPeers) GetPeer(onion string) peer.CwtchPeer { + if peer, ok := ap.peers[onion]; ok { + return peer + } + return nil +} + +// ***** applet Plugins + +func (ap *appletPlugins) Shutdown() { + ap.plugins.Range(func(k, v interface{}) bool { + plugins := v.([]plugins.Plugin) + for _, plugin := range plugins { + plugin.Shutdown() + } + return true + }) +} + +func (ap *appletPlugins) AddPlugin(peerid string, id plugins.PluginID, bus event.Manager) { + if _, exists := ap.plugins.Load(peerid); !exists { + ap.plugins.Store(peerid, []plugins.Plugin{}) + } + + pluginsinf, _ := ap.plugins.Load(peerid) + peerPlugins := pluginsinf.([]plugins.Plugin) + + newp := plugins.Get(id, bus) + newp.Start() + peerPlugins = append(peerPlugins, newp) + ap.plugins.Store(peerid, peerPlugins) +} diff --git a/app/plugins/contactRetry.go b/app/plugins/contactRetry.go new file mode 100644 index 0000000..1b51ba0 --- /dev/null +++ b/app/plugins/contactRetry.go @@ -0,0 +1,98 @@ +package plugins + +import ( + "cwtch.im/cwtch/event" + "cwtch.im/cwtch/protocol/connections" + "sync" + "time" +) + +const tickTime = 10 * time.Second +const maxBakoff int = 32 // 320 seconds or ~5 min + +type peer struct { + id string + state connections.ConnectionState + + ticks int + backoff int +} + +type contactRetry struct { + bus event.Manager + queue *event.Queue + + breakChan chan bool + + peers sync.Map //[string]*peer +} + +// NewContactRetry returns a Plugin that when started will retry connecting to contacts with a backoff timing +func NewContactRetry(bus event.Manager) Plugin { + cr := &contactRetry{bus: bus, queue: event.NewEventQueue(1000), breakChan: make(chan bool), peers: sync.Map{}} + return cr +} + +func (cr *contactRetry) Start() { + go cr.run() +} + +func (cr *contactRetry) run() { + cr.bus.Subscribe(event.PeerStateChange, cr.queue.EventChannel) + for { + select { + case e := <-cr.queue.EventChannel: + switch e.EventType { + case event.PeerStateChange: + state := connections.ConnectionStateToType[e.Data[event.ConnectionState]] + peer := e.Data[event.RemotePeer] + cr.handleEvent(peer, state) + } + + case <-time.After(tickTime): + cr.peers.Range(func(k, v interface{}) bool { + p := v.(*peer) + + if p.state == connections.DISCONNECTED { + p.ticks++ + if p.ticks == p.backoff { + p.ticks = 0 + cr.bus.Publish(event.NewEvent(event.PeerRequest, map[event.Field]string{event.RemotePeer: p.id})) + } + } + + return true + }) + + case <-cr.breakChan: + return + } + } +} + +func (cr *contactRetry) handleEvent(id string, state connections.ConnectionState) { + if _, exists := cr.peers.Load(id); !exists { + p := &peer{id: id, state: connections.DISCONNECTED, backoff: 1, ticks: 0} + cr.peers.Store(id, p) + return + } + + pinf, _ := cr.peers.Load(id) + p := pinf.(*peer) + if state == connections.DISCONNECTED || state == connections.FAILED || state == connections.KILLED { + p.state = connections.DISCONNECTED + if p.backoff < maxBakoff { + p.backoff *= 2 + } + p.ticks = 0 + } else if state == connections.CONNECTING || state == connections.CONNECTED { + p.state = state + } else if state == connections.AUTHENTICATED { + p.state = state + p.backoff = 1 + } +} + +func (cr *contactRetry) Shutdown() { + cr.breakChan <- true +} diff --git a/app/plugins/plugin.go b/app/plugins/plugin.go new file mode 100644 index 0000000..6ef0a5e --- /dev/null +++ b/app/plugins/plugin.go @@ -0,0 +1,29 @@ +package plugins + +import ( + "cwtch.im/cwtch/event" +) + +// PluginID is used as an ID for signaling plugin activities +type PluginID int + +// These are the plugin IDs for the supplied plugins +const ( + CONTACTRETRY PluginID = iota +) + +// Plugin is the interface for a plugin +type Plugin interface { + Start() + Shutdown() +} + +// Get is a plugin factory for the requested plugin +func Get(id PluginID, bus event.Manager) Plugin { + switch id { + case CONTACTRETRY: + return NewContactRetry(bus) + } + + return nil +} diff --git a/event/common.go b/event/common.go index a6f24e9..aa93538 100644 --- a/event/common.go +++ b/event/common.go @@ -106,6 +106,9 @@ const ( // app -> Identity(onion) NewPeer = Type("NewPeer") + // Identity(onion), Data(pluginID) + AddPeerPlugin = Type("AddPeerPlugin") + // Password LoadProfiles = Type("LoadProfiles")