Support Profile Status and Profile Attributes. Auto Fetch Updates on a Heartbeat. Move Profile Image Download Checks to Cwtch #503

Merged
sarah merged 4 commits from autodownload into master 2023-04-04 21:04:02 +00:00
11 changed files with 137 additions and 8 deletions

View File

@ -221,6 +221,7 @@ func (app *application) setupPeer(profile peer.CwtchPeer) {
// Register the Peer With Application Plugins..
app.AddPeerPlugin(profile.GetOnion(), plugins.CONNECTIONRETRY) // Now Mandatory
app.AddPeerPlugin(profile.GetOnion(), plugins.HEARTBEAT) // Now Mandatory
}

49
app/plugins/heartbeat.go Normal file
View File

@ -0,0 +1,49 @@
package plugins
import (
"cwtch.im/cwtch/event"
"git.openprivacy.ca/openprivacy/log"
"time"
)
const heartbeatTickTime = 60 * time.Second
type heartbeat struct {
bus event.Manager
queue event.Queue
breakChan chan bool
}
func (a *heartbeat) Start() {
go a.run()
}
func (cr *heartbeat) Id() PluginID {
return HEARTBEAT
}
func (a heartbeat) Shutdown() {
a.breakChan <- true
a.queue.Shutdown()
}
func (a *heartbeat) run() {
log.Debugf("running heartbeat trigger plugin")
for {
select {
case <-time.After(heartbeatTickTime):
// no fuss, just trigger the beat.
a.bus.Publish(event.NewEvent(event.Heartbeat, map[event.Field]string{}))
continue
case <-a.breakChan:
log.Debugf("shutting down heartbeat plugin")
return
}
}
}
// NewHeartbeat returns a Plugin that when started will trigger heartbeat checks on a regular interval
func NewHeartbeat(bus event.Manager) Plugin {
cr := &heartbeat{bus: bus, queue: event.NewQueue(), breakChan: make(chan bool, 1)}
return cr
}

View File

@ -14,6 +14,7 @@ const (
CONNECTIONRETRY PluginID = iota
NETWORKCHECK
ANTISPAM
HEARTBEAT
)
// Plugin is the interface for a plugin
@ -32,6 +33,8 @@ func Get(id PluginID, bus event.Manager, acn connectivity.ACN, onion string) (Pl
return NewNetworkCheck(onion, bus, acn), nil
case ANTISPAM:
return NewAntiSpam(bus), nil
case HEARTBEAT:
return NewHeartbeat(bus), nil
}
return nil, fmt.Errorf("plugin not defined %v", id)

View File

@ -219,6 +219,9 @@ const (
TokenManagerInfo = Type("TokenManagerInfo")
TriggerAntispamCheck = Type("TriggerAntispamCheck")
MakeAntispamPayment = Type("MakeAntispamPayment")
// Heartbeat is used to trigger actions that need to happen every so often...
Heartbeat = Type("Heartbeat")
)
// Field defines common event attributes

View File

