package peer import ( "cwtch.im/cwtch/event" "cwtch.im/cwtch/model" "cwtch.im/cwtch/model/attr" "cwtch.im/cwtch/protocol/connections" "encoding/base32" "encoding/base64" "encoding/json" "errors" "git.openprivacy.ca/openprivacy/log" "strconv" "strings" "sync" "time" ) var autoHandleableEvents = map[event.Type]bool{event.EncryptedGroupMessage: true, event.PeerStateChange: true, event.ServerStateChange: true, event.NewGroupInvite: true, event.NewMessageFromPeer: true, event.PeerAcknowledgement: true, event.PeerError: true, event.SendMessageToGroupError: true, event.NewGetValMessageFromPeer: true, event.NewRetValMessageFromPeer: true} // DefaultEventsToHandle specifies which events will be subscribed to // when a peer has its Init() function called var DefaultEventsToHandle = []event.Type{ event.EncryptedGroupMessage, event.NewMessageFromPeer, event.PeerAcknowledgement, event.NewGroupInvite, event.PeerError, event.SendMessageToGroupError, event.NewGetValMessageFromPeer, } // cwtchPeer manages incoming and outgoing connections and all processing for a Cwtch cwtchPeer type cwtchPeer struct { Profile *model.Profile mutex sync.Mutex shutdown bool queue event.Queue eventBus event.Manager } // CwtchPeer provides us with a way of testing systems built on top of cwtch without having to // directly implement a cwtchPeer. 
type CwtchPeer interface { Init(event.Manager) AutoHandleEvents(events []event.Type) PeerWithOnion(string) InviteOnionToGroup(string, string) error SendMessageToPeer(string, string) string SendGetValToPeer(string, string, string) StoreMessage(onion string, messageTxt string, sent time.Time) SetContactAuthorization(string, model.Authorization) error ProcessInvite(string, string) (string, string, error) AcceptInvite(string) error RejectInvite(string) DeleteContact(string) DeleteGroup(string) AddServer(string) error JoinServer(string) error GetServers() []string SendMessageToGroupTracked(string, string) (string, error) GetName() string SetName(string) GetOnion() string GetPeerState(string) (connections.ConnectionState, bool) StartGroup(string) (string, []byte, error) ImportGroup(string) error ExportGroup(string) (string, error) GetGroup(string) *model.Group GetGroupState(string) (connections.ConnectionState, bool) GetGroups() []string AddContact(nick, onion string, authorization model.Authorization) GetContacts() []string GetContact(string) *model.PublicProfile SetAttribute(string, string) GetAttribute(string) (string, bool) SetContactAttribute(string, string, string) GetContactAttribute(string, string) (string, bool) SetGroupAttribute(string, string, string) GetGroupAttribute(string, string) (string, bool) Listen() StartPeersConnections() StartServerConnections() Shutdown() } // NewCwtchPeer creates and returns a new cwtchPeer with the given name. func NewCwtchPeer(name string) CwtchPeer { cp := new(cwtchPeer) cp.Profile = model.GenerateNewProfile(name) cp.shutdown = false return cp } // FromProfile generates a new peer from a profile. 
func FromProfile(profile *model.Profile) CwtchPeer { cp := new(cwtchPeer) cp.Profile = profile cp.shutdown = false return cp } // Init instantiates a cwtchPeer func (cp *cwtchPeer) Init(eventBus event.Manager) { cp.InitForEvents(eventBus, DefaultEventsToHandle) } func (cp *cwtchPeer) InitForEvents(eventBus event.Manager, toBeHandled []event.Type) { cp.queue = event.NewQueue() go cp.eventHandler() cp.eventBus = eventBus cp.AutoHandleEvents(toBeHandled) } // AutoHandleEvents sets an event (if able) to be handled by this peer func (cp *cwtchPeer) AutoHandleEvents(events []event.Type) { for _, ev := range events { if _, exists := autoHandleableEvents[ev]; exists { cp.eventBus.Subscribe(ev, cp.queue) } else { log.Errorf("Peer asked to autohandle event it cannot: %v\n", ev) } } } // ImportGroup intializes a group from an imported source rather than a peer invite func (cp *cwtchPeer) ImportGroup(exportedInvite string) (err error) { if strings.HasPrefix(exportedInvite, "torv3") { data, err := base64.StdEncoding.DecodeString(exportedInvite[5:]) if err == nil { cp.eventBus.Publish(event.NewEvent(event.NewGroupInvite, map[event.Field]string{ event.GroupInvite: string(data), event.Imported: "true", })) } else { log.Errorf("error decoding group invite: %v", err) } return nil } return errors.New("unsupported exported group type") } // ExportGroup serializes a group invite so it can be given offline func (cp *cwtchPeer) ExportGroup(groupID string) (string, error) { cp.mutex.Lock() defer cp.mutex.Unlock() group := cp.Profile.GetGroup(groupID) if group != nil { invite, err := group.Invite(group.GetInitialMessage()) if err == nil { exportedInvite := "torv3" + base64.StdEncoding.EncodeToString(invite) return exportedInvite, err } } return "", errors.New("group id could not be found") } // StartGroup create a new group linked to the given server and returns the group ID, an invite or an error. 
func (cp *cwtchPeer) StartGroup(server string) (string, []byte, error) {
	return cp.StartGroupWithMessage(server, []byte{})
}

