add activateEngine to app to handle multiple profiles a little more gracefully; launchPeerConnections sorts based on last message time; contactRetry slowdowns and partial state tracking of the circuit queue for adaptive slowdowns

This commit is contained in:
Dan Ballard 2022-09-25 18:28:04 -07:00
parent 9ef244bc80
commit 6d8f31773e
4 changed files with 129 additions and 25 deletions

View File

@ -44,7 +44,8 @@ type Application interface {
QueryACNStatus()
QueryACNVersion()
ActivePeerEngine(onion string, doListen, doPeers, doServers bool)
ActivateEngine(doListn, doPeers, doServers bool)
ActivatePeerEngine(onion string, doListen, doPeers, doServers bool)
DeactivatePeerEngine(onion string)
ShutdownPeer(string)
@ -244,20 +245,47 @@ func (app *application) installProfile(profile peer.CwtchPeer) bool {
return false
}
// ActivateEngine generates a protocol engine for every profile in app.peers and
// then, depending on the flags, starts listening and launches connections.
//
// Work is done in phases across all profiles: every engine is generated first,
// then (if doListen) every profile listens, then (if doServers) every profile
// starts its server connections, and finally (if doPeers) every profile starts
// its peer connections.
//
// A failure to generate an engine is logged; whatever value was returned is
// still stored in app.engines, as before.
func (app *application) ActivateEngine(doListen, doPeers, doServers bool) {
	log.Infof("ActivateEngine")

	for _, profile := range app.peers {
		onion := profile.GetOnion()
		engine, err := profile.GenerateProtocolEngine(app.acn, app.eventBuses[onion])
		if err != nil {
			// Previously discarded silently; surface it so a dead profile is diagnosable.
			log.Errorf("ActivateEngine could not generate protocol engine for %v: %v", onion, err)
		}
		app.engines[onion] = engine
	}

	if doListen {
		for _, profile := range app.peers {
			log.Infof(" Listen for %v", profile.GetOnion())
			profile.Listen()
		}
	}

	if doServers {
		for _, profile := range app.peers {
			log.Infof(" StartServerCons for %v", profile.GetOnion())
			profile.StartServerConnections()
		}
	}

	if doPeers {
		for _, profile := range app.peers {
			log.Infof(" StartPeerCons for %v", profile.GetOnion())
			profile.StartPeersConnections()
		}
	}
}
// ActivatePeerEngine creates a peer engine for use with an ACN, should be called once the underlying ACN is online.
// It looks up the profile for onion via GetPeer and does nothing when that returns nil. Otherwise it generates the
// profile's protocol engine (the returned error is discarded) and, per flag, calls Listen (doListen),
// StartPeersConnections (doPeers) and StartServerConnections (doServers) on that profile.
func (app *application) ActivePeerEngine(onion string, doListen, doPeers, doServers bool) {
func (app *application) ActivatePeerEngine(onion string, doListen, doPeers, doServers bool) {
profile := app.GetPeer(onion)
if profile != nil {
app.engines[profile.GetOnion()], _ = profile.GenerateProtocolEngine(app.acn, app.eventBuses[profile.GetOnion()])
if doListen {
profile.Listen()
}
if doPeers {
profile.StartPeersConnections()
}
if doServers {
profile.StartServerConnections()
}
if doPeers {
profile.StartPeersConnections()
}
}
}

View File

@ -4,12 +4,18 @@ import (
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/protocol/connections"
"git.openprivacy.ca/openprivacy/log"
"math"
"sync"
"sync/atomic"
"time"
)
const tickTime = 30 * time.Second
const maxBackoff int = 10 // 320 seconds or ~5 min
const tickTimeSec = 300 //120
const tickTime = tickTimeSec * time.Second
const baseMaxBackoffPeer int = 6 // 5min * 6 = 30min
// Servers never reach out to us, whereas peers coming online will ideally contact us themselves, so servers use a lower maximum backoff
const baseMaxBackoffServer int = 3 // 5min * 3 = 15min
const circutTimeoutMins float64 = 2
type connectionType int
@ -36,12 +42,13 @@ type contactRetry struct {
onion string
lastCheck time.Time
connections sync.Map //[string]*contact
connectingCount int64
connections sync.Map //[string]*contact
}
// NewConnectionRetry returns a Plugin that when started will retry connecting to contacts with a backoff timing.
// The plugin is bound to the given event bus and profile onion, gets its own event queue and a break channel
// buffered to one entry, and starts with no tracked connections, networkUp false and a connecting count of zero.
func NewConnectionRetry(bus event.Manager, onion string) Plugin {
cr := &contactRetry{bus: bus, queue: event.NewQueue(), breakChan: make(chan bool, 1), connections: sync.Map{}, networkUp: false, onion: onion}
cr := &contactRetry{bus: bus, queue: event.NewQueue(), breakChan: make(chan bool, 1), connections: sync.Map{}, networkUp: false, onion: onion, connectingCount: 0}
return cr
}
@ -108,20 +115,38 @@ func (cr *contactRetry) run() {
}
}
// calcPendingMultiplier returns the factor by which retry backoffs are stretched
// for the given number of in-flight connection attempts.
//
// It estimates how many minutes the pending attempts need to clear (attempts
// divided by the per-minute throughput, itself TorMaxPendingConns divided by the
// circuit timeout) and expresses that as a whole number of server backoff
// windows. All divisions are integer divisions, and the result is never below 1.
func calcPendingMultiplier(connectingCount int) int {
	// Attempts that can complete or time out per minute.
	perMinute := int(math.Floor(connections.TorMaxPendingConns / circutTimeoutMins))

	// Length in minutes of the shorter (server) maximum backoff window.
	serverWindowMins := baseMaxBackoffServer * (tickTimeSec / 60)

	if scaled := (connectingCount / perMinute) / serverWindowMins; scaled >= 1 {
		return scaled
	}
	return 1
}
func (cr *contactRetry) retryDisconnected() {
var retryCount int64 = 0
cr.connections.Range(func(k, v interface{}) bool {
p := v.(*contact)
pendingMultiplier := calcPendingMultiplier((int)(atomic.LoadInt64(&cr.connectingCount) + retryCount))
if p.state == connections.DISCONNECTED {
p.ticks++
if p.ticks >= p.backoff {
log.Infof("Retrying on disconnected connection, with pendingmult: %v", pendingMultiplier)
if p.ticks >= (p.backoff * pendingMultiplier) {
retryCount++
p.ticks = 0
if cr.networkUp {
if p.ctype == peerConn {
cr.bus.Publish(event.NewEvent(event.RetryPeerRequest, map[event.Field]string{event.RemotePeer: p.id}))
go func(id string) {
cr.bus.Publish(event.NewEvent(event.RetryPeerRequest, map[event.Field]string{event.RemotePeer: id}))
}(p.id)
}
if p.ctype == serverConn {
cr.bus.Publish(event.NewEvent(event.RetryServerRequest, map[event.Field]string{event.GroupServer: p.id}))
go func(id string) {
cr.bus.Publish(event.NewEvent(event.RetryServerRequest, map[event.Field]string{event.GroupServer: p.id}))
}(p.id)
}
}
}
@ -140,16 +165,18 @@ func (cr *contactRetry) handleEvent(id string, state connections.ConnectionState
if _, exists := cr.connections.Load(id); !exists {
p := &contact{id: id, state: state, backoff: 0, ticks: 0, ctype: ctype}
cr.connections.Store(id, p)
cr.manageStateChange(state, connections.DISCONNECTED)
return
}
pinf, _ := cr.connections.Load(id)
p := pinf.(*contact)
cr.manageStateChange(state, p.state)
if state == connections.DISCONNECTED || state == connections.FAILED || state == connections.KILLED {
p.state = connections.DISCONNECTED
if p.backoff == 0 {
p.backoff = 1
} else if p.backoff < maxBackoff {
} else if p.backoff < baseMaxBackoffPeer {
p.backoff *= 2
}
p.ticks = 0
@ -161,6 +188,16 @@ func (cr *contactRetry) handleEvent(id string, state connections.ConnectionState
}
}
// manageStateChange keeps connectingCount in step with a contact moving from
// prevState to state: entering CONNECTING increments the counter, and leaving
// CONNECTING for any other state decrements it. Other transitions are ignored.
//
// The value logged is the one returned by the atomic add itself, so it always
// reflects this call's update rather than a later change by another goroutine.
//
// NOTE(review): a CONNECTING event for a contact whose prevState is already
// CONNECTING increments the counter again — confirm callers never repeat it.
func (cr *contactRetry) manageStateChange(state, prevState connections.ConnectionState) {
	switch {
	case state == connections.CONNECTING:
		count := atomic.AddInt64(&cr.connectingCount, 1)
		log.Infof("New connecting, connectingCount: %v", count)
	case prevState == connections.CONNECTING:
		count := atomic.AddInt64(&cr.connectingCount, -1)
		log.Infof("Failed or Connected, connectingCount: %v", count)
	}
}
// Shutdown sends true on the plugin's break channel.
// NOTE(review): presumably the run loop treats this as its stop signal — confirm against run().
func (cr *contactRetry) Shutdown() {
cr.breakChan <- true
}

View File

@ -15,10 +15,10 @@ import (
"git.openprivacy.ca/openprivacy/connectivity/tor"
"golang.org/x/crypto/ed25519"
"math/bits"
rand2 "math/rand"
"os"
path "path/filepath"
"runtime"
"sort"
"strconv"
"strings"
"sync"
@ -809,13 +809,12 @@ func (cp *cwtchPeer) GetPeerState(handle string) connections.ConnectionState {
// PeerWithOnion initiates a request to the Protocol Engine to set up Cwtch Session with a given tor v3 onion
// address. The request is made by publishing a PeerRequest event, carrying onion as its RemotePeer field, on the
// profile's event bus.
func (cp *cwtchPeer) PeerWithOnion(onion string) {
go func() {
// wait a random number of seconds (0-59) before triggering;
// this cuts down on contention in the event bus
randWait := time.Duration(rand2.Int() % 60)
time.Sleep(randWait * time.Second)
cp.eventBus.Publish(event.NewEvent(event.PeerRequest, map[event.Field]string{event.RemotePeer: onion}))
}()
log.Infof("PeerWithOnion go")
// disabled: previously wrapped the publish below in a goroutine with a fixed delay
//go func() {
// time.Sleep(10 * time.Second)
log.Infof(" peerWithOnion wake, Pub")
cp.eventBus.Publish(event.NewEvent(event.PeerRequest, map[event.Field]string{event.RemotePeer: onion}))
//}()
}
// SendInviteToConversation kicks off the invite process
@ -944,12 +943,12 @@ func (cp *cwtchPeer) JoinServer(onion string) error {
if !exists {
signature = base64.StdEncoding.EncodeToString([]byte{})
}
cachedTokensJson, hasCachedTokens := ci.GetAttribute(attr.LocalScope, attr.ServerZone, "tokens")
if hasCachedTokens {
log.Debugf("using cached tokens for %v", ci.Handle)
}
log.Infof("Publish JoinServer Event")
cp.eventBus.Publish(event.NewEvent(event.JoinServer, map[event.Field]string{event.GroupServer: onion, event.ServerTokenY: tokenY, event.ServerTokenOnion: tokenOnion, event.Signature: signature, event.CachedTokens: cachedTokensJson}))
return nil
}
@ -1004,28 +1003,61 @@ func (cp *cwtchPeer) Listen() {
}
// RecentConversation pairs a conversation with the time of its most recent message so that
// conversations can be ordered by recency (see StartPeersConnections).
type RecentConversation struct {
// model is the conversation being ranked.
model *model.Conversation
// lastMessageTime is the sent timestamp of the most recent message in Unix seconds,
// or 0 when no message or no parseable timestamp was found.
lastMessageTime int
}
// StartPeersConnections attempts to connect to peer connections, most recently
// active conversations first.
//
// Every accepted conversation that is neither a group nor a server is ranked by
// the sent timestamp of its most recent message (conversations with no message,
// or with an unparseable timestamp, rank as 0 and therefore last). PeerWithOnion
// is then called exactly once per conversation, in descending order of that time.
// Status: Ready for 1.5
func (cp *cwtchPeer) StartPeersConnections() {
	conversations, _ := cp.FetchConversations()
	byRecent := make([]*RecentConversation, 0, len(conversations))
	log.Infof("StartPeerConnections")

	for _, conversation := range conversations {
		if !conversation.Accepted || conversation.IsGroup() || conversation.IsServer() {
			continue
		}

		// Do not peer here: connecting while still collecting would bypass the
		// recency ordering and contact every peer a second time after the sort.
		lastMessageTime := 0
		lastMessage, _ := cp.GetMostRecentMessages(conversation.ID, 0, 0, 1)
		if len(lastMessage) != 0 {
			// Named sentAt (not time) so the time package is not shadowed.
			sentAt, err := time.Parse(time.RFC3339Nano, lastMessage[0].Attr[constants.AttrSentTimestamp])
			if err == nil {
				lastMessageTime = int(sentAt.Unix())
			}
		}
		byRecent = append(byRecent, &RecentConversation{model: conversation, lastMessageTime: lastMessageTime})
	}

	// Newest conversations first.
	sort.Slice(byRecent, func(i, j int) bool {
		return byRecent[i].lastMessageTime > byRecent[j].lastMessageTime
	})

	for _, recent := range byRecent {
		log.Infof(" PeerWithOnion(%v)", recent.model.Handle)
		cp.PeerWithOnion(recent.model.Handle)
	}
}
// StartServerConnections attempts to connect to all server connections by
// calling JoinServer for every conversation that is a server. A failed join is
// logged and the remaining servers are still attempted.
// Status: Ready for 1.5
func (cp *cwtchPeer) StartServerConnections() {
	log.Infof("StartServerConections")
	conversations, _ := cp.FetchConversations()
	for _, conversation := range conversations {
		if !conversation.IsServer() {
			continue
		}
		log.Infof(" joinServer(%v)", conversation.Handle)
		if err := cp.JoinServer(conversation.Handle); err != nil {
			// Almost certainly a programming error, so make it visible.
			log.Errorf("error joining server %v", err)
		}
	}
}

View File

@ -27,6 +27,8 @@ import (
"golang.org/x/crypto/ed25519"
)
// TorMaxPendingConns is the number of connection attempts assumed to be pending at once;
// the value is taken from Tor's MaxClientCircuitsPending setting referenced below.
const TorMaxPendingConns = 32 // tor/src/app/config/config.c MaxClientCircuitsPending
type connectionLockedService struct {
service tapir.Service
connectingLock sync.Mutex
@ -138,6 +140,7 @@ func (e *engine) EventManager() event.Manager {
// eventHandler process events from other subsystems
func (e *engine) eventHandler() {
log.Infof("engine.EventHandler()...")
for {
ev := e.queue.Next()
// optimistic shutdown...
@ -338,11 +341,15 @@ func (e *engine) Shutdown() {
// peerWithOnion is the entry point for cwtchPeer relationships
// needs to be run in a goroutine as will block on Open.
func (e *engine) peerWithOnion(onion string) {
log.Infof("engine.peerWithOnion(%v)", onion)
log.Debugf("Called PeerWithOnion for %v", onion)
if !e.isBlocked(onion) {
e.ignoreOnShutdown(e.peerConnecting)(onion)
log.Infof(" service.Connect(%v)", onion)
connected, err := e.service.Connect(onion, e.createPeerTemplate())
if err != nil {
log.Errorf("peerWithOnion err form Connect: %v\n", err)
}
// If we are already connected...check if we are authed and issue an auth event
// (This allows the ui to be stateless)