Browse Source

Merge branch 'storageRW' of dan/cwtch into master

pull/247/head
Sarah Jamie Lewis 8 months ago
parent
commit
5429cc6deb

+ 61
- 22
app/app.go View File

@@ -3,8 +3,10 @@ package app
import (
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/peer"
"cwtch.im/cwtch/protocol/connections"
"cwtch.im/cwtch/storage"
"fmt"
"git.openprivacy.ca/openprivacy/libricochet-go/identity"

"git.openprivacy.ca/openprivacy/libricochet-go/connectivity"
"git.openprivacy.ca/openprivacy/libricochet-go/log"
@@ -16,12 +18,13 @@ import (

type application struct {
peers map[string]peer.CwtchPeer
storage map[string]storage.ProfileStore
engines map[string]connections.Engine
eventBuses map[string]*event.Manager
acn connectivity.ACN
directory string
mutex sync.Mutex
primaryonion string
storage map[string]storage.ProfileStore
eventBus *event.Manager
}

// Application is a full cwtch peer application. It allows management, usage and storage of multiple peers
@@ -32,20 +35,18 @@ type Application interface {
PrimaryIdentity() peer.CwtchPeer
GetPeer(onion string) peer.CwtchPeer
ListPeers() map[string]string
GetEventBus(onion string) *event.Manager
LaunchPeers()

EventBus() *event.Manager

ShutdownPeer(string)
Shutdown()
}

// NewApp creates a new app with some environment awareness and initializes a Tor Manager
func NewApp(acn connectivity.ACN, appDirectory string) Application {
log.Debugf("NewApp(%v)\n", appDirectory)
app := &application{peers: make(map[string]peer.CwtchPeer), storage: make(map[string]storage.ProfileStore), directory: appDirectory, acn: acn}
app := &application{peers: make(map[string]peer.CwtchPeer), storage: make(map[string]storage.ProfileStore), engines: make(map[string]connections.Engine), eventBuses: make(map[string]*event.Manager), directory: appDirectory, acn: acn}
os.Mkdir(path.Join(app.directory, "profiles"), 0700)
app.eventBus = new(event.Manager)
app.eventBus.Initialize()
return app
}

@@ -53,19 +54,31 @@ func NewApp(acn connectivity.ACN, appDirectory string) Application {
func (app *application) CreatePeer(name string, password string) (peer.CwtchPeer, error) {
log.Debugf("CreatePeer(%v)\n", name)

eventBus := new(event.Manager)
eventBus.Initialize()

profile := storage.NewProfile(name)
profileStore := storage.NewProfileStore(app.eventBus, path.Join(app.directory, "profiles", profile.LocalID), password, profile)
profileStore := storage.NewProfileWriterStore(eventBus, path.Join(app.directory, "profiles", profile.LocalID), password, profile)
pc := profileStore.GetProfileCopy()
p := peer.FromProfile(pc)
p.Init(app.acn, app.eventBus)
_, exists := app.peers[p.GetProfile().Onion]
if exists {
p.Shutdown()
profileStore.Shutdown()
eventBus.Shutdown()
return nil, fmt.Errorf("Error: profile for onion %v already exists", p.GetProfile().Onion)
}
p.Init(app.acn, eventBus)

blockedPeers := profile.BlockedPeers()
// TODO: Would be nice if ProtocolEngine did not need to explicitly be given the Private Key.
identity := identity.InitializeV3(profile.Name, &profile.Ed25519PrivateKey, &profile.Ed25519PublicKey)
engine := connections.NewProtocolEngine(identity, profile.Ed25519PrivateKey, app.acn, eventBus, blockedPeers)

app.mutex.Lock()
app.peers[p.GetProfile().Onion] = p
app.storage[p.GetProfile().Onion] = profileStore
app.engines[p.GetProfile().Onion] = engine
app.eventBuses[p.GetProfile().Onion] = eventBus
app.mutex.Unlock()

return p, nil
@@ -79,8 +92,10 @@ func (app *application) LoadProfiles(password string) error {

for _, file := range files {

// TODO: Per profile eventBus
profileStore := storage.NewProfileStore(app.eventBus, path.Join(app.directory, "profiles", file.Name()), password, nil)
eventBus := new(event.Manager)
eventBus.Initialize()

profileStore := storage.NewProfileWriterStore(eventBus, path.Join(app.directory, "profiles", file.Name()), password, nil)
err = profileStore.Load()
if err != nil {
continue
@@ -91,16 +106,23 @@ func (app *application) LoadProfiles(password string) error {
_, exists := app.peers[profile.Onion]
if exists {
profileStore.Shutdown()
eventBus.Shutdown()
log.Errorf("profile for onion %v already exists", profile.Onion)
continue
}

peer := peer.FromProfile(profile)
peer.Init(app.acn, app.eventBus)
peer.Init(app.acn, eventBus)

blockedPeers := profile.BlockedPeers()
identity := identity.InitializeV3(profile.Name, &profile.Ed25519PrivateKey, &profile.Ed25519PublicKey)
engine := connections.NewProtocolEngine(identity, profile.Ed25519PrivateKey, app.acn, eventBus, blockedPeers)

app.mutex.Lock()
app.peers[profile.Onion] = peer
app.storage[profile.Onion] = profileStore
app.engines[profile.Onion] = engine
app.eventBuses[profile.Onion] = eventBus
if app.primaryonion == "" {
app.primaryonion = profile.Onion
}
@@ -109,10 +131,13 @@ func (app *application) LoadProfiles(password string) error {
return nil
}

// LaunchPeers starts each peer Listening and connecting to peers and groups
func (app *application) LaunchPeers() {
for _, p := range app.peers {
if !p.IsStarted() {
p.Listen()
p.StartPeersConnections()
p.StartGroupConnections()
}
}
}
@@ -139,20 +164,34 @@ func (app *application) GetPeer(onion string) peer.CwtchPeer {
return nil
}

/*
// GetTorStatus returns tor control port bootstrap-phase status info in a map
func (app *application) GetTorStatus() (map[string]string, error) {
return app.torManager.GetStatus()
}*/
// GetEventBus returns a cwtchPeer's event bus
func (app *application) GetEventBus(onion string) *event.Manager {
if manager, ok := app.eventBuses[onion]; ok {
return manager
}
return nil
}

// Fetch the app's event manager
func (app *application) EventBus() *event.Manager {
return app.eventBus
// ShutdownPeer shuts down a peer and removes it from the app's management
func (app *application) ShutdownPeer(onion string) {
app.mutex.Lock()
defer app.mutex.Unlock()
app.eventBuses[onion].Shutdown()
delete(app.eventBuses, onion)
app.peers[onion].Shutdown()
delete(app.peers, onion)
app.engines[onion].Shutdown()
delete(app.engines, onion)
app.storage[onion].Shutdown()
delete(app.storage, onion)
}

// Shutdown shutsdown all peers of an app and then the tormanager
func (app *application) Shutdown() {
for _, peer := range app.peers {
for id, peer := range app.peers {
peer.Shutdown()
app.engines[id].Shutdown()
app.storage[id].Shutdown()
app.eventBuses[id].Shutdown()
}
}

+ 6
- 6
app/bots/servermon/main.go View File

@@ -11,16 +11,16 @@ import (
"time"
)

func waitForPeerServerConnection(peer peer.CwtchPeer, server string) error {
func waitForPeerGroupConnection(peer peer.CwtchPeer, groupID string) error {
for {
servers := peer.GetServers()
state, ok := servers[server]
_, ok := peer.GetProfile().Groups[groupID]
if ok {
state := peer.GetGroupState(groupID)
if state == connections.FAILED {
return errors.New("Connection to server " + server + " failed!")
return errors.New("Connection to group " + groupID + " failed!")
}
if state != connections.AUTHENTICATED {
fmt.Printf("peer %v waiting to authenticate with server %v, current state: %v\n", peer.GetProfile().Onion, server, connections.ConnectionStateName[state])
fmt.Printf("peer %v waiting to authenticate with group %v 's server, current state: %v\n", peer.GetProfile().Onion, groupID, connections.ConnectionStateName[state])
time.Sleep(time.Second * 10)
continue
}
@@ -57,7 +57,7 @@ func main() {
os.Exit(1)
}

err = waitForPeerServerConnection(botPeer, serverAddr)
err = waitForPeerGroupConnection(botPeer, groupID)
if err != nil {
fmt.Printf("Could not connect to server %v: %v\n", serverAddr, err)
os.Exit(1)

+ 11
- 12
app/cli/main.go View File

@@ -6,7 +6,6 @@ import (

"bytes"
"cwtch.im/cwtch/model"
"cwtch.im/cwtch/protocol/connections"
"fmt"
"git.openprivacy.ca/openprivacy/libricochet-go/connectivity"
"git.openprivacy.ca/openprivacy/libricochet-go/log"
@@ -45,8 +44,8 @@ var suggestionsSelectedProfile = []prompt.Suggest{
{Text: "/invite", Description: "invite a new contact"},
{Text: "/invite-to-group", Description: "invite an existing contact to join an existing group"},
{Text: "/accept-invite", Description: "accept the invite of a group"},
{Text: "/list-servers", Description: "retrieve a list of servers and their connection status"},
{Text: "/list-peers", Description: "retrieve a list of peers and their connection status"},
/*{Text: "/list-servers", Description: "retrieve a list of servers and their connection status"},
{Text: "/list-peers", Description: "retrieve a list of peers and their connection status"},*/
{Text: "/export-group", Description: "export a group invite: prints as a string"},
{Text: "/trust", Description: "trust a peer"},
{Text: "/block", Description: "block a peer - you will no longer see messages or connect to this peer"},
@@ -55,13 +54,13 @@ var suggestionsSelectedProfile = []prompt.Suggest{
var suggestions = suggestionsBase

var usages = map[string]string{
"/new-profile": "/new-profile [name]",
"/load-profiles": "/load-profiles",
"/list-profiles": "",
"/select-profile": "/select-profile [onion]",
"/quit": "",
"/list-servers": "",
"/list-peers": "",
"/new-profile": "/new-profile [name]",
"/load-profiles": "/load-profiles",
"/list-profiles": "",
"/select-profile": "/select-profile [onion]",
"/quit": "",
/* "/list-servers": "",
"/list-peers": "",*/
"/list-contacts": "",
"/list-groups": "",
"/select-group": "/select-group [groupid]",
@@ -424,7 +423,7 @@ func main() {
} else {
fmt.Printf("Error inviting peer, usage: %s\n", usages[commands[0]])
}
case "/list-peers":
/*case "/list-peers":
peers := peer.GetPeers()
for p, s := range peers {
fmt.Printf("Name: %v Status: %v\n", p, connections.ConnectionStateName[s])
@@ -433,7 +432,7 @@ func main() {
servers := peer.GetServers()
for s, st := range servers {
fmt.Printf("Name: %v Status: %v\n", s, connections.ConnectionStateName[st])
}
}*/
case "/list-contacts":
contacts := peer.GetContacts()
for _, onion := range contacts {

+ 10
- 0
event/common.go View File

@@ -84,6 +84,14 @@ const (
// Key [eg "nick"]
// Data [eg "open privacy board"]
SetGroupAttribute = Type("SetGroupAttribute")

// RemotePeer
// ConnectionState
PeerStateChange = Type("PeerStateChange")

// GroupServer
// ConnectionState
ServerStateChange = Type("GroupStateChange")
)

// Field defines common event attributes
@@ -106,6 +114,8 @@ const (

ProfileName = Field("ProfileName")

ConnectionState = Field("ConnectionState")

Key = Field("Key")
Data = Field("Data")


+ 4
- 1
event/eventmanager.go View File

@@ -24,6 +24,7 @@ type Manager struct {
events chan Event
mapMutex sync.Mutex
internal chan bool
closed bool
}

// Initialize sets up the Manager.
@@ -31,6 +32,7 @@ func (em *Manager) Initialize() {
em.subscribers = make(map[Type][]chan Event)
em.events = make(chan Event)
em.internal = make(chan bool)
em.closed = false
go em.eventBus()
}

@@ -44,7 +46,7 @@ func (em *Manager) Subscribe(eventType Type, eventChannel chan Event) {

// Publish takes an Event and sends it to the internal eventBus where it is distributed to all Subscribers
func (em *Manager) Publish(event Event) {
if event.EventType != "" {
if event.EventType != "" && em.closed != true {
em.events <- event
}
}
@@ -83,6 +85,7 @@ func (em *Manager) eventBus() {
// Shutdown triggers, and waits for, the internal eventBus goroutine to finish
func (em *Manager) Shutdown() {
em.events <- Event{}
em.closed = true
// wait for eventBus to finish
<-em.internal
close(em.events)

+ 1
- 0
model/group.go View File

@@ -29,6 +29,7 @@ type Group struct {
Attributes map[string]string
lock sync.Mutex
LocalID string
State string `json:"-"`
unacknowledgedMessages []Message
}


+ 13
- 0
model/profile.go View File

@@ -27,6 +27,7 @@ type PublicProfile struct {
Attributes map[string]string
Timeline Timeline
LocalID string // used by storage engine
State string `json:"-"`
lock sync.Mutex
}

@@ -186,6 +187,18 @@ func (p *Profile) BlockPeer(onion string) (err error) {
return
}

// BlockedPeers calculates a list of Peers who have been Blocked.
func (p *Profile) BlockedPeers() []string {
blockedPeers := []string{}
for _, contact := range p.GetContacts() {
c, _ := p.GetContact(contact)
if c.Blocked {
blockedPeers = append(blockedPeers, c.Onion)
}
}
return blockedPeers
}

// TrustPeer sets a contact to trusted
func (p *Profile) TrustPeer(onion string) (err error) {
p.lock.Lock()

+ 50
- 32
peer/cwtch_peer.go View File

@@ -5,11 +5,11 @@ import (
"cwtch.im/cwtch/model"
"cwtch.im/cwtch/protocol"
"cwtch.im/cwtch/protocol/connections"
"encoding/base32"
"encoding/base64"
"encoding/json"
"errors"
"git.openprivacy.ca/openprivacy/libricochet-go/connectivity"
"git.openprivacy.ca/openprivacy/libricochet-go/identity"
"git.openprivacy.ca/openprivacy/libricochet-go/log"
"github.com/golang/protobuf/proto"
"strings"
@@ -24,7 +24,6 @@ type cwtchPeer struct {
shutdown bool
started bool

engine connections.Engine
queue *event.Queue
eventBus *event.Manager
}
@@ -47,9 +46,7 @@ type CwtchPeer interface {
SendMessageToGroupTracked(string, string) (string, error)

GetProfile() *model.Profile

GetPeers() map[string]connections.ConnectionState
GetServers() map[string]connections.ConnectionState
GetPeerState(string) connections.ConnectionState

StartGroup(string) (string, []byte, error)

@@ -57,13 +54,16 @@ type CwtchPeer interface {
ExportGroup(string) (string, error)

GetGroup(string) *model.Group
GetGroupState(string) connections.ConnectionState
GetGroups() []string
AddContact(nick, onion string, publickey []byte, trusted bool)
AddContact(nick, onion string, trusted bool)
GetContacts() []string
GetContact(string) *model.PublicProfile

IsStarted() bool
Listen()
StartPeersConnections()
StartGroupConnections()
Shutdown()
}

@@ -90,19 +90,8 @@ func (cp *cwtchPeer) Init(acn connectivity.ACN, eventBus *event.Manager) {
cp.eventBus = eventBus
cp.eventBus.Subscribe(event.EncryptedGroupMessage, cp.queue.EventChannel)
cp.eventBus.Subscribe(event.NewGroupInvite, cp.queue.EventChannel)

// Calculate a list of Peers who have been Blocked.
blockedPeers := []string{}
for _, contact := range cp.Profile.GetContacts() {
c, _ := cp.Profile.GetContact(contact)
if c.Blocked {
blockedPeers = append(blockedPeers, c.Onion)
}
}

// TODO: Would be nice if ProtocolEngine did not need to explicitly be given the Private Key.
identity := identity.InitializeV3(cp.Profile.Name, &cp.Profile.Ed25519PrivateKey, &cp.Profile.Ed25519PublicKey)
cp.engine = connections.NewProtocolEngine(identity, cp.Profile.Ed25519PrivateKey, acn, eventBus, blockedPeers)
cp.eventBus.Subscribe(event.ServerStateChange, cp.queue.EventChannel)
cp.eventBus.Subscribe(event.PeerStateChange, cp.queue.EventChannel)
}

// ImportGroup intializes a group from an imported source rather than a peer invite
@@ -175,8 +164,9 @@ func (cp *cwtchPeer) GetGroup(groupID string) *model.Group {
return cp.Profile.GetGroupByGroupID(groupID)
}

func (cp *cwtchPeer) AddContact(nick, onion string, publickey []byte, trusted bool) {
pp := &model.PublicProfile{Name: nick, Ed25519PublicKey: publickey, Trusted: trusted, Blocked: false, Onion: onion, Attributes: map[string]string{"nick": nick}}
func (cp *cwtchPeer) AddContact(nick, onion string, trusted bool) {
decodedPub, _ := base32.StdEncoding.DecodeString(strings.ToUpper(onion))
pp := &model.PublicProfile{Name: nick, Ed25519PublicKey: decodedPub, Trusted: trusted, Blocked: false, Onion: onion, Attributes: map[string]string{"nick": nick}}
cp.Profile.AddContact(onion, pp)
pd, _ := json.Marshal(pp)
cp.eventBus.Publish(event.NewEvent(event.PeerCreated, map[event.Field]string{
@@ -201,6 +191,14 @@ func (cp *cwtchPeer) GetProfile() *model.Profile {
return cp.Profile
}

func (cp *cwtchPeer) GetPeerState(onion string) connections.ConnectionState {
return connections.ConnectionStateType[cp.Profile.Contacts[onion].State]
}

func (cp *cwtchPeer) GetGroupState(groupid string) connections.ConnectionState {
return connections.ConnectionStateType[cp.Profile.Groups[groupid].State]
}

// PeerWithOnion is the entry point for cwtchPeer relationships
func (cp *cwtchPeer) PeerWithOnion(onion string) *connections.PeerPeerConnection {
cp.eventBus.Publish(event.NewEvent(event.PeerRequest, map[event.Field]string{event.RemotePeer: onion}))
@@ -255,16 +253,6 @@ func (cp *cwtchPeer) SendMessageToPeer(onion string, message string) string {
return event.EventID
}

// GetPeers returns a list of peer connections.
func (cp *cwtchPeer) GetPeers() map[string]connections.ConnectionState {
return cp.engine.GetPeers()
}

// GetServers returns a list of server connections
func (cp *cwtchPeer) GetServers() map[string]connections.ConnectionState {
return cp.engine.GetServers()
}

// TrustPeer sets an existing peer relationship to trusted
func (cp *cwtchPeer) TrustPeer(peer string) error {
err := cp.Profile.TrustPeer(peer)
@@ -298,14 +286,34 @@ func (cp *cwtchPeer) RejectInvite(groupID string) {
cp.Profile.RejectInvite(groupID)
}

// Listen makes the peer open a listening port to accept incoming connections (and be detactably online)
func (cp *cwtchPeer) Listen() {
cp.eventBus.Publish(event.NewEvent(event.ProtocolEngineStartListen, map[event.Field]string{}))
}

// StartGroupConnections attempts to connect to all group servers (thus initiating reconnect attempts in the conectionsmanager)
func (cp *cwtchPeer) StartPeersConnections() {
for _, contact := range cp.GetContacts() {
cp.PeerWithOnion(contact)
}
}

// StartPeerConnections attempts to connect to all peers (thus initiating reconnect attempts in the conectionsmanager)
func (cp *cwtchPeer) StartGroupConnections() {
joinedServers := map[string]bool{}
for _, groupID := range cp.GetGroups() {
// Only send a join server packet if we haven't joined this server yet...
group := cp.GetGroup(groupID)
if joined := joinedServers[groupID]; group.Accepted && !joined {
cp.JoinServer(group.GroupServer)
joinedServers[group.GroupServer] = true
}
}
}

// Shutdown kills all connections and cleans up all goroutines for the peer
func (cp *cwtchPeer) Shutdown() {
cp.shutdown = true
cp.engine.Shutdown()
cp.queue.Shutdown()
}

@@ -328,6 +336,16 @@ func (cp *cwtchPeer) eventHandler() {
var groupInvite protocol.GroupChatInvite
proto.Unmarshal([]byte(ev.Data[event.GroupInvite]), &groupInvite)
cp.Profile.ProcessInvite(&groupInvite, ev.Data[event.RemotePeer])
case event.PeerStateChange:
if _, exists := cp.Profile.Contacts[ev.Data[event.RemotePeer]]; exists {
cp.Profile.Contacts[ev.Data[event.RemotePeer]].State = ev.Data[event.ConnectionState]
}
case event.ServerStateChange:
for _, group := range cp.Profile.Groups {
if group.GroupServer == ev.Data[event.GroupServer] {
group.State = ev.Data[event.ConnectionState]
}
}
default:
if ev.EventType != "" {
log.Errorf("peer event handler received an event it was not subscribed for: %v", ev.EventType)

+ 2
- 25
protocol/connections/connectionsmanager.go View File

@@ -42,12 +42,12 @@ func (m *Manager) ManagePeerConnection(host string, engine Engine) *PeerPeerConn
}

// ManageServerConnection creates a new ServerConnection for Host with the given callback handler.
func (m *Manager) ManageServerConnection(host string, messageHandler func(string, *protocol.GroupMessage), closedHandler func(string)) {
func (m *Manager) ManageServerConnection(host string, engine Engine, messageHandler func(string, *protocol.GroupMessage), closedHandler func(string)) {
m.lock.Lock()

psc, exists := m.serverConnections[host]

newPsc := NewPeerServerConnection(m.acn, host)
newPsc := NewPeerServerConnection(engine, host)
newPsc.GroupMessageHandler = messageHandler
newPsc.CloseHandler = closedHandler
go newPsc.Run()
@@ -60,28 +60,6 @@ func (m *Manager) ManageServerConnection(host string, messageHandler func(string
m.lock.Unlock()
}

// GetPeers returns a map of all peer connections with their state
func (m *Manager) GetPeers() map[string]ConnectionState {
rm := make(map[string]ConnectionState)
m.lock.Lock()
for onion, ppc := range m.peerConnections {
rm[onion] = ppc.GetState()
}
m.lock.Unlock()
return rm
}

// GetServers returns a map of all server connections with their state.
func (m *Manager) GetServers() map[string]ConnectionState {
rm := make(map[string]ConnectionState)
m.lock.Lock()
for onion, psc := range m.serverConnections {
rm[onion] = psc.GetState()
}
m.lock.Unlock()
return rm
}

// GetPeerPeerConnectionForOnion safely returns a given peer connection
func (m *Manager) GetPeerPeerConnectionForOnion(host string) (ppc *PeerPeerConnection) {
m.lock.Lock()
@@ -103,7 +81,6 @@ func (m *Manager) AttemptReconnections() {
maxTimeout := time.Minute * 5
// nearly instant first run, next few runs will prolly be too quick to have any FAILED and will gracefully slow to MAX after that
timeout := time.Millisecond * 500

for {
select {
case <-time.After(timeout):

+ 6
- 14
protocol/connections/engine.go View File

@@ -45,6 +45,7 @@ type engine struct {
type Engine interface {
Identity() identity.Identity
ACN() connectivity.ACN
EventManager() *event.Manager

GetPeerHandler(string) *CwtchPeerHandler
ContactRequest(string, string) string
@@ -52,9 +53,6 @@ type Engine interface {
LookupContact(string, rsa.PublicKey) (bool, bool)
LookupContactV3(string, ed25519.PublicKey) (bool, bool)

GetPeers() map[string]ConnectionState
GetServers() map[string]ConnectionState

Shutdown()
}

@@ -94,6 +92,10 @@ func (e *engine) Identity() identity.Identity {
return e.identity
}

func (e *engine) EventManager() *event.Manager {
return e.eventManager
}

// eventHandler process events from other subsystems
func (e *engine) eventHandler() {
for {
@@ -244,7 +246,7 @@ func (e *engine) finishedFetch(server string) {

// joinServer manages a new server connection with the given onion address
func (e *engine) joinServer(onion string) {
e.connectionsManager.ManageServerConnection(onion, e.receiveGroupMessage, e.finishedFetch)
e.connectionsManager.ManageServerConnection(onion, e, e.receiveGroupMessage, e.finishedFetch)
}

// sendMessageToGroup attempts to sent the given message to the given group id.
@@ -264,16 +266,6 @@ func (e *engine) sendMessageToGroup(server string, ct []byte, sig []byte) {
}
}

// GetPeers returns a list of peer connections.
func (e *engine) GetPeers() map[string]ConnectionState {
return e.connectionsManager.GetPeers()
}

// GetServers returns a list of server connections
func (e *engine) GetServers() map[string]ConnectionState {
return e.connectionsManager.GetServers()
}

// CwtchPeerInstance encapsulates incoming peer connections
type CwtchPeerInstance struct {
rai *application.Instance

+ 14
- 5
protocol/connections/peerpeerconnection.go View File

@@ -1,6 +1,7 @@
package connections

import (
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/protocol/connections/peer"
"errors"
"git.openprivacy.ca/openprivacy/libricochet-go"
@@ -28,6 +29,14 @@ func NewPeerPeerConnection(peerhostname string, protocolEngine Engine) *PeerPeer
return ppc
}

func (ppc *PeerPeerConnection) setState(state ConnectionState) {
ppc.state = state
ppc.protocolEngine.EventManager().Publish(event.NewEvent(event.PeerStateChange, map[event.Field]string{
event.RemotePeer: string(ppc.PeerHostname),
event.ConnectionState: ConnectionStateName[state],
}))
}

// GetState returns the current connection state
func (ppc *PeerPeerConnection) GetState() ConnectionState {
return ppc.state
@@ -78,14 +87,14 @@ func (ppc *PeerPeerConnection) WaitTilAuthenticated() {

// Run manages the setup and teardown of a peer->peer connection
func (ppc *PeerPeerConnection) Run() error {
ppc.state = CONNECTING
ppc.setState(CONNECTING)
rc, err := goricochet.Open(ppc.protocolEngine.ACN(), ppc.PeerHostname)
if err == nil {
ppc.connection = rc
ppc.state = CONNECTED
ppc.setState(CONNECTED)
_, err := connection.HandleOutboundConnection(ppc.connection).ProcessAuthAsV3Client(ppc.protocolEngine.Identity())
if err == nil {
ppc.state = AUTHENTICATED
ppc.setState(AUTHENTICATED)
go func() {
ppc.connection.Do(func() error {
ppc.connection.RequestOpenChannel("im.cwtch.peer", &peer.CwtchPeerChannel{Handler: ppc.protocolEngine.GetPeerHandler(ppc.PeerHostname)})
@@ -101,13 +110,13 @@ func (ppc *PeerPeerConnection) Run() error {
ppc.connection.Process(ppc)
}
}
ppc.state = FAILED
ppc.setState(FAILED)
return err
}

// Close closes the connection
func (ppc *PeerPeerConnection) Close() {
ppc.state = KILLED
ppc.setState(KILLED)
if ppc.connection != nil {
ppc.connection.Close()
}

+ 20
- 12
protocol/connections/peerserverconnection.go View File

@@ -2,6 +2,7 @@ package connections

import (
"crypto/rand"
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/protocol"
"cwtch.im/cwtch/protocol/connections/fetch"
"cwtch.im/cwtch/protocol/connections/listen"
@@ -10,7 +11,6 @@ import (
"git.openprivacy.ca/openprivacy/libricochet-go"
"git.openprivacy.ca/openprivacy/libricochet-go/channels"
"git.openprivacy.ca/openprivacy/libricochet-go/connection"
"git.openprivacy.ca/openprivacy/libricochet-go/connectivity"
"git.openprivacy.ca/openprivacy/libricochet-go/identity"
"git.openprivacy.ca/openprivacy/libricochet-go/log"
"golang.org/x/crypto/ed25519"
@@ -20,19 +20,19 @@ import (
// PeerServerConnection encapsulates a single Peer->Server connection
type PeerServerConnection struct {
connection.AutoConnectionHandler
Server string
state ConnectionState
connection *connection.Connection
acn connectivity.ACN
Server string
state ConnectionState
connection *connection.Connection
protocolEngine Engine

GroupMessageHandler func(string, *protocol.GroupMessage)
CloseHandler func(string)
}

// NewPeerServerConnection creates a new Peer->Server outbound connection
func NewPeerServerConnection(acn connectivity.ACN, serverhostname string) *PeerServerConnection {
func NewPeerServerConnection(engine Engine, serverhostname string) *PeerServerConnection {
psc := new(PeerServerConnection)
psc.acn = acn
psc.protocolEngine = engine
psc.Server = serverhostname
psc.Init()
return psc
@@ -43,6 +43,14 @@ func (psc *PeerServerConnection) GetState() ConnectionState {
return psc.state
}

func (psc *PeerServerConnection) setState(state ConnectionState) {
psc.state = state
psc.protocolEngine.EventManager().Publish(event.NewEvent(event.ServerStateChange, map[event.Field]string{
event.GroupServer: string(psc.Server),
event.ConnectionState: ConnectionStateName[state],
}))
}

// WaitTilAuthenticated waits until the underlying connection is authenticated
func (psc *PeerServerConnection) WaitTilAuthenticated() {
for {
@@ -56,15 +64,15 @@ func (psc *PeerServerConnection) WaitTilAuthenticated() {
// Run manages the setup and teardown of a peer server connection
func (psc *PeerServerConnection) Run() error {
log.Infof("Connecting to %v", psc.Server)
rc, err := goricochet.Open(psc.acn, psc.Server)
rc, err := goricochet.Open(psc.protocolEngine.ACN(), psc.Server)
if err == nil {
psc.connection = rc
psc.state = CONNECTED
psc.setState(CONNECTED)
pub, priv, err := ed25519.GenerateKey(rand.Reader)
if err == nil {
_, err := connection.HandleOutboundConnection(psc.connection).ProcessAuthAsV3Client(identity.InitializeV3("cwtchpeer", &priv, &pub))
if err == nil {
psc.state = AUTHENTICATED
psc.setState(AUTHENTICATED)

go func() {
psc.connection.Do(func() error {
@@ -81,7 +89,7 @@ func (psc *PeerServerConnection) Run() error {
}
}
}
psc.state = FAILED
psc.setState(FAILED)
return err
}

@@ -128,7 +136,7 @@ func (psc *PeerServerConnection) SendGroupMessage(gm *protocol.GroupMessage) err

// Close shuts down the connection (freeing the handler goroutines)
func (psc *PeerServerConnection) Close() {
psc.state = KILLED
psc.setState(KILLED)
if psc.connection != nil {
psc.connection.Close()
}

+ 6
- 1
protocol/connections/peerserverconnection_test.go View File

@@ -2,6 +2,7 @@ package connections

import (
"crypto/rand"
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/protocol"
"cwtch.im/cwtch/server/fetch"
"cwtch.im/cwtch/server/send"
@@ -75,7 +76,11 @@ func TestPeerServerConnection(t *testing.T) {
<-listenChan
onionAddr := identity.Hostname()

psc := NewPeerServerConnection(connectivity.LocalProvider(), "127.0.0.1:5451|"+onionAddr)
manager := &event.Manager{}
manager.Initialize()
engine := NewProtocolEngine(identity, priv, connectivity.LocalProvider(), manager, nil)

psc := NewPeerServerConnection(engine, "127.0.0.1:5451|"+onionAddr)
numcalls := 0
psc.GroupMessageHandler = func(s string, gm *protocol.GroupMessage) {
numcalls++

+ 5
- 1
protocol/connections/state.go View File

@@ -7,7 +7,7 @@ type ConnectionState int
// DISCONNECTED - No existing connection has been made, or all attempts have failed
// CONNECTING - We are in the process of attempting to connect to a given endpoint
// CONNECTED - We have connected but not yet authenticated
// AUTHENTICATED - im.ricochet.auth-hidden-server has succeeded on thec onnection.
// AUTHENTICATED - im.ricochet.auth-hidden-server has succeeded on the connection.
const (
DISCONNECTED ConnectionState = iota
CONNECTING
@@ -20,4 +20,8 @@ const (
var (
// ConnectionStateName allows conversion of states to their string representations
ConnectionStateName = []string{"Disconnected", "Connecting", "Connected", "Authenticated", "Failed", "Killed"}

// ConnectionStateType allows conversion of strings to their state type
ConnectionStateType = map[string]ConnectionState{"Disconnected": DISCONNECTED, "Connecting": CONNECTING,
"Connected": CONNECTED, "Authenticated": AUTHENTICATED, "Failed": FAILED, "Killed": KILLED}
)

+ 21
- 6
storage/profile_store.go View File

@@ -21,6 +21,7 @@ type profileStore struct {
profile *model.Profile
eventManager *event.Manager
queue *event.Queue
writer bool
}

// ProfileStore is an interface to managing the storage of Cwtch Profiles
@@ -30,11 +31,11 @@ type ProfileStore interface {
GetProfileCopy() *model.Profile
}

// NewProfileStore returns a profile store backed by a filestore listening for events and saving them
// NewProfileWriterStore returns a profile store backed by a filestore listening for events and saving them
// directory should be $appDir/profiles/$rand
func NewProfileStore(eventManager *event.Manager, directory, password string, profile *model.Profile) ProfileStore {
func NewProfileWriterStore(eventManager *event.Manager, directory, password string, profile *model.Profile) ProfileStore {
os.Mkdir(directory, 0700)
ps := &profileStore{fs: NewFileStore(directory, profileFilename, password), password: password, directory: directory, profile: profile, eventManager: eventManager, streamStores: map[string]StreamStore{}}
ps := &profileStore{fs: NewFileStore(directory, profileFilename, password), password: password, directory: directory, profile: profile, eventManager: eventManager, streamStores: map[string]StreamStore{}, writer: true}
ps.queue = event.NewEventQueue(100)
if profile != nil {
ps.save()
@@ -55,6 +56,15 @@ func NewProfileStore(eventManager *event.Manager, directory, password string, pr
return ps
}

// NewProfileReaderStore returns a profile store backed by a filestore
// directory should be $appDir/profiles/$rand
func NewProfileReaderStore(directory, password string, profile *model.Profile) ProfileStore {
os.Mkdir(directory, 0700)
ps := &profileStore{fs: NewFileStore(directory, profileFilename, password), password: password, directory: directory, profile: profile, eventManager: nil, streamStores: map[string]StreamStore{}, writer: true}

return ps
}

// NewProfile creates a new profile for use in the profile store.
func NewProfile(name string) *model.Profile {
profile := model.GenerateNewProfile(name)
@@ -62,8 +72,11 @@ func NewProfile(name string) *model.Profile {
}

func (ps *profileStore) save() error {
bytes, _ := json.Marshal(ps.profile)
return ps.fs.Save(bytes)
if ps.writer {
bytes, _ := json.Marshal(ps.profile)
return ps.fs.Save(bytes)
}
return nil
}

// Load instantiates a cwtchPeer from the file store
@@ -181,5 +194,7 @@ func (ps *profileStore) eventHandler() {
}

func (ps *profileStore) Shutdown() {
ps.queue.Shutdown()
if ps.queue != nil {
ps.queue.Shutdown()
}
}

+ 2
- 2
storage/profile_store_test.go View File

@@ -20,7 +20,7 @@ func TestProfileStoreWriteRead(t *testing.T) {
eventBus := new(event.Manager)
eventBus.Initialize()
profile := NewProfile(testProfileName)
ps1 := NewProfileStore(eventBus, testingDir, password, profile)
ps1 := NewProfileWriterStore(eventBus, testingDir, password, profile)

eventBus.Publish(event.NewEvent(event.SetAttribute, map[event.Field]string{event.Key: testKey, event.Data: testVal}))
time.Sleep(1 * time.Second)
@@ -50,7 +50,7 @@ func TestProfileStoreWriteRead(t *testing.T) {

ps1.Shutdown()

ps2 := NewProfileStore(eventBus, testingDir, password, nil)
ps2 := NewProfileWriterStore(eventBus, testingDir, password, nil)
err = ps2.Load()
if err != nil {
t.Errorf("Error createing profileStore: %v\n", err)

+ 32
- 58
testing/cwtch_peer_server_integration_test.go View File

@@ -1,7 +1,7 @@
package testing

import (
"cwtch.im/cwtch/event"
app2 "cwtch.im/cwtch/app"
"cwtch.im/cwtch/model"
"cwtch.im/cwtch/peer"
"cwtch.im/cwtch/protocol/connections"
@@ -55,16 +55,17 @@ func serverCheck(t *testing.T, serverAddr string) bool {
return true
}

func waitForPeerServerConnection(t *testing.T, peer peer.CwtchPeer, server string) {
func waitForPeerGroupConnection(t *testing.T, peer peer.CwtchPeer, groupID string) {
for {
servers := peer.GetServers()
state, ok := servers[server]
_, ok := peer.GetProfile().Groups[groupID]
if ok {
state := peer.GetGroupState(groupID)
//log.Infof("Waiting for Peer %v to join group %v - state: %v\n", peer.GetProfile().Name, groupID, state)
if state == connections.FAILED {
t.Fatalf("%v could not connect to %v", peer.GetProfile().Onion, server)
t.Fatalf("%v could not connect to %v", peer.GetProfile().Onion, groupID)
}
if state != connections.AUTHENTICATED {
fmt.Printf("peer %v waiting connect to server %v, currently: %v\n", peer.GetProfile().Onion, server, connections.ConnectionStateName[state])
fmt.Printf("peer %v waiting connect to group %v, currently: %v\n", peer.GetProfile().Onion, groupID, connections.ConnectionStateName[state])
time.Sleep(time.Second * 5)
continue
} else {
@@ -78,9 +79,11 @@ func waitForPeerServerConnection(t *testing.T, peer peer.CwtchPeer, server strin

func waitForPeerPeerConnection(t *testing.T, peera peer.CwtchPeer, peerb peer.CwtchPeer) {
for {
peers := peera.GetPeers()
state, ok := peers[peerb.GetProfile().Onion]
//peers := peera.GetPeers()
_, ok := peera.GetProfile().Contacts[peerb.GetProfile().Onion]
if ok {
state := peera.GetPeerState(peerb.GetProfile().Onion)
//log.Infof("Waiting for Peer %v to peer with peer: %v - state: %v\n", peera.GetProfile().Name, peerb.GetProfile().Name, state)
if state == connections.FAILED {
t.Fatalf("%v could not connect to %v", peera.GetProfile().Onion, peerb.GetProfile().Onion)
}
@@ -132,35 +135,33 @@ func TestCwtchPeerIntegration(t *testing.T) {

numGoRoutinesPostServer := runtime.NumGoroutine()

app := app2.NewApp(acn, "./storage")

// ***** cwtchPeer setup *****

// It's important that each Peer have their own EventBus
aliceEventBus := new(event.Manager)
/*aliceEventBus := new(event.Manager)
aliceEventBus.Initialize()
bobEventBus := new(event.Manager)
bobEventBus.Initialize()
carolEventBus := new(event.Manager)
carolEventBus.Initialize()
carolEventBus.Initialize()*/

fmt.Println("Creating Alice...")
alice := peer.NewCwtchPeer("Alice")
alice.Init(acn, aliceEventBus)
alice.Listen()
alice, _ := app.CreatePeer("alice", "asdfasdf")
fmt.Println("Alice created:", alice.GetProfile().Onion)

fmt.Println("Creating Bob...")
bob := peer.NewCwtchPeer("Bob")
bob.Init(acn, bobEventBus)
bob.Listen()
bob, _ := app.CreatePeer("bob", "asdfasdf")
fmt.Println("Bob created:", bob.GetProfile().Onion)

fmt.Println("Creating Carol...")
carol := peer.NewCwtchPeer("Carol")
carol.Init(acn, carolEventBus)
carol.Listen()
carol, _ := app.CreatePeer("Carol", "asdfasdf")
fmt.Println("Carol created:", carol.GetProfile().Onion)

fmt.Println("Waiting for Alice, Bob, and Carol to connection with onion network...")
app.LaunchPeers()

fmt.Println("Waiting for Alice, Bob, and Carol to connect with onion network...")
time.Sleep(time.Second * 90)
numGoRoutinesPostPeerStart := runtime.NumGoroutine()

@@ -175,8 +176,10 @@ func TestCwtchPeerIntegration(t *testing.T) {
}

fmt.Println("Alice peering with Bob...")
alice.AddContact("Bob", bob.GetProfile().Onion, false) // Add contact so we can track connection state
alice.PeerWithOnion(bob.GetProfile().Onion)
fmt.Println("Alice peering with Carol...")
alice.AddContact("Carol", carol.GetProfile().Onion, false)
alice.PeerWithOnion(carol.GetProfile().Onion)

fmt.Println("Alice joining server...")
@@ -185,10 +188,10 @@ func TestCwtchPeerIntegration(t *testing.T) {
bob.JoinServer(serverAddr)

fmt.Println("Waiting for alice to join server...")
waitForPeerServerConnection(t, alice, serverAddr)
waitForPeerGroupConnection(t, alice, groupID)

fmt.Println("Waiting for bob to join server...")
waitForPeerServerConnection(t, bob, serverAddr)
//fmt.Println("Waiting for bob to join server...")
//waitForPeerGroupConnection(t, bob, groupID)

fmt.Println("Waiting for alice and Bob to peer...")
waitForPeerPeerConnection(t, alice, bob)
@@ -217,35 +220,9 @@ func TestCwtchPeerIntegration(t *testing.T) {

numGoRoutinesPostServerConnect := runtime.NumGoroutine()

// ***** Fill up message history of server ******

/*
// filler group will be used to fill up the servers message history a bit to stress test fetch later for carol
fillerGroupId, _, err := alice.Profile.StartGroup(serverAddr)
if err != nil {
t.Errorf("Failed to init filler group: %v", err)
return
}

fmt.Println("Alice filling message history of server...")
for i := 0; i < 100; i++ {

go func (x int) {
time.Sleep(time.Second * time.Duration(x))
err := alice.SendMessageToGroup(fillerGroupId, aliceLines[0])
if err != nil {
fmt.Println("SEND", x, "ERROR:", err)
} else {
fmt.Println("SEND", x, " SUCCESS!")
}
}(i)
}

time.Sleep(time.Second * 110)
*/
// Wait for them to join the server
waitForPeerServerConnection(t, alice, serverAddr)
waitForPeerServerConnection(t, bob, serverAddr)
waitForPeerGroupConnection(t, alice, groupID)
waitForPeerGroupConnection(t, bob, groupID)
//numGouRoutinesPostServerConnect := runtime.NumGoroutine()

// ***** Conversation *****
@@ -291,14 +268,13 @@ func TestCwtchPeerIntegration(t *testing.T) {
}

fmt.Println("Shutting down Alice...")
alice.Shutdown()
aliceEventBus.Shutdown()
app.ShutdownPeer(alice.GetProfile().Onion)
time.Sleep(time.Second * 5)
numGoRoutinesPostAlice := runtime.NumGoroutine()

fmt.Println("Carol joining server...")
carol.JoinServer(serverAddr)
waitForPeerServerConnection(t, carol, serverAddr)
waitForPeerGroupConnection(t, carol, groupID)
numGoRotinesPostCarolConnect := runtime.NumGoroutine()

fmt.Println("Bob> ", bobLines[2])
@@ -380,8 +356,7 @@ func TestCwtchPeerIntegration(t *testing.T) {
}

fmt.Println("Shutting down Bob...")
bob.Shutdown()
bobEventBus.Shutdown()
app.ShutdownPeer(bob.GetProfile().Onion)
time.Sleep(time.Second * 3)
numGoRoutinesPostBob := runtime.NumGoroutine()
if server != nil {
@@ -392,8 +367,7 @@ func TestCwtchPeerIntegration(t *testing.T) {
numGoRoutinesPostServerShutdown := runtime.NumGoroutine()

fmt.Println("Shutting down Carol...")
carol.Shutdown()
carolEventBus.Shutdown()
app.ShutdownPeer(carol.GetProfile().Onion)
time.Sleep(time.Second * 3)
numGoRoutinesPostCarol := runtime.NumGoroutine()


Loading…
Cancel
Save