diff --git a/app/app.go b/app/app.go index e1ca7fa..d10ae9b 100644 --- a/app/app.go +++ b/app/app.go @@ -2,6 +2,7 @@ package app import ( "crypto/rand" + "cwtch.im/cwtch/event" "cwtch.im/cwtch/peer" "cwtch.im/cwtch/storage" "encoding/hex" @@ -23,6 +24,7 @@ type application struct { mutex sync.Mutex primaryonion string storage map[string]storage.ProfileStore + eventBus *event.Manager } // Application is a full cwtch peer application. It allows management, usage and storage of multiple peers @@ -46,6 +48,8 @@ func NewApp(acn connectivity.ACN, appDirectory string) Application { log.Debugf("NewApp(%v)\n", appDirectory) app := &application{peers: make(map[string]peer.CwtchPeer), storage: make(map[string]storage.ProfileStore), directory: appDirectory, acn: acn} os.Mkdir(path.Join(app.directory, "profiles"), 0700) + app.eventBus = new(event.Manager) + app.eventBus.Initialize() return app } @@ -73,7 +77,7 @@ func (app *application) CreatePeer(name string, password string) (peer.CwtchPeer if err != nil { return nil, err } - p.Init(app.acn) + p.Init(app.acn, app.eventBus) _, exists := app.peers[p.GetProfile().Onion] if exists { p.Shutdown() @@ -109,7 +113,7 @@ func (app *application) LoadProfiles(password string) error { continue } - p.Init(app.acn) + p.Init(app.acn, app.eventBus) app.mutex.Lock() app.peers[p.GetProfile().Onion] = p diff --git a/app/bots/servermon/main.go b/app/bots/servermon/main.go index 4957327..a8e4252 100644 --- a/app/bots/servermon/main.go +++ b/app/bots/servermon/main.go @@ -1,8 +1,9 @@ package main import ( + "cwtch.im/cwtch/event" "cwtch.im/cwtch/peer" - "cwtch.im/cwtch/peer/connections" + "cwtch.im/cwtch/protocol/connections" "errors" "fmt" "git.openprivacy.ca/openprivacy/libricochet-go/connectivity" @@ -46,7 +47,7 @@ func main() { } botPeer := peer.NewCwtchPeer("servermon") - botPeer.Init(acn) + botPeer.Init(acn, new(event.Manager)) fmt.Printf("Connecting to %v...\n", serverAddr) botPeer.JoinServer(serverAddr) diff --git a/app/cli/main.go b/app/cli/main.go index 026021e..14df948 100644 --- a/app/cli/main.go +++ b/app/cli/main.go @@ -6,7 +6,7 @@ import ( "bytes" "cwtch.im/cwtch/model" - "cwtch.im/cwtch/peer/connections" + "cwtch.im/cwtch/protocol/connections" "fmt" "git.openprivacy.ca/openprivacy/libricochet-go/connectivity" "git.openprivacy.ca/openprivacy/libricochet-go/log" @@ -265,6 +265,7 @@ func main() { log.Errorf("Error initializing application: %v", err) os.Exit(1) } + log.SetLevel(log.LevelDebug) fmt.Printf("\nWelcome to Cwtch!\n") fmt.Printf("If this if your first time you should create a profile by running `/new-profile`\n") fmt.Printf("`/load-profiles` will prompt you for a password and load profiles from storage\n") @@ -283,6 +284,8 @@ func main() { text := prompt.Input(prmpt, completer, prompt.OptionSuggestionBGColor(prompt.Purple), prompt.OptionDescriptionBGColor(prompt.White), + prompt.OptionPrefixTextColor(prompt.White), + prompt.OptionInputTextColor(prompt.Purple), prompt.OptionHistory(history)) commands := strings.Split(text[0:], " ") diff --git a/app/peer/alice/alice.go b/app/peer/alice/alice.go index f72236d..b4b1217 100644 --- a/app/peer/alice/alice.go +++ b/app/peer/alice/alice.go @@ -1,20 +1,40 @@ package main import ( + "cwtch.im/cwtch/event" "cwtch.im/cwtch/peer" + "git.openprivacy.ca/openprivacy/libricochet-go/connectivity" "git.openprivacy.ca/openprivacy/libricochet-go/log" + "os" + "path" ) func main() { - log.AddEverythingFromPattern("peer/alice") - alice := peer.NewCwtchPeer("alice") - processData := func(onion string, data []byte) []byte { - log.Debugf("Recieved %s from %v", data, onion) - return data + // System Setup, We need Tor and Logging up and Running + log.AddEverythingFromPattern("peer/alice") + log.SetLevel(log.LevelDebug) + + acn, err := connectivity.StartTor(path.Join(".", ".cwtch"), "") + if err != nil { + log.Errorf("\nError connecting to Tor: %v\n", err) + os.Exit(1) } - alice.SetPeerDataHandler(processData) + // Setup the Event Bus to Listen for Data Packets + eventBus := new(event.Manager) + eventBus.Initialize() + queue := event.NewEventQueue(100) + eventBus.Subscribe(event.NewMessageFromPeer, queue.EventChannel) + + // Setup Alice to Listen for new Events + alice := peer.NewCwtchPeer("alice") + alice.Init(acn, eventBus) alice.Listen() + // For every new Data Packet Alice recieved she will Print it out. + for { + event := queue.Next() + log.Printf(log.LevelInfo, "Received %v from %v: %s", event.EventType, event.Data["Onion"], event.Data["Data"]) + } } diff --git a/app/peer/bob/bob.go b/app/peer/bob/bob.go index f64b89d..f663f95 100644 --- a/app/peer/bob/bob.go +++ b/app/peer/bob/bob.go @@ -1,28 +1,41 @@ package main import ( + "cwtch.im/cwtch/event" "cwtch.im/cwtch/peer" + "git.openprivacy.ca/openprivacy/libricochet-go/connectivity" "git.openprivacy.ca/openprivacy/libricochet-go/log" - "strconv" + "os" + "path" "time" ) func main() { + + // System Boilerplate, We need Tor Up and Running log.AddEverythingFromPattern("peer/bob") + log.SetLevel(log.LevelDebug) + acn, err := connectivity.StartTor(path.Join(".", ".cwtch"), "") + if err != nil { + log.Errorf("\nError connecting to Tor: %v\n", err) + os.Exit(1) + } + + // Set up the Event Buss and Initialize the Peer + eventBus := new(event.Manager) + eventBus.Initialize() bob := peer.NewCwtchPeer("bob") - counter := 1 + bob.Init(acn, eventBus) - bob.SetPeerDataHandler(func(onion string, data []byte) []byte { - log.Infof("Recieved %s from %v", data, onion) - counter++ - return []byte(strconv.Itoa(counter)) - }) - connection := bob.PeerWithOnion("f4b6thuwmfszsqd3fzqpr45sdem4qoazdlzr2xmnc7fq22qe746hjqqd") + // Add Alice's Onion Here (It changes run to run) + bob.PeerWithOnion("upiztu7myymjf2dn4x4czhagp7axlnqjvf5zwfegbhtpkqb6v3vgu5yd") + // Send the Message... log.Infof("Waiting for Bob to Connect to Alice...") - connection.SendPacket([]byte("Hello Alice!!!")) + bob.SendMessageToPeer("upiztu7myymjf2dn4x4czhagp7axlnqjvf5zwfegbhtpkqb6v3vgu5yd", "Hello Alice!!!") // Wait a while... + // Everything is run in a goroutine so the main thread has to stay active time.Sleep(time.Second * 100) } diff --git a/event/common.go b/event/common.go new file mode 100644 index 0000000..151c3b2 --- /dev/null +++ b/event/common.go @@ -0,0 +1,27 @@ +package event + +// Type captures the definition of many common Cwtch application events +type Type string + +// Defining Common Event Types +const ( + StatusRequest = Type("StatusRequest") + ProtocolEngineStatus = Type("ProtocolEngineStatus") + + PeerRequest = Type("PeerRequest") + BlockPeer = Type("BlockPeer") + JoinServer = Type("JoinServer") + + ProtocolEngineStartListen = Type("ProtocolEngineStartListen") + ProtocolEngineStopped = Type("ProtocolEngineStopped") + + InvitePeerToGroup = Type("InvitePeerToGroup") + NewGroupInvite = Type("NewGroupInvite") + + SendMessageToGroup = Type("SendMessagetoGroup") + EncryptedGroupMessage = Type("EncryptedGroupMessage") + NewMessageFromGroup = Type("NewMessageFromGroup") + + SendMessageToPeer = Type("SendMessageToPeer") + NewMessageFromPeer = Type("NewMessageFromPeer") +) diff --git a/event/eventmanager.go b/event/eventmanager.go index 8521526..01a2599 100644 --- a/event/eventmanager.go +++ b/event/eventmanager.go @@ -2,18 +2,25 @@ package event import ( "git.openprivacy.ca/openprivacy/libricochet-go/log" + "git.openprivacy.ca/openprivacy/libricochet-go/utils" "sync" ) -// Event is a structure which binds a given set of data to an EventType +// Event is a structure which binds a given set of data to an Type type Event struct { - EventType string - Data []byte + EventType Type + EventID string + Data map[string]string +} + +// NewEvent creates a new event object with a unique ID and the given type and data. +func NewEvent(eventType Type, data map[string]string) Event { + return Event{EventType: eventType, EventID: utils.GetRandNumber().String(), Data: data} } // Manager is an Event Bus which allows subsystems to subscribe to certain EventTypes and publish others. type Manager struct { - subscribers map[string][]chan Event + subscribers map[Type][]chan Event events chan Event mapMutex sync.Mutex internal chan bool @@ -21,7 +28,7 @@ type Manager struct { // Initialize sets up the Manager. func (em *Manager) Initialize() { - em.subscribers = make(map[string][]chan Event) + em.subscribers = make(map[Type][]chan Event) em.events = make(chan Event) em.internal = make(chan bool) go em.eventBus() @@ -29,7 +36,7 @@ func (em *Manager) Initialize() { // Subscribe takes an eventType and an Channel and associates them in the eventBus. All future events of that type // will be sent to the eventChannel. -func (em *Manager) Subscribe(eventType string, eventChannel chan Event) { +func (em *Manager) Subscribe(eventType Type, eventChannel chan Event) { em.mapMutex.Lock() defer em.mapMutex.Unlock() em.subscribers[eventType] = append(em.subscribers[eventType], eventChannel) @@ -75,7 +82,7 @@ func (em *Manager) eventBus() { // Shutdown triggers, and waits for, the internal eventBus goroutine to finish func (em *Manager) Shutdown() { - em.events <- Event{"", []byte{}} + em.events <- Event{} // wait for eventBus to finish <-em.internal close(em.events) diff --git a/event/eventmanager_test.go b/event/eventmanager_test.go index 7d24bee..a2fd4c6 100644 --- a/event/eventmanager_test.go +++ b/event/eventmanager_test.go @@ -14,10 +14,10 @@ func TestEventManager(t *testing.T) { // We need to make this buffer at least 1, otherwise we will log an error! testChan := make(chan Event, 1) eventManager.Subscribe("TEST", testChan) - eventManager.Publish(Event{EventType: "TEST", Data: []byte("Hello World")}) + eventManager.Publish(Event{EventType: "TEST", Data: map[string]string{"Value": "Hello World"}}) event := <-testChan - if event.EventType == "TEST" && string(event.Data) == "Hello World" { + if event.EventType == "TEST" && event.Data["Value"] == "Hello World" { } else { t.Errorf("Received Invalid Event") @@ -34,7 +34,7 @@ func TestEventManagerOverflow(t *testing.T) { // Explicitly setting this to 0 log an error! testChan := make(chan Event) eventManager.Subscribe("TEST", testChan) - eventManager.Publish(Event{EventType: "TEST", Data: []byte("Hello World")}) + eventManager.Publish(Event{EventType: "TEST"}) } func TestEventManagerMultiple(t *testing.T) { @@ -46,17 +46,17 @@ func TestEventManagerMultiple(t *testing.T) { peerEventQueue := NewEventQueue(10) allEventQueue := NewEventQueue(10) - eventManager.Subscribe("PeerEvent", peerEventQueue.EventChannel) eventManager.Subscribe("GroupEvent", groupEventQueue.EventChannel) eventManager.Subscribe("PeerEvent", allEventQueue.EventChannel) eventManager.Subscribe("GroupEvent", allEventQueue.EventChannel) eventManager.Subscribe("ErrorEvent", allEventQueue.EventChannel) - eventManager.Publish(Event{EventType: "PeerEvent", Data: []byte("Hello World Peer")}) - eventManager.Publish(Event{EventType: "GroupEvent", Data: []byte("Hello World Group")}) - eventManager.Publish(Event{EventType: "PeerEvent", Data: []byte("Hello World Peer 2")}) - eventManager.Publish(Event{EventType: "ErrorEvent", Data: []byte("Hello World Error")}) + eventManager.Publish(Event{EventType: "PeerEvent", Data: map[string]string{"Value": "Hello World Peer"}}) + eventManager.Publish(Event{EventType: "GroupEvent", Data: map[string]string{"Value": "Hello World Group"}}) + eventManager.Publish(Event{EventType: "PeerEvent", Data: map[string]string{"Value": "Hello World Peer"}}) + eventManager.Publish(Event{EventType: "ErrorEvent", Data: map[string]string{"Value": "Hello World Error"}}) + eventManager.Publish(Event{EventType: "NobodyIsSubscribedToThisEvent", Data: map[string]string{"Value": "Noone should see this!"}}) assertLength := func(len int, expected int, label string) { if len != expected { @@ -70,7 +70,7 @@ func TestEventManagerMultiple(t *testing.T) { assertLength(peerEventQueue.Backlog(), 2, "Peer Event Queue Length") assertLength(allEventQueue.Backlog(), 4, "All Event Queue Length") - checkEvent := func(eventType string, expected string, label string) { + checkEvent := func(eventType Type, expected Type, label string) { if eventType != expected { t.Errorf("Expected %s to be %v was %v", label, expected, eventType) } diff --git a/model/message_test.go b/model/message_test.go index c66d0d6..775f849 100644 --- a/model/message_test.go +++ b/model/message_test.go @@ -80,12 +80,12 @@ func TestTranscriptConsistency(t *testing.T) { c5, s5, _ := alice.EncryptMessageToGroup("Hello World 5", group.GroupID) t.Logf("Length of Encrypted Message: %v", len(c5)) - _, m1 := sarah.AttemptDecryption(c1, s1) + _, _, m1 := sarah.AttemptDecryption(c1, s1) sarah.AttemptDecryption(c1, s1) // Try a duplicate - _, m2 := sarah.AttemptDecryption(c2, s2) - _, m3 := sarah.AttemptDecryption(c3, s3) - _, m4 := sarah.AttemptDecryption(c4, s4) - _, m5 := sarah.AttemptDecryption(c5, s5) + _, _, m2 := sarah.AttemptDecryption(c2, s2) + _, _, m3 := sarah.AttemptDecryption(c3, s3) + _, _, m4 := sarah.AttemptDecryption(c4, s4) + _, _, m5 := sarah.AttemptDecryption(c5, s5) // Now we simulate a client receiving these messages completely out of order timeline.Insert(m1) diff --git a/model/profile.go b/model/profile.go index cceb48a..f69a83f 100644 --- a/model/profile.go +++ b/model/profile.go @@ -314,7 +314,7 @@ func (p *Profile) AddGroup(group *Group) { } // AttemptDecryption takes a ciphertext and signature and attempts to decrypt it under known groups. -func (p *Profile) AttemptDecryption(ciphertext []byte, signature []byte) (bool, *Message) { +func (p *Profile) AttemptDecryption(ciphertext []byte, signature []byte) (bool, string, *Message) { for _, group := range p.Groups { success, dgm := group.DecryptMessage(ciphertext) if success { @@ -329,7 +329,7 @@ func (p *Profile) AttemptDecryption(ciphertext []byte, signature []byte) (bool, // We set the flag to be handled by the UX and reject the message. if !valid { group.Compromised() - return false, nil + return false, "", nil } } @@ -340,13 +340,13 @@ func (p *Profile) AttemptDecryption(ciphertext []byte, signature []byte) (bool, // Either way, someone who has the private key is being detectably bad so we are just going to throw this message away and mark the group as Compromised. if !verified { group.Compromised() - return false, nil + return false, "", nil } - return true, group.AddMessage(dgm, signature) + return true, group.GroupID, group.AddMessage(dgm, signature) } } - return false, nil + return false, "", nil } func getRandomness(arr *[]byte) { diff --git a/model/profile_test.go b/model/profile_test.go index 4116eee..e67cdc3 100644 --- a/model/profile_test.go +++ b/model/profile_test.go @@ -161,13 +161,13 @@ func TestProfileGroup(t *testing.T) { bob.ProcessInvite(gci2.GetGroupChatInvite(), alice.Onion) c3, s3, err := bob.EncryptMessageToGroup("Bobs Message", group2.GroupID) if err == nil { - ok, message := alice.AttemptDecryption(c3, s3) + ok, _, message := alice.AttemptDecryption(c3, s3) if !ok { t.Errorf("Bobs message to the group should be decrypted %v %v", message, ok) } eve := GenerateNewProfile("eve") - ok, _ = eve.AttemptDecryption(c3, s3) + ok, _, _ = eve.AttemptDecryption(c3, s3) if ok { t.Errorf("Eves hould not be able to decrypt messages!") } diff --git a/peer/cwtch_peer.go b/peer/cwtch_peer.go index 76b5fd6..85868ad 100644 --- a/peer/cwtch_peer.go +++ b/peer/cwtch_peer.go @@ -1,48 +1,41 @@ package peer import ( - "crypto/rsa" + "cwtch.im/cwtch/event" "cwtch.im/cwtch/model" - "cwtch.im/cwtch/peer/connections" - "cwtch.im/cwtch/peer/peer" "cwtch.im/cwtch/protocol" + "cwtch.im/cwtch/protocol/connections" "encoding/base64" "errors" - "fmt" - "git.openprivacy.ca/openprivacy/libricochet-go/application" - "git.openprivacy.ca/openprivacy/libricochet-go/channels" - "git.openprivacy.ca/openprivacy/libricochet-go/connection" "git.openprivacy.ca/openprivacy/libricochet-go/connectivity" "git.openprivacy.ca/openprivacy/libricochet-go/identity" - "git.openprivacy.ca/openprivacy/libricochet-go/log" "git.openprivacy.ca/openprivacy/libricochet-go/utils" + "github.com/ethereum/go-ethereum/log" "github.com/golang/protobuf/proto" "golang.org/x/crypto/ed25519" "strings" "sync" - "time" ) // cwtchPeer manages incoming and outgoing connections and all processing for a Cwtch cwtchPeer type cwtchPeer struct { - connection.AutoConnectionHandler - Profile *model.Profile - app *application.RicochetApplication - acn connectivity.ACN - mutex sync.Mutex - connectionsManager *connections.Manager - dataHandler func(string, []byte) []byte - shutdown bool - aif application.ApplicationInstanceFactory - started bool + Profile *model.Profile + mutex sync.Mutex + shutdown bool + started bool + + engine *connections.Engine + queue *event.Queue + eventBus *event.Manager } // CwtchPeer provides us with a way of testing systems built on top of cwtch without having to // directly implement a cwtchPeer. type CwtchPeer interface { - Init(connectivity.ACN) + Init(connectivity.ACN, *event.Manager) PeerWithOnion(string) *connections.PeerPeerConnection InviteOnionToGroup(string, string) error + SendMessageToPeer(string, string) TrustPeer(string) error BlockPeer(string) error @@ -67,9 +60,6 @@ type CwtchPeer interface { GetContacts() []string GetContact(string) *model.PublicProfile - SetApplicationInstanceFactory(factory application.ApplicationInstanceFactory) - SetPeerDataHandler(func(string, []byte) []byte) - IsStarted() bool Listen() Shutdown() @@ -91,10 +81,17 @@ func FromProfile(profile *model.Profile) CwtchPeer { } // Init instantiates a cwtchPeer -func (cp *cwtchPeer) Init(acn connectivity.ACN) { - cp.acn = acn - cp.connectionsManager = connections.NewConnectionsManager(cp.acn) - go cp.connectionsManager.AttemptReconnections() +func (cp *cwtchPeer) Init(acn connectivity.ACN, eventBus *event.Manager) { + cp.queue = event.NewEventQueue(100) + go cp.eventHandler() + + cp.eventBus = eventBus + cp.eventBus.Subscribe(event.EncryptedGroupMessage, cp.queue.EventChannel) + cp.eventBus.Subscribe(event.NewGroupInvite, cp.queue.EventChannel) + + // TODO: Would be nice if ProtocolEngine did not need to explictly be given the Private Key. + cp.engine = connections.NewProtocolEngine(cp.Profile.Ed25519PrivateKey, acn, eventBus) + cp.engine.Identity = identity.InitializeV3(cp.Profile.Name, &cp.Profile.Ed25519PrivateKey, &cp.Profile.Ed25519PublicKey) } // ImportGroup intializes a group from an imported source rather than a peer invite @@ -121,20 +118,6 @@ func (cp *cwtchPeer) ImportGroup(exportedInvite string) (groupID string, err err return } -// a handler for the optional data handler -// note that the "correct" way to do this would be to AddChannelHandler("im.cwtch.peerdata", ...") but peerdata is such -// a handy channel that it's nice to have this convenience shortcut -// SetPeerDataHandler sets the handler for the (optional) data channel for cwtch peers. -func (cp *cwtchPeer) SetPeerDataHandler(dataHandler func(string, []byte) []byte) { - cp.dataHandler = dataHandler -} - -// add extra channel handlers (note that the peer will merge these with the ones necessary to make cwtch work, so you -// are not clobbering the underlying functionality) -func (cp *cwtchPeer) SetApplicationInstanceFactory(aif application.ApplicationInstanceFactory) { - cp.aif = aif -} - // ExportGroup serializes a group invite so it can be given offline func (cp *cwtchPeer) ExportGroup(groupID string) (string, error) { group := cp.Profile.GetGroupByGroupID(groupID) @@ -180,81 +163,62 @@ func (cp *cwtchPeer) GetContact(onion string) *model.PublicProfile { } // GetProfile returns the profile associated with this cwtchPeer. -// TODO While it is probably "safe", it is not really "safe", to call functions on this profile. This only exists to return things like Name and Onion,we should gate these. func (cp *cwtchPeer) GetProfile() *model.Profile { return cp.Profile } // PeerWithOnion is the entry point for cwtchPeer relationships func (cp *cwtchPeer) PeerWithOnion(onion string) *connections.PeerPeerConnection { - return cp.connectionsManager.ManagePeerConnection(onion, cp.Profile, cp.dataHandler, cp.aif) + cp.eventBus.Publish(event.NewEvent(event.PeerRequest, map[string]string{"Onion": onion})) + return nil } // InviteOnionToGroup kicks off the invite process func (cp *cwtchPeer) InviteOnionToGroup(onion string, groupid string) error { - group := cp.Profile.GetGroupByGroupID(groupid) - if group != nil { - log.Infof("Constructing invite for group: %v\n", group) - invite, err := group.Invite(group.GetInitialMessage()) - if err != nil { - return err - } - ppc := cp.connectionsManager.GetPeerPeerConnectionForOnion(onion) - if ppc == nil { - return errors.New("peer connection not setup for onion. peers must be trusted before sending") - } - if ppc.GetState() == connections.AUTHENTICATED { - log.Infof("Got connection for group: %v - Sending Invite\n", ppc) - ppc.SendGroupInvite(invite) - } else { - return errors.New("cannot send invite to onion: peer connection is not ready") - } - return nil + if group == nil { + return errors.New("invalid group id") } - return errors.New("group id could not be found") -} -// ReceiveGroupMessage is a callback function that processes GroupMessages from a given server -func (cp *cwtchPeer) ReceiveGroupMessage(server string, gm *protocol.GroupMessage) { - cp.Profile.AttemptDecryption(gm.Ciphertext, gm.Signature) + invite, err := group.Invite(group.InitialMessage) + if err == nil { + cp.eventBus.Publish(event.NewEvent(event.InvitePeerToGroup, map[string]string{"Onion": onion, "Invite": string(invite)})) + } + return err } // JoinServer manages a new server connection with the given onion address func (cp *cwtchPeer) JoinServer(onion string) { - cp.connectionsManager.ManageServerConnection(onion, cp.ReceiveGroupMessage) + cp.eventBus.Publish(event.NewEvent(event.JoinServer, map[string]string{"Onion": onion})) } // SendMessageToGroup attemps to sent the given message to the given group id. func (cp *cwtchPeer) SendMessageToGroup(groupid string, message string) error { group := cp.Profile.GetGroupByGroupID(groupid) if group == nil { - return errors.New("group does not exist") - } - psc := cp.connectionsManager.GetPeerServerConnectionForOnion(group.GroupServer) - if psc == nil { - return errors.New("could not find server connection to send message to") + return errors.New("invalid group id") } ct, sig, err := cp.Profile.EncryptMessageToGroup(message, groupid) - if err != nil { - return err + + if err == nil { + cp.eventBus.Publish(event.NewEvent(event.SendMessageToGroup, map[string]string{"Server": group.GroupServer, "Ciphertext": string(ct), "Signature": string(sig)})) } - gm := &protocol.GroupMessage{ - Ciphertext: ct, - Signature: sig, - } - err = psc.SendGroupMessage(gm) + return err } +func (cp *cwtchPeer) SendMessageToPeer(onion string, message string) { + cp.eventBus.Publish(event.NewEvent(event.SendMessageToPeer, map[string]string{"Peer": onion, "Message": message})) +} + // GetPeers returns a list of peer connections. func (cp *cwtchPeer) GetPeers() map[string]connections.ConnectionState { - return cp.connectionsManager.GetPeers() + return cp.engine.GetPeers() } // GetServers returns a list of server connections func (cp *cwtchPeer) GetServers() map[string]connections.ConnectionState { - return cp.connectionsManager.GetServers() + return cp.engine.GetServers() } // TrustPeer sets an existing peer relationship to trusted @@ -269,7 +233,7 @@ func (cp *cwtchPeer) TrustPeer(peer string) error { // BlockPeer blocks an existing peer relationship. func (cp *cwtchPeer) BlockPeer(peer string) error { err := cp.Profile.BlockPeer(peer) - cp.connectionsManager.ClosePeerConnection(peer) + cp.eventBus.Publish(event.NewEvent(event.BlockPeer, map[string]string{"Onion": peer})) return err } @@ -283,90 +247,15 @@ func (cp *cwtchPeer) RejectInvite(groupID string) { cp.Profile.RejectInvite(groupID) } -// LookupContact returns that a contact is known and allowed to communicate for all cases. -func (cp *cwtchPeer) LookupContact(hostname string, publicKey rsa.PublicKey) (allowed, known bool) { - blocked := cp.Profile.IsBlocked(hostname) - return !blocked, true -} - -// LookupContactV3 returns that a contact is known and allowed to communicate for all cases. -func (cp *cwtchPeer) LookupContactV3(hostname string, publicKey ed25519.PublicKey) (allowed, known bool) { - blocked := cp.Profile.IsBlocked(hostname) - return !blocked, true -} - -// ContactRequest needed to implement ContactRequestHandler Interface -func (cp *cwtchPeer) ContactRequest(name string, message string) string { - return "Accepted" -} - func (cp *cwtchPeer) Listen() { - go func() { - for !cp.shutdown { - e := cp.listenFn() - if e != nil { - // TODO: was panic, then fatal - fmt.Printf("ERROR: peer %v has crashed with: %v\n", cp.GetProfile().Onion, e) - } - // listenFn failed, wait 5 seconds and try again - time.Sleep(5 * time.Second) - } - }() -} - -// Listen sets up an onion listener to process incoming cwtch messages -func (cp *cwtchPeer) listenFn() error { - ra := new(application.RicochetApplication) - onionService, err := cp.acn.Listen(cp.Profile.Ed25519PrivateKey, application.RicochetPort) - if err != nil /*&& fmt.Sprintf("%v", err) != "550 Unspecified Tor error: Onion address collision"*/ { - return err - } - - af := application.ApplicationInstanceFactory{} - af.Init() - af.AddHandler("im.cwtch.peer", func(rai *application.ApplicationInstance) func() channels.Handler { - cpi := new(CwtchPeerInstance) - cpi.Init(rai, ra) - return func() channels.Handler { - cpc := new(peer.CwtchPeerChannel) - cpc.Handler = &CwtchPeerHandler{Onion: rai.RemoteHostname, Peer: cp} - return cpc - } - }) - - if cp.dataHandler != nil { - af.AddHandler("im.cwtch.peer.data", func(rai *application.ApplicationInstance) func() channels.Handler { - cpi := new(CwtchPeerInstance) - cpi.Init(rai, ra) - return func() channels.Handler { - cpc := new(peer.CwtchPeerDataChannel) - cpc.Handler = &CwtchPeerHandler{Onion: rai.RemoteHostname, Peer: cp, DataHandler: cp.dataHandler} - return cpc - } - }) - } - - handlers := cp.aif.GetHandlers() - for i := range handlers { - af.AddHandler(handlers[i], cp.aif.GetHandler(handlers[i])) - } - - ra.Init(cp.acn, cp.Profile.Name, identity.InitializeV3(cp.Profile.Name, &cp.Profile.Ed25519PrivateKey, &cp.Profile.Ed25519PublicKey), af, cp) - log.Infof("Running cwtch peer on %v", onionService.AddressFull()) - cp.app = ra - cp.started = true - ra.Run(onionService) - - return nil + cp.eventBus.Publish(event.NewEvent(event.ProtocolEngineStartListen, map[string]string{})) } // Shutdown kills all connections and cleans up all goroutines for the peer func (cp *cwtchPeer) Shutdown() { cp.shutdown = true - cp.connectionsManager.Shutdown() - if cp.app != nil { - cp.app.Shutdown() - } + cp.engine.Shutdown() + cp.queue.Shutdown() } // IsStarted returns true if Listen() has successfully been run before on this connection (ever). TODO: we will need to properly unset this flag on error if we want to support resumption in the future @@ -374,32 +263,25 @@ func (cp *cwtchPeer) IsStarted() bool { return cp.started } -// CwtchPeerInstance encapsulates incoming peer connections -type CwtchPeerInstance struct { - rai *application.ApplicationInstance - ra *application.RicochetApplication -} - -// Init sets up a CwtchPeerInstance -func (cpi *CwtchPeerInstance) Init(rai *application.ApplicationInstance, ra *application.RicochetApplication) { - cpi.rai = rai - cpi.ra = ra -} - -// CwtchPeerHandler encapsulates handling of incoming CwtchPackets -type CwtchPeerHandler struct { - Onion string - Peer *cwtchPeer - DataHandler func(string, []byte) []byte -} - -// HandleGroupInvite handles incoming GroupInvites -func (cph *CwtchPeerHandler) HandleGroupInvite(gci *protocol.GroupChatInvite) { - log.Debugf("Received GroupID from %v %v\n", cph.Onion, gci.String()) - cph.Peer.Profile.ProcessInvite(gci, cph.Onion) -} - -// HandlePacket handles the Cwtch cwtchPeer Data Channel -func (cph *CwtchPeerHandler) HandlePacket(data []byte) []byte { - return cph.DataHandler(cph.Onion, data) +// eventHandler process events from other subsystems +func (cp *cwtchPeer) eventHandler() { + for { + ev := cp.queue.Next() + switch ev.EventType { + case event.EncryptedGroupMessage: + ok, groupID, _ := cp.Profile.AttemptDecryption([]byte(ev.Data["Ciphertext"]), []byte(ev.Data["Signature"])) + if ok { + cp.eventBus.Publish(event.NewEvent(event.NewMessageFromGroup, map[string]string{"GroupID": groupID})) + } + case event.NewGroupInvite: + var groupInvite protocol.GroupChatInvite + proto.Unmarshal([]byte(ev.Data["GroupInvite"]), &groupInvite) + cp.Profile.ProcessInvite(&groupInvite, ev.Data["Onion"]) + default: + if ev.EventType != "" { + log.Error("peer event handler received an event it was not subscribed for: %v") + } + return + } + } } diff --git a/peer/cwtch_peer_test.go b/peer/cwtch_peer_test.go index 6f605c4..27c5326 100644 --- a/peer/cwtch_peer_test.go +++ b/peer/cwtch_peer_test.go @@ -1,12 +1,12 @@ package peer import ( - "git.openprivacy.ca/openprivacy/libricochet-go/connectivity" "testing" ) +// TODO: Rewrite these tests (and others) using the news event bus interface. func TestCwtchPeerGenerate(t *testing.T) { - + /** alice := NewCwtchPeer("alice") groupID, _, _ := alice.StartGroup("test.server") @@ -16,16 +16,21 @@ func TestCwtchPeerGenerate(t *testing.T) { importedGroupID, err := alice.ImportGroup(exportedGroup) group := alice.GetGroup(importedGroupID) t.Logf("Imported Group: %v, err := %v %v", group, err, importedGroupID) - + */ } func TestTrustPeer(t *testing.T) { + /** groupName := "2c3kmoobnyghj2zw6pwv7d57yzld753auo3ugauezzpvfak3ahc4bdyd" alice := NewCwtchPeer("alice") - alice.Init(connectivity.LocalProvider()) + aem := new(event.Manager) + aem.Initialize() + alice.Init(connectivity.LocalProvider(),aem) defer alice.Shutdown() bob := NewCwtchPeer("bob") - bob.Init(connectivity.LocalProvider()) + bem := new(event.Manager) + bem.Initialize() + bob.Init(connectivity.LocalProvider(), bem) defer bob.Shutdown() bobOnion := bob.GetProfile().Onion @@ -70,4 +75,5 @@ func TestTrustPeer(t *testing.T) { if err == nil { t.Errorf("bob trusts alice but peer connection is not ready yet. should not be able to invite her to group, instead got: %v", err) } + */ } diff --git a/peer/connections/connectionsmanager.go b/protocol/connections/connectionsmanager.go similarity index 91% rename from peer/connections/connectionsmanager.go rename to protocol/connections/connectionsmanager.go index 08b1462..0d31ed2 100644 --- a/peer/connections/connectionsmanager.go +++ b/protocol/connections/connectionsmanager.go @@ -1,9 +1,7 @@ package connections import ( - "cwtch.im/cwtch/model" "cwtch.im/cwtch/protocol" - "git.openprivacy.ca/openprivacy/libricochet-go/application" "git.openprivacy.ca/openprivacy/libricochet-go/connectivity" "sync" "time" @@ -29,13 +27,13 @@ func NewConnectionsManager(acn connectivity.ACN) *Manager { } // ManagePeerConnection creates a new PeerConnection for the given Host and Profile. -func (m *Manager) ManagePeerConnection(host string, profile *model.Profile, dataHandler func(string, []byte) []byte, aif application.ApplicationInstanceFactory) *PeerPeerConnection { +func (m *Manager) ManagePeerConnection(host string, engine *Engine) *PeerPeerConnection { m.lock.Lock() defer m.lock.Unlock() _, exists := m.peerConnections[host] if !exists { - ppc := NewPeerPeerConnection(m.acn, host, profile, dataHandler, aif) + ppc := NewPeerPeerConnection(host, engine) go ppc.Run() m.peerConnections[host] = ppc return ppc diff --git a/peer/connections/connectsmanager_test.go b/protocol/connections/connectsmanager_test.go similarity index 100% rename from peer/connections/connectsmanager_test.go rename to protocol/connections/connectsmanager_test.go diff --git a/protocol/connections/engine.go b/protocol/connections/engine.go new file mode 100644 index 0000000..f9b5deb --- /dev/null +++ b/protocol/connections/engine.go @@ -0,0 +1,246 @@ +package connections + +import ( + "crypto/rsa" + "cwtch.im/cwtch/event" + "cwtch.im/cwtch/protocol" + "cwtch.im/cwtch/protocol/connections/peer" + "errors" + "git.openprivacy.ca/openprivacy/libricochet-go/application" + "git.openprivacy.ca/openprivacy/libricochet-go/channels" + "git.openprivacy.ca/openprivacy/libricochet-go/connectivity" + "git.openprivacy.ca/openprivacy/libricochet-go/identity" + "git.openprivacy.ca/openprivacy/libricochet-go/log" + "github.com/golang/protobuf/proto" + "golang.org/x/crypto/ed25519" +) + +// Engine (ProtocolEngine) encapsulates the logic necessary to make and receive Cwtch connections. +// Note: ProtocolEngine doesn't have access to any information necessary to encrypt or decrypt GroupMessages +type Engine struct { + queue *event.Queue + connectionsManager *Manager + + // Engine Attributes + Identity identity.Identity + ACN connectivity.ACN + + app *application.RicochetApplication + + // Engine State + started bool + + // Pointer to the Global Event Manager + eventManager *event.Manager + privateKey ed25519.PrivateKey +} + +// NewProtocolEngine initializes a new engine that runs Cwtch using the given parameters +func NewProtocolEngine(privateKey ed25519.PrivateKey, acn connectivity.ACN, eventManager *event.Manager) *Engine { + engine := new(Engine) + engine.privateKey = privateKey + engine.queue = event.NewEventQueue(100) + go engine.eventHandler() + + engine.ACN = acn + engine.connectionsManager = NewConnectionsManager(engine.ACN) + go engine.connectionsManager.AttemptReconnections() + + engine.eventManager = eventManager + engine.eventManager.Subscribe(event.ProtocolEngineStartListen, engine.queue.EventChannel) + engine.eventManager.Subscribe(event.PeerRequest, engine.queue.EventChannel) + engine.eventManager.Subscribe(event.InvitePeerToGroup, engine.queue.EventChannel) + engine.eventManager.Subscribe(event.JoinServer, engine.queue.EventChannel) + engine.eventManager.Subscribe(event.SendMessageToGroup, engine.queue.EventChannel) + engine.eventManager.Subscribe(event.SendMessageToPeer, engine.queue.EventChannel) + + return engine +} + +// eventHandler process events from other subsystems +func (e *Engine) eventHandler() { + for { + ev := e.queue.Next() + switch ev.EventType { + case event.StatusRequest: + e.eventManager.Publish(event.Event{EventType: event.ProtocolEngineStatus, EventID: ev.EventID}) + case event.PeerRequest: + e.PeerWithOnion(ev.Data["Onion"]) + case event.InvitePeerToGroup: + e.InviteOnionToGroup(ev.Data["Onion"], []byte(ev.Data["Invite"])) + case event.JoinServer: + e.JoinServer(ev.Data["Onion"]) + case event.SendMessageToGroup: + e.SendMessageToGroup(ev.Data["Server"], []byte(ev.Data["Ciphertext"]), []byte(ev.Data["Signature"])) + case event.SendMessageToPeer: + log.Debugf("Sending Message to Peer.....") + ppc := e.connectionsManager.GetPeerPeerConnectionForOnion(ev.Data["Peer"]) + if ppc != nil { + // TODO this will block. + ppc.SendPacket([]byte(ev.Data["Message"])) + } + case event.ProtocolEngineStartListen: + go e.listenFn() + default: + return + } + } +} + +// GetPeerHandler is an external interface function that allows callers access to a CwtchPeerHandler +// TODO: There is likely a slightly better way to encapsulate this behavior +func (e *Engine) GetPeerHandler(remotePeerHostname string) *CwtchPeerHandler { + return &CwtchPeerHandler{Onion: remotePeerHostname, EventBus: e.eventManager} +} + +// Listen sets up an onion listener to process incoming cwtch messages +func (e *Engine) listenFn() { + ra := new(application.RicochetApplication) + onionService, err := e.ACN.Listen(e.privateKey, application.RicochetPort) + if err != nil /*&& fmt.Sprintf("%v", err) != "550 Unspecified Tor error: Onion address collision"*/ { + e.eventManager.Publish(event.NewEvent(event.ProtocolEngineStopped, map[string]string{"Identity": e.Identity.Hostname(), "Error": err.Error()})) + return + } + + af := application.ApplicationInstanceFactory{} + af.Init() + af.AddHandler("im.cwtch.peer", func(rai *application.ApplicationInstance) func() channels.Handler { + cpi := new(CwtchPeerInstance) + cpi.Init(rai, ra) + return func() channels.Handler { + cpc := new(peer.CwtchPeerChannel) + cpc.Handler = e.GetPeerHandler(rai.RemoteHostname) + return cpc + } + }) + + af.AddHandler("im.cwtch.peer.data", func(rai *application.ApplicationInstance) func() channels.Handler { + cpi := new(CwtchPeerInstance) + cpi.Init(rai, ra) + return func() channels.Handler { + cpc := new(peer.CwtchPeerDataChannel) + cpc.Handler = e.GetPeerHandler(rai.RemoteHostname) + return cpc + } + }) + + ra.Init(e.ACN, e.Identity.Name, e.Identity, af, e) + log.Infof("Running cwtch peer on %v", onionService.AddressFull()) + e.started = true + e.app = ra + ra.Run(onionService) + e.eventManager.Publish(event.NewEvent(event.ProtocolEngineStopped, map[string]string{"Identity": e.Identity.Hostname()})) + return +} + +// LookupContact returns that a contact is known and allowed to communicate for all cases. +func (e *Engine) LookupContact(hostname string, publicKey rsa.PublicKey) (allowed, known bool) { + return true, true +} + +// LookupContactV3 returns that a contact is known and allowed to communicate for all cases. +func (e *Engine) LookupContactV3(hostname string, publicKey ed25519.PublicKey) (allowed, known bool) { + return true, true +} + +// ContactRequest needed to implement ContactRequestHandler Interface +func (e *Engine) ContactRequest(name string, message string) string { + return "Accepted" +} + +// Shutdown tears down the eventHandler goroutine +func (e *Engine) Shutdown() { + e.connectionsManager.Shutdown() + e.app.Shutdown() + e.queue.Shutdown() +} + +// PeerWithOnion is the entry point for cwtchPeer relationships +func (e *Engine) PeerWithOnion(onion string) *PeerPeerConnection { + return e.connectionsManager.ManagePeerConnection(onion, e) +} + +// InviteOnionToGroup kicks off the invite process +func (e *Engine) InviteOnionToGroup(onion string, invite []byte) error { + ppc := e.connectionsManager.GetPeerPeerConnectionForOnion(onion) + if ppc == nil { + return errors.New("peer connection not setup for onion. peers must be trusted before sending") + } + if ppc.GetState() == AUTHENTICATED { + log.Infof("Got connection for group: %v - Sending Invite\n", ppc) + ppc.SendGroupInvite(invite) + } else { + return errors.New("cannot send invite to onion: peer connection is not ready") + } + return nil +} + +// ReceiveGroupMessage is a callback function that processes GroupMessages from a given server +func (e *Engine) ReceiveGroupMessage(server string, gm *protocol.GroupMessage) { + // Publish Event so that a Profile Engine can deal with it. + // Note: This technically means that *multiple* Profile Engines could listen to the same ProtocolEngine! + e.eventManager.Publish(event.NewEvent(event.EncryptedGroupMessage, map[string]string{"Ciphertext": string(gm.GetCiphertext()), "Signature": string(gm.GetSignature())})) +} + +// JoinServer manages a new server connection with the given onion address +func (e *Engine) JoinServer(onion string) { + e.connectionsManager.ManageServerConnection(onion, e.ReceiveGroupMessage) +} + +// SendMessageToGroup attemps to sent the given message to the given group id. +func (e *Engine) SendMessageToGroup(server string, ct []byte, sig []byte) error { + psc := e.connectionsManager.GetPeerServerConnectionForOnion(server) + if psc == nil { + return errors.New("could not find server connection to send message to") + } + gm := &protocol.GroupMessage{ + Ciphertext: ct, + Signature: sig, + } + err := psc.SendGroupMessage(gm) + return err +} + +// GetPeers returns a list of peer connections. +func (e *Engine) GetPeers() map[string]ConnectionState { + return e.connectionsManager.GetPeers() +} + +// GetServers returns a list of server connections +func (e *Engine) GetServers() map[string]ConnectionState { + return e.connectionsManager.GetServers() +} + +// CwtchPeerInstance encapsulates incoming peer connections +type CwtchPeerInstance struct { + rai *application.ApplicationInstance + ra *application.RicochetApplication +} + +// Init sets up a CwtchPeerInstance +func (cpi *CwtchPeerInstance) Init(rai *application.ApplicationInstance, ra *application.RicochetApplication) { + cpi.rai = rai + cpi.ra = ra +} + +// CwtchPeerHandler encapsulates handling of incoming CwtchPackets +type CwtchPeerHandler struct { + Onion string + EventBus *event.Manager + DataHandler func(string, []byte) []byte +} + +// HandleGroupInvite handles incoming GroupInvites +func (cph *CwtchPeerHandler) HandleGroupInvite(gci *protocol.GroupChatInvite) { + log.Debugf("Received GroupID from %v %v\n", cph.Onion, gci.String()) + marshal, err := proto.Marshal(gci) + if err == nil { + cph.EventBus.Publish(event.NewEvent(event.NewGroupInvite, map[string]string{"Onion": cph.Onion, "GroupInvite": string(marshal)})) + } +} + +// HandlePacket handles the Cwtch cwtchPeer Data Channel +func (cph *CwtchPeerHandler) HandlePacket(data []byte) []byte { + cph.EventBus.Publish(event.NewEvent(event.NewMessageFromPeer, map[string]string{"Onion": cph.Onion, "Data": string(data)})) + return []byte{} // TODO remove this +} diff --git a/peer/fetch/peer_fetch_channel.go b/protocol/connections/fetch/peer_fetch_channel.go similarity index 100% rename from peer/fetch/peer_fetch_channel.go rename to protocol/connections/fetch/peer_fetch_channel.go diff --git a/peer/fetch/peer_fetch_channel_test.go b/protocol/connections/fetch/peer_fetch_channel_test.go similarity index 100% rename from peer/fetch/peer_fetch_channel_test.go rename to protocol/connections/fetch/peer_fetch_channel_test.go diff --git a/peer/listen/peer_listen_channel.go b/protocol/connections/listen/peer_listen_channel.go similarity index 93% rename from peer/listen/peer_listen_channel.go rename to protocol/connections/listen/peer_listen_channel.go index 032cdd3..0aacc0a 100644 --- a/peer/listen/peer_listen_channel.go +++ b/protocol/connections/listen/peer_listen_channel.go @@ -78,9 +78,7 @@ func (cplc *CwtchPeerListenChannel) Packet(data []byte) { if err == nil { if csp.GetGroupMessage() != nil { gm := csp.GetGroupMessage() - // We create a new go routine here to avoid leaking any information about processing time - // TODO Server can probably try to use this to DoS a peer - go cplc.Handler.HandleGroupMessage(gm) + cplc.Handler.HandleGroupMessage(gm) } } } diff --git a/peer/listen/peer_listen_channel_test.go b/protocol/connections/listen/peer_listen_channel_test.go similarity index 100% rename from peer/listen/peer_listen_channel_test.go rename to protocol/connections/listen/peer_listen_channel_test.go diff --git a/peer/peer/peer_channel.go b/protocol/connections/peer/peer_channel.go similarity index 100% rename from peer/peer/peer_channel.go rename to protocol/connections/peer/peer_channel.go diff --git a/peer/peer/peer_channel_test.go b/protocol/connections/peer/peer_channel_test.go similarity index 100% rename from peer/peer/peer_channel_test.go rename to protocol/connections/peer/peer_channel_test.go diff --git a/peer/peer/peer_data_channel.go b/protocol/connections/peer/peer_data_channel.go similarity index 100% rename from peer/peer/peer_data_channel.go rename to protocol/connections/peer/peer_data_channel.go diff --git a/peer/connections/peerpeerconnection.go b/protocol/connections/peerpeerconnection.go similarity index 53% rename from peer/connections/peerpeerconnection.go rename to protocol/connections/peerpeerconnection.go index 8b290ff..d6a9ca4 100644 --- a/peer/connections/peerpeerconnection.go +++ b/protocol/connections/peerpeerconnection.go @@ -1,15 +1,10 @@ package connections import ( - "cwtch.im/cwtch/model" - "cwtch.im/cwtch/peer/peer" - "cwtch.im/cwtch/protocol" + "cwtch.im/cwtch/protocol/connections/peer" "git.openprivacy.ca/openprivacy/libricochet-go" - "git.openprivacy.ca/openprivacy/libricochet-go/application" "git.openprivacy.ca/openprivacy/libricochet-go/channels" "git.openprivacy.ca/openprivacy/libricochet-go/connection" - "git.openprivacy.ca/openprivacy/libricochet-go/connectivity" - "git.openprivacy.ca/openprivacy/libricochet-go/identity" "git.openprivacy.ca/openprivacy/libricochet-go/log" "time" ) @@ -17,23 +12,17 @@ import ( // PeerPeerConnection encapsulates a single outgoing cwtchPeer->cwtchPeer connection type PeerPeerConnection struct { connection.AutoConnectionHandler - PeerHostname string - state ConnectionState - connection *connection.Connection - profile *model.Profile - dataHandler func(string, []byte) []byte - aif application.ApplicationInstanceFactory - acn connectivity.ACN + PeerHostname string + state ConnectionState + connection *connection.Connection + protocolEngine *Engine } // NewPeerPeerConnection creates a new peer connection for the given hostname and profile. -func NewPeerPeerConnection(acn connectivity.ACN, peerhostname string, profile *model.Profile, dataHandler func(string, []byte) []byte, aif application.ApplicationInstanceFactory) *PeerPeerConnection { +func NewPeerPeerConnection(peerhostname string, protocolEngine *Engine) *PeerPeerConnection { ppc := new(PeerPeerConnection) - ppc.acn = acn ppc.PeerHostname = peerhostname - ppc.profile = profile - ppc.dataHandler = dataHandler - ppc.aif = aif + ppc.protocolEngine = protocolEngine ppc.Init() return ppc } @@ -43,16 +32,6 @@ func (ppc *PeerPeerConnection) GetState() ConnectionState { return ppc.state } -// HandleGroupInvite passes the given group invite tothe profile -func (ppc *PeerPeerConnection) HandleGroupInvite(gci *protocol.GroupChatInvite) { - ppc.profile.ProcessInvite(gci, ppc.PeerHostname) -} - -// HandlePacket handles data packets on the optional data channel -func (ppc *PeerPeerConnection) HandlePacket(data []byte) []byte { - return ppc.dataHandler(ppc.PeerHostname, data) -} - // SendPacket sends data packets on the optional data channel func (ppc *PeerPeerConnection) SendPacket(data []byte) { ppc.WaitTilAuthenticated() @@ -69,18 +48,6 @@ func (ppc *PeerPeerConnection) SendPacket(data []byte) { }) } -// DoOnChannel performs an operation on the requested channel -func (ppc *PeerPeerConnection) DoOnChannel(ctype string, direction channels.Direction, doSomethingWith func(channel *channels.Channel)) { - ppc.WaitTilAuthenticated() - ppc.connection.Do(func() error { - channel := ppc.connection.Channel(ctype, direction) - if channel != nil { - doSomethingWith(channel) - } - return nil - }) -} - // SendGroupInvite sends the given serialized invite packet to the Peer func (ppc *PeerPeerConnection) SendGroupInvite(invite []byte) { ppc.WaitTilAuthenticated() @@ -110,33 +77,23 @@ func (ppc *PeerPeerConnection) WaitTilAuthenticated() { // Run manages the setup and teardown of a peer->peer connection func (ppc *PeerPeerConnection) Run() error { ppc.state = CONNECTING - rc, err := goricochet.Open(ppc.acn, ppc.PeerHostname) + rc, err := goricochet.Open(ppc.protocolEngine.ACN, ppc.PeerHostname) if err == nil { ppc.connection = rc ppc.state = CONNECTED - _, err := connection.HandleOutboundConnection(ppc.connection).ProcessAuthAsV3Client(identity.InitializeV3(ppc.profile.Name, &ppc.profile.Ed25519PrivateKey, &ppc.profile.Ed25519PublicKey)) + _, err := connection.HandleOutboundConnection(ppc.connection).ProcessAuthAsV3Client(ppc.protocolEngine.Identity) if err == nil { ppc.state = AUTHENTICATED go func() { ppc.connection.Do(func() error { - ppc.connection.RequestOpenChannel("im.cwtch.peer", &peer.CwtchPeerChannel{Handler: ppc}) + ppc.connection.RequestOpenChannel("im.cwtch.peer", &peer.CwtchPeerChannel{Handler: ppc.protocolEngine.GetPeerHandler(ppc.PeerHostname)}) return nil }) - if ppc.dataHandler != nil { - ppc.connection.Do(func() error { - ppc.connection.RequestOpenChannel("im.cwtch.peer.data", &peer.CwtchPeerDataChannel{Handler: ppc}) - return nil - }) - } - - handlers := ppc.aif.GetHandlers() - for i := range handlers { - ppc.connection.Do(func() error { - ppc.connection.RequestOpenChannel(handlers[i], ppc.aif.GetHandler(handlers[i])(ppc.aif.GetApplicationInstance(ppc.connection))()) - return nil - }) - } + ppc.connection.Do(func() error { + ppc.connection.RequestOpenChannel("im.cwtch.peer.data", &peer.CwtchPeerDataChannel{Handler: ppc.protocolEngine.GetPeerHandler(ppc.PeerHostname)}) + return nil + }) }() ppc.connection.Process(ppc) diff --git a/peer/connections/peerpeerconnection_test.go b/protocol/connections/peerpeerconnection_test.go similarity index 91% rename from peer/connections/peerpeerconnection_test.go rename to protocol/connections/peerpeerconnection_test.go index 7d02e09..1182c4c 100644 --- a/peer/connections/peerpeerconnection_test.go +++ b/protocol/connections/peerpeerconnection_test.go @@ -3,11 +3,10 @@ package connections import ( "crypto/rand" "cwtch.im/cwtch/model" - "cwtch.im/cwtch/peer/peer" "cwtch.im/cwtch/protocol" + "cwtch.im/cwtch/protocol/connections/peer" "fmt" "git.openprivacy.ca/openprivacy/libricochet-go" - "git.openprivacy.ca/openprivacy/libricochet-go/application" "git.openprivacy.ca/openprivacy/libricochet-go/channels" "git.openprivacy.ca/openprivacy/libricochet-go/connection" "git.openprivacy.ca/openprivacy/libricochet-go/connectivity" @@ -61,7 +60,7 @@ func TestPeerPeerConnection(t *testing.T) { profile := model.GenerateNewProfile("alice") hostname := identity.Hostname() - ppc := NewPeerPeerConnection(connectivity.LocalProvider(), "127.0.0.1:5452|"+hostname, profile, nil, application.ApplicationInstanceFactory{}) + ppc := NewPeerPeerConnection("127.0.0.1:5452|"+hostname, &Engine{ACN: connectivity.LocalProvider(), Identity: identity}) tp := new(TestPeer) tp.Init() diff --git a/peer/connections/peerserverconnection.go b/protocol/connections/peerserverconnection.go similarity index 96% rename from peer/connections/peerserverconnection.go rename to protocol/connections/peerserverconnection.go index e8a41da..71d3abf 100644 --- a/peer/connections/peerserverconnection.go +++ b/protocol/connections/peerserverconnection.go @@ -2,10 +2,10 @@ package connections import ( "crypto/rand" - "cwtch.im/cwtch/peer/fetch" - "cwtch.im/cwtch/peer/listen" - "cwtch.im/cwtch/peer/send" "cwtch.im/cwtch/protocol" + "cwtch.im/cwtch/protocol/connections/fetch" + "cwtch.im/cwtch/protocol/connections/listen" + "cwtch.im/cwtch/protocol/connections/send" "errors" "git.openprivacy.ca/openprivacy/libricochet-go" "git.openprivacy.ca/openprivacy/libricochet-go/channels" diff --git a/peer/connections/peerserverconnection_test.go b/protocol/connections/peerserverconnection_test.go similarity index 100% rename from peer/connections/peerserverconnection_test.go rename to protocol/connections/peerserverconnection_test.go diff --git a/peer/send/peer_send_channel.go b/protocol/connections/send/peer_send_channel.go similarity index 98% rename from peer/send/peer_send_channel.go rename to protocol/connections/send/peer_send_channel.go index e302857..a374b94 100644 --- a/peer/send/peer_send_channel.go +++ b/protocol/connections/send/peer_send_channel.go @@ -2,7 +2,7 @@ package send import ( "cwtch.im/cwtch/protocol" - "cwtch.im/cwtch/protocol/spam" + "cwtch.im/cwtch/protocol/connections/spam" "errors" "git.openprivacy.ca/openprivacy/libricochet-go/channels" "git.openprivacy.ca/openprivacy/libricochet-go/utils" diff --git a/peer/send/peer_send_channel_test.go b/protocol/connections/send/peer_send_channel_test.go similarity index 98% rename from peer/send/peer_send_channel_test.go rename to protocol/connections/send/peer_send_channel_test.go index 4715c1d..5ec16ba 100644 --- a/peer/send/peer_send_channel_test.go +++ b/protocol/connections/send/peer_send_channel_test.go @@ -2,7 +2,7 @@ package send import ( "cwtch.im/cwtch/protocol" - "cwtch.im/cwtch/protocol/spam" + "cwtch.im/cwtch/protocol/connections/spam" "git.openprivacy.ca/openprivacy/libricochet-go/channels" "git.openprivacy.ca/openprivacy/libricochet-go/wire/control" "github.com/golang/protobuf/proto" diff --git a/protocol/spam/spamguard.go b/protocol/connections/spam/spamguard.go similarity index 100% rename from protocol/spam/spamguard.go rename to protocol/connections/spam/spamguard.go diff --git a/protocol/spam/spamguard_test.go b/protocol/connections/spam/spamguard_test.go similarity index 100% rename from protocol/spam/spamguard_test.go rename to protocol/connections/spam/spamguard_test.go diff --git a/peer/connections/state.go b/protocol/connections/state.go similarity index 100% rename from peer/connections/state.go rename to protocol/connections/state.go diff --git a/server/send/server_send_channel.go b/server/send/server_send_channel.go index 8ba36b4..55a2b43 100644 --- a/server/send/server_send_channel.go +++ b/server/send/server_send_channel.go @@ -2,7 +2,7 @@ package send import ( "cwtch.im/cwtch/protocol" - "cwtch.im/cwtch/protocol/spam" + "cwtch.im/cwtch/protocol/connections/spam" "errors" "git.openprivacy.ca/openprivacy/libricochet-go/channels" "git.openprivacy.ca/openprivacy/libricochet-go/log" diff --git a/server/send/server_send_channel_test.go b/server/send/server_send_channel_test.go index 7e963ba..443375f 100644 --- a/server/send/server_send_channel_test.go +++ b/server/send/server_send_channel_test.go @@ -2,7 +2,7 @@ package send import ( "cwtch.im/cwtch/protocol" - "cwtch.im/cwtch/protocol/spam" + "cwtch.im/cwtch/protocol/connections/spam" "git.openprivacy.ca/openprivacy/libricochet-go/channels" "git.openprivacy.ca/openprivacy/libricochet-go/wire/control" "github.com/golang/protobuf/proto" diff --git a/testing/cwtch_peer_server_intergration_test.go b/testing/cwtch_peer_server_intergration_test.go index 6d900b8..d76358a 100644 --- a/testing/cwtch_peer_server_intergration_test.go +++ b/testing/cwtch_peer_server_intergration_test.go @@ -1,15 +1,17 @@ package testing import ( + "cwtch.im/cwtch/event" "cwtch.im/cwtch/model" "cwtch.im/cwtch/peer" - "cwtch.im/cwtch/peer/connections" + "cwtch.im/cwtch/protocol/connections" cwtchserver "cwtch.im/cwtch/server" "fmt" "git.openprivacy.ca/openprivacy/libricochet-go/connectivity" "golang.org/x/net/proxy" "os" "runtime" + "runtime/pprof" "testing" "time" ) @@ -62,13 +64,13 @@ func waitForPeerServerConnection(t *testing.T, peer peer.CwtchPeer, server strin } if state != connections.AUTHENTICATED { fmt.Printf("peer %v waiting connect to server %v, currently: %v\n", peer.GetProfile().Onion, server, connections.ConnectionStateName[state]) - time.Sleep(time.Second * 10) + time.Sleep(time.Second * 5) continue + } else { + break } - } else { - t.Fatalf("peer server connectiond %v should have entry for server %v", servers, server) - } - break + } // It might take a second for the server to show up as it is now going through the event bus + time.Sleep(time.Second) } return } @@ -83,13 +85,13 @@ func waitForPeerPeerConnection(t *testing.T, peera peer.CwtchPeer, peerb peer.Cw } if state != connections.AUTHENTICATED { fmt.Printf("peer% v waiting connect to peer %v, currently: %v\n", peera.GetProfile().Onion, peerb.GetProfile().Onion, connections.ConnectionStateName[state]) - time.Sleep(time.Second * 10) + time.Sleep(time.Second * 5) continue + } else { + break } - } else { - t.Fatalf("peer peer connectiond %v should have entry for server %v", peers, peerb.GetProfile().Onion) - } - break + } // It might take a second for the peer to show up as it is now going through the event bus + time.Sleep(time.Second) } return } @@ -130,21 +132,29 @@ func TestCwtchPeerIntegration(t *testing.T) { // ***** cwtchPeer setup ***** + // It's important that each Peer have their own EventBus + aliceEventBus := new(event.Manager) + aliceEventBus.Initialize() + bobEventBus := new(event.Manager) + bobEventBus.Initialize() + carolEventBus := new(event.Manager) + carolEventBus.Initialize() + fmt.Println("Creating Alice...") alice := peer.NewCwtchPeer("Alice") - alice.Init(acn) + alice.Init(acn, aliceEventBus) alice.Listen() fmt.Println("Alice created:", alice.GetProfile().Onion) fmt.Println("Creating Bob...") bob := peer.NewCwtchPeer("Bob") - bob.Init(acn) + bob.Init(acn, bobEventBus) bob.Listen() fmt.Println("Bob created:", bob.GetProfile().Onion) fmt.Println("Creating Carol...") carol := peer.NewCwtchPeer("Carol") - carol.Init(acn) + carol.Init(acn, carolEventBus) carol.Listen() fmt.Println("Carol created:", carol.GetProfile().Onion) @@ -280,6 +290,7 @@ func TestCwtchPeerIntegration(t *testing.T) { fmt.Println("Shutting down Alice...") alice.Shutdown() + aliceEventBus.Shutdown() time.Sleep(time.Second * 5) numGoRoutinesPostAlice := runtime.NumGoroutine() @@ -368,6 +379,7 @@ func TestCwtchPeerIntegration(t *testing.T) { fmt.Println("Shutting down Bob...") bob.Shutdown() + bobEventBus.Shutdown() time.Sleep(time.Second * 3) numGoRoutinesPostBob := runtime.NumGoroutine() if server != nil { @@ -377,11 +389,16 @@ func TestCwtchPeerIntegration(t *testing.T) { } numGoRoutinesPostServerShutdown := runtime.NumGoroutine() - fmt.Println("Shuttind down Carol...") + fmt.Println("Shutting down Carol...") carol.Shutdown() + carolEventBus.Shutdown() time.Sleep(time.Second * 3) numGoRoutinesPostCarol := runtime.NumGoroutine() + // Printing out the current goroutines + // Very useful if we are leaking any. + pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) + fmt.Printf("numGoRoutinesStart: %v\nnumGoRoutinesPostServer: %v\nnumGoRoutinesPostPeerStart: %v\nnumGoRoutinesPostPeerAndServerConnect: %v\n"+ "numGoRoutinesPostAlice: %v\nnumGoRotinesPostCarolConnect: %v\nnumGoRoutinesPostBob: %v\nnumGoRoutinesPostServerShutdown: %v\nnumGoRoutinesPostCarol: %v\n", numGoRoutinesStart, numGoRoutinesPostServer, numGoRoutinesPostPeerStart, numGoRoutinesPostServerConnect, diff --git a/testing/tests.sh b/testing/tests.sh index 8213cfb..a86c11f 100755 --- a/testing/tests.sh +++ b/testing/tests.sh @@ -4,13 +4,13 @@ set -e pwd go test ${1} -coverprofile=model.cover.out -v ./model go test ${1} -coverprofile=event.cover.out -v ./event -go test ${1} -coverprofile=protocol.spam.cover.out -v ./protocol/spam go test ${1} -coverprofile=storage.cover.out -v ./storage -go test ${1} -coverprofile=peer.connections.cover.out -v ./peer/connections -go test ${1} -coverprofile=peer.fetch.cover.out -v ./peer/fetch -go test ${1} -coverprofile=peer.listen.cover.out -v ./peer/listen -go test ${1} -coverprofile=peer.peer.cover.out -v ./peer/peer -go test ${1} -coverprofile=peer.send.cover.out -v ./peer/send +go test ${1} -coverprofile=peer.connections.cover.out -v ./protocol/connections +go test ${1} -coverprofile=protocol.spam.cover.out -v ./protocol/connections/spam +go test ${1} -coverprofile=peer.fetch.cover.out -v ./protocol/connections/fetch +go test ${1} -coverprofile=peer.listen.cover.out -v ./protocol/connections/listen +go test ${1} -coverprofile=peer.peer.cover.out -v ./protocol/connections/peer +go test ${1} -coverprofile=peer.send.cover.out -v ./protocol/connections/send go test ${1} -coverprofile=peer.cover.out -v ./peer go test ${1} -coverprofile=server.fetch.cover.out -v ./server/fetch go test ${1} -coverprofile=server.listen.cover.out -v ./server/listen