Add Content Addressing to Timeline #381
|
@ -1,7 +1,9 @@
|
|||
package model
|
||||
|
||||
import (
|
||||
"crypto/sha256"
|
||||
"encoding/base64"
|
||||
"errors"
|
||||
"sort"
|
||||
"sync"
|
||||
"time"
|
||||
|
@ -16,6 +18,22 @@ type Timeline struct {
|
|||
|
||||
// a cache to allow quick checks for existing messages...
|
||||
signatureCache map[string]bool
|
||||
|
||||
// a cache to allowing looking up messages by content hash
|
||||
// we need this for features like reply-to message, and other self
|
||||
// referential applications.
|
||||
// note: that the index stored here is not global as different peers may have difference views of the timeline
|
||||
// depending on if they save history, and when the last time they purged their timeline was, as such we can't
|
||||
// simply send the index of the message.
|
||||
hashCache map[string][]int
|
||||
}
|
||||
|
||||
|
||||
// LocallyIndexedMessage is a type wrapper around a Message and a TimeLine Index that is local to this
|
||||
// instance of the timeline.
|
||||
type LocallyIndexedMessage struct {
|
||||
Message
|
||||
LocalIndex int
|
||||
}
|
||||
|
||||
// Message is a local representation of a given message sent over a group chat channel.
|
||||
|
@ -76,11 +94,50 @@ func (t *Timeline) SetMessages(messages []Message) {
|
|||
defer t.lock.Unlock()
|
||||
t.init()
|
||||
t.Messages = messages
|
||||
for _, message := range t.Messages {
|
||||
for idx, message := range t.Messages {
|
||||
t.signatureCache[base64.StdEncoding.EncodeToString(message.Signature)] = true
|
||||
t.hashCache[t.calculateHash(message)] = append(t.hashCache[t.calculateHash(message)], idx)
|
||||
}
|
||||
}
|
||||
|
||||
// GetMessagesByHash attempts to find messages that match the given
|
||||
// content hash in the timeline. If successful it returns a list of messages as well as their local index
|
||||
//, on failure it returns an error.
|
||||
// We return a list of messages because content hashes are not guaranteed to be unique from a given Peer. This allows
|
||||
// us to do things like: ensure that reply-to and quotes reference the last seen message from the message they are quoted
|
||||
// in or detect duplicate messages from a peer.
|
||||
func (t *Timeline) GetMessagesByHash(contentHash string) ([]LocallyIndexedMessage, error) {
|
||||
t.lock.Lock()
|
||||
defer t.lock.Unlock()
|
||||
t.init()
|
||||
if idxs, exists := t.hashCache[contentHash]; exists {
|
||||
var messages []LocallyIndexedMessage
|
||||
for _,idx := range idxs {
|
||||
messages = append(messages, LocallyIndexedMessage{LocalIndex: idx, Message: t.Messages[idx]})
|
||||
}
|
||||
return messages, nil
|
||||
}
|
||||
return nil, errors.New("cannot find message by hash")
|
||||
}
|
||||
|
||||
// calculateHash calculates the content hash of a given message
|
||||
// the content used is the sender of the message, the body of the message
|
||||
//
|
||||
// content hashes must be calculable across timeline views so that different participants can
|
||||
// calculate the same hash for the same message - as such we cannot use timestamps from peers or groups
|
||||
// as they are mostly fuzzy.
|
||||
//
|
||||
// As a reminder: for p2p messages PeerID is authenticated by the initial 3DH handshake, for groups
|
||||
// each message is signed by the sender, and this signature is checked prior to inclusion in the timeline.
|
||||
//
|
||||
// Multiple messages from the same peer can result in the same hash (where the same user sends the same message more
|
||||
// than once) - in this case we will only store the idx of the most recent message - and use that for reference lookups.
|
||||
func (t *Timeline) calculateHash(message Message) string {
|
||||
content := []byte(message.PeerID + message.Message)
|
||||
contentBasedHash := sha256.Sum256(content)
|
||||
return base64.StdEncoding.EncodeToString(contentBasedHash[:])
|
||||
}
|
||||
|
||||
// Len gets the length of the timeline
|
||||
func (t *Timeline) Len() int {
|
||||
return len(t.Messages)
|
||||
|
@ -130,13 +187,20 @@ func (t *Timeline) Insert(mi *Message) bool {
|
|||
// assert timeline is initialized
|
||||
t.init()
|
||||
|
||||
// check that we haven't seen this message before (this has no impact on p2p messages, but is essential for
|
||||
// group messages)
|
||||
_, exists := t.signatureCache[base64.StdEncoding.EncodeToString(mi.Signature)]
|
||||
if exists {
|
||||
return true
|
||||
}
|
||||
|
||||
// update the message store
|
||||
t.Messages = append(t.Messages, *mi)
|
||||
// add to signature cache for fast checking of group messages...
|
||||
t.signatureCache[base64.StdEncoding.EncodeToString(mi.Signature)] = true
|
||||
// content based addressing index
|
||||
contentHash := t.calculateHash(*mi)
|
||||
t.hashCache[contentHash] = append(t.hashCache[contentHash], len(t.Messages)-1)
|
||||
return false
|
||||
}
|
||||
|
||||
|
@ -145,4 +209,8 @@ func (t *Timeline) init() {
|
|||
if t.signatureCache == nil {
|
||||
t.signatureCache = make(map[string]bool)
|
||||
}
|
||||
|
||||
if t.hashCache == nil {
|
||||
t.hashCache = make(map[string][]int)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -100,4 +100,28 @@ func TestTranscriptConsistency(t *testing.T) {
|
|||
|
||||
t.Logf("Messages %v: %v %x %x", i, m.Message, m.Signature, m.PreviousMessageSig)
|
||||
}
|
||||
|
||||
// Test message by hash lookup...
|
||||
hash := timeline.calculateHash(*m5)
|
||||
|
||||
t.Logf("Looking up %v ", hash)
|
||||
|
||||
for key,msgs := range timeline.hashCache {
|
||||
t.Logf("%v %v", key, msgs)
|
||||
}
|
||||
|
||||
// check a real message..
|
||||
msgs, err := timeline.GetMessagesByHash(hash)
|
||||
if err != nil || len(msgs) != 1 {
|
||||
t.Fatalf("looking up message by hash %v should have not errored: %v", hash, err)
|
||||
} else if msgs[0].Message.Message != m5.Message {
|
||||
t.Fatalf("%v != %v", msgs[0].Message, m5.Message)
|
||||
}
|
||||
|
||||
// Check a non existed hash... error if there is no error
|
||||
_, err = timeline.GetMessagesByHash("not a real hash")
|
||||
if err == nil {
|
||||
t.Fatalf("looking up message by hash %v should have errored: %v", hash, err)
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue