1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
|
use std::net::SocketAddr;
use std::{collections::HashMap, error::Error, path::PathBuf, sync::Arc};
use http::{Method, StatusCode};
use http_body_util::Full;
use hyper::body::Bytes;
use hyper::server::conn::http1;
use hyper::service::service_fn;
use hyper::{Request, Response};
use hyper_util::rt::TokioIo;
use tickborg::config;
use tokio::net::TcpListener;
use tracing::{error, info, warn};
#[derive(serde::Serialize, Default)]
struct Attempt {
metadata: Option<serde_json::Value>,
result: Option<serde_json::Value>,
log_url: Option<String>,
}
#[derive(serde::Serialize)]
struct LogResponse {
attempts: HashMap<String, Attempt>,
}
#[derive(Clone)]
struct LogApiConfig {
logs_path: String,
serve_root: String,
}
fn response(status: StatusCode, body: &'static str) -> Response<Full<Bytes>> {
Response::builder()
.status(status)
.body(Full::new(Bytes::from(body)))
.unwrap()
}
fn json_response(status: StatusCode, body: String) -> Response<Full<Bytes>> {
Response::builder()
.status(status)
.header("Content-Type", "application/json")
.body(Full::new(Bytes::from(body)))
.unwrap()
}
async fn handle_request(
req: Request<hyper::body::Incoming>,
cfg: Arc<LogApiConfig>,
) -> Result<Response<Full<Bytes>>, hyper::Error> {
if req.method() != Method::GET {
return Ok(response(StatusCode::METHOD_NOT_ALLOWED, ""));
}
let uri = req.uri().path().to_string();
let Some(reqd) = uri.strip_prefix("/logs/").map(ToOwned::to_owned) else {
return Ok(response(StatusCode::NOT_FOUND, "invalid uri"));
};
let path: PathBuf = [&cfg.logs_path, &reqd].iter().collect();
let Ok(path) = std::fs::canonicalize(&path) else {
return Ok(response(StatusCode::NOT_FOUND, "absent"));
};
let Ok(iter) = std::fs::read_dir(path) else {
return Ok(response(StatusCode::NOT_FOUND, "non dir"));
};
let mut attempts = HashMap::<String, Attempt>::new();
for e in iter {
let Ok(e) = e else { continue };
let e_metadata = e.metadata();
if e_metadata.as_ref().map(|v| v.is_dir()).unwrap_or(true) {
return Ok(response(StatusCode::INTERNAL_SERVER_ERROR, "dir found"));
}
if e_metadata.as_ref().map(|v| v.is_file()).unwrap_or_default() {
let Ok(file_name) = e.file_name().into_string() else {
warn!("entry filename is not a utf-8 string: {:?}", e.file_name());
continue;
};
if file_name.ends_with(".metadata.json") || file_name.ends_with(".result.json") {
let Ok(file) = std::fs::File::open(e.path()) else {
warn!("could not open file: {file_name}");
continue;
};
let Ok(json) = serde_json::from_reader::<_, serde_json::Value>(file) else {
warn!("file is not a valid json file: {file_name}");
continue;
};
let Some(attempt_id) = json
.get("attempt_id")
.and_then(|v| v.as_str())
.map(ToOwned::to_owned)
else {
warn!("attempt_id not found in file: {file_name}");
continue;
};
let attempt_obj = attempts.entry(attempt_id).or_default();
if file_name.ends_with(".metadata.json") {
attempt_obj.metadata = Some(json);
} else {
attempt_obj.result = Some(json);
}
} else {
let attempt_obj = attempts.entry(file_name.clone()).or_default();
attempt_obj.log_url = Some(format!("{}/{reqd}/{file_name}", &cfg.serve_root));
}
}
}
let body = serde_json::to_string(&LogResponse { attempts }).unwrap_or_default();
Ok(json_response(StatusCode::OK, body))
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
tickborg::setup_log();
let arg = std::env::args()
.nth(1)
.unwrap_or_else(|| panic!("usage: {} <config>", std::env::args().next().unwrap()));
let Some(cfg) = config::load(arg.as_ref()).log_api_config else {
error!("No LogApi configuration found!");
panic!();
};
let api_cfg = Arc::new(LogApiConfig {
logs_path: cfg.logs_path,
serve_root: cfg.serve_root,
});
let addr: SocketAddr = cfg.listen.parse()?;
let listener = TcpListener::bind(addr).await?;
info!("Listening on {}", addr);
loop {
let (stream, _) = listener.accept().await?;
let io = TokioIo::new(stream);
let api_cfg = api_cfg.clone();
tokio::task::spawn(async move {
let service = service_fn(move |req| handle_request(req, api_cfg.clone()));
if let Err(err) = http1::Builder::new().serve_connection(io, service).await {
warn!("Error serving connection: {:?}", err);
}
});
}
}
|