Official cwtch.im peer and server implementations. https://cwtch.im
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

app.go 7.6KB


  1. package app
  2. import (
  3. "cwtch.im/cwtch/app/plugins"
  4. "cwtch.im/cwtch/event"
  5. "cwtch.im/cwtch/model"
  6. "cwtch.im/cwtch/peer"
  7. "cwtch.im/cwtch/protocol/connections"
  8. "cwtch.im/cwtch/storage"
  9. "cwtch.im/tapir/primitives"
  10. "fmt"
  11. "git.openprivacy.ca/openprivacy/libricochet-go/connectivity"
  12. "git.openprivacy.ca/openprivacy/libricochet-go/log"
  13. "io/ioutil"
  14. "os"
  15. "path"
  16. "strconv"
  17. "sync"
  18. )
  19. type applicationCore struct {
  20. eventBuses map[string]event.Manager
  21. directory string
  22. mutex sync.Mutex
  23. }
  24. type application struct {
  25. applicationCore
  26. appletPeers
  27. appletACN
  28. appletPlugins
  29. storage map[string]storage.ProfileStore
  30. engines map[string]connections.Engine
  31. appBus event.Manager
  32. }
  33. // Application is a full cwtch peer application. It allows management, usage and storage of multiple peers
  34. type Application interface {
  35. LoadProfiles(password string)
  36. CreatePeer(name string, password string)
  37. AddPeerPlugin(onion string, pluginID plugins.PluginID)
  38. LaunchPeers()
  39. GetPrimaryBus() event.Manager
  40. GetEventBus(onion string) event.Manager
  41. QueryACNStatus()
  42. ShutdownPeer(string)
  43. Shutdown()
  44. GetPeer(onion string) peer.CwtchPeer
  45. ListPeers() map[string]string
  46. }
  47. // LoadProfileFn is the function signature for a function in an app that loads a profile
  48. type LoadProfileFn func(profile *model.Profile, store storage.ProfileStore)
  49. func newAppCore(appDirectory string) *applicationCore {
  50. appCore := &applicationCore{eventBuses: make(map[string]event.Manager), directory: appDirectory}
  51. os.MkdirAll(path.Join(appCore.directory, "profiles"), 0700)
  52. return appCore
  53. }
  54. // NewApp creates a new app with some environment awareness and initializes a Tor Manager
  55. func NewApp(acn connectivity.ACN, appDirectory string) Application {
  56. log.Debugf("NewApp(%v)\n", appDirectory)
  57. app := &application{storage: make(map[string]storage.ProfileStore), engines: make(map[string]connections.Engine), applicationCore: *newAppCore(appDirectory), appBus: event.NewEventManager()}
  58. app.appletPeers.init()
  59. app.appletACN.init(acn, app.getACNStatusHandler())
  60. return app
  61. }
  62. // CreatePeer creates a new Peer with a given name and core required accessories (eventbus)
  63. func (ac *applicationCore) CreatePeer(name string) (*model.Profile, error) {
  64. log.Debugf("CreatePeer(%v)\n", name)
  65. profile := storage.NewProfile(name)
  66. ac.mutex.Lock()
  67. defer ac.mutex.Unlock()
  68. _, exists := ac.eventBuses[profile.Onion]
  69. if exists {
  70. return nil, fmt.Errorf("Error: profile for onion %v already exists", profile.Onion)
  71. }
  72. eventBus := event.NewEventManager()
  73. ac.eventBuses[profile.Onion] = eventBus
  74. return profile, nil
  75. }
  76. // CreatePeer creates a new Peer with the given name and required accessories (eventbus, storage, protocol engine)
  77. func (app *application) CreatePeer(name string, password string) {
  78. profile, err := app.applicationCore.CreatePeer(name)
  79. if err != nil {
  80. app.appBus.Publish(event.NewEventList(event.PeerError, event.Error, err.Error()))
  81. return
  82. }
  83. profileStore := storage.NewProfileWriterStore(app.eventBuses[profile.Onion], path.Join(app.directory, "profiles", profile.LocalID), password, profile)
  84. app.storage[profile.Onion] = profileStore
  85. pc := app.storage[profile.Onion].GetProfileCopy(true)
  86. peer := peer.FromProfile(pc)
  87. peer.Init(app.eventBuses[profile.Onion])
  88. blockedPeers := profile.BlockedPeers()
  89. // TODO: Would be nice if ProtocolEngine did not need to explicitly be given the Private Key.
  90. identity := primitives.InitializeIdentity(profile.Name, &profile.Ed25519PrivateKey, &profile.Ed25519PublicKey)
  91. engine := connections.NewProtocolEngine(identity, profile.Ed25519PrivateKey, app.acn, app.eventBuses[profile.Onion], profile.GetContacts(), blockedPeers)
  92. app.peers[profile.Onion] = peer
  93. app.engines[profile.Onion] = engine
  94. app.appBus.Publish(event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.Onion}))
  95. }
  96. func (app *application) AddPeerPlugin(onion string, pluginID plugins.PluginID) {
  97. app.AddPlugin(onion, pluginID, app.eventBuses[onion], app.acn)
  98. }
  99. // LoadProfiles takes a password and attempts to load any profiles it can from storage with it and create Peers for them
  100. func (ac *applicationCore) LoadProfiles(password string, timeline bool, loadProfileFn LoadProfileFn) error {
  101. files, err := ioutil.ReadDir(path.Join(ac.directory, "profiles"))
  102. if err != nil {
  103. return fmt.Errorf("Error: cannot read profiles directory: %v", err)
  104. }
  105. for _, file := range files {
  106. eventBus := event.NewEventManager()
  107. profileStore := storage.NewProfileWriterStore(eventBus, path.Join(ac.directory, "profiles", file.Name()), password, nil)
  108. err = profileStore.Load()
  109. if err != nil {
  110. continue
  111. }
  112. profile := profileStore.GetProfileCopy(timeline)
  113. _, exists := ac.eventBuses[profile.Onion]
  114. if exists {
  115. profileStore.Shutdown()
  116. eventBus.Shutdown()
  117. log.Errorf("profile for onion %v already exists", profile.Onion)
  118. continue
  119. }
  120. ac.mutex.Lock()
  121. ac.eventBuses[profile.Onion] = eventBus
  122. ac.mutex.Unlock()
  123. loadProfileFn(profile, profileStore)
  124. }
  125. return nil
  126. }
  127. // LoadProfiles takes a password and attempts to load any profiles it can from storage with it and create Peers for them
  128. func (app *application) LoadProfiles(password string) {
  129. count := 0
  130. app.applicationCore.LoadProfiles(password, true, func(profile *model.Profile, profileStore storage.ProfileStore) {
  131. peer := peer.FromProfile(profile)
  132. peer.Init(app.eventBuses[profile.Onion])
  133. blockedPeers := profile.BlockedPeers()
  134. identity := primitives.InitializeIdentity(profile.Name, &profile.Ed25519PrivateKey, &profile.Ed25519PublicKey)
  135. engine := connections.NewProtocolEngine(identity, profile.Ed25519PrivateKey, app.acn, app.eventBuses[profile.Onion], profile.GetContacts(), blockedPeers)
  136. app.mutex.Lock()
  137. app.peers[profile.Onion] = peer
  138. app.storage[profile.Onion] = profileStore
  139. app.engines[profile.Onion] = engine
  140. app.mutex.Unlock()
  141. app.appBus.Publish(event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.Onion}))
  142. count++
  143. })
  144. if count == 0 {
  145. message := event.NewEventList(event.AppError, event.Error, event.AppErrLoaded0)
  146. app.appBus.Publish(message)
  147. }
  148. }
  149. // GetPrimaryBus returns the bus the Application uses for events that aren't peer specific
  150. func (app *application) GetPrimaryBus() event.Manager {
  151. return app.appBus
  152. }
  153. // GetEventBus returns a cwtchPeer's event bus
  154. func (ac *applicationCore) GetEventBus(onion string) event.Manager {
  155. if manager, ok := ac.eventBuses[onion]; ok {
  156. return manager
  157. }
  158. return nil
  159. }
  160. func (app *application) getACNStatusHandler() func(int, string) {
  161. return func(progress int, status string) {
  162. progStr := strconv.Itoa(progress)
  163. app.appBus.Publish(event.NewEventList(event.ACNStatus, event.Progreess, progStr, event.Status, status))
  164. for _, bus := range app.eventBuses {
  165. bus.Publish(event.NewEventList(event.ACNStatus, event.Progreess, progStr, event.Status, status))
  166. }
  167. }
  168. }
  169. func (app *application) QueryACNStatus() {
  170. prog, status := app.acn.GetBootstrapStatus()
  171. app.getACNStatusHandler()(prog, status)
  172. }
  173. // ShutdownPeer shuts down a peer and removes it from the app's management
  174. func (app *application) ShutdownPeer(onion string) {
  175. app.mutex.Lock()
  176. defer app.mutex.Unlock()
  177. app.eventBuses[onion].Shutdown()
  178. delete(app.eventBuses, onion)
  179. app.peers[onion].Shutdown()
  180. delete(app.peers, onion)
  181. app.engines[onion].Shutdown()
  182. delete(app.engines, onion)
  183. app.storage[onion].Shutdown()
  184. delete(app.storage, onion)
  185. app.appletPlugins.Shutdown()
  186. }
  187. // Shutdown shutsdown all peers of an app and then the tormanager
  188. func (app *application) Shutdown() {
  189. for id, peer := range app.peers {
  190. peer.Shutdown()
  191. app.appletPlugins.ShutdownPeer(id)
  192. app.engines[id].Shutdown()
  193. app.storage[id].Shutdown()
  194. app.eventBuses[id].Shutdown()
  195. }
  196. app.appBus.Shutdown()
  197. }