This commit is contained in:
erinn 2021-11-02 15:07:24 -07:00
commit 5162561b33
29 changed files with 467 additions and 43709 deletions

4
.gitignore vendored
View File

@ -16,6 +16,10 @@ ebusgraph.txt
messages/
serverMonitorReport.txt
testing/cwtch.out.png
testing/filesharing/storage
testing/filesharing/tordir
testing/filesharing/cwtch.out.png
testing/filesharing/cwtch.out.png.manifest
testing/cwtch.out.png.manifest
testing/tordir/
tokens-bak.db

View File

@ -4,6 +4,8 @@ import (
"cwtch.im/cwtch/app/plugins"
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/model"
"cwtch.im/cwtch/model/attr"
"cwtch.im/cwtch/model/constants"
"cwtch.im/cwtch/peer"
"cwtch.im/cwtch/protocol/connections"
"cwtch.im/cwtch/storage"
@ -13,14 +15,11 @@ import (
"git.openprivacy.ca/openprivacy/log"
"io/ioutil"
"os"
"path"
path "path/filepath"
"strconv"
"sync"
)
// AttributeTag is a const name for a peer attribute that can be set at creation time, for example for versioning info
const AttributeTag = "tag"
type applicationCore struct {
eventBuses map[string]event.Manager
@ -58,7 +57,7 @@ type Application interface {
Shutdown()
GetPeer(onion string) peer.CwtchPeer
ListPeers() map[string]string
ListProfiles() []string
}
// LoadProfileFn is the function signature for a function in an app that loads a profile
@ -131,7 +130,7 @@ func (app *application) CreateTaggedPeer(name string, password string, tag strin
app.engines[profile.Onion] = engine
if tag != "" {
p.SetAttribute(AttributeTag, tag)
p.SetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.Tag, tag)
}
app.appBus.Publish(event.NewEvent(event.NewPeer, map[event.Field]string{event.Identity: profile.Onion, event.Created: event.True}))

View File

@ -9,7 +9,7 @@ import (
"git.openprivacy.ca/cwtch.im/tapir/primitives"
"git.openprivacy.ca/openprivacy/connectivity"
"git.openprivacy.ca/openprivacy/log"
"path"
path "path/filepath"
"strconv"
"sync"
)
@ -106,10 +106,6 @@ func (as *applicationService) createPeer(name, password, tag string) {
return
}
if tag != "" {
profile.SetAttribute(AttributeTag, tag)
}
profileStore := storage.CreateProfileWriterStore(as.eventBuses[profile.Onion], path.Join(as.directory, "profiles", profile.LocalID), password, profile)
peerAuthorizations := profile.ContactsAuthorizations()

View File

@ -62,14 +62,14 @@ func (ap *appletPeers) LaunchPeers() {
ap.launched = true
}
// ListPeers returns a map of onions to their profile's Name
func (ap *appletPeers) ListPeers() map[string]string {
keys := map[string]string{}
// ListProfiles returns a map of onions to their profile's Name
func (ap *appletPeers) ListProfiles() []string {
var keys []string
ap.peerLock.Lock()
defer ap.peerLock.Unlock()
for k, p := range ap.peers {
keys[k] = p.GetOnion()
for handle := range ap.peers {
keys = append(keys, handle)
}
return keys
}

View File

@ -1,86 +0,0 @@
package main
import (
app2 "cwtch.im/cwtch/app"
"cwtch.im/cwtch/app/utils"
"cwtch.im/cwtch/peer"
"cwtch.im/cwtch/protocol/connections"
"errors"
"fmt"
"git.openprivacy.ca/openprivacy/connectivity/tor"
"os"
"time"
)
func waitForPeerGroupConnection(peer peer.CwtchPeer, groupID string) error {
for {
group := peer.GetGroup(groupID)
if group != nil {
state, _ := peer.GetGroupState(groupID)
if state == connections.FAILED {
return errors.New("Connection to group " + groupID + " failed!")
}
if state != connections.AUTHENTICATED {
fmt.Printf("peer %v waiting to authenticate with group %v 's server, current state: %v\n", peer.GetOnion(), groupID, connections.ConnectionStateName[state])
time.Sleep(time.Second * 10)
continue
}
} else {
return errors.New("peer server connections should have entry for server but do not")
}
break
}
return nil
}
func main() {
if len(os.Args) != 2 {
fmt.Printf("Usage: ./servermon SERVER_ADDRESS\n")
os.Exit(1)
}
serverAddr := os.Args[1]
acn, err := tor.NewTorACN(".", "")
if err != nil {
fmt.Printf("Could not start tor: %v\n", err)
os.Exit(1)
}
app := app2.NewApp(acn, ".")
app.CreatePeer("servermon", "be gay, do crimes")
botPeer := utils.WaitGetPeer(app, "servermon")
fmt.Printf("Connecting to %v...\n", serverAddr)
botPeer.JoinServer(serverAddr)
groupID, _, err := botPeer.StartGroup(serverAddr)
if err != nil {
fmt.Printf("Error creating group on server %v: %v\n", serverAddr, err)
os.Exit(1)
}
err = waitForPeerGroupConnection(botPeer, groupID)
if err != nil {
fmt.Printf("Could not connect to server %v: %v\n", serverAddr, err)
os.Exit(1)
}
timeout := 1 * time.Second
timeElapsed := 0 * time.Second
for {
_, err := botPeer.SendMessageToGroupTracked(groupID, timeout.String())
if err != nil {
fmt.Printf("Sent to group on server %v failed at interval %v of total %v with: %v\n", serverAddr, timeout, timeElapsed, err)
os.Exit(1)
} else {
fmt.Printf("Successfully sent message to %v at interval %v of total %v\n", serverAddr, timeout, timeElapsed)
}
time.Sleep(timeout)
timeElapsed += timeout
if timeout < 2*time.Minute {
timeout = timeout * 2
}
}
}

View File

@ -1,159 +0,0 @@
package main
import (
"errors"
"fmt"
"git.openprivacy.ca/openprivacy/log"
"os"
//"bufio"
//"cwtch.im/cwtch/storage"
)
func convertTorFile(filename string, password string) error {
return errors.New("this code doesn't work and can never work :( it's a math thing")
/*name, _ := diceware.Generate(2)
sk, err := ioutil.ReadFile("hs_ed25519_secret_key")
if err != nil {
return err
}
sk = sk[32:]
pk, err := ioutil.ReadFile("hs_ed25519_public_key")
if err != nil {
return err
}
pk = pk[32:]
onion, err := ioutil.ReadFile("hostname")
if err != nil {
return err
}
onion = onion[:56]
peer := libpeer.NewCwtchPeer(strings.Join(name, "-"))
fmt.Printf("%d %d %s\n", len(peer.GetProfile().Ed25519PublicKey), len(peer.GetProfile().Ed25519PrivateKey), peer.GetProfile().Onion)
peer.GetProfile().Ed25519PrivateKey = sk
peer.GetProfile().Ed25519PublicKey = pk
peer.GetProfile().Onion = string(onion)
fileStore := storage2.NewFileStore(filename, password)
err = fileStore.save(peer)
if err != nil {
return err
}
log.Infof("success! loaded %d byte pk and %d byte sk for %s.onion\n", len(pk), len(sk), onion)
return nil*/
}
/*
func vanity() error {
for {
pk, sk, err := ed25519.GenerateKey(rand.Reader)
if err != nil {
continue
}
onion := utils.GetTorV3Hostname(pk)
for i := 4; i < len(os.Args); i++ {
if strings.HasPrefix(onion, os.Args[i]) {
peer := libpeer.NewCwtchPeer(os.Args[i])
peer.GetProfile().Ed25519PrivateKey = sk
peer.GetProfile().Ed25519PublicKey = pk
peer.GetProfile().Onion = onion
profileStore, _ := storage2.NewProfileStore(nil, os.Args[3], onion+".cwtch")
profileStore.Init("")
// need to signal new onion? impossible
log.Infof("found %s.onion\n", onion)
}
}
}
}*/
func printHelp() {
log.Infoln("usage: cwtchutil {help, convert-cwtch-file, convert-tor-file, changepw, vanity}")
}
func main() {
log.SetLevel(log.LevelInfo)
if len(os.Args) < 2 {
printHelp()
os.Exit(1)
}
switch os.Args[1] {
default:
printHelp()
case "help":
printHelp()
case "convert-tor-file":
if len(os.Args) != 4 {
fmt.Println("example: cwtchutil convert-tor-file /var/lib/tor/hs1 passw0rd")
os.Exit(1)
}
err := convertTorFile(os.Args[2], os.Args[3])
if err != nil {
log.Errorln(err)
}
/*case "vanity":
if len(os.Args) < 5 {
fmt.Println("example: cwtchutil vanity 4 passw0rd erinn openpriv")
os.Exit(1)
}
goroutines, err := strconv.Atoi(os.Args[2])
if err != nil {
log.Errorf("first parameter after vanity should be a number\n")
os.Exit(1)
}
log.Infoln("searching. press ctrl+c to stop")
for i := 0; i < goroutines; i++ {
go vanity()
}
for { // run until ctrl+c
time.Sleep(time.Hour * 24)
}*/
/*case "changepw":
if len(os.Args) != 3 {
fmt.Println("example: cwtch changepw ~/.cwtch/profiles/XXX")
os.Exit(1)
}
fmt.Printf("old password: ")
reader := bufio.NewReader(os.Stdin)
pw, err := reader.ReadString('\n')
if err != nil {
log.Errorln(err)
os.Exit(1)
}
pw = pw[:len(pw)-1]
profileStore, _ := storage.NewProfileStore(nil, os.Args[2], pw)
err = profileStore.Read()
if err != nil {
log.Errorln(err)
os.Exit(1)
}
fmt.Printf("new password: ")
newpw1, err := reader.ReadString('\n')
if err != nil {
log.Errorln(err)
os.Exit(1)
}
newpw1 = newpw1[:len(newpw1)-1] // fuck go with this linebreak shit ^ea
fileStore2, _ := storage.NewProfileStore(nil, os.Args[2], newpw1)
// No way to copy, populate this method
err = fileStore2.save(peer)
if err != nil {
log.Errorln(err)
os.Exit(1)
}
log.Infoln("success!")
*/
}
}

View File

@ -1,38 +0,0 @@
package main
import (
app2 "cwtch.im/cwtch/app"
"cwtch.im/cwtch/app/utils"
"cwtch.im/cwtch/event"
"git.openprivacy.ca/openprivacy/connectivity/tor"
"git.openprivacy.ca/openprivacy/log"
"os"
"path"
)
func main() {
// System Setup, We need Tor and Logging up and Running
log.AddEverythingFromPattern("peer/alice")
log.SetLevel(log.LevelDebug)
acn, err := tor.NewTorACN(path.Join(".", ".cwtch"), "")
if err != nil {
log.Errorf("\nError connecting to Tor: %v\n", err)
os.Exit(1)
}
app := app2.NewApp(acn, ".")
app.CreatePeer("alice", "be gay, do crimes")
alice := utils.WaitGetPeer(app, "alice")
app.LaunchPeers()
eventBus := app.GetEventBus(alice.GetOnion())
queue := event.NewQueue()
eventBus.Subscribe(event.NewMessageFromPeer, queue)
// For every new Data Packet Alice received she will Print it out.
for {
event := queue.Next()
log.Printf(log.LevelInfo, "Received %v from %v: %s", event.EventType, event.Data["Onion"], event.Data["Data"])
}
}

View File

@ -1,39 +0,0 @@
package main
import (
app2 "cwtch.im/cwtch/app"
"cwtch.im/cwtch/app/utils"
"git.openprivacy.ca/openprivacy/connectivity/tor"
"git.openprivacy.ca/openprivacy/log"
"os"
"path"
"time"
)
func main() {
// System Boilerplate, We need Tor Up and Running
log.AddEverythingFromPattern("peer/bob")
log.SetLevel(log.LevelDebug)
acn, err := tor.NewTorACN(path.Join(".", ".cwtch"), "")
if err != nil {
log.Errorf("\nError connecting to Tor: %v\n", err)
os.Exit(1)
}
app := app2.NewApp(acn, ".")
app.CreatePeer("bob", "be gay, do crimes")
bob := utils.WaitGetPeer(app, "bob")
// Add Alice's Onion Here (It changes run to run)
bob.PeerWithOnion("upiztu7myymjf2dn4x4czhagp7axlnqjvf5zwfegbhtpkqb6v3vgu5yd")
// Send the Message...
log.Infof("Waiting for Bob to Connect to Alice...")
bob.SendMessageToPeer("upiztu7myymjf2dn4x4czhagp7axlnqjvf5zwfegbhtpkqb6v3vgu5yd", "Hello Alice!!!")
// Wait a while...
// Everything is run in a goroutine so the main thread has to stay active
time.Sleep(time.Second * 100)
}

View File

@ -3,6 +3,7 @@ package utils
import (
app2 "cwtch.im/cwtch/app"
"cwtch.im/cwtch/model/attr"
"cwtch.im/cwtch/model/constants"
"cwtch.im/cwtch/peer"
"time"
)
@ -13,9 +14,9 @@ import (
// may fill that usecase better
func WaitGetPeer(app app2.Application, name string) peer.CwtchPeer {
for {
for id := range app.ListPeers() {
peer := app.GetPeer(id)
localName, _ := peer.GetAttribute(attr.GetLocalScope("name"))
for _, handle := range app.ListProfiles() {
peer := app.GetPeer(handle)
localName, _ := peer.GetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name)
if localName == name {
return peer
}

View File

@ -149,11 +149,6 @@ const (
// GroupID
DeleteGroup = Type("DeleteGroup")
// change the .Name attribute of a profile (careful - this is not a custom attribute. it is used in the underlying protocol during handshakes!)
// attributes:
// ProfileName [eg "erinn"]
SetProfileName = Type("SetProfileName")
// request to store a profile-wide attribute (good for e.g. per-profile settings like theme prefs)
// attributes:
// Key [eg "fontcolor"]

View File

@ -2,20 +2,19 @@ package filesharing
import (
"crypto/rand"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"io"
"math"
"path"
"strconv"
"cwtch.im/cwtch/model"
"cwtch.im/cwtch/model/attr"
"cwtch.im/cwtch/peer"
"cwtch.im/cwtch/protocol/files"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"git.openprivacy.ca/openprivacy/log"
"io"
"math"
path "path/filepath"
"strconv"
)
// Functionality groups some common UI triggered functions for contacts...
@ -42,9 +41,14 @@ type OverlayMessage struct {
// DownloadFile given a profile, a conversation handle and a file sharing key, start off a download process
// to downloadFilePath
func (f *Functionality) DownloadFile(profile peer.CwtchPeer, handle string, downloadFilePath string, manifestFilePath string, key string) {
profile.SetAttribute(attr.GetLocalScope(fmt.Sprintf("%s.manifest", key)), manifestFilePath)
profile.SetAttribute(attr.GetLocalScope(fmt.Sprintf("%s.path", key)), downloadFilePath)
profile.SendGetValToPeer(handle, attr.PublicScope, fmt.Sprintf("%s.manifest.size", key))
// Store local.filesharing.filekey.manifest as the location of the manifest
profile.SetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%s.manifest", key), manifestFilePath)
// Store local.filesharing.filekey.path as the location of the download
profile.SetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%s.path", key), downloadFilePath)
// Get the value of conversation.filesharing.filekey.manifest.size from `handle`
profile.SendScopedZonedGetValToContact(handle, attr.ConversationScope, attr.FilesharingZone, fmt.Sprintf("%s.manifest.size", key))
}
// ShareFile given a profile and a conversation handle, sets up a file sharing process to share the file
@ -84,7 +88,7 @@ func (f *Functionality) ShareFile(filepath string, profile peer.CwtchPeer, handl
// manifest.FileName gets redacted in filesharing_subsystem (to remove the system-specific file hierarchy),
// but we need to *store* the full path because the sender also uses it to locate the file
lenDiff := len(filepath) - len(path.Base(filepath))
profile.SetAttribute(attr.GetPublicScope(fmt.Sprintf("%s.manifest.size", key)), strconv.Itoa(int(math.Ceil(float64(len(serializedManifest)-lenDiff)/float64(files.DefaultChunkSize)))))
profile.SetScopedZonedAttribute(attr.ConversationScope, attr.FilesharingZone, fmt.Sprintf("%s.manifest.size", key), strconv.Itoa(int(math.Ceil(float64(len(serializedManifest)-lenDiff)/float64(files.DefaultChunkSize)))))
profile.ShareFile(key, string(serializedManifest))

View File

@ -1,9 +1,5 @@
package attr
import (
"strings"
)
/*
Scope model for peer attributes and requests
@ -16,45 +12,87 @@ values stored in the LocalScope.
*/
// Scope strongly types Scope strings
type Scope string
// ScopedZonedPath typed path with a scope and a zone
type ScopedZonedPath string
// scopes for attributes
const (
// on a peer, local and peer supplied data
LocalScope = "local"
PeerScope = "peer"
LocalScope = Scope("local")
PeerScope = Scope("peer")
ConversationScope = Scope("conversation")
// on a local profile, public data and private settings
PublicScope = "public"
SettingsScope = "settings"
PublicScope = Scope("public")
UnknownScope = Scope("unknown")
)
// Separator for scope and the rest of path
const Separator = "."
// GetPublicScope takes a path and attaches the pubic scope to it
func GetPublicScope(path string) string {
return PublicScope + Separator + path
// IntoScope converts a string to a Scope
func IntoScope(scope string) Scope {
switch scope {
case "local":
return LocalScope
case "peer":
return PeerScope
case "conversation":
return ConversationScope
case "public":
return PublicScope
}
return UnknownScope
}
// GetSettingsScope takes a path and attaches the settings scope to it
func GetSettingsScope(path string) string {
return SettingsScope + Separator + path
// ConstructScopedZonedPath enforces a scope over a zoned path
func (scope Scope) ConstructScopedZonedPath(zonedPath ZonedPath) ScopedZonedPath {
return ScopedZonedPath(string(scope) + Separator + string(zonedPath))
}
// ToString converts a ScopedZonedPath to a string
func (szp ScopedZonedPath) ToString() string {
return string(szp)
}
// IsLocal returns true if the scope is a local scope
func (scope Scope) IsLocal() bool {
return scope == LocalScope
}
// IsPeer returns true if the scope is a peer scope
func (scope Scope) IsPeer() bool {
return scope == PeerScope
}
// IsPublic returns true if the scope is a public scope
func (scope Scope) IsPublic() bool {
return scope == PublicScope
}
// IsConversation returns true if the scope is a conversation scope
func (scope Scope) IsConversation() bool {
return scope == ConversationScope
}
// GetLocalScope takes a path and attaches the local scope to it
// Deprecated: Use ConstructScopedZonedPath
func GetLocalScope(path string) string {
return LocalScope + Separator + path
return string(LocalScope) + Separator + path
}
// GetPublicScope takes a path and attaches the local scope to it
// Deprecated: Use ConstructScopedZonedPath
func GetPublicScope(path string) string {
return string(PublicScope) + Separator + path
}
// GetPeerScope takes a path and attaches the peer scope to it
// Deprecated: Use ConstructScopedZonedPath
func GetPeerScope(path string) string {
return PeerScope + Separator + path
}
// GetScopePath take a full path and returns the scope and the scope-less path
func GetScopePath(fullPath string) (string, string) {
parts := strings.SplitN(fullPath, Separator, 1)
if len(parts) != 2 {
return "", ""
}
return parts[0], parts[1]
return string(PeerScope) + Separator + path
}

52
model/attr/zone.go Normal file
View File

@ -0,0 +1,52 @@
package attr
import (
"git.openprivacy.ca/openprivacy/log"
"strings"
)
// Zone forces attributes to belong to a given subsystem e.g profile or filesharing
// Note: Zone is different from Scope which deals with public visibility of a given attribute
type Zone string
// ZonedPath explicitly types paths that contain a zone for strongly typed APIs
type ZonedPath string
const (
// ProfileZone for attributes related to profile details like name and profile image
ProfileZone = Zone("profile")
// FilesharingZone for attributes related to file sharing
FilesharingZone = Zone("filesharing")
// UnknownZone is a catch all useful for error handling
UnknownZone = Zone("unknown")
)
// ConstructZonedPath takes a path and attaches a zone to it.
// Note that this returns a ZonedPath which isn't directly usable, it must be given to ConstructScopedZonedPath
// in order to be realized into an actual attribute path.
func (zone Zone) ConstructZonedPath(path string) ZonedPath {
return ZonedPath(string(zone) + Separator + path)
}
// ParseZone takes in an untyped string and returns an explicit Zone along with the rest of the untyped path
func ParseZone(path string) (Zone, string) {
parts := strings.SplitN(path, Separator, 2)
log.Debugf("parsed zone: %v %v", parts, path)
if len(parts) != 2 {
return UnknownZone, ""
}
switch Zone(parts[0]) {
case ProfileZone:
return ProfileZone, parts[1]
case FilesharingZone:
return FilesharingZone, parts[1]
default:
return UnknownZone, parts[1]
}
}

View File

@ -0,0 +1,13 @@
package constants
// Name refers to a Profile Name
const Name = "name"
// Tag describes the type of a profile e.g. default password / encrypted etc.
const Tag = "tag"
// ProfileTypeV1DefaultPassword is a tag describing a profile protected with the default password.
const ProfileTypeV1DefaultPassword = "v1-defaultPassword"
// ProfileTypeV1Password is a tag describing a profile encrypted derived from a user-provided password.
const ProfileTypeV1Password = "v1-userPassword"

View File

@ -5,6 +5,7 @@ import (
"crypto/rand"
"crypto/sha512"
"cwtch.im/cwtch/model/attr"
"cwtch.im/cwtch/model/constants"
"cwtch.im/cwtch/protocol/groups"
"encoding/base32"
"encoding/base64"
@ -32,18 +33,17 @@ const GroupInvitePrefix = "torv3"
// tied to a server under a given group key. Each group has a set of Messages.
type Group struct {
// GroupID is now derived from the GroupKey and the GroupServer
GroupID string
GroupKey [32]byte
GroupServer string
Timeline Timeline `json:"-"`
Accepted bool
IsCompromised bool
Attributes map[string]string
lock sync.Mutex
LocalID string
State string `json:"-"`
UnacknowledgedMessages []Message
Version int
GroupID string
GroupKey [32]byte
GroupServer string
Timeline Timeline `json:"-"`
Accepted bool
IsCompromised bool
Attributes map[string]string
lock sync.Mutex
LocalID string
State string `json:"-"`
Version int
}
// NewGroup initializes a new group associated with a given CwtchServer
@ -72,7 +72,7 @@ func NewGroup(server string) (*Group, error) {
group.Attributes = make(map[string]string)
// By default we set the "name" of the group to a random string, we can override this later, but to simplify the
// codes around invite, we assume that this is always set.
group.Attributes[attr.GetLocalScope("name")] = group.GroupID
group.Attributes[attr.GetLocalScope(constants.Name)] = group.GroupID
return group, nil
}
@ -99,7 +99,7 @@ func (g *Group) Invite() (string, error) {
gci := &groups.GroupInvite{
GroupID: g.GroupID,
GroupName: g.Attributes[attr.GetLocalScope("name")],
GroupName: g.Attributes[attr.GetLocalScope(constants.Name)],
SharedKey: g.GroupKey[:],
ServerHost: g.GroupServer,
}
@ -122,7 +122,7 @@ func (g *Group) AddSentMessage(message *groups.DecryptedGroupMessage, sig []byte
PreviousMessageSig: message.PreviousMessageSig,
ReceivedByServer: false,
}
g.UnacknowledgedMessages = append(g.UnacknowledgedMessages, timelineMessage)
g.Timeline.Insert(&timelineMessage)
return timelineMessage
}
@ -130,35 +130,30 @@ func (g *Group) AddSentMessage(message *groups.DecryptedGroupMessage, sig []byte
func (g *Group) ErrorSentMessage(sig []byte, error string) bool {
g.lock.Lock()
defer g.lock.Unlock()
var message *Message
// Delete the message from the unack'd buffer if it exists
for i, unAckedMessage := range g.UnacknowledgedMessages {
if compareSignatures(unAckedMessage.Signature, sig) {
message = &unAckedMessage
g.UnacknowledgedMessages = append(g.UnacknowledgedMessages[:i], g.UnacknowledgedMessages[i+1:]...)
message.Error = error
g.Timeline.Insert(message)
return true
}
}
return false
return g.Timeline.SetSendError(sig, error)
}
// AddMessage takes a DecryptedGroupMessage and adds it to the Groups Timeline
func (g *Group) AddMessage(message *groups.DecryptedGroupMessage, sig []byte) (*Message, bool) {
// GetMessage returns the message at index `index` if it exists. Otherwise returns false.
// This routine also returns the length of the timeline
// If go has an optional type this would return Option<Message>...
func (g *Group) GetMessage(index int) (bool, Message, int) {
g.lock.Lock()
defer g.lock.Unlock()
// Delete the message from the unack'd buffer if it exists
for i, unAckedMessage := range g.UnacknowledgedMessages {
if compareSignatures(unAckedMessage.Signature, sig) {
g.UnacknowledgedMessages = append(g.UnacknowledgedMessages[:i], g.UnacknowledgedMessages[i+1:]...)
break
}
length := len(g.Timeline.Messages)
if length > index {
return true, g.Timeline.Messages[index], length
}
return false, Message{}, length
}
// AddMessage takes a DecryptedGroupMessage and adds it to the Groups Timeline
func (g *Group) AddMessage(message *groups.DecryptedGroupMessage, sig []byte) (*Message, int) {
g.lock.Lock()
defer g.lock.Unlock()
timelineMessage := &Message{
Message: message.Text,
@ -171,16 +166,16 @@ func (g *Group) AddMessage(message *groups.DecryptedGroupMessage, sig []byte) (*
Error: "",
Acknowledged: true,
}
seen := g.Timeline.Insert(timelineMessage)
index := g.Timeline.Insert(timelineMessage)
return timelineMessage, seen
return timelineMessage, index
}
// GetTimeline provides a safe copy of the timeline
func (g *Group) GetTimeline() (timeline []Message) {
g.lock.Lock()
defer g.lock.Unlock()
return append(g.Timeline.GetMessages(), g.UnacknowledgedMessages...)
return g.Timeline.GetMessages()
}
//EncryptMessage takes a message and encrypts the message under the group key.

View File

@ -61,18 +61,17 @@ func TestGroupErr(t *testing.T) {
func TestGroupValidation(t *testing.T) {
group := &Group{
GroupID: "",
GroupKey: [32]byte{},
GroupServer: "",
Timeline: Timeline{},
Accepted: false,
IsCompromised: false,
Attributes: nil,
lock: sync.Mutex{},
LocalID: "",
State: "",
UnacknowledgedMessages: nil,
Version: 0,
GroupID: "",
GroupKey: [32]byte{},
GroupServer: "",
Timeline: Timeline{},
Accepted: false,
IsCompromised: false,
Attributes: nil,
lock: sync.Mutex{},
LocalID: "",
State: "",
Version: 0,
}
invite, _ := group.Invite()

View File

@ -17,7 +17,7 @@ type Timeline struct {
lock sync.Mutex
// a cache to allow quick checks for existing messages...
signatureCache map[string]bool
signatureCache map[string]int
// a cache to allowing looking up messages by content hash
// we need this for features like reply-to message, and other self
@ -90,12 +90,10 @@ func (t *Timeline) GetCopy() *Timeline {
// SetMessages sets the Messages of this timeline. Only to be used in loading/initialization
func (t *Timeline) SetMessages(messages []Message) {
t.lock.Lock()
defer t.lock.Unlock()
t.init()
t.Messages = messages
for idx, message := range t.Messages {
t.signatureCache[base64.StdEncoding.EncodeToString(message.Signature)] = true
t.hashCache[t.calculateHash(message)] = append(t.hashCache[t.calculateHash(message)], idx)
t.lock.Unlock()
for _, m := range messages {
t.Insert(&m)
}
}
@ -179,7 +177,7 @@ func (t *Timeline) Sort() {
}
// Insert a message into the timeline in a thread safe way.
func (t *Timeline) Insert(mi *Message) bool {
func (t *Timeline) Insert(mi *Message) int {
t.lock.Lock()
defer t.lock.Unlock()
@ -188,28 +186,44 @@ func (t *Timeline) Insert(mi *Message) bool {
// check that we haven't seen this message before (this has no impact on p2p messages, but is essential for
// group messages)
_, exists := t.signatureCache[base64.StdEncoding.EncodeToString(mi.Signature)]
idx, exists := t.signatureCache[base64.StdEncoding.EncodeToString(mi.Signature)]
if exists {
return true
t.Messages[idx].Acknowledged = true
return idx
}
// update the message store
t.Messages = append(t.Messages, *mi)
// add to signature cache for fast checking of group messages...
t.signatureCache[base64.StdEncoding.EncodeToString(mi.Signature)] = true
t.signatureCache[base64.StdEncoding.EncodeToString(mi.Signature)] = len(t.Messages) - 1
// content based addressing index
contentHash := t.calculateHash(*mi)
t.hashCache[contentHash] = append(t.hashCache[contentHash], len(t.Messages)-1)
return false
return len(t.Messages) - 1
}
func (t *Timeline) init() {
// only allow this setting once...
if t.signatureCache == nil {
t.signatureCache = make(map[string]bool)
t.signatureCache = make(map[string]int)
}
if t.hashCache == nil {
t.hashCache = make(map[string][]int)
}
}
// SetSendError marks a message has having some kind of application specific error.
// Note: The message here is indexed by signature.
func (t *Timeline) SetSendError(sig []byte, e string) bool {
t.lock.Lock()
defer t.lock.Unlock()
idx, exists := t.signatureCache[base64.StdEncoding.EncodeToString(sig)]
if !exists {
return false
}
t.Messages[idx].Error = e
return true
}

View File

@ -3,6 +3,7 @@ package model
import (
"crypto/rand"
"cwtch.im/cwtch/model/attr"
"cwtch.im/cwtch/model/constants"
"cwtch.im/cwtch/protocol/groups"
"encoding/base32"
"encoding/base64"
@ -403,7 +404,7 @@ func (p *Profile) ProcessInvite(invite string) (string, error) {
group.GroupServer = gci.ServerHost
group.Accepted = false
group.Attributes = make(map[string]string)
group.Attributes[attr.GetLocalScope("name")] = gci.GroupName
group.Attributes[attr.GetLocalScope(constants.Name)] = gci.GroupName
p.AddGroup(group)
return gci.GroupID, nil
}
@ -422,7 +423,7 @@ func (p *Profile) AddGroup(group *Group) {
// AttemptDecryption takes a ciphertext and signature and attempts to decrypt it under known groups.
// If successful, adds the message to the group's timeline
func (p *Profile) AttemptDecryption(ciphertext []byte, signature []byte) (bool, string, *Message, bool) {
func (p *Profile) AttemptDecryption(ciphertext []byte, signature []byte) (bool, string, *Message, int) {
for _, group := range p.Groups {
success, dgm := group.DecryptMessage(ciphertext)
if success {
@ -433,7 +434,7 @@ func (p *Profile) AttemptDecryption(ciphertext []byte, signature []byte) (bool,
// Someone send a message that isn't a valid Decrypted Group Message. Since we require this struct in orer
// to verify the message, we simply ignore it.
if err != nil {
return false, group.GroupID, nil, false
return false, group.GroupID, nil, -1
}
// This now requires knowledge of the Sender, the Onion and the Specific Decrypted Group Message (which should only
@ -459,15 +460,15 @@ func (p *Profile) AttemptDecryption(ciphertext []byte, signature []byte) (bool,
// Either way, someone who has the private key is being detectably bad so we are just going to throw this message away and mark the group as Compromised.
if !verified {
group.Compromised()
return false, group.GroupID, nil, false
return false, group.GroupID, nil, -1
}
message, seen := group.AddMessage(dgm, signature)
return true, group.GroupID, message, seen
message, index := group.AddMessage(dgm, signature)
return true, group.GroupID, message, index
}
}
// If we couldn't find a group to decrypt the message with we just return false. This is an expected case
return false, "", nil, false
return false, "", nil, -1
}
func getRandomness(arr *[]byte) {

View File

@ -1,6 +1,7 @@
package peer
import (
"cwtch.im/cwtch/model/constants"
"encoding/base32"
"encoding/base64"
"encoding/json"
@ -54,24 +55,81 @@ type cwtchPeer struct {
eventBus event.Manager
}
func (cp *cwtchPeer) SendScopedZonedGetValToContact(handle string, scope attr.Scope, zone attr.Zone, path string) {
ev := event.NewEventList(event.SendGetValMessageToPeer, event.RemotePeer, handle, event.Scope, string(scope), event.Path, string(zone.ConstructZonedPath(path)))
cp.eventBus.Publish(ev)
}
func (cp *cwtchPeer) GetScopedZonedAttribute(scope attr.Scope, zone attr.Zone, key string) (string, bool) {
cp.mutex.Lock()
defer cp.mutex.Unlock()
scopedZonedKey := scope.ConstructScopedZonedPath(zone.ConstructZonedPath(key))
log.Debugf("looking up attribute %v %v %v (%v)", scope, zone, key, scopedZonedKey)
if val, exists := cp.Profile.GetAttribute(scopedZonedKey.ToString()); exists {
return val, true
}
return "", false
}
func (cp *cwtchPeer) SetScopedZonedAttribute(scope attr.Scope, zone attr.Zone, key string, value string) {
cp.mutex.Lock()
scopedZonedKey := scope.ConstructScopedZonedPath(zone.ConstructZonedPath(key))
log.Debugf("storing attribute: %v = %v", scopedZonedKey, value)
cp.Profile.SetAttribute(scopedZonedKey.ToString(), value)
defer cp.mutex.Unlock()
cp.eventBus.Publish(event.NewEvent(event.SetAttribute, map[event.Field]string{
event.Key: scopedZonedKey.ToString(),
event.Data: value,
}))
}
// SendMessage is a higher level that merges sending messages to contacts and group handles
// If you try to send a message to a handle that doesn't exist, malformed or an incorrect type then
// this function will error
func (cp *cwtchPeer) SendMessage(handle string, message string) error {
cp.mutex.Lock()
defer cp.mutex.Unlock()
var ev event.Event
// Group Handles are always 32 bytes in length, but we forgo any further testing here
// and delegate the group existence check to SendMessageToGroupTracked
// and delegate the group existence check to EncryptMessageToGroup
if len(handle) == 32 {
_, err := cp.SendMessageToGroupTracked(handle, message)
return err
group := cp.Profile.GetGroup(handle)
if group == nil {
return errors.New("invalid group id")
}
// Group adds it's own sent message to timeline
ct, sig, err := cp.Profile.EncryptMessageToGroup(message, handle)
// Group does not exist or some other unrecoverable error...
if err != nil {
return err
}
ev = event.NewEvent(event.SendMessageToGroup, map[event.Field]string{event.GroupID: handle, event.GroupServer: group.GroupServer, event.Ciphertext: base64.StdEncoding.EncodeToString(ct), event.Signature: base64.StdEncoding.EncodeToString(sig)})
} else if tor.IsValidHostname(handle) {
// We assume we are sending to a Contact.
// (Servers are technically Contacts)
cp.SendMessageToPeer(handle, message)
contact, exists := cp.Profile.GetContact(handle)
ev = event.NewEvent(event.SendMessageToPeer, map[event.Field]string{event.RemotePeer: handle, event.Data: message})
// If the contact exists replace the event id wih the index of this message in the contacts timeline...
// Otherwise assume we don't log the message in the timeline...
if exists {
ev.EventID = strconv.Itoa(contact.Timeline.Len())
cp.Profile.AddSentMessageToContactTimeline(handle, message, time.Now(), ev.EventID)
}
// Regardless we publish the send message to peer event for the protocol engine to execute on...
// We assume this is always successful as it is always valid to attempt to
// Contact a valid hostname
return nil
} else {
return errors.New("malformed handle type")
}
return errors.New("malformed handle type")
cp.eventBus.Publish(ev)
return nil
}
func (cp *cwtchPeer) UpdateMessageFlags(handle string, mIdx int, flags uint64) {
@ -163,26 +221,17 @@ type ModifyServers interface {
type SendMessages interface {
SendMessage(handle string, message string) error
// Deprecated: is unsafe
SendGetValToPeer(string, string, string)
// Deprecated
SendMessageToPeer(string, string) string
SendScopedZonedGetValToContact(handle string, scope attr.Scope, zone attr.Zone, key string)
// TODO This should probably not be exposed
StoreMessage(onion string, messageTxt string, sent time.Time)
// TODO Extract once groups are stable
// TODO
// Deprecated use overlays instead
InviteOnionToGroup(string, string) error
}
// SendMessagesToGroup enables a caller to sender messages to a group
type SendMessagesToGroup interface {
// Deprecated
SendMessageToGroupTracked(string, string) (string, error)
}
// ModifyMessages enables a caller to modify the messages in a timline
// ModifyMessages enables a caller to modify the messages in a timeline
type ModifyMessages interface {
UpdateMessageFlags(string, int, uint64)
}
@ -200,10 +249,19 @@ type CwtchPeer interface {
StartServerConnections()
Shutdown()
// Relating to local attributes
// GetOnion is deprecated. If you find yourself needing to rely on this method it is time
// to consider replacing this with a GetAddress(es) function that can fully expand cwtch beyond the boundaries
// of tor v3 onion services.
// Deprecated
GetOnion() string
SetAttribute(string, string)
GetAttribute(string) (string, bool)
// SetScopedZonedAttribute allows the setting of an attribute by scope and zone
// scope.zone.key = value
SetScopedZonedAttribute(scope attr.Scope, zone attr.Zone, key string, value string)
// GetScopedZonedAttribute allows the retrieval of an attribute by scope and zone
// scope.zone.key = value
GetScopedZonedAttribute(scope attr.Scope, zone attr.Zone, key string) (string, bool)
ReadContacts
ModifyContacts
@ -219,7 +277,6 @@ type CwtchPeer interface {
SendMessages
ModifyMessages
SendMessagesToGroup
ShareFile(fileKey string, serializedManifest string)
}
@ -243,6 +300,58 @@ func FromProfile(profile *model.Profile) CwtchPeer {
// Init instantiates a cwtchPeer
func (cp *cwtchPeer) Init(eventBus event.Manager) {
cp.InitForEvents(eventBus, DefaultEventsToHandle)
// Upgrade the Cwtch Peer if necessary
// It would be nice to do these checks in the storage engine itself, but it is easier to do them here
// rather than duplicating the logic to construct/reconstruct attributes in storage engine...
// TODO: Remove these checks after Cwtch ~1.5 storage engine is implemented
if _, exists := cp.GetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name); !exists {
// If public.profile.name does not exist, and we have an existing public.name then:
// set public.profile.name from public.name
// set local.profile.name from public.name
if name, exists := cp.Profile.GetAttribute(attr.GetPublicScope(constants.Name)); exists {
cp.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, name)
cp.SetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.Name, name)
} else {
// Otherwise check if local.name exists and set it from that
// If not, then check the very old unzoned, unscoped name.
// If not, then set directly from Profile.Name...
if name, exists := cp.Profile.GetAttribute(attr.GetLocalScope(constants.Name)); exists {
cp.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, name)
cp.SetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.Name, name)
} else if name, exists := cp.Profile.GetAttribute(constants.Name); exists {
cp.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, name)
cp.SetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.Name, name)
} else {
// Profile.Name is very deprecated at this point...
cp.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, cp.Profile.Name)
cp.SetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.Name, cp.Profile.Name)
}
}
}
// At this point we can safely assume that public.profile.name exists
localName, _ := cp.GetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.Name)
publicName, _ := cp.GetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name)
if localName != publicName {
cp.SetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.Name, publicName)
}
// At this point we can safely assume that public.profile.name exists AND is consistent with
// local.profile.name - regardless of whatever Cwtch version we have upgraded from. This will
// be important after Cwtch 1.5 when we purge all previous references to local.profile.name and
// profile-> name - and remove all name processing code from libcwtch-go.
// If local.profile.tag does not exist then set it from deprecated GetAttribute
if _, exists := cp.GetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.Tag); !exists {
if tag, exists := cp.Profile.GetAttribute(constants.Tag); exists {
cp.SetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.Tag, tag)
} else {
// Assume a default password, which will allow the older profile to have it's password reset by the UI
cp.SetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.Tag, constants.ProfileTypeV1DefaultPassword)
}
}
}
func (cp *cwtchPeer) InitForEvents(eventBus event.Manager, toBeHandled []event.Type) {
@ -501,7 +610,7 @@ func (cp *cwtchPeer) InviteOnionToGroup(onion string, groupid string) error {
invite, err := group.Invite()
cp.mutex.Unlock()
if err == nil {
cp.SendMessageToPeer(onion, invite)
err = cp.SendMessage(onion, invite)
}
return err
}
@ -537,45 +646,9 @@ func (cp *cwtchPeer) ResyncServer(onion string) error {
return errors.New("no keys found for server connection")
}
// SendMessageToGroupTracked attempts to sent the given message to the given group id.
// It returns the signature of the message which can be used to identify it in any UX layer.
func (cp *cwtchPeer) SendMessageToGroupTracked(groupid string, message string) (string, error) {
cp.mutex.Lock()
defer cp.mutex.Unlock()
group := cp.Profile.GetGroup(groupid)
if group == nil {
return "", errors.New("invalid group id")
}
ct, sig, err := cp.Profile.EncryptMessageToGroup(message, groupid)
if err == nil {
cp.eventBus.Publish(event.NewEvent(event.SendMessageToGroup, map[event.Field]string{event.GroupID: groupid, event.GroupServer: group.GroupServer, event.Ciphertext: base64.StdEncoding.EncodeToString(ct), event.Signature: base64.StdEncoding.EncodeToString(sig)}))
}
return base64.StdEncoding.EncodeToString(sig), err
}
func (cp *cwtchPeer) SendMessageToPeer(onion string, message string) string {
cp.mutex.Lock()
defer cp.mutex.Unlock()
event := event.NewEvent(event.SendMessageToPeer, map[event.Field]string{event.RemotePeer: onion, event.Data: message})
contact, exists := cp.Profile.GetContact(onion)
// If the contact exists replace the event id wih the index of this message in the contacts timeline...
// Otherwise assume we don't log the message in the timeline...
if exists {
event.EventID = strconv.Itoa(contact.Timeline.Len())
cp.Profile.AddSentMessageToContactTimeline(onion, message, time.Now(), event.EventID)
}
// Regardless we publish the send message to peer event for the protocol engine to execute on...
cp.eventBus.Publish(event)
return event.EventID
}
func (cp *cwtchPeer) SendGetValToPeer(onion string, scope string, path string) {
event := event.NewEventList(event.SendGetValMessageToPeer, event.RemotePeer, onion, event.Scope, scope, event.Path, path)
cp.eventBus.Publish(event)
ev := event.NewEventList(event.SendGetValMessageToPeer, event.RemotePeer, onion, event.Scope, scope, event.Path, path)
cp.eventBus.Publish(ev)
}
// BlockPeer blocks an existing peer relationship.
@ -596,9 +669,9 @@ func (cp *cwtchPeer) AcceptInvite(groupID string) error {
return err
}
cp.eventBus.Publish(event.NewEvent(event.AcceptGroupInvite, map[event.Field]string{event.GroupID: groupID}))
cp.JoinServer(cp.Profile.Groups[groupID].GroupServer)
err = cp.JoinServer(cp.Profile.Groups[groupID].GroupServer)
return nil
return err
}
// RejectInvite rejects a given group invite.
@ -609,7 +682,7 @@ func (cp *cwtchPeer) RejectInvite(groupID string) {
cp.eventBus.Publish(event.NewEvent(event.RejectGroupInvite, map[event.Field]string{event.GroupID: groupID}))
}
// Listen makes the peer open a listening port to accept incoming connections (and be detactably online)
// Listen makes the peer open a listening port to accept incoming connections (and be detectably online)
func (cp *cwtchPeer) Listen() {
cp.mutex.Lock()
defer cp.mutex.Unlock()
@ -635,37 +708,15 @@ func (cp *cwtchPeer) StartPeersConnections() {
func (cp *cwtchPeer) StartServerConnections() {
for _, contact := range cp.GetContacts() {
if cp.GetContact(contact).IsServer() {
cp.JoinServer(contact)
err := cp.JoinServer(contact)
if err != nil {
// Almost certainly a programming error so print it..
log.Errorf("error joining server %v", err)
}
}
}
}
// SetAttribute sets an attribute for this profile and emits an event
func (cp *cwtchPeer) SetAttribute(key string, val string) {
cp.mutex.Lock()
cp.Profile.SetAttribute(key, val)
defer cp.mutex.Unlock()
cp.eventBus.Publish(event.NewEvent(event.SetAttribute, map[event.Field]string{
event.Key: key,
event.Data: val,
}))
}
// GetAttribute gets an attribute for the profile
func (cp *cwtchPeer) GetAttribute(key string) (string, bool) {
cp.mutex.Lock()
defer cp.mutex.Unlock()
if val, exists := cp.Profile.GetAttribute(key); exists {
return val, true
}
if key == attr.GetLocalScope("name") {
return cp.Profile.Name, true
}
return "", false
}
// SetContactAttribute sets an attribute for the indicated contact and emits an event
func (cp *cwtchPeer) SetContactAttribute(onion string, key string, val string) {
cp.mutex.Lock()
@ -726,7 +777,7 @@ func (cp *cwtchPeer) Shutdown() {
cp.queue.Shutdown()
}
func (cp *cwtchPeer) StoreMessage(onion string, messageTxt string, sent time.Time) {
func (cp *cwtchPeer) storeMessage(onion string, messageTxt string, sent time.Time) {
if cp.GetContact(onion) == nil {
cp.AddContact(onion, onion, model.AuthUnknown)
}
@ -765,10 +816,10 @@ func (cp *cwtchPeer) eventHandler() {
cp.SetContactAttribute(ev.Data[event.GroupServer], lastKnownSignature, ev.Data[event.Signature])
cp.mutex.Lock()
ok, groupID, message, seen := cp.Profile.AttemptDecryption(ciphertext, signature)
ok, groupID, message, index := cp.Profile.AttemptDecryption(ciphertext, signature)
cp.mutex.Unlock()
if ok && !seen {
cp.eventBus.Publish(event.NewEvent(event.NewMessageFromGroup, map[event.Field]string{event.TimestampReceived: message.Received.Format(time.RFC3339Nano), event.TimestampSent: message.Timestamp.Format(time.RFC3339Nano), event.Data: message.Message, event.GroupID: groupID, event.Signature: base64.StdEncoding.EncodeToString(message.Signature), event.PreviousSignature: base64.StdEncoding.EncodeToString(message.PreviousMessageSig), event.RemotePeer: message.PeerID}))
if ok && index > -1 {
cp.eventBus.Publish(event.NewEvent(event.NewMessageFromGroup, map[event.Field]string{event.TimestampReceived: message.Received.Format(time.RFC3339Nano), event.TimestampSent: message.Timestamp.Format(time.RFC3339Nano), event.Data: message.Message, event.GroupID: groupID, event.Signature: base64.StdEncoding.EncodeToString(message.Signature), event.PreviousSignature: base64.StdEncoding.EncodeToString(message.PreviousMessageSig), event.RemotePeer: message.PeerID, event.Index: strconv.Itoa(index)}))
}
// The group has been compromised
@ -779,7 +830,7 @@ func (cp *cwtchPeer) eventHandler() {
}
case event.NewMessageFromPeer: //event.TimestampReceived, event.RemotePeer, event.Data
ts, _ := time.Parse(time.RFC3339Nano, ev.Data[event.TimestampReceived])
cp.StoreMessage(ev.Data[event.RemotePeer], ev.Data[event.Data], ts)
cp.storeMessage(ev.Data[event.RemotePeer], ev.Data[event.Data], ts)
case event.PeerAcknowledgement:
cp.mutex.Lock()
@ -805,18 +856,29 @@ func (cp *cwtchPeer) eventHandler() {
case event.RetryServerRequest:
// Automated Join Server Request triggered by a plugin.
log.Debugf("profile received an automated retry event for %v", ev.Data[event.GroupServer])
cp.JoinServer(ev.Data[event.GroupServer])
err := cp.JoinServer(ev.Data[event.GroupServer])
if err != nil {
log.Errorf("error joining server... %v", err)
}
case event.NewGetValMessageFromPeer:
onion := ev.Data[event.RemotePeer]
scope := ev.Data[event.Scope]
path := ev.Data[event.Path]
log.Debugf("NewGetValMessageFromPeer for %v%v from %v\n", scope, path, onion)
log.Debugf("NewGetValMessageFromPeer for %v.%v from %v\n", scope, path, onion)
remotePeer := cp.GetContact(onion)
if remotePeer != nil && remotePeer.Authorization == model.AuthApproved {
if scope == attr.PublicScope {
val, exists := cp.GetAttribute(attr.GetPublicScope(path))
scope := attr.IntoScope(scope)
if scope.IsPublic() || scope.IsConversation() {
zone, zpath := attr.ParseZone(path)
val, exists := cp.GetScopedZonedAttribute(scope, zone, zpath)
// NOTE: Temporary Override because UI currently wipes names if it can't find them...
if !exists && zone == attr.UnknownZone && path == constants.Name {
val, exists = cp.GetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name)
}
resp := event.NewEvent(event.SendRetValMessageToPeer, map[event.Field]string{event.RemotePeer: onion, event.Exists: strconv.FormatBool(exists)})
resp.EventID = ev.EventID
if exists {
@ -830,7 +892,7 @@ func (cp *cwtchPeer) eventHandler() {
}
}
/***** Non default but requestable handlable events *****/
/***** Non default but requestable handleable events *****/
case event.ManifestReceived:
log.Debugf("Manifest Received Event!: %v", ev)
@ -838,9 +900,9 @@ func (cp *cwtchPeer) eventHandler() {
fileKey := ev.Data[event.FileKey]
serializedManifest := ev.Data[event.SerializedManifest]
manifestFilePath, exists := cp.GetAttribute(attr.GetLocalScope(fmt.Sprintf("%v.manifest", fileKey)))
manifestFilePath, exists := cp.GetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fmt.Sprintf("%v.manifest", fileKey))
if exists {
downloadFilePath, exists := cp.GetAttribute(attr.GetLocalScope(fileKey))
downloadFilePath, exists := cp.GetScopedZonedAttribute(attr.LocalScope, attr.FilesharingZone, fileKey)
if exists {
log.Debugf("downloading manifest to %v, file to %v", manifestFilePath, downloadFilePath)
var manifest files.Manifest
@ -886,22 +948,27 @@ func (cp *cwtchPeer) eventHandler() {
exists, _ := strconv.ParseBool(ev.Data[event.Exists])
log.Debugf("NewRetValMessageFromPeer %v %v%v %v %v\n", onion, scope, path, exists, val)
if exists {
if scope == attr.PublicScope {
if strings.HasSuffix(path, ".manifest.size") {
fileKey := strings.Replace(path, ".manifest.size", "", 1)
size, err := strconv.Atoi(val)
// if size is valid and below the maximum size for a manifest
// this is to prevent malicious sharers from using large amounts of memory when distributing
// a manifest as we reconstruct this in-memory
if err == nil && size < files.MaxManifestSize {
cp.eventBus.Publish(event.NewEvent(event.ManifestSizeReceived, map[event.Field]string{event.FileKey: fileKey, event.ManifestSize: val, event.Handle: onion}))
} else {
cp.eventBus.Publish(event.NewEvent(event.ManifestError, map[event.Field]string{event.FileKey: fileKey, event.Handle: onion}))
}
// Handle File Sharing Metadata
// TODO This probably should be broken out to it's own code..
zone, path := attr.ParseZone(path)
if attr.Scope(scope).IsConversation() && zone == attr.FilesharingZone && strings.HasSuffix(path, ".manifest.size") {
fileKey := strings.Replace(path, ".manifest.size", "", 1)
size, err := strconv.Atoi(val)
// if size is valid and below the maximum size for a manifest
// this is to prevent malicious sharers from using large amounts of memory when distributing
// a manifest as we reconstruct this in-memory
if err == nil && size < files.MaxManifestSize {
cp.eventBus.Publish(event.NewEvent(event.ManifestSizeReceived, map[event.Field]string{event.FileKey: fileKey, event.ManifestSize: val, event.Handle: onion}))
} else {
cp.SetContactAttribute(onion, attr.GetPeerScope(path), val)
cp.eventBus.Publish(event.NewEvent(event.ManifestError, map[event.Field]string{event.FileKey: fileKey, event.Handle: onion}))
}
}
// Allow public profile parameters to be added as peer specific attributes...
if attr.Scope(scope).IsPublic() && zone == attr.ProfileZone {
cp.SetContactAttribute(onion, attr.GetPeerScope(path), val)
}
}
case event.PeerStateChange:
cp.mutex.Lock()

View File

@ -171,7 +171,7 @@ func (ta *TokenBoardClient) Post(ct []byte, sig []byte) (bool, int) {
// MakePayment uses the PoW based token protocol to obtain more tokens
func (ta *TokenBoardClient) MakePayment() error {
log.Debugf("Making a Payment %v", ta)
log.Debugf("Making a Payment")
id, sk := primitives.InitializeEphemeralIdentity()
client := new(tor.BaseOnionService)
client.Init(ta.acn, sk, &id)
@ -201,7 +201,7 @@ func (ta *TokenBoardClient) MakePayment() error {
conn.Close()
return nil
}
log.Errorf("invalid cast of powapp. this should not happen %v %v", powtapp, reflect.TypeOf(conn.App()))
log.Errorf("invalid cast of powapp. this should never happen %v %v", powtapp, reflect.TypeOf(conn.App()))
return errors.New("invalid cast of powapp. this should never happen")
}
log.Debugf("could not connect to payment server %v..trying again")

View File

@ -4,7 +4,7 @@ import (
"encoding/hex"
"encoding/json"
"fmt"
"path"
path "path/filepath"
"strconv"
"strings"
"sync"

View File

@ -50,7 +50,7 @@ func TestProfileStoreUpgradeV0toV1(t *testing.T) {
fmt.Println("Sending 200 messages...")
for i := 0; i < 200; i++ {
ps1.AddGroupMessage(groupid, time.Now().Format(time.RFC3339Nano), time.Now().Format(time.RFC3339Nano), profile.Onion, testMessage)
ps1.AddGroupMessage(groupid, time.Now().Format(time.RFC3339Nano), time.Now().Format(time.RFC3339Nano), profile.Onion, testMessage, []byte{byte(i)})
}
fmt.Println("Shutdown v0 profile store...")

View File

@ -63,10 +63,10 @@ func (ps *ProfileStoreV0) AddGroup(invite string) {
}
// AddGroupMessage for testing, adds a group message
func (ps *ProfileStoreV0) AddGroupMessage(groupid string, timeSent, timeRecvied string, remotePeer, data string) {
func (ps *ProfileStoreV0) AddGroupMessage(groupid string, timeSent, timeRecvied string, remotePeer, data string, signature []byte) {
received, _ := time.Parse(time.RFC3339Nano, timeRecvied)
sent, _ := time.Parse(time.RFC3339Nano, timeSent)
message := model.Message{Received: received, Timestamp: sent, Message: data, PeerID: remotePeer, Signature: []byte("signature"), PreviousMessageSig: []byte("PreviousSignature")}
message := model.Message{Received: received, Timestamp: sent, Message: data, PeerID: remotePeer, Signature: signature, PreviousMessageSig: []byte("PreviousSignature")}
ss, exists := ps.streamStores[groupid]
if exists {
ss.Write(message)

View File

@ -42,7 +42,7 @@ func TestProfileStoreWriteRead(t *testing.T) {
ps1.AddGroup(invite)
ps1.AddGroupMessage(groupid, time.Now().Format(time.RFC3339Nano), time.Now().Format(time.RFC3339Nano), ps1.getProfileCopy(true).Onion, testMessage)
ps1.AddGroupMessage(groupid, time.Now().Format(time.RFC3339Nano), time.Now().Format(time.RFC3339Nano), ps1.getProfileCopy(true).Onion, testMessage, []byte{byte(0x01)})
ps1.Shutdown()

View File

@ -85,7 +85,6 @@ func (ps *ProfileStoreV1) initProfileWriterStore() {
ps.eventManager.Subscribe(event.SetPeerAuthorization, ps.queue)
ps.eventManager.Subscribe(event.PeerCreated, ps.queue)
ps.eventManager.Subscribe(event.GroupCreated, ps.queue)
ps.eventManager.Subscribe(event.SetProfileName, ps.queue)
ps.eventManager.Subscribe(event.SetAttribute, ps.queue)
ps.eventManager.Subscribe(event.SetPeerAttribute, ps.queue)
ps.eventManager.Subscribe(event.SetGroupAttribute, ps.queue)
@ -361,10 +360,6 @@ func (ps *ProfileStoreV1) eventHandler() {
ps.profile.AddGroup(group)
ps.streamStores[group.GroupID] = NewStreamStore(ps.directory, group.LocalID, ps.key)
ps.save()
case event.SetProfileName:
ps.profile.Name = ev.Data[event.ProfileName]
ps.profile.SetAttribute("name", ev.Data[event.ProfileName])
ps.save()
case event.SetAttribute:
ps.profile.SetAttribute(ev.Data[event.Key], ev.Data[event.Data])
ps.save()

View File

@ -5,6 +5,7 @@ package v1
import (
"cwtch.im/cwtch/event"
"cwtch.im/cwtch/model"
"encoding/base64"
"fmt"
"log"
"os"
@ -107,6 +108,7 @@ func TestProfileStoreChangePassword(t *testing.T) {
event.TimestampReceived: time.Now().Format(time.RFC3339Nano),
event.RemotePeer: profile.Onion,
event.Data: testMessage,
event.Signature: base64.StdEncoding.EncodeToString([]byte{byte(i)}),
}))
}
@ -129,6 +131,7 @@ func TestProfileStoreChangePassword(t *testing.T) {
event.TimestampReceived: time.Now().Format(time.RFC3339Nano),
event.RemotePeer: profile.Onion,
event.Data: testMessage,
event.Signature: base64.StdEncoding.EncodeToString([]byte{0x01, byte(i)}),
}))
}
time.Sleep(3 * time.Second)

View File

@ -8,6 +8,7 @@ import (
"cwtch.im/cwtch/event/bridge"
"cwtch.im/cwtch/model"
"cwtch.im/cwtch/model/attr"
"cwtch.im/cwtch/model/constants"
"cwtch.im/cwtch/peer"
"cwtch.im/cwtch/protocol/connections"
"encoding/base64"
@ -41,7 +42,7 @@ func printAndCountVerifedTimeline(t *testing.T, timeline []model.Message) int {
}
func waitForPeerGroupConnection(t *testing.T, peer peer.CwtchPeer, groupID string) {
peerName, _ := peer.GetAttribute(attr.GetLocalScope("name"))
peerName, _ := peer.GetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.Name)
for {
fmt.Printf("%v checking group connection...\n", peerName)
state, ok := peer.GetGroupState(groupID)
@ -77,8 +78,8 @@ func waitForPeerPeerConnection(t *testing.T, peera peer.CwtchPeer, peerb peer.Cw
time.Sleep(time.Second * 5)
continue
} else {
peerAName, _ := peera.GetAttribute(attr.GetLocalScope("name"))
peerBName, _ := peerb.GetAttribute(attr.GetLocalScope("name"))
peerAName, _ := peera.GetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.Name)
peerBName, _ := peerb.GetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.Name)
fmt.Printf("%v CONNECTED and AUTHED to %v\n", peerAName, peerBName)
break
}
@ -154,17 +155,17 @@ func TestCwtchPeerIntegration(t *testing.T) {
alice := utils.WaitGetPeer(app, "alice")
fmt.Println("Alice created:", alice.GetOnion())
alice.SetAttribute(attr.GetPublicScope("name"), "Alice")
alice.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Alice")
alice.AutoHandleEvents([]event.Type{event.PeerStateChange, event.ServerStateChange, event.NewGroupInvite, event.NewRetValMessageFromPeer})
bob := utils.WaitGetPeer(app, "bob")
fmt.Println("Bob created:", bob.GetOnion())
bob.SetAttribute(attr.GetPublicScope("name"), "Bob")
bob.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Bob")
bob.AutoHandleEvents([]event.Type{event.PeerStateChange, event.ServerStateChange, event.NewGroupInvite, event.NewRetValMessageFromPeer})
carol := utils.WaitGetPeer(appClient, "carol")
fmt.Println("Carol created:", carol.GetOnion())
carol.SetAttribute(attr.GetPublicScope("name"), "Carol")
carol.SetScopedZonedAttribute(attr.PublicScope, attr.ProfileZone, constants.Name, "Carol")
carol.AutoHandleEvents([]event.Type{event.PeerStateChange, event.ServerStateChange, event.NewGroupInvite, event.NewRetValMessageFromPeer})
app.LaunchPeers()
@ -217,32 +218,34 @@ func TestCwtchPeerIntegration(t *testing.T) {
fmt.Println("Alice and Bob getVal public.name...")
alice.SendGetValToPeer(bob.GetOnion(), attr.PublicScope, "name")
bob.SendGetValToPeer(alice.GetOnion(), attr.PublicScope, "name")
alice.SendScopedZonedGetValToContact(bob.GetOnion(), attr.PublicScope, attr.ProfileZone, constants.Name)
bob.SendScopedZonedGetValToContact(alice.GetOnion(), attr.PublicScope, attr.ProfileZone, constants.Name)
alice.SendGetValToPeer(carol.GetOnion(), attr.PublicScope, "name")
carol.SendGetValToPeer(alice.GetOnion(), attr.PublicScope, "name")
alice.SendScopedZonedGetValToContact(carol.GetOnion(), attr.PublicScope, attr.ProfileZone, constants.Name)
carol.SendScopedZonedGetValToContact(alice.GetOnion(), attr.PublicScope, attr.ProfileZone, constants.Name)
time.Sleep(10 * time.Second)
// This used to be 10, but increasing it to 30 because this is now causing frequent issues
// Probably related to latency/throughput problems in the underlying tor network.
time.Sleep(30 * time.Second)
aliceName, exists := bob.GetContactAttribute(alice.GetOnion(), attr.GetPeerScope("name"))
aliceName, exists := bob.GetContactAttribute(alice.GetOnion(), attr.GetPeerScope(constants.Name))
if !exists || aliceName != "Alice" {
t.Fatalf("Bob: alice GetKeyVal error on alice peer.name %v\n", exists)
}
fmt.Printf("Bob has alice's name as '%v'\n", aliceName)
bobName, exists := alice.GetContactAttribute(bob.GetOnion(), attr.GetPeerScope("name"))
bobName, exists := alice.GetContactAttribute(bob.GetOnion(), attr.GetPeerScope(constants.Name))
if !exists || bobName != "Bob" {
t.Fatalf("Alice: bob GetKeyVal error on bob peer.name\n")
}
fmt.Printf("Alice has bob's name as '%v'\n", bobName)
aliceName, exists = carol.GetContactAttribute(alice.GetOnion(), attr.GetPeerScope("name"))
aliceName, exists = carol.GetContactAttribute(alice.GetOnion(), attr.GetPeerScope(constants.Name))
if !exists || aliceName != "Alice" {
t.Fatalf("carol GetKeyVal error for alice peer.name %v\n", exists)
}
carolName, exists := alice.GetContactAttribute(carol.GetOnion(), attr.GetPeerScope("name"))
carolName, exists := alice.GetContactAttribute(carol.GetOnion(), attr.GetPeerScope(constants.Name))
if !exists || carolName != "Carol" {
t.Fatalf("alice GetKeyVal error, carol peer.name\n")
}
@ -280,25 +283,25 @@ func TestCwtchPeerIntegration(t *testing.T) {
fmt.Println("Starting conversation in group...")
// Conversation
fmt.Printf("%v> %v\n", aliceName, aliceLines[0])
_, err = alice.SendMessageToGroupTracked(groupID, aliceLines[0])
err = alice.SendMessage(groupID, aliceLines[0])
if err != nil {
t.Fatalf("Alice failed to send a message to the group: %v", err)
}
time.Sleep(time.Second * 10)
fmt.Printf("%v> %v\n", bobName, bobLines[0])
_, err = bob.SendMessageToGroupTracked(groupID, bobLines[0])
err = bob.SendMessage(groupID, bobLines[0])
if err != nil {
t.Fatalf("Bob failed to send a message to the group: %v", err)
}
time.Sleep(time.Second * 10)
fmt.Printf("%v> %v\n", aliceName, aliceLines[1])
alice.SendMessageToGroupTracked(groupID, aliceLines[1])
alice.SendMessage(groupID, aliceLines[1])
time.Sleep(time.Second * 10)
fmt.Printf("%v> %v\n", bobName, bobLines[1])
bob.SendMessageToGroupTracked(groupID, bobLines[1])
bob.SendMessage(groupID, bobLines[1])
time.Sleep(time.Second * 10)
fmt.Println("Alice inviting Carol to group...")
@ -332,12 +335,12 @@ func TestCwtchPeerIntegration(t *testing.T) {
numGoRotinesPostCarolConnect := runtime.NumGoroutine()
fmt.Printf("%v> %v", bobName, bobLines[2])
bob.SendMessageToGroupTracked(groupID, bobLines[2])
bob.SendMessage(groupID, bobLines[2])
// Bob should have enough tokens so we don't need to account for
// token acquisition here...
fmt.Printf("%v> %v", carolName, carolLines[0])
carol.SendMessageToGroupTracked(groupID, carolLines[0])
carol.SendMessage(groupID, carolLines[0])
time.Sleep(time.Second * 30) // we need to account for spam-based token acquisition, but everything should
// be warmed-up and delays should be pretty small.
@ -406,7 +409,7 @@ func TestCwtchPeerIntegration(t *testing.T) {
carolGroupTimeline := carolsGroup.GetTimeline()
if carolGroupTimeline[0].Message != aliceLines[0] || carolGroupTimeline[1].Message != bobLines[0] ||
carolGroupTimeline[2].Message != aliceLines[1] || carolGroupTimeline[3].Message != bobLines[1] ||
carolGroupTimeline[4].Message != bobLines[2] || carolGroupTimeline[5].Message != carolLines[0] {
carolGroupTimeline[4].Message != carolLines[0] || carolGroupTimeline[5].Message != bobLines[2] {
t.Errorf("Some of Carol's timeline messages did not have the expected content!")
}
}

View File

@ -8,6 +8,7 @@ import (
"cwtch.im/cwtch/functionality/filesharing"
"cwtch.im/cwtch/model"
"cwtch.im/cwtch/model/attr"
"cwtch.im/cwtch/model/constants"
"cwtch.im/cwtch/peer"
"cwtch.im/cwtch/protocol/connections"
"cwtch.im/cwtch/protocol/files"
@ -40,8 +41,8 @@ func waitForPeerPeerConnection(t *testing.T, peera peer.CwtchPeer, peerb peer.Cw
time.Sleep(time.Second * 5)
continue
} else {
peerAName, _ := peera.GetAttribute(attr.GetLocalScope("name"))
peerBName, _ := peerb.GetAttribute(attr.GetLocalScope("name"))
peerAName, _ := peera.GetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.Name)
peerBName, _ := peerb.GetScopedZonedAttribute(attr.LocalScope, attr.ProfileZone, constants.Name)
fmt.Printf("%v CONNECTED and AUTHED to %v\n", peerAName, peerBName)
break
}