From 3b822393cde00d6ee6bc6722a6476bde2f864f75 Mon Sep 17 00:00:00 2001 From: Sarah Jamie Lewis Date: Tue, 2 May 2023 12:26:46 -0700 Subject: [PATCH] Remove RetryPeer event, Poke token count on new group --- app/plugins/contactRetry.go | 8 +++++--- event/common.go | 6 ------ peer/cwtch_peer.go | 7 +++++-- protocol/connections/engine.go | 12 ++++-------- protocol/connections/engine_token_handler.go | 6 ++++++ 5 files changed, 20 insertions(+), 19 deletions(-) diff --git a/app/plugins/contactRetry.go b/app/plugins/contactRetry.go index 6a4f345..7b29b0d 100644 --- a/app/plugins/contactRetry.go +++ b/app/plugins/contactRetry.go @@ -170,7 +170,6 @@ func (cr *contactRetry) run() { cr.bus.Subscribe(event.PeerStateChange, cr.queue) cr.bus.Subscribe(event.ACNStatus, cr.queue) cr.bus.Subscribe(event.ServerStateChange, cr.queue) - cr.bus.Subscribe(event.PeerRequest, cr.queue) cr.bus.Subscribe(event.QueuePeerRequest, cr.queue) cr.bus.Subscribe(event.QueueJoinServer, cr.queue) cr.bus.Subscribe(event.ProtocolEngineShutdown, cr.queue) @@ -341,7 +340,7 @@ func (cr *contactRetry) requeueReady() { func (cr *contactRetry) publishConnectionRequest(contact *contact) { if contact.ctype == peerConn { - cr.bus.Publish(event.NewEvent(event.RetryPeerRequest, map[event.Field]string{event.RemotePeer: contact.id})) + cr.bus.Publish(event.NewEvent(event.PeerRequest, map[event.Field]string{event.RemotePeer: contact.id})) } if contact.ctype == serverConn { cr.bus.Publish(event.NewEvent(event.RetryServerRequest, map[event.Field]string{event.GroupServer: contact.id})) @@ -373,7 +372,10 @@ func (cr *contactRetry) handleEvent(id string, state connections.ConnectionState } if _, exists := cr.connections.Load(id); !exists { - cr.addConnection(id, state, ctype, event.CwtchEpoch) + // We have an event for something we don't know about... + // The only reason this should happen is if a *new* Peer/Server connection has changed. + // Let's set the timeout to Now() to indicate that this is a fresh connection, and so should likely be prioritized. + cr.addConnection(id, state, ctype, time.Now()) return } diff --git a/event/common.go b/event/common.go index 58c57cd..2095790 100644 --- a/event/common.go +++ b/event/common.go @@ -25,12 +25,6 @@ const ( // GroupServer QueuePeerRequest = Type("QueuePeerRequest") - // RetryPeerRequest - // Identical to PeerRequest, but allows Engine to make decisions regarding blocked peers - // attributes: - // RemotePeer: [eg "chpr7qm6op5vfcg2pi4vllco3h6aa7exexc4rqwnlupqhoogx2zgd6qd" - RetryPeerRequest = Type("RetryPeerRequest") - // RetryServerRequest // Asks CwtchPeer to retry a server connection... // GroupServer: [eg "chpr7qm6op5vfcg2pi4vllco3h6aa7exexc4rqwnlupqhoogx2zgd6qd" diff --git a/peer/cwtch_peer.go b/peer/cwtch_peer.go index 2d6b363..57c28d2 100644 --- a/peer/cwtch_peer.go +++ b/peer/cwtch_peer.go @@ -836,13 +836,16 @@ func (cp *cwtchPeer) StartGroup(name string, server string) (int, error) { cp.SetConversationAttribute(conversationID, attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.GroupServer)), group.GroupServer) cp.SetConversationAttribute(conversationID, attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.GroupKey)), base64.StdEncoding.EncodeToString(group.GroupKey[:])) cp.SetConversationAttribute(conversationID, attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Name)), name) - cp.eventBus.Publish(event.NewEvent(event.GroupCreated, map[event.Field]string{ event.ConversationID: strconv.Itoa(conversationID), event.GroupID: group.GroupID, event.GroupServer: group.GroupServer, event.GroupName: name, })) + // Trigger an Antispam payment. We need to do this for two reasons + // 1. This server is new and we don't have any antispam tokens yet + // 2. This group is new and needs it's count refreshed + cp.MakeAntispamPayment(server) return conversationID, nil } log.Errorf("error creating group: %v", err) @@ -956,7 +959,7 @@ func (cp *cwtchPeer) PeerWithOnion(onion string) { if err == nil { lastSeen = cp.GetConversationLastSeenTime(ci.ID) } - cp.eventBus.Publish(event.NewEvent(event.PeerRequest, map[event.Field]string{event.RemotePeer: onion, event.LastSeen: lastSeen.Format(time.RFC3339Nano)})) + cp.eventBus.Publish(event.NewEvent(event.QueuePeerRequest, map[event.Field]string{event.RemotePeer: onion, event.LastSeen: lastSeen.Format(time.RFC3339Nano)})) } // QueuePeeringWithOnion sends the request to peer with an onion directly to the contact retry queue; this is a mechanism to not flood tor with circuit requests diff --git a/protocol/connections/engine.go b/protocol/connections/engine.go index a264f47..20e6d63 100644 --- a/protocol/connections/engine.go +++ b/protocol/connections/engine.go @@ -110,7 +110,6 @@ func NewProtocolEngine(identity primitives.Identity, privateKey ed25519.PrivateK engine.eventManager.Subscribe(event.ProtocolEngineStartListen, engine.queue) engine.eventManager.Subscribe(event.ProtocolEngineShutdown, engine.queue) engine.eventManager.Subscribe(event.PeerRequest, engine.queue) - engine.eventManager.Subscribe(event.RetryPeerRequest, engine.queue) engine.eventManager.Subscribe(event.InvitePeerToGroup, engine.queue) engine.eventManager.Subscribe(event.JoinServer, engine.queue) engine.eventManager.Subscribe(event.LeaveServer, engine.queue) @@ -163,13 +162,6 @@ func (e *engine) eventHandler() { if torProvider.IsValidHostname(ev.Data[event.RemotePeer]) { go e.peerWithOnion(ev.Data[event.RemotePeer]) } - case event.RetryPeerRequest: - // This event allows engine to treat (automated) retry peering requests differently to user-specified - // peer events - if torProvider.IsValidHostname(ev.Data[event.RemotePeer]) { - log.Debugf("Retrying Peer Request: %v", ev.Data[event.RemotePeer]) - go e.peerWithOnion(ev.Data[event.RemotePeer]) - } case event.InvitePeerToGroup: err := e.sendPeerMessage(ev.Data[event.RemotePeer], pmodel.PeerMessage{ID: ev.EventID, Context: event.ContextInvite, Data: []byte(ev.Data[event.GroupInvite])}) if err != nil { @@ -397,6 +389,10 @@ func (e *engine) makeAntispamPayment(onion string) { return } + // Before doing anything, send and event with the current number of token + // This may unblock downstream processes who don't have an accurate token count + e.PokeTokenCount(onion) + conn, err := ephemeralService.service.GetConnection(onion) if err == nil { tokenApp, ok := (conn.App()).(*TokenBoardClient) diff --git a/protocol/connections/engine_token_handler.go b/protocol/connections/engine_token_handler.go index ec02adc..01b48c7 100644 --- a/protocol/connections/engine_token_handler.go +++ b/protocol/connections/engine_token_handler.go @@ -51,3 +51,9 @@ func (e *engine) FetchToken(tokenService string) (*privacypass.Token, int, error e.eventManager.Publish(event.NewEvent(event.TokenManagerInfo, map[event.Field]string{event.ServerTokenOnion: tokenService, event.ServerTokenCount: strconv.Itoa(numTokens)})) return token, numTokens, err } + +func (e *engine) PokeTokenCount(tokenService string) { + tokenManagerPointer, _ := e.tokenManagers.LoadOrStore(tokenService, NewTokenManager()) + tokenManager := tokenManagerPointer.(*TokenManager) + e.eventManager.Publish(event.NewEvent(event.TokenManagerInfo, map[event.Field]string{event.ServerTokenOnion: tokenService, event.ServerTokenCount: strconv.Itoa(tokenManager.NumTokens())})) +}