Remove all engine state from peerHandle
This commit is contained in:
parent
8998025305
commit
62f0cfaad3
|
@ -256,23 +256,6 @@ func (e *engine) createPeerTemplate() *PeerApp {
|
||||||
peerAppTemplate.OnAuth = e.ignoreOnShutdown(e.peerAuthed)
|
peerAppTemplate.OnAuth = e.ignoreOnShutdown(e.peerAuthed)
|
||||||
peerAppTemplate.OnConnecting = e.ignoreOnShutdown(e.peerConnecting)
|
peerAppTemplate.OnConnecting = e.ignoreOnShutdown(e.peerConnecting)
|
||||||
peerAppTemplate.OnClose = e.ignoreOnShutdown(e.peerDisconnected)
|
peerAppTemplate.OnClose = e.ignoreOnShutdown(e.peerDisconnected)
|
||||||
peerAppTemplate.RetValHandler = e.handlePeerRetVal
|
|
||||||
|
|
||||||
// peerApp should not store state... guarded by a hostname
|
|
||||||
peerAppTemplate.RequestMapLookup = func(hostname, key string) (string, bool) {
|
|
||||||
val, ok := e.getValRequests.Load(hostname + key)
|
|
||||||
if ok {
|
|
||||||
valStr := val.(string)
|
|
||||||
return valStr, true
|
|
||||||
}
|
|
||||||
return "", false
|
|
||||||
}
|
|
||||||
peerAppTemplate.RequestMapStore = func(hostname, key string, value string) {
|
|
||||||
e.getValRequests.Store(hostname + key, value)
|
|
||||||
}
|
|
||||||
peerAppTemplate.RequestMapDelete = func(hostname, key string) {
|
|
||||||
e.getValRequests.Delete(hostname + key)
|
|
||||||
}
|
|
||||||
return peerAppTemplate
|
return peerAppTemplate
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -466,6 +449,8 @@ func (e *engine) sendGetValToPeer(eventID, onion, scope, path string) error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
e.getValRequests.Store(onion + eventID, message)
|
||||||
return e.sendPeerMessage(onion, model3.PeerMessage{ID: eventID, Context: event.ContextGetVal, Data: message})
|
return e.sendPeerMessage(onion, model3.PeerMessage{ID: eventID, Context: event.ContextGetVal, Data: message})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -530,9 +515,22 @@ func (e *engine) sendMessageToGroup(groupID string, server string, ct []byte, si
|
||||||
e.eventManager.Publish(event.NewEvent(event.SendMessageToGroupError, map[event.Field]string{event.GroupID: groupID, event.GroupServer: server, event.Error: "server-connection-not-valid", event.Signature: base64.StdEncoding.EncodeToString(sig)}))
|
e.eventManager.Publish(event.NewEvent(event.SendMessageToGroupError, map[event.Field]string{event.GroupID: groupID, event.GroupServer: server, event.Error: "server-connection-not-valid", event.Signature: base64.StdEncoding.EncodeToString(sig)}))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO this is becoming cluttered
|
||||||
func (e *engine) handlePeerMessage(hostname string, eventID string, context string, message []byte) {
|
func (e *engine) handlePeerMessage(hostname string, eventID string, context string, message []byte) {
|
||||||
log.Debugf("New message from peer: %v %v", hostname, context)
|
log.Debugf("New message from peer: %v %v", hostname, context)
|
||||||
if context == event.ContextGetVal {
|
|
||||||
|
if context == event.ContextAck {
|
||||||
|
e.peerAck(hostname, eventID)
|
||||||
|
} else if context == event.ContextRetVal {
|
||||||
|
req, ok := e.getValRequests.Load(hostname + eventID)
|
||||||
|
if ok {
|
||||||
|
reqStr := req.(string)
|
||||||
|
e.handlePeerRetVal(hostname, []byte(reqStr), message)
|
||||||
|
e.getValRequests.Delete(hostname + eventID)
|
||||||
|
} else {
|
||||||
|
log.Errorf("could not find val request for %v %s",hostname, eventID)
|
||||||
|
}
|
||||||
|
} else if context == event.ContextGetVal {
|
||||||
var getVal peerGetVal
|
var getVal peerGetVal
|
||||||
err := json.Unmarshal(message, &getVal)
|
err := json.Unmarshal(message, &getVal)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
|
|
|
@ -1,7 +1,6 @@
|
||||||
package connections
|
package connections
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"cwtch.im/cwtch/event"
|
|
||||||
model2 "cwtch.im/cwtch/protocol/model"
|
model2 "cwtch.im/cwtch/protocol/model"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"git.openprivacy.ca/cwtch.im/tapir"
|
"git.openprivacy.ca/cwtch.im/tapir"
|
||||||
|
@ -16,18 +15,12 @@ type PeerApp struct {
|
||||||
applications.AuthApp
|
applications.AuthApp
|
||||||
connection tapir.Connection
|
connection tapir.Connection
|
||||||
MessageHandler func(string, string, string, []byte)
|
MessageHandler func(string, string, string, []byte)
|
||||||
RetValHandler func(string, []byte, []byte)
|
|
||||||
IsBlocked func(string) bool
|
IsBlocked func(string) bool
|
||||||
IsAllowed func(string) bool
|
IsAllowed func(string) bool
|
||||||
OnAcknowledgement func(string, string)
|
OnAcknowledgement func(string, string)
|
||||||
OnAuth func(string)
|
OnAuth func(string)
|
||||||
OnClose func(string)
|
OnClose func(string)
|
||||||
OnConnecting func(string)
|
OnConnecting func(string)
|
||||||
|
|
||||||
// Allow Peer App to look up Values associated with a particular connection...
|
|
||||||
RequestMapStore func(string, string, string)
|
|
||||||
RequestMapLookup func(string, string) (string, bool)
|
|
||||||
RequestMapDelete func(string, string)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type peerGetVal struct {
|
type peerGetVal struct {
|
||||||
|
@ -49,10 +42,6 @@ func (pa *PeerApp) NewInstance() tapir.Application {
|
||||||
newApp.OnAuth = pa.OnAuth
|
newApp.OnAuth = pa.OnAuth
|
||||||
newApp.OnClose = pa.OnClose
|
newApp.OnClose = pa.OnClose
|
||||||
newApp.OnConnecting = pa.OnConnecting
|
newApp.OnConnecting = pa.OnConnecting
|
||||||
newApp.RetValHandler = pa.RetValHandler
|
|
||||||
newApp.RequestMapStore = pa.RequestMapStore
|
|
||||||
newApp.RequestMapDelete = pa.RequestMapDelete
|
|
||||||
newApp.RequestMapLookup = pa.RequestMapLookup
|
|
||||||
return newApp
|
return newApp
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -90,22 +79,9 @@ func (pa *PeerApp) listen() {
|
||||||
var peerMessage model2.PeerMessage
|
var peerMessage model2.PeerMessage
|
||||||
err := json.Unmarshal(message, &peerMessage)
|
err := json.Unmarshal(message, &peerMessage)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
switch peerMessage.Context {
|
|
||||||
case event.ContextAck:
|
|
||||||
pa.OnAcknowledgement(pa.connection.Hostname(), peerMessage.ID)
|
|
||||||
case event.ContextRetVal:
|
|
||||||
req, ok := pa.RequestMapLookup(pa.connection.Hostname(), peerMessage.ID)
|
|
||||||
if ok {
|
|
||||||
pa.RetValHandler(pa.connection.Hostname(), []byte(req), peerMessage.Data)
|
|
||||||
pa.RequestMapDelete(pa.connection.Hostname(), peerMessage.ID)
|
|
||||||
} else {
|
|
||||||
log.Errorf("could not find val request for %v %s", peerMessage.ID, peerMessage.Data)
|
|
||||||
}
|
|
||||||
default:
|
|
||||||
if pa.IsAllowed(pa.connection.Hostname()) {
|
if pa.IsAllowed(pa.connection.Hostname()) {
|
||||||
pa.MessageHandler(pa.connection.Hostname(), peerMessage.ID, peerMessage.Context, peerMessage.Data)
|
pa.MessageHandler(pa.connection.Hostname(), peerMessage.ID, peerMessage.Context, peerMessage.Data)
|
||||||
}
|
}
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
log.Errorf("Error unmarshalling PeerMessage package: %x %v", message, err)
|
log.Errorf("Error unmarshalling PeerMessage package: %x %v", message, err)
|
||||||
}
|
}
|
||||||
|
@ -115,9 +91,6 @@ func (pa *PeerApp) listen() {
|
||||||
// SendMessage sends the peer a preformatted message
|
// SendMessage sends the peer a preformatted message
|
||||||
// NOTE: This is a stub, we will likely want to extend this to better reflect the desired protocol
|
// NOTE: This is a stub, we will likely want to extend this to better reflect the desired protocol
|
||||||
func (pa *PeerApp) SendMessage(message model2.PeerMessage) error {
|
func (pa *PeerApp) SendMessage(message model2.PeerMessage) error {
|
||||||
if message.Context == event.ContextGetVal {
|
|
||||||
pa.RequestMapStore(pa.connection.Hostname(), message.ID, string(message.Data))
|
|
||||||
}
|
|
||||||
serialized, err := json.Marshal(message)
|
serialized, err := json.Marshal(message)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
pa.connection.Send(serialized)
|
pa.connection.Send(serialized)
|
||||||
|
|
Loading…
Reference in New Issue