cwtch/peer/cwtch_peer.go

406 lines
13 KiB
Go

package peer
import (
"crypto/rsa"
"cwtch.im/cwtch/model"
"cwtch.im/cwtch/peer/connections"
"cwtch.im/cwtch/peer/peer"
"cwtch.im/cwtch/protocol"
"encoding/base64"
"errors"
"fmt"
"git.openprivacy.ca/openprivacy/libricochet-go/application"
"git.openprivacy.ca/openprivacy/libricochet-go/channels"
"git.openprivacy.ca/openprivacy/libricochet-go/connection"
"git.openprivacy.ca/openprivacy/libricochet-go/connectivity"
"git.openprivacy.ca/openprivacy/libricochet-go/identity"
"git.openprivacy.ca/openprivacy/libricochet-go/log"
"git.openprivacy.ca/openprivacy/libricochet-go/utils"
"github.com/golang/protobuf/proto"
"golang.org/x/crypto/ed25519"
"strings"
"sync"
"time"
)
// cwtchPeer manages incoming and outgoing connections and all processing for a Cwtch cwtchPeer
type cwtchPeer struct {
connection.AutoConnectionHandler
Profile *model.Profile
app *application.RicochetApplication
acn connectivity.ACN
mutex sync.Mutex
connectionsManager *connections.Manager
dataHandler func(string, []byte) []byte
shutdown bool
aif application.ApplicationInstanceFactory
started bool
}
// CwtchPeer provides us with a way of testing systems built on top of cwtch without having to
// directly implement a cwtchPeer.
type CwtchPeer interface {
Init(connectivity.ACN)
PeerWithOnion(string) *connections.PeerPeerConnection
InviteOnionToGroup(string, string) error
TrustPeer(string) error
BlockPeer(string) error
AcceptInvite(string) error
RejectInvite(string)
JoinServer(string)
SendMessageToGroup(string, string) error
GetProfile() *model.Profile
GetPeers() map[string]connections.ConnectionState
GetServers() map[string]connections.ConnectionState
StartGroup(string) (string, []byte, error)
ImportGroup(string) (string, error)
ExportGroup(string) (string, error)
GetGroup(string) *model.Group
GetGroups() []string
GetContacts() []string
GetContact(string) *model.PublicProfile
SetApplicationInstanceFactory(factory application.ApplicationInstanceFactory)
SetPeerDataHandler(func(string, []byte) []byte)
IsStarted() bool
Listen()
Shutdown()
}
// NewCwtchPeer creates and returns a new cwtchPeer with the given name.
func NewCwtchPeer(name string) CwtchPeer {
cp := new(cwtchPeer)
cp.Profile = model.GenerateNewProfile(name)
cp.shutdown = false
return cp
}
// FromProfile generates a new peer from a profile.
func FromProfile(profile *model.Profile) CwtchPeer {
cp := new(cwtchPeer)
cp.Profile = profile
return cp
}
// Init instantiates a cwtchPeer
func (cp *cwtchPeer) Init(acn connectivity.ACN) {
cp.acn = acn
cp.connectionsManager = connections.NewConnectionsManager(cp.acn)
go cp.connectionsManager.AttemptReconnections()
}
// ImportGroup intializes a group from an imported source rather than a peer invite
func (cp *cwtchPeer) ImportGroup(exportedInvite string) (groupID string, err error) {
if strings.HasPrefix(exportedInvite, "torv3") {
data, err := base64.StdEncoding.DecodeString(exportedInvite[5+44:])
if err == nil {
cpp := &protocol.CwtchPeerPacket{}
err := proto.Unmarshal(data, cpp)
if err == nil {
pk, err := base64.StdEncoding.DecodeString(exportedInvite[5 : 5+44])
if err == nil {
edpk := ed25519.PublicKey(pk)
onion := utils.GetTorV3Hostname(edpk)
cp.Profile.AddContact(onion, &model.PublicProfile{Name: "", Ed25519PublicKey: edpk, Trusted: true, Blocked: false, Onion: onion})
cp.Profile.ProcessInvite(cpp.GetGroupChatInvite(), onion)
return cpp.GroupChatInvite.GetGroupName(), nil
}
}
}
} else {
err = errors.New("unsupported exported group type")
}
return
}
// a handler for the optional data handler
// note that the "correct" way to do this would be to AddChannelHandler("im.cwtch.peerdata", ...") but peerdata is such
// a handy channel that it's nice to have this convenience shortcut
// SetPeerDataHandler sets the handler for the (optional) data channel for cwtch peers.
func (cp *cwtchPeer) SetPeerDataHandler(dataHandler func(string, []byte) []byte) {
cp.dataHandler = dataHandler
}
// add extra channel handlers (note that the peer will merge these with the ones necessary to make cwtch work, so you
// are not clobbering the underlying functionality)
func (cp *cwtchPeer) SetApplicationInstanceFactory(aif application.ApplicationInstanceFactory) {
cp.aif = aif
}
// ExportGroup serializes a group invite so it can be given offline
func (cp *cwtchPeer) ExportGroup(groupID string) (string, error) {
group := cp.Profile.GetGroupByGroupID(groupID)
if group != nil {
invite, err := group.Invite(group.GetInitialMessage())
if err == nil {
exportedInvite := "torv3" + base64.StdEncoding.EncodeToString(cp.Profile.Ed25519PublicKey) + base64.StdEncoding.EncodeToString(invite)
return exportedInvite, err
}
}
return "", errors.New("group id could not be found")
}
// StartGroup create a new group linked to the given server and returns the group ID, an invite or an error.
func (cp *cwtchPeer) StartGroup(server string) (string, []byte, error) {
return cp.Profile.StartGroup(server)
}
// StartGroupWithMessage create a new group linked to the given server and returns the group ID, an invite or an error.
func (cp *cwtchPeer) StartGroupWithMessage(server string, initialMessage []byte) (string, []byte, error) {
return cp.Profile.StartGroupWithMessage(server, initialMessage)
}
// GetGroups returns an unordered list of all group IDs.
func (cp *cwtchPeer) GetGroups() []string {
return cp.Profile.GetGroups()
}
// GetGroup returns a pointer to a specific group, nil if no group exists.
func (cp *cwtchPeer) GetGroup(groupID string) *model.Group {
return cp.Profile.GetGroupByGroupID(groupID)
}
// GetContacts returns an unordered list of onions
func (cp *cwtchPeer) GetContacts() []string {
return cp.Profile.GetContacts()
}
// GetContact returns a given contact, nil is no such contact exists
func (cp *cwtchPeer) GetContact(onion string) *model.PublicProfile {
contact, _ := cp.Profile.GetContact(onion)
return contact
}
// GetProfile returns the profile associated with this cwtchPeer.
// TODO While it is probably "safe", it is not really "safe", to call functions on this profile. This only exists to return things like Name and Onion,we should gate these.
func (cp *cwtchPeer) GetProfile() *model.Profile {
return cp.Profile
}
// PeerWithOnion is the entry point for cwtchPeer relationships
func (cp *cwtchPeer) PeerWithOnion(onion string) *connections.PeerPeerConnection {
return cp.connectionsManager.ManagePeerConnection(onion, cp.Profile, cp.dataHandler, cp.aif)
}
// InviteOnionToGroup kicks off the invite process
func (cp *cwtchPeer) InviteOnionToGroup(onion string, groupid string) error {
group := cp.Profile.GetGroupByGroupID(groupid)
if group != nil {
log.Infof("Constructing invite for group: %v\n", group)
invite, err := group.Invite(group.GetInitialMessage())
if err != nil {
return err
}
ppc := cp.connectionsManager.GetPeerPeerConnectionForOnion(onion)
if ppc == nil {
return errors.New("peer connection not setup for onion. peers must be trusted before sending")
}
if ppc.GetState() == connections.AUTHENTICATED {
log.Infof("Got connection for group: %v - Sending Invite\n", ppc)
ppc.SendGroupInvite(invite)
} else {
return errors.New("cannot send invite to onion: peer connection is not ready")
}
return nil
}
return errors.New("group id could not be found")
}
// ReceiveGroupMessage is a callback function that processes GroupMessages from a given server
func (cp *cwtchPeer) ReceiveGroupMessage(server string, gm *protocol.GroupMessage) {
cp.Profile.AttemptDecryption(gm.Ciphertext, gm.Signature)
}
// JoinServer manages a new server connection with the given onion address
func (cp *cwtchPeer) JoinServer(onion string) {
cp.connectionsManager.ManageServerConnection(onion, cp.ReceiveGroupMessage)
}
// SendMessageToGroup attemps to sent the given message to the given group id.
func (cp *cwtchPeer) SendMessageToGroup(groupid string, message string) error {
group := cp.Profile.GetGroupByGroupID(groupid)
if group == nil {
return errors.New("group does not exist")
}
psc := cp.connectionsManager.GetPeerServerConnectionForOnion(group.GroupServer)
if psc == nil {
return errors.New("could not find server connection to send message to")
}
ct, sig, err := cp.Profile.EncryptMessageToGroup(message, groupid)
if err != nil {
return err
}
gm := &protocol.GroupMessage{
Ciphertext: ct,
Signature: sig,
}
err = psc.SendGroupMessage(gm)
return err
}
// GetPeers returns a list of peer connections.
func (cp *cwtchPeer) GetPeers() map[string]connections.ConnectionState {
return cp.connectionsManager.GetPeers()
}
// GetServers returns a list of server connections
func (cp *cwtchPeer) GetServers() map[string]connections.ConnectionState {
return cp.connectionsManager.GetServers()
}
// TrustPeer sets an existing peer relationship to trusted
func (cp *cwtchPeer) TrustPeer(peer string) error {
err := cp.Profile.TrustPeer(peer)
if err == nil {
cp.PeerWithOnion(peer)
}
return err
}
// BlockPeer blocks an existing peer relationship.
func (cp *cwtchPeer) BlockPeer(peer string) error {
err := cp.Profile.BlockPeer(peer)
cp.connectionsManager.ClosePeerConnection(peer)
return err
}
// AcceptInvite accepts a given existing group invite
func (cp *cwtchPeer) AcceptInvite(groupID string) error {
return cp.Profile.AcceptInvite(groupID)
}
// RejectInvite rejects a given group invite.
func (cp *cwtchPeer) RejectInvite(groupID string) {
cp.Profile.RejectInvite(groupID)
}
// LookupContact returns that a contact is known and allowed to communicate for all cases.
func (cp *cwtchPeer) LookupContact(hostname string, publicKey rsa.PublicKey) (allowed, known bool) {
blocked := cp.Profile.IsBlocked(hostname)
return !blocked, true
}
// LookupContactV3 returns that a contact is known and allowed to communicate for all cases.
func (cp *cwtchPeer) LookupContactV3(hostname string, publicKey ed25519.PublicKey) (allowed, known bool) {
blocked := cp.Profile.IsBlocked(hostname)
return !blocked, true
}
// ContactRequest needed to implement ContactRequestHandler Interface
func (cp *cwtchPeer) ContactRequest(name string, message string) string {
return "Accepted"
}
func (cp *cwtchPeer) Listen() {
go func() {
for !cp.shutdown {
e := cp.listenFn()
if e != nil {
// TODO: was panic, then fatal
fmt.Printf("ERROR: peer %v has crashed with: %v\n", cp.GetProfile().Onion, e)
}
// listenFn failed, wait 5 seconds and try again
time.Sleep(5 * time.Second)
}
}()
}
// Listen sets up an onion listener to process incoming cwtch messages
func (cp *cwtchPeer) listenFn() error {
ra := new(application.RicochetApplication)
onionService, err := cp.acn.Listen(cp.Profile.Ed25519PrivateKey, application.RicochetPort)
if err != nil /*&& fmt.Sprintf("%v", err) != "550 Unspecified Tor error: Onion address collision"*/ {
return err
}
af := application.ApplicationInstanceFactory{}
af.Init()
af.AddHandler("im.cwtch.peer", func(rai *application.ApplicationInstance) func() channels.Handler {
cpi := new(CwtchPeerInstance)
cpi.Init(rai, ra)
return func() channels.Handler {
cpc := new(peer.CwtchPeerChannel)
cpc.Handler = &CwtchPeerHandler{Onion: rai.RemoteHostname, Peer: cp}
return cpc
}
})
if cp.dataHandler != nil {
af.AddHandler("im.cwtch.peer.data", func(rai *application.ApplicationInstance) func() channels.Handler {
cpi := new(CwtchPeerInstance)
cpi.Init(rai, ra)
return func() channels.Handler {
cpc := new(peer.CwtchPeerDataChannel)
cpc.Handler = &CwtchPeerHandler{Onion: rai.RemoteHostname, Peer: cp, DataHandler: cp.dataHandler}
return cpc
}
})
}
handlers := cp.aif.GetHandlers()
for i := range handlers {
af.AddHandler(handlers[i], cp.aif.GetHandler(handlers[i]))
}
ra.Init(cp.acn, cp.Profile.Name, identity.InitializeV3(cp.Profile.Name, &cp.Profile.Ed25519PrivateKey, &cp.Profile.Ed25519PublicKey), af, cp)
log.Infof("Running cwtch peer on %v", onionService.AddressFull())
cp.app = ra
cp.started = true
ra.Run(onionService)
return nil
}
// Shutdown kills all connections and cleans up all goroutines for the peer
func (cp *cwtchPeer) Shutdown() {
cp.shutdown = true
cp.connectionsManager.Shutdown()
if cp.app != nil {
cp.app.Shutdown()
}
}
// IsStarted returns true if Listen() has successfully been run before on this connection (ever). TODO: we will need to properly unset this flag on error if we want to support resumption in the future
func (cp *cwtchPeer) IsStarted() bool {
return cp.started
}
// CwtchPeerInstance encapsulates incoming peer connections
type CwtchPeerInstance struct {
rai *application.ApplicationInstance
ra *application.RicochetApplication
}
// Init sets up a CwtchPeerInstance
func (cpi *CwtchPeerInstance) Init(rai *application.ApplicationInstance, ra *application.RicochetApplication) {
cpi.rai = rai
cpi.ra = ra
}
// CwtchPeerHandler encapsulates handling of incoming CwtchPackets
type CwtchPeerHandler struct {
Onion string
Peer *cwtchPeer
DataHandler func(string, []byte) []byte
}
// HandleGroupInvite handles incoming GroupInvites
func (cph *CwtchPeerHandler) HandleGroupInvite(gci *protocol.GroupChatInvite) {
log.Debugf("Received GroupID from %v %v\n", cph.Onion, gci.String())
cph.Peer.Profile.ProcessInvite(gci, cph.Onion)
}
// HandlePacket handles the Cwtch cwtchPeer Data Channel
func (cph *CwtchPeerHandler) HandlePacket(data []byte) []byte {
return cph.DataHandler(cph.Onion, data)
}