diff --git a/app/app.go b/app/app.go index ecef3ef..36950d9 100644 --- a/app/app.go +++ b/app/app.go @@ -160,6 +160,8 @@ func (app *application) ListProfiles() []string { // GetPeer returns a cwtchPeer for a given onion address func (app *application) GetPeer(onion string) peer.CwtchPeer { + app.appmutex.Lock() + defer app.appmutex.Unlock() if profile, ok := app.peers[onion]; ok { return profile } diff --git a/app/plugins/contactRetry.go b/app/plugins/contactRetry.go index 6a4f345..cb36c20 100644 --- a/app/plugins/contactRetry.go +++ b/app/plugins/contactRetry.go @@ -170,7 +170,6 @@ func (cr *contactRetry) run() { cr.bus.Subscribe(event.PeerStateChange, cr.queue) cr.bus.Subscribe(event.ACNStatus, cr.queue) cr.bus.Subscribe(event.ServerStateChange, cr.queue) - cr.bus.Subscribe(event.PeerRequest, cr.queue) cr.bus.Subscribe(event.QueuePeerRequest, cr.queue) cr.bus.Subscribe(event.QueueJoinServer, cr.queue) cr.bus.Subscribe(event.ProtocolEngineShutdown, cr.queue) @@ -341,7 +340,7 @@ func (cr *contactRetry) requeueReady() { func (cr *contactRetry) publishConnectionRequest(contact *contact) { if contact.ctype == peerConn { - cr.bus.Publish(event.NewEvent(event.RetryPeerRequest, map[event.Field]string{event.RemotePeer: contact.id})) + cr.bus.Publish(event.NewEvent(event.PeerRequest, map[event.Field]string{event.RemotePeer: contact.id})) } if contact.ctype == serverConn { cr.bus.Publish(event.NewEvent(event.RetryServerRequest, map[event.Field]string{event.GroupServer: contact.id})) @@ -361,6 +360,13 @@ func (cr *contactRetry) addConnection(id string, state connections.ConnectionSta cr.connections.Store(id, p) cr.connCount += 1 return + } else { + // we have rerequested this connnection. Force set the queued parameter to true. + p, _ := cr.connections.Load(id) + if !p.(*contact).queued { + p.(*contact).queued = true + cr.connCount += 1 + } } } @@ -373,7 +379,10 @@ func (cr *contactRetry) handleEvent(id string, state connections.ConnectionState } if _, exists := cr.connections.Load(id); !exists { - cr.addConnection(id, state, ctype, event.CwtchEpoch) + // We have an event for something we don't know about... + // The only reason this should happen is if a *new* Peer/Server connection has changed. + // Let's set the timeout to Now() to indicate that this is a fresh connection, and so should likely be prioritized. + cr.addConnection(id, state, ctype, time.Now()) return } diff --git a/app/plugins/contactRetry_test.go b/app/plugins/contactRetry_test.go new file mode 100644 index 0000000..f3c446d --- /dev/null +++ b/app/plugins/contactRetry_test.go @@ -0,0 +1,73 @@ +package plugins + +import ( + "testing" + "time" + + "cwtch.im/cwtch/event" + "cwtch.im/cwtch/protocol/connections" + "git.openprivacy.ca/openprivacy/log" +) + +// TestContactRetryQueue simulates some basic connection queueing +// NOTE: This whole test is a race condition, and does flag go's detector +// We are invasively checking the internal state of the retry plugin and accessing pointers from another +// thread. +// We could build an entire thread safe monitoring functonality, but that would dramatically expand the scope of this test. +func TestContactRetryQueue(t *testing.T) { + log.SetLevel(log.LevelDebug) + bus := event.NewEventManager() + cr := NewConnectionRetry(bus, "").(*contactRetry) + cr.ACNUp = true // fake an ACN connection... + go cr.run() + + t.Logf("contact plugin up and running..sending peer connection...") + // Assert that there is a peer connection identified as "test" + bus.Publish(event.NewEvent(event.QueuePeerRequest, map[event.Field]string{event.RemotePeer: "test", event.LastSeen: "test"})) + + // Wait until the test actually exists, and is queued + // This is the worst part of this test setup. Ideally we would sleep, or some other yielding, but + // go test scheduling doesn't like that and even sleeping long periods won't cause the event thread to make + // progress... + for { + if pinf, exists := cr.connections.Load("test"); exists { + if pinf.(*contact).queued { + break + } + } + } + + pinf, _ := cr.connections.Load("test") + if pinf.(*contact).queued == false { + t.Fatalf("test connection should be queued, actually: %v", pinf.(*contact).queued) + } + + // Asset that "test" is authenticated + cr.handleEvent("test", connections.AUTHENTICATED, peerConn) + + // Assert that "test has a valid state" + pinf, _ = cr.connections.Load("test") + if pinf.(*contact).state != 3 { + t.Fatalf("test connection should be in authenticated after update, actually: %v", pinf.(*contact).state) + } + + // Publish an unrelated event to trigger the Plugin to go through a queuing cycle + // If we didn't do this we would have to wait 30 seconds for a check-in + bus.Publish(event.NewEvent(event.PeerStateChange, map[event.Field]string{event.RemotePeer: "test2", event.ConnectionState: "Disconnected"})) + time.Sleep(time.Second) + if pinf.(*contact).queued != false { + t.Fatalf("test connection should not be queued, actually: %v", pinf.(*contact).queued) + } + + // Publish a new peer request... + bus.Publish(event.NewEvent(event.QueuePeerRequest, map[event.Field]string{event.RemotePeer: "test"})) + time.Sleep(time.Second) // yield for a second so the event can catch up... + + // Peer test should be forced to queue.... + pinf, _ = cr.connections.Load("test") + if pinf.(*contact).queued != true { + t.Fatalf("test connection should be forced to queue after new queue peer request") + } + + cr.Shutdown() +} diff --git a/event/common.go b/event/common.go index 58c57cd..2095790 100644 --- a/event/common.go +++ b/event/common.go @@ -25,12 +25,6 @@ const ( // GroupServer QueuePeerRequest = Type("QueuePeerRequest") - // RetryPeerRequest - // Identical to PeerRequest, but allows Engine to make decisions regarding blocked peers - // attributes: - // RemotePeer: [eg "chpr7qm6op5vfcg2pi4vllco3h6aa7exexc4rqwnlupqhoogx2zgd6qd" - RetryPeerRequest = Type("RetryPeerRequest") - // RetryServerRequest // Asks CwtchPeer to retry a server connection... // GroupServer: [eg "chpr7qm6op5vfcg2pi4vllco3h6aa7exexc4rqwnlupqhoogx2zgd6qd" diff --git a/peer/cwtch_peer.go b/peer/cwtch_peer.go index 2d6b363..3cebec5 100644 --- a/peer/cwtch_peer.go +++ b/peer/cwtch_peer.go @@ -2,19 +2,11 @@ package peer import ( "crypto/rand" - "cwtch.im/cwtch/model/constants" - "cwtch.im/cwtch/protocol/groups" - "cwtch.im/cwtch/settings" "encoding/base64" "encoding/hex" "encoding/json" "errors" "fmt" - "git.openprivacy.ca/cwtch.im/tapir/primitives" - "git.openprivacy.ca/cwtch.im/tapir/primitives/privacypass" - "git.openprivacy.ca/openprivacy/connectivity" - "git.openprivacy.ca/openprivacy/connectivity/tor" - "golang.org/x/crypto/ed25519" "os" path "path/filepath" "sort" @@ -23,6 +15,15 @@ import ( "sync" "time" + "cwtch.im/cwtch/model/constants" + "cwtch.im/cwtch/protocol/groups" + "cwtch.im/cwtch/settings" + "git.openprivacy.ca/cwtch.im/tapir/primitives" + "git.openprivacy.ca/cwtch.im/tapir/primitives/privacypass" + "git.openprivacy.ca/openprivacy/connectivity" + "git.openprivacy.ca/openprivacy/connectivity/tor" + "golang.org/x/crypto/ed25519" + "cwtch.im/cwtch/event" "cwtch.im/cwtch/model" "cwtch.im/cwtch/model/attr" @@ -672,6 +673,7 @@ func (cp *cwtchPeer) NewContactConversation(handle string, acl model.AccessContr conversationInfo, _ := cp.storage.GetConversationByHandle(handle) if conversationInfo == nil { conversationID, err := cp.storage.NewConversation(handle, model.Attributes{event.SaveHistoryKey: event.DeleteHistoryDefault}, model.AccessControlList{handle: acl}, accepted) + cp.SetConversationAttribute(conversationID, attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.AttrLastConnectionTime)), time.Now().Format(time.RFC3339Nano)) cp.eventBus.Publish(event.NewEvent(event.ContactCreated, map[event.Field]string{event.ConversationID: strconv.Itoa(conversationID), event.RemotePeer: handle})) return conversationID, err } @@ -836,13 +838,16 @@ func (cp *cwtchPeer) StartGroup(name string, server string) (int, error) { cp.SetConversationAttribute(conversationID, attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.GroupServer)), group.GroupServer) cp.SetConversationAttribute(conversationID, attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.GroupKey)), base64.StdEncoding.EncodeToString(group.GroupKey[:])) cp.SetConversationAttribute(conversationID, attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Name)), name) - cp.eventBus.Publish(event.NewEvent(event.GroupCreated, map[event.Field]string{ event.ConversationID: strconv.Itoa(conversationID), event.GroupID: group.GroupID, event.GroupServer: group.GroupServer, event.GroupName: name, })) + // Trigger an Antispam payment. We need to do this for two reasons + // 1. This server is new and we don't have any antispam tokens yet + // 2. This group is new and needs it's count refreshed + cp.MakeAntispamPayment(server) return conversationID, nil } log.Errorf("error creating group: %v", err) @@ -948,15 +953,11 @@ func (cp *cwtchPeer) GetPeerState(handle string) connections.ConnectionState { return connections.DISCONNECTED } -// PeerWithOnion initiates a request to the Protocol Engine to set up Cwtch Session with a given tor v3 onion -// address. +// PeerWithOnion represents a request to connect immediately to a given peer. Instead +// of checking the last seed time, cwtch will treat the current time as the time of last action. func (cp *cwtchPeer) PeerWithOnion(onion string) { - lastSeen := event.CwtchEpoch - ci, err := cp.FetchConversationInfo(onion) - if err == nil { - lastSeen = cp.GetConversationLastSeenTime(ci.ID) - } - cp.eventBus.Publish(event.NewEvent(event.PeerRequest, map[event.Field]string{event.RemotePeer: onion, event.LastSeen: lastSeen.Format(time.RFC3339Nano)})) + lastSeen := time.Now() + cp.eventBus.Publish(event.NewEvent(event.QueuePeerRequest, map[event.Field]string{event.RemotePeer: onion, event.LastSeen: lastSeen.Format(time.RFC3339Nano)})) } // QueuePeeringWithOnion sends the request to peer with an onion directly to the contact retry queue; this is a mechanism to not flood tor with circuit requests diff --git a/protocol/connections/engine.go b/protocol/connections/engine.go index a264f47..20e6d63 100644 --- a/protocol/connections/engine.go +++ b/protocol/connections/engine.go @@ -110,7 +110,6 @@ func NewProtocolEngine(identity primitives.Identity, privateKey ed25519.PrivateK engine.eventManager.Subscribe(event.ProtocolEngineStartListen, engine.queue) engine.eventManager.Subscribe(event.ProtocolEngineShutdown, engine.queue) engine.eventManager.Subscribe(event.PeerRequest, engine.queue) - engine.eventManager.Subscribe(event.RetryPeerRequest, engine.queue) engine.eventManager.Subscribe(event.InvitePeerToGroup, engine.queue) engine.eventManager.Subscribe(event.JoinServer, engine.queue) engine.eventManager.Subscribe(event.LeaveServer, engine.queue) @@ -163,13 +162,6 @@ func (e *engine) eventHandler() { if torProvider.IsValidHostname(ev.Data[event.RemotePeer]) { go e.peerWithOnion(ev.Data[event.RemotePeer]) } - case event.RetryPeerRequest: - // This event allows engine to treat (automated) retry peering requests differently to user-specified - // peer events - if torProvider.IsValidHostname(ev.Data[event.RemotePeer]) { - log.Debugf("Retrying Peer Request: %v", ev.Data[event.RemotePeer]) - go e.peerWithOnion(ev.Data[event.RemotePeer]) - } case event.InvitePeerToGroup: err := e.sendPeerMessage(ev.Data[event.RemotePeer], pmodel.PeerMessage{ID: ev.EventID, Context: event.ContextInvite, Data: []byte(ev.Data[event.GroupInvite])}) if err != nil { @@ -397,6 +389,10 @@ func (e *engine) makeAntispamPayment(onion string) { return } + // Before doing anything, send and event with the current number of token + // This may unblock downstream processes who don't have an accurate token count + e.PokeTokenCount(onion) + conn, err := ephemeralService.service.GetConnection(onion) if err == nil { tokenApp, ok := (conn.App()).(*TokenBoardClient) diff --git a/protocol/connections/engine_token_handler.go b/protocol/connections/engine_token_handler.go index ec02adc..01b48c7 100644 --- a/protocol/connections/engine_token_handler.go +++ b/protocol/connections/engine_token_handler.go @@ -51,3 +51,9 @@ func (e *engine) FetchToken(tokenService string) (*privacypass.Token, int, error e.eventManager.Publish(event.NewEvent(event.TokenManagerInfo, map[event.Field]string{event.ServerTokenOnion: tokenService, event.ServerTokenCount: strconv.Itoa(numTokens)})) return token, numTokens, err } + +func (e *engine) PokeTokenCount(tokenService string) { + tokenManagerPointer, _ := e.tokenManagers.LoadOrStore(tokenService, NewTokenManager()) + tokenManager := tokenManagerPointer.(*TokenManager) + e.eventManager.Publish(event.NewEvent(event.TokenManagerInfo, map[event.Field]string{event.ServerTokenOnion: tokenService, event.ServerTokenCount: strconv.Itoa(tokenManager.NumTokens())})) +} diff --git a/testing/tests.sh b/testing/tests.sh index bfd80a0..3881ca6 100755 --- a/testing/tests.sh +++ b/testing/tests.sh @@ -3,7 +3,7 @@ set -e pwd GORACE="haltonerror=1" -go test -race ${1} -coverprofile=plugins.cover.out -v ./app/plugins +go test -coverprofile=plugins.cover.out -v ./app/plugins go test -race ${1} -coverprofile=model.cover.out -v ./model go test -race ${1} -coverprofile=event.cover.out -v ./event go test -race ${1} -coverprofile=storage.v1.cover.out -v ./storage/v1