Compare commits

...

22 Commits

Author SHA1 Message Date
Sarah Jamie Lewis cff2a8cafe Merge pull request 'Fix Various Bugs Associated with Profile Start Up / Restart' (#515) from startupbugs into master
Reviewed-on: cwtch.im/cwtch#515
Reviewed-by: Dan Ballard <dan@openprivacy.ca>
2023-05-16 23:21:40 +00:00
Sarah Jamie Lewis 035c6c669f Formatting / Remove Debug 2023-05-16 15:56:07 -07:00
Sarah Jamie Lewis 462a294c93 Add ProtocolEngine test case to ContactRetry plugin 2023-05-16 15:47:49 -07:00
Sarah Jamie Lewis f982e55c4f Safety check on unreachable case 2023-05-16 15:45:56 -07:00
Sarah Jamie Lewis bc522b57c1 Close connection in unreachable case 2023-05-16 15:45:05 -07:00
Sarah Jamie Lewis 8fd6d5ead2 Fix Various Bugs Associated with Profile Start Up / Restart 2023-05-16 22:42:44 +00:00
Sarah Jamie Lewis 50cca925de Merge pull request 'Add a setting to preserve custom font scaling setting' (#514) from font-setting into master
Reviewed-on: cwtch.im/cwtch#514
Reviewed-by: Dan Ballard <dan@openprivacy.ca>
2023-05-09 19:43:51 +00:00
Sarah Jamie Lewis b81353c128 Add a setting to preserve custom font scaling setting 2023-05-09 12:19:19 -07:00
Sarah Jamie Lewis 05cc347ba2 Merge pull request 'Remove RetryPeer event, Poke token count on new group' (#513) from events into master
Reviewed-on: cwtch.im/cwtch#513
Reviewed-by: Dan Ballard <dan@openprivacy.ca>
2023-05-09 18:24:31 +00:00
Sarah Jamie Lewis 92eed46c56 Adding a Test for Contact Retry; Adding jump the queue shortcuts for priority peers 2023-05-09 10:43:07 -07:00
Sarah Jamie Lewis 2abfaf82a1 Fix Race Condition 2023-05-02 13:45:19 -07:00
Sarah Jamie Lewis f5c397876b Update Conversation Timestamp 2023-05-02 13:04:53 -07:00
Sarah Jamie Lewis 3b822393cd Remove RetryPeer event, Poke token count on new group 2023-05-02 19:28:59 +00:00
Dan Ballard 7053f4a31b remove peerlock probably left over from peerapp seperation 2023-05-01 16:13:39 -05:00
Dan Ballard e9e2a18678 fix? 2023-04-28 15:00:23 -06:00
Dan Ballard 440b7f422c move event handling for AcnStatus engine reboot from lcg into app 2023-04-28 15:00:15 -06:00
Dan Ballard 12b89966de engine shutdown now puts potentially long blocking service.close()s in goroutine; contact retry more smartly handles protocolengine start in case last ACNstatus == 100 message comes first 2023-04-27 15:16:24 -06:00
Sarah Jamie Lewis 70c335df81 Merge pull request 'Make DelteProfile and ShutdownPeer safe to call twice / with incorrect onion' (#510) from fuzzbot into master
Reviewed-on: cwtch.im/cwtch#510
Reviewed-by: Dan Ballard <dan@openprivacy.ca>
2023-04-22 01:48:10 +00:00
Sarah Jamie Lewis 8ab0e9993a Make DelteProfile and ShutdownPeer safe to call twice / with incorrect onion 2023-04-21 14:22:09 -07:00
Sarah Jamie Lewis 48e5f44f84 Merge pull request 'Add UpdatedConversationAttribute Event for the UI' (#509) from fuzzbot into master
Reviewed-on: cwtch.im/cwtch#509
Reviewed-by: Dan Ballard <dan@openprivacy.ca>
2023-04-20 22:27:11 +00:00
Sarah Jamie Lewis 79c51b0e6d Add Conversation info in UCA 2023-04-20 15:18:51 -07:00
Sarah Jamie Lewis 4e0fbbc1de Add UpdatedConversationAttribute Event for the UI 2023-04-20 15:14:09 -07:00
12 changed files with 296 additions and 83 deletions

View File

@@ -24,13 +24,13 @@ type application struct {
eventBuses map[string]event.Manager
directory string
peerLock sync.Mutex
peers map[string]peer.CwtchPeer
acn connectivity.ACN
plugins sync.Map //map[string] []plugins.Plugin
peers map[string]peer.CwtchPeer
acn connectivity.ACN
plugins sync.Map //map[string] []plugins.Plugin
engines map[string]connections.Engine
appBus event.Manager
eventQueue event.Queue
appmutex sync.Mutex
engineHooks connections.EngineHooks
@@ -97,7 +97,7 @@ func LoadAppSettings(appDirectory string) *settings.GlobalSettingsFile {
// NewApp creates a new app with some environment awareness and initializes a Tor Manager
func NewApp(acn connectivity.ACN, appDirectory string, settings *settings.GlobalSettingsFile) Application {
app := &application{engines: make(map[string]connections.Engine), eventBuses: make(map[string]event.Manager), directory: appDirectory, appBus: event.NewEventManager(), settings: settings}
app := &application{engines: make(map[string]connections.Engine), eventBuses: make(map[string]event.Manager), directory: appDirectory, appBus: event.NewEventManager(), settings: settings, eventQueue: event.NewQueue()}
app.peers = make(map[string]peer.CwtchPeer)
app.engineHooks = connections.DefaultEngineHooks{}
app.acn = acn
@@ -107,6 +107,9 @@ func NewApp(acn connectivity.ACN, appDirectory string, settings *settings.Global
prog, status := acn.GetBootstrapStatus()
statusHandler(prog, status)
app.GetPrimaryBus().Subscribe(event.ACNStatus, app.eventQueue)
go app.eventHandler()
return app
}
@@ -128,9 +131,6 @@ func (app *application) UpdateSettings(settings settings.GlobalSettings) {
defer app.appmutex.Unlock()
app.settings.WriteGlobalSettings(settings)
// we now need to propagate changes to all peers
app.peerLock.Lock()
defer app.peerLock.Unlock()
for _, profile := range app.peers {
profile.UpdateExperiments(settings.ExperimentsEnabled, settings.Experiments)
@@ -150,8 +150,8 @@ func (app *application) UpdateSettings(settings settings.GlobalSettings) {
func (app *application) ListProfiles() []string {
var keys []string
app.peerLock.Lock()
defer app.peerLock.Unlock()
app.appmutex.Lock()
defer app.appmutex.Unlock()
for handle := range app.peers {
keys = append(keys, handle)
}
@@ -160,6 +160,8 @@ func (app *application) ListProfiles() []string {
// GetPeer returns a cwtchPeer for a given onion address
func (app *application) GetPeer(onion string) peer.CwtchPeer {
app.appmutex.Lock()
defer app.appmutex.Unlock()
if profile, ok := app.peers[onion]; ok {
return profile
}
@@ -255,6 +257,12 @@ func (app *application) DeleteProfile(onion string, password string) {
app.appmutex.Lock()
defer app.appmutex.Unlock()
// short circuit to prevent nil-pointer panic if this function is called twice (or incorrectly)
if app.peers[onion] == nil {
log.Errorf("shutdownPeer called with invalid onion %v", onion)
return
}
// allow a blank password to delete "unencrypted" accounts...
if password == "" {
password = DefactoPasswordForUnencryptedProfiles
@@ -405,8 +413,8 @@ func (app *application) ActivatePeerEngine(onion string) {
profile := app.GetPeer(onion)
if profile != nil {
if _, exists := app.engines[onion]; !exists {
log.Debugf("restartFlow: Creating a New Protocol Engine...")
app.engines[profile.GetOnion()], _ = profile.GenerateProtocolEngine(app.acn, app.eventBuses[profile.GetOnion()], app.engineHooks)
app.eventBuses[profile.GetOnion()].Publish(event.NewEventList(event.ProtocolEngineCreated))
app.QueryACNStatus()
if true {
@@ -468,6 +476,48 @@ func (app *application) QueryACNVersion() {
app.appBus.Publish(event.NewEventList(event.ACNVersion, event.Data, version))
}
func (app *application) eventHandler() {
acnStatus := -1
for {
e := app.eventQueue.Next()
switch e.EventType {
case event.ACNStatus:
newAcnStatus, err := strconv.Atoi(e.Data[event.Progress])
if err != nil {
break
}
if newAcnStatus == 100 {
if acnStatus != 100 {
for _, onion := range app.ListProfiles() {
profile := app.GetPeer(onion)
if profile != nil {
autostart, exists := profile.GetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.PeerAutostart)
if !exists || autostart == "true" {
app.ActivatePeerEngine(onion)
}
}
}
}
} else {
if acnStatus == 100 {
// just fell offline
for _, onion := range app.ListProfiles() {
app.DeactivatePeerEngine(onion)
}
}
}
acnStatus = newAcnStatus
default:
// invalid event, signifies shutdown
if e.EventType == "" {
return
}
}
}
}
// ShutdownPeer shuts down a peer and removes it from the app's management
func (app *application) ShutdownPeer(onion string) {
app.appmutex.Lock()
@@ -477,6 +527,13 @@ func (app *application) ShutdownPeer(onion string) {
// shutdownPeer mutex unlocked helper shutdown peer
func (app *application) shutdownPeer(onion string) {
// short circuit to prevent nil-pointer panic if this function is called twice (or incorrectly)
if app.eventBuses[onion] == nil || app.peers[onion] == nil {
log.Errorf("shutdownPeer called with invalid onion %v", onion)
return
}
app.eventBuses[onion].Publish(event.NewEventList(event.ShutdownPeer, event.Identity, onion))
app.eventBuses[onion].Shutdown()
delete(app.eventBuses, onion)
@@ -506,6 +563,7 @@ func (app *application) Shutdown() {
app.shutdownPeer(id)
}
log.Debugf("Shutting Down App")
app.eventQueue.Shutdown()
app.appBus.Shutdown()
log.Debugf("Shut Down Complete")
}
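
The new app-level eventHandler above follows the event.Queue consumer pattern used throughout Cwtch. A minimal sketch of that pattern, assuming an event.Manager named bus and the cwtch.im/cwtch/event package (the dispatch body is a placeholder):

queue := event.NewQueue()
bus.Subscribe(event.ACNStatus, queue)
go func() {
    for {
        e := queue.Next()
        if e.EventType == "" {
            // queue.Shutdown() unblocks Next() with a zero-value event;
            // treat it as the signal to exit, exactly as eventHandler does above.
            return
        }
        // ...dispatch on e.EventType...
    }
}()
// on app shutdown: queue.Shutdown(), then shut down the bus, as in Shutdown() above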

View File

@@ -5,6 +5,7 @@ import (
"cwtch.im/cwtch/protocol/connections"
"git.openprivacy.ca/openprivacy/log"
"math"
"strconv"
"sync"
"time"
)
@@ -113,6 +114,7 @@ type contactRetry struct {
breakChan chan bool
onion string
lastCheck time.Time
acnProgress int
connections sync.Map //[string]*contact
connCount int
@@ -168,14 +170,16 @@ func (cr *contactRetry) run() {
cr.bus.Subscribe(event.PeerStateChange, cr.queue)
cr.bus.Subscribe(event.ACNStatus, cr.queue)
cr.bus.Subscribe(event.ServerStateChange, cr.queue)
cr.bus.Subscribe(event.PeerRequest, cr.queue)
cr.bus.Subscribe(event.QueuePeerRequest, cr.queue)
cr.bus.Subscribe(event.QueueJoinServer, cr.queue)
cr.bus.Subscribe(event.ProtocolEngineShutdown, cr.queue)
cr.bus.Subscribe(event.ProtocolEngineCreated, cr.queue)
for {
if cr.ACNUp {
// Only attempt connection if both the ACN and the Protocol Engines are Online...
log.Debugf("restartFlow checking state")
if cr.ACNUp && cr.protocolEngine {
log.Debugf("restartFlow time to queue!!")
cr.requeueReady()
connectingCount := cr.connectingCount()
log.Debugf("checking queues (priority len: %v) (pending len: %v) of total conns watched: %v, with current connecingCount: %v", len(cr.priorityQueue.queue), len(cr.pendingQueue.queue), cr.connCount, connectingCount)
@@ -249,8 +253,6 @@ func (cr *contactRetry) run() {
}
}
}
case event.ProtocolEngineCreated:
cr.protocolEngine = true
case event.ProtocolEngineShutdown:
cr.ACNUp = false
@@ -265,22 +267,15 @@ func (cr *contactRetry) run() {
p.failedCount = 0
return true
})
case event.ProtocolEngineCreated:
cr.protocolEngine = true
cr.processStatus()
case event.ACNStatus:
prog := e.Data[event.Progress]
if !cr.protocolEngine {
continue
}
if prog == "100" && !cr.ACNUp {
cr.ACNUp = true
cr.ACNUpTime = time.Now()
cr.connections.Range(func(k, v interface{}) bool {
p := v.(*contact)
p.failedCount = 0
return true
})
} else if prog != "100" {
cr.ACNUp = false
progData := e.Data[event.Progress]
if prog, err := strconv.Atoi(progData); err == nil {
cr.acnProgress = prog
cr.processStatus()
}
}
@@ -294,6 +289,50 @@ func (cr *contactRetry) run() {
}
}
func (cr *contactRetry) processStatus() {
if !cr.protocolEngine {
cr.ACNUp = false
return
}
if cr.acnProgress == 100 && !cr.ACNUp {
// ACN is up...at this point we need to completely reset our state
// as there is no guarantee that the tor daemon shares our state anymore...
cr.ACNUp = true
cr.ACNUpTime = time.Now()
// reset all of the queues...
cr.connCount = 0
cr.priorityQueue = newConnectionQueue()
cr.pendingQueue = newConnectionQueue()
// Loop through connections. Reset state, and requeue...
cr.connections.Range(func(k, v interface{}) bool {
p := v.(*contact)
p.queued = true
// prioritize connections made recently...
log.Debugf("adding %v to queue", p.id)
if time.Since(p.lastSeen).Hours() < PriorityQueueTimeSinceQualifierHours {
cr.priorityQueue.insert(p)
} else {
cr.pendingQueue.insert(p)
}
return true
})
} else if cr.acnProgress != 100 {
cr.ACNUp = false
cr.connections.Range(func(k, v interface{}) bool {
p := v.(*contact)
p.failedCount = 0
p.queued = false
p.state = connections.DISCONNECTED
return true
})
}
}
func (cr *contactRetry) requeueReady() {
if !cr.ACNUp {
return
@@ -329,8 +368,9 @@ func (cr *contactRetry) requeueReady() {
}
func (cr *contactRetry) publishConnectionRequest(contact *contact) {
log.Debugf("RestartFlow Publish Connection Request listener %v", contact)
if contact.ctype == peerConn {
cr.bus.Publish(event.NewEvent(event.RetryPeerRequest, map[event.Field]string{event.RemotePeer: contact.id}))
cr.bus.Publish(event.NewEvent(event.PeerRequest, map[event.Field]string{event.RemotePeer: contact.id}))
}
if contact.ctype == serverConn {
cr.bus.Publish(event.NewEvent(event.RetryServerRequest, map[event.Field]string{event.GroupServer: contact.id}))
@@ -350,6 +390,13 @@ func (cr *contactRetry) addConnection(id string, state connections.ConnectionSta
cr.connections.Store(id, p)
cr.connCount += 1
return
} else {
// we have re-requested this connection. Force-set the queued parameter to true.
p, _ := cr.connections.Load(id)
if !p.(*contact).queued {
p.(*contact).queued = true
cr.connCount += 1
}
}
}
@@ -362,7 +409,10 @@ func (cr *contactRetry) handleEvent(id string, state connections.ConnectionState
}
if _, exists := cr.connections.Load(id); !exists {
cr.addConnection(id, state, ctype, event.CwtchEpoch)
// We have an event for something we don't know about...
// The only reason this should happen is if this is a *new* Peer/Server connection.
// Set its last-seen time to Now() to indicate that this is a fresh connection, and so should likely be prioritized.
cr.addConnection(id, state, ctype, time.Now())
return
}
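
Taken together, run() only drains the queues once both the ACN and a Protocol Engine are up. A hedged sketch of driving the plugin over a bus, mirroring the test in the next file (onion is a placeholder value):

bus := event.NewEventManager()
cr := NewConnectionRetry(bus, "").(*contactRetry)
go cr.run()
// both preconditions must hold before anything is dequeued:
bus.Publish(event.NewEventList(event.ProtocolEngineCreated))
bus.Publish(event.NewEvent(event.ACNStatus, map[event.Field]string{event.Progress: "100"}))
// queue a connection; a recent LastSeen should land it in the priority queue
bus.Publish(event.NewEvent(event.QueuePeerRequest, map[event.Field]string{
    event.RemotePeer: onion,
    event.LastSeen:   time.Now().Format(time.RFC3339Nano),
}))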

View File

@@ -0,0 +1,74 @@
package plugins
import (
"testing"
"time"
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/protocol/connections"
"git.openprivacy.ca/openprivacy/log"
)
// TestContactRetryQueue simulates some basic connection queueing
// NOTE: This whole test is a race condition, and does trip Go's race detector.
// We are invasively checking the internal state of the retry plugin and accessing pointers from another
// thread.
// We could build entire thread-safe monitoring functionality, but that would dramatically expand the scope of this test.
func TestContactRetryQueue(t *testing.T) {
log.SetLevel(log.LevelDebug)
bus := event.NewEventManager()
cr := NewConnectionRetry(bus, "").(*contactRetry)
cr.ACNUp = true // fake an ACN connection...
cr.protocolEngine = true // fake protocol engine
go cr.run()
t.Logf("contact plugin up and running..sending peer connection...")
// Request a peer connection identified as "test"
bus.Publish(event.NewEvent(event.QueuePeerRequest, map[event.Field]string{event.RemotePeer: "test", event.LastSeen: "test"}))
// Wait until the "test" contact actually exists and is queued.
// This is the worst part of this test setup. Ideally we would sleep, or yield some other way, but
// go test scheduling doesn't like that, and even sleeping for long periods won't cause the event thread to make
// progress...
for {
if pinf, exists := cr.connections.Load("test"); exists {
if pinf.(*contact).queued {
break
}
}
}
pinf, _ := cr.connections.Load("test")
if pinf.(*contact).queued == false {
t.Fatalf("test connection should be queued, actually: %v", pinf.(*contact).queued)
}
// Assert that "test" is authenticated
cr.handleEvent("test", connections.AUTHENTICATED, peerConn)
// Assert that "test has a valid state"
pinf, _ = cr.connections.Load("test")
if pinf.(*contact).state != 3 {
t.Fatalf("test connection should be in authenticated after update, actually: %v", pinf.(*contact).state)
}
// Publish an unrelated event to trigger the Plugin to go through a queuing cycle
// If we didn't do this we would have to wait 30 seconds for a check-in
bus.Publish(event.NewEvent(event.PeerStateChange, map[event.Field]string{event.RemotePeer: "test2", event.ConnectionState: "Disconnected"}))
time.Sleep(time.Second)
if pinf.(*contact).queued != false {
t.Fatalf("test connection should not be queued, actually: %v", pinf.(*contact).queued)
}
// Publish a new peer request...
bus.Publish(event.NewEvent(event.QueuePeerRequest, map[event.Field]string{event.RemotePeer: "test"}))
time.Sleep(time.Second) // yield for a second so the event can catch up...
// Peer test should be forced to queue....
pinf, _ = cr.connections.Load("test")
if pinf.(*contact).queued != true {
t.Fatalf("test connection should be forced to queue after new queue peer request")
}
cr.Shutdown()
}
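
As the comments note, the unbounded spin-wait above will hang the suite if the contact is never queued. One possible bounded variant (an assumption, not code from the repository):

deadline := time.Now().Add(10 * time.Second) // arbitrary bound
for {
    if pinf, exists := cr.connections.Load("test"); exists && pinf.(*contact).queued {
        break
    }
    if time.Now().After(deadline) {
        t.Fatalf("timed out waiting for the test connection to be queued")
    }
}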

View File

@@ -25,12 +25,6 @@ const (
// GroupServer
QueuePeerRequest = Type("QueuePeerRequest")
// RetryPeerRequest
// Identical to PeerRequest, but allows Engine to make decisions regarding blocked peers
// attributes:
// RemotePeer: [eg "chpr7qm6op5vfcg2pi4vllco3h6aa7exexc4rqwnlupqhoogx2zgd6qd"
RetryPeerRequest = Type("RetryPeerRequest")
// RetryServerRequest
// Asks CwtchPeer to retry a server connection...
// GroupServer: [eg "chpr7qm6op5vfcg2pi4vllco3h6aa7exexc4rqwnlupqhoogx2zgd6qd"
@@ -212,9 +206,10 @@ const (
// Profile Attribute Event
UpdatedProfileAttribute = Type("UpdatedProfileAttribute")
StartingStorageMiragtion = Type("StartingStorageMigration")
DoneStorageMigration = Type("DoneStorageMigration")
// Conversation Attribute Update...
UpdatedConversationAttribute = Type("UpdatedConversationAttribute")
StartingStorageMiragtion = Type("StartingStorageMigration")
DoneStorageMigration = Type("DoneStorageMigration")
TokenManagerInfo = Type("TokenManagerInfo")
TriggerAntispamCheck = Type("TriggerAntispamCheck")

View File

@@ -64,10 +64,31 @@ func (pne ProfileValueExtension) OnContactReceiveValue(profile peer.CwtchPeer, c
// Allow public profile parameters to be added as contact specific attributes...
scope, zone, _ := szp.GetScopeZonePath()
if exists && scope.IsPublic() && zone == attr.ProfileZone {
err := profile.SetConversationAttribute(conversation.ID, szp, value)
if err != nil {
log.Errorf("error setting conversation attribute %v", err)
// Check the current value of the attribute
currentValue, err := profile.GetConversationAttribute(conversation.ID, szp)
if err == nil && currentValue == value {
// Value exists and the value is the same, short-circuit
return
}
// Save the new Attribute
err = profile.SetConversationAttribute(conversation.ID, szp, value)
if err != nil {
// Something else went wrong... short-circuit
log.Errorf("error setting conversation attribute %v", err)
return
}
// Finally publish an update for listeners to react to.
scope, zone, zpath := szp.GetScopeZonePath()
profile.PublishEvent(event.NewEvent(event.UpdatedConversationAttribute, map[event.Field]string{
event.Scope: string(scope),
event.Path: string(zone.ConstructZonedPath(zpath)),
event.Data: value,
event.RemotePeer: conversation.Handle,
event.ConversationID: strconv.Itoa(conversation.ID),
}))
}
}
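
For illustration, a hypothetical UI-side consumer of the new UpdatedConversationAttribute event; the field names come from the Publish call above, while refreshConversation is an invented placeholder:

uiQueue := event.NewQueue()
bus.Subscribe(event.UpdatedConversationAttribute, uiQueue)
go func() {
    for {
        e := uiQueue.Next()
        if e.EventType == "" {
            return // queue shut down
        }
        convID, err := strconv.Atoi(e.Data[event.ConversationID])
        if err != nil {
            continue
        }
        // hypothetical: re-render the conversation with the updated attribute
        refreshConversation(convID, e.Data[event.Path], e.Data[event.Data])
    }
}()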

View File

@@ -193,6 +193,9 @@ func (f *Functionality) VerifyOrResumeDownload(profile peer.CwtchPeer, conversat
// Assert the filename...this is technically not necessary, but is here for completeness
manifest.FileName = downloadfilepath
if manifest.VerifyFile() == nil {
// Send a FileDownloaded Event. Usually when VerifyOrResumeDownload is triggered it's because some UI is awaiting the results of a
// Download.
profile.PublishEvent(event.NewEvent(event.FileDownloaded, map[event.Field]string{event.FileKey: fileKey, event.FilePath: downloadfilepath, event.TempFile: downloadfilepath}))
// File is verified and there is nothing else to do...
return nil
} else {

View File

@@ -102,6 +102,7 @@ func (i *ImagePreviewsFunctionality) OnContactReceiveValue(profile peer.CwtchPee
if value, exists := profile.GetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%s.complete", fileKey)); exists && value == event.True {
if _, err := os.Stat(fp); err == nil {
// file is marked as completed downloaded and exists...
// Note: this will also resend the FileDownloaded event if successful...
if fsf.VerifyOrResumeDownload(profile, conversation.ID, fileKey, constants.ImagePreviewMaxSizeInBytes) == nil {
return
}

View File

@@ -2,19 +2,11 @@ package peer
import (
"crypto/rand"
"cwtch.im/cwtch/model/constants"
"cwtch.im/cwtch/protocol/groups"
"cwtch.im/cwtch/settings"
"encoding/base64"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"git.openprivacy.ca/cwtch.im/tapir/primitives"
"git.openprivacy.ca/cwtch.im/tapir/primitives/privacypass"
"git.openprivacy.ca/openprivacy/connectivity"
"git.openprivacy.ca/openprivacy/connectivity/tor"
"golang.org/x/crypto/ed25519"
"os"
path "path/filepath"
"sort"
@@ -23,6 +15,15 @@ import (
"sync"
"time"
"cwtch.im/cwtch/model/constants"
"cwtch.im/cwtch/protocol/groups"
"cwtch.im/cwtch/settings"
"git.openprivacy.ca/cwtch.im/tapir/primitives"
"git.openprivacy.ca/cwtch.im/tapir/primitives/privacypass"
"git.openprivacy.ca/openprivacy/connectivity"
"git.openprivacy.ca/openprivacy/connectivity/tor"
"golang.org/x/crypto/ed25519"
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/model"
"cwtch.im/cwtch/model/attr"
@@ -672,6 +673,7 @@ func (cp *cwtchPeer) NewContactConversation(handle string, acl model.AccessContr
conversationInfo, _ := cp.storage.GetConversationByHandle(handle)
if conversationInfo == nil {
conversationID, err := cp.storage.NewConversation(handle, model.Attributes{event.SaveHistoryKey: event.DeleteHistoryDefault}, model.AccessControlList{handle: acl}, accepted)
cp.SetConversationAttribute(conversationID, attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.AttrLastConnectionTime)), time.Now().Format(time.RFC3339Nano))
cp.eventBus.Publish(event.NewEvent(event.ContactCreated, map[event.Field]string{event.ConversationID: strconv.Itoa(conversationID), event.RemotePeer: handle}))
return conversationID, err
}
@@ -836,13 +838,16 @@ func (cp *cwtchPeer) StartGroup(name string, server string) (int, error) {
cp.SetConversationAttribute(conversationID, attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.GroupServer)), group.GroupServer)
cp.SetConversationAttribute(conversationID, attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.GroupKey)), base64.StdEncoding.EncodeToString(group.GroupKey[:]))
cp.SetConversationAttribute(conversationID, attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Name)), name)
cp.eventBus.Publish(event.NewEvent(event.GroupCreated, map[event.Field]string{
event.ConversationID: strconv.Itoa(conversationID),
event.GroupID: group.GroupID,
event.GroupServer: group.GroupServer,
event.GroupName: name,
}))
// Trigger an Antispam payment. We need to do this for two reasons
// 1. This server is new and we don't have any antispam tokens yet
// 2. This group is new and needs its count refreshed
cp.MakeAntispamPayment(server)
return conversationID, nil
}
log.Errorf("error creating group: %v", err)
@@ -948,15 +953,11 @@ func (cp *cwtchPeer) GetPeerState(handle string) connections.ConnectionState {
return connections.DISCONNECTED
}
// PeerWithOnion initiates a request to the Protocol Engine to set up Cwtch Session with a given tor v3 onion
// address.
// PeerWithOnion represents a request to connect immediately to a given peer. Instead
// of checking the last seen time, cwtch will treat the current time as the time of last action.
func (cp *cwtchPeer) PeerWithOnion(onion string) {
lastSeen := event.CwtchEpoch
ci, err := cp.FetchConversationInfo(onion)
if err == nil {
lastSeen = cp.GetConversationLastSeenTime(ci.ID)
}
cp.eventBus.Publish(event.NewEvent(event.PeerRequest, map[event.Field]string{event.RemotePeer: onion, event.LastSeen: lastSeen.Format(time.RFC3339Nano)}))
lastSeen := time.Now()
cp.eventBus.Publish(event.NewEvent(event.QueuePeerRequest, map[event.Field]string{event.RemotePeer: onion, event.LastSeen: lastSeen.Format(time.RFC3339Nano)}))
}
// QueuePeeringWithOnion sends the request to peer with an onion directly to the contact retry queue; this is a mechanism to not flood tor with circuit requests
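
The switch from event.CwtchEpoch to time.Now() above is what lets user-initiated connections jump the queue: per the ContactRetry diff earlier, a recent lastSeen routes a contact to the priority queue. A sketch of that decision inside the plugin, assuming it parses event.LastSeen as RFC3339Nano:

lastSeen, err := time.Parse(time.RFC3339Nano, e.Data[event.LastSeen])
if err != nil {
    lastSeen = event.CwtchEpoch // unparsable: treat as never seen
}
if time.Since(lastSeen).Hours() < PriorityQueueTimeSinceQualifierHours {
    cr.priorityQueue.insert(p) // fresh request: connect soon
} else {
    cr.pendingQueue.insert(p)
}
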
@@ -1336,7 +1337,7 @@ func (cp *cwtchPeer) eventHandler() {
ci, err := cp.FetchConversationInfo(ev.Data[event.GroupServer])
if ci == nil || err != nil {
log.Errorf("no server connection count")
return
continue
}
cp.SetConversationAttribute(ci.ID, attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(lastReceivedSignature)), ev.Data[event.Signature])
conversations, err := cp.FetchConversations()

View File

@@ -110,7 +110,6 @@ func NewProtocolEngine(identity primitives.Identity, privateKey ed25519.PrivateK
engine.eventManager.Subscribe(event.ProtocolEngineStartListen, engine.queue)
engine.eventManager.Subscribe(event.ProtocolEngineShutdown, engine.queue)
engine.eventManager.Subscribe(event.PeerRequest, engine.queue)
engine.eventManager.Subscribe(event.RetryPeerRequest, engine.queue)
engine.eventManager.Subscribe(event.InvitePeerToGroup, engine.queue)
engine.eventManager.Subscribe(event.JoinServer, engine.queue)
engine.eventManager.Subscribe(event.LeaveServer, engine.queue)
@@ -150,6 +149,7 @@ func (e *engine) EventManager() event.Manager {
// eventHandler process events from other subsystems
func (e *engine) eventHandler() {
log.Debugf("restartFlow Launching ProtocolEngine listener")
for {
ev := e.queue.Next()
// optimistic shutdown...
@@ -160,16 +160,10 @@ func (e *engine) eventHandler() {
case event.StatusRequest:
e.eventManager.Publish(event.Event{EventType: event.ProtocolEngineStatus, EventID: ev.EventID})
case event.PeerRequest:
log.Debugf("restartFlow Handling Peer Request")
if torProvider.IsValidHostname(ev.Data[event.RemotePeer]) {
go e.peerWithOnion(ev.Data[event.RemotePeer])
}
case event.RetryPeerRequest:
// This event allows engine to treat (automated) retry peering requests differently to user-specified
// peer events
if torProvider.IsValidHostname(ev.Data[event.RemotePeer]) {
log.Debugf("Retrying Peer Request: %v", ev.Data[event.RemotePeer])
go e.peerWithOnion(ev.Data[event.RemotePeer])
}
case event.InvitePeerToGroup:
err := e.sendPeerMessage(ev.Data[event.RemotePeer], pmodel.PeerMessage{ID: ev.EventID, Context: event.ContextInvite, Data: []byte(ev.Data[event.GroupInvite])})
if err != nil {
@@ -341,19 +335,22 @@ func (e *engine) listenFn() {
func (e *engine) Shutdown() {
// don't accept any more events...
e.queue.Publish(event.NewEvent(event.ProtocolEngineShutdown, map[event.Field]string{}))
e.eventManager.Publish(event.NewEvent(event.ProtocolEngineShutdown, map[event.Field]string{}))
e.service.Shutdown()
e.shuttingDown.Store(true)
e.ephemeralServicesLock.Lock()
defer e.ephemeralServicesLock.Unlock()
for _, connection := range e.ephemeralServices {
log.Infof("shutting down ephemeral service")
connection.connectingLock.Lock()
connection.service.Shutdown()
connection.connectingLock.Unlock()
}
// work around: service.Shutdown() can block for a long time if it is Open()ing a new connection;
// putting it in a goroutine lets each service shut down in its own time, or until the app exits
connection := connection // capture the loop variable so each goroutine shuts down its own service
go func() {
connection.connectingLock.Lock()
connection.service.Shutdown()
connection.connectingLock.Unlock()
}()
}
e.queue.Shutdown()
}
@@ -364,23 +361,24 @@ func (e *engine) peerWithOnion(onion string) {
if !e.isBlocked(onion) {
e.ignoreOnShutdown(e.peerConnecting)(onion)
connected, err := e.service.Connect(onion, e.createPeerTemplate())
// If we are already connected...check if we are authed and issue an auth event
// (This allows the ui to be stateless)
if connected && err != nil {
conn, err := e.service.GetConnection(onion)
conn, err := e.service.WaitForCapabilityOrClose(onion, cwtchCapability)
if err == nil {
if conn.HasCapability(cwtchCapability) {
e.ignoreOnShutdown(e.peerAuthed)(onion)
return
}
log.Errorf("PeerWithOnion something went very wrong...%v %v", onion, err)
if conn != nil {
conn.Close()
}
e.ignoreOnShutdown(e.peerDisconnected)(onion)
} else {
e.ignoreOnShutdown(e.peerDisconnected)(onion)
}
}
// Only issue a disconnected error if we are disconnected (Connect will fail if a connection already exists)
if !connected && err != nil {
e.ignoreOnShutdown(e.peerDisconnected)(onion)
}
}
}
@@ -395,6 +393,10 @@ func (e *engine) makeAntispamPayment(onion string) {
return
}
// Before doing anything, send an event with the current number of tokens.
// This may unblock downstream processes that don't have an accurate token count
e.PokeTokenCount(onion)
conn, err := ephemeralService.service.GetConnection(onion)
if err == nil {
tokenApp, ok := (conn.App()).(*TokenBoardClient)

View File

@@ -51,3 +51,9 @@ func (e *engine) FetchToken(tokenService string) (*privacypass.Token, int, error
e.eventManager.Publish(event.NewEvent(event.TokenManagerInfo, map[event.Field]string{event.ServerTokenOnion: tokenService, event.ServerTokenCount: strconv.Itoa(numTokens)}))
return token, numTokens, err
}
func (e *engine) PokeTokenCount(tokenService string) {
tokenManagerPointer, _ := e.tokenManagers.LoadOrStore(tokenService, NewTokenManager())
tokenManager := tokenManagerPointer.(*TokenManager)
e.eventManager.Publish(event.NewEvent(event.TokenManagerInfo, map[event.Field]string{event.ServerTokenOnion: tokenService, event.ServerTokenCount: strconv.Itoa(tokenManager.NumTokens())}))
}
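
PokeTokenCount exists purely to re-publish the current balance so downstream consumers can resynchronize. A hypothetical listener on the other side (field names from the Publish above; updateServerTokenDisplay is invented):

count, err := strconv.Atoi(e.Data[event.ServerTokenCount])
if err == nil {
    // e.g. surface the antispam token balance for this server in a UI
    updateServerTokenDisplay(e.Data[event.ServerTokenOnion], count)
}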

View File

@@ -56,6 +56,7 @@ type GlobalSettings struct {
UseTorCache bool
TorCacheDir string
BlodeuweddPath string
FontScaling float64
}
var DefaultGlobalSettings = GlobalSettings{
@@ -81,6 +82,7 @@ var DefaultGlobalSettings = GlobalSettings{
UseTorCache: false,
TorCacheDir: "",
BlodeuweddPath: "",
FontScaling: 1.0, // use the system pixel scaling default
}
func InitGlobalSettingsFile(directory string, password string) (*GlobalSettingsFile, error) {
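
A sketch of how a frontend might persist a custom scale with the new field; app.Settings() is an assumed accessor for the current GlobalSettings, while UpdateSettings is the propagation path shown in the app.go diff above:

s := app.Settings()   // assumption: returns the current settings.GlobalSettings
s.FontScaling = 1.25  // preserve a user-chosen 125% scale across restarts
app.UpdateSettings(s) // writes the settings file and propagates to running profiles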

View File

@@ -3,7 +3,7 @@
set -e
pwd
GORACE="haltonerror=1"
go test -race ${1} -coverprofile=plugins.cover.out -v ./app/plugins
go test -coverprofile=plugins.cover.out -v ./app/plugins
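# note: the plugins tests deliberately run without -race; TestContactRetryQueue
# invasively inspects plugin internals from another thread and trips the race detector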
go test -race ${1} -coverprofile=model.cover.out -v ./model
go test -race ${1} -coverprofile=event.cover.out -v ./event
go test -race ${1} -coverprofile=storage.v1.cover.out -v ./storage/v1