use std::{
    borrow::Cow,
    collections::VecDeque,
    convert::Infallible,
    env,
    path::PathBuf,
    time::Duration,
};

use axum::{
    response::sse::{Event, KeepAlive, Sse},
    Json,
};
use futures_util::{stream, StreamExt};
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc;
use tracing::{error, warn};

use crate::inference::python_bridge::PythonBridge;
use crate::inference::response_validator::{
    AutonomousSessionPayload, CodeGenerationPayload, ImageGenerationPayload, LiveSnapshotPayload,
    MusicGenerationPayload, OrchestrationRoutePayload, ProjectFilePayload, SessionContextPayload,
    TextGenerationPayload, ToolTracePayload, ValidateCoreResponse, VideoGenerationPayload,
    VisionAnalysisPayload,
};
use crate::utils::errors::MarisError;

/// Size limit that `chunk_response_for_streaming` uses when it cuts an
/// already-generated reply into SSE `delta` events.
///
/// Keep chunks small enough for a natural "typing" feel without splitting
/// almost every word into its own network event.
const CHAT_STREAM_CHUNK_CHARS: usize = 48;
/// Pause, in milliseconds, that `chat_stream` sleeps between the simulated
/// `delta` events of a non-streamed backend result.
///
/// A short pause makes the SSE output feel live while keeping the total
/// response time fast for already-generated backend results.
const CHAT_STREAM_INTERVAL_MS: u64 = 24;

// NOTE(review): several field types below read `Option>`, `Option,` or `Vec,`.
// The generic arguments look like they were stripped from this copy of the file
// (TODO confirm against version control); the tokens are left untouched here.

/// JSON request body accepted by the `chat` and `chat_stream` handlers.
#[derive(Clone, Deserialize)]
pub struct ChatRequest {
    /// User message; rejected by `normalize_message` when blank.
    pub message: String,
    /// Earlier conversation turns; a missing value is treated as an empty history.
    pub history: Option>,
    pub session_id: Option,
    pub persona_id: Option,
    /// When true the route is overridden with the planner/autonomous route.
    pub autonomous_mode: Option,
    pub fallback_model: Option,
    /// Image reference for vision analysis; must not be combined with `image_base64`.
    pub image_url: Option,
    /// Inline image for vision analysis; must not be combined with `image_url`.
    pub image_base64: Option,
    /// Camera whose latest snapshot is fetched when no image is supplied.
    pub camera_id: Option,
    /// Forwarded to `vision/analyze`; `0.25` is used when absent.
    pub confidence_threshold: Option,
}

/// One conversation turn forwarded to the text generation endpoint.
#[derive(Clone, Deserialize, Serialize)]
pub struct HistoryMessage {
    pub role: String,
    pub content: String,
}

/// Task summary embedded in an autonomous handoff.
#[derive(Clone, Serialize)]
pub struct ChatAutonomousTaskPayload {
    pub id: String,
    pub description: String,
    pub status: String,
    pub tool: String,
    pub result: Option,
}

/// File entry of a generated code project, as exposed to chat clients.
#[derive(Clone, Serialize)]
pub struct ChatProjectFilePayload {
    pub path: String,
    pub absolute_path: Option,
}

/// Routing decision echoed back to the client inside `ChatResponse`.
#[derive(Clone, Serialize)]
pub struct ChatRouteInfo {
    pub capability: String,
    pub branch: String,
    pub profile: String,
    /// Copied from the route's `target_studio`.
    pub studio: String,
    pub reasoning: String,
    pub confidence: f32,
}

/// Result of a non-text capability, serialized with a snake_case `type` tag.
#[derive(Clone, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ChatHandoffPayload {
    /// Output of the `code/generate` bridge call.
    Code {
        code: String,
        explanation: String,
        language: String,
        detected_stack: String,
        files: Vec,
        workspace_artifact_dir: Option,
        /// Download link derived from `workspace_artifact_dir`.
        bundle_download_url: Option,
        entrypoint: Option,
        repo_path: Option,
    },
    /// Output of the `images/generate` bridge call.
    Image {
        image_url: String,
        prompt: String,
    },
    /// Output of the `audio/generate_music` bridge call.
    Music {
        audio_url: String,
        title: String,
        duration_seconds: u32,
    },
    /// Output of the `video/generate` bridge call.
    Video {
        video_url: String,
        duration_seconds: u32,
        prompt: String,
    },
    /// Output of the `autonomous/start` bridge call.
    Autonomous {
        session_id: String,
        status: String,
        progress_percent: u32,
        persona_id: String,
        persona_title: String,
        /// At most the first four tasks of the session.
        tasks: Vec,
    },
}

/// Final payload returned by `chat` and sent as the `complete` SSE event.
#[derive(Clone, Serialize)]
pub struct ChatResponse {
    pub response: String,
    pub model: String,
    pub route: ChatRouteInfo,
    pub handoff: Option,
    pub vision_context: Option,
    pub detected_emotion: String,
    pub emotion_confidence: f32,
    pub response_style: String,
    pub persona_id: Option,
    pub persona_title: Option,
    pub persona_summary: Option,
    pub request_id: Option,
    pub session_id: Option,
    pub latency_ms: Option,
    pub tokens_used: Option,
    pub prompt_messages: Option,
    pub memory_matches: Option,
    pub tool_trace: Option,
}

/// Summary of a vision analysis attached to a chat turn.
#[derive(Clone, Serialize)]
pub struct ChatVisionContext {
    pub summary: String,
    /// `"upload"` or `"camera_snapshot"`, as set by `resolve_vision_input`.
    pub source: String,
    pub model: String,
    pub fallback_used: bool,
    pub camera_id: Option,
    pub image_url: Option,
    pub width: u32,
    pub height: u32,
    /// Number of detections reported by the analysis.
    pub detections: u32,
}

/// Payload of a `status` SSE event.
#[derive(Clone, Serialize)]
pub struct ChatStatusPayload {
    pub stage: String,
    pub message: String,
}

/// Internal result of executing a prepared request, before the route
/// information is merged in by `build_chat_response`.
struct ChatExecutionResult {
    response: String,
    model: String,
    handoff: Option,
    vision_context: Option,
    detected_emotion: String,
    emotion_confidence: f32,
    response_style: String,
    persona_id: Option,
    persona_title: Option,
    persona_summary: Option,
    request_id: Option,
    session_id: Option,
    latency_ms: Option,
    tokens_used: Option,
    prompt_messages: Option,
    memory_matches: Option,
    tool_trace: Option,
}

/// Normalized request together with its resolved route and vision context.
struct PreparedChatRequest {
    message: String,
    history: Vec,
    session_id: Option,
    persona_id: Option,
    fallback_model: Option,
    route: OrchestrationRoutePayload,
    vision_context: Option,
}

