Merge pull request 'Listen to ProtocolEngineStopped Event and Handle Restart Listen' (#345) from upgrade into master
continuous-integration/drone/tag Build is passing Details
continuous-integration/drone/push Build is passing Details

Reviewed-on: #345
This commit is contained in:
erinn 2021-04-13 15:23:49 -07:00
commit ae90e22e64
8 changed files with 42 additions and 19 deletions

View File

@ -248,9 +248,9 @@ func (app *application) getACNStatusHandler() func(int, string) {
return func(progress int, status string) { return func(progress int, status string) {
progStr := strconv.Itoa(progress) progStr := strconv.Itoa(progress)
app.peerLock.Lock() app.peerLock.Lock()
app.appBus.Publish(event.NewEventList(event.ACNStatus, event.Progreess, progStr, event.Status, status)) app.appBus.Publish(event.NewEventList(event.ACNStatus, event.Progress, progStr, event.Status, status))
for _, bus := range app.eventBuses { for _, bus := range app.eventBuses {
bus.Publish(event.NewEventList(event.ACNStatus, event.Progreess, progStr, event.Status, status)) bus.Publish(event.NewEventList(event.ACNStatus, event.Progress, progStr, event.Status, status))
} }
app.peerLock.Unlock() app.peerLock.Unlock()
} }

View File

@ -155,11 +155,11 @@ func (as *applicationService) loadProfiles(password string) {
func (as *applicationService) getACNStatusHandler() func(int, string) { func (as *applicationService) getACNStatusHandler() func(int, string) {
return func(progress int, status string) { return func(progress int, status string) {
progStr := strconv.Itoa(progress) progStr := strconv.Itoa(progress)
as.bridge.Write(&event.IPCMessage{Dest: DestApp, Message: event.NewEventList(event.ACNStatus, event.Progreess, progStr, event.Status, status)}) as.bridge.Write(&event.IPCMessage{Dest: DestApp, Message: event.NewEventList(event.ACNStatus, event.Progress, progStr, event.Status, status)})
as.applicationCore.coremutex.Lock() as.applicationCore.coremutex.Lock()
defer as.applicationCore.coremutex.Unlock() defer as.applicationCore.coremutex.Unlock()
for _, bus := range as.eventBuses { for _, bus := range as.eventBuses {
bus.Publish(event.NewEventList(event.ACNStatus, event.Progreess, progStr, event.Status, status)) bus.Publish(event.NewEventList(event.ACNStatus, event.Progress, progStr, event.Status, status))
} }
} }
} }

View File

@ -73,7 +73,7 @@ func (cr *contactRetry) run() {
cr.handleEvent(server, state, serverConn) cr.handleEvent(server, state, serverConn)
case event.ACNStatus: case event.ACNStatus:
prog := e.Data[event.Progreess] prog := e.Data[event.Progress]
if prog == "100" && cr.networkUp == false { if prog == "100" && cr.networkUp == false {
cr.networkUp = true cr.networkUp = true
cr.connections.Range(func(k, v interface{}) bool { cr.connections.Range(func(k, v interface{}) bool {

View File

@ -61,10 +61,12 @@ func (nc *networkCheck) run() {
// and then we will wait a minute and check the connection for the first time (the onion should be up) // and then we will wait a minute and check the connection for the first time (the onion should be up)
// under normal operating circumstances // under normal operating circumstances
case event.ProtocolEngineStartListen: case event.ProtocolEngineStartListen:
log.Debugf("initiating connection check for %v", e.Data[event.Onion]) if _, exists := nc.onionsToCheck.Load(e.Data[event.Onion]); !exists {
nc.onionsToCheck.Store(e.Data[event.Onion], true) log.Debugf("initiating connection check for %v", e.Data[event.Onion])
if time.Now().Sub(lastMessageReceived) > time.Minute { nc.onionsToCheck.Store(e.Data[event.Onion], true)
nc.selfTest() if time.Now().Sub(lastMessageReceived) > time.Minute {
nc.selfTest()
}
} }
case event.PeerStateChange: case event.PeerStateChange:
fallthrough fallthrough

View File

@ -255,7 +255,7 @@ const (
Error = Field("Error") Error = Field("Error")
Progreess = Field("Progress") Progress = Field("Progress")
Status = Field("Status") Status = Field("Status")
EventID = Field("EventID") EventID = Field("EventID")
EventContext = Field("EventContext") EventContext = Field("EventContext")

4
go.mod
View File

@ -3,8 +3,8 @@ module cwtch.im/cwtch
go 1.14 go 1.14
require ( require (
git.openprivacy.ca/cwtch.im/tapir v0.3.2 git.openprivacy.ca/cwtch.im/tapir v0.3.4
git.openprivacy.ca/openprivacy/connectivity v1.4.0 git.openprivacy.ca/openprivacy/connectivity v1.4.2
git.openprivacy.ca/openprivacy/log v1.0.2 git.openprivacy.ca/openprivacy/log v1.0.2
github.com/gtank/ristretto255 v0.1.2 github.com/gtank/ristretto255 v0.1.2
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect

8
go.sum
View File

@ -2,10 +2,18 @@ git.openprivacy.ca/cwtch.im/tapir v0.3.1 h1:+d1dHyPvZ8JmdfFe/oXWJPardzflRIhcdILt
git.openprivacy.ca/cwtch.im/tapir v0.3.1/go.mod h1:q6RMI/TQvRN8SCtRY3GryOawMcB0uG6NjP6M77oSMx8= git.openprivacy.ca/cwtch.im/tapir v0.3.1/go.mod h1:q6RMI/TQvRN8SCtRY3GryOawMcB0uG6NjP6M77oSMx8=
git.openprivacy.ca/cwtch.im/tapir v0.3.2 h1:thLWqqY1LkirWFcy9Tg6NgWeYbvo9xBm+s2XVnCIvpY= git.openprivacy.ca/cwtch.im/tapir v0.3.2 h1:thLWqqY1LkirWFcy9Tg6NgWeYbvo9xBm+s2XVnCIvpY=
git.openprivacy.ca/cwtch.im/tapir v0.3.2/go.mod h1:q6RMI/TQvRN8SCtRY3GryOawMcB0uG6NjP6M77oSMx8= git.openprivacy.ca/cwtch.im/tapir v0.3.2/go.mod h1:q6RMI/TQvRN8SCtRY3GryOawMcB0uG6NjP6M77oSMx8=
git.openprivacy.ca/cwtch.im/tapir v0.3.3 h1:Q7F8JijgOMMYSy3IdZl7+r6qkWckEWV1+EY7q6MAkVs=
git.openprivacy.ca/cwtch.im/tapir v0.3.3/go.mod h1:ZMg9Jzh0n3Os2aSF4z+bx/n8WBCJBN7KCQESXperYts=
git.openprivacy.ca/cwtch.im/tapir v0.3.4 h1:g7yZkfz/vWr/t2tFXa/t0Ebr/w665uIKpxpCZ3lIPCo=
git.openprivacy.ca/cwtch.im/tapir v0.3.4/go.mod h1:+Niy2AHhQC351ZTtfhC0uLjViCICyOxCJZsIlGKKNAU=
git.openprivacy.ca/openprivacy/bine v0.0.4 h1:CO7EkGyz+jegZ4ap8g5NWRuDHA/56KKvGySR6OBPW+c= git.openprivacy.ca/openprivacy/bine v0.0.4 h1:CO7EkGyz+jegZ4ap8g5NWRuDHA/56KKvGySR6OBPW+c=
git.openprivacy.ca/openprivacy/bine v0.0.4/go.mod h1:13ZqhKyqakDsN/ZkQkIGNULsmLyqtXc46XBcnuXm/mU= git.openprivacy.ca/openprivacy/bine v0.0.4/go.mod h1:13ZqhKyqakDsN/ZkQkIGNULsmLyqtXc46XBcnuXm/mU=
git.openprivacy.ca/openprivacy/connectivity v1.4.0 h1:c7AANUCrlA4hIqXxIGDOWMtSe8CpDleD1877PShScbM= git.openprivacy.ca/openprivacy/connectivity v1.4.0 h1:c7AANUCrlA4hIqXxIGDOWMtSe8CpDleD1877PShScbM=
git.openprivacy.ca/openprivacy/connectivity v1.4.0/go.mod h1:bR0Myx9nm2YzWtsThRelkNMV4Pp7sPDa123O1qsAbVo= git.openprivacy.ca/openprivacy/connectivity v1.4.0/go.mod h1:bR0Myx9nm2YzWtsThRelkNMV4Pp7sPDa123O1qsAbVo=
git.openprivacy.ca/openprivacy/connectivity v1.4.1 h1:zoM+j7PFj8mQeUCNiDNMe7Uq9dhcJDOhaZcSANfeDL4=
git.openprivacy.ca/openprivacy/connectivity v1.4.1/go.mod h1:bR0Myx9nm2YzWtsThRelkNMV4Pp7sPDa123O1qsAbVo=
git.openprivacy.ca/openprivacy/connectivity v1.4.2 h1:rQFIjWunLlRmXL5Efsv+7+1cA70T6Uza6RCy2PRm9zc=
git.openprivacy.ca/openprivacy/connectivity v1.4.2/go.mod h1:bR0Myx9nm2YzWtsThRelkNMV4Pp7sPDa123O1qsAbVo=
git.openprivacy.ca/openprivacy/log v1.0.1/go.mod h1:gGYK8xHtndRLDymFtmjkG26GaMQNgyhioNS82m812Iw= git.openprivacy.ca/openprivacy/log v1.0.1/go.mod h1:gGYK8xHtndRLDymFtmjkG26GaMQNgyhioNS82m812Iw=
git.openprivacy.ca/openprivacy/log v1.0.2 h1:HLP4wsw4ljczFAelYnbObIs821z+jgMPCe8uODPnGQM= git.openprivacy.ca/openprivacy/log v1.0.2 h1:HLP4wsw4ljczFAelYnbObIs821z+jgMPCe8uODPnGQM=
git.openprivacy.ca/openprivacy/log v1.0.2/go.mod h1:gGYK8xHtndRLDymFtmjkG26GaMQNgyhioNS82m812Iw= git.openprivacy.ca/openprivacy/log v1.0.2/go.mod h1:gGYK8xHtndRLDymFtmjkG26GaMQNgyhioNS82m812Iw=

View File

@ -19,7 +19,7 @@ import (
var autoHandleableEvents = map[event.Type]bool{event.EncryptedGroupMessage: true, event.PeerStateChange: true, var autoHandleableEvents = map[event.Type]bool{event.EncryptedGroupMessage: true, event.PeerStateChange: true,
event.ServerStateChange: true, event.NewGroupInvite: true, event.NewMessageFromPeer: true, event.ServerStateChange: true, event.NewGroupInvite: true, event.NewMessageFromPeer: true,
event.PeerAcknowledgement: true, event.PeerError: true, event.SendMessageToGroupError: true, event.PeerAcknowledgement: true, event.PeerError: true, event.SendMessageToGroupError: true,
event.NewGetValMessageFromPeer: true, event.NewRetValMessageFromPeer: true} event.NewGetValMessageFromPeer: true, event.NewRetValMessageFromPeer: true, event.ProtocolEngineStopped: true}
// DefaultEventsToHandle specifies which events will be subscribed to // DefaultEventsToHandle specifies which events will be subscribed to
// when a peer has its Init() function called // when a peer has its Init() function called
@ -31,13 +31,15 @@ var DefaultEventsToHandle = []event.Type{
event.PeerError, event.PeerError,
event.SendMessageToGroupError, event.SendMessageToGroupError,
event.NewGetValMessageFromPeer, event.NewGetValMessageFromPeer,
event.ProtocolEngineStopped,
} }
// cwtchPeer manages incoming and outgoing connections and all processing for a Cwtch cwtchPeer // cwtchPeer manages incoming and outgoing connections and all processing for a Cwtch cwtchPeer
type cwtchPeer struct { type cwtchPeer struct {
Profile *model.Profile Profile *model.Profile
mutex sync.Mutex mutex sync.Mutex
shutdown bool shutdown bool
listenStatus bool
queue event.Queue queue event.Queue
eventBus event.Manager eventBus event.Manager
@ -564,8 +566,15 @@ func (cp *cwtchPeer) RejectInvite(groupID string) {
// Listen makes the peer open a listening port to accept incoming connections (and be detactably online) // Listen makes the peer open a listening port to accept incoming connections (and be detactably online)
func (cp *cwtchPeer) Listen() { func (cp *cwtchPeer) Listen() {
log.Debugf("cwtchPeer Listen sending ProtocolEngineStartListen\n") cp.mutex.Lock()
cp.eventBus.Publish(event.NewEvent(event.ProtocolEngineStartListen, map[event.Field]string{event.Onion: cp.Profile.Onion})) defer cp.mutex.Unlock()
if cp.listenStatus == false {
log.Infof("cwtchPeer Listen sending ProtocolEngineStartListen\n")
cp.listenStatus = true
cp.eventBus.Publish(event.NewEvent(event.ProtocolEngineStartListen, map[event.Field]string{event.Onion: cp.Profile.Onion}))
} else {
// protocol engine is already listening
}
} }
// StartPeersConnections attempts to connect to peer connections // StartPeersConnections attempts to connect to peer connections
@ -682,7 +691,11 @@ func (cp *cwtchPeer) eventHandler() {
ev := cp.queue.Next() ev := cp.queue.Next()
switch ev.EventType { switch ev.EventType {
/***** Default auto handled events *****/ /***** Default auto handled events *****/
case event.ProtocolEngineStopped:
cp.mutex.Lock()
cp.listenStatus = false
log.Infof("Protocol engine for %v has stopped listening", cp.Profile.Onion)
cp.mutex.Unlock()
case event.EncryptedGroupMessage: case event.EncryptedGroupMessage:
// If successful, a side effect is the message is added to the group's timeline // If successful, a side effect is the message is added to the group's timeline
cp.mutex.Lock() cp.mutex.Lock()