forked from cwtch.im/cwtch
106 lines
3.1 KiB
Go
106 lines
3.1 KiB
Go
package connections
|
|
|
|
import (
|
|
"cwtch.im/cwtch/protocol"
|
|
"git.openprivacy.ca/openprivacy/libricochet-go/connectivity"
|
|
"git.openprivacy.ca/openprivacy/libricochet-go/log"
|
|
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// Manager encapsulates all the logic necessary to manage outgoing peer and server connections.
|
|
type Manager struct {
|
|
serverConnections map[string]*PeerServerConnection
|
|
lock sync.Mutex
|
|
breakChannel chan bool
|
|
acn connectivity.ACN
|
|
}
|
|
|
|
// NewConnectionsManager creates a new instance of Manager.
|
|
func NewConnectionsManager(acn connectivity.ACN) *Manager {
|
|
m := new(Manager)
|
|
m.acn = acn
|
|
m.serverConnections = make(map[string]*PeerServerConnection)
|
|
m.breakChannel = make(chan bool)
|
|
return m
|
|
}
|
|
|
|
// ManageServerConnection creates a new ServerConnection for Host with the given callback handler.
|
|
// If there is an establish connection, it is replaced with a new one, assuming this came from
|
|
// a new JoinServer from a new Group being joined. If it is still connecting to a server, the second request will be abandonded
|
|
func (m *Manager) ManageServerConnection(host string, engine Engine, messageHandler func(string, *protocol.GroupMessage)) {
|
|
m.lock.Lock()
|
|
defer m.lock.Unlock()
|
|
|
|
psc, exists := m.serverConnections[host]
|
|
|
|
if exists {
|
|
if psc.GetState() == DISCONNECTED || psc.GetState() == CONNECTING || psc.GetState() == CONNECTED {
|
|
log.Infof("Already connecting to %v, abandoning fresh attempt\n", host)
|
|
return
|
|
}
|
|
}
|
|
|
|
newPsc := NewPeerServerConnection(engine, host)
|
|
newPsc.GroupMessageHandler = messageHandler
|
|
go newPsc.Run()
|
|
m.serverConnections[host] = newPsc
|
|
|
|
if exists {
|
|
log.Infof("Closing connection to %v, replacing with this one\n", host)
|
|
psc.Close()
|
|
}
|
|
}
|
|
|
|
// SetServerSynced is a helper for peerserver connections and engine to call when a Fetch is done to set the state of the connection to SYNCED
|
|
func (m *Manager) SetServerSynced(onion string) {
|
|
m.serverConnections[onion].setState(SYNCED)
|
|
}
|
|
|
|
// GetPeerServerConnectionForOnion safely returns a given host connection
|
|
func (m *Manager) GetPeerServerConnectionForOnion(host string) (psc *PeerServerConnection) {
|
|
m.lock.Lock()
|
|
psc = m.serverConnections[host]
|
|
m.lock.Unlock()
|
|
return
|
|
}
|
|
|
|
// AttemptReconnections repeatedly attempts to reconnect with failed peers and servers.
|
|
func (m *Manager) AttemptReconnections() {
|
|
maxTimeout := time.Minute * 5
|
|
// nearly instant first run, next few runs will prolly be too quick to have any FAILED and will gracefully slow to MAX after that
|
|
timeout := time.Millisecond * 500
|
|
for {
|
|
select {
|
|
case <-time.After(timeout):
|
|
m.lock.Lock()
|
|
for _, psc := range m.serverConnections {
|
|
if psc.GetState() == FAILED {
|
|
go psc.Run()
|
|
}
|
|
}
|
|
m.lock.Unlock()
|
|
|
|
if timeout < maxTimeout {
|
|
timeout = timeout * 2
|
|
} else {
|
|
timeout = maxTimeout
|
|
}
|
|
case <-m.breakChannel:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// Shutdown closes all connections under management (freeing their goroutines)
|
|
func (m *Manager) Shutdown() {
|
|
m.breakChannel <- true
|
|
m.lock.Lock()
|
|
for onion, psc := range m.serverConnections {
|
|
psc.Close()
|
|
delete(m.serverConnections, onion)
|
|
}
|
|
m.lock.Unlock()
|
|
}
|