add to app ActivatePeerEngine; add to peer StartConnections; order connection attempts by lastseen (tracked); massive connection retry rework

Dan Ballard 2022-12-02 16:23:11 -08:00
parent 6d8f31773e
commit ad72ce6e7a
12 changed files with 422 additions and 162 deletions
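For orientation before the per-file diffs: the rework replaces the old tick/backoff counters with a per-contact failedCount and a throughput-derived base timeout. The following is a minimal standalone sketch of that timing; the constant values (120s circuit timeout, 5 minute cap, failure cap of 6) are taken from the plugin changes below, but the function and its parameters are illustrative only, not the committed code.

package main

import (
	"fmt"
	"math"
	"time"
)

// Values mirroring the constants added in the ContactRetry plugin
// (circutTimeoutSecs, MaxBaseTimeoutSec, maxFailedBackoff).
const (
	circuitTimeoutSecs = 120
	maxBaseTimeoutSec  = 5 * 60
	maxFailedBackoff   = 6
)

// retryDelay sketches the backoff requeueReady applies: a base timeout scaled by
// how many connections are being tracked (bounded by the allowed pending circuits),
// clamped to [circuitTimeoutSecs, maxBaseTimeoutSec], then doubled per failure.
func retryDelay(failedCount, trackedConns, maxPendingCircuits int) time.Duration {
	throughputPerMin := maxPendingCircuits / (circuitTimeoutSecs / 60)
	base := trackedConns / throughputPerMin * 60
	if base < circuitTimeoutSecs {
		base = circuitTimeoutSecs
	} else if base > maxBaseTimeoutSec {
		base = maxBaseTimeoutSec
	}
	if failedCount > maxFailedBackoff {
		failedCount = maxFailedBackoff
	}
	return time.Duration(math.Pow(2, float64(failedCount))*float64(base)) * time.Second
}

func main() {
	// 40 tracked connections, 16 circuits allowed pending, 3 failed attempts so far
	fmt.Println(retryDelay(3, 40, 16)) // 2^3 * 300s = 40m0s
}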

View File

@@ -104,6 +104,13 @@ func (ap *application) AddPlugin(peerid string, id plugins.PluginID, bus event.M
 	pluginsinf, _ := ap.plugins.Load(peerid)
 	peerPlugins := pluginsinf.([]plugins.Plugin)
+	for _, plugin := range peerPlugins {
+		if plugin.Id() == id {
+			log.Errorf("trying to add second instance of plugin %v to peer %v", id, peerid)
+			return
+		}
+	}
 	newp, err := plugins.Get(id, bus, acn, peerid)
 	if err == nil {
 		newp.Start()
@@ -136,7 +143,7 @@ func (app *application) CreateTaggedPeer(name string, password string, tag strin
 	if tag != "" {
 		profile.SetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.Tag, tag)
 	}
+	app.AddPeerPlugin(profile.GetOnion(), plugins.CONNECTIONRETRY) // Now Mandatory
 	app.appBus.Publish(event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.GetOnion(), event.Created: event.True}))
 }
@@ -237,6 +244,7 @@ func (app *application) installProfile(profile peer.CwtchPeer) bool {
 		app.eventBuses[profile.GetOnion()] = eventBus
 		profile.Init(app.eventBuses[profile.GetOnion()])
 		app.peers[profile.GetOnion()] = profile
+		app.AddPeerPlugin(profile.GetOnion(), plugins.CONNECTIONRETRY) // Now Mandatory
 		app.appBus.Publish(event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.GetOnion(), event.Created: event.False}))
 		return true
 	}
@@ -246,7 +254,7 @@ func (app *application) installProfile(profile peer.CwtchPeer) bool {
 }
 
 func (app *application) ActivateEngine(doListen, doPeers, doServers bool) {
-	log.Infof("ActivateEngine")
+	log.Debugf("ActivateEngine")
 	for _, profile := range app.peers {
 		app.engines[profile.GetOnion()], _ = profile.GenerateProtocolEngine(app.acn, app.eventBuses[profile.GetOnion()])
@@ -254,20 +262,15 @@ func (app *application) ActivateEngine(doListen, doPeers, doServers bool) {
 	if doListen {
 		for _, profile := range app.peers {
-			log.Infof(" Listen for %v", profile.GetOnion())
+			log.Debugf(" Listen for %v", profile.GetOnion())
 			profile.Listen()
 		}
 	}
-	if doServers {
+	if doPeers || doServers {
 		for _, profile := range app.peers {
-			log.Infof(" StartServerCons for %v", profile.GetOnion())
-			profile.StartServerConnections()
-		}
-	}
-	if doPeers {
-		for _, profile := range app.peers {
-			log.Infof(" StartPeerCons for %v", profile.GetOnion())
-			profile.StartPeersConnections()
+			log.Debugf(" Start Connections for %v doPeers:%v doServers:%v", profile.GetOnion(), doPeers, doServers)
+			profile.StartConnections(doPeers, doServers)
 		}
 	}
 }
@@ -280,12 +283,7 @@ func (app *application) ActivatePeerEngine(onion string, doListen, doPeers, doSe
 		if doListen {
 			profile.Listen()
 		}
-		if doServers {
-			profile.StartServerConnections()
-		}
-		if doPeers {
-			profile.StartPeersConnections()
-		}
+		profile.StartConnections(doPeers, doServers)
 	}
 }

View File

@@ -18,6 +18,10 @@ func (a *antispam) Start() {
 	go a.run()
 }
 
+func (cr *antispam) Id() PluginID {
+	return ANTISPAM
+}
+
 func (a antispam) Shutdown() {
 	a.breakChan <- true
 }

View File

@@ -10,12 +10,13 @@ import (
 	"time"
 )
 
-const tickTimeSec = 300 //120
+const tickTimeSec = 30
 const tickTime = tickTimeSec * time.Second
-const baseMaxBackoffPeer int = 6 // 5min * 6 = 30min
-// Servers don't reach out, we can assume other peers coming online will ideally contact us, servers will not
-const baseMaxBackoffServer int = 3 // 5min * 3 = 15min
-const circutTimeoutMins float64 = 2
+const circutTimeoutSecs int = 120
+const MaxBaseTimeoutSec = 5 * 60 // a max base time out of 5 min
+const maxFailedBackoff = 6       // 2^6 = 64 -> 64 * [2m to 5m] = 2h8m to 5h20m
 
 type connectionType int
@@ -29,29 +30,126 @@ type contact struct {
 	state connections.ConnectionState
 	ctype connectionType
-	ticks   int
-	backoff int
+	lastAttempt time.Time
+	failedCount int
+	lastSeen    time.Time
+	queued      bool
+}
+
+// compare a to b
+// returns -1 if a < b
+//
+//	 0 if a == b
+//	+1 if a > b
+//
+// algo: sort by failedCount first favouring less attempts, then sort by lastSeen time favouring more recent connections
+func (a *contact) compare(b *contact) int {
+	if a.failedCount < b.failedCount {
+		return -1
+	} else if a.failedCount > b.failedCount {
+		return +1
+	}
+	if a.lastSeen.After(b.lastSeen) {
+		return -1
+	} else if a.lastSeen.Before(b.lastSeen) {
+		return +1
+	}
+	return 0
+}
+
+type connectionQueue struct {
+	queue []*contact
+	lock  sync.Mutex
+}
+
+func newConnectionQueue() *connectionQueue {
+	return &connectionQueue{queue: []*contact{}}
+}
+
+func (cq *connectionQueue) insert(c *contact) {
+	cq.lock.Lock()
+	defer cq.lock.Unlock()
+	// find loc
+	i := 0
+	var b *contact
+	for i, b = range cq.queue {
+		if c.compare(b) >= 0 {
+			break
+		}
+	}
+	// insert
+	if len(cq.queue) == i { // nil or empty slice or after last element
+		cq.queue = append(cq.queue, c)
+	} else {
+		cq.queue = append(cq.queue[:i+1], cq.queue[i:]...) // index < len(a)
+		cq.queue[i] = c
+	}
+	c.queued = true
+}
+
+func (cq *connectionQueue) dequeue() *contact {
+	cq.lock.Lock()
+	defer cq.lock.Unlock()
+	if len(cq.queue) == 0 {
+		return nil
+	}
+	c := cq.queue[0]
+	cq.queue = cq.queue[1:]
+	c.queued = false
+	return c
 }
 
 type contactRetry struct {
 	bus   event.Manager
 	queue event.Queue
 	networkUp bool
-	running   bool
-	breakChan chan bool
-	onion     string
-	lastCheck time.Time
-	connectingCount int64
-	connections     sync.Map //[string]*contact
+	networkUpTime time.Time
+	running       bool
+	breakChan     chan bool
+	onion         string
+	lastCheck     time.Time
+	connections   sync.Map //[string]*contact
+	connCount     int64
+	pendingQueue  *connectionQueue
 }
 
-// NewConnectionRetry returns a Plugin that when started will retry connecting to contacts with a backoff timing
+// NewConnectionRetry returns a Plugin that when started will retry connecting to contacts with a failedCount timing
 func NewConnectionRetry(bus event.Manager, onion string) Plugin {
-	cr := &contactRetry{bus: bus, queue: event.NewQueue(), breakChan: make(chan bool, 1), connections: sync.Map{}, networkUp: false, onion: onion, connectingCount: 0}
+	cr := &contactRetry{bus: bus, queue: event.NewQueue(), breakChan: make(chan bool, 1), connections: sync.Map{}, connCount: 0, networkUp: false, networkUpTime: time.Now(), onion: onion, pendingQueue: newConnectionQueue()}
 	return cr
 }
 
+// maxTorCircuitsPending a function to throttle access to tor network during start up
+func (cr *contactRetry) maxTorCircuitsPending() int {
+	timeSinceStart := time.Now().Sub(cr.networkUpTime)
+	if timeSinceStart < 30*time.Second {
+		return 4
+	} else if timeSinceStart < 4*time.Minute {
+		return 8
+	} else if timeSinceStart < 8*time.Minute {
+		return 16
+	}
+	return connections.TorMaxPendingConns
+}
+
+func (cr *contactRetry) connectingCount() int {
+	connecting := 0
+	cr.connections.Range(func(k, v interface{}) bool {
+		conn := v.(*contact)
+		if conn.state == connections.CONNECTING {
+			connecting++
+		}
+		return true
+	})
+	return connecting
+}
+
 func (cr *contactRetry) Start() {
 	if !cr.running {
 		go cr.run()
@@ -60,17 +158,34 @@ func (cr *contactRetry) Start() {
 	}
 }
 
+func (cr *contactRetry) Id() PluginID {
+	return CONNECTIONRETRY
+}
+
 func (cr *contactRetry) run() {
 	cr.running = true
 	cr.bus.Subscribe(event.PeerStateChange, cr.queue)
 	cr.bus.Subscribe(event.ACNStatus, cr.queue)
 	cr.bus.Subscribe(event.ServerStateChange, cr.queue)
+	cr.bus.Subscribe(event.PeerRequest, cr.queue)
+	cr.bus.Subscribe(event.QueuePeerRequest, cr.queue)
+	cr.bus.Subscribe(event.QueueJoinServer, cr.queue)
 	for {
-		if time.Since(cr.lastCheck) > tickTime {
-			cr.retryDisconnected()
-			cr.lastCheck = time.Now()
+		cr.requeueReady()
+		connectingCount := cr.connectingCount()
+		connCount := atomic.LoadInt64(&cr.connCount)
+		log.Debugf("checking queue (len: %v) of total conns watched: %v, with current connecingCount: %v", len(cr.pendingQueue.queue), connCount, connectingCount)
+		for connectingCount < cr.maxTorCircuitsPending() && len(cr.pendingQueue.queue) > 0 {
+			contact := cr.pendingQueue.dequeue()
+			// could have received incoming connection while in queue, make sure still disconnected before trying
+			if contact.state == connections.DISCONNECTED {
+				cr.publishConnectionRequest(contact)
+				connectingCount++
+			}
 		}
+		cr.lastCheck = time.Now()
 		select {
 		case e := <-cr.queue.OutChan():
 			switch e.EventType {
@@ -84,20 +199,38 @@ func (cr *contactRetry) run() {
 				server := e.Data[event.GroupServer]
 				cr.handleEvent(server, state, serverConn)
 
+			case event.QueueJoinServer:
+				fallthrough
+			case event.QueuePeerRequest:
+				lastSeen, err := time.Parse(e.Data[event.LastSeen], time.RFC3339Nano)
+				if err != nil {
+					lastSeen = event.CwtchEpoch
+				}
+				id := ""
+				if peer, exists := e.Data[event.RemotePeer]; exists {
+					id = peer
+					cr.addConnection(peer, connections.DISCONNECTED, peerConn, lastSeen)
+				} else if server, exists := e.Data[event.GroupServer]; exists {
+					id = server
+					cr.addConnection(server, connections.DISCONNECTED, serverConn, lastSeen)
+				}
+				if c, ok := cr.connections.Load(id); ok {
+					contact := c.(*contact)
+					if contact.state == connections.DISCONNECTED && !contact.queued {
+						cr.pendingQueue.insert(contact)
+					}
+				}
+
 			case event.ACNStatus:
 				prog := e.Data[event.Progress]
 				if prog == "100" && !cr.networkUp {
 					cr.networkUp = true
+					cr.networkUpTime = time.Now()
 					cr.connections.Range(func(k, v interface{}) bool {
 						p := v.(*contact)
-						p.ticks = 0
-						p.backoff = 1
-						if p.ctype == peerConn {
-							cr.bus.Publish(event.NewEvent(event.RetryPeerRequest, map[event.Field]string{event.RemotePeer: p.id}))
-						}
-						if p.ctype == serverConn {
-							cr.bus.Publish(event.NewEvent(event.RetryServerRequest, map[event.Field]string{event.GroupServer: p.id}))
-						}
+						p.failedCount = 0
 						return true
 					})
 				} else if prog != "100" {
@@ -115,47 +248,68 @@ func (cr *contactRetry) run() {
 	}
 }
 
-func calcPendingMultiplier(connectingCount int) int {
-	throughPutPerMin := (int)(math.Floor(connections.TorMaxPendingConns / circutTimeoutMins))
-	minsToClear := connectingCount / throughPutPerMin
-	baseMaxServerTime := baseMaxBackoffServer * (tickTimeSec / 60) // the lower of the two queues * the tick time in min
-	multiplier := minsToClear / baseMaxServerTime
-	if multiplier < 1 {
-		return 1
-	}
-	return multiplier
+func ticksToSec(ticks int) int {
+	return ticks * tickTimeSec
 }
 
-func (cr *contactRetry) retryDisconnected() {
-	var retryCount int64 = 0
+func (cr *contactRetry) requeueReady() {
+	if !cr.networkUp {
+		return
+	}
+	retryable := []*contact{}
+	count := atomic.LoadInt64(&cr.connCount)
+	throughPutPerMin := cr.maxTorCircuitsPending() / (circutTimeoutSecs / 60)
+	adjustedBaseTimeout := int(count) / throughPutPerMin * 60
+	if adjustedBaseTimeout < circutTimeoutSecs {
+		adjustedBaseTimeout = circutTimeoutSecs
+	} else if adjustedBaseTimeout > MaxBaseTimeoutSec {
+		adjustedBaseTimeout = MaxBaseTimeoutSec
+	}
 	cr.connections.Range(func(k, v interface{}) bool {
 		p := v.(*contact)
-		pendingMultiplier := calcPendingMultiplier((int)(atomic.LoadInt64(&cr.connectingCount) + retryCount))
-		if p.state == connections.DISCONNECTED {
-			p.ticks++
-			log.Infof("Retrying on disconnected connection, with pendingmult: %v", pendingMultiplier)
-			if p.ticks >= (p.backoff * pendingMultiplier) {
-				retryCount++
-				p.ticks = 0
-				if cr.networkUp {
-					if p.ctype == peerConn {
-						go func(id string) {
-							cr.bus.Publish(event.NewEvent(event.RetryPeerRequest, map[event.Field]string{event.RemotePeer: id}))
-						}(p.id)
-					}
-					if p.ctype == serverConn {
-						go func(id string) {
-							cr.bus.Publish(event.NewEvent(event.RetryServerRequest, map[event.Field]string{event.GroupServer: p.id}))
-						}(p.id)
-					}
-				}
+		if p.state == connections.DISCONNECTED && !p.queued {
+			timeout := time.Duration((math.Pow(2, float64(p.failedCount)))*float64(adjustedBaseTimeout /*baseTimeoutSec*/)) * time.Second
+			if time.Now().Sub(p.lastAttempt) > timeout {
+				retryable = append(retryable, p)
 			}
 		}
 		return true
 	})
+	for _, contact := range retryable {
+		cr.pendingQueue.insert(contact)
+	}
+}
+
+func (cr *contactRetry) publishConnectionRequest(contact *contact) {
+	if contact.ctype == peerConn {
+		cr.bus.Publish(event.NewEvent(event.RetryPeerRequest, map[event.Field]string{event.RemotePeer: contact.id}))
+	}
+	if contact.ctype == serverConn {
+		cr.bus.Publish(event.NewEvent(event.RetryServerRequest, map[event.Field]string{event.GroupServer: contact.id}))
+	}
+	contact.state = connections.CONNECTING // Hacky but needed so we don't over flood waiting for PeerStateChange from engine
+	contact.lastAttempt = time.Now()
+}
+
+func (cr *contactRetry) addConnection(id string, state connections.ConnectionState, ctype connectionType, lastSeen time.Time) {
+	// don't handle contact retries for ourselves
+	if id == cr.onion {
+		return
+	}
+	if _, exists := cr.connections.Load(id); !exists {
+		p := &contact{id: id, state: state, failedCount: 0, lastAttempt: event.CwtchEpoch, ctype: ctype, lastSeen: lastSeen, queued: false}
+		cr.connections.Store(id, p)
+		atomic.AddInt64(&cr.connCount, 1)
+		return
+	}
 }
 
 func (cr *contactRetry) handleEvent(id string, state connections.ConnectionState, ctype connectionType) {
+	log.Debugf("cr.handleEvent state to %v on id %v", connections.ConnectionStateName[state], id)
+
 	// don't handle contact retries for ourselves
 	if id == cr.onion {
@@ -163,41 +317,34 @@ func (cr *contactRetry) handleEvent(id string, state connections.ConnectionState
 	}
 
 	if _, exists := cr.connections.Load(id); !exists {
-		p := &contact{id: id, state: state, backoff: 0, ticks: 0, ctype: ctype}
-		cr.connections.Store(id, p)
-		cr.manageStateChange(state, connections.DISCONNECTED)
+		cr.addConnection(id, state, ctype, event.CwtchEpoch)
 		return
 	}
 
 	pinf, _ := cr.connections.Load(id)
 	p := pinf.(*contact)
-	cr.manageStateChange(state, p.state)
+	log.Infof(" managing state change for %v %v to %v by self %v", id, connections.ConnectionStateName[p.state], connections.ConnectionStateName[state], cr.onion)
 	if state == connections.DISCONNECTED || state == connections.FAILED || state == connections.KILLED {
-		p.state = connections.DISCONNECTED
-		if p.backoff == 0 {
-			p.backoff = 1
-		} else if p.backoff < baseMaxBackoffPeer {
-			p.backoff *= 2
+		if p.state == connections.SYNCED || p.state == connections.AUTHENTICATED {
+			p.lastSeen = time.Now()
+		} else {
+			p.failedCount += 1
+		}
+		p.state = connections.DISCONNECTED
+		p.lastAttempt = time.Now()
+		if p.failedCount > maxFailedBackoff {
+			p.failedCount = maxFailedBackoff
 		}
-		p.ticks = 0
 	} else if state == connections.CONNECTING || state == connections.CONNECTED {
 		p.state = state
-	} else if state == connections.AUTHENTICATED {
+	} else if state == connections.AUTHENTICATED || state == connections.SYNCED {
 		p.state = state
-		p.backoff = 0
-	}
-}
-
-func (cr *contactRetry) manageStateChange(state, prevState connections.ConnectionState) {
-	if state == connections.CONNECTING {
-		atomic.AddInt64(&cr.connectingCount, 1)
-		log.Infof("New connecting, connectingCount: %v", atomic.LoadInt64(&cr.connectingCount))
-	} else if prevState == connections.CONNECTING {
-		atomic.AddInt64(&cr.connectingCount, -1)
-		log.Infof("Failed or Connected, connectingCount: %v", atomic.LoadInt64(&cr.connectingCount))
+		p.lastSeen = time.Now()
+		p.failedCount = 0
 	}
 }
 
 func (cr *contactRetry) Shutdown() {
 	cr.breakChan <- true
+	cr.queue.Shutdown()
 }
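The commit title's "order connection attempts by lastseen" lives in contact.compare above: fewer failed attempts win, then the more recently seen contact. A small self-contained sketch of that ordering rule follows; the candidate type and the sort.Slice call are stand-ins for the plugin's pendingQueue, not its actual code.

package main

import (
	"fmt"
	"sort"
	"time"
)

// candidate stands in for the plugin's contact type; only the two fields the
// ordering uses are kept.
type candidate struct {
	id          string
	failedCount int
	lastSeen    time.Time
}

func main() {
	now := time.Now()
	pending := []candidate{
		{"old-friend", 0, now.Add(-24 * time.Hour)},
		{"flaky-server", 3, now.Add(-10 * time.Minute)},
		{"recent-peer", 0, now.Add(-5 * time.Minute)},
	}
	// Same rule as contact.compare: fewer failures first, then most recently seen.
	sort.Slice(pending, func(i, j int) bool {
		if pending[i].failedCount != pending[j].failedCount {
			return pending[i].failedCount < pending[j].failedCount
		}
		return pending[i].lastSeen.After(pending[j].lastSeen)
	})
	for _, c := range pending {
		fmt.Println(c.id)
	}
	// prints: recent-peer, old-friend, flaky-server
}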

View File

@@ -40,6 +40,10 @@ func (nc *networkCheck) Start() {
 	go nc.run()
 }
 
+func (cr *networkCheck) Id() PluginID {
+	return NETWORKCHECK
+}
+
 func (nc *networkCheck) run() {
 	nc.running = true
 	nc.offline = true

View File

@@ -20,6 +20,7 @@ const (
 type Plugin interface {
 	Start()
 	Shutdown()
+	Id() PluginID
 }
 
 // Get is a plugin factory for the requested plugin
View File

@@ -1,5 +1,9 @@
 package event
 
+import "time"
+
+var CwtchEpoch = time.Date(2020, 6, 1, 0, 0, 0, 0, time.UTC)
+
 // Type captures the definition of many common Cwtch application events
 type Type string
@@ -13,6 +17,14 @@ const (
 	// RemotePeer: [eg "chpr7qm6op5vfcg2pi4vllco3h6aa7exexc4rqwnlupqhoogx2zgd6qd"
 	PeerRequest = Type("PeerRequest")
 
+	// QueuePeerRequest
+	// When peer has too many peers to try and wants to ease off Tor throttling, use this to notify ContactRetry plugin to schedule a peer for later try
+	// LastSeen: last seen time of the contact
+	// And one of
+	//   RemotePeer
+	//   GroupServer
+	QueuePeerRequest = Type("QueuePeerRequest")
+
 	// RetryPeerRequest
 	// Identical to PeerRequest, but allows Engine to make decisions regarding blocked peers
 	// attributes:
@@ -35,7 +47,8 @@ const (
 	AllowUnknownPeers = Type("AllowUnknownPeers")
 
 	// GroupServer
-	JoinServer = Type("JoinServer")
+	QueueJoinServer = Type("QueueJoinServer")
+	JoinServer      = Type("JoinServer")
 
 	// attributes GroupServer - the onion of the server to leave
 	LeaveServer = Type("LeaveServer")
@@ -217,6 +230,7 @@ const (
 	Onion        = Field("Onion")
 	RemotePeer   = Field("RemotePeer")
+	LastSeen     = Field("LastSeen")
 	Ciphertext   = Field("Ciphertext")
 	Signature    = Field("Signature")
 	CachedTokens = Field("CachedTokens")
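A sketch of how a caller hands a connection attempt to the ContactRetry plugin with these new event types and fields; it mirrors QueuePeeringWithOnion further down in this commit, and assumes the usual cwtch.im/cwtch/event import path.

package example

import (
	"time"

	"cwtch.im/cwtch/event"
)

// queuePeer publishes a QueuePeerRequest instead of a direct PeerRequest so the
// ContactRetry plugin can schedule the attempt, most recently seen contacts first.
func queuePeer(bus event.Manager, onion string, lastSeen time.Time) {
	if lastSeen.IsZero() {
		lastSeen = event.CwtchEpoch // nothing known about this contact yet
	}
	bus.Publish(event.NewEvent(event.QueuePeerRequest, map[event.Field]string{
		event.RemotePeer: onion,
		event.LastSeen:   lastSeen.Format(time.RFC3339Nano),
	}))
}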

View File

@@ -54,3 +54,5 @@ const CustomProfileImageKey = "custom-profile-image"
 const SyncPreLastMessageTime = "SyncPreLastMessageTime"
 const SyncMostRecentMessageTime = "SyncMostRecentMessageTime"
+
+const AttrLastConnectionTime = "last-connection-time"

View File

@@ -809,12 +809,34 @@ func (cp *cwtchPeer) GetPeerState(handle string) connections.ConnectionState {
 // PeerWithOnion initiates a request to the Protocol Engine to set up Cwtch Session with a given tor v3 onion
 // address.
 func (cp *cwtchPeer) PeerWithOnion(onion string) {
-	log.Infof("PeerWithOnion go")
-	//go func() {
-	//	time.Sleep(10 * time.Second)
-	log.Infof(" peerWithOnion wake, Pub")
-	cp.eventBus.Publish(event.NewEvent(event.PeerRequest, map[event.Field]string{event.RemotePeer: onion}))
-	//}()
+	lastSeen := event.CwtchEpoch
+	ci, err := cp.FetchConversationInfo(onion)
+	if err == nil {
+		lastSeen = cp.GetConversationLastSeenTime(ci.ID)
+	}
+	cp.eventBus.Publish(event.NewEvent(event.PeerRequest, map[event.Field]string{event.RemotePeer: onion, event.LastSeen: lastSeen.Format(time.RFC3339Nano)}))
+}
+
+// QueuePeeringWithOnion sends the request to peer with an onion directly to the contact retry queue; this is a mechanism to not flood tor with circuit requests
+// Status: Ready for 1.10
+func (cp *cwtchPeer) QueuePeeringWithOnion(handle string) {
+	lastSeen := event.CwtchEpoch
+	ci, err := cp.FetchConversationInfo(handle)
+	if err == nil {
+		lastSeen = cp.GetConversationLastSeenTime(ci.ID)
+	}
+	cp.eventBus.Publish(event.NewEvent(event.QueuePeerRequest, map[event.Field]string{event.RemotePeer: handle, event.LastSeen: lastSeen.Format(time.RFC3339Nano)}))
+}
+
+// QueueJoinServer sends the request to join a server directly to the contact retry queue; this is a mechanism to not flood tor with circuit requests
+// Status: Ready for 1.10
+func (cp *cwtchPeer) QueueJoinServer(handle string) {
+	lastSeen := event.CwtchEpoch
+	ci, err := cp.FetchConversationInfo(handle)
+	if err == nil {
+		lastSeen = cp.GetConversationLastSeenTime(ci.ID)
+	}
+	cp.eventBus.Publish(event.NewEvent(event.QueueJoinServer, map[event.Field]string{event.GroupServer: handle, event.LastSeen: lastSeen.Format(time.RFC3339Nano)}))
 }
 
 // SendInviteToConversation kicks off the invite process
@@ -948,7 +970,6 @@ func (cp *cwtchPeer) JoinServer(onion string) error {
 		log.Debugf("using cached tokens for %v", ci.Handle)
 	}
 
-	log.Infof("Publish JoinServer Event")
 	cp.eventBus.Publish(event.NewEvent(event.JoinServer, map[event.Field]string{event.GroupServer: onion, event.ServerTokenY: tokenY, event.ServerTokenOnion: tokenOnion, event.Signature: signature, event.CachedTokens: cachedTokensJson}))
 	return nil
 }
@@ -1003,61 +1024,97 @@ func (cp *cwtchPeer) Listen() {
 }
 
-type RecentConversation struct {
+type LastSeenConversation struct {
 	model *model.Conversation
-	lastMessageTime int
+	lastSeen time.Time
+}
+
+func (cp *cwtchPeer) GetConversationLastSeenTime(conversationId int) time.Time {
+	timestamp, err := cp.GetConversationAttribute(conversationId, attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.AttrLastConnectionTime)))
+	if err == nil {
+		if time, err := time.Parse(time.RFC3339Nano, timestamp); err == nil {
+			return time
+		}
+	}
+
+	lastMessage, _ := cp.GetMostRecentMessages(conversationId, 0, 0, 1)
+	if len(lastMessage) != 0 {
+		time, err := time.Parse(time.RFC3339Nano, lastMessage[0].Attr[constants.AttrSentTimestamp])
+		if err == nil {
+			return time
+		}
+	}
+
+	// Cwtch launch date
+	return event.CwtchEpoch
+}
+
+func (cp *cwtchPeer) getConnectionsSortedByLastSeen(doPeers, doServers bool) []*LastSeenConversation {
+	conversations, _ := cp.FetchConversations()
+	byRecent := []*LastSeenConversation{}
+	for _, conversation := range conversations {
+		if conversation.Accepted && !conversation.IsGroup() {
+			if conversation.IsServer() {
+				if !doServers {
+					continue
+				}
+			} else {
+				if !doPeers {
+					continue
+				}
+			}
+			byRecent = append(byRecent, &LastSeenConversation{conversation, cp.GetConversationLastSeenTime(conversation.ID)})
+		}
+	}
+	sort.Slice(byRecent, func(i, j int) bool {
+		return byRecent[i].lastSeen.After(byRecent[j].lastSeen)
+	})
+	return byRecent
+}
+
+func (cp *cwtchPeer) StartConnections(doPeers, doServers bool) {
+	byRecent := cp.getConnectionsSortedByLastSeen(doPeers, doServers)
+	log.Infof("StartConnections for %v", cp.GetOnion())
+	for _, conversation := range byRecent {
+		if conversation.model.IsServer() {
+			log.Infof(" QueueJoinServer(%v)", conversation.model.Handle)
+			cp.QueueJoinServer(conversation.model.Handle)
+		} else {
+			log.Infof(" QueuePeerWithOnion(%v)", conversation.model.Handle)
+			cp.QueuePeeringWithOnion(conversation.model.Handle)
+		}
+		time.Sleep(50 * time.Millisecond)
+	}
 }
 
 // StartPeersConnections attempts to connect to peer connections
 // Status: Ready for 1.5
+// Deprecated: for 1.10 use StartConnections
 func (cp *cwtchPeer) StartPeersConnections() {
-	conversations, _ := cp.FetchConversations()
-	byRecent := []*RecentConversation{}
 	log.Infof("StartPeerConnections")
-	for _, conversation := range conversations {
-		if conversation.Accepted && !conversation.IsGroup() && !conversation.IsServer() {
-			lastMessageTime := 0
-			lastMessage, _ := cp.GetMostRecentMessages(conversation.ID, 0, 0, 1)
-			if len(lastMessage) != 0 {
-				time, err := time.Parse(time.RFC3339Nano, lastMessage[0].Attr[constants.AttrSentTimestamp])
-				if err == nil {
-					lastMessageTime = int(time.Unix())
-				}
-			}
-			byRecent = append(byRecent, &RecentConversation{conversation, lastMessageTime})
-		}
-	}
-	sort.Slice(byRecent, func(i, j int) bool {
-		return byRecent[i].lastMessageTime > byRecent[j].lastMessageTime
-	})
+	byRecent := cp.getConnectionsSortedByLastSeen(true, false)
 	for _, conversation := range byRecent {
 		log.Infof(" PeerWithOnion(%v)", conversation.model.Handle)
-		//go func(contact string) {
-		cp.PeerWithOnion(conversation.model.Handle)
-		//}(conversation.model.Handle)
+		cp.QueuePeeringWithOnion(conversation.model.Handle)
 	}
 }
 
 // StartServerConnections attempts to connect to all server connections
 // Status: Ready for 1.5
+// Deprecated: for 1.10 use StartConnections
 func (cp *cwtchPeer) StartServerConnections() {
 	log.Infof("StartServerConections")
-	conversations, _ := cp.FetchConversations()
-	for _, conversation := range conversations {
-		if conversation.IsServer() {
-			log.Infof(" joinServer(%v)", conversation.Handle)
-			//go func(server string) {
-			//	time.Sleep(5 * time.Second)
-			err := cp.JoinServer(conversation.Handle)
-			if err != nil {
-				// Almost certainly a programming error so print it..
-				log.Errorf("error joining server %v", err)
-			}
-			//}(conversation.Handle)
+	byRecent := cp.getConnectionsSortedByLastSeen(false, true)
+	for _, conversation := range byRecent {
+		if conversation.model.IsServer() {
+			log.Infof(" QueueJoinServer(%v)", conversation.model.Handle)
+			cp.QueueJoinServer(conversation.model.Handle)
 		}
 	}
 }
@@ -1342,9 +1399,27 @@ func (cp *cwtchPeer) eventHandler() {
 		case event.PeerStateChange:
 			handle := ev.Data[event.RemotePeer]
 			if connections.ConnectionStateToType()[ev.Data[event.ConnectionState]] == connections.AUTHENTICATED {
-				_, err := cp.FetchConversationInfo(handle)
+				ci, err := cp.FetchConversationInfo(handle)
+				var cid int
 				if err != nil {
-					cp.NewContactConversation(handle, model.DefaultP2PAccessControl(), false)
+					// if it's a newly authenticated connection with no conversation storage, init
+					cid, err = cp.NewContactConversation(handle, model.DefaultP2PAccessControl(), false)
+				} else {
+					cid = ci.ID
+				}
+				timestamp := time.Now().Format(time.RFC3339Nano)
+				cp.SetConversationAttribute(cid, attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.AttrLastConnectionTime)), timestamp)
+			} else if connections.ConnectionStateToType()[ev.Data[event.ConnectionState]] == connections.DISCONNECTED {
+				ci, err := cp.FetchConversationInfo(handle)
+				if err == nil {
+					cp.mutex.Lock()
+					if cp.state[ev.Data[event.RemotePeer]] == connections.AUTHENTICATED {
+						// If peer went offline, set last seen time to now
+						timestamp := time.Now().Format(time.RFC3339Nano)
+						cp.SetConversationAttribute(ci.ID, attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.AttrLastConnectionTime)), timestamp)
+					}
+					cp.mutex.Unlock()
+				}
 			}
 			cp.mutex.Lock()
@@ -1352,6 +1427,7 @@ func (cp *cwtchPeer) eventHandler() {
 			cp.mutex.Unlock()
 		case event.ServerStateChange:
 			cp.mutex.Lock()
+			prevState := cp.state[ev.Data[event.GroupServer]]
 			state := connections.ConnectionStateToType()[ev.Data[event.ConnectionState]]
 			cp.state[ev.Data[event.GroupServer]] = state
 			cp.mutex.Unlock()
@@ -1359,7 +1435,7 @@ func (cp *cwtchPeer) eventHandler() {
 			// If starting to sync, determine last message from known groups on server so we can calculate sync progress
 			if state == connections.AUTHENTICATED {
 				conversations, err := cp.FetchConversations()
-				mostRecentTime := time.Date(2020, 6, 1, 0, 0, 0, 0, time.UTC)
+				mostRecentTime := event.CwtchEpoch
 				if err == nil {
 					for _, conversationInfo := range conversations {
 						if server, exists := conversationInfo.GetAttribute(attr.LocalScope, attr.LegacyGroupZone, constants.GroupServer); exists && server == ev.Data[event.GroupServer] {
@@ -1381,6 +1457,15 @@ func (cp *cwtchPeer) eventHandler() {
 				if err == nil {
 					cp.SetConversationAttribute(serverInfo.ID, attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.SyncPreLastMessageTime)), mostRecentTime.Format(time.RFC3339Nano))
 					cp.SetConversationAttribute(serverInfo.ID, attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.SyncMostRecentMessageTime)), mostRecentTime.Format(time.RFC3339Nano))
+					if connections.ConnectionStateToType()[ev.Data[event.ConnectionState]] == connections.AUTHENTICATED {
+						timestamp := time.Now().Format(time.RFC3339Nano)
+						cp.SetConversationAttribute(serverInfo.ID, attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.AttrLastConnectionTime)), timestamp)
+					} else if connections.ConnectionStateToType()[ev.Data[event.ConnectionState]] == connections.DISCONNECTED && prevState == connections.AUTHENTICATED {
+						// If peer went offline, set last seen time to now
+						timestamp := time.Now().Format(time.RFC3339Nano)
+						cp.SetConversationAttribute(serverInfo.ID, attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.AttrLastConnectionTime)), timestamp)
+					}
 				}
 			}
 		case event.TriggerAntispamCheck:

View File

@@ -64,8 +64,12 @@ type CwtchPeer interface {
 	AutoHandleEvents(events []event.Type)
 	Listen()
+	StartConnections(doPeers, doServers bool)
+	// Deprecated in 1.10
 	StartPeersConnections()
+	// Deprecated in 1.10
 	StartServerConnections()
 	Shutdown()
 
 	// GetOnion is deprecated. If you find yourself needing to rely on this method it is time
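Callers that previously paired StartPeersConnections with StartServerConnections can collapse to the single new call. A minimal sketch, assuming the usual cwtch.im/cwtch/peer import path; this is an illustration of the interface change, not code from the commit.

package example

import "cwtch.im/cwtch/peer"

// bringOnline sketches the 1.10 startup path for one profile: listen for inbound
// connections, then queue outbound peer and server attempts (ordered by last seen)
// rather than dialing them all immediately.
func bringOnline(profile peer.CwtchPeer) {
	profile.Listen()
	profile.StartConnections(true, true) // doPeers, doServers
}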

View File

@@ -27,7 +27,12 @@ import (
 	"golang.org/x/crypto/ed25519"
 )
 
-const TorMaxPendingConns = 32 // tor/src/app/config/config.c MaxClientCircuitsPending
+// 32 from tor/src/app/config/config.c MaxClientCircuitsPending
+// we lower a bit because there's a lot of spillage
+// - just cus we get a SOCKS timeout doesn't mean tor has stopped trying as a huge sorce
+// - potential multiple profiles as a huge source
+// - second order connections like token service's second servers aren't tracked in our system adding a few extra periodically
+const TorMaxPendingConns = 28
 
 type connectionLockedService struct {
 	service tapir.Service
@@ -140,7 +145,6 @@ func (e *engine) EventManager() event.Manager {
 // eventHandler process events from other subsystems
 func (e *engine) eventHandler() {
-	log.Infof("engine.EventHandler()...")
 	for {
 		ev := e.queue.Next()
 		// optimistic shutdown...
@@ -341,15 +345,10 @@ func (e *engine) Shutdown() {
 // peerWithOnion is the entry point for cwtchPeer relationships
 // needs to be run in a goroutine as will block on Open.
 func (e *engine) peerWithOnion(onion string) {
-	log.Infof("engine.peerWithOnion(%v)", onion)
 	log.Debugf("Called PeerWithOnion for %v", onion)
 	if !e.isBlocked(onion) {
 		e.ignoreOnShutdown(e.peerConnecting)(onion)
-		log.Infof(" service.Connect(%v)", onion)
 		connected, err := e.service.Connect(onion, e.createPeerTemplate())
-		if err != nil {
-			log.Errorf("peerWithOnion err form Connect: %v\n", err)
-		}
 		// If we are already connected...check if we are authed and issue an auth event
 		// (This allows the ui to be stateless)

View File

@@ -38,7 +38,7 @@ func waitForConnection(t *testing.T, peer peer.CwtchPeer, addr string, target co
 	for {
 		log.Infof("%v checking connection...\n", peerName)
 		state := peer.GetPeerState(addr)
-		log.Infof("Waiting for Peer %v to %v - state: %v\n", peerName, addr, state)
+		log.Infof("Waiting for Peer %v to %v - state: %v\n", peerName, addr, connections.ConnectionStateName[state])
 		if state == connections.FAILED {
 			t.Fatalf("%v could not connect to %v", peer.GetOnion(), addr)
 		}
@@ -118,6 +118,7 @@ func TestCwtchPeerIntegration(t *testing.T) {
 	if err != nil {
 		t.Fatalf("Could not start Tor: %v", err)
 	}
+	log.Infof("Waiting for tor to bootstrap...")
 	acn.WaitTillBootstrapped()
 	defer acn.Close()
@@ -149,25 +150,25 @@ func TestCwtchPeerIntegration(t *testing.T) {
 	app.CreateTaggedPeer("Carol", "asdfasdf", "test")
 
 	alice := app2.WaitGetPeer(app, "Alice")
-	app.ActivePeerEngine(alice.GetOnion(), true, true, true)
+	app.ActivatePeerEngine(alice.GetOnion(), true, true, true)
 	log.Infoln("Alice created:", alice.GetOnion())
 	alice.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Alice")
 	alice.AutoHandleEvents([]event.Type{event.PeerStateChange, event.ServerStateChange, event.NewGroupInvite, event.NewRetValMessageFromPeer})
 
 	bob := app2.WaitGetPeer(app, "Bob")
-	app.ActivePeerEngine(bob.GetOnion(), true, true, true)
+	app.ActivatePeerEngine(bob.GetOnion(), true, true, true)
 	log.Infoln("Bob created:", bob.GetOnion())
 	bob.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Bob")
 	bob.AutoHandleEvents([]event.Type{event.PeerStateChange, event.ServerStateChange, event.NewGroupInvite, event.NewRetValMessageFromPeer})
 
 	carol := app2.WaitGetPeer(app, "Carol")
-	app.ActivePeerEngine(carol.GetOnion(), true, true, true)
+	app.ActivatePeerEngine(carol.GetOnion(), true, true, true)
 	log.Infoln("Carol created:", carol.GetOnion())
 	carol.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Carol")
 	carol.AutoHandleEvents([]event.Type{event.PeerStateChange, event.ServerStateChange, event.NewGroupInvite, event.NewRetValMessageFromPeer})
 
 	waitTime := time.Duration(60) * time.Second
-	log.Infof("** Waiting for Alice, Bob, and Carol to connect with onion network... (%v)\n", waitTime)
+	log.Infof("** Waiting for Alice, Bob, and Carol to register their onion hidden service on the network... (%v)\n", waitTime)
 	time.Sleep(waitTime)
 	numGoRoutinesPostPeerStart := runtime.NumGoroutine()
 	log.Infof("** Wait Done!")
@@ -394,7 +395,7 @@ func TestCwtchPeerIntegration(t *testing.T) {
 	pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
 	log.Infof("numGoRoutinesStart: %v\nnumGoRoutinesPostAppStart: %v\nnumGoRoutinesPostPeerStart: %v\nnumGoRoutinesPostPeerAndServerConnect: %v\n"+
 		"numGoRoutinesPostAlice: %v\nnumGoRoutinesPostCarolConnect: %v\nnumGoRoutinesPostBob: %v\nnumGoRoutinesPostCarol: %v\nnumGoRoutinesPostAppShutdown: %v",
 		numGoRoutinesStart, numGoRoutinesPostAppStart, numGoRoutinesPostPeerStart, numGoRoutinesPostServerConnect,
 		numGoRoutinesPostAlice, numGoRoutinesPostCarolConnect, numGoRoutinesPostBob, numGoRoutinesPostCarol, numGoRoutinesPostAppShutdown)
@@ -418,6 +419,7 @@ func checkSendMessageToGroup(t *testing.T, profile peer.CwtchPeer, id int, messa
 // Utility function for testing that a message in a conversation is as expected
 func checkMessage(t *testing.T, profile peer.CwtchPeer, id int, messageID int, expected string) {
 	message, _, err := profile.GetChannelMessage(id, 0, messageID)
+	log.Debugf(" checking if expected: %v is actual: %v", expected, message)
 	if err != nil {
 		log.Errorf("unexpected message %v expected: %v got error: %v", profile.GetOnion(), expected, err)
 		t.Fatalf("unexpected message %v expected: %v got error: %v\n", profile.GetOnion(), expected, err)

View File

@@ -103,9 +103,9 @@ func TestFileSharing(t *testing.T) {
 	t.Logf("** Waiting for Alice, Bob...")
 	alice := app2.WaitGetPeer(app, "alice")
-	app.ActivePeerEngine(alice.GetOnion(), true, true, true)
+	app.ActivatePeerEngine(alice.GetOnion(), true, true, true)
 	bob := app2.WaitGetPeer(app, "bob")
-	app.ActivePeerEngine(bob.GetOnion(), true, true, true)
+	app.ActivatePeerEngine(bob.GetOnion(), true, true, true)
 	alice.AutoHandleEvents([]event.Type{event.PeerStateChange, event.NewRetValMessageFromPeer})
 	bob.AutoHandleEvents([]event.Type{event.PeerStateChange, event.NewRetValMessageFromPeer, event.ManifestReceived})