repliqate/src/main.rs

334 lines
13 KiB
Rust

use std::fs::{remove_file, File};
use std::io::{BufRead, BufReader, Read, Write};
use std::net::TcpListener;
use std::path::Path;
use std::process::{exit, Child, Command, Stdio};
use std::sync::{Arc, Mutex};
use std::thread::{sleep, spawn};
use std::time::Duration;
use std::{fs, thread};
/// Handle to a running `qemu-system-x86_64` child process together with the
/// console output captured from it so far.
#[derive(Debug)]
pub struct QemuProcess {
    /// The spawned QEMU process; its piped stdin is used to type into the guest console.
    child: Box<Child>,
    /// Every byte QEMU has written to its stdout, appended by a background reader thread.
    stdout: Arc<Mutex<Vec<u8>>>,
    /// Offset into `stdout` up to which output has already been observed/consumed.
    len: usize,
}
/// When `true`, `read_until_shell` echoes newly captured guest output to the host terminal.
const DEBUG: bool = false;
/// Spawns a background thread that drains `stream` into a shared byte buffer.
///
/// Pipe streams are blocking, so a separate thread is needed to monitor them
/// without blocking the primary thread. The returned buffer grows as data
/// arrives; callers lock it to inspect what has been captured so far.
///
/// The thread stops at end-of-stream or on the first non-recoverable read
/// error. Reads interrupted by a signal (`ErrorKind::Interrupted`) are retried.
///
/// # Panics
///
/// Panics if the reader thread cannot be spawned.
fn child_stream_to_vec<R>(stream: R) -> Arc<Mutex<Vec<u8>>>
where
    R: Read + Send + 'static,
{
    let out = Arc::new(Mutex::new(Vec::new()));
    let sink = Arc::clone(&out);
    thread::Builder::new()
        .name("child_stream_to_vec".into())
        .spawn(move || {
            let mut reader = stream;
            // One scratch buffer reused for every read instead of a fresh
            // allocation per iteration.
            let mut buf = [0u8; 8192];
            loop {
                match reader.read(&mut buf) {
                    Ok(0) => {
                        // End of stream: the child closed its side of the pipe.
                        println!("stream closed");
                        break;
                    }
                    // The lock is only held for the duration of the append.
                    Ok(got) => sink.lock().expect("!lock").extend_from_slice(&buf[..got]),
                    // An interrupted read is not a failure; try again.
                    Err(err) if err.kind() == std::io::ErrorKind::Interrupted => continue,
                    Err(err) => {
                        println!("Error reading from stream: {}", err);
                        break;
                    }
                }
            }
        })
        .expect("!thread");
    out
}
impl QemuProcess {
    /// Starts `qemu-system-x86_64` headless with `path` as the primary virtio
    /// disk and `vd.img` (in the current directory) as the secondary one.
    ///
    /// The guest console is wired to the child's stdin/stdout; stdout is
    /// drained by a background thread into an in-memory buffer.
    ///
    /// # Panics
    ///
    /// Panics if QEMU cannot be spawned or its stdout pipe is unavailable.
    pub fn new(path: &str) -> QemuProcess {
        let primary_drive = format!("if=virtio,format=qcow2,file={}", path);
        let mut child = Command::new("qemu-system-x86_64")
            .args([
                "-nographic",
                "-m",
                "4096",
                "-net",
                "nic",
                "-net",
                "user",
                // remove the two arguments below to disable kvm acceleration
                "-accel",
                "kvm",
                "-drive",
                primary_drive.as_str(),
                "-drive",
                "if=virtio,format=qcow2,file=vd.img",
            ])
            .stdin(Stdio::piped())
            .stdout(Stdio::piped())
            .stderr(Stdio::null())
            .spawn()
            .expect("could not start qemu-system-x86_64");
        let out = child_stream_to_vec(child.stdout.take().expect("!stdout"));
        QemuProcess {
            child: Box::new(child),
            stdout: out,
            len: 0,
        }
    }

