Browse Source

Groups Cleanup

upgrade
Sarah Jamie Lewis 5 months ago
parent
commit
bea58b5ba4
  1. 2
      app/applets.go
  2. 9
      app/utils/utils.go
  3. 5
      event/eventmanager.go
  4. 31
      model/group.go
  5. 4
      model/message_test.go
  6. 57
      model/profile.go
  7. 16
      model/profile_test.go
  8. 99
      peer/cwtch_peer.go
  9. 5
      protocol/connections/engine.go
  10. 36
      protocol/fuzzing/groups/fuzz.go
  11. 20
      protocol/fuzzing/invites/fuzz.go
  12. 13
      protocol/groups/common.go
  13. 8
      server/app/main.go
  14. 2
      server/server.go
  15. 2
      storage/profile_store_test.go
  16. 4
      storage/v0/profile_store.go
  17. 2
      storage/v0/profile_store_test.go
  18. 6
      storage/v1/profile_store.go
  19. 4
      storage/v1/profile_store_test.go
  20. 47
      testing/cwtch_peer_server_integration_test.go

2
app/applets.go

@ -69,7 +69,7 @@ func (ap *appletPeers) ListPeers() map[string]string {
ap.peerLock.Lock()
defer ap.peerLock.Unlock()
for k, p := range ap.peers {
keys[k] = p.GetName()
keys[k] = p.GetOnion()
}
return keys
}

9
app/utils/utils.go

