Move starting of tasks into a helper struct

This commit is contained in:
Dennis 2024-06-02 16:34:05 +02:00
parent 9fff420a1a
commit 7f7aa3dc2c
No known key found for this signature in database
GPG Key ID: 6937EAEA33A3FA5D
3 changed files with 80 additions and 38 deletions

View File

@ -5,3 +5,4 @@ pub mod osm;
pub mod osm_db; pub mod osm_db;
pub mod osrm_world; pub mod osrm_world;
pub mod scenario_id; pub mod scenario_id;
pub mod task_starter;

View File

@ -0,0 +1,65 @@
use std::{
io::{BufRead, BufReader},
process::{Child, Command, Stdio},
};
#[derive(Default)]
pub struct TaskStarter {
ready: bool,
command: String,
arguments: Vec<String>,
child: Option<Child>,
}
impl TaskStarter {
pub fn new(command: &str) -> Self {
Self {
ready: false,
command: command.into(),
arguments: Vec::new(),
child: None,
}
}
pub fn is_ready(&self) -> bool {
self.ready
}
pub fn arg(&mut self, argument: &str) -> &mut Self {
self.arguments.push(argument.into());
self
}
pub fn spawn_wait_till_ready(&mut self, ready_token: &str) {
// TODO: move the child handling into a convenience struct
let mut command = &mut Command::new(&self.command);
for argument in &self.arguments {
command = command.arg(argument);
}
match command.stdout(Stdio::piped()).spawn() {
Ok(o) => self.child = Some(o),
Err(e) => panic!("cannot spawn task: {e}"),
}
if let Some(output) = &mut self.child.as_mut().unwrap().stdout {
// implement with a timeout
let mut reader = BufReader::new(output);
let mut line = String::new();
while let Ok(_count) = reader.read_line(&mut line) {
// println!("count: {count} ->{line}");
if line.contains(ready_token) {
self.ready = true;
break;
}
}
}
}
}
impl Drop for TaskStarter {
fn drop(&mut self) {
if let Err(e) = self.child.as_mut().expect("can't access child").kill() {
panic!("shutdown failed: {e}");
}
}
}

View File

@ -9,15 +9,14 @@ use common::cli_arguments::Args;
use common::lexicographic_file_walker::LexicographicFileWalker; use common::lexicographic_file_walker::LexicographicFileWalker;
use common::nearest_response::NearestResponse; use common::nearest_response::NearestResponse;
use common::osm::OSMWay; use common::osm::OSMWay;
use common::task_starter::TaskStarter;
use core::panic; use core::panic;
use cucumber::{self, gherkin::Step, given, when, World}; use cucumber::{self, gherkin::Step, given, when, World};
use futures::{future, FutureExt}; use futures::{future, FutureExt};
use geo_types::{point, Point}; use geo_types::{point, Point};
use std::fmt::{format, Display};
use std::fs::{create_dir_all, File}; use std::fs::{create_dir_all, File};
use std::io::{BufRead, BufReader, Read, Write}; use std::io::{Read, Write};
use std::path::PathBuf; use std::path::PathBuf;
use std::process::{Command, Stdio};
use std::time::Duration; use std::time::Duration;
use std::{env, fs}; use std::{env, fs};
use ureq::Agent; use ureq::Agent;
@ -163,11 +162,6 @@ fn request_nearest(world: &mut OSRMWorld, step: &Step) {
println!("{cache_path:?} does not exist"); println!("{cache_path:?} does not exist");
} }
let routed_path = path.join("build").join("osrm-routed");
if !routed_path.exists() {
panic!("osrm-routed binary not found");
}
// parse query data // parse query data
let t = &step.table.as_ref().expect("no query table specified"); let t = &step.table.as_ref().expect("no query table specified");
let test_cases: Vec<_> = t let test_cases: Vec<_> = t
@ -192,36 +186,18 @@ fn request_nearest(world: &mut OSRMWorld, step: &Step) {
.collect(); .collect();
let data_path = cache_path.join(world.scenario_id.to_owned() + ".osrm"); let data_path = cache_path.join(world.scenario_id.to_owned() + ".osrm");
println!("{routed_path:?} {}", data_path.to_str().unwrap());
// TODO: move the child handling into a convenience struct let routed_path = path.join("build").join("osrm-routed");
let mut handle = Command::new(routed_path) if !routed_path.exists() {
.arg(data_path.to_str().unwrap()) panic!("osrm-routed binary not found");
.stdout(Stdio::piped())
.spawn();
let child = match &mut handle {
Ok(o) => o,
Err(e) => panic!("cannot access handle: {e}"),
};
let mut running = false;
if let Some(output) = &mut child.stdout {
// implement with a timeout
let mut reader = BufReader::new(output);
let mut line = String::new();
while let Ok(_count) = reader.read_line(&mut line) {
// println!("count: {count} ->{line}");
if line.contains("running and waiting for requests") {
running = true;
break;
}
}
}
if !running {
panic! {"routed not started"}
} }
// TODO: this should not require a temporary and behave like the API of std::process
let mut task = TaskStarter::new(routed_path.to_str().unwrap());
task.arg(data_path.to_str().unwrap());
task.spawn_wait_till_ready("running and waiting for requests");
assert!(task.is_ready());
// TODO: move to generic http request handling struct // TODO: move to generic http request handling struct
let agent: Agent = ureq::AgentBuilder::new() let agent: Agent = ureq::AgentBuilder::new()
.timeout_read(Duration::from_secs(5)) .timeout_read(Duration::from_secs(5))
@ -260,9 +236,9 @@ fn request_nearest(world: &mut OSRMWorld, step: &Step) {
assert!(approx_equal(result_location.y(), expected_location.y(), 5)); assert!(approx_equal(result_location.y(), expected_location.y(), 5));
} }
if let Err(e) = child.kill() { // if let Err(e) = child.kill() {
panic!("shutdown failed: {e}"); // panic!("shutdown failed: {e}");
} // }
} }
pub fn approx_equal(a: f64, b: f64, dp: u8) -> bool { pub fn approx_equal(a: f64, b: f64, dp: u8) -> bool {