Initial Sketch of Managed Groups
continuous-integration/drone/push Build was killed Details

This commit is contained in:
Sarah Jamie Lewis 2022-05-12 13:41:18 -07:00
parent bc38f4ec0a
commit 0aa0c94af7
11 changed files with 280 additions and 20 deletions

2
.gitignore vendored
View File

@ -28,4 +28,6 @@ tokens1.db
arch/ arch/
testing/encryptedstorage/encrypted_storage_profiles testing/encryptedstorage/encrypted_storage_profiles
testing/encryptedstorage/tordir testing/encryptedstorage/tordir
testing/*/storage
testing/*/tordir
*.tar.gz *.tar.gz

View File

@ -0,0 +1,69 @@
package groupmanagement
import (
"cwtch.im/cwtch/model"
"cwtch.im/cwtch/model/attr"
"cwtch.im/cwtch/model/constants"
"cwtch.im/cwtch/peer"
"encoding/json"
"errors"
"fmt"
)
// Functionality groups some common UI triggered functions for contacts...
type Functionality struct {
}
// FunctionalityGate returns groupmanagement if enabled in the given experiment map
// Note: Experiment maps are currently in libcwtch-go
func FunctionalityGate(experimentMap map[string]bool) (*Functionality, error) {
if experimentMap[constants.GroupManagementExperiment] {
return new(Functionality), nil
}
return nil, errors.New("groupmanagement is not enabled")
}
// CreateManagedGroup is a convenience function for creating a new managed group
func (f *Functionality) CreateManagedGroup(cp peer.CwtchPeer, name string, server string) (int, error) {
return cp.StartGroup(name, server)
}
// AddNewMember is a convenience function for adding a new member to a group and sending the updated ACL to the group.
func (f *Functionality) AddNewMember(cp peer.CwtchPeer, id int, handle string, ac model.AccessControl) error {
err := cp.DoStorageTransaction(func(storage *peer.CwtchProfileStorage) error {
ci, err := storage.GetConversation(id)
if err != nil {
return err
}
requesterHandle, _ := storage.LoadProfileKeyValue(peer.TypeAttribute, attr.PublicScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Onion)).ToString())
if ci.ACL[string(requesterHandle)].ManageACL {
aclCopy := ci.ACL
aclCopy[handle] = ac
storage.SetConversationACL(id, ci.ACL)
}
return fmt.Errorf("unable to add a member to the conversation")
})
if err == nil {
return f.syncACL(cp, id)
}
return err
}
// syncACL is a convenience function for sending the latest ACL to an existing group.
func (f *Functionality) syncACL(cp peer.CwtchPeer, id int) error {
ci, err := cp.GetConversationInfo(id)
if err != nil {
return err
}
wrapper := model.MessageWrapper{
Overlay: model.OverlayGroupManagement,
Data: string(ci.ACL.Serialize()),
}
message, _ := json.Marshal(wrapper)
_, err = cp.SendMessage(id, string(message))
return err
}

View File

@ -12,3 +12,6 @@ const ImagePreviewMaxSizeInBytes = 20971520
// AutoDLFileExts Files with these extensions will be autodownloaded using ImagePreviewsExperiment // AutoDLFileExts Files with these extensions will be autodownloaded using ImagePreviewsExperiment
var AutoDLFileExts = [...]string{".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp"} var AutoDLFileExts = [...]string{".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp"}
// GroupManagementExperiment Allows interfacing with managed groups
const GroupManagementExperiment = "groupmanagement"

View File

