Group Refactor Part 1 #399
|
@ -43,7 +43,6 @@ type Group struct {
|
|||
lock sync.Mutex
|
||||
LocalID string
|
||||
State string `json:"-"`
|
||||
UnacknowledgedMessages []Message
|
||||
Version int
|
||||
}
|
||||
|
||||
|
@ -123,7 +122,7 @@ func (g *Group) AddSentMessage(message *groups.DecryptedGroupMessage, sig []byte
|
|||
PreviousMessageSig: message.PreviousMessageSig,
|
||||
ReceivedByServer: false,
|
||||
}
|
||||
g.UnacknowledgedMessages = append(g.UnacknowledgedMessages, timelineMessage)
|
||||
g.Timeline.Insert(&timelineMessage)
|
||||
return timelineMessage
|
||||
}
|
||||
|
||||
|
@ -131,35 +130,30 @@ func (g *Group) AddSentMessage(message *groups.DecryptedGroupMessage, sig []byte
|
|||
func (g *Group) ErrorSentMessage(sig []byte, error string) bool {
|
||||
g.lock.Lock()
|
||||
defer g.lock.Unlock()
|
||||
var message *Message
|
||||
|
||||
// Delete the message from the unack'd buffer if it exists
|
||||
for i, unAckedMessage := range g.UnacknowledgedMessages {
|
||||
if compareSignatures(unAckedMessage.Signature, sig) {
|
||||
message = &unAckedMessage
|
||||
g.UnacknowledgedMessages = append(g.UnacknowledgedMessages[:i], g.UnacknowledgedMessages[i+1:]...)
|
||||
|
||||
message.Error = error
|
||||
g.Timeline.Insert(message)
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
return g.Timeline.SetSendError(sig, error)
|
||||
}
|
||||
|
||||
// AddMessage takes a DecryptedGroupMessage and adds it to the Groups Timeline
|
||||
func (g *Group) AddMessage(message *groups.DecryptedGroupMessage, sig []byte) (*Message, bool) {
|
||||
|
||||
// GetMessage returns the message at index `index` if it exists. Otherwise returns false.
|
||||
// This routine also returns the length of the timeline
|
||||
// If go has an optional type this would return Option<Message>...
|
||||
func (g *Group) GetMessage(index int) (bool, Message, int) {
|
||||
g.lock.Lock()
|
||||
defer g.lock.Unlock()
|
||||
|
||||
// Delete the message from the unack'd buffer if it exists
|
||||
for i, unAckedMessage := range g.UnacknowledgedMessages {
|
||||
if compareSignatures(unAckedMessage.Signature, sig) {
|
||||
g.UnacknowledgedMessages = append(g.UnacknowledgedMessages[:i], g.UnacknowledgedMessages[i+1:]...)
|
||||
break
|
||||
}
|
||||
length := len(g.Timeline.Messages)
|
||||
|
||||
if length > index {
|
||||
return true, g.Timeline.Messages[index], length
|
||||
}
|
||||
return false, Message{}, length
|
||||
}
|
||||
|
||||
// AddMessage takes a DecryptedGroupMessage and adds it to the Groups Timeline
|
||||
func (g *Group) AddMessage(message *groups.DecryptedGroupMessage, sig []byte) (*Message, int) {
|
||||
|
||||
g.lock.Lock()
|
||||
defer g.lock.Unlock()
|
||||
|
||||
timelineMessage := &Message{
|
||||
Message: message.Text,
|
||||
|
@ -172,16 +166,16 @@ func (g *Group) AddMessage(message *groups.DecryptedGroupMessage, sig []byte) (*
|
|||
Error: "",
|
||||
Acknowledged: true,
|
||||
}
|
||||
seen := g.Timeline.Insert(timelineMessage)
|
||||
index := g.Timeline.Insert(timelineMessage)
|
||||
|
||||
return timelineMessage, seen
|
||||
return timelineMessage, index
|
||||
}
|
||||
|
||||
// GetTimeline provides a safe copy of the timeline
|
||||
func (g *Group) GetTimeline() (timeline []Message) {
|
||||
g.lock.Lock()
|
||||
defer g.lock.Unlock()
|
||||
return append(g.Timeline.GetMessages(), g.UnacknowledgedMessages...)
|
||||
return g.Timeline.GetMessages()
|
||||
}
|
||||
|
||||
//EncryptMessage takes a message and encrypts the message under the group key.
|
||||
|
|
|
@ -71,7 +71,6 @@ func TestGroupValidation(t *testing.T) {
|
|||
lock: sync.Mutex{},
|
||||
LocalID: "",
|
||||
State: "",
|
||||
UnacknowledgedMessages: nil,
|
||||
Version: 0,
|
||||
}
|
||||
|
||||
|
|
|
@ -17,7 +17,7 @@ type Timeline struct {
|
|||
lock sync.Mutex
|
||||
|
||||
// a cache to allow quick checks for existing messages...
|
||||
signatureCache map[string]bool
|
||||
signatureCache map[string]int
|
||||
|
||||
// a cache to allowing looking up messages by content hash
|
||||
// we need this for features like reply-to message, and other self
|
||||
|
@ -90,12 +90,10 @@ func (t *Timeline) GetCopy() *Timeline {
|
|||
// SetMessages sets the Messages of this timeline. Only to be used in loading/initialization
|
||||
func (t *Timeline) SetMessages(messages []Message) {
|
||||
t.lock.Lock()
|
||||
defer t.lock.Unlock()
|
||||
t.init()
|
||||
t.Messages = messages
|
||||
for idx, message := range t.Messages {
|
||||
t.signatureCache[base64.StdEncoding.EncodeToString(message.Signature)] = true
|
||||
t.hashCache[t.calculateHash(message)] = append(t.hashCache[t.calculateHash(message)], idx)
|
||||
t.lock.Unlock()
|
||||
for _, m := range messages {
|
||||
t.Insert(&m)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -179,7 +177,7 @@ func (t *Timeline) Sort() {
|
|||
}
|
||||
|
||||
// Insert a message into the timeline in a thread safe way.
|
||||
func (t *Timeline) Insert(mi *Message) bool {
|
||||
func (t *Timeline) Insert(mi *Message) int {
|
||||
t.lock.Lock()
|
||||
defer t.lock.Unlock()
|
||||
|
||||
|
@ -188,28 +186,44 @@ func (t *Timeline) Insert(mi *Message) bool {
|
|||
|
||||
// check that we haven't seen this message before (this has no impact on p2p messages, but is essential for
|
||||
// group messages)
|
||||
_, exists := t.signatureCache[base64.StdEncoding.EncodeToString(mi.Signature)]
|
||||
idx, exists := t.signatureCache[base64.StdEncoding.EncodeToString(mi.Signature)]
|
||||
if exists {
|
||||
return true
|
||||
t.Messages[idx].Acknowledged = true
|
||||
return idx
|
||||
}
|
||||
|
||||
// update the message store
|
||||
t.Messages = append(t.Messages, *mi)
|
||||
// add to signature cache for fast checking of group messages...
|
||||
t.signatureCache[base64.StdEncoding.EncodeToString(mi.Signature)] = true
|
||||
t.signatureCache[base64.StdEncoding.EncodeToString(mi.Signature)] = len(t.Messages) - 1
|
||||
// content based addressing index
|
||||
contentHash := t.calculateHash(*mi)
|
||||
t.hashCache[contentHash] = append(t.hashCache[contentHash], len(t.Messages)-1)
|
||||
return false
|
||||
return len(t.Messages) - 1
|
||||
}
|
||||
|
||||
func (t *Timeline) init() {
|
||||
// only allow this setting once...
|
||||
if t.signatureCache == nil {
|
||||
t.signatureCache = make(map[string]bool)
|
||||
t.signatureCache = make(map[string]int)
|
||||
}
|
||||
|
||||
if t.hashCache == nil {
|
||||
t.hashCache = make(map[string][]int)
|
||||
}
|
||||
}
|
||||
|
||||
// SetSendError marks a message has having some kind of application specific error.
|
||||
// Note: The message here is indexed by signature.
|
||||
func (t *Timeline) SetSendError(sig []byte, e string) bool {
|
||||
t.lock.Lock()
|
||||
defer t.lock.Unlock()
|
||||
|
||||
idx, exists := t.signatureCache[base64.StdEncoding.EncodeToString(sig)]
|
||||
if !exists {
|
||||
return false
|
||||
}
|
||||
|
||||
t.Messages[idx].Error = e
|
||||
return true
|
||||
}
|
||||
|
|
|
@ -423,7 +423,7 @@ func (p *Profile) AddGroup(group *Group) {
|
|||
|
||||
// AttemptDecryption takes a ciphertext and signature and attempts to decrypt it under known groups.
|
||||
// If successful, adds the message to the group's timeline
|
||||
func (p *Profile) AttemptDecryption(ciphertext []byte, signature []byte) (bool, string, *Message, bool) {
|
||||
func (p *Profile) AttemptDecryption(ciphertext []byte, signature []byte) (bool, string, *Message, int) {
|
||||
for _, group := range p.Groups {
|
||||
success, dgm := group.DecryptMessage(ciphertext)
|
||||
if success {
|
||||
|
@ -434,7 +434,7 @@ func (p *Profile) AttemptDecryption(ciphertext []byte, signature []byte) (bool,
|
|||
// Someone send a message that isn't a valid Decrypted Group Message. Since we require this struct in orer
|
||||
// to verify the message, we simply ignore it.
|
||||
if err != nil {
|
||||
return false, group.GroupID, nil, false
|
||||
return false, group.GroupID, nil, -1
|
||||
}
|
||||
|
||||
// This now requires knowledge of the Sender, the Onion and the Specific Decrypted Group Message (which should only
|
||||
|
@ -460,15 +460,15 @@ func (p *Profile) AttemptDecryption(ciphertext []byte, signature []byte) (bool,
|
|||
// Either way, someone who has the private key is being detectably bad so we are just going to throw this message away and mark the group as Compromised.
|
||||
if !verified {
|
||||
group.Compromised()
|
||||
return false, group.GroupID, nil, false
|
||||
return false, group.GroupID, nil, -1
|
||||
}
|
||||
message, seen := group.AddMessage(dgm, signature)
|
||||
return true, group.GroupID, message, seen
|
||||
message, index := group.AddMessage(dgm, signature)
|
||||
return true, group.GroupID, message, index
|
||||
}
|
||||
}
|
||||
|
||||
// If we couldn't find a group to decrypt the message with we just return false. This is an expected case
|
||||
return false, "", nil, false
|
||||
return false, "", nil, -1
|
||||
}
|
||||
|
||||
func getRandomness(arr *[]byte) {
|
||||
|
|
|
@ -56,8 +56,8 @@ type cwtchPeer struct {
|
|||
}
|
||||
|
||||
func (cp *cwtchPeer) SendScopedZonedGetValToContact(handle string, scope attr.Scope, zone attr.Zone, path string) {
|
||||
event := event.NewEventList(event.SendGetValMessageToPeer, event.RemotePeer, handle, event.Scope, string(scope), event.Path, string(zone.ConstructZonedPath(path)))
|
||||
cp.eventBus.Publish(event)
|
||||
ev := event.NewEventList(event.SendGetValMessageToPeer, event.RemotePeer, handle, event.Scope, string(scope), event.Path, string(zone.ConstructZonedPath(path)))
|
||||
cp.eventBus.Publish(ev)
|
||||
}
|
||||
|
||||
func (cp *cwtchPeer) GetScopedZonedAttribute(scope attr.Scope, zone attr.Zone, key string) (string, bool) {
|
||||
|
@ -90,20 +90,46 @@ func (cp *cwtchPeer) SetScopedZonedAttribute(scope attr.Scope, zone attr.Zone, k
|
|||
// If you try to send a message to a handle that doesn't exist, malformed or an incorrect type then
|
||||
// this function will error
|
||||
func (cp *cwtchPeer) SendMessage(handle string, message string) error {
|
||||
cp.mutex.Lock()
|
||||
defer cp.mutex.Unlock()
|
||||
|
||||
var ev event.Event
|
||||
// Group Handles are always 32 bytes in length, but we forgo any further testing here
|
||||
// and delegate the group existence check to SendMessageToGroupTracked
|
||||
// and delegate the group existence check to EncryptMessageToGroup
|
||||
if len(handle) == 32 {
|
||||
_, err := cp.SendMessageToGroupTracked(handle, message)
|
||||
group := cp.Profile.GetGroup(handle)
|
||||
if group == nil {
|
||||
return errors.New("invalid group id")
|
||||
}
|
||||
|
||||
// Group adds it's own sent message to timeline
|
||||
ct, sig, err := cp.Profile.EncryptMessageToGroup(message, handle)
|
||||
|
||||
// Group does not exist or some other unrecoverable error...
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
ev = event.NewEvent(event.SendMessageToGroup, map[event.Field]string{event.GroupID: handle, event.GroupServer: group.GroupServer, event.Ciphertext: base64.StdEncoding.EncodeToString(ct), event.Signature: base64.StdEncoding.EncodeToString(sig)})
|
||||
} else if tor.IsValidHostname(handle) {
|
||||
// We assume we are sending to a Contact.
|
||||
// (Servers are technically Contacts)
|
||||
cp.SendMessageToPeer(handle, message)
|
||||
contact, exists := cp.Profile.GetContact(handle)
|
||||
ev = event.NewEvent(event.SendMessageToPeer, map[event.Field]string{event.RemotePeer: handle, event.Data: message})
|
||||
// If the contact exists replace the event id wih the index of this message in the contacts timeline...
|
||||
// Otherwise assume we don't log the message in the timeline...
|
||||
if exists {
|
||||
ev.EventID = strconv.Itoa(contact.Timeline.Len())
|
||||
cp.Profile.AddSentMessageToContactTimeline(handle, message, time.Now(), ev.EventID)
|
||||
}
|
||||
// Regardless we publish the send message to peer event for the protocol engine to execute on...
|
||||
// We assume this is always successful as it is always valid to attempt to
|
||||
// Contact a valid hostname
|
||||
return nil
|
||||
}
|
||||
} else {
|
||||
return errors.New("malformed handle type")
|
||||
}
|
||||
|
||||
cp.eventBus.Publish(ev)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cp *cwtchPeer) UpdateMessageFlags(handle string, mIdx int, flags uint64) {
|
||||
|
@ -200,22 +226,12 @@ type SendMessages interface {
|
|||
|
||||
SendScopedZonedGetValToContact(handle string, scope attr.Scope, zone attr.Zone, key string)
|
||||
|
||||
// Deprecated
|
||||
SendMessageToPeer(string, string) string
|
||||
|
||||
// TODO
|
||||
// Deprecated use overlays instead
|
||||
InviteOnionToGroup(string, string) error
|
||||
}
|
||||
|
||||
// SendMessagesToGroup enables a caller to sender messages to a group
|
||||
type SendMessagesToGroup interface {
|
||||
|
||||
// Deprecated
|
||||
SendMessageToGroupTracked(string, string) (string, error)
|
||||
}
|
||||
|
||||
// ModifyMessages enables a caller to modify the messages in a timline
|
||||
// ModifyMessages enables a caller to modify the messages in a timeline
|
||||
type ModifyMessages interface {
|
||||
UpdateMessageFlags(string, int, uint64)
|
||||
}
|
||||
|
@ -233,11 +249,17 @@ type CwtchPeer interface {
|
|||
StartServerConnections()
|
||||
Shutdown()
|
||||
|
||||
// Relating to local attributes
|
||||
// GetOnion is deprecated. If you find yourself needing to rely on this method it is time
|
||||
// to consider replacing this with a GetAddress(es) function that can fully expand cwtch beyond the boundaries
|
||||
// of tor v3 onion services.
|
||||
// Deprecated
|
||||
GetOnion() string
|
||||
|
||||
// SetScopedZonedAttribute allows the setting of an attribute by scope and zone
|
||||
// scope.zone.key = value
|
||||
SetScopedZonedAttribute(scope attr.Scope, zone attr.Zone, key string, value string)
|
||||
|
||||
// GetScopedZonedAttribute allows the retrieval of an attribute by scope and zone
|
||||
// scope.zone.key = value
|
||||
GetScopedZonedAttribute(scope attr.Scope, zone attr.Zone, key string) (string, bool)
|
||||
|
||||
|
@ -255,7 +277,6 @@ type CwtchPeer interface {
|
|||
|
||||
SendMessages
|
||||
ModifyMessages
|
||||
SendMessagesToGroup
|
||||
|
||||
ShareFile(fileKey string, serializedManifest string)
|
||||
}
|
||||
|
@ -280,7 +301,6 @@ func FromProfile(profile *model.Profile) CwtchPeer {
|
|||
func (cp *cwtchPeer) Init(eventBus event.Manager) {
|
||||
cp.InitForEvents(eventBus, DefaultEventsToHandle)
|
||||
|
||||
|
||||
// Upgrade the Cwtch Peer if necessary
|
||||
// It would be nice to do these checks in the storage engine itself, but it is easier to do them here
|
||||
// rather than duplicating the logic to construct/reconstruct attributes in storage engine...
|
||||
|
@ -590,7 +610,7 @@ func (cp *cwtchPeer) InviteOnionToGroup(onion string, groupid string) error {
|
|||
invite, err := group.Invite()
|
||||
cp.mutex.Unlock()
|
||||
if err == nil {
|
||||
cp.SendMessageToPeer(onion, invite)
|
||||
err = cp.SendMessage(onion, invite)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
@ -626,45 +646,9 @@ func (cp *cwtchPeer) ResyncServer(onion string) error {
|
|||
return errors.New("no keys found for server connection")
|
||||
}
|
||||
|
||||
// SendMessageToGroupTracked attempts to sent the given message to the given group id.
|
||||
// It returns the signature of the message which can be used to identify it in any UX layer.
|
||||
func (cp *cwtchPeer) SendMessageToGroupTracked(groupid string, message string) (string, error) {
|
||||
cp.mutex.Lock()
|
||||
defer cp.mutex.Unlock()
|
||||
group := cp.Profile.GetGroup(groupid)
|
||||
|
||||
if group == nil {
|
||||
return "", errors.New("invalid group id")
|
||||
}
|
||||
ct, sig, err := cp.Profile.EncryptMessageToGroup(message, groupid)
|
||||
|
||||
if err == nil {
|
||||
cp.eventBus.Publish(event.NewEvent(event.SendMessageToGroup, map[event.Field]string{event.GroupID: groupid, event.GroupServer: group.GroupServer, event.Ciphertext: base64.StdEncoding.EncodeToString(ct), event.Signature: base64.StdEncoding.EncodeToString(sig)}))
|
||||
}
|
||||
|
||||
return base64.StdEncoding.EncodeToString(sig), err
|
||||
}
|
||||
|
||||
func (cp *cwtchPeer) SendMessageToPeer(onion string, message string) string {
|
||||
cp.mutex.Lock()
|
||||
defer cp.mutex.Unlock()
|
||||
event := event.NewEvent(event.SendMessageToPeer, map[event.Field]string{event.RemotePeer: onion, event.Data: message})
|
||||
contact, exists := cp.Profile.GetContact(onion)
|
||||
|
||||
// If the contact exists replace the event id wih the index of this message in the contacts timeline...
|
||||
// Otherwise assume we don't log the message in the timeline...
|
||||
if exists {
|
||||
event.EventID = strconv.Itoa(contact.Timeline.Len())
|
||||
cp.Profile.AddSentMessageToContactTimeline(onion, message, time.Now(), event.EventID)
|
||||
}
|
||||
// Regardless we publish the send message to peer event for the protocol engine to execute on...
|
||||
cp.eventBus.Publish(event)
|
||||
return event.EventID
|
||||
}
|
||||
|
||||
func (cp *cwtchPeer) SendGetValToPeer(onion string, scope string, path string) {
|
||||
event := event.NewEventList(event.SendGetValMessageToPeer, event.RemotePeer, onion, event.Scope, scope, event.Path, path)
|
||||
cp.eventBus.Publish(event)
|
||||
ev := event.NewEventList(event.SendGetValMessageToPeer, event.RemotePeer, onion, event.Scope, scope, event.Path, path)
|
||||
cp.eventBus.Publish(ev)
|
||||
}
|
||||
|
||||
// BlockPeer blocks an existing peer relationship.
|
||||
|
@ -685,9 +669,9 @@ func (cp *cwtchPeer) AcceptInvite(groupID string) error {
|
|||
return err
|
||||
}
|
||||
cp.eventBus.Publish(event.NewEvent(event.AcceptGroupInvite, map[event.Field]string{event.GroupID: groupID}))
|
||||
cp.JoinServer(cp.Profile.Groups[groupID].GroupServer)
|
||||
err = cp.JoinServer(cp.Profile.Groups[groupID].GroupServer)
|
||||
|
||||
return nil
|
||||
return err
|
||||
}
|
||||
|
||||
// RejectInvite rejects a given group invite.
|
||||
|
@ -698,7 +682,7 @@ func (cp *cwtchPeer) RejectInvite(groupID string) {
|
|||
cp.eventBus.Publish(event.NewEvent(event.RejectGroupInvite, map[event.Field]string{event.GroupID: groupID}))
|
||||
}
|
||||
|
||||
// Listen makes the peer open a listening port to accept incoming connections (and be detactably online)
|
||||
// Listen makes the peer open a listening port to accept incoming connections (and be detectably online)
|
||||
func (cp *cwtchPeer) Listen() {
|
||||
cp.mutex.Lock()
|
||||
defer cp.mutex.Unlock()
|
||||
|
@ -724,7 +708,11 @@ func (cp *cwtchPeer) StartPeersConnections() {
|
|||
func (cp *cwtchPeer) StartServerConnections() {
|
||||
for _, contact := range cp.GetContacts() {
|
||||
if cp.GetContact(contact).IsServer() {
|
||||
cp.JoinServer(contact)
|
||||
err := cp.JoinServer(contact)
|
||||
if err != nil {
|
||||
// Almost certainly a programming error so print it..
|
||||
log.Errorf("error joining server %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -828,10 +816,10 @@ func (cp *cwtchPeer) eventHandler() {
|
|||
cp.SetContactAttribute(ev.Data[event.GroupServer], lastKnownSignature, ev.Data[event.Signature])
|
||||
|
||||
cp.mutex.Lock()
|
||||
ok, groupID, message, seen := cp.Profile.AttemptDecryption(ciphertext, signature)
|
||||
ok, groupID, message, index := cp.Profile.AttemptDecryption(ciphertext, signature)
|
||||
cp.mutex.Unlock()
|
||||
if ok && !seen {
|
||||
cp.eventBus.Publish(event.NewEvent(event.NewMessageFromGroup, map[event.Field]string{event.TimestampReceived: message.Received.Format(time.RFC3339Nano), event.TimestampSent: message.Timestamp.Format(time.RFC3339Nano), event.Data: message.Message, event.GroupID: groupID, event.Signature: base64.StdEncoding.EncodeToString(message.Signature), event.PreviousSignature: base64.StdEncoding.EncodeToString(message.PreviousMessageSig), event.RemotePeer: message.PeerID}))
|
||||
if ok && index > -1 {
|
||||
cp.eventBus.Publish(event.NewEvent(event.NewMessageFromGroup, map[event.Field]string{event.TimestampReceived: message.Received.Format(time.RFC3339Nano), event.TimestampSent: message.Timestamp.Format(time.RFC3339Nano), event.Data: message.Message, event.GroupID: groupID, event.Signature: base64.StdEncoding.EncodeToString(message.Signature), event.PreviousSignature: base64.StdEncoding.EncodeToString(message.PreviousMessageSig), event.RemotePeer: message.PeerID, event.Index: strconv.Itoa(index)}))
|
||||
}
|
||||
|
||||
// The group has been compromised
|
||||
|
@ -868,7 +856,10 @@ func (cp *cwtchPeer) eventHandler() {
|
|||
case event.RetryServerRequest:
|
||||
// Automated Join Server Request triggered by a plugin.
|
||||
log.Debugf("profile received an automated retry event for %v", ev.Data[event.GroupServer])
|
||||
cp.JoinServer(ev.Data[event.GroupServer])
|
||||
err := cp.JoinServer(ev.Data[event.GroupServer])
|
||||
if err != nil {
|
||||
log.Errorf("error joining server... %v", err)
|
||||
}
|
||||
case event.NewGetValMessageFromPeer:
|
||||
onion := ev.Data[event.RemotePeer]
|
||||
scope := ev.Data[event.Scope]
|
||||
|
@ -901,7 +892,7 @@ func (cp *cwtchPeer) eventHandler() {
|
|||
}
|
||||
}
|
||||
|
||||
/***** Non default but requestable handlable events *****/
|
||||
/***** Non default but requestable handleable events *****/
|
||||
|
||||
case event.ManifestReceived:
|
||||
log.Debugf("Manifest Received Event!: %v", ev)
|
||||
|
|
|
@ -50,7 +50,7 @@ func TestProfileStoreUpgradeV0toV1(t *testing.T) {
|
|||
fmt.Println("Sending 200 messages...")
|
||||
|
||||
for i := 0; i < 200; i++ {
|
||||
ps1.AddGroupMessage(groupid, time.Now().Format(time.RFC3339Nano), time.Now().Format(time.RFC3339Nano), profile.Onion, testMessage)
|
||||
ps1.AddGroupMessage(groupid, time.Now().Format(time.RFC3339Nano), time.Now().Format(time.RFC3339Nano), profile.Onion, testMessage, []byte{byte(i)})
|
||||
}
|
||||
|
||||
fmt.Println("Shutdown v0 profile store...")
|
||||
|
|
|
@ -63,10 +63,10 @@ func (ps *ProfileStoreV0) AddGroup(invite string) {
|
|||
}
|
||||
|
||||
// AddGroupMessage for testing, adds a group message
|
||||
func (ps *ProfileStoreV0) AddGroupMessage(groupid string, timeSent, timeRecvied string, remotePeer, data string) {
|
||||
func (ps *ProfileStoreV0) AddGroupMessage(groupid string, timeSent, timeRecvied string, remotePeer, data string, signature []byte) {
|
||||
received, _ := time.Parse(time.RFC3339Nano, timeRecvied)
|
||||
sent, _ := time.Parse(time.RFC3339Nano, timeSent)
|
||||
message := model.Message{Received: received, Timestamp: sent, Message: data, PeerID: remotePeer, Signature: []byte("signature"), PreviousMessageSig: []byte("PreviousSignature")}
|
||||
message := model.Message{Received: received, Timestamp: sent, Message: data, PeerID: remotePeer, Signature: signature, PreviousMessageSig: []byte("PreviousSignature")}
|
||||
ss, exists := ps.streamStores[groupid]
|
||||
if exists {
|
||||
ss.Write(message)
|
||||
|
|
|
@ -42,7 +42,7 @@ func TestProfileStoreWriteRead(t *testing.T) {
|
|||
|
||||
ps1.AddGroup(invite)
|
||||
|
||||
ps1.AddGroupMessage(groupid, time.Now().Format(time.RFC3339Nano), time.Now().Format(time.RFC3339Nano), ps1.getProfileCopy(true).Onion, testMessage)
|
||||
ps1.AddGroupMessage(groupid, time.Now().Format(time.RFC3339Nano), time.Now().Format(time.RFC3339Nano), ps1.getProfileCopy(true).Onion, testMessage, []byte{byte(0x01)})
|
||||
|
||||
ps1.Shutdown()
|
||||
|
||||
|
|
|
@ -5,6 +5,7 @@ package v1
|
|||
import (
|
||||
"cwtch.im/cwtch/event"
|
||||
"cwtch.im/cwtch/model"
|
||||
"encoding/base64"
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
|
@ -107,6 +108,7 @@ func TestProfileStoreChangePassword(t *testing.T) {
|
|||
event.TimestampReceived: time.Now().Format(time.RFC3339Nano),
|
||||
event.RemotePeer: profile.Onion,
|
||||
event.Data: testMessage,
|
||||
event.Signature: base64.StdEncoding.EncodeToString([]byte{byte(i)}),
|
||||
}))
|
||||
}
|
||||
|
||||
|
@ -129,6 +131,7 @@ func TestProfileStoreChangePassword(t *testing.T) {
|
|||
event.TimestampReceived: time.Now().Format(time.RFC3339Nano),
|
||||
event.RemotePeer: profile.Onion,
|
||||
event.Data: testMessage,
|
||||
event.Signature: base64.StdEncoding.EncodeToString([]byte{0x01, byte(i)}),
|
||||
}))
|
||||
}
|
||||
time.Sleep(3 * time.Second)
|
||||
|
|
|
@ -224,7 +224,9 @@ func TestCwtchPeerIntegration(t *testing.T) {
|
|||
alice.SendScopedZonedGetValToContact(carol.GetOnion(), attr.PublicScope, attr.ProfileZone, constants.Name)
|
||||
carol.SendScopedZonedGetValToContact(alice.GetOnion(), attr.PublicScope, attr.ProfileZone, constants.Name)
|
||||
|
||||
time.Sleep(10 * time.Second)
|
||||
// This used to be 10, but increasing it to 30 because this is now causing frequent issues
|
||||
// Probably related to latency/throughput problems in the underlying tor network.
|
||||
time.Sleep(30 * time.Second)
|
||||
|
||||
aliceName, exists := bob.GetContactAttribute(alice.GetOnion(), attr.GetPeerScope(constants.Name))
|
||||
if !exists || aliceName != "Alice" {
|
||||
|
@ -281,25 +283,25 @@ func TestCwtchPeerIntegration(t *testing.T) {
|
|||
fmt.Println("Starting conversation in group...")
|
||||
// Conversation
|
||||
fmt.Printf("%v> %v\n", aliceName, aliceLines[0])
|
||||
_, err = alice.SendMessageToGroupTracked(groupID, aliceLines[0])
|
||||
err = alice.SendMessage(groupID, aliceLines[0])
|
||||
if err != nil {
|
||||
t.Fatalf("Alice failed to send a message to the group: %v", err)
|
||||
}
|
||||
time.Sleep(time.Second * 10)
|
||||
|
||||
fmt.Printf("%v> %v\n", bobName, bobLines[0])
|
||||
_, err = bob.SendMessageToGroupTracked(groupID, bobLines[0])
|
||||
err = bob.SendMessage(groupID, bobLines[0])
|
||||
if err != nil {
|
||||
t.Fatalf("Bob failed to send a message to the group: %v", err)
|
||||
}
|
||||
time.Sleep(time.Second * 10)
|
||||
|
||||
fmt.Printf("%v> %v\n", aliceName, aliceLines[1])
|
||||
alice.SendMessageToGroupTracked(groupID, aliceLines[1])
|
||||
alice.SendMessage(groupID, aliceLines[1])
|
||||
time.Sleep(time.Second * 10)
|
||||
|
||||
fmt.Printf("%v> %v\n", bobName, bobLines[1])
|
||||
bob.SendMessageToGroupTracked(groupID, bobLines[1])
|
||||
bob.SendMessage(groupID, bobLines[1])
|
||||
time.Sleep(time.Second * 10)
|
||||
|
||||
fmt.Println("Alice inviting Carol to group...")
|
||||
|
@ -333,12 +335,12 @@ func TestCwtchPeerIntegration(t *testing.T) {
|
|||
numGoRotinesPostCarolConnect := runtime.NumGoroutine()
|
||||
|
||||
fmt.Printf("%v> %v", bobName, bobLines[2])
|
||||
bob.SendMessageToGroupTracked(groupID, bobLines[2])
|
||||
bob.SendMessage(groupID, bobLines[2])
|
||||
// Bob should have enough tokens so we don't need to account for
|
||||
// token acquisition here...
|
||||
|
||||
fmt.Printf("%v> %v", carolName, carolLines[0])
|
||||
carol.SendMessageToGroupTracked(groupID, carolLines[0])
|
||||
carol.SendMessage(groupID, carolLines[0])
|
||||
time.Sleep(time.Second * 30) // we need to account for spam-based token acquisition, but everything should
|
||||
// be warmed-up and delays should be pretty small.
|
||||
|
||||
|
@ -407,7 +409,7 @@ func TestCwtchPeerIntegration(t *testing.T) {
|
|||
carolGroupTimeline := carolsGroup.GetTimeline()
|
||||
if carolGroupTimeline[0].Message != aliceLines[0] || carolGroupTimeline[1].Message != bobLines[0] ||
|
||||
carolGroupTimeline[2].Message != aliceLines[1] || carolGroupTimeline[3].Message != bobLines[1] ||
|
||||
carolGroupTimeline[4].Message != bobLines[2] || carolGroupTimeline[5].Message != carolLines[0] {
|
||||
carolGroupTimeline[4].Message != carolLines[0] || carolGroupTimeline[5].Message != bobLines[2] {
|
||||
t.Errorf("Some of Carol's timeline messages did not have the expected content!")
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue