Browse Source

Protocol Engine Refactor

Sarah Jamie Lewis 6 months ago
parent
commit
c3d797b2e1
36 changed files with 513 additions and 335 deletions
  1. 6 2
      app/app.go
  2. 3 2
      app/bots/servermon/main.go
  3. 4 1
      app/cli/main.go
  4. 25 5
      app/peer/alice/alice.go
  5. 22 9
      app/peer/bob/bob.go
  6. 27 0
      event/common.go
  7. 14 7
      event/eventmanager.go
  8. 9 9
      event/eventmanager_test.go
  9. 5 5
      model/message_test.go
  10. 5 5
      model/profile.go
  11. 2 2
      model/profile_test.go
  12. 70 188
      peer/cwtch_peer.go
  13. 11 5
      peer/cwtch_peer_test.go
  14. 2 4
      peer/connections/connectionsmanager.go
  15. 0 0
      protocol/connections/connectsmanager_test.go
  16. 246 0
      protocol/connections/engine.go
  17. 0 0
      protocol/connections/fetch/peer_fetch_channel.go
  18. 0 0
      protocol/connections/fetch/peer_fetch_channel_test.go
  19. 1 3
      peer/listen/peer_listen_channel.go
  20. 0 0
      protocol/connections/listen/peer_listen_channel_test.go
  21. 0 0
      protocol/connections/peer/peer_channel.go
  22. 0 0
      protocol/connections/peer/peer_channel_test.go
  23. 0 0
      protocol/connections/peer/peer_data_channel.go
  24. 14 57
      peer/connections/peerpeerconnection.go
  25. 2 3
      peer/connections/peerpeerconnection_test.go
  26. 3 3
      peer/connections/peerserverconnection.go
  27. 0 0
      protocol/connections/peerserverconnection_test.go
  28. 1 1
      peer/send/peer_send_channel.go
  29. 1 1
      peer/send/peer_send_channel_test.go
  30. 0 0
      protocol/connections/spam/spamguard.go
  31. 0 0
      protocol/connections/spam/spamguard_test.go
  32. 0 0
      protocol/connections/state.go
  33. 1 1
      server/send/server_send_channel.go
  34. 1 1
      server/send/server_send_channel_test.go
  35. 32 15
      testing/cwtch_peer_server_intergration_test.go
  36. 6 6
      testing/tests.sh

+ 6 - 2
app/app.go

@@ -2,6 +2,7 @@ package app
 
 import (
 	"crypto/rand"
+	"cwtch.im/cwtch/event"
 	"cwtch.im/cwtch/peer"
 	"cwtch.im/cwtch/storage"
 	"encoding/hex"
@@ -23,6 +24,7 @@ type application struct {
 	mutex        sync.Mutex
 	primaryonion string
 	storage      map[string]storage.ProfileStore
+	eventBus     *event.Manager
 }
 
 // Application is a full cwtch peer application. It allows management, usage and storage of multiple peers
@@ -46,6 +48,8 @@ func NewApp(acn connectivity.ACN, appDirectory string) Application {
 	log.Debugf("NewApp(%v)\n", appDirectory)
 	app := &application{peers: make(map[string]peer.CwtchPeer), storage: make(map[string]storage.ProfileStore), directory: appDirectory, acn: acn}
 	os.Mkdir(path.Join(app.directory, "profiles"), 0700)
+	app.eventBus = new(event.Manager)
+	app.eventBus.Initialize()
 	return app
 }
 
@@ -73,7 +77,7 @@ func (app *application) CreatePeer(name string, password string) (peer.CwtchPeer
 	if err != nil {
 		return nil, err
 	}
-	p.Init(app.acn)
+	p.Init(app.acn, app.eventBus)
 	_, exists := app.peers[p.GetProfile().Onion]
 	if exists {
 		p.Shutdown()
@@ -109,7 +113,7 @@ func (app *application) LoadProfiles(password string) error {
 			continue
 		}
 
-		p.Init(app.acn)
+		p.Init(app.acn, app.eventBus)
 
 		app.mutex.Lock()
 		app.peers[p.GetProfile().Onion] = p

+ 3 - 2
app/bots/servermon/main.go

@@ -1,8 +1,9 @@
 package main
 
 import (
+	"cwtch.im/cwtch/event"
 	"cwtch.im/cwtch/peer"
-	"cwtch.im/cwtch/peer/connections"
+	"cwtch.im/cwtch/protocol/connections"
 	"errors"
 	"fmt"
 	"git.openprivacy.ca/openprivacy/libricochet-go/connectivity"
@@ -46,7 +47,7 @@ func main() {
 	}
 
 	botPeer := peer.NewCwtchPeer("servermon")
-	botPeer.Init(acn)
+	botPeer.Init(acn, new(event.Manager))
 
 	fmt.Printf("Connecting to %v...\n", serverAddr)
 	botPeer.JoinServer(serverAddr)

+ 4 - 1
app/cli/main.go

@@ -6,7 +6,7 @@ import (
 
 	"bytes"
 	"cwtch.im/cwtch/model"
-	"cwtch.im/cwtch/peer/connections"
+	"cwtch.im/cwtch/protocol/connections"
 	"fmt"
 	"git.openprivacy.ca/openprivacy/libricochet-go/connectivity"
 	"git.openprivacy.ca/openprivacy/libricochet-go/log"
@@ -265,6 +265,7 @@ func main() {
 		log.Errorf("Error initializing application: %v", err)
 		os.Exit(1)
 	}
+	log.SetLevel(log.LevelDebug)
 	fmt.Printf("\nWelcome to Cwtch!\n")
 	fmt.Printf("If this if your first time you should create a profile by running `/new-profile`\n")
 	fmt.Printf("`/load-profiles` will prompt you for a password and load profiles from storage\n")
@@ -283,6 +284,8 @@ func main() {
 
 		text := prompt.Input(prmpt, completer, prompt.OptionSuggestionBGColor(prompt.Purple),
 			prompt.OptionDescriptionBGColor(prompt.White),
+			prompt.OptionPrefixTextColor(prompt.White),
+			prompt.OptionInputTextColor(prompt.Purple),
 			prompt.OptionHistory(history))
 
 		commands := strings.Split(text[0:], " ")

+ 25 - 5
app/peer/alice/alice.go

@@ -1,20 +1,40 @@
 package main
 
 import (
+	"cwtch.im/cwtch/event"
 	"cwtch.im/cwtch/peer"
+	"git.openprivacy.ca/openprivacy/libricochet-go/connectivity"
 	"git.openprivacy.ca/openprivacy/libricochet-go/log"
+	"os"
+	"path"
 )
 
 func main() {
+
+	// System Setup, We need Tor and Logging up and Running
 	log.AddEverythingFromPattern("peer/alice")
-	alice := peer.NewCwtchPeer("alice")
+	log.SetLevel(log.LevelDebug)
 
-	processData := func(onion string, data []byte) []byte {
-		log.Debugf("Recieved %s from %v", data, onion)
-		return data
+	acn, err := connectivity.StartTor(path.Join(".", ".cwtch"), "")
+	if err != nil {
+		log.Errorf("\nError connecting to Tor: %v\n", err)
+		os.Exit(1)
 	}
 
-	alice.SetPeerDataHandler(processData)
+	// Setup the Event Bus to Listen for Data Packets
+	eventBus := new(event.Manager)
+	eventBus.Initialize()
+	queue := event.NewEventQueue(100)
+	eventBus.Subscribe(event.NewMessageFromPeer, queue.EventChannel)
+
+	// Setup Alice to Listen for new Events
+	alice := peer.NewCwtchPeer("alice")
+	alice.Init(acn, eventBus)
 	alice.Listen()
 
+	// For every new Data Packet Alice recieved she will Print it out.
+	for {
+		event := queue.Next()
+		log.Printf(log.LevelInfo, "Received %v from %v: %s", event.EventType, event.Data["Onion"], event.Data["Data"])
+	}
 }

+ 22 - 9
app/peer/bob/bob.go

@@ -1,28 +1,41 @@
 package main
 
 import (
+	"cwtch.im/cwtch/event"
 	"cwtch.im/cwtch/peer"
+	"git.openprivacy.ca/openprivacy/libricochet-go/connectivity"
 	"git.openprivacy.ca/openprivacy/libricochet-go/log"
-	"strconv"
+	"os"
+	"path"
 	"time"
 )
 
 func main() {
+
+	// System Boilerplate, We need Tor Up and Running
 	log.AddEverythingFromPattern("peer/bob")
+	log.SetLevel(log.LevelDebug)
+	acn, err := connectivity.StartTor(path.Join(".", ".cwtch"), "")
+	if err != nil {
+		log.Errorf("\nError connecting to Tor: %v\n", err)
+		os.Exit(1)
+	}
+
+	// Set up the Event Buss and Initialize the Peer
+	eventBus := new(event.Manager)
+	eventBus.Initialize()
 	bob := peer.NewCwtchPeer("bob")
-	counter := 1
+	bob.Init(acn, eventBus)
 
-	bob.SetPeerDataHandler(func(onion string, data []byte) []byte {
-		log.Infof("Recieved %s from %v", data, onion)
-		counter++
-		return []byte(strconv.Itoa(counter))
-	})
-	connection := bob.PeerWithOnion("f4b6thuwmfszsqd3fzqpr45sdem4qoazdlzr2xmnc7fq22qe746hjqqd")
+	// Add Alice's Onion Here (It changes run to run)
+	bob.PeerWithOnion("upiztu7myymjf2dn4x4czhagp7axlnqjvf5zwfegbhtpkqb6v3vgu5yd")
 
+	// Send the Message...
 	log.Infof("Waiting for Bob to Connect to Alice...")
-	connection.SendPacket([]byte("Hello Alice!!!"))
+	bob.SendMessageToPeer("upiztu7myymjf2dn4x4czhagp7axlnqjvf5zwfegbhtpkqb6v3vgu5yd", "Hello Alice!!!")
 
 	// Wait a while...
+	// Everything is run in  a goroutine so the main thread has to stay active
 	time.Sleep(time.Second * 100)
 
 }

+ 27 - 0
event/common.go

@@ -0,0 +1,27 @@
+package event
+
+// Type captures the definition of many common Cwtch application events
+type Type string
+
+// Defining Common Event Types
+const (
+	StatusRequest        = Type("StatusRequest")
+	ProtocolEngineStatus = Type("ProtocolEngineStatus")
+
+	PeerRequest = Type("PeerRequest")
+	BlockPeer   = Type("BlockPeer")
+	JoinServer  = Type("JoinServer")
+
+	ProtocolEngineStartListen = Type("ProtocolEngineStartListen")
+	ProtocolEngineStopped     = Type("ProtocolEngineStopped")
+
+	InvitePeerToGroup = Type("InvitePeerToGroup")
+	NewGroupInvite    = Type("NewGroupInvite")
+
+	SendMessageToGroup    = Type("SendMessagetoGroup")
+	EncryptedGroupMessage = Type("EncryptedGroupMessage")
+	NewMessageFromGroup   = Type("NewMessageFromGroup")
+
+	SendMessageToPeer  = Type("SendMessageToPeer")
+	NewMessageFromPeer = Type("NewMessageFromPeer")
+)

+ 14 - 7
event/eventmanager.go

@@ -2,18 +2,25 @@ package event
 
 import (
 	"git.openprivacy.ca/openprivacy/libricochet-go/log"
+	"git.openprivacy.ca/openprivacy/libricochet-go/utils"
 	"sync"
 )
 
-// Event is a structure which binds a given set of data to an EventType
+// Event is a structure which binds a given set of data to an Type
 type Event struct {
-	EventType string
-	Data      []byte
+	EventType Type
+	EventID   string
+	Data      map[string]string
+}
+
+// NewEvent creates a new event object with a unique ID and the given type and data.
+func NewEvent(eventType Type, data map[string]string) Event {
+	return Event{EventType: eventType, EventID: utils.GetRandNumber().String(), Data: data}
 }
 
 // Manager is an Event Bus which allows subsystems to subscribe to certain EventTypes and publish others.
 type Manager struct {
-	subscribers map[string][]chan Event
+	subscribers map[Type][]chan Event
 	events      chan Event
 	mapMutex    sync.Mutex
 	internal    chan bool
@@ -21,7 +28,7 @@ type Manager struct {
 
 // Initialize sets up the Manager.
 func (em *Manager) Initialize() {
-	em.subscribers = make(map[string][]chan Event)
+	em.subscribers = make(map[Type][]chan Event)
 	em.events = make(chan Event)
 	em.internal = make(chan bool)
 	go em.eventBus()
@@ -29,7 +36,7 @@ func (em *Manager) Initialize() {
 
 // Subscribe takes an eventType and an Channel and associates them in the eventBus. All future events of that type
 // will be sent to the eventChannel.
-func (em *Manager) Subscribe(eventType string, eventChannel chan Event) {
+func (em *Manager) Subscribe(eventType Type, eventChannel chan Event) {
 	em.mapMutex.Lock()
 	defer em.mapMutex.Unlock()
 	em.subscribers[eventType] = append(em.subscribers[eventType], eventChannel)
@@ -75,7 +82,7 @@ func (em *Manager) eventBus() {
 
 // Shutdown triggers, and waits for, the internal eventBus goroutine to finish
 func (em *Manager) Shutdown() {
-	em.events <- Event{"", []byte{}}
+	em.events <- Event{}
 	// wait for eventBus to finish
 	<-em.internal
 	close(em.events)

+ 9 - 9
event/eventmanager_test.go

@@ -14,10 +14,10 @@ func TestEventManager(t *testing.T) {
 	// We need to make this buffer at least 1, otherwise we will log an error!
 	testChan := make(chan Event, 1)
 	eventManager.Subscribe("TEST", testChan)
-	eventManager.Publish(Event{EventType: "TEST", Data: []byte("Hello World")})
+	eventManager.Publish(Event{EventType: "TEST", Data: map[string]string{"Value": "Hello World"}})
 
 	event := <-testChan
-	if event.EventType == "TEST" && string(event.Data) == "Hello World" {
+	if event.EventType == "TEST" && event.Data["Value"] == "Hello World" {
 
 	} else {
 		t.Errorf("Received Invalid Event")
@@ -34,7 +34,7 @@ func TestEventManagerOverflow(t *testing.T) {
 	// Explicitly setting this to 0 log an error!
 	testChan := make(chan Event)
 	eventManager.Subscribe("TEST", testChan)
-	eventManager.Publish(Event{EventType: "TEST", Data: []byte("Hello World")})
+	eventManager.Publish(Event{EventType: "TEST"})
 }
 
 func TestEventManagerMultiple(t *testing.T) {
@@ -46,17 +46,17 @@ func TestEventManagerMultiple(t *testing.T) {
 	peerEventQueue := NewEventQueue(10)
 	allEventQueue := NewEventQueue(10)
 
-
 	eventManager.Subscribe("PeerEvent", peerEventQueue.EventChannel)
 	eventManager.Subscribe("GroupEvent", groupEventQueue.EventChannel)
 	eventManager.Subscribe("PeerEvent", allEventQueue.EventChannel)
 	eventManager.Subscribe("GroupEvent", allEventQueue.EventChannel)
 	eventManager.Subscribe("ErrorEvent", allEventQueue.EventChannel)
 
-	eventManager.Publish(Event{EventType: "PeerEvent", Data: []byte("Hello World Peer")})
-	eventManager.Publish(Event{EventType: "GroupEvent", Data: []byte("Hello World Group")})
-	eventManager.Publish(Event{EventType: "PeerEvent", Data: []byte("Hello World Peer 2")})
-	eventManager.Publish(Event{EventType: "ErrorEvent", Data: []byte("Hello World Error")})
+	eventManager.Publish(Event{EventType: "PeerEvent", Data: map[string]string{"Value": "Hello World Peer"}})
+	eventManager.Publish(Event{EventType: "GroupEvent", Data: map[string]string{"Value": "Hello World Group"}})
+	eventManager.Publish(Event{EventType: "PeerEvent", Data: map[string]string{"Value": "Hello World Peer"}})
+	eventManager.Publish(Event{EventType: "ErrorEvent", Data: map[string]string{"Value": "Hello World Error"}})
+	eventManager.Publish(Event{EventType: "NobodyIsSubscribedToThisEvent", Data: map[string]string{"Value": "Noone should see this!"}})
 
 	assertLength := func(len int, expected int, label string) {
 		if len != expected {
@@ -70,7 +70,7 @@ func TestEventManagerMultiple(t *testing.T) {
 	assertLength(peerEventQueue.Backlog(), 2, "Peer Event Queue Length")
 	assertLength(allEventQueue.Backlog(), 4, "All Event Queue Length")
 
-	checkEvent := func(eventType string, expected string, label string) {
+	checkEvent := func(eventType Type, expected Type, label string) {
 		if eventType != expected {
 			t.Errorf("Expected %s to be %v was %v", label, expected, eventType)
 		}

+ 5 - 5
model/message_test.go

@@ -80,12 +80,12 @@ func TestTranscriptConsistency(t *testing.T) {
 	c5, s5, _ := alice.EncryptMessageToGroup("Hello World 5", group.GroupID)
 	t.Logf("Length of Encrypted Message: %v", len(c5))
 
-	_, m1 := sarah.AttemptDecryption(c1, s1)
+	_, _, m1 := sarah.AttemptDecryption(c1, s1)
 	sarah.AttemptDecryption(c1, s1) // Try a duplicate
-	_, m2 := sarah.AttemptDecryption(c2, s2)
-	_, m3 := sarah.AttemptDecryption(c3, s3)
-	_, m4 := sarah.AttemptDecryption(c4, s4)
-	_, m5 := sarah.AttemptDecryption(c5, s5)
+	_, _, m2 := sarah.AttemptDecryption(c2, s2)
+	_, _, m3 := sarah.AttemptDecryption(c3, s3)
+	_, _, m4 := sarah.AttemptDecryption(c4, s4)
+	_, _, m5 := sarah.AttemptDecryption(c5, s5)
 
 	// Now we simulate a client receiving these messages completely out of order
 	timeline.Insert(m1)

+ 5 - 5
model/profile.go

@@ -314,7 +314,7 @@ func (p *Profile) AddGroup(group *Group) {
 }
 
 // AttemptDecryption takes a ciphertext and signature and attempts to decrypt it under known groups.
-func (p *Profile) AttemptDecryption(ciphertext []byte, signature []byte) (bool, *Message) {
+func (p *Profile) AttemptDecryption(ciphertext []byte, signature []byte) (bool, string, *Message) {
 	for _, group := range p.Groups {
 		success, dgm := group.DecryptMessage(ciphertext)
 		if success {
@@ -329,7 +329,7 @@ func (p *Profile) AttemptDecryption(ciphertext []byte, signature []byte) (bool,
 				// We set the flag to be handled by the UX and reject the message.
 				if !valid {
 					group.Compromised()
-					return false, nil
+					return false, "", nil
 				}
 			}
 
@@ -340,13 +340,13 @@ func (p *Profile) AttemptDecryption(ciphertext []byte, signature []byte) (bool,
 			// Either way, someone who has the private key is being detectably bad so we are just going to throw this message away and mark the group as Compromised.
 			if !verified {
 				group.Compromised()
-				return false, nil
+				return false, "", nil
 			}
 
-			return true, group.AddMessage(dgm, signature)
+			return true, group.GroupID, group.AddMessage(dgm, signature)
 		}
 	}
-	return false, nil
+	return false, "", nil
 }
 
 func getRandomness(arr *[]byte) {

+ 2 - 2
model/profile_test.go

@@ -161,13 +161,13 @@ func TestProfileGroup(t *testing.T) {
 	bob.ProcessInvite(gci2.GetGroupChatInvite(), alice.Onion)
 	c3, s3, err := bob.EncryptMessageToGroup("Bobs Message", group2.GroupID)
 	if err == nil {
-		ok, message := alice.AttemptDecryption(c3, s3)
+		ok, _, message := alice.AttemptDecryption(c3, s3)
 		if !ok {
 			t.Errorf("Bobs message to the group should be decrypted %v %v", message, ok)
 		}
 
 		eve := GenerateNewProfile("eve")
-		ok, _ = eve.AttemptDecryption(c3, s3)
+		ok, _, _ = eve.AttemptDecryption(c3, s3)
 		if ok {
 			t.Errorf("Eves hould not be able to decrypt messages!")
 		}

+ 70 - 188
peer/cwtch_peer.go

@@ -1,48 +1,41 @@
 package peer
 
 import (
-	"crypto/rsa"
+	"cwtch.im/cwtch/event"
 	"cwtch.im/cwtch/model"
-	"cwtch.im/cwtch/peer/connections"
-	"cwtch.im/cwtch/peer/peer"
 	"cwtch.im/cwtch/protocol"
+	"cwtch.im/cwtch/protocol/connections"
 	"encoding/base64"
 	"errors"
-	"fmt"
-	"git.openprivacy.ca/openprivacy/libricochet-go/application"
-	"git.openprivacy.ca/openprivacy/libricochet-go/channels"
-	"git.openprivacy.ca/openprivacy/libricochet-go/connection"
 	"git.openprivacy.ca/openprivacy/libricochet-go/connectivity"
 	"git.openprivacy.ca/openprivacy/libricochet-go/identity"
-	"git.openprivacy.ca/openprivacy/libricochet-go/log"
 	"git.openprivacy.ca/openprivacy/libricochet-go/utils"
+	"github.com/ethereum/go-ethereum/log"
 	"github.com/golang/protobuf/proto"
 	"golang.org/x/crypto/ed25519"
 	"strings"
 	"sync"
-	"time"
 )
 
 // cwtchPeer manages incoming and outgoing connections and all processing for a Cwtch cwtchPeer
 type cwtchPeer struct {
-	connection.AutoConnectionHandler
-	Profile            *model.Profile
-	app                *application.RicochetApplication
-	acn                connectivity.ACN
-	mutex              sync.Mutex
-	connectionsManager *connections.Manager
-	dataHandler        func(string, []byte) []byte
-	shutdown           bool
-	aif                application.ApplicationInstanceFactory
-	started            bool
+	Profile  *model.Profile
+	mutex    sync.Mutex
+	shutdown bool
+	started  bool
+
+	engine   *connections.Engine
+	queue    *event.Queue
+	eventBus *event.Manager
 }
 
 // CwtchPeer provides us with a way of testing systems built on top of cwtch without having to
 // directly implement a cwtchPeer.
 type CwtchPeer interface {
-	Init(connectivity.ACN)
+	Init(connectivity.ACN, *event.Manager)
 	PeerWithOnion(string) *connections.PeerPeerConnection
 	InviteOnionToGroup(string, string) error
+	SendMessageToPeer(string, string)
 
 	TrustPeer(string) error
 	BlockPeer(string) error
@@ -67,9 +60,6 @@ type CwtchPeer interface {
 	GetContacts() []string
 	GetContact(string) *model.PublicProfile
 
-	SetApplicationInstanceFactory(factory application.ApplicationInstanceFactory)
-	SetPeerDataHandler(func(string, []byte) []byte)
-
 	IsStarted() bool
 	Listen()
 	Shutdown()
@@ -91,10 +81,17 @@ func FromProfile(profile *model.Profile) CwtchPeer {
 }
 
 // Init instantiates a cwtchPeer
-func (cp *cwtchPeer) Init(acn connectivity.ACN) {
-	cp.acn = acn
-	cp.connectionsManager = connections.NewConnectionsManager(cp.acn)
-	go cp.connectionsManager.AttemptReconnections()
+func (cp *cwtchPeer) Init(acn connectivity.ACN, eventBus *event.Manager) {
+	cp.queue = event.NewEventQueue(100)
+	go cp.eventHandler()
+
+	cp.eventBus = eventBus
+	cp.eventBus.Subscribe(event.EncryptedGroupMessage, cp.queue.EventChannel)
+	cp.eventBus.Subscribe(event.NewGroupInvite, cp.queue.EventChannel)
+
+	// TODO: Would be nice if ProtocolEngine did not need to explictly be given the Private Key.
+	cp.engine = connections.NewProtocolEngine(cp.Profile.Ed25519PrivateKey, acn, eventBus)
+	cp.engine.Identity = identity.InitializeV3(cp.Profile.Name, &cp.Profile.Ed25519PrivateKey, &cp.Profile.Ed25519PublicKey)
 }
 
 // ImportGroup intializes a group from an imported source rather than a peer invite
@@ -121,20 +118,6 @@ func (cp *cwtchPeer) ImportGroup(exportedInvite string) (groupID string, err err
 	return
 }
 
-// a handler for the optional data handler
-// note that the "correct" way to do this would be to AddChannelHandler("im.cwtch.peerdata", ...") but peerdata is such
-// a handy channel that it's nice to have this convenience shortcut
-// SetPeerDataHandler sets the handler for the (optional) data channel for cwtch peers.
-func (cp *cwtchPeer) SetPeerDataHandler(dataHandler func(string, []byte) []byte) {
-	cp.dataHandler = dataHandler
-}
-
-// add extra channel handlers (note that the peer will merge these with the ones necessary to make cwtch work, so you
-// are not clobbering the underlying functionality)
-func (cp *cwtchPeer) SetApplicationInstanceFactory(aif application.ApplicationInstanceFactory) {
-	cp.aif = aif
-}
-
 // ExportGroup serializes a group invite so it can be given offline
 func (cp *cwtchPeer) ExportGroup(groupID string) (string, error) {
 	group := cp.Profile.GetGroupByGroupID(groupID)
@@ -180,81 +163,62 @@ func (cp *cwtchPeer) GetContact(onion string) *model.PublicProfile {
 }
 
 // GetProfile returns the profile associated with this cwtchPeer.
-// TODO While it is probably "safe", it is not really "safe", to call functions on this profile. This only exists to return things like Name and Onion,we should gate these.
 func (cp *cwtchPeer) GetProfile() *model.Profile {
 	return cp.Profile
 }
 
 // PeerWithOnion is the entry point for cwtchPeer relationships
 func (cp *cwtchPeer) PeerWithOnion(onion string) *connections.PeerPeerConnection {
-	return cp.connectionsManager.ManagePeerConnection(onion, cp.Profile, cp.dataHandler, cp.aif)
+	cp.eventBus.Publish(event.NewEvent(event.PeerRequest, map[string]string{"Onion": onion}))
+	return nil
 }
 
 // InviteOnionToGroup kicks off the invite process
 func (cp *cwtchPeer) InviteOnionToGroup(onion string, groupid string) error {
-
 	group := cp.Profile.GetGroupByGroupID(groupid)
-	if group != nil {
-		log.Infof("Constructing invite for group: %v\n", group)
-		invite, err := group.Invite(group.GetInitialMessage())
-		if err != nil {
-			return err
-		}
-		ppc := cp.connectionsManager.GetPeerPeerConnectionForOnion(onion)
-		if ppc == nil {
-			return errors.New("peer connection not setup for onion. peers must be trusted before sending")
-		}
-		if ppc.GetState() == connections.AUTHENTICATED {
-			log.Infof("Got connection for group: %v - Sending Invite\n", ppc)
-			ppc.SendGroupInvite(invite)
-		} else {
-			return errors.New("cannot send invite to onion: peer connection is not ready")
-		}
-		return nil
+	if group == nil {
+		return errors.New("invalid group id")
 	}
-	return errors.New("group id could not be found")
-}
 
-// ReceiveGroupMessage is a callback function that processes GroupMessages from a given server
-func (cp *cwtchPeer) ReceiveGroupMessage(server string, gm *protocol.GroupMessage) {
-	cp.Profile.AttemptDecryption(gm.Ciphertext, gm.Signature)
+	invite, err := group.Invite(group.InitialMessage)
+	if err == nil {
+		cp.eventBus.Publish(event.NewEvent(event.InvitePeerToGroup, map[string]string{"Onion": onion, "Invite": string(invite)}))
+	}
+	return err
 }
 
 // JoinServer manages a new server connection with the given onion address
 func (cp *cwtchPeer) JoinServer(onion string) {
-	cp.connectionsManager.ManageServerConnection(onion, cp.ReceiveGroupMessage)
+	cp.eventBus.Publish(event.NewEvent(event.JoinServer, map[string]string{"Onion": onion}))
 }
 
 // SendMessageToGroup attemps to sent the given message to the given group id.
 func (cp *cwtchPeer) SendMessageToGroup(groupid string, message string) error {
 	group := cp.Profile.GetGroupByGroupID(groupid)
 	if group == nil {
-		return errors.New("group does not exist")
-	}
-	psc := cp.connectionsManager.GetPeerServerConnectionForOnion(group.GroupServer)
-	if psc == nil {
-		return errors.New("could not find server connection to send message to")
+		return errors.New("invalid group id")
 	}
 	ct, sig, err := cp.Profile.EncryptMessageToGroup(message, groupid)
-	if err != nil {
-		return err
-	}
-	gm := &protocol.GroupMessage{
-		Ciphertext: ct,
-		Signature:  sig,
+
+	if err == nil {
+		cp.eventBus.Publish(event.NewEvent(event.SendMessageToGroup, map[string]string{"Server": group.GroupServer, "Ciphertext": string(ct), "Signature": string(sig)}))
 	}
-	err = psc.SendGroupMessage(gm)
+
 	return err
 }
 
+func (cp *cwtchPeer) SendMessageToPeer(onion string, message string) {
+	cp.eventBus.Publish(event.NewEvent(event.SendMessageToPeer, map[string]string{"Peer": onion, "Message": message}))
+}
+
 // GetPeers returns a list of peer connections.
 func (cp *cwtchPeer) GetPeers() map[string]connections.ConnectionState {
-	return cp.connectionsManager.GetPeers()
+	return cp.engine.GetPeers()
 }
 
 // GetServers returns a list of server connections
 func (cp *cwtchPeer) GetServers() map[string]connections.ConnectionState {
-	return cp.connectionsManager.GetServers()
+	return cp.engine.GetServers()
 }
 
 // TrustPeer sets an existing peer relationship to trusted
@@ -269,7 +233,7 @@ func (cp *cwtchPeer) TrustPeer(peer string) error {
 // BlockPeer blocks an existing peer relationship.
 func (cp *cwtchPeer) BlockPeer(peer string) error {
 	err := cp.Profile.BlockPeer(peer)
-	cp.connectionsManager.ClosePeerConnection(peer)
+	cp.eventBus.Publish(event.NewEvent(event.BlockPeer, map[string]string{"Onion": peer}))
 	return err
 }
 
@@ -283,90 +247,15 @@ func (cp *cwtchPeer) RejectInvite(groupID string) {
 	cp.Profile.RejectInvite(groupID)
 }
 
-// LookupContact returns that a contact is known and allowed to communicate for all cases.
-func (cp *cwtchPeer) LookupContact(hostname string, publicKey rsa.PublicKey) (allowed, known bool) {
-	blocked := cp.Profile.IsBlocked(hostname)
-	return !blocked, true
-}
-
-// LookupContactV3 returns that a contact is known and allowed to communicate for all cases.
-func (cp *cwtchPeer) LookupContactV3(hostname string, publicKey ed25519.PublicKey) (allowed, known bool) {
-	blocked := cp.Profile.IsBlocked(hostname)
-	return !blocked, true
-}
-
-// ContactRequest needed to implement ContactRequestHandler Interface
-func (cp *cwtchPeer) ContactRequest(name string, message string) string {
-	return "Accepted"
-}
-
 func (cp *cwtchPeer) Listen() {
-	go func() {
-		for !cp.shutdown {
-			e := cp.listenFn()
-			if e != nil {
-				// TODO: was panic, then fatal
-				fmt.Printf("ERROR: peer %v has crashed with: %v\n", cp.GetProfile().Onion, e)
-			}
-			// listenFn failed, wait 5 seconds and try again
-			time.Sleep(5 * time.Second)
-		}
-	}()
-}
-
-// Listen sets up an onion listener to process incoming cwtch messages
-func (cp *cwtchPeer) listenFn() error {
-	ra := new(application.RicochetApplication)
-	onionService, err := cp.acn.Listen(cp.Profile.Ed25519PrivateKey, application.RicochetPort)
-	if err != nil /*&& fmt.Sprintf("%v", err) != "550 Unspecified Tor error: Onion address collision"*/ {
-		return err
-	}
-
-	af := application.ApplicationInstanceFactory{}
-	af.Init()
-	af.AddHandler("im.cwtch.peer", func(rai *application.ApplicationInstance) func() channels.Handler {
-		cpi := new(CwtchPeerInstance)
-		cpi.Init(rai, ra)
-		return func() channels.Handler {
-			cpc := new(peer.CwtchPeerChannel)
-			cpc.Handler = &CwtchPeerHandler{Onion: rai.RemoteHostname, Peer: cp}
-			return cpc
-		}
-	})
-
-	if cp.dataHandler != nil {
-		af.AddHandler("im.cwtch.peer.data", func(rai *application.ApplicationInstance) func() channels.Handler {
-			cpi := new(CwtchPeerInstance)
-			cpi.Init(rai, ra)
-			return func() channels.Handler {
-				cpc := new(peer.CwtchPeerDataChannel)
-				cpc.Handler = &CwtchPeerHandler{Onion: rai.RemoteHostname, Peer: cp, DataHandler: cp.dataHandler}
-				return cpc
-			}
-		})
-	}
-
-	handlers := cp.aif.GetHandlers()
-	for i := range handlers {
-		af.AddHandler(handlers[i], cp.aif.GetHandler(handlers[i]))
-	}
-
-	ra.Init(cp.acn, cp.Profile.Name, identity.InitializeV3(cp.Profile.Name, &cp.Profile.Ed25519PrivateKey, &cp.Profile.Ed25519PublicKey), af, cp)
-	log.Infof("Running cwtch peer on %v", onionService.AddressFull())
-	cp.app = ra
-	cp.started = true
-	ra.Run(onionService)
-
-	return nil
+	cp.eventBus.Publish(event.NewEvent(event.ProtocolEngineStartListen, map[string]string{}))
 }
 
 // Shutdown kills all connections and cleans up all goroutines for the peer
 func (cp *cwtchPeer) Shutdown() {
 	cp.shutdown = true
-	cp.connectionsManager.Shutdown()
-	if cp.app != nil {
-		cp.app.Shutdown()
-	}
+	cp.engine.Shutdown()
+	cp.queue.Shutdown()
 }
 
 // IsStarted returns true if Listen() has successfully been run before on this connection (ever). TODO: we will need to properly unset this flag on error if we want to support resumption in the future
@@ -374,32 +263,25 @@ func (cp *cwtchPeer) IsStarted() bool {
 	return cp.started
 }
 
-// CwtchPeerInstance encapsulates incoming peer connections
-type CwtchPeerInstance struct {
-	rai *application.ApplicationInstance
-	ra  *application.RicochetApplication
-}
-
-// Init sets up a CwtchPeerInstance
-func (cpi *CwtchPeerInstance) Init(rai *application.ApplicationInstance, ra *application.RicochetApplication) {
-	cpi.rai = rai
-	cpi.ra = ra
-}
-
-// CwtchPeerHandler encapsulates handling of incoming CwtchPackets
-type CwtchPeerHandler struct {
-	Onion       string
-	Peer        *cwtchPeer
-	DataHandler func(string, []byte) []byte
-}
-
-// HandleGroupInvite handles incoming GroupInvites
-func (cph *CwtchPeerHandler) HandleGroupInvite(gci *protocol.GroupChatInvite) {
-	log.Debugf("Received GroupID from %v %v\n", cph.Onion, gci.String())
-	cph.Peer.Profile.ProcessInvite(gci, cph.Onion)
-}
-
-// HandlePacket handles the Cwtch cwtchPeer Data Channel
-func (cph *CwtchPeerHandler) HandlePacket(data []byte) []byte {
-	return cph.DataHandler(cph.Onion, data)
+// eventHandler process events from other subsystems
+func (cp *cwtchPeer) eventHandler() {
+	for {
+		ev := cp.queue.Next()
+		switch ev.EventType {
+		case event.EncryptedGroupMessage:
+			ok, groupID, _ := cp.Profile.AttemptDecryption([]byte(ev.Data["Ciphertext"]), []byte(ev.Data["Signature"]))
+			if ok {
+				cp.eventBus.Publish(event.NewEvent(event.NewMessageFromGroup, map[string]string{"GroupID": groupID}))
+			}
+		case event.NewGroupInvite:
+			var groupInvite protocol.GroupChatInvite
+			proto.Unmarshal([]byte(ev.Data["GroupInvite"]), &groupInvite)
+			cp.Profile.ProcessInvite(&groupInvite, ev.Data["Onion"])
+		default:
+			if ev.EventType != "" {
+				log.Error("peer event handler received an event it was not subscribed for: %v")
+			}
+			return
+		}
+	}
 }

+ 11 - 5
peer/cwtch_peer_test.go

@@ -1,12 +1,12 @@
 package peer
 
 import (
-	"git.openprivacy.ca/openprivacy/libricochet-go/connectivity"
 	"testing"
 )
 
+// TODO: Rewrite these tests (and others) using the news event bus interface.
 func TestCwtchPeerGenerate(t *testing.T) {
-
+	/**
 	alice := NewCwtchPeer("alice")
 
 	groupID, _, _ := alice.StartGroup("test.server")
@@ -16,16 +16,21 @@ func TestCwtchPeerGenerate(t *testing.T) {
 	importedGroupID, err := alice.ImportGroup(exportedGroup)
 	group := alice.GetGroup(importedGroupID)
 	t.Logf("Imported Group: %v, err := %v %v", group, err, importedGroupID)
-
+	*/
 }
 
 func TestTrustPeer(t *testing.T) {
+	/**
 	groupName := "2c3kmoobnyghj2zw6pwv7d57yzld753auo3ugauezzpvfak3ahc4bdyd"
 	alice := NewCwtchPeer("alice")
-	alice.Init(connectivity.LocalProvider())
+	aem := new(event.Manager)
+	aem.Initialize()
+	alice.Init(connectivity.LocalProvider(),aem)
 	defer alice.Shutdown()
 	bob := NewCwtchPeer("bob")
-	bob.Init(connectivity.LocalProvider())
+	bem := new(event.Manager)
+	bem.Initialize()
+	bob.Init(connectivity.LocalProvider(), bem)
 	defer bob.Shutdown()
 
 	bobOnion := bob.GetProfile().Onion
@@ -70,4 +75,5 @@ func TestTrustPeer(t *testing.T) {
 	if err == nil {
 		t.Errorf("bob trusts alice but peer connection is not ready yet. should not be able to invite her to group, instead got: %v", err)
 	}
+	*/
 }

+ 2 - 4
peer/connections/connectionsmanager.go

@@ -1,9 +1,7 @@
 package connections
 
 import (
-	"cwtch.im/cwtch/model"
 	"cwtch.im/cwtch/protocol"
-	"git.openprivacy.ca/openprivacy/libricochet-go/application"
 	"git.openprivacy.ca/openprivacy/libricochet-go/connectivity"
 	"sync"
 	"time"
@@ -29,13 +27,13 @@ func NewConnectionsManager(acn connectivity.ACN) *Manager {
 }
 
 // ManagePeerConnection creates a new PeerConnection for the given Host and Profile.
-func (m *Manager) ManagePeerConnection(host string, profile *model.Profile, dataHandler func(string, []byte) []byte, aif application.ApplicationInstanceFactory) *PeerPeerConnection {
+func (m *Manager) ManagePeerConnection(host string, engine *Engine) *PeerPeerConnection {
 	m.lock.Lock()
 	defer m.lock.Unlock()
 
 	_, exists := m.peerConnections[host]
 	if !exists {
-		ppc := NewPeerPeerConnection(m.acn, host, profile, dataHandler, aif)
+		ppc := NewPeerPeerConnection(host, engine)
 		go ppc.Run()
 		m.peerConnections[host] = ppc
 		return ppc

peer/connections/connectsmanager_test.go → protocol/connections/connectsmanager_test.go


+ 246 - 0
protocol/connections/engine.go

@@ -0,0 +1,246 @@
+package connections
+
+import (
+	"crypto/rsa"
+	"cwtch.im/cwtch/event"
+	"cwtch.im/cwtch/protocol"
+	"cwtch.im/cwtch/protocol/connections/peer"
+	"errors"
+	"git.openprivacy.ca/openprivacy/libricochet-go/application"
+	"git.openprivacy.ca/openprivacy/libricochet-go/channels"
+	"git.openprivacy.ca/openprivacy/libricochet-go/connectivity"
+	"git.openprivacy.ca/openprivacy/libricochet-go/identity"
+	"git.openprivacy.ca/openprivacy/libricochet-go/log"
+	"github.com/golang/protobuf/proto"
+	"golang.org/x/crypto/ed25519"
+)
+
+// Engine (ProtocolEngine) encapsulates the logic necessary to make and receive Cwtch connections.
+// Note: ProtocolEngine doesn't have access to any information necessary to encrypt or decrypt GroupMessages
+type Engine struct {
+	queue              *event.Queue
+	connectionsManager *Manager
+
+	// Engine Attributes
+	Identity identity.Identity
+	ACN      connectivity.ACN
+
+	app *application.RicochetApplication
+
+	// Engine State
+	started bool
+
+	// Pointer to the Global Event Manager
+	eventManager *event.Manager
+	privateKey   ed25519.PrivateKey
+}
+
+// NewProtocolEngine initializes a new engine that runs Cwtch using the given parameters
+func NewProtocolEngine(privateKey ed25519.PrivateKey, acn connectivity.ACN, eventManager *event.Manager) *Engine {
+	engine := new(Engine)
+	engine.privateKey = privateKey
+	engine.queue = event.NewEventQueue(100)
+	go engine.eventHandler()
+
+	engine.ACN = acn
+	engine.connectionsManager = NewConnectionsManager(engine.ACN)
+	go engine.connectionsManager.AttemptReconnections()
+
+	engine.eventManager = eventManager
+	engine.eventManager.Subscribe(event.ProtocolEngineStartListen, engine.queue.EventChannel)
+	engine.eventManager.Subscribe(event.PeerRequest, engine.queue.EventChannel)
+	engine.eventManager.Subscribe(event.InvitePeerToGroup, engine.queue.EventChannel)
+	engine.eventManager.Subscribe(event.JoinServer, engine.queue.EventChannel)
+	engine.eventManager.Subscribe(event.SendMessageToGroup, engine.queue.EventChannel)
+	engine.eventManager.Subscribe(event.SendMessageToPeer, engine.queue.EventChannel)
+
+	return engine
+}
+
+// eventHandler process events from other subsystems
+func (e *Engine) eventHandler() {
+	for {
+		ev := e.queue.Next()
+		switch ev.EventType {
+		case event.StatusRequest:
+			e.eventManager.Publish(event.Event{EventType: event.ProtocolEngineStatus, EventID: ev.EventID})
+		case event.PeerRequest:
+			e.PeerWithOnion(ev.Data["Onion"])
+		case event.InvitePeerToGroup:
+			e.InviteOnionToGroup(ev.Data["Onion"], []byte(ev.Data["Invite"]))
+		case event.JoinServer:
+			e.JoinServer(ev.Data["Onion"])
+		case event.SendMessageToGroup:
+			e.SendMessageToGroup(ev.Data["Server"], []byte(ev.Data["Ciphertext"]), []byte(ev.Data["Signature"]))
+		case event.SendMessageToPeer:
+			log.Debugf("Sending Message to Peer.....")
+			ppc := e.connectionsManager.GetPeerPeerConnectionForOnion(ev.Data["Peer"])
+			if ppc != nil {
+				// TODO this will block.
+				ppc.SendPacket([]byte(ev.Data["Message"]))
+			}
+		case event.ProtocolEngineStartListen:
+			go e.listenFn()
+		default:
+			return
+		}
+	}
+}
+
+// GetPeerHandler is an external interface function that allows callers access to a CwtchPeerHandler
+// TODO: There is likely a slightly better way to encapsulate this behavior
+func (e *Engine) GetPeerHandler(remotePeerHostname string) *CwtchPeerHandler {
+	return &CwtchPeerHandler{Onion: remotePeerHostname, EventBus: e.eventManager}
+}
+
+// Listen sets up an onion listener to process incoming cwtch messages
+func (e *Engine) listenFn() {
+	ra := new(application.RicochetApplication)
+	onionService, err := e.ACN.Listen(e.privateKey, application.RicochetPort)
+	if err != nil /*&& fmt.Sprintf("%v", err) != "550 Unspecified Tor error: Onion address collision"*/ {
+		e.eventManager.Publish(event.NewEvent(event.ProtocolEngineStopped, map[string]string{"Identity": e.Identity.Hostname(), "Error": err.Error()}))
+		return
+	}
+
+	af := application.ApplicationInstanceFactory{}
+	af.Init()
+	af.AddHandler("im.cwtch.peer", func(rai *application.ApplicationInstance) func() channels.Handler {
+		cpi := new(CwtchPeerInstance)
+		cpi.Init(rai, ra)
+		return func() channels.Handler {
+			cpc := new(peer.CwtchPeerChannel)
+			cpc.Handler = e.GetPeerHandler(rai.RemoteHostname)
+			return cpc
+		}
+	})
+
+	af.AddHandler("im.cwtch.peer.data", func(rai *application.ApplicationInstance) func() channels.Handler {
+		cpi := new(CwtchPeerInstance)
+		cpi.Init(rai, ra)
+		return func() channels.Handler {
+			cpc := new(peer.CwtchPeerDataChannel)
+			cpc.Handler = e.GetPeerHandler(rai.RemoteHostname)
+			return cpc
+		}
+	})
+
+	ra.Init(e.ACN, e.Identity.Name, e.Identity, af, e)
+	log.Infof("Running cwtch peer on %v", onionService.AddressFull())
+	e.started = true
+	e.app = ra
+	ra.Run(onionService)
+	e.eventManager.Publish(event.NewEvent(event.ProtocolEngineStopped, map[string]string{"Identity": e.Identity.Hostname()}))
+	return
+}
+
+// LookupContact returns that a contact is known and allowed to communicate for all cases.
+func (e *Engine) LookupContact(hostname string, publicKey rsa.PublicKey) (allowed, known bool) {
+	return true, true
+}
+
+// LookupContactV3 returns that a contact is known and allowed to communicate for all cases.
+func (e *Engine) LookupContactV3(hostname string, publicKey ed25519.PublicKey) (allowed, known bool) {
+	return true, true
+}
+
+// ContactRequest needed to implement ContactRequestHandler Interface
+func (e *Engine) ContactRequest(name string, message string) string {
+	return "Accepted"
+}
+
+// Shutdown tears down the eventHandler goroutine
+func (e *Engine) Shutdown() {
+	e.connectionsManager.Shutdown()
+	e.app.Shutdown()
+	e.queue.Shutdown()
+}
+
+// PeerWithOnion is the entry point for cwtchPeer relationships
+func (e *Engine) PeerWithOnion(onion string) *PeerPeerConnection {
+	return e.connectionsManager.ManagePeerConnection(onion, e)
+}
+
+// InviteOnionToGroup kicks off the invite process
+func (e *Engine) InviteOnionToGroup(onion string, invite []byte) error {
+	ppc := e.connectionsManager.GetPeerPeerConnectionForOnion(onion)
+	if ppc == nil {
+		return errors.New("peer connection not setup for onion. peers must be trusted before sending")
+	}
+	if ppc.GetState() == AUTHENTICATED {
+		log.Infof("Got connection for group: %v - Sending Invite\n", ppc)
+		ppc.SendGroupInvite(invite)
+	} else {
+		return errors.New("cannot send invite to onion: peer connection is not ready")
+	}
+	return nil
+}
+
+// ReceiveGroupMessage is a callback function that processes GroupMessages from a given server
+func (e *Engine) ReceiveGroupMessage(server string, gm *protocol.GroupMessage) {
+	// Publish Event so that a Profile Engine can deal with it.
+	// Note: This technically means that *multiple* Profile Engines could listen to the same ProtocolEngine!
+	e.eventManager.Publish(event.NewEvent(event.EncryptedGroupMessage, map[string]string{"Ciphertext": string(gm.GetCiphertext()), "Signature": string(gm.GetSignature())}))
+}
+
+// JoinServer manages a new server connection with the given onion address
+func (e *Engine) JoinServer(onion string) {
+	e.connectionsManager.ManageServerConnection(onion, e.ReceiveGroupMessage)
+}
+
+// SendMessageToGroup attemps to sent the given message to the given group id.
+func (e *Engine) SendMessageToGroup(server string, ct []byte, sig []byte) error {
+	psc := e.connectionsManager.GetPeerServerConnectionForOnion(server)
+	if psc == nil {
+		return errors.New("could not find server connection to send message to")
+	}
+	gm := &protocol.GroupMessage{
+		Ciphertext: ct,
+		Signature:  sig,
+	}
+	err := psc.SendGroupMessage(gm)
+	return err
+}
+
+// GetPeers returns a list of peer connections.
+func (e *Engine) GetPeers() map[string]ConnectionState {
+	return e.connectionsManager.GetPeers()
+}
+
+// GetServers returns a list of server connections
+func (e *Engine) GetServers() map[string]ConnectionState {
+	return e.connectionsManager.GetServers()
+}
+
+// CwtchPeerInstance encapsulates incoming peer connections
+type CwtchPeerInstance struct {
+	rai *application.ApplicationInstance
+	ra  *application.RicochetApplication
+}
+
+// Init sets up a CwtchPeerInstance
+func (cpi *CwtchPeerInstance) Init(rai *application.ApplicationInstance, ra *application.RicochetApplication) {
+	cpi.rai = rai
+	cpi.ra = ra
+}
+
+// CwtchPeerHandler encapsulates handling of incoming CwtchPackets
+type CwtchPeerHandler struct {
+	Onion       string
+	EventBus    *event.Manager
+	DataHandler func(string, []byte) []byte
+}
+
+// HandleGroupInvite handles incoming GroupInvites
+func (cph *CwtchPeerHandler) HandleGroupInvite(gci *protocol.GroupChatInvite) {
+	log.Debugf("Received GroupID from %v %v\n", cph.Onion, gci.String())
+	marshal, err := proto.Marshal(gci)
+	if err == nil {
+		cph.EventBus.Publish(event.NewEvent(event.NewGroupInvite, map[string]string{"Onion": cph.Onion, "GroupInvite": string(marshal)}))
+	}
+}
+
+// HandlePacket handles the Cwtch cwtchPeer Data Channel
+func (cph *CwtchPeerHandler) HandlePacket(data []byte) []byte {
+	cph.EventBus.Publish(event.NewEvent(event.NewMessageFromPeer, map[string]string{"Onion": cph.Onion, "Data": string(data)}))
+	return []byte{} // TODO remove this
+}

peer/fetch/peer_fetch_channel.go → protocol/connections/fetch/peer_fetch_channel.go


peer/fetch/peer_fetch_channel_test.go → protocol/connections/fetch/peer_fetch_channel_test.go


+ 1 - 3
peer/listen/peer_listen_channel.go

@@ -78,9 +78,7 @@ func (cplc *CwtchPeerListenChannel) Packet(data []byte) {
 	if err == nil {
 		if csp.GetGroupMessage() != nil {
 			gm := csp.GetGroupMessage()
-			// We create a new go routine here to avoid leaking any information about processing time
-			// TODO Server can probably try to use this to DoS a peer
-			go cplc.Handler.HandleGroupMessage(gm)
+			cplc.Handler.HandleGroupMessage(gm)
 		}
 	}
 }

peer/listen/peer_listen_channel_test.go → protocol/connections/listen/peer_listen_channel_test.go


peer/peer/peer_channel.go → protocol/connections/peer/peer_channel.go


peer/peer/peer_channel_test.go → protocol/connections/peer/peer_channel_test.go


peer/peer/peer_data_channel.go → protocol/connections/peer/peer_data_channel.go


+ 14 - 57
peer/connections/peerpeerconnection.go

@@ -1,15 +1,10 @@
 package connections
 
 import (
-	"cwtch.im/cwtch/model"
-	"cwtch.im/cwtch/peer/peer"
-	"cwtch.im/cwtch/protocol"
+	"cwtch.im/cwtch/protocol/connections/peer"
 	"git.openprivacy.ca/openprivacy/libricochet-go"
-	"git.openprivacy.ca/openprivacy/libricochet-go/application"
 	"git.openprivacy.ca/openprivacy/libricochet-go/channels"
 	"git.openprivacy.ca/openprivacy/libricochet-go/connection"
-	"git.openprivacy.ca/openprivacy/libricochet-go/connectivity"
-	"git.openprivacy.ca/openprivacy/libricochet-go/identity"
 	"git.openprivacy.ca/openprivacy/libricochet-go/log"
 	"time"
 )
@@ -17,23 +12,17 @@ import (
 // PeerPeerConnection encapsulates a single outgoing cwtchPeer->cwtchPeer connection
 type PeerPeerConnection struct {
 	connection.AutoConnectionHandler
-	PeerHostname string
-	state        ConnectionState
-	connection   *connection.Connection
-	profile      *model.Profile
-	dataHandler  func(string, []byte) []byte
-	aif          application.ApplicationInstanceFactory
-	acn          connectivity.ACN
+	PeerHostname   string
+	state          ConnectionState
+	connection     *connection.Connection
+	protocolEngine *Engine
 }
 
 // NewPeerPeerConnection creates a new peer connection for the given hostname and profile.
-func NewPeerPeerConnection(acn connectivity.ACN, peerhostname string, profile *model.Profile, dataHandler func(string, []byte) []byte, aif application.ApplicationInstanceFactory) *PeerPeerConnection {
+func NewPeerPeerConnection(peerhostname string, protocolEngine *Engine) *PeerPeerConnection {
 	ppc := new(PeerPeerConnection)
-	ppc.acn = acn
 	ppc.PeerHostname = peerhostname
-	ppc.profile = profile
-	ppc.dataHandler = dataHandler
-	ppc.aif = aif
+	ppc.protocolEngine = protocolEngine
 	ppc.Init()
 	return ppc
 }
@@ -43,16 +32,6 @@ func (ppc *PeerPeerConnection) GetState() ConnectionState {
 	return ppc.state
 }
 
-// HandleGroupInvite passes the given group invite tothe profile
-func (ppc *PeerPeerConnection) HandleGroupInvite(gci *protocol.GroupChatInvite) {
-	ppc.profile.ProcessInvite(gci, ppc.PeerHostname)
-}
-
-// HandlePacket handles data packets on the optional data channel
-func (ppc *PeerPeerConnection) HandlePacket(data []byte) []byte {
-	return ppc.dataHandler(ppc.PeerHostname, data)
-}
-
 // SendPacket sends data packets on the optional data channel
 func (ppc *PeerPeerConnection) SendPacket(data []byte) {
 	ppc.WaitTilAuthenticated()
@@ -69,18 +48,6 @@ func (ppc *PeerPeerConnection) SendPacket(data []byte) {
 	})
 }
 
-// DoOnChannel performs an operation on the requested channel
-func (ppc *PeerPeerConnection) DoOnChannel(ctype string, direction channels.Direction, doSomethingWith func(channel *channels.Channel)) {
-	ppc.WaitTilAuthenticated()
-	ppc.connection.Do(func() error {
-		channel := ppc.connection.Channel(ctype, direction)
-		if channel != nil {
-			doSomethingWith(channel)
-		}
-		return nil
-	})
-}
-
 // SendGroupInvite sends the given serialized invite packet to the Peer
 func (ppc *PeerPeerConnection) SendGroupInvite(invite []byte) {
 	ppc.WaitTilAuthenticated()
@@ -110,33 +77,23 @@ func (ppc *PeerPeerConnection) WaitTilAuthenticated() {
 // Run manages the setup and teardown of a peer->peer connection
 func (ppc *PeerPeerConnection) Run() error {
 	ppc.state = CONNECTING
-	rc, err := goricochet.Open(ppc.acn, ppc.PeerHostname)
+	rc, err := goricochet.Open(ppc.protocolEngine.ACN, ppc.PeerHostname)
 	if err == nil {
 		ppc.connection = rc
 		ppc.state = CONNECTED
-		_, err := connection.HandleOutboundConnection(ppc.connection).ProcessAuthAsV3Client(identity.InitializeV3(ppc.profile.Name, &ppc.profile.Ed25519PrivateKey, &ppc.profile.Ed25519PublicKey))
+		_, err := connection.HandleOutboundConnection(ppc.connection).ProcessAuthAsV3Client(ppc.protocolEngine.Identity)
 		if err == nil {
 			ppc.state = AUTHENTICATED
 			go func() {
 				ppc.connection.Do(func() error {
-					ppc.connection.RequestOpenChannel("im.cwtch.peer", &peer.CwtchPeerChannel{Handler: ppc})
+					ppc.connection.RequestOpenChannel("im.cwtch.peer", &peer.CwtchPeerChannel{Handler: ppc.protocolEngine.GetPeerHandler(ppc.PeerHostname)})
 					return nil
 				})
 
-				if ppc.dataHandler != nil {
-					ppc.connection.Do(func() error {
-						ppc.connection.RequestOpenChannel("im.cwtch.peer.data", &peer.CwtchPeerDataChannel{Handler: ppc})
-						return nil
-					})
-				}
-
-				handlers := ppc.aif.GetHandlers()
-				for i := range handlers {
-					ppc.connection.Do(func() error {
-						ppc.connection.RequestOpenChannel(handlers[i], ppc.aif.GetHandler(handlers[i])(ppc.aif.GetApplicationInstance(ppc.connection))())
-						return nil
-					})
-				}
+				ppc.connection.Do(func() error {
+					ppc.connection.RequestOpenChannel("im.cwtch.peer.data", &peer.CwtchPeerDataChannel{Handler: ppc.protocolEngine.GetPeerHandler(ppc.PeerHostname)})
+					return nil
+				})
 			}()
 
 			ppc.connection.Process(ppc)

+ 2 - 3
peer/connections/peerpeerconnection_test.go

@@ -3,11 +3,10 @@ package connections
 import (
 	"crypto/rand"
 	"cwtch.im/cwtch/model"
-	"cwtch.im/cwtch/peer/peer"
 	"cwtch.im/cwtch/protocol"
+	"cwtch.im/cwtch/protocol/connections/peer"
 	"fmt"
 	"git.openprivacy.ca/openprivacy/libricochet-go"
-	"git.openprivacy.ca/openprivacy/libricochet-go/application"
 	"git.openprivacy.ca/openprivacy/libricochet-go/channels"
 	"git.openprivacy.ca/openprivacy/libricochet-go/connection"
 	"git.openprivacy.ca/openprivacy/libricochet-go/connectivity"
@@ -61,7 +60,7 @@ func TestPeerPeerConnection(t *testing.T) {
 
 	profile := model.GenerateNewProfile("alice")
 	hostname := identity.Hostname()
-	ppc := NewPeerPeerConnection(connectivity.LocalProvider(), "127.0.0.1:5452|"+hostname, profile, nil, application.ApplicationInstanceFactory{})
+	ppc := NewPeerPeerConnection("127.0.0.1:5452|"+hostname, &Engine{ACN: connectivity.LocalProvider(), Identity: identity})
 
 	tp := new(TestPeer)
 	tp.Init()

+ 3 - 3
peer/connections/peerserverconnection.go

@@ -2,10 +2,10 @@ package connections
 
 import (
 	"crypto/rand"
-	"cwtch.im/cwtch/peer/fetch"
-	"cwtch.im/cwtch/peer/listen"
-	"cwtch.im/cwtch/peer/send"
 	"cwtch.im/cwtch/protocol"
+	"cwtch.im/cwtch/protocol/connections/fetch"
+	"cwtch.im/cwtch/protocol/connections/listen"
+	"cwtch.im/cwtch/protocol/connections/send"
 	"errors"
 	"git.openprivacy.ca/openprivacy/libricochet-go"
 	"git.openprivacy.ca/openprivacy/libricochet-go/channels"

peer/connections/peerserverconnection_test.go → protocol/connections/peerserverconnection_test.go


+ 1 - 1
peer/send/peer_send_channel.go

@@ -2,7 +2,7 @@ package send
 
 import (
 	"cwtch.im/cwtch/protocol"
-	"cwtch.im/cwtch/protocol/spam"
+	"cwtch.im/cwtch/protocol/connections/spam"
 	"errors"
 	"git.openprivacy.ca/openprivacy/libricochet-go/channels"
 	"git.openprivacy.ca/openprivacy/libricochet-go/utils"

+ 1 - 1
peer/send/peer_send_channel_test.go

@@ -2,7 +2,7 @@ package send
 
 import (
 	"cwtch.im/cwtch/protocol"
-	"cwtch.im/cwtch/protocol/spam"
+	"cwtch.im/cwtch/protocol/connections/spam"
 	"git.openprivacy.ca/openprivacy/libricochet-go/channels"
 	"git.openprivacy.ca/openprivacy/libricochet-go/wire/control"
 	"github.com/golang/protobuf/proto"

protocol/spam/spamguard.go → protocol/connections/spam/spamguard.go


protocol/spam/spamguard_test.go → protocol/connections/spam/spamguard_test.go


peer/connections/state.go → protocol/connections/state.go


+ 1 - 1
server/send/server_send_channel.go

@@ -2,7 +2,7 @@ package send
 
 import (
 	"cwtch.im/cwtch/protocol"
-	"cwtch.im/cwtch/protocol/spam"
+	"cwtch.im/cwtch/protocol/connections/spam"
 	"errors"
 	"git.openprivacy.ca/openprivacy/libricochet-go/channels"
 	"git.openprivacy.ca/openprivacy/libricochet-go/log"

+ 1 - 1
server/send/server_send_channel_test.go

@@ -2,7 +2,7 @@ package send
 
 import (
 	"cwtch.im/cwtch/protocol"
-	"cwtch.im/cwtch/protocol/spam"
+	"cwtch.im/cwtch/protocol/connections/spam"
 	"git.openprivacy.ca/openprivacy/libricochet-go/channels"
 	"git.openprivacy.ca/openprivacy/libricochet-go/wire/control"
 	"github.com/golang/protobuf/proto"

+ 32 - 15
testing/cwtch_peer_server_intergration_test.go

@@ -1,15 +1,17 @@
 package testing
 
 import (
+	"cwtch.im/cwtch/event"
 	"cwtch.im/cwtch/model"
 	"cwtch.im/cwtch/peer"
-	"cwtch.im/cwtch/peer/connections"
+	"cwtch.im/cwtch/protocol/connections"
 	cwtchserver "cwtch.im/cwtch/server"
 	"fmt"
 	"git.openprivacy.ca/openprivacy/libricochet-go/connectivity"
 	"golang.org/x/net/proxy"
 	"os"
 	"runtime"
+	"runtime/pprof"
 	"testing"
 	"time"
 )
@@ -62,13 +64,13 @@ func waitForPeerServerConnection(t *testing.T, peer peer.CwtchPeer, server strin
 			}
 			if state != connections.AUTHENTICATED {
 				fmt.Printf("peer %v waiting connect to server %v, currently: %v\n", peer.GetProfile().Onion, server, connections.ConnectionStateName[state])
-				time.Sleep(time.Second * 10)
+				time.Sleep(time.Second * 5)
 				continue
+			} else {
+				break
 			}
-		} else {
-			t.Fatalf("peer server connectiond %v should have entry for server %v", servers, server)
-		}
-		break
+		} // It might take a second for the server to show up as it is now going through the event bus
+		time.Sleep(time.Second)
 	}
 	return
 }
@@ -83,13 +85,13 @@ func waitForPeerPeerConnection(t *testing.T, peera peer.CwtchPeer, peerb peer.Cw
 			}
 			if state != connections.AUTHENTICATED {
 				fmt.Printf("peer% v waiting connect to peer %v, currently: %v\n", peera.GetProfile().Onion, peerb.GetProfile().Onion, connections.ConnectionStateName[state])
-				time.Sleep(time.Second * 10)
+				time.Sleep(time.Second * 5)
 				continue
+			} else {
+				break
 			}
-		} else {
-			t.Fatalf("peer peer connectiond %v should have entry for server %v", peers, peerb.GetProfile().Onion)
-		}
-		break
+		} // It might take a second for the peer to show up as it is now going through the event bus
+		time.Sleep(time.Second)
 	}
 	return
 }
@@ -130,21 +132,29 @@ func TestCwtchPeerIntegration(t *testing.T) {
 
 	// ***** cwtchPeer setup *****
 
+	// It's important that each Peer have their own EventBus
+	aliceEventBus := new(event.Manager)
+	aliceEventBus.Initialize()
+	bobEventBus := new(event.Manager)
+	bobEventBus.Initialize()
+	carolEventBus := new(event.Manager)
+	carolEventBus.Initialize()
+
 	fmt.Println("Creating Alice...")
 	alice := peer.NewCwtchPeer("Alice")
-	alice.Init(acn)
+	alice.Init(acn, aliceEventBus)
 	alice.Listen()
 	fmt.Println("Alice created:", alice.GetProfile().Onion)
 
 	fmt.Println("Creating Bob...")
 	bob := peer.NewCwtchPeer("Bob")
-	bob.Init(acn)
+	bob.Init(acn, bobEventBus)
 	bob.Listen()
 	fmt.Println("Bob created:", bob.GetProfile().Onion)
 
 	fmt.Println("Creating Carol...")
 	carol := peer.NewCwtchPeer("Carol")
-	carol.Init(acn)
+	carol.Init(acn, carolEventBus)
 	carol.Listen()
 	fmt.Println("Carol created:", carol.GetProfile().Onion)
 
@@ -280,6 +290,7 @@ func TestCwtchPeerIntegration(t *testing.T) {
 
 	fmt.Println("Shutting down Alice...")
 	alice.Shutdown()
+	aliceEventBus.Shutdown()
 	time.Sleep(time.Second * 5)
 	numGoRoutinesPostAlice := runtime.NumGoroutine()
 
@@ -368,6 +379,7 @@ func TestCwtchPeerIntegration(t *testing.T) {
 
 	fmt.Println("Shutting down Bob...")
 	bob.Shutdown()
+	bobEventBus.Shutdown()
 	time.Sleep(time.Second * 3)
 	numGoRoutinesPostBob := runtime.NumGoroutine()
 	if server != nil {
@@ -377,11 +389,16 @@ func TestCwtchPeerIntegration(t *testing.T) {
 	}
 	numGoRoutinesPostServerShutdown := runtime.NumGoroutine()
 
-	fmt.Println("Shuttind down Carol...")
+	fmt.Println("Shutting down Carol...")
 	carol.Shutdown()
+	carolEventBus.Shutdown()
 	time.Sleep(time.Second * 3)
 	numGoRoutinesPostCarol := runtime.NumGoroutine()
 
+	// Printing out the current goroutines
+	// Very useful if we are leaking any.
+	pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
+
 	fmt.Printf("numGoRoutinesStart: %v\nnumGoRoutinesPostServer: %v\nnumGoRoutinesPostPeerStart: %v\nnumGoRoutinesPostPeerAndServerConnect: %v\n"+
 		"numGoRoutinesPostAlice: %v\nnumGoRotinesPostCarolConnect: %v\nnumGoRoutinesPostBob: %v\nnumGoRoutinesPostServerShutdown: %v\nnumGoRoutinesPostCarol: %v\n",
 		numGoRoutinesStart, numGoRoutinesPostServer, numGoRoutinesPostPeerStart, numGoRoutinesPostServerConnect,

+ 6 - 6
testing/tests.sh

@@ -4,13 +4,13 @@ set -e
 pwd
 go test ${1} -coverprofile=model.cover.out -v ./model
 go test ${1} -coverprofile=event.cover.out -v ./event
-go test ${1} -coverprofile=protocol.spam.cover.out -v ./protocol/spam
 go test ${1} -coverprofile=storage.cover.out -v ./storage
-go test ${1} -coverprofile=peer.connections.cover.out -v ./peer/connections
-go test ${1} -coverprofile=peer.fetch.cover.out -v ./peer/fetch
-go test ${1} -coverprofile=peer.listen.cover.out -v ./peer/listen
-go test ${1} -coverprofile=peer.peer.cover.out -v ./peer/peer
-go test ${1} -coverprofile=peer.send.cover.out -v ./peer/send
+go test ${1} -coverprofile=peer.connections.cover.out -v ./protocol/connections
+go test ${1} -coverprofile=protocol.spam.cover.out -v ./protocol/connections/spam
+go test ${1} -coverprofile=peer.fetch.cover.out -v ./protocol/connections/fetch
+go test ${1} -coverprofile=peer.listen.cover.out -v ./protocol/connections/listen
+go test ${1} -coverprofile=peer.peer.cover.out -v ./protocol/connections/peer
+go test ${1} -coverprofile=peer.send.cover.out -v ./protocol/connections/send
 go test ${1} -coverprofile=peer.cover.out -v ./peer
 go test ${1} -coverprofile=server.fetch.cover.out -v ./server/fetch
 go test ${1} -coverprofile=server.listen.cover.out -v ./server/listen