diff --git a/model/group.go b/model/group.go index aaa2ddb..8583b3b 100644 --- a/model/group.go +++ b/model/group.go @@ -33,7 +33,7 @@ type Group struct { lock sync.Mutex LocalID string State string `json:"-"` - unacknowledgedMessages []Message + UnacknowledgedMessages []Message Version int } @@ -119,7 +119,7 @@ func (g *Group) AddSentMessage(message *groups.DecryptedGroupMessage, sig []byte PreviousMessageSig: message.PreviousMessageSig, ReceivedByServer: false, } - g.unacknowledgedMessages = append(g.unacknowledgedMessages, timelineMessage) + g.UnacknowledgedMessages = append(g.UnacknowledgedMessages, timelineMessage) return timelineMessage } @@ -130,10 +130,10 @@ func (g *Group) ErrorSentMessage(sig []byte, error string) bool { var message *Message // Delete the message from the unack'd buffer if it exists - for i, unAckedMessage := range g.unacknowledgedMessages { + for i, unAckedMessage := range g.UnacknowledgedMessages { if compareSignatures(unAckedMessage.Signature, sig) { message = &unAckedMessage - g.unacknowledgedMessages = append(g.unacknowledgedMessages[:i], g.unacknowledgedMessages[i+1:]...) + g.UnacknowledgedMessages = append(g.UnacknowledgedMessages[:i], g.UnacknowledgedMessages[i+1:]...) message.Error = error g.Timeline.Insert(message) @@ -150,9 +150,9 @@ func (g *Group) AddMessage(message *groups.DecryptedGroupMessage, sig []byte) (* defer g.lock.Unlock() // Delete the message from the unack'd buffer if it exists - for i, unAckedMessage := range g.unacknowledgedMessages { + for i, unAckedMessage := range g.UnacknowledgedMessages { if compareSignatures(unAckedMessage.Signature, sig) { - g.unacknowledgedMessages = append(g.unacknowledgedMessages[:i], g.unacknowledgedMessages[i+1:]...) + g.UnacknowledgedMessages = append(g.UnacknowledgedMessages[:i], g.UnacknowledgedMessages[i+1:]...) break } } @@ -176,7 +176,7 @@ func (g *Group) AddMessage(message *groups.DecryptedGroupMessage, sig []byte) (* func (g *Group) GetTimeline() (timeline []Message) { g.lock.Lock() defer g.lock.Unlock() - return append(g.Timeline.GetMessages(), g.unacknowledgedMessages...) + return append(g.Timeline.GetMessages(), g.UnacknowledgedMessages...) } //EncryptMessage takes a message and encrypts the message under the group key. diff --git a/model/profile.go b/model/profile.go index 85099c3..d97911d 100644 --- a/model/profile.go +++ b/model/profile.go @@ -40,7 +40,7 @@ type PublicProfile struct { LocalID string // used by storage engine State string `json:"-"` lock sync.Mutex - unacknowledgedMessages map[string]Message + UnacknowledgedMessages map[string]int } // Profile encapsulates all the attributes necessary to be a Cwtch Peer. @@ -66,7 +66,7 @@ func (p *PublicProfile) init() { if p.Attributes == nil { p.Attributes = make(map[string]string) } - p.unacknowledgedMessages = make(map[string]Message) + p.UnacknowledgedMessages = make(map[string]int) p.LocalID = GenerateRandomID() } @@ -150,10 +150,11 @@ func (p *Profile) AddSentMessageToContactTimeline(onion string, messageTxt strin sig := p.SignMessage(onion + messageTxt + sent.String() + now.String()) message := &Message{PeerID: p.Onion, Message: messageTxt, Timestamp: sent, Received: now, Signature: sig, Acknowledged: false} - if contact.unacknowledgedMessages == nil { - contact.unacknowledgedMessages = make(map[string]Message) + if contact.UnacknowledgedMessages == nil { + contact.UnacknowledgedMessages = make(map[string]int) } - contact.unacknowledgedMessages[eventID] = *message + contact.Timeline.Insert(message) + contact.UnacknowledgedMessages[eventID] = contact.Timeline.Len() - 1 return message } return nil @@ -182,11 +183,10 @@ func (p *Profile) ErrorSentMessageToPeer(onion string, eventID string, error str contact, ok := p.Contacts[onion] if ok { - message, ok := contact.unacknowledgedMessages[eventID] + mIdx, ok := contact.UnacknowledgedMessages[eventID] if ok { - message.Error = error - contact.Timeline.Insert(&message) // TODO: do we want a non timeline.Insert way to handle errors - delete(contact.unacknowledgedMessages, eventID) + p.Timeline.Messages[mIdx].Error = error + delete(contact.UnacknowledgedMessages, eventID) } } } @@ -198,11 +198,10 @@ func (p *Profile) AckSentMessageToPeer(onion string, eventID string) { contact, ok := p.Contacts[onion] if ok { - message, ok := contact.unacknowledgedMessages[eventID] + mIdx, ok := contact.UnacknowledgedMessages[eventID] if ok { - message.Acknowledged = true - contact.Timeline.Insert(&message) - delete(contact.unacknowledgedMessages, eventID) + contact.Timeline.Messages[mIdx].Acknowledged = true + delete(contact.UnacknowledgedMessages, eventID) } } } diff --git a/peer/cwtch_peer.go b/peer/cwtch_peer.go index 109bc1c..c894bdd 100644 --- a/peer/cwtch_peer.go +++ b/peer/cwtch_peer.go @@ -21,6 +21,18 @@ var autoHandleableEvents = map[event.Type]bool{event.EncryptedGroupMessage: true event.PeerAcknowledgement: true, event.PeerError: true, event.SendMessageToGroupError: true, event.NewGetValMessageFromPeer: true, event.NewRetValMessageFromPeer: true} +// DefaultEventsToHandle specifies which events will be subscribed to +// when a peer has its Init() function called +var DefaultEventsToHandle = []event.Type{ + event.EncryptedGroupMessage, + event.NewMessageFromPeer, + event.PeerAcknowledgement, + event.NewGroupInvite, + event.PeerError, + event.SendMessageToGroupError, + event.NewGetValMessageFromPeer, +} + // cwtchPeer manages incoming and outgoing connections and all processing for a Cwtch cwtchPeer type cwtchPeer struct { Profile *model.Profile @@ -40,6 +52,7 @@ type CwtchPeer interface { InviteOnionToGroup(string, string) error SendMessageToPeer(string, string) string SendGetValToPeer(string, string, string) + StoreMessage(onion string, messageTxt string, sent time.Time) SetContactAuthorization(string, model.Authorization) error ProcessInvite(string, string) (string, error) @@ -101,12 +114,15 @@ func FromProfile(profile *model.Profile) CwtchPeer { // Init instantiates a cwtchPeer func (cp *cwtchPeer) Init(eventBus event.Manager) { + cp.InitForEvents(eventBus, DefaultEventsToHandle) +} + +func (cp *cwtchPeer) InitForEvents(eventBus event.Manager, toBeHandled []event.Type) { cp.queue = event.NewQueue() go cp.eventHandler() cp.eventBus = eventBus - cp.AutoHandleEvents([]event.Type{event.EncryptedGroupMessage, event.NewMessageFromPeer, event.PeerAcknowledgement, event.NewGroupInvite, - event.PeerError, event.SendMessageToGroupError, event.NewGetValMessageFromPeer}) + cp.AutoHandleEvents(toBeHandled) } // AutoHandleEvents sets an event (if able) to be handled by this peer @@ -404,12 +420,15 @@ func (cp *cwtchPeer) SendMessageToGroupTracked(groupid string, message string) ( func (cp *cwtchPeer) SendMessageToPeer(onion string, message string) string { event := event.NewEvent(event.SendMessageToPeer, map[event.Field]string{event.RemotePeer: onion, event.Data: message}) - cp.eventBus.Publish(event) cp.mutex.Lock() + contact, _ := cp.Profile.GetContact(onion) + event.EventID = strconv.Itoa(contact.Timeline.Len()) cp.Profile.AddSentMessageToContactTimeline(onion, message, time.Now(), event.EventID) cp.mutex.Unlock() + cp.eventBus.Publish(event) + return event.EventID } @@ -553,6 +572,12 @@ func (cp *cwtchPeer) Shutdown() { cp.queue.Shutdown() } +func (cp *cwtchPeer) StoreMessage(onion string, messageTxt string, sent time.Time) { + cp.mutex.Lock() + cp.Profile.AddMessageToContactTimeline(onion, messageTxt, sent) + cp.mutex.Unlock() +} + // eventHandler process events from other subsystems func (cp *cwtchPeer) eventHandler() { for { @@ -571,9 +596,7 @@ func (cp *cwtchPeer) eventHandler() { case event.NewMessageFromPeer: //event.TimestampReceived, event.RemotePeer, event.Data ts, _ := time.Parse(time.RFC3339Nano, ev.Data[event.TimestampReceived]) - cp.mutex.Lock() - cp.Profile.AddMessageToContactTimeline(ev.Data[event.RemotePeer], ev.Data[event.Data], ts) - cp.mutex.Unlock() + cp.StoreMessage(ev.Data[event.RemotePeer], ev.Data[event.Data], ts) case event.PeerAcknowledgement: cp.mutex.Lock() diff --git a/server/app/main.go b/server/app/main.go index 518ac1d..b9e72a6 100644 --- a/server/app/main.go +++ b/server/app/main.go @@ -57,6 +57,7 @@ func main() { os.MkdirAll("tordir/tor",0700) tor.NewTorrc().WithHashedPassword(base64.StdEncoding.EncodeToString(key)).WithControlPort(controlPort).Build("./tordir/tor/torrc") acn, err := tor.NewTorACNWithAuth("tordir", "", controlPort, tor.HashedPasswordAuthenticator{Password: base64.StdEncoding.EncodeToString(key)}) + if err != nil { log.Errorf("\nError connecting to Tor: %v\n", err) os.Exit(1) @@ -77,7 +78,7 @@ func main() { if err != nil { panic(err) } - fmt.Printf("%v", "torv3"+base64.StdEncoding.EncodeToString(invite)) + fmt.Printf("%v\n", "torv3"+base64.StdEncoding.EncodeToString(invite)) bundle := server.KeyBundle().Serialize() log.Infof("Server Config: server:%s", base64.StdEncoding.EncodeToString(bundle))