diff --git a/event/common.go b/event/common.go index f586685..2fed68e 100644 --- a/event/common.go +++ b/event/common.go @@ -252,7 +252,7 @@ const ( // File Handling Events ShareManifest = Type("ShareManifest") ManifestSizeReceived = Type("ManifestSizeReceived") - ManifestError = Type("ManifestError") + ManifestError = Type("ManifestError") ManifestReceived = Type("ManifestReceived") ManifestSaved = Type("ManifestSaved") FileDownloadProgressUpdate = Type("FileDownloadProgressUpdate") diff --git a/functionality/filesharing/filesharing_functionality.go b/functionality/filesharing/filesharing_functionality.go index c336c90..1e1486c 100644 --- a/functionality/filesharing/filesharing_functionality.go +++ b/functionality/filesharing/filesharing_functionality.go @@ -59,10 +59,9 @@ func (f *Functionality) ShareFile(filepath string, profile peer.CwtchPeer, handl return err } - message := OverlayMessage{ Name: path.Base(manifest.FileName), - Hash: hex.EncodeToString(manifest.RootHash), + Hash: hex.EncodeToString(manifest.RootHash), Nonce: hex.EncodeToString(nonce[:]), Size: manifest.FileSizeInBytes, } @@ -78,7 +77,6 @@ func (f *Functionality) ShareFile(filepath string, profile peer.CwtchPeer, handl key := fmt.Sprintf("%x.%x", manifest.RootHash, nonce) serializedManifest, _ := json.Marshal(manifest) - // Store the size of the manifest (in chunks) as part of the public scope so contacts who we share the file with // can fetch the manifest as if it were a file. profile.SetAttribute(attr.GetPublicScope(fmt.Sprintf("%s.manifest.size", key)), strconv.Itoa(int(math.Ceil(float64(len(serializedManifest))/float64(files.DefaultChunkSize))))) diff --git a/peer/cwtch_peer.go b/peer/cwtch_peer.go index c510ad3..58c114c 100644 --- a/peer/cwtch_peer.go +++ b/peer/cwtch_peer.go @@ -842,7 +842,7 @@ func (cp *cwtchPeer) eventHandler() { if scope == attr.PublicScope { if strings.HasSuffix(path, ".manifest.size") { fileKey := strings.Replace(path, ".manifest.size", "", 1) - size,err := strconv.Atoi(val) + size, err := strconv.Atoi(val) // if size is valid and below the maximum size for a manifest // this is to prevent malicious sharers from using large amounts of memory when distributing // a manifest as we reconstruct this in-memory diff --git a/protocol/connections/engine.go b/protocol/connections/engine.go index 1623d89..dfb1ee8 100644 --- a/protocol/connections/engine.go +++ b/protocol/connections/engine.go @@ -8,7 +8,6 @@ import ( model3 "cwtch.im/cwtch/protocol/model" "encoding/base64" "encoding/json" - "errors" "git.openprivacy.ca/cwtch.im/tapir" "git.openprivacy.ca/cwtch.im/tapir/applications" "git.openprivacy.ca/cwtch.im/tapir/networks/tor" @@ -135,7 +134,10 @@ func (e *engine) eventHandler() { go e.peerWithOnion(ev.Data[event.RemotePeer]) } case event.InvitePeerToGroup: - e.sendMessageToPeer(ev.EventID, ev.Data[event.RemotePeer], event.ContextInvite, []byte(ev.Data[event.GroupInvite])) + err := e.sendPeerMessage(ev.Data[event.RemotePeer], model3.PeerMessage{ID: ev.EventID, Context: event.ContextInvite, Data: []byte(ev.Data[event.GroupInvite])}) + if err != nil { + + } case event.JoinServer: signature, err := base64.StdEncoding.DecodeString(ev.Data[event.Signature]) if err != nil { @@ -162,14 +164,17 @@ func (e *engine) eventHandler() { if !ok { context = event.ContextRaw } - err := e.sendMessageToPeer(ev.EventID, ev.Data[event.RemotePeer], context, []byte(ev.Data[event.Data])) - if err != nil { + if err := e.sendPeerMessage(ev.Data[event.RemotePeer], model3.PeerMessage{ID: ev.EventID, Context: context, Data: []byte(ev.Data[event.Data])}); err != nil { e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.RemotePeer: ev.Data[event.RemotePeer], event.EventID: ev.EventID, event.Error: "peer is offline or the connection has yet to finalize"})) } case event.SendGetValMessageToPeer: - e.sendGetValToPeer(ev.EventID, ev.Data[event.RemotePeer], ev.Data[event.Scope], ev.Data[event.Path]) + if err := e.sendGetValToPeer(ev.EventID, ev.Data[event.RemotePeer], ev.Data[event.Scope], ev.Data[event.Path]); err != nil { + e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.RemotePeer: ev.Data[event.RemotePeer], event.EventID: ev.EventID, event.Error: err.Error()})) + } case event.SendRetValMessageToPeer: - e.sendRetValToPeer(ev.EventID, ev.Data[event.RemotePeer], ev.Data[event.Data], ev.Data[event.Exists]) + if err := e.sendRetValToPeer(ev.EventID, ev.Data[event.RemotePeer], ev.Data[event.Data], ev.Data[event.Exists]); err != nil { + e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.RemotePeer: ev.Data[event.RemotePeer], event.EventID: ev.EventID, event.Error: err.Error()})) + } case event.SetPeerAuthorization: auth := model.Authorization(ev.Data[event.Authorization]) e.authorizations.Store(ev.Data[event.RemotePeer], auth) @@ -197,13 +202,20 @@ func (e *engine) eventHandler() { handle := ev.Data[event.Handle] key := ev.Data[event.FileKey] size, _ := strconv.Atoi(ev.Data[event.ManifestSize]) - go e.sendPeerMessage(handle, e.filesharingSubSystem.FetchManifest(key, uint64(size))) + if err := e.sendPeerMessage(handle, e.filesharingSubSystem.FetchManifest(key, uint64(size))); err != nil { + e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.RemotePeer: ev.Data[event.RemotePeer], event.EventID: ev.EventID, event.Error: err.Error()})) + } case event.ManifestSaved: handle := ev.Data[event.Handle] key := ev.Data[event.FileKey] serializedManifest := ev.Data[event.SerializedManifest] + // NOTE: for now there will probably only ever be a single chunk request. When we enable group + // sharing and rehosting then this loop will serve as a a way of splitting the request among multiple + // contacts for _, message := range e.filesharingSubSystem.CompileChunkRequests(key, serializedManifest) { - go e.sendPeerMessage(handle, message) + if err := e.sendPeerMessage(handle, message); err != nil { + e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.RemotePeer: ev.Data[event.RemotePeer], event.EventID: ev.EventID, event.Error: err.Error()})) + } } default: return @@ -211,7 +223,6 @@ func (e *engine) eventHandler() { } } - func (e *engine) isBlocked(onion string) bool { authorization, known := e.authorizations.Load(onion) if !known { @@ -429,20 +440,6 @@ func (e *engine) peerDisconnected(onion string) { })) } -// sendMessageToPeer sends a message to a peer under a given context -func (e *engine) sendMessageToPeer(eventID string, onion string, context string, message []byte) error { - conn, err := e.service.WaitForCapabilityOrClose(onion, cwtchCapability) - if err == nil { - peerApp, ok := (conn.App()).(*PeerApp) - if ok { - peerApp.SendMessage(model3.PeerMessage{ID: eventID, Context: context, Data: message}) - return nil - } - return errors.New("failed type assertion conn.App != PeerApp") - } - return err -} - func (e *engine) sendGetValToPeer(eventID, onion, scope, path string) error { log.Debugf("sendGetValMessage to peer %v %v.%v\n", onion, scope, path) getVal := peerGetVal{Scope: scope, Path: path} @@ -450,7 +447,7 @@ func (e *engine) sendGetValToPeer(eventID, onion, scope, path string) error { if err != nil { return err } - return e.sendMessageToPeer(eventID, onion, event.ContextGetVal, message) + return e.sendPeerMessage(onion, model3.PeerMessage{ID: eventID, Context: event.ContextGetVal, Data: message}) } func (e *engine) sendRetValToPeer(eventID, onion, val, existsStr string) error { @@ -461,7 +458,7 @@ func (e *engine) sendRetValToPeer(eventID, onion, val, existsStr string) error { if err != nil { return err } - return e.sendMessageToPeer(eventID, onion, event.ContextRetVal, message) + return e.sendPeerMessage(onion, model3.PeerMessage{ID: eventID, Context: event.ContextRetVal, Data: message}) } func (e *engine) deleteConnection(id string) { @@ -501,7 +498,7 @@ func (e *engine) sendMessageToGroup(groupID string, server string, ct []byte, si e.eventManager.Publish(event.NewEvent(event.SendMessageToGroupError, map[event.Field]string{event.GroupID: groupID, event.GroupServer: server, event.Error: err.Error(), event.Signature: base64.StdEncoding.EncodeToString(sig)})) } } else { - // Broadast the token error + // Broadcast the token error e.eventManager.Publish(event.NewEvent(event.SendMessageToGroupError, map[event.Field]string{event.GroupID: groupID, event.GroupServer: server, event.Error: err.Error(), event.Signature: base64.StdEncoding.EncodeToString(sig)})) } } else if numtokens < 5 { @@ -526,7 +523,9 @@ func (e *engine) handlePeerMessage(hostname string, eventID string, context stri } } else if context == event.ContextRequestManifest { for _, message := range e.filesharingSubSystem.RequestManifestParts(eventID) { - e.sendPeerMessage(hostname, message) + if err := e.sendPeerMessage(hostname, message); err != nil { + e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.RemotePeer: hostname, event.EventID: eventID, event.Error: err.Error()})) + } } } else if context == event.ContextSendManifest { if fileKey, manifest := e.filesharingSubSystem.ReceiveManifestPart(eventID, message); len(manifest) != 0 { @@ -535,7 +534,9 @@ func (e *engine) handlePeerMessage(hostname string, eventID string, context stri } } else if context == event.ContextRequestFile { for _, message := range e.filesharingSubSystem.ProcessChunkRequest(eventID, message) { - e.sendPeerMessage(hostname, message) + if err := e.sendPeerMessage(hostname, message); err != nil { + e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.RemotePeer: hostname, event.EventID: eventID, event.Error: err.Error()})) + } } } else if context == event.ContextSendFile { fileKey, downloaded, progress, totalChunks, _ := e.filesharingSubSystem.ProcessChunk(eventID, message) @@ -547,7 +548,14 @@ func (e *engine) handlePeerMessage(hostname string, eventID string, context stri e.eventManager.Publish(event.NewEvent(event.FileDownloadProgressUpdate, map[event.Field]string{event.FileKey: fileKey, event.Progress: strconv.Itoa(int(progress)), event.FileSizeInChunks: strconv.Itoa(int(totalChunks))})) } } else { + // Fall through handler for the default text conversation. e.eventManager.Publish(event.NewEvent(event.NewMessageFromPeer, map[event.Field]string{event.TimestampReceived: time.Now().Format(time.RFC3339Nano), event.RemotePeer: hostname, event.Data: string(message)})) + + // Send an explicit acknowledgement + // Every other protocol should have a explicit acknowledgement message e.g. value lookups have responses, and file handling has an explicit flow + if err := e.sendPeerMessage(hostname, model3.PeerMessage{ID: eventID, Context: event.ContextAck, Data: []byte{}}); err != nil { + e.eventManager.Publish(event.NewEvent(event.SendMessageToPeerError, map[event.Field]string{event.RemotePeer: hostname, event.EventID: eventID, event.Error: err.Error()})) + } } } @@ -578,12 +586,15 @@ func (e *engine) leaveServer(server string) { } } -func (e *engine) sendPeerMessage(handle string, message model3.PeerMessage) { +func (e *engine) sendPeerMessage(handle string, message model3.PeerMessage) error { conn, err := e.service.WaitForCapabilityOrClose(handle, cwtchCapability) if err == nil { peerApp, ok := (conn.App()).(*PeerApp) if ok { peerApp.SendMessage(message) + return nil } } + log.Errorf("could not send peer message: %v", err) + return err } diff --git a/protocol/connections/peerapp.go b/protocol/connections/peerapp.go index 4d68d01..f4448e2 100644 --- a/protocol/connections/peerapp.go +++ b/protocol/connections/peerapp.go @@ -98,9 +98,6 @@ func (pa *PeerApp) listen() { default: if pa.IsAllowed(pa.connection.Hostname()) { pa.MessageHandler(pa.connection.Hostname(), peerMessage.ID, peerMessage.Context, peerMessage.Data) - - // Acknowledge the message - pa.SendMessage(model2.PeerMessage{ID: peerMessage.ID, Context: event.ContextAck, Data: []byte{}}) } } } else { @@ -117,4 +114,4 @@ func (pa *PeerApp) SendMessage(message model2.PeerMessage) { } serialized, _ := json.Marshal(message) pa.connection.Send(serialized) -} \ No newline at end of file +} diff --git a/protocol/files/filesharing_subsystem.go b/protocol/files/filesharing_subsystem.go index 4bac6be..61c46be 100644 --- a/protocol/files/filesharing_subsystem.go +++ b/protocol/files/filesharing_subsystem.go @@ -24,8 +24,6 @@ type FileSharingSubSystem struct { activeDownloads sync.Map // file key to manifests } - - // ShareFile given a file key and a serialized manifest, allow the serialized manifest to be downloaded // by Cwtch profiles in possession of the fileKey func (fsss *FileSharingSubSystem) ShareFile(fileKey string, serializedManifest string) { @@ -79,7 +77,7 @@ func (fsss *FileSharingSubSystem) RequestManifestParts(fileKey string) []model.P manifest := manifestI.(*Manifest) serializedManifest := manifest.Serialize() log.Debugf("found serialized manifest: %s", serializedManifest) - chunkID := 0; + chunkID := 0 for i := 0; i < len(serializedManifest); i += DefaultChunkSize { offset := i end := i + DefaultChunkSize @@ -94,7 +92,7 @@ func (fsss *FileSharingSubSystem) RequestManifestParts(fileKey string) []model.P ID: fmt.Sprintf("%s.%d", fileKey, chunkID), Data: chunk, }) - chunkID+=1 + chunkID++ } } return messages @@ -147,21 +145,21 @@ func (fsss *FileSharingSubSystem) ProcessChunkRequest(fileKey string, serialized if exists { manifest := manifestI.(*Manifest) log.Debugf("manifest found: %x", manifest.RootHash) - chunkSpec, err := Deserialize(string(serializedChunkRequest)) - log.Debugf("deserialized chunk spec found: %v [%s]", chunkSpec, serializedChunkRequest) - if err == nil { - for _, chunk := range *chunkSpec { - contents, err := manifest.GetChunkBytes(chunk) - if err == nil { - log.Debugf("sending chunk: %v %x", chunk, contents) - messages = append(messages, model.PeerMessage{ - ID: fmt.Sprintf("%v.%d", fileKey, chunk), - Context: event.ContextSendFile, - Data: contents, - }) - } + chunkSpec, err := Deserialize(string(serializedChunkRequest)) + log.Debugf("deserialized chunk spec found: %v [%s]", chunkSpec, serializedChunkRequest) + if err == nil { + for _, chunk := range *chunkSpec { + contents, err := manifest.GetChunkBytes(chunk) + if err == nil { + log.Debugf("sending chunk: %v %x", chunk, contents) + messages = append(messages, model.PeerMessage{ + ID: fmt.Sprintf("%v.%d", fileKey, chunk), + Context: event.ContextSendFile, + Data: contents, + }) } } + } } return messages }