Browse Source

Merge pull request 'Fixes to enable more efficient message syncing / storage' (#372) from launch into master

Reviewed-on: https://git.openprivacy.ca/cwtch.im/cwtch/pulls/372
Reviewed-by: Dan Ballard <dan@openprivacy.ca>
launch
erinn 4 months ago
parent
commit
8375a330f6
  1. 6
      app/app.go
  2. 4
      app/plugins/contactRetry.go
  3. 2
      event/common.go
  4. 39
      model/message.go
  5. 1
      peer/cwtch_peer.go
  6. 2
      storage/v1/profile_store.go

6
app/app.go

@ -290,10 +290,16 @@ func (app *application) ShutdownPeer(onion string) {
func (app *application) Shutdown() {
for id, peer := range app.peers {
peer.Shutdown()
log.Debugf("Shutting Down Peer %v", id)
app.appletPlugins.ShutdownPeer(id)
log.Debugf("Shutting Down Engines for %v", id)
app.engines[id].Shutdown()
log.Debugf("Shutting Down Storage for %v", id)
app.storage[id].Shutdown()
log.Debugf("Shutting Down Bus for %v", id)
app.eventBuses[id].Shutdown()
}
log.Debugf("Shutting Down App")
app.appBus.Shutdown()
log.Debugf("Shut Down Complete")
}

4
app/plugins/contactRetry.go

@ -9,7 +9,7 @@ import (
)
const tickTime = 10 * time.Second
const maxBakoff int = 32 // 320 seconds or ~5 min
const maxBackoff int = 32 // 320 seconds or ~5 min
type connectionType int
@ -143,7 +143,7 @@ func (cr *contactRetry) handleEvent(id string, state connections.ConnectionState
p.state = connections.DISCONNECTED
if p.backoff == 0 {
p.backoff = 1
} else if p.backoff < maxBakoff {
} else if p.backoff < maxBackoff {
p.backoff *= 2
}
p.ticks = 0

2
event/common.go

@ -316,7 +316,7 @@ const (
// Defining Common errors
const (
AppErrLoaded0 = "Loaded 0 profiles"
AppErrLoaded0 = "Loaded 0 profiles"
PasswordMatchError = "Password did not match"
)

39
model/message.go

@ -1,7 +1,7 @@
package model
import (
"encoding/json"
"encoding/base64"
"sort"
"sync"
"time"
@ -13,6 +13,9 @@ type Timeline struct {
Messages []Message
SignedGroupID []byte
lock sync.Mutex
// a cache to allow quick checks for existing messages...
signatureCache map[string]bool
}
// Message is a local representation of a given message sent over a group chat channel.
@ -33,6 +36,9 @@ type Message struct {
// MessageBaseSize 2021.06 byte size of an *empty* message json serialized
const MessageBaseSize float64 = 463
// compareSignatures checks if a and b are equal. Note: this function does
// not need to be constant time - in fact it is better that it is not as it's only main use
// is in sorting timeline state consistently.
func compareSignatures(a []byte, b []byte) bool {
if len(a) != len(b) {
return false
@ -58,9 +64,9 @@ func (t *Timeline) GetMessages() []Message {
func (t *Timeline) GetCopy() *Timeline {
t.lock.Lock()
defer t.lock.Unlock()
bytes, _ := json.Marshal(t)
newt := &Timeline{}
json.Unmarshal(bytes, newt)
// initialize the timeline and copy the message over...
newt.SetMessages(t.Messages)
return newt
}
@ -68,7 +74,11 @@ func (t *Timeline) GetCopy() *Timeline {
func (t *Timeline) SetMessages(messages []Message) {
t.lock.Lock()
defer t.lock.Unlock()
t.init()
t.Messages = messages
for _, message := range t.Messages {
t.signatureCache[base64.StdEncoding.EncodeToString(message.Signature)] = true
}
}
// Len gets the length of the timeline
@ -108,22 +118,31 @@ func (t *Timeline) Less(i, j int) bool {
func (t *Timeline) Sort() {
t.lock.Lock()
defer t.lock.Unlock()
sort.Sort(t)
}
// Insert inserts a message into the timeline in a thread safe way.
// Insert a message into the timeline in a thread safe way.
func (t *Timeline) Insert(mi *Message) bool {
t.lock.Lock()
defer t.lock.Unlock()
for _, m := range t.Messages {
// If the message already exists, then we don't add it
if compareSignatures(m.Signature, mi.Signature) {
return true
}
// assert timeline is initialized
t.init()
_, exists := t.signatureCache[base64.StdEncoding.EncodeToString(mi.Signature)]
if exists {
return true
}
t.Messages = append(t.Messages, *mi)
sort.Sort(t)
t.signatureCache[base64.StdEncoding.EncodeToString(mi.Signature)] = true
return false
}
func (t *Timeline) init() {
// only allow this setting once...
if t.signatureCache == nil {
t.signatureCache = make(map[string]bool)
}
}

1
peer/cwtch_peer.go

@ -721,6 +721,7 @@ func (cp *cwtchPeer) eventHandler() {
// This is mitigated somewhat by resync events which do wipe things entire.
// The security of cwtch groups are also not dependent on the servers inability to uniquely tag connections (as long as
// it learns nothing else about each connection).
// store the base64 encoded signature for later use
cp.SetContactAttribute(ev.Data[event.GroupServer], lastKnownSignature, ev.Data[event.Signature])
cp.mutex.Lock()

2
storage/v1/profile_store.go

@ -317,6 +317,7 @@ func (ps *ProfileStoreV1) load() error {
ss := NewStreamStore(ps.directory, group.LocalID, ps.key)
cp.Groups[gid].Timeline.SetMessages(ss.Read())
cp.Groups[gid].Timeline.Sort()
ps.streamStores[group.GroupID] = ss
}
@ -504,6 +505,7 @@ func (ps *ProfileStoreV1) eventHandler() {
}
}
default:
log.Debugf("shutting down profile store: %v", ev)
return
}

Loading…
Cancel
Save