libcwtch-go/utils/eventHandler.go

375 lines
13 KiB
Go

package utils
import (
"cwtch.im/cwtch/app"
"cwtch.im/cwtch/app/plugins"
"cwtch.im/cwtch/model"
"cwtch.im/cwtch/model/attr"
"cwtch.im/cwtch/protocol/connections"
groups2 "cwtch.im/cwtch/protocol/groups"
"encoding/json"
"git.openprivacy.ca/flutter/libcwtch-go/constants"
"git.openprivacy.ca/flutter/libcwtch-go/features/groups"
"git.openprivacy.ca/openprivacy/log"
"strconv"
)
import "cwtch.im/cwtch/event"
// EventProfileEnvelope pairs an event with the identifier of the profile it was received for.
type EventProfileEnvelope struct {
// Event is the wrapped event.
Event event.Event
// Profile identifies the owning profile; forwardProfileMessages fills it with the profile's onion.
Profile string
}
// EventHandler gathers events from the application bus and from each handled
// profile's event bus, and hands them out one at a time through GetNextEvent.
type EventHandler struct {
// app is the application whose buses are subscribed to and whose peers are queried.
app app.Application
// appBusQueue receives the application-level events subscribed in NewEventHandler.
appBusQueue event.Queue
// profileEvents carries profile events sent by forwardProfileMessages.
profileEvents chan EventProfileEnvelope
// profileQueues maps a profile onion to the queue subscribed to that profile's event bus.
profileQueues map[string]event.Queue
}
// NewEventHandler builds an EventHandler for application. A fresh queue is
// subscribed on the primary bus to every application-level event type the
// handler forwards; no profile queues exist until NewPeer events are handled.
func NewEventHandler(application app.Application) *EventHandler {
	queue := event.NewQueue()

	// Application-level events forwarded by GetNextEvent.
	forwarded := []event.Type{
		event.NewPeer,
		event.PeerError,
		event.AppError,
		event.ACNStatus,
		event.ReloadDone,
		event.ACNVersion,
		UpdateGlobalSettings,
	}
	for _, eventType := range forwarded {
		application.GetPrimaryBus().Subscribe(eventType, queue)
	}

	handler := &EventHandler{
		app:           application,
		appBusQueue:   queue,
		profileQueues: make(map[string]event.Queue),
		profileEvents: make(chan EventProfileEnvelope),
	}
	return handler
}
// GetNextEvent blocks until either an application-bus event or a forwarded
// profile event is available, then returns the string produced by the
// matching handler for that event.
func (eh *EventHandler) GetNextEvent() string {
	select {
	case appEvent := <-eh.appBusQueue.OutChan():
		return eh.handleAppBusEvent(&appEvent)
	case envelope := <-eh.profileEvents:
		return eh.handleProfileEvent(&envelope)
	}
}
// handleAppBusEvent enriches AppBus events so they are usable without further data fetches.
//
// ACNStatus events reporting a progress of "100" make every listed peer start
// listening. NewPeer events start forwarding of the profile's events, set up
// and start the profile when it was just created or is not reported as
// running, and are extended in place with the profile's name, picture path,
// online flag, a JSON list of contacts and groups, and a JSON list of servers.
// All other event types are passed through unchanged.
//
// The (possibly enriched) event is returned marshalled as JSON; the empty
// string is returned if the event cannot be marshalled.
func (eh *EventHandler) handleAppBusEvent(e *event.Event) string {
	log.Debugf("New AppBus Event to Handle: %v", e)

	switch e.EventType {
	case event.ACNStatus:
		if e.Data[event.Progress] == "100" {
			for onion := range eh.app.ListPeers() {
				// launch a listen thread (internally this does a check that the protocol engine is not listening)
				// and as such is safe to call.
				eh.app.GetPeer(onion).Listen()
			}
		}

	case event.NewPeer:
		onion := e.Data[event.Identity]
		profile := eh.app.GetPeer(onion)
		log.Debugf("New Peer Event: %v", e)

		eh.startHandlingPeer(onion)

		// A freshly created profile gets its public name and a default picture.
		if e.Data[event.Created] == event.True {
			profile.SetAttribute(attr.GetPublicScope(constants.Name), profile.GetName())
			profile.SetAttribute(attr.GetPublicScope(constants.Picture), ImageToString(NewImage(RandomProfileImage(onion), TypeImageDistro)))
		}

		if e.Data[event.Status] != event.StorageRunning || e.Data[event.Created] == event.True {
			profile.SetAttribute(attr.GetLocalScope(constants.PeerOnline), event.False)
			eh.app.AddPeerPlugin(onion, plugins.CONNECTIONRETRY)
			eh.app.AddPeerPlugin(onion, plugins.NETWORKCHECK)

			// If the user has chosen to block unknown profiles
			// then explicitly configure the protocol engine to do so..
			if ReadGlobalSettings().BlockUnknownConnections {
				profile.BlockUnknownConnections()
			} else {
				// For completeness
				profile.AllowUnknownConnections()
			}

			// Start up the Profile
			profile.Listen()
			profile.StartPeersConnections()
			if _, err := groups.ExperimentGate(ReadGlobalSettings().Experiments); err == nil {
				profile.StartServerConnections()
			}
		}

		// Fall back to the onion when no public name has been stored.
		nick, exists := profile.GetAttribute(attr.GetPublicScope(constants.Name))
		if !exists {
			nick = onion
		}

		// Fall back to the default picture when none is stored or the stored value cannot be parsed.
		picVal, ok := profile.GetAttribute(attr.GetPublicScope(constants.Picture))
		if !ok {
			picVal = ImageToString(NewImage(RandomProfileImage(onion), TypeImageDistro))
		}
		pic, err := StringToImage(picVal)
		if err != nil {
			pic = NewImage(RandomProfileImage(onion), TypeImageDistro)
		}
		picPath := GetPicturePath(pic)

		//tag, _ := profile.GetAttribute(app.AttributeTag)

		online, _ := profile.GetAttribute(attr.GetLocalScope(constants.PeerOnline))

		e.Data[constants.Name] = nick
		e.Data[constants.Picture] = picPath
		e.Data["Online"] = online

		// One helper serves every contact and group lookup below.
		ph := NewPeerHelper(profile)

		var contacts []Contact
		var servers []groups.Server
		for _, contact := range profile.GetContacts() {
			contactInfo := profile.GetContact(contact)

			// Only compile the server info if we have enabled the experiment...
			// Note that this means that this info can become stale if when first loaded the experiment
			// has been disabled and then is later re-enabled. As such we need to ensure that this list is
			// re-fetched when the group experiment is enabled via a dedicated ListServerInfo event...
			if contactInfo.IsServer() {
				groupHandler, err := groups.ExperimentGate(ReadGlobalSettings().Experiments)
				if err == nil {
					servers = append(servers, groupHandler.GetServerInfo(contact, profile))
				}
				continue
			}

			name := ph.GetNick(contact)
			cpicPath := ph.GetProfilePic(contact)
			saveHistory, set := contactInfo.GetAttribute(event.SaveHistoryKey)
			if !set {
				saveHistory = event.DeleteHistoryDefault
			}
			contacts = append(contacts, Contact{
				Name:          name,
				Onion:         contactInfo.Onion,
				Status:        contactInfo.State,
				Picture:       cpicPath,
				Authorization: string(contactInfo.Authorization),
				SaveHistory:   saveHistory,
				Messages:      contactInfo.Timeline.Len(),
				Unread:        0,
				LastMessage:   strconv.Itoa(getLastMessageTime(&contactInfo.Timeline)),
				IsGroup:       false,
			})
		}

		// We compile and send the groups regardless of the experiment flag, and hide them in the UI
		for _, groupID := range profile.GetGroups() {
			group := profile.GetGroup(groupID)
			cpicPath := ph.GetProfilePic(groupID)

			// todo hack for now, auto accept groups
			profile.AcceptInvite(groupID)

			authorization := model.AuthUnknown
			if group.Accepted {
				authorization = model.AuthApproved
			}
			contacts = append(contacts, Contact{
				Name:          ph.GetNick(groupID),
				Onion:         group.GroupID,
				Status:        group.State,
				Picture:       cpicPath,
				Authorization: string(authorization),
				SaveHistory:   event.SaveHistoryConfirmed,
				Messages:      group.Timeline.Len(),
				Unread:        0,
				LastMessage:   strconv.Itoa(getLastMessageTime(&group.Timeline)),
				IsGroup:       true,
				GroupServer:   group.GroupServer,
			})
		}

		contactsJSON, err := json.Marshal(contacts)
		if err != nil {
			log.Errorf("could not marshal contacts of %v: %v", onion, err)
		}
		e.Data["ContactsJson"] = string(contactsJSON)

		// Marshal the server list into the new peer event...
		serversJSON, err := json.Marshal(servers)
		if err != nil {
			log.Errorf("could not marshal server list of %v: %v", onion, err)
		}
		e.Data[groups.ServerList] = string(serversJSON)

		log.Infof("contactsJson %v", e.Data["ContactsJson"])
	}

	serialized, err := json.Marshal(e)
	if err != nil {
		log.Errorf("could not marshal app bus event %v: %v", e.EventType, err)
		return ""
	}
	return string(serialized)
}
// handleProfileEvent enriches Profile events so they are usable without further data fetches.
//
// Handling by event type:
//   - PeerCreated: enriched through EnrichNewPeer; nothing is emitted if that fails.
//   - NewGroupInvite: the group's picture path is added under "PicturePath".
//   - PeerStateChange: an authenticated but unknown remote peer is added as a
//     contact and nothing is emitted; for an authenticated known contact the
//     public name and picture are requested from it.
//   - Every other subscribed type (NewMessageFromPeer, PeerAcknowledgement,
//     NewMessageFromGroup, NetworkStatus, ServerStateChange,
//     SendMessageToGroupError, SendMessageToPeerError, NewRetValMessageFromPeer,
//     ...) needs no enrichment here and is forwarded as received.
//
// The event, tagged with the owning profile under "ProfileOnion", is returned
// marshalled as JSON. The empty string is returned when nothing is emitted or
// when marshalling fails.
func (eh *EventHandler) handleProfileEvent(ev *EventProfileEnvelope) string {
	peer := eh.app.GetPeer(ev.Profile)
	ph := NewPeerHelper(peer)
	log.Debugf("New Profile Event to Handle: %v", ev)

	switch ev.Event.EventType {
	case event.NewMessageFromPeer: //event.TimestampReceived, event.RemotePeer, event.Data
		// No enrichment required.
		// TODO: deprecate and move to dart: updating the contact's/group's LastRead attribute on new messages.

	case event.PeerAcknowledgement:
		// No enrichment required.

	case event.PeerCreated:
		handle := ev.Event.Data[event.RemotePeer]
		if err := EnrichNewPeer(handle, ph, ev); err != nil {
			log.Errorf("could not enrich PeerCreated event for %v: %v", handle, err)
			return ""
		}

	case event.NewGroupInvite:
		var invite groups2.GroupInvite
		if err := json.Unmarshal([]byte(ev.Event.Data[event.GroupInvite]), &invite); err != nil {
			// Keep going: the picture is then looked up for whatever group ID was decoded (possibly none).
			log.Errorf("could not parse group invite: %v", err)
		}
		ev.Event.Data["PicturePath"] = ph.GetProfilePic(invite.GroupID)

	case event.PeerStateChange:
		remote := ev.Event.Data[event.RemotePeer]
		cxnState := connections.ConnectionStateToType()[ev.Event.Data[event.ConnectionState]]
		contact := peer.GetContact(remote)

		if cxnState == connections.AUTHENTICATED {
			if contact == nil {
				// First authenticated connection from an unknown peer: record it and emit nothing.
				peer.AddContact(remote, remote, model.AuthUnknown)
				return ""
			}
			// if known and authed, get vars
			peer.SendGetValToPeer(remote, attr.PublicScope, constants.Name)
			peer.SendGetValToPeer(remote, attr.PublicScope, constants.Picture)
		}
	}

	serialized, err := json.Marshal(unwrap(ev))
	if err != nil {
		log.Errorf("could not marshal profile event %v: %v", ev.Event.EventType, err)
		return ""
	}
	return string(serialized)
}
// unwrap stores the envelope's profile in the enveloped event's data under
// the "ProfileOnion" key and returns a pointer to that same event (no copy
// is made, so the envelope's event is modified).
func unwrap(original *EventProfileEnvelope) *event.Event {
	original.Event.Data["ProfileOnion"] = original.Profile
	return &original.Event
}
// startHandlingPeer subscribes a new queue to the event bus of the profile
// identified by onion, records it in profileQueues and starts a goroutine
// that forwards the queue's events to profileEvents.
//
// The call is a no-op when the profile is already being handled.
// handleAppBusEvent invokes it for every NewPeer event, including those for
// profiles reported as already running; subscribing again would deliver each
// profile event twice and leave the earlier forwarding goroutine running.
func (eh *EventHandler) startHandlingPeer(onion string) {
	if _, handled := eh.profileQueues[onion]; handled {
		return
	}

	eventBus := eh.app.GetEventBus(onion)
	q := event.NewQueue()

	// Profile-level events forwarded to GetNextEvent.
	forwarded := []event.Type{
		event.NewMessageFromPeer,
		event.PeerAcknowledgement,
		event.NewMessageFromGroup,
		event.NewGroupInvite,
		event.SendMessageToGroupError,
		event.SendMessageToPeerError,
		event.ServerStateChange,
		event.PeerStateChange,
		event.PeerCreated,
		event.NetworkStatus,
		event.ChangePasswordSuccess,
		event.ChangePasswordError,
		event.NewRetValMessageFromPeer,
		event.SetAttribute,
	}
	for _, eventType := range forwarded {
		eventBus.Subscribe(eventType, q)
	}

	eh.profileQueues[onion] = q
	go eh.forwardProfileMessages(onion, q)
}
// forwardProfileMessages loops forever, taking the next event from q,
// wrapping it together with onion in an EventProfileEnvelope and sending it
// on profileEvents. It never returns, so it is meant to run in its own goroutine.
func (eh *EventHandler) forwardProfileMessages(onion string, q event.Queue) {
	// TODO: graceful shutdown, via an injected event of special QUIT type exiting loop/go routine
	for {
		eh.profileEvents <- EventProfileEnvelope{Event: q.Next(), Profile: onion}
	}
}
// Push publishes newEvent directly onto the handler's application bus queue,
// the same queue GetNextEvent reads application events from.
func (eh *EventHandler) Push(newEvent event.Event) {
eh.appBusQueue.Publish(newEvent)
}