Official cwtch.im peer implementation. https://cwtch.im
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

298 lines
9.4 KiB

4 years ago
4 years ago
3 years ago
4 years ago
3 years ago
4 years ago
4 years ago
  1. package app
  2. import (
  3. "cwtch.im/cwtch/app/plugins"
  4. "cwtch.im/cwtch/event"
  5. "cwtch.im/cwtch/model"
  6. "cwtch.im/cwtch/peer"
  7. "cwtch.im/cwtch/protocol/connections"
  8. "cwtch.im/cwtch/storage"
  9. "fmt"
  10. "git.openprivacy.ca/cwtch.im/tapir/primitives"
  11. "git.openprivacy.ca/openprivacy/connectivity"
  12. "git.openprivacy.ca/openprivacy/log"
  13. "io/ioutil"
  14. "os"
  15. "path"
  16. "strconv"
  17. "sync"
  18. )
  19. // AttributeTag is a const name for a peer attribute that can be set at creation time, for example for versioning info
  20. const AttributeTag = "tag"
  21. type applicationCore struct {
  22. eventBuses map[string]event.Manager
  23. directory string
  24. coremutex sync.Mutex
  25. }
  26. type application struct {
  27. applicationCore
  28. appletPeers
  29. appletACN
  30. appletPlugins
  31. storage map[string]storage.ProfileStore
  32. engines map[string]connections.Engine
  33. appBus event.Manager
  34. appmutex sync.Mutex
  35. }
  36. // Application is a full cwtch peer application. It allows management, usage and storage of multiple peers
  37. type Application interface {
  38. LoadProfiles(password string)
  39. CreatePeer(name string, password string)
  40. CreateTaggedPeer(name string, password string, tag string)
  41. DeletePeer(onion string, currentPassword string) bool
  42. AddPeerPlugin(onion string, pluginID plugins.PluginID)
  43. ChangePeerPassword(onion, oldpass, newpass string)
  44. LaunchPeers()
  45. GetPrimaryBus() event.Manager
  46. GetEventBus(onion string) event.Manager
  47. QueryACNStatus()
  48. QueryACNVersion()
  49. ShutdownPeer(string)
  50. Shutdown()
  51. GetPeer(onion string) peer.CwtchPeer
  52. ListPeers() map[string]string
  53. }
  54. // LoadProfileFn is the function signature for a function in an app that loads a profile
  55. type LoadProfileFn func(profile *model.Profile, store storage.ProfileStore)
  56. func newAppCore(appDirectory string) *applicationCore {
  57. appCore := &applicationCore{eventBuses: make(map[string]event.Manager), directory: appDirectory}
  58. os.MkdirAll(path.Join(appCore.directory, "profiles"), 0700)
  59. return appCore
  60. }
  61. // NewApp creates a new app with some environment awareness and initializes a Tor Manager
  62. func NewApp(acn connectivity.ACN, appDirectory string) Application {
  63. log.Debugf("NewApp(%v)\n", appDirectory)
  64. app := &application{storage: make(map[string]storage.ProfileStore), engines: make(map[string]connections.Engine), applicationCore: *newAppCore(appDirectory), appBus: event.NewEventManager()}
  65. app.appletPeers.init()
  66. app.appletACN.init(acn, app.getACNStatusHandler())
  67. return app
  68. }
  69. // CreatePeer creates a new Peer with a given name and core required accessories (eventbus)
  70. func (ac *applicationCore) CreatePeer(name string) (*model.Profile, error) {
  71. log.Debugf("CreatePeer(%v)\n", name)
  72. profile := storage.NewProfile(name)
  73. ac.coremutex.Lock()
  74. defer ac.coremutex.Unlock()
  75. _, exists := ac.eventBuses[profile.Onion]
  76. if exists {
  77. return nil, fmt.Errorf("error: profile for onion %v already exists", profile.Onion)
  78. }
  79. eventBus := event.NewEventManager()
  80. ac.eventBuses[profile.Onion] = eventBus
  81. return profile, nil
  82. }
  83. func (ac *applicationCore) DeletePeer(onion string) {
  84. ac.coremutex.Lock()
  85. defer ac.coremutex.Unlock()
  86. ac.eventBuses[onion].Shutdown()
  87. delete(ac.eventBuses, onion)
  88. }
  89. func (app *application) CreateTaggedPeer(name string, password string, tag string) {
  90. profile, err := app.applicationCore.CreatePeer(name)
  91. if err != nil {
  92. app.appBus.Publish(event.NewEventList(event.PeerError, event.Error, err.Error()))
  93. return
  94. }
  95. profileStore := storage.CreateProfileWriterStore(app.eventBuses[profile.Onion], path.Join(app.directory, "profiles", profile.LocalID), password, profile)
  96. app.storage[profile.Onion] = profileStore
  97. pc := app.storage[profile.Onion].GetProfileCopy(true)
  98. p := peer.FromProfile(pc)
  99. p.Init(app.eventBuses[profile.Onion])
  100. peerAuthorizations := profile.ContactsAuthorizations()
  101. // TODO: Would be nice if ProtocolEngine did not need to explicitly be given the Private Key.
  102. identity := primitives.InitializeIdentity(profile.Name, &profile.Ed25519PrivateKey, &profile.Ed25519PublicKey)
  103. engine := connections.NewProtocolEngine(identity, profile.Ed25519PrivateKey, app.acn, app.eventBuses[profile.Onion], peerAuthorizations)
  104. app.peers[profile.Onion] = p
  105. app.engines[profile.Onion] = engine
  106. if tag != "" {
  107. p.SetAttribute(AttributeTag, tag)
  108. }
  109. app.appBus.Publish(event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.Onion, event.Created: event.True}))
  110. }
  111. // CreatePeer creates a new Peer with the given name and required accessories (eventbus, storage, protocol engine)
  112. func (app *application) CreatePeer(name string, password string) {
  113. app.CreateTaggedPeer(name, password, "")
  114. }
  115. func (app *application) DeletePeer(onion string, password string) bool {
  116. log.Infof("DeletePeer called on %v\n", onion)
  117. app.appmutex.Lock()
  118. defer app.appmutex.Unlock()
  119. if app.storage[onion].CheckPassword(password) {
  120. app.appletPlugins.ShutdownPeer(onion)
  121. app.plugins.Delete(onion)
  122. app.peers[onion].Shutdown()
  123. delete(app.peers, onion)
  124. app.engines[onion].Shutdown()
  125. delete(app.engines, onion)
  126. app.storage[onion].Shutdown()
  127. app.storage[onion].Delete()
  128. delete(app.storage, onion)
  129. app.eventBuses[onion].Publish(event.NewEventList(event.ShutdownPeer, event.Identity, onion))
  130. app.applicationCore.DeletePeer(onion)
  131. log.Debugf("Delete peer for %v Done\n", onion)
  132. return true
  133. }
  134. return false
  135. }
  136. func (app *application) ChangePeerPassword(onion, oldpass, newpass string) {
  137. app.eventBuses[onion].Publish(event.NewEventList(event.ChangePassword, event.Password, oldpass, event.NewPassword, newpass))
  138. }
  139. func (app *application) AddPeerPlugin(onion string, pluginID plugins.PluginID) {
  140. app.AddPlugin(onion, pluginID, app.eventBuses[onion], app.acn)
  141. }
  142. // LoadProfiles takes a password and attempts to load any profiles it can from storage with it and create Peers for them
  143. func (ac *applicationCore) LoadProfiles(password string, timeline bool, loadProfileFn LoadProfileFn) error {
  144. files, err := ioutil.ReadDir(path.Join(ac.directory, "profiles"))
  145. if err != nil {
  146. return fmt.Errorf("error: cannot read profiles directory: %v", err)
  147. }
  148. for _, file := range files {
  149. eventBus := event.NewEventManager()
  150. profileStore, err := storage.LoadProfileWriterStore(eventBus, path.Join(ac.directory, "profiles", file.Name()), password)
  151. if err != nil {
  152. continue
  153. }
  154. profile := profileStore.GetProfileCopy(timeline)
  155. _, exists := ac.eventBuses[profile.Onion]
  156. if exists {
  157. profileStore.Shutdown()
  158. eventBus.Shutdown()
  159. log.Errorf("profile for onion %v already exists", profile.Onion)
  160. continue
  161. }
  162. ac.coremutex.Lock()
  163. ac.eventBuses[profile.Onion] = eventBus
  164. ac.coremutex.Unlock()
  165. loadProfileFn(profile, profileStore)
  166. }
  167. return nil
  168. }
  169. // LoadProfiles takes a password and attempts to load any profiles it can from storage with it and create Peers for them
  170. func (app *application) LoadProfiles(password string) {
  171. count := 0
  172. app.applicationCore.LoadProfiles(password, true, func(profile *model.Profile, profileStore storage.ProfileStore) {
  173. peer := peer.FromProfile(profile)
  174. peer.Init(app.eventBuses[profile.Onion])
  175. peerAuthorizations := profile.ContactsAuthorizations()
  176. identity := primitives.InitializeIdentity(profile.Name, &profile.Ed25519PrivateKey, &profile.Ed25519PublicKey)
  177. engine := connections.NewProtocolEngine(identity, profile.Ed25519PrivateKey, app.acn, app.eventBuses[profile.Onion], peerAuthorizations)
  178. app.appmutex.Lock()
  179. app.peers[profile.Onion] = peer
  180. app.storage[profile.Onion] = profileStore
  181. app.engines[profile.Onion] = engine
  182. app.appmutex.Unlock()
  183. app.appBus.Publish(event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.Onion, event.Created: event.False}))
  184. count++
  185. })
  186. if count == 0 {
  187. message := event.NewEventList(event.AppError, event.Error, event.AppErrLoaded0)
  188. app.appBus.Publish(message)
  189. }
  190. }
  191. // GetPrimaryBus returns the bus the Application uses for events that aren't peer specific
  192. func (app *application) GetPrimaryBus() event.Manager {
  193. return app.appBus
  194. }
  195. // GetEventBus returns a cwtchPeer's event bus
  196. func (ac *applicationCore) GetEventBus(onion string) event.Manager {
  197. if manager, ok := ac.eventBuses[onion]; ok {
  198. return manager
  199. }
  200. return nil
  201. }
  202. func (app *application) getACNStatusHandler() func(int, string) {
  203. return func(progress int, status string) {
  204. progStr := strconv.Itoa(progress)
  205. app.peerLock.Lock()
  206. app.appBus.Publish(event.NewEventList(event.ACNStatus, event.Progress, progStr, event.Status, status))
  207. for _, bus := range app.eventBuses {
  208. bus.Publish(event.NewEventList(event.ACNStatus, event.Progress, progStr, event.Status, status))
  209. }
  210. app.peerLock.Unlock()
  211. }
  212. }
  213. func (app *application) QueryACNStatus() {
  214. prog, status := app.acn.GetBootstrapStatus()
  215. app.getACNStatusHandler()(prog, status)
  216. }
  217. func (app *application) QueryACNVersion() {
  218. version := app.acn.GetVersion()
  219. app.appBus.Publish(event.NewEventList(event.ACNVersion, event.Data, version))
  220. }
  221. // ShutdownPeer shuts down a peer and removes it from the app's management
  222. func (app *application) ShutdownPeer(onion string) {
  223. app.appmutex.Lock()
  224. defer app.appmutex.Unlock()
  225. app.eventBuses[onion].Shutdown()
  226. delete(app.eventBuses, onion)
  227. app.peers[onion].Shutdown()
  228. delete(app.peers, onion)
  229. app.engines[onion].Shutdown()
  230. delete(app.engines, onion)
  231. app.storage[onion].Shutdown()
  232. delete(app.storage, onion)
  233. app.appletPlugins.Shutdown()
  234. }
  235. // Shutdown shutsdown all peers of an app and then the tormanager
  236. func (app *application) Shutdown() {
  237. for id, peer := range app.peers {
  238. peer.Shutdown()
  239. app.appletPlugins.ShutdownPeer(id)
  240. app.engines[id].Shutdown()
  241. app.storage[id].Shutdown()
  242. app.eventBuses[id].Shutdown()
  243. }
  244. app.appBus.Shutdown()
  245. }