Merge pull request 'Enable a SendPeerMessage EngineHook for Fuzzbot' (#508) from fuzzbot into master
continuous-integration/drone/push Build is pending Details

Reviewed-on: #508
Reviewed-by: Dan Ballard <dan@openprivacy.ca>
This commit is contained in:
Sarah Jamie Lewis 2023-04-20 21:00:14 +00:00
commit d9298f84b2
11 changed files with 57 additions and 16 deletions

4
.gitignore vendored
View File

@ -31,4 +31,6 @@ testing/encryptedstorage/tordir
*.tar.gz *.tar.gz
data-dir-cwtchtool/ data-dir-cwtchtool/
tokens tokens
tordir/ tordir/
testing/autodownload/download_dir
testing/autodownload/storage

View File

@ -29,9 +29,10 @@ type application struct {
acn connectivity.ACN acn connectivity.ACN
plugins sync.Map //map[string] []plugins.Plugin plugins sync.Map //map[string] []plugins.Plugin
engines map[string]connections.Engine engines map[string]connections.Engine
appBus event.Manager appBus event.Manager
appmutex sync.Mutex appmutex sync.Mutex
engineHooks connections.EngineHooks
settings *settings.GlobalSettingsFile settings *settings.GlobalSettingsFile
} }
@ -50,7 +51,7 @@ func (app *application) IsFeatureEnabled(experiment string) bool {
type Application interface { type Application interface {
LoadProfiles(password string) LoadProfiles(password string)
CreateProfile(name string, password string, autostart bool) CreateProfile(name string, password string, autostart bool)
InstallEngineHooks(engineHooks connections.EngineHooks)
ImportProfile(exportedCwtchFile string, password string) (peer.CwtchPeer, error) ImportProfile(exportedCwtchFile string, password string) (peer.CwtchPeer, error)
EnhancedImportProfile(exportedCwtchFile string, password string) string EnhancedImportProfile(exportedCwtchFile string, password string) string
DeleteProfile(onion string, currentPassword string) DeleteProfile(onion string, currentPassword string)
@ -98,7 +99,7 @@ func NewApp(acn connectivity.ACN, appDirectory string, settings *settings.Global
app := &application{engines: make(map[string]connections.Engine), eventBuses: make(map[string]event.Manager), directory: appDirectory, appBus: event.NewEventManager(), settings: settings} app := &application{engines: make(map[string]connections.Engine), eventBuses: make(map[string]event.Manager), directory: appDirectory, appBus: event.NewEventManager(), settings: settings}
app.peers = make(map[string]peer.CwtchPeer) app.peers = make(map[string]peer.CwtchPeer)
app.engineHooks = connections.DefaultEngineHooks{}
app.acn = acn app.acn = acn
statusHandler := app.getACNStatusHandler() statusHandler := app.getACNStatusHandler()
acn.SetStatusCallback(statusHandler) acn.SetStatusCallback(statusHandler)
@ -109,6 +110,12 @@ func NewApp(acn connectivity.ACN, appDirectory string, settings *settings.Global
return app return app
} }
func (app *application) InstallEngineHooks(engineHooks connections.EngineHooks) {
app.appmutex.Lock()
defer app.appmutex.Unlock()
app.engineHooks = engineHooks
}
func (app *application) ReadSettings() settings.GlobalSettings { func (app *application) ReadSettings() settings.GlobalSettings {
app.appmutex.Lock() app.appmutex.Lock()
defer app.appmutex.Unlock() defer app.appmutex.Unlock()
@ -373,7 +380,7 @@ func (app *application) ActivateEngines(doListen, doPeers, doServers bool) {
log.Debugf("ActivateEngines") log.Debugf("ActivateEngines")
for _, profile := range app.peers { for _, profile := range app.peers {
app.engines[profile.GetOnion()], _ = profile.GenerateProtocolEngine(app.acn, app.eventBuses[profile.GetOnion()]) app.engines[profile.GetOnion()], _ = profile.GenerateProtocolEngine(app.acn, app.eventBuses[profile.GetOnion()], app.engineHooks)
app.eventBuses[profile.GetOnion()].Publish(event.NewEventList(event.ProtocolEngineCreated)) app.eventBuses[profile.GetOnion()].Publish(event.NewEventList(event.ProtocolEngineCreated))
} }
app.QueryACNStatus() app.QueryACNStatus()
@ -398,7 +405,7 @@ func (app *application) ActivatePeerEngine(onion string) {
profile := app.GetPeer(onion) profile := app.GetPeer(onion)
if profile != nil { if profile != nil {
if _, exists := app.engines[onion]; !exists { if _, exists := app.engines[onion]; !exists {
app.engines[profile.GetOnion()], _ = profile.GenerateProtocolEngine(app.acn, app.eventBuses[profile.GetOnion()]) app.engines[profile.GetOnion()], _ = profile.GenerateProtocolEngine(app.acn, app.eventBuses[profile.GetOnion()], app.engineHooks)
app.eventBuses[profile.GetOnion()].Publish(event.NewEventList(event.ProtocolEngineCreated)) app.eventBuses[profile.GetOnion()].Publish(event.NewEventList(event.ProtocolEngineCreated))
app.QueryACNStatus() app.QueryACNStatus()

View File

@ -293,7 +293,7 @@ func (cp *cwtchPeer) ChangePassword(password string, newpassword string, newpass
// GenerateProtocolEngine // GenerateProtocolEngine
// Status: New in 1.5 // Status: New in 1.5
func (cp *cwtchPeer) GenerateProtocolEngine(acn connectivity.ACN, bus event.Manager) (connections.Engine, error) { func (cp *cwtchPeer) GenerateProtocolEngine(acn connectivity.ACN, bus event.Manager, engineHooks connections.EngineHooks) (connections.Engine, error) {
cp.mutex.Lock() cp.mutex.Lock()
defer cp.mutex.Unlock() defer cp.mutex.Unlock()
conversations, _ := cp.storage.FetchConversations() conversations, _ := cp.storage.FetchConversations()
@ -323,7 +323,7 @@ func (cp *cwtchPeer) GenerateProtocolEngine(acn connectivity.ACN, bus event.Mana
identity := primitives.InitializeIdentity("", (*ed25519.PrivateKey)(&privateKey), (*ed25519.PublicKey)(&publicKey)) identity := primitives.InitializeIdentity("", (*ed25519.PrivateKey)(&privateKey), (*ed25519.PublicKey)(&publicKey))
return connections.NewProtocolEngine(identity, privateKey, acn, bus, authorizations), nil return connections.NewProtocolEngine(identity, privateKey, acn, bus, authorizations, engineHooks), nil
} }
// SendScopedZonedGetValToContact // SendScopedZonedGetValToContact

View File

@ -69,7 +69,7 @@ type CwtchPeer interface {
// most functions // most functions
Init(event.Manager) Init(event.Manager)
GenerateProtocolEngine(acn connectivity.ACN, bus event.Manager) (connections.Engine, error) GenerateProtocolEngine(acn connectivity.ACN, bus event.Manager, engineHooks connections.EngineHooks) (connections.Engine, error)
AutoHandleEvents(events []event.Type) AutoHandleEvents(events []event.Type)
Listen() Listen()

View File

@ -47,7 +47,7 @@ func createKey(password string, salt []byte) [32]byte {
} }
func initV2Directory(directory, password string) ([32]byte, [128]byte, error) { func initV2Directory(directory, password string) ([32]byte, [128]byte, error) {
os.Mkdir(directory, 0700) os.MkdirAll(directory, 0700)
key, salt, err := CreateKeySalt(password) key, salt, err := CreateKeySalt(password)
if err != nil { if err != nil {

View File

@ -72,7 +72,8 @@ type engine struct {
tokenManagers sync.Map // [tokenService][]TokenManager tokenManagers sync.Map // [tokenService][]TokenManager
shuttingDown atomic.Bool shuttingDown atomic.Bool
onSendMessage func(connection tapir.Connection, message []byte) error
} }
// Engine (ProtocolEngine) encapsulates the logic necessary to make and receive Cwtch connections. // Engine (ProtocolEngine) encapsulates the logic necessary to make and receive Cwtch connections.
@ -86,12 +87,16 @@ type Engine interface {
} }
// NewProtocolEngine initializes a new engine that runs Cwtch using the given parameters // NewProtocolEngine initializes a new engine that runs Cwtch using the given parameters
func NewProtocolEngine(identity primitives.Identity, privateKey ed25519.PrivateKey, acn connectivity.ACN, eventManager event.Manager, peerAuthorizations map[string]model.Authorization) Engine { func NewProtocolEngine(identity primitives.Identity, privateKey ed25519.PrivateKey, acn connectivity.ACN, eventManager event.Manager, peerAuthorizations map[string]model.Authorization, engineHooks EngineHooks) Engine {
engine := new(engine) engine := new(engine)
engine.identity = identity engine.identity = identity
engine.privateKey = privateKey engine.privateKey = privateKey
engine.ephemeralServices = make(map[string]*connectionLockedService) engine.ephemeralServices = make(map[string]*connectionLockedService)
engine.queue = event.NewQueue() engine.queue = event.NewQueue()
// the standard send message function
engine.onSendMessage = engineHooks.SendPeerMessage
go engine.eventHandler() go engine.eventHandler()
engine.acn = acn engine.acn = acn
@ -320,6 +325,7 @@ func (e *engine) createPeerTemplate() *PeerApp {
peerAppTemplate.OnAuth = e.ignoreOnShutdown(e.peerAuthed) peerAppTemplate.OnAuth = e.ignoreOnShutdown(e.peerAuthed)
peerAppTemplate.OnConnecting = e.ignoreOnShutdown(e.peerConnecting) peerAppTemplate.OnConnecting = e.ignoreOnShutdown(e.peerConnecting)
peerAppTemplate.OnClose = e.ignoreOnShutdown(e.peerDisconnected) peerAppTemplate.OnClose = e.ignoreOnShutdown(e.peerDisconnected)
peerAppTemplate.OnSendMessage = e.onSendMessage
return peerAppTemplate return peerAppTemplate
} }

View File

@ -0,0 +1,14 @@
package connections
import "git.openprivacy.ca/cwtch.im/tapir"
type EngineHooks interface {
SendPeerMessage(connection tapir.Connection, message []byte) error
}
type DefaultEngineHooks struct {
}
func (deh DefaultEngineHooks) SendPeerMessage(connection tapir.Connection, message []byte) error {
return connection.Send(message)
}

View File

@ -23,6 +23,7 @@ type PeerApp struct {
OnAuth func(string) OnAuth func(string)
OnClose func(string) OnClose func(string)
OnConnecting func(string) OnConnecting func(string)
OnSendMessage func(connection tapir.Connection, message []byte) error
version atomic.Value version atomic.Value
} }
@ -48,6 +49,7 @@ func (pa *PeerApp) NewInstance() tapir.Application {
newApp.OnAuth = pa.OnAuth newApp.OnAuth = pa.OnAuth
newApp.OnClose = pa.OnClose newApp.OnClose = pa.OnClose
newApp.OnConnecting = pa.OnConnecting newApp.OnConnecting = pa.OnConnecting
newApp.OnSendMessage = pa.OnSendMessage
newApp.version.Store(Version1) newApp.version.Store(Version1)
return newApp return newApp
} }
@ -154,7 +156,7 @@ func (pa *PeerApp) SendMessage(message model2.PeerMessage) error {
} }
if err == nil { if err == nil {
err = pa.connection.Send(serialized) err = pa.OnSendMessage(pa.connection, serialized)
// at this point we have tried to send a message to a peer only to find that something went wrong. // at this point we have tried to send a message to a peer only to find that something went wrong.
// we don't know *what* went wrong - the most likely explanation is the peer went offline in the time between // we don't know *what* went wrong - the most likely explanation is the peer went offline in the time between

View File

@ -8,6 +8,7 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"git.openprivacy.ca/openprivacy/log"
"io" "io"
"os" "os"
"sync" "sync"
@ -237,6 +238,9 @@ func (m *Manifest) PrepareDownload() error {
defer m.lock.Unlock() defer m.lock.Unlock()
m.chunkComplete = make([]bool, len(m.Chunks)) m.chunkComplete = make([]bool, len(m.Chunks))
if m.ChunkSizeInBytes == 0 || m.FileSizeInBytes == 0 {
return fmt.Errorf("manifest is invalid")
}
if info, err := os.Stat(m.FileName); os.IsNotExist(err) { if info, err := os.Stat(m.FileName); os.IsNotExist(err) {
useFileName := m.FileName useFileName := m.FileName
@ -293,6 +297,12 @@ func (m *Manifest) PrepareDownload() error {
} }
break break
} }
if chunkI >= len(m.Chunks) {
log.Errorf("file is larger than the number of chunks assigned. Assuming manifest was corrupted.")
return fmt.Errorf("file is larger than the number of chunks assigned. Assuming manifest was corrupted")
}
hash := sha512.New() hash := sha512.New()
hash.Write(buf[0:n]) hash.Write(buf[0:n])
chunkHash := hash.Sum(nil) chunkHash := hash.Sum(nil)

View File

@ -57,7 +57,7 @@ func TestFileSharing(t *testing.T) {
os.RemoveAll("tordir") os.RemoveAll("tordir")
os.RemoveAll("./download_dir") os.RemoveAll("./download_dir")
log.SetLevel(log.LevelDebug) log.SetLevel(log.LevelInfo)
os.Mkdir("tordir", 0700) os.Mkdir("tordir", 0700)
dataDir := path.Join("tordir", "tor") dataDir := path.Join("tordir", "tor")