cwtch/model/message.go

238 lines
7.6 KiB
Go

package model
import (
"crypto/sha256"
"encoding/base64"
"errors"
"sort"
"sync"
"time"
)
// Timeline encapsulates a collection of ordered Messages, and a mechanism to access them
// in a threadsafe manner.
type Timeline struct {
Messages []Message
SignedGroupID []byte
lock sync.Mutex
// a cache to allow quick checks for existing messages...
signatureCache map[string]int
// a cache to allowing looking up messages by content hash
// we need this for features like reply-to message, and other self
// referential applications.
// note: that the index stored here is not global as different peers may have difference views of the timeline
// depending on if they save history, and when the last time they purged their timeline was, as such we can't
// simply send the index of the message.
hashCache map[string][]int
}
// LocallyIndexedMessage is a type wrapper around a Message and a TimeLine Index that is local to this
// instance of the timeline.
type LocallyIndexedMessage struct {
Message
LocalIndex int
}
// Message is a local representation of a given message sent over a group chat channel.
type Message struct {
Timestamp time.Time
Received time.Time
PeerID string
Message string
Signature []byte
PreviousMessageSig []byte
ReceivedByServer bool // messages sent to a server
Acknowledged bool // peer to peer
Error string `json:",omitempty"`
// Application specific flags, useful for storing small amounts of metadata
Flags uint64
}
// MessageBaseSize 2021.06 byte size of an *empty* message json serialized
const MessageBaseSize float64 = 463
// compareSignatures checks if a and b are equal. Note: this function does
// not need to be constant time - in fact it is better that it is not as it's only main use
// is in sorting timeline state consistently.
func compareSignatures(a []byte, b []byte) bool {
if len(a) != len(b) {
return false
}
for i := range a {
if a[i] != b[i] {
return false
}
}
return true
}
// GetMessages returns a copy of the entire timeline
func (t *Timeline) GetMessages() []Message {
t.lock.Lock()
messages := make([]Message, len(t.Messages))
copy(messages[:], t.Messages[:])
t.lock.Unlock()
return messages
}
// GetCopy returns a duplicate of the Timeline
func (t *Timeline) GetCopy() *Timeline {
t.lock.Lock()
defer t.lock.Unlock()
newt := &Timeline{}
// initialize the timeline and copy the message over...
newt.SetMessages(t.Messages)
return newt
}
// SetMessages sets the Messages of this timeline. Only to be used in loading/initialization
func (t *Timeline) SetMessages(messages []Message) {
t.lock.Lock()
t.init()
t.lock.Unlock()
for _, m := range messages {
t.Insert(&m)
}
}
// GetMessagesByHash attempts to find messages that match the given
// content hash in the timeline. If successful it returns a list of messages as well as their local index
// , on failure it returns an error.
// We return a list of messages because content hashes are not guaranteed to be unique from a given Peer. This allows
// us to do things like: ensure that reply-to and quotes reference the last seen message from the message they are quoted
// in or detect duplicate messages from a peer.
func (t *Timeline) GetMessagesByHash(contentHash string) ([]LocallyIndexedMessage, error) {
t.lock.Lock()
defer t.lock.Unlock()
t.init()
if idxs, exists := t.hashCache[contentHash]; exists {
var messages []LocallyIndexedMessage
for _, idx := range idxs {
messages = append(messages, LocallyIndexedMessage{LocalIndex: idx, Message: t.Messages[idx]})
}
return messages, nil
}
return nil, errors.New("cannot find message by hash")
}
// calculateHash calculates the content hash of a given message
// the content used is the sender of the message, the body of the message
//
// content hashes must be calculable across timeline views so that different participants can
// calculate the same hash for the same message - as such we cannot use timestamps from peers or groups
// as they are mostly fuzzy.
//
// As a reminder: for p2p messages PeerID is authenticated by the initial 3DH handshake, for groups
// each message is signed by the sender, and this signature is checked prior to inclusion in the timeline.
//
// Multiple messages from the same peer can result in the same hash (where the same user sends the same message more
// than once) - in this case we will only store the idx of the most recent message - and use that for reference lookups.
func (t *Timeline) calculateHash(message Message) string {
content := []byte(message.PeerID + message.Message)
contentBasedHash := sha256.Sum256(content)
return base64.StdEncoding.EncodeToString(contentBasedHash[:])
}
// Len gets the length of the timeline
func (t *Timeline) Len() int {
return len(t.Messages)
}
// Swap swaps 2 Messages on the timeline.
func (t *Timeline) Swap(i, j int) {
t.Messages[i], t.Messages[j] = t.Messages[j], t.Messages[i]
}
// Less checks 2 Messages (i and j) in the timeline and returns true if i occurred before j, else false
func (t *Timeline) Less(i, j int) bool {
if t.Messages[i].Timestamp.Before(t.Messages[j].Timestamp) {
return true
}
// Short circuit false if j is before i, signature checks will give a wrong order in this case.
if t.Messages[j].Timestamp.Before(t.Messages[i].Timestamp) {
return false
}
if compareSignatures(t.Messages[i].PreviousMessageSig, t.SignedGroupID) {
return true
}
if compareSignatures(t.Messages[i].Signature, t.Messages[j].PreviousMessageSig) {
return true
}
return false
}
// Sort sorts the timeline in a canonical order.
func (t *Timeline) Sort() {
t.lock.Lock()
defer t.lock.Unlock()
sort.Sort(t)
}
// Insert a message into the timeline in a thread safe way.
func (t *Timeline) Insert(mi *Message) int {
t.lock.Lock()
defer t.lock.Unlock()
// assert timeline is initialized
t.init()
// check that we haven't seen this message before (this has no impact on p2p messages, but is essential for
// group messages)
// FIXME: The below code now checks if the message has a signature. If it doesn't then skip duplication check.
// We do this because p2p messages right now do not have a signature, and so many p2p messages are not stored
// with a signature. In the future in hybrid groups this check will go away as all timelines will use the same
// underlying protocol.
// This is currently safe to do because p2p does not rely on signatures and groups will verify the signature of
// messages prior to generating an event to include them in the timeline.
if len(mi.Signature) != 0 {
idx, exists := t.signatureCache[base64.StdEncoding.EncodeToString(mi.Signature)]
if exists {
t.Messages[idx].Acknowledged = true
return idx
}
}
// update the message store
t.Messages = append(t.Messages, *mi)
// add to signature cache for fast checking of group messages...
t.signatureCache[base64.StdEncoding.EncodeToString(mi.Signature)] = len(t.Messages) - 1
// content based addressing index
contentHash := t.calculateHash(*mi)
t.hashCache[contentHash] = append(t.hashCache[contentHash], len(t.Messages)-1)
return len(t.Messages) - 1
}
func (t *Timeline) init() {
// only allow this setting once...
if t.signatureCache == nil {
t.signatureCache = make(map[string]int)
}
if t.hashCache == nil {
t.hashCache = make(map[string][]int)
}
}
// SetSendError marks a message has having some kind of application specific error.
// Note: The message here is indexed by signature.
func (t *Timeline) SetSendError(sig []byte, e string) bool {
t.lock.Lock()
defer t.lock.Unlock()
idx, exists := t.signatureCache[base64.StdEncoding.EncodeToString(sig)]
if !exists {
return false
}
t.Messages[idx].Error = e
return true
}