Merge pull request 'allow custom message handling' (#325) from specify-events into master
the build was successful Details

Reviewed-on: #325
This commit is contained in:
Sarah Jamie Lewis 2020-10-22 16:34:21 -07:00
commit 21c41f24ee
4 changed files with 50 additions and 27 deletions

View File

@ -33,7 +33,7 @@ type Group struct {
lock sync.Mutex lock sync.Mutex
LocalID string LocalID string
State string `json:"-"` State string `json:"-"`
unacknowledgedMessages []Message UnacknowledgedMessages []Message
Version int Version int
} }
@ -119,7 +119,7 @@ func (g *Group) AddSentMessage(message *groups.DecryptedGroupMessage, sig []byte
PreviousMessageSig: message.PreviousMessageSig, PreviousMessageSig: message.PreviousMessageSig,
ReceivedByServer: false, ReceivedByServer: false,
} }
g.unacknowledgedMessages = append(g.unacknowledgedMessages, timelineMessage) g.UnacknowledgedMessages = append(g.UnacknowledgedMessages, timelineMessage)
return timelineMessage return timelineMessage
} }
@ -130,10 +130,10 @@ func (g *Group) ErrorSentMessage(sig []byte, error string) bool {
var message *Message var message *Message
// Delete the message from the unack'd buffer if it exists // Delete the message from the unack'd buffer if it exists
for i, unAckedMessage := range g.unacknowledgedMessages { for i, unAckedMessage := range g.UnacknowledgedMessages {
if compareSignatures(unAckedMessage.Signature, sig) { if compareSignatures(unAckedMessage.Signature, sig) {
message = &unAckedMessage message = &unAckedMessage
g.unacknowledgedMessages = append(g.unacknowledgedMessages[:i], g.unacknowledgedMessages[i+1:]...) g.UnacknowledgedMessages = append(g.UnacknowledgedMessages[:i], g.UnacknowledgedMessages[i+1:]...)
message.Error = error message.Error = error
g.Timeline.Insert(message) g.Timeline.Insert(message)
@ -150,9 +150,9 @@ func (g *Group) AddMessage(message *groups.DecryptedGroupMessage, sig []byte) (*
defer g.lock.Unlock() defer g.lock.Unlock()
// Delete the message from the unack'd buffer if it exists // Delete the message from the unack'd buffer if it exists
for i, unAckedMessage := range g.unacknowledgedMessages { for i, unAckedMessage := range g.UnacknowledgedMessages {
if compareSignatures(unAckedMessage.Signature, sig) { if compareSignatures(unAckedMessage.Signature, sig) {
g.unacknowledgedMessages = append(g.unacknowledgedMessages[:i], g.unacknowledgedMessages[i+1:]...) g.UnacknowledgedMessages = append(g.UnacknowledgedMessages[:i], g.UnacknowledgedMessages[i+1:]...)
break break
} }
} }
@ -176,7 +176,7 @@ func (g *Group) AddMessage(message *groups.DecryptedGroupMessage, sig []byte) (*
func (g *Group) GetTimeline() (timeline []Message) { func (g *Group) GetTimeline() (timeline []Message) {
g.lock.Lock() g.lock.Lock()
defer g.lock.Unlock() defer g.lock.Unlock()
return append(g.Timeline.GetMessages(), g.unacknowledgedMessages...) return append(g.Timeline.GetMessages(), g.UnacknowledgedMessages...)
} }
//EncryptMessage takes a message and encrypts the message under the group key. //EncryptMessage takes a message and encrypts the message under the group key.

View File

@ -40,7 +40,7 @@ type PublicProfile struct {
LocalID string // used by storage engine LocalID string // used by storage engine
State string `json:"-"` State string `json:"-"`
lock sync.Mutex lock sync.Mutex
unacknowledgedMessages map[string]Message UnacknowledgedMessages map[string]int
} }
// Profile encapsulates all the attributes necessary to be a Cwtch Peer. // Profile encapsulates all the attributes necessary to be a Cwtch Peer.
@ -66,7 +66,7 @@ func (p *PublicProfile) init() {
if p.Attributes == nil { if p.Attributes == nil {
p.Attributes = make(map[string]string) p.Attributes = make(map[string]string)
} }
p.unacknowledgedMessages = make(map[string]Message) p.UnacknowledgedMessages = make(map[string]int)
p.LocalID = GenerateRandomID() p.LocalID = GenerateRandomID()
} }
@ -150,10 +150,11 @@ func (p *Profile) AddSentMessageToContactTimeline(onion string, messageTxt strin
sig := p.SignMessage(onion + messageTxt + sent.String() + now.String()) sig := p.SignMessage(onion + messageTxt + sent.String() + now.String())
message := &Message{PeerID: p.Onion, Message: messageTxt, Timestamp: sent, Received: now, Signature: sig, Acknowledged: false} message := &Message{PeerID: p.Onion, Message: messageTxt, Timestamp: sent, Received: now, Signature: sig, Acknowledged: false}
if contact.unacknowledgedMessages == nil { if contact.UnacknowledgedMessages == nil {
contact.unacknowledgedMessages = make(map[string]Message) contact.UnacknowledgedMessages = make(map[string]int)
} }
contact.unacknowledgedMessages[eventID] = *message contact.Timeline.Insert(message)
contact.UnacknowledgedMessages[eventID] = contact.Timeline.Len() - 1
return message return message
} }
return nil return nil
@ -182,11 +183,10 @@ func (p *Profile) ErrorSentMessageToPeer(onion string, eventID string, error str
contact, ok := p.Contacts[onion] contact, ok := p.Contacts[onion]
if ok { if ok {
message, ok := contact.unacknowledgedMessages[eventID] mIdx, ok := contact.UnacknowledgedMessages[eventID]
if ok { if ok {
message.Error = error p.Timeline.Messages[mIdx].Error = error
contact.Timeline.Insert(&message) // TODO: do we want a non timeline.Insert way to handle errors delete(contact.UnacknowledgedMessages, eventID)
delete(contact.unacknowledgedMessages, eventID)
} }
} }
} }
@ -198,11 +198,10 @@ func (p *Profile) AckSentMessageToPeer(onion string, eventID string) {
contact, ok := p.Contacts[onion] contact, ok := p.Contacts[onion]
if ok { if ok {
message, ok := contact.unacknowledgedMessages[eventID] mIdx, ok := contact.UnacknowledgedMessages[eventID]
if ok { if ok {
message.Acknowledged = true contact.Timeline.Messages[mIdx].Acknowledged = true
contact.Timeline.Insert(&message) delete(contact.UnacknowledgedMessages, eventID)
delete(contact.unacknowledgedMessages, eventID)
} }
} }
} }

View File

@ -21,6 +21,18 @@ var autoHandleableEvents = map[event.Type]bool{event.EncryptedGroupMessage: true
event.PeerAcknowledgement: true, event.PeerError: true, event.SendMessageToGroupError: true, event.PeerAcknowledgement: true, event.PeerError: true, event.SendMessageToGroupError: true,
event.NewGetValMessageFromPeer: true, event.NewRetValMessageFromPeer: true} event.NewGetValMessageFromPeer: true, event.NewRetValMessageFromPeer: true}
// DefaultEventsToHandle specifies which events will be subscribed to
// when a peer has its Init() function called
var DefaultEventsToHandle = []event.Type{
event.EncryptedGroupMessage,
event.NewMessageFromPeer,
event.PeerAcknowledgement,
event.NewGroupInvite,
event.PeerError,
event.SendMessageToGroupError,
event.NewGetValMessageFromPeer,
}
// cwtchPeer manages incoming and outgoing connections and all processing for a Cwtch cwtchPeer // cwtchPeer manages incoming and outgoing connections and all processing for a Cwtch cwtchPeer
type cwtchPeer struct { type cwtchPeer struct {
Profile *model.Profile Profile *model.Profile
@ -40,6 +52,7 @@ type CwtchPeer interface {
InviteOnionToGroup(string, string) error InviteOnionToGroup(string, string) error
SendMessageToPeer(string, string) string SendMessageToPeer(string, string) string
SendGetValToPeer(string, string, string) SendGetValToPeer(string, string, string)
StoreMessage(onion string, messageTxt string, sent time.Time)
SetContactAuthorization(string, model.Authorization) error SetContactAuthorization(string, model.Authorization) error
ProcessInvite(string, string) (string, error) ProcessInvite(string, string) (string, error)
@ -101,12 +114,15 @@ func FromProfile(profile *model.Profile) CwtchPeer {
// Init instantiates a cwtchPeer // Init instantiates a cwtchPeer
func (cp *cwtchPeer) Init(eventBus event.Manager) { func (cp *cwtchPeer) Init(eventBus event.Manager) {
cp.InitForEvents(eventBus, DefaultEventsToHandle)
}
func (cp *cwtchPeer) InitForEvents(eventBus event.Manager, toBeHandled []event.Type) {
cp.queue = event.NewQueue() cp.queue = event.NewQueue()
go cp.eventHandler() go cp.eventHandler()
cp.eventBus = eventBus cp.eventBus = eventBus
cp.AutoHandleEvents([]event.Type{event.EncryptedGroupMessage, event.NewMessageFromPeer, event.PeerAcknowledgement, event.NewGroupInvite, cp.AutoHandleEvents(toBeHandled)
event.PeerError, event.SendMessageToGroupError, event.NewGetValMessageFromPeer})
} }
// AutoHandleEvents sets an event (if able) to be handled by this peer // AutoHandleEvents sets an event (if able) to be handled by this peer
@ -404,12 +420,15 @@ func (cp *cwtchPeer) SendMessageToGroupTracked(groupid string, message string) (
func (cp *cwtchPeer) SendMessageToPeer(onion string, message string) string { func (cp *cwtchPeer) SendMessageToPeer(onion string, message string) string {
event := event.NewEvent(event.SendMessageToPeer, map[event.Field]string{event.RemotePeer: onion, event.Data: message}) event := event.NewEvent(event.SendMessageToPeer, map[event.Field]string{event.RemotePeer: onion, event.Data: message})
cp.eventBus.Publish(event)
cp.mutex.Lock() cp.mutex.Lock()
contact, _ := cp.Profile.GetContact(onion)
event.EventID = strconv.Itoa(contact.Timeline.Len())
cp.Profile.AddSentMessageToContactTimeline(onion, message, time.Now(), event.EventID) cp.Profile.AddSentMessageToContactTimeline(onion, message, time.Now(), event.EventID)
cp.mutex.Unlock() cp.mutex.Unlock()
cp.eventBus.Publish(event)
return event.EventID return event.EventID
} }
@ -553,6 +572,12 @@ func (cp *cwtchPeer) Shutdown() {
cp.queue.Shutdown() cp.queue.Shutdown()
} }
func (cp *cwtchPeer) StoreMessage(onion string, messageTxt string, sent time.Time) {
cp.mutex.Lock()
cp.Profile.AddMessageToContactTimeline(onion, messageTxt, sent)
cp.mutex.Unlock()
}
// eventHandler process events from other subsystems // eventHandler process events from other subsystems
func (cp *cwtchPeer) eventHandler() { func (cp *cwtchPeer) eventHandler() {
for { for {
@ -571,9 +596,7 @@ func (cp *cwtchPeer) eventHandler() {
case event.NewMessageFromPeer: //event.TimestampReceived, event.RemotePeer, event.Data case event.NewMessageFromPeer: //event.TimestampReceived, event.RemotePeer, event.Data
ts, _ := time.Parse(time.RFC3339Nano, ev.Data[event.TimestampReceived]) ts, _ := time.Parse(time.RFC3339Nano, ev.Data[event.TimestampReceived])
cp.mutex.Lock() cp.StoreMessage(ev.Data[event.RemotePeer], ev.Data[event.Data], ts)
cp.Profile.AddMessageToContactTimeline(ev.Data[event.RemotePeer], ev.Data[event.Data], ts)
cp.mutex.Unlock()
case event.PeerAcknowledgement: case event.PeerAcknowledgement:
cp.mutex.Lock() cp.mutex.Lock()

View File

@ -57,6 +57,7 @@ func main() {
os.MkdirAll("tordir/tor",0700) os.MkdirAll("tordir/tor",0700)
tor.NewTorrc().WithHashedPassword(base64.StdEncoding.EncodeToString(key)).WithControlPort(controlPort).Build("./tordir/tor/torrc") tor.NewTorrc().WithHashedPassword(base64.StdEncoding.EncodeToString(key)).WithControlPort(controlPort).Build("./tordir/tor/torrc")
acn, err := tor.NewTorACNWithAuth("tordir", "", controlPort, tor.HashedPasswordAuthenticator{Password: base64.StdEncoding.EncodeToString(key)}) acn, err := tor.NewTorACNWithAuth("tordir", "", controlPort, tor.HashedPasswordAuthenticator{Password: base64.StdEncoding.EncodeToString(key)})
if err != nil { if err != nil {
log.Errorf("\nError connecting to Tor: %v\n", err) log.Errorf("\nError connecting to Tor: %v\n", err)
os.Exit(1) os.Exit(1)
@ -77,7 +78,7 @@ func main() {
if err != nil { if err != nil {
panic(err) panic(err)
} }
fmt.Printf("%v", "torv3"+base64.StdEncoding.EncodeToString(invite)) fmt.Printf("%v\n", "torv3"+base64.StdEncoding.EncodeToString(invite))
bundle := server.KeyBundle().Serialize() bundle := server.KeyBundle().Serialize()
log.Infof("Server Config: server:%s", base64.StdEncoding.EncodeToString(bundle)) log.Infof("Server Config: server:%s", base64.StdEncoding.EncodeToString(bundle))