Merge pull request 'Support Profile Status and Profile Attributes. Auto Fetch Updates on a Heartbeat. Move Profile Image Download Checks to Cwtch' (#503) from autodownload into master
continuous-integration/drone/push Build is pending Details

Reviewed-on: #503
Reviewed-by: Dan Ballard <dan@openprivacy.ca>
This commit is contained in:
Sarah Jamie Lewis 2023-04-04 21:04:02 +00:00
commit 2e59cc43ab
11 changed files with 137 additions and 8 deletions

View File

@ -221,6 +221,7 @@ func (app *application) setupPeer(profile peer.CwtchPeer) {
// Register the Peer With Application Plugins..
app.AddPeerPlugin(profile.GetOnion(), plugins.CONNECTIONRETRY) // Now Mandatory
app.AddPeerPlugin(profile.GetOnion(), plugins.HEARTBEAT) // Now Mandatory
}

49
app/plugins/heartbeat.go Normal file
View File

@ -0,0 +1,49 @@
package plugins
import (
"cwtch.im/cwtch/event"
"git.openprivacy.ca/openprivacy/log"
"time"
)
const heartbeatTickTime = 60 * time.Second
type heartbeat struct {
bus event.Manager
queue event.Queue
breakChan chan bool
}
func (a *heartbeat) Start() {
go a.run()
}
func (cr *heartbeat) Id() PluginID {
return HEARTBEAT
}
func (a heartbeat) Shutdown() {
a.breakChan <- true
a.queue.Shutdown()
}
func (a *heartbeat) run() {
log.Debugf("running heartbeat trigger plugin")
for {
select {
case <-time.After(heartbeatTickTime):
// no fuss, just trigger the beat.
a.bus.Publish(event.NewEvent(event.Heartbeat, map[event.Field]string{}))
continue
case <-a.breakChan:
log.Debugf("shutting down heartbeat plugin")
return
}
}
}
// NewHeartbeat returns a Plugin that when started will trigger heartbeat checks on a regular interval
func NewHeartbeat(bus event.Manager) Plugin {
cr := &heartbeat{bus: bus, queue: event.NewQueue(), breakChan: make(chan bool, 1)}
return cr
}

View File

@ -14,6 +14,7 @@ const (
CONNECTIONRETRY PluginID = iota
NETWORKCHECK
ANTISPAM
HEARTBEAT
)
// Plugin is the interface for a plugin
@ -32,6 +33,8 @@ func Get(id PluginID, bus event.Manager, acn connectivity.ACN, onion string) (Pl
return NewNetworkCheck(onion, bus, acn), nil
case ANTISPAM:
return NewAntiSpam(bus), nil
case HEARTBEAT:
return NewHeartbeat(bus), nil
}
return nil, fmt.Errorf("plugin not defined %v", id)

View File

@ -219,6 +219,9 @@ const (
TokenManagerInfo = Type("TokenManagerInfo")
TriggerAntispamCheck = Type("TriggerAntispamCheck")
MakeAntispamPayment = Type("MakeAntispamPayment")
// Heartbeat is used to trigger actions that need to happen every so often...
Heartbeat = Type("Heartbeat")
)
// Field defines common event attributes

View File

@ -6,6 +6,7 @@ import (
"cwtch.im/cwtch/model/attr"
"cwtch.im/cwtch/model/constants"
"cwtch.im/cwtch/peer"
"cwtch.im/cwtch/protocol/connections"
"cwtch.im/cwtch/settings"
"git.openprivacy.ca/openprivacy/log"
"strconv"
@ -19,15 +20,43 @@ func (pne ProfileValueExtension) NotifySettingsUpdate(settings settings.GlobalSe
}
func (pne ProfileValueExtension) EventsToRegister() []event.Type {
return nil
return []event.Type{event.PeerStateChange, event.Heartbeat}
}
func (pne ProfileValueExtension) ExperimentsToRegister() []string {
return nil
}
func (pne ProfileValueExtension) OnEvent(event event.Event, profile peer.CwtchPeer) {
// nop
func (pne ProfileValueExtension) requestProfileInfo(profile peer.CwtchPeer, ci *model.Conversation) {
profile.SendScopedZonedGetValToContact(ci.ID, attr.PublicScope, attr.ProfileZone, constants.Name)
profile.SendScopedZonedGetValToContact(ci.ID, attr.PublicScope, attr.ProfileZone, constants.ProfileStatus)
profile.SendScopedZonedGetValToContact(ci.ID, attr.PublicScope, attr.ProfileZone, constants.ProfileAttribute1)
profile.SendScopedZonedGetValToContact(ci.ID, attr.PublicScope, attr.ProfileZone, constants.ProfileAttribute2)
profile.SendScopedZonedGetValToContact(ci.ID, attr.PublicScope, attr.ProfileZone, constants.ProfileAttribute3)
}
func (pne ProfileValueExtension) OnEvent(ev event.Event, profile peer.CwtchPeer) {
switch ev.EventType {
case event.Heartbeat:
// once every heartbeat, loop through conversations and, if they are online, request an update to any long info..
conversations, err := profile.FetchConversations()
if err == nil {
for _, ci := range conversations {
if profile.GetPeerState(ci.Handle) == connections.AUTHENTICATED {
pne.requestProfileInfo(profile, ci)
}
}
}
case event.PeerStateChange:
ci, err := profile.FetchConversationInfo(ev.Data["RemotePeer"])
if err == nil {
// if we have re-authenticated with thie peer then request their profile image...
if connections.ConnectionStateToType()[ev.Data[event.ConnectionState]] == connections.AUTHENTICATED {
// Request some profile information...
pne.requestProfileInfo(profile, ci)
}
}
}
}
// OnContactReceiveValue for ProfileValueExtension handles saving specific Public Profile Values like Profile Name

View File

@ -6,6 +6,7 @@ import (
"cwtch.im/cwtch/model/attr"
"cwtch.im/cwtch/model/constants"
"cwtch.im/cwtch/peer"
"cwtch.im/cwtch/protocol/connections"
"cwtch.im/cwtch/settings"
"encoding/json"
"fmt"
@ -24,7 +25,7 @@ func (i *ImagePreviewsFunctionality) NotifySettingsUpdate(settings settings.Glob
}
func (i ImagePreviewsFunctionality) EventsToRegister() []event.Type {
return []event.Type{event.ProtocolEngineCreated, event.NewMessageFromPeer, event.NewMessageFromGroup}
return []event.Type{event.ProtocolEngineCreated, event.NewMessageFromPeer, event.NewMessageFromGroup, event.PeerStateChange, event.Heartbeat}
}
func (i ImagePreviewsFunctionality) ExperimentsToRegister() []string {
@ -48,6 +49,23 @@ func (i *ImagePreviewsFunctionality) OnEvent(ev event.Event, profile peer.CwtchP
i.handleImagePreviews(profile, &ev, ci.ID, ci.ID)
}
}
case event.PeerStateChange:
ci, err := profile.FetchConversationInfo(ev.Data["RemotePeer"])
if err == nil {
// if we have re-authenticated with thie peer then request their profile image...
if connections.ConnectionStateToType()[ev.Data[event.ConnectionState]] == connections.AUTHENTICATED {
profile.SendScopedZonedGetValToContact(ci.ID, attr.PublicScope, attr.ProfileZone, constants.CustomProfileImageKey)
}
}
case event.Heartbeat:
conversations, err := profile.FetchConversations()
if err == nil {
for _, ci := range conversations {
if profile.GetPeerState(ci.Handle) == connections.AUTHENTICATED {
profile.SendScopedZonedGetValToContact(ci.ID, attr.PublicScope, attr.ProfileZone, constants.CustomProfileImageKey)
}
}
}
case event.ProtocolEngineCreated:
// Now that the Peer Engine is Activated, Reshare Profile Images
key, exists := profile.GetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.CustomProfileImageKey)

View File

@ -58,3 +58,8 @@ const SyncMostRecentMessageTime = "SyncMostRecentMessageTime"
const AttrLastConnectionTime = "last-connection-time"
const PeerAutostart = "autostart"
const Archived = "archived"
const ProfileStatus = "profile-status"
const ProfileAttribute1 = "profile-attribute-1"
const ProfileAttribute2 = "profile-attribute-2"
const ProfileAttribute3 = "profile-attribute-3"

View File

@ -14,3 +14,6 @@ const MessageFormattingExperiment = "message-formatting"
// AutoDLFileExts Files with these extensions will be autodownloaded using ImagePreviewsExperiment
var AutoDLFileExts = [...]string{".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp"}
// BlodeuweddExperiment enables the Blodeuwedd Assistant
const BlodeuweddExperiment = "blodeuwedd"

View File

@ -1484,6 +1484,22 @@ func (cp *cwtchPeer) eventHandler() {
cp.mutex.Unlock()
}
}
// Safe Access to Extensions
cp.extensionLock.Lock()
for _, extension := range cp.extensions {
log.Debugf("checking extension...%v", extension)
// check if the current map of experiments satisfies the extension requirements
if !cp.checkExtensionExperiment(extension) {
log.Debugf("skipping extension (%s) ..not all experiments satisfied", extension)
continue
}
if cp.checkEventExperiment(extension, ev.EventType) {
extension.extension.OnEvent(ev, cp)
}
}
cp.extensionLock.Unlock()
cp.mutex.Lock()
cp.state[ev.Data[event.RemotePeer]] = connections.ConnectionStateToType()[ev.Data[event.ConnectionState]]
cp.mutex.Unlock()

View File

@ -55,6 +55,7 @@ type GlobalSettings struct {
CustomControlPort int
UseTorCache bool
TorCacheDir string
BlodeuweddPath string
}
var DefaultGlobalSettings = GlobalSettings{
@ -79,6 +80,7 @@ var DefaultGlobalSettings = GlobalSettings{
CustomControlPort: -1,
UseTorCache: false,
TorCacheDir: "",
BlodeuweddPath: "",
}
func InitGlobalSettingsFile(directory string, password string) (*GlobalSettingsFile, error) {

View File

@ -164,21 +164,21 @@ func TestCwtchPeerIntegration(t *testing.T) {
aliceBus := app.GetEventBus(alice.GetOnion())
app.ActivatePeerEngine(alice.GetOnion())
log.Infoln("Alice created:", alice.GetOnion())
alice.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Alice")
// alice.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Alice") <- This is now done automatically by ProfileValueExtension, keeping this here for clarity
alice.AutoHandleEvents([]event.Type{event.PeerStateChange, event.ServerStateChange, event.NewGroupInvite, event.NewRetValMessageFromPeer})
bob := app2.WaitGetPeer(app, "Bob")
bobBus := app.GetEventBus(bob.GetOnion())
app.ActivatePeerEngine(bob.GetOnion())
log.Infoln("Bob created:", bob.GetOnion())
bob.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Bob")
// bob.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Bob") <- This is now done automatically by ProfileValueExtension, keeping this here for clarity
bob.AutoHandleEvents([]event.Type{event.PeerStateChange, event.ServerStateChange, event.NewGroupInvite, event.NewRetValMessageFromPeer})
carol := app2.WaitGetPeer(app, "Carol")
carolBus := app.GetEventBus(carol.GetOnion())
app.ActivatePeerEngine(carol.GetOnion())
log.Infoln("Carol created:", carol.GetOnion())
carol.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Carol")
// carol.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Carol") <- This is now done automatically by ProfileValueExtension, keeping this here for clarity
carol.AutoHandleEvents([]event.Type{event.PeerStateChange, event.ServerStateChange, event.NewGroupInvite, event.NewRetValMessageFromPeer})
waitTime := time.Duration(60) * time.Second
@ -403,7 +403,7 @@ func TestCwtchPeerIntegration(t *testing.T) {
log.Infof("Shutting down ACN...")
acn.Close()
time.Sleep(time.Second * 30) // the network status plugin might keep goroutines alive for a minute before killing them
time.Sleep(time.Second * 60) // the network status / heartbeat plugin might keep goroutines alive for a minute before killing them
numGoRoutinesPostAppShutdown := runtime.NumGoroutine()