forked from cwtch.im/cwtch
Remove RetryPeer event, Poke token count on new group
This commit is contained in:
parent
7053f4a31b
commit
3b822393cd
|
@ -170,7 +170,6 @@ func (cr *contactRetry) run() {
|
|||
cr.bus.Subscribe(event.PeerStateChange, cr.queue)
|
||||
cr.bus.Subscribe(event.ACNStatus, cr.queue)
|
||||
cr.bus.Subscribe(event.ServerStateChange, cr.queue)
|
||||
cr.bus.Subscribe(event.PeerRequest, cr.queue)
|
||||
cr.bus.Subscribe(event.QueuePeerRequest, cr.queue)
|
||||
cr.bus.Subscribe(event.QueueJoinServer, cr.queue)
|
||||
cr.bus.Subscribe(event.ProtocolEngineShutdown, cr.queue)
|
||||
|
@ -341,7 +340,7 @@ func (cr *contactRetry) requeueReady() {
|
|||
|
||||
func (cr *contactRetry) publishConnectionRequest(contact *contact) {
|
||||
if contact.ctype == peerConn {
|
||||
cr.bus.Publish(event.NewEvent(event.RetryPeerRequest, map[event.Field]string{event.RemotePeer: contact.id}))
|
||||
cr.bus.Publish(event.NewEvent(event.PeerRequest, map[event.Field]string{event.RemotePeer: contact.id}))
|
||||
}
|
||||
if contact.ctype == serverConn {
|
||||
cr.bus.Publish(event.NewEvent(event.RetryServerRequest, map[event.Field]string{event.GroupServer: contact.id}))
|
||||
|
@ -373,7 +372,10 @@ func (cr *contactRetry) handleEvent(id string, state connections.ConnectionState
|
|||
}
|
||||
|
||||
if _, exists := cr.connections.Load(id); !exists {
|
||||
cr.addConnection(id, state, ctype, event.CwtchEpoch)
|
||||
// We have an event for something we don't know about...
|
||||
// The only reason this should happen is if a *new* Peer/Server connection has changed.
|
||||
// Let's set the timeout to Now() to indicate that this is a fresh connection, and so should likely be prioritized.
|
||||
cr.addConnection(id, state, ctype, time.Now())
|
||||
return
|
||||
}
|
||||
|
||||
|
|
|
@ -25,12 +25,6 @@ const (
|
|||
// GroupServer
|
||||
QueuePeerRequest = Type("QueuePeerRequest")
|
||||
|
||||
// RetryPeerRequest
|
||||
// Identical to PeerRequest, but allows Engine to make decisions regarding blocked peers
|
||||
// attributes:
|
||||
// RemotePeer: [eg "chpr7qm6op5vfcg2pi4vllco3h6aa7exexc4rqwnlupqhoogx2zgd6qd"
|
||||
RetryPeerRequest = Type("RetryPeerRequest")
|
||||
|
||||
// RetryServerRequest
|
||||
// Asks CwtchPeer to retry a server connection...
|
||||
// GroupServer: [eg "chpr7qm6op5vfcg2pi4vllco3h6aa7exexc4rqwnlupqhoogx2zgd6qd"
|
||||
|
|
|
@ -836,13 +836,16 @@ func (cp *cwtchPeer) StartGroup(name string, server string) (int, error) {
|
|||
cp.SetConversationAttribute(conversationID, attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.GroupServer)), group.GroupServer)
|
||||
cp.SetConversationAttribute(conversationID, attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.GroupKey)), base64.StdEncoding.EncodeToString(group.GroupKey[:]))
|
||||
cp.SetConversationAttribute(conversationID, attr.LocalScope.ConstructScopedZonedPath(attr.ProfileZone.ConstructZonedPath(constants.Name)), name)
|
||||
|
||||
cp.eventBus.Publish(event.NewEvent(event.GroupCreated, map[event.Field]string{
|
||||
event.ConversationID: strconv.Itoa(conversationID),
|
||||
event.GroupID: group.GroupID,
|
||||
event.GroupServer: group.GroupServer,
|
||||
event.GroupName: name,
|
||||
}))
|
||||
// Trigger an Antispam payment. We need to do this for two reasons
|
||||
// 1. This server is new and we don't have any antispam tokens yet
|
||||
// 2. This group is new and needs it's count refreshed
|
||||
cp.MakeAntispamPayment(server)
|
||||
return conversationID, nil
|
||||
}
|
||||
log.Errorf("error creating group: %v", err)
|
||||
|
@ -956,7 +959,7 @@ func (cp *cwtchPeer) PeerWithOnion(onion string) {
|
|||
if err == nil {
|
||||
lastSeen = cp.GetConversationLastSeenTime(ci.ID)
|
||||
}
|
||||
cp.eventBus.Publish(event.NewEvent(event.PeerRequest, map[event.Field]string{event.RemotePeer: onion, event.LastSeen: lastSeen.Format(time.RFC3339Nano)}))
|
||||
cp.eventBus.Publish(event.NewEvent(event.QueuePeerRequest, map[event.Field]string{event.RemotePeer: onion, event.LastSeen: lastSeen.Format(time.RFC3339Nano)}))
|
||||
}
|
||||
|
||||
// QueuePeeringWithOnion sends the request to peer with an onion directly to the contact retry queue; this is a mechanism to not flood tor with circuit requests
|
||||
|
|
|
@ -110,7 +110,6 @@ func NewProtocolEngine(identity primitives.Identity, privateKey ed25519.PrivateK
|
|||
engine.eventManager.Subscribe(event.ProtocolEngineStartListen, engine.queue)
|
||||
engine.eventManager.Subscribe(event.ProtocolEngineShutdown, engine.queue)
|
||||
engine.eventManager.Subscribe(event.PeerRequest, engine.queue)
|
||||
engine.eventManager.Subscribe(event.RetryPeerRequest, engine.queue)
|
||||
engine.eventManager.Subscribe(event.InvitePeerToGroup, engine.queue)
|
||||
engine.eventManager.Subscribe(event.JoinServer, engine.queue)
|
||||
engine.eventManager.Subscribe(event.LeaveServer, engine.queue)
|
||||
|
@ -163,13 +162,6 @@ func (e *engine) eventHandler() {
|
|||
if torProvider.IsValidHostname(ev.Data[event.RemotePeer]) {
|
||||
go e.peerWithOnion(ev.Data[event.RemotePeer])
|
||||
}
|
||||
case event.RetryPeerRequest:
|
||||
// This event allows engine to treat (automated) retry peering requests differently to user-specified
|
||||
// peer events
|
||||
if torProvider.IsValidHostname(ev.Data[event.RemotePeer]) {
|
||||
log.Debugf("Retrying Peer Request: %v", ev.Data[event.RemotePeer])
|
||||
go e.peerWithOnion(ev.Data[event.RemotePeer])
|
||||
}
|
||||
case event.InvitePeerToGroup:
|
||||
err := e.sendPeerMessage(ev.Data[event.RemotePeer], pmodel.PeerMessage{ID: ev.EventID, Context: event.ContextInvite, Data: []byte(ev.Data[event.GroupInvite])})
|
||||
if err != nil {
|
||||
|
@ -397,6 +389,10 @@ func (e *engine) makeAntispamPayment(onion string) {
|
|||
return
|
||||
}
|
||||
|
||||
// Before doing anything, send and event with the current number of token
|
||||
// This may unblock downstream processes who don't have an accurate token count
|
||||
e.PokeTokenCount(onion)
|
||||
|
||||
conn, err := ephemeralService.service.GetConnection(onion)
|
||||
if err == nil {
|
||||
tokenApp, ok := (conn.App()).(*TokenBoardClient)
|
||||
|
|
|
@ -51,3 +51,9 @@ func (e *engine) FetchToken(tokenService string) (*privacypass.Token, int, error
|
|||
e.eventManager.Publish(event.NewEvent(event.TokenManagerInfo, map[event.Field]string{event.ServerTokenOnion: tokenService, event.ServerTokenCount: strconv.Itoa(numTokens)}))
|
||||
return token, numTokens, err
|
||||
}
|
||||
|
||||
func (e *engine) PokeTokenCount(tokenService string) {
|
||||
tokenManagerPointer, _ := e.tokenManagers.LoadOrStore(tokenService, NewTokenManager())
|
||||
tokenManager := tokenManagerPointer.(*TokenManager)
|
||||
e.eventManager.Publish(event.NewEvent(event.TokenManagerInfo, map[event.Field]string{event.ServerTokenOnion: tokenService, event.ServerTokenCount: strconv.Itoa(tokenManager.NumTokens())}))
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue