Add Server Restarts to Contact Retry Plugin
continuous-integration/drone/push Build is passing Details
continuous-integration/drone/pr Build is passing Details

This commit is contained in:
Sarah Jamie Lewis 2021-04-28 12:47:55 -07:00
parent 25ded15a5b
commit 48335552c9
5 changed files with 46 additions and 6 deletions

View File

@ -83,6 +83,9 @@ func (cr *contactRetry) run() {
if p.ctype == peerConn {
cr.bus.Publish(event.NewEvent(event.RetryPeerRequest, map[event.Field]string{event.RemotePeer: p.id}))
}
if p.ctype == serverConn {
cr.bus.Publish(event.NewEvent(event.RetryServerRequest, map[event.Field]string{event.GroupServer: p.id}))
}
return true
})
} else if prog != "100" {

View File

@ -19,6 +19,11 @@ const (
// RemotePeer: [eg "chpr7qm6op5vfcg2pi4vllco3h6aa7exexc4rqwnlupqhoogx2zgd6qd"
RetryPeerRequest = Type("RetryPeerRequest")
// RetryServerRequest
// Asks CwtchPeer to retry a server connection...
// GroupServer: [eg "chpr7qm6op5vfcg2pi4vllco3h6aa7exexc4rqwnlupqhoogx2zgd6qd"
RetryServerRequest = Type("RetryServerRequest")
// RemotePeer
// Authorization(model.peer.Auth_...)
SetPeerAuthorization = Type("UpdatePeerAuthorization")
@ -266,6 +271,8 @@ const (
// Indicate whether an event was triggered by a user import
Imported = Field("Imported")
Source = Field("Source")
)
// Defining Common errors

View File

@ -3,12 +3,18 @@ package event
import (
"crypto/rand"
"encoding/json"
"fmt"
"git.openprivacy.ca/openprivacy/log"
"math"
"math/big"
"os"
"runtime"
"strings"
"sync"
)
// Event is the core struct type passed around between various subsystems. Events consist of a type which can be
// filtered on, an event ID for tracing and a map of Fields to string values.
type Event struct {
@ -54,6 +60,7 @@ type manager struct {
mapMutex sync.Mutex
internal chan bool
closed bool
trace bool
}
// Manager is an interface for an event bus
@ -78,6 +85,10 @@ func (em *manager) initialize() {
em.events = make(chan []byte)
em.internal = make(chan bool)
em.closed = false
_, em.trace = os.LookupEnv("CWTCH_EVENT_SOURCE")
go em.eventBus()
}
@ -92,6 +103,20 @@ func (em *manager) Subscribe(eventType Type, queue Queue) {
// Publish takes an Event and sends it to the internal eventBus where it is distributed to all Subscribers
func (em *manager) Publish(event Event) {
if event.EventType != "" && em.closed != true {
// Debug Events for Tracing, locked behind an environment variable
// for now.
if em.trace {
pc, _, _, _ := runtime.Caller(1)
funcName := runtime.FuncForPC(pc).Name()
lastSlash := strings.LastIndexByte(funcName, '/')
if lastSlash < 0 {
lastSlash = 0
}
lastDot := strings.LastIndexByte(funcName[lastSlash:], '.') + lastSlash
event.Data[Source] = fmt.Sprintf("%v.%v", funcName[:lastDot], funcName[lastDot+1:])
}
// Deep Copy the Event...
eventJSON, err := json.Marshal(event)
if err != nil {

View File

@ -19,7 +19,7 @@ import (
var autoHandleableEvents = map[event.Type]bool{event.EncryptedGroupMessage: true, event.PeerStateChange: true,
event.ServerStateChange: true, event.NewGroupInvite: true, event.NewMessageFromPeer: true,
event.PeerAcknowledgement: true, event.PeerError: true, event.SendMessageToGroupError: true,
event.NewGetValMessageFromPeer: true, event.NewRetValMessageFromPeer: true, event.ProtocolEngineStopped: true}
event.NewGetValMessageFromPeer: true, event.NewRetValMessageFromPeer: true, event.ProtocolEngineStopped: true, event.RetryServerRequest: true}
// DefaultEventsToHandle specifies which events will be subscribed to
// when a peer has its Init() function called
@ -32,6 +32,7 @@ var DefaultEventsToHandle = []event.Type{
event.SendMessageToGroupError,
event.NewGetValMessageFromPeer,
event.ProtocolEngineStopped,
event.RetryServerRequest,
}
// cwtchPeer manages incoming and outgoing connections and all processing for a Cwtch cwtchPeer
@ -221,7 +222,7 @@ func (cp *cwtchPeer) AutoHandleEvents(events []event.Type) {
}
}
// ImportGroup intializes a group from an imported source rather than a peer invite
// ImportGroup initializes a group from an imported source rather than a peer invite
func (cp *cwtchPeer) ImportGroup(exportedInvite string) (err error) {
if strings.HasPrefix(exportedInvite, "torv3") {
data, err := base64.StdEncoding.DecodeString(exportedInvite[5:])
@ -704,7 +705,7 @@ func (cp *cwtchPeer) eventHandler() {
ok, groupID, message, seen := cp.Profile.AttemptDecryption(ciphertext, signature)
cp.mutex.Unlock()
if ok && !seen {
cp.eventBus.Publish(event.NewEvent(event.NewMessageFromGroup, map[event.Field]string{event.TimestampReceived: message.Received.Format(time.RFC3339Nano), event.TimestampSent: message.Timestamp.Format(time.RFC3339Nano), event.Data: message.Message, event.GroupID: groupID, event.Signature: base64.StdEncoding.EncodeToString(message.Signature), event.PreviousSignature:base64.StdEncoding.EncodeToString(message.PreviousMessageSig), event.RemotePeer: message.PeerID}))
cp.eventBus.Publish(event.NewEvent(event.NewMessageFromGroup, map[event.Field]string{event.TimestampReceived: message.Received.Format(time.RFC3339Nano), event.TimestampSent: message.Timestamp.Format(time.RFC3339Nano), event.Data: message.Message, event.GroupID: groupID, event.Signature: base64.StdEncoding.EncodeToString(message.Signature), event.PreviousSignature: base64.StdEncoding.EncodeToString(message.PreviousMessageSig), event.RemotePeer: message.PeerID}))
}
case event.NewMessageFromPeer: //event.TimestampReceived, event.RemotePeer, event.Data
@ -725,7 +726,10 @@ func (cp *cwtchPeer) eventHandler() {
cp.mutex.Lock()
cp.Profile.ErrorSentMessageToPeer(ev.Data[event.RemotePeer], ev.Data[event.EventID], ev.Data[event.Error])
cp.mutex.Unlock()
case event.RetryServerRequest:
// Automated Join Server Request triggered by a plugin.
log.Debugf("profile received an automated retry event for %v", ev.Data[event.GroupServer])
cp.JoinServer(ev.Data[event.GroupServer])
case event.NewGetValMessageFromPeer:
onion := ev.Data[event.RemotePeer]
scope := ev.Data[event.Scope]
@ -795,6 +799,7 @@ func (cp *cwtchPeer) eventHandler() {
}
}
cp.mutex.Unlock()
default:
if ev.EventType != "" {
log.Errorf("peer event handler received an event it was not subscribed for: %v", ev.EventType)

View File

@ -403,8 +403,8 @@ func (ps *ProfileStoreV1) eventHandler() {
groupid := ev.Data[event.GroupID]
received, _ := time.Parse(time.RFC3339Nano, ev.Data[event.TimestampReceived])
sent, _ := time.Parse(time.RFC3339Nano, ev.Data[event.TimestampSent])
sig,_ := base64.StdEncoding.DecodeString(ev.Data[event.Signature])
prevsig,_ := base64.StdEncoding.DecodeString(ev.Data[event.PreviousSignature])
sig, _ := base64.StdEncoding.DecodeString(ev.Data[event.Signature])
prevsig, _ := base64.StdEncoding.DecodeString(ev.Data[event.PreviousSignature])
message := model.Message{Received: received, Timestamp: sent, Message: ev.Data[event.Data], PeerID: ev.Data[event.RemotePeer], Signature: sig, PreviousMessageSig: prevsig, Acknowledged: true}
ss, exists := ps.streamStores[groupid]
if exists {