Official cwtch.im peer and server implementations. https://cwtch.im
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 

119 lines
4.2 KiB

  1. package server
  2. import (
  3. "cwtch.im/cwtch/protocol/groups"
  4. "cwtch.im/cwtch/server/storage"
  5. "cwtch.im/tapir"
  6. "cwtch.im/tapir/applications"
  7. "cwtch.im/tapir/primitives/privacypass"
  8. "encoding/json"
  9. "git.openprivacy.ca/openprivacy/log"
  10. )
  11. // NewTokenBoardServer generates new Server for Token Board
  12. func NewTokenBoardServer(store storage.MessageStoreInterface, tokenService *privacypass.TokenServer) tapir.Application {
  13. tba := new(TokenboardServer)
  14. tba.TokenService = tokenService
  15. tba.LegacyMessageStore = store
  16. return tba
  17. }
// TokenboardServer defines the token board server. It embeds
// applications.AuthApp and handles post and replay requests received over a
// single tapir connection.
type TokenboardServer struct {
	applications.AuthApp
	// connection is the connection this instance serves; Init assigns it
	// when the connection holds applications.AuthCapability.
	connection tapir.Connection
	// TokenService is used to spend the token attached to each post request.
	TokenService *privacypass.TokenServer
	// LegacyMessageStore receives posted messages and is read when answering
	// replay requests.
	LegacyMessageStore storage.MessageStoreInterface
}
  25. // NewInstance creates a new TokenBoardApp
  26. func (ta *TokenboardServer) NewInstance() tapir.Application {
  27. tba := new(TokenboardServer)
  28. tba.TokenService = ta.TokenService
  29. tba.LegacyMessageStore = ta.LegacyMessageStore
  30. return tba
  31. }
  32. // Init initializes the cryptographic TokenBoardApp
  33. func (ta *TokenboardServer) Init(connection tapir.Connection) {
  34. ta.AuthApp.Init(connection)
  35. if connection.HasCapability(applications.AuthCapability) {
  36. ta.connection = connection
  37. go ta.Listen()
  38. } else {
  39. connection.Close()
  40. }
  41. }
  42. // Listen processes the messages for this application
  43. func (ta *TokenboardServer) Listen() {
  44. for {
  45. data := ta.connection.Expect()
  46. if len(data) == 0 {
  47. log.Debugf("Server Closing Connection")
  48. ta.connection.Close()
  49. return // connection is closed
  50. }
  51. var message groups.Message
  52. if err := json.Unmarshal(data, &message); err != nil {
  53. log.Debugf("Server Closing Connection Because of Malformed Client Packet %v", err)
  54. ta.connection.Close()
  55. return // connection is closed
  56. }
  57. switch message.MessageType {
  58. case groups.PostRequestMessage:
  59. if message.PostRequest != nil {
  60. postrequest := *message.PostRequest
  61. log.Debugf("Received a Post Message Request: %v", ta.connection.Hostname())
  62. ta.postMessageRequest(postrequest)
  63. } else {
  64. log.Debugf("Server Closing Connection Because of PostRequestMessage Client Packet")
  65. ta.connection.Close()
  66. return // connection is closed
  67. }
  68. case groups.ReplayRequestMessage:
  69. if message.ReplayRequest != nil {
  70. log.Debugf("Received Replay Request %v", message.ReplayRequest)
  71. messages := ta.LegacyMessageStore.FetchMessages()
  72. response, _ := json.Marshal(groups.Message{MessageType: groups.ReplayResultMessage, ReplayResult: &groups.ReplayResult{NumMessages: len(messages)}})
  73. log.Debugf("Sending Replay Response %v", groups.ReplayResult{NumMessages: len(messages)})
  74. ta.connection.Send(response)
  75. for _, message := range messages {
  76. data, _ = json.Marshal(message)
  77. ta.connection.Send(data)
  78. }
  79. // Set sync and then send any new messages that might have happened while we were syncing
  80. ta.connection.SetCapability(groups.CwtchServerSyncedCapability)
  81. newMessages := ta.LegacyMessageStore.FetchMessages()
  82. if len(newMessages) > len(messages) {
  83. for _, message := range newMessages[len(messages):] {
  84. data, _ = json.Marshal(groups.Message{MessageType: groups.NewMessageMessage, NewMessage: &groups.NewMessage{EGM: *message}})
  85. ta.connection.Send(data)
  86. }
  87. }
  88. } else {
  89. log.Debugf("Server Closing Connection Because of Malformed ReplayRequestMessage Packet")
  90. ta.connection.Close()
  91. return // connection is closed
  92. }
  93. }
  94. }
  95. }
  96. func (ta *TokenboardServer) postMessageRequest(pr groups.PostRequest) {
  97. if err := ta.TokenService.SpendToken(pr.Token, append(pr.EGM.ToBytes(), ta.connection.ID().Hostname()...)); err == nil {
  98. log.Debugf("Token is valid")
  99. ta.LegacyMessageStore.AddMessage(pr.EGM)
  100. data, _ := json.Marshal(groups.Message{MessageType: groups.PostResultMessage, PostResult: &groups.PostResult{Success: true}})
  101. ta.connection.Send(data)
  102. data, _ = json.Marshal(groups.Message{MessageType: groups.NewMessageMessage, NewMessage: &groups.NewMessage{EGM: pr.EGM}})
  103. ta.connection.Broadcast(data, groups.CwtchServerSyncedCapability)
  104. } else {
  105. log.Debugf("Attempt to spend an invalid token: %v", err)
  106. data, _ := json.Marshal(groups.Message{MessageType: groups.PostResultMessage, PostResult: &groups.PostResult{Success: false}})
  107. ta.connection.Send(data)
  108. }
  109. }