adding named pipe IPC pipe for use with app client/service; some adjustments to app client/service based on usage by UI; bug fixes: groupInvite json over ipc pipe using json was bugged, 'fixed' with base64 encoding; fixed race condition with peer server connection creation
This commit is contained in:
parent
44993d00fd
commit
e1d6dd7253
12
app/app.go
12
app/app.go
|
@ -20,7 +20,6 @@ import (
|
|||
type applicationCore struct {
|
||||
eventBuses map[string]event.Manager
|
||||
|
||||
acn connectivity.ACN
|
||||
directory string
|
||||
mutex sync.Mutex
|
||||
}
|
||||
|
@ -32,6 +31,7 @@ type applicationPeers struct {
|
|||
type application struct {
|
||||
applicationCore
|
||||
applicationPeers
|
||||
acn connectivity.ACN
|
||||
storage map[string]storage.ProfileStore
|
||||
engines map[string]connections.Engine
|
||||
appBus event.Manager
|
||||
|
@ -56,8 +56,8 @@ type Application interface {
|
|||
// LoadProfileFn is the function signature for a function in an app that loads a profile
|
||||
type LoadProfileFn func(profile *model.Profile, store storage.ProfileStore)
|
||||
|
||||
func newAppCore(acn connectivity.ACN, appDirectory string) *applicationCore {
|
||||
appCore := &applicationCore{eventBuses: make(map[string]event.Manager), directory: appDirectory, acn: acn}
|
||||
func newAppCore(appDirectory string) *applicationCore {
|
||||
appCore := &applicationCore{eventBuses: make(map[string]event.Manager), directory: appDirectory}
|
||||
os.MkdirAll(path.Join(appCore.directory, "profiles"), 0700)
|
||||
return appCore
|
||||
}
|
||||
|
@ -65,7 +65,7 @@ func newAppCore(acn connectivity.ACN, appDirectory string) *applicationCore {
|
|||
// NewApp creates a new app with some environment awareness and initializes a Tor Manager
|
||||
func NewApp(acn connectivity.ACN, appDirectory string) Application {
|
||||
log.Debugf("NewApp(%v)\n", appDirectory)
|
||||
app := &application{storage: make(map[string]storage.ProfileStore), applicationPeers: applicationPeers{peers: make(map[string]peer.CwtchPeer)}, engines: make(map[string]connections.Engine), applicationCore: *newAppCore(acn, appDirectory), appBus: event.NewEventManager()}
|
||||
app := &application{acn: acn, storage: make(map[string]storage.ProfileStore), applicationPeers: applicationPeers{peers: make(map[string]peer.CwtchPeer)}, engines: make(map[string]connections.Engine), applicationCore: *newAppCore(appDirectory), appBus: event.NewEventManager()}
|
||||
return app
|
||||
}
|
||||
|
||||
|
@ -102,7 +102,7 @@ func (app *application) CreatePeer(name string, password string) {
|
|||
|
||||
pc := app.storage[profile.Onion].GetProfileCopy()
|
||||
peer := peer.FromProfile(pc)
|
||||
peer.Init(app.acn, app.eventBuses[profile.Onion])
|
||||
peer.Init(app.eventBuses[profile.Onion])
|
||||
|
||||
blockedPeers := profile.BlockedPeers()
|
||||
// TODO: Would be nice if ProtocolEngine did not need to explicitly be given the Private Key.
|
||||
|
@ -153,7 +153,7 @@ func (ac *applicationCore) LoadProfiles(password string, loadProfileFn LoadProfi
|
|||
func (app *application) LoadProfiles(password string) {
|
||||
app.applicationCore.LoadProfiles(password, func(profile *model.Profile, profileStore storage.ProfileStore) {
|
||||
peer := peer.FromProfile(profile)
|
||||
peer.Init(app.acn, app.eventBuses[profile.Onion])
|
||||
peer.Init(app.eventBuses[profile.Onion])
|
||||
|
||||
blockedPeers := profile.BlockedPeers()
|
||||
identity := identity.InitializeV3(profile.Name, &profile.Ed25519PrivateKey, &profile.Ed25519PublicKey)
|
||||
|
|
|
@ -5,7 +5,6 @@ import (
|
|||
"cwtch.im/cwtch/peer"
|
||||
"cwtch.im/cwtch/storage"
|
||||
"fmt"
|
||||
"git.openprivacy.ca/openprivacy/libricochet-go/connectivity"
|
||||
"git.openprivacy.ca/openprivacy/libricochet-go/log"
|
||||
|
||||
"path"
|
||||
|
@ -19,12 +18,13 @@ type applicationClient struct {
|
|||
}
|
||||
|
||||
// NewAppClient returns an Application that acts as a client to a AppService, connected by the IPCBridge supplied
|
||||
func NewAppClient(acn connectivity.ACN, appDirectory string, bridge event.IPCBridge) Application {
|
||||
appClient := &applicationClient{applicationPeers: applicationPeers{peers: make(map[string]peer.CwtchPeer)}, applicationBridge: applicationBridge{applicationCore: *newAppCore(acn, appDirectory), bridge: bridge}, appBus: event.NewEventManager()}
|
||||
func NewAppClient(appDirectory string, bridge event.IPCBridge) Application {
|
||||
appClient := &applicationClient{applicationPeers: applicationPeers{peers: make(map[string]peer.CwtchPeer)}, applicationBridge: applicationBridge{applicationCore: *newAppCore(appDirectory), bridge: bridge}, appBus: event.NewEventManager()}
|
||||
appClient.handle = appClient.handleEvent
|
||||
|
||||
go appClient.listen()
|
||||
|
||||
log.Infoln("Created new App Client")
|
||||
return appClient
|
||||
}
|
||||
|
||||
|
@ -63,7 +63,7 @@ func (ac *applicationClient) newPeer(localID, password string) {
|
|||
|
||||
eventBus := event.NewIPCEventManager(ac.bridge, profile.Onion)
|
||||
peer := peer.FromProfile(profile)
|
||||
peer.Init(ac.acn, eventBus)
|
||||
peer.Init(eventBus)
|
||||
|
||||
ac.mutex.Lock()
|
||||
defer ac.mutex.Unlock()
|
||||
|
|
|
@ -19,6 +19,7 @@ const (
|
|||
type applicationService struct {
|
||||
applicationBridge
|
||||
|
||||
acn connectivity.ACN
|
||||
storage map[string]storage.ProfileStore
|
||||
engines map[string]connections.Engine
|
||||
}
|
||||
|
@ -30,14 +31,12 @@ type ApplicationService interface {
|
|||
|
||||
// NewAppService returns an ApplicationService that runs the backend of an app and communicates with a client by the supplied IPCBridge
|
||||
func NewAppService(acn connectivity.ACN, appDirectory string, bridge event.IPCBridge) ApplicationService {
|
||||
appService := &applicationService{storage: make(map[string]storage.ProfileStore), engines: make(map[string]connections.Engine), applicationBridge: applicationBridge{applicationCore: *newAppCore(acn, appDirectory), bridge: bridge}}
|
||||
appService := &applicationService{acn: acn, storage: make(map[string]storage.ProfileStore), engines: make(map[string]connections.Engine), applicationBridge: applicationBridge{applicationCore: *newAppCore(appDirectory), bridge: bridge}}
|
||||
appService.handle = appService.handleEvent
|
||||
|
||||
// Set up IPC
|
||||
|
||||
// attach to listener
|
||||
go appService.listen()
|
||||
|
||||
log.Infoln("Created new App Service")
|
||||
return appService
|
||||
}
|
||||
|
||||
|
|
|
@ -1,7 +1,8 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"cwtch.im/cwtch/event"
|
||||
app2 "cwtch.im/cwtch/app"
|
||||
"cwtch.im/cwtch/app/utils"
|
||||
"cwtch.im/cwtch/peer"
|
||||
"cwtch.im/cwtch/protocol/connections"
|
||||
"errors"
|
||||
|
@ -46,8 +47,11 @@ func main() {
|
|||
os.Exit(1)
|
||||
}
|
||||
|
||||
botPeer := peer.NewCwtchPeer("servermon")
|
||||
botPeer.Init(acn, event.NewEventManager())
|
||||
app := app2.NewApp(acn, ".")
|
||||
|
||||
app.CreatePeer("servermon", "be gay, do crimes")
|
||||
|
||||
botPeer := utils.WaitGetPeer(app, "servermon")
|
||||
|
||||
fmt.Printf("Connecting to %v...\n", serverAddr)
|
||||
botPeer.JoinServer(serverAddr)
|
||||
|
|
|
@ -1,8 +1,9 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
app2 "cwtch.im/cwtch/app"
|
||||
"cwtch.im/cwtch/app/utils"
|
||||
"cwtch.im/cwtch/event"
|
||||
"cwtch.im/cwtch/peer"
|
||||
"git.openprivacy.ca/openprivacy/libricochet-go/connectivity"
|
||||
"git.openprivacy.ca/openprivacy/libricochet-go/log"
|
||||
"os"
|
||||
|
@ -21,16 +22,14 @@ func main() {
|
|||
os.Exit(1)
|
||||
}
|
||||
|
||||
// Setup the Event Bus to Listen for Data Packets
|
||||
eventBus := event.NewEventManager()
|
||||
app := app2.NewApp(acn, ".")
|
||||
app.CreatePeer("alice", "be gay, do crimes")
|
||||
alice := utils.WaitGetPeer(app, "alice")
|
||||
app.LaunchPeers()
|
||||
eventBus := app.GetEventBus(alice.GetProfile().Onion)
|
||||
queue := event.NewEventQueue(100)
|
||||
eventBus.Subscribe(event.NewMessageFromPeer, queue.EventChannel)
|
||||
|
||||
// Setup Alice to Listen for new Events
|
||||
alice := peer.NewCwtchPeer("alice")
|
||||
alice.Init(acn, eventBus)
|
||||
alice.Listen()
|
||||
|
||||
// For every new Data Packet Alice received she will Print it out.
|
||||
for {
|
||||
event := queue.Next()
|
||||
|
|
|
@ -1,8 +1,8 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"cwtch.im/cwtch/event"
|
||||
"cwtch.im/cwtch/peer"
|
||||
app2 "cwtch.im/cwtch/app"
|
||||
"cwtch.im/cwtch/app/utils"
|
||||
"git.openprivacy.ca/openprivacy/libricochet-go/connectivity"
|
||||
"git.openprivacy.ca/openprivacy/libricochet-go/log"
|
||||
"os"
|
||||
|
@ -21,10 +21,9 @@ func main() {
|
|||
os.Exit(1)
|
||||
}
|
||||
|
||||
// Set up the Event Buss and Initialize the Peer
|
||||
eventBus := event.NewEventManager()
|
||||
bob := peer.NewCwtchPeer("bob")
|
||||
bob.Init(acn, eventBus)
|
||||
app := app2.NewApp(acn, ".")
|
||||
app.CreatePeer("bob", "be gay, do crimes")
|
||||
bob := utils.WaitGetPeer(app, "bob")
|
||||
|
||||
// Add Alice's Onion Here (It changes run to run)
|
||||
bob.PeerWithOnion("upiztu7myymjf2dn4x4czhagp7axlnqjvf5zwfegbhtpkqb6v3vgu5yd")
|
||||
|
|
|
@ -0,0 +1,23 @@
|
|||
package utils
|
||||
|
||||
import (
|
||||
app2 "cwtch.im/cwtch/app"
|
||||
"cwtch.im/cwtch/peer"
|
||||
"time"
|
||||
)
|
||||
|
||||
// WaitGetPeer is a helper function for utility apps not writen using the event bus
|
||||
// Proper use of an App is to call CreatePeer and then process the NewPeer event
|
||||
// however for small utility use, this function which polls the app until the peer is created
|
||||
// may fill that usecase better
|
||||
func WaitGetPeer(app app2.Application, name string) peer.CwtchPeer {
|
||||
for true {
|
||||
for id, n := range app.ListPeers() {
|
||||
if n == name {
|
||||
return app.GetPeer(id)
|
||||
}
|
||||
}
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
}
|
||||
return nil
|
||||
}
|
|
@ -0,0 +1,57 @@
|
|||
package bridge
|
||||
|
||||
import (
|
||||
"cwtch.im/cwtch/event"
|
||||
"sync"
|
||||
)
|
||||
|
||||
type goChanBridge struct {
|
||||
in chan event.IPCMessage
|
||||
out chan event.IPCMessage
|
||||
closedChan chan bool
|
||||
closed bool
|
||||
lock sync.Mutex
|
||||
}
|
||||
|
||||
// MakeGoChanBridge returns a simple testing IPCBridge made from inprocess go channels
|
||||
func MakeGoChanBridge() (b1, b2 event.IPCBridge) {
|
||||
chan1 := make(chan event.IPCMessage)
|
||||
chan2 := make(chan event.IPCMessage)
|
||||
closed := make(chan bool)
|
||||
|
||||
a := &goChanBridge{in: chan1, out: chan2, closedChan: closed, closed: false}
|
||||
b := &goChanBridge{in: chan2, out: chan1, closedChan: closed, closed: false}
|
||||
|
||||
go monitor(a, b)
|
||||
|
||||
return a, b
|
||||
}
|
||||
|
||||
func monitor(a, b *goChanBridge) {
|
||||
<-a.closedChan
|
||||
a.closed = true
|
||||
b.closed = true
|
||||
a.closedChan <- true
|
||||
}
|
||||
|
||||
func (pb *goChanBridge) Read() (message event.IPCMessage, ok bool) {
|
||||
message, ok = <-pb.in
|
||||
return
|
||||
}
|
||||
|
||||
func (pb *goChanBridge) Write(message *event.IPCMessage) {
|
||||
pb.lock.Lock()
|
||||
defer pb.lock.Unlock()
|
||||
if !pb.closed {
|
||||
pb.out <- *message
|
||||
}
|
||||
}
|
||||
|
||||
func (pb *goChanBridge) Shutdown() {
|
||||
if !pb.closed {
|
||||
close(pb.in)
|
||||
close(pb.out)
|
||||
pb.closedChan <- true
|
||||
<-pb.closedChan
|
||||
}
|
||||
}
|
|
@ -0,0 +1,94 @@
|
|||
package bridge
|
||||
|
||||
import (
|
||||
"git.openprivacy.ca/openprivacy/libricochet-go/log"
|
||||
|
||||
"cwtch.im/cwtch/event"
|
||||
"encoding/json"
|
||||
"os"
|
||||
"sync"
|
||||
"syscall"
|
||||
)
|
||||
|
||||
/* pipeBridge creates a pair of named pipes
|
||||
Needs a call to new client and service to fully successfully open
|
||||
*/
|
||||
|
||||
type pipeBridge struct {
|
||||
in, out *os.File
|
||||
closedChan chan bool
|
||||
closed bool
|
||||
lock sync.Mutex
|
||||
}
|
||||
|
||||
// NewPipeBridgeClient returns a pipe backed IPCBridge for a client
|
||||
func NewPipeBridgeClient(inFilename, outFilename string) (event.IPCBridge, error) {
|
||||
log.Debugf("Making new PipeBridge Client...\n")
|
||||
syscall.Mkfifo(inFilename, 0600)
|
||||
in, err := os.OpenFile(inFilename, os.O_RDONLY, 0600)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
syscall.Mkfifo(outFilename, 0600)
|
||||
out, err := os.OpenFile(outFilename, os.O_WRONLY, 0600)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
pb := &pipeBridge{in: in, out: out, closedChan: make(chan bool), closed: false}
|
||||
log.Debugf("Successfully created new PipeBridge Client!\n")
|
||||
return pb, nil
|
||||
}
|
||||
|
||||
// NewPipeBridgeService returns a pipe backed IPCBridge for a service
|
||||
func NewPipeBridgeService(inFilename, outFilename string) (event.IPCBridge, error) {
|
||||
log.Debugf("Making new PipeBridge Service...\n")
|
||||
|
||||
syscall.Mkfifo(outFilename, 0600)
|
||||
out, err := os.OpenFile(outFilename, os.O_WRONLY, 0600)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
syscall.Mkfifo(inFilename, 0600)
|
||||
in, err := os.OpenFile(inFilename, os.O_RDONLY, 0600)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
pb := &pipeBridge{in: in, out: out, closedChan: make(chan bool), closed: false}
|
||||
log.Debugf("Successfully created new PipeBridge Service!\n")
|
||||
return pb, nil
|
||||
}
|
||||
|
||||
func (pb *pipeBridge) Read() (message event.IPCMessage, ok bool) {
|
||||
dec := json.NewDecoder(pb.in)
|
||||
|
||||
err := dec.Decode(&message)
|
||||
|
||||
if err != nil {
|
||||
log.Errorf("Read error: %v", err)
|
||||
return event.IPCMessage{}, false
|
||||
}
|
||||
return message, true
|
||||
}
|
||||
|
||||
func (pb *pipeBridge) Write(message *event.IPCMessage) {
|
||||
pb.lock.Lock()
|
||||
defer pb.lock.Unlock()
|
||||
if !pb.closed {
|
||||
messageJSON, _ := json.Marshal(message)
|
||||
pb.out.Write(messageJSON)
|
||||
}
|
||||
}
|
||||
|
||||
func (pb *pipeBridge) Shutdown() {
|
||||
pb.lock.Lock()
|
||||
defer pb.lock.Unlock()
|
||||
if !pb.closed {
|
||||
pb.in.Close()
|
||||
pb.out.Close()
|
||||
pb.closed = true
|
||||
}
|
||||
}
|
|
@ -0,0 +1,67 @@
|
|||
package bridge
|
||||
|
||||
import (
|
||||
"cwtch.im/cwtch/event"
|
||||
"testing"
|
||||
)
|
||||
|
||||
var (
|
||||
clientPipe = "./client"
|
||||
servicePipe = "./service"
|
||||
)
|
||||
|
||||
func clientHelper(t *testing.T, in, out string, messageOrig *event.IPCMessage, done chan bool) {
|
||||
client, err := NewPipeBridgeClient(in, out)
|
||||
if err != nil {
|
||||
t.Errorf("Error opening %v pipe: %v", in, err)
|
||||
done <- true
|
||||
return
|
||||
}
|
||||
|
||||
messageAfter, ok := client.Read()
|
||||
if !ok {
|
||||
t.Errorf("Reading from client IPCBridge failed")
|
||||
done <- true
|
||||
return
|
||||
}
|
||||
|
||||
if messageOrig.Dest != messageAfter.Dest {
|
||||
t.Errorf("Dest's value differs expected: %v actaul: %v", messageOrig.Dest, messageAfter.Dest)
|
||||
}
|
||||
|
||||
if messageOrig.Message.EventType != messageAfter.Message.EventType {
|
||||
t.Errorf("EventTypes's value differs expected: %v actaul: %v", messageOrig.Message.EventType, messageAfter.Message.EventType)
|
||||
}
|
||||
|
||||
if messageOrig.Message.Data[event.Identity] != messageAfter.Message.Data[event.Identity] {
|
||||
t.Errorf("Data[Identity]'s value differs expected: %v actaul: %v", messageOrig.Message.Data[event.Identity], messageAfter.Message.Data[event.Identity])
|
||||
}
|
||||
|
||||
done <- true
|
||||
}
|
||||
|
||||
func serviceHelper(t *testing.T, in, out string, messageOrig *event.IPCMessage, done chan bool) {
|
||||
service, err := NewPipeBridgeService(in, out)
|
||||
if err != nil {
|
||||
t.Errorf("Error opening %v pipe: %v", in, err)
|
||||
done <- true
|
||||
return
|
||||
}
|
||||
|
||||
service.Write(messageOrig)
|
||||
|
||||
done <- true
|
||||
}
|
||||
|
||||
func TestPipeBridge(t *testing.T) {
|
||||
|
||||
messageOrig := &event.IPCMessage{Dest: "ABC", Message: event.NewEventList(event.NewPeer, event.Identity, "It is I")}
|
||||
serviceDone := make(chan bool)
|
||||
clientDone := make(chan bool)
|
||||
|
||||
go clientHelper(t, clientPipe, servicePipe, messageOrig, clientDone)
|
||||
go serviceHelper(t, servicePipe, clientPipe, messageOrig, serviceDone)
|
||||
|
||||
<-serviceDone
|
||||
<-clientDone
|
||||
}
|
57
event/ipc.go
57
event/ipc.go
|
@ -1,71 +1,14 @@
|
|||
package event
|
||||
|
||||
import (
|
||||
"git.openprivacy.ca/openprivacy/libricochet-go/log"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// IPCMessage is a wrapper for a regular eventMessage with a destination (onion|AppDest) so the other side of the bridge can route appropriately
|
||||
type IPCMessage struct {
|
||||
Dest string
|
||||
Message Event
|
||||
}
|
||||
|
||||
type pipeBridge struct {
|
||||
in chan IPCMessage
|
||||
out chan IPCMessage
|
||||
closedChan chan bool
|
||||
closed bool
|
||||
lock sync.Mutex
|
||||
}
|
||||
|
||||
// IPCBridge is an interface to a IPC construct used to communicate IPCMessages
|
||||
type IPCBridge interface {
|
||||
Read() (IPCMessage, bool)
|
||||
Write(message *IPCMessage)
|
||||
Shutdown()
|
||||
}
|
||||
|
||||
// MakePipeBridge returns a simple testing IPCBridge made from inprocess go channels
|
||||
func MakePipeBridge() (b1, b2 IPCBridge) {
|
||||
chan1 := make(chan IPCMessage)
|
||||
chan2 := make(chan IPCMessage)
|
||||
closed := make(chan bool)
|
||||
|
||||
a := &pipeBridge{in: chan1, out: chan2, closedChan: closed, closed: false}
|
||||
b := &pipeBridge{in: chan2, out: chan1, closedChan: closed, closed: false}
|
||||
|
||||
go monitor(a, b)
|
||||
|
||||
return a, b
|
||||
}
|
||||
|
||||
func monitor(a, b *pipeBridge) {
|
||||
<-a.closedChan
|
||||
a.closed = true
|
||||
b.closed = true
|
||||
a.closedChan <- true
|
||||
}
|
||||
|
||||
func (pb *pipeBridge) Read() (message IPCMessage, ok bool) {
|
||||
message, ok = <-pb.in
|
||||
return
|
||||
}
|
||||
|
||||
func (pb *pipeBridge) Write(message *IPCMessage) {
|
||||
pb.lock.Lock()
|
||||
defer pb.lock.Unlock()
|
||||
log.Infof("pb.Write: %v\n", message)
|
||||
if !pb.closed {
|
||||
pb.out <- *message
|
||||
}
|
||||
}
|
||||
|
||||
func (pb *pipeBridge) Shutdown() {
|
||||
if !pb.closed {
|
||||
close(pb.in)
|
||||
close(pb.out)
|
||||
pb.closedChan <- true
|
||||
<-pb.closedChan
|
||||
}
|
||||
}
|
||||
|
|
|
@ -9,7 +9,6 @@ import (
|
|||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"git.openprivacy.ca/openprivacy/libricochet-go/connectivity"
|
||||
"git.openprivacy.ca/openprivacy/libricochet-go/log"
|
||||
"github.com/golang/protobuf/proto"
|
||||
"strings"
|
||||
|
@ -31,7 +30,7 @@ type cwtchPeer struct {
|
|||
// CwtchPeer provides us with a way of testing systems built on top of cwtch without having to
|
||||
// directly implement a cwtchPeer.
|
||||
type CwtchPeer interface {
|
||||
Init(connectivity.ACN, event.Manager)
|
||||
Init(event.Manager)
|
||||
PeerWithOnion(string) *connections.PeerPeerConnection
|
||||
InviteOnionToGroup(string, string) error
|
||||
SendMessageToPeer(string, string) string
|
||||
|
@ -83,7 +82,7 @@ func FromProfile(profile *model.Profile) CwtchPeer {
|
|||
}
|
||||
|
||||
// Init instantiates a cwtchPeer
|
||||
func (cp *cwtchPeer) Init(acn connectivity.ACN, eventBus event.Manager) {
|
||||
func (cp *cwtchPeer) Init(eventBus event.Manager) {
|
||||
cp.queue = event.NewEventQueue(100)
|
||||
go cp.eventHandler()
|
||||
|
||||
|
@ -104,8 +103,9 @@ func (cp *cwtchPeer) ImportGroup(exportedInvite string) (groupID string, err err
|
|||
if err == nil {
|
||||
jsobj, err := proto.Marshal(cpp.GetGroupChatInvite())
|
||||
if err == nil {
|
||||
b64obj := base64.StdEncoding.EncodeToString(jsobj)
|
||||
cp.eventBus.Publish(event.NewEvent(event.NewGroupInvite, map[event.Field]string{
|
||||
event.GroupInvite: string(jsobj),
|
||||
event.GroupInvite: b64obj,
|
||||
}))
|
||||
} else {
|
||||
log.Errorf("error serializing group: %v", err)
|
||||
|
@ -305,6 +305,7 @@ func (cp *cwtchPeer) StartGroupConnections() {
|
|||
// Only send a join server packet if we haven't joined this server yet...
|
||||
group := cp.GetGroup(groupID)
|
||||
if joined := joinedServers[groupID]; group.Accepted && !joined {
|
||||
log.Infof("Join Server %v (%v)\n", group.GroupServer, joined)
|
||||
cp.JoinServer(group.GroupServer)
|
||||
joinedServers[group.GroupServer] = true
|
||||
}
|
||||
|
@ -334,7 +335,15 @@ func (cp *cwtchPeer) eventHandler() {
|
|||
}
|
||||
case event.NewGroupInvite:
|
||||
var groupInvite protocol.GroupChatInvite
|
||||
proto.Unmarshal([]byte(ev.Data[event.GroupInvite]), &groupInvite)
|
||||
json, err := base64.StdEncoding.DecodeString(ev.Data[event.GroupInvite])
|
||||
if err != nil {
|
||||
log.Errorf("NewGroupInvite could not base64 decode invite: %v\n", err)
|
||||
continue
|
||||
}
|
||||
err = proto.Unmarshal([]byte(json), &groupInvite)
|
||||
if err != nil {
|
||||
log.Errorf("NewGroupInvite could not json decode invite: %v\n", err)
|
||||
}
|
||||
cp.Profile.ProcessInvite(&groupInvite, ev.Data[event.RemotePeer])
|
||||
case event.PeerStateChange:
|
||||
if _, exists := cp.Profile.Contacts[ev.Data[event.RemotePeer]]; exists {
|
||||
|
|
|
@ -3,6 +3,8 @@ package connections
|
|||
import (
|
||||
"cwtch.im/cwtch/protocol"
|
||||
"git.openprivacy.ca/openprivacy/libricochet-go/connectivity"
|
||||
"git.openprivacy.ca/openprivacy/libricochet-go/log"
|
||||
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
@ -42,11 +44,21 @@ func (m *Manager) ManagePeerConnection(host string, engine Engine) *PeerPeerConn
|
|||
}
|
||||
|
||||
// ManageServerConnection creates a new ServerConnection for Host with the given callback handler.
|
||||
// If there is an establish connection, it is replaced with a new one, assuming this came from
|
||||
// a new JoinServer from a new Group being joined. If it is still connecting to a server, the second request will be abandonded
|
||||
func (m *Manager) ManageServerConnection(host string, engine Engine, messageHandler func(string, *protocol.GroupMessage), closedHandler func(string)) {
|
||||
m.lock.Lock()
|
||||
defer m.lock.Unlock()
|
||||
|
||||
psc, exists := m.serverConnections[host]
|
||||
|
||||
if exists {
|
||||
if psc.GetState() == DISCONNECTED || psc.GetState() == CONNECTING || psc.GetState() == CONNECTED {
|
||||
log.Infof("Already connecting to %v, abandoning fresh attempt\n", host)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
newPsc := NewPeerServerConnection(engine, host)
|
||||
newPsc.GroupMessageHandler = messageHandler
|
||||
newPsc.CloseHandler = closedHandler
|
||||
|
@ -54,10 +66,9 @@ func (m *Manager) ManageServerConnection(host string, engine Engine, messageHand
|
|||
m.serverConnections[host] = newPsc
|
||||
|
||||
if exists {
|
||||
log.Infof("Closing connection to %v, replacing with this one\n", host)
|
||||
psc.Close()
|
||||
}
|
||||
|
||||
m.lock.Unlock()
|
||||
}
|
||||
|
||||
// GetPeerPeerConnectionForOnion safely returns a given peer connection
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
"cwtch.im/cwtch/event"
|
||||
"cwtch.im/cwtch/protocol"
|
||||
"cwtch.im/cwtch/protocol/connections/peer"
|
||||
"encoding/base64"
|
||||
"errors"
|
||||
"git.openprivacy.ca/openprivacy/libricochet-go/application"
|
||||
"git.openprivacy.ca/openprivacy/libricochet-go/channels"
|
||||
|
@ -241,6 +242,7 @@ func (e *engine) receiveGroupMessage(server string, gm *protocol.GroupMessage) {
|
|||
|
||||
// finishedFetch is a callback function the processes the termination of a fetch channel from a given server
|
||||
func (e *engine) finishedFetch(server string) {
|
||||
log.Debugf("Finished Fetch for %v\n", server)
|
||||
e.eventManager.Publish(event.NewEvent(event.FinishedFetch, map[event.Field]string{event.GroupServer: server}))
|
||||
}
|
||||
|
||||
|
@ -290,7 +292,8 @@ func (cph *CwtchPeerHandler) HandleGroupInvite(gci *protocol.GroupChatInvite) {
|
|||
log.Debugf("Received GroupID from %v %v\n", cph.Onion, gci.String())
|
||||
marshal, err := proto.Marshal(gci)
|
||||
if err == nil {
|
||||
cph.EventBus.Publish(event.NewEvent(event.NewGroupInvite, map[event.Field]string{event.TimestampReceived: time.Now().Format(time.RFC3339Nano), event.RemotePeer: cph.Onion, event.GroupInvite: string(marshal)}))
|
||||
marshalb64 := base64.StdEncoding.EncodeToString(marshal)
|
||||
cph.EventBus.Publish(event.NewEvent(event.NewGroupInvite, map[event.Field]string{event.TimestampReceived: time.Now().Format(time.RFC3339Nano), event.RemotePeer: cph.Onion, event.GroupInvite: marshalb64}))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -34,6 +34,7 @@ func NewPeerServerConnection(engine Engine, serverhostname string) *PeerServerCo
|
|||
psc := new(PeerServerConnection)
|
||||
psc.protocolEngine = engine
|
||||
psc.Server = serverhostname
|
||||
psc.setState(DISCONNECTED)
|
||||
psc.Init()
|
||||
return psc
|
||||
}
|
||||
|
@ -44,6 +45,7 @@ func (psc *PeerServerConnection) GetState() ConnectionState {
|
|||
}
|
||||
|
||||
func (psc *PeerServerConnection) setState(state ConnectionState) {
|
||||
log.Debugf("Setting State to %v for %v\n", ConnectionStateName[state], psc.Server)
|
||||
psc.state = state
|
||||
psc.protocolEngine.EventManager().Publish(event.NewEvent(event.ServerStateChange, map[event.Field]string{
|
||||
event.GroupServer: string(psc.Server),
|
||||
|
@ -64,14 +66,22 @@ func (psc *PeerServerConnection) WaitTilAuthenticated() {
|
|||
// Run manages the setup and teardown of a peer server connection
|
||||
func (psc *PeerServerConnection) Run() error {
|
||||
log.Infof("Connecting to %v", psc.Server)
|
||||
psc.setState(CONNECTING)
|
||||
|
||||
rc, err := goricochet.Open(psc.protocolEngine.ACN(), psc.Server)
|
||||
if err == nil {
|
||||
psc.connection = rc
|
||||
if psc.GetState() == KILLED {
|
||||
return nil
|
||||
}
|
||||
psc.setState(CONNECTED)
|
||||
pub, priv, err := ed25519.GenerateKey(rand.Reader)
|
||||
if err == nil {
|
||||
_, err := connection.HandleOutboundConnection(psc.connection).ProcessAuthAsV3Client(identity.InitializeV3("cwtchpeer", &priv, &pub))
|
||||
if err == nil {
|
||||
if psc.GetState() == KILLED {
|
||||
return nil
|
||||
}
|
||||
psc.setState(AUTHENTICATED)
|
||||
|
||||
go func() {
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"cwtch.im/cwtch/event"
|
||||
"cwtch.im/cwtch/model"
|
||||
"cwtch.im/cwtch/protocol"
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"git.openprivacy.ca/openprivacy/libricochet-go/log"
|
||||
"github.com/golang/protobuf/proto"
|
||||
|
@ -173,7 +174,8 @@ func (ps *profileStore) eventHandler() {
|
|||
}
|
||||
case event.NewGroupInvite:
|
||||
var gci protocol.GroupChatInvite
|
||||
err := proto.Unmarshal([]byte(ev.Data[event.GroupInvite]), &gci)
|
||||
json, _ := base64.StdEncoding.DecodeString(ev.Data[event.GroupInvite])
|
||||
err := proto.Unmarshal([]byte(json), &gci)
|
||||
if err == nil {
|
||||
ps.profile.ProcessInvite(&gci, ev.Data[event.RemotePeer])
|
||||
ps.save()
|
||||
|
|
|
@ -2,7 +2,8 @@ package testing
|
|||
|
||||
import (
|
||||
app2 "cwtch.im/cwtch/app"
|
||||
"cwtch.im/cwtch/event"
|
||||
"cwtch.im/cwtch/app/utils"
|
||||
"cwtch.im/cwtch/event/bridge"
|
||||
"cwtch.im/cwtch/model"
|
||||
"cwtch.im/cwtch/peer"
|
||||
"cwtch.im/cwtch/protocol/connections"
|
||||
|
@ -101,23 +102,14 @@ func waitForPeerPeerConnection(t *testing.T, peera peer.CwtchPeer, peerb peer.Cw
|
|||
return
|
||||
}
|
||||
|
||||
func waitGetPeer(app app2.Application, name string) peer.CwtchPeer {
|
||||
for true {
|
||||
for id, n := range app.ListPeers() {
|
||||
if n == name {
|
||||
return app.GetPeer(id)
|
||||
}
|
||||
}
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestCwtchPeerIntegration(t *testing.T) {
|
||||
numGoRoutinesStart := runtime.NumGoroutine()
|
||||
|
||||
log.AddEverythingFromPattern("connectivity")
|
||||
//log.SetLevel(log.LevelDebug)
|
||||
log.SetLevel(log.LevelDebug)
|
||||
log.ExcludeFromPattern("connection/connection")
|
||||
log.ExcludeFromPattern("outbound/3dhauthchannel")
|
||||
log.ExcludeFromPattern("event/eventmanager")
|
||||
acn, err := connectivity.StartTor(".", "")
|
||||
if err != nil {
|
||||
t.Fatalf("Could not start Tor: %v", err)
|
||||
|
@ -150,8 +142,8 @@ func TestCwtchPeerIntegration(t *testing.T) {
|
|||
|
||||
app := app2.NewApp(acn, "./storage")
|
||||
|
||||
bridge1, bridge2 := event.MakePipeBridge()
|
||||
appClient := app2.NewAppClient(acn, "./storage", bridge1)
|
||||
bridge1, bridge2 := bridge.MakeGoChanBridge()
|
||||
appClient := app2.NewAppClient("./storage", bridge1)
|
||||
appService := app2.NewAppService(acn, "./storage", bridge2)
|
||||
|
||||
numGoRoutinesPostAppStart := runtime.NumGoroutine()
|
||||
|
@ -167,13 +159,13 @@ func TestCwtchPeerIntegration(t *testing.T) {
|
|||
fmt.Println("Creating Carol...")
|
||||
appClient.CreatePeer("carol", "asdfasdf")
|
||||
|
||||
alice := waitGetPeer(app, "alice")
|
||||
alice := utils.WaitGetPeer(app, "alice")
|
||||
fmt.Println("Alice created:", alice.GetProfile().Onion)
|
||||
|
||||
bob := waitGetPeer(app, "bob")
|
||||
bob := utils.WaitGetPeer(app, "bob")
|
||||
fmt.Println("Bob created:", bob.GetProfile().Onion)
|
||||
|
||||
carol := waitGetPeer(appClient, "carol")
|
||||
carol := utils.WaitGetPeer(appClient, "carol")
|
||||
fmt.Println("Carol created:", carol.GetProfile().Onion)
|
||||
|
||||
//fmt.Println("Carol created:", carol.GetProfile().Onion)
|
||||
|
|
Loading…
Reference in New Issue