2021-06-24 22:30:46 +00:00
package utils
import (
2022-08-26 17:58:18 +00:00
"encoding/json"
"fmt"
2022-09-09 04:32:27 +00:00
"git.openprivacy.ca/cwtch.im/libcwtch-go/features/servers"
2022-08-26 17:58:18 +00:00
"os"
"strconv"
"cwtch.im/cwtch/app"
"cwtch.im/cwtch/app/plugins"
"cwtch.im/cwtch/model"
"cwtch.im/cwtch/model/attr"
"cwtch.im/cwtch/model/constants"
"cwtch.im/cwtch/peer"
"cwtch.im/cwtch/protocol/connections"
constants2 "git.openprivacy.ca/cwtch.im/libcwtch-go/constants"
"git.openprivacy.ca/cwtch.im/libcwtch-go/features/groups"
"git.openprivacy.ca/openprivacy/log"
"time"
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/functionality/filesharing"
2021-06-24 22:30:46 +00:00
)
type EventProfileEnvelope struct {
2022-08-26 17:58:18 +00:00
Event event . Event
Profile string
2021-06-24 22:30:46 +00:00
}
2022-09-09 04:32:27 +00:00
// LCG_API_Handler bundles the callbacks the event handler uses to control
// hosted servers.
type LCG_API_Handler struct {
	// LaunchServers is invoked when the ACN status transitions to online.
	LaunchServers func()
	// StopServers is invoked when the ACN status transitions away from online.
	StopServers func()
}
2021-06-24 22:30:46 +00:00
type EventHandler struct {
2022-08-26 17:58:18 +00:00
app app . Application
appBusQueue event . Queue
profileEvents chan EventProfileEnvelope
2022-09-09 04:32:27 +00:00
api LCG_API_Handler
2021-06-24 22:30:46 +00:00
}
2022-04-21 00:18:45 +00:00
// profileEventsBufferSize is the capacity of the buffered profileEvents
// channel created in NewEventHandler.
// We should be reading from profile events pretty quickly, but we make this buffer fairly large...
const profileEventsBufferSize = 512
2022-09-09 04:32:27 +00:00
func NewEventHandler ( api LCG_API_Handler ) * EventHandler {
eh := & EventHandler { app : nil , appBusQueue : event . NewQueue ( ) , profileEvents : make ( chan EventProfileEnvelope , profileEventsBufferSize ) , api : api }
2022-08-26 17:58:18 +00:00
return eh
2021-06-24 22:30:46 +00:00
}
func ( eh * EventHandler ) HandleApp ( application app . Application ) {
2022-08-26 17:58:18 +00:00
eh . app = application
application . GetPrimaryBus ( ) . Subscribe ( event . NewPeer , eh . appBusQueue )
application . GetPrimaryBus ( ) . Subscribe ( event . PeerError , eh . appBusQueue )
application . GetPrimaryBus ( ) . Subscribe ( event . PeerDeleted , eh . appBusQueue )
application . GetPrimaryBus ( ) . Subscribe ( event . Shutdown , eh . appBusQueue )
application . GetPrimaryBus ( ) . Subscribe ( event . AppError , eh . appBusQueue )
application . GetPrimaryBus ( ) . Subscribe ( event . ACNStatus , eh . appBusQueue )
application . GetPrimaryBus ( ) . Subscribe ( event . ACNVersion , eh . appBusQueue )
application . GetPrimaryBus ( ) . Subscribe ( UpdateGlobalSettings , eh . appBusQueue )
application . GetPrimaryBus ( ) . Subscribe ( CwtchStarted , eh . appBusQueue )
application . GetPrimaryBus ( ) . Subscribe ( servers . NewServer , eh . appBusQueue )
application . GetPrimaryBus ( ) . Subscribe ( servers . ServerIntentUpdate , eh . appBusQueue )
application . GetPrimaryBus ( ) . Subscribe ( servers . ServerDeleted , eh . appBusQueue )
application . GetPrimaryBus ( ) . Subscribe ( servers . ServerStatsUpdate , eh . appBusQueue )
application . GetPrimaryBus ( ) . Subscribe ( event . StartingStorageMiragtion , eh . appBusQueue )
application . GetPrimaryBus ( ) . Subscribe ( event . DoneStorageMigration , eh . appBusQueue )
2021-06-24 22:30:46 +00:00
}
func ( eh * EventHandler ) GetNextEvent ( ) string {
2022-08-26 17:58:18 +00:00
appChan := eh . appBusQueue . OutChan ( )
select {
case e := <- appChan :
return eh . handleAppBusEvent ( & e )
default :
select {
case e := <- appChan :
return eh . handleAppBusEvent ( & e )
case ev := <- eh . profileEvents :
return eh . handleProfileEvent ( & ev )
}
}
2021-06-24 22:30:46 +00:00
}
2022-09-10 18:45:00 +00:00
// acnStatus remembers the progress value carried by the most recently handled
// ACNStatus event so that online/offline transitions can be detected. It is -1
// until the first such event has been processed.
var acnStatus = -1
2021-06-24 22:30:46 +00:00
// handleAppBusEvent enriches AppBus events so they are usable with out further data fetches
func ( eh * EventHandler ) handleAppBusEvent ( e * event . Event ) string {
2022-09-10 18:45:00 +00:00
2022-08-26 17:58:18 +00:00
if eh . app != nil {
switch e . EventType {
case event . ACNStatus :
2022-08-29 03:12:54 +00:00
newAcnStatus , err := strconv . Atoi ( e . Data [ event . Progress ] )
if err != nil {
break
}
if newAcnStatus == 100 {
if acnStatus != 100 {
// just came online
2022-12-03 00:54:39 +00:00
doServers := false
if _ , err := groups . ExperimentGate ( ReadGlobalSettings ( ) . Experiments ) ; err == nil {
doServers = true
2022-08-29 03:12:54 +00:00
}
2022-12-05 03:48:54 +00:00
for _ , onion := range eh . app . ListProfiles ( ) {
profile := eh . app . GetPeer ( onion )
autostart , exists := profile . GetScopedZonedAttribute ( attr . LocalScope , attr . ProfileZone , constants2 . PeerAutostart )
if ! exists || autostart == "true" {
eh . app . ActivatePeerEngine ( onion , true , true , doServers )
}
}
2022-09-09 04:32:27 +00:00
eh . api . LaunchServers ( )
2022-08-29 03:12:54 +00:00
}
} else {
if acnStatus == 100 {
// just fell offline
for _ , onion := range eh . app . ListProfiles ( ) {
eh . app . DeactivatePeerEngine ( onion )
}
2022-09-09 04:32:27 +00:00
eh . api . StopServers ( )
2022-08-26 17:58:18 +00:00
}
}
2022-08-29 03:12:54 +00:00
acnStatus = newAcnStatus
2022-08-26 17:58:18 +00:00
case event . NewPeer :
onion := e . Data [ event . Identity ]
profile := eh . app . GetPeer ( e . Data [ event . Identity ] )
if profile == nil {
log . Errorf ( "NewPeer: skipping profile initialization. this should only happen when the app is rapidly opened+closed (eg during testing)" )
break
}
log . Debug ( "New Peer Event: %v" , e )
if e . Data [ "Reload" ] != event . True {
eh . startHandlingPeer ( onion )
}
// CwtchPeer will always set this now...
tag , _ := profile . GetScopedZonedAttribute ( attr . LocalScope , attr . ProfileZone , constants . Tag )
e . Data [ constants . Tag ] = tag
if e . Data [ event . Created ] == event . True {
profile . SetScopedZonedAttribute ( attr . PublicScope , attr . ProfileZone , constants2 . Picture , ImageToString ( NewImage ( RandomProfileImage ( onion ) , TypeImageDistro ) ) )
}
profile . SetScopedZonedAttribute ( attr . LocalScope , attr . ProfileZone , constants2 . PeerOnline , event . False )
2022-12-03 00:54:39 +00:00
// disabeling network check for connection attempt reservation, needs rework
//eh.app.AddPeerPlugin(onion, plugins.NETWORKCHECK)
2022-09-10 17:51:34 +00:00
eh . app . AddPeerPlugin ( onion , plugins . ANTISPAM )
2022-08-26 17:58:18 +00:00
// If the user has chosen to block unknown profiles
// then explicitly configure the protocol engine to do so..
settings := ReadGlobalSettings ( )
if settings . BlockUnknownConnections {
profile . BlockUnknownConnections ( )
} else {
// For completeness
profile . AllowUnknownConnections ( )
}
// Start up the Profile
2022-08-29 03:12:54 +00:00
if acnStatus == 100 {
2022-12-05 03:48:54 +00:00
autostart , exists := profile . GetScopedZonedAttribute ( attr . LocalScope , attr . ProfileZone , constants2 . PeerAutostart )
if ! exists || autostart == "true" {
doServers := false
if _ , err := groups . ExperimentGate ( ReadGlobalSettings ( ) . Experiments ) ; err == nil {
doServers = true
}
eh . app . ActivatePeerEngine ( onion , true , true , doServers )
2022-08-29 03:12:54 +00:00
}
2022-08-26 17:58:18 +00:00
}
online , _ := profile . GetScopedZonedAttribute ( attr . LocalScope , attr . ProfileZone , constants2 . PeerOnline )
2022-12-05 03:48:54 +00:00
e . Data [ "Online" ] = online
autostart , exists := profile . GetScopedZonedAttribute ( attr . LocalScope , attr . ProfileZone , constants2 . PeerAutostart )
// legacy profiles should autostart by default
if ! exists {
autostart = "true"
}
e . Data [ "autostart" ] = autostart
2022-08-26 17:58:18 +00:00
// Name always exists
e . Data [ constants . Name ] , _ = profile . GetScopedZonedAttribute ( attr . PublicScope , attr . ProfileZone , constants . Name )
e . Data [ constants2 . DefaultProfilePicture ] = RandomProfileImage ( onion )
// if a custom profile image exists then default to it.
key , exists := profile . GetScopedZonedAttribute ( attr . PublicScope , attr . ProfileZone , constants . CustomProfileImageKey )
if ! exists {
e . Data [ constants2 . Picture ] = RandomProfileImage ( onion )
} else {
e . Data [ constants2 . Picture ] , _ = profile . GetScopedZonedAttribute ( attr . LocalScope , attr . FilesharingZone , fmt . Sprintf ( "%s.path" , key ) )
}
// Construct our conversations and our srever lists
var contacts [ ] Contact
var servers [ ] groups . Server
conversations , err := profile . FetchConversations ( )
if err == nil {
// We have conversations attached to this profile...
for _ , conversationInfo := range conversations {
// Only compile the server info if we have enabled the experiment...
// Note that this means that this info can become stale if when first loaded the experiment
// has been disabled and then is later re-enabled. As such we need to ensure that this list is
// re-fetched when the group experiment is enabled via a dedicated ListServerInfo event...
if conversationInfo . IsServer ( ) {
groupHandler , err := groups . ExperimentGate ( ReadGlobalSettings ( ) . Experiments )
if err == nil {
servers = append ( servers , groupHandler . GetServerInfo ( conversationInfo . Handle , profile ) )
}
continue
}
// Prefer local override to public name...
name , exists := conversationInfo . GetAttribute ( attr . LocalScope , attr . ProfileZone , constants . Name )
if ! exists {
name , exists = conversationInfo . GetAttribute ( attr . PublicScope , attr . ProfileZone , constants . Name )
if ! exists {
name = conversationInfo . Handle
}
}
// Resolve the profile image of the contact
var cpicPath string
var defaultPath string
if conversationInfo . IsGroup ( ) {
cpicPath = RandomGroupImage ( conversationInfo . Handle )
defaultPath = RandomGroupImage ( conversationInfo . Handle )
} else {
cpicPath = GetProfileImage ( profile , conversationInfo , settings . DownloadPath )
defaultPath = RandomProfileImage ( conversationInfo . Handle )
}
// Resolve Save History Setting
saveHistory , set := conversationInfo . GetAttribute ( attr . LocalScope , attr . ProfileZone , event . SaveHistoryKey )
if ! set {
saveHistory = event . DeleteHistoryDefault
}
// Resolve Archived Setting
isArchived , set := conversationInfo . GetAttribute ( attr . LocalScope , attr . ProfileZone , constants2 . Archived )
if ! set {
isArchived = event . False
}
unread := 0
lastSeenMessageId := - 1
lastSeenTimeStr , set := conversationInfo . GetAttribute ( attr . LocalScope , attr . ProfileZone , constants2 . LastSeenTime )
if set {
lastSeenTime , err := time . Parse ( constants2 . DartIso8601 , lastSeenTimeStr )
if err == nil {
// get last 100 messages and count how many are after the lastSeenTime (100 cus hte ui just shows 99+ after)
messages , err := profile . GetMostRecentMessages ( conversationInfo . ID , 0 , 0 , 100 )
if err == nil {
for _ , message := range messages {
msgTime , err := time . Parse ( time . RFC3339Nano , message . Attr [ constants . AttrSentTimestamp ] )
if err == nil {
if msgTime . UTC ( ) . After ( lastSeenTime . UTC ( ) ) {
unread ++
} else {
lastSeenMessageId = message . ID
break
}
}
}
}
}
}
groupServer , _ := conversationInfo . GetAttribute ( attr . LocalScope , attr . LegacyGroupZone , constants . GroupServer )
stateHandle := conversationInfo . Handle
if conversationInfo . IsGroup ( ) {
stateHandle = groupServer
}
state := profile . GetPeerState ( stateHandle )
if ! set {
state = connections . DISCONNECTED
}
blocked := false
if conversationInfo . ACL [ conversationInfo . Handle ] . Blocked {
blocked = true
}
// Fetch the message count, and the time of the most recent message
count , err := profile . GetChannelMessageCount ( conversationInfo . ID , 0 )
if err != nil {
log . Errorf ( "error fetching channel message count %v %v" , conversationInfo . ID , err )
}
lastMessage , _ := profile . GetMostRecentMessages ( conversationInfo . ID , 0 , 0 , 1 )
notificationPolicy := constants2 . ConversationNotificationPolicyDefault
if notificationPolicyAttr , exists := conversationInfo . GetAttribute ( attr . LocalScope , attr . ProfileZone , constants2 . ConversationNotificationPolicy ) ; exists {
notificationPolicy = notificationPolicyAttr
}
contacts = append ( contacts , Contact {
Name : name ,
Identifier : conversationInfo . ID ,
Onion : conversationInfo . Handle ,
Status : connections . ConnectionStateName [ state ] ,
Picture : cpicPath ,
DefaultPicture : defaultPath ,
Accepted : conversationInfo . Accepted ,
AccessControlList : conversationInfo . ACL ,
Blocked : blocked ,
SaveHistory : saveHistory ,
Messages : count ,
Unread : unread ,
LastSeenMessageId : lastSeenMessageId ,
LastMessage : strconv . Itoa ( getLastMessageTime ( lastMessage ) ) ,
IsGroup : conversationInfo . IsGroup ( ) ,
GroupServer : groupServer ,
IsArchived : isArchived == event . True ,
NotificationPolicy : notificationPolicy ,
Attributes : conversationInfo . Attributes ,
} )
}
}
bytes , _ := json . Marshal ( contacts )
e . Data [ "ContactsJson" ] = string ( bytes )
// Marshal the server list into the new peer event...
serversListBytes , _ := json . Marshal ( servers )
e . Data [ groups . ServerList ] = string ( serversListBytes )
log . Debugf ( "contactsJson %v" , e . Data [ "ContactsJson" ] )
}
}
json , _ := json . Marshal ( e )
return string ( json )
2021-06-24 22:30:46 +00:00
}
2022-02-08 20:43:57 +00:00
func GetProfileImage ( profile peer . CwtchPeer , conversationInfo * model . Conversation , basepath string ) string {
2022-08-26 17:58:18 +00:00
fileKey , err := profile . GetConversationAttribute ( conversationInfo . ID , attr . PublicScope . ConstructScopedZonedPath ( attr . ProfileZone . ConstructZonedPath ( constants . CustomProfileImageKey ) ) )
if err == nil {
if value , exists := profile . GetScopedZonedAttribute ( attr . LocalScope , attr . FilesharingZone , fmt . Sprintf ( "%s.complete" , fileKey ) ) ; exists && value == event . True {
fp , _ := filesharing . GenerateDownloadPath ( basepath , fileKey , true )
// check if the file exists...if it does then set the path...
if _ , err := os . Stat ( fp ) ; err == nil {
image , _ := profile . GetScopedZonedAttribute ( attr . LocalScope , attr . FilesharingZone , fmt . Sprintf ( "%s.path" , fileKey ) )
return image
}
}
}
return RandomProfileImage ( conversationInfo . Handle )
2022-02-03 23:14:39 +00:00
}
2021-06-24 22:30:46 +00:00
// handleProfileEvent enriches Profile events so they are usable with out further data fetches
func ( eh * EventHandler ) handleProfileEvent ( ev * EventProfileEnvelope ) string {
2022-08-26 17:58:18 +00:00
// cache of contact states to use to filter out events repeating known states
var contactStateCache = make ( map [ string ] connections . ConnectionState )
if eh . app == nil {
log . Errorf ( "eh.app == nil in handleProfileEvent... this shouldnt happen?" )
} else {
profile := eh . app . GetPeer ( ev . Profile )
log . Debugf ( "New Profile Event to Handle: %v" , ev )
switch ev . Event . EventType {
case event . NewMessageFromPeer : //event.TimestampReceived, event.RemotePeer, event.Data
// only needs contact nickname and picture, for displaying on popup notifications
ci , err := profile . FetchConversationInfo ( ev . Event . Data [ "RemotePeer" ] )
ev . Event . Data [ constants2 . Picture ] = RandomProfileImage ( ev . Event . Data [ "RemotePeer" ] )
if ci != nil && err == nil {
ev . Event . Data [ event . ConversationID ] = strconv . Itoa ( ci . ID )
profile . SetConversationAttribute ( ci . ID , attr . LocalScope . ConstructScopedZonedPath ( attr . ProfileZone . ConstructZonedPath ( constants2 . Archived ) ) , event . False )
ev . Event . Data [ constants2 . Picture ] = GetProfileImage ( profile , ci , ReadGlobalSettings ( ) . DownloadPath )
} else {
// TODO This Conversation May Not Exist Yet...But we are not in charge of creating it...
log . Errorf ( "todo wait for contact to be added before processing this event..." )
return ""
}
var exists bool
ev . Event . Data [ "Nick" ] , exists = ci . GetAttribute ( attr . LocalScope , attr . ProfileZone , constants . Name )
if ! exists {
ev . Event . Data [ "Nick" ] , exists = ci . GetAttribute ( attr . PublicScope , attr . ProfileZone , constants . Name )
if ! exists {
ev . Event . Data [ "Nick" ] = ev . Event . Data [ "RemotePeer" ]
// If we dont have a name val for a peer, but they have sent us a message, we might be approved now, re-ask
profile . SendScopedZonedGetValToContact ( ci . ID , attr . PublicScope , attr . ProfileZone , constants . Name )
profile . SendScopedZonedGetValToContact ( ci . ID , attr . PublicScope , attr . ProfileZone , constants . CustomProfileImageKey )
}
}
if ci . Accepted {
handleImagePreviews ( profile , & ev . Event , ci . ID , ci . ID )
}
ev . Event . Data [ "notification" ] = string ( determineNotification ( ci ) )
case event . NewMessageFromGroup :
// only needs contact nickname and picture, for displaying on popup notifications
ci , err := profile . FetchConversationInfo ( ev . Event . Data [ "RemotePeer" ] )
ev . Event . Data [ constants2 . Picture ] = RandomProfileImage ( ev . Event . Data [ "RemotePeer" ] )
if ci != nil && err == nil {
var exists bool
ev . Event . Data [ "Nick" ] , exists = ci . GetAttribute ( attr . LocalScope , attr . ProfileZone , constants . Name )
if ! exists {
ev . Event . Data [ "Nick" ] , exists = ci . GetAttribute ( attr . PublicScope , attr . ProfileZone , constants . Name )
if ! exists {
ev . Event . Data [ "Nick" ] = ev . Event . Data [ "RemotePeer" ]
}
}
ev . Event . Data [ constants2 . Picture ] = GetProfileImage ( profile , ci , ReadGlobalSettings ( ) . DownloadPath )
}
conversationID , _ := strconv . Atoi ( ev . Event . Data [ event . ConversationID ] )
profile . SetConversationAttribute ( conversationID , attr . LocalScope . ConstructScopedZonedPath ( attr . ProfileZone . ConstructZonedPath ( constants2 . Archived ) ) , event . False )
if ci != nil && ci . Accepted {
handleImagePreviews ( profile , & ev . Event , conversationID , ci . ID )
}
gci , _ := profile . GetConversationInfo ( conversationID )
groupServer := gci . Attributes [ attr . LocalScope . ConstructScopedZonedPath ( attr . LegacyGroupZone . ConstructZonedPath ( constants . GroupServer ) ) . ToString ( ) ]
state := profile . GetPeerState ( groupServer )
// if syncing, don't flood with notifications
if state == connections . SYNCED {
ev . Event . Data [ "notification" ] = string ( determineNotification ( gci ) )
} else {
ev . Event . Data [ "notification" ] = string ( constants2 . NotificationNone )
}
case event . PeerAcknowledgement :
ci , err := profile . FetchConversationInfo ( ev . Event . Data [ "RemotePeer" ] )
if ci != nil && err == nil {
ev . Event . Data [ event . ConversationID ] = strconv . Itoa ( ci . ID )
}
case event . ContactCreated :
conversationID , _ := strconv . Atoi ( ev . Event . Data [ event . ConversationID ] )
count , err := profile . GetChannelMessageCount ( conversationID , 0 )
if err != nil {
log . Errorf ( "error fetching channel message count %v %v" , conversationID , err )
}
conversationInfo , err := profile . GetConversationInfo ( conversationID )
if err != nil {
log . Errorf ( "error fetching conversation info for %v %v" , conversationID , err )
}
blocked := constants . False
if conversationInfo . ACL [ conversationInfo . Handle ] . Blocked {
blocked = constants . True
}
accepted := constants . False
if conversationInfo . Accepted {
accepted = constants . True
}
acl , _ := json . Marshal ( conversationInfo . ACL )
lastMessage , _ := profile . GetMostRecentMessages ( conversationID , 0 , 0 , 1 )
ev . Event . Data [ "unread" ] = strconv . Itoa ( count ) // if this is a new contact with messages attached then by-definition these are unread...
ev . Event . Data [ constants2 . Picture ] = RandomProfileImage ( conversationInfo . Handle )
ev . Event . Data [ constants2 . DefaultProfilePicture ] = RandomProfileImage ( conversationInfo . Handle )
ev . Event . Data [ "numMessages" ] = strconv . Itoa ( count )
ev . Event . Data [ "nick" ] = conversationInfo . Handle
ev . Event . Data [ "status" ] = connections . ConnectionStateName [ profile . GetPeerState ( conversationInfo . Handle ) ]
ev . Event . Data [ "accepted" ] = accepted
ev . Event . Data [ "accessControlList" ] = string ( acl )
ev . Event . Data [ "blocked" ] = blocked
ev . Event . Data [ "loading" ] = "false"
ev . Event . Data [ "lastMsgTime" ] = strconv . Itoa ( getLastMessageTime ( lastMessage ) )
case event . GroupCreated :
// This event should only happen after we have validated the invite, as such the error
// condition *should* never happen.
groupPic := RandomGroupImage ( ev . Event . Data [ event . GroupID ] )
ev . Event . Data [ constants2 . Picture ] = groupPic
conversationID , _ := strconv . Atoi ( ev . Event . Data [ event . ConversationID ] )
conversationInfo , _ := profile . GetConversationInfo ( conversationID )
acl , _ := json . Marshal ( conversationInfo . ACL )
ev . Event . Data [ "accessControlList" ] = string ( acl )
case event . NewGroup :
// This event should only happen after we have validated the invite, as such the error
// condition *should* never happen.
serializedInvite := ev . Event . Data [ event . GroupInvite ]
if invite , err := model . ValidateInvite ( serializedInvite ) ; err == nil {
groupPic := RandomGroupImage ( invite . GroupID )
ev . Event . Data [ constants2 . Picture ] = groupPic
conversationID , _ := strconv . Atoi ( ev . Event . Data [ event . ConversationID ] )
conversationInfo , _ := profile . GetConversationInfo ( conversationID )
acl , _ := json . Marshal ( conversationInfo . ACL )
ev . Event . Data [ "accessControlList" ] = string ( acl )
} else {
log . Errorf ( "received a new group event which contained an invalid invite %v. this should never happen and likely means there is a bug in cwtch. Please file a ticket @ https://git.openprivacy.ca/cwtch.im/cwtch" , err )
return ""
}
case event . PeerStateChange :
cxnState := connections . ConnectionStateToType ( ) [ ev . Event . Data [ event . ConnectionState ] ]
// skip events the UI doesn't act on
if cxnState == connections . CONNECTING || cxnState == connections . CONNECTED {
return ""
}
contact , err := profile . FetchConversationInfo ( ev . Event . Data [ event . RemotePeer ] )
if ev . Event . Data [ event . RemotePeer ] == profile . GetOnion ( ) {
return "" // suppress events from our own profile...
}
// We do not know who this is...don't send any event until we see a message from them
// (at that point the conversation will have been created...)
if contact == nil || err != nil || contact . ID == 0 {
return ""
}
// if we already know this state, suppress
if knownState , exists := contactStateCache [ ev . Event . Data [ event . RemotePeer ] ] ; exists && cxnState == knownState {
return ""
}
contactStateCache [ ev . Event . Data [ event . RemotePeer ] ] = cxnState
if contact != nil {
// No enrichment needed
if cxnState == connections . AUTHENTICATED {
// if known and authed, get vars
profile . SendScopedZonedGetValToContact ( contact . ID , attr . PublicScope , attr . ProfileZone , constants . Name )
profile . SendScopedZonedGetValToContact ( contact . ID , attr . PublicScope , attr . ProfileZone , constants . CustomProfileImageKey )
}
}
case event . ServerStateChange :
cxnState := connections . ConnectionStateToType ( ) [ ev . Event . Data [ event . ConnectionState ] ]
// skip events the UI doesn't act on
if cxnState == connections . CONNECTING || cxnState == connections . CONNECTED {
return ""
}
// if we already know this state, suppress
if knownState , exists := contactStateCache [ ev . Event . Data [ event . RemotePeer ] ] ; exists && cxnState == knownState {
return ""
}
contactStateCache [ ev . Event . Data [ event . RemotePeer ] ] = cxnState
case event . NewRetValMessageFromPeer :
// auto handled event means the setting is already done, we're just deciding if we need to tell the UI
onion := ev . Event . Data [ event . RemotePeer ]
scope := ev . Event . Data [ event . Scope ]
path := ev . Event . Data [ event . Path ]
val := ev . Event . Data [ event . Data ]
exists , _ := strconv . ParseBool ( ev . Event . Data [ event . Exists ] )
conversation , err := profile . FetchConversationInfo ( onion )
if err == nil {
if exists && attr . IntoScope ( scope ) == attr . PublicScope {
zone , path := attr . ParseZone ( path )
// auto download profile images from contacts...
settings := ReadGlobalSettings ( )
if settings . ExperimentsEnabled && zone == attr . ProfileZone && path == constants . CustomProfileImageKey {
fileKey := val
fsf , err := filesharing . FunctionalityGate ( settings . Experiments )
imagePreviewsEnabled := settings . Experiments [ "filesharing-images" ]
if err == nil && imagePreviewsEnabled && conversation . Accepted {
basepath := settings . DownloadPath
fp , mp := filesharing . GenerateDownloadPath ( basepath , fileKey , true )
if value , exists := profile . GetScopedZonedAttribute ( attr . LocalScope , attr . FilesharingZone , fmt . Sprintf ( "%s.complete" , fileKey ) ) ; exists && value == event . True {
if _ , err := os . Stat ( fp ) ; err == nil {
// file is marked as completed downloaded and exists...
return ""
} else {
// the user probably deleted the file, mark completed as false...
profile . SetScopedZonedAttribute ( attr . LocalScope , attr . FilesharingZone , fmt . Sprintf ( "%s.complete" , fileKey ) , event . False )
}
}
log . Debugf ( "Downloading Profile Image %v %v %v" , fp , mp , fileKey )
ev . Event . Data [ event . FilePath ] = fp
fsf . DownloadFile ( profile , conversation . ID , fp , mp , val , constants . ImagePreviewMaxSizeInBytes )
} else {
// if image previews are disabled then ignore this event...
return ""
}
}
if val , err := profile . GetConversationAttribute ( conversation . ID , attr . LocalScope . ConstructScopedZonedPath ( zone . ConstructZonedPath ( path ) ) ) ; err == nil || val != "" {
// we have a locally set override, don't pass this remote set public scope update to UI
return ""
}
}
}
2022-09-07 16:43:24 +00:00
case event . TokenManagerInfo :
conversations , err := profile . FetchConversations ( )
if err == nil {
var associatedGroups [ ] int
for _ , ci := range conversations {
groupServer , groupServerExists := ci . Attributes [ attr . LocalScope . ConstructScopedZonedPath ( attr . LegacyGroupZone . ConstructZonedPath ( constants . GroupServer ) ) . ToString ( ) ]
if groupServerExists {
gci , err := profile . FetchConversationInfo ( groupServer )
if err == nil {
tokenOnion , onionExists := gci . Attributes [ attr . PublicScope . ConstructScopedZonedPath ( attr . ServerKeyZone . ConstructZonedPath ( string ( model . KeyTypeTokenOnion ) ) ) . ToString ( ) ]
if onionExists && tokenOnion == ev . Event . Data [ event . ServerTokenOnion ] {
associatedGroups = append ( associatedGroups , ci . ID )
}
}
}
}
associatedGroupsJson , _ := json . Marshal ( associatedGroups )
ev . Event . Data [ event . Data ] = string ( associatedGroupsJson )
}
2022-12-07 21:46:15 +00:00
case event . ProtocolEngineCreated :
// TODO this code should be moved into Cwtch during the API officialization...
settings := ReadGlobalSettings ( )
2023-02-08 21:51:09 +00:00
// ensure that protocol engine respects blocking settings...
if settings . BlockUnknownConnections {
profile . BlockUnknownConnections ( )
} else {
profile . AllowUnknownConnections ( )
}
2022-12-07 21:46:15 +00:00
// Now that the Peer Engine is Activated, Share Files
key , exists := profile . GetScopedZonedAttribute ( attr . PublicScope , attr . ProfileZone , constants . CustomProfileImageKey )
if exists {
serializedManifest , _ := profile . GetScopedZonedAttribute ( attr . ConversationScope , attr . FilesharingZone , fmt . Sprintf ( "%s.manifest" , key ) )
// reset the share timestamp, currently file shares are hardcoded to expire after 30 days...
// we reset the profile image here so that it is always available.
profile . SetScopedZonedAttribute ( attr . LocalScope , attr . FilesharingZone , fmt . Sprintf ( "%s.ts" , key ) , strconv . FormatInt ( time . Now ( ) . Unix ( ) , 10 ) )
log . Debugf ( "Custom Profile Image: %v %s" , key , serializedManifest )
}
// If file sharing is enabled then reshare all active files...
fsf , err := filesharing . FunctionalityGate ( settings . Experiments )
if err == nil {
fsf . ReShareFiles ( profile )
}
2022-08-26 17:58:18 +00:00
}
}
json , _ := json . Marshal ( unwrap ( ev ) )
return string ( json )
2021-06-24 22:30:46 +00:00
}
func unwrap ( original * EventProfileEnvelope ) * event . Event {
2022-08-26 17:58:18 +00:00
unwrapped := & original . Event
unwrapped . Data [ "ProfileOnion" ] = original . Profile
return unwrapped
2021-06-24 22:30:46 +00:00
}
func ( eh * EventHandler ) startHandlingPeer ( onion string ) {
2022-08-26 17:58:18 +00:00
eventBus := eh . app . GetEventBus ( onion )
q := event . NewQueue ( )
eventBus . Subscribe ( event . NetworkStatus , q )
eventBus . Subscribe ( event . ACNInfo , q )
eventBus . Subscribe ( event . NewMessageFromPeer , q )
eventBus . Subscribe ( event . UpdatedProfileAttribute , q )
eventBus . Subscribe ( event . PeerAcknowledgement , q )
eventBus . Subscribe ( event . DeleteContact , q )
eventBus . Subscribe ( event . AppError , q )
eventBus . Subscribe ( event . IndexedAcknowledgement , q )
eventBus . Subscribe ( event . IndexedFailure , q )
eventBus . Subscribe ( event . ContactCreated , q )
eventBus . Subscribe ( event . NewMessageFromGroup , q )
eventBus . Subscribe ( event . GroupCreated , q )
eventBus . Subscribe ( event . NewGroup , q )
eventBus . Subscribe ( event . ServerStateChange , q )
eventBus . Subscribe ( event . PeerStateChange , q )
eventBus . Subscribe ( event . NewRetValMessageFromPeer , q )
eventBus . Subscribe ( event . ShareManifest , q )
eventBus . Subscribe ( event . ManifestSizeReceived , q )
eventBus . Subscribe ( event . ManifestError , q )
eventBus . Subscribe ( event . ManifestReceived , q )
eventBus . Subscribe ( event . ManifestSaved , q )
eventBus . Subscribe ( event . FileDownloadProgressUpdate , q )
eventBus . Subscribe ( event . FileDownloaded , q )
2022-09-07 16:43:24 +00:00
eventBus . Subscribe ( event . TokenManagerInfo , q )
2022-12-07 21:46:15 +00:00
eventBus . Subscribe ( event . ProtocolEngineCreated , q )
2022-08-26 17:58:18 +00:00
go eh . forwardProfileMessages ( onion , q )
2021-06-24 22:30:46 +00:00
}
func ( eh * EventHandler ) forwardProfileMessages ( onion string , q event . Queue ) {
2022-08-26 17:58:18 +00:00
log . Infof ( "Launching Forwarding Goroutine" )
// TODO: graceful shutdown, via an injected event of special QUIT type exiting loop/go routine
for {
e := q . Next ( )
ev := EventProfileEnvelope { Event : e , Profile : onion }
eh . profileEvents <- ev
if ev . Event . EventType == event . Shutdown {
return
}
}
2021-06-24 22:30:46 +00:00
}
2021-11-26 01:59:00 +00:00
// Push pushes an event onto the app event bus
2022-09-07 16:43:24 +00:00
//
// It is also a way for libCwtch-go to publish an event for consumption by a UI before a Cwtch app has been initialized
// use: to signal an error before a cwtch app could be created
2021-06-24 22:30:46 +00:00
func ( eh * EventHandler ) Push ( newEvent event . Event ) {
2022-08-26 17:58:18 +00:00
eh . appBusQueue . Publish ( newEvent )
2021-06-24 22:30:46 +00:00
}
2021-11-17 20:33:51 +00:00
func getLastMessageTime ( conversationMessages [ ] model . ConversationMessage ) int {
2022-08-26 17:58:18 +00:00
if len ( conversationMessages ) == 0 {
return 0
}
time , err := time . Parse ( time . RFC3339Nano , conversationMessages [ 0 ] . Attr [ constants . AttrSentTimestamp ] )
if err != nil {
return 0
}
return int ( time . Unix ( ) )
2021-11-18 23:44:21 +00:00
}
2021-12-19 01:16:21 +00:00
// handleImagePreviews checks settings and, if appropriate, auto-downloads any images
func handleImagePreviews ( profile peer . CwtchPeer , ev * event . Event , conversationID , senderID int ) {
2022-08-26 17:58:18 +00:00
settings := ReadGlobalSettings ( )
fh , err := filesharing . PreviewFunctionalityGate ( settings . Experiments )
// Short-circuit if file sharing is disabled
if err != nil {
return
}
// Short-circuit failures
// Don't autodownload images if the download path does not exist.
if settings . DownloadPath == "" {
return
}
// Don't autodownload images if the download path does not exist.
if _ , err := os . Stat ( settings . DownloadPath ) ; os . IsNotExist ( err ) {
return
}
// Now look at the image preview experiment
imagePreviewsEnabled := settings . Experiments [ "filesharing-images" ]
if imagePreviewsEnabled {
var cm model . MessageWrapper
err := json . Unmarshal ( [ ] byte ( ev . Data [ event . Data ] ) , & cm )
if err == nil && cm . Overlay == model . OverlayFileSharing {
var fm filesharing . OverlayMessage
err = json . Unmarshal ( [ ] byte ( cm . Data ) , & fm )
if err == nil {
if fm . ShouldAutoDL ( ) {
basepath := settings . DownloadPath
fp , mp := filesharing . GenerateDownloadPath ( basepath , fm . Name , false )
log . Debugf ( "autodownloading file!" )
ev . Data [ "Auto" ] = constants . True
mID , _ := strconv . Atoi ( ev . Data [ "Index" ] )
profile . UpdateMessageAttribute ( conversationID , 0 , mID , constants . AttrDownloaded , constants . True )
fh . DownloadFile ( profile , senderID , fp , mp , fm . FileKey ( ) , constants . ImagePreviewMaxSizeInBytes )
}
}
}
}
2021-12-19 01:16:21 +00:00
}