Fixup Integ Test + Limit Messages in Server Memory #36

Merged
dan merged 1 commits from integ_test_fix into master 2018-06-09 21:21:43 +00:00
10 changed files with 129 additions and 77 deletions

View File

@ -12,7 +12,7 @@ type Manager struct {
peerConnections map[string]*PeerPeerConnection peerConnections map[string]*PeerPeerConnection
serverConnections map[string]*PeerServerConnection serverConnections map[string]*PeerServerConnection
lock sync.Mutex lock sync.Mutex
breakChannel chan bool breakChannel chan bool
} }
// NewConnectionsManager creates a new instance of Manager. // NewConnectionsManager creates a new instance of Manager.
@ -96,27 +96,27 @@ func (m *Manager) AttemptReconnections() {
for { for {
select { select {
case <-time.After(timeout * time.Second): case <-time.After(timeout * time.Second):
m.lock.Lock() m.lock.Lock()
for _, ppc := range m.peerConnections { for _, ppc := range m.peerConnections {
if ppc.GetState() == FAILED { if ppc.GetState() == FAILED {
go ppc.Run() go ppc.Run()
}
} }
m.lock.Unlock() }
m.lock.Unlock()
m.lock.Lock() m.lock.Lock()
for _, psc := range m.serverConnections { for _, psc := range m.serverConnections {
if psc.GetState() == FAILED { if psc.GetState() == FAILED {
go psc.Run() go psc.Run()
}
} }
m.lock.Unlock() }
m.lock.Unlock()
// Launch Another Run In 30 Seconds // Launch Another Run In 30 Seconds
timeout = time.Duration(30) timeout = time.Duration(30)
case <-m.breakChannel: case <-m.breakChannel:
return return
} }
} }
} }

View File

@ -109,4 +109,3 @@ func (ppc *PeerPeerConnection) Close() {
ppc.state = KILLED ppc.state = KILLED
ppc.connection.Conn.Close() ppc.connection.Conn.Close()
} }

View File

