tapir-rs/src/connections/service.rs

116 lines
4.1 KiB
Rust

use crate::connections::ServiceError::{ClosedNormally, ConnectionFailed};
use crate::connections::{Connection, InboundConnection, OutboundConnection, ServiceError};
use crate::primitives::identity::Identity;
use std::net::TcpListener;
use std::sync::Arc;
use std::thread::{spawn, JoinHandle};
use socks::Socks5Stream;
use socks::TargetAddr::Domain;
/// Listener-state marker stored in `Service::listen_service` by `Service::init`,
/// i.e. for a service that has not started listening.
///
/// The single `()` field is private, so the marker can only be constructed inside this module.
pub struct NoListenService(());
/// Marker type with a private `()` field, constructible only inside this module.
///
/// NOTE(review): nothing in the visible part of this file uses this type; presumably it was
/// meant as another listener state for `Service` — confirm before relying on it.
pub struct ApplicationListenService(());
/// A service handle whose type parameter records the state of its listener.
///
/// `ListenService` is `NoListenService` for a value returned by `init` and
/// `JoinHandle<ServiceError>` for a value returned by `listen`.
pub struct Service<ListenService> {
/// Shared identity passed to `init`; carried along unchanged by `listen`.
identity: Arc<Identity>,
/// Port on `127.0.0.1` of the SOCKS5 proxy that `connect` dials through.
socks_port: u16,
/// Listener state: a marker value before listening, the listen thread's handle afterwards.
listen_service: ListenService,
}
impl<ListenService> Service<ListenService> {
    /// Opens an outbound connection to `hostname` and runs `application` on it in a new thread.
    ///
    /// The connection is made through the SOCKS5 proxy at `127.0.0.1:<socks_port>` to
    /// `<hostname>.onion` on port 9878, so `hostname` is expected without the `.onion` suffix.
    /// The application thread is detached: its join handle is dropped immediately.
    ///
    /// # Errors
    ///
    /// Returns `ServiceError::ConnectionFailed` carrying the error text when the SOCKS5
    /// connection cannot be established. In that case `application` is never run.
    pub fn connect<F>(&mut self, hostname: &str, application: F) -> Result<(), ServiceError>
    where
        // `Clone` is not required by this body (the closure is moved, not cloned); the bound
        // is kept so the public signature stays exactly as it was.
        F: FnOnce(Connection<OutboundConnection>) + Send + Clone + 'static,
    {
        let proxy = format!("127.0.0.1:{}", self.socks_port);
        let target = Domain(format!("{}.onion", hostname), 9878);
        let conn = Socks5Stream::connect(proxy, target)
            .map_err(|err| ConnectionFailed(err.to_string()))?;
        // `application` is already owned by this call, so it is moved straight into the
        // thread instead of being cloned first.
        spawn(move || application(Connection::<OutboundConnection>::new_outbound(conn.into_inner())));
        Ok(())
    }
}
impl Service<NoListenService> {
    /// Creates a service that is not listening yet.
    ///
    /// `identity` is stored as given and `socks_port` is the local SOCKS5 proxy port later
    /// used by `connect`. No sockets are opened and no threads are started here.
    pub fn init(identity: Arc<Identity>, socks_port: u16) -> Service<NoListenService> {
        Service {
            identity,
            socks_port,
            listen_service: NoListenService(()),
        }
    }

    /// Starts listening on `127.0.0.1:<port>` and runs a clone of `application` on every
    /// accepted connection, each in its own detached thread.
    ///
    /// The socket is bound on the calling thread so that a bind failure is reported to the
    /// caller right away rather than only surfacing later through the listen thread's result.
    /// On success the accept loop runs on a background thread whose handle is stored in the
    /// returned service.
    ///
    /// # Errors
    ///
    /// Returns `ServiceError::ListenFailed` carrying the error text when the port cannot be bound.
    pub fn listen<F>(self, port: u16, application: F) -> Result<Service<JoinHandle<ServiceError>>, ServiceError>
    where
        F: FnOnce(Connection<InboundConnection>) + Send + Clone + 'static,
    {
        let listener = TcpListener::bind(format!("127.0.0.1:{}", port))
            .map_err(|err| ServiceError::ListenFailed(err.to_string()))?;

        let accept_thread = spawn(move || {
            for stream in listener.incoming() {
                // A failed accept is skipped on purpose so one bad connection attempt does
                // not take the whole listener down.
                if let Ok(conn) = stream {
                    // Each connection gets its own copy because `application` is `FnOnce`.
                    let application = application.clone();
                    spawn(move || application(Connection::<InboundConnection>::new_inbound(conn)));
                }
            }
            // Only reached if the accept iterator ever ends.
            ClosedNormally
        });

        Ok(Service {
            identity: self.identity,
            socks_port: self.socks_port,
            listen_service: accept_thread,
        })
    }
}
impl Service<JoinHandle<ServiceError>> {
    /// Consumes the service and blocks until the listen thread has finished.
    ///
    /// The thread's `ServiceError` result is printed to stderr in `Debug` form; if the
    /// thread could not be joined (it panicked), a fixed message is printed instead.
    pub fn close(self) {
        if let Ok(outcome) = self.listen_service.join() {
            eprintln!("{:?}", outcome);
        } else {
            eprintln!("Error joining listen thread");
        }
    }
}
#[cfg(test)]
mod tests {
    use crate::connections::service::Service;
    use crate::primitives::identity::Identity;
    use ed25519_dalek::SecretKey;
    use rand::rngs::OsRng;

    /// Generates a fresh keypair and checks that a non-listening service can be built from it.
    #[test]
    fn service_state() {
        let mut rng = OsRng {};
        let keys = ed25519_dalek::Keypair::generate(&mut rng);
        let _secret = SecretKey::from_bytes(&keys.secret.to_bytes());
        let _svc = Service::init(Identity::initialize(keys), 9051);
        // Disabled until a test application is available:
        //let mut listen_service = service.listen(10000, TranscriptApp::new_instance()).unwrap_or_else(|_| panic!());
        // The following is expected NOT to compile; there is currently no way to assert that here:
        // service.connect(Hostname{},TranscriptApp::new_instance());
    }

    /// Same construction as `service_state`; placeholder for a compile-fail lifetime check.
    #[test]
    fn service_lifetime() {
        let mut rng = OsRng {};
        let keys = ed25519_dalek::Keypair::generate(&mut rng);
        let _secret = SecretKey::from_bytes(&keys.secret.to_bytes());
        let _svc = Service::init(Identity::initialize(keys), 9051);
        // Disabled until a test application is available:
        //let listen_service = service.listen(1000, TranscriptApp::new_instance()).unwrap_or_else(|_| panic!());
        // TODO: use trybuild to verify that this fails to compile:
        // service.connect(Hostname{},TranscriptApp::new_instance());
    }
}