cwtch/peer/cwtch_peer.go

638 lines
22 KiB
Go
Raw Normal View History

package peer
2018-03-09 20:44:13 +00:00
import (
2019-01-04 21:44:21 +00:00
"cwtch.im/cwtch/event"
2018-05-28 18:05:06 +00:00
"cwtch.im/cwtch/model"
"cwtch.im/cwtch/model/attr"
2019-01-04 21:44:21 +00:00
"cwtch.im/cwtch/protocol/connections"
"encoding/base32"
"encoding/base64"
2019-02-03 01:18:33 +00:00
"encoding/json"
2018-03-30 21:16:51 +00:00
"errors"
"git.openprivacy.ca/openprivacy/log"
"strconv"
"strings"
2018-03-09 20:44:13 +00:00
"sync"
2019-01-22 19:11:25 +00:00
"time"
2018-03-09 20:44:13 +00:00
)
// autoHandleableEvents is the set of event types that AutoHandleEvents is willing to
// subscribe the peer's queue to. Requests for any other type are rejected and logged.
var autoHandleableEvents = map[event.Type]bool{
	event.EncryptedGroupMessage:    true,
	event.PeerStateChange:          true,
	event.ServerStateChange:        true,
	event.NewGroupInvite:           true,
	event.NewMessageFromPeer:       true,
	event.PeerAcknowledgement:      true,
	event.PeerError:                true,
	event.SendMessageToGroupError:  true,
	event.NewGetValMessageFromPeer: true,
	event.NewRetValMessageFromPeer: true,
}
2018-10-06 03:50:55 +00:00
// cwtchPeer manages incoming and outgoing connections and all processing for a Cwtch cwtchPeer
type cwtchPeer struct {
2019-01-04 21:44:21 +00:00
Profile *model.Profile
mutex sync.Mutex
shutdown bool
queue event.Queue
eventBus event.Manager
2018-03-09 20:44:13 +00:00
}
// CwtchPeer provides us with a way of testing systems built on top of cwtch without having to
// directly implement a cwtchPeer.
type CwtchPeer interface {
Init(event.Manager)
AutoHandleEvents(events []event.Type)
2019-07-17 19:10:52 +00:00
PeerWithOnion(string)
InviteOnionToGroup(string, string) error
SendMessageToPeer(string, string) string
SendGetValToPeer(string, string, string)
SetContactAuthorization(string, model.Authorization) error
ProcessInvite(string, string) (string, error)
AcceptInvite(string) error
RejectInvite(string)
DeleteContact(string)
DeleteGroup(string)
AddServer(string) error
JoinServer(string)
SendMessageToGroup(string, string) error
SendMessageToGroupTracked(string, string) (string, error)
GetName() string
SetName(string)
GetOnion() string
GetPeerState(string) (connections.ConnectionState, bool)
StartGroup(string) (string, []byte, error)
ImportGroup(string) error
ExportGroup(string) (string, error)
GetGroup(string) *model.Group
GetGroupState(string) (connections.ConnectionState, bool)
GetGroups() []string
AddContact(nick, onion string, authorization model.Authorization)
GetContacts() []string
GetContact(string) *model.PublicProfile
SetAttribute(string, string)
GetAttribute(string) (string, bool)
SetContactAttribute(string, string, string)
GetContactAttribute(string, string) (string, bool)
SetGroupAttribute(string, string, string)
GetGroupAttribute(string, string) (string, bool)
Listen()
StartPeersConnections()
StartGroupConnections()
Shutdown()
}
// NewCwtchPeer creates and returns a new cwtchPeer with the given name.
2018-10-06 03:50:55 +00:00
func NewCwtchPeer(name string) CwtchPeer {
cp := new(cwtchPeer)
cp.Profile = model.GenerateNewProfile(name)
cp.shutdown = false
2018-10-06 03:50:55 +00:00
return cp
2018-10-15 00:59:53 +00:00
}
2018-10-06 03:50:55 +00:00
// FromProfile generates a new peer from a profile.
func FromProfile(profile *model.Profile) CwtchPeer {
cp := new(cwtchPeer)
cp.Profile = profile
cp.shutdown = false
2018-10-06 03:50:55 +00:00
return cp
2018-03-09 20:44:13 +00:00
}
2018-10-06 03:50:55 +00:00
// Init instantiates a cwtchPeer
func (cp *cwtchPeer) Init(eventBus event.Manager) {
cp.queue = event.NewQueue()
2019-01-04 21:44:21 +00:00
go cp.eventHandler()
cp.eventBus = eventBus
cp.AutoHandleEvents([]event.Type{event.EncryptedGroupMessage, event.NewMessageFromPeer, event.PeerAcknowledgement,
event.PeerError, event.SendMessageToGroupError, event.NewGetValMessageFromPeer})
}
// AutoHandleEvents sets an event (if able) to be handled by this peer
func (cp *cwtchPeer) AutoHandleEvents(events []event.Type) {
for _, ev := range events {
if _, exists := autoHandleableEvents[ev]; exists {
cp.eventBus.Subscribe(ev, cp.queue)
} else {
log.Errorf("Peer asked to autohandle event it cannot: %v\n", ev)
}
}
2018-03-09 20:44:13 +00:00
}
// ImportGroup intializes a group from an imported source rather than a peer invite
func (cp *cwtchPeer) ImportGroup(exportedInvite string) (err error) {
2018-10-05 03:18:34 +00:00
if strings.HasPrefix(exportedInvite, "torv3") {
data, err := base64.StdEncoding.DecodeString(exportedInvite[5:])
if err == nil {
cp.eventBus.Publish(event.NewEvent(event.NewGroupInvite, map[event.Field]string{
event.GroupInvite: string(data),
}))
} else {
log.Errorf("error decoding group invite: %v", err)
}
return nil
}
return errors.New("unsupported exported group type")
}
// ExportGroup serializes a group invite so it can be given offline
func (cp *cwtchPeer) ExportGroup(groupID string) (string, error) {
cp.mutex.Lock()
defer cp.mutex.Unlock()
group := cp.Profile.GetGroup(groupID)
if group != nil {
2018-10-03 05:46:00 +00:00
invite, err := group.Invite(group.GetInitialMessage())
if err == nil {
exportedInvite := "torv3" + base64.StdEncoding.EncodeToString(invite)
return exportedInvite, err
}
}
return "", errors.New("group id could not be found")
}
// StartGroup create a new group linked to the given server and returns the group ID, an invite or an error.
func (cp *cwtchPeer) StartGroup(server string) (string, []byte, error) {
2019-02-03 01:18:33 +00:00
return cp.StartGroupWithMessage(server, []byte{})
}
// StartGroupWithMessage create a new group linked to the given server and returns the group ID, an invite or an error.
2019-02-03 01:18:33 +00:00
func (cp *cwtchPeer) StartGroupWithMessage(server string, initialMessage []byte) (groupID string, invite []byte, err error) {
cp.mutex.Lock()
2019-02-03 01:18:33 +00:00
groupID, invite, err = cp.Profile.StartGroupWithMessage(server, initialMessage)
cp.mutex.Unlock()
2019-02-03 01:18:33 +00:00
if err == nil {
group := cp.GetGroup(groupID)
jsobj, err := json.Marshal(group)
if err == nil {
2019-02-03 01:18:33 +00:00
cp.eventBus.Publish(event.NewEvent(event.GroupCreated, map[event.Field]string{
event.Data: string(jsobj),
}))
}
} else {
log.Errorf("error creating group: %v", err)
}
return
}
// GetGroups returns an unordered list of all group IDs known to the profile.
func (cp *cwtchPeer) GetGroups() []string {
	cp.mutex.Lock()
	groupIDs := cp.Profile.GetGroups()
	cp.mutex.Unlock()
	return groupIDs
}
// GetGroup returns a pointer to the group with the given ID, or nil if no such group exists.
func (cp *cwtchPeer) GetGroup(groupID string) *model.Group {
	cp.mutex.Lock()
	group := cp.Profile.GetGroup(groupID)
	cp.mutex.Unlock()
	return group
}
func (cp *cwtchPeer) AddContact(nick, onion string, authorization model.Authorization) {
decodedPub, _ := base32.StdEncoding.DecodeString(strings.ToUpper(onion))
pp := &model.PublicProfile{Name: nick, Ed25519PublicKey: decodedPub, Authorization: authorization, Onion: onion, Attributes: map[string]string{"nick": nick}}
2019-02-03 01:18:33 +00:00
cp.Profile.AddContact(onion, pp)
2019-02-04 19:39:58 +00:00
pd, _ := json.Marshal(pp)
2019-02-03 01:18:33 +00:00
cp.eventBus.Publish(event.NewEvent(event.PeerCreated, map[event.Field]string{
2019-02-04 19:39:58 +00:00
event.Data: string(pd),
2019-02-03 01:18:33 +00:00
event.RemotePeer: onion,
}))
cp.eventBus.Publish(event.NewEventList(event.SetPeerAuthorization, event.RemotePeer, onion, event.Authorization, string(authorization)))
2020-07-08 18:29:33 +00:00
// Default to Deleting Peer History
cp.eventBus.Publish(event.NewEventList(event.SetPeerAttribute, event.RemotePeer, onion, event.SaveHistoryKey, event.DeleteHistoryDefault))
2019-02-03 01:18:33 +00:00
}
2020-07-23 21:40:44 +00:00
// AddServer takes in a serialized server specification (a bundle of related keys) and adds a contact for the
// server assuming there are no errors and the contact doesn't already exist.
// TODO in the future this function should also integrate with a trust provider to validate the key bundle.
func (cp *cwtchPeer) AddServer(serverSpecification string) error {
2020-07-14 00:46:05 +00:00
keyBundle := new(model.KeyBundle)
err := json.Unmarshal([]byte(serverSpecification), &keyBundle)
2020-07-14 00:46:05 +00:00
log.Debugf("Got new key bundle %v", keyBundle)
if keyBundle.HasKeyType(model.KeyTypeServerOnion) {
onionKey, _ := keyBundle.GetKey(model.KeyTypeServerOnion)
2020-07-14 00:46:05 +00:00
onion := string(onionKey)
if cp.GetContact(onion) == nil {
decodedPub, _ := base32.StdEncoding.DecodeString(strings.ToUpper(onion))
ab := keyBundle.AttributeBundle()
pp := &model.PublicProfile{Name: onion, Ed25519PublicKey: decodedPub, Authorization: model.AuthUnknown, Onion: onion, Attributes: ab}
cp.Profile.AddContact(onion, pp)
pd, _ := json.Marshal(pp)
cp.eventBus.Publish(event.NewEvent(event.PeerCreated, map[event.Field]string{
event.Data: string(pd),
event.RemotePeer: onion,
}))
// Publish every key as an attribute
for k, v := range ab {
log.Debugf("Server (%v) has %v key %v", onion, k, v)
cp.eventBus.Publish(event.NewEventList(event.SetPeerAttribute, event.RemotePeer, onion, k, v))
}
// Default to Deleting Peer History
cp.eventBus.Publish(event.NewEventList(event.SetPeerAttribute, event.RemotePeer, onion, event.SaveHistoryKey, event.DeleteHistoryDefault))
return nil
}
// Server Already Exists
return errors.New("a contact already exists with the given onion address")
2020-07-14 00:46:05 +00:00
}
return err
2020-07-14 00:46:05 +00:00
}
// GetContacts returns an unordered list of the onion addresses of all contacts.
func (cp *cwtchPeer) GetContacts() []string {
	cp.mutex.Lock()
	onions := cp.Profile.GetContacts()
	cp.mutex.Unlock()
	return onions
}
// GetContact returns the contact stored under the given onion address. The existence
// flag from the profile lookup is discarded; callers in this file treat a nil result
// as "no such contact".
func (cp *cwtchPeer) GetContact(onion string) *model.PublicProfile {
	cp.mutex.Lock()
	profile, _ := cp.Profile.GetContact(onion)
	cp.mutex.Unlock()
	return profile
}
// GetName returns the name stored on the peer's profile.
func (cp *cwtchPeer) GetName() string {
	cp.mutex.Lock()
	name := cp.Profile.Name
	cp.mutex.Unlock()
	return name
}
// SetName replaces the name stored on the peer's profile. No event is published.
func (cp *cwtchPeer) SetName(newName string) {
	cp.mutex.Lock()
	cp.Profile.Name = newName
	cp.mutex.Unlock()
}
// GetOnion returns the onion address stored on the peer's profile.
func (cp *cwtchPeer) GetOnion() string {
	cp.mutex.Lock()
	onion := cp.Profile.Onion
	cp.mutex.Unlock()
	return onion
}
// GetPeerState returns the connection state recorded for the contact with the given
// onion address. The boolean is false (and the state DISCONNECTED) when the contact
// is not known to the profile.
func (cp *cwtchPeer) GetPeerState(onion string) (connections.ConnectionState, bool) {
	cp.mutex.Lock()
	defer cp.mutex.Unlock()

	contact, known := cp.Profile.Contacts[onion]
	if !known {
		return connections.DISCONNECTED, false
	}
	return connections.ConnectionStateToType[contact.State], true
}
// GetGroupState returns the connection state recorded for the group with the given ID.
// The boolean is false (and the state DISCONNECTED) when the group is not known to the
// profile.
func (cp *cwtchPeer) GetGroupState(groupid string) (connections.ConnectionState, bool) {
	cp.mutex.Lock()
	defer cp.mutex.Unlock()

	grp, known := cp.Profile.Groups[groupid]
	if !known {
		return connections.DISCONNECTED, false
	}
	return connections.ConnectionStateToType[grp.State], true
}
// PeerWithOnion is the entry point for cwtchPeer relationships
2019-07-17 19:10:52 +00:00
func (cp *cwtchPeer) PeerWithOnion(onion string) {
cp.mutex.Lock()
defer cp.mutex.Unlock()
2019-11-05 22:15:56 +00:00
if _, exists := cp.Profile.GetContact(onion); !exists {
cp.AddContact(onion, onion, model.AuthApproved)
2019-11-05 22:15:56 +00:00
}
2019-01-21 20:08:03 +00:00
cp.eventBus.Publish(event.NewEvent(event.PeerRequest, map[event.Field]string{event.RemotePeer: onion}))
2018-03-09 20:44:13 +00:00
}
// DeleteContact removes the contact from the profile and publishes a DeleteContact
// event so that other subsystems can drop it as well.
func (cp *cwtchPeer) DeleteContact(onion string) {
	cp.mutex.Lock()
	defer cp.mutex.Unlock()

	cp.Profile.DeleteContact(onion)
	removed := event.NewEventList(event.DeleteContact, event.RemotePeer, onion)
	cp.eventBus.Publish(removed)
}
// DeleteGroup removes the group from the profile and publishes a DeleteGroup event so
// that other subsystems can drop it as well.
func (cp *cwtchPeer) DeleteGroup(groupID string) {
	cp.mutex.Lock()
	defer cp.mutex.Unlock()

	cp.Profile.DeleteGroup(groupID)
	removed := event.NewEventList(event.DeleteGroup, event.GroupID, groupID)
	cp.eventBus.Publish(removed)
}
2018-03-09 20:44:13 +00:00
// InviteOnionToGroup kicks off the invite process
func (cp *cwtchPeer) InviteOnionToGroup(onion string, groupid string) error {
cp.mutex.Lock()
group := cp.Profile.GetGroup(groupid)
defer cp.mutex.Unlock()
2019-01-04 21:44:21 +00:00
if group == nil {
return errors.New("invalid group id")
}
2018-03-09 20:44:13 +00:00
2019-01-04 21:44:21 +00:00
invite, err := group.Invite(group.InitialMessage)
if err == nil {
2019-01-21 20:08:03 +00:00
cp.eventBus.Publish(event.NewEvent(event.InvitePeerToGroup, map[event.Field]string{event.RemotePeer: onion, event.GroupInvite: string(invite)}))
2019-01-04 21:44:21 +00:00
}
return err
2018-03-30 21:16:51 +00:00
}
// JoinServer manages a new server connection with the given onion address
func (cp *cwtchPeer) JoinServer(onion string) {
2020-07-14 00:46:05 +00:00
if cp.GetContact(onion) != nil {
tokenY, yExists := cp.GetContact(onion).GetAttribute(string(model.KeyTypePrivacyPass))
tokenOnion, onionExists := cp.GetContact(onion).GetAttribute(string(model.KeyTypeTokenOnion))
2020-07-14 00:46:05 +00:00
if yExists && onionExists {
cp.eventBus.Publish(event.NewEvent(event.JoinServer, map[event.Field]string{event.GroupServer: onion, event.ServerTokenY: tokenY, event.ServerTokenOnion: tokenOnion}))
}
}
// TODO HANDLE ERROR
2018-03-09 20:44:13 +00:00
}
2019-01-28 20:09:25 +00:00
// SendMessageToGroup attempts to send the given message to the given group id. It is a
// thin wrapper around SendMessageToGroupTracked that drops the returned signature.
// TODO: Deprecate in favour of SendMessageToGroupTracked
func (cp *cwtchPeer) SendMessageToGroup(groupid string, message string) error {
	if _, err := cp.SendMessageToGroupTracked(groupid, message); err != nil {
		return err
	}
	return nil
}
// SendMessageToGroup attempts to sent the given message to the given group id.
// It returns the signature of the message which can be used to identify it in any UX layer.
func (cp *cwtchPeer) SendMessageToGroupTracked(groupid string, message string) (string, error) {
cp.mutex.Lock()
group := cp.Profile.GetGroup(groupid)
defer cp.mutex.Unlock()
2018-05-03 04:12:45 +00:00
if group == nil {
return "", errors.New("invalid group id")
}
ct, sig, err := cp.Profile.EncryptMessageToGroup(message, groupid)
2019-01-04 21:44:21 +00:00
if err == nil {
2019-01-21 20:08:03 +00:00
cp.eventBus.Publish(event.NewEvent(event.SendMessageToGroup, map[event.Field]string{event.GroupServer: group.GroupServer, event.Ciphertext: string(ct), event.Signature: string(sig)}))
2018-03-30 21:16:51 +00:00
}
2019-01-04 21:44:21 +00:00
return string(sig), err
2018-03-09 20:44:13 +00:00
}
func (cp *cwtchPeer) SendMessageToPeer(onion string, message string) string {
2019-01-21 20:08:03 +00:00
event := event.NewEvent(event.SendMessageToPeer, map[event.Field]string{event.RemotePeer: onion, event.Data: message})
cp.eventBus.Publish(event)
cp.mutex.Lock()
cp.Profile.AddSentMessageToContactTimeline(onion, message, time.Now(), event.EventID)
cp.mutex.Unlock()
return event.EventID
2019-01-04 21:44:21 +00:00
}
// SendGetValToPeer publishes a SendGetValMessageToPeer event asking the given onion
// for the value identified by scope and path.
func (cp *cwtchPeer) SendGetValToPeer(onion string, scope string, path string) {
	request := event.NewEventList(
		event.SendGetValMessageToPeer,
		event.RemotePeer, onion,
		event.Scope, scope,
		event.Path, path,
	)
	cp.eventBus.Publish(request)
}
// BlockPeer blocks an existing peer relationship.
func (cp *cwtchPeer) SetContactAuthorization(peer string, authorization model.Authorization) error {
cp.mutex.Lock()
err := cp.Profile.SetContactAuthorization(peer, authorization)
cp.mutex.Unlock()
cp.eventBus.Publish(event.NewEvent(event.SetPeerAuthorization, map[event.Field]string{event.RemotePeer: peer, event.Authorization: string(authorization)}))
2019-08-07 18:49:44 +00:00
return err
}
// ProcessInvite hands a group invite received from remotePeer to the profile and
// returns the resulting group ID, or the profile's error.
func (cp *cwtchPeer) ProcessInvite(invite string, remotePeer string) (string, error) {
	cp.mutex.Lock()
	groupID, err := cp.Profile.ProcessInvite(invite, remotePeer)
	cp.mutex.Unlock()
	return groupID, err
}
// AcceptInvite accepts a given existing group invite
func (cp *cwtchPeer) AcceptInvite(groupID string) error {
cp.mutex.Lock()
err := cp.Profile.AcceptInvite(groupID)
cp.mutex.Unlock()
if err != nil {
return err
}
2019-02-14 01:57:42 +00:00
cp.eventBus.Publish(event.NewEvent(event.AcceptGroupInvite, map[event.Field]string{event.GroupID: groupID}))
cp.JoinServer(cp.Profile.Groups[groupID].GroupServer)
return nil
2018-05-03 04:12:45 +00:00
}
// RejectInvite rejects a given group invite by forwarding the group ID to the profile.
// No event is published.
func (cp *cwtchPeer) RejectInvite(groupID string) {
	cp.mutex.Lock()
	cp.Profile.RejectInvite(groupID)
	cp.mutex.Unlock()
}
// Listen makes the peer open a listening port to accept incoming connections (and be detactably online)
func (cp *cwtchPeer) Listen() {
log.Debugf("cwtchPeer Listen sending ProtocolEngineStartListen\n")
cp.eventBus.Publish(event.NewEvent(event.ProtocolEngineStartListen, map[event.Field]string{event.Onion: cp.Profile.Onion}))
2018-03-09 20:44:13 +00:00
}
// StartPeersConnections attempts to connect to all peers (thus initiating reconnect
// attempts in the connections manager) by calling PeerWithOnion for every contact.
func (cp *cwtchPeer) StartPeersConnections() {
	onions := cp.GetContacts()
	for i := range onions {
		cp.PeerWithOnion(onions[i])
	}
}
// StartGroupConnections attempts to connect to all group servers (thus initiating
// reconnect attempts in the connections manager).
//
// JoinServer is called once per distinct server across all accepted groups. Groups that
// are not accepted, or that disappeared since the ID list was taken, are skipped.
func (cp *cwtchPeer) StartGroupConnections() {
	joinedServers := map[string]bool{}
	for _, groupID := range cp.GetGroups() {
		group := cp.GetGroup(groupID)
		if group == nil {
			// The group was removed between GetGroups and GetGroup.
			continue
		}

		// Read the fields under the lock, then release it BEFORE calling JoinServer:
		// JoinServer -> GetContact takes cp.mutex, and sync.Mutex is not reentrant,
		// so holding it across the call deadlocks.
		cp.mutex.Lock()
		accepted, server := group.Accepted, group.GroupServer
		cp.mutex.Unlock()

		// Only send a join server packet if we haven't joined this server yet...
		// The map is keyed by server address; it used to be looked up by group ID,
		// so the check never matched.
		joined := joinedServers[server]
		if !accepted || joined {
			continue
		}

		log.Infof("Join Server %v (%v)\n", server, joined)
		cp.JoinServer(server)
		joinedServers[server] = true
	}
}
// SetAttribute stores a key/value attribute on this profile and publishes a
// SetAttribute event carrying the same key and value.
func (cp *cwtchPeer) SetAttribute(key string, val string) {
	cp.mutex.Lock()
	defer cp.mutex.Unlock()

	cp.Profile.SetAttribute(key, val)
	payload := map[event.Field]string{
		event.Key:  key,
		event.Data: val,
	}
	cp.eventBus.Publish(event.NewEvent(event.SetAttribute, payload))
}
// GetAttribute looks up an attribute on the profile. It returns the value and true
// when the key exists, and the empty string and false otherwise.
func (cp *cwtchPeer) GetAttribute(key string) (string, bool) {
	cp.mutex.Lock()
	defer cp.mutex.Unlock()

	val, exists := cp.Profile.GetAttribute(key)
	if !exists {
		return "", false
	}
	return val, true
}
// SetContactAttribute stores a key/value attribute on the indicated contact and
// publishes a SetPeerAttribute event. Nothing happens when the contact is unknown.
func (cp *cwtchPeer) SetContactAttribute(onion string, key string, val string) {
	cp.mutex.Lock()
	defer cp.mutex.Unlock()

	contact, known := cp.Profile.GetContact(onion)
	if !known {
		return
	}

	contact.SetAttribute(key, val)
	payload := map[event.Field]string{
		event.RemotePeer: onion,
		event.Key:        key,
		event.Data:       val,
	}
	cp.eventBus.Publish(event.NewEvent(event.SetPeerAttribute, payload))
}
// GetContactAttribute looks up an attribute on the indicated contact. It returns the
// value and true when both the contact and the key exist, and the empty string and
// false otherwise.
func (cp *cwtchPeer) GetContactAttribute(onion string, key string) (string, bool) {
	cp.mutex.Lock()
	defer cp.mutex.Unlock()

	contact, known := cp.Profile.GetContact(onion)
	if !known {
		return "", false
	}

	val, exists := contact.GetAttribute(key)
	if !exists {
		return "", false
	}
	return val, true
}
// SetGroupAttribute stores a key/value attribute on the indicated group and publishes
// a SetGroupAttribute event. Nothing happens when the group is unknown.
func (cp *cwtchPeer) SetGroupAttribute(gid string, key string, val string) {
	cp.mutex.Lock()
	defer cp.mutex.Unlock()

	target := cp.Profile.GetGroup(gid)
	if target == nil {
		return
	}

	target.SetAttribute(key, val)
	payload := map[event.Field]string{
		event.GroupID: gid,
		event.Key:     key,
		event.Data:    val,
	}
	cp.eventBus.Publish(event.NewEvent(event.SetGroupAttribute, payload))
}
// GetGroupAttribute looks up an attribute on the indicated group. It returns the value
// and true when both the group and the key exist, and the empty string and false
// otherwise.
func (cp *cwtchPeer) GetGroupAttribute(gid string, key string) (string, bool) {
	cp.mutex.Lock()
	defer cp.mutex.Unlock()

	target := cp.Profile.GetGroup(gid)
	if target == nil {
		return "", false
	}

	val, exists := target.GetAttribute(key)
	if !exists {
		return "", false
	}
	return val, true
}
// Shutdown kills all connections and cleans up all goroutines for the peer
func (cp *cwtchPeer) Shutdown() {
cp.mutex.Lock()
defer cp.mutex.Unlock()
cp.shutdown = true
2019-01-04 21:44:21 +00:00
cp.queue.Shutdown()
}
2019-01-04 21:44:21 +00:00
// eventHandler process events from other subsystems
func (cp *cwtchPeer) eventHandler() {
for {
ev := cp.queue.Next()
switch ev.EventType {
/***** Default auto handled events *****/
2019-01-04 21:44:21 +00:00
case event.EncryptedGroupMessage:
// If successful, a side effect is the message is added to the group's timeline
cp.mutex.Lock()
ok, groupID, message, seen := cp.Profile.AttemptDecryption([]byte(ev.Data[event.Ciphertext]), []byte(ev.Data[event.Signature]))
cp.mutex.Unlock()
if ok && !seen {
2019-02-04 01:55:13 +00:00
cp.eventBus.Publish(event.NewEvent(event.NewMessageFromGroup, map[event.Field]string{event.TimestampReceived: message.Received.Format(time.RFC3339Nano), event.TimestampSent: message.Timestamp.Format(time.RFC3339Nano), event.Data: message.Message, event.GroupID: groupID, event.Signature: string(message.Signature), event.PreviousSignature: string(message.PreviousMessageSig), event.RemotePeer: message.PeerID}))
2019-01-04 21:44:21 +00:00
}
case event.NewMessageFromPeer: //event.TimestampReceived, event.RemotePeer, event.Data
ts, _ := time.Parse(time.RFC3339Nano, ev.Data[event.TimestampReceived])
cp.mutex.Lock()
cp.Profile.AddMessageToContactTimeline(ev.Data[event.RemotePeer], ev.Data[event.Data], ts)
cp.mutex.Unlock()
case event.PeerAcknowledgement:
cp.mutex.Lock()
cp.Profile.AckSentMessageToPeer(ev.Data[event.RemotePeer], ev.Data[event.EventID])
cp.mutex.Unlock()
case event.SendMessageToGroupError:
cp.mutex.Lock()
cp.Profile.AddGroupSentMessageError(ev.Data[event.GroupServer], ev.Data[event.Signature], ev.Data[event.Error])
cp.mutex.Unlock()
case event.SendMessageToPeerError:
cp.mutex.Lock()
cp.Profile.ErrorSentMessageToPeer(ev.Data[event.RemotePeer], ev.Data[event.EventID], ev.Data[event.Error])
cp.mutex.Unlock()
case event.NewGetValMessageFromPeer:
onion := ev.Data[event.RemotePeer]
scope := ev.Data[event.Scope]
path := ev.Data[event.Path]
log.Debugf("NewGetValMessageFromPeer for %v%v from %v\n", scope, path, onion)
if scope == attr.PublicScope {
val, exists := cp.GetAttribute(attr.GetPublicScope(path))
resp := event.NewEvent(event.SendRetValMessageToPeer, map[event.Field]string{event.RemotePeer: onion, event.Exists: strconv.FormatBool(exists)})
resp.EventID = ev.EventID
if exists {
resp.Data[event.Data] = val
} else {
resp.Data[event.Data] = ""
}
log.Debugf("Responding with SendRetValMessageToPeer exists:%v data: %v\n", exists, val)
cp.eventBus.Publish(resp)
}
/***** Non default but requestable handlable events *****/
case event.NewRetValMessageFromPeer:
onion := ev.Data[event.RemotePeer]
scope := ev.Data[event.Scope]
path := ev.Data[event.Path]
val := ev.Data[event.Data]
exists, _ := strconv.ParseBool(ev.Data[event.Exists])
log.Debugf("NewRetValMessageFromPeer %v %v%v %v %v\n", onion, scope, path, exists, val)
if exists {
if scope == attr.PublicScope {
cp.SetContactAttribute(onion, attr.GetPeerScope(path), val)
}
}
2019-01-04 21:44:21 +00:00
case event.NewGroupInvite:
cp.mutex.Lock()
cp.Profile.ProcessInvite(ev.Data[event.GroupInvite], ev.Data[event.RemotePeer])
cp.mutex.Unlock()
case event.PeerStateChange:
cp.mutex.Lock()
if _, exists := cp.Profile.Contacts[ev.Data[event.RemotePeer]]; exists {
cp.Profile.Contacts[ev.Data[event.RemotePeer]].State = ev.Data[event.ConnectionState]
}
cp.mutex.Unlock()
case event.ServerStateChange:
cp.mutex.Lock()
for _, group := range cp.Profile.Groups {
if group.GroupServer == ev.Data[event.GroupServer] {
group.State = ev.Data[event.ConnectionState]
}
}
cp.mutex.Unlock()
2019-01-04 21:44:21 +00:00
default:
if ev.EventType != "" {
log.Errorf("peer event handler received an event it was not subscribed for: %v", ev.EventType)
2019-01-04 21:44:21 +00:00
}
2019-01-21 20:08:03 +00:00
return
2019-01-04 21:44:21 +00:00
}
}
2018-10-04 19:15:03 +00:00
}