@ -9,14 +9,20 @@ import (
// AccessControl is a type determining client assigned authorization to a peer // AccessControl is a type determining client assigned authorization to a peer
type AccessControl struct { type AccessControl struct {
Blocked bool // Any attempts from this handle to connect are blocked Blocked bool // Any attempts from this handle to connect are blocked
Read bool // Allows a handle to access the conversation Read bool // Allows a handle to access the conversation
Append bool // Allows a handle to append new messages to the conversation Append bool // Allows a handle to append new messages to the conversation
ManageACL bool // Allows a handle to modify the ACL for a conversation
}
func (c AccessControl) Serialize() string {
data, _ := json.Marshal(c)
return string(data)
} }
// DefaultP2PAccessControl - because in the year 2021, go does not support constant structs... // DefaultP2PAccessControl - because in the year 2021, go does not support constant structs...
func DefaultP2PAccessControl() AccessControl { func DefaultP2PAccessControl() AccessControl {
return AccessControl{Read: true, Append: true, Blocked: false} return AccessControl{Read: true, Append: true, Blocked: false, ManageACL: false}
} }
// AccessControlList represents an access control list for a conversation. Mapping handles to conversation // AccessControlList represents an access control list for a conversation. Mapping handles to conversation

View File

@ -41,6 +41,17 @@ type Group struct {
LocalID string LocalID string
} }
// GroupInvite provides a structured type for communicating group information to peers
type GroupInvite struct {
GroupID string
GroupName string
SignedGroupID []byte
Timestamp uint64
SharedKey []byte
ServerHost string
ACL AccessControlList `json:",omitempty"`
}
// NewGroup initializes a new group associated with a given CwtchServer // NewGroup initializes a new group associated with a given CwtchServer
func NewGroup(server string) (*Group, error) { func NewGroup(server string) (*Group, error) {
group := new(Group) group := new(Group)
@ -79,7 +90,7 @@ func deriveGroupID(groupKey []byte, serverHostname string) string {
// Invite generates a invitation that can be sent to a cwtch peer // Invite generates a invitation that can be sent to a cwtch peer
func (g *Group) Invite() (string, error) { func (g *Group) Invite() (string, error) {
gci := &groups.GroupInvite{ gci := &GroupInvite{
GroupID: g.GroupID, GroupID: g.GroupID,
GroupName: g.GroupName, GroupName: g.GroupName,
SharedKey: g.GroupKey[:], SharedKey: g.GroupKey[:],
@ -126,13 +137,13 @@ func (g *Group) DecryptMessage(ciphertext []byte) (bool, *groups.DecryptedGroupM
// ValidateInvite takes in a serialized invite and returns the invite structure if it is cryptographically valid // ValidateInvite takes in a serialized invite and returns the invite structure if it is cryptographically valid
// and an error if it is not // and an error if it is not
func ValidateInvite(invite string) (*groups.GroupInvite, error) { func ValidateInvite(invite string) (*GroupInvite, error) {
// We prefix invites for groups with torv3 // We prefix invites for groups with torv3
if strings.HasPrefix(invite, GroupInvitePrefix) { if strings.HasPrefix(invite, GroupInvitePrefix) {
data, err := base64.StdEncoding.DecodeString(invite[len(GroupInvitePrefix):]) data, err := base64.StdEncoding.DecodeString(invite[len(GroupInvitePrefix):])
if err == nil { if err == nil {
// First attempt to unmarshal the json... // First attempt to unmarshal the json...
var gci groups.GroupInvite var gci GroupInvite
err := json.Unmarshal(data, &gci) err := json.Unmarshal(data, &gci)
if err == nil { if err == nil {

View File

@ -17,3 +17,6 @@ const OverlayInviteGroup = 101
// OverlayFileSharing is the canonical identifier for the file sharing overlay // OverlayFileSharing is the canonical identifier for the file sharing overlay
const OverlayFileSharing = 200 const OverlayFileSharing = 200
// OverlayGroupManagement is the canonical identifier for the group management protocol
const OverlayGroupManagement = 500

View File

@ -72,6 +72,12 @@ type cwtchPeer struct {
eventBus event.Manager eventBus event.Manager
} }
func (cp *cwtchPeer) DoStorageTransaction(f func(storage *CwtchProfileStorage) error) error {
cp.mutex.Lock()
defer cp.mutex.Unlock()
return f(cp.storage)
}
func (cp *cwtchPeer) Export(file string) error { func (cp *cwtchPeer) Export(file string) error {
cp.mutex.Lock() cp.mutex.Lock()
defer cp.mutex.Unlock() defer cp.mutex.Unlock()
@ -469,7 +475,7 @@ func (cp *cwtchPeer) ImportGroup(exportedInvite string) (int, error) {
return -1, err return -1, err
} }
cp.mutex.Lock() cp.mutex.Lock()
groupConversationID, err := cp.storage.NewConversation(gci.GroupID, map[string]string{}, model.AccessControlList{}, true) groupConversationID, err := cp.storage.NewConversation(gci.GroupID, map[string]string{}, gci.ACL, true)
cp.mutex.Unlock() cp.mutex.Unlock()
if err == nil { if err == nil {
cp.SetConversationAttribute(groupConversationID, attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.GroupID)), gci.GroupID) cp.SetConversationAttribute(groupConversationID, attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.GroupID)), gci.GroupID)
@ -536,6 +542,24 @@ func (cp *cwtchPeer) BlockConversation(id int) error {
return cp.storage.SetConversationACL(id, ci.ACL) return cp.storage.SetConversationACL(id, ci.ACL)
} }
func (cp *cwtchPeer) AddMember(id int, handle string, ac model.AccessControl) error {
cp.mutex.Lock()
defer cp.mutex.Unlock()
ci, err := cp.storage.GetConversation(id)
if err != nil {
return err
}
requesterHandle, _ := cp.storage.LoadProfileKeyValue(TypeAttribute, attr.PublicScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Onion)).ToString())
if ci.ACL[string(requesterHandle)].ManageACL {
aclCopy := ci.ACL
aclCopy[handle] = ac
cp.storage.SetConversationACL(id, ci.ACL)
}
return fmt.Errorf("unable to add a member to the conversation")
}
// UnblockConversation looks up a conversation by `handle` and sets the Blocked ACL field to `true` // UnblockConversation looks up a conversation by `handle` and sets the Blocked ACL field to `true`
// Further actions depend on the Accepted field // Further actions depend on the Accepted field
func (cp *cwtchPeer) UnblockConversation(id int) error { func (cp *cwtchPeer) UnblockConversation(id int) error {
@ -655,9 +679,17 @@ func (cp *cwtchPeer) UpdateMessageAttribute(conversation int, channel int, id in
// Status: TODO change server handle to conversation id...? // Status: TODO change server handle to conversation id...?
func (cp *cwtchPeer) StartGroup(name string, server string) (int, error) { func (cp *cwtchPeer) StartGroup(name string, server string) (int, error) {
group, err := model.NewGroup(server) group, err := model.NewGroup(server)
manager := cp.GetOnion()
if err == nil { if err == nil {
cp.mutex.Lock() cp.mutex.Lock()
conversationID, err := cp.storage.NewConversation(group.GroupID, map[string]string{}, model.AccessControlList{}, true) conversationID, err := cp.storage.NewConversation(group.GroupID, map[string]string{}, model.AccessControlList{
manager: model.AccessControl{
ManageACL: true,
Read: true,
Append: true,
Blocked: false,
},
}, true)
cp.mutex.Unlock() cp.mutex.Unlock()
if err != nil { if err != nil {
return -1, err return -1, err
@ -1181,6 +1213,7 @@ func (cp *cwtchPeer) eventHandler() {
err = manifest.Save(manifestFilePath) err = manifest.Save(manifestFilePath)
if err != nil { if err != nil {
log.Errorf("could not save manifest: %v", err) log.Errorf("could not save manifest: %v", err)
cp.SetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%v.error", fileKey), err.Error())
} else { } else {
tempFile := "" tempFile := ""
if runtime.GOOS == "android" { if runtime.GOOS == "android" {
@ -1322,6 +1355,26 @@ func (cp *cwtchPeer) attemptInsertOrAcknowledgeLegacyGroupConversation(conversat
contenthash := model.CalculateContentHash(dm.Onion, dm.Text) contenthash := model.CalculateContentHash(dm.Onion, dm.Text)
id, err := cp.storage.InsertMessage(conversationID, 0, dm.Text, model.Attributes{constants.AttrAck: constants.True, "PreviousSignature": base64.StdEncoding.EncodeToString(dm.PreviousMessageSig), constants.AttrAuthor: dm.Onion, constants.AttrSentTimestamp: time.Unix(int64(dm.Timestamp), 0).Format(time.RFC3339Nano)}, signature, contenthash) id, err := cp.storage.InsertMessage(conversationID, 0, dm.Text, model.Attributes{constants.AttrAck: constants.True, "PreviousSignature": base64.StdEncoding.EncodeToString(dm.PreviousMessageSig), constants.AttrAuthor: dm.Onion, constants.AttrSentTimestamp: time.Unix(int64(dm.Timestamp), 0).Format(time.RFC3339Nano)}, signature, contenthash)
if err == nil { if err == nil {
/// Time to Handle any Meta-Actions
overlayMessage := new(model.MessageWrapper)
err := json.Unmarshal([]byte(dm.Text), overlayMessage)
if err == nil {
if overlayMessage.Overlay == model.OverlayGroupManagement {
ci, _ := cp.storage.GetConversation(conversationID)
// NOTE: this is safe because dm.Onion will only be valid if there is a valid signature
// from the onion on the message. As such we don't re-verify the signature here.
acl, exists := ci.ACL[dm.Onion]
if exists && acl.ManageACL {
newACL := new(model.AccessControlList)
err := json.Unmarshal([]byte(overlayMessage.Data), newACL)
if err == nil {
cp.storage.SetConversationACL(conversationID, *newACL)
}
}
}
}
cp.eventBus.Publish(event.NewEvent(event.NewMessageFromGroup, map[event.Field]string{event.ConversationID: strconv.Itoa(conversationID), event.TimestampSent: time.Unix(int64(dm.Timestamp), 0).Format(time.RFC3339Nano), event.RemotePeer: dm.Onion, event.Index: strconv.Itoa(id), event.Data: dm.Text, event.ContentHash: contenthash})) cp.eventBus.Publish(event.NewEvent(event.NewMessageFromGroup, map[event.Field]string{event.ConversationID: strconv.Itoa(conversationID), event.TimestampSent: time.Unix(int64(dm.Timestamp), 0).Format(time.RFC3339Nano), event.RemotePeer: dm.Onion, event.Index: strconv.Itoa(id), event.Data: dm.Text, event.ContentHash: contenthash}))
} }
cp.mutex.Unlock() cp.mutex.Unlock()

View File

@ -796,7 +796,7 @@ func (cps *CwtchProfileStorage) Export(filename string) error {
defer tarWriter.Close() defer tarWriter.Close()
// We need to know the base directory so we can import it later (and prevent duplicates)... // We need to know the base directory so we can import it later (and prevent duplicates)...
profilePath := filepath.Base(cps.ProfileDirectory) profilePath := filepath.Base(cps.ProfileDirectory)
err = addFileToTarWriter(profilePath, profileDB, tarWriter) err = addFileToTarWriter(profilePath, profileDB, tarWriter)
if err != nil { if err != nil {

View File

@ -106,6 +106,10 @@ type CwtchPeer interface {
GetConversationAttribute(conversation int, path attr.ScopedZonedPath) (string, error) GetConversationAttribute(conversation int, path attr.ScopedZonedPath) (string, error)
DeleteConversation(conversation int) error DeleteConversation(conversation int) error
AddMember(conversation int, handle string, ac model.AccessControl) error
DoStorageTransaction(func(storage *CwtchProfileStorage) error) error
// New Unified Conversation Channel Interfaces // New Unified Conversation Channel Interfaces
GetChannelMessage(conversation int, channel int, id int) (string, model.Attributes, error) GetChannelMessage(conversation int, channel int, id int) (string, model.Attributes, error)
GetChannelMessageCount(conversation int, channel int) (int, error) GetChannelMessageCount(conversation int, channel int) (int, error)

View File

@ -9,16 +9,6 @@ import (
// CwtchServerSyncedCapability is used to indicate that a given cwtch server is synced // CwtchServerSyncedCapability is used to indicate that a given cwtch server is synced
const CwtchServerSyncedCapability = tapir.Capability("CwtchServerSyncedCapability") const CwtchServerSyncedCapability = tapir.Capability("CwtchServerSyncedCapability")
// GroupInvite provides a structured type for communicating group information to peers
type GroupInvite struct {
GroupID string
GroupName string
SignedGroupID []byte
Timestamp uint64
SharedKey []byte
ServerHost string
}
// DecryptedGroupMessage is the main encapsulation of group message data // DecryptedGroupMessage is the main encapsulation of group message data
type DecryptedGroupMessage struct { type DecryptedGroupMessage struct {
Text string Text string

View File

@ -0,0 +1,119 @@
package managed_groups
import (
"crypto/rand"
"cwtch.im/cwtch/app"
"cwtch.im/cwtch/app/utils"
"cwtch.im/cwtch/functionality/groupmanagement"
"cwtch.im/cwtch/model/constants"
"encoding/base64"
"git.openprivacy.ca/openprivacy/connectivity"
"git.openprivacy.ca/openprivacy/connectivity/tor"
"git.openprivacy.ca/openprivacy/log"
"io/ioutil"
mrand "math/rand"
"os"
"os/user"
path "path/filepath"
"runtime"
"runtime/pprof"
"testing"
"time"
_ "github.com/mutecomm/go-sqlcipher/v4"
)
func setupACN(t *testing.T) connectivity.ACN {
os.Mkdir("tordir", 0700)
dataDir := path.Join("tordir", "tor")
os.MkdirAll(dataDir, 0700)
// we don't need real randomness for the port, just to avoid a possible conflict...
mrand.Seed(int64(time.Now().Nanosecond()))
socksPort := mrand.Intn(1000) + 9051
controlPort := mrand.Intn(1000) + 9052
// generate a random password
key := make([]byte, 64)
_, err := rand.Read(key)
if err != nil {
panic(err)
}
torDataDir := ""
if torDataDir, err = ioutil.TempDir(dataDir, "data-dir-"); err != nil {
t.Fatalf("could not create data dir")
}
tor.NewTorrc().WithSocksPort(socksPort).WithOnionTrafficOnly().WithHashedPassword(base64.StdEncoding.EncodeToString(key)).WithControlPort(controlPort).Build("tordir/tor/torrc")
acn, err := tor.NewTorACNWithAuth("./tordir", path.Join("..", "..", "tor"), torDataDir, controlPort, tor.HashedPasswordAuthenticator{Password: base64.StdEncoding.EncodeToString(key)})
if err != nil {
t.Fatalf("Could not start Tor: %v", err)
}
acn.WaitTillBootstrapped()
return acn
}
func TestManagedGroups(t *testing.T) {
os.RemoveAll("./storage")
os.RemoveAll("./tordir")
numGoRoutinesStart := runtime.NumGoroutine()
log.SetLevel(log.LevelDebug)
acn := setupACN(t)
defer acn.Close()
const ServerKeyBundleBase64 = "eyJLZXlzIjp7ImJ1bGxldGluX2JvYXJkX29uaW9uIjoibmZoeHp2enhpbnJpcGdkaDR0Mm00eGN5M2NyZjZwNGNiaGVjdGdja3VqM2lkc2pzYW90Z293YWQiLCJwcml2YWN5X3Bhc3NfcHVibGljX2tleSI6IjVwd2hQRGJ0c0EvdFI3ZHlUVUkzakpZZnM1L3Jaai9iQ1ZWZEpTc0Jtbk09IiwidG9rZW5fc2VydmljZV9vbmlvbiI6ImVvd25mcTRsNTZxMmU0NWs0bW03MjdsanJod3Z0aDZ5ZWN0dWV1bXB4emJ5cWxnbXVhZm1qdXFkIn0sIlNpZ25hdHVyZSI6IlY5R3NPMHNZWFJ1bGZxdzdmbGdtclVxSTBXS0JlSFIzNjIvR3hGbWZPekpEZjJaRks2ck9jNVRRR1ZxVWIrbXIwV2xId0pwdXh0UW1JRU9KNkplYkNRPT0ifQ=="
const ServerAddr = "nfhxzvzxinripgdh4t2m4xcy3crf6p4cbhectgckuj3idsjsaotgowad"
serverKeyBundle, _ := base64.StdEncoding.DecodeString(ServerKeyBundleBase64)
app := app.NewApp(acn, "./storage")
usr, _ := user.Current()
cwtchDir := path.Join(usr.HomeDir, ".cwtch")
os.Mkdir(cwtchDir, 0700)
os.RemoveAll(path.Join(cwtchDir, "testing"))
os.Mkdir(path.Join(cwtchDir, "testing"), 0700)
t.Logf("Creating Alice...")
app.CreateTaggedPeer("alice", "asdfasdf", "testing")
t.Logf("** Waiting for Alice...")
alice := utils.WaitGetPeer(app, "alice")
t.Logf("Have Alice: %v", alice.GetOnion())
// import server key bundle
_, err := alice.AddServer(string(serverKeyBundle))
if err != nil {
t.Fatalf("could not import server key bundle")
}
// establish new group management functionality
gmf, err := groupmanagement.FunctionalityGate(map[string]bool{constants.GroupManagementExperiment: true})
if err != nil {
t.Fatalf("could not instantiate group management functionality")
}
// create a new managed group bound to the test server
gid, err := gmf.CreateManagedGroup(alice, "Test Managed Group", ServerAddr)
if err != nil {
t.Fatalf("could not create managed group")
}
t.Logf("created new test group: %v", gid)
// Shutdown automatically closes Alice...
app.Shutdown()
acn.Close()
time.Sleep(time.Second * 10)
numGoRoutinesEnd := runtime.NumGoroutine()
if numGoRoutinesStart != numGoRoutinesEnd {
// Printing out the current goroutines
// Very useful if we are leaking any.
pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
t.Fatalf("goroutine leak detected: %v %v", numGoRoutinesStart, numGoRoutinesEnd)
}
}