Add JSON Annotations
continuous-integration/drone/pr Build is passing Details

Also fix race condition in app map
This commit is contained in:
Sarah Jamie Lewis 2024-06-10 10:34:11 -07:00
parent fdec3302af
commit e14044e404
Signed by: sarah
GPG Key ID: F27FD21A270837EF
6 changed files with 50 additions and 31 deletions

View File

@ -396,12 +396,14 @@ func (app *application) installProfile(profile peer.CwtchPeer) bool {
func (app *application) ActivatePeerEngine(onion string) {
profile := app.GetPeer(onion)
if profile != nil {
app.appmutex.Lock()
if _, exists := app.engines[onion]; !exists {
eventBus, exists := app.eventBuses[profile.GetOnion()]
if !exists {
// todo handle this case?
log.Errorf("cannot activate peer engine without an event bus")
app.appmutex.Unlock()
return
}
@ -410,12 +412,13 @@ func (app *application) ActivatePeerEngine(onion string) {
log.Debugf("restartFlow: Creating a New Protocol Engine...")
app.engines[profile.GetOnion()] = engine
eventBus.Publish(event.NewEventList(event.ProtocolEngineCreated))
app.QueryACNStatus()
} else {
log.Errorf("corrupted profile detected for %v", onion)
}
}
app.appmutex.Unlock()
}
app.QueryACNStatus()
}
// ConfigureConnections autostarts the given kinds of connections.
@ -423,7 +426,9 @@ func (app *application) ConfigureConnections(onion string, listen bool, peers bo
profile := app.GetPeer(onion)
if profile != nil {
app.appmutex.Lock()
profileBus, exists := app.eventBuses[profile.GetOnion()]
app.appmutex.Unlock()
if exists {
// if we are making a decision to ignore
if !peers || !servers {

View File

@ -284,6 +284,7 @@ const (
Status = Field("Status")
EventID = Field("EventID")
EventContext = Field("EventContext")
Channel = Field("Channel")
Index = Field("Index")
RowIndex = Field("RowIndex")
ContentHash = Field("ContentHash")

View File

@ -63,12 +63,12 @@ type SyncRequestMessage struct {
// This file contains code for the Hybrid Group / Managed Group types..
type HybridGroupMessage struct {
Author string // the authors cwtch address
MemberGroupID uint32
MemberMessageID uint32
MessageBody string
Sent uint64 // milliseconds since epoch
Signature []byte // of json-encoded content (including empty sig)
Author string `json:"a"` // the authors cwtch address
MemberGroupID uint32 `json:"g"`
MemberMessageID uint32 `json:"m"`
MessageBody string `json:"b"`
Sent uint64 `json:"t"` // milliseconds since epoch
Signature []byte `json:"s"` // of json-encoded content (including empty sig)
}
// AuthenticateMessage returns true if the Author of the message produced the Signature over the message

View File

@ -7,8 +7,8 @@ package hybrid
import (
"encoding/json"
"fmt"
"strconv"
"time"
// "strconv"
// "time"
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/model"
@ -131,14 +131,14 @@ func (f *GroupManagerFunctionality) handleNewMessageEvent(profile peer.CwtchPeer
return
} else {
mgidstr := strconv.Itoa(int(nme.HybridGroupMessage.MemberGroupID)) // we need both MemberGroupId and MemberMessageId for attestation later on...
newmmidstr := strconv.Itoa(int(nme.HybridGroupMessage.MemberMessageID))
// Set the attributes of this message...
attr := model.Attributes{MemberGroupIDKey: mgidstr, MemberMessageIDKey: newmmidstr,
constants.AttrAuthor: hgm.Author,
constants.AttrAck: event.True,
constants.AttrSentTimestamp: time.UnixMilli(int64(hgm.Sent)).Format(time.RFC3339Nano)}
profile.InternalInsertMessage(conversation.ID, 0, hgm.Author, hgm.MessageBody, attr, hgm.Signature)
// mgidstr := strconv.Itoa(int(nme.HybridGroupMessage.MemberGroupID)) // we need both MemberGroupId and MemberMessageId for attestation later on...
// newmmidstr := strconv.Itoa(int(nme.HybridGroupMessage.MemberMessageID))
// // Set the attributes of this message...
// attr := model.Attributes{MemberGroupIDKey: mgidstr, MemberMessageIDKey: newmmidstr,
// constants.AttrAuthor: hgm.Author,
// constants.AttrAck: event.True,
// constants.AttrSentTimestamp: time.UnixMilli(int64(hgm.Sent)).Format(time.RFC3339Nano)}
// profile.InternalInsertMessage(conversation.ID, constants.CHANNEL_MANAGER, hgm.Author, hgm.MessageBody, attr, hgm.Signature)
// forward the message to everyone who the server has added as a contact
// and who are represented in the ACL...

View File

@ -1873,6 +1873,21 @@ func (cp *cwtchPeer) attemptInsertOrAcknowledgeLegacyGroupConversation(conversat
return err
}
// finds a message by a signature by searching across all possible channels returns a channel and a message id
func (cp *cwtchPeer) findChannelMessageBySignature(ci *model.Conversation, signature string) (int, int, error) {
id, err := cp.GetChannelMessageBySignature(ci.ID, constants.CHANNEL_CHAT, signature)
if err == nil {
return constants.CHANNEL_CHAT, id, nil
}
if ci.HasChannel(constants.CHANNEL_MANAGER) {
id, err = cp.GetChannelMessageBySignature(ci.ID, constants.CHANNEL_MANAGER, signature)
if err == nil {
return constants.CHANNEL_CHAT, id, nil
}
}
return -1, -1, err
}
// attemptAcknowledgeP2PConversation is a convenience method that looks up the conversation
// by the given handle and attempts to mark the message as acknowledged. returns error on failure
// to either find the contact or the associated message
@ -1880,21 +1895,19 @@ func (cp *cwtchPeer) attemptAcknowledgeP2PConversation(handle string, signature
ci, err := cp.FetchConversationInfo(handle)
// We should *never* received a peer acknowledgement for a conversation that doesn't exist...
if ci != nil && err == nil {
// for p2p messages the randomly generated event ID is the "signature"
id, err := cp.GetChannelMessageBySignature(ci.ID, 0, signature)
chid, mid, err := cp.findChannelMessageBySignature(ci, signature)
if err == nil {
_, attributes, err := cp.GetChannelMessage(ci.ID, 0, id)
_, attributes, err := cp.GetChannelMessage(ci.ID, chid, mid)
if err == nil {
cp.mutex.Lock()
attributes[constants.AttrAck] = constants.True
cp.mutex.Unlock()
cp.storage.UpdateMessageAttributes(ci.ID, 0, id, attributes)
cp.eventBus.Publish(event.NewEvent(event.IndexedAcknowledgement, map[event.Field]string{event.ConversationID: strconv.Itoa(ci.ID), event.RemotePeer: handle, event.Index: strconv.Itoa(id)}))
cp.storage.UpdateMessageAttributes(ci.ID, chid, mid, attributes)
cp.eventBus.Publish(event.NewEvent(event.IndexedAcknowledgement, map[event.Field]string{event.ConversationID: strconv.Itoa(ci.ID), event.RemotePeer: ci.Handle, event.Channel: strconv.Itoa(chid), event.Index: strconv.Itoa(mid)}))
return nil
}
return err
}
return err
return fmt.Errorf("no such signature error: %x", signature)
}
return err
}
@ -1908,16 +1921,16 @@ func (cp *cwtchPeer) attemptErrorConversationMessage(handle string, signature st
// We should *never* received an error for a conversation that doesn't exist...
if ci != nil && err == nil {
// "signature" here is event ID for peer messages...
id, err := cp.GetChannelMessageBySignature(ci.ID, 0, signature)
chid, mid, err := cp.findChannelMessageBySignature(ci, signature)
if err == nil {
_, attributes, err := cp.GetChannelMessage(ci.ID, 0, id)
_, attributes, err := cp.GetChannelMessage(ci.ID, chid, mid)
if err == nil {
cp.mutex.Lock()
attributes[constants.AttrErr] = constants.True
cp.storage.UpdateMessageAttributes(ci.ID, 0, id, attributes)
cp.storage.UpdateMessageAttributes(ci.ID, chid, mid, attributes)
cp.mutex.Unlock()
// Send a generic indexed failure...
cp.eventBus.Publish(event.NewEvent(event.IndexedFailure, map[event.Field]string{event.ConversationID: strconv.Itoa(ci.ID), event.Handle: handle, event.Error: error, event.Index: strconv.Itoa(id)}))
cp.eventBus.Publish(event.NewEvent(event.IndexedFailure, map[event.Field]string{event.ConversationID: strconv.Itoa(ci.ID), event.Handle: handle, event.Error: error, event.Channel: strconv.Itoa(chid), event.Index: strconv.Itoa(mid)}))
return nil
}
return err

View File

@ -6,7 +6,7 @@ echo ""
echo ""
echo "Running staticcheck..."
staticcheck ./...
CGO_CFLAGS="-D_LARGEFILE64_SOURCE" staticcheck ./...
# In the future we should remove include-pkgs. However, there are a few false positives in the overall go stdlib that make this
# too noisy right now, specifically assigning nil to initialize slices (safe), and using go internal context channels assigned
@ -14,10 +14,10 @@ staticcheck ./...
# We also have one file infinite_channel.go written in a way that static analysis cannot reason about easily. So it is explictly
# ignored.
echo "Running nilaway..."
nilaway -include-pkgs="cwtch.im/cwtch,cwtch.im/tapir,git.openprivacy.ca/openprivacy/connectivity" -exclude-file-docstrings="nolint:nilaway" ./...
CGO_CFLAGS="-D_LARGEFILE64_SOURCE" nilaway -include-pkgs="cwtch.im/cwtch,cwtch.im/tapir,git.openprivacy.ca/openprivacy/connectivity" -exclude-file-docstrings="nolint:nilaway" ./...
echo "Time to format"
gofmt -l -s -w .
CGO_CFLAGS="-D_LARGEFILE64_SOURCE" gofmt -l -s -w .
# ineffassign (https://github.com/gordonklaus/ineffassign)
# echo "Checking for ineffectual assignment of errors (unchecked errors...)"