package connections import ( "crypto/rand" "cwtch.im/cwtch/event" "cwtch.im/cwtch/protocol" "cwtch.im/cwtch/protocol/connections/fetch" "cwtch.im/cwtch/protocol/connections/listen" "cwtch.im/cwtch/protocol/connections/send" "errors" "git.openprivacy.ca/openprivacy/libricochet-go" "git.openprivacy.ca/openprivacy/libricochet-go/channels" "git.openprivacy.ca/openprivacy/libricochet-go/connection" "git.openprivacy.ca/openprivacy/libricochet-go/identity" "git.openprivacy.ca/openprivacy/libricochet-go/log" "golang.org/x/crypto/ed25519" "sync" "time" ) // PeerServerConnection encapsulates a single Peer->Server connection type PeerServerConnection struct { connection.AutoConnectionHandler Server string stateMutex sync.Mutex state ConnectionState connection *connection.Connection protocolEngine Engine GroupMessageHandler func(string, *protocol.GroupMessage) } // NewPeerServerConnection creates a new Peer->Server outbound connection func NewPeerServerConnection(engine Engine, serverhostname string) *PeerServerConnection { psc := new(PeerServerConnection) psc.protocolEngine = engine psc.Server = serverhostname psc.setState(DISCONNECTED) psc.Init() return psc } // GetState returns the current connection state func (psc *PeerServerConnection) GetState() ConnectionState { psc.stateMutex.Lock() defer psc.stateMutex.Unlock() return psc.state } func (psc *PeerServerConnection) setState(state ConnectionState) { log.Debugf("Setting State to %v for %v\n", ConnectionStateName[state], psc.Server) psc.stateMutex.Lock() defer psc.stateMutex.Unlock() psc.state = state psc.protocolEngine.EventManager().Publish(event.NewEvent(event.ServerStateChange, map[event.Field]string{ event.GroupServer: string(psc.Server), event.ConnectionState: ConnectionStateName[state], })) } // WaitTilSynced waits until the underlying connection is authenticated func (psc *PeerServerConnection) WaitTilSynced() { for { if psc.GetState() == SYNCED { break } time.Sleep(time.Second * 1) } } // Run manages the setup and teardown of a peer server connection func (psc *PeerServerConnection) Run() error { log.Infof("Connecting to %v", psc.Server) psc.setState(CONNECTING) rc, err := goricochet.Open(psc.protocolEngine.ACN(), psc.Server) if err == nil { psc.connection = rc if psc.GetState() == KILLED { return nil } psc.setState(CONNECTED) pub, priv, err := ed25519.GenerateKey(rand.Reader) if err == nil { _, err := connection.HandleOutboundConnection(psc.connection).ProcessAuthAsV3Client(identity.InitializeV3("cwtchpeer", &priv, &pub)) if err == nil { if psc.GetState() == KILLED { return nil } psc.setState(AUTHENTICATED) go func() { psc.connection.Do(func() error { psc.connection.RequestOpenChannel("im.cwtch.server.fetch", &fetch.CwtchPeerFetchChannel{Handler: psc}) return nil }) psc.connection.Do(func() error { psc.connection.RequestOpenChannel("im.cwtch.server.listen", &listen.CwtchPeerListenChannel{Handler: psc}) return nil }) }() psc.connection.Process(psc) } } } psc.setState(FAILED) return err } // Break makes Run() return and prevents processing, but doesn't close the connection. func (psc *PeerServerConnection) Break() error { return psc.connection.Break() } // SendGroupMessage sends the given protocol message to the Server. func (psc *PeerServerConnection) SendGroupMessage(gm *protocol.GroupMessage) error { if psc.state != SYNCED { return errors.New("peer is not yet connected & authenticated & synced to server cannot send message") } err := psc.connection.Do(func() error { psc.connection.RequestOpenChannel("im.cwtch.server.send", &send.CwtchPeerSendChannel{}) return nil }) errCount := 0 for errCount < 5 { time.Sleep(time.Second * time.Duration(errCount+1)) // back off retry err = psc.connection.Do(func() error { channel := psc.connection.Channel("im.cwtch.server.send", channels.Outbound) if channel == nil { return errors.New("no channel found") } sendchannel, ok := channel.Handler.(*send.CwtchPeerSendChannel) if ok { return sendchannel.SendGroupMessage(gm) } return errors.New("channel is not a peer send channel (this should definitely not happen)") }) if err != nil { errCount++ } else { return nil } } return err } // Close shuts down the connection (freeing the handler goroutines) func (psc *PeerServerConnection) Close() { psc.setState(KILLED) if psc.connection != nil { psc.connection.Close() } } // HandleGroupMessage passes the given group message back to the profile. func (psc *PeerServerConnection) HandleGroupMessage(gm *protocol.GroupMessage) { psc.GroupMessageHandler(psc.Server, gm) } // HandleFetchDone calls the supplied callback for when a fetch connection is closed func (psc *PeerServerConnection) HandleFetchDone() { psc.setState(SYNCED) }