diff options
Diffstat (limited to 'ofborg/tickborg/src')
56 files changed, 9642 insertions, 0 deletions
diff --git a/ofborg/tickborg/src/acl.rs b/ofborg/tickborg/src/acl.rs new file mode 100644 index 0000000000..2059b3e08f --- /dev/null +++ b/ofborg/tickborg/src/acl.rs @@ -0,0 +1,59 @@ +use crate::systems::System; + +pub struct Acl { + trusted_users: Option<Vec<String>>, + repos: Vec<String>, +} + +impl Acl { + pub fn new(repos: Vec<String>, mut trusted_users: Option<Vec<String>>) -> Acl { + if let Some(ref mut users) = trusted_users { + for user in users.iter_mut() { + *user = user.to_lowercase(); + } + } + + Acl { + trusted_users, + repos, + } + } + + pub fn is_repo_eligible(&self, name: &str) -> bool { + self.repos.contains(&name.to_lowercase()) + } + + pub fn build_job_architectures_for_user_repo(&self, user: &str, repo: &str) -> Vec<System> { + if self.can_build_unrestricted(user, repo) { + System::all_known_systems() + } else { + // Non-trusted users can only build on primary platforms + System::primary_systems() + } + } + + pub fn build_job_destinations_for_user_repo( + &self, + user: &str, + repo: &str, + ) -> Vec<(Option<String>, Option<String>)> { + self.build_job_architectures_for_user_repo(user, repo) + .iter() + .map(|system| system.as_build_destination()) + .collect() + } + + pub fn can_build_unrestricted(&self, user: &str, repo: &str) -> bool { + if let Some(ref users) = self.trusted_users { + if repo.to_lowercase().starts_with("project-tick/") { + users.contains(&user.to_lowercase()) + } else { + false + } + } else { + // If trusted_users is disabled (and thus None), everybody can build + // unrestricted + true + } + } +} diff --git a/ofborg/tickborg/src/asynccmd.rs b/ofborg/tickborg/src/asynccmd.rs new file mode 100644 index 0000000000..52cd20da8b --- /dev/null +++ b/ofborg/tickborg/src/asynccmd.rs @@ -0,0 +1,293 @@ +use std::collections::HashMap; +use std::io::{self, BufRead, BufReader, Read}; +use std::process::{Child, Command, ExitStatus, Stdio}; +use std::sync::mpsc::{self, Receiver, SyncSender, sync_channel}; +use std::thread::{self, JoinHandle}; + +use tracing::{debug, error, info}; + +// Specifically set to fall under 1/2 of the AMQP library's +// SyncSender limitation. +const OUT_CHANNEL_BUFFER_SIZE: usize = 30; + +// The waiter channel should never be over 3 items: process, stderr, +// stdout, and thusly probably could be unbounded just fine, but what +// the heck. +const WAITER_CHANNEL_BUFFER_SIZE: usize = 10; + +pub struct AsyncCmd { + command: Command, +} + +pub struct SpawnedAsyncCmd { + waiter: JoinHandle<Option<Result<ExitStatus, io::Error>>>, + rx: Receiver<String>, +} + +#[derive(Debug, Hash, PartialEq, Eq)] +enum WaitTarget { + Stderr, + Stdout, + Child, +} + +#[derive(Debug)] +enum WaitResult<T> { + Thread(thread::Result<T>), + Process(Result<ExitStatus, io::Error>), +} + +fn reader_tx<R: 'static + Read + Send>(read: R, tx: SyncSender<String>) -> thread::JoinHandle<()> { + let read = BufReader::new(read); + + thread::spawn(move || { + for line in read.lines() { + let to_send: String = match line { + Ok(line) => line, + Err(e) => { + error!("Error reading data in reader_tx: {:?}", e); + "Non-UTF8 data omitted from the log.".to_owned() + } + }; + + if let Err(e) = tx.send(to_send) { + error!("Failed to send log line: {:?}", e); + } + } + }) +} + +fn spawn_join<T: Send + 'static>( + id: WaitTarget, + tx: SyncSender<(WaitTarget, WaitResult<T>)>, + waiting_on: thread::JoinHandle<T>, +) -> thread::JoinHandle<()> { + thread::spawn(move || { + if let Err(e) = tx.send((id, WaitResult::Thread(waiting_on.join()))) { + error!("Failed to send message to the thread waiter: {:?}", e); + } + }) +} + +fn child_wait<T: Send + 'static>( + id: WaitTarget, + tx: SyncSender<(WaitTarget, WaitResult<T>)>, + mut waiting_on: Child, +) -> thread::JoinHandle<()> { + thread::spawn(move || { + if let Err(e) = tx.send((id, WaitResult::Process(waiting_on.wait()))) { + error!("Failed to send message to the thread waiter: {:?}", e); + } + }) +} + +impl AsyncCmd { + pub fn new(cmd: Command) -> AsyncCmd { + AsyncCmd { command: cmd } + } + + pub fn spawn(mut self) -> SpawnedAsyncCmd { + let mut child = self + .command + .stdin(Stdio::null()) + .stderr(Stdio::piped()) + .stdout(Stdio::piped()) + .spawn() + .unwrap(); + + let (monitor_tx, monitor_rx) = sync_channel(WAITER_CHANNEL_BUFFER_SIZE); + let (proc_tx, proc_rx) = sync_channel(OUT_CHANNEL_BUFFER_SIZE); + + let mut waiters: HashMap<WaitTarget, thread::JoinHandle<()>> = HashMap::with_capacity(3); + waiters.insert( + WaitTarget::Stderr, + spawn_join( + WaitTarget::Stderr, + monitor_tx.clone(), + reader_tx(child.stderr.take().unwrap(), proc_tx.clone()), + ), + ); + + waiters.insert( + WaitTarget::Stdout, + spawn_join( + WaitTarget::Stdout, + monitor_tx.clone(), + reader_tx(child.stdout.take().unwrap(), proc_tx), + ), + ); + + waiters.insert( + WaitTarget::Child, + child_wait(WaitTarget::Child, monitor_tx, child), + ); + + let head_waiter = thread::spawn(move || block_on_waiters(monitor_rx, waiters)); + + SpawnedAsyncCmd { + waiter: head_waiter, + rx: proc_rx, + } + } +} + +impl SpawnedAsyncCmd { + pub fn lines(&mut self) -> mpsc::Iter<'_, String> { + self.rx.iter() + } + + pub fn get_next_line(&mut self) -> Result<String, mpsc::RecvError> { + self.rx.recv() + } + + pub fn wait(self) -> Result<ExitStatus, io::Error> { + self.waiter + .join() + .map_err(|_err| io::Error::other("Couldn't join thread.")) + .and_then(|opt| { + opt.ok_or_else(|| io::Error::other("Thread didn't return an exit status.")) + }) + .and_then(|res| res) + } +} + +// FIXME: remove with rust/cargo update +#[allow(clippy::cognitive_complexity)] +fn block_on_waiters( + monitor_rx: mpsc::Receiver<(WaitTarget, WaitResult<()>)>, + mut waiters: HashMap<WaitTarget, thread::JoinHandle<()>>, +) -> Option<Result<ExitStatus, io::Error>> { + let mut status = None; + + for (id, interior_result) in monitor_rx.iter() { + match waiters.remove(&id) { + Some(handle) => { + info!("Received notice that {:?} finished", id); + let waiter_result = handle.join(); + + info!("waiter status: {:?}", waiter_result); + info!("interior status: {:?}", interior_result); + + match interior_result { + WaitResult::Thread(t) => { + debug!("thread result: {:?}", t); + } + WaitResult::Process(t) => { + status = Some(t); + } + } + } + None => { + error!( + "Received notice that {:?} finished, but it isn't being waited on?", + id + ); + } + } + + if waiters.is_empty() { + debug!("Closing up the waiter receiver thread, no more waiters."); + break; + } + } + + info!( + "Out of the child waiter recv, with {:?} remaining waits", + waiters.len() + ); + + status +} + +#[cfg(test)] +mod tests { + use super::AsyncCmd; + use std::ffi::{OsStr, OsString}; + use std::os::unix::ffi::OsStrExt; + use std::process::Command; + + #[test] + fn basic_echo_test() { + let mut cmd = Command::new("/bin/sh"); + cmd.arg("-c"); + cmd.arg("echo hi"); + let acmd = AsyncCmd::new(cmd); + + let mut spawned = acmd.spawn(); + let lines: Vec<String> = spawned.lines().collect(); + assert_eq!(lines, vec!["hi"]); + let exit_status = spawned.wait().unwrap(); + assert!(exit_status.success()); + } + + #[test] + fn basic_interpolation_test() { + let mut cmd = Command::new("stdbuf"); + cmd.arg("-o0"); + cmd.arg("-e0"); + cmd.arg("bash"); + cmd.arg("-c"); + + // The sleep 0's are to introduce delay between output to help + // make it more predictably received in the right order + cmd.arg("echo stdout; sleep 0.1; echo stderr >&2; sleep 0.1; echo stdout2; sleep 0.1; echo stderr2 >&2"); + let acmd = AsyncCmd::new(cmd); + + let mut spawned = acmd.spawn(); + let lines: Vec<String> = spawned.lines().collect(); + assert_eq!(lines, vec!["stdout", "stderr", "stdout2", "stderr2"]); + let exit_status = spawned.wait().unwrap(); + assert!(exit_status.success()); + } + + #[test] + fn lots_of_small_ios_test() { + let mut cmd = Command::new("/bin/sh"); + cmd.arg("-c"); + cmd.arg("for i in `seq 1 100`; do (seq 1 100)& (seq 1 100 >&2)& wait; wait; done"); + let acmd = AsyncCmd::new(cmd); + + let mut spawned = acmd.spawn(); + let lines: Vec<String> = spawned.lines().collect(); + assert_eq!(lines.len(), 20000); + let thread_result = spawned.wait(); + let exit_status = thread_result.expect("Thread should exit correctly"); + assert!(exit_status.success()); + } + + #[test] + fn lots_of_io_test() { + let mut cmd = Command::new("/bin/sh"); + cmd.arg("-c"); + cmd.arg("seq 1 100000; seq 1 100000 >&2"); + let acmd = AsyncCmd::new(cmd); + + let mut spawned = acmd.spawn(); + let lines: Vec<String> = spawned.lines().collect(); + assert_eq!(lines.len(), 200000); + let thread_result = spawned.wait(); + let exit_status = thread_result.expect("Thread should exit correctly"); + assert!(exit_status.success()); + } + + #[test] + fn bad_utf8_test() { + let mut echos = OsString::from("echo hi; echo "); + echos.push(OsStr::from_bytes(&[0xffu8])); + echos.push("; echo there;"); + + let mut cmd = Command::new("/bin/sh"); + cmd.arg("-c"); + cmd.arg(echos); + let acmd = AsyncCmd::new(cmd); + + let mut spawned = acmd.spawn(); + let lines: Vec<String> = spawned.lines().collect(); + assert_eq!( + lines, + vec!["hi", "Non-UTF8 data omitted from the log.", "there"] + ); + let exit_status = spawned.wait().unwrap(); + assert!(exit_status.success()); + } +} diff --git a/ofborg/tickborg/src/bin/build-faker.rs b/ofborg/tickborg/src/bin/build-faker.rs new file mode 100644 index 0000000000..df8fcbfa50 --- /dev/null +++ b/ofborg/tickborg/src/bin/build-faker.rs @@ -0,0 +1,62 @@ +use lapin::message::Delivery; +use std::env; +use std::error::Error; + +use tickborg::commentparser; +use tickborg::config; +use tickborg::easylapin; +use tickborg::message::{Pr, Repo, buildjob}; +use tickborg::notifyworker::NotificationReceiver; +use tickborg::worker; + +#[tokio::main] +async fn main() -> Result<(), Box<dyn Error>> { + tickborg::setup_log(); + + let arg = env::args().nth(1).expect("usage: build-faker <config>"); + let cfg = config::load(arg.as_ref()); + + let conn = easylapin::from_config(&cfg.builder.unwrap().rabbitmq).await?; + let chan = conn.create_channel().await?; + + let repo_msg = Repo { + clone_url: "https://github.com/project-tick/Project-Tick.git".to_owned(), + full_name: "project-tick/Project-Tick".to_owned(), + owner: "project-tick".to_owned(), + name: "Project-Tick".to_owned(), + }; + + let pr_msg = Pr { + number: 42, + head_sha: "6dd9f0265d52b946dd13daf996f30b64e4edb446".to_owned(), + target_branch: Some("scratch".to_owned()), + }; + + let logbackrk = "project-tick/Project-Tick.42".to_owned(); + + let msg = buildjob::BuildJob { + repo: repo_msg, + pr: pr_msg, + subset: Some(commentparser::Subset::Project), + attrs: vec!["success".to_owned()], + logs: Some((Some("logs".to_owned()), Some(logbackrk.to_lowercase()))), + statusreport: Some((None, Some("scratch".to_owned()))), + request_id: "bogus-request-id".to_owned(), + }; + + { + let deliver = Delivery::mock(0, "no-exchange".into(), "".into(), false, vec![]); + let recv = easylapin::ChannelNotificationReceiver::new(chan.clone(), deliver); + + for _i in 1..2 { + recv.tell(worker::publish_serde_action( + None, + Some("build-inputs-x86_64-darwin".to_owned()), + &msg, + )) + .await; + } + } + + Ok(()) +} diff --git a/ofborg/tickborg/src/bin/builder.rs b/ofborg/tickborg/src/bin/builder.rs new file mode 100644 index 0000000000..5930ae5696 --- /dev/null +++ b/ofborg/tickborg/src/bin/builder.rs @@ -0,0 +1,116 @@ +use std::env; +use std::error::Error; +use std::future::Future; +use std::path::Path; +use std::pin::Pin; + +use futures_util::future; +use tracing::{error, info, warn}; + +use tickborg::easyamqp::{self, ChannelExt, ConsumerExt}; +use tickborg::easylapin; +use tickborg::{checkout, config, tasks}; + +#[tokio::main] +async fn main() -> Result<(), Box<dyn Error>> { + tickborg::setup_log(); + + let arg = env::args() + .nth(1) + .unwrap_or_else(|| panic!("usage: {} <config>", std::env::args().next().unwrap())); + let cfg = config::load(arg.as_ref()); + + let Some(builder_cfg) = config::load(arg.as_ref()).builder else { + error!("No builder configuration found!"); + panic!(); + }; + + let conn = easylapin::from_config(&builder_cfg.rabbitmq).await?; + let mut handles: Vec<Pin<Box<dyn Future<Output = ()> + Send>>> = Vec::new(); + + for system in &cfg.build.system { + handles.push(self::create_handle(&conn, &cfg, system.to_string()).await?); + } + + future::join_all(handles).await; + + drop(conn); // Close connection. + info!("Closed the session... EOF"); + Ok(()) +} + +#[allow(clippy::type_complexity)] +async fn create_handle( + conn: &lapin::Connection, + cfg: &config::Config, + system: String, +) -> Result<Pin<Box<dyn Future<Output = ()> + Send>>, Box<dyn Error>> { + let mut chan = conn.create_channel().await?; + + let cloner = checkout::cached_cloner(Path::new(&cfg.checkout.root)); + let build_executor = cfg.build_executor(); + + chan.declare_exchange(easyamqp::ExchangeConfig { + exchange: "build-jobs".to_owned(), + exchange_type: easyamqp::ExchangeType::Fanout, + passive: false, + durable: true, + auto_delete: false, + no_wait: false, + internal: false, + }) + .await?; + + let queue_name = if cfg.runner.build_all_jobs != Some(true) { + let queue_name = format!("build-inputs-{system}"); + chan.declare_queue(easyamqp::QueueConfig { + queue: queue_name.clone(), + passive: false, + durable: true, + exclusive: false, + auto_delete: false, + no_wait: false, + }) + .await?; + queue_name + } else { + warn!("Building all jobs, please don't use this unless you're"); + warn!("developing and have Graham's permission!"); + let queue_name = "".to_owned(); + chan.declare_queue(easyamqp::QueueConfig { + queue: queue_name.clone(), + passive: false, + durable: false, + exclusive: true, + auto_delete: true, + no_wait: false, + }) + .await?; + queue_name + }; + + chan.bind_queue(easyamqp::BindQueueConfig { + queue: queue_name.clone(), + exchange: "build-jobs".to_owned(), + routing_key: None, + no_wait: false, + }) + .await?; + + let handle = easylapin::NotifyChannel(chan) + .consume( + tasks::build::BuildWorker::new(cloner, build_executor, system, cfg.runner.identity.clone()), + easyamqp::ConsumeConfig { + queue: queue_name.clone(), + consumer_tag: format!("{}-builder", cfg.whoami()), + no_local: false, + no_ack: false, + no_wait: false, + exclusive: false, + }, + ) + .await?; + + info!("Fetching jobs from {}", &queue_name); + Ok(handle) +} diff --git a/ofborg/tickborg/src/bin/evaluation-filter.rs b/ofborg/tickborg/src/bin/evaluation-filter.rs new file mode 100644 index 0000000000..f6fba8b63e --- /dev/null +++ b/ofborg/tickborg/src/bin/evaluation-filter.rs @@ -0,0 +1,88 @@ +use std::env; +use std::error::Error; + +use tracing::{error, info}; + +use tickborg::config; +use tickborg::easyamqp::{self, ChannelExt, ConsumerExt}; +use tickborg::easylapin; +use tickborg::tasks; + +#[tokio::main] +async fn main() -> Result<(), Box<dyn Error>> { + tickborg::setup_log(); + + let arg = env::args() + .nth(1) + .unwrap_or_else(|| panic!("usage: {} <config>", std::env::args().next().unwrap())); + let cfg = config::load(arg.as_ref()); + + let Some(filter_cfg) = config::load(arg.as_ref()).evaluation_filter else { + error!("No evaluation filter configuration found!"); + panic!(); + }; + + let conn = easylapin::from_config(&filter_cfg.rabbitmq).await?; + let mut chan = conn.create_channel().await?; + + chan.declare_exchange(easyamqp::ExchangeConfig { + exchange: "github-events".to_owned(), + exchange_type: easyamqp::ExchangeType::Topic, + passive: false, + durable: true, + auto_delete: false, + no_wait: false, + internal: false, + }) + .await?; + + chan.declare_queue(easyamqp::QueueConfig { + queue: "mass-rebuild-check-jobs".to_owned(), + passive: false, + durable: true, + exclusive: false, + auto_delete: false, + no_wait: false, + }) + .await?; + + let queue_name = String::from("mass-rebuild-check-inputs"); + chan.declare_queue(easyamqp::QueueConfig { + queue: queue_name.clone(), + passive: false, + durable: true, + exclusive: false, + auto_delete: false, + no_wait: false, + }) + .await?; + + chan.bind_queue(easyamqp::BindQueueConfig { + queue: queue_name.clone(), + exchange: "github-events".to_owned(), + routing_key: Some("pull_request.project-tick/*".to_owned()), + no_wait: false, + }) + .await?; + + let handle = easylapin::WorkerChannel(chan) + .consume( + tasks::evaluationfilter::EvaluationFilterWorker::new(cfg.acl()), + easyamqp::ConsumeConfig { + queue: queue_name.clone(), + consumer_tag: format!("{}-evaluation-filter", cfg.whoami()), + no_local: false, + no_ack: false, + no_wait: false, + exclusive: false, + }, + ) + .await?; + + info!("Fetching jobs from {}", &queue_name); + handle.await; + + drop(conn); // Close connection. + info!("Closed the session... EOF"); + Ok(()) +} diff --git a/ofborg/tickborg/src/bin/github-comment-filter.rs b/ofborg/tickborg/src/bin/github-comment-filter.rs new file mode 100644 index 0000000000..5240fba8cb --- /dev/null +++ b/ofborg/tickborg/src/bin/github-comment-filter.rs @@ -0,0 +1,114 @@ +use std::env; +use std::error::Error; + +use tickborg::systems::System; +use tracing::{error, info}; + +use tickborg::config; +use tickborg::easyamqp::{self, ChannelExt, ConsumerExt}; +use tickborg::easylapin; +use tickborg::tasks; + +#[tokio::main] +async fn main() -> Result<(), Box<dyn Error>> { + tickborg::setup_log(); + + let arg = env::args() + .nth(1) + .unwrap_or_else(|| panic!("usage: {} <config>", std::env::args().next().unwrap())); + let cfg = config::load(arg.as_ref()); + + let Some(filter_cfg) = config::load(arg.as_ref()).github_comment_filter else { + error!("No comment filter configuration found!"); + panic!(); + }; + + let conn = easylapin::from_config(&filter_cfg.rabbitmq).await?; + let mut chan = conn.create_channel().await?; + + chan.declare_exchange(easyamqp::ExchangeConfig { + exchange: "github-events".to_owned(), + exchange_type: easyamqp::ExchangeType::Topic, + passive: false, + durable: true, + auto_delete: false, + no_wait: false, + internal: false, + }) + .await?; + + chan.declare_exchange(easyamqp::ExchangeConfig { + exchange: "build-jobs".to_owned(), + exchange_type: easyamqp::ExchangeType::Fanout, + passive: false, + durable: true, + auto_delete: false, + no_wait: false, + internal: false, + }) + .await?; + + let queue_name = "build-inputs"; + chan.declare_queue(easyamqp::QueueConfig { + queue: queue_name.to_owned(), + passive: false, + durable: true, + exclusive: false, + auto_delete: false, + no_wait: false, + }) + .await?; + + chan.bind_queue(easyamqp::BindQueueConfig { + queue: "build-inputs".to_owned(), + exchange: "github-events".to_owned(), + routing_key: Some("issue_comment.*".to_owned()), + no_wait: false, + }) + .await?; + + chan.declare_exchange(easyamqp::ExchangeConfig { + exchange: "build-results".to_owned(), + exchange_type: easyamqp::ExchangeType::Fanout, + passive: false, + durable: true, + auto_delete: false, + no_wait: false, + internal: false, + }) + .await?; + + // Create build job queues + for sys in System::all_known_systems().iter().map(System::to_string) { + chan.declare_queue(easyamqp::QueueConfig { + queue: format!("build-inputs-{sys}"), + passive: false, + durable: true, + exclusive: false, + auto_delete: false, + no_wait: false, + }) + .await?; + } + + let handle = easylapin::WorkerChannel(chan) + .consume( + tasks::githubcommentfilter::GitHubCommentWorker::new(cfg.acl(), cfg.github()), + easyamqp::ConsumeConfig { + queue: "build-inputs".to_owned(), + consumer_tag: format!("{}-github-comment-filter", cfg.whoami()), + no_local: false, + no_ack: false, + no_wait: false, + exclusive: false, + }, + ) + .await?; + + info!("Fetching jobs from {}", &queue_name); + handle.await; + + drop(conn); // Close connection. + info!("Closed the session... EOF"); + Ok(()) +} diff --git a/ofborg/tickborg/src/bin/github-comment-poster.rs b/ofborg/tickborg/src/bin/github-comment-poster.rs new file mode 100644 index 0000000000..5c45d8f546 --- /dev/null +++ b/ofborg/tickborg/src/bin/github-comment-poster.rs @@ -0,0 +1,76 @@ +use std::env; +use std::error::Error; + +use tracing::{error, info}; + +use tickborg::config; +use tickborg::easyamqp::{self, ChannelExt, ConsumerExt}; +use tickborg::easylapin; +use tickborg::tasks; + +#[tokio::main] +async fn main() -> Result<(), Box<dyn Error>> { + tickborg::setup_log(); + + let arg = env::args() + .nth(1) + .unwrap_or_else(|| panic!("usage: {} <config>", std::env::args().next().unwrap())); + let cfg = config::load(arg.as_ref()); + + let Some(poster_cfg) = config::load(arg.as_ref()).github_comment_poster else { + error!("No comment poster configuration found!"); + panic!(); + }; + + let conn = easylapin::from_config(&poster_cfg.rabbitmq).await?; + let mut chan = conn.create_channel().await?; + + chan.declare_exchange(easyamqp::ExchangeConfig { + exchange: "build-results".to_owned(), + exchange_type: easyamqp::ExchangeType::Fanout, + passive: false, + durable: true, + auto_delete: false, + no_wait: false, + internal: false, + }) + .await?; + + chan.declare_queue(easyamqp::QueueConfig { + queue: "build-results".to_owned(), + passive: false, + durable: true, + exclusive: false, + auto_delete: false, + no_wait: false, + }) + .await?; + + chan.bind_queue(easyamqp::BindQueueConfig { + queue: "build-results".to_owned(), + exchange: "build-results".to_owned(), + routing_key: None, + no_wait: false, + }) + .await?; + + let handle = easylapin::WorkerChannel(chan) + .consume( + tasks::githubcommentposter::GitHubCommentPoster::new(cfg.github_app_vendingmachine()), + easyamqp::ConsumeConfig { + queue: "build-results".to_owned(), + consumer_tag: format!("{}-github-comment-poster", cfg.whoami()), + no_local: false, + no_ack: false, + no_wait: false, + exclusive: false, + }, + ) + .await?; + + handle.await; + + drop(conn); // Close connection. + info!("Closed the session... EOF"); + Ok(()) +} diff --git a/ofborg/tickborg/src/bin/github-webhook-receiver.rs b/ofborg/tickborg/src/bin/github-webhook-receiver.rs new file mode 100644 index 0000000000..910cd4b350 --- /dev/null +++ b/ofborg/tickborg/src/bin/github-webhook-receiver.rs @@ -0,0 +1,278 @@ +use std::env; +use std::error::Error; +use std::net::SocketAddr; +use std::sync::Arc; + +use hmac::{Hmac, KeyInit as _, Mac}; +use http::{Method, StatusCode}; +use http_body_util::{BodyExt, Full}; +use hyper::body::{Bytes, Incoming}; +use hyper::server::conn::http1; +use hyper::service::service_fn; +use hyper::{Request, Response}; +use hyper_util::rt::TokioIo; +use lapin::options::BasicPublishOptions; +use lapin::{BasicProperties, Channel}; +use tickborg::ghevent::GenericWebhook; +use tickborg::{config, easyamqp, easyamqp::ChannelExt, easylapin}; +use sha2::Sha256; +use tokio::net::TcpListener; +use tokio::sync::Mutex; +use tracing::{error, info, warn}; + +/// Prepares the the exchange we will write to, the queues that are bound to it +/// and binds them. +async fn setup_amqp(chan: &mut Channel) -> Result<(), Box<dyn Error + Send + Sync>> { + chan.declare_exchange(easyamqp::ExchangeConfig { + exchange: "github-events".to_owned(), + exchange_type: easyamqp::ExchangeType::Topic, + passive: false, + durable: true, + auto_delete: false, + no_wait: false, + internal: false, + }) + .await?; + + let queue_name = String::from("build-inputs"); + chan.declare_queue(easyamqp::QueueConfig { + queue: queue_name.clone(), + passive: false, + durable: true, + exclusive: false, + auto_delete: false, + no_wait: false, + }) + .await?; + chan.bind_queue(easyamqp::BindQueueConfig { + queue: queue_name.clone(), + exchange: "github-events".to_owned(), + routing_key: Some(String::from("issue_comment.*")), + no_wait: false, + }) + .await?; + + let queue_name = String::from("github-events-unknown"); + chan.declare_queue(easyamqp::QueueConfig { + queue: queue_name.clone(), + passive: false, + durable: true, + exclusive: false, + auto_delete: false, + no_wait: false, + }) + .await?; + chan.bind_queue(easyamqp::BindQueueConfig { + queue: queue_name.clone(), + exchange: "github-events".to_owned(), + routing_key: Some(String::from("unknown.*")), + no_wait: false, + }) + .await?; + + let queue_name = String::from("mass-rebuild-check-inputs"); + chan.declare_queue(easyamqp::QueueConfig { + queue: queue_name.clone(), + passive: false, + durable: true, + exclusive: false, + auto_delete: false, + no_wait: false, + }) + .await?; + chan.bind_queue(easyamqp::BindQueueConfig { + queue: queue_name.clone(), + exchange: "github-events".to_owned(), + routing_key: Some(String::from("pull_request.*")), + no_wait: false, + }) + .await?; + Ok(()) +} + +fn response(status: StatusCode, body: &'static str) -> Response<Full<Bytes>> { + Response::builder() + .status(status) + .body(Full::new(Bytes::from(body))) + .unwrap() +} + +fn empty_response(status: StatusCode) -> Response<Full<Bytes>> { + Response::builder() + .status(status) + .body(Full::new(Bytes::new())) + .unwrap() +} + +async fn handle_request( + req: Request<Incoming>, + webhook_secret: Arc<String>, + chan: Arc<Mutex<Channel>>, +) -> Result<Response<Full<Bytes>>, hyper::Error> { + // HTTP 405 + if req.method() != Method::POST { + return Ok(empty_response(StatusCode::METHOD_NOT_ALLOWED)); + } + + // Get headers before consuming body + let sig_header = req + .headers() + .get("X-Hub-Signature-256") + .and_then(|v| v.to_str().ok()) + .map(|s| s.to_string()); + let event_type = req + .headers() + .get("X-Github-Event") + .and_then(|v| v.to_str().ok()) + .map(|s| s.to_string()); + let content_type = req + .headers() + .get("Content-Type") + .and_then(|v| v.to_str().ok()) + .map(|s| s.to_string()); + + // Read body + let raw = match req.collect().await { + Ok(collected) => collected.to_bytes(), + Err(e) => { + warn!("Failed to read body from client: {e}"); + return Ok(response( + StatusCode::INTERNAL_SERVER_ERROR, + "Failed to read body", + )); + } + }; + + // Validate signature + let Some(sig) = sig_header else { + return Ok(response( + StatusCode::BAD_REQUEST, + "Missing signature header", + )); + }; + let mut components = sig.splitn(2, '='); + let Some(algo) = components.next() else { + return Ok(response( + StatusCode::BAD_REQUEST, + "Signature hash method missing", + )); + }; + let Some(hash) = components.next() else { + return Ok(response(StatusCode::BAD_REQUEST, "Signature hash missing")); + }; + let Ok(hash) = hex::decode(hash) else { + return Ok(response( + StatusCode::BAD_REQUEST, + "Invalid signature hash hex", + )); + }; + + if algo != "sha256" { + return Ok(response( + StatusCode::BAD_REQUEST, + "Invalid signature hash method", + )); + } + + let Ok(mut mac) = Hmac::<Sha256>::new_from_slice(webhook_secret.as_bytes()) else { + error!("Unable to create HMAC from secret"); + return Ok(response( + StatusCode::INTERNAL_SERVER_ERROR, + "Internal error", + )); + }; + mac.update(&raw); + if mac.verify_slice(hash.as_slice()).is_err() { + return Ok(response( + StatusCode::BAD_REQUEST, + "Signature verification failed", + )); + } + + // Parse body + let Some(ct) = content_type else { + return Ok(response( + StatusCode::BAD_REQUEST, + "No Content-Type header passed", + )); + }; + if !ct.contains("application/json") { + return Ok(response( + StatusCode::BAD_REQUEST, + "Content-Type is not application/json. Webhook misconfigured?", + )); + } + + let input = match serde_json::from_slice::<GenericWebhook>(&raw) { + Ok(i) => i, + Err(e) => { + error!("Invalid JSON received: {e}"); + return Ok(response(StatusCode::BAD_REQUEST, "Invalid JSON")); + } + }; + + // Build routing key + let Some(event_type) = event_type else { + return Ok(response(StatusCode::BAD_REQUEST, "Missing event type")); + }; + let routing_key = format!("{event_type}.{}", input.repository.full_name.to_lowercase()); + + // Publish message + let chan = chan.lock().await; + let _confirmation = chan + .basic_publish( + "github-events".into(), + routing_key.as_str().into(), + BasicPublishOptions::default(), + &raw, + BasicProperties::default() + .with_content_type("application/json".into()) + .with_delivery_mode(2), // persistent + ) + .await; + + Ok(empty_response(StatusCode::NO_CONTENT)) +} + +#[tokio::main] +async fn main() -> Result<(), Box<dyn Error + Send + Sync>> { + tickborg::setup_log(); + + let arg = env::args() + .nth(1) + .unwrap_or_else(|| panic!("usage: {} <config>", std::env::args().next().unwrap())); + let Some(cfg) = config::load(arg.as_ref()).github_webhook_receiver else { + error!("No GitHub Webhook configuration found!"); + panic!(); + }; + + let webhook_secret = std::fs::read_to_string(cfg.webhook_secret_file) + .expect("Unable to read webhook secret file"); + let webhook_secret = Arc::new(webhook_secret.trim().to_string()); + + let conn = easylapin::from_config(&cfg.rabbitmq).await?; + let mut chan = conn.create_channel().await?; + setup_amqp(&mut chan).await?; + let chan = Arc::new(Mutex::new(chan)); + + let addr: SocketAddr = cfg.listen.parse()?; + let listener = TcpListener::bind(addr).await?; + info!("Listening on {}", addr); + + loop { + let (stream, _) = listener.accept().await?; + let io = TokioIo::new(stream); + + let webhook_secret = webhook_secret.clone(); + let chan = chan.clone(); + + tokio::task::spawn(async move { + let service = + service_fn(move |req| handle_request(req, webhook_secret.clone(), chan.clone())); + + if let Err(err) = http1::Builder::new().serve_connection(io, service).await { + warn!("Error serving connection: {:?}", err); + } + }); + } +} diff --git a/ofborg/tickborg/src/bin/log-message-collector.rs b/ofborg/tickborg/src/bin/log-message-collector.rs new file mode 100644 index 0000000000..728a2d7f4d --- /dev/null +++ b/ofborg/tickborg/src/bin/log-message-collector.rs @@ -0,0 +1,83 @@ +use std::env; +use std::error::Error; +use std::path::PathBuf; + +use tracing::{error, info}; + +use tickborg::config; +use tickborg::easyamqp::{self, ChannelExt, ConsumerExt}; +use tickborg::easylapin; +use tickborg::tasks; + +#[tokio::main] +async fn main() -> Result<(), Box<dyn Error>> { + tickborg::setup_log(); + + let arg = env::args() + .nth(1) + .unwrap_or_else(|| panic!("usage: {} <config>", std::env::args().next().unwrap())); + let cfg = config::load(arg.as_ref()); + + let Some(collector_cfg) = config::load(arg.as_ref()).log_message_collector else { + error!("No log message collector configuration found!"); + panic!(); + }; + + let conn = easylapin::from_config(&collector_cfg.rabbitmq).await?; + let mut chan = conn.create_channel().await?; + + chan.declare_exchange(easyamqp::ExchangeConfig { + exchange: "logs".to_owned(), + exchange_type: easyamqp::ExchangeType::Topic, + passive: false, + durable: true, + auto_delete: false, + no_wait: false, + internal: false, + }) + .await?; + + let queue_name = "logs".to_owned(); + chan.declare_queue(easyamqp::QueueConfig { + queue: queue_name.clone(), + passive: false, + durable: false, + exclusive: true, + auto_delete: true, + no_wait: false, + }) + .await?; + + chan.bind_queue(easyamqp::BindQueueConfig { + queue: queue_name.clone(), + exchange: "logs".to_owned(), + routing_key: Some("*.*".to_owned()), + no_wait: false, + }) + .await?; + + // Regular channel, we want prefetching here. + let handle = chan + .consume( + tasks::log_message_collector::LogMessageCollector::new( + PathBuf::from(collector_cfg.logs_path), + 100, + ), + easyamqp::ConsumeConfig { + queue: queue_name.clone(), + consumer_tag: format!("{}-log-collector", cfg.whoami()), + no_local: false, + no_ack: false, + no_wait: false, + exclusive: false, + }, + ) + .await?; + + info!("Fetching jobs from {}", &queue_name); + handle.await; + + drop(conn); // Close connection. + info!("Closed the session... EOF"); + Ok(()) +} diff --git a/ofborg/tickborg/src/bin/logapi.rs b/ofborg/tickborg/src/bin/logapi.rs new file mode 100644 index 0000000000..c1750b76e7 --- /dev/null +++ b/ofborg/tickborg/src/bin/logapi.rs @@ -0,0 +1,151 @@ +use std::net::SocketAddr; +use std::{collections::HashMap, error::Error, path::PathBuf, sync::Arc}; + +use http::{Method, StatusCode}; +use http_body_util::Full; +use hyper::body::Bytes; +use hyper::server::conn::http1; +use hyper::service::service_fn; +use hyper::{Request, Response}; +use hyper_util::rt::TokioIo; +use tickborg::config; +use tokio::net::TcpListener; +use tracing::{error, info, warn}; + +#[derive(serde::Serialize, Default)] +struct Attempt { + metadata: Option<serde_json::Value>, + result: Option<serde_json::Value>, + log_url: Option<String>, +} + +#[derive(serde::Serialize)] +struct LogResponse { + attempts: HashMap<String, Attempt>, +} + +#[derive(Clone)] +struct LogApiConfig { + logs_path: String, + serve_root: String, +} + +fn response(status: StatusCode, body: &'static str) -> Response<Full<Bytes>> { + Response::builder() + .status(status) + .body(Full::new(Bytes::from(body))) + .unwrap() +} + +fn json_response(status: StatusCode, body: String) -> Response<Full<Bytes>> { + Response::builder() + .status(status) + .header("Content-Type", "application/json") + .body(Full::new(Bytes::from(body))) + .unwrap() +} + +async fn handle_request( + req: Request<hyper::body::Incoming>, + cfg: Arc<LogApiConfig>, +) -> Result<Response<Full<Bytes>>, hyper::Error> { + if req.method() != Method::GET { + return Ok(response(StatusCode::METHOD_NOT_ALLOWED, "")); + } + + let uri = req.uri().path().to_string(); + let Some(reqd) = uri.strip_prefix("/logs/").map(ToOwned::to_owned) else { + return Ok(response(StatusCode::NOT_FOUND, "invalid uri")); + }; + let path: PathBuf = [&cfg.logs_path, &reqd].iter().collect(); + let Ok(path) = std::fs::canonicalize(&path) else { + return Ok(response(StatusCode::NOT_FOUND, "absent")); + }; + let Ok(iter) = std::fs::read_dir(path) else { + return Ok(response(StatusCode::NOT_FOUND, "non dir")); + }; + + let mut attempts = HashMap::<String, Attempt>::new(); + for e in iter { + let Ok(e) = e else { continue }; + let e_metadata = e.metadata(); + if e_metadata.as_ref().map(|v| v.is_dir()).unwrap_or(true) { + return Ok(response(StatusCode::INTERNAL_SERVER_ERROR, "dir found")); + } + + if e_metadata.as_ref().map(|v| v.is_file()).unwrap_or_default() { + let Ok(file_name) = e.file_name().into_string() else { + warn!("entry filename is not a utf-8 string: {:?}", e.file_name()); + continue; + }; + + if file_name.ends_with(".metadata.json") || file_name.ends_with(".result.json") { + let Ok(file) = std::fs::File::open(e.path()) else { + warn!("could not open file: {file_name}"); + continue; + }; + let Ok(json) = serde_json::from_reader::<_, serde_json::Value>(file) else { + warn!("file is not a valid json file: {file_name}"); + continue; + }; + let Some(attempt_id) = json + .get("attempt_id") + .and_then(|v| v.as_str()) + .map(ToOwned::to_owned) + else { + warn!("attempt_id not found in file: {file_name}"); + continue; + }; + let attempt_obj = attempts.entry(attempt_id).or_default(); + if file_name.ends_with(".metadata.json") { + attempt_obj.metadata = Some(json); + } else { + attempt_obj.result = Some(json); + } + } else { + let attempt_obj = attempts.entry(file_name.clone()).or_default(); + attempt_obj.log_url = Some(format!("{}/{reqd}/{file_name}", &cfg.serve_root)); + } + } + } + + let body = serde_json::to_string(&LogResponse { attempts }).unwrap_or_default(); + Ok(json_response(StatusCode::OK, body)) +} + +#[tokio::main] +async fn main() -> Result<(), Box<dyn Error + Send + Sync>> { + tickborg::setup_log(); + + let arg = std::env::args() + .nth(1) + .unwrap_or_else(|| panic!("usage: {} <config>", std::env::args().next().unwrap())); + let Some(cfg) = config::load(arg.as_ref()).log_api_config else { + error!("No LogApi configuration found!"); + panic!(); + }; + + let api_cfg = Arc::new(LogApiConfig { + logs_path: cfg.logs_path, + serve_root: cfg.serve_root, + }); + + let addr: SocketAddr = cfg.listen.parse()?; + let listener = TcpListener::bind(addr).await?; + info!("Listening on {}", addr); + + loop { + let (stream, _) = listener.accept().await?; + let io = TokioIo::new(stream); + + let api_cfg = api_cfg.clone(); + + tokio::task::spawn(async move { + let service = service_fn(move |req| handle_request(req, api_cfg.clone())); + + if let Err(err) = http1::Builder::new().serve_connection(io, service).await { + warn!("Error serving connection: {:?}", err); + } + }); + } +} diff --git a/ofborg/tickborg/src/bin/mass-rebuilder.rs b/ofborg/tickborg/src/bin/mass-rebuilder.rs new file mode 100644 index 0000000000..0d5fdb0127 --- /dev/null +++ b/ofborg/tickborg/src/bin/mass-rebuilder.rs @@ -0,0 +1,73 @@ +use std::env; +use std::error::Error; +use std::path::Path; + +use tracing::{error, info}; + +use tickborg::checkout; +use tickborg::config; +use tickborg::easyamqp::{self, ChannelExt, ConsumerExt}; +use tickborg::easylapin; +use tickborg::stats; +use tickborg::tasks; + +#[tokio::main] +async fn main() -> Result<(), Box<dyn Error>> { + tickborg::setup_log(); + + let arg = env::args() + .nth(1) + .unwrap_or_else(|| panic!("usage: {} <config>", std::env::args().next().unwrap())); + let cfg = config::load(arg.as_ref()); + + let Some(rebuilder_cfg) = config::load(arg.as_ref()).mass_rebuilder else { + error!("No mass rebuilder configuration found!"); + panic!(); + }; + + let conn = easylapin::from_config(&rebuilder_cfg.rabbitmq).await?; + let mut chan = conn.create_channel().await?; + + let root = Path::new(&cfg.checkout.root); + let cloner = checkout::cached_cloner(&root.join(cfg.runner.instance.to_string())); + + let events = stats::RabbitMq::from_lapin(&cfg.whoami(), conn.create_channel().await?); + + let queue_name = String::from("mass-rebuild-check-jobs"); + chan.declare_queue(easyamqp::QueueConfig { + queue: queue_name.clone(), + passive: false, + durable: true, + exclusive: false, + auto_delete: false, + no_wait: false, + }) + .await?; + + let handle = easylapin::WorkerChannel(chan) + .consume( + tasks::evaluate::EvaluationWorker::new( + cloner, + cfg.github_app_vendingmachine(), + cfg.acl(), + cfg.runner.identity.clone(), + events, + ), + easyamqp::ConsumeConfig { + queue: queue_name.clone(), + consumer_tag: format!("{}-mass-rebuild-checker", cfg.whoami()), + no_local: false, + no_ack: false, + no_wait: false, + exclusive: false, + }, + ) + .await?; + + info!("Fetching jobs from {}", queue_name); + handle.await; + + drop(conn); // Close connection. + info!("Closed the session... EOF"); + Ok(()) +} diff --git a/ofborg/tickborg/src/bin/stats.rs b/ofborg/tickborg/src/bin/stats.rs new file mode 100644 index 0000000000..cf2193e00f --- /dev/null +++ b/ofborg/tickborg/src/bin/stats.rs @@ -0,0 +1,134 @@ +use std::env; +use std::error::Error; +use std::net::SocketAddr; +use std::sync::Arc; + +use http::StatusCode; +use http_body_util::Full; +use hyper::body::Bytes; +use hyper::server::conn::http1; +use hyper::service::service_fn; +use hyper::{Request, Response}; +use hyper_util::rt::TokioIo; +use tokio::net::TcpListener; +use tracing::{error, info, warn}; + +use tickborg::easyamqp::{ChannelExt, ConsumerExt}; +use tickborg::{config, easyamqp, easylapin, stats, tasks}; + +fn response(body: String) -> Response<Full<Bytes>> { + Response::builder() + .status(StatusCode::OK) + .body(Full::new(Bytes::from(body))) + .unwrap() +} + +async fn run_http_server( + addr: SocketAddr, + metrics: Arc<stats::MetricCollector>, +) -> Result<(), Box<dyn Error + Send + Sync>> { + let listener = TcpListener::bind(addr).await?; + info!("HTTP server listening on {}", addr); + + loop { + let (stream, _) = listener.accept().await?; + let io = TokioIo::new(stream); + + let metrics = metrics.clone(); + + tokio::task::spawn(async move { + let service = service_fn(move |_req: Request<hyper::body::Incoming>| { + let metrics = metrics.clone(); + async move { Ok::<_, hyper::Error>(response(metrics.prometheus_output())) } + }); + + if let Err(err) = http1::Builder::new().serve_connection(io, service).await { + warn!("Error serving connection: {:?}", err); + } + }); + } +} + +#[tokio::main] +async fn main() -> Result<(), Box<dyn Error>> { + tickborg::setup_log(); + + let arg = env::args() + .nth(1) + .unwrap_or_else(|| panic!("usage: {} <config>", std::env::args().next().unwrap())); + let cfg = config::load(arg.as_ref()); + + let Some(stats_cfg) = config::load(arg.as_ref()).stats else { + error!("No stats configuration found!"); + panic!(); + }; + + let conn = easylapin::from_config(&stats_cfg.rabbitmq).await?; + + let mut chan = conn.create_channel().await?; + + let events = stats::RabbitMq::from_lapin(&cfg.whoami(), conn.create_channel().await?); + + let metrics = Arc::new(stats::MetricCollector::new()); + let collector = tasks::statscollector::StatCollectorWorker::new(events, (*metrics).clone()); + + chan.declare_exchange(easyamqp::ExchangeConfig { + exchange: "stats".to_owned(), + exchange_type: easyamqp::ExchangeType::Fanout, + passive: false, + durable: true, + auto_delete: false, + no_wait: false, + internal: false, + }) + .await?; + + let queue_name = String::from("stats-events"); + chan.declare_queue(easyamqp::QueueConfig { + queue: queue_name.clone(), + passive: false, + durable: true, + exclusive: false, + auto_delete: false, + no_wait: false, + }) + .await?; + + chan.bind_queue(easyamqp::BindQueueConfig { + queue: queue_name.clone(), + exchange: "stats".to_owned(), + routing_key: None, + no_wait: false, + }) + .await?; + + let handle = chan + .consume( + collector, + easyamqp::ConsumeConfig { + queue: "stats-events".to_owned(), + consumer_tag: format!("{}-prometheus-stats-collector", cfg.whoami()), + no_local: false, + no_ack: false, + no_wait: false, + exclusive: false, + }, + ) + .await?; + + // Spawn HTTP server in a separate thread with its own tokio runtime + let metrics_clone = metrics.clone(); + std::thread::spawn(async move || { + let addr: SocketAddr = "0.0.0.0:9898".parse().unwrap(); + if let Err(e) = run_http_server(addr, metrics_clone).await { + error!("HTTP server error: {:?}", e); + } + }); + + info!("Fetching jobs from {}", &queue_name); + handle.await; + + drop(conn); // Close connection. + info!("Closed the session... EOF"); + Ok(()) +} diff --git a/ofborg/tickborg/src/buildtool.rs b/ofborg/tickborg/src/buildtool.rs new file mode 100644 index 0000000000..3171fac5a0 --- /dev/null +++ b/ofborg/tickborg/src/buildtool.rs @@ -0,0 +1,455 @@ +use crate::asynccmd::{AsyncCmd, SpawnedAsyncCmd}; +use crate::message::buildresult::BuildStatus; + +use std::fmt; +use std::fs; +use std::io::{BufRead, BufReader, Seek, SeekFrom}; +use std::path::Path; +use std::process::{Command, Stdio}; + +use tempfile::tempfile; + +/// Identifies which build system a project uses. +#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +pub enum BuildSystem { + CMake, + Meson, + Autotools, + Cargo, + Gradle, + Make, + Custom { command: String }, +} + +impl fmt::Display for BuildSystem { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + BuildSystem::CMake => write!(f, "cmake"), + BuildSystem::Meson => write!(f, "meson"), + BuildSystem::Autotools => write!(f, "autotools"), + BuildSystem::Cargo => write!(f, "cargo"), + BuildSystem::Gradle => write!(f, "gradle"), + BuildSystem::Make => write!(f, "make"), + BuildSystem::Custom { command } => write!(f, "custom({command})"), + } + } +} + +/// Project-specific build configuration. +#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] +pub struct ProjectBuildConfig { + pub name: String, + pub path: String, + pub build_system: BuildSystem, + pub build_timeout_seconds: u16, + pub configure_args: Vec<String>, + pub build_args: Vec<String>, + pub test_command: Option<Vec<String>>, +} + +/// The build executor — replaces ofborg's Nix struct. +#[derive(Clone, Debug)] +pub struct BuildExecutor { + pub build_timeout: u16, +} + +impl BuildExecutor { + pub fn new(build_timeout: u16) -> Self { + Self { build_timeout } + } + + /// Build a project using its configured build system. + pub fn build_project( + &self, + project_root: &Path, + config: &ProjectBuildConfig, + ) -> Result<fs::File, fs::File> { + let project_dir = project_root.join(&config.path); + let cmd = self.build_command(&project_dir, config); + self.run(cmd, true) + } + + /// Build a project asynchronously. + pub fn build_project_async( + &self, + project_root: &Path, + config: &ProjectBuildConfig, + ) -> SpawnedAsyncCmd { + let project_dir = project_root.join(&config.path); + let cmd = self.build_command(&project_dir, config); + AsyncCmd::new(cmd).spawn() + } + + /// Run tests for a project. + pub fn test_project( + &self, + project_root: &Path, + config: &ProjectBuildConfig, + ) -> Result<fs::File, fs::File> { + let project_dir = project_root.join(&config.path); + let cmd = self.test_command(&project_dir, config); + self.run(cmd, true) + } + + fn build_command(&self, project_dir: &Path, config: &ProjectBuildConfig) -> Command { + match &config.build_system { + BuildSystem::CMake => { + let build_dir = project_dir.join("build"); + let mut cmd = Command::new("cmake"); + cmd.arg("--build").arg(&build_dir); + cmd.args(["--config", "Release"]); + for arg in &config.build_args { + cmd.arg(arg); + } + cmd.current_dir(project_dir); + cmd + } + BuildSystem::Meson => { + let mut cmd = Command::new("meson"); + cmd.arg("compile"); + cmd.args(["-C", "build"]); + for arg in &config.build_args { + cmd.arg(arg); + } + cmd.current_dir(project_dir); + cmd + } + BuildSystem::Autotools => { + let mut cmd = Command::new("make"); + cmd.args(["-j", &num_cpus().to_string()]); + for arg in &config.build_args { + cmd.arg(arg); + } + cmd.current_dir(project_dir); + cmd + } + BuildSystem::Cargo => { + let mut cmd = Command::new("cargo"); + cmd.arg("build").arg("--release"); + for arg in &config.build_args { + cmd.arg(arg); + } + cmd.current_dir(project_dir); + cmd + } + BuildSystem::Gradle => { + let gradlew = project_dir.join("gradlew"); + let prog = if gradlew.exists() { + gradlew.to_string_lossy().to_string() + } else { + "gradle".to_string() + }; + let mut cmd = Command::new(prog); + cmd.arg("build"); + for arg in &config.build_args { + cmd.arg(arg); + } + cmd.current_dir(project_dir); + cmd + } + BuildSystem::Make => { + let mut cmd = Command::new("make"); + cmd.args(["-j", &num_cpus().to_string()]); + for arg in &config.build_args { + cmd.arg(arg); + } + cmd.current_dir(project_dir); + cmd + } + BuildSystem::Custom { command } => { + let mut cmd = Command::new("sh"); + cmd.args(["-c", command]); + for arg in &config.build_args { + cmd.arg(arg); + } + cmd.current_dir(project_dir); + cmd + } + } + } + + fn test_command(&self, project_dir: &Path, config: &ProjectBuildConfig) -> Command { + if let Some(ref test_cmd) = config.test_command { + let mut cmd = Command::new(&test_cmd[0]); + for arg in &test_cmd[1..] { + cmd.arg(arg); + } + cmd.current_dir(project_dir); + return cmd; + } + + match &config.build_system { + BuildSystem::CMake => { + let mut cmd = Command::new("ctest"); + cmd.arg("--test-dir").arg("build"); + cmd.args(["--output-on-failure"]); + cmd.current_dir(project_dir); + cmd + } + BuildSystem::Meson => { + let mut cmd = Command::new("meson"); + cmd.arg("test").args(["-C", "build"]); + cmd.current_dir(project_dir); + cmd + } + BuildSystem::Autotools | BuildSystem::Make => { + let mut cmd = Command::new("make"); + cmd.arg("check"); + cmd.current_dir(project_dir); + cmd + } + BuildSystem::Cargo => { + let mut cmd = Command::new("cargo"); + cmd.arg("test").arg("--release"); + cmd.current_dir(project_dir); + cmd + } + BuildSystem::Gradle => { + let gradlew = project_dir.join("gradlew"); + let prog = if gradlew.exists() { + gradlew.to_string_lossy().to_string() + } else { + "gradle".to_string() + }; + let mut cmd = Command::new(prog); + cmd.arg("test"); + cmd.current_dir(project_dir); + cmd + } + BuildSystem::Custom { command } => { + let mut cmd = Command::new("sh"); + cmd.args(["-c", command]); + cmd.current_dir(project_dir); + cmd + } + } + } + + pub fn run(&self, mut cmd: Command, keep_stdout: bool) -> Result<fs::File, fs::File> { + let stderr = tempfile().expect("Fetching a stderr tempfile"); + let mut reader = stderr.try_clone().expect("Cloning stderr to the reader"); + + let stdout: Stdio = if keep_stdout { + Stdio::from(stderr.try_clone().expect("Cloning stderr for stdout")) + } else { + Stdio::null() + }; + + let status = cmd + .stdout(stdout) + .stderr(Stdio::from(stderr)) + .status() + .expect("Running build command ..."); + + reader + .seek(SeekFrom::Start(0)) + .expect("Seeking to Start(0)"); + + if status.success() { + Ok(reader) + } else { + Err(reader) + } + } +} + +fn num_cpus() -> usize { + std::thread::available_parallelism() + .map(|n| n.get()) + .unwrap_or(4) +} + +pub fn lines_from_file(file: fs::File) -> Vec<String> { + BufReader::new(file) + .lines() + .map_while(Result::ok) + .collect() +} + +pub fn wait_for_build_status(spawned: SpawnedAsyncCmd) -> BuildStatus { + match spawned.wait() { + Ok(s) => match s.code() { + Some(0) => BuildStatus::Success, + Some(_code) => BuildStatus::Failure, + None => BuildStatus::UnexpectedError { + err: "process terminated by signal".into(), + }, + }, + Err(err) => BuildStatus::UnexpectedError { + err: format!("failed on interior command {err}"), + }, + } +} + +/// Known projects in the Project Tick monorepo. +pub fn known_projects() -> Vec<ProjectBuildConfig> { + vec![ + ProjectBuildConfig { + name: "mnv".into(), + path: "mnv".into(), + build_system: BuildSystem::Autotools, + build_timeout_seconds: 1800, + configure_args: vec![], + build_args: vec![], + test_command: Some(vec!["make".into(), "check".into()]), + }, + ProjectBuildConfig { + name: "cgit".into(), + path: "cgit".into(), + build_system: BuildSystem::Make, + build_timeout_seconds: 1800, + configure_args: vec![], + build_args: vec![], + test_command: None, + }, + ProjectBuildConfig { + name: "cmark".into(), + path: "cmark".into(), + build_system: BuildSystem::CMake, + build_timeout_seconds: 1800, + configure_args: vec![], + build_args: vec![], + test_command: None, + }, + ProjectBuildConfig { + name: "neozip".into(), + path: "neozip".into(), + build_system: BuildSystem::CMake, + build_timeout_seconds: 1800, + configure_args: vec![], + build_args: vec![], + test_command: None, + }, + ProjectBuildConfig { + name: "genqrcode".into(), + path: "genqrcode".into(), + build_system: BuildSystem::CMake, + build_timeout_seconds: 1800, + configure_args: vec![], + build_args: vec![], + test_command: None, + }, + ProjectBuildConfig { + name: "json4cpp".into(), + path: "json4cpp".into(), + build_system: BuildSystem::CMake, + build_timeout_seconds: 1800, + configure_args: vec![], + build_args: vec![], + test_command: None, + }, + ProjectBuildConfig { + name: "tomlplusplus".into(), + path: "tomlplusplus".into(), + build_system: BuildSystem::Meson, + build_timeout_seconds: 1800, + configure_args: vec![], + build_args: vec![], + test_command: None, + }, + ProjectBuildConfig { + name: "libnbtplusplus".into(), + path: "libnbtplusplus".into(), + build_system: BuildSystem::CMake, + build_timeout_seconds: 1800, + configure_args: vec![], + build_args: vec![], + test_command: None, + }, + ProjectBuildConfig { + name: "meshmc".into(), + path: "meshmc".into(), + build_system: BuildSystem::CMake, + build_timeout_seconds: 3600, + configure_args: vec![], + build_args: vec!["--config".into(), "Release".into()], + test_command: None, + }, + ProjectBuildConfig { + name: "forgewrapper".into(), + path: "forgewrapper".into(), + build_system: BuildSystem::Gradle, + build_timeout_seconds: 1800, + configure_args: vec![], + build_args: vec![], + test_command: None, + }, + ProjectBuildConfig { + name: "corebinutils".into(), + path: "corebinutils".into(), + build_system: BuildSystem::Make, + build_timeout_seconds: 1800, + configure_args: vec![], + build_args: vec![], + test_command: None, + }, + ] +} + +/// Look up a project by name. +pub fn find_project(name: &str) -> Option<ProjectBuildConfig> { + known_projects().into_iter().find(|p| p.name == name) +} + +/// Detect which projects changed based on modified file paths. +pub fn detect_changed_projects(changed_files: &[String]) -> Vec<String> { + let projects = known_projects(); + let mut changed = Vec::new(); + + for project in &projects { + let prefix = format!("{}/", project.path); + if changed_files.iter().any(|f| f.starts_with(&prefix)) { + changed.push(project.name.clone()); + } + } + + changed.sort(); + changed.dedup(); + changed +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_detect_changed_projects() { + let files = vec![ + "mnv/src/main.c".into(), + "mnv/Makefile.am".into(), + "meshmc/CMakeLists.txt".into(), + "README.md".into(), + ]; + let changed = detect_changed_projects(&files); + assert_eq!(changed, vec!["meshmc", "mnv"]); + } + + #[test] + fn test_detect_changed_projects_none() { + let files = vec!["README.md".into(), ".gitignore".into()]; + let changed = detect_changed_projects(&files); + assert!(changed.is_empty()); + } + + #[test] + fn test_find_project() { + assert!(find_project("meshmc").is_some()); + assert!(find_project("nonexistent").is_none()); + assert_eq!( + find_project("meshmc").unwrap().build_system, + BuildSystem::CMake + ); + assert_eq!( + find_project("forgewrapper").unwrap().build_system, + BuildSystem::Gradle + ); + } + + #[test] + fn test_build_system_display() { + assert_eq!(BuildSystem::CMake.to_string(), "cmake"); + assert_eq!(BuildSystem::Meson.to_string(), "meson"); + assert_eq!(BuildSystem::Cargo.to_string(), "cargo"); + assert_eq!(BuildSystem::Gradle.to_string(), "gradle"); + } +} diff --git a/ofborg/tickborg/src/checkout.rs b/ofborg/tickborg/src/checkout.rs new file mode 100644 index 0000000000..731e68d3f0 --- /dev/null +++ b/ofborg/tickborg/src/checkout.rs @@ -0,0 +1,340 @@ +use crate::clone::{self, GitClonable}; + +use std::ffi::{OsStr, OsString}; +use std::fs; +use std::io::Error; +use std::path::{Path, PathBuf}; +use std::process::{Command, Stdio}; + +use tracing::info; + +pub struct CachedCloner { + root: PathBuf, +} + +pub fn cached_cloner(path: &Path) -> CachedCloner { + CachedCloner { + root: path.to_path_buf(), + } +} + +pub struct CachedProject { + root: PathBuf, + clone_url: String, +} + +pub struct CachedProjectCo { + root: PathBuf, + id: String, + clone_url: String, + local_reference: PathBuf, +} + +impl CachedCloner { + pub fn project(&self, name: &str, clone_url: String) -> CachedProject { + // <root>/repo/<hash>/clone + // <root>/repo/<hash>/clone.lock + // <root>/repo/<hash>/<type>/<id> + // <root>/repo/<hash>/<type>/<id>.lock + + let mut new_root = self.root.clone(); + new_root.push("repo"); + new_root.push(format!("{:x}", md5::compute(name))); + + CachedProject { + root: new_root, + clone_url, + } + } +} + +impl CachedProject { + pub fn clone_for(&self, use_category: String, id: String) -> Result<CachedProjectCo, Error> { + self.prefetch_cache()?; + + let mut new_root = self.root.clone(); + new_root.push(use_category); + + Ok(CachedProjectCo { + root: new_root, + id, + clone_url: self.clone_from(), + local_reference: self.clone_to(), + }) + } + + fn prefetch_cache(&self) -> Result<PathBuf, Error> { + fs::create_dir_all(&self.root)?; + + self.clone_repo()?; + self.fetch_repo()?; + + Ok(self.clone_to()) + } +} + +impl CachedProjectCo { + pub fn checkout_origin_ref(&self, git_ref: &OsStr) -> Result<String, Error> { + let mut pref = OsString::from("origin/"); + pref.push(git_ref); + + self.checkout_ref(&pref) + } + + pub fn checkout_ref(&self, git_ref: &OsStr) -> Result<String, Error> { + fs::create_dir_all(&self.root)?; + + self.clone_repo()?; + self.fetch_repo()?; + self.clean()?; + self.checkout(git_ref)?; + + // let build_dir = self.build_dir(); + + let canonicalized = fs::canonicalize(self.clone_to()).unwrap(); + Ok(canonicalized.to_str().unwrap().to_string()) + } + + pub fn fetch_pr(&self, pr_id: u64) -> Result<(), Error> { + let mut lock = self.lock()?; + + info!("Fetching PR #{}", pr_id); + let result = Command::new("git") + .arg("fetch") + .arg("origin") + .arg(format!("+refs/pull/{pr_id}/head:pr")) + .current_dir(self.clone_to()) + .stdout(Stdio::null()) + .status()?; + + lock.unlock(); + + if result.success() { + Ok(()) + } else { + Err(Error::other("Failed to fetch PR")) + } + } + + pub fn commit_exists(&self, commit: &OsStr) -> bool { + let mut lock = self.lock().expect("Failed to lock"); + + info!("Checking if commit {:?} exists", commit); + let result = Command::new("git") + .arg("--no-pager") + .arg("show") + .arg(commit) + .current_dir(self.clone_to()) + .stdout(Stdio::null()) + .status() + .expect("git show <commit> failed"); + + lock.unlock(); + + result.success() + } + + pub fn merge_commit(&self, commit: &OsStr) -> Result<(), Error> { + let mut lock = self.lock()?; + + info!("Merging commit {:?}", commit); + let result = Command::new("git") + .arg("merge") + .arg("--no-gpg-sign") + .arg("-m") + .arg("Automatic merge for GrahamCOfBorg") + .arg(commit) + .current_dir(self.clone_to()) + .stdout(Stdio::null()) + .status()?; + + lock.unlock(); + + if result.success() { + Ok(()) + } else { + Err(Error::other("Failed to merge")) + } + } + + pub fn commit_messages_from_head(&self, commit: &str) -> Result<Vec<String>, Error> { + let mut lock = self.lock()?; + + let result = Command::new("git") + .arg("log") + .arg("--format=format:%s") + .arg(format!("HEAD..{commit}")) + .current_dir(self.clone_to()) + .output()?; + + lock.unlock(); + + if result.status.success() { + Ok(String::from_utf8_lossy(&result.stdout) + .lines() + .map(|l| l.to_owned()) + .collect()) + } else { + Err(Error::other( + String::from_utf8_lossy(&result.stderr).to_lowercase(), + )) + } + } + + pub fn files_changed_from_head(&self, commit: &str) -> Result<Vec<String>, Error> { + let mut lock = self.lock()?; + + let result = Command::new("git") + .arg("diff") + .arg("--name-only") + .arg(format!("HEAD...{commit}")) + .current_dir(self.clone_to()) + .output()?; + + lock.unlock(); + + if result.status.success() { + Ok(String::from_utf8_lossy(&result.stdout) + .lines() + .map(|l| l.to_owned()) + .collect()) + } else { + Err(Error::other( + String::from_utf8_lossy(&result.stderr).to_lowercase(), + )) + } + } +} + +impl clone::GitClonable for CachedProjectCo { + fn clone_from(&self) -> String { + self.clone_url.clone() + } + + fn clone_to(&self) -> PathBuf { + let mut clone_path = self.root.clone(); + clone_path.push(&self.id); + clone_path + } + + fn lock_path(&self) -> PathBuf { + let mut lock_path = self.root.clone(); + lock_path.push(format!("{}.lock", self.id)); + lock_path + } + + fn extra_clone_args(&self) -> Vec<&OsStr> { + let local_ref = self.local_reference.as_ref(); + vec![ + OsStr::new("--shared"), + OsStr::new("--reference-if-able"), + local_ref, + ] + } +} + +impl clone::GitClonable for CachedProject { + fn clone_from(&self) -> String { + self.clone_url.clone() + } + + fn clone_to(&self) -> PathBuf { + let mut clone_path = self.root.clone(); + clone_path.push("clone"); + clone_path + } + + fn lock_path(&self) -> PathBuf { + let mut clone_path = self.root.clone(); + clone_path.push("clone.lock"); + clone_path + } + + fn extra_clone_args(&self) -> Vec<&OsStr> { + vec![OsStr::new("--bare")] + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::test_scratch::TestScratch; + use std::path::{Path, PathBuf}; + use std::process::{Command, Stdio}; + + fn tpath(component: &str) -> PathBuf { + Path::new(env!("CARGO_MANIFEST_DIR")).join(component) + } + + fn make_pr_repo(bare: &Path, co: &Path) -> String { + let output = Command::new("bash") + .current_dir(tpath("./test-srcs")) + .arg("./make-pr.sh") + .arg(bare) + .arg(co) + .stdout(Stdio::piped()) + .output() + .expect("building the test PR failed"); + + let stderr = + String::from_utf8(output.stderr).unwrap_or_else(|err| format!("warning: {err}")); + println!("{stderr}"); + + let hash = String::from_utf8(output.stdout).expect("Should just be a hash"); + hash.trim().to_owned() + } + + #[test] + pub fn test_commit_msg_list() { + let workingdir = TestScratch::new_dir("test-test-commit-msg-list"); + + let bare = TestScratch::new_dir("bare-commit-messages"); + let mk_co = TestScratch::new_dir("mk-commit-messages"); + let hash = make_pr_repo(&bare.path(), &mk_co.path()); + + let cloner = cached_cloner(&workingdir.path()); + let project = cloner.project("commit-msg-list", bare.string()); + let working_co = project + .clone_for("testing-commit-msgs".to_owned(), "123".to_owned()) + .expect("clone should work"); + working_co + .checkout_origin_ref(OsStr::new("master")) + .unwrap(); + + let expect: Vec<String> = vec!["check out this cool PR".to_owned()]; + + assert_eq!( + working_co + .commit_messages_from_head(&hash) + .expect("fetching messages should work",), + expect + ); + } + + #[test] + pub fn test_files_changed_list() { + let workingdir = TestScratch::new_dir("test-test-files-changed-list"); + + let bare = TestScratch::new_dir("bare-files-changed"); + let mk_co = TestScratch::new_dir("mk-files-changed"); + let hash = make_pr_repo(&bare.path(), &mk_co.path()); + + let cloner = cached_cloner(&workingdir.path()); + let project = cloner.project("commit-files-changed-list", bare.string()); + let working_co = project + .clone_for("testing-files-changed".to_owned(), "123".to_owned()) + .expect("clone should work"); + working_co + .checkout_origin_ref(OsStr::new("master")) + .unwrap(); + + let expect: Vec<String> = vec!["default.nix".to_owned(), "hi another file".to_owned()]; + + assert_eq!( + working_co + .files_changed_from_head(&hash) + .expect("fetching files changed should work",), + expect + ); + } +} diff --git a/ofborg/tickborg/src/clone.rs b/ofborg/tickborg/src/clone.rs new file mode 100644 index 0000000000..0dcb71c2c5 --- /dev/null +++ b/ofborg/tickborg/src/clone.rs @@ -0,0 +1,173 @@ +use fs2::FileExt; + +use std::ffi::OsStr; +use std::fs; +use std::io::Error; +use std::path::PathBuf; +use std::process::{Command, Stdio}; + +use tracing::{debug, info, warn}; + +pub struct Lock { + lock: Option<fs::File>, +} + +impl Lock { + pub fn unlock(&mut self) { + self.lock = None + } +} + +pub trait GitClonable { + fn clone_from(&self) -> String; + fn clone_to(&self) -> PathBuf; + fn extra_clone_args(&self) -> Vec<&OsStr>; + + fn lock_path(&self) -> PathBuf; + + fn lock(&self) -> Result<Lock, Error> { + debug!("Locking {:?}", self.lock_path()); + + match fs::File::create(self.lock_path()) { + Err(e) => { + warn!("Failed to create lock file {:?}: {}", self.lock_path(), e); + Err(e) + } + Ok(lock) => match lock.lock_exclusive() { + Err(e) => { + warn!( + "Failed to get exclusive lock on file {:?}: {}", + self.lock_path(), + e + ); + Err(e) + } + Ok(_) => { + debug!("Got lock on {:?}", self.lock_path()); + Ok(Lock { lock: Some(lock) }) + } + }, + } + } + + fn clone_repo(&self) -> Result<(), Error> { + let mut lock = self.lock()?; + + if self.clone_to().is_dir() { + debug!("Found dir at {:?}, initial clone is done", self.clone_to()); + return Ok(()); + } + + info!( + "Initial cloning of {} to {:?}", + self.clone_from(), + self.clone_to() + ); + + let result = Command::new("git") + .arg("clone") + .args(self.extra_clone_args()) + .arg(self.clone_from()) + .arg(self.clone_to()) + .stdout(Stdio::null()) + .status()?; + + lock.unlock(); + + if result.success() { + Ok(()) + } else { + Err(Error::other(format!( + "Failed to clone from {:?} to {:?}", + self.clone_from(), + self.clone_to() + ))) + } + } + + fn fetch_repo(&self) -> Result<(), Error> { + let mut lock = self.lock()?; + + info!("Fetching from origin in {:?}", self.clone_to()); + let result = Command::new("git") + .arg("fetch") + .arg("origin") + .current_dir(self.clone_to()) + .stdout(Stdio::null()) + .status()?; + + lock.unlock(); + + if result.success() { + Ok(()) + } else { + Err(Error::other("Failed to fetch")) + } + } + + fn clean(&self) -> Result<(), Error> { + let mut lock = self.lock()?; + + debug!("git am --abort"); + Command::new("git") + .arg("am") + .arg("--abort") + .current_dir(self.clone_to()) + .stdout(Stdio::null()) + .stderr(Stdio::null()) + .status()?; + + debug!("git merge --abort"); + Command::new("git") + .arg("merge") + .arg("--abort") + .current_dir(self.clone_to()) + .stdout(Stdio::null()) + .stderr(Stdio::null()) + .status()?; + + debug!("git reset --hard"); + Command::new("git") + .arg("reset") + .arg("--hard") + .current_dir(self.clone_to()) + .stdout(Stdio::null()) + .status()?; + + debug!("git clean -x -d --force"); + Command::new("git") + .arg("clean") + .arg("-x") + .arg("-d") + .arg("--force") + .current_dir(self.clone_to()) + .stdout(Stdio::null()) + .status()?; + + lock.unlock(); + + Ok(()) + } + + fn checkout(&self, git_ref: &OsStr) -> Result<(), Error> { + let mut lock = self.lock()?; + + debug!("git checkout {:?}", git_ref); + let result = Command::new("git") + .arg("checkout") + // we don't care if its dirty + .arg("--force") + .arg(git_ref) + .current_dir(self.clone_to()) + .stdout(Stdio::null()) + .status()?; + + lock.unlock(); + + if result.success() { + Ok(()) + } else { + Err(Error::other("Failed to checkout")) + } + } +} diff --git a/ofborg/tickborg/src/commentparser.rs b/ofborg/tickborg/src/commentparser.rs new file mode 100644 index 0000000000..f255af85d8 --- /dev/null +++ b/ofborg/tickborg/src/commentparser.rs @@ -0,0 +1,289 @@ +use nom::IResult; +use nom::Parser; +use nom::branch::alt; +use nom::bytes::complete::tag; +use nom::bytes::complete::tag_no_case; +use nom::character::complete::multispace0; +use nom::character::complete::multispace1; +use nom::combinator::map; +use nom::multi::many1; +use nom::sequence::preceded; +use tracing::warn; + +pub fn parse(text: &str) -> Option<Vec<Instruction>> { + let instructions: Vec<Instruction> = text + .lines() + .flat_map(|s| match parse_line(s) { + Some(instructions) => instructions.into_iter(), + None => Vec::new().into_iter(), + }) + .collect(); + + if instructions.is_empty() { + None + } else { + Some(instructions) + } +} + +fn is_not_whitespace(c: char) -> bool { + !c.is_ascii_whitespace() +} + +fn normal_token(input: &str) -> IResult<&str, String> { + let (input, tokens) = + many1(nom::character::complete::satisfy(is_not_whitespace)).parse(input)?; + + let s: String = tokens.into_iter().collect(); + if s.eq_ignore_ascii_case("@tickbot") { + Err(nom::Err::Error(nom::error::Error::new( + input, + nom::error::ErrorKind::Tag, + ))) + } else { + Ok((input, s)) + } +} + +fn parse_command(input: &str) -> IResult<&str, Instruction> { + alt(( + preceded( + preceded(multispace0, tag("build")), + preceded( + multispace1, + map(many1(preceded(multispace0, normal_token)), |targets| { + Instruction::Build(Subset::Project, targets) + }), + ), + ), + preceded( + preceded(multispace0, tag("test")), + preceded( + multispace1, + map(many1(preceded(multispace0, normal_token)), |targets| { + Instruction::Test(targets) + }), + ), + ), + preceded(multispace0, map(tag("eval"), |_| Instruction::Eval)), + )) + .parse(input) +} + +fn parse_line_impl(input: &str) -> IResult<&str, Option<Vec<Instruction>>> { + let (input, _) = multispace0.parse(input)?; + + let result = map( + many1(preceded( + multispace0, + preceded( + tag_no_case("@tickbot"), + preceded(multispace1, parse_command), + ), + )), + |instructions| Some(instructions), + ) + .parse(input); + + match result { + Ok((rest, instructions)) => Ok((rest, instructions)), + Err(_e) => Ok((input, None)), + } +} + +pub fn parse_line(text: &str) -> Option<Vec<Instruction>> { + match parse_line_impl(text) { + Ok((_, res)) => res, + Err(e) => { + warn!("Failed parsing string '{}': result was {:?}", text, e); + None + } + } +} + +#[derive(PartialEq, Eq, Debug, Clone)] +pub enum Instruction { + Build(Subset, Vec<String>), + Test(Vec<String>), + Eval, +} + +#[allow(clippy::upper_case_acronyms)] +#[derive(serde::Serialize, serde::Deserialize, Debug, PartialEq, Eq, Clone)] +pub enum Subset { + Project, +} + +#[cfg(test)] +mod tests { + + use super::*; + + #[test] + fn parse_empty() { + assert_eq!(None, parse("")); + } + + #[test] + fn valid_trailing_instruction() { + assert_eq!( + Some(vec![Instruction::Eval]), + parse( + "/cc @samet for ^^ +@tickbot eval", + ) + ); + } + + #[test] + fn bogus_comment() { + assert_eq!(None, parse(":) :) :) @tickbot build hi")); + } + + #[test] + fn bogus_build_comment_empty_list() { + assert_eq!(None, parse("@tickbot build")); + } + + #[test] + fn eval_comment() { + assert_eq!(Some(vec![Instruction::Eval]), parse("@tickbot eval")); + } + + #[test] + fn eval_and_build_comment() { + assert_eq!( + Some(vec![ + Instruction::Eval, + Instruction::Build(Subset::Project, vec![String::from("meshmc")]), + ]), + parse("@tickbot eval @tickbot build meshmc") + ); + } + + #[test] + fn build_and_eval_and_build_comment() { + assert_eq!( + Some(vec![ + Instruction::Build(Subset::Project, vec![String::from("mnv")]), + Instruction::Eval, + Instruction::Build(Subset::Project, vec![String::from("meshmc")]), + ]), + parse( + " +@tickbot build mnv +@tickbot eval +@tickbot build meshmc", + ) + ); + } + + #[test] + fn complex_comment_with_paragraphs() { + assert_eq!( + Some(vec![ + Instruction::Build(Subset::Project, vec![String::from("mnv")]), + Instruction::Eval, + Instruction::Build(Subset::Project, vec![String::from("meshmc")]), + ]), + parse( + " +I like where you're going with this PR, so let's try it out! + +@tickbot build mnv + +I noticed though that the target branch was broken, which should be fixed. Let's eval again. + +@tickbot eval + +Also, just in case, let's try meshmc +@tickbot build meshmc", + ) + ); + } + + #[test] + fn build_and_eval_comment() { + assert_eq!( + Some(vec![ + Instruction::Build(Subset::Project, vec![String::from("meshmc")]), + Instruction::Eval, + ]), + parse("@tickbot build meshmc @tickbot eval") + ); + } + + #[test] + fn build_comment() { + assert_eq!( + Some(vec![Instruction::Build( + Subset::Project, + vec![String::from("meshmc"), String::from("mnv")] + ),]), + parse( + "@tickbot build meshmc mnv + +neozip", + ) + ); + } + + #[test] + fn test_comment() { + assert_eq!( + Some(vec![Instruction::Test( + vec![ + String::from("meshmc"), + String::from("mnv"), + String::from("neozip"), + ] + ),]), + parse("@tickbot test meshmc mnv neozip") + ); + } + + #[test] + fn build_comment_newlines() { + assert_eq!( + Some(vec![Instruction::Build( + Subset::Project, + vec![ + String::from("meshmc"), + String::from("mnv"), + String::from("neozip"), + ] + ),]), + parse("@tickbot build meshmc mnv neozip") + ); + } + + #[test] + fn build_comment_case_insensitive_tag() { + assert_eq!( + Some(vec![Instruction::Build( + Subset::Project, + vec![ + String::from("meshmc"), + String::from("mnv"), + String::from("neozip"), + ] + ),]), + parse("@TickBot build meshmc mnv neozip") + ); + } + + #[test] + fn build_comment_lower_package_case_retained() { + assert_eq!( + Some(vec![Instruction::Build( + Subset::Project, + vec![ + String::from("meshmc"), + String::from("mnv"), + String::from("json4cpp"), + ] + ),]), + parse("@tickbot build meshmc mnv json4cpp") + ); + } +} diff --git a/ofborg/tickborg/src/commitstatus.rs b/ofborg/tickborg/src/commitstatus.rs new file mode 100644 index 0000000000..6747f3b048 --- /dev/null +++ b/ofborg/tickborg/src/commitstatus.rs @@ -0,0 +1,103 @@ +use futures_util::future::TryFutureExt; +use tracing::warn; + +pub struct CommitStatus { + api: hubcaps::statuses::Statuses, + sha: String, + context: String, + description: String, + url: String, +} + +impl CommitStatus { + pub fn new( + api: hubcaps::statuses::Statuses, + sha: String, + context: String, + description: String, + url: Option<String>, + ) -> CommitStatus { + let mut stat = CommitStatus { + api, + sha, + context, + description, + url: "".to_owned(), + }; + + stat.set_url(url); + + stat + } + + pub fn set_url(&mut self, url: Option<String>) { + self.url = url.unwrap_or_else(|| String::from("")) + } + + pub async fn set_with_description( + &mut self, + description: &str, + state: hubcaps::statuses::State, + ) -> Result<(), CommitStatusError> { + self.set_description(description.to_owned()); + self.set(state).await + } + + pub fn set_description(&mut self, description: String) { + self.description = description; + } + + pub async fn set(&self, state: hubcaps::statuses::State) -> Result<(), CommitStatusError> { + let desc = if self.description.len() >= 140 { + warn!( + "description is over 140 char; truncating: {:?}", + &self.description + ); + self.description.chars().take(140).collect() + } else { + self.description.clone() + }; + self.api + .create( + self.sha.as_ref(), + &hubcaps::statuses::StatusOptions::builder(state) + .context(self.context.clone()) + .description(desc) + .target_url(self.url.clone()) + .build(), + ) + .map_ok(|_| ()) + .map_err(|e| CommitStatusError::from(e)) + .await?; + Ok(()) + } +} + +#[derive(Debug)] +pub enum CommitStatusError { + ExpiredCreds(hubcaps::Error), + MissingSha(hubcaps::Error), + Error(hubcaps::Error), + InternalError(String), +} + +impl From<hubcaps::Error> for CommitStatusError { + fn from(e: hubcaps::Error) -> CommitStatusError { + use http::status::StatusCode; + use hubcaps::Error; + match &e { + Error::Fault { code, error } + if code == &StatusCode::UNAUTHORIZED && error.message == "Bad credentials" => + { + CommitStatusError::ExpiredCreds(e) + } + Error::Fault { code, error } + if code == &StatusCode::UNPROCESSABLE_ENTITY + && error.message.starts_with("No commit found for SHA:") => + { + CommitStatusError::MissingSha(e) + } + _otherwise => CommitStatusError::Error(e), + } + } +} diff --git a/ofborg/tickborg/src/config.rs b/ofborg/tickborg/src/config.rs new file mode 100644 index 0000000000..7d7475e3b6 --- /dev/null +++ b/ofborg/tickborg/src/config.rs @@ -0,0 +1,387 @@ +use crate::acl; +use crate::buildtool::BuildExecutor; + +use std::collections::{HashMap, hash_map::Entry}; +use std::fmt; +use std::fs::File; +use std::io::Read; +use std::marker::PhantomData; +use std::path::{Path, PathBuf}; + +use hubcaps::{Credentials, Github, InstallationTokenGenerator, JWTCredentials}; +use rustls_pki_types::pem::PemObject as _; +use serde::de::{self, Deserializer}; +use tracing::{debug, error, info, warn}; + +/// Main tickborg configuration +#[derive(serde::Serialize, serde::Deserialize, Debug)] +pub struct Config { + /// Configuration for the webhook receiver + pub github_webhook_receiver: Option<GithubWebhookConfig>, + /// Configuration for the logapi receiver + pub log_api_config: Option<LogApiConfig>, + /// Configuration for the evaluation filter + pub evaluation_filter: Option<EvaluationFilter>, + /// Configuration for the GitHub comment filter + pub github_comment_filter: Option<GithubCommentFilter>, + /// Configuration for the GitHub comment poster + pub github_comment_poster: Option<GithubCommentPoster>, + /// Configuration for the mass rebuilder + pub mass_rebuilder: Option<MassRebuilder>, + /// Configuration for the builder + pub builder: Option<Builder>, + /// Configuration for the log message collector + pub log_message_collector: Option<LogMessageCollector>, + /// Configuration for the stats server + pub stats: Option<Stats>, + pub runner: RunnerConfig, + pub checkout: CheckoutConfig, + pub build: BuildConfig, + pub github_app: Option<GithubAppConfig>, +} + +/// Configuration for the webhook receiver +#[derive(serde::Serialize, serde::Deserialize, Debug)] +#[serde(deny_unknown_fields)] +pub struct GithubWebhookConfig { + /// Listen host/port + pub listen: String, + /// Path to the GitHub webhook secret + pub webhook_secret_file: String, + /// RabbitMQ broker to connect to + pub rabbitmq: RabbitMqConfig, +} + +fn default_logs_path() -> String { + "/var/log/tickborg".into() +} + +fn default_serve_root() -> String { + "https://logs.tickborg.project-tick.net/logfile".into() +} + +/// Configuration for logapi +#[derive(serde::Serialize, serde::Deserialize, Debug)] +#[serde(deny_unknown_fields)] +pub struct LogApiConfig { + /// Listen host/port + pub listen: String, + #[serde(default = "default_logs_path")] + pub logs_path: String, + #[serde(default = "default_serve_root")] + pub serve_root: String, +} + +/// Configuration for the evaluation filter +#[derive(serde::Serialize, serde::Deserialize, Debug)] +#[serde(deny_unknown_fields)] +pub struct EvaluationFilter { + /// RabbitMQ broker to connect to + pub rabbitmq: RabbitMqConfig, +} + +/// Configuration for the GitHub comment filter +#[derive(serde::Serialize, serde::Deserialize, Debug)] +#[serde(deny_unknown_fields)] +pub struct GithubCommentFilter { + /// RabbitMQ broker to connect to + pub rabbitmq: RabbitMqConfig, +} + +/// Configuration for the GitHub comment poster +#[derive(serde::Serialize, serde::Deserialize, Debug)] +#[serde(deny_unknown_fields)] +pub struct GithubCommentPoster { + /// RabbitMQ broker to connect to + pub rabbitmq: RabbitMqConfig, +} + +/// Configuration for the mass rebuilder +#[derive(serde::Serialize, serde::Deserialize, Debug)] +#[serde(deny_unknown_fields)] +pub struct MassRebuilder { + /// RabbitMQ broker to connect to + pub rabbitmq: RabbitMqConfig, +} + +/// Configuration for the builder +#[derive(serde::Serialize, serde::Deserialize, Debug)] +#[serde(deny_unknown_fields)] +pub struct Builder { + /// RabbitMQ broker to connect to + pub rabbitmq: RabbitMqConfig, +} + +/// Configuration for the log message collector +#[derive(serde::Serialize, serde::Deserialize, Debug)] +#[serde(deny_unknown_fields)] +pub struct LogMessageCollector { + /// RabbitMQ broker to connect to + pub rabbitmq: RabbitMqConfig, + /// Path where the logs reside + pub logs_path: String, +} + +/// Configuration for the stats exporter +#[derive(serde::Serialize, serde::Deserialize, Debug)] +#[serde(deny_unknown_fields)] +pub struct Stats { + /// RabbitMQ broker to connect to + pub rabbitmq: RabbitMqConfig, +} + +/// Configures the connection to a RabbitMQ instance +#[derive(serde::Serialize, serde::Deserialize, Debug, Clone)] +#[serde(deny_unknown_fields)] +pub struct RabbitMqConfig { + /// Whether or not to use SSL + pub ssl: bool, + /// Hostname to conenct to + pub host: String, + /// Virtual host to use (defaults to /) + pub virtualhost: Option<String>, + /// Username to connect with + pub username: String, + /// File to read the user password from. Contents are automatically stripped + pub password_file: PathBuf, +} + +#[derive(serde::Serialize, serde::Deserialize, Debug)] +pub struct BuildConfig { + #[serde(deserialize_with = "deserialize_one_or_many")] + pub system: Vec<String>, + pub build_timeout_seconds: u16, + /// Additional environment variables for build commands + pub extra_env: Option<HashMap<String, String>>, +} + +#[derive(serde::Serialize, serde::Deserialize, Debug, Clone)] +pub struct GithubAppConfig { + pub app_id: u64, + pub private_key: PathBuf, + pub oauth_client_id: String, + pub oauth_client_secret_file: PathBuf, +} + +const fn default_instance() -> u8 { + 1 +} + +#[derive(serde::Serialize, serde::Deserialize, Debug)] +pub struct RunnerConfig { + #[serde(default = "default_instance")] + pub instance: u8, + pub identity: String, + /// List of GitHub repos we feel responsible for + pub repos: Option<Vec<String>>, + /// Whether to use the `trusted_users` field or just allow everyone + #[serde(default = "Default::default")] + pub disable_trusted_users: bool, + /// List of users who are allowed to build on less sandboxed platforms + pub trusted_users: Option<Vec<String>>, + + /// If true, will create its own queue attached to the build job + /// exchange. This means that builders with this enabled will + /// trigger duplicate replies to the request for this + /// architecture. + /// + /// This should only be turned on for development. + pub build_all_jobs: Option<bool>, +} + +#[derive(serde::Serialize, serde::Deserialize, Debug)] +pub struct CheckoutConfig { + pub root: String, +} + +impl Config { + pub fn whoami(&self) -> String { + format!("{}-{}", self.runner.identity, self.build.system.join(",")) + } + + pub fn acl(&self) -> acl::Acl { + let repos = self + .runner + .repos + .clone() + .expect("fetching config's runner.repos"); + + let trusted_users = if self.runner.disable_trusted_users { + None + } else { + Some( + self.runner + .trusted_users + .clone() + .expect("fetching config's runner.trusted_users"), + ) + }; + + acl::Acl::new(repos, trusted_users) + } + + pub fn github(&self) -> Github { + let token = std::fs::read_to_string( + self.github_app + .clone() + .expect("No GitHub app configured") + .oauth_client_secret_file, + ) + .expect("Couldn't read from GitHub app token"); + let token = token.trim(); + Github::new( + "github.com/Project-Tick/tickborg", + Credentials::Client( + self.github_app + .clone() + .expect("No GitHub app configured") + .oauth_client_id, + token.to_owned(), + ), + ) + .expect("Unable to create a github client instance") + } + + pub fn github_app_vendingmachine(&self) -> GithubAppVendingMachine { + GithubAppVendingMachine { + conf: self.github_app.clone().unwrap(), + id_cache: HashMap::new(), + client_cache: HashMap::new(), + } + } + + pub fn build_executor(&self) -> BuildExecutor { + if self.build.build_timeout_seconds < 300 { + error!(?self.build.build_timeout_seconds, "Please set build_timeout_seconds to at least 300"); + panic!(); + } + + BuildExecutor::new( + self.build.build_timeout_seconds, + ) + } +} + +impl RabbitMqConfig { + pub fn as_uri(&self) -> Result<String, std::io::Error> { + let password = std::fs::read_to_string(&self.password_file).inspect_err(|_| { + error!( + "Unable to read RabbitMQ password file at {:?}", + self.password_file + ); + })?; + let uri = format!( + "{}://{}:{}@{}/{}", + if self.ssl { "amqps" } else { "amqp" }, + self.username, + password, + self.host, + self.virtualhost.clone().unwrap_or_else(|| "/".to_owned()), + ); + Ok(uri) + } +} + +pub fn load(filename: &Path) -> Config { + let mut file = File::open(filename).unwrap(); + let mut contents = String::new(); + file.read_to_string(&mut contents).unwrap(); + + let deserialized: Config = serde_json::from_str(&contents).unwrap(); + + deserialized +} + +pub struct GithubAppVendingMachine { + conf: GithubAppConfig, + id_cache: HashMap<(String, String), Option<u64>>, + client_cache: HashMap<u64, Github>, +} + +impl GithubAppVendingMachine { + fn useragent(&self) -> &'static str { + "github.com/Project-Tick/tickborg (app)" + } + + fn jwt(&self) -> JWTCredentials { + let pem = rustls_pki_types::PrivatePkcs1KeyDer::from_pem_file(&self.conf.private_key) + .expect("Unable to read private key"); + let private_key_der = pem.secret_pkcs1_der().to_vec(); + JWTCredentials::new(self.conf.app_id, private_key_der) + .expect("Unable to create JWTCredentials") + } + + async fn install_id_for_repo(&mut self, owner: &str, repo: &str) -> Option<u64> { + let useragent = self.useragent(); + let jwt = self.jwt(); + + let key = (owner.to_owned(), repo.to_owned()); + + match self.id_cache.entry(key) { + Entry::Occupied(entry) => *entry.get(), + Entry::Vacant(entry) => { + info!("Looking up install ID for {}/{}", owner, repo); + + let lookup_gh = Github::new(useragent, Credentials::JWT(jwt)).unwrap(); + + let v = match lookup_gh.app().find_repo_installation(owner, repo).await { + Ok(install_id) => { + debug!("Received install ID {:?}", install_id); + Some(install_id.id) + } + Err(e) => { + warn!("Error during install ID lookup: {:?}", e); + None + } + }; + *entry.insert(v) + } + } + } + + pub async fn for_repo<'a>(&'a mut self, owner: &str, repo: &str) -> Option<&'a Github> { + let useragent = self.useragent(); + let jwt = self.jwt(); + let install_id = self.install_id_for_repo(owner, repo).await?; + + Some(self.client_cache.entry(install_id).or_insert_with(|| { + Github::new( + useragent, + Credentials::InstallationToken(InstallationTokenGenerator::new(install_id, jwt)), + ) + .expect("Unable to create a github client instance") + })) + } +} + +// Copied from https://stackoverflow.com/a/43627388 +fn deserialize_one_or_many<'de, D>(deserializer: D) -> Result<Vec<String>, D::Error> +where + D: Deserializer<'de>, +{ + struct StringOrVec(PhantomData<Vec<String>>); + + impl<'de> de::Visitor<'de> for StringOrVec { + type Value = Vec<String>; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + formatter.write_str("string or list of strings") + } + + fn visit_str<E>(self, value: &str) -> Result<Self::Value, E> + where + E: de::Error, + { + Ok(vec![value.to_owned()]) + } + + fn visit_seq<S>(self, visitor: S) -> Result<Self::Value, S::Error> + where + S: de::SeqAccess<'de>, + { + serde::de::Deserialize::deserialize(de::value::SeqAccessDeserializer::new(visitor)) + } + } + + deserializer.deserialize_any(StringOrVec(PhantomData)) +} diff --git a/ofborg/tickborg/src/easyamqp.rs b/ofborg/tickborg/src/easyamqp.rs new file mode 100644 index 0000000000..2a84bb84ff --- /dev/null +++ b/ofborg/tickborg/src/easyamqp.rs @@ -0,0 +1,287 @@ +pub struct ConsumeConfig { + /// Specifies the name of the queue to consume from. + pub queue: String, + + /// Specifies the identifier for the consumer. The consumer tag is + /// local to a channel, so two clients can use the same consumer + /// tags. If this field is empty the server will generate a unique + /// tag. + /// + /// The client MUST NOT specify a tag that refers to an existing + /// consumer. Error code: not-allowed + pub consumer_tag: String, + + /// If the no-local field is set the server will not send messages + /// to the connection that published them. + pub no_local: bool, + + /// If this field is set the server does not expect + /// acknowledgements for messages. That is, when a message is + /// delivered to the client the server assumes the delivery will + /// succeed and immediately dequeues it. This functionality may + /// increase performance but at the cost of reliability. Messages + /// can get lost if a client dies before they are delivered to the + /// application. + pub no_ack: bool, + + /// Request exclusive consumer access, meaning only this consumer + /// can access the queue. + /// + /// The client MAY NOT gain exclusive access to a queue that + /// already has active consumers. Error code: access-refused + pub exclusive: bool, + + /// If set, the server will not respond to the method. The client + /// should not wait for a reply method. If the server could not + /// complete the method it will raise a channel or connection + /// exception. + pub no_wait: bool, +} + +pub struct BindQueueConfig { + /// Specifies the name of the queue to bind. + /// + /// The client MUST either specify a queue name or have previously + /// declared a queue on the same channel Error code: not-found + /// + /// The client MUST NOT attempt to bind a queue that does not + /// exist. Error code: not-found + pub queue: String, + + /// Name of the exchange to bind to. + /// + /// A client MUST NOT be allowed to bind a queue to a non-existent + /// exchange. Error code: not-found + /// + /// The server MUST accept a blank exchange name to mean the + /// default exchange. + pub exchange: String, + + /// Specifies the routing key for the binding. The routing key is + /// used for routing messages depending on the exchange + /// configuration. Not all exchanges use a routing key - refer to + /// the specific exchange documentation. If the queue name is + /// empty, the server uses the last queue declared on the channel. + /// If the routing key is also empty, the server uses this queue + /// name for the routing key as well. If the queue name is + /// provided but the routing key is empty, the server does the + /// binding with that empty routing key. The meaning of empty + /// routing keys depends on the exchange implementation. + /// + /// If a message queue binds to a direct exchange using routing + /// key K and a publisher sends the exchange a message with + /// routing key R, then the message MUST be passed to the message + /// queue if K = R. + pub routing_key: Option<String>, + + /// If set, the server will not respond to the method. The client + /// should not wait for a reply method. If the server could not + /// complete the method it will raise a channel or connection + /// exception. + pub no_wait: bool, +} + +pub enum ExchangeType { + Topic, + Headers, + Fanout, + Direct, + Custom(String), +} + +impl From<ExchangeType> for String { + fn from(exchange_type: ExchangeType) -> String { + match exchange_type { + ExchangeType::Topic => "topic".to_owned(), + ExchangeType::Headers => "headers".to_owned(), + ExchangeType::Fanout => "fanout".to_owned(), + ExchangeType::Direct => "direct".to_owned(), + ExchangeType::Custom(x) => x, + } + } +} + +pub struct ExchangeConfig { + /// Exchange names starting with "amq." are reserved for + /// pre-declared and standardised exchanges. The client MAY + /// declare an exchange starting with "amq." if the passive option + /// is set, or the exchange already exists. Error code: + /// access-refused + /// + /// The exchange name consists of a non-empty sequence of these + /// characters: letters, digits, hyphen, underscore, period, or + /// colon. Error code: precondition-failed + pub exchange: String, + + /// Each exchange belongs to one of a set of exchange types + /// implemented by the server. The exchange types define the + /// functionality of the exchange - i.e. how messages are routed + /// through it. It is not valid or meaningful to attempt to change + /// the type of an existing exchange. + /// + /// Exchanges cannot be redeclared with different types. The + /// client MUST not attempt to redeclare an existing exchange with + /// a different type than used in the original Exchange.Declare + /// method. Error code: not-allowed + /// + /// The client MUST NOT attempt to declare an exchange with a type + /// that the server does not support. Error code: command-invalid + pub exchange_type: ExchangeType, + + /// If set, the server will reply with Declare-Ok if the exchange + /// already exists with the same name, and raise an error if not. + /// The client can use this to check whether an exchange exists + /// without modifying the server state. When set, all other method + /// fields except name and no-wait are ignored. A declare with + /// both passive and no-wait has no effect. Arguments are compared + /// for semantic equivalence. + /// + /// If set, and the exchange does not already exist, the server + /// MUST raise a channel exception with reply code 404 (not + /// found). + /// + /// If not set and the exchange exists, the server MUST check that + /// the existing exchange has the same values for type, durable, + /// and arguments fields. The server MUST respond with Declare-Ok + /// if the requested exchange matches these fields, and MUST raise + /// a channel exception if not. + pub passive: bool, + + /// If set when creating a new exchange, the exchange will be + /// marked as durable. Durable exchanges remain active when a + /// server restarts. Non-durable exchanges (transient exchanges) + /// are purged if/when a server restarts. + /// + /// The server MUST support both durable and transient exchanges. + pub durable: bool, + + /// If set, the exchange is deleted when all queues have finished + /// using it. + /// + /// The server SHOULD allow for a reasonable delay between the + /// point when it determines that an exchange is not being used + /// (or no longer used), and the point when it deletes the + /// exchange. At the least it must allow a client to create an + /// exchange and then bind a queue to it, with a small but + /// non-zero delay between these two actions. + /// + /// The server MUST ignore the auto-delete field if the exchange + /// already exists. + pub auto_delete: bool, + + /// If set, the exchange may not be used directly by publishers, + /// but only when bound to other exchanges. Internal exchanges are + /// used to construct wiring that is not visible to applications. + pub internal: bool, + + /// If set, the server will not respond to the method. The client + /// should not wait for a reply method. If the server could not + /// complete the method it will raise a channel or connection + /// exception. + pub no_wait: bool, +} + +pub struct QueueConfig { + /// The queue name MAY be empty, in which case the server MUST + /// create a new queue with a unique generated name and return + /// this to the client in the Declare-Ok method. + /// + /// Queue names starting with "amq." are reserved for pre-declared + /// and standardised queues. The client MAY declare a queue + /// starting with "amq." if the passive option is set, or the + /// queue already exists. Error code: access-refused + /// + /// The queue name can be empty, or a sequence of these + /// characters: letters, digits, hyphen, underscore, period, or + /// colon. Error code: precondition-failed + pub queue: String, + + /// If set, the server will reply with Declare-Ok if the queue + /// already exists with the same name, and raise an error if not. + /// The client can use this to check whether a queue exists + /// without modifying the server state. When set, all other + /// method fields except name and no-wait are ignored. A declare + /// with both passive and no-wait has no effect. Arguments are + /// compared for semantic equivalence. + /// + /// The client MAY ask the server to assert that a queue exists + /// without creating the queue if not. If the queue does not + /// exist, the server treats this as a failure. Error code: + /// not-found + /// + /// If not set and the queue exists, the server MUST check that + /// the existing queue has the same values for durable, exclusive, + /// auto-delete, and arguments fields. The server MUST respond + /// with Declare-Ok if the requested queue matches these fields, + /// and MUST raise a channel exception if not. + pub passive: bool, + + /// If set when creating a new queue, the queue will be marked as + /// durable. Durable queues remain active when a server restarts. + /// Non-durable queues (transient queues) are purged if/when a + /// server restarts. Note that durable queues do not necessarily + /// hold persistent messages, although it does not make sense to + /// send persistent messages to a transient queue. + /// + /// The server MUST recreate the durable queue after a restart. + /// + /// The server MUST support both durable and transient queues. + pub durable: bool, + + /// Exclusive queues may only be accessed by the current + /// connection, and are deleted when that connection closes. + /// Passive declaration of an exclusive queue by other connections + /// are not allowed. + /// + /// The server MUST support both exclusive (private) and + /// non-exclusive (shared) queues. + /// The client MAY NOT attempt to use a queue that was declared as + /// exclusive by another still-open connection. Error code: + /// resource-locked + pub exclusive: bool, + + /// If set, the queue is deleted when all consumers have finished + /// using it. The last consumer can be cancelled either explicitly + /// or because its channel is closed. If there was no consumer + /// ever on the queue, it won't be deleted. Applications can + /// explicitly delete auto-delete queues using the Delete method + /// as normal. + /// + /// The server MUST ignore the auto-delete field if the queue + /// already exists. + pub auto_delete: bool, + + /// If set, the server will not respond to the method. The client + /// should not wait for a reply method. If the server could not + /// complete the method it will raise a channel or connection + /// exception. + pub no_wait: bool, +} + +pub trait ChannelExt { + type Error; + + fn declare_exchange( + &mut self, + config: ExchangeConfig, + ) -> impl std::future::Future<Output = Result<(), Self::Error>>; + fn declare_queue( + &mut self, + config: QueueConfig, + ) -> impl std::future::Future<Output = Result<(), Self::Error>>; + fn bind_queue( + &mut self, + config: BindQueueConfig, + ) -> impl std::future::Future<Output = Result<(), Self::Error>>; +} + +pub trait ConsumerExt<'a, C> { + type Error; + type Handle; + + fn consume( + self, + callback: C, + config: ConsumeConfig, + ) -> impl std::future::Future<Output = Result<Self::Handle, Self::Error>>; +} diff --git a/ofborg/tickborg/src/easylapin.rs b/ofborg/tickborg/src/easylapin.rs new file mode 100644 index 0000000000..56d90cad15 --- /dev/null +++ b/ofborg/tickborg/src/easylapin.rs @@ -0,0 +1,251 @@ +use std::pin::Pin; +use std::sync::Arc; + +use crate::config::RabbitMqConfig; +use crate::easyamqp::{ + BindQueueConfig, ChannelExt, ConsumeConfig, ConsumerExt, ExchangeConfig, ExchangeType, + QueueConfig, +}; +use crate::notifyworker::{NotificationReceiver, SimpleNotifyWorker}; +use crate::tickborg; +use crate::worker::{Action, SimpleWorker}; + +use lapin::message::Delivery; +use lapin::options::{ + BasicAckOptions, BasicConsumeOptions, BasicNackOptions, BasicPublishOptions, BasicQosOptions, + ExchangeDeclareOptions, QueueBindOptions, QueueDeclareOptions, +}; +use lapin::types::FieldTable; +use lapin::{BasicProperties, Channel, Connection, ConnectionProperties, ExchangeKind}; +use tokio_stream::StreamExt; +use tracing::{debug, trace}; + +pub async fn from_config(cfg: &RabbitMqConfig) -> Result<Connection, lapin::Error> { + let opts = ConnectionProperties::default() + .with_client_property("tickborg_version".into(), tickborg::VERSION.into()); + Connection::connect(&cfg.as_uri()?, opts).await +} + +impl ChannelExt for Channel { + type Error = lapin::Error; + + async fn declare_exchange(&mut self, config: ExchangeConfig) -> Result<(), Self::Error> { + let opts = ExchangeDeclareOptions { + passive: config.passive, + durable: config.durable, + auto_delete: config.auto_delete, + internal: config.internal, + nowait: config.no_wait, + }; + + let kind = match config.exchange_type { + ExchangeType::Topic => ExchangeKind::Topic, + ExchangeType::Fanout => ExchangeKind::Fanout, + _ => panic!("exchange kind"), + }; + self.exchange_declare(config.exchange.into(), kind, opts, FieldTable::default()) + .await?; + Ok(()) + } + + async fn declare_queue(&mut self, config: QueueConfig) -> Result<(), Self::Error> { + let opts = QueueDeclareOptions { + passive: config.passive, + durable: config.durable, + exclusive: config.exclusive, + auto_delete: config.auto_delete, + nowait: config.no_wait, + }; + + self.queue_declare(config.queue.into(), opts, FieldTable::default()) + .await?; + Ok(()) + } + + async fn bind_queue(&mut self, config: BindQueueConfig) -> Result<(), Self::Error> { + let opts = QueueBindOptions { + nowait: config.no_wait, + }; + + self.queue_bind( + config.queue.into(), + config.exchange.into(), + config.routing_key.unwrap_or_else(|| "".into()).into(), + opts, + FieldTable::default(), + ) + .await?; + Ok(()) + } +} + +impl<'a, W: SimpleWorker + 'a> ConsumerExt<'a, W> for Channel { + type Error = lapin::Error; + type Handle = Pin<Box<dyn Future<Output = ()> + 'a>>; + + async fn consume( + self, + mut worker: W, + config: ConsumeConfig, + ) -> Result<Self::Handle, Self::Error> { + let mut consumer = self + .basic_consume( + config.queue.into(), + config.consumer_tag.into(), + BasicConsumeOptions::default(), + FieldTable::default(), + ) + .await?; + Ok(Box::pin(async move { + while let Some(Ok(deliver)) = consumer.next().await { + debug!(?deliver.delivery_tag, "consumed delivery"); + let content_type = deliver.properties.content_type(); + let job = worker + .msg_to_job( + deliver.routing_key.as_str(), + &content_type.as_ref().map(|s| s.to_string()), + &deliver.data, + ) + .await + .expect("worker unexpected message consumed"); + + for action in worker.consumer(&job).await { + action_deliver(&self, &deliver, action) + .await + .expect("action deliver failure"); + } + debug!(?deliver.delivery_tag, "done"); + } + })) + } +} + +/// Same as a regular channel, but without prefetching, +/// used for services with multiple instances. +pub struct WorkerChannel(pub Channel); + +impl<'a, W: SimpleWorker + 'a> ConsumerExt<'a, W> for WorkerChannel { + type Error = lapin::Error; + type Handle = Pin<Box<dyn Future<Output = ()> + 'a>>; + + async fn consume(self, worker: W, config: ConsumeConfig) -> Result<Self::Handle, Self::Error> { + self.0.basic_qos(1, BasicQosOptions::default()).await?; + self.0.consume(worker, config).await + } +} + +pub struct ChannelNotificationReceiver { + channel: lapin::Channel, + deliver: Delivery, +} + +impl ChannelNotificationReceiver { + pub fn new(channel: lapin::Channel, deliver: Delivery) -> Self { + ChannelNotificationReceiver { channel, deliver } + } +} + +#[async_trait::async_trait] +impl NotificationReceiver for ChannelNotificationReceiver { + async fn tell(&self, action: Action) { + action_deliver(&self.channel, &self.deliver, action) + .await + .expect("action deliver failure"); + } +} + +// FIXME the consumer trait for SimpleWorker and SimpleNotifyWorker conflict, +// but one could probably be implemented in terms of the other instead. +pub struct NotifyChannel(pub Channel); + +impl<'a, W: SimpleNotifyWorker + 'a + Send> ConsumerExt<'a, W> for NotifyChannel { + type Error = lapin::Error; + type Handle = Pin<Box<dyn Future<Output = ()> + 'a + Send>>; + + async fn consume(self, worker: W, config: ConsumeConfig) -> Result<Self::Handle, Self::Error> { + self.0.basic_qos(1, BasicQosOptions::default()).await?; + + let mut consumer = self + .0 + .basic_consume( + config.queue.into(), + config.consumer_tag.into(), + BasicConsumeOptions::default(), + FieldTable::default(), + ) + .await?; + let chan = self.0; + Ok(Box::pin(async move { + while let Some(Ok(deliver)) = consumer.next().await { + let delivery_tag = deliver.delivery_tag; + debug!(?delivery_tag, "consumed delivery"); + let receiver = ChannelNotificationReceiver { + channel: chan.clone(), + deliver, + }; + + let content_type = receiver.deliver.properties.content_type(); + let job = worker + .msg_to_job( + receiver.deliver.routing_key.as_str(), + &content_type.as_ref().map(|s| s.to_string()), + &receiver.deliver.data, + ) + .expect("worker unexpected message consumed"); + + worker.consumer(job, Arc::new(receiver)).await; + debug!(?delivery_tag, "done"); + } + })) + } +} + +async fn action_deliver( + chan: &Channel, + deliver: &Delivery, + action: Action, +) -> Result<(), lapin::Error> { + match action { + Action::Ack => { + debug!(?deliver.delivery_tag, "action ack"); + chan.basic_ack(deliver.delivery_tag, BasicAckOptions::default()) + .await + } + Action::NackRequeue => { + debug!(?deliver.delivery_tag, "action nack requeue"); + let opts = BasicNackOptions { + requeue: true, + ..Default::default() + }; + chan.basic_nack(deliver.delivery_tag, opts).await + } + Action::NackDump => { + debug!(?deliver.delivery_tag, "action nack dump"); + chan.basic_nack(deliver.delivery_tag, BasicNackOptions::default()) + .await + } + Action::Publish(msg) => { + let exch = msg.exchange.as_deref().unwrap_or(""); + let key = msg.routing_key.as_deref().unwrap_or(""); + trace!(?exch, ?key, "action publish"); + + let mut props = BasicProperties::default().with_delivery_mode(2); // persistent. + + if let Some(s) = msg.content_type.as_deref() { + props = props.with_content_type(s.into()); + } + + let _confirmaton = chan + .basic_publish( + exch.into(), + key.into(), + BasicPublishOptions::default(), + &msg.content, + props, + ) + .await? + .await?; + Ok(()) + } + } +} diff --git a/ofborg/tickborg/src/evalchecker.rs b/ofborg/tickborg/src/evalchecker.rs new file mode 100644 index 0000000000..ac1b4f8d39 --- /dev/null +++ b/ofborg/tickborg/src/evalchecker.rs @@ -0,0 +1,62 @@ +use std::fs::File; +use std::io::Write; +use std::path::Path; +use std::process::Command; + +/// A generic check that can be run against a checkout +pub struct EvalChecker { + name: String, + command: String, + args: Vec<String>, +} + +impl EvalChecker { + pub fn new(name: &str, command: &str, args: Vec<String>) -> EvalChecker { + EvalChecker { + name: name.to_owned(), + command: command.to_owned(), + args, + } + } + + pub fn name(&self) -> &str { + &self.name + } + + pub fn execute(&self, path: &Path) -> Result<File, File> { + let output = Command::new(&self.command) + .args(&self.args) + .current_dir(path) + .output(); + + let tmp = tempfile::NamedTempFile::new().expect("Failed to create temp file"); + let tmp_path = tmp.into_temp_path().to_path_buf(); + + match output { + Ok(result) => { + let mut f = File::create(&tmp_path).expect("Failed to create output file"); + f.write_all(&result.stdout).ok(); + f.write_all(&result.stderr).ok(); + drop(f); + let file = File::open(&tmp_path).expect("Failed to open output file"); + if result.status.success() { + Ok(file) + } else { + Err(file) + } + } + Err(e) => { + let mut f = File::create(&tmp_path).expect("Failed to create output file"); + write!(f, "Failed to execute {}: {}", self.command, e).ok(); + drop(f); + Err(File::open(&tmp_path).expect("Failed to open output file")) + } + } + } + + pub fn cli_cmd(&self) -> String { + let mut cli = vec![self.command.clone()]; + cli.append(&mut self.args.clone()); + cli.join(" ") + } +} diff --git a/ofborg/tickborg/src/files.rs b/ofborg/tickborg/src/files.rs new file mode 100644 index 0000000000..9e329d83e5 --- /dev/null +++ b/ofborg/tickborg/src/files.rs @@ -0,0 +1,8 @@ +use std::fs::File; +use std::io::Read; + +pub fn file_to_str(f: &mut File) -> String { + let mut buffer = Vec::new(); + f.read_to_end(&mut buffer).expect("Reading eval output"); + String::from(String::from_utf8_lossy(&buffer)) +} diff --git a/ofborg/tickborg/src/ghevent/common.rs b/ofborg/tickborg/src/ghevent/common.rs new file mode 100644 index 0000000000..2079280b9c --- /dev/null +++ b/ofborg/tickborg/src/ghevent/common.rs @@ -0,0 +1,31 @@ +#[derive(serde::Serialize, serde::Deserialize, Debug)] +pub struct Comment { + pub body: String, + pub user: User, +} + +#[derive(serde::Serialize, serde::Deserialize, Debug)] +pub struct User { + pub login: String, +} + +#[derive(serde::Serialize, serde::Deserialize, Debug)] +pub struct Repository { + pub owner: User, + pub name: String, + pub full_name: String, + pub clone_url: String, +} + +#[derive(serde::Serialize, serde::Deserialize, Debug)] +pub struct Issue { + pub number: u64, +} + +/// A generic webhook that we received with minimal verification, only for handling in the GitHub +/// webhook receiver. +#[derive(serde::Serialize, serde::Deserialize, Debug)] +pub struct GenericWebhook { + /// The repository the event originated + pub repository: Repository, +} diff --git a/ofborg/tickborg/src/ghevent/issuecomment.rs b/ofborg/tickborg/src/ghevent/issuecomment.rs new file mode 100644 index 0000000000..32fe136722 --- /dev/null +++ b/ofborg/tickborg/src/ghevent/issuecomment.rs @@ -0,0 +1,19 @@ +use crate::ghevent::{Comment, Issue, Repository}; + +#[derive(serde::Serialize, serde::Deserialize, Debug)] +pub struct IssueComment { + pub action: IssueCommentAction, + pub comment: Comment, + pub repository: Repository, + pub issue: Issue, +} + +#[derive(serde::Serialize, serde::Deserialize, Debug, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum IssueCommentAction { + Created, + Pinned, + Unpinned, + Edited, + Deleted, +} diff --git a/ofborg/tickborg/src/ghevent/mod.rs b/ofborg/tickborg/src/ghevent/mod.rs new file mode 100644 index 0000000000..243758800a --- /dev/null +++ b/ofborg/tickborg/src/ghevent/mod.rs @@ -0,0 +1,9 @@ +mod common; +mod issuecomment; +mod pullrequestevent; + +pub use self::common::{Comment, GenericWebhook, Issue, Repository, User}; +pub use self::issuecomment::{IssueComment, IssueCommentAction}; +pub use self::pullrequestevent::{ + PullRequest, PullRequestAction, PullRequestEvent, PullRequestState, +}; diff --git a/ofborg/tickborg/src/ghevent/pullrequestevent.rs b/ofborg/tickborg/src/ghevent/pullrequestevent.rs new file mode 100644 index 0000000000..3f25201fe9 --- /dev/null +++ b/ofborg/tickborg/src/ghevent/pullrequestevent.rs @@ -0,0 +1,81 @@ +use crate::ghevent::Repository; + +#[derive(serde::Serialize, serde::Deserialize)] +pub struct PullRequestEvent { + pub action: PullRequestAction, + pub number: u64, + pub repository: Repository, + pub pull_request: PullRequest, + pub changes: Option<PullRequestChanges>, +} + +#[derive(serde::Serialize, serde::Deserialize, Debug)] +pub struct PullRequestChanges { + pub base: Option<BaseChange>, +} + +#[derive(serde::Serialize, serde::Deserialize, Debug)] +pub struct BaseChange { + #[serde(rename = "ref")] + pub git_ref: ChangeWas, + pub sha: ChangeWas, +} + +#[derive(serde::Serialize, serde::Deserialize, Debug, PartialEq, Eq)] +pub struct ChangeWas { + pub from: String, +} + +#[derive(serde::Serialize, serde::Deserialize, Debug, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum PullRequestState { + Open, + Closed, +} + +#[derive(serde::Serialize, serde::Deserialize, Debug, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum PullRequestAction { + Edited, + Opened, + Reopened, + Synchronize, + #[serde(other)] + Unknown, +} + +#[derive(serde::Serialize, serde::Deserialize, Debug)] +pub struct PullRequestRef { + #[serde(rename = "ref")] + pub git_ref: String, + pub sha: String, +} + +#[derive(serde::Serialize, serde::Deserialize, Debug)] +pub struct PullRequest { + pub state: PullRequestState, + pub base: PullRequestRef, + pub head: PullRequestRef, +} + +#[cfg(test)] +mod tests { + use super::*; + use serde_json; + + #[test] + fn test_parse_changed_base() { + let data = include_str!("../../test-srcs/events/pr-changed-base.json"); + + let pr: PullRequestEvent = serde_json::from_str(data).expect("Should properly deserialize"); + assert_eq!(pr.action, PullRequestAction::Edited); + } + + #[test] + fn test_parse_unknown_action() { + let data = include_str!("../../test-srcs/events/pr-converted-to-draft.json"); + + let pr: PullRequestEvent = serde_json::from_str(data).expect("Should properly deserialize"); + assert_eq!(pr.action, PullRequestAction::Unknown); + } +} diff --git a/ofborg/tickborg/src/lib.rs b/ofborg/tickborg/src/lib.rs new file mode 100644 index 0000000000..313ff34ebd --- /dev/null +++ b/ofborg/tickborg/src/lib.rs @@ -0,0 +1,103 @@ +#![recursion_limit = "512"] +// Replacing .map(|arch| arch.to_string()) +// with .map(systems::System::to_string) +// +// seems much less clear and I just don't like it :) +#![allow(clippy::redundant_closure)] + +use std::env; + +use tracing_subscriber::EnvFilter; +use tracing_subscriber::prelude::*; + +pub mod acl; +pub mod asynccmd; +pub mod buildtool; +pub mod checkout; +pub mod clone; +pub mod commentparser; +pub mod commitstatus; +pub mod config; +pub mod easyamqp; +pub mod easylapin; +pub mod evalchecker; +pub mod files; +pub mod ghevent; +pub mod locks; +pub mod message; +pub mod notifyworker; +pub mod stats; +pub mod systems; +pub mod tagger; +pub mod tasks; +pub mod test_scratch; +pub mod worker; +pub mod writetoline; + +pub mod tickborg { + pub use crate::acl; + pub use crate::asynccmd; + pub use crate::buildtool; + pub use crate::checkout; + pub use crate::clone; + pub use crate::commentparser; + pub use crate::commitstatus; + pub use crate::config; + pub use crate::easyamqp; + pub use crate::evalchecker; + pub use crate::files; + pub use crate::ghevent; + pub use crate::locks; + pub use crate::message; + pub use crate::notifyworker; + pub use crate::stats; + pub use crate::systems; + pub use crate::tagger; + pub use crate::tasks; + pub use crate::test_scratch; + pub use crate::worker; + pub use crate::writetoline; + + pub const VERSION: &str = env!("CARGO_PKG_VERSION"); + + pub fn partition_result<A, B>(results: Vec<Result<A, B>>) -> (Vec<A>, Vec<B>) { + let mut ok = Vec::new(); + let mut err = Vec::new(); + for result in results.into_iter() { + match result { + Ok(x) => { + ok.push(x); + } + Err(x) => { + err.push(x); + } + } + } + + (ok, err) + } +} + +pub fn setup_log() { + let filter_layer = EnvFilter::try_from_default_env() + .or_else(|_| EnvFilter::try_new("info")) + .unwrap(); + + let log_json = env::var("RUST_LOG_JSON").is_ok_and(|s| s == "1"); + + if log_json { + let fmt_layer = tracing_subscriber::fmt::layer().json(); + tracing_subscriber::registry() + .with(filter_layer) + .with(fmt_layer) + .init(); + } else { + let fmt_layer = tracing_subscriber::fmt::layer(); + tracing_subscriber::registry() + .with(filter_layer) + .with(fmt_layer) + .init(); + } + + tracing::info!("Logging configured"); +} diff --git a/ofborg/tickborg/src/locks.rs b/ofborg/tickborg/src/locks.rs new file mode 100644 index 0000000000..d1d2ee4788 --- /dev/null +++ b/ofborg/tickborg/src/locks.rs @@ -0,0 +1,25 @@ +use fs2::FileExt; + +use std::fs; +use std::io::Error; +use std::path::PathBuf; + +pub trait Lockable { + fn lock_path(&self) -> PathBuf; + + fn lock(&self) -> Result<Lock, Error> { + let lock = fs::File::create(self.lock_path())?; + lock.lock_exclusive()?; + Ok(Lock { lock: Some(lock) }) + } +} + +pub struct Lock { + lock: Option<fs::File>, +} + +impl Lock { + pub fn unlock(&mut self) { + self.lock = None + } +} diff --git a/ofborg/tickborg/src/maintainers.nix b/ofborg/tickborg/src/maintainers.nix new file mode 100644 index 0000000000..85dc8d85b4 --- /dev/null +++ b/ofborg/tickborg/src/maintainers.nix @@ -0,0 +1,118 @@ +{ changedattrsjson, changedpathsjson }: +let + pkgs = import ./. {}; + + changedattrs = builtins.fromJSON (builtins.readFile changedattrsjson); + changedpaths = builtins.fromJSON (builtins.readFile changedpathsjson); + + anyMatchingFile = filename: + let + matching = builtins.filter + (changed: pkgs.lib.strings.hasSuffix changed filename) + changedpaths; + in (builtins.length matching) > 0; + + anyMatchingFiles = files: + (builtins.length (builtins.filter anyMatchingFile files)) > 0; + + enrichedAttrs = builtins.map + (path: { + path = path; + name = builtins.concatStringsSep "." path; + }) + changedattrs; + + validPackageAttributes = builtins.filter + (pkg: + if (pkgs.lib.attrsets.hasAttrByPath pkg.path pkgs) + then (if (builtins.tryEval (pkgs.lib.attrsets.attrByPath pkg.path null pkgs)).success + then true + else builtins.trace "Failed to access ${pkg.name} even though it exists" false) + else builtins.trace "Failed to locate ${pkg.name}." false + ) + enrichedAttrs; + + attrsWithPackages = builtins.map + (pkg: pkg // { package = pkgs.lib.attrsets.attrByPath pkg.path null pkgs; }) + validPackageAttributes; + + attrsWithMaintainers = builtins.map + (pkg: pkg // { maintainers = (pkg.package.meta or {}).maintainers or []; }) + attrsWithPackages; + + attrsWeCanPing = builtins.filter + (pkg: if (builtins.length pkg.maintainers) > 0 + then true + else builtins.trace "Package has no maintainers: ${pkg.name}" false + ) + attrsWithMaintainers; + + relevantFilenames = drv: + (pkgs.lib.lists.unique + (builtins.map + (pos: pos.file) + (builtins.filter (x: x != null) + [ + (builtins.unsafeGetAttrPos "maintainers" (drv.meta or {})) + (builtins.unsafeGetAttrPos "src" drv) + # broken because name is always set by stdenv: + # # A hack to make `nix-env -qa` and `nix search` ignore broken packages. + # # TODO(@oxij): remove this assert when something like NixOS/nix#1771 gets merged into nix. + # name = assert validity.handled; name + lib.optionalString + #(builtins.unsafeGetAttrPos "name" drv) + (builtins.unsafeGetAttrPos "pname" drv) + (builtins.unsafeGetAttrPos "version" drv) + + # Use ".meta.position" for cases when most of the package is + # defined in a "common" section and the only place where + # reference to the file with a derivation the "pos" + # attribute. + # + # ".meta.position" has the following form: + # "pkgs/tools/package-management/nix/default.nix:155" + # We transform it to the following: + # { file = "pkgs/tools/package-management/nix/default.nix"; } + { file = pkgs.lib.head (pkgs.lib.splitString ":" (drv.meta.position or "")); } + ] + ))); + + attrsWithFilenames = builtins.map + (pkg: pkg // { filenames = relevantFilenames pkg.package; }) + attrsWithMaintainers; + + attrsWithModifiedFiles = builtins.filter + (pkg: anyMatchingFiles pkg.filenames) + attrsWithFilenames; + + listToPing = pkgs.lib.lists.flatten + (builtins.map + (pkg: + builtins.map (maintainer: { + handle = pkgs.lib.toLower maintainer.github; + packageName = pkg.name; + dueToFiles = pkg.filenames; + }) + pkg.maintainers + ) + attrsWithModifiedFiles); + + byMaintainer = pkgs.lib.lists.foldr + (ping: collector: collector // { "${ping.handle}" = [ { inherit (ping) packageName dueToFiles; } ] ++ (collector."${ping.handle}" or []); }) + {} + listToPing; + + textForPackages = packages: + pkgs.lib.strings.concatStringsSep ", " ( + builtins.map (pkg: pkg.packageName) + packages); + + textPerMaintainer = pkgs.lib.attrsets.mapAttrs + (maintainer: packages: "- @${maintainer} for ${textForPackages packages}") + byMaintainer; + + packagesPerMaintainer = pkgs.lib.attrsets.mapAttrs + (maintainer: packages: + builtins.map (pkg: pkg.packageName) + packages) + byMaintainer; +in packagesPerMaintainer diff --git a/ofborg/tickborg/src/maintainers.rs b/ofborg/tickborg/src/maintainers.rs new file mode 100644 index 0000000000..ff1bec0cee --- /dev/null +++ b/ofborg/tickborg/src/maintainers.rs @@ -0,0 +1,211 @@ +use crate::nix::Nix; + +use tempfile::NamedTempFile; + +use std::collections::{HashMap, HashSet}; +use std::io::Write; +use std::path::Path; + +#[derive(serde::Deserialize, Debug, Eq, PartialEq)] +pub struct ImpactedMaintainers(HashMap<Maintainer, Vec<Package>>); +pub struct MaintainersByPackage(pub HashMap<Package, HashSet<Maintainer>>); + +#[derive(serde::Deserialize, Clone, Debug, Eq, PartialEq, Hash)] +pub struct Maintainer(String); +impl<'a> From<&'a str> for Maintainer { + fn from(name: &'a str) -> Maintainer { + Maintainer(name.to_ascii_lowercase()) + } +} +#[derive(serde::Deserialize, Clone, Debug, Eq, PartialEq, Hash)] +pub struct Package(String); +impl<'a> From<&'a str> for Package { + fn from(name: &'a str) -> Package { + Package(name.to_owned()) + } +} + +#[derive(Debug)] +pub enum CalculationError { + DeserializeError(serde_json::Error), + Io(std::io::Error), + Utf8(std::string::FromUtf8Error), +} +impl From<serde_json::Error> for CalculationError { + fn from(e: serde_json::Error) -> CalculationError { + CalculationError::DeserializeError(e) + } +} +impl From<std::io::Error> for CalculationError { + fn from(e: std::io::Error) -> CalculationError { + CalculationError::Io(e) + } +} +impl From<std::string::FromUtf8Error> for CalculationError { + fn from(e: std::string::FromUtf8Error) -> CalculationError { + CalculationError::Utf8(e) + } +} + +impl ImpactedMaintainers { + pub fn calculate( + nix: &Nix, + checkout: &Path, + paths: &[String], + attributes: &[Vec<&str>], + ) -> Result<ImpactedMaintainers, CalculationError> { + let mut path_file = NamedTempFile::new()?; + let pathstr = serde_json::to_string(&paths)?; + write!(path_file, "{pathstr}")?; + + let mut attr_file = NamedTempFile::new()?; + let attrstr = serde_json::to_string(&attributes)?; + write!(attr_file, "{attrstr}")?; + + let mut argstrs: HashMap<&str, &str> = HashMap::new(); + argstrs.insert("changedattrsjson", attr_file.path().to_str().unwrap()); + argstrs.insert("changedpathsjson", path_file.path().to_str().unwrap()); + + let mut cmd = nix.safely_evaluate_expr_cmd( + checkout, + include_str!("./maintainers.nix"), + argstrs, + &[path_file.path(), attr_file.path()], + ); + + let ret = cmd.output()?; + + Ok(serde_json::from_str(&String::from_utf8(ret.stdout)?)?) + } + + pub fn maintainers(&self) -> Vec<&str> { + self.0 + .keys() + .map(|Maintainer(name)| name.as_str()) + .collect() + } + + pub fn maintainers_by_package(&self) -> MaintainersByPackage { + let mut bypkg = MaintainersByPackage(HashMap::new()); + + for (maintainer, packages) in self.0.iter() { + for package in packages.iter() { + bypkg + .0 + .entry(package.clone()) + .or_default() + .insert(maintainer.clone()); + } + } + + bypkg + } +} + +impl std::fmt::Display for ImpactedMaintainers { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + let mut is_first = true; + for (Maintainer(maintainer), packages) in &self.0 { + if is_first { + is_first = false; + } else { + f.write_str("\n")?; + } + + f.write_fmt(format_args!("{maintainer}"))?; + + let (first, rest) = { + let mut packages = packages.iter(); + (packages.next(), packages) + }; + if let Some(Package(package)) = first { + f.write_fmt(format_args!(": {package}"))?; + + for Package(package) in rest { + f.write_fmt(format_args!(", {package}"))?; + } + } + } + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::checkout::cached_cloner; + use crate::clone::GitClonable; + use crate::test_scratch::TestScratch; + use std::env; + use std::ffi::OsStr; + use std::path::{Path, PathBuf}; + use std::process::Command; + use std::process::Stdio; + + #[cfg(target_os = "linux")] + const SYSTEM: &str = "x86_64-linux"; + #[cfg(target_os = "macos")] + const SYSTEM: &str = "x86_64-darwin"; + + fn tpath(component: &str) -> PathBuf { + Path::new(env!("CARGO_MANIFEST_DIR")).join(component) + } + + fn make_pr_repo(bare: &Path, co: &Path) -> String { + let output = Command::new("bash") + .current_dir(tpath("./test-srcs")) + .arg("./make-maintainer-pr.sh") + .arg(bare) + .arg(co) + .stdout(Stdio::piped()) + .output() + .expect("building the test PR failed"); + + let stderr = + String::from_utf8(output.stderr).unwrap_or_else(|err| format!("warning: {err}")); + println!("{stderr}"); + + let hash = String::from_utf8(output.stdout).expect("Should just be a hash"); + hash.trim().to_owned() + } + + #[test] + fn example() { + let workingdir = TestScratch::new_dir("test-maintainers-example"); + + let bare = TestScratch::new_dir("test-maintainers-example-bare"); + let mk_co = TestScratch::new_dir("test-maintainers-example-co"); + let hash = make_pr_repo(&bare.path(), &mk_co.path()); + + let attributes = vec![vec!["foo", "bar", "packageA"]]; + + let cloner = cached_cloner(&workingdir.path()); + let project = cloner.project("maintainer-test", bare.string()); + + let working_co = project + .clone_for("testing-maintainer-list".to_owned(), "123".to_owned()) + .expect("clone should work"); + + working_co + .checkout_origin_ref(OsStr::new("master")) + .unwrap(); + + let paths = working_co.files_changed_from_head(&hash).unwrap(); + + working_co.checkout_ref(OsStr::new(&hash)).unwrap(); + + let remote = env::var("NIX_REMOTE").unwrap_or("".to_owned()); + let nix = Nix::new(SYSTEM.to_owned(), remote, 1800, None); + + let parsed = + ImpactedMaintainers::calculate(&nix, &working_co.clone_to(), &paths, &attributes); + + let mut expect = ImpactedMaintainers(HashMap::new()); + expect.0.insert( + Maintainer::from("test"), + vec![Package::from("foo.bar.packageA")], + ); + + assert_eq!(parsed.unwrap(), expect); + } +} diff --git a/ofborg/tickborg/src/message/buildjob.rs b/ofborg/tickborg/src/message/buildjob.rs new file mode 100644 index 0000000000..b09eae58bf --- /dev/null +++ b/ofborg/tickborg/src/message/buildjob.rs @@ -0,0 +1,55 @@ +use crate::commentparser::Subset; +use crate::message::{Pr, Repo}; + +#[derive(serde::Serialize, serde::Deserialize, Debug)] +pub struct BuildJob { + pub repo: Repo, + pub pr: Pr, + pub subset: Option<Subset>, + pub attrs: Vec<String>, + pub request_id: String, + pub logs: Option<ExchangeQueue>, // (Exchange, Routing Key) + pub statusreport: Option<ExchangeQueue>, // (Exchange, Routing Key) +} + +#[derive(serde::Serialize, serde::Deserialize, Debug)] +pub struct QueuedBuildJobs { + pub job: BuildJob, + pub architectures: Vec<String>, +} + +pub type ExchangeQueue = (Option<Exchange>, Option<RoutingKey>); +type Exchange = String; +type RoutingKey = String; + +impl BuildJob { + pub fn new( + repo: Repo, + pr: Pr, + subset: Subset, + attrs: Vec<String>, + logs: Option<ExchangeQueue>, + statusreport: Option<ExchangeQueue>, + request_id: String, + ) -> BuildJob { + let logbackrk = format!("{}.{}", repo.full_name.to_lowercase(), pr.number); + + BuildJob { + repo, + pr, + subset: Some(subset), + attrs, + logs: Some(logs.unwrap_or((Some("logs".to_owned()), Some(logbackrk)))), + statusreport: Some(statusreport.unwrap_or((Some("build-results".to_owned()), None))), + request_id, + } + } +} + +pub fn from(data: &[u8]) -> Result<BuildJob, serde_json::error::Error> { + serde_json::from_slice(data) +} + +pub struct Actions { + pub system: String, +} diff --git a/ofborg/tickborg/src/message/buildlogmsg.rs b/ofborg/tickborg/src/message/buildlogmsg.rs new file mode 100644 index 0000000000..1aed51829e --- /dev/null +++ b/ofborg/tickborg/src/message/buildlogmsg.rs @@ -0,0 +1,17 @@ +#[derive(serde::Serialize, serde::Deserialize, Debug, Clone)] +pub struct BuildLogMsg { + pub system: String, + pub identity: String, + pub attempt_id: String, + pub line_number: u64, + pub output: String, +} + +#[derive(serde::Serialize, serde::Deserialize, Debug, Clone)] +pub struct BuildLogStart { + pub system: String, + pub identity: String, + pub attempt_id: String, + pub attempted_attrs: Option<Vec<String>>, + pub skipped_attrs: Option<Vec<String>>, +} diff --git a/ofborg/tickborg/src/message/buildresult.rs b/ofborg/tickborg/src/message/buildresult.rs new file mode 100644 index 0000000000..122edacae3 --- /dev/null +++ b/ofborg/tickborg/src/message/buildresult.rs @@ -0,0 +1,225 @@ +use crate::message::{Pr, Repo}; + +use hubcaps::checks::Conclusion; + +#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq, Eq)] +pub enum BuildStatus { + Skipped, + Success, + Failure, + TimedOut, + HashMismatch, + UnexpectedError { err: String }, +} + +impl From<BuildStatus> for String { + fn from(status: BuildStatus) -> String { + match status { + BuildStatus::Skipped => "No attempt".into(), + BuildStatus::Success => "Success".into(), + BuildStatus::Failure => "Failure".into(), + BuildStatus::HashMismatch => "A fixed output derivation's hash was incorrect".into(), + BuildStatus::TimedOut => "Timed out, unknown build status".into(), + BuildStatus::UnexpectedError { ref err } => format!("Unexpected error: {err}"), + } + } +} + +impl From<BuildStatus> for Conclusion { + fn from(status: BuildStatus) -> Conclusion { + match status { + BuildStatus::Skipped => Conclusion::Skipped, + BuildStatus::Success => Conclusion::Success, + BuildStatus::Failure => Conclusion::Neutral, + BuildStatus::HashMismatch => Conclusion::Failure, + BuildStatus::TimedOut => Conclusion::Neutral, + BuildStatus::UnexpectedError { .. } => Conclusion::Neutral, + } + } +} + +pub struct LegacyBuildResult { + pub repo: Repo, + pub pr: Pr, + pub system: String, + pub output: Vec<String>, + pub attempt_id: String, + pub request_id: String, + pub status: BuildStatus, + pub skipped_attrs: Option<Vec<String>>, + pub attempted_attrs: Option<Vec<String>>, +} + +#[derive(serde::Serialize, serde::Deserialize, Debug)] +pub enum V1Tag { + V1, +} + +#[derive(serde::Serialize, serde::Deserialize, Debug)] +#[serde(untagged)] +pub enum BuildResult { + V1 { + tag: V1Tag, // use serde once all enum variants have a tag + repo: Repo, + pr: Pr, + system: String, + output: Vec<String>, + attempt_id: String, + request_id: String, + // removed success + status: BuildStatus, + skipped_attrs: Option<Vec<String>>, + attempted_attrs: Option<Vec<String>>, + }, + Legacy { + repo: Repo, + pr: Pr, + system: String, + output: Vec<String>, + attempt_id: String, + request_id: String, + success: Option<bool>, // replaced by status + status: Option<BuildStatus>, + skipped_attrs: Option<Vec<String>>, + attempted_attrs: Option<Vec<String>>, + }, +} + +impl BuildResult { + pub fn legacy(&self) -> LegacyBuildResult { + // TODO: replace this with simpler structs for specific usecases, since + // it's decouples the structs from serialization. These can be changed + // as long as we can translate all enum variants. + match *self { + BuildResult::Legacy { + ref repo, + ref pr, + ref system, + ref output, + ref attempt_id, + ref request_id, + ref attempted_attrs, + ref skipped_attrs, + .. + } => LegacyBuildResult { + repo: repo.to_owned(), + pr: pr.to_owned(), + system: system.to_owned(), + output: output.to_owned(), + attempt_id: attempt_id.to_owned(), + request_id: request_id.to_owned(), + status: self.status(), + attempted_attrs: attempted_attrs.to_owned(), + skipped_attrs: skipped_attrs.to_owned(), + }, + BuildResult::V1 { + ref repo, + ref pr, + ref system, + ref output, + ref attempt_id, + ref request_id, + ref attempted_attrs, + ref skipped_attrs, + .. + } => LegacyBuildResult { + repo: repo.to_owned(), + pr: pr.to_owned(), + system: system.to_owned(), + output: output.to_owned(), + attempt_id: attempt_id.to_owned(), + request_id: request_id.to_owned(), + status: self.status(), + attempted_attrs: attempted_attrs.to_owned(), + skipped_attrs: skipped_attrs.to_owned(), + }, + } + } + + pub fn pr(&self) -> Pr { + match self { + BuildResult::Legacy { pr, .. } => pr.to_owned(), + BuildResult::V1 { pr, .. } => pr.to_owned(), + } + } + + pub fn status(&self) -> BuildStatus { + match *self { + BuildResult::Legacy { + ref status, + ref success, + .. + } => status.to_owned().unwrap_or({ + // Fallback for old format. + match *success { + None => BuildStatus::Skipped, + Some(true) => BuildStatus::Success, + Some(false) => BuildStatus::Failure, + } + }), + BuildResult::V1 { ref status, .. } => status.to_owned(), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use serde_json; + + #[test] + fn v1_serialization() { + let input = r#"{"tag":"V1","repo":{"owner":"project-tick","name":"Project-Tick","full_name":"project-tick/Project-Tick","clone_url":"https://github.com/project-tick/Project-Tick.git"},"pr":{"target_branch":"master","number":42,"head_sha":"0000000000000000000000000000000000000000"},"system":"x86_64-linux","output":["unpacking sources"],"attempt_id":"attempt-id-foo","request_id":"bogus-request-id","status":"Success","skipped_attrs":["AAAAAASomeThingsFailToEvaluate"],"attempted_attrs":["hello"]}"#; + let result: BuildResult = serde_json::from_str(input).expect("result required"); + assert_eq!(result.status(), BuildStatus::Success); + let output = serde_json::to_string(&result).expect("json required"); + assert_eq!( + output, + r#"{"tag":"V1","repo":{"owner":"project-tick","name":"Project-Tick","full_name":"project-tick/Project-Tick","clone_url":"https://github.com/project-tick/Project-Tick.git"},"pr":{"target_branch":"master","number":42,"head_sha":"0000000000000000000000000000000000000000"},"system":"x86_64-linux","output":["unpacking sources"],"attempt_id":"attempt-id-foo","request_id":"bogus-request-id","status":"Success","skipped_attrs":["AAAAAASomeThingsFailToEvaluate"],"attempted_attrs":["hello"]}"#, + "json of: {:?}", + result + ); + } + + #[test] + fn legacy_serialization() { + let input = r#"{"repo":{"owner":"project-tick","name":"Project-Tick","full_name":"project-tick/Project-Tick","clone_url":"https://github.com/project-tick/Project-Tick.git"},"pr":{"target_branch":"master","number":42,"head_sha":"0000000000000000000000000000000000000000"},"system":"x86_64-linux","output":["unpacking sources"],"attempt_id":"attempt-id-foo","request_id":"bogus-request-id","success":true,"status":"Success","skipped_attrs":["AAAAAASomeThingsFailToEvaluate"],"attempted_attrs":["hello"]}"#; + let result: BuildResult = serde_json::from_str(input).expect("result required"); + assert_eq!(result.status(), BuildStatus::Success); + let output = serde_json::to_string(&result).expect("json required"); + assert_eq!( + output, + r#"{"repo":{"owner":"project-tick","name":"Project-Tick","full_name":"project-tick/Project-Tick","clone_url":"https://github.com/project-tick/Project-Tick.git"},"pr":{"target_branch":"master","number":42,"head_sha":"0000000000000000000000000000000000000000"},"system":"x86_64-linux","output":["unpacking sources"],"attempt_id":"attempt-id-foo","request_id":"bogus-request-id","success":true,"status":"Success","skipped_attrs":["AAAAAASomeThingsFailToEvaluate"],"attempted_attrs":["hello"]}"#, + "json of: {:?}", + result + ); + } + + #[test] + fn legacy_none_serialization() { + let input = r#"{"repo":{"owner":"project-tick","name":"Project-Tick","full_name":"project-tick/Project-Tick","clone_url":"https://github.com/project-tick/Project-Tick.git"},"pr":{"target_branch":"master","number":42,"head_sha":"0000000000000000000000000000000000000000"},"system":"x86_64-linux","output":[],"attempt_id":"attempt-id-foo","request_id":"bogus-request-id"}"#; + let result: BuildResult = serde_json::from_str(input).expect("result required"); + assert_eq!(result.status(), BuildStatus::Skipped); + let output = serde_json::to_string(&result).expect("json required"); + assert_eq!( + output, + r#"{"repo":{"owner":"project-tick","name":"Project-Tick","full_name":"project-tick/Project-Tick","clone_url":"https://github.com/project-tick/Project-Tick.git"},"pr":{"target_branch":"master","number":42,"head_sha":"0000000000000000000000000000000000000000"},"system":"x86_64-linux","output":[],"attempt_id":"attempt-id-foo","request_id":"bogus-request-id","success":null,"status":null,"skipped_attrs":null,"attempted_attrs":null}"#, + "json of: {:?}", + result + ); + } + + #[test] + fn legacy_no_status_serialization() { + let input = r#"{"repo":{"owner":"project-tick","name":"Project-Tick","full_name":"project-tick/Project-Tick","clone_url":"https://github.com/project-tick/Project-Tick.git"},"pr":{"target_branch":"master","number":42,"head_sha":"0000000000000000000000000000000000000000"},"system":"x86_64-linux","output":["unpacking sources"],"attempt_id":"attempt-id-foo","request_id":"bogus-request-id","success":true,"status":null,"skipped_attrs":["AAAAAASomeThingsFailToEvaluate"],"attempted_attrs":["hello"]}"#; + let result: BuildResult = serde_json::from_str(input).expect("result required"); + assert_eq!(result.status(), BuildStatus::Success); + let output = serde_json::to_string(&result).expect("json required"); + assert_eq!( + output, + r#"{"repo":{"owner":"project-tick","name":"Project-Tick","full_name":"project-tick/Project-Tick","clone_url":"https://github.com/project-tick/Project-Tick.git"},"pr":{"target_branch":"master","number":42,"head_sha":"0000000000000000000000000000000000000000"},"system":"x86_64-linux","output":["unpacking sources"],"attempt_id":"attempt-id-foo","request_id":"bogus-request-id","success":true,"status":null,"skipped_attrs":["AAAAAASomeThingsFailToEvaluate"],"attempted_attrs":["hello"]}"#, + "json of: {:?}", + result + ); + } +} diff --git a/ofborg/tickborg/src/message/common.rs b/ofborg/tickborg/src/message/common.rs new file mode 100644 index 0000000000..c8fcd16ea2 --- /dev/null +++ b/ofborg/tickborg/src/message/common.rs @@ -0,0 +1,14 @@ +#[derive(serde::Serialize, serde::Deserialize, Debug, Clone)] +pub struct Repo { + pub owner: String, + pub name: String, + pub full_name: String, + pub clone_url: String, +} + +#[derive(serde::Serialize, serde::Deserialize, Debug, Clone)] +pub struct Pr { + pub target_branch: Option<String>, + pub number: u64, + pub head_sha: String, +} diff --git a/ofborg/tickborg/src/message/evaluationjob.rs b/ofborg/tickborg/src/message/evaluationjob.rs new file mode 100644 index 0000000000..bd51546e4a --- /dev/null +++ b/ofborg/tickborg/src/message/evaluationjob.rs @@ -0,0 +1,29 @@ +use crate::message::{Pr, Repo}; +use crate::worker; + +pub fn from(data: &[u8]) -> Result<EvaluationJob, serde_json::error::Error> { + serde_json::from_slice(data) +} + +#[derive(serde::Serialize, serde::Deserialize, Debug)] +pub struct EvaluationJob { + pub repo: Repo, + pub pr: Pr, +} + +pub struct Actions {} + +impl Actions { + pub fn retry_later(&mut self, _job: &EvaluationJob) -> worker::Actions { + vec![worker::Action::NackRequeue] + } + + pub fn skip(&mut self, _job: &EvaluationJob) -> worker::Actions { + vec![worker::Action::Ack] + } + + pub fn done(&mut self, _job: &EvaluationJob, mut response: worker::Actions) -> worker::Actions { + response.push(worker::Action::Ack); + response + } +} diff --git a/ofborg/tickborg/src/message/mod.rs b/ofborg/tickborg/src/message/mod.rs new file mode 100644 index 0000000000..03551cd1ce --- /dev/null +++ b/ofborg/tickborg/src/message/mod.rs @@ -0,0 +1,7 @@ +pub mod buildjob; +pub mod buildlogmsg; +pub mod buildresult; +mod common; +pub mod evaluationjob; + +pub use self::common::{Pr, Repo}; diff --git a/ofborg/tickborg/src/nix.rs b/ofborg/tickborg/src/nix.rs new file mode 100644 index 0000000000..77aece6d6f --- /dev/null +++ b/ofborg/tickborg/src/nix.rs @@ -0,0 +1,893 @@ +use crate::asynccmd::{AsyncCmd, SpawnedAsyncCmd}; +use crate::message::buildresult::BuildStatus; +use crate::ofborg::partition_result; + +use std::collections::HashMap; +use std::env; +use std::ffi::OsStr; +use std::fmt; +use std::fs; +use std::io::{BufRead, BufReader, Seek, SeekFrom}; +use std::path::Path; +use std::process::{Command, Stdio}; + +use tempfile::tempfile; + +#[allow(clippy::upper_case_acronyms)] +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum File { + DefaultNixpkgs, + ReleaseNixOS, +} + +impl fmt::Display for File { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match *self { + File::DefaultNixpkgs => write!(f, "./default.nix"), + File::ReleaseNixOS => write!(f, "./nixos/release.nix"), + } + } +} + +#[derive(Clone, Debug)] +pub enum Operation { + Evaluate, + Instantiate, + Build, + QueryPackagesJson, + QueryPackagesOutputs, + NoOp { operation: Box<Operation> }, + Unknown { program: String }, +} + +impl Operation { + fn command(&self) -> Command { + match *self { + Operation::Evaluate => Command::new("nix-instantiate"), + Operation::Instantiate => Command::new("nix-instantiate"), + Operation::Build => Command::new("nix-build"), + Operation::QueryPackagesJson => Command::new("nix-env"), + Operation::QueryPackagesOutputs => Command::new("nix-env"), + Operation::NoOp { .. } => Command::new("echo"), + Operation::Unknown { ref program } => Command::new(program), + } + } + + fn args(&self, command: &mut Command) { + match *self { + Operation::Build => { + command.args([ + "--no-out-link", + "--keep-going", + "--option", + "extra-experimental-features", + "no-url-literals", + ]); + } + Operation::QueryPackagesJson => { + command.args([ + "--query", + "--available", + "--json", + "--option", + "extra-experimental-features", + "no-url-literals", + ]); + } + Operation::QueryPackagesOutputs => { + command.args([ + "--query", + "--available", + "--no-name", + "--attr-path", + "--out-path", + "--option", + "extra-experimental-features", + "no-url-literals", + ]); + } + Operation::NoOp { ref operation } => { + operation.args(command); + } + Operation::Evaluate => { + command.args([ + "--eval", + "--strict", + "--json", + "--option", + "extra-experimental-features", + "no-url-literals", + ]); + } + Operation::Instantiate => { + command.args(["--option", "extra-experimental-features", "no-url-literals"]); + } + _ => (), + }; + } +} + +impl fmt::Display for Operation { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match *self { + Operation::Build => write!(f, "nix-build"), + Operation::Instantiate => write!(f, "nix-instantiate"), + Operation::QueryPackagesJson => write!(f, "nix-env -qa --json"), + Operation::QueryPackagesOutputs => write!(f, "nix-env -qaP --no-name --out-path"), + Operation::NoOp { ref operation } => operation.fmt(f), + Operation::Unknown { ref program } => write!(f, "{}", program), + Operation::Evaluate => write!(f, "nix-instantiate --strict --json ..."), + } + } +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct Nix { + pub system: String, + remote: String, + build_timeout: u16, + limit_supported_systems: bool, + initial_heap_size: Option<String>, +} + +impl Nix { + pub fn new( + system: String, + remote: String, + build_timeout: u16, + initial_heap_size: Option<String>, + ) -> Nix { + Nix { + system, + remote, + build_timeout, + initial_heap_size, + limit_supported_systems: true, + } + } + + pub fn with_system(&self, system: String) -> Nix { + let mut n = self.clone(); + n.system = system; + n + } + + pub fn with_limited_supported_systems(&self) -> Nix { + let mut n = self.clone(); + n.limit_supported_systems = true; + n + } + + pub fn without_limited_supported_systems(&self) -> Nix { + let mut n = self.clone(); + n.limit_supported_systems = false; + n + } + + pub fn safely_partition_instantiable_attrs( + &self, + nixpkgs: &Path, + file: File, + attrs: Vec<String>, + ) -> (Vec<String>, Vec<(String, Vec<String>)>) { + let attr_instantiations: Vec<Result<String, (String, Vec<String>)>> = attrs + .into_iter() + .map( + |attr| match self.safely_instantiate_attrs(nixpkgs, file, vec![attr.clone()]) { + Ok(_) => Ok(attr), + Err(f) => Err((attr, lines_from_file(f))), + }, + ) + .collect(); + + partition_result(attr_instantiations) + } + + pub fn safely_instantiate_attrs( + &self, + nixpkgs: &Path, + file: File, + attrs: Vec<String>, + ) -> Result<fs::File, fs::File> { + let mut command = self.safe_command::<&OsStr>(&Operation::Instantiate, nixpkgs, &[], &[]); + self.set_attrs_command(&mut command, file, attrs); + self.run(command, true) + } + + pub fn safely_evaluate_expr_cmd( + &self, + nixpkgs: &Path, + expr: &str, + argstrs: HashMap<&str, &str>, + extra_paths: &[&Path], + ) -> Command { + let mut attrargs: Vec<String> = Vec::with_capacity(2 + (argstrs.len() * 3)); + attrargs.push("--expr".to_owned()); + attrargs.push(expr.to_owned()); + for (argname, argstr) in argstrs { + attrargs.push(String::from("--argstr")); + attrargs.push(argname.to_owned()); + attrargs.push(argstr.to_owned()); + } + + self.safe_command(&Operation::Evaluate, nixpkgs, &attrargs, extra_paths) + } + + pub fn safely_build_attrs( + &self, + nixpkgs: &Path, + file: File, + attrs: Vec<String>, + ) -> Result<fs::File, fs::File> { + let mut command = self.safe_command::<&OsStr>(&Operation::Build, nixpkgs, &[], &[]); + self.set_attrs_command(&mut command, file, attrs); + self.run(command, true) + } + + pub fn safely_build_attrs_async( + &self, + nixpkgs: &Path, + file: File, + attrs: Vec<String>, + ) -> SpawnedAsyncCmd { + let mut command = self.safe_command::<&OsStr>(&Operation::Build, nixpkgs, &[], &[]); + self.set_attrs_command(&mut command, file, attrs); + AsyncCmd::new(command).spawn() + } + + fn set_attrs_command(&self, command: &mut Command, file: File, attrs: Vec<String>) { + let mut args: Vec<String> = Vec::with_capacity(3 + (attrs.len() * 2)); + args.push(format!("{file}")); + for attr in attrs { + args.push(String::from("-A")); + args.push(attr); + } + if let File::ReleaseNixOS = file { + args.push(String::from("--arg")); + args.push(String::from("nixpkgs")); + args.push(String::from( + "{ outPath=./.; revCount=999999; shortRev=\"ofborg\"; rev=\"0000000000000000000000000000000000000000\"; }", + )); + } + command.args(args); + } + + pub fn safely( + &self, + op: &Operation, + nixpkgs: &Path, + args: Vec<String>, + keep_stdout: bool, + ) -> Result<fs::File, fs::File> { + self.run(self.safe_command(op, nixpkgs, &args, &[]), keep_stdout) + } + + pub fn run(&self, mut cmd: Command, keep_stdout: bool) -> Result<fs::File, fs::File> { + let stderr = tempfile().expect("Fetching a stderr tempfile"); + let mut reader = stderr.try_clone().expect("Cloning stderr to the reader"); + + let stdout: Stdio = if keep_stdout { + Stdio::from(stderr.try_clone().expect("Cloning stderr for stdout")) + } else { + Stdio::null() + }; + + let status = cmd + .stdout(stdout) + .stderr(Stdio::from(stderr)) + .status() + .expect("Running a program ..."); + + reader + .seek(SeekFrom::Start(0)) + .expect("Seeking to Start(0)"); + + if status.success() { + Ok(reader) + } else { + Err(reader) + } + } + + pub fn run_stderr_stdout(&self, mut cmd: Command) -> (bool, fs::File, fs::File) { + let stdout_file = tempfile().expect("Fetching a stdout tempfile"); + let mut stdout_reader = stdout_file + .try_clone() + .expect("Cloning stdout to the reader"); + + let stderr_file = tempfile().expect("Fetching a stderr tempfile"); + let mut stderr_reader = stderr_file + .try_clone() + .expect("Cloning stderr to the reader"); + + let status = cmd + .stdout(Stdio::from(stdout_file)) + .stderr(Stdio::from(stderr_file)) + .status() + .expect("Running a program ..."); + + stdout_reader + .seek(SeekFrom::Start(0)) + .expect("Seeking dout to Start(0)"); + stderr_reader + .seek(SeekFrom::Start(0)) + .expect("Seeking stderr to Start(0)"); + + (status.success(), stdout_reader, stderr_reader) + } + + pub fn safe_command<S>( + &self, + op: &Operation, + nixpkgs: &Path, + args: &[S], + safe_paths: &[&Path], + ) -> Command + where + S: AsRef<OsStr>, + { + let nixpkgspath = format!("ofborg-nixpkgs-pr={}", nixpkgs.display()); + let mut nixpath: Vec<String> = safe_paths + .iter() + .map(|path| format!("{}", path.display())) + .collect(); + nixpath.push(nixpkgspath); + + let mut command = op.command(); + op.args(&mut command); + + command.env_clear(); + command.current_dir(nixpkgs); + command.env("HOME", "/homeless-shelter"); + command.env("NIX_PATH", nixpath.join(":")); + command.env("NIX_REMOTE", &self.remote); + + if let Some(ref initial_heap_size) = self.initial_heap_size { + command.env("GC_INITIAL_HEAP_SIZE", initial_heap_size); + } + + let path = env::var("PATH").unwrap(); + command.env("PATH", path); + + command.args(["--show-trace"]); + command.args(["--option", "restrict-eval", "true"]); + command.args([ + "--option", + "build-timeout", + &format!("{}", self.build_timeout), + ]); + command.args(["--argstr", "system", &self.system]); + + if self.limit_supported_systems { + command.args([ + "--arg", + "supportedSystems", + &format!("[\"{}\"]", &self.system), + ]); + } + + command.args(args); + command + } +} + +fn lines_from_file(file: fs::File) -> Vec<String> { + BufReader::new(file) + .lines() + .map_while(Result::ok) + .filter(|msg| !is_user_setting_warning(msg)) + .collect() +} + +pub fn is_user_setting_warning(line: &str) -> bool { + let line = line.trim(); + line.starts_with("warning: ignoring the user-specified setting '") + && line.ends_with("because it is a restricted setting and you are not a trusted user") +} + +pub fn wait_for_build_status(spawned: SpawnedAsyncCmd) -> BuildStatus { + match spawned.wait() { + Ok(s) => match s.code() { + Some(0) => BuildStatus::Success, + Some(100) => BuildStatus::Failure, // nix permanent failure + Some(101) => BuildStatus::TimedOut, // nix build timedout + Some(102) => BuildStatus::HashMismatch, // Fixed Output Derivation's hash was wrong + Some(code) => BuildStatus::UnexpectedError { + err: format!("command failed with exit code {code}"), + }, + None => BuildStatus::UnexpectedError { + err: "unexpected build failure".into(), + }, + }, + Err(err) => BuildStatus::UnexpectedError { + err: format!("failed on interior command {err}"), + }, + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::env; + use std::path::PathBuf; + + #[cfg(target_os = "linux")] + const SYSTEM: &str = "x86_64-linux"; + #[cfg(target_os = "macos")] + const SYSTEM: &str = "x86_64-darwin"; + + fn nix() -> Nix { + let path = env::var("PATH").unwrap(); + let test_path = format!("{}/test-nix/bin:{path}", env!("CARGO_MANIFEST_DIR")); + unsafe { env::set_var("PATH", test_path) }; + let remote = env::var("NIX_REMOTE").unwrap_or("".to_owned()); + Nix::new(SYSTEM.to_owned(), remote, 1800, None) + } + + fn noop(operation: Operation) -> Operation { + Operation::NoOp { + operation: Box::new(operation), + } + } + + fn env_noop() -> Operation { + Operation::Unknown { + program: "./environment.sh".to_owned(), + } + } + + fn build_path() -> PathBuf { + let mut cwd = env::current_dir().unwrap(); + cwd.push(Path::new("./test-srcs/build")); + cwd + } + + fn passing_eval_path() -> PathBuf { + let mut cwd = env::current_dir().unwrap(); + cwd.push(Path::new("./test-srcs/eval")); + cwd + } + + fn individual_eval_path() -> PathBuf { + let mut cwd = env::current_dir().unwrap(); + cwd.push(Path::new("./test-srcs/eval-mixed-failure")); + cwd + } + + fn strip_ansi(string: &str) -> String { + string + .replace(['‘', '’'], "'") + .replace("\u{1b}[31;1m", "") // red + .replace("\u{1b}[0m", "") // reset + } + + #[derive(Debug)] + enum Expect { + Pass, + Fail, + } + + fn assert_run(res: Result<fs::File, fs::File>, expected: Expect, require: Vec<&str>) { + let expectation_held: bool = match expected { + Expect::Pass => res.is_ok(), + Expect::Fail => res.is_err(), + }; + + let file: fs::File = match res { + Ok(file) => file, + Err(file) => file, + }; + + let lines = lines_from_file(file); + + let buildlog = lines + .into_iter() + .map(|line| strip_ansi(&line)) + .map(|line| format!(" | {line}")) + .collect::<Vec<String>>() + .join("\n"); + + let total_requirements = require.len(); + let mut missed_requirements: usize = 0; + let requirements_held: Vec<Result<String, String>> = require + .into_iter() + .map(|line| line.to_owned()) + .map(|line| { + if buildlog.contains(&line) { + Ok(line) + } else { + missed_requirements += 1; + Err(line) + } + }) + .collect(); + + let mut prefixes: Vec<String> = vec!["".to_owned(), "".to_owned()]; + + if !expectation_held { + prefixes.push(format!( + "The run was expected to {:?}, but did not.", + expected + )); + prefixes.push("".to_owned()); + } else { + prefixes.push(format!("The run was expected to {expected:?}, and did.")); + prefixes.push("".to_owned()); + } + + let mut suffixes = vec![ + "".to_owned(), + format!( + "{} out of {} required lines matched.", + (total_requirements - missed_requirements), + total_requirements + ), + "".to_owned(), + ]; + + for expected_line in requirements_held { + suffixes.push(format!(" - {expected_line:?}")); + } + suffixes.push("".to_owned()); + + let output_blocks: Vec<Vec<String>> = + vec![prefixes, vec![buildlog, "".to_owned()], suffixes]; + + let output_blocks_strings: Vec<String> = output_blocks + .into_iter() + .map(|lines| lines.join("\n")) + .collect(); + + let output: String = output_blocks_strings.join("\n"); + + if expectation_held && missed_requirements == 0 { + } else { + panic!("{output}"); + } + } + + #[test] + fn test_build_operations() { + let nix = nix(); + let op = noop(Operation::Build); + assert_eq!(op.to_string(), "nix-build"); + + let ret: Result<fs::File, fs::File> = nix.run( + nix.safe_command(&op, build_path().as_path(), &["--version"], &[]), + true, + ); + + assert_run( + ret, + Expect::Pass, + vec!["--no-out-link --keep-going", "--version"], + ); + } + + #[test] + fn test_instantiate_operation() { + let nix = nix(); + let op = noop(Operation::Instantiate); + assert_eq!(op.to_string(), "nix-instantiate"); + + let ret: Result<fs::File, fs::File> = nix.run( + nix.safe_command(&op, build_path().as_path(), &["--version"], &[]), + true, + ); + + assert_run(ret, Expect::Pass, vec!["--version"]); + } + + #[test] + fn test_query_packages_json() { + let nix = nix(); + let op = noop(Operation::QueryPackagesJson); + assert_eq!(op.to_string(), "nix-env -qa --json"); + + let ret: Result<fs::File, fs::File> = nix.run( + nix.safe_command(&op, build_path().as_path(), &["--version"], &[]), + true, + ); + + assert_run( + ret, + Expect::Pass, + vec!["--query --available --json", "--version"], + ); + } + + #[test] + fn test_query_packages_outputs() { + let nix = nix(); + let op = noop(Operation::QueryPackagesOutputs); + assert_eq!(op.to_string(), "nix-env -qaP --no-name --out-path"); + + let ret: Result<fs::File, fs::File> = nix.run( + nix.safe_command(&op, build_path().as_path(), &["--version"], &[]), + true, + ); + + assert_run( + ret, + Expect::Pass, + vec![ + "--query --available --no-name --attr-path --out-path", + "--version", + ], + ); + } + + #[test] + fn safe_command_environment() { + let nix = nix(); + + let ret: Result<fs::File, fs::File> = nix.run( + nix.safe_command::<&OsStr>(&env_noop(), build_path().as_path(), &[], &[]), + true, + ); + + assert_run( + ret, + Expect::Pass, + vec![ + "HOME=/homeless-shelter", + "NIX_PATH=ofborg-nixpkgs-pr=", + "NIX_REMOTE=", + "PATH=", + ], + ); + } + + #[test] + fn safe_command_custom_gc() { + let remote = env::var("NIX_REMOTE").unwrap_or("".to_owned()); + let nix = Nix::new(SYSTEM.to_owned(), remote, 1800, Some("4g".to_owned())); + + let ret: Result<fs::File, fs::File> = nix.run( + nix.safe_command::<&OsStr>(&env_noop(), build_path().as_path(), &[], &[]), + true, + ); + + assert_run( + ret, + Expect::Pass, + vec![ + "HOME=/homeless-shelter", + "NIX_PATH=ofborg-nixpkgs-pr=", + "NIX_REMOTE=", + "PATH=", + "GC_INITIAL_HEAP_SIZE=4g", + ], + ); + } + + #[test] + fn safe_command_options() { + let nix = nix(); + let op = noop(Operation::Build); + + let ret: Result<fs::File, fs::File> = nix.run( + nix.safe_command::<&OsStr>(&op, build_path().as_path(), &[], &[]), + true, + ); + + assert_run( + ret, + Expect::Pass, + vec!["--option restrict-eval true", "--option build-timeout 1800"], + ); + } + + #[test] + fn set_attrs_nixpkgs() { + let nix = nix(); + let op = noop(Operation::Build); + + let mut command = nix.safe_command::<&OsStr>(&op, build_path().as_path(), &[], &[]); + nix.set_attrs_command( + &mut command, + File::DefaultNixpkgs, + vec!["foo".into(), "bar".into()], + ); + + let ret: Result<fs::File, fs::File> = nix.run(command, true); + + assert_run(ret, Expect::Pass, vec!["./default.nix", "-A foo -A bar"]); + } + + #[test] + fn set_attrs_nixos() { + let nix = nix(); + let op = noop(Operation::Instantiate); + + let mut command = nix.safe_command::<&OsStr>(&op, build_path().as_path(), &[], &[]); + nix.set_attrs_command( + &mut command, + File::ReleaseNixOS, + vec!["foo".into(), "bar".into()], + ); + + let ret: Result<fs::File, fs::File> = nix.run(command, true); + + assert_run( + ret, + Expect::Pass, + vec![ + "./nixos/release.nix", + "--arg nixpkgs { outPath=./.; revCount=999999; shortRev=\"ofborg\"; rev=\"0000000000000000000000000000000000000000\"; }", + ], + ); + } + + #[test] + fn safely_build_attrs_success() { + let nix = nix(); + + let ret: Result<fs::File, fs::File> = nix.safely_build_attrs( + build_path().as_path(), + File::DefaultNixpkgs, + vec![String::from("success")], + ); + + assert_run( + ret, + Expect::Pass, + vec!["-success.drv", "building ", "hi", "-success"], + ); + } + + #[test] + fn safely_build_attrs_failure() { + let nix = nix(); + + let ret: Result<fs::File, fs::File> = nix.safely_build_attrs( + build_path().as_path(), + File::DefaultNixpkgs, + vec![String::from("failed")], + ); + + assert_run( + ret, + Expect::Fail, + vec![ + "-failed.drv", + "building ", + "hi", + "failed to produce output path", + ], + ); + } + + #[test] + fn partition_instantiable_attributes() { + let nix = nix(); + + let ret: (Vec<String>, Vec<(String, Vec<String>)>) = nix + .safely_partition_instantiable_attrs( + individual_eval_path().as_path(), + File::DefaultNixpkgs, + vec![ + String::from("fails-instantiation"), + String::from("passes-instantiation"), + String::from("missing-attr"), + ], + ); + + assert_eq!(ret.0, vec!["passes-instantiation"]); + + assert_eq!(ret.1[0].0, "fails-instantiation"); + assert_eq!( + ret.1[0].1[0], + "trace: You just can't frooble the frozz on this particular system." + ); + + eprintln!("{:?}", ret.1[1].1); + assert_eq!(ret.1[1].0, "missing-attr"); + let s = strip_ansi(ret.1[1].1.last().unwrap()); + assert_eq!( + s.trim_start_matches("error: "), + "attribute 'missing-attr' in selection path 'missing-attr' not found" + ); + } + + #[test] + fn safely_instantiate_attrs_failure() { + let nix = nix(); + + let ret: Result<fs::File, fs::File> = nix.safely_instantiate_attrs( + individual_eval_path().as_path(), + File::DefaultNixpkgs, + vec![String::from("fails-instantiation")], + ); + + assert_run( + ret, + Expect::Fail, + vec!["You just can't", "assertion", "failed"], + ); + } + + #[test] + fn safely_instantiate_attrs_success() { + let nix = nix(); + + let ret: Result<fs::File, fs::File> = nix.safely_instantiate_attrs( + individual_eval_path().as_path(), + File::DefaultNixpkgs, + vec![String::from("passes-instantiation")], + ); + + assert_run(ret, Expect::Pass, vec!["-passes-instantiation.drv"]); + } + + #[test] + fn safely_evaluate_expr_success() { + let nix = nix(); + + let ret: Result<fs::File, fs::File> = nix.run( + nix.safely_evaluate_expr_cmd( + individual_eval_path().as_path(), + r#"{ foo ? "bar" }: "The magic value is ${foo}""#, + [("foo", "tux")].iter().cloned().collect(), + &[], + ), + true, + ); + + assert_run(ret, Expect::Pass, vec!["The magic value is tux"]); + } + + #[test] + fn strict_sandboxing() { + let ret: Result<fs::File, fs::File> = nix().safely_build_attrs( + build_path().as_path(), + File::DefaultNixpkgs, + vec![String::from("sandbox-violation")], + ); + + assert_run( + ret, + Expect::Fail, + vec!["access to absolute path", "is forbidden in restricted mode"], + ); + } + + #[test] + fn instantiation_success() { + let ret: Result<fs::File, fs::File> = nix().safely( + &Operation::Instantiate, + passing_eval_path().as_path(), + vec![], + true, + ); + + assert_run( + ret, + Expect::Pass, + vec![ + "the result might be removed by the garbage collector", + "-failed.drv", + "-success.drv", + ], + ); + } + + #[test] + fn instantiation_nixpkgs_restricted_mode() { + let ret: Result<fs::File, fs::File> = nix().safely( + &Operation::Instantiate, + individual_eval_path().as_path(), + vec![String::from("-A"), String::from("nixpkgs-restricted-mode")], + true, + ); + + assert_run( + ret, + Expect::Fail, + vec![ + "access to URI 'git+file:///fake", + "is forbidden in restricted mode", + ], + ); + } +} diff --git a/ofborg/tickborg/src/nixenv.rs b/ofborg/tickborg/src/nixenv.rs new file mode 100644 index 0000000000..675eecfed9 --- /dev/null +++ b/ofborg/tickborg/src/nixenv.rs @@ -0,0 +1,70 @@ +//! Evaluates the expression like Hydra would, with regards to +//! architecture support and recursed packages. + +use std::fs::File; +use std::io::{self, Read}; + +#[derive(Debug)] +pub enum Error { + Io(io::Error), + Internal(Box<dyn std::error::Error>), + CommandFailed(File), + StatsParse(File, Result<u64, io::Error>, serde_json::Error), + UncleanEvaluation(Vec<String>), +} + +impl From<io::Error> for Error { + fn from(e: io::Error) -> Error { + Error::Io(e) + } +} + +impl Error { + pub fn display(self) -> String { + match self { + Error::Io(err) => format!("Failed during the setup of executing nix-env: {err:?}"), + Error::Internal(err) => format!("Internal error: {err:?}"), + Error::CommandFailed(mut fd) => { + let mut buffer = Vec::new(); + let read_result = fd.read_to_end(&mut buffer); + let bufstr = String::from_utf8_lossy(&buffer); + + match read_result { + Ok(_) => format!("nix-env failed:\n{bufstr}"), + Err(err) => format!( + "nix-env failed and loading the error result caused a new error {err:?}\n\n{bufstr}" + ), + } + } + Error::UncleanEvaluation(warnings) => { + format!("nix-env did not evaluate cleanly:\n {warnings:?}") + } + Error::StatsParse(mut fd, seek, parse_err) => { + let mut buffer = Vec::new(); + let read_result = fd.read_to_end(&mut buffer); + let bufstr = String::from_utf8_lossy(&buffer); + + let mut lines = + String::from("Parsing nix-env's performance statistics failed.\n\n"); + + if let Err(seek_err) = seek { + lines.push_str(&format!( + "Additionally, resetting to the beginning of the output failed with:\n{seek_err:?}\n\n" + )); + } + + if let Err(read_err) = read_result { + lines.push_str(&format!( + "Additionally, loading the output failed with:\n{read_err:?}\n\n" + )); + } + + lines.push_str(&format!("Parse error:\n{parse_err:?}\n\n")); + + lines.push_str(&format!("Evaluation output:\n{bufstr}")); + + lines + } + } + } +} diff --git a/ofborg/tickborg/src/notifyworker.rs b/ofborg/tickborg/src/notifyworker.rs new file mode 100644 index 0000000000..f83ab04a1a --- /dev/null +++ b/ofborg/tickborg/src/notifyworker.rs @@ -0,0 +1,45 @@ +use std::sync::Arc; + +use crate::worker::Action; + +#[async_trait::async_trait] +pub trait SimpleNotifyWorker { + type J; + + async fn consumer( + &self, + job: Self::J, + notifier: Arc<dyn NotificationReceiver + std::marker::Send + std::marker::Sync>, + ); + + fn msg_to_job( + &self, + routing_key: &str, + content_type: &Option<String>, + body: &[u8], + ) -> Result<Self::J, String>; +} + +#[async_trait::async_trait] +pub trait NotificationReceiver { + async fn tell(&self, action: Action); +} + +#[derive(Default)] +pub struct DummyNotificationReceiver { + pub actions: parking_lot::Mutex<Vec<Action>>, +} + +impl DummyNotificationReceiver { + pub fn new() -> DummyNotificationReceiver { + Default::default() + } +} + +#[async_trait::async_trait] +impl NotificationReceiver for DummyNotificationReceiver { + async fn tell(&self, action: Action) { + let mut actions = self.actions.lock(); + actions.push(action); + } +} diff --git a/ofborg/tickborg/src/outpathdiff.rs b/ofborg/tickborg/src/outpathdiff.rs new file mode 100644 index 0000000000..511890d20d --- /dev/null +++ b/ofborg/tickborg/src/outpathdiff.rs @@ -0,0 +1,7 @@ +#[derive(Debug, PartialEq, Hash, Eq, Clone)] +pub struct PackageArch { + pub package: Package, + pub architecture: Architecture, +} +type Package = String; +type Architecture = String; diff --git a/ofborg/tickborg/src/stats.rs b/ofborg/tickborg/src/stats.rs new file mode 100644 index 0000000000..16705e6a68 --- /dev/null +++ b/ofborg/tickborg/src/stats.rs @@ -0,0 +1,57 @@ +use lapin::options::BasicPublishOptions; + +include!(concat!(env!("OUT_DIR"), "/events.rs")); + +#[macro_use] +mod macros { + #[macro_export] + macro_rules! my_macro(() => (FooBar)); +} + +pub trait SysEvents: Send { + fn notify(&mut self, event: Event) -> impl std::future::Future<Output = ()>; +} + +#[derive(serde::Serialize, serde::Deserialize, Debug)] +pub struct EventMessage { + pub sender: String, + pub events: Vec<Event>, +} + +pub struct RabbitMq<C> { + identity: String, + channel: C, +} + +impl RabbitMq<lapin::Channel> { + pub fn from_lapin(identity: &str, channel: lapin::Channel) -> Self { + RabbitMq { + identity: identity.to_owned(), + channel, + } + } +} + +impl SysEvents for RabbitMq<lapin::Channel> { + async fn notify(&mut self, event: Event) { + let props = lapin::BasicProperties::default().with_content_type("application/json".into()); + let _confirmaton = self + .channel + .basic_publish( + "stats".into(), + "".into(), + BasicPublishOptions::default(), + &serde_json::to_string(&EventMessage { + sender: self.identity.clone(), + events: vec![event], + }) + .unwrap() + .into_bytes(), + props, + ) + .await + .unwrap() + .await + .unwrap(); + } +} diff --git a/ofborg/tickborg/src/systems.rs b/ofborg/tickborg/src/systems.rs new file mode 100644 index 0000000000..36f5f32fff --- /dev/null +++ b/ofborg/tickborg/src/systems.rs @@ -0,0 +1,74 @@ +#[derive(Clone, Debug, PartialEq, Eq, Hash)] +pub enum System { + X8664Linux, + Aarch64Linux, + X8664Darwin, + Aarch64Darwin, + X8664Windows, + Aarch64Windows, + X8664FreeBSD, +} + +impl System { + pub fn all_known_systems() -> Vec<Self> { + vec![ + Self::X8664Linux, + Self::Aarch64Linux, + Self::X8664Darwin, + Self::Aarch64Darwin, + Self::X8664Windows, + Self::Aarch64Windows, + Self::X8664FreeBSD, + ] + } + + /// The primary CI platforms (Linux + macOS + Windows x86_64) + pub fn primary_systems() -> Vec<Self> { + vec![ + Self::X8664Linux, + Self::X8664Darwin, + Self::X8664Windows, + ] + } + + /// Systems that can run full test suites + pub fn can_run_tests(&self) -> bool { + matches!( + self, + System::X8664Linux | System::Aarch64Linux | System::X8664Darwin | System::Aarch64Darwin | System::X8664Windows + ) + } + + /// GitHub Actions runner label for this system + pub fn runner_label(&self) -> &'static str { + match self { + System::X8664Linux => "ubuntu-latest", + System::Aarch64Linux => "ubuntu-24.04-arm", + System::X8664Darwin => "macos-15", + System::Aarch64Darwin => "macos-15", + System::X8664Windows => "windows-2025", + System::Aarch64Windows => "windows-2025-arm", + System::X8664FreeBSD => "ubuntu-latest", // cross-compile or VM + } + } +} + +impl std::fmt::Display for System { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + match self { + System::X8664Linux => write!(f, "x86_64-linux"), + System::Aarch64Linux => write!(f, "aarch64-linux"), + System::X8664Darwin => write!(f, "x86_64-darwin"), + System::Aarch64Darwin => write!(f, "aarch64-darwin"), + System::X8664Windows => write!(f, "x86_64-windows"), + System::Aarch64Windows => write!(f, "aarch64-windows"), + System::X8664FreeBSD => write!(f, "x86_64-freebsd"), + } + } +} + +impl System { + pub fn as_build_destination(&self) -> (Option<String>, Option<String>) { + (None, Some(format!("build-inputs-{self}"))) + } +} diff --git a/ofborg/tickborg/src/tagger.rs b/ofborg/tickborg/src/tagger.rs new file mode 100644 index 0000000000..e718901ded --- /dev/null +++ b/ofborg/tickborg/src/tagger.rs @@ -0,0 +1,87 @@ +use crate::buildtool::detect_changed_projects; + +/// Tags PRs based on which projects were modified. +pub struct ProjectTagger { + selected: Vec<String>, +} + +impl Default for ProjectTagger { + fn default() -> Self { + Self { + selected: vec![], + } + } +} + +impl ProjectTagger { + pub fn new() -> Self { + Default::default() + } + + /// Analyze changed files and generate project labels. + pub fn analyze_changes(&mut self, changed_files: &[String]) { + let projects = detect_changed_projects(changed_files); + for project in projects { + self.selected.push(format!("project: {project}")); + } + + // Check for cross-cutting changes + let has_ci = changed_files.iter().any(|f| { + f.starts_with(".github/") || f.starts_with("ci/") + }); + let has_docs = changed_files.iter().any(|f| { + f.starts_with("docs/") || f.ends_with(".md") + }); + let has_root = changed_files.iter().any(|f| { + !f.contains('/') && !f.ends_with(".md") + }); + + if has_ci { + self.selected.push("scope: ci".into()); + } + if has_docs { + self.selected.push("scope: docs".into()); + } + if has_root { + self.selected.push("scope: root".into()); + } + } + + pub fn tags_to_add(&self) -> Vec<String> { + self.selected.clone() + } + + pub fn tags_to_remove(&self) -> Vec<String> { + vec![] + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_project_tagger() { + let mut tagger = ProjectTagger::new(); + tagger.analyze_changes(&[ + "meshmc/CMakeLists.txt".into(), + "mnv/src/main.c".into(), + ".github/workflows/ci.yml".into(), + "README.md".into(), + ]); + let tags = tagger.tags_to_add(); + assert!(tags.contains(&"project: meshmc".into())); + assert!(tags.contains(&"project: mnv".into())); + assert!(tags.contains(&"scope: ci".into())); + assert!(tags.contains(&"scope: docs".into())); + } + + #[test] + fn test_project_tagger_no_projects() { + let mut tagger = ProjectTagger::new(); + tagger.analyze_changes(&["README.md".into()]); + let tags = tagger.tags_to_add(); + assert!(!tags.iter().any(|t| t.starts_with("project:"))); + assert!(tags.contains(&"scope: docs".into())); + } +} diff --git a/ofborg/tickborg/src/tasks/build.rs b/ofborg/tickborg/src/tasks/build.rs new file mode 100644 index 0000000000..56583b28b4 --- /dev/null +++ b/ofborg/tickborg/src/tasks/build.rs @@ -0,0 +1,599 @@ +use crate::checkout; +use crate::message::buildresult::{BuildResult, BuildStatus, V1Tag}; +use crate::message::{buildjob, buildlogmsg}; +use crate::buildtool; +use crate::notifyworker; +use crate::worker; + +use std::collections::VecDeque; +use std::sync::Arc; +use std::sync::atomic::{AtomicU64, Ordering}; + +use tracing::{debug, debug_span, error, info}; +use uuid::Uuid; + +pub struct BuildWorker { + cloner: checkout::CachedCloner, + build_executor: buildtool::BuildExecutor, + system: String, + identity: String, +} + +impl BuildWorker { + pub fn new( + cloner: checkout::CachedCloner, + build_executor: buildtool::BuildExecutor, + system: String, + identity: String, + ) -> BuildWorker { + BuildWorker { + cloner, + build_executor, + system, + identity, + } + } + + fn actions( + &self, + job: buildjob::BuildJob, + receiver: Arc< + dyn notifyworker::NotificationReceiver + std::marker::Send + std::marker::Sync, + >, + ) -> JobActions { + JobActions::new(&self.system, &self.identity, job, receiver) + } +} + +pub struct JobActions { + system: String, + identity: String, + receiver: Arc<dyn notifyworker::NotificationReceiver + std::marker::Send + std::marker::Sync>, + job: buildjob::BuildJob, + line_counter: AtomicU64, + snippet_log: parking_lot::RwLock<VecDeque<String>>, + attempt_id: String, + log_exchange: Option<String>, + log_routing_key: Option<String>, + result_exchange: Option<String>, + result_routing_key: Option<String>, +} + +impl JobActions { + pub fn new( + system: &str, + identity: &str, + job: buildjob::BuildJob, + receiver: Arc< + dyn notifyworker::NotificationReceiver + std::marker::Send + std::marker::Sync, + >, + ) -> JobActions { + let (log_exchange, log_routing_key) = job + .logs + .clone() + .unwrap_or((Some(String::from("logs")), Some(String::from("build.log")))); + + let (result_exchange, result_routing_key) = job + .statusreport + .clone() + .unwrap_or((Some(String::from("build-results")), None)); + + JobActions { + system: system.to_owned(), + identity: identity.to_owned(), + receiver, + job, + line_counter: 0.into(), + snippet_log: parking_lot::RwLock::new(VecDeque::with_capacity(10)), + attempt_id: Uuid::new_v4().to_string(), + log_exchange, + log_routing_key, + result_exchange, + result_routing_key, + } + } + + pub fn log_snippet(&self) -> Vec<String> { + self.snippet_log.read().clone().into() + } + + pub async fn pr_head_missing(&self) { + self.tell(worker::Action::Ack).await; + } + + pub async fn commit_missing(&self) { + self.tell(worker::Action::Ack).await; + } + + pub async fn nothing_to_do(&self) { + self.tell(worker::Action::Ack).await; + } + + pub async fn merge_failed(&self) { + let msg = BuildResult::V1 { + tag: V1Tag::V1, + repo: self.job.repo.clone(), + pr: self.job.pr.clone(), + system: self.system.clone(), + output: vec![String::from("Merge failed")], + attempt_id: self.attempt_id.clone(), + request_id: self.job.request_id.clone(), + attempted_attrs: None, + skipped_attrs: None, + status: BuildStatus::Failure, + }; + + let result_exchange = self.result_exchange.clone(); + let result_routing_key = self.result_routing_key.clone(); + + self.tell(worker::publish_serde_action( + result_exchange, + result_routing_key, + &msg, + )) + .await; + self.tell(worker::Action::Ack).await; + } + + pub async fn log_started(&self, can_build: Vec<String>, cannot_build: Vec<String>) { + let msg = buildlogmsg::BuildLogStart { + identity: self.identity.clone(), + system: self.system.clone(), + attempt_id: self.attempt_id.clone(), + attempted_attrs: Some(can_build), + skipped_attrs: Some(cannot_build), + }; + + let log_exchange = self.log_exchange.clone(); + let log_routing_key = self.log_routing_key.clone(); + + self.tell(worker::publish_serde_action( + log_exchange, + log_routing_key, + &msg, + )) + .await; + } + + pub async fn log_instantiation_errors(&self, cannot_build: Vec<(String, Vec<String>)>) { + for (attr, log) in cannot_build { + self.log_line(format!("Cannot build `{attr}` because:")) + .await; + + for line in log { + self.log_line(line).await; + } + self.log_line("".into()).await; + } + } + + pub async fn log_line(&self, line: String) { + self.line_counter.fetch_add(1, Ordering::SeqCst); + + { + let mut snippet_log = self.snippet_log.write(); + if snippet_log.len() >= 10 { + snippet_log.pop_front(); + } + snippet_log.push_back(line.clone()); + } + + let msg = buildlogmsg::BuildLogMsg { + identity: self.identity.clone(), + system: self.system.clone(), + attempt_id: self.attempt_id.clone(), + line_number: self.line_counter.load(Ordering::SeqCst), + output: line, + }; + + let log_exchange = self.log_exchange.clone(); + let log_routing_key = self.log_routing_key.clone(); + + self.tell(worker::publish_serde_action( + log_exchange, + log_routing_key, + &msg, + )) + .await; + } + + pub async fn build_not_attempted(&self, not_attempted_attrs: Vec<String>) { + let msg = BuildResult::V1 { + tag: V1Tag::V1, + repo: self.job.repo.clone(), + pr: self.job.pr.clone(), + system: self.system.clone(), + output: self.log_snippet(), + attempt_id: self.attempt_id.clone(), + request_id: self.job.request_id.clone(), + skipped_attrs: Some(not_attempted_attrs), + attempted_attrs: None, + status: BuildStatus::Skipped, + }; + + let result_exchange = self.result_exchange.clone(); + let result_routing_key = self.result_routing_key.clone(); + self.tell(worker::publish_serde_action( + result_exchange, + result_routing_key, + &msg, + )) + .await; + + let log_exchange = self.log_exchange.clone(); + let log_routing_key = self.log_routing_key.clone(); + self.tell(worker::publish_serde_action( + log_exchange, + log_routing_key, + &msg, + )) + .await; + + self.tell(worker::Action::Ack).await; + } + + pub async fn build_finished( + &self, + status: BuildStatus, + attempted_attrs: Vec<String>, + not_attempted_attrs: Vec<String>, + ) { + let msg = BuildResult::V1 { + tag: V1Tag::V1, + repo: self.job.repo.clone(), + pr: self.job.pr.clone(), + system: self.system.clone(), + output: self.log_snippet(), + attempt_id: self.attempt_id.clone(), + request_id: self.job.request_id.clone(), + status, + attempted_attrs: Some(attempted_attrs), + skipped_attrs: Some(not_attempted_attrs), + }; + + let result_exchange = self.result_exchange.clone(); + let result_routing_key = self.result_routing_key.clone(); + self.tell(worker::publish_serde_action( + result_exchange, + result_routing_key, + &msg, + )) + .await; + + let log_exchange = self.log_exchange.clone(); + let log_routing_key = self.log_routing_key.clone(); + self.tell(worker::publish_serde_action( + log_exchange, + log_routing_key, + &msg, + )) + .await; + + self.tell(worker::Action::Ack).await; + } + + async fn tell(&self, action: worker::Action) { + self.receiver.tell(action).await; + } +} + +#[async_trait::async_trait] +impl notifyworker::SimpleNotifyWorker for BuildWorker { + type J = buildjob::BuildJob; + + fn msg_to_job(&self, _: &str, _: &Option<String>, body: &[u8]) -> Result<Self::J, String> { + info!("lmao I got a job?"); + match buildjob::from(body) { + Ok(job) => Ok(job), + Err(err) => { + error!("{:?}", std::str::from_utf8(body).unwrap_or("<not utf8>")); + panic!("{err:?}"); + } + } + } + + // FIXME: remove with rust/cargo update + #[allow(clippy::cognitive_complexity)] + async fn consumer( + &self, + job: buildjob::BuildJob, + notifier: Arc< + dyn notifyworker::NotificationReceiver + std::marker::Send + std::marker::Sync, + >, + ) { + let span = debug_span!("job", pr = ?job.pr.number); + let _enter = span.enter(); + + let actions = self.actions(job, notifier); + + if actions.job.attrs.is_empty() { + debug!("No attrs to build"); + actions.nothing_to_do().await; + return; + } + + info!( + "Working on https://github.com/{}/pull/{}", + actions.job.repo.full_name, actions.job.pr.number + ); + let project = self.cloner.project( + &actions.job.repo.full_name, + actions.job.repo.clone_url.clone(), + ); + let co = project + .clone_for("builder".to_string(), self.identity.clone()) + .unwrap(); + + let target_branch = match actions.job.pr.target_branch.clone() { + Some(x) => x, + None => String::from("origin/main"), + }; + + let refpath = co.checkout_origin_ref(target_branch.as_ref()).unwrap(); + + if co.fetch_pr(actions.job.pr.number).is_err() { + info!("Failed to fetch {}", actions.job.pr.number); + actions.pr_head_missing().await; + return; + } + + if !co.commit_exists(actions.job.pr.head_sha.as_ref()) { + info!("Commit {} doesn't exist", actions.job.pr.head_sha); + actions.commit_missing().await; + return; + } + + if co.merge_commit(actions.job.pr.head_sha.as_ref()).is_err() { + info!("Failed to merge {}", actions.job.pr.head_sha); + actions.merge_failed().await; + return; + } + + // Determine which projects to build from the requested attrs + let can_build: Vec<String> = actions.job.attrs.clone(); + let cannot_build: Vec<(String, Vec<String>)> = Vec::new(); + let cannot_build_attrs: Vec<String> = Vec::new(); + + info!( + "Can build: '{}', Cannot build: '{}'", + can_build.join(", "), + cannot_build_attrs.join(", ") + ); + + actions + .log_started(can_build.clone(), cannot_build_attrs.clone()) + .await; + actions.log_instantiation_errors(cannot_build).await; + + if can_build.is_empty() { + actions.build_not_attempted(cannot_build_attrs).await; + return; + } + + // Build each requested project using the build executor + let mut overall_status = BuildStatus::Success; + for project_name in &can_build { + if let Some(config) = buildtool::find_project(project_name) { + actions.log_line(format!("Building project: {}", project_name)).await; + let result = self.build_executor.build_project(refpath.as_ref(), &config); + + match result { + Ok(mut output) => { + use std::io::Read; + let mut buf = String::new(); + output.read_to_string(&mut buf).ok(); + for line in buf.lines() { + actions.log_line(line.to_string()).await; + } + } + Err(mut output) => { + overall_status = BuildStatus::Failure; + use std::io::Read; + let mut buf = String::new(); + output.read_to_string(&mut buf).ok(); + for line in buf.lines() { + actions.log_line(line.to_string()).await; + } + } + } + } else { + actions.log_line(format!("Unknown project: {}", project_name)).await; + } + } + + info!("Build finished ({:?})", overall_status); + info!("Lines:"); + info!("-----8<-----"); + actions + .log_snippet() + .iter() + .inspect(|x| info!("{}", x)) + .next_back(); + info!("----->8-----"); + + actions + .build_finished(overall_status, can_build, cannot_build_attrs) + .await; + info!("Build done!"); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::message::{Pr, Repo}; + use crate::notifyworker::SimpleNotifyWorker; + use crate::test_scratch::TestScratch; + use std::env; + use std::path::{Path, PathBuf}; + use std::process::{Command, Stdio}; + use std::vec::IntoIter; + + #[cfg(target_os = "linux")] + const SYSTEM: &str = "x86_64-linux"; + #[cfg(target_os = "macos")] + const SYSTEM: &str = "x86_64-darwin"; + + fn build_executor() -> buildtool::BuildExecutor { + buildtool::BuildExecutor::new(1800) + } + + fn tpath(component: &str) -> PathBuf { + Path::new(env!("CARGO_MANIFEST_DIR")).join(component) + } + + fn make_worker(path: &Path) -> BuildWorker { + let cloner = checkout::cached_cloner(path); + let executor = build_executor(); + + BuildWorker::new( + cloner, + executor, + SYSTEM.to_owned(), + "cargo-test-build".to_owned(), + ) + } + + fn make_pr_repo(bare: &Path, co: &Path) -> String { + let output = Command::new("bash") + .current_dir(tpath("./test-srcs")) + .arg("make-pr.sh") + .arg(bare) + .arg(co) + .stderr(Stdio::null()) + .stdout(Stdio::piped()) + .output() + .expect("building the test PR failed"); + let hash = String::from_utf8(output.stdout).expect("Should just be a hash"); + + hash.trim().to_owned() + } + + fn strip_escaped_ansi(string: &str) -> String { + string + .replace(['‘', '’'], "'") + .replace("\\u001b[31;1m", "") // red + .replace("\\u001b[0m", "") // reset + } + + fn assert_contains_job(actions: &mut IntoIter<worker::Action>, text_to_match: &str) { + println!("\n\n Searching: {text_to_match:?}"); + actions + .position(|job| match job { + worker::Action::Publish(ref body) => { + let content = std::str::from_utf8(&body.content).unwrap(); + let text = strip_escaped_ansi(content); + eprintln!("{text}"); + if text.contains(text_to_match) { + println!(" ok"); + true + } else { + println!(" notContains: {text}"); + false + } + } + other => { + println!(" notPublish: {other:?}"); + false + } + }) + .unwrap_or_else(|| { + panic!("Actions should contain a job matching {text_to_match}, after the previous check") + }); + } + + #[tokio::test] + pub async fn test_simple_build() { + let p = TestScratch::new_dir("build-simple-build-working"); + let bare_repo = TestScratch::new_dir("build-simple-build-bare"); + let co_repo = TestScratch::new_dir("build-simple-build-co"); + + let head_sha = make_pr_repo(&bare_repo.path(), &co_repo.path()); + let worker = make_worker(&p.path()); + + let job = buildjob::BuildJob { + attrs: vec!["success".to_owned()], + pr: Pr { + head_sha, + number: 1, + target_branch: Some("main".to_owned()), + }, + repo: Repo { + clone_url: bare_repo.path().to_str().unwrap().to_owned(), + full_name: "test-git".to_owned(), + name: "project-tick".to_owned(), + owner: "tickborg-test".to_owned(), + }, + subset: None, + logs: Some((Some(String::from("logs")), Some(String::from("build.log")))), + statusreport: Some((Some(String::from("build-results")), None)), + request_id: "bogus-request-id".to_owned(), + }; + + let dummyreceiver = Arc::new(notifyworker::DummyNotificationReceiver::new()); + + worker.consumer(job, dummyreceiver.clone()).await; + + println!("Total actions: {:?}", dummyreceiver.actions.lock().len()); + let actions_vec = dummyreceiver.actions.lock().clone(); + let mut actions = actions_vec.into_iter(); + + assert_contains_job(&mut actions, "output\":\"hi"); + assert_contains_job(&mut actions, "output\":\"1"); + assert_contains_job(&mut actions, "output\":\"2"); + assert_contains_job(&mut actions, "output\":\"3"); + assert_contains_job(&mut actions, "output\":\"4"); + assert_contains_job(&mut actions, "status\":\"Success\""); // First one to the github poster + assert_contains_job(&mut actions, "status\":\"Success\""); // This one to the logs + assert_eq!(actions.next(), Some(worker::Action::Ack)); + } + + #[tokio::test] + pub async fn test_all_jobs_skipped() { + let p = TestScratch::new_dir("no-attempt"); + let bare_repo = TestScratch::new_dir("no-attempt-bare"); + let co_repo = TestScratch::new_dir("no-attempt-co"); + + let head_sha = make_pr_repo(&bare_repo.path(), &co_repo.path()); + let worker = make_worker(&p.path()); + + let job = buildjob::BuildJob { + attrs: vec!["not-real".to_owned()], + pr: Pr { + head_sha, + number: 1, + target_branch: Some("main".to_owned()), + }, + repo: Repo { + clone_url: bare_repo.path().to_str().unwrap().to_owned(), + full_name: "test-git".to_owned(), + name: "project-tick".to_owned(), + owner: "tickborg-test".to_owned(), + }, + subset: None, + logs: Some((Some(String::from("logs")), Some(String::from("build.log")))), + statusreport: Some((Some(String::from("build-results")), None)), + request_id: "bogus-request-id".to_owned(), + }; + + let dummyreceiver = Arc::new(notifyworker::DummyNotificationReceiver::new()); + + worker.consumer(job, dummyreceiver.clone()).await; + + println!("Total actions: {:?}", dummyreceiver.actions.lock().len()); + let actions_vec = dummyreceiver.actions.lock().clone(); + let mut actions = actions_vec.into_iter(); + + assert_contains_job( + &mut actions, + r#""line_number":1,"output":"Cannot build `not-real` because:""#, + ); + assert_contains_job( + &mut actions, + "attribute 'not-real' in selection path 'not-real' not found\"}", + ); + assert_contains_job(&mut actions, "skipped_attrs\":[\"not-real"); // First one to the github poster + assert_contains_job(&mut actions, "skipped_attrs\":[\"not-real"); // This one to the logs + assert_eq!(actions.next(), Some(worker::Action::Ack)); + } +} diff --git a/ofborg/tickborg/src/tasks/eval/mod.rs b/ofborg/tickborg/src/tasks/eval/mod.rs new file mode 100644 index 0000000000..6f8a2d1955 --- /dev/null +++ b/ofborg/tickborg/src/tasks/eval/mod.rs @@ -0,0 +1,48 @@ +mod monorepo; + +pub use self::monorepo::MonorepoStrategy; +use crate::checkout::CachedProjectCo; +use crate::commitstatus::{CommitStatus, CommitStatusError}; +use crate::evalchecker::EvalChecker; +use crate::message::buildjob::BuildJob; + +use std::path::Path; + +pub trait EvaluationStrategy { + fn pre_clone(&mut self) -> impl std::future::Future<Output = StepResult<()>>; + + fn on_target_branch( + &mut self, + co: &Path, + status: &mut CommitStatus, + ) -> impl std::future::Future<Output = StepResult<()>>; + fn after_fetch(&mut self, co: &CachedProjectCo) -> StepResult<()>; + fn after_merge( + &mut self, + status: &mut CommitStatus, + ) -> impl std::future::Future<Output = StepResult<()>>; + fn evaluation_checks(&self) -> Vec<EvalChecker>; + fn all_evaluations_passed( + &mut self, + status: &mut CommitStatus, + ) -> impl std::future::Future<Output = StepResult<EvaluationComplete>>; +} + +pub type StepResult<T> = Result<T, Error>; + +#[derive(Default)] +pub struct EvaluationComplete { + pub builds: Vec<BuildJob>, +} + +#[derive(Debug)] +pub enum Error { + CommitStatusWrite(CommitStatusError), + Fail(String), +} + +impl From<CommitStatusError> for Error { + fn from(e: CommitStatusError) -> Error { + Error::CommitStatusWrite(e) + } +} diff --git a/ofborg/tickborg/src/tasks/eval/monorepo.rs b/ofborg/tickborg/src/tasks/eval/monorepo.rs new file mode 100644 index 0000000000..cc86653f0c --- /dev/null +++ b/ofborg/tickborg/src/tasks/eval/monorepo.rs @@ -0,0 +1,254 @@ +use crate::buildtool::detect_changed_projects; +use crate::checkout::CachedProjectCo; +use crate::commentparser::Subset; +use crate::commitstatus::CommitStatus; +use crate::evalchecker::EvalChecker; +use crate::message::buildjob::BuildJob; +use crate::message::evaluationjob::EvaluationJob; +use crate::tasks::eval::{EvaluationComplete, EvaluationStrategy, StepResult}; +use crate::tasks::evaluate::update_labels; + +use std::path::Path; + +use hubcaps::issues::IssueRef; +use regex::Regex; +use uuid::Uuid; + +/// Project Tick specific labels from PR title keywords +const TITLE_LABELS: [(&str, &str); 12] = [ + ("meshmc", "project: meshmc"), + ("mnv", "project: mnv"), + ("neozip", "project: neozip"), + ("cmark", "project: cmark"), + ("cgit", "project: cgit"), + ("json4cpp", "project: json4cpp"), + ("tomlplusplus", "project: tomlplusplus"), + ("corebinutils", "project: corebinutils"), + ("forgewrapper", "project: forgewrapper"), + ("genqrcode", "project: genqrcode"), + ("darwin", "platform: macos"), + ("windows", "platform: windows"), +]; + +fn label_from_title(title: &str) -> Vec<String> { + let title_lower = title.to_lowercase(); + TITLE_LABELS + .iter() + .filter(|(word, _label)| { + let re = Regex::new(&format!("\\b{word}\\b")).unwrap(); + re.is_match(&title_lower) + }) + .map(|(_word, label)| (*label).into()) + .collect() +} + +/// Parses Conventional Commit messages to extract affected project scopes +fn parse_commit_scopes(messages: &[String]) -> Vec<String> { + let scope_re = Regex::new(r"^[a-z]+\(([^)]+)\)").unwrap(); + let colon_re = Regex::new(r"^([a-z0-9_-]+):").unwrap(); + + let mut projects: Vec<String> = messages + .iter() + .filter_map(|line| { + let trimmed = line.trim(); + // Try Conventional Commits: "feat(meshmc): ..." + if let Some(caps) = scope_re.captures(trimmed) { + Some(caps[1].to_string()) + } + // Try simple "project: description" + else if let Some(caps) = colon_re.captures(trimmed) { + let candidate = caps[1].to_string(); + // Only accept known project names + if crate::buildtool::find_project(&candidate).is_some() { + Some(candidate) + } else { + None + } + } else { + None + } + }) + .collect(); + + projects.sort(); + projects.dedup(); + projects +} + +pub struct MonorepoStrategy<'a> { + job: &'a EvaluationJob, + issue_ref: &'a IssueRef, + changed_projects: Option<Vec<String>>, +} + +impl<'a> MonorepoStrategy<'a> { + pub fn new(job: &'a EvaluationJob, issue_ref: &'a IssueRef) -> MonorepoStrategy<'a> { + Self { + job, + issue_ref, + changed_projects: None, + } + } + + async fn tag_from_title(&self) { + let title = match self.issue_ref.get().await { + Ok(issue) => issue.title.to_lowercase(), + Err(_) => return, + }; + + let labels = label_from_title(&title); + + if labels.is_empty() { + return; + } + + update_labels(self.issue_ref, &labels, &[]).await; + } + + fn queue_builds(&self) -> StepResult<Vec<BuildJob>> { + if let Some(ref projects) = self.changed_projects { + if !projects.is_empty() && projects.len() <= 15 { + Ok(vec![BuildJob::new( + self.job.repo.clone(), + self.job.pr.clone(), + Subset::Project, + projects.clone(), + None, + None, + Uuid::new_v4().to_string(), + )]) + } else { + Ok(vec![]) + } + } else { + Ok(vec![]) + } + } +} + +impl EvaluationStrategy for MonorepoStrategy<'_> { + async fn pre_clone(&mut self) -> StepResult<()> { + self.tag_from_title().await; + Ok(()) + } + + async fn on_target_branch(&mut self, _dir: &Path, status: &mut CommitStatus) -> StepResult<()> { + status + .set_with_description( + "Analyzing changed projects", + hubcaps::statuses::State::Pending, + ) + .await?; + Ok(()) + } + + fn after_fetch(&mut self, co: &CachedProjectCo) -> StepResult<()> { + // Strategy 1: detect from changed files + let changed_files = co + .files_changed_from_head(&self.job.pr.head_sha) + .unwrap_or_default(); + let mut projects = detect_changed_projects(&changed_files); + + // Strategy 2: also parse commit messages for scopes + let commit_scopes = parse_commit_scopes( + &co.commit_messages_from_head(&self.job.pr.head_sha) + .unwrap_or_else(|_| vec!["".to_owned()]), + ); + + for scope in commit_scopes { + if !projects.contains(&scope) { + projects.push(scope); + } + } + + projects.sort(); + projects.dedup(); + self.changed_projects = Some(projects); + + Ok(()) + } + + async fn after_merge(&mut self, status: &mut CommitStatus) -> StepResult<()> { + let project_list = self + .changed_projects + .as_ref() + .map(|p| p.join(", ")) + .unwrap_or_else(|| "none".into()); + status + .set_with_description( + &format!("Changed: {project_list}"), + hubcaps::statuses::State::Pending, + ) + .await?; + Ok(()) + } + + fn evaluation_checks(&self) -> Vec<EvalChecker> { + vec![] + } + + async fn all_evaluations_passed( + &mut self, + status: &mut CommitStatus, + ) -> StepResult<EvaluationComplete> { + status + .set_with_description( + "Scheduling project builds", + hubcaps::statuses::State::Pending, + ) + .await?; + + let builds = self.queue_builds()?; + Ok(EvaluationComplete { builds }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_label_from_title() { + assert_eq!( + label_from_title("feat(meshmc): add new block type"), + vec![String::from("project: meshmc")] + ); + assert_eq!( + label_from_title("fix windows build for meshmc"), + vec![String::from("project: meshmc"), String::from("platform: windows")] + ); + assert_eq!( + label_from_title("fix darwin support"), + vec![String::from("platform: macos")] + ); + assert_eq!( + label_from_title("docs: update README"), + Vec::<String>::new() + ); + } + + #[test] + fn test_parse_commit_scopes() { + let messages = vec![ + "feat(meshmc): add new feature".into(), + "fix(mnv): resolve segfault".into(), + "chore: update CI".into(), + "Merge pull request #123 from feature/xyz".into(), + "neozip: bump version".into(), + ]; + let scopes = parse_commit_scopes(&messages); + assert_eq!(scopes, vec!["meshmc", "mnv", "neozip"]); + } + + #[test] + fn test_parse_commit_scopes_unknown() { + let messages = vec![ + "feat(unknownproject): something".into(), + "docs: update readme".into(), + ]; + let scopes = parse_commit_scopes(&messages); + // "unknownproject" should be included (scope from conventional commit) + // "docs" should NOT be included (not a known project) + assert_eq!(scopes, vec!["unknownproject"]); + } +} diff --git a/ofborg/tickborg/src/tasks/evaluate.rs b/ofborg/tickborg/src/tasks/evaluate.rs new file mode 100644 index 0000000000..8f277aa228 --- /dev/null +++ b/ofborg/tickborg/src/tasks/evaluate.rs @@ -0,0 +1,556 @@ +/// This is what evaluates every pull-request +use crate::acl::Acl; +use crate::checkout; +use crate::commitstatus::{CommitStatus, CommitStatusError}; +use crate::config::GithubAppVendingMachine; +use crate::message::{buildjob, evaluationjob}; +use crate::stats::{self, Event}; +use crate::systems; +use crate::tasks::eval; +use crate::tasks::eval::EvaluationStrategy; +use crate::worker; +use futures::stream::StreamExt; +use futures_util::TryFutureExt; + +use std::path::Path; +use std::time::Instant; + +use tracing::{debug_span, error, info, warn}; + +pub struct EvaluationWorker<E> { + cloner: checkout::CachedCloner, + github_vend: tokio::sync::RwLock<GithubAppVendingMachine>, + acl: Acl, + identity: String, + events: E, +} + +impl<E: stats::SysEvents> EvaluationWorker<E> { + pub fn new( + cloner: checkout::CachedCloner, + github_vend: GithubAppVendingMachine, + acl: Acl, + identity: String, + events: E, + ) -> EvaluationWorker<E> { + EvaluationWorker { + cloner, + github_vend: tokio::sync::RwLock::new(github_vend), + acl, + identity, + events, + } + } +} + +impl<E: stats::SysEvents + 'static> worker::SimpleWorker for EvaluationWorker<E> { + type J = evaluationjob::EvaluationJob; + + async fn msg_to_job( + &mut self, + _: &str, + _: &Option<String>, + body: &[u8], + ) -> Result<Self::J, String> { + self.events.notify(Event::JobReceived).await; + match evaluationjob::from(body) { + Ok(job) => { + self.events.notify(Event::JobDecodeSuccess).await; + Ok(job) + } + Err(err) => { + self.events.notify(Event::JobDecodeFailure).await; + error!( + "Failed to decode message: {}, Err: {err:?}", + std::str::from_utf8(body).unwrap_or("<message not utf8>") + ); + Err("Failed to decode message".to_owned()) + } + } + } + + async fn consumer(&mut self, job: &evaluationjob::EvaluationJob) -> worker::Actions { + let span = debug_span!("job", pr = ?job.pr.number); + let _enter = span.enter(); + + let mut vending_machine = self.github_vend.write().await; + + let github_client = vending_machine + .for_repo(&job.repo.owner, &job.repo.name) + .await + .expect("Failed to get a github client token"); + + OneEval::new( + github_client, + &self.acl, + &mut self.events, + &self.identity, + &self.cloner, + job, + ) + .worker_actions() + .await + } +} + +struct OneEval<'a, E> { + client_app: &'a hubcaps::Github, + repo: hubcaps::repositories::Repository, + acl: &'a Acl, + events: &'a mut E, + identity: &'a str, + cloner: &'a checkout::CachedCloner, + job: &'a evaluationjob::EvaluationJob, +} + +impl<'a, E: stats::SysEvents + 'static> OneEval<'a, E> { + #[allow(clippy::too_many_arguments)] + fn new( + client_app: &'a hubcaps::Github, + acl: &'a Acl, + events: &'a mut E, + identity: &'a str, + cloner: &'a checkout::CachedCloner, + job: &'a evaluationjob::EvaluationJob, + ) -> OneEval<'a, E> { + let repo = client_app.repo(job.repo.owner.clone(), job.repo.name.clone()); + OneEval { + client_app, + repo, + acl, + events, + identity, + cloner, + job, + } + } + + fn actions(&self) -> evaluationjob::Actions { + evaluationjob::Actions {} + } + + async fn update_status( + &self, + description: String, + url: Option<String>, + state: hubcaps::statuses::State, + ) -> Result<(), CommitStatusError> { + let description = if description.len() >= 140 { + warn!( + "description is over 140 char; truncating: {:?}", + &description + ); + description.chars().take(140).collect() + } else { + description + }; + let repo = self + .client_app + .repo(self.job.repo.owner.clone(), self.job.repo.name.clone()); + let prefix = get_prefix(repo.statuses(), &self.job.pr.head_sha).await?; + + let mut builder = hubcaps::statuses::StatusOptions::builder(state); + builder.context(format!("{prefix}-eval")); + builder.description(description.clone()); + + if let Some(url) = url { + builder.target_url(url); + } + + info!( + "Updating status on {}:{} -> {}", + &self.job.pr.number, &self.job.pr.head_sha, &description + ); + + self.repo + .statuses() + .create(&self.job.pr.head_sha, &builder.build()) + .map_ok(|_| ()) + .map_err(|e| CommitStatusError::from(e)) + .await + } + + async fn worker_actions(&mut self) -> worker::Actions { + let eval_result = match self.evaluate_job().await { + Ok(v) => Ok(v), + Err(eval_error) => match eval_error { + // Handle error cases which expect us to post statuses + // to github. Convert Eval Errors in to Result<_, CommitStatusWrite> + EvalWorkerError::EvalError(eval::Error::Fail(msg)) => Err(self + .update_status(msg, None, hubcaps::statuses::State::Failure) + .await), + EvalWorkerError::EvalError(eval::Error::CommitStatusWrite(e)) => Err(Err(e)), + EvalWorkerError::CommitStatusWrite(e) => Err(Err(e)), + }, + }; + + match eval_result { + Ok(eval_actions) => { + let issue_ref = self.repo.issue(self.job.pr.number); + update_labels(&issue_ref, &[], &[String::from("tickborg-internal-error")]).await; + + eval_actions + } + Err(Ok(())) => { + // There was an error during eval, but we successfully + // updated the PR. + + let issue_ref = self.repo.issue(self.job.pr.number); + update_labels(&issue_ref, &[], &[String::from("tickborg-internal-error")]).await; + + self.actions().skip(self.job) + } + Err(Err(CommitStatusError::ExpiredCreds(e))) => { + error!("Failed writing commit status: creds expired: {:?}", e); + self.actions().retry_later(self.job) + } + Err(Err(CommitStatusError::InternalError(e))) => { + error!("Failed writing commit status: internal error: {:?}", e); + self.actions().retry_later(self.job) + } + Err(Err(CommitStatusError::MissingSha(e))) => { + error!( + "Failed writing commit status: commit sha was force-pushed away: {:?}", + e + ); + self.actions().skip(self.job) + } + + Err(Err(CommitStatusError::Error(cswerr))) => { + error!( + "Internal error writing commit status: {:?}, marking internal error", + cswerr + ); + let issue_ref = self.repo.issue(self.job.pr.number); + update_labels(&issue_ref, &[String::from("tickborg-internal-error")], &[]).await; + + self.actions().skip(self.job) + } + } + } + + async fn evaluate_job(&mut self) -> Result<worker::Actions, EvalWorkerError> { + let job = self.job; + let repo = self + .client_app + .repo(self.job.repo.owner.clone(), self.job.repo.name.clone()); + let issue_ref = repo.issue(job.pr.number); + let auto_schedule_build_archs: Vec<systems::System>; + + match issue_ref.get().await { + Ok(iss) => { + if iss.state == "closed" { + self.events.notify(Event::IssueAlreadyClosed).await; + info!("Skipping {} because it is closed", job.pr.number); + return Ok(self.actions().skip(job)); + } + + if issue_is_wip(&iss) { + auto_schedule_build_archs = vec![]; + } else { + auto_schedule_build_archs = self.acl.build_job_architectures_for_user_repo( + &iss.user.login, + &job.repo.full_name, + ); + } + } + + Err(e) => { + self.events.notify(Event::IssueFetchFailed).await; + error!("Error fetching {}!", job.pr.number); + error!("E: {:?}", e); + return Ok(self.actions().skip(job)); + } + }; + + let mut evaluation_strategy = eval::MonorepoStrategy::new(job, &issue_ref); + + let prefix = get_prefix(repo.statuses(), &job.pr.head_sha).await?; + + let mut overall_status = CommitStatus::new( + repo.statuses(), + job.pr.head_sha.clone(), + format!("{prefix}-eval"), + "Starting".to_owned(), + None, + ); + + overall_status + .set_with_description("Starting", hubcaps::statuses::State::Pending) + .await?; + + evaluation_strategy.pre_clone().await?; + + let project = self + .cloner + .project(&job.repo.full_name, job.repo.clone_url.clone()); + + overall_status + .set_with_description("Cloning project", hubcaps::statuses::State::Pending) + .await?; + + info!("Working on {}", job.pr.number); + let co = project + .clone_for("mr-est".to_string(), self.identity.to_string()) + .map_err(|e| { + EvalWorkerError::CommitStatusWrite(CommitStatusError::InternalError(format!( + "Cloning failed: {e}" + ))) + })?; + + let target_branch = match job.pr.target_branch.clone() { + Some(x) => x, + None => String::from("main"), + }; + + overall_status + .set_with_description( + format!("Checking out {}", &target_branch).as_ref(), + hubcaps::statuses::State::Pending, + ) + .await?; + info!("Checking out target branch {}", &target_branch); + let refpath = co + .checkout_origin_ref(target_branch.as_ref()) + .map_err(|e| { + EvalWorkerError::CommitStatusWrite(CommitStatusError::InternalError(format!( + "Checking out target branch failed: {e}" + ))) + })?; + + evaluation_strategy + .on_target_branch(Path::new(&refpath), &mut overall_status) + .await?; + + let target_branch_rebuild_sniff_start = Instant::now(); + + self.events + .notify(Event::EvaluationDuration( + target_branch.clone(), + target_branch_rebuild_sniff_start.elapsed().as_secs(), + )) + .await; + self.events + .notify(Event::EvaluationDurationCount(target_branch)) + .await; + + overall_status + .set_with_description("Fetching PR", hubcaps::statuses::State::Pending) + .await?; + + co.fetch_pr(job.pr.number).map_err(|e| { + EvalWorkerError::CommitStatusWrite(CommitStatusError::InternalError(format!( + "Fetching PR failed: {e}" + ))) + })?; + + if !co.commit_exists(job.pr.head_sha.as_ref()) { + overall_status + .set_with_description("Commit not found", hubcaps::statuses::State::Error) + .await?; + + info!("Commit {} doesn't exist", job.pr.head_sha); + return Ok(self.actions().skip(job)); + } + + evaluation_strategy.after_fetch(&co)?; + + overall_status + .set_with_description("Merging PR", hubcaps::statuses::State::Pending) + .await?; + + if co.merge_commit(job.pr.head_sha.as_ref()).is_err() { + overall_status + .set_with_description("Failed to merge", hubcaps::statuses::State::Failure) + .await?; + + info!("Failed to merge {}", job.pr.head_sha); + + return Ok(self.actions().skip(job)); + } + + evaluation_strategy.after_merge(&mut overall_status).await?; + + info!("Got path: {:?}, building", refpath); + overall_status + .set_with_description("Beginning Evaluations", hubcaps::statuses::State::Pending) + .await?; + + let eval_results: bool = futures::stream::iter(evaluation_strategy.evaluation_checks()) + .map(|check| { + // We need to clone or move variables into the async block + let repo_statuses = repo.statuses(); + let head_sha = job.pr.head_sha.clone(); + let refpath = refpath.clone(); + + async move { + let status = CommitStatus::new( + repo_statuses, + head_sha, + format!("{prefix}-eval-{}", check.name()), + check.cli_cmd(), + None, + ); + + status + .set(hubcaps::statuses::State::Pending) + .await + .expect("Failed to set status on eval strategy"); + + let state = match check.execute(Path::new(&refpath)) { + Ok(_) => hubcaps::statuses::State::Success, + Err(_) => hubcaps::statuses::State::Failure, + }; + + status + .set(state.clone()) + .await + .expect("Failed to set status on eval strategy"); + + if state == hubcaps::statuses::State::Success { + Ok(()) + } else { + Err(()) + } + } + }) + .buffered(1) + .all(|res| async move { res.is_ok() }) + .await; + + info!("Finished evaluations"); + let mut response: worker::Actions = vec![]; + + if eval_results { + let complete = evaluation_strategy + .all_evaluations_passed(&mut overall_status) + .await?; + + response.extend(schedule_builds(complete.builds, auto_schedule_build_archs)); + + overall_status + .set_with_description("^.^!", hubcaps::statuses::State::Success) + .await?; + } else { + overall_status + .set_with_description("Complete, with errors", hubcaps::statuses::State::Failure) + .await?; + } + + self.events.notify(Event::TaskEvaluationCheckComplete).await; + + info!("Evaluations done!"); + Ok(self.actions().done(job, response)) + } +} + +fn schedule_builds( + builds: Vec<buildjob::BuildJob>, + auto_schedule_build_archs: Vec<systems::System>, +) -> Vec<worker::Action> { + let mut response = vec![]; + info!( + "Scheduling build jobs {:?} on arches {:?}", + builds, auto_schedule_build_archs + ); + for buildjob in builds { + for arch in auto_schedule_build_archs.iter() { + let (exchange, routingkey) = arch.as_build_destination(); + response.push(worker::publish_serde_action( + exchange, routingkey, &buildjob, + )); + } + response.push(worker::publish_serde_action( + Some("build-results".to_string()), + None, + &buildjob::QueuedBuildJobs { + job: buildjob, + architectures: auto_schedule_build_archs + .iter() + .map(|arch| arch.to_string()) + .collect(), + }, + )); + } + + response +} + +pub async fn update_labels( + issueref: &hubcaps::issues::IssueRef, + add: &[String], + remove: &[String], +) { + let l = issueref.labels(); + let issue = issueref.get().await.expect("Failed to get issue"); + + let existing: Vec<String> = issue.labels.iter().map(|l| l.name.clone()).collect(); + + let to_add: Vec<&str> = add + .iter() + .filter(|l| !existing.contains(l)) // Remove labels already on the issue + .map(|l| l.as_ref()) + .collect(); + + let to_remove: Vec<String> = remove + .iter() + .filter(|l| existing.contains(l)) // Remove labels already on the issue + .cloned() + .collect(); + + let issue = issue.number; + + info!("Labeling issue #{issue}: + {to_add:?} , - {to_remove:?}, = {existing:?}"); + + l.add(to_add.clone()) + .await + .unwrap_or_else(|err| panic!("Failed to add labels {to_add:?} to issue #{issue}: {err:?}")); + + for label in to_remove { + l.remove(&label).await.unwrap_or_else(|err| { + panic!("Failed to remove label {label:?} from issue #{issue}: {err:?}") + }); + } +} + +fn issue_is_wip(issue: &hubcaps::issues::Issue) -> bool { + issue.title.starts_with("WIP:") || issue.title.contains("[WIP]") +} + +/// Determine whether or not to use the "old" status prefix, `grahamcofborg`, or +/// the new one, `tickborg`. +/// +/// If the PR already has any `grahamcofborg`-prefixed statuses, continue to use +/// that for backwards compatibility. Otherwise use the new prefix. +pub async fn get_prefix( + statuses: hubcaps::statuses::Statuses, + sha: &str, +) -> Result<&str, CommitStatusError> { + if statuses + .list(sha) + .await? + .iter() + .any(|s| s.context.starts_with("grahamcofborg-")) + { + Ok("grahamcofborg") + } else { + Ok("tickborg") + } +} + +enum EvalWorkerError { + EvalError(eval::Error), + CommitStatusWrite(CommitStatusError), +} + +impl From<eval::Error> for EvalWorkerError { + fn from(e: eval::Error) -> EvalWorkerError { + EvalWorkerError::EvalError(e) + } +} + +impl From<CommitStatusError> for EvalWorkerError { + fn from(e: CommitStatusError) -> EvalWorkerError { + EvalWorkerError::CommitStatusWrite(e) + } +} diff --git a/ofborg/tickborg/src/tasks/evaluationfilter.rs b/ofborg/tickborg/src/tasks/evaluationfilter.rs new file mode 100644 index 0000000000..85d61b6f3a --- /dev/null +++ b/ofborg/tickborg/src/tasks/evaluationfilter.rs @@ -0,0 +1,146 @@ +use crate::acl; +use crate::ghevent; +use crate::message::{Pr, Repo, evaluationjob}; +use crate::worker; + +use tracing::{debug_span, info}; + +pub struct EvaluationFilterWorker { + acl: acl::Acl, +} + +impl EvaluationFilterWorker { + pub fn new(acl: acl::Acl) -> EvaluationFilterWorker { + EvaluationFilterWorker { acl } + } +} + +impl worker::SimpleWorker for EvaluationFilterWorker { + type J = ghevent::PullRequestEvent; + + async fn msg_to_job( + &mut self, + _: &str, + _: &Option<String>, + body: &[u8], + ) -> Result<Self::J, String> { + match serde_json::from_slice(body) { + Ok(event) => Ok(event), + Err(err) => Err(format!( + "Failed to deserialize job {err:?}: {:?}", + std::str::from_utf8(body).unwrap_or("<job not utf8>") + )), + } + } + + async fn consumer(&mut self, job: &ghevent::PullRequestEvent) -> worker::Actions { + let span = debug_span!("job", pr = ?job.number); + let _enter = span.enter(); + + if !self.acl.is_repo_eligible(&job.repository.full_name) { + info!("Repo not authorized ({})", job.repository.full_name); + return vec![worker::Action::Ack]; + } + + if job.pull_request.state != ghevent::PullRequestState::Open { + info!( + "PR is not open ({}#{})", + job.repository.full_name, job.number + ); + return vec![worker::Action::Ack]; + } + + let interesting: bool = match job.action { + ghevent::PullRequestAction::Opened => true, + ghevent::PullRequestAction::Synchronize => true, + ghevent::PullRequestAction::Reopened => true, + ghevent::PullRequestAction::Edited => { + if let Some(ref changes) = job.changes { + changes.base.is_some() + } else { + false + } + } + _ => false, + }; + + if !interesting { + info!( + "Not interesting: {}#{} because of {:?}", + job.repository.full_name, job.number, job.action + ); + + return vec![worker::Action::Ack]; + } + + info!( + "Found {}#{} to be interesting because of {:?}", + job.repository.full_name, job.number, job.action + ); + let repo_msg = Repo { + clone_url: job.repository.clone_url.clone(), + full_name: job.repository.full_name.clone(), + owner: job.repository.owner.login.clone(), + name: job.repository.name.clone(), + }; + + let pr_msg = Pr { + number: job.number, + head_sha: job.pull_request.head.sha.clone(), + target_branch: Some(job.pull_request.base.git_ref.clone()), + }; + + let msg = evaluationjob::EvaluationJob { + repo: repo_msg, + pr: pr_msg, + }; + + vec![ + worker::publish_serde_action(None, Some("mass-rebuild-check-jobs".to_owned()), &msg), + worker::Action::Ack, + ] + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::worker::SimpleWorker; + + #[tokio::test] + async fn changed_base() { + let data = include_str!("../../test-srcs/events/pr-changed-base.json"); + + let job: ghevent::PullRequestEvent = + serde_json::from_str(data).expect("Should properly deserialize"); + + let mut worker = EvaluationFilterWorker::new(acl::Acl::new( + vec!["project-tick/Project-Tick".to_owned()], + Some(vec![]), + )); + + assert_eq!( + worker.consumer(&job).await, + vec![ + worker::publish_serde_action( + None, + Some("mass-rebuild-check-jobs".to_owned()), + &evaluationjob::EvaluationJob { + repo: Repo { + clone_url: String::from("https://github.com/project-tick/Project-Tick.git"), + full_name: String::from("project-tick/Project-Tick"), + owner: String::from("project-tick"), + name: String::from("Project-Tick"), + }, + pr: Pr { + number: 33299, + head_sha: String::from("887e8b460a7d45ddb3bbdebe01447b251b3229e8"), + target_branch: Some(String::from("staging")), + }, + } + ), + worker::Action::Ack, + ] + ); + } +} diff --git a/ofborg/tickborg/src/tasks/githubcommentfilter.rs b/ofborg/tickborg/src/tasks/githubcommentfilter.rs new file mode 100644 index 0000000000..2a27061577 --- /dev/null +++ b/ofborg/tickborg/src/tasks/githubcommentfilter.rs @@ -0,0 +1,182 @@ +use crate::acl; +use crate::commentparser; +use crate::ghevent; +use crate::message::{Pr, Repo, buildjob, evaluationjob}; +use crate::worker; + +use tracing::{debug_span, error, info}; +use uuid::Uuid; + +pub struct GitHubCommentWorker { + acl: acl::Acl, + github: hubcaps::Github, +} + +impl GitHubCommentWorker { + pub fn new(acl: acl::Acl, github: hubcaps::Github) -> GitHubCommentWorker { + GitHubCommentWorker { acl, github } + } +} + +impl worker::SimpleWorker for GitHubCommentWorker { + type J = ghevent::IssueComment; + + async fn msg_to_job( + &mut self, + _: &str, + _: &Option<String>, + body: &[u8], + ) -> Result<Self::J, String> { + match serde_json::from_slice(body) { + Ok(comment) => Ok(comment), + Err(err) => { + error!( + "Failed to deserialize IsssueComment: {:?}", + std::str::from_utf8(body).unwrap_or("<not utf8>") + ); + panic!("{err:?}"); + } + } + } + + // FIXME: remove with rust/cargo update + #[allow(clippy::cognitive_complexity)] + async fn consumer(&mut self, job: &ghevent::IssueComment) -> worker::Actions { + let span = debug_span!("job", pr = ?job.issue.number); + let _enter = span.enter(); + + if job.action == ghevent::IssueCommentAction::Deleted + || job.action == ghevent::IssueCommentAction::Pinned + || job.action == ghevent::IssueCommentAction::Unpinned + { + return vec![worker::Action::Ack]; + } + + let instructions = commentparser::parse(&job.comment.body); + if instructions.is_none() { + return vec![worker::Action::Ack]; + } + + let build_destinations = self.acl.build_job_architectures_for_user_repo( + &job.comment.user.login, + &job.repository.full_name, + ); + + if build_destinations.is_empty() { + info!("No build destinations for: {:?}", job); + // Don't process comments if they can't build anything + return vec![worker::Action::Ack]; + } + + info!("Got job: {:?}", job); + + let instructions = commentparser::parse(&job.comment.body); + info!("Instructions: {:?}", instructions); + + let pr = self + .github + .repo( + job.repository.owner.login.clone(), + job.repository.name.clone(), + ) + .pulls() + .get(job.issue.number) + .get() + .await; + + if let Err(x) = pr { + info!( + "fetching PR {}#{} from GitHub yielded error {}", + job.repository.full_name, job.issue.number, x + ); + return vec![worker::Action::Ack]; + } + + let pr = pr.unwrap(); + + let repo_msg = Repo { + clone_url: job.repository.clone_url.clone(), + full_name: job.repository.full_name.clone(), + owner: job.repository.owner.login.clone(), + name: job.repository.name.clone(), + }; + + let pr_msg = Pr { + number: job.issue.number, + head_sha: pr.head.sha.clone(), + target_branch: Some(pr.base.commit_ref), + }; + + let mut response: Vec<worker::Action> = vec![]; + if let Some(instructions) = instructions { + for instruction in instructions { + match instruction { + commentparser::Instruction::Build(subset, attrs) => { + let build_destinations = build_destinations.clone(); + + let msg = buildjob::BuildJob::new( + repo_msg.clone(), + pr_msg.clone(), + subset, + attrs, + None, + None, + Uuid::new_v4().to_string(), + ); + + for arch in build_destinations.iter() { + let (exchange, routingkey) = arch.as_build_destination(); + response.push(worker::publish_serde_action(exchange, routingkey, &msg)); + } + + response.push(worker::publish_serde_action( + Some("build-results".to_string()), + None, + &buildjob::QueuedBuildJobs { + job: msg, + architectures: build_destinations + .iter() + .cloned() + .map(|arch| arch.to_string()) + .collect(), + }, + )); + } + commentparser::Instruction::Test(attrs) => { + let msg = buildjob::BuildJob::new( + repo_msg.clone(), + pr_msg.clone(), + commentparser::Subset::Project, + attrs, + None, + None, + Uuid::new_v4().to_string(), + ); + + for arch in build_destinations.iter() { + if arch.can_run_tests() { + let (exchange, routingkey) = arch.as_build_destination(); + response.push(worker::publish_serde_action(exchange, routingkey, &msg)); + } + } + } + commentparser::Instruction::Eval => { + let msg = evaluationjob::EvaluationJob { + repo: repo_msg.clone(), + pr: pr_msg.clone(), + }; + + response.push(worker::publish_serde_action( + None, + Some("mass-rebuild-check-jobs".to_owned()), + &msg, + )); + } + } + } + } + + response.push(worker::Action::Ack); + response + } +} diff --git a/ofborg/tickborg/src/tasks/githubcommentposter.rs b/ofborg/tickborg/src/tasks/githubcommentposter.rs new file mode 100644 index 0000000000..70c4a118e4 --- /dev/null +++ b/ofborg/tickborg/src/tasks/githubcommentposter.rs @@ -0,0 +1,765 @@ +use crate::config::GithubAppVendingMachine; +use crate::message::Repo; +use crate::message::buildjob::{BuildJob, QueuedBuildJobs}; +use crate::message::buildresult::{BuildResult, BuildStatus, LegacyBuildResult}; +use crate::worker; + +use chrono::{DateTime, Utc}; +use hubcaps::checks::{CheckRunOptions, CheckRunState, Conclusion, Output}; +use tracing::{debug, debug_span, info, warn}; + +pub struct GitHubCommentPoster { + github_vend: GithubAppVendingMachine, +} + +impl GitHubCommentPoster { + pub fn new(github_vend: GithubAppVendingMachine) -> GitHubCommentPoster { + GitHubCommentPoster { github_vend } + } +} + +pub enum PostableEvent { + BuildQueued(QueuedBuildJobs), + BuildFinished(BuildResult), +} + +impl PostableEvent { + fn from(bytes: &[u8]) -> Result<PostableEvent, String> { + match serde_json::from_slice::<QueuedBuildJobs>(bytes) { + Ok(e) => Ok(PostableEvent::BuildQueued(e)), + Err(_) => match serde_json::from_slice::<BuildResult>(bytes) { + Ok(e) => Ok(PostableEvent::BuildFinished(e)), + Err(e) => Err(format!( + "Failed to deserialize PostableEvent: {:?}, err: {:}", + String::from_utf8_lossy(bytes), + e + )), + }, + } + } +} + +impl worker::SimpleWorker for GitHubCommentPoster { + type J = PostableEvent; + + async fn msg_to_job( + &mut self, + _: &str, + _: &Option<String>, + body: &[u8], + ) -> Result<Self::J, String> { + PostableEvent::from(body) + } + + async fn consumer(&mut self, job: &PostableEvent) -> worker::Actions { + let mut checks: Vec<CheckRunOptions> = vec![]; + let repo: Repo; + + let pr = match job { + PostableEvent::BuildQueued(queued_job) => { + repo = queued_job.job.repo.clone(); + for architecture in queued_job.architectures.iter() { + checks.push(job_to_check(&queued_job.job, architecture, Utc::now())); + } + queued_job.job.pr.to_owned() + } + PostableEvent::BuildFinished(finished_job) => { + let result = finished_job.legacy(); + repo = result.repo.clone(); + checks.push(result_to_check(&result, Utc::now())); + finished_job.pr() + } + }; + + let span = debug_span!("job", pr = ?pr.number); + let _enter = span.enter(); + + for check in checks { + info!( + "check {:?} {} {}", + check.status, + check.name, + check.details_url.as_ref().unwrap_or(&String::from("-")) + ); + debug!("{:?}", check); + + let check_create_attempt = self + .github_vend + .for_repo(&repo.owner, &repo.name) + .await + .unwrap() + .repo(repo.owner.clone(), repo.name.clone()) + .checkruns() + .create(&check) + .await; + + match check_create_attempt { + Ok(_) => info!("Successfully sent."), + Err(err) => warn!("Failed to send check {:?}", err), + } + } + + vec![worker::Action::Ack] + } +} + +fn job_to_check(job: &BuildJob, architecture: &str, timestamp: DateTime<Utc>) -> CheckRunOptions { + let mut all_attrs: Vec<String> = job.attrs.clone(); + all_attrs.sort(); + + if all_attrs.is_empty() { + all_attrs = vec![String::from("(unknown attributes)")]; + } + + CheckRunOptions { + name: format!("{} on {architecture}", all_attrs.join(", ")), + actions: None, + completed_at: None, + started_at: Some(timestamp.to_rfc3339_opts(chrono::SecondsFormat::Secs, true)), + conclusion: None, + details_url: Some(format!( + "https://logs.tickborg.project-tick.net/?key={}/{}.{}", + &job.repo.owner.to_lowercase(), + &job.repo.name.to_lowercase(), + job.pr.number, + )), + external_id: None, + head_sha: job.pr.head_sha.clone(), + output: None, + status: Some(CheckRunState::Queued), + } +} + +fn result_to_check(result: &LegacyBuildResult, timestamp: DateTime<Utc>) -> CheckRunOptions { + let mut all_attrs: Vec<String> = + vec![result.attempted_attrs.clone(), result.skipped_attrs.clone()] + .into_iter() + .map(|opt| opt.unwrap_or_else(|| vec![])) + .flat_map(|list| list.into_iter()) + .collect(); + all_attrs.sort(); + + if all_attrs.is_empty() { + all_attrs = vec![String::from("(unknown attributes)")]; + } + + let conclusion: Conclusion = result.status.clone().into(); + + let mut summary: Vec<String> = vec![]; + if let Some(ref attempted) = result.attempted_attrs { + summary.extend(list_segment("Attempted", attempted)); + } + + if result.status == BuildStatus::TimedOut { + summary.push(String::from("Build timed out.")); + } + + if let Some(ref skipped) = result.skipped_attrs { + summary.extend(list_segment( + &format!( + "The following builds were skipped because they don't evaluate on {}", + result.system + ), + skipped, + )); + } + + // Allow the clippy violation for improved readability + #[allow(clippy::vec_init_then_push)] + let text: String = if !result.output.is_empty() { + let mut reply: Vec<String> = vec![]; + + reply.push("## Partial log".to_owned()); + reply.push("".to_owned()); + reply.push("```".to_owned()); + reply.extend(result.output.clone()); + reply.push("```".to_owned()); + + reply.join("\n") + } else { + String::from("No partial log is available.") + }; + + CheckRunOptions { + name: format!("{} on {}", all_attrs.join(", "), result.system), + actions: None, + completed_at: Some(timestamp.to_rfc3339_opts(chrono::SecondsFormat::Secs, true)), + started_at: None, + conclusion: Some(conclusion), + details_url: Some(format!( + "https://logs.tickborg.project-tick.net/?key={}/{}.{}&attempt_id={}", + &result.repo.owner.to_lowercase(), + &result.repo.name.to_lowercase(), + result.pr.number, + result.attempt_id, + )), + external_id: Some(result.attempt_id.clone()), + head_sha: result.pr.head_sha.clone(), + + output: Some(Output { + annotations: None, + images: None, + summary: summary.join("\n"), + text: Some(text), + title: result.status.clone().into(), + }), + status: Some(CheckRunState::Completed), + } +} + +fn list_segment(name: &str, things: &[String]) -> Vec<String> { + let mut reply: Vec<String> = vec![]; + + if !things.is_empty() { + reply.push(format!("{name}: {}", things.join(", "))); + reply.push("".to_owned()); + } + + reply +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::message::{Pr, Repo}; + use chrono::TimeZone; + + #[test] + pub fn test_queued_build() { + let job = BuildJob { + repo: Repo { + clone_url: "https://github.com/project-tick/Project-Tick.git".to_owned(), + full_name: "project-tick/Project-Tick".to_owned(), + owner: "project-tick".to_owned(), + name: "Project-Tick".to_owned(), + }, + pr: Pr { + head_sha: "abc123".to_owned(), + number: 2345, + target_branch: Some("master".to_owned()), + }, + logs: None, + statusreport: None, + subset: None, + + request_id: "bogus-request-id".to_owned(), + attrs: vec!["foo".to_owned(), "bar".to_owned()], + }; + + let timestamp = Utc.with_ymd_and_hms(2023, 4, 20, 13, 37, 42).unwrap(); + assert_eq!( + job_to_check(&job, "x86_64-linux", timestamp), + CheckRunOptions { + name: "bar, foo on x86_64-linux".to_string(), + actions: None, + started_at: Some("2023-04-20T13:37:42Z".to_string()), + completed_at: None, + status: Some(CheckRunState::Queued), + conclusion: None, + details_url: Some("https://logs.tickborg.project-tick.net/?key=project-tick/Project-Tick.2345".to_string()), + external_id: None, + head_sha: "abc123".to_string(), + output: None, + } + ); + } + + #[test] + pub fn test_check_passing_build() { + let result = LegacyBuildResult { + repo: Repo { + clone_url: "https://github.com/project-tick/Project-Tick.git".to_owned(), + full_name: "project-tick/Project-Tick".to_owned(), + owner: "project-tick".to_owned(), + name: "Project-Tick".to_owned(), + }, + pr: Pr { + head_sha: "abc123".to_owned(), + number: 2345, + target_branch: Some("master".to_owned()), + }, + output: vec![ + "make[2]: Entering directory '/private/tmp/nix-build-gdb-8.1.drv-0/gdb-8.1/readline'".to_owned(), + "make[2]: Nothing to be done for 'install'.".to_owned(), + "make[2]: Leaving directory '/private/tmp/nix-build-gdb-8.1.drv-0/gdb-8.1/readline'".to_owned(), + "make[1]: Nothing to be done for 'install-target'.".to_owned(), + "make[1]: Leaving directory '/private/tmp/nix-build-gdb-8.1.drv-0/gdb-8.1'".to_owned(), + "removed '/nix/store/pcja75y9isdvgz5i00pkrpif9rxzxc29-gdb-8.1/share/info/bfd.info'".to_owned(), + "post-installation fixup".to_owned(), + "strip is /nix/store/5a88zk3jgimdmzg8rfhvm93kxib3njf9-cctools-binutils-darwin/bin/strip".to_owned(), + "patching script interpreter paths in /nix/store/pcja75y9isdvgz5i00pkrpif9rxzxc29-gdb-8.1".to_owned(), + "/nix/store/pcja75y9isdvgz5i00pkrpif9rxzxc29-gdb-8.1".to_owned(), + ], + attempt_id: "neatattemptid".to_owned(), + request_id: "bogus-request-id".to_owned(), + system: "x86_64-linux".to_owned(), + attempted_attrs: Some(vec!["foo".to_owned()]), + skipped_attrs: Some(vec!["bar".to_owned()]), + status: BuildStatus::Success, + }; + + let timestamp = Utc.with_ymd_and_hms(2023, 4, 20, 13, 37, 42).unwrap(); + + assert_eq!( + result_to_check(&result, timestamp), + CheckRunOptions { + name: "bar, foo on x86_64-linux".to_string(), + actions: None, + started_at: None, + completed_at: Some("2023-04-20T13:37:42Z".to_string()), + status: Some(CheckRunState::Completed), + conclusion: Some(Conclusion::Success), + details_url: Some( + "https://logs.tickborg.project-tick.net/?key=project-tick/Project-Tick.2345&attempt_id=neatattemptid" + .to_string() + ), + external_id: Some("neatattemptid".to_string()), + head_sha: "abc123".to_string(), + output: Some(Output { + title: "Success".to_string(), + summary: "Attempted: foo + +The following builds were skipped because they don't evaluate on x86_64-linux: bar +" + .to_string(), + text: Some( + "## Partial log + +``` +make[2]: Entering directory '/private/tmp/nix-build-gdb-8.1.drv-0/gdb-8.1/readline' +make[2]: Nothing to be done for 'install'. +make[2]: Leaving directory '/private/tmp/nix-build-gdb-8.1.drv-0/gdb-8.1/readline' +make[1]: Nothing to be done for 'install-target'. +make[1]: Leaving directory '/private/tmp/nix-build-gdb-8.1.drv-0/gdb-8.1' +removed '/nix/store/pcja75y9isdvgz5i00pkrpif9rxzxc29-gdb-8.1/share/info/bfd.info' +post-installation fixup +strip is /nix/store/5a88zk3jgimdmzg8rfhvm93kxib3njf9-cctools-binutils-darwin/bin/strip +patching script interpreter paths in /nix/store/pcja75y9isdvgz5i00pkrpif9rxzxc29-gdb-8.1 +/nix/store/pcja75y9isdvgz5i00pkrpif9rxzxc29-gdb-8.1 +```" + .to_string() + ), + annotations: None, + images: None, + }) + } + ); + } + + #[test] + pub fn test_check_failing_build() { + let result = LegacyBuildResult { + repo: Repo { + clone_url: "https://github.com/project-tick/Project-Tick.git".to_owned(), + full_name: "project-tick/Project-Tick".to_owned(), + owner: "project-tick".to_owned(), + name: "Project-Tick".to_owned(), + }, + pr: Pr { + head_sha: "abc123".to_owned(), + number: 2345, + target_branch: Some("master".to_owned()), + }, + output: vec![ + "make[2]: Entering directory '/private/tmp/nix-build-gdb-8.1.drv-0/gdb-8.1/readline'".to_owned(), + "make[2]: Nothing to be done for 'install'.".to_owned(), + "make[2]: Leaving directory '/private/tmp/nix-build-gdb-8.1.drv-0/gdb-8.1/readline'".to_owned(), + "make[1]: Nothing to be done for 'install-target'.".to_owned(), + "make[1]: Leaving directory '/private/tmp/nix-build-gdb-8.1.drv-0/gdb-8.1'".to_owned(), + "removed '/nix/store/pcja75y9isdvgz5i00pkrpif9rxzxc29-gdb-8.1/share/info/bfd.info'".to_owned(), + "post-installation fixup".to_owned(), + "strip is /nix/store/5a88zk3jgimdmzg8rfhvm93kxib3njf9-cctools-binutils-darwin/bin/strip".to_owned(), + "patching script interpreter paths in /nix/store/pcja75y9isdvgz5i00pkrpif9rxzxc29-gdb-8.1".to_owned(), + "/nix/store/pcja75y9isdvgz5i00pkrpif9rxzxc29-gdb-8.1".to_owned(), + ], + attempt_id: "neatattemptid".to_owned(), + request_id: "bogus-request-id".to_owned(), + system: "x86_64-linux".to_owned(), + attempted_attrs: Some(vec!["foo".to_owned()]), + skipped_attrs: None, + status: BuildStatus::Failure, + }; + + let timestamp = Utc.with_ymd_and_hms(2023, 4, 20, 13, 37, 42).unwrap(); + + assert_eq!( + result_to_check(&result, timestamp), + CheckRunOptions { + name: "foo on x86_64-linux".to_string(), + actions: None, + started_at: None, + completed_at: Some("2023-04-20T13:37:42Z".to_string()), + status: Some(CheckRunState::Completed), + conclusion: Some(Conclusion::Neutral), + details_url: Some( + "https://logs.tickborg.project-tick.net/?key=project-tick/Project-Tick.2345&attempt_id=neatattemptid" + .to_string() + ), + external_id: Some("neatattemptid".to_string()), + head_sha: "abc123".to_string(), + output: Some(Output { + title: "Failure".to_string(), + summary: "Attempted: foo +" + .to_string(), + text: Some( + "## Partial log + +``` +make[2]: Entering directory '/private/tmp/nix-build-gdb-8.1.drv-0/gdb-8.1/readline' +make[2]: Nothing to be done for 'install'. +make[2]: Leaving directory '/private/tmp/nix-build-gdb-8.1.drv-0/gdb-8.1/readline' +make[1]: Nothing to be done for 'install-target'. +make[1]: Leaving directory '/private/tmp/nix-build-gdb-8.1.drv-0/gdb-8.1' +removed '/nix/store/pcja75y9isdvgz5i00pkrpif9rxzxc29-gdb-8.1/share/info/bfd.info' +post-installation fixup +strip is /nix/store/5a88zk3jgimdmzg8rfhvm93kxib3njf9-cctools-binutils-darwin/bin/strip +patching script interpreter paths in /nix/store/pcja75y9isdvgz5i00pkrpif9rxzxc29-gdb-8.1 +/nix/store/pcja75y9isdvgz5i00pkrpif9rxzxc29-gdb-8.1 +```" + .to_string() + ), + annotations: None, + images: None, + }) + } + ); + } + + #[test] + pub fn test_check_timedout_build() { + let result = LegacyBuildResult { + repo: Repo { + clone_url: "https://github.com/project-tick/Project-Tick.git".to_owned(), + full_name: "project-tick/Project-Tick".to_owned(), + owner: "project-tick".to_owned(), + name: "Project-Tick".to_owned(), + }, + pr: Pr { + head_sha: "abc123".to_owned(), + number: 2345, + target_branch: Some("master".to_owned()), + }, + output: vec![ + "make[2]: Entering directory '/private/tmp/nix-build-gdb-8.1.drv-0/gdb-8.1/readline'".to_owned(), + "make[2]: Nothing to be done for 'install'.".to_owned(), + "make[2]: Leaving directory '/private/tmp/nix-build-gdb-8.1.drv-0/gdb-8.1/readline'".to_owned(), + "make[1]: Nothing to be done for 'install-target'.".to_owned(), + "make[1]: Leaving directory '/private/tmp/nix-build-gdb-8.1.drv-0/gdb-8.1'".to_owned(), + "removed '/nix/store/pcja75y9isdvgz5i00pkrpif9rxzxc29-gdb-8.1/share/info/bfd.info'".to_owned(), + "post-installation fixup".to_owned(), + "building of '/nix/store/l1limh50lx2cx45yb2gqpv7k8xl1mik2-gdb-8.1.drv' timed out after 1 seconds".to_owned(), + "error: build of '/nix/store/l1limh50lx2cx45yb2gqpv7k8xl1mik2-gdb-8.1.drv' failed".to_owned(), + ], + attempt_id: "neatattemptid".to_owned(), + request_id: "bogus-request-id".to_owned(), + system: "x86_64-linux".to_owned(), + attempted_attrs: Some(vec!["foo".to_owned()]), + skipped_attrs: None, + status: BuildStatus::TimedOut, + }; + + let timestamp = Utc.with_ymd_and_hms(2023, 4, 20, 13, 37, 42).unwrap(); + + assert_eq!( + result_to_check(&result, timestamp), + CheckRunOptions { + name: "foo on x86_64-linux".to_string(), + actions: None, + started_at: None, + completed_at: Some("2023-04-20T13:37:42Z".to_string()), + status: Some(CheckRunState::Completed), + conclusion: Some(Conclusion::Neutral), + details_url: Some( + "https://logs.tickborg.project-tick.net/?key=project-tick/Project-Tick.2345&attempt_id=neatattemptid" + .to_string() + ), + external_id: Some("neatattemptid".to_string()), + head_sha: "abc123".to_string(), + output: Some(Output { + title: "Timed out, unknown build status".to_string(), + summary: "Attempted: foo + +Build timed out." + .to_string(), + text: Some( + "## Partial log + +``` +make[2]: Entering directory '/private/tmp/nix-build-gdb-8.1.drv-0/gdb-8.1/readline' +make[2]: Nothing to be done for 'install'. +make[2]: Leaving directory '/private/tmp/nix-build-gdb-8.1.drv-0/gdb-8.1/readline' +make[1]: Nothing to be done for 'install-target'. +make[1]: Leaving directory '/private/tmp/nix-build-gdb-8.1.drv-0/gdb-8.1' +removed '/nix/store/pcja75y9isdvgz5i00pkrpif9rxzxc29-gdb-8.1/share/info/bfd.info' +post-installation fixup +building of '/nix/store/l1limh50lx2cx45yb2gqpv7k8xl1mik2-gdb-8.1.drv' timed out after 1 seconds +error: build of '/nix/store/l1limh50lx2cx45yb2gqpv7k8xl1mik2-gdb-8.1.drv' failed +```" + .to_string() + ), + annotations: None, + images: None, + }) + } + ); + } + + #[test] + pub fn test_check_passing_build_unspecified_attributes() { + let result = LegacyBuildResult { + repo: Repo { + clone_url: "https://github.com/project-tick/Project-Tick.git".to_owned(), + full_name: "project-tick/Project-Tick".to_owned(), + owner: "project-tick".to_owned(), + name: "Project-Tick".to_owned(), + }, + pr: Pr { + head_sha: "abc123".to_owned(), + number: 2345, + target_branch: Some("master".to_owned()), + }, + output: vec![ + "make[2]: Entering directory '/private/tmp/nix-build-gdb-8.1.drv-0/gdb-8.1/readline'".to_owned(), + "make[2]: Nothing to be done for 'install'.".to_owned(), + "make[2]: Leaving directory '/private/tmp/nix-build-gdb-8.1.drv-0/gdb-8.1/readline'".to_owned(), + "make[1]: Nothing to be done for 'install-target'.".to_owned(), + "make[1]: Leaving directory '/private/tmp/nix-build-gdb-8.1.drv-0/gdb-8.1'".to_owned(), + "removed '/nix/store/pcja75y9isdvgz5i00pkrpif9rxzxc29-gdb-8.1/share/info/bfd.info'".to_owned(), + "post-installation fixup".to_owned(), + "strip is /nix/store/5a88zk3jgimdmzg8rfhvm93kxib3njf9-cctools-binutils-darwin/bin/strip".to_owned(), + "patching script interpreter paths in /nix/store/pcja75y9isdvgz5i00pkrpif9rxzxc29-gdb-8.1".to_owned(), + "/nix/store/pcja75y9isdvgz5i00pkrpif9rxzxc29-gdb-8.1".to_owned(), + ], + attempt_id: "neatattemptid".to_owned(), + request_id: "bogus-request-id".to_owned(), + system: "x86_64-linux".to_owned(), + attempted_attrs: None, + skipped_attrs: None, + status: BuildStatus::Success, + }; + + let timestamp = Utc.with_ymd_and_hms(2023, 4, 20, 13, 37, 42).unwrap(); + + assert_eq!( + result_to_check(&result, timestamp), + CheckRunOptions { + name: "(unknown attributes) on x86_64-linux".to_string(), + actions: None, + started_at: None, + completed_at: Some("2023-04-20T13:37:42Z".to_string()), + status: Some(CheckRunState::Completed), + conclusion: Some(Conclusion::Success), + details_url: Some( + "https://logs.tickborg.project-tick.net/?key=project-tick/Project-Tick.2345&attempt_id=neatattemptid" + .to_string() + ), + external_id: Some("neatattemptid".to_string()), + head_sha: "abc123".to_string(), + output: Some(Output { + title: "Success".to_string(), + summary: "".to_string(), + text: Some( + "## Partial log + +``` +make[2]: Entering directory '/private/tmp/nix-build-gdb-8.1.drv-0/gdb-8.1/readline' +make[2]: Nothing to be done for 'install'. +make[2]: Leaving directory '/private/tmp/nix-build-gdb-8.1.drv-0/gdb-8.1/readline' +make[1]: Nothing to be done for 'install-target'. +make[1]: Leaving directory '/private/tmp/nix-build-gdb-8.1.drv-0/gdb-8.1' +removed '/nix/store/pcja75y9isdvgz5i00pkrpif9rxzxc29-gdb-8.1/share/info/bfd.info' +post-installation fixup +strip is /nix/store/5a88zk3jgimdmzg8rfhvm93kxib3njf9-cctools-binutils-darwin/bin/strip +patching script interpreter paths in /nix/store/pcja75y9isdvgz5i00pkrpif9rxzxc29-gdb-8.1 +/nix/store/pcja75y9isdvgz5i00pkrpif9rxzxc29-gdb-8.1 +```" + .to_string() + ), + annotations: None, + images: None, + }) + } + ); + } + + #[test] + pub fn test_check_failing_build_unspecified_attributes() { + let result = LegacyBuildResult { + repo: Repo { + clone_url: "https://github.com/project-tick/Project-Tick.git".to_owned(), + full_name: "project-tick/Project-Tick".to_owned(), + owner: "project-tick".to_owned(), + name: "Project-Tick".to_owned(), + }, + pr: Pr { + head_sha: "abc123".to_owned(), + number: 2345, + target_branch: Some("master".to_owned()), + }, + output: vec![ + "make[2]: Entering directory '/private/tmp/nix-build-gdb-8.1.drv-0/gdb-8.1/readline'".to_owned(), + "make[2]: Nothing to be done for 'install'.".to_owned(), + "make[2]: Leaving directory '/private/tmp/nix-build-gdb-8.1.drv-0/gdb-8.1/readline'".to_owned(), + "make[1]: Nothing to be done for 'install-target'.".to_owned(), + "make[1]: Leaving directory '/private/tmp/nix-build-gdb-8.1.drv-0/gdb-8.1'".to_owned(), + "removed '/nix/store/pcja75y9isdvgz5i00pkrpif9rxzxc29-gdb-8.1/share/info/bfd.info'".to_owned(), + "post-installation fixup".to_owned(), + "strip is /nix/store/5a88zk3jgimdmzg8rfhvm93kxib3njf9-cctools-binutils-darwin/bin/strip".to_owned(), + "patching script interpreter paths in /nix/store/pcja75y9isdvgz5i00pkrpif9rxzxc29-gdb-8.1".to_owned(), + "/nix/store/pcja75y9isdvgz5i00pkrpif9rxzxc29-gdb-8.1".to_owned(), + ], + attempt_id: "neatattemptid".to_owned(), + request_id: "bogus-request-id".to_owned(), + system: "x86_64-linux".to_owned(), + attempted_attrs: None, + skipped_attrs: None, + status: BuildStatus::Failure, + }; + + let timestamp = Utc.with_ymd_and_hms(2023, 4, 20, 13, 37, 42).unwrap(); + + assert_eq!( + result_to_check(&result, timestamp), + CheckRunOptions { + name: "(unknown attributes) on x86_64-linux".to_string(), + actions: None, + started_at: None, + completed_at: Some("2023-04-20T13:37:42Z".to_string()), + status: Some(CheckRunState::Completed), + conclusion: Some(Conclusion::Neutral), + details_url: Some( + "https://logs.tickborg.project-tick.net/?key=project-tick/Project-Tick.2345&attempt_id=neatattemptid" + .to_string() + ), + external_id: Some("neatattemptid".to_string()), + head_sha: "abc123".to_string(), + output: Some(Output { + title: "Failure".to_string(), + summary: "".to_string(), + text: Some( + "## Partial log + +``` +make[2]: Entering directory '/private/tmp/nix-build-gdb-8.1.drv-0/gdb-8.1/readline' +make[2]: Nothing to be done for 'install'. +make[2]: Leaving directory '/private/tmp/nix-build-gdb-8.1.drv-0/gdb-8.1/readline' +make[1]: Nothing to be done for 'install-target'. +make[1]: Leaving directory '/private/tmp/nix-build-gdb-8.1.drv-0/gdb-8.1' +removed '/nix/store/pcja75y9isdvgz5i00pkrpif9rxzxc29-gdb-8.1/share/info/bfd.info' +post-installation fixup +strip is /nix/store/5a88zk3jgimdmzg8rfhvm93kxib3njf9-cctools-binutils-darwin/bin/strip +patching script interpreter paths in /nix/store/pcja75y9isdvgz5i00pkrpif9rxzxc29-gdb-8.1 +/nix/store/pcja75y9isdvgz5i00pkrpif9rxzxc29-gdb-8.1 +```" + .to_string() + ), + annotations: None, + images: None, + }) + } + ); + } + + #[test] + pub fn test_check_no_attempt() { + let result = LegacyBuildResult { + repo: Repo { + clone_url: "https://github.com/project-tick/Project-Tick.git".to_owned(), + full_name: "project-tick/Project-Tick".to_owned(), + owner: "project-tick".to_owned(), + name: "Project-Tick".to_owned(), + }, + pr: Pr { + head_sha: "abc123".to_owned(), + number: 2345, + target_branch: Some("master".to_owned()), + }, + output: vec!["foo".to_owned()], + attempt_id: "neatattemptid".to_owned(), + request_id: "bogus-request-id".to_owned(), + system: "x86_64-linux".to_owned(), + attempted_attrs: None, + skipped_attrs: Some(vec!["not-attempted".to_owned()]), + status: BuildStatus::Skipped, + }; + + let timestamp = Utc.with_ymd_and_hms(2023, 4, 20, 13, 37, 42).unwrap(); + + assert_eq!( + result_to_check(&result, timestamp), + CheckRunOptions { + name: "not-attempted on x86_64-linux".to_string(), + actions: None, + started_at: None, + completed_at: Some("2023-04-20T13:37:42Z".to_string()), + status: Some(CheckRunState::Completed), + conclusion: Some(Conclusion::Skipped), + details_url: Some("https://logs.tickborg.project-tick.net/?key=project-tick/Project-Tick.2345&attempt_id=neatattemptid".to_string()), + external_id: Some("neatattemptid".to_string()), + head_sha: "abc123".to_string(), + output: Some(Output { + title: "No attempt".to_string(), + summary: "The following builds were skipped because they don\'t evaluate on x86_64-linux: not-attempted +".to_string(), + text: Some("## Partial log + +``` +foo +```".to_string()), + annotations: None, + images: None, + }) + } + ); + } + + #[test] + pub fn test_check_no_attempt_no_log() { + let result = LegacyBuildResult { + repo: Repo { + clone_url: "https://github.com/project-tick/Project-Tick.git".to_owned(), + full_name: "project-tick/Project-Tick".to_owned(), + owner: "project-tick".to_owned(), + name: "Project-Tick".to_owned(), + }, + pr: Pr { + head_sha: "abc123".to_owned(), + number: 2345, + target_branch: Some("master".to_owned()), + }, + output: vec![], + attempt_id: "neatattemptid".to_owned(), + request_id: "bogus-request-id".to_owned(), + system: "x86_64-linux".to_owned(), + attempted_attrs: None, + skipped_attrs: Some(vec!["not-attempted".to_owned()]), + status: BuildStatus::Skipped, + }; + + let timestamp = Utc.with_ymd_and_hms(2023, 4, 20, 13, 37, 42).unwrap(); + + assert_eq!( + result_to_check(&result, timestamp), + CheckRunOptions { + name: "not-attempted on x86_64-linux".to_string(), + actions: None, + started_at: None, + completed_at: Some("2023-04-20T13:37:42Z".to_string()), + status: Some(CheckRunState::Completed), + conclusion: Some(Conclusion::Skipped), + details_url: Some("https://logs.tickborg.project-tick.net/?key=project-tick/Project-Tick.2345&attempt_id=neatattemptid".to_string()), + external_id: Some("neatattemptid".to_string()), + head_sha: "abc123".to_string(), + output: Some(Output { + title: "No attempt".to_string(), + summary: "The following builds were skipped because they don\'t evaluate on x86_64-linux: not-attempted +".to_string(), + text: Some("No partial log is available.".to_string()), + annotations: None, + images: None, + }) + } + ); + } +} diff --git a/ofborg/tickborg/src/tasks/log_message_collector.rs b/ofborg/tickborg/src/tasks/log_message_collector.rs new file mode 100644 index 0000000000..2d80f72f03 --- /dev/null +++ b/ofborg/tickborg/src/tasks/log_message_collector.rs @@ -0,0 +1,487 @@ +use crate::message::buildlogmsg::{BuildLogMsg, BuildLogStart}; +use crate::message::buildresult::BuildResult; +use crate::worker; +use crate::writetoline::LineWriter; + +use std::fs::{self, File, OpenOptions}; +use std::io::Write; +use std::path::{Component, Path, PathBuf}; + +use lru_cache::LruCache; +use tracing::warn; + +#[derive(Eq, PartialEq, Hash, Debug, Clone)] +pub struct LogFrom { + routing_key: String, + attempt_id: String, +} + +pub struct LogMessageCollector { + handles: LruCache<LogFrom, LineWriter>, + log_root: PathBuf, +} + +#[derive(Debug)] +enum MsgType { + Start(BuildLogStart), + Msg(BuildLogMsg), + Finish(Box<BuildResult>), +} + +#[derive(Debug)] +pub struct LogMessage { + from: LogFrom, + message: MsgType, +} + +fn validate_path_segment(segment: &Path) -> Result<(), String> { + let components = segment.components(); + + if components.count() == 0 { + return Err(String::from("Segment has no components")); + } + + if segment.components().all(|component| match component { + Component::Normal(_) => true, + e => { + warn!("Invalid path component: {:?}", e); + false + } + }) { + Ok(()) + } else { + Err(String::from("Path contained invalid components")) + } +} + +impl LogMessageCollector { + pub fn new(log_root: PathBuf, max_open: usize) -> LogMessageCollector { + LogMessageCollector { + handles: LruCache::new(max_open), + log_root, + } + } + + pub fn write_metadata(&mut self, from: &LogFrom, data: &BuildLogStart) -> Result<(), String> { + let metapath = self.path_for_metadata(from)?; + let mut fp = self.open_file(&metapath)?; + + match serde_json::to_string(data) { + Ok(data) => { + if let Err(err) = fp.write(data.as_bytes()) { + Err(format!("Failed to write metadata: {err:?}")) + } else { + Ok(()) + } + } + Err(err) => Err(format!("Failed to stringify metadata: {err:?}")), + } + } + + pub fn write_result(&mut self, from: &LogFrom, data: &BuildResult) -> Result<(), String> { + let path = self.path_for_result(from)?; + let mut fp = self.open_file(&path)?; + + match serde_json::to_string(data) { + Ok(data) => { + if let Err(err) = fp.write(data.as_bytes()) { + Err(format!("Failed to write result: {err:?}")) + } else { + Ok(()) + } + } + Err(err) => Err(format!("Failed to stringify result: {err:?}")), + } + } + + pub fn handle_for(&mut self, from: &LogFrom) -> Result<&mut LineWriter, String> { + if self.handles.contains_key(from) { + Ok(self + .handles + .get_mut(from) + .expect("handles just contained the key")) + } else { + let logpath = self.path_for_log(from)?; + let fp = self.open_file(&logpath)?; + let writer = LineWriter::new(fp); + self.handles.insert(from.clone(), writer); + if let Some(handle) = self.handles.get_mut(from) { + Ok(handle) + } else { + Err(String::from( + "A just-inserted value should already be there", + )) + } + } + } + + fn path_for_metadata(&self, from: &LogFrom) -> Result<PathBuf, String> { + let mut path = self.path_for_log(from)?; + path.set_extension("metadata.json"); + Ok(path) + } + + fn path_for_result(&self, from: &LogFrom) -> Result<PathBuf, String> { + let mut path = self.path_for_log(from)?; + path.set_extension("result.json"); + Ok(path) + } + + fn path_for_log(&self, from: &LogFrom) -> Result<PathBuf, String> { + let mut location = self.log_root.clone(); + + let routing_key = PathBuf::from(from.routing_key.clone()); + validate_path_segment(&routing_key)?; + location.push(routing_key); + + let attempt_id = PathBuf::from(from.attempt_id.clone()); + validate_path_segment(&attempt_id)?; + location.push(attempt_id); + + if location.starts_with(&self.log_root) { + Ok(location) + } else { + Err(format!( + "Calculating the log location for {from:?} resulted in an invalid path {location:?}" + )) + } + } + + fn open_file(&self, path: &Path) -> Result<File, String> { + let dir = path.parent().unwrap(); + fs::create_dir_all(dir).unwrap(); + + let attempt = OpenOptions::new() + .append(true) + .read(true) + .create(true) + .open(path); + + match attempt { + Ok(handle) => Ok(handle), + Err(err) => Err(format!( + "Failed to open the file for {path:?}, err: {err:?}" + )), + } + } +} + +impl worker::SimpleWorker for LogMessageCollector { + type J = LogMessage; + + async fn msg_to_job( + &mut self, + routing_key: &str, + _: &Option<String>, + body: &[u8], + ) -> Result<Self::J, String> { + let message: MsgType; + let attempt_id: String; + + let decode_msg: Result<BuildLogMsg, _> = serde_json::from_slice(body); + if let Ok(msg) = decode_msg { + attempt_id = msg.attempt_id.clone(); + message = MsgType::Msg(msg); + } else { + let decode_msg: Result<BuildLogStart, _> = serde_json::from_slice(body); + if let Ok(msg) = decode_msg { + attempt_id = msg.attempt_id.clone(); + message = MsgType::Start(msg); + } else { + let decode_msg: Result<BuildResult, _> = serde_json::from_slice(body); + if let Ok(msg) = decode_msg { + attempt_id = msg.legacy().attempt_id; + message = MsgType::Finish(Box::new(msg)); + } else { + return Err(format!("failed to decode job: {decode_msg:?}")); + } + } + } + + Ok(LogMessage { + from: LogFrom { + routing_key: routing_key.to_string(), + attempt_id, + }, + message, + }) + } + + async fn consumer(&mut self, job: &LogMessage) -> worker::Actions { + match job.message { + MsgType::Start(ref start) => { + self.write_metadata(&job.from, start) + .expect("failed to write metadata"); + + // Make sure the log content exists by opening its handle. + // This (hopefully) prevents builds that produce no output (for any reason) from + // having their logs.tickborg.project-tick.net link complaining about a 404. + let _ = self.handle_for(&job.from).unwrap(); + } + MsgType::Msg(ref message) => { + let handle = self.handle_for(&job.from).unwrap(); + + handle.write_to_line((message.line_number - 1) as usize, &message.output); + } + MsgType::Finish(ref finish) => { + self.write_result(&job.from, finish) + .expect("failed to write result"); + } + } + + vec![worker::Action::Ack] + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::message::buildresult::{BuildStatus, V1Tag}; + use crate::message::{Pr, Repo}; + use crate::test_scratch::TestScratch; + use crate::worker::SimpleWorker; + use std::io::Read; + use std::path::PathBuf; + + fn make_worker(path: PathBuf) -> LogMessageCollector { + LogMessageCollector::new(path, 3) + } + + fn make_from(id: &str) -> LogFrom { + LogFrom { + attempt_id: format!("attempt-id-{id}"), + routing_key: format!("routing-key-{id}"), + } + } + + #[test] + fn test_handle_for() { + let p = TestScratch::new_dir("log-message-collector-handle_for"); + + let a = make_from("a.foo/123"); + let b = make_from("b.foo/123"); + let c = make_from("c.foo/123"); + let d = make_from("d.foo/123"); + + let mut worker = make_worker(p.path()); + assert!(worker.handle_for(&a).is_ok()); + assert!(worker.handle_for(&b).is_ok()); + assert!(worker.handle_for(&c).is_ok()); + assert!(worker.handle_for(&d).is_ok()); + assert!(worker.handle_for(&a).is_ok()); + } + + #[test] + fn test_path_for_metadata() { + let p = TestScratch::new_dir("log-message-collector-path_for_metadata"); + let worker = make_worker(p.path()); + + let path = worker + .path_for_metadata(&LogFrom { + attempt_id: String::from("my-attempt-id"), + routing_key: String::from("my-routing-key"), + }) + .expect("the path should be valid"); + + assert!(path.starts_with(p.path())); + assert!( + path.as_os_str() + .to_string_lossy() + .ends_with("my-routing-key/my-attempt-id.metadata.json") + ); + } + + #[test] + fn test_path_for_result() { + let p = TestScratch::new_dir("log-message-collector-path_for_result"); + let worker = make_worker(p.path()); + + let path = worker + .path_for_result(&LogFrom { + attempt_id: String::from("my-attempt-id"), + routing_key: String::from("my-routing-key"), + }) + .expect("the path should be valid"); + + assert!(path.starts_with(p.path())); + assert!( + path.as_os_str() + .to_string_lossy() + .ends_with("my-routing-key/my-attempt-id.result.json") + ); + } + + #[test] + fn test_path_for_log() { + let p = TestScratch::new_dir("log-message-collector-path_for_log"); + let worker = make_worker(p.path()); + + let path = worker + .path_for_log(&LogFrom { + attempt_id: String::from("my-attempt-id"), + routing_key: String::from("my-routing-key"), + }) + .expect("the path should be valid"); + + assert!(path.starts_with(p.path())); + assert!(path.ends_with("my-routing-key/my-attempt-id")); + } + + #[test] + fn test_path_for_log_malicious() { + let p = TestScratch::new_dir("log-message-collector-for_malicious"); + let worker = make_worker(p.path()); + + let path = worker.path_for_log(&LogFrom { + attempt_id: String::from("./../../"), + routing_key: String::from("./../../foobar"), + }); + + println!("path: {path:?}"); + assert!(path.is_err()); + } + + #[test] + fn test_validate_path_segment() { + assert!(validate_path_segment(&PathBuf::from("foo")).is_ok()); + assert!(validate_path_segment(&PathBuf::from("foo/bar")).is_ok()); + assert!(validate_path_segment(&PathBuf::from("foo.bar/123")).is_ok()); + assert!(validate_path_segment(&PathBuf::from("..")).is_err()); + assert!(validate_path_segment(&PathBuf::from(".")).is_err()); + assert!(validate_path_segment(&PathBuf::from("./././")).is_err()); + assert!(validate_path_segment(&PathBuf::from("")).is_err()); + assert!(validate_path_segment(&PathBuf::from("foo/..")).is_err()); + assert!(validate_path_segment(&PathBuf::from("foo/../bar")).is_err()); + assert!(validate_path_segment(&PathBuf::from("foo/./bar")).is_ok()); + assert!(validate_path_segment(&PathBuf::from("/foo/bar")).is_err()); + assert!(validate_path_segment(&PathBuf::from("/foo")).is_err()); + } + + #[test] + fn test_open_file() { + let p = TestScratch::new_dir("log-message-collector-open_file"); + let worker = make_worker(p.path()); + + assert!( + worker + .open_file(&worker.path_for_log(&make_from("a")).unwrap()) + .is_ok() + ); + assert!( + worker + .open_file(&worker.path_for_log(&make_from("b.foo/123")).unwrap()) + .is_ok() + ); + } + + #[tokio::test] + pub async fn test_logs_collect() { + let mut logmsg = BuildLogMsg { + attempt_id: String::from("my-attempt-id"), + identity: String::from("my-identity"), + system: String::from("foobar-x8664"), + line_number: 1, + output: String::from("line-1"), + }; + let mut job = LogMessage { + from: make_from("foo"), + message: MsgType::Msg(logmsg.clone()), + }; + + let p = TestScratch::new_dir("log-message-collector-logs_collector"); + + { + let mut worker = make_worker(p.path()); + assert_eq!( + vec![worker::Action::Ack], + worker + .consumer(&LogMessage { + from: make_from("foo"), + message: MsgType::Start(BuildLogStart { + attempt_id: String::from("my-attempt-id"), + identity: String::from("my-identity"), + system: String::from("foobar-x8664"), + attempted_attrs: Some(vec!["foo".to_owned()]), + skipped_attrs: Some(vec!["bar".to_owned()]), + }) + }) + .await + ); + + assert!(p.path().join("routing-key-foo/attempt-id-foo").exists()); + assert_eq!(vec![worker::Action::Ack], worker.consumer(&job).await); + + logmsg.line_number = 5; + logmsg.output = String::from("line-5"); + job.message = MsgType::Msg(logmsg.clone()); + assert_eq!(vec![worker::Action::Ack], worker.consumer(&job).await); + + job.from.attempt_id = String::from("my-other-attempt"); + logmsg.attempt_id = String::from("my-other-attempt"); + logmsg.line_number = 3; + logmsg.output = String::from("line-3"); + job.message = MsgType::Msg(logmsg); + assert_eq!(vec![worker::Action::Ack], worker.consumer(&job).await); + + assert_eq!( + vec![worker::Action::Ack], + worker + .consumer(&LogMessage { + from: make_from("foo"), + message: MsgType::Finish(Box::new(BuildResult::V1 { + tag: V1Tag::V1, + repo: Repo { + clone_url: "https://github.com/project-tick/tickborg.git".to_owned(), + full_name: "project-tick/tickborg".to_owned(), + owner: "project-tick".to_owned(), + name: "tickborg".to_owned(), + }, + pr: Pr { + number: 42, + head_sha: "6dd9f0265d52b946dd13daf996f30b64e4edb446".to_owned(), + target_branch: Some("scratch".to_owned()), + }, + system: "x86_64-linux".to_owned(), + output: vec![], + attempt_id: "attempt-id-foo".to_owned(), + request_id: "bogus-request-id".to_owned(), + status: BuildStatus::Success, + attempted_attrs: Some(vec!["foo".to_owned()]), + skipped_attrs: Some(vec!["bar".to_owned()]), + })) + }) + .await + ); + } + + let mut prm = p.path(); + let mut sm = String::new(); + prm.push("routing-key-foo/attempt-id-foo.metadata.json"); + File::open(prm).unwrap().read_to_string(&mut sm).unwrap(); + assert_eq!( + &sm, + "{\"system\":\"foobar-x8664\",\"identity\":\"my-identity\",\"attempt_id\":\"my-attempt-id\",\"attempted_attrs\":[\"foo\"],\"skipped_attrs\":[\"bar\"]}" + ); + + let mut prf = p.path(); + let mut sf = String::new(); + prf.push("routing-key-foo/attempt-id-foo"); + File::open(prf).unwrap().read_to_string(&mut sf).unwrap(); + assert_eq!(&sf, "line-1\n\n\n\nline-5\n"); + + let mut pr = p.path(); + let mut s = String::new(); + pr.push("routing-key-foo/my-other-attempt"); + File::open(pr).unwrap().read_to_string(&mut s).unwrap(); + assert_eq!(&s, "\n\nline-3\n"); + + let mut prr = p.path(); + let mut sr = String::new(); + prr.push("routing-key-foo/attempt-id-foo.result.json"); + File::open(prr).unwrap().read_to_string(&mut sr).unwrap(); + assert_eq!( + &sr, + "{\"tag\":\"V1\",\"repo\":{\"owner\":\"project-tick\",\"name\":\"tickborg\",\"full_name\":\"project-tick/tickborg\",\"clone_url\":\"https://github.com/project-tick/tickborg.git\"},\"pr\":{\"target_branch\":\"scratch\",\"number\":42,\"head_sha\":\"6dd9f0265d52b946dd13daf996f30b64e4edb446\"},\"system\":\"x86_64-linux\",\"output\":[],\"attempt_id\":\"attempt-id-foo\",\"request_id\":\"bogus-request-id\",\"status\":\"Success\",\"skipped_attrs\":[\"bar\"],\"attempted_attrs\":[\"foo\"]}" + ); + } +} diff --git a/ofborg/tickborg/src/tasks/mod.rs b/ofborg/tickborg/src/tasks/mod.rs new file mode 100644 index 0000000000..5aab0fa631 --- /dev/null +++ b/ofborg/tickborg/src/tasks/mod.rs @@ -0,0 +1,8 @@ +pub mod build; +pub mod eval; +pub mod evaluate; +pub mod evaluationfilter; +pub mod githubcommentfilter; +pub mod githubcommentposter; +pub mod log_message_collector; +pub mod statscollector; diff --git a/ofborg/tickborg/src/tasks/statscollector.rs b/ofborg/tickborg/src/tasks/statscollector.rs new file mode 100644 index 0000000000..fef23ad3c6 --- /dev/null +++ b/ofborg/tickborg/src/tasks/statscollector.rs @@ -0,0 +1,68 @@ +use crate::stats; +use crate::worker; + +use tracing::error; + +pub struct StatCollectorWorker<E> { + events: E, + collector: stats::MetricCollector, +} + +impl<E: stats::SysEvents + 'static> StatCollectorWorker<E> { + pub fn new(events: E, collector: stats::MetricCollector) -> StatCollectorWorker<E> { + StatCollectorWorker { events, collector } + } +} + +impl<E: stats::SysEvents + 'static> worker::SimpleWorker for StatCollectorWorker<E> { + type J = stats::EventMessage; + + async fn msg_to_job( + &mut self, + _: &str, + _: &Option<String>, + body: &[u8], + ) -> Result<Self::J, String> { + match serde_json::from_slice(body) { + Ok(e) => Ok(e), + Err(_) => { + let mut modified_body: Vec<u8> = vec![b"\""[0]]; + modified_body.append(&mut body.to_vec()); + modified_body.push(b"\""[0]); + + match serde_json::from_slice(&modified_body) { + Ok(event) => { + self.events + .notify(stats::Event::StatCollectorLegacyEvent( + stats::event_metric_name(&event), + )) + .await; + Ok(stats::EventMessage { + sender: "".to_owned(), + events: vec![event], + }) + } + Err(err) => { + self.events + .notify(stats::Event::StatCollectorBogusEvent) + .await; + error!( + "Failed to decode message: {:?}, Err: {err:?}", + std::str::from_utf8(body).unwrap_or("<message not utf8>") + ); + Err("Failed to decode message".to_owned()) + } + } + } + } + } + + async fn consumer(&mut self, job: &stats::EventMessage) -> worker::Actions { + let sender = job.sender.clone(); + for event in job.events.iter() { + self.collector.record(sender.clone(), event.clone()); + } + + vec![worker::Action::Ack] + } +} diff --git a/ofborg/tickborg/src/test_scratch.rs b/ofborg/tickborg/src/test_scratch.rs new file mode 100644 index 0000000000..d63632ecd0 --- /dev/null +++ b/ofborg/tickborg/src/test_scratch.rs @@ -0,0 +1,61 @@ +use std::fs; +use std::path::{Path, PathBuf}; +use std::process::Command; + +use tracing::debug; + +pub struct TestScratch { + root: PathBuf, +} + +impl TestScratch { + pub fn new_dir(ident: &str) -> TestScratch { + let scratch = TestScratch { + root: Path::new(env!("CARGO_MANIFEST_DIR")) + .join("test-scratch") + .join("dirs") + .join(format!("dir-{ident}")), + }; + + TestScratch::create_dir(&scratch); + + scratch + } + + pub fn new_file(ident: &str) -> TestScratch { + let scratch = TestScratch { + root: Path::new(env!("CARGO_MANIFEST_DIR")) + .join("test-scratch") + .join("files") + .join(format!("file-{ident}")), + }; + + TestScratch::create_dir(&scratch); + scratch + } + + fn create_dir(path: &TestScratch) { + let target = path.root.parent().unwrap(); + debug!("Creating directory {target:?}"); + fs::create_dir_all(target).unwrap(); + } + + pub fn path(&self) -> PathBuf { + self.root.clone() + } + + pub fn string(&self) -> String { + self.path().to_str().unwrap().to_owned() + } +} + +impl Drop for TestScratch { + fn drop(&mut self) { + debug!("Deleting root {:?}", self.root); + Command::new("rm") + .arg("-rf") + .arg(self.root.clone()) + .status() + .expect("cleanup of test-scratch should work"); + } +} diff --git a/ofborg/tickborg/src/worker.rs b/ofborg/tickborg/src/worker.rs new file mode 100644 index 0000000000..9569b450b9 --- /dev/null +++ b/ofborg/tickborg/src/worker.rs @@ -0,0 +1,53 @@ +use std::{marker::Send, sync::Arc}; + +use serde::Serialize; + +pub struct Response {} + +pub type Actions = Vec<Action>; + +#[derive(Clone, Debug, PartialEq, Eq)] +pub enum Action { + Ack, + NackRequeue, + NackDump, + Publish(Arc<QueueMsg>), +} + +#[derive(Debug, PartialEq, Eq)] +pub struct QueueMsg { + pub exchange: Option<String>, + pub routing_key: Option<String>, + pub mandatory: bool, + pub immediate: bool, + pub content_type: Option<String>, + pub content: Vec<u8>, +} + +pub fn publish_serde_action<T: Serialize + ?Sized>( + exchange: Option<String>, + routing_key: Option<String>, + msg: &T, +) -> Action { + Action::Publish(Arc::new(QueueMsg { + exchange, + routing_key, + mandatory: false, + immediate: false, + content_type: Some("application/json".to_owned()), + content: serde_json::to_string(&msg).unwrap().into_bytes(), + })) +} + +pub trait SimpleWorker: Send { + type J: Send; + + fn consumer(&mut self, job: &Self::J) -> impl std::future::Future<Output = Actions>; + + fn msg_to_job( + &mut self, + method: &str, + headers: &Option<String>, + body: &[u8], + ) -> impl std::future::Future<Output = Result<Self::J, String>>; +} diff --git a/ofborg/tickborg/src/writetoline.rs b/ofborg/tickborg/src/writetoline.rs new file mode 100644 index 0000000000..848464242d --- /dev/null +++ b/ofborg/tickborg/src/writetoline.rs @@ -0,0 +1,356 @@ +use std::fs::File; +use std::io::{BufRead, BufReader, Seek, SeekFrom, Write}; + +pub struct LineWriter { + file: File, + buffer: Vec<String>, + last_line: usize, +} + +impl LineWriter { + pub fn new(mut rw: File) -> LineWriter { + let buf = LineWriter::load_buffer(&mut rw); + let len = buf.len(); + + LineWriter { + file: rw, + buffer: buf, + last_line: len, + } + } + + fn load_buffer(file: &mut File) -> Vec<String> { + file.seek(SeekFrom::Start(0)).unwrap(); + + let reader = BufReader::new(file.try_clone().unwrap()); + reader + .lines() + .map(|line| match line { + Ok(s) => s, + Err(err) => format!("UTF-8 Decode err: {err:?}"), + }) + .collect() + } + + pub fn write_to_line(&mut self, line: usize, data: &str) { + let original_len = self.buffer.len(); + while self.buffer.len() <= line { + self.buffer.push("".to_owned()); + } + + self.buffer.remove(line); + self.buffer.insert(line, data.to_owned()); + + if self.last_line > line { + // println!("taking the rewrite option"); + // We're inserting in to the middle of a file, so just + // write the entire buffer again + self.file.set_len(0).unwrap(); + self.file.seek(SeekFrom::Start(0)).unwrap(); + self.file + .write_all(self.buffer.join("\n").as_bytes()) + .unwrap(); + self.file.write_all(b"\n").unwrap(); + } else { + // println!("taking the append option"); + // println!("Writing {:?} to line {}", data, line); + + let buffer_start = original_len; + let buffer_end = line + 1; + let to_write = self.buffer[buffer_start..buffer_end].join("\n"); + // println!("Full buffer: {:?}", self.buffer); + // println!("buffer[{}..{}] = {:?}", buffer_start, buffer_end, to_write); + // Inclusive range syntax (ie: ...) is experimental, so + // to include the final newline in to the written buffer + // we have to use one more than the range we want for the + // end + // println!("selected buffer: {:?}", to_write); + self.file.write_all(to_write.as_bytes()).unwrap(); + self.file.write_all(b"\n").unwrap(); + } + + self.last_line = line; + } + + pub fn inner(self) -> File { + self.file + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::test_scratch::TestScratch; + use std::fs::File; + use std::fs::OpenOptions; + use std::io::Read; + use std::path::Path; + use std::time::Instant; + + fn testfile(path: &Path) -> File { + OpenOptions::new() + .read(true) + .write(true) + .truncate(true) + .create(true) + .open(path) + .expect("failed to open scratch file") + } + + fn assert_file_content<T>(f: &mut T, value: &str) + where + T: Read + Seek, + { + let mut mystr: String = String::new(); + f.seek(SeekFrom::Start(0)).unwrap(); + f.read_to_string(&mut mystr).unwrap(); + assert_eq!(mystr, value); + } + + #[test] + fn test_writer_line_ordered() { + let p = TestScratch::new_file("writetoline-ordered"); + let mut f = testfile(&p.path()); + + assert_file_content(&mut f, ""); + + let mut writer = LineWriter::new(f); + writer.write_to_line(0, "hello"); + f = writer.inner(); + + assert_file_content(&mut f, "hello\n"); + + let mut writer = LineWriter::new(f); + writer.write_to_line(1, "world"); + f = writer.inner(); + + assert_file_content(&mut f, "hello\nworld\n"); + + let mut writer = LineWriter::new(f); + writer.write_to_line(2, ":)"); + f = writer.inner(); + + assert_file_content(&mut f, "hello\nworld\n:)\n"); + } + + #[test] + fn test_writer_line_unordered() { + let p = TestScratch::new_file("writetoline-unordered"); + let mut f = testfile(&p.path()); + + assert_file_content(&mut f, ""); + + { + let mut writer = LineWriter::new(f); + writer.write_to_line(2, ":)"); + f = writer.inner(); + } + + assert_file_content(&mut f, "\n\n:)\n"); + + { + let mut writer = LineWriter::new(f); + writer.write_to_line(1, "world"); + f = writer.inner(); + } + + assert_file_content(&mut f, "\nworld\n:)\n"); + + { + let mut writer = LineWriter::new(f); + writer.write_to_line(0, "hello"); + f = writer.inner(); + } + + assert_file_content(&mut f, "hello\nworld\n:)\n"); + } + + #[test] + fn test_writer_line_unordered_long() { + let p = TestScratch::new_file("writetoline-unordered-long"); + let mut f = testfile(&p.path()); + + assert_file_content(&mut f, ""); + + { + let mut writer = LineWriter::new(f); + writer.write_to_line( + 2, + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA", + ); + f = writer.inner(); + } + assert_file_content( + &mut f, + "\n\nAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA\n", + ); + + { + let mut writer = LineWriter::new(f); + writer.write_to_line( + 1, + "BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB", + ); + f = writer.inner(); + } + assert_file_content( + &mut f, + "\nBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB\nAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA\n", + ); + + { + let mut writer = LineWriter::new(f); + writer.write_to_line( + 0, + "CCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCC", + ); + f = writer.inner(); + } + assert_file_content( + &mut f, + "CCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCC\nBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB\nAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA\n", + ); + } + + #[test] + fn test_writer_line_unordered_longish() { + let p = TestScratch::new_file("writetoline-unordered-longish"); + let mut f = testfile(&p.path()); + + assert_file_content(&mut f, ""); + + { + let mut writer = LineWriter::new(f); + writer.write_to_line(2, "hello"); + f = writer.inner(); + } + assert_file_content(&mut f, "\n\nhello\n"); + + { + let mut writer = LineWriter::new(f); + writer.write_to_line(1, "mynameis"); + f = writer.inner(); + } + assert_file_content(&mut f, "\nmynameis\nhello\n"); + + { + let mut writer = LineWriter::new(f); + writer.write_to_line(0, "graham"); + f = writer.inner(); + } + assert_file_content(&mut f, "graham\nmynameis\nhello\n"); + } + + #[test] + fn test_writer_line_ordered_result() { + let p = TestScratch::new_file("writetoline-ordered-result"); + let mut f = testfile(&p.path()); + + let mut writer = LineWriter::new(f); + writer.write_to_line(0, "hello"); + writer.write_to_line(1, "world"); + writer.write_to_line(2, ":)"); + f = writer.inner(); + + assert_file_content(&mut f, "hello\nworld\n:)\n"); + } + + #[test] + fn test_writer_line_unordered_result() { + let p = TestScratch::new_file("writetoline-unordered-result"); + let mut f = testfile(&p.path()); + + let mut writer = LineWriter::new(f); + writer.write_to_line(2, ":)"); + writer.write_to_line(1, "world"); + writer.write_to_line(0, "hello"); + f = writer.inner(); + + assert_file_content(&mut f, "hello\nworld\n:)\n"); + } + + #[test] + fn test_writer_line_unordered_long_result() { + let p = TestScratch::new_file("writetoline-unordered-long-result"); + let mut f = testfile(&p.path()); + + let mut writer = LineWriter::new(f); + writer.write_to_line( + 2, + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA", + ); + writer.write_to_line( + 1, + "BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB", + ); + writer.write_to_line( + 0, + "CCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCC", + ); + f = writer.inner(); + + assert_file_content( + &mut f, + "CCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCC\nBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB\nAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA\n", + ); + } + + #[test] + fn test_writer_line_unordered_longish_result() { + let p = TestScratch::new_file("writetoline-unordered-longish-result"); + let mut f = testfile(&p.path()); + + let mut writer = LineWriter::new(f); + writer.write_to_line(2, "hello"); + writer.write_to_line(1, "mynameis"); + writer.write_to_line(0, "graham"); + f = writer.inner(); + + assert_file_content(&mut f, "graham\nmynameis\nhello\n"); + } + + #[test] + fn test_writer_line_middle() { + let p = TestScratch::new_file("writetoline-middle"); + let mut f = testfile(&p.path()); + + assert_file_content(&mut f, ""); + + { + let mut writer = LineWriter::new(f); + writer.write_to_line(5, "hello"); + f = writer.inner(); + } + assert_file_content(&mut f, "\n\n\n\n\nhello\n"); + } + + #[test] + fn bench_lots_of_ordered_lines() { + let p = TestScratch::new_file("bench-ordered-lines"); + let f = testfile(&p.path()); + let mut writer = LineWriter::new(f); + + let timer = Instant::now(); + + for i in 0..3000 { + writer.write_to_line(i, "This is my line!"); + } + + println!("ordered took: {:?}", timer.elapsed()); + } + + #[test] + fn bench_lots_of_reversed_lines() { + let p = TestScratch::new_file("bench-reversed-lines"); + let f = testfile(&p.path()); + let mut writer = LineWriter::new(f); + + let timer = Instant::now(); + + for i in (0..3000).rev() { + writer.write_to_line(i, "This is my line!"); + } + + println!("reversed took: {:?}", timer.elapsed()); + } +} |
