Fixing Error Handling for Peer Connections, Ack's now don't ack'd error'd messages.
Also fixed 1 concurrent map access issue by moving to sync.Map
This commit is contained in:
parent
d6fc4766a0
commit
a9b1f7904a
|
@ -11,7 +11,7 @@ import (
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
func IncomingListener(callback func(*gobjects.Message), groupErrorCallback func(string,string)) {
|
func IncomingListener(callback func(*gobjects.Message), groupErrorCallback func(string, string,string)) {
|
||||||
q := event.NewEventQueue(1000)
|
q := event.NewEventQueue(1000)
|
||||||
the.CwtchApp.EventBus().Subscribe(event.NewMessageFromPeer, q.EventChannel)
|
the.CwtchApp.EventBus().Subscribe(event.NewMessageFromPeer, q.EventChannel)
|
||||||
the.CwtchApp.EventBus().Subscribe(event.NewMessageFromGroup, q.EventChannel)
|
the.CwtchApp.EventBus().Subscribe(event.NewMessageFromGroup, q.EventChannel)
|
||||||
|
@ -68,9 +68,9 @@ func IncomingListener(callback func(*gobjects.Message), groupErrorCallback func(
|
||||||
case event.NewGroupInvite:
|
case event.NewGroupInvite:
|
||||||
log.Debugf("got a group invite!")
|
log.Debugf("got a group invite!")
|
||||||
case event.SendMessageToGroupError:
|
case event.SendMessageToGroupError:
|
||||||
groupErrorCallback(e.Data[event.Signature], e.Data[event.Error])
|
groupErrorCallback(e.Data[event.GroupServer], e.Data[event.Signature], e.Data[event.Error])
|
||||||
case event.SendMessageToPeerError:
|
case event.SendMessageToPeerError:
|
||||||
groupErrorCallback(e.Data[event.Signature], e.Data[event.Error])
|
groupErrorCallback(e.Data[event.RemotePeer], e.Data[event.Signature], e.Data[event.Error])
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -12,4 +12,5 @@ type Message struct {
|
||||||
MessageID string
|
MessageID string
|
||||||
Timestamp time.Time
|
Timestamp time.Time
|
||||||
Acknowledged bool
|
Acknowledged bool
|
||||||
|
Error bool
|
||||||
}
|
}
|
||||||
|
|
|
@ -29,7 +29,7 @@ type GrandCentralDispatcher struct {
|
||||||
_ func(handle, key, value string) `signal:"UpdateContactAttribute"`
|
_ func(handle, key, value string) `signal:"UpdateContactAttribute"`
|
||||||
|
|
||||||
// messages pane stuff
|
// messages pane stuff
|
||||||
_ func(handle, from, displayName, message, image string, mID string, fromMe bool, ts string, ackd bool) `signal:"AppendMessage"`
|
_ func(handle, from, displayName, message, image string, mID string, fromMe bool, ts string, ackd bool, error bool) `signal:"AppendMessage"`
|
||||||
_ func() `signal:"ClearMessages"`
|
_ func() `signal:"ClearMessages"`
|
||||||
_ func() `signal:"ResetMessagePane"`
|
_ func() `signal:"ResetMessagePane"`
|
||||||
_ func(mID string) `signal:"Acknowledged"`
|
_ func(mID string) `signal:"Acknowledged"`
|
||||||
|
@ -99,6 +99,7 @@ func (this *GrandCentralDispatcher) sendMessage(message string, mID string) {
|
||||||
mID,
|
mID,
|
||||||
time.Now(),
|
time.Now(),
|
||||||
false,
|
false,
|
||||||
|
false,
|
||||||
})
|
})
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -118,7 +119,7 @@ func (this *GrandCentralDispatcher) sendMessage(message string, mID string) {
|
||||||
mID = the.Peer.SendMessageToPeer(to, message)
|
mID = the.Peer.SendMessageToPeer(to, message)
|
||||||
|
|
||||||
this.UIState.AddMessage(&gobjects.Message{
|
this.UIState.AddMessage(&gobjects.Message{
|
||||||
this.CurrentOpenConversation(),
|
to,
|
||||||
"me",
|
"me",
|
||||||
"",
|
"",
|
||||||
message,
|
message,
|
||||||
|
@ -127,6 +128,7 @@ func (this *GrandCentralDispatcher) sendMessage(message string, mID string) {
|
||||||
mID,
|
mID,
|
||||||
time.Now(),
|
time.Now(),
|
||||||
false,
|
false,
|
||||||
|
false,
|
||||||
})
|
})
|
||||||
|
|
||||||
ackID := new(the.AckId)
|
ackID := new(the.AckId)
|
||||||
|
@ -201,6 +203,7 @@ func (this *GrandCentralDispatcher) loadMessagesPaneHelper(handle string) {
|
||||||
tl[i].PeerID == the.Peer.GetProfile().Onion,
|
tl[i].PeerID == the.Peer.GetProfile().Onion,
|
||||||
tl[i].Timestamp.Format(constants.TIME_FORMAT),
|
tl[i].Timestamp.Format(constants.TIME_FORMAT),
|
||||||
tl[i].Received.Equal(time.Unix(0,0)) == false, // If the received timestamp is epoch, we have not yet received this message through an active server
|
tl[i].Received.Equal(time.Unix(0,0)) == false, // If the received timestamp is epoch, we have not yet received this message through an active server
|
||||||
|
false,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
|
@ -222,6 +225,9 @@ func (this *GrandCentralDispatcher) loadMessagesPaneHelper(handle string) {
|
||||||
if messages[i].FromMe {
|
if messages[i].FromMe {
|
||||||
from = "me"
|
from = "me"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log.Infof("Loading Message from Store %v", messages[i])
|
||||||
|
|
||||||
this.AppendMessage(
|
this.AppendMessage(
|
||||||
messages[i].Handle,
|
messages[i].Handle,
|
||||||
from,
|
from,
|
||||||
|
@ -231,10 +237,11 @@ func (this *GrandCentralDispatcher) loadMessagesPaneHelper(handle string) {
|
||||||
messages[i].MessageID,
|
messages[i].MessageID,
|
||||||
messages[i].FromMe,
|
messages[i].FromMe,
|
||||||
messages[i].Timestamp.Format(constants.TIME_FORMAT),
|
messages[i].Timestamp.Format(constants.TIME_FORMAT),
|
||||||
true,
|
false,
|
||||||
|
messages[i].Error,
|
||||||
)
|
)
|
||||||
for _,id := range the.AcknowledgementIDs[messages[i].Handle] {
|
for _,id := range the.AcknowledgementIDs[messages[i].Handle] {
|
||||||
if id.ID == messages[i].MessageID && id.Ack{
|
if id.ID == messages[i].MessageID && id.Ack && !id.Error {
|
||||||
this.Acknowledged(id.ID)
|
this.Acknowledged(id.ID)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -9,16 +9,18 @@ import (
|
||||||
"git.openprivacy.ca/openprivacy/libricochet-go/log"
|
"git.openprivacy.ca/openprivacy/libricochet-go/log"
|
||||||
"runtime/debug"
|
"runtime/debug"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
type InterfaceState struct {
|
type InterfaceState struct {
|
||||||
parentGcd *GrandCentralDispatcher
|
parentGcd *GrandCentralDispatcher
|
||||||
contacts map[string]*gobjects.Contact
|
contacts map[string]*gobjects.Contact
|
||||||
messages map[string][]*gobjects.Message
|
messages sync.Map
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewUIState(gcd *GrandCentralDispatcher) (uis InterfaceState) {
|
func NewUIState(gcd *GrandCentralDispatcher) (uis InterfaceState) {
|
||||||
uis = InterfaceState{gcd, make(map[string]*gobjects.Contact), make(map[string][]*gobjects.Message)}
|
uis = InterfaceState{gcd, make(map[string]*gobjects.Contact), sync.Map{}}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -86,16 +88,33 @@ func (this *InterfaceState) GetContact(handle string) *gobjects.Contact {
|
||||||
return this.contacts[handle]
|
return this.contacts[handle]
|
||||||
}
|
}
|
||||||
|
|
||||||
func (this *InterfaceState) AddGroupError(signature string, err string) {
|
func (this *InterfaceState) AddSendMessageError(peer string, signature string, err string) {
|
||||||
|
acklist := the.AcknowledgementIDs[peer]
|
||||||
|
for _,ack := range acklist {
|
||||||
|
if ack.ID == signature {
|
||||||
|
ack.Error = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
messages,_ := this.messages.Load(peer)
|
||||||
|
messageList,_ := messages.([]*gobjects.Message)
|
||||||
|
|
||||||
|
for _,message := range messageList {
|
||||||
|
if message.MessageID == signature {
|
||||||
|
message.Error = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
log.Debugf("Received Error Sending Message: %v %v", signature, err)
|
||||||
|
// FIXME: Sometimes, for the first Peer message we send our error beats our message to the UI
|
||||||
|
time.Sleep(time.Second*1)
|
||||||
this.parentGcd.GroupSendError(signature, err)
|
this.parentGcd.GroupSendError(signature, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (this *InterfaceState) AddMessage(m *gobjects.Message) {
|
func (this *InterfaceState) AddMessage(m *gobjects.Message) {
|
||||||
this.GetContact(m.From)
|
this.GetContact(m.From)
|
||||||
|
|
||||||
_, found := this.messages[m.Handle]
|
_,found := this.messages.Load(m.Handle)
|
||||||
if !found {
|
if !found {
|
||||||
this.messages[m.Handle] = make([]*gobjects.Message, 0)
|
this.messages.Store(m.Handle, make([]*gobjects.Message, 0))
|
||||||
}
|
}
|
||||||
|
|
||||||
// Ack message sent to group
|
// Ack message sent to group
|
||||||
|
@ -107,17 +126,22 @@ func (this *InterfaceState) AddMessage(m *gobjects.Message) {
|
||||||
// If an ack, swallow the message and ack from the list.
|
// If an ack, swallow the message and ack from the list.
|
||||||
acklist := the.AcknowledgementIDs[m.From]
|
acklist := the.AcknowledgementIDs[m.From]
|
||||||
for _,ack := range acklist {
|
for _,ack := range acklist {
|
||||||
|
if ack.Error == false {
|
||||||
ack.Ack = true
|
ack.Ack = true
|
||||||
this.parentGcd.Acknowledged(ack.ID)
|
this.parentGcd.Acknowledged(ack.ID)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
this.messages[m.Handle] = append(this.messages[m.Handle], m)
|
|
||||||
|
messages,_ := this.messages.Load(m.Handle)
|
||||||
|
messageList,_ := messages.([]*gobjects.Message)
|
||||||
|
this.messages.Store(m.Handle, append(messageList, m))
|
||||||
|
|
||||||
// If we have this group loaded already
|
// If we have this group loaded already
|
||||||
if this.parentGcd.CurrentOpenConversation() == m.Handle {
|
if this.parentGcd.CurrentOpenConversation() == m.Handle {
|
||||||
// If the message is not from the user then add it, otherwise, just acknowledge.
|
// If the message is not from the user then add it, otherwise, just acknowledge.
|
||||||
if !m.FromMe || !m.Acknowledged {
|
if !m.FromMe || !m.Acknowledged {
|
||||||
this.parentGcd.AppendMessage(m.Handle, m.From, m.DisplayName, m.Message, m.Image, m.MessageID, m.FromMe, m.Timestamp.Format(constants.TIME_FORMAT), m.Acknowledged)
|
this.parentGcd.AppendMessage(m.Handle, m.From, m.DisplayName, m.Message, m.Image, m.MessageID, m.FromMe, m.Timestamp.Format(constants.TIME_FORMAT), m.Acknowledged, m.Error)
|
||||||
} else {
|
} else {
|
||||||
this.parentGcd.Acknowledged(m.MessageID)
|
this.parentGcd.Acknowledged(m.MessageID)
|
||||||
}
|
}
|
||||||
|
@ -133,11 +157,13 @@ func (this *InterfaceState) AddMessage(m *gobjects.Message) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (this *InterfaceState) GetMessages(handle string) []*gobjects.Message {
|
func (this *InterfaceState) GetMessages(handle string) []*gobjects.Message {
|
||||||
_, found := this.messages[handle]
|
_,found := this.messages.Load(handle)
|
||||||
if !found {
|
if !found {
|
||||||
this.messages[handle] = make([]*gobjects.Message, 0)
|
this.messages.Store(handle, make([]*gobjects.Message, 0))
|
||||||
}
|
}
|
||||||
return this.messages[handle]
|
messages,found := this.messages.Load(handle)
|
||||||
|
messageList,_ := messages.([]*gobjects.Message)
|
||||||
|
return messageList
|
||||||
}
|
}
|
||||||
|
|
||||||
func (this *InterfaceState) UpdateContact(handle string) {
|
func (this *InterfaceState) UpdateContact(handle string) {
|
||||||
|
|
|
@ -12,6 +12,7 @@ var CwtchDir string
|
||||||
type AckId struct {
|
type AckId struct {
|
||||||
ID string
|
ID string
|
||||||
Ack bool
|
Ack bool
|
||||||
|
Error bool
|
||||||
}
|
}
|
||||||
|
|
||||||
var AcknowledgementIDs map[string][]*AckId
|
var AcknowledgementIDs map[string][]*AckId
|
||||||
|
|
2
main.go
2
main.go
|
@ -111,7 +111,7 @@ func main() {
|
||||||
|
|
||||||
// these are long-lived pollers/listeners for incoming messages and status changes
|
// these are long-lived pollers/listeners for incoming messages and status changes
|
||||||
loadCwtchData(gcd, acn)
|
loadCwtchData(gcd, acn)
|
||||||
go characters.IncomingListener(gcd.UIState.AddMessage, gcd.UIState.AddGroupError)
|
go characters.IncomingListener(gcd.UIState.AddMessage, gcd.UIState.AddSendMessageError)
|
||||||
go characters.TorStatusPoller(gcd.TorStatus, acn)
|
go characters.TorStatusPoller(gcd.TorStatus, acn)
|
||||||
go characters.PresencePoller(gcd.UIState.GetContact, gcd.UIState.AddContact, gcd.UIState.UpdateContact)
|
go characters.PresencePoller(gcd.UIState.GetContact, gcd.UIState.AddContact, gcd.UIState.UpdateContact)
|
||||||
go characters.GroupPoller(gcd.UIState.GetContact, gcd.UIState.UpdateContact)
|
go characters.GroupPoller(gcd.UIState.GetContact, gcd.UIState.UpdateContact)
|
||||||
|
|
|
@ -59,7 +59,7 @@ ColumnLayout {
|
||||||
jsonModel4.clear()
|
jsonModel4.clear()
|
||||||
}
|
}
|
||||||
|
|
||||||
onAppendMessage: function(handle, from, displayName, message, image, mid, fromMe, ts) {
|
onAppendMessage: function(handle, from, displayName, message, image, mid, fromMe, ts, ack, error) {
|
||||||
var msg
|
var msg
|
||||||
try {
|
try {
|
||||||
msg = JSON.parse(message)
|
msg = JSON.parse(message)
|
||||||
|
|
|
@ -32,7 +32,7 @@ ColumnLayout {
|
||||||
txtMessage.text = ""
|
txtMessage.text = ""
|
||||||
}
|
}
|
||||||
|
|
||||||
onAppendMessage: function(handle, from, displayName, message, image, mid, fromMe, ts, ackd) {
|
onAppendMessage: function(handle, from, displayName, message, image, mid, fromMe, ts, ackd, error) {
|
||||||
var msg
|
var msg
|
||||||
try {
|
try {
|
||||||
msg = JSON.parse(message)
|
msg = JSON.parse(message)
|
||||||
|
@ -52,6 +52,7 @@ ColumnLayout {
|
||||||
"_fromMe": fromMe,
|
"_fromMe": fromMe,
|
||||||
"_ts": ts,
|
"_ts": ts,
|
||||||
"_ackd": ackd,
|
"_ackd": ackd,
|
||||||
|
"_error": error == true ? "this message failed to send" : "",
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
||||||
|
@ -93,6 +94,7 @@ ColumnLayout {
|
||||||
fromMe: _fromMe
|
fromMe: _fromMe
|
||||||
timestamp: _ts
|
timestamp: _ts
|
||||||
ackd: _ackd
|
ackd: _ackd
|
||||||
|
error: _error
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -58,7 +58,7 @@ ColumnLayout {
|
||||||
jsonModel4.clear()
|
jsonModel4.clear()
|
||||||
}
|
}
|
||||||
|
|
||||||
onAppendMessage: function(handle, from, displayName, message, image, mid, fromMe, ts) {
|
onAppendMessage: function(handle, from, displayName, message, image, mid, fromMe, ts, ack, error) {
|
||||||
var msg
|
var msg
|
||||||
try {
|
try {
|
||||||
msg = JSON.parse(message)
|
msg = JSON.parse(message)
|
||||||
|
|
|
@ -35,12 +35,10 @@ RowLayout {
|
||||||
}
|
}
|
||||||
|
|
||||||
onGroupSendError: function(mid, error) {
|
onGroupSendError: function(mid, error) {
|
||||||
console.log("Error " + mid + " " + messageID)
|
|
||||||
if (mid == messageID) {
|
if (mid == messageID) {
|
||||||
root.error = error
|
root.error = error
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
Reference in New Issue