diff --git a/event/common.go b/event/common.go index 57cc35e..7e75f43 100644 --- a/event/common.go +++ b/event/common.go @@ -110,6 +110,12 @@ const ( // RemotePeer: The peer associated with the acknowledgement IndexedFailure = Type("IndexedFailure") + // UpdateMessageFlags will change the flags associated with a given message. + // Handle + // Message Index + // Flags + UpdateMessageFlags = Type("UpdateMessageFlags") + // attributes: // RemotePeer: [eg "chpr7qm6op5vfcg2pi4vllco3h6aa7exexc4rqwnlupqhoogx2zgd6qd"] // Error: string describing the error @@ -287,6 +293,12 @@ const ( EventContext = Field("EventContext") Index = Field("Index") + // Handle denotes a contact handle of any type. + Handle = Field("Handle") + + // Flags denotes a set of message flags + Flags = Field("Flags") + Authorization = Field("Authorization") KeyBundle = Field("KeyBundle") diff --git a/go.mod b/go.mod index b803206..579fb67 100644 --- a/go.mod +++ b/go.mod @@ -3,8 +3,8 @@ module cwtch.im/cwtch go 1.14 require ( - git.openprivacy.ca/cwtch.im/tapir v0.4.2 - git.openprivacy.ca/openprivacy/connectivity v1.4.3 + git.openprivacy.ca/cwtch.im/tapir v0.4.3 + git.openprivacy.ca/openprivacy/connectivity v1.4.4 git.openprivacy.ca/openprivacy/log v1.0.2 github.com/gtank/ristretto255 v0.1.2 github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect diff --git a/go.sum b/go.sum index 161160b..71566fb 100644 --- a/go.sum +++ b/go.sum @@ -14,6 +14,8 @@ git.openprivacy.ca/cwtch.im/tapir v0.4.1 h1:9LMpQX41IzecNNlRc1FZKXHg6wlFss679tFs git.openprivacy.ca/cwtch.im/tapir v0.4.1/go.mod h1:eH6dZxXrhW0C4KZX18ksUa6XJCrEvtg8cJJ/Fy6gv+E= git.openprivacy.ca/cwtch.im/tapir v0.4.2 h1:bxMWZnVJXX4dqqOFS7ELW4iFkVL4GS8wiRkjRv5rJe8= git.openprivacy.ca/cwtch.im/tapir v0.4.2/go.mod h1:eH6dZxXrhW0C4KZX18ksUa6XJCrEvtg8cJJ/Fy6gv+E= +git.openprivacy.ca/cwtch.im/tapir v0.4.3 h1:sctSfUXHDIqaHfJPDl+5lHtmoEJolQiHTcHZGAe5Qc4= +git.openprivacy.ca/cwtch.im/tapir v0.4.3/go.mod h1:10qEaib5x021zgyZ/97JKWsEpedH5+Vfy2CvB2V+08E= git.openprivacy.ca/openprivacy/bine v0.0.4 h1:CO7EkGyz+jegZ4ap8g5NWRuDHA/56KKvGySR6OBPW+c= git.openprivacy.ca/openprivacy/bine v0.0.4/go.mod h1:13ZqhKyqakDsN/ZkQkIGNULsmLyqtXc46XBcnuXm/mU= git.openprivacy.ca/openprivacy/connectivity v1.4.0 h1:c7AANUCrlA4hIqXxIGDOWMtSe8CpDleD1877PShScbM= @@ -24,6 +26,8 @@ git.openprivacy.ca/openprivacy/connectivity v1.4.2 h1:rQFIjWunLlRmXL5Efsv+7+1cA7 git.openprivacy.ca/openprivacy/connectivity v1.4.2/go.mod h1:bR0Myx9nm2YzWtsThRelkNMV4Pp7sPDa123O1qsAbVo= git.openprivacy.ca/openprivacy/connectivity v1.4.3 h1:i2Ad/U9FlL9dKr2bhRck7lJ8NoWyGtoEfUwoCyMT0fU= git.openprivacy.ca/openprivacy/connectivity v1.4.3/go.mod h1:bR0Myx9nm2YzWtsThRelkNMV4Pp7sPDa123O1qsAbVo= +git.openprivacy.ca/openprivacy/connectivity v1.4.4 h1:11M3akVCyy/luuhMpZTM1r9Jayl7IHD944Bxsn2FDpU= +git.openprivacy.ca/openprivacy/connectivity v1.4.4/go.mod h1:JVRCIdL+lAG6ohBFWiKeC/MN42nnC0sfFszR9XG6vPQ= git.openprivacy.ca/openprivacy/log v1.0.1/go.mod h1:gGYK8xHtndRLDymFtmjkG26GaMQNgyhioNS82m812Iw= git.openprivacy.ca/openprivacy/log v1.0.2 h1:HLP4wsw4ljczFAelYnbObIs821z+jgMPCe8uODPnGQM= git.openprivacy.ca/openprivacy/log v1.0.2/go.mod h1:gGYK8xHtndRLDymFtmjkG26GaMQNgyhioNS82m812Iw= @@ -52,6 +56,7 @@ github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/struCoder/pidusage v0.1.3 h1:pZcSa6asBE38TJtW0Nui6GeCjLTpaT/jAnNP7dUTLSQ= github.com/struCoder/pidusage v0.1.3/go.mod h1:pWBlW3YuSwRl6h7R5KbvA4N8oOqe9LjaKW5CwT1SPjI= +github.com/yuin/goldmark v1.3.5 h1:dPmz1Snjq0kmkz159iL7S6WzdahUTHnHB5M56WFVifs= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= go.etcd.io/bbolt v1.3.4 h1:hi1bXHMVrlQh6WwxAy+qZCV/SYIlqo+Ushwdpa4tAKg= go.etcd.io/bbolt v1.3.4/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ= @@ -63,6 +68,7 @@ golang.org/x/crypto v0.0.0-20201012173705-84dcc777aaee/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 h1:VLliZ0d+/avPrXXH+OakdXhpJuEoBZuwh1m2j7U6Iug= golang.org/x/lint v0.0.0-20210508222113-6edffad5e616/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY= golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= +golang.org/x/mod v0.4.2 h1:Gz96sIWK3OalVv/I/qNygP42zyoKp3xptRVCWRFEBvo= golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= @@ -70,6 +76,7 @@ golang.org/x/net v0.0.0-20201010224723-4f7140c49acb/go.mod h1:sp8m0HH+o8qH0wwXwY golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4 h1:4nGaVu0QrbjT/AK2PRLuQfQuh6DJve+pELhqTdAj3x0= golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -78,6 +85,7 @@ golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44 h1:Bli41pIlzTzf3KEY06n+xnzK/BESIg2ze4Pgfh/aI8c= golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210510120138-977fb7262007 h1:gG67DSER+11cZvqIMb8S8bt0vZtiN6xWYARwirrOSfE= golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1 h1:v+OssWQX+hTHEmOBgwxdZxK4zHq3yOs8F9J7mk0PY8E= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= @@ -92,6 +100,7 @@ golang.org/x/tools v0.1.2 h1:kRBLX7v7Af8W7Gdbbc908OJcdgtK8bOz9Uaj8/F1ACA= golang.org/x/tools v0.1.2/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/model/message.go b/model/message.go index 6d8347d..1d62c7c 100644 --- a/model/message.go +++ b/model/message.go @@ -26,6 +26,8 @@ type Message struct { ReceivedByServer bool // messages sent to a server Acknowledged bool // peer to peer Error string `json:",omitempty"` + // Application specific flags, useful for storing small amounts of metadata + Flags uint64 } // MessageBaseSize is a rough estimate of the base number of bytes the struct uses before strings are populated diff --git a/model/profile.go b/model/profile.go index ae93a0f..d33704b 100644 --- a/model/profile.go +++ b/model/profile.go @@ -124,6 +124,21 @@ func (p *Profile) AddContact(onion string, profile *PublicProfile) { p.lock.Unlock() } +// UpdateMessageFlags updates the flags stored with a message +func (p *Profile) UpdateMessageFlags(handle string, mIdx int, flags uint64) { + p.lock.Lock() + defer p.lock.Unlock() + if contact, exists := p.Contacts[handle]; exists { + if len(contact.Timeline.Messages) > mIdx { + contact.Timeline.Messages[mIdx].Flags = flags + } + } else if group, exists := p.Groups[handle]; exists { + if len(group.Timeline.Messages) > mIdx { + contact.Timeline.Messages[mIdx].Flags = flags + } + } +} + // DeleteContact deletes a peer contact func (p *Profile) DeleteContact(onion string) { p.lock.Lock() diff --git a/peer/cwtch_peer.go b/peer/cwtch_peer.go index 25213fd..2dab063 100644 --- a/peer/cwtch_peer.go +++ b/peer/cwtch_peer.go @@ -48,6 +48,14 @@ type cwtchPeer struct { eventBus event.Manager } +func (cp *cwtchPeer) UpdateMessageFlags(handle string, mIdx int, flags uint64) { + cp.mutex.Lock() + defer cp.mutex.Unlock() + log.Debugf("Updating Flags for %v %v %v", handle, mIdx, flags) + cp.Profile.UpdateMessageFlags(handle, mIdx, flags) + cp.eventBus.Publish(event.NewEvent(event.UpdateMessageFlags, map[event.Field]string{event.Handle: handle, event.Index: strconv.Itoa(mIdx), event.Flags: strconv.FormatUint(flags, 2)})) +} + // BlockUnknownConnections will auto disconnect from connections if authentication doesn't resolve a hostname // known to peer. func (cp *cwtchPeer) BlockUnknownConnections() { @@ -143,6 +151,11 @@ type SendMessagesToGroup interface { SendMessageToGroupTracked(string, string) (string, error) } +// ModifyMessages enables a caller to modify the messages in a timline +type ModifyMessages interface { + UpdateMessageFlags(string, int, uint64) +} + // CwtchPeer provides us with a way of testing systems built on top of cwtch without having to // directly implement a cwtchPeer. type CwtchPeer interface { @@ -174,6 +187,7 @@ type CwtchPeer interface { ModifyServers SendMessages + ModifyMessages SendMessagesToGroup } diff --git a/storage/v1/profile_store.go b/storage/v1/profile_store.go index 400ab35..5e7f607 100644 --- a/storage/v1/profile_store.go +++ b/storage/v1/profile_store.go @@ -9,6 +9,7 @@ import ( "io/ioutil" "os" "path" + "strconv" "time" ) @@ -86,6 +87,7 @@ func (ps *ProfileStoreV1) initProfileWriterStore() { ps.eventManager.Subscribe(event.DeleteContact, ps.queue) ps.eventManager.Subscribe(event.DeleteGroup, ps.queue) ps.eventManager.Subscribe(event.ChangePassword, ps.queue) + ps.eventManager.Subscribe(event.UpdateMessageFlags, ps.queue) } // LoadProfileWriterStore loads a profile store from filestore listening for events and saving them @@ -243,9 +245,26 @@ func (ps *ProfileStoreV1) save() error { bytes, _ := json.Marshal(ps.profile) return ps.fs.Write(bytes) } + return nil } +func (ps *ProfileStoreV1) regenStreamStore(messages []model.Message, contact string) { + oldss := ps.streamStores[contact] + newLocalID := model.GenerateRandomID() + newSS := NewStreamStore(ps.directory, newLocalID, ps.key) + newSS.WriteN(messages) + if len(contact) == groupIDLen { + ps.profile.Groups[contact].LocalID = newLocalID + } else { + // We can assume this exists as regen stream store should only happen to *update* a message + ps.profile.Contacts[contact].LocalID = newLocalID + } + ps.streamStores[contact] = newSS + ps.save() + oldss.Delete() +} + // load instantiates a cwtchPeer from the file store func (ps *ProfileStoreV1) load() error { decrypted, err := ps.fs.Read() @@ -394,6 +413,7 @@ func (ps *ProfileStoreV1) eventHandler() { // stream store doesn't support updates, so we don't want to commit it till ack'd ps.profile.AddSentMessageToContactTimeline(ev.Data[event.RemotePeer], ev.Data[event.Data], time.Now(), ev.EventID) case event.NewMessageFromPeer: + ps.profile.AddMessageToContactTimeline(ev.Data[event.RemotePeer], ev.Data[event.Data], time.Now()) ps.attemptSavePeerMessage(ev.Data[event.RemotePeer], ev.Data[event.Data], ev.Data[event.TimestampReceived], true) case event.PeerAcknowledgement: onion := ev.Data[event.RemotePeer] @@ -416,6 +436,8 @@ func (ps *ProfileStoreV1) eventHandler() { message := model.Message{Received: received, Timestamp: sent, Message: ev.Data[event.Data], PeerID: ev.Data[event.RemotePeer], Signature: sig, PreviousMessageSig: prevsig, Acknowledged: true} ss, exists := ps.streamStores[groupid] if exists { + // We need to store a local copy of the message... + ps.profile.GetGroup(groupid).Timeline.Insert(&message) ss.Write(message) } else { log.Errorf("error storing new group message: %v stream store does not exist", ev) @@ -452,6 +474,29 @@ func (ps *ProfileStoreV1) eventHandler() { oldpass := ev.Data[event.Password] newpass := ev.Data[event.NewPassword] ps.ChangePassword(oldpass, newpass, ev.EventID) + case event.UpdateMessageFlags: + handle := ev.Data[event.Handle] + mIx, err := strconv.Atoi(ev.Data[event.Index]) + if err != nil { + log.Errorf("Invalid Message Index: %v", err) + return + } + flags, err := strconv.ParseUint(ev.Data[event.Flags], 2, 64) + if err != nil { + log.Errorf("Invalid Message Falgs: %v", err) + return + } + ps.profile.UpdateMessageFlags(handle, mIx, flags) + if len(handle) == groupIDLen { + ps.regenStreamStore(ps.profile.GetGroup(handle).Timeline.Messages, handle) + } else if contact, exists := ps.profile.GetContact(handle); exists { + if exists { + val, _ := contact.GetAttribute(event.SaveHistoryKey) + if val == event.SaveHistoryConfirmed { + ps.regenStreamStore(contact.Timeline.Messages, handle) + } + } + } default: return } @@ -472,7 +517,6 @@ func (ps *ProfileStoreV1) attemptSavePeerMessage(peerID, messageData, timestampe peerID := peerID var received time.Time var message model.Message - if fromPeer { received, _ = time.Parse(time.RFC3339Nano, timestampeReceived) message = model.Message{Received: received, Timestamp: received, Message: messageData, PeerID: peerID, Signature: []byte{}, PreviousMessageSig: []byte{}}