diff --git a/app/app.go b/app/app.go index 5cfe688..e4099ba 100644 --- a/app/app.go +++ b/app/app.go @@ -20,7 +20,6 @@ import ( type applicationCore struct { eventBuses map[string]event.Manager - acn connectivity.ACN directory string mutex sync.Mutex } @@ -32,6 +31,7 @@ type applicationPeers struct { type application struct { applicationCore applicationPeers + acn connectivity.ACN storage map[string]storage.ProfileStore engines map[string]connections.Engine appBus event.Manager @@ -56,8 +56,8 @@ type Application interface { // LoadProfileFn is the function signature for a function in an app that loads a profile type LoadProfileFn func(profile *model.Profile, store storage.ProfileStore) -func newAppCore(acn connectivity.ACN, appDirectory string) *applicationCore { - appCore := &applicationCore{eventBuses: make(map[string]event.Manager), directory: appDirectory, acn: acn} +func newAppCore(appDirectory string) *applicationCore { + appCore := &applicationCore{eventBuses: make(map[string]event.Manager), directory: appDirectory} os.MkdirAll(path.Join(appCore.directory, "profiles"), 0700) return appCore } @@ -65,7 +65,7 @@ func newAppCore(acn connectivity.ACN, appDirectory string) *applicationCore { // NewApp creates a new app with some environment awareness and initializes a Tor Manager func NewApp(acn connectivity.ACN, appDirectory string) Application { log.Debugf("NewApp(%v)\n", appDirectory) - app := &application{storage: make(map[string]storage.ProfileStore), applicationPeers: applicationPeers{peers: make(map[string]peer.CwtchPeer)}, engines: make(map[string]connections.Engine), applicationCore: *newAppCore(acn, appDirectory), appBus: event.NewEventManager()} + app := &application{acn: acn, storage: make(map[string]storage.ProfileStore), applicationPeers: applicationPeers{peers: make(map[string]peer.CwtchPeer)}, engines: make(map[string]connections.Engine), applicationCore: *newAppCore(appDirectory), appBus: event.NewEventManager()} return app } @@ -102,7 +102,7 @@ func (app *application) CreatePeer(name string, password string) { pc := app.storage[profile.Onion].GetProfileCopy() peer := peer.FromProfile(pc) - peer.Init(app.acn, app.eventBuses[profile.Onion]) + peer.Init(app.eventBuses[profile.Onion]) blockedPeers := profile.BlockedPeers() // TODO: Would be nice if ProtocolEngine did not need to explicitly be given the Private Key. @@ -153,7 +153,7 @@ func (ac *applicationCore) LoadProfiles(password string, loadProfileFn LoadProfi func (app *application) LoadProfiles(password string) { app.applicationCore.LoadProfiles(password, func(profile *model.Profile, profileStore storage.ProfileStore) { peer := peer.FromProfile(profile) - peer.Init(app.acn, app.eventBuses[profile.Onion]) + peer.Init(app.eventBuses[profile.Onion]) blockedPeers := profile.BlockedPeers() identity := identity.InitializeV3(profile.Name, &profile.Ed25519PrivateKey, &profile.Ed25519PublicKey) diff --git a/app/appClient.go b/app/appClient.go index da8ad78..5390480 100644 --- a/app/appClient.go +++ b/app/appClient.go @@ -5,7 +5,6 @@ import ( "cwtch.im/cwtch/peer" "cwtch.im/cwtch/storage" "fmt" - "git.openprivacy.ca/openprivacy/libricochet-go/connectivity" "git.openprivacy.ca/openprivacy/libricochet-go/log" "path" @@ -19,12 +18,13 @@ type applicationClient struct { } // NewAppClient returns an Application that acts as a client to a AppService, connected by the IPCBridge supplied -func NewAppClient(acn connectivity.ACN, appDirectory string, bridge event.IPCBridge) Application { - appClient := &applicationClient{applicationPeers: applicationPeers{peers: make(map[string]peer.CwtchPeer)}, applicationBridge: applicationBridge{applicationCore: *newAppCore(acn, appDirectory), bridge: bridge}, appBus: event.NewEventManager()} +func NewAppClient(appDirectory string, bridge event.IPCBridge) Application { + appClient := &applicationClient{applicationPeers: applicationPeers{peers: make(map[string]peer.CwtchPeer)}, applicationBridge: applicationBridge{applicationCore: *newAppCore(appDirectory), bridge: bridge}, appBus: event.NewEventManager()} appClient.handle = appClient.handleEvent go appClient.listen() + log.Infoln("Created new App Client") return appClient } @@ -63,7 +63,7 @@ func (ac *applicationClient) newPeer(localID, password string) { eventBus := event.NewIPCEventManager(ac.bridge, profile.Onion) peer := peer.FromProfile(profile) - peer.Init(ac.acn, eventBus) + peer.Init(eventBus) ac.mutex.Lock() defer ac.mutex.Unlock() diff --git a/app/appService.go b/app/appService.go index 78a7594..046be5f 100644 --- a/app/appService.go +++ b/app/appService.go @@ -19,6 +19,7 @@ const ( type applicationService struct { applicationBridge + acn connectivity.ACN storage map[string]storage.ProfileStore engines map[string]connections.Engine } @@ -30,14 +31,12 @@ type ApplicationService interface { // NewAppService returns an ApplicationService that runs the backend of an app and communicates with a client by the supplied IPCBridge func NewAppService(acn connectivity.ACN, appDirectory string, bridge event.IPCBridge) ApplicationService { - appService := &applicationService{storage: make(map[string]storage.ProfileStore), engines: make(map[string]connections.Engine), applicationBridge: applicationBridge{applicationCore: *newAppCore(acn, appDirectory), bridge: bridge}} + appService := &applicationService{acn: acn, storage: make(map[string]storage.ProfileStore), engines: make(map[string]connections.Engine), applicationBridge: applicationBridge{applicationCore: *newAppCore(appDirectory), bridge: bridge}} appService.handle = appService.handleEvent - // Set up IPC - - // attach to listener go appService.listen() + log.Infoln("Created new App Service") return appService } diff --git a/app/bots/servermon/main.go b/app/bots/servermon/main.go index 58a47cd..7fb8b01 100644 --- a/app/bots/servermon/main.go +++ b/app/bots/servermon/main.go @@ -1,7 +1,8 @@ package main import ( - "cwtch.im/cwtch/event" + app2 "cwtch.im/cwtch/app" + "cwtch.im/cwtch/app/utils" "cwtch.im/cwtch/peer" "cwtch.im/cwtch/protocol/connections" "errors" @@ -46,8 +47,11 @@ func main() { os.Exit(1) } - botPeer := peer.NewCwtchPeer("servermon") - botPeer.Init(acn, event.NewEventManager()) + app := app2.NewApp(acn, ".") + + app.CreatePeer("servermon", "be gay, do crimes") + + botPeer := utils.WaitGetPeer(app, "servermon") fmt.Printf("Connecting to %v...\n", serverAddr) botPeer.JoinServer(serverAddr) diff --git a/app/peer/alice/alice.go b/app/peer/alice/alice.go index 348ebb2..579a1d2 100644 --- a/app/peer/alice/alice.go +++ b/app/peer/alice/alice.go @@ -1,8 +1,9 @@ package main import ( + app2 "cwtch.im/cwtch/app" + "cwtch.im/cwtch/app/utils" "cwtch.im/cwtch/event" - "cwtch.im/cwtch/peer" "git.openprivacy.ca/openprivacy/libricochet-go/connectivity" "git.openprivacy.ca/openprivacy/libricochet-go/log" "os" @@ -21,16 +22,14 @@ func main() { os.Exit(1) } - // Setup the Event Bus to Listen for Data Packets - eventBus := event.NewEventManager() + app := app2.NewApp(acn, ".") + app.CreatePeer("alice", "be gay, do crimes") + alice := utils.WaitGetPeer(app, "alice") + app.LaunchPeers() + eventBus := app.GetEventBus(alice.GetProfile().Onion) queue := event.NewEventQueue(100) eventBus.Subscribe(event.NewMessageFromPeer, queue.EventChannel) - // Setup Alice to Listen for new Events - alice := peer.NewCwtchPeer("alice") - alice.Init(acn, eventBus) - alice.Listen() - // For every new Data Packet Alice received she will Print it out. for { event := queue.Next() diff --git a/app/peer/bob/bob.go b/app/peer/bob/bob.go index 7df4aef..a7bbd1f 100644 --- a/app/peer/bob/bob.go +++ b/app/peer/bob/bob.go @@ -1,8 +1,8 @@ package main import ( - "cwtch.im/cwtch/event" - "cwtch.im/cwtch/peer" + app2 "cwtch.im/cwtch/app" + "cwtch.im/cwtch/app/utils" "git.openprivacy.ca/openprivacy/libricochet-go/connectivity" "git.openprivacy.ca/openprivacy/libricochet-go/log" "os" @@ -21,10 +21,9 @@ func main() { os.Exit(1) } - // Set up the Event Buss and Initialize the Peer - eventBus := event.NewEventManager() - bob := peer.NewCwtchPeer("bob") - bob.Init(acn, eventBus) + app := app2.NewApp(acn, ".") + app.CreatePeer("bob", "be gay, do crimes") + bob := utils.WaitGetPeer(app, "bob") // Add Alice's Onion Here (It changes run to run) bob.PeerWithOnion("upiztu7myymjf2dn4x4czhagp7axlnqjvf5zwfegbhtpkqb6v3vgu5yd") diff --git a/app/utils/utils.go b/app/utils/utils.go new file mode 100644 index 0000000..dd30d3e --- /dev/null +++ b/app/utils/utils.go @@ -0,0 +1,23 @@ +package utils + +import ( + app2 "cwtch.im/cwtch/app" + "cwtch.im/cwtch/peer" + "time" +) + +// WaitGetPeer is a helper function for utility apps not writen using the event bus +// Proper use of an App is to call CreatePeer and then process the NewPeer event +// however for small utility use, this function which polls the app until the peer is created +// may fill that usecase better +func WaitGetPeer(app app2.Application, name string) peer.CwtchPeer { + for true { + for id, n := range app.ListPeers() { + if n == name { + return app.GetPeer(id) + } + } + time.Sleep(100 * time.Millisecond) + } + return nil +} diff --git a/event/bridge/goChanBridge.go b/event/bridge/goChanBridge.go new file mode 100644 index 0000000..7745092 --- /dev/null +++ b/event/bridge/goChanBridge.go @@ -0,0 +1,57 @@ +package bridge + +import ( + "cwtch.im/cwtch/event" + "sync" +) + +type goChanBridge struct { + in chan event.IPCMessage + out chan event.IPCMessage + closedChan chan bool + closed bool + lock sync.Mutex +} + +// MakeGoChanBridge returns a simple testing IPCBridge made from inprocess go channels +func MakeGoChanBridge() (b1, b2 event.IPCBridge) { + chan1 := make(chan event.IPCMessage) + chan2 := make(chan event.IPCMessage) + closed := make(chan bool) + + a := &goChanBridge{in: chan1, out: chan2, closedChan: closed, closed: false} + b := &goChanBridge{in: chan2, out: chan1, closedChan: closed, closed: false} + + go monitor(a, b) + + return a, b +} + +func monitor(a, b *goChanBridge) { + <-a.closedChan + a.closed = true + b.closed = true + a.closedChan <- true +} + +func (pb *goChanBridge) Read() (message event.IPCMessage, ok bool) { + message, ok = <-pb.in + return +} + +func (pb *goChanBridge) Write(message *event.IPCMessage) { + pb.lock.Lock() + defer pb.lock.Unlock() + if !pb.closed { + pb.out <- *message + } +} + +func (pb *goChanBridge) Shutdown() { + if !pb.closed { + close(pb.in) + close(pb.out) + pb.closedChan <- true + <-pb.closedChan + } +} diff --git a/event/bridge/pipeBridge.go b/event/bridge/pipeBridge.go new file mode 100644 index 0000000..5ac7b01 --- /dev/null +++ b/event/bridge/pipeBridge.go @@ -0,0 +1,94 @@ +package bridge + +import ( + "git.openprivacy.ca/openprivacy/libricochet-go/log" + + "cwtch.im/cwtch/event" + "encoding/json" + "os" + "sync" + "syscall" +) + +/* pipeBridge creates a pair of named pipes + Needs a call to new client and service to fully successfully open +*/ + +type pipeBridge struct { + in, out *os.File + closedChan chan bool + closed bool + lock sync.Mutex +} + +// NewPipeBridgeClient returns a pipe backed IPCBridge for a client +func NewPipeBridgeClient(inFilename, outFilename string) (event.IPCBridge, error) { + log.Debugf("Making new PipeBridge Client...\n") + syscall.Mkfifo(inFilename, 0600) + in, err := os.OpenFile(inFilename, os.O_RDONLY, 0600) + if err != nil { + return nil, err + } + + syscall.Mkfifo(outFilename, 0600) + out, err := os.OpenFile(outFilename, os.O_WRONLY, 0600) + if err != nil { + return nil, err + } + + pb := &pipeBridge{in: in, out: out, closedChan: make(chan bool), closed: false} + log.Debugf("Successfully created new PipeBridge Client!\n") + return pb, nil +} + +// NewPipeBridgeService returns a pipe backed IPCBridge for a service +func NewPipeBridgeService(inFilename, outFilename string) (event.IPCBridge, error) { + log.Debugf("Making new PipeBridge Service...\n") + + syscall.Mkfifo(outFilename, 0600) + out, err := os.OpenFile(outFilename, os.O_WRONLY, 0600) + if err != nil { + return nil, err + } + + syscall.Mkfifo(inFilename, 0600) + in, err := os.OpenFile(inFilename, os.O_RDONLY, 0600) + if err != nil { + return nil, err + } + + pb := &pipeBridge{in: in, out: out, closedChan: make(chan bool), closed: false} + log.Debugf("Successfully created new PipeBridge Service!\n") + return pb, nil +} + +func (pb *pipeBridge) Read() (message event.IPCMessage, ok bool) { + dec := json.NewDecoder(pb.in) + + err := dec.Decode(&message) + + if err != nil { + log.Errorf("Read error: %v", err) + return event.IPCMessage{}, false + } + return message, true +} + +func (pb *pipeBridge) Write(message *event.IPCMessage) { + pb.lock.Lock() + defer pb.lock.Unlock() + if !pb.closed { + messageJSON, _ := json.Marshal(message) + pb.out.Write(messageJSON) + } +} + +func (pb *pipeBridge) Shutdown() { + pb.lock.Lock() + defer pb.lock.Unlock() + if !pb.closed { + pb.in.Close() + pb.out.Close() + pb.closed = true + } +} diff --git a/event/bridge/pipeBridge_test.go b/event/bridge/pipeBridge_test.go new file mode 100644 index 0000000..1f42274 --- /dev/null +++ b/event/bridge/pipeBridge_test.go @@ -0,0 +1,67 @@ +package bridge + +import ( + "cwtch.im/cwtch/event" + "testing" +) + +var ( + clientPipe = "./client" + servicePipe = "./service" +) + +func clientHelper(t *testing.T, in, out string, messageOrig *event.IPCMessage, done chan bool) { + client, err := NewPipeBridgeClient(in, out) + if err != nil { + t.Errorf("Error opening %v pipe: %v", in, err) + done <- true + return + } + + messageAfter, ok := client.Read() + if !ok { + t.Errorf("Reading from client IPCBridge failed") + done <- true + return + } + + if messageOrig.Dest != messageAfter.Dest { + t.Errorf("Dest's value differs expected: %v actaul: %v", messageOrig.Dest, messageAfter.Dest) + } + + if messageOrig.Message.EventType != messageAfter.Message.EventType { + t.Errorf("EventTypes's value differs expected: %v actaul: %v", messageOrig.Message.EventType, messageAfter.Message.EventType) + } + + if messageOrig.Message.Data[event.Identity] != messageAfter.Message.Data[event.Identity] { + t.Errorf("Data[Identity]'s value differs expected: %v actaul: %v", messageOrig.Message.Data[event.Identity], messageAfter.Message.Data[event.Identity]) + } + + done <- true +} + +func serviceHelper(t *testing.T, in, out string, messageOrig *event.IPCMessage, done chan bool) { + service, err := NewPipeBridgeService(in, out) + if err != nil { + t.Errorf("Error opening %v pipe: %v", in, err) + done <- true + return + } + + service.Write(messageOrig) + + done <- true +} + +func TestPipeBridge(t *testing.T) { + + messageOrig := &event.IPCMessage{Dest: "ABC", Message: event.NewEventList(event.NewPeer, event.Identity, "It is I")} + serviceDone := make(chan bool) + clientDone := make(chan bool) + + go clientHelper(t, clientPipe, servicePipe, messageOrig, clientDone) + go serviceHelper(t, servicePipe, clientPipe, messageOrig, serviceDone) + + <-serviceDone + <-clientDone +} diff --git a/event/ipc.go b/event/ipc.go index 9357c05..57244bb 100644 --- a/event/ipc.go +++ b/event/ipc.go @@ -1,71 +1,14 @@ package event -import ( - "git.openprivacy.ca/openprivacy/libricochet-go/log" - "sync" -) - // IPCMessage is a wrapper for a regular eventMessage with a destination (onion|AppDest) so the other side of the bridge can route appropriately type IPCMessage struct { Dest string Message Event } -type pipeBridge struct { - in chan IPCMessage - out chan IPCMessage - closedChan chan bool - closed bool - lock sync.Mutex -} - // IPCBridge is an interface to a IPC construct used to communicate IPCMessages type IPCBridge interface { Read() (IPCMessage, bool) Write(message *IPCMessage) Shutdown() } - -// MakePipeBridge returns a simple testing IPCBridge made from inprocess go channels -func MakePipeBridge() (b1, b2 IPCBridge) { - chan1 := make(chan IPCMessage) - chan2 := make(chan IPCMessage) - closed := make(chan bool) - - a := &pipeBridge{in: chan1, out: chan2, closedChan: closed, closed: false} - b := &pipeBridge{in: chan2, out: chan1, closedChan: closed, closed: false} - - go monitor(a, b) - - return a, b -} - -func monitor(a, b *pipeBridge) { - <-a.closedChan - a.closed = true - b.closed = true - a.closedChan <- true -} - -func (pb *pipeBridge) Read() (message IPCMessage, ok bool) { - message, ok = <-pb.in - return -} - -func (pb *pipeBridge) Write(message *IPCMessage) { - pb.lock.Lock() - defer pb.lock.Unlock() - log.Infof("pb.Write: %v\n", message) - if !pb.closed { - pb.out <- *message - } -} - -func (pb *pipeBridge) Shutdown() { - if !pb.closed { - close(pb.in) - close(pb.out) - pb.closedChan <- true - <-pb.closedChan - } -} diff --git a/peer/cwtch_peer.go b/peer/cwtch_peer.go index 8a83dc3..9fdae3f 100644 --- a/peer/cwtch_peer.go +++ b/peer/cwtch_peer.go @@ -9,7 +9,6 @@ import ( "encoding/base64" "encoding/json" "errors" - "git.openprivacy.ca/openprivacy/libricochet-go/connectivity" "git.openprivacy.ca/openprivacy/libricochet-go/log" "github.com/golang/protobuf/proto" "strings" @@ -31,7 +30,7 @@ type cwtchPeer struct { // CwtchPeer provides us with a way of testing systems built on top of cwtch without having to // directly implement a cwtchPeer. type CwtchPeer interface { - Init(connectivity.ACN, event.Manager) + Init(event.Manager) PeerWithOnion(string) *connections.PeerPeerConnection InviteOnionToGroup(string, string) error SendMessageToPeer(string, string) string @@ -83,7 +82,7 @@ func FromProfile(profile *model.Profile) CwtchPeer { } // Init instantiates a cwtchPeer -func (cp *cwtchPeer) Init(acn connectivity.ACN, eventBus event.Manager) { +func (cp *cwtchPeer) Init(eventBus event.Manager) { cp.queue = event.NewEventQueue(100) go cp.eventHandler() @@ -104,8 +103,9 @@ func (cp *cwtchPeer) ImportGroup(exportedInvite string) (groupID string, err err if err == nil { jsobj, err := proto.Marshal(cpp.GetGroupChatInvite()) if err == nil { + b64obj := base64.StdEncoding.EncodeToString(jsobj) cp.eventBus.Publish(event.NewEvent(event.NewGroupInvite, map[event.Field]string{ - event.GroupInvite: string(jsobj), + event.GroupInvite: b64obj, })) } else { log.Errorf("error serializing group: %v", err) @@ -305,6 +305,7 @@ func (cp *cwtchPeer) StartGroupConnections() { // Only send a join server packet if we haven't joined this server yet... group := cp.GetGroup(groupID) if joined := joinedServers[groupID]; group.Accepted && !joined { + log.Infof("Join Server %v (%v)\n", group.GroupServer, joined) cp.JoinServer(group.GroupServer) joinedServers[group.GroupServer] = true } @@ -334,7 +335,15 @@ func (cp *cwtchPeer) eventHandler() { } case event.NewGroupInvite: var groupInvite protocol.GroupChatInvite - proto.Unmarshal([]byte(ev.Data[event.GroupInvite]), &groupInvite) + json, err := base64.StdEncoding.DecodeString(ev.Data[event.GroupInvite]) + if err != nil { + log.Errorf("NewGroupInvite could not base64 decode invite: %v\n", err) + continue + } + err = proto.Unmarshal([]byte(json), &groupInvite) + if err != nil { + log.Errorf("NewGroupInvite could not json decode invite: %v\n", err) + } cp.Profile.ProcessInvite(&groupInvite, ev.Data[event.RemotePeer]) case event.PeerStateChange: if _, exists := cp.Profile.Contacts[ev.Data[event.RemotePeer]]; exists { diff --git a/protocol/connections/connectionsmanager.go b/protocol/connections/connectionsmanager.go index bb7730b..4856982 100644 --- a/protocol/connections/connectionsmanager.go +++ b/protocol/connections/connectionsmanager.go @@ -3,6 +3,8 @@ package connections import ( "cwtch.im/cwtch/protocol" "git.openprivacy.ca/openprivacy/libricochet-go/connectivity" + "git.openprivacy.ca/openprivacy/libricochet-go/log" + "sync" "time" ) @@ -42,11 +44,21 @@ func (m *Manager) ManagePeerConnection(host string, engine Engine) *PeerPeerConn } // ManageServerConnection creates a new ServerConnection for Host with the given callback handler. +// If there is an establish connection, it is replaced with a new one, assuming this came from +// a new JoinServer from a new Group being joined. If it is still connecting to a server, the second request will be abandonded func (m *Manager) ManageServerConnection(host string, engine Engine, messageHandler func(string, *protocol.GroupMessage), closedHandler func(string)) { m.lock.Lock() + defer m.lock.Unlock() psc, exists := m.serverConnections[host] + if exists { + if psc.GetState() == DISCONNECTED || psc.GetState() == CONNECTING || psc.GetState() == CONNECTED { + log.Infof("Already connecting to %v, abandoning fresh attempt\n", host) + return + } + } + newPsc := NewPeerServerConnection(engine, host) newPsc.GroupMessageHandler = messageHandler newPsc.CloseHandler = closedHandler @@ -54,10 +66,9 @@ func (m *Manager) ManageServerConnection(host string, engine Engine, messageHand m.serverConnections[host] = newPsc if exists { + log.Infof("Closing connection to %v, replacing with this one\n", host) psc.Close() } - - m.lock.Unlock() } // GetPeerPeerConnectionForOnion safely returns a given peer connection diff --git a/protocol/connections/engine.go b/protocol/connections/engine.go index 02695c4..b666183 100644 --- a/protocol/connections/engine.go +++ b/protocol/connections/engine.go @@ -5,6 +5,7 @@ import ( "cwtch.im/cwtch/event" "cwtch.im/cwtch/protocol" "cwtch.im/cwtch/protocol/connections/peer" + "encoding/base64" "errors" "git.openprivacy.ca/openprivacy/libricochet-go/application" "git.openprivacy.ca/openprivacy/libricochet-go/channels" @@ -241,6 +242,7 @@ func (e *engine) receiveGroupMessage(server string, gm *protocol.GroupMessage) { // finishedFetch is a callback function the processes the termination of a fetch channel from a given server func (e *engine) finishedFetch(server string) { + log.Debugf("Finished Fetch for %v\n", server) e.eventManager.Publish(event.NewEvent(event.FinishedFetch, map[event.Field]string{event.GroupServer: server})) } @@ -290,7 +292,8 @@ func (cph *CwtchPeerHandler) HandleGroupInvite(gci *protocol.GroupChatInvite) { log.Debugf("Received GroupID from %v %v\n", cph.Onion, gci.String()) marshal, err := proto.Marshal(gci) if err == nil { - cph.EventBus.Publish(event.NewEvent(event.NewGroupInvite, map[event.Field]string{event.TimestampReceived: time.Now().Format(time.RFC3339Nano), event.RemotePeer: cph.Onion, event.GroupInvite: string(marshal)})) + marshalb64 := base64.StdEncoding.EncodeToString(marshal) + cph.EventBus.Publish(event.NewEvent(event.NewGroupInvite, map[event.Field]string{event.TimestampReceived: time.Now().Format(time.RFC3339Nano), event.RemotePeer: cph.Onion, event.GroupInvite: marshalb64})) } } diff --git a/protocol/connections/peerserverconnection.go b/protocol/connections/peerserverconnection.go index 6325820..e6bad78 100644 --- a/protocol/connections/peerserverconnection.go +++ b/protocol/connections/peerserverconnection.go @@ -34,6 +34,7 @@ func NewPeerServerConnection(engine Engine, serverhostname string) *PeerServerCo psc := new(PeerServerConnection) psc.protocolEngine = engine psc.Server = serverhostname + psc.setState(DISCONNECTED) psc.Init() return psc } @@ -44,6 +45,7 @@ func (psc *PeerServerConnection) GetState() ConnectionState { } func (psc *PeerServerConnection) setState(state ConnectionState) { + log.Debugf("Setting State to %v for %v\n", ConnectionStateName[state], psc.Server) psc.state = state psc.protocolEngine.EventManager().Publish(event.NewEvent(event.ServerStateChange, map[event.Field]string{ event.GroupServer: string(psc.Server), @@ -64,14 +66,22 @@ func (psc *PeerServerConnection) WaitTilAuthenticated() { // Run manages the setup and teardown of a peer server connection func (psc *PeerServerConnection) Run() error { log.Infof("Connecting to %v", psc.Server) + psc.setState(CONNECTING) + rc, err := goricochet.Open(psc.protocolEngine.ACN(), psc.Server) if err == nil { psc.connection = rc + if psc.GetState() == KILLED { + return nil + } psc.setState(CONNECTED) pub, priv, err := ed25519.GenerateKey(rand.Reader) if err == nil { _, err := connection.HandleOutboundConnection(psc.connection).ProcessAuthAsV3Client(identity.InitializeV3("cwtchpeer", &priv, &pub)) if err == nil { + if psc.GetState() == KILLED { + return nil + } psc.setState(AUTHENTICATED) go func() { diff --git a/storage/profile_store.go b/storage/profile_store.go index 7d9fe05..bdd9084 100644 --- a/storage/profile_store.go +++ b/storage/profile_store.go @@ -4,6 +4,7 @@ import ( "cwtch.im/cwtch/event" "cwtch.im/cwtch/model" "cwtch.im/cwtch/protocol" + "encoding/base64" "encoding/json" "git.openprivacy.ca/openprivacy/libricochet-go/log" "github.com/golang/protobuf/proto" @@ -173,7 +174,8 @@ func (ps *profileStore) eventHandler() { } case event.NewGroupInvite: var gci protocol.GroupChatInvite - err := proto.Unmarshal([]byte(ev.Data[event.GroupInvite]), &gci) + json, _ := base64.StdEncoding.DecodeString(ev.Data[event.GroupInvite]) + err := proto.Unmarshal([]byte(json), &gci) if err == nil { ps.profile.ProcessInvite(&gci, ev.Data[event.RemotePeer]) ps.save() diff --git a/testing/cwtch_peer_server_integration_test.go b/testing/cwtch_peer_server_integration_test.go index 2e9c841..fcdc246 100644 --- a/testing/cwtch_peer_server_integration_test.go +++ b/testing/cwtch_peer_server_integration_test.go @@ -2,7 +2,8 @@ package testing import ( app2 "cwtch.im/cwtch/app" - "cwtch.im/cwtch/event" + "cwtch.im/cwtch/app/utils" + "cwtch.im/cwtch/event/bridge" "cwtch.im/cwtch/model" "cwtch.im/cwtch/peer" "cwtch.im/cwtch/protocol/connections" @@ -101,23 +102,14 @@ func waitForPeerPeerConnection(t *testing.T, peera peer.CwtchPeer, peerb peer.Cw return } -func waitGetPeer(app app2.Application, name string) peer.CwtchPeer { - for true { - for id, n := range app.ListPeers() { - if n == name { - return app.GetPeer(id) - } - } - time.Sleep(100 * time.Millisecond) - } - return nil -} - func TestCwtchPeerIntegration(t *testing.T) { numGoRoutinesStart := runtime.NumGoroutine() log.AddEverythingFromPattern("connectivity") - //log.SetLevel(log.LevelDebug) + log.SetLevel(log.LevelDebug) + log.ExcludeFromPattern("connection/connection") + log.ExcludeFromPattern("outbound/3dhauthchannel") + log.ExcludeFromPattern("event/eventmanager") acn, err := connectivity.StartTor(".", "") if err != nil { t.Fatalf("Could not start Tor: %v", err) @@ -150,8 +142,8 @@ func TestCwtchPeerIntegration(t *testing.T) { app := app2.NewApp(acn, "./storage") - bridge1, bridge2 := event.MakePipeBridge() - appClient := app2.NewAppClient(acn, "./storage", bridge1) + bridge1, bridge2 := bridge.MakeGoChanBridge() + appClient := app2.NewAppClient("./storage", bridge1) appService := app2.NewAppService(acn, "./storage", bridge2) numGoRoutinesPostAppStart := runtime.NumGoroutine() @@ -167,13 +159,13 @@ func TestCwtchPeerIntegration(t *testing.T) { fmt.Println("Creating Carol...") appClient.CreatePeer("carol", "asdfasdf") - alice := waitGetPeer(app, "alice") + alice := utils.WaitGetPeer(app, "alice") fmt.Println("Alice created:", alice.GetProfile().Onion) - bob := waitGetPeer(app, "bob") + bob := utils.WaitGetPeer(app, "bob") fmt.Println("Bob created:", bob.GetProfile().Onion) - carol := waitGetPeer(appClient, "carol") + carol := utils.WaitGetPeer(appClient, "carol") fmt.Println("Carol created:", carol.GetProfile().Onion) //fmt.Println("Carol created:", carol.GetProfile().Onion)