    /// Blocks until the guest looks idle at its login prompt, then logs in as `root`.
    ///
    /// "Idle" is a heuristic: the captured console output is non-empty and did
    /// not grow during the last 5 second polling interval.
    ///
    /// # Panics
    ///
    /// Panics if writing to QEMU's stdin or flushing the host stdout fails.
    pub fn wait_for_shell(&mut self) {
        println!("Starting Qemu...\x1b[1;32m OK \x1b[0m");
        let child_stdin = self
            .child
            .stdin
            .as_mut()
            .expect("qemu stdin is always piped");
        child_stdin
            .write_all(b"\n")
            .expect("could not write to qemu stdin");
        child_stdin.flush().expect("could not flush qemu stdin");
        print!("waiting for vm to start (this can take up to a minute) ...");
        std::io::stdout()
            .flush()
            .expect("could not flush actual stdout");
        loop {
            let stdout = self.stdout.lock().unwrap().len();
            // Output exists and has not grown since the previous poll: assume
            // the boot finished and the login prompt is waiting.
            if self.len > 0 && self.len == stdout {
                println!("\x1b[1;32m OK \x1b[0m");
                child_stdin
                    .write_all(b"root\n")
                    .expect("could not write to qemu stdin");
                child_stdin.flush().expect("could not flush qemu input");
                break;
            }
            sleep(Duration::from_secs(5));
            self.len = stdout;
            print!(".");
            std::io::stdout()
                .flush()
                .expect("could not flush actual stdout");
        }
    }

    /// Waits for the guest shell prompt and returns the output produced since
    /// the previous call.
    ///
    /// The console buffer is polled every 2 seconds until the last line of the
    /// unread output contains `root@debian:`. The first unread line (the echoed
    /// command) and the last one (the prompt) are dropped; the remaining lines
    /// are cleaned of carriage returns / the bracketed-paste escape prefix.
    /// Lines containing more than two carriage returns are omitted from the
    /// result. When `output` is `true` every raw line is also printed.
    ///
    /// Guest output that is not valid UTF-8 is decoded lossily instead of
    /// aborting the run.
    pub fn read_until_shell(&mut self, output: bool) -> Vec<String> {
        // wait until attempting to acquire a lock...some programs (especially Java-based) freak out if stdout isn't consumed fast enough...
        sleep(Duration::from_millis(2000));
        let mut last_length = self.len;
        loop {
            // Copy the unread bytes out and release the lock immediately so the
            // reader thread is never stalled while we parse and print.
            let captured: Vec<u8> = {
                let stdout = self.stdout.lock().unwrap();
                stdout[self.len..].to_vec()
            };
            let current_length = self.len + captured.len();
            // Lossy decoding: the snapshot may end in the middle of a
            // multi-byte character, or the guest may emit arbitrary bytes.
            let tbd = String::from_utf8_lossy(&captured);
            let parts: Vec<&str> = tbd.split('\n').collect();
            if DEBUG && current_length > last_length {
                println!("{}", tbd);
                std::io::stdout().flush().expect("could not flush output");
            }
            // Update length
            last_length = current_length;
            print!(".");
            std::io::stdout().flush().expect("could not flush output");
            // `split` always yields at least one item, so `split_last` is `Some`.
            if let Some((last, body)) = parts.split_last() {
                if last.contains("root@debian:") {
                    let mut cleaned_lines = vec![];
                    println!(". \x1b[1;32mOK\x1b[0m");
                    // `body` already excludes the last element (the shell prompt);
                    // skip the first element (which is the command)
                    for part in body.iter().skip(1) {
                        if output {
                            println!(" {:?}", part);
                        }
                        let rparts: Vec<&str> = part.split('\r').collect();
                        match rparts.as_slice() {
                            // no \r at all, or a single trailing \r to remove
                            [line] | [line, _] => cleaned_lines.push(String::from(*line)),
                            // remove bracketed paste \u{1b}[?2004l\r from start of line
                            // and the trailing \r from its end
                            [_, line, _] => cleaned_lines.push(String::from(*line)),
                            _ => {}
                        }
                    }
                    self.len = current_length;
                    return cleaned_lines;
                }
            }
            sleep(Duration::from_millis(2000));
        }
    }

    /// Types `cmd` followed by `\r\n` into the guest console and echoes it on
    /// the host terminal. Does not wait for the command to finish; pair it with
    /// [`QemuProcess::read_until_shell`].
    ///
    /// # Panics
    ///
    /// Panics if writing to QEMU's stdin fails.
    pub fn execute_command(&mut self, cmd: &str) {
        let child_stdin = self
            .child
            .stdin
            .as_mut()
            .expect("qemu stdin is always piped");
        child_stdin
            .write_all(format!("{}\r\n", cmd).as_bytes())
            .expect("could not write command to qemu stdin");
        child_stdin.flush().expect("could not write");
        print!("{} ", cmd);
    }

    /// Copies `file_to_extract` out of the guest into the current directory.
    ///
    /// A TCP listener is opened on port 13087 and the guest is told to stream
    /// the file to `10.0.2.2:13087` with `nc`. The received bytes are written
    /// to a file named after the last path component of `file_to_extract`
    /// (`extract_file` if that component is empty).
    ///
    /// If the guest command returns without ever connecting (for example
    /// because the file does not exist), the receiver gives up instead of
    /// blocking forever.
    pub fn extract(&mut self, file_to_extract: &str) {
        // NOTE(review): the listener accepts connections on every interface;
        // confirm whether binding to loopback only would still be reachable
        // from the guest before tightening this.
        let socket = match TcpListener::bind("0.0.0.0:13087") {
            Ok(socket) => socket,
            Err(err) => {
                println!("unable to listen to guest network...{}", err);
                return;
            }
        };
        // Poll `accept` instead of blocking in it, so the receiver thread can
        // stop once the guest command has finished without connecting.
        if let Err(err) = socket.set_nonblocking(true) {
            println!("unable to listen to guest network...{}", err);
            return;
        }
        let guest_done = Arc::new(std::sync::atomic::AtomicBool::new(false));
        let receiver_done = Arc::clone(&guest_done);
        let extracted_file_name = file_to_extract
            .rsplit('/')
            .next()
            .filter(|name| !name.is_empty())
            .unwrap_or("extract_file")
            .to_string();
        let handle = spawn(move || {
            println!("setting up listen socket to extract file from guest network");
            loop {
                match socket.accept() {
                    Ok((mut conn, _addr)) => {
                        // Some platforms let accepted sockets inherit the
                        // listener's non-blocking mode; reading must block.
                        conn.set_nonblocking(false)
                            .expect("could not switch connection to blocking mode");
                        let mut file = vec![];
                        conn.read_to_end(&mut file)
                            .expect("could not read contents of file");
                        fs::write(&extracted_file_name, file)
                            .expect("could not write extracted file");
                        break;
                    }
                    Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
                        // Nothing pending. If the guest command already
                        // returned, no connection is going to arrive.
                        if receiver_done.load(std::sync::atomic::Ordering::SeqCst) {
                            println!("guest never connected, nothing was extracted");
                            break;
                        }
                        sleep(Duration::from_millis(50));
                    }
                    Err(err) => {
                        println!("unable to accept connection from guest...{}", err);
                        break;
                    }
                }
            }
        });
        // Give the receiver thread a moment to start before the guest connects.
        sleep(Duration::from_secs(1));
        self.execute_command(format!("nc -w 3 10.0.2.2 13087 < {}", file_to_extract).as_str());
        self.read_until_shell(false);
        guest_done.store(true, std::sync::atomic::Ordering::SeqCst);
        handle.join().expect("could not join networking thread");
    }

    /// Kills the QEMU process and waits for it to exit.
    ///
    /// # Panics
    ///
    /// Panics if the process cannot be killed.
    pub fn close(&mut self) {
        self.child.kill().expect("could not kill qemu process");
        // Reap the child so it does not linger as a zombie and has really
        // exited before the caller removes its disk images.
        let _ = self.child.wait();
    }
}
/// Opens `filename` and returns a buffered iterator over its lines.
///
/// The iterator is wrapped in a `Result` so callers can match on a failure to
/// open the file; each yielded line is itself an `io::Result<String>`.
fn read_lines<P>(filename: P) -> std::io::Result<std::io::Lines<BufReader<File>>>
where
    P: AsRef<Path>,
{
    File::open(filename).map(|handle| BufReader::new(handle).lines())
}
fn main() {
let args: Vec<String> = std::env::args().collect();
if args.len() != 3 {
println!("usage: repliqate <cloud-image.qcow2> <build script>");
exit(1);
}
let cloud_image = &args[1];
let build_script = &args[2];
let inuse_cloud_image = format!("inuse-{}", cloud_image);
let _ = remove_file("vd.img");
fs::copy(cloud_image.as_str(), inuse_cloud_image.as_str()).expect("copying of clean image failed...");
Command::new("qemu-img")
.args(["create", "-f", "qcow2", "vd.img", "30G"])
.output()
.expect("error creating secondary harddisk image...");
let mut qemu_process = QemuProcess::new(inuse_cloud_image.as_str());
qemu_process.wait_for_shell();
let mut purge = true;
if let Ok(lines) = read_lines(build_script.as_str()) {
// Consumes the iterator, returns an (Optional) String
for line in lines {
if let Ok(line) = line {
let mut command = line.clone();
let mut output = false;
if line.trim().is_empty() {
continue;
}
if line.trim().starts_with("#") {
// comment - skip
continue;
}
if line.trim().starts_with("@%") {
command = line.replacen("@%", "", 1);
output = true;
}
if line.trim().starts_with("@!") {
command = line.replacen("@!", "", 1);
let parts: Vec<&str> = command.split(" ").collect();
match parts[0] {
"preserve" => {
purge = false;
println!("preserve metacommand activated. Virtual Disk Images will not be purged at end of run")
}
"setup-secondary" => {
qemu_process.execute_command("mkfs -t ext4 /dev/vdb");
qemu_process.read_until_shell(false);
qemu_process.execute_command(" mkdir /mount");
qemu_process.read_until_shell(false);
qemu_process.execute_command("mount -t auto /dev/vdb /mount");
qemu_process.read_until_shell(false);
}
"extract" => {
if parts.len() >= 2 {
qemu_process.extract(parts[1]);
} else {
println!("@!extract requires file parameter");
break;
}
}
"check" => {
if parts.len() != 3 {
println!("check metacommand needs 2 arguments: check file hash")
}
// maybe we need to sleep to let the slow...disk update...
sleep(Duration::from_secs(1));
qemu_process.execute_command(format!("sha512sum {}", parts[1].trim()).as_str());
let sha512result = qemu_process.read_until_shell(false);
println!("{:?}", sha512result);
if sha512result[0].starts_with(parts[2]) {
println!("confirmed build hash of {}", parts[1])
} else {
println!(
"check hashes do not match {}. {} != {}",
parts[1], parts[2], sha512result[0]
);
break;
}
}
x => {
println!("unknown metacommand {}", x);
break;
}
}
} else {
qemu_process.execute_command(command.as_str());
qemu_process.read_until_shell(output);
}
}
}
}
qemu_process.close();
if purge {
println!("cleaning up...");
let _ = remove_file("vd.img");
let _ = remove_file(inuse_cloud_image.as_str());
}
println!("script finished");
}