From 25e86426b22768ed708f5cc950c40af052a4cbe9 Mon Sep 17 00:00:00 2001 From: Sarah Jamie Lewis Date: Fri, 26 Jan 2024 13:33:30 -0800 Subject: [PATCH] WIP: Hybrid Groups Initial Sketch --- .gitignore | 3 +- app/app.go | 6 +- event/eventmanager.go | 5 + event/infinitequeue.go | 1 + functionality/hybrid/common.go | 80 +++++ functionality/hybrid/managed.go | 312 ++++++++++++++++++ functionality/hybrid/manager.go | 144 ++++++++ model/attr/zone.go | 5 + model/constants/experiments.go | 3 + model/conversation.go | 25 ++ model/overlay.go | 10 + peer/cwtch_peer.go | 133 ++++++-- peer/profile_interface.go | 9 + testing/cwtch_peer_server_integration_test.go | 34 +- testing/hybrid_group_integration_test.go | 217 ++++++++++++ testing/utils.go | 32 ++ 16 files changed, 971 insertions(+), 48 deletions(-) create mode 100644 functionality/hybrid/common.go create mode 100644 functionality/hybrid/managed.go create mode 100644 functionality/hybrid/manager.go create mode 100644 testing/hybrid_group_integration_test.go create mode 100644 testing/utils.go diff --git a/.gitignore b/.gitignore index 7e47152..541519f 100644 --- a/.gitignore +++ b/.gitignore @@ -33,4 +33,5 @@ data-dir-cwtchtool/ tokens tordir/ testing/autodownload/download_dir -testing/autodownload/storage \ No newline at end of file +testing/autodownload/storage +testing/managerstorage diff --git a/app/app.go b/app/app.go index c3b0ff7..3033018 100644 --- a/app/app.go +++ b/app/app.go @@ -5,6 +5,7 @@ import ( "cwtch.im/cwtch/event" "cwtch.im/cwtch/extensions" "cwtch.im/cwtch/functionality/filesharing" + "cwtch.im/cwtch/functionality/hybrid" "cwtch.im/cwtch/functionality/servers" "cwtch.im/cwtch/model" "cwtch.im/cwtch/model/attr" @@ -366,6 +367,8 @@ func (app *application) registerHooks(profile peer.CwtchPeer) { profile.RegisterHook(new(filesharing.Functionality)) profile.RegisterHook(new(filesharing.ImagePreviewsFunctionality)) profile.RegisterHook(new(servers.Functionality)) + profile.RegisterHook(new(hybrid.ManagedGroupFunctionality)) + profile.RegisterHook(new(hybrid.GroupManagerFunctionality)) // will only be activated if GroupManagerExperiment is enabled... // Ensure that Profiles have the Most Up to Date Settings... profile.NotifySettingsUpdate(app.settings.ReadGlobalSettings()) } @@ -404,7 +407,7 @@ func (app *application) ActivatePeerEngine(onion string) { if err == nil { log.Debugf("restartFlow: Creating a New Protocol Engine...") app.engines[profile.GetOnion()] = engine - eventBus.Publish(event.NewEventList(event.ProtocolEngineCreated)) + eventBus.Publish(event.NewEventList(event.ProtocolEngineCreated, event.Handle, profile.GetOnion())) app.QueryACNStatus() } else { log.Errorf("corrupted profile detected for %v", onion) @@ -515,6 +518,7 @@ func (app *application) eventHandler() { log.Infof("peer appearing offline, not launching listen threads or connecting jobs") app.ConfigureConnections(onion, false, false, false) } else { + log.Infof("configuring online connections for peer") app.ConfigureConnections(onion, true, true, true) } } diff --git a/event/eventmanager.go b/event/eventmanager.go index 04f506d..8394667 100644 --- a/event/eventmanager.go +++ b/event/eventmanager.go @@ -95,6 +95,11 @@ func (em *manager) initialize() { func (em *manager) Subscribe(eventType Type, queue Queue) { em.mapMutex.Lock() defer em.mapMutex.Unlock() + for _, sub := range em.subscribers[eventType] { + if sub == queue { + return // don't add the same queue for the same event twice... + } + } em.subscribers[eventType] = append(em.subscribers[eventType], queue) } diff --git a/event/infinitequeue.go b/event/infinitequeue.go index e25a5c6..74de05c 100644 --- a/event/infinitequeue.go +++ b/event/infinitequeue.go @@ -54,6 +54,7 @@ func (q *infiniteQueue) resize() { // Add puts an element on the end of the queue. func (q *infiniteQueue) Add(elem Event) { + if q.count == len(q.buf) { q.resize() } diff --git a/functionality/hybrid/common.go b/functionality/hybrid/common.go new file mode 100644 index 0000000..c000bc6 --- /dev/null +++ b/functionality/hybrid/common.go @@ -0,0 +1,80 @@ +package hybrid + +import ( + "crypto/ed25519" + "encoding/base32" + "encoding/json" + "strings" +) + +type GroupEventType int + +const ( + MemberGroupIDKey = "member_group_id_key" + MemberMessageIDKey = "member_group_messge_id" +) + +const ( + AddMember = GroupEventType(0x1000) + RemoveMember = GroupEventType(0x2000) + RotateKey = GroupEventType(0x3000) + NewMessage = GroupEventType(0x4000) + NewClearMessage = GroupEventType(0x5000) + SyncRequest = GroupEventType(0x6000) +) + +type ManageGroupEvent struct { + EventType GroupEventType `json:"t"` + Data string `json:"d"` // json encoded data +} + +type AddMemberEvent struct { + Handle string `json:"h"` +} + +type RemoveMemberEvent struct { + Handle string `json:"h"` +} + +type RotateKeyEvent struct { + Key []byte `json:"k"` +} + +type NewMessageEvent struct { + EncryptedHybridGroupMessage []byte `json:"m"` +} + +type NewClearMessageEvent struct { + HybridGroupMessage HybridGroupMessage `json:"m"` +} + +type SyncRequestMessage struct { + // a map of MemberGroupID: MemberMessageID + LastSeen map[int]int `json:"l"` +} + +// This file contains code for the Hybrid Group / Managed Group types.. +type HybridGroupMessage struct { + Author string // the authors cwtch address + MemberGroupID uint32 + MemberMessageID uint32 + MessageBody string + Sent uint64 // milliseconds since epoch + Signature []byte // of json-encoded content (including empty sig) +} + +// AuthenticateMessage returns true if the Author of the message produced the Signature over the message +func AuthenticateMessage(message HybridGroupMessage) bool { + messageCopy := message + messageCopy.Signature = []byte{} + // Otherwise we derive the public key from the sender and check it against that. + decodedPub, err := base32.StdEncoding.DecodeString(strings.ToUpper(message.Author)) + if err == nil { + data, err := json.Marshal(messageCopy) + if err == nil && len(decodedPub) >= 32 { + return ed25519.Verify(decodedPub[:32], data, message.Signature) + } + } + + return false +} diff --git a/functionality/hybrid/managed.go b/functionality/hybrid/managed.go new file mode 100644 index 0000000..77aecf4 --- /dev/null +++ b/functionality/hybrid/managed.go @@ -0,0 +1,312 @@ +package hybrid + +import ( + "crypto/rand" + "cwtch.im/cwtch/event" + "cwtch.im/cwtch/model" + "cwtch.im/cwtch/model/attr" + "cwtch.im/cwtch/model/constants" + "cwtch.im/cwtch/peer" + "cwtch.im/cwtch/settings" + "encoding/base64" + "encoding/json" + "fmt" + "git.openprivacy.ca/openprivacy/log" + "golang.org/x/crypto/nacl/secretbox" + "math" + "math/big" + "strconv" + "time" +) + +type ManagedGroupFunctionality struct { +} + +func (f *ManagedGroupFunctionality) NotifySettingsUpdate(settings settings.GlobalSettings) { +} + +func (f *ManagedGroupFunctionality) EventsToRegister() []event.Type { + return []event.Type{event.ProtocolEngineCreated, event.NewMessageFromPeerEngine} +} + +func (f *ManagedGroupFunctionality) ExperimentsToRegister() []string { + return []string{constants.GroupsExperiment} +} + +// OnEvent handles File Sharing Hooks like Manifest Received and FileDownloaded +func (f *ManagedGroupFunctionality) OnEvent(ev event.Event, profile peer.CwtchPeer) { + switch ev.EventType { + // This is where most of the magic happens for managed groups. A few notes: + // - CwtchPeer has already taken care of storing this for us, we don't need to worry about that + // - Group Managers **only** speak overlays and **always** wrap their messages in a ManageGroupEvent anything else is fast-rejected. + case event.NewMessageFromPeerEngine: + handle := ev.Data[event.RemotePeer] + ci, err := profile.FetchConversationInfo(handle) + if err != nil { + break // we don't care about unknown conversations... + } + if ci.ACL[handle].ManageGroup { + var cm model.MessageWrapper + err = json.Unmarshal([]byte(ev.Data[event.Data]), &cm) + if err != nil { + break + } + // The overlay type of this message **must** be ManageGroupEvent + if cm.Overlay == model.OverlayManageGroupEvent { + var mge ManageGroupEvent + err = json.Unmarshal([]byte(cm.Data), &mge) + if err == nil { + cid, err := profile.FetchConversationInfo(handle) + if err == nil { + f.handleEvent(profile, *cid, mge) + } + } + } + } + } +} + +// handleEvent takes in a high level ManageGroupEvent message, transforms it into the proper type, and passes it on for handling +// assumes we are called after an event provided by an authorized peer (i.e. ManageGroup == true) +func (f *ManagedGroupFunctionality) handleEvent(profile peer.CwtchPeer, conversation model.Conversation, mge ManageGroupEvent) { + switch mge.EventType { + case AddMember: + var ame AddMemberEvent + err := json.Unmarshal([]byte(mge.Data), &ame) + if err == nil { + f.handleAddMemberEvent(profile, conversation, ame) + } + case RemoveMember: + var rme RemoveMemberEvent + err := json.Unmarshal([]byte(mge.Data), &rme) + if err == nil { + f.handleRemoveMemberEvent(profile, conversation, rme) + } + case NewMessage: + var nme NewMessageEvent + err := json.Unmarshal([]byte(mge.Data), &nme) + if err == nil { + f.handleNewMessageEvent(profile, conversation, nme) + } + case NewClearMessage: + var nme NewClearMessageEvent + err := json.Unmarshal([]byte(mge.Data), &nme) + if err == nil { + f.handleNewClearMessageEvent(profile, conversation, nme) + } + case RotateKey: + var rke RotateKeyEvent + err := json.Unmarshal([]byte(mge.Data), &rke) + if err == nil { + f.handleRotateKeyEvent(profile, conversation, rke) + } + } +} + +// handleAddMemberEvent adds a group member to the conversation ACL +// assumes we are called after an event provided by an authorized peer (i.e. ManageGroup == true) +func (f *ManagedGroupFunctionality) handleAddMemberEvent(profile peer.CwtchPeer, conversation model.Conversation, ame AddMemberEvent) { + acl := conversation.ACL + acl[ame.Handle] = model.DefaultP2PAccessControl() + profile.UpdateConversationAccessControlList(conversation.ID, acl) +} + +// handleRemoveMemberEvent removes a group member from the conversation ACL +// assumes we are called after an event provided by an authorized peer (i.e. ManageGroup == true) +func (f *ManagedGroupFunctionality) handleRemoveMemberEvent(profile peer.CwtchPeer, conversation model.Conversation, rme RemoveMemberEvent) { + acl := conversation.ACL + delete(acl, rme.Handle) + profile.UpdateConversationAccessControlList(conversation.ID, acl) +} + +// handleRotateKeyEvent rotates the encryption key for a given group +// assumes we are called after an event provided by an authorized peer (i.e. ManageGroup == true) +func (f *ManagedGroupFunctionality) handleRotateKeyEvent(profile peer.CwtchPeer, conversation model.Conversation, rke RotateKeyEvent) { + keyScope := attr.LocalScope.ConstructScopedZonedPath(attr.ConversationZone.ConstructZonedPath("key")) + keyB64 := base64.StdEncoding.EncodeToString(rke.Key) + profile.SetConversationAttribute(conversation.ID, keyScope, keyB64) +} + +func (f *ManagedGroupFunctionality) handleNewMessageEvent(profile peer.CwtchPeer, conversation model.Conversation, nme NewMessageEvent) { + keyScope := attr.LocalScope.ConstructScopedZonedPath(attr.ConversationZone.ConstructZonedPath("key")) + if keyB64, err := profile.GetConversationAttribute(conversation.ID, keyScope); err == nil { + key, err := base64.StdEncoding.DecodeString(keyB64) + if err != nil || len(key) != 32 { + log.Errorf("hybrid group key is corrupted") + return + } + // decrypt the message with key... + hgm, err := f.decryptMessage(key, nme.EncryptedHybridGroupMessage) + if hgm == nil || err != nil { + log.Errorf("unable to decrypt hybrid group message: %v", err) + return + } + // + } +} + +func (f *ManagedGroupFunctionality) handleNewClearMessageEvent(profile peer.CwtchPeer, conversation model.Conversation, nme NewClearMessageEvent) { + hgm := nme.HybridGroupMessage + if AuthenticateMessage(hgm) { + // TODO Closed Group Membership Check + if profile.GetOnion() == hgm.Author { + // ack + signatureB64 := base64.StdEncoding.EncodeToString(hgm.Signature) + id, err := profile.GetChannelMessageBySignature(conversation.ID, 0, signatureB64) + if err == nil { + profile.UpdateMessageAttribute(conversation.ID, 0, id, constants.AttrAck, constants.True) + profile.PublishEvent(event.NewEvent(event.IndexedAcknowledgement, map[event.Field]string{event.ConversationID: strconv.Itoa(conversation.ID), event.Index: strconv.Itoa(id)})) + } + } else { + mgidstr := strconv.Itoa(int(nme.HybridGroupMessage.MemberGroupID)) // we need both MemberGroupId and MemberMessageId for attestation later on... + newmmidstr := strconv.Itoa(int(nme.HybridGroupMessage.MemberMessageID)) + // Set the attributes of this message... + attr := model.Attributes{MemberGroupIDKey: mgidstr, MemberMessageIDKey: newmmidstr, + constants.AttrAuthor: hgm.Author, + constants.AttrAck: event.True, + constants.AttrSentTimestamp: time.UnixMilli(int64(hgm.Sent)).Format(time.RFC3339Nano)} + profile.InternalInsertMessage(conversation.ID, 0, hgm.Author, hgm.MessageBody, attr, hgm.Signature) + } + + // TODO need to send an event here... + } else { + log.Errorf("received fraudulant hybrid message fom group") + } +} + +func (f *ManagedGroupFunctionality) decryptMessage(key []byte, ciphertext []byte) (*HybridGroupMessage, error) { + if len(ciphertext) > 24 { + var decryptNonce [24]byte + copy(decryptNonce[:], ciphertext[:24]) + var fixedSizeKey [32]byte + copy(fixedSizeKey[:], key[:32]) + decrypted, ok := secretbox.Open(nil, ciphertext[24:], &decryptNonce, &fixedSizeKey) + if ok { + var hgm HybridGroupMessage + err := json.Unmarshal(decrypted, &hgm) + return &hgm, err + } + } + return nil, fmt.Errorf("invalid ciphertext/key error") +} + +// Define a new managed group, managed by the manager... +func (f *ManagedGroupFunctionality) NewManagedGroup(profile peer.CwtchPeer, manager string) error { + + // generate a truely random member id for this group in [0..2^32) + nBig, err := rand.Int(rand.Reader, big.NewInt(math.MaxUint32)) + if err != nil { + return err // if there is a problem with random we want to exit now rather than have to clean up group setup... + } + + ac := model.DefaultP2PAccessControl() + ac.ManageGroup = true // by setting the ManageGroup permission in this ACL we are allowing the manager to control of how this group is structured + ci, err := profile.NewContactConversation(manager, ac, true) + if err != nil { + return err + } + // enable channel 2 on this conversation (hybrid groups management channel) + key := fmt.Sprintf("channel.%d", 2) + err = profile.SetConversationAttribute(ci, attr.LocalScope.ConstructScopedZonedPath(attr.ConversationZone.ConstructZonedPath(key)), constants.True) + if err != nil { + return fmt.Errorf("could not enable channel 2 on hybrid group: %v", err) // likely a catestrophic error...fail + } + + // finally, set the member group id on this group... + mgidkey := attr.LocalScope.ConstructScopedZonedPath(attr.ConversationZone.ConstructZonedPath(MemberGroupIDKey)) + err = profile.SetConversationAttributeInt(ci, mgidkey, int(nBig.Uint64())) + if err != nil { + return fmt.Errorf("could not set group id on hybrid group: %v", err) // likely a catestrophic error...fail + } + return nil +} + +// SendMessageToManagedGroup acts like SendMessage(ToPeer), but with a few additional bookkeeping steps for Hybrid Groups +func (f *ManagedGroupFunctionality) SendMessageToManagedGroup(profile peer.CwtchPeer, conversation int, message string) (int, error) { + mgidkey := attr.LocalScope.ConstructScopedZonedPath(attr.ConversationZone.ConstructZonedPath(MemberGroupIDKey)) + mgid, err := profile.GetConversationAttributeInt(conversation, mgidkey) + if err != nil { + return -1, err + } + + mmidkey := attr.LocalScope.ConstructScopedZonedPath(attr.ConversationZone.ConstructZonedPath(MemberMessageIDKey)) + mmid, err := profile.GetConversationAttributeInt(conversation, mmidkey) + if err != nil { + mmid = 0 // first message + } + + mmid += 1 + + hgm := HybridGroupMessage{ + MemberGroupID: uint32(mgid), + MemberMessageID: uint32(mmid), + Sent: uint64(time.Now().UnixMilli()), + Author: profile.GetOnion(), + MessageBody: message, + Signature: []byte{}, // Leave blank so we can sign this message... + } + + data, err := json.Marshal(hgm) + if err != nil { + return -1, err + } + + sig, err := profile.SignMessage(data) + if err != nil { + return -1, err + } + + hgm.Signature = sig + + ncm := NewClearMessageEvent{ + HybridGroupMessage: hgm, + } + + signedData, err := json.Marshal(ncm) + if err != nil { + return -1, err + } + + mgm := ManageGroupEvent{ + EventType: NewClearMessage, + Data: string(signedData), + } + + odata, err := json.Marshal(mgm) + if err != nil { + return -1, err + } + + overlay := model.MessageWrapper{ + Overlay: model.OverlayManageGroupEvent, + Data: string(odata), + } + + ojson, err := json.Marshal(overlay) + if err != nil { + return -1, err + } + + // send the message to the manager and update our message is string for tracking... + _, err = profile.SendMessage(conversation, string(ojson)) + if err != nil { + return -1, err + } + profile.SetConversationAttributeInt(conversation, mmidkey, mmid) + + // ok there is still one more thing we need to do... + // insert this message as part of our group log, for members of the group + // this exists in channel 0 of the conversation with the group manager... + mgidstr := strconv.Itoa(mgid) // we need both MemberGroupId and MemberMessageId for attestation later on... + newmmidstr := strconv.Itoa(mmid) + attr := model.Attributes{MemberGroupIDKey: mgidstr, MemberMessageIDKey: newmmidstr, constants.AttrAuthor: profile.GetOnion(), constants.AttrAck: event.False, constants.AttrSentTimestamp: time.Now().Format(time.RFC3339Nano)} + return profile.InternalInsertMessage(conversation, 0, hgm.Author, message, attr, hgm.Signature) +} + +func (f *ManagedGroupFunctionality) OnContactRequestValue(profile peer.CwtchPeer, conversation model.Conversation, eventID string, path attr.ScopedZonedPath) { + // nop hybrid group conversations do not exchange contact requests +} + +func (f *ManagedGroupFunctionality) OnContactReceiveValue(profile peer.CwtchPeer, conversation model.Conversation, path attr.ScopedZonedPath, value string, exists bool) { + // nop hybrid group conversations do not exchange contact requests +} diff --git a/functionality/hybrid/manager.go b/functionality/hybrid/manager.go new file mode 100644 index 0000000..a141737 --- /dev/null +++ b/functionality/hybrid/manager.go @@ -0,0 +1,144 @@ +// This file contains all code related to how a Group Manager operates over a group. +package hybrid + +import ( + "cwtch.im/cwtch/event" + "cwtch.im/cwtch/model" + "cwtch.im/cwtch/model/attr" + "cwtch.im/cwtch/model/constants" + "cwtch.im/cwtch/peer" + "cwtch.im/cwtch/settings" + "encoding/json" + "fmt" + "git.openprivacy.ca/openprivacy/log" + "strconv" + "time" +) + +type GroupManagerFunctionality struct { +} + +func (f *GroupManagerFunctionality) NotifySettingsUpdate(settings settings.GlobalSettings) { +} + +func (f *GroupManagerFunctionality) EventsToRegister() []event.Type { + return []event.Type{event.ProtocolEngineCreated, event.NewMessageFromPeerEngine} +} + +func (f *GroupManagerFunctionality) ExperimentsToRegister() []string { + return []string{constants.GroupManagerExperiment, constants.GroupsExperiment} +} + +// OnEvent handles File Sharing Hooks like Manifest Received and FileDownloaded +func (f *GroupManagerFunctionality) OnEvent(ev event.Event, profile peer.CwtchPeer) { + switch ev.EventType { + // This is where most of the magic happens for managed groups. A few notes: + // - CwtchPeer has already taken care of storing this for us, we don't need to worry about that + // - Group Managers **only** speak overlays and **always** wrap their messages in a ManageGroupEvent anything else is fast-rejected. + case event.NewMessageFromPeerEngine: + log.Infof("received new message from peer: manager") + ci, err := profile.GetConversationInfo(1) + if err != nil { + log.Errorf("unknown conversation %v", err) + break // we don't care about unknown conversations... + } + var cm model.MessageWrapper + err = json.Unmarshal([]byte(ev.Data[event.Data]), &cm) + if err != nil { + log.Errorf("could not deserialize json %v", err) + break + } + // The overlay type of this message **must** be ManageGroupEvent + if cm.Overlay == model.OverlayManageGroupEvent { + var mge ManageGroupEvent + err = json.Unmarshal([]byte(cm.Data), &mge) + if err == nil { + f.handleEvent(profile, *ci, mge, ev.Data[event.Data]) + } + } + } +} + +// handleEvent takes in a high level ManageGroupEvent message, transforms it into the proper type, and passes it on for handling +// assumes we are called after an event provided by an authorized peer (i.e. ManageGroup == true) +func (f *GroupManagerFunctionality) handleEvent(profile peer.CwtchPeer, conversation model.Conversation, mge ManageGroupEvent, original string) { + switch mge.EventType { + case NewClearMessage: + var nme NewClearMessageEvent + err := json.Unmarshal([]byte(mge.Data), &nme) + if err == nil { + f.handleNewMessageEvent(profile, conversation, nme, original) + } + } +} + +func (f *GroupManagerFunctionality) handleNewMessageEvent(profile peer.CwtchPeer, conversation model.Conversation, nme NewClearMessageEvent, original string) { + log.Infof("handling new clear message event") + hgm := nme.HybridGroupMessage + if AuthenticateMessage(hgm) { + log.Infof("authenticated message") + // TODO Closed Group Membership Check + mgidstr := strconv.Itoa(int(nme.HybridGroupMessage.MemberGroupID)) // we need both MemberGroupId and MemberMessageId for attestation later on... + newmmidstr := strconv.Itoa(int(nme.HybridGroupMessage.MemberMessageID)) + // Set the attributes of this message... + attr := model.Attributes{MemberGroupIDKey: mgidstr, MemberMessageIDKey: newmmidstr, + constants.AttrAuthor: hgm.Author, + constants.AttrAck: event.True, + constants.AttrSentTimestamp: time.UnixMilli(int64(hgm.Sent)).Format(time.RFC3339Nano)} + profile.InternalInsertMessage(conversation.ID, 0, hgm.Author, hgm.MessageBody, attr, hgm.Signature) + + // forward the message to everyone who the server has added as a contact + // TODO: filter by conversation.ACL + allConversations, _ := profile.FetchConversations() + for _, ci := range allConversations { + log.Infof("forwarding group message to: %v", ci.Handle) + profile.SendMessage(ci.ID, original) + } + } else { + log.Errorf("received fraudulant hybrid message fom group") + } +} + +// Establish a new Managed Group +func (f *GroupManagerFunctionality) ManageNewGroup(profile peer.CwtchPeer) (int, error) { + ac := model.DefaultP2PAccessControl() + // by setting the ManageGroup permission in this ACL we are allowing the manage to + // take control of how this group is structured, see OnEvent above... + ac.ManageGroup = true + // note: a manager can only manage one group. This will always be true and has a few benefits + // and downsides. + // The main downside is that it requires a new manager per group (and thus an onion service per group) + // However, it means that we can lean on p2p functionality like profile images / metadata / name + // etc. for group metadata and effectively get that for-free in the client. + handle := fmt.Sprintf("managed:%d", 000) + acl := model.AccessControlList{} + acl[profile.GetOnion()] = ac + ci, err := profile.NewConversation(handle, acl) + if err != nil { + return -1, err + } + return ci, nil +} + +// AddHybridContact is a wrapper arround NewContactConversation which sets the contact +// up for Hybrid Group channel messages... +func (f *GroupManagerFunctionality) AddHybridContact(profile peer.CwtchPeer, handle string) error { + ac := model.DefaultP2PAccessControl() + ac.ManageGroup = false + ci, err := profile.NewContactConversation(handle, ac, true) + if err != nil { + return err + } + // enable channel 2 on this conversation (hybrid groups management channel) + key := fmt.Sprintf("channel.%d", 2) + profile.SetConversationAttribute(ci, attr.LocalScope.ConstructScopedZonedPath(attr.ConversationZone.ConstructZonedPath(key)), constants.True) + return nil +} + +func (f *GroupManagerFunctionality) OnContactRequestValue(profile peer.CwtchPeer, conversation model.Conversation, eventID string, path attr.ScopedZonedPath) { + // nop hybrid group conversations do not exchange contact requests +} + +func (f *GroupManagerFunctionality) OnContactReceiveValue(profile peer.CwtchPeer, conversation model.Conversation, path attr.ScopedZonedPath, value string, exists bool) { + // nop hybrid group conversations do not exchange contact requests +} diff --git a/model/attr/zone.go b/model/attr/zone.go index 72b446b..204dd24 100644 --- a/model/attr/zone.go +++ b/model/attr/zone.go @@ -20,6 +20,9 @@ const ( // LegacyGroupZone for attributes related to legacy group experiment LegacyGroupZone = Zone("legacygroup") + // ConversationZone for attributes related to structure of the conversation + ConversationZone = Zone("conversation") + // FilesharingZone for attributes related to file sharing FilesharingZone = Zone("filesharing") @@ -65,6 +68,8 @@ func ParseZone(path string) (Zone, string) { return ServerKeyZone, parts[1] case ServerZone: return ServerZone, parts[1] + case ConversationZone: + return ConversationZone, parts[1] default: return UnknownZone, parts[1] } diff --git a/model/constants/experiments.go b/model/constants/experiments.go index 62cd482..c962dc6 100644 --- a/model/constants/experiments.go +++ b/model/constants/experiments.go @@ -2,6 +2,9 @@ package constants const GroupsExperiment = "tapir-groups-experiment" +// for now only used by bots, do not expose in UI..for now +const GroupManagerExperiment = "manage-group-experiment" + // FileSharingExperiment Allows file sharing const FileSharingExperiment = "filesharing" diff --git a/model/conversation.go b/model/conversation.go index 324fdb4..2b5b6fd 100644 --- a/model/conversation.go +++ b/model/conversation.go @@ -4,7 +4,9 @@ import ( "cwtch.im/cwtch/model/attr" "cwtch.im/cwtch/model/constants" "encoding/json" + "fmt" "git.openprivacy.ca/openprivacy/log" + "strings" "time" ) @@ -23,6 +25,8 @@ type AccessControl struct { // Extension Related Permissions ShareFiles bool // Allows a handle to share files to a conversation RenderImages bool // Indicates that certain filetypes should be autodownloaded and rendered when shared by this contact + + ManageGroup bool // Allows a handle to actively manage the group } // DefaultP2PAccessControl defaults to a semi-trusted peer with no access to special extensions. @@ -85,6 +89,22 @@ func (ci *Conversation) GetAttribute(scope attr.Scope, zone attr.Zone, key strin return "", false } +// GetPeerAC returns a suitable Access Control object for a the given peer conversation +// If this is called for a group conversation, this method will error and return a safe default AC. +func (ci *Conversation) HasChannel(requestedChannel int) bool { + if requestedChannel == 0 { + return true + } + if requestedChannel == 1 { + return false // channel 1 is mapped to channel 0 for backwards compatibility + } + key := fmt.Sprintf("channel.%d", requestedChannel) + if value, exists := ci.Attributes[attr.LocalScope.ConstructScopedZonedPath(attr.ConversationZone.ConstructZonedPath(key)).ToString()]; exists { + return value == constants.True + } + return false +} + // GetPeerAC returns a suitable Access Control object for a the given peer conversation // If this is called for a group conversation, this method will error and return a safe default AC. func (ci *Conversation) GetPeerAC() AccessControl { @@ -111,6 +131,11 @@ func (ci *Conversation) IsServer() bool { return false } +// IsManaged is a helper attribute that identifies whether a conversation is managed +func (ci *Conversation) IsManaged() bool { + return strings.HasPrefix(ci.Handle, "managed") +} + // ServerSyncProgress is only valid during a server being in the AUTHENTICATED state and therefor in the syncing process // it returns a double (0-1) representing the estimated progress of the syncing func (ci *Conversation) ServerSyncProgress() float64 { diff --git a/model/overlay.go b/model/overlay.go index 4149798..1a90af4 100644 --- a/model/overlay.go +++ b/model/overlay.go @@ -6,6 +6,13 @@ type MessageWrapper struct { Data string `json:"d"` } +// Overlay Identifiers now have a dual strucutre, their full id defines the overlay +// type, and thus the encoding of the Data. +// But the last 8 bits of the overlay now also encode the **channel* that the overlay is sent +// to. This is needed for Hybrid Groups where there exists both a main channel and the +// management channel (whose messages we don't want to show up in a default fetch). +// To support backwards compatibility, any overlay id less than 0x300 to resolve to 0 + // OverlayChat is the canonical identifier for chat overlays const OverlayChat = 1 @@ -17,3 +24,6 @@ const OverlayInviteGroup = 101 // OverlayFileSharing is the canonical identifier for the file sharing overlay const OverlayFileSharing = 200 + +// ManageGroupEvent is the canonical identifier for the manage group overlay +const OverlayManageGroupEvent = 0x302 diff --git a/peer/cwtch_peer.go b/peer/cwtch_peer.go index d378dd4..a460ece 100644 --- a/peer/cwtch_peer.go +++ b/peer/cwtch_peer.go @@ -421,6 +421,36 @@ func (cp *cwtchPeer) SetScopedZonedAttribute(scope attr.Scope, zone attr.Zone, k } } +func (cp *cwtchPeer) SignMessage(blob []byte) ([]byte, error) { + privateKey, err := cp.storage.LoadProfileKeyValue(TypePrivateKey, "Ed25519PrivateKey") + if err != nil { + log.Errorf("error loading private key from storage") + return nil, err + } + + publicKey, err := cp.storage.LoadProfileKeyValue(TypePublicKey, "Ed25519PublicKey") + if err != nil { + log.Errorf("error loading public key from storage") + return nil, err + } + identity := primitives.InitializeIdentity("", (*ed25519.PrivateKey)(&privateKey), (*ed25519.PublicKey)(&publicKey)) + return identity.Sign(blob), nil +} + +func (cp *cwtchPeer) InternalInsertMessage(conversation int, channel int, author string, body string, attrs model.Attributes, signature []byte) (int, error) { + signatureB64 := base64.StdEncoding.EncodeToString(signature) + return cp.storage.InsertMessage(conversation, channel, body, attrs, signatureB64, model.CalculateContentHash(author, body)) +} + +func (cp *cwtchPeer) resolveChannel(overlay int) int { + requestedChannel := overlay & 0xFF + // Legacy mapping of old chat/group overlays to channel 0 + if overlay < 0x300 { + requestedChannel = 0 + } + return requestedChannel +} + // SendMessage is a higher level that merges sending messages to contacts and group handles // If you try to send a message to a handle that doesn't exist, malformed or an incorrect type then // this function will error @@ -436,8 +466,22 @@ func (cp *cwtchPeer) SendMessage(conversation int, message string) (int, error) ev := event.NewEvent(event.SendMessageToPeer, map[event.Field]string{event.ConversationID: strconv.Itoa(conversationInfo.ID), event.RemotePeer: conversationInfo.Handle, event.Data: message}) onion, _ := cp.storage.LoadProfileKeyValue(TypeAttribute, attr.PublicScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Onion)).ToString()) + var cm model.MessageWrapper + err = json.Unmarshal([]byte(message), &cm) + + // we are now explictly rejecting invalidly encoded messages... + if err != nil { + return -1, err + } + requestedChannel := cp.resolveChannel(cm.Overlay) + // if this channel has not been registered for a particular conversation + // then default to channel = 0; + if conversationInfo.HasChannel(requestedChannel) { + requestedChannel = 0 + } + // For p2p messages we store the event id of the message as the "signature" we can then look this up in the database later for acks - id, err := cp.storage.InsertMessage(conversationInfo.ID, 0, message, model.Attributes{constants.AttrAuthor: string(onion), constants.AttrAck: event.False, constants.AttrSentTimestamp: time.Now().Format(time.RFC3339Nano)}, ev.EventID, model.CalculateContentHash(string(onion), message)) + id, err := cp.storage.InsertMessage(conversationInfo.ID, requestedChannel, message, model.Attributes{constants.AttrAuthor: string(onion), constants.AttrAck: event.False, constants.AttrSentTimestamp: time.Now().Format(time.RFC3339Nano)}, ev.EventID, model.CalculateContentHash(string(onion), message)) if err != nil { return -1, err } @@ -700,6 +744,14 @@ func (cp *cwtchPeer) ImportGroup(exportedInvite string) (int, error) { return groupConversationID, err } +// NewConversation create a new multi-peer conversation. +func (cp *cwtchPeer) NewConversation(handle string, acl model.AccessControlList) (int, error) { + conversationID, err := cp.storage.NewConversation(handle, model.Attributes{event.SaveHistoryKey: event.DeleteHistoryDefault}, acl, true) + cp.SetConversationAttribute(conversationID, attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.ACLVersion)), constants.ACLVersionOne) + cp.SetConversationAttribute(conversationID, attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.AttrLastConnectionTime)), time.Now().Format(time.RFC3339Nano)) + return conversationID, err +} + // NewContactConversation create a new p2p conversation with the given acl applied to the handle. func (cp *cwtchPeer) NewContactConversation(handle string, acl model.AccessControl, accepted bool) (int, error) { cp.mutex.Lock() @@ -717,6 +769,8 @@ func (cp *cwtchPeer) NewContactConversation(handle string, acl model.AccessContr // UpdateConversationAccessControlList is a genric ACL update method func (cp *cwtchPeer) UpdateConversationAccessControlList(id int, acl model.AccessControlList) error { + // set the ACL version to the most recent version + cp.SetConversationAttribute(id, attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.ACLVersion)), constants.ACLVersionOne) return cp.storage.SetConversationACL(id, acl) } @@ -880,6 +934,26 @@ func (cp *cwtchPeer) GetConversationAttribute(id int, path attr.ScopedZonedPath) return val, nil } +// SetConversationAttributeInt sets the conversation attribute at path to an integer value +func (cp *cwtchPeer) SetConversationAttributeInt(id int, path attr.ScopedZonedPath, value int) error { + strvalue := strconv.Itoa(value) + return cp.storage.SetConversationAttribute(id, path, strvalue) +} + +// GetConversationAttributeInt is a method for retrieving an integer value of a given conversation +func (cp *cwtchPeer) GetConversationAttributeInt(id int, path attr.ScopedZonedPath) (int, error) { + ci, err := cp.storage.GetConversation(id) + if err != nil { + return 0, err + } + val, exists := ci.Attributes[path.ToString()] + if !exists { + return 0, fmt.Errorf("%v does not exist for conversation %v", path.ToString(), id) + } + intvalue, err := strconv.Atoi(val) + return intvalue, err +} + // GetChannelMessage returns a message from a conversation channel referenced by the absolute ID. // Note: This should note be used to index a list as the ID is not expected to be tied to absolute position // in the table (e.g. deleted messages, expired messages, etc.) @@ -1414,6 +1488,8 @@ func (cp *cwtchPeer) StartConnections(doPeers, doServers bool) { if conversation.model.IsServer() && cp.IsFeatureEnabled(constants.GroupsExperiment) { log.Debugf(" QueueJoinServer(%v)", conversation.model.Handle) cp.QueueJoinServer(conversation.model.Handle) + } else if conversation.model.IsManaged() { + // do nothing... } else { log.Debugf(" QueuePeerWithOnion(%v)", conversation.model.Handle) if conversation.model.GetPeerAC().AutoConnect { @@ -1482,9 +1558,23 @@ func (cp *cwtchPeer) storeMessage(handle string, message string, sent time.Time) } } + var cm model.MessageWrapper + err = json.Unmarshal([]byte(message), &cm) + + // we are now explictly rejecting invalidly encoded messages... + if err != nil { + return -1, err + } + + requestedChannel := cp.resolveChannel(cm.Overlay) + // if this channel has not been registered for a particular conversation + // then default to channel = 0; + if ci.HasChannel(requestedChannel) { + requestedChannel = 0 + } // Generate a random number and use it as the signature signature := event.GetRandNumber().String() - return cp.storage.InsertMessage(ci.ID, 0, message, model.Attributes{constants.AttrAuthor: handle, constants.AttrAck: event.True, constants.AttrSentTimestamp: sent.Format(time.RFC3339Nano)}, signature, model.CalculateContentHash(handle, message)) + return cp.storage.InsertMessage(ci.ID, requestedChannel, message, model.Attributes{constants.AttrAuthor: handle, constants.AttrAck: event.True, constants.AttrSentTimestamp: sent.Format(time.RFC3339Nano)}, signature, model.CalculateContentHash(handle, message)) } // eventHandler process events from other subsystems @@ -1499,6 +1589,7 @@ func (cp *cwtchPeer) eventHandler() { onion, _ := cp.storage.LoadProfileKeyValue(TypeAttribute, attr.PublicScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Onion)).ToString()) log.Infof("Protocol engine for %s has stopped listening: %v", onion, ev.Data[event.Error]) cp.mutex.Unlock() + cp.processExtensionsEvent(ev) case event.EncryptedGroupMessage: // If successful, a side effect is the message is added to the group's timeline @@ -1543,6 +1634,7 @@ func (cp *cwtchPeer) eventHandler() { case event.NewMessageFromPeerEngine: //event.TimestampReceived, event.RemotePeer, event.Data ts, _ := time.Parse(time.RFC3339Nano, ev.Data[event.TimestampReceived]) id, err := cp.storeMessage(ev.Data[event.RemotePeer], ev.Data[event.Data], ts) + cp.processExtensionsEvent(ev) if err == nil { // Republish as NewMessageFromPeer ev.EventType = event.NewMessageFromPeer @@ -1671,22 +1763,7 @@ func (cp *cwtchPeer) eventHandler() { cp.mutex.Unlock() } } - - // Safe Access to Extensions - cp.extensionLock.Lock() - for _, extension := range cp.extensions { - log.Debugf("checking extension...%v", extension) - // check if the current map of experiments satisfies the extension requirements - if !cp.checkExtensionExperiment(extension) { - log.Debugf("skipping extension (%s) ..not all experiments satisfied", extension) - continue - } - if cp.checkEventExperiment(extension, ev.EventType) { - extension.extension.OnEvent(ev, cp) - } - } - cp.extensionLock.Unlock() - + cp.processExtensionsEvent(ev) case event.ServerStateChange: cp.mutex.Lock() prevState := cp.state[ev.Data[event.GroupServer]] @@ -1750,7 +1827,7 @@ func (cp *cwtchPeer) eventHandler() { // check if the current map of experiments satisfies the extension requirements if !cp.checkExtensionExperiment(extension) { - log.Debugf("skipping extension (%s) ..not all experiments satisfied", extension) + log.Debugf("skipping extension (%v) ..not all experiments satisfied", extension) if cp.checkEventExperiment(extension, ev.EventType) { // If this experiment was enabled...we might have processed this event... // To avoid flagging an error later on in this method we set processed to true. @@ -1796,6 +1873,24 @@ func (cp *cwtchPeer) checkExtensionExperiment(hook ProfileHook) bool { return true } +func (cp *cwtchPeer) processExtensionsEvent(ev event.Event) { + cp.extensionLock.Lock() + for _, extension := range cp.extensions { + + // check if the current map of experiments satisfies the extension requirements + if !cp.checkExtensionExperiment(extension) { + log.Debugf("skipping extension (%v) ..not all experiments satisfied", extension) + continue + } + + // if the extension is registered for this event type then process + if _, contains := extension.events[ev.EventType]; contains { + extension.extension.OnEvent(ev, cp) + } + } + cp.extensionLock.Unlock() +} + // attemptInsertOrAcknowledgeLegacyGroupConversation is a convenience method that looks up the conversation // by the given handle and attempts to mark the message as acknowledged. returns error on failure // to either find the contact or the associated message diff --git a/peer/profile_interface.go b/peer/profile_interface.go index 9675d5a..8c3c14b 100644 --- a/peer/profile_interface.go +++ b/peer/profile_interface.go @@ -114,7 +114,13 @@ type CwtchPeer interface { ImportBundle(string) error EnhancedImportBundle(string) string + SignMessage(blob []byte) ([]byte, error) + + // Used for Internal Bookkeeping by Extensions, **do not expose in autobindings** + InternalInsertMessage(conversation int, channel int, author string, body string, attributes model.Attributes, signature []byte) (int, error) + // New Unified Conversation Interfaces + NewConversation(handle string, acl model.AccessControlList) (int, error) NewContactConversation(handle string, acl model.AccessControl, accepted bool) (int, error) FetchConversations() ([]*model.Conversation, error) ArchiveConversation(conversation int) @@ -135,12 +141,15 @@ type CwtchPeer interface { SetConversationAttribute(conversation int, path attr.ScopedZonedPath, value string) error GetConversationAttribute(conversation int, path attr.ScopedZonedPath) (string, error) + SetConversationAttributeInt(conversation int, path attr.ScopedZonedPath, value int) error + GetConversationAttributeInt(conversation int, path attr.ScopedZonedPath) (int, error) DeleteConversation(conversation int) error // New Unified Conversation Channel Interfaces GetChannelMessage(conversation int, channel int, id int) (string, model.Attributes, error) GetChannelMessageCount(conversation int, channel int) (int, error) GetChannelMessageByContentHash(conversation int, channel int, contenthash string) (int, error) + GetChannelMessageBySignature(conversationID int, channelID int, signature string) (int, error) GetMostRecentMessages(conversation int, channel int, offset int, limit uint) ([]model.ConversationMessage, error) UpdateMessageAttribute(conversation int, channel int, id int, key string, value string) error SearchConversations(pattern string) string diff --git a/testing/cwtch_peer_server_integration_test.go b/testing/cwtch_peer_server_integration_test.go index 7b59689..1a7b138 100644 --- a/testing/cwtch_peer_server_integration_test.go +++ b/testing/cwtch_peer_server_integration_test.go @@ -34,26 +34,6 @@ var ( carolLines = []string{"Howdy, thanks!"} ) -func waitForConnection(t *testing.T, peer peer.CwtchPeer, addr string, target connections.ConnectionState) { - peerName, _ := peer.GetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.Name) - for { - log.Infof("%v checking connection...\n", peerName) - state := peer.GetPeerState(addr) - log.Infof("Waiting for Peer %v to %v - state: %v\n", peerName, addr, connections.ConnectionStateName[state]) - if state == connections.FAILED { - t.Fatalf("%v could not connect to %v", peer.GetOnion(), addr) - } - if state != target { - log.Infof("peer %v %v waiting connect %v, currently: %v\n", peerName, peer.GetOnion(), addr, connections.ConnectionStateName[state]) - time.Sleep(time.Second * 5) - continue - } else { - log.Infof("peer %v %v CONNECTED to %v\n", peerName, peer.GetOnion(), addr) - break - } - } -} - func waitForRetVal(peer peer.CwtchPeer, convId int, szp attr.ScopedZonedPath) { for { _, err := peer.GetConversationAttribute(convId, szp) @@ -236,10 +216,10 @@ func TestCwtchPeerIntegration(t *testing.T) { t.Fatalf("Alice password did not change...") } - waitForConnection(t, alice, bob.GetOnion(), connections.AUTHENTICATED) - waitForConnection(t, alice, carol.GetOnion(), connections.AUTHENTICATED) - waitForConnection(t, bob, alice.GetOnion(), connections.AUTHENTICATED) - waitForConnection(t, carol, alice.GetOnion(), connections.AUTHENTICATED) + WaitForConnection(t, alice, bob.GetOnion(), connections.AUTHENTICATED) + WaitForConnection(t, alice, carol.GetOnion(), connections.AUTHENTICATED) + WaitForConnection(t, bob, alice.GetOnion(), connections.AUTHENTICATED) + WaitForConnection(t, carol, alice.GetOnion(), connections.AUTHENTICATED) log.Infof("Alice and Bob getVal public.name...") @@ -323,9 +303,9 @@ func TestCwtchPeerIntegration(t *testing.T) { } log.Infof("Waiting for alice to join server...") - waitForConnection(t, alice, ServerAddr, connections.SYNCED) + WaitForConnection(t, alice, ServerAddr, connections.SYNCED) log.Infof("Waiting for Bob to join connect to group server...") - waitForConnection(t, bob, ServerAddr, connections.SYNCED) + WaitForConnection(t, bob, ServerAddr, connections.SYNCED) // 1 = Alice // 2 = Server @@ -351,7 +331,7 @@ func TestCwtchPeerIntegration(t *testing.T) { if len(cachedTokens) > (usedTokens + len(carolLines)) { carol.StoreCachedTokens(ServerAddr, cachedTokens[usedTokens:usedTokens+len(carolLines)]) } - waitForConnection(t, carol, ServerAddr, connections.SYNCED) + WaitForConnection(t, carol, ServerAddr, connections.SYNCED) numGoRoutinesPostCarolConnect := runtime.NumGoroutine() // Check Alice Timeline diff --git a/testing/hybrid_group_integration_test.go b/testing/hybrid_group_integration_test.go new file mode 100644 index 0000000..d93c895 --- /dev/null +++ b/testing/hybrid_group_integration_test.go @@ -0,0 +1,217 @@ +package testing + +import ( + "crypto/rand" + app2 "cwtch.im/cwtch/app" + "cwtch.im/cwtch/event" + "cwtch.im/cwtch/functionality/hybrid" + "cwtch.im/cwtch/model/constants" + "cwtch.im/cwtch/peer" + "cwtch.im/cwtch/protocol/connections" + "encoding/base64" + "fmt" + "git.openprivacy.ca/openprivacy/connectivity/tor" + "git.openprivacy.ca/openprivacy/log" + _ "github.com/mutecomm/go-sqlcipher/v4" + mrand "math/rand" + "os" + "path" + "path/filepath" + "runtime" + "runtime/pprof" + "testing" + "time" +) + +func TestHyrbidGroupIntegration(t *testing.T) { + + os.RemoveAll("./storage") + os.RemoveAll("./managerstorage") + + // Goroutine Monitoring Start.. + numGoRoutinesStart := runtime.NumGoroutine() + + log.AddEverythingFromPattern("connectivity") + log.SetLevel(log.LevelInfo) + log.ExcludeFromPattern("connection/connection") + log.ExcludeFromPattern("outbound/3dhauthchannel") + log.ExcludeFromPattern("event/eventmanager") + log.ExcludeFromPattern("tapir") + + os.Mkdir("tordir", 0700) + dataDir := path.Join("tordir", "tor") + os.MkdirAll(dataDir, 0700) + + // we don't need real randomness for the port, just to avoid a possible conflict... + socksPort := mrand.Intn(1000) + 9051 + controlPort := mrand.Intn(1000) + 9052 + + // generate a random password + key := make([]byte, 64) + _, err := rand.Read(key) + if err != nil { + panic(err) + } + + useCache := os.Getenv("TORCACHE") == "true" + + torDataDir := "" + if useCache { + log.Infof("using tor cache") + torDataDir = filepath.Join(dataDir, "data-dir-torcache") + os.MkdirAll(torDataDir, 0700) + } else { + log.Infof("using clean tor data dir") + if torDataDir, err = os.MkdirTemp(dataDir, "data-dir-"); err != nil { + t.Fatalf("could not create data dir") + } + } + + tor.NewTorrc().WithSocksPort(socksPort).WithOnionTrafficOnly().WithHashedPassword(base64.StdEncoding.EncodeToString(key)).WithControlPort(controlPort).Build("tordir/tor/torrc") + acn, err := tor.NewTorACNWithAuth("./tordir", path.Join("..", "tor"), torDataDir, controlPort, tor.HashedPasswordAuthenticator{Password: base64.StdEncoding.EncodeToString(key)}) + if err != nil { + t.Fatalf("Could not start Tor: %v", err) + } + log.Infof("Waiting for tor to bootstrap...") + acn.WaitTillBootstrapped() + defer acn.Close() + + // ***** Cwtch Server management ***** + + app := app2.NewApp(acn, "./storage", app2.LoadAppSettings("./storage")) + managerApp := app2.NewApp(acn, "./managerstorage", app2.LoadAppSettings("./managerstorage")) + + // ***** cwtchPeer setup ***** + // Turn on Groups Experiment... + settings := app.ReadSettings() + settings.ExperimentsEnabled = true + settings.Experiments[constants.GroupsExperiment] = true + settings.Experiments[constants.GroupManagerExperiment] = false + app.UpdateSettings(settings) + + // Create a Manager App that has the Group Manager Experiment Enabled.... + managerSettings := managerApp.ReadSettings() + managerSettings.ExperimentsEnabled = true + managerSettings.Experiments[constants.GroupsExperiment] = true + managerSettings.Experiments[constants.GroupManagerExperiment] = true + managerApp.UpdateSettings(managerSettings) + + alice := MakeProfile(app, "Alice") + bob := MakeProfile(app, "Bob") + manager := MakeProfile(managerApp, "Manager") + + waitTime := time.Duration(60) * time.Second + log.Infof("** Waiting for Alice, Bob, and Carol to register their onion hidden service on the network... (%v)\n", waitTime) + time.Sleep(waitTime) + log.Infof("** Wait Done!") + + // Ok Lets Start By Creating a Hybrid Group... + + hgmf := hybrid.GroupManagerFunctionality{} + ci, err := hgmf.ManageNewGroup(manager) + if err != nil { + t.Fatalf("could not create hybrid group: %v", err) + } + log.Infof("created a hybrid group: %d. moving onto adding hybrid contacts...", ci) + err = hgmf.AddHybridContact(manager, alice.GetOnion()) + if err != nil { + t.Fatalf("could not create hybrid contact (alice): %v", err) + } + err = hgmf.AddHybridContact(manager, bob.GetOnion()) + if err != nil { + t.Fatalf("could not create hybrid contact (bob): %v", err) + } + + // Now we can allow alice, bob and carol to create a new hybrid group... + log.Infof("now we can allow alice bob and carol to join the hybrid group") + mgf := hybrid.ManagedGroupFunctionality{} + err = mgf.NewManagedGroup(alice, manager.GetOnion()) + if err != nil { + t.Fatalf("could not create hybrid group contact (carol): %v", err) + } + alice.PeerWithOnion(manager.GetOnion()) // explictly trigger a peer request + err = mgf.NewManagedGroup(bob, manager.GetOnion()) + if err != nil { + t.Fatalf("could not create hybrid group contact (carol): %v", err) + } + bob.PeerWithOnion(manager.GetOnion()) + + log.Infof("waiting for alice and manager to connect") + WaitForConnection(t, alice, manager.GetOnion(), connections.AUTHENTICATED) + log.Infof("waiting for bob and manager to connect") + WaitForConnection(t, bob, manager.GetOnion(), connections.AUTHENTICATED) + + // at this pont we should be able to send messages to the group, and receive them in the timeline + log.Infof("sending message to group") + _, err = mgf.SendMessageToManagedGroup(alice, 1, "hello everyone!!!") + if err != nil { + t.Fatalf("hybrid group sending failed... %v", err) + } + + time.Sleep(time.Second * 10) + + bobMessages, err := bob.GetMostRecentMessages(1, 0, 0, 1) + if err != nil || len(bobMessages) != 1 { + t.Fatalf("hybrid group receipt failed... %v", err) + } + + if bobMessages[0].Body != "hello everyone!!!" { + t.Fatalf("hybrid group receipt failed...message does not match") + } + + aliceMessages, err := alice.GetMostRecentMessages(1, 0, 0, 1) + if err != nil || len(aliceMessages) != 1 { + t.Fatalf("hybrid group receipt failed... %v", err) + } + + if aliceMessages[0].Attr[constants.AttrAck] != constants.True { + t.Fatalf("hybrid group receipt failed...alice's message was not ack'd") + } + + // Time to Clean Up.... + log.Infof("Shutting down Alice...") + app.ShutdownPeer(alice.GetOnion()) + time.Sleep(time.Second * 3) + + log.Infof("Shutting down Bob...") + app.ShutdownPeer(bob.GetOnion()) + time.Sleep(time.Second * 3) + + log.Infof("Shutting fown Manager...") + managerApp.ShutdownPeer(manager.GetOnion()) + time.Sleep(time.Second * 3) + + log.Infof("Shutting down apps...") + log.Infof("app Shutdown: %v\n", runtime.NumGoroutine()) + app.Shutdown() + managerApp.Shutdown() + + time.Sleep(2 * time.Second) + + log.Infof("Done shutdown: %v\n", runtime.NumGoroutine()) + + log.Infof("Shutting down ACN...") + acn.Close() + time.Sleep(time.Second * 60) // the network status / heartbeat plugin might keep goroutines alive for a minute before killing them + + numGoRoutinesPostAppShutdown := runtime.NumGoroutine() + + // Printing out the current goroutines + // Very useful if we are leaking any. + pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) + fmt.Println("") + + if numGoRoutinesStart != numGoRoutinesPostAppShutdown { + t.Errorf("Number of GoRoutines at start (%v) does not match number of goRoutines after cleanup of peers and servers (%v), clean up failed, v detected!", numGoRoutinesStart, numGoRoutinesPostAppShutdown) + } +} + +func MakeProfile(application app2.Application, name string) peer.CwtchPeer { + application.CreateProfile(name, "asdfasdf", true) + p := app2.WaitGetPeer(application, name) + application.ConfigureConnections(p.GetOnion(), true, true, false) + log.Infof("%s created: %s", name, p.GetOnion()) + // bob.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Bob") <- This is now done automatically by ProfileValueExtension, keeping this here for clarity + p.AutoHandleEvents([]event.Type{event.PeerStateChange, event.ServerStateChange, event.NewGroupInvite, event.NewRetValMessageFromPeer}) + return p +} diff --git a/testing/utils.go b/testing/utils.go new file mode 100644 index 0000000..5fde872 --- /dev/null +++ b/testing/utils.go @@ -0,0 +1,32 @@ +package testing + +import ( + "cwtch.im/cwtch/model/attr" + "cwtch.im/cwtch/model/constants" + "cwtch.im/cwtch/peer" + "cwtch.im/cwtch/protocol/connections" + "git.openprivacy.ca/openprivacy/log" + _ "github.com/mutecomm/go-sqlcipher/v4" + "testing" + "time" +) + +func WaitForConnection(t *testing.T, peer peer.CwtchPeer, addr string, target connections.ConnectionState) { + peerName, _ := peer.GetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.Name) + for { + log.Infof("%v checking connection...\n", peerName) + state := peer.GetPeerState(addr) + log.Infof("Waiting for Peer %v to %v - state: %v\n", peerName, addr, connections.ConnectionStateName[state]) + if state == connections.FAILED { + t.Fatalf("%v could not connect to %v", peer.GetOnion(), addr) + } + if state != target { + log.Infof("peer %v %v waiting connect %v, currently: %v\n", peerName, peer.GetOnion(), addr, connections.ConnectionStateName[state]) + time.Sleep(time.Second * 5) + continue + } else { + log.Infof("peer %v %v CONNECTED to %v\n", peerName, peer.GetOnion(), addr) + break + } + } +}