Streamline sending peer messages and unify error handling

Sarah Jamie Lewis 2021-09-03 11:26:31 -07:00
parent c373f68fbc
commit d91d5ffdd3
6 changed files with 59 additions and 55 deletions
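
The core of this change: the old sendMessageToPeer wrapper is folded into a single sendPeerMessage(handle, message) that returns an error, and every call site now reports failures uniformly by publishing a SendMessageToPeerError event. A minimal, self-contained sketch of that caller-side pattern; the types below are simplified stand-ins for cwtch's model.PeerMessage and event bus, not the real definitions:

package main

import (
	"errors"
	"fmt"
)

// PeerMessage is a simplified stand-in for cwtch's protocol/model.PeerMessage.
type PeerMessage struct {
	ID      string // event ID, echoed in acks and error events
	Context string // e.g. the invite, getval or raw-message context
	Data    []byte
}

// sendPeerMessage models the single choke point this commit routes all
// peer-bound traffic through; it returns an error instead of silently
// dropping the message when no usable connection exists.
func sendPeerMessage(handle string, message PeerMessage) error {
	// The real engine waits for the cwtch capability on the connection
	// to handle; here we simply simulate an offline peer.
	return errors.New("peer is offline or the connection has yet to finalize")
}

// publishSendError stands in for publishing event.SendMessageToPeerError
// on the engine's event bus.
func publishSendError(remotePeer, eventID string, err error) {
	fmt.Printf("SendMessageToPeerError peer=%s id=%s err=%v\n", remotePeer, eventID, err)
}

func main() {
	msg := PeerMessage{ID: "ev-1", Context: "im.cwtch.raw", Data: []byte("hello")}
	// The unified caller-side pattern used throughout the diff below.
	if err := sendPeerMessage("someonionaddress", msg); err != nil {
		publishSendError("someonionaddress", msg.ID, err)
	}
}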


@ -252,7 +252,7 @@ const (
// File Handling Events
ShareManifest = Type("ShareManifest")
ManifestSizeReceived = Type("ManifestSizeReceived")
ManifestError = Type("ManifestError")
ManifestReceived = Type("ManifestReceived")
ManifestSaved = Type("ManifestSaved")
FileDownloadProgressUpdate = Type("FileDownloadProgressUpdate")


@ -59,10 +59,9 @@ func (f *Functionality) ShareFile(filepath string, profile peer.CwtchPeer, handl
return err
}
message := OverlayMessage{
Name: path.Base(manifest.FileName),
Hash: hex.EncodeToString(manifest.RootHash),
Nonce: hex.EncodeToString(nonce[:]),
Size: manifest.FileSizeInBytes,
}
@ -78,7 +77,6 @@ func (f *Functionality) ShareFile(filepath string, profile peer.CwtchPeer, handl
key := fmt.Sprintf("%x.%x", manifest.RootHash, nonce)
serializedManifest, _ := json.Marshal(manifest)
// Store the size of the manifest (in chunks) as part of the public scope so contacts we share the file with
// can fetch the manifest as if it were a file.
profile.SetAttribute(attr.GetPublicScope(fmt.Sprintf("%s.manifest.size", key)), strconv.Itoa(int(math.Ceil(float64(len(serializedManifest))/float64(files.DefaultChunkSize)))))
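
The manifest.size attribute stored above is a chunk count: the length of the serialized manifest ceiling-divided by the chunk size. A minimal sketch of that calculation; the DefaultChunkSize value here is illustrative, the real constant lives in the files package:

package main

import (
	"fmt"
	"math"
)

// DefaultChunkSize stands in for files.DefaultChunkSize; 4096 is assumed
// here purely for illustration.
const DefaultChunkSize = 4096

// manifestSizeInChunks mirrors the expression stored under
// "<key>.manifest.size": the number of chunk-sized pieces the serialized
// manifest occupies, so peers can fetch it as if it were a file.
func manifestSizeInChunks(serializedManifest []byte) int {
	return int(math.Ceil(float64(len(serializedManifest)) / float64(DefaultChunkSize)))
}

func main() {
	manifest := make([]byte, 10000)             // a 10,000-byte serialized manifest
	fmt.Println(manifestSizeInChunks(manifest)) // 10000/4096 rounds up to 3
}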


@ -842,7 +842,7 @@ func (cp *cwtchPeer) eventHandler() {
if scope == attr.PublicScope {
if strings.HasSuffix(path, ".manifest.size") {
fileKey := strings.Replace(path, ".manifest.size", "", 1)
size,err := strconv.Atoi(val)
size, err := strconv.Atoi(val)
// if the size is valid and below the maximum size for a manifest;
// this prevents malicious sharers from using large amounts of memory when distributing
// a manifest, as we reconstruct it in-memory
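
The guard this comment describes boils down to validating the peer-advertised chunk count before allocating anything. A sketch of the shape of that check, with maxManifestSizeInChunks as an assumed bound; the real limit is defined elsewhere in the codebase:

package main

import (
	"fmt"
	"strconv"
)

// maxManifestSizeInChunks is a hypothetical cap; the real bound is a
// constant in the cwtch codebase, chosen so that a malicious sharer
// cannot force us to reconstruct an enormous manifest in memory.
const maxManifestSizeInChunks = 1024

func main() {
	val := "3" // the peer-supplied "<key>.manifest.size" attribute value
	size, err := strconv.Atoi(val)
	if err == nil && size > 0 && size <= maxManifestSizeInChunks {
		fmt.Println("manifest size accepted:", size)
		// safe to fetch and reassemble the manifest chunk by chunk
	} else {
		fmt.Println("rejecting invalid or oversized manifest size")
	}
}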


@ -8,7 +8,6 @@ import (
model3 "cwtch.im/cwtch/protocol/model"
"encoding/base64"
"encoding/json"
"errors"
"git.openprivacy.ca/cwtch.im/tapir"
"git.openprivacy.ca/cwtch.im/tapir/applications"
"git.openprivacy.ca/cwtch.im/tapir/networks/tor"
@ -135,7 +134,10 @@ func (e *engine) eventHandler() {
go e.peerWithOnion(ev.Data[event.RemotePeer])
}
case event.InvitePeerToGroup:
e.sendMessageToPeer(ev.EventID, ev.Data[event.RemotePeer], event.ContextInvite, []byte(ev.Data[event.GroupInvite]))
err := e.sendPeerMessage(ev.Data[event.RemotePeer], model3.PeerMessage{ID: ev.EventID, Context: event.ContextInvite, Data: []byte(ev.Data[event.GroupInvite])})
if err != nil {
e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.RemotePeer: ev.Data[event.RemotePeer], event.EventID: ev.EventID, event.Error: err.Error()}))
}
case event.JoinServer:
signature, err := base64.StdEncoding.DecodeString(ev.Data[event.Signature])
if err != nil {
@ -162,14 +164,17 @@ func (e *engine) eventHandler() {
if !ok {
context = event.ContextRaw
}
err := e.sendMessageToPeer(ev.EventID, ev.Data[event.RemotePeer], context, []byte(ev.Data[event.Data]))
if err != nil {
if err := e.sendPeerMessage(ev.Data[event.RemotePeer], model3.PeerMessage{ID: ev.EventID, Context: context, Data: []byte(ev.Data[event.Data])}); err != nil {
e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.RemotePeer: ev.Data[event.RemotePeer], event.EventID: ev.EventID, event.Error: "peer is offline or the connection has yet to finalize"}))
}
case event.SendGetValMessageToPeer:
e.sendGetValToPeer(ev.EventID, ev.Data[event.RemotePeer], ev.Data[event.Scope], ev.Data[event.Path])
if err := e.sendGetValToPeer(ev.EventID, ev.Data[event.RemotePeer], ev.Data[event.Scope], ev.Data[event.Path]); err != nil {
e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.RemotePeer: ev.Data[event.RemotePeer], event.EventID: ev.EventID, event.Error: err.Error()}))
}
case event.SendRetValMessageToPeer:
e.sendRetValToPeer(ev.EventID, ev.Data[event.RemotePeer], ev.Data[event.Data], ev.Data[event.Exists])
if err := e.sendRetValToPeer(ev.EventID, ev.Data[event.RemotePeer], ev.Data[event.Data], ev.Data[event.Exists]); err != nil {
e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.RemotePeer: ev.Data[event.RemotePeer], event.EventID: ev.EventID, event.Error: err.Error()}))
}
case event.SetPeerAuthorization:
auth := model.Authorization(ev.Data[event.Authorization])
e.authorizations.Store(ev.Data[event.RemotePeer], auth)
@ -197,13 +202,20 @@ func (e *engine) eventHandler() {
handle := ev.Data[event.Handle]
key := ev.Data[event.FileKey]
size, _ := strconv.Atoi(ev.Data[event.ManifestSize])
go e.sendPeerMessage(handle, e.filesharingSubSystem.FetchManifest(key, uint64(size)))
if err := e.sendPeerMessage(handle, e.filesharingSubSystem.FetchManifest(key, uint64(size))); err != nil {
e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.RemotePeer: ev.Data[event.RemotePeer], event.EventID: ev.EventID, event.Error: err.Error()}))
}
case event.ManifestSaved:
handle := ev.Data[event.Handle]
key := ev.Data[event.FileKey]
serializedManifest := ev.Data[event.SerializedManifest]
// NOTE: for now there will probably only ever be a single chunk request. When we enable group
// sharing and rehosting, this loop will serve as a way of splitting the request among multiple
// contacts
for _, message := range e.filesharingSubSystem.CompileChunkRequests(key, serializedManifest) {
go e.sendPeerMessage(handle, message)
if err := e.sendPeerMessage(handle, message); err != nil {
e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.RemotePeer: ev.Data[event.RemotePeer], event.EventID: ev.EventID, event.Error: err.Error()}))
}
}
default:
return
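
The NOTE in the hunk above anticipates fanning chunk requests out across several contacts once group sharing and rehosting land. A purely hypothetical sketch of what that split could look like; none of these helpers exist in the codebase, and today every request goes to the single sharer:

package main

import "fmt"

// PeerMessage is a simplified stand-in for model.PeerMessage.
type PeerMessage struct {
	ID   string
	Data []byte
}

// fanOut distributes chunk-request messages round-robin across the
// contacts known to hold the file. Purely illustrative.
func fanOut(requests []PeerMessage, holders []string, send func(handle string, m PeerMessage) error) {
	for i, req := range requests {
		handle := holders[i%len(holders)]
		if err := send(handle, req); err != nil {
			fmt.Printf("request %s to %s failed: %v\n", req.ID, handle, err)
		}
	}
}

func main() {
	requests := []PeerMessage{{ID: "key.0"}, {ID: "key.1"}, {ID: "key.2"}}
	holders := []string{"alice", "bob"}
	fanOut(requests, holders, func(handle string, m PeerMessage) error {
		fmt.Printf("sending %s to %s\n", m.ID, handle)
		return nil
	})
}
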
@ -211,7 +223,6 @@ func (e *engine) eventHandler() {
}
}
func (e *engine) isBlocked(onion string) bool {
authorization, known := e.authorizations.Load(onion)
if !known {
@ -429,20 +440,6 @@ func (e *engine) peerDisconnected(onion string) {
}))
}
// sendMessageToPeer sends a message to a peer under a given context
func (e *engine) sendMessageToPeer(eventID string, onion string, context string, message []byte) error {
conn, err := e.service.WaitForCapabilityOrClose(onion, cwtchCapability)
if err == nil {
peerApp, ok := (conn.App()).(*PeerApp)
if ok {
peerApp.SendMessage(model3.PeerMessage{ID: eventID, Context: context, Data: message})
return nil
}
return errors.New("failed type assertion conn.App != PeerApp")
}
return err
}
func (e *engine) sendGetValToPeer(eventID, onion, scope, path string) error {
log.Debugf("sendGetValMessage to peer %v %v.%v\n", onion, scope, path)
getVal := peerGetVal{Scope: scope, Path: path}
@ -450,7 +447,7 @@ func (e *engine) sendGetValToPeer(eventID, onion, scope, path string) error {
if err != nil {
return err
}
return e.sendMessageToPeer(eventID, onion, event.ContextGetVal, message)
return e.sendPeerMessage(onion, model3.PeerMessage{ID: eventID, Context: event.ContextGetVal, Data: message})
}
func (e *engine) sendRetValToPeer(eventID, onion, val, existsStr string) error {
@ -461,7 +458,7 @@ func (e *engine) sendRetValToPeer(eventID, onion, val, existsStr string) error {
if err != nil {
return err
}
return e.sendMessageToPeer(eventID, onion, event.ContextRetVal, message)
return e.sendPeerMessage(onion, model3.PeerMessage{ID: eventID, Context: event.ContextRetVal, Data: message})
}
func (e *engine) deleteConnection(id string) {
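
For context on the two helpers above: a GetVal request is a JSON-encoded scope/path pair sent under the getval context, and the reply comes back under the retval context carrying the value and an existence flag, correlated by event ID. A condensed sketch; peerGetVal matches the struct visible in this diff, while the exact shape of the return payload is assumed here:

package main

import (
	"encoding/json"
	"fmt"
)

// peerGetVal matches the request struct visible in this diff.
type peerGetVal struct {
	Scope string `json:"scope"`
	Path  string `json:"path"`
}

// peerRetVal is the assumed shape of the reply payload.
type peerRetVal struct {
	Val    string `json:"val"`
	Exists bool   `json:"exists"`
}

func main() {
	// Requester: marshal the lookup and send it under the getval context,
	// using the event ID to pair the eventual reply with this request.
	req, _ := json.Marshal(peerGetVal{Scope: "public", Path: "name"})
	fmt.Printf("-> ContextGetVal id=ev-42 %s\n", req)

	// Responder: look up the attribute and answer under the retval
	// context, echoing the same event ID.
	resp, _ := json.Marshal(peerRetVal{Val: "Alice", Exists: true})
	fmt.Printf("<- ContextRetVal id=ev-42 %s\n", resp)
}
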
@ -501,7 +498,7 @@ func (e *engine) sendMessageToGroup(groupID string, server string, ct []byte, si
e.eventManager.Publish(event.NewEvent(event.SendMessageToGroupError, map[event.Field]string{event.GroupID: groupID, event.GroupServer: server, event.Error: err.Error(), event.Signature: base64.StdEncoding.EncodeToString(sig)}))
}
} else {
// Broadast the token error
// Broadcast the token error
e.eventManager.Publish(event.NewEvent(event.SendMessageToGroupError, map[event.Field]string{event.GroupID: groupID, event.GroupServer: server, event.Error: err.Error(), event.Signature: base64.StdEncoding.EncodeToString(sig)}))
}
} else if numtokens < 5 {
@ -526,7 +523,9 @@ func (e *engine) handlePeerMessage(hostname string, eventID string, context stri
}
} else if context == event.ContextRequestManifest {
for _, message := range e.filesharingSubSystem.RequestManifestParts(eventID) {
e.sendPeerMessage(hostname, message)
if err := e.sendPeerMessage(hostname, message); err != nil {
e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.RemotePeer: hostname, event.EventID: eventID, event.Error: err.Error()}))
}
}
} else if context == event.ContextSendManifest {
if fileKey, manifest := e.filesharingSubSystem.ReceiveManifestPart(eventID, message); len(manifest) != 0 {
@ -535,7 +534,9 @@ func (e *engine) handlePeerMessage(hostname string, eventID string, context stri
}
} else if context == event.ContextRequestFile {
for _, message := range e.filesharingSubSystem.ProcessChunkRequest(eventID, message) {
e.sendPeerMessage(hostname, message)
if err := e.sendPeerMessage(hostname, message); err != nil {
e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.RemotePeer: hostname, event.EventID: eventID, event.Error: err.Error()}))
}
}
} else if context == event.ContextSendFile {
fileKey, downloaded, progress, totalChunks, _ := e.filesharingSubSystem.ProcessChunk(eventID, message)
@ -547,7 +548,14 @@ func (e *engine) handlePeerMessage(hostname string, eventID string, context stri
e.eventManager.Publish(event.NewEvent(event.FileDownloadProgressUpdate, map[event.Field]string{event.FileKey: fileKey, event.Progress: strconv.Itoa(int(progress)), event.FileSizeInChunks: strconv.Itoa(int(totalChunks))}))
}
} else {
// Fall through handler for the default text conversation.
e.eventManager.Publish(event.NewEvent(event.NewMessageFromPeer, map[event.Field]string{event.TimestampReceived: time.Now().Format(time.RFC3339Nano), event.RemotePeer: hostname, event.Data: string(message)}))
// Send an explicit acknowledgement
// Every other protocol should have an explicit acknowledgement message, e.g. value lookups have responses and file handling has an explicit flow
if err := e.sendPeerMessage(hostname, model3.PeerMessage{ID: eventID, Context: event.ContextAck, Data: []byte{}}); err != nil {
e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.RemotePeer: hostname, event.EventID: eventID, event.Error: err.Error()}))
}
}
}
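
With the blanket acknowledgement removed from PeerApp.listen (see the final hunk of that file below), plain text messages are now acknowledged here, in the fall-through branch: the ack is an empty PeerMessage that echoes the original event ID under the ack context. A minimal sketch of both ends, with simplified types and an assumed value for the context constant:

package main

import "fmt"

// PeerMessage is a simplified stand-in for model.PeerMessage.
type PeerMessage struct {
	ID      string
	Context string
	Data    []byte
}

// ContextAck stands in for event.ContextAck; the constant's value here
// is assumed.
const ContextAck = "im.cwtch.acknowledgement"

// ackFor builds the acknowledgement for a received text message: an
// empty payload under the ack context, reusing the incoming event ID so
// the sender can mark that message as delivered.
func ackFor(received PeerMessage) PeerMessage {
	return PeerMessage{ID: received.ID, Context: ContextAck, Data: []byte{}}
}

func main() {
	in := PeerMessage{ID: "ev-7", Context: "im.cwtch.raw", Data: []byte("hi")}
	ack := ackFor(in)
	fmt.Printf("ack id=%s context=%s payload=%d bytes\n", ack.ID, ack.Context, len(ack.Data))
}
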
@ -578,12 +586,15 @@ func (e *engine) leaveServer(server string) {
}
}
func (e *engine) sendPeerMessage(handle string, message model3.PeerMessage) {
func (e *engine) sendPeerMessage(handle string, message model3.PeerMessage) error {
conn, err := e.service.WaitForCapabilityOrClose(handle, cwtchCapability)
if err == nil {
peerApp, ok := (conn.App()).(*PeerApp)
if ok {
peerApp.SendMessage(message)
return nil
}
}
log.Errorf("could not send peer message: %v", err)
return err
}


@ -98,9 +98,6 @@ func (pa *PeerApp) listen() {
default:
if pa.IsAllowed(pa.connection.Hostname()) {
pa.MessageHandler(pa.connection.Hostname(), peerMessage.ID, peerMessage.Context, peerMessage.Data)
// Acknowledge the message
pa.SendMessage(model2.PeerMessage{ID: peerMessage.ID, Context: event.ContextAck, Data: []byte{}})
}
}
} else {
@ -117,4 +114,4 @@ func (pa *PeerApp) SendMessage(message model2.PeerMessage) {
}
serialized, _ := json.Marshal(message)
pa.connection.Send(serialized)
}


@ -24,8 +24,6 @@ type FileSharingSubSystem struct {
activeDownloads sync.Map // file key to manifests
}
// ShareFile, given a file key and a serialized manifest, allows the serialized manifest to be downloaded
// by Cwtch profiles in possession of the fileKey
func (fsss *FileSharingSubSystem) ShareFile(fileKey string, serializedManifest string) {
@ -79,7 +77,7 @@ func (fsss *FileSharingSubSystem) RequestManifestParts(fileKey string) []model.P
manifest := manifestI.(*Manifest)
serializedManifest := manifest.Serialize()
log.Debugf("found serialized manifest: %s", serializedManifest)
chunkID := 0;
chunkID := 0
for i := 0; i < len(serializedManifest); i += DefaultChunkSize {
offset := i
end := i + DefaultChunkSize
@ -94,7 +92,7 @@ func (fsss *FileSharingSubSystem) RequestManifestParts(fileKey string) []model.P
ID: fmt.Sprintf("%s.%d", fileKey, chunkID),
Data: chunk,
})
chunkID+=1
chunkID++
}
}
return messages
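
RequestManifestParts, shown above, walks the serialized manifest in DefaultChunkSize steps and labels each piece "<fileKey>.<chunkID>" so the receiver can reassemble the manifest in order. A self-contained version of that slicing loop; the chunk size value here is assumed:

package main

import "fmt"

// DefaultChunkSize stands in for this package's constant; the value is
// assumed for illustration.
const DefaultChunkSize = 4096

// PeerMessage is a simplified stand-in for model.PeerMessage.
type PeerMessage struct {
	ID   string
	Data []byte
}

// splitManifest mirrors the loop above: walk the serialized manifest in
// DefaultChunkSize steps, clamp the final slice to the data length, and
// name each piece "<fileKey>.<chunkID>".
func splitManifest(fileKey string, serializedManifest []byte) []PeerMessage {
	var messages []PeerMessage
	chunkID := 0
	for i := 0; i < len(serializedManifest); i += DefaultChunkSize {
		end := i + DefaultChunkSize
		if end > len(serializedManifest) {
			end = len(serializedManifest)
		}
		messages = append(messages, PeerMessage{
			ID:   fmt.Sprintf("%s.%d", fileKey, chunkID),
			Data: serializedManifest[i:end],
		})
		chunkID++
	}
	return messages
}

func main() {
	manifest := make([]byte, 10000)
	for _, m := range splitManifest("deadbeef.cafe", manifest) {
		fmt.Printf("%s: %d bytes\n", m.ID, len(m.Data))
	}
}
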
@ -147,21 +145,21 @@ func (fsss *FileSharingSubSystem) ProcessChunkRequest(fileKey string, serialized
if exists {
manifest := manifestI.(*Manifest)
log.Debugf("manifest found: %x", manifest.RootHash)
chunkSpec, err := Deserialize(string(serializedChunkRequest))
log.Debugf("deserialized chunk spec found: %v [%s]", chunkSpec, serializedChunkRequest)
if err == nil {
for _, chunk := range *chunkSpec {
contents, err := manifest.GetChunkBytes(chunk)
if err == nil {
log.Debugf("sending chunk: %v %x", chunk, contents)
messages = append(messages, model.PeerMessage{
ID: fmt.Sprintf("%v.%d", fileKey, chunk),
Context: event.ContextSendFile,
Data: contents,
})
}
}
}
}
return messages
}
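
The request side of ProcessChunkRequest is a serialized chunk spec: a list of chunk indices the downloader still needs, to which the sharer replies with one ContextSendFile message per chunk it can serve. A sketch of that exchange, assuming the spec serializes as a JSON list of indices; the real ChunkSpec type and its Deserialize helper live in this package:

package main

import (
	"encoding/json"
	"fmt"
)

// ChunkSpec is assumed here to be a plain list of chunk indices; the
// real type and its Serialize/Deserialize helpers live in the files
// package.
type ChunkSpec []uint64

func main() {
	// Requester: ask for the chunks still missing from the download.
	spec := ChunkSpec{0, 1, 5}
	serialized, _ := json.Marshal(spec)

	// Sharer: decode the spec and answer each chunk it can serve with its
	// own message, ID'd "<fileKey>.<chunk>" as in the loop above.
	var decoded ChunkSpec
	if err := json.Unmarshal(serialized, &decoded); err == nil {
		for _, chunk := range decoded {
			fmt.Printf("would send deadbeef.cafe.%d under ContextSendFile\n", chunk)
		}
	}
}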