add application integration test: (#42)
- start two peers alice, bob - make alice requests contact with bob - they send messages - they shutdown - verify (and fix) no threads leaked - verify messages - add inbound connection handler to chatChannelHandler - add Open and AcceptAllContactHandler to application
This commit is contained in:
parent
417d25dc7c
commit
5a94afa0f7
|
@ -0,0 +1,13 @@
|
|||
package application
|
||||
|
||||
type AcceptAllContactHandler struct{}
|
||||
|
||||
func (aach *AcceptAllContactHandler) ContactRequest(name string, message string) string {
|
||||
return "Pending"
|
||||
}
|
||||
func (aach *AcceptAllContactHandler) ContactRequestRejected() {
|
||||
}
|
||||
func (aach *AcceptAllContactHandler) ContactRequestAccepted() {
|
||||
}
|
||||
func (aach *AcceptAllContactHandler) ContactRequestError() {
|
||||
}
|
|
@ -3,6 +3,7 @@ package application
|
|||
import (
|
||||
"crypto/rsa"
|
||||
"github.com/s-rah/go-ricochet"
|
||||
"github.com/s-rah/go-ricochet/channels"
|
||||
"github.com/s-rah/go-ricochet/connection"
|
||||
"github.com/s-rah/go-ricochet/identity"
|
||||
"log"
|
||||
|
@ -15,13 +16,15 @@ import (
|
|||
type RicochetApplication struct {
|
||||
contactManager ContactManagerInterface
|
||||
privateKey *rsa.PrivateKey
|
||||
name string
|
||||
l net.Listener
|
||||
instances []*ApplicationInstance
|
||||
lock sync.Mutex
|
||||
aif ApplicationInstanceFactory
|
||||
}
|
||||
|
||||
func (ra *RicochetApplication) Init(pk *rsa.PrivateKey, af ApplicationInstanceFactory, cm ContactManagerInterface) {
|
||||
func (ra *RicochetApplication) Init(name string, pk *rsa.PrivateKey, af ApplicationInstanceFactory, cm ContactManagerInterface) {
|
||||
ra.name = name
|
||||
ra.privateKey = pk
|
||||
ra.aif = af
|
||||
ra.contactManager = cm
|
||||
|
@ -38,7 +41,7 @@ func (ra *RicochetApplication) handleConnection(conn net.Conn) {
|
|||
|
||||
ich := connection.HandleInboundConnection(rc)
|
||||
|
||||
err = ich.ProcessAuthAsServer(identity.Initialize("", ra.privateKey), ra.contactManager.LookupContact)
|
||||
err = ich.ProcessAuthAsServer(identity.Initialize(ra.name, ra.privateKey), ra.contactManager.LookupContact)
|
||||
if err != nil {
|
||||
log.Printf("There was an error")
|
||||
conn.Close()
|
||||
|
@ -58,6 +61,38 @@ func (ra *RicochetApplication) HandleApplicationInstance(rai *ApplicationInstanc
|
|||
ra.lock.Unlock()
|
||||
}
|
||||
|
||||
// Open a connection to another Ricochet peer at onionAddress. If they are unknown to use, use requestMessage (otherwise can be blank)
|
||||
func (ra *RicochetApplication) Open(onionAddress string, requestMessage string) (*ApplicationInstance, error) {
|
||||
rc, err := goricochet.Open(onionAddress)
|
||||
rc.TraceLog(true)
|
||||
if err != nil {
|
||||
log.Printf("Error in application.Open(): %v\n", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
known, err := connection.HandleOutboundConnection(rc).ProcessAuthAsClient(identity.Initialize(ra.name, ra.privateKey))
|
||||
rai := ra.aif.GetApplicationInstance(rc)
|
||||
go rc.Process(rai)
|
||||
|
||||
if !known {
|
||||
err := rc.Do(func() error {
|
||||
_, err := rc.RequestOpenChannel("im.ricochet.contact.request",
|
||||
&channels.ContactRequestChannel{
|
||||
Handler: new(AcceptAllContactHandler),
|
||||
Name: ra.name,
|
||||
Message: requestMessage,
|
||||
})
|
||||
return err
|
||||
})
|
||||
if err != nil {
|
||||
log.Printf("could not contact %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
ra.HandleApplicationInstance(rai)
|
||||
return rai, nil
|
||||
}
|
||||
|
||||
func (ra *RicochetApplication) Broadcast(do func(rai *ApplicationInstance)) {
|
||||
ra.lock.Lock()
|
||||
for _, rai := range ra.instances {
|
||||
|
@ -68,6 +103,9 @@ func (ra *RicochetApplication) Broadcast(do func(rai *ApplicationInstance)) {
|
|||
|
||||
func (ra *RicochetApplication) Shutdown() {
|
||||
ra.l.Close()
|
||||
for _, instance := range ra.instances {
|
||||
instance.Connection.Conn.Close()
|
||||
}
|
||||
}
|
||||
|
||||
func (ra *RicochetApplication) Run(l net.Listener) {
|
||||
|
|
|
@ -27,7 +27,7 @@ func (af *ApplicationInstanceFactory) AddHandler(ctype string, chandler func(*Ap
|
|||
af.handlerMap[ctype] = chandler
|
||||
}
|
||||
|
||||
// GetApplicationInstance buulds a new application instance using a connection as a base.
|
||||
// GetApplicationInstance builds a new application instance using a connection as a base.
|
||||
func (af *ApplicationInstanceFactory) GetApplicationInstance(rc *connection.Connection) *ApplicationInstance {
|
||||
rai := new(ApplicationInstance)
|
||||
rai.Init()
|
||||
|
|
|
@ -18,6 +18,21 @@ func (ebi *EchoBotInstance) Init(rai *application.ApplicationInstance, ra *appli
|
|||
ebi.ra = ra
|
||||
}
|
||||
|
||||
// We always want bidirectional chat channels
|
||||
func (ebi *EchoBotInstance) OpenInbound() {
|
||||
log.Println("OpenInbound() ChatChannel handler called...")
|
||||
outboutChatChannel := ebi.rai.Connection.Channel("im.ricochet.chat", channels.Outbound)
|
||||
if outboutChatChannel == nil {
|
||||
ebi.rai.Connection.Do(func() error {
|
||||
ebi.rai.Connection.RequestOpenChannel("im.ricochet.chat",
|
||||
&channels.ChatChannel{
|
||||
Handler: ebi,
|
||||
})
|
||||
return nil
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func (ebi *EchoBotInstance) ChatMessage(messageID uint32, when time.Time, message string) bool {
|
||||
log.Printf("message from %v - %v", ebi.rai.RemoteHostname, message)
|
||||
go ebi.ra.Broadcast(func(rai *application.ApplicationInstance) {
|
||||
|
@ -32,12 +47,6 @@ func (ebi *EchoBotInstance) ChatMessageAck(messageID uint32, accepted bool) {
|
|||
|
||||
func (ebi *EchoBotInstance) SendChatMessage(rai *application.ApplicationInstance, message string) {
|
||||
rai.Connection.Do(func() error {
|
||||
// Technically this errors after the second time but we can ignore it.
|
||||
rai.Connection.RequestOpenChannel("im.ricochet.chat",
|
||||
&channels.ChatChannel{
|
||||
Handler: ebi,
|
||||
})
|
||||
|
||||
channel := rai.Connection.Channel("im.ricochet.chat", channels.Outbound)
|
||||
if channel != nil {
|
||||
chatchannel, ok := channel.Handler.(*channels.ChatChannel)
|
||||
|
|
|
@ -28,6 +28,8 @@ type ChatChannel struct {
|
|||
// ConnectionHandler; there is no need to use a distinct type as a
|
||||
// ChatChannelHandler.
|
||||
type ChatChannelHandler interface {
|
||||
// OpenInbound is called when a inbound chat channel is opened
|
||||
OpenInbound()
|
||||
// ChatMessage is called when a chat message is received. Return true to acknowledge
|
||||
// the message successfully, and false to NACK and refuse the message.
|
||||
ChatMessage(messageID uint32, when time.Time, message string) bool
|
||||
|
@ -101,6 +103,7 @@ func (cc *ChatChannel) OpenInbound(channel *Channel, raw *Protocol_Data_Control.
|
|||
cc.lastMessageID = uint32(id.Uint64())
|
||||
cc.channel.Pending = false
|
||||
messageBuilder := new(utils.MessageBuilder)
|
||||
go cc.Handler.OpenInbound()
|
||||
return messageBuilder.AckOpenChannel(channel.ID), nil
|
||||
}
|
||||
|
||||
|
|
|
@ -30,28 +30,6 @@ func TestChatChannelOptions(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestChatChannelOpenInbound(t *testing.T) {
|
||||
messageBuilder := new(utils.MessageBuilder)
|
||||
ocm := messageBuilder.OpenChannel(2, "im.ricochet.chat")
|
||||
|
||||
// We have just constructed this so there is little
|
||||
// point in doing error checking here in the test
|
||||
res := new(Protocol_Data_Control.Packet)
|
||||
proto.Unmarshal(ocm[:], res)
|
||||
opm := res.GetOpenChannel()
|
||||
|
||||
chatChannel := new(ChatChannel)
|
||||
channel := Channel{ID: 1}
|
||||
response, err := chatChannel.OpenInbound(&channel, opm)
|
||||
|
||||
if err == nil {
|
||||
res := new(Protocol_Data_Control.Packet)
|
||||
proto.Unmarshal(response[:], res)
|
||||
} else {
|
||||
t.Errorf("Error while parsing chatchannel openinbound output: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestChatChannelOpenOutbound(t *testing.T) {
|
||||
chatChannel := new(ChatChannel)
|
||||
channel := Channel{ID: 1}
|
||||
|
@ -72,6 +50,10 @@ func TestChatChannelOpenOutbound(t *testing.T) {
|
|||
type TestChatChannelHandler struct {
|
||||
}
|
||||
|
||||
func (tcch *TestChatChannelHandler) OpenInbound() {
|
||||
|
||||
}
|
||||
|
||||
func (tcch *TestChatChannelHandler) ChatMessage(messageID uint32, when time.Time, message string) bool {
|
||||
return true
|
||||
}
|
||||
|
@ -80,6 +62,29 @@ func (tcch *TestChatChannelHandler) ChatMessageAck(messageID uint32, accepted bo
|
|||
|
||||
}
|
||||
|
||||
func TestChatChannelOpenInbound(t *testing.T) {
|
||||
messageBuilder := new(utils.MessageBuilder)
|
||||
ocm := messageBuilder.OpenChannel(2, "im.ricochet.chat")
|
||||
|
||||
// We have just constructed this so there is little
|
||||
// point in doing error checking here in the test
|
||||
res := new(Protocol_Data_Control.Packet)
|
||||
proto.Unmarshal(ocm[:], res)
|
||||
opm := res.GetOpenChannel()
|
||||
|
||||
chatChannel := new(ChatChannel)
|
||||
chatChannel.Handler = new(TestChatChannelHandler)
|
||||
channel := Channel{ID: 1}
|
||||
response, err := chatChannel.OpenInbound(&channel, opm)
|
||||
|
||||
if err == nil {
|
||||
res := new(Protocol_Data_Control.Packet)
|
||||
proto.Unmarshal(response[:], res)
|
||||
} else {
|
||||
t.Errorf("Error while parsing chatchannel openinbound output: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestChatChannelOperations(t *testing.T) {
|
||||
|
||||
// We test OpenOutboundElsewhere
|
||||
|
|
|
@ -33,6 +33,9 @@ func (echobot *RicochetEchoBot) ChatMessage(messageID uint32, when time.Time, me
|
|||
return true
|
||||
}
|
||||
|
||||
func (echobot *RicochetEchoBot) OpenInbound() {
|
||||
}
|
||||
|
||||
func (echobot *RicochetEchoBot) ChatMessageAck(messageID uint32, accepted bool) {
|
||||
|
||||
}
|
||||
|
@ -48,6 +51,7 @@ func (echobot *RicochetEchoBot) Connect(privateKeyFile string, hostname string)
|
|||
contact.Handler = echobot
|
||||
return contact
|
||||
})
|
||||
|
||||
echobot.RegisterChannelHandler("im.ricochet.chat", func() channels.Handler {
|
||||
chat := new(channels.ChatChannel)
|
||||
chat.Handler = echobot
|
||||
|
|
|
@ -0,0 +1,226 @@
|
|||
package testing
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/s-rah/go-ricochet/application"
|
||||
"github.com/s-rah/go-ricochet/channels"
|
||||
"github.com/s-rah/go-ricochet/utils"
|
||||
"log"
|
||||
"runtime"
|
||||
"strconv"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Message struct {
|
||||
From, To string
|
||||
Message string
|
||||
}
|
||||
|
||||
type MessageStack interface {
|
||||
Add(from, to, message string)
|
||||
Get() []Message
|
||||
}
|
||||
|
||||
type Messages struct {
|
||||
messages []Message
|
||||
lock sync.Mutex
|
||||
}
|
||||
|
||||
func (messages *Messages) Init() {
|
||||
messages.messages = []Message{}
|
||||
}
|
||||
|
||||
func (messages *Messages) Add(from, to, message string) {
|
||||
messages.lock.Lock()
|
||||
messages.messages = append(messages.messages, Message{from, to, message})
|
||||
messages.lock.Unlock()
|
||||
}
|
||||
|
||||
func (messages *Messages) Get() []Message {
|
||||
return messages.messages
|
||||
}
|
||||
|
||||
type ChatEchoBot struct {
|
||||
onion string
|
||||
rai *application.ApplicationInstance
|
||||
n int
|
||||
Messages MessageStack
|
||||
}
|
||||
|
||||
// We always want bidirectional chat channels
|
||||
func (bot *ChatEchoBot) OpenInbound() {
|
||||
log.Println("OpenInbound() ChatChannel handler called...")
|
||||
outboutChatChannel := bot.rai.Connection.Channel("im.ricochet.chat", channels.Outbound)
|
||||
if outboutChatChannel == nil {
|
||||
bot.rai.Connection.Do(func() error {
|
||||
bot.rai.Connection.RequestOpenChannel("im.ricochet.chat",
|
||||
&channels.ChatChannel{
|
||||
Handler: bot,
|
||||
})
|
||||
return nil
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func (bot *ChatEchoBot) ChatMessage(messageID uint32, when time.Time, message string) bool {
|
||||
log.Printf("ChatMessage(from: %v, %v", bot.rai.RemoteHostname, message)
|
||||
bot.Messages.Add(bot.rai.RemoteHostname, bot.onion, message)
|
||||
SendMessage(bot.rai, strconv.Itoa(bot.n)+" witty response")
|
||||
bot.n += 1
|
||||
return true
|
||||
}
|
||||
|
||||
func SendMessage(rai *application.ApplicationInstance, message string) {
|
||||
log.Printf("SendMessage(to: %v, %v)\n", rai.RemoteHostname, message)
|
||||
rai.Connection.Do(func() error {
|
||||
|
||||
log.Printf("Finding Chat Channel")
|
||||
channel := rai.Connection.Channel("im.ricochet.chat", channels.Outbound)
|
||||
if channel != nil {
|
||||
log.Printf("Found Chat Channel")
|
||||
chatchannel, ok := channel.Handler.(*channels.ChatChannel)
|
||||
if ok {
|
||||
chatchannel.SendMessage(message)
|
||||
}
|
||||
} else {
|
||||
log.Printf("Could not find chat channel")
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func (bot *ChatEchoBot) ChatMessageAck(messageID uint32, accepted bool) {
|
||||
|
||||
}
|
||||
|
||||
func TestApplicationIntegration(t *testing.T) {
|
||||
startGoRoutines := runtime.NumGoroutine()
|
||||
messageStack := &Messages{}
|
||||
messageStack.Init()
|
||||
|
||||
fmt.Println("Initializing application factory...")
|
||||
af := application.ApplicationInstanceFactory{}
|
||||
af.Init()
|
||||
|
||||
af.AddHandler("im.ricochet.contact.request", func(rai *application.ApplicationInstance) func() channels.Handler {
|
||||
return func() channels.Handler {
|
||||
contact := new(channels.ContactRequestChannel)
|
||||
contact.Handler = new(application.AcceptAllContactHandler)
|
||||
return contact
|
||||
}
|
||||
})
|
||||
|
||||
fmt.Println("Starting alice...")
|
||||
alice := new(application.RicochetApplication)
|
||||
fmt.Println("Generating alice's pk...")
|
||||
apk, _ := utils.GeneratePrivateKey()
|
||||
aliceAddr, _ := utils.GetOnionAddress(apk)
|
||||
fmt.Println("Seting up alice's onion " + aliceAddr + "...")
|
||||
al, err := application.SetupOnion("127.0.0.1:9051", "tcp4", "", apk, 9878)
|
||||
if err != nil {
|
||||
t.Fatalf("Could not setup Onion for Alice: %v", err)
|
||||
}
|
||||
|
||||
fmt.Println("Initializing alice...")
|
||||
af.AddHandler("im.ricochet.chat", func(rai *application.ApplicationInstance) func() channels.Handler {
|
||||
return func() channels.Handler {
|
||||
chat := new(channels.ChatChannel)
|
||||
chat.Handler = &ChatEchoBot{rai: rai, n: 0, Messages: messageStack, onion: aliceAddr}
|
||||
return chat
|
||||
}
|
||||
})
|
||||
alice.Init("Alice", apk, af, new(application.AcceptAllContactManager))
|
||||
fmt.Println("Running alice...")
|
||||
go alice.Run(al)
|
||||
|
||||
fmt.Println("Starting bob...")
|
||||
bob := new(application.RicochetApplication)
|
||||
bpk, err := utils.GeneratePrivateKey()
|
||||
if err != nil {
|
||||
t.Fatalf("Could not setup Onion for Alice: %v", err)
|
||||
}
|
||||
bobAddr, _ := utils.GetOnionAddress(bpk)
|
||||
fmt.Println("Seting up bob's onion " + bobAddr + "...")
|
||||
bl, _ := application.SetupOnion("127.0.0.1:9051", "tcp4", "", bpk, 9878)
|
||||
af.AddHandler("im.ricochet.chat", func(rai *application.ApplicationInstance) func() channels.Handler {
|
||||
return func() channels.Handler {
|
||||
chat := new(channels.ChatChannel)
|
||||
chat.Handler = &ChatEchoBot{rai: rai, n: 0, Messages: messageStack, onion: bobAddr}
|
||||
return chat
|
||||
}
|
||||
})
|
||||
bob.Init("Bob", bpk, af, new(application.AcceptAllContactManager))
|
||||
go bob.Run(bl)
|
||||
|
||||
fmt.Println("Waiting for alice and bob hidden services to percolate...")
|
||||
time.Sleep(60 * time.Second)
|
||||
runningGoRoutines := runtime.NumGoroutine()
|
||||
|
||||
fmt.Println("Alice connecting to Bob...")
|
||||
// out going rc from alice to bob
|
||||
alicei, err := alice.Open(bobAddr, "It's alice")
|
||||
if err != nil {
|
||||
t.Fatalf("Error Alice connecting to Bob: %v", err)
|
||||
}
|
||||
time.Sleep(10 * time.Second)
|
||||
|
||||
fmt.Println("Alice request open chat channel...")
|
||||
// TODO: opening a channel should be easier?
|
||||
alicei.Connection.Do(func() error {
|
||||
handler, err := alicei.OnOpenChannelRequest("im.ricochet.chat")
|
||||
if err != nil {
|
||||
log.Printf("Could not get chat handler!\n")
|
||||
return err
|
||||
}
|
||||
_, err = alicei.Connection.RequestOpenChannel("im.ricochet.chat", handler)
|
||||
return err
|
||||
})
|
||||
time.Sleep(5 * time.Second)
|
||||
|
||||
fmt.Println("Alice sending message to Bob...")
|
||||
SendMessage(alicei, "Hello Bob!")
|
||||
|
||||
if err != nil {
|
||||
log.Fatal("Error dialing from Alice to Bob: ", err)
|
||||
}
|
||||
|
||||
time.Sleep(10 * time.Second)
|
||||
|
||||
// should now be connected to bob
|
||||
connectedGoRoutines := runtime.NumGoroutine()
|
||||
|
||||
fmt.Println("Shutting bob down...")
|
||||
bob.Shutdown()
|
||||
|
||||
time.Sleep(15 * time.Second)
|
||||
|
||||
bobShutdownGoRoutines := runtime.NumGoroutine()
|
||||
|
||||
fmt.Println("Shutting alice down...")
|
||||
alice.Shutdown()
|
||||
time.Sleep(15 * time.Second)
|
||||
|
||||
finalGoRoutines := runtime.NumGoroutine()
|
||||
|
||||
fmt.Printf("startGoRoutines: %v\nrunningGoROutines: %v\nconnectedGoRoutines: %v\nBobShutdownGoRoutines: %v\nfinalGoRoutines: %v\n", startGoRoutines, runningGoRoutines, connectedGoRoutines, bobShutdownGoRoutines, finalGoRoutines)
|
||||
|
||||
if bobShutdownGoRoutines != startGoRoutines+1 {
|
||||
t.Errorf("After shutting down bob, go routines were not start + 1 (alice) value. Expected: %v Actual %v", startGoRoutines+1, bobShutdownGoRoutines)
|
||||
}
|
||||
|
||||
if finalGoRoutines != startGoRoutines {
|
||||
t.Errorf("After shutting alice and bob down, go routines were not at start value. Expected: %v Actual: %v", startGoRoutines, finalGoRoutines)
|
||||
}
|
||||
|
||||
fmt.Println("Messages:")
|
||||
for _, message := range messageStack.Get() {
|
||||
fmt.Printf(" from:%v to:%v '%v'\n", message.From, message.To, message.Message)
|
||||
}
|
||||
|
||||
messages := messageStack.Get()
|
||||
if messages[0].Message != "Hello Bob!" || messages[1].Message != "0 witty response" {
|
||||
t.Errorf("Message history did not contain first two expected messages!")
|
||||
}
|
||||
}
|
Reference in New Issue