@ -20,7 +20,7 @@ import (
type CwtchPeer struct { type CwtchPeer struct {
connection.AutoConnectionHandler connection.AutoConnectionHandler
Profile *model.Profile Profile *model.Profile
app *application.RicochetApplication app *application.RicochetApplication
mutex sync.Mutex mutex sync.Mutex
Log chan string `json:"-"` Log chan string `json:"-"`
connectionsManager *connections.Manager connectionsManager *connections.Manager

View File

@ -14,9 +14,9 @@ var _ = fmt.Errorf
var _ = math.Inf var _ = math.Inf
type CwtchServerPacket struct { type CwtchServerPacket struct {
GroupMessage *GroupMessage `protobuf:"bytes,1,opt,name=group_message,json=groupMessage" json:"group_message,omitempty"` GroupMessage *GroupMessage `protobuf:"bytes,1,opt,name=group_message,json=groupMessage" json:"group_message,omitempty"`
FetchMessage *FetchMessage `protobuf:"bytes,2,opt,name=fetch_message,json=fetchMessage" json:"fetch_message,omitempty"` FetchMessage *FetchMessage `protobuf:"bytes,2,opt,name=fetch_message,json=fetchMessage" json:"fetch_message,omitempty"`
XXX_unrecognized []byte `json:"-"` XXX_unrecognized []byte `json:"-"`
} }
func (m *CwtchServerPacket) Reset() { *m = CwtchServerPacket{} } func (m *CwtchServerPacket) Reset() { *m = CwtchServerPacket{} }

View File

@ -31,5 +31,6 @@ func main() {
server := new(cwtchserver.Server) server := new(cwtchserver.Server)
log.Printf("starting cwtch server...") log.Printf("starting cwtch server...")
server.Run(privateKeyFile) // TODO load params from .cwtch/server.conf or command line flag
server.Run(privateKeyFile, 100000)
} }

View File

@ -16,10 +16,9 @@ type Server struct {
app *application.RicochetApplication app *application.RicochetApplication
} }
// Run s // Run starts a server with the given privateKey
// tarts a server with the given privateKey
// TODO: surface errors // TODO: surface errors
func (s *Server) Run(privateKeyFile string) { func (s *Server) Run(privateKeyFile string, bufferSize int) {
cwtchserver := new(application.RicochetApplication) cwtchserver := new(application.RicochetApplication)
pk, err := utils.LoadPrivateKeyFromFile(privateKeyFile) pk, err := utils.LoadPrivateKeyFromFile(privateKeyFile)
@ -37,7 +36,7 @@ func (s *Server) Run(privateKeyFile string) {
af := application.ApplicationInstanceFactory{} af := application.ApplicationInstanceFactory{}
af.Init() af.Init()
ms := new(storage.MessageStore) ms := new(storage.MessageStore)
ms.Init("cwtch.messages") ms.Init("cwtch.messages", bufferSize)
af.AddHandler("im.cwtch.server.listen", func(rai *application.ApplicationInstance) func() channels.Handler { af.AddHandler("im.cwtch.server.listen", func(rai *application.ApplicationInstance) func() channels.Handler {
si := new(Instance) si := new(Instance)
si.Init(rai, cwtchserver, ms) si.Init(rai, cwtchserver, ms)

View File

@ -15,7 +15,7 @@ func TestServerInstance(t *testing.T) {
ra := new(application.RicochetApplication) ra := new(application.RicochetApplication)
msi := new(storage.MessageStore) msi := new(storage.MessageStore)
os.Remove("ms.test") os.Remove("ms.test")
msi.Init("ms.test") msi.Init("ms.test", 5)
gm := protocol.GroupMessage{ gm := protocol.GroupMessage{
Ciphertext: []byte("Hello this is a fairly average length message that we are writing here."), Ciphertext: []byte("Hello this is a fairly average length message that we are writing here."),
Spamguard: []byte{}, Spamguard: []byte{},
@ -26,7 +26,7 @@ func TestServerInstance(t *testing.T) {
res := si.HandleFetchRequest() res := si.HandleFetchRequest()
if len(res) != 1 { if len(res) != 1 {
t.Errorf("Expected Group Messages Instead got %v", res) t.Errorf("Expected 1 Group Messages Instead got %v", res)
} }
// ra.HandleApplicationInstance(ai) // ra.HandleApplicationInstance(ai)

View File

@ -18,9 +18,12 @@ type MessageStoreInterface interface {
// MessageStore is a file-backed implementation of MessageStoreInterface // MessageStore is a file-backed implementation of MessageStoreInterface
type MessageStore struct { type MessageStore struct {
file *os.File file *os.File
lock sync.Mutex lock sync.Mutex
messages []*protocol.GroupMessage messages []*protocol.GroupMessage
bufferSize int
pos int
rotated bool
} }
// Close closes the message store and underlying resources. // Close closes the message store and underlying resources.
@ -31,13 +34,26 @@ func (ms *MessageStore) Close() {
ms.lock.Unlock() ms.lock.Unlock()
} }
// Init sets up a MessageStore backed by filename func (ms *MessageStore) updateBuffer(gm *protocol.GroupMessage) {
func (ms *MessageStore) Init(filename string) { ms.messages[ms.pos] = gm
ms.pos++
if ms.pos == ms.bufferSize {
ms.pos = 0
ms.rotated = true
}
}
// Init sets up a MessageStore of size bufferSize backed by filename
func (ms *MessageStore) Init(filename string, bufferSize int) {
f, err := os.OpenFile(filename, os.O_CREATE|os.O_APPEND|os.O_RDWR, 0600) f, err := os.OpenFile(filename, os.O_CREATE|os.O_APPEND|os.O_RDWR, 0600)
if err != nil { if err != nil {
panic(err) panic(err)
} }
ms.file = f ms.file = f
ms.pos = 0
ms.bufferSize = bufferSize
ms.messages = make([]*protocol.GroupMessage, bufferSize)
ms.rotated = false
scanner := bufio.NewScanner(f) scanner := bufio.NewScanner(f)
for scanner.Scan() { for scanner.Scan() {
@ -45,7 +61,7 @@ func (ms *MessageStore) Init(filename string) {
gm := &protocol.GroupMessage{} gm := &protocol.GroupMessage{}
err := json.Unmarshal([]byte(gms), gm) err := json.Unmarshal([]byte(gms), gm)
if err == nil { if err == nil {
ms.messages = append(ms.messages, gm) ms.updateBuffer(gm)
} else { } else {
panic(err) panic(err)
} }
@ -59,9 +75,14 @@ func (ms *MessageStore) Init(filename string) {
// FetchMessages returns all messages from the backing file. // FetchMessages returns all messages from the backing file.
func (ms *MessageStore) FetchMessages() (messages []*protocol.GroupMessage) { func (ms *MessageStore) FetchMessages() (messages []*protocol.GroupMessage) {
messages = make([]*protocol.GroupMessage, len(ms.messages))
ms.lock.Lock() ms.lock.Lock()
copy(messages, ms.messages) if !ms.rotated {
messages = make([]*protocol.GroupMessage, ms.pos)
copy(messages, ms.messages[0:ms.pos])
} else {
messages = make([]*protocol.GroupMessage, ms.bufferSize)
copy(messages, ms.messages)
}
ms.lock.Unlock() ms.lock.Unlock()
return return
} }
@ -69,7 +90,7 @@ func (ms *MessageStore) FetchMessages() (messages []*protocol.GroupMessage) {
// AddMessage adds a GroupMessage to the store // AddMessage adds a GroupMessage to the store
func (ms *MessageStore) AddMessage(gm protocol.GroupMessage) { func (ms *MessageStore) AddMessage(gm protocol.GroupMessage) {
ms.lock.Lock() ms.lock.Lock()
ms.messages = append(ms.messages, &gm) ms.updateBuffer(&gm)
s, err := json.Marshal(gm) s, err := json.Marshal(gm)
if err != nil { if err != nil {
log.Printf("[ERROR] Failed to unmarshal group message %v\n", err) log.Printf("[ERROR] Failed to unmarshal group message %v\n", err)

View File

@ -10,8 +10,8 @@ import (
func TestMessageStore(t *testing.T) { func TestMessageStore(t *testing.T) {
os.Remove("ms.test") os.Remove("ms.test")
ms := new(MessageStore) ms := new(MessageStore)
ms.Init("ms.test") ms.Init("ms.test", 100000)
for i := 0; i < 100000; i++ { for i := 0; i < 50000; i++ {
gm := protocol.GroupMessage{ gm := protocol.GroupMessage{
Ciphertext: []byte("Hello this is a fairly average length message that we are writing here. " + strconv.Itoa(i)), Ciphertext: []byte("Hello this is a fairly average length message that we are writing here. " + strconv.Itoa(i)),
Spamguard: []byte{}, Spamguard: []byte{},
@ -19,11 +19,25 @@ func TestMessageStore(t *testing.T) {
ms.AddMessage(gm) ms.AddMessage(gm)
} }
ms.Close() ms.Close()
ms.Init("ms.test") ms.Init("ms.test", 100000)
m := ms.FetchMessages() m := ms.FetchMessages()
if len(m) != 50000 {
t.Errorf("Should have been 5000 was %v", len(m))
}
for i := 0; i < 100000; i++ {
gm := protocol.GroupMessage{
Ciphertext: []byte("Hello this is a fairly average length message that we are writing here. " + strconv.Itoa(i)),
Spamguard: []byte{},
}
ms.AddMessage(gm)
}
m = ms.FetchMessages()
if len(m) != 100000 { if len(m) != 100000 {
t.Errorf("Should have been 100000 was %v", len(m)) t.Errorf("Should have been 100000 was %v", len(m))
} }
ms.Close() ms.Close()
os.Remove("ms.test") os.Remove("ms.test")
} }

View File

@ -1,24 +1,25 @@
package testing package testing
import ( import (
"crypto/rsa"
"cwtch.im/cwtch/model" "cwtch.im/cwtch/model"
"cwtch.im/cwtch/peer" "cwtch.im/cwtch/peer"
"cwtch.im/cwtch/peer/connections"
cwtchserver "cwtch.im/cwtch/server" cwtchserver "cwtch.im/cwtch/server"
"fmt" "fmt"
"github.com/s-rah/go-ricochet" "github.com/s-rah/go-ricochet"
"github.com/s-rah/go-ricochet/utils" "github.com/s-rah/go-ricochet/utils"
"io/ioutil"
"log"
"os" "os"
"runtime" "runtime"
"testing" "testing"
"time" "time"
"crypto/rsa"
"io/ioutil"
"log"
) )
const ( const (
serverKeyfile = "./../server/app/private_key" serverKeyfile = "./../server/app/private_key"
localKeyfile = "./private_key" localKeyfile = "./private_key"
) )
var ( var (
@ -34,12 +35,12 @@ func loadOrGenPrivateKey(t *testing.T) (pk *rsa.PrivateKey, generated bool) {
if err != nil { if err != nil {
t.Fatalf("error generating new private key: %v\n", err) t.Fatalf("error generating new private key: %v\n", err)
} }
err = ioutil.WriteFile(localKeyfile, []byte(utils.PrivateKeyToString(pk)), 0400) err = ioutil.WriteFile(localKeyfile, []byte(utils.PrivateKeyToString(pk)), 0600)
if err != nil { if err != nil {
log.Fatalf("error writing new private key to file %s: %v\n", localKeyfile, err) t.Fatalf("error writing new private key to file %s: %v\n", localKeyfile, err)
} }
return pk,true return pk, true
} }
fmt.Println("Found server key " + serverKeyfile + ", loading...") fmt.Println("Found server key " + serverKeyfile + ", loading...")
pk, err := utils.LoadPrivateKeyFromFile(serverKeyfile) pk, err := utils.LoadPrivateKeyFromFile(serverKeyfile)
@ -62,15 +63,32 @@ func printAndCountVerifedTimeline(t *testing.T, timeline []model.Message) int {
func serverCheck(serverAddr string) bool { func serverCheck(serverAddr string) bool {
rc, err := goricochet.Open(serverAddr) rc, err := goricochet.Open(serverAddr)
// Won't actaully free thread because rc.start() will wait for the uncalled rc.Process to read from errorChannel...
rc.Conn.Close()
if err == nil { if err == nil {
// TODO should change libricochet-go to avoid accessing member to shutdown all goroutines.
rc.Conn.Close()
return true return true
} }
return false return false
} }
func waitForPeerConnection(t *testing.T, peer *peer.CwtchPeer, server string) {
for {
servers := peer.GetServers()
state, ok := servers[server]
if ok {
if state != connections.AUTHENTICATED {
time.Sleep(time.Second * 10)
continue
}
} else {
t.Fatalf("peer server connectiond %v should have entry for server %v", servers, server)
}
break
}
return
}
func TestCwtchPeerIntegration(t *testing.T) { func TestCwtchPeerIntegration(t *testing.T) {
// Hide logging "noise" // Hide logging "noise"
log.SetOutput(ioutil.Discard) log.SetOutput(ioutil.Discard)
@ -91,8 +109,7 @@ func TestCwtchPeerIntegration(t *testing.T) {
// launch app // launch app
server = new(cwtchserver.Server) server = new(cwtchserver.Server)
fmt.Printf("No server found\nStarting cwtch server...\n") fmt.Printf("No server found\nStarting cwtch server...\n")
go server.Run(localKeyfile, 100)
go server.Run(localKeyfile)
// let tor get established // let tor get established
fmt.Printf("Establishing Tor hidden service: %v...\n", serverAddr) fmt.Printf("Establishing Tor hidden service: %v...\n", serverAddr)
@ -141,12 +158,10 @@ func TestCwtchPeerIntegration(t *testing.T) {
fmt.Println("Alice joining server...") fmt.Println("Alice joining server...")
alice.JoinServer(serverAddr) alice.JoinServer(serverAddr)
fmt.Println("Bob joining server...") fmt.Println("Bob joining server...")
bob.JoinServer(serverAddr) bob.JoinServer(serverAddr)
fmt.Println("Waiting for peerings and server joins...") fmt.Println("Waiting for peerings and server joins...")
time.Sleep(time.Second * 60) time.Sleep(time.Second * 60)
fmt.Println("Alice inviting Bob to group...") fmt.Println("Alice inviting Bob to group...")
@ -172,29 +187,33 @@ func TestCwtchPeerIntegration(t *testing.T) {
// ***** Fill up message history of server ****** // ***** Fill up message history of server ******
/* /*
// filler group will be used to fill up the servers message history a bit to stress test fetch later for carol // filler group will be used to fill up the servers message history a bit to stress test fetch later for carol
fillerGroupId, _, err := alice.Profile.StartGroup(serverAddr) fillerGroupId, _, err := alice.Profile.StartGroup(serverAddr)
if err != nil { if err != nil {
t.Errorf("Failed to init filler group: %v", err) t.Errorf("Failed to init filler group: %v", err)
return return
} }
fmt.Println("Alice filling message history of server...") fmt.Println("Alice filling message history of server...")
for i := 0; i < 100; i++ { for i := 0; i < 100; i++ {
go func (x int) { go func (x int) {
time.Sleep(time.Second * time.Duration(x)) time.Sleep(time.Second * time.Duration(x))
err := alice.SendMessageToGroup(fillerGroupId, aliceLines[0]) err := alice.SendMessageToGroup(fillerGroupId, aliceLines[0])
if err != nil { if err != nil {
fmt.Println("SEND", x, "ERROR:", err) fmt.Println("SEND", x, "ERROR:", err)
} else { } else {
fmt.Println("SEND", x, " SUCCESS!") fmt.Println("SEND", x, " SUCCESS!")
} }
}(i) }(i)
} }
time.Sleep(time.Second * 110) time.Sleep(time.Second * 110)
*/ */
// Wait for them to join the server
waitForPeerConnection(t, alice, serverAddr)
waitForPeerConnection(t, bob, serverAddr)
//numGouRoutinesPostServerConnect := runtime.NumGoroutine()
// ***** Conversation ***** // ***** Conversation *****
@ -244,8 +263,7 @@ func TestCwtchPeerIntegration(t *testing.T) {
fmt.Println("Carol joining server...") fmt.Println("Carol joining server...")
carol.JoinServer(serverAddr) carol.JoinServer(serverAddr)
waitForPeerConnection(t, carol, serverAddr)
time.Sleep(time.Second * 60)
numGoRotinesPostCarolConnect := runtime.NumGoroutine() numGoRotinesPostCarolConnect := runtime.NumGoroutine()
fmt.Println("Bob> ", bobLines[2]) fmt.Println("Bob> ", bobLines[2])
@ -343,7 +361,7 @@ func TestCwtchPeerIntegration(t *testing.T) {
time.Sleep(time.Second * 3) time.Sleep(time.Second * 3)
numGoRoutinesPostCarol := runtime.NumGoroutine() numGoRoutinesPostCarol := runtime.NumGoroutine()
fmt.Printf("numGoRoutinesStart: %v\nnumGoRoutinesPostServer: %v\nnumGoRoutinesPostPeerStart: %v\nnumGoRoutinesPostPeerAndServerConnect: %v\n" + fmt.Printf("numGoRoutinesStart: %v\nnumGoRoutinesPostServer: %v\nnumGoRoutinesPostPeerStart: %v\nnumGoRoutinesPostPeerAndServerConnect: %v\n"+
"numGoRoutinesPostAlice: %v\nnumGoRotinesPostCarolConnect: %v\nnumGoRoutinesPostBob: %v\nnumGoRoutinesPostServerShutdown: %v\nnumGoRoutinesPostCarol: %v\n", "numGoRoutinesPostAlice: %v\nnumGoRotinesPostCarolConnect: %v\nnumGoRoutinesPostBob: %v\nnumGoRoutinesPostServerShutdown: %v\nnumGoRoutinesPostCarol: %v\n",
numGoRoutinesStart, numGoRoutinesPostServer, numGoRoutinesPostPeerStart, numGoRoutinesPostServerConnect, numGoRoutinesStart, numGoRoutinesPostServer, numGoRoutinesPostPeerStart, numGoRoutinesPostServerConnect,
numGoRoutinesPostAlice, numGoRotinesPostCarolConnect, numGoRoutinesPostBob, numGoRoutinesPostServerShutdown, numGoRoutinesPostCarol) numGoRoutinesPostAlice, numGoRotinesPostCarolConnect, numGoRoutinesPostBob, numGoRoutinesPostServerShutdown, numGoRoutinesPostCarol)