Remove RetryPeer event, Poke token count on new group #513
|
@ -160,6 +160,8 @@ func (app *application) ListProfiles() []string {
|
||||||
|
|
||||||
// GetPeer returns a cwtchPeer for a given onion address
|
// GetPeer returns a cwtchPeer for a given onion address
|
||||||
func (app *application) GetPeer(onion string) peer.CwtchPeer {
|
func (app *application) GetPeer(onion string) peer.CwtchPeer {
|
||||||
|
app.appmutex.Lock()
|
||||||
|
defer app.appmutex.Unlock()
|
||||||
if profile, ok := app.peers[onion]; ok {
|
if profile, ok := app.peers[onion]; ok {
|
||||||
return profile
|
return profile
|
||||||
}
|
}
|
||||||
|
|
|
@ -170,7 +170,6 @@ func (cr *contactRetry) run() {
|
||||||
cr.bus.Subscribe(event.PeerStateChange, cr.queue)
|
cr.bus.Subscribe(event.PeerStateChange, cr.queue)
|
||||||
cr.bus.Subscribe(event.ACNStatus, cr.queue)
|
cr.bus.Subscribe(event.ACNStatus, cr.queue)
|
||||||
cr.bus.Subscribe(event.ServerStateChange, cr.queue)
|
cr.bus.Subscribe(event.ServerStateChange, cr.queue)
|
||||||
cr.bus.Subscribe(event.PeerRequest, cr.queue)
|
|
||||||
cr.bus.Subscribe(event.QueuePeerRequest, cr.queue)
|
cr.bus.Subscribe(event.QueuePeerRequest, cr.queue)
|
||||||
cr.bus.Subscribe(event.QueueJoinServer, cr.queue)
|
cr.bus.Subscribe(event.QueueJoinServer, cr.queue)
|
||||||
cr.bus.Subscribe(event.ProtocolEngineShutdown, cr.queue)
|
cr.bus.Subscribe(event.ProtocolEngineShutdown, cr.queue)
|
||||||
|
@ -341,7 +340,7 @@ func (cr *contactRetry) requeueReady() {
|
||||||
|
|
||||||
func (cr *contactRetry) publishConnectionRequest(contact *contact) {
|
func (cr *contactRetry) publishConnectionRequest(contact *contact) {
|
||||||
if contact.ctype == peerConn {
|
if contact.ctype == peerConn {
|
||||||
cr.bus.Publish(event.NewEvent(event.RetryPeerRequest, map[event.Field]string{event.RemotePeer: contact.id}))
|
cr.bus.Publish(event.NewEvent(event.PeerRequest, map[event.Field]string{event.RemotePeer: contact.id}))
|
||||||
}
|
}
|
||||||
if contact.ctype == serverConn {
|
if contact.ctype == serverConn {
|
||||||
cr.bus.Publish(event.NewEvent(event.RetryServerRequest, map[event.Field]string{event.GroupServer: contact.id}))
|
cr.bus.Publish(event.NewEvent(event.RetryServerRequest, map[event.Field]string{event.GroupServer: contact.id}))
|
||||||
|
@ -361,6 +360,13 @@ func (cr *contactRetry) addConnection(id string, state connections.ConnectionSta
|
||||||
cr.connections.Store(id, p)
|
cr.connections.Store(id, p)
|
||||||
cr.connCount += 1
|
cr.connCount += 1
|
||||||
return
|
return
|
||||||
|
} else {
|
||||||
|
// we have rerequested this connnection. Force set the queued parameter to true.
|
||||||
|
p, _ := cr.connections.Load(id)
|
||||||
|
if !p.(*contact).queued {
|
||||||
|
p.(*contact).queued = true
|
||||||
|
cr.connCount += 1
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -373,7 +379,10 @@ func (cr *contactRetry) handleEvent(id string, state connections.ConnectionState
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, exists := cr.connections.Load(id); !exists {
|
if _, exists := cr.connections.Load(id); !exists {
|
||||||
cr.addConnection(id, state, ctype, event.CwtchEpoch)
|
// We have an event for something we don't know about...
|
||||||
|
// The only reason this should happen is if a *new* Peer/Server connection has changed.
|
||||||
|
// Let's set the timeout to Now() to indicate that this is a fresh connection, and so should likely be prioritized.
|
||||||
|
cr.addConnection(id, state, ctype, time.Now())
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,73 @@
|
||||||
|
package plugins
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"cwtch.im/cwtch/event"
|
||||||
|
"cwtch.im/cwtch/protocol/connections"
|
||||||
|
"git.openprivacy.ca/openprivacy/log"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TestContactRetryQueue simulates some basic connection queueing
|
||||||
|
// NOTE: This whole test is a race condition, and does flag go's detector
|
||||||
|
// We are invasively checking the internal state of the retry plugin and accessing pointers from another
|
||||||
|
// thread.
|
||||||
|
// We could build an entire thread safe monitoring functonality, but that would dramatically expand the scope of this test.
|
||||||
|
func TestContactRetryQueue(t *testing.T) {
|
||||||
|
log.SetLevel(log.LevelDebug)
|
||||||
|
bus := event.NewEventManager()
|
||||||
|
cr := NewConnectionRetry(bus, "").(*contactRetry)
|
||||||
|
cr.ACNUp = true // fake an ACN connection...
|
||||||
|
go cr.run()
|
||||||
|
|
||||||
|
t.Logf("contact plugin up and running..sending peer connection...")
|
||||||
|
// Assert that there is a peer connection identified as "test"
|
||||||
|
bus.Publish(event.NewEvent(event.QueuePeerRequest, map[event.Field]string{event.RemotePeer: "test", event.LastSeen: "test"}))
|
||||||
|
|
||||||
|
// Wait until the test actually exists, and is queued
|
||||||
|
// This is the worst part of this test setup. Ideally we would sleep, or some other yielding, but
|
||||||
|
// go test scheduling doesn't like that and even sleeping long periods won't cause the event thread to make
|
||||||
|
// progress...
|
||||||
|
for {
|
||||||
|
if pinf, exists := cr.connections.Load("test"); exists {
|
||||||
|
if pinf.(*contact).queued {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pinf, _ := cr.connections.Load("test")
|
||||||
|
if pinf.(*contact).queued == false {
|
||||||
|
t.Fatalf("test connection should be queued, actually: %v", pinf.(*contact).queued)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Asset that "test" is authenticated
|
||||||
|
cr.handleEvent("test", connections.AUTHENTICATED, peerConn)
|
||||||
|
|
||||||
|
// Assert that "test has a valid state"
|
||||||
|
pinf, _ = cr.connections.Load("test")
|
||||||
|
if pinf.(*contact).state != 3 {
|
||||||
|
t.Fatalf("test connection should be in authenticated after update, actually: %v", pinf.(*contact).state)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Publish an unrelated event to trigger the Plugin to go through a queuing cycle
|
||||||
|
// If we didn't do this we would have to wait 30 seconds for a check-in
|
||||||
|
bus.Publish(event.NewEvent(event.PeerStateChange, map[event.Field]string{event.RemotePeer: "test2", event.ConnectionState: "Disconnected"}))
|
||||||
|
time.Sleep(time.Second)
|
||||||
|
if pinf.(*contact).queued != false {
|
||||||
|
t.Fatalf("test connection should not be queued, actually: %v", pinf.(*contact).queued)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Publish a new peer request...
|
||||||
|
bus.Publish(event.NewEvent(event.QueuePeerRequest, map[event.Field]string{event.RemotePeer: "test"}))
|
||||||
|
time.Sleep(time.Second) // yield for a second so the event can catch up...
|
||||||
|
|
||||||
|
// Peer test should be forced to queue....
|
||||||
|
pinf, _ = cr.connections.Load("test")
|
||||||
|
if pinf.(*contact).queued != true {
|
||||||
|
t.Fatalf("test connection should be forced to queue after new queue peer request")
|
||||||
|
}
|
||||||
|
|
||||||
|
cr.Shutdown()
|
||||||
|
}
|
|
@ -25,12 +25,6 @@ const (
|
||||||
// GroupServer
|
// GroupServer
|
||||||
QueuePeerRequest = Type("QueuePeerRequest")
|
QueuePeerRequest = Type("QueuePeerRequest")
|
||||||
|
|
||||||
// RetryPeerRequest
|
|
||||||
// Identical to PeerRequest, but allows Engine to make decisions regarding blocked peers
|
|
||||||
// attributes:
|
|
||||||
// RemotePeer: [eg "chpr7qm6op5vfcg2pi4vllco3h6aa7exexc4rqwnlupqhoogx2zgd6qd"
|
|
||||||
RetryPeerRequest = Type("RetryPeerRequest")
|
|
||||||
|
|
||||||
// RetryServerRequest
|
// RetryServerRequest
|
||||||
// Asks CwtchPeer to retry a server connection...
|
// Asks CwtchPeer to retry a server connection...
|
||||||
// GroupServer: [eg "chpr7qm6op5vfcg2pi4vllco3h6aa7exexc4rqwnlupqhoogx2zgd6qd"
|
// GroupServer: [eg "chpr7qm6op5vfcg2pi4vllco3h6aa7exexc4rqwnlupqhoogx2zgd6qd"
|
||||||
|
|
|
@ -2,19 +2,11 @@ package peer
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"crypto/rand"
|
"crypto/rand"
|
||||||
"cwtch.im/cwtch/model/constants"
|
|
||||||
"cwtch.im/cwtch/protocol/groups"
|
|
||||||
"cwtch.im/cwtch/settings"
|
|
||||||
"encoding/base64"
|
"encoding/base64"
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"git.openprivacy.ca/cwtch.im/tapir/primitives"
|
|
||||||
"git.openprivacy.ca/cwtch.im/tapir/primitives/privacypass"
|
|
||||||
"git.openprivacy.ca/openprivacy/connectivity"
|
|
||||||
"git.openprivacy.ca/openprivacy/connectivity/tor"
|
|
||||||
"golang.org/x/crypto/ed25519"
|
|
||||||
"os"
|
"os"
|
||||||
path "path/filepath"
|
path "path/filepath"
|
||||||
"sort"
|
"sort"
|
||||||
|
@ -23,6 +15,15 @@ import (
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"cwtch.im/cwtch/model/constants"
|
||||||
|
"cwtch.im/cwtch/protocol/groups"
|
||||||
|
"cwtch.im/cwtch/settings"
|
||||||
|
"git.openprivacy.ca/cwtch.im/tapir/primitives"
|
||||||
|
"git.openprivacy.ca/cwtch.im/tapir/primitives/privacypass"
|
||||||
|
"git.openprivacy.ca/openprivacy/connectivity"
|
||||||
|
"git.openprivacy.ca/openprivacy/connectivity/tor"
|
||||||
|
"golang.org/x/crypto/ed25519"
|
||||||
|
|
||||||
"cwtch.im/cwtch/event"
|
"cwtch.im/cwtch/event"
|
||||||
"cwtch.im/cwtch/model"
|
"cwtch.im/cwtch/model"
|
||||||
"cwtch.im/cwtch/model/attr"
|
"cwtch.im/cwtch/model/attr"
|
||||||
|
@ -672,6 +673,7 @@ func (cp *cwtchPeer) NewContactConversation(handle string, acl model.AccessContr
|
||||||
conversationInfo, _ := cp.storage.GetConversationByHandle(handle)
|
conversationInfo, _ := cp.storage.GetConversationByHandle(handle)
|
||||||
if conversationInfo == nil {
|
if conversationInfo == nil {
|
||||||
conversationID, err := cp.storage.NewConversation(handle, model.Attributes{event.SaveHistoryKey: event.DeleteHistoryDefault}, model.AccessControlList{handle: acl}, accepted)
|
conversationID, err := cp.storage.NewConversation(handle, model.Attributes{event.SaveHistoryKey: event.DeleteHistoryDefault}, model.AccessControlList{handle: acl}, accepted)
|
||||||
|
cp.SetConversationAttribute(conversationID, attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.AttrLastConnectionTime)), time.Now().Format(time.RFC3339Nano))
|
||||||
cp.eventBus.Publish(event.NewEvent(event.ContactCreated, map[event.Field]string{event.ConversationID: strconv.Itoa(conversationID), event.RemotePeer: handle}))
|
cp.eventBus.Publish(event.NewEvent(event.ContactCreated, map[event.Field]string{event.ConversationID: strconv.Itoa(conversationID), event.RemotePeer: handle}))
|
||||||
return conversationID, err
|
return conversationID, err
|
||||||
}
|
}
|
||||||
|
@ -836,13 +838,16 @@ func (cp *cwtchPeer) StartGroup(name string, server string) (int, error) {
|
||||||
cp.SetConversationAttribute(conversationID, attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.GroupServer)), group.GroupServer)
|
cp.SetConversationAttribute(conversationID, attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.GroupServer)), group.GroupServer)
|
||||||
cp.SetConversationAttribute(conversationID, attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.GroupKey)), base64.StdEncoding.EncodeToString(group.GroupKey[:]))
|
cp.SetConversationAttribute(conversationID, attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.GroupKey)), base64.StdEncoding.EncodeToString(group.GroupKey[:]))
|
||||||
cp.SetConversationAttribute(conversationID, attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Name)), name)
|
cp.SetConversationAttribute(conversationID, attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Name)), name)
|
||||||
|
|
||||||
cp.eventBus.Publish(event.NewEvent(event.GroupCreated, map[event.Field]string{
|
cp.eventBus.Publish(event.NewEvent(event.GroupCreated, map[event.Field]string{
|
||||||
event.ConversationID: strconv.Itoa(conversationID),
|
event.ConversationID: strconv.Itoa(conversationID),
|
||||||
event.GroupID: group.GroupID,
|
event.GroupID: group.GroupID,
|
||||||
event.GroupServer: group.GroupServer,
|
event.GroupServer: group.GroupServer,
|
||||||
event.GroupName: name,
|
event.GroupName: name,
|
||||||
}))
|
}))
|
||||||
|
// Trigger an Antispam payment. We need to do this for two reasons
|
||||||
|
// 1. This server is new and we don't have any antispam tokens yet
|
||||||
|
// 2. This group is new and needs it's count refreshed
|
||||||
|
cp.MakeAntispamPayment(server)
|
||||||
return conversationID, nil
|
return conversationID, nil
|
||||||
}
|
}
|
||||||
log.Errorf("error creating group: %v", err)
|
log.Errorf("error creating group: %v", err)
|
||||||
|
@ -948,15 +953,11 @@ func (cp *cwtchPeer) GetPeerState(handle string) connections.ConnectionState {
|
||||||
return connections.DISCONNECTED
|
return connections.DISCONNECTED
|
||||||
}
|
}
|
||||||
|
|
||||||
// PeerWithOnion initiates a request to the Protocol Engine to set up Cwtch Session with a given tor v3 onion
|
// PeerWithOnion represents a request to connect immediately to a given peer. Instead
|
||||||
// address.
|
// of checking the last seed time, cwtch will treat the current time as the time of last action.
|
||||||
func (cp *cwtchPeer) PeerWithOnion(onion string) {
|
func (cp *cwtchPeer) PeerWithOnion(onion string) {
|
||||||
lastSeen := event.CwtchEpoch
|
lastSeen := time.Now()
|
||||||
ci, err := cp.FetchConversationInfo(onion)
|
cp.eventBus.Publish(event.NewEvent(event.QueuePeerRequest, map[event.Field]string{event.RemotePeer: onion, event.LastSeen: lastSeen.Format(time.RFC3339Nano)}))
|
||||||
if err == nil {
|
|
||||||
lastSeen = cp.GetConversationLastSeenTime(ci.ID)
|
|
||||||
}
|
|
||||||
cp.eventBus.Publish(event.NewEvent(event.PeerRequest, map[event.Field]string{event.RemotePeer: onion, event.LastSeen: lastSeen.Format(time.RFC3339Nano)}))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// QueuePeeringWithOnion sends the request to peer with an onion directly to the contact retry queue; this is a mechanism to not flood tor with circuit requests
|
// QueuePeeringWithOnion sends the request to peer with an onion directly to the contact retry queue; this is a mechanism to not flood tor with circuit requests
|
||||||
|
|
|
@ -110,7 +110,6 @@ func NewProtocolEngine(identity primitives.Identity, privateKey ed25519.PrivateK
|
||||||
engine.eventManager.Subscribe(event.ProtocolEngineStartListen, engine.queue)
|
engine.eventManager.Subscribe(event.ProtocolEngineStartListen, engine.queue)
|
||||||
engine.eventManager.Subscribe(event.ProtocolEngineShutdown, engine.queue)
|
engine.eventManager.Subscribe(event.ProtocolEngineShutdown, engine.queue)
|
||||||
engine.eventManager.Subscribe(event.PeerRequest, engine.queue)
|
engine.eventManager.Subscribe(event.PeerRequest, engine.queue)
|
||||||
engine.eventManager.Subscribe(event.RetryPeerRequest, engine.queue)
|
|
||||||
engine.eventManager.Subscribe(event.InvitePeerToGroup, engine.queue)
|
engine.eventManager.Subscribe(event.InvitePeerToGroup, engine.queue)
|
||||||
engine.eventManager.Subscribe(event.JoinServer, engine.queue)
|
engine.eventManager.Subscribe(event.JoinServer, engine.queue)
|
||||||
engine.eventManager.Subscribe(event.LeaveServer, engine.queue)
|
engine.eventManager.Subscribe(event.LeaveServer, engine.queue)
|
||||||
|
@ -163,13 +162,6 @@ func (e *engine) eventHandler() {
|
||||||
if torProvider.IsValidHostname(ev.Data[event.RemotePeer]) {
|
if torProvider.IsValidHostname(ev.Data[event.RemotePeer]) {
|
||||||
go e.peerWithOnion(ev.Data[event.RemotePeer])
|
go e.peerWithOnion(ev.Data[event.RemotePeer])
|
||||||
}
|
}
|
||||||
case event.RetryPeerRequest:
|
|
||||||
// This event allows engine to treat (automated) retry peering requests differently to user-specified
|
|
||||||
// peer events
|
|
||||||
if torProvider.IsValidHostname(ev.Data[event.RemotePeer]) {
|
|
||||||
log.Debugf("Retrying Peer Request: %v", ev.Data[event.RemotePeer])
|
|
||||||
go e.peerWithOnion(ev.Data[event.RemotePeer])
|
|
||||||
}
|
|
||||||
case event.InvitePeerToGroup:
|
case event.InvitePeerToGroup:
|
||||||
err := e.sendPeerMessage(ev.Data[event.RemotePeer], pmodel.PeerMessage{ID: ev.EventID, Context: event.ContextInvite, Data: []byte(ev.Data[event.GroupInvite])})
|
err := e.sendPeerMessage(ev.Data[event.RemotePeer], pmodel.PeerMessage{ID: ev.EventID, Context: event.ContextInvite, Data: []byte(ev.Data[event.GroupInvite])})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -397,6 +389,10 @@ func (e *engine) makeAntispamPayment(onion string) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Before doing anything, send and event with the current number of token
|
||||||
|
// This may unblock downstream processes who don't have an accurate token count
|
||||||
|
e.PokeTokenCount(onion)
|
||||||
|
|
||||||
conn, err := ephemeralService.service.GetConnection(onion)
|
conn, err := ephemeralService.service.GetConnection(onion)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
tokenApp, ok := (conn.App()).(*TokenBoardClient)
|
tokenApp, ok := (conn.App()).(*TokenBoardClient)
|
||||||
|
|
|
@ -51,3 +51,9 @@ func (e *engine) FetchToken(tokenService string) (*privacypass.Token, int, error
|
||||||
e.eventManager.Publish(event.NewEvent(event.TokenManagerInfo, map[event.Field]string{event.ServerTokenOnion: tokenService, event.ServerTokenCount: strconv.Itoa(numTokens)}))
|
e.eventManager.Publish(event.NewEvent(event.TokenManagerInfo, map[event.Field]string{event.ServerTokenOnion: tokenService, event.ServerTokenCount: strconv.Itoa(numTokens)}))
|
||||||
return token, numTokens, err
|
return token, numTokens, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (e *engine) PokeTokenCount(tokenService string) {
|
||||||
|
tokenManagerPointer, _ := e.tokenManagers.LoadOrStore(tokenService, NewTokenManager())
|
||||||
|
tokenManager := tokenManagerPointer.(*TokenManager)
|
||||||
|
e.eventManager.Publish(event.NewEvent(event.TokenManagerInfo, map[event.Field]string{event.ServerTokenOnion: tokenService, event.ServerTokenCount: strconv.Itoa(tokenManager.NumTokens())}))
|
||||||
|
}
|
||||||
|
|
|
@ -3,7 +3,7 @@
|
||||||
set -e
|
set -e
|
||||||
pwd
|
pwd
|
||||||
GORACE="haltonerror=1"
|
GORACE="haltonerror=1"
|
||||||
go test -race ${1} -coverprofile=plugins.cover.out -v ./app/plugins
|
go test -coverprofile=plugins.cover.out -v ./app/plugins
|
||||||
go test -race ${1} -coverprofile=model.cover.out -v ./model
|
go test -race ${1} -coverprofile=model.cover.out -v ./model
|
||||||
go test -race ${1} -coverprofile=event.cover.out -v ./event
|
go test -race ${1} -coverprofile=event.cover.out -v ./event
|
||||||
go test -race ${1} -coverprofile=storage.v1.cover.out -v ./storage/v1
|
go test -race ${1} -coverprofile=storage.v1.cover.out -v ./storage/v1
|
||||||
|
|
Loading…
Reference in New Issue