use std::io::Read; use std::path::PathBuf; use std::process::Stdio; use std::sync::atomic::{AtomicBool, AtomicI32, Ordering}; use std::sync::{Arc, Mutex}; use std::thread; use std::time::SystemTime; use serde::Serialize; use shared_child::SharedChild; use super::ringbuffer::BoundedRingBuffer; const RING_CAP: usize = 4 * 1024 * 1024; pub struct BackgroundProc { pub command: String, pub cwd: Option, pub started_at_ms: u64, pub child: Arc, pub buffer: Mutex, pub exited: AtomicBool, pub exit_code: AtomicI32, pub exit_unknown: AtomicBool, } #[derive(Serialize)] pub struct BackgroundLogResponse { pub bytes: String, pub next_offset: u64, pub dropped: u64, pub exited: bool, pub exit_code: Option, } #[derive(Serialize)] pub struct BackgroundProcInfo { pub handle: u32, pub command: String, pub cwd: Option, pub started_at_ms: u64, pub exited: bool, pub exit_code: Option, } impl BackgroundProc { pub fn read_logs(&self, since: u64) -> BackgroundLogResponse { let (bytes, next_offset, dropped) = self.buffer.lock().unwrap().read_from(since); let exited = self.exited.load(Ordering::Acquire); let exit_code = if exited && !self.exit_unknown.load(Ordering::Acquire) { Some(self.exit_code.load(Ordering::Acquire)) } else { None }; BackgroundLogResponse { bytes: String::from_utf8_lossy(&bytes).into_owned(), next_offset, dropped, exited, exit_code, } } pub fn kill(&self) { let _ = self.child.kill(); } pub fn info(&self, handle: u32) -> BackgroundProcInfo { let exited = self.exited.load(Ordering::Acquire); let exit_code = if exited && !self.exit_unknown.load(Ordering::Acquire) { Some(self.exit_code.load(Ordering::Acquire)) } else { None }; BackgroundProcInfo { handle, command: self.command.clone(), cwd: self.cwd.clone(), started_at_ms: self.started_at_ms, exited, exit_code, } } } impl Drop for BackgroundProc { fn drop(&mut self) { self.kill(); } } pub fn spawn(command: String, cwd: Option) -> Result, String> { let trimmed = command.trim().to_string(); if trimmed.is_empty() { return Err("empty command".into()); } if let Some(ref dir) = cwd { if !PathBuf::from(dir).is_dir() { return Err(format!("cwd is not a directory: {dir}")); } } let mut cmd = super::build_oneshot_command(&trimmed); if let Some(ref dir) = cwd { cmd.current_dir(dir); } cmd.stdin(Stdio::null()) .stdout(Stdio::piped()) .stderr(Stdio::piped()); let shared = SharedChild::spawn(&mut cmd).map_err(|e| e.to_string())?; let stdout_pipe = shared.take_stdout().ok_or("no stdout pipe")?; let stderr_pipe = shared.take_stderr().ok_or("no stderr pipe")?; let child = Arc::new(shared); let started_at_ms = SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .map(|d| d.as_millis() as u64) .unwrap_or(0); let proc = Arc::new(BackgroundProc { command: trimmed, cwd, started_at_ms, child, buffer: Mutex::new(BoundedRingBuffer::new(RING_CAP)), exited: AtomicBool::new(false), exit_code: AtomicI32::new(0), exit_unknown: AtomicBool::new(false), }); { let proc_ref = proc.clone(); let mut pipe = stdout_pipe; thread::spawn(move || { let mut buf = [0u8; 8192]; loop { match pipe.read(&mut buf) { Ok(0) => break, Ok(n) => proc_ref.buffer.lock().unwrap().push(&buf[..n]), Err(_) => break, } } }); } { let proc_ref = proc.clone(); let mut pipe = stderr_pipe; thread::spawn(move || { let mut buf = [0u8; 8192]; loop { match pipe.read(&mut buf) { Ok(0) => break, Ok(n) => proc_ref.buffer.lock().unwrap().push(&buf[..n]), Err(_) => break, } } }); } { let proc_ref = proc.clone(); let child_for_wait = proc.child.clone(); thread::spawn(move || { match child_for_wait.wait() { Ok(status) => match status.code() { Some(code) => proc_ref.exit_code.store(code, Ordering::Release), None => proc_ref.exit_unknown.store(true, Ordering::Release), }, Err(_) => proc_ref.exit_unknown.store(true, Ordering::Release), } proc_ref.exited.store(true, Ordering::Release); }); } Ok(proc) }