forked from cwtch.im/cwtch
Merge branch 'appReload' of dan/cwtch into master
This commit is contained in:
commit
24855ca604
27
app/app.go
27
app/app.go
|
@ -26,7 +26,8 @@ type applicationCore struct {
|
|||
}
|
||||
|
||||
type appletPeers struct {
|
||||
peers map[string]peer.CwtchPeer
|
||||
peers map[string]peer.CwtchPeer
|
||||
launched bool // bit hacky, place holder while we transition to full multi peer support and a better api
|
||||
}
|
||||
|
||||
type appletACN struct {
|
||||
|
@ -69,6 +70,7 @@ func newAppCore(appDirectory string) *applicationCore {
|
|||
|
||||
func (ap *appletPeers) init() {
|
||||
ap.peers = make(map[string]peer.CwtchPeer)
|
||||
ap.launched = false
|
||||
}
|
||||
|
||||
func (a *appletACN) init(acn connectivity.ACN, publish func(int, string)) {
|
||||
|
@ -127,7 +129,7 @@ func (app *application) CreatePeer(name string, password string) {
|
|||
profileStore := storage.NewProfileWriterStore(app.eventBuses[profile.Onion], path.Join(app.directory, "profiles", profile.LocalID), password, profile)
|
||||
app.storage[profile.Onion] = profileStore
|
||||
|
||||
pc := app.storage[profile.Onion].GetProfileCopy()
|
||||
pc := app.storage[profile.Onion].GetProfileCopy(true)
|
||||
peer := peer.FromProfile(pc)
|
||||
peer.Init(app.eventBuses[profile.Onion])
|
||||
|
||||
|
@ -143,7 +145,7 @@ func (app *application) CreatePeer(name string, password string) {
|
|||
}
|
||||
|
||||
// LoadProfiles takes a password and attempts to load any profiles it can from storage with it and create Peers for them
|
||||
func (ac *applicationCore) LoadProfiles(password string, loadProfileFn LoadProfileFn) error {
|
||||
func (ac *applicationCore) LoadProfiles(password string, timeline bool, loadProfileFn LoadProfileFn) error {
|
||||
files, err := ioutil.ReadDir(path.Join(ac.directory, "profiles"))
|
||||
if err != nil {
|
||||
return fmt.Errorf("Error: cannot read profiles directory: %v", err)
|
||||
|
@ -157,7 +159,7 @@ func (ac *applicationCore) LoadProfiles(password string, loadProfileFn LoadProfi
|
|||
continue
|
||||
}
|
||||
|
||||
profile := profileStore.GetProfileCopy()
|
||||
profile := profileStore.GetProfileCopy(timeline)
|
||||
|
||||
_, exists := ac.eventBuses[profile.Onion]
|
||||
if exists {
|
||||
|
@ -179,7 +181,7 @@ func (ac *applicationCore) LoadProfiles(password string, loadProfileFn LoadProfi
|
|||
// LoadProfiles takes a password and attempts to load any profiles it can from storage with it and create Peers for them
|
||||
func (app *application) LoadProfiles(password string) {
|
||||
count := 0
|
||||
app.applicationCore.LoadProfiles(password, func(profile *model.Profile, profileStore storage.ProfileStore) {
|
||||
app.applicationCore.LoadProfiles(password, true, func(profile *model.Profile, profileStore storage.ProfileStore) {
|
||||
peer := peer.FromProfile(profile)
|
||||
peer.Init(app.eventBuses[profile.Onion])
|
||||
|
||||
|
@ -207,13 +209,16 @@ func (app *application) GetPrimaryBus() event.Manager {
|
|||
|
||||
// LaunchPeers starts each peer Listening and connecting to peers and groups
|
||||
func (ap *appletPeers) LaunchPeers() {
|
||||
for _, p := range ap.peers {
|
||||
if !p.IsStarted() {
|
||||
p.Listen()
|
||||
p.StartPeersConnections()
|
||||
p.StartGroupConnections()
|
||||
}
|
||||
log.Debugf("appletPeers LaunchPeers\n")
|
||||
if ap.launched {
|
||||
return
|
||||
}
|
||||
for _, p := range ap.peers {
|
||||
p.Listen()
|
||||
p.StartPeersConnections()
|
||||
p.StartGroupConnections()
|
||||
}
|
||||
ap.launched = true
|
||||
}
|
||||
|
||||
// ListPeers returns a map of onions to their profile's Name
|
||||
|
|
|
@ -3,6 +3,11 @@ package app
|
|||
import "cwtch.im/cwtch/event"
|
||||
import "git.openprivacy.ca/openprivacy/libricochet-go/log"
|
||||
|
||||
const (
|
||||
// DestApp should be used as a destination for IPC messages that are for the application itself an not a peer
|
||||
DestApp = "app"
|
||||
)
|
||||
|
||||
type applicationBridge struct {
|
||||
applicationCore
|
||||
|
||||
|
@ -16,6 +21,7 @@ func (ab *applicationBridge) listen() {
|
|||
ipcMessage, ok := ab.bridge.Read()
|
||||
log.Debugf("listen() got %v for %v\n", ipcMessage.Message.EventType, ipcMessage.Dest)
|
||||
if !ok {
|
||||
log.Debugln("exiting appBridge.listen()")
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -30,5 +36,4 @@ func (ab *applicationBridge) listen() {
|
|||
}
|
||||
|
||||
func (ab *applicationBridge) Shutdown() {
|
||||
ab.bridge.Shutdown()
|
||||
}
|
||||
|
|
|
@ -24,6 +24,8 @@ func NewAppClient(appDirectory string, bridge event.IPCBridge) Application {
|
|||
|
||||
go appClient.listen()
|
||||
|
||||
appClient.bridge.Write(&event.IPCMessage{Dest: DestApp, Message: event.NewEventList(event.ReloadClient)})
|
||||
|
||||
log.Infoln("Created new App Client")
|
||||
return appClient
|
||||
}
|
||||
|
@ -38,17 +40,20 @@ func (ac *applicationClient) handleEvent(ev *event.Event) {
|
|||
case event.NewPeer:
|
||||
localID := ev.Data[event.Identity]
|
||||
password := ev.Data[event.Password]
|
||||
ac.newPeer(localID, password)
|
||||
reload := ev.Data[event.Status] == "running"
|
||||
ac.newPeer(localID, password, reload)
|
||||
case event.PeerError:
|
||||
ac.appBus.Publish(*ev)
|
||||
case event.AppError:
|
||||
ac.appBus.Publish(*ev)
|
||||
case event.ACNStatus:
|
||||
ac.appBus.Publish(*ev)
|
||||
case event.ReloadDone:
|
||||
ac.appBus.Publish(*ev)
|
||||
}
|
||||
}
|
||||
|
||||
func (ac *applicationClient) newPeer(localID, password string) {
|
||||
func (ac *applicationClient) newPeer(localID, password string, reload bool) {
|
||||
profile, err := storage.ReadProfile(path.Join(ac.directory, "profiles", localID), password)
|
||||
if err != nil {
|
||||
log.Errorf("Could not read profile for NewPeer event: %v\n", err)
|
||||
|
@ -71,7 +76,16 @@ func (ac *applicationClient) newPeer(localID, password string) {
|
|||
defer ac.mutex.Unlock()
|
||||
ac.peers[profile.Onion] = peer
|
||||
ac.eventBuses[profile.Onion] = eventBus
|
||||
ac.appBus.Publish(event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.Onion}))
|
||||
npEvent := event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.Onion})
|
||||
if reload {
|
||||
npEvent.Data[event.Status] = "running"
|
||||
}
|
||||
ac.appBus.Publish(npEvent)
|
||||
|
||||
if reload {
|
||||
ac.bridge.Write(&event.IPCMessage{Dest: DestApp, Message: event.NewEventList(event.ReloadPeer, event.Identity, profile.Onion)})
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// CreatePeer messages the service to create a new Peer with the given name
|
||||
|
|
|
@ -12,11 +12,6 @@ import (
|
|||
"strconv"
|
||||
)
|
||||
|
||||
const (
|
||||
// DestApp should be used as a destination for IPC messages that are for the application itself an not a peer
|
||||
DestApp = "app"
|
||||
)
|
||||
|
||||
type applicationService struct {
|
||||
applicationBridge
|
||||
appletACN
|
||||
|
@ -33,6 +28,7 @@ type ApplicationService interface {
|
|||
// NewAppService returns an ApplicationService that runs the backend of an app and communicates with a client by the supplied IPCBridge
|
||||
func NewAppService(acn connectivity.ACN, appDirectory string, bridge event.IPCBridge) ApplicationService {
|
||||
appService := &applicationService{storage: make(map[string]storage.ProfileStore), engines: make(map[string]connections.Engine), applicationBridge: applicationBridge{applicationCore: *newAppCore(appDirectory), bridge: bridge}}
|
||||
|
||||
fn := func(progress int, status string) {
|
||||
progStr := strconv.Itoa(progress)
|
||||
appService.bridge.Write(&event.IPCMessage{Dest: DestApp, Message: event.NewEventList(event.ACNStatus, event.Progreess, progStr, event.Status, status)})
|
||||
|
@ -56,6 +52,25 @@ func (as *applicationService) handleEvent(ev *event.Event) {
|
|||
case event.LoadProfiles:
|
||||
password := ev.Data[event.Password]
|
||||
as.loadProfiles(password)
|
||||
case event.ReloadClient:
|
||||
for _, storage := range as.storage {
|
||||
message := event.IPCMessage{Dest: DestApp, Message: *storage.GetNewPeerMessage()}
|
||||
as.bridge.Write(&message)
|
||||
}
|
||||
|
||||
message := event.IPCMessage{Dest: DestApp, Message: event.NewEventList(event.ReloadDone)}
|
||||
as.bridge.Write(&message)
|
||||
case event.ReloadPeer:
|
||||
onion := ev.Data[event.Identity]
|
||||
events := as.storage[onion].GetStatusMessages()
|
||||
|
||||
for _, ev := range events {
|
||||
message := event.IPCMessage{Dest: onion, Message: *ev}
|
||||
as.bridge.Write(&message)
|
||||
}
|
||||
case event.ShutdownPeer:
|
||||
onion := ev.Data[event.Identity]
|
||||
as.ShutdownPeer(onion)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -86,8 +101,9 @@ func (as *applicationService) createPeer(name, password string) {
|
|||
|
||||
func (as *applicationService) loadProfiles(password string) {
|
||||
count := 0
|
||||
as.applicationCore.LoadProfiles(password, func(profile *model.Profile, profileStore storage.ProfileStore) {
|
||||
as.applicationCore.LoadProfiles(password, false, func(profile *model.Profile, profileStore storage.ProfileStore) {
|
||||
as.eventBuses[profile.Onion] = event.IPCEventManagerFrom(as.bridge, profile.Onion, as.eventBuses[profile.Onion])
|
||||
|
||||
blockedPeers := profile.BlockedPeers()
|
||||
identity := identity.InitializeV3(profile.Name, &profile.Ed25519PrivateKey, &profile.Ed25519PublicKey)
|
||||
engine := connections.NewProtocolEngine(identity, profile.Ed25519PrivateKey, as.acn, as.eventBuses[profile.Onion], blockedPeers)
|
||||
|
@ -105,11 +121,18 @@ func (as *applicationService) loadProfiles(password string) {
|
|||
}
|
||||
}
|
||||
|
||||
func (as *applicationService) ShutdownPeer(onion string) {
|
||||
as.engines[onion].Shutdown()
|
||||
delete(as.engines, onion)
|
||||
as.storage[onion].Shutdown()
|
||||
delete(as.storage, onion)
|
||||
as.eventBuses[onion].Shutdown()
|
||||
delete(as.eventBuses, onion)
|
||||
}
|
||||
|
||||
// Shutdown shuts down the application Service and all peer related backend parts
|
||||
func (as *applicationService) Shutdown() {
|
||||
for id := range as.engines {
|
||||
as.engines[id].Shutdown()
|
||||
as.storage[id].Shutdown()
|
||||
as.eventBuses[id].Shutdown()
|
||||
as.ShutdownPeer(id)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -26,6 +26,9 @@ type pipeBridge struct {
|
|||
closedChan chan bool
|
||||
state connections.ConnectionState
|
||||
lock sync.Mutex
|
||||
|
||||
// For logging / debugging purposes
|
||||
name string
|
||||
}
|
||||
|
||||
func newPipeBridge(inFilename, outFilename string) *pipeBridge {
|
||||
|
@ -41,66 +44,50 @@ func newPipeBridge(inFilename, outFilename string) *pipeBridge {
|
|||
func NewPipeBridgeClient(inFilename, outFilename string) event.IPCBridge {
|
||||
log.Debugf("Making new PipeBridge Client...\n")
|
||||
pb := newPipeBridge(inFilename, outFilename)
|
||||
go pb.clientConnectionManager()
|
||||
pb.name = "client"
|
||||
go pb.connectionManager()
|
||||
|
||||
return pb
|
||||
}
|
||||
|
||||
// NewPipeBridgeService returns a pipe backed IPCBridge for a service
|
||||
func NewPipeBridgeService(inFilename, outFilename string) (event.IPCBridge, error) {
|
||||
func NewPipeBridgeService(inFilename, outFilename string) event.IPCBridge {
|
||||
log.Debugf("Making new PipeBridge Service...\n")
|
||||
pb := newPipeBridge(inFilename, outFilename)
|
||||
pb.name = "service"
|
||||
|
||||
go pb.serviceConnectionManager()
|
||||
go pb.connectionManager()
|
||||
|
||||
log.Debugf("Successfully created new PipeBridge Service!\n")
|
||||
return pb, nil
|
||||
return pb
|
||||
}
|
||||
|
||||
func (pb *pipeBridge) clientConnectionManager() {
|
||||
func (pb *pipeBridge) connectionManager() {
|
||||
for pb.state != connections.KILLED {
|
||||
log.Debugf("clientConnManager loop start init\n")
|
||||
pb.state = connections.CONNECTING
|
||||
|
||||
var err error
|
||||
pb.in, err = os.OpenFile(pb.infile, os.O_RDONLY, 0600)
|
||||
log.Debugf("%v open file infile\n", pb.name)
|
||||
pb.in, err = os.OpenFile(pb.infile, os.O_RDWR, 0600)
|
||||
if err != nil {
|
||||
pb.state = connections.DISCONNECTED
|
||||
continue
|
||||
}
|
||||
|
||||
pb.out, err = os.OpenFile(pb.outfile, os.O_WRONLY, 0600)
|
||||
log.Debugf("%v open file outfile\n", pb.name)
|
||||
pb.out, err = os.OpenFile(pb.outfile, os.O_RDWR, 0600)
|
||||
if err != nil {
|
||||
pb.state = connections.DISCONNECTED
|
||||
continue
|
||||
}
|
||||
|
||||
log.Debugf("Successfully connected PipeBridge Client!\n")
|
||||
log.Debugf("Successfully connected PipeBridge %v!\n", pb.name)
|
||||
|
||||
pb.handleConns()
|
||||
}
|
||||
}
|
||||
log.Debugf("exiting %v ConnectionManager\n", pb.name)
|
||||
|
||||
func (pb *pipeBridge) serviceConnectionManager() {
|
||||
for pb.state != connections.KILLED {
|
||||
pb.state = connections.CONNECTING
|
||||
|
||||
var err error
|
||||
pb.out, err = os.OpenFile(pb.outfile, os.O_WRONLY, 0600)
|
||||
if err != nil {
|
||||
pb.state = connections.DISCONNECTED
|
||||
continue
|
||||
}
|
||||
|
||||
pb.in, err = os.OpenFile(pb.infile, os.O_RDONLY, 0600)
|
||||
if err != nil {
|
||||
pb.state = connections.DISCONNECTED
|
||||
continue
|
||||
}
|
||||
|
||||
log.Debugf("Successfully connected PipeBridge Service!\n")
|
||||
|
||||
pb.handleConns()
|
||||
}
|
||||
}
|
||||
|
||||
func (pb *pipeBridge) handleConns() {
|
||||
|
@ -110,30 +97,41 @@ func (pb *pipeBridge) handleConns() {
|
|||
|
||||
pb.closedChan = make(chan bool, 5)
|
||||
|
||||
log.Debugf("handleConns authed, 2xgo\n")
|
||||
log.Debugf("handleConns authed, %v 2xgo\n", pb.name)
|
||||
|
||||
go pb.handleRead()
|
||||
go pb.handleWrite()
|
||||
|
||||
<-pb.closedChan
|
||||
log.Debugf("handleConns CLOSEDCHAN!!!!\n")
|
||||
log.Debugf("handleConns <-closedChan (%v)\n", pb.name)
|
||||
if pb.state != connections.KILLED {
|
||||
pb.state = connections.FAILED
|
||||
}
|
||||
pb.closeReset()
|
||||
log.Debugf("handleConns done for %v, exit\n", pb.name)
|
||||
}
|
||||
|
||||
func (pb *pipeBridge) closeReset() {
|
||||
pb.in.Close()
|
||||
pb.out.Close()
|
||||
close(pb.write)
|
||||
close(pb.read)
|
||||
pb.read = make(chan event.IPCMessage, maxBufferSize)
|
||||
pb.write = make(chan event.IPCMessage, maxBufferSize)
|
||||
log.Debugf("handleConns done, exit\n")
|
||||
}
|
||||
|
||||
func (pb *pipeBridge) handleWrite() {
|
||||
log.Debugf("handleWrite() %v\n", pb.name)
|
||||
defer log.Debugf("exiting handleWrite() %v\n", pb.name)
|
||||
|
||||
for {
|
||||
select {
|
||||
case message := <-pb.write:
|
||||
log.Debugf("handleWrite <- message: %v\n", message)
|
||||
if message.Message.EventType == event.EncryptedGroupMessage || message.Message.EventType == event.SendMessageToGroup || message.Message.EventType == event.NewMessageFromGroup {
|
||||
log.Debugf("handleWrite <- message: %v %v ...\n", message.Dest, message.Message.EventType)
|
||||
} else {
|
||||
log.Debugf("handleWrite <- message: %v\n", message)
|
||||
}
|
||||
if pb.state == connections.AUTHENTICATED {
|
||||
encMessage := &event.IPCMessage{Dest: message.Dest, Message: event.Event{EventType: message.Message.EventType, EventID: message.Message.EventID, Data: make(map[event.Field]string)}}
|
||||
for k, v := range message.Message.Data {
|
||||
|
@ -162,6 +160,9 @@ func (pb *pipeBridge) handleWrite() {
|
|||
}
|
||||
|
||||
func (pb *pipeBridge) handleRead() {
|
||||
log.Debugf("handleRead() %v\n", pb.name)
|
||||
defer log.Debugf("exiting handleRead() %v", pb.name)
|
||||
|
||||
var n int
|
||||
size := make([]byte, 2)
|
||||
var err error
|
||||
|
@ -198,26 +199,41 @@ func (pb *pipeBridge) handleRead() {
|
|||
val, _ := base64.StdEncoding.DecodeString(v)
|
||||
message.Message.Data[k] = string(val)
|
||||
}
|
||||
log.Debugf("handleRead read<-: %v\n", message)
|
||||
if message.Message.EventType == event.EncryptedGroupMessage || message.Message.EventType == event.SendMessageToGroup || message.Message.EventType == event.NewMessageFromGroup {
|
||||
log.Debugf("handleRead read<-: %v %v ...\n", message.Dest, message.Message.EventType)
|
||||
} else {
|
||||
log.Debugf("handleRead read<-: %v\n", message)
|
||||
}
|
||||
pb.read <- message
|
||||
log.Debugf("handleRead wrote\n")
|
||||
}
|
||||
}
|
||||
|
||||
func (pb *pipeBridge) Read() (*event.IPCMessage, bool) {
|
||||
log.Debugf("Read()...\n")
|
||||
log.Debugf("Read() %v...\n", pb.name)
|
||||
|
||||
message := <-pb.read
|
||||
log.Debugf("Read: %v\n", message)
|
||||
return &message, true
|
||||
if message.Message.EventType == event.EncryptedGroupMessage || message.Message.EventType == event.SendMessageToGroup || message.Message.EventType == event.NewMessageFromGroup {
|
||||
log.Debugf("Read %v: %v %v ...\n", pb.name, message.Dest, message.Message.EventType)
|
||||
} else {
|
||||
log.Debugf("Read %v: %v\n", pb.name, message)
|
||||
}
|
||||
return &message, pb.state != connections.KILLED
|
||||
}
|
||||
|
||||
func (pb *pipeBridge) Write(message *event.IPCMessage) {
|
||||
log.Debugf("Write: %v\n", message)
|
||||
if message.Message.EventType == event.EncryptedGroupMessage || message.Message.EventType == event.SendMessageToGroup || message.Message.EventType == event.NewMessageFromGroup {
|
||||
log.Debugf("Write %v: %v %v ...\n", pb.name, message.Dest, message.Message.EventType)
|
||||
} else {
|
||||
log.Debugf("Write %v: %v\n", pb.name, message)
|
||||
}
|
||||
pb.write <- *message
|
||||
log.Debugf("Wrote\n")
|
||||
}
|
||||
|
||||
func (pb *pipeBridge) Shutdown() {
|
||||
log.Debugf("pb.Shutdown() for %v currently in state: %v\n", pb.name, connections.ConnectionStateName[pb.state])
|
||||
pb.state = connections.KILLED
|
||||
pb.closedChan <- true
|
||||
log.Debugf("Done Shutdown for %v\n", pb.name)
|
||||
}
|
||||
|
|
|
@ -36,12 +36,7 @@ func clientHelper(t *testing.T, in, out string, messageOrig *event.IPCMessage, d
|
|||
}
|
||||
|
||||
func serviceHelper(t *testing.T, in, out string, messageOrig *event.IPCMessage, done chan bool) {
|
||||
service, err := NewPipeBridgeService(in, out)
|
||||
if err != nil {
|
||||
t.Errorf("Error opening %v pipe: %v", in, err)
|
||||
done <- true
|
||||
return
|
||||
}
|
||||
service := NewPipeBridgeService(in, out)
|
||||
|
||||
service.Write(messageOrig)
|
||||
|
||||
|
|
|
@ -11,8 +11,6 @@ const (
|
|||
PeerRequest = Type("PeerRequest")
|
||||
BlockPeer = Type("BlockPeer")
|
||||
JoinServer = Type("JoinServer")
|
||||
// attributes: GroupServer
|
||||
FinishedFetch = Type("FinishedFetch")
|
||||
|
||||
ProtocolEngineStartListen = Type("ProtocolEngineStartListen")
|
||||
ProtocolEngineStopped = Type("ProtocolEngineStopped")
|
||||
|
@ -91,20 +89,28 @@ const (
|
|||
|
||||
// GroupServer
|
||||
// ConnectionState
|
||||
ServerStateChange = Type("GroupStateChange")
|
||||
ServerStateChange = Type("ServerStateChange")
|
||||
|
||||
/***** Application client / service messages *****/
|
||||
|
||||
// ProfileName, Password
|
||||
CreatePeer = Type("CreatePeer")
|
||||
|
||||
// service -> client: Identity(localId), Password
|
||||
// service -> client: Identity(localId), Password, [Status(new/default=blank || from reload='running')]
|
||||
// app -> Identity(onion)
|
||||
NewPeer = Type("NewPeer")
|
||||
|
||||
// Password
|
||||
LoadProfiles = Type("LoadProfiles")
|
||||
|
||||
// Client has reloaded, triggers NewPeer s then ReloadDone
|
||||
ReloadClient = Type("ReloadClient")
|
||||
|
||||
ReloadDone = Type("ReloadDone")
|
||||
|
||||
// Identity - Ask service to resend all connection states
|
||||
ReloadPeer = Type("ReloadPeer")
|
||||
|
||||
// Identity(onion)
|
||||
ShutdownPeer = Type("ShutdownPeer")
|
||||
|
||||
|
|
|
@ -21,7 +21,7 @@ type Group struct {
|
|||
SignedGroupID []byte
|
||||
GroupKey [32]byte
|
||||
GroupServer string
|
||||
Timeline Timeline
|
||||
Timeline Timeline `json:"-"`
|
||||
Accepted bool
|
||||
Owner string
|
||||
IsCompromised bool
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package model
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"sort"
|
||||
"sync"
|
||||
"time"
|
||||
|
@ -48,6 +49,16 @@ func (t *Timeline) GetMessages() []Message {
|
|||
return messages
|
||||
}
|
||||
|
||||
// GetCopy returns a duplicate of the Timeline
|
||||
func (t *Timeline) GetCopy() *Timeline {
|
||||
t.lock.Lock()
|
||||
defer t.lock.Unlock()
|
||||
bytes, _ := json.Marshal(t)
|
||||
newt := &Timeline{}
|
||||
json.Unmarshal(bytes, newt)
|
||||
return newt
|
||||
}
|
||||
|
||||
// SetMessages sets the Messages of this timeline. Only to be used in loading/initialization
|
||||
func (t *Timeline) SetMessages(messages []Message) {
|
||||
t.lock.Lock()
|
||||
|
|
|
@ -25,10 +25,10 @@ type PublicProfile struct {
|
|||
Blocked bool
|
||||
Onion string
|
||||
Attributes map[string]string
|
||||
Timeline Timeline
|
||||
LocalID string // used by storage engine
|
||||
State string `json:"-"`
|
||||
lock sync.Mutex
|
||||
//Timeline Timeline `json:"-"` // TODO: cache recent messages for client
|
||||
LocalID string // used by storage engine
|
||||
State string `json:"-"`
|
||||
lock sync.Mutex
|
||||
}
|
||||
|
||||
// Profile encapsulates all the attributes necessary to be a Cwtch Peer.
|
||||
|
@ -119,6 +119,7 @@ func (p *Profile) RejectInvite(groupID string) {
|
|||
p.lock.Unlock()
|
||||
}
|
||||
|
||||
/*
|
||||
// AddMessageToContactTimeline allows the saving of a message sent via a direct connection chat to the profile.
|
||||
func (p *Profile) AddMessageToContactTimeline(onion string, fromMe bool, message string, sent time.Time) {
|
||||
p.lock.Lock()
|
||||
|
@ -136,6 +137,7 @@ func (p *Profile) AddMessageToContactTimeline(onion string, fromMe bool, message
|
|||
}
|
||||
}
|
||||
}
|
||||
*/
|
||||
|
||||
// AcceptInvite accepts a group invite
|
||||
func (p *Profile) AcceptInvite(groupID string) (err error) {
|
||||
|
@ -384,13 +386,20 @@ func (p *Profile) EncryptMessageToGroup(message string, groupID string) ([]byte,
|
|||
return nil, nil, errors.New("group does not exist")
|
||||
}
|
||||
|
||||
// GetCopy returns a full deep copy of the Profile struct and its members
|
||||
func (p *Profile) GetCopy() *Profile {
|
||||
// GetCopy returns a full deep copy of the Profile struct and its members (timeline inclusion control by arg)
|
||||
func (p *Profile) GetCopy(timeline bool) *Profile {
|
||||
p.lock.Lock()
|
||||
defer p.lock.Unlock()
|
||||
|
||||
newp := new(Profile)
|
||||
bytes, _ := json.Marshal(p)
|
||||
json.Unmarshal(bytes, &newp)
|
||||
|
||||
if timeline {
|
||||
for groupID := range newp.Groups {
|
||||
newp.Groups[groupID].Timeline = *p.Groups[groupID].Timeline.GetCopy()
|
||||
}
|
||||
}
|
||||
|
||||
return newp
|
||||
}
|
||||
|
|
|
@ -4,31 +4,8 @@ import (
|
|||
"cwtch.im/cwtch/protocol"
|
||||
"github.com/golang/protobuf/proto"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestP2P(t *testing.T) {
|
||||
sarah := GenerateNewProfile("Sarah")
|
||||
alice := GenerateNewProfile("Alice")
|
||||
|
||||
sarah.AddContact(alice.Onion, &alice.PublicProfile)
|
||||
|
||||
sarah.AddMessageToContactTimeline(alice.Onion, false, "hello", time.Now())
|
||||
sarah.AddMessageToContactTimeline(alice.Onion, true, "world", time.Now())
|
||||
|
||||
contact, _ := sarah.GetContact(alice.Onion)
|
||||
for i, m := range contact.Timeline.GetMessages() {
|
||||
if i == 0 && (m.Message != "hello" || m.PeerID != alice.Onion) {
|
||||
t.Fatalf("Timeline is invalid: %v", m)
|
||||
}
|
||||
|
||||
if i == 1 && (m.Message != "world" || m.PeerID != sarah.Onion) {
|
||||
t.Fatalf("Timeline is invalid: %v", m)
|
||||
}
|
||||
t.Logf("Message: %v", m)
|
||||
}
|
||||
}
|
||||
|
||||
func TestProfileIdentity(t *testing.T) {
|
||||
sarah := GenerateNewProfile("Sarah")
|
||||
alice := GenerateNewProfile("Alice")
|
||||
|
|
|
@ -21,7 +21,6 @@ type cwtchPeer struct {
|
|||
Profile *model.Profile
|
||||
mutex sync.Mutex
|
||||
shutdown bool
|
||||
started bool
|
||||
|
||||
queue *event.Queue
|
||||
eventBus event.Manager
|
||||
|
@ -59,7 +58,6 @@ type CwtchPeer interface {
|
|||
GetContacts() []string
|
||||
GetContact(string) *model.PublicProfile
|
||||
|
||||
IsStarted() bool
|
||||
Listen()
|
||||
StartPeersConnections()
|
||||
StartGroupConnections()
|
||||
|
@ -287,6 +285,7 @@ func (cp *cwtchPeer) RejectInvite(groupID string) {
|
|||
|
||||
// Listen makes the peer open a listening port to accept incoming connections (and be detactably online)
|
||||
func (cp *cwtchPeer) Listen() {
|
||||
log.Debugf("cwtchPeer Listen sending ProtocolEngineStartListen\n")
|
||||
cp.eventBus.Publish(event.NewEvent(event.ProtocolEngineStartListen, map[event.Field]string{}))
|
||||
}
|
||||
|
||||
|
@ -317,11 +316,6 @@ func (cp *cwtchPeer) Shutdown() {
|
|||
cp.queue.Shutdown()
|
||||
}
|
||||
|
||||
// IsStarted returns true if Listen() has successfully been run before on this connection (ever). TODO: we will need to properly unset this flag on error if we want to support resumption in the future
|
||||
func (cp *cwtchPeer) IsStarted() bool {
|
||||
return cp.started
|
||||
}
|
||||
|
||||
// eventHandler process events from other subsystems
|
||||
func (cp *cwtchPeer) eventHandler() {
|
||||
for {
|
||||
|
|
|
@ -46,7 +46,7 @@ func (m *Manager) ManagePeerConnection(host string, engine Engine) *PeerPeerConn
|
|||
// ManageServerConnection creates a new ServerConnection for Host with the given callback handler.
|
||||
// If there is an establish connection, it is replaced with a new one, assuming this came from
|
||||
// a new JoinServer from a new Group being joined. If it is still connecting to a server, the second request will be abandonded
|
||||
func (m *Manager) ManageServerConnection(host string, engine Engine, messageHandler func(string, *protocol.GroupMessage), closedHandler func(string)) {
|
||||
func (m *Manager) ManageServerConnection(host string, engine Engine, messageHandler func(string, *protocol.GroupMessage)) {
|
||||
m.lock.Lock()
|
||||
defer m.lock.Unlock()
|
||||
|
||||
|
@ -61,7 +61,6 @@ func (m *Manager) ManageServerConnection(host string, engine Engine, messageHand
|
|||
|
||||
newPsc := NewPeerServerConnection(engine, host)
|
||||
newPsc.GroupMessageHandler = messageHandler
|
||||
newPsc.CloseHandler = closedHandler
|
||||
go newPsc.Run()
|
||||
m.serverConnections[host] = newPsc
|
||||
|
||||
|
@ -71,6 +70,11 @@ func (m *Manager) ManageServerConnection(host string, engine Engine, messageHand
|
|||
}
|
||||
}
|
||||
|
||||
// SetServerSynced is a helper for peerserver connections and engine to call when a Fetch is done to set the state of the connection to SYNCED
|
||||
func (m *Manager) SetServerSynced(onion string) {
|
||||
m.serverConnections[onion].setState(SYNCED)
|
||||
}
|
||||
|
||||
// GetPeerPeerConnectionForOnion safely returns a given peer connection
|
||||
func (m *Manager) GetPeerPeerConnectionForOnion(host string) (ppc *PeerPeerConnection) {
|
||||
m.lock.Lock()
|
||||
|
|
|
@ -239,15 +239,9 @@ func (e *engine) receiveGroupMessage(server string, gm *protocol.GroupMessage) {
|
|||
e.eventManager.Publish(event.NewEvent(event.EncryptedGroupMessage, map[event.Field]string{event.Ciphertext: string(gm.GetCiphertext()), event.Signature: string(gm.GetSignature())}))
|
||||
}
|
||||
|
||||
// finishedFetch is a callback function the processes the termination of a fetch channel from a given server
|
||||
func (e *engine) finishedFetch(server string) {
|
||||
log.Debugf("Finished Fetch for %v\n", server)
|
||||
e.eventManager.Publish(event.NewEvent(event.FinishedFetch, map[event.Field]string{event.GroupServer: server}))
|
||||
}
|
||||
|
||||
// joinServer manages a new server connection with the given onion address
|
||||
func (e *engine) joinServer(onion string) {
|
||||
e.connectionsManager.ManageServerConnection(onion, e, e.receiveGroupMessage, e.finishedFetch)
|
||||
e.connectionsManager.ManageServerConnection(onion, e, e.receiveGroupMessage)
|
||||
}
|
||||
|
||||
// sendMessageToGroup attempts to sent the given message to the given group id.
|
||||
|
|
|
@ -26,7 +26,6 @@ type PeerServerConnection struct {
|
|||
protocolEngine Engine
|
||||
|
||||
GroupMessageHandler func(string, *protocol.GroupMessage)
|
||||
CloseHandler func(string)
|
||||
}
|
||||
|
||||
// NewPeerServerConnection creates a new Peer->Server outbound connection
|
||||
|
@ -53,10 +52,10 @@ func (psc *PeerServerConnection) setState(state ConnectionState) {
|
|||
}))
|
||||
}
|
||||
|
||||
// WaitTilAuthenticated waits until the underlying connection is authenticated
|
||||
func (psc *PeerServerConnection) WaitTilAuthenticated() {
|
||||
// WaitTilSynced waits until the underlying connection is authenticated
|
||||
func (psc *PeerServerConnection) WaitTilSynced() {
|
||||
for {
|
||||
if psc.GetState() == AUTHENTICATED {
|
||||
if psc.GetState() == SYNCED {
|
||||
break
|
||||
}
|
||||
time.Sleep(time.Second * 1)
|
||||
|
@ -110,8 +109,8 @@ func (psc *PeerServerConnection) Break() error {
|
|||
|
||||
// SendGroupMessage sends the given protocol message to the Server.
|
||||
func (psc *PeerServerConnection) SendGroupMessage(gm *protocol.GroupMessage) error {
|
||||
if psc.state != AUTHENTICATED {
|
||||
return errors.New("peer is not yet connected & authenticated to server cannot send message")
|
||||
if psc.state != SYNCED {
|
||||
return errors.New("peer is not yet connected & authenticated & synced to server cannot send message")
|
||||
}
|
||||
|
||||
err := psc.connection.Do(func() error {
|
||||
|
@ -159,5 +158,5 @@ func (psc *PeerServerConnection) HandleGroupMessage(gm *protocol.GroupMessage) {
|
|||
|
||||
// HandleFetchDone calls the supplied callback for when a fetch connection is closed
|
||||
func (psc *PeerServerConnection) HandleFetchDone() {
|
||||
psc.CloseHandler(psc.Server)
|
||||
psc.setState(SYNCED)
|
||||
}
|
||||
|
|
|
@ -27,7 +27,7 @@ type TestServer struct {
|
|||
}
|
||||
|
||||
func (ts *TestServer) HandleGroupMessage(gm *protocol.GroupMessage) {
|
||||
ts.Received<-true
|
||||
ts.Received <- true
|
||||
}
|
||||
|
||||
func (ts *TestServer) HandleFetchRequest() []*protocol.GroupMessage {
|
||||
|
@ -68,7 +68,6 @@ func TestPeerServerConnection(t *testing.T) {
|
|||
pub, priv, _ := ed25519.GenerateKey(rand.Reader)
|
||||
|
||||
identity := identity.InitializeV3("", &priv, &pub)
|
||||
|
||||
t.Logf("Launching Server....\n")
|
||||
ts := new(TestServer)
|
||||
ts.Init()
|
||||
|
@ -86,17 +85,13 @@ func TestPeerServerConnection(t *testing.T) {
|
|||
psc.GroupMessageHandler = func(s string, gm *protocol.GroupMessage) {
|
||||
numcalls++
|
||||
}
|
||||
closedCalls := 0
|
||||
psc.CloseHandler = func(s string) {
|
||||
closedCalls++
|
||||
}
|
||||
state := psc.GetState()
|
||||
if state != DISCONNECTED {
|
||||
t.Errorf("new connections should start in disconnected state")
|
||||
}
|
||||
time.Sleep(time.Second * 1)
|
||||
go psc.Run()
|
||||
psc.WaitTilAuthenticated()
|
||||
psc.WaitTilSynced()
|
||||
|
||||
gm := &protocol.GroupMessage{Ciphertext: []byte("hello"), Signature: []byte{}}
|
||||
psc.SendGroupMessage(gm)
|
||||
|
@ -107,9 +102,4 @@ func TestPeerServerConnection(t *testing.T) {
|
|||
if numcalls != 2 {
|
||||
t.Errorf("Should have received 2 calls from fetch request, instead received %v", numcalls)
|
||||
}
|
||||
|
||||
if closedCalls != 1 {
|
||||
t.Errorf("Should have closed connection 1 time, instead of %v", closedCalls)
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -8,20 +8,22 @@ type ConnectionState int
|
|||
// CONNECTING - We are in the process of attempting to connect to a given endpoint
|
||||
// CONNECTED - We have connected but not yet authenticated
|
||||
// AUTHENTICATED - im.ricochet.auth-hidden-server has succeeded on the connection.
|
||||
// SYNCED - we have pulled all the messages for groups from the server and are ready to send
|
||||
const (
|
||||
DISCONNECTED ConnectionState = iota
|
||||
CONNECTING
|
||||
CONNECTED
|
||||
AUTHENTICATED
|
||||
SYNCED
|
||||
FAILED
|
||||
KILLED
|
||||
)
|
||||
|
||||
var (
|
||||
// ConnectionStateName allows conversion of states to their string representations
|
||||
ConnectionStateName = []string{"Disconnected", "Connecting", "Connected", "Authenticated", "Failed", "Killed"}
|
||||
ConnectionStateName = []string{"Disconnected", "Connecting", "Connected", "Authenticated", "Synced", "Failed", "Killed"}
|
||||
|
||||
// ConnectionStateType allows conversion of strings to their state type
|
||||
ConnectionStateType = map[string]ConnectionState{"Disconnected": DISCONNECTED, "Connecting": CONNECTING,
|
||||
"Connected": CONNECTED, "Authenticated": AUTHENTICATED, "Failed": FAILED, "Killed": KILLED}
|
||||
"Connected": CONNECTED, "Authenticated": AUTHENTICATED, "Synced": SYNCED, "Failed": FAILED, "Killed": KILLED}
|
||||
)
|
||||
|
|
|
@ -28,7 +28,9 @@ type profileStore struct {
|
|||
type ProfileStore interface {
|
||||
Load() error
|
||||
Shutdown()
|
||||
GetProfileCopy() *model.Profile
|
||||
GetProfileCopy(timeline bool) *model.Profile
|
||||
GetNewPeerMessage() *event.Event
|
||||
GetStatusMessages() []*event.Event
|
||||
}
|
||||
|
||||
// NewProfileWriterStore returns a profile store backed by a filestore listening for events and saving them
|
||||
|
@ -52,6 +54,8 @@ func NewProfileWriterStore(eventManager event.Manager, directory, password strin
|
|||
ps.eventManager.Subscribe(event.AcceptGroupInvite, ps.queue.EventChannel)
|
||||
ps.eventManager.Subscribe(event.NewGroupInvite, ps.queue.EventChannel)
|
||||
ps.eventManager.Subscribe(event.NewMessageFromGroup, ps.queue.EventChannel)
|
||||
ps.eventManager.Subscribe(event.PeerStateChange, ps.queue.EventChannel)
|
||||
ps.eventManager.Subscribe(event.ServerStateChange, ps.queue.EventChannel)
|
||||
|
||||
return ps
|
||||
}
|
||||
|
@ -67,7 +71,7 @@ func ReadProfile(directory, password string) (*model.Profile, error) {
|
|||
return nil, err
|
||||
}
|
||||
|
||||
profile := ps.GetProfileCopy()
|
||||
profile := ps.GetProfileCopy(true)
|
||||
|
||||
return profile, nil
|
||||
}
|
||||
|
@ -78,6 +82,37 @@ func NewProfile(name string) *model.Profile {
|
|||
return profile
|
||||
}
|
||||
|
||||
// GetNewPeerMessage is for AppService to call on Reload events, to reseed the AppClient with the loaded peers
|
||||
func (ps *profileStore) GetNewPeerMessage() *event.Event {
|
||||
message := event.NewEventList(event.NewPeer, event.Identity, ps.profile.LocalID, event.Password, ps.password, event.Status, "running")
|
||||
return &message
|
||||
}
|
||||
|
||||
func (ps *profileStore) GetStatusMessages() []*event.Event {
|
||||
messages := []*event.Event{}
|
||||
for _, contact := range ps.profile.Contacts {
|
||||
message := event.NewEvent(event.PeerStateChange, map[event.Field]string{
|
||||
event.RemotePeer: string(contact.Onion),
|
||||
event.ConnectionState: contact.State,
|
||||
})
|
||||
messages = append(messages, &message)
|
||||
}
|
||||
|
||||
doneServers := make(map[string]bool)
|
||||
for _, group := range ps.profile.Groups {
|
||||
if _, exists := doneServers[group.GroupServer]; !exists {
|
||||
message := event.NewEvent(event.ServerStateChange, map[event.Field]string{
|
||||
event.GroupServer: string(group.GroupServer),
|
||||
event.ConnectionState: group.State,
|
||||
})
|
||||
messages = append(messages, &message)
|
||||
doneServers[group.GroupServer] = true
|
||||
}
|
||||
}
|
||||
|
||||
return messages
|
||||
}
|
||||
|
||||
func (ps *profileStore) save() error {
|
||||
if ps.writer {
|
||||
bytes, _ := json.Marshal(ps.profile)
|
||||
|
@ -97,15 +132,9 @@ func (ps *profileStore) Load() error {
|
|||
if err == nil {
|
||||
ps.profile = cp
|
||||
|
||||
for _, profile := range cp.Contacts {
|
||||
ss := NewStreamStore(ps.directory, profile.LocalID, ps.password)
|
||||
profile.Timeline.SetMessages(ss.Read())
|
||||
ps.streamStores[profile.Onion] = ss
|
||||
}
|
||||
|
||||
for _, group := range cp.Groups {
|
||||
for gid, group := range cp.Groups {
|
||||
ss := NewStreamStore(ps.directory, group.LocalID, ps.password)
|
||||
group.Timeline.SetMessages(ss.Read())
|
||||
cp.Groups[gid].Timeline.SetMessages(ss.Read())
|
||||
ps.streamStores[group.GroupID] = ss
|
||||
}
|
||||
}
|
||||
|
@ -113,8 +142,8 @@ func (ps *profileStore) Load() error {
|
|||
return err
|
||||
}
|
||||
|
||||
func (ps *profileStore) GetProfileCopy() *model.Profile {
|
||||
return ps.profile.GetCopy()
|
||||
func (ps *profileStore) GetProfileCopy(timeline bool) *model.Profile {
|
||||
return ps.profile.GetCopy(timeline)
|
||||
}
|
||||
|
||||
func (ps *profileStore) eventHandler() {
|
||||
|
@ -131,10 +160,11 @@ func (ps *profileStore) eventHandler() {
|
|||
var pp *model.PublicProfile
|
||||
json.Unmarshal([]byte(ev.Data[event.Data]), &pp)
|
||||
ps.profile.AddContact(ev.Data[event.RemotePeer], pp)
|
||||
ss := NewStreamStore(ps.directory, pp.LocalID, ps.password)
|
||||
// TODO: configure - allow peers to be configured to turn on limited storage
|
||||
/*ss := NewStreamStore(ps.directory, pp.LocalID, ps.password)
|
||||
pp.Timeline.SetMessages(ss.Read())
|
||||
ps.streamStores[pp.Onion] = ss
|
||||
ps.save()
|
||||
ps.save()*/
|
||||
case event.GroupCreated:
|
||||
var group *model.Group
|
||||
json.Unmarshal([]byte(ev.Data[event.Data]), &group)
|
||||
|
@ -193,6 +223,16 @@ func (ps *profileStore) eventHandler() {
|
|||
} else {
|
||||
log.Errorf("error storing new group message: %v stream store does not exist", ev)
|
||||
}
|
||||
case event.PeerStateChange:
|
||||
if _, exists := ps.profile.Contacts[ev.Data[event.RemotePeer]]; exists {
|
||||
ps.profile.Contacts[ev.Data[event.RemotePeer]].State = ev.Data[event.ConnectionState]
|
||||
}
|
||||
case event.ServerStateChange:
|
||||
for _, group := range ps.profile.Groups {
|
||||
if group.GroupServer == ev.Data[event.GroupServer] {
|
||||
group.State = ev.Data[event.ConnectionState]
|
||||
}
|
||||
}
|
||||
default:
|
||||
return
|
||||
}
|
||||
|
|
|
@ -35,14 +35,14 @@ func TestProfileStoreWriteRead(t *testing.T) {
|
|||
packet := protocol.CwtchPeerPacket{}
|
||||
proto.Unmarshal(invite, &packet)
|
||||
invite, _ = proto.Marshal(packet.GetGroupChatInvite())
|
||||
eventBus.Publish(event.NewEvent(event.NewGroupInvite, map[event.Field]string{event.TimestampReceived: time.Now().Format(time.RFC3339Nano), event.RemotePeer: ps1.GetProfileCopy().Onion, event.GroupInvite: string(invite)}))
|
||||
eventBus.Publish(event.NewEvent(event.NewGroupInvite, map[event.Field]string{event.TimestampReceived: time.Now().Format(time.RFC3339Nano), event.RemotePeer: ps1.GetProfileCopy(true).Onion, event.GroupInvite: string(invite)}))
|
||||
time.Sleep(1 * time.Second)
|
||||
|
||||
eventBus.Publish(event.NewEvent(event.NewMessageFromGroup, map[event.Field]string{
|
||||
event.GroupID: groupid,
|
||||
event.TimestampSent: time.Now().Format(time.RFC3339Nano),
|
||||
event.TimestampReceived: time.Now().Format(time.RFC3339Nano),
|
||||
event.RemotePeer: ps1.GetProfileCopy().Onion,
|
||||
event.RemotePeer: ps1.GetProfileCopy(true).Onion,
|
||||
event.Data: testMessage,
|
||||
}))
|
||||
time.Sleep(1 * time.Second)
|
||||
|
@ -55,7 +55,7 @@ func TestProfileStoreWriteRead(t *testing.T) {
|
|||
t.Errorf("Error createing profileStore: %v\n", err)
|
||||
}
|
||||
|
||||
profile = ps2.GetProfileCopy()
|
||||
profile = ps2.GetProfileCopy(true)
|
||||
if profile.Name != testProfileName {
|
||||
t.Errorf("Profile name from loaded profile incorrect. Expected: '%v' Actual: '%v'\n", testProfileName, profile.Name)
|
||||
}
|
||||
|
@ -65,7 +65,7 @@ func TestProfileStoreWriteRead(t *testing.T) {
|
|||
t.Errorf("Profile attribute '%v' inccorect. Expected: '%v' Actual: '%v'\n", testKey, testVal, v)
|
||||
}
|
||||
|
||||
group2 := ps2.GetProfileCopy().Groups[groupid]
|
||||
group2 := ps2.GetProfileCopy(true).Groups[groupid]
|
||||
if group2 == nil {
|
||||
t.Errorf("Group not loaded\n")
|
||||
}
|
||||
|
|
|
@ -23,6 +23,7 @@ type streamStore struct {
|
|||
storeDirectory string
|
||||
filenameBase string
|
||||
|
||||
// Buffer is used just for current file to write to
|
||||
messages []model.Message
|
||||
bufferByteCount int
|
||||
|
||||
|
@ -100,7 +101,7 @@ func (ss *streamStore) rotateFileStore() {
|
|||
}
|
||||
}
|
||||
|
||||
// FetchMessages returns all messages from the backing file.
|
||||
// Read returns all messages from the backing file (not the buffer, which is jsut for writing to the current file)
|
||||
func (ss *streamStore) Read() (messages []model.Message) {
|
||||
ss.lock.Lock()
|
||||
defer ss.lock.Unlock()
|
||||
|
@ -116,12 +117,8 @@ func (ss *streamStore) Read() (messages []model.Message) {
|
|||
}
|
||||
|
||||
msgs := []model.Message{}
|
||||
err = json.Unmarshal([]byte(bytes), &msgs)
|
||||
if err == nil {
|
||||
resp = append(resp, msgs...)
|
||||
} else {
|
||||
log.Debugf("Failed to unmarshal messages: %v", err)
|
||||
}
|
||||
json.Unmarshal([]byte(bytes), &msgs)
|
||||
resp = append(resp, msgs...)
|
||||
}
|
||||
|
||||
return resp
|
||||
|
|
|
@ -66,7 +66,7 @@ func waitForPeerGroupConnection(t *testing.T, peer peer.CwtchPeer, groupID strin
|
|||
if state == connections.FAILED {
|
||||
t.Fatalf("%v could not connect to %v", peer.GetProfile().Onion, groupID)
|
||||
}
|
||||
if state != connections.AUTHENTICATED {
|
||||
if state != connections.SYNCED {
|
||||
fmt.Printf("peer %v waiting connect to group %v, currently: %v\n", peer.GetProfile().Onion, groupID, connections.ConnectionStateName[state])
|
||||
time.Sleep(time.Second * 5)
|
||||
continue
|
||||
|
@ -110,6 +110,7 @@ func TestCwtchPeerIntegration(t *testing.T) {
|
|||
log.ExcludeFromPattern("connection/connection")
|
||||
log.ExcludeFromPattern("outbound/3dhauthchannel")
|
||||
log.ExcludeFromPattern("event/eventmanager")
|
||||
log.ExcludeFromPattern("pipeBridge")
|
||||
acn, err := connectivity.StartTor(".", "")
|
||||
if err != nil {
|
||||
t.Fatalf("Could not start Tor: %v", err)
|
||||
|
@ -142,9 +143,10 @@ func TestCwtchPeerIntegration(t *testing.T) {
|
|||
|
||||
app := app2.NewApp(acn, "./storage")
|
||||
|
||||
bridge1, bridge2 := bridge.MakeGoChanBridge()
|
||||
appClient := app2.NewAppClient("./storage", bridge1)
|
||||
appService := app2.NewAppService(acn, "./storage", bridge2)
|
||||
bridgeClient := bridge.NewPipeBridgeClient("./clientPipe", "./servicePipe")
|
||||
bridgeService := bridge.NewPipeBridgeService("./servicePipe", "./clientPipe")
|
||||
appClient := app2.NewAppClient("./storage", bridgeClient)
|
||||
appService := app2.NewAppService(acn, "./storage", bridgeService)
|
||||
|
||||
numGoRoutinesPostAppStart := runtime.NumGoroutine()
|
||||
|
||||
|
@ -168,8 +170,6 @@ func TestCwtchPeerIntegration(t *testing.T) {
|
|||
carol := utils.WaitGetPeer(appClient, "carol")
|
||||
fmt.Println("Carol created:", carol.GetProfile().Onion)
|
||||
|
||||
//fmt.Println("Carol created:", carol.GetProfile().Onion)
|
||||
|
||||
app.LaunchPeers()
|
||||
appClient.LaunchPeers()
|
||||
|
||||
|
@ -384,12 +384,22 @@ func TestCwtchPeerIntegration(t *testing.T) {
|
|||
numGoRoutinesPostCarol := runtime.NumGoroutine()
|
||||
|
||||
fmt.Println("Shutting down apps...")
|
||||
fmt.Printf("app Shutdown: %v\n", runtime.NumGoroutine())
|
||||
app.Shutdown()
|
||||
fmt.Printf("appClientShutdown: %v\n", runtime.NumGoroutine())
|
||||
appClient.Shutdown()
|
||||
fmt.Printf("appServiceShutdown: %v\n", runtime.NumGoroutine())
|
||||
appService.Shutdown()
|
||||
|
||||
bridge1.Shutdown()
|
||||
bridge2.Shutdown()
|
||||
fmt.Printf("bridgeClientShutdown: %v\n", runtime.NumGoroutine())
|
||||
bridgeClient.Shutdown()
|
||||
time.Sleep(2 * time.Second)
|
||||
|
||||
fmt.Printf("brideServiceShutdown: %v\n", runtime.NumGoroutine())
|
||||
bridgeService.Shutdown()
|
||||
time.Sleep(2 * time.Second)
|
||||
|
||||
fmt.Printf("Done shutdown: %v\n", runtime.NumGoroutine())
|
||||
numGoRoutinesPostAppShutdown := runtime.NumGoroutine()
|
||||
|
||||
fmt.Println("Shutting down ACN...")
|
||||
|
|
Loading…
Reference in New Issue