WIP: Hybrid Groups Initial Sketch
continuous-integration/drone/pr Build is pending Details

This commit is contained in:
Sarah Jamie Lewis 2024-01-26 13:33:30 -08:00
parent b0a87ee8d0
commit 25e86426b2
16 changed files with 971 additions and 48 deletions

3
.gitignore vendored
View File

@ -33,4 +33,5 @@ data-dir-cwtchtool/
tokens
tordir/
testing/autodownload/download_dir
testing/autodownload/storage
testing/autodownload/storage
testing/managerstorage

View File

@ -5,6 +5,7 @@ import (
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/extensions"
"cwtch.im/cwtch/functionality/filesharing"
"cwtch.im/cwtch/functionality/hybrid"
"cwtch.im/cwtch/functionality/servers"
"cwtch.im/cwtch/model"
"cwtch.im/cwtch/model/attr"
@ -366,6 +367,8 @@ func (app *application) registerHooks(profile peer.CwtchPeer) {
profile.RegisterHook(new(filesharing.Functionality))
profile.RegisterHook(new(filesharing.ImagePreviewsFunctionality))
profile.RegisterHook(new(servers.Functionality))
profile.RegisterHook(new(hybrid.ManagedGroupFunctionality))
profile.RegisterHook(new(hybrid.GroupManagerFunctionality)) // will only be activated if GroupManagerExperiment is enabled...
// Ensure that Profiles have the Most Up to Date Settings...
profile.NotifySettingsUpdate(app.settings.ReadGlobalSettings())
}
@ -404,7 +407,7 @@ func (app *application) ActivatePeerEngine(onion string) {
if err == nil {
log.Debugf("restartFlow: Creating a New Protocol Engine...")
app.engines[profile.GetOnion()] = engine
eventBus.Publish(event.NewEventList(event.ProtocolEngineCreated))
eventBus.Publish(event.NewEventList(event.ProtocolEngineCreated, event.Handle, profile.GetOnion()))
app.QueryACNStatus()
} else {
log.Errorf("corrupted profile detected for %v", onion)
@ -515,6 +518,7 @@ func (app *application) eventHandler() {
log.Infof("peer appearing offline, not launching listen threads or connecting jobs")
app.ConfigureConnections(onion, false, false, false)
} else {
log.Infof("configuring online connections for peer")
app.ConfigureConnections(onion, true, true, true)
}
}

View File

@ -95,6 +95,11 @@ func (em *manager) initialize() {
func (em *manager) Subscribe(eventType Type, queue Queue) {
em.mapMutex.Lock()
defer em.mapMutex.Unlock()
for _, sub := range em.subscribers[eventType] {
if sub == queue {
return // don't add the same queue for the same event twice...
}
}
em.subscribers[eventType] = append(em.subscribers[eventType], queue)
}

View File

@ -54,6 +54,7 @@ func (q *infiniteQueue) resize() {
// Add puts an element on the end of the queue.
func (q *infiniteQueue) Add(elem Event) {
if q.count == len(q.buf) {
q.resize()
}

View File

@ -0,0 +1,80 @@
package hybrid
import (
"crypto/ed25519"
"encoding/base32"
"encoding/json"
"strings"
)
type GroupEventType int
const (
MemberGroupIDKey = "member_group_id_key"
MemberMessageIDKey = "member_group_messge_id"
)
const (
AddMember = GroupEventType(0x1000)
RemoveMember = GroupEventType(0x2000)
RotateKey = GroupEventType(0x3000)
NewMessage = GroupEventType(0x4000)
NewClearMessage = GroupEventType(0x5000)
SyncRequest = GroupEventType(0x6000)
)
type ManageGroupEvent struct {
EventType GroupEventType `json:"t"`
Data string `json:"d"` // json encoded data
}
type AddMemberEvent struct {
Handle string `json:"h"`
}
type RemoveMemberEvent struct {
Handle string `json:"h"`
}
type RotateKeyEvent struct {
Key []byte `json:"k"`
}
type NewMessageEvent struct {
EncryptedHybridGroupMessage []byte `json:"m"`
}
type NewClearMessageEvent struct {
HybridGroupMessage HybridGroupMessage `json:"m"`
}
type SyncRequestMessage struct {
// a map of MemberGroupID: MemberMessageID
LastSeen map[int]int `json:"l"`
}
// This file contains code for the Hybrid Group / Managed Group types..
type HybridGroupMessage struct {
Author string // the authors cwtch address
MemberGroupID uint32
MemberMessageID uint32
MessageBody string
Sent uint64 // milliseconds since epoch
Signature []byte // of json-encoded content (including empty sig)
}
// AuthenticateMessage returns true if the Author of the message produced the Signature over the message
func AuthenticateMessage(message HybridGroupMessage) bool {
messageCopy := message
messageCopy.Signature = []byte{}
// Otherwise we derive the public key from the sender and check it against that.
decodedPub, err := base32.StdEncoding.DecodeString(strings.ToUpper(message.Author))
if err == nil {
data, err := json.Marshal(messageCopy)
if err == nil && len(decodedPub) >= 32 {
return ed25519.Verify(decodedPub[:32], data, message.Signature)
}
}
return false
}

View File

@ -0,0 +1,312 @@
package hybrid
import (
"crypto/rand"
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/model"
"cwtch.im/cwtch/model/attr"
"cwtch.im/cwtch/model/constants"
"cwtch.im/cwtch/peer"
"cwtch.im/cwtch/settings"
"encoding/base64"
"encoding/json"
"fmt"
"git.openprivacy.ca/openprivacy/log"
"golang.org/x/crypto/nacl/secretbox"
"math"
"math/big"
"strconv"
"time"
)
type ManagedGroupFunctionality struct {
}
func (f *ManagedGroupFunctionality) NotifySettingsUpdate(settings settings.GlobalSettings) {
}
func (f *ManagedGroupFunctionality) EventsToRegister() []event.Type {
return []event.Type{event.ProtocolEngineCreated, event.NewMessageFromPeerEngine}
}
func (f *ManagedGroupFunctionality) ExperimentsToRegister() []string {
return []string{constants.GroupsExperiment}
}
// OnEvent handles File Sharing Hooks like Manifest Received and FileDownloaded
func (f *ManagedGroupFunctionality) OnEvent(ev event.Event, profile peer.CwtchPeer) {
switch ev.EventType {
// This is where most of the magic happens for managed groups. A few notes:
// - CwtchPeer has already taken care of storing this for us, we don't need to worry about that
// - Group Managers **only** speak overlays and **always** wrap their messages in a ManageGroupEvent anything else is fast-rejected.
case event.NewMessageFromPeerEngine:
handle := ev.Data[event.RemotePeer]
ci, err := profile.FetchConversationInfo(handle)
if err != nil {
break // we don't care about unknown conversations...
}
if ci.ACL[handle].ManageGroup {
var cm model.MessageWrapper
err = json.Unmarshal([]byte(ev.Data[event.Data]), &cm)
if err != nil {
break
}
// The overlay type of this message **must** be ManageGroupEvent
if cm.Overlay == model.OverlayManageGroupEvent {
var mge ManageGroupEvent
err = json.Unmarshal([]byte(cm.Data), &mge)
if err == nil {
cid, err := profile.FetchConversationInfo(handle)
if err == nil {
f.handleEvent(profile, *cid, mge)
}
}
}
}
}
}
// handleEvent takes in a high level ManageGroupEvent message, transforms it into the proper type, and passes it on for handling
// assumes we are called after an event provided by an authorized peer (i.e. ManageGroup == true)
func (f *ManagedGroupFunctionality) handleEvent(profile peer.CwtchPeer, conversation model.Conversation, mge ManageGroupEvent) {
switch mge.EventType {
case AddMember:
var ame AddMemberEvent
err := json.Unmarshal([]byte(mge.Data), &ame)
if err == nil {
f.handleAddMemberEvent(profile, conversation, ame)
}
case RemoveMember:
var rme RemoveMemberEvent
err := json.Unmarshal([]byte(mge.Data), &rme)
if err == nil {
f.handleRemoveMemberEvent(profile, conversation, rme)
}
case NewMessage:
var nme NewMessageEvent
err := json.Unmarshal([]byte(mge.Data), &nme)
if err == nil {
f.handleNewMessageEvent(profile, conversation, nme)
}
case NewClearMessage:
var nme NewClearMessageEvent
err := json.Unmarshal([]byte(mge.Data), &nme)
if err == nil {
f.handleNewClearMessageEvent(profile, conversation, nme)
}
case RotateKey:
var rke RotateKeyEvent
err := json.Unmarshal([]byte(mge.Data), &rke)
if err == nil {
f.handleRotateKeyEvent(profile, conversation, rke)
}
}
}
// handleAddMemberEvent adds a group member to the conversation ACL
// assumes we are called after an event provided by an authorized peer (i.e. ManageGroup == true)
func (f *ManagedGroupFunctionality) handleAddMemberEvent(profile peer.CwtchPeer, conversation model.Conversation, ame AddMemberEvent) {
acl := conversation.ACL
acl[ame.Handle] = model.DefaultP2PAccessControl()
profile.UpdateConversationAccessControlList(conversation.ID, acl)
}
// handleRemoveMemberEvent removes a group member from the conversation ACL
// assumes we are called after an event provided by an authorized peer (i.e. ManageGroup == true)
func (f *ManagedGroupFunctionality) handleRemoveMemberEvent(profile peer.CwtchPeer, conversation model.Conversation, rme RemoveMemberEvent) {
acl := conversation.ACL
delete(acl, rme.Handle)
profile.UpdateConversationAccessControlList(conversation.ID, acl)
}
// handleRotateKeyEvent rotates the encryption key for a given group
// assumes we are called after an event provided by an authorized peer (i.e. ManageGroup == true)
func (f *ManagedGroupFunctionality) handleRotateKeyEvent(profile peer.CwtchPeer, conversation model.Conversation, rke RotateKeyEvent) {
keyScope := attr.LocalScope.ConstructScopedZonedPath(attr.ConversationZone.ConstructZonedPath("key"))
keyB64 := base64.StdEncoding.EncodeToString(rke.Key)
profile.SetConversationAttribute(conversation.ID, keyScope, keyB64)
}
func (f *ManagedGroupFunctionality) handleNewMessageEvent(profile peer.CwtchPeer, conversation model.Conversation, nme NewMessageEvent) {
keyScope := attr.LocalScope.ConstructScopedZonedPath(attr.ConversationZone.ConstructZonedPath("key"))
if keyB64, err := profile.GetConversationAttribute(conversation.ID, keyScope); err == nil {
key, err := base64.StdEncoding.DecodeString(keyB64)
if err != nil || len(key) != 32 {
log.Errorf("hybrid group key is corrupted")
return
}
// decrypt the message with key...
hgm, err := f.decryptMessage(key, nme.EncryptedHybridGroupMessage)
if hgm == nil || err != nil {
log.Errorf("unable to decrypt hybrid group message: %v", err)
return
}
//
}
}
func (f *ManagedGroupFunctionality) handleNewClearMessageEvent(profile peer.CwtchPeer, conversation model.Conversation, nme NewClearMessageEvent) {
hgm := nme.HybridGroupMessage
if AuthenticateMessage(hgm) {
// TODO Closed Group Membership Check
if profile.GetOnion() == hgm.Author {
// ack
signatureB64 := base64.StdEncoding.EncodeToString(hgm.Signature)
id, err := profile.GetChannelMessageBySignature(conversation.ID, 0, signatureB64)
if err == nil {
profile.UpdateMessageAttribute(conversation.ID, 0, id, constants.AttrAck, constants.True)
profile.PublishEvent(event.NewEvent(event.IndexedAcknowledgement, map[event.Field]string{event.ConversationID: strconv.Itoa(conversation.ID), event.Index: strconv.Itoa(id)}))
}
} else {
mgidstr := strconv.Itoa(int(nme.HybridGroupMessage.MemberGroupID)) // we need both MemberGroupId and MemberMessageId for attestation later on...
newmmidstr := strconv.Itoa(int(nme.HybridGroupMessage.MemberMessageID))
// Set the attributes of this message...
attr := model.Attributes{MemberGroupIDKey: mgidstr, MemberMessageIDKey: newmmidstr,
constants.AttrAuthor: hgm.Author,
constants.AttrAck: event.True,
constants.AttrSentTimestamp: time.UnixMilli(int64(hgm.Sent)).Format(time.RFC3339Nano)}
profile.InternalInsertMessage(conversation.ID, 0, hgm.Author, hgm.MessageBody, attr, hgm.Signature)
}
// TODO need to send an event here...
} else {
log.Errorf("received fraudulant hybrid message fom group")
}
}
func (f *ManagedGroupFunctionality) decryptMessage(key []byte, ciphertext []byte) (*HybridGroupMessage, error) {
if len(ciphertext) > 24 {
var decryptNonce [24]byte
copy(decryptNonce[:], ciphertext[:24])
var fixedSizeKey [32]byte
copy(fixedSizeKey[:], key[:32])
decrypted, ok := secretbox.Open(nil, ciphertext[24:], &decryptNonce, &fixedSizeKey)
if ok {
var hgm HybridGroupMessage
err := json.Unmarshal(decrypted, &hgm)
return &hgm, err
}
}
return nil, fmt.Errorf("invalid ciphertext/key error")
}
// Define a new managed group, managed by the manager...
func (f *ManagedGroupFunctionality) NewManagedGroup(profile peer.CwtchPeer, manager string) error {
// generate a truely random member id for this group in [0..2^32)
nBig, err := rand.Int(rand.Reader, big.NewInt(math.MaxUint32))
if err != nil {
return err // if there is a problem with random we want to exit now rather than have to clean up group setup...
}
ac := model.DefaultP2PAccessControl()
ac.ManageGroup = true // by setting the ManageGroup permission in this ACL we are allowing the manager to control of how this group is structured
ci, err := profile.NewContactConversation(manager, ac, true)
if err != nil {
return err
}
// enable channel 2 on this conversation (hybrid groups management channel)
key := fmt.Sprintf("channel.%d", 2)
err = profile.SetConversationAttribute(ci, attr.LocalScope.ConstructScopedZonedPath(attr.ConversationZone.ConstructZonedPath(key)), constants.True)
if err != nil {
return fmt.Errorf("could not enable channel 2 on hybrid group: %v", err) // likely a catestrophic error...fail
}
// finally, set the member group id on this group...
mgidkey := attr.LocalScope.ConstructScopedZonedPath(attr.ConversationZone.ConstructZonedPath(MemberGroupIDKey))
err = profile.SetConversationAttributeInt(ci, mgidkey, int(nBig.Uint64()))
if err != nil {
return fmt.Errorf("could not set group id on hybrid group: %v", err) // likely a catestrophic error...fail
}
return nil
}
// SendMessageToManagedGroup acts like SendMessage(ToPeer), but with a few additional bookkeeping steps for Hybrid Groups
func (f *ManagedGroupFunctionality) SendMessageToManagedGroup(profile peer.CwtchPeer, conversation int, message string) (int, error) {
mgidkey := attr.LocalScope.ConstructScopedZonedPath(attr.ConversationZone.ConstructZonedPath(MemberGroupIDKey))
mgid, err := profile.GetConversationAttributeInt(conversation, mgidkey)
if err != nil {
return -1, err
}
mmidkey := attr.LocalScope.ConstructScopedZonedPath(attr.ConversationZone.ConstructZonedPath(MemberMessageIDKey))
mmid, err := profile.GetConversationAttributeInt(conversation, mmidkey)
if err != nil {
mmid = 0 // first message
}
mmid += 1
hgm := HybridGroupMessage{
MemberGroupID: uint32(mgid),
MemberMessageID: uint32(mmid),
Sent: uint64(time.Now().UnixMilli()),
Author: profile.GetOnion(),
MessageBody: message,
Signature: []byte{}, // Leave blank so we can sign this message...
}
data, err := json.Marshal(hgm)
if err != nil {
return -1, err
}
sig, err := profile.SignMessage(data)
if err != nil {
return -1, err
}
hgm.Signature = sig
ncm := NewClearMessageEvent{
HybridGroupMessage: hgm,
}
signedData, err := json.Marshal(ncm)
if err != nil {
return -1, err
}
mgm := ManageGroupEvent{
EventType: NewClearMessage,
Data: string(signedData),
}
odata, err := json.Marshal(mgm)
if err != nil {
return -1, err
}
overlay := model.MessageWrapper{
Overlay: model.OverlayManageGroupEvent,
Data: string(odata),
}
ojson, err := json.Marshal(overlay)
if err != nil {
return -1, err
}
// send the message to the manager and update our message is string for tracking...
_, err = profile.SendMessage(conversation, string(ojson))
if err != nil {
return -1, err
}
profile.SetConversationAttributeInt(conversation, mmidkey, mmid)
// ok there is still one more thing we need to do...
// insert this message as part of our group log, for members of the group
// this exists in channel 0 of the conversation with the group manager...
mgidstr := strconv.Itoa(mgid) // we need both MemberGroupId and MemberMessageId for attestation later on...
newmmidstr := strconv.Itoa(mmid)
attr := model.Attributes{MemberGroupIDKey: mgidstr, MemberMessageIDKey: newmmidstr, constants.AttrAuthor: profile.GetOnion(), constants.AttrAck: event.False, constants.AttrSentTimestamp: time.Now().Format(time.RFC3339Nano)}
return profile.InternalInsertMessage(conversation, 0, hgm.Author, message, attr, hgm.Signature)
}
func (f *ManagedGroupFunctionality) OnContactRequestValue(profile peer.CwtchPeer, conversation model.Conversation, eventID string, path attr.ScopedZonedPath) {
// nop hybrid group conversations do not exchange contact requests
}
func (f *ManagedGroupFunctionality) OnContactReceiveValue(profile peer.CwtchPeer, conversation model.Conversation, path attr.ScopedZonedPath, value string, exists bool) {
// nop hybrid group conversations do not exchange contact requests
}

View File

@ -0,0 +1,144 @@
// This file contains all code related to how a Group Manager operates over a group.
package hybrid
import (
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/model"
"cwtch.im/cwtch/model/attr"
"cwtch.im/cwtch/model/constants"
"cwtch.im/cwtch/peer"
"cwtch.im/cwtch/settings"
"encoding/json"
"fmt"
"git.openprivacy.ca/openprivacy/log"
"strconv"
"time"
)
type GroupManagerFunctionality struct {
}
func (f *GroupManagerFunctionality) NotifySettingsUpdate(settings settings.GlobalSettings) {
}
func (f *GroupManagerFunctionality) EventsToRegister() []event.Type {
return []event.Type{event.ProtocolEngineCreated, event.NewMessageFromPeerEngine}
}
func (f *GroupManagerFunctionality) ExperimentsToRegister() []string {
return []string{constants.GroupManagerExperiment, constants.GroupsExperiment}
}
// OnEvent handles File Sharing Hooks like Manifest Received and FileDownloaded
func (f *GroupManagerFunctionality) OnEvent(ev event.Event, profile peer.CwtchPeer) {
switch ev.EventType {
// This is where most of the magic happens for managed groups. A few notes:
// - CwtchPeer has already taken care of storing this for us, we don't need to worry about that
// - Group Managers **only** speak overlays and **always** wrap their messages in a ManageGroupEvent anything else is fast-rejected.
case event.NewMessageFromPeerEngine:
log.Infof("received new message from peer: manager")
ci, err := profile.GetConversationInfo(1)
if err != nil {
log.Errorf("unknown conversation %v", err)
break // we don't care about unknown conversations...
}
var cm model.MessageWrapper
err = json.Unmarshal([]byte(ev.Data[event.Data]), &cm)
if err != nil {
log.Errorf("could not deserialize json %v", err)
break
}
// The overlay type of this message **must** be ManageGroupEvent
if cm.Overlay == model.OverlayManageGroupEvent {
var mge ManageGroupEvent
err = json.Unmarshal([]byte(cm.Data), &mge)
if err == nil {
f.handleEvent(profile, *ci, mge, ev.Data[event.Data])
}
}
}
}
// handleEvent takes in a high level ManageGroupEvent message, transforms it into the proper type, and passes it on for handling
// assumes we are called after an event provided by an authorized peer (i.e. ManageGroup == true)
func (f *GroupManagerFunctionality) handleEvent(profile peer.CwtchPeer, conversation model.Conversation, mge ManageGroupEvent, original string) {
switch mge.EventType {
case NewClearMessage:
var nme NewClearMessageEvent
err := json.Unmarshal([]byte(mge.Data), &nme)
if err == nil {
f.handleNewMessageEvent(profile, conversation, nme, original)
}
}
}
func (f *GroupManagerFunctionality) handleNewMessageEvent(profile peer.CwtchPeer, conversation model.Conversation, nme NewClearMessageEvent, original string) {
log.Infof("handling new clear message event")
hgm := nme.HybridGroupMessage
if AuthenticateMessage(hgm) {
log.Infof("authenticated message")
// TODO Closed Group Membership Check
mgidstr := strconv.Itoa(int(nme.HybridGroupMessage.MemberGroupID)) // we need both MemberGroupId and MemberMessageId for attestation later on...
newmmidstr := strconv.Itoa(int(nme.HybridGroupMessage.MemberMessageID))
// Set the attributes of this message...
attr := model.Attributes{MemberGroupIDKey: mgidstr, MemberMessageIDKey: newmmidstr,
constants.AttrAuthor: hgm.Author,
constants.AttrAck: event.True,
constants.AttrSentTimestamp: time.UnixMilli(int64(hgm.Sent)).Format(time.RFC3339Nano)}
profile.InternalInsertMessage(conversation.ID, 0, hgm.Author, hgm.MessageBody, attr, hgm.Signature)
// forward the message to everyone who the server has added as a contact
// TODO: filter by conversation.ACL
allConversations, _ := profile.FetchConversations()
for _, ci := range allConversations {
log.Infof("forwarding group message to: %v", ci.Handle)
profile.SendMessage(ci.ID, original)
}
} else {
log.Errorf("received fraudulant hybrid message fom group")
}
}
// Establish a new Managed Group
func (f *GroupManagerFunctionality) ManageNewGroup(profile peer.CwtchPeer) (int, error) {
ac := model.DefaultP2PAccessControl()
// by setting the ManageGroup permission in this ACL we are allowing the manage to
// take control of how this group is structured, see OnEvent above...
ac.ManageGroup = true
// note: a manager can only manage one group. This will always be true and has a few benefits
// and downsides.
// The main downside is that it requires a new manager per group (and thus an onion service per group)
// However, it means that we can lean on p2p functionality like profile images / metadata / name
// etc. for group metadata and effectively get that for-free in the client.
handle := fmt.Sprintf("managed:%d", 000)
acl := model.AccessControlList{}
acl[profile.GetOnion()] = ac
ci, err := profile.NewConversation(handle, acl)
if err != nil {
return -1, err
}
return ci, nil
}
// AddHybridContact is a wrapper arround NewContactConversation which sets the contact
// up for Hybrid Group channel messages...
func (f *GroupManagerFunctionality) AddHybridContact(profile peer.CwtchPeer, handle string) error {
ac := model.DefaultP2PAccessControl()
ac.ManageGroup = false
ci, err := profile.NewContactConversation(handle, ac, true)
if err != nil {
return err
}
// enable channel 2 on this conversation (hybrid groups management channel)
key := fmt.Sprintf("channel.%d", 2)
profile.SetConversationAttribute(ci, attr.LocalScope.ConstructScopedZonedPath(attr.ConversationZone.ConstructZonedPath(key)), constants.True)
return nil
}
func (f *GroupManagerFunctionality) OnContactRequestValue(profile peer.CwtchPeer, conversation model.Conversation, eventID string, path attr.ScopedZonedPath) {
// nop hybrid group conversations do not exchange contact requests
}
func (f *GroupManagerFunctionality) OnContactReceiveValue(profile peer.CwtchPeer, conversation model.Conversation, path attr.ScopedZonedPath, value string, exists bool) {
// nop hybrid group conversations do not exchange contact requests
}

View File

@ -20,6 +20,9 @@ const (
// LegacyGroupZone for attributes related to legacy group experiment
LegacyGroupZone = Zone("legacygroup")
// ConversationZone for attributes related to structure of the conversation
ConversationZone = Zone("conversation")
// FilesharingZone for attributes related to file sharing
FilesharingZone = Zone("filesharing")
@ -65,6 +68,8 @@ func ParseZone(path string) (Zone, string) {
return ServerKeyZone, parts[1]
case ServerZone:
return ServerZone, parts[1]
case ConversationZone:
return ConversationZone, parts[1]
default:
return UnknownZone, parts[1]
}

View File

@ -2,6 +2,9 @@ package constants
const GroupsExperiment = "tapir-groups-experiment"
// for now only used by bots, do not expose in UI..for now
const GroupManagerExperiment = "manage-group-experiment"
// FileSharingExperiment Allows file sharing
const FileSharingExperiment = "filesharing"

View File

@ -4,7 +4,9 @@ import (
"cwtch.im/cwtch/model/attr"
"cwtch.im/cwtch/model/constants"
"encoding/json"
"fmt"
"git.openprivacy.ca/openprivacy/log"
"strings"
"time"
)
@ -23,6 +25,8 @@ type AccessControl struct {
// Extension Related Permissions
ShareFiles bool // Allows a handle to share files to a conversation
RenderImages bool // Indicates that certain filetypes should be autodownloaded and rendered when shared by this contact
ManageGroup bool // Allows a handle to actively manage the group
}
// DefaultP2PAccessControl defaults to a semi-trusted peer with no access to special extensions.
@ -85,6 +89,22 @@ func (ci *Conversation) GetAttribute(scope attr.Scope, zone attr.Zone, key strin
return "", false
}
// GetPeerAC returns a suitable Access Control object for a the given peer conversation
// If this is called for a group conversation, this method will error and return a safe default AC.
func (ci *Conversation) HasChannel(requestedChannel int) bool {
if requestedChannel == 0 {
return true
}
if requestedChannel == 1 {
return false // channel 1 is mapped to channel 0 for backwards compatibility
}
key := fmt.Sprintf("channel.%d", requestedChannel)
if value, exists := ci.Attributes[attr.LocalScope.ConstructScopedZonedPath(attr.ConversationZone.ConstructZonedPath(key)).ToString()]; exists {
return value == constants.True
}
return false
}
// GetPeerAC returns a suitable Access Control object for a the given peer conversation
// If this is called for a group conversation, this method will error and return a safe default AC.
func (ci *Conversation) GetPeerAC() AccessControl {
@ -111,6 +131,11 @@ func (ci *Conversation) IsServer() bool {
return false
}
// IsManaged is a helper attribute that identifies whether a conversation is managed
func (ci *Conversation) IsManaged() bool {
return strings.HasPrefix(ci.Handle, "managed")
}
// ServerSyncProgress is only valid during a server being in the AUTHENTICATED state and therefor in the syncing process
// it returns a double (0-1) representing the estimated progress of the syncing
func (ci *Conversation) ServerSyncProgress() float64 {

View File

@ -6,6 +6,13 @@ type MessageWrapper struct {
Data string `json:"d"`
}
// Overlay Identifiers now have a dual strucutre, their full id defines the overlay
// type, and thus the encoding of the Data.
// But the last 8 bits of the overlay now also encode the **channel* that the overlay is sent
// to. This is needed for Hybrid Groups where there exists both a main channel and the
// management channel (whose messages we don't want to show up in a default fetch).
// To support backwards compatibility, any overlay id less than 0x300 to resolve to 0
// OverlayChat is the canonical identifier for chat overlays
const OverlayChat = 1
@ -17,3 +24,6 @@ const OverlayInviteGroup = 101
// OverlayFileSharing is the canonical identifier for the file sharing overlay
const OverlayFileSharing = 200
// ManageGroupEvent is the canonical identifier for the manage group overlay
const OverlayManageGroupEvent = 0x302

View File

@ -421,6 +421,36 @@ func (cp *cwtchPeer) SetScopedZonedAttribute(scope attr.Scope, zone attr.Zone, k
}
}
func (cp *cwtchPeer) SignMessage(blob []byte) ([]byte, error) {
privateKey, err := cp.storage.LoadProfileKeyValue(TypePrivateKey, "Ed25519PrivateKey")
if err != nil {
log.Errorf("error loading private key from storage")
return nil, err
}
publicKey, err := cp.storage.LoadProfileKeyValue(TypePublicKey, "Ed25519PublicKey")
if err != nil {
log.Errorf("error loading public key from storage")
return nil, err
}
identity := primitives.InitializeIdentity("", (*ed25519.PrivateKey)(&privateKey), (*ed25519.PublicKey)(&publicKey))
return identity.Sign(blob), nil
}
func (cp *cwtchPeer) InternalInsertMessage(conversation int, channel int, author string, body string, attrs model.Attributes, signature []byte) (int, error) {
signatureB64 := base64.StdEncoding.EncodeToString(signature)
return cp.storage.InsertMessage(conversation, channel, body, attrs, signatureB64, model.CalculateContentHash(author, body))
}
func (cp *cwtchPeer) resolveChannel(overlay int) int {
requestedChannel := overlay & 0xFF
// Legacy mapping of old chat/group overlays to channel 0
if overlay < 0x300 {
requestedChannel = 0
}
return requestedChannel
}
// SendMessage is a higher level that merges sending messages to contacts and group handles
// If you try to send a message to a handle that doesn't exist, malformed or an incorrect type then
// this function will error
@ -436,8 +466,22 @@ func (cp *cwtchPeer) SendMessage(conversation int, message string) (int, error)
ev := event.NewEvent(event.SendMessageToPeer, map[event.Field]string{event.ConversationID: strconv.Itoa(conversationInfo.ID), event.RemotePeer: conversationInfo.Handle, event.Data: message})
onion, _ := cp.storage.LoadProfileKeyValue(TypeAttribute, attr.PublicScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Onion)).ToString())
var cm model.MessageWrapper
err = json.Unmarshal([]byte(message), &cm)
// we are now explictly rejecting invalidly encoded messages...
if err != nil {
return -1, err
}
requestedChannel := cp.resolveChannel(cm.Overlay)
// if this channel has not been registered for a particular conversation
// then default to channel = 0;
if conversationInfo.HasChannel(requestedChannel) {
requestedChannel = 0
}
// For p2p messages we store the event id of the message as the "signature" we can then look this up in the database later for acks
id, err := cp.storage.InsertMessage(conversationInfo.ID, 0, message, model.Attributes{constants.AttrAuthor: string(onion), constants.AttrAck: event.False, constants.AttrSentTimestamp: time.Now().Format(time.RFC3339Nano)}, ev.EventID, model.CalculateContentHash(string(onion), message))
id, err := cp.storage.InsertMessage(conversationInfo.ID, requestedChannel, message, model.Attributes{constants.AttrAuthor: string(onion), constants.AttrAck: event.False, constants.AttrSentTimestamp: time.Now().Format(time.RFC3339Nano)}, ev.EventID, model.CalculateContentHash(string(onion), message))
if err != nil {
return -1, err
}
@ -700,6 +744,14 @@ func (cp *cwtchPeer) ImportGroup(exportedInvite string) (int, error) {
return groupConversationID, err
}
// NewConversation create a new multi-peer conversation.
func (cp *cwtchPeer) NewConversation(handle string, acl model.AccessControlList) (int, error) {
conversationID, err := cp.storage.NewConversation(handle, model.Attributes{event.SaveHistoryKey: event.DeleteHistoryDefault}, acl, true)
cp.SetConversationAttribute(conversationID, attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.ACLVersion)), constants.ACLVersionOne)
cp.SetConversationAttribute(conversationID, attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.AttrLastConnectionTime)), time.Now().Format(time.RFC3339Nano))
return conversationID, err
}
// NewContactConversation create a new p2p conversation with the given acl applied to the handle.
func (cp *cwtchPeer) NewContactConversation(handle string, acl model.AccessControl, accepted bool) (int, error) {
cp.mutex.Lock()
@ -717,6 +769,8 @@ func (cp *cwtchPeer) NewContactConversation(handle string, acl model.AccessContr
// UpdateConversationAccessControlList is a genric ACL update method
func (cp *cwtchPeer) UpdateConversationAccessControlList(id int, acl model.AccessControlList) error {
// set the ACL version to the most recent version
cp.SetConversationAttribute(id, attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.ACLVersion)), constants.ACLVersionOne)
return cp.storage.SetConversationACL(id, acl)
}
@ -880,6 +934,26 @@ func (cp *cwtchPeer) GetConversationAttribute(id int, path attr.ScopedZonedPath)
return val, nil
}
// SetConversationAttributeInt sets the conversation attribute at path to an integer value
func (cp *cwtchPeer) SetConversationAttributeInt(id int, path attr.ScopedZonedPath, value int) error {
strvalue := strconv.Itoa(value)
return cp.storage.SetConversationAttribute(id, path, strvalue)
}
// GetConversationAttributeInt is a method for retrieving an integer value of a given conversation
func (cp *cwtchPeer) GetConversationAttributeInt(id int, path attr.ScopedZonedPath) (int, error) {
ci, err := cp.storage.GetConversation(id)
if err != nil {
return 0, err
}
val, exists := ci.Attributes[path.ToString()]
if !exists {
return 0, fmt.Errorf("%v does not exist for conversation %v", path.ToString(), id)
}
intvalue, err := strconv.Atoi(val)
return intvalue, err
}
// GetChannelMessage returns a message from a conversation channel referenced by the absolute ID.
// Note: This should note be used to index a list as the ID is not expected to be tied to absolute position
// in the table (e.g. deleted messages, expired messages, etc.)
@ -1414,6 +1488,8 @@ func (cp *cwtchPeer) StartConnections(doPeers, doServers bool) {
if conversation.model.IsServer() && cp.IsFeatureEnabled(constants.GroupsExperiment) {
log.Debugf(" QueueJoinServer(%v)", conversation.model.Handle)
cp.QueueJoinServer(conversation.model.Handle)
} else if conversation.model.IsManaged() {
// do nothing...
} else {
log.Debugf(" QueuePeerWithOnion(%v)", conversation.model.Handle)
if conversation.model.GetPeerAC().AutoConnect {
@ -1482,9 +1558,23 @@ func (cp *cwtchPeer) storeMessage(handle string, message string, sent time.Time)
}
}
var cm model.MessageWrapper
err = json.Unmarshal([]byte(message), &cm)
// we are now explictly rejecting invalidly encoded messages...
if err != nil {
return -1, err
}
requestedChannel := cp.resolveChannel(cm.Overlay)
// if this channel has not been registered for a particular conversation
// then default to channel = 0;
if ci.HasChannel(requestedChannel) {
requestedChannel = 0
}
// Generate a random number and use it as the signature
signature := event.GetRandNumber().String()
return cp.storage.InsertMessage(ci.ID, 0, message, model.Attributes{constants.AttrAuthor: handle, constants.AttrAck: event.True, constants.AttrSentTimestamp: sent.Format(time.RFC3339Nano)}, signature, model.CalculateContentHash(handle, message))
return cp.storage.InsertMessage(ci.ID, requestedChannel, message, model.Attributes{constants.AttrAuthor: handle, constants.AttrAck: event.True, constants.AttrSentTimestamp: sent.Format(time.RFC3339Nano)}, signature, model.CalculateContentHash(handle, message))
}
// eventHandler process events from other subsystems
@ -1499,6 +1589,7 @@ func (cp *cwtchPeer) eventHandler() {
onion, _ := cp.storage.LoadProfileKeyValue(TypeAttribute, attr.PublicScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Onion)).ToString())
log.Infof("Protocol engine for %s has stopped listening: %v", onion, ev.Data[event.Error])
cp.mutex.Unlock()
cp.processExtensionsEvent(ev)
case event.EncryptedGroupMessage:
// If successful, a side effect is the message is added to the group's timeline
@ -1543,6 +1634,7 @@ func (cp *cwtchPeer) eventHandler() {
case event.NewMessageFromPeerEngine: //event.TimestampReceived, event.RemotePeer, event.Data
ts, _ := time.Parse(time.RFC3339Nano, ev.Data[event.TimestampReceived])
id, err := cp.storeMessage(ev.Data[event.RemotePeer], ev.Data[event.Data], ts)
cp.processExtensionsEvent(ev)
if err == nil {
// Republish as NewMessageFromPeer
ev.EventType = event.NewMessageFromPeer
@ -1671,22 +1763,7 @@ func (cp *cwtchPeer) eventHandler() {
cp.mutex.Unlock()
}
}
// Safe Access to Extensions
cp.extensionLock.Lock()
for _, extension := range cp.extensions {
log.Debugf("checking extension...%v", extension)
// check if the current map of experiments satisfies the extension requirements
if !cp.checkExtensionExperiment(extension) {
log.Debugf("skipping extension (%s) ..not all experiments satisfied", extension)
continue
}
if cp.checkEventExperiment(extension, ev.EventType) {
extension.extension.OnEvent(ev, cp)
}
}
cp.extensionLock.Unlock()
cp.processExtensionsEvent(ev)
case event.ServerStateChange:
cp.mutex.Lock()
prevState := cp.state[ev.Data[event.GroupServer]]
@ -1750,7 +1827,7 @@ func (cp *cwtchPeer) eventHandler() {
// check if the current map of experiments satisfies the extension requirements
if !cp.checkExtensionExperiment(extension) {
log.Debugf("skipping extension (%s) ..not all experiments satisfied", extension)
log.Debugf("skipping extension (%v) ..not all experiments satisfied", extension)
if cp.checkEventExperiment(extension, ev.EventType) {
// If this experiment was enabled...we might have processed this event...
// To avoid flagging an error later on in this method we set processed to true.
@ -1796,6 +1873,24 @@ func (cp *cwtchPeer) checkExtensionExperiment(hook ProfileHook) bool {
return true
}
func (cp *cwtchPeer) processExtensionsEvent(ev event.Event) {
cp.extensionLock.Lock()
for _, extension := range cp.extensions {
// check if the current map of experiments satisfies the extension requirements
if !cp.checkExtensionExperiment(extension) {
log.Debugf("skipping extension (%v) ..not all experiments satisfied", extension)
continue
}
// if the extension is registered for this event type then process
if _, contains := extension.events[ev.EventType]; contains {
extension.extension.OnEvent(ev, cp)
}
}
cp.extensionLock.Unlock()
}
// attemptInsertOrAcknowledgeLegacyGroupConversation is a convenience method that looks up the conversation
// by the given handle and attempts to mark the message as acknowledged. returns error on failure
// to either find the contact or the associated message

View File

@ -114,7 +114,13 @@ type CwtchPeer interface {
ImportBundle(string) error
EnhancedImportBundle(string) string
SignMessage(blob []byte) ([]byte, error)
// Used for Internal Bookkeeping by Extensions, **do not expose in autobindings**
InternalInsertMessage(conversation int, channel int, author string, body string, attributes model.Attributes, signature []byte) (int, error)
// New Unified Conversation Interfaces
NewConversation(handle string, acl model.AccessControlList) (int, error)
NewContactConversation(handle string, acl model.AccessControl, accepted bool) (int, error)
FetchConversations() ([]*model.Conversation, error)
ArchiveConversation(conversation int)
@ -135,12 +141,15 @@ type CwtchPeer interface {
SetConversationAttribute(conversation int, path attr.ScopedZonedPath, value string) error
GetConversationAttribute(conversation int, path attr.ScopedZonedPath) (string, error)
SetConversationAttributeInt(conversation int, path attr.ScopedZonedPath, value int) error
GetConversationAttributeInt(conversation int, path attr.ScopedZonedPath) (int, error)
DeleteConversation(conversation int) error
// New Unified Conversation Channel Interfaces
GetChannelMessage(conversation int, channel int, id int) (string, model.Attributes, error)
GetChannelMessageCount(conversation int, channel int) (int, error)
GetChannelMessageByContentHash(conversation int, channel int, contenthash string) (int, error)
GetChannelMessageBySignature(conversationID int, channelID int, signature string) (int, error)
GetMostRecentMessages(conversation int, channel int, offset int, limit uint) ([]model.ConversationMessage, error)
UpdateMessageAttribute(conversation int, channel int, id int, key string, value string) error
SearchConversations(pattern string) string

View File

@ -34,26 +34,6 @@ var (
carolLines = []string{"Howdy, thanks!"}
)
func waitForConnection(t *testing.T, peer peer.CwtchPeer, addr string, target connections.ConnectionState) {
peerName, _ := peer.GetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.Name)
for {
log.Infof("%v checking connection...\n", peerName)
state := peer.GetPeerState(addr)
log.Infof("Waiting for Peer %v to %v - state: %v\n", peerName, addr, connections.ConnectionStateName[state])
if state == connections.FAILED {
t.Fatalf("%v could not connect to %v", peer.GetOnion(), addr)
}
if state != target {
log.Infof("peer %v %v waiting connect %v, currently: %v\n", peerName, peer.GetOnion(), addr, connections.ConnectionStateName[state])
time.Sleep(time.Second * 5)
continue
} else {
log.Infof("peer %v %v CONNECTED to %v\n", peerName, peer.GetOnion(), addr)
break
}
}
}
func waitForRetVal(peer peer.CwtchPeer, convId int, szp attr.ScopedZonedPath) {
for {
_, err := peer.GetConversationAttribute(convId, szp)
@ -236,10 +216,10 @@ func TestCwtchPeerIntegration(t *testing.T) {
t.Fatalf("Alice password did not change...")
}
waitForConnection(t, alice, bob.GetOnion(), connections.AUTHENTICATED)
waitForConnection(t, alice, carol.GetOnion(), connections.AUTHENTICATED)
waitForConnection(t, bob, alice.GetOnion(), connections.AUTHENTICATED)
waitForConnection(t, carol, alice.GetOnion(), connections.AUTHENTICATED)
WaitForConnection(t, alice, bob.GetOnion(), connections.AUTHENTICATED)
WaitForConnection(t, alice, carol.GetOnion(), connections.AUTHENTICATED)
WaitForConnection(t, bob, alice.GetOnion(), connections.AUTHENTICATED)
WaitForConnection(t, carol, alice.GetOnion(), connections.AUTHENTICATED)
log.Infof("Alice and Bob getVal public.name...")
@ -323,9 +303,9 @@ func TestCwtchPeerIntegration(t *testing.T) {
}
log.Infof("Waiting for alice to join server...")
waitForConnection(t, alice, ServerAddr, connections.SYNCED)
WaitForConnection(t, alice, ServerAddr, connections.SYNCED)
log.Infof("Waiting for Bob to join connect to group server...")
waitForConnection(t, bob, ServerAddr, connections.SYNCED)
WaitForConnection(t, bob, ServerAddr, connections.SYNCED)
// 1 = Alice
// 2 = Server
@ -351,7 +331,7 @@ func TestCwtchPeerIntegration(t *testing.T) {
if len(cachedTokens) > (usedTokens + len(carolLines)) {
carol.StoreCachedTokens(ServerAddr, cachedTokens[usedTokens:usedTokens+len(carolLines)])
}
waitForConnection(t, carol, ServerAddr, connections.SYNCED)
WaitForConnection(t, carol, ServerAddr, connections.SYNCED)
numGoRoutinesPostCarolConnect := runtime.NumGoroutine()
// Check Alice Timeline

View File

@ -0,0 +1,217 @@
package testing
import (
"crypto/rand"
app2 "cwtch.im/cwtch/app"
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/functionality/hybrid"
"cwtch.im/cwtch/model/constants"
"cwtch.im/cwtch/peer"
"cwtch.im/cwtch/protocol/connections"
"encoding/base64"
"fmt"
"git.openprivacy.ca/openprivacy/connectivity/tor"
"git.openprivacy.ca/openprivacy/log"
_ "github.com/mutecomm/go-sqlcipher/v4"
mrand "math/rand"
"os"
"path"
"path/filepath"
"runtime"
"runtime/pprof"
"testing"
"time"
)
func TestHyrbidGroupIntegration(t *testing.T) {
os.RemoveAll("./storage")
os.RemoveAll("./managerstorage")
// Goroutine Monitoring Start..
numGoRoutinesStart := runtime.NumGoroutine()
log.AddEverythingFromPattern("connectivity")
log.SetLevel(log.LevelInfo)
log.ExcludeFromPattern("connection/connection")
log.ExcludeFromPattern("outbound/3dhauthchannel")
log.ExcludeFromPattern("event/eventmanager")
log.ExcludeFromPattern("tapir")
os.Mkdir("tordir", 0700)
dataDir := path.Join("tordir", "tor")
os.MkdirAll(dataDir, 0700)
// we don't need real randomness for the port, just to avoid a possible conflict...
socksPort := mrand.Intn(1000) + 9051
controlPort := mrand.Intn(1000) + 9052
// generate a random password
key := make([]byte, 64)
_, err := rand.Read(key)
if err != nil {
panic(err)
}
useCache := os.Getenv("TORCACHE") == "true"
torDataDir := ""
if useCache {
log.Infof("using tor cache")
torDataDir = filepath.Join(dataDir, "data-dir-torcache")
os.MkdirAll(torDataDir, 0700)
} else {
log.Infof("using clean tor data dir")
if torDataDir, err = os.MkdirTemp(dataDir, "data-dir-"); err != nil {
t.Fatalf("could not create data dir")
}
}
tor.NewTorrc().WithSocksPort(socksPort).WithOnionTrafficOnly().WithHashedPassword(base64.StdEncoding.EncodeToString(key)).WithControlPort(controlPort).Build("tordir/tor/torrc")
acn, err := tor.NewTorACNWithAuth("./tordir", path.Join("..", "tor"), torDataDir, controlPort, tor.HashedPasswordAuthenticator{Password: base64.StdEncoding.EncodeToString(key)})
if err != nil {
t.Fatalf("Could not start Tor: %v", err)
}
log.Infof("Waiting for tor to bootstrap...")
acn.WaitTillBootstrapped()
defer acn.Close()
// ***** Cwtch Server management *****
app := app2.NewApp(acn, "./storage", app2.LoadAppSettings("./storage"))
managerApp := app2.NewApp(acn, "./managerstorage", app2.LoadAppSettings("./managerstorage"))
// ***** cwtchPeer setup *****
// Turn on Groups Experiment...
settings := app.ReadSettings()
settings.ExperimentsEnabled = true
settings.Experiments[constants.GroupsExperiment] = true
settings.Experiments[constants.GroupManagerExperiment] = false
app.UpdateSettings(settings)
// Create a Manager App that has the Group Manager Experiment Enabled....
managerSettings := managerApp.ReadSettings()
managerSettings.ExperimentsEnabled = true
managerSettings.Experiments[constants.GroupsExperiment] = true
managerSettings.Experiments[constants.GroupManagerExperiment] = true
managerApp.UpdateSettings(managerSettings)
alice := MakeProfile(app, "Alice")
bob := MakeProfile(app, "Bob")
manager := MakeProfile(managerApp, "Manager")
waitTime := time.Duration(60) * time.Second
log.Infof("** Waiting for Alice, Bob, and Carol to register their onion hidden service on the network... (%v)\n", waitTime)
time.Sleep(waitTime)
log.Infof("** Wait Done!")
// Ok Lets Start By Creating a Hybrid Group...
hgmf := hybrid.GroupManagerFunctionality{}
ci, err := hgmf.ManageNewGroup(manager)
if err != nil {
t.Fatalf("could not create hybrid group: %v", err)
}
log.Infof("created a hybrid group: %d. moving onto adding hybrid contacts...", ci)
err = hgmf.AddHybridContact(manager, alice.GetOnion())
if err != nil {
t.Fatalf("could not create hybrid contact (alice): %v", err)
}
err = hgmf.AddHybridContact(manager, bob.GetOnion())
if err != nil {
t.Fatalf("could not create hybrid contact (bob): %v", err)
}
// Now we can allow alice, bob and carol to create a new hybrid group...
log.Infof("now we can allow alice bob and carol to join the hybrid group")
mgf := hybrid.ManagedGroupFunctionality{}
err = mgf.NewManagedGroup(alice, manager.GetOnion())
if err != nil {
t.Fatalf("could not create hybrid group contact (carol): %v", err)
}
alice.PeerWithOnion(manager.GetOnion()) // explictly trigger a peer request
err = mgf.NewManagedGroup(bob, manager.GetOnion())
if err != nil {
t.Fatalf("could not create hybrid group contact (carol): %v", err)
}
bob.PeerWithOnion(manager.GetOnion())
log.Infof("waiting for alice and manager to connect")
WaitForConnection(t, alice, manager.GetOnion(), connections.AUTHENTICATED)
log.Infof("waiting for bob and manager to connect")
WaitForConnection(t, bob, manager.GetOnion(), connections.AUTHENTICATED)
// at this pont we should be able to send messages to the group, and receive them in the timeline
log.Infof("sending message to group")
_, err = mgf.SendMessageToManagedGroup(alice, 1, "hello everyone!!!")
if err != nil {
t.Fatalf("hybrid group sending failed... %v", err)
}
time.Sleep(time.Second * 10)
bobMessages, err := bob.GetMostRecentMessages(1, 0, 0, 1)
if err != nil || len(bobMessages) != 1 {
t.Fatalf("hybrid group receipt failed... %v", err)
}
if bobMessages[0].Body != "hello everyone!!!" {
t.Fatalf("hybrid group receipt failed...message does not match")
}
aliceMessages, err := alice.GetMostRecentMessages(1, 0, 0, 1)
if err != nil || len(aliceMessages) != 1 {
t.Fatalf("hybrid group receipt failed... %v", err)
}
if aliceMessages[0].Attr[constants.AttrAck] != constants.True {
t.Fatalf("hybrid group receipt failed...alice's message was not ack'd")
}
// Time to Clean Up....
log.Infof("Shutting down Alice...")
app.ShutdownPeer(alice.GetOnion())
time.Sleep(time.Second * 3)
log.Infof("Shutting down Bob...")
app.ShutdownPeer(bob.GetOnion())
time.Sleep(time.Second * 3)
log.Infof("Shutting fown Manager...")
managerApp.ShutdownPeer(manager.GetOnion())
time.Sleep(time.Second * 3)
log.Infof("Shutting down apps...")
log.Infof("app Shutdown: %v\n", runtime.NumGoroutine())
app.Shutdown()
managerApp.Shutdown()
time.Sleep(2 * time.Second)
log.Infof("Done shutdown: %v\n", runtime.NumGoroutine())
log.Infof("Shutting down ACN...")
acn.Close()
time.Sleep(time.Second * 60) // the network status / heartbeat plugin might keep goroutines alive for a minute before killing them
numGoRoutinesPostAppShutdown := runtime.NumGoroutine()
// Printing out the current goroutines
// Very useful if we are leaking any.
pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
fmt.Println("")
if numGoRoutinesStart != numGoRoutinesPostAppShutdown {
t.Errorf("Number of GoRoutines at start (%v) does not match number of goRoutines after cleanup of peers and servers (%v), clean up failed, v detected!", numGoRoutinesStart, numGoRoutinesPostAppShutdown)
}
}
func MakeProfile(application app2.Application, name string) peer.CwtchPeer {
application.CreateProfile(name, "asdfasdf", true)
p := app2.WaitGetPeer(application, name)
application.ConfigureConnections(p.GetOnion(), true, true, false)
log.Infof("%s created: %s", name, p.GetOnion())
// bob.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Bob") <- This is now done automatically by ProfileValueExtension, keeping this here for clarity
p.AutoHandleEvents([]event.Type{event.PeerStateChange, event.ServerStateChange, event.NewGroupInvite, event.NewRetValMessageFromPeer})
return p
}

32
testing/utils.go Normal file
View File

@ -0,0 +1,32 @@
package testing
import (
"cwtch.im/cwtch/model/attr"
"cwtch.im/cwtch/model/constants"
"cwtch.im/cwtch/peer"
"cwtch.im/cwtch/protocol/connections"
"git.openprivacy.ca/openprivacy/log"
_ "github.com/mutecomm/go-sqlcipher/v4"
"testing"
"time"
)
func WaitForConnection(t *testing.T, peer peer.CwtchPeer, addr string, target connections.ConnectionState) {
peerName, _ := peer.GetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.Name)
for {
log.Infof("%v checking connection...\n", peerName)
state := peer.GetPeerState(addr)
log.Infof("Waiting for Peer %v to %v - state: %v\n", peerName, addr, connections.ConnectionStateName[state])
if state == connections.FAILED {
t.Fatalf("%v could not connect to %v", peer.GetOnion(), addr)
}
if state != target {
log.Infof("peer %v %v waiting connect %v, currently: %v\n", peerName, peer.GetOnion(), addr, connections.ConnectionStateName[state])
time.Sleep(time.Second * 5)
continue
} else {
log.Infof("peer %v %v CONNECTED to %v\n", peerName, peer.GetOnion(), addr)
break
}
}
}