Move state out of peerApp
This commit is contained in:
parent
902b759e81
commit
1d3f44a79f
|
@ -42,6 +42,8 @@ type engine struct {
|
|||
// Nextgen Tapir Service
|
||||
service tapir.Service
|
||||
|
||||
getValRequests sync.Map // [string]string eventID:Data
|
||||
|
||||
// Nextgen Tapir Service
|
||||
ephemeralServices sync.Map // string(onion) => tapir.Service
|
||||
|
||||
|
@ -255,6 +257,22 @@ func (e *engine) createPeerTemplate() *PeerApp {
|
|||
peerAppTemplate.OnConnecting = e.ignoreOnShutdown(e.peerConnecting)
|
||||
peerAppTemplate.OnClose = e.ignoreOnShutdown(e.peerDisconnected)
|
||||
peerAppTemplate.RetValHandler = e.handlePeerRetVal
|
||||
|
||||
// peerApp should not store state... guarded by a hostname
|
||||
peerAppTemplate.RequestMapLookup = func(hostname, key string) (string, bool) {
|
||||
val, ok := e.getValRequests.Load(hostname+key)
|
||||
if ok {
|
||||
valStr := val.(string)
|
||||
return valStr, false
|
||||
}
|
||||
return "", false
|
||||
}
|
||||
peerAppTemplate.RequestMapStore = func(hostname,key string, value string) {
|
||||
e.getValRequests.Store(hostname+key, value)
|
||||
}
|
||||
peerAppTemplate.RequestMapDelete = func(hostname, key string) {
|
||||
e.getValRequests.Delete(hostname+key)
|
||||
}
|
||||
return peerAppTemplate
|
||||
}
|
||||
|
||||
|
|
|
@ -7,7 +7,6 @@ import (
|
|||
"git.openprivacy.ca/cwtch.im/tapir"
|
||||
"git.openprivacy.ca/cwtch.im/tapir/applications"
|
||||
"git.openprivacy.ca/openprivacy/log"
|
||||
"sync"
|
||||
)
|
||||
|
||||
const cwtchCapability = tapir.Capability("cwtchCapability")
|
||||
|
@ -25,7 +24,10 @@ type PeerApp struct {
|
|||
OnClose func(string)
|
||||
OnConnecting func(string)
|
||||
|
||||
getValRequests sync.Map // [string]string eventID:Data
|
||||
// Allow Peer App to look up Values associated with a particular connection...
|
||||
RequestMapStore func(string, string, string)
|
||||
RequestMapLookup func(string, string) (string, bool)
|
||||
RequestMapDelete func(string, string)
|
||||
}
|
||||
|
||||
type peerGetVal struct {
|
||||
|
@ -48,6 +50,9 @@ func (pa *PeerApp) NewInstance() tapir.Application {
|
|||
newApp.OnClose = pa.OnClose
|
||||
newApp.OnConnecting = pa.OnConnecting
|
||||
newApp.RetValHandler = pa.RetValHandler
|
||||
newApp.RequestMapStore = pa.RequestMapStore
|
||||
newApp.RequestMapDelete = pa.RequestMapDelete
|
||||
newApp.RequestMapLookup = pa.RequestMapLookup
|
||||
return newApp
|
||||
}
|
||||
|
||||
|
@ -88,13 +93,10 @@ func (pa *PeerApp) listen() {
|
|||
switch peerMessage.Context {
|
||||
case event.ContextAck:
|
||||
pa.OnAcknowledgement(pa.connection.Hostname(), peerMessage.ID)
|
||||
case event.ContextRetVal:
|
||||
log.Infof("Loading from map %x", pa.getValRequests)
|
||||
req, ok := pa.getValRequests.Load(peerMessage.ID)
|
||||
req, ok := pa.RequestMapLookup(pa.connection.Hostname(), peerMessage.ID)
|
||||
if ok {
|
||||
reqStr := []byte(req.(string))
|
||||
pa.RetValHandler(pa.connection.Hostname(), reqStr, peerMessage.Data)
|
||||
pa.getValRequests.Delete(peerMessage.ID)
|
||||
pa.RetValHandler(pa.connection.Hostname(), []byte(req), peerMessage.Data)
|
||||
pa.RequestMapDelete(pa.connection.Hostname(), peerMessage.ID)
|
||||
} else {
|
||||
log.Errorf("could not find val request for %v %s", peerMessage.ID, peerMessage.Data)
|
||||
}
|
||||
|
@ -113,14 +115,7 @@ func (pa *PeerApp) listen() {
|
|||
// NOTE: This is a stub, we will likely want to extend this to better reflect the desired protocol
|
||||
func (pa *PeerApp) SendMessage(message model2.PeerMessage) error {
|
||||
if message.Context == event.ContextGetVal {
|
||||
log.Infof("Saving to map %x", pa.getValRequests)
|
||||
pa.getValRequests.Store(message.ID, string(message.Data))
|
||||
// sync map is apparently not guaranteed to be consistent
|
||||
_, ok := pa.getValRequests.Load(message.ID)
|
||||
for !ok {
|
||||
log.Errorf("retrying to load value map after lookup failure")
|
||||
_, ok = pa.getValRequests.Load(message.ID)
|
||||
}
|
||||
pa.RequestMapStore(pa.connection.Hostname(), message.ID, string(message.Data))
|
||||
}
|
||||
serialized, err := json.Marshal(message)
|
||||
if err == nil {
|
||||
|
@ -129,3 +124,4 @@ func (pa *PeerApp) SendMessage(message model2.PeerMessage) error {
|
|||
}
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
|
@ -199,7 +199,7 @@ func (fsss *FileSharingSubSystem) ProcessChunk(chunkKey string, chunk []byte) (f
|
|||
return
|
||||
}
|
||||
|
||||
|
||||
// VerifyFile returns true if the file has been downloaded, false otherwise
|
||||
func (fsss *FileSharingSubSystem) VerifyFile(fileKey string) (downloaded bool) {
|
||||
manifestI, exists := fsss.activeDownloads.Load(fileKey)
|
||||
if exists {
|
||||
|
@ -212,4 +212,4 @@ func (fsss *FileSharingSubSystem) VerifyFile(fileKey string) (downloaded bool) {
|
|||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue