From 87c7d7719b1be8deeed92681b4b52cb1f5aac8ee Mon Sep 17 00:00:00 2001 From: Dan Ballard Date: Thu, 4 Mar 2021 15:57:48 -0800 Subject: [PATCH] make event manager, clean event code out of lib.go, start handing some peer events and enrich for front end use --- lib.go | 63 ++-------- utils/eventHandler.go | 259 ++++++++++++++++++++++++++++++++++++++++++ utils/manager.go | 125 ++++++++------------ 3 files changed, 313 insertions(+), 134 deletions(-) create mode 100644 utils/eventHandler.go diff --git a/lib.go b/lib.go index c4c305d..213a02c 100644 --- a/lib.go +++ b/lib.go @@ -5,9 +5,9 @@ import "C" import ( "crypto/rand" "cwtch.im/cwtch/app" - "cwtch.im/cwtch/app/plugins" "cwtch.im/cwtch/event" "cwtch.im/cwtch/model/attr" + "cwtch.im/cwtch/peer" "encoding/json" @@ -26,8 +26,7 @@ import ( ) var application app.Application -var appBusQueue event.Queue -var profileRepaintQueue event.Queue +var eventHandler *utils.EventHandler var acnQueue event.Queue var contactEventsQueue event.Queue @@ -68,15 +67,8 @@ func StartCwtch(appDir string, torPath string) { acnQueue = event.NewQueue() newApp.GetPrimaryBus().Subscribe(event.ACNStatus, acnQueue) - appBusQueue = event.NewQueue() - newApp.GetPrimaryBus().Subscribe(event.NewPeer, appBusQueue) - newApp.GetPrimaryBus().Subscribe(event.PeerError, appBusQueue) - newApp.GetPrimaryBus().Subscribe(event.AppError, appBusQueue) - newApp.GetPrimaryBus().Subscribe(event.ACNStatus, appBusQueue) - newApp.GetPrimaryBus().Subscribe(event.ReloadDone, appBusQueue) - newApp.GetPrimaryBus().Subscribe(event.ACNVersion, appBusQueue) + eventHandler = utils.NewEventHandler(newApp) - // Lol this wasn't an intended use peer.DefaultEventsToHandle = []event.Type{ event.EncryptedGroupMessage, event.NewMessageFromPeer, @@ -86,11 +78,11 @@ func StartCwtch(appDir string, torPath string) { event.SendMessageToGroupError, event.NewGetValMessageFromPeer, event.PeerStateChange, + event.NewRetValMessageFromPeer, + event.NewGroupInvite, + event.ServerStateChange, } - profileRepaintQueue = event.NewQueue() - newApp.GetPrimaryBus().Subscribe(event.NewPeer, profileRepaintQueue) - newApp.LoadProfiles("be gay do crime") newApp.LaunchPeers() application = newApp @@ -152,48 +144,7 @@ func c_GetAppBusEvent() *C.char { // GetAppBusEvent blocks until an event func GetAppBusEvent() string { - e := appBusQueue.Next() - if e.EventType == event.NewPeer { - //e.Data[event.ProfileName] = "orb Quen" - //e.Data[event.Path] = "profiles/044-witch.png" - onion := e.Data[event.Identity] - profile := application.GetPeer(e.Data[event.Identity]) - - if e.Data[event.Created] == event.True { - profile.SetAttribute(attr.GetPublicScope(constants.Name), profile.GetName()) - profile.SetAttribute(attr.GetPublicScope(constants.Picture), utils.ImageToString(utils.NewImage(utils.RandomProfileImage(onion), utils.TypeImageDistro))) - } - if e.Data[event.Status] != event.StorageRunning || e.Data[event.Created] == event.True { - profile.SetAttribute(attr.GetLocalScope(constants.PeerOnline), event.False) - application.AddPeerPlugin(onion, plugins.CONNECTIONRETRY) - application.AddPeerPlugin(onion, plugins.NETWORKCHECK) - } - - nick, exists := profile.GetAttribute(attr.GetPublicScope(constants.Name)) - if !exists { - nick = onion - } - - picVal, ok := profile.GetAttribute(attr.GetPublicScope(constants.Picture)) - if !ok { - picVal = utils.ImageToString(utils.NewImage(utils.RandomProfileImage(onion), utils.TypeImageDistro)) - } - pic, err := utils.StringToImage(picVal) - if err != nil { - pic = utils.NewImage(utils.RandomProfileImage(onion), utils.TypeImageDistro) - } - picPath := utils.GetPicturePath(pic) - - //tag, _ := profile.GetAttribute(app.AttributeTag) - - online, _ := profile.GetAttribute(attr.GetLocalScope(constants.PeerOnline)) - - e.Data[constants.Name] = nick - e.Data[constants.Picture] = picPath - e.Data["Online"] = online - } - ba, _ := json.Marshal(e) - return string(ba) + return eventHandler.GetNextEvent() } //export c_GetProfileRepaintEvent diff --git a/utils/eventHandler.go b/utils/eventHandler.go new file mode 100644 index 0000000..160a4b7 --- /dev/null +++ b/utils/eventHandler.go @@ -0,0 +1,259 @@ +package utils + +import ( + "cwtch.im/cwtch/app" + "cwtch.im/cwtch/app/plugins" + "cwtch.im/cwtch/model" + "cwtch.im/cwtch/model/attr" + "cwtch.im/cwtch/protocol/connections" + "encoding/json" + "git.openprivacy.ca/flutter/libcwtch-go/constants" +) +import "cwtch.im/cwtch/event" + + +type EventProfileEnvelope struct { + event event.Event + profile string +} + +type EventHandler struct { + app app.Application + appBusQueue event.Queue + profileEvents chan EventProfileEnvelope + + profileQueues map[string]event.Queue +} + +func NewEventHandler(application app.Application) *EventHandler { + appBusQueue := event.NewQueue() + application.GetPrimaryBus().Subscribe(event.NewPeer, appBusQueue) + application.GetPrimaryBus().Subscribe(event.PeerError, appBusQueue) + application.GetPrimaryBus().Subscribe(event.AppError, appBusQueue) + application.GetPrimaryBus().Subscribe(event.ACNStatus, appBusQueue) + application.GetPrimaryBus().Subscribe(event.ReloadDone, appBusQueue) + application.GetPrimaryBus().Subscribe(event.ACNVersion, appBusQueue) + + return &EventHandler{ appBusQueue: appBusQueue, profileQueues: make(map[string]event.Queue), profileEvents: make(chan EventProfileEnvelope)} +} + +func (eh *EventHandler) GetNextEvent() string { + appChan := eh.appBusQueue.OutChan() + + select { + case e := <-appChan: + return eh.handleAppBusEvent(&e) + case ev := <-eh.profileEvents: + return eh.handleProfileEvent(&ev) + } +} + +// handleAppBusEvent enriches AppBus events so they are usable with out further data fetches +func (eh *EventHandler) handleAppBusEvent(e *event.Event) string { + switch e.EventType { + case event.NewPeer: + onion := e.Data[event.Identity] + profile := eh.app.GetPeer(e.Data[event.Identity]) + + eh.startHandlingPeer(onion) + + if e.Data[event.Created] == event.True { + profile.SetAttribute(attr.GetPublicScope(constants.Name), profile.GetName()) + profile.SetAttribute(attr.GetPublicScope(constants.Picture), ImageToString(NewImage(RandomProfileImage(onion), TypeImageDistro))) + } + if e.Data[event.Status] != event.StorageRunning || e.Data[event.Created] == event.True { + profile.SetAttribute(attr.GetLocalScope(constants.PeerOnline), event.False) + eh.app.AddPeerPlugin(onion, plugins.CONNECTIONRETRY) + eh.app.AddPeerPlugin(onion, plugins.NETWORKCHECK) + } + + nick, exists := profile.GetAttribute(attr.GetPublicScope(constants.Name)) + if !exists { + nick = onion + } + + picVal, ok := profile.GetAttribute(attr.GetPublicScope(constants.Picture)) + if !ok { + picVal = ImageToString(NewImage(RandomProfileImage(onion), TypeImageDistro)) + } + pic, err := StringToImage(picVal) + if err != nil { + pic = NewImage(RandomProfileImage(onion), TypeImageDistro) + } + picPath := GetPicturePath(pic) + + //tag, _ := profile.GetAttribute(app.AttributeTag) + + online, _ := profile.GetAttribute(attr.GetLocalScope(constants.PeerOnline)) + + e.Data[constants.Name] = nick + e.Data[constants.Picture] = picPath + e.Data["Online"] = online + } + + json, _ := json.Marshal(e) + return string(json) +} + +// handleProfileEvent enriches profile events so they are usable with out further data fetches +func (eh *EventHandler) handleProfileEvent(ev *EventProfileEnvelope) string { + + peer := eh.app.GetPeer(ev.profile) + ph := NewPeerHelper(peer) + + switch ev.event.EventType { + + /*case event.NetworkStatus: + online, _ := peer.GetAttribute(attr.GetLocalScope(constants.PeerOnline)) + if e.Data[event.Status] == plugins.NetworkCheckSuccess && online == event.False { + peer.SetAttribute(attr.GetLocalScope(constants.PeerOnline), event.True) + uiManager.UpdateNetworkStatus(true) + // TODO we may have to reinitialize the peer + } else if e.Data[event.Status] == plugins.NetworkCheckError && online == event.True { + peer.SetAttribute(attr.GetLocalScope(constants.PeerOnline), event.False) + uiManager.UpdateNetworkStatus(false) + }*/ + + case event.NewMessageFromPeer: //event.TimestampReceived, event.RemotePeer, event.Data + // I Don't think we need to enrich + + // TODO: deprecate and move to dart + /*func (p *PeerHelper) updateLastReadTime(id string) { + lastRead, _ := time.Now().MarshalText() + if p.IsGroup(id) { + p.peer.SetGroupAttribute(id, attr.GetLocalScope(constants.LastRead), string(lastRead)) + } else { + p.peer.SetContactAttribute(id, attr.GetLocalScope(constants.LastRead), string(lastRead)) + } + }*/ + + // legacy + //ts, _ := time.Parse(time.RFC3339Nano, e.Data[event.TimestampReceived]) + + case event.PeerAcknowledgement: + // No enrichement required + //Acknowledge(ev.event.Data[event.RemotePeer], ev.event.Data[event.EventID]) + /* + case event.NewMessageFromGroup: //event.TimestampReceived, event.TimestampSent, event.Data, event.GroupID, event.RemotePeer + ts, _ := time.Parse(time.RFC3339Nano, e.Data[event.TimestampSent]) + uiManager.AddMessage(e.Data[event.GroupID], e.Data[event.RemotePeer], e.Data[event.Data], e.Data[event.RemotePeer] == peer.GetOnion(), hex.EncodeToString([]byte(e.Data[event.Signature])), ts, true) + + case event.NewGroupInvite: + gid, err := peer.ProcessInvite(e.Data[event.GroupInvite], e.Data[event.RemotePeer]) + group := peer.GetGroup(gid) + if err == nil && group != nil { + uiManager.AddContact(gid) + } + */ + case event.PeerCreated: + handle := ev.event.Data[event.RemotePeer] + err := EnrichNewPeer(handle, ph, ev) + if err != nil { + return err.Error() + } + /* + case event.SendMessageToGroupError: + uiManager.AddSendMessageError(e.Data[event.GroupServer], e.Data[event.Signature], e.Data[event.Error]) + case event.SendMessageToPeerError: + uiManager.AddSendMessageError(e.Data[event.RemotePeer], e.Data[event.EventID], e.Data[event.Error]) + */ + case event.PeerStateChange: + cxnState := connections.ConnectionStateToType[ev.event.Data[event.ConnectionState]] + contact := peer.GetContact(ev.event.Data[event.RemotePeer]) + + if contact != nil { + contact.State = ev.event.Data[event.ConnectionState] + // No enrichment needed + //uiManager.UpdateContactStatus(contact.Onion, int(cxnState), false) + if cxnState == connections.AUTHENTICATED { + // if known and authed, get vars + peer.SendGetValToPeer(ev.event.Data[event.RemotePeer], attr.PublicScope, constants.Name) + peer.SendGetValToPeer(ev.event.Data[event.RemotePeer], attr.PublicScope, constants.Picture) + } + } else if cxnState == connections.AUTHENTICATED && contact == nil { + // Contact does not exist, change event to NewPeer + peer.AddContact(ev.event.Data[event.RemotePeer], ev.event.Data[event.RemotePeer], model.AuthUnknown) + peer.SendGetValToPeer(ev.event.Data[event.RemotePeer], attr.PublicScope, constants.Name) + peer.SendGetValToPeer(ev.event.Data[event.RemotePeer], attr.PublicScope, constants.Picture) + ev.event.EventType = event.PeerCreated + err := EnrichNewPeer(ev.event.Data[event.RemotePeer], ph, ev) + if err != nil { + return err.Error() + } + } + + /*case event.NewRetValMessageFromPeer: + onion := e.Data[event.RemotePeer] + scope := e.Data[event.Scope] + path := e.Data[event.Path] + val := e.Data[event.Data] + exists, _ := strconv.ParseBool(e.Data[event.Exists]) + + if exists && scope == attr.PublicScope { + switch path { + case constants.Name: + peer.SetContactAttribute(onion, attr.GetPeerScope(constants.Name), val) + uiManager.UpdateContactDisplayName(onion) + case constants.Picture: + peer.SetContactAttribute(onion, attr.GetPeerScope(constants.Picture), val) + uiManager.UpdateContactPicture(onion) + } + } + + case event.ServerStateChange: + serverOnion := e.Data[event.GroupServer] + state := connections.ConnectionStateToType[e.Data[event.ConnectionState]] + groups := peer.GetGroups() + for _, groupID := range groups { + group := peer.GetGroup(groupID) + if group != nil && group.GroupServer == serverOnion { + group.State = e.Data[event.ConnectionState] + loading := false + if state == connections.AUTHENTICATED { + loading = true + } + uiManager.UpdateContactStatus(groupID, int(state), loading) + uiManager.UpdateContactStatus(serverOnion, int(state), loading) + } + } + case event.DeletePeer: + log.Infof("PeerHandler got DeletePeer, SHUTTING down!\n") + uiManager.ReloadProfiles() + return + */ + } + + json, _ := json.Marshal(ev) + return string(json) +} + +func (eh *EventHandler) startHandlingPeer(onion string) { + eventBus := eh.app.GetEventBus(onion) + q := event.NewQueue() + eventBus.Subscribe(event.NewMessageFromPeer, q) + eventBus.Subscribe(event.PeerAcknowledgement, q) + eventBus.Subscribe(event.NewMessageFromGroup, q) + eventBus.Subscribe(event.NewGroupInvite, q) + eventBus.Subscribe(event.SendMessageToGroupError, q) + eventBus.Subscribe(event.SendMessageToPeerError, q) + eventBus.Subscribe(event.ServerStateChange, q) + eventBus.Subscribe(event.PeerStateChange, q) + eventBus.Subscribe(event.PeerCreated, q) + eventBus.Subscribe(event.NetworkStatus, q) + eventBus.Subscribe(event.ChangePasswordSuccess, q) + eventBus.Subscribe(event.ChangePasswordError, q) + eventBus.Subscribe(event.NewRetValMessageFromPeer, q) + + eh.profileQueues[onion] = q + + go eh.forwardProfileMessages(onion, q) + +} + +func (eh *EventHandler) forwardProfileMessages(onion string, q event.Queue) { + for { + e := q.Next() + ev := EventProfileEnvelope{event: *e, profile: onion} + eh.profileEvents <- ev + } +} diff --git a/utils/manager.go b/utils/manager.go index 22ffb4e..3699b0e 100644 --- a/utils/manager.go +++ b/utils/manager.go @@ -4,8 +4,11 @@ import ( "cwtch.im/cwtch/model" "cwtch.im/cwtch/model/attr" "cwtch.im/cwtch/peer" + "cwtch.im/cwtch/protocol/connections" + "errors" "git.openprivacy.ca/flutter/libcwtch-go/constants" "git.openprivacy.ca/openprivacy/log" + "strconv" "strings" "time" ) @@ -150,14 +153,6 @@ func GetPicturePath(pic *image) string { } } -func (p *PeerHelper) updateLastReadTime(id string) { - lastRead, _ := time.Now().MarshalText() - if p.IsGroup(id) { - p.peer.SetGroupAttribute(id, attr.GetLocalScope(constants.LastRead), string(lastRead)) - } else { - p.peer.SetContactAttribute(id, attr.GetLocalScope(constants.LastRead), string(lastRead)) - } -} func (p *PeerHelper) CountUnread(messages []model.Message, lastRead time.Time) int { count := 0 @@ -171,6 +166,14 @@ func (p *PeerHelper) CountUnread(messages []model.Message, lastRead time.Time) i return count } +func getLastMessageTime(tl *model.Timeline) int { + if len(tl.Messages) == 0 { + return 0 + } + + return int(tl.Messages[len(tl.Messages)-1].Timestamp.Unix()) +} + /* // AddProfile adds a new profile to the UI func AddProfile(gcd *GrandCentralDispatcher, handle string) { @@ -237,54 +240,48 @@ func NewManager(profile string, gcd *GrandCentralDispatcher) Manager { return &manager{gcd: gcd, profile: profile} } -// Acknowledge acknowledges the given message id in the UI -func (this *manager) Acknowledge(handle, mID string) { - this.gcd.DoIfProfile(this.profile, func() { - this.gcd.DoIfConversation(handle, func() { - this.gcd.PeerAckAlert(mID) - }) - }) -} -func getLastMessageTime(tl *model.Timeline) int { - if len(tl.Messages) == 0 { - return 0 - } +*/ +// EnrichNewPeer populates required data for use by frontend +// uiManager.AddContact(onion) +// (handle string, displayName string, image string, badge int, status int, authorization string, loading bool, lastMsgTime int) +func EnrichNewPeer(handle string, ph *PeerHelper, ev *EventProfileEnvelope) error { + if ph.IsGroup(handle) { + group := ph.peer.GetGroup(handle) + if group != nil { + lastRead := ph.InitLastReadTime(group.GroupID) + ev.event.Data["unread"] = strconv.Itoa(ph.CountUnread(group.Timeline.GetMessages(), lastRead)) + ev.event.Data["picture"] = ph.GetProfilePic(handle) - return int(tl.Messages[len(tl.Messages)-1].Timestamp.Unix()) -} - -// AddContact adds a new contact to the ui for this manager's profile -func (this *manager) AddContact(handle string) { - this.gcd.DoIfProfile(this.profile, func() { - - if IsGroup(handle) { - group := the.Peer.GetGroup(handle) - if group != nil { - lastRead := InitLastReadTime(group.GroupID) - unread := CountUnread(group.Timeline.GetMessages(), lastRead) - picture := GetProfilePic(handle) - - this.gcd.AddContact(handle, GetNick(handle), picture, unread, int(connections.ConnectionStateToType[group.State]), string(model.AuthApproved), false, getLastMessageTime(&group.Timeline)) - } - return - } else if !IsPeer(handle) { - log.Errorf("sorry, unable to handle AddContact(%v)", handle) - debug.PrintStack() - return + ev.event.Data["nick"] = ph.GetNick(handle) + ev.event.Data["status"] = strconv.Itoa(int(connections.ConnectionStateToType[group.State])) + ev.event.Data["authorization"] = string(model.AuthApproved) + ev.event.Data["loading"] = "false" + ev.event.Data["lastMsgTime"] = strconv.Itoa(getLastMessageTime(&group.Timeline)) } - - contact := the.Peer.GetContact(handle) + } else if ph.IsPeer(handle) { + contact := ph.peer.GetContact(handle) if contact != nil { - lastRead := InitLastReadTime(contact.Onion) - unread := CountUnread(contact.Timeline.GetMessages(), lastRead) - picture := GetProfilePic(handle) + lastRead := ph.InitLastReadTime(contact.Onion) + ev.event.Data["unread"] = strconv.Itoa(ph.CountUnread(contact.Timeline.GetMessages(), lastRead)) + ev.event.Data["picture"] = ph.GetProfilePic(handle) - this.gcd.AddContact(handle, GetNick(handle), picture, unread, int(connections.ConnectionStateToType[contact.State]), string(contact.Authorization), false, getLastMessageTime(&contact.Timeline)) + ev.event.Data["nick"] = ph.GetNick(handle) + ev.event.Data["status"] = strconv.Itoa(int(connections.ConnectionStateToType[contact.State])) + + ev.event.Data["authorization"] = string(contact.Authorization) + ev.event.Data["loading"] = "false" + ev.event.Data["lastMsgTime"] = strconv.Itoa(getLastMessageTime(&contact.Timeline)) } - }) + } else { + // could be a server? + log.Errorf("sorry, unable to handle AddContact(%v)", handle) + return errors.New("Not a peer or group") + } + return nil } +/* // AddSendMessageError adds an error not and icon to a message in a conversation in the ui for the message identified by the peer/sig combo func (this *manager) AddSendMessageError(peer string, signature string, err string) { this.gcd.DoIfProfile(this.profile, func() { @@ -303,30 +300,9 @@ func (this *manager) AboutToAddMessage() { func (this *manager) MessageJustAdded() { this.gcd.TimelineInterface.RequestEIR() -} - -func (this *manager) StoreAndNotify(pere peer.CwtchPeer, onion string, messageTxt string, sent time.Time, profileOnion string) { - - // Send a New Message from Peer Notification - this.gcd.AndroidCwtchActivity.SetChannel(onion) - this.gcd.AndroidCwtchActivity.NotificationChanged("New Message from Peer") - - this.gcd.DoIfProfileElse(this.profile, func() { - this.gcd.DoIfConversationElse(onion, func() { - this.gcd.TimelineInterface.AddMessage(this.gcd.TimelineInterface.num()) - pere.StoreMessage(onion, messageTxt, sent) - this.gcd.TimelineInterface.RequestEIR() - updateLastReadTime(onion) - }, func() { - pere.StoreMessage(onion, messageTxt, sent) - }) - this.gcd.IncContactUnreadCount(onion) - }, func() { - the.CwtchApp.GetPeer(profileOnion).StoreMessage(onion, messageTxt, sent) - }) - this.gcd.Notify(onion) -} +}*/ +/* // AddMessage adds a message to the message pane for the supplied conversation if it is active func (this *manager) AddMessage(handle string, from string, message string, fromMe bool, messageID string, timestamp time.Time, Acknowledged bool) { this.gcd.DoIfProfile(this.profile, func() { @@ -365,13 +341,6 @@ func (this *manager) UpdateContactPicture(handle string) { }) } -// UpdateContactStatus updates a contact's status in the ui -func (this *manager) UpdateContactStatus(handle string, status int, loading bool) { - this.gcd.DoIfProfile(this.profile, func() { - this.gcd.UpdateContactStatus(handle, status, loading) - }) -} - // UpdateContactAttribute update's a contacts attribute in the ui func (this *manager) UpdateContactAttribute(handle, key, value string) { this.gcd.DoIfProfile(this.profile, func() {