Compare commits
No commits in common. "master" and "v0.26.2" have entirely different histories.
|
@ -34,5 +34,3 @@ tokens
|
|||
tordir/
|
||||
testing/autodownload/download_dir
|
||||
testing/autodownload/storage
|
||||
*.swp
|
||||
testing/managerstorage/*
|
36
app/app.go
36
app/app.go
|
@ -1,16 +1,10 @@
|
|||
package app
|
||||
|
||||
import (
|
||||
"os"
|
||||
path "path/filepath"
|
||||
"strconv"
|
||||
"sync"
|
||||
|
||||
"cwtch.im/cwtch/app/plugins"
|
||||
"cwtch.im/cwtch/event"
|
||||
"cwtch.im/cwtch/extensions"
|
||||
"cwtch.im/cwtch/functionality/filesharing"
|
||||
"cwtch.im/cwtch/functionality/hybrid"
|
||||
"cwtch.im/cwtch/functionality/servers"
|
||||
"cwtch.im/cwtch/model"
|
||||
"cwtch.im/cwtch/model/attr"
|
||||
|
@ -21,6 +15,10 @@ import (
|
|||
"cwtch.im/cwtch/storage"
|
||||
"git.openprivacy.ca/openprivacy/connectivity"
|
||||
"git.openprivacy.ca/openprivacy/log"
|
||||
"os"
|
||||
path "path/filepath"
|
||||
"strconv"
|
||||
"sync"
|
||||
)
|
||||
|
||||
type application struct {
|
||||
|
@ -53,7 +51,7 @@ func (app *application) IsFeatureEnabled(experiment string) bool {
|
|||
// Application is a full cwtch peer application. It allows management, usage and storage of multiple peers
|
||||
type Application interface {
|
||||
LoadProfiles(password string)
|
||||
CreateProfile(name string, password string, autostart bool) string
|
||||
CreateProfile(name string, password string, autostart bool)
|
||||
InstallEngineHooks(engineHooks connections.EngineHooks)
|
||||
ImportProfile(exportedCwtchFile string, password string) (peer.CwtchPeer, error)
|
||||
EnhancedImportProfile(exportedCwtchFile string, password string) string
|
||||
|
@ -197,7 +195,7 @@ func (app *application) AddPlugin(peerid string, id plugins.PluginID, bus event.
|
|||
}
|
||||
}
|
||||
|
||||
func (app *application) CreateProfile(name string, password string, autostart bool) string {
|
||||
func (app *application) CreateProfile(name string, password string, autostart bool) {
|
||||
autostartVal := constants.True
|
||||
if !autostart {
|
||||
autostartVal = constants.False
|
||||
|
@ -207,15 +205,10 @@ func (app *application) CreateProfile(name string, password string, autostart bo
|
|||
tagVal = constants.ProfileTypeV1DefaultPassword
|
||||
}
|
||||
|
||||
profile_id, err := app.CreatePeer(name, password, map[attr.ZonedPath]string{
|
||||
app.CreatePeer(name, password, map[attr.ZonedPath]string{
|
||||
attr.ProfileZone.ConstructZonedPath(constants.Tag): tagVal,
|
||||
attr.ProfileZone.ConstructZonedPath(constants.PeerAutostart): autostartVal,
|
||||
})
|
||||
|
||||
if err == nil {
|
||||
return profile_id
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func (app *application) setupPeer(profile peer.CwtchPeer) {
|
||||
|
@ -237,7 +230,7 @@ func (app *application) setupPeer(profile peer.CwtchPeer) {
|
|||
|
||||
}
|
||||
|
||||
func (app *application) CreatePeer(name string, password string, attributes map[attr.ZonedPath]string) (string, error) {
|
||||
func (app *application) CreatePeer(name string, password string, attributes map[attr.ZonedPath]string) {
|
||||
app.appmutex.Lock()
|
||||
defer app.appmutex.Unlock()
|
||||
|
||||
|
@ -247,7 +240,7 @@ func (app *application) CreatePeer(name string, password string, attributes map[
|
|||
if err != nil {
|
||||
log.Errorf("Error Creating Peer: %v", err)
|
||||
app.appBus.Publish(event.NewEventList(event.PeerError, event.Error, err.Error()))
|
||||
return "", err
|
||||
return
|
||||
}
|
||||
|
||||
app.setupPeer(profile)
|
||||
|
@ -258,7 +251,6 @@ func (app *application) CreatePeer(name string, password string, attributes map[
|
|||
}
|
||||
|
||||
app.appBus.Publish(event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.GetOnion(), event.Created: event.True}))
|
||||
return profile.GetOnion(), nil
|
||||
}
|
||||
|
||||
func (app *application) DeleteProfile(onion string, password string) {
|
||||
|
@ -371,12 +363,9 @@ func (app *application) LoadProfiles(password string) {
|
|||
func (app *application) registerHooks(profile peer.CwtchPeer) {
|
||||
// Register Hooks
|
||||
profile.RegisterHook(extensions.ProfileValueExtension{})
|
||||
profile.RegisterHook(extensions.SendWhenOnlineExtension{})
|
||||
profile.RegisterHook(new(filesharing.Functionality))
|
||||
profile.RegisterHook(new(filesharing.ImagePreviewsFunctionality))
|
||||
profile.RegisterHook(new(servers.Functionality))
|
||||
profile.RegisterHook(new(hybrid.ManagedGroupFunctionality))
|
||||
profile.RegisterHook(new(hybrid.GroupManagerFunctionality)) // will only be activated if GroupManagerExperiment is enabled...
|
||||
// Ensure that Profiles have the Most Up to Date Settings...
|
||||
profile.NotifySettingsUpdate(app.settings.ReadGlobalSettings())
|
||||
}
|
||||
|
@ -402,14 +391,12 @@ func (app *application) installProfile(profile peer.CwtchPeer) bool {
|
|||
func (app *application) ActivatePeerEngine(onion string) {
|
||||
profile := app.GetPeer(onion)
|
||||
if profile != nil {
|
||||
app.appmutex.Lock()
|
||||
if _, exists := app.engines[onion]; !exists {
|
||||
eventBus, exists := app.eventBuses[profile.GetOnion()]
|
||||
|
||||
if !exists {
|
||||
// todo handle this case?
|
||||
log.Errorf("cannot activate peer engine without an event bus")
|
||||
app.appmutex.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -418,13 +405,12 @@ func (app *application) ActivatePeerEngine(onion string) {
|
|||
log.Debugf("restartFlow: Creating a New Protocol Engine...")
|
||||
app.engines[profile.GetOnion()] = engine
|
||||
eventBus.Publish(event.NewEventList(event.ProtocolEngineCreated))
|
||||
app.QueryACNStatus()
|
||||
} else {
|
||||
log.Errorf("corrupted profile detected for %v", onion)
|
||||
}
|
||||
}
|
||||
app.appmutex.Unlock()
|
||||
}
|
||||
app.QueryACNStatus()
|
||||
}
|
||||
|
||||
// ConfigureConnections autostarts the given kinds of connections.
|
||||
|
@ -432,9 +418,7 @@ func (app *application) ConfigureConnections(onion string, listen bool, peers bo
|
|||
profile := app.GetPeer(onion)
|
||||
if profile != nil {
|
||||
|
||||
app.appmutex.Lock()
|
||||
profileBus, exists := app.eventBuses[profile.GetOnion()]
|
||||
app.appmutex.Unlock()
|
||||
if exists {
|
||||
// if we are making a decision to ignore
|
||||
if !peers || !servers {
|
||||
|
|
|
@ -284,7 +284,6 @@ const (
|
|||
Status = Field("Status")
|
||||
EventID = Field("EventID")
|
||||
EventContext = Field("EventContext")
|
||||
Channel = Field("Channel")
|
||||
Index = Field("Index")
|
||||
RowIndex = Field("RowIndex")
|
||||
ContentHash = Field("ContentHash")
|
||||
|
|
|
@ -95,11 +95,6 @@ func (em *manager) initialize() {
|
|||
func (em *manager) Subscribe(eventType Type, queue Queue) {
|
||||
em.mapMutex.Lock()
|
||||
defer em.mapMutex.Unlock()
|
||||
for _, sub := range em.subscribers[eventType] {
|
||||
if sub == queue {
|
||||
return // don't add the same queue for the same event twice...
|
||||
}
|
||||
}
|
||||
em.subscribers[eventType] = append(em.subscribers[eventType], queue)
|
||||
}
|
||||
|
||||
|
|
|
@ -104,15 +104,6 @@ func (pne ProfileValueExtension) OnContactRequestValue(profile peer.CwtchPeer, c
|
|||
val, exists = profile.GetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name)
|
||||
}
|
||||
|
||||
// NOTE: Cwtch 1.15+ requires that profiles be able to restrict file downloading to specific contacts. As such we need an ACL check here
|
||||
// on the fileshareing zone.
|
||||
// TODO: Split this functionality into FilesharingFunctionality, and restrict this function to only considering Profile zoned attributes?
|
||||
if zone == attr.FilesharingZone {
|
||||
if !conversation.GetPeerAC().ShareFiles {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Construct a Response
|
||||
resp := event.NewEvent(event.SendRetValMessageToPeer, map[event.Field]string{event.ConversationID: strconv.Itoa(conversation.ID), event.RemotePeer: conversation.Handle, event.Exists: strconv.FormatBool(exists)})
|
||||
resp.EventID = eventID
|
||||
|
|
|
@ -1,91 +0,0 @@
|
|||
package extensions
|
||||
|
||||
import (
|
||||
"slices"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"cwtch.im/cwtch/event"
|
||||
"cwtch.im/cwtch/model"
|
||||
"cwtch.im/cwtch/model/attr"
|
||||
"cwtch.im/cwtch/model/constants"
|
||||
"cwtch.im/cwtch/peer"
|
||||
"cwtch.im/cwtch/protocol/connections"
|
||||
"cwtch.im/cwtch/settings"
|
||||
"git.openprivacy.ca/openprivacy/log"
|
||||
)
|
||||
|
||||
// SendWhenOnlineExtension implements automatic sending
|
||||
// Some Considerations:
|
||||
// - There are race conditions inherant in this approach e.g. a peer could go offline just after recieving a message and never sending an ack
|
||||
// - In that case the next time we connect we will send a duplicate message.
|
||||
// - Currently we do not include metadata like sent time in raw peer protocols (however Overlay does now have support for that information)
|
||||
type SendWhenOnlineExtension struct {
|
||||
}
|
||||
|
||||
func (soe SendWhenOnlineExtension) NotifySettingsUpdate(_ settings.GlobalSettings) {
|
||||
}
|
||||
|
||||
func (soe SendWhenOnlineExtension) EventsToRegister() []event.Type {
|
||||
return []event.Type{event.PeerStateChange}
|
||||
}
|
||||
|
||||
func (soe SendWhenOnlineExtension) ExperimentsToRegister() []string {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (soe SendWhenOnlineExtension) OnEvent(ev event.Event, profile peer.CwtchPeer) {
|
||||
switch ev.EventType {
|
||||
case event.PeerStateChange:
|
||||
ci, err := profile.FetchConversationInfo(ev.Data["RemotePeer"])
|
||||
if err == nil {
|
||||
// if we have re-authenticated with thie peer then request their profile image...
|
||||
if connections.ConnectionStateToType()[ev.Data[event.ConnectionState]] == connections.AUTHENTICATED {
|
||||
log.Infof("Sending Offline Messages to %s", ci.Handle)
|
||||
// Check the last 100 messages, if any of them are pending, then send them now...
|
||||
messsages, _ := profile.GetMostRecentMessages(ci.ID, constants.CHANNEL_CHAT, 0, uint(100))
|
||||
slices.Reverse(messsages)
|
||||
for _, message := range messsages {
|
||||
if message.Attr[constants.AttrAck] == constants.False {
|
||||
sent, timeparseerr := time.Parse(time.RFC3339, message.Attr[constants.AttrSentTimestamp])
|
||||
if timeparseerr != nil {
|
||||
continue
|
||||
}
|
||||
if time.Since(sent) > time.Hour*24*7 {
|
||||
continue
|
||||
}
|
||||
body := message.Body
|
||||
ev := event.NewEvent(event.SendMessageToPeer, map[event.Field]string{event.ConversationID: strconv.Itoa(ci.ID), event.RemotePeer: ci.Handle, event.Data: body})
|
||||
ev.EventID = message.Signature // we need this ensure that we correctly ack this in the db when it comes back
|
||||
// TODO: The EventBus is becoming very noisy...we may want to consider a one-way shortcut to Engine i.e. profile.Engine.SendMessageToPeer
|
||||
log.Infof("resending message that was sent when peer was offline")
|
||||
profile.PublishEvent(ev)
|
||||
}
|
||||
}
|
||||
if ci.HasChannel(constants.CHANNEL_MANAGER) {
|
||||
messsages, _ = profile.GetMostRecentMessages(ci.ID, constants.CHANNEL_MANAGER, 0, uint(100))
|
||||
slices.Reverse(messsages)
|
||||
for _, message := range messsages {
|
||||
if message.Attr[constants.AttrAck] == constants.False {
|
||||
body := message.Body
|
||||
ev := event.NewEvent(event.SendMessageToPeer, map[event.Field]string{event.ConversationID: strconv.Itoa(ci.ID), event.RemotePeer: ci.Handle, event.Data: body})
|
||||
ev.EventID = message.Signature // we need this ensure that we correctly ack this in the db when it comes back
|
||||
// TODO: The EventBus is becoming very noisy...we may want to consider a one-way shortcut to Engine i.e. profile.Engine.SendMessageToPeer
|
||||
log.Debugf("resending message that was sent when peer was offline")
|
||||
profile.PublishEvent(ev)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// OnContactReceiveValue is nop for SendWhenOnnlineExtension
|
||||
func (soe SendWhenOnlineExtension) OnContactReceiveValue(profile peer.CwtchPeer, conversation model.Conversation, szp attr.ScopedZonedPath, value string, exists bool) {
|
||||
}
|
||||
|
||||
// OnContactRequestValue is nop for SendWhenOnnlineExtension
|
||||
func (soe SendWhenOnlineExtension) OnContactRequestValue(profile peer.CwtchPeer, conversation model.Conversation, eventID string, szp attr.ScopedZonedPath) {
|
||||
|
||||
}
|
|
@ -62,18 +62,10 @@ func (i *ImagePreviewsFunctionality) OnEvent(ev event.Event, profile peer.CwtchP
|
|||
if err == nil {
|
||||
for _, ci := range conversations {
|
||||
if profile.GetPeerState(ci.Handle) == connections.AUTHENTICATED {
|
||||
// if we have enabled file shares for this contact, then send them our profile image
|
||||
// NOTE: In the past, Cwtch treated "profile image" as a public file share. As such, anyone with the file key and who is able
|
||||
// to authenticate with the profile (i.e. non-blocked peers) can download the file (if the global profile images experiment is enabled)
|
||||
// To better allow for fine-grained permissions (and to support hybrid group permissions), we want to enable per-conversation file
|
||||
// sharing permissions. As such, profile images are now only shared with contacts with that permission enabled.
|
||||
// (i.e. all previous accepted contacts, new accepted contacts, and contacts who have this toggle set explictly)
|
||||
if ci.GetPeerAC().ShareFiles {
|
||||
profile.SendScopedZonedGetValToContact(ci.ID, attr.PublicScope, attr.ProfileZone, constants.CustomProfileImageKey)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
case event.ProtocolEngineCreated:
|
||||
// Now that the Peer Engine is Activated, Reshare Profile Images
|
||||
key, exists := profile.GetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.CustomProfileImageKey)
|
||||
|
|
|
@ -1,107 +0,0 @@
|
|||
package hybrid
|
||||
|
||||
import (
|
||||
"crypto/ed25519"
|
||||
"encoding/base32"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"cwtch.im/cwtch/event"
|
||||
"cwtch.im/cwtch/model"
|
||||
"cwtch.im/cwtch/model/attr"
|
||||
"git.openprivacy.ca/openprivacy/log"
|
||||
)
|
||||
|
||||
const ManagedGroupOpen = "managed-group-open"
|
||||
|
||||
type GroupEventType int
|
||||
|
||||
const (
|
||||
MemberGroupIDKey = "member_group_id_key"
|
||||
MemberMessageIDKey = "member_group_messge_id"
|
||||
)
|
||||
|
||||
const (
|
||||
AddMember = GroupEventType(0x1000)
|
||||
RemoveMember = GroupEventType(0x2000)
|
||||
RotateKey = GroupEventType(0x3000)
|
||||
NewMessage = GroupEventType(0x4000)
|
||||
NewClearMessage = GroupEventType(0x5000)
|
||||
SyncRequest = GroupEventType(0x6000)
|
||||
)
|
||||
|
||||
type ManageGroupEvent struct {
|
||||
EventType GroupEventType `json:"t"`
|
||||
Data string `json:"d"` // json encoded data
|
||||
}
|
||||
|
||||
type AddMemberEvent struct {
|
||||
Handle string `json:"h"`
|
||||
}
|
||||
|
||||
type RemoveMemberEvent struct {
|
||||
Handle string `json:"h"`
|
||||
}
|
||||
|
||||
type RotateKeyEvent struct {
|
||||
Key []byte `json:"k"`
|
||||
}
|
||||
|
||||
type NewMessageEvent struct {
|
||||
EncryptedHybridGroupMessage []byte `json:"m"`
|
||||
}
|
||||
|
||||
type NewClearMessageEvent struct {
|
||||
HybridGroupMessage HybridGroupMessage `json:"m"`
|
||||
}
|
||||
|
||||
type SyncRequestMessage struct {
|
||||
// a map of MemberGroupID: MemberMessageID
|
||||
LastSeen map[int]int `json:"l"`
|
||||
}
|
||||
|
||||
// This file contains code for the Hybrid Group / Managed Group types..
|
||||
type HybridGroupMessage struct {
|
||||
Author string `json:"a"` // the authors cwtch address
|
||||
MemberGroupID uint32 `json:"g"`
|
||||
MemberMessageID uint32 `json:"m"`
|
||||
MessageBody string `json:"b"`
|
||||
Sent uint64 `json:"t"` // milliseconds since epoch
|
||||
Signature []byte `json:"s"` // of json-encoded content (including empty sig)
|
||||
}
|
||||
|
||||
// AuthenticateMessage returns true if the Author of the message produced the Signature over the message
|
||||
func AuthenticateMessage(message HybridGroupMessage) bool {
|
||||
messageCopy := message
|
||||
messageCopy.Signature = []byte{}
|
||||
// Otherwise we derive the public key from the sender and check it against that.
|
||||
decodedPub, err := base32.StdEncoding.DecodeString(strings.ToUpper(message.Author))
|
||||
if err == nil {
|
||||
data, err := json.Marshal(messageCopy)
|
||||
if err == nil && len(decodedPub) >= 32 {
|
||||
return ed25519.Verify(decodedPub[:32], data, message.Signature)
|
||||
}
|
||||
}
|
||||
log.Errorf("invalid signature on message from %s", message)
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
func CheckACL(handle string, group *model.Conversation) (*model.AccessControl, error) {
|
||||
if isOpen, exists := group.Attributes[attr.LocalScope.ConstructScopedZonedPath(attr.ConversationZone.ConstructZonedPath(ManagedGroupOpen)).ToString()]; !exists {
|
||||
return nil, fmt.Errorf("group has not been setup correctly - ManagedGroupOpen does not exist ")
|
||||
} else if isOpen == event.True {
|
||||
// We don't need to do a membership check
|
||||
defaultACL := group.GetPeerAC()
|
||||
return &defaultACL, nil
|
||||
}
|
||||
// If this is a closed group. Check if we have an ACL entry for this member
|
||||
// If we don't OR that member has been blocked, then close the connection.
|
||||
if acl, inGroup := group.ACL[handle]; !inGroup || acl.Blocked {
|
||||
log.Infof("ACL Check Failed: %v %v %v", handle, acl, inGroup)
|
||||
return nil, fmt.Errorf("peer is not a member of this group")
|
||||
} else {
|
||||
return &acl, nil
|
||||
}
|
||||
}
|
|
@ -1,227 +0,0 @@
|
|||
// This file contains all code related to how a Group Manager operates over a group.
|
||||
// Managed groups are canonically controlled by members setting
|
||||
// the ManageGroup permission in the conversation ACL; allowing the manager to
|
||||
// take control of how this group is structured, see OnEvent below...
|
||||
// TODO: This file represents stage 1 of the roll out which de-risks most of the
|
||||
// integration into cwtch peer, new interfaces, and UI integration
|
||||
// The following functionality is not yet implemented:
|
||||
// - group-level encryption
|
||||
// - key rotation / membership ACL
|
||||
// Cwtch Hybrid Groups are still very experimental functionality and should
|
||||
// only be used for testing purposes.
|
||||
package hybrid
|
||||
|
||||
import (
|
||||
"cwtch.im/cwtch/event"
|
||||
"cwtch.im/cwtch/model"
|
||||
"cwtch.im/cwtch/model/attr"
|
||||
"cwtch.im/cwtch/model/constants"
|
||||
"cwtch.im/cwtch/peer"
|
||||
"cwtch.im/cwtch/protocol/connections"
|
||||
"cwtch.im/cwtch/settings"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"git.openprivacy.ca/openprivacy/log"
|
||||
)
|
||||
|
||||
// MANAGED_GROUP_HANDLE denotes the nominal name that the managed group is given, for easier handling
|
||||
// Note: we could use id here, as the managed group should technically always be the first group
|
||||
// But we don't want to assume that, and also allow conversations to be moved around without
|
||||
// constantly referring to a magic id.
|
||||
const MANAGED_GROUP_HANDLE = "managed:000"
|
||||
|
||||
type GroupManagerFunctionality struct {
|
||||
}
|
||||
|
||||
func (f *GroupManagerFunctionality) NotifySettingsUpdate(settings settings.GlobalSettings) {
|
||||
}
|
||||
|
||||
func (f *GroupManagerFunctionality) EventsToRegister() []event.Type {
|
||||
return []event.Type{event.PeerStateChange, event.NewMessageFromPeerEngine}
|
||||
}
|
||||
|
||||
func (f *GroupManagerFunctionality) ExperimentsToRegister() []string {
|
||||
return []string{constants.GroupManagerExperiment, constants.GroupsExperiment}
|
||||
}
|
||||
|
||||
// OnEvent handles File Sharing Hooks like Manifest Received and FileDownloaded
|
||||
func (f *GroupManagerFunctionality) OnEvent(ev event.Event, profile peer.CwtchPeer) {
|
||||
|
||||
// We only want to engage this functionality if the peer is managing a group.
|
||||
// In that case ALL peer connections and messages need to be routed through
|
||||
// the management logic
|
||||
// For now, we assume that a manager is a peer with a special management group.
|
||||
// In the future we may want to make this a profile-level switch/attribute.
|
||||
isManager := false
|
||||
if ci, err := profile.FetchConversationInfo(MANAGED_GROUP_HANDLE); ci != nil && err == nil {
|
||||
isManager = true
|
||||
}
|
||||
|
||||
if isManager {
|
||||
switch ev.EventType {
|
||||
case event.PeerStateChange:
|
||||
handle := ev.Data["RemotePeer"]
|
||||
// check that we have authenticated with this peer
|
||||
if connections.ConnectionStateToType()[ev.Data[event.ConnectionState]] == connections.AUTHENTICATED {
|
||||
mg, err := f.GetManagedGroup(profile)
|
||||
if err != nil {
|
||||
log.Infof("group manager received peer connections but no suitable group has been found: %v %v", handle, err)
|
||||
profile.DisconnectFromPeer(handle)
|
||||
break
|
||||
}
|
||||
if _, err := CheckACL(handle, mg); err != nil {
|
||||
log.Infof("received managed group connection from unauthorized peer: %v %v", handle, err)
|
||||
profile.DisconnectFromPeer(handle)
|
||||
break
|
||||
}
|
||||
}
|
||||
// This is where most of the magic happens for managed groups. A few notes:
|
||||
// - CwtchPeer has already taken care of storing this for us, we don't need to worry about that
|
||||
// - Group Managers **only** speak overlays and **always** wrap their messages in a ManageGroupEvent anything else is fast-rejected.
|
||||
case event.NewMessageFromPeerEngine:
|
||||
log.Infof("received new message from peer: manager")
|
||||
ci, err := f.GetManagedGroup(profile)
|
||||
if err != nil {
|
||||
log.Errorf("unknown conversation %v", err)
|
||||
break // we don't care about unknown conversations...
|
||||
}
|
||||
var cm model.MessageWrapper
|
||||
err = json.Unmarshal([]byte(ev.Data[event.Data]), &cm)
|
||||
if err != nil {
|
||||
log.Errorf("could not deserialize json %s %v", ev.Data[event.Data], err)
|
||||
break
|
||||
}
|
||||
// The overlay type of this message **must** be ManageGroupEvent
|
||||
if cm.Overlay == model.OverlayManageGroupEvent {
|
||||
var mge ManageGroupEvent
|
||||
err = json.Unmarshal([]byte(cm.Data), &mge)
|
||||
if err == nil {
|
||||
f.handleEvent(profile, *ci, mge, ev.Data[event.Data])
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// handleEvent takes in a high level ManageGroupEvent message, transforms it into the proper type, and passes it on for handling
|
||||
// assumes we are called after an event provided by an authorized peer (i.e. ManageGroup == true)
|
||||
func (f *GroupManagerFunctionality) handleEvent(profile peer.CwtchPeer, conversation model.Conversation, mge ManageGroupEvent, original string) {
|
||||
switch mge.EventType {
|
||||
case NewClearMessage:
|
||||
var nme NewClearMessageEvent
|
||||
err := json.Unmarshal([]byte(mge.Data), &nme)
|
||||
if err == nil {
|
||||
f.handleNewMessageEvent(profile, conversation, nme, original)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (f *GroupManagerFunctionality) handleNewMessageEvent(profile peer.CwtchPeer, conversation model.Conversation, nme NewClearMessageEvent, original string) {
|
||||
log.Infof("handling new clear message event")
|
||||
hgm := nme.HybridGroupMessage
|
||||
if AuthenticateMessage(hgm) {
|
||||
log.Infof("authenticated message")
|
||||
group, err := f.GetManagedGroup(profile)
|
||||
if err != nil {
|
||||
log.Infof("received fraudulant hybrid message from group: %v", err)
|
||||
return
|
||||
}
|
||||
if acl, err := CheckACL(hgm.Author, group); err != nil {
|
||||
log.Infof("received fraudulant hybrid message from group: %v", err)
|
||||
return
|
||||
} else if !acl.Append {
|
||||
log.Infof("received fraudulant hybrid message from group: peer does not have append privileges")
|
||||
return
|
||||
} else {
|
||||
|
||||
// TODO - Store this message locally in a format that makes it easier to
|
||||
// do assurance later on
|
||||
|
||||
// forward the message to everyone who the server has added as a contact
|
||||
// and who are represented in the ACL...
|
||||
allConversations, _ := profile.FetchConversations()
|
||||
for _, ci := range allConversations {
|
||||
// NOTE: This check works for Open Groups too as CheckACL will return the default ACL
|
||||
// for the group....
|
||||
if ci.Handle != MANAGED_GROUP_HANDLE { // don't send to ourselves...
|
||||
if acl, err := CheckACL(hgm.Author, group); err == nil && acl.Read {
|
||||
log.Infof("forwarding group message to: %v", ci.Handle)
|
||||
profile.SendMessage(ci.ID, original)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
log.Errorf("received fraudulant hybrid message fom group")
|
||||
}
|
||||
}
|
||||
|
||||
// GetManagedGroup is a convieniance function that looks up the managed group
|
||||
func (f *GroupManagerFunctionality) GetManagedGroup(profile peer.CwtchPeer) (*model.Conversation, error) {
|
||||
return profile.FetchConversationInfo(MANAGED_GROUP_HANDLE)
|
||||
}
|
||||
|
||||
// Establish a new Managed Group and return its conversation id
|
||||
func (f *GroupManagerFunctionality) ManageNewGroup(profile peer.CwtchPeer) (int, error) {
|
||||
// note: a manager can only manage one group. This will (probably) always be true and has a few benefits
|
||||
// and downsides.
|
||||
// The main downside is that it requires a new manager per group (and thus an onion service per group)
|
||||
// However, it means that we can lean on p2p functionality like profile images / metadata / name
|
||||
// etc. for group metadata and effectively get that for-free in the client.
|
||||
// HOWEVER: hedging our bets here by giving this group a numeric handle...
|
||||
if _, err := profile.FetchConversationInfo(MANAGED_GROUP_HANDLE); err == nil {
|
||||
return -1, fmt.Errorf("manager is already managing a group")
|
||||
}
|
||||
|
||||
ac := model.DefaultP2PAccessControl()
|
||||
// by setting the ManageGroup permission in this ACL we are allowing the manager to
|
||||
// take control of how this group is structured, see OnEvent above...
|
||||
ac.ManageGroup = true
|
||||
acl := model.AccessControlList{}
|
||||
acl[profile.GetOnion()] = ac
|
||||
acl[MANAGED_GROUP_HANDLE] = model.NoAccessControl()
|
||||
ci, err := profile.NewConversation(MANAGED_GROUP_HANDLE, acl)
|
||||
if err != nil {
|
||||
return -1, err
|
||||
}
|
||||
|
||||
profile.SetConversationAttribute(ci, attr.LocalScope.ConstructScopedZonedPath(attr.ConversationZone.ConstructZonedPath(ManagedGroupOpen)), event.False)
|
||||
|
||||
return ci, nil
|
||||
}
|
||||
|
||||
// AddHybridContact is a wrapper arround NewContactConversation which sets the contact
|
||||
// up for Hybrid Group channel messages...
|
||||
// TODO this function assumes that authorization has been done at a higher level..
|
||||
func (f *GroupManagerFunctionality) AddHybridContact(profile peer.CwtchPeer, handle string) error {
|
||||
ac := model.DefaultP2PAccessControl()
|
||||
ac.ManageGroup = false
|
||||
ci, err := profile.NewContactConversation(handle, ac, true)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
mg, err := f.GetManagedGroup(profile)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// Update the ACL list to add this contact...
|
||||
acl := mg.ACL
|
||||
acl[handle] = model.DefaultP2PAccessControl()
|
||||
profile.UpdateConversationAccessControlList(mg.ID, acl)
|
||||
// enable channel 2 on this conversation (hybrid groups management channel)
|
||||
profile.InitChannel(ci, constants.CHANNEL_MANAGER)
|
||||
key := fmt.Sprintf("channel.%d", constants.CHANNEL_MANAGER)
|
||||
profile.SetConversationAttribute(ci, attr.LocalScope.ConstructScopedZonedPath(attr.ConversationZone.ConstructZonedPath(key)), constants.True)
|
||||
// Group managers need to always save history (and manually deal with purging...)
|
||||
profile.SetConversationAttribute(ci, attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(event.SaveHistoryKey)), event.SaveHistoryConfirmed)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *GroupManagerFunctionality) OnContactRequestValue(profile peer.CwtchPeer, conversation model.Conversation, eventID string, path attr.ScopedZonedPath) {
|
||||
// nop hybrid group conversations do not exchange contact requests
|
||||
}
|
||||
|
||||
func (f *GroupManagerFunctionality) OnContactReceiveValue(profile peer.CwtchPeer, conversation model.Conversation, path attr.ScopedZonedPath, value string, exists bool) {
|
||||
// nop hybrid group conversations do not exchange contact requests
|
||||
}
|
|
@ -1,330 +0,0 @@
|
|||
package hybrid
|
||||
|
||||
import (
|
||||
"crypto/rand"
|
||||
"cwtch.im/cwtch/event"
|
||||
"cwtch.im/cwtch/model"
|
||||
"cwtch.im/cwtch/model/attr"
|
||||
"cwtch.im/cwtch/model/constants"
|
||||
"cwtch.im/cwtch/peer"
|
||||
"cwtch.im/cwtch/settings"
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"git.openprivacy.ca/openprivacy/log"
|
||||
"golang.org/x/crypto/nacl/secretbox"
|
||||
"math"
|
||||
"math/big"
|
||||
"strconv"
|
||||
"time"
|
||||
)
|
||||
|
||||
type ManagedGroupFunctionality struct {
|
||||
}
|
||||
|
||||
func (f ManagedGroupFunctionality) NotifySettingsUpdate(settings settings.GlobalSettings) {
|
||||
}
|
||||
|
||||
func (f ManagedGroupFunctionality) EventsToRegister() []event.Type {
|
||||
return []event.Type{event.NewMessageFromPeerEngine}
|
||||
}
|
||||
|
||||
func (f ManagedGroupFunctionality) ExperimentsToRegister() []string {
|
||||
return []string{constants.GroupsExperiment}
|
||||
}
|
||||
|
||||
// OnEvent handles File Sharing Hooks like Manifest Received and FileDownloaded
|
||||
func (f *ManagedGroupFunctionality) OnEvent(ev event.Event, profile peer.CwtchPeer) {
|
||||
switch ev.EventType {
|
||||
// This is where most of the magic happens for managed groups. A few notes:
|
||||
// - CwtchPeer has already taken care of storing this for us, we don't need to worry about that
|
||||
// - Group Managers **only** speak overlays and **always** wrap their messages in a ManageGroupEvent anything else is fast-rejected.
|
||||
case event.NewMessageFromPeerEngine:
|
||||
handle := ev.Data[event.RemotePeer]
|
||||
ci, err := profile.FetchConversationInfo(handle)
|
||||
if err != nil {
|
||||
break // we don't care about unknown conversations...
|
||||
}
|
||||
|
||||
// We reject managed group requests for groups not setup as managed groups...
|
||||
if ci.ACL[handle].ManageGroup {
|
||||
var cm model.MessageWrapper
|
||||
err = json.Unmarshal([]byte(ev.Data[event.Data]), &cm)
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
// The overlay type of this message **must** be ManageGroupEvent
|
||||
if cm.Overlay == model.OverlayManageGroupEvent {
|
||||
var mge ManageGroupEvent
|
||||
err = json.Unmarshal([]byte(cm.Data), &mge)
|
||||
if err == nil {
|
||||
cid, err := profile.FetchConversationInfo(handle)
|
||||
if err == nil {
|
||||
f.handleEvent(profile, *cid, mge)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// handleEvent takes in a high level ManageGroupEvent message, transforms it into the proper type, and passes it on for handling
|
||||
// assumes we are called after an event provided by an authorized peer (i.e. ManageGroup == true)
|
||||
func (f *ManagedGroupFunctionality) handleEvent(profile peer.CwtchPeer, conversation model.Conversation, mge ManageGroupEvent) {
|
||||
switch mge.EventType {
|
||||
case AddMember:
|
||||
var ame AddMemberEvent
|
||||
err := json.Unmarshal([]byte(mge.Data), &ame)
|
||||
if err == nil {
|
||||
f.handleAddMemberEvent(profile, conversation, ame)
|
||||
}
|
||||
case RemoveMember:
|
||||
var rme RemoveMemberEvent
|
||||
err := json.Unmarshal([]byte(mge.Data), &rme)
|
||||
if err == nil {
|
||||
f.handleRemoveMemberEvent(profile, conversation, rme)
|
||||
}
|
||||
case NewMessage:
|
||||
var nme NewMessageEvent
|
||||
err := json.Unmarshal([]byte(mge.Data), &nme)
|
||||
if err == nil {
|
||||
f.handleNewMessageEvent(profile, conversation, nme)
|
||||
}
|
||||
case NewClearMessage:
|
||||
var nme NewClearMessageEvent
|
||||
err := json.Unmarshal([]byte(mge.Data), &nme)
|
||||
if err == nil {
|
||||
f.handleNewClearMessageEvent(profile, conversation, nme)
|
||||
}
|
||||
case RotateKey:
|
||||
var rke RotateKeyEvent
|
||||
err := json.Unmarshal([]byte(mge.Data), &rke)
|
||||
if err == nil {
|
||||
f.handleRotateKeyEvent(profile, conversation, rke)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// handleAddMemberEvent adds a group member to the conversation ACL
|
||||
// assumes we are called after an event provided by an authorized peer (i.e. ManageGroup == true)
|
||||
func (f *ManagedGroupFunctionality) handleAddMemberEvent(profile peer.CwtchPeer, conversation model.Conversation, ame AddMemberEvent) {
|
||||
acl := conversation.ACL
|
||||
acl[ame.Handle] = model.DefaultP2PAccessControl()
|
||||
profile.UpdateConversationAccessControlList(conversation.ID, acl)
|
||||
}
|
||||
|
||||
// handleRemoveMemberEvent removes a group member from the conversation ACL
|
||||
// assumes we are called after an event provided by an authorized peer (i.e. ManageGroup == true)
|
||||
func (f *ManagedGroupFunctionality) handleRemoveMemberEvent(profile peer.CwtchPeer, conversation model.Conversation, rme RemoveMemberEvent) {
|
||||
acl := conversation.ACL
|
||||
delete(acl, rme.Handle)
|
||||
profile.UpdateConversationAccessControlList(conversation.ID, acl)
|
||||
}
|
||||
|
||||
// handleRotateKeyEvent rotates the encryption key for a given group
|
||||
// assumes we are called after an event provided by an authorized peer (i.e. ManageGroup == true)
|
||||
// TODO this currently is a noop as group levle encryption is unimplemented
|
||||
func (f *ManagedGroupFunctionality) handleRotateKeyEvent(profile peer.CwtchPeer, conversation model.Conversation, rke RotateKeyEvent) {
|
||||
keyScope := attr.LocalScope.ConstructScopedZonedPath(attr.ConversationZone.ConstructZonedPath("key"))
|
||||
keyB64 := base64.StdEncoding.EncodeToString(rke.Key)
|
||||
profile.SetConversationAttribute(conversation.ID, keyScope, keyB64)
|
||||
}
|
||||
|
||||
// TODO this is a sketch implementation that is not yet complete.
|
||||
func (f *ManagedGroupFunctionality) handleNewMessageEvent(profile peer.CwtchPeer, conversation model.Conversation, nme NewMessageEvent) {
|
||||
keyScope := attr.LocalScope.ConstructScopedZonedPath(attr.ConversationZone.ConstructZonedPath("key"))
|
||||
if keyB64, err := profile.GetConversationAttribute(conversation.ID, keyScope); err == nil {
|
||||
key, err := base64.StdEncoding.DecodeString(keyB64)
|
||||
if err != nil || len(key) != 32 {
|
||||
log.Errorf("hybrid group key is corrupted")
|
||||
return
|
||||
}
|
||||
// decrypt the message with key...
|
||||
hgm, err := f.decryptMessage(key, nme.EncryptedHybridGroupMessage)
|
||||
if hgm == nil || err != nil {
|
||||
log.Errorf("unable to decrypt hybrid group message: %v", err)
|
||||
return
|
||||
}
|
||||
f.handleNewClearMessageEvent(profile, conversation, NewClearMessageEvent{HybridGroupMessage: *hgm})
|
||||
}
|
||||
}
|
||||
|
||||
func (f *ManagedGroupFunctionality) handleNewClearMessageEvent(profile peer.CwtchPeer, conversation model.Conversation, nme NewClearMessageEvent) {
|
||||
hgm := nme.HybridGroupMessage
|
||||
if AuthenticateMessage(hgm) {
|
||||
// TODO Closed Group Membership Check - right now we only support open groups...
|
||||
if profile.GetOnion() == hgm.Author {
|
||||
// ack
|
||||
signatureB64 := base64.StdEncoding.EncodeToString(hgm.Signature)
|
||||
id, err := profile.GetChannelMessageBySignature(conversation.ID, constants.CHANNEL_CHAT, signatureB64)
|
||||
if err == nil {
|
||||
profile.UpdateMessageAttribute(conversation.ID, constants.CHANNEL_CHAT, id, constants.AttrAck, constants.True)
|
||||
profile.PublishEvent(event.NewEvent(event.IndexedAcknowledgement, map[event.Field]string{event.ConversationID: strconv.Itoa(conversation.ID), event.Index: strconv.Itoa(id)}))
|
||||
}
|
||||
} else {
|
||||
mgidstr := strconv.Itoa(int(nme.HybridGroupMessage.MemberGroupID)) // we need both MemberGroupId and MemberMessageId for attestation later on...
|
||||
newmmidstr := strconv.Itoa(int(nme.HybridGroupMessage.MemberMessageID))
|
||||
// Set the attributes of this message...
|
||||
attr := model.Attributes{MemberGroupIDKey: mgidstr, MemberMessageIDKey: newmmidstr,
|
||||
constants.AttrAuthor: hgm.Author,
|
||||
constants.AttrAck: event.True,
|
||||
constants.AttrSentTimestamp: time.UnixMilli(int64(hgm.Sent)).Format(time.RFC3339Nano)}
|
||||
|
||||
// Note: The Channel here is 0...this is the main channel that UIs understand as the default, so this message is
|
||||
// becomes part of the conversation...
|
||||
mid, err := profile.InternalInsertMessage(conversation.ID, constants.CHANNEL_CHAT, hgm.Author, hgm.MessageBody, attr, hgm.Signature)
|
||||
contenthash := model.CalculateContentHash(hgm.Author, hgm.MessageBody)
|
||||
if err == nil {
|
||||
profile.PublishEvent(event.NewEvent(event.NewMessageFromGroup, map[event.Field]string{event.ConversationID: strconv.Itoa(conversation.ID), event.TimestampSent: time.UnixMilli(int64(hgm.Sent)).Format(time.RFC3339Nano), event.RemotePeer: hgm.Author, event.Index: strconv.Itoa(mid), event.Data: hgm.MessageBody, event.ContentHash: contenthash}))
|
||||
}
|
||||
}
|
||||
|
||||
// TODO need to send an event here...
|
||||
} else {
|
||||
log.Errorf("received fraudulant hybrid message fom group")
|
||||
}
|
||||
}
|
||||
|
||||
// todo sketch function
|
||||
func (f *ManagedGroupFunctionality) decryptMessage(key []byte, ciphertext []byte) (*HybridGroupMessage, error) {
|
||||
if len(ciphertext) > 24 {
|
||||
var decryptNonce [24]byte
|
||||
copy(decryptNonce[:], ciphertext[:24])
|
||||
var fixedSizeKey [32]byte
|
||||
copy(fixedSizeKey[:], key[:32])
|
||||
decrypted, ok := secretbox.Open(nil, ciphertext[24:], &decryptNonce, &fixedSizeKey)
|
||||
if ok {
|
||||
var hgm HybridGroupMessage
|
||||
err := json.Unmarshal(decrypted, &hgm)
|
||||
return &hgm, err
|
||||
}
|
||||
}
|
||||
return nil, fmt.Errorf("invalid ciphertext/key error")
|
||||
}
|
||||
|
||||
// Define a new managed group, managed by the manager...
|
||||
func (f *ManagedGroupFunctionality) NewManagedGroup(profile peer.CwtchPeer, manager string) error {
|
||||
|
||||
// generate a truely random member id for this group in [0..2^32)
|
||||
nBig, err := rand.Int(rand.Reader, big.NewInt(math.MaxUint32))
|
||||
if err != nil {
|
||||
return err // if there is a problem with random we want to exit now rather than have to clean up group setup...
|
||||
}
|
||||
|
||||
ac := model.DefaultP2PAccessControl()
|
||||
ac.ManageGroup = true // by setting the ManageGroup permission in this ACL we are allowing the manager to control of how this group is structured
|
||||
ci, err := profile.NewContactConversation(manager, ac, true)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// enable channel 2 on this conversation (hybrid groups management channel)
|
||||
key := fmt.Sprintf("channel.%d", 2)
|
||||
err = profile.SetConversationAttribute(ci, attr.LocalScope.ConstructScopedZonedPath(attr.ConversationZone.ConstructZonedPath(key)), constants.True)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not enable channel 2 on hybrid group: %v", err) // likely a catestrophic error...fail
|
||||
}
|
||||
err = profile.InitChannel(ci, 2)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not enable channel 2 on hybrid group: %v", err) // likely a catestrophic error...fail
|
||||
}
|
||||
|
||||
// finally, set the member group id on this group...
|
||||
mgidkey := attr.LocalScope.ConstructScopedZonedPath(attr.ConversationZone.ConstructZonedPath(MemberGroupIDKey))
|
||||
err = profile.SetConversationAttributeInt(ci, mgidkey, int(nBig.Uint64()))
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not set group id on hybrid group: %v", err) // likely a catestrophic error...fail
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// SendMessageToManagedGroup acts like SendMessage(ToPeer), but with a few additional bookkeeping steps for Hybrid Groups
|
||||
func (f *ManagedGroupFunctionality) SendMessageToManagedGroup(profile peer.CwtchPeer, conversation int, message string) (int, error) {
|
||||
mgidkey := attr.LocalScope.ConstructScopedZonedPath(attr.ConversationZone.ConstructZonedPath(MemberGroupIDKey))
|
||||
mgid, err := profile.GetConversationAttributeInt(conversation, mgidkey)
|
||||
if err != nil {
|
||||
return -1, err
|
||||
}
|
||||
|
||||
mmidkey := attr.LocalScope.ConstructScopedZonedPath(attr.ConversationZone.ConstructZonedPath(MemberMessageIDKey))
|
||||
mmid, err := profile.GetConversationAttributeInt(conversation, mmidkey)
|
||||
if err != nil {
|
||||
mmid = 0 // first message
|
||||
}
|
||||
|
||||
mmid += 1
|
||||
|
||||
// Now time to package this whole thing in layers of JSON...
|
||||
hgm := HybridGroupMessage{
|
||||
MemberGroupID: uint32(mgid),
|
||||
MemberMessageID: uint32(mmid),
|
||||
Sent: uint64(time.Now().UnixMilli()),
|
||||
Author: profile.GetOnion(),
|
||||
MessageBody: message,
|
||||
Signature: []byte{}, // Leave blank so we can sign this message...
|
||||
}
|
||||
|
||||
data, err := json.Marshal(hgm)
|
||||
if err != nil {
|
||||
return -1, err
|
||||
}
|
||||
|
||||
// Don't forget to sign the message...
|
||||
sig, err := profile.SignMessage(data)
|
||||
if err != nil {
|
||||
return -1, err
|
||||
}
|
||||
|
||||
hgm.Signature = sig
|
||||
|
||||
ncm := NewClearMessageEvent{
|
||||
HybridGroupMessage: hgm,
|
||||
}
|
||||
|
||||
signedData, err := json.Marshal(ncm)
|
||||
if err != nil {
|
||||
return -1, err
|
||||
}
|
||||
|
||||
mgm := ManageGroupEvent{
|
||||
EventType: NewClearMessage,
|
||||
Data: string(signedData),
|
||||
}
|
||||
|
||||
odata, err := json.Marshal(mgm)
|
||||
if err != nil {
|
||||
return -1, err
|
||||
}
|
||||
|
||||
overlay := model.MessageWrapper{
|
||||
Overlay: model.OverlayManageGroupEvent,
|
||||
Data: string(odata),
|
||||
}
|
||||
|
||||
ojson, err := json.Marshal(overlay)
|
||||
if err != nil {
|
||||
return -1, err
|
||||
}
|
||||
|
||||
// send the message to the manager and update our message is string for tracking...
|
||||
_, err = profile.SendMessage(conversation, string(ojson))
|
||||
if err != nil {
|
||||
return -1, err
|
||||
}
|
||||
profile.SetConversationAttributeInt(conversation, mmidkey, mmid)
|
||||
|
||||
// ok there is still one more thing we need to do...
|
||||
// insert this message as part of our group log, for members of the group
|
||||
// this exists in channel 0 of the conversation with the group manager...
|
||||
mgidstr := strconv.Itoa(mgid) // we need both MemberGroupId and MemberMessageId for attestation later on...
|
||||
newmmidstr := strconv.Itoa(mmid)
|
||||
attr := model.Attributes{MemberGroupIDKey: mgidstr, MemberMessageIDKey: newmmidstr, constants.AttrAuthor: profile.GetOnion(), constants.AttrAck: event.False, constants.AttrSentTimestamp: time.Now().Format(time.RFC3339Nano)}
|
||||
return profile.InternalInsertMessage(conversation, 0, hgm.Author, message, attr, hgm.Signature)
|
||||
}
|
||||
|
||||
func (f ManagedGroupFunctionality) OnContactRequestValue(profile peer.CwtchPeer, conversation model.Conversation, eventID string, path attr.ScopedZonedPath) {
|
||||
// nop hybrid group conversations do not exchange contact requests
|
||||
}
|
||||
|
||||
func (f ManagedGroupFunctionality) OnContactReceiveValue(profile peer.CwtchPeer, conversation model.Conversation, path attr.ScopedZonedPath, value string, exists bool) {
|
||||
// nop hybrid group conversations do not exchange contact requests
|
||||
}
|
|
@ -1,75 +0,0 @@
|
|||
package inter
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"strings"
|
||||
|
||||
"cwtch.im/cwtch/functionality/hybrid"
|
||||
"cwtch.im/cwtch/model/constants"
|
||||
"cwtch.im/cwtch/peer"
|
||||
)
|
||||
|
||||
// This functionality is a little different. It's not functionality per-se. It's a wrapper around
|
||||
// CwtchProfile function that combines some core-functionalities like Hybrid Groups so that
|
||||
// they can be transparently exposed in autobindings.
|
||||
// DEV NOTE: consider moving other cross-cutting interface functions here to simplfy CwtchPeer
|
||||
type InterfaceFunctionality struct {
|
||||
}
|
||||
|
||||
// FunctionalityGate returns filesharing functionality - gates now happen on function calls.
|
||||
func FunctionalityGate() *InterfaceFunctionality {
|
||||
return new(InterfaceFunctionality)
|
||||
}
|
||||
|
||||
func (i InterfaceFunctionality) ImportBundle(profile peer.CwtchPeer, uri string) error {
|
||||
// check if this is a managed group. Note: managed groups do not comply with the server bundle format.
|
||||
if strings.HasPrefix(uri, "managed:") {
|
||||
uri = uri[len("managed:"):]
|
||||
if profile.IsFeatureEnabled(constants.GroupsExperiment) {
|
||||
mgf := hybrid.ManagedGroupFunctionality{}
|
||||
return mgf.NewManagedGroup(profile, uri)
|
||||
} else {
|
||||
return errors.New("managed groups require the group experiment to be enabled")
|
||||
}
|
||||
}
|
||||
// DEV NOTE: we may want to eventually move Server Import code to ServerFunctionality and add a hook here...
|
||||
// DEV NOTE: consider making ImportBundle a high-level functionality interface? to support different kinds of contacts?
|
||||
return profile.ImportBundle(uri)
|
||||
}
|
||||
|
||||
// EnhancedImportBundle is identical to EnhancedImportBundle in CwtchPeer but instead of wrapping CwtchPeer.ImportBundle it instead
|
||||
// wraps InterfaceFunctionality.ImportBundle
|
||||
func (i InterfaceFunctionality) EnhancedImportBundle(profile peer.CwtchPeer, uri string) string {
|
||||
err := i.ImportBundle(profile, uri)
|
||||
if err == nil {
|
||||
return "importBundle.success"
|
||||
}
|
||||
return err.Error()
|
||||
}
|
||||
|
||||
// SendMessage sends a message to a conversation.
|
||||
// NOTE: Unlike CwtchPeer.SendMessage this interface makes no guarentees about the raw-ness of the message sent to peer contacts.
|
||||
// If the conversation is a hybrid groups then the message may be wrapped in multiple layers of overlay messages / encryption
|
||||
// prior to being send. To send a raw message to a peer then use peer.CwtchPeer
|
||||
// DEV NOTE: Move Legacy Group message send here...
|
||||
func (i InterfaceFunctionality) SendMessage(profile peer.CwtchPeer, conversation int, message string) (int, error) {
|
||||
ci, err := profile.GetConversationInfo(conversation)
|
||||
if err != nil {
|
||||
return -1, err
|
||||
}
|
||||
if ci.ACL[ci.Handle].ManageGroup {
|
||||
mgf := hybrid.ManagedGroupFunctionality{}
|
||||
return mgf.SendMessageToManagedGroup(profile, conversation, message)
|
||||
}
|
||||
return profile.SendMessage(conversation, message)
|
||||
}
|
||||
|
||||
// EnhancedSendMessage Attempts to Send a Message and Immediately Attempts to Lookup the Message in the Database
|
||||
// this wraps InterfaceFunctionality.SendMessage to support HybridGroups
|
||||
func (i InterfaceFunctionality) EnhancedSendMessage(profile peer.CwtchPeer, conversation int, message string) string {
|
||||
mid, err := i.SendMessage(profile, conversation, message)
|
||||
if err != nil {
|
||||
return ""
|
||||
}
|
||||
return profile.EnhancedGetMessageById(conversation, mid)
|
||||
}
|
1
go.mod
1
go.mod
|
@ -16,6 +16,7 @@ require (
|
|||
require (
|
||||
filippo.io/edwards25519 v1.0.0 // indirect
|
||||
git.openprivacy.ca/openprivacy/bine v0.0.5 // indirect
|
||||
github.com/client9/misspell v0.3.4 // indirect
|
||||
github.com/google/go-cmp v0.5.8 // indirect
|
||||
github.com/gtank/merlin v0.1.1 // indirect
|
||||
github.com/mimoo/StrobeGo v0.0.0-20220103164710-9a04d6ca976b // indirect
|
||||
|
|
2
go.sum
2
go.sum
|
@ -8,6 +8,8 @@ git.openprivacy.ca/openprivacy/connectivity v1.11.0 h1:roASjaFtQLu+HdH5fa2wx6F00
|
|||
git.openprivacy.ca/openprivacy/connectivity v1.11.0/go.mod h1:OQO1+7OIz/jLxDrorEMzvZA6SEbpbDyLGpjoFqT3z1Y=
|
||||
git.openprivacy.ca/openprivacy/log v1.0.3 h1:E/PMm4LY+Q9s3aDpfySfEDq/vYQontlvNj/scrPaga0=
|
||||
git.openprivacy.ca/openprivacy/log v1.0.3/go.mod h1:gGYK8xHtndRLDymFtmjkG26GaMQNgyhioNS82m812Iw=
|
||||
github.com/client9/misspell v0.3.4 h1:ta993UF76GwbvJcIo3Y68y/M3WxlpEHPWIGDkJYwzJI=
|
||||
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
|
||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
|
|
|
@ -20,9 +20,6 @@ const (
|
|||
// LegacyGroupZone for attributes related to legacy group experiment
|
||||
LegacyGroupZone = Zone("legacygroup")
|
||||
|
||||
// ConversationZone for attributes related to structure of the conversation
|
||||
ConversationZone = Zone("conversation")
|
||||
|
||||
// FilesharingZone for attributes related to file sharing
|
||||
FilesharingZone = Zone("filesharing")
|
||||
|
||||
|
@ -68,8 +65,6 @@ func ParseZone(path string) (Zone, string) {
|
|||
return ServerKeyZone, parts[1]
|
||||
case ServerZone:
|
||||
return ServerZone, parts[1]
|
||||
case ConversationZone:
|
||||
return ConversationZone, parts[1]
|
||||
default:
|
||||
return UnknownZone, parts[1]
|
||||
}
|
||||
|
|
|
@ -58,7 +58,6 @@ const SyncMostRecentMessageTime = "SyncMostRecentMessageTime"
|
|||
const AttrLastConnectionTime = "last-connection-time"
|
||||
const PeerAutostart = "autostart"
|
||||
const PeerAppearOffline = "appear-offline"
|
||||
const PrivateName = "private-name"
|
||||
const Archived = "archived"
|
||||
|
||||
const ProfileStatus = "profile-status"
|
||||
|
@ -72,4 +71,3 @@ const Description = "description"
|
|||
// Used to store the status of acl migrations
|
||||
const ACLVersion = "acl-version"
|
||||
const ACLVersionOne = "acl-v1"
|
||||
const ACLVersionTwo = "acl-v2"
|
||||
|
|
|
@ -1,4 +0,0 @@
|
|||
package constants
|
||||
|
||||
const CHANNEL_CHAT = 0
|
||||
const CHANNEL_MANAGER = 2
|
|
@ -19,6 +19,3 @@ var AutoDLFileExts = [...]string{".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp
|
|||
|
||||
// BlodeuweddExperiment enables the Blodeuwedd Assistant
|
||||
const BlodeuweddExperiment = "blodeuwedd"
|
||||
|
||||
// Enables the Hybrid Group Manager Extension
|
||||
const GroupManagerExperiment = "group-manager"
|
||||
|
|
|
@ -4,8 +4,6 @@ import (
|
|||
"cwtch.im/cwtch/model/attr"
|
||||
"cwtch.im/cwtch/model/constants"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"git.openprivacy.ca/openprivacy/connectivity/tor"
|
||||
"git.openprivacy.ca/openprivacy/log"
|
||||
"time"
|
||||
)
|
||||
|
@ -25,8 +23,6 @@ type AccessControl struct {
|
|||
// Extension Related Permissions
|
||||
ShareFiles bool // Allows a handle to share files to a conversation
|
||||
RenderImages bool // Indicates that certain filetypes should be autodownloaded and rendered when shared by this contact
|
||||
|
||||
ManageGroup bool // Allows this conversation to be managed by hybrid groups
|
||||
}
|
||||
|
||||
// DefaultP2PAccessControl defaults to a semi-trusted peer with no access to special extensions.
|
||||
|
@ -35,14 +31,6 @@ func DefaultP2PAccessControl() AccessControl {
|
|||
AutoConnect: true, ShareFiles: false, RenderImages: false}
|
||||
}
|
||||
|
||||
// NoAccessControl defaults to a none-trusted peer with no access to special extensions.
|
||||
// This is used as a fall back (if due to a software glitch a contact was setup without an access control, or for
|
||||
// special contacts that should never be involvedt in external networking e.g. notes-to-self, or managed peers)
|
||||
func NoAccessControl() AccessControl {
|
||||
return AccessControl{Read: false, Append: false, ExchangeAttributes: false, Blocked: false,
|
||||
AutoConnect: false, ShareFiles: false, RenderImages: false}
|
||||
}
|
||||
|
||||
// AccessControlList represents an access control list for a conversation. Mapping handles to conversation
|
||||
// functions
|
||||
type AccessControlList map[string]AccessControl
|
||||
|
@ -71,12 +59,8 @@ func (a *Attributes) Serialize() []byte {
|
|||
|
||||
// DeserializeAttributes converts a JSON struct into an Attributes map
|
||||
func DeserializeAttributes(data []byte) Attributes {
|
||||
attributes := make(Attributes)
|
||||
err := json.Unmarshal(data, &attributes)
|
||||
if err != nil {
|
||||
log.Error("error deserializing attributes (this is likely a programming error): %v", err)
|
||||
return make(Attributes)
|
||||
}
|
||||
var attributes Attributes
|
||||
json.Unmarshal(data, &attributes)
|
||||
return attributes
|
||||
}
|
||||
|
||||
|
@ -107,28 +91,8 @@ func (ci *Conversation) GetPeerAC() AccessControl {
|
|||
if acl, exists := ci.ACL[ci.Handle]; exists {
|
||||
return acl
|
||||
}
|
||||
log.Errorf("attempted to access a Peer Access Control object from %v but peer ACL is undefined. This is likely a programming error - fallback to a NoAccess AC", ci.Handle)
|
||||
return NoAccessControl()
|
||||
}
|
||||
|
||||
// HasChannel returns true if the requested channel has been setup for this conversation
|
||||
func (ci *Conversation) HasChannel(requestedChannel int) bool {
|
||||
if requestedChannel == 0 {
|
||||
return true
|
||||
}
|
||||
if requestedChannel == 1 {
|
||||
return false // channel 1 is mapped to channel 0 for backwards compatibility
|
||||
}
|
||||
key := fmt.Sprintf("channel.%d", requestedChannel)
|
||||
if value, exists := ci.Attributes[attr.LocalScope.ConstructScopedZonedPath(attr.ConversationZone.ConstructZonedPath(key)).ToString()]; exists {
|
||||
return value == constants.True
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// IsCwtchPeer is a helper attribute that identifies whether a conversation is a cwtch peer
|
||||
func (ci *Conversation) IsCwtchPeer() bool {
|
||||
return tor.IsValidHostname(ci.Handle)
|
||||
log.Errorf("attempted to access a Peer Access Control object from %v but peer ACL is undefined. This is likely a programming error", ci.Handle)
|
||||
return DefaultP2PAccessControl()
|
||||
}
|
||||
|
||||
// IsGroup is a helper attribute that identifies whether a conversation is a legacy group
|
||||
|
|
|
@ -6,13 +6,13 @@ import "sync"
|
|||
// examples of experiments include File Sharing, Profile Images and Groups.
|
||||
type Experiments struct {
|
||||
enabled bool
|
||||
experiments *sync.Map
|
||||
experiments sync.Map
|
||||
}
|
||||
|
||||
// InitExperiments encapsulates a set of experiments separate from their storage in GlobalSettings.
|
||||
func InitExperiments(enabled bool, experiments map[string]bool) Experiments {
|
||||
|
||||
syncExperiments := new(sync.Map)
|
||||
var syncExperiments sync.Map
|
||||
for experiment, set := range experiments {
|
||||
syncExperiments.Store(experiment, set)
|
||||
}
|
||||
|
|
|
@ -3,7 +3,6 @@ package model
|
|||
import (
|
||||
"crypto/sha256"
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
)
|
||||
|
||||
// CalculateContentHash derives a hash using the author and the message body. It is intended to be
|
||||
|
@ -13,13 +12,3 @@ func CalculateContentHash(author string, messageBody string) string {
|
|||
contentBasedHash := sha256.Sum256(content)
|
||||
return base64.StdEncoding.EncodeToString(contentBasedHash[:])
|
||||
}
|
||||
|
||||
func DeserializeMessage(message string) (*MessageWrapper, error) {
|
||||
var cm MessageWrapper
|
||||
err := json.Unmarshal([]byte(message), &cm)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &cm, err
|
||||
}
|
||||
|
|
|
@ -1,41 +1,9 @@
|
|||
package model
|
||||
|
||||
import (
|
||||
"time"
|
||||
)
|
||||
|
||||
// MessageWrapper is the canonical Cwtch overlay wrapper
|
||||
type MessageWrapper struct {
|
||||
Overlay int `json:"o"`
|
||||
Data string `json:"d"`
|
||||
|
||||
// when the data was assembled
|
||||
SendTime *time.Time `json:"s,omitempty"`
|
||||
|
||||
// when the data was transmitted (by protocol engine e.g. over Tor)
|
||||
TransitTime *time.Time `json:"t,omitempty"`
|
||||
|
||||
// when the data was received
|
||||
RecvTime *time.Time `json:"r,omitempty"`
|
||||
}
|
||||
|
||||
// Channel is defined as being the last 3 bits of the overlay id
|
||||
// Channel 0 is reserved for the main conversation
|
||||
// Channel 2 is reserved for conversation admin (managed groups)
|
||||
// Channel 7 is reserved for streams (no ack, no store)
|
||||
func (mw MessageWrapper) Channel() int {
|
||||
// 1024 / 0x400 is the start of new channel overlays
|
||||
if mw.Overlay > 1024 {
|
||||
return mw.Overlay & 0x07
|
||||
}
|
||||
// for backward compatibilty all overlays less than 0x400 i.e. 1024 are
|
||||
// mapped to channel 0 regardless of their channel status.
|
||||
return 0
|
||||
}
|
||||
|
||||
// If Overlay is a Stream Message it should not be ackd, or stored.
|
||||
func (mw MessageWrapper) IsStream() bool {
|
||||
return mw.Channel() == 0x07
|
||||
}
|
||||
|
||||
// OverlayChat is the canonical identifier for chat overlays
|
||||
|
@ -49,6 +17,3 @@ const OverlayInviteGroup = 101
|
|||
|
||||
// OverlayFileSharing is the canonical identifier for the file sharing overlay
|
||||
const OverlayFileSharing = 200
|
||||
|
||||
// ManageGroupEvent is the canonical identifier for the manage group overlay
|
||||
const OverlayManageGroupEvent = 0x402
|
||||
|
|
|
@ -54,10 +54,12 @@ const MaxGroupMessageLength = 1800
|
|||
|
||||
func getRandomness(arr *[]byte) {
|
||||
if _, err := io.ReadFull(rand.Reader, (*arr)[:]); err != nil {
|
||||
if err != nil {
|
||||
// If we can't do randomness, just crash something is very very wrong and we are not going
|
||||
// to resolve it here....
|
||||
panic(err.Error())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// GenerateRandomID generates a random 16 byte hex id code
|
||||
|
|
|
@ -16,7 +16,6 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
"cwtch.im/cwtch/model"
|
||||
"cwtch.im/cwtch/model/constants"
|
||||
"cwtch.im/cwtch/protocol/groups"
|
||||
"cwtch.im/cwtch/settings"
|
||||
|
@ -27,6 +26,7 @@ import (
|
|||
"golang.org/x/crypto/ed25519"
|
||||
|
||||
"cwtch.im/cwtch/event"
|
||||
"cwtch.im/cwtch/model"
|
||||
"cwtch.im/cwtch/model/attr"
|
||||
"cwtch.im/cwtch/protocol/connections"
|
||||
"git.openprivacy.ca/openprivacy/log"
|
||||
|
@ -311,16 +311,15 @@ func (cp *cwtchPeer) GenerateProtocolEngine(acn connectivity.ACN, bus event.Mana
|
|||
authorizations := make(map[string]model.Authorization)
|
||||
for _, conversation := range conversations {
|
||||
|
||||
// Only perform the following actions for Peer-type Conversaions...
|
||||
if conversation.IsCwtchPeer() {
|
||||
// if this profile does not have an ACL version, and the profile is accepted (OR the acl version is v1 and the profile is accepted...)
|
||||
// then migrate the permissions to the v2 ACL
|
||||
if tor.IsValidHostname(conversation.Handle) {
|
||||
|
||||
// if this profile does not have an ACL version, and the profile is accepted, then migrate
|
||||
// the permissions to the v1 ACL
|
||||
// migrate the old accepted AC to a new fine-grained one
|
||||
// we only do this for previously trusted connections
|
||||
// NOTE: this does not supercede global cwthch experiments settings
|
||||
// if share files is turned off globally then acl.ShareFiles will be ignored
|
||||
// Note: There was a bug in the original EP code that meant that some acl-v1 profiles did not get ShareFiles or RenderImages - this corrects that.
|
||||
if version, exists := conversation.GetAttribute(attr.LocalScope, attr.ProfileZone, constants.ACLVersion); !exists || version == constants.ACLVersionOne {
|
||||
// if share files is turned off globally then acl.ShareFiles will be ignored.
|
||||
if _, exists := conversation.GetAttribute(attr.LocalScope, attr.ProfileZone, constants.ACLVersion); !exists {
|
||||
if conversation.Accepted {
|
||||
if ac, exists := conversation.ACL[conversation.Handle]; exists {
|
||||
ac.ShareFiles = true
|
||||
|
@ -331,7 +330,7 @@ func (cp *cwtchPeer) GenerateProtocolEngine(acn connectivity.ACN, bus event.Mana
|
|||
}
|
||||
|
||||
// Update the ACL Version
|
||||
cp.storage.SetConversationAttribute(conversation.ID, attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.ACLVersion)), constants.ACLVersionTwo)
|
||||
cp.storage.SetConversationAttribute(conversation.ID, attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.ACLVersion)), constants.ACLVersionOne)
|
||||
// Store the updated ACL
|
||||
cp.storage.SetConversationACL(conversation.ID, conversation.ACL)
|
||||
}
|
||||
|
@ -436,19 +435,12 @@ func (cp *cwtchPeer) SendMessage(conversation int, message string) (int, error)
|
|||
if tor.IsValidHostname(conversationInfo.Handle) {
|
||||
ev := event.NewEvent(event.SendMessageToPeer, map[event.Field]string{event.ConversationID: strconv.Itoa(conversationInfo.ID), event.RemotePeer: conversationInfo.Handle, event.Data: message})
|
||||
onion, _ := cp.storage.LoadProfileKeyValue(TypeAttribute, attr.PublicScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Onion)).ToString())
|
||||
id := -1
|
||||
|
||||
// check if we should store this message locally...
|
||||
if cm, err := model.DeserializeMessage(message); err == nil {
|
||||
if !cm.IsStream() {
|
||||
// For p2p messages we store the event id of the message as the "signature" we can then look this up in the database later for acks
|
||||
id, err = cp.storage.InsertMessage(conversationInfo.ID, cm.Channel(), message, model.Attributes{constants.AttrAuthor: string(onion), constants.AttrAck: event.False, constants.AttrSentTimestamp: time.Now().Format(time.RFC3339Nano)}, ev.EventID, model.CalculateContentHash(string(onion), message))
|
||||
id, err := cp.storage.InsertMessage(conversationInfo.ID, 0, message, model.Attributes{constants.AttrAuthor: string(onion), constants.AttrAck: event.False, constants.AttrSentTimestamp: time.Now().Format(time.RFC3339Nano)}, ev.EventID, model.CalculateContentHash(string(onion), message))
|
||||
if err != nil {
|
||||
return -1, err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
cp.eventBus.Publish(ev)
|
||||
return id, nil
|
||||
} else {
|
||||
|
@ -708,14 +700,6 @@ func (cp *cwtchPeer) ImportGroup(exportedInvite string) (int, error) {
|
|||
return groupConversationID, err
|
||||
}
|
||||
|
||||
// NewConversation create a new multi-peer conversation.
|
||||
func (cp *cwtchPeer) NewConversation(handle string, acl model.AccessControlList) (int, error) {
|
||||
conversationID, err := cp.storage.NewConversation(handle, model.Attributes{event.SaveHistoryKey: event.DeleteHistoryDefault}, acl, true)
|
||||
cp.SetConversationAttribute(conversationID, attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.ACLVersion)), constants.ACLVersionOne)
|
||||
cp.SetConversationAttribute(conversationID, attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.AttrLastConnectionTime)), time.Now().Format(time.RFC3339Nano))
|
||||
return conversationID, err
|
||||
}
|
||||
|
||||
// NewContactConversation create a new p2p conversation with the given acl applied to the handle.
|
||||
func (cp *cwtchPeer) NewContactConversation(handle string, acl model.AccessControl, accepted bool) (int, error) {
|
||||
cp.mutex.Lock()
|
||||
|
@ -723,18 +707,8 @@ func (cp *cwtchPeer) NewContactConversation(handle string, acl model.AccessContr
|
|||
conversationInfo, _ := cp.storage.GetConversationByHandle(handle)
|
||||
if conversationInfo == nil {
|
||||
conversationID, err := cp.storage.NewConversation(handle, model.Attributes{event.SaveHistoryKey: event.DeleteHistoryDefault}, model.AccessControlList{handle: acl}, accepted)
|
||||
if err != nil {
|
||||
log.Errorf("unable to create a new contact conversation: %v", err)
|
||||
return -1, err
|
||||
}
|
||||
cp.SetConversationAttribute(conversationID, attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.ACLVersion)), constants.ACLVersionOne)
|
||||
cp.SetConversationAttribute(conversationID, attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.AttrLastConnectionTime)), time.Now().Format(time.RFC3339Nano))
|
||||
if accepted {
|
||||
// If this call came from a trusted action (i.e. import bundle or accept button then accept the conversation)
|
||||
// This assigns all permissions (and in v2 is currently the default state of trusted contacts)
|
||||
// Accept conversation does PeerWithOnion
|
||||
cp.AcceptConversation(conversationID)
|
||||
}
|
||||
cp.SetConversationAttribute(conversationID, attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.ACLVersion)), constants.ACLVersionTwo)
|
||||
cp.eventBus.Publish(event.NewEvent(event.ContactCreated, map[event.Field]string{event.ConversationID: strconv.Itoa(conversationID), event.RemotePeer: handle}))
|
||||
return conversationID, err
|
||||
}
|
||||
|
@ -906,26 +880,6 @@ func (cp *cwtchPeer) GetConversationAttribute(id int, path attr.ScopedZonedPath)
|
|||
return val, nil
|
||||
}
|
||||
|
||||
// SetConversationAttributeInt sets the conversation attribute at path to an integer value
|
||||
func (cp *cwtchPeer) SetConversationAttributeInt(id int, path attr.ScopedZonedPath, value int) error {
|
||||
strvalue := strconv.Itoa(value)
|
||||
return cp.storage.SetConversationAttribute(id, path, strvalue)
|
||||
}
|
||||
|
||||
// GetConversationAttributeInt is a method for retrieving an integer value of a given conversation
|
||||
func (cp *cwtchPeer) GetConversationAttributeInt(id int, path attr.ScopedZonedPath) (int, error) {
|
||||
ci, err := cp.storage.GetConversation(id)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
val, exists := ci.Attributes[path.ToString()]
|
||||
if !exists {
|
||||
return 0, fmt.Errorf("%v does not exist for conversation %v", path.ToString(), id)
|
||||
}
|
||||
intvalue, err := strconv.Atoi(val)
|
||||
return intvalue, err
|
||||
}
|
||||
|
||||
// GetChannelMessage returns a message from a conversation channel referenced by the absolute ID.
|
||||
// Note: This should note be used to index a list as the ID is not expected to be tied to absolute position
|
||||
// in the table (e.g. deleted messages, expired messages, etc.)
|
||||
|
@ -1176,12 +1130,6 @@ func (cp *cwtchPeer) DisconnectFromServer(onion string) {
|
|||
// QueuePeeringWithOnion sends the request to peer with an onion directly to the contact retry queue; this is a mechanism to not flood tor with circuit requests
|
||||
// Status: Ready for 1.10
|
||||
func (cp *cwtchPeer) QueuePeeringWithOnion(handle string) {
|
||||
// don't queue peers that do not have an onion addresses as a handle
|
||||
// TODO establish a more robust separation between peerable contacts and
|
||||
// facade contacts (managed groups, notes-to-self etc.)
|
||||
if !tor.IsValidHostname(handle) {
|
||||
return
|
||||
}
|
||||
ci, err := cp.FetchConversationInfo(handle)
|
||||
if err == nil {
|
||||
lastSeen := cp.GetConversationLastSeenTime(ci.ID)
|
||||
|
@ -1303,9 +1251,9 @@ func (cp *cwtchPeer) ImportBundle(importString string) error {
|
|||
return ConstructResponse(constants.ImportBundlePrefix, "success")
|
||||
} else if tor.IsValidHostname(importString) {
|
||||
_, err := cp.NewContactConversation(importString, model.DefaultP2PAccessControl(), true)
|
||||
// NOTE: Not NewContactConversation implictly does AcceptConversation AND PeerWithOnion if relevant so
|
||||
// we no longer need to do it here...
|
||||
if err == nil {
|
||||
// Assuming all is good, we should peer with this contact.
|
||||
cp.PeerWithOnion(importString)
|
||||
return ConstructResponse(constants.ImportBundlePrefix, "success")
|
||||
}
|
||||
return ConstructResponse(constants.ImportBundlePrefix, err.Error())
|
||||
|
@ -1534,22 +1482,9 @@ func (cp *cwtchPeer) storeMessage(handle string, message string, sent time.Time)
|
|||
}
|
||||
}
|
||||
|
||||
// Don't store messages in channel 7
|
||||
channel := 0
|
||||
if cm, err := model.DeserializeMessage(message); err == nil {
|
||||
if cm.IsStream() {
|
||||
return -1, nil
|
||||
}
|
||||
channel = cm.Channel()
|
||||
}
|
||||
|
||||
// Generate a random number and use it as the signature
|
||||
signature := event.GetRandNumber().String()
|
||||
return cp.storage.InsertMessage(ci.ID, channel, message, model.Attributes{constants.AttrAuthor: handle, constants.AttrAck: event.True, constants.AttrSentTimestamp: sent.Format(time.RFC3339Nano)}, signature, model.CalculateContentHash(handle, message))
|
||||
}
|
||||
|
||||
func (cp *cwtchPeer) InitChannel(conversation int, channel int) error {
|
||||
return cp.storage.InitChannelOnConversation(conversation, channel)
|
||||
return cp.storage.InsertMessage(ci.ID, 0, message, model.Attributes{constants.AttrAuthor: handle, constants.AttrAck: event.True, constants.AttrSentTimestamp: sent.Format(time.RFC3339Nano)}, signature, model.CalculateContentHash(handle, message))
|
||||
}
|
||||
|
||||
// eventHandler process events from other subsystems
|
||||
|
@ -1564,7 +1499,6 @@ func (cp *cwtchPeer) eventHandler() {
|
|||
onion, _ := cp.storage.LoadProfileKeyValue(TypeAttribute, attr.PublicScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Onion)).ToString())
|
||||
log.Infof("Protocol engine for %s has stopped listening: %v", onion, ev.Data[event.Error])
|
||||
cp.mutex.Unlock()
|
||||
cp.processExtensionsEvent(ev)
|
||||
case event.EncryptedGroupMessage:
|
||||
|
||||
// If successful, a side effect is the message is added to the group's timeline
|
||||
|
@ -1609,7 +1543,6 @@ func (cp *cwtchPeer) eventHandler() {
|
|||
case event.NewMessageFromPeerEngine: //event.TimestampReceived, event.RemotePeer, event.Data
|
||||
ts, _ := time.Parse(time.RFC3339Nano, ev.Data[event.TimestampReceived])
|
||||
id, err := cp.storeMessage(ev.Data[event.RemotePeer], ev.Data[event.Data], ts)
|
||||
cp.processExtensionsEvent(ev)
|
||||
if err == nil {
|
||||
// Republish as NewMessageFromPeer
|
||||
ev.EventType = event.NewMessageFromPeer
|
||||
|
@ -1653,6 +1586,7 @@ func (cp *cwtchPeer) eventHandler() {
|
|||
conversationInfo, err := cp.FetchConversationInfo(onion)
|
||||
|
||||
log.Debugf("confo info lookup newgetval %v %v %v", onion, conversationInfo, err)
|
||||
// only accepted contacts can look up information
|
||||
if conversationInfo != nil && conversationInfo.GetPeerAC().ExchangeAttributes {
|
||||
// Type Safe Scoped/Zoned Path
|
||||
zscope := attr.IntoScope(scope)
|
||||
|
@ -1725,7 +1659,6 @@ func (cp *cwtchPeer) eventHandler() {
|
|||
|
||||
timestamp := time.Now().Format(time.RFC3339Nano)
|
||||
cp.SetConversationAttribute(cid, attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.AttrLastConnectionTime)), timestamp)
|
||||
|
||||
} else if connections.ConnectionStateToType()[ev.Data[event.ConnectionState]] == connections.DISCONNECTED {
|
||||
ci, err := cp.FetchConversationInfo(handle)
|
||||
if err == nil {
|
||||
|
@ -1740,7 +1673,19 @@ func (cp *cwtchPeer) eventHandler() {
|
|||
}
|
||||
|
||||
// Safe Access to Extensions
|
||||
cp.processExtensionsEvent(ev)
|
||||
cp.extensionLock.Lock()
|
||||
for _, extension := range cp.extensions {
|
||||
log.Debugf("checking extension...%v", extension)
|
||||
// check if the current map of experiments satisfies the extension requirements
|
||||
if !cp.checkExtensionExperiment(extension) {
|
||||
log.Debugf("skipping extension (%s) ..not all experiments satisfied", extension)
|
||||
continue
|
||||
}
|
||||
if cp.checkEventExperiment(extension, ev.EventType) {
|
||||
extension.extension.OnEvent(ev, cp)
|
||||
}
|
||||
}
|
||||
cp.extensionLock.Unlock()
|
||||
|
||||
case event.ServerStateChange:
|
||||
cp.mutex.Lock()
|
||||
|
@ -1879,21 +1824,6 @@ func (cp *cwtchPeer) attemptInsertOrAcknowledgeLegacyGroupConversation(conversat
|
|||
return err
|
||||
}
|
||||
|
||||
// finds a message by a signature by searching across all possible channels returns a channel and a message id
|
||||
func (cp *cwtchPeer) findChannelMessageBySignature(ci *model.Conversation, signature string) (int, int, error) {
|
||||
id, err := cp.GetChannelMessageBySignature(ci.ID, constants.CHANNEL_CHAT, signature)
|
||||
if err == nil {
|
||||
return constants.CHANNEL_CHAT, id, nil
|
||||
}
|
||||
if ci.HasChannel(constants.CHANNEL_MANAGER) {
|
||||
id, err = cp.GetChannelMessageBySignature(ci.ID, constants.CHANNEL_MANAGER, signature)
|
||||
if err == nil {
|
||||
return constants.CHANNEL_MANAGER, id, nil
|
||||
}
|
||||
}
|
||||
return -1, -1, err
|
||||
}
|
||||
|
||||
// attemptAcknowledgeP2PConversation is a convenience method that looks up the conversation
|
||||
// by the given handle and attempts to mark the message as acknowledged. returns error on failure
|
||||
// to either find the contact or the associated message
|
||||
|
@ -1901,19 +1831,21 @@ func (cp *cwtchPeer) attemptAcknowledgeP2PConversation(handle string, signature
|
|||
ci, err := cp.FetchConversationInfo(handle)
|
||||
// We should *never* received a peer acknowledgement for a conversation that doesn't exist...
|
||||
if ci != nil && err == nil {
|
||||
chid, mid, err := cp.findChannelMessageBySignature(ci, signature)
|
||||
// for p2p messages the randomly generated event ID is the "signature"
|
||||
id, err := cp.GetChannelMessageBySignature(ci.ID, 0, signature)
|
||||
if err == nil {
|
||||
_, attributes, err := cp.GetChannelMessage(ci.ID, chid, mid)
|
||||
_, attributes, err := cp.GetChannelMessage(ci.ID, 0, id)
|
||||
if err == nil {
|
||||
cp.mutex.Lock()
|
||||
attributes[constants.AttrAck] = constants.True
|
||||
cp.mutex.Unlock()
|
||||
cp.storage.UpdateMessageAttributes(ci.ID, chid, mid, attributes)
|
||||
cp.eventBus.Publish(event.NewEvent(event.IndexedAcknowledgement, map[event.Field]string{event.ConversationID: strconv.Itoa(ci.ID), event.RemotePeer: ci.Handle, event.Channel: strconv.Itoa(chid), event.Index: strconv.Itoa(mid)}))
|
||||
cp.storage.UpdateMessageAttributes(ci.ID, 0, id, attributes)
|
||||
cp.eventBus.Publish(event.NewEvent(event.IndexedAcknowledgement, map[event.Field]string{event.ConversationID: strconv.Itoa(ci.ID), event.RemotePeer: handle, event.Index: strconv.Itoa(id)}))
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
return fmt.Errorf("no such signature error: %x", signature)
|
||||
return err
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
@ -1927,16 +1859,16 @@ func (cp *cwtchPeer) attemptErrorConversationMessage(handle string, signature st
|
|||
// We should *never* received an error for a conversation that doesn't exist...
|
||||
if ci != nil && err == nil {
|
||||
// "signature" here is event ID for peer messages...
|
||||
chid, mid, err := cp.findChannelMessageBySignature(ci, signature)
|
||||
id, err := cp.GetChannelMessageBySignature(ci.ID, 0, signature)
|
||||
if err == nil {
|
||||
_, attributes, err := cp.GetChannelMessage(ci.ID, chid, mid)
|
||||
_, attributes, err := cp.GetChannelMessage(ci.ID, 0, id)
|
||||
if err == nil {
|
||||
cp.mutex.Lock()
|
||||
attributes[constants.AttrErr] = constants.True
|
||||
cp.storage.UpdateMessageAttributes(ci.ID, chid, mid, attributes)
|
||||
cp.storage.UpdateMessageAttributes(ci.ID, 0, id, attributes)
|
||||
cp.mutex.Unlock()
|
||||
// Send a generic indexed failure...
|
||||
cp.eventBus.Publish(event.NewEvent(event.IndexedFailure, map[event.Field]string{event.ConversationID: strconv.Itoa(ci.ID), event.Handle: handle, event.Error: error, event.Channel: strconv.Itoa(chid), event.Index: strconv.Itoa(mid)}))
|
||||
cp.eventBus.Publish(event.NewEvent(event.IndexedFailure, map[event.Field]string{event.ConversationID: strconv.Itoa(ci.ID), event.Handle: handle, event.Error: error, event.Index: strconv.Itoa(id)}))
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
|
@ -1975,45 +1907,3 @@ func (cp *cwtchPeer) constructGroupFromConversation(conversationInfo *model.Conv
|
|||
}
|
||||
return &group, nil
|
||||
}
|
||||
|
||||
// Insert a message directly into a conversation-channel, with the given attributes.
|
||||
// NOTE: This should only be used by cwtch-extensions. Regular API uses should use more direct methods
|
||||
// like SendMessage
|
||||
func (cp *cwtchPeer) InternalInsertMessage(conversation int, channel int, author string, body string, attrs model.Attributes, signature []byte) (int, error) {
|
||||
signatureB64 := base64.StdEncoding.EncodeToString(signature)
|
||||
return cp.storage.InsertMessage(conversation, channel, body, attrs, signatureB64, model.CalculateContentHash(author, body))
|
||||
}
|
||||
|
||||
func (cp *cwtchPeer) SignMessage(blob []byte) ([]byte, error) {
|
||||
privateKey, err := cp.storage.LoadProfileKeyValue(TypePrivateKey, "Ed25519PrivateKey")
|
||||
if err != nil {
|
||||
log.Errorf("error loading private key from storage")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
publicKey, err := cp.storage.LoadProfileKeyValue(TypePublicKey, "Ed25519PublicKey")
|
||||
if err != nil {
|
||||
log.Errorf("error loading public key from storage")
|
||||
return nil, err
|
||||
}
|
||||
identity := primitives.InitializeIdentity("", (*ed25519.PrivateKey)(&privateKey), (*ed25519.PublicKey)(&publicKey))
|
||||
return identity.Sign(blob), nil
|
||||
}
|
||||
|
||||
func (cp *cwtchPeer) processExtensionsEvent(ev event.Event) {
|
||||
cp.extensionLock.Lock()
|
||||
for _, extension := range cp.extensions {
|
||||
|
||||
// check if the current map of experiments satisfies the extension requirements
|
||||
if !cp.checkExtensionExperiment(extension) {
|
||||
log.Debugf("skipping extension (%v) ..not all experiments satisfied", extension)
|
||||
continue
|
||||
}
|
||||
|
||||
// if the extension is registered for this event type then process
|
||||
if _, contains := extension.events[ev.EventType]; contains {
|
||||
extension.extension.OnEvent(ev, cp)
|
||||
}
|
||||
}
|
||||
cp.extensionLock.Unlock()
|
||||
}
|
||||
|
|
|
@ -3,21 +3,19 @@ package peer
|
|||
import (
|
||||
"archive/tar"
|
||||
"compress/gzip"
|
||||
"cwtch.im/cwtch/event"
|
||||
"cwtch.im/cwtch/model"
|
||||
"cwtch.im/cwtch/model/attr"
|
||||
"database/sql"
|
||||
"errors"
|
||||
"fmt"
|
||||
"git.openprivacy.ca/openprivacy/log"
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"cwtch.im/cwtch/event"
|
||||
"cwtch.im/cwtch/model"
|
||||
"cwtch.im/cwtch/model/attr"
|
||||
"cwtch.im/cwtch/model/constants"
|
||||
"git.openprivacy.ca/openprivacy/log"
|
||||
)
|
||||
|
||||
// StorageKeyType is an interface wrapper around storage key types
|
||||
|
@ -89,7 +87,7 @@ const setConversationACLSQLStmt = `update conversations set ACL=(?) where ID=(?)
|
|||
const deleteConversationSQLStmt = `delete from conversations where ID=(?);`
|
||||
|
||||
// createTableConversationMessagesSQLStmt is a template for creating conversation based tables...
|
||||
const createTableConversationMessagesSQLStmt = `create table if not exists channel_%d_%d_chat (ID integer unique primary key autoincrement, Body text, Attributes []byte, Expiry datetime, Signature text unique, ContentHash blob text);`
|
||||
const createTableConversationMessagesSQLStmt = `create table if not exists channel_%d_0_chat (ID integer unique primary key autoincrement, Body text, Attributes []byte, Expiry datetime, Signature text unique, ContentHash blob text);`
|
||||
|
||||
// insertMessageIntoConversationSQLStmt is a template for creating conversation based tables...
|
||||
const insertMessageIntoConversationSQLStmt = `insert into channel_%d_%d_chat (Body, Attributes, Signature, ContentHash) values(?,?,?,?);`
|
||||
|
@ -326,7 +324,7 @@ func (cps *CwtchProfileStorage) NewConversation(handle string, attributes model.
|
|||
return -1, tx.Rollback()
|
||||
}
|
||||
|
||||
result, err = tx.Exec(fmt.Sprintf(createTableConversationMessagesSQLStmt, id, 0))
|
||||
result, err = tx.Exec(fmt.Sprintf(createTableConversationMessagesSQLStmt, id))
|
||||
if err != nil {
|
||||
log.Errorf("error executing transaction: %v", err)
|
||||
return -1, tx.Rollback()
|
||||
|
@ -347,27 +345,6 @@ func (cps *CwtchProfileStorage) NewConversation(handle string, attributes model.
|
|||
return int(conversationID), nil
|
||||
}
|
||||
|
||||
func (cps *CwtchProfileStorage) InitChannelOnConversation(id int, channel int) error {
|
||||
cps.mutex.Lock()
|
||||
defer cps.mutex.Unlock()
|
||||
tx, err := cps.db.Begin()
|
||||
if err != nil {
|
||||
log.Errorf("error executing transaction: %v", err)
|
||||
return tx.Rollback()
|
||||
}
|
||||
_, err = tx.Exec(fmt.Sprintf(createTableConversationMessagesSQLStmt, id, channel))
|
||||
if err != nil {
|
||||
log.Errorf("error executing transaction: %v", err)
|
||||
return tx.Rollback()
|
||||
}
|
||||
err = tx.Commit()
|
||||
if err != nil {
|
||||
log.Errorf("error executing transaction: %v", err)
|
||||
return tx.Rollback()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetConversationByHandle is a convenience method to fetch an active conversation by a handle
|
||||
// Usage Notes: This should **only** be used to look up p2p conversations by convention.
|
||||
// Ideally this function should not exist, and all lookups should happen by ID (this is currently
|
||||
|
@ -866,8 +843,7 @@ func (cps *CwtchProfileStorage) PurgeConversationChannel(conversation int, chann
|
|||
defer cps.mutex.Unlock()
|
||||
conversationStmt, err := cps.db.Prepare(fmt.Sprintf(purgeMessagesFromConversationSQLStmt, conversation, channel))
|
||||
if err != nil {
|
||||
// Not all tables exist so this error can be spurious. Instead of logging here
|
||||
// we will delegate handling to the caller...
|
||||
log.Errorf("error executing transaction: %v", err)
|
||||
return err
|
||||
}
|
||||
conversationStmt.Exec()
|
||||
|
@ -891,7 +867,7 @@ func (cps *CwtchProfileStorage) PurgeNonSavedMessages() {
|
|||
if err == nil {
|
||||
for _, conversation := range ci {
|
||||
// unless this is a server or a group...for which we default save always (for legacy reasons)
|
||||
// Note: Hybrid Groups are Peers so this check does not apply...
|
||||
// FIXME: revisit this for hybrid groups.
|
||||
if !conversation.IsGroup() && !conversation.IsServer() {
|
||||
// Note that we only check for confirmed status here...if it is set to any other value we will fallthrough to the default.
|
||||
saveHistoryConfirmed := conversation.Attributes[attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(event.SaveHistoryKey)).ToString()] == event.SaveHistoryConfirmed
|
||||
|
@ -902,10 +878,7 @@ func (cps *CwtchProfileStorage) PurgeNonSavedMessages() {
|
|||
if deleteHistoryConfirmed || (!saveHistoryConfirmed && !defaultSave) {
|
||||
log.Debugf("purging conversation...")
|
||||
// TODO: At some point in the future this needs to iterate over channels and make a decision for each on..
|
||||
// TODO: We need a way to differentiate between channels that don't exist, and database errors
|
||||
// this should probably incorporate higher level logic (e.g. has channel...)
|
||||
cps.PurgeConversationChannel(conversation.ID, constants.CHANNEL_CHAT)
|
||||
cps.PurgeConversationChannel(conversation.ID, constants.CHANNEL_MANAGER)
|
||||
cps.PurgeConversationChannel(conversation.ID, 0)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -50,8 +50,6 @@ type ModifyServers interface {
|
|||
|
||||
// SendMessages enables a caller to sender messages to a contact
|
||||
type SendMessages interface {
|
||||
// SendMessage sends a raw message to the conversation.
|
||||
// SendMessage is a deprecated public API. Use EnhancedSendMessage instead
|
||||
SendMessage(conversation int, message string) (int, error)
|
||||
|
||||
// EnhancedSendMessage Attempts to Send a Message and Immediately Attempts to Lookup the Message in the Database
|
||||
|
@ -117,8 +115,6 @@ type CwtchPeer interface {
|
|||
EnhancedImportBundle(string) string
|
||||
|
||||
// New Unified Conversation Interfaces
|
||||
NewConversation(handle string, acl model.AccessControlList) (int, error)
|
||||
InitChannel(conversation int, channel int) error
|
||||
NewContactConversation(handle string, acl model.AccessControl, accepted bool) (int, error)
|
||||
FetchConversations() ([]*model.Conversation, error)
|
||||
ArchiveConversation(conversation int)
|
||||
|
@ -139,15 +135,12 @@ type CwtchPeer interface {
|
|||
|
||||
SetConversationAttribute(conversation int, path attr.ScopedZonedPath, value string) error
|
||||
GetConversationAttribute(conversation int, path attr.ScopedZonedPath) (string, error)
|
||||
SetConversationAttributeInt(conversation int, path attr.ScopedZonedPath, value int) error
|
||||
GetConversationAttributeInt(conversation int, path attr.ScopedZonedPath) (int, error)
|
||||
DeleteConversation(conversation int) error
|
||||
|
||||
// New Unified Conversation Channel Interfaces
|
||||
GetChannelMessage(conversation int, channel int, id int) (string, model.Attributes, error)
|
||||
GetChannelMessageCount(conversation int, channel int) (int, error)
|
||||
GetChannelMessageByContentHash(conversation int, channel int, contenthash string) (int, error)
|
||||
GetChannelMessageBySignature(conversationID int, channelID int, signature string) (int, error)
|
||||
GetMostRecentMessages(conversation int, channel int, offset int, limit uint) ([]model.ConversationMessage, error)
|
||||
UpdateMessageAttribute(conversation int, channel int, id int, key string, value string) error
|
||||
SearchConversations(pattern string) string
|
||||
|
@ -175,10 +168,6 @@ type CwtchPeer interface {
|
|||
UpdateExperiments(enabled bool, experiments map[string]bool)
|
||||
NotifySettingsUpdate(settings settings.GlobalSettings)
|
||||
IsFeatureEnabled(featureName string) bool
|
||||
SignMessage(blob []byte) ([]byte, error)
|
||||
|
||||
// Used for Internal Bookkeeping by Extensions, **do not expose in autobindings**
|
||||
InternalInsertMessage(conversation int, channel int, author string, body string, attributes model.Attributes, signature []byte) (int, error)
|
||||
}
|
||||
|
||||
// EnhancedMessage wraps a Cwtch model.Message with some additional data to reduce calls from the UI.
|
||||
|
|
|
@ -749,16 +749,6 @@ func (e *engine) handlePeerMessage(hostname string, eventID string, context stri
|
|||
// Fall through handler for the default text conversation.
|
||||
e.eventManager.Publish(event.NewEvent(event.NewMessageFromPeerEngine, map[event.Field]string{event.TimestampReceived: time.Now().Format(time.RFC3339Nano), event.RemotePeer: hostname, event.Data: string(message)}))
|
||||
|
||||
// Don't ack messages in channel 7
|
||||
// Note: this code explictly doesn't care about malformed messages, we deal with them
|
||||
// later on...we still want to ack the original send...(as some "malformed" messages
|
||||
// may be future-ok)
|
||||
if cm, err := model.DeserializeMessage(string(message)); err == nil {
|
||||
if cm.IsStream() {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Send an explicit acknowledgement
|
||||
// Every other protocol should have an explicit acknowledgement message e.g. value lookups have responses, and file handling has an explicit flow
|
||||
if err := e.sendPeerMessage(hostname, pmodel.PeerMessage{ID: eventID, Context: event.ContextAck, Data: []byte{}}); err != nil {
|
||||
|
|
|
@ -2,14 +2,12 @@ package connections
|
|||
|
||||
import (
|
||||
"cwtch.im/cwtch/event"
|
||||
"cwtch.im/cwtch/model"
|
||||
model2 "cwtch.im/cwtch/protocol/model"
|
||||
"encoding/json"
|
||||
"git.openprivacy.ca/cwtch.im/tapir"
|
||||
"git.openprivacy.ca/cwtch.im/tapir/applications"
|
||||
"git.openprivacy.ca/openprivacy/log"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
)
|
||||
|
||||
const cwtchCapability = tapir.Capability("cwtchCapability")
|
||||
|
@ -135,14 +133,6 @@ func (pa *PeerApp) listen() {
|
|||
pa.version.Store(Version2)
|
||||
}
|
||||
} else {
|
||||
if cm, err := model.DeserializeMessage(string(packet.Data)); err == nil {
|
||||
if cm.TransitTime != nil {
|
||||
rt := time.Now().UTC()
|
||||
cm.RecvTime = &rt
|
||||
data, _ := json.Marshal(cm)
|
||||
packet.Data = data
|
||||
}
|
||||
}
|
||||
pa.MessageHandler(pa.connection.Hostname(), packet.ID, packet.Context, packet.Data)
|
||||
}
|
||||
}
|
||||
|
@ -158,15 +148,6 @@ func (pa *PeerApp) SendMessage(message model2.PeerMessage) error {
|
|||
var serialized []byte
|
||||
var err error
|
||||
|
||||
if cm, err := model.DeserializeMessage(string(message.Data)); err == nil {
|
||||
if cm.SendTime != nil {
|
||||
tt := time.Now().UTC()
|
||||
cm.TransitTime = &tt
|
||||
data, _ := json.Marshal(cm)
|
||||
message.Data = data
|
||||
}
|
||||
}
|
||||
|
||||
if pa.version.Load() == Version2 {
|
||||
// treat data as a pre-serialized string, not as a byte array (which will be base64 encoded and bloat the packet size)
|
||||
serialized = message.Serialize()
|
||||
|
|
|
@ -178,7 +178,7 @@ func TestFileSharing(t *testing.T) {
|
|||
// testBobDownloadFile(t, bob, filesharingFunctionality, queueOracle)
|
||||
|
||||
// Wait for say...
|
||||
time.Sleep(30 * time.Second)
|
||||
time.Sleep(10 * time.Second)
|
||||
|
||||
if _, err := os.Stat(path.Join(settings.DownloadPath, "cwtch.png")); errors.Is(err, os.ErrNotExist) {
|
||||
// path/to/whatever does not exist
|
||||
|
|
|
@ -34,6 +34,26 @@ var (
|
|||
carolLines = []string{"Howdy, thanks!"}
|
||||
)
|
||||
|
||||
func waitForConnection(t *testing.T, peer peer.CwtchPeer, addr string, target connections.ConnectionState) {
|
||||
peerName, _ := peer.GetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.Name)
|
||||
for {
|
||||
log.Infof("%v checking connection...\n", peerName)
|
||||
state := peer.GetPeerState(addr)
|
||||
log.Infof("Waiting for Peer %v to %v - state: %v\n", peerName, addr, connections.ConnectionStateName[state])
|
||||
if state == connections.FAILED {
|
||||
t.Fatalf("%v could not connect to %v", peer.GetOnion(), addr)
|
||||
}
|
||||
if state != target {
|
||||
log.Infof("peer %v %v waiting connect %v, currently: %v\n", peerName, peer.GetOnion(), addr, connections.ConnectionStateName[state])
|
||||
time.Sleep(time.Second * 5)
|
||||
continue
|
||||
} else {
|
||||
log.Infof("peer %v %v CONNECTED to %v\n", peerName, peer.GetOnion(), addr)
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func waitForRetVal(peer peer.CwtchPeer, convId int, szp attr.ScopedZonedPath) {
|
||||
for {
|
||||
_, err := peer.GetConversationAttribute(convId, szp)
|
||||
|
@ -216,10 +236,10 @@ func TestCwtchPeerIntegration(t *testing.T) {
|
|||
t.Fatalf("Alice password did not change...")
|
||||
}
|
||||
|
||||
WaitForConnection(t, alice, bob.GetOnion(), connections.AUTHENTICATED)
|
||||
WaitForConnection(t, alice, carol.GetOnion(), connections.AUTHENTICATED)
|
||||
WaitForConnection(t, bob, alice.GetOnion(), connections.AUTHENTICATED)
|
||||
WaitForConnection(t, carol, alice.GetOnion(), connections.AUTHENTICATED)
|
||||
waitForConnection(t, alice, bob.GetOnion(), connections.AUTHENTICATED)
|
||||
waitForConnection(t, alice, carol.GetOnion(), connections.AUTHENTICATED)
|
||||
waitForConnection(t, bob, alice.GetOnion(), connections.AUTHENTICATED)
|
||||
waitForConnection(t, carol, alice.GetOnion(), connections.AUTHENTICATED)
|
||||
|
||||
log.Infof("Alice and Bob getVal public.name...")
|
||||
|
||||
|
@ -303,9 +323,9 @@ func TestCwtchPeerIntegration(t *testing.T) {
|
|||
}
|
||||
|
||||
log.Infof("Waiting for alice to join server...")
|
||||
WaitForConnection(t, alice, ServerAddr, connections.SYNCED)
|
||||
waitForConnection(t, alice, ServerAddr, connections.SYNCED)
|
||||
log.Infof("Waiting for Bob to join connect to group server...")
|
||||
WaitForConnection(t, bob, ServerAddr, connections.SYNCED)
|
||||
waitForConnection(t, bob, ServerAddr, connections.SYNCED)
|
||||
|
||||
// 1 = Alice
|
||||
// 2 = Server
|
||||
|
@ -331,7 +351,7 @@ func TestCwtchPeerIntegration(t *testing.T) {
|
|||
if len(cachedTokens) > (usedTokens + len(carolLines)) {
|
||||
carol.StoreCachedTokens(ServerAddr, cachedTokens[usedTokens:usedTokens+len(carolLines)])
|
||||
}
|
||||
WaitForConnection(t, carol, ServerAddr, connections.SYNCED)
|
||||
waitForConnection(t, carol, ServerAddr, connections.SYNCED)
|
||||
numGoRoutinesPostCarolConnect := runtime.NumGoroutine()
|
||||
|
||||
// Check Alice Timeline
|
||||
|
|
|
@ -58,7 +58,7 @@ func TestFileSharing(t *testing.T) {
|
|||
os.RemoveAll("cwtch.out.png")
|
||||
os.RemoveAll("cwtch.out.png.manifest")
|
||||
|
||||
log.SetLevel(log.LevelDebug)
|
||||
log.SetLevel(log.LevelInfo)
|
||||
log.ExcludeFromPattern("tapir")
|
||||
|
||||
os.Mkdir("tordir", 0700)
|
||||
|
@ -151,6 +151,13 @@ func TestFileSharing(t *testing.T) {
|
|||
|
||||
bob.NewContactConversation(alice.GetOnion(), model.DefaultP2PAccessControl(), true)
|
||||
alice.NewContactConversation(bob.GetOnion(), model.DefaultP2PAccessControl(), true)
|
||||
alice.PeerWithOnion(bob.GetOnion())
|
||||
|
||||
t.Logf("Waiting for alice and Bob to peer...")
|
||||
waitForPeerPeerConnection(t, alice, bob)
|
||||
alice.AcceptConversation(1)
|
||||
|
||||
t.Logf("Alice and Bob are Connected!!")
|
||||
|
||||
filesharingFunctionality := filesharing.FunctionalityGate()
|
||||
|
||||
|
@ -160,10 +167,10 @@ func TestFileSharing(t *testing.T) {
|
|||
}
|
||||
|
||||
alice.SendMessage(1, fileSharingMessage)
|
||||
bob.AcceptConversation(1)
|
||||
|
||||
// Ok this is fun...we just Sent a Message we may not have a connection yet...
|
||||
// so this test will only pass if sending offline works...
|
||||
waitForPeerPeerConnection(t, bob, alice)
|
||||
// Wait for the messages to arrive...
|
||||
time.Sleep(time.Second * 10)
|
||||
|
||||
bob.SendMessage(1, "this is a test message")
|
||||
bob.SendMessage(1, "this is another test message")
|
||||
|
|
|
@ -1,214 +0,0 @@
|
|||
package testing
|
||||
|
||||
import (
|
||||
"crypto/rand"
|
||||
"encoding/base64"
|
||||
"fmt"
|
||||
mrand "math/rand"
|
||||
"os"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"runtime/pprof"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
app2 "cwtch.im/cwtch/app"
|
||||
"cwtch.im/cwtch/event"
|
||||
"cwtch.im/cwtch/functionality/hybrid"
|
||||
"cwtch.im/cwtch/functionality/inter"
|
||||
"cwtch.im/cwtch/model/constants"
|
||||
"cwtch.im/cwtch/peer"
|
||||
"cwtch.im/cwtch/protocol/connections"
|
||||
"git.openprivacy.ca/openprivacy/connectivity/tor"
|
||||
"git.openprivacy.ca/openprivacy/log"
|
||||
_ "github.com/mutecomm/go-sqlcipher/v4"
|
||||
)
|
||||
|
||||
func TestHyrbidGroupIntegration(t *testing.T) {
|
||||
|
||||
t.Logf("Starting Hybrid Groups Test")
|
||||
|
||||
os.RemoveAll("./storage")
|
||||
os.RemoveAll("./managerstorage")
|
||||
|
||||
// Goroutine Monitoring Start..
|
||||
numGoRoutinesStart := runtime.NumGoroutine()
|
||||
|
||||
log.AddEverythingFromPattern("connectivity")
|
||||
log.SetLevel(log.LevelInfo)
|
||||
log.ExcludeFromPattern("connection/connection")
|
||||
log.ExcludeFromPattern("outbound/3dhauthchannel")
|
||||
log.ExcludeFromPattern("event/eventmanager")
|
||||
log.ExcludeFromPattern("tapir")
|
||||
|
||||
os.Mkdir("tordir", 0700)
|
||||
dataDir := path.Join("tordir", "tor")
|
||||
os.MkdirAll(dataDir, 0700)
|
||||
|
||||
// we don't need real randomness for the port, just to avoid a possible conflict...
|
||||
socksPort := mrand.Intn(1000) + 9051
|
||||
controlPort := mrand.Intn(1000) + 9052
|
||||
|
||||
// generate a random password
|
||||
key := make([]byte, 64)
|
||||
_, err := rand.Read(key)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
useCache := os.Getenv("TORCACHE") == "true"
|
||||
|
||||
torDataDir := ""
|
||||
if useCache {
|
||||
log.Infof("using tor cache")
|
||||
torDataDir = filepath.Join(dataDir, "data-dir-torcache")
|
||||
os.MkdirAll(torDataDir, 0700)
|
||||
} else {
|
||||
log.Infof("using clean tor data dir")
|
||||
if torDataDir, err = os.MkdirTemp(dataDir, "data-dir-"); err != nil {
|
||||
t.Fatalf("could not create data dir")
|
||||
}
|
||||
}
|
||||
|
||||
tor.NewTorrc().WithSocksPort(socksPort).WithOnionTrafficOnly().WithHashedPassword(base64.StdEncoding.EncodeToString(key)).WithControlPort(controlPort).Build("tordir/tor/torrc")
|
||||
acn, err := tor.NewTorACNWithAuth("./tordir", path.Join("..", "tor"), torDataDir, controlPort, tor.HashedPasswordAuthenticator{Password: base64.StdEncoding.EncodeToString(key)})
|
||||
if err != nil {
|
||||
t.Fatalf("Could not start Tor: %v", err)
|
||||
}
|
||||
log.Infof("Waiting for tor to bootstrap...")
|
||||
acn.WaitTillBootstrapped()
|
||||
defer acn.Close()
|
||||
|
||||
// ***** Cwtch Server management *****
|
||||
|
||||
app := app2.NewApp(acn, "./storage", app2.LoadAppSettings("./storage"))
|
||||
|
||||
// ***** cwtchPeer setup *****
|
||||
// Turn on Groups Experiment...
|
||||
settings := app.ReadSettings()
|
||||
settings.ExperimentsEnabled = true
|
||||
settings.Experiments[constants.GroupsExperiment] = true
|
||||
settings.Experiments[constants.GroupManagerExperiment] = true
|
||||
app.UpdateSettings(settings)
|
||||
|
||||
alice := MakeProfile(app, "Alice")
|
||||
bob := MakeProfile(app, "Bob")
|
||||
manager := MakeProfile(app, "Manager")
|
||||
|
||||
waitTime := time.Duration(60) * time.Second
|
||||
log.Infof("** Waiting for Alice, Bob, and Carol to register their onion hidden service on the network... (%v)\n", waitTime)
|
||||
time.Sleep(waitTime)
|
||||
log.Infof("** Wait Done!")
|
||||
|
||||
// Ok Lets Start By Creating a Hybrid Group...
|
||||
|
||||
hgmf := hybrid.GroupManagerFunctionality{}
|
||||
ci, err := hgmf.ManageNewGroup(manager)
|
||||
if err != nil {
|
||||
t.Fatalf("could not create hybrid group: %v", err)
|
||||
}
|
||||
log.Infof("created a hybrid group: %d. moving onto adding hybrid contacts...", ci)
|
||||
err = hgmf.AddHybridContact(manager, alice.GetOnion())
|
||||
if err != nil {
|
||||
t.Fatalf("could not create hybrid contact (alice): %v", err)
|
||||
}
|
||||
err = hgmf.AddHybridContact(manager, bob.GetOnion())
|
||||
if err != nil {
|
||||
t.Fatalf("could not create hybrid contact (bob): %v", err)
|
||||
}
|
||||
|
||||
// Now we can allow alice, bob and carol to create a new hybrid group...
|
||||
log.Infof("now we can allow alice bob and carol to join the hybrid group")
|
||||
inter := inter.InterfaceFunctionality{}
|
||||
err = inter.ImportBundle(alice, "managed:"+manager.GetOnion())
|
||||
if err != nil {
|
||||
t.Fatalf("could not create hybrid group contact (carol): %v", err)
|
||||
}
|
||||
alice.PeerWithOnion(manager.GetOnion()) // explictly trigger a peer request
|
||||
err = inter.ImportBundle(bob, "managed:"+manager.GetOnion())
|
||||
if err != nil {
|
||||
t.Fatalf("could not create hybrid group contact (carol): %v", err)
|
||||
}
|
||||
bob.PeerWithOnion(manager.GetOnion())
|
||||
|
||||
log.Infof("waiting for alice and manager to connect")
|
||||
WaitForConnection(t, alice, manager.GetOnion(), connections.AUTHENTICATED)
|
||||
log.Infof("waiting for bob and manager to connect")
|
||||
WaitForConnection(t, bob, manager.GetOnion(), connections.AUTHENTICATED)
|
||||
|
||||
// at this pont we should be able to send messages to the group, and receive them in the timeline
|
||||
log.Infof("sending message to group")
|
||||
_, err = inter.SendMessage(alice, 1, "hello everyone!!!")
|
||||
if err != nil {
|
||||
t.Fatalf("hybrid group sending failed... %v", err)
|
||||
}
|
||||
|
||||
// Note: From this point onwards there are no managed-group specific calls. Everything happens
|
||||
// transparently with respect to the receiver.
|
||||
time.Sleep(time.Second * 10)
|
||||
|
||||
bobMessages, err := bob.GetMostRecentMessages(1, constants.CHANNEL_CHAT, 0, 1)
|
||||
if err != nil || len(bobMessages) != 1 {
|
||||
t.Fatalf("hybrid group receipt failed... %v %v ", err, len(bobMessages))
|
||||
}
|
||||
|
||||
if bobMessages[0].Body != "hello everyone!!!" {
|
||||
t.Fatalf("hybrid group receipt failed...message does not match")
|
||||
}
|
||||
|
||||
aliceMessages, err := alice.GetMostRecentMessages(1, constants.CHANNEL_CHAT, 0, 1)
|
||||
if err != nil || len(aliceMessages) != 1 {
|
||||
t.Fatalf("hybrid group receipt failed... %v", err)
|
||||
}
|
||||
|
||||
if aliceMessages[0].Attr[constants.AttrAck] != constants.True {
|
||||
t.Fatalf("hybrid group receipt failed...alice's message was not ack'd")
|
||||
}
|
||||
|
||||
// Time to Clean Up....
|
||||
log.Infof("Shutting down Alice...")
|
||||
app.ShutdownPeer(alice.GetOnion())
|
||||
time.Sleep(time.Second * 3)
|
||||
|
||||
log.Infof("Shutting down Bob...")
|
||||
app.ShutdownPeer(bob.GetOnion())
|
||||
time.Sleep(time.Second * 3)
|
||||
|
||||
log.Infof("Shutting fown Manager...")
|
||||
app.ShutdownPeer(manager.GetOnion())
|
||||
time.Sleep(time.Second * 3)
|
||||
|
||||
log.Infof("Shutting down apps...")
|
||||
log.Infof("app Shutdown: %v\n", runtime.NumGoroutine())
|
||||
app.Shutdown()
|
||||
|
||||
time.Sleep(2 * time.Second)
|
||||
|
||||
log.Infof("Done shutdown: %v\n", runtime.NumGoroutine())
|
||||
|
||||
log.Infof("Shutting down ACN...")
|
||||
acn.Close()
|
||||
time.Sleep(time.Second * 60) // the network status / heartbeat plugin might keep goroutines alive for a minute before killing them
|
||||
|
||||
numGoRoutinesPostAppShutdown := runtime.NumGoroutine()
|
||||
|
||||
// Printing out the current goroutines
|
||||
// Very useful if we are leaking any.
|
||||
pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
|
||||
fmt.Println("")
|
||||
|
||||
if numGoRoutinesStart != numGoRoutinesPostAppShutdown {
|
||||
t.Errorf("Number of GoRoutines at start (%v) does not match number of goRoutines after cleanup of peers and servers (%v), clean up failed, v detected!", numGoRoutinesStart, numGoRoutinesPostAppShutdown)
|
||||
}
|
||||
}
|
||||
|
||||
func MakeProfile(application app2.Application, name string) peer.CwtchPeer {
|
||||
application.CreateProfile(name, "asdfasdf", true)
|
||||
p := app2.WaitGetPeer(application, name)
|
||||
application.ConfigureConnections(p.GetOnion(), true, true, false)
|
||||
log.Infof("%s created: %s", name, p.GetOnion())
|
||||
// bob.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Bob") <- This is now done automatically by ProfileValueExtension, keeping this here for clarity
|
||||
p.AutoHandleEvents([]event.Type{event.PeerStateChange, event.ServerStateChange, event.NewGroupInvite, event.NewRetValMessageFromPeer})
|
||||
return p
|
||||
}
|
|
@ -1,32 +0,0 @@
|
|||
package testing
|
||||
|
||||
import (
|
||||
"cwtch.im/cwtch/model/attr"
|
||||
"cwtch.im/cwtch/model/constants"
|
||||
"cwtch.im/cwtch/peer"
|
||||
"cwtch.im/cwtch/protocol/connections"
|
||||
"git.openprivacy.ca/openprivacy/log"
|
||||
_ "github.com/mutecomm/go-sqlcipher/v4"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func WaitForConnection(t *testing.T, peer peer.CwtchPeer, addr string, target connections.ConnectionState) {
|
||||
peerName, _ := peer.GetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.Name)
|
||||
for {
|
||||
log.Infof("%v checking connection...\n", peerName)
|
||||
state := peer.GetPeerState(addr)
|
||||
log.Infof("Waiting for Peer %v to %v - state: %v\n", peerName, addr, connections.ConnectionStateName[state])
|
||||
if state == connections.FAILED {
|
||||
t.Fatalf("%v could not connect to %v", peer.GetOnion(), addr)
|
||||
}
|
||||
if state != target {
|
||||
log.Infof("peer %v %v waiting connect %v, currently: %v\n", peerName, peer.GetOnion(), addr, connections.ConnectionStateName[state])
|
||||
time.Sleep(time.Second * 5)
|
||||
continue
|
||||
} else {
|
||||
log.Infof("peer %v %v CONNECTED to %v\n", peerName, peer.GetOnion(), addr)
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue