Merge branch 'protocolengine' of cwtch.im/cwtch into master

This commit is contained in:
Dan Ballard 2019-01-09 20:24:57 +00:00 committed by Gogs
commit 591a09e25d
39 changed files with 719 additions and 322 deletions

View File

@ -2,6 +2,7 @@ package app
import ( import (
"crypto/rand" "crypto/rand"
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/peer" "cwtch.im/cwtch/peer"
"cwtch.im/cwtch/storage" "cwtch.im/cwtch/storage"
"encoding/hex" "encoding/hex"
@ -23,6 +24,7 @@ type application struct {
mutex sync.Mutex mutex sync.Mutex
primaryonion string primaryonion string
storage map[string]storage.ProfileStore storage map[string]storage.ProfileStore
eventBus *event.Manager
} }
// Application is a full cwtch peer application. It allows management, usage and storage of multiple peers // Application is a full cwtch peer application. It allows management, usage and storage of multiple peers
@ -46,6 +48,8 @@ func NewApp(acn connectivity.ACN, appDirectory string) Application {
log.Debugf("NewApp(%v)\n", appDirectory) log.Debugf("NewApp(%v)\n", appDirectory)
app := &application{peers: make(map[string]peer.CwtchPeer), storage: make(map[string]storage.ProfileStore), directory: appDirectory, acn: acn} app := &application{peers: make(map[string]peer.CwtchPeer), storage: make(map[string]storage.ProfileStore), directory: appDirectory, acn: acn}
os.Mkdir(path.Join(app.directory, "profiles"), 0700) os.Mkdir(path.Join(app.directory, "profiles"), 0700)
app.eventBus = new(event.Manager)
app.eventBus.Initialize()
return app return app
} }
@ -73,7 +77,7 @@ func (app *application) CreatePeer(name string, password string) (peer.CwtchPeer
if err != nil { if err != nil {
return nil, err return nil, err
} }
p.Init(app.acn) p.Init(app.acn, app.eventBus)
_, exists := app.peers[p.GetProfile().Onion] _, exists := app.peers[p.GetProfile().Onion]
if exists { if exists {
p.Shutdown() p.Shutdown()
@ -109,7 +113,7 @@ func (app *application) LoadProfiles(password string) error {
continue continue
} }
p.Init(app.acn) p.Init(app.acn, app.eventBus)
app.mutex.Lock() app.mutex.Lock()
app.peers[p.GetProfile().Onion] = p app.peers[p.GetProfile().Onion] = p

View File

@ -1,8 +1,9 @@
package main package main
import ( import (
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/peer" "cwtch.im/cwtch/peer"
"cwtch.im/cwtch/peer/connections" "cwtch.im/cwtch/protocol/connections"
"errors" "errors"
"fmt" "fmt"
"git.openprivacy.ca/openprivacy/libricochet-go/connectivity" "git.openprivacy.ca/openprivacy/libricochet-go/connectivity"
@ -46,7 +47,7 @@ func main() {
} }
botPeer := peer.NewCwtchPeer("servermon") botPeer := peer.NewCwtchPeer("servermon")
botPeer.Init(acn) botPeer.Init(acn, new(event.Manager))
fmt.Printf("Connecting to %v...\n", serverAddr) fmt.Printf("Connecting to %v...\n", serverAddr)
botPeer.JoinServer(serverAddr) botPeer.JoinServer(serverAddr)

View File

@ -6,7 +6,7 @@ import (
"bytes" "bytes"
"cwtch.im/cwtch/model" "cwtch.im/cwtch/model"
"cwtch.im/cwtch/peer/connections" "cwtch.im/cwtch/protocol/connections"
"fmt" "fmt"
"git.openprivacy.ca/openprivacy/libricochet-go/connectivity" "git.openprivacy.ca/openprivacy/libricochet-go/connectivity"
"git.openprivacy.ca/openprivacy/libricochet-go/log" "git.openprivacy.ca/openprivacy/libricochet-go/log"
@ -265,6 +265,7 @@ func main() {
log.Errorf("Error initializing application: %v", err) log.Errorf("Error initializing application: %v", err)
os.Exit(1) os.Exit(1)
} }
log.SetLevel(log.LevelDebug)
fmt.Printf("\nWelcome to Cwtch!\n") fmt.Printf("\nWelcome to Cwtch!\n")
fmt.Printf("If this if your first time you should create a profile by running `/new-profile`\n") fmt.Printf("If this if your first time you should create a profile by running `/new-profile`\n")
fmt.Printf("`/load-profiles` will prompt you for a password and load profiles from storage\n") fmt.Printf("`/load-profiles` will prompt you for a password and load profiles from storage\n")
@ -283,6 +284,8 @@ func main() {
text := prompt.Input(prmpt, completer, prompt.OptionSuggestionBGColor(prompt.Purple), text := prompt.Input(prmpt, completer, prompt.OptionSuggestionBGColor(prompt.Purple),
prompt.OptionDescriptionBGColor(prompt.White), prompt.OptionDescriptionBGColor(prompt.White),
prompt.OptionPrefixTextColor(prompt.White),
prompt.OptionInputTextColor(prompt.Purple),
prompt.OptionHistory(history)) prompt.OptionHistory(history))
commands := strings.Split(text[0:], " ") commands := strings.Split(text[0:], " ")

View File

@ -1,20 +1,40 @@
package main package main
import ( import (
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/peer" "cwtch.im/cwtch/peer"
"git.openprivacy.ca/openprivacy/libricochet-go/connectivity"
"git.openprivacy.ca/openprivacy/libricochet-go/log" "git.openprivacy.ca/openprivacy/libricochet-go/log"
"os"
"path"
) )
func main() { func main() {
log.AddEverythingFromPattern("peer/alice")
alice := peer.NewCwtchPeer("alice")
processData := func(onion string, data []byte) []byte { // System Setup, We need Tor and Logging up and Running
log.Debugf("Recieved %s from %v", data, onion) log.AddEverythingFromPattern("peer/alice")
return data log.SetLevel(log.LevelDebug)
acn, err := connectivity.StartTor(path.Join(".", ".cwtch"), "")
if err != nil {
log.Errorf("\nError connecting to Tor: %v\n", err)
os.Exit(1)
} }
alice.SetPeerDataHandler(processData) // Setup the Event Bus to Listen for Data Packets
eventBus := new(event.Manager)
eventBus.Initialize()
queue := event.NewEventQueue(100)
eventBus.Subscribe(event.NewMessageFromPeer, queue.EventChannel)
// Setup Alice to Listen for new Events
alice := peer.NewCwtchPeer("alice")
alice.Init(acn, eventBus)
alice.Listen() alice.Listen()
// For every new Data Packet Alice recieved she will Print it out.
for {
event := queue.Next()
log.Printf(log.LevelInfo, "Received %v from %v: %s", event.EventType, event.Data["Onion"], event.Data["Data"])
}
} }

View File

@ -1,28 +1,41 @@
package main package main
import ( import (
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/peer" "cwtch.im/cwtch/peer"
"git.openprivacy.ca/openprivacy/libricochet-go/connectivity"
"git.openprivacy.ca/openprivacy/libricochet-go/log" "git.openprivacy.ca/openprivacy/libricochet-go/log"
"strconv" "os"
"path"
"time" "time"
) )
func main() { func main() {
// System Boilerplate, We need Tor Up and Running
log.AddEverythingFromPattern("peer/bob") log.AddEverythingFromPattern("peer/bob")
log.SetLevel(log.LevelDebug)
acn, err := connectivity.StartTor(path.Join(".", ".cwtch"), "")
if err != nil {
log.Errorf("\nError connecting to Tor: %v\n", err)
os.Exit(1)
}
// Set up the Event Buss and Initialize the Peer
eventBus := new(event.Manager)
eventBus.Initialize()
bob := peer.NewCwtchPeer("bob") bob := peer.NewCwtchPeer("bob")
counter := 1 bob.Init(acn, eventBus)
bob.SetPeerDataHandler(func(onion string, data []byte) []byte { // Add Alice's Onion Here (It changes run to run)
log.Infof("Recieved %s from %v", data, onion) bob.PeerWithOnion("upiztu7myymjf2dn4x4czhagp7axlnqjvf5zwfegbhtpkqb6v3vgu5yd")
counter++
return []byte(strconv.Itoa(counter))
})
connection := bob.PeerWithOnion("f4b6thuwmfszsqd3fzqpr45sdem4qoazdlzr2xmnc7fq22qe746hjqqd")
// Send the Message...
log.Infof("Waiting for Bob to Connect to Alice...") log.Infof("Waiting for Bob to Connect to Alice...")
connection.SendPacket([]byte("Hello Alice!!!")) bob.SendMessageToPeer("upiztu7myymjf2dn4x4czhagp7axlnqjvf5zwfegbhtpkqb6v3vgu5yd", "Hello Alice!!!")
// Wait a while... // Wait a while...
// Everything is run in a goroutine so the main thread has to stay active
time.Sleep(time.Second * 100) time.Sleep(time.Second * 100)
} }

31
event/EventQueue.go Normal file
View File

@ -0,0 +1,31 @@
package event
// Queue is a wrapper around a channel for handling Events in a consistent way across subsystems.
// The expectation is that each subsystem in Cwtch will manage a given an event.Queue fed from
// the event.Manager.
type Queue struct {
EventChannel chan Event
}
// NewEventQueue initializes an event.Queue of the given buffer size.
func NewEventQueue(buffer int) *Queue {
queue := new(Queue)
queue.EventChannel = make(chan Event, buffer)
return queue
}
// Backlog returns the length of the queue backlog
func (eq *Queue) Backlog() int {
return len(eq.EventChannel)
}
// Next returns the next available event from the front of the queue
func (eq *Queue) Next() (event Event) {
event = <-eq.EventChannel
return
}
// Shutdown closes our EventChannel
func (eq *Queue) Shutdown() {
close(eq.EventChannel)
}

27
event/common.go Normal file
View File

@ -0,0 +1,27 @@
package event
// Type captures the definition of many common Cwtch application events
type Type string
// Defining Common Event Types
const (
StatusRequest = Type("StatusRequest")
ProtocolEngineStatus = Type("ProtocolEngineStatus")
PeerRequest = Type("PeerRequest")
BlockPeer = Type("BlockPeer")
JoinServer = Type("JoinServer")
ProtocolEngineStartListen = Type("ProtocolEngineStartListen")
ProtocolEngineStopped = Type("ProtocolEngineStopped")
InvitePeerToGroup = Type("InvitePeerToGroup")
NewGroupInvite = Type("NewGroupInvite")
SendMessageToGroup = Type("SendMessagetoGroup")
EncryptedGroupMessage = Type("EncryptedGroupMessage")
NewMessageFromGroup = Type("NewMessageFromGroup")
SendMessageToPeer = Type("SendMessageToPeer")
NewMessageFromPeer = Type("NewMessageFromPeer")
)

90
event/eventmanager.go Normal file
View File

@ -0,0 +1,90 @@
package event
import (
"git.openprivacy.ca/openprivacy/libricochet-go/log"
"git.openprivacy.ca/openprivacy/libricochet-go/utils"
"sync"
)
// Event is a structure which binds a given set of data to an Type
type Event struct {
EventType Type
EventID string
Data map[string]string
}
// NewEvent creates a new event object with a unique ID and the given type and data.
func NewEvent(eventType Type, data map[string]string) Event {
return Event{EventType: eventType, EventID: utils.GetRandNumber().String(), Data: data}
}
// Manager is an Event Bus which allows subsystems to subscribe to certain EventTypes and publish others.
type Manager struct {
subscribers map[Type][]chan Event
events chan Event
mapMutex sync.Mutex
internal chan bool
}
// Initialize sets up the Manager.
func (em *Manager) Initialize() {
em.subscribers = make(map[Type][]chan Event)
em.events = make(chan Event)
em.internal = make(chan bool)
go em.eventBus()
}
// Subscribe takes an eventType and an Channel and associates them in the eventBus. All future events of that type
// will be sent to the eventChannel.
func (em *Manager) Subscribe(eventType Type, eventChannel chan Event) {
em.mapMutex.Lock()
defer em.mapMutex.Unlock()
em.subscribers[eventType] = append(em.subscribers[eventType], eventChannel)
}
// Publish takes an Event and sends it to the internal eventBus where it is distributed to all Subscribers
func (em *Manager) Publish(event Event) {
if event.EventType != "" {
em.events <- event
}
}
// eventBus is an internal function that is used to distribute events to all subscribers
func (em *Manager) eventBus() {
for {
event := <-em.events
// In the case on an empty event. Teardown the Queue
if event.EventType == "" {
break
}
// maps aren't thread safe
em.mapMutex.Lock()
subscribers := em.subscribers[event.EventType]
em.mapMutex.Unlock()
// Send the event to any subscribers to that event type
for _, subscriber := range subscribers {
select {
case subscriber <- event:
log.Debugf("Sending %v to %v", event.EventType, subscriber)
default:
log.Errorf("Failed to send %v to %v. The subsystem might be running too slow!", event.EventType, subscriber)
}
}
}
// We are about to exit the eventbus thread, fire off an event internally
em.internal <- true
}
// Shutdown triggers, and waits for, the internal eventBus goroutine to finish
func (em *Manager) Shutdown() {
em.events <- Event{}
// wait for eventBus to finish
<-em.internal
close(em.events)
close(em.internal)
}

104
event/eventmanager_test.go Normal file
View File

@ -0,0 +1,104 @@
package event
import (
"git.openprivacy.ca/openprivacy/libricochet-go/log"
"testing"
"time"
)
// Most basic Manager Test, Initialize, Subscribe, Publish, Receive
func TestEventManager(t *testing.T) {
eventManager := new(Manager)
eventManager.Initialize()
// We need to make this buffer at least 1, otherwise we will log an error!
testChan := make(chan Event, 1)
eventManager.Subscribe("TEST", testChan)
eventManager.Publish(Event{EventType: "TEST", Data: map[string]string{"Value": "Hello World"}})
event := <-testChan
if event.EventType == "TEST" && event.Data["Value"] == "Hello World" {
} else {
t.Errorf("Received Invalid Event")
}
eventManager.Shutdown()
}
// Most basic Manager Test, Initialize, Subscribe, Publish, Receive
func TestEventManagerOverflow(t *testing.T) {
eventManager := new(Manager)
eventManager.Initialize()
// Explicitly setting this to 0 log an error!
testChan := make(chan Event)
eventManager.Subscribe("TEST", testChan)
eventManager.Publish(Event{EventType: "TEST"})
}
func TestEventManagerMultiple(t *testing.T) {
log.SetLevel(log.LevelDebug)
eventManager := new(Manager)
eventManager.Initialize()
groupEventQueue := NewEventQueue(10)
peerEventQueue := NewEventQueue(10)
allEventQueue := NewEventQueue(10)
eventManager.Subscribe("PeerEvent", peerEventQueue.EventChannel)
eventManager.Subscribe("GroupEvent", groupEventQueue.EventChannel)
eventManager.Subscribe("PeerEvent", allEventQueue.EventChannel)
eventManager.Subscribe("GroupEvent", allEventQueue.EventChannel)
eventManager.Subscribe("ErrorEvent", allEventQueue.EventChannel)
eventManager.Publish(Event{EventType: "PeerEvent", Data: map[string]string{"Value": "Hello World Peer"}})
eventManager.Publish(Event{EventType: "GroupEvent", Data: map[string]string{"Value": "Hello World Group"}})
eventManager.Publish(Event{EventType: "PeerEvent", Data: map[string]string{"Value": "Hello World Peer"}})
eventManager.Publish(Event{EventType: "ErrorEvent", Data: map[string]string{"Value": "Hello World Error"}})
eventManager.Publish(Event{EventType: "NobodyIsSubscribedToThisEvent", Data: map[string]string{"Value": "Noone should see this!"}})
assertLength := func(len int, expected int, label string) {
if len != expected {
t.Errorf("Expected %s to be %v was %v", label, expected, len)
}
}
time.Sleep(time.Second)
assertLength(groupEventQueue.Backlog(), 1, "Group Event Queue Length")
assertLength(peerEventQueue.Backlog(), 2, "Peer Event Queue Length")
assertLength(allEventQueue.Backlog(), 4, "All Event Queue Length")
checkEvent := func(eventType Type, expected Type, label string) {
if eventType != expected {
t.Errorf("Expected %s to be %v was %v", label, expected, eventType)
}
}
event := groupEventQueue.Next()
checkEvent(event.EventType, "GroupEvent", "First Group Event")
event = peerEventQueue.Next()
checkEvent(event.EventType, "PeerEvent", "First Peer Event")
event = peerEventQueue.Next()
checkEvent(event.EventType, "PeerEvent", "Second Peer Event")
event = allEventQueue.Next()
checkEvent(event.EventType, "PeerEvent", "ALL: First Peer Event")
event = allEventQueue.Next()
checkEvent(event.EventType, "GroupEvent", "ALL: First Group Event")
event = allEventQueue.Next()
checkEvent(event.EventType, "PeerEvent", "ALL: Second Peer Event")
event = allEventQueue.Next()
checkEvent(event.EventType, "ErrorEvent", "ALL: First Error Event")
eventManager.Shutdown()
groupEventQueue.Shutdown()
peerEventQueue.Shutdown()
allEventQueue.Shutdown()
// Reading from a closed queue should result in an instant return and an empty event
event = groupEventQueue.Next()
checkEvent(event.EventType, "", "Test Next() on Empty Queue")
}

View File

@ -80,12 +80,12 @@ func TestTranscriptConsistency(t *testing.T) {
c5, s5, _ := alice.EncryptMessageToGroup("Hello World 5", group.GroupID) c5, s5, _ := alice.EncryptMessageToGroup("Hello World 5", group.GroupID)
t.Logf("Length of Encrypted Message: %v", len(c5)) t.Logf("Length of Encrypted Message: %v", len(c5))
_, m1 := sarah.AttemptDecryption(c1, s1) _, _, m1 := sarah.AttemptDecryption(c1, s1)
sarah.AttemptDecryption(c1, s1) // Try a duplicate sarah.AttemptDecryption(c1, s1) // Try a duplicate
_, m2 := sarah.AttemptDecryption(c2, s2) _, _, m2 := sarah.AttemptDecryption(c2, s2)
_, m3 := sarah.AttemptDecryption(c3, s3) _, _, m3 := sarah.AttemptDecryption(c3, s3)
_, m4 := sarah.AttemptDecryption(c4, s4) _, _, m4 := sarah.AttemptDecryption(c4, s4)
_, m5 := sarah.AttemptDecryption(c5, s5) _, _, m5 := sarah.AttemptDecryption(c5, s5)
// Now we simulate a client receiving these messages completely out of order // Now we simulate a client receiving these messages completely out of order
timeline.Insert(m1) timeline.Insert(m1)

View File

@ -314,7 +314,7 @@ func (p *Profile) AddGroup(group *Group) {
} }
// AttemptDecryption takes a ciphertext and signature and attempts to decrypt it under known groups. // AttemptDecryption takes a ciphertext and signature and attempts to decrypt it under known groups.
func (p *Profile) AttemptDecryption(ciphertext []byte, signature []byte) (bool, *Message) { func (p *Profile) AttemptDecryption(ciphertext []byte, signature []byte) (bool, string, *Message) {
for _, group := range p.Groups { for _, group := range p.Groups {
success, dgm := group.DecryptMessage(ciphertext) success, dgm := group.DecryptMessage(ciphertext)
if success { if success {
@ -329,7 +329,7 @@ func (p *Profile) AttemptDecryption(ciphertext []byte, signature []byte) (bool,
// We set the flag to be handled by the UX and reject the message. // We set the flag to be handled by the UX and reject the message.
if !valid { if !valid {
group.Compromised() group.Compromised()
return false, nil return false, "", nil
} }
} }
@ -340,13 +340,13 @@ func (p *Profile) AttemptDecryption(ciphertext []byte, signature []byte) (bool,
// Either way, someone who has the private key is being detectably bad so we are just going to throw this message away and mark the group as Compromised. // Either way, someone who has the private key is being detectably bad so we are just going to throw this message away and mark the group as Compromised.
if !verified { if !verified {
group.Compromised() group.Compromised()
return false, nil return false, "", nil
} }
return true, group.AddMessage(dgm, signature) return true, group.GroupID, group.AddMessage(dgm, signature)
} }
} }
return false, nil return false, "", nil
} }
func getRandomness(arr *[]byte) { func getRandomness(arr *[]byte) {

View File

@ -161,13 +161,13 @@ func TestProfileGroup(t *testing.T) {
bob.ProcessInvite(gci2.GetGroupChatInvite(), alice.Onion) bob.ProcessInvite(gci2.GetGroupChatInvite(), alice.Onion)
c3, s3, err := bob.EncryptMessageToGroup("Bobs Message", group2.GroupID) c3, s3, err := bob.EncryptMessageToGroup("Bobs Message", group2.GroupID)
if err == nil { if err == nil {
ok, message := alice.AttemptDecryption(c3, s3) ok, _, message := alice.AttemptDecryption(c3, s3)
if !ok { if !ok {
t.Errorf("Bobs message to the group should be decrypted %v %v", message, ok) t.Errorf("Bobs message to the group should be decrypted %v %v", message, ok)
} }
eve := GenerateNewProfile("eve") eve := GenerateNewProfile("eve")
ok, _ = eve.AttemptDecryption(c3, s3) ok, _, _ = eve.AttemptDecryption(c3, s3)
if ok { if ok {
t.Errorf("Eves hould not be able to decrypt messages!") t.Errorf("Eves hould not be able to decrypt messages!")
} }

View File

@ -1,48 +1,41 @@
package peer package peer
import ( import (
"crypto/rsa" "cwtch.im/cwtch/event"
"cwtch.im/cwtch/model" "cwtch.im/cwtch/model"
"cwtch.im/cwtch/peer/connections"
"cwtch.im/cwtch/peer/peer"
"cwtch.im/cwtch/protocol" "cwtch.im/cwtch/protocol"
"cwtch.im/cwtch/protocol/connections"
"encoding/base64" "encoding/base64"
"errors" "errors"
"fmt"
"git.openprivacy.ca/openprivacy/libricochet-go/application"
"git.openprivacy.ca/openprivacy/libricochet-go/channels"
"git.openprivacy.ca/openprivacy/libricochet-go/connection"
"git.openprivacy.ca/openprivacy/libricochet-go/connectivity" "git.openprivacy.ca/openprivacy/libricochet-go/connectivity"
"git.openprivacy.ca/openprivacy/libricochet-go/identity" "git.openprivacy.ca/openprivacy/libricochet-go/identity"
"git.openprivacy.ca/openprivacy/libricochet-go/log"
"git.openprivacy.ca/openprivacy/libricochet-go/utils" "git.openprivacy.ca/openprivacy/libricochet-go/utils"
"github.com/ethereum/go-ethereum/log"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
"golang.org/x/crypto/ed25519" "golang.org/x/crypto/ed25519"
"strings" "strings"
"sync" "sync"
"time"
) )
// cwtchPeer manages incoming and outgoing connections and all processing for a Cwtch cwtchPeer // cwtchPeer manages incoming and outgoing connections and all processing for a Cwtch cwtchPeer
type cwtchPeer struct { type cwtchPeer struct {
connection.AutoConnectionHandler Profile *model.Profile
Profile *model.Profile mutex sync.Mutex
app *application.RicochetApplication shutdown bool
acn connectivity.ACN started bool
mutex sync.Mutex
connectionsManager *connections.Manager engine *connections.Engine
dataHandler func(string, []byte) []byte queue *event.Queue
shutdown bool eventBus *event.Manager
aif application.ApplicationInstanceFactory
started bool
} }
// CwtchPeer provides us with a way of testing systems built on top of cwtch without having to // CwtchPeer provides us with a way of testing systems built on top of cwtch without having to
// directly implement a cwtchPeer. // directly implement a cwtchPeer.
type CwtchPeer interface { type CwtchPeer interface {
Init(connectivity.ACN) Init(connectivity.ACN, *event.Manager)
PeerWithOnion(string) *connections.PeerPeerConnection PeerWithOnion(string) *connections.PeerPeerConnection
InviteOnionToGroup(string, string) error InviteOnionToGroup(string, string) error
SendMessageToPeer(string, string)
TrustPeer(string) error TrustPeer(string) error
BlockPeer(string) error BlockPeer(string) error
@ -67,9 +60,6 @@ type CwtchPeer interface {
GetContacts() []string GetContacts() []string
GetContact(string) *model.PublicProfile GetContact(string) *model.PublicProfile
SetApplicationInstanceFactory(factory application.ApplicationInstanceFactory)
SetPeerDataHandler(func(string, []byte) []byte)
IsStarted() bool IsStarted() bool
Listen() Listen()
Shutdown() Shutdown()
@ -91,10 +81,17 @@ func FromProfile(profile *model.Profile) CwtchPeer {
} }
// Init instantiates a cwtchPeer // Init instantiates a cwtchPeer
func (cp *cwtchPeer) Init(acn connectivity.ACN) { func (cp *cwtchPeer) Init(acn connectivity.ACN, eventBus *event.Manager) {
cp.acn = acn cp.queue = event.NewEventQueue(100)
cp.connectionsManager = connections.NewConnectionsManager(cp.acn) go cp.eventHandler()
go cp.connectionsManager.AttemptReconnections()
cp.eventBus = eventBus
cp.eventBus.Subscribe(event.EncryptedGroupMessage, cp.queue.EventChannel)
cp.eventBus.Subscribe(event.NewGroupInvite, cp.queue.EventChannel)
// TODO: Would be nice if ProtocolEngine did not need to explictly be given the Private Key.
cp.engine = connections.NewProtocolEngine(cp.Profile.Ed25519PrivateKey, acn, eventBus)
cp.engine.Identity = identity.InitializeV3(cp.Profile.Name, &cp.Profile.Ed25519PrivateKey, &cp.Profile.Ed25519PublicKey)
} }
// ImportGroup intializes a group from an imported source rather than a peer invite // ImportGroup intializes a group from an imported source rather than a peer invite
@ -121,20 +118,6 @@ func (cp *cwtchPeer) ImportGroup(exportedInvite string) (groupID string, err err
return return
} }
// a handler for the optional data handler
// note that the "correct" way to do this would be to AddChannelHandler("im.cwtch.peerdata", ...") but peerdata is such
// a handy channel that it's nice to have this convenience shortcut
// SetPeerDataHandler sets the handler for the (optional) data channel for cwtch peers.
func (cp *cwtchPeer) SetPeerDataHandler(dataHandler func(string, []byte) []byte) {
cp.dataHandler = dataHandler
}
// add extra channel handlers (note that the peer will merge these with the ones necessary to make cwtch work, so you
// are not clobbering the underlying functionality)
func (cp *cwtchPeer) SetApplicationInstanceFactory(aif application.ApplicationInstanceFactory) {
cp.aif = aif
}
// ExportGroup serializes a group invite so it can be given offline // ExportGroup serializes a group invite so it can be given offline
func (cp *cwtchPeer) ExportGroup(groupID string) (string, error) { func (cp *cwtchPeer) ExportGroup(groupID string) (string, error) {
group := cp.Profile.GetGroupByGroupID(groupID) group := cp.Profile.GetGroupByGroupID(groupID)
@ -180,81 +163,62 @@ func (cp *cwtchPeer) GetContact(onion string) *model.PublicProfile {
} }
// GetProfile returns the profile associated with this cwtchPeer. // GetProfile returns the profile associated with this cwtchPeer.
// TODO While it is probably "safe", it is not really "safe", to call functions on this profile. This only exists to return things like Name and Onion,we should gate these.
func (cp *cwtchPeer) GetProfile() *model.Profile { func (cp *cwtchPeer) GetProfile() *model.Profile {
return cp.Profile return cp.Profile
} }
// PeerWithOnion is the entry point for cwtchPeer relationships // PeerWithOnion is the entry point for cwtchPeer relationships
func (cp *cwtchPeer) PeerWithOnion(onion string) *connections.PeerPeerConnection { func (cp *cwtchPeer) PeerWithOnion(onion string) *connections.PeerPeerConnection {
return cp.connectionsManager.ManagePeerConnection(onion, cp.Profile, cp.dataHandler, cp.aif) cp.eventBus.Publish(event.NewEvent(event.PeerRequest, map[string]string{"Onion": onion}))
return nil
} }
// InviteOnionToGroup kicks off the invite process // InviteOnionToGroup kicks off the invite process
func (cp *cwtchPeer) InviteOnionToGroup(onion string, groupid string) error { func (cp *cwtchPeer) InviteOnionToGroup(onion string, groupid string) error {
group := cp.Profile.GetGroupByGroupID(groupid) group := cp.Profile.GetGroupByGroupID(groupid)
if group != nil { if group == nil {
log.Infof("Constructing invite for group: %v\n", group) return errors.New("invalid group id")
invite, err := group.Invite(group.GetInitialMessage())
if err != nil {
return err
}
ppc := cp.connectionsManager.GetPeerPeerConnectionForOnion(onion)
if ppc == nil {
return errors.New("peer connection not setup for onion. peers must be trusted before sending")
}
if ppc.GetState() == connections.AUTHENTICATED {
log.Infof("Got connection for group: %v - Sending Invite\n", ppc)
ppc.SendGroupInvite(invite)
} else {
return errors.New("cannot send invite to onion: peer connection is not ready")
}
return nil
} }
return errors.New("group id could not be found")
}
// ReceiveGroupMessage is a callback function that processes GroupMessages from a given server invite, err := group.Invite(group.InitialMessage)
func (cp *cwtchPeer) ReceiveGroupMessage(server string, gm *protocol.GroupMessage) { if err == nil {
cp.Profile.AttemptDecryption(gm.Ciphertext, gm.Signature) cp.eventBus.Publish(event.NewEvent(event.InvitePeerToGroup, map[string]string{"Onion": onion, "Invite": string(invite)}))
}
return err
} }
// JoinServer manages a new server connection with the given onion address // JoinServer manages a new server connection with the given onion address
func (cp *cwtchPeer) JoinServer(onion string) { func (cp *cwtchPeer) JoinServer(onion string) {
cp.connectionsManager.ManageServerConnection(onion, cp.ReceiveGroupMessage) cp.eventBus.Publish(event.NewEvent(event.JoinServer, map[string]string{"Onion": onion}))
} }
// SendMessageToGroup attemps to sent the given message to the given group id. // SendMessageToGroup attemps to sent the given message to the given group id.
func (cp *cwtchPeer) SendMessageToGroup(groupid string, message string) error { func (cp *cwtchPeer) SendMessageToGroup(groupid string, message string) error {
group := cp.Profile.GetGroupByGroupID(groupid) group := cp.Profile.GetGroupByGroupID(groupid)
if group == nil { if group == nil {
return errors.New("group does not exist") return errors.New("invalid group id")
}
psc := cp.connectionsManager.GetPeerServerConnectionForOnion(group.GroupServer)
if psc == nil {
return errors.New("could not find server connection to send message to")
} }
ct, sig, err := cp.Profile.EncryptMessageToGroup(message, groupid) ct, sig, err := cp.Profile.EncryptMessageToGroup(message, groupid)
if err != nil {
return err if err == nil {
cp.eventBus.Publish(event.NewEvent(event.SendMessageToGroup, map[string]string{"Server": group.GroupServer, "Ciphertext": string(ct), "Signature": string(sig)}))
} }
gm := &protocol.GroupMessage{
Ciphertext: ct,
Signature: sig,
}
err = psc.SendGroupMessage(gm)
return err return err
} }
func (cp *cwtchPeer) SendMessageToPeer(onion string, message string) {
cp.eventBus.Publish(event.NewEvent(event.SendMessageToPeer, map[string]string{"Peer": onion, "Message": message}))
}
// GetPeers returns a list of peer connections. // GetPeers returns a list of peer connections.
func (cp *cwtchPeer) GetPeers() map[string]connections.ConnectionState { func (cp *cwtchPeer) GetPeers() map[string]connections.ConnectionState {
return cp.connectionsManager.GetPeers() return cp.engine.GetPeers()
} }
// GetServers returns a list of server connections // GetServers returns a list of server connections
func (cp *cwtchPeer) GetServers() map[string]connections.ConnectionState { func (cp *cwtchPeer) GetServers() map[string]connections.ConnectionState {
return cp.connectionsManager.GetServers() return cp.engine.GetServers()
} }
// TrustPeer sets an existing peer relationship to trusted // TrustPeer sets an existing peer relationship to trusted
@ -269,7 +233,7 @@ func (cp *cwtchPeer) TrustPeer(peer string) error {
// BlockPeer blocks an existing peer relationship. // BlockPeer blocks an existing peer relationship.
func (cp *cwtchPeer) BlockPeer(peer string) error { func (cp *cwtchPeer) BlockPeer(peer string) error {
err := cp.Profile.BlockPeer(peer) err := cp.Profile.BlockPeer(peer)
cp.connectionsManager.ClosePeerConnection(peer) cp.eventBus.Publish(event.NewEvent(event.BlockPeer, map[string]string{"Onion": peer}))
return err return err
} }
@ -283,90 +247,15 @@ func (cp *cwtchPeer) RejectInvite(groupID string) {
cp.Profile.RejectInvite(groupID) cp.Profile.RejectInvite(groupID)
} }
// LookupContact returns that a contact is known and allowed to communicate for all cases.
func (cp *cwtchPeer) LookupContact(hostname string, publicKey rsa.PublicKey) (allowed, known bool) {
blocked := cp.Profile.IsBlocked(hostname)
return !blocked, true
}
// LookupContactV3 returns that a contact is known and allowed to communicate for all cases.
func (cp *cwtchPeer) LookupContactV3(hostname string, publicKey ed25519.PublicKey) (allowed, known bool) {
blocked := cp.Profile.IsBlocked(hostname)
return !blocked, true
}
// ContactRequest needed to implement ContactRequestHandler Interface
func (cp *cwtchPeer) ContactRequest(name string, message string) string {
return "Accepted"
}
func (cp *cwtchPeer) Listen() { func (cp *cwtchPeer) Listen() {
go func() { cp.eventBus.Publish(event.NewEvent(event.ProtocolEngineStartListen, map[string]string{}))
for !cp.shutdown {
e := cp.listenFn()
if e != nil {
// TODO: was panic, then fatal
fmt.Printf("ERROR: peer %v has crashed with: %v\n", cp.GetProfile().Onion, e)
}
// listenFn failed, wait 5 seconds and try again
time.Sleep(5 * time.Second)
}
}()
}
// Listen sets up an onion listener to process incoming cwtch messages
func (cp *cwtchPeer) listenFn() error {
ra := new(application.RicochetApplication)
onionService, err := cp.acn.Listen(cp.Profile.Ed25519PrivateKey, application.RicochetPort)
if err != nil /*&& fmt.Sprintf("%v", err) != "550 Unspecified Tor error: Onion address collision"*/ {
return err
}
af := application.ApplicationInstanceFactory{}
af.Init()
af.AddHandler("im.cwtch.peer", func(rai *application.ApplicationInstance) func() channels.Handler {
cpi := new(CwtchPeerInstance)
cpi.Init(rai, ra)
return func() channels.Handler {
cpc := new(peer.CwtchPeerChannel)
cpc.Handler = &CwtchPeerHandler{Onion: rai.RemoteHostname, Peer: cp}
return cpc
}
})
if cp.dataHandler != nil {
af.AddHandler("im.cwtch.peer.data", func(rai *application.ApplicationInstance) func() channels.Handler {
cpi := new(CwtchPeerInstance)
cpi.Init(rai, ra)
return func() channels.Handler {
cpc := new(peer.CwtchPeerDataChannel)
cpc.Handler = &CwtchPeerHandler{Onion: rai.RemoteHostname, Peer: cp, DataHandler: cp.dataHandler}
return cpc
}
})
}
handlers := cp.aif.GetHandlers()
for i := range handlers {
af.AddHandler(handlers[i], cp.aif.GetHandler(handlers[i]))
}
ra.Init(cp.acn, cp.Profile.Name, identity.InitializeV3(cp.Profile.Name, &cp.Profile.Ed25519PrivateKey, &cp.Profile.Ed25519PublicKey), af, cp)
log.Infof("Running cwtch peer on %v", onionService.AddressFull())
cp.app = ra
cp.started = true
ra.Run(onionService)
return nil
} }
// Shutdown kills all connections and cleans up all goroutines for the peer // Shutdown kills all connections and cleans up all goroutines for the peer
func (cp *cwtchPeer) Shutdown() { func (cp *cwtchPeer) Shutdown() {
cp.shutdown = true cp.shutdown = true
cp.connectionsManager.Shutdown() cp.engine.Shutdown()
if cp.app != nil { cp.queue.Shutdown()
cp.app.Shutdown()
}
} }
// IsStarted returns true if Listen() has successfully been run before on this connection (ever). TODO: we will need to properly unset this flag on error if we want to support resumption in the future // IsStarted returns true if Listen() has successfully been run before on this connection (ever). TODO: we will need to properly unset this flag on error if we want to support resumption in the future
@ -374,32 +263,25 @@ func (cp *cwtchPeer) IsStarted() bool {
return cp.started return cp.started
} }
// CwtchPeerInstance encapsulates incoming peer connections // eventHandler process events from other subsystems
type CwtchPeerInstance struct { func (cp *cwtchPeer) eventHandler() {
rai *application.ApplicationInstance for {
ra *application.RicochetApplication ev := cp.queue.Next()
} switch ev.EventType {
case event.EncryptedGroupMessage:
// Init sets up a CwtchPeerInstance ok, groupID, _ := cp.Profile.AttemptDecryption([]byte(ev.Data["Ciphertext"]), []byte(ev.Data["Signature"]))
func (cpi *CwtchPeerInstance) Init(rai *application.ApplicationInstance, ra *application.RicochetApplication) { if ok {
cpi.rai = rai cp.eventBus.Publish(event.NewEvent(event.NewMessageFromGroup, map[string]string{"GroupID": groupID}))
cpi.ra = ra }
} case event.NewGroupInvite:
var groupInvite protocol.GroupChatInvite
// CwtchPeerHandler encapsulates handling of incoming CwtchPackets proto.Unmarshal([]byte(ev.Data["GroupInvite"]), &groupInvite)
type CwtchPeerHandler struct { cp.Profile.ProcessInvite(&groupInvite, ev.Data["Onion"])
Onion string default:
Peer *cwtchPeer if ev.EventType != "" {
DataHandler func(string, []byte) []byte log.Error("peer event handler received an event it was not subscribed for: %v")
} }
return
// HandleGroupInvite handles incoming GroupInvites }
func (cph *CwtchPeerHandler) HandleGroupInvite(gci *protocol.GroupChatInvite) { }
log.Debugf("Received GroupID from %v %v\n", cph.Onion, gci.String())
cph.Peer.Profile.ProcessInvite(gci, cph.Onion)
}
// HandlePacket handles the Cwtch cwtchPeer Data Channel
func (cph *CwtchPeerHandler) HandlePacket(data []byte) []byte {
return cph.DataHandler(cph.Onion, data)
} }

View File

@ -1,12 +1,12 @@
package peer package peer
import ( import (
"git.openprivacy.ca/openprivacy/libricochet-go/connectivity"
"testing" "testing"
) )
// TODO: Rewrite these tests (and others) using the news event bus interface.
func TestCwtchPeerGenerate(t *testing.T) { func TestCwtchPeerGenerate(t *testing.T) {
/**
alice := NewCwtchPeer("alice") alice := NewCwtchPeer("alice")
groupID, _, _ := alice.StartGroup("test.server") groupID, _, _ := alice.StartGroup("test.server")
@ -16,16 +16,21 @@ func TestCwtchPeerGenerate(t *testing.T) {
importedGroupID, err := alice.ImportGroup(exportedGroup) importedGroupID, err := alice.ImportGroup(exportedGroup)
group := alice.GetGroup(importedGroupID) group := alice.GetGroup(importedGroupID)
t.Logf("Imported Group: %v, err := %v %v", group, err, importedGroupID) t.Logf("Imported Group: %v, err := %v %v", group, err, importedGroupID)
*/
} }
func TestTrustPeer(t *testing.T) { func TestTrustPeer(t *testing.T) {
/**
groupName := "2c3kmoobnyghj2zw6pwv7d57yzld753auo3ugauezzpvfak3ahc4bdyd" groupName := "2c3kmoobnyghj2zw6pwv7d57yzld753auo3ugauezzpvfak3ahc4bdyd"
alice := NewCwtchPeer("alice") alice := NewCwtchPeer("alice")
alice.Init(connectivity.LocalProvider()) aem := new(event.Manager)
aem.Initialize()
alice.Init(connectivity.LocalProvider(),aem)
defer alice.Shutdown() defer alice.Shutdown()
bob := NewCwtchPeer("bob") bob := NewCwtchPeer("bob")
bob.Init(connectivity.LocalProvider()) bem := new(event.Manager)
bem.Initialize()
bob.Init(connectivity.LocalProvider(), bem)
defer bob.Shutdown() defer bob.Shutdown()
bobOnion := bob.GetProfile().Onion bobOnion := bob.GetProfile().Onion
@ -70,4 +75,5 @@ func TestTrustPeer(t *testing.T) {
if err == nil { if err == nil {
t.Errorf("bob trusts alice but peer connection is not ready yet. should not be able to invite her to group, instead got: %v", err) t.Errorf("bob trusts alice but peer connection is not ready yet. should not be able to invite her to group, instead got: %v", err)
} }
*/
} }

View File

@ -1,9 +1,7 @@
package connections package connections
import ( import (
"cwtch.im/cwtch/model"
"cwtch.im/cwtch/protocol" "cwtch.im/cwtch/protocol"
"git.openprivacy.ca/openprivacy/libricochet-go/application"
"git.openprivacy.ca/openprivacy/libricochet-go/connectivity" "git.openprivacy.ca/openprivacy/libricochet-go/connectivity"
"sync" "sync"
"time" "time"
@ -29,13 +27,13 @@ func NewConnectionsManager(acn connectivity.ACN) *Manager {
} }
// ManagePeerConnection creates a new PeerConnection for the given Host and Profile. // ManagePeerConnection creates a new PeerConnection for the given Host and Profile.
func (m *Manager) ManagePeerConnection(host string, profile *model.Profile, dataHandler func(string, []byte) []byte, aif application.ApplicationInstanceFactory) *PeerPeerConnection { func (m *Manager) ManagePeerConnection(host string, engine *Engine) *PeerPeerConnection {
m.lock.Lock() m.lock.Lock()
defer m.lock.Unlock() defer m.lock.Unlock()
_, exists := m.peerConnections[host] _, exists := m.peerConnections[host]
if !exists { if !exists {
ppc := NewPeerPeerConnection(m.acn, host, profile, dataHandler, aif) ppc := NewPeerPeerConnection(host, engine)
go ppc.Run() go ppc.Run()
m.peerConnections[host] = ppc m.peerConnections[host] = ppc
return ppc return ppc

View File

@ -0,0 +1,246 @@
package connections
import (
"crypto/rsa"
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/protocol"
"cwtch.im/cwtch/protocol/connections/peer"
"errors"
"git.openprivacy.ca/openprivacy/libricochet-go/application"
"git.openprivacy.ca/openprivacy/libricochet-go/channels"
"git.openprivacy.ca/openprivacy/libricochet-go/connectivity"
"git.openprivacy.ca/openprivacy/libricochet-go/identity"
"git.openprivacy.ca/openprivacy/libricochet-go/log"
"github.com/golang/protobuf/proto"
"golang.org/x/crypto/ed25519"
)
// Engine (ProtocolEngine) encapsulates the logic necessary to make and receive Cwtch connections.
// Note: ProtocolEngine doesn't have access to any information necessary to encrypt or decrypt GroupMessages
type Engine struct {
queue *event.Queue
connectionsManager *Manager
// Engine Attributes
Identity identity.Identity
ACN connectivity.ACN
app *application.RicochetApplication
// Engine State
started bool
// Pointer to the Global Event Manager
eventManager *event.Manager
privateKey ed25519.PrivateKey
}
// NewProtocolEngine initializes a new engine that runs Cwtch using the given parameters
func NewProtocolEngine(privateKey ed25519.PrivateKey, acn connectivity.ACN, eventManager *event.Manager) *Engine {
engine := new(Engine)
engine.privateKey = privateKey
engine.queue = event.NewEventQueue(100)
go engine.eventHandler()
engine.ACN = acn
engine.connectionsManager = NewConnectionsManager(engine.ACN)
go engine.connectionsManager.AttemptReconnections()
engine.eventManager = eventManager
engine.eventManager.Subscribe(event.ProtocolEngineStartListen, engine.queue.EventChannel)
engine.eventManager.Subscribe(event.PeerRequest, engine.queue.EventChannel)
engine.eventManager.Subscribe(event.InvitePeerToGroup, engine.queue.EventChannel)
engine.eventManager.Subscribe(event.JoinServer, engine.queue.EventChannel)
engine.eventManager.Subscribe(event.SendMessageToGroup, engine.queue.EventChannel)
engine.eventManager.Subscribe(event.SendMessageToPeer, engine.queue.EventChannel)
return engine
}
// eventHandler process events from other subsystems
func (e *Engine) eventHandler() {
for {
ev := e.queue.Next()
switch ev.EventType {
case event.StatusRequest:
e.eventManager.Publish(event.Event{EventType: event.ProtocolEngineStatus, EventID: ev.EventID})
case event.PeerRequest:
e.PeerWithOnion(ev.Data["Onion"])
case event.InvitePeerToGroup:
e.InviteOnionToGroup(ev.Data["Onion"], []byte(ev.Data["Invite"]))
case event.JoinServer:
e.JoinServer(ev.Data["Onion"])
case event.SendMessageToGroup:
e.SendMessageToGroup(ev.Data["Server"], []byte(ev.Data["Ciphertext"]), []byte(ev.Data["Signature"]))
case event.SendMessageToPeer:
log.Debugf("Sending Message to Peer.....")
ppc := e.connectionsManager.GetPeerPeerConnectionForOnion(ev.Data["Peer"])
if ppc != nil {
// TODO this will block.
ppc.SendPacket([]byte(ev.Data["Message"]))
}
case event.ProtocolEngineStartListen:
go e.listenFn()
default:
return
}
}
}
// GetPeerHandler is an external interface function that allows callers access to a CwtchPeerHandler
// TODO: There is likely a slightly better way to encapsulate this behavior
func (e *Engine) GetPeerHandler(remotePeerHostname string) *CwtchPeerHandler {
return &CwtchPeerHandler{Onion: remotePeerHostname, EventBus: e.eventManager}
}
// Listen sets up an onion listener to process incoming cwtch messages
func (e *Engine) listenFn() {
ra := new(application.RicochetApplication)
onionService, err := e.ACN.Listen(e.privateKey, application.RicochetPort)
if err != nil /*&& fmt.Sprintf("%v", err) != "550 Unspecified Tor error: Onion address collision"*/ {
e.eventManager.Publish(event.NewEvent(event.ProtocolEngineStopped, map[string]string{"Identity": e.Identity.Hostname(), "Error": err.Error()}))
return
}
af := application.ApplicationInstanceFactory{}
af.Init()
af.AddHandler("im.cwtch.peer", func(rai *application.ApplicationInstance) func() channels.Handler {
cpi := new(CwtchPeerInstance)
cpi.Init(rai, ra)
return func() channels.Handler {
cpc := new(peer.CwtchPeerChannel)
cpc.Handler = e.GetPeerHandler(rai.RemoteHostname)
return cpc
}
})
af.AddHandler("im.cwtch.peer.data", func(rai *application.ApplicationInstance) func() channels.Handler {
cpi := new(CwtchPeerInstance)
cpi.Init(rai, ra)
return func() channels.Handler {
cpc := new(peer.CwtchPeerDataChannel)
cpc.Handler = e.GetPeerHandler(rai.RemoteHostname)
return cpc
}
})
ra.Init(e.ACN, e.Identity.Name, e.Identity, af, e)
log.Infof("Running cwtch peer on %v", onionService.AddressFull())
e.started = true
e.app = ra
ra.Run(onionService)
e.eventManager.Publish(event.NewEvent(event.ProtocolEngineStopped, map[string]string{"Identity": e.Identity.Hostname()}))
return
}
// LookupContact returns that a contact is known and allowed to communicate for all cases.
func (e *Engine) LookupContact(hostname string, publicKey rsa.PublicKey) (allowed, known bool) {
return true, true
}
// LookupContactV3 returns that a contact is known and allowed to communicate for all cases.
func (e *Engine) LookupContactV3(hostname string, publicKey ed25519.PublicKey) (allowed, known bool) {
return true, true
}
// ContactRequest needed to implement ContactRequestHandler Interface
func (e *Engine) ContactRequest(name string, message string) string {
return "Accepted"
}
// Shutdown tears down the eventHandler goroutine
func (e *Engine) Shutdown() {
e.connectionsManager.Shutdown()
e.app.Shutdown()
e.queue.Shutdown()
}
// PeerWithOnion is the entry point for cwtchPeer relationships
func (e *Engine) PeerWithOnion(onion string) *PeerPeerConnection {
return e.connectionsManager.ManagePeerConnection(onion, e)
}
// InviteOnionToGroup kicks off the invite process
func (e *Engine) InviteOnionToGroup(onion string, invite []byte) error {
ppc := e.connectionsManager.GetPeerPeerConnectionForOnion(onion)
if ppc == nil {
return errors.New("peer connection not setup for onion. peers must be trusted before sending")
}
if ppc.GetState() == AUTHENTICATED {
log.Infof("Got connection for group: %v - Sending Invite\n", ppc)
ppc.SendGroupInvite(invite)
} else {
return errors.New("cannot send invite to onion: peer connection is not ready")
}
return nil
}
// ReceiveGroupMessage is a callback function that processes GroupMessages from a given server
func (e *Engine) ReceiveGroupMessage(server string, gm *protocol.GroupMessage) {
// Publish Event so that a Profile Engine can deal with it.
// Note: This technically means that *multiple* Profile Engines could listen to the same ProtocolEngine!
e.eventManager.Publish(event.NewEvent(event.EncryptedGroupMessage, map[string]string{"Ciphertext": string(gm.GetCiphertext()), "Signature": string(gm.GetSignature())}))
}
// JoinServer manages a new server connection with the given onion address
func (e *Engine) JoinServer(onion string) {
e.connectionsManager.ManageServerConnection(onion, e.ReceiveGroupMessage)
}
// SendMessageToGroup attemps to sent the given message to the given group id.
func (e *Engine) SendMessageToGroup(server string, ct []byte, sig []byte) error {
psc := e.connectionsManager.GetPeerServerConnectionForOnion(server)
if psc == nil {
return errors.New("could not find server connection to send message to")
}
gm := &protocol.GroupMessage{
Ciphertext: ct,
Signature: sig,
}
err := psc.SendGroupMessage(gm)
return err
}
// GetPeers returns a list of peer connections.
func (e *Engine) GetPeers() map[string]ConnectionState {
return e.connectionsManager.GetPeers()
}
// GetServers returns a list of server connections
func (e *Engine) GetServers() map[string]ConnectionState {
return e.connectionsManager.GetServers()
}
// CwtchPeerInstance encapsulates incoming peer connections
type CwtchPeerInstance struct {
rai *application.ApplicationInstance
ra *application.RicochetApplication
}
// Init sets up a CwtchPeerInstance
func (cpi *CwtchPeerInstance) Init(rai *application.ApplicationInstance, ra *application.RicochetApplication) {
cpi.rai = rai
cpi.ra = ra
}
// CwtchPeerHandler encapsulates handling of incoming CwtchPackets
type CwtchPeerHandler struct {
Onion string
EventBus *event.Manager
DataHandler func(string, []byte) []byte
}
// HandleGroupInvite handles incoming GroupInvites
func (cph *CwtchPeerHandler) HandleGroupInvite(gci *protocol.GroupChatInvite) {
log.Debugf("Received GroupID from %v %v\n", cph.Onion, gci.String())
marshal, err := proto.Marshal(gci)
if err == nil {
cph.EventBus.Publish(event.NewEvent(event.NewGroupInvite, map[string]string{"Onion": cph.Onion, "GroupInvite": string(marshal)}))
}
}
// HandlePacket handles the Cwtch cwtchPeer Data Channel
func (cph *CwtchPeerHandler) HandlePacket(data []byte) []byte {
cph.EventBus.Publish(event.NewEvent(event.NewMessageFromPeer, map[string]string{"Onion": cph.Onion, "Data": string(data)}))
return []byte{} // TODO remove this
}

View File

@ -78,9 +78,7 @@ func (cplc *CwtchPeerListenChannel) Packet(data []byte) {
if err == nil { if err == nil {
if csp.GetGroupMessage() != nil { if csp.GetGroupMessage() != nil {
gm := csp.GetGroupMessage() gm := csp.GetGroupMessage()
// We create a new go routine here to avoid leaking any information about processing time cplc.Handler.HandleGroupMessage(gm)
// TODO Server can probably try to use this to DoS a peer
go cplc.Handler.HandleGroupMessage(gm)
} }
} }
} }

View File

@ -1,15 +1,10 @@
package connections package connections
import ( import (
"cwtch.im/cwtch/model" "cwtch.im/cwtch/protocol/connections/peer"
"cwtch.im/cwtch/peer/peer"
"cwtch.im/cwtch/protocol"
"git.openprivacy.ca/openprivacy/libricochet-go" "git.openprivacy.ca/openprivacy/libricochet-go"
"git.openprivacy.ca/openprivacy/libricochet-go/application"
"git.openprivacy.ca/openprivacy/libricochet-go/channels" "git.openprivacy.ca/openprivacy/libricochet-go/channels"
"git.openprivacy.ca/openprivacy/libricochet-go/connection" "git.openprivacy.ca/openprivacy/libricochet-go/connection"
"git.openprivacy.ca/openprivacy/libricochet-go/connectivity"
"git.openprivacy.ca/openprivacy/libricochet-go/identity"
"git.openprivacy.ca/openprivacy/libricochet-go/log" "git.openprivacy.ca/openprivacy/libricochet-go/log"
"time" "time"
) )
@ -17,23 +12,17 @@ import (
// PeerPeerConnection encapsulates a single outgoing cwtchPeer->cwtchPeer connection // PeerPeerConnection encapsulates a single outgoing cwtchPeer->cwtchPeer connection
type PeerPeerConnection struct { type PeerPeerConnection struct {
connection.AutoConnectionHandler connection.AutoConnectionHandler
PeerHostname string PeerHostname string
state ConnectionState state ConnectionState
connection *connection.Connection connection *connection.Connection
profile *model.Profile protocolEngine *Engine
dataHandler func(string, []byte) []byte
aif application.ApplicationInstanceFactory
acn connectivity.ACN
} }
// NewPeerPeerConnection creates a new peer connection for the given hostname and profile. // NewPeerPeerConnection creates a new peer connection for the given hostname and profile.
func NewPeerPeerConnection(acn connectivity.ACN, peerhostname string, profile *model.Profile, dataHandler func(string, []byte) []byte, aif application.ApplicationInstanceFactory) *PeerPeerConnection { func NewPeerPeerConnection(peerhostname string, protocolEngine *Engine) *PeerPeerConnection {
ppc := new(PeerPeerConnection) ppc := new(PeerPeerConnection)
ppc.acn = acn
ppc.PeerHostname = peerhostname ppc.PeerHostname = peerhostname
ppc.profile = profile ppc.protocolEngine = protocolEngine
ppc.dataHandler = dataHandler
ppc.aif = aif
ppc.Init() ppc.Init()
return ppc return ppc
} }
@ -43,16 +32,6 @@ func (ppc *PeerPeerConnection) GetState() ConnectionState {
return ppc.state return ppc.state
} }
// HandleGroupInvite passes the given group invite tothe profile
func (ppc *PeerPeerConnection) HandleGroupInvite(gci *protocol.GroupChatInvite) {
ppc.profile.ProcessInvite(gci, ppc.PeerHostname)
}
// HandlePacket handles data packets on the optional data channel
func (ppc *PeerPeerConnection) HandlePacket(data []byte) []byte {
return ppc.dataHandler(ppc.PeerHostname, data)
}
// SendPacket sends data packets on the optional data channel // SendPacket sends data packets on the optional data channel
func (ppc *PeerPeerConnection) SendPacket(data []byte) { func (ppc *PeerPeerConnection) SendPacket(data []byte) {
ppc.WaitTilAuthenticated() ppc.WaitTilAuthenticated()
@ -69,18 +48,6 @@ func (ppc *PeerPeerConnection) SendPacket(data []byte) {
}) })
} }
// DoOnChannel performs an operation on the requested channel
func (ppc *PeerPeerConnection) DoOnChannel(ctype string, direction channels.Direction, doSomethingWith func(channel *channels.Channel)) {
ppc.WaitTilAuthenticated()
ppc.connection.Do(func() error {
channel := ppc.connection.Channel(ctype, direction)
if channel != nil {
doSomethingWith(channel)
}
return nil
})
}
// SendGroupInvite sends the given serialized invite packet to the Peer // SendGroupInvite sends the given serialized invite packet to the Peer
func (ppc *PeerPeerConnection) SendGroupInvite(invite []byte) { func (ppc *PeerPeerConnection) SendGroupInvite(invite []byte) {
ppc.WaitTilAuthenticated() ppc.WaitTilAuthenticated()
@ -110,33 +77,23 @@ func (ppc *PeerPeerConnection) WaitTilAuthenticated() {
// Run manages the setup and teardown of a peer->peer connection // Run manages the setup and teardown of a peer->peer connection
func (ppc *PeerPeerConnection) Run() error { func (ppc *PeerPeerConnection) Run() error {
ppc.state = CONNECTING ppc.state = CONNECTING
rc, err := goricochet.Open(ppc.acn, ppc.PeerHostname) rc, err := goricochet.Open(ppc.protocolEngine.ACN, ppc.PeerHostname)
if err == nil { if err == nil {
ppc.connection = rc ppc.connection = rc
ppc.state = CONNECTED ppc.state = CONNECTED
_, err := connection.HandleOutboundConnection(ppc.connection).ProcessAuthAsV3Client(identity.InitializeV3(ppc.profile.Name, &ppc.profile.Ed25519PrivateKey, &ppc.profile.Ed25519PublicKey)) _, err := connection.HandleOutboundConnection(ppc.connection).ProcessAuthAsV3Client(ppc.protocolEngine.Identity)
if err == nil { if err == nil {
ppc.state = AUTHENTICATED ppc.state = AUTHENTICATED
go func() { go func() {
ppc.connection.Do(func() error { ppc.connection.Do(func() error {
ppc.connection.RequestOpenChannel("im.cwtch.peer", &peer.CwtchPeerChannel{Handler: ppc}) ppc.connection.RequestOpenChannel("im.cwtch.peer", &peer.CwtchPeerChannel{Handler: ppc.protocolEngine.GetPeerHandler(ppc.PeerHostname)})
return nil return nil
}) })
if ppc.dataHandler != nil { ppc.connection.Do(func() error {
ppc.connection.Do(func() error { ppc.connection.RequestOpenChannel("im.cwtch.peer.data", &peer.CwtchPeerDataChannel{Handler: ppc.protocolEngine.GetPeerHandler(ppc.PeerHostname)})
ppc.connection.RequestOpenChannel("im.cwtch.peer.data", &peer.CwtchPeerDataChannel{Handler: ppc}) return nil
return nil })
})
}
handlers := ppc.aif.GetHandlers()
for i := range handlers {
ppc.connection.Do(func() error {
ppc.connection.RequestOpenChannel(handlers[i], ppc.aif.GetHandler(handlers[i])(ppc.aif.GetApplicationInstance(ppc.connection))())
return nil
})
}
}() }()
ppc.connection.Process(ppc) ppc.connection.Process(ppc)

View File

@ -3,11 +3,10 @@ package connections
import ( import (
"crypto/rand" "crypto/rand"
"cwtch.im/cwtch/model" "cwtch.im/cwtch/model"
"cwtch.im/cwtch/peer/peer"
"cwtch.im/cwtch/protocol" "cwtch.im/cwtch/protocol"
"cwtch.im/cwtch/protocol/connections/peer"
"fmt" "fmt"
"git.openprivacy.ca/openprivacy/libricochet-go" "git.openprivacy.ca/openprivacy/libricochet-go"
"git.openprivacy.ca/openprivacy/libricochet-go/application"
"git.openprivacy.ca/openprivacy/libricochet-go/channels" "git.openprivacy.ca/openprivacy/libricochet-go/channels"
"git.openprivacy.ca/openprivacy/libricochet-go/connection" "git.openprivacy.ca/openprivacy/libricochet-go/connection"
"git.openprivacy.ca/openprivacy/libricochet-go/connectivity" "git.openprivacy.ca/openprivacy/libricochet-go/connectivity"
@ -61,7 +60,7 @@ func TestPeerPeerConnection(t *testing.T) {
profile := model.GenerateNewProfile("alice") profile := model.GenerateNewProfile("alice")
hostname := identity.Hostname() hostname := identity.Hostname()
ppc := NewPeerPeerConnection(connectivity.LocalProvider(), "127.0.0.1:5452|"+hostname, profile, nil, application.ApplicationInstanceFactory{}) ppc := NewPeerPeerConnection("127.0.0.1:5452|"+hostname, &Engine{ACN: connectivity.LocalProvider(), Identity: identity})
tp := new(TestPeer) tp := new(TestPeer)
tp.Init() tp.Init()

View File

@ -2,10 +2,10 @@ package connections
import ( import (
"crypto/rand" "crypto/rand"
"cwtch.im/cwtch/peer/fetch"
"cwtch.im/cwtch/peer/listen"
"cwtch.im/cwtch/peer/send"
"cwtch.im/cwtch/protocol" "cwtch.im/cwtch/protocol"
"cwtch.im/cwtch/protocol/connections/fetch"
"cwtch.im/cwtch/protocol/connections/listen"
"cwtch.im/cwtch/protocol/connections/send"
"errors" "errors"
"git.openprivacy.ca/openprivacy/libricochet-go" "git.openprivacy.ca/openprivacy/libricochet-go"
"git.openprivacy.ca/openprivacy/libricochet-go/channels" "git.openprivacy.ca/openprivacy/libricochet-go/channels"

View File

@ -2,7 +2,7 @@ package send
import ( import (
"cwtch.im/cwtch/protocol" "cwtch.im/cwtch/protocol"
"cwtch.im/cwtch/protocol/spam" "cwtch.im/cwtch/protocol/connections/spam"
"errors" "errors"
"git.openprivacy.ca/openprivacy/libricochet-go/channels" "git.openprivacy.ca/openprivacy/libricochet-go/channels"
"git.openprivacy.ca/openprivacy/libricochet-go/utils" "git.openprivacy.ca/openprivacy/libricochet-go/utils"

View File

@ -2,7 +2,7 @@ package send
import ( import (
"cwtch.im/cwtch/protocol" "cwtch.im/cwtch/protocol"
"cwtch.im/cwtch/protocol/spam" "cwtch.im/cwtch/protocol/connections/spam"
"git.openprivacy.ca/openprivacy/libricochet-go/channels" "git.openprivacy.ca/openprivacy/libricochet-go/channels"
"git.openprivacy.ca/openprivacy/libricochet-go/wire/control" "git.openprivacy.ca/openprivacy/libricochet-go/wire/control"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"

View File

@ -59,7 +59,7 @@ func (mp *Monitors) run() {
func (mp *Monitors) report() { func (mp *Monitors) report() {
f, err := os.Create(path.Join(mp.configDir, reportFile)) f, err := os.Create(path.Join(mp.configDir, reportFile))
if err != nil { if err != nil {
log.Errorf("Could not open monitor reporting file: ", err) log.Errorf("Could not open monitor reporting file: %v", err)
return return
} }
defer f.Close() defer f.Close()

View File

@ -2,7 +2,7 @@ package send
import ( import (
"cwtch.im/cwtch/protocol" "cwtch.im/cwtch/protocol"
"cwtch.im/cwtch/protocol/spam" "cwtch.im/cwtch/protocol/connections/spam"
"errors" "errors"
"git.openprivacy.ca/openprivacy/libricochet-go/channels" "git.openprivacy.ca/openprivacy/libricochet-go/channels"
"git.openprivacy.ca/openprivacy/libricochet-go/log" "git.openprivacy.ca/openprivacy/libricochet-go/log"

View File

@ -2,7 +2,7 @@ package send
import ( import (
"cwtch.im/cwtch/protocol" "cwtch.im/cwtch/protocol"
"cwtch.im/cwtch/protocol/spam" "cwtch.im/cwtch/protocol/connections/spam"
"git.openprivacy.ca/openprivacy/libricochet-go/channels" "git.openprivacy.ca/openprivacy/libricochet-go/channels"
"git.openprivacy.ca/openprivacy/libricochet-go/wire/control" "git.openprivacy.ca/openprivacy/libricochet-go/wire/control"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"

View File

@ -50,7 +50,7 @@ func LoadConfig(configDir, filename string) Config {
err = json.Unmarshal(raw, &config) err = json.Unmarshal(raw, &config)
if err != nil { if err != nil {
log.Errorf("reading config: ", err) log.Errorf("reading config: %v", err)
} }
} }

View File

@ -1,15 +1,17 @@
package testing package testing
import ( import (
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/model" "cwtch.im/cwtch/model"
"cwtch.im/cwtch/peer" "cwtch.im/cwtch/peer"
"cwtch.im/cwtch/peer/connections" "cwtch.im/cwtch/protocol/connections"
cwtchserver "cwtch.im/cwtch/server" cwtchserver "cwtch.im/cwtch/server"
"fmt" "fmt"
"git.openprivacy.ca/openprivacy/libricochet-go/connectivity" "git.openprivacy.ca/openprivacy/libricochet-go/connectivity"
"golang.org/x/net/proxy" "golang.org/x/net/proxy"
"os" "os"
"runtime" "runtime"
"runtime/pprof"
"testing" "testing"
"time" "time"
) )
@ -62,13 +64,13 @@ func waitForPeerServerConnection(t *testing.T, peer peer.CwtchPeer, server strin
} }
if state != connections.AUTHENTICATED { if state != connections.AUTHENTICATED {
fmt.Printf("peer %v waiting connect to server %v, currently: %v\n", peer.GetProfile().Onion, server, connections.ConnectionStateName[state]) fmt.Printf("peer %v waiting connect to server %v, currently: %v\n", peer.GetProfile().Onion, server, connections.ConnectionStateName[state])
time.Sleep(time.Second * 10) time.Sleep(time.Second * 5)
continue continue
} else {
break
} }
} else { } // It might take a second for the server to show up as it is now going through the event bus
t.Fatalf("peer server connectiond %v should have entry for server %v", servers, server) time.Sleep(time.Second)
}
break
} }
return return
} }
@ -83,13 +85,13 @@ func waitForPeerPeerConnection(t *testing.T, peera peer.CwtchPeer, peerb peer.Cw
} }
if state != connections.AUTHENTICATED { if state != connections.AUTHENTICATED {
fmt.Printf("peer% v waiting connect to peer %v, currently: %v\n", peera.GetProfile().Onion, peerb.GetProfile().Onion, connections.ConnectionStateName[state]) fmt.Printf("peer% v waiting connect to peer %v, currently: %v\n", peera.GetProfile().Onion, peerb.GetProfile().Onion, connections.ConnectionStateName[state])
time.Sleep(time.Second * 10) time.Sleep(time.Second * 5)
continue continue
} else {
break
} }
} else { } // It might take a second for the peer to show up as it is now going through the event bus
t.Fatalf("peer peer connectiond %v should have entry for server %v", peers, peerb.GetProfile().Onion) time.Sleep(time.Second)
}
break
} }
return return
} }
@ -130,21 +132,29 @@ func TestCwtchPeerIntegration(t *testing.T) {
// ***** cwtchPeer setup ***** // ***** cwtchPeer setup *****
// It's important that each Peer have their own EventBus
aliceEventBus := new(event.Manager)
aliceEventBus.Initialize()
bobEventBus := new(event.Manager)
bobEventBus.Initialize()
carolEventBus := new(event.Manager)
carolEventBus.Initialize()
fmt.Println("Creating Alice...") fmt.Println("Creating Alice...")
alice := peer.NewCwtchPeer("Alice") alice := peer.NewCwtchPeer("Alice")
alice.Init(acn) alice.Init(acn, aliceEventBus)
alice.Listen() alice.Listen()
fmt.Println("Alice created:", alice.GetProfile().Onion) fmt.Println("Alice created:", alice.GetProfile().Onion)
fmt.Println("Creating Bob...") fmt.Println("Creating Bob...")
bob := peer.NewCwtchPeer("Bob") bob := peer.NewCwtchPeer("Bob")
bob.Init(acn) bob.Init(acn, bobEventBus)
bob.Listen() bob.Listen()
fmt.Println("Bob created:", bob.GetProfile().Onion) fmt.Println("Bob created:", bob.GetProfile().Onion)
fmt.Println("Creating Carol...") fmt.Println("Creating Carol...")
carol := peer.NewCwtchPeer("Carol") carol := peer.NewCwtchPeer("Carol")
carol.Init(acn) carol.Init(acn, carolEventBus)
carol.Listen() carol.Listen()
fmt.Println("Carol created:", carol.GetProfile().Onion) fmt.Println("Carol created:", carol.GetProfile().Onion)
@ -280,6 +290,7 @@ func TestCwtchPeerIntegration(t *testing.T) {
fmt.Println("Shutting down Alice...") fmt.Println("Shutting down Alice...")
alice.Shutdown() alice.Shutdown()
aliceEventBus.Shutdown()
time.Sleep(time.Second * 5) time.Sleep(time.Second * 5)
numGoRoutinesPostAlice := runtime.NumGoroutine() numGoRoutinesPostAlice := runtime.NumGoroutine()
@ -368,6 +379,7 @@ func TestCwtchPeerIntegration(t *testing.T) {
fmt.Println("Shutting down Bob...") fmt.Println("Shutting down Bob...")
bob.Shutdown() bob.Shutdown()
bobEventBus.Shutdown()
time.Sleep(time.Second * 3) time.Sleep(time.Second * 3)
numGoRoutinesPostBob := runtime.NumGoroutine() numGoRoutinesPostBob := runtime.NumGoroutine()
if server != nil { if server != nil {
@ -377,11 +389,16 @@ func TestCwtchPeerIntegration(t *testing.T) {
} }
numGoRoutinesPostServerShutdown := runtime.NumGoroutine() numGoRoutinesPostServerShutdown := runtime.NumGoroutine()
fmt.Println("Shuttind down Carol...") fmt.Println("Shutting down Carol...")
carol.Shutdown() carol.Shutdown()
carolEventBus.Shutdown()
time.Sleep(time.Second * 3) time.Sleep(time.Second * 3)
numGoRoutinesPostCarol := runtime.NumGoroutine() numGoRoutinesPostCarol := runtime.NumGoroutine()
// Printing out the current goroutines
// Very useful if we are leaking any.
pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
fmt.Printf("numGoRoutinesStart: %v\nnumGoRoutinesPostServer: %v\nnumGoRoutinesPostPeerStart: %v\nnumGoRoutinesPostPeerAndServerConnect: %v\n"+ fmt.Printf("numGoRoutinesStart: %v\nnumGoRoutinesPostServer: %v\nnumGoRoutinesPostPeerStart: %v\nnumGoRoutinesPostPeerAndServerConnect: %v\n"+
"numGoRoutinesPostAlice: %v\nnumGoRotinesPostCarolConnect: %v\nnumGoRoutinesPostBob: %v\nnumGoRoutinesPostServerShutdown: %v\nnumGoRoutinesPostCarol: %v\n", "numGoRoutinesPostAlice: %v\nnumGoRotinesPostCarolConnect: %v\nnumGoRoutinesPostBob: %v\nnumGoRoutinesPostServerShutdown: %v\nnumGoRoutinesPostCarol: %v\n",
numGoRoutinesStart, numGoRoutinesPostServer, numGoRoutinesPostPeerStart, numGoRoutinesPostServerConnect, numGoRoutinesStart, numGoRoutinesPostServer, numGoRoutinesPostPeerStart, numGoRoutinesPostServerConnect,

View File

@ -3,13 +3,14 @@
set -e set -e
pwd pwd
go test ${1} -coverprofile=model.cover.out -v ./model go test ${1} -coverprofile=model.cover.out -v ./model
go test ${1} -coverprofile=protocol.spam.cover.out -v ./protocol/spam go test ${1} -coverprofile=event.cover.out -v ./event
go test ${1} -coverprofile=storage.cover.out -v ./storage go test ${1} -coverprofile=storage.cover.out -v ./storage
go test ${1} -coverprofile=peer.connections.cover.out -v ./peer/connections go test ${1} -coverprofile=peer.connections.cover.out -v ./protocol/connections
go test ${1} -coverprofile=peer.fetch.cover.out -v ./peer/fetch go test ${1} -coverprofile=protocol.spam.cover.out -v ./protocol/connections/spam
go test ${1} -coverprofile=peer.listen.cover.out -v ./peer/listen go test ${1} -coverprofile=peer.fetch.cover.out -v ./protocol/connections/fetch
go test ${1} -coverprofile=peer.peer.cover.out -v ./peer/peer go test ${1} -coverprofile=peer.listen.cover.out -v ./protocol/connections/listen
go test ${1} -coverprofile=peer.send.cover.out -v ./peer/send go test ${1} -coverprofile=peer.peer.cover.out -v ./protocol/connections/peer
go test ${1} -coverprofile=peer.send.cover.out -v ./protocol/connections/send
go test ${1} -coverprofile=peer.cover.out -v ./peer go test ${1} -coverprofile=peer.cover.out -v ./peer
go test ${1} -coverprofile=server.fetch.cover.out -v ./server/fetch go test ${1} -coverprofile=server.fetch.cover.out -v ./server/fetch
go test ${1} -coverprofile=server.listen.cover.out -v ./server/listen go test ${1} -coverprofile=server.listen.cover.out -v ./server/listen