// StartGroupWithMessage create a new group linked to the given server and returns the group ID, an invite or an error.
//
// When the profile creates the group successfully, the group is serialized to JSON and
// announced with a GroupCreated event. A failure to serialize is logged and suppresses the
// event, but does not turn the call into an error because the group already exists.
func (cp *cwtchPeer) StartGroupWithMessage(server string, initialMessage []byte) (groupID string, invite []byte, err error) {
	cp.mutex.Lock()
	groupID, invite, err = cp.Profile.StartGroupWithMessage(server, initialMessage)
	cp.mutex.Unlock()

	if err != nil {
		log.Errorf("error creating group: %v", err)
		return groupID, invite, err
	}

	// Use a distinct name for the marshalling error so the named result is not shadowed.
	serialized, marshalErr := json.Marshal(cp.GetGroup(groupID))
	if marshalErr != nil {
		log.Errorf("error serializing new group %v: %v", groupID, marshalErr)
		return groupID, invite, nil
	}

	cp.eventBus.Publish(event.NewEvent(event.GroupCreated, map[event.Field]string{
		event.Data: string(serialized),
	}))
	return groupID, invite, nil
}

// GetGroups returns an unordered list of all group IDs.
func (cp *cwtchPeer) GetGroups() []string {
	cp.mutex.Lock()
	defer cp.mutex.Unlock()
	return cp.Profile.GetGroups()
}

