use std::{collections::HashMap, sync::Arc, time::Duration}; use anyhow::{anyhow, Context}; use base64::Engine as _; use tokio::{ process::Command, sync::{oneshot, RwLock}, task::JoinHandle, time::sleep, }; use crate::{ api::vision::{LiveCameraConnectRequest, LiveCameraSession}, app_state::AppState, inference::{ python_bridge::normalize_http_base_url, response_validator::LiveFrameResponsePayload, }, vision_store, }; struct WorkerHandle { stop_tx: oneshot::Sender<()>, join_handle: JoinHandle<()>, } #[derive(Default)] pub struct VisionRuntime { workers: Arc>>, } impl VisionRuntime { pub fn new() -> Self { Self::default() } pub async fn start_worker( &self, state: AppState, camera: LiveCameraSession, ) -> anyhow::Result<()> { if camera.url.as_deref().is_none() { return Ok(()); } self.stop_worker(&camera.camera_id).await; let (stop_tx, mut stop_rx) = oneshot::channel(); let camera_id = camera.camera_id.clone(); let worker_camera = camera.clone(); let join_handle = tokio::spawn(async move { let interval_ms = state.settings.ingest_worker_poll_ms.max( (1000.0 / worker_camera.fps_budget.max(0.5)).round() as u64, ); loop { tokio::select! { _ = &mut stop_rx => break, _ = sleep(Duration::from_millis(interval_ms.max(250))) => { if let Err(error) = process_camera_frame(&state, &worker_camera).await { tracing::warn!("Vision ingest worker {} kļūda: {}", camera_id, error); } } } } }); self.workers.write().await.insert( camera.camera_id.clone(), WorkerHandle { stop_tx, join_handle, }, ); Ok(()) } pub async fn stop_worker(&self, camera_id: &str) { if let Some(handle) = self.workers.write().await.remove(camera_id) { let _ = handle.stop_tx.send(()); handle.join_handle.abort(); } } } async fn process_camera_frame(state: &AppState, camera: &LiveCameraSession) -> anyhow::Result<()> { let stream_url = resolve_stream_url(camera)?; let image_base64 = capture_stream_frame( state .settings .ingest_ffmpeg_path .as_deref() .unwrap_or("ffmpeg"), &camera.transport, &stream_url, ) .await?; if let Some(scheduler) = &state.gpu_scheduler { let data_url = format!("data:image/jpeg;base64,{image_base64}"); let frame_index = chrono::Utc::now().timestamp_millis().rem_euclid(i32::MAX as i64) as i32; scheduler .enqueue_job(&camera.camera_id, "live_frame", frame_index, &data_url, 50) .await?; return Ok(()); } let base_url = normalize_http_base_url(&state.settings.core_python_url); let endpoint = format!("{base_url}/v1/vision/live/frame"); let response = state .http_client .post(endpoint) .json(&serde_json::json!({ "camera_id": camera.camera_id, "image_base64": image_base64, "timestamp_ms": chrono::Utc::now().timestamp_millis(), })) .send() .await .context("Neizdevās nosūtīt live frame uz core-python")?; if !response.status().is_success() { let body = response.text().await.unwrap_or_default(); return Err(anyhow!("Core-python live/frame atbildēja ar kļūdu: {}", body)); } let payload: LiveFrameResponsePayload = response .json() .await .context("Neizdevās dekodēt live frame atbildi")?; if let Some(pool) = &state.postgres { vision_store::persist_session( pool, &super::api::vision::map_live_camera(payload.camera), &payload .events .into_iter() .map(super::api::vision::map_live_event) .collect::>(), ) .await?; } Ok(()) } fn resolve_stream_url(camera: &LiveCameraSession) -> anyhow::Result { match camera.transport.as_str() { "rtsp" | "hls" | "rtmp" | "webrtc" => camera .url .clone() .ok_or_else(|| anyhow!("Kamerai trūkst stream URL.")), "onvif" => { if let Some(url) = camera.url.clone() { if url.starts_with("rtsp://") || url.starts_with("http://") || url.starts_with("https://") { return Ok(url); } } camera .auth .get("stream_uri") .and_then(|value| value.as_str()) .map(ToString::to_string) .ok_or_else(|| anyhow!("ONVIF kamerai vajag ffmpeg-kompatiblu stream_uri.")) } _ => Err(anyhow!("Transport {} nav ingest workeram atbalstīts.", camera.transport)), } } async fn capture_stream_frame( ffmpeg_path: &str, transport: &str, stream_url: &str, ) -> anyhow::Result { let mut command = Command::new(ffmpeg_path); command.arg("-loglevel").arg("error"); if transport == "rtsp" { command.arg("-rtsp_transport").arg("tcp"); } let output = command .arg("-i") .arg(stream_url) .arg("-frames:v") .arg("1") .arg("-f") .arg("image2pipe") .arg("-vcodec") .arg("mjpeg") .arg("pipe:1") .output() .await .with_context(|| format!("Neizdevās palaist ingest worker bināriju `{ffmpeg_path}`"))?; if !output.status.success() { let stderr = String::from_utf8_lossy(&output.stderr); return Err(anyhow!("FFmpeg ingest kļūda: {}", stderr.trim())); } Ok(base64::engine::general_purpose::STANDARD.encode(output.stdout)) } pub fn should_run_background_worker(camera: &LiveCameraConnectRequest) -> bool { matches!( camera.transport.as_str(), "rtsp" | "onvif" | "hls" | "rtmp" | "webrtc" ) && (camera.url.as_deref().is_some() || camera .auth .get("stream_uri") .and_then(|value| value.as_str()) .is_some()) }