forked from cwtch.im/cwtch
Allow Updating of Message Flags
This commit is contained in:
parent
d5024e2bd3
commit
c2ab6af7b8
|
@ -110,6 +110,12 @@ const (
|
|||
// RemotePeer: The peer associated with the acknowledgement
|
||||
IndexedFailure = Type("IndexedFailure")
|
||||
|
||||
// UpdateMessageFlags will change the flags associated with a given message.
|
||||
// Handle
|
||||
// Message Index
|
||||
// Flags
|
||||
UpdateMessageFlags = Type("UpdateMessageFlags")
|
||||
|
||||
// attributes:
|
||||
// RemotePeer: [eg "chpr7qm6op5vfcg2pi4vllco3h6aa7exexc4rqwnlupqhoogx2zgd6qd"]
|
||||
// Error: string describing the error
|
||||
|
@ -287,6 +293,12 @@ const (
|
|||
EventContext = Field("EventContext")
|
||||
Index = Field("Index")
|
||||
|
||||
// Handle denotes a contact handle of any type.
|
||||
Handle = Field("Handle")
|
||||
|
||||
// Flags denotes a set of message flags
|
||||
Flags = Field("Flags")
|
||||
|
||||
Authorization = Field("Authorization")
|
||||
|
||||
KeyBundle = Field("KeyBundle")
|
||||
|
|
|
@ -26,6 +26,8 @@ type Message struct {
|
|||
ReceivedByServer bool // messages sent to a server
|
||||
Acknowledged bool // peer to peer
|
||||
Error string `json:",omitempty"`
|
||||
// Application specific flags, useful for storing small amounts of metadata
|
||||
Flags uint64
|
||||
}
|
||||
|
||||
// MessageBaseSize is a rough estimate of the base number of bytes the struct uses before strings are populated
|
||||
|
|
|
@ -124,6 +124,21 @@ func (p *Profile) AddContact(onion string, profile *PublicProfile) {
|
|||
p.lock.Unlock()
|
||||
}
|
||||
|
||||
// UpdateMessageFlags updates the flags stored with a message
|
||||
func (p *Profile) UpdateMessageFlags(handle string, mIdx int, flags uint64) {
|
||||
p.lock.Lock()
|
||||
defer p.lock.Unlock()
|
||||
if contact, exists := p.Contacts[handle]; exists {
|
||||
if len(contact.Timeline.Messages) > mIdx {
|
||||
contact.Timeline.Messages[mIdx].Flags = flags
|
||||
}
|
||||
} else if group, exists := p.Groups[handle]; exists {
|
||||
if len(group.Timeline.Messages) > mIdx {
|
||||
contact.Timeline.Messages[mIdx].Flags = flags
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// DeleteContact deletes a peer contact
|
||||
func (p *Profile) DeleteContact(onion string) {
|
||||
p.lock.Lock()
|
||||
|
|
|
@ -48,6 +48,14 @@ type cwtchPeer struct {
|
|||
eventBus event.Manager
|
||||
}
|
||||
|
||||
func (cp *cwtchPeer) UpdateMessageFlags(handle string, mIdx int, flags uint64) {
|
||||
cp.mutex.Lock()
|
||||
defer cp.mutex.Unlock()
|
||||
log.Debugf("Updating Flags for %v %v %v", handle, mIdx, flags)
|
||||
cp.Profile.UpdateMessageFlags(handle, mIdx, flags)
|
||||
cp.eventBus.Publish(event.NewEvent(event.UpdateMessageFlags, map[event.Field]string{event.Handle: handle, event.Index: strconv.Itoa(mIdx), event.Flags: strconv.FormatUint(flags, 2)}))
|
||||
}
|
||||
|
||||
// BlockUnknownConnections will auto disconnect from connections if authentication doesn't resolve a hostname
|
||||
// known to peer.
|
||||
func (cp *cwtchPeer) BlockUnknownConnections() {
|
||||
|
@ -143,6 +151,11 @@ type SendMessagesToGroup interface {
|
|||
SendMessageToGroupTracked(string, string) (string, error)
|
||||
}
|
||||
|
||||
// ModifyMessages enables a caller to modify the messages in a timline
|
||||
type ModifyMessages interface {
|
||||
UpdateMessageFlags(string, int, uint64)
|
||||
}
|
||||
|
||||
// CwtchPeer provides us with a way of testing systems built on top of cwtch without having to
|
||||
// directly implement a cwtchPeer.
|
||||
type CwtchPeer interface {
|
||||
|
@ -174,6 +187,7 @@ type CwtchPeer interface {
|
|||
ModifyServers
|
||||
|
||||
SendMessages
|
||||
ModifyMessages
|
||||
SendMessagesToGroup
|
||||
}
|
||||
|
||||
|
|
|
@ -9,6 +9,7 @@ import (
|
|||
"io/ioutil"
|
||||
"os"
|
||||
"path"
|
||||
"strconv"
|
||||
"time"
|
||||
)
|
||||
|
||||
|
@ -86,6 +87,7 @@ func (ps *ProfileStoreV1) initProfileWriterStore() {
|
|||
ps.eventManager.Subscribe(event.DeleteContact, ps.queue)
|
||||
ps.eventManager.Subscribe(event.DeleteGroup, ps.queue)
|
||||
ps.eventManager.Subscribe(event.ChangePassword, ps.queue)
|
||||
ps.eventManager.Subscribe(event.UpdateMessageFlags, ps.queue)
|
||||
}
|
||||
|
||||
// LoadProfileWriterStore loads a profile store from filestore listening for events and saving them
|
||||
|
@ -243,9 +245,26 @@ func (ps *ProfileStoreV1) save() error {
|
|||
bytes, _ := json.Marshal(ps.profile)
|
||||
return ps.fs.Write(bytes)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ps *ProfileStoreV1) regenStreamStore(messages []model.Message, contact string) {
|
||||
oldss := ps.streamStores[contact]
|
||||
newLocalID := model.GenerateRandomID()
|
||||
newSS := NewStreamStore(ps.directory, newLocalID, ps.key)
|
||||
newSS.WriteN(messages)
|
||||
if len(contact) == groupIDLen {
|
||||
ps.profile.Groups[contact].LocalID = newLocalID
|
||||
} else {
|
||||
// We can assume this exists as regen stream store should only happen to *update* a message
|
||||
ps.profile.Contacts[contact].LocalID = newLocalID
|
||||
}
|
||||
ps.streamStores[contact] = newSS
|
||||
ps.save()
|
||||
oldss.Delete()
|
||||
}
|
||||
|
||||
// load instantiates a cwtchPeer from the file store
|
||||
func (ps *ProfileStoreV1) load() error {
|
||||
decrypted, err := ps.fs.Read()
|
||||
|
@ -394,6 +413,7 @@ func (ps *ProfileStoreV1) eventHandler() {
|
|||
// stream store doesn't support updates, so we don't want to commit it till ack'd
|
||||
ps.profile.AddSentMessageToContactTimeline(ev.Data[event.RemotePeer], ev.Data[event.Data], time.Now(), ev.EventID)
|
||||
case event.NewMessageFromPeer:
|
||||
ps.profile.AddMessageToContactTimeline(ev.Data[event.RemotePeer], ev.Data[event.Data], time.Now())
|
||||
ps.attemptSavePeerMessage(ev.Data[event.RemotePeer], ev.Data[event.Data], ev.Data[event.TimestampReceived], true)
|
||||
case event.PeerAcknowledgement:
|
||||
onion := ev.Data[event.RemotePeer]
|
||||
|
@ -416,6 +436,8 @@ func (ps *ProfileStoreV1) eventHandler() {
|
|||
message := model.Message{Received: received, Timestamp: sent, Message: ev.Data[event.Data], PeerID: ev.Data[event.RemotePeer], Signature: sig, PreviousMessageSig: prevsig, Acknowledged: true}
|
||||
ss, exists := ps.streamStores[groupid]
|
||||
if exists {
|
||||
// We need to store a local copy of the message...
|
||||
ps.profile.GetGroup(groupid).Timeline.Insert(&message)
|
||||
ss.Write(message)
|
||||
} else {
|
||||
log.Errorf("error storing new group message: %v stream store does not exist", ev)
|
||||
|
@ -452,6 +474,29 @@ func (ps *ProfileStoreV1) eventHandler() {
|
|||
oldpass := ev.Data[event.Password]
|
||||
newpass := ev.Data[event.NewPassword]
|
||||
ps.ChangePassword(oldpass, newpass, ev.EventID)
|
||||
case event.UpdateMessageFlags:
|
||||
handle := ev.Data[event.Handle]
|
||||
mIx, err := strconv.Atoi(ev.Data[event.Index])
|
||||
if err != nil {
|
||||
log.Errorf("Invalid Message Index: %v", err)
|
||||
return
|
||||
}
|
||||
flags, err := strconv.ParseUint(ev.Data[event.Flags], 2, 64)
|
||||
if err != nil {
|
||||
log.Errorf("Invalid Message Falgs: %v", err)
|
||||
return
|
||||
}
|
||||
ps.profile.UpdateMessageFlags(handle, mIx, flags)
|
||||
if len(handle) == groupIDLen {
|
||||
ps.regenStreamStore(ps.profile.GetGroup(handle).Timeline.Messages, handle)
|
||||
} else if contact, exists := ps.profile.GetContact(handle); exists {
|
||||
if exists {
|
||||
val, _ := contact.GetAttribute(event.SaveHistoryKey)
|
||||
if val == event.SaveHistoryConfirmed {
|
||||
ps.regenStreamStore(contact.Timeline.Messages, handle)
|
||||
}
|
||||
}
|
||||
}
|
||||
default:
|
||||
return
|
||||
}
|
||||
|
@ -472,7 +517,6 @@ func (ps *ProfileStoreV1) attemptSavePeerMessage(peerID, messageData, timestampe
|
|||
peerID := peerID
|
||||
var received time.Time
|
||||
var message model.Message
|
||||
|
||||
if fromPeer {
|
||||
received, _ = time.Parse(time.RFC3339Nano, timestampeReceived)
|
||||
message = model.Message{Received: received, Timestamp: received, Message: messageData, PeerID: peerID, Signature: []byte{}, PreviousMessageSig: []byte{}}
|
||||
|
|
Loading…
Reference in New Issue