package utils import ( "cwtch.im/cwtch/app" "cwtch.im/cwtch/app/plugins" "cwtch.im/cwtch/model" "cwtch.im/cwtch/model/attr" "cwtch.im/cwtch/protocol/connections" "encoding/json" "git.openprivacy.ca/flutter/libcwtch-go/constants" "git.openprivacy.ca/openprivacy/log" ) import "cwtch.im/cwtch/event" type EventProfileEnvelope struct { Event event.Event Profile string } type EventHandler struct { app app.Application appBusQueue event.Queue profileEvents chan EventProfileEnvelope profileQueues map[string]event.Queue } func NewEventHandler(application app.Application) *EventHandler { appBusQueue := event.NewQueue() application.GetPrimaryBus().Subscribe(event.NewPeer, appBusQueue) application.GetPrimaryBus().Subscribe(event.PeerError, appBusQueue) application.GetPrimaryBus().Subscribe(event.AppError, appBusQueue) application.GetPrimaryBus().Subscribe(event.ACNStatus, appBusQueue) application.GetPrimaryBus().Subscribe(event.ReloadDone, appBusQueue) application.GetPrimaryBus().Subscribe(event.ACNVersion, appBusQueue) application.GetPrimaryBus().Subscribe(UpdateGlobalSettings, appBusQueue) return &EventHandler{app: application, appBusQueue: appBusQueue, profileQueues: make(map[string]event.Queue), profileEvents: make(chan EventProfileEnvelope)} } func (eh *EventHandler) GetNextEvent() string { appChan := eh.appBusQueue.OutChan() select { case e := <-appChan: return eh.handleAppBusEvent(&e) case ev := <-eh.profileEvents: return eh.handleProfileEvent(&ev) } } // handleAppBusEvent enriches AppBus events so they are usable with out further data fetches func (eh *EventHandler) handleAppBusEvent(e *event.Event) string { switch e.EventType { case event.NewPeer: onion := e.Data[event.Identity] profile := eh.app.GetPeer(e.Data[event.Identity]) eh.startHandlingPeer(onion) if e.Data[event.Created] == event.True { profile.SetAttribute(attr.GetPublicScope(constants.Name), profile.GetName()) profile.SetAttribute(attr.GetPublicScope(constants.Picture), ImageToString(NewImage(RandomProfileImage(onion), TypeImageDistro))) } if e.Data[event.Status] != event.StorageRunning || e.Data[event.Created] == event.True { profile.SetAttribute(attr.GetLocalScope(constants.PeerOnline), event.False) eh.app.AddPeerPlugin(onion, plugins.CONNECTIONRETRY) eh.app.AddPeerPlugin(onion, plugins.NETWORKCHECK) } nick, exists := profile.GetAttribute(attr.GetPublicScope(constants.Name)) if !exists { nick = onion } picVal, ok := profile.GetAttribute(attr.GetPublicScope(constants.Picture)) if !ok { picVal = ImageToString(NewImage(RandomProfileImage(onion), TypeImageDistro)) } pic, err := StringToImage(picVal) if err != nil { pic = NewImage(RandomProfileImage(onion), TypeImageDistro) } picPath := GetPicturePath(pic) //tag, _ := profile.GetAttribute(app.AttributeTag) online, _ := profile.GetAttribute(attr.GetLocalScope(constants.PeerOnline)) e.Data[constants.Name] = nick e.Data[constants.Picture] = picPath e.Data["Online"] = online var contacts []Contact for _, contact := range profile.GetContacts() { if profile.GetContact(contact).IsServer() { continue } cpicVal, ok := profile.GetContactAttribute(contact, attr.GetPeerScope(constants.Picture)) if !ok { cpicVal = ImageToString(NewImage(RandomProfileImage(contact), TypeImageDistro)) } cpic, err := StringToImage(cpicVal) if err != nil { cpic = NewImage(RandomProfileImage(contact), TypeImageDistro) } cpicPath := GetPicturePath(cpic) contactInfo := profile.GetContact(contact) contacts = append(contacts, Contact{Name: contactInfo.Name, Onion: contactInfo.Onion, Status: contactInfo.State, Picture: cpicPath,}) } bytes, _ := json.Marshal(contacts) e.Data["ContactsJson"] = string(bytes) log.Infof("contactsJson %v", e.Data["ContactsJson"]) } json, _ := json.Marshal(e) return string(json) } // handleProfileEvent enriches Profile events so they are usable with out further data fetches func (eh *EventHandler) handleProfileEvent(ev *EventProfileEnvelope) string { peer := eh.app.GetPeer(ev.Profile) ph := NewPeerHelper(peer) switch ev.Event.EventType { /*case event.NetworkStatus: online, _ := peer.GetAttribute(attr.GetLocalScope(constants.PeerOnline)) if e.Data[event.Status] == plugins.NetworkCheckSuccess && online == event.False { peer.SetAttribute(attr.GetLocalScope(constants.PeerOnline), event.True) uiManager.UpdateNetworkStatus(true) // TODO we may have to reinitialize the peer } else if e.Data[event.Status] == plugins.NetworkCheckError && online == event.True { peer.SetAttribute(attr.GetLocalScope(constants.PeerOnline), event.False) uiManager.UpdateNetworkStatus(false) }*/ case event.NewMessageFromPeer: //event.TimestampReceived, event.RemotePeer, event.Data // I Don't think we need to enrich // TODO: deprecate and move to dart /*func (p *PeerHelper) updateLastReadTime(id string) { lastRead, _ := time.Now().MarshalText() if p.IsGroup(id) { p.peer.SetGroupAttribute(id, attr.GetLocalScope(constants.LastRead), string(lastRead)) } else { p.peer.SetContactAttribute(id, attr.GetLocalScope(constants.LastRead), string(lastRead)) } }*/ // legacy //ts, _ := time.Parse(time.RFC3339Nano, e.Data[event.TimestampReceived]) case event.PeerAcknowledgement: // No enrichement required //Acknowledge(ev.Event.Data[event.RemotePeer], ev.Event.Data[event.EventID]) /* case event.NewMessageFromGroup: //event.TimestampReceived, event.TimestampSent, event.Data, event.GroupID, event.RemotePeer ts, _ := time.Parse(time.RFC3339Nano, e.Data[event.TimestampSent]) uiManager.AddMessage(e.Data[event.GroupID], e.Data[event.RemotePeer], e.Data[event.Data], e.Data[event.RemotePeer] == peer.GetOnion(), hex.EncodeToString([]byte(e.Data[event.Signature])), ts, true) case event.NewGroupInvite: gid, err := peer.ProcessInvite(e.Data[event.GroupInvite], e.Data[event.RemotePeer]) group := peer.GetGroup(gid) if err == nil && group != nil { uiManager.AddContact(gid) } */ case event.PeerCreated: handle := ev.Event.Data[event.RemotePeer] err := EnrichNewPeer(handle, ph, ev) if err != nil { return "" } /* case event.SendMessageToGroupError: uiManager.AddSendMessageError(e.Data[event.GroupServer], e.Data[event.Signature], e.Data[event.Error]) case event.SendMessageToPeerError: uiManager.AddSendMessageError(e.Data[event.RemotePeer], e.Data[event.EventID], e.Data[event.Error]) */ case event.PeerStateChange: cxnState := connections.ConnectionStateToType[ev.Event.Data[event.ConnectionState]] contact := peer.GetContact(ev.Event.Data[event.RemotePeer]) if cxnState == connections.AUTHENTICATED && contact == nil { // Contact does not exist, change event to NewPeer peer.AddContact(ev.Event.Data[event.RemotePeer], ev.Event.Data[event.RemotePeer], model.AuthUnknown) contact = peer.GetContact(ev.Event.Data[event.RemotePeer]) ev.Event.EventType = event.PeerCreated err := EnrichNewPeer(ev.Event.Data[event.RemotePeer], ph, ev) if err != nil { return "" } } if contact != nil { // No enrichment needed //uiManager.UpdateContactStatus(contact.Onion, int(cxnState), false) if cxnState == connections.AUTHENTICATED { // if known and authed, get vars peer.SendGetValToPeer(ev.Event.Data[event.RemotePeer], attr.PublicScope, constants.Name) peer.SendGetValToPeer(ev.Event.Data[event.RemotePeer], attr.PublicScope, constants.Picture) } } /*case event.NewRetValMessageFromPeer: onion := e.Data[event.RemotePeer] scope := e.Data[event.Scope] path := e.Data[event.Path] val := e.Data[event.Data] exists, _ := strconv.ParseBool(e.Data[event.Exists]) if exists && scope == attr.PublicScope { switch path { case constants.Name: peer.SetContactAttribute(onion, attr.GetPeerScope(constants.Name), val) uiManager.UpdateContactDisplayName(onion) case constants.Picture: peer.SetContactAttribute(onion, attr.GetPeerScope(constants.Picture), val) uiManager.UpdateContactPicture(onion) } } case event.ServerStateChange: serverOnion := e.Data[event.GroupServer] state := connections.ConnectionStateToType[e.Data[event.ConnectionState]] groups := peer.GetGroups() for _, groupID := range groups { group := peer.GetGroup(groupID) if group != nil && group.GroupServer == serverOnion { group.State = e.Data[event.ConnectionState] loading := false if state == connections.AUTHENTICATED { loading = true } uiManager.UpdateContactStatus(groupID, int(state), loading) uiManager.UpdateContactStatus(serverOnion, int(state), loading) } } case event.DeletePeer: log.Infof("PeerHandler got DeletePeer, SHUTTING down!\n") uiManager.ReloadProfiles() return */ } json, _ := json.Marshal(ev) return string(json) } func (eh *EventHandler) startHandlingPeer(onion string) { eventBus := eh.app.GetEventBus(onion) q := event.NewQueue() eventBus.Subscribe(event.NewMessageFromPeer, q) eventBus.Subscribe(event.PeerAcknowledgement, q) eventBus.Subscribe(event.NewMessageFromGroup, q) eventBus.Subscribe(event.NewGroupInvite, q) eventBus.Subscribe(event.SendMessageToGroupError, q) eventBus.Subscribe(event.SendMessageToPeerError, q) eventBus.Subscribe(event.ServerStateChange, q) eventBus.Subscribe(event.PeerStateChange, q) eventBus.Subscribe(event.PeerCreated, q) eventBus.Subscribe(event.NetworkStatus, q) eventBus.Subscribe(event.ChangePasswordSuccess, q) eventBus.Subscribe(event.ChangePasswordError, q) eventBus.Subscribe(event.NewRetValMessageFromPeer, q) eh.profileQueues[onion] = q go eh.forwardProfileMessages(onion, q) } func (eh *EventHandler) forwardProfileMessages(onion string, q event.Queue) { // TODO: graceful shutdown, via an injected event of special QUIT type exiting loop/go routine for { e := q.Next() ev := EventProfileEnvelope{Event: *e, Profile: onion} eh.profileEvents <- ev } }