package utils import ( "cwtch.im/cwtch/app" "cwtch.im/cwtch/app/plugins" "cwtch.im/cwtch/model" "cwtch.im/cwtch/model/attr" "cwtch.im/cwtch/protocol/connections" groups2 "cwtch.im/cwtch/protocol/groups" "encoding/json" "git.openprivacy.ca/flutter/libcwtch-go/constants" "git.openprivacy.ca/flutter/libcwtch-go/features/groups" "git.openprivacy.ca/openprivacy/log" "strconv" ) import "cwtch.im/cwtch/event" type EventProfileEnvelope struct { Event event.Event Profile string } type EventHandler struct { app app.Application appBusQueue event.Queue profileEvents chan EventProfileEnvelope profileQueues map[string]event.Queue } func NewEventHandler(application app.Application) *EventHandler { appBusQueue := event.NewQueue() application.GetPrimaryBus().Subscribe(event.NewPeer, appBusQueue) application.GetPrimaryBus().Subscribe(event.PeerError, appBusQueue) application.GetPrimaryBus().Subscribe(event.AppError, appBusQueue) application.GetPrimaryBus().Subscribe(event.ACNStatus, appBusQueue) application.GetPrimaryBus().Subscribe(event.ReloadDone, appBusQueue) application.GetPrimaryBus().Subscribe(event.ACNVersion, appBusQueue) application.GetPrimaryBus().Subscribe(UpdateGlobalSettings, appBusQueue) return &EventHandler{app: application, appBusQueue: appBusQueue, profileQueues: make(map[string]event.Queue), profileEvents: make(chan EventProfileEnvelope)} } func (eh *EventHandler) GetNextEvent() string { appChan := eh.appBusQueue.OutChan() select { case e := <-appChan: return eh.handleAppBusEvent(&e) case ev := <-eh.profileEvents: return eh.handleProfileEvent(&ev) } } // handleAppBusEvent enriches AppBus events so they are usable with out further data fetches func (eh *EventHandler) handleAppBusEvent(e *event.Event) string { log.Debugf("New AppBus Event to Handle: %v", e) switch e.EventType { case event.ACNStatus: if e.Data[event.Progress] == "100" { for onion := range eh.app.ListPeers() { // launch a listen thread (internally this does a check that the protocol engine is not listening) // and as such is safe to call. eh.app.GetPeer(onion).Listen() } } case event.NewPeer: onion := e.Data[event.Identity] profile := eh.app.GetPeer(e.Data[event.Identity]) log.Debug("New Peer Event: %v", e) eh.startHandlingPeer(onion) if e.Data[event.Created] == event.True { profile.SetAttribute(attr.GetPublicScope(constants.Name), profile.GetName()) profile.SetAttribute(attr.GetPublicScope(constants.Picture), ImageToString(NewImage(RandomProfileImage(onion), TypeImageDistro))) } if e.Data[event.Status] != event.StorageRunning || e.Data[event.Created] == event.True { profile.SetAttribute(attr.GetLocalScope(constants.PeerOnline), event.False) eh.app.AddPeerPlugin(onion, plugins.CONNECTIONRETRY) eh.app.AddPeerPlugin(onion, plugins.NETWORKCHECK) // If the user has chosen to block unknown profiles // then explicitly configure the protocol engine to do so.. if ReadGlobalSettings().BlockUnknownConnections { profile.BlockUnknownConnections() } else { // For completeness profile.AllowUnknownConnections() } // Start up the Profile profile.Listen() profile.StartPeersConnections() if _, err := groups.ExperimentGate(ReadGlobalSettings().Experiments); err == nil { profile.StartServerConnections() } } nick, exists := profile.GetAttribute(attr.GetPublicScope(constants.Name)) if !exists { nick = onion } picVal, ok := profile.GetAttribute(attr.GetPublicScope(constants.Picture)) if !ok { picVal = ImageToString(NewImage(RandomProfileImage(onion), TypeImageDistro)) } pic, err := StringToImage(picVal) if err != nil { pic = NewImage(RandomProfileImage(onion), TypeImageDistro) } picPath := GetPicturePath(pic) //tag, _ := profile.GetAttribute(app.AttributeTag) online, _ := profile.GetAttribute(attr.GetLocalScope(constants.PeerOnline)) e.Data[constants.Name] = nick e.Data[constants.Picture] = picPath e.Data["Online"] = online var contacts []Contact var servers []groups.Server for _, contact := range profile.GetContacts() { // Only compile the server info if we have enabled the experiment... // Note that this means that this info can become stale if when first loaded the experiment // has been disabled and then is later re-enabled. As such we need to ensure that this list is // re-fetched when the group experiment is enabled via a dedicated ListServerInfo event... if profile.GetContact(contact).IsServer() { groupHandler, err := groups.ExperimentGate(ReadGlobalSettings().Experiments) if err == nil { servers = append(servers, groupHandler.GetServerInfo(contact, profile)) } continue } contactInfo := profile.GetContact(contact) ph := NewPeerHelper(profile) name := ph.GetNick(contact) cpicPath := ph.GetProfilePic(contact) saveHistory, set := contactInfo.GetAttribute(event.SaveHistoryKey) if !set { saveHistory = event.DeleteHistoryDefault } contacts = append(contacts, Contact{ Name: name, Onion: contactInfo.Onion, Status: contactInfo.State, Picture: cpicPath, Authorization: string(contactInfo.Authorization), SaveHistory: saveHistory, Messages: contactInfo.Timeline.Len(), Unread: 0, LastMessage: strconv.Itoa(getLastMessageTime(&contactInfo.Timeline)), IsGroup: false, }) } // We compile and send the groups regardless of the experiment flag, and hide them in the UI for _, groupId := range profile.GetGroups() { group := profile.GetGroup(groupId) ph := NewPeerHelper(profile) cpicPath := ph.GetProfilePic(groupId) // todo hack for now, auto accept groups profile.AcceptInvite(groupId) authorization := model.AuthUnknown if group.Accepted { authorization = model.AuthApproved } contacts = append(contacts, Contact{ Name: ph.GetNick(groupId), Onion: group.GroupID, Status: group.State, Picture: cpicPath, Authorization: string(authorization), SaveHistory: event.SaveHistoryConfirmed, Messages: group.Timeline.Len(), Unread: 0, LastMessage: strconv.Itoa(getLastMessageTime(&group.Timeline)), IsGroup: true, GroupServer: group.GroupServer, }) } bytes, _ := json.Marshal(contacts) e.Data["ContactsJson"] = string(bytes) // Marshal the server list into the new peer event... serversListBytes, _ := json.Marshal(servers) e.Data[groups.ServerList] = string(serversListBytes) log.Infof("contactsJson %v", e.Data["ContactsJson"]) } json, _ := json.Marshal(e) return string(json) } // handleProfileEvent enriches Profile events so they are usable with out further data fetches func (eh *EventHandler) handleProfileEvent(ev *EventProfileEnvelope) string { peer := eh.app.GetPeer(ev.Profile) ph := NewPeerHelper(peer) log.Debugf("New Profile Event to Handle: %v", ev) switch ev.Event.EventType { /*case event.NetworkStatus: online, _ := peer.GetAttribute(attr.GetLocalScope(constants.PeerOnline)) if e.Data[event.Status] == plugins.NetworkCheckSuccess && online == event.False { peer.SetAttribute(attr.GetLocalScope(constants.PeerOnline), event.True) uiManager.UpdateNetworkStatus(true) // TODO we may have to reinitialize the peer } else if e.Data[event.Status] == plugins.NetworkCheckError && online == event.True { peer.SetAttribute(attr.GetLocalScope(constants.PeerOnline), event.False) uiManager.UpdateNetworkStatus(false) }*/ case event.NewMessageFromPeer: //event.TimestampReceived, event.RemotePeer, event.Data // I Don't think we need to enrich // TODO: deprecate and move to dart /*func (p *PeerHelper) updateLastReadTime(id string) { lastRead, _ := time.Now().MarshalText() if p.IsGroup(id) { p.peer.SetGroupAttribute(id, attr.GetLocalScope(constants.LastRead), string(lastRead)) } else { p.peer.SetContactAttribute(id, attr.GetLocalScope(constants.LastRead), string(lastRead)) } }*/ // legacy //ts, _ := time.Parse(time.RFC3339Nano, e.Data[event.TimestampReceived]) case event.PeerAcknowledgement: // No enrichement required //Acknowledge(ev.Event.Data[event.RemotePeer], ev.Event.Data[event.EventID]) /* case event.NewMessageFromGroup: //event.TimestampReceived, event.TimestampSent, event.Data, event.GroupID, event.RemotePeer ts, _ := time.Parse(time.RFC3339Nano, e.Data[event.TimestampSent]) uiManager.AddMessage(e.Data[event.GroupID], e.Data[event.RemotePeer], e.Data[event.Data], e.Data[event.RemotePeer] == peer.GetOnion(), hex.EncodeToString([]byte(e.Data[event.Signature])), ts, true) case event.NewGroupInvite: gid, err := peer.ProcessInvite(e.Data[event.GroupInvite], e.Data[event.RemotePeer]) group := peer.GetGroup(gid) if err == nil && group != nil { uiManager.AddContact(gid) } */ case event.PeerCreated: handle := ev.Event.Data[event.RemotePeer] err := EnrichNewPeer(handle, ph, ev) if err != nil { return "" } /* case event.SendMessageToGroupError: uiManager.AddSendMessageError(e.Data[event.GroupServer], e.Data[event.Signature], e.Data[event.Error]) case event.SendMessageToPeerError: uiManager.AddSendMessageError(e.Data[event.RemotePeer], e.Data[event.EventID], e.Data[event.Error]) */ case event.NewGroupInvite: var invite = groups2.GroupInvite{} json.Unmarshal([]byte(ev.Event.Data[event.GroupInvite]), &invite) groupPic := ph.GetProfilePic(invite.GroupID) ev.Event.Data["PicturePath"] = groupPic case event.PeerStateChange: cxnState := connections.ConnectionStateToType()[ev.Event.Data[event.ConnectionState]] contact := peer.GetContact(ev.Event.Data[event.RemotePeer]) if cxnState == connections.AUTHENTICATED && contact == nil { peer.AddContact(ev.Event.Data[event.RemotePeer], ev.Event.Data[event.RemotePeer], model.AuthUnknown) return "" } if contact != nil { // No enrichment needed //uiManager.UpdateContactStatus(contact.Onion, int(cxnState), false) if cxnState == connections.AUTHENTICATED { // if known and authed, get vars peer.SendGetValToPeer(ev.Event.Data[event.RemotePeer], attr.PublicScope, constants.Name) peer.SendGetValToPeer(ev.Event.Data[event.RemotePeer], attr.PublicScope, constants.Picture) } } /*case event.NewRetValMessageFromPeer: onion := e.Data[event.RemotePeer] scope := e.Data[event.Scope] path := e.Data[event.Path] val := e.Data[event.Data] exists, _ := strconv.ParseBool(e.Data[event.Exists]) if exists && scope == attr.PublicScope { switch path { case constants.Name: peer.SetContactAttribute(onion, attr.GetPeerScope(constants.Name), val) uiManager.UpdateContactDisplayName(onion) case constants.Picture: peer.SetContactAttribute(onion, attr.GetPeerScope(constants.Picture), val) uiManager.UpdateContactPicture(onion) } } case event.ServerStateChange: serverOnion := e.Data[event.GroupServer] state := connections.ConnectionStateToType[e.Data[event.ConnectionState]] groups := peer.GetGroups() for _, groupID := range groups { group := peer.GetGroup(groupID) if group != nil && group.GroupServer == serverOnion { group.State = e.Data[event.ConnectionState] loading := false if state == connections.AUTHENTICATED { loading = true } uiManager.UpdateContactStatus(groupID, int(state), loading) uiManager.UpdateContactStatus(serverOnion, int(state), loading) } } case event.DeletePeer: log.Infof("PeerHandler got DeletePeer, SHUTTING down!\n") uiManager.ReloadProfiles() return */ } json, _ := json.Marshal(unwrap(ev)) return string(json) } func unwrap(original *EventProfileEnvelope) *event.Event { unwrapped := &original.Event unwrapped.Data["ProfileOnion"] = original.Profile return unwrapped } func (eh *EventHandler) startHandlingPeer(onion string) { eventBus := eh.app.GetEventBus(onion) q := event.NewQueue() eventBus.Subscribe(event.NewMessageFromPeer, q) eventBus.Subscribe(event.PeerAcknowledgement, q) eventBus.Subscribe(event.NewMessageFromGroup, q) eventBus.Subscribe(event.NewGroupInvite, q) eventBus.Subscribe(event.SendMessageToGroupError, q) eventBus.Subscribe(event.SendMessageToPeerError, q) eventBus.Subscribe(event.ServerStateChange, q) eventBus.Subscribe(event.PeerStateChange, q) eventBus.Subscribe(event.PeerCreated, q) eventBus.Subscribe(event.NetworkStatus, q) eventBus.Subscribe(event.ChangePasswordSuccess, q) eventBus.Subscribe(event.ChangePasswordError, q) eventBus.Subscribe(event.NewRetValMessageFromPeer, q) eventBus.Subscribe(event.SetAttribute, q) eh.profileQueues[onion] = q go eh.forwardProfileMessages(onion, q) } func (eh *EventHandler) forwardProfileMessages(onion string, q event.Queue) { // TODO: graceful shutdown, via an injected event of special QUIT type exiting loop/go routine for { e := q.Next() ev := EventProfileEnvelope{Event: e, Profile: onion} eh.profileEvents <- ev } } func (eh *EventHandler) Push(newEvent event.Event) { eh.appBusQueue.Publish(newEvent) }