Support Profile Status and Profile Attributes. Auto Fetch Updates on a Heartbeat. Move Profile Image Download Checks to Cwtch #503
|
@ -221,6 +221,7 @@ func (app *application) setupPeer(profile peer.CwtchPeer) {
|
|||
|
||||
// Register the Peer With Application Plugins..
|
||||
app.AddPeerPlugin(profile.GetOnion(), plugins.CONNECTIONRETRY) // Now Mandatory
|
||||
app.AddPeerPlugin(profile.GetOnion(), plugins.HEARTBEAT) // Now Mandatory
|
||||
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,49 @@
|
|||
package plugins
|
||||
|
||||
import (
|
||||
"cwtch.im/cwtch/event"
|
||||
"git.openprivacy.ca/openprivacy/log"
|
||||
"time"
|
||||
)
|
||||
|
||||
const heartbeatTickTime = 60 * time.Second
|
||||
|
||||
type heartbeat struct {
|
||||
bus event.Manager
|
||||
queue event.Queue
|
||||
breakChan chan bool
|
||||
}
|
||||
|
||||
func (a *heartbeat) Start() {
|
||||
go a.run()
|
||||
}
|
||||
|
||||
func (cr *heartbeat) Id() PluginID {
|
||||
return HEARTBEAT
|
||||
}
|
||||
|
||||
func (a heartbeat) Shutdown() {
|
||||
a.breakChan <- true
|
||||
a.queue.Shutdown()
|
||||
}
|
||||
|
||||
func (a *heartbeat) run() {
|
||||
log.Debugf("running heartbeat trigger plugin")
|
||||
for {
|
||||
select {
|
||||
case <-time.After(heartbeatTickTime):
|
||||
// no fuss, just trigger the beat.
|
||||
a.bus.Publish(event.NewEvent(event.Heartbeat, map[event.Field]string{}))
|
||||
continue
|
||||
case <-a.breakChan:
|
||||
log.Debugf("shutting down heartbeat plugin")
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// NewHeartbeat returns a Plugin that when started will trigger heartbeat checks on a regular interval
|
||||
func NewHeartbeat(bus event.Manager) Plugin {
|
||||
cr := &heartbeat{bus: bus, queue: event.NewQueue(), breakChan: make(chan bool, 1)}
|
||||
return cr
|
||||
}
|
|
@ -14,6 +14,7 @@ const (
|
|||
CONNECTIONRETRY PluginID = iota
|
||||
NETWORKCHECK
|
||||
ANTISPAM
|
||||
HEARTBEAT
|
||||
)
|
||||
|
||||
// Plugin is the interface for a plugin
|
||||
|
@ -32,6 +33,8 @@ func Get(id PluginID, bus event.Manager, acn connectivity.ACN, onion string) (Pl
|
|||
return NewNetworkCheck(onion, bus, acn), nil
|
||||
case ANTISPAM:
|
||||
return NewAntiSpam(bus), nil
|
||||
case HEARTBEAT:
|
||||
return NewHeartbeat(bus), nil
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("plugin not defined %v", id)
|
||||
|
|
|
@ -219,6 +219,9 @@ const (
|
|||
TokenManagerInfo = Type("TokenManagerInfo")
|
||||
TriggerAntispamCheck = Type("TriggerAntispamCheck")
|
||||
MakeAntispamPayment = Type("MakeAntispamPayment")
|
||||
|
||||
// Heartbeat is used to trigger actions that need to happen every so often...
|
||||
Heartbeat = Type("Heartbeat")
|
||||
)
|
||||
|
||||
// Field defines common event attributes
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
"cwtch.im/cwtch/model/attr"
|
||||
"cwtch.im/cwtch/model/constants"
|
||||
"cwtch.im/cwtch/peer"
|
||||
"cwtch.im/cwtch/protocol/connections"
|
||||
"cwtch.im/cwtch/settings"
|
||||
"git.openprivacy.ca/openprivacy/log"
|
||||
"strconv"
|
||||
|
@ -19,15 +20,43 @@ func (pne ProfileValueExtension) NotifySettingsUpdate(settings settings.GlobalSe
|
|||
}
|
||||
|
||||
func (pne ProfileValueExtension) EventsToRegister() []event.Type {
|
||||
return nil
|
||||
return []event.Type{event.PeerStateChange, event.Heartbeat}
|
||||
}
|
||||
|
||||
func (pne ProfileValueExtension) ExperimentsToRegister() []string {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (pne ProfileValueExtension) OnEvent(event event.Event, profile peer.CwtchPeer) {
|
||||
// nop
|
||||
func (pne ProfileValueExtension) requestProfileInfo(profile peer.CwtchPeer, ci *model.Conversation) {
|
||||
profile.SendScopedZonedGetValToContact(ci.ID, attr.PublicScope, attr.ProfileZone, constants.Name)
|
||||
profile.SendScopedZonedGetValToContact(ci.ID, attr.PublicScope, attr.ProfileZone, constants.ProfileStatus)
|
||||
profile.SendScopedZonedGetValToContact(ci.ID, attr.PublicScope, attr.ProfileZone, constants.ProfileAttribute1)
|
||||
profile.SendScopedZonedGetValToContact(ci.ID, attr.PublicScope, attr.ProfileZone, constants.ProfileAttribute2)
|
||||
profile.SendScopedZonedGetValToContact(ci.ID, attr.PublicScope, attr.ProfileZone, constants.ProfileAttribute3)
|
||||
}
|
||||
|
||||
func (pne ProfileValueExtension) OnEvent(ev event.Event, profile peer.CwtchPeer) {
|
||||
switch ev.EventType {
|
||||
case event.Heartbeat:
|
||||
// once every heartbeat, loop through conversations and, if they are online, request an update to any long info..
|
||||
conversations, err := profile.FetchConversations()
|
||||
if err == nil {
|
||||
for _, ci := range conversations {
|
||||
if profile.GetPeerState(ci.Handle) == connections.AUTHENTICATED {
|
||||
pne.requestProfileInfo(profile, ci)
|
||||
}
|
||||
}
|
||||
}
|
||||
case event.PeerStateChange:
|
||||
dan marked this conversation as resolved
|
||||
ci, err := profile.FetchConversationInfo(ev.Data["RemotePeer"])
|
||||
if err == nil {
|
||||
// if we have re-authenticated with thie peer then request their profile image...
|
||||
if connections.ConnectionStateToType()[ev.Data[event.ConnectionState]] == connections.AUTHENTICATED {
|
||||
// Request some profile information...
|
||||
pne.requestProfileInfo(profile, ci)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// OnContactReceiveValue for ProfileValueExtension handles saving specific Public Profile Values like Profile Name
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
"cwtch.im/cwtch/model/attr"
|
||||
"cwtch.im/cwtch/model/constants"
|
||||
"cwtch.im/cwtch/peer"
|
||||
"cwtch.im/cwtch/protocol/connections"
|
||||
"cwtch.im/cwtch/settings"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
|
@ -24,7 +25,7 @@ func (i *ImagePreviewsFunctionality) NotifySettingsUpdate(settings settings.Glob
|
|||
}
|
||||
|
||||
func (i ImagePreviewsFunctionality) EventsToRegister() []event.Type {
|
||||
return []event.Type{event.ProtocolEngineCreated, event.NewMessageFromPeer, event.NewMessageFromGroup}
|
||||
return []event.Type{event.ProtocolEngineCreated, event.NewMessageFromPeer, event.NewMessageFromGroup, event.PeerStateChange, event.Heartbeat}
|
||||
}
|
||||
|
||||
func (i ImagePreviewsFunctionality) ExperimentsToRegister() []string {
|
||||
|
@ -48,6 +49,23 @@ func (i *ImagePreviewsFunctionality) OnEvent(ev event.Event, profile peer.CwtchP
|
|||
i.handleImagePreviews(profile, &ev, ci.ID, ci.ID)
|
||||
}
|
||||
}
|
||||
case event.PeerStateChange:
|
||||
ci, err := profile.FetchConversationInfo(ev.Data["RemotePeer"])
|
||||
if err == nil {
|
||||
// if we have re-authenticated with thie peer then request their profile image...
|
||||
if connections.ConnectionStateToType()[ev.Data[event.ConnectionState]] == connections.AUTHENTICATED {
|
||||
profile.SendScopedZonedGetValToContact(ci.ID, attr.PublicScope, attr.ProfileZone, constants.CustomProfileImageKey)
|
||||
dan
commented
do we likewise want this on a heartbeat in case they change? do we likewise want this on a heartbeat in case they change?
|
||||
}
|
||||
}
|
||||
case event.Heartbeat:
|
||||
conversations, err := profile.FetchConversations()
|
||||
if err == nil {
|
||||
for _, ci := range conversations {
|
||||
if profile.GetPeerState(ci.Handle) == connections.AUTHENTICATED {
|
||||
profile.SendScopedZonedGetValToContact(ci.ID, attr.PublicScope, attr.ProfileZone, constants.CustomProfileImageKey)
|
||||
}
|
||||
}
|
||||
}
|
||||
case event.ProtocolEngineCreated:
|
||||
// Now that the Peer Engine is Activated, Reshare Profile Images
|
||||
key, exists := profile.GetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.CustomProfileImageKey)
|
||||
|
|
|
@ -58,3 +58,8 @@ const SyncMostRecentMessageTime = "SyncMostRecentMessageTime"
|
|||
const AttrLastConnectionTime = "last-connection-time"
|
||||
const PeerAutostart = "autostart"
|
||||
const Archived = "archived"
|
||||
|
||||
const ProfileStatus = "profile-status"
|
||||
const ProfileAttribute1 = "profile-attribute-1"
|
||||
dan
commented
asking ux, not sure how i feel about attr 1-3 asking ux, not sure how i feel about attr 1-3
|
||||
const ProfileAttribute2 = "profile-attribute-2"
|
||||
const ProfileAttribute3 = "profile-attribute-3"
|
||||
|
|
|
@ -14,3 +14,6 @@ const MessageFormattingExperiment = "message-formatting"
|
|||
|
||||
// AutoDLFileExts Files with these extensions will be autodownloaded using ImagePreviewsExperiment
|
||||
var AutoDLFileExts = [...]string{".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp"}
|
||||
|
||||
// BlodeuweddExperiment enables the Blodeuwedd Assistant
|
||||
const BlodeuweddExperiment = "blodeuwedd"
|
||||
|
|
|
@ -1484,6 +1484,22 @@ func (cp *cwtchPeer) eventHandler() {
|
|||
cp.mutex.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
// Safe Access to Extensions
|
||||
cp.extensionLock.Lock()
|
||||
dan
commented
this loop through extension for ones matching the event and dispatching is repeated a few times, should be pulled out into a function somewhere this loop through extension for ones matching the event and dispatching is repeated a few times, should be pulled out into a function somewhere
|
||||
for _, extension := range cp.extensions {
|
||||
log.Debugf("checking extension...%v", extension)
|
||||
// check if the current map of experiments satisfies the extension requirements
|
||||
if !cp.checkExtensionExperiment(extension) {
|
||||
log.Debugf("skipping extension (%s) ..not all experiments satisfied", extension)
|
||||
continue
|
||||
}
|
||||
if cp.checkEventExperiment(extension, ev.EventType) {
|
||||
extension.extension.OnEvent(ev, cp)
|
||||
dan marked this conversation as resolved
dan
commented
we're doing onEvent form peer.handle event for onPeerStausChange, but did registering the onPeerEventStatusChange event event bus listener in the extension not mean it'd pull it off the bus? like i'm assuing it is doing for heartbeat? or else if not than we i assume need a dup of this for heartbeat? and to have peer consume it? we're doing onEvent form peer.handle event for onPeerStausChange, but did registering the onPeerEventStatusChange event event bus listener in the extension not mean it'd pull it off the bus? like i'm assuing it is doing for heartbeat? or else if not than we i assume need a dup of this for heartbeat? and to have peer consume it?
|
||||
}
|
||||
}
|
||||
cp.extensionLock.Unlock()
|
||||
|
||||
cp.mutex.Lock()
|
||||
cp.state[ev.Data[event.RemotePeer]] = connections.ConnectionStateToType()[ev.Data[event.ConnectionState]]
|
||||
cp.mutex.Unlock()
|
||||
|
|
|
@ -55,6 +55,7 @@ type GlobalSettings struct {
|
|||
CustomControlPort int
|
||||
UseTorCache bool
|
||||
TorCacheDir string
|
||||
BlodeuweddPath string
|
||||
}
|
||||
|
||||
var DefaultGlobalSettings = GlobalSettings{
|
||||
|
@ -79,6 +80,7 @@ var DefaultGlobalSettings = GlobalSettings{
|
|||
CustomControlPort: -1,
|
||||
UseTorCache: false,
|
||||
TorCacheDir: "",
|
||||
BlodeuweddPath: "",
|
||||
}
|
||||
|
||||
func InitGlobalSettingsFile(directory string, password string) (*GlobalSettingsFile, error) {
|
||||
|
|
|
@ -164,21 +164,21 @@ func TestCwtchPeerIntegration(t *testing.T) {
|
|||
aliceBus := app.GetEventBus(alice.GetOnion())
|
||||
app.ActivatePeerEngine(alice.GetOnion())
|
||||
log.Infoln("Alice created:", alice.GetOnion())
|
||||
alice.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Alice")
|
||||
// alice.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Alice") <- This is now done automatically by ProfileValueExtension, keeping this here for clarity
|
||||
alice.AutoHandleEvents([]event.Type{event.PeerStateChange, event.ServerStateChange, event.NewGroupInvite, event.NewRetValMessageFromPeer})
|
||||
|
||||
bob := app2.WaitGetPeer(app, "Bob")
|
||||
bobBus := app.GetEventBus(bob.GetOnion())
|
||||
app.ActivatePeerEngine(bob.GetOnion())
|
||||
log.Infoln("Bob created:", bob.GetOnion())
|
||||
bob.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Bob")
|
||||
// bob.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Bob") <- This is now done automatically by ProfileValueExtension, keeping this here for clarity
|
||||
bob.AutoHandleEvents([]event.Type{event.PeerStateChange, event.ServerStateChange, event.NewGroupInvite, event.NewRetValMessageFromPeer})
|
||||
|
||||
carol := app2.WaitGetPeer(app, "Carol")
|
||||
carolBus := app.GetEventBus(carol.GetOnion())
|
||||
app.ActivatePeerEngine(carol.GetOnion())
|
||||
log.Infoln("Carol created:", carol.GetOnion())
|
||||
carol.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Carol")
|
||||
// carol.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Carol") <- This is now done automatically by ProfileValueExtension, keeping this here for clarity
|
||||
carol.AutoHandleEvents([]event.Type{event.PeerStateChange, event.ServerStateChange, event.NewGroupInvite, event.NewRetValMessageFromPeer})
|
||||
|
||||
waitTime := time.Duration(60) * time.Second
|
||||
|
@ -403,7 +403,7 @@ func TestCwtchPeerIntegration(t *testing.T) {
|
|||
|
||||
log.Infof("Shutting down ACN...")
|
||||
acn.Close()
|
||||
time.Sleep(time.Second * 30) // the network status plugin might keep goroutines alive for a minute before killing them
|
||||
time.Sleep(time.Second * 60) // the network status / heartbeat plugin might keep goroutines alive for a minute before killing them
|
||||
|
||||
numGoRoutinesPostAppShutdown := runtime.NumGoroutine()
|
||||
|
||||
|
|
Loading…
Reference in New Issue
should be 1
fun reqinfo()
so we dont dup this code and adding new is simple