Small Code Fixups
This commit is contained in:
parent
28ddbcc132
commit
f3296ffdd9
43
app/app.go
43
app/app.go
|
@ -37,9 +37,9 @@ type application struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (app *application) IsFeatureEnabled(experiment string) bool {
|
func (app *application) IsFeatureEnabled(experiment string) bool {
|
||||||
settings := app.ReadSettings()
|
globalSettings := app.ReadSettings()
|
||||||
if settings.ExperimentsEnabled {
|
if globalSettings.ExperimentsEnabled {
|
||||||
if status, exists := settings.Experiments[experiment]; exists {
|
if status, exists := globalSettings.Experiments[experiment]; exists {
|
||||||
return status
|
return status
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -86,11 +86,11 @@ func LoadAppSettings(appDirectory string) *settings.GlobalSettingsFile {
|
||||||
// Note: we basically presume this doesn't fail. If the file doesn't exist we create it, and as such the
|
// Note: we basically presume this doesn't fail. If the file doesn't exist we create it, and as such the
|
||||||
// only plausible error conditions are related to file create e.g. low disk space. If that is the case then
|
// only plausible error conditions are related to file create e.g. low disk space. If that is the case then
|
||||||
// many other parts of Cwtch are likely to fail also.
|
// many other parts of Cwtch are likely to fail also.
|
||||||
settings, err := settings.InitGlobalSettingsFile(appDirectory, DefactoPasswordForUnencryptedProfiles)
|
globalSettingsFile, err := settings.InitGlobalSettingsFile(appDirectory, DefactoPasswordForUnencryptedProfiles)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("error initializing global settings file %. Global settings might not be loaded or saves", err)
|
log.Errorf("error initializing global globalSettingsFile file %. Global globalSettingsFile might not be loaded or saves", err)
|
||||||
}
|
}
|
||||||
return settings
|
return globalSettingsFile
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewApp creates a new app with some environment awareness and initializes a Tor Manager
|
// NewApp creates a new app with some environment awareness and initializes a Tor Manager
|
||||||
|
@ -153,18 +153,18 @@ func (app *application) ListProfiles() []string {
|
||||||
|
|
||||||
// GetPeer returns a cwtchPeer for a given onion address
|
// GetPeer returns a cwtchPeer for a given onion address
|
||||||
func (app *application) GetPeer(onion string) peer.CwtchPeer {
|
func (app *application) GetPeer(onion string) peer.CwtchPeer {
|
||||||
if peer, ok := app.peers[onion]; ok {
|
if profile, ok := app.peers[onion]; ok {
|
||||||
return peer
|
return profile
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ap *application) AddPlugin(peerid string, id plugins.PluginID, bus event.Manager, acn connectivity.ACN) {
|
func (app *application) AddPlugin(peerid string, id plugins.PluginID, bus event.Manager, acn connectivity.ACN) {
|
||||||
if _, exists := ap.plugins.Load(peerid); !exists {
|
if _, exists := app.plugins.Load(peerid); !exists {
|
||||||
ap.plugins.Store(peerid, []plugins.Plugin{})
|
app.plugins.Store(peerid, []plugins.Plugin{})
|
||||||
}
|
}
|
||||||
|
|
||||||
pluginsinf, _ := ap.plugins.Load(peerid)
|
pluginsinf, _ := app.plugins.Load(peerid)
|
||||||
peerPlugins := pluginsinf.([]plugins.Plugin)
|
peerPlugins := pluginsinf.([]plugins.Plugin)
|
||||||
|
|
||||||
for _, plugin := range peerPlugins {
|
for _, plugin := range peerPlugins {
|
||||||
|
@ -179,7 +179,7 @@ func (ap *application) AddPlugin(peerid string, id plugins.PluginID, bus event.M
|
||||||
newp.Start()
|
newp.Start()
|
||||||
peerPlugins = append(peerPlugins, newp)
|
peerPlugins = append(peerPlugins, newp)
|
||||||
log.Debugf("storing plugin for %v %v", peerid, peerPlugins)
|
log.Debugf("storing plugin for %v %v", peerid, peerPlugins)
|
||||||
ap.plugins.Store(peerid, peerPlugins)
|
app.plugins.Store(peerid, peerPlugins)
|
||||||
} else {
|
} else {
|
||||||
log.Errorf("error adding plugin: %v", err)
|
log.Errorf("error adding plugin: %v", err)
|
||||||
}
|
}
|
||||||
|
@ -201,11 +201,6 @@ func (app *application) CreateProfile(name string, password string, autostart bo
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// Deprecated in 1.10
|
|
||||||
func (app *application) CreateTaggedPeer(name string, password string, tag string) {
|
|
||||||
app.CreatePeer(name, password, map[attr.ZonedPath]string{attr.ProfileZone.ConstructZonedPath(constants.Tag): tag})
|
|
||||||
}
|
|
||||||
|
|
||||||
func (app *application) setupPeer(profile peer.CwtchPeer) {
|
func (app *application) setupPeer(profile peer.CwtchPeer) {
|
||||||
eventBus := event.NewEventManager()
|
eventBus := event.NewEventManager()
|
||||||
app.eventBuses[profile.GetOnion()] = eventBus
|
app.eventBuses[profile.GetOnion()] = eventBus
|
||||||
|
@ -215,8 +210,8 @@ func (app *application) setupPeer(profile peer.CwtchPeer) {
|
||||||
profile.Init(app.eventBuses[profile.GetOnion()])
|
profile.Init(app.eventBuses[profile.GetOnion()])
|
||||||
|
|
||||||
// Update the Peer with the Most Recent Experiment State...
|
// Update the Peer with the Most Recent Experiment State...
|
||||||
settings := app.settings.ReadGlobalSettings()
|
globalSettings := app.settings.ReadGlobalSettings()
|
||||||
profile.UpdateExperiments(settings.ExperimentsEnabled, settings.Experiments)
|
profile.UpdateExperiments(globalSettings.ExperimentsEnabled, globalSettings.Experiments)
|
||||||
app.registerHooks(profile)
|
app.registerHooks(profile)
|
||||||
|
|
||||||
// Register the Peer With Application Plugins..
|
// Register the Peer With Application Plugins..
|
||||||
|
@ -350,7 +345,7 @@ func (app *application) LoadProfiles(password string) {
|
||||||
func (app *application) registerHooks(profile peer.CwtchPeer) {
|
func (app *application) registerHooks(profile peer.CwtchPeer) {
|
||||||
// Register Hooks
|
// Register Hooks
|
||||||
profile.RegisterHook(extensions.ProfileValueExtension{})
|
profile.RegisterHook(extensions.ProfileValueExtension{})
|
||||||
profile.RegisterHook(filesharing.Functionality{})
|
profile.RegisterHook(new(filesharing.Functionality))
|
||||||
profile.RegisterHook(new(filesharing.ImagePreviewsFunctionality))
|
profile.RegisterHook(new(filesharing.ImagePreviewsFunctionality))
|
||||||
// Ensure that Profiles have the Most Up to Date Settings...
|
// Ensure that Profiles have the Most Up to Date Settings...
|
||||||
profile.NotifySettingsUpdate(app.settings.ReadGlobalSettings())
|
profile.NotifySettingsUpdate(app.settings.ReadGlobalSettings())
|
||||||
|
@ -398,7 +393,7 @@ func (app *application) ActivateEngines(doListen, doPeers, doServers bool) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ActivePeerEngine creates a peer engine for use with an ACN, should be called once the underlying ACN is online
|
// ActivatePeerEngine creates a peer engine for use with an ACN, should be called once the underlying ACN is online
|
||||||
func (app *application) ActivatePeerEngine(onion string) {
|
func (app *application) ActivatePeerEngine(onion string) {
|
||||||
profile := app.GetPeer(onion)
|
profile := app.GetPeer(onion)
|
||||||
if profile != nil {
|
if profile != nil {
|
||||||
|
@ -487,8 +482,8 @@ func (app *application) shutdownPeer(onion string) {
|
||||||
log.Debugf("shutting down plugins for %v", onion)
|
log.Debugf("shutting down plugins for %v", onion)
|
||||||
pluginsI, ok := app.plugins.Load(onion)
|
pluginsI, ok := app.plugins.Load(onion)
|
||||||
if ok {
|
if ok {
|
||||||
plugins := pluginsI.([]plugins.Plugin)
|
appPlugins := pluginsI.([]plugins.Plugin)
|
||||||
for _, plugin := range plugins {
|
for _, plugin := range appPlugins {
|
||||||
plugin.Shutdown()
|
plugin.Shutdown()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
package app
|
package app
|
||||||
|
|
||||||
// We offer "un-passworded" profiles but our storage encrypts everything with a password. We need an agreed upon
|
// DefactoPasswordForUnencryptedProfiles is used to offer "un-passworded" profiles. Our storage encrypts everything with a password. We need an agreed upon
|
||||||
// password to use in that case, that the app case use behind the scenes to password and unlock with
|
// password to use in that case, that the app case use behind the scenes to password and unlock with
|
||||||
// https://docs.openprivacy.ca/cwtch-security-handbook/profile_encryption_and_storage.html
|
// https://docs.openprivacy.ca/cwtch-security-handbook/profile_encryption_and_storage.html
|
||||||
const DefactoPasswordForUnencryptedProfiles = "be gay do crime"
|
const DefactoPasswordForUnencryptedProfiles = "be gay do crime"
|
||||||
|
|
|
@ -18,11 +18,11 @@ func (a *antispam) Start() {
|
||||||
go a.run()
|
go a.run()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cr *antispam) Id() PluginID {
|
func (a *antispam) Id() PluginID {
|
||||||
return ANTISPAM
|
return ANTISPAM
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a antispam) Shutdown() {
|
func (a *antispam) Shutdown() {
|
||||||
a.breakChan <- true
|
a.breakChan <- true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -15,7 +15,7 @@ import (
|
||||||
const tickTimeSec = 30
|
const tickTimeSec = 30
|
||||||
const tickTime = tickTimeSec * time.Second
|
const tickTime = tickTimeSec * time.Second
|
||||||
|
|
||||||
const circutTimeoutSecs int = 120
|
const circuitTimeoutSecs int = 120
|
||||||
|
|
||||||
const MaxBaseTimeoutSec = 5 * 60 // a max base time out of 5 min
|
const MaxBaseTimeoutSec = 5 * 60 // a max base time out of 5 min
|
||||||
const maxFailedBackoff = 6 // 2^6 = 64 -> 64 * [2m to 5m] = 2h8m to 5h20m
|
const maxFailedBackoff = 6 // 2^6 = 64 -> 64 * [2m to 5m] = 2h8m to 5h20m
|
||||||
|
@ -299,12 +299,12 @@ func (cr *contactRetry) requeueReady() {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
retryable := []*contact{}
|
var retryable []*contact
|
||||||
|
|
||||||
throughPutPerMin := cr.maxTorCircuitsPending() / (circutTimeoutSecs / 60)
|
throughPutPerMin := cr.maxTorCircuitsPending() / (circuitTimeoutSecs / 60)
|
||||||
adjustedBaseTimeout := cr.connCount / throughPutPerMin * 60
|
adjustedBaseTimeout := cr.connCount / throughPutPerMin * 60
|
||||||
if adjustedBaseTimeout < circutTimeoutSecs {
|
if adjustedBaseTimeout < circuitTimeoutSecs {
|
||||||
adjustedBaseTimeout = circutTimeoutSecs
|
adjustedBaseTimeout = circuitTimeoutSecs
|
||||||
} else if adjustedBaseTimeout > MaxBaseTimeoutSec {
|
} else if adjustedBaseTimeout > MaxBaseTimeoutSec {
|
||||||
adjustedBaseTimeout = MaxBaseTimeoutSec
|
adjustedBaseTimeout = MaxBaseTimeoutSec
|
||||||
}
|
}
|
||||||
|
|
|
@ -14,28 +14,28 @@ type heartbeat struct {
|
||||||
breakChan chan bool
|
breakChan chan bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *heartbeat) Start() {
|
func (hb *heartbeat) Start() {
|
||||||
go a.run()
|
go hb.run()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cr *heartbeat) Id() PluginID {
|
func (hb *heartbeat) Id() PluginID {
|
||||||
return HEARTBEAT
|
return HEARTBEAT
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a heartbeat) Shutdown() {
|
func (hb *heartbeat) Shutdown() {
|
||||||
a.breakChan <- true
|
hb.breakChan <- true
|
||||||
a.queue.Shutdown()
|
hb.queue.Shutdown()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *heartbeat) run() {
|
func (hb *heartbeat) run() {
|
||||||
log.Debugf("running heartbeat trigger plugin")
|
log.Debugf("running heartbeat trigger plugin")
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-time.After(heartbeatTickTime):
|
case <-time.After(heartbeatTickTime):
|
||||||
// no fuss, just trigger the beat.
|
// no fuss, just trigger the beat.
|
||||||
a.bus.Publish(event.NewEvent(event.Heartbeat, map[event.Field]string{}))
|
hb.bus.Publish(event.NewEvent(event.Heartbeat, map[event.Field]string{}))
|
||||||
continue
|
continue
|
||||||
case <-a.breakChan:
|
case <-hb.breakChan:
|
||||||
log.Debugf("shutting down heartbeat plugin")
|
log.Debugf("shutting down heartbeat plugin")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
|
@ -40,7 +40,7 @@ func (nc *networkCheck) Start() {
|
||||||
go nc.run()
|
go nc.run()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cr *networkCheck) Id() PluginID {
|
func (nc *networkCheck) Id() PluginID {
|
||||||
return NETWORKCHECK
|
return NETWORKCHECK
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -126,8 +126,8 @@ func (nc *networkCheck) selfTest() {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (nc *networkCheck) checkConnection(onion string) {
|
func (nc *networkCheck) checkConnection(onion string) {
|
||||||
prog, _ := nc.acn.GetBootstrapStatus()
|
progress, _ := nc.acn.GetBootstrapStatus()
|
||||||
if prog != 100 {
|
if progress != 100 {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -137,7 +137,7 @@ func (nc *networkCheck) checkConnection(onion string) {
|
||||||
err := ClientTimeout.ExecuteAction(func() error {
|
err := ClientTimeout.ExecuteAction(func() error {
|
||||||
conn, _, err := nc.acn.Open(onion)
|
conn, _, err := nc.acn.Open(onion)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
conn.Close()
|
_ = conn.Close()
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
})
|
})
|
||||||
|
|
|
@ -35,7 +35,7 @@ func (iq *queue) OutChan() <-chan Event {
|
||||||
return iq.infChan.Out()
|
return iq.infChan.Out()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Out returns the next available event from the front of the queue
|
// Next returns the next available event from the front of the queue
|
||||||
func (iq *queue) Next() Event {
|
func (iq *queue) Next() Event {
|
||||||
event := <-iq.infChan.Out()
|
event := <-iq.infChan.Out()
|
||||||
return event
|
return event
|
||||||
|
|
|
@ -22,7 +22,7 @@ type Event struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetRandNumber is a helper function which returns a random integer, this is
|
// GetRandNumber is a helper function which returns a random integer, this is
|
||||||
// currently mostly used to generate messageids
|
// currently mostly used to generate message IDs
|
||||||
func GetRandNumber() *big.Int {
|
func GetRandNumber() *big.Int {
|
||||||
num, err := rand.Int(rand.Reader, big.NewInt(math.MaxUint32))
|
num, err := rand.Int(rand.Reader, big.NewInt(math.MaxUint32))
|
||||||
// If we can't generate random numbers then panicking is probably
|
// If we can't generate random numbers then panicking is probably
|
||||||
|
@ -129,7 +129,7 @@ func (em *manager) eventBus() {
|
||||||
for {
|
for {
|
||||||
eventJSON := <-em.events
|
eventJSON := <-em.events
|
||||||
|
|
||||||
// In the case on an empty event. Teardown the Queue
|
// In the case on an empty event. Tear down the Queue
|
||||||
if len(eventJSON) == 0 {
|
if len(eventJSON) == 0 {
|
||||||
log.Errorf("Received zero length event")
|
log.Errorf("Received zero length event")
|
||||||
break
|
break
|
||||||
|
@ -151,7 +151,10 @@ func (em *manager) eventBus() {
|
||||||
for _, subscriber := range subscribers {
|
for _, subscriber := range subscribers {
|
||||||
// Deep Copy for Each Subscriber
|
// Deep Copy for Each Subscriber
|
||||||
var eventCopy Event
|
var eventCopy Event
|
||||||
json.Unmarshal(eventJSON, &eventCopy)
|
err = json.Unmarshal(eventJSON, &eventCopy)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("error unmarshalling event: %v ", err)
|
||||||
|
}
|
||||||
subscriber.Publish(eventCopy)
|
subscriber.Publish(eventCopy)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -43,7 +43,7 @@ func TestEventManagerMultiple(t *testing.T) {
|
||||||
eventManager.Publish(Event{EventType: "GroupEvent", Data: map[Field]string{"Value": "Hello World Group"}})
|
eventManager.Publish(Event{EventType: "GroupEvent", Data: map[Field]string{"Value": "Hello World Group"}})
|
||||||
eventManager.Publish(Event{EventType: "PeerEvent", Data: map[Field]string{"Value": "Hello World Peer"}})
|
eventManager.Publish(Event{EventType: "PeerEvent", Data: map[Field]string{"Value": "Hello World Peer"}})
|
||||||
eventManager.Publish(Event{EventType: "ErrorEvent", Data: map[Field]string{"Value": "Hello World Error"}})
|
eventManager.Publish(Event{EventType: "ErrorEvent", Data: map[Field]string{"Value": "Hello World Error"}})
|
||||||
eventManager.Publish(Event{EventType: "NobodyIsSubscribedToThisEvent", Data: map[Field]string{"Value": "Noone should see this!"}})
|
eventManager.Publish(Event{EventType: "NobodyIsSubscribedToThisEvent", Data: map[Field]string{"Value": "No one should see this!"}})
|
||||||
|
|
||||||
assertLength := func(len int, expected int, label string) {
|
assertLength := func(len int, expected int, label string) {
|
||||||
if len != expected {
|
if len != expected {
|
||||||
|
|
|
@ -19,7 +19,7 @@ func newInfiniteChannel() *infiniteChannel {
|
||||||
input: make(chan Event),
|
input: make(chan Event),
|
||||||
output: make(chan Event),
|
output: make(chan Event),
|
||||||
length: make(chan int),
|
length: make(chan int),
|
||||||
buffer: newInfinitQueue(),
|
buffer: newInfiniteQueue(),
|
||||||
}
|
}
|
||||||
go ch.infiniteBuffer()
|
go ch.infiniteBuffer()
|
||||||
return ch
|
return ch
|
||||||
|
|
|
@ -24,7 +24,7 @@ type infiniteQueue struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// New constructs and returns a new Queue.
|
// New constructs and returns a new Queue.
|
||||||
func newInfinitQueue() *infiniteQueue {
|
func newInfiniteQueue() *infiniteQueue {
|
||||||
return &infiniteQueue{
|
return &infiniteQueue{
|
||||||
buf: make([]Event, minQueueLen),
|
buf: make([]Event, minQueueLen),
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,7 +16,7 @@ import (
|
||||||
type ProfileValueExtension struct {
|
type ProfileValueExtension struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pne ProfileValueExtension) NotifySettingsUpdate(settings settings.GlobalSettings) {
|
func (pne ProfileValueExtension) NotifySettingsUpdate(_ settings.GlobalSettings) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pne ProfileValueExtension) EventsToRegister() []event.Type {
|
func (pne ProfileValueExtension) EventsToRegister() []event.Type {
|
||||||
|
|
|
@ -30,19 +30,19 @@ import (
|
||||||
type Functionality struct {
|
type Functionality struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f Functionality) NotifySettingsUpdate(settings settings.GlobalSettings) {
|
func (f *Functionality) NotifySettingsUpdate(settings settings.GlobalSettings) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f Functionality) EventsToRegister() []event.Type {
|
func (f *Functionality) EventsToRegister() []event.Type {
|
||||||
return []event.Type{event.ProtocolEngineCreated, event.ManifestReceived, event.FileDownloaded}
|
return []event.Type{event.ProtocolEngineCreated, event.ManifestReceived, event.FileDownloaded}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f Functionality) ExperimentsToRegister() []string {
|
func (f *Functionality) ExperimentsToRegister() []string {
|
||||||
return []string{constants.FileSharingExperiment}
|
return []string{constants.FileSharingExperiment}
|
||||||
}
|
}
|
||||||
|
|
||||||
// OnEvent handles File Sharing Hooks like Manifest Received and FileDownloaded
|
// OnEvent handles File Sharing Hooks like Manifest Received and FileDownloaded
|
||||||
func (f Functionality) OnEvent(ev event.Event, profile peer.CwtchPeer) {
|
func (f *Functionality) OnEvent(ev event.Event, profile peer.CwtchPeer) {
|
||||||
if profile.IsFeatureEnabled(constants.FileSharingExperiment) {
|
if profile.IsFeatureEnabled(constants.FileSharingExperiment) {
|
||||||
switch ev.EventType {
|
switch ev.EventType {
|
||||||
case event.ManifestReceived:
|
case event.ManifestReceived:
|
||||||
|
@ -114,11 +114,11 @@ func (f Functionality) OnEvent(ev event.Event, profile peer.CwtchPeer) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f Functionality) OnContactRequestValue(profile peer.CwtchPeer, conversation model.Conversation, eventID string, path attr.ScopedZonedPath) {
|
func (f *Functionality) OnContactRequestValue(profile peer.CwtchPeer, conversation model.Conversation, eventID string, path attr.ScopedZonedPath) {
|
||||||
// nop
|
// nop
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f Functionality) OnContactReceiveValue(profile peer.CwtchPeer, conversation model.Conversation, path attr.ScopedZonedPath, value string, exists bool) {
|
func (f *Functionality) OnContactReceiveValue(profile peer.CwtchPeer, conversation model.Conversation, path attr.ScopedZonedPath, value string, exists bool) {
|
||||||
// Profile should not call us if FileSharing is disabled
|
// Profile should not call us if FileSharing is disabled
|
||||||
if profile.IsFeatureEnabled(constants.FileSharingExperiment) {
|
if profile.IsFeatureEnabled(constants.FileSharingExperiment) {
|
||||||
scope, zone, zpath := path.GetScopeZonePath()
|
scope, zone, zpath := path.GetScopeZonePath()
|
||||||
|
@ -481,7 +481,7 @@ func (f *Functionality) EnhancedGetSharedFiles(profile peer.CwtchPeer, conversat
|
||||||
|
|
||||||
// GetSharedFiles returns all file shares associated with a given conversation
|
// GetSharedFiles returns all file shares associated with a given conversation
|
||||||
func (f *Functionality) GetSharedFiles(profile peer.CwtchPeer, conversationID int) []SharedFile {
|
func (f *Functionality) GetSharedFiles(profile peer.CwtchPeer, conversationID int) []SharedFile {
|
||||||
sharedFiles := []SharedFile{}
|
var sharedFiles []SharedFile
|
||||||
ci, err := profile.GetConversationInfo(conversationID)
|
ci, err := profile.GetConversationInfo(conversationID)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
for k := range ci.Attributes {
|
for k := range ci.Attributes {
|
||||||
|
|
|
@ -24,11 +24,11 @@ func (i *ImagePreviewsFunctionality) NotifySettingsUpdate(settings settings.Glob
|
||||||
i.downloadFolder = settings.DownloadPath
|
i.downloadFolder = settings.DownloadPath
|
||||||
}
|
}
|
||||||
|
|
||||||
func (i ImagePreviewsFunctionality) EventsToRegister() []event.Type {
|
func (i *ImagePreviewsFunctionality) EventsToRegister() []event.Type {
|
||||||
return []event.Type{event.ProtocolEngineCreated, event.NewMessageFromPeer, event.NewMessageFromGroup, event.PeerStateChange, event.Heartbeat}
|
return []event.Type{event.ProtocolEngineCreated, event.NewMessageFromPeer, event.NewMessageFromGroup, event.PeerStateChange, event.Heartbeat}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (i ImagePreviewsFunctionality) ExperimentsToRegister() []string {
|
func (i *ImagePreviewsFunctionality) ExperimentsToRegister() []string {
|
||||||
return []string{constants.FileSharingExperiment, constants.ImagePreviewsExperiment}
|
return []string{constants.FileSharingExperiment, constants.ImagePreviewsExperiment}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -52,7 +52,7 @@ func (i *ImagePreviewsFunctionality) OnEvent(ev event.Event, profile peer.CwtchP
|
||||||
case event.PeerStateChange:
|
case event.PeerStateChange:
|
||||||
ci, err := profile.FetchConversationInfo(ev.Data["RemotePeer"])
|
ci, err := profile.FetchConversationInfo(ev.Data["RemotePeer"])
|
||||||
if err == nil {
|
if err == nil {
|
||||||
// if we have re-authenticated with thie peer then request their profile image...
|
// if we have re-authenticated with this peer then request their profile image...
|
||||||
if connections.ConnectionStateToType()[ev.Data[event.ConnectionState]] == connections.AUTHENTICATED {
|
if connections.ConnectionStateToType()[ev.Data[event.ConnectionState]] == connections.AUTHENTICATED {
|
||||||
profile.SendScopedZonedGetValToContact(ci.ID, attr.PublicScope, attr.ProfileZone, constants.CustomProfileImageKey)
|
profile.SendScopedZonedGetValToContact(ci.ID, attr.PublicScope, attr.ProfileZone, constants.CustomProfileImageKey)
|
||||||
}
|
}
|
||||||
|
@ -78,12 +78,12 @@ func (i *ImagePreviewsFunctionality) OnEvent(ev event.Event, profile peer.CwtchP
|
||||||
}
|
}
|
||||||
// If file sharing is enabled then reshare all active files...
|
// If file sharing is enabled then reshare all active files...
|
||||||
fsf := FunctionalityGate()
|
fsf := FunctionalityGate()
|
||||||
fsf.ReShareFiles(profile)
|
_ = fsf.ReShareFiles(profile)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (i ImagePreviewsFunctionality) OnContactRequestValue(profile peer.CwtchPeer, conversation model.Conversation, eventID string, path attr.ScopedZonedPath) {
|
func (i *ImagePreviewsFunctionality) OnContactRequestValue(profile peer.CwtchPeer, conversation model.Conversation, eventID string, path attr.ScopedZonedPath) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (i *ImagePreviewsFunctionality) OnContactReceiveValue(profile peer.CwtchPeer, conversation model.Conversation, path attr.ScopedZonedPath, value string, exists bool) {
|
func (i *ImagePreviewsFunctionality) OnContactReceiveValue(profile peer.CwtchPeer, conversation model.Conversation, path attr.ScopedZonedPath, value string, exists bool) {
|
||||||
|
@ -119,13 +119,13 @@ func (i *ImagePreviewsFunctionality) handleImagePreviews(profile peer.CwtchPeer,
|
||||||
if profile.IsFeatureEnabled(constants.FileSharingExperiment) && profile.IsFeatureEnabled(constants.ImagePreviewsExperiment) {
|
if profile.IsFeatureEnabled(constants.FileSharingExperiment) && profile.IsFeatureEnabled(constants.ImagePreviewsExperiment) {
|
||||||
|
|
||||||
// Short-circuit failures
|
// Short-circuit failures
|
||||||
// Don't autodownload images if the download path does not exist.
|
// Don't auto-download images if the download path does not exist.
|
||||||
if i.downloadFolder == "" {
|
if i.downloadFolder == "" {
|
||||||
log.Errorf("download folder %v is not set", i.downloadFolder)
|
log.Errorf("download folder %v is not set", i.downloadFolder)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Don't autodownload images if the download path does not exist.
|
// Don't auto-download images if the download path does not exist.
|
||||||
if _, err := os.Stat(i.downloadFolder); os.IsNotExist(err) {
|
if _, err := os.Stat(i.downloadFolder); os.IsNotExist(err) {
|
||||||
log.Errorf("download folder %v does not exist", i.downloadFolder)
|
log.Errorf("download folder %v does not exist", i.downloadFolder)
|
||||||
return
|
return
|
||||||
|
|
|
@ -91,20 +91,20 @@ func (cp *cwtchPeer) EnhancedImportBundle(importString string) string {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cp *cwtchPeer) EnhancedGetMessages(conversation int, index int, count int) string {
|
func (cp *cwtchPeer) EnhancedGetMessages(conversation int, index int, count int) string {
|
||||||
var emessages []EnhancedMessage = make([]EnhancedMessage, count)
|
var emessages = make([]EnhancedMessage, count)
|
||||||
|
|
||||||
messages, err := cp.GetMostRecentMessages(conversation, 0, index, count)
|
messages, err := cp.GetMostRecentMessages(conversation, 0, index, count)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
|
|
||||||
for i, message := range messages {
|
for i, message := range messages {
|
||||||
|
|
||||||
time, _ := time.Parse(time.RFC3339Nano, message.Attr[constants.AttrSentTimestamp])
|
sentTime, _ := time.Parse(time.RFC3339Nano, message.Attr[constants.AttrSentTimestamp])
|
||||||
emessages[i].Message = model.Message{
|
emessages[i].Message = model.Message{
|
||||||
Message: message.Body,
|
Message: message.Body,
|
||||||
Acknowledged: message.Attr[constants.AttrAck] == constants.True,
|
Acknowledged: message.Attr[constants.AttrAck] == constants.True,
|
||||||
Error: message.Attr[constants.AttrErr],
|
Error: message.Attr[constants.AttrErr],
|
||||||
PeerID: message.Attr[constants.AttrAuthor],
|
PeerID: message.Attr[constants.AttrAuthor],
|
||||||
Timestamp: time,
|
Timestamp: sentTime,
|
||||||
}
|
}
|
||||||
emessages[i].ID = message.ID
|
emessages[i].ID = message.ID
|
||||||
emessages[i].Attributes = message.Attr
|
emessages[i].Attributes = message.Attr
|
||||||
|
@ -118,19 +118,19 @@ func (cp *cwtchPeer) EnhancedGetMessages(conversation int, index int, count int)
|
||||||
|
|
||||||
func (cp *cwtchPeer) EnhancedGetMessageById(conversation int, messageID int) string {
|
func (cp *cwtchPeer) EnhancedGetMessageById(conversation int, messageID int) string {
|
||||||
var message EnhancedMessage
|
var message EnhancedMessage
|
||||||
dbmessage, attr, err := cp.GetChannelMessage(conversation, 0, messageID)
|
dbmessage, attributes, err := cp.GetChannelMessage(conversation, 0, messageID)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
time, _ := time.Parse(time.RFC3339Nano, attr[constants.AttrSentTimestamp])
|
sentTime, _ := time.Parse(time.RFC3339Nano, attributes[constants.AttrSentTimestamp])
|
||||||
message.Message = model.Message{
|
message.Message = model.Message{
|
||||||
Message: dbmessage,
|
Message: dbmessage,
|
||||||
Acknowledged: attr[constants.AttrAck] == constants.True,
|
Acknowledged: attributes[constants.AttrAck] == constants.True,
|
||||||
Error: attr[constants.AttrErr],
|
Error: attributes[constants.AttrErr],
|
||||||
PeerID: attr[constants.AttrAuthor],
|
PeerID: attributes[constants.AttrAuthor],
|
||||||
Timestamp: time,
|
Timestamp: sentTime,
|
||||||
}
|
}
|
||||||
message.ID = messageID
|
message.ID = messageID
|
||||||
message.Attributes = attr
|
message.Attributes = attributes
|
||||||
message.ContentHash = model.CalculateContentHash(attr[constants.AttrAuthor], dbmessage)
|
message.ContentHash = model.CalculateContentHash(attributes[constants.AttrAuthor], dbmessage)
|
||||||
}
|
}
|
||||||
bytes, _ := json.Marshal(message)
|
bytes, _ := json.Marshal(message)
|
||||||
return string(bytes)
|
return string(bytes)
|
||||||
|
@ -142,13 +142,13 @@ func (cp *cwtchPeer) EnhancedGetMessageByContentHash(conversation int, contentHa
|
||||||
if err == nil {
|
if err == nil {
|
||||||
messages, err := cp.GetMostRecentMessages(conversation, 0, offset, 1)
|
messages, err := cp.GetMostRecentMessages(conversation, 0, offset, 1)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
time, _ := time.Parse(time.RFC3339Nano, messages[0].Attr[constants.AttrSentTimestamp])
|
sentTime, _ := time.Parse(time.RFC3339Nano, messages[0].Attr[constants.AttrSentTimestamp])
|
||||||
message.Message = model.Message{
|
message.Message = model.Message{
|
||||||
Message: messages[0].Body,
|
Message: messages[0].Body,
|
||||||
Acknowledged: messages[0].Attr[constants.AttrAck] == constants.True,
|
Acknowledged: messages[0].Attr[constants.AttrAck] == constants.True,
|
||||||
Error: messages[0].Attr[constants.AttrErr],
|
Error: messages[0].Attr[constants.AttrErr],
|
||||||
PeerID: messages[0].Attr[constants.AttrAuthor],
|
PeerID: messages[0].Attr[constants.AttrAuthor],
|
||||||
Timestamp: time,
|
Timestamp: sentTime,
|
||||||
}
|
}
|
||||||
message.ID = messages[0].ID
|
message.ID = messages[0].ID
|
||||||
message.Attributes = messages[0].Attr
|
message.Attributes = messages[0].Attr
|
||||||
|
@ -211,8 +211,8 @@ func (cp *cwtchPeer) RegisterHook(extension ProfileHooks) {
|
||||||
defer cp.extensionLock.Unlock()
|
defer cp.extensionLock.Unlock()
|
||||||
|
|
||||||
// Register Requested Events
|
// Register Requested Events
|
||||||
for _, event := range extension.EventsToRegister() {
|
for _, e := range extension.EventsToRegister() {
|
||||||
cp.eventBus.Subscribe(event, cp.queue)
|
cp.eventBus.Subscribe(e, cp.queue)
|
||||||
}
|
}
|
||||||
|
|
||||||
cp.extensions = append(cp.extensions, ConstructHook(extension))
|
cp.extensions = append(cp.extensions, ConstructHook(extension))
|
||||||
|
@ -353,7 +353,7 @@ func (cp *cwtchPeer) GetScopedZonedAttribute(scope attr.Scope, zone attr.Zone, k
|
||||||
return string(value), true
|
return string(value), true
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetScopedZonedAttributes finds all keys associated with the given scope and zone
|
// GetScopedZonedAttributeKeys finds all keys associated with the given scope and zone
|
||||||
func (cp *cwtchPeer) GetScopedZonedAttributeKeys(scope attr.Scope, zone attr.Zone) ([]string, error) {
|
func (cp *cwtchPeer) GetScopedZonedAttributeKeys(scope attr.Scope, zone attr.Zone) ([]string, error) {
|
||||||
|
|
||||||
scopedZonedKey := scope.ConstructScopedZonedPath(zone.ConstructZonedPath(""))
|
scopedZonedKey := scope.ConstructScopedZonedPath(zone.ConstructZonedPath(""))
|
||||||
|
@ -367,7 +367,7 @@ func (cp *cwtchPeer) GetScopedZonedAttributeKeys(scope attr.Scope, zone attr.Zon
|
||||||
return keys, nil
|
return keys, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetScopedZonedAttribute
|
// SetScopedZonedAttribute saves a scoped and zoned attribute key/value pair as part of the profile
|
||||||
func (cp *cwtchPeer) SetScopedZonedAttribute(scope attr.Scope, zone attr.Zone, key string, value string) {
|
func (cp *cwtchPeer) SetScopedZonedAttribute(scope attr.Scope, zone attr.Zone, key string, value string) {
|
||||||
|
|
||||||
scopedZonedKey := scope.ConstructScopedZonedPath(zone.ConstructZonedPath(key))
|
scopedZonedKey := scope.ConstructScopedZonedPath(zone.ConstructZonedPath(key))
|
||||||
|
@ -520,8 +520,8 @@ func ImportLegacyProfile(profile *model.Profile, cps *CwtchProfileStorage) Cwtch
|
||||||
parts := strings.SplitN(k, ".", 2)
|
parts := strings.SplitN(k, ".", 2)
|
||||||
if len(parts) == 2 {
|
if len(parts) == 2 {
|
||||||
scope := attr.IntoScope(parts[0])
|
scope := attr.IntoScope(parts[0])
|
||||||
zone, path := attr.ParseZone(parts[1])
|
zone, szpath := attr.ParseZone(parts[1])
|
||||||
cp.SetScopedZonedAttribute(scope, zone, path, v)
|
cp.SetScopedZonedAttribute(scope, zone, szpath, v)
|
||||||
} else {
|
} else {
|
||||||
log.Debugf("could not import legacy style attribute %v", k)
|
log.Debugf("could not import legacy style attribute %v", k)
|
||||||
}
|
}
|
||||||
|
@ -567,14 +567,14 @@ func ImportLegacyProfile(profile *model.Profile, cps *CwtchProfileStorage) Cwtch
|
||||||
|
|
||||||
for _, message := range contact.Timeline.GetMessages() {
|
for _, message := range contact.Timeline.GetMessages() {
|
||||||
// By definition anything stored in legacy timelines in acknowledged
|
// By definition anything stored in legacy timelines in acknowledged
|
||||||
attr := model.Attributes{constants.AttrAuthor: message.PeerID, constants.AttrAck: event.True, constants.AttrSentTimestamp: message.Timestamp.Format(time.RFC3339Nano)}
|
attributes := model.Attributes{constants.AttrAuthor: message.PeerID, constants.AttrAck: event.True, constants.AttrSentTimestamp: message.Timestamp.Format(time.RFC3339Nano)}
|
||||||
if message.Flags&0x01 == 0x01 {
|
if message.Flags&0x01 == 0x01 {
|
||||||
attr[constants.AttrRejected] = event.True
|
attributes[constants.AttrRejected] = event.True
|
||||||
}
|
}
|
||||||
if message.Flags&0x02 == 0x02 {
|
if message.Flags&0x02 == 0x02 {
|
||||||
attr[constants.AttrDownloaded] = event.True
|
attributes[constants.AttrDownloaded] = event.True
|
||||||
}
|
}
|
||||||
cp.storage.InsertMessage(conversationID, 0, message.Message, attr, model.GenerateRandomID(), model.CalculateContentHash(message.PeerID, message.Message))
|
cp.storage.InsertMessage(conversationID, 0, message.Message, attributes, model.GenerateRandomID(), model.CalculateContentHash(message.PeerID, message.Message))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -588,14 +588,14 @@ func ImportLegacyProfile(profile *model.Profile, cps *CwtchProfileStorage) Cwtch
|
||||||
if err == nil {
|
if err == nil {
|
||||||
for _, message := range group.Timeline.GetMessages() {
|
for _, message := range group.Timeline.GetMessages() {
|
||||||
// By definition anything stored in legacy timelines in acknowledged
|
// By definition anything stored in legacy timelines in acknowledged
|
||||||
attr := model.Attributes{constants.AttrAuthor: message.PeerID, constants.AttrAck: event.True, constants.AttrSentTimestamp: message.Timestamp.Format(time.RFC3339Nano)}
|
attributes := model.Attributes{constants.AttrAuthor: message.PeerID, constants.AttrAck: event.True, constants.AttrSentTimestamp: message.Timestamp.Format(time.RFC3339Nano)}
|
||||||
if message.Flags&0x01 == 0x01 {
|
if message.Flags&0x01 == 0x01 {
|
||||||
attr[constants.AttrRejected] = event.True
|
attributes[constants.AttrRejected] = event.True
|
||||||
}
|
}
|
||||||
if message.Flags&0x02 == 0x02 {
|
if message.Flags&0x02 == 0x02 {
|
||||||
attr[constants.AttrDownloaded] = event.True
|
attributes[constants.AttrDownloaded] = event.True
|
||||||
}
|
}
|
||||||
cp.storage.InsertMessage(conversationID, 0, message.Message, attr, base64.StdEncoding.EncodeToString(message.Signature), model.CalculateContentHash(message.PeerID, message.Message))
|
cp.storage.InsertMessage(conversationID, 0, message.Message, attributes, base64.StdEncoding.EncodeToString(message.Signature), model.CalculateContentHash(message.PeerID, message.Message))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -811,12 +811,12 @@ func (cp *cwtchPeer) GetMostRecentMessages(conversation int, channel int, offset
|
||||||
// UpdateMessageAttribute sets a given key/value attribute on the message in the given conversation/channel
|
// UpdateMessageAttribute sets a given key/value attribute on the message in the given conversation/channel
|
||||||
// errors if the message doesn't exist, or for underlying database issues.
|
// errors if the message doesn't exist, or for underlying database issues.
|
||||||
func (cp *cwtchPeer) UpdateMessageAttribute(conversation int, channel int, id int, key string, value string) error {
|
func (cp *cwtchPeer) UpdateMessageAttribute(conversation int, channel int, id int, key string, value string) error {
|
||||||
_, attr, err := cp.GetChannelMessage(conversation, channel, id)
|
_, attribute, err := cp.GetChannelMessage(conversation, channel, id)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
cp.mutex.Lock()
|
cp.mutex.Lock()
|
||||||
defer cp.mutex.Unlock()
|
defer cp.mutex.Unlock()
|
||||||
attr[key] = value
|
attribute[key] = value
|
||||||
return cp.storage.UpdateMessageAttributes(conversation, channel, id, attr)
|
return cp.storage.UpdateMessageAttributes(conversation, channel, id, attribute)
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -1178,8 +1178,8 @@ func (cp *cwtchPeer) GetConversationLastSeenTime(conversationId int) time.Time {
|
||||||
|
|
||||||
timestamp, err := cp.GetConversationAttribute(conversationId, attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.AttrLastConnectionTime)))
|
timestamp, err := cp.GetConversationAttribute(conversationId, attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.AttrLastConnectionTime)))
|
||||||
if err == nil {
|
if err == nil {
|
||||||
if time, err := time.Parse(time.RFC3339Nano, timestamp); err == nil {
|
if lastSeenTime, err := time.Parse(time.RFC3339Nano, timestamp); err == nil {
|
||||||
lastTime = time
|
lastTime = lastSeenTime
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1207,7 +1207,7 @@ func (cp *cwtchPeer) GetConversationLastSeenTime(conversationId int) time.Time {
|
||||||
|
|
||||||
func (cp *cwtchPeer) getConnectionsSortedByLastSeen(doPeers, doServers bool) []*LastSeenConversation {
|
func (cp *cwtchPeer) getConnectionsSortedByLastSeen(doPeers, doServers bool) []*LastSeenConversation {
|
||||||
conversations, _ := cp.FetchConversations()
|
conversations, _ := cp.FetchConversations()
|
||||||
byRecent := []*LastSeenConversation{}
|
var byRecent []*LastSeenConversation
|
||||||
|
|
||||||
for _, conversation := range conversations {
|
for _, conversation := range conversations {
|
||||||
if !conversation.IsGroup() {
|
if !conversation.IsGroup() {
|
||||||
|
@ -1384,7 +1384,7 @@ func (cp *cwtchPeer) eventHandler() {
|
||||||
}
|
}
|
||||||
case event.SendMessageToPeerError:
|
case event.SendMessageToPeerError:
|
||||||
context := ev.Data[event.EventContext]
|
context := ev.Data[event.EventContext]
|
||||||
if context == string(event.SendMessageToPeer) {
|
if event.Type(context) == event.SendMessageToPeer {
|
||||||
err := cp.attemptErrorConversationMessage(ev.Data[event.RemotePeer], ev.Data[event.EventID], ev.Data[event.Error])
|
err := cp.attemptErrorConversationMessage(ev.Data[event.RemotePeer], ev.Data[event.EventID], ev.Data[event.Error])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("failed to error p2p message: %s %v", ev.Data[event.RemotePeer], err)
|
log.Errorf("failed to error p2p message: %s %v", ev.Data[event.RemotePeer], err)
|
||||||
|
@ -1400,9 +1400,9 @@ func (cp *cwtchPeer) eventHandler() {
|
||||||
case event.NewGetValMessageFromPeer:
|
case event.NewGetValMessageFromPeer:
|
||||||
onion := ev.Data[event.RemotePeer]
|
onion := ev.Data[event.RemotePeer]
|
||||||
scope := ev.Data[event.Scope]
|
scope := ev.Data[event.Scope]
|
||||||
path := ev.Data[event.Path]
|
zpath := ev.Data[event.Path]
|
||||||
|
|
||||||
log.Debugf("NewGetValMessageFromPeer for %v.%v from %v\n", scope, path, onion)
|
log.Debugf("NewGetValMessageFromPeer for %v.%v from %v\n", scope, zpath, onion)
|
||||||
conversationInfo, err := cp.FetchConversationInfo(onion)
|
conversationInfo, err := cp.FetchConversationInfo(onion)
|
||||||
|
|
||||||
log.Debugf("confo info lookup newgetval %v %v %v", onion, conversationInfo, err)
|
log.Debugf("confo info lookup newgetval %v %v %v", onion, conversationInfo, err)
|
||||||
|
@ -1410,7 +1410,7 @@ func (cp *cwtchPeer) eventHandler() {
|
||||||
if conversationInfo != nil && conversationInfo.Accepted {
|
if conversationInfo != nil && conversationInfo.Accepted {
|
||||||
// Type Safe Scoped/Zoned Path
|
// Type Safe Scoped/Zoned Path
|
||||||
zscope := attr.IntoScope(scope)
|
zscope := attr.IntoScope(scope)
|
||||||
zone, zpath := attr.ParseZone(path)
|
zone, zpath := attr.ParseZone(zpath)
|
||||||
scopedZonedPath := zscope.ConstructScopedZonedPath(zone.ConstructZonedPath(zpath))
|
scopedZonedPath := zscope.ConstructScopedZonedPath(zone.ConstructZonedPath(zpath))
|
||||||
|
|
||||||
// Safe Access to Extensions
|
// Safe Access to Extensions
|
||||||
|
@ -1432,17 +1432,17 @@ func (cp *cwtchPeer) eventHandler() {
|
||||||
case event.NewRetValMessageFromPeer:
|
case event.NewRetValMessageFromPeer:
|
||||||
handle := ev.Data[event.RemotePeer]
|
handle := ev.Data[event.RemotePeer]
|
||||||
scope := ev.Data[event.Scope]
|
scope := ev.Data[event.Scope]
|
||||||
path := ev.Data[event.Path]
|
zpath := ev.Data[event.Path]
|
||||||
val := ev.Data[event.Data]
|
val := ev.Data[event.Data]
|
||||||
exists, _ := strconv.ParseBool(ev.Data[event.Exists])
|
exists, _ := strconv.ParseBool(ev.Data[event.Exists])
|
||||||
log.Debugf("NewRetValMessageFromPeer %v %v %v %v %v\n", handle, scope, path, exists, val)
|
log.Debugf("NewRetValMessageFromPeer %v %v %v %v %v\n", handle, scope, zpath, exists, val)
|
||||||
|
|
||||||
conversationInfo, _ := cp.FetchConversationInfo(handle)
|
conversationInfo, _ := cp.FetchConversationInfo(handle)
|
||||||
// only accepted contacts can look up information
|
// only accepted contacts can look up information
|
||||||
if conversationInfo != nil && conversationInfo.Accepted {
|
if conversationInfo != nil && conversationInfo.Accepted {
|
||||||
// Type Safe Scoped/Zoned Path
|
// Type Safe Scoped/Zoned Path
|
||||||
zscope := attr.IntoScope(scope)
|
zscope := attr.IntoScope(scope)
|
||||||
zone, zpath := attr.ParseZone(path)
|
zone, zpath := attr.ParseZone(zpath)
|
||||||
scopedZonedPath := zscope.ConstructScopedZonedPath(zone.ConstructZonedPath(zpath))
|
scopedZonedPath := zscope.ConstructScopedZonedPath(zone.ConstructZonedPath(zpath))
|
||||||
|
|
||||||
// Safe Access to Extensions
|
// Safe Access to Extensions
|
||||||
|
@ -1620,12 +1620,12 @@ func (cp *cwtchPeer) attemptInsertOrAcknowledgeLegacyGroupConversation(conversat
|
||||||
messageID, err := cp.GetChannelMessageBySignature(conversationID, 0, signature)
|
messageID, err := cp.GetChannelMessageBySignature(conversationID, 0, signature)
|
||||||
// We have received our own message (probably), acknowledge and move on...
|
// We have received our own message (probably), acknowledge and move on...
|
||||||
if err == nil {
|
if err == nil {
|
||||||
_, attr, err := cp.GetChannelMessage(conversationID, 0, messageID)
|
_, attributes, err := cp.GetChannelMessage(conversationID, 0, messageID)
|
||||||
if err == nil && attr[constants.AttrAck] != constants.True {
|
if err == nil && attributes[constants.AttrAck] != constants.True {
|
||||||
cp.mutex.Lock()
|
cp.mutex.Lock()
|
||||||
attr[constants.AttrAck] = constants.True
|
attributes[constants.AttrAck] = constants.True
|
||||||
cp.mutex.Unlock()
|
cp.mutex.Unlock()
|
||||||
cp.storage.UpdateMessageAttributes(conversationID, 0, messageID, attr)
|
_ = cp.storage.UpdateMessageAttributes(conversationID, 0, messageID, attributes)
|
||||||
cp.eventBus.Publish(event.NewEvent(event.IndexedAcknowledgement, map[event.Field]string{event.ConversationID: strconv.Itoa(conversationID), event.Index: strconv.Itoa(messageID)}))
|
cp.eventBus.Publish(event.NewEvent(event.IndexedAcknowledgement, map[event.Field]string{event.ConversationID: strconv.Itoa(conversationID), event.Index: strconv.Itoa(messageID)}))
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -1650,12 +1650,12 @@ func (cp *cwtchPeer) attemptAcknowledgeP2PConversation(handle string, signature
|
||||||
// for p2p messages the randomly generated event ID is the "signature"
|
// for p2p messages the randomly generated event ID is the "signature"
|
||||||
id, err := cp.GetChannelMessageBySignature(ci.ID, 0, signature)
|
id, err := cp.GetChannelMessageBySignature(ci.ID, 0, signature)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
_, attr, err := cp.GetChannelMessage(ci.ID, 0, id)
|
_, attributes, err := cp.GetChannelMessage(ci.ID, 0, id)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
cp.mutex.Lock()
|
cp.mutex.Lock()
|
||||||
attr[constants.AttrAck] = constants.True
|
attributes[constants.AttrAck] = constants.True
|
||||||
cp.mutex.Unlock()
|
cp.mutex.Unlock()
|
||||||
cp.storage.UpdateMessageAttributes(ci.ID, 0, id, attr)
|
cp.storage.UpdateMessageAttributes(ci.ID, 0, id, attributes)
|
||||||
cp.eventBus.Publish(event.NewEvent(event.IndexedAcknowledgement, map[event.Field]string{event.ConversationID: strconv.Itoa(ci.ID), event.RemotePeer: handle, event.Index: strconv.Itoa(id)}))
|
cp.eventBus.Publish(event.NewEvent(event.IndexedAcknowledgement, map[event.Field]string{event.ConversationID: strconv.Itoa(ci.ID), event.RemotePeer: handle, event.Index: strconv.Itoa(id)}))
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -1677,11 +1677,11 @@ func (cp *cwtchPeer) attemptErrorConversationMessage(handle string, signature st
|
||||||
// "signature" here is event ID for peer messages...
|
// "signature" here is event ID for peer messages...
|
||||||
id, err := cp.GetChannelMessageBySignature(ci.ID, 0, signature)
|
id, err := cp.GetChannelMessageBySignature(ci.ID, 0, signature)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
_, attr, err := cp.GetChannelMessage(ci.ID, 0, id)
|
_, attributes, err := cp.GetChannelMessage(ci.ID, 0, id)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
cp.mutex.Lock()
|
cp.mutex.Lock()
|
||||||
attr[constants.AttrErr] = constants.True
|
attributes[constants.AttrErr] = constants.True
|
||||||
cp.storage.UpdateMessageAttributes(ci.ID, 0, id, attr)
|
cp.storage.UpdateMessageAttributes(ci.ID, 0, id, attributes)
|
||||||
cp.mutex.Unlock()
|
cp.mutex.Unlock()
|
||||||
// Send a generic indexed failure...
|
// Send a generic indexed failure...
|
||||||
cp.eventBus.Publish(event.NewEvent(event.IndexedFailure, map[event.Field]string{event.ConversationID: strconv.Itoa(ci.ID), event.Handle: handle, event.Error: error, event.Index: strconv.Itoa(id)}))
|
cp.eventBus.Publish(event.NewEvent(event.IndexedFailure, map[event.Field]string{event.ConversationID: strconv.Itoa(ci.ID), event.Handle: handle, event.Error: error, event.Index: strconv.Itoa(id)}))
|
||||||
|
|
|
@ -124,7 +124,7 @@ func NewCwtchProfileStorage(db *sql.DB, profileDirectory string) (*CwtchProfileS
|
||||||
|
|
||||||
insertProfileKeyValueStmt, err := db.Prepare(insertProfileKeySQLStmt)
|
insertProfileKeyValueStmt, err := db.Prepare(insertProfileKeySQLStmt)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
db.Close()
|
_ = db.Close()
|
||||||
// note: this is debug because we expect failure here when opening an encrypted database with an
|
// note: this is debug because we expect failure here when opening an encrypted database with an
|
||||||
// incorrect password. The rest are errors because failure is not expected.
|
// incorrect password. The rest are errors because failure is not expected.
|
||||||
log.Debugf("error preparing query: %v %v", insertProfileKeySQLStmt, err)
|
log.Debugf("error preparing query: %v %v", insertProfileKeySQLStmt, err)
|
||||||
|
@ -133,70 +133,70 @@ func NewCwtchProfileStorage(db *sql.DB, profileDirectory string) (*CwtchProfileS
|
||||||
|
|
||||||
selectProfileKeyStmt, err := db.Prepare(selectProfileKeySQLStmt)
|
selectProfileKeyStmt, err := db.Prepare(selectProfileKeySQLStmt)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
db.Close()
|
_ = db.Close()
|
||||||
log.Errorf("error preparing query: %v %v", selectProfileKeySQLStmt, err)
|
log.Errorf("error preparing query: %v %v", selectProfileKeySQLStmt, err)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
findProfileKeyStmt, err := db.Prepare(findProfileKeySQLStmt)
|
findProfileKeyStmt, err := db.Prepare(findProfileKeySQLStmt)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
db.Close()
|
_ = db.Close()
|
||||||
log.Errorf("error preparing query: %v %v", findProfileKeySQLStmt, err)
|
log.Errorf("error preparing query: %v %v", findProfileKeySQLStmt, err)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
insertConversationStmt, err := db.Prepare(insertConversationSQLStmt)
|
insertConversationStmt, err := db.Prepare(insertConversationSQLStmt)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
db.Close()
|
_ = db.Close()
|
||||||
log.Errorf("error preparing query: %v %v", insertConversationSQLStmt, err)
|
log.Errorf("error preparing query: %v %v", insertConversationSQLStmt, err)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
fetchAllConversationsStmt, err := db.Prepare(fetchAllConversationsSQLStmt)
|
fetchAllConversationsStmt, err := db.Prepare(fetchAllConversationsSQLStmt)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
db.Close()
|
_ = db.Close()
|
||||||
log.Errorf("error preparing query: %v %v", fetchAllConversationsSQLStmt, err)
|
log.Errorf("error preparing query: %v %v", fetchAllConversationsSQLStmt, err)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
selectConversationStmt, err := db.Prepare(selectConversationSQLStmt)
|
selectConversationStmt, err := db.Prepare(selectConversationSQLStmt)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
db.Close()
|
_ = db.Close()
|
||||||
log.Errorf("error preparing query: %v %v", selectConversationSQLStmt, err)
|
log.Errorf("error preparing query: %v %v", selectConversationSQLStmt, err)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
selectConversationByHandleStmt, err := db.Prepare(selectConversationByHandleSQLStmt)
|
selectConversationByHandleStmt, err := db.Prepare(selectConversationByHandleSQLStmt)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
db.Close()
|
_ = db.Close()
|
||||||
log.Errorf("error preparing query: %v %v", selectConversationByHandleSQLStmt, err)
|
log.Errorf("error preparing query: %v %v", selectConversationByHandleSQLStmt, err)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
acceptConversationStmt, err := db.Prepare(acceptConversationSQLStmt)
|
acceptConversationStmt, err := db.Prepare(acceptConversationSQLStmt)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
db.Close()
|
_ = db.Close()
|
||||||
log.Errorf("error preparing query: %v %v", acceptConversationSQLStmt, err)
|
log.Errorf("error preparing query: %v %v", acceptConversationSQLStmt, err)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
deleteConversationStmt, err := db.Prepare(deleteConversationSQLStmt)
|
deleteConversationStmt, err := db.Prepare(deleteConversationSQLStmt)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
db.Close()
|
_ = db.Close()
|
||||||
log.Errorf("error preparing query: %v %v", deleteConversationSQLStmt, err)
|
log.Errorf("error preparing query: %v %v", deleteConversationSQLStmt, err)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
setConversationAttributesStmt, err := db.Prepare(setConversationAttributesSQLStmt)
|
setConversationAttributesStmt, err := db.Prepare(setConversationAttributesSQLStmt)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
db.Close()
|
_ = db.Close()
|
||||||
log.Errorf("error preparing query: %v %v", setConversationAttributesSQLStmt, err)
|
log.Errorf("error preparing query: %v %v", setConversationAttributesSQLStmt, err)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
setConversationACLStmt, err := db.Prepare(setConversationACLSQLStmt)
|
setConversationACLStmt, err := db.Prepare(setConversationACLSQLStmt)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
db.Close()
|
_ = db.Close()
|
||||||
log.Errorf("error preparing query: %v %v", setConversationACLSQLStmt, err)
|
log.Errorf("error preparing query: %v %v", setConversationACLSQLStmt, err)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -816,41 +816,41 @@ func (cps *CwtchProfileStorage) Close(purgeAllNonSavedMessages bool) {
|
||||||
cps.mutex.Lock()
|
cps.mutex.Lock()
|
||||||
defer cps.mutex.Unlock()
|
defer cps.mutex.Unlock()
|
||||||
|
|
||||||
cps.insertProfileKeyValueStmt.Close()
|
_ = cps.insertProfileKeyValueStmt.Close()
|
||||||
cps.selectProfileKeyValueStmt.Close()
|
_ = cps.selectProfileKeyValueStmt.Close()
|
||||||
|
|
||||||
cps.insertConversationStmt.Close()
|
_ = cps.insertConversationStmt.Close()
|
||||||
cps.fetchAllConversationsStmt.Close()
|
_ = cps.fetchAllConversationsStmt.Close()
|
||||||
cps.selectConversationStmt.Close()
|
_ = cps.selectConversationStmt.Close()
|
||||||
cps.selectConversationByHandleStmt.Close()
|
_ = cps.selectConversationByHandleStmt.Close()
|
||||||
cps.acceptConversationStmt.Close()
|
_ = cps.acceptConversationStmt.Close()
|
||||||
cps.deleteConversationStmt.Close()
|
_ = cps.deleteConversationStmt.Close()
|
||||||
cps.setConversationAttributesStmt.Close()
|
_ = cps.setConversationAttributesStmt.Close()
|
||||||
cps.setConversationACLStmt.Close()
|
_ = cps.setConversationACLStmt.Close()
|
||||||
|
|
||||||
for _, v := range cps.channelInsertStmts {
|
for _, v := range cps.channelInsertStmts {
|
||||||
v.Close()
|
_ = v.Close()
|
||||||
}
|
}
|
||||||
for _, v := range cps.channelUpdateMessageStmts {
|
for _, v := range cps.channelUpdateMessageStmts {
|
||||||
v.Close()
|
_ = v.Close()
|
||||||
}
|
}
|
||||||
for _, v := range cps.channelGetMessageStmts {
|
for _, v := range cps.channelGetMessageStmts {
|
||||||
v.Close()
|
_ = v.Close()
|
||||||
}
|
}
|
||||||
for _, v := range cps.channelGetMessageBySignatureStmts {
|
for _, v := range cps.channelGetMessageBySignatureStmts {
|
||||||
v.Close()
|
_ = v.Close()
|
||||||
}
|
}
|
||||||
for _, v := range cps.channelGetCountStmts {
|
for _, v := range cps.channelGetCountStmts {
|
||||||
v.Close()
|
_ = v.Close()
|
||||||
}
|
}
|
||||||
for _, v := range cps.channelGetMostRecentMessagesStmts {
|
for _, v := range cps.channelGetMostRecentMessagesStmts {
|
||||||
v.Close()
|
_ = v.Close()
|
||||||
}
|
}
|
||||||
for _, v := range cps.channelGetMessageByContentHashStmts {
|
for _, v := range cps.channelGetMessageByContentHashStmts {
|
||||||
v.Close()
|
_ = v.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
cps.db.Close()
|
_ = cps.db.Close()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -35,8 +35,8 @@ type ProfileHook struct {
|
||||||
|
|
||||||
func ConstructHook(extension ProfileHooks) ProfileHook {
|
func ConstructHook(extension ProfileHooks) ProfileHook {
|
||||||
events := make(map[event.Type]bool)
|
events := make(map[event.Type]bool)
|
||||||
for _, event := range extension.EventsToRegister() {
|
for _, e := range extension.EventsToRegister() {
|
||||||
events[event] = true
|
events[e] = true
|
||||||
}
|
}
|
||||||
|
|
||||||
experiments := make(map[string]bool)
|
experiments := make(map[string]bool)
|
||||||
|
|
|
@ -75,7 +75,7 @@ func (pa *PeerApp) Init(connection tapir.Connection) {
|
||||||
// version *must* be the first message sent to prevent race conditions for other events fired after-auth
|
// version *must* be the first message sent to prevent race conditions for other events fired after-auth
|
||||||
// (e.g. getVal requests)
|
// (e.g. getVal requests)
|
||||||
// as such, we send this message before we update the rest of the system
|
// as such, we send this message before we update the rest of the system
|
||||||
pa.SendMessage(model2.PeerMessage{
|
_ = pa.SendMessage(model2.PeerMessage{
|
||||||
ID: event.ContextVersion,
|
ID: event.ContextVersion,
|
||||||
Context: event.ContextGetVal,
|
Context: event.ContextGetVal,
|
||||||
Data: []byte{Version2},
|
Data: []byte{Version2},
|
||||||
|
@ -131,7 +131,7 @@ func (pa *PeerApp) listen() {
|
||||||
pa.version.Store(Version2)
|
pa.version.Store(Version2)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
pa.MessageHandler(pa.connection.Hostname(), packet.ID, packet.Context, []byte(packet.Data))
|
pa.MessageHandler(pa.connection.Hostname(), packet.ID, packet.Context, packet.Data)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -94,8 +94,11 @@ func InitGlobalSettingsFile(directory string, password string) (*GlobalSettingsF
|
||||||
log.Errorf("Could not initialize salt: %v", err)
|
log.Errorf("Could not initialize salt: %v", err)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
os.Mkdir(directory, 0700)
|
err := os.Mkdir(directory, 0700)
|
||||||
err := os.WriteFile(path.Join(directory, saltFile), newSalt[:], 0600)
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
err = os.WriteFile(path.Join(directory, saltFile), newSalt[:], 0600)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("Could not write salt file: %v", err)
|
log.Errorf("Could not write salt file: %v", err)
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|
Loading…
Reference in New Issue