Managed Groups First Cut
continuous-integration/drone/pr Build is failing Details

This commit is contained in:
Sarah Jamie Lewis 2024-05-06 10:47:20 -07:00
parent a7b885166a
commit e8f11cc329
15 changed files with 1144 additions and 33 deletions

View File

@ -1,10 +1,16 @@
package app
import (
"os"
path "path/filepath"
"strconv"
"sync"
"cwtch.im/cwtch/app/plugins"
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/extensions"
"cwtch.im/cwtch/functionality/filesharing"
"cwtch.im/cwtch/functionality/hybrid"
"cwtch.im/cwtch/functionality/servers"
"cwtch.im/cwtch/model"
"cwtch.im/cwtch/model/attr"
@ -15,10 +21,6 @@ import (
"cwtch.im/cwtch/storage"
"git.openprivacy.ca/openprivacy/connectivity"
"git.openprivacy.ca/openprivacy/log"
"os"
path "path/filepath"
"strconv"
"sync"
)
type application struct {
@ -367,6 +369,8 @@ func (app *application) registerHooks(profile peer.CwtchPeer) {
profile.RegisterHook(new(filesharing.Functionality))
profile.RegisterHook(new(filesharing.ImagePreviewsFunctionality))
profile.RegisterHook(new(servers.Functionality))
profile.RegisterHook(new(hybrid.ManagedGroupFunctionality))
profile.RegisterHook(new(hybrid.GroupManagerFunctionality)) // will only be activated if GroupManagerExperiment is enabled...
// Ensure that Profiles have the Most Up to Date Settings...
profile.NotifySettingsUpdate(app.settings.ReadGlobalSettings())
}

View File

@ -1,7 +1,9 @@
package extensions
import (
"slices"
"strconv"
"time"
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/model"
@ -39,8 +41,29 @@ func (soe SendWhenOnlineExtension) OnEvent(ev event.Event, profile peer.CwtchPee
if err == nil {
// if we have re-authenticated with thie peer then request their profile image...
if connections.ConnectionStateToType()[ev.Data[event.ConnectionState]] == connections.AUTHENTICATED {
log.Infof("Sending Offline Messages to %s", ci.Handle)
// Check the last 100 messages, if any of them are pending, then send them now...
messsages, _ := profile.GetMostRecentMessages(ci.ID, 0, 0, uint(100))
slices.Reverse(messsages)
for _, message := range messsages {
if message.Attr[constants.AttrAck] == constants.False {
sent, timeparseerr := time.Parse(time.RFC3339, message.Attr[constants.AttrSentTimestamp])
if timeparseerr != nil {
continue
}
if time.Since(sent) > time.Hour*24*7 {
continue
}
body := message.Body
ev := event.NewEvent(event.SendMessageToPeer, map[event.Field]string{event.ConversationID: strconv.Itoa(ci.ID), event.RemotePeer: ci.Handle, event.Data: body})
ev.EventID = message.Signature // we need this ensure that we correctly ack this in the db when it comes back
// TODO: The EventBus is becoming very noisy...we may want to consider a one-way shortcut to Engine i.e. profile.Engine.SendMessageToPeer
log.Debugf("resending message that was sent when peer was offline")
profile.PublishEvent(ev)
}
}
messsages, _ = profile.GetMostRecentMessages(ci.ID, 2, 0, uint(100))
slices.Reverse(messsages)
for _, message := range messsages {
if message.Attr[constants.AttrAck] == constants.False {
body := message.Body

View File

@ -0,0 +1,106 @@
package hybrid
import (
"crypto/ed25519"
"encoding/base32"
"encoding/json"
"fmt"
"strings"
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/model"
"cwtch.im/cwtch/model/attr"
"git.openprivacy.ca/openprivacy/log"
)
const ManagedGroupOpen = "managed-group-open"
type GroupEventType int
const (
MemberGroupIDKey = "member_group_id_key"
MemberMessageIDKey = "member_group_messge_id"
)
const (
AddMember = GroupEventType(0x1000)
RemoveMember = GroupEventType(0x2000)
RotateKey = GroupEventType(0x3000)
NewMessage = GroupEventType(0x4000)
NewClearMessage = GroupEventType(0x5000)
SyncRequest = GroupEventType(0x6000)
)
type ManageGroupEvent struct {
EventType GroupEventType `json:"t"`
Data string `json:"d"` // json encoded data
}
type AddMemberEvent struct {
Handle string `json:"h"`
}
type RemoveMemberEvent struct {
Handle string `json:"h"`
}
type RotateKeyEvent struct {
Key []byte `json:"k"`
}
type NewMessageEvent struct {
EncryptedHybridGroupMessage []byte `json:"m"`
}
type NewClearMessageEvent struct {
HybridGroupMessage HybridGroupMessage `json:"m"`
}
type SyncRequestMessage struct {
// a map of MemberGroupID: MemberMessageID
LastSeen map[int]int `json:"l"`
}
// This file contains code for the Hybrid Group / Managed Group types..
type HybridGroupMessage struct {
Author string // the authors cwtch address
MemberGroupID uint32
MemberMessageID uint32
MessageBody string
Sent uint64 // milliseconds since epoch
Signature []byte // of json-encoded content (including empty sig)
}
// AuthenticateMessage returns true if the Author of the message produced the Signature over the message
func AuthenticateMessage(message HybridGroupMessage) bool {
messageCopy := message
messageCopy.Signature = []byte{}
// Otherwise we derive the public key from the sender and check it against that.
decodedPub, err := base32.StdEncoding.DecodeString(strings.ToUpper(message.Author))
if err == nil {
data, err := json.Marshal(messageCopy)
if err == nil && len(decodedPub) >= 32 {
return ed25519.Verify(decodedPub[:32], data, message.Signature)
}
}
return false
}
func CheckACL(handle string, group *model.Conversation) (*model.AccessControl, error) {
if isOpen, exists := group.Attributes[attr.LocalScope.ConstructScopedZonedPath(attr.ConversationZone.ConstructZonedPath(ManagedGroupOpen)).ToString()]; !exists {
return nil, fmt.Errorf("group has not been setup correctly - ManagedGroupOpen does not exist ")
} else if isOpen == event.True {
// We don't need to do a membership check
defaultACL := group.GetPeerAC()
return &defaultACL, nil
}
// If this is a closed group. Check if we have an ACL entry for this member
// If we don't OR that member has been blocked, then close the connection.
if acl, inGroup := group.ACL[handle]; !inGroup || acl.Blocked {
log.Infof("ACL Check Failed: %v %v %v", handle, acl, inGroup)
return nil, fmt.Errorf("peer is not a member of this group")
} else {
return &acl, nil
}
}

View File

@ -0,0 +1,199 @@
// This file contains all code related to how a Group Manager operates over a group.
package hybrid
import (
"encoding/json"
"fmt"
"strconv"
"time"
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/model"
"cwtch.im/cwtch/model/attr"
"cwtch.im/cwtch/model/constants"
"cwtch.im/cwtch/peer"
"cwtch.im/cwtch/protocol/connections"
"cwtch.im/cwtch/settings"
"git.openprivacy.ca/openprivacy/log"
)
type GroupManagerFunctionality struct {
}
func (f *GroupManagerFunctionality) NotifySettingsUpdate(settings settings.GlobalSettings) {
}
func (f *GroupManagerFunctionality) EventsToRegister() []event.Type {
return []event.Type{event.ProtocolEngineCreated, event.NewMessageFromPeerEngine}
}
func (f *GroupManagerFunctionality) ExperimentsToRegister() []string {
return []string{constants.GroupManagerExperiment, constants.GroupsExperiment}
}
// OnEvent handles File Sharing Hooks like Manifest Received and FileDownloaded
func (f *GroupManagerFunctionality) OnEvent(ev event.Event, profile peer.CwtchPeer) {
switch ev.EventType {
case event.PeerStateChange:
handle := ev.Data["RemotePeer"]
// check that we have authenticated with this peer
if connections.ConnectionStateToType()[ev.Data[event.ConnectionState]] == connections.AUTHENTICATED {
mg, err := f.GetManagedGroup(profile)
if err != nil {
log.Infof("group manager received peer connections but no suitable group has been found: %v %v", handle, err)
profile.DisconnectFromPeer(handle)
break
}
if _, err := CheckACL(handle, mg); err != nil {
log.Infof("received managed group connection from unauthorized peer: %v %v", handle, err)
profile.DisconnectFromPeer(handle)
break
}
}
// This is where most of the magic happens for managed groups. A few notes:
// - CwtchPeer has already taken care of storing this for us, we don't need to worry about that
// - Group Managers **only** speak overlays and **always** wrap their messages in a ManageGroupEvent anything else is fast-rejected.
case event.NewMessageFromPeerEngine:
log.Infof("received new message from peer: manager")
ci, err := f.GetManagedGroup(profile)
if err != nil {
log.Errorf("unknown conversation %v", err)
break // we don't care about unknown conversations...
}
var cm model.MessageWrapper
err = json.Unmarshal([]byte(ev.Data[event.Data]), &cm)
if err != nil {
log.Errorf("could not deserialize json %s %v", ev.Data[event.Data], err)
break
}
// The overlay type of this message **must** be ManageGroupEvent
if cm.Overlay == model.OverlayManageGroupEvent {
var mge ManageGroupEvent
err = json.Unmarshal([]byte(cm.Data), &mge)
if err == nil {
f.handleEvent(profile, *ci, mge, ev.Data[event.Data])
}
}
}
}
// handleEvent takes in a high level ManageGroupEvent message, transforms it into the proper type, and passes it on for handling
// assumes we are called after an event provided by an authorized peer (i.e. ManageGroup == true)
func (f *GroupManagerFunctionality) handleEvent(profile peer.CwtchPeer, conversation model.Conversation, mge ManageGroupEvent, original string) {
switch mge.EventType {
case NewClearMessage:
var nme NewClearMessageEvent
err := json.Unmarshal([]byte(mge.Data), &nme)
if err == nil {
f.handleNewMessageEvent(profile, conversation, nme, original)
}
}
}
func (f *GroupManagerFunctionality) handleNewMessageEvent(profile peer.CwtchPeer, conversation model.Conversation, nme NewClearMessageEvent, original string) {
log.Infof("handling new clear message event")
hgm := nme.HybridGroupMessage
if AuthenticateMessage(hgm) {
log.Infof("authenticated message")
group, err := f.GetManagedGroup(profile)
if err != nil {
log.Infof("received fraudulant hybrid message from group: %v", err)
return
}
if acl, err := CheckACL(hgm.Author, group); err != nil {
log.Infof("received fraudulant hybrid message from group: %v", err)
return
} else if !acl.Append {
log.Infof("received fraudulant hybrid message from group: peer does not have append privileges")
return
} else {
mgidstr := strconv.Itoa(int(nme.HybridGroupMessage.MemberGroupID)) // we need both MemberGroupId and MemberMessageId for attestation later on...
newmmidstr := strconv.Itoa(int(nme.HybridGroupMessage.MemberMessageID))
// Set the attributes of this message...
attr := model.Attributes{MemberGroupIDKey: mgidstr, MemberMessageIDKey: newmmidstr,
constants.AttrAuthor: hgm.Author,
constants.AttrAck: event.True,
constants.AttrSentTimestamp: time.UnixMilli(int64(hgm.Sent)).Format(time.RFC3339Nano)}
profile.InternalInsertMessage(conversation.ID, 0, hgm.Author, hgm.MessageBody, attr, hgm.Signature)
// forward the message to everyone who the server has added as a contact
// and who are represented in the ACL...
allConversations, _ := profile.FetchConversations()
for _, ci := range allConversations {
// NOTE: This check works for Open Groups too as CheckACL will return the default ACL
// for the group....
if acl, err := CheckACL(hgm.Author, group); err == nil && acl.Read {
log.Infof("forwarding group message to: %v", ci.Handle)
profile.SendMessage(ci.ID, original)
}
}
}
} else {
log.Errorf("received fraudulant hybrid message fom group")
}
}
// GetManagedGroup is a convieniance function that looks up the managed group
func (f *GroupManagerFunctionality) GetManagedGroup(profile peer.CwtchPeer) (*model.Conversation, error) {
handle := fmt.Sprintf("managed:%d", 000)
return profile.FetchConversationInfo(handle)
}
// Establish a new Managed Group and return its conversation id
func (f *GroupManagerFunctionality) ManageNewGroup(profile peer.CwtchPeer) (int, error) {
ac := model.DefaultP2PAccessControl()
// by setting the ManageGroup permission in this ACL we are allowing the manage to
// take control of how this group is structured, see OnEvent above...
ac.ManageGroup = true
// note: a manager can only manage one group. This will (probably) always be true and has a few benefits
// and downsides.
// The main downside is that it requires a new manager per group (and thus an onion service per group)
// However, it means that we can lean on p2p functionality like profile images / metadata / name
// etc. for group metadata and effectively get that for-free in the client.
// HOWEVER: hedging our bets here by giving this group a numeric handle...
handle := fmt.Sprintf("managed:%d", 000)
acl := model.AccessControlList{}
acl[profile.GetOnion()] = ac
ci, err := profile.NewConversation(handle, acl)
if err != nil {
return -1, err
}
profile.SetConversationAttribute(ci, attr.LocalScope.ConstructScopedZonedPath(attr.ConversationZone.ConstructZonedPath(ManagedGroupOpen)), event.False)
return ci, nil
}
// AddHybridContact is a wrapper arround NewContactConversation which sets the contact
// up for Hybrid Group channel messages...
// TODO this function assumes that authorization has been done at a higher level..
func (f *GroupManagerFunctionality) AddHybridContact(profile peer.CwtchPeer, handle string) error {
ac := model.DefaultP2PAccessControl()
ac.ManageGroup = false
ci, err := profile.NewContactConversation(handle, ac, true)
if err != nil {
return err
}
mg, err := f.GetManagedGroup(profile)
if err != nil {
return err
}
// Update the ACL list to add this contact...
acl := mg.ACL
acl[handle] = model.DefaultP2PAccessControl()
profile.UpdateConversationAccessControlList(mg.ID, acl)
// enable channel 2 on this conversation (hybrid groups management channel)
profile.InitChannel(ci, 2)
key := fmt.Sprintf("channel.%d", 2)
profile.SetConversationAttribute(ci, attr.LocalScope.ConstructScopedZonedPath(attr.ConversationZone.ConstructZonedPath(key)), constants.True)
return nil
}
func (f *GroupManagerFunctionality) OnContactRequestValue(profile peer.CwtchPeer, conversation model.Conversation, eventID string, path attr.ScopedZonedPath) {
// nop hybrid group conversations do not exchange contact requests
}
func (f *GroupManagerFunctionality) OnContactReceiveValue(profile peer.CwtchPeer, conversation model.Conversation, path attr.ScopedZonedPath, value string, exists bool) {
// nop hybrid group conversations do not exchange contact requests
}

View File

@ -0,0 +1,327 @@
package hybrid
import (
"crypto/rand"
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/model"
"cwtch.im/cwtch/model/attr"
"cwtch.im/cwtch/model/constants"
"cwtch.im/cwtch/peer"
"cwtch.im/cwtch/settings"
"encoding/base64"
"encoding/json"
"fmt"
"git.openprivacy.ca/openprivacy/log"
"golang.org/x/crypto/nacl/secretbox"
"math"
"math/big"
"strconv"
"time"
)
type ManagedGroupFunctionality struct {
}
func (f ManagedGroupFunctionality) NotifySettingsUpdate(settings settings.GlobalSettings) {
}
func (f ManagedGroupFunctionality) EventsToRegister() []event.Type {
return []event.Type{event.ProtocolEngineCreated, event.NewMessageFromPeerEngine}
}
func (f ManagedGroupFunctionality) ExperimentsToRegister() []string {
return []string{constants.GroupsExperiment}
}
// OnEvent handles File Sharing Hooks like Manifest Received and FileDownloaded
func (f *ManagedGroupFunctionality) OnEvent(ev event.Event, profile peer.CwtchPeer) {
switch ev.EventType {
// This is where most of the magic happens for managed groups. A few notes:
// - CwtchPeer has already taken care of storing this for us, we don't need to worry about that
// - Group Managers **only** speak overlays and **always** wrap their messages in a ManageGroupEvent anything else is fast-rejected.
case event.NewMessageFromPeerEngine:
handle := ev.Data[event.RemotePeer]
ci, err := profile.FetchConversationInfo(handle)
if err != nil {
break // we don't care about unknown conversations...
}
// We reject managed group requests for groups not setup as managed groups...
if ci.ACL[handle].ManageGroup {
var cm model.MessageWrapper
err = json.Unmarshal([]byte(ev.Data[event.Data]), &cm)
if err != nil {
break
}
// The overlay type of this message **must** be ManageGroupEvent
if cm.Overlay == model.OverlayManageGroupEvent {
var mge ManageGroupEvent
err = json.Unmarshal([]byte(cm.Data), &mge)
if err == nil {
cid, err := profile.FetchConversationInfo(handle)
if err == nil {
f.handleEvent(profile, *cid, mge)
}
}
}
}
}
}
// handleEvent takes in a high level ManageGroupEvent message, transforms it into the proper type, and passes it on for handling
// assumes we are called after an event provided by an authorized peer (i.e. ManageGroup == true)
func (f *ManagedGroupFunctionality) handleEvent(profile peer.CwtchPeer, conversation model.Conversation, mge ManageGroupEvent) {
switch mge.EventType {
case AddMember:
var ame AddMemberEvent
err := json.Unmarshal([]byte(mge.Data), &ame)
if err == nil {
f.handleAddMemberEvent(profile, conversation, ame)
}
case RemoveMember:
var rme RemoveMemberEvent
err := json.Unmarshal([]byte(mge.Data), &rme)
if err == nil {
f.handleRemoveMemberEvent(profile, conversation, rme)
}
case NewMessage:
var nme NewMessageEvent
err := json.Unmarshal([]byte(mge.Data), &nme)
if err == nil {
f.handleNewMessageEvent(profile, conversation, nme)
}
case NewClearMessage:
var nme NewClearMessageEvent
err := json.Unmarshal([]byte(mge.Data), &nme)
if err == nil {
f.handleNewClearMessageEvent(profile, conversation, nme)
}
case RotateKey:
var rke RotateKeyEvent
err := json.Unmarshal([]byte(mge.Data), &rke)
if err == nil {
f.handleRotateKeyEvent(profile, conversation, rke)
}
}
}
// handleAddMemberEvent adds a group member to the conversation ACL
// assumes we are called after an event provided by an authorized peer (i.e. ManageGroup == true)
func (f *ManagedGroupFunctionality) handleAddMemberEvent(profile peer.CwtchPeer, conversation model.Conversation, ame AddMemberEvent) {
acl := conversation.ACL
acl[ame.Handle] = model.DefaultP2PAccessControl()
profile.UpdateConversationAccessControlList(conversation.ID, acl)
}
// handleRemoveMemberEvent removes a group member from the conversation ACL
// assumes we are called after an event provided by an authorized peer (i.e. ManageGroup == true)
func (f *ManagedGroupFunctionality) handleRemoveMemberEvent(profile peer.CwtchPeer, conversation model.Conversation, rme RemoveMemberEvent) {
acl := conversation.ACL
delete(acl, rme.Handle)
profile.UpdateConversationAccessControlList(conversation.ID, acl)
}
// handleRotateKeyEvent rotates the encryption key for a given group
// assumes we are called after an event provided by an authorized peer (i.e. ManageGroup == true)
func (f *ManagedGroupFunctionality) handleRotateKeyEvent(profile peer.CwtchPeer, conversation model.Conversation, rke RotateKeyEvent) {
keyScope := attr.LocalScope.ConstructScopedZonedPath(attr.ConversationZone.ConstructZonedPath("key"))
keyB64 := base64.StdEncoding.EncodeToString(rke.Key)
profile.SetConversationAttribute(conversation.ID, keyScope, keyB64)
}
func (f *ManagedGroupFunctionality) handleNewMessageEvent(profile peer.CwtchPeer, conversation model.Conversation, nme NewMessageEvent) {
keyScope := attr.LocalScope.ConstructScopedZonedPath(attr.ConversationZone.ConstructZonedPath("key"))
if keyB64, err := profile.GetConversationAttribute(conversation.ID, keyScope); err == nil {
key, err := base64.StdEncoding.DecodeString(keyB64)
if err != nil || len(key) != 32 {
log.Errorf("hybrid group key is corrupted")
return
}
// decrypt the message with key...
hgm, err := f.decryptMessage(key, nme.EncryptedHybridGroupMessage)
if hgm == nil || err != nil {
log.Errorf("unable to decrypt hybrid group message: %v", err)
return
}
f.handleNewClearMessageEvent(profile, conversation, NewClearMessageEvent{HybridGroupMessage: *hgm})
}
}
func (f *ManagedGroupFunctionality) handleNewClearMessageEvent(profile peer.CwtchPeer, conversation model.Conversation, nme NewClearMessageEvent) {
hgm := nme.HybridGroupMessage
if AuthenticateMessage(hgm) {
// TODO Closed Group Membership Check
if profile.GetOnion() == hgm.Author {
// ack
signatureB64 := base64.StdEncoding.EncodeToString(hgm.Signature)
id, err := profile.GetChannelMessageBySignature(conversation.ID, 0, signatureB64)
if err == nil {
profile.UpdateMessageAttribute(conversation.ID, 0, id, constants.AttrAck, constants.True)
profile.PublishEvent(event.NewEvent(event.IndexedAcknowledgement, map[event.Field]string{event.ConversationID: strconv.Itoa(conversation.ID), event.Index: strconv.Itoa(id)}))
}
} else {
mgidstr := strconv.Itoa(int(nme.HybridGroupMessage.MemberGroupID)) // we need both MemberGroupId and MemberMessageId for attestation later on...
newmmidstr := strconv.Itoa(int(nme.HybridGroupMessage.MemberMessageID))
// Set the attributes of this message...
attr := model.Attributes{MemberGroupIDKey: mgidstr, MemberMessageIDKey: newmmidstr,
constants.AttrAuthor: hgm.Author,
constants.AttrAck: event.True,
constants.AttrSentTimestamp: time.UnixMilli(int64(hgm.Sent)).Format(time.RFC3339Nano)}
// Note: The Channel here is 0...this is the main channel that UIs understand as the default, so this message is
// becomes part of the conversation...
mid, err := profile.InternalInsertMessage(conversation.ID, 0, hgm.Author, hgm.MessageBody, attr, hgm.Signature)
contenthash := model.CalculateContentHash(hgm.Author, hgm.MessageBody)
if err == nil {
profile.PublishEvent(event.NewEvent(event.NewMessageFromGroup, map[event.Field]string{event.ConversationID: strconv.Itoa(conversation.ID), event.TimestampSent: time.UnixMilli(int64(hgm.Sent)).Format(time.RFC3339Nano), event.RemotePeer: hgm.Author, event.Index: strconv.Itoa(mid), event.Data: hgm.MessageBody, event.ContentHash: contenthash}))
}
}
// TODO need to send an event here...
} else {
log.Errorf("received fraudulant hybrid message fom group")
}
}
func (f *ManagedGroupFunctionality) decryptMessage(key []byte, ciphertext []byte) (*HybridGroupMessage, error) {
if len(ciphertext) > 24 {
var decryptNonce [24]byte
copy(decryptNonce[:], ciphertext[:24])
var fixedSizeKey [32]byte
copy(fixedSizeKey[:], key[:32])
decrypted, ok := secretbox.Open(nil, ciphertext[24:], &decryptNonce, &fixedSizeKey)
if ok {
var hgm HybridGroupMessage
err := json.Unmarshal(decrypted, &hgm)
return &hgm, err
}
}
return nil, fmt.Errorf("invalid ciphertext/key error")
}
// Define a new managed group, managed by the manager...
func (f *ManagedGroupFunctionality) NewManagedGroup(profile peer.CwtchPeer, manager string) error {
// generate a truely random member id for this group in [0..2^32)
nBig, err := rand.Int(rand.Reader, big.NewInt(math.MaxUint32))
if err != nil {
return err // if there is a problem with random we want to exit now rather than have to clean up group setup...
}
ac := model.DefaultP2PAccessControl()
ac.ManageGroup = true // by setting the ManageGroup permission in this ACL we are allowing the manager to control of how this group is structured
ci, err := profile.NewContactConversation(manager, ac, true)
if err != nil {
return err
}
// enable channel 2 on this conversation (hybrid groups management channel)
key := fmt.Sprintf("channel.%d", 2)
err = profile.SetConversationAttribute(ci, attr.LocalScope.ConstructScopedZonedPath(attr.ConversationZone.ConstructZonedPath(key)), constants.True)
if err != nil {
return fmt.Errorf("could not enable channel 2 on hybrid group: %v", err) // likely a catestrophic error...fail
}
err = profile.InitChannel(ci, 2)
if err != nil {
return fmt.Errorf("could not enable channel 2 on hybrid group: %v", err) // likely a catestrophic error...fail
}
// finally, set the member group id on this group...
mgidkey := attr.LocalScope.ConstructScopedZonedPath(attr.ConversationZone.ConstructZonedPath(MemberGroupIDKey))
err = profile.SetConversationAttributeInt(ci, mgidkey, int(nBig.Uint64()))
if err != nil {
return fmt.Errorf("could not set group id on hybrid group: %v", err) // likely a catestrophic error...fail
}
return nil
}
// SendMessageToManagedGroup acts like SendMessage(ToPeer), but with a few additional bookkeeping steps for Hybrid Groups
func (f *ManagedGroupFunctionality) SendMessageToManagedGroup(profile peer.CwtchPeer, conversation int, message string) (int, error) {
mgidkey := attr.LocalScope.ConstructScopedZonedPath(attr.ConversationZone.ConstructZonedPath(MemberGroupIDKey))
mgid, err := profile.GetConversationAttributeInt(conversation, mgidkey)
if err != nil {
return -1, err
}
mmidkey := attr.LocalScope.ConstructScopedZonedPath(attr.ConversationZone.ConstructZonedPath(MemberMessageIDKey))
mmid, err := profile.GetConversationAttributeInt(conversation, mmidkey)
if err != nil {
mmid = 0 // first message
}
mmid += 1
// Now time to package this whole thing in layers of JSON...
hgm := HybridGroupMessage{
MemberGroupID: uint32(mgid),
MemberMessageID: uint32(mmid),
Sent: uint64(time.Now().UnixMilli()),
Author: profile.GetOnion(),
MessageBody: message,
Signature: []byte{}, // Leave blank so we can sign this message...
}
data, err := json.Marshal(hgm)
if err != nil {
return -1, err
}
// Don't forget to sign the message...
sig, err := profile.SignMessage(data)
if err != nil {
return -1, err
}
hgm.Signature = sig
ncm := NewClearMessageEvent{
HybridGroupMessage: hgm,
}
signedData, err := json.Marshal(ncm)
if err != nil {
return -1, err
}
mgm := ManageGroupEvent{
EventType: NewClearMessage,
Data: string(signedData),
}
odata, err := json.Marshal(mgm)
if err != nil {
return -1, err
}
overlay := model.MessageWrapper{
Overlay: model.OverlayManageGroupEvent,
Data: string(odata),
}
ojson, err := json.Marshal(overlay)
if err != nil {
return -1, err
}
// send the message to the manager and update our message is string for tracking...
_, err = profile.SendMessage(conversation, string(ojson))
if err != nil {
return -1, err
}
profile.SetConversationAttributeInt(conversation, mmidkey, mmid)
// ok there is still one more thing we need to do...
// insert this message as part of our group log, for members of the group
// this exists in channel 0 of the conversation with the group manager...
mgidstr := strconv.Itoa(mgid) // we need both MemberGroupId and MemberMessageId for attestation later on...
newmmidstr := strconv.Itoa(mmid)
attr := model.Attributes{MemberGroupIDKey: mgidstr, MemberMessageIDKey: newmmidstr, constants.AttrAuthor: profile.GetOnion(), constants.AttrAck: event.False, constants.AttrSentTimestamp: time.Now().Format(time.RFC3339Nano)}
return profile.InternalInsertMessage(conversation, 0, hgm.Author, message, attr, hgm.Signature)
}
func (f ManagedGroupFunctionality) OnContactRequestValue(profile peer.CwtchPeer, conversation model.Conversation, eventID string, path attr.ScopedZonedPath) {
// nop hybrid group conversations do not exchange contact requests
}
func (f ManagedGroupFunctionality) OnContactReceiveValue(profile peer.CwtchPeer, conversation model.Conversation, path attr.ScopedZonedPath, value string, exists bool) {
// nop hybrid group conversations do not exchange contact requests
}

View File

@ -0,0 +1,69 @@
package inter
import (
"strings"
"cwtch.im/cwtch/functionality/hybrid"
"cwtch.im/cwtch/peer"
)
// This functionality is a little different. It's not functionality per-se. It's a wrapper around
// CwtchProfile function that combines some core-functionalities like Hybrid Groups so that
// they can be transparently exposed in autobindings.
// DEV NOTE: consider moving other cross-cutting interface functions here to simplfy CwtchPeer
type InterfaceFunctionality struct {
}
// FunctionalityGate returns filesharing functionality - gates now happen on function calls.
func FunctionalityGate() *InterfaceFunctionality {
return new(InterfaceFunctionality)
}
func (i InterfaceFunctionality) ImportBundle(profile peer.CwtchPeer, uri string) error {
// check if this is a managed group. Note: managed groups do not comply with the server bundle format.
if strings.HasPrefix(uri, "managed:") {
uri = uri[len("managed:"):]
mgf := hybrid.ManagedGroupFunctionality{}
return mgf.NewManagedGroup(profile, uri)
}
// DEV NOTE: we may want to eventually move Server Import code to ServerFunctionality and add a hook here...
// DEV NOTE: consider making ImportBundle a high-level functionality interface? to support different kinds of contacts?
return profile.ImportBundle(uri)
}
// EnhancedImportBundle is identical to EnhancedImportBundle in CwtchPeer but instead of wrapping CwtchPeer.ImportBundle it instead
// wraps InterfaceFunctionality.ImportBundle
func (i InterfaceFunctionality) EnhancedImportBundle(profile peer.CwtchPeer, uri string) string {
err := i.ImportBundle(profile, uri)
if err == nil {
return "importBundle.success"
}
return err.Error()
}
// SendMessage sends a message to a conversation.
// NOTE: Unlike CwtchPeer.SendMessage this interface makes no guarentees about the raw-ness of the message sent to peer contacts.
// If the conversation is a hybrid groups then the message may be wrapped in multiple layers of overlay messages / encryption
// prior to being send. To send a raw message to a peer then use peer.CwtchPeer
// DEV NOTE: Move Legacy Group message send here...
func (i InterfaceFunctionality) SendMessage(profile peer.CwtchPeer, conversation int, message string) (int, error) {
ci, err := profile.GetConversationInfo(conversation)
if err != nil {
return -1, err
}
if ci.ACL[ci.Handle].ManageGroup {
mgf := hybrid.ManagedGroupFunctionality{}
return mgf.SendMessageToManagedGroup(profile, conversation, message)
}
return profile.SendMessage(conversation, message)
}
// EnhancedSendMessage Attempts to Send a Message and Immediately Attempts to Lookup the Message in the Database
// this wraps InterfaceFunctionality.SendMessage to support HybridGroups
func (i InterfaceFunctionality) EnhancedSendMessage(profile peer.CwtchPeer, conversation int, message string) string {
mid, err := i.SendMessage(profile, conversation, message)
if err != nil {
return ""
}
return profile.EnhancedGetMessageById(conversation, mid)
}

View File

@ -20,6 +20,9 @@ const (
// LegacyGroupZone for attributes related to legacy group experiment
LegacyGroupZone = Zone("legacygroup")
// ConversationZone for attributes related to structure of the conversation
ConversationZone = Zone("conversation")
// FilesharingZone for attributes related to file sharing
FilesharingZone = Zone("filesharing")
@ -65,6 +68,8 @@ func ParseZone(path string) (Zone, string) {
return ServerKeyZone, parts[1]
case ServerZone:
return ServerZone, parts[1]
case ConversationZone:
return ConversationZone, parts[1]
default:
return UnknownZone, parts[1]
}

View File

@ -19,3 +19,6 @@ var AutoDLFileExts = [...]string{".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp
// BlodeuweddExperiment enables the Blodeuwedd Assistant
const BlodeuweddExperiment = "blodeuwedd"
// Enables the Hybrid Group Manager Extension
const GroupManagerExperiment = "group-manager"

View File

@ -1,13 +1,13 @@
package model
import (
"encoding/json"
"time"
"cwtch.im/cwtch/model/attr"
"cwtch.im/cwtch/model/constants"
"encoding/json"
"fmt"
"git.openprivacy.ca/openprivacy/connectivity/tor"
"git.openprivacy.ca/openprivacy/log"
"time"
)
// AccessControl is a type determining client assigned authorization to a peer
@ -25,6 +25,8 @@ type AccessControl struct {
// Extension Related Permissions
ShareFiles bool // Allows a handle to share files to a conversation
RenderImages bool // Indicates that certain filetypes should be autodownloaded and rendered when shared by this contact
ManageGroup bool // Allows this conversation to be managed by hybrid groups
}
// DefaultP2PAccessControl defaults to a semi-trusted peer with no access to special extensions.
@ -101,6 +103,21 @@ func (ci *Conversation) GetPeerAC() AccessControl {
return DefaultP2PAccessControl()
}
// HasChannel returns true if the requested channel has been setup for this conversation
func (ci *Conversation) HasChannel(requestedChannel int) bool {
if requestedChannel == 0 {
return true
}
if requestedChannel == 1 {
return false // channel 1 is mapped to channel 0 for backwards compatibility
}
key := fmt.Sprintf("channel.%d", requestedChannel)
if value, exists := ci.Attributes[attr.LocalScope.ConstructScopedZonedPath(attr.ConversationZone.ConstructZonedPath(key)).ToString()]; exists {
return value == constants.True
}
return false
}
// IsCwtchPeer is a helper attribute that identifies whether a conversation is a cwtch peer
func (ci *Conversation) IsCwtchPeer() bool {
return tor.IsValidHostname(ci.Handle)

View File

@ -24,6 +24,7 @@ type MessageWrapper struct {
// Channel 2 is reserved for conversation admin (managed groups)
// Channel 7 is reserved for streams (no ack, no store)
func (mw MessageWrapper) Channel() int {
// 1024 / 0x400 is the start of new channel overlays
if mw.Overlay > 1024 {
return mw.Overlay & 0x07
}
@ -48,3 +49,6 @@ const OverlayInviteGroup = 101
// OverlayFileSharing is the canonical identifier for the file sharing overlay
const OverlayFileSharing = 200
// ManageGroupEvent is the canonical identifier for the manage group overlay
const OverlayManageGroupEvent = 0x402

View File

@ -3,20 +3,11 @@ package peer
import (
"context"
"crypto/rand"
"cwtch.im/cwtch/model"
"cwtch.im/cwtch/model/constants"
"cwtch.im/cwtch/protocol/groups"
"cwtch.im/cwtch/settings"
"encoding/base64"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"git.openprivacy.ca/cwtch.im/tapir/primitives"
"git.openprivacy.ca/cwtch.im/tapir/primitives/privacypass"
"git.openprivacy.ca/openprivacy/connectivity"
"git.openprivacy.ca/openprivacy/connectivity/tor"
"golang.org/x/crypto/ed25519"
"os"
path "path/filepath"
"sort"
@ -25,6 +16,16 @@ import (
"sync"
"time"
"cwtch.im/cwtch/model"
"cwtch.im/cwtch/model/constants"
"cwtch.im/cwtch/protocol/groups"
"cwtch.im/cwtch/settings"
"git.openprivacy.ca/cwtch.im/tapir/primitives"
"git.openprivacy.ca/cwtch.im/tapir/primitives/privacypass"
"git.openprivacy.ca/openprivacy/connectivity"
"git.openprivacy.ca/openprivacy/connectivity/tor"
"golang.org/x/crypto/ed25519"
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/model/attr"
"cwtch.im/cwtch/protocol/connections"
@ -441,7 +442,7 @@ func (cp *cwtchPeer) SendMessage(conversation int, message string) (int, error)
if cm, err := model.DeserializeMessage(message); err == nil {
if !cm.IsStream() {
// For p2p messages we store the event id of the message as the "signature" we can then look this up in the database later for acks
id, err = cp.storage.InsertMessage(conversationInfo.ID, 0, message, model.Attributes{constants.AttrAuthor: string(onion), constants.AttrAck: event.False, constants.AttrSentTimestamp: time.Now().Format(time.RFC3339Nano)}, ev.EventID, model.CalculateContentHash(string(onion), message))
id, err = cp.storage.InsertMessage(conversationInfo.ID, cm.Channel(), message, model.Attributes{constants.AttrAuthor: string(onion), constants.AttrAck: event.False, constants.AttrSentTimestamp: time.Now().Format(time.RFC3339Nano)}, ev.EventID, model.CalculateContentHash(string(onion), message))
if err != nil {
return -1, err
}
@ -707,6 +708,14 @@ func (cp *cwtchPeer) ImportGroup(exportedInvite string) (int, error) {
return groupConversationID, err
}
// NewConversation create a new multi-peer conversation.
func (cp *cwtchPeer) NewConversation(handle string, acl model.AccessControlList) (int, error) {
conversationID, err := cp.storage.NewConversation(handle, model.Attributes{event.SaveHistoryKey: event.DeleteHistoryDefault}, acl, true)
cp.SetConversationAttribute(conversationID, attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.ACLVersion)), constants.ACLVersionOne)
cp.SetConversationAttribute(conversationID, attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.AttrLastConnectionTime)), time.Now().Format(time.RFC3339Nano))
return conversationID, err
}
// NewContactConversation create a new p2p conversation with the given acl applied to the handle.
func (cp *cwtchPeer) NewContactConversation(handle string, acl model.AccessControl, accepted bool) (int, error) {
cp.mutex.Lock()
@ -897,6 +906,26 @@ func (cp *cwtchPeer) GetConversationAttribute(id int, path attr.ScopedZonedPath)
return val, nil
}
// SetConversationAttributeInt sets the conversation attribute at path to an integer value
func (cp *cwtchPeer) SetConversationAttributeInt(id int, path attr.ScopedZonedPath, value int) error {
strvalue := strconv.Itoa(value)
return cp.storage.SetConversationAttribute(id, path, strvalue)
}
// GetConversationAttributeInt is a method for retrieving an integer value of a given conversation
func (cp *cwtchPeer) GetConversationAttributeInt(id int, path attr.ScopedZonedPath) (int, error) {
ci, err := cp.storage.GetConversation(id)
if err != nil {
return 0, err
}
val, exists := ci.Attributes[path.ToString()]
if !exists {
return 0, fmt.Errorf("%v does not exist for conversation %v", path.ToString(), id)
}
intvalue, err := strconv.Atoi(val)
return intvalue, err
}
// GetChannelMessage returns a message from a conversation channel referenced by the absolute ID.
// Note: This should note be used to index a list as the ID is not expected to be tied to absolute position
// in the table (e.g. deleted messages, expired messages, etc.)
@ -1500,15 +1529,21 @@ func (cp *cwtchPeer) storeMessage(handle string, message string, sent time.Time)
}
// Don't store messages in channel 7
channel := 0
if cm, err := model.DeserializeMessage(message); err == nil {
if cm.IsStream() {
return -1, nil
}
channel = cm.Channel()
}
// Generate a random number and use it as the signature
signature := event.GetRandNumber().String()
return cp.storage.InsertMessage(ci.ID, 0, message, model.Attributes{constants.AttrAuthor: handle, constants.AttrAck: event.True, constants.AttrSentTimestamp: sent.Format(time.RFC3339Nano)}, signature, model.CalculateContentHash(handle, message))
return cp.storage.InsertMessage(ci.ID, channel, message, model.Attributes{constants.AttrAuthor: handle, constants.AttrAck: event.True, constants.AttrSentTimestamp: sent.Format(time.RFC3339Nano)}, signature, model.CalculateContentHash(handle, message))
}
func (cp *cwtchPeer) InitChannel(conversation int, channel int) error {
return cp.storage.InitChannelOnConversation(conversation, channel)
}
// eventHandler process events from other subsystems
@ -1523,6 +1558,7 @@ func (cp *cwtchPeer) eventHandler() {
onion, _ := cp.storage.LoadProfileKeyValue(TypeAttribute, attr.PublicScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Onion)).ToString())
log.Infof("Protocol engine for %s has stopped listening: %v", onion, ev.Data[event.Error])
cp.mutex.Unlock()
cp.processExtensionsEvent(ev)
case event.EncryptedGroupMessage:
// If successful, a side effect is the message is added to the group's timeline
@ -1567,6 +1603,7 @@ func (cp *cwtchPeer) eventHandler() {
case event.NewMessageFromPeerEngine: //event.TimestampReceived, event.RemotePeer, event.Data
ts, _ := time.Parse(time.RFC3339Nano, ev.Data[event.TimestampReceived])
id, err := cp.storeMessage(ev.Data[event.RemotePeer], ev.Data[event.Data], ts)
cp.processExtensionsEvent(ev)
if err == nil {
// Republish as NewMessageFromPeer
ev.EventType = event.NewMessageFromPeer
@ -1697,19 +1734,7 @@ func (cp *cwtchPeer) eventHandler() {
}
// Safe Access to Extensions
cp.extensionLock.Lock()
for _, extension := range cp.extensions {
log.Debugf("checking extension...%v", extension)
// check if the current map of experiments satisfies the extension requirements
if !cp.checkExtensionExperiment(extension) {
log.Debugf("skipping extension (%s) ..not all experiments satisfied", extension)
continue
}
if cp.checkEventExperiment(extension, ev.EventType) {
extension.extension.OnEvent(ev, cp)
}
}
cp.extensionLock.Unlock()
cp.processExtensionsEvent(ev)
case event.ServerStateChange:
cp.mutex.Lock()
@ -1931,3 +1956,45 @@ func (cp *cwtchPeer) constructGroupFromConversation(conversationInfo *model.Conv
}
return &group, nil
}
// Insert a message directly into a conversation-channel, with the given attributes.
// NOTE: This should only be used by cwtch-extensions. Regular API uses should use more direct methods
// like SendMessage
func (cp *cwtchPeer) InternalInsertMessage(conversation int, channel int, author string, body string, attrs model.Attributes, signature []byte) (int, error) {
signatureB64 := base64.StdEncoding.EncodeToString(signature)
return cp.storage.InsertMessage(conversation, channel, body, attrs, signatureB64, model.CalculateContentHash(author, body))
}
func (cp *cwtchPeer) SignMessage(blob []byte) ([]byte, error) {
privateKey, err := cp.storage.LoadProfileKeyValue(TypePrivateKey, "Ed25519PrivateKey")
if err != nil {
log.Errorf("error loading private key from storage")
return nil, err
}
publicKey, err := cp.storage.LoadProfileKeyValue(TypePublicKey, "Ed25519PublicKey")
if err != nil {
log.Errorf("error loading public key from storage")
return nil, err
}
identity := primitives.InitializeIdentity("", (*ed25519.PrivateKey)(&privateKey), (*ed25519.PublicKey)(&publicKey))
return identity.Sign(blob), nil
}
func (cp *cwtchPeer) processExtensionsEvent(ev event.Event) {
cp.extensionLock.Lock()
for _, extension := range cp.extensions {
// check if the current map of experiments satisfies the extension requirements
if !cp.checkExtensionExperiment(extension) {
log.Debugf("skipping extension (%v) ..not all experiments satisfied", extension)
continue
}
// if the extension is registered for this event type then process
if _, contains := extension.events[ev.EventType]; contains {
extension.extension.OnEvent(ev, cp)
}
}
cp.extensionLock.Unlock()
}

View File

@ -87,7 +87,7 @@ const setConversationACLSQLStmt = `update conversations set ACL=(?) where ID=(?)
const deleteConversationSQLStmt = `delete from conversations where ID=(?);`
// createTableConversationMessagesSQLStmt is a template for creating conversation based tables...
const createTableConversationMessagesSQLStmt = `create table if not exists channel_%d_0_chat (ID integer unique primary key autoincrement, Body text, Attributes []byte, Expiry datetime, Signature text unique, ContentHash blob text);`
const createTableConversationMessagesSQLStmt = `create table if not exists channel_%d_%d_chat (ID integer unique primary key autoincrement, Body text, Attributes []byte, Expiry datetime, Signature text unique, ContentHash blob text);`
// insertMessageIntoConversationSQLStmt is a template for creating conversation based tables...
const insertMessageIntoConversationSQLStmt = `insert into channel_%d_%d_chat (Body, Attributes, Signature, ContentHash) values(?,?,?,?);`
@ -324,7 +324,7 @@ func (cps *CwtchProfileStorage) NewConversation(handle string, attributes model.
return -1, tx.Rollback()
}
result, err = tx.Exec(fmt.Sprintf(createTableConversationMessagesSQLStmt, id))
result, err = tx.Exec(fmt.Sprintf(createTableConversationMessagesSQLStmt, id, 0))
if err != nil {
log.Errorf("error executing transaction: %v", err)
return -1, tx.Rollback()
@ -345,6 +345,27 @@ func (cps *CwtchProfileStorage) NewConversation(handle string, attributes model.
return int(conversationID), nil
}
func (cps *CwtchProfileStorage) InitChannelOnConversation(id int, channel int) error {
cps.mutex.Lock()
defer cps.mutex.Unlock()
tx, err := cps.db.Begin()
if err != nil {
log.Errorf("error executing transaction: %v", err)
return tx.Rollback()
}
_, err = tx.Exec(fmt.Sprintf(createTableConversationMessagesSQLStmt, id, channel))
if err != nil {
log.Errorf("error executing transaction: %v", err)
return tx.Rollback()
}
err = tx.Commit()
if err != nil {
log.Errorf("error executing transaction: %v", err)
return tx.Rollback()
}
return nil
}
// GetConversationByHandle is a convenience method to fetch an active conversation by a handle
// Usage Notes: This should **only** be used to look up p2p conversations by convention.
// Ideally this function should not exist, and all lookups should happen by ID (this is currently

View File

@ -50,6 +50,8 @@ type ModifyServers interface {
// SendMessages enables a caller to sender messages to a contact
type SendMessages interface {
// SendMessage sends a raw message to the conversation.
// SendMessage is a deprecated public API. Use EnhancedSendMessage instead
SendMessage(conversation int, message string) (int, error)
// EnhancedSendMessage Attempts to Send a Message and Immediately Attempts to Lookup the Message in the Database
@ -115,6 +117,8 @@ type CwtchPeer interface {
EnhancedImportBundle(string) string
// New Unified Conversation Interfaces
NewConversation(handle string, acl model.AccessControlList) (int, error)
InitChannel(conversation int, channel int) error
NewContactConversation(handle string, acl model.AccessControl, accepted bool) (int, error)
FetchConversations() ([]*model.Conversation, error)
ArchiveConversation(conversation int)
@ -135,12 +139,15 @@ type CwtchPeer interface {
SetConversationAttribute(conversation int, path attr.ScopedZonedPath, value string) error
GetConversationAttribute(conversation int, path attr.ScopedZonedPath) (string, error)
SetConversationAttributeInt(conversation int, path attr.ScopedZonedPath, value int) error
GetConversationAttributeInt(conversation int, path attr.ScopedZonedPath) (int, error)
DeleteConversation(conversation int) error
// New Unified Conversation Channel Interfaces
GetChannelMessage(conversation int, channel int, id int) (string, model.Attributes, error)
GetChannelMessageCount(conversation int, channel int) (int, error)
GetChannelMessageByContentHash(conversation int, channel int, contenthash string) (int, error)
GetChannelMessageBySignature(conversationID int, channelID int, signature string) (int, error)
GetMostRecentMessages(conversation int, channel int, offset int, limit uint) ([]model.ConversationMessage, error)
UpdateMessageAttribute(conversation int, channel int, id int, key string, value string) error
SearchConversations(pattern string) string
@ -168,6 +175,10 @@ type CwtchPeer interface {
UpdateExperiments(enabled bool, experiments map[string]bool)
NotifySettingsUpdate(settings settings.GlobalSettings)
IsFeatureEnabled(featureName string) bool
SignMessage(blob []byte) ([]byte, error)
// Used for Internal Bookkeeping by Extensions, **do not expose in autobindings**
InternalInsertMessage(conversation int, channel int, author string, body string, attributes model.Attributes, signature []byte) (int, error)
}
// EnhancedMessage wraps a Cwtch model.Message with some additional data to reduce calls from the UI.

View File

@ -0,0 +1,223 @@
package testing
import (
"crypto/rand"
"encoding/base64"
"fmt"
mrand "math/rand"
"os"
"path"
"path/filepath"
"runtime"
"runtime/pprof"
"testing"
"time"
app2 "cwtch.im/cwtch/app"
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/functionality/hybrid"
"cwtch.im/cwtch/functionality/inter"
"cwtch.im/cwtch/model/constants"
"cwtch.im/cwtch/peer"
"cwtch.im/cwtch/protocol/connections"
"git.openprivacy.ca/openprivacy/connectivity/tor"
"git.openprivacy.ca/openprivacy/log"
_ "github.com/mutecomm/go-sqlcipher/v4"
)
func TestHyrbidGroupIntegration(t *testing.T) {
t.Logf("Starting Hybrid Groups Test")
os.RemoveAll("./storage")
os.RemoveAll("./managerstorage")
// Goroutine Monitoring Start..
numGoRoutinesStart := runtime.NumGoroutine()
log.AddEverythingFromPattern("connectivity")
log.SetLevel(log.LevelInfo)
log.ExcludeFromPattern("connection/connection")
log.ExcludeFromPattern("outbound/3dhauthchannel")
log.ExcludeFromPattern("event/eventmanager")
log.ExcludeFromPattern("tapir")
os.Mkdir("tordir", 0700)
dataDir := path.Join("tordir", "tor")
os.MkdirAll(dataDir, 0700)
// we don't need real randomness for the port, just to avoid a possible conflict...
socksPort := mrand.Intn(1000) + 9051
controlPort := mrand.Intn(1000) + 9052
// generate a random password
key := make([]byte, 64)
_, err := rand.Read(key)
if err != nil {
panic(err)
}
useCache := os.Getenv("TORCACHE") == "true"
torDataDir := ""
if useCache {
log.Infof("using tor cache")
torDataDir = filepath.Join(dataDir, "data-dir-torcache")
os.MkdirAll(torDataDir, 0700)
} else {
log.Infof("using clean tor data dir")
if torDataDir, err = os.MkdirTemp(dataDir, "data-dir-"); err != nil {
t.Fatalf("could not create data dir")
}
}
tor.NewTorrc().WithSocksPort(socksPort).WithOnionTrafficOnly().WithHashedPassword(base64.StdEncoding.EncodeToString(key)).WithControlPort(controlPort).Build("tordir/tor/torrc")
acn, err := tor.NewTorACNWithAuth("./tordir", path.Join("..", "tor"), torDataDir, controlPort, tor.HashedPasswordAuthenticator{Password: base64.StdEncoding.EncodeToString(key)})
if err != nil {
t.Fatalf("Could not start Tor: %v", err)
}
log.Infof("Waiting for tor to bootstrap...")
acn.WaitTillBootstrapped()
defer acn.Close()
// ***** Cwtch Server management *****
app := app2.NewApp(acn, "./storage", app2.LoadAppSettings("./storage"))
managerApp := app2.NewApp(acn, "./managerstorage", app2.LoadAppSettings("./managerstorage"))
// ***** cwtchPeer setup *****
// Turn on Groups Experiment...
settings := app.ReadSettings()
settings.ExperimentsEnabled = true
settings.Experiments[constants.GroupsExperiment] = true
settings.Experiments[constants.GroupManagerExperiment] = false
app.UpdateSettings(settings)
// Create a Manager App that has the Group Manager Experiment Enabled....
managerSettings := managerApp.ReadSettings()
managerSettings.ExperimentsEnabled = true
managerSettings.Experiments[constants.GroupsExperiment] = true
managerSettings.Experiments[constants.GroupManagerExperiment] = true
managerApp.UpdateSettings(managerSettings)
alice := MakeProfile(app, "Alice")
bob := MakeProfile(app, "Bob")
manager := MakeProfile(managerApp, "Manager")
waitTime := time.Duration(60) * time.Second
log.Infof("** Waiting for Alice, Bob, and Carol to register their onion hidden service on the network... (%v)\n", waitTime)
time.Sleep(waitTime)
log.Infof("** Wait Done!")
// Ok Lets Start By Creating a Hybrid Group...
hgmf := hybrid.GroupManagerFunctionality{}
ci, err := hgmf.ManageNewGroup(manager)
if err != nil {
t.Fatalf("could not create hybrid group: %v", err)
}
log.Infof("created a hybrid group: %d. moving onto adding hybrid contacts...", ci)
err = hgmf.AddHybridContact(manager, alice.GetOnion())
if err != nil {
t.Fatalf("could not create hybrid contact (alice): %v", err)
}
err = hgmf.AddHybridContact(manager, bob.GetOnion())
if err != nil {
t.Fatalf("could not create hybrid contact (bob): %v", err)
}
// Now we can allow alice, bob and carol to create a new hybrid group...
log.Infof("now we can allow alice bob and carol to join the hybrid group")
inter := inter.InterfaceFunctionality{}
err = inter.ImportBundle(alice, "managed:"+manager.GetOnion())
if err != nil {
t.Fatalf("could not create hybrid group contact (carol): %v", err)
}
alice.PeerWithOnion(manager.GetOnion()) // explictly trigger a peer request
err = inter.ImportBundle(bob, "managed:"+manager.GetOnion())
if err != nil {
t.Fatalf("could not create hybrid group contact (carol): %v", err)
}
bob.PeerWithOnion(manager.GetOnion())
log.Infof("waiting for alice and manager to connect")
WaitForConnection(t, alice, manager.GetOnion(), connections.AUTHENTICATED)
log.Infof("waiting for bob and manager to connect")
WaitForConnection(t, bob, manager.GetOnion(), connections.AUTHENTICATED)
// at this pont we should be able to send messages to the group, and receive them in the timeline
log.Infof("sending message to group")
_, err = inter.SendMessage(alice, 1, "hello everyone!!!")
if err != nil {
t.Fatalf("hybrid group sending failed... %v", err)
}
// Note: From this point onwards there are no managed-group specific calls. Everything happens
// transparently with respect to the receiver.
time.Sleep(time.Second * 10)
bobMessages, err := bob.GetMostRecentMessages(1, 0, 0, 1)
if err != nil || len(bobMessages) != 1 {
t.Fatalf("hybrid group receipt failed... %v", err)
}
if bobMessages[0].Body != "hello everyone!!!" {
t.Fatalf("hybrid group receipt failed...message does not match")
}
aliceMessages, err := alice.GetMostRecentMessages(1, 0, 0, 1)
if err != nil || len(aliceMessages) != 1 {
t.Fatalf("hybrid group receipt failed... %v", err)
}
if aliceMessages[0].Attr[constants.AttrAck] != constants.True {
t.Fatalf("hybrid group receipt failed...alice's message was not ack'd")
}
// Time to Clean Up....
log.Infof("Shutting down Alice...")
app.ShutdownPeer(alice.GetOnion())
time.Sleep(time.Second * 3)
log.Infof("Shutting down Bob...")
app.ShutdownPeer(bob.GetOnion())
time.Sleep(time.Second * 3)
log.Infof("Shutting fown Manager...")
managerApp.ShutdownPeer(manager.GetOnion())
time.Sleep(time.Second * 3)
log.Infof("Shutting down apps...")
log.Infof("app Shutdown: %v\n", runtime.NumGoroutine())
app.Shutdown()
managerApp.Shutdown()
time.Sleep(2 * time.Second)
log.Infof("Done shutdown: %v\n", runtime.NumGoroutine())
log.Infof("Shutting down ACN...")
acn.Close()
time.Sleep(time.Second * 60) // the network status / heartbeat plugin might keep goroutines alive for a minute before killing them
numGoRoutinesPostAppShutdown := runtime.NumGoroutine()
// Printing out the current goroutines
// Very useful if we are leaking any.
pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
fmt.Println("")
if numGoRoutinesStart != numGoRoutinesPostAppShutdown {
t.Errorf("Number of GoRoutines at start (%v) does not match number of goRoutines after cleanup of peers and servers (%v), clean up failed, v detected!", numGoRoutinesStart, numGoRoutinesPostAppShutdown)
}
}
func MakeProfile(application app2.Application, name string) peer.CwtchPeer {
application.CreateProfile(name, "asdfasdf", true)
p := app2.WaitGetPeer(application, name)
application.ConfigureConnections(p.GetOnion(), true, true, false)
log.Infof("%s created: %s", name, p.GetOnion())
// bob.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Bob") <- This is now done automatically by ProfileValueExtension, keeping this here for clarity
p.AutoHandleEvents([]event.Type{event.PeerStateChange, event.ServerStateChange, event.NewGroupInvite, event.NewRetValMessageFromPeer})
return p
}

32
testing/utils.go Normal file
View File

@ -0,0 +1,32 @@
package testing
import (
"cwtch.im/cwtch/model/attr"
"cwtch.im/cwtch/model/constants"
"cwtch.im/cwtch/peer"
"cwtch.im/cwtch/protocol/connections"
"git.openprivacy.ca/openprivacy/log"
_ "github.com/mutecomm/go-sqlcipher/v4"
"testing"
"time"
)
func WaitForConnection(t *testing.T, peer peer.CwtchPeer, addr string, target connections.ConnectionState) {
peerName, _ := peer.GetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.Name)
for {
log.Infof("%v checking connection...\n", peerName)
state := peer.GetPeerState(addr)
log.Infof("Waiting for Peer %v to %v - state: %v\n", peerName, addr, connections.ConnectionStateName[state])
if state == connections.FAILED {
t.Fatalf("%v could not connect to %v", peer.GetOnion(), addr)
}
if state != target {
log.Infof("peer %v %v waiting connect %v, currently: %v\n", peerName, peer.GetOnion(), addr, connections.ConnectionStateName[state])
time.Sleep(time.Second * 5)
continue
} else {
log.Infof("peer %v %v CONNECTED to %v\n", peerName, peer.GetOnion(), addr)
break
}
}
}