From 4606489cca840ac7a2007bfb306adaa45231ebe1 Mon Sep 17 00:00:00 2001 From: erinn Date: Sat, 3 Oct 2020 15:13:06 -0700 Subject: [PATCH 1/8] add cwtchPeer.InitForEvents which allows overriding the default set of eventbus events handles by the peer --- peer/cwtch_peer.go | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/peer/cwtch_peer.go b/peer/cwtch_peer.go index 109bc1c..f56e096 100644 --- a/peer/cwtch_peer.go +++ b/peer/cwtch_peer.go @@ -21,6 +21,16 @@ var autoHandleableEvents = map[event.Type]bool{event.EncryptedGroupMessage: true event.PeerAcknowledgement: true, event.PeerError: true, event.SendMessageToGroupError: true, event.NewGetValMessageFromPeer: true, event.NewRetValMessageFromPeer: true} +var DefaultEventsToHandle = []event.Type{ + event.EncryptedGroupMessage, + event.NewMessageFromPeer, + event.PeerAcknowledgement, + event.NewGroupInvite, + event.PeerError, + event.SendMessageToGroupError, + event.NewGetValMessageFromPeer, +} + // cwtchPeer manages incoming and outgoing connections and all processing for a Cwtch cwtchPeer type cwtchPeer struct { Profile *model.Profile @@ -101,12 +111,15 @@ func FromProfile(profile *model.Profile) CwtchPeer { // Init instantiates a cwtchPeer func (cp *cwtchPeer) Init(eventBus event.Manager) { + cp.InitForEvents(eventBus, DefaultEventsToHandle) +} + +func (cp *cwtchPeer) InitForEvents(eventBus event.Manager, toBeHandled []event.Type) { cp.queue = event.NewQueue() go cp.eventHandler() cp.eventBus = eventBus - cp.AutoHandleEvents([]event.Type{event.EncryptedGroupMessage, event.NewMessageFromPeer, event.PeerAcknowledgement, event.NewGroupInvite, - event.PeerError, event.SendMessageToGroupError, event.NewGetValMessageFromPeer}) + cp.AutoHandleEvents(toBeHandled) } // AutoHandleEvents sets an event (if able) to be handled by this peer From b423f1e17636c90aa1d7e682376820f55267c736 Mon Sep 17 00:00:00 2001 From: erinn Date: Sat, 3 Oct 2020 16:09:47 -0700 Subject: [PATCH 2/8] expose cwtchPeer.Profile.AddMessageToContactTimeline() via StoreMessage() --- peer/cwtch_peer.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/peer/cwtch_peer.go b/peer/cwtch_peer.go index f56e096..bcbf1fa 100644 --- a/peer/cwtch_peer.go +++ b/peer/cwtch_peer.go @@ -50,6 +50,7 @@ type CwtchPeer interface { InviteOnionToGroup(string, string) error SendMessageToPeer(string, string) string SendGetValToPeer(string, string, string) + StoreMessage(onion string, messageTxt string, sent time.Time) SetContactAuthorization(string, model.Authorization) error ProcessInvite(string, string) (string, error) @@ -566,6 +567,12 @@ func (cp *cwtchPeer) Shutdown() { cp.queue.Shutdown() } +func (cp *cwtchPeer) StoreMessage(onion string, messageTxt string, sent time.Time) { + cp.mutex.Lock() + cp.Profile.AddMessageToContactTimeline(onion, messageTxt, sent) + cp.mutex.Unlock() +} + // eventHandler process events from other subsystems func (cp *cwtchPeer) eventHandler() { for { @@ -583,10 +590,7 @@ func (cp *cwtchPeer) eventHandler() { } case event.NewMessageFromPeer: //event.TimestampReceived, event.RemotePeer, event.Data - ts, _ := time.Parse(time.RFC3339Nano, ev.Data[event.TimestampReceived]) - cp.mutex.Lock() - cp.Profile.AddMessageToContactTimeline(ev.Data[event.RemotePeer], ev.Data[event.Data], ts) - cp.mutex.Unlock() + cp.StoreMessage(ev.Data[event.RemotePeer], ev.Data[event.Data], ts) case event.PeerAcknowledgement: cp.mutex.Lock() From d3930ec78f80daab06be505ba44e7a90babfa150 Mon Sep 17 00:00:00 2001 From: erinn Date: Sat, 3 Oct 2020 16:49:05 -0700 Subject: [PATCH 3/8] bugfix --- peer/cwtch_peer.go | 1 + 1 file changed, 1 insertion(+) diff --git a/peer/cwtch_peer.go b/peer/cwtch_peer.go index bcbf1fa..9f46acf 100644 --- a/peer/cwtch_peer.go +++ b/peer/cwtch_peer.go @@ -590,6 +590,7 @@ func (cp *cwtchPeer) eventHandler() { } case event.NewMessageFromPeer: //event.TimestampReceived, event.RemotePeer, event.Data + ts, _ := time.Parse(time.RFC3339Nano, ev.Data[event.TimestampReceived]) cp.StoreMessage(ev.Data[event.RemotePeer], ev.Data[event.Data], ts) case event.PeerAcknowledgement: From 05eed99a3c431ec376de6affacd435ac4de6e544 Mon Sep 17 00:00:00 2001 From: erinn Date: Wed, 7 Oct 2020 13:53:22 -0700 Subject: [PATCH 4/8] UnackdMessages to public --- model/group.go | 14 +++++++------- model/profile.go | 18 +++++++++--------- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/model/group.go b/model/group.go index aaa2ddb..8583b3b 100644 --- a/model/group.go +++ b/model/group.go @@ -33,7 +33,7 @@ type Group struct { lock sync.Mutex LocalID string State string `json:"-"` - unacknowledgedMessages []Message + UnacknowledgedMessages []Message Version int } @@ -119,7 +119,7 @@ func (g *Group) AddSentMessage(message *groups.DecryptedGroupMessage, sig []byte PreviousMessageSig: message.PreviousMessageSig, ReceivedByServer: false, } - g.unacknowledgedMessages = append(g.unacknowledgedMessages, timelineMessage) + g.UnacknowledgedMessages = append(g.UnacknowledgedMessages, timelineMessage) return timelineMessage } @@ -130,10 +130,10 @@ func (g *Group) ErrorSentMessage(sig []byte, error string) bool { var message *Message // Delete the message from the unack'd buffer if it exists - for i, unAckedMessage := range g.unacknowledgedMessages { + for i, unAckedMessage := range g.UnacknowledgedMessages { if compareSignatures(unAckedMessage.Signature, sig) { message = &unAckedMessage - g.unacknowledgedMessages = append(g.unacknowledgedMessages[:i], g.unacknowledgedMessages[i+1:]...) + g.UnacknowledgedMessages = append(g.UnacknowledgedMessages[:i], g.UnacknowledgedMessages[i+1:]...) message.Error = error g.Timeline.Insert(message) @@ -150,9 +150,9 @@ func (g *Group) AddMessage(message *groups.DecryptedGroupMessage, sig []byte) (* defer g.lock.Unlock() // Delete the message from the unack'd buffer if it exists - for i, unAckedMessage := range g.unacknowledgedMessages { + for i, unAckedMessage := range g.UnacknowledgedMessages { if compareSignatures(unAckedMessage.Signature, sig) { - g.unacknowledgedMessages = append(g.unacknowledgedMessages[:i], g.unacknowledgedMessages[i+1:]...) + g.UnacknowledgedMessages = append(g.UnacknowledgedMessages[:i], g.UnacknowledgedMessages[i+1:]...) break } } @@ -176,7 +176,7 @@ func (g *Group) AddMessage(message *groups.DecryptedGroupMessage, sig []byte) (* func (g *Group) GetTimeline() (timeline []Message) { g.lock.Lock() defer g.lock.Unlock() - return append(g.Timeline.GetMessages(), g.unacknowledgedMessages...) + return append(g.Timeline.GetMessages(), g.UnacknowledgedMessages...) } //EncryptMessage takes a message and encrypts the message under the group key. diff --git a/model/profile.go b/model/profile.go index 85099c3..4bd449f 100644 --- a/model/profile.go +++ b/model/profile.go @@ -40,7 +40,7 @@ type PublicProfile struct { LocalID string // used by storage engine State string `json:"-"` lock sync.Mutex - unacknowledgedMessages map[string]Message + UnacknowledgedMessages map[string]Message } // Profile encapsulates all the attributes necessary to be a Cwtch Peer. @@ -66,7 +66,7 @@ func (p *PublicProfile) init() { if p.Attributes == nil { p.Attributes = make(map[string]string) } - p.unacknowledgedMessages = make(map[string]Message) + p.UnacknowledgedMessages = make(map[string]Message) p.LocalID = GenerateRandomID() } @@ -150,10 +150,10 @@ func (p *Profile) AddSentMessageToContactTimeline(onion string, messageTxt strin sig := p.SignMessage(onion + messageTxt + sent.String() + now.String()) message := &Message{PeerID: p.Onion, Message: messageTxt, Timestamp: sent, Received: now, Signature: sig, Acknowledged: false} - if contact.unacknowledgedMessages == nil { - contact.unacknowledgedMessages = make(map[string]Message) + if contact.UnacknowledgedMessages == nil { + contact.UnacknowledgedMessages = make(map[string]Message) } - contact.unacknowledgedMessages[eventID] = *message + contact.UnacknowledgedMessages[eventID] = *message return message } return nil @@ -182,11 +182,11 @@ func (p *Profile) ErrorSentMessageToPeer(onion string, eventID string, error str contact, ok := p.Contacts[onion] if ok { - message, ok := contact.unacknowledgedMessages[eventID] + message, ok := contact.UnacknowledgedMessages[eventID] if ok { message.Error = error contact.Timeline.Insert(&message) // TODO: do we want a non timeline.Insert way to handle errors - delete(contact.unacknowledgedMessages, eventID) + delete(contact.UnacknowledgedMessages, eventID) } } } @@ -198,11 +198,11 @@ func (p *Profile) AckSentMessageToPeer(onion string, eventID string) { contact, ok := p.Contacts[onion] if ok { - message, ok := contact.unacknowledgedMessages[eventID] + message, ok := contact.UnacknowledgedMessages[eventID] if ok { message.Acknowledged = true contact.Timeline.Insert(&message) - delete(contact.unacknowledgedMessages, eventID) + delete(contact.UnacknowledgedMessages, eventID) } } } From 8c9c66d2a1e64730e5e378de8b1c136070af45d6 Mon Sep 17 00:00:00 2001 From: erinn Date: Thu, 8 Oct 2020 12:40:27 -0700 Subject: [PATCH 5/8] unackd messages to timeline instead of separate map --- model/profile.go | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/model/profile.go b/model/profile.go index 4bd449f..9cb9b8e 100644 --- a/model/profile.go +++ b/model/profile.go @@ -40,7 +40,7 @@ type PublicProfile struct { LocalID string // used by storage engine State string `json:"-"` lock sync.Mutex - UnacknowledgedMessages map[string]Message + UnacknowledgedMessages map[string]int } // Profile encapsulates all the attributes necessary to be a Cwtch Peer. @@ -66,7 +66,7 @@ func (p *PublicProfile) init() { if p.Attributes == nil { p.Attributes = make(map[string]string) } - p.UnacknowledgedMessages = make(map[string]Message) + p.UnacknowledgedMessages = make(map[string]int) p.LocalID = GenerateRandomID() } @@ -151,9 +151,10 @@ func (p *Profile) AddSentMessageToContactTimeline(onion string, messageTxt strin message := &Message{PeerID: p.Onion, Message: messageTxt, Timestamp: sent, Received: now, Signature: sig, Acknowledged: false} if contact.UnacknowledgedMessages == nil { - contact.UnacknowledgedMessages = make(map[string]Message) + contact.UnacknowledgedMessages = make(map[string]int) } - contact.UnacknowledgedMessages[eventID] = *message + p.AddMessageToContactTimeline(onion, messageTxt, sent) + contact.UnacknowledgedMessages[eventID] = p.Timeline.Len() - 1 return message } return nil @@ -182,10 +183,9 @@ func (p *Profile) ErrorSentMessageToPeer(onion string, eventID string, error str contact, ok := p.Contacts[onion] if ok { - message, ok := contact.UnacknowledgedMessages[eventID] + mIdx, ok := contact.UnacknowledgedMessages[eventID] if ok { - message.Error = error - contact.Timeline.Insert(&message) // TODO: do we want a non timeline.Insert way to handle errors + p.Timeline.Messages[mIdx].Error = error delete(contact.UnacknowledgedMessages, eventID) } } @@ -198,10 +198,9 @@ func (p *Profile) AckSentMessageToPeer(onion string, eventID string) { contact, ok := p.Contacts[onion] if ok { - message, ok := contact.UnacknowledgedMessages[eventID] + mIdx, ok := contact.UnacknowledgedMessages[eventID] if ok { - message.Acknowledged = true - contact.Timeline.Insert(&message) + p.Timeline.Messages[mIdx].Acknowledged = true delete(contact.UnacknowledgedMessages, eventID) } } From bc047108566be12dbc678b42fc5acc596f3991da Mon Sep 17 00:00:00 2001 From: erinn Date: Thu, 8 Oct 2020 12:51:17 -0700 Subject: [PATCH 6/8] bugfix --- model/profile.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/model/profile.go b/model/profile.go index 9cb9b8e..cb8c048 100644 --- a/model/profile.go +++ b/model/profile.go @@ -153,7 +153,7 @@ func (p *Profile) AddSentMessageToContactTimeline(onion string, messageTxt strin if contact.UnacknowledgedMessages == nil { contact.UnacknowledgedMessages = make(map[string]int) } - p.AddMessageToContactTimeline(onion, messageTxt, sent) + contact.Timeline.Insert(message) contact.UnacknowledgedMessages[eventID] = p.Timeline.Len() - 1 return message } From a2c5a28e092d9766196b730128c53d8238b3bfb2 Mon Sep 17 00:00:00 2001 From: erinn Date: Thu, 8 Oct 2020 13:08:20 -0700 Subject: [PATCH 7/8] bugfix --- model/profile.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/model/profile.go b/model/profile.go index cb8c048..d97911d 100644 --- a/model/profile.go +++ b/model/profile.go @@ -154,7 +154,7 @@ func (p *Profile) AddSentMessageToContactTimeline(onion string, messageTxt strin contact.UnacknowledgedMessages = make(map[string]int) } contact.Timeline.Insert(message) - contact.UnacknowledgedMessages[eventID] = p.Timeline.Len() - 1 + contact.UnacknowledgedMessages[eventID] = contact.Timeline.Len() - 1 return message } return nil @@ -200,7 +200,7 @@ func (p *Profile) AckSentMessageToPeer(onion string, eventID string) { if ok { mIdx, ok := contact.UnacknowledgedMessages[eventID] if ok { - p.Timeline.Messages[mIdx].Acknowledged = true + contact.Timeline.Messages[mIdx].Acknowledged = true delete(contact.UnacknowledgedMessages, eventID) } } From 1933fb703fb0f0047252c474beafccce5b303999 Mon Sep 17 00:00:00 2001 From: erinn Date: Thu, 15 Oct 2020 22:39:57 -0700 Subject: [PATCH 8/8] testing new message id id-ea --- peer/cwtch_peer.go | 5 ++++- server/app/main.go | 4 ++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/peer/cwtch_peer.go b/peer/cwtch_peer.go index 9f46acf..d9ec40f 100644 --- a/peer/cwtch_peer.go +++ b/peer/cwtch_peer.go @@ -418,12 +418,15 @@ func (cp *cwtchPeer) SendMessageToGroupTracked(groupid string, message string) ( func (cp *cwtchPeer) SendMessageToPeer(onion string, message string) string { event := event.NewEvent(event.SendMessageToPeer, map[event.Field]string{event.RemotePeer: onion, event.Data: message}) - cp.eventBus.Publish(event) cp.mutex.Lock() + contact, _ := cp.Profile.GetContact(onion) + event.EventID = strconv.Itoa(contact.Timeline.Len()) cp.Profile.AddSentMessageToContactTimeline(onion, message, time.Now(), event.EventID) cp.mutex.Unlock() + cp.eventBus.Publish(event) + return event.EventID } diff --git a/server/app/main.go b/server/app/main.go index 6e8dd4a..4ced0ba 100644 --- a/server/app/main.go +++ b/server/app/main.go @@ -40,7 +40,7 @@ func main() { serverConfig := cwtchserver.LoadConfig(configDir, serverConfigFile) - acn, err := tor.NewTorACNWithAuth(".", "", 9051, tor.HashedPasswordAuthenticator{Password: "examplehashedpassword"}) + acn, err := tor.NewTorACNWithAuth(".", "./tor/tor", 9010, tor.NullAuthenticator{}) if err != nil { log.Errorf("\nError connecting to Tor: %v\n", err) os.Exit(1) @@ -61,7 +61,7 @@ func main() { if err != nil { panic(err) } - fmt.Printf("%v", "torv3"+base64.StdEncoding.EncodeToString(invite)) + fmt.Printf("%v\n", "torv3"+base64.StdEncoding.EncodeToString(invite)) bundle := server.KeyBundle().Serialize() log.Infof("Server Config: server:%s", base64.StdEncoding.EncodeToString(bundle))