WIP: Hybrid Groups Initial Sketch #545
|
@ -33,4 +33,5 @@ data-dir-cwtchtool/
|
|||
tokens
|
||||
tordir/
|
||||
testing/autodownload/download_dir
|
||||
testing/autodownload/storage
|
||||
testing/autodownload/storage
|
||||
testing/managerstorage
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
"cwtch.im/cwtch/event"
|
||||
"cwtch.im/cwtch/extensions"
|
||||
"cwtch.im/cwtch/functionality/filesharing"
|
||||
"cwtch.im/cwtch/functionality/hybrid"
|
||||
"cwtch.im/cwtch/functionality/servers"
|
||||
"cwtch.im/cwtch/model"
|
||||
"cwtch.im/cwtch/model/attr"
|
||||
|
@ -366,6 +367,8 @@ func (app *application) registerHooks(profile peer.CwtchPeer) {
|
|||
profile.RegisterHook(new(filesharing.Functionality))
|
||||
profile.RegisterHook(new(filesharing.ImagePreviewsFunctionality))
|
||||
profile.RegisterHook(new(servers.Functionality))
|
||||
profile.RegisterHook(new(hybrid.ManagedGroupFunctionality))
|
||||
profile.RegisterHook(new(hybrid.GroupManagerFunctionality)) // will only be activated if GroupManagerExperiment is enabled...
|
||||
// Ensure that Profiles have the Most Up to Date Settings...
|
||||
profile.NotifySettingsUpdate(app.settings.ReadGlobalSettings())
|
||||
}
|
||||
|
@ -404,7 +407,7 @@ func (app *application) ActivatePeerEngine(onion string) {
|
|||
if err == nil {
|
||||
log.Debugf("restartFlow: Creating a New Protocol Engine...")
|
||||
app.engines[profile.GetOnion()] = engine
|
||||
eventBus.Publish(event.NewEventList(event.ProtocolEngineCreated))
|
||||
eventBus.Publish(event.NewEventList(event.ProtocolEngineCreated, event.Handle, profile.GetOnion()))
|
||||
|
||||
app.QueryACNStatus()
|
||||
} else {
|
||||
log.Errorf("corrupted profile detected for %v", onion)
|
||||
|
@ -515,6 +518,7 @@ func (app *application) eventHandler() {
|
|||
log.Infof("peer appearing offline, not launching listen threads or connecting jobs")
|
||||
app.ConfigureConnections(onion, false, false, false)
|
||||
} else {
|
||||
log.Infof("configuring online connections for peer")
|
||||
dan
commented
add onion address else this line becomes noise when multiple peers activating (or remove log) add onion address else this line becomes noise when multiple peers activating (or remove log)
|
||||
app.ConfigureConnections(onion, true, true, true)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -95,6 +95,11 @@ func (em *manager) initialize() {
|
|||
func (em *manager) Subscribe(eventType Type, queue Queue) {
|
||||
em.mapMutex.Lock()
|
||||
defer em.mapMutex.Unlock()
|
||||
for _, sub := range em.subscribers[eventType] {
|
||||
if sub == queue {
|
||||
return // don't add the same queue for the same event twice...
|
||||
}
|
||||
}
|
||||
em.subscribers[eventType] = append(em.subscribers[eventType], queue)
|
||||
}
|
||||
|
||||
|
|
|
@ -54,6 +54,7 @@ func (q *infiniteQueue) resize() {
|
|||
|
||||
// Add puts an element on the end of the queue.
|
||||
func (q *infiniteQueue) Add(elem Event) {
|
||||
|
||||
dan
commented
delete delete
|
||||
if q.count == len(q.buf) {
|
||||
q.resize()
|
||||
}
|
||||
|
|
|
@ -0,0 +1,80 @@
|
|||
package hybrid
|
||||
|
||||
import (
|
||||
"crypto/ed25519"
|
||||
"encoding/base32"
|
||||
"encoding/json"
|
||||
"strings"
|
||||
)
|
||||
|
||||
type GroupEventType int
|
||||
|
||||
const (
|
||||
MemberGroupIDKey = "member_group_id_key"
|
||||
MemberMessageIDKey = "member_group_messge_id"
|
||||
)
|
||||
|
||||
const (
|
||||
AddMember = GroupEventType(0x1000)
|
||||
RemoveMember = GroupEventType(0x2000)
|
||||
RotateKey = GroupEventType(0x3000)
|
||||
NewMessage = GroupEventType(0x4000)
|
||||
NewClearMessage = GroupEventType(0x5000)
|
||||
SyncRequest = GroupEventType(0x6000)
|
||||
)
|
||||
|
||||
type ManageGroupEvent struct {
|
||||
EventType GroupEventType `json:"t"`
|
||||
Data string `json:"d"` // json encoded data
|
||||
}
|
||||
|
||||
type AddMemberEvent struct {
|
||||
Handle string `json:"h"`
|
||||
}
|
||||
|
||||
type RemoveMemberEvent struct {
|
||||
Handle string `json:"h"`
|
||||
}
|
||||
|
||||
type RotateKeyEvent struct {
|
||||
Key []byte `json:"k"`
|
||||
}
|
||||
|
||||
type NewMessageEvent struct {
|
||||
EncryptedHybridGroupMessage []byte `json:"m"`
|
||||
}
|
||||
|
||||
type NewClearMessageEvent struct {
|
||||
HybridGroupMessage HybridGroupMessage `json:"m"`
|
||||
}
|
||||
|
||||
type SyncRequestMessage struct {
|
||||
// a map of MemberGroupID: MemberMessageID
|
||||
LastSeen map[int]int `json:"l"`
|
||||
}
|
||||
|
||||
// This file contains code for the Hybrid Group / Managed Group types..
|
||||
type HybridGroupMessage struct {
|
||||
Author string // the authors cwtch address
|
||||
MemberGroupID uint32
|
||||
dan
commented
why not GroupID and MessageID? ah is it not the ID of hte group but some member ID in the group? i see and the MemeberMessageID? why not GroupID and MessageID?
---
ah is it not the ID of hte group but some member ID in the group? i see
and the MemeberMessageID?
|
||||
MemberMessageID uint32
|
||||
MessageBody string
|
||||
Sent uint64 // milliseconds since epoch
|
||||
Signature []byte // of json-encoded content (including empty sig)
|
||||
}
|
||||
|
||||
// AuthenticateMessage returns true if the Author of the message produced the Signature over the message
|
||||
func AuthenticateMessage(message HybridGroupMessage) bool {
|
||||
messageCopy := message
|
||||
messageCopy.Signature = []byte{}
|
||||
// Otherwise we derive the public key from the sender and check it against that.
|
||||
dan
commented
otherwise? otherwise?
|
||||
decodedPub, err := base32.StdEncoding.DecodeString(strings.ToUpper(message.Author))
|
||||
if err == nil {
|
||||
data, err := json.Marshal(messageCopy)
|
||||
if err == nil && len(decodedPub) >= 32 {
|
||||
return ed25519.Verify(decodedPub[:32], data, message.Signature)
|
||||
dan
commented
only the first 32 bytes of an decoded onion address are the public key? can we make a like onion utils file and functions somewhere so it's a little easier to read / write with out having to remember / google the structure of an onion address? only the first 32 bytes of an decoded onion address are the public key? can we make a like onion utils file and functions somewhere so it's a little easier to read / write with out having to remember / google the structure of an onion address?
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
|
@ -0,0 +1,312 @@
|
|||
package hybrid
|
||||
|
||||
import (
|
||||
"crypto/rand"
|
||||
"cwtch.im/cwtch/event"
|
||||
"cwtch.im/cwtch/model"
|
||||
"cwtch.im/cwtch/model/attr"
|
||||
"cwtch.im/cwtch/model/constants"
|
||||
"cwtch.im/cwtch/peer"
|
||||
"cwtch.im/cwtch/settings"
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"git.openprivacy.ca/openprivacy/log"
|
||||
"golang.org/x/crypto/nacl/secretbox"
|
||||
"math"
|
||||
"math/big"
|
||||
"strconv"
|
||||
"time"
|
||||
)
|
||||
|
||||
type ManagedGroupFunctionality struct {
|
||||
}
|
||||
|
||||
func (f *ManagedGroupFunctionality) NotifySettingsUpdate(settings settings.GlobalSettings) {
|
||||
}
|
||||
|
||||
func (f *ManagedGroupFunctionality) EventsToRegister() []event.Type {
|
||||
return []event.Type{event.ProtocolEngineCreated, event.NewMessageFromPeerEngine}
|
||||
}
|
||||
|
||||
func (f *ManagedGroupFunctionality) ExperimentsToRegister() []string {
|
||||
return []string{constants.GroupsExperiment}
|
||||
}
|
||||
|
||||
// OnEvent handles File Sharing Hooks like Manifest Received and FileDownloaded
|
||||
func (f *ManagedGroupFunctionality) OnEvent(ev event.Event, profile peer.CwtchPeer) {
|
||||
switch ev.EventType {
|
||||
// This is where most of the magic happens for managed groups. A few notes:
|
||||
// - CwtchPeer has already taken care of storing this for us, we don't need to worry about that
|
||||
// - Group Managers **only** speak overlays and **always** wrap their messages in a ManageGroupEvent anything else is fast-rejected.
|
||||
case event.NewMessageFromPeerEngine:
|
||||
handle := ev.Data[event.RemotePeer]
|
||||
ci, err := profile.FetchConversationInfo(handle)
|
||||
if err != nil {
|
||||
break // we don't care about unknown conversations...
|
||||
}
|
||||
if ci.ACL[handle].ManageGroup {
|
||||
var cm model.MessageWrapper
|
||||
err = json.Unmarshal([]byte(ev.Data[event.Data]), &cm)
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
// The overlay type of this message **must** be ManageGroupEvent
|
||||
if cm.Overlay == model.OverlayManageGroupEvent {
|
||||
var mge ManageGroupEvent
|
||||
err = json.Unmarshal([]byte(cm.Data), &mge)
|
||||
if err == nil {
|
||||
cid, err := profile.FetchConversationInfo(handle)
|
||||
if err == nil {
|
||||
f.handleEvent(profile, *cid, mge)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// handleEvent takes in a high level ManageGroupEvent message, transforms it into the proper type, and passes it on for handling
|
||||
// assumes we are called after an event provided by an authorized peer (i.e. ManageGroup == true)
|
||||
func (f *ManagedGroupFunctionality) handleEvent(profile peer.CwtchPeer, conversation model.Conversation, mge ManageGroupEvent) {
|
||||
switch mge.EventType {
|
||||
case AddMember:
|
||||
var ame AddMemberEvent
|
||||
err := json.Unmarshal([]byte(mge.Data), &ame)
|
||||
if err == nil {
|
||||
f.handleAddMemberEvent(profile, conversation, ame)
|
||||
}
|
||||
case RemoveMember:
|
||||
var rme RemoveMemberEvent
|
||||
err := json.Unmarshal([]byte(mge.Data), &rme)
|
||||
if err == nil {
|
||||
f.handleRemoveMemberEvent(profile, conversation, rme)
|
||||
}
|
||||
case NewMessage:
|
||||
var nme NewMessageEvent
|
||||
err := json.Unmarshal([]byte(mge.Data), &nme)
|
||||
if err == nil {
|
||||
f.handleNewMessageEvent(profile, conversation, nme)
|
||||
}
|
||||
case NewClearMessage:
|
||||
var nme NewClearMessageEvent
|
||||
err := json.Unmarshal([]byte(mge.Data), &nme)
|
||||
if err == nil {
|
||||
f.handleNewClearMessageEvent(profile, conversation, nme)
|
||||
}
|
||||
case RotateKey:
|
||||
var rke RotateKeyEvent
|
||||
err := json.Unmarshal([]byte(mge.Data), &rke)
|
||||
if err == nil {
|
||||
f.handleRotateKeyEvent(profile, conversation, rke)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// handleAddMemberEvent adds a group member to the conversation ACL
|
||||
// assumes we are called after an event provided by an authorized peer (i.e. ManageGroup == true)
|
||||
func (f *ManagedGroupFunctionality) handleAddMemberEvent(profile peer.CwtchPeer, conversation model.Conversation, ame AddMemberEvent) {
|
||||
acl := conversation.ACL
|
||||
acl[ame.Handle] = model.DefaultP2PAccessControl()
|
||||
profile.UpdateConversationAccessControlList(conversation.ID, acl)
|
||||
}
|
||||
|
||||
// handleRemoveMemberEvent removes a group member from the conversation ACL
|
||||
// assumes we are called after an event provided by an authorized peer (i.e. ManageGroup == true)
|
||||
func (f *ManagedGroupFunctionality) handleRemoveMemberEvent(profile peer.CwtchPeer, conversation model.Conversation, rme RemoveMemberEvent) {
|
||||
acl := conversation.ACL
|
||||
delete(acl, rme.Handle)
|
||||
profile.UpdateConversationAccessControlList(conversation.ID, acl)
|
||||
}
|
||||
|
||||
// handleRotateKeyEvent rotates the encryption key for a given group
|
||||
// assumes we are called after an event provided by an authorized peer (i.e. ManageGroup == true)
|
||||
func (f *ManagedGroupFunctionality) handleRotateKeyEvent(profile peer.CwtchPeer, conversation model.Conversation, rke RotateKeyEvent) {
|
||||
dan
commented
Do we need to store the old keys so we can reconstruct message history? or is it that if we have the invite, we'll have the first key, and subequently can read the rotate messages in order, etc, and reconstruct, but only if we have full history? Do we need to store the old keys so we can reconstruct message history? or is it that if we have the invite, we'll have the first key, and subequently can read the rotate messages in order, etc, and reconstruct, but only if we have full history?
|
||||
keyScope := attr.LocalScope.ConstructScopedZonedPath(attr.ConversationZone.ConstructZonedPath("key"))
|
||||
keyB64 := base64.StdEncoding.EncodeToString(rke.Key)
|
||||
profile.SetConversationAttribute(conversation.ID, keyScope, keyB64)
|
||||
}
|
||||
|
||||
func (f *ManagedGroupFunctionality) handleNewMessageEvent(profile peer.CwtchPeer, conversation model.Conversation, nme NewMessageEvent) {
|
||||
dan
commented
so a group manager can also send messages? relay them? what's the imagined use case here? group manager replacing servers? so a group manager can also send messages? relay them? what's the imagined use case here? group manager replacing servers?
|
||||
keyScope := attr.LocalScope.ConstructScopedZonedPath(attr.ConversationZone.ConstructZonedPath("key"))
|
||||
if keyB64, err := profile.GetConversationAttribute(conversation.ID, keyScope); err == nil {
|
||||
key, err := base64.StdEncoding.DecodeString(keyB64)
|
||||
if err != nil || len(key) != 32 {
|
||||
log.Errorf("hybrid group key is corrupted")
|
||||
return
|
||||
}
|
||||
// decrypt the message with key...
|
||||
hgm, err := f.decryptMessage(key, nme.EncryptedHybridGroupMessage)
|
||||
if hgm == nil || err != nil {
|
||||
log.Errorf("unable to decrypt hybrid group message: %v", err)
|
||||
return
|
||||
}
|
||||
//
|
||||
dan
commented
ah we dont do anything with it yet? ah we dont do anything with it yet?
|
||||
}
|
||||
}
|
||||
|
||||
func (f *ManagedGroupFunctionality) handleNewClearMessageEvent(profile peer.CwtchPeer, conversation model.Conversation, nme NewClearMessageEvent) {
|
||||
dan
commented
what is the difference between receiving a message and a clear message from a group manager? what is the difference between receiving a message and a clear message from a group manager?
|
||||
hgm := nme.HybridGroupMessage
|
||||
if AuthenticateMessage(hgm) {
|
||||
// TODO Closed Group Membership Check
|
||||
if profile.GetOnion() == hgm.Author {
|
||||
// ack
|
||||
signatureB64 := base64.StdEncoding.EncodeToString(hgm.Signature)
|
||||
id, err := profile.GetChannelMessageBySignature(conversation.ID, 0, signatureB64)
|
||||
if err == nil {
|
||||
profile.UpdateMessageAttribute(conversation.ID, 0, id, constants.AttrAck, constants.True)
|
||||
profile.PublishEvent(event.NewEvent(event.IndexedAcknowledgement, map[event.Field]string{event.ConversationID: strconv.Itoa(conversation.ID), event.Index: strconv.Itoa(id)}))
|
||||
}
|
||||
} else {
|
||||
mgidstr := strconv.Itoa(int(nme.HybridGroupMessage.MemberGroupID)) // we need both MemberGroupId and MemberMessageId for attestation later on...
|
||||
newmmidstr := strconv.Itoa(int(nme.HybridGroupMessage.MemberMessageID))
|
||||
// Set the attributes of this message...
|
||||
attr := model.Attributes{MemberGroupIDKey: mgidstr, MemberMessageIDKey: newmmidstr,
|
||||
constants.AttrAuthor: hgm.Author,
|
||||
constants.AttrAck: event.True,
|
||||
constants.AttrSentTimestamp: time.UnixMilli(int64(hgm.Sent)).Format(time.RFC3339Nano)}
|
||||
profile.InternalInsertMessage(conversation.ID, 0, hgm.Author, hgm.MessageBody, attr, hgm.Signature)
|
||||
}
|
||||
|
||||
// TODO need to send an event here...
|
||||
} else {
|
||||
log.Errorf("received fraudulant hybrid message fom group")
|
||||
}
|
||||
}
|
||||
|
||||
func (f *ManagedGroupFunctionality) decryptMessage(key []byte, ciphertext []byte) (*HybridGroupMessage, error) {
|
||||
if len(ciphertext) > 24 {
|
||||
var decryptNonce [24]byte
|
||||
copy(decryptNonce[:], ciphertext[:24])
|
||||
var fixedSizeKey [32]byte
|
||||
copy(fixedSizeKey[:], key[:32])
|
||||
decrypted, ok := secretbox.Open(nil, ciphertext[24:], &decryptNonce, &fixedSizeKey)
|
||||
if ok {
|
||||
var hgm HybridGroupMessage
|
||||
err := json.Unmarshal(decrypted, &hgm)
|
||||
return &hgm, err
|
||||
}
|
||||
}
|
||||
return nil, fmt.Errorf("invalid ciphertext/key error")
|
||||
}
|
||||
|
||||
// Define a new managed group, managed by the manager...
|
||||
func (f *ManagedGroupFunctionality) NewManagedGroup(profile peer.CwtchPeer, manager string) error {
|
||||
|
||||
// generate a truely random member id for this group in [0..2^32)
|
||||
nBig, err := rand.Int(rand.Reader, big.NewInt(math.MaxUint32))
|
||||
if err != nil {
|
||||
return err // if there is a problem with random we want to exit now rather than have to clean up group setup...
|
||||
}
|
||||
|
||||
ac := model.DefaultP2PAccessControl()
|
||||
ac.ManageGroup = true // by setting the ManageGroup permission in this ACL we are allowing the manager to control of how this group is structured
|
||||
ci, err := profile.NewContactConversation(manager, ac, true)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// enable channel 2 on this conversation (hybrid groups management channel)
|
||||
key := fmt.Sprintf("channel.%d", 2)
|
||||
err = profile.SetConversationAttribute(ci, attr.LocalScope.ConstructScopedZonedPath(attr.ConversationZone.ConstructZonedPath(key)), constants.True)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not enable channel 2 on hybrid group: %v", err) // likely a catestrophic error...fail
|
||||
}
|
||||
|
||||
// finally, set the member group id on this group...
|
||||
mgidkey := attr.LocalScope.ConstructScopedZonedPath(attr.ConversationZone.ConstructZonedPath(MemberGroupIDKey))
|
||||
err = profile.SetConversationAttributeInt(ci, mgidkey, int(nBig.Uint64()))
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not set group id on hybrid group: %v", err) // likely a catestrophic error...fail
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// SendMessageToManagedGroup acts like SendMessage(ToPeer), but with a few additional bookkeeping steps for Hybrid Groups
|
||||
func (f *ManagedGroupFunctionality) SendMessageToManagedGroup(profile peer.CwtchPeer, conversation int, message string) (int, error) {
|
||||
mgidkey := attr.LocalScope.ConstructScopedZonedPath(attr.ConversationZone.ConstructZonedPath(MemberGroupIDKey))
|
||||
mgid, err := profile.GetConversationAttributeInt(conversation, mgidkey)
|
||||
if err != nil {
|
||||
return -1, err
|
||||
}
|
||||
|
||||
mmidkey := attr.LocalScope.ConstructScopedZonedPath(attr.ConversationZone.ConstructZonedPath(MemberMessageIDKey))
|
||||
mmid, err := profile.GetConversationAttributeInt(conversation, mmidkey)
|
||||
if err != nil {
|
||||
mmid = 0 // first message
|
||||
}
|
||||
|
||||
mmid += 1
|
||||
|
||||
hgm := HybridGroupMessage{
|
||||
MemberGroupID: uint32(mgid),
|
||||
MemberMessageID: uint32(mmid),
|
||||
Sent: uint64(time.Now().UnixMilli()),
|
||||
Author: profile.GetOnion(),
|
||||
MessageBody: message,
|
||||
Signature: []byte{}, // Leave blank so we can sign this message...
|
||||
}
|
||||
|
||||
data, err := json.Marshal(hgm)
|
||||
if err != nil {
|
||||
return -1, err
|
||||
}
|
||||
|
||||
sig, err := profile.SignMessage(data)
|
||||
if err != nil {
|
||||
return -1, err
|
||||
}
|
||||
|
||||
hgm.Signature = sig
|
||||
|
||||
ncm := NewClearMessageEvent{
|
||||
HybridGroupMessage: hgm,
|
||||
}
|
||||
|
||||
signedData, err := json.Marshal(ncm)
|
||||
if err != nil {
|
||||
return -1, err
|
||||
}
|
||||
|
||||
mgm := ManageGroupEvent{
|
||||
EventType: NewClearMessage,
|
||||
Data: string(signedData),
|
||||
}
|
||||
|
||||
odata, err := json.Marshal(mgm)
|
||||
if err != nil {
|
||||
return -1, err
|
||||
}
|
||||
|
||||
overlay := model.MessageWrapper{
|
||||
Overlay: model.OverlayManageGroupEvent,
|
||||
Data: string(odata),
|
||||
}
|
||||
|
||||
ojson, err := json.Marshal(overlay)
|
||||
if err != nil {
|
||||
return -1, err
|
||||
}
|
||||
|
||||
// send the message to the manager and update our message is string for tracking...
|
||||
_, err = profile.SendMessage(conversation, string(ojson))
|
||||
if err != nil {
|
||||
return -1, err
|
||||
}
|
||||
profile.SetConversationAttributeInt(conversation, mmidkey, mmid)
|
||||
|
||||
// ok there is still one more thing we need to do...
|
||||
// insert this message as part of our group log, for members of the group
|
||||
// this exists in channel 0 of the conversation with the group manager...
|
||||
mgidstr := strconv.Itoa(mgid) // we need both MemberGroupId and MemberMessageId for attestation later on...
|
||||
newmmidstr := strconv.Itoa(mmid)
|
||||
attr := model.Attributes{MemberGroupIDKey: mgidstr, MemberMessageIDKey: newmmidstr, constants.AttrAuthor: profile.GetOnion(), constants.AttrAck: event.False, constants.AttrSentTimestamp: time.Now().Format(time.RFC3339Nano)}
|
||||
return profile.InternalInsertMessage(conversation, 0, hgm.Author, message, attr, hgm.Signature)
|
||||
}
|
||||
|
||||
func (f *ManagedGroupFunctionality) OnContactRequestValue(profile peer.CwtchPeer, conversation model.Conversation, eventID string, path attr.ScopedZonedPath) {
|
||||
// nop hybrid group conversations do not exchange contact requests
|
||||
}
|
||||
|
||||
func (f *ManagedGroupFunctionality) OnContactReceiveValue(profile peer.CwtchPeer, conversation model.Conversation, path attr.ScopedZonedPath, value string, exists bool) {
|
||||
// nop hybrid group conversations do not exchange contact requests
|
||||
}
|
|
@ -0,0 +1,144 @@
|
|||
// This file contains all code related to how a Group Manager operates over a group.
|
||||
package hybrid
|
||||
|
||||
import (
|
||||
"cwtch.im/cwtch/event"
|
||||
"cwtch.im/cwtch/model"
|
||||
"cwtch.im/cwtch/model/attr"
|
||||
"cwtch.im/cwtch/model/constants"
|
||||
"cwtch.im/cwtch/peer"
|
||||
"cwtch.im/cwtch/settings"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"git.openprivacy.ca/openprivacy/log"
|
||||
"strconv"
|
||||
"time"
|
||||
)
|
||||
|
||||
type GroupManagerFunctionality struct {
|
||||
}
|
||||
|
||||
func (f *GroupManagerFunctionality) NotifySettingsUpdate(settings settings.GlobalSettings) {
|
||||
}
|
||||
|
||||
func (f *GroupManagerFunctionality) EventsToRegister() []event.Type {
|
||||
return []event.Type{event.ProtocolEngineCreated, event.NewMessageFromPeerEngine}
|
||||
}
|
||||
|
||||
func (f *GroupManagerFunctionality) ExperimentsToRegister() []string {
|
||||
return []string{constants.GroupManagerExperiment, constants.GroupsExperiment}
|
||||
}
|
||||
|
||||
// OnEvent handles File Sharing Hooks like Manifest Received and FileDownloaded
|
||||
func (f *GroupManagerFunctionality) OnEvent(ev event.Event, profile peer.CwtchPeer) {
|
||||
switch ev.EventType {
|
||||
// This is where most of the magic happens for managed groups. A few notes:
|
||||
// - CwtchPeer has already taken care of storing this for us, we don't need to worry about that
|
||||
// - Group Managers **only** speak overlays and **always** wrap their messages in a ManageGroupEvent anything else is fast-rejected.
|
||||
case event.NewMessageFromPeerEngine:
|
||||
log.Infof("received new message from peer: manager")
|
||||
ci, err := profile.GetConversationInfo(1)
|
||||
if err != nil {
|
||||
log.Errorf("unknown conversation %v", err)
|
||||
break // we don't care about unknown conversations...
|
||||
}
|
||||
var cm model.MessageWrapper
|
||||
err = json.Unmarshal([]byte(ev.Data[event.Data]), &cm)
|
||||
if err != nil {
|
||||
log.Errorf("could not deserialize json %v", err)
|
||||
break
|
||||
}
|
||||
// The overlay type of this message **must** be ManageGroupEvent
|
||||
if cm.Overlay == model.OverlayManageGroupEvent {
|
||||
var mge ManageGroupEvent
|
||||
err = json.Unmarshal([]byte(cm.Data), &mge)
|
||||
if err == nil {
|
||||
f.handleEvent(profile, *ci, mge, ev.Data[event.Data])
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// handleEvent takes in a high level ManageGroupEvent message, transforms it into the proper type, and passes it on for handling
|
||||
// assumes we are called after an event provided by an authorized peer (i.e. ManageGroup == true)
|
||||
func (f *GroupManagerFunctionality) handleEvent(profile peer.CwtchPeer, conversation model.Conversation, mge ManageGroupEvent, original string) {
|
||||
switch mge.EventType {
|
||||
case NewClearMessage:
|
||||
var nme NewClearMessageEvent
|
||||
err := json.Unmarshal([]byte(mge.Data), &nme)
|
||||
if err == nil {
|
||||
f.handleNewMessageEvent(profile, conversation, nme, original)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (f *GroupManagerFunctionality) handleNewMessageEvent(profile peer.CwtchPeer, conversation model.Conversation, nme NewClearMessageEvent, original string) {
|
||||
log.Infof("handling new clear message event")
|
||||
hgm := nme.HybridGroupMessage
|
||||
if AuthenticateMessage(hgm) {
|
||||
log.Infof("authenticated message")
|
||||
// TODO Closed Group Membership Check
|
||||
mgidstr := strconv.Itoa(int(nme.HybridGroupMessage.MemberGroupID)) // we need both MemberGroupId and MemberMessageId for attestation later on...
|
||||
newmmidstr := strconv.Itoa(int(nme.HybridGroupMessage.MemberMessageID))
|
||||
// Set the attributes of this message...
|
||||
attr := model.Attributes{MemberGroupIDKey: mgidstr, MemberMessageIDKey: newmmidstr,
|
||||
constants.AttrAuthor: hgm.Author,
|
||||
constants.AttrAck: event.True,
|
||||
constants.AttrSentTimestamp: time.UnixMilli(int64(hgm.Sent)).Format(time.RFC3339Nano)}
|
||||
profile.InternalInsertMessage(conversation.ID, 0, hgm.Author, hgm.MessageBody, attr, hgm.Signature)
|
||||
|
||||
// forward the message to everyone who the server has added as a contact
|
||||
// TODO: filter by conversation.ACL
|
||||
allConversations, _ := profile.FetchConversations()
|
||||
for _, ci := range allConversations {
|
||||
log.Infof("forwarding group message to: %v", ci.Handle)
|
||||
profile.SendMessage(ci.ID, original)
|
||||
}
|
||||
} else {
|
||||
log.Errorf("received fraudulant hybrid message fom group")
|
||||
}
|
||||
}
|
||||
|
||||
// Establish a new Managed Group
|
||||
func (f *GroupManagerFunctionality) ManageNewGroup(profile peer.CwtchPeer) (int, error) {
|
||||
ac := model.DefaultP2PAccessControl()
|
||||
// by setting the ManageGroup permission in this ACL we are allowing the manage to
|
||||
// take control of how this group is structured, see OnEvent above...
|
||||
ac.ManageGroup = true
|
||||
// note: a manager can only manage one group. This will always be true and has a few benefits
|
||||
// and downsides.
|
||||
// The main downside is that it requires a new manager per group (and thus an onion service per group)
|
||||
// However, it means that we can lean on p2p functionality like profile images / metadata / name
|
||||
// etc. for group metadata and effectively get that for-free in the client.
|
||||
handle := fmt.Sprintf("managed:%d", 000)
|
||||
acl := model.AccessControlList{}
|
||||
acl[profile.GetOnion()] = ac
|
||||
ci, err := profile.NewConversation(handle, acl)
|
||||
if err != nil {
|
||||
return -1, err
|
||||
}
|
||||
return ci, nil
|
||||
}
|
||||
|
||||
// AddHybridContact is a wrapper arround NewContactConversation which sets the contact
|
||||
// up for Hybrid Group channel messages...
|
||||
func (f *GroupManagerFunctionality) AddHybridContact(profile peer.CwtchPeer, handle string) error {
|
||||
ac := model.DefaultP2PAccessControl()
|
||||
ac.ManageGroup = false
|
||||
ci, err := profile.NewContactConversation(handle, ac, true)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// enable channel 2 on this conversation (hybrid groups management channel)
|
||||
dan
commented
what are channels? what are channels?
|
||||
key := fmt.Sprintf("channel.%d", 2)
|
||||
profile.SetConversationAttribute(ci, attr.LocalScope.ConstructScopedZonedPath(attr.ConversationZone.ConstructZonedPath(key)), constants.True)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *GroupManagerFunctionality) OnContactRequestValue(profile peer.CwtchPeer, conversation model.Conversation, eventID string, path attr.ScopedZonedPath) {
|
||||
// nop hybrid group conversations do not exchange contact requests
|
||||
}
|
||||
|
||||
func (f *GroupManagerFunctionality) OnContactReceiveValue(profile peer.CwtchPeer, conversation model.Conversation, path attr.ScopedZonedPath, value string, exists bool) {
|
||||
// nop hybrid group conversations do not exchange contact requests
|
||||
}
|
|
@ -20,6 +20,9 @@ const (
|
|||
// LegacyGroupZone for attributes related to legacy group experiment
|
||||
LegacyGroupZone = Zone("legacygroup")
|
||||
|
||||
// ConversationZone for attributes related to structure of the conversation
|
||||
ConversationZone = Zone("conversation")
|
||||
|
||||
// FilesharingZone for attributes related to file sharing
|
||||
FilesharingZone = Zone("filesharing")
|
||||
|
||||
|
@ -65,6 +68,8 @@ func ParseZone(path string) (Zone, string) {
|
|||
return ServerKeyZone, parts[1]
|
||||
case ServerZone:
|
||||
return ServerZone, parts[1]
|
||||
case ConversationZone:
|
||||
return ConversationZone, parts[1]
|
||||
default:
|
||||
return UnknownZone, parts[1]
|
||||
}
|
||||
|
|
|
@ -2,6 +2,9 @@ package constants
|
|||
|
||||
const GroupsExperiment = "tapir-groups-experiment"
|
||||
|
||||
// for now only used by bots, do not expose in UI..for now
|
||||
const GroupManagerExperiment = "manage-group-experiment"
|
||||
|
||||
// FileSharingExperiment Allows file sharing
|
||||
const FileSharingExperiment = "filesharing"
|
||||
|
||||
|
|
|
@ -4,7 +4,9 @@ import (
|
|||
"cwtch.im/cwtch/model/attr"
|
||||
"cwtch.im/cwtch/model/constants"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"git.openprivacy.ca/openprivacy/log"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
|
@ -23,6 +25,8 @@ type AccessControl struct {
|
|||
// Extension Related Permissions
|
||||
ShareFiles bool // Allows a handle to share files to a conversation
|
||||
RenderImages bool // Indicates that certain filetypes should be autodownloaded and rendered when shared by this contact
|
||||
|
||||
ManageGroup bool // Allows a handle to actively manage the group
|
||||
}
|
||||
|
||||
// DefaultP2PAccessControl defaults to a semi-trusted peer with no access to special extensions.
|
||||
|
@ -85,6 +89,22 @@ func (ci *Conversation) GetAttribute(scope attr.Scope, zone attr.Zone, key strin
|
|||
return "", false
|
||||
}
|
||||
|
||||
// GetPeerAC returns a suitable Access Control object for a the given peer conversation
|
||||
// If this is called for a group conversation, this method will error and return a safe default AC.
|
||||
func (ci *Conversation) HasChannel(requestedChannel int) bool {
|
||||
if requestedChannel == 0 {
|
||||
return true
|
||||
}
|
||||
if requestedChannel == 1 {
|
||||
return false // channel 1 is mapped to channel 0 for backwards compatibility
|
||||
dan
commented
? this hasn't launched? backwards compatibility with what? and why return false if it's "mapped to 0"? we have 0 so isnt this kinda true? ? this hasn't launched? backwards compatibility with what? and why return false if it's "mapped to 0"? we have 0 so isnt this kinda true?
|
||||
}
|
||||
key := fmt.Sprintf("channel.%d", requestedChannel)
|
||||
if value, exists := ci.Attributes[attr.LocalScope.ConstructScopedZonedPath(attr.ConversationZone.ConstructZonedPath(key)).ToString()]; exists {
|
||||
return value == constants.True
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// GetPeerAC returns a suitable Access Control object for a the given peer conversation
|
||||
// If this is called for a group conversation, this method will error and return a safe default AC.
|
||||
func (ci *Conversation) GetPeerAC() AccessControl {
|
||||
|
@ -111,6 +131,11 @@ func (ci *Conversation) IsServer() bool {
|
|||
return false
|
||||
}
|
||||
|
||||
// IsManaged is a helper attribute that identifies whether a conversation is managed
|
||||
func (ci *Conversation) IsManaged() bool {
|
||||
return strings.HasPrefix(ci.Handle, "managed")
|
||||
}
|
||||
|
||||
// ServerSyncProgress is only valid during a server being in the AUTHENTICATED state and therefor in the syncing process
|
||||
// it returns a double (0-1) representing the estimated progress of the syncing
|
||||
func (ci *Conversation) ServerSyncProgress() float64 {
|
||||
|
|
|
@ -6,6 +6,13 @@ type MessageWrapper struct {
|
|||
Data string `json:"d"`
|
||||
}
|
||||
|
||||
// Overlay Identifiers now have a dual strucutre, their full id defines the overlay
|
||||
// type, and thus the encoding of the Data.
|
||||
// But the last 8 bits of the overlay now also encode the **channel* that the overlay is sent
|
||||
// to. This is needed for Hybrid Groups where there exists both a main channel and the
|
||||
// management channel (whose messages we don't want to show up in a default fetch).
|
||||
// To support backwards compatibility, any overlay id less than 0x300 to resolve to 0
|
||||
|
||||
// OverlayChat is the canonical identifier for chat overlays
|
||||
const OverlayChat = 1
|
||||
|
||||
|
@ -17,3 +24,6 @@ const OverlayInviteGroup = 101
|
|||
|
||||
// OverlayFileSharing is the canonical identifier for the file sharing overlay
|
||||
const OverlayFileSharing = 200
|
||||
|
||||
// ManageGroupEvent is the canonical identifier for the manage group overlay
|
||||
const OverlayManageGroupEvent = 0x302
|
||||
dan
commented
shouldn't there also be a 0x300 for the admin channel 0 then? I'm not sure I get why we're overloading channel into overlay here and not just another byte sized field in managed group messages? don't this mean adding binary logic somewhere to check an overlay contains 0x300 then? shouldn't there also be a 0x300 for the admin channel 0 then?
I'm not sure I get why we're overloading channel into overlay here and not just another byte sized field in managed group messages? don't this mean adding binary logic somewhere to check an overlay contains 0x300 then?
|
||||
|
|
|
@ -421,6 +421,36 @@ func (cp *cwtchPeer) SetScopedZonedAttribute(scope attr.Scope, zone attr.Zone, k
|
|||
}
|
||||
}
|
||||
|
||||
func (cp *cwtchPeer) SignMessage(blob []byte) ([]byte, error) {
|
||||
privateKey, err := cp.storage.LoadProfileKeyValue(TypePrivateKey, "Ed25519PrivateKey")
|
||||
if err != nil {
|
||||
log.Errorf("error loading private key from storage")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
publicKey, err := cp.storage.LoadProfileKeyValue(TypePublicKey, "Ed25519PublicKey")
|
||||
if err != nil {
|
||||
log.Errorf("error loading public key from storage")
|
||||
return nil, err
|
||||
}
|
||||
identity := primitives.InitializeIdentity("", (*ed25519.PrivateKey)(&privateKey), (*ed25519.PublicKey)(&publicKey))
|
||||
return identity.Sign(blob), nil
|
||||
}
|
||||
|
||||
func (cp *cwtchPeer) InternalInsertMessage(conversation int, channel int, author string, body string, attrs model.Attributes, signature []byte) (int, error) {
|
||||
signatureB64 := base64.StdEncoding.EncodeToString(signature)
|
||||
return cp.storage.InsertMessage(conversation, channel, body, attrs, signatureB64, model.CalculateContentHash(author, body))
|
||||
}
|
||||
|
||||
func (cp *cwtchPeer) resolveChannel(overlay int) int {
|
||||
requestedChannel := overlay & 0xFF
|
||||
dan
commented
right like this. we use big json blob structs for so much, why bit packing for this? also, is there an envisioned use for the range between 0xFF and 0x300? that seems abandoned? right like this. we use big json blob structs for so much, why bit packing for this?
also, is there an envisioned use for the range between 0xFF and 0x300? that seems abandoned?
|
||||
// Legacy mapping of old chat/group overlays to channel 0
|
||||
if overlay < 0x300 {
|
||||
requestedChannel = 0
|
||||
}
|
||||
return requestedChannel
|
||||
}
|
||||
|
||||
// SendMessage is a higher level that merges sending messages to contacts and group handles
|
||||
// If you try to send a message to a handle that doesn't exist, malformed or an incorrect type then
|
||||
// this function will error
|
||||
|
@ -436,8 +466,22 @@ func (cp *cwtchPeer) SendMessage(conversation int, message string) (int, error)
|
|||
ev := event.NewEvent(event.SendMessageToPeer, map[event.Field]string{event.ConversationID: strconv.Itoa(conversationInfo.ID), event.RemotePeer: conversationInfo.Handle, event.Data: message})
|
||||
onion, _ := cp.storage.LoadProfileKeyValue(TypeAttribute, attr.PublicScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Onion)).ToString())
|
||||
|
||||
var cm model.MessageWrapper
|
||||
err = json.Unmarshal([]byte(message), &cm)
|
||||
|
||||
// we are now explictly rejecting invalidly encoded messages...
|
||||
dan
commented
was this happening a lot? seems like that's a failing of our API then. perhaps the exposed send message shouldn't take a pre encoded message wrapper but the arguments ti construct one? was this happening a lot?
seems like that's a failing of our API then. perhaps the exposed send message shouldn't take a pre encoded message wrapper but the arguments ti construct one?
|
||||
if err != nil {
|
||||
return -1, err
|
||||
}
|
||||
requestedChannel := cp.resolveChannel(cm.Overlay)
|
||||
// if this channel has not been registered for a particular conversation
|
||||
// then default to channel = 0;
|
||||
if conversationInfo.HasChannel(requestedChannel) {
|
||||
dan
commented
resolveChannel will turn all legacy overlays to have channel 0 and hasChannel auto trues 0. for managed groups was it 0 or 2 that is the admin channel? do we need this manual setting? wont managed group messages have a channel explicitly set? resolveChannel will turn all legacy overlays to have channel 0 and hasChannel auto trues 0.
for managed groups was it 0 or 2 that is the admin channel? do we need this manual setting? wont managed group messages have a channel explicitly set?
|
||||
requestedChannel = 0
|
||||
}
|
||||
|
||||
// For p2p messages we store the event id of the message as the "signature" we can then look this up in the database later for acks
|
||||
id, err := cp.storage.InsertMessage(conversationInfo.ID, 0, message, model.Attributes{constants.AttrAuthor: string(onion), constants.AttrAck: event.False, constants.AttrSentTimestamp: time.Now().Format(time.RFC3339Nano)}, ev.EventID, model.CalculateContentHash(string(onion), message))
|
||||
id, err := cp.storage.InsertMessage(conversationInfo.ID, requestedChannel, message, model.Attributes{constants.AttrAuthor: string(onion), constants.AttrAck: event.False, constants.AttrSentTimestamp: time.Now().Format(time.RFC3339Nano)}, ev.EventID, model.CalculateContentHash(string(onion), message))
|
||||
if err != nil {
|
||||
return -1, err
|
||||
}
|
||||
|
@ -700,6 +744,14 @@ func (cp *cwtchPeer) ImportGroup(exportedInvite string) (int, error) {
|
|||
return groupConversationID, err
|
||||
}
|
||||
|
||||
// NewConversation create a new multi-peer conversation.
|
||||
func (cp *cwtchPeer) NewConversation(handle string, acl model.AccessControlList) (int, error) {
|
||||
conversationID, err := cp.storage.NewConversation(handle, model.Attributes{event.SaveHistoryKey: event.DeleteHistoryDefault}, acl, true)
|
||||
cp.SetConversationAttribute(conversationID, attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.ACLVersion)), constants.ACLVersionOne)
|
||||
cp.SetConversationAttribute(conversationID, attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.AttrLastConnectionTime)), time.Now().Format(time.RFC3339Nano))
|
||||
return conversationID, err
|
||||
}
|
||||
|
||||
// NewContactConversation create a new p2p conversation with the given acl applied to the handle.
|
||||
func (cp *cwtchPeer) NewContactConversation(handle string, acl model.AccessControl, accepted bool) (int, error) {
|
||||
cp.mutex.Lock()
|
||||
|
@ -717,6 +769,8 @@ func (cp *cwtchPeer) NewContactConversation(handle string, acl model.AccessContr
|
|||
|
||||
// UpdateConversationAccessControlList is a genric ACL update method
|
||||
func (cp *cwtchPeer) UpdateConversationAccessControlList(id int, acl model.AccessControlList) error {
|
||||
// set the ACL version to the most recent version
|
||||
cp.SetConversationAttribute(id, attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.ACLVersion)), constants.ACLVersionOne)
|
||||
return cp.storage.SetConversationACL(id, acl)
|
||||
}
|
||||
|
||||
|
@ -880,6 +934,26 @@ func (cp *cwtchPeer) GetConversationAttribute(id int, path attr.ScopedZonedPath)
|
|||
return val, nil
|
||||
}
|
||||
|
||||
// SetConversationAttributeInt sets the conversation attribute at path to an integer value
|
||||
func (cp *cwtchPeer) SetConversationAttributeInt(id int, path attr.ScopedZonedPath, value int) error {
|
||||
strvalue := strconv.Itoa(value)
|
||||
return cp.storage.SetConversationAttribute(id, path, strvalue)
|
||||
}
|
||||
|
||||
// GetConversationAttributeInt is a method for retrieving an integer value of a given conversation
|
||||
func (cp *cwtchPeer) GetConversationAttributeInt(id int, path attr.ScopedZonedPath) (int, error) {
|
||||
ci, err := cp.storage.GetConversation(id)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
val, exists := ci.Attributes[path.ToString()]
|
||||
if !exists {
|
||||
return 0, fmt.Errorf("%v does not exist for conversation %v", path.ToString(), id)
|
||||
}
|
||||
intvalue, err := strconv.Atoi(val)
|
||||
return intvalue, err
|
||||
}
|
||||
|
||||
// GetChannelMessage returns a message from a conversation channel referenced by the absolute ID.
|
||||
// Note: This should note be used to index a list as the ID is not expected to be tied to absolute position
|
||||
// in the table (e.g. deleted messages, expired messages, etc.)
|
||||
|
@ -1414,6 +1488,8 @@ func (cp *cwtchPeer) StartConnections(doPeers, doServers bool) {
|
|||
if conversation.model.IsServer() && cp.IsFeatureEnabled(constants.GroupsExperiment) {
|
||||
log.Debugf(" QueueJoinServer(%v)", conversation.model.Handle)
|
||||
cp.QueueJoinServer(conversation.model.Handle)
|
||||
} else if conversation.model.IsManaged() {
|
||||
// do nothing...
|
||||
} else {
|
||||
log.Debugf(" QueuePeerWithOnion(%v)", conversation.model.Handle)
|
||||
if conversation.model.GetPeerAC().AutoConnect {
|
||||
|
@ -1482,9 +1558,23 @@ func (cp *cwtchPeer) storeMessage(handle string, message string, sent time.Time)
|
|||
}
|
||||
}
|
||||
|
||||
var cm model.MessageWrapper
|
||||
dan
commented
this check again? at least make it a private helper function like this check again? at least make it a private helper function like `validateMessageWrapper`
|
||||
err = json.Unmarshal([]byte(message), &cm)
|
||||
|
||||
// we are now explictly rejecting invalidly encoded messages...
|
||||
if err != nil {
|
||||
return -1, err
|
||||
}
|
||||
|
||||
requestedChannel := cp.resolveChannel(cm.Overlay)
|
||||
// if this channel has not been registered for a particular conversation
|
||||
// then default to channel = 0;
|
||||
if ci.HasChannel(requestedChannel) {
|
||||
requestedChannel = 0
|
||||
}
|
||||
// Generate a random number and use it as the signature
|
||||
signature := event.GetRandNumber().String()
|
||||
return cp.storage.InsertMessage(ci.ID, 0, message, model.Attributes{constants.AttrAuthor: handle, constants.AttrAck: event.True, constants.AttrSentTimestamp: sent.Format(time.RFC3339Nano)}, signature, model.CalculateContentHash(handle, message))
|
||||
return cp.storage.InsertMessage(ci.ID, requestedChannel, message, model.Attributes{constants.AttrAuthor: handle, constants.AttrAck: event.True, constants.AttrSentTimestamp: sent.Format(time.RFC3339Nano)}, signature, model.CalculateContentHash(handle, message))
|
||||
}
|
||||
|
||||
// eventHandler process events from other subsystems
|
||||
|
@ -1499,6 +1589,7 @@ func (cp *cwtchPeer) eventHandler() {
|
|||
onion, _ := cp.storage.LoadProfileKeyValue(TypeAttribute, attr.PublicScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Onion)).ToString())
|
||||
log.Infof("Protocol engine for %s has stopped listening: %v", onion, ev.Data[event.Error])
|
||||
cp.mutex.Unlock()
|
||||
cp.processExtensionsEvent(ev)
|
||||
case event.EncryptedGroupMessage:
|
||||
|
||||
// If successful, a side effect is the message is added to the group's timeline
|
||||
|
@ -1543,6 +1634,7 @@ func (cp *cwtchPeer) eventHandler() {
|
|||
case event.NewMessageFromPeerEngine: //event.TimestampReceived, event.RemotePeer, event.Data
|
||||
ts, _ := time.Parse(time.RFC3339Nano, ev.Data[event.TimestampReceived])
|
||||
id, err := cp.storeMessage(ev.Data[event.RemotePeer], ev.Data[event.Data], ts)
|
||||
cp.processExtensionsEvent(ev)
|
||||
if err == nil {
|
||||
// Republish as NewMessageFromPeer
|
||||
ev.EventType = event.NewMessageFromPeer
|
||||
|
@ -1671,22 +1763,7 @@ func (cp *cwtchPeer) eventHandler() {
|
|||
cp.mutex.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
// Safe Access to Extensions
|
||||
cp.extensionLock.Lock()
|
||||
for _, extension := range cp.extensions {
|
||||
log.Debugf("checking extension...%v", extension)
|
||||
// check if the current map of experiments satisfies the extension requirements
|
||||
if !cp.checkExtensionExperiment(extension) {
|
||||
log.Debugf("skipping extension (%s) ..not all experiments satisfied", extension)
|
||||
continue
|
||||
}
|
||||
if cp.checkEventExperiment(extension, ev.EventType) {
|
||||
extension.extension.OnEvent(ev, cp)
|
||||
}
|
||||
}
|
||||
cp.extensionLock.Unlock()
|
||||
|
||||
cp.processExtensionsEvent(ev)
|
||||
case event.ServerStateChange:
|
||||
cp.mutex.Lock()
|
||||
prevState := cp.state[ev.Data[event.GroupServer]]
|
||||
|
@ -1750,7 +1827,7 @@ func (cp *cwtchPeer) eventHandler() {
|
|||
|
||||
// check if the current map of experiments satisfies the extension requirements
|
||||
if !cp.checkExtensionExperiment(extension) {
|
||||
log.Debugf("skipping extension (%s) ..not all experiments satisfied", extension)
|
||||
log.Debugf("skipping extension (%v) ..not all experiments satisfied", extension)
|
||||
if cp.checkEventExperiment(extension, ev.EventType) {
|
||||
// If this experiment was enabled...we might have processed this event...
|
||||
// To avoid flagging an error later on in this method we set processed to true.
|
||||
|
@ -1796,6 +1873,24 @@ func (cp *cwtchPeer) checkExtensionExperiment(hook ProfileHook) bool {
|
|||
return true
|
||||
}
|
||||
|
||||
func (cp *cwtchPeer) processExtensionsEvent(ev event.Event) {
|
||||
cp.extensionLock.Lock()
|
||||
for _, extension := range cp.extensions {
|
||||
|
||||
// check if the current map of experiments satisfies the extension requirements
|
||||
if !cp.checkExtensionExperiment(extension) {
|
||||
log.Debugf("skipping extension (%v) ..not all experiments satisfied", extension)
|
||||
continue
|
||||
}
|
||||
|
||||
// if the extension is registered for this event type then process
|
||||
if _, contains := extension.events[ev.EventType]; contains {
|
||||
extension.extension.OnEvent(ev, cp)
|
||||
}
|
||||
}
|
||||
cp.extensionLock.Unlock()
|
||||
}
|
||||
|
||||
// attemptInsertOrAcknowledgeLegacyGroupConversation is a convenience method that looks up the conversation
|
||||
// by the given handle and attempts to mark the message as acknowledged. returns error on failure
|
||||
// to either find the contact or the associated message
|
||||
|
|
|
@ -114,7 +114,13 @@ type CwtchPeer interface {
|
|||
ImportBundle(string) error
|
||||
EnhancedImportBundle(string) string
|
||||
|
||||
SignMessage(blob []byte) ([]byte, error)
|
||||
|
||||
// Used for Internal Bookkeeping by Extensions, **do not expose in autobindings**
|
||||
InternalInsertMessage(conversation int, channel int, author string, body string, attributes model.Attributes, signature []byte) (int, error)
|
||||
|
||||
// New Unified Conversation Interfaces
|
||||
NewConversation(handle string, acl model.AccessControlList) (int, error)
|
||||
NewContactConversation(handle string, acl model.AccessControl, accepted bool) (int, error)
|
||||
FetchConversations() ([]*model.Conversation, error)
|
||||
ArchiveConversation(conversation int)
|
||||
|
@ -135,12 +141,15 @@ type CwtchPeer interface {
|
|||
|
||||
SetConversationAttribute(conversation int, path attr.ScopedZonedPath, value string) error
|
||||
GetConversationAttribute(conversation int, path attr.ScopedZonedPath) (string, error)
|
||||
SetConversationAttributeInt(conversation int, path attr.ScopedZonedPath, value int) error
|
||||
GetConversationAttributeInt(conversation int, path attr.ScopedZonedPath) (int, error)
|
||||
DeleteConversation(conversation int) error
|
||||
|
||||
// New Unified Conversation Channel Interfaces
|
||||
GetChannelMessage(conversation int, channel int, id int) (string, model.Attributes, error)
|
||||
GetChannelMessageCount(conversation int, channel int) (int, error)
|
||||
GetChannelMessageByContentHash(conversation int, channel int, contenthash string) (int, error)
|
||||
GetChannelMessageBySignature(conversationID int, channelID int, signature string) (int, error)
|
||||
GetMostRecentMessages(conversation int, channel int, offset int, limit uint) ([]model.ConversationMessage, error)
|
||||
UpdateMessageAttribute(conversation int, channel int, id int, key string, value string) error
|
||||
SearchConversations(pattern string) string
|
||||
|
|
|
@ -34,26 +34,6 @@ var (
|
|||
carolLines = []string{"Howdy, thanks!"}
|
||||
)
|
||||
|
||||
func waitForConnection(t *testing.T, peer peer.CwtchPeer, addr string, target connections.ConnectionState) {
|
||||
peerName, _ := peer.GetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.Name)
|
||||
for {
|
||||
log.Infof("%v checking connection...\n", peerName)
|
||||
state := peer.GetPeerState(addr)
|
||||
log.Infof("Waiting for Peer %v to %v - state: %v\n", peerName, addr, connections.ConnectionStateName[state])
|
||||
if state == connections.FAILED {
|
||||
t.Fatalf("%v could not connect to %v", peer.GetOnion(), addr)
|
||||
}
|
||||
if state != target {
|
||||
log.Infof("peer %v %v waiting connect %v, currently: %v\n", peerName, peer.GetOnion(), addr, connections.ConnectionStateName[state])
|
||||
time.Sleep(time.Second * 5)
|
||||
continue
|
||||
} else {
|
||||
log.Infof("peer %v %v CONNECTED to %v\n", peerName, peer.GetOnion(), addr)
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func waitForRetVal(peer peer.CwtchPeer, convId int, szp attr.ScopedZonedPath) {
|
||||
for {
|
||||
_, err := peer.GetConversationAttribute(convId, szp)
|
||||
|
@ -236,10 +216,10 @@ func TestCwtchPeerIntegration(t *testing.T) {
|
|||
t.Fatalf("Alice password did not change...")
|
||||
}
|
||||
|
||||
waitForConnection(t, alice, bob.GetOnion(), connections.AUTHENTICATED)
|
||||
waitForConnection(t, alice, carol.GetOnion(), connections.AUTHENTICATED)
|
||||
waitForConnection(t, bob, alice.GetOnion(), connections.AUTHENTICATED)
|
||||
waitForConnection(t, carol, alice.GetOnion(), connections.AUTHENTICATED)
|
||||
WaitForConnection(t, alice, bob.GetOnion(), connections.AUTHENTICATED)
|
||||
WaitForConnection(t, alice, carol.GetOnion(), connections.AUTHENTICATED)
|
||||
WaitForConnection(t, bob, alice.GetOnion(), connections.AUTHENTICATED)
|
||||
WaitForConnection(t, carol, alice.GetOnion(), connections.AUTHENTICATED)
|
||||
|
||||
log.Infof("Alice and Bob getVal public.name...")
|
||||
|
||||
|
@ -323,9 +303,9 @@ func TestCwtchPeerIntegration(t *testing.T) {
|
|||
}
|
||||
|
||||
log.Infof("Waiting for alice to join server...")
|
||||
waitForConnection(t, alice, ServerAddr, connections.SYNCED)
|
||||
WaitForConnection(t, alice, ServerAddr, connections.SYNCED)
|
||||
log.Infof("Waiting for Bob to join connect to group server...")
|
||||
waitForConnection(t, bob, ServerAddr, connections.SYNCED)
|
||||
WaitForConnection(t, bob, ServerAddr, connections.SYNCED)
|
||||
|
||||
// 1 = Alice
|
||||
// 2 = Server
|
||||
|
@ -351,7 +331,7 @@ func TestCwtchPeerIntegration(t *testing.T) {
|
|||
if len(cachedTokens) > (usedTokens + len(carolLines)) {
|
||||
carol.StoreCachedTokens(ServerAddr, cachedTokens[usedTokens:usedTokens+len(carolLines)])
|
||||
}
|
||||
waitForConnection(t, carol, ServerAddr, connections.SYNCED)
|
||||
WaitForConnection(t, carol, ServerAddr, connections.SYNCED)
|
||||
numGoRoutinesPostCarolConnect := runtime.NumGoroutine()
|
||||
|
||||
// Check Alice Timeline
|
||||
|
|
|
@ -0,0 +1,217 @@
|
|||
package testing
|
||||
|
||||
import (
|
||||
"crypto/rand"
|
||||
app2 "cwtch.im/cwtch/app"
|
||||
"cwtch.im/cwtch/event"
|
||||
"cwtch.im/cwtch/functionality/hybrid"
|
||||
"cwtch.im/cwtch/model/constants"
|
||||
"cwtch.im/cwtch/peer"
|
||||
"cwtch.im/cwtch/protocol/connections"
|
||||
"encoding/base64"
|
||||
"fmt"
|
||||
"git.openprivacy.ca/openprivacy/connectivity/tor"
|
||||
"git.openprivacy.ca/openprivacy/log"
|
||||
_ "github.com/mutecomm/go-sqlcipher/v4"
|
||||
mrand "math/rand"
|
||||
"os"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"runtime/pprof"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestHyrbidGroupIntegration(t *testing.T) {
|
||||
|
||||
os.RemoveAll("./storage")
|
||||
os.RemoveAll("./managerstorage")
|
||||
|
||||
// Goroutine Monitoring Start..
|
||||
numGoRoutinesStart := runtime.NumGoroutine()
|
||||
|
||||
log.AddEverythingFromPattern("connectivity")
|
||||
log.SetLevel(log.LevelInfo)
|
||||
log.ExcludeFromPattern("connection/connection")
|
||||
log.ExcludeFromPattern("outbound/3dhauthchannel")
|
||||
log.ExcludeFromPattern("event/eventmanager")
|
||||
log.ExcludeFromPattern("tapir")
|
||||
|
||||
os.Mkdir("tordir", 0700)
|
||||
dataDir := path.Join("tordir", "tor")
|
||||
os.MkdirAll(dataDir, 0700)
|
||||
|
||||
// we don't need real randomness for the port, just to avoid a possible conflict...
|
||||
socksPort := mrand.Intn(1000) + 9051
|
||||
controlPort := mrand.Intn(1000) + 9052
|
||||
|
||||
// generate a random password
|
||||
key := make([]byte, 64)
|
||||
_, err := rand.Read(key)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
useCache := os.Getenv("TORCACHE") == "true"
|
||||
|
||||
torDataDir := ""
|
||||
if useCache {
|
||||
log.Infof("using tor cache")
|
||||
torDataDir = filepath.Join(dataDir, "data-dir-torcache")
|
||||
os.MkdirAll(torDataDir, 0700)
|
||||
} else {
|
||||
log.Infof("using clean tor data dir")
|
||||
if torDataDir, err = os.MkdirTemp(dataDir, "data-dir-"); err != nil {
|
||||
t.Fatalf("could not create data dir")
|
||||
}
|
||||
}
|
||||
|
||||
tor.NewTorrc().WithSocksPort(socksPort).WithOnionTrafficOnly().WithHashedPassword(base64.StdEncoding.EncodeToString(key)).WithControlPort(controlPort).Build("tordir/tor/torrc")
|
||||
acn, err := tor.NewTorACNWithAuth("./tordir", path.Join("..", "tor"), torDataDir, controlPort, tor.HashedPasswordAuthenticator{Password: base64.StdEncoding.EncodeToString(key)})
|
||||
if err != nil {
|
||||
t.Fatalf("Could not start Tor: %v", err)
|
||||
}
|
||||
log.Infof("Waiting for tor to bootstrap...")
|
||||
acn.WaitTillBootstrapped()
|
||||
defer acn.Close()
|
||||
|
||||
// ***** Cwtch Server management *****
|
||||
|
||||
app := app2.NewApp(acn, "./storage", app2.LoadAppSettings("./storage"))
|
||||
managerApp := app2.NewApp(acn, "./managerstorage", app2.LoadAppSettings("./managerstorage"))
|
||||
dan
commented
why a second app for the manager profile? why a second app for the manager profile?
|
||||
|
||||
// ***** cwtchPeer setup *****
|
||||
// Turn on Groups Experiment...
|
||||
settings := app.ReadSettings()
|
||||
settings.ExperimentsEnabled = true
|
||||
settings.Experiments[constants.GroupsExperiment] = true
|
||||
settings.Experiments[constants.GroupManagerExperiment] = false
|
||||
app.UpdateSettings(settings)
|
||||
|
||||
// Create a Manager App that has the Group Manager Experiment Enabled....
|
||||
managerSettings := managerApp.ReadSettings()
|
||||
managerSettings.ExperimentsEnabled = true
|
||||
managerSettings.Experiments[constants.GroupsExperiment] = true
|
||||
managerSettings.Experiments[constants.GroupManagerExperiment] = true
|
||||
managerApp.UpdateSettings(managerSettings)
|
||||
|
||||
alice := MakeProfile(app, "Alice")
|
||||
bob := MakeProfile(app, "Bob")
|
||||
manager := MakeProfile(managerApp, "Manager")
|
||||
|
||||
waitTime := time.Duration(60) * time.Second
|
||||
log.Infof("** Waiting for Alice, Bob, and Carol to register their onion hidden service on the network... (%v)\n", waitTime)
|
||||
time.Sleep(waitTime)
|
||||
log.Infof("** Wait Done!")
|
||||
|
||||
// Ok Lets Start By Creating a Hybrid Group...
|
||||
|
||||
hgmf := hybrid.GroupManagerFunctionality{}
|
||||
ci, err := hgmf.ManageNewGroup(manager)
|
||||
if err != nil {
|
||||
t.Fatalf("could not create hybrid group: %v", err)
|
||||
}
|
||||
log.Infof("created a hybrid group: %d. moving onto adding hybrid contacts...", ci)
|
||||
err = hgmf.AddHybridContact(manager, alice.GetOnion())
|
||||
if err != nil {
|
||||
t.Fatalf("could not create hybrid contact (alice): %v", err)
|
||||
}
|
||||
err = hgmf.AddHybridContact(manager, bob.GetOnion())
|
||||
if err != nil {
|
||||
t.Fatalf("could not create hybrid contact (bob): %v", err)
|
||||
}
|
||||
|
||||
// Now we can allow alice, bob and carol to create a new hybrid group...
|
||||
dan
commented
nit: there is no carol nit: there is no carol
|
||||
log.Infof("now we can allow alice bob and carol to join the hybrid group")
|
||||
mgf := hybrid.ManagedGroupFunctionality{}
|
||||
err = mgf.NewManagedGroup(alice, manager.GetOnion())
|
||||
if err != nil {
|
||||
t.Fatalf("could not create hybrid group contact (carol): %v", err)
|
||||
}
|
||||
alice.PeerWithOnion(manager.GetOnion()) // explictly trigger a peer request
|
||||
err = mgf.NewManagedGroup(bob, manager.GetOnion())
|
||||
if err != nil {
|
||||
t.Fatalf("could not create hybrid group contact (carol): %v", err)
|
||||
}
|
||||
bob.PeerWithOnion(manager.GetOnion())
|
||||
|
||||
log.Infof("waiting for alice and manager to connect")
|
||||
WaitForConnection(t, alice, manager.GetOnion(), connections.AUTHENTICATED)
|
||||
log.Infof("waiting for bob and manager to connect")
|
||||
WaitForConnection(t, bob, manager.GetOnion(), connections.AUTHENTICATED)
|
||||
|
||||
// at this pont we should be able to send messages to the group, and receive them in the timeline
|
||||
log.Infof("sending message to group")
|
||||
_, err = mgf.SendMessageToManagedGroup(alice, 1, "hello everyone!!!")
|
||||
if err != nil {
|
||||
t.Fatalf("hybrid group sending failed... %v", err)
|
||||
}
|
||||
|
||||
time.Sleep(time.Second * 10)
|
||||
|
||||
bobMessages, err := bob.GetMostRecentMessages(1, 0, 0, 1)
|
||||
if err != nil || len(bobMessages) != 1 {
|
||||
t.Fatalf("hybrid group receipt failed... %v", err)
|
||||
}
|
||||
|
||||
if bobMessages[0].Body != "hello everyone!!!" {
|
||||
t.Fatalf("hybrid group receipt failed...message does not match")
|
||||
}
|
||||
|
||||
aliceMessages, err := alice.GetMostRecentMessages(1, 0, 0, 1)
|
||||
if err != nil || len(aliceMessages) != 1 {
|
||||
t.Fatalf("hybrid group receipt failed... %v", err)
|
||||
}
|
||||
|
||||
if aliceMessages[0].Attr[constants.AttrAck] != constants.True {
|
||||
t.Fatalf("hybrid group receipt failed...alice's message was not ack'd")
|
||||
}
|
||||
|
||||
// Time to Clean Up....
|
||||
log.Infof("Shutting down Alice...")
|
||||
app.ShutdownPeer(alice.GetOnion())
|
||||
time.Sleep(time.Second * 3)
|
||||
|
||||
log.Infof("Shutting down Bob...")
|
||||
app.ShutdownPeer(bob.GetOnion())
|
||||
time.Sleep(time.Second * 3)
|
||||
|
||||
log.Infof("Shutting fown Manager...")
|
||||
managerApp.ShutdownPeer(manager.GetOnion())
|
||||
time.Sleep(time.Second * 3)
|
||||
|
||||
log.Infof("Shutting down apps...")
|
||||
log.Infof("app Shutdown: %v\n", runtime.NumGoroutine())
|
||||
app.Shutdown()
|
||||
managerApp.Shutdown()
|
||||
|
||||
time.Sleep(2 * time.Second)
|
||||
|
||||
log.Infof("Done shutdown: %v\n", runtime.NumGoroutine())
|
||||
|
||||
log.Infof("Shutting down ACN...")
|
||||
acn.Close()
|
||||
time.Sleep(time.Second * 60) // the network status / heartbeat plugin might keep goroutines alive for a minute before killing them
|
||||
|
||||
numGoRoutinesPostAppShutdown := runtime.NumGoroutine()
|
||||
|
||||
// Printing out the current goroutines
|
||||
// Very useful if we are leaking any.
|
||||
pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
|
||||
fmt.Println("")
|
||||
|
||||
if numGoRoutinesStart != numGoRoutinesPostAppShutdown {
|
||||
t.Errorf("Number of GoRoutines at start (%v) does not match number of goRoutines after cleanup of peers and servers (%v), clean up failed, v detected!", numGoRoutinesStart, numGoRoutinesPostAppShutdown)
|
||||
}
|
||||
}
|
||||
|
||||
func MakeProfile(application app2.Application, name string) peer.CwtchPeer {
|
||||
application.CreateProfile(name, "asdfasdf", true)
|
||||
p := app2.WaitGetPeer(application, name)
|
||||
application.ConfigureConnections(p.GetOnion(), true, true, false)
|
||||
log.Infof("%s created: %s", name, p.GetOnion())
|
||||
// bob.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Bob") <- This is now done automatically by ProfileValueExtension, keeping this here for clarity
|
||||
p.AutoHandleEvents([]event.Type{event.PeerStateChange, event.ServerStateChange, event.NewGroupInvite, event.NewRetValMessageFromPeer})
|
||||
return p
|
||||
}
|
|
@ -0,0 +1,32 @@
|
|||
package testing
|
||||
|
||||
import (
|
||||
"cwtch.im/cwtch/model/attr"
|
||||
"cwtch.im/cwtch/model/constants"
|
||||
"cwtch.im/cwtch/peer"
|
||||
"cwtch.im/cwtch/protocol/connections"
|
||||
"git.openprivacy.ca/openprivacy/log"
|
||||
_ "github.com/mutecomm/go-sqlcipher/v4"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func WaitForConnection(t *testing.T, peer peer.CwtchPeer, addr string, target connections.ConnectionState) {
|
||||
peerName, _ := peer.GetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.Name)
|
||||
for {
|
||||
log.Infof("%v checking connection...\n", peerName)
|
||||
state := peer.GetPeerState(addr)
|
||||
log.Infof("Waiting for Peer %v to %v - state: %v\n", peerName, addr, connections.ConnectionStateName[state])
|
||||
if state == connections.FAILED {
|
||||
t.Fatalf("%v could not connect to %v", peer.GetOnion(), addr)
|
||||
}
|
||||
if state != target {
|
||||
log.Infof("peer %v %v waiting connect %v, currently: %v\n", peerName, peer.GetOnion(), addr, connections.ConnectionStateName[state])
|
||||
time.Sleep(time.Second * 5)
|
||||
continue
|
||||
} else {
|
||||
log.Infof("peer %v %v CONNECTED to %v\n", peerName, peer.GetOnion(), addr)
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
Can you add/update the comment above event.ProtocolEngineCreated in common.go. We don't type arguments to events but I've been trying to at least leave comments there about how to use them