diff --git a/app/appBridge.go b/app/appBridge.go index 07bcd43..4ba05b1 100644 --- a/app/appBridge.go +++ b/app/appBridge.go @@ -14,7 +14,7 @@ func (ab *applicationBridge) listen() { log.Infoln("ab.listen()") for { ipcMessage, ok := ab.bridge.Read() - log.Infof("listen() got %v\n", ipcMessage) + log.Infof("listen() got %v for %v\n", ipcMessage.Message.EventType, ipcMessage.Dest) if !ok { return } diff --git a/app/appService.go b/app/appService.go index 046be5f..e6be603 100644 --- a/app/appService.go +++ b/app/appService.go @@ -81,6 +81,7 @@ func (as *applicationService) createPeer(name, password string) { func (as *applicationService) loadProfiles(password string) { count := 0 as.applicationCore.LoadProfiles(password, func(profile *model.Profile, profileStore storage.ProfileStore) { + as.eventBuses[profile.Onion] = event.IPCEventManagerFrom(as.bridge, profile.Onion, as.eventBuses[profile.Onion]) blockedPeers := profile.BlockedPeers() identity := identity.InitializeV3(profile.Name, &profile.Ed25519PrivateKey, &profile.Ed25519PublicKey) engine := connections.NewProtocolEngine(identity, profile.Ed25519PrivateKey, as.acn, as.eventBuses[profile.Onion], blockedPeers) diff --git a/event/bridge/pipeBridge.go b/event/bridge/pipeBridge.go index 5ac7b01..63c3ea9 100644 --- a/event/bridge/pipeBridge.go +++ b/event/bridge/pipeBridge.go @@ -1,6 +1,8 @@ package bridge import ( + "encoding/base64" + "encoding/binary" "git.openprivacy.ca/openprivacy/libricochet-go/log" "cwtch.im/cwtch/event" @@ -63,14 +65,36 @@ func NewPipeBridgeService(inFilename, outFilename string) (event.IPCBridge, erro } func (pb *pipeBridge) Read() (message event.IPCMessage, ok bool) { - dec := json.NewDecoder(pb.in) + var n int + size := make([]byte, 2) + n, err := pb.in.Read(size) + if err != nil || n != 2 { + log.Errorf("Could not read len int from stream: %v\n", err) + return message, false + } - err := dec.Decode(&message) + n = int(binary.LittleEndian.Uint16(size)) + pos := 0 + buffer := make([]byte, n) + for n > 0 { + m, err := pb.in.Read(buffer[pos:]) + if err != nil { + log.Errorf("Reading into buffer from pipe: %v\n", err) + return message, false + } + n -= m + pos += m + } + err = json.Unmarshal(buffer, &message) if err != nil { - log.Errorf("Read error: %v", err) + log.Errorf("Read error: %v --value: %v", err, message) return event.IPCMessage{}, false } + for k, v := range message.Message.Data { + val, _ := base64.StdEncoding.DecodeString(v) + message.Message.Data[k] = string(val) + } return message, true } @@ -78,8 +102,24 @@ func (pb *pipeBridge) Write(message *event.IPCMessage) { pb.lock.Lock() defer pb.lock.Unlock() if !pb.closed { - messageJSON, _ := json.Marshal(message) - pb.out.Write(messageJSON) + encMessage := &event.IPCMessage{Dest: message.Dest, Message: event.Event{EventType: message.Message.EventType, EventID: message.Message.EventID, Data: make(map[event.Field]string)}} + for k, v := range message.Message.Data { + encMessage.Message.Data[k] = base64.StdEncoding.EncodeToString([]byte(v)) + } + + messageJSON, _ := json.Marshal(encMessage) + size := make([]byte, 2) + binary.LittleEndian.PutUint16(size, uint16(len(messageJSON))) + pb.out.Write(size) + + for pos := 0; pos < len(messageJSON); { + n, err := pb.out.Write(messageJSON) + if err != nil { + log.Errorf("Writing out on pipeBridge: %v\n", err) + return + } + pos += n + } } } diff --git a/peer/cwtch_peer.go b/peer/cwtch_peer.go index 9fdae3f..d8c2c2b 100644 --- a/peer/cwtch_peer.go +++ b/peer/cwtch_peer.go @@ -103,9 +103,8 @@ func (cp *cwtchPeer) ImportGroup(exportedInvite string) (groupID string, err err if err == nil { jsobj, err := proto.Marshal(cpp.GetGroupChatInvite()) if err == nil { - b64obj := base64.StdEncoding.EncodeToString(jsobj) cp.eventBus.Publish(event.NewEvent(event.NewGroupInvite, map[event.Field]string{ - event.GroupInvite: b64obj, + event.GroupInvite: string(jsobj), })) } else { log.Errorf("error serializing group: %v", err) @@ -335,12 +334,7 @@ func (cp *cwtchPeer) eventHandler() { } case event.NewGroupInvite: var groupInvite protocol.GroupChatInvite - json, err := base64.StdEncoding.DecodeString(ev.Data[event.GroupInvite]) - if err != nil { - log.Errorf("NewGroupInvite could not base64 decode invite: %v\n", err) - continue - } - err = proto.Unmarshal([]byte(json), &groupInvite) + err := proto.Unmarshal([]byte(ev.Data[event.GroupInvite]), &groupInvite) if err != nil { log.Errorf("NewGroupInvite could not json decode invite: %v\n", err) } diff --git a/protocol/connections/engine.go b/protocol/connections/engine.go index b666183..2cb9959 100644 --- a/protocol/connections/engine.go +++ b/protocol/connections/engine.go @@ -5,7 +5,6 @@ import ( "cwtch.im/cwtch/event" "cwtch.im/cwtch/protocol" "cwtch.im/cwtch/protocol/connections/peer" - "encoding/base64" "errors" "git.openprivacy.ca/openprivacy/libricochet-go/application" "git.openprivacy.ca/openprivacy/libricochet-go/channels" @@ -292,8 +291,7 @@ func (cph *CwtchPeerHandler) HandleGroupInvite(gci *protocol.GroupChatInvite) { log.Debugf("Received GroupID from %v %v\n", cph.Onion, gci.String()) marshal, err := proto.Marshal(gci) if err == nil { - marshalb64 := base64.StdEncoding.EncodeToString(marshal) - cph.EventBus.Publish(event.NewEvent(event.NewGroupInvite, map[event.Field]string{event.TimestampReceived: time.Now().Format(time.RFC3339Nano), event.RemotePeer: cph.Onion, event.GroupInvite: marshalb64})) + cph.EventBus.Publish(event.NewEvent(event.NewGroupInvite, map[event.Field]string{event.TimestampReceived: time.Now().Format(time.RFC3339Nano), event.RemotePeer: cph.Onion, event.GroupInvite: string(marshal)})) } } diff --git a/storage/profile_store.go b/storage/profile_store.go index bdd9084..7d9fe05 100644 --- a/storage/profile_store.go +++ b/storage/profile_store.go @@ -4,7 +4,6 @@ import ( "cwtch.im/cwtch/event" "cwtch.im/cwtch/model" "cwtch.im/cwtch/protocol" - "encoding/base64" "encoding/json" "git.openprivacy.ca/openprivacy/libricochet-go/log" "github.com/golang/protobuf/proto" @@ -174,8 +173,7 @@ func (ps *profileStore) eventHandler() { } case event.NewGroupInvite: var gci protocol.GroupChatInvite - json, _ := base64.StdEncoding.DecodeString(ev.Data[event.GroupInvite]) - err := proto.Unmarshal([]byte(json), &gci) + err := proto.Unmarshal([]byte(ev.Data[event.GroupInvite]), &gci) if err == nil { ps.profile.ProcessInvite(&gci, ev.Data[event.RemotePeer]) ps.save()