From 4588cbc604d7d7cc73b864bab82f4ec862adafe9 Mon Sep 17 00:00:00 2001 From: Sarah Jamie Lewis Date: Mon, 3 Apr 2023 12:44:28 -0700 Subject: [PATCH 1/4] Support Profile Status and Profile Attributes. Auto Fetch Updates on a Heartbeat. Move Profile Image Download Checks to Cwtch --- app/app.go | 1 + app/plugins/heartbeat.go | 47 +++++++++++++++++++ app/plugins/plugin.go | 3 ++ event/common.go | 3 ++ extensions/profile_value.go | 33 +++++++++++-- functionality/filesharing/image_previews.go | 11 ++++- model/constants/attributes.go | 5 ++ model/constants/experiments.go | 3 ++ peer/cwtch_peer.go | 16 +++++++ settings/settings.go | 2 + testing/cwtch_peer_server_integration_test.go | 6 +-- 11 files changed, 123 insertions(+), 7 deletions(-) create mode 100644 app/plugins/heartbeat.go diff --git a/app/app.go b/app/app.go index 871b32a..d49579f 100644 --- a/app/app.go +++ b/app/app.go @@ -221,6 +221,7 @@ func (app *application) setupPeer(profile peer.CwtchPeer) { // Register the Peer With Application Plugins.. app.AddPeerPlugin(profile.GetOnion(), plugins.CONNECTIONRETRY) // Now Mandatory + app.AddPeerPlugin(profile.GetOnion(), plugins.HEARTBEAT) // Now Mandatory } diff --git a/app/plugins/heartbeat.go b/app/plugins/heartbeat.go new file mode 100644 index 0000000..9932497 --- /dev/null +++ b/app/plugins/heartbeat.go @@ -0,0 +1,47 @@ +package plugins + +import ( + "cwtch.im/cwtch/event" + "git.openprivacy.ca/openprivacy/log" + "time" +) + +const heartbeatTickTime = 60 * time.Second + +type heartbeat struct { + bus event.Manager + queue event.Queue + breakChan chan bool +} + +func (a *heartbeat) Start() { + go a.run() +} + +func (cr *heartbeat) Id() PluginID { + return HEARTBEAT +} + +func (a heartbeat) Shutdown() { + a.breakChan <- true +} + +func (a *heartbeat) run() { + log.Debugf("running heartbeat trigger plugin") + for { + select { + case <-time.After(heartbeatTickTime): + // no fuss, just trigger the beat. + a.bus.Publish(event.NewEvent(event.Heartbeat, map[event.Field]string{})) + continue + case <-a.breakChan: + return + } + } +} + +// NewHeartbeat returns a Plugin that when started will trigger heartbeat checks on a regular interval +func NewHeartbeat(bus event.Manager) Plugin { + cr := &heartbeat{bus: bus, queue: event.NewQueue(), breakChan: make(chan bool, 1)} + return cr +} diff --git a/app/plugins/plugin.go b/app/plugins/plugin.go index 9141b1a..324c0c0 100644 --- a/app/plugins/plugin.go +++ b/app/plugins/plugin.go @@ -14,6 +14,7 @@ const ( CONNECTIONRETRY PluginID = iota NETWORKCHECK ANTISPAM + HEARTBEAT ) // Plugin is the interface for a plugin @@ -32,6 +33,8 @@ func Get(id PluginID, bus event.Manager, acn connectivity.ACN, onion string) (Pl return NewNetworkCheck(onion, bus, acn), nil case ANTISPAM: return NewAntiSpam(bus), nil + case HEARTBEAT: + return NewHeartbeat(bus), nil } return nil, fmt.Errorf("plugin not defined %v", id) diff --git a/event/common.go b/event/common.go index c9041b2..39f6627 100644 --- a/event/common.go +++ b/event/common.go @@ -219,6 +219,9 @@ const ( TokenManagerInfo = Type("TokenManagerInfo") TriggerAntispamCheck = Type("TriggerAntispamCheck") MakeAntispamPayment = Type("MakeAntispamPayment") + + // Heartbeat is used to trigger actions that need to happen every so often... + Heartbeat = Type("Heartbeat") ) // Field defines common event attributes diff --git a/extensions/profile_value.go b/extensions/profile_value.go index 0ac8b3b..4c9ca5d 100644 --- a/extensions/profile_value.go +++ b/extensions/profile_value.go @@ -6,6 +6,7 @@ import ( "cwtch.im/cwtch/model/attr" "cwtch.im/cwtch/model/constants" "cwtch.im/cwtch/peer" + "cwtch.im/cwtch/protocol/connections" "cwtch.im/cwtch/settings" "git.openprivacy.ca/openprivacy/log" "strconv" @@ -19,15 +20,41 @@ func (pne ProfileValueExtension) NotifySettingsUpdate(settings settings.GlobalSe } func (pne ProfileValueExtension) EventsToRegister() []event.Type { - return nil + return []event.Type{event.PeerStateChange, event.Heartbeat} } func (pne ProfileValueExtension) ExperimentsToRegister() []string { return nil } -func (pne ProfileValueExtension) OnEvent(event event.Event, profile peer.CwtchPeer) { - // nop +func (pne ProfileValueExtension) OnEvent(ev event.Event, profile peer.CwtchPeer) { + switch ev.EventType { + case event.Heartbeat: + // once every heartbeat, loop through conversations and, if they are online, request an update to any long info.. + conversations, _ := profile.FetchConversations() + for _, ci := range conversations { + if profile.GetPeerState(ci.Handle) == connections.AUTHENTICATED { + profile.SendScopedZonedGetValToContact(ci.ID, attr.PublicScope, attr.ProfileZone, constants.Name) + profile.SendScopedZonedGetValToContact(ci.ID, attr.PublicScope, attr.ProfileZone, constants.ProfileStatus) + profile.SendScopedZonedGetValToContact(ci.ID, attr.PublicScope, attr.ProfileZone, constants.ProfileAttribute1) + profile.SendScopedZonedGetValToContact(ci.ID, attr.PublicScope, attr.ProfileZone, constants.ProfileAttribute2) + profile.SendScopedZonedGetValToContact(ci.ID, attr.PublicScope, attr.ProfileZone, constants.ProfileAttribute3) + } + } + case event.PeerStateChange: + ci, err := profile.FetchConversationInfo(ev.Data["RemotePeer"]) + if err == nil { + // if we have re-authenticated with thie peer then request their profile image... + if connections.ConnectionStateToType()[ev.Data[event.ConnectionState]] == connections.AUTHENTICATED { + // Request some profile information... + profile.SendScopedZonedGetValToContact(ci.ID, attr.PublicScope, attr.ProfileZone, constants.Name) + profile.SendScopedZonedGetValToContact(ci.ID, attr.PublicScope, attr.ProfileZone, constants.ProfileStatus) + profile.SendScopedZonedGetValToContact(ci.ID, attr.PublicScope, attr.ProfileZone, constants.ProfileAttribute1) + profile.SendScopedZonedGetValToContact(ci.ID, attr.PublicScope, attr.ProfileZone, constants.ProfileAttribute2) + profile.SendScopedZonedGetValToContact(ci.ID, attr.PublicScope, attr.ProfileZone, constants.ProfileAttribute3) + } + } + } } // OnContactReceiveValue for ProfileValueExtension handles saving specific Public Profile Values like Profile Name diff --git a/functionality/filesharing/image_previews.go b/functionality/filesharing/image_previews.go index 7deb959..222f200 100644 --- a/functionality/filesharing/image_previews.go +++ b/functionality/filesharing/image_previews.go @@ -6,6 +6,7 @@ import ( "cwtch.im/cwtch/model/attr" "cwtch.im/cwtch/model/constants" "cwtch.im/cwtch/peer" + "cwtch.im/cwtch/protocol/connections" "cwtch.im/cwtch/settings" "encoding/json" "fmt" @@ -24,7 +25,7 @@ func (i *ImagePreviewsFunctionality) NotifySettingsUpdate(settings settings.Glob } func (i ImagePreviewsFunctionality) EventsToRegister() []event.Type { - return []event.Type{event.ProtocolEngineCreated, event.NewMessageFromPeer, event.NewMessageFromGroup} + return []event.Type{event.ProtocolEngineCreated, event.NewMessageFromPeer, event.NewMessageFromGroup, event.PeerStateChange} } func (i ImagePreviewsFunctionality) ExperimentsToRegister() []string { @@ -48,6 +49,14 @@ func (i *ImagePreviewsFunctionality) OnEvent(ev event.Event, profile peer.CwtchP i.handleImagePreviews(profile, &ev, ci.ID, ci.ID) } } + case event.PeerStateChange: + ci, err := profile.FetchConversationInfo(ev.Data["RemotePeer"]) + if err == nil { + // if we have re-authenticated with thie peer then request their profile image... + if connections.ConnectionStateToType()[ev.Data[event.ConnectionState]] == connections.AUTHENTICATED { + profile.SendScopedZonedGetValToContact(ci.ID, attr.PublicScope, attr.ProfileZone, constants.CustomProfileImageKey) + } + } case event.ProtocolEngineCreated: // Now that the Peer Engine is Activated, Reshare Profile Images key, exists := profile.GetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.CustomProfileImageKey) diff --git a/model/constants/attributes.go b/model/constants/attributes.go index e498222..ecdc03a 100644 --- a/model/constants/attributes.go +++ b/model/constants/attributes.go @@ -58,3 +58,8 @@ const SyncMostRecentMessageTime = "SyncMostRecentMessageTime" const AttrLastConnectionTime = "last-connection-time" const PeerAutostart = "autostart" const Archived = "archived" + +const ProfileStatus = "profile-status" +const ProfileAttribute1 = "profile-attribute-1" +const ProfileAttribute2 = "profile-attribute-2" +const ProfileAttribute3 = "profile-attribute-3" diff --git a/model/constants/experiments.go b/model/constants/experiments.go index 95486ff..a9fd358 100644 --- a/model/constants/experiments.go +++ b/model/constants/experiments.go @@ -14,3 +14,6 @@ const MessageFormattingExperiment = "message-formatting" // AutoDLFileExts Files with these extensions will be autodownloaded using ImagePreviewsExperiment var AutoDLFileExts = [...]string{".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp"} + +// BlodeuweddExperiment enables the Blodeuwedd Assistant +const BlodeuweddExperiment = "blodeuwedd" diff --git a/peer/cwtch_peer.go b/peer/cwtch_peer.go index e8dc432..c9e7b59 100644 --- a/peer/cwtch_peer.go +++ b/peer/cwtch_peer.go @@ -1484,6 +1484,22 @@ func (cp *cwtchPeer) eventHandler() { cp.mutex.Unlock() } } + + // Safe Access to Extensions + cp.extensionLock.Lock() + for _, extension := range cp.extensions { + log.Debugf("checking extension...%v", extension) + // check if the current map of experiments satisfies the extension requirements + if !cp.checkExtensionExperiment(extension) { + log.Debugf("skipping extension (%s) ..not all experiments satisfied", extension) + continue + } + if cp.checkEventExperiment(extension, ev.EventType) { + extension.extension.OnEvent(ev, cp) + } + } + cp.extensionLock.Unlock() + cp.mutex.Lock() cp.state[ev.Data[event.RemotePeer]] = connections.ConnectionStateToType()[ev.Data[event.ConnectionState]] cp.mutex.Unlock() diff --git a/settings/settings.go b/settings/settings.go index 08cd263..397bc37 100644 --- a/settings/settings.go +++ b/settings/settings.go @@ -55,6 +55,7 @@ type GlobalSettings struct { CustomControlPort int UseTorCache bool TorCacheDir string + BlodeuweddPath string } var DefaultGlobalSettings = GlobalSettings{ @@ -79,6 +80,7 @@ var DefaultGlobalSettings = GlobalSettings{ CustomControlPort: -1, UseTorCache: false, TorCacheDir: "", + BlodeuweddPath: "", } func InitGlobalSettingsFile(directory string, password string) (*GlobalSettingsFile, error) { diff --git a/testing/cwtch_peer_server_integration_test.go b/testing/cwtch_peer_server_integration_test.go index a7421c3..9415e7e 100644 --- a/testing/cwtch_peer_server_integration_test.go +++ b/testing/cwtch_peer_server_integration_test.go @@ -164,21 +164,21 @@ func TestCwtchPeerIntegration(t *testing.T) { aliceBus := app.GetEventBus(alice.GetOnion()) app.ActivatePeerEngine(alice.GetOnion()) log.Infoln("Alice created:", alice.GetOnion()) - alice.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Alice") + // alice.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Alice") <- This is now done automatically by ProfileValueExtension, keeping this here for clarity alice.AutoHandleEvents([]event.Type{event.PeerStateChange, event.ServerStateChange, event.NewGroupInvite, event.NewRetValMessageFromPeer}) bob := app2.WaitGetPeer(app, "Bob") bobBus := app.GetEventBus(bob.GetOnion()) app.ActivatePeerEngine(bob.GetOnion()) log.Infoln("Bob created:", bob.GetOnion()) - bob.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Bob") + // bob.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Bob") <- This is now done automatically by ProfileValueExtension, keeping this here for clarity bob.AutoHandleEvents([]event.Type{event.PeerStateChange, event.ServerStateChange, event.NewGroupInvite, event.NewRetValMessageFromPeer}) carol := app2.WaitGetPeer(app, "Carol") carolBus := app.GetEventBus(carol.GetOnion()) app.ActivatePeerEngine(carol.GetOnion()) log.Infoln("Carol created:", carol.GetOnion()) - carol.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Carol") + // carol.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Carol") <- This is now done automatically by ProfileValueExtension, keeping this here for clarity carol.AutoHandleEvents([]event.Type{event.PeerStateChange, event.ServerStateChange, event.NewGroupInvite, event.NewRetValMessageFromPeer}) waitTime := time.Duration(60) * time.Second From 4d81529ce2f2f0ac61a6b932f32fa5e75b2f0b26 Mon Sep 17 00:00:00 2001 From: Sarah Jamie Lewis Date: Mon, 3 Apr 2023 14:33:25 -0700 Subject: [PATCH 2/4] Update Profile Extension to remove Duplication --- extensions/profile_value.go | 28 ++++++++++--------- functionality/filesharing/image_previews.go | 11 +++++++- testing/cwtch_peer_server_integration_test.go | 2 +- 3 files changed, 26 insertions(+), 15 deletions(-) diff --git a/extensions/profile_value.go b/extensions/profile_value.go index 4c9ca5d..2ecd7ed 100644 --- a/extensions/profile_value.go +++ b/extensions/profile_value.go @@ -27,18 +27,24 @@ func (pne ProfileValueExtension) ExperimentsToRegister() []string { return nil } +func (pne ProfileValueExtension) requestProfileInfo(profile peer.CwtchPeer, ci *model.Conversation) { + profile.SendScopedZonedGetValToContact(ci.ID, attr.PublicScope, attr.ProfileZone, constants.Name) + profile.SendScopedZonedGetValToContact(ci.ID, attr.PublicScope, attr.ProfileZone, constants.ProfileStatus) + profile.SendScopedZonedGetValToContact(ci.ID, attr.PublicScope, attr.ProfileZone, constants.ProfileAttribute1) + profile.SendScopedZonedGetValToContact(ci.ID, attr.PublicScope, attr.ProfileZone, constants.ProfileAttribute2) + profile.SendScopedZonedGetValToContact(ci.ID, attr.PublicScope, attr.ProfileZone, constants.ProfileAttribute3) +} + func (pne ProfileValueExtension) OnEvent(ev event.Event, profile peer.CwtchPeer) { switch ev.EventType { case event.Heartbeat: // once every heartbeat, loop through conversations and, if they are online, request an update to any long info.. - conversations, _ := profile.FetchConversations() - for _, ci := range conversations { - if profile.GetPeerState(ci.Handle) == connections.AUTHENTICATED { - profile.SendScopedZonedGetValToContact(ci.ID, attr.PublicScope, attr.ProfileZone, constants.Name) - profile.SendScopedZonedGetValToContact(ci.ID, attr.PublicScope, attr.ProfileZone, constants.ProfileStatus) - profile.SendScopedZonedGetValToContact(ci.ID, attr.PublicScope, attr.ProfileZone, constants.ProfileAttribute1) - profile.SendScopedZonedGetValToContact(ci.ID, attr.PublicScope, attr.ProfileZone, constants.ProfileAttribute2) - profile.SendScopedZonedGetValToContact(ci.ID, attr.PublicScope, attr.ProfileZone, constants.ProfileAttribute3) + conversations, err := profile.FetchConversations() + if err == nil { + for _, ci := range conversations { + if profile.GetPeerState(ci.Handle) == connections.AUTHENTICATED { + pne.requestProfileInfo(profile, ci) + } } } case event.PeerStateChange: @@ -47,11 +53,7 @@ func (pne ProfileValueExtension) OnEvent(ev event.Event, profile peer.CwtchPeer) // if we have re-authenticated with thie peer then request their profile image... if connections.ConnectionStateToType()[ev.Data[event.ConnectionState]] == connections.AUTHENTICATED { // Request some profile information... - profile.SendScopedZonedGetValToContact(ci.ID, attr.PublicScope, attr.ProfileZone, constants.Name) - profile.SendScopedZonedGetValToContact(ci.ID, attr.PublicScope, attr.ProfileZone, constants.ProfileStatus) - profile.SendScopedZonedGetValToContact(ci.ID, attr.PublicScope, attr.ProfileZone, constants.ProfileAttribute1) - profile.SendScopedZonedGetValToContact(ci.ID, attr.PublicScope, attr.ProfileZone, constants.ProfileAttribute2) - profile.SendScopedZonedGetValToContact(ci.ID, attr.PublicScope, attr.ProfileZone, constants.ProfileAttribute3) + pne.requestProfileInfo(profile, ci) } } } diff --git a/functionality/filesharing/image_previews.go b/functionality/filesharing/image_previews.go index 222f200..7fdaa6b 100644 --- a/functionality/filesharing/image_previews.go +++ b/functionality/filesharing/image_previews.go @@ -25,7 +25,7 @@ func (i *ImagePreviewsFunctionality) NotifySettingsUpdate(settings settings.Glob } func (i ImagePreviewsFunctionality) EventsToRegister() []event.Type { - return []event.Type{event.ProtocolEngineCreated, event.NewMessageFromPeer, event.NewMessageFromGroup, event.PeerStateChange} + return []event.Type{event.ProtocolEngineCreated, event.NewMessageFromPeer, event.NewMessageFromGroup, event.PeerStateChange, event.Heartbeat} } func (i ImagePreviewsFunctionality) ExperimentsToRegister() []string { @@ -57,6 +57,15 @@ func (i *ImagePreviewsFunctionality) OnEvent(ev event.Event, profile peer.CwtchP profile.SendScopedZonedGetValToContact(ci.ID, attr.PublicScope, attr.ProfileZone, constants.CustomProfileImageKey) } } + case event.Heartbeat: + conversations, err := profile.FetchConversations() + if err == nil { + for _, ci := range conversations { + if profile.GetPeerState(ci.Handle) == connections.AUTHENTICATED { + profile.SendScopedZonedGetValToContact(ci.ID, attr.PublicScope, attr.ProfileZone, constants.CustomProfileImageKey) + } + } + } case event.ProtocolEngineCreated: // Now that the Peer Engine is Activated, Reshare Profile Images key, exists := profile.GetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.CustomProfileImageKey) diff --git a/testing/cwtch_peer_server_integration_test.go b/testing/cwtch_peer_server_integration_test.go index 9415e7e..93d0a56 100644 --- a/testing/cwtch_peer_server_integration_test.go +++ b/testing/cwtch_peer_server_integration_test.go @@ -403,7 +403,7 @@ func TestCwtchPeerIntegration(t *testing.T) { log.Infof("Shutting down ACN...") acn.Close() - time.Sleep(time.Second * 30) // the network status plugin might keep goroutines alive for a minute before killing them + time.Sleep(time.Second * 60) // the network status / heartbeat plugin might keep goroutines alive for a minute before killing them numGoRoutinesPostAppShutdown := runtime.NumGoroutine() From 7107ad1eaa91709d91c5699633599c299910a7b2 Mon Sep 17 00:00:00 2001 From: Sarah Jamie Lewis Date: Mon, 3 Apr 2023 14:49:45 -0700 Subject: [PATCH 3/4] Close Heartbeat Queue --- app/plugins/heartbeat.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/app/plugins/heartbeat.go b/app/plugins/heartbeat.go index 9932497..bd75f4b 100644 --- a/app/plugins/heartbeat.go +++ b/app/plugins/heartbeat.go @@ -24,6 +24,7 @@ func (cr *heartbeat) Id() PluginID { func (a heartbeat) Shutdown() { a.breakChan <- true + a.bus.Shutdown() } func (a *heartbeat) run() { @@ -35,6 +36,7 @@ func (a *heartbeat) run() { a.bus.Publish(event.NewEvent(event.Heartbeat, map[event.Field]string{})) continue case <-a.breakChan: + log.Debugf("shutting down heartbeat plugin") return } } From 51f85ea619e44ae7597fccaac788c5cb1d43e536 Mon Sep 17 00:00:00 2001 From: Sarah Jamie Lewis Date: Mon, 3 Apr 2023 14:58:34 -0700 Subject: [PATCH 4/4] Fix queue shutdown --- app/plugins/heartbeat.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/plugins/heartbeat.go b/app/plugins/heartbeat.go index bd75f4b..0c4b942 100644 --- a/app/plugins/heartbeat.go +++ b/app/plugins/heartbeat.go @@ -24,7 +24,7 @@ func (cr *heartbeat) Id() PluginID { func (a heartbeat) Shutdown() { a.breakChan <- true - a.bus.Shutdown() + a.queue.Shutdown() } func (a *heartbeat) run() {