Fixup Integ Test + Limit Messages in Server Memory #36
|
@ -12,7 +12,7 @@ type Manager struct {
|
|||
peerConnections map[string]*PeerPeerConnection
|
||||
serverConnections map[string]*PeerServerConnection
|
||||
lock sync.Mutex
|
||||
breakChannel chan bool
|
||||
breakChannel chan bool
|
||||
}
|
||||
|
||||
// NewConnectionsManager creates a new instance of Manager.
|
||||
|
@ -96,27 +96,27 @@ func (m *Manager) AttemptReconnections() {
|
|||
|
||||
for {
|
||||
select {
|
||||
case <-time.After(timeout * time.Second):
|
||||
m.lock.Lock()
|
||||
for _, ppc := range m.peerConnections {
|
||||
if ppc.GetState() == FAILED {
|
||||
go ppc.Run()
|
||||
}
|
||||
case <-time.After(timeout * time.Second):
|
||||
m.lock.Lock()
|
||||
for _, ppc := range m.peerConnections {
|
||||
if ppc.GetState() == FAILED {
|
||||
go ppc.Run()
|
||||
}
|
||||
m.lock.Unlock()
|
||||
}
|
||||
m.lock.Unlock()
|
||||
|
||||
m.lock.Lock()
|
||||
for _, psc := range m.serverConnections {
|
||||
if psc.GetState() == FAILED {
|
||||
go psc.Run()
|
||||
}
|
||||
m.lock.Lock()
|
||||
for _, psc := range m.serverConnections {
|
||||
if psc.GetState() == FAILED {
|
||||
go psc.Run()
|
||||
}
|
||||
m.lock.Unlock()
|
||||
}
|
||||
m.lock.Unlock()
|
||||
|
||||
// Launch Another Run In 30 Seconds
|
||||
timeout = time.Duration(30)
|
||||
case <-m.breakChannel:
|
||||
return
|
||||
// Launch Another Run In 30 Seconds
|
||||
timeout = time.Duration(30)
|
||||
case <-m.breakChannel:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -109,4 +109,3 @@ func (ppc *PeerPeerConnection) Close() {
|
|||
ppc.state = KILLED
|
||||
ppc.connection.Conn.Close()
|
||||
}
|
||||
|
||||
|
|
|
@ -20,7 +20,7 @@ import (
|
|||
type CwtchPeer struct {
|
||||
connection.AutoConnectionHandler
|
||||
Profile *model.Profile
|
||||
app *application.RicochetApplication
|
||||
app *application.RicochetApplication
|
||||
mutex sync.Mutex
|
||||
Log chan string `json:"-"`
|
||||
connectionsManager *connections.Manager
|
||||
|
|
|
@ -14,9 +14,9 @@ var _ = fmt.Errorf
|
|||
var _ = math.Inf
|
||||
|
||||
type CwtchServerPacket struct {
|
||||
GroupMessage *GroupMessage `protobuf:"bytes,1,opt,name=group_message,json=groupMessage" json:"group_message,omitempty"`
|
||||
FetchMessage *FetchMessage `protobuf:"bytes,2,opt,name=fetch_message,json=fetchMessage" json:"fetch_message,omitempty"`
|
||||
XXX_unrecognized []byte `json:"-"`
|
||||
GroupMessage *GroupMessage `protobuf:"bytes,1,opt,name=group_message,json=groupMessage" json:"group_message,omitempty"`
|
||||
FetchMessage *FetchMessage `protobuf:"bytes,2,opt,name=fetch_message,json=fetchMessage" json:"fetch_message,omitempty"`
|
||||
XXX_unrecognized []byte `json:"-"`
|
||||
}
|
||||
|
||||
func (m *CwtchServerPacket) Reset() { *m = CwtchServerPacket{} }
|
||||
|
|
|
@ -31,5 +31,6 @@ func main() {
|
|||
server := new(cwtchserver.Server)
|
||||
log.Printf("starting cwtch server...")
|
||||
|
||||
server.Run(privateKeyFile)
|
||||
// TODO load params from .cwtch/server.conf or command line flag
|
||||
server.Run(privateKeyFile, 100000)
|
||||
}
|
||||
|
|
|
@ -16,10 +16,9 @@ type Server struct {
|
|||
app *application.RicochetApplication
|
||||
}
|
||||
|
||||
// Run s
|
||||
// tarts a server with the given privateKey
|
||||
// Run starts a server with the given privateKey
|
||||
// TODO: surface errors
|
||||
func (s *Server) Run(privateKeyFile string) {
|
||||
func (s *Server) Run(privateKeyFile string, bufferSize int) {
|
||||
cwtchserver := new(application.RicochetApplication)
|
||||
|
||||
pk, err := utils.LoadPrivateKeyFromFile(privateKeyFile)
|
||||
|
@ -37,7 +36,7 @@ func (s *Server) Run(privateKeyFile string) {
|
|||
af := application.ApplicationInstanceFactory{}
|
||||
af.Init()
|
||||
ms := new(storage.MessageStore)
|
||||
ms.Init("cwtch.messages")
|
||||
ms.Init("cwtch.messages", bufferSize)
|
||||
af.AddHandler("im.cwtch.server.listen", func(rai *application.ApplicationInstance) func() channels.Handler {
|
||||
si := new(Instance)
|
||||
si.Init(rai, cwtchserver, ms)
|
||||
|
|
|
@ -15,7 +15,7 @@ func TestServerInstance(t *testing.T) {
|
|||
ra := new(application.RicochetApplication)
|
||||
msi := new(storage.MessageStore)
|
||||
os.Remove("ms.test")
|
||||
msi.Init("ms.test")
|
||||
msi.Init("ms.test", 5)
|
||||
gm := protocol.GroupMessage{
|
||||
Ciphertext: []byte("Hello this is a fairly average length message that we are writing here."),
|
||||
Spamguard: []byte{},
|
||||
|
@ -26,7 +26,7 @@ func TestServerInstance(t *testing.T) {
|
|||
res := si.HandleFetchRequest()
|
||||
|
||||
if len(res) != 1 {
|
||||
t.Errorf("Expected Group Messages Instead got %v", res)
|
||||
t.Errorf("Expected 1 Group Messages Instead got %v", res)
|
||||
}
|
||||
|
||||
// ra.HandleApplicationInstance(ai)
|
||||
|
|
|
@ -18,9 +18,12 @@ type MessageStoreInterface interface {
|
|||
|
||||
// MessageStore is a file-backed implementation of MessageStoreInterface
|
||||
type MessageStore struct {
|
||||
file *os.File
|
||||
lock sync.Mutex
|
||||
messages []*protocol.GroupMessage
|
||||
file *os.File
|
||||
lock sync.Mutex
|
||||
messages []*protocol.GroupMessage
|
||||
bufferSize int
|
||||
pos int
|
||||
rotated bool
|
||||
}
|
||||
|
||||
// Close closes the message store and underlying resources.
|
||||
|
@ -31,13 +34,26 @@ func (ms *MessageStore) Close() {
|
|||
ms.lock.Unlock()
|
||||
}
|
||||
|
||||
// Init sets up a MessageStore backed by filename
|
||||
func (ms *MessageStore) Init(filename string) {
|
||||
func (ms *MessageStore) updateBuffer(gm *protocol.GroupMessage) {
|
||||
ms.messages[ms.pos] = gm
|
||||
ms.pos++
|
||||
if ms.pos == ms.bufferSize {
|
||||
ms.pos = 0
|
||||
ms.rotated = true
|
||||
}
|
||||
}
|
||||
|
||||
// Init sets up a MessageStore of size bufferSize backed by filename
|
||||
func (ms *MessageStore) Init(filename string, bufferSize int) {
|
||||
f, err := os.OpenFile(filename, os.O_CREATE|os.O_APPEND|os.O_RDWR, 0600)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
ms.file = f
|
||||
ms.pos = 0
|
||||
ms.bufferSize = bufferSize
|
||||
ms.messages = make([]*protocol.GroupMessage, bufferSize)
|
||||
ms.rotated = false
|
||||
|
||||
scanner := bufio.NewScanner(f)
|
||||
for scanner.Scan() {
|
||||
|
@ -45,7 +61,7 @@ func (ms *MessageStore) Init(filename string) {
|
|||
gm := &protocol.GroupMessage{}
|
||||
err := json.Unmarshal([]byte(gms), gm)
|
||||
if err == nil {
|
||||
ms.messages = append(ms.messages, gm)
|
||||
ms.updateBuffer(gm)
|
||||
} else {
|
||||
panic(err)
|
||||
}
|
||||
|
@ -59,9 +75,14 @@ func (ms *MessageStore) Init(filename string) {
|
|||
|
||||
// FetchMessages returns all messages from the backing file.
|
||||
func (ms *MessageStore) FetchMessages() (messages []*protocol.GroupMessage) {
|
||||
messages = make([]*protocol.GroupMessage, len(ms.messages))
|
||||
ms.lock.Lock()
|
||||
copy(messages, ms.messages)
|
||||
if !ms.rotated {
|
||||
messages = make([]*protocol.GroupMessage, ms.pos)
|
||||
copy(messages, ms.messages[0:ms.pos])
|
||||
} else {
|
||||
messages = make([]*protocol.GroupMessage, ms.bufferSize)
|
||||
copy(messages, ms.messages)
|
||||
}
|
||||
ms.lock.Unlock()
|
||||
return
|
||||
}
|
||||
|
@ -69,7 +90,7 @@ func (ms *MessageStore) FetchMessages() (messages []*protocol.GroupMessage) {
|
|||
// AddMessage adds a GroupMessage to the store
|
||||
func (ms *MessageStore) AddMessage(gm protocol.GroupMessage) {
|
||||
ms.lock.Lock()
|
||||
ms.messages = append(ms.messages, &gm)
|
||||
ms.updateBuffer(&gm)
|
||||
s, err := json.Marshal(gm)
|
||||
if err != nil {
|
||||
log.Printf("[ERROR] Failed to unmarshal group message %v\n", err)
|
||||
|
|
|
@ -10,8 +10,8 @@ import (
|
|||
func TestMessageStore(t *testing.T) {
|
||||
os.Remove("ms.test")
|
||||
ms := new(MessageStore)
|
||||
ms.Init("ms.test")
|
||||
for i := 0; i < 100000; i++ {
|
||||
ms.Init("ms.test", 100000)
|
||||
for i := 0; i < 50000; i++ {
|
||||
gm := protocol.GroupMessage{
|
||||
Ciphertext: []byte("Hello this is a fairly average length message that we are writing here. " + strconv.Itoa(i)),
|
||||
Spamguard: []byte{},
|
||||
|
@ -19,11 +19,25 @@ func TestMessageStore(t *testing.T) {
|
|||
ms.AddMessage(gm)
|
||||
}
|
||||
ms.Close()
|
||||
ms.Init("ms.test")
|
||||
ms.Init("ms.test", 100000)
|
||||
m := ms.FetchMessages()
|
||||
if len(m) != 50000 {
|
||||
t.Errorf("Should have been 5000 was %v", len(m))
|
||||
}
|
||||
|
||||
for i := 0; i < 100000; i++ {
|
||||
gm := protocol.GroupMessage{
|
||||
Ciphertext: []byte("Hello this is a fairly average length message that we are writing here. " + strconv.Itoa(i)),
|
||||
Spamguard: []byte{},
|
||||
}
|
||||
ms.AddMessage(gm)
|
||||
}
|
||||
|
||||
m = ms.FetchMessages()
|
||||
if len(m) != 100000 {
|
||||
t.Errorf("Should have been 100000 was %v", len(m))
|
||||
}
|
||||
|
||||
ms.Close()
|
||||
os.Remove("ms.test")
|
||||
}
|
||||
|
|
|
@ -1,24 +1,25 @@
|
|||
package testing
|
||||
|
||||
import (
|
||||
"crypto/rsa"
|
||||
"cwtch.im/cwtch/model"
|
||||
"cwtch.im/cwtch/peer"
|
||||
"cwtch.im/cwtch/peer/connections"
|
||||
cwtchserver "cwtch.im/cwtch/server"
|
||||
"fmt"
|
||||
"github.com/s-rah/go-ricochet"
|
||||
"github.com/s-rah/go-ricochet/utils"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"os"
|
||||
"runtime"
|
||||
"testing"
|
||||
"time"
|
||||
"crypto/rsa"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
)
|
||||
|
||||
const (
|
||||
serverKeyfile = "./../server/app/private_key"
|
||||
localKeyfile = "./private_key"
|
||||
localKeyfile = "./private_key"
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -34,12 +35,12 @@ func loadOrGenPrivateKey(t *testing.T) (pk *rsa.PrivateKey, generated bool) {
|
|||
if err != nil {
|
||||
t.Fatalf("error generating new private key: %v\n", err)
|
||||
}
|
||||
err = ioutil.WriteFile(localKeyfile, []byte(utils.PrivateKeyToString(pk)), 0400)
|
||||
err = ioutil.WriteFile(localKeyfile, []byte(utils.PrivateKeyToString(pk)), 0600)
|
||||
if err != nil {
|
||||
log.Fatalf("error writing new private key to file %s: %v\n", localKeyfile, err)
|
||||
t.Fatalf("error writing new private key to file %s: %v\n", localKeyfile, err)
|
||||
}
|
||||
|
||||
return pk,true
|
||||
return pk, true
|
||||
}
|
||||
fmt.Println("Found server key " + serverKeyfile + ", loading...")
|
||||
pk, err := utils.LoadPrivateKeyFromFile(serverKeyfile)
|
||||
|
@ -62,15 +63,32 @@ func printAndCountVerifedTimeline(t *testing.T, timeline []model.Message) int {
|
|||
|
||||
func serverCheck(serverAddr string) bool {
|
||||
rc, err := goricochet.Open(serverAddr)
|
||||
// Won't actaully free thread because rc.start() will wait for the uncalled rc.Process to read from errorChannel...
|
||||
rc.Conn.Close()
|
||||
|
||||
if err == nil {
|
||||
// TODO should change libricochet-go to avoid accessing member to shutdown all goroutines.
|
||||
rc.Conn.Close()
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func waitForPeerConnection(t *testing.T, peer *peer.CwtchPeer, server string) {
|
||||
for {
|
||||
servers := peer.GetServers()
|
||||
state, ok := servers[server]
|
||||
if ok {
|
||||
if state != connections.AUTHENTICATED {
|
||||
time.Sleep(time.Second * 10)
|
||||
continue
|
||||
}
|
||||
} else {
|
||||
t.Fatalf("peer server connectiond %v should have entry for server %v", servers, server)
|
||||
}
|
||||
break
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func TestCwtchPeerIntegration(t *testing.T) {
|
||||
// Hide logging "noise"
|
||||
log.SetOutput(ioutil.Discard)
|
||||
|
@ -91,8 +109,7 @@ func TestCwtchPeerIntegration(t *testing.T) {
|
|||
// launch app
|
||||
server = new(cwtchserver.Server)
|
||||
fmt.Printf("No server found\nStarting cwtch server...\n")
|
||||
|
||||
go server.Run(localKeyfile)
|
||||
go server.Run(localKeyfile, 100)
|
||||
|
||||
// let tor get established
|
||||
fmt.Printf("Establishing Tor hidden service: %v...\n", serverAddr)
|
||||
|
@ -141,12 +158,10 @@ func TestCwtchPeerIntegration(t *testing.T) {
|
|||
|
||||
fmt.Println("Alice joining server...")
|
||||
alice.JoinServer(serverAddr)
|
||||
|
||||
fmt.Println("Bob joining server...")
|
||||
bob.JoinServer(serverAddr)
|
||||
|
||||
fmt.Println("Waiting for peerings and server joins...")
|
||||
|
||||
time.Sleep(time.Second * 60)
|
||||
|
||||
fmt.Println("Alice inviting Bob to group...")
|
||||
|
@ -172,29 +187,33 @@ func TestCwtchPeerIntegration(t *testing.T) {
|
|||
// ***** Fill up message history of server ******
|
||||
|
||||
/*
|
||||
// filler group will be used to fill up the servers message history a bit to stress test fetch later for carol
|
||||
fillerGroupId, _, err := alice.Profile.StartGroup(serverAddr)
|
||||
if err != nil {
|
||||
t.Errorf("Failed to init filler group: %v", err)
|
||||
return
|
||||
}
|
||||
// filler group will be used to fill up the servers message history a bit to stress test fetch later for carol
|
||||
fillerGroupId, _, err := alice.Profile.StartGroup(serverAddr)
|
||||
if err != nil {
|
||||
t.Errorf("Failed to init filler group: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
fmt.Println("Alice filling message history of server...")
|
||||
for i := 0; i < 100; i++ {
|
||||
fmt.Println("Alice filling message history of server...")
|
||||
for i := 0; i < 100; i++ {
|
||||
|
||||
go func (x int) {
|
||||
time.Sleep(time.Second * time.Duration(x))
|
||||
err := alice.SendMessageToGroup(fillerGroupId, aliceLines[0])
|
||||
if err != nil {
|
||||
fmt.Println("SEND", x, "ERROR:", err)
|
||||
} else {
|
||||
fmt.Println("SEND", x, " SUCCESS!")
|
||||
}
|
||||
}(i)
|
||||
}
|
||||
go func (x int) {
|
||||
time.Sleep(time.Second * time.Duration(x))
|
||||
err := alice.SendMessageToGroup(fillerGroupId, aliceLines[0])
|
||||
if err != nil {
|
||||
fmt.Println("SEND", x, "ERROR:", err)
|
||||
} else {
|
||||
fmt.Println("SEND", x, " SUCCESS!")
|
||||
}
|
||||
}(i)
|
||||
}
|
||||
|
||||
time.Sleep(time.Second * 110)
|
||||
time.Sleep(time.Second * 110)
|
||||
*/
|
||||
// Wait for them to join the server
|
||||
waitForPeerConnection(t, alice, serverAddr)
|
||||
waitForPeerConnection(t, bob, serverAddr)
|
||||
//numGouRoutinesPostServerConnect := runtime.NumGoroutine()
|
||||
|
||||
// ***** Conversation *****
|
||||
|
||||
|
@ -244,8 +263,7 @@ func TestCwtchPeerIntegration(t *testing.T) {
|
|||
|
||||
fmt.Println("Carol joining server...")
|
||||
carol.JoinServer(serverAddr)
|
||||
|
||||
time.Sleep(time.Second * 60)
|
||||
waitForPeerConnection(t, carol, serverAddr)
|
||||
numGoRotinesPostCarolConnect := runtime.NumGoroutine()
|
||||
|
||||
fmt.Println("Bob> ", bobLines[2])
|
||||
|
@ -343,7 +361,7 @@ func TestCwtchPeerIntegration(t *testing.T) {
|
|||
time.Sleep(time.Second * 3)
|
||||
numGoRoutinesPostCarol := runtime.NumGoroutine()
|
||||
|
||||
fmt.Printf("numGoRoutinesStart: %v\nnumGoRoutinesPostServer: %v\nnumGoRoutinesPostPeerStart: %v\nnumGoRoutinesPostPeerAndServerConnect: %v\n" +
|
||||
fmt.Printf("numGoRoutinesStart: %v\nnumGoRoutinesPostServer: %v\nnumGoRoutinesPostPeerStart: %v\nnumGoRoutinesPostPeerAndServerConnect: %v\n"+
|
||||
"numGoRoutinesPostAlice: %v\nnumGoRotinesPostCarolConnect: %v\nnumGoRoutinesPostBob: %v\nnumGoRoutinesPostServerShutdown: %v\nnumGoRoutinesPostCarol: %v\n",
|
||||
numGoRoutinesStart, numGoRoutinesPostServer, numGoRoutinesPostPeerStart, numGoRoutinesPostServerConnect,
|
||||
numGoRoutinesPostAlice, numGoRotinesPostCarolConnect, numGoRoutinesPostBob, numGoRoutinesPostServerShutdown, numGoRoutinesPostCarol)
|
||||
|
|
Loading…
Reference in New Issue