diff --git a/protocol/connections/engine.go b/protocol/connections/engine.go index 417daf4..b85ed49 100644 --- a/protocol/connections/engine.go +++ b/protocol/connections/engine.go @@ -42,6 +42,8 @@ type engine struct { // Nextgen Tapir Service service tapir.Service + getValRequests sync.Map // [string]string eventID:Data + // Nextgen Tapir Service ephemeralServices sync.Map // string(onion) => tapir.Service @@ -255,6 +257,22 @@ func (e *engine) createPeerTemplate() *PeerApp { peerAppTemplate.OnConnecting = e.ignoreOnShutdown(e.peerConnecting) peerAppTemplate.OnClose = e.ignoreOnShutdown(e.peerDisconnected) peerAppTemplate.RetValHandler = e.handlePeerRetVal + + // peerApp should not store state... guarded by a hostname + peerAppTemplate.RequestMapLookup = func(hostname, key string) (string, bool) { + val, ok := e.getValRequests.Load(hostname+key) + if ok { + valStr := val.(string) + return valStr, false + } + return "", false + } + peerAppTemplate.RequestMapStore = func(hostname,key string, value string) { + e.getValRequests.Store(hostname+key, value) + } + peerAppTemplate.RequestMapDelete = func(hostname, key string) { + e.getValRequests.Delete(hostname+key) + } return peerAppTemplate } diff --git a/protocol/connections/peerapp.go b/protocol/connections/peerapp.go index 55d8706..573a76b 100644 --- a/protocol/connections/peerapp.go +++ b/protocol/connections/peerapp.go @@ -7,7 +7,6 @@ import ( "git.openprivacy.ca/cwtch.im/tapir" "git.openprivacy.ca/cwtch.im/tapir/applications" "git.openprivacy.ca/openprivacy/log" - "sync" ) const cwtchCapability = tapir.Capability("cwtchCapability") @@ -25,7 +24,10 @@ type PeerApp struct { OnClose func(string) OnConnecting func(string) - getValRequests sync.Map // [string]string eventID:Data + // Allow Peer App to look up Values associated with a particular connection... + RequestMapStore func(string, string, string) + RequestMapLookup func(string, string) (string, bool) + RequestMapDelete func(string, string) } type peerGetVal struct { @@ -48,6 +50,9 @@ func (pa *PeerApp) NewInstance() tapir.Application { newApp.OnClose = pa.OnClose newApp.OnConnecting = pa.OnConnecting newApp.RetValHandler = pa.RetValHandler + newApp.RequestMapStore = pa.RequestMapStore + newApp.RequestMapDelete = pa.RequestMapDelete + newApp.RequestMapLookup = pa.RequestMapLookup return newApp } @@ -88,13 +93,10 @@ func (pa *PeerApp) listen() { switch peerMessage.Context { case event.ContextAck: pa.OnAcknowledgement(pa.connection.Hostname(), peerMessage.ID) - case event.ContextRetVal: - log.Infof("Loading from map %x", pa.getValRequests) - req, ok := pa.getValRequests.Load(peerMessage.ID) + req, ok := pa.RequestMapLookup(pa.connection.Hostname(), peerMessage.ID) if ok { - reqStr := []byte(req.(string)) - pa.RetValHandler(pa.connection.Hostname(), reqStr, peerMessage.Data) - pa.getValRequests.Delete(peerMessage.ID) + pa.RetValHandler(pa.connection.Hostname(), []byte(req), peerMessage.Data) + pa.RequestMapDelete(pa.connection.Hostname(), peerMessage.ID) } else { log.Errorf("could not find val request for %v %s", peerMessage.ID, peerMessage.Data) } @@ -113,14 +115,7 @@ func (pa *PeerApp) listen() { // NOTE: This is a stub, we will likely want to extend this to better reflect the desired protocol func (pa *PeerApp) SendMessage(message model2.PeerMessage) error { if message.Context == event.ContextGetVal { - log.Infof("Saving to map %x", pa.getValRequests) - pa.getValRequests.Store(message.ID, string(message.Data)) - // sync map is apparently not guaranteed to be consistent - _, ok := pa.getValRequests.Load(message.ID) - for !ok { - log.Errorf("retrying to load value map after lookup failure") - _, ok = pa.getValRequests.Load(message.ID) - } + pa.RequestMapStore(pa.connection.Hostname(), message.ID, string(message.Data)) } serialized, err := json.Marshal(message) if err == nil { @@ -129,3 +124,4 @@ func (pa *PeerApp) SendMessage(message model2.PeerMessage) error { } return err } + diff --git a/protocol/files/filesharing_subsystem.go b/protocol/files/filesharing_subsystem.go index f22d684..bcbf1a8 100644 --- a/protocol/files/filesharing_subsystem.go +++ b/protocol/files/filesharing_subsystem.go @@ -199,7 +199,7 @@ func (fsss *FileSharingSubSystem) ProcessChunk(chunkKey string, chunk []byte) (f return } - +// VerifyFile returns true if the file has been downloaded, false otherwise func (fsss *FileSharingSubSystem) VerifyFile(fileKey string) (downloaded bool) { manifestI, exists := fsss.activeDownloads.Load(fileKey) if exists { @@ -212,4 +212,4 @@ func (fsss *FileSharingSubSystem) VerifyFile(fileKey string) (downloaded bool) { } } return false -} \ No newline at end of file +}