Add Server Restarts to Contact Retry Plugin #347
|
@ -83,6 +83,9 @@ func (cr *contactRetry) run() {
|
|||
if p.ctype == peerConn {
|
||||
cr.bus.Publish(event.NewEvent(event.RetryPeerRequest, map[event.Field]string{event.RemotePeer: p.id}))
|
||||
}
|
||||
if p.ctype == serverConn {
|
||||
cr.bus.Publish(event.NewEvent(event.RetryServerRequest, map[event.Field]string{event.GroupServer: p.id}))
|
||||
}
|
||||
return true
|
||||
})
|
||||
} else if prog != "100" {
|
||||
|
|
|
@ -19,6 +19,11 @@ const (
|
|||
// RemotePeer: [eg "chpr7qm6op5vfcg2pi4vllco3h6aa7exexc4rqwnlupqhoogx2zgd6qd"
|
||||
RetryPeerRequest = Type("RetryPeerRequest")
|
||||
|
||||
// RetryServerRequest
|
||||
// Asks CwtchPeer to retry a server connection...
|
||||
// GroupServer: [eg "chpr7qm6op5vfcg2pi4vllco3h6aa7exexc4rqwnlupqhoogx2zgd6qd"
|
||||
RetryServerRequest = Type("RetryServerRequest")
|
||||
|
||||
// RemotePeer
|
||||
// Authorization(model.peer.Auth_...)
|
||||
SetPeerAuthorization = Type("UpdatePeerAuthorization")
|
||||
|
@ -266,6 +271,8 @@ const (
|
|||
|
||||
// Indicate whether an event was triggered by a user import
|
||||
Imported = Field("Imported")
|
||||
|
||||
Source = Field("Source")
|
||||
)
|
||||
|
||||
// Defining Common errors
|
||||
|
|
|
@ -3,12 +3,18 @@ package event
|
|||
import (
|
||||
"crypto/rand"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"git.openprivacy.ca/openprivacy/log"
|
||||
"math"
|
||||
"math/big"
|
||||
"os"
|
||||
"runtime"
|
||||
"strings"
|
||||
"sync"
|
||||
)
|
||||
|
||||
|
||||
|
||||
// Event is the core struct type passed around between various subsystems. Events consist of a type which can be
|
||||
// filtered on, an event ID for tracing and a map of Fields to string values.
|
||||
type Event struct {
|
||||
|
@ -54,6 +60,7 @@ type manager struct {
|
|||
mapMutex sync.Mutex
|
||||
internal chan bool
|
||||
closed bool
|
||||
trace bool
|
||||
}
|
||||
|
||||
// Manager is an interface for an event bus
|
||||
|
@ -78,6 +85,10 @@ func (em *manager) initialize() {
|
|||
em.events = make(chan []byte)
|
||||
em.internal = make(chan bool)
|
||||
em.closed = false
|
||||
|
||||
|
||||
_, em.trace = os.LookupEnv("CWTCH_EVENT_SOURCE")
|
||||
|
||||
go em.eventBus()
|
||||
}
|
||||
|
||||
|
@ -92,6 +103,20 @@ func (em *manager) Subscribe(eventType Type, queue Queue) {
|
|||
// Publish takes an Event and sends it to the internal eventBus where it is distributed to all Subscribers
|
||||
func (em *manager) Publish(event Event) {
|
||||
if event.EventType != "" && em.closed != true {
|
||||
|
||||
// Debug Events for Tracing, locked behind an environment variable
|
||||
// for now.
|
||||
if em.trace {
|
||||
pc, _, _, _ := runtime.Caller(1)
|
||||
funcName := runtime.FuncForPC(pc).Name()
|
||||
lastSlash := strings.LastIndexByte(funcName, '/')
|
||||
if lastSlash < 0 {
|
||||
lastSlash = 0
|
||||
}
|
||||
lastDot := strings.LastIndexByte(funcName[lastSlash:], '.') + lastSlash
|
||||
event.Data[Source] = fmt.Sprintf("%v.%v", funcName[:lastDot], funcName[lastDot+1:])
|
||||
}
|
||||
|
||||
// Deep Copy the Event...
|
||||
eventJSON, err := json.Marshal(event)
|
||||
if err != nil {
|
||||
|
|
|
@ -19,7 +19,7 @@ import (
|
|||
var autoHandleableEvents = map[event.Type]bool{event.EncryptedGroupMessage: true, event.PeerStateChange: true,
|
||||
event.ServerStateChange: true, event.NewGroupInvite: true, event.NewMessageFromPeer: true,
|
||||
event.PeerAcknowledgement: true, event.PeerError: true, event.SendMessageToGroupError: true,
|
||||
event.NewGetValMessageFromPeer: true, event.NewRetValMessageFromPeer: true, event.ProtocolEngineStopped: true}
|
||||
event.NewGetValMessageFromPeer: true, event.NewRetValMessageFromPeer: true, event.ProtocolEngineStopped: true, event.RetryServerRequest: true}
|
||||
|
||||
// DefaultEventsToHandle specifies which events will be subscribed to
|
||||
// when a peer has its Init() function called
|
||||
|
@ -32,6 +32,7 @@ var DefaultEventsToHandle = []event.Type{
|
|||
event.SendMessageToGroupError,
|
||||
event.NewGetValMessageFromPeer,
|
||||
event.ProtocolEngineStopped,
|
||||
event.RetryServerRequest,
|
||||
}
|
||||
|
||||
// cwtchPeer manages incoming and outgoing connections and all processing for a Cwtch cwtchPeer
|
||||
|
@ -221,7 +222,7 @@ func (cp *cwtchPeer) AutoHandleEvents(events []event.Type) {
|
|||
}
|
||||
}
|
||||
|
||||
// ImportGroup intializes a group from an imported source rather than a peer invite
|
||||
// ImportGroup initializes a group from an imported source rather than a peer invite
|
||||
func (cp *cwtchPeer) ImportGroup(exportedInvite string) (err error) {
|
||||
if strings.HasPrefix(exportedInvite, "torv3") {
|
||||
data, err := base64.StdEncoding.DecodeString(exportedInvite[5:])
|
||||
|
@ -704,7 +705,7 @@ func (cp *cwtchPeer) eventHandler() {
|
|||
ok, groupID, message, seen := cp.Profile.AttemptDecryption(ciphertext, signature)
|
||||
cp.mutex.Unlock()
|
||||
if ok && !seen {
|
||||
cp.eventBus.Publish(event.NewEvent(event.NewMessageFromGroup, map[event.Field]string{event.TimestampReceived: message.Received.Format(time.RFC3339Nano), event.TimestampSent: message.Timestamp.Format(time.RFC3339Nano), event.Data: message.Message, event.GroupID: groupID, event.Signature: base64.StdEncoding.EncodeToString(message.Signature), event.PreviousSignature:base64.StdEncoding.EncodeToString(message.PreviousMessageSig), event.RemotePeer: message.PeerID}))
|
||||
cp.eventBus.Publish(event.NewEvent(event.NewMessageFromGroup, map[event.Field]string{event.TimestampReceived: message.Received.Format(time.RFC3339Nano), event.TimestampSent: message.Timestamp.Format(time.RFC3339Nano), event.Data: message.Message, event.GroupID: groupID, event.Signature: base64.StdEncoding.EncodeToString(message.Signature), event.PreviousSignature: base64.StdEncoding.EncodeToString(message.PreviousMessageSig), event.RemotePeer: message.PeerID}))
|
||||
}
|
||||
|
||||
case event.NewMessageFromPeer: //event.TimestampReceived, event.RemotePeer, event.Data
|
||||
|
@ -725,7 +726,10 @@ func (cp *cwtchPeer) eventHandler() {
|
|||
cp.mutex.Lock()
|
||||
cp.Profile.ErrorSentMessageToPeer(ev.Data[event.RemotePeer], ev.Data[event.EventID], ev.Data[event.Error])
|
||||
cp.mutex.Unlock()
|
||||
|
||||
case event.RetryServerRequest:
|
||||
// Automated Join Server Request triggered by a plugin.
|
||||
log.Debugf("profile received an automated retry event for %v", ev.Data[event.GroupServer])
|
||||
cp.JoinServer(ev.Data[event.GroupServer])
|
||||
case event.NewGetValMessageFromPeer:
|
||||
onion := ev.Data[event.RemotePeer]
|
||||
scope := ev.Data[event.Scope]
|
||||
|
@ -795,6 +799,7 @@ func (cp *cwtchPeer) eventHandler() {
|
|||
}
|
||||
}
|
||||
cp.mutex.Unlock()
|
||||
|
||||
default:
|
||||
if ev.EventType != "" {
|
||||
log.Errorf("peer event handler received an event it was not subscribed for: %v", ev.EventType)
|
||||
|
|
|
@ -403,8 +403,8 @@ func (ps *ProfileStoreV1) eventHandler() {
|
|||
groupid := ev.Data[event.GroupID]
|
||||
received, _ := time.Parse(time.RFC3339Nano, ev.Data[event.TimestampReceived])
|
||||
sent, _ := time.Parse(time.RFC3339Nano, ev.Data[event.TimestampSent])
|
||||
sig,_ := base64.StdEncoding.DecodeString(ev.Data[event.Signature])
|
||||
prevsig,_ := base64.StdEncoding.DecodeString(ev.Data[event.PreviousSignature])
|
||||
sig, _ := base64.StdEncoding.DecodeString(ev.Data[event.Signature])
|
||||
prevsig, _ := base64.StdEncoding.DecodeString(ev.Data[event.PreviousSignature])
|
||||
message := model.Message{Received: received, Timestamp: sent, Message: ev.Data[event.Data], PeerID: ev.Data[event.RemotePeer], Signature: sig, PreviousMessageSig: prevsig, Acknowledged: true}
|
||||
ss, exists := ps.streamStores[groupid]
|
||||
if exists {
|
||||
|
|
Loading…
Reference in New Issue