Initial Prototype of Event Hooks
This commit is contained in:
parent
697b3df54c
commit
26c5c11216
13
app/app.go
13
app/app.go
|
@ -3,6 +3,8 @@ package app
|
|||
import (
|
||||
"cwtch.im/cwtch/app/plugins"
|
||||
"cwtch.im/cwtch/event"
|
||||
"cwtch.im/cwtch/extensions"
|
||||
"cwtch.im/cwtch/functionality/filesharing"
|
||||
"cwtch.im/cwtch/model"
|
||||
"cwtch.im/cwtch/model/attr"
|
||||
"cwtch.im/cwtch/model/constants"
|
||||
|
@ -145,6 +147,7 @@ func (app *application) CreatePeer(name string, password string, attributes map[
|
|||
eventBus := event.NewEventManager()
|
||||
app.eventBuses[profile.GetOnion()] = eventBus
|
||||
profile.Init(app.eventBuses[profile.GetOnion()])
|
||||
app.registerHooks(profile)
|
||||
app.peers[profile.GetOnion()] = profile
|
||||
|
||||
for zp, val := range attributes {
|
||||
|
@ -242,6 +245,13 @@ func (app *application) LoadProfiles(password string) {
|
|||
}
|
||||
}
|
||||
|
||||
func (app *application) registerHooks(profile peer.CwtchPeer) {
|
||||
// Register Hooks
|
||||
profile.RegisterHook(extensions.ProfileValueExtension{})
|
||||
profile.RegisterHook(filesharing.Functionality{})
|
||||
|
||||
}
|
||||
|
||||
// installProfile takes a profile and if it isn't loaded in the app, installs it and returns true
|
||||
func (app *application) installProfile(profile peer.CwtchPeer) bool {
|
||||
app.appmutex.Lock()
|
||||
|
@ -252,8 +262,11 @@ func (app *application) installProfile(profile peer.CwtchPeer) bool {
|
|||
eventBus := event.NewEventManager()
|
||||
app.eventBuses[profile.GetOnion()] = eventBus
|
||||
profile.Init(app.eventBuses[profile.GetOnion()])
|
||||
app.registerHooks(profile)
|
||||
app.peers[profile.GetOnion()] = profile
|
||||
|
||||
app.AddPeerPlugin(profile.GetOnion(), plugins.CONNECTIONRETRY) // Now Mandatory
|
||||
|
||||
app.appBus.Publish(event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.GetOnion(), event.Created: event.False}))
|
||||
return true
|
||||
}
|
||||
|
|
|
@ -0,0 +1,65 @@
|
|||
package extensions
|
||||
|
||||
import (
|
||||
"cwtch.im/cwtch/event"
|
||||
"cwtch.im/cwtch/model"
|
||||
"cwtch.im/cwtch/model/attr"
|
||||
"cwtch.im/cwtch/model/constants"
|
||||
"cwtch.im/cwtch/peer"
|
||||
"git.openprivacy.ca/openprivacy/log"
|
||||
"strconv"
|
||||
)
|
||||
|
||||
// ProfileValueExtension implements custom Profile Names over Cwtch
|
||||
type ProfileValueExtension struct {
|
||||
}
|
||||
|
||||
func (pne ProfileValueExtension) RegisterEvents() []event.Type {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (pne ProfileValueExtension) RegisterExperiments() []string {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (pne ProfileValueExtension) OnEvent(event event.Event, profile peer.CwtchPeer) {
|
||||
// nop
|
||||
}
|
||||
|
||||
// OnContactReceiveValue for ProfileValueExtension handles saving specific Public Profile Values like Profile Name
|
||||
func (pne ProfileValueExtension) OnContactReceiveValue(profile peer.CwtchPeer, conversation model.Conversation, szp attr.ScopedZonedPath, value string, exists bool) {
|
||||
// Allow public profile parameters to be added as contact specific attributes...
|
||||
scope, zone, _ := szp.GetScopeZonePath()
|
||||
if exists && scope.IsPublic() && zone == attr.ProfileZone {
|
||||
err := profile.SetConversationAttribute(conversation.ID, szp, value)
|
||||
if err != nil {
|
||||
log.Errorf("error setting conversation attribute %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// OnContactRequestValue for ProfileValueExtension handles returning Public Profile Values
|
||||
func (pne ProfileValueExtension) OnContactRequestValue(profile peer.CwtchPeer, conversation model.Conversation, eventID string, szp attr.ScopedZonedPath) {
|
||||
scope, zone, zpath := szp.GetScopeZonePath()
|
||||
log.Infof("Looking up public | conversation scope/zone %v", szp.ToString())
|
||||
if scope.IsPublic() || scope.IsConversation() {
|
||||
val, exists := profile.GetScopedZonedAttribute(scope, zone, zpath)
|
||||
|
||||
// NOTE: Temporary Override because UI currently wipes names if it can't find them...
|
||||
if !exists && zone == attr.UnknownZone && zpath == constants.Name {
|
||||
val, exists = profile.GetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name)
|
||||
}
|
||||
|
||||
// Construct a Response
|
||||
resp := event.NewEvent(event.SendRetValMessageToPeer, map[event.Field]string{event.ConversationID: strconv.Itoa(conversation.ID), event.RemotePeer: conversation.Handle, event.Exists: strconv.FormatBool(exists)})
|
||||
resp.EventID = eventID
|
||||
if exists {
|
||||
resp.Data[event.Data] = val
|
||||
} else {
|
||||
resp.Data[event.Data] = ""
|
||||
}
|
||||
|
||||
log.Debugf("Responding with SendRetValMessageToPeer exists:%v data: %v\n", exists, val)
|
||||
profile.PublishEvent(resp)
|
||||
}
|
||||
}
|
|
@ -2,12 +2,14 @@ package filesharing
|
|||
|
||||
import (
|
||||
"crypto/rand"
|
||||
"cwtch.im/cwtch/event"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"math"
|
||||
"math/bits"
|
||||
"os"
|
||||
path "path/filepath"
|
||||
"regexp"
|
||||
|
@ -28,6 +30,101 @@ import (
|
|||
type Functionality struct {
|
||||
}
|
||||
|
||||
func (f Functionality) RegisterEvents() []event.Type {
|
||||
return []event.Type{event.ManifestReceived, event.FileDownloaded}
|
||||
}
|
||||
|
||||
func (f Functionality) RegisterExperiments() []string {
|
||||
return []string{constants.FileSharingExperiment}
|
||||
}
|
||||
|
||||
// OnEvent handles File Sharing Hooks like Manifest Received and FileDownloaded
|
||||
func (f Functionality) OnEvent(ev event.Event, profile peer.CwtchPeer) {
|
||||
switch ev.EventType {
|
||||
case event.ManifestReceived:
|
||||
log.Debugf("Manifest Received Event!: %v", ev)
|
||||
handle := ev.Data[event.Handle]
|
||||
fileKey := ev.Data[event.FileKey]
|
||||
serializedManifest := ev.Data[event.SerializedManifest]
|
||||
|
||||
manifestFilePath, exists := profile.GetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%v.manifest", fileKey))
|
||||
if exists {
|
||||
downloadFilePath, exists := profile.GetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%v.path", fileKey))
|
||||
if exists {
|
||||
log.Debugf("downloading manifest to %v, file to %v", manifestFilePath, downloadFilePath)
|
||||
var manifest files.Manifest
|
||||
err := json.Unmarshal([]byte(serializedManifest), &manifest)
|
||||
|
||||
if err == nil {
|
||||
// We only need to check the file size here, as manifest is sent to engine and the file created
|
||||
// will be bound to the size advertised in manifest.
|
||||
fileSizeLimitValue, fileSizeLimitExists := profile.GetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%v.limit", fileKey))
|
||||
if fileSizeLimitExists {
|
||||
fileSizeLimit, err := strconv.ParseUint(fileSizeLimitValue, 10, bits.UintSize)
|
||||
if err == nil {
|
||||
if manifest.FileSizeInBytes >= fileSizeLimit {
|
||||
log.Errorf("could not download file, size %v greater than limit %v", manifest.FileSizeInBytes, fileSizeLimitValue)
|
||||
} else {
|
||||
manifest.Title = manifest.FileName
|
||||
manifest.FileName = downloadFilePath
|
||||
log.Debugf("saving manifest")
|
||||
err = manifest.Save(manifestFilePath)
|
||||
if err != nil {
|
||||
log.Errorf("could not save manifest: %v", err)
|
||||
} else {
|
||||
tempFile := ""
|
||||
if runtime.GOOS == "android" {
|
||||
tempFile = manifestFilePath[0 : len(manifestFilePath)-len(".manifest")]
|
||||
log.Debugf("derived android temp path: %v", tempFile)
|
||||
}
|
||||
profile.PublishEvent(event.NewEvent(event.ManifestSaved, map[event.Field]string{
|
||||
event.FileKey: fileKey,
|
||||
event.Handle: handle,
|
||||
event.SerializedManifest: string(manifest.Serialize()),
|
||||
event.TempFile: tempFile,
|
||||
event.NameSuggestion: manifest.Title,
|
||||
}))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
log.Errorf("error saving manifest: %v", err)
|
||||
}
|
||||
} else {
|
||||
log.Errorf("found manifest path but not download path for %v", fileKey)
|
||||
}
|
||||
} else {
|
||||
log.Errorf("no download path found for manifest: %v", fileKey)
|
||||
}
|
||||
case event.FileDownloaded:
|
||||
fileKey := ev.Data[event.FileKey]
|
||||
profile.SetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%s.complete", fileKey), "true")
|
||||
}
|
||||
}
|
||||
|
||||
func (f Functionality) OnContactRequestValue(profile peer.CwtchPeer, conversation model.Conversation, eventID string, path attr.ScopedZonedPath) {
|
||||
// nop
|
||||
}
|
||||
|
||||
func (f Functionality) OnContactReceiveValue(profile peer.CwtchPeer, conversation model.Conversation, path attr.ScopedZonedPath, value string, exists bool) {
|
||||
scope, zone, zpath := path.GetScopeZonePath()
|
||||
log.Infof("file sharing contact receive value")
|
||||
if exists && scope.IsConversation() && zone == attr.FilesharingZone && strings.HasSuffix(zpath, ".manifest.size") {
|
||||
fileKey := strings.Replace(zpath, ".manifest.size", "", 1)
|
||||
size, err := strconv.Atoi(value)
|
||||
// if size is valid and below the maximum size for a manifest
|
||||
// this is to prevent malicious sharers from using large amounts of memory when distributing
|
||||
// a manifest as we reconstruct this in-memory
|
||||
if err == nil && size < files.MaxManifestSize {
|
||||
profile.PublishEvent(event.NewEvent(event.ManifestSizeReceived, map[event.Field]string{event.FileKey: fileKey, event.ManifestSize: value, event.Handle: conversation.Handle}))
|
||||
} else {
|
||||
profile.PublishEvent(event.NewEvent(event.ManifestError, map[event.Field]string{event.FileKey: fileKey, event.Handle: conversation.Handle}))
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// FunctionalityGate returns filesharing if enabled in the given experiment map
|
||||
// Note: Experiment maps are currently in libcwtch-go
|
||||
func FunctionalityGate(experimentMap map[string]bool) (*Functionality, error) {
|
||||
|
@ -109,6 +206,23 @@ func (f *Functionality) DownloadFile(profile peer.CwtchPeer, conversationID int,
|
|||
return nil
|
||||
}
|
||||
|
||||
// startFileShare is a private method used to finalize a file share and publish it to the protocol engine for processing.
|
||||
func (f *Functionality) startFileShare(profile peer.CwtchPeer, filekey string, manifest string) error {
|
||||
tsStr, exists := profile.GetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%s.ts", filekey))
|
||||
if exists {
|
||||
ts, err := strconv.ParseInt(tsStr, 10, 64)
|
||||
if err != nil || ts < time.Now().Unix()-2592000 {
|
||||
log.Errorf("ignoring request to download a file offered more than 30 days ago")
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// set the filekey status to active
|
||||
profile.SetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%s.active", filekey), constants.True)
|
||||
profile.PublishEvent(event.NewEvent(event.ShareManifest, map[event.Field]string{event.FileKey: filekey, event.SerializedManifest: manifest}))
|
||||
return nil
|
||||
}
|
||||
|
||||
// RestartFileShare takes in an existing filekey and, assuming the manifest exists, restarts sharing of the manifest
|
||||
func (f *Functionality) RestartFileShare(profile peer.CwtchPeer, filekey string) error {
|
||||
// check that a manifest exists
|
||||
|
@ -116,8 +230,7 @@ func (f *Functionality) RestartFileShare(profile peer.CwtchPeer, filekey string)
|
|||
if manifestExists {
|
||||
// everything is in order, so reshare this file with the engine
|
||||
log.Debugf("restarting file share: %v", filekey)
|
||||
profile.ShareFile(filekey, manifest)
|
||||
return nil
|
||||
return f.startFileShare(profile, filekey, manifest)
|
||||
}
|
||||
return fmt.Errorf("manifest does not exist for filekey: %v", filekey)
|
||||
}
|
||||
|
@ -238,7 +351,7 @@ func (f *Functionality) ShareFile(filepath string, profile peer.CwtchPeer) (stri
|
|||
profile.SetScopedZonedAttribute(attr.ConversationScope, attr.FilesharingZone, fmt.Sprintf("%s.manifest", key), string(serializedManifest))
|
||||
profile.SetScopedZonedAttribute(attr.ConversationScope, attr.FilesharingZone, fmt.Sprintf("%s.manifest.size", key), strconv.Itoa(int(math.Ceil(float64(len(serializedManifest)-lenDiff)/float64(files.DefaultChunkSize)))))
|
||||
|
||||
profile.ShareFile(key, string(serializedManifest))
|
||||
err = f.startFileShare(profile, key, string(serializedManifest))
|
||||
|
||||
return key, string(wrapperJSON), err
|
||||
}
|
||||
|
@ -330,3 +443,15 @@ func GenerateDownloadPath(basePath, fileName string, overwrite bool) (filePath,
|
|||
}
|
||||
return
|
||||
}
|
||||
|
||||
// StopFileShare sends a message to the ProtocolEngine to cease sharing a particular file
|
||||
func (f *Functionality) StopFileShare(profile peer.CwtchPeer, fileKey string) {
|
||||
// set the filekey status to inactive
|
||||
profile.SetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%s.active", fileKey), constants.False)
|
||||
profile.PublishEvent(event.NewEvent(event.StopFileShare, map[event.Field]string{event.FileKey: fileKey}))
|
||||
}
|
||||
|
||||
// StopAllFileShares sends a message to the ProtocolEngine to cease sharing all files
|
||||
func (f *Functionality) StopAllFileShares(profile peer.CwtchPeer) {
|
||||
profile.PublishEvent(event.NewEvent(event.StopAllFileShares, map[event.Field]string{}))
|
||||
}
|
||||
|
|
|
@ -23,6 +23,12 @@ type Scope string
|
|||
// ScopedZonedPath typed path with a scope and a zone
|
||||
type ScopedZonedPath string
|
||||
|
||||
func (szp ScopedZonedPath) GetScopeZonePath() (Scope, Zone, string) {
|
||||
scope, path := ParseScope(string(szp))
|
||||
zone, zpath := ParseZone(path)
|
||||
return scope, zone, zpath
|
||||
}
|
||||
|
||||
// scopes for attributes
|
||||
const (
|
||||
// on a peer, local and peer supplied data
|
||||
|
|
|
@ -14,10 +14,8 @@ import (
|
|||
"git.openprivacy.ca/openprivacy/connectivity"
|
||||
"git.openprivacy.ca/openprivacy/connectivity/tor"
|
||||
"golang.org/x/crypto/ed25519"
|
||||
"math/bits"
|
||||
"os"
|
||||
path "path/filepath"
|
||||
"runtime"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
@ -28,7 +26,6 @@ import (
|
|||
"cwtch.im/cwtch/model"
|
||||
"cwtch.im/cwtch/model/attr"
|
||||
"cwtch.im/cwtch/protocol/connections"
|
||||
"cwtch.im/cwtch/protocol/files"
|
||||
"git.openprivacy.ca/openprivacy/log"
|
||||
)
|
||||
|
||||
|
@ -38,8 +35,7 @@ const lastReceivedSignature = "LastReceivedSignature"
|
|||
var autoHandleableEvents = map[event.Type]bool{event.EncryptedGroupMessage: true, event.PeerStateChange: true,
|
||||
event.ServerStateChange: true, event.NewGroupInvite: true, event.NewMessageFromPeerEngine: true,
|
||||
event.PeerAcknowledgement: true, event.PeerError: true, event.SendMessageToPeerError: true, event.SendMessageToGroupError: true,
|
||||
event.NewGetValMessageFromPeer: true, event.NewRetValMessageFromPeer: true, event.ProtocolEngineStopped: true, event.RetryServerRequest: true,
|
||||
event.ManifestSizeReceived: true, event.ManifestReceived: true, event.FileDownloaded: true, event.TriggerAntispamCheck: true}
|
||||
event.NewGetValMessageFromPeer: true, event.NewRetValMessageFromPeer: true, event.ProtocolEngineStopped: true, event.RetryServerRequest: true, event.TriggerAntispamCheck: true}
|
||||
|
||||
// DefaultEventsToHandle specifies which events will be subscribed to
|
||||
//
|
||||
|
@ -58,8 +54,6 @@ var DefaultEventsToHandle = []event.Type{
|
|||
event.ServerStateChange,
|
||||
event.SendMessageToPeerError,
|
||||
event.NewRetValMessageFromPeer,
|
||||
event.ManifestReceived,
|
||||
event.FileDownloaded,
|
||||
event.TriggerAntispamCheck,
|
||||
}
|
||||
|
||||
|
@ -74,6 +68,25 @@ type cwtchPeer struct {
|
|||
|
||||
queue event.Queue
|
||||
eventBus event.Manager
|
||||
|
||||
extensions []ProfileHook
|
||||
extensionLock sync.Mutex // we don't want to hold up all of cwtch for managing thread safe access to extensions
|
||||
}
|
||||
|
||||
func (cp *cwtchPeer) PublishEvent(resp event.Event) {
|
||||
cp.eventBus.Publish(resp)
|
||||
}
|
||||
|
||||
func (cp *cwtchPeer) RegisterHook(extension ProfileHooks) {
|
||||
cp.extensionLock.Lock()
|
||||
defer cp.extensionLock.Unlock()
|
||||
|
||||
// Register Requested Events
|
||||
for _, event := range extension.RegisterEvents() {
|
||||
cp.eventBus.Subscribe(event, cp.queue)
|
||||
}
|
||||
|
||||
cp.extensions = append(cp.extensions, ConstructHook(extension))
|
||||
}
|
||||
|
||||
func (cp *cwtchPeer) StoreCachedTokens(tokenServer string, tokens []*privacypass.Token) {
|
||||
|
@ -1165,33 +1178,6 @@ func (cp *cwtchPeer) storeMessage(handle string, message string, sent time.Time)
|
|||
return cp.storage.InsertMessage(ci.ID, 0, message, model.Attributes{constants.AttrAuthor: handle, constants.AttrAck: event.True, constants.AttrSentTimestamp: sent.Format(time.RFC3339Nano)}, signature, model.CalculateContentHash(handle, message))
|
||||
}
|
||||
|
||||
// ShareFile begins hosting the given serialized manifest
|
||||
func (cp *cwtchPeer) ShareFile(fileKey string, serializedManifest string) {
|
||||
tsStr, exists := cp.GetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%s.ts", fileKey))
|
||||
if exists {
|
||||
ts, err := strconv.ParseInt(tsStr, 10, 64)
|
||||
if err != nil || ts < time.Now().Unix()-2592000 {
|
||||
log.Errorf("ignoring request to download a file offered more than 30 days ago")
|
||||
return
|
||||
}
|
||||
}
|
||||
// set the filekey status to active
|
||||
cp.SetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%s.active", fileKey), constants.True)
|
||||
cp.eventBus.Publish(event.NewEvent(event.ShareManifest, map[event.Field]string{event.FileKey: fileKey, event.SerializedManifest: serializedManifest}))
|
||||
}
|
||||
|
||||
// StopFileShare sends a message to the ProtocolEngine to cease sharing a particular file
|
||||
func (cp *cwtchPeer) StopFileShare(fileKey string) {
|
||||
// set the filekey status to inactive
|
||||
cp.SetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%s.active", fileKey), constants.False)
|
||||
cp.eventBus.Publish(event.NewEvent(event.StopFileShare, map[event.Field]string{event.FileKey: fileKey}))
|
||||
}
|
||||
|
||||
// StopAllFileShares sends a message to the ProtocolEngine to cease sharing all files
|
||||
func (cp *cwtchPeer) StopAllFileShares() {
|
||||
cp.eventBus.Publish(event.NewEvent(event.StopAllFileShares, map[event.Field]string{}))
|
||||
}
|
||||
|
||||
// eventHandler process events from other subsystems
|
||||
func (cp *cwtchPeer) eventHandler() {
|
||||
for {
|
||||
|
@ -1288,128 +1274,48 @@ func (cp *cwtchPeer) eventHandler() {
|
|||
path := ev.Data[event.Path]
|
||||
|
||||
log.Debugf("NewGetValMessageFromPeer for %v.%v from %v\n", scope, path, onion)
|
||||
|
||||
conversationInfo, err := cp.FetchConversationInfo(onion)
|
||||
|
||||
log.Debugf("confo info lookup newgetval %v %v %v", onion, conversationInfo, err)
|
||||
// only accepted contacts can look up information
|
||||
if conversationInfo != nil && conversationInfo.Accepted {
|
||||
scope := attr.IntoScope(scope)
|
||||
if scope.IsPublic() || scope.IsConversation() {
|
||||
zone, zpath := attr.ParseZone(path)
|
||||
val, exists := cp.GetScopedZonedAttribute(scope, zone, zpath)
|
||||
// Type Safe Scoped/Zoned Path
|
||||
zscope := attr.IntoScope(scope)
|
||||
zone, zpath := attr.ParseZone(path)
|
||||
scopedZonedPath := zscope.ConstructScopedZonedPath(zone.ConstructZonedPath(zpath))
|
||||
|
||||
// NOTE: Temporary Override because UI currently wipes names if it can't find them...
|
||||
if !exists && zone == attr.UnknownZone && path == constants.Name {
|
||||
val, exists = cp.GetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name)
|
||||
}
|
||||
|
||||
resp := event.NewEvent(event.SendRetValMessageToPeer, map[event.Field]string{event.ConversationID: strconv.Itoa(conversationInfo.ID), event.RemotePeer: onion, event.Exists: strconv.FormatBool(exists)})
|
||||
resp.EventID = ev.EventID
|
||||
if exists {
|
||||
resp.Data[event.Data] = val
|
||||
} else {
|
||||
resp.Data[event.Data] = ""
|
||||
}
|
||||
log.Debugf("Responding with SendRetValMessageToPeer exists:%v data: %v\n", exists, val)
|
||||
|
||||
cp.eventBus.Publish(resp)
|
||||
// Safe Access to Extensions
|
||||
cp.extensionLock.Lock()
|
||||
log.Infof("checking extension...%v", cp.extensions)
|
||||
for _, extension := range cp.extensions {
|
||||
log.Infof("checking extension...%v", extension)
|
||||
extension.extension.OnContactRequestValue(cp, *conversationInfo, ev.EventID, scopedZonedPath)
|
||||
}
|
||||
cp.extensionLock.Unlock()
|
||||
|
||||
}
|
||||
|
||||
case event.ManifestReceived:
|
||||
log.Debugf("Manifest Received Event!: %v", ev)
|
||||
handle := ev.Data[event.Handle]
|
||||
fileKey := ev.Data[event.FileKey]
|
||||
serializedManifest := ev.Data[event.SerializedManifest]
|
||||
|
||||
manifestFilePath, exists := cp.GetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%v.manifest", fileKey))
|
||||
if exists {
|
||||
downloadFilePath, exists := cp.GetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%v.path", fileKey))
|
||||
if exists {
|
||||
log.Debugf("downloading manifest to %v, file to %v", manifestFilePath, downloadFilePath)
|
||||
var manifest files.Manifest
|
||||
err := json.Unmarshal([]byte(serializedManifest), &manifest)
|
||||
|
||||
if err == nil {
|
||||
// We only need to check the file size here, as manifest is sent to engine and the file created
|
||||
// will be bound to the size advertised in manifest.
|
||||
fileSizeLimitValue, fileSizeLimitExists := cp.GetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%v.limit", fileKey))
|
||||
if fileSizeLimitExists {
|
||||
fileSizeLimit, err := strconv.ParseUint(fileSizeLimitValue, 10, bits.UintSize)
|
||||
if err == nil {
|
||||
if manifest.FileSizeInBytes >= fileSizeLimit {
|
||||
log.Errorf("could not download file, size %v greater than limit %v", manifest.FileSizeInBytes, fileSizeLimitValue)
|
||||
} else {
|
||||
manifest.Title = manifest.FileName
|
||||
manifest.FileName = downloadFilePath
|
||||
log.Debugf("saving manifest")
|
||||
err = manifest.Save(manifestFilePath)
|
||||
if err != nil {
|
||||
log.Errorf("could not save manifest: %v", err)
|
||||
} else {
|
||||
tempFile := ""
|
||||
if runtime.GOOS == "android" {
|
||||
tempFile = manifestFilePath[0 : len(manifestFilePath)-len(".manifest")]
|
||||
log.Debugf("derived android temp path: %v", tempFile)
|
||||
}
|
||||
cp.eventBus.Publish(event.NewEvent(event.ManifestSaved, map[event.Field]string{
|
||||
event.FileKey: fileKey,
|
||||
event.Handle: handle,
|
||||
event.SerializedManifest: string(manifest.Serialize()),
|
||||
event.TempFile: tempFile,
|
||||
event.NameSuggestion: manifest.Title,
|
||||
}))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
log.Errorf("error saving manifest: %v", err)
|
||||
}
|
||||
} else {
|
||||
log.Errorf("found manifest path but not download path for %v", fileKey)
|
||||
}
|
||||
} else {
|
||||
log.Errorf("no download path found for manifest: %v", fileKey)
|
||||
}
|
||||
case event.FileDownloaded:
|
||||
fileKey := ev.Data[event.FileKey]
|
||||
cp.SetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%s.complete", fileKey), "true")
|
||||
case event.NewRetValMessageFromPeer:
|
||||
onion := ev.Data[event.RemotePeer]
|
||||
handle := ev.Data[event.RemotePeer]
|
||||
scope := ev.Data[event.Scope]
|
||||
path := ev.Data[event.Path]
|
||||
val := ev.Data[event.Data]
|
||||
exists, _ := strconv.ParseBool(ev.Data[event.Exists])
|
||||
log.Debugf("NewRetValMessageFromPeer %v %v %v %v %v\n", onion, scope, path, exists, val)
|
||||
if exists {
|
||||
log.Debugf("NewRetValMessageFromPeer %v %v %v %v %v\n", handle, scope, path, exists, val)
|
||||
|
||||
// Handle File Sharing Metadata
|
||||
// TODO This probably should be broken out to it's own code..
|
||||
zone, path := attr.ParseZone(path)
|
||||
if attr.Scope(scope).IsConversation() && zone == attr.FilesharingZone && strings.HasSuffix(path, ".manifest.size") {
|
||||
fileKey := strings.Replace(path, ".manifest.size", "", 1)
|
||||
size, err := strconv.Atoi(val)
|
||||
// if size is valid and below the maximum size for a manifest
|
||||
// this is to prevent malicious sharers from using large amounts of memory when distributing
|
||||
// a manifest as we reconstruct this in-memory
|
||||
if err == nil && size < files.MaxManifestSize {
|
||||
cp.eventBus.Publish(event.NewEvent(event.ManifestSizeReceived, map[event.Field]string{event.FileKey: fileKey, event.ManifestSize: val, event.Handle: onion}))
|
||||
} else {
|
||||
cp.eventBus.Publish(event.NewEvent(event.ManifestError, map[event.Field]string{event.FileKey: fileKey, event.Handle: onion}))
|
||||
}
|
||||
}
|
||||
conversationInfo, _ := cp.FetchConversationInfo(handle)
|
||||
// only accepted contacts can look up information
|
||||
if conversationInfo != nil && conversationInfo.Accepted {
|
||||
// Type Safe Scoped/Zoned Path
|
||||
zscope := attr.IntoScope(scope)
|
||||
zone, zpath := attr.ParseZone(path)
|
||||
scopedZonedPath := zscope.ConstructScopedZonedPath(zone.ConstructZonedPath(zpath))
|
||||
|
||||
// Allow public profile parameters to be added as peer specific attributes...
|
||||
if attr.Scope(scope).IsPublic() && zone == attr.ProfileZone {
|
||||
ci, err := cp.FetchConversationInfo(onion)
|
||||
log.Debugf("fetch conversation info %v %v", ci, err)
|
||||
if ci != nil && err == nil {
|
||||
err := cp.SetConversationAttribute(ci.ID, attr.Scope(scope).ConstructScopedZonedPath(zone.ConstructZonedPath(path)), val)
|
||||
if err != nil {
|
||||
log.Errorf("error setting conversation attribute %v", err)
|
||||
}
|
||||
}
|
||||
// Safe Access to Extensions
|
||||
cp.extensionLock.Lock()
|
||||
for _, extension := range cp.extensions {
|
||||
extension.extension.OnContactReceiveValue(cp, *conversationInfo, scopedZonedPath, val, exists)
|
||||
}
|
||||
cp.extensionLock.Unlock()
|
||||
}
|
||||
case event.PeerStateChange:
|
||||
handle := ev.Data[event.RemotePeer]
|
||||
|
@ -1491,10 +1397,26 @@ func (cp *cwtchPeer) eventHandler() {
|
|||
}
|
||||
}
|
||||
default:
|
||||
if ev.EventType != "" {
|
||||
log.Errorf("peer event handler received an event it was not subscribed for: %v", ev.EventType)
|
||||
// invalid event, signifies shutdown
|
||||
if ev.EventType == "" {
|
||||
return
|
||||
}
|
||||
|
||||
// Otherwise, obtain Safe Access to Extensions
|
||||
processed := false
|
||||
cp.extensionLock.Lock()
|
||||
for _, extension := range cp.extensions {
|
||||
// if the extension is registered for this event type then process
|
||||
if _, contains := extension.events[ev.EventType]; contains {
|
||||
extension.extension.OnEvent(ev, cp)
|
||||
processed = true
|
||||
}
|
||||
}
|
||||
cp.extensionLock.Unlock()
|
||||
|
||||
if !processed {
|
||||
log.Errorf("cwtch profile received an event that it (or an extension) was unable to handle. this is very likely a programming error: %v", ev.EventType)
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,49 @@
|
|||
package peer
|
||||
|
||||
import (
|
||||
"cwtch.im/cwtch/event"
|
||||
"cwtch.im/cwtch/model"
|
||||
"cwtch.im/cwtch/model/attr"
|
||||
)
|
||||
|
||||
type ProfileHooks interface {
|
||||
|
||||
// RegisterEvents returns a set of events that the extension is interested hooking
|
||||
RegisterEvents() []event.Type
|
||||
|
||||
// RegisterExperiments RegisterExperiments returns a set of experiments that the extension is interested in being notified about
|
||||
RegisterExperiments() []string
|
||||
|
||||
// OnEvent is called whenever an event Registered with RegisterEvents is called
|
||||
OnEvent(event event.Event, profile CwtchPeer)
|
||||
|
||||
// OnContactRequestValue is Hooked when a contact sends a request for the given path
|
||||
OnContactRequestValue(profile CwtchPeer, conversation model.Conversation, eventID string, path attr.ScopedZonedPath)
|
||||
|
||||
// OnContactReceiveValue is Hooked after a profile receives a response to a Get/Val Request
|
||||
OnContactReceiveValue(profile CwtchPeer, conversation model.Conversation, path attr.ScopedZonedPath, value string, exists bool)
|
||||
}
|
||||
|
||||
type ProfileHook struct {
|
||||
extension ProfileHooks
|
||||
events map[event.Type]bool
|
||||
experiments map[string]bool
|
||||
}
|
||||
|
||||
func ConstructHook(extension ProfileHooks) ProfileHook {
|
||||
events := make(map[event.Type]bool)
|
||||
for _, event := range extension.RegisterEvents() {
|
||||
events[event] = true
|
||||
}
|
||||
|
||||
experiments := make(map[string]bool)
|
||||
for _, experiment := range extension.RegisterExperiments() {
|
||||
experiments[experiment] = true
|
||||
}
|
||||
|
||||
return ProfileHook{
|
||||
extension,
|
||||
events,
|
||||
experiments,
|
||||
}
|
||||
}
|
|
@ -121,12 +121,6 @@ type CwtchPeer interface {
|
|||
GetMostRecentMessages(conversation int, channel int, offset int, limit int) ([]model.ConversationMessage, error)
|
||||
UpdateMessageAttribute(conversation int, channel int, id int, key string, value string) error
|
||||
|
||||
// File Sharing APIS
|
||||
// TODO move these to feature protected interfaces
|
||||
ShareFile(fileKey string, serializedManifest string)
|
||||
StopFileShare(fileKey string)
|
||||
StopAllFileShares()
|
||||
|
||||
// Server Token APIS
|
||||
// TODO move these to feature protected interfaces
|
||||
StoreCachedTokens(tokenServer string, tokens []*privacypass.Token)
|
||||
|
@ -136,4 +130,6 @@ type CwtchPeer interface {
|
|||
ChangePassword(oldpassword string, newpassword string, newpasswordAgain string) error
|
||||
Export(file string) error
|
||||
Delete()
|
||||
PublishEvent(resp event.Event)
|
||||
RegisterHook(hook ProfileHooks)
|
||||
}
|
||||
|
|
|
@ -18,6 +18,7 @@ import (
|
|||
"fmt"
|
||||
"git.openprivacy.ca/openprivacy/connectivity/tor"
|
||||
"git.openprivacy.ca/openprivacy/log"
|
||||
"path/filepath"
|
||||
|
||||
// Import SQL Cipher
|
||||
mrand "math/rand"
|
||||
|
@ -56,7 +57,7 @@ func TestFileSharing(t *testing.T) {
|
|||
os.RemoveAll("cwtch.out.png")
|
||||
os.RemoveAll("cwtch.out.png.manifest")
|
||||
|
||||
log.SetLevel(log.LevelInfo)
|
||||
log.SetLevel(log.LevelDebug)
|
||||
|
||||
os.Mkdir("tordir", 0700)
|
||||
dataDir := path.Join("tordir", "tor")
|
||||
|
@ -74,13 +75,22 @@ func TestFileSharing(t *testing.T) {
|
|||
panic(err)
|
||||
}
|
||||
|
||||
useCache := os.Getenv("TORCACHE") == "true"
|
||||
|
||||
torDataDir := ""
|
||||
if torDataDir, err = os.MkdirTemp(dataDir, "data-dir-"); err != nil {
|
||||
t.Fatalf("could not create data dir")
|
||||
if useCache {
|
||||
log.Infof("using tor cache")
|
||||
torDataDir = filepath.Join(dataDir, "data-dir-torcache")
|
||||
os.MkdirAll(torDataDir, 0700)
|
||||
} else {
|
||||
log.Infof("using clean tor data dir")
|
||||
if torDataDir, err = os.MkdirTemp(dataDir, "data-dir-"); err != nil {
|
||||
t.Fatalf("could not create data dir")
|
||||
}
|
||||
}
|
||||
|
||||
tor.NewTorrc().WithSocksPort(socksPort).WithOnionTrafficOnly().WithHashedPassword(base64.StdEncoding.EncodeToString(key)).WithControlPort(controlPort).Build("tordir/tor/torrc")
|
||||
acn, err := tor.NewTorACNWithAuth("./tordir", path.Join("..", "..", "tor"), torDataDir, controlPort, tor.HashedPasswordAuthenticator{Password: base64.StdEncoding.EncodeToString(key)})
|
||||
acn, err := tor.NewTorACNWithAuth("./tordir", path.Join("..", "tor"), torDataDir, controlPort, tor.HashedPasswordAuthenticator{Password: base64.StdEncoding.EncodeToString(key)})
|
||||
if err != nil {
|
||||
t.Fatalf("Could not start Tor: %v", err)
|
||||
}
|
||||
|
@ -125,6 +135,7 @@ func TestFileSharing(t *testing.T) {
|
|||
|
||||
t.Logf("Waiting for alice and Bob to peer...")
|
||||
waitForPeerPeerConnection(t, alice, bob)
|
||||
alice.AcceptConversation(1)
|
||||
|
||||
t.Logf("Alice and Bob are Connected!!")
|
||||
|
||||
|
@ -145,7 +156,7 @@ func TestFileSharing(t *testing.T) {
|
|||
|
||||
// Test stopping and restarting file shares
|
||||
t.Logf("Stopping File Share")
|
||||
alice.StopAllFileShares()
|
||||
filesharingFunctionality.StopAllFileShares(alice)
|
||||
|
||||
// Allow time for the stop request to filter through Engine
|
||||
time.Sleep(time.Second * 5)
|
||||
|
@ -181,6 +192,7 @@ func testBobDownloadFile(t *testing.T, bob peer.CwtchPeer, filesharingFunctional
|
|||
os.RemoveAll("cwtch.out.png")
|
||||
os.RemoveAll("cwtch.out.png.manifest")
|
||||
|
||||
bob.AcceptConversation(1)
|
||||
message, _, err := bob.GetChannelMessage(1, 0, 1)
|
||||
if err != nil {
|
||||
t.Fatalf("could not find file sharing message: %v", err)
|
||||
|
@ -194,19 +206,19 @@ func testBobDownloadFile(t *testing.T, bob peer.CwtchPeer, filesharingFunctional
|
|||
err := json.Unmarshal([]byte(messageWrapper.Data), &fileMessageOverlay)
|
||||
|
||||
if err == nil {
|
||||
|
||||
t.Logf("bob attempting to download file with invalid download")
|
||||
// try downloading with invalid download dir
|
||||
err = filesharingFunctionality.DownloadFile(bob, 1, "/do/not/download/this/file/cwtch.out.png", "./cwtch.out.png.manifest", fmt.Sprintf("%s.%s", fileMessageOverlay.Hash, fileMessageOverlay.Nonce), constants.ImagePreviewMaxSizeInBytes)
|
||||
if err == nil {
|
||||
t.Fatalf("should not download file with invalid download dir")
|
||||
}
|
||||
|
||||
t.Logf("bob attempting to download file with invalid manifest")
|
||||
// try downloading with invalid manifest dir
|
||||
err = filesharingFunctionality.DownloadFile(bob, 1, "./cwtch.out.png", "/do/not/download/this/file/cwtch.out.png.manifest", fmt.Sprintf("%s.%s", fileMessageOverlay.Hash, fileMessageOverlay.Nonce), constants.ImagePreviewMaxSizeInBytes)
|
||||
if err == nil {
|
||||
t.Fatalf("should not download file with invalid manifest dir")
|
||||
}
|
||||
|
||||
t.Logf("bob attempting to download file")
|
||||
err = filesharingFunctionality.DownloadFile(bob, 1, "./cwtch.out.png", "./cwtch.out.png.manifest", fmt.Sprintf("%s.%s", fileMessageOverlay.Hash, fileMessageOverlay.Nonce), constants.ImagePreviewMaxSizeInBytes)
|
||||
if err != nil {
|
||||
t.Fatalf("could not download file: %v", err)
|
||||
|
|
Loading…
Reference in New Issue