@ -2,6 +2,7 @@ package utils
import (
app2 "cwtch.im/cwtch/app"
"cwtch.im/cwtch/model/attr"
"cwtch.im/cwtch/peer"
"time"
)
@ -12,9 +13,11 @@ import (
// may fill that usecase better
func WaitGetPeer(app app2.Application, name string) peer.CwtchPeer {
for true {
for id, n := range app.ListPeers() {
if n == name {
return app.GetPeer(id)
for id := range app.ListPeers() {
peer := app.GetPeer(id)
localName, _ := peer.GetAttribute(attr.GetLocalScope("name"))
if localName == name {
return peer
}
}
time.Sleep(100 * time.Millisecond)

5
event/eventmanager.go

@ -13,8 +13,6 @@ import (
"sync"
)
// Event is the core struct type passed around between various subsystems. Events consist of a type which can be
// filtered on, an event ID for tracing and a map of Fields to string values.
type Event struct {
@ -60,7 +58,7 @@ type manager struct {
mapMutex sync.Mutex
internal chan bool
closed bool
trace bool
trace bool
}
// Manager is an interface for an event bus
@ -86,7 +84,6 @@ func (em *manager) initialize() {
em.internal = make(chan bool)
em.closed = false
_, em.trace = os.LookupEnv("CWTCH_EVENT_SOURCE")
go em.eventBus()

31
model/group.go

@ -4,6 +4,7 @@ import (
"crypto/rand"
"cwtch.im/cwtch/model/attr"
"cwtch.im/cwtch/protocol/groups"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
@ -27,9 +28,7 @@ type Group struct {
GroupServer string
Timeline Timeline `json:"-"`
Accepted bool
Owner string
IsCompromised bool
InitialMessage []byte
Attributes map[string]string
lock sync.Mutex
LocalID string
@ -63,7 +62,6 @@ func NewGroup(server string) (*Group, error) {
return nil, err
}
copy(group.GroupKey[:], groupKey[:])
group.Owner = "self"
group.Attributes = make(map[string]string)
// By default we set the "name" of the group to a random string, we can override this later, but to simplify the
// codes around invite, we assume that this is always set.
@ -82,33 +80,24 @@ func (g *Group) Compromised() {
g.IsCompromised = true
}
// GetInitialMessage returns the first message of the group, if one was sent with the invite.
func (g *Group) GetInitialMessage() []byte {
g.lock.Lock()
defer g.lock.Unlock()
return g.InitialMessage
}
// Invite generates a invitation that can be sent to a cwtch peer
func (g *Group) Invite(initialMessage []byte) ([]byte, error) {
func (g *Group) Invite() (string, error) {
if g.SignedGroupID == nil {
return nil, errors.New("group isn't signed")
return "", errors.New("group isn't signed")
}
g.InitialMessage = initialMessage[:]
gci := &groups.GroupInvite{
GroupID: g.GroupID,
GroupName: g.Attributes[attr.GetLocalScope("name")],
SharedKey: g.GroupKey[:],
ServerHost: g.GroupServer,
SignedGroupID: g.SignedGroupID[:],
InitialMessage: initialMessage[:],
GroupID: g.GroupID,
GroupName: g.Attributes[attr.GetLocalScope("name")],
SharedKey: g.GroupKey[:],
ServerHost: g.GroupServer,
SignedGroupID: g.SignedGroupID[:],
}
invite, err := json.Marshal(gci)
return invite, err
serializedInvite := fmt.Sprintf("%v\n", "torv3"+base64.StdEncoding.EncodeToString(invite))
return serializedInvite, err
}
// AddSentMessage takes a DecryptedGroupMessage and adds it to the Groups Timeline

4
model/message_test.go

@ -16,7 +16,7 @@ func TestMessagePadding(t *testing.T) {
gid, invite, _ := alice.StartGroup("2c3kmoobnyghj2zw6pwv7d57yzld753auo3ugauezzpvfak3ahc4bdyd")
sarah.ProcessInvite(string(invite), alice.Onion)
sarah.ProcessInvite(invite)
group := alice.GetGroup(gid)
@ -49,7 +49,7 @@ func TestTranscriptConsistency(t *testing.T) {
gid, invite, _ := alice.StartGroup("2c3kmoobnyghj2zw6pwv7d57yzld753auo3ugauezzpvfak3ahc4bdyd")
sarah.ProcessInvite(string(invite), alice.Onion)
sarah.ProcessInvite(invite)
group := alice.GetGroup(gid)

57
model/profile.go

@ -4,6 +4,7 @@ import (
"crypto/rand"
"cwtch.im/cwtch/protocol/groups"
"encoding/base32"
"encoding/base64"
"encoding/hex"
"encoding/json"
"errors"
@ -333,22 +334,15 @@ func (p *Profile) SignMessage(message string) []byte {
// StartGroup when given a server, creates a new Group under this profile and returns the group id an a precomputed
// invite which can be sent on the wire.
func (p *Profile) StartGroup(server string) (groupID string, invite []byte, err error) {
return p.StartGroupWithMessage(server, []byte{})
}
// StartGroupWithMessage when given a server, and an initial message creates a new Group under this profile and returns the group id an a precomputed
// invite which can be sent on the wire.
func (p *Profile) StartGroupWithMessage(server string, initialMessage []byte) (groupID string, invite []byte, err error) {
func (p *Profile) StartGroup(server string) (groupID string, invite string, err error) {
group, err := NewGroup(server)
if err != nil {
return "", nil, err
return "", "", err
}
groupID = group.GroupID
group.Owner = p.Onion
signedGroupID := p.SignMessage(groupID + server)
group.SignGroup(signedGroupID)
invite, err = group.Invite(initialMessage)
invite, err = group.Invite()
p.lock.Lock()
defer p.lock.Unlock()
p.Groups[group.GroupID] = group
@ -364,33 +358,36 @@ func (p *Profile) GetGroup(groupID string) (g *Group) {
}
// ProcessInvite adds a new group invite to the profile. returns the new group ID
func (p *Profile) ProcessInvite(invite string, peerHostname string) (string, string, error) {
var gci groups.GroupInvite
err := json.Unmarshal([]byte(invite), &gci)
if err == nil {
group := new(Group)
group.Version = CurrentGroupVersion
group.GroupID = gci.GroupID
group.LocalID = GenerateRandomID()
group.SignedGroupID = gci.SignedGroupID
copy(group.GroupKey[:], gci.SharedKey[:])
group.GroupServer = gci.ServerHost
group.InitialMessage = []byte(gci.InitialMessage)
group.Accepted = false
group.Owner = peerHostname
group.Attributes = make(map[string]string)
p.AddGroup(group)
return group.GroupID, gci.GroupName, nil
func (p *Profile) ProcessInvite(invite string) (string, error) {
if strings.HasPrefix(invite, "torv3") {
data, err := base64.StdEncoding.DecodeString(invite[5:])
if err == nil {
var gci groups.GroupInvite
err := json.Unmarshal(data, &gci)
if err == nil {
group := new(Group)
group.Version = CurrentGroupVersion
group.GroupID = gci.GroupID
group.LocalID = GenerateRandomID()
group.SignedGroupID = gci.SignedGroupID
copy(group.GroupKey[:], gci.SharedKey[:])
group.GroupServer = gci.ServerHost
group.Accepted = false
group.Attributes = make(map[string]string)
p.AddGroup(group)
return group.GroupID, nil
}
}
}
return "", "", err
return "", errors.New("unsupported exported group type")
}
// AddGroup is a convenience method for adding a group to a profile.
func (p *Profile) AddGroup(group *Group) {
p.lock.Lock()
defer p.lock.Unlock()
_, exists := p.Groups[group.GroupID]
if !exists {
p.lock.Lock()
defer p.lock.Unlock()
p.Groups[group.GroupID] = group
}
}

16
model/profile_test.go

@ -64,7 +64,7 @@ func TestRejectGroupInvite(t *testing.T) {
alice.AddContact(sarah.Onion, &sarah.PublicProfile)
gid, invite, _ := alice.StartGroup("2c3kmoobnyghj2zw6pwv7d57yzld753auo3ugauezzpvfak3ahc4bdyd")
sarah.ProcessInvite(string(invite), alice.Onion)
sarah.ProcessInvite(invite)
group := alice.GetGroup(gid)
if len(sarah.Groups) == 1 {
if sarah.GetGroup(group.GroupID).Accepted {
@ -85,8 +85,8 @@ func TestProfileGroup(t *testing.T) {
sarah.AddContact(alice.Onion, &alice.PublicProfile)
alice.AddContact(sarah.Onion, &sarah.PublicProfile)
gid, invite, _ := alice.StartGroupWithMessage("2c3kmoobnyghj2zw6pwv7d57yzld753auo3ugauezzpvfak3ahc4bdyd", []byte("Hello World"))
sarah.ProcessInvite(string(invite), alice.Onion)
gid, invite, _ := alice.StartGroup("2c3kmoobnyghj2zw6pwv7d57yzld753auo3ugauezzpvfak3ahc4bdyd")
sarah.ProcessInvite(invite)
if len(sarah.GetGroups()) != 1 {
t.Errorf("sarah should only be in 1 group instead: %v", sarah.GetGroups())
}
@ -97,17 +97,11 @@ func TestProfileGroup(t *testing.T) {
alice.AttemptDecryption(c, s1)
gid2, invite2, _ := alice.StartGroup("2c3kmoobnyghj2zw6pwv7d57yzld753auo3ugauezzpvfak3ahc4bdyd")
sarah.ProcessInvite(string(invite2), alice.Onion)
sarah.ProcessInvite(invite2)
group2 := alice.GetGroup(gid2)
c2, s2, _ := sarah.EncryptMessageToGroup("Hello World", group2.GroupID)
alice.AttemptDecryption(c2, s2)
sarahGroup := sarah.GetGroup(group.GroupID)
im := sarahGroup.GetInitialMessage()
if string(im) != "Hello World" {
t.Errorf("Initial Message was not stored properly: %v", im)
}
_, _, err := sarah.EncryptMessageToGroup(string(make([]byte, MaxGroupMessageLength*2)), group2.GroupID)
if err == nil {
t.Errorf("Overly long message should have returned an error")
@ -115,7 +109,7 @@ func TestProfileGroup(t *testing.T) {
bob := GenerateNewProfile("bob")
bob.AddContact(alice.Onion, &alice.PublicProfile)
bob.ProcessInvite(string(invite2), alice.Onion)
bob.ProcessInvite(invite2)
c3, s3, err := bob.EncryptMessageToGroup("Bobs Message", group2.GroupID)
if err == nil {
ok, _, message, _ := alice.AttemptDecryption(c3, s3)

99
peer/cwtch_peer.go

@ -109,8 +109,8 @@ type ReadGroups interface {
// ModifyGroups provides write-only access add/edit/remove new groups
type ModifyGroups interface {
ImportGroup(string) error
StartGroup(string) (string, []byte, error)
ImportGroup(string) (string, error)
StartGroup(string) (string, string, error)
AcceptInvite(string) error
RejectInvite(string)
DeleteGroup(string)
@ -172,14 +172,6 @@ type CwtchPeer interface {
SendMessages
SendMessagesToGroup
// Deprecated
// TODO Should be removed
GetName() string
SetName(string)
// TODO Should not be exposed...
ProcessInvite(string, string) (string, string, error)
}
// NewCwtchPeer creates and returns a new cwtchPeer with the given name.
@ -223,20 +215,16 @@ func (cp *cwtchPeer) AutoHandleEvents(events []event.Type) {
}
// ImportGroup initializes a group from an imported source rather than a peer invite
func (cp *cwtchPeer) ImportGroup(exportedInvite string) (err error) {
if strings.HasPrefix(exportedInvite, "torv3") {
data, err := base64.StdEncoding.DecodeString(exportedInvite[5:])
if err == nil {
cp.eventBus.Publish(event.NewEvent(event.NewGroupInvite, map[event.Field]string{
event.GroupInvite: string(data),
event.Imported: "true",
}))
} else {
log.Errorf("error decoding group invite: %v", err)
}
return nil
func (cp *cwtchPeer) ImportGroup(exportedInvite string) (string, error) {
cp.mutex.Lock()
defer cp.mutex.Unlock()
gid, err := cp.Profile.ProcessInvite(exportedInvite)
if err == nil {
cp.eventBus.Publish(event.NewEvent(event.NewGroup, map[event.Field]string{event.GroupID: gid, event.GroupInvite: exportedInvite}))
}
return errors.New("unsupported exported group type")
return gid, err
}
// ExportGroup serializes a group invite so it can be given offline
@ -245,24 +233,15 @@ func (cp *cwtchPeer) ExportGroup(groupID string) (string, error) {
defer cp.mutex.Unlock()
group := cp.Profile.GetGroup(groupID)
if group != nil {
invite, err := group.Invite(group.GetInitialMessage())
if err == nil {
exportedInvite := "torv3" + base64.StdEncoding.EncodeToString(invite)
return exportedInvite, err
}
return group.Invite()
}
return "", errors.New("group id could not be found")
}
// StartGroup create a new group linked to the given server and returns the group ID, an invite or an error.
func (cp *cwtchPeer) StartGroup(server string) (string, []byte, error) {
return cp.StartGroupWithMessage(server, []byte{})
}
// StartGroupWithMessage create a new group linked to the given server and returns the group ID, an invite or an error.
func (cp *cwtchPeer) StartGroupWithMessage(server string, initialMessage []byte) (groupID string, invite []byte, err error) {
func (cp *cwtchPeer) StartGroup(server string) (string, string, error) {
cp.mutex.Lock()
groupID, invite, err = cp.Profile.StartGroupWithMessage(server, initialMessage)
groupID, invite, err := cp.Profile.StartGroup(server)
cp.mutex.Unlock()
if err == nil {
group := cp.GetGroup(groupID)
@ -275,7 +254,7 @@ func (cp *cwtchPeer) StartGroupWithMessage(server string, initialMessage []byte)
} else {
log.Errorf("error creating group: %v", err)
}
return
return groupID, invite, err
}
// GetGroups returns an unordered list of all group IDs.
@ -400,18 +379,6 @@ func (cp *cwtchPeer) GetContact(onion string) *model.PublicProfile {
return contact
}
func (cp *cwtchPeer) GetName() string {
cp.mutex.Lock()
defer cp.mutex.Unlock()
return cp.Profile.Name
}
func (cp *cwtchPeer) SetName(newName string) {
cp.mutex.Lock()
defer cp.mutex.Unlock()
cp.Profile.Name = newName
}
func (cp *cwtchPeer) GetOnion() string {
cp.mutex.Lock()
defer cp.mutex.Unlock()
@ -465,14 +432,13 @@ func (cp *cwtchPeer) DeleteGroup(groupID string) {
func (cp *cwtchPeer) InviteOnionToGroup(onion string, groupid string) error {
cp.mutex.Lock()
group := cp.Profile.GetGroup(groupid)
defer cp.mutex.Unlock()
if group == nil {
return errors.New("invalid group id")
}
invite, err := group.Invite(group.InitialMessage)
invite, err := group.Invite()
cp.mutex.Unlock()
if err == nil {
cp.eventBus.Publish(event.NewEvent(event.InvitePeerToGroup, map[event.Field]string{event.RemotePeer: onion, event.GroupInvite: string(invite)}))
cp.SendMessageToPeer(onion, invite)
}
return err
}
@ -537,13 +503,6 @@ func (cp *cwtchPeer) SetContactAuthorization(peer string, authorization model.Au
return err
}
// ProcessInvite adds a new group invite to the profile. returns the new group ID
func (cp *cwtchPeer) ProcessInvite(invite string, remotePeer string) (string, string, error) {
cp.mutex.Lock()
defer cp.mutex.Unlock()
return cp.Profile.ProcessInvite(invite, remotePeer)
}
// AcceptInvite accepts a given existing group invite
func (cp *cwtchPeer) AcceptInvite(groupID string) error {
cp.mutex.Lock()
@ -615,6 +574,11 @@ func (cp *cwtchPeer) GetAttribute(key string) (string, bool) {
if val, exists := cp.Profile.GetAttribute(key); exists {
return val, true
}
if key == attr.GetLocalScope("name") {
return cp.Profile.Name, true
}
return "", false
}
@ -772,23 +736,6 @@ func (cp *cwtchPeer) eventHandler() {
cp.SetContactAttribute(onion, attr.GetPeerScope(path), val)
}
}
case event.NewGroupInvite:
cp.mutex.Lock()
group, groupName, err := cp.Profile.ProcessInvite(ev.Data[event.GroupInvite], ev.Data[event.RemotePeer])
if err == nil {
if ev.Data[event.Imported] == "true" {
cp.Profile.GetGroup(group).Accepted = true
cp.mutex.Unlock() // TODO...seriously need a better way of handling these cases
cp.SetGroupAttribute(group, attr.GetLocalScope("name"), groupName)
err = cp.JoinServer(cp.Profile.GetGroup(group).GroupServer)
cp.mutex.Lock()
if err != nil {
log.Errorf("Joining Server should have worked %v", err)
}
}
cp.eventBus.Publish(event.NewEvent(event.NewGroup, map[event.Field]string{event.GroupID: group}))
}
cp.mutex.Unlock()
case event.PeerStateChange:
cp.mutex.Lock()
if _, exists := cp.Profile.Contacts[ev.Data[event.RemotePeer]]; exists {

5
protocol/connections/engine.go

@ -468,10 +468,7 @@ func (e *engine) sendMessageToGroup(server string, ct []byte, sig []byte) error
func (e *engine) handlePeerMessage(hostname string, eventID string, context string, message []byte) {
log.Debugf("New message from peer: %v %v", hostname, context)
if context == event.ContextInvite {
// TODO: Convert this to an inline peer message...
// e.eventManager.Publish(event.NewEvent(event.NewGroupInvite, map[event.Field]string{event.TimestampReceived: time.Now().Format(time.RFC3339Nano), event.RemotePeer: hostname, event.GroupInvite: string(message)}))
} else if context == event.ContextGetVal {
if context == event.ContextGetVal {
var getVal peerGetVal
err := json.Unmarshal(message, &getVal)
if err == nil {

36
protocol/fuzzing/groups/fuzz.go

@ -1,36 +0,0 @@
package groups
import (
"crypto/rand"
"cwtch.im/cwtch/model"
"golang.org/x/crypto/nacl/secretbox"
"io"
)
// Fuzz various group related functions
func Fuzz(data []byte) int {
profile := model.GenerateNewProfile("fuzz")
inviteid, _, err := profile.ProcessInvite(string(data), profile.Onion)
if err != nil {
if inviteid != "" {
panic("should not have added a group on err")
}
return 1
}
id, _, _ := profile.StartGroup("2c3kmoobnyghj2zw6pwv7d57yzld753auo3ugauezzpvfak3ahc4bdyd")
var nonce [24]byte
io.ReadFull(rand.Reader, nonce[:])
encrypted := secretbox.Seal(nonce[:], data, &nonce, &profile.GetGroup(id).GroupKey)
ok, _, _, _ := profile.AttemptDecryption(encrypted, data)
if ok {
panic("this probably shouldn't happen")
}
ok = profile.VerifyGroupMessage(string(data), string(data), string(data), 0, encrypted, data)
if ok {
panic("this probably shouldn't happen")
}
return 0
}

20
protocol/fuzzing/invites/fuzz.go

@ -1,20 +0,0 @@
package invites
import (
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/peer"
)
// Fuzz import group function
func Fuzz(data []byte) int {
peer := peer.NewCwtchPeer("fuzz")
peer.Init(event.NewEventManager())
err := peer.ImportGroup(string(data))
if err != nil {
if len(peer.GetGroups()) > 0 {
panic("group added despite error")
}
return 0
}
return 1
}

13
protocol/groups/common.go

@ -11,13 +11,12 @@ const CwtchServerSyncedCapability = tapir.Capability("CwtchServerSyncedCapabilit
// GroupInvite provides a structured type for communicating group information to peers
type GroupInvite struct {
GroupID string
GroupName string
SignedGroupID []byte
Timestamp uint64
SharedKey []byte
ServerHost string
InitialMessage []byte
GroupID string
GroupName string
SignedGroupID []byte
Timestamp uint64
SharedKey []byte
ServerHost string
}
// DecryptedGroupMessage is the main encapsulation of group message data

8
server/app/main.go

@ -21,6 +21,7 @@ const (
func main() {
log.AddEverythingFromPattern("server/app/main")
log.AddEverythingFromPattern("server/server")
log.SetLevel(log.LevelDebug)
configDir := os.Getenv("CWTCH_CONFIG_DIR")
if len(os.Args) == 2 && os.Args[1] == "gen1" {
@ -73,14 +74,17 @@ func main() {
// TODO create a random group for testing
group, _ := model.NewGroup(tor.GetTorV3Hostname(serverConfig.PublicKey))
group.SignGroup([]byte{})
invite, err := group.Invite([]byte{})
invite, err := group.Invite()
if err != nil {
panic(err)
}
fmt.Printf("%v\n", "torv3"+base64.StdEncoding.EncodeToString(invite))
fmt.Printf("Invite: %v", invite)
bundle := server.KeyBundle().Serialize()
log.Infof("Server Config: server:%s", base64.StdEncoding.EncodeToString(bundle))
log.Infof("Server Tofu Bundle: tofubundle:server:%s||%s", base64.StdEncoding.EncodeToString(bundle), invite)
server.Run(acn)
for {
time.Sleep(time.Second)

2
server/server.go

@ -39,7 +39,7 @@ type Server struct {
func (s *Server) Setup(serverConfig Config) {
s.config = serverConfig
bs := new(persistence.BoltPersistence)
bs.Open(path.Join(serverConfig.ConfigDir, "tokens.db"))
bs.Open(path.Join(serverConfig.ConfigDir, "tokens1.db"))
s.tokenServer = privacypass.NewTokenServerFromStore(bs)
s.tokenService = s.config.TokenServiceIdentity()
s.tokenServicePrivKey = s.config.TokenServerPrivateKey

2
storage/profile_store_test.go

@ -42,7 +42,7 @@ func TestProfileStoreUpgradeV0toV1(t *testing.T) {
t.Errorf("Creating group invite: %v\n", err)
}
ps1.AddGroup(invite, profile.Onion)
ps1.AddGroup(invite)
fmt.Println("Sending 200 messages...")

4
storage/v0/profile_store.go

@ -53,8 +53,8 @@ func ReadProfile(directory, password string) (*model.Profile, error) {
/********************************************************************************************/
// AddGroup For testing, adds a group to the profile (and starts a stream store)
func (ps *ProfileStoreV0) AddGroup(invite []byte, peer string) {
gid, _, err := ps.profile.ProcessInvite(string(invite), peer)
func (ps *ProfileStoreV0) AddGroup(invite string) {
gid, err := ps.profile.ProcessInvite(invite)
if err == nil {
ps.save()
group := ps.profile.Groups[gid]

2
storage/v0/profile_store_test.go

@ -40,7 +40,7 @@ func TestProfileStoreWriteRead(t *testing.T) {
t.Errorf("Creating group invite: %v\n", err)
}
ps1.AddGroup(invite, profile.Onion)
ps1.AddGroup(invite)
ps1.AddGroupMessage(groupid, time.Now().Format(time.RFC3339Nano), time.Now().Format(time.RFC3339Nano), ps1.getProfileCopy(true).Onion, testMessage)

6
storage/v1/profile_store.go

@ -76,7 +76,7 @@ func (ps *ProfileStoreV1) initProfileWriterStore() {
ps.eventManager.Subscribe(event.SetGroupAttribute, ps.queue)
ps.eventManager.Subscribe(event.AcceptGroupInvite, ps.queue)
ps.eventManager.Subscribe(event.RejectGroupInvite, ps.queue)
ps.eventManager.Subscribe(event.NewGroupInvite, ps.queue)
ps.eventManager.Subscribe(event.NewGroup, ps.queue)
ps.eventManager.Subscribe(event.NewMessageFromGroup, ps.queue)
ps.eventManager.Subscribe(event.SendMessageToPeer, ps.queue)
ps.eventManager.Subscribe(event.PeerAcknowledgement, ps.queue)
@ -377,8 +377,8 @@ func (ps *ProfileStoreV1) eventHandler() {
case event.RejectGroupInvite:
ps.profile.RejectInvite(ev.Data[event.GroupID])
ps.save()
case event.NewGroupInvite:
gid, _, err := ps.profile.ProcessInvite(ev.Data[event.GroupInvite], ev.Data[event.RemotePeer])
case event.NewGroup:
gid, err := ps.profile.ProcessInvite(ev.Data[event.GroupInvite])
if err == nil {
ps.save()
group := ps.profile.Groups[gid]

4
storage/v1/profile_store_test.go

@ -35,7 +35,7 @@ func TestProfileStoreWriteRead(t *testing.T) {
t.Errorf("Creating group invite: %v\n", err)
}
eventBus.Publish(event.NewEvent(event.NewGroupInvite, map[event.Field]string{event.TimestampReceived: time.Now().Format(time.RFC3339Nano), event.RemotePeer: ps1.GetProfileCopy(true).Onion, event.GroupInvite: string(invite)}))
eventBus.Publish(event.NewEvent(event.NewGroup, map[event.Field]string{event.TimestampReceived: time.Now().Format(time.RFC3339Nano), event.RemotePeer: ps1.GetProfileCopy(true).Onion, event.GroupInvite: string(invite)}))
time.Sleep(1 * time.Second)
eventBus.Publish(event.NewEvent(event.NewMessageFromGroup, map[event.Field]string{
@ -89,7 +89,7 @@ func TestProfileStoreChangePassword(t *testing.T) {
t.Errorf("Creating group invite: %v\n", err)
}
eventBus.Publish(event.NewEvent(event.NewGroupInvite, map[event.Field]string{event.TimestampReceived: time.Now().Format(time.RFC3339Nano), event.RemotePeer: ps1.GetProfileCopy(true).Onion, event.GroupInvite: string(invite)}))
eventBus.Publish(event.NewEvent(event.NewGroup, map[event.Field]string{event.TimestampReceived: time.Now().Format(time.RFC3339Nano), event.RemotePeer: ps1.GetProfileCopy(true).Onion, event.GroupInvite: string(invite)}))
time.Sleep(1 * time.Second)
fmt.Println("Sending 200 messages...")

47
testing/cwtch_peer_server_integration_test.go

@ -23,6 +23,7 @@ import (
"path"
"runtime"
"runtime/pprof"
"strings"
"testing"
"time"
)
@ -66,23 +67,25 @@ func serverCheck(t *testing.T, serverAddr string) bool {
}
func waitForPeerGroupConnection(t *testing.T, peer peer.CwtchPeer, groupID string) {
peerName, _ := peer.GetAttribute(attr.GetLocalScope("name"))
for {
fmt.Printf("%v checking group connection...\n", peer.GetName())
fmt.Printf("%v checking group connection...\n", peerName)
state, ok := peer.GetGroupState(groupID)
if ok {
fmt.Printf("Waiting for Peer %v to join group %v - state: %v\n", peer.GetName(), groupID, state)
fmt.Printf("Waiting for Peer %v to join group %v - state: %v\n", peerName, groupID, state)
if state == connections.FAILED {
t.Fatalf("%v could not connect to %v", peer.GetOnion(), groupID)
}
if state != connections.SYNCED {
fmt.Printf("peer %v %v waiting connect to group %v, currently: %v\n", peer.GetName(), peer.GetOnion(), groupID, connections.ConnectionStateName[state])
fmt.Printf("peer %v %v waiting connect to group %v, currently: %v\n", peerName, peer.GetOnion(), groupID, connections.ConnectionStateName[state])
time.Sleep(time.Second * 5)
continue
} else {
fmt.Printf("peer %v %v CONNECTED to group %v\n", peer.GetName(), peer.GetOnion(), groupID)
fmt.Printf("peer %v %v CONNECTED to group %v\n", peerName, peer.GetOnion(), groupID)
break
}
}
time.Sleep(time.Second * 2)
}
return
}
@ -100,7 +103,9 @@ func waitForPeerPeerConnection(t *testing.T, peera peer.CwtchPeer, peerb peer.Cw
time.Sleep(time.Second * 5)
continue
} else {
fmt.Printf("%v CONNECTED and AUTHED to %v\n", peera.GetName(), peerb.GetName())
peerAName, _ := peera.GetAttribute(attr.GetLocalScope("name"))
peerBName, _ := peerb.GetAttribute(attr.GetLocalScope("name"))
fmt.Printf("%v CONNECTED and AUTHED to %v\n", peerAName, peerBName)
break
}
}
@ -300,12 +305,16 @@ func TestCwtchPeerIntegration(t *testing.T) {
time.Sleep(time.Second * 5)
fmt.Println("Bob examining groups and accepting invites...")
for _, groupID := range bob.GetGroups() {
group := bob.GetGroup(groupID)
fmt.Printf("Bob group: %v (Accepted: %v)\n", group.GroupID, group.Accepted)
if group.Accepted == false {
fmt.Printf("Bob received and accepting group invite: %v\n", group.GroupID)
bob.AcceptInvite(group.GroupID)
for _, message := range bob.GetContact(alice.GetOnion()).Timeline.GetMessages() {
fmt.Printf("Found message from Alice: %v", message.Message)
if strings.HasPrefix(message.Message, "torv3") {
gid, err := bob.ImportGroup(message.Message)
if err == nil {
fmt.Printf("Bob found invite...now accepting %v...", gid)
bob.AcceptInvite(gid)
} else {
t.Fatalf("Bob could not accept invite...%v", gid)
}
}
}
@ -347,12 +356,16 @@ func TestCwtchPeerIntegration(t *testing.T) {
}
time.Sleep(time.Second * 60) // Account for some token acquisition in Alice and Bob flows.
fmt.Println("Carol examining groups and accepting invites...")
for _, groupID := range carol.GetGroups() {
group := carol.GetGroup(groupID)
fmt.Printf("Carol group: %v (Accepted: %v)\n", group.GroupID, group.Accepted)
if group.Accepted == false {
fmt.Printf("Carol received and accepting group invite: %v\n", group.GroupID)
carol.AcceptInvite(group.GroupID)
for _, message := range carol.GetContact(alice.GetOnion()).Timeline.GetMessages() {
fmt.Printf("Found message from Alice: %v", message.Message)
if strings.HasPrefix(message.Message, "torv3") {
gid, err := carol.ImportGroup(message.Message)
if err == nil {
fmt.Printf("Carol found invite...now accepting %v...", gid)
carol.AcceptInvite(gid)
} else {
t.Fatalf("Carol could not accept invite...%v", gid)
}
}
}

Loading…
Cancel
Save