@ -6,6 +6,7 @@ import (
"cwtch.im/cwtch/model/attr"
"cwtch.im/cwtch/model/constants"
"cwtch.im/cwtch/peer"
"cwtch.im/cwtch/protocol/connections"
"cwtch.im/cwtch/settings"
"git.openprivacy.ca/openprivacy/log"
"strconv"
@ -19,15 +20,43 @@ func (pne ProfileValueExtension) NotifySettingsUpdate(settings settings.GlobalSe
}
func (pne ProfileValueExtension) EventsToRegister() []event.Type {
return nil
return []event.Type{event.PeerStateChange, event.Heartbeat}
}
func (pne ProfileValueExtension) ExperimentsToRegister() []string {
return nil
}
func (pne ProfileValueExtension) OnEvent(event event.Event, profile peer.CwtchPeer) {
// nop
func (pne ProfileValueExtension) requestProfileInfo(profile peer.CwtchPeer, ci *model.Conversation) {
profile.SendScopedZonedGetValToContact(ci.ID, attr.PublicScope, attr.ProfileZone, constants.Name)
profile.SendScopedZonedGetValToContact(ci.ID, attr.PublicScope, attr.ProfileZone, constants.ProfileStatus)
profile.SendScopedZonedGetValToContact(ci.ID, attr.PublicScope, attr.ProfileZone, constants.ProfileAttribute1)
profile.SendScopedZonedGetValToContact(ci.ID, attr.PublicScope, attr.ProfileZone, constants.ProfileAttribute2)
profile.SendScopedZonedGetValToContact(ci.ID, attr.PublicScope, attr.ProfileZone, constants.ProfileAttribute3)
}
func (pne ProfileValueExtension) OnEvent(ev event.Event, profile peer.CwtchPeer) {
switch ev.EventType {
case event.Heartbeat:
// once every heartbeat, loop through conversations and, if they are online, request an update to any long info..
conversations, err := profile.FetchConversations()
if err == nil {
for _, ci := range conversations {
if profile.GetPeerState(ci.Handle) == connections.AUTHENTICATED {
pne.requestProfileInfo(profile, ci)
}
}
}
case event.PeerStateChange:
dan marked this conversation as resolved
Review

should be 1 fun reqinfo() so we dont dup this code and adding new is simple

should be 1 `fun reqinfo()` so we dont dup this code and adding new is simple
ci, err := profile.FetchConversationInfo(ev.Data["RemotePeer"])
if err == nil {
// if we have re-authenticated with thie peer then request their profile image...
if connections.ConnectionStateToType()[ev.Data[event.ConnectionState]] == connections.AUTHENTICATED {
// Request some profile information...
pne.requestProfileInfo(profile, ci)
}
}
}
}
// OnContactReceiveValue for ProfileValueExtension handles saving specific Public Profile Values like Profile Name

View File

@ -6,6 +6,7 @@ import (
"cwtch.im/cwtch/model/attr"
"cwtch.im/cwtch/model/constants"
"cwtch.im/cwtch/peer"
"cwtch.im/cwtch/protocol/connections"
"cwtch.im/cwtch/settings"
"encoding/json"
"fmt"
@ -24,7 +25,7 @@ func (i *ImagePreviewsFunctionality) NotifySettingsUpdate(settings settings.Glob
}
func (i ImagePreviewsFunctionality) EventsToRegister() []event.Type {
return []event.Type{event.ProtocolEngineCreated, event.NewMessageFromPeer, event.NewMessageFromGroup}
return []event.Type{event.ProtocolEngineCreated, event.NewMessageFromPeer, event.NewMessageFromGroup, event.PeerStateChange, event.Heartbeat}
}
func (i ImagePreviewsFunctionality) ExperimentsToRegister() []string {
@ -48,6 +49,23 @@ func (i *ImagePreviewsFunctionality) OnEvent(ev event.Event, profile peer.CwtchP
i.handleImagePreviews(profile, &ev, ci.ID, ci.ID)
}
}
case event.PeerStateChange:
ci, err := profile.FetchConversationInfo(ev.Data["RemotePeer"])
if err == nil {
// if we have re-authenticated with thie peer then request their profile image...
if connections.ConnectionStateToType()[ev.Data[event.ConnectionState]] == connections.AUTHENTICATED {
profile.SendScopedZonedGetValToContact(ci.ID, attr.PublicScope, attr.ProfileZone, constants.CustomProfileImageKey)
Review

do we likewise want this on a heartbeat in case they change?

do we likewise want this on a heartbeat in case they change?
}
}
case event.Heartbeat:
conversations, err := profile.FetchConversations()
if err == nil {
for _, ci := range conversations {
if profile.GetPeerState(ci.Handle) == connections.AUTHENTICATED {
profile.SendScopedZonedGetValToContact(ci.ID, attr.PublicScope, attr.ProfileZone, constants.CustomProfileImageKey)
}
}
}
case event.ProtocolEngineCreated:
// Now that the Peer Engine is Activated, Reshare Profile Images
key, exists := profile.GetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.CustomProfileImageKey)

View File

@ -58,3 +58,8 @@ const SyncMostRecentMessageTime = "SyncMostRecentMessageTime"
const AttrLastConnectionTime = "last-connection-time"
const PeerAutostart = "autostart"
const Archived = "archived"
const ProfileStatus = "profile-status"
const ProfileAttribute1 = "profile-attribute-1"
Review

asking ux, not sure how i feel about attr 1-3

asking ux, not sure how i feel about attr 1-3
const ProfileAttribute2 = "profile-attribute-2"
const ProfileAttribute3 = "profile-attribute-3"

View File

@ -14,3 +14,6 @@ const MessageFormattingExperiment = "message-formatting"
// AutoDLFileExts Files with these extensions will be autodownloaded using ImagePreviewsExperiment
var AutoDLFileExts = [...]string{".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp"}
// BlodeuweddExperiment enables the Blodeuwedd Assistant
const BlodeuweddExperiment = "blodeuwedd"

View File

@ -1484,6 +1484,22 @@ func (cp *cwtchPeer) eventHandler() {
cp.mutex.Unlock()
}
}
// Safe Access to Extensions
cp.extensionLock.Lock()
Review

this loop through extension for ones matching the event and dispatching is repeated a few times, should be pulled out into a function somewhere

this loop through extension for ones matching the event and dispatching is repeated a few times, should be pulled out into a function somewhere
for _, extension := range cp.extensions {
log.Debugf("checking extension...%v", extension)
// check if the current map of experiments satisfies the extension requirements
if !cp.checkExtensionExperiment(extension) {
log.Debugf("skipping extension (%s) ..not all experiments satisfied", extension)
continue
}
if cp.checkEventExperiment(extension, ev.EventType) {
extension.extension.OnEvent(ev, cp)
dan marked this conversation as resolved
Review

we're doing onEvent form peer.handle event for onPeerStausChange, but did registering the onPeerEventStatusChange event event bus listener in the extension not mean it'd pull it off the bus? like i'm assuing it is doing for heartbeat? or else if not than we i assume need a dup of this for heartbeat? and to have peer consume it?

we're doing onEvent form peer.handle event for onPeerStausChange, but did registering the onPeerEventStatusChange event event bus listener in the extension not mean it'd pull it off the bus? like i'm assuing it is doing for heartbeat? or else if not than we i assume need a dup of this for heartbeat? and to have peer consume it?
}
}
cp.extensionLock.Unlock()
cp.mutex.Lock()
cp.state[ev.Data[event.RemotePeer]] = connections.ConnectionStateToType()[ev.Data[event.ConnectionState]]
cp.mutex.Unlock()

View File

@ -55,6 +55,7 @@ type GlobalSettings struct {
CustomControlPort int
UseTorCache bool
TorCacheDir string
BlodeuweddPath string
}
var DefaultGlobalSettings = GlobalSettings{
@ -79,6 +80,7 @@ var DefaultGlobalSettings = GlobalSettings{
CustomControlPort: -1,
UseTorCache: false,
TorCacheDir: "",
BlodeuweddPath: "",
}
func InitGlobalSettingsFile(directory string, password string) (*GlobalSettingsFile, error) {

View File

@ -164,21 +164,21 @@ func TestCwtchPeerIntegration(t *testing.T) {
aliceBus := app.GetEventBus(alice.GetOnion())
app.ActivatePeerEngine(alice.GetOnion())
log.Infoln("Alice created:", alice.GetOnion())
alice.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Alice")
// alice.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Alice") <- This is now done automatically by ProfileValueExtension, keeping this here for clarity
alice.AutoHandleEvents([]event.Type{event.PeerStateChange, event.ServerStateChange, event.NewGroupInvite, event.NewRetValMessageFromPeer})
bob := app2.WaitGetPeer(app, "Bob")
bobBus := app.GetEventBus(bob.GetOnion())
app.ActivatePeerEngine(bob.GetOnion())
log.Infoln("Bob created:", bob.GetOnion())
bob.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Bob")
// bob.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Bob") <- This is now done automatically by ProfileValueExtension, keeping this here for clarity
bob.AutoHandleEvents([]event.Type{event.PeerStateChange, event.ServerStateChange, event.NewGroupInvite, event.NewRetValMessageFromPeer})
carol := app2.WaitGetPeer(app, "Carol")
carolBus := app.GetEventBus(carol.GetOnion())
app.ActivatePeerEngine(carol.GetOnion())
log.Infoln("Carol created:", carol.GetOnion())
carol.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Carol")
// carol.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Carol") <- This is now done automatically by ProfileValueExtension, keeping this here for clarity
carol.AutoHandleEvents([]event.Type{event.PeerStateChange, event.ServerStateChange, event.NewGroupInvite, event.NewRetValMessageFromPeer})
waitTime := time.Duration(60) * time.Second
@ -403,7 +403,7 @@ func TestCwtchPeerIntegration(t *testing.T) {
log.Infof("Shutting down ACN...")
acn.Close()
time.Sleep(time.Second * 30) // the network status plugin might keep goroutines alive for a minute before killing them
time.Sleep(time.Second * 60) // the network status / heartbeat plugin might keep goroutines alive for a minute before killing them
numGoRoutinesPostAppShutdown := runtime.NumGoroutine()