// GetGroup returns a pointer to a specific group, nil if no group exists.
func (cp *cwtchPeer) GetGroup(groupID string) *model.Group { cp.mutex.Lock() defer cp.mutex.Unlock() return cp.Profile.GetGroup(groupID) } func (cp *cwtchPeer) AddContact(nick, onion string, authorization model.Authorization) { decodedPub, _ := base32.StdEncoding.DecodeString(strings.ToUpper(onion)) pp := &model.PublicProfile{Name: nick, Ed25519PublicKey: decodedPub, Authorization: authorization, Onion: onion, Attributes: map[string]string{"nick": nick}} cp.Profile.AddContact(onion, pp) pd, _ := json.Marshal(pp) cp.eventBus.Publish(event.NewEvent(event.PeerCreated, map[event.Field]string{ event.Data: string(pd), event.RemotePeer: onion, })) cp.eventBus.Publish(event.NewEventList(event.SetPeerAuthorization, event.RemotePeer, onion, event.Authorization, string(authorization))) // Default to Deleting Peer History cp.eventBus.Publish(event.NewEventList(event.SetPeerAttribute, event.RemotePeer, onion, event.SaveHistoryKey, event.DeleteHistoryDefault)) } // AddServer takes in a serialized server specification (a bundle of related keys) and adds a contact for the // server assuming there are no errors and the contact doesn't already exist. // TODO in the future this function should also integrate with a trust provider to validate the key bundle. 
func (cp *cwtchPeer) AddServer(serverSpecification string) error { // This confirms that the server did at least sign the bundle keyBundle, err := model.DeserializeAndVerify([]byte(serverSpecification)) if err != nil { return err } log.Debugf("Got new key bundle %v", keyBundle) if keyBundle.HasKeyType(model.KeyTypeServerOnion) { onionKey, _ := keyBundle.GetKey(model.KeyTypeServerOnion) onion := string(onionKey) if cp.GetContact(onion) == nil { decodedPub, _ := base32.StdEncoding.DecodeString(strings.ToUpper(onion)) ab := keyBundle.AttributeBundle() pp := &model.PublicProfile{Name: onion, Ed25519PublicKey: decodedPub, Authorization: model.AuthUnknown, Onion: onion, Attributes: ab} cp.Profile.AddContact(onion, pp) pd, _ := json.Marshal(pp) cp.eventBus.Publish(event.NewEvent(event.PeerCreated, map[event.Field]string{ event.Data: string(pd), event.RemotePeer: onion, })) // Publish every key as an attribute for k, v := range ab { log.Debugf("Server (%v) has %v key %v", onion, k, v) cp.eventBus.Publish(event.NewEventList(event.SetPeerAttribute, event.RemotePeer, onion, k, v)) } // Default to Deleting Peer History cp.eventBus.Publish(event.NewEventList(event.SetPeerAttribute, event.RemotePeer, onion, event.SaveHistoryKey, event.DeleteHistoryDefault)) return nil } // We have already seen this server and so some additional checks are needed (and we don't need to create the // peer). server := cp.GetContact(onion) ab := keyBundle.AttributeBundle() // Check server bundle for consistency for k, v := range ab { val, exists := server.GetAttribute(k) if exists { if val != v { // this is inconsistent! return model.InconsistentKeyBundleError } } // we haven't seen this key associated with the server before } // If we have gotten to this point we can assume this is a safe key bundle signed by the // server with no conflicting keys. 
So we are going to publish all the keys for k, v := range ab { log.Debugf("Server (%v) has %v key %v", onion, k, v) cp.eventBus.Publish(event.NewEventList(event.SetPeerAttribute, event.RemotePeer, onion, k, v)) } return nil } return err } // GetContacts returns an unordered list of onions func (cp *cwtchPeer) GetContacts() []string { cp.mutex.Lock() defer cp.mutex.Unlock() return cp.Profile.GetContacts() } // GetServers returns an unordered list of servers func (cp *cwtchPeer) GetServers() []string { contacts := cp.Profile.GetContacts() var servers []string for _, contact := range contacts { if cp.GetContact(contact).IsServer() { servers = append(servers, contact) } } return servers } // GetContact returns a given contact, nil is no such contact exists func (cp *cwtchPeer) GetContact(onion string) *model.PublicProfile { cp.mutex.Lock() defer cp.mutex.Unlock() contact, _ := cp.Profile.GetContact(onion) return contact } func (cp *cwtchPeer) GetName() string { cp.mutex.Lock() defer cp.mutex.Unlock() return cp.Profile.Name } func (cp *cwtchPeer) SetName(newName string) { cp.mutex.Lock() defer cp.mutex.Unlock() cp.Profile.Name = newName } func (cp *cwtchPeer) GetOnion() string { cp.mutex.Lock() defer cp.mutex.Unlock() return cp.Profile.Onion } func (cp *cwtchPeer) GetPeerState(onion string) (connections.ConnectionState, bool) { cp.mutex.Lock() defer cp.mutex.Unlock() if peer, ok := cp.Profile.Contacts[onion]; ok { return connections.ConnectionStateToType()[peer.State], true } return connections.DISCONNECTED, false } func (cp *cwtchPeer) GetGroupState(groupid string) (connections.ConnectionState, bool) { cp.mutex.Lock() defer cp.mutex.Unlock() if group, ok := cp.Profile.Groups[groupid]; ok { return connections.ConnectionStateToType()[group.State], true } return connections.DISCONNECTED, false } // PeerWithOnion is the entry point for cwtchPeer relationships func (cp *cwtchPeer) PeerWithOnion(onion string) { cp.mutex.Lock() defer cp.mutex.Unlock() if _, exists := 
cp.Profile.GetContact(onion); !exists { cp.AddContact(onion, onion, model.AuthApproved) } cp.eventBus.Publish(event.NewEvent(event.PeerRequest, map[event.Field]string{event.RemotePeer: onion})) } // DeleteContact deletes a peer from the profile, storage, and handling func (cp *cwtchPeer) DeleteContact(onion string) { cp.mutex.Lock() cp.Profile.DeleteContact(onion) defer cp.mutex.Unlock() cp.eventBus.Publish(event.NewEventList(event.DeleteContact, event.RemotePeer, onion)) } // DeleteGroup deletes a Group from the profile, storage, and handling func (cp *cwtchPeer) DeleteGroup(groupID string) { cp.mutex.Lock() cp.Profile.DeleteGroup(groupID) defer cp.mutex.Unlock() cp.eventBus.Publish(event.NewEventList(event.DeleteGroup, event.GroupID, groupID)) } // InviteOnionToGroup kicks off the invite process func (cp *cwtchPeer) InviteOnionToGroup(onion string, groupid string) error { cp.mutex.Lock() group := cp.Profile.GetGroup(groupid) defer cp.mutex.Unlock() if group == nil { return errors.New("invalid group id") } invite, err := group.Invite(group.InitialMessage) if err == nil { cp.eventBus.Publish(event.NewEvent(event.InvitePeerToGroup, map[event.Field]string{event.RemotePeer: onion, event.GroupInvite: string(invite)})) } return err } // JoinServer manages a new server connection with the given onion address func (cp *cwtchPeer) JoinServer(onion string) error { if cp.GetContact(onion) != nil { tokenY, yExists := cp.GetContact(onion).GetAttribute(string(model.KeyTypePrivacyPass)) tokenOnion, onionExists := cp.GetContact(onion).GetAttribute(string(model.KeyTypeTokenOnion)) if yExists && onionExists { cp.eventBus.Publish(event.NewEvent(event.JoinServer, map[event.Field]string{event.GroupServer: onion, event.ServerTokenY: tokenY, event.ServerTokenOnion: tokenOnion})) return nil } } return errors.New("no keys found for server connection") } // SendMessageToGroupTracked attempts to sent the given message to the given group id. 
// It returns the signature of the message which can be used to identify it in any UX layer. func (cp *cwtchPeer) SendMessageToGroupTracked(groupid string, message string) (string, error) { cp.mutex.Lock() group := cp.Profile.GetGroup(groupid) defer cp.mutex.Unlock() if group == nil { return "", errors.New("invalid group id") } ct, sig, err := cp.Profile.EncryptMessageToGroup(message, groupid) if err == nil { cp.eventBus.Publish(event.NewEvent(event.SendMessageToGroup, map[event.Field]string{event.GroupServer: group.GroupServer, event.Ciphertext: string(ct), event.Signature: string(sig)})) } return string(sig), err } func (cp *cwtchPeer) SendMessageToPeer(onion string, message string) string { event := event.NewEvent(event.SendMessageToPeer, map[event.Field]string{event.RemotePeer: onion, event.Data: message}) cp.mutex.Lock() contact, _ := cp.Profile.GetContact(onion) event.EventID = strconv.Itoa(contact.Timeline.Len()) cp.Profile.AddSentMessageToContactTimeline(onion, message, time.Now(), event.EventID) cp.mutex.Unlock() cp.eventBus.Publish(event) return event.EventID } func (cp *cwtchPeer) SendGetValToPeer(onion string, scope string, path string) { event := event.NewEventList(event.SendGetValMessageToPeer, event.RemotePeer, onion, event.Scope, scope, event.Path, path) cp.eventBus.Publish(event) } // BlockPeer blocks an existing peer relationship. func (cp *cwtchPeer) SetContactAuthorization(peer string, authorization model.Authorization) error { cp.mutex.Lock() err := cp.Profile.SetContactAuthorization(peer, authorization) cp.mutex.Unlock() cp.eventBus.Publish(event.NewEvent(event.SetPeerAuthorization, map[event.Field]string{event.RemotePeer: peer, event.Authorization: string(authorization)})) return err } // ProcessInvite adds a new group invite to the profile. 
returns the new group ID func (cp *cwtchPeer) ProcessInvite(invite string, remotePeer string) (string, string, error) { cp.mutex.Lock() defer cp.mutex.Unlock() return cp.Profile.ProcessInvite(invite, remotePeer) } // AcceptInvite accepts a given existing group invite func (cp *cwtchPeer) AcceptInvite(groupID string) error { cp.mutex.Lock() err := cp.Profile.AcceptInvite(groupID) cp.mutex.Unlock() if err != nil { return err } cp.eventBus.Publish(event.NewEvent(event.AcceptGroupInvite, map[event.Field]string{event.GroupID: groupID})) cp.JoinServer(cp.Profile.Groups[groupID].GroupServer) return nil } // RejectInvite rejects a given group invite. func (cp *cwtchPeer) RejectInvite(groupID string) { cp.mutex.Lock() defer cp.mutex.Unlock() cp.Profile.RejectInvite(groupID) } // Listen makes the peer open a listening port to accept incoming connections (and be detactably online) func (cp *cwtchPeer) Listen() { log.Debugf("cwtchPeer Listen sending ProtocolEngineStartListen\n") cp.eventBus.Publish(event.NewEvent(event.ProtocolEngineStartListen, map[event.Field]string{event.Onion: cp.Profile.Onion})) } // StartPeersConnections attempts to connect to peer connections func (cp *cwtchPeer) StartPeersConnections() { for _, contact := range cp.GetContacts() { if cp.GetContact(contact).IsServer() == false { cp.PeerWithOnion(contact) } } } // StartServerConnections attempts to connect to all server connections func (cp *cwtchPeer) StartServerConnections() { for _, contact := range cp.GetContacts() { if cp.GetContact(contact).IsServer() { cp.JoinServer(contact) } } } // SetAttribute sets an attribute for this profile and emits an event func (cp *cwtchPeer) SetAttribute(key string, val string) { cp.mutex.Lock() cp.Profile.SetAttribute(key, val) defer cp.mutex.Unlock() cp.eventBus.Publish(event.NewEvent(event.SetAttribute, map[event.Field]string{ event.Key: key, event.Data: val, })) } // GetAttribute gets an attribute for the profile func (cp *cwtchPeer) GetAttribute(key string) 
(string, bool) { cp.mutex.Lock() defer cp.mutex.Unlock() if val, exists := cp.Profile.GetAttribute(key); exists { return val, true } return "", false } // SetContactAttribute sets an attribute for the indicated contact and emits an event func (cp *cwtchPeer) SetContactAttribute(onion string, key string, val string) { cp.mutex.Lock() defer cp.mutex.Unlock() if contact, ok := cp.Profile.GetContact(onion); ok { contact.SetAttribute(key, val) cp.eventBus.Publish(event.NewEvent(event.SetPeerAttribute, map[event.Field]string{ event.RemotePeer: onion, event.Key: key, event.Data: val, })) } } // GetContactAttribute gets an attribute for the indicated contact func (cp *cwtchPeer) GetContactAttribute(onion string, key string) (string, bool) { cp.mutex.Lock() defer cp.mutex.Unlock() if contact, ok := cp.Profile.GetContact(onion); ok { if val, exists := contact.GetAttribute(key); exists { return val, true } } return "", false } // SetGroupAttribute sets an attribute for the indicated group and emits an event func (cp *cwtchPeer) SetGroupAttribute(gid string, key string, val string) { cp.mutex.Lock() defer cp.mutex.Unlock() if group := cp.Profile.GetGroup(gid); group != nil { group.SetAttribute(key, val) cp.eventBus.Publish(event.NewEvent(event.SetGroupAttribute, map[event.Field]string{ event.GroupID: gid, event.Key: key, event.Data: val, })) } } // GetGroupAttribute gets an attribute for the indicated group func (cp *cwtchPeer) GetGroupAttribute(gid string, key string) (string, bool) { cp.mutex.Lock() defer cp.mutex.Unlock() if group := cp.Profile.GetGroup(gid); group != nil { if val, exists := group.GetAttribute(key); exists { return val, true } } return "", false } // Shutdown kills all connections and cleans up all goroutines for the peer func (cp *cwtchPeer) Shutdown() { cp.mutex.Lock() defer cp.mutex.Unlock() cp.shutdown = true cp.queue.Shutdown() } func (cp *cwtchPeer) StoreMessage(onion string, messageTxt string, sent time.Time) { if cp.GetContact(onion) == nil { 
cp.AddContact(onion, onion, model.AuthUnknown) } cp.mutex.Lock() cp.Profile.AddMessageToContactTimeline(onion, messageTxt, sent) cp.mutex.Unlock() } // eventHandler process events from other subsystems func (cp *cwtchPeer) eventHandler() { for { ev := cp.queue.Next() switch ev.EventType { /***** Default auto handled events *****/ case event.EncryptedGroupMessage: // If successful, a side effect is the message is added to the group's timeline cp.mutex.Lock() ok, groupID, message, seen := cp.Profile.AttemptDecryption([]byte(ev.Data[event.Ciphertext]), []byte(ev.Data[event.Signature])) cp.mutex.Unlock() if ok && !seen { cp.eventBus.Publish(event.NewEvent(event.NewMessageFromGroup, map[event.Field]string{event.TimestampReceived: message.Received.Format(time.RFC3339Nano), event.TimestampSent: message.Timestamp.Format(time.RFC3339Nano), event.Data: message.Message, event.GroupID: groupID, event.Signature: string(message.Signature), event.PreviousSignature: string(message.PreviousMessageSig), event.RemotePeer: message.PeerID})) } case event.NewMessageFromPeer: //event.TimestampReceived, event.RemotePeer, event.Data ts, _ := time.Parse(time.RFC3339Nano, ev.Data[event.TimestampReceived]) cp.StoreMessage(ev.Data[event.RemotePeer], ev.Data[event.Data], ts) case event.PeerAcknowledgement: cp.mutex.Lock() cp.Profile.AckSentMessageToPeer(ev.Data[event.RemotePeer], ev.Data[event.EventID]) cp.mutex.Unlock() case event.SendMessageToGroupError: cp.mutex.Lock() cp.Profile.AddGroupSentMessageError(ev.Data[event.GroupServer], ev.Data[event.Signature], ev.Data[event.Error]) cp.mutex.Unlock() case event.SendMessageToPeerError: cp.mutex.Lock() cp.Profile.ErrorSentMessageToPeer(ev.Data[event.RemotePeer], ev.Data[event.EventID], ev.Data[event.Error]) cp.mutex.Unlock() case event.NewGetValMessageFromPeer: onion := ev.Data[event.RemotePeer] scope := ev.Data[event.Scope] path := ev.Data[event.Path] log.Debugf("NewGetValMessageFromPeer for %v%v from %v\n", scope, path, onion) remotePeer := 
cp.GetContact(onion) if remotePeer != nil && remotePeer.Authorization == model.AuthApproved { if scope == attr.PublicScope { val, exists := cp.GetAttribute(attr.GetPublicScope(path)) resp := event.NewEvent(event.SendRetValMessageToPeer, map[event.Field]string{event.RemotePeer: onion, event.Exists: strconv.FormatBool(exists)}) resp.EventID = ev.EventID if exists { resp.Data[event.Data] = val } else { resp.Data[event.Data] = "" } log.Debugf("Responding with SendRetValMessageToPeer exists:%v data: %v\n", exists, val) cp.eventBus.Publish(resp) } } /***** Non default but requestable handlable events *****/ case event.NewRetValMessageFromPeer: onion := ev.Data[event.RemotePeer] scope := ev.Data[event.Scope] path := ev.Data[event.Path] val := ev.Data[event.Data] exists, _ := strconv.ParseBool(ev.Data[event.Exists]) log.Debugf("NewRetValMessageFromPeer %v %v%v %v %v\n", onion, scope, path, exists, val) if exists { if scope == attr.PublicScope { cp.SetContactAttribute(onion, attr.GetPeerScope(path), val) } } case event.NewGroupInvite: cp.mutex.Lock() group, groupName, err := cp.Profile.ProcessInvite(ev.Data[event.GroupInvite], ev.Data[event.RemotePeer]) if err == nil { if ev.Data[event.Imported] == "true" { cp.Profile.GetGroup(group).Accepted = true cp.mutex.Unlock() // TODO...seriously need a better way of handling these cases cp.SetGroupAttribute(group, attr.GetLocalScope("name"), groupName) err = cp.JoinServer(cp.Profile.GetGroup(group).GroupServer) cp.mutex.Lock() if err != nil { log.Errorf("Joining Server should have worked %v", err) } } cp.eventBus.Publish(event.NewEvent(event.NewGroup, map[event.Field]string{event.GroupID: group})) } cp.mutex.Unlock() case event.PeerStateChange: cp.mutex.Lock() if _, exists := cp.Profile.Contacts[ev.Data[event.RemotePeer]]; exists { cp.Profile.Contacts[ev.Data[event.RemotePeer]].State = ev.Data[event.ConnectionState] } cp.mutex.Unlock() case event.ServerStateChange: cp.mutex.Lock() for _, group := range cp.Profile.Groups { if 
group.GroupServer == ev.Data[event.GroupServer] { group.State = ev.Data[event.ConnectionState] } } cp.mutex.Unlock() default: if ev.EventType != "" { log.Errorf("peer event handler received an event it was not subscribed for: %v", ev.EventType) } return } } }