Group Refactor Part 1
Remove SendMessage* calls in place of a unified interface Remove Unack*Messages from Group and Store everything in the timeline
This commit is contained in:
parent
3cc839cd45
commit
3d2cafd1de
|
@ -33,18 +33,17 @@ const GroupInvitePrefix = "torv3"
|
||||||
// tied to a server under a given group key. Each group has a set of Messages.
|
// tied to a server under a given group key. Each group has a set of Messages.
|
||||||
type Group struct {
|
type Group struct {
|
||||||
// GroupID is now derived from the GroupKey and the GroupServer
|
// GroupID is now derived from the GroupKey and the GroupServer
|
||||||
GroupID string
|
GroupID string
|
||||||
GroupKey [32]byte
|
GroupKey [32]byte
|
||||||
GroupServer string
|
GroupServer string
|
||||||
Timeline Timeline `json:"-"`
|
Timeline Timeline `json:"-"`
|
||||||
Accepted bool
|
Accepted bool
|
||||||
IsCompromised bool
|
IsCompromised bool
|
||||||
Attributes map[string]string
|
Attributes map[string]string
|
||||||
lock sync.Mutex
|
lock sync.Mutex
|
||||||
LocalID string
|
LocalID string
|
||||||
State string `json:"-"`
|
State string `json:"-"`
|
||||||
UnacknowledgedMessages []Message
|
Version int
|
||||||
Version int
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewGroup initializes a new group associated with a given CwtchServer
|
// NewGroup initializes a new group associated with a given CwtchServer
|
||||||
|
@ -123,7 +122,7 @@ func (g *Group) AddSentMessage(message *groups.DecryptedGroupMessage, sig []byte
|
||||||
PreviousMessageSig: message.PreviousMessageSig,
|
PreviousMessageSig: message.PreviousMessageSig,
|
||||||
ReceivedByServer: false,
|
ReceivedByServer: false,
|
||||||
}
|
}
|
||||||
g.UnacknowledgedMessages = append(g.UnacknowledgedMessages, timelineMessage)
|
g.Timeline.Insert(&timelineMessage)
|
||||||
return timelineMessage
|
return timelineMessage
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -131,35 +130,30 @@ func (g *Group) AddSentMessage(message *groups.DecryptedGroupMessage, sig []byte
|
||||||
func (g *Group) ErrorSentMessage(sig []byte, error string) bool {
|
func (g *Group) ErrorSentMessage(sig []byte, error string) bool {
|
||||||
g.lock.Lock()
|
g.lock.Lock()
|
||||||
defer g.lock.Unlock()
|
defer g.lock.Unlock()
|
||||||
var message *Message
|
|
||||||
|
|
||||||
// Delete the message from the unack'd buffer if it exists
|
return g.Timeline.SetSendError(sig, error)
|
||||||
for i, unAckedMessage := range g.UnacknowledgedMessages {
|
|
||||||
if compareSignatures(unAckedMessage.Signature, sig) {
|
|
||||||
message = &unAckedMessage
|
|
||||||
g.UnacknowledgedMessages = append(g.UnacknowledgedMessages[:i], g.UnacknowledgedMessages[i+1:]...)
|
|
||||||
|
|
||||||
message.Error = error
|
|
||||||
g.Timeline.Insert(message)
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return false
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// AddMessage takes a DecryptedGroupMessage and adds it to the Groups Timeline
|
// GetMessage returns the message at index `index` if it exists. Otherwise returns false.
|
||||||
func (g *Group) AddMessage(message *groups.DecryptedGroupMessage, sig []byte) (*Message, bool) {
|
// This routine also returns the length of the timeline
|
||||||
|
// If go has an optional type this would return Option<Message>...
|
||||||
|
func (g *Group) GetMessage(index int) (bool, Message, int) {
|
||||||
g.lock.Lock()
|
g.lock.Lock()
|
||||||
defer g.lock.Unlock()
|
defer g.lock.Unlock()
|
||||||
|
|
||||||
// Delete the message from the unack'd buffer if it exists
|
length := len(g.Timeline.Messages)
|
||||||
for i, unAckedMessage := range g.UnacknowledgedMessages {
|
|
||||||
if compareSignatures(unAckedMessage.Signature, sig) {
|
if len(g.Timeline.Messages) > index {
|
||||||
g.UnacknowledgedMessages = append(g.UnacknowledgedMessages[:i], g.UnacknowledgedMessages[i+1:]...)
|
return true, g.Timeline.Messages[index], length
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
return false, Message{}, len(g.Timeline.Messages)
|
||||||
|
}
|
||||||
|
|
||||||
|
// AddMessage takes a DecryptedGroupMessage and adds it to the Groups Timeline
|
||||||
|
func (g *Group) AddMessage(message *groups.DecryptedGroupMessage, sig []byte) (*Message, int) {
|
||||||
|
|
||||||
|
g.lock.Lock()
|
||||||
|
defer g.lock.Unlock()
|
||||||
|
|
||||||
timelineMessage := &Message{
|
timelineMessage := &Message{
|
||||||
Message: message.Text,
|
Message: message.Text,
|
||||||
|
@ -172,16 +166,16 @@ func (g *Group) AddMessage(message *groups.DecryptedGroupMessage, sig []byte) (*
|
||||||
Error: "",
|
Error: "",
|
||||||
Acknowledged: true,
|
Acknowledged: true,
|
||||||
}
|
}
|
||||||
seen := g.Timeline.Insert(timelineMessage)
|
index := g.Timeline.Insert(timelineMessage)
|
||||||
|
|
||||||
return timelineMessage, seen
|
return timelineMessage, index
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetTimeline provides a safe copy of the timeline
|
// GetTimeline provides a safe copy of the timeline
|
||||||
func (g *Group) GetTimeline() (timeline []Message) {
|
func (g *Group) GetTimeline() (timeline []Message) {
|
||||||
g.lock.Lock()
|
g.lock.Lock()
|
||||||
defer g.lock.Unlock()
|
defer g.lock.Unlock()
|
||||||
return append(g.Timeline.GetMessages(), g.UnacknowledgedMessages...)
|
return g.Timeline.GetMessages()
|
||||||
}
|
}
|
||||||
|
|
||||||
//EncryptMessage takes a message and encrypts the message under the group key.
|
//EncryptMessage takes a message and encrypts the message under the group key.
|
||||||
|
|
|
@ -61,18 +61,17 @@ func TestGroupErr(t *testing.T) {
|
||||||
func TestGroupValidation(t *testing.T) {
|
func TestGroupValidation(t *testing.T) {
|
||||||
|
|
||||||
group := &Group{
|
group := &Group{
|
||||||
GroupID: "",
|
GroupID: "",
|
||||||
GroupKey: [32]byte{},
|
GroupKey: [32]byte{},
|
||||||
GroupServer: "",
|
GroupServer: "",
|
||||||
Timeline: Timeline{},
|
Timeline: Timeline{},
|
||||||
Accepted: false,
|
Accepted: false,
|
||||||
IsCompromised: false,
|
IsCompromised: false,
|
||||||
Attributes: nil,
|
Attributes: nil,
|
||||||
lock: sync.Mutex{},
|
lock: sync.Mutex{},
|
||||||
LocalID: "",
|
LocalID: "",
|
||||||
State: "",
|
State: "",
|
||||||
UnacknowledgedMessages: nil,
|
Version: 0,
|
||||||
Version: 0,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
invite, _ := group.Invite()
|
invite, _ := group.Invite()
|
||||||
|
|
|
@ -17,7 +17,7 @@ type Timeline struct {
|
||||||
lock sync.Mutex
|
lock sync.Mutex
|
||||||
|
|
||||||
// a cache to allow quick checks for existing messages...
|
// a cache to allow quick checks for existing messages...
|
||||||
signatureCache map[string]bool
|
signatureCache map[string]int
|
||||||
|
|
||||||
// a cache to allowing looking up messages by content hash
|
// a cache to allowing looking up messages by content hash
|
||||||
// we need this for features like reply-to message, and other self
|
// we need this for features like reply-to message, and other self
|
||||||
|
@ -90,12 +90,10 @@ func (t *Timeline) GetCopy() *Timeline {
|
||||||
// SetMessages sets the Messages of this timeline. Only to be used in loading/initialization
|
// SetMessages sets the Messages of this timeline. Only to be used in loading/initialization
|
||||||
func (t *Timeline) SetMessages(messages []Message) {
|
func (t *Timeline) SetMessages(messages []Message) {
|
||||||
t.lock.Lock()
|
t.lock.Lock()
|
||||||
defer t.lock.Unlock()
|
|
||||||
t.init()
|
t.init()
|
||||||
t.Messages = messages
|
t.lock.Unlock()
|
||||||
for idx, message := range t.Messages {
|
for _, m := range messages {
|
||||||
t.signatureCache[base64.StdEncoding.EncodeToString(message.Signature)] = true
|
t.Insert(&m)
|
||||||
t.hashCache[t.calculateHash(message)] = append(t.hashCache[t.calculateHash(message)], idx)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -179,7 +177,7 @@ func (t *Timeline) Sort() {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Insert a message into the timeline in a thread safe way.
|
// Insert a message into the timeline in a thread safe way.
|
||||||
func (t *Timeline) Insert(mi *Message) bool {
|
func (t *Timeline) Insert(mi *Message) int {
|
||||||
t.lock.Lock()
|
t.lock.Lock()
|
||||||
defer t.lock.Unlock()
|
defer t.lock.Unlock()
|
||||||
|
|
||||||
|
@ -188,28 +186,44 @@ func (t *Timeline) Insert(mi *Message) bool {
|
||||||
|
|
||||||
// check that we haven't seen this message before (this has no impact on p2p messages, but is essential for
|
// check that we haven't seen this message before (this has no impact on p2p messages, but is essential for
|
||||||
// group messages)
|
// group messages)
|
||||||
_, exists := t.signatureCache[base64.StdEncoding.EncodeToString(mi.Signature)]
|
idx, exists := t.signatureCache[base64.StdEncoding.EncodeToString(mi.Signature)]
|
||||||
if exists {
|
if exists {
|
||||||
return true
|
t.Messages[idx].Acknowledged = true
|
||||||
|
return idx
|
||||||
}
|
}
|
||||||
|
|
||||||
// update the message store
|
// update the message store
|
||||||
t.Messages = append(t.Messages, *mi)
|
t.Messages = append(t.Messages, *mi)
|
||||||
// add to signature cache for fast checking of group messages...
|
// add to signature cache for fast checking of group messages...
|
||||||
t.signatureCache[base64.StdEncoding.EncodeToString(mi.Signature)] = true
|
t.signatureCache[base64.StdEncoding.EncodeToString(mi.Signature)] = len(t.Messages) - 1
|
||||||
// content based addressing index
|
// content based addressing index
|
||||||
contentHash := t.calculateHash(*mi)
|
contentHash := t.calculateHash(*mi)
|
||||||
t.hashCache[contentHash] = append(t.hashCache[contentHash], len(t.Messages)-1)
|
t.hashCache[contentHash] = append(t.hashCache[contentHash], len(t.Messages)-1)
|
||||||
return false
|
return len(t.Messages) - 1
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *Timeline) init() {
|
func (t *Timeline) init() {
|
||||||
// only allow this setting once...
|
// only allow this setting once...
|
||||||
if t.signatureCache == nil {
|
if t.signatureCache == nil {
|
||||||
t.signatureCache = make(map[string]bool)
|
t.signatureCache = make(map[string]int)
|
||||||
}
|
}
|
||||||
|
|
||||||
if t.hashCache == nil {
|
if t.hashCache == nil {
|
||||||
t.hashCache = make(map[string][]int)
|
t.hashCache = make(map[string][]int)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SetSendError marks a message has having some kind of application specific error.
|
||||||
|
// Note: The message here is indexed by signature.
|
||||||
|
func (t *Timeline) SetSendError(sig []byte, e string) bool {
|
||||||
|
t.lock.Lock()
|
||||||
|
defer t.lock.Unlock()
|
||||||
|
|
||||||
|
idx, exists := t.signatureCache[base64.StdEncoding.EncodeToString(sig)]
|
||||||
|
if !exists {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Messages[idx].Error = e
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
|
@ -423,7 +423,7 @@ func (p *Profile) AddGroup(group *Group) {
|
||||||
|
|
||||||
// AttemptDecryption takes a ciphertext and signature and attempts to decrypt it under known groups.
|
// AttemptDecryption takes a ciphertext and signature and attempts to decrypt it under known groups.
|
||||||
// If successful, adds the message to the group's timeline
|
// If successful, adds the message to the group's timeline
|
||||||
func (p *Profile) AttemptDecryption(ciphertext []byte, signature []byte) (bool, string, *Message, bool) {
|
func (p *Profile) AttemptDecryption(ciphertext []byte, signature []byte) (bool, string, *Message, int) {
|
||||||
for _, group := range p.Groups {
|
for _, group := range p.Groups {
|
||||||
success, dgm := group.DecryptMessage(ciphertext)
|
success, dgm := group.DecryptMessage(ciphertext)
|
||||||
if success {
|
if success {
|
||||||
|
@ -434,7 +434,7 @@ func (p *Profile) AttemptDecryption(ciphertext []byte, signature []byte) (bool,
|
||||||
// Someone send a message that isn't a valid Decrypted Group Message. Since we require this struct in orer
|
// Someone send a message that isn't a valid Decrypted Group Message. Since we require this struct in orer
|
||||||
// to verify the message, we simply ignore it.
|
// to verify the message, we simply ignore it.
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, group.GroupID, nil, false
|
return false, group.GroupID, nil, -1
|
||||||
}
|
}
|
||||||
|
|
||||||
// This now requires knowledge of the Sender, the Onion and the Specific Decrypted Group Message (which should only
|
// This now requires knowledge of the Sender, the Onion and the Specific Decrypted Group Message (which should only
|
||||||
|
@ -460,15 +460,15 @@ func (p *Profile) AttemptDecryption(ciphertext []byte, signature []byte) (bool,
|
||||||
// Either way, someone who has the private key is being detectably bad so we are just going to throw this message away and mark the group as Compromised.
|
// Either way, someone who has the private key is being detectably bad so we are just going to throw this message away and mark the group as Compromised.
|
||||||
if !verified {
|
if !verified {
|
||||||
group.Compromised()
|
group.Compromised()
|
||||||
return false, group.GroupID, nil, false
|
return false, group.GroupID, nil, -1
|
||||||
}
|
}
|
||||||
message, seen := group.AddMessage(dgm, signature)
|
message, index := group.AddMessage(dgm, signature)
|
||||||
return true, group.GroupID, message, seen
|
return true, group.GroupID, message, index
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// If we couldn't find a group to decrypt the message with we just return false. This is an expected case
|
// If we couldn't find a group to decrypt the message with we just return false. This is an expected case
|
||||||
return false, "", nil, false
|
return false, "", nil, -1
|
||||||
}
|
}
|
||||||
|
|
||||||
func getRandomness(arr *[]byte) {
|
func getRandomness(arr *[]byte) {
|
||||||
|
|
|
@ -56,8 +56,8 @@ type cwtchPeer struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cp *cwtchPeer) SendScopedZonedGetValToContact(handle string, scope attr.Scope, zone attr.Zone, path string) {
|
func (cp *cwtchPeer) SendScopedZonedGetValToContact(handle string, scope attr.Scope, zone attr.Zone, path string) {
|
||||||
event := event.NewEventList(event.SendGetValMessageToPeer, event.RemotePeer, handle, event.Scope, string(scope), event.Path, string(zone.ConstructZonedPath(path)))
|
ev := event.NewEventList(event.SendGetValMessageToPeer, event.RemotePeer, handle, event.Scope, string(scope), event.Path, string(zone.ConstructZonedPath(path)))
|
||||||
cp.eventBus.Publish(event)
|
cp.eventBus.Publish(ev)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cp *cwtchPeer) GetScopedZonedAttribute(scope attr.Scope, zone attr.Zone, key string) (string, bool) {
|
func (cp *cwtchPeer) GetScopedZonedAttribute(scope attr.Scope, zone attr.Zone, key string) (string, bool) {
|
||||||
|
@ -90,20 +90,46 @@ func (cp *cwtchPeer) SetScopedZonedAttribute(scope attr.Scope, zone attr.Zone, k
|
||||||
// If you try to send a message to a handle that doesn't exist, malformed or an incorrect type then
|
// If you try to send a message to a handle that doesn't exist, malformed or an incorrect type then
|
||||||
// this function will error
|
// this function will error
|
||||||
func (cp *cwtchPeer) SendMessage(handle string, message string) error {
|
func (cp *cwtchPeer) SendMessage(handle string, message string) error {
|
||||||
|
cp.mutex.Lock()
|
||||||
|
defer cp.mutex.Unlock()
|
||||||
|
|
||||||
|
var ev event.Event
|
||||||
// Group Handles are always 32 bytes in length, but we forgo any further testing here
|
// Group Handles are always 32 bytes in length, but we forgo any further testing here
|
||||||
// and delegate the group existence check to SendMessageToGroupTracked
|
// and delegate the group existence check to EncryptMessageToGroup
|
||||||
if len(handle) == 32 {
|
if len(handle) == 32 {
|
||||||
_, err := cp.SendMessageToGroupTracked(handle, message)
|
group := cp.Profile.GetGroup(handle)
|
||||||
return err
|
if group == nil {
|
||||||
|
return errors.New("invalid group id")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Group adds it's own sent message to timeline
|
||||||
|
ct, sig, err := cp.Profile.EncryptMessageToGroup(message, handle)
|
||||||
|
|
||||||
|
// Group does not exist or some other unrecoverable error...
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
ev = event.NewEvent(event.SendMessageToGroup, map[event.Field]string{event.GroupID: handle, event.GroupServer: group.GroupServer, event.Ciphertext: base64.StdEncoding.EncodeToString(ct), event.Signature: base64.StdEncoding.EncodeToString(sig)})
|
||||||
} else if tor.IsValidHostname(handle) {
|
} else if tor.IsValidHostname(handle) {
|
||||||
// We assume we are sending to a Contact.
|
// We assume we are sending to a Contact.
|
||||||
// (Servers are technically Contacts)
|
// (Servers are technically Contacts)
|
||||||
cp.SendMessageToPeer(handle, message)
|
contact, exists := cp.Profile.GetContact(handle)
|
||||||
|
ev = event.NewEvent(event.SendMessageToPeer, map[event.Field]string{event.RemotePeer: handle, event.Data: message})
|
||||||
|
// If the contact exists replace the event id wih the index of this message in the contacts timeline...
|
||||||
|
// Otherwise assume we don't log the message in the timeline...
|
||||||
|
if exists {
|
||||||
|
ev.EventID = strconv.Itoa(contact.Timeline.Len())
|
||||||
|
cp.Profile.AddSentMessageToContactTimeline(handle, message, time.Now(), ev.EventID)
|
||||||
|
}
|
||||||
|
// Regardless we publish the send message to peer event for the protocol engine to execute on...
|
||||||
// We assume this is always successful as it is always valid to attempt to
|
// We assume this is always successful as it is always valid to attempt to
|
||||||
// Contact a valid hostname
|
// Contact a valid hostname
|
||||||
return nil
|
} else {
|
||||||
|
return errors.New("malformed handle type")
|
||||||
}
|
}
|
||||||
return errors.New("malformed handle type")
|
|
||||||
|
cp.eventBus.Publish(ev)
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cp *cwtchPeer) UpdateMessageFlags(handle string, mIdx int, flags uint64) {
|
func (cp *cwtchPeer) UpdateMessageFlags(handle string, mIdx int, flags uint64) {
|
||||||
|
@ -200,22 +226,12 @@ type SendMessages interface {
|
||||||
|
|
||||||
SendScopedZonedGetValToContact(handle string, scope attr.Scope, zone attr.Zone, key string)
|
SendScopedZonedGetValToContact(handle string, scope attr.Scope, zone attr.Zone, key string)
|
||||||
|
|
||||||
// Deprecated
|
|
||||||
SendMessageToPeer(string, string) string
|
|
||||||
|
|
||||||
// TODO
|
// TODO
|
||||||
// Deprecated use overlays instead
|
// Deprecated use overlays instead
|
||||||
InviteOnionToGroup(string, string) error
|
InviteOnionToGroup(string, string) error
|
||||||
}
|
}
|
||||||
|
|
||||||
// SendMessagesToGroup enables a caller to sender messages to a group
|
// ModifyMessages enables a caller to modify the messages in a timeline
|
||||||
type SendMessagesToGroup interface {
|
|
||||||
|
|
||||||
// Deprecated
|
|
||||||
SendMessageToGroupTracked(string, string) (string, error)
|
|
||||||
}
|
|
||||||
|
|
||||||
// ModifyMessages enables a caller to modify the messages in a timline
|
|
||||||
type ModifyMessages interface {
|
type ModifyMessages interface {
|
||||||
UpdateMessageFlags(string, int, uint64)
|
UpdateMessageFlags(string, int, uint64)
|
||||||
}
|
}
|
||||||
|
@ -233,11 +249,14 @@ type CwtchPeer interface {
|
||||||
StartServerConnections()
|
StartServerConnections()
|
||||||
Shutdown()
|
Shutdown()
|
||||||
|
|
||||||
// Relating to local attributes
|
// Deprecated
|
||||||
GetOnion() string
|
GetOnion() string
|
||||||
|
|
||||||
|
// SetScopedZonedAttribute allows the setting of an attribute by scope and zone
|
||||||
// scope.zone.key = value
|
// scope.zone.key = value
|
||||||
SetScopedZonedAttribute(scope attr.Scope, zone attr.Zone, key string, value string)
|
SetScopedZonedAttribute(scope attr.Scope, zone attr.Zone, key string, value string)
|
||||||
|
|
||||||
|
// GetScopedZonedAttribute allows the retrieval of an attribute by scope and zone
|
||||||
// scope.zone.key = value
|
// scope.zone.key = value
|
||||||
GetScopedZonedAttribute(scope attr.Scope, zone attr.Zone, key string) (string, bool)
|
GetScopedZonedAttribute(scope attr.Scope, zone attr.Zone, key string) (string, bool)
|
||||||
|
|
||||||
|
@ -255,7 +274,6 @@ type CwtchPeer interface {
|
||||||
|
|
||||||
SendMessages
|
SendMessages
|
||||||
ModifyMessages
|
ModifyMessages
|
||||||
SendMessagesToGroup
|
|
||||||
|
|
||||||
ShareFile(fileKey string, serializedManifest string)
|
ShareFile(fileKey string, serializedManifest string)
|
||||||
}
|
}
|
||||||
|
@ -280,7 +298,6 @@ func FromProfile(profile *model.Profile) CwtchPeer {
|
||||||
func (cp *cwtchPeer) Init(eventBus event.Manager) {
|
func (cp *cwtchPeer) Init(eventBus event.Manager) {
|
||||||
cp.InitForEvents(eventBus, DefaultEventsToHandle)
|
cp.InitForEvents(eventBus, DefaultEventsToHandle)
|
||||||
|
|
||||||
|
|
||||||
// Upgrade the Cwtch Peer if necessary
|
// Upgrade the Cwtch Peer if necessary
|
||||||
// It would be nice to do these checks in the storage engine itself, but it is easier to do them here
|
// It would be nice to do these checks in the storage engine itself, but it is easier to do them here
|
||||||
// rather than duplicating the logic to construct/reconstruct attributes in storage engine...
|
// rather than duplicating the logic to construct/reconstruct attributes in storage engine...
|
||||||
|
@ -590,7 +607,7 @@ func (cp *cwtchPeer) InviteOnionToGroup(onion string, groupid string) error {
|
||||||
invite, err := group.Invite()
|
invite, err := group.Invite()
|
||||||
cp.mutex.Unlock()
|
cp.mutex.Unlock()
|
||||||
if err == nil {
|
if err == nil {
|
||||||
cp.SendMessageToPeer(onion, invite)
|
err = cp.SendMessage(onion, invite)
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -626,45 +643,9 @@ func (cp *cwtchPeer) ResyncServer(onion string) error {
|
||||||
return errors.New("no keys found for server connection")
|
return errors.New("no keys found for server connection")
|
||||||
}
|
}
|
||||||
|
|
||||||
// SendMessageToGroupTracked attempts to sent the given message to the given group id.
|
|
||||||
// It returns the signature of the message which can be used to identify it in any UX layer.
|
|
||||||
func (cp *cwtchPeer) SendMessageToGroupTracked(groupid string, message string) (string, error) {
|
|
||||||
cp.mutex.Lock()
|
|
||||||
defer cp.mutex.Unlock()
|
|
||||||
group := cp.Profile.GetGroup(groupid)
|
|
||||||
|
|
||||||
if group == nil {
|
|
||||||
return "", errors.New("invalid group id")
|
|
||||||
}
|
|
||||||
ct, sig, err := cp.Profile.EncryptMessageToGroup(message, groupid)
|
|
||||||
|
|
||||||
if err == nil {
|
|
||||||
cp.eventBus.Publish(event.NewEvent(event.SendMessageToGroup, map[event.Field]string{event.GroupID: groupid, event.GroupServer: group.GroupServer, event.Ciphertext: base64.StdEncoding.EncodeToString(ct), event.Signature: base64.StdEncoding.EncodeToString(sig)}))
|
|
||||||
}
|
|
||||||
|
|
||||||
return base64.StdEncoding.EncodeToString(sig), err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (cp *cwtchPeer) SendMessageToPeer(onion string, message string) string {
|
|
||||||
cp.mutex.Lock()
|
|
||||||
defer cp.mutex.Unlock()
|
|
||||||
event := event.NewEvent(event.SendMessageToPeer, map[event.Field]string{event.RemotePeer: onion, event.Data: message})
|
|
||||||
contact, exists := cp.Profile.GetContact(onion)
|
|
||||||
|
|
||||||
// If the contact exists replace the event id wih the index of this message in the contacts timeline...
|
|
||||||
// Otherwise assume we don't log the message in the timeline...
|
|
||||||
if exists {
|
|
||||||
event.EventID = strconv.Itoa(contact.Timeline.Len())
|
|
||||||
cp.Profile.AddSentMessageToContactTimeline(onion, message, time.Now(), event.EventID)
|
|
||||||
}
|
|
||||||
// Regardless we publish the send message to peer event for the protocol engine to execute on...
|
|
||||||
cp.eventBus.Publish(event)
|
|
||||||
return event.EventID
|
|
||||||
}
|
|
||||||
|
|
||||||
func (cp *cwtchPeer) SendGetValToPeer(onion string, scope string, path string) {
|
func (cp *cwtchPeer) SendGetValToPeer(onion string, scope string, path string) {
|
||||||
event := event.NewEventList(event.SendGetValMessageToPeer, event.RemotePeer, onion, event.Scope, scope, event.Path, path)
|
ev := event.NewEventList(event.SendGetValMessageToPeer, event.RemotePeer, onion, event.Scope, scope, event.Path, path)
|
||||||
cp.eventBus.Publish(event)
|
cp.eventBus.Publish(ev)
|
||||||
}
|
}
|
||||||
|
|
||||||
// BlockPeer blocks an existing peer relationship.
|
// BlockPeer blocks an existing peer relationship.
|
||||||
|
@ -685,9 +666,9 @@ func (cp *cwtchPeer) AcceptInvite(groupID string) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
cp.eventBus.Publish(event.NewEvent(event.AcceptGroupInvite, map[event.Field]string{event.GroupID: groupID}))
|
cp.eventBus.Publish(event.NewEvent(event.AcceptGroupInvite, map[event.Field]string{event.GroupID: groupID}))
|
||||||
cp.JoinServer(cp.Profile.Groups[groupID].GroupServer)
|
err = cp.JoinServer(cp.Profile.Groups[groupID].GroupServer)
|
||||||
|
|
||||||
return nil
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// RejectInvite rejects a given group invite.
|
// RejectInvite rejects a given group invite.
|
||||||
|
@ -698,7 +679,7 @@ func (cp *cwtchPeer) RejectInvite(groupID string) {
|
||||||
cp.eventBus.Publish(event.NewEvent(event.RejectGroupInvite, map[event.Field]string{event.GroupID: groupID}))
|
cp.eventBus.Publish(event.NewEvent(event.RejectGroupInvite, map[event.Field]string{event.GroupID: groupID}))
|
||||||
}
|
}
|
||||||
|
|
||||||
// Listen makes the peer open a listening port to accept incoming connections (and be detactably online)
|
// Listen makes the peer open a listening port to accept incoming connections (and be detectably online)
|
||||||
func (cp *cwtchPeer) Listen() {
|
func (cp *cwtchPeer) Listen() {
|
||||||
cp.mutex.Lock()
|
cp.mutex.Lock()
|
||||||
defer cp.mutex.Unlock()
|
defer cp.mutex.Unlock()
|
||||||
|
@ -724,7 +705,10 @@ func (cp *cwtchPeer) StartPeersConnections() {
|
||||||
func (cp *cwtchPeer) StartServerConnections() {
|
func (cp *cwtchPeer) StartServerConnections() {
|
||||||
for _, contact := range cp.GetContacts() {
|
for _, contact := range cp.GetContacts() {
|
||||||
if cp.GetContact(contact).IsServer() {
|
if cp.GetContact(contact).IsServer() {
|
||||||
cp.JoinServer(contact)
|
err := cp.JoinServer(contact)
|
||||||
|
if err == nil {
|
||||||
|
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -828,10 +812,10 @@ func (cp *cwtchPeer) eventHandler() {
|
||||||
cp.SetContactAttribute(ev.Data[event.GroupServer], lastKnownSignature, ev.Data[event.Signature])
|
cp.SetContactAttribute(ev.Data[event.GroupServer], lastKnownSignature, ev.Data[event.Signature])
|
||||||
|
|
||||||
cp.mutex.Lock()
|
cp.mutex.Lock()
|
||||||
ok, groupID, message, seen := cp.Profile.AttemptDecryption(ciphertext, signature)
|
ok, groupID, message, index := cp.Profile.AttemptDecryption(ciphertext, signature)
|
||||||
cp.mutex.Unlock()
|
cp.mutex.Unlock()
|
||||||
if ok && !seen {
|
if ok && index > -1 {
|
||||||
cp.eventBus.Publish(event.NewEvent(event.NewMessageFromGroup, map[event.Field]string{event.TimestampReceived: message.Received.Format(time.RFC3339Nano), event.TimestampSent: message.Timestamp.Format(time.RFC3339Nano), event.Data: message.Message, event.GroupID: groupID, event.Signature: base64.StdEncoding.EncodeToString(message.Signature), event.PreviousSignature: base64.StdEncoding.EncodeToString(message.PreviousMessageSig), event.RemotePeer: message.PeerID}))
|
cp.eventBus.Publish(event.NewEvent(event.NewMessageFromGroup, map[event.Field]string{event.TimestampReceived: message.Received.Format(time.RFC3339Nano), event.TimestampSent: message.Timestamp.Format(time.RFC3339Nano), event.Data: message.Message, event.GroupID: groupID, event.Signature: base64.StdEncoding.EncodeToString(message.Signature), event.PreviousSignature: base64.StdEncoding.EncodeToString(message.PreviousMessageSig), event.RemotePeer: message.PeerID, event.Index: strconv.Itoa(index)}))
|
||||||
}
|
}
|
||||||
|
|
||||||
// The group has been compromised
|
// The group has been compromised
|
||||||
|
@ -868,7 +852,10 @@ func (cp *cwtchPeer) eventHandler() {
|
||||||
case event.RetryServerRequest:
|
case event.RetryServerRequest:
|
||||||
// Automated Join Server Request triggered by a plugin.
|
// Automated Join Server Request triggered by a plugin.
|
||||||
log.Debugf("profile received an automated retry event for %v", ev.Data[event.GroupServer])
|
log.Debugf("profile received an automated retry event for %v", ev.Data[event.GroupServer])
|
||||||
cp.JoinServer(ev.Data[event.GroupServer])
|
err := cp.JoinServer(ev.Data[event.GroupServer])
|
||||||
|
if err == nil {
|
||||||
|
log.Debugf("error joining server... %v", err)
|
||||||
|
}
|
||||||
case event.NewGetValMessageFromPeer:
|
case event.NewGetValMessageFromPeer:
|
||||||
onion := ev.Data[event.RemotePeer]
|
onion := ev.Data[event.RemotePeer]
|
||||||
scope := ev.Data[event.Scope]
|
scope := ev.Data[event.Scope]
|
||||||
|
@ -901,7 +888,7 @@ func (cp *cwtchPeer) eventHandler() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/***** Non default but requestable handlable events *****/
|
/***** Non default but requestable handleable events *****/
|
||||||
|
|
||||||
case event.ManifestReceived:
|
case event.ManifestReceived:
|
||||||
log.Debugf("Manifest Received Event!: %v", ev)
|
log.Debugf("Manifest Received Event!: %v", ev)
|
||||||
|
|
|
@ -50,7 +50,7 @@ func TestProfileStoreUpgradeV0toV1(t *testing.T) {
|
||||||
fmt.Println("Sending 200 messages...")
|
fmt.Println("Sending 200 messages...")
|
||||||
|
|
||||||
for i := 0; i < 200; i++ {
|
for i := 0; i < 200; i++ {
|
||||||
ps1.AddGroupMessage(groupid, time.Now().Format(time.RFC3339Nano), time.Now().Format(time.RFC3339Nano), profile.Onion, testMessage)
|
ps1.AddGroupMessage(groupid, time.Now().Format(time.RFC3339Nano), time.Now().Format(time.RFC3339Nano), profile.Onion, testMessage, []byte{byte(i)})
|
||||||
}
|
}
|
||||||
|
|
||||||
fmt.Println("Shutdown v0 profile store...")
|
fmt.Println("Shutdown v0 profile store...")
|
||||||
|
|
|
@ -63,10 +63,10 @@ func (ps *ProfileStoreV0) AddGroup(invite string) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// AddGroupMessage for testing, adds a group message
|
// AddGroupMessage for testing, adds a group message
|
||||||
func (ps *ProfileStoreV0) AddGroupMessage(groupid string, timeSent, timeRecvied string, remotePeer, data string) {
|
func (ps *ProfileStoreV0) AddGroupMessage(groupid string, timeSent, timeRecvied string, remotePeer, data string, signature []byte) {
|
||||||
received, _ := time.Parse(time.RFC3339Nano, timeRecvied)
|
received, _ := time.Parse(time.RFC3339Nano, timeRecvied)
|
||||||
sent, _ := time.Parse(time.RFC3339Nano, timeSent)
|
sent, _ := time.Parse(time.RFC3339Nano, timeSent)
|
||||||
message := model.Message{Received: received, Timestamp: sent, Message: data, PeerID: remotePeer, Signature: []byte("signature"), PreviousMessageSig: []byte("PreviousSignature")}
|
message := model.Message{Received: received, Timestamp: sent, Message: data, PeerID: remotePeer, Signature: signature, PreviousMessageSig: []byte("PreviousSignature")}
|
||||||
ss, exists := ps.streamStores[groupid]
|
ss, exists := ps.streamStores[groupid]
|
||||||
if exists {
|
if exists {
|
||||||
ss.Write(message)
|
ss.Write(message)
|
||||||
|
|
|
@ -42,7 +42,7 @@ func TestProfileStoreWriteRead(t *testing.T) {
|
||||||
|
|
||||||
ps1.AddGroup(invite)
|
ps1.AddGroup(invite)
|
||||||
|
|
||||||
ps1.AddGroupMessage(groupid, time.Now().Format(time.RFC3339Nano), time.Now().Format(time.RFC3339Nano), ps1.getProfileCopy(true).Onion, testMessage)
|
ps1.AddGroupMessage(groupid, time.Now().Format(time.RFC3339Nano), time.Now().Format(time.RFC3339Nano), ps1.getProfileCopy(true).Onion, testMessage, []byte{byte(0x01)})
|
||||||
|
|
||||||
ps1.Shutdown()
|
ps1.Shutdown()
|
||||||
|
|
||||||
|
|
|
@ -5,6 +5,7 @@ package v1
|
||||||
import (
|
import (
|
||||||
"cwtch.im/cwtch/event"
|
"cwtch.im/cwtch/event"
|
||||||
"cwtch.im/cwtch/model"
|
"cwtch.im/cwtch/model"
|
||||||
|
"encoding/base64"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
"os"
|
"os"
|
||||||
|
@ -107,6 +108,7 @@ func TestProfileStoreChangePassword(t *testing.T) {
|
||||||
event.TimestampReceived: time.Now().Format(time.RFC3339Nano),
|
event.TimestampReceived: time.Now().Format(time.RFC3339Nano),
|
||||||
event.RemotePeer: profile.Onion,
|
event.RemotePeer: profile.Onion,
|
||||||
event.Data: testMessage,
|
event.Data: testMessage,
|
||||||
|
event.Signature: base64.StdEncoding.EncodeToString([]byte{byte(i)}),
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -129,6 +131,7 @@ func TestProfileStoreChangePassword(t *testing.T) {
|
||||||
event.TimestampReceived: time.Now().Format(time.RFC3339Nano),
|
event.TimestampReceived: time.Now().Format(time.RFC3339Nano),
|
||||||
event.RemotePeer: profile.Onion,
|
event.RemotePeer: profile.Onion,
|
||||||
event.Data: testMessage,
|
event.Data: testMessage,
|
||||||
|
event.Signature: base64.StdEncoding.EncodeToString([]byte{0x01, byte(i)}),
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
time.Sleep(3 * time.Second)
|
time.Sleep(3 * time.Second)
|
||||||
|
|
|
@ -224,7 +224,9 @@ func TestCwtchPeerIntegration(t *testing.T) {
|
||||||
alice.SendScopedZonedGetValToContact(carol.GetOnion(), attr.PublicScope, attr.ProfileZone, constants.Name)
|
alice.SendScopedZonedGetValToContact(carol.GetOnion(), attr.PublicScope, attr.ProfileZone, constants.Name)
|
||||||
carol.SendScopedZonedGetValToContact(alice.GetOnion(), attr.PublicScope, attr.ProfileZone, constants.Name)
|
carol.SendScopedZonedGetValToContact(alice.GetOnion(), attr.PublicScope, attr.ProfileZone, constants.Name)
|
||||||
|
|
||||||
time.Sleep(10 * time.Second)
|
// This used to be 10, but increasing it to 30 because this is now causing frequent issues
|
||||||
|
// Probably related to latency/throughput problems in the underlying tor network.
|
||||||
|
time.Sleep(30 * time.Second)
|
||||||
|
|
||||||
aliceName, exists := bob.GetContactAttribute(alice.GetOnion(), attr.GetPeerScope(constants.Name))
|
aliceName, exists := bob.GetContactAttribute(alice.GetOnion(), attr.GetPeerScope(constants.Name))
|
||||||
if !exists || aliceName != "Alice" {
|
if !exists || aliceName != "Alice" {
|
||||||
|
@ -281,25 +283,25 @@ func TestCwtchPeerIntegration(t *testing.T) {
|
||||||
fmt.Println("Starting conversation in group...")
|
fmt.Println("Starting conversation in group...")
|
||||||
// Conversation
|
// Conversation
|
||||||
fmt.Printf("%v> %v\n", aliceName, aliceLines[0])
|
fmt.Printf("%v> %v\n", aliceName, aliceLines[0])
|
||||||
_, err = alice.SendMessageToGroupTracked(groupID, aliceLines[0])
|
err = alice.SendMessage(groupID, aliceLines[0])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Alice failed to send a message to the group: %v", err)
|
t.Fatalf("Alice failed to send a message to the group: %v", err)
|
||||||
}
|
}
|
||||||
time.Sleep(time.Second * 10)
|
time.Sleep(time.Second * 10)
|
||||||
|
|
||||||
fmt.Printf("%v> %v\n", bobName, bobLines[0])
|
fmt.Printf("%v> %v\n", bobName, bobLines[0])
|
||||||
_, err = bob.SendMessageToGroupTracked(groupID, bobLines[0])
|
err = bob.SendMessage(groupID, bobLines[0])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Bob failed to send a message to the group: %v", err)
|
t.Fatalf("Bob failed to send a message to the group: %v", err)
|
||||||
}
|
}
|
||||||
time.Sleep(time.Second * 10)
|
time.Sleep(time.Second * 10)
|
||||||
|
|
||||||
fmt.Printf("%v> %v\n", aliceName, aliceLines[1])
|
fmt.Printf("%v> %v\n", aliceName, aliceLines[1])
|
||||||
alice.SendMessageToGroupTracked(groupID, aliceLines[1])
|
alice.SendMessage(groupID, aliceLines[1])
|
||||||
time.Sleep(time.Second * 10)
|
time.Sleep(time.Second * 10)
|
||||||
|
|
||||||
fmt.Printf("%v> %v\n", bobName, bobLines[1])
|
fmt.Printf("%v> %v\n", bobName, bobLines[1])
|
||||||
bob.SendMessageToGroupTracked(groupID, bobLines[1])
|
bob.SendMessage(groupID, bobLines[1])
|
||||||
time.Sleep(time.Second * 10)
|
time.Sleep(time.Second * 10)
|
||||||
|
|
||||||
fmt.Println("Alice inviting Carol to group...")
|
fmt.Println("Alice inviting Carol to group...")
|
||||||
|
@ -333,12 +335,12 @@ func TestCwtchPeerIntegration(t *testing.T) {
|
||||||
numGoRotinesPostCarolConnect := runtime.NumGoroutine()
|
numGoRotinesPostCarolConnect := runtime.NumGoroutine()
|
||||||
|
|
||||||
fmt.Printf("%v> %v", bobName, bobLines[2])
|
fmt.Printf("%v> %v", bobName, bobLines[2])
|
||||||
bob.SendMessageToGroupTracked(groupID, bobLines[2])
|
bob.SendMessage(groupID, bobLines[2])
|
||||||
// Bob should have enough tokens so we don't need to account for
|
// Bob should have enough tokens so we don't need to account for
|
||||||
// token acquisition here...
|
// token acquisition here...
|
||||||
|
|
||||||
fmt.Printf("%v> %v", carolName, carolLines[0])
|
fmt.Printf("%v> %v", carolName, carolLines[0])
|
||||||
carol.SendMessageToGroupTracked(groupID, carolLines[0])
|
carol.SendMessage(groupID, carolLines[0])
|
||||||
time.Sleep(time.Second * 30) // we need to account for spam-based token acquisition, but everything should
|
time.Sleep(time.Second * 30) // we need to account for spam-based token acquisition, but everything should
|
||||||
// be warmed-up and delays should be pretty small.
|
// be warmed-up and delays should be pretty small.
|
||||||
|
|
||||||
|
@ -407,7 +409,7 @@ func TestCwtchPeerIntegration(t *testing.T) {
|
||||||
carolGroupTimeline := carolsGroup.GetTimeline()
|
carolGroupTimeline := carolsGroup.GetTimeline()
|
||||||
if carolGroupTimeline[0].Message != aliceLines[0] || carolGroupTimeline[1].Message != bobLines[0] ||
|
if carolGroupTimeline[0].Message != aliceLines[0] || carolGroupTimeline[1].Message != bobLines[0] ||
|
||||||
carolGroupTimeline[2].Message != aliceLines[1] || carolGroupTimeline[3].Message != bobLines[1] ||
|
carolGroupTimeline[2].Message != aliceLines[1] || carolGroupTimeline[3].Message != bobLines[1] ||
|
||||||
carolGroupTimeline[4].Message != bobLines[2] || carolGroupTimeline[5].Message != carolLines[0] {
|
carolGroupTimeline[4].Message != carolLines[0] || carolGroupTimeline[5].Message != bobLines[2] {
|
||||||
t.Errorf("Some of Carol's timeline messages did not have the expected content!")
|
t.Errorf("Some of Carol's timeline messages did not have the expected content!")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue