use axum::{ extract::{ Query, State, ws::{Message, WebSocket, WebSocketUpgrade}, Path, }, http::{HeaderMap, StatusCode}, response::{IntoResponse, Response}, Json, }; use futures_util::{SinkExt, StreamExt}; use serde::{Deserialize, Serialize}; use crate::{ app_state::AppState, camera_auth::{self, CameraPrincipal}, event_bus::EventEnvelope, event_narrator, jwt_auth, inference::python_bridge::PythonBridge, onvif_discovery::{self, OnvifCredentials}, snapshot_store, vision_runtime, vision_store::{self, VisionDashboardData}, }; use crate::inference::response_validator::{ FrameAnalysisPayload, LiveCameraCatalogPayload, LiveCameraHealthPayload, LiveCameraResponsePayload, LiveCameraSessionPayload, LiveCameraResolutionPayload, LiveEventPayload, LiveEventsResponsePayload, LiveFrameResponsePayload, LiveSnapshotPayload, OcrBlockPayload, SceneSegmentPayload, SegmentationMaskPayload, TrackedObjectPayload, VisionActionPayload, VisionAnalysisPayload, VisionDetectionPayload, VisionPosePayload, }; #[derive(Deserialize)] pub struct VisionImageRequest { pub image_url: Option, pub image_base64: Option, pub max_detections: Option, pub confidence_threshold: Option, } #[derive(Deserialize)] pub struct VisionFrameSequenceRequest { pub frames_base64: Vec, pub max_detections: Option, pub confidence_threshold: Option, } #[derive(Serialize, Clone)] pub struct VisionBoundingBox { pub x: f32, pub y: f32, pub width: f32, pub height: f32, } #[derive(Serialize)] pub struct VisionDetection { pub label: String, pub confidence: f32, pub bbox: VisionBoundingBox, } #[derive(Serialize)] pub struct VisionAnalyzeResponse { pub summary: String, pub detections: Vec, pub width: u32, pub height: u32, pub model: String, pub fallback_used: bool, } #[derive(Serialize)] pub struct OcrBlock { pub text: String, pub confidence: f32, pub bbox: VisionBoundingBox, pub language: String, } #[derive(Serialize)] pub struct VisionOcrResponse { pub summary: String, pub results: Vec, pub width: u32, pub height: u32, pub model: String, pub fallback_used: bool, } #[derive(Serialize)] pub struct PoseKeypoint { pub name: String, pub x: f32, pub y: f32, pub confidence: f32, } #[derive(Serialize)] pub struct PoseConnection { pub start: String, pub end: String, } #[derive(Serialize)] pub struct PoseDetection { pub person_id: u32, pub confidence: f32, pub bbox: VisionBoundingBox, pub keypoints: Vec, pub connections: Vec, } #[derive(Serialize)] pub struct VisionPoseResponse { pub summary: String, pub poses: Vec, pub width: u32, pub height: u32, pub model: String, pub fallback_used: bool, } #[derive(Serialize)] pub struct SegmentationMask { pub label: String, pub confidence: f32, pub mask_data_url: String, pub bbox: VisionBoundingBox, pub area_pixels: u32, } #[derive(Serialize)] pub struct VisionSegmentationResponse { pub summary: String, pub masks: Vec, pub width: u32, pub height: u32, pub model: String, pub fallback_used: bool, } #[derive(Serialize)] pub struct ActionPrediction { pub action: String, pub confidence: f32, pub subject_label: String, pub rationale: String, } #[derive(Serialize)] pub struct VisionActionResponse { pub summary: String, pub actions: Vec, pub width: u32, pub height: u32, pub model: String, pub fallback_used: bool, } #[derive(Serialize, Clone)] pub struct TrackObservation { pub frame_index: u32, pub confidence: f32, pub bbox: VisionBoundingBox, } #[derive(Serialize, Clone)] pub struct TrackedObject { pub track_id: u32, pub label: String, pub average_confidence: f32, pub observations: Vec, } #[derive(Serialize)] pub struct VisionTrackingResponse { pub summary: String, pub tracks: Vec, pub frame_count: u32, pub model: String, pub fallback_used: bool, } #[derive(Serialize)] pub struct FrameAnalysis { pub frame_index: u32, pub summary: String, pub detections: Vec, pub dominant_labels: Vec, pub brightness: f32, } #[derive(Serialize)] pub struct VisionFrameAnalysisResponse { pub summary: String, pub frames: Vec, pub frame_count: u32, pub model: String, pub fallback_used: bool, } #[derive(Serialize, Clone)] pub struct SceneSegment { pub scene_index: u32, pub start_frame: u32, pub end_frame: u32, pub summary: String, pub dominant_labels: Vec, pub average_brightness: f32, } #[derive(Serialize)] pub struct VisionSceneTimelineResponse { pub summary: String, pub scenes: Vec, pub frame_count: u32, pub model: String, pub fallback_used: bool, } #[derive(Deserialize, Serialize, Clone)] pub struct LiveCameraConnectRequest { pub camera_id: Option, pub source_type: String, pub transport: String, pub url: Option, pub device_id: Option, #[serde(default)] pub auth: serde_json::Value, pub resolution: Option, pub fps: Option, #[serde(default)] pub enabled_pipelines: Vec, pub detection_stride: Option, pub ocr_interval: Option, pub fps_budget: Option, #[serde(default)] pub roi_zones: Vec, #[serde(default)] pub alert_rules: Vec, } #[derive(Deserialize, Serialize, Clone)] pub struct LiveSessionCommandRequest { pub camera_id: String, pub enabled_pipelines: Option>, pub detection_stride: Option, pub ocr_interval: Option, pub fps_budget: Option, } #[derive(Deserialize, Serialize, Clone)] pub struct LiveFrameRequest { pub camera_id: String, pub image_base64: String, pub frame_index: Option, pub timestamp_ms: Option, } #[derive(Deserialize, Serialize, Clone)] pub struct LiveCameraResolution { pub width: u32, pub height: u32, } #[derive(Serialize, Clone)] pub struct LiveCameraHealth { pub connected: bool, pub analysis_active: bool, pub reconnect_attempts: u32, pub dropped_frames: u32, pub events_emitted: u32, pub last_frame_at: Option, pub last_event_at: Option, pub last_error: Option, pub ingest_mode: String, } #[derive(Serialize, Clone)] pub struct LiveEvent { pub event_id: String, pub camera_id: String, #[serde(rename = "type")] pub event_type: String, pub severity: String, pub timestamp: String, pub summary: String, pub payload: serde_json::Value, } #[derive(Serialize, Clone)] pub struct LiveCameraSession { pub camera_id: String, pub source_type: String, pub transport: String, pub url: Option, pub device_id: Option, pub auth: serde_json::Value, pub resolution: LiveCameraResolution, pub fps: f32, pub status: String, pub health: LiveCameraHealth, pub enabled_pipelines: Vec, pub detection_stride: u32, pub ocr_interval: u32, pub fps_budget: f32, pub roi_zones: Vec, pub alert_rules: Vec, pub latest_snapshot: Option, pub latest_result: serde_json::Value, pub recent_events: Vec, pub timeline: Vec, pub tracks: Vec, } #[derive(Serialize)] pub struct LiveCameraCatalogResponse { pub summary: String, pub cameras: Vec, } #[derive(Serialize)] pub struct LiveCameraResponse { pub summary: String, pub camera: LiveCameraSession, } #[derive(Serialize)] pub struct LiveSnapshotResponse { pub summary: String, pub camera_id: String, pub snapshot_data_url: Option, } #[derive(Serialize)] pub struct LiveEventsResponse { pub summary: String, pub camera_id: String, pub events: Vec, } #[derive(Serialize)] pub struct LiveFrameResponse { pub summary: String, pub camera: LiveCameraSession, pub events: Vec, } #[derive(Deserialize, Serialize, Clone)] pub struct LiveCameraConfigRequest { #[serde(default)] pub roi_zones: Vec, #[serde(default)] pub alert_rules: Vec, #[serde(default)] pub enabled_pipelines: Vec, pub fps_budget: Option, } #[derive(Deserialize, Default)] pub struct VisionDashboardQuery { pub search: Option, } #[derive(Serialize)] pub struct VisionDashboardResponse { pub summary: String, pub cameras: Vec, pub recent_events: Vec, pub recent_scenes: Vec, pub reid_matches: Vec, } #[derive(Deserialize)] pub struct OnvifDiscoverRequest { pub username: Option, pub password: Option, pub timeout_ms: Option, } #[derive(Serialize)] pub struct OnvifDiscoverResponse { pub summary: String, pub devices: Vec, } #[derive(Deserialize)] pub struct TimelineSnapshotsQuery { pub start_frame: Option, pub end_frame: Option, pub limit: Option, } #[derive(Serialize)] pub struct TimelineSnapshotsResponse { pub summary: String, pub camera_id: String, pub snapshots: Vec, } #[derive(Deserialize)] pub struct CameraAccessGrantRequest { pub principal_type: String, pub principal_id: String, pub permission: String, } #[derive(Serialize)] pub struct CameraAccessResponse { pub summary: String, pub camera_id: String, pub entries: Vec, } #[derive(Serialize)] pub struct CameraNarrationsResponse { pub summary: String, pub camera_id: String, pub narrations: Vec, } #[derive(Deserialize)] pub struct RotateCredentialsRequest { pub new_key: String, } #[derive(Deserialize)] pub struct WsAuthQuery { pub token: Option, } #[derive(Serialize)] pub struct RotateCredentialsResponse { pub summary: String, pub rotated_credentials: u64, pub active_key_id: String, } #[derive(Serialize)] pub struct GpuJobsResponse { pub summary: String, pub jobs: Vec, } #[derive(Serialize)] pub struct GpuResourcesResponse { pub summary: String, pub resources: Vec, } fn error_response(status: StatusCode, message: &str) -> (StatusCode, Json) { (status, Json(serde_json::json!({ "error": message }))) } fn validate_image_request( req: &VisionImageRequest, ) -> Result<(), (StatusCode, Json)> { let has_url = req .image_url .as_deref() .map(str::trim) .is_some_and(|value| !value.is_empty()); let has_base64 = req .image_base64 .as_deref() .map(str::trim) .is_some_and(|value| !value.is_empty()); if has_url == has_base64 { return Err(error_response( StatusCode::BAD_REQUEST, "Norādi tieši vienu no image_url vai image_base64.", )); } if matches!(req.max_detections, Some(0)) { return Err(error_response( StatusCode::BAD_REQUEST, "max_detections jābūt lielākam par 0.", )); } if req .confidence_threshold .is_some_and(|value| !value.is_finite() || !(0.0..=1.0).contains(&value)) { return Err(error_response( StatusCode::BAD_REQUEST, "confidence_threshold jābūt diapazonā no 0 līdz 1.", )); } Ok(()) } fn validate_frame_request( req: &VisionFrameSequenceRequest, ) -> Result<(), (StatusCode, Json)> { if req.frames_base64.is_empty() { return Err(error_response( StatusCode::BAD_REQUEST, "frames_base64 nedrīkst būt tukšs.", )); } if req .frames_base64 .iter() .any(|frame| frame.trim().is_empty()) { return Err(error_response( StatusCode::BAD_REQUEST, "frames_base64 satur tukšu kadru.", )); } if matches!(req.max_detections, Some(0)) { return Err(error_response( StatusCode::BAD_REQUEST, "max_detections jābūt lielākam par 0.", )); } if req .confidence_threshold .is_some_and(|value| !value.is_finite() || !(0.0..=1.0).contains(&value)) { return Err(error_response( StatusCode::BAD_REQUEST, "confidence_threshold jābūt diapazonā no 0 līdz 1.", )); } Ok(()) } fn validate_live_connect_request( req: &LiveCameraConnectRequest, ) -> Result<(), (StatusCode, Json)> { if req.source_type.trim().is_empty() { return Err(error_response( StatusCode::BAD_REQUEST, "source_type nedrīkst būt tukšs.", )); } if req.transport.trim().is_empty() { return Err(error_response( StatusCode::BAD_REQUEST, "transport nedrīkst būt tukšs.", )); } let has_url = req.url.as_deref().map(str::trim).is_some_and(|value| !value.is_empty()); let has_device = req .device_id .as_deref() .map(str::trim) .is_some_and(|value| !value.is_empty()); if !has_url && !has_device { return Err(error_response( StatusCode::BAD_REQUEST, "Norādi url vai device_id kamerai.", )); } if req.fps.is_some_and(|value| !value.is_finite() || value <= 0.0) { return Err(error_response( StatusCode::BAD_REQUEST, "fps jābūt pozitīvam skaitlim.", )); } if req .fps_budget .is_some_and(|value| !value.is_finite() || value <= 0.0) { return Err(error_response( StatusCode::BAD_REQUEST, "fps_budget jābūt pozitīvam skaitlim.", )); } Ok(()) } fn validate_live_command_request( req: &LiveSessionCommandRequest, ) -> Result<(), (StatusCode, Json)> { if req.camera_id.trim().is_empty() { return Err(error_response( StatusCode::BAD_REQUEST, "camera_id nedrīkst būt tukšs.", )); } if req .fps_budget .is_some_and(|value| !value.is_finite() || value <= 0.0) { return Err(error_response( StatusCode::BAD_REQUEST, "fps_budget jābūt pozitīvam skaitlim.", )); } Ok(()) } fn validate_live_config_request( req: &LiveCameraConfigRequest, ) -> Result<(), (StatusCode, Json)> { if req .fps_budget .is_some_and(|value| !value.is_finite() || value <= 0.0) { return Err(error_response( StatusCode::BAD_REQUEST, "fps_budget jābūt pozitīvam skaitlim.", )); } Ok(()) } fn map_bbox( bbox: crate::inference::response_validator::VisionBoundingBoxPayload, ) -> VisionBoundingBox { VisionBoundingBox { x: bbox.x, y: bbox.y, width: bbox.width, height: bbox.height, } } fn map_detection(item: VisionDetectionPayload) -> VisionDetection { VisionDetection { label: item.label, confidence: item.confidence, bbox: map_bbox(item.bbox), } } fn map_ocr_block(item: OcrBlockPayload) -> OcrBlock { OcrBlock { text: item.text, confidence: item.confidence, bbox: map_bbox(item.bbox), language: item.language, } } fn map_pose(item: VisionPosePayload) -> PoseDetection { PoseDetection { person_id: item.person_id, confidence: item.confidence, bbox: map_bbox(item.bbox), keypoints: item .keypoints .into_iter() .map(|point| PoseKeypoint { name: point.name, x: point.x, y: point.y, confidence: point.confidence, }) .collect(), connections: item .connections .into_iter() .map(|connection| PoseConnection { start: connection.start, end: connection.end, }) .collect(), } } fn map_mask(item: SegmentationMaskPayload) -> SegmentationMask { SegmentationMask { label: item.label, confidence: item.confidence, mask_data_url: item.mask_data_url, bbox: map_bbox(item.bbox), area_pixels: item.area_pixels, } } fn map_action(item: VisionActionPayload) -> ActionPrediction { ActionPrediction { action: item.action, confidence: item.confidence, subject_label: item.subject_label, rationale: item.rationale, } } fn map_track(item: TrackedObjectPayload) -> TrackedObject { TrackedObject { track_id: item.track_id, label: item.label, average_confidence: item.average_confidence, observations: item .observations .into_iter() .map(|observation| TrackObservation { frame_index: observation.frame_index, confidence: observation.confidence, bbox: map_bbox(observation.bbox), }) .collect(), } } fn map_frame(item: FrameAnalysisPayload) -> FrameAnalysis { FrameAnalysis { frame_index: item.frame_index, summary: item.summary, detections: item.detections.into_iter().map(map_detection).collect(), dominant_labels: item.dominant_labels, brightness: item.brightness, } } fn map_scene(item: SceneSegmentPayload) -> SceneSegment { SceneSegment { scene_index: item.scene_index, start_frame: item.start_frame, end_frame: item.end_frame, summary: item.summary, dominant_labels: item.dominant_labels, average_brightness: item.average_brightness, } } fn map_live_resolution(item: LiveCameraResolutionPayload) -> LiveCameraResolution { LiveCameraResolution { width: item.width, height: item.height, } } fn map_live_health(item: LiveCameraHealthPayload) -> LiveCameraHealth { LiveCameraHealth { connected: item.connected, analysis_active: item.analysis_active, reconnect_attempts: item.reconnect_attempts, dropped_frames: item.dropped_frames, events_emitted: item.events_emitted, last_frame_at: item.last_frame_at, last_event_at: item.last_event_at, last_error: item.last_error, ingest_mode: item.ingest_mode, } } pub(crate) fn map_live_event(item: LiveEventPayload) -> LiveEvent { LiveEvent { event_id: item.event_id, camera_id: item.camera_id, event_type: item.event_type, severity: item.severity, timestamp: item.timestamp, summary: item.summary, payload: item.payload, } } pub(crate) fn map_live_camera(item: LiveCameraSessionPayload) -> LiveCameraSession { LiveCameraSession { camera_id: item.camera_id, source_type: item.source_type, transport: item.transport, url: item.url, device_id: item.device_id, auth: item.auth, resolution: map_live_resolution(item.resolution), fps: item.fps, status: item.status, health: map_live_health(item.health), enabled_pipelines: item.enabled_pipelines, detection_stride: item.detection_stride, ocr_interval: item.ocr_interval, fps_budget: item.fps_budget, roi_zones: item.roi_zones, alert_rules: item.alert_rules, latest_snapshot: item.latest_snapshot, latest_result: item.latest_result, recent_events: item.recent_events.into_iter().map(map_live_event).collect(), timeline: item.timeline.into_iter().map(map_scene).collect(), tracks: item.tracks.into_iter().map(map_track).collect(), } } #[derive(Deserialize)] #[serde(tag = "type")] enum LiveClientMessage { #[serde(rename = "frame")] Frame { image_base64: String, frame_index: Option, timestamp_ms: Option, }, #[serde(rename = "ping")] Ping, } #[derive(Serialize)] #[serde(tag = "type")] enum LiveServerMessage { #[serde(rename = "connected")] Connected { camera: LiveCameraSession }, #[serde(rename = "analysis")] Analysis { summary: String, camera: LiveCameraSession, events: Vec, }, #[serde(rename = "pong")] Pong, #[serde(rename = "error")] Error { message: String }, } async fn send_live_message( tx: &mut futures_util::stream::SplitSink, message: &LiveServerMessage, ) { let payload = serde_json::to_string(message).unwrap_or_default(); let _ = tx.send(Message::Text(payload)).await; } async fn persist_live_state(state: &AppState, camera: &LiveCameraSession, events: &[LiveEvent]) { if let Some(pool) = &state.postgres { if let Err(error) = vision_store::persist_session(pool, camera, events).await { tracing::warn!("Neizdevās persistēt vision live state: {}", error); } } for event in events { let _ = state.event_bus.publish(EventEnvelope { event_kind: "camera_event".to_string(), camera_id: event.camera_id.clone(), event_id: Some(event.event_id.clone()), event_type: event.event_type.clone(), severity: event.severity.clone(), summary: event.summary.clone(), narration_url: None, payload: event.payload.clone(), created_at: event.timestamp.clone(), }); } } async fn resolve_camera_principal( state: &AppState, headers: &HeaderMap, ) -> Result, (StatusCode, Json)> { camera_auth::resolve_principal(headers, state.postgres.as_ref()) .await .map(|principal| { if principal.is_some() { return principal; } headers .get(axum::http::header::AUTHORIZATION) .and_then(|value| value.to_str().ok()) .and_then(|value| value.strip_prefix("Bearer ")) .and_then(|token| jwt_auth::verify_access_token(&state.settings, token).ok()) .map(|claims| CameraPrincipal { principal_type: "user".to_string(), principal_id: claims.sub, }) }) .map_err(|error| error_response(StatusCode::UNAUTHORIZED, &error.to_string())) } async fn ensure_camera_permission( state: &AppState, headers: &HeaderMap, camera_id: &str, permission: &str, ) -> Result, (StatusCode, Json)> { let principal = resolve_camera_principal(state, headers).await?; let allowed = camera_auth::ensure_access( state.postgres.as_ref(), camera_id, principal.as_ref(), permission, state.settings.camera_auth_required, ) .await .map_err(|error| error_response(StatusCode::INTERNAL_SERVER_ERROR, &error.to_string()))?; if !allowed { return Err(error_response( StatusCode::FORBIDDEN, "Trūkst piekļuves tiesību šai kamerai.", )); } Ok(principal) } pub async fn analyze_image( Json(req): Json, ) -> Result, (StatusCode, Json)> { validate_image_request(&req)?; let bridge = PythonBridge::new(); let payload = serde_json::json!({ "image_url": req.image_url, "image_base64": req.image_base64, "max_detections": req.max_detections.unwrap_or(10), "confidence_threshold": req.confidence_threshold.unwrap_or(0.25), }); match bridge .call::("vision/analyze", &payload) .await { Ok(result) => Ok(Json(VisionAnalyzeResponse { summary: result.summary, detections: result.detections.into_iter().map(map_detection).collect(), width: result.width, height: result.height, model: result.model, fallback_used: result.fallback_used, })), Err(e) => Err(error_response( StatusCode::INTERNAL_SERVER_ERROR, &e.to_string(), )), } } pub async fn ocr_image( Json(req): Json, ) -> Result, (StatusCode, Json)> { validate_image_request(&req)?; let bridge = PythonBridge::new(); let payload = serde_json::json!({ "image_url": req.image_url, "image_base64": req.image_base64, "max_detections": req.max_detections.unwrap_or(10), "confidence_threshold": req.confidence_threshold.unwrap_or(0.25), }); match bridge .call::("vision/ocr", &payload) .await { Ok(result) => Ok(Json(VisionOcrResponse { summary: result.summary, results: result.results.into_iter().map(map_ocr_block).collect(), width: result.width, height: result.height, model: result.model, fallback_used: result.fallback_used, })), Err(e) => Err(error_response( StatusCode::INTERNAL_SERVER_ERROR, &e.to_string(), )), } } pub async fn estimate_pose( Json(req): Json, ) -> Result, (StatusCode, Json)> { validate_image_request(&req)?; let bridge = PythonBridge::new(); let payload = serde_json::json!({ "image_url": req.image_url, "image_base64": req.image_base64, "max_detections": req.max_detections.unwrap_or(10), "confidence_threshold": req.confidence_threshold.unwrap_or(0.25), }); match bridge .call::( "vision/pose-estimate", &payload, ) .await { Ok(result) => Ok(Json(VisionPoseResponse { summary: result.summary, poses: result.poses.into_iter().map(map_pose).collect(), width: result.width, height: result.height, model: result.model, fallback_used: result.fallback_used, })), Err(e) => Err(error_response( StatusCode::INTERNAL_SERVER_ERROR, &e.to_string(), )), } } pub async fn segment_image( Json(req): Json, ) -> Result, (StatusCode, Json)> { validate_image_request(&req)?; let bridge = PythonBridge::new(); let payload = serde_json::json!({ "image_url": req.image_url, "image_base64": req.image_base64, "max_detections": req.max_detections.unwrap_or(10), "confidence_threshold": req.confidence_threshold.unwrap_or(0.25), }); match bridge .call::( "vision/segment", &payload, ) .await { Ok(result) => Ok(Json(VisionSegmentationResponse { summary: result.summary, masks: result.masks.into_iter().map(map_mask).collect(), width: result.width, height: result.height, model: result.model, fallback_used: result.fallback_used, })), Err(e) => Err(error_response( StatusCode::INTERNAL_SERVER_ERROR, &e.to_string(), )), } } pub async fn recognize_action( Json(req): Json, ) -> Result, (StatusCode, Json)> { validate_image_request(&req)?; let bridge = PythonBridge::new(); let payload = serde_json::json!({ "image_url": req.image_url, "image_base64": req.image_base64, "max_detections": req.max_detections.unwrap_or(10), "confidence_threshold": req.confidence_threshold.unwrap_or(0.25), }); match bridge .call::( "vision/action-recognize", &payload, ) .await { Ok(result) => Ok(Json(VisionActionResponse { summary: result.summary, actions: result.actions.into_iter().map(map_action).collect(), width: result.width, height: result.height, model: result.model, fallback_used: result.fallback_used, })), Err(e) => Err(error_response( StatusCode::INTERNAL_SERVER_ERROR, &e.to_string(), )), } } pub async fn track_objects( Json(req): Json, ) -> Result, (StatusCode, Json)> { validate_frame_request(&req)?; let bridge = PythonBridge::new(); let payload = serde_json::json!({ "frames_base64": req.frames_base64, "max_detections": req.max_detections.unwrap_or(10), "confidence_threshold": req.confidence_threshold.unwrap_or(0.25), }); match bridge .call::( "vision/tracking", &payload, ) .await { Ok(result) => Ok(Json(VisionTrackingResponse { summary: result.summary, tracks: result.tracks.into_iter().map(map_track).collect(), frame_count: result.frame_count, model: result.model, fallback_used: result.fallback_used, })), Err(e) => Err(error_response( StatusCode::INTERNAL_SERVER_ERROR, &e.to_string(), )), } } pub async fn analyze_frames( Json(req): Json, ) -> Result, (StatusCode, Json)> { validate_frame_request(&req)?; let bridge = PythonBridge::new(); let payload = serde_json::json!({ "frames_base64": req.frames_base64, "max_detections": req.max_detections.unwrap_or(10), "confidence_threshold": req.confidence_threshold.unwrap_or(0.25), }); match bridge .call::( "vision/frame-analysis", &payload, ) .await { Ok(result) => Ok(Json(VisionFrameAnalysisResponse { summary: result.summary, frames: result.frames.into_iter().map(map_frame).collect(), frame_count: result.frame_count, model: result.model, fallback_used: result.fallback_used, })), Err(e) => Err(error_response( StatusCode::INTERNAL_SERVER_ERROR, &e.to_string(), )), } } pub async fn scene_timeline( Json(req): Json, ) -> Result, (StatusCode, Json)> { validate_frame_request(&req)?; let bridge = PythonBridge::new(); let payload = serde_json::json!({ "frames_base64": req.frames_base64, "max_detections": req.max_detections.unwrap_or(10), "confidence_threshold": req.confidence_threshold.unwrap_or(0.25), }); match bridge .call::( "vision/scene-timeline", &payload, ) .await { Ok(result) => Ok(Json(VisionSceneTimelineResponse { summary: result.summary, scenes: result.scenes.into_iter().map(map_scene).collect(), frame_count: result.frame_count, model: result.model, fallback_used: result.fallback_used, })), Err(e) => Err(error_response( StatusCode::INTERNAL_SERVER_ERROR, &e.to_string(), )), } } pub async fn list_live_cameras( State(state): State, headers: HeaderMap, ) -> Result, (StatusCode, Json)> { if state.settings.camera_auth_required { let _ = resolve_camera_principal(&state, &headers).await?.ok_or_else(|| { error_response(StatusCode::UNAUTHORIZED, "Norādi x-user-id vai x-api-key.") })?; } let bridge = PythonBridge::new(); match bridge.get::("vision/live/cameras").await { Ok(result) => { let cameras = result .cameras .into_iter() .map(map_live_camera) .collect::>(); for camera in &cameras { persist_live_state(&state, camera, &camera.recent_events).await; } Ok(Json(LiveCameraCatalogResponse { summary: result.summary, cameras, })) } Err(e) => Err(error_response( StatusCode::INTERNAL_SERVER_ERROR, &e.to_string(), )), } } pub async fn connect_live_camera( State(state): State, headers: HeaderMap, Json(req): Json, ) -> Result, (StatusCode, Json)> { validate_live_connect_request(&req)?; let principal = resolve_camera_principal(&state, &headers).await?; if state.settings.camera_auth_required && principal.is_none() { return Err(error_response( StatusCode::UNAUTHORIZED, "Norādi x-user-id vai x-api-key, lai pieslēgtu kameru.", )); } let bridge = PythonBridge::new(); let payload = serde_json::to_value(&req).unwrap_or_else(|_| serde_json::json!({})); match bridge .call::("vision/live/connect", &payload) .await { Ok(result) => { let camera = map_live_camera(result.camera); persist_live_state(&state, &camera, &camera.recent_events).await; if let (Some(pool), Some(principal)) = (&state.postgres, principal.as_ref()) { for permission in ["view", "configure", "delete"] { let _ = camera_auth::grant_access( pool, &camera.camera_id, &principal.principal_type, &principal.principal_id, permission, ) .await; } } if let (Some(pool), Some(crypto)) = (&state.postgres, &state.credential_crypto) { let username = req.auth.get("username").and_then(|value| value.as_str()); let password = req.auth.get("password").and_then(|value| value.as_str()); let bearer = req.auth.get("bearer_token").and_then(|value| value.as_str()); if username.is_some() || password.is_some() || bearer.is_some() { let custom_headers = req .auth .get("headers") .cloned() .unwrap_or_else(|| serde_json::json!({})); let _ = camera_auth::upsert_credentials( pool, crypto, &camera.camera_id, username, password, bearer, &custom_headers, principal.as_ref(), ) .await; } } if camera.source_type == "onvif" { if let Some(pool) = &state.postgres { let credentials = OnvifCredentials { username: req .auth .get("username") .and_then(|value| value.as_str()) .map(ToString::to_string), password: req .auth .get("password") .and_then(|value| value.as_str()) .map(ToString::to_string), }; if let Some(url) = camera.url.as_deref() { if let Ok(media_service_url) = onvif_discovery::discover(&state.http_client, credentials.clone(), 750).await { let metadata = serde_json::json!({ "onvif": { "discovered_devices": media_service_url, "connect_url": url, } }); let _ = vision_store::update_camera_metadata(pool, &camera.camera_id, &metadata).await; } } } } Ok(Json(LiveCameraResponse { summary: result.summary, camera, })) } Err(e) => Err(error_response( StatusCode::INTERNAL_SERVER_ERROR, &e.to_string(), )), } } pub async fn start_live_camera( State(state): State, headers: HeaderMap, Path(camera_id): Path, Json(mut req): Json, ) -> Result, (StatusCode, Json)> { let _ = ensure_camera_permission(&state, &headers, &camera_id, "configure").await?; req.camera_id = camera_id; validate_live_command_request(&req)?; let bridge = PythonBridge::new(); let payload = serde_json::to_value(&req).unwrap_or_else(|_| serde_json::json!({})); match bridge .call::("vision/live/start", &payload) .await { Ok(result) => { let camera = map_live_camera(result.camera); persist_live_state(&state, &camera, &camera.recent_events).await; if vision_runtime::should_run_background_worker(&LiveCameraConnectRequest { camera_id: Some(camera.camera_id.clone()), source_type: camera.source_type.clone(), transport: camera.transport.clone(), url: camera.url.clone(), device_id: camera.device_id.clone(), auth: camera.auth.clone(), resolution: Some(camera.resolution.clone()), fps: Some(camera.fps), enabled_pipelines: camera.enabled_pipelines.clone(), detection_stride: Some(camera.detection_stride), ocr_interval: Some(camera.ocr_interval), fps_budget: Some(camera.fps_budget), roi_zones: camera.roi_zones.clone(), alert_rules: camera.alert_rules.clone(), }) { if let Err(error) = state .vision_runtime .start_worker(state.clone(), camera.clone()) .await { tracing::warn!("Neizdevās startēt ingest worker: {}", error); } } Ok(Json(LiveCameraResponse { summary: result.summary, camera, })) } Err(e) => Err(error_response( StatusCode::INTERNAL_SERVER_ERROR, &e.to_string(), )), } } pub async fn pause_live_camera( State(state): State, headers: HeaderMap, Path(camera_id): Path, ) -> Result, (StatusCode, Json)> { let _ = ensure_camera_permission(&state, &headers, &camera_id, "configure").await?; let req = LiveSessionCommandRequest { camera_id, enabled_pipelines: None, detection_stride: None, ocr_interval: None, fps_budget: None, }; let bridge = PythonBridge::new(); let payload = serde_json::to_value(&req).unwrap_or_else(|_| serde_json::json!({})); match bridge .call::("vision/live/pause", &payload) .await { Ok(result) => { let camera = map_live_camera(result.camera); state.vision_runtime.stop_worker(&camera.camera_id).await; persist_live_state(&state, &camera, &camera.recent_events).await; Ok(Json(LiveCameraResponse { summary: result.summary, camera, })) } Err(e) => Err(error_response( StatusCode::INTERNAL_SERVER_ERROR, &e.to_string(), )), } } pub async fn stop_live_camera( State(state): State, headers: HeaderMap, Path(camera_id): Path, ) -> Result, (StatusCode, Json)> { let _ = ensure_camera_permission(&state, &headers, &camera_id, "delete").await?; let req = LiveSessionCommandRequest { camera_id, enabled_pipelines: None, detection_stride: None, ocr_interval: None, fps_budget: None, }; let bridge = PythonBridge::new(); let payload = serde_json::to_value(&req).unwrap_or_else(|_| serde_json::json!({})); match bridge .call::("vision/live/stop", &payload) .await { Ok(result) => { let camera = map_live_camera(result.camera); state.vision_runtime.stop_worker(&camera.camera_id).await; persist_live_state(&state, &camera, &camera.recent_events).await; Ok(Json(LiveCameraResponse { summary: result.summary, camera, })) } Err(e) => Err(error_response( StatusCode::INTERNAL_SERVER_ERROR, &e.to_string(), )), } } pub async fn live_camera_snapshot( State(state): State, headers: HeaderMap, Path(camera_id): Path, ) -> Result, (StatusCode, Json)> { let _ = ensure_camera_permission(&state, &headers, &camera_id, "view").await?; let bridge = PythonBridge::new(); let endpoint = format!("vision/live/{}/snapshot", camera_id); match bridge.get::(&endpoint).await { Ok(result) => { if let Some(pool) = &state.postgres { let _ = sqlx::query( "UPDATE vision_cameras SET latest_snapshot_url = $2 WHERE camera_id = $1", ) .bind(&result.camera_id) .bind(&result.snapshot_data_url) .execute(pool) .await; } Ok(Json(LiveSnapshotResponse { summary: result.summary, camera_id: result.camera_id, snapshot_data_url: result.snapshot_data_url, })) } Err(e) => Err(error_response( StatusCode::INTERNAL_SERVER_ERROR, &e.to_string(), )), } } pub async fn live_camera_state( State(state): State, headers: HeaderMap, Path(camera_id): Path, ) -> Result, (StatusCode, Json)> { let _ = ensure_camera_permission(&state, &headers, &camera_id, "view").await?; let bridge = PythonBridge::new(); let endpoint = format!("vision/live/{}/state", camera_id); match bridge.get::(&endpoint).await { Ok(result) => { let camera = map_live_camera(result.camera); persist_live_state(&state, &camera, &camera.recent_events).await; Ok(Json(LiveCameraResponse { summary: result.summary, camera, })) } Err(e) => Err(error_response( StatusCode::INTERNAL_SERVER_ERROR, &e.to_string(), )), } } pub async fn live_camera_events( State(state): State, headers: HeaderMap, Path(camera_id): Path, ) -> Result, (StatusCode, Json)> { let _ = ensure_camera_permission(&state, &headers, &camera_id, "view").await?; let bridge = PythonBridge::new(); let endpoint = format!("vision/live/{}/events", camera_id); match bridge.get::(&endpoint).await { Ok(result) => { let events = result.events.into_iter().map(map_live_event).collect::>(); if let Some(pool) = &state.postgres { let _ = vision_store::insert_events(pool, &result.camera_id, &events).await; let _ = vision_store::insert_reid_matches(pool, &result.camera_id, &events).await; } Ok(Json(LiveEventsResponse { summary: result.summary, camera_id: result.camera_id, events, })) } Err(e) => Err(error_response( StatusCode::INTERNAL_SERVER_ERROR, &e.to_string(), )), } } pub async fn update_live_camera_config( State(state): State, headers: HeaderMap, Path(camera_id): Path, Json(req): Json, ) -> Result, (StatusCode, Json)> { let _ = ensure_camera_permission(&state, &headers, &camera_id, "configure").await?; validate_live_config_request(&req)?; let bridge = PythonBridge::new(); let payload = serde_json::json!({ "camera_id": camera_id, "roi_zones": req.roi_zones, "alert_rules": req.alert_rules, "enabled_pipelines": req.enabled_pipelines, "fps_budget": req.fps_budget, }); match bridge .call::("vision/live/config", &payload) .await { Ok(result) => { let camera = map_live_camera(result.camera); persist_live_state(&state, &camera, &camera.recent_events).await; if let Some(pool) = &state.postgres { let alert_rules = serde_json::Value::Array( camera .alert_rules .iter() .cloned() .map(serde_json::Value::String) .collect(), ); let roi_zones = serde_json::Value::Array(camera.roi_zones.clone()); let _ = vision_store::update_camera_config( pool, &camera.camera_id, &roi_zones, &alert_rules, &camera.enabled_pipelines, camera.fps_budget, ) .await; } Ok(Json(LiveCameraResponse { summary: result.summary, camera, })) } Err(error) => Err(error_response( StatusCode::INTERNAL_SERVER_ERROR, &error.to_string(), )), } } pub async fn vision_dashboard( State(state): State, headers: HeaderMap, Query(query): Query, ) -> Result, (StatusCode, Json)> { if state.settings.camera_auth_required { let _ = resolve_camera_principal(&state, &headers).await?.ok_or_else(|| { error_response(StatusCode::UNAUTHORIZED, "Norādi x-user-id vai x-api-key.") })?; } let VisionDashboardData { cameras, recent_events, recent_scenes, reid_matches, } = match &state.postgres { Some(pool) => vision_store::load_dashboard(pool, query.search.as_deref()) .await .map_err(|error| error_response(StatusCode::INTERNAL_SERVER_ERROR, &error.to_string()))?, None => VisionDashboardData { cameras: Vec::new(), recent_events: Vec::new(), recent_scenes: Vec::new(), reid_matches: Vec::new(), }, }; Ok(Json(VisionDashboardResponse { summary: format!("Multi-camera dashboard satur {} kameras.", cameras.len()), cameras, recent_events, recent_scenes, reid_matches, })) } pub async fn discover_onvif_devices( State(state): State, headers: HeaderMap, Json(req): Json, ) -> Result, (StatusCode, Json)> { if state.settings.camera_auth_required { let _ = resolve_camera_principal(&state, &headers).await?.ok_or_else(|| { error_response(StatusCode::UNAUTHORIZED, "Norādi x-user-id vai x-api-key.") })?; } let devices = onvif_discovery::discover( &state.http_client, OnvifCredentials { username: req.username, password: req.password, }, req.timeout_ms.unwrap_or(2_000), ) .await .map_err(|error| error_response(StatusCode::BAD_GATEWAY, &error.to_string()))?; Ok(Json(OnvifDiscoverResponse { summary: format!("Atrastas {} ONVIF ierīces.", devices.len()), devices, })) } pub async fn camera_timeline_snapshots( State(state): State, headers: HeaderMap, Path(camera_id): Path, Query(query): Query, ) -> Result, (StatusCode, Json)> { let _ = ensure_camera_permission(&state, &headers, &camera_id, "view").await?; let pool = state .postgres .as_ref() .ok_or_else(|| error_response(StatusCode::SERVICE_UNAVAILABLE, "PostgreSQL nav pieejams."))?; let snapshots = snapshot_store::list_snapshots( pool, &camera_id, query.start_frame, query.end_frame, query.limit.unwrap_or(120), ) .await .map_err(|error| error_response(StatusCode::INTERNAL_SERVER_ERROR, &error.to_string()))?; Ok(Json(TimelineSnapshotsResponse { summary: format!("Atrasti {} snapshot timeline ieraksti.", snapshots.len()), camera_id, snapshots, })) } pub async fn camera_frame_snapshot( State(state): State, headers: HeaderMap, Path((camera_id, frame_index)): Path<(String, i32)>, ) -> Result, (StatusCode, Json)> { let _ = ensure_camera_permission(&state, &headers, &camera_id, "view").await?; let pool = state .postgres .as_ref() .ok_or_else(|| error_response(StatusCode::SERVICE_UNAVAILABLE, "PostgreSQL nav pieejams."))?; let snapshot = snapshot_store::get_snapshot_by_frame(pool, &camera_id, frame_index) .await .map_err(|error| error_response(StatusCode::INTERNAL_SERVER_ERROR, &error.to_string()))?; Ok(Json(TimelineSnapshotsResponse { summary: if snapshot.is_some() { "Atrasts snapshot.".to_string() } else { "Snapshot netika atrasts.".to_string() }, camera_id, snapshots: snapshot.into_iter().collect(), })) } pub async fn list_camera_access( State(state): State, headers: HeaderMap, Path(camera_id): Path, ) -> Result, (StatusCode, Json)> { let _ = ensure_camera_permission(&state, &headers, &camera_id, "configure").await?; let pool = state .postgres .as_ref() .ok_or_else(|| error_response(StatusCode::SERVICE_UNAVAILABLE, "PostgreSQL nav pieejams."))?; let entries = camera_auth::list_access(pool, &camera_id) .await .map_err(|error| error_response(StatusCode::INTERNAL_SERVER_ERROR, &error.to_string()))?; Ok(Json(CameraAccessResponse { summary: format!("Atrastas {} access tiesības.", entries.len()), camera_id, entries, })) } pub async fn rotate_camera_credentials( State(state): State, headers: HeaderMap, Json(req): Json, ) -> Result, (StatusCode, Json)> { let principal = resolve_camera_principal(&state, &headers).await?; let pool = state .postgres .as_ref() .ok_or_else(|| error_response(StatusCode::SERVICE_UNAVAILABLE, "PostgreSQL nav pieejams."))?; let crypto = state .credential_crypto .as_ref() .ok_or_else(|| error_response(StatusCode::BAD_REQUEST, "Camera encryption nav konfigurēta."))?; let metadata = crypto .rotate_key(req.new_key) .map_err(|error| error_response(StatusCode::INTERNAL_SERVER_ERROR, &error.to_string()))?; let rotated = camera_auth::rotate_credentials(pool, crypto, principal.as_ref()) .await .map_err(|error| error_response(StatusCode::INTERNAL_SERVER_ERROR, &error.to_string()))?; Ok(Json(RotateCredentialsResponse { summary: format!("Pāršifrēti {} credential ieraksti.", rotated), rotated_credentials: rotated, active_key_id: metadata.key_id, })) } pub async fn grant_camera_access( State(state): State, headers: HeaderMap, Path(camera_id): Path, Json(req): Json, ) -> Result, (StatusCode, Json)> { let _ = ensure_camera_permission(&state, &headers, &camera_id, "configure").await?; let pool = state .postgres .as_ref() .ok_or_else(|| error_response(StatusCode::SERVICE_UNAVAILABLE, "PostgreSQL nav pieejams."))?; camera_auth::grant_access( pool, &camera_id, &req.principal_type, &req.principal_id, &req.permission, ) .await .map_err(|error| error_response(StatusCode::INTERNAL_SERVER_ERROR, &error.to_string()))?; list_camera_access(State(state), headers, Path(camera_id)).await } pub async fn revoke_camera_access( State(state): State, headers: HeaderMap, Path((camera_id, access_id)): Path<(String, String)>, ) -> Result, (StatusCode, Json)> { let _ = ensure_camera_permission(&state, &headers, &camera_id, "configure").await?; let pool = state .postgres .as_ref() .ok_or_else(|| error_response(StatusCode::SERVICE_UNAVAILABLE, "PostgreSQL nav pieejams."))?; camera_auth::revoke_access(pool, &access_id) .await .map_err(|error| error_response(StatusCode::INTERNAL_SERVER_ERROR, &error.to_string()))?; list_camera_access(State(state), headers, Path(camera_id)).await } pub async fn generate_camera_event_narration( State(state): State, headers: HeaderMap, Path((camera_id, event_id)): Path<(String, String)>, ) -> Result, (StatusCode, Json)> { let _ = ensure_camera_permission(&state, &headers, &camera_id, "view").await?; let narration = event_narrator::narrate_event(&state, &camera_id, &event_id) .await .map_err(|error| error_response(StatusCode::INTERNAL_SERVER_ERROR, &error.to_string()))?; let _ = state.event_bus.publish(EventEnvelope { event_kind: "narration_ready".to_string(), camera_id: camera_id.clone(), event_id: Some(narration.event_id.clone()), event_type: "narration_ready".to_string(), severity: "info".to_string(), summary: "Kamerai ir pieejams narration audio.".to_string(), narration_url: Some(narration.narration_url.clone()), payload: serde_json::json!({ "language": narration.language }), created_at: chrono::Utc::now().to_rfc3339(), }); Ok(Json(narration)) } pub async fn list_camera_narrations( State(state): State, headers: HeaderMap, Path(camera_id): Path, ) -> Result, (StatusCode, Json)> { let _ = ensure_camera_permission(&state, &headers, &camera_id, "view").await?; let pool = state .postgres .as_ref() .ok_or_else(|| error_response(StatusCode::SERVICE_UNAVAILABLE, "PostgreSQL nav pieejams."))?; let narrations = event_narrator::list_event_narrations(pool, &camera_id) .await .map_err(|error| error_response(StatusCode::INTERNAL_SERVER_ERROR, &error.to_string()))?; Ok(Json(CameraNarrationsResponse { summary: format!("Atrastas {} narration ieraksti.", narrations.len()), camera_id, narrations, })) } pub async fn list_gpu_jobs( State(state): State, headers: HeaderMap, ) -> Result, (StatusCode, Json)> { if state.settings.camera_auth_required { let _ = resolve_camera_principal(&state, &headers).await?.ok_or_else(|| { error_response(StatusCode::UNAUTHORIZED, "Norādi x-user-id vai x-api-key.") })?; } let Some(scheduler) = &state.gpu_scheduler else { return Ok(Json(GpuJobsResponse { summary: "GPU scheduler nav ieslēgts.".to_string(), jobs: Vec::new(), })); }; let jobs = scheduler .list_jobs(100) .await .map_err(|error| error_response(StatusCode::INTERNAL_SERVER_ERROR, &error.to_string()))?; Ok(Json(GpuJobsResponse { summary: format!("Atrasti {} GPU jobi.", jobs.len()), jobs, })) } pub async fn list_gpu_resources( State(state): State, headers: HeaderMap, ) -> Result, (StatusCode, Json)> { if state.settings.camera_auth_required { let _ = resolve_camera_principal(&state, &headers).await?.ok_or_else(|| { error_response(StatusCode::UNAUTHORIZED, "Norādi x-user-id vai x-api-key.") })?; } let Some(scheduler) = &state.gpu_scheduler else { return Ok(Json(GpuResourcesResponse { summary: "GPU scheduler nav ieslēgts.".to_string(), resources: Vec::new(), })); }; let resources = scheduler .list_resources() .await .map_err(|error| error_response(StatusCode::INTERNAL_SERVER_ERROR, &error.to_string()))?; Ok(Json(GpuResourcesResponse { summary: format!("Atrasti {} GPU resursi.", resources.len()), resources, })) } pub async fn live_camera_ws( State(state): State, headers: HeaderMap, Path(camera_id): Path, Query(query): Query, ws: WebSocketUpgrade, ) -> Response { let mut token_principal = None; if state.settings.app_auth_required { let Some(token) = query.token.as_deref() else { return StatusCode::UNAUTHORIZED.into_response(); }; match jwt_auth::verify_access_token(&state.settings, token) { Ok(claims) => { token_principal = Some(CameraPrincipal { principal_type: "user".to_string(), principal_id: claims.sub, }); } Err(_) => return StatusCode::UNAUTHORIZED.into_response(), } } if state.settings.camera_auth_required { let header_principal = resolve_camera_principal(&state, &headers).await.ok().flatten(); let principal = header_principal.as_ref().or(token_principal.as_ref()); let allowed = camera_auth::ensure_access( state.postgres.as_ref(), &camera_id, principal, "view", true, ) .await .unwrap_or(false); if !allowed { return StatusCode::FORBIDDEN.into_response(); } } ws.on_upgrade(move |socket| handle_live_socket(socket, camera_id, state)) } async fn handle_live_socket(socket: WebSocket, camera_id: String, state: AppState) { let (mut tx, mut rx) = socket.split(); let bridge = PythonBridge::new(); let state_endpoint = format!("vision/live/{}/state", camera_id); match bridge.get::(&state_endpoint).await { Ok(result) => { send_live_message( &mut tx, &LiveServerMessage::Connected { camera: map_live_camera(result.camera), }, ) .await; } Err(error) => { send_live_message( &mut tx, &LiveServerMessage::Error { message: error.to_string(), }, ) .await; return; } } while let Some(message) = rx.next().await { match message { Ok(Message::Text(text)) => match serde_json::from_str::(&text) { Ok(LiveClientMessage::Ping) => { send_live_message(&mut tx, &LiveServerMessage::Pong).await; } Ok(LiveClientMessage::Frame { image_base64, frame_index, timestamp_ms, }) => { let payload = serde_json::json!({ "camera_id": camera_id.clone(), "image_base64": image_base64, "frame_index": frame_index, "timestamp_ms": timestamp_ms, }); match bridge .call::("vision/live/frame", &payload) .await { Ok(result) => { let summary = result.summary; let camera = map_live_camera(result.camera); let events = result .events .into_iter() .map(map_live_event) .collect::>(); persist_live_state(&state, &camera, &events).await; send_live_message( &mut tx, &LiveServerMessage::Analysis { summary, camera, events, }, ) .await; } Err(error) => { send_live_message( &mut tx, &LiveServerMessage::Error { message: error.to_string(), }, ) .await; } } } Err(error) => { send_live_message( &mut tx, &LiveServerMessage::Error { message: format!("Nederīgs live ziņojums: {}", error), }, ) .await; } }, Ok(Message::Close(_)) => break, Err(error) => { send_live_message( &mut tx, &LiveServerMessage::Error { message: format!("WebSocket kļūda: {}", error), }, ) .await; break; } _ => {} } } }