/// Resolved visual input for `/api/chat` after normalizing direct uploads or
/// dereferencing a live camera into its latest snapshot payload.
///
/// `image_url` is populated for URL-based uploads or resolved camera snapshots,
/// `image_base64` is populated for direct file uploads, and `camera_id` is only
/// carried through when the source originated from a camera-aware request.
struct ResolvedVisionInput { source: String, image_url: Option, image_base64: Option, camera_id: Option, } #[derive(Debug, Deserialize)] struct CoreTextDeltaPayload { delta: Option, } #[derive(Debug, Deserialize)] struct CoreTextErrorPayload { error: Option, } enum CoreTextStreamEvent { Delta(String), Complete(Box), Error(String), } fn fallback_chat_model(capability: &str) -> String { match capability { "code_generation" => "Maris code handoff".to_string(), "image_generation" => "Maris image handoff".to_string(), "music_generation" => "Maris music handoff".to_string(), "video_generation" => "Maris video handoff".to_string(), "vision_analysis" => "Maris vision handoff".to_string(), _ => "Maris core text model".to_string(), } } fn default_emotion_metadata() -> (String, f32, String) { ("neutral".to_string(), 0.0, "clear_grounded".to_string()) } fn default_route() -> OrchestrationRoutePayload { OrchestrationRoutePayload { capability: "text_chat".to_string(), branch: "master".to_string(), profile: "general".to_string(), target_endpoint: "text/generate".to_string(), target_studio: "/chat".to_string(), reasoning: "Izmanto noklusējuma sarunu atzaru, jo routing serviss nav pieejams." 
.to_string(), confidence: 0.5, session_context: SessionContextPayload::default(), } } fn generation_status_message(capability: &str) -> &'static str { match capability { "code_generation" => "Veidoju koda draftu.", "autonomous_tasks" => "Sāku izpildāmu uzdevumu plūsmu.", "image_generation" => "Salieku attēla handoff.", "music_generation" => "Komponēju audio handoff.", "video_generation" => "Sagatavoju video handoff.", _ => "Domāju un veidoju atbildi.", } } fn infer_code_language(message: &str) -> String { let normalized = message.to_lowercase(); if normalized.contains("next.js") || normalized.contains("nextjs") { return "Next.js (TypeScript)".to_string(); } if normalized.contains("react") { return "React (TypeScript)".to_string(); } if normalized.contains("typescript") { return "TypeScript".to_string(); } if normalized.contains("javascript") || normalized.contains("node") { return "JavaScript".to_string(); } if normalized.contains("rust") { return "Rust".to_string(); } if normalized.contains("sql") { return "SQL".to_string(); } if normalized.contains("html") || normalized.contains("css") || normalized.contains("ui") || normalized.contains("frontend") || normalized.contains("landing page") || normalized.contains("web app") || normalized.contains("web-app") || normalized.contains("kalkulator") || normalized.contains("calculator") { return "HTML/CSS/JavaScript".to_string(); } "Python".to_string() } fn build_code_context(message: &str, language: &str) -> String { let normalized = message.to_lowercase(); if language == "Next.js (TypeScript)" { return "Atdod pilnu Next.js/TypeScript feature vai app router artefaktu ar pareizu failu struktūru, nevis tikai vienu komponenti.".to_string(); } if language == "HTML/CSS/JavaScript" { return "Atdod vienu pilnīgu, uzreiz pārlūkā palaižamu artefaktu ar visu markup, stilu un loģiku bez TODO placeholderiem.".to_string(); } if language == "React (TypeScript)" { return "Atdod pilnu React/TypeScript komponenti vai mazu feature moduli ar 
gatavu UI loģiku, nevis tikai skeletu.".to_string(); } if normalized.contains("salabo") || normalized.contains("labo") || normalized.contains("uzlabo") || normalized.contains("improve") || normalized.contains("fix") || normalized.contains("refactor") { return "Ja pieprasījums ir par labojumu vai uzlabojumu, atdod gala kodu ar ielabotu rezultātu, nevis tikai komentāru par pieeju.".to_string(); } String::new() } fn is_repo_edit_request(message: &str) -> bool { let normalized = message.to_lowercase(); [ "edit", "modify", "update", "refactor", "fix", "patch", "rewrite", "change", "labo", "salabo", "rediģē", "uzlabo", "esoš", ] .iter() .any(|keyword| normalized.contains(keyword)) } fn infer_repo_root() -> Option { let mut current = env::current_dir().ok()?; loop { if current.join(".git").exists() || (current.join("backend-rust").is_dir() && current.join("core-python").is_dir() && current.join("frontend").is_dir()) { return Some(current.to_string_lossy().into_owned()); } if !current.pop() { break; } } None } fn infer_repo_path(message: &str) -> Option { for token in message.split_whitespace() { let trimmed = token.trim_matches(|ch: char| { matches!( ch, '"' | '\'' | '`' | ',' | '.' 
| ';' | ':' | '(' | ')' | '[' | ']' | '{' | '}' ) }); if trimmed.is_empty() { continue; } let candidate = PathBuf::from(trimmed); let resolved = if candidate.is_absolute() || trimmed.starts_with("./") || trimmed.starts_with("../") { candidate.canonicalize().ok() } else { None }; let Some(path) = resolved else { continue; }; if path.is_dir() { return Some(path.to_string_lossy().into_owned()); } if let Some(parent) = path.parent() { return Some(parent.to_string_lossy().into_owned()); } } if is_repo_edit_request(message) { return infer_repo_root(); } None } fn bundle_download_url(workspace_artifact_dir: Option<&str>) -> Option { workspace_artifact_dir.map(|artifact_dir| { format!( "/api/code/download?artifact_dir={}", urlencoding::encode(artifact_dir) ) }) } fn normalize_message(message: &str) -> Result { let trimmed = message.trim(); if trimmed.is_empty() { return Err(MarisError::Validation( "Ziņa nedrīkst būt tukša.".to_string(), )); } Ok(trimmed.to_string()) } fn normalize_optional_text(value: Option) -> Option { value.and_then(|item| { let trimmed = item.trim().to_string(); if trimmed.is_empty() { None } else { Some(trimmed) } }) } fn is_autonomous_mode_enabled(value: Option) -> bool { value.unwrap_or(false) } fn force_autonomous_route( route: OrchestrationRoutePayload, session_id: Option<&str>, persona_id: Option<&str>, ) -> OrchestrationRoutePayload { let mut forced = route; forced.capability = "autonomous_tasks".to_string(); forced.branch = "planner".to_string(); forced.profile = "planner".to_string(); forced.target_endpoint = "autonomous/start".to_string(); forced.target_studio = "/autonomous".to_string(); forced.reasoning = "Autonomous chat mode ir ieslēgts, tāpēc pieprasījums tiek izpildīts caur planner aģenta plūsmu.".to_string(); forced.confidence = 1.0; if let Some(session_id) = session_id { forced.session_context.session_id = session_id.to_string(); } if let Some(persona_id) = persona_id { forced.session_context.persona_id = persona_id.to_string(); } 
forced } async fn resolve_vision_input( bridge: &PythonBridge, req: &ChatRequest, ) -> Result, MarisError> { let image_url = normalize_optional_text(req.image_url.clone()); let image_base64 = normalize_optional_text(req.image_base64.clone()); let camera_id = normalize_optional_text(req.camera_id.clone()); // Keep this early guard in Rust so `/api/chat` returns a clear validation // error before making any downstream bridge calls into the vision service. if image_url.is_some() && image_base64.is_some() { return Err(MarisError::Validation( "Norādi tikai vienu no image_url vai image_base64.".to_string(), )); } if image_url.is_some() || image_base64.is_some() { return Ok(Some(ResolvedVisionInput { source: "upload".to_string(), image_url, image_base64, camera_id, })); } let Some(camera_id) = camera_id else { return Ok(None); }; let endpoint = format!("vision/live/{camera_id}/snapshot"); let snapshot = bridge .get::(&endpoint) .await .map_err(|err| { MarisError::Bridge(format!( "Neizdevās ielādēt snapshot kamerai {camera_id}: {err}" )) })?; let snapshot_data_url = snapshot.snapshot_data_url.ok_or_else(|| { MarisError::Validation(format!( "Kamerai {camera_id} pašlaik nav pieejams snapshot čatam." )) })?; Ok(Some(ResolvedVisionInput { source: "camera_snapshot".to_string(), image_url: Some(snapshot_data_url), image_base64: None, camera_id: Some(camera_id), })) } async fn analyze_vision_context( bridge: &PythonBridge, req: &ChatRequest, ) -> Result, MarisError> { let Some(input) = resolve_vision_input(bridge, req).await? 
else { return Ok(None); }; let payload = serde_json::json!({ "image_url": input.image_url, "image_base64": input.image_base64, "camera_id": input.camera_id, "session_id": req.session_id, "confidence_threshold": req.confidence_threshold.unwrap_or(0.25), }); let result = bridge .call::("vision/analyze", &payload) .await .map_err(|err| { error!("Vision analysis failed: {err}"); MarisError::Bridge("Vision analīzes serviss šobrīd nav pieejams.".to_string()) })?; Ok(Some(ChatVisionContext { summary: result.summary, source: input.source, model: result.model, fallback_used: result.fallback_used, camera_id: input.camera_id, image_url: input.image_url, width: result.width, height: result.height, detections: result.detections.len() as u32, })) } async fn resolve_chat_route( bridge: &PythonBridge, message: &str, session_id: Option<&str>, persona_id: Option<&str>, has_vision_input: bool, ) -> OrchestrationRoutePayload { bridge .call::( "orchestrator/route", &serde_json::json!({ "message": message, "session_id": session_id, "persona_id": persona_id, "has_vision_input": has_vision_input, }), ) .await .map_err(|err| { warn!("Routing bridge call failed, falling back to master route: {err}"); err }) .unwrap_or_else(|_| default_route()) } async fn handle_code_handoff( bridge: &PythonBridge, message: &str, fallback_model: Option<&str>, ) -> Result<(String, ChatHandoffPayload), MarisError> { let language = infer_code_language(message); let context = build_code_context(message, &language); let repo_path = infer_repo_path(message); let result = bridge .call::( "code/generate", &serde_json::json!({ "prompt": message, "language": language, "context": context, "repo_path": repo_path, "fallback_model": fallback_model, }), ) .await .map_err(|err| { error!("Code handoff failed: {err}"); MarisError::Bridge("Koda ģenerēšanas serviss šobrīd nav pieejams.".to_string()) })?; let explanation = if result.explanation.is_empty() { format!("Sagatavoju {} draftu pēc tava pieprasījuma.", language) } else 
{ result.explanation }; let files = result .files .into_iter() .map( |ProjectFilePayload { path, absolute_path, .. }| ChatProjectFilePayload { path, absolute_path, }, ) .collect::>(); let workspace_artifact_dir = result.workspace_artifact_dir.clone(); Ok(( explanation.clone(), ChatHandoffPayload::Code { code: result.code, explanation, language: result.language, detected_stack: result.detected_stack, files, workspace_artifact_dir, bundle_download_url: bundle_download_url(result.workspace_artifact_dir.as_deref()), entrypoint: result.entrypoint, repo_path: result.repo_path, }, )) } async fn handle_autonomous_handoff( bridge: &PythonBridge, prepared: &PreparedChatRequest, ) -> Result<(String, ChatHandoffPayload), MarisError> { let session_id = prepared .session_id .clone() .unwrap_or_else(|| prepared.route.session_context.session_id.clone()); let persona_id = prepared .persona_id .clone() .unwrap_or_else(|| prepared.route.session_context.persona_id.clone()); let result = bridge .call::( "autonomous/start", &serde_json::json!({ "session_id": session_id, "goal": prepared.message, "max_steps": 8, "persona_id": persona_id, }), ) .await .map_err(|err| { error!("Autonomous handoff failed: {err}"); MarisError::Bridge("Autonomās izpildes serviss šobrīd nav pieejams.".to_string()) })?; let tasks = result .tasks .iter() .take(4) .map(|task| ChatAutonomousTaskPayload { id: task.id.clone(), description: task.description.clone(), status: task.status.clone(), tool: task.tool.clone(), result: task.result.clone(), }) .collect::>(); let summary = format!( "Palaidu autonomo izpildi: {} uzdevumi, progress {}%, statuss {}.", result.tasks.len(), result.progress_percent, result.status ); Ok(( summary, ChatHandoffPayload::Autonomous { session_id, status: result.status, progress_percent: result.progress_percent, persona_id: result.persona_id, persona_title: result.persona_title, tasks, }, )) } async fn handle_image_handoff( bridge: &PythonBridge, message: &str, ) -> Result<(String, 
ChatHandoffPayload), MarisError> { let result = bridge .call::( "images/generate", &serde_json::json!({ "prompt": message, "width": 1024, "height": 1024, "steps": 30, }), ) .await .map_err(|err| { error!("Image handoff failed: {err}"); MarisError::Bridge("Attēlu ģenerēšanas serviss šobrīd nav pieejams.".to_string()) })?; Ok(( "Sagatavoju attēla konceptu pēc tava pieprasījuma.".to_string(), ChatHandoffPayload::Image { image_url: result.image_url, prompt: message.to_string(), }, )) } async fn handle_music_handoff( bridge: &PythonBridge, message: &str, ) -> Result<(String, ChatHandoffPayload), MarisError> { let result = bridge .call::( "audio/generate_music", &serde_json::json!({ "prompt": message, "genre": "pop", "duration_seconds": 30, }), ) .await .map_err(|err| { error!("Music handoff failed: {err}"); MarisError::Bridge("Mūzikas ģenerēšanas serviss šobrīd nav pieejams.".to_string()) })?; Ok(( format!("Sagatavoju mūzikas draftu: {}.", result.title), ChatHandoffPayload::Music { audio_url: result.audio_url, title: result.title, duration_seconds: result.duration_seconds, }, )) } async fn handle_video_handoff( bridge: &PythonBridge, message: &str, ) -> Result<(String, ChatHandoffPayload), MarisError> { let result = bridge .call::( "video/generate", &serde_json::json!({ "prompt": message, "duration_seconds": 5, "fps": 24, "resolution": "720p", }), ) .await .map_err(|err| { error!("Video handoff failed: {err}"); MarisError::Bridge("Video ģenerēšanas serviss šobrīd nav pieejams.".to_string()) })?; Ok(( "Sagatavoju video draftu pēc tava pieprasījuma.".to_string(), ChatHandoffPayload::Video { video_url: result.video_url, duration_seconds: result.duration_seconds, prompt: message.to_string(), }, )) } async fn prepare_chat_request(req: ChatRequest) -> Result { let bridge = PythonBridge::new(); let message = normalize_message(&req.message)?; let session_id = normalize_optional_text(req.session_id.clone()); let persona_id = normalize_optional_text(req.persona_id.clone()); let 
vision_context = analyze_vision_context(&bridge, &req).await?; let route = resolve_chat_route( &bridge, &message, session_id.as_deref(), persona_id.as_deref(), vision_context.is_some(), ) .await; let route = if is_autonomous_mode_enabled(req.autonomous_mode) { force_autonomous_route(route, session_id.as_deref(), persona_id.as_deref()) } else { route }; Ok(PreparedChatRequest { message, history: req.history.unwrap_or_default(), session_id, persona_id, fallback_model: req.fallback_model, route, vision_context, }) } async fn execute_prepared_chat( prepared: &PreparedChatRequest, ) -> Result { let bridge = PythonBridge::new(); match prepared.route.capability.as_str() { "code_generation" => { let (summary, handoff) = handle_code_handoff( &bridge, &prepared.message, prepared.fallback_model.as_deref(), ) .await?; let (detected_emotion, emotion_confidence, response_style) = default_emotion_metadata(); Ok(ChatExecutionResult { response: summary, model: fallback_chat_model(&prepared.route.capability), handoff: Some(handoff), vision_context: None, detected_emotion, emotion_confidence, response_style, persona_id: None, persona_title: None, persona_summary: None, request_id: None, session_id: None, latency_ms: None, tokens_used: None, prompt_messages: None, memory_matches: None, tool_trace: None, }) } "autonomous_tasks" => { let (summary, handoff) = handle_autonomous_handoff(&bridge, prepared).await?; let (detected_emotion, emotion_confidence, response_style) = default_emotion_metadata(); Ok(ChatExecutionResult { response: summary, model: fallback_chat_model(&prepared.route.capability), handoff: Some(handoff), vision_context: None, detected_emotion, emotion_confidence, response_style, persona_id: prepared.persona_id.clone(), persona_title: None, persona_summary: None, request_id: None, session_id: prepared.session_id.clone(), latency_ms: None, tokens_used: None, prompt_messages: None, memory_matches: None, tool_trace: None, }) } "image_generation" => { let (summary, handoff) = 
handle_image_handoff(&bridge, &prepared.message).await?; let (detected_emotion, emotion_confidence, response_style) = default_emotion_metadata(); Ok(ChatExecutionResult { response: summary, model: fallback_chat_model(&prepared.route.capability), handoff: Some(handoff), vision_context: None, detected_emotion, emotion_confidence, response_style, persona_id: None, persona_title: None, persona_summary: None, request_id: None, session_id: None, latency_ms: None, tokens_used: None, prompt_messages: None, memory_matches: None, tool_trace: None, }) } "music_generation" => { let (summary, handoff) = handle_music_handoff(&bridge, &prepared.message).await?; let (detected_emotion, emotion_confidence, response_style) = default_emotion_metadata(); Ok(ChatExecutionResult { response: summary, model: fallback_chat_model(&prepared.route.capability), handoff: Some(handoff), vision_context: None, detected_emotion, emotion_confidence, response_style, persona_id: None, persona_title: None, persona_summary: None, request_id: None, session_id: None, latency_ms: None, tokens_used: None, prompt_messages: None, memory_matches: None, tool_trace: None, }) } "video_generation" => { let (summary, handoff) = handle_video_handoff(&bridge, &prepared.message).await?; let (detected_emotion, emotion_confidence, response_style) = default_emotion_metadata(); Ok(ChatExecutionResult { response: summary, model: fallback_chat_model(&prepared.route.capability), handoff: Some(handoff), vision_context: None, detected_emotion, emotion_confidence, response_style, persona_id: None, persona_title: None, persona_summary: None, request_id: None, session_id: None, latency_ms: None, tokens_used: None, prompt_messages: None, memory_matches: None, tool_trace: None, }) } _ => { let payload = build_text_generation_payload(prepared); let result = bridge .call::("text/generate", &payload) .await .map_err(|err| { error!("Chat bridge call failed: {err}"); MarisError::Bridge( "Čata serviss šobrīd nav pieejams. 
Lūdzu mēģiniet vēlreiz.".to_string(), ) })?; Ok(ChatExecutionResult { response: result.response, model: if result.model.is_empty() { fallback_chat_model(&prepared.route.capability) } else { result.model }, handoff: None, vision_context: prepared.vision_context.clone(), detected_emotion: result.detected_emotion, emotion_confidence: result.emotion_confidence, response_style: result.response_style, persona_id: (!result.persona_id.is_empty()).then_some(result.persona_id), persona_title: (!result.persona_title.is_empty()).then_some(result.persona_title), persona_summary: (!result.persona_summary.is_empty()) .then_some(result.persona_summary), request_id: (!result.request_id.is_empty()).then_some(result.request_id), session_id: (!result.session_id.is_empty()).then_some(result.session_id), latency_ms: (result.latency_ms > 0).then_some(result.latency_ms), tokens_used: (result.tokens_used > 0).then_some(result.tokens_used), prompt_messages: (result.prompt_messages > 0).then_some(result.prompt_messages), memory_matches: (result.memory_matches > 0).then_some(result.memory_matches), tool_trace: result.tool_trace, }) } } } fn build_text_generation_payload(prepared: &PreparedChatRequest) -> serde_json::Value { serde_json::json!({ "message": &prepared.message, "history": &prepared.history, "profile": &prepared.route.profile, "persona_id": &prepared.persona_id, "session_id": &prepared.session_id, "fallback_model": &prepared.fallback_model, "vision_context": prepared.vision_context.as_ref().map(|context| { serde_json::json!({ "summary": &context.summary, "source": &context.source, "model": &context.model, "fallback_used": context.fallback_used, "camera_id": &context.camera_id, "image_url": &context.image_url, "width": context.width, "height": context.height, "detections": context.detections, }) }), }) } fn parse_core_text_stream_event( raw_event: &str, ) -> Result, MarisError> { let normalized = if raw_event.contains("\r\n") { Cow::Owned(raw_event.replace("\r\n", "\n")) } else { 
Cow::Borrowed(raw_event) }; let trimmed = normalized.trim(); if trimmed.is_empty() || trimmed.starts_with(':') { return Ok(None); } let mut event_name = "message"; let mut data_lines = Vec::new(); for line in normalized.lines() { if let Some(value) = line.strip_prefix("event:") { event_name = value.trim(); continue; } if let Some(value) = line.strip_prefix("data:") { data_lines.push(value.trim_start()); } } if data_lines.is_empty() { return Ok(None); } let payload = data_lines.join("\n"); match event_name { "delta" => { let parsed: CoreTextDeltaPayload = serde_json::from_str(&payload).map_err(|err| { MarisError::Bridge(format!("Neizdevās parsēt core text delta eventu: {err}")) })?; Ok(parsed.delta.map(CoreTextStreamEvent::Delta)) } "complete" => { let parsed: TextGenerationPayload = serde_json::from_str(&payload).map_err(|err| { MarisError::Bridge(format!("Neizdevās parsēt core text complete eventu: {err}")) })?; Ok(Some(CoreTextStreamEvent::Complete(Box::new( parsed.validate().map_err(|err| { MarisError::Bridge(format!("Core text stream atgrieza nederīgu payload: {err}")) })?, )))) } "error" => { let parsed: CoreTextErrorPayload = serde_json::from_str(&payload).map_err(|err| { MarisError::Bridge(format!("Neizdevās parsēt core text error eventu: {err}")) })?; Ok(Some(CoreTextStreamEvent::Error( parsed .error .unwrap_or_else(|| "Core text stream kļūda.".to_string()), ))) } _ => Ok(None), } } fn take_next_core_text_stream_frame(buffer: &mut String) -> Option { if buffer.contains("\r\n") { *buffer = buffer.replace("\r\n", "\n"); } if buffer.contains('\r') { *buffer = buffer.replace('\r', "\n"); } let index = buffer.find("\n\n")?; let raw_event = buffer[..index].to_string(); *buffer = buffer[index + 2..].to_string(); Some(raw_event) } fn build_stream_text_chat_result( prepared: &PreparedChatRequest, result: TextGenerationPayload, ) -> ChatExecutionResult { ChatExecutionResult { response: result.response, model: if result.model.is_empty() { 
fallback_chat_model(&prepared.route.capability) } else { result.model }, handoff: None, vision_context: prepared.vision_context.clone(), detected_emotion: result.detected_emotion, emotion_confidence: result.emotion_confidence, response_style: result.response_style, persona_id: (!result.persona_id.is_empty()).then_some(result.persona_id), persona_title: (!result.persona_title.is_empty()).then_some(result.persona_title), persona_summary: (!result.persona_summary.is_empty()).then_some(result.persona_summary), request_id: (!result.request_id.is_empty()).then_some(result.request_id), session_id: (!result.session_id.is_empty()).then_some(result.session_id), latency_ms: (result.latency_ms > 0).then_some(result.latency_ms), tokens_used: (result.tokens_used > 0).then_some(result.tokens_used), prompt_messages: (result.prompt_messages > 0).then_some(result.prompt_messages), memory_matches: (result.memory_matches > 0).then_some(result.memory_matches), tool_trace: result.tool_trace, } } async fn stream_text_chat_response( bridge: &PythonBridge, prepared: &PreparedChatRequest, tx: &mpsc::Sender>, ) -> Result, MarisError> { let payload = build_text_generation_payload(prepared); let response = bridge .stream("text/generate/stream", &payload) .await .map_err(|err| { error!("Chat bridge stream call failed: {err}"); MarisError::Bridge( "Čata serviss šobrīd nav pieejams. Lūdzu mēģiniet vēlreiz.".to_string(), ) })?; let mut stream = response.bytes_stream(); let mut buffer = String::new(); while let Some(next_chunk) = tokio::select! { _ = tx.closed() => return Ok(None), next = stream.next() => next, } { let chunk = next_chunk.map_err(|err| { MarisError::Bridge(format!("Neizdevās nolasīt core text straumi: {err}")) })?; buffer.push_str(&String::from_utf8_lossy(&chunk)); while let Some(raw_event) = take_next_core_text_stream_frame(&mut buffer) { match parse_core_text_stream_event(&raw_event)? 
{ Some(CoreTextStreamEvent::Delta(delta)) => { if !try_send_stream_event( tx, build_sse_event("delta", &serde_json::json!({ "delta": delta })), ) .await { return Ok(None); } } Some(CoreTextStreamEvent::Complete(payload)) => { return Ok(Some(build_stream_text_chat_result(prepared, *payload))); } Some(CoreTextStreamEvent::Error(message)) => { return Err(MarisError::Bridge(message)); } None => {} } } } if !buffer.trim().is_empty() { match parse_core_text_stream_event(&buffer)? { Some(CoreTextStreamEvent::Delta(delta)) => { if !try_send_stream_event( tx, build_sse_event("delta", &serde_json::json!({ "delta": delta })), ) .await { return Ok(None); } } Some(CoreTextStreamEvent::Complete(payload)) => { return Ok(Some(build_stream_text_chat_result(prepared, *payload))); } Some(CoreTextStreamEvent::Error(message)) => { return Err(MarisError::Bridge(message)); } None => {} } } Err(MarisError::Bridge( "Core text straume noslēdzās bez final payload.".to_string(), )) } fn build_chat_response( route: OrchestrationRoutePayload, chat_result: ChatExecutionResult, ) -> ChatResponse { ChatResponse { response: chat_result.response, model: chat_result.model, route: ChatRouteInfo { capability: route.capability, branch: route.branch, profile: route.profile, studio: route.target_studio, reasoning: route.reasoning, confidence: route.confidence, }, handoff: chat_result.handoff, vision_context: chat_result.vision_context, detected_emotion: chat_result.detected_emotion, emotion_confidence: chat_result.emotion_confidence, response_style: chat_result.response_style, persona_id: chat_result.persona_id, persona_title: chat_result.persona_title, persona_summary: chat_result.persona_summary, request_id: chat_result.request_id, session_id: chat_result.session_id, latency_ms: chat_result.latency_ms, tokens_used: chat_result.tokens_used, prompt_messages: chat_result.prompt_messages, memory_matches: chat_result.memory_matches, tool_trace: chat_result.tool_trace, } } fn build_sse_event(event_name: &str, 
payload: &T) -> Event { Event::default() .event(event_name) .data(serde_json::to_string(payload).unwrap_or_else(|_| "{}".to_string())) } fn build_status_event(stage: &str, message: &str) -> Event { build_sse_event( "status", &ChatStatusPayload { stage: stage.to_string(), message: message.to_string(), }, ) } async fn try_send_stream_event(tx: &mpsc::Sender>, event: Event) -> bool { tx.send(Ok(event)).await.is_ok() } fn chunk_response_for_streaming(text: &str) -> Vec { let mut chunks = Vec::new(); let mut current = String::new(); let mut last_boundary = None; for ch in text.chars() { current.push(ch); if ch.is_whitespace() { last_boundary = Some(current.len()); } if current.len() >= CHAT_STREAM_CHUNK_CHARS { if let Some(boundary) = last_boundary .take() .filter(|boundary| *boundary < current.len()) { let remainder = current.split_off(boundary); chunks.push(std::mem::take(&mut current)); current = remainder; } else { chunks.push(std::mem::take(&mut current)); } } } if !current.is_empty() { chunks.push(current); } chunks } fn build_stream_delta_events(response: &str) -> VecDeque { chunk_response_for_streaming(response) .into_iter() .map(|delta| build_sse_event("delta", &serde_json::json!({ "delta": delta }))) .collect() } pub async fn chat(Json(req): Json) -> Result, MarisError> { let prepared = prepare_chat_request(req).await?; let chat_result = execute_prepared_chat(&prepared).await?; Ok(Json(build_chat_response(prepared.route, chat_result))) } pub async fn chat_stream( Json(req): Json, ) -> Result>>, MarisError> { let message = normalize_message(&req.message)?; let (tx, rx) = mpsc::channel::>(32); let history = req.history.clone().unwrap_or_default(); tokio::spawn(async move { let bridge = PythonBridge::new(); let session_id = normalize_optional_text(req.session_id.clone()); let persona_id = normalize_optional_text(req.persona_id.clone()); let autonomous_mode = is_autonomous_mode_enabled(req.autonomous_mode); let fallback_model = 
normalize_optional_text(req.fallback_model.clone()); if !try_send_stream_event( &tx, build_status_event("presence", "Savienojums izveidots — Maris ir klāt."), ) .await { return; } let has_vision_input = req.image_url.is_some() || req.image_base64.is_some() || req .camera_id .as_ref() .is_some_and(|camera_id| !camera_id.trim().is_empty()); if has_vision_input && !try_send_stream_event( &tx, build_status_event("vision", "Analizēju vizuālo kontekstu."), ) .await { return; } let vision_context = match analyze_vision_context(&bridge, &req).await { Ok(vision_context) => vision_context, Err(err) => { let _ = try_send_stream_event( &tx, build_sse_event("error", &serde_json::json!({ "error": err.to_string() })), ) .await; return; } }; if let Some(vision_context) = vision_context.as_ref() { if !try_send_stream_event(&tx, build_sse_event("vision_context", vision_context)).await { return; } } if !try_send_stream_event( &tx, build_status_event("routing", "Izvēlos labāko atbildes režīmu."), ) .await { return; } let route = resolve_chat_route( &bridge, &message, session_id.as_deref(), persona_id.as_deref(), vision_context.is_some(), ) .await; let route = if autonomous_mode { force_autonomous_route(route, session_id.as_deref(), persona_id.as_deref()) } else { route }; if !try_send_stream_event(&tx, build_sse_event("route", &route)).await { return; } if !try_send_stream_event( &tx, build_status_event("generation", generation_status_message(&route.capability)), ) .await { return; } let prepared = PreparedChatRequest { message, history, session_id, persona_id, fallback_model, route, vision_context, }; if prepared.route.capability == "text_chat" { match stream_text_chat_response(&bridge, &prepared, &tx).await { Ok(Some(chat_result)) => { let response = build_chat_response(prepared.route, chat_result); let _ = try_send_stream_event(&tx, build_sse_event("complete", &response)).await; } Ok(None) => {} Err(err) => { let _ = try_send_stream_event( &tx, build_sse_event("error", 
#[cfg(test)]
mod tests {
    use super::{
        chunk_response_for_streaming, generation_status_message, normalize_message,
        parse_core_text_stream_event, take_next_core_text_stream_frame, CoreTextStreamEvent,
    };
    use crate::utils::errors::MarisError;

    // A whitespace-only message is rejected with a validation error.
    #[test]
    fn rejects_empty_messages() {
        let rejection = normalize_message(" ").unwrap_err();

        assert!(matches!(rejection, MarisError::Validation(_)));
        assert_eq!(
            rejection.to_string(),
            "Derīguma kļūda: Ziņa nedrīkst būt tukša."
        );
    }

    // Surrounding whitespace is removed from an otherwise valid message.
    #[test]
    fn trims_valid_messages() {
        let normalized = normalize_message(" Sveiki, Maris! ").unwrap();

        assert_eq!(normalized, "Sveiki, Maris!");
    }

    // Chunking must split longer text and never drop or reorder characters.
    #[test]
    fn chunks_streaming_response_without_losing_text() {
        let text =
            "Šī ir garāka atbilde ar vairākiem vārdiem, lai pārbaudītu straumēšanas chunkus.";

        let pieces = chunk_response_for_streaming(text);

        assert!(pieces.len() >= 2);
        assert_eq!(pieces.join(""), text);
    }

    // Each capability maps to its own progress message.
    #[test]
    fn maps_generation_status_by_capability() {
        let expectations = [
            ("code_generation", "Veidoju koda draftu."),
            ("text_chat", "Domāju un veidoju atbildi."),
        ];

        for (capability, expected_message) in expectations {
            assert_eq!(generation_status_message(capability), expected_message);
        }
    }

    // A single `delta` frame parses into the delta text.
    #[test]
    fn parses_core_text_delta_event() {
        let frame = "event: delta\ndata: {\"delta\":\"Sveiki\"}\n";

        let parsed = parse_core_text_stream_event(frame).unwrap().unwrap();

        let CoreTextStreamEvent::Delta(delta) = parsed else {
            panic!("expected delta event");
        };
        assert_eq!(delta, "Sveiki");
    }

    // A `complete` frame parses into the final payload.
    #[test]
    fn parses_core_text_complete_event() {
        let frame = "event: complete\ndata: {\"response\":\"Sveiki\",\"model\":\"m\",\"detected_emotion\":\"neutral\",\"emotion_confidence\":0.1,\"response_style\":\"clear_grounded\"}\n";

        let parsed = parse_core_text_stream_event(frame).unwrap().unwrap();

        let CoreTextStreamEvent::Complete(payload) = parsed else {
            panic!("expected complete event");
        };
        assert_eq!(payload.response, "Sveiki");
        assert_eq!(payload.model, "m");
    }

    // Frames separated by CRLF blank lines are extracted one at a time, and
    // the buffer is fully consumed afterwards.
    #[test]
    fn extracts_multiple_core_text_events_with_crlf_delimiters() {
        let mut pending = String::from(concat!(
            "event: delta\r\n",
            "data: {\"delta\":\"Sveiki\"}\r\n\r\n",
            "event: complete\r\n",
            "data: {\"response\":\"Sveiki\",\"model\":\"m\",\"detected_emotion\":\"neutral\",\"emotion_confidence\":0.1,\"response_style\":\"clear_grounded\"}\r\n\r\n"
        ));

        let delta_frame = take_next_core_text_stream_frame(&mut pending).expect("first event");
        let complete_frame = take_next_core_text_stream_frame(&mut pending).expect("second event");
        assert!(pending.is_empty());

        let delta_event = parse_core_text_stream_event(&delta_frame).unwrap();
        assert!(matches!(
            delta_event,
            Some(CoreTextStreamEvent::Delta(delta)) if delta == "Sveiki"
        ));

        let complete_event = parse_core_text_stream_event(&complete_frame).unwrap();
        assert!(matches!(
            complete_event,
            Some(CoreTextStreamEvent::Complete(payload)) if payload.response == "Sveiki"
        ));
    }
}