maris-ai-master / backend-rust /src /vision_runtime.rs
MarisUK's picture
Maris AI model sync
f440f03 verified
use std::{collections::HashMap, sync::Arc, time::Duration};
use anyhow::{anyhow, Context};
use base64::Engine as _;
use tokio::{
process::Command,
sync::{oneshot, RwLock},
task::JoinHandle,
time::sleep,
};
use crate::{
api::vision::{LiveCameraConnectRequest, LiveCameraSession},
app_state::AppState,
inference::{
python_bridge::normalize_http_base_url,
response_validator::LiveFrameResponsePayload,
},
vision_store,
};
/// Bookkeeping kept for one spawned ingest worker task.
struct WorkerHandle {
/// Sender half of the one-shot stop channel; the worker loop breaks once the receiver resolves.
stop_tx: oneshot::Sender<()>,
/// Handle of the spawned tokio task, used by `stop_worker` to abort it.
join_handle: JoinHandle<()>,
}
/// Registry of background ingest workers, one entry per camera id.
#[derive(Default)]
pub struct VisionRuntime {
/// Registered workers keyed by the `camera_id` they were started for.
workers: Arc<RwLock<HashMap<String, WorkerHandle>>>,
}
impl VisionRuntime {
pub fn new() -> Self {
Self::default()
}
pub async fn start_worker(
&self,
state: AppState,
camera: LiveCameraSession,
) -> anyhow::Result<()> {
if camera.url.as_deref().is_none() {
return Ok(());
}
self.stop_worker(&camera.camera_id).await;
let (stop_tx, mut stop_rx) = oneshot::channel();
let camera_id = camera.camera_id.clone();
let worker_camera = camera.clone();
let join_handle = tokio::spawn(async move {
let interval_ms = state.settings.ingest_worker_poll_ms.max(
(1000.0 / worker_camera.fps_budget.max(0.5)).round() as u64,
);
loop {
tokio::select! {
_ = &mut stop_rx => break,
_ = sleep(Duration::from_millis(interval_ms.max(250))) => {
if let Err(error) = process_camera_frame(&state, &worker_camera).await {
tracing::warn!("Vision ingest worker {} kļūda: {}", camera_id, error);
}
}
}
}
});
self.workers.write().await.insert(
camera.camera_id.clone(),
WorkerHandle {
stop_tx,
join_handle,
},
);
Ok(())
}
pub async fn stop_worker(&self, camera_id: &str) {
if let Some(handle) = self.workers.write().await.remove(camera_id) {
let _ = handle.stop_tx.send(());
handle.join_handle.abort();
}
}
}
/// Captures one frame from the camera stream and hands it over for inference.
///
/// When `state.gpu_scheduler` is set, the frame is enqueued there as a `live_frame` job (as a
/// JPEG data URL) and the function returns. Otherwise the frame is POSTed to the core-python
/// `/v1/vision/live/frame` endpoint and, if `state.postgres` is set, the camera and events from
/// the response are persisted through `vision_store::persist_session`.
///
/// # Errors
///
/// Fails when the stream URL cannot be resolved, the frame capture fails, enqueueing fails, the
/// HTTP request cannot be sent, core-python answers with a non-success status, the response body
/// cannot be decoded, or persisting fails.
async fn process_camera_frame(state: &AppState, camera: &LiveCameraSession) -> anyhow::Result<()> {
    let source_url = resolve_stream_url(camera)?;
    let ffmpeg_binary = state
        .settings
        .ingest_ffmpeg_path
        .as_deref()
        .unwrap_or("ffmpeg");
    let frame_b64 = capture_stream_frame(ffmpeg_binary, &camera.transport, &source_url).await?;

    // Preferred path: let the GPU scheduler own the frame.
    if let Some(scheduler) = &state.gpu_scheduler {
        let data_url = format!("data:image/jpeg;base64,{frame_b64}");
        // Millisecond timestamp folded into the non-negative i32 range.
        let now_ms = chrono::Utc::now().timestamp_millis();
        let frame_index = now_ms.rem_euclid(i32::MAX as i64) as i32;
        scheduler
            .enqueue_job(&camera.camera_id, "live_frame", frame_index, &data_url, 50)
            .await?;
        return Ok(());
    }

    // Fallback path: send the frame straight to core-python over HTTP.
    let base_url = normalize_http_base_url(&state.settings.core_python_url);
    let request_body = serde_json::json!({
        "camera_id": camera.camera_id,
        "image_base64": frame_b64,
        "timestamp_ms": chrono::Utc::now().timestamp_millis(),
    });
    let response = state
        .http_client
        .post(format!("{base_url}/v1/vision/live/frame"))
        .json(&request_body)
        .send()
        .await
        .context("Neizdevās nosūtīt live frame uz core-python")?;

    let succeeded = response.status().is_success();
    if !succeeded {
        let body = response.text().await.unwrap_or_default();
        return Err(anyhow!("Core-python live/frame atbildēja ar kļūdu: {}", body));
    }

    let payload: LiveFrameResponsePayload = response
        .json()
        .await
        .context("Neizdevās dekodēt live frame atbildi")?;

    if let Some(pool) = &state.postgres {
        let camera_row = super::api::vision::map_live_camera(payload.camera);
        let event_rows = payload
            .events
            .into_iter()
            .map(super::api::vision::map_live_event)
            .collect::<Vec<_>>();
        vision_store::persist_session(pool, &camera_row, &event_rows).await?;
    }

    Ok(())
}
/// Picks the URL that the frame grabber should read for `camera`.
///
/// * `rtsp`, `hls`, `rtmp`, `webrtc`: the camera `url` is required.
/// * `onvif`: the camera `url` is used when it starts with `rtsp://`, `http://` or `https://`;
///   otherwise the string stored under `auth["stream_uri"]` is used.
///
/// # Errors
///
/// Fails when the required URL is missing, when an ONVIF camera has neither a usable `url` nor a
/// string `stream_uri`, or when the transport is none of the above.
fn resolve_stream_url(camera: &LiveCameraSession) -> anyhow::Result<String> {
    const DIRECT_SCHEMES: [&str; 3] = ["rtsp://", "http://", "https://"];

    let transport = camera.transport.as_str();

    if matches!(transport, "rtsp" | "hls" | "rtmp" | "webrtc") {
        return camera
            .url
            .clone()
            .ok_or_else(|| anyhow!("Kamerai trūkst stream URL."));
    }

    if transport != "onvif" {
        return Err(anyhow!("Transport {} nav ingest workeram atbalstīts.", camera.transport));
    }

    // ONVIF: a directly readable URL wins over the auth entry.
    let direct_url = camera
        .url
        .as_deref()
        .filter(|url| DIRECT_SCHEMES.iter().any(|scheme| url.starts_with(*scheme)));
    if let Some(url) = direct_url {
        return Ok(url.to_string());
    }

    let from_auth = camera
        .auth
        .get("stream_uri")
        .and_then(|value| value.as_str())
        .map(ToString::to_string);
    from_auth.ok_or_else(|| anyhow!("ONVIF kamerai vajag ffmpeg-kompatiblu stream_uri."))
}
/// Grabs a single video frame from `stream_url` and returns it base64-encoded.
///
/// Runs the binary at `ffmpeg_path` with `-frames:v 1 -f image2pipe -vcodec mjpeg pipe:1`, so the
/// frame arrives on the child's stdout. For the `rtsp` transport, `-rtsp_transport tcp` is added.
///
/// # Errors
///
/// Fails when the binary cannot be started, when it exits with a non-success status (its trimmed
/// stderr is included in the message), or when it exits successfully without writing any frame
/// data.
async fn capture_stream_frame(
    ffmpeg_path: &str,
    transport: &str,
    stream_url: &str,
) -> anyhow::Result<String> {
    let mut command = Command::new(ffmpeg_path);
    command.args(["-loglevel", "error"]);
    if transport == "rtsp" {
        command.args(["-rtsp_transport", "tcp"]);
    }
    command
        .arg("-i")
        .arg(stream_url)
        .args(["-frames:v", "1", "-f", "image2pipe", "-vcodec", "mjpeg", "pipe:1"])
        // The worker task awaiting this child can be aborted at any time. Without this flag the
        // dropped future would leave the ffmpeg process running in the background.
        .kill_on_drop(true);

    let output = command
        .output()
        .await
        .with_context(|| format!("Neizdevās palaist ingest worker bināriju `{ffmpeg_path}`"))?;

    if !output.status.success() {
        let stderr = String::from_utf8_lossy(&output.stderr);
        return Err(anyhow!("FFmpeg ingest kļūda: {}", stderr.trim()));
    }

    // A successful exit with no bytes means no frame was produced; do not forward an empty image.
    if output.stdout.is_empty() {
        return Err(anyhow!("FFmpeg ingest kļūda: straume neatgrieza nevienu kadru."));
    }

    Ok(base64::engine::general_purpose::STANDARD.encode(output.stdout))
}
/// Reports whether a connect request describes a camera that the background worker can ingest.
///
/// That is the case when the transport is one of `rtsp`, `onvif`, `hls`, `rtmp` or `webrtc` and
/// the request carries either a `url` or a string under `auth["stream_uri"]`.
pub fn should_run_background_worker(camera: &LiveCameraConnectRequest) -> bool {
    let transport_supported = matches!(
        camera.transport.as_str(),
        "rtsp" | "onvif" | "hls" | "rtmp" | "webrtc"
    );
    if !transport_supported {
        return false;
    }

    if camera.url.is_some() {
        return true;
    }

    camera
        .auth
        .get("stream_uri")
        .and_then(|value| value.as_str())
        .is_some()
}