Fixup Integ Test + Limit Messages in Server Memory #36

Merged
dan merged 1 commits from integ_test_fix into master 2018-06-09 21:21:43 +00:00
10 changed files with 129 additions and 77 deletions

View File

@ -109,4 +109,3 @@ func (ppc *PeerPeerConnection) Close() {
ppc.state = KILLED
ppc.connection.Conn.Close()
}

View File

@ -31,5 +31,6 @@ func main() {
server := new(cwtchserver.Server)
log.Printf("starting cwtch server...")
server.Run(privateKeyFile)
// TODO load params from .cwtch/server.conf or command line flag
server.Run(privateKeyFile, 100000)
}

View File

@ -16,10 +16,9 @@ type Server struct {
app *application.RicochetApplication
}
// Run s
// tarts a server with the given privateKey
// Run starts a server with the given privateKey
// TODO: surface errors
func (s *Server) Run(privateKeyFile string) {
func (s *Server) Run(privateKeyFile string, bufferSize int) {
cwtchserver := new(application.RicochetApplication)
pk, err := utils.LoadPrivateKeyFromFile(privateKeyFile)
@ -37,7 +36,7 @@ func (s *Server) Run(privateKeyFile string) {
af := application.ApplicationInstanceFactory{}
af.Init()
ms := new(storage.MessageStore)
ms.Init("cwtch.messages")
ms.Init("cwtch.messages", bufferSize)
af.AddHandler("im.cwtch.server.listen", func(rai *application.ApplicationInstance) func() channels.Handler {
si := new(Instance)
si.Init(rai, cwtchserver, ms)

View File

@ -15,7 +15,7 @@ func TestServerInstance(t *testing.T) {
ra := new(application.RicochetApplication)
msi := new(storage.MessageStore)
os.Remove("ms.test")
msi.Init("ms.test")
msi.Init("ms.test", 5)
gm := protocol.GroupMessage{
Ciphertext: []byte("Hello this is a fairly average length message that we are writing here."),
Spamguard: []byte{},
@ -26,7 +26,7 @@ func TestServerInstance(t *testing.T) {
res := si.HandleFetchRequest()
if len(res) != 1 {
t.Errorf("Expected Group Messages Instead got %v", res)
t.Errorf("Expected 1 Group Messages Instead got %v", res)
}
// ra.HandleApplicationInstance(ai)

View File

@ -21,6 +21,9 @@ type MessageStore struct {
file *os.File
lock sync.Mutex
messages []*protocol.GroupMessage
bufferSize int
pos int
rotated bool
}
// Close closes the message store and underlying resources.
@ -31,13 +34,26 @@ func (ms *MessageStore) Close() {
ms.lock.Unlock()
}
// Init sets up a MessageStore backed by filename
func (ms *MessageStore) Init(filename string) {
func (ms *MessageStore) updateBuffer(gm *protocol.GroupMessage) {
ms.messages[ms.pos] = gm
ms.pos++
if ms.pos == ms.bufferSize {
ms.pos = 0
ms.rotated = true
}
}
// Init sets up a MessageStore of size bufferSize backed by filename
func (ms *MessageStore) Init(filename string, bufferSize int) {
f, err := os.OpenFile(filename, os.O_CREATE|os.O_APPEND|os.O_RDWR, 0600)
if err != nil {
panic(err)
}
ms.file = f
ms.pos = 0
ms.bufferSize = bufferSize
ms.messages = make([]*protocol.GroupMessage, bufferSize)
ms.rotated = false
scanner := bufio.NewScanner(f)
for scanner.Scan() {
@ -45,7 +61,7 @@ func (ms *MessageStore) Init(filename string) {
gm := &protocol.GroupMessage{}
err := json.Unmarshal([]byte(gms), gm)
if err == nil {
ms.messages = append(ms.messages, gm)
ms.updateBuffer(gm)
} else {
panic(err)
}
@ -59,9 +75,14 @@ func (ms *MessageStore) Init(filename string) {
// FetchMessages returns all messages from the backing file.
func (ms *MessageStore) FetchMessages() (messages []*protocol.GroupMessage) {
messages = make([]*protocol.GroupMessage, len(ms.messages))
ms.lock.Lock()
if !ms.rotated {
messages = make([]*protocol.GroupMessage, ms.pos)
copy(messages, ms.messages[0:ms.pos])
} else {
messages = make([]*protocol.GroupMessage, ms.bufferSize)
copy(messages, ms.messages)
}
ms.lock.Unlock()
return
}
@ -69,7 +90,7 @@ func (ms *MessageStore) FetchMessages() (messages []*protocol.GroupMessage) {
// AddMessage adds a GroupMessage to the store
func (ms *MessageStore) AddMessage(gm protocol.GroupMessage) {
ms.lock.Lock()
ms.messages = append(ms.messages, &gm)
ms.updateBuffer(&gm)
s, err := json.Marshal(gm)
if err != nil {
log.Printf("[ERROR] Failed to unmarshal group message %v\n", err)

View File

@ -10,8 +10,8 @@ import (
func TestMessageStore(t *testing.T) {
os.Remove("ms.test")
ms := new(MessageStore)
ms.Init("ms.test")
for i := 0; i < 100000; i++ {
ms.Init("ms.test", 100000)
for i := 0; i < 50000; i++ {
gm := protocol.GroupMessage{
Ciphertext: []byte("Hello this is a fairly average length message that we are writing here. " + strconv.Itoa(i)),
Spamguard: []byte{},
@ -19,11 +19,25 @@ func TestMessageStore(t *testing.T) {
ms.AddMessage(gm)
}
ms.Close()
ms.Init("ms.test")
ms.Init("ms.test", 100000)
m := ms.FetchMessages()
if len(m) != 50000 {
t.Errorf("Should have been 5000 was %v", len(m))
}
for i := 0; i < 100000; i++ {
gm := protocol.GroupMessage{
Ciphertext: []byte("Hello this is a fairly average length message that we are writing here. " + strconv.Itoa(i)),
Spamguard: []byte{},
}
ms.AddMessage(gm)
}
m = ms.FetchMessages()
if len(m) != 100000 {
t.Errorf("Should have been 100000 was %v", len(m))
}
ms.Close()
os.Remove("ms.test")
}

View File

@ -1,19 +1,20 @@
package testing
import (
"crypto/rsa"
"cwtch.im/cwtch/model"
"cwtch.im/cwtch/peer"
"cwtch.im/cwtch/peer/connections"
cwtchserver "cwtch.im/cwtch/server"
"fmt"
"github.com/s-rah/go-ricochet"
"github.com/s-rah/go-ricochet/utils"
"io/ioutil"
"log"
"os"
"runtime"
"testing"
"time"
"crypto/rsa"
"io/ioutil"
"log"
)
const (
@ -34,9 +35,9 @@ func loadOrGenPrivateKey(t *testing.T) (pk *rsa.PrivateKey, generated bool) {
if err != nil {
t.Fatalf("error generating new private key: %v\n", err)
}
err = ioutil.WriteFile(localKeyfile, []byte(utils.PrivateKeyToString(pk)), 0400)
err = ioutil.WriteFile(localKeyfile, []byte(utils.PrivateKeyToString(pk)), 0600)
if err != nil {
log.Fatalf("error writing new private key to file %s: %v\n", localKeyfile, err)
t.Fatalf("error writing new private key to file %s: %v\n", localKeyfile, err)
}
return pk, true
@ -62,15 +63,32 @@ func printAndCountVerifedTimeline(t *testing.T, timeline []model.Message) int {
func serverCheck(serverAddr string) bool {
rc, err := goricochet.Open(serverAddr)
// Won't actaully free thread because rc.start() will wait for the uncalled rc.Process to read from errorChannel...
rc.Conn.Close()
if err == nil {
// TODO should change libricochet-go to avoid accessing member to shutdown all goroutines.
rc.Conn.Close()
return true
}
return false
}
func waitForPeerConnection(t *testing.T, peer *peer.CwtchPeer, server string) {
for {
servers := peer.GetServers()
state, ok := servers[server]
if ok {
if state != connections.AUTHENTICATED {
time.Sleep(time.Second * 10)
continue
}
} else {
t.Fatalf("peer server connectiond %v should have entry for server %v", servers, server)
}
break
}
return
}
func TestCwtchPeerIntegration(t *testing.T) {
// Hide logging "noise"
log.SetOutput(ioutil.Discard)
@ -91,8 +109,7 @@ func TestCwtchPeerIntegration(t *testing.T) {
// launch app
server = new(cwtchserver.Server)
fmt.Printf("No server found\nStarting cwtch server...\n")
go server.Run(localKeyfile)
go server.Run(localKeyfile, 100)
// let tor get established
fmt.Printf("Establishing Tor hidden service: %v...\n", serverAddr)
@ -141,12 +158,10 @@ func TestCwtchPeerIntegration(t *testing.T) {
fmt.Println("Alice joining server...")
alice.JoinServer(serverAddr)
fmt.Println("Bob joining server...")
bob.JoinServer(serverAddr)
fmt.Println("Waiting for peerings and server joins...")
time.Sleep(time.Second * 60)
fmt.Println("Alice inviting Bob to group...")
@ -195,6 +210,10 @@ func TestCwtchPeerIntegration(t *testing.T) {
time.Sleep(time.Second * 110)
*/
// Wait for them to join the server
waitForPeerConnection(t, alice, serverAddr)
waitForPeerConnection(t, bob, serverAddr)
//numGouRoutinesPostServerConnect := runtime.NumGoroutine()
// ***** Conversation *****
@ -244,8 +263,7 @@ func TestCwtchPeerIntegration(t *testing.T) {
fmt.Println("Carol joining server...")
carol.JoinServer(serverAddr)
time.Sleep(time.Second * 60)
waitForPeerConnection(t, carol, serverAddr)
numGoRotinesPostCarolConnect := runtime.NumGoroutine()
fmt.Println("Bob> ", bobLines[2])