Store Messages and Send when Online
continuous-integration/drone/pr Build is passing Details

This commit is contained in:
Sarah Jamie Lewis 2024-02-26 13:16:31 -08:00
parent cd918c02ea
commit 89aca91b37
Signed by: sarah
GPG Key ID: F27FD21A270837EF
7 changed files with 82 additions and 17 deletions

View File

@ -363,6 +363,7 @@ func (app *application) LoadProfiles(password string) {
func (app *application) registerHooks(profile peer.CwtchPeer) {
// Register Hooks
profile.RegisterHook(extensions.ProfileValueExtension{})
profile.RegisterHook(extensions.SendWhenOnlineExtension{})
profile.RegisterHook(new(filesharing.Functionality))
profile.RegisterHook(new(filesharing.ImagePreviewsFunctionality))
profile.RegisterHook(new(servers.Functionality))

View File

@ -0,0 +1,66 @@
package extensions
import (
"strconv"
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/model"
"cwtch.im/cwtch/model/attr"
"cwtch.im/cwtch/model/constants"
"cwtch.im/cwtch/peer"
"cwtch.im/cwtch/protocol/connections"
"cwtch.im/cwtch/settings"
"git.openprivacy.ca/openprivacy/log"
)
// SendWhenOnlineExtension implements automatic sending
// Some Considerations:
// - There are race conditions inherant in this approach e.g. a peer could go offline just after recieving a message and never sending an ack
// - In that case the next time we connect we will send a duplicate message.
// - Currently we do not include metadata like sent time in raw peer protocols (however Overlay does now have support for that information)
type SendWhenOnlineExtension struct {
}
func (soe SendWhenOnlineExtension) NotifySettingsUpdate(_ settings.GlobalSettings) {
}
func (soe SendWhenOnlineExtension) EventsToRegister() []event.Type {
return []event.Type{event.PeerStateChange}
}
func (soe SendWhenOnlineExtension) ExperimentsToRegister() []string {
return nil
}
func (soe SendWhenOnlineExtension) OnEvent(ev event.Event, profile peer.CwtchPeer) {
switch ev.EventType {
case event.PeerStateChange:
ci, err := profile.FetchConversationInfo(ev.Data["RemotePeer"])
if err == nil {
// if we have re-authenticated with thie peer then request their profile image...
if connections.ConnectionStateToType()[ev.Data[event.ConnectionState]] == connections.AUTHENTICATED {
// Check the last 100 messages, if any of them are pending, then send them now...
messsages, _ := profile.GetMostRecentMessages(ci.ID, 0, 0, uint(100))
for _, message := range messsages {
if message.Attr[constants.AttrAck] == constants.False {
body := message.Body
ev := event.NewEvent(event.SendMessageToPeer, map[event.Field]string{event.ConversationID: strconv.Itoa(ci.ID), event.RemotePeer: ci.Handle, event.Data: body})
ev.EventID = message.Signature // we need this ensure that we correctly ack this in the db when it comes back
// TODO: The EventBus is becoming very noisy...we may want to consider a one-way shortcut to Engine i.e. profile.Engine.SendMessageToPeer
log.Infof("resending message that was sent when peer was offline")
profile.PublishEvent(ev)
}
}
}
}
}
}
// OnContactReceiveValue is nop for SendWhenOnnlineExtension
func (soe SendWhenOnlineExtension) OnContactReceiveValue(profile peer.CwtchPeer, conversation model.Conversation, szp attr.ScopedZonedPath, value string, exists bool) {
}
// OnContactRequestValue is nop for SendWhenOnnlineExtension
func (soe SendWhenOnlineExtension) OnContactRequestValue(profile peer.CwtchPeer, conversation model.Conversation, eventID string, szp attr.ScopedZonedPath) {
}

1
go.mod
View File

@ -16,7 +16,6 @@ require (
require (
filippo.io/edwards25519 v1.0.0 // indirect
git.openprivacy.ca/openprivacy/bine v0.0.5 // indirect
github.com/client9/misspell v0.3.4 // indirect
github.com/google/go-cmp v0.5.8 // indirect
github.com/gtank/merlin v0.1.1 // indirect
github.com/mimoo/StrobeGo v0.0.0-20220103164710-9a04d6ca976b // indirect

2
go.sum
View File

@ -8,8 +8,6 @@ git.openprivacy.ca/openprivacy/connectivity v1.11.0 h1:roASjaFtQLu+HdH5fa2wx6F00
git.openprivacy.ca/openprivacy/connectivity v1.11.0/go.mod h1:OQO1+7OIz/jLxDrorEMzvZA6SEbpbDyLGpjoFqT3z1Y=
git.openprivacy.ca/openprivacy/log v1.0.3 h1:E/PMm4LY+Q9s3aDpfySfEDq/vYQontlvNj/scrPaga0=
git.openprivacy.ca/openprivacy/log v1.0.3/go.mod h1:gGYK8xHtndRLDymFtmjkG26GaMQNgyhioNS82m812Iw=
github.com/client9/misspell v0.3.4 h1:ta993UF76GwbvJcIo3Y68y/M3WxlpEHPWIGDkJYwzJI=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=

View File

@ -1,11 +1,13 @@
package model
import (
"encoding/json"
"time"
"cwtch.im/cwtch/model/attr"
"cwtch.im/cwtch/model/constants"
"encoding/json"
"git.openprivacy.ca/openprivacy/connectivity/tor"
"git.openprivacy.ca/openprivacy/log"
"time"
)
// AccessControl is a type determining client assigned authorization to a peer
@ -99,6 +101,11 @@ func (ci *Conversation) GetPeerAC() AccessControl {
return DefaultP2PAccessControl()
}
// IsCwtchPeer is a helper attribute that identifies whether a conversation is a cwtch peer
func (ci *Conversation) IsCwtchPeer() bool {
return tor.IsValidHostname(ci.Handle)
}
// IsGroup is a helper attribute that identifies whether a conversation is a legacy group
func (ci *Conversation) IsGroup() bool {
if _, exists := ci.Attributes[attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.GroupID)).ToString()]; exists {

View File

@ -310,8 +310,8 @@ func (cp *cwtchPeer) GenerateProtocolEngine(acn connectivity.ACN, bus event.Mana
authorizations := make(map[string]model.Authorization)
for _, conversation := range conversations {
if tor.IsValidHostname(conversation.Handle) {
// Only perform the following actions for Peer-type Conversaions...
if conversation.IsCwtchPeer() {
// if this profile does not have an ACL version, and the profile is accepted (OR the acl version is v1 and the profile is accepted...)
// then migrate the permissions to the v2 ACL
// migrate the old accepted AC to a new fine-grained one
@ -1683,6 +1683,7 @@ func (cp *cwtchPeer) eventHandler() {
timestamp := time.Now().Format(time.RFC3339Nano)
cp.SetConversationAttribute(cid, attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.AttrLastConnectionTime)), timestamp)
} else if connections.ConnectionStateToType()[ev.Data[event.ConnectionState]] == connections.DISCONNECTED {
ci, err := cp.FetchConversationInfo(handle)
if err == nil {

View File

@ -151,13 +151,6 @@ func TestFileSharing(t *testing.T) {
bob.NewContactConversation(alice.GetOnion(), model.DefaultP2PAccessControl(), true)
alice.NewContactConversation(bob.GetOnion(), model.DefaultP2PAccessControl(), true)
alice.PeerWithOnion(bob.GetOnion())
t.Logf("Waiting for alice and Bob to peer...")
waitForPeerPeerConnection(t, alice, bob)
alice.AcceptConversation(1)
t.Logf("Alice and Bob are Connected!!")
filesharingFunctionality := filesharing.FunctionalityGate()
@ -167,10 +160,10 @@ func TestFileSharing(t *testing.T) {
}
alice.SendMessage(1, fileSharingMessage)
bob.AcceptConversation(1)
// Wait for the messages to arrive...
time.Sleep(time.Second * 10)
// Ok this is fun...we just Sent a Message we may not have a connection yet...
// so this test will only pass if sending offline works...
waitForPeerPeerConnection(t, bob, alice)
bob.SendMessage(1, "this is a test message")
bob.SendMessage(1, "this is another test message")