Listen to ProtocolEngineStopped Event and Handle Restart Listen

Also upgrade connectivity and tapir
This commit is contained in:
Sarah Jamie Lewis 2021-04-13 15:12:12 -07:00
parent bbda5cc777
commit 0ad787d07f
8 changed files with 42 additions and 19 deletions

View File

@ -248,9 +248,9 @@ func (app *application) getACNStatusHandler() func(int, string) {
return func(progress int, status string) {
progStr := strconv.Itoa(progress)
app.peerLock.Lock()
app.appBus.Publish(event.NewEventList(event.ACNStatus, event.Progreess, progStr, event.Status, status))
app.appBus.Publish(event.NewEventList(event.ACNStatus, event.Progress, progStr, event.Status, status))
for _, bus := range app.eventBuses {
bus.Publish(event.NewEventList(event.ACNStatus, event.Progreess, progStr, event.Status, status))
bus.Publish(event.NewEventList(event.ACNStatus, event.Progress, progStr, event.Status, status))
}
app.peerLock.Unlock()
}

View File

@ -155,11 +155,11 @@ func (as *applicationService) loadProfiles(password string) {
func (as *applicationService) getACNStatusHandler() func(int, string) {
return func(progress int, status string) {
progStr := strconv.Itoa(progress)
as.bridge.Write(&event.IPCMessage{Dest: DestApp, Message: event.NewEventList(event.ACNStatus, event.Progreess, progStr, event.Status, status)})
as.bridge.Write(&event.IPCMessage{Dest: DestApp, Message: event.NewEventList(event.ACNStatus, event.Progress, progStr, event.Status, status)})
as.applicationCore.coremutex.Lock()
defer as.applicationCore.coremutex.Unlock()
for _, bus := range as.eventBuses {
bus.Publish(event.NewEventList(event.ACNStatus, event.Progreess, progStr, event.Status, status))
bus.Publish(event.NewEventList(event.ACNStatus, event.Progress, progStr, event.Status, status))
}
}
}

View File

@ -73,7 +73,7 @@ func (cr *contactRetry) run() {
cr.handleEvent(server, state, serverConn)
case event.ACNStatus:
prog := e.Data[event.Progreess]
prog := e.Data[event.Progress]
if prog == "100" && cr.networkUp == false {
cr.networkUp = true
cr.connections.Range(func(k, v interface{}) bool {

View File

@ -61,10 +61,12 @@ func (nc *networkCheck) run() {
// and then we will wait a minute and check the connection for the first time (the onion should be up)
// under normal operating circumstances
case event.ProtocolEngineStartListen:
log.Debugf("initiating connection check for %v", e.Data[event.Onion])
nc.onionsToCheck.Store(e.Data[event.Onion], true)
if time.Now().Sub(lastMessageReceived) > time.Minute {
nc.selfTest()
if _, exists := nc.onionsToCheck.Load(e.Data[event.Onion]); !exists {
log.Debugf("initiating connection check for %v", e.Data[event.Onion])
nc.onionsToCheck.Store(e.Data[event.Onion], true)
if time.Now().Sub(lastMessageReceived) > time.Minute {
nc.selfTest()
}
}
case event.PeerStateChange:
fallthrough

View File

@ -255,7 +255,7 @@ const (
Error = Field("Error")
Progreess = Field("Progress")
Progress = Field("Progress")
Status = Field("Status")
EventID = Field("EventID")
EventContext = Field("EventContext")

4
go.mod
View File

@ -3,8 +3,8 @@ module cwtch.im/cwtch
go 1.14
require (
git.openprivacy.ca/cwtch.im/tapir v0.3.2
git.openprivacy.ca/openprivacy/connectivity v1.4.1
git.openprivacy.ca/cwtch.im/tapir v0.3.4
git.openprivacy.ca/openprivacy/connectivity v1.4.2
git.openprivacy.ca/openprivacy/log v1.0.2
github.com/gtank/ristretto255 v0.1.2
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect

8
go.sum
View File

@ -2,10 +2,18 @@ git.openprivacy.ca/cwtch.im/tapir v0.3.1 h1:+d1dHyPvZ8JmdfFe/oXWJPardzflRIhcdILt
git.openprivacy.ca/cwtch.im/tapir v0.3.1/go.mod h1:q6RMI/TQvRN8SCtRY3GryOawMcB0uG6NjP6M77oSMx8=
git.openprivacy.ca/cwtch.im/tapir v0.3.2 h1:thLWqqY1LkirWFcy9Tg6NgWeYbvo9xBm+s2XVnCIvpY=
git.openprivacy.ca/cwtch.im/tapir v0.3.2/go.mod h1:q6RMI/TQvRN8SCtRY3GryOawMcB0uG6NjP6M77oSMx8=
git.openprivacy.ca/cwtch.im/tapir v0.3.3 h1:Q7F8JijgOMMYSy3IdZl7+r6qkWckEWV1+EY7q6MAkVs=
git.openprivacy.ca/cwtch.im/tapir v0.3.3/go.mod h1:ZMg9Jzh0n3Os2aSF4z+bx/n8WBCJBN7KCQESXperYts=
git.openprivacy.ca/cwtch.im/tapir v0.3.4 h1:g7yZkfz/vWr/t2tFXa/t0Ebr/w665uIKpxpCZ3lIPCo=
git.openprivacy.ca/cwtch.im/tapir v0.3.4/go.mod h1:+Niy2AHhQC351ZTtfhC0uLjViCICyOxCJZsIlGKKNAU=
git.openprivacy.ca/openprivacy/bine v0.0.4 h1:CO7EkGyz+jegZ4ap8g5NWRuDHA/56KKvGySR6OBPW+c=
git.openprivacy.ca/openprivacy/bine v0.0.4/go.mod h1:13ZqhKyqakDsN/ZkQkIGNULsmLyqtXc46XBcnuXm/mU=
git.openprivacy.ca/openprivacy/connectivity v1.4.0 h1:c7AANUCrlA4hIqXxIGDOWMtSe8CpDleD1877PShScbM=
git.openprivacy.ca/openprivacy/connectivity v1.4.0/go.mod h1:bR0Myx9nm2YzWtsThRelkNMV4Pp7sPDa123O1qsAbVo=
git.openprivacy.ca/openprivacy/connectivity v1.4.1 h1:zoM+j7PFj8mQeUCNiDNMe7Uq9dhcJDOhaZcSANfeDL4=
git.openprivacy.ca/openprivacy/connectivity v1.4.1/go.mod h1:bR0Myx9nm2YzWtsThRelkNMV4Pp7sPDa123O1qsAbVo=
git.openprivacy.ca/openprivacy/connectivity v1.4.2 h1:rQFIjWunLlRmXL5Efsv+7+1cA70T6Uza6RCy2PRm9zc=
git.openprivacy.ca/openprivacy/connectivity v1.4.2/go.mod h1:bR0Myx9nm2YzWtsThRelkNMV4Pp7sPDa123O1qsAbVo=
git.openprivacy.ca/openprivacy/log v1.0.1/go.mod h1:gGYK8xHtndRLDymFtmjkG26GaMQNgyhioNS82m812Iw=
git.openprivacy.ca/openprivacy/log v1.0.2 h1:HLP4wsw4ljczFAelYnbObIs821z+jgMPCe8uODPnGQM=
git.openprivacy.ca/openprivacy/log v1.0.2/go.mod h1:gGYK8xHtndRLDymFtmjkG26GaMQNgyhioNS82m812Iw=

View File

@ -19,7 +19,7 @@ import (
var autoHandleableEvents = map[event.Type]bool{event.EncryptedGroupMessage: true, event.PeerStateChange: true,
event.ServerStateChange: true, event.NewGroupInvite: true, event.NewMessageFromPeer: true,
event.PeerAcknowledgement: true, event.PeerError: true, event.SendMessageToGroupError: true,
event.NewGetValMessageFromPeer: true, event.NewRetValMessageFromPeer: true}
event.NewGetValMessageFromPeer: true, event.NewRetValMessageFromPeer: true, event.ProtocolEngineStopped: true}
// DefaultEventsToHandle specifies which events will be subscribed to
// when a peer has its Init() function called
@ -31,13 +31,15 @@ var DefaultEventsToHandle = []event.Type{
event.PeerError,
event.SendMessageToGroupError,
event.NewGetValMessageFromPeer,
event.ProtocolEngineStopped,
}
// cwtchPeer manages incoming and outgoing connections and all processing for a Cwtch cwtchPeer
type cwtchPeer struct {
Profile *model.Profile
mutex sync.Mutex
shutdown bool
Profile *model.Profile
mutex sync.Mutex
shutdown bool
listenStatus bool
queue event.Queue
eventBus event.Manager
@ -564,8 +566,15 @@ func (cp *cwtchPeer) RejectInvite(groupID string) {
// Listen makes the peer open a listening port to accept incoming connections (and be detactably online)
func (cp *cwtchPeer) Listen() {
log.Debugf("cwtchPeer Listen sending ProtocolEngineStartListen\n")
cp.eventBus.Publish(event.NewEvent(event.ProtocolEngineStartListen, map[event.Field]string{event.Onion: cp.Profile.Onion}))
cp.mutex.Lock()
defer cp.mutex.Unlock()
if cp.listenStatus == false {
log.Infof("cwtchPeer Listen sending ProtocolEngineStartListen\n")
cp.listenStatus = true
cp.eventBus.Publish(event.NewEvent(event.ProtocolEngineStartListen, map[event.Field]string{event.Onion: cp.Profile.Onion}))
} else {
// protocol engine is already listening
}
}
// StartPeersConnections attempts to connect to peer connections
@ -682,7 +691,11 @@ func (cp *cwtchPeer) eventHandler() {
ev := cp.queue.Next()
switch ev.EventType {
/***** Default auto handled events *****/
case event.ProtocolEngineStopped:
cp.mutex.Lock()
cp.listenStatus = false
log.Infof("Protocol engine for %v has stopped listening", cp.Profile.Onion)
cp.mutex.Unlock()
case event.EncryptedGroupMessage:
// If successful, a side effect is the message is added to the group's timeline
cp.mutex.Lock()