| use std::{collections::HashMap, sync::Arc, time::Duration}; |
|
|
| use anyhow::{anyhow, Context}; |
| use base64::Engine as _; |
| use tokio::{ |
| process::Command, |
| sync::{oneshot, RwLock}, |
| task::JoinHandle, |
| time::sleep, |
| }; |
|
|
| use crate::{ |
| api::vision::{LiveCameraConnectRequest, LiveCameraSession}, |
| app_state::AppState, |
| inference::{ |
| python_bridge::normalize_http_base_url, |
| response_validator::LiveFrameResponsePayload, |
| }, |
| vision_store, |
| }; |
|
|
| struct WorkerHandle { |
| stop_tx: oneshot::Sender<()>, |
| join_handle: JoinHandle<()>, |
| } |
|
|
| #[derive(Default)] |
| pub struct VisionRuntime { |
| workers: Arc<RwLock<HashMap<String, WorkerHandle>>>, |
| } |
|
|
| impl VisionRuntime { |
| pub fn new() -> Self { |
| Self::default() |
| } |
|
|
| pub async fn start_worker( |
| &self, |
| state: AppState, |
| camera: LiveCameraSession, |
| ) -> anyhow::Result<()> { |
| if camera.url.as_deref().is_none() { |
| return Ok(()); |
| } |
|
|
| self.stop_worker(&camera.camera_id).await; |
|
|
| let (stop_tx, mut stop_rx) = oneshot::channel(); |
| let camera_id = camera.camera_id.clone(); |
| let worker_camera = camera.clone(); |
| let join_handle = tokio::spawn(async move { |
| let interval_ms = state.settings.ingest_worker_poll_ms.max( |
| (1000.0 / worker_camera.fps_budget.max(0.5)).round() as u64, |
| ); |
| loop { |
| tokio::select! { |
| _ = &mut stop_rx => break, |
| _ = sleep(Duration::from_millis(interval_ms.max(250))) => { |
| if let Err(error) = process_camera_frame(&state, &worker_camera).await { |
| tracing::warn!("Vision ingest worker {} kļūda: {}", camera_id, error); |
| } |
| } |
| } |
| } |
| }); |
|
|
| self.workers.write().await.insert( |
| camera.camera_id.clone(), |
| WorkerHandle { |
| stop_tx, |
| join_handle, |
| }, |
| ); |
|
|
| Ok(()) |
| } |
|
|
| pub async fn stop_worker(&self, camera_id: &str) { |
| if let Some(handle) = self.workers.write().await.remove(camera_id) { |
| let _ = handle.stop_tx.send(()); |
| handle.join_handle.abort(); |
| } |
| } |
| } |
|
|
| async fn process_camera_frame(state: &AppState, camera: &LiveCameraSession) -> anyhow::Result<()> { |
| let stream_url = resolve_stream_url(camera)?; |
| let image_base64 = capture_stream_frame( |
| state |
| .settings |
| .ingest_ffmpeg_path |
| .as_deref() |
| .unwrap_or("ffmpeg"), |
| &camera.transport, |
| &stream_url, |
| ) |
| .await?; |
|
|
| if let Some(scheduler) = &state.gpu_scheduler { |
| let data_url = format!("data:image/jpeg;base64,{image_base64}"); |
| let frame_index = chrono::Utc::now().timestamp_millis().rem_euclid(i32::MAX as i64) as i32; |
| scheduler |
| .enqueue_job(&camera.camera_id, "live_frame", frame_index, &data_url, 50) |
| .await?; |
| return Ok(()); |
| } |
|
|
| let base_url = normalize_http_base_url(&state.settings.core_python_url); |
| let endpoint = format!("{base_url}/v1/vision/live/frame"); |
| let response = state |
| .http_client |
| .post(endpoint) |
| .json(&serde_json::json!({ |
| "camera_id": camera.camera_id, |
| "image_base64": image_base64, |
| "timestamp_ms": chrono::Utc::now().timestamp_millis(), |
| })) |
| .send() |
| .await |
| .context("Neizdevās nosūtīt live frame uz core-python")?; |
|
|
| if !response.status().is_success() { |
| let body = response.text().await.unwrap_or_default(); |
| return Err(anyhow!("Core-python live/frame atbildēja ar kļūdu: {}", body)); |
| } |
|
|
| let payload: LiveFrameResponsePayload = response |
| .json() |
| .await |
| .context("Neizdevās dekodēt live frame atbildi")?; |
|
|
| if let Some(pool) = &state.postgres { |
| vision_store::persist_session( |
| pool, |
| &super::api::vision::map_live_camera(payload.camera), |
| &payload |
| .events |
| .into_iter() |
| .map(super::api::vision::map_live_event) |
| .collect::<Vec<_>>(), |
| ) |
| .await?; |
| } |
|
|
| Ok(()) |
| } |
|
|
| fn resolve_stream_url(camera: &LiveCameraSession) -> anyhow::Result<String> { |
| match camera.transport.as_str() { |
| "rtsp" | "hls" | "rtmp" | "webrtc" => camera |
| .url |
| .clone() |
| .ok_or_else(|| anyhow!("Kamerai trūkst stream URL.")), |
| "onvif" => { |
| if let Some(url) = camera.url.clone() { |
| if url.starts_with("rtsp://") || url.starts_with("http://") || url.starts_with("https://") |
| { |
| return Ok(url); |
| } |
| } |
| camera |
| .auth |
| .get("stream_uri") |
| .and_then(|value| value.as_str()) |
| .map(ToString::to_string) |
| .ok_or_else(|| anyhow!("ONVIF kamerai vajag ffmpeg-kompatiblu stream_uri.")) |
| } |
| _ => Err(anyhow!("Transport {} nav ingest workeram atbalstīts.", camera.transport)), |
| } |
| } |
|
|
| async fn capture_stream_frame( |
| ffmpeg_path: &str, |
| transport: &str, |
| stream_url: &str, |
| ) -> anyhow::Result<String> { |
| let mut command = Command::new(ffmpeg_path); |
| command.arg("-loglevel").arg("error"); |
| if transport == "rtsp" { |
| command.arg("-rtsp_transport").arg("tcp"); |
| } |
| let output = command |
| .arg("-i") |
| .arg(stream_url) |
| .arg("-frames:v") |
| .arg("1") |
| .arg("-f") |
| .arg("image2pipe") |
| .arg("-vcodec") |
| .arg("mjpeg") |
| .arg("pipe:1") |
| .output() |
| .await |
| .with_context(|| format!("Neizdevās palaist ingest worker bināriju `{ffmpeg_path}`"))?; |
|
|
| if !output.status.success() { |
| let stderr = String::from_utf8_lossy(&output.stderr); |
| return Err(anyhow!("FFmpeg ingest kļūda: {}", stderr.trim())); |
| } |
|
|
| Ok(base64::engine::general_purpose::STANDARD.encode(output.stdout)) |
| } |
|
|
| pub fn should_run_background_worker(camera: &LiveCameraConnectRequest) -> bool { |
| matches!( |
| camera.transport.as_str(), |
| "rtsp" | "onvif" | "hls" | "rtmp" | "webrtc" |
| ) && (camera.url.as_deref().is_some() |
| || camera |
| .auth |
| .get("stream_uri") |
| .and_then(|value| value.as_str()) |
| .is_some()) |
| } |
|
|