Browse Source

add app level functionality to reload from service

pull/255/head
Dan Ballard 4 months ago
parent
commit
0465973a78

+ 15
- 10
app/app.go View File

@@ -26,7 +26,8 @@ type applicationCore struct {
}

type appletPeers struct {
peers map[string]peer.CwtchPeer
peers map[string]peer.CwtchPeer
launched bool // a bit hacky; placeholder while we transition to full multi-peer support and a better API
}

type appletACN struct {
@@ -69,6 +70,7 @@ func newAppCore(appDirectory string) *applicationCore {

func (ap *appletPeers) init() {
ap.peers = make(map[string]peer.CwtchPeer)
ap.launched = false
}

func (a *appletACN) init(acn connectivity.ACN, publish func(int, string)) {
@@ -127,7 +129,7 @@ func (app *application) CreatePeer(name string, password string) {
profileStore := storage.NewProfileWriterStore(app.eventBuses[profile.Onion], path.Join(app.directory, "profiles", profile.LocalID), password, profile)
app.storage[profile.Onion] = profileStore

pc := app.storage[profile.Onion].GetProfileCopy()
pc := app.storage[profile.Onion].GetProfileCopy(true)
peer := peer.FromProfile(pc)
peer.Init(app.eventBuses[profile.Onion])

@@ -143,7 +145,7 @@ func (app *application) CreatePeer(name string, password string) {
}

// LoadProfiles takes a password and attempts to load any profiles it can from storage with it and create Peers for them
func (ac *applicationCore) LoadProfiles(password string, loadProfileFn LoadProfileFn) error {
func (ac *applicationCore) LoadProfiles(password string, timeline bool, loadProfileFn LoadProfileFn) error {
files, err := ioutil.ReadDir(path.Join(ac.directory, "profiles"))
if err != nil {
return fmt.Errorf("Error: cannot read profiles directory: %v", err)
@@ -157,7 +159,7 @@ func (ac *applicationCore) LoadProfiles(password string, loadProfileFn LoadProfi
continue
}

profile := profileStore.GetProfileCopy()
profile := profileStore.GetProfileCopy(timeline)

_, exists := ac.eventBuses[profile.Onion]
if exists {
@@ -179,7 +181,7 @@ func (ac *applicationCore) LoadProfiles(password string, loadProfileFn LoadProfi
// LoadProfiles takes a password and attempts to load any profiles it can from storage with it and create Peers for them
func (app *application) LoadProfiles(password string) {
count := 0
app.applicationCore.LoadProfiles(password, func(profile *model.Profile, profileStore storage.ProfileStore) {
app.applicationCore.LoadProfiles(password, true, func(profile *model.Profile, profileStore storage.ProfileStore) {
peer := peer.FromProfile(profile)
peer.Init(app.eventBuses[profile.Onion])

@@ -207,13 +209,16 @@ func (app *application) GetPrimaryBus() event.Manager {

// LaunchPeers starts each peer Listening and connecting to peers and groups
func (ap *appletPeers) LaunchPeers() {
log.Debugf("appletPeers LaunchPeers\n")
if ap.launched {
return
}
for _, p := range ap.peers {
if !p.IsStarted() {
p.Listen()
p.StartPeersConnections()
p.StartGroupConnections()
}
p.Listen()
p.StartPeersConnections()
p.StartGroupConnections()
}
ap.launched = true
}

// ListPeers returns a map of onions to their profile's Name

+ 6
- 1
app/appBridge.go View File

@@ -3,6 +3,11 @@ package app
import "cwtch.im/cwtch/event"
import "git.openprivacy.ca/openprivacy/libricochet-go/log"

const (
// DestApp should be used as a destination for IPC messages that are for the application itself and not a peer
DestApp = "app"
)

type applicationBridge struct {
applicationCore

@@ -16,6 +21,7 @@ func (ab *applicationBridge) listen() {
ipcMessage, ok := ab.bridge.Read()
log.Debugf("listen() got %v for %v\n", ipcMessage.Message.EventType, ipcMessage.Dest)
if !ok {
log.Debugln("exiting appBridge.listen()")
return
}

@@ -30,5 +36,4 @@ func (ab *applicationBridge) listen() {
}

func (ab *applicationBridge) Shutdown() {
ab.bridge.Shutdown()
}

+ 17
- 3
app/appClient.go View File

@@ -24,6 +24,8 @@ func NewAppClient(appDirectory string, bridge event.IPCBridge) Application {

go appClient.listen()

appClient.bridge.Write(&event.IPCMessage{Dest: DestApp, Message: event.NewEventList(event.ReloadClient)})

log.Infoln("Created new App Client")
return appClient
}
@@ -38,17 +40,20 @@ func (ac *applicationClient) handleEvent(ev *event.Event) {
case event.NewPeer:
localID := ev.Data[event.Identity]
password := ev.Data[event.Password]
ac.newPeer(localID, password)
reload := ev.Data[event.Status] == "running"
ac.newPeer(localID, password, reload)
case event.PeerError:
ac.appBus.Publish(*ev)
case event.AppError:
ac.appBus.Publish(*ev)
case event.ACNStatus:
ac.appBus.Publish(*ev)
case event.ReloadDone:
ac.appBus.Publish(*ev)
}
}

func (ac *applicationClient) newPeer(localID, password string) {
func (ac *applicationClient) newPeer(localID, password string, reload bool) {
profile, err := storage.ReadProfile(path.Join(ac.directory, "profiles", localID), password)
if err != nil {
log.Errorf("Could not read profile for NewPeer event: %v\n", err)
@@ -71,7 +76,16 @@ func (ac *applicationClient) newPeer(localID, password string) {
defer ac.mutex.Unlock()
ac.peers[profile.Onion] = peer
ac.eventBuses[profile.Onion] = eventBus
ac.appBus.Publish(event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.Onion}))
npEvent := event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.Onion})
if reload {
npEvent.Data[event.Status] = "running"
}
ac.appBus.Publish(npEvent)

if reload {
ac.bridge.Write(&event.IPCMessage{Dest: DestApp, Message: event.NewEventList(event.ReloadPeer, event.Identity, profile.Onion)})
}

}

// CreatePeer messages the service to create a new Peer with the given name

+ 32
- 9
app/appService.go View File

@@ -12,11 +12,6 @@ import (
"strconv"
)

const (
// DestApp should be used as a destination for IPC messages that are for the application itself an not a peer
DestApp = "app"
)

type applicationService struct {
applicationBridge
appletACN
@@ -33,6 +28,7 @@ type ApplicationService interface {
// NewAppService returns an ApplicationService that runs the backend of an app and communicates with a client by the supplied IPCBridge
func NewAppService(acn connectivity.ACN, appDirectory string, bridge event.IPCBridge) ApplicationService {
appService := &applicationService{storage: make(map[string]storage.ProfileStore), engines: make(map[string]connections.Engine), applicationBridge: applicationBridge{applicationCore: *newAppCore(appDirectory), bridge: bridge}}

fn := func(progress int, status string) {
progStr := strconv.Itoa(progress)
appService.bridge.Write(&event.IPCMessage{Dest: DestApp, Message: event.NewEventList(event.ACNStatus, event.Progreess, progStr, event.Status, status)})
@@ -56,6 +52,25 @@ func (as *applicationService) handleEvent(ev *event.Event) {
case event.LoadProfiles:
password := ev.Data[event.Password]
as.loadProfiles(password)
case event.ReloadClient:
for _, storage := range as.storage {
message := event.IPCMessage{Dest: DestApp, Message: *storage.GetNewPeerMessage()}
as.bridge.Write(&message)
}

message := event.IPCMessage{Dest: DestApp, Message: event.NewEventList(event.ReloadDone)}
as.bridge.Write(&message)
case event.ReloadPeer:
onion := ev.Data[event.Identity]
events := as.storage[onion].GetStatusMessages()

for _, ev := range events {
message := event.IPCMessage{Dest: onion, Message: *ev}
as.bridge.Write(&message)
}
case event.ShutdownPeer:
onion := ev.Data[event.Identity]
as.ShutdownPeer(onion)
}
}

@@ -86,8 +101,9 @@ func (as *applicationService) createPeer(name, password string) {

func (as *applicationService) loadProfiles(password string) {
count := 0
as.applicationCore.LoadProfiles(password, func(profile *model.Profile, profileStore storage.ProfileStore) {
as.applicationCore.LoadProfiles(password, false, func(profile *model.Profile, profileStore storage.ProfileStore) {
as.eventBuses[profile.Onion] = event.IPCEventManagerFrom(as.bridge, profile.Onion, as.eventBuses[profile.Onion])

blockedPeers := profile.BlockedPeers()
identity := identity.InitializeV3(profile.Name, &profile.Ed25519PrivateKey, &profile.Ed25519PublicKey)
engine := connections.NewProtocolEngine(identity, profile.Ed25519PrivateKey, as.acn, as.eventBuses[profile.Onion], blockedPeers)
@@ -105,11 +121,18 @@ func (as *applicationService) loadProfiles(password string) {
}
}

func (as *applicationService) ShutdownPeer(onion string) {
as.engines[onion].Shutdown()
delete(as.engines, onion)
as.storage[onion].Shutdown()
delete(as.storage, onion)
as.eventBuses[onion].Shutdown()
delete(as.eventBuses, onion)
}

// Shutdown shuts down the application Service and all peer related backend parts
func (as *applicationService) Shutdown() {
for id := range as.engines {
as.engines[id].Shutdown()
as.storage[id].Shutdown()
as.eventBuses[id].Shutdown()
as.ShutdownPeer(id)
}
}

+ 55
- 39
event/bridge/pipeBridge.go View File

@@ -26,6 +26,9 @@ type pipeBridge struct {
closedChan chan bool
state connections.ConnectionState
lock sync.Mutex

// For logging / debugging purposes
name string
}

func newPipeBridge(inFilename, outFilename string) *pipeBridge {
@@ -41,66 +44,50 @@ func newPipeBridge(inFilename, outFilename string) *pipeBridge {
func NewPipeBridgeClient(inFilename, outFilename string) event.IPCBridge {
log.Debugf("Making new PipeBridge Client...\n")
pb := newPipeBridge(inFilename, outFilename)
go pb.clientConnectionManager()
pb.name = "client"
go pb.connectionManager()

return pb
}

// NewPipeBridgeService returns a pipe backed IPCBridge for a service
func NewPipeBridgeService(inFilename, outFilename string) (event.IPCBridge, error) {
func NewPipeBridgeService(inFilename, outFilename string) event.IPCBridge {
log.Debugf("Making new PipeBridge Service...\n")
pb := newPipeBridge(inFilename, outFilename)
pb.name = "service"

go pb.serviceConnectionManager()
go pb.connectionManager()

log.Debugf("Successfully created new PipeBridge Service!\n")
return pb, nil
return pb
}

func (pb *pipeBridge) clientConnectionManager() {
func (pb *pipeBridge) connectionManager() {
for pb.state != connections.KILLED {
log.Debugf("clientConnManager loop start init\n")
pb.state = connections.CONNECTING

var err error
pb.in, err = os.OpenFile(pb.infile, os.O_RDONLY, 0600)
log.Debugf("%v open file infile\n", pb.name)
pb.in, err = os.OpenFile(pb.infile, os.O_RDWR, 0600)
if err != nil {
pb.state = connections.DISCONNECTED
continue
}

pb.out, err = os.OpenFile(pb.outfile, os.O_WRONLY, 0600)
log.Debugf("%v open file outfile\n", pb.name)
pb.out, err = os.OpenFile(pb.outfile, os.O_RDWR, 0600)
if err != nil {
pb.state = connections.DISCONNECTED
continue
}

log.Debugf("Successfully connected PipeBridge Client!\n")
log.Debugf("Successfully connected PipeBridge %v!\n", pb.name)

pb.handleConns()
}
}

func (pb *pipeBridge) serviceConnectionManager() {
for pb.state != connections.KILLED {
pb.state = connections.CONNECTING

var err error
pb.out, err = os.OpenFile(pb.outfile, os.O_WRONLY, 0600)
if err != nil {
pb.state = connections.DISCONNECTED
continue
}

pb.in, err = os.OpenFile(pb.infile, os.O_RDONLY, 0600)
if err != nil {
pb.state = connections.DISCONNECTED
continue
}
log.Debugf("exiting %v ConnectionManager\n", pb.name)

log.Debugf("Successfully connected PipeBridge Service!\n")

pb.handleConns()
}
}

func (pb *pipeBridge) handleConns() {
@@ -110,30 +97,41 @@ func (pb *pipeBridge) handleConns() {

pb.closedChan = make(chan bool, 5)

log.Debugf("handleConns authed, 2xgo\n")
log.Debugf("handleConns authed, %v 2xgo\n", pb.name)

go pb.handleRead()
go pb.handleWrite()

<-pb.closedChan
log.Debugf("handleConns CLOSEDCHAN!!!!\n")
log.Debugf("handleConns <-closedChan (%v)\n", pb.name)
if pb.state != connections.KILLED {
pb.state = connections.FAILED
}
pb.closeReset()
log.Debugf("handleConns done for %v, exit\n", pb.name)
}

func (pb *pipeBridge) closeReset() {
pb.in.Close()
pb.out.Close()
close(pb.write)
close(pb.read)
pb.read = make(chan event.IPCMessage, maxBufferSize)
pb.write = make(chan event.IPCMessage, maxBufferSize)
log.Debugf("handleConns done, exit\n")
}

func (pb *pipeBridge) handleWrite() {
log.Debugf("handleWrite() %v\n", pb.name)
defer log.Debugf("exiting handleWrite() %v\n", pb.name)

for {
select {
case message := <-pb.write:
log.Debugf("handleWrite <- message: %v\n", message)
if message.Message.EventType == event.EncryptedGroupMessage || message.Message.EventType == event.SendMessageToGroup || message.Message.EventType == event.NewMessageFromGroup {
log.Debugf("handleWrite <- message: %v %v ...\n", message.Dest, message.Message.EventType)
} else {
log.Debugf("handleWrite <- message: %v\n", message)
}
if pb.state == connections.AUTHENTICATED {
encMessage := &event.IPCMessage{Dest: message.Dest, Message: event.Event{EventType: message.Message.EventType, EventID: message.Message.EventID, Data: make(map[event.Field]string)}}
for k, v := range message.Message.Data {
@@ -162,6 +160,9 @@ func (pb *pipeBridge) handleWrite() {
}

func (pb *pipeBridge) handleRead() {
log.Debugf("handleRead() %v\n", pb.name)
defer log.Debugf("exiting handleRead() %v", pb.name)

var n int
size := make([]byte, 2)
var err error
@@ -198,26 +199,41 @@ func (pb *pipeBridge) handleRead() {
val, _ := base64.StdEncoding.DecodeString(v)
message.Message.Data[k] = string(val)
}
log.Debugf("handleRead read<-: %v\n", message)
if message.Message.EventType == event.EncryptedGroupMessage || message.Message.EventType == event.SendMessageToGroup || message.Message.EventType == event.NewMessageFromGroup {
log.Debugf("handleRead read<-: %v %v ...\n", message.Dest, message.Message.EventType)
} else {
log.Debugf("handleRead read<-: %v\n", message)
}
pb.read <- message
log.Debugf("handleRead wrote\n")
}
}

func (pb *pipeBridge) Read() (*event.IPCMessage, bool) {
log.Debugf("Read()...\n")
log.Debugf("Read() %v...\n", pb.name)

message := <-pb.read
log.Debugf("Read: %v\n", message)
return &message, true
if message.Message.EventType == event.EncryptedGroupMessage || message.Message.EventType == event.SendMessageToGroup || message.Message.EventType == event.NewMessageFromGroup {
log.Debugf("Read %v: %v %v ...\n", pb.name, message.Dest, message.Message.EventType)
} else {
log.Debugf("Read %v: %v\n", pb.name, message)
}
return &message, pb.state != connections.KILLED
}

func (pb *pipeBridge) Write(message *event.IPCMessage) {
log.Debugf("Write: %v\n", message)
if message.Message.EventType == event.EncryptedGroupMessage || message.Message.EventType == event.SendMessageToGroup || message.Message.EventType == event.NewMessageFromGroup {
log.Debugf("Write %v: %v %v ...\n", pb.name, message.Dest, message.Message.EventType)
} else {
log.Debugf("Write %v: %v\n", pb.name, message)
}
pb.write <- *message
log.Debugf("Wrote\n")
}

func (pb *pipeBridge) Shutdown() {
log.Debugf("pb.Shutdown() for %v currently in state: %v\n", pb.name, connections.ConnectionStateName[pb.state])
pb.state = connections.KILLED
pb.closedChan <- true
log.Debugf("Done Shutdown for %v\n", pb.name)
}

+ 1
- 6
event/bridge/pipeBridge_test.go View File

@@ -36,12 +36,7 @@ func clientHelper(t *testing.T, in, out string, messageOrig *event.IPCMessage, d
}

func serviceHelper(t *testing.T, in, out string, messageOrig *event.IPCMessage, done chan bool) {
service, err := NewPipeBridgeService(in, out)
if err != nil {
t.Errorf("Error opening %v pipe: %v", in, err)
done <- true
return
}
service := NewPipeBridgeService(in, out)

service.Write(messageOrig)


+ 10
- 4
event/common.go View File

@@ -11,8 +11,6 @@ const (
PeerRequest = Type("PeerRequest")
BlockPeer = Type("BlockPeer")
JoinServer = Type("JoinServer")
// attributes: GroupServer
FinishedFetch = Type("FinishedFetch")

ProtocolEngineStartListen = Type("ProtocolEngineStartListen")
ProtocolEngineStopped = Type("ProtocolEngineStopped")
@@ -91,20 +89,28 @@ const (

// GroupServer
// ConnectionState
ServerStateChange = Type("GroupStateChange")
ServerStateChange = Type("ServerStateChange")

/***** Application client / service messages *****/

// ProfileName, Password
CreatePeer = Type("CreatePeer")

// service -> client: Identity(localId), Password
// service -> client: Identity(localId), Password, [Status(new/default=blank || from reload='running')]
// app -> Identity(onion)
NewPeer = Type("NewPeer")

// Password
LoadProfiles = Type("LoadProfiles")

// Client has reloaded, triggers NewPeer s then ReloadDone
ReloadClient = Type("ReloadClient")

ReloadDone = Type("ReloadDone")

// Identity - Ask service to resend all connection states
ReloadPeer = Type("ReloadPeer")

// Identity(onion)
ShutdownPeer = Type("ShutdownPeer")


+ 1
- 1
model/group.go View File

@@ -21,7 +21,7 @@ type Group struct {
SignedGroupID []byte
GroupKey [32]byte
GroupServer string
Timeline Timeline
Timeline Timeline `json:"-"`
Accepted bool
Owner string
IsCompromised bool

+ 11
- 0
model/message.go View File

@@ -1,6 +1,7 @@
package model

import (
"encoding/json"
"sort"
"sync"
"time"
@@ -48,6 +49,16 @@ func (t *Timeline) GetMessages() []Message {
return messages
}

// GetCopy returns a duplicate of the Timeline
func (t *Timeline) GetCopy() *Timeline {
t.lock.Lock()
defer t.lock.Unlock()
bytes, _ := json.Marshal(t)
newt := &Timeline{}
json.Unmarshal(bytes, newt)
return newt
}

// SetMessages sets the Messages of this timeline. Only to be used in loading/initialization
func (t *Timeline) SetMessages(messages []Message) {
t.lock.Lock()

+ 15
- 6
model/profile.go View File

@@ -25,10 +25,10 @@ type PublicProfile struct {
Blocked bool
Onion string
Attributes map[string]string
Timeline Timeline
LocalID string // used by storage engine
State string `json:"-"`
lock sync.Mutex
//Timeline Timeline `json:"-"` // TODO: cache recent messages for client
LocalID string // used by storage engine
State string `json:"-"`
lock sync.Mutex
}

// Profile encapsulates all the attributes necessary to be a Cwtch Peer.
@@ -119,6 +119,7 @@ func (p *Profile) RejectInvite(groupID string) {
p.lock.Unlock()
}

/*
// AddMessageToContactTimeline allows the saving of a message sent via a direct connection chat to the profile.
func (p *Profile) AddMessageToContactTimeline(onion string, fromMe bool, message string, sent time.Time) {
p.lock.Lock()
@@ -136,6 +137,7 @@ func (p *Profile) AddMessageToContactTimeline(onion string, fromMe bool, message
}
}
}
*/

// AcceptInvite accepts a group invite
func (p *Profile) AcceptInvite(groupID string) (err error) {
@@ -384,13 +386,20 @@ func (p *Profile) EncryptMessageToGroup(message string, groupID string) ([]byte,
return nil, nil, errors.New("group does not exist")
}

// GetCopy returns a full deep copy of the Profile struct and its members
func (p *Profile) GetCopy() *Profile {
// GetCopy returns a full deep copy of the Profile struct and its members (timeline inclusion control by arg)
func (p *Profile) GetCopy(timeline bool) *Profile {
p.lock.Lock()
defer p.lock.Unlock()

newp := new(Profile)
bytes, _ := json.Marshal(p)
json.Unmarshal(bytes, &newp)

if timeline {
for groupID := range newp.Groups {
newp.Groups[groupID].Timeline = *p.Groups[groupID].Timeline.GetCopy()
}
}

return newp
}

+ 0
- 23
model/profile_test.go View File

@@ -4,31 +4,8 @@ import (
"cwtch.im/cwtch/protocol"
"github.com/golang/protobuf/proto"
"testing"
"time"
)

func TestP2P(t *testing.T) {
sarah := GenerateNewProfile("Sarah")
alice := GenerateNewProfile("Alice")

sarah.AddContact(alice.Onion, &alice.PublicProfile)

sarah.AddMessageToContactTimeline(alice.Onion, false, "hello", time.Now())
sarah.AddMessageToContactTimeline(alice.Onion, true, "world", time.Now())

contact, _ := sarah.GetContact(alice.Onion)
for i, m := range contact.Timeline.GetMessages() {
if i == 0 && (m.Message != "hello" || m.PeerID != alice.Onion) {
t.Fatalf("Timeline is invalid: %v", m)
}

if i == 1 && (m.Message != "world" || m.PeerID != sarah.Onion) {
t.Fatalf("Timeline is invalid: %v", m)
}
t.Logf("Message: %v", m)
}
}

func TestProfileIdentity(t *testing.T) {
sarah := GenerateNewProfile("Sarah")
alice := GenerateNewProfile("Alice")

+ 1
- 7
peer/cwtch_peer.go View File

@@ -21,7 +21,6 @@ type cwtchPeer struct {
Profile *model.Profile
mutex sync.Mutex
shutdown bool
started bool

queue *event.Queue
eventBus event.Manager
@@ -59,7 +58,6 @@ type CwtchPeer interface {
GetContacts() []string
GetContact(string) *model.PublicProfile

IsStarted() bool
Listen()
StartPeersConnections()
StartGroupConnections()
@@ -287,6 +285,7 @@ func (cp *cwtchPeer) RejectInvite(groupID string) {

// Listen makes the peer open a listening port to accept incoming connections (and be detectably online)
func (cp *cwtchPeer) Listen() {
log.Debugf("cwtchPeer Listen sending ProtocolEngineStartListen\n")
cp.eventBus.Publish(event.NewEvent(event.ProtocolEngineStartListen, map[event.Field]string{}))
}

@@ -317,11 +316,6 @@ func (cp *cwtchPeer) Shutdown() {
cp.queue.Shutdown()
}

// IsStarted returns true if Listen() has successfully been run before on this connection (ever). TODO: we will need to properly unset this flag on error if we want to support resumption in the future
func (cp *cwtchPeer) IsStarted() bool {
return cp.started
}

// eventHandler process events from other subsystems
func (cp *cwtchPeer) eventHandler() {
for {

+ 6
- 2
protocol/connections/connectionsmanager.go View File

@@ -46,7 +46,7 @@ func (m *Manager) ManagePeerConnection(host string, engine Engine) *PeerPeerConn
// ManageServerConnection creates a new ServerConnection for Host with the given callback handler.
// If there is an establish connection, it is replaced with a new one, assuming this came from
// a new JoinServer from a new Group being joined. If it is still connecting to a server, the second request will be abandoned
func (m *Manager) ManageServerConnection(host string, engine Engine, messageHandler func(string, *protocol.GroupMessage), closedHandler func(string)) {
func (m *Manager) ManageServerConnection(host string, engine Engine, messageHandler func(string, *protocol.GroupMessage)) {
m.lock.Lock()
defer m.lock.Unlock()

@@ -61,7 +61,6 @@ func (m *Manager) ManageServerConnection(host string, engine Engine, messageHand

newPsc := NewPeerServerConnection(engine, host)
newPsc.GroupMessageHandler = messageHandler
newPsc.CloseHandler = closedHandler
go newPsc.Run()
m.serverConnections[host] = newPsc

@@ -71,6 +70,11 @@ func (m *Manager) ManageServerConnection(host string, engine Engine, messageHand
}
}

// SetServerSynced is a helper for peerserver connections and engine to call when a Fetch is done to set the state of the connection to SYNCED
func (m *Manager) SetServerSynced(onion string) {
m.serverConnections[onion].setState(SYNCED)
}

// GetPeerPeerConnectionForOnion safely returns a given peer connection
func (m *Manager) GetPeerPeerConnectionForOnion(host string) (ppc *PeerPeerConnection) {
m.lock.Lock()

+ 1
- 7
protocol/connections/engine.go View File

@@ -239,15 +239,9 @@ func (e *engine) receiveGroupMessage(server string, gm *protocol.GroupMessage) {
e.eventManager.Publish(event.NewEvent(event.EncryptedGroupMessage, map[event.Field]string{event.Ciphertext: string(gm.GetCiphertext()), event.Signature: string(gm.GetSignature())}))
}

// finishedFetch is a callback function the processes the termination of a fetch channel from a given server
func (e *engine) finishedFetch(server string) {
log.Debugf("Finished Fetch for %v\n", server)
e.eventManager.Publish(event.NewEvent(event.FinishedFetch, map[event.Field]string{event.GroupServer: server}))
}

// joinServer manages a new server connection with the given onion address
func (e *engine) joinServer(onion string) {
e.connectionsManager.ManageServerConnection(onion, e, e.receiveGroupMessage, e.finishedFetch)
e.connectionsManager.ManageServerConnection(onion, e, e.receiveGroupMessage)
}

// sendMessageToGroup attempts to sent the given message to the given group id.

+ 6
- 7
protocol/connections/peerserverconnection.go View File

@@ -26,7 +26,6 @@ type PeerServerConnection struct {
protocolEngine Engine

GroupMessageHandler func(string, *protocol.GroupMessage)
CloseHandler func(string)
}

// NewPeerServerConnection creates a new Peer->Server outbound connection
@@ -53,10 +52,10 @@ func (psc *PeerServerConnection) setState(state ConnectionState) {
}))
}

// WaitTilAuthenticated waits until the underlying connection is authenticated
func (psc *PeerServerConnection) WaitTilAuthenticated() {
// WaitTilSynced waits until the underlying connection is authenticated
func (psc *PeerServerConnection) WaitTilSynced() {
for {
if psc.GetState() == AUTHENTICATED {
if psc.GetState() == SYNCED {
break
}
time.Sleep(time.Second * 1)
@@ -110,8 +109,8 @@ func (psc *PeerServerConnection) Break() error {

// SendGroupMessage sends the given protocol message to the Server.
func (psc *PeerServerConnection) SendGroupMessage(gm *protocol.GroupMessage) error {
if psc.state != AUTHENTICATED {
return errors.New("peer is not yet connected & authenticated to server cannot send message")
if psc.state != SYNCED {
return errors.New("peer is not yet connected & authenticated & synced to server cannot send message")
}

err := psc.connection.Do(func() error {
@@ -159,5 +158,5 @@ func (psc *PeerServerConnection) HandleGroupMessage(gm *protocol.GroupMessage) {

// HandleFetchDone calls the supplied callback for when a fetch connection is closed
func (psc *PeerServerConnection) HandleFetchDone() {
psc.CloseHandler(psc.Server)
psc.setState(SYNCED)
}

+ 2
- 12
protocol/connections/peerserverconnection_test.go View File

@@ -27,7 +27,7 @@ type TestServer struct {
}

func (ts *TestServer) HandleGroupMessage(gm *protocol.GroupMessage) {
ts.Received<-true
ts.Received <- true
}

func (ts *TestServer) HandleFetchRequest() []*protocol.GroupMessage {
@@ -68,7 +68,6 @@ func TestPeerServerConnection(t *testing.T) {
pub, priv, _ := ed25519.GenerateKey(rand.Reader)

identity := identity.InitializeV3("", &priv, &pub)

t.Logf("Launching Server....\n")
ts := new(TestServer)
ts.Init()
@@ -86,17 +85,13 @@ func TestPeerServerConnection(t *testing.T) {
psc.GroupMessageHandler = func(s string, gm *protocol.GroupMessage) {
numcalls++
}
closedCalls := 0
psc.CloseHandler = func(s string) {
closedCalls++
}
state := psc.GetState()
if state != DISCONNECTED {
t.Errorf("new connections should start in disconnected state")
}
time.Sleep(time.Second * 1)
go psc.Run()
psc.WaitTilAuthenticated()
psc.WaitTilSynced()

gm := &protocol.GroupMessage{Ciphertext: []byte("hello"), Signature: []byte{}}
psc.SendGroupMessage(gm)
@@ -107,9 +102,4 @@ func TestPeerServerConnection(t *testing.T) {
if numcalls != 2 {
t.Errorf("Should have received 2 calls from fetch request, instead received %v", numcalls)
}

if closedCalls != 1 {
t.Errorf("Should have closed connection 1 time, instead of %v", closedCalls)
}

}

+ 4
- 2
protocol/connections/state.go View File

@@ -8,20 +8,22 @@ type ConnectionState int
// CONNECTING - We are in the process of attempting to connect to a given endpoint
// CONNECTED - We have connected but not yet authenticated
// AUTHENTICATED - im.ricochet.auth-hidden-server has succeeded on the connection.
// SYNCED - we have pulled all the messages for groups from the server and are ready to send
const (
DISCONNECTED ConnectionState = iota
CONNECTING
CONNECTED
AUTHENTICATED
SYNCED
FAILED
KILLED
)

var (
// ConnectionStateName allows conversion of states to their string representations
ConnectionStateName = []string{"Disconnected", "Connecting", "Connected", "Authenticated", "Failed", "Killed"}
ConnectionStateName = []string{"Disconnected", "Connecting", "Connected", "Authenticated", "Synced", "Failed", "Killed"}

// ConnectionStateType allows conversion of strings to their state type
ConnectionStateType = map[string]ConnectionState{"Disconnected": DISCONNECTED, "Connecting": CONNECTING,
"Connected": CONNECTED, "Authenticated": AUTHENTICATED, "Failed": FAILED, "Killed": KILLED}
"Connected": CONNECTED, "Authenticated": AUTHENTICATED, "Synced": SYNCED, "Failed": FAILED, "Killed": KILLED}
)

+ 54
- 14
storage/profile_store.go View File

@@ -28,7 +28,9 @@ type profileStore struct {
type ProfileStore interface {
Load() error
Shutdown()
GetProfileCopy() *model.Profile
GetProfileCopy(timeline bool) *model.Profile
GetNewPeerMessage() *event.Event
GetStatusMessages() []*event.Event
}

// NewProfileWriterStore returns a profile store backed by a filestore listening for events and saving them
@@ -52,6 +54,8 @@ func NewProfileWriterStore(eventManager event.Manager, directory, password strin
ps.eventManager.Subscribe(event.AcceptGroupInvite, ps.queue.EventChannel)
ps.eventManager.Subscribe(event.NewGroupInvite, ps.queue.EventChannel)
ps.eventManager.Subscribe(event.NewMessageFromGroup, ps.queue.EventChannel)
ps.eventManager.Subscribe(event.PeerStateChange, ps.queue.EventChannel)
ps.eventManager.Subscribe(event.ServerStateChange, ps.queue.EventChannel)

return ps
}
@@ -67,7 +71,7 @@ func ReadProfile(directory, password string) (*model.Profile, error) {
return nil, err
}

profile := ps.GetProfileCopy()
profile := ps.GetProfileCopy(true)

return profile, nil
}
@@ -78,6 +82,37 @@ func NewProfile(name string) *model.Profile {
return profile
}

// GetNewPeerMessage is for AppService to call on Reload events, to reseed the AppClient with the loaded peers
func (ps *profileStore) GetNewPeerMessage() *event.Event {
message := event.NewEventList(event.NewPeer, event.Identity, ps.profile.LocalID, event.Password, ps.password, event.Status, "running")
return &message
}

func (ps *profileStore) GetStatusMessages() []*event.Event {
messages := []*event.Event{}
for _, contact := range ps.profile.Contacts {
message := event.NewEvent(event.PeerStateChange, map[event.Field]string{
event.RemotePeer: string(contact.Onion),
event.ConnectionState: contact.State,
})
messages = append(messages, &message)
}

doneServers := make(map[string]bool)
for _, group := range ps.profile.Groups {
if _, exists := doneServers[group.GroupServer]; !exists {
message := event.NewEvent(event.ServerStateChange, map[event.Field]string{
event.GroupServer: string(group.GroupServer),
event.ConnectionState: group.State,
})
messages = append(messages, &message)
doneServers[group.GroupServer] = true
}
}

return messages
}

func (ps *profileStore) save() error {
if ps.writer {
bytes, _ := json.Marshal(ps.profile)
@@ -97,15 +132,9 @@ func (ps *profileStore) Load() error {
if err == nil {
ps.profile = cp

for _, profile := range cp.Contacts {
ss := NewStreamStore(ps.directory, profile.LocalID, ps.password)
profile.Timeline.SetMessages(ss.Read())
ps.streamStores[profile.Onion] = ss
}

for _, group := range cp.Groups {
for gid, group := range cp.Groups {
ss := NewStreamStore(ps.directory, group.LocalID, ps.password)
group.Timeline.SetMessages(ss.Read())
cp.Groups[gid].Timeline.SetMessages(ss.Read())
ps.streamStores[group.GroupID] = ss
}
}
@@ -113,8 +142,8 @@ func (ps *profileStore) Load() error {
return err
}

func (ps *profileStore) GetProfileCopy() *model.Profile {
return ps.profile.GetCopy()
func (ps *profileStore) GetProfileCopy(timeline bool) *model.Profile {
return ps.profile.GetCopy(timeline)
}

func (ps *profileStore) eventHandler() {
@@ -131,10 +160,11 @@ func (ps *profileStore) eventHandler() {
var pp *model.PublicProfile
json.Unmarshal([]byte(ev.Data[event.Data]), &pp)
ps.profile.AddContact(ev.Data[event.RemotePeer], pp)
ss := NewStreamStore(ps.directory, pp.LocalID, ps.password)
// TODO: configure - allow peers to be configured to turn on limited storage
/*ss := NewStreamStore(ps.directory, pp.LocalID, ps.password)
pp.Timeline.SetMessages(ss.Read())
ps.streamStores[pp.Onion] = ss
ps.save()
ps.save()*/
case event.GroupCreated:
var group *model.Group
json.Unmarshal([]byte(ev.Data[event.Data]), &group)
@@ -193,6 +223,16 @@ func (ps *profileStore) eventHandler() {
} else {
log.Errorf("error storing new group message: %v stream store does not exist", ev)
}
case event.PeerStateChange:
if _, exists := ps.profile.Contacts[ev.Data[event.RemotePeer]]; exists {
ps.profile.Contacts[ev.Data[event.RemotePeer]].State = ev.Data[event.ConnectionState]
}
case event.ServerStateChange:
for _, group := range ps.profile.Groups {
if group.GroupServer == ev.Data[event.GroupServer] {
group.State = ev.Data[event.ConnectionState]
}
}
default:
return
}

+ 4
- 4
storage/profile_store_test.go View File

@@ -35,14 +35,14 @@ func TestProfileStoreWriteRead(t *testing.T) {
packet := protocol.CwtchPeerPacket{}
proto.Unmarshal(invite, &packet)
invite, _ = proto.Marshal(packet.GetGroupChatInvite())
eventBus.Publish(event.NewEvent(event.NewGroupInvite, map[event.Field]string{event.TimestampReceived: time.Now().Format(time.RFC3339Nano), event.RemotePeer: ps1.GetProfileCopy().Onion, event.GroupInvite: string(invite)}))
eventBus.Publish(event.NewEvent(event.NewGroupInvite, map[event.Field]string{event.TimestampReceived: time.Now().Format(time.RFC3339Nano), event.RemotePeer: ps1.GetProfileCopy(true).Onion, event.GroupInvite: string(invite)}))
time.Sleep(1 * time.Second)

eventBus.Publish(event.NewEvent(event.NewMessageFromGroup, map[event.Field]string{
event.GroupID: groupid,
event.TimestampSent: time.Now().Format(time.RFC3339Nano),
event.TimestampReceived: time.Now().Format(time.RFC3339Nano),
event.RemotePeer: ps1.GetProfileCopy().Onion,
event.RemotePeer: ps1.GetProfileCopy(true).Onion,
event.Data: testMessage,
}))
time.Sleep(1 * time.Second)
@@ -55,7 +55,7 @@ func TestProfileStoreWriteRead(t *testing.T) {
t.Errorf("Error createing profileStore: %v\n", err)
}

profile = ps2.GetProfileCopy()
profile = ps2.GetProfileCopy(true)
if profile.Name != testProfileName {
t.Errorf("Profile name from loaded profile incorrect. Expected: '%v' Actual: '%v'\n", testProfileName, profile.Name)
}
@@ -65,7 +65,7 @@ func TestProfileStoreWriteRead(t *testing.T) {
t.Errorf("Profile attribute '%v' inccorect. Expected: '%v' Actual: '%v'\n", testKey, testVal, v)
}

group2 := ps2.GetProfileCopy().Groups[groupid]
group2 := ps2.GetProfileCopy(true).Groups[groupid]
if group2 == nil {
t.Errorf("Group not loaded\n")
}

+ 4
- 7
storage/stream_store.go View File

@@ -23,6 +23,7 @@ type streamStore struct {
storeDirectory string
filenameBase string

// Buffer is used just for current file to write to
messages []model.Message
bufferByteCount int

@@ -100,7 +101,7 @@ func (ss *streamStore) rotateFileStore() {
}
}

// FetchMessages returns all messages from the backing file.
// Read returns all messages from the backing file (not the buffer, which is jsut for writing to the current file)
func (ss *streamStore) Read() (messages []model.Message) {
ss.lock.Lock()
defer ss.lock.Unlock()
@@ -116,12 +117,8 @@ func (ss *streamStore) Read() (messages []model.Message) {
}

msgs := []model.Message{}
err = json.Unmarshal([]byte(bytes), &msgs)
if err == nil {
resp = append(resp, msgs...)
} else {
log.Debugf("Failed to unmarshal messages: %v", err)
}
json.Unmarshal([]byte(bytes), &msgs)
resp = append(resp, msgs...)
}

return resp

+ 18
- 8
testing/cwtch_peer_server_integration_test.go View File

@@ -66,7 +66,7 @@ func waitForPeerGroupConnection(t *testing.T, peer peer.CwtchPeer, groupID strin
if state == connections.FAILED {
t.Fatalf("%v could not connect to %v", peer.GetProfile().Onion, groupID)
}
if state != connections.AUTHENTICATED {
if state != connections.SYNCED {
fmt.Printf("peer %v waiting connect to group %v, currently: %v\n", peer.GetProfile().Onion, groupID, connections.ConnectionStateName[state])
time.Sleep(time.Second * 5)
continue
@@ -110,6 +110,7 @@ func TestCwtchPeerIntegration(t *testing.T) {
log.ExcludeFromPattern("connection/connection")
log.ExcludeFromPattern("outbound/3dhauthchannel")
log.ExcludeFromPattern("event/eventmanager")
log.ExcludeFromPattern("pipeBridge")
acn, err := connectivity.StartTor(".", "")
if err != nil {
t.Fatalf("Could not start Tor: %v", err)
@@ -142,9 +143,10 @@ func TestCwtchPeerIntegration(t *testing.T) {

app := app2.NewApp(acn, "./storage")

bridge1, bridge2 := bridge.MakeGoChanBridge()
appClient := app2.NewAppClient("./storage", bridge1)
appService := app2.NewAppService(acn, "./storage", bridge2)
bridgeClient := bridge.NewPipeBridgeClient("./clientPipe", "./servicePipe")
bridgeService := bridge.NewPipeBridgeService("./servicePipe", "./clientPipe")
appClient := app2.NewAppClient("./storage", bridgeClient)
appService := app2.NewAppService(acn, "./storage", bridgeService)

numGoRoutinesPostAppStart := runtime.NumGoroutine()

@@ -168,8 +170,6 @@ func TestCwtchPeerIntegration(t *testing.T) {
carol := utils.WaitGetPeer(appClient, "carol")
fmt.Println("Carol created:", carol.GetProfile().Onion)

//fmt.Println("Carol created:", carol.GetProfile().Onion)

app.LaunchPeers()
appClient.LaunchPeers()

@@ -384,12 +384,22 @@ func TestCwtchPeerIntegration(t *testing.T) {
numGoRoutinesPostCarol := runtime.NumGoroutine()

fmt.Println("Shutting down apps...")
fmt.Printf("app Shutdown: %v\n", runtime.NumGoroutine())
app.Shutdown()
fmt.Printf("appClientShutdown: %v\n", runtime.NumGoroutine())
appClient.Shutdown()
fmt.Printf("appServiceShutdown: %v\n", runtime.NumGoroutine())
appService.Shutdown()

bridge1.Shutdown()
bridge2.Shutdown()
fmt.Printf("bridgeClientShutdown: %v\n", runtime.NumGoroutine())
bridgeClient.Shutdown()
time.Sleep(2 * time.Second)

fmt.Printf("brideServiceShutdown: %v\n", runtime.NumGoroutine())
bridgeService.Shutdown()
time.Sleep(2 * time.Second)

fmt.Printf("Done shutdown: %v\n", runtime.NumGoroutine())
numGoRoutinesPostAppShutdown := runtime.NumGoroutine()

fmt.Println("Shutting down ACN...")

Loading…
Cancel
Save