From 258cf84e6848de07ddf71784bc4649bec0a7c2a9 Mon Sep 17 00:00:00 2001 From: Dan Ballard Date: Mon, 3 Feb 2020 13:46:15 -0500 Subject: [PATCH] fixing race conditions; removing peer.GetProfile as unsafe --- .drone.yml | 2 +- app/appClient.go | 4 +- app/appService.go | 2 + app/applets.go | 14 +- app/bots/servermon/main.go | 8 +- app/cli/main.go | 16 +-- app/peer/alice/alice.go | 2 +- event/bridge/pipeBridge.go | 49 ++++--- go.mod | 13 +- go.sum | 42 +++--- peer/cwtch_peer.go | 123 ++++++++++++++++-- protocol/connections/engine.go | 1 + protocol/fuzzing/groups/fuzz.go | 18 ++- server/metrics/metrics_test.go | 2 +- server/metrics/monitors.go | 16 ++- testing/cwtch_peer_server_integration_test.go | 96 +++++++------- 16 files changed, 274 insertions(+), 134 deletions(-) diff --git a/.drone.yml b/.drone.yml index 5366861..fc1fa5f 100644 --- a/.drone.yml +++ b/.drone.yml @@ -43,7 +43,7 @@ pipeline: commands: - ./tor -f ./torrc - sleep 15 - - go test -v cwtch.im/cwtch/testing/ + - go test -race -v cwtch.im/cwtch/testing/ notify-email: image: drillster/drone-email host: build.openprivacy.ca diff --git a/app/appClient.go b/app/appClient.go index fd016ba..50f9c5d 100644 --- a/app/appClient.go +++ b/app/appClient.go @@ -83,8 +83,8 @@ func (ac *applicationClient) newPeer(localID, key, salt string, reload bool) { peer := peer.FromProfile(profile) peer.Init(eventBus) - ac.acmutex.Lock() - defer ac.acmutex.Unlock() + ac.peerLock.Lock() + defer ac.peerLock.Unlock() ac.peers[profile.Onion] = peer ac.eventBuses[profile.Onion] = eventBus npEvent := event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.Onion}) diff --git a/app/appService.go b/app/appService.go index 03055a5..3afa5b2 100644 --- a/app/appService.go +++ b/app/appService.go @@ -150,6 +150,8 @@ func (as *applicationService) getACNStatusHandler() func(int, string) { return func(progress int, status string) { progStr := strconv.Itoa(progress) as.bridge.Write(&event.IPCMessage{Dest: DestApp, Message: event.NewEventList(event.ACNStatus, event.Progreess, progStr, event.Status, status)}) + as.applicationCore.coremutex.Lock() + defer as.applicationCore.coremutex.Unlock() for _, bus := range as.eventBuses { bus.Publish(event.NewEventList(event.ACNStatus, event.Progreess, progStr, event.Status, status)) } diff --git a/app/applets.go b/app/applets.go index 751ef0a..1dc0339 100644 --- a/app/applets.go +++ b/app/applets.go @@ -11,6 +11,7 @@ import ( ) type appletPeers struct { + peerLock sync.Mutex peers map[string]peer.CwtchPeer launched bool // bit hacky, place holder while we transition to full multi peer support and a better api } @@ -46,13 +47,19 @@ func (ap *appletPeers) init() { // LaunchPeers starts each peer Listening and connecting to peers and groups func (ap *appletPeers) LaunchPeers() { log.Debugf("appletPeers LaunchPeers\n") + ap.peerLock.Lock() + defer ap.peerLock.Unlock() if ap.launched { return } - for _, p := range ap.peers { + for pid, p := range ap.peers { + log.Debugf("Launching %v\n", pid) p.Listen() + log.Debugf("done Listen() for %v\n", pid) p.StartPeersConnections() + log.Debugf("done StartPeersConnections() for %v\n", pid) p.StartGroupConnections() + log.Debugf("done StartGroupConnections() for %v\n", pid) } ap.launched = true } @@ -60,8 +67,11 @@ func (ap *appletPeers) LaunchPeers() { // ListPeers returns a map of onions to their profile's Name func (ap *appletPeers) ListPeers() map[string]string { keys := map[string]string{} + + ap.peerLock.Lock() + defer ap.peerLock.Unlock() for k, p := range ap.peers { - keys[k] = p.GetProfile().Name + keys[k] = p.GetName() } return keys } diff --git a/app/bots/servermon/main.go b/app/bots/servermon/main.go index 7fb8b01..59785a1 100644 --- a/app/bots/servermon/main.go +++ b/app/bots/servermon/main.go @@ -14,14 +14,14 @@ import ( func waitForPeerGroupConnection(peer peer.CwtchPeer, groupID string) error { for { - _, ok := peer.GetProfile().Groups[groupID] - if ok { - state := peer.GetGroupState(groupID) + group := peer.GetGroup(groupID) + if group != nil { + state, _ := peer.GetGroupState(groupID) if state == connections.FAILED { return errors.New("Connection to group " + groupID + " failed!") } if state != connections.AUTHENTICATED { - fmt.Printf("peer %v waiting to authenticate with group %v 's server, current state: %v\n", peer.GetProfile().Onion, groupID, connections.ConnectionStateName[state]) + fmt.Printf("peer %v waiting to authenticate with group %v 's server, current state: %v\n", peer.GetOnion(), groupID, connections.ConnectionStateName[state]) time.Sleep(time.Second * 10) continue } diff --git a/app/cli/main.go b/app/cli/main.go index 8b6ae5d..55b257c 100644 --- a/app/cli/main.go +++ b/app/cli/main.go @@ -84,8 +84,8 @@ func printMessage(m model.Message) { name := "unknown" if p != nil { name = p.Name - } else if peer.GetProfile().Onion == m.PeerID { - name = peer.GetProfile().Name + } else if peer.GetOnion() == m.PeerID { + name = peer.GetName() } fmt.Printf("%v %v (%v): %v\n", m.Timestamp, name, m.PeerID, m.Message) @@ -221,7 +221,7 @@ func handleAppEvents(em event.Manager) { p := app.GetPeer(onion) app.LaunchPeers() - fmt.Printf("\nLoaded profile %v (%v)\n", p.GetProfile().Name, p.GetProfile().Onion) + fmt.Printf("\nLoaded profile %v (%v)\n", p.GetName(), p.GetOnion()) suggestions = append(suggestionsBase, suggestionsSelectedProfile...) profiles := app.ListPeers() @@ -304,9 +304,9 @@ func main() { prmpt = "cwtch> " if group != nil { - prmpt = fmt.Sprintf("cwtch %v (%v) [%v] say> ", peer.GetProfile().Name, peer.GetProfile().Onion, group.GroupID) + prmpt = fmt.Sprintf("cwtch %v (%v) [%v] say> ", peer.GetName(), peer.GetOnion(), group.GroupID) } else if peer != nil { - prmpt = fmt.Sprintf("cwtch %v (%v)> ", peer.GetProfile().Name, peer.GetProfile().Onion) + prmpt = fmt.Sprintf("cwtch %v (%v)> ", peer.GetName(), peer.GetOnion()) } text := prompt.Input(prmpt, completer, prompt.OptionSuggestionBGColor(prompt.Purple), @@ -409,7 +409,7 @@ func main() { // Auto cwtchPeer / Join Server // TODO There are some privacy implications with this that we should // think over. - for _, name := range p.GetProfile().GetContacts() { + for _, name := range p.GetContacts() { profile := p.GetContact(name) if profile.Trusted && !profile.Blocked { p.PeerWithOnion(profile.Onion) @@ -428,7 +428,7 @@ func main() { } case "/info": if peer != nil { - fmt.Printf("Address cwtch:%v\n", peer.GetProfile().Onion) + fmt.Printf("Address cwtch:%v\n", peer.GetOnion()) } else { fmt.Printf("Profile needs to be set\n") } @@ -594,7 +594,7 @@ func main() { for i := 0; i < 100; i++ { found := false for _, m := range timeline { - if m.Message == fmt.Sprintf("this is message %v", i) && m.PeerID == peer.GetProfile().Onion { + if m.Message == fmt.Sprintf("this is message %v", i) && m.PeerID == peer.GetOnion() { found = true latency := m.Received.Sub(m.Timestamp) fmt.Printf("Latency for Message %v was %v\n", i, latency) diff --git a/app/peer/alice/alice.go b/app/peer/alice/alice.go index c3d7d1e..b234ce3 100644 --- a/app/peer/alice/alice.go +++ b/app/peer/alice/alice.go @@ -26,7 +26,7 @@ func main() { app.CreatePeer("alice", "be gay, do crimes") alice := utils.WaitGetPeer(app, "alice") app.LaunchPeers() - eventBus := app.GetEventBus(alice.GetProfile().Onion) + eventBus := app.GetEventBus(alice.GetOnion()) queue := event.NewQueue() eventBus.Subscribe(event.NewMessageFromPeer, queue) diff --git a/event/bridge/pipeBridge.go b/event/bridge/pipeBridge.go index fb6ea45..dff6ab8 100644 --- a/event/bridge/pipeBridge.go +++ b/event/bridge/pipeBridge.go @@ -3,17 +3,16 @@ package bridge import ( + "cwtch.im/cwtch/event" "cwtch.im/cwtch/protocol/connections" "encoding/base64" "encoding/binary" - "git.openprivacy.ca/openprivacy/libricochet-go/log" - "syscall" - "time" - - "cwtch.im/cwtch/event" "encoding/json" + "git.openprivacy.ca/openprivacy/libricochet-go/log" "os" "sync" + "syscall" + "time" ) /* pipeBridge creates a pair of named pipes @@ -76,23 +75,37 @@ func NewPipeBridgeService(inFilename, outFilename string) event.IPCBridge { return pb } +func (pb *pipeBridge) setState(state connections.ConnectionState) { + pb.lock.Lock() + defer pb.lock.Unlock() + + pb.state = state +} + +func (pb *pipeBridge) getState() connections.ConnectionState { + pb.lock.Lock() + defer pb.lock.Unlock() + + return pb.state +} + func (pb *pipeBridge) connectionManager() { - for pb.state != connections.KILLED { + for pb.getState() != connections.KILLED { log.Debugf("clientConnManager loop start init\n") - pb.state = connections.CONNECTING + pb.setState(connections.CONNECTING) var err error log.Debugf("%v open file infile\n", pb.name) pb.in, err = os.OpenFile(pb.infile, os.O_RDWR, 0600) if err != nil { - pb.state = connections.DISCONNECTED + pb.setState(connections.DISCONNECTED) continue } log.Debugf("%v open file outfile\n", pb.name) pb.out, err = os.OpenFile(pb.outfile, os.O_RDWR, 0600) if err != nil { - pb.state = connections.DISCONNECTED + pb.setState(connections.DISCONNECTED) continue } @@ -167,12 +180,12 @@ func (pb *pipeBridge) threeShakeClient() bool { func (pb *pipeBridge) handleConns() { if !pb.threeShake() { - pb.state = connections.FAILED + pb.setState(connections.FAILED) pb.closeReset() return } - pb.state = connections.AUTHENTICATED + pb.setState(connections.AUTHENTICATED) pb.closedChan = make(chan bool, 5) @@ -183,8 +196,8 @@ func (pb *pipeBridge) handleConns() { <-pb.closedChan log.Debugf("handleConns <-closedChan (%v)\n", pb.name) - if pb.state != connections.KILLED { - pb.state = connections.FAILED + if pb.getState() != connections.KILLED { + pb.setState(connections.FAILED) } pb.closeReset() log.Debugf("handleConns done for %v, exit\n", pb.name) @@ -196,7 +209,7 @@ func (pb *pipeBridge) closeReset() { close(pb.read) pb.write.Close() - if pb.state != connections.KILLED { + if pb.getState() != connections.KILLED { pb.read = make(chan event.IPCMessage, maxBufferSize) pb.write = newInfiniteChannel() } @@ -219,7 +232,7 @@ func (pb *pipeBridge) handleWrite() { } else { log.Debugf("handleWrite <- message: %v\n", message) } - if pb.state == connections.AUTHENTICATED { + if pb.getState() == connections.AUTHENTICATED { encMessage := &event.IPCMessage{Dest: message.Dest, Message: event.Event{EventType: message.Message.EventType, EventID: message.Message.EventID, Data: make(map[event.Field]string)}} for k, v := range message.Message.Data { encMessage.Message.Data[k] = base64.StdEncoding.EncodeToString([]byte(v)) @@ -276,7 +289,7 @@ func (pb *pipeBridge) Read() (*event.IPCMessage, bool) { log.Debugf("Read() %v...\n", pb.name) var ok = false var message event.IPCMessage - for !ok && pb.state != connections.KILLED { + for !ok && pb.getState() != connections.KILLED { message, ok = <-pb.read if message.Message.EventType == event.EncryptedGroupMessage || message.Message.EventType == event.SendMessageToGroup || message.Message.EventType == event.NewMessageFromGroup { log.Debugf("Read %v: %v %v ...\n", pb.name, message.Dest, message.Message.EventType) @@ -284,7 +297,7 @@ func (pb *pipeBridge) Read() (*event.IPCMessage, bool) { log.Debugf("Read %v: %v\n", pb.name, message) } } - return &message, pb.state != connections.KILLED + return &message, pb.getState() != connections.KILLED } func (pb *pipeBridge) Write(message *event.IPCMessage) { @@ -298,7 +311,7 @@ func (pb *pipeBridge) Write(message *event.IPCMessage) { } func (pb *pipeBridge) Shutdown() { - log.Debugf("pb.Shutdown() for %v currently in state: %v\n", pb.name, connections.ConnectionStateName[pb.state]) + log.Debugf("pb.Shutdown() for %v currently in state: %v\n", pb.name, connections.ConnectionStateName[pb.getState()]) pb.state = connections.KILLED pb.closedChan <- true log.Debugf("Done Shutdown for %v\n", pb.name) diff --git a/go.mod b/go.mod index 00cf497..0be4bf1 100644 --- a/go.mod +++ b/go.mod @@ -2,17 +2,16 @@ module cwtch.im/cwtch require ( cwtch.im/tapir v0.1.14 - git.openprivacy.ca/openprivacy/libricochet-go v1.0.9 + git.openprivacy.ca/openprivacy/libricochet-go v1.0.10 github.com/c-bata/go-prompt v0.2.3 github.com/golang/protobuf v1.3.3 - github.com/mattn/go-colorable v0.1.2 // indirect - github.com/mattn/go-runewidth v0.0.4 // indirect - github.com/mattn/go-tty v0.0.0-20190424173100-523744f04859 // indirect + github.com/mattn/go-runewidth v0.0.8 // indirect + github.com/mattn/go-tty v0.0.3 // indirect github.com/pkg/term v0.0.0-20190109203006-aa71e9d9e942 // indirect - github.com/struCoder/pidusage v0.1.2 + github.com/struCoder/pidusage v0.1.3 golang.org/x/crypto v0.0.0-20200128174031-69ecbb4d6d5d - golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa - golang.org/x/sys v0.0.0-20200124204421-9fbb57f87de9 // indirect + golang.org/x/net v0.0.0-20200202094626-16171245cfb2 + golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5 // indirect ) go 1.13 diff --git a/go.sum b/go.sum index 2828950..cff8adb 100644 --- a/go.sum +++ b/go.sum @@ -1,9 +1,8 @@ cwtch.im/tapir v0.1.14 h1:lg+reZNT998l++4Q4RQBLXYv3ukqWffhI0Wed9RSjuA= cwtch.im/tapir v0.1.14/go.mod h1:QwERb982YIes9UOxDqIthm1HZ1xy0YQetD2+XxDbg9Y= -git.openprivacy.ca/openprivacy/libricochet-go v1.0.4 h1:GWLMJ5jBSIC/gFXzdbbeVz7fIAn2FTgW8+wBci6/3Ek= git.openprivacy.ca/openprivacy/libricochet-go v1.0.4/go.mod h1:yMSG1gBaP4f1U+RMZXN85d29D39OK5s8aTpyVRoH5FY= -git.openprivacy.ca/openprivacy/libricochet-go v1.0.9 h1:saqwSfxx8MJ16KT10F3wSiMi8iWvgFdrCkUk5WZVGzs= -git.openprivacy.ca/openprivacy/libricochet-go v1.0.9/go.mod h1:jJdxIwYDCcM4w4HAydeHuksPRTirUnyERAloPL0qtic= +git.openprivacy.ca/openprivacy/libricochet-go v1.0.10 h1:yxEqFJH4EdacPwGuOXx+QieYqIPDyzWP50H27EI7fxI= +git.openprivacy.ca/openprivacy/libricochet-go v1.0.10/go.mod h1:jJdxIwYDCcM4w4HAydeHuksPRTirUnyERAloPL0qtic= github.com/agl/ed25519 v0.0.0-20170116200512-5312a6153412 h1:w1UutsfOrms1J05zt7ISrnJIXKzwaspym5BTKGx93EI= github.com/agl/ed25519 v0.0.0-20170116200512-5312a6153412/go.mod h1:WPjqKcmVOxf0XSf3YxCJs6N6AOSrOx3obionmG7T0y0= github.com/c-bata/go-prompt v0.2.3 h1:jjCS+QhG/sULBhAaBdjb2PlMRVaKXQgn+4yzaauvs2s= @@ -15,21 +14,28 @@ github.com/cretz/bine v0.1.1-0.20200124154328-f9f678b84cca/go.mod h1:6PF6fWAvYtw github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM= +github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.3 h1:gyjaxf+svBWX08ZjK86iN9geUJF0H6gp2IRKX6Nf6/I= +github.com/golang/protobuf v1.3.3 h1:gyjaxf+svBWX08ZjK86iN9geUJF0H6gp2IRKX6Nf6/I= +github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw= github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw= github.com/gtank/merlin v0.1.1 h1:eQ90iG7K9pOhtereWsmyRJ6RAwcP4tHTDBHXNg+u5is= github.com/gtank/merlin v0.1.1/go.mod h1:T86dnYJhcGOh5BjZFCJWTDeTK7XW8uE+E21Cy/bIQ+s= github.com/gtank/ristretto255 v0.1.2 h1:JEqUCPA1NvLq5DwYtuzigd7ss8fwbYay9fi4/5uMzcc= github.com/gtank/ristretto255 v0.1.2/go.mod h1:Ph5OpO6c7xKUGROZfWVLiJf9icMDwUeIvY4OmlYW69o= -github.com/mattn/go-colorable v0.1.2 h1:/bC9yWikZXAL9uJdulbSfyVNIR3n3trXl+v8+1sx8mU= -github.com/mattn/go-colorable v0.1.2/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE= +github.com/mattn/go-colorable v0.1.4 h1:snbPLB8fVfU9iwbbo30TPtbLRzwWu6aJS6Xh4eaaviA= +github.com/mattn/go-colorable v0.1.4/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE= github.com/mattn/go-isatty v0.0.8 h1:HLtExJ+uU2HOZ+wI0Tt5DtUDrx8yhUqDcp7fYERX4CE= github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= -github.com/mattn/go-runewidth v0.0.4 h1:2BvfKmzob6Bmd4YsL0zygOqfdFnK7GR4QL06Do4/p7Y= -github.com/mattn/go-runewidth v0.0.4/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU= -github.com/mattn/go-tty v0.0.0-20190424173100-523744f04859 h1:smQbSzmT3EHl4EUwtFwFGmGIpiYgIiiPeVv1uguIQEE= -github.com/mattn/go-tty v0.0.0-20190424173100-523744f04859/go.mod h1:XPvLUNfbS4fJH25nqRHfWLMa1ONC8Amw+mIA639KxkE= +github.com/mattn/go-isatty v0.0.10 h1:qxFzApOv4WsAL965uUPIsXzAKCZxN2p9UqdhFS4ZW10= +github.com/mattn/go-isatty v0.0.10/go.mod h1:qgIWMr58cqv1PHHyhnkY9lrL7etaEgOFcMEpPG5Rm84= +github.com/mattn/go-runewidth v0.0.6/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI= +github.com/mattn/go-runewidth v0.0.8 h1:3tS41NlGYSmhhe/8fhGRzc+z3AYCw1Fe1WAyLuujKs0= +github.com/mattn/go-runewidth v0.0.8/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI= +github.com/mattn/go-tty v0.0.3 h1:5OfyWorkyO7xP52Mq7tB36ajHDG5OHrmBGIS/DtakQI= +github.com/mattn/go-tty v0.0.3/go.mod h1:ihxohKRERHTVzN+aSVRwACLCeqIoZAWpoICkkvrWyR0= github.com/mimoo/StrobeGo v0.0.0-20181016162300-f8f6d4d2b643 h1:hLDRPB66XQT/8+wG9WsDpiCvZf1yKO7sz7scAjSlBa0= github.com/mimoo/StrobeGo v0.0.0-20181016162300-f8f6d4d2b643/go.mod h1:43+3pMjjKimDBf5Kr4ZFNGbLql1zKkbImw+fZbw3geM= github.com/pkg/term v0.0.0-20190109203006-aa71e9d9e942 h1:A7GG7zcGjl3jqAqGPmcNjd/D9hzL95SuoOQAaFNdLU0= @@ -39,8 +45,8 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= -github.com/struCoder/pidusage v0.1.2 h1:fFPTThlcWFQyizv3xKs5Lyq1lpG5lZ36arEGNhWz2Vs= -github.com/struCoder/pidusage v0.1.2/go.mod h1:pWBlW3YuSwRl6h7R5KbvA4N8oOqe9LjaKW5CwT1SPjI= +github.com/struCoder/pidusage v0.1.3 h1:pZcSa6asBE38TJtW0Nui6GeCjLTpaT/jAnNP7dUTLSQ= +github.com/struCoder/pidusage v0.1.3/go.mod h1:pWBlW3YuSwRl6h7R5KbvA4N8oOqe9LjaKW5CwT1SPjI= go.etcd.io/bbolt v1.3.3 h1:MUGmc65QhB3pIlaQ5bB4LwqSj6GIonVJXpZiaKNyaKk= go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= golang.org/x/crypto v0.0.0-20190128193316-c7b33c32a30b h1:Ib/yptP38nXZFMwqWSip+OKuMP9OkyDe3p+DssP8n9w= @@ -48,19 +54,25 @@ golang.org/x/crypto v0.0.0-20190128193316-c7b33c32a30b/go.mod h1:6SG95UA2DQfeDnf golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190513172903-22d7a77e9e5f/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200128174031-69ecbb4d6d5d h1:9FCpayM9Egr1baVnV1SX0H87m+XB0B8S0hAMi99X/3U= +golang.org/x/crypto v0.0.0-20200128174031-69ecbb4d6d5d h1:9FCpayM9Egr1baVnV1SX0H87m+XB0B8S0hAMi99X/3U= +golang.org/x/crypto v0.0.0-20200128174031-69ecbb4d6d5d/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200128174031-69ecbb4d6d5d/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/net v0.0.0-20190125091013-d26f9f9a57f3 h1:ulvT7fqt0yHWzpJwI57MezWnYDVpCAYBVuYst/L+fAY= golang.org/x/net v0.0.0-20190125091013-d26f9f9a57f3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= -golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa h1:F+8P+gmewFQYRk6JoLQLwjBCTu3mcIURZfNkVweuRKA= -golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200202094626-16171245cfb2 h1:CCH4IOTTfewWjGOlSp+zGcjutRKlBEZQ6wTn8ozI/nI= +golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4 h1:YUO/7uOKsKeq9UokNS62b8FYywz3ker1l1vDZRCRefw= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d h1:+R4KGOnez64A81RvjARKc4UT5/tI9ujCIVX+P5KiHuI= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20200124204421-9fbb57f87de9 h1:1/DFK4b7JH8DmkqhUk48onnSfrPzImPoVxuomtbT2nk= -golang.org/x/sys v0.0.0-20200124204421-9fbb57f87de9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20191008105621-543471e840be/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5 h1:LfCXLvNmTYH9kEmVgqbnsWfruoXZIrh4YBgqVHtDvw0= +golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= diff --git a/peer/cwtch_peer.go b/peer/cwtch_peer.go index 8470b41..26d8496 100644 --- a/peer/cwtch_peer.go +++ b/peer/cwtch_peer.go @@ -40,6 +40,7 @@ type CwtchPeer interface { TrustPeer(string) error BlockPeer(string) error UnblockPeer(string) error + ProcessInvite(string, string) (string, error) AcceptInvite(string) error RejectInvite(string) DeleteContact(string) @@ -49,8 +50,11 @@ type CwtchPeer interface { SendMessageToGroup(string, string) error SendMessageToGroupTracked(string, string) (string, error) - GetProfile() *model.Profile - GetPeerState(string) connections.ConnectionState + GetName() string + SetName(string) + GetOnion() string + + GetPeerState(string) (connections.ConnectionState, bool) StartGroup(string) (string, []byte, error) @@ -58,7 +62,7 @@ type CwtchPeer interface { ExportGroup(string) (string, error) GetGroup(string) *model.Group - GetGroupState(string) connections.ConnectionState + GetGroupState(string) (connections.ConnectionState, bool) GetGroups() []string AddContact(nick, onion string, trusted bool) GetContacts() []string @@ -131,6 +135,8 @@ func (cp *cwtchPeer) ImportGroup(exportedInvite string) (err error) { // ExportGroup serializes a group invite so it can be given offline func (cp *cwtchPeer) ExportGroup(groupID string) (string, error) { + cp.mutex.Lock() + defer cp.mutex.Unlock() group := cp.Profile.GetGroup(groupID) if group != nil { invite, err := group.Invite(group.GetInitialMessage()) @@ -149,7 +155,9 @@ func (cp *cwtchPeer) StartGroup(server string) (string, []byte, error) { // StartGroupWithMessage create a new group linked to the given server and returns the group ID, an invite or an error. func (cp *cwtchPeer) StartGroupWithMessage(server string, initialMessage []byte) (groupID string, invite []byte, err error) { + cp.mutex.Lock() groupID, invite, err = cp.Profile.StartGroupWithMessage(server, initialMessage) + cp.mutex.Unlock() if err == nil { group := cp.GetGroup(groupID) jsobj, err := json.Marshal(group) @@ -166,18 +174,24 @@ func (cp *cwtchPeer) StartGroupWithMessage(server string, initialMessage []byte) // GetGroups returns an unordered list of all group IDs. func (cp *cwtchPeer) GetGroups() []string { + cp.mutex.Lock() + defer cp.mutex.Unlock() return cp.Profile.GetGroups() } // GetGroup returns a pointer to a specific group, nil if no group exists. func (cp *cwtchPeer) GetGroup(groupID string) *model.Group { + cp.mutex.Lock() + defer cp.mutex.Unlock() return cp.Profile.GetGroup(groupID) } func (cp *cwtchPeer) AddContact(nick, onion string, trusted bool) { decodedPub, _ := base32.StdEncoding.DecodeString(strings.ToUpper(onion)) pp := &model.PublicProfile{Name: nick, Ed25519PublicKey: decodedPub, Trusted: trusted, Blocked: false, Onion: onion, Attributes: map[string]string{"nick": nick}} + cp.mutex.Lock() cp.Profile.AddContact(onion, pp) + cp.mutex.Unlock() pd, _ := json.Marshal(pp) cp.eventBus.Publish(event.NewEvent(event.PeerCreated, map[event.Field]string{ event.Data: string(pd), @@ -187,51 +201,85 @@ func (cp *cwtchPeer) AddContact(nick, onion string, trusted bool) { // GetContacts returns an unordered list of onions func (cp *cwtchPeer) GetContacts() []string { + cp.mutex.Lock() + defer cp.mutex.Unlock() return cp.Profile.GetContacts() } // GetContact returns a given contact, nil is no such contact exists func (cp *cwtchPeer) GetContact(onion string) *model.PublicProfile { + cp.mutex.Lock() + defer cp.mutex.Unlock() contact, _ := cp.Profile.GetContact(onion) return contact } -// GetProfile returns the profile associated with this cwtchPeer. -func (cp *cwtchPeer) GetProfile() *model.Profile { - return cp.Profile +func (cp *cwtchPeer) GetName() string { + cp.mutex.Lock() + defer cp.mutex.Unlock() + return cp.Profile.Name } -func (cp *cwtchPeer) GetPeerState(onion string) connections.ConnectionState { - return connections.ConnectionStateToType[cp.Profile.Contacts[onion].State] +func (cp *cwtchPeer) SetName(newName string) { + cp.mutex.Lock() + defer cp.mutex.Unlock() + cp.Profile.Name = newName } -func (cp *cwtchPeer) GetGroupState(groupid string) connections.ConnectionState { - return connections.ConnectionStateToType[cp.Profile.Groups[groupid].State] +func (cp *cwtchPeer) GetOnion() string { + cp.mutex.Lock() + defer cp.mutex.Unlock() + return cp.Profile.Onion +} +func (cp *cwtchPeer) GetPeerState(onion string) (connections.ConnectionState, bool) { + cp.mutex.Lock() + defer cp.mutex.Unlock() + if peer, ok := cp.Profile.Contacts[onion]; ok { + return connections.ConnectionStateToType[peer.State], true + } + return connections.DISCONNECTED, false +} + +func (cp *cwtchPeer) GetGroupState(groupid string) (connections.ConnectionState, bool) { + cp.mutex.Lock() + defer cp.mutex.Unlock() + if group, ok := cp.Profile.Groups[groupid]; ok { + return connections.ConnectionStateToType[group.State], true + } + return connections.DISCONNECTED, false } // PeerWithOnion is the entry point for cwtchPeer relationships func (cp *cwtchPeer) PeerWithOnion(onion string) { + cp.mutex.Lock() if _, exists := cp.Profile.GetContact(onion); !exists { cp.AddContact(onion, onion, false) } + defer cp.mutex.Unlock() cp.eventBus.Publish(event.NewEvent(event.PeerRequest, map[event.Field]string{event.RemotePeer: onion})) } // DeleteContact deletes a peer from the profile, storage, and handling func (cp *cwtchPeer) DeleteContact(onion string) { + cp.mutex.Lock() cp.Profile.DeleteContact(onion) + defer cp.mutex.Unlock() cp.eventBus.Publish(event.NewEventList(event.DeleteContact, event.RemotePeer, onion)) } // DeleteGroup deletes a Group from the profile, storage, and handling func (cp *cwtchPeer) DeleteGroup(groupID string) { + cp.mutex.Lock() cp.Profile.DeleteGroup(groupID) + defer cp.mutex.Unlock() cp.eventBus.Publish(event.NewEventList(event.DeleteGroup, event.GroupID, groupID)) } // InviteOnionToGroup kicks off the invite process func (cp *cwtchPeer) InviteOnionToGroup(onion string, groupid string) error { + cp.mutex.Lock() group := cp.Profile.GetGroup(groupid) + defer cp.mutex.Unlock() if group == nil { return errors.New("invalid group id") } @@ -258,7 +306,10 @@ func (cp *cwtchPeer) SendMessageToGroup(groupid string, message string) error { // SendMessageToGroup attempts to sent the given message to the given group id. // It returns the signature of the message which can be used to identify it in any UX layer. func (cp *cwtchPeer) SendMessageToGroupTracked(groupid string, message string) (string, error) { + cp.mutex.Lock() group := cp.Profile.GetGroup(groupid) + defer cp.mutex.Unlock() + if group == nil { return "", errors.New("invalid group id") } @@ -275,13 +326,17 @@ func (cp *cwtchPeer) SendMessageToPeer(onion string, message string) string { event := event.NewEvent(event.SendMessageToPeer, map[event.Field]string{event.RemotePeer: onion, event.Data: message}) cp.eventBus.Publish(event) + cp.mutex.Lock() cp.Profile.AddSentMessageToContactTimeline(onion, message, time.Now(), event.EventID) + cp.mutex.Unlock() return event.EventID } // TrustPeer sets an existing peer relationship to trusted func (cp *cwtchPeer) TrustPeer(peer string) error { + cp.mutex.Lock() + defer cp.mutex.Unlock() err := cp.Profile.TrustPeer(peer) if err == nil { cp.PeerWithOnion(peer) @@ -291,21 +346,34 @@ func (cp *cwtchPeer) TrustPeer(peer string) error { // BlockPeer blocks an existing peer relationship. func (cp *cwtchPeer) BlockPeer(peer string) error { + cp.mutex.Lock() err := cp.Profile.BlockPeer(peer) + cp.mutex.Unlock() cp.eventBus.Publish(event.NewEvent(event.BlockPeer, map[event.Field]string{event.RemotePeer: peer})) return err } // UnblockPeer blocks an existing peer relationship. func (cp *cwtchPeer) UnblockPeer(peer string) error { + cp.mutex.Lock() err := cp.Profile.UnblockPeer(peer) + cp.mutex.Unlock() cp.eventBus.Publish(event.NewEvent(event.UnblockPeer, map[event.Field]string{event.RemotePeer: peer})) return err } +// ProcessInvite adds a new group invite to the profile. returns the new group ID +func (cp *cwtchPeer) ProcessInvite(invite string, remotePeer string) (string, error) { + cp.mutex.Lock() + defer cp.mutex.Unlock() + return cp.Profile.ProcessInvite(invite, remotePeer) +} + // AcceptInvite accepts a given existing group invite func (cp *cwtchPeer) AcceptInvite(groupID string) error { + cp.mutex.Lock() err := cp.Profile.AcceptInvite(groupID) + cp.mutex.Unlock() if err != nil { return err } @@ -317,6 +385,8 @@ func (cp *cwtchPeer) AcceptInvite(groupID string) error { // RejectInvite rejects a given group invite. func (cp *cwtchPeer) RejectInvite(groupID string) { + cp.mutex.Lock() + defer cp.mutex.Unlock() cp.Profile.RejectInvite(groupID) } @@ -339,17 +409,21 @@ func (cp *cwtchPeer) StartGroupConnections() { for _, groupID := range cp.GetGroups() { // Only send a join server packet if we haven't joined this server yet... group := cp.GetGroup(groupID) + cp.mutex.Lock() if joined := joinedServers[groupID]; group.Accepted && !joined { log.Infof("Join Server %v (%v)\n", group.GroupServer, joined) cp.JoinServer(group.GroupServer) joinedServers[group.GroupServer] = true } + cp.mutex.Unlock() } } // SetAttribute sets an attribute for this profile and emits an event func (cp *cwtchPeer) SetAttribute(key string, val string) { + cp.mutex.Lock() cp.Profile.SetAttribute(key, val) + defer cp.mutex.Unlock() cp.eventBus.Publish(event.NewEvent(event.SetAttribute, map[event.Field]string{ event.Key: key, event.Data: val, @@ -358,11 +432,15 @@ func (cp *cwtchPeer) SetAttribute(key string, val string) { // GetAttribute gets an attribute for the profile func (cp *cwtchPeer) GetAttribute(key string) (string, bool) { + cp.mutex.Lock() + defer cp.mutex.Unlock() return cp.Profile.GetAttribute(key) } // SetContactAttribute sets an attribute for the indicated contact and emits an event func (cp *cwtchPeer) SetContactAttribute(onion string, key string, val string) { + cp.mutex.Lock() + defer cp.mutex.Unlock() if contact, ok := cp.Profile.GetContact(onion); ok { contact.SetAttribute(key, val) cp.eventBus.Publish(event.NewEvent(event.SetPeerAttribute, map[event.Field]string{ @@ -375,6 +453,8 @@ func (cp *cwtchPeer) SetContactAttribute(onion string, key string, val string) { // GetContactAttribute gets an attribute for the indicated contact func (cp *cwtchPeer) GetContactAttribute(onion string, key string) (string, bool) { + cp.mutex.Lock() + defer cp.mutex.Unlock() if contact, ok := cp.Profile.GetContact(onion); ok { return contact.GetAttribute(key) } @@ -383,6 +463,8 @@ func (cp *cwtchPeer) GetContactAttribute(onion string, key string) (string, bool // SetGroupAttribute sets an attribute for the indicated group and emits an event func (cp *cwtchPeer) SetGroupAttribute(gid string, key string, val string) { + cp.mutex.Lock() + defer cp.mutex.Unlock() if group := cp.Profile.GetGroup(gid); group != nil { group.SetAttribute(key, val) cp.eventBus.Publish(event.NewEvent(event.SetGroupAttribute, map[event.Field]string{ @@ -395,6 +477,8 @@ func (cp *cwtchPeer) SetGroupAttribute(gid string, key string, val string) { // GetGroupAttribute gets an attribute for the indicated group func (cp *cwtchPeer) GetGroupAttribute(gid string, key string) (string, bool) { + cp.mutex.Lock() + defer cp.mutex.Unlock() if group := cp.Profile.GetGroup(gid); group != nil { return group.GetAttribute(key) } @@ -403,6 +487,8 @@ func (cp *cwtchPeer) GetGroupAttribute(gid string, key string) (string, bool) { // Shutdown kills all connections and cleans up all goroutines for the peer func (cp *cwtchPeer) Shutdown() { + cp.mutex.Lock() + defer cp.mutex.Unlock() cp.shutdown = true cp.queue.Shutdown() } @@ -416,39 +502,54 @@ func (cp *cwtchPeer) eventHandler() { case event.EncryptedGroupMessage: // If successful, a side effect is the message is added to the group's timeline + cp.mutex.Lock() ok, groupID, message, seen := cp.Profile.AttemptDecryption([]byte(ev.Data[event.Ciphertext]), []byte(ev.Data[event.Signature])) + cp.mutex.Unlock() if ok && !seen { cp.eventBus.Publish(event.NewEvent(event.NewMessageFromGroup, map[event.Field]string{event.TimestampReceived: message.Received.Format(time.RFC3339Nano), event.TimestampSent: message.Timestamp.Format(time.RFC3339Nano), event.Data: message.Message, event.GroupID: groupID, event.Signature: string(message.Signature), event.PreviousSignature: string(message.PreviousMessageSig), event.RemotePeer: message.PeerID})) } case event.NewMessageFromPeer: //event.TimestampReceived, event.RemotePeer, event.Data ts, _ := time.Parse(time.RFC3339Nano, ev.Data[event.TimestampReceived]) + cp.mutex.Lock() cp.Profile.AddMessageToContactTimeline(ev.Data[event.RemotePeer], ev.Data[event.Data], ts) + cp.mutex.Unlock() case event.PeerAcknowledgement: + cp.mutex.Lock() cp.Profile.AckSentMessageToPeer(ev.Data[event.RemotePeer], ev.Data[event.EventID]) + cp.mutex.Unlock() case event.SendMessageToGroupError: + cp.mutex.Lock() cp.Profile.AddGroupSentMessageError(ev.Data[event.GroupServer], ev.Data[event.Signature], ev.Data[event.Error]) + cp.mutex.Unlock() case event.SendMessageToPeerError: + cp.mutex.Lock() cp.Profile.ErrorSentMessageToPeer(ev.Data[event.RemotePeer], ev.Data[event.EventID], ev.Data[event.Error]) + cp.mutex.Unlock() /***** Non default but requestable handlable events *****/ case event.NewGroupInvite: + cp.mutex.Lock() cp.Profile.ProcessInvite(ev.Data[event.GroupInvite], ev.Data[event.RemotePeer]) + cp.mutex.Unlock() case event.PeerStateChange: + cp.mutex.Lock() if _, exists := cp.Profile.Contacts[ev.Data[event.RemotePeer]]; exists { cp.Profile.Contacts[ev.Data[event.RemotePeer]].State = ev.Data[event.ConnectionState] } + cp.mutex.Unlock() case event.ServerStateChange: + cp.mutex.Lock() for _, group := range cp.Profile.Groups { if group.GroupServer == ev.Data[event.GroupServer] { group.State = ev.Data[event.ConnectionState] } } - + cp.mutex.Unlock() default: if ev.EventType != "" { log.Errorf("peer event handler received an event it was not subscribed for: %v", ev.EventType) diff --git a/protocol/connections/engine.go b/protocol/connections/engine.go index b5a3026..5a1c7e1 100644 --- a/protocol/connections/engine.go +++ b/protocol/connections/engine.go @@ -209,6 +209,7 @@ func (e *engine) Shutdown() { // peerWithOnion is the entry point for cwtchPeer relationships // needs to be run in a goroutine as will block on Open. func (e *engine) peerWithOnion(onion string) { + log.Infof("PEER WITH ONION: %v\n", onion) blocked, known := e.blocked.Load(onion) if known && !(blocked.(bool)) { e.ignoreOnShutdown(e.peerConnecting)(onion) diff --git a/protocol/fuzzing/groups/fuzz.go b/protocol/fuzzing/groups/fuzz.go index 08b677f..ab25d82 100644 --- a/protocol/fuzzing/groups/fuzz.go +++ b/protocol/fuzzing/groups/fuzz.go @@ -2,18 +2,15 @@ package groups import ( "crypto/rand" - "cwtch.im/cwtch/event" - "cwtch.im/cwtch/peer" + "cwtch.im/cwtch/model" "golang.org/x/crypto/nacl/secretbox" "io" ) // Fuzz various group related functions func Fuzz(data []byte) int { - peer := peer.NewCwtchPeer("fuzz") - peer.Init(event.NewEventManager()) - - inviteid, err := peer.GetProfile().ProcessInvite(string(data), peer.GetProfile().Onion) + profile := model.GenerateNewProfile("fuzz") + inviteid, err := profile.ProcessInvite(string(data), profile.Onion) if err != nil { if inviteid != "" { @@ -22,15 +19,16 @@ func Fuzz(data []byte) int { return 1 } - id, _, _ := peer.StartGroup("2c3kmoobnyghj2zw6pwv7d57yzld753auo3ugauezzpvfak3ahc4bdyd") + id, _, _ := profile.StartGroup("2c3kmoobnyghj2zw6pwv7d57yzld753auo3ugauezzpvfak3ahc4bdyd") var nonce [24]byte io.ReadFull(rand.Reader, nonce[:]) - encrypted := secretbox.Seal(nonce[:], data, &nonce, &peer.GetGroup(id).GroupKey) - ok, _, _, _ := peer.GetProfile().AttemptDecryption(encrypted, data) + encrypted := secretbox.Seal(nonce[:], data, &nonce, &profile.GetGroup(id).GroupKey) + + ok, _, _, _ := profile.AttemptDecryption(encrypted, data) if ok { panic("this probably shouldn't happen") } - ok = peer.GetProfile().VerifyGroupMessage(string(data), string(data), string(data), 0, encrypted, data) + ok = profile.VerifyGroupMessage(string(data), string(data), string(data), 0, encrypted, data) if ok { panic("this probably shouldn't happen") } diff --git a/server/metrics/metrics_test.go b/server/metrics/metrics_test.go index b200681..4841ecd 100644 --- a/server/metrics/metrics_test.go +++ b/server/metrics/metrics_test.go @@ -21,7 +21,7 @@ func TestCounter(t *testing.T) { } for i := 0; i < max; i++ { - <- done + <-done } val := c.Count() diff --git a/server/metrics/monitors.go b/server/metrics/monitors.go index c63983b..8a1f1bd 100644 --- a/server/metrics/monitors.go +++ b/server/metrics/monitors.go @@ -8,6 +8,7 @@ import ( "github.com/struCoder/pidusage" "os" "path" + "sync" "time" ) @@ -36,8 +37,19 @@ func (mp *Monitors) Start(ra *application.RicochetApplication, configDir string, mp.breakChannel = make(chan bool) mp.MessageCounter = NewCounter() mp.Messages = NewMonitorHistory(Count, Cumulative, func() (c float64) { c = float64(mp.MessageCounter.Count()); mp.MessageCounter.Reset(); return }) - mp.CPU = NewMonitorHistory(Percent, Average, func() float64 { sysInfo, _ := pidusage.GetStat(os.Getpid()); return float64(sysInfo.CPU) }) - mp.Memory = NewMonitorHistory(MegaBytes, Average, func() float64 { sysInfo, _ := pidusage.GetStat(os.Getpid()); return float64(sysInfo.Memory) }) + var pidUsageLock sync.Mutex + mp.CPU = NewMonitorHistory(Percent, Average, func() float64 { + pidUsageLock.Lock() + defer pidUsageLock.Unlock() + sysInfo, _ := pidusage.GetStat(os.Getpid()) + return float64(sysInfo.CPU) + }) + mp.Memory = NewMonitorHistory(MegaBytes, Average, func() float64 { + pidUsageLock.Lock() + defer pidUsageLock.Unlock() + sysInfo, _ := pidusage.GetStat(os.Getpid()) + return float64(sysInfo.Memory) + }) mp.ClientConns = NewMonitorHistory(Count, Average, func() float64 { return float64(ra.ConnectionCount()) }) if mp.log { diff --git a/testing/cwtch_peer_server_integration_test.go b/testing/cwtch_peer_server_integration_test.go index 184d095..5264728 100644 --- a/testing/cwtch_peer_server_integration_test.go +++ b/testing/cwtch_peer_server_integration_test.go @@ -2,7 +2,6 @@ package testing import ( app2 "cwtch.im/cwtch/app" - "cwtch.im/cwtch/app/plugins" "cwtch.im/cwtch/app/utils" "cwtch.im/cwtch/event" "cwtch.im/cwtch/event/bridge" @@ -15,6 +14,8 @@ import ( "git.openprivacy.ca/openprivacy/libricochet-go/log" "golang.org/x/net/proxy" "os" + "os/user" + "path" "runtime" "runtime/pprof" "testing" @@ -61,45 +62,43 @@ func serverCheck(t *testing.T, serverAddr string) bool { func waitForPeerGroupConnection(t *testing.T, peer peer.CwtchPeer, groupID string) { for { - _, ok := peer.GetProfile().Groups[groupID] + fmt.Printf("%v checking group conection...\n", peer.GetName()) + state, ok := peer.GetGroupState(groupID) if ok { - state := peer.GetGroupState(groupID) - //log.Infof("Waiting for Peer %v to join group %v - state: %v\n", peer.GetProfile().Name, groupID, state) + fmt.Printf("Waiting for Peer %v to join group %v - state: %v\n", peer.GetName(), groupID, state) if state == connections.FAILED { - t.Fatalf("%v could not connect to %v", peer.GetProfile().Onion, groupID) + t.Fatalf("%v could not connect to %v", peer.GetOnion(), groupID) } if state != connections.SYNCED { - fmt.Printf("peer %v waiting connect to group %v, currently: %v\n", peer.GetProfile().Onion, groupID, connections.ConnectionStateName[state]) + fmt.Printf("peer %v %v waiting connect to group %v, currently: %v\n", peer.GetName(), peer.GetOnion(), groupID, connections.ConnectionStateName[state]) time.Sleep(time.Second * 5) continue } else { + fmt.Printf("peer %v %v CONNECTED to group %v\n", peer.GetName(), peer.GetOnion(), groupID) break } - } // It might take a second for the server to show up as it is now going through the event bus - time.Sleep(time.Second) + } } return } func waitForPeerPeerConnection(t *testing.T, peera peer.CwtchPeer, peerb peer.CwtchPeer) { for { - //peers := peera.GetPeers() - _, ok := peera.GetProfile().Contacts[peerb.GetProfile().Onion] + state, ok := peera.GetPeerState(peerb.GetOnion()) if ok { - state := peera.GetPeerState(peerb.GetProfile().Onion) //log.Infof("Waiting for Peer %v to peer with peer: %v - state: %v\n", peera.GetProfile().Name, peerb.GetProfile().Name, state) if state == connections.FAILED { - t.Fatalf("%v could not connect to %v", peera.GetProfile().Onion, peerb.GetProfile().Onion) + t.Fatalf("%v could not connect to %v", peera.GetOnion(), peerb.GetOnion()) } if state != connections.AUTHENTICATED { - fmt.Printf("peer% v waiting connect to peer %v, currently: %v\n", peera.GetProfile().Onion, peerb.GetProfile().Onion, connections.ConnectionStateName[state]) + fmt.Printf("peer %v waiting connect to peer %v, currently: %v\n", peera.GetOnion(), peerb.GetOnion(), connections.ConnectionStateName[state]) time.Sleep(time.Second * 5) continue } else { + fmt.Printf("%v CONNECTED and AUTHED to %v\n", peera.GetName(), peerb.GetName()) break } - } // It might take a second for the peer to show up as it is now going through the event bus - time.Sleep(time.Second) + } } return } @@ -113,6 +112,8 @@ func TestCwtchPeerIntegration(t *testing.T) { log.ExcludeFromPattern("outbound/3dhauthchannel") log.ExcludeFromPattern("event/eventmanager") log.ExcludeFromPattern("pipeBridge") + log.ExcludeFromPattern("tapir") + os.RemoveAll("tor") acn, err := connectivity.StartTor(".", "") if err != nil { t.Fatalf("Could not start Tor: %v", err) @@ -145,8 +146,13 @@ func TestCwtchPeerIntegration(t *testing.T) { app := app2.NewApp(acn, "./storage") - bridgeClient := bridge.NewPipeBridgeClient("./clientPipe", "./servicePipe") - bridgeService := bridge.NewPipeBridgeService("./servicePipe", "./clientPipe") + usr, _ := user.Current() + cwtchDir := path.Join(usr.HomeDir, ".cwtch") + os.Mkdir(cwtchDir, 0700) + os.RemoveAll(path.Join(cwtchDir, "testing")) + os.Mkdir(path.Join(cwtchDir, "testing"), 0700) + bridgeClient := bridge.NewPipeBridgeClient(path.Join(cwtchDir, "testing/clientPipe"), path.Join(cwtchDir, "testing/servicePipe")) + bridgeService := bridge.NewPipeBridgeService(path.Join(cwtchDir, "testing/servicePipe"), path.Join(cwtchDir, "testing/clientPipe")) appClient := app2.NewAppClient("./storage", bridgeClient) appService := app2.NewAppService(acn, "./storage", bridgeService) @@ -164,27 +170,37 @@ func TestCwtchPeerIntegration(t *testing.T) { appClient.CreatePeer("carol", "asdfasdf") alice := utils.WaitGetPeer(app, "alice") - app.AddPeerPlugin(alice.GetProfile().Onion, plugins.NETWORKCHECK) - fmt.Println("Alice created:", alice.GetProfile().Onion) + fmt.Println("Alice created:", alice.GetOnion()) alice.AutoHandleEvents([]event.Type{event.PeerStateChange, event.ServerStateChange, event.NewGroupInvite}) bob := utils.WaitGetPeer(app, "bob") - fmt.Println("Bob created:", bob.GetProfile().Onion) + fmt.Println("Bob created:", bob.GetOnion()) bob.AutoHandleEvents([]event.Type{event.PeerStateChange, event.ServerStateChange, event.NewGroupInvite}) carol := utils.WaitGetPeer(appClient, "carol") - fmt.Println("Carol created:", carol.GetProfile().Onion) + fmt.Println("Carol created:", carol.GetOnion()) carol.AutoHandleEvents([]event.Type{event.PeerStateChange, event.ServerStateChange, event.NewGroupInvite}) app.LaunchPeers() appClient.LaunchPeers() fmt.Println("Waiting for Alice, Bob, and Carol to connect with onion network...") - time.Sleep(time.Second * 90) + time.Sleep(time.Second * 60) numGoRoutinesPostPeerStart := runtime.NumGoroutine() // ***** Peering, server joining, group creation / invite ***** + fmt.Println("Alice joining server...") + alice.JoinServer(serverAddr) + + fmt.Println("Alice peering with Bob...") + alice.AddContact("Bob", bob.GetOnion(), false) // Add contact so we can track connection state + alice.PeerWithOnion(bob.GetOnion()) + + fmt.Println("Alice peering with Carol...") + alice.AddContact("Carol", carol.GetOnion(), false) + alice.PeerWithOnion(carol.GetOnion()) + fmt.Println("Creating group on ", serverAddr, "...") groupID, _, err := alice.StartGroup(serverAddr) fmt.Printf("Created group: %v!\n", groupID) @@ -193,37 +209,19 @@ func TestCwtchPeerIntegration(t *testing.T) { return } - fmt.Println("Alice peering with Bob...") - alice.AddContact("Bob", bob.GetProfile().Onion, false) // Add contact so we can track connection state - alice.PeerWithOnion(bob.GetProfile().Onion) - fmt.Println("Alice peering with Carol...") - alice.AddContact("Carol", carol.GetProfile().Onion, false) - alice.PeerWithOnion(carol.GetProfile().Onion) - - fmt.Println("Alice joining server...") - alice.JoinServer(serverAddr) - fmt.Println("Bob joining server...") - bob.JoinServer(serverAddr) - fmt.Println("Waiting for alice to join server...") waitForPeerGroupConnection(t, alice, groupID) - //fmt.Println("Waiting for bob to join server...") - //waitForPeerGroupConnection(t, bob, groupID) - fmt.Println("Waiting for alice and Bob to peer...") waitForPeerPeerConnection(t, alice, bob) - /*fmt.Println("Waiting for peerings and server joins...") - time.Sleep(time.Second * 240)*/ - fmt.Println("Alice inviting Bob to group...") - err = alice.InviteOnionToGroup(bob.GetProfile().Onion, groupID) + err = alice.InviteOnionToGroup(bob.GetOnion(), groupID) if err != nil { t.Fatalf("Error for Alice inviting Bob to group: %v", err) } - time.Sleep(time.Second * 10) + time.Sleep(time.Second * 5) fmt.Println("Bob examining groups and accepting invites...") for _, groupID := range bob.GetGroups() { @@ -234,15 +232,9 @@ func TestCwtchPeerIntegration(t *testing.T) { bob.AcceptInvite(group.GroupID) } } - time.Sleep(time.Second * 5) numGoRoutinesPostServerConnect := runtime.NumGoroutine() - // Wait for them to join the server - waitForPeerGroupConnection(t, alice, groupID) - waitForPeerGroupConnection(t, bob, groupID) - //numGouRoutinesPostServerConnect := runtime.NumGoroutine() - // ***** Conversation ***** fmt.Println("Starting conversation in group...") @@ -270,7 +262,7 @@ func TestCwtchPeerIntegration(t *testing.T) { time.Sleep(time.Second * 10) fmt.Println("Alice inviting Carol to group...") - err = alice.InviteOnionToGroup(carol.GetProfile().Onion, groupID) + err = alice.InviteOnionToGroup(carol.GetOnion(), groupID) if err != nil { t.Fatalf("Error for Alice inviting Carol to group: %v", err) } @@ -286,7 +278,7 @@ func TestCwtchPeerIntegration(t *testing.T) { } fmt.Println("Shutting down Alice...") - app.ShutdownPeer(alice.GetProfile().Onion) + app.ShutdownPeer(alice.GetOnion()) time.Sleep(time.Second * 5) numGoRoutinesPostAlice := runtime.NumGoroutine() @@ -374,7 +366,7 @@ func TestCwtchPeerIntegration(t *testing.T) { } fmt.Println("Shutting down Bob...") - app.ShutdownPeer(bob.GetProfile().Onion) + app.ShutdownPeer(bob.GetOnion()) time.Sleep(time.Second * 3) numGoRoutinesPostBob := runtime.NumGoroutine() if server != nil { @@ -385,7 +377,7 @@ func TestCwtchPeerIntegration(t *testing.T) { numGoRoutinesPostServerShutdown := runtime.NumGoroutine() fmt.Println("Shutting down Carol...") - appClient.ShutdownPeer(carol.GetProfile().Onion) + appClient.ShutdownPeer(carol.GetOnion()) time.Sleep(time.Second * 3) numGoRoutinesPostCarol := runtime.NumGoroutine()