Store Messages and Send when Online #553

Merged
sarah merged 2 commits from offline-messages into master 2024-04-16 18:35:02 +00:00
7 changed files with 83 additions and 18 deletions

View File

@ -363,6 +363,7 @@ func (app *application) LoadProfiles(password string) {
func (app *application) registerHooks(profile peer.CwtchPeer) {
// Register Hooks
profile.RegisterHook(extensions.ProfileValueExtension{})
profile.RegisterHook(extensions.SendWhenOnlineExtension{})
profile.RegisterHook(new(filesharing.Functionality))
profile.RegisterHook(new(filesharing.ImagePreviewsFunctionality))
profile.RegisterHook(new(servers.Functionality))

View File

@ -0,0 +1,66 @@
package extensions
import (
"strconv"
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/model"
"cwtch.im/cwtch/model/attr"
"cwtch.im/cwtch/model/constants"
"cwtch.im/cwtch/peer"
"cwtch.im/cwtch/protocol/connections"
"cwtch.im/cwtch/settings"
"git.openprivacy.ca/openprivacy/log"
)
// SendWhenOnlineExtension implements automatic sending
// Some Considerations:
// - There are race conditions inherant in this approach e.g. a peer could go offline just after recieving a message and never sending an ack
// - In that case the next time we connect we will send a duplicate message.
sarah marked this conversation as resolved
Review

dont messages be acked by the message signature? if the client receives a message with an identical signature... is it too hard to ask the message store for a convo if it already has a message with that sig? we're set up to get them to ack right?

dont messages be acked by the message signature? if the client receives a message with an identical signature... is it too hard to ask the message store for a convo if it already has a message with that sig? we're set up to get them to ack right?
// - Currently we do not include metadata like sent time in raw peer protocols (however Overlay does now have support for that information)
type SendWhenOnlineExtension struct {
}
func (soe SendWhenOnlineExtension) NotifySettingsUpdate(_ settings.GlobalSettings) {
}
func (soe SendWhenOnlineExtension) EventsToRegister() []event.Type {
return []event.Type{event.PeerStateChange}
}
func (soe SendWhenOnlineExtension) ExperimentsToRegister() []string {
return nil
}
func (soe SendWhenOnlineExtension) OnEvent(ev event.Event, profile peer.CwtchPeer) {
switch ev.EventType {
case event.PeerStateChange:
ci, err := profile.FetchConversationInfo(ev.Data["RemotePeer"])
if err == nil {
// if we have re-authenticated with thie peer then request their profile image...
if connections.ConnectionStateToType()[ev.Data[event.ConnectionState]] == connections.AUTHENTICATED {
// Check the last 100 messages, if any of them are pending, then send them now...
messsages, _ := profile.GetMostRecentMessages(ci.ID, 0, 0, uint(100))
dan marked this conversation as resolved
Review

what about a bool foundUnset and if we do resend a message, we set to true, and do another poll of n messages from the DB? they ... should all be bunched together at the top more or less so even polling in less than 100 should catch them but if someone went on one heck of a screed we should keep checking till we run out

what about a bool `foundUnset` and if we do resend a message, we set to true, and do another poll of n messages from the DB? they ... should all be bunched together at the top more or less so even polling in less than 100 should catch them but if someone went on one heck of a screed we should keep checking till we run out
for _, message := range messsages {
if message.Attr[constants.AttrAck] == constants.False {
body := message.Body
ev := event.NewEvent(event.SendMessageToPeer, map[event.Field]string{event.ConversationID: strconv.Itoa(ci.ID), event.RemotePeer: ci.Handle, event.Data: body})
ev.EventID = message.Signature // we need this ensure that we correctly ack this in the db when it comes back
// TODO: The EventBus is becoming very noisy...we may want to consider a one-way shortcut to Engine i.e. profile.Engine.SendMessageToPeer
sarah marked this conversation as resolved
Review

FWIW engine is the only thing subbed to this message? so its prlly fine?

FWIW engine is the only thing subbed to this message? so its prlly fine?
Review

i mean the event bus itself it becoming clogged, generally for forward feeding messages Peer->Engine we only have 1 sub so replacing with an actual function call relieves some of the pressure that we get.

i mean the event bus itself it becoming clogged, generally for forward feeding messages Peer->Engine we only have 1 sub so replacing with an actual function call relieves some of the pressure that we get.
log.Debugf("resending message that was sent when peer was offline")
profile.PublishEvent(ev)
}
}
}
}
}
}
// OnContactReceiveValue is nop for SendWhenOnnlineExtension
func (soe SendWhenOnlineExtension) OnContactReceiveValue(profile peer.CwtchPeer, conversation model.Conversation, szp attr.ScopedZonedPath, value string, exists bool) {
}
// OnContactRequestValue is nop for SendWhenOnnlineExtension
func (soe SendWhenOnlineExtension) OnContactRequestValue(profile peer.CwtchPeer, conversation model.Conversation, eventID string, szp attr.ScopedZonedPath) {
}

1
go.mod
View File

@ -16,7 +16,6 @@ require (
require (
filippo.io/edwards25519 v1.0.0 // indirect
git.openprivacy.ca/openprivacy/bine v0.0.5 // indirect
github.com/client9/misspell v0.3.4 // indirect
github.com/google/go-cmp v0.5.8 // indirect
github.com/gtank/merlin v0.1.1 // indirect
github.com/mimoo/StrobeGo v0.0.0-20220103164710-9a04d6ca976b // indirect

2
go.sum
View File

@ -8,8 +8,6 @@ git.openprivacy.ca/openprivacy/connectivity v1.11.0 h1:roASjaFtQLu+HdH5fa2wx6F00
git.openprivacy.ca/openprivacy/connectivity v1.11.0/go.mod h1:OQO1+7OIz/jLxDrorEMzvZA6SEbpbDyLGpjoFqT3z1Y=
git.openprivacy.ca/openprivacy/log v1.0.3 h1:E/PMm4LY+Q9s3aDpfySfEDq/vYQontlvNj/scrPaga0=
git.openprivacy.ca/openprivacy/log v1.0.3/go.mod h1:gGYK8xHtndRLDymFtmjkG26GaMQNgyhioNS82m812Iw=
github.com/client9/misspell v0.3.4 h1:ta993UF76GwbvJcIo3Y68y/M3WxlpEHPWIGDkJYwzJI=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=

View File

@ -1,11 +1,13 @@
package model
import (
"encoding/json"
"time"
"cwtch.im/cwtch/model/attr"
"cwtch.im/cwtch/model/constants"
"encoding/json"
"git.openprivacy.ca/openprivacy/connectivity/tor"
"git.openprivacy.ca/openprivacy/log"
"time"
)
// AccessControl is a type determining client assigned authorization to a peer
@ -99,6 +101,11 @@ func (ci *Conversation) GetPeerAC() AccessControl {
return DefaultP2PAccessControl()
}
// IsCwtchPeer is a helper attribute that identifies whether a conversation is a cwtch peer
func (ci *Conversation) IsCwtchPeer() bool {
return tor.IsValidHostname(ci.Handle)
}
// IsGroup is a helper attribute that identifies whether a conversation is a legacy group
func (ci *Conversation) IsGroup() bool {
if _, exists := ci.Attributes[attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.GroupID)).ToString()]; exists {

View File

@ -310,8 +310,8 @@ func (cp *cwtchPeer) GenerateProtocolEngine(acn connectivity.ACN, bus event.Mana
authorizations := make(map[string]model.Authorization)
for _, conversation := range conversations {
if tor.IsValidHostname(conversation.Handle) {
// Only perform the following actions for Peer-type Conversaions...
if conversation.IsCwtchPeer() {
sarah marked this conversation as resolved
Review

why this change?

why this change?
Review

alignment with conversation.IsGroup conversation.IsServer / removing the tor specific hostname logic from cwtch checks.

alignment with conversation.IsGroup conversation.IsServer / removing the tor specific hostname logic from cwtch checks.
// if this profile does not have an ACL version, and the profile is accepted (OR the acl version is v1 and the profile is accepted...)
// then migrate the permissions to the v2 ACL
// migrate the old accepted AC to a new fine-grained one
@ -1683,6 +1683,7 @@ func (cp *cwtchPeer) eventHandler() {
timestamp := time.Now().Format(time.RFC3339Nano)
cp.SetConversationAttribute(cid, attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.AttrLastConnectionTime)), timestamp)
} else if connections.ConnectionStateToType()[ev.Data[event.ConnectionState]] == connections.DISCONNECTED {
ci, err := cp.FetchConversationInfo(handle)
if err == nil {

View File

@ -58,7 +58,7 @@ func TestFileSharing(t *testing.T) {
os.RemoveAll("cwtch.out.png")
os.RemoveAll("cwtch.out.png.manifest")
log.SetLevel(log.LevelInfo)
log.SetLevel(log.LevelDebug)
log.ExcludeFromPattern("tapir")
os.Mkdir("tordir", 0700)
@ -151,13 +151,6 @@ func TestFileSharing(t *testing.T) {
bob.NewContactConversation(alice.GetOnion(), model.DefaultP2PAccessControl(), true)
alice.NewContactConversation(bob.GetOnion(), model.DefaultP2PAccessControl(), true)
alice.PeerWithOnion(bob.GetOnion())
t.Logf("Waiting for alice and Bob to peer...")
waitForPeerPeerConnection(t, alice, bob)
alice.AcceptConversation(1)
t.Logf("Alice and Bob are Connected!!")
filesharingFunctionality := filesharing.FunctionalityGate()
@ -167,10 +160,10 @@ func TestFileSharing(t *testing.T) {
}
alice.SendMessage(1, fileSharingMessage)
bob.AcceptConversation(1)
// Wait for the messages to arrive...
time.Sleep(time.Second * 10)
// Ok this is fun...we just Sent a Message we may not have a connection yet...
// so this test will only pass if sending offline works...
waitForPeerPeerConnection(t, bob, alice)
bob.SendMessage(1, "this is a test message")
bob.SendMessage(1, "this is another test message")