From 48335552c97a564bdeb5f38747e033ec29134736 Mon Sep 17 00:00:00 2001 From: Sarah Jamie Lewis Date: Wed, 28 Apr 2021 12:47:55 -0700 Subject: [PATCH] Add Server Restarts to Contact Retry Plugin --- app/plugins/contactRetry.go | 3 +++ event/common.go | 7 +++++++ event/eventmanager.go | 25 +++++++++++++++++++++++++ peer/cwtch_peer.go | 13 +++++++++---- storage/v1/profile_store.go | 4 ++-- 5 files changed, 46 insertions(+), 6 deletions(-) diff --git a/app/plugins/contactRetry.go b/app/plugins/contactRetry.go index ecec1eb..3167146 100644 --- a/app/plugins/contactRetry.go +++ b/app/plugins/contactRetry.go @@ -83,6 +83,9 @@ func (cr *contactRetry) run() { if p.ctype == peerConn { cr.bus.Publish(event.NewEvent(event.RetryPeerRequest, map[event.Field]string{event.RemotePeer: p.id})) } + if p.ctype == serverConn { + cr.bus.Publish(event.NewEvent(event.RetryServerRequest, map[event.Field]string{event.GroupServer: p.id})) + } return true }) } else if prog != "100" { diff --git a/event/common.go b/event/common.go index ed0f38c..a9a8ccd 100644 --- a/event/common.go +++ b/event/common.go @@ -19,6 +19,11 @@ const ( // RemotePeer: [eg "chpr7qm6op5vfcg2pi4vllco3h6aa7exexc4rqwnlupqhoogx2zgd6qd" RetryPeerRequest = Type("RetryPeerRequest") + // RetryServerRequest + // Asks CwtchPeer to retry a server connection... + // GroupServer: [eg "chpr7qm6op5vfcg2pi4vllco3h6aa7exexc4rqwnlupqhoogx2zgd6qd" + RetryServerRequest = Type("RetryServerRequest") + // RemotePeer // Authorization(model.peer.Auth_...) SetPeerAuthorization = Type("UpdatePeerAuthorization") @@ -266,6 +271,8 @@ const ( // Indicate whether an event was triggered by a user import Imported = Field("Imported") + + Source = Field("Source") ) // Defining Common errors diff --git a/event/eventmanager.go b/event/eventmanager.go index 25a3054..e5fcf12 100644 --- a/event/eventmanager.go +++ b/event/eventmanager.go @@ -3,12 +3,18 @@ package event import ( "crypto/rand" "encoding/json" + "fmt" "git.openprivacy.ca/openprivacy/log" "math" "math/big" + "os" + "runtime" + "strings" "sync" ) + + // Event is the core struct type passed around between various subsystems. Events consist of a type which can be // filtered on, an event ID for tracing and a map of Fields to string values. type Event struct { @@ -54,6 +60,7 @@ type manager struct { mapMutex sync.Mutex internal chan bool closed bool + trace bool } // Manager is an interface for an event bus @@ -78,6 +85,10 @@ func (em *manager) initialize() { em.events = make(chan []byte) em.internal = make(chan bool) em.closed = false + + + _, em.trace = os.LookupEnv("CWTCH_EVENT_SOURCE") + go em.eventBus() } @@ -92,6 +103,20 @@ func (em *manager) Subscribe(eventType Type, queue Queue) { // Publish takes an Event and sends it to the internal eventBus where it is distributed to all Subscribers func (em *manager) Publish(event Event) { if event.EventType != "" && em.closed != true { + + // Debug Events for Tracing, locked behind an environment variable + // for now. + if em.trace { + pc, _, _, _ := runtime.Caller(1) + funcName := runtime.FuncForPC(pc).Name() + lastSlash := strings.LastIndexByte(funcName, '/') + if lastSlash < 0 { + lastSlash = 0 + } + lastDot := strings.LastIndexByte(funcName[lastSlash:], '.') + lastSlash + event.Data[Source] = fmt.Sprintf("%v.%v", funcName[:lastDot], funcName[lastDot+1:]) + } + // Deep Copy the Event... eventJSON, err := json.Marshal(event) if err != nil { diff --git a/peer/cwtch_peer.go b/peer/cwtch_peer.go index b686a9f..0151357 100644 --- a/peer/cwtch_peer.go +++ b/peer/cwtch_peer.go @@ -19,7 +19,7 @@ import ( var autoHandleableEvents = map[event.Type]bool{event.EncryptedGroupMessage: true, event.PeerStateChange: true, event.ServerStateChange: true, event.NewGroupInvite: true, event.NewMessageFromPeer: true, event.PeerAcknowledgement: true, event.PeerError: true, event.SendMessageToGroupError: true, - event.NewGetValMessageFromPeer: true, event.NewRetValMessageFromPeer: true, event.ProtocolEngineStopped: true} + event.NewGetValMessageFromPeer: true, event.NewRetValMessageFromPeer: true, event.ProtocolEngineStopped: true, event.RetryServerRequest: true} // DefaultEventsToHandle specifies which events will be subscribed to // when a peer has its Init() function called @@ -32,6 +32,7 @@ var DefaultEventsToHandle = []event.Type{ event.SendMessageToGroupError, event.NewGetValMessageFromPeer, event.ProtocolEngineStopped, + event.RetryServerRequest, } // cwtchPeer manages incoming and outgoing connections and all processing for a Cwtch cwtchPeer @@ -221,7 +222,7 @@ func (cp *cwtchPeer) AutoHandleEvents(events []event.Type) { } } -// ImportGroup intializes a group from an imported source rather than a peer invite +// ImportGroup initializes a group from an imported source rather than a peer invite func (cp *cwtchPeer) ImportGroup(exportedInvite string) (err error) { if strings.HasPrefix(exportedInvite, "torv3") { data, err := base64.StdEncoding.DecodeString(exportedInvite[5:]) @@ -704,7 +705,7 @@ func (cp *cwtchPeer) eventHandler() { ok, groupID, message, seen := cp.Profile.AttemptDecryption(ciphertext, signature) cp.mutex.Unlock() if ok && !seen { - cp.eventBus.Publish(event.NewEvent(event.NewMessageFromGroup, map[event.Field]string{event.TimestampReceived: message.Received.Format(time.RFC3339Nano), event.TimestampSent: message.Timestamp.Format(time.RFC3339Nano), event.Data: message.Message, event.GroupID: groupID, event.Signature: base64.StdEncoding.EncodeToString(message.Signature), event.PreviousSignature:base64.StdEncoding.EncodeToString(message.PreviousMessageSig), event.RemotePeer: message.PeerID})) + cp.eventBus.Publish(event.NewEvent(event.NewMessageFromGroup, map[event.Field]string{event.TimestampReceived: message.Received.Format(time.RFC3339Nano), event.TimestampSent: message.Timestamp.Format(time.RFC3339Nano), event.Data: message.Message, event.GroupID: groupID, event.Signature: base64.StdEncoding.EncodeToString(message.Signature), event.PreviousSignature: base64.StdEncoding.EncodeToString(message.PreviousMessageSig), event.RemotePeer: message.PeerID})) } case event.NewMessageFromPeer: //event.TimestampReceived, event.RemotePeer, event.Data @@ -725,7 +726,10 @@ func (cp *cwtchPeer) eventHandler() { cp.mutex.Lock() cp.Profile.ErrorSentMessageToPeer(ev.Data[event.RemotePeer], ev.Data[event.EventID], ev.Data[event.Error]) cp.mutex.Unlock() - + case event.RetryServerRequest: + // Automated Join Server Request triggered by a plugin. + log.Debugf("profile received an automated retry event for %v", ev.Data[event.GroupServer]) + cp.JoinServer(ev.Data[event.GroupServer]) case event.NewGetValMessageFromPeer: onion := ev.Data[event.RemotePeer] scope := ev.Data[event.Scope] @@ -795,6 +799,7 @@ func (cp *cwtchPeer) eventHandler() { } } cp.mutex.Unlock() + default: if ev.EventType != "" { log.Errorf("peer event handler received an event it was not subscribed for: %v", ev.EventType) diff --git a/storage/v1/profile_store.go b/storage/v1/profile_store.go index 5b73183..a6b89bb 100644 --- a/storage/v1/profile_store.go +++ b/storage/v1/profile_store.go @@ -403,8 +403,8 @@ func (ps *ProfileStoreV1) eventHandler() { groupid := ev.Data[event.GroupID] received, _ := time.Parse(time.RFC3339Nano, ev.Data[event.TimestampReceived]) sent, _ := time.Parse(time.RFC3339Nano, ev.Data[event.TimestampSent]) - sig,_ := base64.StdEncoding.DecodeString(ev.Data[event.Signature]) - prevsig,_ := base64.StdEncoding.DecodeString(ev.Data[event.PreviousSignature]) + sig, _ := base64.StdEncoding.DecodeString(ev.Data[event.Signature]) + prevsig, _ := base64.StdEncoding.DecodeString(ev.Data[event.PreviousSignature]) message := model.Message{Received: received, Timestamp: sent, Message: ev.Data[event.Data], PeerID: ev.Data[event.RemotePeer], Signature: sig, PreviousMessageSig: prevsig, Acknowledged: true} ss, exists := ps.streamStores[groupid] if exists {