Fixes to enable more efficient message syncing / storage #372
|
@ -290,10 +290,16 @@ func (app *application) ShutdownPeer(onion string) {
|
|||
func (app *application) Shutdown() {
|
||||
for id, peer := range app.peers {
|
||||
peer.Shutdown()
|
||||
log.Debugf("Shutting Down Peer %v", id)
|
||||
app.appletPlugins.ShutdownPeer(id)
|
||||
log.Debugf("Shutting Down Engines for %v", id)
|
||||
app.engines[id].Shutdown()
|
||||
log.Debugf("Shutting Down Storage for %v", id)
|
||||
app.storage[id].Shutdown()
|
||||
log.Debugf("Shutting Down Bus for %v", id)
|
||||
app.eventBuses[id].Shutdown()
|
||||
}
|
||||
log.Debugf("Shutting Down App")
|
||||
app.appBus.Shutdown()
|
||||
log.Debugf("Shut Down Complete")
|
||||
}
|
||||
|
|
|
@ -9,7 +9,7 @@ import (
|
|||
)
|
||||
|
||||
const tickTime = 10 * time.Second
|
||||
const maxBakoff int = 32 // 320 seconds or ~5 min
|
||||
const maxBackoff int = 32 // 320 seconds or ~5 min
|
||||
|
||||
type connectionType int
|
||||
|
||||
|
@ -143,7 +143,7 @@ func (cr *contactRetry) handleEvent(id string, state connections.ConnectionState
|
|||
p.state = connections.DISCONNECTED
|
||||
if p.backoff == 0 {
|
||||
p.backoff = 1
|
||||
} else if p.backoff < maxBakoff {
|
||||
} else if p.backoff < maxBackoff {
|
||||
p.backoff *= 2
|
||||
}
|
||||
p.ticks = 0
|
||||
|
|
|
@ -316,7 +316,7 @@ const (
|
|||
|
||||
// Defining Common errors
|
||||
const (
|
||||
AppErrLoaded0 = "Loaded 0 profiles"
|
||||
AppErrLoaded0 = "Loaded 0 profiles"
|
||||
PasswordMatchError = "Password did not match"
|
||||
)
|
||||
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
package model
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"encoding/base64"
|
||||
"sort"
|
||||
"sync"
|
||||
"time"
|
||||
|
@ -13,6 +13,9 @@ type Timeline struct {
|
|||
Messages []Message
|
||||
SignedGroupID []byte
|
||||
lock sync.Mutex
|
||||
|
||||
// a cache to allow quick checks for existing messages...
|
||||
signatureCache map[string]bool
|
||||
}
|
||||
|
||||
// Message is a local representation of a given message sent over a group chat channel.
|
||||
|
@ -33,6 +36,9 @@ type Message struct {
|
|||
// MessageBaseSize 2021.06 byte size of an *empty* message json serialized
|
||||
const MessageBaseSize float64 = 463
|
||||
|
||||
// compareSignatures checks if a and b are equal. Note: this function does
|
||||
// not need to be constant time - in fact it is better that it is not as it's only main use
|
||||
// is in sorting timeline state consistently.
|
||||
func compareSignatures(a []byte, b []byte) bool {
|
||||
if len(a) != len(b) {
|
||||
return false
|
||||
|
@ -58,9 +64,9 @@ func (t *Timeline) GetMessages() []Message {
|
|||
func (t *Timeline) GetCopy() *Timeline {
|
||||
t.lock.Lock()
|
||||
defer t.lock.Unlock()
|
||||
bytes, _ := json.Marshal(t)
|
||||
newt := &Timeline{}
|
||||
json.Unmarshal(bytes, newt)
|
||||
// initialize the timeline and copy the message over...
|
||||
newt.SetMessages(t.Messages)
|
||||
return newt
|
||||
}
|
||||
|
||||
|
@ -68,7 +74,11 @@ func (t *Timeline) GetCopy() *Timeline {
|
|||
func (t *Timeline) SetMessages(messages []Message) {
|
||||
t.lock.Lock()
|
||||
defer t.lock.Unlock()
|
||||
t.init()
|
||||
t.Messages = messages
|
||||
for _, message := range t.Messages {
|
||||
t.signatureCache[base64.StdEncoding.EncodeToString(message.Signature)] = true
|
||||
}
|
||||
}
|
||||
|
||||
// Len gets the length of the timeline
|
||||
|
@ -108,22 +118,31 @@ func (t *Timeline) Less(i, j int) bool {
|
|||
func (t *Timeline) Sort() {
|
||||
t.lock.Lock()
|
||||
defer t.lock.Unlock()
|
||||
|
||||
sort.Sort(t)
|
||||
}
|
||||
|
||||
// Insert inserts a message into the timeline in a thread safe way.
|
||||
// Insert a message into the timeline in a thread safe way.
|
||||
func (t *Timeline) Insert(mi *Message) bool {
|
||||
t.lock.Lock()
|
||||
defer t.lock.Unlock()
|
||||
|
||||
for _, m := range t.Messages {
|
||||
// If the message already exists, then we don't add it
|
||||
if compareSignatures(m.Signature, mi.Signature) {
|
||||
return true
|
||||
}
|
||||
// assert timeline is initialized
|
||||
t.init()
|
||||
|
||||
_, exists := t.signatureCache[base64.StdEncoding.EncodeToString(mi.Signature)]
|
||||
if exists {
|
||||
return true
|
||||
}
|
||||
|
||||
t.Messages = append(t.Messages, *mi)
|
||||
sort.Sort(t)
|
||||
t.signatureCache[base64.StdEncoding.EncodeToString(mi.Signature)] = true
|
||||
return false
|
||||
}
|
||||
|
||||
func (t *Timeline) init() {
|
||||
// only allow this setting once...
|
||||
if t.signatureCache == nil {
|
||||
t.signatureCache = make(map[string]bool)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -721,6 +721,7 @@ func (cp *cwtchPeer) eventHandler() {
|
|||
// This is mitigated somewhat by resync events which do wipe things entire.
|
||||
// The security of cwtch groups are also not dependent on the servers inability to uniquely tag connections (as long as
|
||||
// it learns nothing else about each connection).
|
||||
// store the base64 encoded signature for later use
|
||||
cp.SetContactAttribute(ev.Data[event.GroupServer], lastKnownSignature, ev.Data[event.Signature])
|
||||
|
||||
cp.mutex.Lock()
|
||||
|
|
|
@ -317,6 +317,7 @@ func (ps *ProfileStoreV1) load() error {
|
|||
ss := NewStreamStore(ps.directory, group.LocalID, ps.key)
|
||||
|
||||
cp.Groups[gid].Timeline.SetMessages(ss.Read())
|
||||
cp.Groups[gid].Timeline.Sort()
|
||||
ps.streamStores[group.GroupID] = ss
|
||||
}
|
||||
|
||||
|
@ -504,6 +505,7 @@ func (ps *ProfileStoreV1) eventHandler() {
|
|||
}
|
||||
}
|
||||
default:
|
||||
log.Debugf("shutting down profile store: %v", ev)
|
||||
return
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue