fixes to pipe bridge: it base64 encodes data of messages before sending them over to preserve binary data; fixed a lack of wiring for ipcBridge in service

This commit is contained in:
Dan Ballard 2019-07-05 17:46:51 -07:00
parent 03b9b80268
commit 9f52e5de7b
6 changed files with 51 additions and 20 deletions

View File

@ -14,7 +14,7 @@ func (ab *applicationBridge) listen() {
log.Infoln("ab.listen()") log.Infoln("ab.listen()")
for { for {
ipcMessage, ok := ab.bridge.Read() ipcMessage, ok := ab.bridge.Read()
log.Infof("listen() got %v\n", ipcMessage) log.Infof("listen() got %v for %v\n", ipcMessage.Message.EventType, ipcMessage.Dest)
if !ok { if !ok {
return return
} }

View File

@ -81,6 +81,7 @@ func (as *applicationService) createPeer(name, password string) {
func (as *applicationService) loadProfiles(password string) { func (as *applicationService) loadProfiles(password string) {
count := 0 count := 0
as.applicationCore.LoadProfiles(password, func(profile *model.Profile, profileStore storage.ProfileStore) { as.applicationCore.LoadProfiles(password, func(profile *model.Profile, profileStore storage.ProfileStore) {
as.eventBuses[profile.Onion] = event.IPCEventManagerFrom(as.bridge, profile.Onion, as.eventBuses[profile.Onion])
blockedPeers := profile.BlockedPeers() blockedPeers := profile.BlockedPeers()
identity := identity.InitializeV3(profile.Name, &profile.Ed25519PrivateKey, &profile.Ed25519PublicKey) identity := identity.InitializeV3(profile.Name, &profile.Ed25519PrivateKey, &profile.Ed25519PublicKey)
engine := connections.NewProtocolEngine(identity, profile.Ed25519PrivateKey, as.acn, as.eventBuses[profile.Onion], blockedPeers) engine := connections.NewProtocolEngine(identity, profile.Ed25519PrivateKey, as.acn, as.eventBuses[profile.Onion], blockedPeers)

View File

@ -1,6 +1,8 @@
package bridge package bridge
import ( import (
"encoding/base64"
"encoding/binary"
"git.openprivacy.ca/openprivacy/libricochet-go/log" "git.openprivacy.ca/openprivacy/libricochet-go/log"
"cwtch.im/cwtch/event" "cwtch.im/cwtch/event"
@ -63,14 +65,36 @@ func NewPipeBridgeService(inFilename, outFilename string) (event.IPCBridge, erro
} }
func (pb *pipeBridge) Read() (message event.IPCMessage, ok bool) { func (pb *pipeBridge) Read() (message event.IPCMessage, ok bool) {
dec := json.NewDecoder(pb.in) var n int
size := make([]byte, 2)
n, err := pb.in.Read(size)
if err != nil || n != 2 {
log.Errorf("Could not read len int from stream: %v\n", err)
return message, false
}
err := dec.Decode(&message) n = int(binary.LittleEndian.Uint16(size))
pos := 0
buffer := make([]byte, n)
for n > 0 {
m, err := pb.in.Read(buffer[pos:])
if err != nil {
log.Errorf("Reading into buffer from pipe: %v\n", err)
return message, false
}
n -= m
pos += m
}
err = json.Unmarshal(buffer, &message)
if err != nil { if err != nil {
log.Errorf("Read error: %v", err) log.Errorf("Read error: %v --value: %v", err, message)
return event.IPCMessage{}, false return event.IPCMessage{}, false
} }
for k, v := range message.Message.Data {
val, _ := base64.StdEncoding.DecodeString(v)
message.Message.Data[k] = string(val)
}
return message, true return message, true
} }
@ -78,8 +102,24 @@ func (pb *pipeBridge) Write(message *event.IPCMessage) {
pb.lock.Lock() pb.lock.Lock()
defer pb.lock.Unlock() defer pb.lock.Unlock()
if !pb.closed { if !pb.closed {
messageJSON, _ := json.Marshal(message) encMessage := &event.IPCMessage{Dest: message.Dest, Message: event.Event{EventType: message.Message.EventType, EventID: message.Message.EventID, Data: make(map[event.Field]string)}}
pb.out.Write(messageJSON) for k, v := range message.Message.Data {
encMessage.Message.Data[k] = base64.StdEncoding.EncodeToString([]byte(v))
}
messageJSON, _ := json.Marshal(encMessage)
size := make([]byte, 2)
binary.LittleEndian.PutUint16(size, uint16(len(messageJSON)))
pb.out.Write(size)
for pos := 0; pos < len(messageJSON); {
n, err := pb.out.Write(messageJSON)
if err != nil {
log.Errorf("Writing out on pipeBridge: %v\n", err)
return
}
pos += n
}
} }
} }

View File

@ -103,9 +103,8 @@ func (cp *cwtchPeer) ImportGroup(exportedInvite string) (groupID string, err err
if err == nil { if err == nil {
jsobj, err := proto.Marshal(cpp.GetGroupChatInvite()) jsobj, err := proto.Marshal(cpp.GetGroupChatInvite())
if err == nil { if err == nil {
b64obj := base64.StdEncoding.EncodeToString(jsobj)
cp.eventBus.Publish(event.NewEvent(event.NewGroupInvite, map[event.Field]string{ cp.eventBus.Publish(event.NewEvent(event.NewGroupInvite, map[event.Field]string{
event.GroupInvite: b64obj, event.GroupInvite: string(jsobj),
})) }))
} else { } else {
log.Errorf("error serializing group: %v", err) log.Errorf("error serializing group: %v", err)
@ -335,12 +334,7 @@ func (cp *cwtchPeer) eventHandler() {
} }
case event.NewGroupInvite: case event.NewGroupInvite:
var groupInvite protocol.GroupChatInvite var groupInvite protocol.GroupChatInvite
json, err := base64.StdEncoding.DecodeString(ev.Data[event.GroupInvite]) err := proto.Unmarshal([]byte(ev.Data[event.GroupInvite]), &groupInvite)
if err != nil {
log.Errorf("NewGroupInvite could not base64 decode invite: %v\n", err)
continue
}
err = proto.Unmarshal([]byte(json), &groupInvite)
if err != nil { if err != nil {
log.Errorf("NewGroupInvite could not json decode invite: %v\n", err) log.Errorf("NewGroupInvite could not json decode invite: %v\n", err)
} }

View File

@ -5,7 +5,6 @@ import (
"cwtch.im/cwtch/event" "cwtch.im/cwtch/event"
"cwtch.im/cwtch/protocol" "cwtch.im/cwtch/protocol"
"cwtch.im/cwtch/protocol/connections/peer" "cwtch.im/cwtch/protocol/connections/peer"
"encoding/base64"
"errors" "errors"
"git.openprivacy.ca/openprivacy/libricochet-go/application" "git.openprivacy.ca/openprivacy/libricochet-go/application"
"git.openprivacy.ca/openprivacy/libricochet-go/channels" "git.openprivacy.ca/openprivacy/libricochet-go/channels"
@ -292,8 +291,7 @@ func (cph *CwtchPeerHandler) HandleGroupInvite(gci *protocol.GroupChatInvite) {
log.Debugf("Received GroupID from %v %v\n", cph.Onion, gci.String()) log.Debugf("Received GroupID from %v %v\n", cph.Onion, gci.String())
marshal, err := proto.Marshal(gci) marshal, err := proto.Marshal(gci)
if err == nil { if err == nil {
marshalb64 := base64.StdEncoding.EncodeToString(marshal) cph.EventBus.Publish(event.NewEvent(event.NewGroupInvite, map[event.Field]string{event.TimestampReceived: time.Now().Format(time.RFC3339Nano), event.RemotePeer: cph.Onion, event.GroupInvite: string(marshal)}))
cph.EventBus.Publish(event.NewEvent(event.NewGroupInvite, map[event.Field]string{event.TimestampReceived: time.Now().Format(time.RFC3339Nano), event.RemotePeer: cph.Onion, event.GroupInvite: marshalb64}))
} }
} }

View File

@ -4,7 +4,6 @@ import (
"cwtch.im/cwtch/event" "cwtch.im/cwtch/event"
"cwtch.im/cwtch/model" "cwtch.im/cwtch/model"
"cwtch.im/cwtch/protocol" "cwtch.im/cwtch/protocol"
"encoding/base64"
"encoding/json" "encoding/json"
"git.openprivacy.ca/openprivacy/libricochet-go/log" "git.openprivacy.ca/openprivacy/libricochet-go/log"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
@ -174,8 +173,7 @@ func (ps *profileStore) eventHandler() {
} }
case event.NewGroupInvite: case event.NewGroupInvite:
var gci protocol.GroupChatInvite var gci protocol.GroupChatInvite
json, _ := base64.StdEncoding.DecodeString(ev.Data[event.GroupInvite]) err := proto.Unmarshal([]byte(ev.Data[event.GroupInvite]), &gci)
err := proto.Unmarshal([]byte(json), &gci)
if err == nil { if err == nil {
ps.profile.ProcessInvite(&gci, ev.Data[event.RemotePeer]) ps.profile.ProcessInvite(&gci, ev.Data[event.RemotePeer])
ps.save() ps.save()