diff --git a/peer/connections/connectionsmanager.go b/peer/connections/connectionsmanager.go index 307c4ff..31863e7 100644 --- a/peer/connections/connectionsmanager.go +++ b/peer/connections/connectionsmanager.go @@ -12,7 +12,7 @@ type Manager struct { peerConnections map[string]*PeerPeerConnection serverConnections map[string]*PeerServerConnection lock sync.Mutex - breakChannel chan bool + breakChannel chan bool } // NewConnectionsManager creates a new instance of Manager. @@ -96,27 +96,27 @@ func (m *Manager) AttemptReconnections() { for { select { - case <-time.After(timeout * time.Second): - m.lock.Lock() - for _, ppc := range m.peerConnections { - if ppc.GetState() == FAILED { - go ppc.Run() - } + case <-time.After(timeout * time.Second): + m.lock.Lock() + for _, ppc := range m.peerConnections { + if ppc.GetState() == FAILED { + go ppc.Run() } - m.lock.Unlock() + } + m.lock.Unlock() - m.lock.Lock() - for _, psc := range m.serverConnections { - if psc.GetState() == FAILED { - go psc.Run() - } + m.lock.Lock() + for _, psc := range m.serverConnections { + if psc.GetState() == FAILED { + go psc.Run() } - m.lock.Unlock() + } + m.lock.Unlock() - // Launch Another Run In 30 Seconds - timeout = time.Duration(30) - case <-m.breakChannel: - return + // Launch Another Run In 30 Seconds + timeout = time.Duration(30) + case <-m.breakChannel: + return } } } diff --git a/peer/connections/peerpeerconnection.go b/peer/connections/peerpeerconnection.go index ecf12dd..4eab248 100644 --- a/peer/connections/peerpeerconnection.go +++ b/peer/connections/peerpeerconnection.go @@ -109,4 +109,3 @@ func (ppc *PeerPeerConnection) Close() { ppc.state = KILLED ppc.connection.Conn.Close() } - diff --git a/peer/cwtch_peer.go b/peer/cwtch_peer.go index 60b616b..2299dea 100644 --- a/peer/cwtch_peer.go +++ b/peer/cwtch_peer.go @@ -20,7 +20,7 @@ import ( type CwtchPeer struct { connection.AutoConnectionHandler Profile *model.Profile - app *application.RicochetApplication + app *application.RicochetApplication mutex sync.Mutex Log chan string `json:"-"` connectionsManager *connections.Manager diff --git a/protocol/group_message.pb.go b/protocol/group_message.pb.go index 596200c..6275df0 100644 --- a/protocol/group_message.pb.go +++ b/protocol/group_message.pb.go @@ -14,9 +14,9 @@ var _ = fmt.Errorf var _ = math.Inf type CwtchServerPacket struct { - GroupMessage *GroupMessage `protobuf:"bytes,1,opt,name=group_message,json=groupMessage" json:"group_message,omitempty"` - FetchMessage *FetchMessage `protobuf:"bytes,2,opt,name=fetch_message,json=fetchMessage" json:"fetch_message,omitempty"` - XXX_unrecognized []byte `json:"-"` + GroupMessage *GroupMessage `protobuf:"bytes,1,opt,name=group_message,json=groupMessage" json:"group_message,omitempty"` + FetchMessage *FetchMessage `protobuf:"bytes,2,opt,name=fetch_message,json=fetchMessage" json:"fetch_message,omitempty"` + XXX_unrecognized []byte `json:"-"` } func (m *CwtchServerPacket) Reset() { *m = CwtchServerPacket{} } diff --git a/server/app/main.go b/server/app/main.go index 5a45fd9..846b687 100644 --- a/server/app/main.go +++ b/server/app/main.go @@ -31,5 +31,6 @@ func main() { server := new(cwtchserver.Server) log.Printf("starting cwtch server...") - server.Run(privateKeyFile) + // TODO load params from .cwtch/server.conf or command line flag + server.Run(privateKeyFile, 100000) } diff --git a/server/server.go b/server/server.go index b0a2246..1feee82 100644 --- a/server/server.go +++ b/server/server.go @@ -16,10 +16,9 @@ type Server struct { app *application.RicochetApplication } -// Run s -// tarts a server with the given privateKey +// Run starts a server with the given privateKey // TODO: surface errors -func (s *Server) Run(privateKeyFile string) { +func (s *Server) Run(privateKeyFile string, bufferSize int) { cwtchserver := new(application.RicochetApplication) pk, err := utils.LoadPrivateKeyFromFile(privateKeyFile) @@ -37,7 +36,7 @@ func (s *Server) Run(privateKeyFile string) { af := application.ApplicationInstanceFactory{} af.Init() ms := new(storage.MessageStore) - ms.Init("cwtch.messages") + ms.Init("cwtch.messages", bufferSize) af.AddHandler("im.cwtch.server.listen", func(rai *application.ApplicationInstance) func() channels.Handler { si := new(Instance) si.Init(rai, cwtchserver, ms) diff --git a/server/server_instance_test.go b/server/server_instance_test.go index 4363a72..99671c0 100644 --- a/server/server_instance_test.go +++ b/server/server_instance_test.go @@ -15,7 +15,7 @@ func TestServerInstance(t *testing.T) { ra := new(application.RicochetApplication) msi := new(storage.MessageStore) os.Remove("ms.test") - msi.Init("ms.test") + msi.Init("ms.test", 5) gm := protocol.GroupMessage{ Ciphertext: []byte("Hello this is a fairly average length message that we are writing here."), Spamguard: []byte{}, @@ -26,7 +26,7 @@ func TestServerInstance(t *testing.T) { res := si.HandleFetchRequest() if len(res) != 1 { - t.Errorf("Expected Group Messages Instead got %v", res) + t.Errorf("Expected 1 Group Messages Instead got %v", res) } // ra.HandleApplicationInstance(ai) diff --git a/storage/message_store.go b/storage/message_store.go index 0bf0caa..7841d8a 100644 --- a/storage/message_store.go +++ b/storage/message_store.go @@ -18,9 +18,12 @@ type MessageStoreInterface interface { // MessageStore is a file-backed implementation of MessageStoreInterface type MessageStore struct { - file *os.File - lock sync.Mutex - messages []*protocol.GroupMessage + file *os.File + lock sync.Mutex + messages []*protocol.GroupMessage + bufferSize int + pos int + rotated bool } // Close closes the message store and underlying resources. @@ -31,13 +34,26 @@ func (ms *MessageStore) Close() { ms.lock.Unlock() } -// Init sets up a MessageStore backed by filename -func (ms *MessageStore) Init(filename string) { +func (ms *MessageStore) updateBuffer(gm *protocol.GroupMessage) { + ms.messages[ms.pos] = gm + ms.pos++ + if ms.pos == ms.bufferSize { + ms.pos = 0 + ms.rotated = true + } +} + +// Init sets up a MessageStore of size bufferSize backed by filename +func (ms *MessageStore) Init(filename string, bufferSize int) { f, err := os.OpenFile(filename, os.O_CREATE|os.O_APPEND|os.O_RDWR, 0600) if err != nil { panic(err) } ms.file = f + ms.pos = 0 + ms.bufferSize = bufferSize + ms.messages = make([]*protocol.GroupMessage, bufferSize) + ms.rotated = false scanner := bufio.NewScanner(f) for scanner.Scan() { @@ -45,7 +61,7 @@ func (ms *MessageStore) Init(filename string) { gm := &protocol.GroupMessage{} err := json.Unmarshal([]byte(gms), gm) if err == nil { - ms.messages = append(ms.messages, gm) + ms.updateBuffer(gm) } else { panic(err) } @@ -59,9 +75,14 @@ func (ms *MessageStore) Init(filename string) { // FetchMessages returns all messages from the backing file. func (ms *MessageStore) FetchMessages() (messages []*protocol.GroupMessage) { - messages = make([]*protocol.GroupMessage, len(ms.messages)) ms.lock.Lock() - copy(messages, ms.messages) + if !ms.rotated { + messages = make([]*protocol.GroupMessage, ms.pos) + copy(messages, ms.messages[0:ms.pos]) + } else { + messages = make([]*protocol.GroupMessage, ms.bufferSize) + copy(messages, ms.messages) + } ms.lock.Unlock() return } @@ -69,7 +90,7 @@ func (ms *MessageStore) FetchMessages() (messages []*protocol.GroupMessage) { // AddMessage adds a GroupMessage to the store func (ms *MessageStore) AddMessage(gm protocol.GroupMessage) { ms.lock.Lock() - ms.messages = append(ms.messages, &gm) + ms.updateBuffer(&gm) s, err := json.Marshal(gm) if err != nil { log.Printf("[ERROR] Failed to unmarshal group message %v\n", err) diff --git a/storage/message_store_test.go b/storage/message_store_test.go index 134d781..e6b5407 100644 --- a/storage/message_store_test.go +++ b/storage/message_store_test.go @@ -10,8 +10,8 @@ import ( func TestMessageStore(t *testing.T) { os.Remove("ms.test") ms := new(MessageStore) - ms.Init("ms.test") - for i := 0; i < 100000; i++ { + ms.Init("ms.test", 100000) + for i := 0; i < 50000; i++ { gm := protocol.GroupMessage{ Ciphertext: []byte("Hello this is a fairly average length message that we are writing here. " + strconv.Itoa(i)), Spamguard: []byte{}, @@ -19,11 +19,25 @@ func TestMessageStore(t *testing.T) { ms.AddMessage(gm) } ms.Close() - ms.Init("ms.test") + ms.Init("ms.test", 100000) m := ms.FetchMessages() + if len(m) != 50000 { + t.Errorf("Should have been 5000 was %v", len(m)) + } + + for i := 0; i < 100000; i++ { + gm := protocol.GroupMessage{ + Ciphertext: []byte("Hello this is a fairly average length message that we are writing here. " + strconv.Itoa(i)), + Spamguard: []byte{}, + } + ms.AddMessage(gm) + } + + m = ms.FetchMessages() if len(m) != 100000 { t.Errorf("Should have been 100000 was %v", len(m)) } + ms.Close() os.Remove("ms.test") } diff --git a/testing/cwtch_peer_server_intergration_test.go b/testing/cwtch_peer_server_intergration_test.go index ce5dcd3..c16d0a9 100644 --- a/testing/cwtch_peer_server_intergration_test.go +++ b/testing/cwtch_peer_server_intergration_test.go @@ -1,24 +1,25 @@ package testing import ( + "crypto/rsa" "cwtch.im/cwtch/model" "cwtch.im/cwtch/peer" + "cwtch.im/cwtch/peer/connections" cwtchserver "cwtch.im/cwtch/server" "fmt" "github.com/s-rah/go-ricochet" "github.com/s-rah/go-ricochet/utils" + "io/ioutil" + "log" "os" "runtime" "testing" "time" - "crypto/rsa" - "io/ioutil" - "log" ) const ( serverKeyfile = "./../server/app/private_key" - localKeyfile = "./private_key" + localKeyfile = "./private_key" ) var ( @@ -34,12 +35,12 @@ func loadOrGenPrivateKey(t *testing.T) (pk *rsa.PrivateKey, generated bool) { if err != nil { t.Fatalf("error generating new private key: %v\n", err) } - err = ioutil.WriteFile(localKeyfile, []byte(utils.PrivateKeyToString(pk)), 0400) + err = ioutil.WriteFile(localKeyfile, []byte(utils.PrivateKeyToString(pk)), 0600) if err != nil { - log.Fatalf("error writing new private key to file %s: %v\n", localKeyfile, err) + t.Fatalf("error writing new private key to file %s: %v\n", localKeyfile, err) } - return pk,true + return pk, true } fmt.Println("Found server key " + serverKeyfile + ", loading...") pk, err := utils.LoadPrivateKeyFromFile(serverKeyfile) @@ -62,15 +63,32 @@ func printAndCountVerifedTimeline(t *testing.T, timeline []model.Message) int { func serverCheck(serverAddr string) bool { rc, err := goricochet.Open(serverAddr) - // Won't actaully free thread because rc.start() will wait for the uncalled rc.Process to read from errorChannel... - rc.Conn.Close() if err == nil { + // TODO should change libricochet-go to avoid accessing member to shutdown all goroutines. + rc.Conn.Close() return true } return false } +func waitForPeerConnection(t *testing.T, peer *peer.CwtchPeer, server string) { + for { + servers := peer.GetServers() + state, ok := servers[server] + if ok { + if state != connections.AUTHENTICATED { + time.Sleep(time.Second * 10) + continue + } + } else { + t.Fatalf("peer server connectiond %v should have entry for server %v", servers, server) + } + break + } + return +} + func TestCwtchPeerIntegration(t *testing.T) { // Hide logging "noise" log.SetOutput(ioutil.Discard) @@ -91,8 +109,7 @@ func TestCwtchPeerIntegration(t *testing.T) { // launch app server = new(cwtchserver.Server) fmt.Printf("No server found\nStarting cwtch server...\n") - - go server.Run(localKeyfile) + go server.Run(localKeyfile, 100) // let tor get established fmt.Printf("Establishing Tor hidden service: %v...\n", serverAddr) @@ -141,12 +158,10 @@ func TestCwtchPeerIntegration(t *testing.T) { fmt.Println("Alice joining server...") alice.JoinServer(serverAddr) - fmt.Println("Bob joining server...") bob.JoinServer(serverAddr) fmt.Println("Waiting for peerings and server joins...") - time.Sleep(time.Second * 60) fmt.Println("Alice inviting Bob to group...") @@ -172,29 +187,33 @@ func TestCwtchPeerIntegration(t *testing.T) { // ***** Fill up message history of server ****** /* - // filler group will be used to fill up the servers message history a bit to stress test fetch later for carol - fillerGroupId, _, err := alice.Profile.StartGroup(serverAddr) - if err != nil { - t.Errorf("Failed to init filler group: %v", err) - return - } + // filler group will be used to fill up the servers message history a bit to stress test fetch later for carol + fillerGroupId, _, err := alice.Profile.StartGroup(serverAddr) + if err != nil { + t.Errorf("Failed to init filler group: %v", err) + return + } - fmt.Println("Alice filling message history of server...") - for i := 0; i < 100; i++ { + fmt.Println("Alice filling message history of server...") + for i := 0; i < 100; i++ { - go func (x int) { - time.Sleep(time.Second * time.Duration(x)) - err := alice.SendMessageToGroup(fillerGroupId, aliceLines[0]) - if err != nil { - fmt.Println("SEND", x, "ERROR:", err) - } else { - fmt.Println("SEND", x, " SUCCESS!") - } - }(i) - } + go func (x int) { + time.Sleep(time.Second * time.Duration(x)) + err := alice.SendMessageToGroup(fillerGroupId, aliceLines[0]) + if err != nil { + fmt.Println("SEND", x, "ERROR:", err) + } else { + fmt.Println("SEND", x, " SUCCESS!") + } + }(i) + } - time.Sleep(time.Second * 110) + time.Sleep(time.Second * 110) */ + // Wait for them to join the server + waitForPeerConnection(t, alice, serverAddr) + waitForPeerConnection(t, bob, serverAddr) + //numGouRoutinesPostServerConnect := runtime.NumGoroutine() // ***** Conversation ***** @@ -244,8 +263,7 @@ func TestCwtchPeerIntegration(t *testing.T) { fmt.Println("Carol joining server...") carol.JoinServer(serverAddr) - - time.Sleep(time.Second * 60) + waitForPeerConnection(t, carol, serverAddr) numGoRotinesPostCarolConnect := runtime.NumGoroutine() fmt.Println("Bob> ", bobLines[2]) @@ -343,7 +361,7 @@ func TestCwtchPeerIntegration(t *testing.T) { time.Sleep(time.Second * 3) numGoRoutinesPostCarol := runtime.NumGoroutine() - fmt.Printf("numGoRoutinesStart: %v\nnumGoRoutinesPostServer: %v\nnumGoRoutinesPostPeerStart: %v\nnumGoRoutinesPostPeerAndServerConnect: %v\n" + + fmt.Printf("numGoRoutinesStart: %v\nnumGoRoutinesPostServer: %v\nnumGoRoutinesPostPeerStart: %v\nnumGoRoutinesPostPeerAndServerConnect: %v\n"+ "numGoRoutinesPostAlice: %v\nnumGoRotinesPostCarolConnect: %v\nnumGoRoutinesPostBob: %v\nnumGoRoutinesPostServerShutdown: %v\nnumGoRoutinesPostCarol: %v\n", numGoRoutinesStart, numGoRoutinesPostServer, numGoRoutinesPostPeerStart, numGoRoutinesPostServerConnect, numGoRoutinesPostAlice, numGoRotinesPostCarolConnect, numGoRoutinesPostBob, numGoRoutinesPostServerShutdown, numGoRoutinesPostCarol)