libricochet-go/channels/v3/inbound/3dhauthchannel_test.go

127 lines
3.8 KiB
Go

package inbound
import (
"crypto/rand"
"git.openprivacy.ca/openprivacy/libricochet-go/channels"
"git.openprivacy.ca/openprivacy/libricochet-go/channels/v3/outbound"
"git.openprivacy.ca/openprivacy/libricochet-go/identity"
"git.openprivacy.ca/openprivacy/libricochet-go/utils"
"git.openprivacy.ca/openprivacy/libricochet-go/wire/auth/3edh"
"git.openprivacy.ca/openprivacy/libricochet-go/wire/control"
"github.com/golang/protobuf/proto"
"golang.org/x/crypto/ed25519"
"testing"
)
// TestServer3DHAuthChannel drives a 3DH authentication handshake between an
// outbound client auth channel and an inbound server auth channel that share a
// single in-memory channels.Channel, and checks that the server accepts the
// client's proof and reports the client's hostname to ServerAuthValid.
func TestServer3DHAuthChannel(t *testing.T) {
	cc := new(channels.Channel)
	cc.ID = 1
	closed := false
	cc.CloseChannel = func() { closed = true }
	cc.DelegateEncryption = func([32]byte) {}

	// Client and server keys are kept in separate variables. InitializeV3 is
	// handed pointers to the key variables, so reusing one pair for both
	// identities could let the second GenerateKey overwrite the keys the first
	// identity refers to (if the identity retains the pointers).
	clientPub, clientPriv, err := ed25519.GenerateKey(rand.Reader)
	if err != nil {
		t.Fatalf("generating client key: %v", err)
	}
	clientChannel := new(outbound.Client3DHAuthChannel)
	clientChannel.ClientIdentity = identity.InitializeV3("", &clientPriv, &clientPub)

	ocb, err := clientChannel.OpenOutbound(cc)
	if err != nil {
		t.Fatalf("opening outbound channel: %v", err)
	}
	packet := new(Protocol_Data_Control.Packet)
	if err := proto.Unmarshal(ocb, packet); err != nil {
		t.Fatalf("decoding open channel packet: %v", err)
	}

	serverPub, serverPriv, err := ed25519.GenerateKey(rand.Reader)
	if err != nil {
		t.Fatalf("generating server key: %v", err)
	}
	s3dhchannel := new(Server3DHAuthChannel)
	s3dhchannel.ServerIdentity = identity.InitializeV3("", &serverPriv, &serverPub)
	clientChannel.ServerHostname = utils.GetTorV3Hostname(serverPub)

	cr, err := s3dhchannel.OpenInbound(cc, packet.GetOpenChannel())
	if err != nil {
		t.Fatalf("opening inbound channel: %v", err)
	}
	if err := proto.Unmarshal(cr, packet); err != nil {
		t.Fatalf("decoding channel result packet: %v", err)
	}
	if packet.GetChannelResult() == nil {
		t.Fatalf("Should have received a Channel Response from OpenInbound: %v", packet)
	}

	// Capture whatever is sent over the shared channel so the client's proof
	// can be inspected and then handed to the server.
	authPacket := new(Protocol_Data_Auth_TripleEDH.Packet)
	var lastMessage []byte
	cc.SendMessage = func(message []byte) {
		t.Logf("Received: %x", message)
		if err := proto.Unmarshal(message, authPacket); err != nil {
			t.Errorf("decoding auth packet: %v", err)
		}
		lastMessage = message
	}

	clientChannel.OpenOutboundResult(nil, packet.GetChannelResult())
	if closed {
		t.Fatalf("Should not have closed channel!")
	}
	if authPacket.Proof == nil {
		// Without a proof there is nothing meaningful to feed the server.
		t.Fatalf("Was expected a Proof Packet, instead %v", authPacket)
	}

	// Record that the callback ran so the test cannot pass vacuously when the
	// server never accepts the proof.
	authorized := false
	s3dhchannel.ServerAuthValid = func(hostname string, publicKey ed25519.PublicKey) (allowed, known bool) {
		authorized = true
		if hostname != clientChannel.ClientIdentity.Hostname() {
			t.Errorf("Hostname and public key did not match %v %v", hostname, publicKey)
		}
		return true, true
	}
	cc.DelegateAuthorization = func() {}

	s3dhchannel.Packet(lastMessage)
	if !authorized {
		t.Errorf("Server did not call ServerAuthValid for a valid proof")
	}
}
// TestServer3DHAuthChannelReject runs the 3DH handshake up to the client's
// proof, replaces the proof bytes with an empty slice, and feeds the tampered
// packet to the server. The server must not accept it: ServerAuthValid must
// never fire, and any error passed to ServerAuthInvalid must be non-nil.
func TestServer3DHAuthChannelReject(t *testing.T) {
	cc := new(channels.Channel)
	cc.ID = 1
	cc.CloseChannel = func() {}
	cc.DelegateEncryption = func([32]byte) {}

	// Client and server keys are kept in separate variables. InitializeV3 is
	// handed pointers to the key variables, so reusing one pair for both
	// identities could let the second GenerateKey overwrite the keys the first
	// identity refers to (if the identity retains the pointers).
	clientPub, clientPriv, err := ed25519.GenerateKey(rand.Reader)
	if err != nil {
		t.Fatalf("generating client key: %v", err)
	}
	clientChannel := new(outbound.Client3DHAuthChannel)
	clientChannel.ClientIdentity = identity.InitializeV3("", &clientPriv, &clientPub)

	ocb, err := clientChannel.OpenOutbound(cc)
	if err != nil {
		t.Fatalf("opening outbound channel: %v", err)
	}
	packet := new(Protocol_Data_Control.Packet)
	if err := proto.Unmarshal(ocb, packet); err != nil {
		t.Fatalf("decoding open channel packet: %v", err)
	}

	serverPub, serverPriv, err := ed25519.GenerateKey(rand.Reader)
	if err != nil {
		t.Fatalf("generating server key: %v", err)
	}
	s3dhchannel := new(Server3DHAuthChannel)
	s3dhchannel.ServerIdentity = identity.InitializeV3("", &serverPriv, &serverPub)
	clientChannel.ServerHostname = utils.GetTorV3Hostname(serverPub)

	cr, err := s3dhchannel.OpenInbound(cc, packet.GetOpenChannel())
	if err != nil {
		t.Fatalf("opening inbound channel: %v", err)
	}
	if err := proto.Unmarshal(cr, packet); err != nil {
		t.Fatalf("decoding channel result packet: %v", err)
	}
	if packet.GetChannelResult() == nil {
		t.Fatalf("Should have received a Channel Response from OpenInbound: %v", packet)
	}

	authPacket := new(Protocol_Data_Auth_TripleEDH.Packet)
	var lastMessage []byte
	cc.SendMessage = func(message []byte) {
		if err := proto.Unmarshal(message, authPacket); err != nil {
			t.Errorf("decoding auth packet: %v", err)
		}
		// Replace the Auth Proof Packet to cause this to fail.
		if authPacket.GetProof() != nil {
			authPacket.GetProof().Proof = []byte{}
			tampered, err := proto.Marshal(authPacket)
			if err != nil {
				t.Errorf("encoding tampered auth packet: %v", err)
			}
			lastMessage = tampered
		}
	}

	clientChannel.OpenOutboundResult(nil, packet.GetChannelResult())
	if authPacket.Proof == nil {
		// No proof was captured, so there is no tampered packet to reject.
		t.Fatalf("Was expected a Proof Packet, instead %v", authPacket)
	}

	// An accepted forgery should surface as an explicit test failure rather
	// than as a call through a nil callback.
	s3dhchannel.ServerAuthValid = func(hostname string, publicKey ed25519.PublicKey) (allowed, known bool) {
		t.Errorf("Server accepted a tampered proof from %v", hostname)
		return false, false
	}
	s3dhchannel.ServerAuthInvalid = func(err error) {
		if err == nil {
			t.Fatal("Server Auth Should have been invalid!")
		}
	}
	cc.DelegateAuthorization = func() {}

	s3dhchannel.Packet(lastMessage)
}