diff --git a/model/group.go b/model/group.go index 2242d2d..51b38d3 100644 --- a/model/group.go +++ b/model/group.go @@ -33,18 +33,17 @@ const GroupInvitePrefix = "torv3" // tied to a server under a given group key. Each group has a set of Messages. type Group struct { // GroupID is now derived from the GroupKey and the GroupServer - GroupID string - GroupKey [32]byte - GroupServer string - Timeline Timeline `json:"-"` - Accepted bool - IsCompromised bool - Attributes map[string]string - lock sync.Mutex - LocalID string - State string `json:"-"` - UnacknowledgedMessages []Message - Version int + GroupID string + GroupKey [32]byte + GroupServer string + Timeline Timeline `json:"-"` + Accepted bool + IsCompromised bool + Attributes map[string]string + lock sync.Mutex + LocalID string + State string `json:"-"` + Version int } // NewGroup initializes a new group associated with a given CwtchServer @@ -123,7 +122,7 @@ func (g *Group) AddSentMessage(message *groups.DecryptedGroupMessage, sig []byte PreviousMessageSig: message.PreviousMessageSig, ReceivedByServer: false, } - g.UnacknowledgedMessages = append(g.UnacknowledgedMessages, timelineMessage) + g.Timeline.Insert(&timelineMessage) return timelineMessage } @@ -131,35 +130,30 @@ func (g *Group) AddSentMessage(message *groups.DecryptedGroupMessage, sig []byte func (g *Group) ErrorSentMessage(sig []byte, error string) bool { g.lock.Lock() defer g.lock.Unlock() - var message *Message - // Delete the message from the unack'd buffer if it exists - for i, unAckedMessage := range g.UnacknowledgedMessages { - if compareSignatures(unAckedMessage.Signature, sig) { - message = &unAckedMessage - g.UnacknowledgedMessages = append(g.UnacknowledgedMessages[:i], g.UnacknowledgedMessages[i+1:]...) - - message.Error = error - g.Timeline.Insert(message) - return true - } - } - return false + return g.Timeline.SetSendError(sig, error) } -// AddMessage takes a DecryptedGroupMessage and adds it to the Groups Timeline -func (g *Group) AddMessage(message *groups.DecryptedGroupMessage, sig []byte) (*Message, bool) { - +// GetMessage returns the message at index `index` if it exists. Otherwise returns false. +// This routine also returns the length of the timeline +// If go has an optional type this would return Option... +func (g *Group) GetMessage(index int) (bool, Message, int) { g.lock.Lock() defer g.lock.Unlock() - // Delete the message from the unack'd buffer if it exists - for i, unAckedMessage := range g.UnacknowledgedMessages { - if compareSignatures(unAckedMessage.Signature, sig) { - g.UnacknowledgedMessages = append(g.UnacknowledgedMessages[:i], g.UnacknowledgedMessages[i+1:]...) - break - } + length := len(g.Timeline.Messages) + + if len(g.Timeline.Messages) > index { + return true, g.Timeline.Messages[index], length } + return false, Message{}, len(g.Timeline.Messages) +} + +// AddMessage takes a DecryptedGroupMessage and adds it to the Groups Timeline +func (g *Group) AddMessage(message *groups.DecryptedGroupMessage, sig []byte) (*Message, int) { + + g.lock.Lock() + defer g.lock.Unlock() timelineMessage := &Message{ Message: message.Text, @@ -172,16 +166,16 @@ func (g *Group) AddMessage(message *groups.DecryptedGroupMessage, sig []byte) (* Error: "", Acknowledged: true, } - seen := g.Timeline.Insert(timelineMessage) + index := g.Timeline.Insert(timelineMessage) - return timelineMessage, seen + return timelineMessage, index } // GetTimeline provides a safe copy of the timeline func (g *Group) GetTimeline() (timeline []Message) { g.lock.Lock() defer g.lock.Unlock() - return append(g.Timeline.GetMessages(), g.UnacknowledgedMessages...) + return g.Timeline.GetMessages() } //EncryptMessage takes a message and encrypts the message under the group key. diff --git a/model/group_test.go b/model/group_test.go index 904887f..1de73e7 100644 --- a/model/group_test.go +++ b/model/group_test.go @@ -61,18 +61,17 @@ func TestGroupErr(t *testing.T) { func TestGroupValidation(t *testing.T) { group := &Group{ - GroupID: "", - GroupKey: [32]byte{}, - GroupServer: "", - Timeline: Timeline{}, - Accepted: false, - IsCompromised: false, - Attributes: nil, - lock: sync.Mutex{}, - LocalID: "", - State: "", - UnacknowledgedMessages: nil, - Version: 0, + GroupID: "", + GroupKey: [32]byte{}, + GroupServer: "", + Timeline: Timeline{}, + Accepted: false, + IsCompromised: false, + Attributes: nil, + lock: sync.Mutex{}, + LocalID: "", + State: "", + Version: 0, } invite, _ := group.Invite() diff --git a/model/message.go b/model/message.go index fca564c..566545e 100644 --- a/model/message.go +++ b/model/message.go @@ -17,7 +17,7 @@ type Timeline struct { lock sync.Mutex // a cache to allow quick checks for existing messages... - signatureCache map[string]bool + signatureCache map[string]int // a cache to allowing looking up messages by content hash // we need this for features like reply-to message, and other self @@ -90,12 +90,10 @@ func (t *Timeline) GetCopy() *Timeline { // SetMessages sets the Messages of this timeline. Only to be used in loading/initialization func (t *Timeline) SetMessages(messages []Message) { t.lock.Lock() - defer t.lock.Unlock() t.init() - t.Messages = messages - for idx, message := range t.Messages { - t.signatureCache[base64.StdEncoding.EncodeToString(message.Signature)] = true - t.hashCache[t.calculateHash(message)] = append(t.hashCache[t.calculateHash(message)], idx) + t.lock.Unlock() + for _, m := range messages { + t.Insert(&m) } } @@ -179,7 +177,7 @@ func (t *Timeline) Sort() { } // Insert a message into the timeline in a thread safe way. -func (t *Timeline) Insert(mi *Message) bool { +func (t *Timeline) Insert(mi *Message) int { t.lock.Lock() defer t.lock.Unlock() @@ -188,28 +186,44 @@ func (t *Timeline) Insert(mi *Message) bool { // check that we haven't seen this message before (this has no impact on p2p messages, but is essential for // group messages) - _, exists := t.signatureCache[base64.StdEncoding.EncodeToString(mi.Signature)] + idx, exists := t.signatureCache[base64.StdEncoding.EncodeToString(mi.Signature)] if exists { - return true + t.Messages[idx].Acknowledged = true + return idx } // update the message store t.Messages = append(t.Messages, *mi) // add to signature cache for fast checking of group messages... - t.signatureCache[base64.StdEncoding.EncodeToString(mi.Signature)] = true + t.signatureCache[base64.StdEncoding.EncodeToString(mi.Signature)] = len(t.Messages) - 1 // content based addressing index contentHash := t.calculateHash(*mi) t.hashCache[contentHash] = append(t.hashCache[contentHash], len(t.Messages)-1) - return false + return len(t.Messages) - 1 } func (t *Timeline) init() { // only allow this setting once... if t.signatureCache == nil { - t.signatureCache = make(map[string]bool) + t.signatureCache = make(map[string]int) } if t.hashCache == nil { t.hashCache = make(map[string][]int) } } + +// SetSendError marks a message has having some kind of application specific error. +// Note: The message here is indexed by signature. +func (t *Timeline) SetSendError(sig []byte, e string) bool { + t.lock.Lock() + defer t.lock.Unlock() + + idx, exists := t.signatureCache[base64.StdEncoding.EncodeToString(sig)] + if !exists { + return false + } + + t.Messages[idx].Error = e + return true +} diff --git a/model/profile.go b/model/profile.go index c2b918b..03c5dc3 100644 --- a/model/profile.go +++ b/model/profile.go @@ -423,7 +423,7 @@ func (p *Profile) AddGroup(group *Group) { // AttemptDecryption takes a ciphertext and signature and attempts to decrypt it under known groups. // If successful, adds the message to the group's timeline -func (p *Profile) AttemptDecryption(ciphertext []byte, signature []byte) (bool, string, *Message, bool) { +func (p *Profile) AttemptDecryption(ciphertext []byte, signature []byte) (bool, string, *Message, int) { for _, group := range p.Groups { success, dgm := group.DecryptMessage(ciphertext) if success { @@ -434,7 +434,7 @@ func (p *Profile) AttemptDecryption(ciphertext []byte, signature []byte) (bool, // Someone send a message that isn't a valid Decrypted Group Message. Since we require this struct in orer // to verify the message, we simply ignore it. if err != nil { - return false, group.GroupID, nil, false + return false, group.GroupID, nil, -1 } // This now requires knowledge of the Sender, the Onion and the Specific Decrypted Group Message (which should only @@ -460,15 +460,15 @@ func (p *Profile) AttemptDecryption(ciphertext []byte, signature []byte) (bool, // Either way, someone who has the private key is being detectably bad so we are just going to throw this message away and mark the group as Compromised. if !verified { group.Compromised() - return false, group.GroupID, nil, false + return false, group.GroupID, nil, -1 } - message, seen := group.AddMessage(dgm, signature) - return true, group.GroupID, message, seen + message, index := group.AddMessage(dgm, signature) + return true, group.GroupID, message, index } } // If we couldn't find a group to decrypt the message with we just return false. This is an expected case - return false, "", nil, false + return false, "", nil, -1 } func getRandomness(arr *[]byte) { diff --git a/peer/cwtch_peer.go b/peer/cwtch_peer.go index 6bc16f4..d4f8bff 100644 --- a/peer/cwtch_peer.go +++ b/peer/cwtch_peer.go @@ -56,8 +56,8 @@ type cwtchPeer struct { } func (cp *cwtchPeer) SendScopedZonedGetValToContact(handle string, scope attr.Scope, zone attr.Zone, path string) { - event := event.NewEventList(event.SendGetValMessageToPeer, event.RemotePeer, handle, event.Scope, string(scope), event.Path, string(zone.ConstructZonedPath(path))) - cp.eventBus.Publish(event) + ev := event.NewEventList(event.SendGetValMessageToPeer, event.RemotePeer, handle, event.Scope, string(scope), event.Path, string(zone.ConstructZonedPath(path))) + cp.eventBus.Publish(ev) } func (cp *cwtchPeer) GetScopedZonedAttribute(scope attr.Scope, zone attr.Zone, key string) (string, bool) { @@ -90,20 +90,46 @@ func (cp *cwtchPeer) SetScopedZonedAttribute(scope attr.Scope, zone attr.Zone, k // If you try to send a message to a handle that doesn't exist, malformed or an incorrect type then // this function will error func (cp *cwtchPeer) SendMessage(handle string, message string) error { + cp.mutex.Lock() + defer cp.mutex.Unlock() + + var ev event.Event // Group Handles are always 32 bytes in length, but we forgo any further testing here - // and delegate the group existence check to SendMessageToGroupTracked + // and delegate the group existence check to EncryptMessageToGroup if len(handle) == 32 { - _, err := cp.SendMessageToGroupTracked(handle, message) - return err + group := cp.Profile.GetGroup(handle) + if group == nil { + return errors.New("invalid group id") + } + + // Group adds it's own sent message to timeline + ct, sig, err := cp.Profile.EncryptMessageToGroup(message, handle) + + // Group does not exist or some other unrecoverable error... + if err != nil { + return err + } + ev = event.NewEvent(event.SendMessageToGroup, map[event.Field]string{event.GroupID: handle, event.GroupServer: group.GroupServer, event.Ciphertext: base64.StdEncoding.EncodeToString(ct), event.Signature: base64.StdEncoding.EncodeToString(sig)}) } else if tor.IsValidHostname(handle) { // We assume we are sending to a Contact. // (Servers are technically Contacts) - cp.SendMessageToPeer(handle, message) + contact, exists := cp.Profile.GetContact(handle) + ev = event.NewEvent(event.SendMessageToPeer, map[event.Field]string{event.RemotePeer: handle, event.Data: message}) + // If the contact exists replace the event id wih the index of this message in the contacts timeline... + // Otherwise assume we don't log the message in the timeline... + if exists { + ev.EventID = strconv.Itoa(contact.Timeline.Len()) + cp.Profile.AddSentMessageToContactTimeline(handle, message, time.Now(), ev.EventID) + } + // Regardless we publish the send message to peer event for the protocol engine to execute on... // We assume this is always successful as it is always valid to attempt to // Contact a valid hostname - return nil + } else { + return errors.New("malformed handle type") } - return errors.New("malformed handle type") + + cp.eventBus.Publish(ev) + return nil } func (cp *cwtchPeer) UpdateMessageFlags(handle string, mIdx int, flags uint64) { @@ -200,22 +226,12 @@ type SendMessages interface { SendScopedZonedGetValToContact(handle string, scope attr.Scope, zone attr.Zone, key string) - // Deprecated - SendMessageToPeer(string, string) string - // TODO // Deprecated use overlays instead InviteOnionToGroup(string, string) error } -// SendMessagesToGroup enables a caller to sender messages to a group -type SendMessagesToGroup interface { - - // Deprecated - SendMessageToGroupTracked(string, string) (string, error) -} - -// ModifyMessages enables a caller to modify the messages in a timline +// ModifyMessages enables a caller to modify the messages in a timeline type ModifyMessages interface { UpdateMessageFlags(string, int, uint64) } @@ -233,11 +249,14 @@ type CwtchPeer interface { StartServerConnections() Shutdown() - // Relating to local attributes + // Deprecated GetOnion() string + // SetScopedZonedAttribute allows the setting of an attribute by scope and zone // scope.zone.key = value SetScopedZonedAttribute(scope attr.Scope, zone attr.Zone, key string, value string) + + // GetScopedZonedAttribute allows the retrieval of an attribute by scope and zone // scope.zone.key = value GetScopedZonedAttribute(scope attr.Scope, zone attr.Zone, key string) (string, bool) @@ -255,7 +274,6 @@ type CwtchPeer interface { SendMessages ModifyMessages - SendMessagesToGroup ShareFile(fileKey string, serializedManifest string) } @@ -280,7 +298,6 @@ func FromProfile(profile *model.Profile) CwtchPeer { func (cp *cwtchPeer) Init(eventBus event.Manager) { cp.InitForEvents(eventBus, DefaultEventsToHandle) - // Upgrade the Cwtch Peer if necessary // It would be nice to do these checks in the storage engine itself, but it is easier to do them here // rather than duplicating the logic to construct/reconstruct attributes in storage engine... @@ -590,7 +607,7 @@ func (cp *cwtchPeer) InviteOnionToGroup(onion string, groupid string) error { invite, err := group.Invite() cp.mutex.Unlock() if err == nil { - cp.SendMessageToPeer(onion, invite) + err = cp.SendMessage(onion, invite) } return err } @@ -626,45 +643,9 @@ func (cp *cwtchPeer) ResyncServer(onion string) error { return errors.New("no keys found for server connection") } -// SendMessageToGroupTracked attempts to sent the given message to the given group id. -// It returns the signature of the message which can be used to identify it in any UX layer. -func (cp *cwtchPeer) SendMessageToGroupTracked(groupid string, message string) (string, error) { - cp.mutex.Lock() - defer cp.mutex.Unlock() - group := cp.Profile.GetGroup(groupid) - - if group == nil { - return "", errors.New("invalid group id") - } - ct, sig, err := cp.Profile.EncryptMessageToGroup(message, groupid) - - if err == nil { - cp.eventBus.Publish(event.NewEvent(event.SendMessageToGroup, map[event.Field]string{event.GroupID: groupid, event.GroupServer: group.GroupServer, event.Ciphertext: base64.StdEncoding.EncodeToString(ct), event.Signature: base64.StdEncoding.EncodeToString(sig)})) - } - - return base64.StdEncoding.EncodeToString(sig), err -} - -func (cp *cwtchPeer) SendMessageToPeer(onion string, message string) string { - cp.mutex.Lock() - defer cp.mutex.Unlock() - event := event.NewEvent(event.SendMessageToPeer, map[event.Field]string{event.RemotePeer: onion, event.Data: message}) - contact, exists := cp.Profile.GetContact(onion) - - // If the contact exists replace the event id wih the index of this message in the contacts timeline... - // Otherwise assume we don't log the message in the timeline... - if exists { - event.EventID = strconv.Itoa(contact.Timeline.Len()) - cp.Profile.AddSentMessageToContactTimeline(onion, message, time.Now(), event.EventID) - } - // Regardless we publish the send message to peer event for the protocol engine to execute on... - cp.eventBus.Publish(event) - return event.EventID -} - func (cp *cwtchPeer) SendGetValToPeer(onion string, scope string, path string) { - event := event.NewEventList(event.SendGetValMessageToPeer, event.RemotePeer, onion, event.Scope, scope, event.Path, path) - cp.eventBus.Publish(event) + ev := event.NewEventList(event.SendGetValMessageToPeer, event.RemotePeer, onion, event.Scope, scope, event.Path, path) + cp.eventBus.Publish(ev) } // BlockPeer blocks an existing peer relationship. @@ -685,9 +666,9 @@ func (cp *cwtchPeer) AcceptInvite(groupID string) error { return err } cp.eventBus.Publish(event.NewEvent(event.AcceptGroupInvite, map[event.Field]string{event.GroupID: groupID})) - cp.JoinServer(cp.Profile.Groups[groupID].GroupServer) + err = cp.JoinServer(cp.Profile.Groups[groupID].GroupServer) - return nil + return err } // RejectInvite rejects a given group invite. @@ -698,7 +679,7 @@ func (cp *cwtchPeer) RejectInvite(groupID string) { cp.eventBus.Publish(event.NewEvent(event.RejectGroupInvite, map[event.Field]string{event.GroupID: groupID})) } -// Listen makes the peer open a listening port to accept incoming connections (and be detactably online) +// Listen makes the peer open a listening port to accept incoming connections (and be detectably online) func (cp *cwtchPeer) Listen() { cp.mutex.Lock() defer cp.mutex.Unlock() @@ -724,7 +705,10 @@ func (cp *cwtchPeer) StartPeersConnections() { func (cp *cwtchPeer) StartServerConnections() { for _, contact := range cp.GetContacts() { if cp.GetContact(contact).IsServer() { - cp.JoinServer(contact) + err := cp.JoinServer(contact) + if err == nil { + + } } } } @@ -828,10 +812,10 @@ func (cp *cwtchPeer) eventHandler() { cp.SetContactAttribute(ev.Data[event.GroupServer], lastKnownSignature, ev.Data[event.Signature]) cp.mutex.Lock() - ok, groupID, message, seen := cp.Profile.AttemptDecryption(ciphertext, signature) + ok, groupID, message, index := cp.Profile.AttemptDecryption(ciphertext, signature) cp.mutex.Unlock() - if ok && !seen { - cp.eventBus.Publish(event.NewEvent(event.NewMessageFromGroup, map[event.Field]string{event.TimestampReceived: message.Received.Format(time.RFC3339Nano), event.TimestampSent: message.Timestamp.Format(time.RFC3339Nano), event.Data: message.Message, event.GroupID: groupID, event.Signature: base64.StdEncoding.EncodeToString(message.Signature), event.PreviousSignature: base64.StdEncoding.EncodeToString(message.PreviousMessageSig), event.RemotePeer: message.PeerID})) + if ok && index > -1 { + cp.eventBus.Publish(event.NewEvent(event.NewMessageFromGroup, map[event.Field]string{event.TimestampReceived: message.Received.Format(time.RFC3339Nano), event.TimestampSent: message.Timestamp.Format(time.RFC3339Nano), event.Data: message.Message, event.GroupID: groupID, event.Signature: base64.StdEncoding.EncodeToString(message.Signature), event.PreviousSignature: base64.StdEncoding.EncodeToString(message.PreviousMessageSig), event.RemotePeer: message.PeerID, event.Index: strconv.Itoa(index)})) } // The group has been compromised @@ -868,7 +852,10 @@ func (cp *cwtchPeer) eventHandler() { case event.RetryServerRequest: // Automated Join Server Request triggered by a plugin. log.Debugf("profile received an automated retry event for %v", ev.Data[event.GroupServer]) - cp.JoinServer(ev.Data[event.GroupServer]) + err := cp.JoinServer(ev.Data[event.GroupServer]) + if err == nil { + log.Debugf("error joining server... %v", err) + } case event.NewGetValMessageFromPeer: onion := ev.Data[event.RemotePeer] scope := ev.Data[event.Scope] @@ -901,7 +888,7 @@ func (cp *cwtchPeer) eventHandler() { } } - /***** Non default but requestable handlable events *****/ + /***** Non default but requestable handleable events *****/ case event.ManifestReceived: log.Debugf("Manifest Received Event!: %v", ev) diff --git a/storage/profile_store_test.go b/storage/profile_store_test.go index fcec0a3..ed71ad8 100644 --- a/storage/profile_store_test.go +++ b/storage/profile_store_test.go @@ -50,7 +50,7 @@ func TestProfileStoreUpgradeV0toV1(t *testing.T) { fmt.Println("Sending 200 messages...") for i := 0; i < 200; i++ { - ps1.AddGroupMessage(groupid, time.Now().Format(time.RFC3339Nano), time.Now().Format(time.RFC3339Nano), profile.Onion, testMessage) + ps1.AddGroupMessage(groupid, time.Now().Format(time.RFC3339Nano), time.Now().Format(time.RFC3339Nano), profile.Onion, testMessage, []byte{byte(i)}) } fmt.Println("Shutdown v0 profile store...") diff --git a/storage/v0/profile_store.go b/storage/v0/profile_store.go index de7d240..f5aeb9d 100644 --- a/storage/v0/profile_store.go +++ b/storage/v0/profile_store.go @@ -63,10 +63,10 @@ func (ps *ProfileStoreV0) AddGroup(invite string) { } // AddGroupMessage for testing, adds a group message -func (ps *ProfileStoreV0) AddGroupMessage(groupid string, timeSent, timeRecvied string, remotePeer, data string) { +func (ps *ProfileStoreV0) AddGroupMessage(groupid string, timeSent, timeRecvied string, remotePeer, data string, signature []byte) { received, _ := time.Parse(time.RFC3339Nano, timeRecvied) sent, _ := time.Parse(time.RFC3339Nano, timeSent) - message := model.Message{Received: received, Timestamp: sent, Message: data, PeerID: remotePeer, Signature: []byte("signature"), PreviousMessageSig: []byte("PreviousSignature")} + message := model.Message{Received: received, Timestamp: sent, Message: data, PeerID: remotePeer, Signature: signature, PreviousMessageSig: []byte("PreviousSignature")} ss, exists := ps.streamStores[groupid] if exists { ss.Write(message) diff --git a/storage/v0/profile_store_test.go b/storage/v0/profile_store_test.go index 33b1357..cd5db41 100644 --- a/storage/v0/profile_store_test.go +++ b/storage/v0/profile_store_test.go @@ -42,7 +42,7 @@ func TestProfileStoreWriteRead(t *testing.T) { ps1.AddGroup(invite) - ps1.AddGroupMessage(groupid, time.Now().Format(time.RFC3339Nano), time.Now().Format(time.RFC3339Nano), ps1.getProfileCopy(true).Onion, testMessage) + ps1.AddGroupMessage(groupid, time.Now().Format(time.RFC3339Nano), time.Now().Format(time.RFC3339Nano), ps1.getProfileCopy(true).Onion, testMessage, []byte{byte(0x01)}) ps1.Shutdown() diff --git a/storage/v1/profile_store_test.go b/storage/v1/profile_store_test.go index 1fb8133..af7526e 100644 --- a/storage/v1/profile_store_test.go +++ b/storage/v1/profile_store_test.go @@ -5,6 +5,7 @@ package v1 import ( "cwtch.im/cwtch/event" "cwtch.im/cwtch/model" + "encoding/base64" "fmt" "log" "os" @@ -107,6 +108,7 @@ func TestProfileStoreChangePassword(t *testing.T) { event.TimestampReceived: time.Now().Format(time.RFC3339Nano), event.RemotePeer: profile.Onion, event.Data: testMessage, + event.Signature: base64.StdEncoding.EncodeToString([]byte{byte(i)}), })) } @@ -129,6 +131,7 @@ func TestProfileStoreChangePassword(t *testing.T) { event.TimestampReceived: time.Now().Format(time.RFC3339Nano), event.RemotePeer: profile.Onion, event.Data: testMessage, + event.Signature: base64.StdEncoding.EncodeToString([]byte{0x01, byte(i)}), })) } time.Sleep(3 * time.Second) diff --git a/testing/cwtch_peer_server_integration_test.go b/testing/cwtch_peer_server_integration_test.go index 0a7b5f0..ff63524 100644 --- a/testing/cwtch_peer_server_integration_test.go +++ b/testing/cwtch_peer_server_integration_test.go @@ -224,7 +224,9 @@ func TestCwtchPeerIntegration(t *testing.T) { alice.SendScopedZonedGetValToContact(carol.GetOnion(), attr.PublicScope, attr.ProfileZone, constants.Name) carol.SendScopedZonedGetValToContact(alice.GetOnion(), attr.PublicScope, attr.ProfileZone, constants.Name) - time.Sleep(10 * time.Second) + // This used to be 10, but increasing it to 30 because this is now causing frequent issues + // Probably related to latency/throughput problems in the underlying tor network. + time.Sleep(30 * time.Second) aliceName, exists := bob.GetContactAttribute(alice.GetOnion(), attr.GetPeerScope(constants.Name)) if !exists || aliceName != "Alice" { @@ -281,25 +283,25 @@ func TestCwtchPeerIntegration(t *testing.T) { fmt.Println("Starting conversation in group...") // Conversation fmt.Printf("%v> %v\n", aliceName, aliceLines[0]) - _, err = alice.SendMessageToGroupTracked(groupID, aliceLines[0]) + err = alice.SendMessage(groupID, aliceLines[0]) if err != nil { t.Fatalf("Alice failed to send a message to the group: %v", err) } time.Sleep(time.Second * 10) fmt.Printf("%v> %v\n", bobName, bobLines[0]) - _, err = bob.SendMessageToGroupTracked(groupID, bobLines[0]) + err = bob.SendMessage(groupID, bobLines[0]) if err != nil { t.Fatalf("Bob failed to send a message to the group: %v", err) } time.Sleep(time.Second * 10) fmt.Printf("%v> %v\n", aliceName, aliceLines[1]) - alice.SendMessageToGroupTracked(groupID, aliceLines[1]) + alice.SendMessage(groupID, aliceLines[1]) time.Sleep(time.Second * 10) fmt.Printf("%v> %v\n", bobName, bobLines[1]) - bob.SendMessageToGroupTracked(groupID, bobLines[1]) + bob.SendMessage(groupID, bobLines[1]) time.Sleep(time.Second * 10) fmt.Println("Alice inviting Carol to group...") @@ -333,12 +335,12 @@ func TestCwtchPeerIntegration(t *testing.T) { numGoRotinesPostCarolConnect := runtime.NumGoroutine() fmt.Printf("%v> %v", bobName, bobLines[2]) - bob.SendMessageToGroupTracked(groupID, bobLines[2]) + bob.SendMessage(groupID, bobLines[2]) // Bob should have enough tokens so we don't need to account for // token acquisition here... fmt.Printf("%v> %v", carolName, carolLines[0]) - carol.SendMessageToGroupTracked(groupID, carolLines[0]) + carol.SendMessage(groupID, carolLines[0]) time.Sleep(time.Second * 30) // we need to account for spam-based token acquisition, but everything should // be warmed-up and delays should be pretty small. @@ -407,7 +409,7 @@ func TestCwtchPeerIntegration(t *testing.T) { carolGroupTimeline := carolsGroup.GetTimeline() if carolGroupTimeline[0].Message != aliceLines[0] || carolGroupTimeline[1].Message != bobLines[0] || carolGroupTimeline[2].Message != aliceLines[1] || carolGroupTimeline[3].Message != bobLines[1] || - carolGroupTimeline[4].Message != bobLines[2] || carolGroupTimeline[5].Message != carolLines[0] { + carolGroupTimeline[4].Message != carolLines[0] || carolGroupTimeline[5].Message != bobLines[2] { t.Errorf("Some of Carol's timeline messages did not have the expected content!") } }