2021-06-24 22:30:46 +00:00
package utils
import (
"cwtch.im/cwtch/app"
"cwtch.im/cwtch/app/plugins"
"cwtch.im/cwtch/model"
"cwtch.im/cwtch/model/attr"
2021-10-15 20:09:36 +00:00
"cwtch.im/cwtch/model/constants"
2021-06-24 22:30:46 +00:00
"cwtch.im/cwtch/protocol/connections"
"encoding/json"
2021-10-15 20:09:36 +00:00
constants2 "git.openprivacy.ca/cwtch.im/libcwtch-go/constants"
2021-08-05 23:29:20 +00:00
"git.openprivacy.ca/cwtch.im/libcwtch-go/features/groups"
2021-10-29 23:14:08 +00:00
"git.openprivacy.ca/cwtch.im/libcwtch-go/features/servers"
2021-06-24 22:30:46 +00:00
"git.openprivacy.ca/openprivacy/log"
"strconv"
2021-11-17 20:33:51 +00:00
"time"
2021-06-24 22:30:46 +00:00
)
import "cwtch.im/cwtch/event"
type EventProfileEnvelope struct {
Event event . Event
Profile string
}
type EventHandler struct {
app app . Application
appBusQueue event . Queue
profileEvents chan EventProfileEnvelope
}
func NewEventHandler ( ) * EventHandler {
eh := & EventHandler { app : nil , appBusQueue : event . NewQueue ( ) , profileEvents : make ( chan EventProfileEnvelope ) }
return eh
}
// PublishAppEvent is a way for libCwtch-go to publish an event for consumption by a UI before a Cwtch app has been initialized
// Main use: to signal an error before a cwtch app could be created
func ( eh * EventHandler ) PublishAppEvent ( event event . Event ) {
eh . appBusQueue . Publish ( event )
}
func ( eh * EventHandler ) HandleApp ( application app . Application ) {
eh . app = application
application . GetPrimaryBus ( ) . Subscribe ( event . NewPeer , eh . appBusQueue )
application . GetPrimaryBus ( ) . Subscribe ( event . PeerError , eh . appBusQueue )
application . GetPrimaryBus ( ) . Subscribe ( event . PeerDeleted , eh . appBusQueue )
application . GetPrimaryBus ( ) . Subscribe ( event . Shutdown , eh . appBusQueue )
application . GetPrimaryBus ( ) . Subscribe ( event . AppError , eh . appBusQueue )
application . GetPrimaryBus ( ) . Subscribe ( event . ACNStatus , eh . appBusQueue )
application . GetPrimaryBus ( ) . Subscribe ( event . ReloadDone , eh . appBusQueue )
application . GetPrimaryBus ( ) . Subscribe ( event . ACNVersion , eh . appBusQueue )
application . GetPrimaryBus ( ) . Subscribe ( UpdateGlobalSettings , eh . appBusQueue )
application . GetPrimaryBus ( ) . Subscribe ( CwtchStarted , eh . appBusQueue )
2021-10-29 23:14:08 +00:00
application . GetPrimaryBus ( ) . Subscribe ( servers . NewServer , eh . appBusQueue )
2021-11-02 01:48:00 +00:00
application . GetPrimaryBus ( ) . Subscribe ( servers . ServerIntentUpdate , eh . appBusQueue )
2021-11-04 01:19:30 +00:00
application . GetPrimaryBus ( ) . Subscribe ( servers . ServerDeleted , eh . appBusQueue )
2021-06-24 22:30:46 +00:00
}
func ( eh * EventHandler ) GetNextEvent ( ) string {
appChan := eh . appBusQueue . OutChan ( )
select {
case e := <- appChan :
return eh . handleAppBusEvent ( & e )
2021-11-17 20:33:51 +00:00
default :
select {
case e := <- appChan :
return eh . handleAppBusEvent ( & e )
case ev := <- eh . profileEvents :
return eh . handleProfileEvent ( & ev )
}
2021-06-24 22:30:46 +00:00
}
}
// handleAppBusEvent enriches AppBus events so they are usable with out further data fetches
func ( eh * EventHandler ) handleAppBusEvent ( e * event . Event ) string {
log . Debugf ( "New AppBus Event to Handle: %v" , e )
if eh . app != nil {
switch e . EventType {
case event . ACNStatus :
if e . Data [ event . Progress ] == "100" {
2021-10-15 20:09:36 +00:00
for _ , onion := range eh . app . ListProfiles ( ) {
2021-06-24 22:30:46 +00:00
// launch a listen thread (internally this does a check that the protocol engine is not listening)
// and as such is safe to call.
eh . app . GetPeer ( onion ) . Listen ( )
}
}
case event . NewPeer :
onion := e . Data [ event . Identity ]
profile := eh . app . GetPeer ( e . Data [ event . Identity ] )
log . Debug ( "New Peer Event: %v" , e )
if e . Data [ "Reload" ] != event . True {
eh . startHandlingPeer ( onion )
}
2021-10-15 20:09:36 +00:00
// CwtchPeer will always set this now...
2021-10-15 20:19:14 +00:00
tag , _ := profile . GetScopedZonedAttribute ( attr . LocalScope , attr . ProfileZone , constants . Tag )
2021-10-15 20:09:36 +00:00
e . Data [ constants . Tag ] = tag
2021-06-24 22:30:46 +00:00
if e . Data [ event . Created ] == event . True {
2021-10-15 20:09:36 +00:00
profile . SetScopedZonedAttribute ( attr . PublicScope , attr . ProfileZone , constants2 . Picture , ImageToString ( NewImage ( RandomProfileImage ( onion ) , TypeImageDistro ) ) )
2021-06-24 22:30:46 +00:00
}
if e . Data [ event . Status ] != event . StorageRunning || e . Data [ event . Created ] == event . True {
2021-10-15 20:09:36 +00:00
profile . SetScopedZonedAttribute ( attr . LocalScope , attr . ProfileZone , constants2 . PeerOnline , event . False )
2021-06-24 22:30:46 +00:00
eh . app . AddPeerPlugin ( onion , plugins . CONNECTIONRETRY )
eh . app . AddPeerPlugin ( onion , plugins . NETWORKCHECK )
// If the user has chosen to block unknown profiles
// then explicitly configure the protocol engine to do so..
if ReadGlobalSettings ( ) . BlockUnknownConnections {
profile . BlockUnknownConnections ( )
} else {
// For completeness
profile . AllowUnknownConnections ( )
}
// Start up the Profile
profile . Listen ( )
profile . StartPeersConnections ( )
if _ , err := groups . ExperimentGate ( ReadGlobalSettings ( ) . Experiments ) ; err == nil {
profile . StartServerConnections ( )
}
}
2021-10-15 20:09:36 +00:00
online , _ := profile . GetScopedZonedAttribute ( attr . LocalScope , attr . ProfileZone , constants2 . PeerOnline )
2021-10-15 20:48:05 +00:00
// Name always exists
e . Data [ constants . Name ] , _ = profile . GetScopedZonedAttribute ( attr . PublicScope , attr . ProfileZone , constants . Name )
2021-11-17 20:33:51 +00:00
e . Data [ constants2 . Picture ] = RandomProfileImage ( onion )
2021-06-24 22:30:46 +00:00
e . Data [ "Online" ] = online
var contacts [ ] Contact
var servers [ ] groups . Server
2021-11-17 20:33:51 +00:00
conversations , err := profile . FetchConversations ( )
if err != nil {
/// um....
return ""
}
for _ , conversationInfo := range conversations {
2021-06-24 22:30:46 +00:00
// Only compile the server info if we have enabled the experiment...
// Note that this means that this info can become stale if when first loaded the experiment
// has been disabled and then is later re-enabled. As such we need to ensure that this list is
// re-fetched when the group experiment is enabled via a dedicated ListServerInfo event...
2021-11-17 20:33:51 +00:00
if conversationInfo . IsServer ( ) {
2021-06-24 22:30:46 +00:00
groupHandler , err := groups . ExperimentGate ( ReadGlobalSettings ( ) . Experiments )
if err == nil {
2021-11-17 20:33:51 +00:00
servers = append ( servers , groupHandler . GetServerInfo ( conversationInfo . Handle , profile ) )
2021-06-24 22:30:46 +00:00
}
continue
}
2021-11-18 23:44:21 +00:00
name , exists := conversationInfo . GetAttribute ( attr . LocalScope , attr . ProfileZone , constants . Name )
if ! exists {
name , exists = conversationInfo . GetAttribute ( attr . PeerScope , attr . ProfileZone , constants . Name )
if ! exists {
name = conversationInfo . Handle
}
}
var cpicPath string
if conversationInfo . IsGroup ( ) {
cpicPath = RandomGroupImage ( conversationInfo . Handle )
} else {
cpicPath = RandomProfileImage ( conversationInfo . Handle )
}
2021-11-17 20:33:51 +00:00
saveHistory , set := conversationInfo . GetAttribute ( attr . LocalScope , attr . ProfileZone , event . SaveHistoryKey )
2021-06-24 22:30:46 +00:00
if ! set {
saveHistory = event . DeleteHistoryDefault
}
2021-11-17 20:33:51 +00:00
isArchived , set := conversationInfo . GetAttribute ( attr . LocalScope , attr . ProfileZone , constants2 . Archived )
2021-08-27 20:25:41 +00:00
if ! set {
isArchived = event . False
}
2021-06-24 22:30:46 +00:00
2021-11-17 22:34:35 +00:00
state := profile . GetPeerState ( conversationInfo . Handle )
2021-11-17 20:33:51 +00:00
if ! set {
state = connections . DISCONNECTED
2021-06-24 22:30:46 +00:00
}
authorization := model . AuthUnknown
2021-11-17 20:33:51 +00:00
if conversationInfo . Accepted {
2021-06-24 22:30:46 +00:00
authorization = model . AuthApproved
}
2021-11-17 20:33:51 +00:00
if acl , exists := conversationInfo . ACL [ conversationInfo . Handle ] ; exists && acl . Blocked {
authorization = model . AuthBlocked
}
2021-11-18 23:44:21 +00:00
groupServer , _ := conversationInfo . GetAttribute ( attr . LocalScope , attr . LegacyGroupZone , constants . GroupServer )
2021-11-17 20:33:51 +00:00
count , err := profile . GetChannelMessageCount ( conversationInfo . ID , 0 )
if err != nil {
log . Errorf ( "error fetching channel message count %v %v" , conversationInfo . ID , err )
2021-08-27 20:25:41 +00:00
}
2021-11-17 20:33:51 +00:00
2021-11-18 23:44:21 +00:00
lastMessage , _ := profile . GetMostRecentMessages ( conversationInfo . ID , 0 , 0 , 1 )
2021-06-30 20:28:16 +00:00
2021-06-24 22:30:46 +00:00
contacts = append ( contacts , Contact {
2021-11-17 20:33:51 +00:00
Name : name ,
2021-11-18 23:44:21 +00:00
Identifier : conversationInfo . ID ,
2021-11-17 20:33:51 +00:00
Onion : conversationInfo . Handle ,
Status : connections . ConnectionStateName [ state ] ,
2021-06-24 22:30:46 +00:00
Picture : cpicPath ,
Authorization : string ( authorization ) ,
2021-11-17 20:33:51 +00:00
SaveHistory : saveHistory ,
Messages : count ,
2021-06-24 22:30:46 +00:00
Unread : 0 ,
2021-11-17 20:33:51 +00:00
LastMessage : strconv . Itoa ( getLastMessageTime ( lastMessage ) ) ,
IsGroup : conversationInfo . IsGroup ( ) ,
GroupServer : groupServer ,
2021-08-27 20:25:41 +00:00
IsArchived : isArchived == event . True ,
2021-06-24 22:30:46 +00:00
} )
}
bytes , _ := json . Marshal ( contacts )
e . Data [ "ContactsJson" ] = string ( bytes )
// Marshal the server list into the new peer event...
serversListBytes , _ := json . Marshal ( servers )
e . Data [ groups . ServerList ] = string ( serversListBytes )
log . Debugf ( "contactsJson %v" , e . Data [ "ContactsJson" ] )
}
}
json , _ := json . Marshal ( e )
return string ( json )
}
// handleProfileEvent enriches Profile events so they are usable with out further data fetches
func ( eh * EventHandler ) handleProfileEvent ( ev * EventProfileEnvelope ) string {
if eh . app == nil {
log . Errorf ( "eh.app == nil in handleProfileEvent... this shouldnt happen?" )
} else {
2021-11-17 20:33:51 +00:00
profile := eh . app . GetPeer ( ev . Profile )
2021-06-24 22:30:46 +00:00
log . Debugf ( "New Profile Event to Handle: %v" , ev )
switch ev . Event . EventType {
case event . NewMessageFromPeer : //event.TimestampReceived, event.RemotePeer, event.Data
// only needs contact nickname and picture, for displaying on popup notifications
2021-11-17 20:33:51 +00:00
ci , err := profile . FetchConversationInfo ( ev . Event . Data [ "RemotePeer" ] )
if ci != nil && err == nil {
ev . Event . Data [ event . ConversationID ] = strconv . Itoa ( ci . ID )
profile . SetConversationAttribute ( ci . ID , attr . LocalScope . ConstructScopedZonedPath ( attr . ProfileZone . ConstructZonedPath ( constants2 . Archived ) ) , event . False )
2021-11-17 22:34:35 +00:00
} else {
// TODO This Conversation May Not Exist Yet...But we are not in charge of creating it...
2021-11-18 23:44:21 +00:00
log . Errorf ( "todo wait for contact to be added before processing this event..." )
2021-11-17 20:33:51 +00:00
}
2021-11-18 23:44:21 +00:00
ev . Event . Data [ "Nick" ] , _ = ci . GetAttribute ( attr . PublicScope , attr . ProfileZone , constants . Name )
2021-11-17 20:33:51 +00:00
ev . Event . Data [ "Picture" ] = RandomProfileImage ( ev . Event . Data [ "RemotePeer" ] )
2021-06-24 22:30:46 +00:00
case event . NewMessageFromGroup :
// only needs contact nickname and picture, for displaying on popup notifications
2021-11-17 20:33:51 +00:00
ci , err := profile . FetchConversationInfo ( ev . Event . Data [ "RemotePeer" ] )
if ci != nil && err == nil {
2021-11-18 23:44:21 +00:00
ev . Event . Data [ "Nick" ] , _ = ci . GetAttribute ( attr . PublicScope , attr . ProfileZone , constants . Name )
2021-11-17 20:33:51 +00:00
}
ev . Event . Data [ "Picture" ] = RandomProfileImage ( ev . Event . Data [ event . GroupID ] )
2021-11-18 23:44:21 +00:00
conversationID , _ := strconv . Atoi ( ev . Event . Data [ event . ConversationID ] )
2021-11-17 20:33:51 +00:00
profile . SetConversationAttribute ( conversationID , attr . LocalScope . ConstructScopedZonedPath ( attr . ProfileZone . ConstructZonedPath ( constants2 . Archived ) ) , event . False )
2021-06-24 22:30:46 +00:00
case event . PeerAcknowledgement :
2021-11-17 20:33:51 +00:00
ci , err := profile . FetchConversationInfo ( ev . Event . Data [ "RemotePeer" ] )
if ci != nil && err == nil {
ev . Event . Data [ event . ConversationID ] = strconv . Itoa ( ci . ID )
}
2021-11-17 22:34:35 +00:00
case event . ContactCreated :
2021-11-18 23:44:21 +00:00
conversationID , _ := strconv . Atoi ( ev . Event . Data [ event . ConversationID ] )
2021-06-24 22:30:46 +00:00
handle := ev . Event . Data [ event . RemotePeer ]
2021-11-17 22:34:35 +00:00
count , err := profile . GetChannelMessageCount ( conversationID , 0 )
2021-06-24 22:30:46 +00:00
if err != nil {
2021-11-17 22:34:35 +00:00
log . Errorf ( "error fetching channel message count %v %v" , conversationID , err )
2021-06-24 22:30:46 +00:00
}
2021-11-17 22:34:35 +00:00
2021-11-18 23:44:21 +00:00
lastMessage , _ := profile . GetMostRecentMessages ( conversationID , 0 , 0 , 1 )
2021-11-17 22:34:35 +00:00
ev . Event . Data [ "unread" ] = strconv . Itoa ( 1 ) // we've just created this contact so by definition this must be 1...
ev . Event . Data [ "picture" ] = RandomProfileImage ( handle )
ev . Event . Data [ "numMessages" ] = strconv . Itoa ( count )
ev . Event . Data [ "nick" ] = handle
ev . Event . Data [ "status" ] = connections . ConnectionStateName [ profile . GetPeerState ( handle ) ]
ev . Event . Data [ "authorization" ] = string ( model . AuthUnknown )
ev . Event . Data [ "loading" ] = "false"
ev . Event . Data [ "lastMsgTime" ] = strconv . Itoa ( getLastMessageTime ( lastMessage ) )
2021-06-24 22:30:46 +00:00
case event . GroupCreated :
// This event should only happen after we have validated the invite, as such the error
// condition *should* never happen.
2021-11-17 20:33:51 +00:00
groupPic := RandomGroupImage ( ev . Event . Data [ event . GroupID ] )
2021-06-24 22:30:46 +00:00
ev . Event . Data [ "PicturePath" ] = groupPic
case event . NewGroup :
// This event should only happen after we have validated the invite, as such the error
// condition *should* never happen.
serializedInvite := ev . Event . Data [ event . GroupInvite ]
if invite , err := model . ValidateInvite ( serializedInvite ) ; err == nil {
2021-11-17 20:33:51 +00:00
groupPic := RandomGroupImage ( invite . GroupID )
2021-06-24 22:30:46 +00:00
ev . Event . Data [ "PicturePath" ] = groupPic
} else {
log . Errorf ( "received a new group event which contained an invalid invite %v. this should never happen and likely means there is a bug in cwtch. Please file a ticket @ https://git.openprivcy.ca/cwtch.im/cwtch" , err )
return ""
}
case event . PeerStateChange :
cxnState := connections . ConnectionStateToType ( ) [ ev . Event . Data [ event . ConnectionState ] ]
2021-11-18 23:44:21 +00:00
contact , _ := profile . FetchConversationInfo ( ev . Event . Data [ event . RemotePeer ] )
2021-06-24 22:30:46 +00:00
if cxnState == connections . AUTHENTICATED && contact == nil {
2021-11-17 20:33:51 +00:00
profile . NewContactConversation ( ev . Event . Data [ event . RemotePeer ] , model . AccessControl { Read : false , Append : false , Blocked : false } , false )
2021-06-24 22:30:46 +00:00
return ""
}
if contact != nil {
// No enrichment needed
//uiManager.UpdateContactStatus(contact.Onion, int(cxnState), false)
if cxnState == connections . AUTHENTICATED {
// if known and authed, get vars
2021-11-17 22:34:35 +00:00
profile . SendScopedZonedGetValToContact ( contact . ID , attr . PublicScope , attr . ProfileZone , constants . Name )
profile . SendScopedZonedGetValToContact ( contact . ID , attr . PublicScope , attr . ProfileZone , constants2 . Picture )
2021-06-24 22:30:46 +00:00
}
}
case event . NewRetValMessageFromPeer :
// auto handled event means the setting is already done, we're just deciding if we need to tell the UI
2021-11-18 23:44:21 +00:00
conversationID , _ := strconv . Atoi ( ev . Event . Data [ event . ConversationID ] )
2021-06-24 22:30:46 +00:00
scope := ev . Event . Data [ event . Scope ]
path := ev . Event . Data [ event . Path ]
exists , _ := strconv . ParseBool ( ev . Event . Data [ event . Exists ] )
2021-10-15 17:59:42 +00:00
if exists && attr . IntoScope ( scope ) == attr . PublicScope {
2021-11-18 23:44:21 +00:00
zone , path := attr . ParseZone ( path )
2021-11-17 20:33:51 +00:00
if _ , err := profile . GetConversationAttribute ( conversationID , attr . LocalScope . ConstructScopedZonedPath ( zone . ConstructZonedPath ( path ) ) ) ; err != nil {
2021-10-15 17:59:42 +00:00
// we have a locally set override, don't pass this remote set public scope update to UI
2021-06-24 22:30:46 +00:00
return ""
}
}
}
}
json , _ := json . Marshal ( unwrap ( ev ) )
return string ( json )
}
func unwrap ( original * EventProfileEnvelope ) * event . Event {
unwrapped := & original . Event
unwrapped . Data [ "ProfileOnion" ] = original . Profile
return unwrapped
}
func ( eh * EventHandler ) startHandlingPeer ( onion string ) {
eventBus := eh . app . GetEventBus ( onion )
q := event . NewQueue ( )
eventBus . Subscribe ( event . NewMessageFromPeer , q )
eventBus . Subscribe ( event . PeerAcknowledgement , q )
eventBus . Subscribe ( event . DeleteContact , q )
eventBus . Subscribe ( event . AppError , q )
eventBus . Subscribe ( event . IndexedAcknowledgement , q )
eventBus . Subscribe ( event . IndexedFailure , q )
eventBus . Subscribe ( event . NewMessageFromGroup , q )
2021-06-29 23:38:12 +00:00
eventBus . Subscribe ( event . MessageCounterResync , q )
2021-06-24 22:30:46 +00:00
eventBus . Subscribe ( event . GroupCreated , q )
eventBus . Subscribe ( event . NewGroup , q )
eventBus . Subscribe ( event . AcceptGroupInvite , q )
2021-08-27 20:56:32 +00:00
eventBus . Subscribe ( event . SetPeerAttribute , q )
2021-06-24 22:30:46 +00:00
eventBus . Subscribe ( event . SetGroupAttribute , q )
eventBus . Subscribe ( event . DeleteGroup , q )
eventBus . Subscribe ( event . SendMessageToGroupError , q )
eventBus . Subscribe ( event . SendMessageToPeerError , q )
eventBus . Subscribe ( event . ServerStateChange , q )
eventBus . Subscribe ( event . PeerStateChange , q )
eventBus . Subscribe ( event . NetworkStatus , q )
eventBus . Subscribe ( event . ChangePasswordSuccess , q )
eventBus . Subscribe ( event . ChangePasswordError , q )
eventBus . Subscribe ( event . NewRetValMessageFromPeer , q )
eventBus . Subscribe ( event . SetAttribute , q )
2021-09-30 17:21:36 +00:00
eventBus . Subscribe ( event . ShareManifest , q )
eventBus . Subscribe ( event . ManifestSizeReceived , q )
eventBus . Subscribe ( event . ManifestError , q )
eventBus . Subscribe ( event . ManifestReceived , q )
eventBus . Subscribe ( event . ManifestSaved , q )
eventBus . Subscribe ( event . FileDownloadProgressUpdate , q )
eventBus . Subscribe ( event . FileDownloaded , q )
2021-06-24 22:30:46 +00:00
go eh . forwardProfileMessages ( onion , q )
}
func ( eh * EventHandler ) forwardProfileMessages ( onion string , q event . Queue ) {
2021-06-25 05:16:47 +00:00
log . Infof ( "Launching Forwarding Goroutine" )
2021-06-24 22:30:46 +00:00
// TODO: graceful shutdown, via an injected event of special QUIT type exiting loop/go routine
for {
e := q . Next ( )
ev := EventProfileEnvelope { Event : e , Profile : onion }
eh . profileEvents <- ev
if ev . Event . EventType == event . Shutdown {
return
}
}
}
func ( eh * EventHandler ) Push ( newEvent event . Event ) {
eh . appBusQueue . Publish ( newEvent )
}
2021-11-17 20:33:51 +00:00
func getLastMessageTime ( conversationMessages [ ] model . ConversationMessage ) int {
if len ( conversationMessages ) == 0 {
return 0
}
time , err := time . Parse ( time . RFC3339Nano , conversationMessages [ 0 ] . Attr [ constants . AttrSentTimestamp ] )
if err != nil {
return 0
}
return int ( time . Unix ( ) )
2021-11-18 23:44:21 +00:00
}