diff --git a/app/app.go b/app/app.go index 9aba3741..a828266 100644 --- a/app/app.go +++ b/app/app.go @@ -290,10 +290,16 @@ func (app *application) ShutdownPeer(onion string) { func (app *application) Shutdown() { for id, peer := range app.peers { peer.Shutdown() + log.Debugf("Shutting Down Peer %v", id) app.appletPlugins.ShutdownPeer(id) + log.Debugf("Shutting Down Engines for %v", id) app.engines[id].Shutdown() + log.Debugf("Shutting Down Storage for %v", id) app.storage[id].Shutdown() + log.Debugf("Shutting Down Bus for %v", id) app.eventBuses[id].Shutdown() } + log.Debugf("Shutting Down App") app.appBus.Shutdown() + log.Debugf("Shut Down Complete") } diff --git a/app/plugins/contactRetry.go b/app/plugins/contactRetry.go index 327dbea..b019c13 100644 --- a/app/plugins/contactRetry.go +++ b/app/plugins/contactRetry.go @@ -9,7 +9,7 @@ import ( ) const tickTime = 10 * time.Second -const maxBakoff int = 32 // 320 seconds or ~5 min +const maxBackoff int = 32 // 320 seconds or ~5 min type connectionType int @@ -143,7 +143,7 @@ func (cr *contactRetry) handleEvent(id string, state connections.ConnectionState p.state = connections.DISCONNECTED if p.backoff == 0 { p.backoff = 1 - } else if p.backoff < maxBakoff { + } else if p.backoff < maxBackoff { p.backoff *= 2 } p.ticks = 0 diff --git a/event/common.go b/event/common.go index eca9d67..f5a7332 100644 --- a/event/common.go +++ b/event/common.go @@ -316,7 +316,7 @@ const ( // Defining Common errors const ( - AppErrLoaded0 = "Loaded 0 profiles" + AppErrLoaded0 = "Loaded 0 profiles" PasswordMatchError = "Password did not match" ) diff --git a/model/message.go b/model/message.go index dd5e5fb..0370643 100644 --- a/model/message.go +++ b/model/message.go @@ -1,7 +1,7 @@ package model import ( - "encoding/json" + "encoding/base64" "sort" "sync" "time" @@ -13,6 +13,9 @@ type Timeline struct { Messages []Message SignedGroupID []byte lock sync.Mutex + + // a cache to allow quick checks for existing messages... + signatureCache map[string]bool } // Message is a local representation of a given message sent over a group chat channel. @@ -33,6 +36,9 @@ type Message struct { // MessageBaseSize 2021.06 byte size of an *empty* message json serialized const MessageBaseSize float64 = 463 +// compareSignatures checks if a and b are equal. Note: this function does +// not need to be constant time - in fact it is better that it is not as it's only main use +// is in sorting timeline state consistently. func compareSignatures(a []byte, b []byte) bool { if len(a) != len(b) { return false @@ -58,9 +64,9 @@ func (t *Timeline) GetMessages() []Message { func (t *Timeline) GetCopy() *Timeline { t.lock.Lock() defer t.lock.Unlock() - bytes, _ := json.Marshal(t) newt := &Timeline{} - json.Unmarshal(bytes, newt) + // initialize the timeline and copy the message over... + newt.SetMessages(t.Messages) return newt } @@ -68,7 +74,11 @@ func (t *Timeline) GetCopy() *Timeline { func (t *Timeline) SetMessages(messages []Message) { t.lock.Lock() defer t.lock.Unlock() + t.init() t.Messages = messages + for _, message := range t.Messages { + t.signatureCache[base64.StdEncoding.EncodeToString(message.Signature)] = true + } } // Len gets the length of the timeline @@ -108,22 +118,31 @@ func (t *Timeline) Less(i, j int) bool { func (t *Timeline) Sort() { t.lock.Lock() defer t.lock.Unlock() + sort.Sort(t) } -// Insert inserts a message into the timeline in a thread safe way. +// Insert a message into the timeline in a thread safe way. func (t *Timeline) Insert(mi *Message) bool { t.lock.Lock() defer t.lock.Unlock() - for _, m := range t.Messages { - // If the message already exists, then we don't add it - if compareSignatures(m.Signature, mi.Signature) { - return true - } + // assert timeline is initialized + t.init() + + _, exists := t.signatureCache[base64.StdEncoding.EncodeToString(mi.Signature)] + if exists { + return true } t.Messages = append(t.Messages, *mi) - sort.Sort(t) + t.signatureCache[base64.StdEncoding.EncodeToString(mi.Signature)] = true return false } + +func (t *Timeline) init() { + // only allow this setting once... + if t.signatureCache == nil { + t.signatureCache = make(map[string]bool) + } +} diff --git a/peer/cwtch_peer.go b/peer/cwtch_peer.go index 8e924ac..4814cf9 100644 --- a/peer/cwtch_peer.go +++ b/peer/cwtch_peer.go @@ -721,6 +721,7 @@ func (cp *cwtchPeer) eventHandler() { // This is mitigated somewhat by resync events which do wipe things entire. // The security of cwtch groups are also not dependent on the servers inability to uniquely tag connections (as long as // it learns nothing else about each connection). + // store the base64 encoded signature for later use cp.SetContactAttribute(ev.Data[event.GroupServer], lastKnownSignature, ev.Data[event.Signature]) cp.mutex.Lock() diff --git a/storage/v1/profile_store.go b/storage/v1/profile_store.go index 436a4a8..65cdb44 100644 --- a/storage/v1/profile_store.go +++ b/storage/v1/profile_store.go @@ -317,6 +317,7 @@ func (ps *ProfileStoreV1) load() error { ss := NewStreamStore(ps.directory, group.LocalID, ps.key) cp.Groups[gid].Timeline.SetMessages(ss.Read()) + cp.Groups[gid].Timeline.Sort() ps.streamStores[group.GroupID] = ss } @@ -504,6 +505,7 @@ func (ps *ProfileStoreV1) eventHandler() { } } default: + log.Debugf("shutting down profile store: %v", ev) return }