From 15582c7e79f7f0ebc949d784c4923ecb242303d2 Mon Sep 17 00:00:00 2001 From: Dan Ballard Date: Thu, 19 Sep 2019 16:14:35 -0700 Subject: [PATCH] Rework group invite workflow: delete cwtchPacket references as no longer needed. Remove more events from being default handled by Peer (but allow them for some usecases still (testing, simple apps). --- app/cli/main.go | 4 +- event/bridge/pipeBridge.go | 6 +- event/common.go | 1 + model/group.go | 5 +- model/message_test.go | 12 ++-- model/profile.go | 46 ++++++------- model/profile_test.go | 25 ++----- peer/cwtch_peer.go | 66 ++++++++++--------- protocol/connections/engine.go | 8 +-- storage/profile_store.go | 11 ++-- storage/profile_store_test.go | 5 -- testing/cwtch_peer_server_integration_test.go | 4 ++ 12 files changed, 78 insertions(+), 115 deletions(-) diff --git a/app/cli/main.go b/app/cli/main.go index b858a5d..8b6ae5d 100644 --- a/app/cli/main.go +++ b/app/cli/main.go @@ -558,11 +558,11 @@ func main() { } case "/import-group": if len(commands) == 2 { - groupID, err := peer.ImportGroup(commands[1]) + err := peer.ImportGroup(commands[1]) if err != nil { fmt.Printf("Error importing group: %v\n", err) } else { - fmt.Printf("Imported group: %s\n", groupID) + fmt.Printf("Imported group!\n") } } else { fmt.Printf("%v", commands) diff --git a/event/bridge/pipeBridge.go b/event/bridge/pipeBridge.go index bbe8383..fb6ea45 100644 --- a/event/bridge/pipeBridge.go +++ b/event/bridge/pipeBridge.go @@ -37,7 +37,7 @@ type pipeBridge struct { closedChan chan bool state connections.ConnectionState lock sync.Mutex - threeShake func () bool + threeShake func() bool // For logging / debugging purposes name string @@ -138,8 +138,8 @@ func (pb *pipeBridge) synLoop(stop chan bool) { return } delay = time.Second - case <- stop: - return + case <-stop: + return } } } diff --git a/event/common.go b/event/common.go index 9f4c3a4..103968c 100644 --- a/event/common.go +++ b/event/common.go @@ -117,6 +117,7 @@ const ( // Data [eg "open privacy board"] SetGroupAttribute = Type("SetGroupAttribute") + // PeerStateChange servers as a new incoming connection message as well, and can/is consumed by frontends to alert of new p2p connections // RemotePeer // ConnectionState PeerStateChange = Type("PeerStateChange") diff --git a/model/group.go b/model/group.go index 9978582..ed49d8d 100644 --- a/model/group.go +++ b/model/group.go @@ -97,10 +97,7 @@ func (g *Group) Invite(initialMessage []byte) ([]byte, error) { InitialMessage: initialMessage[:], } - cp := &protocol.CwtchPeerPacket{ - GroupChatInvite: gci, - } - invite, err := proto.Marshal(cp) + invite, err := proto.Marshal(gci) return invite, err } diff --git a/model/message_test.go b/model/message_test.go index b18239e..007af59 100644 --- a/model/message_test.go +++ b/model/message_test.go @@ -1,8 +1,6 @@ package model import ( - "cwtch.im/cwtch/protocol" - "github.com/golang/protobuf/proto" "strconv" "testing" "time" @@ -17,9 +15,8 @@ func TestMessagePadding(t *testing.T) { alice.AddContact(sarah.Onion, &sarah.PublicProfile) gid, invite, _ := alice.StartGroup("2c3kmoobnyghj2zw6pwv7d57yzld753auo3ugauezzpvfak3ahc4bdyd") - gci := &protocol.CwtchPeerPacket{} - proto.Unmarshal(invite, gci) - sarah.ProcessInvite(gci.GetGroupChatInvite(), alice.Onion) + + sarah.ProcessInvite(string(invite), alice.Onion) group := alice.GetGroupByGroupID(gid) @@ -51,9 +48,8 @@ func TestTranscriptConsistency(t *testing.T) { alice.AddContact(sarah.Onion, &sarah.PublicProfile) gid, invite, _ := alice.StartGroup("2c3kmoobnyghj2zw6pwv7d57yzld753auo3ugauezzpvfak3ahc4bdyd") - gci := &protocol.CwtchPeerPacket{} - proto.Unmarshal(invite, gci) - sarah.ProcessInvite(gci.GetGroupChatInvite(), alice.Onion) + + sarah.ProcessInvite(string(invite), alice.Onion) group := alice.GetGroupByGroupID(gid) diff --git a/model/profile.go b/model/profile.go index 964e94c..b2e1719 100644 --- a/model/profile.go +++ b/model/profile.go @@ -87,20 +87,6 @@ func GenerateNewProfile(name string) *Profile { return p } -// GetCwtchIdentityPacket returns the wire message for conveying this profiles identity. -func (p *Profile) GetCwtchIdentityPacket() (message []byte) { - ci := &protocol.CwtchIdentity{ - Name: p.Name, - Ed25519PublicKey: p.Ed25519PublicKey, - } - cpp := &protocol.CwtchPeerPacket{ - CwtchIdentify: ci, - } - message, err := proto.Marshal(cpp) - utils.CheckError(err) - return -} - // AddContact allows direct manipulation of cwtch contacts func (p *Profile) AddContact(onion string, profile *PublicProfile) { p.lock.Lock() @@ -317,19 +303,25 @@ func (p *Profile) GetGroupByGroupID(groupID string) (g *Group) { return } -// ProcessInvite adds a new group invite to the profile. -func (p *Profile) ProcessInvite(gci *protocol.GroupChatInvite, peerHostname string) { - group := new(Group) - group.GroupID = gci.GetGroupName() - group.LocalID = generateRandomID() - group.SignedGroupID = gci.GetSignedGroupId() - copy(group.GroupKey[:], gci.GetGroupSharedKey()[:]) - group.GroupServer = gci.GetServerHost() - group.InitialMessage = gci.GetInitialMessage()[:] - group.Accepted = false - group.Owner = peerHostname - group.Attributes = make(map[string]string) - p.AddGroup(group) +// ProcessInvite adds a new group invite to the profile. returns the new group ID +func (p *Profile) ProcessInvite(invite string, peerHostname string) (string, error) { + var gci protocol.GroupChatInvite + err := proto.Unmarshal([]byte(invite), &gci) + if err == nil { + group := new(Group) + group.GroupID = gci.GetGroupName() + group.LocalID = generateRandomID() + group.SignedGroupID = gci.GetSignedGroupId() + copy(group.GroupKey[:], gci.GetGroupSharedKey()[:]) + group.GroupServer = gci.GetServerHost() + group.InitialMessage = gci.GetInitialMessage()[:] + group.Accepted = false + group.Owner = peerHostname + group.Attributes = make(map[string]string) + p.AddGroup(group) + return group.GroupID, nil + } + return "", err } // AddGroup is a convenience method for adding a group to a profile. diff --git a/model/profile_test.go b/model/profile_test.go index 7071b4c..f73488f 100644 --- a/model/profile_test.go +++ b/model/profile_test.go @@ -1,23 +1,14 @@ package model import ( - "cwtch.im/cwtch/protocol" - "github.com/golang/protobuf/proto" "testing" ) + func TestProfileIdentity(t *testing.T) { sarah := GenerateNewProfile("Sarah") alice := GenerateNewProfile("Alice") - message := sarah.GetCwtchIdentityPacket() - - ci := &protocol.CwtchPeerPacket{} - err := proto.Unmarshal(message, ci) - if err != nil { - t.Errorf("alice should have added sarah as a contact %v", err) - } - alice.AddContact(sarah.Onion, &sarah.PublicProfile) if alice.Contacts[sarah.Onion].Name != "Sarah" { t.Errorf("alice should have added sarah as a contact %v", alice.Contacts) @@ -78,9 +69,7 @@ func TestRejectGroupInvite(t *testing.T) { alice.AddContact(sarah.Onion, &sarah.PublicProfile) gid, invite, _ := alice.StartGroup("2c3kmoobnyghj2zw6pwv7d57yzld753auo3ugauezzpvfak3ahc4bdyd") - gci := &protocol.CwtchPeerPacket{} - proto.Unmarshal(invite, gci) - sarah.ProcessInvite(gci.GetGroupChatInvite(), alice.Onion) + sarah.ProcessInvite(string(invite), alice.Onion) group := alice.GetGroupByGroupID(gid) if len(sarah.Groups) == 1 { if sarah.GetGroupByGroupID(group.GroupID).Accepted { @@ -102,9 +91,7 @@ func TestProfileGroup(t *testing.T) { alice.AddContact(sarah.Onion, &sarah.PublicProfile) gid, invite, _ := alice.StartGroupWithMessage("2c3kmoobnyghj2zw6pwv7d57yzld753auo3ugauezzpvfak3ahc4bdyd", []byte("Hello World")) - gci := &protocol.CwtchPeerPacket{} - proto.Unmarshal(invite, gci) - sarah.ProcessInvite(gci.GetGroupChatInvite(), alice.Onion) + sarah.ProcessInvite(string(invite), alice.Onion) if len(sarah.GetGroups()) != 1 { t.Errorf("sarah should only be in 1 group instead: %v", sarah.GetGroups()) } @@ -115,9 +102,7 @@ func TestProfileGroup(t *testing.T) { alice.AttemptDecryption(c, s1) gid2, invite2, _ := alice.StartGroup("2c3kmoobnyghj2zw6pwv7d57yzld753auo3ugauezzpvfak3ahc4bdyd") - gci2 := &protocol.CwtchPeerPacket{} - proto.Unmarshal(invite2, gci2) - sarah.ProcessInvite(gci2.GetGroupChatInvite(), alice.Onion) + sarah.ProcessInvite(string(invite2), alice.Onion) group2 := alice.GetGroupByGroupID(gid2) c2, s2, _ := sarah.EncryptMessageToGroup("Hello World", group2.GroupID) alice.AttemptDecryption(c2, s2) @@ -135,7 +120,7 @@ func TestProfileGroup(t *testing.T) { bob := GenerateNewProfile("bob") bob.AddContact(alice.Onion, &alice.PublicProfile) - bob.ProcessInvite(gci2.GetGroupChatInvite(), alice.Onion) + bob.ProcessInvite(string(invite2), alice.Onion) c3, s3, err := bob.EncryptMessageToGroup("Bobs Message", group2.GroupID) if err == nil { ok, _, message, _ := alice.AttemptDecryption(c3, s3) diff --git a/peer/cwtch_peer.go b/peer/cwtch_peer.go index 937aa0e..1930917 100644 --- a/peer/cwtch_peer.go +++ b/peer/cwtch_peer.go @@ -3,19 +3,19 @@ package peer import ( "cwtch.im/cwtch/event" "cwtch.im/cwtch/model" - "cwtch.im/cwtch/protocol" "cwtch.im/cwtch/protocol/connections" "encoding/base32" "encoding/base64" "encoding/json" "errors" "git.openprivacy.ca/openprivacy/libricochet-go/log" - "github.com/golang/protobuf/proto" "strings" "sync" "time" ) +var autoHandleableEvents = map[event.Type]bool{event.EncryptedGroupMessage: true, event.PeerStateChange: true, event.ServerStateChange: true, event.NewGroupInvite: true} + // cwtchPeer manages incoming and outgoing connections and all processing for a Cwtch cwtchPeer type cwtchPeer struct { Profile *model.Profile @@ -30,6 +30,7 @@ type cwtchPeer struct { // directly implement a cwtchPeer. type CwtchPeer interface { Init(event.Manager) + AutoHandleEvents(events []event.Type) PeerWithOnion(string) InviteOnionToGroup(string, string) error SendMessageToPeer(string, string) string @@ -51,7 +52,7 @@ type CwtchPeer interface { StartGroup(string) (string, []byte, error) - ImportGroup(string) (string, error) + ImportGroup(string) error ExportGroup(string) (string, error) GetGroup(string) *model.Group @@ -79,6 +80,7 @@ func NewCwtchPeer(name string) CwtchPeer { func FromProfile(profile *model.Profile) CwtchPeer { cp := new(cwtchPeer) cp.Profile = profile + cp.shutdown = false return cp } @@ -88,35 +90,34 @@ func (cp *cwtchPeer) Init(eventBus event.Manager) { go cp.eventHandler() cp.eventBus = eventBus - cp.eventBus.Subscribe(event.EncryptedGroupMessage, cp.queue) - cp.eventBus.Subscribe(event.NewGroupInvite, cp.queue) - cp.eventBus.Subscribe(event.ServerStateChange, cp.queue) - cp.eventBus.Subscribe(event.PeerStateChange, cp.queue) + cp.AutoHandleEvents([]event.Type{event.EncryptedGroupMessage}) +} + +// AutoHandleEvents sets an event (if able) to be handled by this peer +func (cp *cwtchPeer) AutoHandleEvents(events []event.Type) { + for _, ev := range events { + if _, exists := autoHandleableEvents[ev]; exists { + cp.eventBus.Subscribe(ev, cp.queue) + } else { + log.Errorf("Peer asked to autohandle event it cannot: %v\n", ev) + } + } } // ImportGroup intializes a group from an imported source rather than a peer invite -func (cp *cwtchPeer) ImportGroup(exportedInvite string) (groupID string, err error) { +func (cp *cwtchPeer) ImportGroup(exportedInvite string) (err error) { if strings.HasPrefix(exportedInvite, "torv3") { - data, err := base64.StdEncoding.DecodeString(exportedInvite[5+44:]) + data, err := base64.StdEncoding.DecodeString(exportedInvite[5:]) if err == nil { - cpp := &protocol.CwtchPeerPacket{} - err = proto.Unmarshal(data, cpp) - if err == nil { - jsobj, err := proto.Marshal(cpp.GetGroupChatInvite()) - if err == nil { - cp.eventBus.Publish(event.NewEvent(event.NewGroupInvite, map[event.Field]string{ - event.GroupInvite: string(jsobj), - })) - } else { - log.Errorf("error serializing group: %v", err) - } - return cpp.GroupChatInvite.GetGroupName(), nil - } + cp.eventBus.Publish(event.NewEvent(event.NewGroupInvite, map[event.Field]string{ + event.GroupInvite: string(data), + })) + } else { + log.Errorf("error decoding group invite: %v", err) } - } else { - err = errors.New("unsupported exported group type") + return nil } - return + return errors.New("unsupported exported group type") } // ExportGroup serializes a group invite so it can be given offline @@ -125,7 +126,7 @@ func (cp *cwtchPeer) ExportGroup(groupID string) (string, error) { if group != nil { invite, err := group.Invite(group.GetInitialMessage()) if err == nil { - exportedInvite := "torv3" + base64.StdEncoding.EncodeToString(cp.Profile.Ed25519PublicKey) + base64.StdEncoding.EncodeToString(invite) + exportedInvite := "torv3" + base64.StdEncoding.EncodeToString(invite) return exportedInvite, err } } @@ -342,18 +343,18 @@ func (cp *cwtchPeer) eventHandler() { for { ev := cp.queue.Next() switch ev.EventType { + /***** Default auto handled events *****/ + case event.EncryptedGroupMessage: ok, groupID, message, seen := cp.Profile.AttemptDecryption([]byte(ev.Data[event.Ciphertext]), []byte(ev.Data[event.Signature])) if ok && !seen { cp.eventBus.Publish(event.NewEvent(event.NewMessageFromGroup, map[event.Field]string{event.TimestampReceived: message.Received.Format(time.RFC3339Nano), event.TimestampSent: message.Timestamp.Format(time.RFC3339Nano), event.Data: message.Message, event.GroupID: groupID, event.Signature: string(message.Signature), event.PreviousSignature: string(message.PreviousMessageSig), event.RemotePeer: message.PeerID})) } + + /***** Non default but requestable handlable events *****/ + case event.NewGroupInvite: - var groupInvite protocol.GroupChatInvite - err := proto.Unmarshal([]byte(ev.Data[event.GroupInvite]), &groupInvite) - if err != nil { - log.Errorf("NewGroupInvite could not json decode invite: %v\n", err) - } - cp.Profile.ProcessInvite(&groupInvite, ev.Data[event.RemotePeer]) + cp.Profile.ProcessInvite(ev.Data[event.GroupInvite], ev.Data[event.RemotePeer]) case event.PeerStateChange: if _, exists := cp.Profile.Contacts[ev.Data[event.RemotePeer]]; exists { cp.Profile.Contacts[ev.Data[event.RemotePeer]].State = ev.Data[event.ConnectionState] @@ -364,6 +365,7 @@ func (cp *cwtchPeer) eventHandler() { group.State = ev.Data[event.ConnectionState] } } + default: if ev.EventType != "" { log.Errorf("peer event handler received an event it was not subscribed for: %v", ev.EventType) diff --git a/protocol/connections/engine.go b/protocol/connections/engine.go index 7aee63f..d7ff0b3 100644 --- a/protocol/connections/engine.go +++ b/protocol/connections/engine.go @@ -11,7 +11,6 @@ import ( "git.openprivacy.ca/openprivacy/libricochet-go/connectivity" "git.openprivacy.ca/openprivacy/libricochet-go/log" "git.openprivacy.ca/openprivacy/libricochet-go/utils" - "github.com/golang/protobuf/proto" "golang.org/x/crypto/ed25519" "sync" "time" @@ -332,12 +331,7 @@ func (e *engine) sendMessageToGroup(server string, ct []byte, sig []byte) { func (e *engine) handlePeerMessage(hostname string, context string, message []byte) { log.Debugf("New message from peer: %v %v", hostname, context) if context == event.ContextInvite { - cpp := &protocol.CwtchPeerPacket{} - err := proto.Unmarshal(message, cpp) - if err == nil && cpp.GetGroupChatInvite() != nil { - marshal, _ := proto.Marshal(cpp.GetGroupChatInvite()) - e.eventManager.Publish(event.NewEvent(event.NewGroupInvite, map[event.Field]string{event.TimestampReceived: time.Now().Format(time.RFC3339Nano), event.RemotePeer: hostname, event.GroupInvite: string(marshal)})) - } + e.eventManager.Publish(event.NewEvent(event.NewGroupInvite, map[event.Field]string{event.TimestampReceived: time.Now().Format(time.RFC3339Nano), event.RemotePeer: hostname, event.GroupInvite: string(message)})) } else { e.eventManager.Publish(event.NewEvent(event.NewMessageFromPeer, map[event.Field]string{event.TimestampReceived: time.Now().Format(time.RFC3339Nano), event.RemotePeer: hostname, event.Data: string(message)})) } diff --git a/storage/profile_store.go b/storage/profile_store.go index 53a1478..529756c 100644 --- a/storage/profile_store.go +++ b/storage/profile_store.go @@ -3,10 +3,8 @@ package storage import ( "cwtch.im/cwtch/event" "cwtch.im/cwtch/model" - "cwtch.im/cwtch/protocol" "encoding/json" "git.openprivacy.ca/openprivacy/libricochet-go/log" - "github.com/golang/protobuf/proto" "os" "time" ) @@ -212,15 +210,14 @@ func (ps *profileStore) eventHandler() { log.Errorf("error accepting group invite") } case event.NewGroupInvite: - var gci protocol.GroupChatInvite - err := proto.Unmarshal([]byte(ev.Data[event.GroupInvite]), &gci) + gid, err := ps.profile.ProcessInvite(ev.Data[event.GroupInvite], ev.Data[event.RemotePeer]) + log.Errorf("gid: %v err:%v\n", gid, err) if err == nil { - ps.profile.ProcessInvite(&gci, ev.Data[event.RemotePeer]) ps.save() - group := ps.profile.Groups[gci.GetGroupName()] + group := ps.profile.Groups[gid] ps.streamStores[group.GroupID] = NewStreamStore(ps.directory, group.LocalID, ps.password) } else { - log.Errorf("error storing new group invite: %v %v", ev, err) + log.Errorf("error storing new group invite: %v (%v)", err, ev) } case event.NewMessageFromGroup: groupid := ev.Data[event.GroupID] diff --git a/storage/profile_store_test.go b/storage/profile_store_test.go index 9e19908..777f04c 100644 --- a/storage/profile_store_test.go +++ b/storage/profile_store_test.go @@ -2,8 +2,6 @@ package storage import ( "cwtch.im/cwtch/event" - "cwtch.im/cwtch/protocol" - "github.com/golang/protobuf/proto" "os" "testing" "time" @@ -32,9 +30,6 @@ func TestProfileStoreWriteRead(t *testing.T) { t.Errorf("Creating group invite: %v\n", err) } - packet := protocol.CwtchPeerPacket{} - proto.Unmarshal(invite, &packet) - invite, _ = proto.Marshal(packet.GetGroupChatInvite()) eventBus.Publish(event.NewEvent(event.NewGroupInvite, map[event.Field]string{event.TimestampReceived: time.Now().Format(time.RFC3339Nano), event.RemotePeer: ps1.GetProfileCopy(true).Onion, event.GroupInvite: string(invite)})) time.Sleep(1 * time.Second) diff --git a/testing/cwtch_peer_server_integration_test.go b/testing/cwtch_peer_server_integration_test.go index 117d59e..2f690c6 100644 --- a/testing/cwtch_peer_server_integration_test.go +++ b/testing/cwtch_peer_server_integration_test.go @@ -3,6 +3,7 @@ package testing import ( app2 "cwtch.im/cwtch/app" "cwtch.im/cwtch/app/utils" + "cwtch.im/cwtch/event" "cwtch.im/cwtch/event/bridge" "cwtch.im/cwtch/model" "cwtch.im/cwtch/peer" @@ -163,12 +164,15 @@ func TestCwtchPeerIntegration(t *testing.T) { alice := utils.WaitGetPeer(app, "alice") fmt.Println("Alice created:", alice.GetProfile().Onion) + alice.AutoHandleEvents([]event.Type{event.PeerStateChange, event.ServerStateChange, event.NewGroupInvite}) bob := utils.WaitGetPeer(app, "bob") fmt.Println("Bob created:", bob.GetProfile().Onion) + bob.AutoHandleEvents([]event.Type{event.PeerStateChange, event.ServerStateChange, event.NewGroupInvite}) carol := utils.WaitGetPeer(appClient, "carol") fmt.Println("Carol created:", carol.GetProfile().Onion) + carol.AutoHandleEvents([]event.Type{event.PeerStateChange, event.ServerStateChange, event.NewGroupInvite}) app.LaunchPeers() appClient.LaunchPeers()