Fix Contact Retry Failure to Restart
When toggling between connected and disconnected, the Contact Retry plugin could find itself in a state where the new event would never get requeued. Also: Make the unsigned nature of limit in GetMessage* Apis explicit.
This commit is contained in:
parent
be4230d16e
commit
d9ce7737cc
|
@ -302,8 +302,7 @@ func (cr *contactRetry) run() {
|
||||||
cr.authorizedPeers.Store(id, true)
|
cr.authorizedPeers.Store(id, true)
|
||||||
if c, ok := cr.connections.Load(id); ok {
|
if c, ok := cr.connections.Load(id); ok {
|
||||||
contact := c.(*contact)
|
contact := c.(*contact)
|
||||||
if contact.state == connections.DISCONNECTED && !contact.queued {
|
if contact.state == connections.DISCONNECTED {
|
||||||
|
|
||||||
// prioritize connections made in the last week
|
// prioritize connections made in the last week
|
||||||
if time.Since(contact.lastSeen).Hours() < PriorityQueueTimeSinceQualifierHours {
|
if time.Since(contact.lastSeen).Hours() < PriorityQueueTimeSinceQualifierHours {
|
||||||
cr.priorityQueue.insert(contact)
|
cr.priorityQueue.insert(contact)
|
||||||
|
@ -461,10 +460,10 @@ func (cr *contactRetry) addConnection(id string, state connections.ConnectionSta
|
||||||
cr.connections.Store(id, p)
|
cr.connections.Store(id, p)
|
||||||
return
|
return
|
||||||
} else {
|
} else {
|
||||||
// we have rerequested this connnection. Force set the queued parameter to true.
|
// we have rerequested this connnection, probably via an explicit ask, update it's state
|
||||||
p, _ := cr.connections.Load(id)
|
if c, ok := cr.connections.Load(id); ok {
|
||||||
if !p.(*contact).queued {
|
contact := c.(*contact)
|
||||||
p.(*contact).queued = true
|
contact.state = state
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -94,7 +94,7 @@ func (cp *cwtchPeer) EnhancedImportBundle(importString string) string {
|
||||||
return cp.ImportBundle(importString).Error()
|
return cp.ImportBundle(importString).Error()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cp *cwtchPeer) EnhancedGetMessages(conversation int, index int, count int) string {
|
func (cp *cwtchPeer) EnhancedGetMessages(conversation int, index int, count uint) string {
|
||||||
var emessages = make([]EnhancedMessage, count)
|
var emessages = make([]EnhancedMessage, count)
|
||||||
|
|
||||||
messages, err := cp.GetMostRecentMessages(conversation, 0, index, count)
|
messages, err := cp.GetMostRecentMessages(conversation, 0, index, count)
|
||||||
|
@ -888,7 +888,7 @@ func (cp *cwtchPeer) GetChannelMessageCount(conversation int, channel int) (int,
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetMostRecentMessages returns a selection of messages, ordered by most recently inserted
|
// GetMostRecentMessages returns a selection of messages, ordered by most recently inserted
|
||||||
func (cp *cwtchPeer) GetMostRecentMessages(conversation int, channel int, offset int, limit int) ([]model.ConversationMessage, error) {
|
func (cp *cwtchPeer) GetMostRecentMessages(conversation int, channel int, offset int, limit uint) ([]model.ConversationMessage, error) {
|
||||||
return cp.storage.GetMostRecentMessages(conversation, channel, offset, limit)
|
return cp.storage.GetMostRecentMessages(conversation, channel, offset, limit)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -781,7 +781,7 @@ func (cps *CwtchProfileStorage) SearchMessages(conversation int, channel int, pa
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetMostRecentMessages returns the most recent messages in a channel up to a given limit at a given offset
|
// GetMostRecentMessages returns the most recent messages in a channel up to a given limit at a given offset
|
||||||
func (cps *CwtchProfileStorage) GetMostRecentMessages(conversation int, channel int, offset int, limit int) ([]model.ConversationMessage, error) {
|
func (cps *CwtchProfileStorage) GetMostRecentMessages(conversation int, channel int, offset int, limit uint) ([]model.ConversationMessage, error) {
|
||||||
channelID := ChannelID{Conversation: conversation, Channel: channel}
|
channelID := ChannelID{Conversation: conversation, Channel: channel}
|
||||||
|
|
||||||
cps.mutex.Lock()
|
cps.mutex.Lock()
|
||||||
|
|
|
@ -131,7 +131,7 @@ type CwtchPeer interface {
|
||||||
GetChannelMessage(conversation int, channel int, id int) (string, model.Attributes, error)
|
GetChannelMessage(conversation int, channel int, id int) (string, model.Attributes, error)
|
||||||
GetChannelMessageCount(conversation int, channel int) (int, error)
|
GetChannelMessageCount(conversation int, channel int) (int, error)
|
||||||
GetChannelMessageByContentHash(conversation int, channel int, contenthash string) (int, error)
|
GetChannelMessageByContentHash(conversation int, channel int, contenthash string) (int, error)
|
||||||
GetMostRecentMessages(conversation int, channel int, offset int, limit int) ([]model.ConversationMessage, error)
|
GetMostRecentMessages(conversation int, channel int, offset int, limit uint) ([]model.ConversationMessage, error)
|
||||||
UpdateMessageAttribute(conversation int, channel int, id int, key string, value string) error
|
UpdateMessageAttribute(conversation int, channel int, id int, key string, value string) error
|
||||||
SearchConversations(pattern string) string
|
SearchConversations(pattern string) string
|
||||||
|
|
||||||
|
@ -142,7 +142,7 @@ type CwtchPeer interface {
|
||||||
EnhancedGetMessageByContentHash(conversation int, hash string) string
|
EnhancedGetMessageByContentHash(conversation int, hash string) string
|
||||||
|
|
||||||
// EnhancedGetMessages returns a set of json-encoded enhanced messages, suitable for rendering in a UI
|
// EnhancedGetMessages returns a set of json-encoded enhanced messages, suitable for rendering in a UI
|
||||||
EnhancedGetMessages(conversation int, index int, count int) string
|
EnhancedGetMessages(conversation int, index int, count uint) string
|
||||||
|
|
||||||
// Server Token APIS
|
// Server Token APIS
|
||||||
// TODO move these to feature protected interfaces
|
// TODO move these to feature protected interfaces
|
||||||
|
|
Loading…
Reference in New Issue