Browse Source

Add plugin system for apps; add contact retry plugin

pull/260/head
Dan Ballard 2 years ago
parent
commit
f2e69f48d1
  1. 69
      app/app.go
  2. 7
      app/appClient.go
  3. 7
      app/appService.go
  4. 101
      app/applets.go
  5. 98
      app/plugins/contactRetry.go
  6. 29
      app/plugins/plugin.go
  7. 3
      event/common.go

69
app/app.go

@ -1,20 +1,20 @@
package app
import (
"cwtch.im/cwtch/app/plugins"
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/model"
"cwtch.im/cwtch/peer"
"cwtch.im/cwtch/protocol/connections"
"cwtch.im/cwtch/storage"
"fmt"
"git.openprivacy.ca/openprivacy/libricochet-go/identity"
"strconv"
"git.openprivacy.ca/openprivacy/libricochet-go/connectivity"
"git.openprivacy.ca/openprivacy/libricochet-go/identity"
"git.openprivacy.ca/openprivacy/libricochet-go/log"
"io/ioutil"
"os"
"path"
"strconv"
"sync"
)
@ -25,19 +25,11 @@ type applicationCore struct {
mutex sync.Mutex
}
type appletPeers struct {
peers map[string]peer.CwtchPeer
launched bool // bit hacky, place holder while we transition to full multi peer support and a better api
}
type appletACN struct {
acn connectivity.ACN
}
type application struct {
applicationCore
appletPeers
appletACN
appletPlugins
storage map[string]storage.ProfileStore
engines map[string]connections.Engine
appBus event.Manager
@ -47,6 +39,7 @@ type application struct {
type Application interface {
LoadProfiles(password string)
CreatePeer(name string, password string)
AddPeerPlugin(onion string, pluginID plugins.PluginID)
LaunchPeers()
GetPrimaryBus() event.Manager
@ -68,22 +61,6 @@ func newAppCore(appDirectory string) *applicationCore {
return appCore
}
func (ap *appletPeers) init() {
ap.peers = make(map[string]peer.CwtchPeer)
ap.launched = false
}
func (a *appletACN) init(acn connectivity.ACN, publish func(int, string)) {
a.acn = acn
acn.SetStatusCallback(publish)
prog, status := acn.GetBootstrapStatus()
publish(prog, status)
}
func (a *appletACN) Shutdown() {
a.acn.Close()
}
// NewApp creates a new app with some environment awareness and initializes a Tor Manager
func NewApp(acn connectivity.ACN, appDirectory string) Application {
log.Debugf("NewApp(%v)\n", appDirectory)
@ -144,6 +121,10 @@ func (app *application) CreatePeer(name string, password string) {
app.appBus.Publish(event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.Onion}))
}
func (app *application) AddPeerPlugin(onion string, pluginID plugins.PluginID) {
app.AddPlugin(onion, pluginID, app.eventBuses[onion])
}
// LoadProfiles takes a password and attempts to load any profiles it can from storage with it and create Peers for them
func (ac *applicationCore) LoadProfiles(password string, timeline bool, loadProfileFn LoadProfileFn) error {
files, err := ioutil.ReadDir(path.Join(ac.directory, "profiles"))
@ -207,37 +188,6 @@ func (app *application) GetPrimaryBus() event.Manager {
return app.appBus
}
// LaunchPeers starts each peer Listening and connecting to peers and groups
func (ap *appletPeers) LaunchPeers() {
log.Debugf("appletPeers LaunchPeers\n")
if ap.launched {
return
}
for _, p := range ap.peers {
p.Listen()
p.StartPeersConnections()
p.StartGroupConnections()
}
ap.launched = true
}
// ListPeers returns a map of onions to their profile's Name
func (ap *appletPeers) ListPeers() map[string]string {
keys := map[string]string{}
for k, p := range ap.peers {
keys[k] = p.GetProfile().Name
}
return keys
}
// GetPeer returns a cwtchPeer for a given onion address
func (ap *appletPeers) GetPeer(onion string) peer.CwtchPeer {
if peer, ok := ap.peers[onion]; ok {
return peer
}
return nil
}
// GetEventBus returns a cwtchPeer's event bus
func (ac *applicationCore) GetEventBus(onion string) event.Manager {
if manager, ok := ac.eventBuses[onion]; ok {
@ -266,6 +216,7 @@ func (app *application) Shutdown() {
peer.Shutdown()
app.engines[id].Shutdown()
app.storage[id].Shutdown()
app.appletPlugins.Shutdown()
app.eventBuses[id].Shutdown()
}
app.appBus.Shutdown()

7
app/appClient.go

@ -1,12 +1,14 @@
package app
import (
"cwtch.im/cwtch/app/plugins"
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/peer"
"cwtch.im/cwtch/storage"
"fmt"
"git.openprivacy.ca/openprivacy/libricochet-go/log"
"path"
"strconv"
)
type applicationClient struct {
@ -94,6 +96,11 @@ func (ac *applicationClient) CreatePeer(name string, password string) {
ac.bridge.Write(&message)
}
func (ac *applicationClient) AddPeerPlugin(onion string, pluginID plugins.PluginID) {
message := event.IPCMessage{Dest: DestApp, Message: event.NewEvent(event.AddPeerPlugin, map[event.Field]string{event.Identity: onion, event.Data: strconv.Itoa(int(pluginID))})}
ac.bridge.Write(&message)
}
// LoadProfiles messages the service to load any profiles for the given password
func (ac *applicationClient) LoadProfiles(password string) {
message := event.IPCMessage{Dest: DestApp, Message: event.NewEvent(event.LoadProfiles, map[event.Field]string{event.Password: password})}

7
app/appService.go

@ -1,6 +1,7 @@
package app
import (
"cwtch.im/cwtch/app/plugins"
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/model"
"cwtch.im/cwtch/protocol/connections"
@ -15,6 +16,7 @@ import (
type applicationService struct {
applicationBridge
appletACN
appletPlugins
storage map[string]storage.ProfileStore
engines map[string]connections.Engine
@ -49,6 +51,10 @@ func (as *applicationService) handleEvent(ev *event.Event) {
profileName := ev.Data[event.ProfileName]
password := ev.Data[event.Password]
as.createPeer(profileName, password)
case event.AddPeerPlugin:
onion := ev.Data[event.Identity]
pluginID, _ := strconv.Atoi(ev.Data[event.Data])
as.AddPlugin(onion, plugins.PluginID(pluginID), as.eventBuses[onion])
case event.LoadProfiles:
password := ev.Data[event.Password]
as.loadProfiles(password)
@ -133,6 +139,7 @@ func (as *applicationService) ShutdownPeer(onion string) {
// Shutdown shuts down the application Service and all peer related backend parts
func (as *applicationService) Shutdown() {
for id := range as.engines {
as.appletPlugins.Shutdown()
as.ShutdownPeer(id)
}
}

101
app/applets.go

@ -0,0 +1,101 @@
package app
import (
"cwtch.im/cwtch/event"
"git.openprivacy.ca/openprivacy/libricochet-go/connectivity"
"git.openprivacy.ca/openprivacy/libricochet-go/log"
"sync"
"cwtch.im/cwtch/app/plugins"
"cwtch.im/cwtch/peer"
)
type appletPeers struct {
peers map[string]peer.CwtchPeer
launched bool // bit hacky, place holder while we transition to full multi peer support and a better api
}
type appletACN struct {
acn connectivity.ACN
}
type appletPlugins struct {
plugins sync.Map //map[string] []plugins.Plugin
}
// ***** applet ACN
func (a *appletACN) init(acn connectivity.ACN, publish func(int, string)) {
a.acn = acn
acn.SetStatusCallback(publish)
prog, status := acn.GetBootstrapStatus()
publish(prog, status)
}
func (a *appletACN) Shutdown() {
a.acn.Close()
}
// ***** appletPeers
func (ap *appletPeers) init() {
ap.peers = make(map[string]peer.CwtchPeer)
ap.launched = false
}
// LaunchPeers starts each peer Listening and connecting to peers and groups
func (ap *appletPeers) LaunchPeers() {
log.Debugf("appletPeers LaunchPeers\n")
if ap.launched {
return
}
for _, p := range ap.peers {
p.Listen()
p.StartPeersConnections()
p.StartGroupConnections()
}
ap.launched = true
}
// ListPeers returns a map of onions to their profile's Name
func (ap *appletPeers) ListPeers() map[string]string {
keys := map[string]string{}
for k, p := range ap.peers {
keys[k] = p.GetProfile().Name
}
return keys
}
// GetPeer returns a cwtchPeer for a given onion address
func (ap *appletPeers) GetPeer(onion string) peer.CwtchPeer {
if peer, ok := ap.peers[onion]; ok {
return peer
}
return nil
}
// ***** applet Plugins
func (ap *appletPlugins) Shutdown() {
ap.plugins.Range(func(k, v interface{}) bool {
plugins := v.([]plugins.Plugin)
for _, plugin := range plugins {
plugin.Shutdown()
}
return true
})
}
func (ap *appletPlugins) AddPlugin(peerid string, id plugins.PluginID, bus event.Manager) {
if _, exists := ap.plugins.Load(peerid); !exists {
ap.plugins.Store(peerid, []plugins.Plugin{})
}
pluginsinf, _ := ap.plugins.Load(peerid)
peerPlugins := pluginsinf.([]plugins.Plugin)
newp := plugins.Get(id, bus)
newp.Start()
peerPlugins = append(peerPlugins, newp)
ap.plugins.Store(peerid, peerPlugins)
}

98
app/plugins/contactRetry.go

@ -0,0 +1,98 @@
package plugins
import (
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/protocol/connections"
"sync"
"time"
)
const tickTime = 10 * time.Second
const maxBakoff int = 32 // 320 seconds or ~5 min
type peer struct {
id string
state connections.ConnectionState
ticks int
backoff int
}
type contactRetry struct {
bus event.Manager
queue *event.Queue
breakChan chan bool
peers sync.Map //[string]*peer
}
// NewContactRetry returns a Plugin that when started will retry connecting to contacts with a backoff timing
func NewContactRetry(bus event.Manager) Plugin {
cr := &contactRetry{bus: bus, queue: event.NewEventQueue(1000), breakChan: make(chan bool), peers: sync.Map{}}
return cr
}
func (cr *contactRetry) Start() {
go cr.run()
}
func (cr *contactRetry) run() {
cr.bus.Subscribe(event.PeerStateChange, cr.queue.EventChannel)
for {
select {
case e := <-cr.queue.EventChannel:
switch e.EventType {
case event.PeerStateChange:
state := connections.ConnectionStateToType[e.Data[event.ConnectionState]]
peer := e.Data[event.RemotePeer]
cr.handleEvent(peer, state)
}
case <-time.After(tickTime):
cr.peers.Range(func(k, v interface{}) bool {
p := v.(*peer)
if p.state == connections.DISCONNECTED {
p.ticks++
if p.ticks == p.backoff {
p.ticks = 0
cr.bus.Publish(event.NewEvent(event.PeerRequest, map[event.Field]string{event.RemotePeer: p.id}))
}
}
return true
})
case <-cr.breakChan:
return
}
}
}
func (cr *contactRetry) handleEvent(id string, state connections.ConnectionState) {
if _, exists := cr.peers.Load(id); !exists {
p := &peer{id: id, state: connections.DISCONNECTED, backoff: 1, ticks: 0}
cr.peers.Store(id, p)
return
}
pinf, _ := cr.peers.Load(id)
p := pinf.(*peer)
if state == connections.DISCONNECTED || state == connections.FAILED || state == connections.KILLED {
p.state = connections.DISCONNECTED
if p.backoff < maxBakoff {
p.backoff *= 2
}
p.ticks = 0
} else if state == connections.CONNECTING || state == connections.CONNECTED {
p.state = state
} else if state == connections.AUTHENTICATED {
p.state = state
p.backoff = 1
}
}
func (cr *contactRetry) Shutdown() {
cr.breakChan <- true
}

29
app/plugins/plugin.go

@ -0,0 +1,29 @@
package plugins
import (
"cwtch.im/cwtch/event"
)
// PluginID is used as an ID for signaling plugin activities
type PluginID int
// These are the plugin IDs for the supplied plugins
const (
CONTACTRETRY PluginID = iota
)
// Plugin is the interface for a plugin
type Plugin interface {
Start()
Shutdown()
}
// Get is a plugin factory for the requested plugin
func Get(id PluginID, bus event.Manager) Plugin {
switch id {
case CONTACTRETRY:
return NewContactRetry(bus)
}
return nil
}

3
event/common.go

@ -106,6 +106,9 @@ const (
// app -> Identity(onion)
NewPeer = Type("NewPeer")
// Identity(onion), Data(pluginID)
AddPeerPlugin = Type("AddPeerPlugin")
// Password
LoadProfiles = Type("LoadProfiles")

Loading…
Cancel
Save