Fix Issues with Antispam triggering / Add explicit timeout calls for group servers / token aquisition and optimistic closing for peers
This commit is contained in:
parent
f52919271c
commit
d455eb6477
|
@ -241,7 +241,7 @@ func (app *application) installProfile(profile peer.CwtchPeer) bool {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
/// ActivePeerEngine creates a peer engine for use with an ACN, should be called once the underlying ACN is online
|
// / ActivePeerEngine creates a peer engine for use with an ACN, should be called once the underlying ACN is online
|
||||||
func (app *application) ActivePeerEngine(onion string, doListen, doPeers, doServers bool) {
|
func (app *application) ActivePeerEngine(onion string, doListen, doPeers, doServers bool) {
|
||||||
profile := app.GetPeer(onion)
|
profile := app.GetPeer(onion)
|
||||||
if profile != nil {
|
if profile != nil {
|
||||||
|
@ -258,7 +258,7 @@ func (app *application) ActivePeerEngine(onion string, doListen, doPeers, doServ
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// DeactivatePeerEngine shutsdown and cleans up a peer engine, should be called when an underlying ACN goes offline
|
// / DeactivatePeerEngine shutsdown and cleans up a peer engine, should be called when an underlying ACN goes offline
|
||||||
func (app *application) DeactivatePeerEngine(onion string) {
|
func (app *application) DeactivatePeerEngine(onion string) {
|
||||||
if engine, exists := app.engines[onion]; exists {
|
if engine, exists := app.engines[onion]; exists {
|
||||||
engine.Shutdown()
|
engine.Shutdown()
|
||||||
|
@ -316,7 +316,7 @@ func (app *application) ShutdownPeer(onion string) {
|
||||||
app.shutdownPeer(onion)
|
app.shutdownPeer(onion)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// shutdownPeer mutex unlocked helper shutdown peer
|
// / shutdownPeer mutex unlocked helper shutdown peer
|
||||||
func (app *application) shutdownPeer(onion string) {
|
func (app *application) shutdownPeer(onion string) {
|
||||||
app.eventBuses[onion].Publish(event.NewEventList(event.ShutdownPeer, event.Identity, onion))
|
app.eventBuses[onion].Publish(event.NewEventList(event.ShutdownPeer, event.Identity, onion))
|
||||||
app.eventBuses[onion].Shutdown()
|
app.eventBuses[onion].Shutdown()
|
||||||
|
|
|
@ -0,0 +1,43 @@
|
||||||
|
package plugins
|
||||||
|
|
||||||
|
import (
|
||||||
|
"cwtch.im/cwtch/event"
|
||||||
|
"git.openprivacy.ca/openprivacy/log"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
const antispamTickTime = 30 * time.Second
|
||||||
|
|
||||||
|
type antispam struct {
|
||||||
|
bus event.Manager
|
||||||
|
queue event.Queue
|
||||||
|
breakChan chan bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *antispam) Start() {
|
||||||
|
go a.run()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a antispam) Shutdown() {
|
||||||
|
a.breakChan <- true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *antispam) run() {
|
||||||
|
log.Infof("running antispam trigger plugin")
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-time.After(antispamTickTime):
|
||||||
|
// no fuss, just trigger the check. Downstream will filter out superfluous actions
|
||||||
|
a.bus.Publish(event.NewEvent(event.TriggerAntispamCheck, map[event.Field]string{}))
|
||||||
|
continue
|
||||||
|
case <-a.breakChan:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewAntiSpam returns a Plugin that when started will trigger antispam payments on a regular interval
|
||||||
|
func NewAntiSpam(bus event.Manager) Plugin {
|
||||||
|
cr := &antispam{bus: bus, queue: event.NewQueue(), breakChan: make(chan bool, 1)}
|
||||||
|
return cr
|
||||||
|
}
|
|
@ -13,6 +13,7 @@ type PluginID int
|
||||||
const (
|
const (
|
||||||
CONNECTIONRETRY PluginID = iota
|
CONNECTIONRETRY PluginID = iota
|
||||||
NETWORKCHECK
|
NETWORKCHECK
|
||||||
|
ANTISPAM
|
||||||
)
|
)
|
||||||
|
|
||||||
// Plugin is the interface for a plugin
|
// Plugin is the interface for a plugin
|
||||||
|
@ -28,6 +29,8 @@ func Get(id PluginID, bus event.Manager, acn connectivity.ACN, onion string) (Pl
|
||||||
return NewConnectionRetry(bus, onion), nil
|
return NewConnectionRetry(bus, onion), nil
|
||||||
case NETWORKCHECK:
|
case NETWORKCHECK:
|
||||||
return NewNetworkCheck(onion, bus, acn), nil
|
return NewNetworkCheck(onion, bus, acn), nil
|
||||||
|
case ANTISPAM:
|
||||||
|
return NewAntiSpam(bus), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil, fmt.Errorf("plugin not defined %v", id)
|
return nil, fmt.Errorf("plugin not defined %v", id)
|
||||||
|
|
|
@ -201,7 +201,9 @@ const (
|
||||||
StartingStorageMiragtion = Type("StartingStorageMigration")
|
StartingStorageMiragtion = Type("StartingStorageMigration")
|
||||||
DoneStorageMigration = Type("DoneStorageMigration")
|
DoneStorageMigration = Type("DoneStorageMigration")
|
||||||
|
|
||||||
TokenManagerInfo = Type("TokenManagerInfo")
|
TokenManagerInfo = Type("TokenManagerInfo")
|
||||||
|
TriggerAntispamCheck = Type("TriggerAntispamCheck")
|
||||||
|
MakeAntispamPayment = Type("MakeAntispamPayment")
|
||||||
)
|
)
|
||||||
|
|
||||||
// Field defines common event attributes
|
// Field defines common event attributes
|
||||||
|
|
|
@ -14,6 +14,7 @@ import (
|
||||||
"git.openprivacy.ca/openprivacy/connectivity/tor"
|
"git.openprivacy.ca/openprivacy/connectivity/tor"
|
||||||
"golang.org/x/crypto/ed25519"
|
"golang.org/x/crypto/ed25519"
|
||||||
"math/bits"
|
"math/bits"
|
||||||
|
rand2 "math/rand"
|
||||||
"os"
|
"os"
|
||||||
path "path/filepath"
|
path "path/filepath"
|
||||||
"runtime"
|
"runtime"
|
||||||
|
@ -37,7 +38,7 @@ var autoHandleableEvents = map[event.Type]bool{event.EncryptedGroupMessage: true
|
||||||
event.ServerStateChange: true, event.NewGroupInvite: true, event.NewMessageFromPeerEngine: true,
|
event.ServerStateChange: true, event.NewGroupInvite: true, event.NewMessageFromPeerEngine: true,
|
||||||
event.PeerAcknowledgement: true, event.PeerError: true, event.SendMessageToPeerError: true, event.SendMessageToGroupError: true,
|
event.PeerAcknowledgement: true, event.PeerError: true, event.SendMessageToPeerError: true, event.SendMessageToGroupError: true,
|
||||||
event.NewGetValMessageFromPeer: true, event.NewRetValMessageFromPeer: true, event.ProtocolEngineStopped: true, event.RetryServerRequest: true,
|
event.NewGetValMessageFromPeer: true, event.NewRetValMessageFromPeer: true, event.ProtocolEngineStopped: true, event.RetryServerRequest: true,
|
||||||
event.ManifestSizeReceived: true, event.ManifestReceived: true, event.FileDownloaded: true}
|
event.ManifestSizeReceived: true, event.ManifestReceived: true, event.FileDownloaded: true, event.TriggerAntispamCheck: true}
|
||||||
|
|
||||||
// DefaultEventsToHandle specifies which events will be subscribed to
|
// DefaultEventsToHandle specifies which events will be subscribed to
|
||||||
//
|
//
|
||||||
|
@ -58,6 +59,7 @@ var DefaultEventsToHandle = []event.Type{
|
||||||
event.NewRetValMessageFromPeer,
|
event.NewRetValMessageFromPeer,
|
||||||
event.ManifestReceived,
|
event.ManifestReceived,
|
||||||
event.FileDownloaded,
|
event.FileDownloaded,
|
||||||
|
event.TriggerAntispamCheck,
|
||||||
}
|
}
|
||||||
|
|
||||||
// cwtchPeer manages incoming and outgoing connections and all processing for a Cwtch cwtchPeer
|
// cwtchPeer manages incoming and outgoing connections and all processing for a Cwtch cwtchPeer
|
||||||
|
@ -783,7 +785,13 @@ func (cp *cwtchPeer) GetPeerState(handle string) connections.ConnectionState {
|
||||||
// PeerWithOnion initiates a request to the Protocol Engine to set up Cwtch Session with a given tor v3 onion
|
// PeerWithOnion initiates a request to the Protocol Engine to set up Cwtch Session with a given tor v3 onion
|
||||||
// address.
|
// address.
|
||||||
func (cp *cwtchPeer) PeerWithOnion(onion string) {
|
func (cp *cwtchPeer) PeerWithOnion(onion string) {
|
||||||
cp.eventBus.Publish(event.NewEvent(event.PeerRequest, map[event.Field]string{event.RemotePeer: onion}))
|
go func() {
|
||||||
|
// wait a random number of seconds before triggering
|
||||||
|
// this cuts down on contention in the event
|
||||||
|
randWait := time.Duration(rand2.Int() % 60)
|
||||||
|
time.Sleep(randWait * time.Second)
|
||||||
|
cp.eventBus.Publish(event.NewEvent(event.PeerRequest, map[event.Field]string{event.RemotePeer: onion}))
|
||||||
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
// SendInviteToConversation kicks off the invite process
|
// SendInviteToConversation kicks off the invite process
|
||||||
|
@ -918,6 +926,14 @@ func (cp *cwtchPeer) JoinServer(onion string) error {
|
||||||
return errors.New("no keys found for server connection")
|
return errors.New("no keys found for server connection")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// MakeAntispamPayment allows a peer to retrigger antispam, important if the initial connection somehow fails...
|
||||||
|
// TODO in the future we might want to expose this in CwtchPeer interface
|
||||||
|
// Additionally we may want to add extra checks here to deduplicate groups from tokenservers to cut down
|
||||||
|
// on the number of events (right now it should be minimal)
|
||||||
|
func (cp *cwtchPeer) MakeAntispamPayment(server string) {
|
||||||
|
cp.eventBus.Publish(event.NewEvent(event.MakeAntispamPayment, map[event.Field]string{event.GroupServer: server}))
|
||||||
|
}
|
||||||
|
|
||||||
// ResyncServer completely tears down and resyncs a new server connection with the given handle
|
// ResyncServer completely tears down and resyncs a new server connection with the given handle
|
||||||
func (cp *cwtchPeer) ResyncServer(handle string) error {
|
func (cp *cwtchPeer) ResyncServer(handle string) error {
|
||||||
ci, err := cp.FetchConversationInfo(handle)
|
ci, err := cp.FetchConversationInfo(handle)
|
||||||
|
@ -1305,6 +1321,13 @@ func (cp *cwtchPeer) eventHandler() {
|
||||||
cp.SetConversationAttribute(serverInfo.ID, attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.SyncMostRecentMessageTime)), mostRecentTime.Format(time.RFC3339Nano))
|
cp.SetConversationAttribute(serverInfo.ID, attr.LocalScope.ConstructScopedZonedPath(attr.LegacyGroupZone.ConstructZonedPath(constants.SyncMostRecentMessageTime)), mostRecentTime.Format(time.RFC3339Nano))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
case event.TriggerAntispamCheck:
|
||||||
|
conversations, _ := cp.FetchConversations()
|
||||||
|
for _, conversation := range conversations {
|
||||||
|
if conversation.IsServer() {
|
||||||
|
cp.MakeAntispamPayment(conversation.Handle)
|
||||||
|
}
|
||||||
|
}
|
||||||
default:
|
default:
|
||||||
if ev.EventType != "" {
|
if ev.EventType != "" {
|
||||||
log.Errorf("peer event handler received an event it was not subscribed for: %v", ev.EventType)
|
log.Errorf("peer event handler received an event it was not subscribed for: %v", ev.EventType)
|
||||||
|
|
|
@ -116,6 +116,9 @@ func NewProtocolEngine(identity primitives.Identity, privateKey ed25519.PrivateK
|
||||||
engine.eventManager.Subscribe(event.ManifestSizeReceived, engine.queue)
|
engine.eventManager.Subscribe(event.ManifestSizeReceived, engine.queue)
|
||||||
engine.eventManager.Subscribe(event.ManifestSaved, engine.queue)
|
engine.eventManager.Subscribe(event.ManifestSaved, engine.queue)
|
||||||
|
|
||||||
|
// Token Server
|
||||||
|
engine.eventManager.Subscribe(event.MakeAntispamPayment, engine.queue)
|
||||||
|
|
||||||
for peer, authorization := range peerAuthorizations {
|
for peer, authorization := range peerAuthorizations {
|
||||||
engine.authorizations.Store(peer, authorization)
|
engine.authorizations.Store(peer, authorization)
|
||||||
}
|
}
|
||||||
|
@ -160,6 +163,8 @@ func (e *engine) eventHandler() {
|
||||||
signature = []byte{}
|
signature = []byte{}
|
||||||
}
|
}
|
||||||
go e.peerWithTokenServer(ev.Data[event.GroupServer], ev.Data[event.ServerTokenOnion], ev.Data[event.ServerTokenY], signature)
|
go e.peerWithTokenServer(ev.Data[event.GroupServer], ev.Data[event.ServerTokenOnion], ev.Data[event.ServerTokenY], signature)
|
||||||
|
case event.MakeAntispamPayment:
|
||||||
|
go e.makeAntispamPayment(ev.Data[event.GroupServer])
|
||||||
case event.LeaveServer:
|
case event.LeaveServer:
|
||||||
e.leaveServer(ev.Data[event.GroupServer])
|
e.leaveServer(ev.Data[event.GroupServer])
|
||||||
case event.DeleteContact:
|
case event.DeleteContact:
|
||||||
|
@ -335,6 +340,31 @@ func (e *engine) peerWithOnion(onion string) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (e *engine) makeAntispamPayment(onion string) {
|
||||||
|
log.Debugf("making antispam payment")
|
||||||
|
e.ephemeralServicesLock.Lock()
|
||||||
|
ephemeralService, ok := e.ephemeralServices[onion]
|
||||||
|
e.ephemeralServicesLock.Unlock()
|
||||||
|
|
||||||
|
if ephemeralService == nil || !ok {
|
||||||
|
log.Debugf("could not find associated group for antispam payment")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
conn, err := ephemeralService.service.GetConnection(onion)
|
||||||
|
if err == nil {
|
||||||
|
tokenApp, ok := (conn.App()).(*TokenBoardClient)
|
||||||
|
if ok {
|
||||||
|
tokenManagerPointer, _ := e.tokenManagers.LoadOrStore(tokenApp.tokenServiceOnion, new(TokenManager))
|
||||||
|
tokenManager := tokenManagerPointer.(*TokenManager)
|
||||||
|
log.Debugf("checking antispam tokens %v", tokenManager.NumTokens())
|
||||||
|
if tokenManager.NumTokens() < 5 {
|
||||||
|
go tokenApp.MakePayment()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// peerWithTokenServer is the entry point for cwtchPeer - server relationships
|
// peerWithTokenServer is the entry point for cwtchPeer - server relationships
|
||||||
// needs to be run in a goroutine as will block on Open.
|
// needs to be run in a goroutine as will block on Open.
|
||||||
func (e *engine) peerWithTokenServer(onion string, tokenServerOnion string, tokenServerY string, lastKnownSignature []byte) {
|
func (e *engine) peerWithTokenServer(onion string, tokenServerOnion string, tokenServerY string, lastKnownSignature []byte) {
|
||||||
|
|
|
@ -86,6 +86,8 @@ func (pa *PeerApp) Init(connection tapir.Connection) {
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// The auth protocol wasn't completed, we can safely shutdown the connection
|
// The auth protocol wasn't completed, we can safely shutdown the connection
|
||||||
|
// send an onclose here because we *may* have triggered this and we want to retry later...
|
||||||
|
pa.OnClose(connection.Hostname())
|
||||||
connection.Close()
|
connection.Close()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,8 +2,8 @@ package connections
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"cwtch.im/cwtch/protocol/groups"
|
"cwtch.im/cwtch/protocol/groups"
|
||||||
|
"cwtch.im/cwtch/utils"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
|
||||||
"git.openprivacy.ca/cwtch.im/tapir"
|
"git.openprivacy.ca/cwtch.im/tapir"
|
||||||
"git.openprivacy.ca/cwtch.im/tapir/applications"
|
"git.openprivacy.ca/cwtch.im/tapir/applications"
|
||||||
"git.openprivacy.ca/cwtch.im/tapir/networks/tor"
|
"git.openprivacy.ca/cwtch.im/tapir/networks/tor"
|
||||||
|
@ -191,28 +191,30 @@ func (ta *TokenBoardClient) MakePayment() error {
|
||||||
connected, err := client.Connect(ta.tokenServiceOnion, powTokenApp)
|
connected, err := client.Connect(ta.tokenServiceOnion, powTokenApp)
|
||||||
if connected && err == nil {
|
if connected && err == nil {
|
||||||
log.Debugf("Waiting for successful Token Acquisition...")
|
log.Debugf("Waiting for successful Token Acquisition...")
|
||||||
conn, err := client.WaitForCapabilityOrClose(ta.tokenServiceOnion, applications.HasTokensCapability)
|
tp := utils.TimeoutPolicy(time.Second * 30)
|
||||||
if err == nil {
|
err := tp.ExecuteAction(func() error {
|
||||||
powtapp, ok := conn.App().(*applications.TokenApplication)
|
conn, err := client.WaitForCapabilityOrClose(ta.tokenServiceOnion, applications.HasTokensCapability)
|
||||||
if ok {
|
if err == nil {
|
||||||
log.Debugf("Updating Tokens")
|
powtapp, ok := conn.App().(*applications.TokenApplication)
|
||||||
ta.tokenBoardHandler.NewTokenHandler(ta.tokenServiceOnion, powtapp.Tokens)
|
if ok {
|
||||||
log.Debugf("Transcript: %v", powtapp.Transcript().OutputTranscriptToAudit())
|
log.Debugf("Updating Tokens")
|
||||||
conn.Close()
|
ta.tokenBoardHandler.NewTokenHandler(ta.tokenServiceOnion, powtapp.Tokens)
|
||||||
|
log.Debugf("Transcript: %v", powtapp.Transcript().OutputTranscriptToAudit())
|
||||||
|
conn.Close()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
log.Errorf("invalid cast of powapp. this should never happen %v %v", powtapp, reflect.TypeOf(conn.App()))
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
log.Errorf("invalid cast of powapp. this should never happen %v %v", powtapp, reflect.TypeOf(conn.App()))
|
return nil
|
||||||
return errors.New("invalid cast of powapp. this should never happen")
|
})
|
||||||
|
|
||||||
|
// we timed out
|
||||||
|
if err != nil {
|
||||||
|
ta.connection.Close()
|
||||||
}
|
}
|
||||||
log.Debugf("could not connect to payment server..trying again: %v", err)
|
|
||||||
return ta.MakePayment()
|
|
||||||
} else if connected && err != nil {
|
|
||||||
log.Debugf("inexplicable error: %v", err)
|
|
||||||
}
|
}
|
||||||
log.Debugf("failed to make a connection. trying again...")
|
return err
|
||||||
// it doesn't actually take that long to make a payment, so waiting a small amount of time should suffice
|
|
||||||
time.Sleep(time.Second)
|
|
||||||
return ta.MakePayment()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// NextToken retrieves the next token
|
// NextToken retrieves the next token
|
||||||
|
|
Loading…
Reference in New Issue