Compare commits

..

18 Commits

Author SHA1 Message Date
Sarah Jamie Lewis 9b235ba732 Merge pull request 'Gate the Import Bundle method for Managed Groups' (#564) from managed-groups into master
continuous-integration/drone/push Build is pending Details
Reviewed-on: #564
Reviewed-by: Dan Ballard <dan@openprivacy.ca>
2024-06-20 16:55:40 +00:00
Sarah Jamie Lewis 8b7cb44e44
Gate the Import Bundle method for Managed Groups
continuous-integration/drone/pr Build is passing Details
2024-06-18 12:57:21 -07:00
Dan Ballard d5145c631d createProfile return onion if possible; add attribute for private name
continuous-integration/drone/push Build is pending Details
2024-06-15 00:17:35 +00:00
Sarah Jamie Lewis 3e09a25b2d Merge pull request 'Managed Group Refinement' (#561) from managed-groups into master
continuous-integration/drone/push Build is pending Details
Reviewed-on: #561
Reviewed-by: Dan Ballard <dan@openprivacy.ca>
2024-06-13 17:48:35 +00:00
Sarah Jamie Lewis 229743c507
Managed Group Refinement
continuous-integration/drone/pr Build is passing Details
- Add NoAccessControl Fallback
- Prevent Facade Contacts from the Contact Retry Plugin
- Force Group Manager to Save History by Default
- Fix Ack'ing on Channel_Manager
2024-06-11 11:03:59 -07:00
Sarah Jamie Lewis c5aa6905a4 Merge pull request 'Managed Groups First Cut' (#558) from managed-groups into master
continuous-integration/drone/push Build is pending Details
Reviewed-on: #558
Reviewed-by: Dan Ballard <dan@openprivacy.ca>
2024-06-10 21:45:46 +00:00
Sarah Jamie Lewis 74d2aec96a
Fixup Channel Setting and Timeouts for Tests
continuous-integration/drone/pr Build is passing Details
2024-06-10 14:04:34 -07:00
Sarah Jamie Lewis 4bce08dc00
Add explicit check when sending offline to Manager Channel
continuous-integration/drone/pr Build is failing Details
2024-06-10 13:04:33 -07:00
Sarah Jamie Lewis 77c6139792
Add Comments indicating current status of hybrid groups
continuous-integration/drone/pr Build is failing Details
2024-06-10 12:15:14 -07:00
Sarah Jamie Lewis a35374f200
Revert
continuous-integration/drone/pr Build is failing Details
2024-06-10 11:08:42 -07:00
Sarah Jamie Lewis e14044e404
Add JSON Annotations
continuous-integration/drone/pr Build is passing Details
Also fix race condition in app map
2024-06-10 10:34:11 -07:00
Sarah Jamie Lewis fdec3302af
Clarify comments, add constants, clean up tests
continuous-integration/drone/pr Build is failing Details
2024-05-13 12:24:22 -07:00
Sarah Jamie Lewis d61dc30bb2
Managed Groups First Cut 2024-05-13 12:24:22 -07:00
Sarah Jamie Lewis a7b885166a Merge pull request 'Enable per-contact file sharing permissions' (#554) from ep into master
continuous-integration/drone/push Build is pending Details
Reviewed-on: #554
Reviewed-by: Dan Ballard <dan@openprivacy.ca>
2024-04-29 15:37:50 +00:00
Sarah Jamie Lewis b32b11c711
Enable per-contact file sharing permissions
continuous-integration/drone/pr Build is passing Details
2024-04-16 11:35:21 -07:00
Sarah Jamie Lewis 0e96539f22 Merge pull request 'Store Messages and Send when Online' (#553) from offline-messages into master
continuous-integration/drone/push Build is pending Details
Reviewed-on: #553
Reviewed-by: Dan Ballard <dan@openprivacy.ca>
2024-04-16 18:35:02 +00:00
Sarah Jamie Lewis e55f342324
Updating Logging -> Debug
continuous-integration/drone/pr Build is passing Details
2024-02-26 13:40:47 -08:00
Sarah Jamie Lewis 89aca91b37
Store Messages and Send when Online
continuous-integration/drone/pr Build is passing Details
2024-02-26 13:18:38 -08:00
27 changed files with 1360 additions and 109 deletions

View File

@ -1,10 +1,16 @@
package app package app
import ( import (
"os"
path "path/filepath"
"strconv"
"sync"
"cwtch.im/cwtch/app/plugins" "cwtch.im/cwtch/app/plugins"
"cwtch.im/cwtch/event" "cwtch.im/cwtch/event"
"cwtch.im/cwtch/extensions" "cwtch.im/cwtch/extensions"
"cwtch.im/cwtch/functionality/filesharing" "cwtch.im/cwtch/functionality/filesharing"
"cwtch.im/cwtch/functionality/hybrid"
"cwtch.im/cwtch/functionality/servers" "cwtch.im/cwtch/functionality/servers"
"cwtch.im/cwtch/model" "cwtch.im/cwtch/model"
"cwtch.im/cwtch/model/attr" "cwtch.im/cwtch/model/attr"
@ -15,10 +21,6 @@ import (
"cwtch.im/cwtch/storage" "cwtch.im/cwtch/storage"
"git.openprivacy.ca/openprivacy/connectivity" "git.openprivacy.ca/openprivacy/connectivity"
"git.openprivacy.ca/openprivacy/log" "git.openprivacy.ca/openprivacy/log"
"os"
path "path/filepath"
"strconv"
"sync"
) )
type application struct { type application struct {
@ -51,7 +53,7 @@ func (app *application) IsFeatureEnabled(experiment string) bool {
// Application is a full cwtch peer application. It allows management, usage and storage of multiple peers // Application is a full cwtch peer application. It allows management, usage and storage of multiple peers
type Application interface { type Application interface {
LoadProfiles(password string) LoadProfiles(password string)
CreateProfile(name string, password string, autostart bool) CreateProfile(name string, password string, autostart bool) string
InstallEngineHooks(engineHooks connections.EngineHooks) InstallEngineHooks(engineHooks connections.EngineHooks)
ImportProfile(exportedCwtchFile string, password string) (peer.CwtchPeer, error) ImportProfile(exportedCwtchFile string, password string) (peer.CwtchPeer, error)
EnhancedImportProfile(exportedCwtchFile string, password string) string EnhancedImportProfile(exportedCwtchFile string, password string) string
@ -195,7 +197,7 @@ func (app *application) AddPlugin(peerid string, id plugins.PluginID, bus event.
} }
} }
func (app *application) CreateProfile(name string, password string, autostart bool) { func (app *application) CreateProfile(name string, password string, autostart bool) string {
autostartVal := constants.True autostartVal := constants.True
if !autostart { if !autostart {
autostartVal = constants.False autostartVal = constants.False
@ -205,10 +207,15 @@ func (app *application) CreateProfile(name string, password string, autostart bo
tagVal = constants.ProfileTypeV1DefaultPassword tagVal = constants.ProfileTypeV1DefaultPassword
} }
app.CreatePeer(name, password, map[attr.ZonedPath]string{ profile_id, err := app.CreatePeer(name, password, map[attr.ZonedPath]string{
attr.ProfileZone.ConstructZonedPath(constants.Tag): tagVal, attr.ProfileZone.ConstructZonedPath(constants.Tag): tagVal,
attr.ProfileZone.ConstructZonedPath(constants.PeerAutostart): autostartVal, attr.ProfileZone.ConstructZonedPath(constants.PeerAutostart): autostartVal,
}) })
if err == nil {
return profile_id
}
return ""
} }
func (app *application) setupPeer(profile peer.CwtchPeer) { func (app *application) setupPeer(profile peer.CwtchPeer) {
@ -230,7 +237,7 @@ func (app *application) setupPeer(profile peer.CwtchPeer) {
} }
func (app *application) CreatePeer(name string, password string, attributes map[attr.ZonedPath]string) { func (app *application) CreatePeer(name string, password string, attributes map[attr.ZonedPath]string) (string, error) {
app.appmutex.Lock() app.appmutex.Lock()
defer app.appmutex.Unlock() defer app.appmutex.Unlock()
@ -240,7 +247,7 @@ func (app *application) CreatePeer(name string, password string, attributes map[
if err != nil { if err != nil {
log.Errorf("Error Creating Peer: %v", err) log.Errorf("Error Creating Peer: %v", err)
app.appBus.Publish(event.NewEventList(event.PeerError, event.Error, err.Error())) app.appBus.Publish(event.NewEventList(event.PeerError, event.Error, err.Error()))
return return "", err
} }
app.setupPeer(profile) app.setupPeer(profile)
@ -251,6 +258,7 @@ func (app *application) CreatePeer(name string, password string, attributes map[
} }
app.appBus.Publish(event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.GetOnion(), event.Created: event.True})) app.appBus.Publish(event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.GetOnion(), event.Created: event.True}))
return profile.GetOnion(), nil
} }
func (app *application) DeleteProfile(onion string, password string) { func (app *application) DeleteProfile(onion string, password string) {
@ -363,9 +371,12 @@ func (app *application) LoadProfiles(password string) {
func (app *application) registerHooks(profile peer.CwtchPeer) { func (app *application) registerHooks(profile peer.CwtchPeer) {
// Register Hooks // Register Hooks
profile.RegisterHook(extensions.ProfileValueExtension{}) profile.RegisterHook(extensions.ProfileValueExtension{})
profile.RegisterHook(extensions.SendWhenOnlineExtension{})
profile.RegisterHook(new(filesharing.Functionality)) profile.RegisterHook(new(filesharing.Functionality))
profile.RegisterHook(new(filesharing.ImagePreviewsFunctionality)) profile.RegisterHook(new(filesharing.ImagePreviewsFunctionality))
profile.RegisterHook(new(servers.Functionality)) profile.RegisterHook(new(servers.Functionality))
profile.RegisterHook(new(hybrid.ManagedGroupFunctionality))
profile.RegisterHook(new(hybrid.GroupManagerFunctionality)) // will only be activated if GroupManagerExperiment is enabled...
// Ensure that Profiles have the Most Up to Date Settings... // Ensure that Profiles have the Most Up to Date Settings...
profile.NotifySettingsUpdate(app.settings.ReadGlobalSettings()) profile.NotifySettingsUpdate(app.settings.ReadGlobalSettings())
} }
@ -391,12 +402,14 @@ func (app *application) installProfile(profile peer.CwtchPeer) bool {
func (app *application) ActivatePeerEngine(onion string) { func (app *application) ActivatePeerEngine(onion string) {
profile := app.GetPeer(onion) profile := app.GetPeer(onion)
if profile != nil { if profile != nil {
app.appmutex.Lock()
if _, exists := app.engines[onion]; !exists { if _, exists := app.engines[onion]; !exists {
eventBus, exists := app.eventBuses[profile.GetOnion()] eventBus, exists := app.eventBuses[profile.GetOnion()]
if !exists { if !exists {
// todo handle this case? // todo handle this case?
log.Errorf("cannot activate peer engine without an event bus") log.Errorf("cannot activate peer engine without an event bus")
app.appmutex.Unlock()
return return
} }
@ -405,12 +418,13 @@ func (app *application) ActivatePeerEngine(onion string) {
log.Debugf("restartFlow: Creating a New Protocol Engine...") log.Debugf("restartFlow: Creating a New Protocol Engine...")
app.engines[profile.GetOnion()] = engine app.engines[profile.GetOnion()] = engine
eventBus.Publish(event.NewEventList(event.ProtocolEngineCreated)) eventBus.Publish(event.NewEventList(event.ProtocolEngineCreated))
app.QueryACNStatus()
} else { } else {
log.Errorf("corrupted profile detected for %v", onion) log.Errorf("corrupted profile detected for %v", onion)
} }
} }
app.appmutex.Unlock()
} }
app.QueryACNStatus()
} }
// ConfigureConnections autostarts the given kinds of connections. // ConfigureConnections autostarts the given kinds of connections.
@ -418,7 +432,9 @@ func (app *application) ConfigureConnections(onion string, listen bool, peers bo
profile := app.GetPeer(onion) profile := app.GetPeer(onion)
if profile != nil { if profile != nil {
app.appmutex.Lock()
profileBus, exists := app.eventBuses[profile.GetOnion()] profileBus, exists := app.eventBuses[profile.GetOnion()]
app.appmutex.Unlock()
if exists { if exists {
// if we are making a decision to ignore // if we are making a decision to ignore
if !peers || !servers { if !peers || !servers {

View File

@ -284,6 +284,7 @@ const (
Status = Field("Status") Status = Field("Status")
EventID = Field("EventID") EventID = Field("EventID")
EventContext = Field("EventContext") EventContext = Field("EventContext")
Channel = Field("Channel")
Index = Field("Index") Index = Field("Index")
RowIndex = Field("RowIndex") RowIndex = Field("RowIndex")
ContentHash = Field("ContentHash") ContentHash = Field("ContentHash")

View File

@ -104,6 +104,15 @@ func (pne ProfileValueExtension) OnContactRequestValue(profile peer.CwtchPeer, c
val, exists = profile.GetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name) val, exists = profile.GetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name)
} }
// NOTE: Cwtch 1.15+ requires that profiles be able to restrict file downloading to specific contacts. As such we need an ACL check here
// on the fileshareing zone.
// TODO: Split this functionality into FilesharingFunctionality, and restrict this function to only considering Profile zoned attributes?
if zone == attr.FilesharingZone {
if !conversation.GetPeerAC().ShareFiles {
return
}
}
// Construct a Response // Construct a Response
resp := event.NewEvent(event.SendRetValMessageToPeer, map[event.Field]string{event.ConversationID: strconv.Itoa(conversation.ID), event.RemotePeer: conversation.Handle, event.Exists: strconv.FormatBool(exists)}) resp := event.NewEvent(event.SendRetValMessageToPeer, map[event.Field]string{event.ConversationID: strconv.Itoa(conversation.ID), event.RemotePeer: conversation.Handle, event.Exists: strconv.FormatBool(exists)})
resp.EventID = eventID resp.EventID = eventID

View File

@ -0,0 +1,91 @@
package extensions
import (
"slices"
"strconv"
"time"
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/model"
"cwtch.im/cwtch/model/attr"
"cwtch.im/cwtch/model/constants"
"cwtch.im/cwtch/peer"
"cwtch.im/cwtch/protocol/connections"
"cwtch.im/cwtch/settings"
"git.openprivacy.ca/openprivacy/log"
)
// SendWhenOnlineExtension implements automatic sending
// Some Considerations:
// - There are race conditions inherant in this approach e.g. a peer could go offline just after recieving a message and never sending an ack
// - In that case the next time we connect we will send a duplicate message.
// - Currently we do not include metadata like sent time in raw peer protocols (however Overlay does now have support for that information)
type SendWhenOnlineExtension struct {
}
func (soe SendWhenOnlineExtension) NotifySettingsUpdate(_ settings.GlobalSettings) {
}
func (soe SendWhenOnlineExtension) EventsToRegister() []event.Type {
return []event.Type{event.PeerStateChange}
}
func (soe SendWhenOnlineExtension) ExperimentsToRegister() []string {
return nil
}
func (soe SendWhenOnlineExtension) OnEvent(ev event.Event, profile peer.CwtchPeer) {
switch ev.EventType {
case event.PeerStateChange:
ci, err := profile.FetchConversationInfo(ev.Data["RemotePeer"])
if err == nil {
// if we have re-authenticated with thie peer then request their profile image...
if connections.ConnectionStateToType()[ev.Data[event.ConnectionState]] == connections.AUTHENTICATED {
log.Infof("Sending Offline Messages to %s", ci.Handle)
// Check the last 100 messages, if any of them are pending, then send them now...
messsages, _ := profile.GetMostRecentMessages(ci.ID, constants.CHANNEL_CHAT, 0, uint(100))
slices.Reverse(messsages)
for _, message := range messsages {
if message.Attr[constants.AttrAck] == constants.False {
sent, timeparseerr := time.Parse(time.RFC3339, message.Attr[constants.AttrSentTimestamp])
if timeparseerr != nil {
continue
}
if time.Since(sent) > time.Hour*24*7 {
continue
}
body := message.Body
ev := event.NewEvent(event.SendMessageToPeer, map[event.Field]string{event.ConversationID: strconv.Itoa(ci.ID), event.RemotePeer: ci.Handle, event.Data: body})
ev.EventID = message.Signature // we need this ensure that we correctly ack this in the db when it comes back
// TODO: The EventBus is becoming very noisy...we may want to consider a one-way shortcut to Engine i.e. profile.Engine.SendMessageToPeer
log.Infof("resending message that was sent when peer was offline")
profile.PublishEvent(ev)
}
}
if ci.HasChannel(constants.CHANNEL_MANAGER) {
messsages, _ = profile.GetMostRecentMessages(ci.ID, constants.CHANNEL_MANAGER, 0, uint(100))
slices.Reverse(messsages)
for _, message := range messsages {
if message.Attr[constants.AttrAck] == constants.False {
body := message.Body
ev := event.NewEvent(event.SendMessageToPeer, map[event.Field]string{event.ConversationID: strconv.Itoa(ci.ID), event.RemotePeer: ci.Handle, event.Data: body})
ev.EventID = message.Signature // we need this ensure that we correctly ack this in the db when it comes back
// TODO: The EventBus is becoming very noisy...we may want to consider a one-way shortcut to Engine i.e. profile.Engine.SendMessageToPeer
log.Debugf("resending message that was sent when peer was offline")
profile.PublishEvent(ev)
}
}
}
}
}
}
}
// OnContactReceiveValue is nop for SendWhenOnnlineExtension
func (soe SendWhenOnlineExtension) OnContactReceiveValue(profile peer.CwtchPeer, conversation model.Conversation, szp attr.ScopedZonedPath, value string, exists bool) {
}
// OnContactRequestValue is nop for SendWhenOnnlineExtension
func (soe SendWhenOnlineExtension) OnContactRequestValue(profile peer.CwtchPeer, conversation model.Conversation, eventID string, szp attr.ScopedZonedPath) {
}

View File

@ -62,10 +62,18 @@ func (i *ImagePreviewsFunctionality) OnEvent(ev event.Event, profile peer.CwtchP
if err == nil { if err == nil {
for _, ci := range conversations { for _, ci := range conversations {
if profile.GetPeerState(ci.Handle) == connections.AUTHENTICATED { if profile.GetPeerState(ci.Handle) == connections.AUTHENTICATED {
// if we have enabled file shares for this contact, then send them our profile image
// NOTE: In the past, Cwtch treated "profile image" as a public file share. As such, anyone with the file key and who is able
// to authenticate with the profile (i.e. non-blocked peers) can download the file (if the global profile images experiment is enabled)
// To better allow for fine-grained permissions (and to support hybrid group permissions), we want to enable per-conversation file
// sharing permissions. As such, profile images are now only shared with contacts with that permission enabled.
// (i.e. all previous accepted contacts, new accepted contacts, and contacts who have this toggle set explictly)
if ci.GetPeerAC().ShareFiles {
profile.SendScopedZonedGetValToContact(ci.ID, attr.PublicScope, attr.ProfileZone, constants.CustomProfileImageKey) profile.SendScopedZonedGetValToContact(ci.ID, attr.PublicScope, attr.ProfileZone, constants.CustomProfileImageKey)
} }
} }
} }
}
case event.ProtocolEngineCreated: case event.ProtocolEngineCreated:
// Now that the Peer Engine is Activated, Reshare Profile Images // Now that the Peer Engine is Activated, Reshare Profile Images
key, exists := profile.GetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.CustomProfileImageKey) key, exists := profile.GetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.CustomProfileImageKey)

View File

@ -0,0 +1,107 @@
package hybrid
import (
"crypto/ed25519"
"encoding/base32"
"encoding/json"
"fmt"
"strings"
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/model"
"cwtch.im/cwtch/model/attr"
"git.openprivacy.ca/openprivacy/log"
)
const ManagedGroupOpen = "managed-group-open"
type GroupEventType int
const (
MemberGroupIDKey = "member_group_id_key"
MemberMessageIDKey = "member_group_messge_id"
)
const (
AddMember = GroupEventType(0x1000)
RemoveMember = GroupEventType(0x2000)
RotateKey = GroupEventType(0x3000)
NewMessage = GroupEventType(0x4000)
NewClearMessage = GroupEventType(0x5000)
SyncRequest = GroupEventType(0x6000)
)
type ManageGroupEvent struct {
EventType GroupEventType `json:"t"`
Data string `json:"d"` // json encoded data
}
type AddMemberEvent struct {
Handle string `json:"h"`
}
type RemoveMemberEvent struct {
Handle string `json:"h"`
}
type RotateKeyEvent struct {
Key []byte `json:"k"`
}
type NewMessageEvent struct {
EncryptedHybridGroupMessage []byte `json:"m"`
}
type NewClearMessageEvent struct {
HybridGroupMessage HybridGroupMessage `json:"m"`
}
type SyncRequestMessage struct {
// a map of MemberGroupID: MemberMessageID
LastSeen map[int]int `json:"l"`
}
// This file contains code for the Hybrid Group / Managed Group types..
type HybridGroupMessage struct {
Author string `json:"a"` // the authors cwtch address
MemberGroupID uint32 `json:"g"`
MemberMessageID uint32 `json:"m"`
MessageBody string `json:"b"`
Sent uint64 `json:"t"` // milliseconds since epoch
Signature []byte `json:"s"` // of json-encoded content (including empty sig)
}
// AuthenticateMessage returns true if the Author of the message produced the Signature over the message
func AuthenticateMessage(message HybridGroupMessage) bool {
messageCopy := message
messageCopy.Signature = []byte{}
// Otherwise we derive the public key from the sender and check it against that.
decodedPub, err := base32.StdEncoding.DecodeString(strings.ToUpper(message.Author))
if err == nil {
data, err := json.Marshal(messageCopy)
if err == nil && len(decodedPub) >= 32 {
return ed25519.Verify(decodedPub[:32], data, message.Signature)
}
}
log.Errorf("invalid signature on message from %s", message)
return false
}
func CheckACL(handle string, group *model.Conversation) (*model.AccessControl, error) {
if isOpen, exists := group.Attributes[attr.LocalScope.ConstructScopedZonedPath(attr.ConversationZone.ConstructZonedPath(ManagedGroupOpen)).ToString()]; !exists {
return nil, fmt.Errorf("group has not been setup correctly - ManagedGroupOpen does not exist ")
} else if isOpen == event.True {
// We don't need to do a membership check
defaultACL := group.GetPeerAC()
return &defaultACL, nil
}
// If this is a closed group. Check if we have an ACL entry for this member
// If we don't OR that member has been blocked, then close the connection.
if acl, inGroup := group.ACL[handle]; !inGroup || acl.Blocked {
log.Infof("ACL Check Failed: %v %v %v", handle, acl, inGroup)
return nil, fmt.Errorf("peer is not a member of this group")
} else {
return &acl, nil
}
}

View File

@ -0,0 +1,227 @@
// This file contains all code related to how a Group Manager operates over a group.
// Managed groups are canonically controlled by members setting
// the ManageGroup permission in the conversation ACL; allowing the manager to
// take control of how this group is structured, see OnEvent below...
// TODO: This file represents stage 1 of the roll out which de-risks most of the
// integration into cwtch peer, new interfaces, and UI integration
// The following functionality is not yet implemented:
// - group-level encryption
// - key rotation / membership ACL
// Cwtch Hybrid Groups are still very experimental functionality and should
// only be used for testing purposes.
package hybrid
import (
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/model"
"cwtch.im/cwtch/model/attr"
"cwtch.im/cwtch/model/constants"
"cwtch.im/cwtch/peer"
"cwtch.im/cwtch/protocol/connections"
"cwtch.im/cwtch/settings"
"encoding/json"
"fmt"
"git.openprivacy.ca/openprivacy/log"
)
// MANAGED_GROUP_HANDLE denotes the nominal name that the managed group is given, for easier handling
// Note: we could use id here, as the managed group should technically always be the first group
// But we don't want to assume that, and also allow conversations to be moved around without
// constantly referring to a magic id.
const MANAGED_GROUP_HANDLE = "managed:000"
type GroupManagerFunctionality struct {
}
func (f *GroupManagerFunctionality) NotifySettingsUpdate(settings settings.GlobalSettings) {
}
func (f *GroupManagerFunctionality) EventsToRegister() []event.Type {
return []event.Type{event.PeerStateChange, event.NewMessageFromPeerEngine}
}
func (f *GroupManagerFunctionality) ExperimentsToRegister() []string {
return []string{constants.GroupManagerExperiment, constants.GroupsExperiment}
}
// OnEvent handles File Sharing Hooks like Manifest Received and FileDownloaded
func (f *GroupManagerFunctionality) OnEvent(ev event.Event, profile peer.CwtchPeer) {
// We only want to engage this functionality if the peer is managing a group.
// In that case ALL peer connections and messages need to be routed through
// the management logic
// For now, we assume that a manager is a peer with a special management group.
// In the future we may want to make this a profile-level switch/attribute.
isManager := false
if ci, err := profile.FetchConversationInfo(MANAGED_GROUP_HANDLE); ci != nil && err == nil {
isManager = true
}
if isManager {
switch ev.EventType {
case event.PeerStateChange:
handle := ev.Data["RemotePeer"]
// check that we have authenticated with this peer
if connections.ConnectionStateToType()[ev.Data[event.ConnectionState]] == connections.AUTHENTICATED {
mg, err := f.GetManagedGroup(profile)
if err != nil {
log.Infof("group manager received peer connections but no suitable group has been found: %v %v", handle, err)
profile.DisconnectFromPeer(handle)
break
}
if _, err := CheckACL(handle, mg); err != nil {
log.Infof("received managed group connection from unauthorized peer: %v %v", handle, err)
profile.DisconnectFromPeer(handle)
break
}
}
// This is where most of the magic happens for managed groups. A few notes:
// - CwtchPeer has already taken care of storing this for us, we don't need to worry about that
// - Group Managers **only** speak overlays and **always** wrap their messages in a ManageGroupEvent anything else is fast-rejected.
case event.NewMessageFromPeerEngine:
log.Infof("received new message from peer: manager")
ci, err := f.GetManagedGroup(profile)
if err != nil {
log.Errorf("unknown conversation %v", err)
break // we don't care about unknown conversations...
}
var cm model.MessageWrapper
err = json.Unmarshal([]byte(ev.Data[event.Data]), &cm)
if err != nil {
log.Errorf("could not deserialize json %s %v", ev.Data[event.Data], err)
break
}
// The overlay type of this message **must** be ManageGroupEvent
if cm.Overlay == model.OverlayManageGroupEvent {
var mge ManageGroupEvent
err = json.Unmarshal([]byte(cm.Data), &mge)
if err == nil {
f.handleEvent(profile, *ci, mge, ev.Data[event.Data])
}
}
}
}
}
// handleEvent takes in a high level ManageGroupEvent message, transforms it into the proper type, and passes it on for handling
// assumes we are called after an event provided by an authorized peer (i.e. ManageGroup == true)
func (f *GroupManagerFunctionality) handleEvent(profile peer.CwtchPeer, conversation model.Conversation, mge ManageGroupEvent, original string) {
switch mge.EventType {
case NewClearMessage:
var nme NewClearMessageEvent
err := json.Unmarshal([]byte(mge.Data), &nme)
if err == nil {
f.handleNewMessageEvent(profile, conversation, nme, original)
}
}
}
func (f *GroupManagerFunctionality) handleNewMessageEvent(profile peer.CwtchPeer, conversation model.Conversation, nme NewClearMessageEvent, original string) {
log.Infof("handling new clear message event")
hgm := nme.HybridGroupMessage
if AuthenticateMessage(hgm) {
log.Infof("authenticated message")
group, err := f.GetManagedGroup(profile)
if err != nil {
log.Infof("received fraudulant hybrid message from group: %v", err)
return
}
if acl, err := CheckACL(hgm.Author, group); err != nil {
log.Infof("received fraudulant hybrid message from group: %v", err)
return
} else if !acl.Append {
log.Infof("received fraudulant hybrid message from group: peer does not have append privileges")
return
} else {
// TODO - Store this message locally in a format that makes it easier to
// do assurance later on
// forward the message to everyone who the server has added as a contact
// and who are represented in the ACL...
allConversations, _ := profile.FetchConversations()
for _, ci := range allConversations {
// NOTE: This check works for Open Groups too as CheckACL will return the default ACL
// for the group....
if ci.Handle != MANAGED_GROUP_HANDLE { // don't send to ourselves...
if acl, err := CheckACL(hgm.Author, group); err == nil && acl.Read {
log.Infof("forwarding group message to: %v", ci.Handle)
profile.SendMessage(ci.ID, original)
}
}
}
}
} else {
log.Errorf("received fraudulant hybrid message fom group")
}
}
// GetManagedGroup is a convieniance function that looks up the managed group
func (f *GroupManagerFunctionality) GetManagedGroup(profile peer.CwtchPeer) (*model.Conversation, error) {
return profile.FetchConversationInfo(MANAGED_GROUP_HANDLE)
}
// Establish a new Managed Group and return its conversation id
func (f *GroupManagerFunctionality) ManageNewGroup(profile peer.CwtchPeer) (int, error) {
// note: a manager can only manage one group. This will (probably) always be true and has a few benefits
// and downsides.
// The main downside is that it requires a new manager per group (and thus an onion service per group)
// However, it means that we can lean on p2p functionality like profile images / metadata / name
// etc. for group metadata and effectively get that for-free in the client.
// HOWEVER: hedging our bets here by giving this group a numeric handle...
if _, err := profile.FetchConversationInfo(MANAGED_GROUP_HANDLE); err == nil {
return -1, fmt.Errorf("manager is already managing a group")
}
ac := model.DefaultP2PAccessControl()
// by setting the ManageGroup permission in this ACL we are allowing the manager to
// take control of how this group is structured, see OnEvent above...
ac.ManageGroup = true
acl := model.AccessControlList{}
acl[profile.GetOnion()] = ac
acl[MANAGED_GROUP_HANDLE] = model.NoAccessControl()
ci, err := profile.NewConversation(MANAGED_GROUP_HANDLE, acl)
if err != nil {
return -1, err
}
profile.SetConversationAttribute(ci, attr.LocalScope.ConstructScopedZonedPath(attr.ConversationZone.ConstructZonedPath(ManagedGroupOpen)), event.False)
return ci, nil
}
// AddHybridContact is a wrapper arround NewContactConversation which sets the contact
// up for Hybrid Group channel messages...
// TODO this function assumes that authorization has been done at a higher level..
func (f *GroupManagerFunctionality) AddHybridContact(profile peer.CwtchPeer, handle string) error {
ac := model.DefaultP2PAccessControl()
ac.ManageGroup = false
ci, err := profile.NewContactConversation(handle, ac, true)
if err != nil {
return err
}
mg, err := f.GetManagedGroup(profile)
if err != nil {
return err
}
// Update the ACL list to add this contact...
acl := mg.ACL
acl[handle] = model.DefaultP2PAccessControl()
profile.UpdateConversationAccessControlList(mg.ID, acl)
// enable channel 2 on this conversation (hybrid groups management channel)
profile.InitChannel(ci, constants.CHANNEL_MANAGER)
key := fmt.Sprintf("channel.%d", constants.CHANNEL_MANAGER)
profile.SetConversationAttribute(ci, attr.LocalScope.ConstructScopedZonedPath(attr.ConversationZone.ConstructZonedPath(key)), constants.True)
// Group managers need to always save history (and manually deal with purging...)
profile.SetConversationAttribute(ci, attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(event.SaveHistoryKey)), event.SaveHistoryConfirmed)
return nil
}
func (f *GroupManagerFunctionality) OnContactRequestValue(profile peer.CwtchPeer, conversation model.Conversation, eventID string, path attr.ScopedZonedPath) {
// nop hybrid group conversations do not exchange contact requests
}
func (f *GroupManagerFunctionality) OnContactReceiveValue(profile peer.CwtchPeer, conversation model.Conversation, path attr.ScopedZonedPath, value string, exists bool) {
// nop hybrid group conversations do not exchange contact requests
}

View File

@ -0,0 +1,330 @@
package hybrid
import (
"crypto/rand"
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/model"
"cwtch.im/cwtch/model/attr"
"cwtch.im/cwtch/model/constants"
"cwtch.im/cwtch/peer"
"cwtch.im/cwtch/settings"
"encoding/base64"
"encoding/json"
"fmt"
"git.openprivacy.ca/openprivacy/log"
"golang.org/x/crypto/nacl/secretbox"
"math"
"math/big"
"strconv"
"time"
)
type ManagedGroupFunctionality struct {
}
func (f ManagedGroupFunctionality) NotifySettingsUpdate(settings settings.GlobalSettings) {
}
func (f ManagedGroupFunctionality) EventsToRegister() []event.Type {
return []event.Type{event.NewMessageFromPeerEngine}
}
func (f ManagedGroupFunctionality) ExperimentsToRegister() []string {
return []string{constants.GroupsExperiment}
}
// OnEvent handles File Sharing Hooks like Manifest Received and FileDownloaded
func (f *ManagedGroupFunctionality) OnEvent(ev event.Event, profile peer.CwtchPeer) {
switch ev.EventType {
// This is where most of the magic happens for managed groups. A few notes:
// - CwtchPeer has already taken care of storing this for us, we don't need to worry about that
// - Group Managers **only** speak overlays and **always** wrap their messages in a ManageGroupEvent anything else is fast-rejected.
case event.NewMessageFromPeerEngine:
handle := ev.Data[event.RemotePeer]
ci, err := profile.FetchConversationInfo(handle)
if err != nil {
break // we don't care about unknown conversations...
}
// We reject managed group requests for groups not setup as managed groups...
if ci.ACL[handle].ManageGroup {
var cm model.MessageWrapper
err = json.Unmarshal([]byte(ev.Data[event.Data]), &cm)
if err != nil {
break
}
// The overlay type of this message **must** be ManageGroupEvent
if cm.Overlay == model.OverlayManageGroupEvent {
var mge ManageGroupEvent
err = json.Unmarshal([]byte(cm.Data), &mge)
if err == nil {
cid, err := profile.FetchConversationInfo(handle)
if err == nil {
f.handleEvent(profile, *cid, mge)
}
}
}
}
}
}
// handleEvent takes in a high level ManageGroupEvent message, transforms it into the proper type, and passes it on for handling
// assumes we are called after an event provided by an authorized peer (i.e. ManageGroup == true)
func (f *ManagedGroupFunctionality) handleEvent(profile peer.CwtchPeer, conversation model.Conversation, mge ManageGroupEvent) {
switch mge.EventType {
case AddMember:
var ame AddMemberEvent
err := json.Unmarshal([]byte(mge.Data), &ame)
if err == nil {
f.handleAddMemberEvent(profile, conversation, ame)
}
case RemoveMember:
var rme RemoveMemberEvent
err := json.Unmarshal([]byte(mge.Data), &rme)
if err == nil {
f.handleRemoveMemberEvent(profile, conversation, rme)
}
case NewMessage:
var nme NewMessageEvent
err := json.Unmarshal([]byte(mge.Data), &nme)
if err == nil {
f.handleNewMessageEvent(profile, conversation, nme)
}
case NewClearMessage:
var nme NewClearMessageEvent
err := json.Unmarshal([]byte(mge.Data), &nme)
if err == nil {
f.handleNewClearMessageEvent(profile, conversation, nme)
}
case RotateKey:
var rke RotateKeyEvent
err := json.Unmarshal([]byte(mge.Data), &rke)
if err == nil {
f.handleRotateKeyEvent(profile, conversation, rke)
}
}
}
// handleAddMemberEvent adds a group member to the conversation ACL
// assumes we are called after an event provided by an authorized peer (i.e. ManageGroup == true)
func (f *ManagedGroupFunctionality) handleAddMemberEvent(profile peer.CwtchPeer, conversation model.Conversation, ame AddMemberEvent) {
acl := conversation.ACL
acl[ame.Handle] = model.DefaultP2PAccessControl()
profile.UpdateConversationAccessControlList(conversation.ID, acl)
}
// handleRemoveMemberEvent removes a group member from the conversation ACL
// assumes we are called after an event provided by an authorized peer (i.e. ManageGroup == true)
func (f *ManagedGroupFunctionality) handleRemoveMemberEvent(profile peer.CwtchPeer, conversation model.Conversation, rme RemoveMemberEvent) {
acl := conversation.ACL
delete(acl, rme.Handle)
profile.UpdateConversationAccessControlList(conversation.ID, acl)
}
// handleRotateKeyEvent rotates the encryption key for a given group
// assumes we are called after an event provided by an authorized peer (i.e. ManageGroup == true)
// TODO this currently is a noop as group levle encryption is unimplemented
func (f *ManagedGroupFunctionality) handleRotateKeyEvent(profile peer.CwtchPeer, conversation model.Conversation, rke RotateKeyEvent) {
keyScope := attr.LocalScope.ConstructScopedZonedPath(attr.ConversationZone.ConstructZonedPath("key"))
keyB64 := base64.StdEncoding.EncodeToString(rke.Key)
profile.SetConversationAttribute(conversation.ID, keyScope, keyB64)
}
// TODO this is a sketch implementation that is not yet complete.
func (f *ManagedGroupFunctionality) handleNewMessageEvent(profile peer.CwtchPeer, conversation model.Conversation, nme NewMessageEvent) {
keyScope := attr.LocalScope.ConstructScopedZonedPath(attr.ConversationZone.ConstructZonedPath("key"))
if keyB64, err := profile.GetConversationAttribute(conversation.ID, keyScope); err == nil {
key, err := base64.StdEncoding.DecodeString(keyB64)
if err != nil || len(key) != 32 {
log.Errorf("hybrid group key is corrupted")
return
}
// decrypt the message with key...
hgm, err := f.decryptMessage(key, nme.EncryptedHybridGroupMessage)
if hgm == nil || err != nil {
log.Errorf("unable to decrypt hybrid group message: %v", err)
return
}
f.handleNewClearMessageEvent(profile, conversation, NewClearMessageEvent{HybridGroupMessage: *hgm})
}
}
func (f *ManagedGroupFunctionality) handleNewClearMessageEvent(profile peer.CwtchPeer, conversation model.Conversation, nme NewClearMessageEvent) {
hgm := nme.HybridGroupMessage
if AuthenticateMessage(hgm) {
// TODO Closed Group Membership Check - right now we only support open groups...
if profile.GetOnion() == hgm.Author {
// ack
signatureB64 := base64.StdEncoding.EncodeToString(hgm.Signature)
id, err := profile.GetChannelMessageBySignature(conversation.ID, constants.CHANNEL_CHAT, signatureB64)
if err == nil {
profile.UpdateMessageAttribute(conversation.ID, constants.CHANNEL_CHAT, id, constants.AttrAck, constants.True)
profile.PublishEvent(event.NewEvent(event.IndexedAcknowledgement, map[event.Field]string{event.ConversationID: strconv.Itoa(conversation.ID), event.Index: strconv.Itoa(id)}))
}
} else {
mgidstr := strconv.Itoa(int(nme.HybridGroupMessage.MemberGroupID)) // we need both MemberGroupId and MemberMessageId for attestation later on...
newmmidstr := strconv.Itoa(int(nme.HybridGroupMessage.MemberMessageID))
// Set the attributes of this message...
attr := model.Attributes{MemberGroupIDKey: mgidstr, MemberMessageIDKey: newmmidstr,
constants.AttrAuthor: hgm.Author,
constants.AttrAck: event.True,
constants.AttrSentTimestamp: time.UnixMilli(int64(hgm.Sent)).Format(time.RFC3339Nano)}
// Note: The Channel here is 0...this is the main channel that UIs understand as the default, so this message is
// becomes part of the conversation...
mid, err := profile.InternalInsertMessage(conversation.ID, constants.CHANNEL_CHAT, hgm.Author, hgm.MessageBody, attr, hgm.Signature)
contenthash := model.CalculateContentHash(hgm.Author, hgm.MessageBody)
if err == nil {
profile.PublishEvent(event.NewEvent(event.NewMessageFromGroup, map[event.Field]string{event.ConversationID: strconv.Itoa(conversation.ID), event.TimestampSent: time.UnixMilli(int64(hgm.Sent)).Format(time.RFC3339Nano), event.RemotePeer: hgm.Author, event.Index: strconv.Itoa(mid), event.Data: hgm.MessageBody, event.ContentHash: contenthash}))
}
}
// TODO need to send an event here...
} else {
log.Errorf("received fraudulant hybrid message fom group")
}
}
// todo sketch function
func (f *ManagedGroupFunctionality) decryptMessage(key []byte, ciphertext []byte) (*HybridGroupMessage, error) {
if len(ciphertext) > 24 {
var decryptNonce [24]byte
copy(decryptNonce[:], ciphertext[:24])
var fixedSizeKey [32]byte
copy(fixedSizeKey[:], key[:32])
decrypted, ok := secretbox.Open(nil, ciphertext[24:], &decryptNonce, &fixedSizeKey)
if ok {
var hgm HybridGroupMessage
err := json.Unmarshal(decrypted, &hgm)
return &hgm, err
}
}
return nil, fmt.Errorf("invalid ciphertext/key error")
}
// Define a new managed group, managed by the manager...
func (f *ManagedGroupFunctionality) NewManagedGroup(profile peer.CwtchPeer, manager string) error {
// generate a truely random member id for this group in [0..2^32)
nBig, err := rand.Int(rand.Reader, big.NewInt(math.MaxUint32))
if err != nil {
return err // if there is a problem with random we want to exit now rather than have to clean up group setup...
}
ac := model.DefaultP2PAccessControl()
ac.ManageGroup = true // by setting the ManageGroup permission in this ACL we are allowing the manager to control of how this group is structured
ci, err := profile.NewContactConversation(manager, ac, true)
if err != nil {
return err
}
// enable channel 2 on this conversation (hybrid groups management channel)
key := fmt.Sprintf("channel.%d", 2)
err = profile.SetConversationAttribute(ci, attr.LocalScope.ConstructScopedZonedPath(attr.ConversationZone.ConstructZonedPath(key)), constants.True)
if err != nil {
return fmt.Errorf("could not enable channel 2 on hybrid group: %v", err) // likely a catestrophic error...fail
}
err = profile.InitChannel(ci, 2)
if err != nil {
return fmt.Errorf("could not enable channel 2 on hybrid group: %v", err) // likely a catestrophic error...fail
}
// finally, set the member group id on this group...
mgidkey := attr.LocalScope.ConstructScopedZonedPath(attr.ConversationZone.ConstructZonedPath(MemberGroupIDKey))
err = profile.SetConversationAttributeInt(ci, mgidkey, int(nBig.Uint64()))
if err != nil {
return fmt.Errorf("could not set group id on hybrid group: %v", err) // likely a catestrophic error...fail
}
return nil
}
// SendMessageToManagedGroup acts like SendMessage(ToPeer), but with a few additional bookkeeping steps for Hybrid Groups
func (f *ManagedGroupFunctionality) SendMessageToManagedGroup(profile peer.CwtchPeer, conversation int, message string) (int, error) {
mgidkey := attr.LocalScope.ConstructScopedZonedPath(attr.ConversationZone.ConstructZonedPath(MemberGroupIDKey))
mgid, err := profile.GetConversationAttributeInt(conversation, mgidkey)
if err != nil {
return -1, err
}
mmidkey := attr.LocalScope.ConstructScopedZonedPath(attr.ConversationZone.ConstructZonedPath(MemberMessageIDKey))
mmid, err := profile.GetConversationAttributeInt(conversation, mmidkey)
if err != nil {
mmid = 0 // first message
}
mmid += 1
// Now time to package this whole thing in layers of JSON...
hgm := HybridGroupMessage{
MemberGroupID: uint32(mgid),
MemberMessageID: uint32(mmid),
Sent: uint64(time.Now().UnixMilli()),
Author: profile.GetOnion(),
MessageBody: message,
Signature: []byte{}, // Leave blank so we can sign this message...
}
data, err := json.Marshal(hgm)
if err != nil {
return -1, err
}
// Don't forget to sign the message...
sig, err := profile.SignMessage(data)
if err != nil {
return -1, err
}
hgm.Signature = sig
ncm := NewClearMessageEvent{
HybridGroupMessage: hgm,
}
signedData, err := json.Marshal(ncm)
if err != nil {
return -1, err
}
mgm := ManageGroupEvent{
EventType: NewClearMessage,
Data: string(signedData),
}
odata, err := json.Marshal(mgm)
if err != nil {
return -1, err
}
overlay := model.MessageWrapper{
Overlay: model.OverlayManageGroupEvent,
Data: string(odata),
}
ojson, err := json.Marshal(overlay)
if err != nil {
return -1, err
}
// send the message to the manager and update our message is string for tracking...
_, err = profile.SendMessage(conversation, string(ojson))
if err != nil {
return -1, err
}
profile.SetConversationAttributeInt(conversation, mmidkey, mmid)
// ok there is still one more thing we need to do...
// insert this message as part of our group log, for members of the group
// this exists in channel 0 of the conversation with the group manager...
mgidstr := strconv.Itoa(mgid) // we need both MemberGroupId and MemberMessageId for attestation later on...
newmmidstr := strconv.Itoa(mmid)
attr := model.Attributes{MemberGroupIDKey: mgidstr, MemberMessageIDKey: newmmidstr, constants.AttrAuthor: profile.GetOnion(), constants.AttrAck: event.False, constants.AttrSentTimestamp: time.Now().Format(time.RFC3339Nano)}
return profile.InternalInsertMessage(conversation, 0, hgm.Author, message, attr, hgm.Signature)
}
func (f ManagedGroupFunctionality) OnContactRequestValue(profile peer.CwtchPeer, conversation model.Conversation, eventID string, path attr.ScopedZonedPath) {
// nop hybrid group conversations do not exchange contact requests
}
func (f ManagedGroupFunctionality) OnContactReceiveValue(profile peer.CwtchPeer, conversation model.Conversation, path attr.ScopedZonedPath, value string, exists bool) {
// nop hybrid group conversations do not exchange contact requests
}

View File

@ -0,0 +1,75 @@
package inter
import (
"errors"
"strings"
"cwtch.im/cwtch/functionality/hybrid"
"cwtch.im/cwtch/model/constants"
"cwtch.im/cwtch/peer"
)
// This functionality is a little different. It's not functionality per-se. It's a wrapper around
// CwtchProfile function that combines some core-functionalities like Hybrid Groups so that
// they can be transparently exposed in autobindings.
// DEV NOTE: consider moving other cross-cutting interface functions here to simplfy CwtchPeer
type InterfaceFunctionality struct {
}
// FunctionalityGate returns filesharing functionality - gates now happen on function calls.
func FunctionalityGate() *InterfaceFunctionality {
return new(InterfaceFunctionality)
}
func (i InterfaceFunctionality) ImportBundle(profile peer.CwtchPeer, uri string) error {
// check if this is a managed group. Note: managed groups do not comply with the server bundle format.
if strings.HasPrefix(uri, "managed:") {
uri = uri[len("managed:"):]
if profile.IsFeatureEnabled(constants.GroupsExperiment) {
mgf := hybrid.ManagedGroupFunctionality{}
return mgf.NewManagedGroup(profile, uri)
} else {
return errors.New("managed groups require the group experiment to be enabled")
}
}
// DEV NOTE: we may want to eventually move Server Import code to ServerFunctionality and add a hook here...
// DEV NOTE: consider making ImportBundle a high-level functionality interface? to support different kinds of contacts?
return profile.ImportBundle(uri)
}
// EnhancedImportBundle is identical to EnhancedImportBundle in CwtchPeer but instead of wrapping CwtchPeer.ImportBundle it instead
// wraps InterfaceFunctionality.ImportBundle
func (i InterfaceFunctionality) EnhancedImportBundle(profile peer.CwtchPeer, uri string) string {
err := i.ImportBundle(profile, uri)
if err == nil {
return "importBundle.success"
}
return err.Error()
}
// SendMessage sends a message to a conversation.
// NOTE: Unlike CwtchPeer.SendMessage this interface makes no guarentees about the raw-ness of the message sent to peer contacts.
// If the conversation is a hybrid groups then the message may be wrapped in multiple layers of overlay messages / encryption
// prior to being send. To send a raw message to a peer then use peer.CwtchPeer
// DEV NOTE: Move Legacy Group message send here...
func (i InterfaceFunctionality) SendMessage(profile peer.CwtchPeer, conversation int, message string) (int, error) {
ci, err := profile.GetConversationInfo(conversation)
if err != nil {
return -1, err
}
if ci.ACL[ci.Handle].ManageGroup {
mgf := hybrid.ManagedGroupFunctionality{}
return mgf.SendMessageToManagedGroup(profile, conversation, message)
}
return profile.SendMessage(conversation, message)
}
// EnhancedSendMessage Attempts to Send a Message and Immediately Attempts to Lookup the Message in the Database
// this wraps InterfaceFunctionality.SendMessage to support HybridGroups
func (i InterfaceFunctionality) EnhancedSendMessage(profile peer.CwtchPeer, conversation int, message string) string {
mid, err := i.SendMessage(profile, conversation, message)
if err != nil {
return ""
}
return profile.EnhancedGetMessageById(conversation, mid)
}

1
go.mod
View File

@ -16,7 +16,6 @@ require (
require ( require (
filippo.io/edwards25519 v1.0.0 // indirect filippo.io/edwards25519 v1.0.0 // indirect
git.openprivacy.ca/openprivacy/bine v0.0.5 // indirect git.openprivacy.ca/openprivacy/bine v0.0.5 // indirect
github.com/client9/misspell v0.3.4 // indirect
github.com/google/go-cmp v0.5.8 // indirect github.com/google/go-cmp v0.5.8 // indirect
github.com/gtank/merlin v0.1.1 // indirect github.com/gtank/merlin v0.1.1 // indirect
github.com/mimoo/StrobeGo v0.0.0-20220103164710-9a04d6ca976b // indirect github.com/mimoo/StrobeGo v0.0.0-20220103164710-9a04d6ca976b // indirect

2
go.sum
View File

@ -8,8 +8,6 @@ git.openprivacy.ca/openprivacy/connectivity v1.11.0 h1:roASjaFtQLu+HdH5fa2wx6F00
git.openprivacy.ca/openprivacy/connectivity v1.11.0/go.mod h1:OQO1+7OIz/jLxDrorEMzvZA6SEbpbDyLGpjoFqT3z1Y= git.openprivacy.ca/openprivacy/connectivity v1.11.0/go.mod h1:OQO1+7OIz/jLxDrorEMzvZA6SEbpbDyLGpjoFqT3z1Y=
git.openprivacy.ca/openprivacy/log v1.0.3 h1:E/PMm4LY+Q9s3aDpfySfEDq/vYQontlvNj/scrPaga0= git.openprivacy.ca/openprivacy/log v1.0.3 h1:E/PMm4LY+Q9s3aDpfySfEDq/vYQontlvNj/scrPaga0=
git.openprivacy.ca/openprivacy/log v1.0.3/go.mod h1:gGYK8xHtndRLDymFtmjkG26GaMQNgyhioNS82m812Iw= git.openprivacy.ca/openprivacy/log v1.0.3/go.mod h1:gGYK8xHtndRLDymFtmjkG26GaMQNgyhioNS82m812Iw=
github.com/client9/misspell v0.3.4 h1:ta993UF76GwbvJcIo3Y68y/M3WxlpEHPWIGDkJYwzJI=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=

View File

@ -20,6 +20,9 @@ const (
// LegacyGroupZone for attributes related to legacy group experiment // LegacyGroupZone for attributes related to legacy group experiment
LegacyGroupZone = Zone("legacygroup") LegacyGroupZone = Zone("legacygroup")
// ConversationZone for attributes related to structure of the conversation
ConversationZone = Zone("conversation")
// FilesharingZone for attributes related to file sharing // FilesharingZone for attributes related to file sharing
FilesharingZone = Zone("filesharing") FilesharingZone = Zone("filesharing")
@ -65,6 +68,8 @@ func ParseZone(path string) (Zone, string) {
return ServerKeyZone, parts[1] return ServerKeyZone, parts[1]
case ServerZone: case ServerZone:
return ServerZone, parts[1] return ServerZone, parts[1]
case ConversationZone:
return ConversationZone, parts[1]
default: default:
return UnknownZone, parts[1] return UnknownZone, parts[1]
} }

View File

@ -58,6 +58,7 @@ const SyncMostRecentMessageTime = "SyncMostRecentMessageTime"
const AttrLastConnectionTime = "last-connection-time" const AttrLastConnectionTime = "last-connection-time"
const PeerAutostart = "autostart" const PeerAutostart = "autostart"
const PeerAppearOffline = "appear-offline" const PeerAppearOffline = "appear-offline"
const PrivateName = "private-name"
const Archived = "archived" const Archived = "archived"
const ProfileStatus = "profile-status" const ProfileStatus = "profile-status"

View File

@ -0,0 +1,4 @@
package constants
const CHANNEL_CHAT = 0
const CHANNEL_MANAGER = 2

View File

@ -19,3 +19,6 @@ var AutoDLFileExts = [...]string{".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp
// BlodeuweddExperiment enables the Blodeuwedd Assistant // BlodeuweddExperiment enables the Blodeuwedd Assistant
const BlodeuweddExperiment = "blodeuwedd" const BlodeuweddExperiment = "blodeuwedd"
// Enables the Hybrid Group Manager Extension
const GroupManagerExperiment = "group-manager"

View File

@ -4,6 +4,8 @@ import (
"cwtch.im/cwtch/model/attr" "cwtch.im/cwtch/model/attr"
"cwtch.im/cwtch/model/constants" "cwtch.im/cwtch/model/constants"
"encoding/json" "encoding/json"
"fmt"
"git.openprivacy.ca/openprivacy/connectivity/tor"
"git.openprivacy.ca/openprivacy/log" "git.openprivacy.ca/openprivacy/log"
"time" "time"
) )
@ -23,6 +25,8 @@ type AccessControl struct {
// Extension Related Permissions // Extension Related Permissions
ShareFiles bool // Allows a handle to share files to a conversation ShareFiles bool // Allows a handle to share files to a conversation
RenderImages bool // Indicates that certain filetypes should be autodownloaded and rendered when shared by this contact RenderImages bool // Indicates that certain filetypes should be autodownloaded and rendered when shared by this contact
ManageGroup bool // Allows this conversation to be managed by hybrid groups
} }
// DefaultP2PAccessControl defaults to a semi-trusted peer with no access to special extensions. // DefaultP2PAccessControl defaults to a semi-trusted peer with no access to special extensions.
@ -31,6 +35,14 @@ func DefaultP2PAccessControl() AccessControl {
AutoConnect: true, ShareFiles: false, RenderImages: false} AutoConnect: true, ShareFiles: false, RenderImages: false}
} }
// NoAccessControl defaults to a none-trusted peer with no access to special extensions.
// This is used as a fall back (if due to a software glitch a contact was setup without an access control, or for
// special contacts that should never be involvedt in external networking e.g. notes-to-self, or managed peers)
func NoAccessControl() AccessControl {
return AccessControl{Read: false, Append: false, ExchangeAttributes: false, Blocked: false,
AutoConnect: false, ShareFiles: false, RenderImages: false}
}
// AccessControlList represents an access control list for a conversation. Mapping handles to conversation // AccessControlList represents an access control list for a conversation. Mapping handles to conversation
// functions // functions
type AccessControlList map[string]AccessControl type AccessControlList map[string]AccessControl
@ -95,8 +107,28 @@ func (ci *Conversation) GetPeerAC() AccessControl {
if acl, exists := ci.ACL[ci.Handle]; exists { if acl, exists := ci.ACL[ci.Handle]; exists {
return acl return acl
} }
log.Errorf("attempted to access a Peer Access Control object from %v but peer ACL is undefined. This is likely a programming error", ci.Handle) log.Errorf("attempted to access a Peer Access Control object from %v but peer ACL is undefined. This is likely a programming error - fallback to a NoAccess AC", ci.Handle)
return DefaultP2PAccessControl() return NoAccessControl()
}
// HasChannel returns true if the requested channel has been setup for this conversation
func (ci *Conversation) HasChannel(requestedChannel int) bool {
if requestedChannel == 0 {
return true
}
if requestedChannel == 1 {
return false // channel 1 is mapped to channel 0 for backwards compatibility
}
key := fmt.Sprintf("channel.%d", requestedChannel)
if value, exists := ci.Attributes[attr.LocalScope.ConstructScopedZonedPath(attr.ConversationZone.ConstructZonedPath(key)).ToString()]; exists {
return value == constants.True
}
return false
}
// IsCwtchPeer is a helper attribute that identifies whether a conversation is a cwtch peer
func (ci *Conversation) IsCwtchPeer() bool {
return tor.IsValidHostname(ci.Handle)
} }
// IsGroup is a helper attribute that identifies whether a conversation is a legacy group // IsGroup is a helper attribute that identifies whether a conversation is a legacy group

View File

@ -6,13 +6,13 @@ import "sync"
// examples of experiments include File Sharing, Profile Images and Groups. // examples of experiments include File Sharing, Profile Images and Groups.
type Experiments struct { type Experiments struct {
enabled bool enabled bool
experiments sync.Map experiments *sync.Map
} }
// InitExperiments encapsulates a set of experiments separate from their storage in GlobalSettings. // InitExperiments encapsulates a set of experiments separate from their storage in GlobalSettings.
func InitExperiments(enabled bool, experiments map[string]bool) Experiments { func InitExperiments(enabled bool, experiments map[string]bool) Experiments {
var syncExperiments sync.Map syncExperiments := new(sync.Map)
for experiment, set := range experiments { for experiment, set := range experiments {
syncExperiments.Store(experiment, set) syncExperiments.Store(experiment, set)
} }

View File

@ -24,6 +24,7 @@ type MessageWrapper struct {
// Channel 2 is reserved for conversation admin (managed groups) // Channel 2 is reserved for conversation admin (managed groups)
// Channel 7 is reserved for streams (no ack, no store) // Channel 7 is reserved for streams (no ack, no store)
func (mw MessageWrapper) Channel() int { func (mw MessageWrapper) Channel() int {
// 1024 / 0x400 is the start of new channel overlays
if mw.Overlay > 1024 { if mw.Overlay > 1024 {
return mw.Overlay & 0x07 return mw.Overlay & 0x07
} }
@ -48,3 +49,6 @@ const OverlayInviteGroup = 101
// OverlayFileSharing is the canonical identifier for the file sharing overlay // OverlayFileSharing is the canonical identifier for the file sharing overlay
const OverlayFileSharing = 200 const OverlayFileSharing = 200
// ManageGroupEvent is the canonical identifier for the manage group overlay
const OverlayManageGroupEvent = 0x402

View File

@ -54,12 +54,10 @@ const MaxGroupMessageLength = 1800
func getRandomness(arr *[]byte) { func getRandomness(arr *[]byte) {
if _, err := io.ReadFull(rand.Reader, (*arr)[:]); err != nil { if _, err := io.ReadFull(rand.Reader, (*arr)[:]); err != nil {
if err != nil {
// If we can't do randomness, just crash something is very very wrong and we are not going // If we can't do randomness, just crash something is very very wrong and we are not going
// to resolve it here.... // to resolve it here....
panic(err.Error()) panic(err.Error())
} }
}
} }
// GenerateRandomID generates a random 16 byte hex id code // GenerateRandomID generates a random 16 byte hex id code

View File

@ -3,20 +3,11 @@ package peer
import ( import (
"context" "context"
"crypto/rand" "crypto/rand"
"cwtch.im/cwtch/model"
"cwtch.im/cwtch/model/constants"
"cwtch.im/cwtch/protocol/groups"
"cwtch.im/cwtch/settings"
"encoding/base64" "encoding/base64"
"encoding/hex" "encoding/hex"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"git.openprivacy.ca/cwtch.im/tapir/primitives"
"git.openprivacy.ca/cwtch.im/tapir/primitives/privacypass"
"git.openprivacy.ca/openprivacy/connectivity"
"git.openprivacy.ca/openprivacy/connectivity/tor"
"golang.org/x/crypto/ed25519"
"os" "os"
path "path/filepath" path "path/filepath"
"sort" "sort"
@ -25,6 +16,16 @@ import (
"sync" "sync"
"time" "time"
"cwtch.im/cwtch/model"
"cwtch.im/cwtch/model/constants"
"cwtch.im/cwtch/protocol/groups"
"cwtch.im/cwtch/settings"
"git.openprivacy.ca/cwtch.im/tapir/primitives"
"git.openprivacy.ca/cwtch.im/tapir/primitives/privacypass"
"git.openprivacy.ca/openprivacy/connectivity"
"git.openprivacy.ca/openprivacy/connectivity/tor"
"golang.org/x/crypto/ed25519"
"cwtch.im/cwtch/event" "cwtch.im/cwtch/event"
"cwtch.im/cwtch/model/attr" "cwtch.im/cwtch/model/attr"
"cwtch.im/cwtch/protocol/connections" "cwtch.im/cwtch/protocol/connections"
@ -310,8 +311,8 @@ func (cp *cwtchPeer) GenerateProtocolEngine(acn connectivity.ACN, bus event.Mana
authorizations := make(map[string]model.Authorization) authorizations := make(map[string]model.Authorization)
for _, conversation := range conversations { for _, conversation := range conversations {
if tor.IsValidHostname(conversation.Handle) { // Only perform the following actions for Peer-type Conversaions...
if conversation.IsCwtchPeer() {
// if this profile does not have an ACL version, and the profile is accepted (OR the acl version is v1 and the profile is accepted...) // if this profile does not have an ACL version, and the profile is accepted (OR the acl version is v1 and the profile is accepted...)
// then migrate the permissions to the v2 ACL // then migrate the permissions to the v2 ACL
// migrate the old accepted AC to a new fine-grained one // migrate the old accepted AC to a new fine-grained one
@ -441,7 +442,7 @@ func (cp *cwtchPeer) SendMessage(conversation int, message string) (int, error)
if cm, err := model.DeserializeMessage(message); err == nil { if cm, err := model.DeserializeMessage(message); err == nil {
if !cm.IsStream() { if !cm.IsStream() {
// For p2p messages we store the event id of the message as the "signature" we can then look this up in the database later for acks // For p2p messages we store the event id of the message as the "signature" we can then look this up in the database later for acks
id, err = cp.storage.InsertMessage(conversationInfo.ID, 0, message, model.Attributes{constants.AttrAuthor: string(onion), constants.AttrAck: event.False, constants.AttrSentTimestamp: time.Now().Format(time.RFC3339Nano)}, ev.EventID, model.CalculateContentHash(string(onion), message)) id, err = cp.storage.InsertMessage(conversationInfo.ID, cm.Channel(), message, model.Attributes{constants.AttrAuthor: string(onion), constants.AttrAck: event.False, constants.AttrSentTimestamp: time.Now().Format(time.RFC3339Nano)}, ev.EventID, model.CalculateContentHash(string(onion), message))
if err != nil { if err != nil {
return -1, err return -1, err
} }
@ -707,6 +708,14 @@ func (cp *cwtchPeer) ImportGroup(exportedInvite string) (int, error) {
return groupConversationID, err return groupConversationID, err
} }
// NewConversation create a new multi-peer conversation.
func (cp *cwtchPeer) NewConversation(handle string, acl model.AccessControlList) (int, error) {
conversationID, err := cp.storage.NewConversation(handle, model.Attributes{event.SaveHistoryKey: event.DeleteHistoryDefault}, acl, true)
cp.SetConversationAttribute(conversationID, attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.ACLVersion)), constants.ACLVersionOne)
cp.SetConversationAttribute(conversationID, attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.AttrLastConnectionTime)), time.Now().Format(time.RFC3339Nano))
return conversationID, err
}
// NewContactConversation create a new p2p conversation with the given acl applied to the handle. // NewContactConversation create a new p2p conversation with the given acl applied to the handle.
func (cp *cwtchPeer) NewContactConversation(handle string, acl model.AccessControl, accepted bool) (int, error) { func (cp *cwtchPeer) NewContactConversation(handle string, acl model.AccessControl, accepted bool) (int, error) {
cp.mutex.Lock() cp.mutex.Lock()
@ -897,6 +906,26 @@ func (cp *cwtchPeer) GetConversationAttribute(id int, path attr.ScopedZonedPath)
return val, nil return val, nil
} }
// SetConversationAttributeInt sets the conversation attribute at path to an integer value
func (cp *cwtchPeer) SetConversationAttributeInt(id int, path attr.ScopedZonedPath, value int) error {
strvalue := strconv.Itoa(value)
return cp.storage.SetConversationAttribute(id, path, strvalue)
}
// GetConversationAttributeInt is a method for retrieving an integer value of a given conversation
func (cp *cwtchPeer) GetConversationAttributeInt(id int, path attr.ScopedZonedPath) (int, error) {
ci, err := cp.storage.GetConversation(id)
if err != nil {
return 0, err
}
val, exists := ci.Attributes[path.ToString()]
if !exists {
return 0, fmt.Errorf("%v does not exist for conversation %v", path.ToString(), id)
}
intvalue, err := strconv.Atoi(val)
return intvalue, err
}
// GetChannelMessage returns a message from a conversation channel referenced by the absolute ID. // GetChannelMessage returns a message from a conversation channel referenced by the absolute ID.
// Note: This should note be used to index a list as the ID is not expected to be tied to absolute position // Note: This should note be used to index a list as the ID is not expected to be tied to absolute position
// in the table (e.g. deleted messages, expired messages, etc.) // in the table (e.g. deleted messages, expired messages, etc.)
@ -1147,6 +1176,12 @@ func (cp *cwtchPeer) DisconnectFromServer(onion string) {
// QueuePeeringWithOnion sends the request to peer with an onion directly to the contact retry queue; this is a mechanism to not flood tor with circuit requests // QueuePeeringWithOnion sends the request to peer with an onion directly to the contact retry queue; this is a mechanism to not flood tor with circuit requests
// Status: Ready for 1.10 // Status: Ready for 1.10
func (cp *cwtchPeer) QueuePeeringWithOnion(handle string) { func (cp *cwtchPeer) QueuePeeringWithOnion(handle string) {
// don't queue peers that do not have an onion addresses as a handle
// TODO establish a more robust separation between peerable contacts and
// facade contacts (managed groups, notes-to-self etc.)
if !tor.IsValidHostname(handle) {
return
}
ci, err := cp.FetchConversationInfo(handle) ci, err := cp.FetchConversationInfo(handle)
if err == nil { if err == nil {
lastSeen := cp.GetConversationLastSeenTime(ci.ID) lastSeen := cp.GetConversationLastSeenTime(ci.ID)
@ -1500,15 +1535,21 @@ func (cp *cwtchPeer) storeMessage(handle string, message string, sent time.Time)
} }
// Don't store messages in channel 7 // Don't store messages in channel 7
channel := 0
if cm, err := model.DeserializeMessage(message); err == nil { if cm, err := model.DeserializeMessage(message); err == nil {
if cm.IsStream() { if cm.IsStream() {
return -1, nil return -1, nil
} }
channel = cm.Channel()
} }
// Generate a random number and use it as the signature // Generate a random number and use it as the signature
signature := event.GetRandNumber().String() signature := event.GetRandNumber().String()
return cp.storage.InsertMessage(ci.ID, 0, message, model.Attributes{constants.AttrAuthor: handle, constants.AttrAck: event.True, constants.AttrSentTimestamp: sent.Format(time.RFC3339Nano)}, signature, model.CalculateContentHash(handle, message)) return cp.storage.InsertMessage(ci.ID, channel, message, model.Attributes{constants.AttrAuthor: handle, constants.AttrAck: event.True, constants.AttrSentTimestamp: sent.Format(time.RFC3339Nano)}, signature, model.CalculateContentHash(handle, message))
}
func (cp *cwtchPeer) InitChannel(conversation int, channel int) error {
return cp.storage.InitChannelOnConversation(conversation, channel)
} }
// eventHandler process events from other subsystems // eventHandler process events from other subsystems
@ -1523,6 +1564,7 @@ func (cp *cwtchPeer) eventHandler() {
onion, _ := cp.storage.LoadProfileKeyValue(TypeAttribute, attr.PublicScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Onion)).ToString()) onion, _ := cp.storage.LoadProfileKeyValue(TypeAttribute, attr.PublicScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Onion)).ToString())
log.Infof("Protocol engine for %s has stopped listening: %v", onion, ev.Data[event.Error]) log.Infof("Protocol engine for %s has stopped listening: %v", onion, ev.Data[event.Error])
cp.mutex.Unlock() cp.mutex.Unlock()
cp.processExtensionsEvent(ev)
case event.EncryptedGroupMessage: case event.EncryptedGroupMessage:
// If successful, a side effect is the message is added to the group's timeline // If successful, a side effect is the message is added to the group's timeline
@ -1567,6 +1609,7 @@ func (cp *cwtchPeer) eventHandler() {
case event.NewMessageFromPeerEngine: //event.TimestampReceived, event.RemotePeer, event.Data case event.NewMessageFromPeerEngine: //event.TimestampReceived, event.RemotePeer, event.Data
ts, _ := time.Parse(time.RFC3339Nano, ev.Data[event.TimestampReceived]) ts, _ := time.Parse(time.RFC3339Nano, ev.Data[event.TimestampReceived])
id, err := cp.storeMessage(ev.Data[event.RemotePeer], ev.Data[event.Data], ts) id, err := cp.storeMessage(ev.Data[event.RemotePeer], ev.Data[event.Data], ts)
cp.processExtensionsEvent(ev)
if err == nil { if err == nil {
// Republish as NewMessageFromPeer // Republish as NewMessageFromPeer
ev.EventType = event.NewMessageFromPeer ev.EventType = event.NewMessageFromPeer
@ -1610,7 +1653,6 @@ func (cp *cwtchPeer) eventHandler() {
conversationInfo, err := cp.FetchConversationInfo(onion) conversationInfo, err := cp.FetchConversationInfo(onion)
log.Debugf("confo info lookup newgetval %v %v %v", onion, conversationInfo, err) log.Debugf("confo info lookup newgetval %v %v %v", onion, conversationInfo, err)
// only accepted contacts can look up information
if conversationInfo != nil && conversationInfo.GetPeerAC().ExchangeAttributes { if conversationInfo != nil && conversationInfo.GetPeerAC().ExchangeAttributes {
// Type Safe Scoped/Zoned Path // Type Safe Scoped/Zoned Path
zscope := attr.IntoScope(scope) zscope := attr.IntoScope(scope)
@ -1683,6 +1725,7 @@ func (cp *cwtchPeer) eventHandler() {
timestamp := time.Now().Format(time.RFC3339Nano) timestamp := time.Now().Format(time.RFC3339Nano)
cp.SetConversationAttribute(cid, attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.AttrLastConnectionTime)), timestamp) cp.SetConversationAttribute(cid, attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.AttrLastConnectionTime)), timestamp)
} else if connections.ConnectionStateToType()[ev.Data[event.ConnectionState]] == connections.DISCONNECTED { } else if connections.ConnectionStateToType()[ev.Data[event.ConnectionState]] == connections.DISCONNECTED {
ci, err := cp.FetchConversationInfo(handle) ci, err := cp.FetchConversationInfo(handle)
if err == nil { if err == nil {
@ -1697,19 +1740,7 @@ func (cp *cwtchPeer) eventHandler() {
} }
// Safe Access to Extensions // Safe Access to Extensions
cp.extensionLock.Lock() cp.processExtensionsEvent(ev)
for _, extension := range cp.extensions {
log.Debugf("checking extension...%v", extension)
// check if the current map of experiments satisfies the extension requirements
if !cp.checkExtensionExperiment(extension) {
log.Debugf("skipping extension (%s) ..not all experiments satisfied", extension)
continue
}
if cp.checkEventExperiment(extension, ev.EventType) {
extension.extension.OnEvent(ev, cp)
}
}
cp.extensionLock.Unlock()
case event.ServerStateChange: case event.ServerStateChange:
cp.mutex.Lock() cp.mutex.Lock()
@ -1848,6 +1879,21 @@ func (cp *cwtchPeer) attemptInsertOrAcknowledgeLegacyGroupConversation(conversat
return err return err
} }
// finds a message by a signature by searching across all possible channels returns a channel and a message id
func (cp *cwtchPeer) findChannelMessageBySignature(ci *model.Conversation, signature string) (int, int, error) {
id, err := cp.GetChannelMessageBySignature(ci.ID, constants.CHANNEL_CHAT, signature)
if err == nil {
return constants.CHANNEL_CHAT, id, nil
}
if ci.HasChannel(constants.CHANNEL_MANAGER) {
id, err = cp.GetChannelMessageBySignature(ci.ID, constants.CHANNEL_MANAGER, signature)
if err == nil {
return constants.CHANNEL_MANAGER, id, nil
}
}
return -1, -1, err
}
// attemptAcknowledgeP2PConversation is a convenience method that looks up the conversation // attemptAcknowledgeP2PConversation is a convenience method that looks up the conversation
// by the given handle and attempts to mark the message as acknowledged. returns error on failure // by the given handle and attempts to mark the message as acknowledged. returns error on failure
// to either find the contact or the associated message // to either find the contact or the associated message
@ -1855,21 +1901,19 @@ func (cp *cwtchPeer) attemptAcknowledgeP2PConversation(handle string, signature
ci, err := cp.FetchConversationInfo(handle) ci, err := cp.FetchConversationInfo(handle)
// We should *never* received a peer acknowledgement for a conversation that doesn't exist... // We should *never* received a peer acknowledgement for a conversation that doesn't exist...
if ci != nil && err == nil { if ci != nil && err == nil {
// for p2p messages the randomly generated event ID is the "signature" chid, mid, err := cp.findChannelMessageBySignature(ci, signature)
id, err := cp.GetChannelMessageBySignature(ci.ID, 0, signature)
if err == nil { if err == nil {
_, attributes, err := cp.GetChannelMessage(ci.ID, 0, id) _, attributes, err := cp.GetChannelMessage(ci.ID, chid, mid)
if err == nil { if err == nil {
cp.mutex.Lock() cp.mutex.Lock()
attributes[constants.AttrAck] = constants.True attributes[constants.AttrAck] = constants.True
cp.mutex.Unlock() cp.mutex.Unlock()
cp.storage.UpdateMessageAttributes(ci.ID, 0, id, attributes) cp.storage.UpdateMessageAttributes(ci.ID, chid, mid, attributes)
cp.eventBus.Publish(event.NewEvent(event.IndexedAcknowledgement, map[event.Field]string{event.ConversationID: strconv.Itoa(ci.ID), event.RemotePeer: handle, event.Index: strconv.Itoa(id)})) cp.eventBus.Publish(event.NewEvent(event.IndexedAcknowledgement, map[event.Field]string{event.ConversationID: strconv.Itoa(ci.ID), event.RemotePeer: ci.Handle, event.Channel: strconv.Itoa(chid), event.Index: strconv.Itoa(mid)}))
return nil return nil
} }
return err
} }
return err return fmt.Errorf("no such signature error: %x", signature)
} }
return err return err
} }
@ -1883,16 +1927,16 @@ func (cp *cwtchPeer) attemptErrorConversationMessage(handle string, signature st
// We should *never* received an error for a conversation that doesn't exist... // We should *never* received an error for a conversation that doesn't exist...
if ci != nil && err == nil { if ci != nil && err == nil {
// "signature" here is event ID for peer messages... // "signature" here is event ID for peer messages...
id, err := cp.GetChannelMessageBySignature(ci.ID, 0, signature) chid, mid, err := cp.findChannelMessageBySignature(ci, signature)
if err == nil { if err == nil {
_, attributes, err := cp.GetChannelMessage(ci.ID, 0, id) _, attributes, err := cp.GetChannelMessage(ci.ID, chid, mid)
if err == nil { if err == nil {
cp.mutex.Lock() cp.mutex.Lock()
attributes[constants.AttrErr] = constants.True attributes[constants.AttrErr] = constants.True
cp.storage.UpdateMessageAttributes(ci.ID, 0, id, attributes) cp.storage.UpdateMessageAttributes(ci.ID, chid, mid, attributes)
cp.mutex.Unlock() cp.mutex.Unlock()
// Send a generic indexed failure... // Send a generic indexed failure...
cp.eventBus.Publish(event.NewEvent(event.IndexedFailure, map[event.Field]string{event.ConversationID: strconv.Itoa(ci.ID), event.Handle: handle, event.Error: error, event.Index: strconv.Itoa(id)})) cp.eventBus.Publish(event.NewEvent(event.IndexedFailure, map[event.Field]string{event.ConversationID: strconv.Itoa(ci.ID), event.Handle: handle, event.Error: error, event.Channel: strconv.Itoa(chid), event.Index: strconv.Itoa(mid)}))
return nil return nil
} }
return err return err
@ -1931,3 +1975,45 @@ func (cp *cwtchPeer) constructGroupFromConversation(conversationInfo *model.Conv
} }
return &group, nil return &group, nil
} }
// Insert a message directly into a conversation-channel, with the given attributes.
// NOTE: This should only be used by cwtch-extensions. Regular API uses should use more direct methods
// like SendMessage
func (cp *cwtchPeer) InternalInsertMessage(conversation int, channel int, author string, body string, attrs model.Attributes, signature []byte) (int, error) {
signatureB64 := base64.StdEncoding.EncodeToString(signature)
return cp.storage.InsertMessage(conversation, channel, body, attrs, signatureB64, model.CalculateContentHash(author, body))
}
func (cp *cwtchPeer) SignMessage(blob []byte) ([]byte, error) {
privateKey, err := cp.storage.LoadProfileKeyValue(TypePrivateKey, "Ed25519PrivateKey")
if err != nil {
log.Errorf("error loading private key from storage")
return nil, err
}
publicKey, err := cp.storage.LoadProfileKeyValue(TypePublicKey, "Ed25519PublicKey")
if err != nil {
log.Errorf("error loading public key from storage")
return nil, err
}
identity := primitives.InitializeIdentity("", (*ed25519.PrivateKey)(&privateKey), (*ed25519.PublicKey)(&publicKey))
return identity.Sign(blob), nil
}
func (cp *cwtchPeer) processExtensionsEvent(ev event.Event) {
cp.extensionLock.Lock()
for _, extension := range cp.extensions {
// check if the current map of experiments satisfies the extension requirements
if !cp.checkExtensionExperiment(extension) {
log.Debugf("skipping extension (%v) ..not all experiments satisfied", extension)
continue
}
// if the extension is registered for this event type then process
if _, contains := extension.events[ev.EventType]; contains {
extension.extension.OnEvent(ev, cp)
}
}
cp.extensionLock.Unlock()
}

View File

@ -3,19 +3,21 @@ package peer
import ( import (
"archive/tar" "archive/tar"
"compress/gzip" "compress/gzip"
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/model"
"cwtch.im/cwtch/model/attr"
"database/sql" "database/sql"
"errors" "errors"
"fmt" "fmt"
"git.openprivacy.ca/openprivacy/log"
"io" "io"
"os" "os"
"path/filepath" "path/filepath"
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/model"
"cwtch.im/cwtch/model/attr"
"cwtch.im/cwtch/model/constants"
"git.openprivacy.ca/openprivacy/log"
) )
// StorageKeyType is an interface wrapper around storage key types // StorageKeyType is an interface wrapper around storage key types
@ -87,7 +89,7 @@ const setConversationACLSQLStmt = `update conversations set ACL=(?) where ID=(?)
const deleteConversationSQLStmt = `delete from conversations where ID=(?);` const deleteConversationSQLStmt = `delete from conversations where ID=(?);`
// createTableConversationMessagesSQLStmt is a template for creating conversation based tables... // createTableConversationMessagesSQLStmt is a template for creating conversation based tables...
const createTableConversationMessagesSQLStmt = `create table if not exists channel_%d_0_chat (ID integer unique primary key autoincrement, Body text, Attributes []byte, Expiry datetime, Signature text unique, ContentHash blob text);` const createTableConversationMessagesSQLStmt = `create table if not exists channel_%d_%d_chat (ID integer unique primary key autoincrement, Body text, Attributes []byte, Expiry datetime, Signature text unique, ContentHash blob text);`
// insertMessageIntoConversationSQLStmt is a template for creating conversation based tables... // insertMessageIntoConversationSQLStmt is a template for creating conversation based tables...
const insertMessageIntoConversationSQLStmt = `insert into channel_%d_%d_chat (Body, Attributes, Signature, ContentHash) values(?,?,?,?);` const insertMessageIntoConversationSQLStmt = `insert into channel_%d_%d_chat (Body, Attributes, Signature, ContentHash) values(?,?,?,?);`
@ -324,7 +326,7 @@ func (cps *CwtchProfileStorage) NewConversation(handle string, attributes model.
return -1, tx.Rollback() return -1, tx.Rollback()
} }
result, err = tx.Exec(fmt.Sprintf(createTableConversationMessagesSQLStmt, id)) result, err = tx.Exec(fmt.Sprintf(createTableConversationMessagesSQLStmt, id, 0))
if err != nil { if err != nil {
log.Errorf("error executing transaction: %v", err) log.Errorf("error executing transaction: %v", err)
return -1, tx.Rollback() return -1, tx.Rollback()
@ -345,6 +347,27 @@ func (cps *CwtchProfileStorage) NewConversation(handle string, attributes model.
return int(conversationID), nil return int(conversationID), nil
} }
func (cps *CwtchProfileStorage) InitChannelOnConversation(id int, channel int) error {
cps.mutex.Lock()
defer cps.mutex.Unlock()
tx, err := cps.db.Begin()
if err != nil {
log.Errorf("error executing transaction: %v", err)
return tx.Rollback()
}
_, err = tx.Exec(fmt.Sprintf(createTableConversationMessagesSQLStmt, id, channel))
if err != nil {
log.Errorf("error executing transaction: %v", err)
return tx.Rollback()
}
err = tx.Commit()
if err != nil {
log.Errorf("error executing transaction: %v", err)
return tx.Rollback()
}
return nil
}
// GetConversationByHandle is a convenience method to fetch an active conversation by a handle // GetConversationByHandle is a convenience method to fetch an active conversation by a handle
// Usage Notes: This should **only** be used to look up p2p conversations by convention. // Usage Notes: This should **only** be used to look up p2p conversations by convention.
// Ideally this function should not exist, and all lookups should happen by ID (this is currently // Ideally this function should not exist, and all lookups should happen by ID (this is currently
@ -843,7 +866,8 @@ func (cps *CwtchProfileStorage) PurgeConversationChannel(conversation int, chann
defer cps.mutex.Unlock() defer cps.mutex.Unlock()
conversationStmt, err := cps.db.Prepare(fmt.Sprintf(purgeMessagesFromConversationSQLStmt, conversation, channel)) conversationStmt, err := cps.db.Prepare(fmt.Sprintf(purgeMessagesFromConversationSQLStmt, conversation, channel))
if err != nil { if err != nil {
log.Errorf("error executing transaction: %v", err) // Not all tables exist so this error can be spurious. Instead of logging here
// we will delegate handling to the caller...
return err return err
} }
conversationStmt.Exec() conversationStmt.Exec()
@ -867,7 +891,7 @@ func (cps *CwtchProfileStorage) PurgeNonSavedMessages() {
if err == nil { if err == nil {
for _, conversation := range ci { for _, conversation := range ci {
// unless this is a server or a group...for which we default save always (for legacy reasons) // unless this is a server or a group...for which we default save always (for legacy reasons)
// FIXME: revisit this for hybrid groups. // Note: Hybrid Groups are Peers so this check does not apply...
if !conversation.IsGroup() && !conversation.IsServer() { if !conversation.IsGroup() && !conversation.IsServer() {
// Note that we only check for confirmed status here...if it is set to any other value we will fallthrough to the default. // Note that we only check for confirmed status here...if it is set to any other value we will fallthrough to the default.
saveHistoryConfirmed := conversation.Attributes[attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(event.SaveHistoryKey)).ToString()] == event.SaveHistoryConfirmed saveHistoryConfirmed := conversation.Attributes[attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(event.SaveHistoryKey)).ToString()] == event.SaveHistoryConfirmed
@ -878,7 +902,10 @@ func (cps *CwtchProfileStorage) PurgeNonSavedMessages() {
if deleteHistoryConfirmed || (!saveHistoryConfirmed && !defaultSave) { if deleteHistoryConfirmed || (!saveHistoryConfirmed && !defaultSave) {
log.Debugf("purging conversation...") log.Debugf("purging conversation...")
// TODO: At some point in the future this needs to iterate over channels and make a decision for each on.. // TODO: At some point in the future this needs to iterate over channels and make a decision for each on..
cps.PurgeConversationChannel(conversation.ID, 0) // TODO: We need a way to differentiate between channels that don't exist, and database errors
// this should probably incorporate higher level logic (e.g. has channel...)
cps.PurgeConversationChannel(conversation.ID, constants.CHANNEL_CHAT)
cps.PurgeConversationChannel(conversation.ID, constants.CHANNEL_MANAGER)
} }
} }
} }

View File

@ -50,6 +50,8 @@ type ModifyServers interface {
// SendMessages enables a caller to sender messages to a contact // SendMessages enables a caller to sender messages to a contact
type SendMessages interface { type SendMessages interface {
// SendMessage sends a raw message to the conversation.
// SendMessage is a deprecated public API. Use EnhancedSendMessage instead
SendMessage(conversation int, message string) (int, error) SendMessage(conversation int, message string) (int, error)
// EnhancedSendMessage Attempts to Send a Message and Immediately Attempts to Lookup the Message in the Database // EnhancedSendMessage Attempts to Send a Message and Immediately Attempts to Lookup the Message in the Database
@ -115,6 +117,8 @@ type CwtchPeer interface {
EnhancedImportBundle(string) string EnhancedImportBundle(string) string
// New Unified Conversation Interfaces // New Unified Conversation Interfaces
NewConversation(handle string, acl model.AccessControlList) (int, error)
InitChannel(conversation int, channel int) error
NewContactConversation(handle string, acl model.AccessControl, accepted bool) (int, error) NewContactConversation(handle string, acl model.AccessControl, accepted bool) (int, error)
FetchConversations() ([]*model.Conversation, error) FetchConversations() ([]*model.Conversation, error)
ArchiveConversation(conversation int) ArchiveConversation(conversation int)
@ -135,12 +139,15 @@ type CwtchPeer interface {
SetConversationAttribute(conversation int, path attr.ScopedZonedPath, value string) error SetConversationAttribute(conversation int, path attr.ScopedZonedPath, value string) error
GetConversationAttribute(conversation int, path attr.ScopedZonedPath) (string, error) GetConversationAttribute(conversation int, path attr.ScopedZonedPath) (string, error)
SetConversationAttributeInt(conversation int, path attr.ScopedZonedPath, value int) error
GetConversationAttributeInt(conversation int, path attr.ScopedZonedPath) (int, error)
DeleteConversation(conversation int) error DeleteConversation(conversation int) error
// New Unified Conversation Channel Interfaces // New Unified Conversation Channel Interfaces
GetChannelMessage(conversation int, channel int, id int) (string, model.Attributes, error) GetChannelMessage(conversation int, channel int, id int) (string, model.Attributes, error)
GetChannelMessageCount(conversation int, channel int) (int, error) GetChannelMessageCount(conversation int, channel int) (int, error)
GetChannelMessageByContentHash(conversation int, channel int, contenthash string) (int, error) GetChannelMessageByContentHash(conversation int, channel int, contenthash string) (int, error)
GetChannelMessageBySignature(conversationID int, channelID int, signature string) (int, error)
GetMostRecentMessages(conversation int, channel int, offset int, limit uint) ([]model.ConversationMessage, error) GetMostRecentMessages(conversation int, channel int, offset int, limit uint) ([]model.ConversationMessage, error)
UpdateMessageAttribute(conversation int, channel int, id int, key string, value string) error UpdateMessageAttribute(conversation int, channel int, id int, key string, value string) error
SearchConversations(pattern string) string SearchConversations(pattern string) string
@ -168,6 +175,10 @@ type CwtchPeer interface {
UpdateExperiments(enabled bool, experiments map[string]bool) UpdateExperiments(enabled bool, experiments map[string]bool)
NotifySettingsUpdate(settings settings.GlobalSettings) NotifySettingsUpdate(settings settings.GlobalSettings)
IsFeatureEnabled(featureName string) bool IsFeatureEnabled(featureName string) bool
SignMessage(blob []byte) ([]byte, error)
// Used for Internal Bookkeeping by Extensions, **do not expose in autobindings**
InternalInsertMessage(conversation int, channel int, author string, body string, attributes model.Attributes, signature []byte) (int, error)
} }
// EnhancedMessage wraps a Cwtch model.Message with some additional data to reduce calls from the UI. // EnhancedMessage wraps a Cwtch model.Message with some additional data to reduce calls from the UI.

View File

@ -178,7 +178,7 @@ func TestFileSharing(t *testing.T) {
// testBobDownloadFile(t, bob, filesharingFunctionality, queueOracle) // testBobDownloadFile(t, bob, filesharingFunctionality, queueOracle)
// Wait for say... // Wait for say...
time.Sleep(10 * time.Second) time.Sleep(30 * time.Second)
if _, err := os.Stat(path.Join(settings.DownloadPath, "cwtch.png")); errors.Is(err, os.ErrNotExist) { if _, err := os.Stat(path.Join(settings.DownloadPath, "cwtch.png")); errors.Is(err, os.ErrNotExist) {
// path/to/whatever does not exist // path/to/whatever does not exist

View File

@ -34,26 +34,6 @@ var (
carolLines = []string{"Howdy, thanks!"} carolLines = []string{"Howdy, thanks!"}
) )
func waitForConnection(t *testing.T, peer peer.CwtchPeer, addr string, target connections.ConnectionState) {
peerName, _ := peer.GetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.Name)
for {
log.Infof("%v checking connection...\n", peerName)
state := peer.GetPeerState(addr)
log.Infof("Waiting for Peer %v to %v - state: %v\n", peerName, addr, connections.ConnectionStateName[state])
if state == connections.FAILED {
t.Fatalf("%v could not connect to %v", peer.GetOnion(), addr)
}
if state != target {
log.Infof("peer %v %v waiting connect %v, currently: %v\n", peerName, peer.GetOnion(), addr, connections.ConnectionStateName[state])
time.Sleep(time.Second * 5)
continue
} else {
log.Infof("peer %v %v CONNECTED to %v\n", peerName, peer.GetOnion(), addr)
break
}
}
}
func waitForRetVal(peer peer.CwtchPeer, convId int, szp attr.ScopedZonedPath) { func waitForRetVal(peer peer.CwtchPeer, convId int, szp attr.ScopedZonedPath) {
for { for {
_, err := peer.GetConversationAttribute(convId, szp) _, err := peer.GetConversationAttribute(convId, szp)
@ -236,10 +216,10 @@ func TestCwtchPeerIntegration(t *testing.T) {
t.Fatalf("Alice password did not change...") t.Fatalf("Alice password did not change...")
} }
waitForConnection(t, alice, bob.GetOnion(), connections.AUTHENTICATED) WaitForConnection(t, alice, bob.GetOnion(), connections.AUTHENTICATED)
waitForConnection(t, alice, carol.GetOnion(), connections.AUTHENTICATED) WaitForConnection(t, alice, carol.GetOnion(), connections.AUTHENTICATED)
waitForConnection(t, bob, alice.GetOnion(), connections.AUTHENTICATED) WaitForConnection(t, bob, alice.GetOnion(), connections.AUTHENTICATED)
waitForConnection(t, carol, alice.GetOnion(), connections.AUTHENTICATED) WaitForConnection(t, carol, alice.GetOnion(), connections.AUTHENTICATED)
log.Infof("Alice and Bob getVal public.name...") log.Infof("Alice and Bob getVal public.name...")
@ -323,9 +303,9 @@ func TestCwtchPeerIntegration(t *testing.T) {
} }
log.Infof("Waiting for alice to join server...") log.Infof("Waiting for alice to join server...")
waitForConnection(t, alice, ServerAddr, connections.SYNCED) WaitForConnection(t, alice, ServerAddr, connections.SYNCED)
log.Infof("Waiting for Bob to join connect to group server...") log.Infof("Waiting for Bob to join connect to group server...")
waitForConnection(t, bob, ServerAddr, connections.SYNCED) WaitForConnection(t, bob, ServerAddr, connections.SYNCED)
// 1 = Alice // 1 = Alice
// 2 = Server // 2 = Server
@ -351,7 +331,7 @@ func TestCwtchPeerIntegration(t *testing.T) {
if len(cachedTokens) > (usedTokens + len(carolLines)) { if len(cachedTokens) > (usedTokens + len(carolLines)) {
carol.StoreCachedTokens(ServerAddr, cachedTokens[usedTokens:usedTokens+len(carolLines)]) carol.StoreCachedTokens(ServerAddr, cachedTokens[usedTokens:usedTokens+len(carolLines)])
} }
waitForConnection(t, carol, ServerAddr, connections.SYNCED) WaitForConnection(t, carol, ServerAddr, connections.SYNCED)
numGoRoutinesPostCarolConnect := runtime.NumGoroutine() numGoRoutinesPostCarolConnect := runtime.NumGoroutine()
// Check Alice Timeline // Check Alice Timeline

View File

@ -58,7 +58,7 @@ func TestFileSharing(t *testing.T) {
os.RemoveAll("cwtch.out.png") os.RemoveAll("cwtch.out.png")
os.RemoveAll("cwtch.out.png.manifest") os.RemoveAll("cwtch.out.png.manifest")
log.SetLevel(log.LevelInfo) log.SetLevel(log.LevelDebug)
log.ExcludeFromPattern("tapir") log.ExcludeFromPattern("tapir")
os.Mkdir("tordir", 0700) os.Mkdir("tordir", 0700)
@ -151,13 +151,6 @@ func TestFileSharing(t *testing.T) {
bob.NewContactConversation(alice.GetOnion(), model.DefaultP2PAccessControl(), true) bob.NewContactConversation(alice.GetOnion(), model.DefaultP2PAccessControl(), true)
alice.NewContactConversation(bob.GetOnion(), model.DefaultP2PAccessControl(), true) alice.NewContactConversation(bob.GetOnion(), model.DefaultP2PAccessControl(), true)
alice.PeerWithOnion(bob.GetOnion())
t.Logf("Waiting for alice and Bob to peer...")
waitForPeerPeerConnection(t, alice, bob)
alice.AcceptConversation(1)
t.Logf("Alice and Bob are Connected!!")
filesharingFunctionality := filesharing.FunctionalityGate() filesharingFunctionality := filesharing.FunctionalityGate()
@ -167,10 +160,10 @@ func TestFileSharing(t *testing.T) {
} }
alice.SendMessage(1, fileSharingMessage) alice.SendMessage(1, fileSharingMessage)
bob.AcceptConversation(1)
// Wait for the messages to arrive... // Ok this is fun...we just Sent a Message we may not have a connection yet...
time.Sleep(time.Second * 10) // so this test will only pass if sending offline works...
waitForPeerPeerConnection(t, bob, alice)
bob.SendMessage(1, "this is a test message") bob.SendMessage(1, "this is a test message")
bob.SendMessage(1, "this is another test message") bob.SendMessage(1, "this is another test message")

View File

@ -0,0 +1,214 @@
package testing
import (
"crypto/rand"
"encoding/base64"
"fmt"
mrand "math/rand"
"os"
"path"
"path/filepath"
"runtime"
"runtime/pprof"
"testing"
"time"
app2 "cwtch.im/cwtch/app"
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/functionality/hybrid"
"cwtch.im/cwtch/functionality/inter"
"cwtch.im/cwtch/model/constants"
"cwtch.im/cwtch/peer"
"cwtch.im/cwtch/protocol/connections"
"git.openprivacy.ca/openprivacy/connectivity/tor"
"git.openprivacy.ca/openprivacy/log"
_ "github.com/mutecomm/go-sqlcipher/v4"
)
func TestHyrbidGroupIntegration(t *testing.T) {
t.Logf("Starting Hybrid Groups Test")
os.RemoveAll("./storage")
os.RemoveAll("./managerstorage")
// Goroutine Monitoring Start..
numGoRoutinesStart := runtime.NumGoroutine()
log.AddEverythingFromPattern("connectivity")
log.SetLevel(log.LevelInfo)
log.ExcludeFromPattern("connection/connection")
log.ExcludeFromPattern("outbound/3dhauthchannel")
log.ExcludeFromPattern("event/eventmanager")
log.ExcludeFromPattern("tapir")
os.Mkdir("tordir", 0700)
dataDir := path.Join("tordir", "tor")
os.MkdirAll(dataDir, 0700)
// we don't need real randomness for the port, just to avoid a possible conflict...
socksPort := mrand.Intn(1000) + 9051
controlPort := mrand.Intn(1000) + 9052
// generate a random password
key := make([]byte, 64)
_, err := rand.Read(key)
if err != nil {
panic(err)
}
useCache := os.Getenv("TORCACHE") == "true"
torDataDir := ""
if useCache {
log.Infof("using tor cache")
torDataDir = filepath.Join(dataDir, "data-dir-torcache")
os.MkdirAll(torDataDir, 0700)
} else {
log.Infof("using clean tor data dir")
if torDataDir, err = os.MkdirTemp(dataDir, "data-dir-"); err != nil {
t.Fatalf("could not create data dir")
}
}
tor.NewTorrc().WithSocksPort(socksPort).WithOnionTrafficOnly().WithHashedPassword(base64.StdEncoding.EncodeToString(key)).WithControlPort(controlPort).Build("tordir/tor/torrc")
acn, err := tor.NewTorACNWithAuth("./tordir", path.Join("..", "tor"), torDataDir, controlPort, tor.HashedPasswordAuthenticator{Password: base64.StdEncoding.EncodeToString(key)})
if err != nil {
t.Fatalf("Could not start Tor: %v", err)
}
log.Infof("Waiting for tor to bootstrap...")
acn.WaitTillBootstrapped()
defer acn.Close()
// ***** Cwtch Server management *****
app := app2.NewApp(acn, "./storage", app2.LoadAppSettings("./storage"))
// ***** cwtchPeer setup *****
// Turn on Groups Experiment...
settings := app.ReadSettings()
settings.ExperimentsEnabled = true
settings.Experiments[constants.GroupsExperiment] = true
settings.Experiments[constants.GroupManagerExperiment] = true
app.UpdateSettings(settings)
alice := MakeProfile(app, "Alice")
bob := MakeProfile(app, "Bob")
manager := MakeProfile(app, "Manager")
waitTime := time.Duration(60) * time.Second
log.Infof("** Waiting for Alice, Bob, and Carol to register their onion hidden service on the network... (%v)\n", waitTime)
time.Sleep(waitTime)
log.Infof("** Wait Done!")
// Ok Lets Start By Creating a Hybrid Group...
hgmf := hybrid.GroupManagerFunctionality{}
ci, err := hgmf.ManageNewGroup(manager)
if err != nil {
t.Fatalf("could not create hybrid group: %v", err)
}
log.Infof("created a hybrid group: %d. moving onto adding hybrid contacts...", ci)
err = hgmf.AddHybridContact(manager, alice.GetOnion())
if err != nil {
t.Fatalf("could not create hybrid contact (alice): %v", err)
}
err = hgmf.AddHybridContact(manager, bob.GetOnion())
if err != nil {
t.Fatalf("could not create hybrid contact (bob): %v", err)
}
// Now we can allow alice, bob and carol to create a new hybrid group...
log.Infof("now we can allow alice bob and carol to join the hybrid group")
inter := inter.InterfaceFunctionality{}
err = inter.ImportBundle(alice, "managed:"+manager.GetOnion())
if err != nil {
t.Fatalf("could not create hybrid group contact (carol): %v", err)
}
alice.PeerWithOnion(manager.GetOnion()) // explictly trigger a peer request
err = inter.ImportBundle(bob, "managed:"+manager.GetOnion())
if err != nil {
t.Fatalf("could not create hybrid group contact (carol): %v", err)
}
bob.PeerWithOnion(manager.GetOnion())
log.Infof("waiting for alice and manager to connect")
WaitForConnection(t, alice, manager.GetOnion(), connections.AUTHENTICATED)
log.Infof("waiting for bob and manager to connect")
WaitForConnection(t, bob, manager.GetOnion(), connections.AUTHENTICATED)
// at this pont we should be able to send messages to the group, and receive them in the timeline
log.Infof("sending message to group")
_, err = inter.SendMessage(alice, 1, "hello everyone!!!")
if err != nil {
t.Fatalf("hybrid group sending failed... %v", err)
}
// Note: From this point onwards there are no managed-group specific calls. Everything happens
// transparently with respect to the receiver.
time.Sleep(time.Second * 10)
bobMessages, err := bob.GetMostRecentMessages(1, constants.CHANNEL_CHAT, 0, 1)
if err != nil || len(bobMessages) != 1 {
t.Fatalf("hybrid group receipt failed... %v %v ", err, len(bobMessages))
}
if bobMessages[0].Body != "hello everyone!!!" {
t.Fatalf("hybrid group receipt failed...message does not match")
}
aliceMessages, err := alice.GetMostRecentMessages(1, constants.CHANNEL_CHAT, 0, 1)
if err != nil || len(aliceMessages) != 1 {
t.Fatalf("hybrid group receipt failed... %v", err)
}
if aliceMessages[0].Attr[constants.AttrAck] != constants.True {
t.Fatalf("hybrid group receipt failed...alice's message was not ack'd")
}
// Time to Clean Up....
log.Infof("Shutting down Alice...")
app.ShutdownPeer(alice.GetOnion())
time.Sleep(time.Second * 3)
log.Infof("Shutting down Bob...")
app.ShutdownPeer(bob.GetOnion())
time.Sleep(time.Second * 3)
log.Infof("Shutting fown Manager...")
app.ShutdownPeer(manager.GetOnion())
time.Sleep(time.Second * 3)
log.Infof("Shutting down apps...")
log.Infof("app Shutdown: %v\n", runtime.NumGoroutine())
app.Shutdown()
time.Sleep(2 * time.Second)
log.Infof("Done shutdown: %v\n", runtime.NumGoroutine())
log.Infof("Shutting down ACN...")
acn.Close()
time.Sleep(time.Second * 60) // the network status / heartbeat plugin might keep goroutines alive for a minute before killing them
numGoRoutinesPostAppShutdown := runtime.NumGoroutine()
// Printing out the current goroutines
// Very useful if we are leaking any.
pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
fmt.Println("")
if numGoRoutinesStart != numGoRoutinesPostAppShutdown {
t.Errorf("Number of GoRoutines at start (%v) does not match number of goRoutines after cleanup of peers and servers (%v), clean up failed, v detected!", numGoRoutinesStart, numGoRoutinesPostAppShutdown)
}
}
func MakeProfile(application app2.Application, name string) peer.CwtchPeer {
application.CreateProfile(name, "asdfasdf", true)
p := app2.WaitGetPeer(application, name)
application.ConfigureConnections(p.GetOnion(), true, true, false)
log.Infof("%s created: %s", name, p.GetOnion())
// bob.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Bob") <- This is now done automatically by ProfileValueExtension, keeping this here for clarity
p.AutoHandleEvents([]event.Type{event.PeerStateChange, event.ServerStateChange, event.NewGroupInvite, event.NewRetValMessageFromPeer})
return p
}

32
testing/utils.go Normal file
View File

@ -0,0 +1,32 @@
package testing
import (
"cwtch.im/cwtch/model/attr"
"cwtch.im/cwtch/model/constants"
"cwtch.im/cwtch/peer"
"cwtch.im/cwtch/protocol/connections"
"git.openprivacy.ca/openprivacy/log"
_ "github.com/mutecomm/go-sqlcipher/v4"
"testing"
"time"
)
func WaitForConnection(t *testing.T, peer peer.CwtchPeer, addr string, target connections.ConnectionState) {
peerName, _ := peer.GetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.Name)
for {
log.Infof("%v checking connection...\n", peerName)
state := peer.GetPeerState(addr)
log.Infof("Waiting for Peer %v to %v - state: %v\n", peerName, addr, connections.ConnectionStateName[state])
if state == connections.FAILED {
t.Fatalf("%v could not connect to %v", peer.GetOnion(), addr)
}
if state != target {
log.Infof("peer %v %v waiting connect %v, currently: %v\n", peerName, peer.GetOnion(), addr, connections.ConnectionStateName[state])
time.Sleep(time.Second * 5)
continue
} else {
log.Infof("peer %v %v CONNECTED to %v\n", peerName, peer.GetOnion(), addr)
break
}
}
}