| use std::{ |
| borrow::Cow, collections::VecDeque, convert::Infallible, env, path::PathBuf, time::Duration, |
| }; |
|
|
| use axum::{ |
| response::sse::{Event, KeepAlive, Sse}, |
| Json, |
| }; |
| use futures_util::{stream, StreamExt}; |
| use serde::{Deserialize, Serialize}; |
| use tokio::sync::mpsc; |
| use tracing::{error, warn}; |
|
|
| use crate::inference::python_bridge::PythonBridge; |
| use crate::inference::response_validator::{ |
| AutonomousSessionPayload, CodeGenerationPayload, ImageGenerationPayload, LiveSnapshotPayload, |
| MusicGenerationPayload, OrchestrationRoutePayload, ProjectFilePayload, SessionContextPayload, |
| TextGenerationPayload, ToolTracePayload, ValidateCoreResponse, VideoGenerationPayload, |
| VisionAnalysisPayload, |
| }; |
| use crate::utils::errors::MarisError; |
|
|
| |
| |
| const CHAT_STREAM_CHUNK_CHARS: usize = 48; |
| |
| |
| const CHAT_STREAM_INTERVAL_MS: u64 = 24; |
|
|
| #[derive(Clone, Deserialize)] |
| pub struct ChatRequest { |
| pub message: String, |
| pub history: Option<Vec<HistoryMessage>>, |
| pub session_id: Option<String>, |
| pub persona_id: Option<String>, |
| pub autonomous_mode: Option<bool>, |
| pub fallback_model: Option<String>, |
| pub image_url: Option<String>, |
| pub image_base64: Option<String>, |
| pub camera_id: Option<String>, |
| pub confidence_threshold: Option<f32>, |
| } |
|
|
| #[derive(Clone, Deserialize, Serialize)] |
| pub struct HistoryMessage { |
| pub role: String, |
| pub content: String, |
| } |
|
|
| #[derive(Clone, Serialize)] |
| pub struct ChatAutonomousTaskPayload { |
| pub id: String, |
| pub description: String, |
| pub status: String, |
| pub tool: String, |
| pub result: Option<String>, |
| } |
|
|
| #[derive(Clone, Serialize)] |
| pub struct ChatProjectFilePayload { |
| pub path: String, |
| pub absolute_path: Option<String>, |
| } |
|
|
| #[derive(Clone, Serialize)] |
| pub struct ChatRouteInfo { |
| pub capability: String, |
| pub branch: String, |
| pub profile: String, |
| pub studio: String, |
| pub reasoning: String, |
| pub confidence: f32, |
| } |
|
|
| #[derive(Clone, Serialize)] |
| #[serde(tag = "type", rename_all = "snake_case")] |
| pub enum ChatHandoffPayload { |
| Code { |
| code: String, |
| explanation: String, |
| language: String, |
| detected_stack: String, |
| files: Vec<ChatProjectFilePayload>, |
| workspace_artifact_dir: Option<String>, |
| bundle_download_url: Option<String>, |
| entrypoint: Option<String>, |
| repo_path: Option<String>, |
| }, |
| Image { |
| image_url: String, |
| prompt: String, |
| }, |
| Music { |
| audio_url: String, |
| title: String, |
| duration_seconds: u32, |
| }, |
| Video { |
| video_url: String, |
| duration_seconds: u32, |
| prompt: String, |
| }, |
| Autonomous { |
| session_id: String, |
| status: String, |
| progress_percent: u32, |
| persona_id: String, |
| persona_title: String, |
| tasks: Vec<ChatAutonomousTaskPayload>, |
| }, |
| } |
|
|
| #[derive(Clone, Serialize)] |
| pub struct ChatResponse { |
| pub response: String, |
| pub model: String, |
| pub route: ChatRouteInfo, |
| pub handoff: Option<ChatHandoffPayload>, |
| pub vision_context: Option<ChatVisionContext>, |
| pub detected_emotion: String, |
| pub emotion_confidence: f32, |
| pub response_style: String, |
| pub persona_id: Option<String>, |
| pub persona_title: Option<String>, |
| pub persona_summary: Option<String>, |
| pub request_id: Option<String>, |
| pub session_id: Option<String>, |
| pub latency_ms: Option<u32>, |
| pub tokens_used: Option<u32>, |
| pub prompt_messages: Option<u32>, |
| pub memory_matches: Option<u32>, |
| pub tool_trace: Option<ToolTracePayload>, |
| } |
|
|
| #[derive(Clone, Serialize)] |
| pub struct ChatVisionContext { |
| pub summary: String, |
| pub source: String, |
| pub model: String, |
| pub fallback_used: bool, |
| pub camera_id: Option<String>, |
| pub image_url: Option<String>, |
| pub width: u32, |
| pub height: u32, |
| pub detections: u32, |
| } |
|
|
| #[derive(Clone, Serialize)] |
| pub struct ChatStatusPayload { |
| pub stage: String, |
| pub message: String, |
| } |
|
|
| struct ChatExecutionResult { |
| response: String, |
| model: String, |
| handoff: Option<ChatHandoffPayload>, |
| vision_context: Option<ChatVisionContext>, |
| detected_emotion: String, |
| emotion_confidence: f32, |
| response_style: String, |
| persona_id: Option<String>, |
| persona_title: Option<String>, |
| persona_summary: Option<String>, |
| request_id: Option<String>, |
| session_id: Option<String>, |
| latency_ms: Option<u32>, |
| tokens_used: Option<u32>, |
| prompt_messages: Option<u32>, |
| memory_matches: Option<u32>, |
| tool_trace: Option<ToolTracePayload>, |
| } |
|
|
| struct PreparedChatRequest { |
| message: String, |
| history: Vec<HistoryMessage>, |
| session_id: Option<String>, |
| persona_id: Option<String>, |
| fallback_model: Option<String>, |
| route: OrchestrationRoutePayload, |
| vision_context: Option<ChatVisionContext>, |
| } |
|
|
| |
| |
| |
| |
| |
| |
| struct ResolvedVisionInput { |
| source: String, |
| image_url: Option<String>, |
| image_base64: Option<String>, |
| camera_id: Option<String>, |
| } |
|
|
| #[derive(Debug, Deserialize)] |
| struct CoreTextDeltaPayload { |
| delta: Option<String>, |
| } |
|
|
| #[derive(Debug, Deserialize)] |
| struct CoreTextErrorPayload { |
| error: Option<String>, |
| } |
|
|
| enum CoreTextStreamEvent { |
| Delta(String), |
| Complete(Box<TextGenerationPayload>), |
| Error(String), |
| } |
|
|
| fn fallback_chat_model(capability: &str) -> String { |
| match capability { |
| "code_generation" => "Maris code handoff".to_string(), |
| "image_generation" => "Maris image handoff".to_string(), |
| "music_generation" => "Maris music handoff".to_string(), |
| "video_generation" => "Maris video handoff".to_string(), |
| "vision_analysis" => "Maris vision handoff".to_string(), |
| _ => "Maris core text model".to_string(), |
| } |
| } |
|
|
| fn default_emotion_metadata() -> (String, f32, String) { |
| ("neutral".to_string(), 0.0, "clear_grounded".to_string()) |
| } |
|
|
| fn default_route() -> OrchestrationRoutePayload { |
| OrchestrationRoutePayload { |
| capability: "text_chat".to_string(), |
| branch: "master".to_string(), |
| profile: "general".to_string(), |
| target_endpoint: "text/generate".to_string(), |
| target_studio: "/chat".to_string(), |
| reasoning: "Izmanto noklusējuma sarunu atzaru, jo routing serviss nav pieejams." |
| .to_string(), |
| confidence: 0.5, |
| session_context: SessionContextPayload::default(), |
| } |
| } |
|
|
| fn generation_status_message(capability: &str) -> &'static str { |
| match capability { |
| "code_generation" => "Veidoju koda draftu.", |
| "autonomous_tasks" => "Sāku izpildāmu uzdevumu plūsmu.", |
| "image_generation" => "Salieku attēla handoff.", |
| "music_generation" => "Komponēju audio handoff.", |
| "video_generation" => "Sagatavoju video handoff.", |
| _ => "Domāju un veidoju atbildi.", |
| } |
| } |
|
|
| fn infer_code_language(message: &str) -> String { |
| let normalized = message.to_lowercase(); |
| if normalized.contains("next.js") || normalized.contains("nextjs") { |
| return "Next.js (TypeScript)".to_string(); |
| } |
| if normalized.contains("react") { |
| return "React (TypeScript)".to_string(); |
| } |
| if normalized.contains("typescript") { |
| return "TypeScript".to_string(); |
| } |
| if normalized.contains("javascript") || normalized.contains("node") { |
| return "JavaScript".to_string(); |
| } |
| if normalized.contains("rust") { |
| return "Rust".to_string(); |
| } |
| if normalized.contains("sql") { |
| return "SQL".to_string(); |
| } |
| if normalized.contains("html") |
| || normalized.contains("css") |
| || normalized.contains("ui") |
| || normalized.contains("frontend") |
| || normalized.contains("landing page") |
| || normalized.contains("web app") |
| || normalized.contains("web-app") |
| || normalized.contains("kalkulator") |
| || normalized.contains("calculator") |
| { |
| return "HTML/CSS/JavaScript".to_string(); |
| } |
| "Python".to_string() |
| } |
|
|
| fn build_code_context(message: &str, language: &str) -> String { |
| let normalized = message.to_lowercase(); |
| if language == "Next.js (TypeScript)" { |
| return "Atdod pilnu Next.js/TypeScript feature vai app router artefaktu ar pareizu failu struktūru, nevis tikai vienu komponenti.".to_string(); |
| } |
| if language == "HTML/CSS/JavaScript" { |
| return "Atdod vienu pilnīgu, uzreiz pārlūkā palaižamu artefaktu ar visu markup, stilu un loģiku bez TODO placeholderiem.".to_string(); |
| } |
| if language == "React (TypeScript)" { |
| return "Atdod pilnu React/TypeScript komponenti vai mazu feature moduli ar gatavu UI loģiku, nevis tikai skeletu.".to_string(); |
| } |
| if normalized.contains("salabo") |
| || normalized.contains("labo") |
| || normalized.contains("uzlabo") |
| || normalized.contains("improve") |
| || normalized.contains("fix") |
| || normalized.contains("refactor") |
| { |
| return "Ja pieprasījums ir par labojumu vai uzlabojumu, atdod gala kodu ar ielabotu rezultātu, nevis tikai komentāru par pieeju.".to_string(); |
| } |
| String::new() |
| } |
|
|
| fn is_repo_edit_request(message: &str) -> bool { |
| let normalized = message.to_lowercase(); |
| [ |
| "edit", "modify", "update", "refactor", "fix", "patch", "rewrite", "change", "labo", |
| "salabo", "rediģē", "uzlabo", "esoš", |
| ] |
| .iter() |
| .any(|keyword| normalized.contains(keyword)) |
| } |
|
|
| fn infer_repo_root() -> Option<String> { |
| let mut current = env::current_dir().ok()?; |
| loop { |
| if current.join(".git").exists() |
| || (current.join("backend-rust").is_dir() |
| && current.join("core-python").is_dir() |
| && current.join("frontend").is_dir()) |
| { |
| return Some(current.to_string_lossy().into_owned()); |
| } |
| if !current.pop() { |
| break; |
| } |
| } |
| None |
| } |
|
|
| fn infer_repo_path(message: &str) -> Option<String> { |
| for token in message.split_whitespace() { |
| let trimmed = token.trim_matches(|ch: char| { |
| matches!( |
| ch, |
| '"' | '\'' | '`' | ',' | '.' | ';' | ':' | '(' | ')' | '[' | ']' | '{' | '}' |
| ) |
| }); |
| if trimmed.is_empty() { |
| continue; |
| } |
| let candidate = PathBuf::from(trimmed); |
| let resolved = |
| if candidate.is_absolute() || trimmed.starts_with("./") || trimmed.starts_with("../") { |
| candidate.canonicalize().ok() |
| } else { |
| None |
| }; |
| let Some(path) = resolved else { |
| continue; |
| }; |
| if path.is_dir() { |
| return Some(path.to_string_lossy().into_owned()); |
| } |
| if let Some(parent) = path.parent() { |
| return Some(parent.to_string_lossy().into_owned()); |
| } |
| } |
| if is_repo_edit_request(message) { |
| return infer_repo_root(); |
| } |
| None |
| } |
|
|
| fn bundle_download_url(workspace_artifact_dir: Option<&str>) -> Option<String> { |
| workspace_artifact_dir.map(|artifact_dir| { |
| format!( |
| "/api/code/download?artifact_dir={}", |
| urlencoding::encode(artifact_dir) |
| ) |
| }) |
| } |
|
|
| fn normalize_message(message: &str) -> Result<String, MarisError> { |
| let trimmed = message.trim(); |
|
|
| if trimmed.is_empty() { |
| return Err(MarisError::Validation( |
| "Ziņa nedrīkst būt tukša.".to_string(), |
| )); |
| } |
|
|
| Ok(trimmed.to_string()) |
| } |
|
|
| fn normalize_optional_text(value: Option<String>) -> Option<String> { |
| value.and_then(|item| { |
| let trimmed = item.trim().to_string(); |
| if trimmed.is_empty() { |
| None |
| } else { |
| Some(trimmed) |
| } |
| }) |
| } |
|
|
| fn is_autonomous_mode_enabled(value: Option<bool>) -> bool { |
| value.unwrap_or(false) |
| } |
|
|
| fn force_autonomous_route( |
| route: OrchestrationRoutePayload, |
| session_id: Option<&str>, |
| persona_id: Option<&str>, |
| ) -> OrchestrationRoutePayload { |
| let mut forced = route; |
| forced.capability = "autonomous_tasks".to_string(); |
| forced.branch = "planner".to_string(); |
| forced.profile = "planner".to_string(); |
| forced.target_endpoint = "autonomous/start".to_string(); |
| forced.target_studio = "/autonomous".to_string(); |
| forced.reasoning = "Autonomous chat mode ir ieslēgts, tāpēc pieprasījums tiek izpildīts caur planner aģenta plūsmu.".to_string(); |
| forced.confidence = 1.0; |
| if let Some(session_id) = session_id { |
| forced.session_context.session_id = session_id.to_string(); |
| } |
| if let Some(persona_id) = persona_id { |
| forced.session_context.persona_id = persona_id.to_string(); |
| } |
| forced |
| } |
|
|
| async fn resolve_vision_input( |
| bridge: &PythonBridge, |
| req: &ChatRequest, |
| ) -> Result<Option<ResolvedVisionInput>, MarisError> { |
| let image_url = normalize_optional_text(req.image_url.clone()); |
| let image_base64 = normalize_optional_text(req.image_base64.clone()); |
| let camera_id = normalize_optional_text(req.camera_id.clone()); |
|
|
| |
| |
| if image_url.is_some() && image_base64.is_some() { |
| return Err(MarisError::Validation( |
| "Norādi tikai vienu no image_url vai image_base64.".to_string(), |
| )); |
| } |
|
|
| if image_url.is_some() || image_base64.is_some() { |
| return Ok(Some(ResolvedVisionInput { |
| source: "upload".to_string(), |
| image_url, |
| image_base64, |
| camera_id, |
| })); |
| } |
|
|
| let Some(camera_id) = camera_id else { |
| return Ok(None); |
| }; |
|
|
| let endpoint = format!("vision/live/{camera_id}/snapshot"); |
| let snapshot = bridge |
| .get::<LiveSnapshotPayload>(&endpoint) |
| .await |
| .map_err(|err| { |
| MarisError::Bridge(format!( |
| "Neizdevās ielādēt snapshot kamerai {camera_id}: {err}" |
| )) |
| })?; |
| let snapshot_data_url = snapshot.snapshot_data_url.ok_or_else(|| { |
| MarisError::Validation(format!( |
| "Kamerai {camera_id} pašlaik nav pieejams snapshot čatam." |
| )) |
| })?; |
|
|
| Ok(Some(ResolvedVisionInput { |
| source: "camera_snapshot".to_string(), |
| image_url: Some(snapshot_data_url), |
| image_base64: None, |
| camera_id: Some(camera_id), |
| })) |
| } |
|
|
| async fn analyze_vision_context( |
| bridge: &PythonBridge, |
| req: &ChatRequest, |
| ) -> Result<Option<ChatVisionContext>, MarisError> { |
| let Some(input) = resolve_vision_input(bridge, req).await? else { |
| return Ok(None); |
| }; |
|
|
| let payload = serde_json::json!({ |
| "image_url": input.image_url, |
| "image_base64": input.image_base64, |
| "camera_id": input.camera_id, |
| "session_id": req.session_id, |
| "confidence_threshold": req.confidence_threshold.unwrap_or(0.25), |
| }); |
| let result = bridge |
| .call::<VisionAnalysisPayload>("vision/analyze", &payload) |
| .await |
| .map_err(|err| { |
| error!("Vision analysis failed: {err}"); |
| MarisError::Bridge("Vision analīzes serviss šobrīd nav pieejams.".to_string()) |
| })?; |
|
|
| Ok(Some(ChatVisionContext { |
| summary: result.summary, |
| source: input.source, |
| model: result.model, |
| fallback_used: result.fallback_used, |
| camera_id: input.camera_id, |
| image_url: input.image_url, |
| width: result.width, |
| height: result.height, |
| detections: result.detections.len() as u32, |
| })) |
| } |
|
|
| async fn resolve_chat_route( |
| bridge: &PythonBridge, |
| message: &str, |
| session_id: Option<&str>, |
| persona_id: Option<&str>, |
| has_vision_input: bool, |
| ) -> OrchestrationRoutePayload { |
| bridge |
| .call::<OrchestrationRoutePayload>( |
| "orchestrator/route", |
| &serde_json::json!({ |
| "message": message, |
| "session_id": session_id, |
| "persona_id": persona_id, |
| "has_vision_input": has_vision_input, |
| }), |
| ) |
| .await |
| .map_err(|err| { |
| warn!("Routing bridge call failed, falling back to master route: {err}"); |
| err |
| }) |
| .unwrap_or_else(|_| default_route()) |
| } |
|
|
| async fn handle_code_handoff( |
| bridge: &PythonBridge, |
| message: &str, |
| fallback_model: Option<&str>, |
| ) -> Result<(String, ChatHandoffPayload), MarisError> { |
| let language = infer_code_language(message); |
| let context = build_code_context(message, &language); |
| let repo_path = infer_repo_path(message); |
| let result = bridge |
| .call::<CodeGenerationPayload>( |
| "code/generate", |
| &serde_json::json!({ |
| "prompt": message, |
| "language": language, |
| "context": context, |
| "repo_path": repo_path, |
| "fallback_model": fallback_model, |
| }), |
| ) |
| .await |
| .map_err(|err| { |
| error!("Code handoff failed: {err}"); |
| MarisError::Bridge("Koda ģenerēšanas serviss šobrīd nav pieejams.".to_string()) |
| })?; |
|
|
| let explanation = if result.explanation.is_empty() { |
| format!("Sagatavoju {} draftu pēc tava pieprasījuma.", language) |
| } else { |
| result.explanation |
| }; |
| let files = result |
| .files |
| .into_iter() |
| .map( |
| |ProjectFilePayload { |
| path, |
| absolute_path, |
| .. |
| }| ChatProjectFilePayload { |
| path, |
| absolute_path, |
| }, |
| ) |
| .collect::<Vec<_>>(); |
| let workspace_artifact_dir = result.workspace_artifact_dir.clone(); |
|
|
| Ok(( |
| explanation.clone(), |
| ChatHandoffPayload::Code { |
| code: result.code, |
| explanation, |
| language: result.language, |
| detected_stack: result.detected_stack, |
| files, |
| workspace_artifact_dir, |
| bundle_download_url: bundle_download_url(result.workspace_artifact_dir.as_deref()), |
| entrypoint: result.entrypoint, |
| repo_path: result.repo_path, |
| }, |
| )) |
| } |
|
|
| async fn handle_autonomous_handoff( |
| bridge: &PythonBridge, |
| prepared: &PreparedChatRequest, |
| ) -> Result<(String, ChatHandoffPayload), MarisError> { |
| let session_id = prepared |
| .session_id |
| .clone() |
| .unwrap_or_else(|| prepared.route.session_context.session_id.clone()); |
| let persona_id = prepared |
| .persona_id |
| .clone() |
| .unwrap_or_else(|| prepared.route.session_context.persona_id.clone()); |
| let result = bridge |
| .call::<AutonomousSessionPayload>( |
| "autonomous/start", |
| &serde_json::json!({ |
| "session_id": session_id, |
| "goal": prepared.message, |
| "max_steps": 8, |
| "persona_id": persona_id, |
| }), |
| ) |
| .await |
| .map_err(|err| { |
| error!("Autonomous handoff failed: {err}"); |
| MarisError::Bridge("Autonomās izpildes serviss šobrīd nav pieejams.".to_string()) |
| })?; |
|
|
| let tasks = result |
| .tasks |
| .iter() |
| .take(4) |
| .map(|task| ChatAutonomousTaskPayload { |
| id: task.id.clone(), |
| description: task.description.clone(), |
| status: task.status.clone(), |
| tool: task.tool.clone(), |
| result: task.result.clone(), |
| }) |
| .collect::<Vec<_>>(); |
| let summary = format!( |
| "Palaidu autonomo izpildi: {} uzdevumi, progress {}%, statuss {}.", |
| result.tasks.len(), |
| result.progress_percent, |
| result.status |
| ); |
|
|
| Ok(( |
| summary, |
| ChatHandoffPayload::Autonomous { |
| session_id, |
| status: result.status, |
| progress_percent: result.progress_percent, |
| persona_id: result.persona_id, |
| persona_title: result.persona_title, |
| tasks, |
| }, |
| )) |
| } |
|
|
| async fn handle_image_handoff( |
| bridge: &PythonBridge, |
| message: &str, |
| ) -> Result<(String, ChatHandoffPayload), MarisError> { |
| let result = bridge |
| .call::<ImageGenerationPayload>( |
| "images/generate", |
| &serde_json::json!({ |
| "prompt": message, |
| "width": 1024, |
| "height": 1024, |
| "steps": 30, |
| }), |
| ) |
| .await |
| .map_err(|err| { |
| error!("Image handoff failed: {err}"); |
| MarisError::Bridge("Attēlu ģenerēšanas serviss šobrīd nav pieejams.".to_string()) |
| })?; |
|
|
| Ok(( |
| "Sagatavoju attēla konceptu pēc tava pieprasījuma.".to_string(), |
| ChatHandoffPayload::Image { |
| image_url: result.image_url, |
| prompt: message.to_string(), |
| }, |
| )) |
| } |
|
|
| async fn handle_music_handoff( |
| bridge: &PythonBridge, |
| message: &str, |
| ) -> Result<(String, ChatHandoffPayload), MarisError> { |
| let result = bridge |
| .call::<MusicGenerationPayload>( |
| "audio/generate_music", |
| &serde_json::json!({ |
| "prompt": message, |
| "genre": "pop", |
| "duration_seconds": 30, |
| }), |
| ) |
| .await |
| .map_err(|err| { |
| error!("Music handoff failed: {err}"); |
| MarisError::Bridge("Mūzikas ģenerēšanas serviss šobrīd nav pieejams.".to_string()) |
| })?; |
|
|
| Ok(( |
| format!("Sagatavoju mūzikas draftu: {}.", result.title), |
| ChatHandoffPayload::Music { |
| audio_url: result.audio_url, |
| title: result.title, |
| duration_seconds: result.duration_seconds, |
| }, |
| )) |
| } |
|
|
| async fn handle_video_handoff( |
| bridge: &PythonBridge, |
| message: &str, |
| ) -> Result<(String, ChatHandoffPayload), MarisError> { |
| let result = bridge |
| .call::<VideoGenerationPayload>( |
| "video/generate", |
| &serde_json::json!({ |
| "prompt": message, |
| "duration_seconds": 5, |
| "fps": 24, |
| "resolution": "720p", |
| }), |
| ) |
| .await |
| .map_err(|err| { |
| error!("Video handoff failed: {err}"); |
| MarisError::Bridge("Video ģenerēšanas serviss šobrīd nav pieejams.".to_string()) |
| })?; |
|
|
| Ok(( |
| "Sagatavoju video draftu pēc tava pieprasījuma.".to_string(), |
| ChatHandoffPayload::Video { |
| video_url: result.video_url, |
| duration_seconds: result.duration_seconds, |
| prompt: message.to_string(), |
| }, |
| )) |
| } |
|
|
| async fn prepare_chat_request(req: ChatRequest) -> Result<PreparedChatRequest, MarisError> { |
| let bridge = PythonBridge::new(); |
| let message = normalize_message(&req.message)?; |
| let session_id = normalize_optional_text(req.session_id.clone()); |
| let persona_id = normalize_optional_text(req.persona_id.clone()); |
| let vision_context = analyze_vision_context(&bridge, &req).await?; |
| let route = resolve_chat_route( |
| &bridge, |
| &message, |
| session_id.as_deref(), |
| persona_id.as_deref(), |
| vision_context.is_some(), |
| ) |
| .await; |
| let route = if is_autonomous_mode_enabled(req.autonomous_mode) { |
| force_autonomous_route(route, session_id.as_deref(), persona_id.as_deref()) |
| } else { |
| route |
| }; |
|
|
| Ok(PreparedChatRequest { |
| message, |
| history: req.history.unwrap_or_default(), |
| session_id, |
| persona_id, |
| fallback_model: req.fallback_model, |
| route, |
| vision_context, |
| }) |
| } |
|
|
| async fn execute_prepared_chat( |
| prepared: &PreparedChatRequest, |
| ) -> Result<ChatExecutionResult, MarisError> { |
| let bridge = PythonBridge::new(); |
| match prepared.route.capability.as_str() { |
| "code_generation" => { |
| let (summary, handoff) = handle_code_handoff( |
| &bridge, |
| &prepared.message, |
| prepared.fallback_model.as_deref(), |
| ) |
| .await?; |
| let (detected_emotion, emotion_confidence, response_style) = default_emotion_metadata(); |
| Ok(ChatExecutionResult { |
| response: summary, |
| model: fallback_chat_model(&prepared.route.capability), |
| handoff: Some(handoff), |
| vision_context: None, |
| detected_emotion, |
| emotion_confidence, |
| response_style, |
| persona_id: None, |
| persona_title: None, |
| persona_summary: None, |
| request_id: None, |
| session_id: None, |
| latency_ms: None, |
| tokens_used: None, |
| prompt_messages: None, |
| memory_matches: None, |
| tool_trace: None, |
| }) |
| } |
| "autonomous_tasks" => { |
| let (summary, handoff) = handle_autonomous_handoff(&bridge, prepared).await?; |
| let (detected_emotion, emotion_confidence, response_style) = default_emotion_metadata(); |
| Ok(ChatExecutionResult { |
| response: summary, |
| model: fallback_chat_model(&prepared.route.capability), |
| handoff: Some(handoff), |
| vision_context: None, |
| detected_emotion, |
| emotion_confidence, |
| response_style, |
| persona_id: prepared.persona_id.clone(), |
| persona_title: None, |
| persona_summary: None, |
| request_id: None, |
| session_id: prepared.session_id.clone(), |
| latency_ms: None, |
| tokens_used: None, |
| prompt_messages: None, |
| memory_matches: None, |
| tool_trace: None, |
| }) |
| } |
| "image_generation" => { |
| let (summary, handoff) = handle_image_handoff(&bridge, &prepared.message).await?; |
| let (detected_emotion, emotion_confidence, response_style) = default_emotion_metadata(); |
| Ok(ChatExecutionResult { |
| response: summary, |
| model: fallback_chat_model(&prepared.route.capability), |
| handoff: Some(handoff), |
| vision_context: None, |
| detected_emotion, |
| emotion_confidence, |
| response_style, |
| persona_id: None, |
| persona_title: None, |
| persona_summary: None, |
| request_id: None, |
| session_id: None, |
| latency_ms: None, |
| tokens_used: None, |
| prompt_messages: None, |
| memory_matches: None, |
| tool_trace: None, |
| }) |
| } |
| "music_generation" => { |
| let (summary, handoff) = handle_music_handoff(&bridge, &prepared.message).await?; |
| let (detected_emotion, emotion_confidence, response_style) = default_emotion_metadata(); |
| Ok(ChatExecutionResult { |
| response: summary, |
| model: fallback_chat_model(&prepared.route.capability), |
| handoff: Some(handoff), |
| vision_context: None, |
| detected_emotion, |
| emotion_confidence, |
| response_style, |
| persona_id: None, |
| persona_title: None, |
| persona_summary: None, |
| request_id: None, |
| session_id: None, |
| latency_ms: None, |
| tokens_used: None, |
| prompt_messages: None, |
| memory_matches: None, |
| tool_trace: None, |
| }) |
| } |
| "video_generation" => { |
| let (summary, handoff) = handle_video_handoff(&bridge, &prepared.message).await?; |
| let (detected_emotion, emotion_confidence, response_style) = default_emotion_metadata(); |
| Ok(ChatExecutionResult { |
| response: summary, |
| model: fallback_chat_model(&prepared.route.capability), |
| handoff: Some(handoff), |
| vision_context: None, |
| detected_emotion, |
| emotion_confidence, |
| response_style, |
| persona_id: None, |
| persona_title: None, |
| persona_summary: None, |
| request_id: None, |
| session_id: None, |
| latency_ms: None, |
| tokens_used: None, |
| prompt_messages: None, |
| memory_matches: None, |
| tool_trace: None, |
| }) |
| } |
| _ => { |
| let payload = build_text_generation_payload(prepared); |
|
|
| let result = bridge |
| .call::<TextGenerationPayload>("text/generate", &payload) |
| .await |
| .map_err(|err| { |
| error!("Chat bridge call failed: {err}"); |
| MarisError::Bridge( |
| "Čata serviss šobrīd nav pieejams. Lūdzu mēģiniet vēlreiz.".to_string(), |
| ) |
| })?; |
| Ok(ChatExecutionResult { |
| response: result.response, |
| model: if result.model.is_empty() { |
| fallback_chat_model(&prepared.route.capability) |
| } else { |
| result.model |
| }, |
| handoff: None, |
| vision_context: prepared.vision_context.clone(), |
| detected_emotion: result.detected_emotion, |
| emotion_confidence: result.emotion_confidence, |
| response_style: result.response_style, |
| persona_id: (!result.persona_id.is_empty()).then_some(result.persona_id), |
| persona_title: (!result.persona_title.is_empty()).then_some(result.persona_title), |
| persona_summary: (!result.persona_summary.is_empty()) |
| .then_some(result.persona_summary), |
| request_id: (!result.request_id.is_empty()).then_some(result.request_id), |
| session_id: (!result.session_id.is_empty()).then_some(result.session_id), |
| latency_ms: (result.latency_ms > 0).then_some(result.latency_ms), |
| tokens_used: (result.tokens_used > 0).then_some(result.tokens_used), |
| prompt_messages: (result.prompt_messages > 0).then_some(result.prompt_messages), |
| memory_matches: (result.memory_matches > 0).then_some(result.memory_matches), |
| tool_trace: result.tool_trace, |
| }) |
| } |
| } |
| } |
|
|
| fn build_text_generation_payload(prepared: &PreparedChatRequest) -> serde_json::Value { |
| serde_json::json!({ |
| "message": &prepared.message, |
| "history": &prepared.history, |
| "profile": &prepared.route.profile, |
| "persona_id": &prepared.persona_id, |
| "session_id": &prepared.session_id, |
| "fallback_model": &prepared.fallback_model, |
| "vision_context": prepared.vision_context.as_ref().map(|context| { |
| serde_json::json!({ |
| "summary": &context.summary, |
| "source": &context.source, |
| "model": &context.model, |
| "fallback_used": context.fallback_used, |
| "camera_id": &context.camera_id, |
| "image_url": &context.image_url, |
| "width": context.width, |
| "height": context.height, |
| "detections": context.detections, |
| }) |
| }), |
| }) |
| } |
|
|
| fn parse_core_text_stream_event( |
| raw_event: &str, |
| ) -> Result<Option<CoreTextStreamEvent>, MarisError> { |
| let normalized = if raw_event.contains("\r\n") { |
| Cow::Owned(raw_event.replace("\r\n", "\n")) |
| } else { |
| Cow::Borrowed(raw_event) |
| }; |
| let trimmed = normalized.trim(); |
| if trimmed.is_empty() || trimmed.starts_with(':') { |
| return Ok(None); |
| } |
|
|
| let mut event_name = "message"; |
| let mut data_lines = Vec::new(); |
|
|
| for line in normalized.lines() { |
| if let Some(value) = line.strip_prefix("event:") { |
| event_name = value.trim(); |
| continue; |
| } |
| if let Some(value) = line.strip_prefix("data:") { |
| data_lines.push(value.trim_start()); |
| } |
| } |
|
|
| if data_lines.is_empty() { |
| return Ok(None); |
| } |
|
|
| let payload = data_lines.join("\n"); |
| match event_name { |
| "delta" => { |
| let parsed: CoreTextDeltaPayload = serde_json::from_str(&payload).map_err(|err| { |
| MarisError::Bridge(format!("Neizdevās parsēt core text delta eventu: {err}")) |
| })?; |
| Ok(parsed.delta.map(CoreTextStreamEvent::Delta)) |
| } |
| "complete" => { |
| let parsed: TextGenerationPayload = serde_json::from_str(&payload).map_err(|err| { |
| MarisError::Bridge(format!("Neizdevās parsēt core text complete eventu: {err}")) |
| })?; |
| Ok(Some(CoreTextStreamEvent::Complete(Box::new( |
| parsed.validate().map_err(|err| { |
| MarisError::Bridge(format!("Core text stream atgrieza nederīgu payload: {err}")) |
| })?, |
| )))) |
| } |
| "error" => { |
| let parsed: CoreTextErrorPayload = serde_json::from_str(&payload).map_err(|err| { |
| MarisError::Bridge(format!("Neizdevās parsēt core text error eventu: {err}")) |
| })?; |
| Ok(Some(CoreTextStreamEvent::Error( |
| parsed |
| .error |
| .unwrap_or_else(|| "Core text stream kļūda.".to_string()), |
| ))) |
| } |
| _ => Ok(None), |
| } |
| } |
|
|
| fn take_next_core_text_stream_frame(buffer: &mut String) -> Option<String> { |
| if buffer.contains("\r\n") { |
| *buffer = buffer.replace("\r\n", "\n"); |
| } |
| if buffer.contains('\r') { |
| *buffer = buffer.replace('\r', "\n"); |
| } |
|
|
| let index = buffer.find("\n\n")?; |
| let raw_event = buffer[..index].to_string(); |
| *buffer = buffer[index + 2..].to_string(); |
| Some(raw_event) |
| } |
|
|
| fn build_stream_text_chat_result( |
| prepared: &PreparedChatRequest, |
| result: TextGenerationPayload, |
| ) -> ChatExecutionResult { |
| ChatExecutionResult { |
| response: result.response, |
| model: if result.model.is_empty() { |
| fallback_chat_model(&prepared.route.capability) |
| } else { |
| result.model |
| }, |
| handoff: None, |
| vision_context: prepared.vision_context.clone(), |
| detected_emotion: result.detected_emotion, |
| emotion_confidence: result.emotion_confidence, |
| response_style: result.response_style, |
| persona_id: (!result.persona_id.is_empty()).then_some(result.persona_id), |
| persona_title: (!result.persona_title.is_empty()).then_some(result.persona_title), |
| persona_summary: (!result.persona_summary.is_empty()).then_some(result.persona_summary), |
| request_id: (!result.request_id.is_empty()).then_some(result.request_id), |
| session_id: (!result.session_id.is_empty()).then_some(result.session_id), |
| latency_ms: (result.latency_ms > 0).then_some(result.latency_ms), |
| tokens_used: (result.tokens_used > 0).then_some(result.tokens_used), |
| prompt_messages: (result.prompt_messages > 0).then_some(result.prompt_messages), |
| memory_matches: (result.memory_matches > 0).then_some(result.memory_matches), |
| tool_trace: result.tool_trace, |
| } |
| } |
|
|
| async fn stream_text_chat_response( |
| bridge: &PythonBridge, |
| prepared: &PreparedChatRequest, |
| tx: &mpsc::Sender<Result<Event, Infallible>>, |
| ) -> Result<Option<ChatExecutionResult>, MarisError> { |
| let payload = build_text_generation_payload(prepared); |
| let response = bridge |
| .stream("text/generate/stream", &payload) |
| .await |
| .map_err(|err| { |
| error!("Chat bridge stream call failed: {err}"); |
| MarisError::Bridge( |
| "Čata serviss šobrīd nav pieejams. Lūdzu mēģiniet vēlreiz.".to_string(), |
| ) |
| })?; |
|
|
| let mut stream = response.bytes_stream(); |
| let mut buffer = String::new(); |
|
|
| while let Some(next_chunk) = tokio::select! { |
| _ = tx.closed() => return Ok(None), |
| next = stream.next() => next, |
| } { |
| let chunk = next_chunk.map_err(|err| { |
| MarisError::Bridge(format!("Neizdevās nolasīt core text straumi: {err}")) |
| })?; |
| buffer.push_str(&String::from_utf8_lossy(&chunk)); |
|
|
| while let Some(raw_event) = take_next_core_text_stream_frame(&mut buffer) { |
| match parse_core_text_stream_event(&raw_event)? { |
| Some(CoreTextStreamEvent::Delta(delta)) => { |
| if !try_send_stream_event( |
| tx, |
| build_sse_event("delta", &serde_json::json!({ "delta": delta })), |
| ) |
| .await |
| { |
| return Ok(None); |
| } |
| } |
| Some(CoreTextStreamEvent::Complete(payload)) => { |
| return Ok(Some(build_stream_text_chat_result(prepared, *payload))); |
| } |
| Some(CoreTextStreamEvent::Error(message)) => { |
| return Err(MarisError::Bridge(message)); |
| } |
| None => {} |
| } |
| } |
| } |
|
|
| if !buffer.trim().is_empty() { |
| match parse_core_text_stream_event(&buffer)? { |
| Some(CoreTextStreamEvent::Delta(delta)) => { |
| if !try_send_stream_event( |
| tx, |
| build_sse_event("delta", &serde_json::json!({ "delta": delta })), |
| ) |
| .await |
| { |
| return Ok(None); |
| } |
| } |
| Some(CoreTextStreamEvent::Complete(payload)) => { |
| return Ok(Some(build_stream_text_chat_result(prepared, *payload))); |
| } |
| Some(CoreTextStreamEvent::Error(message)) => { |
| return Err(MarisError::Bridge(message)); |
| } |
| None => {} |
| } |
| } |
|
|
| Err(MarisError::Bridge( |
| "Core text straume noslēdzās bez final payload.".to_string(), |
| )) |
| } |
|
|
| fn build_chat_response( |
| route: OrchestrationRoutePayload, |
| chat_result: ChatExecutionResult, |
| ) -> ChatResponse { |
| ChatResponse { |
| response: chat_result.response, |
| model: chat_result.model, |
| route: ChatRouteInfo { |
| capability: route.capability, |
| branch: route.branch, |
| profile: route.profile, |
| studio: route.target_studio, |
| reasoning: route.reasoning, |
| confidence: route.confidence, |
| }, |
| handoff: chat_result.handoff, |
| vision_context: chat_result.vision_context, |
| detected_emotion: chat_result.detected_emotion, |
| emotion_confidence: chat_result.emotion_confidence, |
| response_style: chat_result.response_style, |
| persona_id: chat_result.persona_id, |
| persona_title: chat_result.persona_title, |
| persona_summary: chat_result.persona_summary, |
| request_id: chat_result.request_id, |
| session_id: chat_result.session_id, |
| latency_ms: chat_result.latency_ms, |
| tokens_used: chat_result.tokens_used, |
| prompt_messages: chat_result.prompt_messages, |
| memory_matches: chat_result.memory_matches, |
| tool_trace: chat_result.tool_trace, |
| } |
| } |
|
|
| fn build_sse_event<T: Serialize>(event_name: &str, payload: &T) -> Event { |
| Event::default() |
| .event(event_name) |
| .data(serde_json::to_string(payload).unwrap_or_else(|_| "{}".to_string())) |
| } |
|
|
| fn build_status_event(stage: &str, message: &str) -> Event { |
| build_sse_event( |
| "status", |
| &ChatStatusPayload { |
| stage: stage.to_string(), |
| message: message.to_string(), |
| }, |
| ) |
| } |
|
|
| async fn try_send_stream_event(tx: &mpsc::Sender<Result<Event, Infallible>>, event: Event) -> bool { |
| tx.send(Ok(event)).await.is_ok() |
| } |
|
|
| fn chunk_response_for_streaming(text: &str) -> Vec<String> { |
| let mut chunks = Vec::new(); |
| let mut current = String::new(); |
| let mut last_boundary = None; |
|
|
| for ch in text.chars() { |
| current.push(ch); |
| if ch.is_whitespace() { |
| last_boundary = Some(current.len()); |
| } |
| if current.len() >= CHAT_STREAM_CHUNK_CHARS { |
| if let Some(boundary) = last_boundary |
| .take() |
| .filter(|boundary| *boundary < current.len()) |
| { |
| let remainder = current.split_off(boundary); |
| chunks.push(std::mem::take(&mut current)); |
| current = remainder; |
| } else { |
| chunks.push(std::mem::take(&mut current)); |
| } |
| } |
| } |
|
|
| if !current.is_empty() { |
| chunks.push(current); |
| } |
|
|
| chunks |
| } |
|
|
| fn build_stream_delta_events(response: &str) -> VecDeque<Event> { |
| chunk_response_for_streaming(response) |
| .into_iter() |
| .map(|delta| build_sse_event("delta", &serde_json::json!({ "delta": delta }))) |
| .collect() |
| } |
|
|
| pub async fn chat(Json(req): Json<ChatRequest>) -> Result<Json<ChatResponse>, MarisError> { |
| let prepared = prepare_chat_request(req).await?; |
| let chat_result = execute_prepared_chat(&prepared).await?; |
| Ok(Json(build_chat_response(prepared.route, chat_result))) |
| } |
|
|
| pub async fn chat_stream( |
| Json(req): Json<ChatRequest>, |
| ) -> Result<Sse<impl futures_util::Stream<Item = Result<Event, Infallible>>>, MarisError> { |
| let message = normalize_message(&req.message)?; |
| let (tx, rx) = mpsc::channel::<Result<Event, Infallible>>(32); |
| let history = req.history.clone().unwrap_or_default(); |
|
|
| tokio::spawn(async move { |
| let bridge = PythonBridge::new(); |
| let session_id = normalize_optional_text(req.session_id.clone()); |
| let persona_id = normalize_optional_text(req.persona_id.clone()); |
| let autonomous_mode = is_autonomous_mode_enabled(req.autonomous_mode); |
| let fallback_model = normalize_optional_text(req.fallback_model.clone()); |
|
|
| if !try_send_stream_event( |
| &tx, |
| build_status_event("presence", "Savienojums izveidots — Maris ir klāt."), |
| ) |
| .await |
| { |
| return; |
| } |
|
|
| let has_vision_input = req.image_url.is_some() |
| || req.image_base64.is_some() |
| || req |
| .camera_id |
| .as_ref() |
| .is_some_and(|camera_id| !camera_id.trim().is_empty()); |
|
|
| if has_vision_input |
| && !try_send_stream_event( |
| &tx, |
| build_status_event("vision", "Analizēju vizuālo kontekstu."), |
| ) |
| .await |
| { |
| return; |
| } |
|
|
| let vision_context = match analyze_vision_context(&bridge, &req).await { |
| Ok(vision_context) => vision_context, |
| Err(err) => { |
| let _ = try_send_stream_event( |
| &tx, |
| build_sse_event("error", &serde_json::json!({ "error": err.to_string() })), |
| ) |
| .await; |
| return; |
| } |
| }; |
|
|
| if let Some(vision_context) = vision_context.as_ref() { |
| if !try_send_stream_event(&tx, build_sse_event("vision_context", vision_context)).await |
| { |
| return; |
| } |
| } |
|
|
| if !try_send_stream_event( |
| &tx, |
| build_status_event("routing", "Izvēlos labāko atbildes režīmu."), |
| ) |
| .await |
| { |
| return; |
| } |
|
|
| let route = resolve_chat_route( |
| &bridge, |
| &message, |
| session_id.as_deref(), |
| persona_id.as_deref(), |
| vision_context.is_some(), |
| ) |
| .await; |
| let route = if autonomous_mode { |
| force_autonomous_route(route, session_id.as_deref(), persona_id.as_deref()) |
| } else { |
| route |
| }; |
|
|
| if !try_send_stream_event(&tx, build_sse_event("route", &route)).await { |
| return; |
| } |
|
|
| if !try_send_stream_event( |
| &tx, |
| build_status_event("generation", generation_status_message(&route.capability)), |
| ) |
| .await |
| { |
| return; |
| } |
|
|
| let prepared = PreparedChatRequest { |
| message, |
| history, |
| session_id, |
| persona_id, |
| fallback_model, |
| route, |
| vision_context, |
| }; |
|
|
| if prepared.route.capability == "text_chat" { |
| match stream_text_chat_response(&bridge, &prepared, &tx).await { |
| Ok(Some(chat_result)) => { |
| let response = build_chat_response(prepared.route, chat_result); |
| let _ = |
| try_send_stream_event(&tx, build_sse_event("complete", &response)).await; |
| } |
| Ok(None) => {} |
| Err(err) => { |
| let _ = try_send_stream_event( |
| &tx, |
| build_sse_event("error", &serde_json::json!({ "error": err.to_string() })), |
| ) |
| .await; |
| } |
| } |
| return; |
| } |
|
|
| match execute_prepared_chat(&prepared).await { |
| Ok(chat_result) => { |
| let response = build_chat_response(prepared.route, chat_result); |
| for delta in build_stream_delta_events(&response.response) { |
| if !try_send_stream_event(&tx, delta).await { |
| return; |
| } |
| tokio::time::sleep(Duration::from_millis(CHAT_STREAM_INTERVAL_MS)).await; |
| } |
| let _ = try_send_stream_event(&tx, build_sse_event("complete", &response)).await; |
| } |
| Err(err) => { |
| let _ = try_send_stream_event( |
| &tx, |
| build_sse_event("error", &serde_json::json!({ "error": err.to_string() })), |
| ) |
| .await; |
| } |
| } |
| }); |
|
|
| let stream = stream::unfold(rx, |mut rx| async move { |
| rx.recv().await.map(|event| (event, rx)) |
| }); |
|
|
| Ok(Sse::new(stream).keep_alive( |
| KeepAlive::new() |
| .interval(Duration::from_secs(10)) |
| .text("keepalive"), |
| )) |
| } |
|
|
| #[cfg(test)] |
| mod tests { |
| use super::{ |
| chunk_response_for_streaming, generation_status_message, normalize_message, |
| parse_core_text_stream_event, take_next_core_text_stream_frame, CoreTextStreamEvent, |
| }; |
| use crate::utils::errors::MarisError; |
|
|
| #[test] |
| fn rejects_empty_messages() { |
| let err = normalize_message(" ").unwrap_err(); |
| assert!(matches!(err, MarisError::Validation(_))); |
| assert_eq!(err.to_string(), "Derīguma kļūda: Ziņa nedrīkst būt tukša."); |
| } |
|
|
| #[test] |
| fn trims_valid_messages() { |
| assert_eq!( |
| normalize_message(" Sveiki, Maris! ").unwrap(), |
| "Sveiki, Maris!" |
| ); |
| } |
|
|
| #[test] |
| fn chunks_streaming_response_without_losing_text() { |
| let text = |
| "Šī ir garāka atbilde ar vairākiem vārdiem, lai pārbaudītu straumēšanas chunkus."; |
| let chunks = chunk_response_for_streaming(text); |
| assert!(chunks.len() >= 2); |
| assert_eq!(chunks.concat(), text); |
| } |
|
|
| #[test] |
| fn maps_generation_status_by_capability() { |
| assert_eq!( |
| generation_status_message("code_generation"), |
| "Veidoju koda draftu." |
| ); |
| assert_eq!( |
| generation_status_message("text_chat"), |
| "Domāju un veidoju atbildi." |
| ); |
| } |
|
|
| #[test] |
| fn parses_core_text_delta_event() { |
| let event = parse_core_text_stream_event("event: delta\ndata: {\"delta\":\"Sveiki\"}\n") |
| .unwrap() |
| .unwrap(); |
|
|
| match event { |
| CoreTextStreamEvent::Delta(delta) => assert_eq!(delta, "Sveiki"), |
| _ => panic!("expected delta event"), |
| } |
| } |
|
|
| #[test] |
| fn parses_core_text_complete_event() { |
| let event = parse_core_text_stream_event( |
| "event: complete\ndata: {\"response\":\"Sveiki\",\"model\":\"m\",\"detected_emotion\":\"neutral\",\"emotion_confidence\":0.1,\"response_style\":\"clear_grounded\"}\n", |
| ) |
| .unwrap() |
| .unwrap(); |
|
|
| match event { |
| CoreTextStreamEvent::Complete(payload) => { |
| assert_eq!(payload.response, "Sveiki"); |
| assert_eq!(payload.model, "m"); |
| } |
| _ => panic!("expected complete event"), |
| } |
| } |
|
|
| #[test] |
| fn extracts_multiple_core_text_events_with_crlf_delimiters() { |
| let mut buffer = concat!( |
| "event: delta\r\n", |
| "data: {\"delta\":\"Sveiki\"}\r\n\r\n", |
| "event: complete\r\n", |
| "data: {\"response\":\"Sveiki\",\"model\":\"m\",\"detected_emotion\":\"neutral\",\"emotion_confidence\":0.1,\"response_style\":\"clear_grounded\"}\r\n\r\n" |
| ) |
| .to_string(); |
|
|
| let first = take_next_core_text_stream_frame(&mut buffer).expect("first event"); |
| let second = take_next_core_text_stream_frame(&mut buffer).expect("second event"); |
|
|
| assert!(buffer.is_empty()); |
| assert!(matches!( |
| parse_core_text_stream_event(&first).unwrap(), |
| Some(CoreTextStreamEvent::Delta(delta)) if delta == "Sveiki" |
| )); |
| assert!(matches!( |
| parse_core_text_stream_event(&second).unwrap(), |
| Some(CoreTextStreamEvent::Complete(payload)) if payload.response == "Sveiki" |
| )); |
| } |
| } |
|
|