Refactor: engine and peer decoupled, engine and eventbus now per peer

and stored top level in app. Storage has read only mode. Peer and group
state now event based and stored in profiles.
This commit is contained in:
Dan Ballard 2019-05-15 13:12:11 -07:00
parent 8be10930e7
commit 0c4bbe9ad1
17 changed files with 264 additions and 197 deletions

View File

@ -3,8 +3,10 @@ package app
import (
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/peer"
"cwtch.im/cwtch/protocol/connections"
"cwtch.im/cwtch/storage"
"fmt"
"git.openprivacy.ca/openprivacy/libricochet-go/identity"
"git.openprivacy.ca/openprivacy/libricochet-go/connectivity"
"git.openprivacy.ca/openprivacy/libricochet-go/log"
@ -16,12 +18,13 @@ import (
type application struct {
peers map[string]peer.CwtchPeer
storage map[string]storage.ProfileStore
engines map[string]connections.Engine
eventBuses map[string]*event.Manager
acn connectivity.ACN
directory string
mutex sync.Mutex
primaryonion string
storage map[string]storage.ProfileStore
eventBus *event.Manager
}
// Application is a full cwtch peer application. It allows management, usage and storage of multiple peers
@ -32,20 +35,18 @@ type Application interface {
PrimaryIdentity() peer.CwtchPeer
GetPeer(onion string) peer.CwtchPeer
ListPeers() map[string]string
GetEventBus(onion string) *event.Manager
LaunchPeers()
EventBus() *event.Manager
ShutdownPeer(string)
Shutdown()
}
// NewApp creates a new app with some environment awareness and initializes a Tor Manager
func NewApp(acn connectivity.ACN, appDirectory string) Application {
log.Debugf("NewApp(%v)\n", appDirectory)
app := &application{peers: make(map[string]peer.CwtchPeer), storage: make(map[string]storage.ProfileStore), directory: appDirectory, acn: acn}
app := &application{peers: make(map[string]peer.CwtchPeer), storage: make(map[string]storage.ProfileStore), engines: make(map[string]connections.Engine), eventBuses: make(map[string]*event.Manager), directory: appDirectory, acn: acn}
os.Mkdir(path.Join(app.directory, "profiles"), 0700)
app.eventBus = new(event.Manager)
app.eventBus.Initialize()
return app
}
@ -53,19 +54,31 @@ func NewApp(acn connectivity.ACN, appDirectory string) Application {
func (app *application) CreatePeer(name string, password string) (peer.CwtchPeer, error) {
log.Debugf("CreatePeer(%v)\n", name)
eventBus := new(event.Manager)
eventBus.Initialize()
profile := storage.NewProfile(name)
profileStore := storage.NewProfileStore(app.eventBus, path.Join(app.directory, "profiles", profile.LocalID), password, profile)
profileStore := storage.NewProfileWriterStore(eventBus, path.Join(app.directory, "profiles", profile.LocalID), password, profile)
pc := profileStore.GetProfileCopy()
p := peer.FromProfile(pc)
p.Init(app.acn, app.eventBus)
_, exists := app.peers[p.GetProfile().Onion]
if exists {
p.Shutdown()
profileStore.Shutdown()
eventBus.Shutdown()
return nil, fmt.Errorf("Error: profile for onion %v already exists", p.GetProfile().Onion)
}
p.Init(app.acn, eventBus)
blockedPeers := profile.BlockedPeers()
// TODO: Would be nice if ProtocolEngine did not need to explicitly be given the Private Key.
identity := identity.InitializeV3(profile.Name, &profile.Ed25519PrivateKey, &profile.Ed25519PublicKey)
engine := connections.NewProtocolEngine(identity, profile.Ed25519PrivateKey, app.acn, eventBus, blockedPeers)
app.mutex.Lock()
app.peers[p.GetProfile().Onion] = p
app.storage[p.GetProfile().Onion] = profileStore
app.engines[p.GetProfile().Onion] = engine
app.eventBuses[p.GetProfile().Onion] = eventBus
app.mutex.Unlock()
return p, nil
@ -79,8 +92,10 @@ func (app *application) LoadProfiles(password string) error {
for _, file := range files {
// TODO: Per profile eventBus
profileStore := storage.NewProfileStore(app.eventBus, path.Join(app.directory, "profiles", file.Name()), password, nil)
eventBus := new(event.Manager)
eventBus.Initialize()
profileStore := storage.NewProfileWriterStore(eventBus, path.Join(app.directory, "profiles", file.Name()), password, nil)
err = profileStore.Load()
if err != nil {
continue
@ -91,16 +106,23 @@ func (app *application) LoadProfiles(password string) error {
_, exists := app.peers[profile.Onion]
if exists {
profileStore.Shutdown()
eventBus.Shutdown()
log.Errorf("profile for onion %v already exists", profile.Onion)
continue
}
peer := peer.FromProfile(profile)
peer.Init(app.acn, app.eventBus)
peer.Init(app.acn, eventBus)
blockedPeers := profile.BlockedPeers()
identity := identity.InitializeV3(profile.Name, &profile.Ed25519PrivateKey, &profile.Ed25519PublicKey)
engine := connections.NewProtocolEngine(identity, profile.Ed25519PrivateKey, app.acn, eventBus, blockedPeers)
app.mutex.Lock()
app.peers[profile.Onion] = peer
app.storage[profile.Onion] = profileStore
app.engines[profile.Onion] = engine
app.eventBuses[profile.Onion] = eventBus
if app.primaryonion == "" {
app.primaryonion = profile.Onion
}
@ -109,10 +131,13 @@ func (app *application) LoadProfiles(password string) error {
return nil
}
// LaunchPeers starts each peer Listening and connecting to peers and groups
func (app *application) LaunchPeers() {
for _, p := range app.peers {
if !p.IsStarted() {
p.Listen()
p.StartPeersConnections()
p.StartGroupConnections()
}
}
}
@ -139,20 +164,34 @@ func (app *application) GetPeer(onion string) peer.CwtchPeer {
return nil
}
/*
// GetTorStatus returns tor control port bootstrap-phase status info in a map
func (app *application) GetTorStatus() (map[string]string, error) {
return app.torManager.GetStatus()
}*/
// GetEventBus returns a cwtchPeer's event bus
func (app *application) GetEventBus(onion string) *event.Manager {
if manager, ok := app.eventBuses[onion]; ok {
return manager
}
return nil
}
// Fetch the app's event manager
func (app *application) EventBus() *event.Manager {
return app.eventBus
// ShutdownPeer shuts down a peer and removes it from the app's management
func (app *application) ShutdownPeer(onion string) {
app.mutex.Lock()
defer app.mutex.Unlock()
app.eventBuses[onion].Shutdown()
delete(app.eventBuses, onion)
app.peers[onion].Shutdown()
delete(app.peers, onion)
app.engines[onion].Shutdown()
delete(app.engines, onion)
app.storage[onion].Shutdown()
delete(app.storage, onion)
}
// Shutdown shutsdown all peers of an app and then the tormanager
func (app *application) Shutdown() {
for _, peer := range app.peers {
for id, peer := range app.peers {
peer.Shutdown()
app.engines[id].Shutdown()
app.storage[id].Shutdown()
app.eventBuses[id].Shutdown()
}
}

View File

@ -11,16 +11,16 @@ import (
"time"
)
func waitForPeerServerConnection(peer peer.CwtchPeer, server string) error {
func waitForPeerGroupConnection(peer peer.CwtchPeer, groupID string) error {
for {
servers := peer.GetServers()
state, ok := servers[server]
_, ok := peer.GetProfile().Groups[groupID]
if ok {
state := peer.GetGroupState(groupID)
if state == connections.FAILED {
return errors.New("Connection to server " + server + " failed!")
return errors.New("Connection to group " + groupID + " failed!")
}
if state != connections.AUTHENTICATED {
fmt.Printf("peer %v waiting to authenticate with server %v, current state: %v\n", peer.GetProfile().Onion, server, connections.ConnectionStateName[state])
fmt.Printf("peer %v waiting to authenticate with group %v 's server, current state: %v\n", peer.GetProfile().Onion, groupID, connections.ConnectionStateName[state])
time.Sleep(time.Second * 10)
continue
}
@ -57,7 +57,7 @@ func main() {
os.Exit(1)
}
err = waitForPeerServerConnection(botPeer, serverAddr)
err = waitForPeerGroupConnection(botPeer, groupID)
if err != nil {
fmt.Printf("Could not connect to server %v: %v\n", serverAddr, err)
os.Exit(1)

View File

@ -6,7 +6,6 @@ import (
"bytes"
"cwtch.im/cwtch/model"
"cwtch.im/cwtch/protocol/connections"
"fmt"
"git.openprivacy.ca/openprivacy/libricochet-go/connectivity"
"git.openprivacy.ca/openprivacy/libricochet-go/log"
@ -45,8 +44,8 @@ var suggestionsSelectedProfile = []prompt.Suggest{
{Text: "/invite", Description: "invite a new contact"},
{Text: "/invite-to-group", Description: "invite an existing contact to join an existing group"},
{Text: "/accept-invite", Description: "accept the invite of a group"},
{Text: "/list-servers", Description: "retrieve a list of servers and their connection status"},
{Text: "/list-peers", Description: "retrieve a list of peers and their connection status"},
/*{Text: "/list-servers", Description: "retrieve a list of servers and their connection status"},
{Text: "/list-peers", Description: "retrieve a list of peers and their connection status"},*/
{Text: "/export-group", Description: "export a group invite: prints as a string"},
{Text: "/trust", Description: "trust a peer"},
{Text: "/block", Description: "block a peer - you will no longer see messages or connect to this peer"},
@ -55,13 +54,13 @@ var suggestionsSelectedProfile = []prompt.Suggest{
var suggestions = suggestionsBase
var usages = map[string]string{
"/new-profile": "/new-profile [name]",
"/load-profiles": "/load-profiles",
"/list-profiles": "",
"/select-profile": "/select-profile [onion]",
"/quit": "",
"/list-servers": "",
"/list-peers": "",
"/new-profile": "/new-profile [name]",
"/load-profiles": "/load-profiles",
"/list-profiles": "",
"/select-profile": "/select-profile [onion]",
"/quit": "",
/* "/list-servers": "",
"/list-peers": "",*/
"/list-contacts": "",
"/list-groups": "",
"/select-group": "/select-group [groupid]",
@ -424,7 +423,7 @@ func main() {
} else {
fmt.Printf("Error inviting peer, usage: %s\n", usages[commands[0]])
}
case "/list-peers":
/*case "/list-peers":
peers := peer.GetPeers()
for p, s := range peers {
fmt.Printf("Name: %v Status: %v\n", p, connections.ConnectionStateName[s])
@ -433,7 +432,7 @@ func main() {
servers := peer.GetServers()
for s, st := range servers {
fmt.Printf("Name: %v Status: %v\n", s, connections.ConnectionStateName[st])
}
}*/
case "/list-contacts":
contacts := peer.GetContacts()
for _, onion := range contacts {

View File

@ -84,6 +84,14 @@ const (
// Key [eg "nick"]
// Data [eg "open privacy board"]
SetGroupAttribute = Type("SetGroupAttribute")
// RemotePeer
// ConnectionState
PeerStateChange = Type("PeerStateChange")
// GroupServer
// ConnectionState
ServerStateChange = Type("GroupStateChange")
)
// Field defines common event attributes
@ -106,6 +114,8 @@ const (
ProfileName = Field("ProfileName")
ConnectionState = Field("ConnectionState")
Key = Field("Key")
Data = Field("Data")

View File

@ -24,6 +24,7 @@ type Manager struct {
events chan Event
mapMutex sync.Mutex
internal chan bool
closed bool
}
// Initialize sets up the Manager.
@ -31,6 +32,7 @@ func (em *Manager) Initialize() {
em.subscribers = make(map[Type][]chan Event)
em.events = make(chan Event)
em.internal = make(chan bool)
em.closed = false
go em.eventBus()
}
@ -44,7 +46,7 @@ func (em *Manager) Subscribe(eventType Type, eventChannel chan Event) {
// Publish takes an Event and sends it to the internal eventBus where it is distributed to all Subscribers
func (em *Manager) Publish(event Event) {
if event.EventType != "" {
if event.EventType != "" && em.closed != true {
em.events <- event
}
}
@ -83,6 +85,7 @@ func (em *Manager) eventBus() {
// Shutdown triggers, and waits for, the internal eventBus goroutine to finish
func (em *Manager) Shutdown() {
em.events <- Event{}
em.closed = true
// wait for eventBus to finish
<-em.internal
close(em.events)

View File

@ -29,6 +29,7 @@ type Group struct {
Attributes map[string]string
lock sync.Mutex
LocalID string
State string `json:"-"`
unacknowledgedMessages []Message
}

View File

@ -27,6 +27,7 @@ type PublicProfile struct {
Attributes map[string]string
Timeline Timeline
LocalID string // used by storage engine
State string `json:"-"`
lock sync.Mutex
}
@ -186,6 +187,18 @@ func (p *Profile) BlockPeer(onion string) (err error) {
return
}
// BlockedPeers calculates a list of Peers who have been Blocked.
func (p *Profile) BlockedPeers() []string {
blockedPeers := []string{}
for _, contact := range p.GetContacts() {
c, _ := p.GetContact(contact)
if c.Blocked {
blockedPeers = append(blockedPeers, c.Onion)
}
}
return blockedPeers
}
// TrustPeer sets a contact to trusted
func (p *Profile) TrustPeer(onion string) (err error) {
p.lock.Lock()

View File

@ -5,11 +5,11 @@ import (
"cwtch.im/cwtch/model"
"cwtch.im/cwtch/protocol"
"cwtch.im/cwtch/protocol/connections"
"encoding/base32"
"encoding/base64"
"encoding/json"
"errors"
"git.openprivacy.ca/openprivacy/libricochet-go/connectivity"
"git.openprivacy.ca/openprivacy/libricochet-go/identity"
"git.openprivacy.ca/openprivacy/libricochet-go/log"
"github.com/golang/protobuf/proto"
"strings"
@ -24,7 +24,6 @@ type cwtchPeer struct {
shutdown bool
started bool
engine connections.Engine
queue *event.Queue
eventBus *event.Manager
}
@ -47,9 +46,7 @@ type CwtchPeer interface {
SendMessageToGroupTracked(string, string) (string, error)
GetProfile() *model.Profile
GetPeers() map[string]connections.ConnectionState
GetServers() map[string]connections.ConnectionState
GetPeerState(string) connections.ConnectionState
StartGroup(string) (string, []byte, error)
@ -57,13 +54,16 @@ type CwtchPeer interface {
ExportGroup(string) (string, error)
GetGroup(string) *model.Group
GetGroupState(string) connections.ConnectionState
GetGroups() []string
AddContact(nick, onion string, publickey []byte, trusted bool)
AddContact(nick, onion string, trusted bool)
GetContacts() []string
GetContact(string) *model.PublicProfile
IsStarted() bool
Listen()
StartPeersConnections()
StartGroupConnections()
Shutdown()
}
@ -90,19 +90,8 @@ func (cp *cwtchPeer) Init(acn connectivity.ACN, eventBus *event.Manager) {
cp.eventBus = eventBus
cp.eventBus.Subscribe(event.EncryptedGroupMessage, cp.queue.EventChannel)
cp.eventBus.Subscribe(event.NewGroupInvite, cp.queue.EventChannel)
// Calculate a list of Peers who have been Blocked.
blockedPeers := []string{}
for _, contact := range cp.Profile.GetContacts() {
c, _ := cp.Profile.GetContact(contact)
if c.Blocked {
blockedPeers = append(blockedPeers, c.Onion)
}
}
// TODO: Would be nice if ProtocolEngine did not need to explicitly be given the Private Key.
identity := identity.InitializeV3(cp.Profile.Name, &cp.Profile.Ed25519PrivateKey, &cp.Profile.Ed25519PublicKey)
cp.engine = connections.NewProtocolEngine(identity, cp.Profile.Ed25519PrivateKey, acn, eventBus, blockedPeers)
cp.eventBus.Subscribe(event.ServerStateChange, cp.queue.EventChannel)
cp.eventBus.Subscribe(event.PeerStateChange, cp.queue.EventChannel)
}
// ImportGroup intializes a group from an imported source rather than a peer invite
@ -175,8 +164,9 @@ func (cp *cwtchPeer) GetGroup(groupID string) *model.Group {
return cp.Profile.GetGroupByGroupID(groupID)
}
func (cp *cwtchPeer) AddContact(nick, onion string, publickey []byte, trusted bool) {
pp := &model.PublicProfile{Name: nick, Ed25519PublicKey: publickey, Trusted: trusted, Blocked: false, Onion: onion, Attributes: map[string]string{"nick": nick}}
func (cp *cwtchPeer) AddContact(nick, onion string, trusted bool) {
decodedPub, _ := base32.StdEncoding.DecodeString(strings.ToUpper(onion))
pp := &model.PublicProfile{Name: nick, Ed25519PublicKey: decodedPub, Trusted: trusted, Blocked: false, Onion: onion, Attributes: map[string]string{"nick": nick}}
cp.Profile.AddContact(onion, pp)
pd, _ := json.Marshal(pp)
cp.eventBus.Publish(event.NewEvent(event.PeerCreated, map[event.Field]string{
@ -201,6 +191,14 @@ func (cp *cwtchPeer) GetProfile() *model.Profile {
return cp.Profile
}
func (cp *cwtchPeer) GetPeerState(onion string) connections.ConnectionState {
return connections.ConnectionStateType[cp.Profile.Contacts[onion].State]
}
func (cp *cwtchPeer) GetGroupState(groupid string) connections.ConnectionState {
return connections.ConnectionStateType[cp.Profile.Groups[groupid].State]
}
// PeerWithOnion is the entry point for cwtchPeer relationships
func (cp *cwtchPeer) PeerWithOnion(onion string) *connections.PeerPeerConnection {
cp.eventBus.Publish(event.NewEvent(event.PeerRequest, map[event.Field]string{event.RemotePeer: onion}))
@ -255,16 +253,6 @@ func (cp *cwtchPeer) SendMessageToPeer(onion string, message string) string {
return event.EventID
}
// GetPeers returns a list of peer connections.
func (cp *cwtchPeer) GetPeers() map[string]connections.ConnectionState {
return cp.engine.GetPeers()
}
// GetServers returns a list of server connections
func (cp *cwtchPeer) GetServers() map[string]connections.ConnectionState {
return cp.engine.GetServers()
}
// TrustPeer sets an existing peer relationship to trusted
func (cp *cwtchPeer) TrustPeer(peer string) error {
err := cp.Profile.TrustPeer(peer)
@ -298,14 +286,34 @@ func (cp *cwtchPeer) RejectInvite(groupID string) {
cp.Profile.RejectInvite(groupID)
}
// Listen makes the peer open a listening port to accept incoming connections (and be detactably online)
func (cp *cwtchPeer) Listen() {
cp.eventBus.Publish(event.NewEvent(event.ProtocolEngineStartListen, map[event.Field]string{}))
}
// StartGroupConnections attempts to connect to all group servers (thus initiating reconnect attempts in the conectionsmanager)
func (cp *cwtchPeer) StartPeersConnections() {
for _, contact := range cp.GetContacts() {
cp.PeerWithOnion(contact)
}
}
// StartPeerConnections attempts to connect to all peers (thus initiating reconnect attempts in the conectionsmanager)
func (cp *cwtchPeer) StartGroupConnections() {
joinedServers := map[string]bool{}
for _, groupID := range cp.GetGroups() {
// Only send a join server packet if we haven't joined this server yet...
group := cp.GetGroup(groupID)
if joined := joinedServers[groupID]; group.Accepted && !joined {
cp.JoinServer(group.GroupServer)
joinedServers[group.GroupServer] = true
}
}
}
// Shutdown kills all connections and cleans up all goroutines for the peer
func (cp *cwtchPeer) Shutdown() {
cp.shutdown = true
cp.engine.Shutdown()
cp.queue.Shutdown()
}
@ -328,6 +336,16 @@ func (cp *cwtchPeer) eventHandler() {
var groupInvite protocol.GroupChatInvite
proto.Unmarshal([]byte(ev.Data[event.GroupInvite]), &groupInvite)
cp.Profile.ProcessInvite(&groupInvite, ev.Data[event.RemotePeer])
case event.PeerStateChange:
if _, exists := cp.Profile.Contacts[ev.Data[event.RemotePeer]]; exists {
cp.Profile.Contacts[ev.Data[event.RemotePeer]].State = ev.Data[event.ConnectionState]
}
case event.ServerStateChange:
for _, group := range cp.Profile.Groups {
if group.GroupServer == ev.Data[event.GroupServer] {
group.State = ev.Data[event.ConnectionState]
}
}
default:
if ev.EventType != "" {
log.Errorf("peer event handler received an event it was not subscribed for: %v", ev.EventType)

View File

@ -42,12 +42,12 @@ func (m *Manager) ManagePeerConnection(host string, engine Engine) *PeerPeerConn
}
// ManageServerConnection creates a new ServerConnection for Host with the given callback handler.
func (m *Manager) ManageServerConnection(host string, messageHandler func(string, *protocol.GroupMessage), closedHandler func(string)) {
func (m *Manager) ManageServerConnection(host string, engine Engine, messageHandler func(string, *protocol.GroupMessage), closedHandler func(string)) {
m.lock.Lock()
psc, exists := m.serverConnections[host]
newPsc := NewPeerServerConnection(m.acn, host)
newPsc := NewPeerServerConnection(engine, host)
newPsc.GroupMessageHandler = messageHandler
newPsc.CloseHandler = closedHandler
go newPsc.Run()
@ -60,28 +60,6 @@ func (m *Manager) ManageServerConnection(host string, messageHandler func(string
m.lock.Unlock()
}
// GetPeers returns a map of all peer connections with their state
func (m *Manager) GetPeers() map[string]ConnectionState {
rm := make(map[string]ConnectionState)
m.lock.Lock()
for onion, ppc := range m.peerConnections {
rm[onion] = ppc.GetState()
}
m.lock.Unlock()
return rm
}
// GetServers returns a map of all server connections with their state.
func (m *Manager) GetServers() map[string]ConnectionState {
rm := make(map[string]ConnectionState)
m.lock.Lock()
for onion, psc := range m.serverConnections {
rm[onion] = psc.GetState()
}
m.lock.Unlock()
return rm
}
// GetPeerPeerConnectionForOnion safely returns a given peer connection
func (m *Manager) GetPeerPeerConnectionForOnion(host string) (ppc *PeerPeerConnection) {
m.lock.Lock()
@ -103,7 +81,6 @@ func (m *Manager) AttemptReconnections() {
maxTimeout := time.Minute * 5
// nearly instant first run, next few runs will prolly be too quick to have any FAILED and will gracefully slow to MAX after that
timeout := time.Millisecond * 500
for {
select {
case <-time.After(timeout):

View File

@ -45,6 +45,7 @@ type engine struct {
type Engine interface {
Identity() identity.Identity
ACN() connectivity.ACN
EventManager() *event.Manager
GetPeerHandler(string) *CwtchPeerHandler
ContactRequest(string, string) string
@ -52,9 +53,6 @@ type Engine interface {
LookupContact(string, rsa.PublicKey) (bool, bool)
LookupContactV3(string, ed25519.PublicKey) (bool, bool)
GetPeers() map[string]ConnectionState
GetServers() map[string]ConnectionState
Shutdown()
}
@ -94,6 +92,10 @@ func (e *engine) Identity() identity.Identity {
return e.identity
}
func (e *engine) EventManager() *event.Manager {
return e.eventManager
}
// eventHandler process events from other subsystems
func (e *engine) eventHandler() {
for {
@ -244,7 +246,7 @@ func (e *engine) finishedFetch(server string) {
// joinServer manages a new server connection with the given onion address
func (e *engine) joinServer(onion string) {
e.connectionsManager.ManageServerConnection(onion, e.receiveGroupMessage, e.finishedFetch)
e.connectionsManager.ManageServerConnection(onion, e, e.receiveGroupMessage, e.finishedFetch)
}
// sendMessageToGroup attempts to sent the given message to the given group id.
@ -264,16 +266,6 @@ func (e *engine) sendMessageToGroup(server string, ct []byte, sig []byte) {
}
}
// GetPeers returns a list of peer connections.
func (e *engine) GetPeers() map[string]ConnectionState {
return e.connectionsManager.GetPeers()
}
// GetServers returns a list of server connections
func (e *engine) GetServers() map[string]ConnectionState {
return e.connectionsManager.GetServers()
}
// CwtchPeerInstance encapsulates incoming peer connections
type CwtchPeerInstance struct {
rai *application.Instance

View File

@ -1,6 +1,7 @@
package connections
import (
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/protocol/connections/peer"
"errors"
"git.openprivacy.ca/openprivacy/libricochet-go"
@ -28,6 +29,14 @@ func NewPeerPeerConnection(peerhostname string, protocolEngine Engine) *PeerPeer
return ppc
}
func (ppc *PeerPeerConnection) setState(state ConnectionState) {
ppc.state = state
ppc.protocolEngine.EventManager().Publish(event.NewEvent(event.PeerStateChange, map[event.Field]string{
event.RemotePeer: string(ppc.PeerHostname),
event.ConnectionState: ConnectionStateName[state],
}))
}
// GetState returns the current connection state
func (ppc *PeerPeerConnection) GetState() ConnectionState {
return ppc.state
@ -78,14 +87,14 @@ func (ppc *PeerPeerConnection) WaitTilAuthenticated() {
// Run manages the setup and teardown of a peer->peer connection
func (ppc *PeerPeerConnection) Run() error {
ppc.state = CONNECTING
ppc.setState(CONNECTING)
rc, err := goricochet.Open(ppc.protocolEngine.ACN(), ppc.PeerHostname)
if err == nil {
ppc.connection = rc
ppc.state = CONNECTED
ppc.setState(CONNECTED)
_, err := connection.HandleOutboundConnection(ppc.connection).ProcessAuthAsV3Client(ppc.protocolEngine.Identity())
if err == nil {
ppc.state = AUTHENTICATED
ppc.setState(AUTHENTICATED)
go func() {
ppc.connection.Do(func() error {
ppc.connection.RequestOpenChannel("im.cwtch.peer", &peer.CwtchPeerChannel{Handler: ppc.protocolEngine.GetPeerHandler(ppc.PeerHostname)})
@ -101,13 +110,13 @@ func (ppc *PeerPeerConnection) Run() error {
ppc.connection.Process(ppc)
}
}
ppc.state = FAILED
ppc.setState(FAILED)
return err
}
// Close closes the connection
func (ppc *PeerPeerConnection) Close() {
ppc.state = KILLED
ppc.setState(KILLED)
if ppc.connection != nil {
ppc.connection.Close()
}

View File

@ -2,6 +2,7 @@ package connections
import (
"crypto/rand"
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/protocol"
"cwtch.im/cwtch/protocol/connections/fetch"
"cwtch.im/cwtch/protocol/connections/listen"
@ -10,7 +11,6 @@ import (
"git.openprivacy.ca/openprivacy/libricochet-go"
"git.openprivacy.ca/openprivacy/libricochet-go/channels"
"git.openprivacy.ca/openprivacy/libricochet-go/connection"
"git.openprivacy.ca/openprivacy/libricochet-go/connectivity"
"git.openprivacy.ca/openprivacy/libricochet-go/identity"
"git.openprivacy.ca/openprivacy/libricochet-go/log"
"golang.org/x/crypto/ed25519"
@ -20,19 +20,19 @@ import (
// PeerServerConnection encapsulates a single Peer->Server connection
type PeerServerConnection struct {
connection.AutoConnectionHandler
Server string
state ConnectionState
connection *connection.Connection
acn connectivity.ACN
Server string
state ConnectionState
connection *connection.Connection
protocolEngine Engine
GroupMessageHandler func(string, *protocol.GroupMessage)
CloseHandler func(string)
}
// NewPeerServerConnection creates a new Peer->Server outbound connection
func NewPeerServerConnection(acn connectivity.ACN, serverhostname string) *PeerServerConnection {
func NewPeerServerConnection(engine Engine, serverhostname string) *PeerServerConnection {
psc := new(PeerServerConnection)
psc.acn = acn
psc.protocolEngine = engine
psc.Server = serverhostname
psc.Init()
return psc
@ -43,6 +43,14 @@ func (psc *PeerServerConnection) GetState() ConnectionState {
return psc.state
}
func (psc *PeerServerConnection) setState(state ConnectionState) {
psc.state = state
psc.protocolEngine.EventManager().Publish(event.NewEvent(event.ServerStateChange, map[event.Field]string{
event.GroupServer: string(psc.Server),
event.ConnectionState: ConnectionStateName[state],
}))
}
// WaitTilAuthenticated waits until the underlying connection is authenticated
func (psc *PeerServerConnection) WaitTilAuthenticated() {
for {
@ -56,15 +64,15 @@ func (psc *PeerServerConnection) WaitTilAuthenticated() {
// Run manages the setup and teardown of a peer server connection
func (psc *PeerServerConnection) Run() error {
log.Infof("Connecting to %v", psc.Server)
rc, err := goricochet.Open(psc.acn, psc.Server)
rc, err := goricochet.Open(psc.protocolEngine.ACN(), psc.Server)
if err == nil {
psc.connection = rc
psc.state = CONNECTED
psc.setState(CONNECTED)
pub, priv, err := ed25519.GenerateKey(rand.Reader)
if err == nil {
_, err := connection.HandleOutboundConnection(psc.connection).ProcessAuthAsV3Client(identity.InitializeV3("cwtchpeer", &priv, &pub))
if err == nil {
psc.state = AUTHENTICATED
psc.setState(AUTHENTICATED)
go func() {
psc.connection.Do(func() error {
@ -81,7 +89,7 @@ func (psc *PeerServerConnection) Run() error {
}
}
}
psc.state = FAILED
psc.setState(FAILED)
return err
}
@ -128,7 +136,7 @@ func (psc *PeerServerConnection) SendGroupMessage(gm *protocol.GroupMessage) err
// Close shuts down the connection (freeing the handler goroutines)
func (psc *PeerServerConnection) Close() {
psc.state = KILLED
psc.setState(KILLED)
if psc.connection != nil {
psc.connection.Close()
}

View File

@ -2,6 +2,7 @@ package connections
import (
"crypto/rand"
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/protocol"
"cwtch.im/cwtch/server/fetch"
"cwtch.im/cwtch/server/send"
@ -75,7 +76,11 @@ func TestPeerServerConnection(t *testing.T) {
<-listenChan
onionAddr := identity.Hostname()
psc := NewPeerServerConnection(connectivity.LocalProvider(), "127.0.0.1:5451|"+onionAddr)
manager := &event.Manager{}
manager.Initialize()
engine := NewProtocolEngine(identity, priv, connectivity.LocalProvider(), manager, nil)
psc := NewPeerServerConnection(engine, "127.0.0.1:5451|"+onionAddr)
numcalls := 0
psc.GroupMessageHandler = func(s string, gm *protocol.GroupMessage) {
numcalls++

View File

@ -7,7 +7,7 @@ type ConnectionState int
// DISCONNECTED - No existing connection has been made, or all attempts have failed
// CONNECTING - We are in the process of attempting to connect to a given endpoint
// CONNECTED - We have connected but not yet authenticated
// AUTHENTICATED - im.ricochet.auth-hidden-server has succeeded on thec onnection.
// AUTHENTICATED - im.ricochet.auth-hidden-server has succeeded on the connection.
const (
DISCONNECTED ConnectionState = iota
CONNECTING
@ -20,4 +20,8 @@ const (
var (
// ConnectionStateName allows conversion of states to their string representations
ConnectionStateName = []string{"Disconnected", "Connecting", "Connected", "Authenticated", "Failed", "Killed"}
// ConnectionStateType allows conversion of strings to their state type
ConnectionStateType = map[string]ConnectionState{"Disconnected": DISCONNECTED, "Connecting": CONNECTING,
"Connected": CONNECTED, "Authenticated": AUTHENTICATED, "Failed": FAILED, "Killed": KILLED}
)

View File

@ -21,6 +21,7 @@ type profileStore struct {
profile *model.Profile
eventManager *event.Manager
queue *event.Queue
writer bool
}
// ProfileStore is an interface to managing the storage of Cwtch Profiles
@ -30,11 +31,11 @@ type ProfileStore interface {
GetProfileCopy() *model.Profile
}
// NewProfileStore returns a profile store backed by a filestore listening for events and saving them
// NewProfileWriterStore returns a profile store backed by a filestore listening for events and saving them
// directory should be $appDir/profiles/$rand
func NewProfileStore(eventManager *event.Manager, directory, password string, profile *model.Profile) ProfileStore {
func NewProfileWriterStore(eventManager *event.Manager, directory, password string, profile *model.Profile) ProfileStore {
os.Mkdir(directory, 0700)
ps := &profileStore{fs: NewFileStore(directory, profileFilename, password), password: password, directory: directory, profile: profile, eventManager: eventManager, streamStores: map[string]StreamStore{}}
ps := &profileStore{fs: NewFileStore(directory, profileFilename, password), password: password, directory: directory, profile: profile, eventManager: eventManager, streamStores: map[string]StreamStore{}, writer: true}
ps.queue = event.NewEventQueue(100)
if profile != nil {
ps.save()
@ -55,6 +56,15 @@ func NewProfileStore(eventManager *event.Manager, directory, password string, pr
return ps
}
// NewProfileReaderStore returns a profile store backed by a filestore
// directory should be $appDir/profiles/$rand
func NewProfileReaderStore(directory, password string, profile *model.Profile) ProfileStore {
os.Mkdir(directory, 0700)
ps := &profileStore{fs: NewFileStore(directory, profileFilename, password), password: password, directory: directory, profile: profile, eventManager: nil, streamStores: map[string]StreamStore{}, writer: true}
return ps
}
// NewProfile creates a new profile for use in the profile store.
func NewProfile(name string) *model.Profile {
profile := model.GenerateNewProfile(name)
@ -62,8 +72,11 @@ func NewProfile(name string) *model.Profile {
}
func (ps *profileStore) save() error {
bytes, _ := json.Marshal(ps.profile)
return ps.fs.Save(bytes)
if ps.writer {
bytes, _ := json.Marshal(ps.profile)
return ps.fs.Save(bytes)
}
return nil
}
// Load instantiates a cwtchPeer from the file store
@ -181,5 +194,7 @@ func (ps *profileStore) eventHandler() {
}
func (ps *profileStore) Shutdown() {
ps.queue.Shutdown()
if ps.queue != nil {
ps.queue.Shutdown()
}
}

View File

@ -20,7 +20,7 @@ func TestProfileStoreWriteRead(t *testing.T) {
eventBus := new(event.Manager)
eventBus.Initialize()
profile := NewProfile(testProfileName)
ps1 := NewProfileStore(eventBus, testingDir, password, profile)
ps1 := NewProfileWriterStore(eventBus, testingDir, password, profile)
eventBus.Publish(event.NewEvent(event.SetAttribute, map[event.Field]string{event.Key: testKey, event.Data: testVal}))
time.Sleep(1 * time.Second)
@ -50,7 +50,7 @@ func TestProfileStoreWriteRead(t *testing.T) {
ps1.Shutdown()
ps2 := NewProfileStore(eventBus, testingDir, password, nil)
ps2 := NewProfileWriterStore(eventBus, testingDir, password, nil)
err = ps2.Load()
if err != nil {
t.Errorf("Error createing profileStore: %v\n", err)

View File

@ -1,7 +1,7 @@
package testing
import (
"cwtch.im/cwtch/event"
app2 "cwtch.im/cwtch/app"
"cwtch.im/cwtch/model"
"cwtch.im/cwtch/peer"
"cwtch.im/cwtch/protocol/connections"
@ -55,16 +55,17 @@ func serverCheck(t *testing.T, serverAddr string) bool {
return true
}
func waitForPeerServerConnection(t *testing.T, peer peer.CwtchPeer, server string) {
func waitForPeerGroupConnection(t *testing.T, peer peer.CwtchPeer, groupID string) {
for {
servers := peer.GetServers()
state, ok := servers[server]
_, ok := peer.GetProfile().Groups[groupID]
if ok {
state := peer.GetGroupState(groupID)
//log.Infof("Waiting for Peer %v to join group %v - state: %v\n", peer.GetProfile().Name, groupID, state)
if state == connections.FAILED {
t.Fatalf("%v could not connect to %v", peer.GetProfile().Onion, server)
t.Fatalf("%v could not connect to %v", peer.GetProfile().Onion, groupID)
}
if state != connections.AUTHENTICATED {
fmt.Printf("peer %v waiting connect to server %v, currently: %v\n", peer.GetProfile().Onion, server, connections.ConnectionStateName[state])
fmt.Printf("peer %v waiting connect to group %v, currently: %v\n", peer.GetProfile().Onion, groupID, connections.ConnectionStateName[state])
time.Sleep(time.Second * 5)
continue
} else {
@ -78,9 +79,11 @@ func waitForPeerServerConnection(t *testing.T, peer peer.CwtchPeer, server strin
func waitForPeerPeerConnection(t *testing.T, peera peer.CwtchPeer, peerb peer.CwtchPeer) {
for {
peers := peera.GetPeers()
state, ok := peers[peerb.GetProfile().Onion]
//peers := peera.GetPeers()
_, ok := peera.GetProfile().Contacts[peerb.GetProfile().Onion]
if ok {
state := peera.GetPeerState(peerb.GetProfile().Onion)
//log.Infof("Waiting for Peer %v to peer with peer: %v - state: %v\n", peera.GetProfile().Name, peerb.GetProfile().Name, state)
if state == connections.FAILED {
t.Fatalf("%v could not connect to %v", peera.GetProfile().Onion, peerb.GetProfile().Onion)
}
@ -132,35 +135,33 @@ func TestCwtchPeerIntegration(t *testing.T) {
numGoRoutinesPostServer := runtime.NumGoroutine()
app := app2.NewApp(acn, "./storage")
// ***** cwtchPeer setup *****
// It's important that each Peer have their own EventBus
aliceEventBus := new(event.Manager)
/*aliceEventBus := new(event.Manager)
aliceEventBus.Initialize()
bobEventBus := new(event.Manager)
bobEventBus.Initialize()
carolEventBus := new(event.Manager)
carolEventBus.Initialize()
carolEventBus.Initialize()*/
fmt.Println("Creating Alice...")
alice := peer.NewCwtchPeer("Alice")
alice.Init(acn, aliceEventBus)
alice.Listen()
alice, _ := app.CreatePeer("alice", "asdfasdf")
fmt.Println("Alice created:", alice.GetProfile().Onion)
fmt.Println("Creating Bob...")
bob := peer.NewCwtchPeer("Bob")
bob.Init(acn, bobEventBus)
bob.Listen()
bob, _ := app.CreatePeer("bob", "asdfasdf")
fmt.Println("Bob created:", bob.GetProfile().Onion)
fmt.Println("Creating Carol...")
carol := peer.NewCwtchPeer("Carol")
carol.Init(acn, carolEventBus)
carol.Listen()
carol, _ := app.CreatePeer("Carol", "asdfasdf")
fmt.Println("Carol created:", carol.GetProfile().Onion)
fmt.Println("Waiting for Alice, Bob, and Carol to connection with onion network...")
app.LaunchPeers()
fmt.Println("Waiting for Alice, Bob, and Carol to connect with onion network...")
time.Sleep(time.Second * 90)
numGoRoutinesPostPeerStart := runtime.NumGoroutine()
@ -175,8 +176,10 @@ func TestCwtchPeerIntegration(t *testing.T) {
}
fmt.Println("Alice peering with Bob...")
alice.AddContact("Bob", bob.GetProfile().Onion, false) // Add contact so we can track connection state
alice.PeerWithOnion(bob.GetProfile().Onion)
fmt.Println("Alice peering with Carol...")
alice.AddContact("Carol", carol.GetProfile().Onion, false)
alice.PeerWithOnion(carol.GetProfile().Onion)
fmt.Println("Alice joining server...")
@ -185,10 +188,10 @@ func TestCwtchPeerIntegration(t *testing.T) {
bob.JoinServer(serverAddr)
fmt.Println("Waiting for alice to join server...")
waitForPeerServerConnection(t, alice, serverAddr)
waitForPeerGroupConnection(t, alice, groupID)
fmt.Println("Waiting for bob to join server...")
waitForPeerServerConnection(t, bob, serverAddr)
//fmt.Println("Waiting for bob to join server...")
//waitForPeerGroupConnection(t, bob, groupID)
fmt.Println("Waiting for alice and Bob to peer...")
waitForPeerPeerConnection(t, alice, bob)
@ -217,35 +220,9 @@ func TestCwtchPeerIntegration(t *testing.T) {
numGoRoutinesPostServerConnect := runtime.NumGoroutine()
// ***** Fill up message history of server ******
/*
// filler group will be used to fill up the servers message history a bit to stress test fetch later for carol
fillerGroupId, _, err := alice.Profile.StartGroup(serverAddr)
if err != nil {
t.Errorf("Failed to init filler group: %v", err)
return
}
fmt.Println("Alice filling message history of server...")
for i := 0; i < 100; i++ {
go func (x int) {
time.Sleep(time.Second * time.Duration(x))
err := alice.SendMessageToGroup(fillerGroupId, aliceLines[0])
if err != nil {
fmt.Println("SEND", x, "ERROR:", err)
} else {
fmt.Println("SEND", x, " SUCCESS!")
}
}(i)
}
time.Sleep(time.Second * 110)
*/
// Wait for them to join the server
waitForPeerServerConnection(t, alice, serverAddr)
waitForPeerServerConnection(t, bob, serverAddr)
waitForPeerGroupConnection(t, alice, groupID)
waitForPeerGroupConnection(t, bob, groupID)
//numGouRoutinesPostServerConnect := runtime.NumGoroutine()
// ***** Conversation *****
@ -291,14 +268,13 @@ func TestCwtchPeerIntegration(t *testing.T) {
}
fmt.Println("Shutting down Alice...")
alice.Shutdown()
aliceEventBus.Shutdown()
app.ShutdownPeer(alice.GetProfile().Onion)
time.Sleep(time.Second * 5)
numGoRoutinesPostAlice := runtime.NumGoroutine()
fmt.Println("Carol joining server...")
carol.JoinServer(serverAddr)
waitForPeerServerConnection(t, carol, serverAddr)
waitForPeerGroupConnection(t, carol, groupID)
numGoRotinesPostCarolConnect := runtime.NumGoroutine()
fmt.Println("Bob> ", bobLines[2])
@ -380,8 +356,7 @@ func TestCwtchPeerIntegration(t *testing.T) {
}
fmt.Println("Shutting down Bob...")
bob.Shutdown()
bobEventBus.Shutdown()
app.ShutdownPeer(bob.GetProfile().Onion)
time.Sleep(time.Second * 3)
numGoRoutinesPostBob := runtime.NumGoroutine()
if server != nil {
@ -392,8 +367,7 @@ func TestCwtchPeerIntegration(t *testing.T) {
numGoRoutinesPostServerShutdown := runtime.NumGoroutine()
fmt.Println("Shutting down Carol...")
carol.Shutdown()
carolEventBus.Shutdown()
app.ShutdownPeer(carol.GetProfile().Onion)
time.Sleep(time.Second * 3)
numGoRoutinesPostCarol := runtime.NumGoroutine()