2018-03-11 18:49:10 +00:00
package peer
2018-03-09 20:44:13 +00:00
import (
2021-11-11 00:41:43 +00:00
"crypto/rand"
2021-10-15 19:38:22 +00:00
"cwtch.im/cwtch/model/constants"
2021-11-16 23:06:30 +00:00
"cwtch.im/cwtch/protocol/groups"
2023-03-06 21:06:15 +00:00
"cwtch.im/cwtch/settings"
2018-06-09 07:49:18 +00:00
"encoding/base64"
2021-12-06 20:20:38 +00:00
"encoding/hex"
2019-02-03 01:18:33 +00:00
"encoding/json"
2018-03-30 21:16:51 +00:00
"errors"
2021-09-30 00:57:13 +00:00
"fmt"
2021-11-09 23:47:33 +00:00
"git.openprivacy.ca/cwtch.im/tapir/primitives"
2022-10-11 17:58:10 +00:00
"git.openprivacy.ca/cwtch.im/tapir/primitives/privacypass"
2021-11-09 23:47:33 +00:00
"git.openprivacy.ca/openprivacy/connectivity"
2021-11-11 00:41:43 +00:00
"git.openprivacy.ca/openprivacy/connectivity/tor"
"golang.org/x/crypto/ed25519"
2022-09-06 19:41:52 +00:00
"os"
2021-12-17 21:58:54 +00:00
path "path/filepath"
2022-09-26 01:28:04 +00:00
"sort"
2020-03-07 07:41:00 +00:00
"strconv"
2018-06-09 07:49:18 +00:00
"strings"
2018-03-09 20:44:13 +00:00
"sync"
2019-01-22 19:11:25 +00:00
"time"
2021-09-30 00:57:13 +00:00
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/model"
"cwtch.im/cwtch/model/attr"
"cwtch.im/cwtch/protocol/connections"
"git.openprivacy.ca/openprivacy/log"
2018-03-09 20:44:13 +00:00
)
2021-09-29 20:52:48 +00:00
const lastKnownSignature = "LastKnowSignature"
2021-11-18 23:43:58 +00:00
const lastReceivedSignature = "LastReceivedSignature"
2021-05-07 23:16:22 +00:00
2019-10-18 23:56:10 +00:00
var autoHandleableEvents = map [ event . Type ] bool { event . EncryptedGroupMessage : true , event . PeerStateChange : true ,
2021-12-06 20:20:38 +00:00
event . ServerStateChange : true , event . NewGroupInvite : true , event . NewMessageFromPeerEngine : true ,
2021-05-28 08:45:59 +00:00
event . PeerAcknowledgement : true , event . PeerError : true , event . SendMessageToPeerError : true , event . SendMessageToGroupError : true ,
2023-01-05 21:52:43 +00:00
event . NewGetValMessageFromPeer : true , event . NewRetValMessageFromPeer : true , event . ProtocolEngineStopped : true , event . RetryServerRequest : true , event . TriggerAntispamCheck : true }
2019-09-19 23:14:35 +00:00
2020-10-22 23:21:33 +00:00
// DefaultEventsToHandle specifies which events will be subscribed to
2022-09-06 19:41:52 +00:00
//
// when a peer has its Init() function called
2020-10-03 22:13:06 +00:00
var DefaultEventsToHandle = [ ] event . Type {
event . EncryptedGroupMessage ,
2021-12-06 20:20:38 +00:00
event . NewMessageFromPeerEngine ,
2020-10-03 22:13:06 +00:00
event . PeerAcknowledgement ,
event . NewGroupInvite ,
event . PeerError ,
event . SendMessageToGroupError ,
event . NewGetValMessageFromPeer ,
2021-04-13 22:12:12 +00:00
event . ProtocolEngineStopped ,
2021-04-28 19:47:55 +00:00
event . RetryServerRequest ,
2022-03-03 23:58:41 +00:00
event . PeerStateChange ,
event . ServerStateChange ,
event . SendMessageToPeerError ,
event . NewRetValMessageFromPeer ,
2022-09-10 17:15:32 +00:00
event . TriggerAntispamCheck ,
2020-10-03 22:13:06 +00:00
}
2018-10-06 03:50:55 +00:00
// cwtchPeer manages incoming and outgoing connections and all processing for a Cwtch cwtchPeer
2018-06-19 22:28:44 +00:00
type cwtchPeer struct {
2021-04-13 22:12:12 +00:00
mutex sync . Mutex
shutdown bool
listenStatus bool
2021-11-09 23:47:33 +00:00
storage * CwtchProfileStorage
state map [ string ] connections . ConnectionState
2019-01-04 21:44:21 +00:00
2019-08-14 20:56:45 +00:00
queue event . Queue
2019-06-05 20:40:55 +00:00
eventBus event . Manager
2023-01-05 21:52:43 +00:00
2023-01-25 20:32:26 +00:00
extensions [ ] ProfileHook
extensionLock sync . Mutex // we don't want to hold up all of cwtch for managing thread safe access to extensions
experiments model . Experiments
experimentsLock sync . Mutex
}
2023-03-02 18:51:44 +00:00
// EnhancedSendInviteMessage encapsulates attempting to send an invite to a conversation and looking up the enhanced message
// useful for UIs.
func ( cp * cwtchPeer ) EnhancedSendInviteMessage ( conversation int , inviteConversationID int ) string {
mid , err := cp . SendInviteToConversation ( conversation , inviteConversationID )
if err == nil {
return cp . EnhancedGetMessageById ( conversation , mid )
}
return ""
}
2023-02-27 21:31:32 +00:00
func ( cp * cwtchPeer ) EnhancedImportBundle ( importString string ) string {
return cp . ImportBundle ( importString ) . Error ( )
}
2023-02-21 23:55:14 +00:00
func ( cp * cwtchPeer ) EnhancedGetMessages ( conversation int , index int , count int ) string {
var emessages [ ] EnhancedMessage = make ( [ ] EnhancedMessage , count )
messages , err := cp . GetMostRecentMessages ( conversation , 0 , index , count )
if err == nil {
for i , message := range messages {
time , _ := time . Parse ( time . RFC3339Nano , message . Attr [ constants . AttrSentTimestamp ] )
emessages [ i ] . Message = model . Message {
Message : message . Body ,
Acknowledged : message . Attr [ constants . AttrAck ] == constants . True ,
Error : message . Attr [ constants . AttrErr ] ,
PeerID : message . Attr [ constants . AttrAuthor ] ,
Timestamp : time ,
}
emessages [ i ] . ID = message . ID
emessages [ i ] . Attributes = message . Attr
emessages [ i ] . ContentHash = model . CalculateContentHash ( message . Attr [ constants . AttrAuthor ] , message . Body )
}
}
bytes , _ := json . Marshal ( emessages )
return string ( bytes )
}
func ( cp * cwtchPeer ) EnhancedGetMessageById ( conversation int , messageID int ) string {
var message EnhancedMessage
dbmessage , attr , err := cp . GetChannelMessage ( conversation , 0 , messageID )
if err == nil {
time , _ := time . Parse ( time . RFC3339Nano , attr [ constants . AttrSentTimestamp ] )
message . Message = model . Message {
Message : dbmessage ,
Acknowledged : attr [ constants . AttrAck ] == constants . True ,
Error : attr [ constants . AttrErr ] ,
PeerID : attr [ constants . AttrAuthor ] ,
Timestamp : time ,
}
message . ID = messageID
message . Attributes = attr
message . ContentHash = model . CalculateContentHash ( attr [ constants . AttrAuthor ] , dbmessage )
}
bytes , _ := json . Marshal ( message )
return string ( bytes )
}
func ( cp * cwtchPeer ) EnhancedGetMessageByContentHash ( conversation int , contentHash string ) string {
var message EnhancedMessage
offset , err := cp . GetChannelMessageByContentHash ( conversation , 0 , contentHash )
if err == nil {
messages , err := cp . GetMostRecentMessages ( conversation , 0 , offset , 1 )
if err == nil {
time , _ := time . Parse ( time . RFC3339Nano , messages [ 0 ] . Attr [ constants . AttrSentTimestamp ] )
message . Message = model . Message {
Message : messages [ 0 ] . Body ,
Acknowledged : messages [ 0 ] . Attr [ constants . AttrAck ] == constants . True ,
Error : messages [ 0 ] . Attr [ constants . AttrErr ] ,
PeerID : messages [ 0 ] . Attr [ constants . AttrAuthor ] ,
Timestamp : time ,
}
message . ID = messages [ 0 ] . ID
message . Attributes = messages [ 0 ] . Attr
message . LocalIndex = offset
message . ContentHash = contentHash
} else {
log . Errorf ( "error fetching local index {} " , err )
}
}
bytes , _ := json . Marshal ( message )
return string ( bytes )
}
func ( cp * cwtchPeer ) EnhancedSendMessage ( conversation int , message string ) string {
mid , err := cp . SendMessage ( conversation , message )
if err == nil {
return cp . EnhancedGetMessageById ( conversation , mid )
}
return ""
}
func ( cp * cwtchPeer ) ArchiveConversation ( conversationID int ) {
cp . SetConversationAttribute ( conversationID , attr . LocalScope . ConstructScopedZonedPath ( attr . ProfileZone . ConstructZonedPath ( constants . Archived ) ) , constants . True )
}
2023-01-25 20:32:26 +00:00
// IsFeatureEnabled returns true if the functionality defined by featureName has been enabled by the application, false otherwise.
// this function is intended to be used by ProfileHooks to determine if they should execute experimental functionality.
func ( cp * cwtchPeer ) IsFeatureEnabled ( featureName string ) bool {
cp . experimentsLock . Lock ( )
defer cp . experimentsLock . Unlock ( )
return cp . experiments . IsEnabled ( featureName )
}
// UpdateExperiments notifies a Cwtch profile of a change in the nature of global experiments. The Cwtch Profile uses
// this information to update registered extensions.
func ( cp * cwtchPeer ) UpdateExperiments ( enabled bool , experiments map [ string ] bool ) {
cp . experimentsLock . Lock ( )
defer cp . experimentsLock . Unlock ( )
cp . experiments = model . InitExperiments ( enabled , experiments )
2023-01-05 21:52:43 +00:00
}
2023-03-06 21:06:15 +00:00
// NotifySettingsUpdate notifies a Cwtch profile of a change in the nature of global experiments. The Cwtch Profile uses
// this information to update registered extensions.
func ( cp * cwtchPeer ) NotifySettingsUpdate ( settings settings . GlobalSettings ) {
2023-03-13 19:46:15 +00:00
log . Debugf ( "Cwtch Profile Settings Update: %v" , settings )
2023-03-06 21:06:15 +00:00
cp . extensionLock . Lock ( )
defer cp . extensionLock . Unlock ( )
for _ , extension := range cp . extensions {
extension . extension . NotifySettingsUpdate ( settings )
}
}
2023-01-05 21:52:43 +00:00
func ( cp * cwtchPeer ) PublishEvent ( resp event . Event ) {
2023-01-25 20:32:26 +00:00
log . Debugf ( "Publishing Event: %v %v" , resp . EventType , resp . Data )
2023-01-05 21:52:43 +00:00
cp . eventBus . Publish ( resp )
}
func ( cp * cwtchPeer ) RegisterHook ( extension ProfileHooks ) {
cp . extensionLock . Lock ( )
defer cp . extensionLock . Unlock ( )
// Register Requested Events
2023-01-25 20:34:58 +00:00
for _ , event := range extension . EventsToRegister ( ) {
2023-01-05 21:52:43 +00:00
cp . eventBus . Subscribe ( event , cp . queue )
}
cp . extensions = append ( cp . extensions , ConstructHook ( extension ) )
2018-03-09 20:44:13 +00:00
}
2022-11-30 15:58:37 +00:00
func ( cp * cwtchPeer ) StoreCachedTokens ( tokenServer string , tokens [ ] * privacypass . Token ) {
2022-10-11 17:58:10 +00:00
ci , err := cp . FetchConversationInfo ( tokenServer )
if ci != nil && err == nil {
// Overwrite any existing tokens..
tokenPath := attr . LocalScope . ConstructScopedZonedPath ( attr . ServerZone . ConstructZonedPath ( "tokens" ) )
data , _ := json . Marshal ( tokens )
log . Debugf ( "storing cached tokens for %v" , tokenServer )
cp . SetConversationAttribute ( ci . ID , tokenPath , string ( data ) )
}
}
2023-02-21 23:55:14 +00:00
func ( cp * cwtchPeer ) ExportProfile ( file string ) error {
2022-03-08 21:45:26 +00:00
cp . mutex . Lock ( )
defer cp . mutex . Unlock ( )
return cp . storage . Export ( file )
}
2021-11-19 19:49:04 +00:00
func ( cp * cwtchPeer ) Delete ( ) {
cp . mutex . Lock ( )
defer cp . mutex . Unlock ( )
cp . storage . Delete ( )
}
2022-10-25 20:59:05 +00:00
// CheckPassword returns true if the given password can be used to derive the key that encrypts the underlying
// cwtch storage database. Returns false otherwise.
2021-11-19 19:49:04 +00:00
func ( cp * cwtchPeer ) CheckPassword ( password string ) bool {
2022-10-25 20:59:05 +00:00
// this lock is not really needed, but because we directly access cp.storage.ProfileDirectory
// we keep it here.
2021-11-19 19:49:04 +00:00
cp . mutex . Lock ( )
defer cp . mutex . Unlock ( )
2022-10-25 20:59:05 +00:00
// open *our* database with the given password (set createIfNotExists to false)
2021-11-19 19:49:04 +00:00
db , err := openEncryptedDatabase ( cp . storage . ProfileDirectory , password , false )
if db == nil || err != nil {
2022-10-25 20:59:05 +00:00
// this will only fail in the rare cases that ProfileDirectory has been moved or deleted
// it is actually a critical error, but far beyond the scope of Cwtch to deal with.
return false
}
// check that the storage object is valid (this will fail if the DB key is incorrect)
cps , err := NewCwtchProfileStorage ( db , cp . storage . ProfileDirectory )
if err != nil {
// this will error if any SQL queries fail, which will be the case if the profile is invalid.
2021-11-19 19:49:04 +00:00
return false
}
2022-10-25 20:59:05 +00:00
// we have a valid database, close the storage (but don't purge as we may be using those conversations...)
cps . Close ( false )
// success!
2021-11-19 19:49:04 +00:00
return true
}
2021-12-17 21:58:54 +00:00
func ( cp * cwtchPeer ) ChangePassword ( password string , newpassword string , newpasswordAgain string ) error {
2022-10-25 20:59:05 +00:00
if cp . CheckPassword ( password ) {
cp . mutex . Lock ( )
defer cp . mutex . Unlock ( )
2021-12-17 21:58:54 +00:00
2022-10-25 20:59:05 +00:00
salt , err := os . ReadFile ( path . Join ( cp . storage . ProfileDirectory , saltFile ) )
if err != nil {
return err
}
2021-12-17 21:58:54 +00:00
2022-10-25 20:59:05 +00:00
// probably redundant but we like api safety
if newpassword == newpasswordAgain {
rekey := createKey ( newpassword , salt )
2023-01-25 20:32:26 +00:00
log . Debugf ( "rekeying database..." )
2022-10-25 20:59:05 +00:00
return cp . storage . Rekey ( rekey )
}
return errors . New ( constants . PasswordsDoNotMatchError )
2021-12-17 21:58:54 +00:00
}
2022-10-25 20:59:05 +00:00
return errors . New ( constants . InvalidPasswordError )
2021-12-17 21:58:54 +00:00
}
2021-11-09 23:47:33 +00:00
// GenerateProtocolEngine
// Status: New in 1.5
2021-11-11 00:41:43 +00:00
func ( cp * cwtchPeer ) GenerateProtocolEngine ( acn connectivity . ACN , bus event . Manager ) ( connections . Engine , error ) {
cp . mutex . Lock ( )
defer cp . mutex . Unlock ( )
conversations , _ := cp . storage . FetchConversations ( )
2021-11-09 23:47:33 +00:00
2021-11-11 00:41:43 +00:00
authorizations := make ( map [ string ] model . Authorization )
for _ , conversation := range conversations {
if tor . IsValidHostname ( conversation . Handle ) {
if conversation . ACL [ conversation . Handle ] . Blocked {
authorizations [ conversation . Handle ] = model . AuthBlocked
} else {
authorizations [ conversation . Handle ] = model . AuthApproved
}
}
}
privateKey , err := cp . storage . LoadProfileKeyValue ( TypePrivateKey , "Ed25519PrivateKey" )
if err != nil {
log . Errorf ( "error loading private key from storage" )
return nil , err
}
publicKey , err := cp . storage . LoadProfileKeyValue ( TypePublicKey , "Ed25519PublicKey" )
if err != nil {
log . Errorf ( "error loading public key from storage" )
return nil , err
}
identity := primitives . InitializeIdentity ( "" , ( * ed25519 . PrivateKey ) ( & privateKey ) , ( * ed25519 . PublicKey ) ( & publicKey ) )
return connections . NewProtocolEngine ( identity , privateKey , acn , bus , authorizations ) , nil
2021-11-09 23:47:33 +00:00
}
// SendScopedZonedGetValToContact
// Status: No change in 1.5
2021-11-17 22:34:13 +00:00
func ( cp * cwtchPeer ) SendScopedZonedGetValToContact ( conversationID int , scope attr . Scope , zone attr . Zone , path string ) {
ci , err := cp . GetConversationInfo ( conversationID )
if err == nil {
ev := event . NewEventList ( event . SendGetValMessageToPeer , event . RemotePeer , ci . Handle , event . Scope , string ( scope ) , event . Path , string ( zone . ConstructZonedPath ( path ) ) )
cp . eventBus . Publish ( ev )
} else {
log . Errorf ( "Error sending scoped zone to contact %v %v" , conversationID , err )
}
2021-10-07 22:40:25 +00:00
}
2021-11-09 23:47:33 +00:00
// GetScopedZonedAttribute
// Status: Ready for 1.5
2021-10-07 22:40:25 +00:00
func ( cp * cwtchPeer ) GetScopedZonedAttribute ( scope attr . Scope , zone attr . Zone , key string ) ( string , bool ) {
2022-08-26 20:54:48 +00:00
2021-10-08 17:59:32 +00:00
scopedZonedKey := scope . ConstructScopedZonedPath ( zone . ConstructZonedPath ( key ) )
2021-10-07 22:40:25 +00:00
2021-11-09 23:47:33 +00:00
value , err := cp . storage . LoadProfileKeyValue ( TypeAttribute , scopedZonedKey . ToString ( ) )
2021-10-07 22:40:25 +00:00
2021-11-09 23:47:33 +00:00
if err != nil {
return "" , false
2021-10-07 22:40:25 +00:00
}
2021-11-09 23:47:33 +00:00
return string ( value ) , true
2021-10-07 22:40:25 +00:00
}
2022-07-05 22:31:44 +00:00
// GetScopedZonedAttributes finds all keys associated with the given scope and zone
2022-07-06 03:41:16 +00:00
func ( cp * cwtchPeer ) GetScopedZonedAttributeKeys ( scope attr . Scope , zone attr . Zone ) ( [ ] string , error ) {
2022-08-26 20:54:48 +00:00
2022-07-05 22:31:44 +00:00
scopedZonedKey := scope . ConstructScopedZonedPath ( zone . ConstructZonedPath ( "" ) )
keys , err := cp . storage . FindProfileKeysByPrefix ( TypeAttribute , scopedZonedKey . ToString ( ) )
if err != nil {
return nil , err
}
return keys , nil
}
2021-11-09 23:47:33 +00:00
// SetScopedZonedAttribute
2021-10-07 22:40:25 +00:00
func ( cp * cwtchPeer ) SetScopedZonedAttribute ( scope attr . Scope , zone attr . Zone , key string , value string ) {
2021-11-09 23:47:33 +00:00
2021-10-08 17:59:32 +00:00
scopedZonedKey := scope . ConstructScopedZonedPath ( zone . ConstructZonedPath ( key ) )
2021-11-25 22:34:47 +00:00
2021-11-09 23:47:33 +00:00
err := cp . storage . StoreProfileKeyValue ( TypeAttribute , scopedZonedKey . ToString ( ) , [ ] byte ( value ) )
if err != nil {
log . Errorf ( "error setting attribute %v" )
2021-11-25 22:34:47 +00:00
return
2021-11-09 23:47:33 +00:00
}
2021-11-25 22:34:47 +00:00
// We always want to publish profile level attributes to the ui
// This should be low traffic.
2021-11-25 23:39:08 +00:00
if cp . eventBus != nil {
cp . eventBus . Publish ( event . NewEvent ( event . UpdatedProfileAttribute , map [ event . Field ] string { event . Key : scopedZonedKey . ToString ( ) , event . Data : value } ) )
}
2021-10-07 22:40:25 +00:00
}
2021-09-30 00:57:13 +00:00
// SendMessage is a higher level that merges sending messages to contacts and group handles
// If you try to send a message to a handle that doesn't exist, malformed or an incorrect type then
// this function will error
2022-03-23 21:08:30 +00:00
func ( cp * cwtchPeer ) SendMessage ( conversation int , message string ) ( int , error ) {
2021-11-11 00:41:43 +00:00
// We assume we are sending to a Contact.
conversationInfo , err := cp . storage . GetConversation ( conversation )
2021-11-17 23:59:52 +00:00
// If the contact exists replace the event id with the index of this message in the contacts timeline...
2021-11-11 00:41:43 +00:00
// Otherwise assume we don't log the message in the timeline...
if conversationInfo != nil && err == nil {
2021-11-16 23:06:30 +00:00
if tor . IsValidHostname ( conversationInfo . Handle ) {
2021-11-17 22:34:13 +00:00
ev := event . NewEvent ( event . SendMessageToPeer , map [ event . Field ] string { event . ConversationID : strconv . Itoa ( conversationInfo . ID ) , event . RemotePeer : conversationInfo . Handle , event . Data : message } )
2021-11-16 23:06:30 +00:00
onion , _ := cp . storage . LoadProfileKeyValue ( TypeAttribute , attr . PublicScope . ConstructScopedZonedPath ( attr . ProfileZone . ConstructZonedPath ( constants . Onion ) ) . ToString ( ) )
// For p2p messages we store the event id of the message as the "signature" we can then look this up in the database later for acks
2022-03-23 21:08:30 +00:00
id , err := cp . storage . InsertMessage ( conversationInfo . ID , 0 , message , model . Attributes { constants . AttrAuthor : string ( onion ) , constants . AttrAck : event . False , constants . AttrSentTimestamp : time . Now ( ) . Format ( time . RFC3339Nano ) } , ev . EventID , model . CalculateContentHash ( string ( onion ) , message ) )
2021-11-16 23:06:30 +00:00
if err != nil {
2022-03-23 21:08:30 +00:00
return - 1 , err
2021-11-16 23:06:30 +00:00
}
cp . eventBus . Publish ( ev )
2022-03-23 21:08:30 +00:00
return id , nil
2021-11-16 23:06:30 +00:00
} else {
group , err := cp . constructGroupFromConversation ( conversationInfo )
if err != nil {
log . Errorf ( "error constructing group" )
2022-03-23 21:08:30 +00:00
return - 1 , err
2021-11-16 23:06:30 +00:00
}
privateKey , err := cp . storage . LoadProfileKeyValue ( TypePrivateKey , "Ed25519PrivateKey" )
if err != nil {
log . Errorf ( "error loading private key from storage" )
2022-03-23 21:08:30 +00:00
return - 1 , err
2021-11-16 23:06:30 +00:00
}
publicKey , err := cp . storage . LoadProfileKeyValue ( TypePublicKey , "Ed25519PublicKey" )
if err != nil {
log . Errorf ( "error loading public key from storage" )
2022-03-23 21:08:30 +00:00
return - 1 , err
2021-11-16 23:06:30 +00:00
}
identity := primitives . InitializeIdentity ( "" , ( * ed25519 . PrivateKey ) ( & privateKey ) , ( * ed25519 . PublicKey ) ( & publicKey ) )
2021-12-06 20:20:38 +00:00
latestMessage , err := cp . storage . GetMostRecentMessages ( conversation , 0 , 0 , 1 )
signatureBytes , _ := hex . DecodeString ( group . GroupID )
signature := base64 . StdEncoding . EncodeToString ( signatureBytes )
if len ( latestMessage ) > 0 && err == nil {
signature = latestMessage [ 0 ] . Signature
}
ct , sig , dm , err := model . EncryptMessageToGroup ( message , identity , group , signature )
2021-11-16 23:06:30 +00:00
if err != nil {
2022-03-23 21:08:30 +00:00
return - 1 , err
2021-11-16 23:06:30 +00:00
}
// Insert the Group Message
2021-12-06 20:20:38 +00:00
log . Debugf ( "sending message to group: %v" , conversationInfo . ID )
2022-03-23 21:08:30 +00:00
id , err := cp . storage . InsertMessage ( conversationInfo . ID , 0 , dm . Text , model . Attributes { constants . AttrAck : constants . False , "PreviousSignature" : base64 . StdEncoding . EncodeToString ( dm . PreviousMessageSig ) , constants . AttrAuthor : dm . Onion , constants . AttrSentTimestamp : time . Now ( ) . Format ( time . RFC3339Nano ) } , base64 . StdEncoding . EncodeToString ( sig ) , model . CalculateContentHash ( dm . Onion , dm . Text ) )
2021-11-17 22:34:13 +00:00
if err == nil {
2022-11-23 16:01:22 +00:00
ev := event . NewEvent ( event . SendMessageToGroup , map [ event . Field ] string { event . ConversationID : strconv . Itoa ( conversationInfo . ID ) , event . GroupID : conversationInfo . Handle , event . GroupServer : group . GroupServer , event . Ciphertext : base64 . StdEncoding . EncodeToString ( ct ) , event . Signature : base64 . StdEncoding . EncodeToString ( sig ) } )
cp . eventBus . Publish ( ev )
return id , nil
2021-11-17 22:34:13 +00:00
}
2022-03-23 21:08:30 +00:00
return - 1 , err
2021-11-16 23:06:30 +00:00
}
2021-09-30 00:57:13 +00:00
}
2022-03-23 21:08:30 +00:00
return - 1 , fmt . Errorf ( "error sending message to conversation %v" , err )
2021-09-30 00:57:13 +00:00
}
2021-04-06 21:22:36 +00:00
// BlockUnknownConnections will auto disconnect from connections if authentication doesn't resolve a hostname
// known to peer.
2021-11-09 23:47:33 +00:00
// Status: Ready for 1.5
2021-04-06 21:22:36 +00:00
func ( cp * cwtchPeer ) BlockUnknownConnections ( ) {
cp . eventBus . Publish ( event . NewEvent ( event . BlockUnknownPeers , map [ event . Field ] string { } ) )
}
// AllowUnknownConnections will permit connections from unknown contacts.
2021-11-09 23:47:33 +00:00
// Status: Ready for 1.5
2021-04-06 21:22:36 +00:00
func ( cp * cwtchPeer ) AllowUnknownConnections ( ) {
cp . eventBus . Publish ( event . NewEvent ( event . AllowUnknownPeers , map [ event . Field ] string { } ) )
}
2021-11-09 23:47:33 +00:00
// NewProfileWithEncryptedStorage instantiates a new Cwtch Profile from encrypted storage
func NewProfileWithEncryptedStorage ( name string , cps * CwtchProfileStorage ) CwtchPeer {
cp := new ( cwtchPeer )
cp . shutdown = false
cp . storage = cps
2021-11-17 23:34:14 +00:00
cp . queue = event . NewQueue ( )
2021-11-09 23:47:33 +00:00
cp . state = make ( map [ string ] connections . ConnectionState )
2021-03-29 18:53:02 +00:00
2021-11-11 00:41:43 +00:00
pub , priv , _ := ed25519 . GenerateKey ( rand . Reader )
2021-11-09 23:47:33 +00:00
// Store all the Necessary Base Attributes In The Database
2021-11-11 00:41:43 +00:00
cp . SetScopedZonedAttribute ( attr . PublicScope , attr . ProfileZone , constants . Name , name )
cp . storage . StoreProfileKeyValue ( TypeAttribute , attr . PublicScope . ConstructScopedZonedPath ( attr . ProfileZone . ConstructZonedPath ( constants . Onion ) ) . ToString ( ) , [ ] byte ( tor . GetTorV3Hostname ( pub ) ) )
cp . storage . StoreProfileKeyValue ( TypePrivateKey , "Ed25519PrivateKey" , priv )
cp . storage . StoreProfileKeyValue ( TypePublicKey , "Ed25519PublicKey" , pub )
2021-09-30 00:57:13 +00:00
2021-11-09 23:47:33 +00:00
return cp
2018-06-19 22:28:44 +00:00
}
2021-11-09 23:47:33 +00:00
// FromEncryptedStorage loads an existing Profile from Encrypted Storage
func FromEncryptedStorage ( cps * CwtchProfileStorage ) CwtchPeer {
2018-06-23 23:56:51 +00:00
cp := new ( cwtchPeer )
2018-11-10 22:14:12 +00:00
cp . shutdown = false
2021-11-09 23:47:33 +00:00
cp . storage = cps
2021-11-17 23:34:14 +00:00
cp . queue = event . NewQueue ( )
2021-11-09 23:47:33 +00:00
cp . state = make ( map [ string ] connections . ConnectionState )
2021-11-17 23:34:14 +00:00
// At some point we may want to populate caches here, for now we will assume hitting the
// database directly is tolerable
2021-11-25 23:39:08 +00:00
// Clean up anything that wasn't cleaned up on shutdown
// TODO ideally this shouldn't need to be done but the UI sometimes doesn't shut down cleanly
cp . storage . PurgeNonSavedMessages ( )
2018-10-06 03:50:55 +00:00
return cp
2018-10-15 00:59:53 +00:00
}
2021-11-17 23:34:14 +00:00
// ImportLegacyProfile generates a new peer from a profile.
2021-11-09 23:47:33 +00:00
// Deprecated - Only to be used for importing new profiles
2021-11-17 23:34:14 +00:00
func ImportLegacyProfile ( profile * model . Profile , cps * CwtchProfileStorage ) CwtchPeer {
2018-10-06 03:50:55 +00:00
cp := new ( cwtchPeer )
2019-09-19 23:14:35 +00:00
cp . shutdown = false
2021-11-09 23:47:33 +00:00
cp . storage = cps
2021-11-18 23:43:58 +00:00
cp . eventBus = event . NewEventManager ( )
2021-11-17 23:34:14 +00:00
cp . queue = event . NewQueue ( )
2021-11-18 23:43:58 +00:00
cp . state = make ( map [ string ] connections . ConnectionState )
2021-11-09 23:47:33 +00:00
// Store all the Necessary Base Attributes In The Database
2021-11-17 23:34:14 +00:00
cp . SetScopedZonedAttribute ( attr . PublicScope , attr . ProfileZone , constants . Name , profile . Name )
cp . storage . StoreProfileKeyValue ( TypeAttribute , attr . PublicScope . ConstructScopedZonedPath ( attr . ProfileZone . ConstructZonedPath ( constants . Onion ) ) . ToString ( ) , [ ] byte ( tor . GetTorV3Hostname ( profile . Ed25519PublicKey ) ) )
2021-11-11 00:41:43 +00:00
cp . storage . StoreProfileKeyValue ( TypePrivateKey , "Ed25519PrivateKey" , profile . Ed25519PrivateKey )
cp . storage . StoreProfileKeyValue ( TypePublicKey , "Ed25519PublicKey" , profile . Ed25519PublicKey )
2021-11-09 23:47:33 +00:00
2021-11-17 23:34:14 +00:00
for k , v := range profile . Attributes {
parts := strings . SplitN ( k , "." , 2 )
2021-11-18 23:43:58 +00:00
if len ( parts ) == 2 {
2021-11-17 23:34:14 +00:00
scope := attr . IntoScope ( parts [ 0 ] )
zone , path := attr . ParseZone ( parts [ 1 ] )
cp . SetScopedZonedAttribute ( scope , zone , path , v )
} else {
2021-12-19 20:28:38 +00:00
log . Debugf ( "could not import legacy style attribute %v" , k )
2021-11-17 23:34:14 +00:00
}
}
for _ , contact := range profile . Contacts {
var conversationID int
var err error
if contact . Authorization == model . AuthApproved {
conversationID , err = cp . NewContactConversation ( contact . Onion , model . DefaultP2PAccessControl ( ) , true )
} else if contact . Authorization == model . AuthBlocked {
conversationID , err = cp . NewContactConversation ( contact . Onion , model . AccessControl { Blocked : true , Read : false , Append : false } , true )
} else {
conversationID , err = cp . NewContactConversation ( contact . Onion , model . DefaultP2PAccessControl ( ) , false )
}
if err == nil {
for key , value := range contact . Attributes {
switch key {
case event . SaveHistoryKey :
cp . SetConversationAttribute ( conversationID , attr . LocalScope . ConstructScopedZonedPath ( attr . ProfileZone . ConstructZonedPath ( event . SaveHistoryKey ) ) , value )
case string ( model . BundleType ) :
cp . AddServer ( value )
case string ( model . KeyTypeTokenOnion ) :
//ignore
case string ( model . KeyTypeServerOnion ) :
// ignore
case string ( model . KeyTypePrivacyPass ) :
// ignore
2021-11-18 23:43:58 +00:00
case lastKnownSignature :
cp . SetConversationAttribute ( conversationID , attr . LocalScope . ConstructScopedZonedPath ( attr . ProfileZone . ConstructZonedPath ( lastReceivedSignature ) ) , value )
2021-11-17 23:34:14 +00:00
default :
2021-12-19 20:28:38 +00:00
log . Debugf ( "could not import conversation attribute %v" , key )
2021-11-17 23:34:14 +00:00
}
}
2021-11-18 23:43:58 +00:00
if name , exists := contact . Attributes [ "local.name" ] ; exists {
cp . SetConversationAttribute ( conversationID , attr . LocalScope . ConstructScopedZonedPath ( attr . ProfileZone . ConstructZonedPath ( constants . Name ) ) , name )
}
if name , exists := contact . Attributes [ "peer.name" ] ; exists {
cp . SetConversationAttribute ( conversationID , attr . PublicScope . ConstructScopedZonedPath ( attr . ProfileZone . ConstructZonedPath ( constants . Name ) ) , name )
}
2021-11-17 23:34:14 +00:00
for _ , message := range contact . Timeline . GetMessages ( ) {
// By definition anything stored in legacy timelines in acknowledged
2021-11-18 23:43:58 +00:00
attr := model . Attributes { constants . AttrAuthor : message . PeerID , constants . AttrAck : event . True , constants . AttrSentTimestamp : message . Timestamp . Format ( time . RFC3339Nano ) }
2021-11-17 23:34:14 +00:00
if message . Flags & 0x01 == 0x01 {
attr [ constants . AttrRejected ] = event . True
}
if message . Flags & 0x02 == 0x02 {
attr [ constants . AttrDownloaded ] = event . True
}
cp . storage . InsertMessage ( conversationID , 0 , message . Message , attr , model . GenerateRandomID ( ) , model . CalculateContentHash ( message . PeerID , message . Message ) )
}
}
}
for _ , group := range profile . Groups {
2021-11-18 23:43:58 +00:00
group . GroupName = group . Attributes [ "local.name" ]
2021-11-17 23:34:14 +00:00
invite , err := group . Invite ( )
if err == nil {
// Automatically grab all the important fields...
conversationID , err := cp . ImportGroup ( invite )
if err == nil {
for _ , message := range group . Timeline . GetMessages ( ) {
// By definition anything stored in legacy timelines in acknowledged
2021-11-18 23:43:58 +00:00
attr := model . Attributes { constants . AttrAuthor : message . PeerID , constants . AttrAck : event . True , constants . AttrSentTimestamp : message . Timestamp . Format ( time . RFC3339Nano ) }
2021-11-17 23:34:14 +00:00
if message . Flags & 0x01 == 0x01 {
attr [ constants . AttrRejected ] = event . True
}
if message . Flags & 0x02 == 0x02 {
attr [ constants . AttrDownloaded ] = event . True
}
cp . storage . InsertMessage ( conversationID , 0 , message . Message , attr , base64 . StdEncoding . EncodeToString ( message . Signature ) , model . CalculateContentHash ( message . PeerID , message . Message ) )
}
}
}
}
2021-11-18 23:43:58 +00:00
cp . eventBus . Shutdown ( ) // We disregard all events from profile...
2018-10-06 03:50:55 +00:00
return cp
2018-03-09 20:44:13 +00:00
}
2018-10-06 03:50:55 +00:00
// Init instantiates a cwtchPeer
2021-11-09 23:47:33 +00:00
// Status: Ready for 1.5
2019-06-21 21:50:43 +00:00
func ( cp * cwtchPeer ) Init ( eventBus event . Manager ) {
2020-10-03 22:13:06 +00:00
cp . InitForEvents ( eventBus , DefaultEventsToHandle )
2021-10-15 19:38:22 +00:00
2021-10-26 21:48:26 +00:00
// At this point we can safely assume that public.profile.name exists
localName , _ := cp . GetScopedZonedAttribute ( attr . LocalScope , attr . ProfileZone , constants . Name )
publicName , _ := cp . GetScopedZonedAttribute ( attr . PublicScope , attr . ProfileZone , constants . Name )
if localName != publicName {
cp . SetScopedZonedAttribute ( attr . LocalScope , attr . ProfileZone , constants . Name , publicName )
}
2020-10-03 22:13:06 +00:00
}
2021-11-09 23:47:33 +00:00
// InitForEvents
// Status: Ready for 1.5
2020-10-03 22:13:06 +00:00
func ( cp * cwtchPeer ) InitForEvents ( eventBus event . Manager , toBeHandled [ ] event . Type ) {
2019-01-04 21:44:21 +00:00
go cp . eventHandler ( )
cp . eventBus = eventBus
2020-10-03 22:13:06 +00:00
cp . AutoHandleEvents ( toBeHandled )
2019-09-19 23:14:35 +00:00
}
// AutoHandleEvents sets an event (if able) to be handled by this peer
2021-11-09 23:47:33 +00:00
// Status: Ready for 1.5
2019-09-19 23:14:35 +00:00
func ( cp * cwtchPeer ) AutoHandleEvents ( events [ ] event . Type ) {
for _ , ev := range events {
if _ , exists := autoHandleableEvents [ ev ] ; exists {
cp . eventBus . Subscribe ( ev , cp . queue )
} else {
log . Errorf ( "Peer asked to autohandle event it cannot: %v\n" , ev )
}
}
2018-03-09 20:44:13 +00:00
}
2021-04-28 19:47:55 +00:00
// ImportGroup initializes a group from an imported source rather than a peer invite
2021-11-11 00:41:43 +00:00
func ( cp * cwtchPeer ) ImportGroup ( exportedInvite string ) ( int , error ) {
2021-11-16 23:06:30 +00:00
gci , err := model . ValidateInvite ( exportedInvite )
if err != nil {
return - 1 , err
}
2021-12-01 12:10:46 +00:00
cp . mutex . Lock ( )
2022-07-30 07:19:16 +00:00
conversationInfo , _ := cp . storage . GetConversationByHandle ( gci . GroupID )
2022-07-30 00:24:22 +00:00
if conversationInfo != nil {
cp . mutex . Unlock ( )
2022-07-30 07:19:16 +00:00
return - 1 , fmt . Errorf ( "group already exists" )
2022-07-30 00:24:22 +00:00
}
2021-12-01 12:10:46 +00:00
groupConversationID , err := cp . storage . NewConversation ( gci . GroupID , map [ string ] string { } , model . AccessControlList { } , true )
cp . mutex . Unlock ( )
2022-07-30 00:24:22 +00:00
2021-11-16 23:06:30 +00:00
if err == nil {
cp . SetConversationAttribute ( groupConversationID , attr . LocalScope . ConstructScopedZonedPath ( attr . LegacyGroupZone . ConstructZonedPath ( constants . GroupID ) ) , gci . GroupID )
cp . SetConversationAttribute ( groupConversationID , attr . LocalScope . ConstructScopedZonedPath ( attr . LegacyGroupZone . ConstructZonedPath ( constants . GroupServer ) ) , gci . ServerHost )
cp . SetConversationAttribute ( groupConversationID , attr . LocalScope . ConstructScopedZonedPath ( attr . LegacyGroupZone . ConstructZonedPath ( constants . GroupKey ) ) , base64 . StdEncoding . EncodeToString ( gci . SharedKey ) )
2021-11-18 23:43:58 +00:00
cp . SetConversationAttribute ( groupConversationID , attr . LocalScope . ConstructScopedZonedPath ( attr . ProfileZone . ConstructZonedPath ( constants . Name ) ) , gci . GroupName )
2021-12-17 00:07:08 +00:00
cp . eventBus . Publish ( event . NewEvent ( event . NewGroup , map [ event . Field ] string { event . ConversationID : strconv . Itoa ( groupConversationID ) , event . GroupServer : gci . ServerHost , event . GroupInvite : exportedInvite , event . GroupName : gci . GroupName } ) )
2021-12-01 12:10:46 +00:00
cp . JoinServer ( gci . ServerHost )
2021-11-16 23:06:30 +00:00
}
return groupConversationID , err
2018-06-09 07:49:18 +00:00
}
2021-11-09 23:47:33 +00:00
// NewContactConversation create a new p2p conversation with the given acl applied to the handle.
2021-11-11 00:41:43 +00:00
func ( cp * cwtchPeer ) NewContactConversation ( handle string , acl model . AccessControl , accepted bool ) ( int , error ) {
2021-11-09 23:47:33 +00:00
cp . mutex . Lock ( )
defer cp . mutex . Unlock ( )
2022-07-30 07:19:16 +00:00
conversationInfo , _ := cp . storage . GetConversationByHandle ( handle )
2022-07-30 00:24:22 +00:00
if conversationInfo == nil {
conversationID , err := cp . storage . NewConversation ( handle , model . Attributes { event . SaveHistoryKey : event . DeleteHistoryDefault } , model . AccessControlList { handle : acl } , accepted )
cp . eventBus . Publish ( event . NewEvent ( event . ContactCreated , map [ event . Field ] string { event . ConversationID : strconv . Itoa ( conversationID ) , event . RemotePeer : handle } ) )
return conversationID , err
}
2022-07-30 07:19:16 +00:00
return - 1 , fmt . Errorf ( "contact conversation already exists" )
2021-11-09 23:47:33 +00:00
}
// AcceptConversation looks up a conversation by `handle` and sets the Accepted status to `true`
// This will cause Cwtch to auto connect to this conversation on start up
2021-11-10 22:28:52 +00:00
func ( cp * cwtchPeer ) AcceptConversation ( id int ) error {
2021-12-01 12:10:46 +00:00
err := cp . storage . AcceptConversation ( id )
if err == nil {
// If a p2p conversation then attempt to peer with the onion...
2022-01-06 17:55:26 +00:00
// Groups and Server have their own acceptance flow.,
ci , err := cp . storage . GetConversation ( id )
if err != nil {
log . Errorf ( "Could not get conversation for %v: %v" , id , err )
return err
}
2021-12-01 12:10:46 +00:00
if ! ci . IsGroup ( ) && ! ci . IsServer ( ) {
2022-01-06 17:55:26 +00:00
cp . sendUpdateAuth ( id , ci . Handle , ci . Accepted , ci . ACL [ ci . Handle ] . Blocked )
2021-12-01 12:10:46 +00:00
cp . PeerWithOnion ( ci . Handle )
}
}
return err
2021-11-10 22:28:52 +00:00
}
2022-01-06 17:55:26 +00:00
// BlockConversation looks up a conversation by `handle` and sets the Blocked ACL field to `true`
// This will cause Cwtch to never try to connect to and refuse connections from the peer
2021-11-17 22:34:13 +00:00
func ( cp * cwtchPeer ) BlockConversation ( id int ) error {
2022-01-06 19:35:05 +00:00
ci , err := cp . storage . GetConversation ( id )
if err != nil {
return err
}
// p2p conversations have a single ACL referencing the remote peer. Set this to blocked...
if ac , exists := ci . ACL [ ci . Handle ] ; exists {
ac . Blocked = true
ci . ACL [ ci . Handle ] = ac
}
// Send an event in any case to block the protocol engine...
// TODO at some point in the future engine needs to understand ACLs not just legacy auth status
cp . sendUpdateAuth ( id , ci . Handle , ci . Accepted , ci . ACL [ ci . Handle ] . Blocked )
return cp . storage . SetConversationACL ( id , ci . ACL )
2022-01-06 17:55:26 +00:00
}
// UnblockConversation looks up a conversation by `handle` and sets the Blocked ACL field to `true`
// Further actions depend on the Accepted field
func ( cp * cwtchPeer ) UnblockConversation ( id int ) error {
2021-11-17 22:34:13 +00:00
ci , err := cp . storage . GetConversation ( id )
if err != nil {
return err
}
2022-01-06 17:55:26 +00:00
2022-01-06 19:35:05 +00:00
// p2p conversations have a single ACL referencing the remote peer. Set ACL's blocked to false...
if ac , exists := ci . ACL [ ci . Handle ] ; exists {
ac . Blocked = false
ci . ACL [ ci . Handle ] = ac
}
2022-01-06 17:55:26 +00:00
2021-11-17 22:34:13 +00:00
// Send an event in any case to block the protocol engine...
// TODO at some point in the future engine needs to understand ACLs not just legacy auth status
2022-01-06 17:55:26 +00:00
cp . sendUpdateAuth ( id , ci . Handle , ci . Accepted , ci . ACL [ ci . Handle ] . Blocked )
if ! ci . IsGroup ( ) && ! ci . IsServer ( ) && ci . Accepted {
cp . PeerWithOnion ( ci . Handle )
}
2021-11-17 22:34:13 +00:00
return cp . storage . SetConversationACL ( id , ci . ACL )
}
2022-01-06 17:55:26 +00:00
func ( cp * cwtchPeer ) sendUpdateAuth ( id int , handle string , accepted bool , blocked bool ) {
cp . eventBus . Publish ( event . NewEvent ( event . UpdateConversationAuthorization , map [ event . Field ] string { event . ConversationID : strconv . Itoa ( id ) , event . RemotePeer : handle , event . Accepted : strconv . FormatBool ( accepted ) , event . Blocked : strconv . FormatBool ( blocked ) } ) )
}
2021-11-10 22:28:52 +00:00
func ( cp * cwtchPeer ) FetchConversations ( ) ( [ ] * model . Conversation , error ) {
return cp . storage . FetchConversations ( )
2021-11-09 23:47:33 +00:00
}
2021-11-11 00:41:43 +00:00
func ( cp * cwtchPeer ) GetConversationInfo ( conversation int ) ( * model . Conversation , error ) {
return cp . storage . GetConversation ( conversation )
}
2021-11-09 23:47:33 +00:00
// FetchConversationInfo returns information about the given conversation referenced by the handle
func ( cp * cwtchPeer ) FetchConversationInfo ( handle string ) ( * model . Conversation , error ) {
2021-11-10 22:28:52 +00:00
return cp . storage . GetConversationByHandle ( handle )
2021-11-09 23:47:33 +00:00
}
// DeleteConversation purges all data about the conversation, including message timelines, referenced by the handle
2021-11-10 22:28:52 +00:00
func ( cp * cwtchPeer ) DeleteConversation ( id int ) error {
2021-11-09 23:47:33 +00:00
cp . mutex . Lock ( )
defer cp . mutex . Unlock ( )
2022-07-30 23:05:39 +00:00
ci , err := cp . storage . GetConversation ( id )
if err == nil && ci != nil {
cp . eventBus . Publish ( event . NewEventList ( event . DeleteContact , event . RemotePeer , ci . Handle ) )
return cp . storage . DeleteConversation ( id )
}
return fmt . Errorf ( "could not delete conversation, did not exist" )
2021-11-09 23:47:33 +00:00
}
// SetConversationAttribute sets the conversation attribute at path to value
2021-11-10 22:28:52 +00:00
func ( cp * cwtchPeer ) SetConversationAttribute ( id int , path attr . ScopedZonedPath , value string ) error {
return cp . storage . SetConversationAttribute ( id , path , value )
2021-11-09 23:47:33 +00:00
}
// GetConversationAttribute is a shortcut method for retrieving the value of a given path
2021-11-10 22:28:52 +00:00
func ( cp * cwtchPeer ) GetConversationAttribute ( id int , path attr . ScopedZonedPath ) ( string , error ) {
ci , err := cp . storage . GetConversation ( id )
2021-11-09 23:47:33 +00:00
if err != nil {
return "" , err
}
val , exists := ci . Attributes [ path . ToString ( ) ]
if ! exists {
2021-11-10 22:28:52 +00:00
return "" , fmt . Errorf ( "%v does not exist for conversation %v" , path . ToString ( ) , id )
2021-11-09 23:47:33 +00:00
}
return val , nil
}
2021-11-17 22:34:13 +00:00
// GetChannelMessage returns a message from a conversation channel referenced by the absolute ID.
// Note: This should note be used to index a list as the ID is not expected to be tied to absolute position
// in the table (e.g. deleted messages, expired messages, etc.)
2021-11-09 23:47:33 +00:00
func ( cp * cwtchPeer ) GetChannelMessage ( conversation int , channel int , id int ) ( string , model . Attributes , error ) {
return cp . storage . GetChannelMessage ( conversation , channel , id )
}
2021-11-17 22:34:13 +00:00
// GetChannelMessageCount returns the absolute number of messages in a given conversation channel
func ( cp * cwtchPeer ) GetChannelMessageCount ( conversation int , channel int ) ( int , error ) {
return cp . storage . GetChannelMessageCount ( conversation , channel )
}
// GetMostRecentMessages returns a selection of messages, ordered by most recently inserted
func ( cp * cwtchPeer ) GetMostRecentMessages ( conversation int , channel int , offset int , limit int ) ( [ ] model . ConversationMessage , error ) {
return cp . storage . GetMostRecentMessages ( conversation , channel , offset , limit )
}
2021-11-19 22:04:43 +00:00
// UpdateMessageAttribute sets a given key/value attribute on the message in the given conversation/channel
2022-08-26 20:54:48 +00:00
// errors if the message doesn't exist, or for underlying database issues.
2021-11-19 20:27:52 +00:00
func ( cp * cwtchPeer ) UpdateMessageAttribute ( conversation int , channel int , id int , key string , value string ) error {
_ , attr , err := cp . GetChannelMessage ( conversation , channel , id )
if err == nil {
cp . mutex . Lock ( )
defer cp . mutex . Unlock ( )
attr [ key ] = value
return cp . storage . UpdateMessageAttributes ( conversation , channel , id , attr )
}
return err
}
2018-06-19 22:28:44 +00:00
// StartGroup create a new group linked to the given server and returns the group ID, an invite or an error.
2021-11-11 00:41:43 +00:00
// Status: TODO change server handle to conversation id...?
2021-11-16 23:06:30 +00:00
func ( cp * cwtchPeer ) StartGroup ( name string , server string ) ( int , error ) {
2021-11-11 00:41:43 +00:00
group , err := model . NewGroup ( server )
2019-02-03 01:18:33 +00:00
if err == nil {
2021-12-01 12:10:46 +00:00
cp . mutex . Lock ( )
conversationID , err := cp . storage . NewConversation ( group . GroupID , map [ string ] string { } , model . AccessControlList { } , true )
cp . mutex . Unlock ( )
2021-11-11 00:41:43 +00:00
if err != nil {
return - 1 , err
2019-02-03 01:18:33 +00:00
}
2021-11-11 00:41:43 +00:00
cp . SetConversationAttribute ( conversationID , attr . LocalScope . ConstructScopedZonedPath ( attr . LegacyGroupZone . ConstructZonedPath ( constants . GroupID ) ) , group . GroupID )
cp . SetConversationAttribute ( conversationID , attr . LocalScope . ConstructScopedZonedPath ( attr . LegacyGroupZone . ConstructZonedPath ( constants . GroupServer ) ) , group . GroupServer )
cp . SetConversationAttribute ( conversationID , attr . LocalScope . ConstructScopedZonedPath ( attr . LegacyGroupZone . ConstructZonedPath ( constants . GroupKey ) ) , base64 . StdEncoding . EncodeToString ( group . GroupKey [ : ] ) )
2021-11-18 23:43:58 +00:00
cp . SetConversationAttribute ( conversationID , attr . LocalScope . ConstructScopedZonedPath ( attr . ProfileZone . ConstructZonedPath ( constants . Name ) ) , name )
2021-11-11 00:41:43 +00:00
cp . eventBus . Publish ( event . NewEvent ( event . GroupCreated , map [ event . Field ] string {
2021-11-17 22:34:13 +00:00
event . ConversationID : strconv . Itoa ( conversationID ) ,
event . GroupID : group . GroupID ,
event . GroupServer : group . GroupServer ,
2021-12-16 18:11:16 +00:00
event . GroupName : name ,
2021-11-11 00:41:43 +00:00
} ) )
return conversationID , nil
2019-02-03 01:18:33 +00:00
}
2021-11-11 00:41:43 +00:00
log . Errorf ( "error creating group: %v" , err )
return - 1 , err
2018-06-19 22:28:44 +00:00
}
2020-07-23 21:40:44 +00:00
// AddServer takes in a serialized server specification (a bundle of related keys) and adds a contact for the
// server assuming there are no errors and the contact doesn't already exist.
2021-11-05 22:16:47 +00:00
// Returns the onion of the new server if added
2020-07-23 21:40:44 +00:00
// TODO in the future this function should also integrate with a trust provider to validate the key bundle.
2021-11-11 00:41:43 +00:00
// Status: Ready for 1.5
2021-11-19 08:09:19 +00:00
func ( cp * cwtchPeer ) AddServer ( serverSpecification string ) ( string , error ) {
2021-11-11 00:41:43 +00:00
// This confirms that the server did at least sign the bundle
keyBundle , err := model . DeserializeAndVerify ( [ ] byte ( serverSpecification ) )
if err != nil {
2021-11-05 22:16:47 +00:00
return "" , err
2021-11-11 00:41:43 +00:00
}
log . Debugf ( "Got new key bundle %v" , keyBundle . Serialize ( ) )
2021-11-23 20:17:11 +00:00
// if the key bundle is incomplete then error out.
// TODO In the future we may allow servers to attest to new
2021-11-11 00:41:43 +00:00
// keys or subsets of keys, but for now they must commit only to a complete set of keys required for Cwtch Groups
// (that way we can be assured that the keybundle we store is a valid one)
if ! keyBundle . HasKeyType ( model . KeyTypeTokenOnion ) || ! keyBundle . HasKeyType ( model . KeyTypeServerOnion ) || ! keyBundle . HasKeyType ( model . KeyTypePrivacyPass ) {
2021-11-05 22:16:47 +00:00
return "" , errors . New ( "keybundle is incomplete" )
2021-11-11 00:41:43 +00:00
}
if keyBundle . HasKeyType ( model . KeyTypeServerOnion ) {
onionKey , _ := keyBundle . GetKey ( model . KeyTypeServerOnion )
onion := string ( onionKey )
// Add the contact if we don't already have it
conversationInfo , _ := cp . FetchConversationInfo ( onion )
if conversationInfo == nil {
2021-12-01 12:10:46 +00:00
cp . mutex . Lock ( )
// Create a new conversation but do **not** push an event out.
_ , err := cp . storage . NewConversation ( onion , map [ string ] string { } , model . AccessControlList { onion : model . DefaultP2PAccessControl ( ) } , true )
cp . mutex . Unlock ( )
2021-11-17 22:34:13 +00:00
if err != nil {
2021-11-19 08:09:19 +00:00
return "" , err
2021-11-17 22:34:13 +00:00
}
2021-11-11 00:41:43 +00:00
}
conversationInfo , err = cp . FetchConversationInfo ( onion )
if conversationInfo != nil && err == nil {
ab := keyBundle . AttributeBundle ( )
for k , v := range ab {
val , exists := conversationInfo . Attributes [ k ]
if exists {
if val != v {
2021-12-08 01:01:27 +00:00
// the keybundle is inconsistent!
2021-11-19 08:09:19 +00:00
return "" , model . InconsistentKeyBundleError
2021-11-11 00:41:43 +00:00
}
}
// we haven't seen this key associated with the server before
}
2022-08-26 20:54:48 +00:00
// // If we have gotten to this point we can assume this is a safe key bundle signed by the
// // server with no conflicting keys. So we are going to save all the keys
2021-11-11 00:41:43 +00:00
for k , v := range ab {
cp . SetConversationAttribute ( conversationInfo . ID , attr . PublicScope . ConstructScopedZonedPath ( attr . ServerKeyZone . ConstructZonedPath ( k ) ) , v )
}
cp . SetConversationAttribute ( conversationInfo . ID , attr . PublicScope . ConstructScopedZonedPath ( attr . ServerKeyZone . ConstructZonedPath ( string ( model . BundleType ) ) ) , serverSpecification )
2021-12-16 18:11:16 +00:00
cp . JoinServer ( onion )
2021-11-19 08:09:19 +00:00
return onion , err
2021-11-11 00:41:43 +00:00
}
2021-11-19 22:10:51 +00:00
return "" , err
2021-11-11 00:41:43 +00:00
}
2021-11-19 08:09:19 +00:00
return "" , model . InconsistentKeyBundleError
2018-06-19 22:28:44 +00:00
}
2021-11-19 22:04:43 +00:00
// GetServers returns an unordered list of server handles
2020-12-17 01:40:03 +00:00
func ( cp * cwtchPeer ) GetServers ( ) [ ] string {
var servers [ ] string
2021-11-19 22:04:43 +00:00
conversations , err := cp . FetchConversations ( )
if err == nil {
for _ , conversationInfo := range conversations {
if conversationInfo . IsServer ( ) {
servers = append ( servers , conversationInfo . Handle )
}
}
}
2020-12-17 01:40:03 +00:00
return servers
}
2021-11-09 23:47:33 +00:00
// GetOnion
// Status: Deprecated in 1.5
2020-02-03 18:46:15 +00:00
func ( cp * cwtchPeer ) GetOnion ( ) string {
cp . mutex . Lock ( )
defer cp . mutex . Unlock ( )
2021-11-11 00:41:43 +00:00
onion , _ := cp . storage . LoadProfileKeyValue ( TypeAttribute , attr . PublicScope . ConstructScopedZonedPath ( attr . ProfileZone . ConstructZonedPath ( constants . Onion ) ) . ToString ( ) )
return string ( onion )
2020-02-03 18:46:15 +00:00
}
2021-11-09 23:47:33 +00:00
// GetPeerState
2021-11-11 00:41:43 +00:00
// Status: Ready for 1.5
2021-11-17 22:34:13 +00:00
func ( cp * cwtchPeer ) GetPeerState ( handle string ) connections . ConnectionState {
2020-02-03 18:46:15 +00:00
cp . mutex . Lock ( )
defer cp . mutex . Unlock ( )
2021-11-11 00:41:43 +00:00
if state , ok := cp . state [ handle ] ; ok {
2021-11-17 22:34:13 +00:00
return state
2020-02-03 18:46:15 +00:00
}
2021-11-17 22:34:13 +00:00
return connections . DISCONNECTED
2019-05-15 20:12:11 +00:00
}
2021-11-09 23:47:33 +00:00
// PeerWithOnion initiates a request to the Protocol Engine to set up Cwtch Session with a given tor v3 onion
// address.
2019-07-17 19:10:52 +00:00
func ( cp * cwtchPeer ) PeerWithOnion ( onion string ) {
2022-12-03 00:23:11 +00:00
lastSeen := event . CwtchEpoch
ci , err := cp . FetchConversationInfo ( onion )
if err == nil {
lastSeen = cp . GetConversationLastSeenTime ( ci . ID )
}
cp . eventBus . Publish ( event . NewEvent ( event . PeerRequest , map [ event . Field ] string { event . RemotePeer : onion , event . LastSeen : lastSeen . Format ( time . RFC3339Nano ) } ) )
}
// QueuePeeringWithOnion sends the request to peer with an onion directly to the contact retry queue; this is a mechanism to not flood tor with circuit requests
// Status: Ready for 1.10
func ( cp * cwtchPeer ) QueuePeeringWithOnion ( handle string ) {
lastSeen := event . CwtchEpoch
ci , err := cp . FetchConversationInfo ( handle )
if err == nil {
lastSeen = cp . GetConversationLastSeenTime ( ci . ID )
}
2022-12-07 19:30:11 +00:00
if ! ci . ACL [ ci . Handle ] . Blocked && ci . Accepted {
cp . eventBus . Publish ( event . NewEvent ( event . QueuePeerRequest , map [ event . Field ] string { event . RemotePeer : handle , event . LastSeen : lastSeen . Format ( time . RFC3339Nano ) } ) )
}
2022-12-03 00:23:11 +00:00
}
// QueueJoinServer sends the request to join a server directly to the contact retry queue; this is a mechanism to not flood tor with circuit requests
// Status: Ready for 1.10
func ( cp * cwtchPeer ) QueueJoinServer ( handle string ) {
lastSeen := event . CwtchEpoch
ci , err := cp . FetchConversationInfo ( handle )
if err == nil {
lastSeen = cp . GetConversationLastSeenTime ( ci . ID )
}
cp . eventBus . Publish ( event . NewEvent ( event . QueueJoinServer , map [ event . Field ] string { event . GroupServer : handle , event . LastSeen : lastSeen . Format ( time . RFC3339Nano ) } ) )
2018-03-09 20:44:13 +00:00
}
2021-11-23 20:17:11 +00:00
// SendInviteToConversation kicks off the invite process
2022-03-23 21:08:30 +00:00
func ( cp * cwtchPeer ) SendInviteToConversation ( conversationID int , inviteConversationID int ) ( int , error ) {
2021-11-11 00:41:43 +00:00
var invite model . MessageWrapper
2021-11-16 23:06:30 +00:00
inviteConversationInfo , err := cp . GetConversationInfo ( inviteConversationID )
2021-11-11 00:41:43 +00:00
2021-11-16 23:06:30 +00:00
if inviteConversationInfo == nil || err != nil {
2022-03-23 21:08:30 +00:00
return - 1 , err
2018-03-15 20:53:22 +00:00
}
2021-11-11 00:41:43 +00:00
2021-11-16 23:06:30 +00:00
if tor . IsValidHostname ( inviteConversationInfo . Handle ) {
2021-11-19 20:27:52 +00:00
invite = model . MessageWrapper { Overlay : model . OverlayInviteContact , Data : inviteConversationInfo . Handle }
2021-11-11 00:41:43 +00:00
} else {
// Reconstruct Group
2021-11-16 23:06:30 +00:00
groupID , ok := inviteConversationInfo . Attributes [ attr . LocalScope . ConstructScopedZonedPath ( attr . LegacyGroupZone . ConstructZonedPath ( constants . GroupID ) ) . ToString ( ) ]
2021-11-11 00:41:43 +00:00
if ! ok {
2022-03-23 21:08:30 +00:00
return - 1 , errors . New ( "group structure is malformed - no id" )
2021-11-11 00:41:43 +00:00
}
2021-11-16 23:06:30 +00:00
groupServer , ok := inviteConversationInfo . Attributes [ attr . LocalScope . ConstructScopedZonedPath ( attr . LegacyGroupZone . ConstructZonedPath ( constants . GroupServer ) ) . ToString ( ) ]
2021-11-11 00:41:43 +00:00
if ! ok {
2022-03-23 21:08:30 +00:00
return - 1 , errors . New ( "group structure is malformed - no server" )
2021-11-11 00:41:43 +00:00
}
2021-11-16 23:06:30 +00:00
groupKeyBase64 , ok := inviteConversationInfo . Attributes [ attr . LocalScope . ConstructScopedZonedPath ( attr . LegacyGroupZone . ConstructZonedPath ( constants . GroupKey ) ) . ToString ( ) ]
2021-11-11 00:41:43 +00:00
if ! ok {
2022-03-23 21:08:30 +00:00
return - 1 , errors . New ( "group structure is malformed - no key" )
2021-11-11 00:41:43 +00:00
}
2021-11-18 23:43:58 +00:00
groupName , ok := inviteConversationInfo . Attributes [ attr . LocalScope . ConstructScopedZonedPath ( attr . ProfileZone . ConstructZonedPath ( constants . Name ) ) . ToString ( ) ]
2021-11-11 00:41:43 +00:00
if ! ok {
2022-03-23 21:08:30 +00:00
return - 1 , errors . New ( "group structure is malformed - no name" )
2021-11-11 00:41:43 +00:00
}
groupKey , err := base64 . StdEncoding . DecodeString ( groupKeyBase64 )
if err != nil {
2022-03-23 21:08:30 +00:00
return - 1 , errors . New ( "malformed group key" )
2021-11-11 00:41:43 +00:00
}
var groupKeyFixed = [ 32 ] byte { }
copy ( groupKeyFixed [ : ] , groupKey [ : ] )
group := model . Group {
GroupID : groupID ,
2021-11-17 22:34:13 +00:00
GroupName : groupName ,
2021-11-11 00:41:43 +00:00
GroupKey : groupKeyFixed ,
GroupServer : groupServer ,
}
2021-11-17 22:34:13 +00:00
groupInvite , err := group . Invite ( )
2021-11-16 23:06:30 +00:00
if err != nil {
2022-03-23 21:08:30 +00:00
return - 1 , errors . New ( "group invite is malformed" )
2021-11-11 00:41:43 +00:00
}
serverInfo , err := cp . FetchConversationInfo ( groupServer )
if err != nil {
2022-03-23 21:08:30 +00:00
return - 1 , errors . New ( "unknown server associated with group" )
2021-11-11 00:41:43 +00:00
}
bundle , exists := serverInfo . Attributes [ attr . PublicScope . ConstructScopedZonedPath ( attr . ServerKeyZone . ConstructZonedPath ( string ( model . BundleType ) ) ) . ToString ( ) ]
if ! exists {
2022-03-23 21:08:30 +00:00
return - 1 , errors . New ( "server bundle not found" )
2021-11-11 00:41:43 +00:00
}
2021-12-08 01:01:27 +00:00
invite = model . MessageWrapper { Overlay : model . OverlayInviteGroup , Data : fmt . Sprintf ( "tofubundle:server:%s||%s" , base64 . StdEncoding . EncodeToString ( [ ] byte ( bundle ) ) , groupInvite ) }
2021-11-11 00:41:43 +00:00
}
inviteBytes , err := json . Marshal ( invite )
if err != nil {
log . Errorf ( "malformed invite: %v" , err )
2022-03-23 21:08:30 +00:00
return - 1 , err
2019-01-04 21:44:21 +00:00
}
2021-11-16 23:06:30 +00:00
return cp . SendMessage ( conversationID , string ( inviteBytes ) )
}
func ( cp * cwtchPeer ) ImportBundle ( importString string ) error {
2021-12-01 12:10:46 +00:00
if strings . HasPrefix ( importString , constants . TofuBundlePrefix ) {
2021-11-16 23:06:30 +00:00
bundle := strings . Split ( importString , "||" )
if len ( bundle ) == 2 {
2021-11-23 20:17:11 +00:00
err := cp . ImportBundle ( bundle [ 0 ] [ len ( constants . TofuBundlePrefix ) : ] )
2021-11-16 23:06:30 +00:00
// if the server import failed then abort the whole process..
if err != nil && ! strings . HasSuffix ( err . Error ( ) , "success" ) {
2021-11-23 20:17:11 +00:00
return ConstructResponse ( constants . ImportBundlePrefix , err . Error ( ) )
2021-11-16 23:06:30 +00:00
}
return cp . ImportBundle ( bundle [ 1 ] )
}
2021-11-23 20:17:11 +00:00
} else if strings . HasPrefix ( importString , constants . ServerPrefix ) {
2021-11-16 23:06:30 +00:00
// Server Key Bundles are prefixed with
2021-11-23 20:17:11 +00:00
bundle , err := base64 . StdEncoding . DecodeString ( importString [ len ( constants . ServerPrefix ) : ] )
2021-11-16 23:06:30 +00:00
if err == nil {
2021-11-19 08:09:19 +00:00
if _ , err = cp . AddServer ( string ( bundle ) ) ; err != nil {
2021-11-23 20:17:11 +00:00
return ConstructResponse ( constants . ImportBundlePrefix , err . Error ( ) )
2021-11-16 23:06:30 +00:00
}
2021-11-23 20:17:11 +00:00
return ConstructResponse ( constants . ImportBundlePrefix , "success" )
2021-11-16 23:06:30 +00:00
}
2021-11-23 20:17:11 +00:00
return ConstructResponse ( constants . ImportBundlePrefix , err . Error ( ) )
} else if strings . HasPrefix ( importString , constants . GroupPrefix ) {
2021-11-16 23:06:30 +00:00
//eg: torv3JFDWkXExBsZLkjvfkkuAxHsiLGZBk0bvoeJID9ItYnU=EsEBCiBhOWJhZDU1OTQ0NWI3YmM2N2YxYTM5YjkzMTNmNTczNRIgpHeNaG+6jy750eDhwLO39UX4f2xs0irK/M3P6mDSYQIaOTJjM2ttb29ibnlnaGoyenc2cHd2N2Q1N3l6bGQ3NTNhdW8zdWdhdWV6enB2ZmFrM2FoYzRiZHlkCiJAdVSSVgsksceIfHe41OJu9ZFHO8Kwv3G6F5OK3Hw4qZ6hn6SiZjtmJlJezoBH0voZlCahOU7jCOg+dsENndZxAA==
if _ , err := cp . ImportGroup ( importString ) ; err != nil {
2021-11-23 20:17:11 +00:00
return ConstructResponse ( constants . ImportBundlePrefix , err . Error ( ) )
2021-11-16 23:06:30 +00:00
}
2021-11-23 20:17:11 +00:00
return ConstructResponse ( constants . ImportBundlePrefix , "success" )
2021-12-06 20:20:38 +00:00
} else if tor . IsValidHostname ( importString ) {
2021-12-01 12:10:46 +00:00
_ , err := cp . NewContactConversation ( importString , model . DefaultP2PAccessControl ( ) , true )
if err == nil {
// Assuming all is good, we should peer with this contact.
cp . PeerWithOnion ( importString )
return ConstructResponse ( constants . ImportBundlePrefix , "success" )
}
return ConstructResponse ( constants . ImportBundlePrefix , err . Error ( ) )
2021-11-16 23:06:30 +00:00
}
2021-11-23 20:17:11 +00:00
return ConstructResponse ( constants . ImportBundlePrefix , "invalid_group_invite_prefix" )
2018-03-30 21:16:51 +00:00
}
2018-05-16 21:31:06 +00:00
// JoinServer manages a new server connection with the given onion address
2020-09-21 21:26:28 +00:00
func ( cp * cwtchPeer ) JoinServer ( onion string ) error {
2021-11-16 23:06:30 +00:00
ci , err := cp . FetchConversationInfo ( onion )
if ci == nil || err != nil {
return errors . New ( "no keys found for server connection" )
}
2021-11-09 23:47:33 +00:00
//if cp.GetContact(onion) != nil {
2021-11-16 23:06:30 +00:00
tokenY , yExists := ci . Attributes [ attr . PublicScope . ConstructScopedZonedPath ( attr . ServerKeyZone . ConstructZonedPath ( string ( model . KeyTypePrivacyPass ) ) ) . ToString ( ) ]
tokenOnion , onionExists := ci . Attributes [ attr . PublicScope . ConstructScopedZonedPath ( attr . ServerKeyZone . ConstructZonedPath ( string ( model . KeyTypeTokenOnion ) ) ) . ToString ( ) ]
if yExists && onionExists {
2021-11-18 23:43:58 +00:00
signature , exists := ci . Attributes [ attr . LocalScope . ConstructScopedZonedPath ( attr . ProfileZone . ConstructZonedPath ( lastReceivedSignature ) ) . ToString ( ) ]
2021-11-16 23:06:30 +00:00
if ! exists {
signature = base64 . StdEncoding . EncodeToString ( [ ] byte { } )
}
2022-11-23 16:01:22 +00:00
cachedTokensJson , hasCachedTokens := ci . GetAttribute ( attr . LocalScope , attr . ServerZone , "tokens" )
if hasCachedTokens {
log . Debugf ( "using cached tokens for %v" , ci . Handle )
}
cp . eventBus . Publish ( event . NewEvent ( event . JoinServer , map [ event . Field ] string { event . GroupServer : onion , event . ServerTokenY : tokenY , event . ServerTokenOnion : tokenOnion , event . Signature : signature , event . CachedTokens : cachedTokensJson } ) )
2021-11-16 23:06:30 +00:00
return nil
}
2020-09-21 21:26:28 +00:00
return errors . New ( "no keys found for server connection" )
2018-03-09 20:44:13 +00:00
}
2022-09-10 17:15:32 +00:00
// MakeAntispamPayment allows a peer to retrigger antispam, important if the initial connection somehow fails...
// TODO in the future we might want to expose this in CwtchPeer interface
// Additionally we may want to add extra checks here to deduplicate groups from tokenservers to cut down
// on the number of events (right now it should be minimal)
func ( cp * cwtchPeer ) MakeAntispamPayment ( server string ) {
cp . eventBus . Publish ( event . NewEvent ( event . MakeAntispamPayment , map [ event . Field ] string { event . GroupServer : server } ) )
}
2022-04-20 20:10:07 +00:00
// ResyncServer completely tears down and resyncs a new server connection with the given handle
func ( cp * cwtchPeer ) ResyncServer ( handle string ) error {
ci , err := cp . FetchConversationInfo ( handle )
2021-11-23 20:17:11 +00:00
if ci == nil || err != nil {
return errors . New ( "no keys found for server connection" )
}
2022-04-20 20:10:07 +00:00
// delete lastReceivedSignature - this will cause JoinServer to issue a resync
2021-11-23 20:17:11 +00:00
cp . SetConversationAttribute ( ci . ID , attr . LocalScope . ConstructScopedZonedPath ( attr . ProfileZone . ConstructZonedPath ( lastReceivedSignature ) ) , base64 . StdEncoding . EncodeToString ( [ ] byte { } ) )
2022-04-20 20:10:07 +00:00
// send an explicit leave server event...
leaveServerEvent := event . NewEventList ( event . LeaveServer , event . GroupServer , handle )
cp . eventBus . Publish ( leaveServerEvent )
// rejoin the server
return cp . JoinServer ( handle )
2021-06-01 18:34:35 +00:00
}
2021-11-09 23:47:33 +00:00
// SendGetValToPeer
// Status: Ready for 1.5
2020-03-07 07:41:00 +00:00
func ( cp * cwtchPeer ) SendGetValToPeer ( onion string , scope string , path string ) {
2021-10-30 00:44:16 +00:00
ev := event . NewEventList ( event . SendGetValMessageToPeer , event . RemotePeer , onion , event . Scope , scope , event . Path , path )
cp . eventBus . Publish ( ev )
2020-03-07 07:41:00 +00:00
}
2021-10-30 00:44:16 +00:00
// Listen makes the peer open a listening port to accept incoming connections (and be detectably online)
2021-11-09 23:47:33 +00:00
// Status: Ready for 1.5
2018-11-10 22:14:12 +00:00
func ( cp * cwtchPeer ) Listen ( ) {
2021-04-13 22:12:12 +00:00
cp . mutex . Lock ( )
defer cp . mutex . Unlock ( )
2021-06-02 18:34:57 +00:00
if ! cp . listenStatus {
2022-12-04 19:06:50 +00:00
log . Debugf ( "cwtchPeer Listen sending ProtocolEngineStartListen\n" )
2021-04-13 22:12:12 +00:00
cp . listenStatus = true
2021-11-11 00:41:43 +00:00
onion , _ := cp . storage . LoadProfileKeyValue ( TypeAttribute , attr . PublicScope . ConstructScopedZonedPath ( attr . ProfileZone . ConstructZonedPath ( constants . Onion ) ) . ToString ( ) )
cp . eventBus . Publish ( event . NewEvent ( event . ProtocolEngineStartListen , map [ event . Field ] string { event . Onion : string ( onion ) } ) )
2021-04-13 22:12:12 +00:00
}
2021-06-02 18:34:57 +00:00
// else protocol engine is already listening
2018-03-09 20:44:13 +00:00
}
2022-12-03 00:23:11 +00:00
type LastSeenConversation struct {
model * model . Conversation
lastSeen time . Time
2022-09-26 01:28:04 +00:00
}
2022-12-03 00:23:11 +00:00
func ( cp * cwtchPeer ) GetConversationLastSeenTime ( conversationId int ) time . Time {
2022-12-06 05:07:09 +00:00
lastTime := event . CwtchEpoch
2022-12-03 00:23:11 +00:00
timestamp , err := cp . GetConversationAttribute ( conversationId , attr . LocalScope . ConstructScopedZonedPath ( attr . ProfileZone . ConstructZonedPath ( constants . AttrLastConnectionTime ) ) )
if err == nil {
if time , err := time . Parse ( time . RFC3339Nano , timestamp ) ; err == nil {
2022-12-06 05:07:09 +00:00
lastTime = time
2022-12-03 00:23:11 +00:00
}
}
2022-12-06 05:07:09 +00:00
// for peers
2022-12-03 00:23:11 +00:00
lastMessage , _ := cp . GetMostRecentMessages ( conversationId , 0 , 0 , 1 )
if len ( lastMessage ) != 0 {
2022-12-06 05:07:09 +00:00
lastMsgTime , err := time . Parse ( time . RFC3339Nano , lastMessage [ 0 ] . Attr [ constants . AttrSentTimestamp ] )
2022-12-03 00:23:11 +00:00
if err == nil {
2022-12-06 05:07:09 +00:00
if lastMsgTime . After ( lastTime ) {
lastTime = lastMsgTime
}
2022-12-03 00:23:11 +00:00
}
}
2022-12-06 05:07:09 +00:00
// for servers
recentTimeStr , err := cp . GetConversationAttribute ( conversationId , attr . LocalScope . ConstructScopedZonedPath ( attr . LegacyGroupZone . ConstructZonedPath ( constants . SyncMostRecentMessageTime ) ) )
if err == nil {
if recentTime , err := time . Parse ( time . RFC3339Nano , recentTimeStr ) ; err == nil && recentTime . After ( lastTime ) {
lastTime = recentTime
}
}
2022-12-03 00:23:11 +00:00
2022-12-06 05:07:09 +00:00
return lastTime
2022-12-03 00:23:11 +00:00
}
func ( cp * cwtchPeer ) getConnectionsSortedByLastSeen ( doPeers , doServers bool ) [ ] * LastSeenConversation {
2021-11-17 23:59:52 +00:00
conversations , _ := cp . FetchConversations ( )
2022-12-03 00:23:11 +00:00
byRecent := [ ] * LastSeenConversation { }
2022-09-26 01:28:04 +00:00
2021-11-17 23:59:52 +00:00
for _ , conversation := range conversations {
2022-12-06 03:24:22 +00:00
if ! conversation . IsGroup ( ) {
2022-12-03 00:23:11 +00:00
if conversation . IsServer ( ) {
if ! doServers {
continue
}
} else {
2022-12-06 03:24:22 +00:00
if ! doPeers || ! conversation . Accepted {
2022-12-03 00:23:11 +00:00
continue
2022-09-26 01:28:04 +00:00
}
}
2022-12-03 00:23:11 +00:00
byRecent = append ( byRecent , & LastSeenConversation { conversation , cp . GetConversationLastSeenTime ( conversation . ID ) } )
2021-11-17 23:59:52 +00:00
}
}
2022-09-26 01:28:04 +00:00
sort . Slice ( byRecent , func ( i , j int ) bool {
2022-12-03 00:23:11 +00:00
return byRecent [ i ] . lastSeen . After ( byRecent [ j ] . lastSeen )
2022-09-26 01:28:04 +00:00
} )
2022-12-03 00:23:11 +00:00
return byRecent
}
func ( cp * cwtchPeer ) StartConnections ( doPeers , doServers bool ) {
byRecent := cp . getConnectionsSortedByLastSeen ( doPeers , doServers )
log . Infof ( "StartConnections for %v" , cp . GetOnion ( ) )
for _ , conversation := range byRecent {
if conversation . model . IsServer ( ) {
2022-12-03 23:48:09 +00:00
log . Debugf ( " QueueJoinServer(%v)" , conversation . model . Handle )
2022-12-03 00:23:11 +00:00
cp . QueueJoinServer ( conversation . model . Handle )
} else {
2022-12-03 23:48:09 +00:00
log . Debugf ( " QueuePeerWithOnion(%v)" , conversation . model . Handle )
2022-12-03 00:23:11 +00:00
cp . QueuePeeringWithOnion ( conversation . model . Handle )
}
time . Sleep ( 50 * time . Millisecond )
}
}
// StartPeersConnections attempts to connect to peer connections
// Status: Ready for 1.5
// Deprecated: for 1.10 use StartConnections
func ( cp * cwtchPeer ) StartPeersConnections ( ) {
log . Infof ( "StartPeerConnections" )
byRecent := cp . getConnectionsSortedByLastSeen ( true , false )
2022-09-26 01:28:04 +00:00
for _ , conversation := range byRecent {
2022-12-03 23:48:09 +00:00
log . Debugf ( " QueuePeerWithOnion(%v)" , conversation . model . Handle )
2022-12-03 00:23:11 +00:00
cp . QueuePeeringWithOnion ( conversation . model . Handle )
2022-09-26 01:28:04 +00:00
}
2020-10-29 21:22:33 +00:00
}
// StartServerConnections attempts to connect to all server connections
2021-11-09 23:47:33 +00:00
// Status: Ready for 1.5
2022-12-03 00:23:11 +00:00
// Deprecated: for 1.10 use StartConnections
2020-10-29 21:22:33 +00:00
func ( cp * cwtchPeer ) StartServerConnections ( ) {
2022-09-26 01:28:04 +00:00
log . Infof ( "StartServerConections" )
2022-12-03 00:23:11 +00:00
byRecent := cp . getConnectionsSortedByLastSeen ( false , true )
2022-09-26 01:28:04 +00:00
2022-12-03 00:23:11 +00:00
for _ , conversation := range byRecent {
if conversation . model . IsServer ( ) {
2022-12-03 23:48:09 +00:00
log . Debugf ( " QueueJoinServer(%v)" , conversation . model . Handle )
2022-12-03 00:23:11 +00:00
cp . QueueJoinServer ( conversation . model . Handle )
2021-11-17 23:59:52 +00:00
}
}
2019-10-31 21:39:31 +00:00
}
2018-06-15 16:21:07 +00:00
// Shutdown kills all connections and cleans up all goroutines for the peer
2021-11-09 23:47:33 +00:00
// Status: Ready for 1.5
2018-06-19 22:28:44 +00:00
func ( cp * cwtchPeer ) Shutdown ( ) {
2020-02-03 18:46:15 +00:00
cp . mutex . Lock ( )
defer cp . mutex . Unlock ( )
2022-11-30 22:13:54 +00:00
// don't allow this to be shutdown twice...
if ! cp . shutdown {
cp . shutdown = true
cp . queue . Shutdown ( )
if cp . storage != nil {
cp . storage . Close ( true )
}
2021-11-09 23:47:33 +00:00
}
2018-05-30 18:42:17 +00:00
}
2021-12-06 20:20:38 +00:00
func ( cp * cwtchPeer ) storeMessage ( handle string , message string , sent time . Time ) ( int , error ) {
2021-11-17 23:59:52 +00:00
// TODO maybe atomize this?
2021-11-09 23:47:33 +00:00
ci , err := cp . FetchConversationInfo ( handle )
if err != nil {
2021-11-11 00:41:43 +00:00
id , err := cp . NewContactConversation ( handle , model . DefaultP2PAccessControl ( ) , false )
2021-11-09 23:47:33 +00:00
if err != nil {
2021-12-06 20:20:38 +00:00
return - 1 , err
2021-11-09 23:47:33 +00:00
}
2021-11-11 00:41:43 +00:00
ci , err = cp . GetConversationInfo ( id )
2021-11-09 23:47:33 +00:00
if err != nil {
2021-12-06 20:20:38 +00:00
return - 1 , err
2021-11-09 23:47:33 +00:00
}
2020-11-12 21:17:06 +00:00
}
2021-11-16 23:06:30 +00:00
// Generate a random number and use it as the signature
signature := event . GetRandNumber ( ) . String ( )
2021-11-18 23:43:58 +00:00
return cp . storage . InsertMessage ( ci . ID , 0 , message , model . Attributes { constants . AttrAuthor : handle , constants . AttrAck : event . True , constants . AttrSentTimestamp : sent . Format ( time . RFC3339Nano ) } , signature , model . CalculateContentHash ( handle , message ) )
2020-10-03 23:09:47 +00:00
}
2019-01-04 21:44:21 +00:00
// eventHandler process events from other subsystems
func ( cp * cwtchPeer ) eventHandler ( ) {
for {
ev := cp . queue . Next ( )
switch ev . EventType {
2019-09-19 23:14:35 +00:00
/***** Default auto handled events *****/
2021-04-13 22:12:12 +00:00
case event . ProtocolEngineStopped :
cp . mutex . Lock ( )
cp . listenStatus = false
2021-11-27 00:01:58 +00:00
onion , _ := cp . storage . LoadProfileKeyValue ( TypeAttribute , attr . PublicScope . ConstructScopedZonedPath ( attr . ProfileZone . ConstructZonedPath ( constants . Onion ) ) . ToString ( ) )
2022-04-20 20:10:07 +00:00
log . Infof ( "Protocol engine for %s has stopped listening: %v" , onion , ev . Data [ event . Error ] )
2021-04-13 22:12:12 +00:00
cp . mutex . Unlock ( )
2019-01-04 21:44:21 +00:00
case event . EncryptedGroupMessage :
2021-06-01 18:34:35 +00:00
2021-11-16 23:06:30 +00:00
// If successful, a side effect is the message is added to the group's timeline
ciphertext , _ := base64 . StdEncoding . DecodeString ( ev . Data [ event . Ciphertext ] )
signature , _ := base64 . StdEncoding . DecodeString ( ev . Data [ event . Signature ] )
log . Debugf ( "received encrypted group message: %x" , ev . Data [ event . Signature ] )
2021-06-01 18:34:35 +00:00
// SECURITY NOTE: A malicious server could insert posts such that everyone always has a different lastKnownSignature
// However the server can always replace **all** messages in an attempt to track users
// This is mitigated somewhat by resync events which do wipe things entire.
// The security of cwtch groups are also not dependent on the servers inability to uniquely tag connections (as long as
// it learns nothing else about each connection).
2021-06-24 01:29:23 +00:00
// store the base64 encoded signature for later use
2021-06-01 18:34:35 +00:00
2021-12-08 01:01:27 +00:00
// TODO Server Connections should send Connection ID
2021-11-25 22:34:47 +00:00
ci , err := cp . FetchConversationInfo ( ev . Data [ event . GroupServer ] )
if ci == nil || err != nil {
log . Errorf ( "no server connection count" )
return
}
cp . SetConversationAttribute ( ci . ID , attr . LocalScope . ConstructScopedZonedPath ( attr . ProfileZone . ConstructZonedPath ( lastReceivedSignature ) ) , ev . Data [ event . Signature ] )
2021-11-16 23:06:30 +00:00
conversations , err := cp . FetchConversations ( )
if err == nil {
for _ , conversationInfo := range conversations {
2021-11-17 23:59:52 +00:00
if ! tor . IsValidHostname ( conversationInfo . Handle ) {
2021-11-16 23:06:30 +00:00
group , err := cp . constructGroupFromConversation ( conversationInfo )
if err == nil {
success , dgm := group . AttemptDecryption ( ciphertext , signature )
if success {
// Time to either acknowledge the message or insert a new message
2022-02-03 22:39:52 +00:00
// Re-encode signature to base64
cp . attemptInsertOrAcknowledgeLegacyGroupConversation ( conversationInfo . ID , base64 . StdEncoding . EncodeToString ( signature ) , dgm )
2022-03-03 23:58:41 +00:00
if serverState , exists := cp . state [ ev . Data [ event . GroupServer ] ] ; exists && serverState == connections . AUTHENTICATED {
// server is syncing, update it's most recent sync message time
cp . SetConversationAttribute ( ci . ID , attr . LocalScope . ConstructScopedZonedPath ( attr . LegacyGroupZone . ConstructZonedPath ( constants . SyncMostRecentMessageTime ) ) , time . Unix ( int64 ( dgm . Timestamp ) , 0 ) . Format ( time . RFC3339Nano ) )
}
2021-11-16 23:06:30 +00:00
break
}
}
}
}
}
2021-12-06 20:20:38 +00:00
case event . NewMessageFromPeerEngine : //event.TimestampReceived, event.RemotePeer, event.Data
2020-10-03 23:49:05 +00:00
ts , _ := time . Parse ( time . RFC3339Nano , ev . Data [ event . TimestampReceived ] )
2021-12-06 20:20:38 +00:00
id , err := cp . storeMessage ( ev . Data [ event . RemotePeer ] , ev . Data [ event . Data ] , ts )
if err == nil {
// Republish as NewMessageFromPeer
ev . EventType = event . NewMessageFromPeer
ev . Data [ event . Index ] = strconv . Itoa ( id )
2022-01-20 05:48:31 +00:00
ev . Data [ event . ContentHash ] = model . CalculateContentHash ( ev . Data [ event . RemotePeer ] , ev . Data [ event . Data ] )
2021-12-06 20:20:38 +00:00
cp . eventBus . Publish ( ev )
}
2019-10-18 23:56:10 +00:00
case event . PeerAcknowledgement :
2021-11-16 23:06:30 +00:00
err := cp . attemptAcknowledgeP2PConversation ( ev . Data [ event . RemotePeer ] , ev . Data [ event . EventID ] )
if err != nil {
// Note: This is not an Error because malicious peers can just send acks for random things
// There is no point in polluting error logs with that mess.
log . Debugf ( "failed to acknowledge acknowledgement: %v" , err )
}
2019-10-18 23:56:10 +00:00
case event . SendMessageToGroupError :
2021-12-19 01:51:22 +00:00
err := cp . attemptErrorConversationMessage ( ev . Data [ event . GroupID ] , ev . Data [ event . Signature ] , ev . Data [ event . Error ] )
2021-11-16 23:06:30 +00:00
if err != nil {
2021-12-01 12:10:46 +00:00
log . Errorf ( "failed to error group message: %s %v" , ev . Data [ event . GroupID ] , err )
2021-11-16 23:06:30 +00:00
}
2019-10-18 23:56:10 +00:00
case event . SendMessageToPeerError :
2021-12-01 12:10:46 +00:00
context := ev . Data [ event . EventContext ]
if context == string ( event . SendMessageToPeer ) {
2021-12-19 01:51:22 +00:00
err := cp . attemptErrorConversationMessage ( ev . Data [ event . RemotePeer ] , ev . Data [ event . EventID ] , ev . Data [ event . Error ] )
2021-12-01 12:10:46 +00:00
if err != nil {
log . Errorf ( "failed to error p2p message: %s %v" , ev . Data [ event . RemotePeer ] , err )
}
2021-11-16 23:06:30 +00:00
}
2021-04-28 19:47:55 +00:00
case event . RetryServerRequest :
// Automated Join Server Request triggered by a plugin.
2021-05-07 23:16:22 +00:00
log . Debugf ( "profile received an automated retry event for %v" , ev . Data [ event . GroupServer ] )
2021-10-30 00:44:16 +00:00
err := cp . JoinServer ( ev . Data [ event . GroupServer ] )
2021-10-31 19:16:50 +00:00
if err != nil {
log . Errorf ( "error joining server... %v" , err )
2021-10-30 00:44:16 +00:00
}
2020-03-07 07:41:00 +00:00
case event . NewGetValMessageFromPeer :
onion := ev . Data [ event . RemotePeer ]
scope := ev . Data [ event . Scope ]
path := ev . Data [ event . Path ]
2021-10-07 22:40:25 +00:00
log . Debugf ( "NewGetValMessageFromPeer for %v.%v from %v\n" , scope , path , onion )
2021-11-16 23:06:30 +00:00
conversationInfo , err := cp . FetchConversationInfo ( onion )
2023-01-05 21:52:43 +00:00
2021-11-16 23:06:30 +00:00
log . Debugf ( "confo info lookup newgetval %v %v %v" , onion , conversationInfo , err )
2023-01-05 21:52:43 +00:00
// only accepted contacts can look up information
2021-11-09 23:47:33 +00:00
if conversationInfo != nil && conversationInfo . Accepted {
2023-01-05 21:52:43 +00:00
// Type Safe Scoped/Zoned Path
zscope := attr . IntoScope ( scope )
zone , zpath := attr . ParseZone ( path )
scopedZonedPath := zscope . ConstructScopedZonedPath ( zone . ConstructZonedPath ( zpath ) )
// Safe Access to Extensions
cp . extensionLock . Lock ( )
2023-01-25 20:32:26 +00:00
log . Debugf ( "checking extension...%v" , cp . extensions )
2023-01-05 21:52:43 +00:00
for _ , extension := range cp . extensions {
2023-01-25 20:32:26 +00:00
log . Debugf ( "checking extension...%v" , extension )
// check if the current map of experiments satisfies the extension requirements
if ! cp . checkExtensionExperiment ( extension ) {
2023-03-06 21:06:15 +00:00
log . Debugf ( "skipping extension (%s) ..not all experiments satisfied" , extension )
2023-01-25 20:32:26 +00:00
continue
}
2023-01-05 21:52:43 +00:00
extension . extension . OnContactRequestValue ( cp , * conversationInfo , ev . EventID , scopedZonedPath )
2020-03-07 07:41:00 +00:00
}
2023-01-05 21:52:43 +00:00
cp . extensionLock . Unlock ( )
2020-03-07 07:41:00 +00:00
2021-09-30 00:57:13 +00:00
}
2020-03-07 07:41:00 +00:00
case event . NewRetValMessageFromPeer :
2023-01-05 21:52:43 +00:00
handle := ev . Data [ event . RemotePeer ]
2020-03-07 07:41:00 +00:00
scope := ev . Data [ event . Scope ]
path := ev . Data [ event . Path ]
val := ev . Data [ event . Data ]
exists , _ := strconv . ParseBool ( ev . Data [ event . Exists ] )
2023-01-05 21:52:43 +00:00
log . Debugf ( "NewRetValMessageFromPeer %v %v %v %v %v\n" , handle , scope , path , exists , val )
2021-10-07 22:40:25 +00:00
2023-01-05 21:52:43 +00:00
conversationInfo , _ := cp . FetchConversationInfo ( handle )
// only accepted contacts can look up information
if conversationInfo != nil && conversationInfo . Accepted {
// Type Safe Scoped/Zoned Path
zscope := attr . IntoScope ( scope )
zone , zpath := attr . ParseZone ( path )
scopedZonedPath := zscope . ConstructScopedZonedPath ( zone . ConstructZonedPath ( zpath ) )
// Safe Access to Extensions
cp . extensionLock . Lock ( )
for _ , extension := range cp . extensions {
2023-01-25 20:32:26 +00:00
log . Debugf ( "checking extension...%v" , extension )
// check if the current map of experiments satisfies the extension requirements
if ! cp . checkExtensionExperiment ( extension ) {
2023-03-06 21:06:15 +00:00
log . Debugf ( "skipping extension (%s) ..not all experiments satisfied" , extension )
2023-01-25 20:32:26 +00:00
continue
}
2023-01-05 21:52:43 +00:00
extension . extension . OnContactReceiveValue ( cp , * conversationInfo , scopedZonedPath , val , exists )
2021-10-07 22:40:25 +00:00
}
2023-01-05 21:52:43 +00:00
cp . extensionLock . Unlock ( )
2020-03-07 07:41:00 +00:00
}
2019-05-15 20:12:11 +00:00
case event . PeerStateChange :
2021-12-19 20:01:21 +00:00
handle := ev . Data [ event . RemotePeer ]
if connections . ConnectionStateToType ( ) [ ev . Data [ event . ConnectionState ] ] == connections . AUTHENTICATED {
2022-12-03 00:23:11 +00:00
ci , err := cp . FetchConversationInfo ( handle )
var cid int
2021-12-19 20:01:21 +00:00
if err != nil {
2022-12-03 00:23:11 +00:00
// if it's a newly authenticated connection with no conversation storage, init
2022-12-03 17:50:31 +00:00
cid , _ = cp . NewContactConversation ( handle , model . DefaultP2PAccessControl ( ) , false )
2022-12-03 00:23:11 +00:00
} else {
cid = ci . ID
}
timestamp := time . Now ( ) . Format ( time . RFC3339Nano )
cp . SetConversationAttribute ( cid , attr . LocalScope . ConstructScopedZonedPath ( attr . ProfileZone . ConstructZonedPath ( constants . AttrLastConnectionTime ) ) , timestamp )
} else if connections . ConnectionStateToType ( ) [ ev . Data [ event . ConnectionState ] ] == connections . DISCONNECTED {
ci , err := cp . FetchConversationInfo ( handle )
if err == nil {
cp . mutex . Lock ( )
if cp . state [ ev . Data [ event . RemotePeer ] ] == connections . AUTHENTICATED {
// If peer went offline, set last seen time to now
timestamp := time . Now ( ) . Format ( time . RFC3339Nano )
cp . SetConversationAttribute ( ci . ID , attr . LocalScope . ConstructScopedZonedPath ( attr . ProfileZone . ConstructZonedPath ( constants . AttrLastConnectionTime ) ) , timestamp )
}
cp . mutex . Unlock ( )
2021-12-19 20:01:21 +00:00
}
}
2023-04-03 19:44:28 +00:00
// Safe Access to Extensions
cp . extensionLock . Lock ( )
for _ , extension := range cp . extensions {
log . Debugf ( "checking extension...%v" , extension )
// check if the current map of experiments satisfies the extension requirements
if ! cp . checkExtensionExperiment ( extension ) {
log . Debugf ( "skipping extension (%s) ..not all experiments satisfied" , extension )
continue
}
if cp . checkEventExperiment ( extension , ev . EventType ) {
extension . extension . OnEvent ( ev , cp )
}
}
cp . extensionLock . Unlock ( )
2020-02-03 18:46:15 +00:00
cp . mutex . Lock ( )
2021-11-09 23:47:33 +00:00
cp . state [ ev . Data [ event . RemotePeer ] ] = connections . ConnectionStateToType ( ) [ ev . Data [ event . ConnectionState ] ]
2020-02-03 18:46:15 +00:00
cp . mutex . Unlock ( )
2019-05-15 20:12:11 +00:00
case event . ServerStateChange :
2020-02-03 18:46:15 +00:00
cp . mutex . Lock ( )
2022-12-03 00:23:11 +00:00
prevState := cp . state [ ev . Data [ event . GroupServer ] ]
2022-03-03 23:58:41 +00:00
state := connections . ConnectionStateToType ( ) [ ev . Data [ event . ConnectionState ] ]
cp . state [ ev . Data [ event . GroupServer ] ] = state
2020-02-03 18:46:15 +00:00
cp . mutex . Unlock ( )
2022-03-03 23:58:41 +00:00
// If starting to sync, determine last message from known groups on server so we can calculate sync progress
if state == connections . AUTHENTICATED {
conversations , err := cp . FetchConversations ( )
2022-12-03 00:23:11 +00:00
mostRecentTime := event . CwtchEpoch
2022-03-03 23:58:41 +00:00
if err == nil {
for _ , conversationInfo := range conversations {
if server , exists := conversationInfo . GetAttribute ( attr . LocalScope , attr . LegacyGroupZone , constants . GroupServer ) ; exists && server == ev . Data [ event . GroupServer ] {
lastMessage , _ := cp . GetMostRecentMessages ( conversationInfo . ID , 0 , 0 , 1 )
if len ( lastMessage ) == 0 {
continue
}
lastGroupMsgTime , err := time . Parse ( time . RFC3339Nano , lastMessage [ 0 ] . Attr [ constants . AttrSentTimestamp ] )
if err != nil {
continue
}
if lastGroupMsgTime . After ( mostRecentTime ) {
mostRecentTime = lastGroupMsgTime
}
}
}
}
serverInfo , err := cp . FetchConversationInfo ( ev . Data [ event . GroupServer ] )
if err == nil {
cp . SetConversationAttribute ( serverInfo . ID , attr . LocalScope . ConstructScopedZonedPath ( attr . LegacyGroupZone . ConstructZonedPath ( constants . SyncPreLastMessageTime ) ) , mostRecentTime . Format ( time . RFC3339Nano ) )
cp . SetConversationAttribute ( serverInfo . ID , attr . LocalScope . ConstructScopedZonedPath ( attr . LegacyGroupZone . ConstructZonedPath ( constants . SyncMostRecentMessageTime ) ) , mostRecentTime . Format ( time . RFC3339Nano ) )
2022-12-03 00:23:11 +00:00
if connections . ConnectionStateToType ( ) [ ev . Data [ event . ConnectionState ] ] == connections . AUTHENTICATED {
timestamp := time . Now ( ) . Format ( time . RFC3339Nano )
cp . SetConversationAttribute ( serverInfo . ID , attr . LocalScope . ConstructScopedZonedPath ( attr . ProfileZone . ConstructZonedPath ( constants . AttrLastConnectionTime ) ) , timestamp )
} else if connections . ConnectionStateToType ( ) [ ev . Data [ event . ConnectionState ] ] == connections . DISCONNECTED && prevState == connections . AUTHENTICATED {
// If peer went offline, set last seen time to now
timestamp := time . Now ( ) . Format ( time . RFC3339Nano )
cp . SetConversationAttribute ( serverInfo . ID , attr . LocalScope . ConstructScopedZonedPath ( attr . ProfileZone . ConstructZonedPath ( constants . AttrLastConnectionTime ) ) , timestamp )
}
2022-03-03 23:58:41 +00:00
}
}
2022-09-10 17:15:32 +00:00
case event . TriggerAntispamCheck :
conversations , _ := cp . FetchConversations ( )
for _ , conversation := range conversations {
if conversation . IsServer ( ) {
cp . MakeAntispamPayment ( conversation . Handle )
}
}
2019-01-04 21:44:21 +00:00
default :
2023-01-05 21:52:43 +00:00
// invalid event, signifies shutdown
if ev . EventType == "" {
return
}
// Otherwise, obtain Safe Access to Extensions
processed := false
cp . extensionLock . Lock ( )
for _ , extension := range cp . extensions {
2023-01-25 20:32:26 +00:00
// check if the current map of experiments satisfies the extension requirements
if ! cp . checkExtensionExperiment ( extension ) {
2023-03-06 21:06:15 +00:00
log . Debugf ( "skipping extension (%s) ..not all experiments satisfied" , extension )
2023-03-06 21:19:52 +00:00
if cp . checkEventExperiment ( extension , ev . EventType ) {
// If this experiment was enabled...we might have processed this event...
// To avoid flagging an error later on in this method we set processed to true.
processed = true
}
2023-01-25 20:32:26 +00:00
continue
}
2023-01-05 21:52:43 +00:00
// if the extension is registered for this event type then process
if _ , contains := extension . events [ ev . EventType ] ; contains {
extension . extension . OnEvent ( ev , cp )
processed = true
}
}
cp . extensionLock . Unlock ( )
if ! processed {
log . Errorf ( "cwtch profile received an event that it (or an extension) was unable to handle. this is very likely a programming error: %v" , ev . EventType )
2019-01-04 21:44:21 +00:00
}
}
}
2018-10-04 19:15:03 +00:00
}
2021-11-16 23:06:30 +00:00
2023-03-06 21:19:52 +00:00
func ( cp * cwtchPeer ) checkEventExperiment ( hook ProfileHook , event event . Type ) bool {
cp . experimentsLock . Lock ( )
defer cp . experimentsLock . Unlock ( )
for hookEvent := range hook . events {
if event == hookEvent {
return true
}
}
return false
}
2023-01-25 20:32:26 +00:00
func ( cp * cwtchPeer ) checkExtensionExperiment ( hook ProfileHook ) bool {
cp . experimentsLock . Lock ( )
defer cp . experimentsLock . Unlock ( )
for experiment := range hook . experiments {
if ! cp . experiments . IsEnabled ( experiment ) {
return false
}
}
return true
}
2021-11-16 23:06:30 +00:00
// attemptInsertOrAcknowledgeLegacyGroupConversation is a convenience method that looks up the conversation
// by the given handle and attempts to mark the message as acknowledged. returns error on failure
// to either find the contact or the associated message
func ( cp * cwtchPeer ) attemptInsertOrAcknowledgeLegacyGroupConversation ( conversationID int , signature string , dm * groups . DecryptedGroupMessage ) error {
2021-12-01 12:10:46 +00:00
log . Debugf ( "attempting to insert or ack group message %v %v" , conversationID , signature )
2021-11-16 23:06:30 +00:00
messageID , err := cp . GetChannelMessageBySignature ( conversationID , 0 , signature )
// We have received our own message (probably), acknowledge and move on...
if err == nil {
_ , attr , err := cp . GetChannelMessage ( conversationID , 0 , messageID )
2021-12-01 12:10:46 +00:00
if err == nil && attr [ constants . AttrAck ] != constants . True {
2021-11-16 23:06:30 +00:00
cp . mutex . Lock ( )
attr [ constants . AttrAck ] = constants . True
cp . mutex . Unlock ( )
2022-08-26 20:54:48 +00:00
cp . storage . UpdateMessageAttributes ( conversationID , 0 , messageID , attr )
2021-11-17 22:34:13 +00:00
cp . eventBus . Publish ( event . NewEvent ( event . IndexedAcknowledgement , map [ event . Field ] string { event . ConversationID : strconv . Itoa ( conversationID ) , event . Index : strconv . Itoa ( messageID ) } ) )
2021-11-16 23:06:30 +00:00
return nil
}
} else {
2022-01-20 05:48:31 +00:00
contenthash := model . CalculateContentHash ( dm . Onion , dm . Text )
id , err := cp . storage . InsertMessage ( conversationID , 0 , dm . Text , model . Attributes { constants . AttrAck : constants . True , "PreviousSignature" : base64 . StdEncoding . EncodeToString ( dm . PreviousMessageSig ) , constants . AttrAuthor : dm . Onion , constants . AttrSentTimestamp : time . Unix ( int64 ( dm . Timestamp ) , 0 ) . Format ( time . RFC3339Nano ) } , signature , contenthash )
2021-12-06 20:20:38 +00:00
if err == nil {
2022-01-20 05:48:31 +00:00
cp . eventBus . Publish ( event . NewEvent ( event . NewMessageFromGroup , map [ event . Field ] string { event . ConversationID : strconv . Itoa ( conversationID ) , event . TimestampSent : time . Unix ( int64 ( dm . Timestamp ) , 0 ) . Format ( time . RFC3339Nano ) , event . RemotePeer : dm . Onion , event . Index : strconv . Itoa ( id ) , event . Data : dm . Text , event . ContentHash : contenthash } ) )
2021-12-06 20:20:38 +00:00
}
return err
2021-11-16 23:06:30 +00:00
}
return err
}
// attemptAcknowledgeP2PConversation is a convenience method that looks up the conversation
// by the given handle and attempts to mark the message as acknowledged. returns error on failure
// to either find the contact or the associated message
func ( cp * cwtchPeer ) attemptAcknowledgeP2PConversation ( handle string , signature string ) error {
ci , err := cp . FetchConversationInfo ( handle )
// We should *never* received a peer acknowledgement for a conversation that doesn't exist...
if ci != nil && err == nil {
// for p2p messages the randomly generated event ID is the "signature"
id , err := cp . GetChannelMessageBySignature ( ci . ID , 0 , signature )
if err == nil {
_ , attr , err := cp . GetChannelMessage ( ci . ID , 0 , id )
if err == nil {
cp . mutex . Lock ( )
attr [ constants . AttrAck ] = constants . True
cp . mutex . Unlock ( )
2022-08-26 20:54:48 +00:00
cp . storage . UpdateMessageAttributes ( ci . ID , 0 , id , attr )
2021-11-17 22:34:13 +00:00
cp . eventBus . Publish ( event . NewEvent ( event . IndexedAcknowledgement , map [ event . Field ] string { event . ConversationID : strconv . Itoa ( ci . ID ) , event . RemotePeer : handle , event . Index : strconv . Itoa ( id ) } ) )
2021-11-16 23:06:30 +00:00
return nil
}
return err
}
return err
}
return err
}
// attemptErrorConversationMessage is a convenience method that looks up the conversation
// by the given handle and attempts to mark the message as errored. returns error on failure
// to either find the contact or the associated message
2021-12-19 01:51:22 +00:00
func ( cp * cwtchPeer ) attemptErrorConversationMessage ( handle string , signature string , error string ) error {
2022-10-11 17:58:10 +00:00
2021-11-16 23:06:30 +00:00
ci , err := cp . FetchConversationInfo ( handle )
2021-12-19 01:38:17 +00:00
// We should *never* received an error for a conversation that doesn't exist...
2021-11-16 23:06:30 +00:00
if ci != nil && err == nil {
2021-12-19 01:38:17 +00:00
// "signature" here is event ID for peer messages...
2021-11-16 23:06:30 +00:00
id , err := cp . GetChannelMessageBySignature ( ci . ID , 0 , signature )
if err == nil {
_ , attr , err := cp . GetChannelMessage ( ci . ID , 0 , id )
if err == nil {
cp . mutex . Lock ( )
attr [ constants . AttrErr ] = constants . True
cp . storage . UpdateMessageAttributes ( ci . ID , 0 , id , attr )
cp . mutex . Unlock ( )
2021-12-19 01:38:17 +00:00
// Send a generic indexed failure...
cp . eventBus . Publish ( event . NewEvent ( event . IndexedFailure , map [ event . Field ] string { event . ConversationID : strconv . Itoa ( ci . ID ) , event . Handle : handle , event . Error : error , event . Index : strconv . Itoa ( id ) } ) )
2021-11-16 23:06:30 +00:00
return nil
}
return err
}
return err
}
return err
}
func ( cp * cwtchPeer ) GetChannelMessageBySignature ( conversationID int , channelID int , signature string ) ( int , error ) {
return cp . storage . GetChannelMessageBySignature ( conversationID , channelID , signature )
}
2021-11-18 23:43:58 +00:00
func ( cp * cwtchPeer ) GetChannelMessageByContentHash ( conversationID int , channelID int , contenthash string ) ( int , error ) {
2021-11-23 22:26:11 +00:00
messageID , err := cp . storage . GetChannelMessageByContentHash ( conversationID , channelID , contenthash )
if err == nil {
return cp . storage . GetRowNumberByMessageID ( conversationID , channelID , messageID )
}
return - 1 , err
2021-11-18 23:43:58 +00:00
}
2021-11-19 22:04:43 +00:00
// constructGroupFromConversation returns a model.Group wrapper around a database back groups. Useful for
// encrypting / decrypting messages to/from the group.
2021-11-16 23:06:30 +00:00
func ( cp * cwtchPeer ) constructGroupFromConversation ( conversationInfo * model . Conversation ) ( * model . Group , error ) {
key := conversationInfo . Attributes [ attr . LocalScope . ConstructScopedZonedPath ( attr . LegacyGroupZone . ConstructZonedPath ( constants . GroupKey ) ) . ToString ( ) ]
groupKey , err := base64 . StdEncoding . DecodeString ( key )
if err != nil {
return nil , errors . New ( "group key is malformed" )
}
var groupKeyFixed [ 32 ] byte
copy ( groupKeyFixed [ : ] , groupKey [ : ] )
group := model . Group {
GroupID : conversationInfo . Handle ,
GroupServer : conversationInfo . Attributes [ attr . LocalScope . ConstructScopedZonedPath ( attr . LegacyGroupZone . ConstructZonedPath ( constants . GroupServer ) ) . ToString ( ) ] ,
GroupKey : groupKeyFixed ,
}
return & group , nil
}