forked from cwtch.im/cwtch
1
0
Fork 0
cwtch/peer/connections/connectionsmanager.go

148 lines
3.6 KiB
Go

package connections
import (
"cwtch.im/cwtch/model"
"cwtch.im/cwtch/protocol"
"sync"
"time"
)
// Manager encapsulates all the logic necessary to manage outgoing peer and server connections.
// Both connection maps are guarded by lock; the methods in this file acquire it
// before reading or modifying either map.
type Manager struct {
// peerConnections holds the managed peer connections, keyed by host (onion) string.
peerConnections map[string]*PeerPeerConnection
// serverConnections holds the managed server connections, keyed by host (onion) string.
serverConnections map[string]*PeerServerConnection
// lock serializes access to peerConnections and serverConnections.
lock sync.Mutex
// breakChannel is sent to by Shutdown and received from by AttemptReconnections,
// which returns when a value arrives.
breakChannel chan bool
}
// NewConnectionsManager returns a ready-to-use Manager with empty peer and
// server connection maps and an unbuffered break channel.
func NewConnectionsManager() *Manager {
	return &Manager{
		peerConnections:   make(map[string]*PeerPeerConnection),
		serverConnections: make(map[string]*PeerServerConnection),
		breakChannel:      make(chan bool),
	}
}
// ManagePeerConnection starts managing an outgoing peer connection to host on
// behalf of profile. If a connection for host is already tracked the call does
// nothing; otherwise a new PeerPeerConnection is created, its Run method is
// started in its own goroutine, and it is registered under host.
func (m *Manager) ManagePeerConnection(host string, profile *model.Profile) {
	m.lock.Lock()
	defer m.lock.Unlock()

	if _, tracked := m.peerConnections[host]; tracked {
		return
	}

	conn := NewPeerPeerConnection(host, profile)
	go conn.Run()
	m.peerConnections[host] = conn
}
// ManageServerConnection starts managing an outgoing server connection to host,
// delivering group messages to handler. If a connection for host is already
// tracked the call does nothing (the existing connection keeps its handler).
//
// handler is stored on the connection as GroupMessageHandler; it receives a
// string and the *protocol.GroupMessage.
func (m *Manager) ManageServerConnection(host string, handler func(string, *protocol.GroupMessage)) {
	m.lock.Lock()
	defer m.lock.Unlock()

	if _, exists := m.serverConnections[host]; exists {
		return
	}

	psc := NewPeerServerConnection(host)
	// The handler must be installed before Run is started: the go statement
	// is the synchronization point that makes this write visible to the new
	// goroutine. Assigning afterwards is a data race and lets Run observe a
	// nil handler.
	psc.GroupMessageHandler = handler
	go psc.Run()
	m.serverConnections[host] = psc
}
// GetPeers returns a snapshot mapping each managed peer's host to the
// connection state reported by its GetState method at the time of the call.
func (m *Manager) GetPeers() map[string]ConnectionState {
	states := make(map[string]ConnectionState)

	m.lock.Lock()
	defer m.lock.Unlock()

	for host, conn := range m.peerConnections {
		states[host] = conn.GetState()
	}
	return states
}
// GetServers returns a snapshot mapping each managed server's host to the
// connection state reported by its GetState method at the time of the call.
func (m *Manager) GetServers() map[string]ConnectionState {
	states := make(map[string]ConnectionState)

	m.lock.Lock()
	defer m.lock.Unlock()

	for host, conn := range m.serverConnections {
		states[host] = conn.GetState()
	}
	return states
}
// GetPeerPeerConnectionForOnion looks up the peer connection registered for
// host while holding the manager lock. The result is nil when no connection
// is registered for host.
func (m *Manager) GetPeerPeerConnectionForOnion(host string) (ppc *PeerPeerConnection) {
	m.lock.Lock()
	defer m.lock.Unlock()
	return m.peerConnections[host]
}
// GetPeerServerConnectionForOnion looks up the server connection registered
// for host while holding the manager lock. The result is nil when no
// connection is registered for host.
func (m *Manager) GetPeerServerConnectionForOnion(host string) (psc *PeerServerConnection) {
	m.lock.Lock()
	defer m.lock.Unlock()
	return m.serverConnections[host]
}
// AttemptReconnections repeatedly attempts to reconnect with failed peers and servers.
//
// The first pass runs immediately; subsequent passes run 30 seconds after the
// previous one finishes. On each pass, every peer or server connection whose
// GetState is FAILED has its Run method restarted in a new goroutine.
//
// The call blocks, and returns only when a value is received on the manager's
// break channel (sent by Shutdown), so it is normally run in its own goroutine.
func (m *Manager) AttemptReconnections() {
	const retryInterval = 30 * time.Second

	// A single reusable timer avoids allocating a new one per iteration and,
	// via the deferred Stop, does not leave a pending timer behind on return.
	timer := time.NewTimer(0) // first pass right away
	defer timer.Stop()

	for {
		select {
		case <-timer.C:
			m.lock.Lock()
			for _, ppc := range m.peerConnections {
				if ppc.GetState() == FAILED {
					go ppc.Run()
				}
			}
			for _, psc := range m.serverConnections {
				if psc.GetState() == FAILED {
					go psc.Run()
				}
			}
			m.lock.Unlock()

			// The timer has fired and its channel has been drained above,
			// so Reset is safe here. Launch another pass in 30 seconds.
			timer.Reset(retryInterval)
		case <-m.breakChannel:
			return
		}
	}
}
// ClosePeerConnection closes the peer connection registered for onion and
// removes it from the manager. It does nothing when no such connection exists.
func (m *Manager) ClosePeerConnection(onion string) {
	m.lock.Lock()
	defer m.lock.Unlock()

	conn, found := m.peerConnections[onion]
	if !found {
		return
	}
	conn.Close()
	delete(m.peerConnections, onion)
}
// Shutdown signals the reconnection loop to stop, then closes and removes
// every managed peer and server connection.
//
// The stop signal is sent on an unbuffered channel, so this call blocks until
// a receiver (AttemptReconnections) takes it.
func (m *Manager) Shutdown() {
	m.breakChannel <- true

	m.lock.Lock()
	defer m.lock.Unlock()

	for host, peer := range m.peerConnections {
		peer.Close()
		delete(m.peerConnections, host)
	}
	for host, server := range m.serverConnections {
		server.Close()
		delete(m.serverConnections, host)
	}
}