autobindings/utils/eventHandler.go

637 lines
25 KiB
Go

package utils
import (
"cwtch.im/cwtch/app"
"cwtch.im/cwtch/app/plugins"
"cwtch.im/cwtch/functionality/servers"
"cwtch.im/cwtch/model"
"cwtch.im/cwtch/model/attr"
"cwtch.im/cwtch/model/constants"
"cwtch.im/cwtch/peer"
"cwtch.im/cwtch/protocol/connections"
"cwtch.im/cwtch/settings"
"encoding/json"
"fmt"
constants2 "git.openprivacy.ca/cwtch.im/cwtch-autobindings/constants"
"git.openprivacy.ca/cwtch.im/cwtch-autobindings/experiments/server_hosting"
"git.openprivacy.ca/openprivacy/log"
"os"
"strconv"
"time"
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/functionality/filesharing"
)
// EventProfileEnvelope pairs an event read from a profile's event queue with
// the onion address of the profile it was read for, so that events from many
// profiles can share the single profileEvents channel.
type EventProfileEnvelope struct {
// Event is the event exactly as it was taken off the profile's queue.
Event event.Event
// Profile is the onion of the profile whose event bus produced Event.
Profile string
}
// EventHandler collects events from the application bus and from every
// handled profile's event bus, enriches them, and hands them out one at a
// time as JSON strings through GetNextEvent.
type EventHandler struct {
// app is the attached application; nil until HandleApp is called.
app app.Application
// appBusQueue receives the primary-bus subscriptions made in HandleApp as
// well as events injected through Push.
appBusQueue event.Queue
// profileEvents is fed by one forwardProfileMessages goroutine per handled
// profile and drained by GetNextEvent.
profileEvents chan EventProfileEnvelope
// modules are notified of ACNStatus events; registered via AddModule.
modules []EventHandlerInterface
}
// EventHandlerInterface is implemented by modules registered with
// EventHandler.AddModule.
type EventHandlerInterface interface {
// OnACNStatusEvent is called by handleAppBusEvent for each ACNStatus event
// whose progress value parses as an integer, with the attached application
// and the event itself.
OnACNStatusEvent(appl app.Application, e *event.Event)
}
// ReloadEvent is the event data field that, when set to event.True on a
// NewPeer event, makes handleAppBusEvent skip startHandlingPeer for that
// profile (no new subscriptions or forwarding goroutine are created).
const ReloadEvent = event.Field("Reload")
// profileEventsBufferSize is the capacity of the EventHandler.profileEvents
// channel. We should be reading from profile events pretty quickly, but we
// make this buffer fairly large...
const profileEventsBufferSize = 512
// NewEventHandler returns an EventHandler that is not yet attached to an
// application. It owns a fresh app-bus queue and a profile event channel
// buffered to profileEventsBufferSize; app and modules start out nil.
func NewEventHandler() *EventHandler {
	return &EventHandler{
		appBusQueue:   event.NewQueue(),
		profileEvents: make(chan EventProfileEnvelope, profileEventsBufferSize),
	}
}
// AddModule registers ehi so that it is notified of subsequent ACNStatus
// events. Modules are notified in the order they were added.
func (eh *EventHandler) AddModule(ehi EventHandlerInterface) {
	registered := append(eh.modules, ehi)
	eh.modules = registered
}
// HandleApp attaches the handler to application and subscribes the handler's
// app-bus queue to every primary-bus event type that GetNextEvent relays.
func (eh *EventHandler) HandleApp(application app.Application) {
	eh.app = application

	// Subscriptions are made in this order, one call per event type.
	relayed := []event.Type{
		event.NewPeer,
		event.PeerError,
		event.PeerDeleted,
		event.Shutdown,
		event.AppError,
		event.ACNStatus,
		event.ACNVersion,
		settings.UpdateGlobalSettings,
		settings.CwtchStarted,
		server_hosting.NewServer,
		server_hosting.ServerIntentUpdate,
		server_hosting.ServerDeleted,
		server_hosting.ServerStatsUpdate,
		event.StartingStorageMiragtion,
		event.DoneStorageMigration,
	}
	for _, eventType := range relayed {
		application.GetPrimaryBus().Subscribe(eventType, eh.appBusQueue)
	}
}
// GetNextEvent blocks until an event is available and returns it, enriched
// and serialized as JSON. An app-bus event that is already waiting is always
// served first; otherwise whichever of the app bus or the profile event
// channel delivers next is handled.
func (eh *EventHandler) GetNextEvent() string {
	appEvents := eh.appBusQueue.OutChan()

	// Non-blocking poll: give pending app-bus events priority.
	select {
	case pending := <-appEvents:
		return eh.handleAppBusEvent(&pending)
	default:
	}

	// Nothing was pending on the app bus: wait on both sources.
	select {
	case appEvent := <-appEvents:
		return eh.handleAppBusEvent(&appEvent)
	case envelope := <-eh.profileEvents:
		return eh.handleProfileEvent(&envelope)
	}
}
// acnStatus tracks the most recent ACN progress value across events. It is
// updated by handleAppBusEvent on every ACNStatus event with a numeric
// progress, and read when a NewPeer event arrives: the profile's engine is
// only started when the value is 100. It is -1 until the first status arrives.
// NOTE(review): package-level and unsynchronized — presumably only touched
// from the goroutine calling GetNextEvent; confirm against callers.
var acnStatus = -1
// handleAppBusEvent enriches application-bus events so they are usable by the
// UI without further data fetches, and returns the event serialized as JSON.
//
// Enrichment only happens once an application is attached (eh.app != nil):
//   - ACNStatus: stores the reported progress in acnStatus and notifies every
//     registered module. Events whose progress is not an integer are forwarded
//     untouched.
//   - NewPeer: initialises the profile (event forwarding, default picture for
//     newly created profiles, anti-spam plugin, unknown-connection policy and,
//     when the ACN is fully bootstrapped, the protocol engine) and attaches the
//     profile details, the contact list ("ContactsJson") and the known server
//     list to the event.
//
// Every other event type is serialized as received. If serialization fails
// the error is logged and the empty string is returned.
func (eh *EventHandler) handleAppBusEvent(e *event.Event) string {
	if eh.app != nil {
		switch e.EventType {
		case event.ACNStatus:
			newAcnStatus, err := strconv.Atoi(e.Data[event.Progress])
			if err != nil {
				// Non-numeric progress: forward the event without touching state.
				break
			}
			acnStatus = newAcnStatus
			for _, module := range eh.modules {
				module.OnACNStatusEvent(eh.app, e)
			}
		case event.NewPeer:
			onion := e.Data[event.Identity]
			profile := eh.app.GetPeer(onion)
			if profile == nil {
				log.Errorf("NewPeer: skipping profile initialization. this should only happen when the app is rapidly opened+closed (eg during testing)")
				break
			}
			log.Debugf("New Peer Event: %v", e)

			// Events flagged as a reload do not get a new set of subscriptions
			// or a new forwarding goroutine.
			if e.Data[ReloadEvent] != event.True {
				eh.startHandlingPeer(onion)
			}

			// CwtchPeer will always set this now...
			tag, _ := profile.GetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.Tag)
			e.Data[constants.Tag] = tag

			// Newly created profiles get a generated default picture.
			if e.Data[event.Created] == event.True {
				profile.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants2.Picture, ImageToString(NewImage(RandomProfileImage(onion), TypeImageDistro)))
			}
			profile.SetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants2.PeerOnline, event.False)

			// disabling network check for connection attempt reservation, needs rework
			//eh.app.AddPeerPlugin(onion, plugins.NETWORKCHECK)
			eh.app.AddPeerPlugin(onion, plugins.ANTISPAM)

			// If the user has chosen to block unknown profiles
			// then explicitly configure the protocol engine to do so..
			appSettings := eh.app.ReadSettings()
			if appSettings.BlockUnknownConnections {
				profile.BlockUnknownConnections()
			} else {
				// For completeness
				profile.AllowUnknownConnections()
			}

			// Start up the Profile, but only once the ACN reports 100% progress.
			if acnStatus == 100 {
				autostart, exists := profile.GetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.PeerAutostart)
				appearOffline, appearOfflineExists := profile.GetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.PeerAppearOffline)
				if !exists || autostart == "true" {
					eh.app.ActivatePeerEngine(onion)
					// Profiles set to appear offline get an engine but no connections.
					if !(appearOfflineExists && appearOffline == "true") {
						eh.app.ConfigureConnections(onion, true, true, true)
					}
				}
			}

			online, _ := profile.GetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants2.PeerOnline)
			e.Data["Online"] = online

			autostart, exists := profile.GetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants2.PeerAutostart)
			// legacy profiles should autostart by default
			if !exists {
				autostart = "true"
			}
			e.Data["autostart"] = autostart

			appearOffline, exists := profile.GetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.PeerAppearOffline)
			// legacy profiles should not appearOffline by default
			if !exists {
				appearOffline = "false"
			}
			e.Data["appearOffline"] = appearOffline

			// Name always exists
			e.Data[constants.Name], _ = profile.GetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name)
			e.Data[constants2.DefaultProfilePicture] = RandomProfileImage(onion)

			// if a custom profile image exists then default to it.
			key, exists := profile.GetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.CustomProfileImageKey)
			if !exists {
				e.Data[constants2.Picture] = RandomProfileImage(onion)
			} else {
				e.Data[constants2.Picture], _ = profile.GetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%s.path", key))
			}

			// Construct our conversations and our server lists
			var contacts []Contact
			var knownServers []servers.Server
			conversations, err := profile.FetchConversations()
			if err != nil {
				log.Errorf("error fetching conversations for new peer event: %v", err)
			} else {
				// We have conversations attached to this profile...
				for _, conversationInfo := range conversations {
					// Only compile the server info if we have enabled the experiment...
					// Note that this means that this info can become stale if when first loaded the experiment
					// has been disabled and then is later re-enabled. As such we need to ensure that this list is
					// re-fetched when the group experiment is enabled via a dedicated ListServerInfo event...
					if conversationInfo.IsServer() {
						groupHandler := servers.FunctionalityGate()
						// The second result is deliberately ignored: the entry is
						// listed with whatever info could be fetched.
						serverInfo, _ := groupHandler.GetServerInfo(profile, conversationInfo.Handle)
						knownServers = append(knownServers, serverInfo)
						continue
					}

					// Prefer local override to public name...
					localname, exists := conversationInfo.GetAttribute(attr.LocalScope, attr.ProfileZone, constants.Name)
					if !exists {
						localname = ""
					}
					name, exists := conversationInfo.GetAttribute(attr.PublicScope, attr.ProfileZone, constants.Name)
					if !exists {
						name = conversationInfo.Handle
					}

					// Resolve the profile image of the contact
					var cpicPath string
					var defaultPath string
					if conversationInfo.IsGroup() {
						cpicPath = RandomGroupImage(conversationInfo.Handle)
						defaultPath = RandomGroupImage(conversationInfo.Handle)
					} else {
						cpicPath = eh.GetProfileImage(profile, conversationInfo, appSettings.DownloadPath)
						defaultPath = RandomProfileImage(conversationInfo.Handle)
					}

					// Resolve Save History Setting
					saveHistory, set := conversationInfo.GetAttribute(attr.LocalScope, attr.ProfileZone, event.SaveHistoryKey)
					if !set {
						saveHistory = event.DeleteHistoryDefault
					}

					// Resolve Archived Setting
					isArchived, set := conversationInfo.GetAttribute(attr.LocalScope, attr.ProfileZone, constants2.Archived)
					if !set {
						isArchived = event.False
					}

					// Count unread messages: walk the most recent messages
					// until one that is not newer than the last-seen time.
					unread := 0
					lastSeenMessageId := -1
					lastSeenTimeStr, hasLastSeen := conversationInfo.GetAttribute(attr.LocalScope, attr.ProfileZone, constants2.LastSeenTime)
					if hasLastSeen {
						lastSeenTime, err := time.Parse(constants2.DartIso8601, lastSeenTimeStr)
						if err == nil {
							// get last 100 messages and count how many are after the lastSeenTime (100 because the ui just shows 99+ after)
							messages, err := profile.GetMostRecentMessages(conversationInfo.ID, 0, 0, 100)
							if err == nil {
								for _, message := range messages {
									msgTime, err := time.Parse(time.RFC3339Nano, message.Attr[constants.AttrSentTimestamp])
									if err == nil {
										if msgTime.UTC().After(lastSeenTime.UTC()) {
											unread++
										} else {
											lastSeenMessageId = message.ID
											break
										}
									}
								}
							}
						}
					}

					// Groups report the connection state of their server,
					// everything else the state of the contact itself. The
					// state is used as reported: it must not depend on
					// whether a last-seen timestamp happens to be stored.
					groupServer, _ := conversationInfo.GetAttribute(attr.LocalScope, attr.LegacyGroupZone, constants.GroupServer)
					stateHandle := conversationInfo.Handle
					if conversationInfo.IsGroup() {
						stateHandle = groupServer
					}
					state := profile.GetPeerState(stateHandle)

					blocked := conversationInfo.ACL[conversationInfo.Handle].Blocked

					// Fetch the message count, and the time of the most recent message
					count, err := profile.GetChannelMessageCount(conversationInfo.ID, 0)
					if err != nil {
						log.Errorf("error fetching channel message count %v %v", conversationInfo.ID, err)
					}
					lastMessage, _ := profile.GetMostRecentMessages(conversationInfo.ID, 0, 0, 1)

					notificationPolicy := constants2.ConversationNotificationPolicyDefault
					if notificationPolicyAttr, exists := conversationInfo.GetAttribute(attr.LocalScope, attr.ProfileZone, constants2.ConversationNotificationPolicy); exists {
						notificationPolicy = notificationPolicyAttr
					}

					contacts = append(contacts, Contact{
						Name:               name,
						LocalName:          localname,
						Identifier:         conversationInfo.ID,
						Onion:              conversationInfo.Handle,
						Status:             connections.ConnectionStateName[state],
						Picture:            cpicPath,
						DefaultPicture:     defaultPath,
						Accepted:           conversationInfo.Accepted,
						AccessControlList:  conversationInfo.ACL,
						Blocked:            blocked,
						SaveHistory:        saveHistory,
						Messages:           count,
						Unread:             unread,
						LastSeenMessageId:  lastSeenMessageId,
						LastMessage:        strconv.Itoa(getLastMessageTime(lastMessage)),
						IsGroup:            conversationInfo.IsGroup(),
						GroupServer:        groupServer,
						IsArchived:         isArchived == event.True,
						NotificationPolicy: notificationPolicy,
						Attributes:         conversationInfo.Attributes,
					})
				}
			}

			contactsJSON, err := json.Marshal(contacts)
			if err != nil {
				log.Errorf("error serializing contacts for new peer event: %v", err)
			}
			e.Data["ContactsJson"] = string(contactsJSON)

			// Marshal the server list into the new peer event...
			serversListBytes, err := json.Marshal(knownServers)
			if err != nil {
				log.Errorf("error serializing server list for new peer event: %v", err)
			}
			e.Data[servers.ServerList] = string(serversListBytes)

			log.Debugf("contactsJson %v", e.Data["ContactsJson"])
		}
	}

	serialized, err := json.Marshal(e)
	if err != nil {
		log.Errorf("error serializing app bus event %v: %v", e.EventType, err)
	}
	return string(serialized)
}
// GetProfileImage resolves the picture path to display for a conversation.
//
// The stored path of the contact's custom profile image is returned only when
// all of the following hold: the image previews experiment is enabled, the
// conversation carries a custom profile image key, the corresponding download
// is marked complete, and the file generated from basepath and that key
// exists on disk. In every other case the generated default image for the
// conversation's handle is returned.
func (eh *EventHandler) GetProfileImage(profile peer.CwtchPeer, conversationInfo *model.Conversation, basepath string) string {
	fallback := func() string {
		return RandomProfileImage(conversationInfo.Handle)
	}

	if !eh.app.IsFeatureEnabled(constants.ImagePreviewsExperiment) {
		return fallback()
	}

	keyPath := attr.PublicScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.CustomProfileImageKey))
	fileKey, err := profile.GetConversationAttribute(conversationInfo.ID, keyPath)
	if err != nil {
		return fallback()
	}

	complete, found := profile.GetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%s.complete", fileKey))
	if !found || complete != event.True {
		return fallback()
	}

	// check if the file exists...if it does then set the path...
	downloadPath, _ := filesharing.GenerateDownloadPath(basepath, fileKey, true)
	if _, statErr := os.Stat(downloadPath); statErr != nil {
		return fallback()
	}

	storedPath, _ := profile.GetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%s.path", fileKey))
	return storedPath
}
// handleProfileEvent enriches profile events so they are usable by the UI
// without further data fetches, and returns the event (with the owning
// profile's onion attached as "ProfileOnion") serialized as JSON.
//
// The empty string is returned, and the event thereby dropped, when:
//   - the profile named in the envelope can no longer be found,
//   - a NewMessageFromPeer arrives for a conversation that does not exist yet,
//   - a NewGroup event carries an invalid invite,
//   - a PeerStateChange refers to the profile's own onion.
//
// When enrichment of an event fails part-way (for example because a
// conversation id cannot be parsed or looked up) the error is logged and the
// event is forwarded with whatever data had been attached up to that point.
// If no application is attached the event is forwarded without enrichment.
func (eh *EventHandler) handleProfileEvent(ev *EventProfileEnvelope) string {
	if eh.app == nil {
		log.Errorf("eh.app == nil in handleProfileEvent... this shouldnt happen?")
	} else {
		profile := eh.app.GetPeer(ev.Profile)
		if profile == nil {
			log.Errorf("something has gone very wrong. profile is nil in handleProfileEvent")
			return ""
		}
		log.Debugf("New Profile Event to Handle: %v", ev)

		switch ev.Event.EventType {
		case event.NewMessageFromPeer: //event.TimestampReceived, event.RemotePeer, event.Data
			// only needs contact nickname and picture, for displaying on popup notifications
			ci, err := profile.FetchConversationInfo(ev.Event.Data["RemotePeer"])
			ev.Event.Data[constants2.Picture] = RandomProfileImage(ev.Event.Data["RemotePeer"])
			if ci == nil || err != nil {
				// TODO This Conversation May Not Exist Yet...But we are not in charge of creating it...
				log.Errorf("todo wait for contact to be added before processing this event...")
				return ""
			}
			appSettings := eh.app.ReadSettings()
			ev.Event.Data[event.ConversationID] = strconv.Itoa(ci.ID)
			// A new message un-archives the conversation.
			profile.SetConversationAttribute(ci.ID, attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants2.Archived)), event.False)
			ev.Event.Data[constants2.Picture] = eh.GetProfileImage(profile, ci, appSettings.DownloadPath)

			// Nickname: local override, then public name, then the raw handle.
			var exists bool
			ev.Event.Data["Nick"], exists = ci.GetAttribute(attr.LocalScope, attr.ProfileZone, constants.Name)
			if !exists {
				ev.Event.Data["Nick"], exists = ci.GetAttribute(attr.PublicScope, attr.ProfileZone, constants.Name)
				if !exists {
					ev.Event.Data["Nick"] = ev.Event.Data["RemotePeer"]
				}
			}
			ev.Event.Data["notification"] = string(determineNotification(ci, appSettings))

		case event.NewMessageFromGroup:
			// only needs contact nickname and picture, for displaying on popup notifications
			ci, err := profile.FetchConversationInfo(ev.Event.Data["RemotePeer"])
			ev.Event.Data[constants2.Picture] = RandomProfileImage(ev.Event.Data["RemotePeer"])
			if ci != nil && err == nil {
				var exists bool
				ev.Event.Data["Nick"], exists = ci.GetAttribute(attr.LocalScope, attr.ProfileZone, constants.Name)
				if !exists {
					ev.Event.Data["Nick"], exists = ci.GetAttribute(attr.PublicScope, attr.ProfileZone, constants.Name)
					if !exists {
						ev.Event.Data["Nick"] = ev.Event.Data["RemotePeer"]
					}
				}
				ev.Event.Data[constants2.Picture] = eh.GetProfileImage(profile, ci, eh.app.ReadSettings().DownloadPath)
			}

			// Do not touch any conversation when the id is unusable; a
			// failed parse would otherwise address conversation 0.
			conversationID, err := strconv.Atoi(ev.Event.Data[event.ConversationID])
			if err != nil {
				log.Errorf("invalid conversation id recieved %v", err)
				break
			}
			profile.SetConversationAttribute(conversationID, attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants2.Archived)), event.False)
			gci, err := profile.GetConversationInfo(conversationID)
			if err != nil {
				log.Errorf("new message from non-existant group: %v", err)
				break
			}
			groupServer := gci.Attributes[attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.GroupServer)).ToString()]
			state := profile.GetPeerState(groupServer)
			// if syncing, don't flood with notifications
			if state == connections.SYNCED {
				ev.Event.Data["notification"] = string(determineNotification(gci, eh.app.ReadSettings()))
			} else {
				ev.Event.Data["notification"] = string(constants2.NotificationNone)
			}

		case event.PeerAcknowledgement:
			ci, err := profile.FetchConversationInfo(ev.Event.Data["RemotePeer"])
			if ci != nil && err == nil {
				ev.Event.Data[event.ConversationID] = strconv.Itoa(ci.ID)
			}

		case event.ContactCreated:
			conversationID, err := strconv.Atoi(ev.Event.Data[event.ConversationID])
			if err != nil {
				log.Errorf("invalid conversation id recieved %v", err)
				break
			}
			count, err := profile.GetChannelMessageCount(conversationID, 0)
			if err != nil {
				log.Errorf("error fetching channel message count %v %v", conversationID, err)
				break
			}
			conversationInfo, err := profile.GetConversationInfo(conversationID)
			if err != nil {
				log.Errorf("error fetching conversation info for %v %v", conversationID, err)
				break
			}

			blocked := constants.False
			if conversationInfo.ACL[conversationInfo.Handle].Blocked {
				blocked = constants.True
			}
			accepted := constants.False
			if conversationInfo.Accepted {
				accepted = constants.True
			}
			acl, err := json.Marshal(conversationInfo.ACL)
			if err != nil {
				log.Errorf("received invalid ACL in conversation: %v", err)
				break
			}
			lastMessage, _ := profile.GetMostRecentMessages(conversationID, 0, 0, 1)

			ev.Event.Data["unread"] = strconv.Itoa(count) // if this is a new contact with messages attached then by-definition these are unread...
			ev.Event.Data[constants2.Picture] = RandomProfileImage(conversationInfo.Handle)
			ev.Event.Data[constants2.DefaultProfilePicture] = RandomProfileImage(conversationInfo.Handle)
			ev.Event.Data["numMessages"] = strconv.Itoa(count)
			ev.Event.Data["nick"] = conversationInfo.Handle
			ev.Event.Data["status"] = connections.ConnectionStateName[profile.GetPeerState(conversationInfo.Handle)]
			ev.Event.Data["accepted"] = accepted
			ev.Event.Data["accessControlList"] = string(acl)
			ev.Event.Data["blocked"] = blocked
			ev.Event.Data["loading"] = "false"
			ev.Event.Data["lastMsgTime"] = strconv.Itoa(getLastMessageTime(lastMessage))

		case event.GroupCreated:
			// This event should only happen after we have validated the invite, as such the error
			// condition *should* never happen.
			groupPic := RandomGroupImage(ev.Event.Data[event.GroupID])
			ev.Event.Data[constants2.Picture] = groupPic

			conversationID, err := strconv.Atoi(ev.Event.Data[event.ConversationID])
			if err != nil {
				log.Errorf("invalid conversation id recieved %v", err)
				break
			}
			conversationInfo, err := profile.GetConversationInfo(conversationID)
			if err != nil {
				log.Errorf("error fetching conversation info for %v %v", conversationID, err)
				break
			}
			acl, err := json.Marshal(conversationInfo.ACL)
			if err != nil {
				log.Errorf("received invalid ACL in conversation: %v", err)
				break
			}
			ev.Event.Data["accessControlList"] = string(acl)

		case event.NewGroup:
			// This event should only happen after we have validated the invite, as such the error
			// condition *should* never happen.
			serializedInvite := ev.Event.Data[event.GroupInvite]
			invite, err := model.ValidateInvite(serializedInvite)
			if err != nil {
				log.Errorf("received a new group event which contained an invalid invite %v. this should never happen and likely means there is a bug in cwtch. Please file a ticket @ https://git.openprivacy.ca/cwtch.im/cwtch", err)
				return ""
			}
			groupPic := RandomGroupImage(invite.GroupID)
			ev.Event.Data[constants2.Picture] = groupPic

			conversationID, err := strconv.Atoi(ev.Event.Data[event.ConversationID])
			if err != nil {
				log.Errorf("invalid conversation id recieved %v", err)
				break
			}
			conversationInfo, err := profile.GetConversationInfo(conversationID)
			if err != nil {
				log.Errorf("error fetching conversation info for %v %v", conversationID, err)
				break
			}
			// Only attach the ACL once it is known to have serialized.
			acl, err := json.Marshal(conversationInfo.ACL)
			if err != nil {
				log.Errorf("received invalid ACL in conversation: %v", err)
				break
			}
			ev.Event.Data["accessControlList"] = string(acl)

		case event.PeerStateChange:
			if ev.Event.Data[event.RemotePeer] == profile.GetOnion() {
				return "" // suppress events from our own profile...
			}
			// ServerStateChange events, and all other PeerStateChange events,
			// are forwarded unchanged.

		case event.TokenManagerInfo:
			// Attach the ids of all groups hosted on the server whose token
			// onion matches the one reported by the event.
			conversations, err := profile.FetchConversations()
			if err == nil {
				var associatedGroups []int
				for _, ci := range conversations {
					groupServer, groupServerExists := ci.Attributes[attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.GroupServer)).ToString()]
					if !groupServerExists {
						continue
					}
					gci, err := profile.FetchConversationInfo(groupServer)
					if err != nil || gci == nil {
						continue
					}
					tokenOnion, onionExists := gci.Attributes[attr.PublicScope.ConstructScopedZonedPath(attr.ServerKeyZone.ConstructZonedPath(string(model.KeyTypeTokenOnion))).ToString()]
					if onionExists && tokenOnion == ev.Event.Data[event.ServerTokenOnion] {
						associatedGroups = append(associatedGroups, ci.ID)
					}
				}
				associatedGroupsJson, _ := json.Marshal(associatedGroups)
				ev.Event.Data[event.Data] = string(associatedGroupsJson)
			}

		case event.ProtocolEngineCreated:
			// TODO this code should be moved into Cwtch during the API officialization...
			appSettings := eh.app.ReadSettings()
			// ensure that protocol engine respects blocking settings...
			if appSettings.BlockUnknownConnections {
				profile.BlockUnknownConnections()
			} else {
				profile.AllowUnknownConnections()
			}
		}
	}

	serialized, err := json.Marshal(unwrap(ev))
	if err != nil {
		log.Errorf("error serializing profile event %v: %v", ev.Event.EventType, err)
	}
	return string(serialized)
}
// unwrap records the envelope's profile onion in the wrapped event's data
// under "ProfileOnion" and returns a pointer to that event. The event is
// modified in place; the returned pointer aliases original.Event.
func unwrap(original *EventProfileEnvelope) *event.Event {
	original.Event.Data["ProfileOnion"] = original.Profile
	return &original.Event
}
// startHandlingPeer subscribes a fresh queue to the event types of interest
// on the event bus of the profile identified by onion, and launches a
// goroutine that forwards everything arriving on that queue to
// eh.profileEvents. If the profile has no event bus an error is logged and
// nothing is started.
func (eh *EventHandler) startHandlingPeer(onion string) {
	eventBus := eh.app.GetEventBus(onion)
	if eventBus == nil {
		log.Errorf("could not start handling peer events .. event bus is nil")
		return
	}

	// Subscriptions are made in this order, all onto the same queue.
	forwarded := []event.Type{
		event.NetworkStatus,
		event.ACNInfo,
		event.NewMessageFromPeer,
		event.UpdatedProfileAttribute,
		event.PeerAcknowledgement,
		event.DeleteContact,
		event.AppError,
		event.IndexedAcknowledgement,
		event.IndexedFailure,
		event.ContactCreated,
		event.NewMessageFromGroup,
		event.GroupCreated,
		event.NewGroup,
		event.ServerStateChange,
		event.PeerStateChange,
		event.UpdatedConversationAttribute,
		event.ManifestSizeReceived,
		event.ManifestError,
		event.ManifestReceived,
		event.ManifestSaved,
		event.FileDownloadProgressUpdate,
		event.FileDownloaded,
		event.TokenManagerInfo,
		event.ProtocolEngineCreated,
		event.SearchResult,
		event.SearchCancelled,
		servers.UpdateServerInfo,
	}

	q := event.NewQueue()
	for _, eventType := range forwarded {
		eventBus.Subscribe(eventType, q)
	}
	go eh.forwardProfileMessages(onion, q)
}
// forwardProfileMessages runs as a goroutine, one per handled profile. It
// takes events off q one at a time, wraps each with the profile's onion and
// sends it to eh.profileEvents. The loop ends after a Shutdown event has been
// forwarded.
func (eh *EventHandler) forwardProfileMessages(onion string, q event.Queue) {
	log.Infof("Launching Forwarding Goroutine")
	// TODO: graceful shutdown, via an injected event of special QUIT type exiting loop/go routine
	for {
		envelope := EventProfileEnvelope{Profile: onion, Event: q.Next()}
		eh.profileEvents <- envelope
		// The Shutdown event itself is forwarded before the goroutine exits.
		if envelope.Event.EventType == event.Shutdown {
			break
		}
	}
}
// Push publishes newEvent on the handler's app-bus queue, from where
// GetNextEvent will pick it up like any other application-bus event.
//
// Because the queue exists as soon as the handler is constructed, this is
// also the way for libCwtch-go to surface an event to a UI before a Cwtch app
// has been initialized, e.g. to signal an error that prevented the app from
// being created.
func (eh *EventHandler) Push(newEvent event.Event) {
	queue := eh.appBusQueue
	queue.Publish(newEvent)
}
// getLastMessageTime returns the sent timestamp of the first message in
// conversationMessages as Unix seconds. It returns 0 when the slice is empty
// or when the message's sent-timestamp attribute is not a valid RFC 3339
// timestamp.
func getLastMessageTime(conversationMessages []model.ConversationMessage) int {
	if len(conversationMessages) == 0 {
		return 0
	}
	rawTimestamp := conversationMessages[0].Attr[constants.AttrSentTimestamp]
	sentAt, parseErr := time.Parse(time.RFC3339Nano, rawTimestamp)
	if parseErr != nil {
		return 0
	}
	return int(sentAt.Unix())
}