MarisUK's picture
Maris AI model sync
f440f03 verified
use axum::{
extract::{
Query, State,
ws::{Message, WebSocket, WebSocketUpgrade},
Path,
},
http::{HeaderMap, StatusCode},
response::{IntoResponse, Response},
Json,
};
use futures_util::{SinkExt, StreamExt};
use serde::{Deserialize, Serialize};
use crate::{
app_state::AppState,
camera_auth::{self, CameraPrincipal},
event_bus::EventEnvelope,
event_narrator,
jwt_auth,
inference::python_bridge::PythonBridge,
onvif_discovery::{self, OnvifCredentials},
snapshot_store,
vision_runtime,
vision_store::{self, VisionDashboardData},
};
use crate::inference::response_validator::{
FrameAnalysisPayload, LiveCameraCatalogPayload, LiveCameraHealthPayload,
LiveCameraResponsePayload, LiveCameraSessionPayload, LiveCameraResolutionPayload,
LiveEventPayload, LiveEventsResponsePayload, LiveFrameResponsePayload, LiveSnapshotPayload,
OcrBlockPayload, SceneSegmentPayload, SegmentationMaskPayload, TrackedObjectPayload,
VisionActionPayload, VisionAnalysisPayload, VisionDetectionPayload, VisionPosePayload,
};
#[derive(Deserialize)]
pub struct VisionImageRequest {
pub image_url: Option<String>,
pub image_base64: Option<String>,
pub max_detections: Option<u32>,
pub confidence_threshold: Option<f32>,
}
#[derive(Deserialize)]
pub struct VisionFrameSequenceRequest {
pub frames_base64: Vec<String>,
pub max_detections: Option<u32>,
pub confidence_threshold: Option<f32>,
}
#[derive(Serialize, Clone)]
pub struct VisionBoundingBox {
pub x: f32,
pub y: f32,
pub width: f32,
pub height: f32,
}
#[derive(Serialize)]
pub struct VisionDetection {
pub label: String,
pub confidence: f32,
pub bbox: VisionBoundingBox,
}
#[derive(Serialize)]
pub struct VisionAnalyzeResponse {
pub summary: String,
pub detections: Vec<VisionDetection>,
pub width: u32,
pub height: u32,
pub model: String,
pub fallback_used: bool,
}
#[derive(Serialize)]
pub struct OcrBlock {
pub text: String,
pub confidence: f32,
pub bbox: VisionBoundingBox,
pub language: String,
}
#[derive(Serialize)]
pub struct VisionOcrResponse {
pub summary: String,
pub results: Vec<OcrBlock>,
pub width: u32,
pub height: u32,
pub model: String,
pub fallback_used: bool,
}
#[derive(Serialize)]
pub struct PoseKeypoint {
pub name: String,
pub x: f32,
pub y: f32,
pub confidence: f32,
}
#[derive(Serialize)]
pub struct PoseConnection {
pub start: String,
pub end: String,
}
#[derive(Serialize)]
pub struct PoseDetection {
pub person_id: u32,
pub confidence: f32,
pub bbox: VisionBoundingBox,
pub keypoints: Vec<PoseKeypoint>,
pub connections: Vec<PoseConnection>,
}
#[derive(Serialize)]
pub struct VisionPoseResponse {
pub summary: String,
pub poses: Vec<PoseDetection>,
pub width: u32,
pub height: u32,
pub model: String,
pub fallback_used: bool,
}
#[derive(Serialize)]
pub struct SegmentationMask {
pub label: String,
pub confidence: f32,
pub mask_data_url: String,
pub bbox: VisionBoundingBox,
pub area_pixels: u32,
}
#[derive(Serialize)]
pub struct VisionSegmentationResponse {
pub summary: String,
pub masks: Vec<SegmentationMask>,
pub width: u32,
pub height: u32,
pub model: String,
pub fallback_used: bool,
}
#[derive(Serialize)]
pub struct ActionPrediction {
pub action: String,
pub confidence: f32,
pub subject_label: String,
pub rationale: String,
}
#[derive(Serialize)]
pub struct VisionActionResponse {
pub summary: String,
pub actions: Vec<ActionPrediction>,
pub width: u32,
pub height: u32,
pub model: String,
pub fallback_used: bool,
}
#[derive(Serialize, Clone)]
pub struct TrackObservation {
pub frame_index: u32,
pub confidence: f32,
pub bbox: VisionBoundingBox,
}
#[derive(Serialize, Clone)]
pub struct TrackedObject {
pub track_id: u32,
pub label: String,
pub average_confidence: f32,
pub observations: Vec<TrackObservation>,
}
#[derive(Serialize)]
pub struct VisionTrackingResponse {
pub summary: String,
pub tracks: Vec<TrackedObject>,
pub frame_count: u32,
pub model: String,
pub fallback_used: bool,
}
#[derive(Serialize)]
pub struct FrameAnalysis {
pub frame_index: u32,
pub summary: String,
pub detections: Vec<VisionDetection>,
pub dominant_labels: Vec<String>,
pub brightness: f32,
}
#[derive(Serialize)]
pub struct VisionFrameAnalysisResponse {
pub summary: String,
pub frames: Vec<FrameAnalysis>,
pub frame_count: u32,
pub model: String,
pub fallback_used: bool,
}
#[derive(Serialize, Clone)]
pub struct SceneSegment {
pub scene_index: u32,
pub start_frame: u32,
pub end_frame: u32,
pub summary: String,
pub dominant_labels: Vec<String>,
pub average_brightness: f32,
}
#[derive(Serialize)]
pub struct VisionSceneTimelineResponse {
pub summary: String,
pub scenes: Vec<SceneSegment>,
pub frame_count: u32,
pub model: String,
pub fallback_used: bool,
}
#[derive(Deserialize, Serialize, Clone)]
pub struct LiveCameraConnectRequest {
pub camera_id: Option<String>,
pub source_type: String,
pub transport: String,
pub url: Option<String>,
pub device_id: Option<String>,
#[serde(default)]
pub auth: serde_json::Value,
pub resolution: Option<LiveCameraResolution>,
pub fps: Option<f32>,
#[serde(default)]
pub enabled_pipelines: Vec<String>,
pub detection_stride: Option<u32>,
pub ocr_interval: Option<u32>,
pub fps_budget: Option<f32>,
#[serde(default)]
pub roi_zones: Vec<serde_json::Value>,
#[serde(default)]
pub alert_rules: Vec<String>,
}
#[derive(Deserialize, Serialize, Clone)]
pub struct LiveSessionCommandRequest {
pub camera_id: String,
pub enabled_pipelines: Option<Vec<String>>,
pub detection_stride: Option<u32>,
pub ocr_interval: Option<u32>,
pub fps_budget: Option<f32>,
}
#[derive(Deserialize, Serialize, Clone)]
pub struct LiveFrameRequest {
pub camera_id: String,
pub image_base64: String,
pub frame_index: Option<u32>,
pub timestamp_ms: Option<u64>,
}
#[derive(Deserialize, Serialize, Clone)]
pub struct LiveCameraResolution {
pub width: u32,
pub height: u32,
}
#[derive(Serialize, Clone)]
pub struct LiveCameraHealth {
pub connected: bool,
pub analysis_active: bool,
pub reconnect_attempts: u32,
pub dropped_frames: u32,
pub events_emitted: u32,
pub last_frame_at: Option<String>,
pub last_event_at: Option<String>,
pub last_error: Option<String>,
pub ingest_mode: String,
}
#[derive(Serialize, Clone)]
pub struct LiveEvent {
pub event_id: String,
pub camera_id: String,
#[serde(rename = "type")]
pub event_type: String,
pub severity: String,
pub timestamp: String,
pub summary: String,
pub payload: serde_json::Value,
}
#[derive(Serialize, Clone)]
pub struct LiveCameraSession {
pub camera_id: String,
pub source_type: String,
pub transport: String,
pub url: Option<String>,
pub device_id: Option<String>,
pub auth: serde_json::Value,
pub resolution: LiveCameraResolution,
pub fps: f32,
pub status: String,
pub health: LiveCameraHealth,
pub enabled_pipelines: Vec<String>,
pub detection_stride: u32,
pub ocr_interval: u32,
pub fps_budget: f32,
pub roi_zones: Vec<serde_json::Value>,
pub alert_rules: Vec<String>,
pub latest_snapshot: Option<String>,
pub latest_result: serde_json::Value,
pub recent_events: Vec<LiveEvent>,
pub timeline: Vec<SceneSegment>,
pub tracks: Vec<TrackedObject>,
}
#[derive(Serialize)]
pub struct LiveCameraCatalogResponse {
pub summary: String,
pub cameras: Vec<LiveCameraSession>,
}
#[derive(Serialize)]
pub struct LiveCameraResponse {
pub summary: String,
pub camera: LiveCameraSession,
}
#[derive(Serialize)]
pub struct LiveSnapshotResponse {
pub summary: String,
pub camera_id: String,
pub snapshot_data_url: Option<String>,
}
#[derive(Serialize)]
pub struct LiveEventsResponse {
pub summary: String,
pub camera_id: String,
pub events: Vec<LiveEvent>,
}
#[derive(Serialize)]
pub struct LiveFrameResponse {
pub summary: String,
pub camera: LiveCameraSession,
pub events: Vec<LiveEvent>,
}
#[derive(Deserialize, Serialize, Clone)]
pub struct LiveCameraConfigRequest {
#[serde(default)]
pub roi_zones: Vec<serde_json::Value>,
#[serde(default)]
pub alert_rules: Vec<String>,
#[serde(default)]
pub enabled_pipelines: Vec<String>,
pub fps_budget: Option<f32>,
}
#[derive(Deserialize, Default)]
pub struct VisionDashboardQuery {
pub search: Option<String>,
}
#[derive(Serialize)]
pub struct VisionDashboardResponse {
pub summary: String,
pub cameras: Vec<serde_json::Value>,
pub recent_events: Vec<vision_store::StoredCameraEvent>,
pub recent_scenes: Vec<vision_store::StoredTimelineScene>,
pub reid_matches: Vec<vision_store::StoredReidMatch>,
}
#[derive(Deserialize)]
pub struct OnvifDiscoverRequest {
pub username: Option<String>,
pub password: Option<String>,
pub timeout_ms: Option<u64>,
}
#[derive(Serialize)]
pub struct OnvifDiscoverResponse {
pub summary: String,
pub devices: Vec<onvif_discovery::OnvifDevice>,
}
#[derive(Deserialize)]
pub struct TimelineSnapshotsQuery {
pub start_frame: Option<i32>,
pub end_frame: Option<i32>,
pub limit: Option<i64>,
}
#[derive(Serialize)]
pub struct TimelineSnapshotsResponse {
pub summary: String,
pub camera_id: String,
pub snapshots: Vec<snapshot_store::SnapshotEntry>,
}
#[derive(Deserialize)]
pub struct CameraAccessGrantRequest {
pub principal_type: String,
pub principal_id: String,
pub permission: String,
}
#[derive(Serialize)]
pub struct CameraAccessResponse {
pub summary: String,
pub camera_id: String,
pub entries: Vec<camera_auth::CameraAccessEntry>,
}
#[derive(Serialize)]
pub struct CameraNarrationsResponse {
pub summary: String,
pub camera_id: String,
pub narrations: Vec<event_narrator::EventNarration>,
}
#[derive(Deserialize)]
pub struct RotateCredentialsRequest {
pub new_key: String,
}
#[derive(Deserialize)]
pub struct WsAuthQuery {
pub token: Option<String>,
}
#[derive(Serialize)]
pub struct RotateCredentialsResponse {
pub summary: String,
pub rotated_credentials: u64,
pub active_key_id: String,
}
#[derive(Serialize)]
pub struct GpuJobsResponse {
pub summary: String,
pub jobs: Vec<crate::gpu_scheduler::GpuJobInfo>,
}
#[derive(Serialize)]
pub struct GpuResourcesResponse {
pub summary: String,
pub resources: Vec<crate::gpu_scheduler::GpuResourceInfo>,
}
fn error_response(status: StatusCode, message: &str) -> (StatusCode, Json<serde_json::Value>) {
(status, Json(serde_json::json!({ "error": message })))
}
fn validate_image_request(
req: &VisionImageRequest,
) -> Result<(), (StatusCode, Json<serde_json::Value>)> {
let has_url = req
.image_url
.as_deref()
.map(str::trim)
.is_some_and(|value| !value.is_empty());
let has_base64 = req
.image_base64
.as_deref()
.map(str::trim)
.is_some_and(|value| !value.is_empty());
if has_url == has_base64 {
return Err(error_response(
StatusCode::BAD_REQUEST,
"Norādi tieši vienu no image_url vai image_base64.",
));
}
if matches!(req.max_detections, Some(0)) {
return Err(error_response(
StatusCode::BAD_REQUEST,
"max_detections jābūt lielākam par 0.",
));
}
if req
.confidence_threshold
.is_some_and(|value| !value.is_finite() || !(0.0..=1.0).contains(&value))
{
return Err(error_response(
StatusCode::BAD_REQUEST,
"confidence_threshold jābūt diapazonā no 0 līdz 1.",
));
}
Ok(())
}
fn validate_frame_request(
req: &VisionFrameSequenceRequest,
) -> Result<(), (StatusCode, Json<serde_json::Value>)> {
if req.frames_base64.is_empty() {
return Err(error_response(
StatusCode::BAD_REQUEST,
"frames_base64 nedrīkst būt tukšs.",
));
}
if req
.frames_base64
.iter()
.any(|frame| frame.trim().is_empty())
{
return Err(error_response(
StatusCode::BAD_REQUEST,
"frames_base64 satur tukšu kadru.",
));
}
if matches!(req.max_detections, Some(0)) {
return Err(error_response(
StatusCode::BAD_REQUEST,
"max_detections jābūt lielākam par 0.",
));
}
if req
.confidence_threshold
.is_some_and(|value| !value.is_finite() || !(0.0..=1.0).contains(&value))
{
return Err(error_response(
StatusCode::BAD_REQUEST,
"confidence_threshold jābūt diapazonā no 0 līdz 1.",
));
}
Ok(())
}
fn validate_live_connect_request(
req: &LiveCameraConnectRequest,
) -> Result<(), (StatusCode, Json<serde_json::Value>)> {
if req.source_type.trim().is_empty() {
return Err(error_response(
StatusCode::BAD_REQUEST,
"source_type nedrīkst būt tukšs.",
));
}
if req.transport.trim().is_empty() {
return Err(error_response(
StatusCode::BAD_REQUEST,
"transport nedrīkst būt tukšs.",
));
}
let has_url = req.url.as_deref().map(str::trim).is_some_and(|value| !value.is_empty());
let has_device = req
.device_id
.as_deref()
.map(str::trim)
.is_some_and(|value| !value.is_empty());
if !has_url && !has_device {
return Err(error_response(
StatusCode::BAD_REQUEST,
"Norādi url vai device_id kamerai.",
));
}
if req.fps.is_some_and(|value| !value.is_finite() || value <= 0.0) {
return Err(error_response(
StatusCode::BAD_REQUEST,
"fps jābūt pozitīvam skaitlim.",
));
}
if req
.fps_budget
.is_some_and(|value| !value.is_finite() || value <= 0.0)
{
return Err(error_response(
StatusCode::BAD_REQUEST,
"fps_budget jābūt pozitīvam skaitlim.",
));
}
Ok(())
}
fn validate_live_command_request(
req: &LiveSessionCommandRequest,
) -> Result<(), (StatusCode, Json<serde_json::Value>)> {
if req.camera_id.trim().is_empty() {
return Err(error_response(
StatusCode::BAD_REQUEST,
"camera_id nedrīkst būt tukšs.",
));
}
if req
.fps_budget
.is_some_and(|value| !value.is_finite() || value <= 0.0)
{
return Err(error_response(
StatusCode::BAD_REQUEST,
"fps_budget jābūt pozitīvam skaitlim.",
));
}
Ok(())
}
fn validate_live_config_request(
req: &LiveCameraConfigRequest,
) -> Result<(), (StatusCode, Json<serde_json::Value>)> {
if req
.fps_budget
.is_some_and(|value| !value.is_finite() || value <= 0.0)
{
return Err(error_response(
StatusCode::BAD_REQUEST,
"fps_budget jābūt pozitīvam skaitlim.",
));
}
Ok(())
}
fn map_bbox(
bbox: crate::inference::response_validator::VisionBoundingBoxPayload,
) -> VisionBoundingBox {
VisionBoundingBox {
x: bbox.x,
y: bbox.y,
width: bbox.width,
height: bbox.height,
}
}
fn map_detection(item: VisionDetectionPayload) -> VisionDetection {
VisionDetection {
label: item.label,
confidence: item.confidence,
bbox: map_bbox(item.bbox),
}
}
fn map_ocr_block(item: OcrBlockPayload) -> OcrBlock {
OcrBlock {
text: item.text,
confidence: item.confidence,
bbox: map_bbox(item.bbox),
language: item.language,
}
}
fn map_pose(item: VisionPosePayload) -> PoseDetection {
PoseDetection {
person_id: item.person_id,
confidence: item.confidence,
bbox: map_bbox(item.bbox),
keypoints: item
.keypoints
.into_iter()
.map(|point| PoseKeypoint {
name: point.name,
x: point.x,
y: point.y,
confidence: point.confidence,
})
.collect(),
connections: item
.connections
.into_iter()
.map(|connection| PoseConnection {
start: connection.start,
end: connection.end,
})
.collect(),
}
}
fn map_mask(item: SegmentationMaskPayload) -> SegmentationMask {
SegmentationMask {
label: item.label,
confidence: item.confidence,
mask_data_url: item.mask_data_url,
bbox: map_bbox(item.bbox),
area_pixels: item.area_pixels,
}
}
fn map_action(item: VisionActionPayload) -> ActionPrediction {
ActionPrediction {
action: item.action,
confidence: item.confidence,
subject_label: item.subject_label,
rationale: item.rationale,
}
}
fn map_track(item: TrackedObjectPayload) -> TrackedObject {
TrackedObject {
track_id: item.track_id,
label: item.label,
average_confidence: item.average_confidence,
observations: item
.observations
.into_iter()
.map(|observation| TrackObservation {
frame_index: observation.frame_index,
confidence: observation.confidence,
bbox: map_bbox(observation.bbox),
})
.collect(),
}
}
fn map_frame(item: FrameAnalysisPayload) -> FrameAnalysis {
FrameAnalysis {
frame_index: item.frame_index,
summary: item.summary,
detections: item.detections.into_iter().map(map_detection).collect(),
dominant_labels: item.dominant_labels,
brightness: item.brightness,
}
}
fn map_scene(item: SceneSegmentPayload) -> SceneSegment {
SceneSegment {
scene_index: item.scene_index,
start_frame: item.start_frame,
end_frame: item.end_frame,
summary: item.summary,
dominant_labels: item.dominant_labels,
average_brightness: item.average_brightness,
}
}
fn map_live_resolution(item: LiveCameraResolutionPayload) -> LiveCameraResolution {
LiveCameraResolution {
width: item.width,
height: item.height,
}
}
fn map_live_health(item: LiveCameraHealthPayload) -> LiveCameraHealth {
LiveCameraHealth {
connected: item.connected,
analysis_active: item.analysis_active,
reconnect_attempts: item.reconnect_attempts,
dropped_frames: item.dropped_frames,
events_emitted: item.events_emitted,
last_frame_at: item.last_frame_at,
last_event_at: item.last_event_at,
last_error: item.last_error,
ingest_mode: item.ingest_mode,
}
}
pub(crate) fn map_live_event(item: LiveEventPayload) -> LiveEvent {
LiveEvent {
event_id: item.event_id,
camera_id: item.camera_id,
event_type: item.event_type,
severity: item.severity,
timestamp: item.timestamp,
summary: item.summary,
payload: item.payload,
}
}
pub(crate) fn map_live_camera(item: LiveCameraSessionPayload) -> LiveCameraSession {
LiveCameraSession {
camera_id: item.camera_id,
source_type: item.source_type,
transport: item.transport,
url: item.url,
device_id: item.device_id,
auth: item.auth,
resolution: map_live_resolution(item.resolution),
fps: item.fps,
status: item.status,
health: map_live_health(item.health),
enabled_pipelines: item.enabled_pipelines,
detection_stride: item.detection_stride,
ocr_interval: item.ocr_interval,
fps_budget: item.fps_budget,
roi_zones: item.roi_zones,
alert_rules: item.alert_rules,
latest_snapshot: item.latest_snapshot,
latest_result: item.latest_result,
recent_events: item.recent_events.into_iter().map(map_live_event).collect(),
timeline: item.timeline.into_iter().map(map_scene).collect(),
tracks: item.tracks.into_iter().map(map_track).collect(),
}
}
#[derive(Deserialize)]
#[serde(tag = "type")]
enum LiveClientMessage {
#[serde(rename = "frame")]
Frame {
image_base64: String,
frame_index: Option<u32>,
timestamp_ms: Option<u64>,
},
#[serde(rename = "ping")]
Ping,
}
#[derive(Serialize)]
#[serde(tag = "type")]
enum LiveServerMessage {
#[serde(rename = "connected")]
Connected { camera: LiveCameraSession },
#[serde(rename = "analysis")]
Analysis {
summary: String,
camera: LiveCameraSession,
events: Vec<LiveEvent>,
},
#[serde(rename = "pong")]
Pong,
#[serde(rename = "error")]
Error { message: String },
}
async fn send_live_message(
tx: &mut futures_util::stream::SplitSink<WebSocket, Message>,
message: &LiveServerMessage,
) {
let payload = serde_json::to_string(message).unwrap_or_default();
let _ = tx.send(Message::Text(payload)).await;
}
async fn persist_live_state(state: &AppState, camera: &LiveCameraSession, events: &[LiveEvent]) {
if let Some(pool) = &state.postgres {
if let Err(error) = vision_store::persist_session(pool, camera, events).await {
tracing::warn!("Neizdevās persistēt vision live state: {}", error);
}
}
for event in events {
let _ = state.event_bus.publish(EventEnvelope {
event_kind: "camera_event".to_string(),
camera_id: event.camera_id.clone(),
event_id: Some(event.event_id.clone()),
event_type: event.event_type.clone(),
severity: event.severity.clone(),
summary: event.summary.clone(),
narration_url: None,
payload: event.payload.clone(),
created_at: event.timestamp.clone(),
});
}
}
async fn resolve_camera_principal(
state: &AppState,
headers: &HeaderMap,
) -> Result<Option<CameraPrincipal>, (StatusCode, Json<serde_json::Value>)> {
camera_auth::resolve_principal(headers, state.postgres.as_ref())
.await
.map(|principal| {
if principal.is_some() {
return principal;
}
headers
.get(axum::http::header::AUTHORIZATION)
.and_then(|value| value.to_str().ok())
.and_then(|value| value.strip_prefix("Bearer "))
.and_then(|token| jwt_auth::verify_access_token(&state.settings, token).ok())
.map(|claims| CameraPrincipal {
principal_type: "user".to_string(),
principal_id: claims.sub,
})
})
.map_err(|error| error_response(StatusCode::UNAUTHORIZED, &error.to_string()))
}
async fn ensure_camera_permission(
state: &AppState,
headers: &HeaderMap,
camera_id: &str,
permission: &str,
) -> Result<Option<CameraPrincipal>, (StatusCode, Json<serde_json::Value>)> {
let principal = resolve_camera_principal(state, headers).await?;
let allowed = camera_auth::ensure_access(
state.postgres.as_ref(),
camera_id,
principal.as_ref(),
permission,
state.settings.camera_auth_required,
)
.await
.map_err(|error| error_response(StatusCode::INTERNAL_SERVER_ERROR, &error.to_string()))?;
if !allowed {
return Err(error_response(
StatusCode::FORBIDDEN,
"Trūkst piekļuves tiesību šai kamerai.",
));
}
Ok(principal)
}
pub async fn analyze_image(
Json(req): Json<VisionImageRequest>,
) -> Result<Json<VisionAnalyzeResponse>, (StatusCode, Json<serde_json::Value>)> {
validate_image_request(&req)?;
let bridge = PythonBridge::new();
let payload = serde_json::json!({
"image_url": req.image_url,
"image_base64": req.image_base64,
"max_detections": req.max_detections.unwrap_or(10),
"confidence_threshold": req.confidence_threshold.unwrap_or(0.25),
});
match bridge
.call::<VisionAnalysisPayload>("vision/analyze", &payload)
.await
{
Ok(result) => Ok(Json(VisionAnalyzeResponse {
summary: result.summary,
detections: result.detections.into_iter().map(map_detection).collect(),
width: result.width,
height: result.height,
model: result.model,
fallback_used: result.fallback_used,
})),
Err(e) => Err(error_response(
StatusCode::INTERNAL_SERVER_ERROR,
&e.to_string(),
)),
}
}
pub async fn ocr_image(
Json(req): Json<VisionImageRequest>,
) -> Result<Json<VisionOcrResponse>, (StatusCode, Json<serde_json::Value>)> {
validate_image_request(&req)?;
let bridge = PythonBridge::new();
let payload = serde_json::json!({
"image_url": req.image_url,
"image_base64": req.image_base64,
"max_detections": req.max_detections.unwrap_or(10),
"confidence_threshold": req.confidence_threshold.unwrap_or(0.25),
});
match bridge
.call::<crate::inference::response_validator::VisionOcrPayload>("vision/ocr", &payload)
.await
{
Ok(result) => Ok(Json(VisionOcrResponse {
summary: result.summary,
results: result.results.into_iter().map(map_ocr_block).collect(),
width: result.width,
height: result.height,
model: result.model,
fallback_used: result.fallback_used,
})),
Err(e) => Err(error_response(
StatusCode::INTERNAL_SERVER_ERROR,
&e.to_string(),
)),
}
}
pub async fn estimate_pose(
Json(req): Json<VisionImageRequest>,
) -> Result<Json<VisionPoseResponse>, (StatusCode, Json<serde_json::Value>)> {
validate_image_request(&req)?;
let bridge = PythonBridge::new();
let payload = serde_json::json!({
"image_url": req.image_url,
"image_base64": req.image_base64,
"max_detections": req.max_detections.unwrap_or(10),
"confidence_threshold": req.confidence_threshold.unwrap_or(0.25),
});
match bridge
.call::<crate::inference::response_validator::VisionPoseResponsePayload>(
"vision/pose-estimate",
&payload,
)
.await
{
Ok(result) => Ok(Json(VisionPoseResponse {
summary: result.summary,
poses: result.poses.into_iter().map(map_pose).collect(),
width: result.width,
height: result.height,
model: result.model,
fallback_used: result.fallback_used,
})),
Err(e) => Err(error_response(
StatusCode::INTERNAL_SERVER_ERROR,
&e.to_string(),
)),
}
}
pub async fn segment_image(
Json(req): Json<VisionImageRequest>,
) -> Result<Json<VisionSegmentationResponse>, (StatusCode, Json<serde_json::Value>)> {
validate_image_request(&req)?;
let bridge = PythonBridge::new();
let payload = serde_json::json!({
"image_url": req.image_url,
"image_base64": req.image_base64,
"max_detections": req.max_detections.unwrap_or(10),
"confidence_threshold": req.confidence_threshold.unwrap_or(0.25),
});
match bridge
.call::<crate::inference::response_validator::VisionSegmentationPayload>(
"vision/segment",
&payload,
)
.await
{
Ok(result) => Ok(Json(VisionSegmentationResponse {
summary: result.summary,
masks: result.masks.into_iter().map(map_mask).collect(),
width: result.width,
height: result.height,
model: result.model,
fallback_used: result.fallback_used,
})),
Err(e) => Err(error_response(
StatusCode::INTERNAL_SERVER_ERROR,
&e.to_string(),
)),
}
}
pub async fn recognize_action(
Json(req): Json<VisionImageRequest>,
) -> Result<Json<VisionActionResponse>, (StatusCode, Json<serde_json::Value>)> {
validate_image_request(&req)?;
let bridge = PythonBridge::new();
let payload = serde_json::json!({
"image_url": req.image_url,
"image_base64": req.image_base64,
"max_detections": req.max_detections.unwrap_or(10),
"confidence_threshold": req.confidence_threshold.unwrap_or(0.25),
});
match bridge
.call::<crate::inference::response_validator::VisionActionResponsePayload>(
"vision/action-recognize",
&payload,
)
.await
{
Ok(result) => Ok(Json(VisionActionResponse {
summary: result.summary,
actions: result.actions.into_iter().map(map_action).collect(),
width: result.width,
height: result.height,
model: result.model,
fallback_used: result.fallback_used,
})),
Err(e) => Err(error_response(
StatusCode::INTERNAL_SERVER_ERROR,
&e.to_string(),
)),
}
}
pub async fn track_objects(
Json(req): Json<VisionFrameSequenceRequest>,
) -> Result<Json<VisionTrackingResponse>, (StatusCode, Json<serde_json::Value>)> {
validate_frame_request(&req)?;
let bridge = PythonBridge::new();
let payload = serde_json::json!({
"frames_base64": req.frames_base64,
"max_detections": req.max_detections.unwrap_or(10),
"confidence_threshold": req.confidence_threshold.unwrap_or(0.25),
});
match bridge
.call::<crate::inference::response_validator::VisionTrackingPayload>(
"vision/tracking",
&payload,
)
.await
{
Ok(result) => Ok(Json(VisionTrackingResponse {
summary: result.summary,
tracks: result.tracks.into_iter().map(map_track).collect(),
frame_count: result.frame_count,
model: result.model,
fallback_used: result.fallback_used,
})),
Err(e) => Err(error_response(
StatusCode::INTERNAL_SERVER_ERROR,
&e.to_string(),
)),
}
}
pub async fn analyze_frames(
Json(req): Json<VisionFrameSequenceRequest>,
) -> Result<Json<VisionFrameAnalysisResponse>, (StatusCode, Json<serde_json::Value>)> {
validate_frame_request(&req)?;
let bridge = PythonBridge::new();
let payload = serde_json::json!({
"frames_base64": req.frames_base64,
"max_detections": req.max_detections.unwrap_or(10),
"confidence_threshold": req.confidence_threshold.unwrap_or(0.25),
});
match bridge
.call::<crate::inference::response_validator::VisionFrameAnalysisResponsePayload>(
"vision/frame-analysis",
&payload,
)
.await
{
Ok(result) => Ok(Json(VisionFrameAnalysisResponse {
summary: result.summary,
frames: result.frames.into_iter().map(map_frame).collect(),
frame_count: result.frame_count,
model: result.model,
fallback_used: result.fallback_used,
})),
Err(e) => Err(error_response(
StatusCode::INTERNAL_SERVER_ERROR,
&e.to_string(),
)),
}
}
pub async fn scene_timeline(
Json(req): Json<VisionFrameSequenceRequest>,
) -> Result<Json<VisionSceneTimelineResponse>, (StatusCode, Json<serde_json::Value>)> {
validate_frame_request(&req)?;
let bridge = PythonBridge::new();
let payload = serde_json::json!({
"frames_base64": req.frames_base64,
"max_detections": req.max_detections.unwrap_or(10),
"confidence_threshold": req.confidence_threshold.unwrap_or(0.25),
});
match bridge
.call::<crate::inference::response_validator::VisionSceneTimelinePayload>(
"vision/scene-timeline",
&payload,
)
.await
{
Ok(result) => Ok(Json(VisionSceneTimelineResponse {
summary: result.summary,
scenes: result.scenes.into_iter().map(map_scene).collect(),
frame_count: result.frame_count,
model: result.model,
fallback_used: result.fallback_used,
})),
Err(e) => Err(error_response(
StatusCode::INTERNAL_SERVER_ERROR,
&e.to_string(),
)),
}
}
pub async fn list_live_cameras(
State(state): State<AppState>,
headers: HeaderMap,
) -> Result<Json<LiveCameraCatalogResponse>, (StatusCode, Json<serde_json::Value>)> {
if state.settings.camera_auth_required {
let _ = resolve_camera_principal(&state, &headers).await?.ok_or_else(|| {
error_response(StatusCode::UNAUTHORIZED, "Norādi x-user-id vai x-api-key.")
})?;
}
let bridge = PythonBridge::new();
match bridge.get::<LiveCameraCatalogPayload>("vision/live/cameras").await {
Ok(result) => {
let cameras = result
.cameras
.into_iter()
.map(map_live_camera)
.collect::<Vec<_>>();
for camera in &cameras {
persist_live_state(&state, camera, &camera.recent_events).await;
}
Ok(Json(LiveCameraCatalogResponse {
summary: result.summary,
cameras,
}))
}
Err(e) => Err(error_response(
StatusCode::INTERNAL_SERVER_ERROR,
&e.to_string(),
)),
}
}
pub async fn connect_live_camera(
State(state): State<AppState>,
headers: HeaderMap,
Json(req): Json<LiveCameraConnectRequest>,
) -> Result<Json<LiveCameraResponse>, (StatusCode, Json<serde_json::Value>)> {
validate_live_connect_request(&req)?;
let principal = resolve_camera_principal(&state, &headers).await?;
if state.settings.camera_auth_required && principal.is_none() {
return Err(error_response(
StatusCode::UNAUTHORIZED,
"Norādi x-user-id vai x-api-key, lai pieslēgtu kameru.",
));
}
let bridge = PythonBridge::new();
let payload = serde_json::to_value(&req).unwrap_or_else(|_| serde_json::json!({}));
match bridge
.call::<LiveCameraResponsePayload>("vision/live/connect", &payload)
.await
{
Ok(result) => {
let camera = map_live_camera(result.camera);
persist_live_state(&state, &camera, &camera.recent_events).await;
if let (Some(pool), Some(principal)) = (&state.postgres, principal.as_ref()) {
for permission in ["view", "configure", "delete"] {
let _ = camera_auth::grant_access(
pool,
&camera.camera_id,
&principal.principal_type,
&principal.principal_id,
permission,
)
.await;
}
}
if let (Some(pool), Some(crypto)) = (&state.postgres, &state.credential_crypto) {
let username = req.auth.get("username").and_then(|value| value.as_str());
let password = req.auth.get("password").and_then(|value| value.as_str());
let bearer = req.auth.get("bearer_token").and_then(|value| value.as_str());
if username.is_some() || password.is_some() || bearer.is_some() {
let custom_headers = req
.auth
.get("headers")
.cloned()
.unwrap_or_else(|| serde_json::json!({}));
let _ = camera_auth::upsert_credentials(
pool,
crypto,
&camera.camera_id,
username,
password,
bearer,
&custom_headers,
principal.as_ref(),
)
.await;
}
}
if camera.source_type == "onvif" {
if let Some(pool) = &state.postgres {
let credentials = OnvifCredentials {
username: req
.auth
.get("username")
.and_then(|value| value.as_str())
.map(ToString::to_string),
password: req
.auth
.get("password")
.and_then(|value| value.as_str())
.map(ToString::to_string),
};
if let Some(url) = camera.url.as_deref() {
if let Ok(media_service_url) =
onvif_discovery::discover(&state.http_client, credentials.clone(), 750).await
{
let metadata = serde_json::json!({
"onvif": {
"discovered_devices": media_service_url,
"connect_url": url,
}
});
let _ =
vision_store::update_camera_metadata(pool, &camera.camera_id, &metadata).await;
}
}
}
}
Ok(Json(LiveCameraResponse {
summary: result.summary,
camera,
}))
}
Err(e) => Err(error_response(
StatusCode::INTERNAL_SERVER_ERROR,
&e.to_string(),
)),
}
}
pub async fn start_live_camera(
State(state): State<AppState>,
headers: HeaderMap,
Path(camera_id): Path<String>,
Json(mut req): Json<LiveSessionCommandRequest>,
) -> Result<Json<LiveCameraResponse>, (StatusCode, Json<serde_json::Value>)> {
let _ = ensure_camera_permission(&state, &headers, &camera_id, "configure").await?;
req.camera_id = camera_id;
validate_live_command_request(&req)?;
let bridge = PythonBridge::new();
let payload = serde_json::to_value(&req).unwrap_or_else(|_| serde_json::json!({}));
match bridge
.call::<LiveCameraResponsePayload>("vision/live/start", &payload)
.await
{
Ok(result) => {
let camera = map_live_camera(result.camera);
persist_live_state(&state, &camera, &camera.recent_events).await;
if vision_runtime::should_run_background_worker(&LiveCameraConnectRequest {
camera_id: Some(camera.camera_id.clone()),
source_type: camera.source_type.clone(),
transport: camera.transport.clone(),
url: camera.url.clone(),
device_id: camera.device_id.clone(),
auth: camera.auth.clone(),
resolution: Some(camera.resolution.clone()),
fps: Some(camera.fps),
enabled_pipelines: camera.enabled_pipelines.clone(),
detection_stride: Some(camera.detection_stride),
ocr_interval: Some(camera.ocr_interval),
fps_budget: Some(camera.fps_budget),
roi_zones: camera.roi_zones.clone(),
alert_rules: camera.alert_rules.clone(),
}) {
if let Err(error) = state
.vision_runtime
.start_worker(state.clone(), camera.clone())
.await
{
tracing::warn!("Neizdevās startēt ingest worker: {}", error);
}
}
Ok(Json(LiveCameraResponse {
summary: result.summary,
camera,
}))
}
Err(e) => Err(error_response(
StatusCode::INTERNAL_SERVER_ERROR,
&e.to_string(),
)),
}
}
pub async fn pause_live_camera(
State(state): State<AppState>,
headers: HeaderMap,
Path(camera_id): Path<String>,
) -> Result<Json<LiveCameraResponse>, (StatusCode, Json<serde_json::Value>)> {
let _ = ensure_camera_permission(&state, &headers, &camera_id, "configure").await?;
let req = LiveSessionCommandRequest {
camera_id,
enabled_pipelines: None,
detection_stride: None,
ocr_interval: None,
fps_budget: None,
};
let bridge = PythonBridge::new();
let payload = serde_json::to_value(&req).unwrap_or_else(|_| serde_json::json!({}));
match bridge
.call::<LiveCameraResponsePayload>("vision/live/pause", &payload)
.await
{
Ok(result) => {
let camera = map_live_camera(result.camera);
state.vision_runtime.stop_worker(&camera.camera_id).await;
persist_live_state(&state, &camera, &camera.recent_events).await;
Ok(Json(LiveCameraResponse {
summary: result.summary,
camera,
}))
}
Err(e) => Err(error_response(
StatusCode::INTERNAL_SERVER_ERROR,
&e.to_string(),
)),
}
}
pub async fn stop_live_camera(
State(state): State<AppState>,
headers: HeaderMap,
Path(camera_id): Path<String>,
) -> Result<Json<LiveCameraResponse>, (StatusCode, Json<serde_json::Value>)> {
let _ = ensure_camera_permission(&state, &headers, &camera_id, "delete").await?;
let req = LiveSessionCommandRequest {
camera_id,
enabled_pipelines: None,
detection_stride: None,
ocr_interval: None,
fps_budget: None,
};
let bridge = PythonBridge::new();
let payload = serde_json::to_value(&req).unwrap_or_else(|_| serde_json::json!({}));
match bridge
.call::<LiveCameraResponsePayload>("vision/live/stop", &payload)
.await
{
Ok(result) => {
let camera = map_live_camera(result.camera);
state.vision_runtime.stop_worker(&camera.camera_id).await;
persist_live_state(&state, &camera, &camera.recent_events).await;
Ok(Json(LiveCameraResponse {
summary: result.summary,
camera,
}))
}
Err(e) => Err(error_response(
StatusCode::INTERNAL_SERVER_ERROR,
&e.to_string(),
)),
}
}
pub async fn live_camera_snapshot(
State(state): State<AppState>,
headers: HeaderMap,
Path(camera_id): Path<String>,
) -> Result<Json<LiveSnapshotResponse>, (StatusCode, Json<serde_json::Value>)> {
let _ = ensure_camera_permission(&state, &headers, &camera_id, "view").await?;
let bridge = PythonBridge::new();
let endpoint = format!("vision/live/{}/snapshot", camera_id);
match bridge.get::<LiveSnapshotPayload>(&endpoint).await {
Ok(result) => {
if let Some(pool) = &state.postgres {
let _ = sqlx::query(
"UPDATE vision_cameras SET latest_snapshot_url = $2 WHERE camera_id = $1",
)
.bind(&result.camera_id)
.bind(&result.snapshot_data_url)
.execute(pool)
.await;
}
Ok(Json(LiveSnapshotResponse {
summary: result.summary,
camera_id: result.camera_id,
snapshot_data_url: result.snapshot_data_url,
}))
}
Err(e) => Err(error_response(
StatusCode::INTERNAL_SERVER_ERROR,
&e.to_string(),
)),
}
}
pub async fn live_camera_state(
State(state): State<AppState>,
headers: HeaderMap,
Path(camera_id): Path<String>,
) -> Result<Json<LiveCameraResponse>, (StatusCode, Json<serde_json::Value>)> {
let _ = ensure_camera_permission(&state, &headers, &camera_id, "view").await?;
let bridge = PythonBridge::new();
let endpoint = format!("vision/live/{}/state", camera_id);
match bridge.get::<LiveCameraResponsePayload>(&endpoint).await {
Ok(result) => {
let camera = map_live_camera(result.camera);
persist_live_state(&state, &camera, &camera.recent_events).await;
Ok(Json(LiveCameraResponse {
summary: result.summary,
camera,
}))
}
Err(e) => Err(error_response(
StatusCode::INTERNAL_SERVER_ERROR,
&e.to_string(),
)),
}
}
pub async fn live_camera_events(
State(state): State<AppState>,
headers: HeaderMap,
Path(camera_id): Path<String>,
) -> Result<Json<LiveEventsResponse>, (StatusCode, Json<serde_json::Value>)> {
let _ = ensure_camera_permission(&state, &headers, &camera_id, "view").await?;
let bridge = PythonBridge::new();
let endpoint = format!("vision/live/{}/events", camera_id);
match bridge.get::<LiveEventsResponsePayload>(&endpoint).await {
Ok(result) => {
let events = result.events.into_iter().map(map_live_event).collect::<Vec<_>>();
if let Some(pool) = &state.postgres {
let _ = vision_store::insert_events(pool, &result.camera_id, &events).await;
let _ = vision_store::insert_reid_matches(pool, &result.camera_id, &events).await;
}
Ok(Json(LiveEventsResponse {
summary: result.summary,
camera_id: result.camera_id,
events,
}))
}
Err(e) => Err(error_response(
StatusCode::INTERNAL_SERVER_ERROR,
&e.to_string(),
)),
}
}
pub async fn update_live_camera_config(
State(state): State<AppState>,
headers: HeaderMap,
Path(camera_id): Path<String>,
Json(req): Json<LiveCameraConfigRequest>,
) -> Result<Json<LiveCameraResponse>, (StatusCode, Json<serde_json::Value>)> {
let _ = ensure_camera_permission(&state, &headers, &camera_id, "configure").await?;
validate_live_config_request(&req)?;
let bridge = PythonBridge::new();
let payload = serde_json::json!({
"camera_id": camera_id,
"roi_zones": req.roi_zones,
"alert_rules": req.alert_rules,
"enabled_pipelines": req.enabled_pipelines,
"fps_budget": req.fps_budget,
});
match bridge
.call::<LiveCameraResponsePayload>("vision/live/config", &payload)
.await
{
Ok(result) => {
let camera = map_live_camera(result.camera);
persist_live_state(&state, &camera, &camera.recent_events).await;
if let Some(pool) = &state.postgres {
let alert_rules = serde_json::Value::Array(
camera
.alert_rules
.iter()
.cloned()
.map(serde_json::Value::String)
.collect(),
);
let roi_zones = serde_json::Value::Array(camera.roi_zones.clone());
let _ = vision_store::update_camera_config(
pool,
&camera.camera_id,
&roi_zones,
&alert_rules,
&camera.enabled_pipelines,
camera.fps_budget,
)
.await;
}
Ok(Json(LiveCameraResponse {
summary: result.summary,
camera,
}))
}
Err(error) => Err(error_response(
StatusCode::INTERNAL_SERVER_ERROR,
&error.to_string(),
)),
}
}
pub async fn vision_dashboard(
State(state): State<AppState>,
headers: HeaderMap,
Query(query): Query<VisionDashboardQuery>,
) -> Result<Json<VisionDashboardResponse>, (StatusCode, Json<serde_json::Value>)> {
if state.settings.camera_auth_required {
let _ = resolve_camera_principal(&state, &headers).await?.ok_or_else(|| {
error_response(StatusCode::UNAUTHORIZED, "Norādi x-user-id vai x-api-key.")
})?;
}
let VisionDashboardData {
cameras,
recent_events,
recent_scenes,
reid_matches,
} = match &state.postgres {
Some(pool) => vision_store::load_dashboard(pool, query.search.as_deref())
.await
.map_err(|error| error_response(StatusCode::INTERNAL_SERVER_ERROR, &error.to_string()))?,
None => VisionDashboardData {
cameras: Vec::new(),
recent_events: Vec::new(),
recent_scenes: Vec::new(),
reid_matches: Vec::new(),
},
};
Ok(Json(VisionDashboardResponse {
summary: format!("Multi-camera dashboard satur {} kameras.", cameras.len()),
cameras,
recent_events,
recent_scenes,
reid_matches,
}))
}
pub async fn discover_onvif_devices(
State(state): State<AppState>,
headers: HeaderMap,
Json(req): Json<OnvifDiscoverRequest>,
) -> Result<Json<OnvifDiscoverResponse>, (StatusCode, Json<serde_json::Value>)> {
if state.settings.camera_auth_required {
let _ = resolve_camera_principal(&state, &headers).await?.ok_or_else(|| {
error_response(StatusCode::UNAUTHORIZED, "Norādi x-user-id vai x-api-key.")
})?;
}
let devices = onvif_discovery::discover(
&state.http_client,
OnvifCredentials {
username: req.username,
password: req.password,
},
req.timeout_ms.unwrap_or(2_000),
)
.await
.map_err(|error| error_response(StatusCode::BAD_GATEWAY, &error.to_string()))?;
Ok(Json(OnvifDiscoverResponse {
summary: format!("Atrastas {} ONVIF ierīces.", devices.len()),
devices,
}))
}
pub async fn camera_timeline_snapshots(
State(state): State<AppState>,
headers: HeaderMap,
Path(camera_id): Path<String>,
Query(query): Query<TimelineSnapshotsQuery>,
) -> Result<Json<TimelineSnapshotsResponse>, (StatusCode, Json<serde_json::Value>)> {
let _ = ensure_camera_permission(&state, &headers, &camera_id, "view").await?;
let pool = state
.postgres
.as_ref()
.ok_or_else(|| error_response(StatusCode::SERVICE_UNAVAILABLE, "PostgreSQL nav pieejams."))?;
let snapshots = snapshot_store::list_snapshots(
pool,
&camera_id,
query.start_frame,
query.end_frame,
query.limit.unwrap_or(120),
)
.await
.map_err(|error| error_response(StatusCode::INTERNAL_SERVER_ERROR, &error.to_string()))?;
Ok(Json(TimelineSnapshotsResponse {
summary: format!("Atrasti {} snapshot timeline ieraksti.", snapshots.len()),
camera_id,
snapshots,
}))
}
pub async fn camera_frame_snapshot(
State(state): State<AppState>,
headers: HeaderMap,
Path((camera_id, frame_index)): Path<(String, i32)>,
) -> Result<Json<TimelineSnapshotsResponse>, (StatusCode, Json<serde_json::Value>)> {
let _ = ensure_camera_permission(&state, &headers, &camera_id, "view").await?;
let pool = state
.postgres
.as_ref()
.ok_or_else(|| error_response(StatusCode::SERVICE_UNAVAILABLE, "PostgreSQL nav pieejams."))?;
let snapshot = snapshot_store::get_snapshot_by_frame(pool, &camera_id, frame_index)
.await
.map_err(|error| error_response(StatusCode::INTERNAL_SERVER_ERROR, &error.to_string()))?;
Ok(Json(TimelineSnapshotsResponse {
summary: if snapshot.is_some() {
"Atrasts snapshot.".to_string()
} else {
"Snapshot netika atrasts.".to_string()
},
camera_id,
snapshots: snapshot.into_iter().collect(),
}))
}
pub async fn list_camera_access(
State(state): State<AppState>,
headers: HeaderMap,
Path(camera_id): Path<String>,
) -> Result<Json<CameraAccessResponse>, (StatusCode, Json<serde_json::Value>)> {
let _ = ensure_camera_permission(&state, &headers, &camera_id, "configure").await?;
let pool = state
.postgres
.as_ref()
.ok_or_else(|| error_response(StatusCode::SERVICE_UNAVAILABLE, "PostgreSQL nav pieejams."))?;
let entries = camera_auth::list_access(pool, &camera_id)
.await
.map_err(|error| error_response(StatusCode::INTERNAL_SERVER_ERROR, &error.to_string()))?;
Ok(Json(CameraAccessResponse {
summary: format!("Atrastas {} access tiesības.", entries.len()),
camera_id,
entries,
}))
}
pub async fn rotate_camera_credentials(
State(state): State<AppState>,
headers: HeaderMap,
Json(req): Json<RotateCredentialsRequest>,
) -> Result<Json<RotateCredentialsResponse>, (StatusCode, Json<serde_json::Value>)> {
let principal = resolve_camera_principal(&state, &headers).await?;
let pool = state
.postgres
.as_ref()
.ok_or_else(|| error_response(StatusCode::SERVICE_UNAVAILABLE, "PostgreSQL nav pieejams."))?;
let crypto = state
.credential_crypto
.as_ref()
.ok_or_else(|| error_response(StatusCode::BAD_REQUEST, "Camera encryption nav konfigurēta."))?;
let metadata = crypto
.rotate_key(req.new_key)
.map_err(|error| error_response(StatusCode::INTERNAL_SERVER_ERROR, &error.to_string()))?;
let rotated = camera_auth::rotate_credentials(pool, crypto, principal.as_ref())
.await
.map_err(|error| error_response(StatusCode::INTERNAL_SERVER_ERROR, &error.to_string()))?;
Ok(Json(RotateCredentialsResponse {
summary: format!("Pāršifrēti {} credential ieraksti.", rotated),
rotated_credentials: rotated,
active_key_id: metadata.key_id,
}))
}
pub async fn grant_camera_access(
State(state): State<AppState>,
headers: HeaderMap,
Path(camera_id): Path<String>,
Json(req): Json<CameraAccessGrantRequest>,
) -> Result<Json<CameraAccessResponse>, (StatusCode, Json<serde_json::Value>)> {
let _ = ensure_camera_permission(&state, &headers, &camera_id, "configure").await?;
let pool = state
.postgres
.as_ref()
.ok_or_else(|| error_response(StatusCode::SERVICE_UNAVAILABLE, "PostgreSQL nav pieejams."))?;
camera_auth::grant_access(
pool,
&camera_id,
&req.principal_type,
&req.principal_id,
&req.permission,
)
.await
.map_err(|error| error_response(StatusCode::INTERNAL_SERVER_ERROR, &error.to_string()))?;
list_camera_access(State(state), headers, Path(camera_id)).await
}
pub async fn revoke_camera_access(
State(state): State<AppState>,
headers: HeaderMap,
Path((camera_id, access_id)): Path<(String, String)>,
) -> Result<Json<CameraAccessResponse>, (StatusCode, Json<serde_json::Value>)> {
let _ = ensure_camera_permission(&state, &headers, &camera_id, "configure").await?;
let pool = state
.postgres
.as_ref()
.ok_or_else(|| error_response(StatusCode::SERVICE_UNAVAILABLE, "PostgreSQL nav pieejams."))?;
camera_auth::revoke_access(pool, &access_id)
.await
.map_err(|error| error_response(StatusCode::INTERNAL_SERVER_ERROR, &error.to_string()))?;
list_camera_access(State(state), headers, Path(camera_id)).await
}
pub async fn generate_camera_event_narration(
State(state): State<AppState>,
headers: HeaderMap,
Path((camera_id, event_id)): Path<(String, String)>,
) -> Result<Json<event_narrator::EventNarration>, (StatusCode, Json<serde_json::Value>)> {
let _ = ensure_camera_permission(&state, &headers, &camera_id, "view").await?;
let narration = event_narrator::narrate_event(&state, &camera_id, &event_id)
.await
.map_err(|error| error_response(StatusCode::INTERNAL_SERVER_ERROR, &error.to_string()))?;
let _ = state.event_bus.publish(EventEnvelope {
event_kind: "narration_ready".to_string(),
camera_id: camera_id.clone(),
event_id: Some(narration.event_id.clone()),
event_type: "narration_ready".to_string(),
severity: "info".to_string(),
summary: "Kamerai ir pieejams narration audio.".to_string(),
narration_url: Some(narration.narration_url.clone()),
payload: serde_json::json!({ "language": narration.language }),
created_at: chrono::Utc::now().to_rfc3339(),
});
Ok(Json(narration))
}
pub async fn list_camera_narrations(
State(state): State<AppState>,
headers: HeaderMap,
Path(camera_id): Path<String>,
) -> Result<Json<CameraNarrationsResponse>, (StatusCode, Json<serde_json::Value>)> {
let _ = ensure_camera_permission(&state, &headers, &camera_id, "view").await?;
let pool = state
.postgres
.as_ref()
.ok_or_else(|| error_response(StatusCode::SERVICE_UNAVAILABLE, "PostgreSQL nav pieejams."))?;
let narrations = event_narrator::list_event_narrations(pool, &camera_id)
.await
.map_err(|error| error_response(StatusCode::INTERNAL_SERVER_ERROR, &error.to_string()))?;
Ok(Json(CameraNarrationsResponse {
summary: format!("Atrastas {} narration ieraksti.", narrations.len()),
camera_id,
narrations,
}))
}
pub async fn list_gpu_jobs(
State(state): State<AppState>,
headers: HeaderMap,
) -> Result<Json<GpuJobsResponse>, (StatusCode, Json<serde_json::Value>)> {
if state.settings.camera_auth_required {
let _ = resolve_camera_principal(&state, &headers).await?.ok_or_else(|| {
error_response(StatusCode::UNAUTHORIZED, "Norādi x-user-id vai x-api-key.")
})?;
}
let Some(scheduler) = &state.gpu_scheduler else {
return Ok(Json(GpuJobsResponse {
summary: "GPU scheduler nav ieslēgts.".to_string(),
jobs: Vec::new(),
}));
};
let jobs = scheduler
.list_jobs(100)
.await
.map_err(|error| error_response(StatusCode::INTERNAL_SERVER_ERROR, &error.to_string()))?;
Ok(Json(GpuJobsResponse {
summary: format!("Atrasti {} GPU jobi.", jobs.len()),
jobs,
}))
}
pub async fn list_gpu_resources(
State(state): State<AppState>,
headers: HeaderMap,
) -> Result<Json<GpuResourcesResponse>, (StatusCode, Json<serde_json::Value>)> {
if state.settings.camera_auth_required {
let _ = resolve_camera_principal(&state, &headers).await?.ok_or_else(|| {
error_response(StatusCode::UNAUTHORIZED, "Norādi x-user-id vai x-api-key.")
})?;
}
let Some(scheduler) = &state.gpu_scheduler else {
return Ok(Json(GpuResourcesResponse {
summary: "GPU scheduler nav ieslēgts.".to_string(),
resources: Vec::new(),
}));
};
let resources = scheduler
.list_resources()
.await
.map_err(|error| error_response(StatusCode::INTERNAL_SERVER_ERROR, &error.to_string()))?;
Ok(Json(GpuResourcesResponse {
summary: format!("Atrasti {} GPU resursi.", resources.len()),
resources,
}))
}
pub async fn live_camera_ws(
State(state): State<AppState>,
headers: HeaderMap,
Path(camera_id): Path<String>,
Query(query): Query<WsAuthQuery>,
ws: WebSocketUpgrade,
) -> Response {
let mut token_principal = None;
if state.settings.app_auth_required {
let Some(token) = query.token.as_deref() else {
return StatusCode::UNAUTHORIZED.into_response();
};
match jwt_auth::verify_access_token(&state.settings, token) {
Ok(claims) => {
token_principal = Some(CameraPrincipal {
principal_type: "user".to_string(),
principal_id: claims.sub,
});
}
Err(_) => return StatusCode::UNAUTHORIZED.into_response(),
}
}
if state.settings.camera_auth_required {
let header_principal = resolve_camera_principal(&state, &headers).await.ok().flatten();
let principal = header_principal.as_ref().or(token_principal.as_ref());
let allowed = camera_auth::ensure_access(
state.postgres.as_ref(),
&camera_id,
principal,
"view",
true,
)
.await
.unwrap_or(false);
if !allowed {
return StatusCode::FORBIDDEN.into_response();
}
}
ws.on_upgrade(move |socket| handle_live_socket(socket, camera_id, state))
}
async fn handle_live_socket(socket: WebSocket, camera_id: String, state: AppState) {
let (mut tx, mut rx) = socket.split();
let bridge = PythonBridge::new();
let state_endpoint = format!("vision/live/{}/state", camera_id);
match bridge.get::<LiveCameraResponsePayload>(&state_endpoint).await {
Ok(result) => {
send_live_message(
&mut tx,
&LiveServerMessage::Connected {
camera: map_live_camera(result.camera),
},
)
.await;
}
Err(error) => {
send_live_message(
&mut tx,
&LiveServerMessage::Error {
message: error.to_string(),
},
)
.await;
return;
}
}
while let Some(message) = rx.next().await {
match message {
Ok(Message::Text(text)) => match serde_json::from_str::<LiveClientMessage>(&text) {
Ok(LiveClientMessage::Ping) => {
send_live_message(&mut tx, &LiveServerMessage::Pong).await;
}
Ok(LiveClientMessage::Frame {
image_base64,
frame_index,
timestamp_ms,
}) => {
let payload = serde_json::json!({
"camera_id": camera_id.clone(),
"image_base64": image_base64,
"frame_index": frame_index,
"timestamp_ms": timestamp_ms,
});
match bridge
.call::<LiveFrameResponsePayload>("vision/live/frame", &payload)
.await
{
Ok(result) => {
let summary = result.summary;
let camera = map_live_camera(result.camera);
let events = result
.events
.into_iter()
.map(map_live_event)
.collect::<Vec<_>>();
persist_live_state(&state, &camera, &events).await;
send_live_message(
&mut tx,
&LiveServerMessage::Analysis {
summary,
camera,
events,
},
)
.await;
}
Err(error) => {
send_live_message(
&mut tx,
&LiveServerMessage::Error {
message: error.to_string(),
},
)
.await;
}
}
}
Err(error) => {
send_live_message(
&mut tx,
&LiveServerMessage::Error {
message: format!("Nederīgs live ziņojums: {}", error),
},
)
.await;
}
},
Ok(Message::Close(_)) => break,
Err(error) => {
send_live_message(
&mut tx,
&LiveServerMessage::Error {
message: format!("WebSocket kļūda: {}", error),
},
)
.await;
break;
}
_ => {}
}
}
}