Enable a SendPeerMessage EngineHook for Fuzzbot #508

Merged
sarah merged 5 commits from fuzzbot into master 2023-04-20 21:00:15 +00:00
11 changed files with 57 additions and 16 deletions

4
.gitignore vendored
View File

@ -31,4 +31,6 @@ testing/encryptedstorage/tordir
*.tar.gz
data-dir-cwtchtool/
tokens
tordir/
tordir/
testing/autodownload/download_dir
testing/autodownload/storage

View File

@ -29,9 +29,10 @@ type application struct {
acn connectivity.ACN
plugins sync.Map //map[string] []plugins.Plugin
engines map[string]connections.Engine
appBus event.Manager
appmutex sync.Mutex
engines map[string]connections.Engine
appBus event.Manager
appmutex sync.Mutex
engineHooks connections.EngineHooks
settings *settings.GlobalSettingsFile
}
@ -50,7 +51,7 @@ func (app *application) IsFeatureEnabled(experiment string) bool {
type Application interface {
LoadProfiles(password string)
CreateProfile(name string, password string, autostart bool)
InstallEngineHooks(engineHooks connections.EngineHooks)
ImportProfile(exportedCwtchFile string, password string) (peer.CwtchPeer, error)
EnhancedImportProfile(exportedCwtchFile string, password string) string
DeleteProfile(onion string, currentPassword string)
@ -98,7 +99,7 @@ func NewApp(acn connectivity.ACN, appDirectory string, settings *settings.Global
app := &application{engines: make(map[string]connections.Engine), eventBuses: make(map[string]event.Manager), directory: appDirectory, appBus: event.NewEventManager(), settings: settings}
app.peers = make(map[string]peer.CwtchPeer)
app.engineHooks = connections.DefaultEngineHooks{}
app.acn = acn
statusHandler := app.getACNStatusHandler()
acn.SetStatusCallback(statusHandler)
@ -109,6 +110,12 @@ func NewApp(acn connectivity.ACN, appDirectory string, settings *settings.Global
return app
}
func (app *application) InstallEngineHooks(engineHooks connections.EngineHooks) {
app.appmutex.Lock()
defer app.appmutex.Unlock()
app.engineHooks = engineHooks
}
func (app *application) ReadSettings() settings.GlobalSettings {
app.appmutex.Lock()
defer app.appmutex.Unlock()
@ -373,7 +380,7 @@ func (app *application) ActivateEngines(doListen, doPeers, doServers bool) {
log.Debugf("ActivateEngines")
for _, profile := range app.peers {
app.engines[profile.GetOnion()], _ = profile.GenerateProtocolEngine(app.acn, app.eventBuses[profile.GetOnion()])
app.engines[profile.GetOnion()], _ = profile.GenerateProtocolEngine(app.acn, app.eventBuses[profile.GetOnion()], app.engineHooks)
app.eventBuses[profile.GetOnion()].Publish(event.NewEventList(event.ProtocolEngineCreated))
}
app.QueryACNStatus()
@ -398,7 +405,7 @@ func (app *application) ActivatePeerEngine(onion string) {
profile := app.GetPeer(onion)
if profile != nil {
if _, exists := app.engines[onion]; !exists {
app.engines[profile.GetOnion()], _ = profile.GenerateProtocolEngine(app.acn, app.eventBuses[profile.GetOnion()])
app.engines[profile.GetOnion()], _ = profile.GenerateProtocolEngine(app.acn, app.eventBuses[profile.GetOnion()], app.engineHooks)
app.eventBuses[profile.GetOnion()].Publish(event.NewEventList(event.ProtocolEngineCreated))
app.QueryACNStatus()

View File

@ -293,7 +293,7 @@ func (cp *cwtchPeer) ChangePassword(password string, newpassword string, newpass
// GenerateProtocolEngine
// Status: New in 1.5
func (cp *cwtchPeer) GenerateProtocolEngine(acn connectivity.ACN, bus event.Manager) (connections.Engine, error) {
func (cp *cwtchPeer) GenerateProtocolEngine(acn connectivity.ACN, bus event.Manager, engineHooks connections.EngineHooks) (connections.Engine, error) {
cp.mutex.Lock()
defer cp.mutex.Unlock()
conversations, _ := cp.storage.FetchConversations()
@ -323,7 +323,7 @@ func (cp *cwtchPeer) GenerateProtocolEngine(acn connectivity.ACN, bus event.Mana
identity := primitives.InitializeIdentity("", (*ed25519.PrivateKey)(&privateKey), (*ed25519.PublicKey)(&publicKey))
return connections.NewProtocolEngine(identity, privateKey, acn, bus, authorizations), nil
return connections.NewProtocolEngine(identity, privateKey, acn, bus, authorizations, engineHooks), nil
}
// SendScopedZonedGetValToContact

View File

@ -69,7 +69,7 @@ type CwtchPeer interface {
// most functions
Init(event.Manager)
GenerateProtocolEngine(acn connectivity.ACN, bus event.Manager) (connections.Engine, error)
GenerateProtocolEngine(acn connectivity.ACN, bus event.Manager, engineHooks connections.EngineHooks) (connections.Engine, error)
AutoHandleEvents(events []event.Type)
Listen()

View File

@ -47,7 +47,7 @@ func createKey(password string, salt []byte) [32]byte {
}
func initV2Directory(directory, password string) ([32]byte, [128]byte, error) {
os.Mkdir(directory, 0700)
os.MkdirAll(directory, 0700)
key, salt, err := CreateKeySalt(password)
if err != nil {

View File

@ -72,7 +72,8 @@ type engine struct {
tokenManagers sync.Map // [tokenService][]TokenManager
shuttingDown atomic.Bool
shuttingDown atomic.Bool
onSendMessage func(connection tapir.Connection, message []byte) error
}
// Engine (ProtocolEngine) encapsulates the logic necessary to make and receive Cwtch connections.
@ -86,12 +87,16 @@ type Engine interface {
}
// NewProtocolEngine initializes a new engine that runs Cwtch using the given parameters
Outdated
Review

why is this type not in enginehook.go?

why is this type not in enginehook.go?
func NewProtocolEngine(identity primitives.Identity, privateKey ed25519.PrivateKey, acn connectivity.ACN, eventManager event.Manager, peerAuthorizations map[string]model.Authorization) Engine {
func NewProtocolEngine(identity primitives.Identity, privateKey ed25519.PrivateKey, acn connectivity.ACN, eventManager event.Manager, peerAuthorizations map[string]model.Authorization, engineHooks EngineHooks) Engine {
engine := new(engine)
engine.identity = identity
engine.privateKey = privateKey
engine.ephemeralServices = make(map[string]*connectionLockedService)
engine.queue = event.NewQueue()
// the standard send message function
engine.onSendMessage = engineHooks.SendPeerMessage
go engine.eventHandler()
engine.acn = acn
@ -320,6 +325,7 @@ func (e *engine) createPeerTemplate() *PeerApp {
peerAppTemplate.OnAuth = e.ignoreOnShutdown(e.peerAuthed)
peerAppTemplate.OnConnecting = e.ignoreOnShutdown(e.peerConnecting)
peerAppTemplate.OnClose = e.ignoreOnShutdown(e.peerDisconnected)
peerAppTemplate.OnSendMessage = e.onSendMessage
return peerAppTemplate
}

View File

@ -0,0 +1,14 @@
package connections
import "git.openprivacy.ca/cwtch.im/tapir"
type EngineHooks interface {
SendPeerMessage(connection tapir.Connection, message []byte) error
}
type DefaultEngineHooks struct {
}
func (deh DefaultEngineHooks) SendPeerMessage(connection tapir.Connection, message []byte) error {
return connection.Send(message)
}

View File

@ -23,6 +23,7 @@ type PeerApp struct {
OnAuth func(string)
OnClose func(string)
OnConnecting func(string)
OnSendMessage func(connection tapir.Connection, message []byte) error
version atomic.Value
}
@ -48,6 +49,7 @@ func (pa *PeerApp) NewInstance() tapir.Application {
newApp.OnAuth = pa.OnAuth
newApp.OnClose = pa.OnClose
newApp.OnConnecting = pa.OnConnecting
newApp.OnSendMessage = pa.OnSendMessage
newApp.version.Store(Version1)
return newApp
}
@ -154,7 +156,7 @@ func (pa *PeerApp) SendMessage(message model2.PeerMessage) error {
}
if err == nil {
err = pa.connection.Send(serialized)
err = pa.OnSendMessage(pa.connection, serialized)
// at this point we have tried to send a message to a peer only to find that something went wrong.
// we don't know *what* went wrong - the most likely explanation is the peer went offline in the time between

View File

@ -8,6 +8,7 @@ import (
"encoding/json"
"errors"
"fmt"
"git.openprivacy.ca/openprivacy/log"
"io"
"os"
"sync"
@ -237,6 +238,9 @@ func (m *Manifest) PrepareDownload() error {
defer m.lock.Unlock()
m.chunkComplete = make([]bool, len(m.Chunks))
if m.ChunkSizeInBytes == 0 || m.FileSizeInBytes == 0 {
return fmt.Errorf("manifest is invalid")
}
if info, err := os.Stat(m.FileName); os.IsNotExist(err) {
useFileName := m.FileName
@ -293,6 +297,12 @@ func (m *Manifest) PrepareDownload() error {
}
break
}
if chunkI >= len(m.Chunks) {
log.Errorf("file is larger than the number of chunks assigned. Assuming manifest was corrupted.")
return fmt.Errorf("file is larger than the number of chunks assigned. Assuming manifest was corrupted")
}
hash := sha512.New()
hash.Write(buf[0:n])
chunkHash := hash.Sum(nil)

View File

@ -57,7 +57,7 @@ func TestFileSharing(t *testing.T) {
os.RemoveAll("tordir")
os.RemoveAll("./download_dir")
log.SetLevel(log.LevelDebug)
log.SetLevel(log.LevelInfo)
os.Mkdir("tordir", 0700)
dataDir := path.Join("tordir", "tor")