Enable a SendPeerMessage EngineHook for Fuzzbot
This commit is contained in:
parent
02fe9323c4
commit
5f448ac2c2
|
@ -31,4 +31,6 @@ testing/encryptedstorage/tordir
|
||||||
*.tar.gz
|
*.tar.gz
|
||||||
data-dir-cwtchtool/
|
data-dir-cwtchtool/
|
||||||
tokens
|
tokens
|
||||||
tordir/
|
tordir/
|
||||||
|
testing/autodownload/download_dir
|
||||||
|
testing/autodownload/storage
|
15
app/app.go
15
app/app.go
|
@ -29,9 +29,10 @@ type application struct {
|
||||||
acn connectivity.ACN
|
acn connectivity.ACN
|
||||||
plugins sync.Map //map[string] []plugins.Plugin
|
plugins sync.Map //map[string] []plugins.Plugin
|
||||||
|
|
||||||
engines map[string]connections.Engine
|
engines map[string]connections.Engine
|
||||||
appBus event.Manager
|
appBus event.Manager
|
||||||
appmutex sync.Mutex
|
appmutex sync.Mutex
|
||||||
|
engineHooks connections.EngineHooks
|
||||||
|
|
||||||
settings *settings.GlobalSettingsFile
|
settings *settings.GlobalSettingsFile
|
||||||
}
|
}
|
||||||
|
@ -94,11 +95,11 @@ func LoadAppSettings(appDirectory string) *settings.GlobalSettingsFile {
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewApp creates a new app with some environment awareness and initializes a Tor Manager
|
// NewApp creates a new app with some environment awareness and initializes a Tor Manager
|
||||||
func NewApp(acn connectivity.ACN, appDirectory string, settings *settings.GlobalSettingsFile) Application {
|
func NewApp(acn connectivity.ACN, appDirectory string, settings *settings.GlobalSettingsFile, engineHooks connections.EngineHooks) Application {
|
||||||
|
|
||||||
app := &application{engines: make(map[string]connections.Engine), eventBuses: make(map[string]event.Manager), directory: appDirectory, appBus: event.NewEventManager(), settings: settings}
|
app := &application{engines: make(map[string]connections.Engine), eventBuses: make(map[string]event.Manager), directory: appDirectory, appBus: event.NewEventManager(), settings: settings}
|
||||||
app.peers = make(map[string]peer.CwtchPeer)
|
app.peers = make(map[string]peer.CwtchPeer)
|
||||||
|
app.engineHooks = engineHooks
|
||||||
app.acn = acn
|
app.acn = acn
|
||||||
statusHandler := app.getACNStatusHandler()
|
statusHandler := app.getACNStatusHandler()
|
||||||
acn.SetStatusCallback(statusHandler)
|
acn.SetStatusCallback(statusHandler)
|
||||||
|
@ -373,7 +374,7 @@ func (app *application) ActivateEngines(doListen, doPeers, doServers bool) {
|
||||||
log.Debugf("ActivateEngines")
|
log.Debugf("ActivateEngines")
|
||||||
|
|
||||||
for _, profile := range app.peers {
|
for _, profile := range app.peers {
|
||||||
app.engines[profile.GetOnion()], _ = profile.GenerateProtocolEngine(app.acn, app.eventBuses[profile.GetOnion()])
|
app.engines[profile.GetOnion()], _ = profile.GenerateProtocolEngine(app.acn, app.eventBuses[profile.GetOnion()], app.engineHooks)
|
||||||
app.eventBuses[profile.GetOnion()].Publish(event.NewEventList(event.ProtocolEngineCreated))
|
app.eventBuses[profile.GetOnion()].Publish(event.NewEventList(event.ProtocolEngineCreated))
|
||||||
}
|
}
|
||||||
app.QueryACNStatus()
|
app.QueryACNStatus()
|
||||||
|
@ -398,7 +399,7 @@ func (app *application) ActivatePeerEngine(onion string) {
|
||||||
profile := app.GetPeer(onion)
|
profile := app.GetPeer(onion)
|
||||||
if profile != nil {
|
if profile != nil {
|
||||||
if _, exists := app.engines[onion]; !exists {
|
if _, exists := app.engines[onion]; !exists {
|
||||||
app.engines[profile.GetOnion()], _ = profile.GenerateProtocolEngine(app.acn, app.eventBuses[profile.GetOnion()])
|
app.engines[profile.GetOnion()], _ = profile.GenerateProtocolEngine(app.acn, app.eventBuses[profile.GetOnion()], app.engineHooks)
|
||||||
|
|
||||||
app.eventBuses[profile.GetOnion()].Publish(event.NewEventList(event.ProtocolEngineCreated))
|
app.eventBuses[profile.GetOnion()].Publish(event.NewEventList(event.ProtocolEngineCreated))
|
||||||
app.QueryACNStatus()
|
app.QueryACNStatus()
|
||||||
|
|
|
@ -0,0 +1,10 @@
|
||||||
|
package app
|
||||||
|
|
||||||
|
import "git.openprivacy.ca/cwtch.im/tapir"
|
||||||
|
|
||||||
|
type DefaultEngineHooks struct {
|
||||||
|
}
|
||||||
|
|
||||||
|
func (deh DefaultEngineHooks) SendPeerMessage(connection tapir.Connection, message []byte) error {
|
||||||
|
return connection.Send(message)
|
||||||
|
}
|
|
@ -293,7 +293,7 @@ func (cp *cwtchPeer) ChangePassword(password string, newpassword string, newpass
|
||||||
|
|
||||||
// GenerateProtocolEngine
|
// GenerateProtocolEngine
|
||||||
// Status: New in 1.5
|
// Status: New in 1.5
|
||||||
func (cp *cwtchPeer) GenerateProtocolEngine(acn connectivity.ACN, bus event.Manager) (connections.Engine, error) {
|
func (cp *cwtchPeer) GenerateProtocolEngine(acn connectivity.ACN, bus event.Manager, engineHooks connections.EngineHooks) (connections.Engine, error) {
|
||||||
cp.mutex.Lock()
|
cp.mutex.Lock()
|
||||||
defer cp.mutex.Unlock()
|
defer cp.mutex.Unlock()
|
||||||
conversations, _ := cp.storage.FetchConversations()
|
conversations, _ := cp.storage.FetchConversations()
|
||||||
|
@ -323,7 +323,7 @@ func (cp *cwtchPeer) GenerateProtocolEngine(acn connectivity.ACN, bus event.Mana
|
||||||
|
|
||||||
identity := primitives.InitializeIdentity("", (*ed25519.PrivateKey)(&privateKey), (*ed25519.PublicKey)(&publicKey))
|
identity := primitives.InitializeIdentity("", (*ed25519.PrivateKey)(&privateKey), (*ed25519.PublicKey)(&publicKey))
|
||||||
|
|
||||||
return connections.NewProtocolEngine(identity, privateKey, acn, bus, authorizations), nil
|
return connections.NewProtocolEngine(identity, privateKey, acn, bus, authorizations, engineHooks), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// SendScopedZonedGetValToContact
|
// SendScopedZonedGetValToContact
|
||||||
|
|
|
@ -69,7 +69,7 @@ type CwtchPeer interface {
|
||||||
// most functions
|
// most functions
|
||||||
Init(event.Manager)
|
Init(event.Manager)
|
||||||
|
|
||||||
GenerateProtocolEngine(acn connectivity.ACN, bus event.Manager) (connections.Engine, error)
|
GenerateProtocolEngine(acn connectivity.ACN, bus event.Manager, engineHooks connections.EngineHooks) (connections.Engine, error)
|
||||||
|
|
||||||
AutoHandleEvents(events []event.Type)
|
AutoHandleEvents(events []event.Type)
|
||||||
Listen()
|
Listen()
|
||||||
|
|
|
@ -47,7 +47,7 @@ func createKey(password string, salt []byte) [32]byte {
|
||||||
}
|
}
|
||||||
|
|
||||||
func initV2Directory(directory, password string) ([32]byte, [128]byte, error) {
|
func initV2Directory(directory, password string) ([32]byte, [128]byte, error) {
|
||||||
os.Mkdir(directory, 0700)
|
os.MkdirAll(directory, 0700)
|
||||||
|
|
||||||
key, salt, err := CreateKeySalt(password)
|
key, salt, err := CreateKeySalt(password)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -72,7 +72,8 @@ type engine struct {
|
||||||
|
|
||||||
tokenManagers sync.Map // [tokenService][]TokenManager
|
tokenManagers sync.Map // [tokenService][]TokenManager
|
||||||
|
|
||||||
shuttingDown atomic.Bool
|
shuttingDown atomic.Bool
|
||||||
|
onSendMessage func(connection tapir.Connection, message []byte) error
|
||||||
}
|
}
|
||||||
|
|
||||||
// Engine (ProtocolEngine) encapsulates the logic necessary to make and receive Cwtch connections.
|
// Engine (ProtocolEngine) encapsulates the logic necessary to make and receive Cwtch connections.
|
||||||
|
@ -85,13 +86,21 @@ type Engine interface {
|
||||||
Shutdown()
|
Shutdown()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type EngineHooks interface {
|
||||||
|
SendPeerMessage(connection tapir.Connection, message []byte) error
|
||||||
|
}
|
||||||
|
|
||||||
// NewProtocolEngine initializes a new engine that runs Cwtch using the given parameters
|
// NewProtocolEngine initializes a new engine that runs Cwtch using the given parameters
|
||||||
func NewProtocolEngine(identity primitives.Identity, privateKey ed25519.PrivateKey, acn connectivity.ACN, eventManager event.Manager, peerAuthorizations map[string]model.Authorization) Engine {
|
func NewProtocolEngine(identity primitives.Identity, privateKey ed25519.PrivateKey, acn connectivity.ACN, eventManager event.Manager, peerAuthorizations map[string]model.Authorization, engineHooks EngineHooks) Engine {
|
||||||
engine := new(engine)
|
engine := new(engine)
|
||||||
engine.identity = identity
|
engine.identity = identity
|
||||||
engine.privateKey = privateKey
|
engine.privateKey = privateKey
|
||||||
engine.ephemeralServices = make(map[string]*connectionLockedService)
|
engine.ephemeralServices = make(map[string]*connectionLockedService)
|
||||||
engine.queue = event.NewQueue()
|
engine.queue = event.NewQueue()
|
||||||
|
|
||||||
|
// the standard send message function
|
||||||
|
engine.onSendMessage = engineHooks.SendPeerMessage
|
||||||
|
|
||||||
go engine.eventHandler()
|
go engine.eventHandler()
|
||||||
|
|
||||||
engine.acn = acn
|
engine.acn = acn
|
||||||
|
@ -320,6 +329,7 @@ func (e *engine) createPeerTemplate() *PeerApp {
|
||||||
peerAppTemplate.OnAuth = e.ignoreOnShutdown(e.peerAuthed)
|
peerAppTemplate.OnAuth = e.ignoreOnShutdown(e.peerAuthed)
|
||||||
peerAppTemplate.OnConnecting = e.ignoreOnShutdown(e.peerConnecting)
|
peerAppTemplate.OnConnecting = e.ignoreOnShutdown(e.peerConnecting)
|
||||||
peerAppTemplate.OnClose = e.ignoreOnShutdown(e.peerDisconnected)
|
peerAppTemplate.OnClose = e.ignoreOnShutdown(e.peerDisconnected)
|
||||||
|
peerAppTemplate.OnSendMessage = e.onSendMessage
|
||||||
return peerAppTemplate
|
return peerAppTemplate
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -23,6 +23,7 @@ type PeerApp struct {
|
||||||
OnAuth func(string)
|
OnAuth func(string)
|
||||||
OnClose func(string)
|
OnClose func(string)
|
||||||
OnConnecting func(string)
|
OnConnecting func(string)
|
||||||
|
OnSendMessage func(connection tapir.Connection, message []byte) error
|
||||||
version atomic.Value
|
version atomic.Value
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -48,6 +49,7 @@ func (pa *PeerApp) NewInstance() tapir.Application {
|
||||||
newApp.OnAuth = pa.OnAuth
|
newApp.OnAuth = pa.OnAuth
|
||||||
newApp.OnClose = pa.OnClose
|
newApp.OnClose = pa.OnClose
|
||||||
newApp.OnConnecting = pa.OnConnecting
|
newApp.OnConnecting = pa.OnConnecting
|
||||||
|
newApp.OnSendMessage = pa.OnSendMessage
|
||||||
newApp.version.Store(Version1)
|
newApp.version.Store(Version1)
|
||||||
return newApp
|
return newApp
|
||||||
}
|
}
|
||||||
|
@ -154,7 +156,7 @@ func (pa *PeerApp) SendMessage(message model2.PeerMessage) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
if err == nil {
|
if err == nil {
|
||||||
err = pa.connection.Send(serialized)
|
err = pa.OnSendMessage(pa.connection, serialized)
|
||||||
|
|
||||||
// at this point we have tried to send a message to a peer only to find that something went wrong.
|
// at this point we have tried to send a message to a peer only to find that something went wrong.
|
||||||
// we don't know *what* went wrong - the most likely explanation is the peer went offline in the time between
|
// we don't know *what* went wrong - the most likely explanation is the peer went offline in the time between
|
||||||
|
|
|
@ -8,6 +8,7 @@ import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"git.openprivacy.ca/openprivacy/log"
|
||||||
"io"
|
"io"
|
||||||
"os"
|
"os"
|
||||||
"sync"
|
"sync"
|
||||||
|
@ -237,6 +238,9 @@ func (m *Manifest) PrepareDownload() error {
|
||||||
defer m.lock.Unlock()
|
defer m.lock.Unlock()
|
||||||
|
|
||||||
m.chunkComplete = make([]bool, len(m.Chunks))
|
m.chunkComplete = make([]bool, len(m.Chunks))
|
||||||
|
if m.ChunkSizeInBytes == 0 || m.FileSizeInBytes == 0 {
|
||||||
|
return fmt.Errorf("manifest is invalid")
|
||||||
|
}
|
||||||
|
|
||||||
if info, err := os.Stat(m.FileName); os.IsNotExist(err) {
|
if info, err := os.Stat(m.FileName); os.IsNotExist(err) {
|
||||||
useFileName := m.FileName
|
useFileName := m.FileName
|
||||||
|
@ -293,6 +297,12 @@ func (m *Manifest) PrepareDownload() error {
|
||||||
}
|
}
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if chunkI >= len(m.Chunks) {
|
||||||
|
log.Errorf("file is larger than the number of chunks assigned. Assuming manifest was corrupted.")
|
||||||
|
return fmt.Errorf("file is larger than the number of chunks assigned. Assuming manifest was corrupted")
|
||||||
|
}
|
||||||
|
|
||||||
hash := sha512.New()
|
hash := sha512.New()
|
||||||
hash.Write(buf[0:n])
|
hash.Write(buf[0:n])
|
||||||
chunkHash := hash.Sum(nil)
|
chunkHash := hash.Sum(nil)
|
||||||
|
|
|
@ -57,7 +57,7 @@ func TestFileSharing(t *testing.T) {
|
||||||
os.RemoveAll("tordir")
|
os.RemoveAll("tordir")
|
||||||
os.RemoveAll("./download_dir")
|
os.RemoveAll("./download_dir")
|
||||||
|
|
||||||
log.SetLevel(log.LevelDebug)
|
log.SetLevel(log.LevelInfo)
|
||||||
|
|
||||||
os.Mkdir("tordir", 0700)
|
os.Mkdir("tordir", 0700)
|
||||||
dataDir := path.Join("tordir", "tor")
|
dataDir := path.Join("tordir", "tor")
|
||||||
|
@ -97,7 +97,7 @@ func TestFileSharing(t *testing.T) {
|
||||||
acn.WaitTillBootstrapped()
|
acn.WaitTillBootstrapped()
|
||||||
defer acn.Close()
|
defer acn.Close()
|
||||||
|
|
||||||
app := app2.NewApp(acn, "./storage", app2.LoadAppSettings("./storage"))
|
app := app2.NewApp(acn, "./storage", app2.LoadAppSettings("./storage"), app2.DefaultEngineHooks{})
|
||||||
|
|
||||||
usr, _ := user.Current()
|
usr, _ := user.Current()
|
||||||
cwtchDir := path.Join(usr.HomeDir, ".cwtch")
|
cwtchDir := path.Join(usr.HomeDir, ".cwtch")
|
||||||
|
|
|
@ -139,7 +139,7 @@ func TestCwtchPeerIntegration(t *testing.T) {
|
||||||
const ServerAddr = "nfhxzvzxinripgdh4t2m4xcy3crf6p4cbhectgckuj3idsjsaotgowad"
|
const ServerAddr = "nfhxzvzxinripgdh4t2m4xcy3crf6p4cbhectgckuj3idsjsaotgowad"
|
||||||
serverKeyBundle, _ := base64.StdEncoding.DecodeString(ServerKeyBundleBase64)
|
serverKeyBundle, _ := base64.StdEncoding.DecodeString(ServerKeyBundleBase64)
|
||||||
|
|
||||||
app := app2.NewApp(acn, "./storage", app2.LoadAppSettings("./storage"))
|
app := app2.NewApp(acn, "./storage", app2.LoadAppSettings("./storage"), app2.DefaultEngineHooks{})
|
||||||
|
|
||||||
usr, _ := user.Current()
|
usr, _ := user.Current()
|
||||||
cwtchDir := path.Join(usr.HomeDir, ".cwtch")
|
cwtchDir := path.Join(usr.HomeDir, ".cwtch")
|
||||||
|
|
|
@ -59,7 +59,7 @@ func TestEncryptedStorage(t *testing.T) {
|
||||||
|
|
||||||
defer acn.Close()
|
defer acn.Close()
|
||||||
acn.WaitTillBootstrapped()
|
acn.WaitTillBootstrapped()
|
||||||
app := app2.NewApp(acn, cwtchDir, app2.LoadAppSettings(cwtchDir))
|
app := app2.NewApp(acn, cwtchDir, app2.LoadAppSettings(cwtchDir), app2.DefaultEngineHooks{})
|
||||||
app.CreateProfile("alice", "password", true)
|
app.CreateProfile("alice", "password", true)
|
||||||
app.CreateProfile("bob", "password", true)
|
app.CreateProfile("bob", "password", true)
|
||||||
|
|
||||||
|
|
|
@ -97,7 +97,7 @@ func TestFileSharing(t *testing.T) {
|
||||||
acn.WaitTillBootstrapped()
|
acn.WaitTillBootstrapped()
|
||||||
defer acn.Close()
|
defer acn.Close()
|
||||||
|
|
||||||
app := app2.NewApp(acn, "./storage", app2.LoadAppSettings("./storage"))
|
app := app2.NewApp(acn, "./storage", app2.LoadAppSettings("./storage"), app2.DefaultEngineHooks{})
|
||||||
|
|
||||||
usr, _ := user.Current()
|
usr, _ := user.Current()
|
||||||
cwtchDir := path.Join(usr.HomeDir, ".cwtch")
|
cwtchDir := path.Join(usr.HomeDir, ".cwtch")
|
||||||
|
|
Loading…
Reference in New Issue