MarisUK's picture
Maris AI model sync
f440f03 verified
use std::{
borrow::Cow, collections::VecDeque, convert::Infallible, env, path::PathBuf, time::Duration,
};
use axum::{
response::sse::{Event, KeepAlive, Sse},
Json,
};
use futures_util::{stream, StreamExt};
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc;
use tracing::{error, warn};
use crate::inference::python_bridge::PythonBridge;
use crate::inference::response_validator::{
AutonomousSessionPayload, CodeGenerationPayload, ImageGenerationPayload, LiveSnapshotPayload,
MusicGenerationPayload, OrchestrationRoutePayload, ProjectFilePayload, SessionContextPayload,
TextGenerationPayload, ToolTracePayload, ValidateCoreResponse, VideoGenerationPayload,
VisionAnalysisPayload,
};
use crate::utils::errors::MarisError;
/// Size threshold at which `chunk_response_for_streaming` flushes the current
/// delta chunk.
///
/// Keep chunks small enough for a natural "typing" feel without splitting almost
/// every word into its own network event.
const CHAT_STREAM_CHUNK_CHARS: usize = 48;
/// Pause, in milliseconds, associated with streamed output.
///
/// A short pause makes the SSE output feel live while keeping the total response
/// time fast for already-generated backend results.
// NOTE(review): the use site is outside the visible part of this file —
// presumably the delay between replayed delta events; confirm.
const CHAT_STREAM_INTERVAL_MS: u64 = 24;
/// JSON request body accepted by the `chat` and `chat_stream` handlers.
#[derive(Clone, Deserialize)]
pub struct ChatRequest {
/// User message; trimmed by `normalize_message` and rejected when empty.
pub message: String,
/// Earlier conversation turns; a missing value is treated as an empty list.
pub history: Option<Vec<HistoryMessage>>,
/// Optional session identifier; blank values are normalized to `None`.
pub session_id: Option<String>,
/// Optional persona identifier; blank values are normalized to `None`.
pub persona_id: Option<String>,
/// When `Some(true)`, the resolved route is replaced by the autonomous route.
pub autonomous_mode: Option<bool>,
/// Optional model name forwarded to the text and code generation endpoints.
pub fallback_model: Option<String>,
/// Image reference for vision analysis; mutually exclusive with `image_base64`.
pub image_url: Option<String>,
/// Inline image data for vision analysis; mutually exclusive with `image_url`.
pub image_base64: Option<String>,
/// Camera whose latest snapshot is fetched when no image is supplied directly.
pub camera_id: Option<String>,
/// Threshold forwarded to `vision/analyze`; defaults to `0.25` when absent.
pub confidence_threshold: Option<f32>,
}
/// One earlier conversation turn, received in `ChatRequest::history` and
/// forwarded unchanged in the text generation payload.
#[derive(Clone, Deserialize, Serialize)]
pub struct HistoryMessage {
/// Speaker of the turn (values are not validated in this module).
pub role: String,
/// Text of the turn.
pub content: String,
}
/// Task summary embedded in `ChatHandoffPayload::Autonomous`; copied field by
/// field from the tasks returned by the `autonomous/start` bridge call.
#[derive(Clone, Serialize)]
pub struct ChatAutonomousTaskPayload {
/// Task identifier as reported by the backend.
pub id: String,
/// Description of the task.
pub description: String,
/// Status string as reported by the backend.
pub status: String,
/// Name of the tool assigned to the task.
pub tool: String,
/// Task result, when the backend reported one.
pub result: Option<String>,
}
/// Reference to a generated project file listed in `ChatHandoffPayload::Code`.
#[derive(Clone, Serialize)]
pub struct ChatProjectFilePayload {
/// File path as returned by the code generation backend.
pub path: String,
/// Absolute path of the file, when the backend supplied one.
pub absolute_path: Option<String>,
}
/// Routing decision exposed to the client; filled from
/// `OrchestrationRoutePayload` in `build_chat_response`.
#[derive(Clone, Serialize)]
pub struct ChatRouteInfo {
/// Capability the request was routed to (e.g. `"text_chat"`, `"code_generation"`).
pub capability: String,
/// Branch name reported by the router.
pub branch: String,
/// Profile name reported by the router.
pub profile: String,
/// Target studio of the route (copied from `target_studio`).
pub studio: String,
/// Human-readable explanation of the routing decision.
pub reasoning: String,
/// Confidence value attached to the routing decision.
pub confidence: f32,
}
/// Structured result of a non-text capability, attached to `ChatResponse`.
///
/// Serialized as an internally tagged object: the variant name is written in
/// snake_case to the `type` field alongside the variant's own fields.
#[derive(Clone, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ChatHandoffPayload {
/// Result of the `code/generate` bridge call.
Code {
/// Generated source code.
code: String,
/// Explanation text; replaced by a default sentence when the backend sent none.
explanation: String,
/// Language reported by the backend.
language: String,
/// Stack detected by the backend.
detected_stack: String,
/// Files that belong to the generated artifact.
files: Vec<ChatProjectFilePayload>,
/// Workspace directory of the artifact, when reported.
workspace_artifact_dir: Option<String>,
/// Download link derived from `workspace_artifact_dir`.
bundle_download_url: Option<String>,
/// Entry point of the artifact, when reported.
entrypoint: Option<String>,
/// Repository path reported by the backend, when any.
repo_path: Option<String>,
},
/// Result of the `images/generate` bridge call.
Image {
/// Location of the generated image.
image_url: String,
/// Prompt the image was generated from (the user message).
prompt: String,
},
/// Result of the `audio/generate_music` bridge call.
Music {
/// Location of the generated audio.
audio_url: String,
/// Title reported by the backend.
title: String,
/// Duration reported by the backend, in seconds.
duration_seconds: u32,
},
/// Result of the `video/generate` bridge call.
Video {
/// Location of the generated video.
video_url: String,
/// Duration reported by the backend, in seconds.
duration_seconds: u32,
/// Prompt the video was generated from (the user message).
prompt: String,
},
/// Result of the `autonomous/start` bridge call.
Autonomous {
/// Session the autonomous run was started for.
session_id: String,
/// Status string reported by the backend.
status: String,
/// Progress reported by the backend, in percent.
progress_percent: u32,
/// Persona identifier reported by the backend.
persona_id: String,
/// Persona title reported by the backend.
persona_title: String,
/// At most the first four tasks of the run.
tasks: Vec<ChatAutonomousTaskPayload>,
},
}
/// JSON response body of the `chat` handler, assembled by `build_chat_response`
/// from the routing decision and the execution result.
#[derive(Clone, Serialize)]
pub struct ChatResponse {
/// Reply text, or a short summary sentence for handoff capabilities.
pub response: String,
/// Name of the model that produced the reply (or a fallback label).
pub model: String,
/// Routing decision that selected the capability.
pub route: ChatRouteInfo,
/// Structured handoff result; `None` for plain text replies.
pub handoff: Option<ChatHandoffPayload>,
/// Vision analysis summary, when the request carried visual input.
pub vision_context: Option<ChatVisionContext>,
/// Emotion label reported by the text backend (`"neutral"` for handoffs).
pub detected_emotion: String,
/// Confidence attached to `detected_emotion`.
pub emotion_confidence: f32,
/// Response style label reported by the text backend.
pub response_style: String,
/// Persona identifier, when known.
pub persona_id: Option<String>,
/// Persona title, when the backend returned a non-empty value.
pub persona_title: Option<String>,
/// Persona summary, when the backend returned a non-empty value.
pub persona_summary: Option<String>,
/// Backend request identifier, when non-empty.
pub request_id: Option<String>,
/// Session identifier, when known.
pub session_id: Option<String>,
/// Backend latency in milliseconds; omitted when reported as zero.
pub latency_ms: Option<u32>,
/// Token count; omitted when reported as zero.
pub tokens_used: Option<u32>,
/// Number of prompt messages; omitted when reported as zero.
pub prompt_messages: Option<u32>,
/// Number of memory matches; omitted when reported as zero.
pub memory_matches: Option<u32>,
/// Tool trace returned by the text backend, when any.
pub tool_trace: Option<ToolTracePayload>,
}
/// Summary of a vision analysis, built by `analyze_vision_context` and both
/// returned to the client and forwarded to the text generation backend.
#[derive(Clone, Serialize)]
pub struct ChatVisionContext {
/// Textual summary returned by `vision/analyze`.
pub summary: String,
/// Origin of the image: `"upload"` or `"camera_snapshot"`.
pub source: String,
/// Vision model reported by the backend.
pub model: String,
/// Whether the backend reported using a fallback.
pub fallback_used: bool,
/// Camera the image came from, when the request named one.
pub camera_id: Option<String>,
/// Image URL (or snapshot data URL) that was analyzed, when available.
pub image_url: Option<String>,
/// Image width reported by the backend.
pub width: u32,
/// Image height reported by the backend.
pub height: u32,
/// Number of detections returned by the backend.
pub detections: u32,
}
/// Payload of the `status` SSE event emitted by `build_status_event`.
#[derive(Clone, Serialize)]
pub struct ChatStatusPayload {
/// Stage identifier (e.g. `"presence"`, `"vision"`).
pub stage: String,
/// Human-readable status message for the client.
pub message: String,
}
/// Internal outcome of executing a prepared chat request. Every field is copied
/// into the same-named field of `ChatResponse` by `build_chat_response`.
struct ChatExecutionResult {
// Reply text or handoff summary sentence.
response: String,
// Model name, or the label from `fallback_chat_model`.
model: String,
// Structured result for non-text capabilities.
handoff: Option<ChatHandoffPayload>,
// Vision context; only set on the text generation path.
vision_context: Option<ChatVisionContext>,
detected_emotion: String,
emotion_confidence: f32,
response_style: String,
persona_id: Option<String>,
persona_title: Option<String>,
persona_summary: Option<String>,
request_id: Option<String>,
session_id: Option<String>,
// Numeric metadata is `None` when the backend reported zero.
latency_ms: Option<u32>,
tokens_used: Option<u32>,
prompt_messages: Option<u32>,
memory_matches: Option<u32>,
tool_trace: Option<ToolTracePayload>,
}
/// Validated and routed chat request produced by `prepare_chat_request`.
struct PreparedChatRequest {
// Trimmed, non-empty user message.
message: String,
// Conversation history (empty when the request carried none).
history: Vec<HistoryMessage>,
// Normalized identifiers (`None` when missing or blank).
session_id: Option<String>,
persona_id: Option<String>,
// Optional model name forwarded to the generation endpoints.
fallback_model: Option<String>,
// Routing decision, possibly overridden by autonomous mode.
route: OrchestrationRoutePayload,
// Result of the vision analysis, when the request had visual input.
vision_context: Option<ChatVisionContext>,
}
/// Resolved visual input for `/api/chat` after normalizing direct uploads or
/// dereferencing a live camera into its latest snapshot payload.
///
/// `image_url` is populated for URL-based uploads or resolved camera snapshots,
/// `image_base64` is populated for direct file uploads, and `camera_id` is only
/// carried through when the source originated from a camera-aware request.
struct ResolvedVisionInput {
// Either "upload" or "camera_snapshot" (set by `resolve_vision_input`).
source: String,
// Uploaded image URL, or the snapshot data URL fetched for a camera.
image_url: Option<String>,
// Inline image data from the request; always `None` for camera snapshots.
image_base64: Option<String>,
// Camera identifier taken from the request, when one was given.
camera_id: Option<String>,
}
/// JSON body of a `delta` event on the core text stream.
#[derive(Debug, Deserialize)]
struct CoreTextDeltaPayload {
// Incremental text; events without a delta are ignored by the parser.
delta: Option<String>,
}
/// JSON body of an `error` event on the core text stream.
#[derive(Debug, Deserialize)]
struct CoreTextErrorPayload {
// Error message; the parser substitutes a generic message when absent.
error: Option<String>,
}
/// Parsed event from the core text stream (see `parse_core_text_stream_event`).
enum CoreTextStreamEvent {
// Incremental text from a `delta` event.
Delta(String),
// Final, validated payload from a `complete` event (boxed to keep the enum small).
Complete(Box<TextGenerationPayload>),
// Error message from an `error` event.
Error(String),
}
/// Returns the model label reported to the client for `capability` when the
/// backend did not name a model itself.
///
/// Unknown capabilities map to the core text model label.
fn fallback_chat_model(capability: &str) -> String {
    let label = match capability {
        "code_generation" => "Maris code handoff",
        "image_generation" => "Maris image handoff",
        "music_generation" => "Maris music handoff",
        "video_generation" => "Maris video handoff",
        "vision_analysis" => "Maris vision handoff",
        _ => "Maris core text model",
    };
    label.to_owned()
}
/// Returns the `(detected_emotion, emotion_confidence, response_style)` triple
/// used for replies that did not go through the text backend.
fn default_emotion_metadata() -> (String, f32, String) {
    let emotion = String::from("neutral");
    let style = String::from("clear_grounded");
    (emotion, 0.0, style)
}
/// Builds the route used when the orchestrator cannot be reached: plain text
/// chat on the `master` branch with a default session context.
fn default_route() -> OrchestrationRoutePayload {
    let reasoning = "Izmanto noklusējuma sarunu atzaru, jo routing serviss nav pieejams.";
    OrchestrationRoutePayload {
        capability: String::from("text_chat"),
        branch: String::from("master"),
        profile: String::from("general"),
        target_endpoint: String::from("text/generate"),
        target_studio: String::from("/chat"),
        reasoning: String::from(reasoning),
        confidence: 0.5,
        session_context: SessionContextPayload::default(),
    }
}
/// Returns the user-facing status line shown while `capability` is being
/// executed; unknown capabilities get the generic "thinking" message.
fn generation_status_message(capability: &str) -> &'static str {
    const STATUS_BY_CAPABILITY: &[(&str, &str)] = &[
        ("code_generation", "Veidoju koda draftu."),
        ("autonomous_tasks", "Sāku izpildāmu uzdevumu plūsmu."),
        ("image_generation", "Salieku attēla handoff."),
        ("music_generation", "Komponēju audio handoff."),
        ("video_generation", "Sagatavoju video handoff."),
    ];
    for &(known, status) in STATUS_BY_CAPABILITY {
        if known == capability {
            return status;
        }
    }
    "Domāju un veidoju atbildi."
}
/// Guesses the target language/stack for a code request from keywords in the
/// (case-insensitively compared) user message.
///
/// Rules are checked in priority order and the first match wins; when nothing
/// matches the result is `"Python"`.
///
/// The keyword `ui` is only honoured as a standalone word. A plain substring
/// test would also fire on ordinary words such as "build", "quick" or
/// "require" and misclassify unrelated requests as web front-end work.
fn infer_code_language(message: &str) -> String {
    // Substring rules in priority order: (keywords, resulting language label).
    const SUBSTRING_RULES: &[(&[&str], &str)] = &[
        (&["next.js", "nextjs"], "Next.js (TypeScript)"),
        (&["react"], "React (TypeScript)"),
        (&["typescript"], "TypeScript"),
        (&["javascript", "node"], "JavaScript"),
        (&["rust"], "Rust"),
        (&["sql"], "SQL"),
        (
            &[
                "html",
                "css",
                "frontend",
                "landing page",
                "web app",
                "web-app",
                "kalkulator",
                "calculator",
            ],
            "HTML/CSS/JavaScript",
        ),
    ];

    let normalized = message.to_lowercase();

    for (keywords, label) in SUBSTRING_RULES {
        if keywords.iter().any(|keyword| normalized.contains(*keyword)) {
            return (*label).to_string();
        }
    }

    // "ui" must be its own token (e.g. "a settings UI", "ui-kit").
    let mentions_ui = normalized
        .split(|ch: char| !ch.is_alphanumeric())
        .any(|word| word == "ui");
    if mentions_ui {
        return "HTML/CSS/JavaScript".to_string();
    }

    "Python".to_string()
}
/// Produces the extra instruction text sent as `context` to the code backend.
///
/// Stack-specific guidance (chosen by `language`) takes precedence; otherwise a
/// fix/improve hint is returned when the message contains one of the edit
/// keywords, and an empty string when nothing applies.
fn build_code_context(message: &str, language: &str) -> String {
    const FIX_KEYWORDS: [&str; 6] = ["salabo", "labo", "uzlabo", "improve", "fix", "refactor"];

    match language {
        "Next.js (TypeScript)" => {
            return String::from("Atdod pilnu Next.js/TypeScript feature vai app router artefaktu ar pareizu failu struktūru, nevis tikai vienu komponenti.");
        }
        "HTML/CSS/JavaScript" => {
            return String::from("Atdod vienu pilnīgu, uzreiz pārlūkā palaižamu artefaktu ar visu markup, stilu un loģiku bez TODO placeholderiem.");
        }
        "React (TypeScript)" => {
            return String::from("Atdod pilnu React/TypeScript komponenti vai mazu feature moduli ar gatavu UI loģiku, nevis tikai skeletu.");
        }
        _ => {}
    }

    let lowered = message.to_lowercase();
    let asks_for_fix = FIX_KEYWORDS.iter().any(|keyword| lowered.contains(*keyword));
    if asks_for_fix {
        String::from("Ja pieprasījums ir par labojumu vai uzlabojumu, atdod gala kodu ar ielabotu rezultātu, nevis tikai komentāru par pieeju.")
    } else {
        String::new()
    }
}
/// Reports whether the message (compared case-insensitively) contains any of
/// the English or Latvian keywords that indicate an edit of existing code.
fn is_repo_edit_request(message: &str) -> bool {
    const EDIT_KEYWORDS: &[&str] = &[
        "edit", "modify", "update", "refactor", "fix", "patch", "rewrite", "change", "labo",
        "salabo", "rediģē", "uzlabo", "esoš",
    ];

    let lowered = message.to_lowercase();
    for keyword in EDIT_KEYWORDS {
        if lowered.contains(*keyword) {
            return true;
        }
    }
    false
}
/// Walks from the current working directory towards the filesystem root and
/// returns the first directory that looks like the repository root.
///
/// A directory qualifies when it contains `.git`, or when it contains all of
/// the `backend-rust`, `core-python` and `frontend` directories. Returns `None`
/// when the working directory is unavailable or no ancestor qualifies.
fn infer_repo_root() -> Option<String> {
    let start = env::current_dir().ok()?;
    start
        .ancestors()
        .find(|dir| {
            let has_git = dir.join(".git").exists();
            let has_project_layout = || {
                ["backend-rust", "core-python", "frontend"]
                    .iter()
                    .all(|name| dir.join(name).is_dir())
            };
            has_git || has_project_layout()
        })
        .map(|dir| dir.to_string_lossy().into_owned())
}
/// Tries to find the repository directory a code request refers to.
///
/// Every whitespace-separated token is stripped of wrapping punctuation and,
/// when it is an absolute path or starts with `./` or `../`, canonicalized
/// against the filesystem. The first token that resolves wins: a directory is
/// returned as-is, a file yields its parent directory. When no token resolves
/// and the message looks like an edit request, the inferred repository root is
/// returned instead.
///
/// Leading dots are deliberately NOT stripped: removing them would turn `./src`
/// into `/src`, so relative paths could never be recognised and might resolve
/// to an unrelated absolute location. Trailing dots (sentence punctuation) are
/// still removed.
// NOTE(review): the path comes straight from user chat text and is forwarded to
// the code backend; confirm the backend restricts which directories it accepts.
fn infer_repo_path(message: &str) -> Option<String> {
    /// Punctuation that may wrap a path inside prose on either side.
    fn is_wrapping_punctuation(ch: char) -> bool {
        matches!(
            ch,
            '"' | '\'' | '`' | ',' | ';' | ':' | '(' | ')' | '[' | ']' | '{' | '}'
        )
    }

    for token in message.split_whitespace() {
        let trimmed = token
            .trim_start_matches(is_wrapping_punctuation)
            .trim_end_matches(|ch: char| ch == '.' || is_wrapping_punctuation(ch));
        if trimmed.is_empty() {
            continue;
        }

        let candidate = PathBuf::from(trimmed);
        let looks_like_path =
            candidate.is_absolute() || trimmed.starts_with("./") || trimmed.starts_with("../");
        if !looks_like_path {
            continue;
        }
        // Canonicalization fails for paths that do not exist; skip those tokens.
        let Ok(path) = candidate.canonicalize() else {
            continue;
        };

        if path.is_dir() {
            return Some(path.to_string_lossy().into_owned());
        }
        if let Some(parent) = path.parent() {
            return Some(parent.to_string_lossy().into_owned());
        }
    }

    if is_repo_edit_request(message) {
        return infer_repo_root();
    }
    None
}
/// Builds the download link for a generated code bundle, percent-encoding the
/// artifact directory into the `artifact_dir` query parameter.
///
/// Returns `None` when no artifact directory is known.
fn bundle_download_url(workspace_artifact_dir: Option<&str>) -> Option<String> {
    let artifact_dir = workspace_artifact_dir?;
    let encoded_dir = urlencoding::encode(artifact_dir);
    Some(format!("/api/code/download?artifact_dir={}", encoded_dir))
}
/// Trims the user message and rejects it with a validation error when nothing
/// but whitespace remains.
fn normalize_message(message: &str) -> Result<String, MarisError> {
    match message.trim() {
        "" => Err(MarisError::Validation(String::from(
            "Ziņa nedrīkst būt tukša.",
        ))),
        text => Ok(text.to_owned()),
    }
}
/// Trims an optional string and collapses missing, empty, or whitespace-only
/// values to `None`.
fn normalize_optional_text(value: Option<String>) -> Option<String> {
    value
        .map(|raw| raw.trim().to_owned())
        .filter(|text| !text.is_empty())
}
/// Interprets the optional `autonomous_mode` request flag; a missing flag means
/// autonomous mode is off.
fn is_autonomous_mode_enabled(value: Option<bool>) -> bool {
    matches!(value, Some(true))
}
fn force_autonomous_route(
route: OrchestrationRoutePayload,
session_id: Option<&str>,
persona_id: Option<&str>,
) -> OrchestrationRoutePayload {
let mut forced = route;
forced.capability = "autonomous_tasks".to_string();
forced.branch = "planner".to_string();
forced.profile = "planner".to_string();
forced.target_endpoint = "autonomous/start".to_string();
forced.target_studio = "/autonomous".to_string();
forced.reasoning = "Autonomous chat mode ir ieslēgts, tāpēc pieprasījums tiek izpildīts caur planner aģenta plūsmu.".to_string();
forced.confidence = 1.0;
if let Some(session_id) = session_id {
forced.session_context.session_id = session_id.to_string();
}
if let Some(persona_id) = persona_id {
forced.session_context.persona_id = persona_id.to_string();
}
forced
}
/// Determines which image, if any, accompanies the chat request.
///
/// Precedence: a directly supplied `image_url` or `image_base64` (supplying
/// both is a validation error), otherwise the latest snapshot of `camera_id`
/// fetched through the bridge. Returns `Ok(None)` when the request carries no
/// visual input at all.
///
/// # Errors
/// * `MarisError::Validation` when both image fields are set, or when the
///   camera has no snapshot available.
/// * `MarisError::Bridge` when the snapshot request fails.
async fn resolve_vision_input(
    bridge: &PythonBridge,
    req: &ChatRequest,
) -> Result<Option<ResolvedVisionInput>, MarisError> {
    let camera_id = normalize_optional_text(req.camera_id.clone());
    let uploaded_url = normalize_optional_text(req.image_url.clone());
    let uploaded_base64 = normalize_optional_text(req.image_base64.clone());

    match (uploaded_url, uploaded_base64) {
        // Keep this early guard in Rust so `/api/chat` returns a clear validation
        // error before making any downstream bridge calls into the vision service.
        (Some(_), Some(_)) => {
            return Err(MarisError::Validation(String::from(
                "Norādi tikai vienu no image_url vai image_base64.",
            )));
        }
        (None, None) => {}
        (image_url, image_base64) => {
            return Ok(Some(ResolvedVisionInput {
                source: String::from("upload"),
                image_url,
                image_base64,
                camera_id,
            }));
        }
    }

    // No direct upload: fall back to the camera, if the request named one.
    let camera_id = match camera_id {
        Some(id) => id,
        None => return Ok(None),
    };

    let endpoint = format!("vision/live/{camera_id}/snapshot");
    let snapshot = match bridge.get::<LiveSnapshotPayload>(&endpoint).await {
        Ok(snapshot) => snapshot,
        Err(err) => {
            return Err(MarisError::Bridge(format!(
                "Neizdevās ielādēt snapshot kamerai {camera_id}: {err}"
            )));
        }
    };

    let Some(snapshot_data_url) = snapshot.snapshot_data_url else {
        return Err(MarisError::Validation(format!(
            "Kamerai {camera_id} pašlaik nav pieejams snapshot čatam."
        )));
    };

    Ok(Some(ResolvedVisionInput {
        source: String::from("camera_snapshot"),
        image_url: Some(snapshot_data_url),
        image_base64: None,
        camera_id: Some(camera_id),
    }))
}
async fn analyze_vision_context(
bridge: &PythonBridge,
req: &ChatRequest,
) -> Result<Option<ChatVisionContext>, MarisError> {
let Some(input) = resolve_vision_input(bridge, req).await? else {
return Ok(None);
};
let payload = serde_json::json!({
"image_url": input.image_url,
"image_base64": input.image_base64,
"camera_id": input.camera_id,
"session_id": req.session_id,
"confidence_threshold": req.confidence_threshold.unwrap_or(0.25),
});
let result = bridge
.call::<VisionAnalysisPayload>("vision/analyze", &payload)
.await
.map_err(|err| {
error!("Vision analysis failed: {err}");
MarisError::Bridge("Vision analīzes serviss šobrīd nav pieejams.".to_string())
})?;
Ok(Some(ChatVisionContext {
summary: result.summary,
source: input.source,
model: result.model,
fallback_used: result.fallback_used,
camera_id: input.camera_id,
image_url: input.image_url,
width: result.width,
height: result.height,
detections: result.detections.len() as u32,
}))
}
/// Asks the orchestrator which capability should handle the message.
///
/// Routing is best-effort: when the bridge call fails the error is logged as a
/// warning and the default text-chat route is returned instead.
async fn resolve_chat_route(
    bridge: &PythonBridge,
    message: &str,
    session_id: Option<&str>,
    persona_id: Option<&str>,
    has_vision_input: bool,
) -> OrchestrationRoutePayload {
    let request_body = serde_json::json!({
        "message": message,
        "session_id": session_id,
        "persona_id": persona_id,
        "has_vision_input": has_vision_input,
    });

    match bridge
        .call::<OrchestrationRoutePayload>("orchestrator/route", &request_body)
        .await
    {
        Ok(route) => route,
        Err(err) => {
            warn!("Routing bridge call failed, falling back to master route: {err}");
            default_route()
        }
    }
}
/// Executes a code generation request through the `code/generate` endpoint.
///
/// The language, extra context and repository path are inferred from the
/// message. Returns the explanation text (used as the chat reply) together
/// with the `Code` handoff payload.
///
/// # Errors
/// A failed bridge call is logged and reported as a generic
/// `MarisError::Bridge`.
async fn handle_code_handoff(
    bridge: &PythonBridge,
    message: &str,
    fallback_model: Option<&str>,
) -> Result<(String, ChatHandoffPayload), MarisError> {
    let language = infer_code_language(message);
    let context = build_code_context(message, &language);
    let repo_path = infer_repo_path(message);
    let request_body = serde_json::json!({
        "prompt": message,
        "language": language,
        "context": context,
        "repo_path": repo_path,
        "fallback_model": fallback_model,
    });

    let generated = match bridge
        .call::<CodeGenerationPayload>("code/generate", &request_body)
        .await
    {
        Ok(generated) => generated,
        Err(err) => {
            error!("Code handoff failed: {err}");
            return Err(MarisError::Bridge(String::from(
                "Koda ģenerēšanas serviss šobrīd nav pieejams.",
            )));
        }
    };

    // Fall back to a default sentence (naming the inferred language) when the
    // backend did not explain its output.
    let explanation = if generated.explanation.is_empty() {
        format!("Sagatavoju {} draftu pēc tava pieprasījuma.", language)
    } else {
        generated.explanation
    };

    let mut files = Vec::new();
    for file in generated.files {
        files.push(ChatProjectFilePayload {
            path: file.path,
            absolute_path: file.absolute_path,
        });
    }

    let download_url = bundle_download_url(generated.workspace_artifact_dir.as_deref());
    let handoff = ChatHandoffPayload::Code {
        code: generated.code,
        explanation: explanation.clone(),
        language: generated.language,
        detected_stack: generated.detected_stack,
        files,
        workspace_artifact_dir: generated.workspace_artifact_dir,
        bundle_download_url: download_url,
        entrypoint: generated.entrypoint,
        repo_path: generated.repo_path,
    };
    Ok((explanation, handoff))
}
/// Starts an autonomous run for the prepared request via `autonomous/start`.
///
/// Session and persona identifiers come from the request when present and from
/// the route's session context otherwise. The reply summarises the run using
/// the full task count, while the handoff payload lists at most the first four
/// tasks.
///
/// # Errors
/// A failed bridge call is logged and reported as a generic
/// `MarisError::Bridge`.
async fn handle_autonomous_handoff(
    bridge: &PythonBridge,
    prepared: &PreparedChatRequest,
) -> Result<(String, ChatHandoffPayload), MarisError> {
    let route_context = &prepared.route.session_context;
    let session_id = match &prepared.session_id {
        Some(id) => id.clone(),
        None => route_context.session_id.clone(),
    };
    let persona_id = match &prepared.persona_id {
        Some(id) => id.clone(),
        None => route_context.persona_id.clone(),
    };

    let request_body = serde_json::json!({
        "session_id": session_id,
        "goal": prepared.message,
        "max_steps": 8,
        "persona_id": persona_id,
    });
    let run = match bridge
        .call::<AutonomousSessionPayload>("autonomous/start", &request_body)
        .await
    {
        Ok(run) => run,
        Err(err) => {
            error!("Autonomous handoff failed: {err}");
            return Err(MarisError::Bridge(String::from(
                "Autonomās izpildes serviss šobrīd nav pieejams.",
            )));
        }
    };

    let summary = format!(
        "Palaidu autonomo izpildi: {} uzdevumi, progress {}%, statuss {}.",
        run.tasks.len(),
        run.progress_percent,
        run.status
    );

    // Only a short preview of the task list is sent along with the chat reply.
    let mut tasks = Vec::new();
    for task in run.tasks.iter().take(4) {
        tasks.push(ChatAutonomousTaskPayload {
            id: task.id.clone(),
            description: task.description.clone(),
            status: task.status.clone(),
            tool: task.tool.clone(),
            result: task.result.clone(),
        });
    }

    let handoff = ChatHandoffPayload::Autonomous {
        session_id,
        status: run.status,
        progress_percent: run.progress_percent,
        persona_id: run.persona_id,
        persona_title: run.persona_title,
        tasks,
    };
    Ok((summary, handoff))
}
/// Requests a 1024x1024, 30-step image for the message via `images/generate`
/// and returns a fixed summary sentence plus the `Image` handoff payload.
///
/// # Errors
/// A failed bridge call is logged and reported as a generic
/// `MarisError::Bridge`.
async fn handle_image_handoff(
    bridge: &PythonBridge,
    message: &str,
) -> Result<(String, ChatHandoffPayload), MarisError> {
    let request_body = serde_json::json!({
        "prompt": message,
        "width": 1024,
        "height": 1024,
        "steps": 30,
    });

    let generated = match bridge
        .call::<ImageGenerationPayload>("images/generate", &request_body)
        .await
    {
        Ok(generated) => generated,
        Err(err) => {
            error!("Image handoff failed: {err}");
            return Err(MarisError::Bridge(String::from(
                "Attēlu ģenerēšanas serviss šobrīd nav pieejams.",
            )));
        }
    };

    let summary = String::from("Sagatavoju attēla konceptu pēc tava pieprasījuma.");
    let handoff = ChatHandoffPayload::Image {
        image_url: generated.image_url,
        prompt: message.to_owned(),
    };
    Ok((summary, handoff))
}
/// Requests a 30-second pop track for the message via `audio/generate_music`
/// and returns a summary naming the track title plus the `Music` handoff payload.
///
/// # Errors
/// A failed bridge call is logged and reported as a generic
/// `MarisError::Bridge`.
async fn handle_music_handoff(
    bridge: &PythonBridge,
    message: &str,
) -> Result<(String, ChatHandoffPayload), MarisError> {
    let request_body = serde_json::json!({
        "prompt": message,
        "genre": "pop",
        "duration_seconds": 30,
    });

    let track = match bridge
        .call::<MusicGenerationPayload>("audio/generate_music", &request_body)
        .await
    {
        Ok(track) => track,
        Err(err) => {
            error!("Music handoff failed: {err}");
            return Err(MarisError::Bridge(String::from(
                "Mūzikas ģenerēšanas serviss šobrīd nav pieejams.",
            )));
        }
    };

    let summary = format!("Sagatavoju mūzikas draftu: {}.", track.title);
    let handoff = ChatHandoffPayload::Music {
        audio_url: track.audio_url,
        title: track.title,
        duration_seconds: track.duration_seconds,
    };
    Ok((summary, handoff))
}
/// Requests a 5-second, 24 fps, 720p clip for the message via `video/generate`
/// and returns a fixed summary sentence plus the `Video` handoff payload.
///
/// # Errors
/// A failed bridge call is logged and reported as a generic
/// `MarisError::Bridge`.
async fn handle_video_handoff(
    bridge: &PythonBridge,
    message: &str,
) -> Result<(String, ChatHandoffPayload), MarisError> {
    let request_body = serde_json::json!({
        "prompt": message,
        "duration_seconds": 5,
        "fps": 24,
        "resolution": "720p",
    });

    let clip = match bridge
        .call::<VideoGenerationPayload>("video/generate", &request_body)
        .await
    {
        Ok(clip) => clip,
        Err(err) => {
            error!("Video handoff failed: {err}");
            return Err(MarisError::Bridge(String::from(
                "Video ģenerēšanas serviss šobrīd nav pieejams.",
            )));
        }
    };

    let summary = String::from("Sagatavoju video draftu pēc tava pieprasījuma.");
    let handoff = ChatHandoffPayload::Video {
        video_url: clip.video_url,
        duration_seconds: clip.duration_seconds,
        prompt: message.to_owned(),
    };
    Ok((summary, handoff))
}
/// Validates a raw chat request and resolves everything needed to execute it.
///
/// Steps, in order: trim/validate the message, normalize the optional
/// identifiers, run vision analysis (if the request has visual input), ask the
/// orchestrator for a route, and finally override the route when autonomous
/// mode is enabled.
///
/// `fallback_model` is normalized like the other optional text fields (blank
/// values become `None`), matching what the streaming handler does.
///
/// # Errors
/// Returns the validation error for an empty message, or any error raised by
/// the vision analysis. Routing failures do not error; they fall back to the
/// default route.
async fn prepare_chat_request(req: ChatRequest) -> Result<PreparedChatRequest, MarisError> {
    let bridge = PythonBridge::new();
    let message = normalize_message(&req.message)?;
    let session_id = normalize_optional_text(req.session_id.clone());
    let persona_id = normalize_optional_text(req.persona_id.clone());

    let vision_context = analyze_vision_context(&bridge, &req).await?;
    let route = resolve_chat_route(
        &bridge,
        &message,
        session_id.as_deref(),
        persona_id.as_deref(),
        vision_context.is_some(),
    )
    .await;
    let route = if is_autonomous_mode_enabled(req.autonomous_mode) {
        force_autonomous_route(route, session_id.as_deref(), persona_id.as_deref())
    } else {
        route
    };

    Ok(PreparedChatRequest {
        message,
        history: req.history.unwrap_or_default(),
        session_id,
        persona_id,
        // A blank model name would otherwise reach the backend as a real value.
        fallback_model: normalize_optional_text(req.fallback_model),
        route,
        vision_context,
    })
}
async fn execute_prepared_chat(
prepared: &PreparedChatRequest,
) -> Result<ChatExecutionResult, MarisError> {
let bridge = PythonBridge::new();
match prepared.route.capability.as_str() {
"code_generation" => {
let (summary, handoff) = handle_code_handoff(
&bridge,
&prepared.message,
prepared.fallback_model.as_deref(),
)
.await?;
let (detected_emotion, emotion_confidence, response_style) = default_emotion_metadata();
Ok(ChatExecutionResult {
response: summary,
model: fallback_chat_model(&prepared.route.capability),
handoff: Some(handoff),
vision_context: None,
detected_emotion,
emotion_confidence,
response_style,
persona_id: None,
persona_title: None,
persona_summary: None,
request_id: None,
session_id: None,
latency_ms: None,
tokens_used: None,
prompt_messages: None,
memory_matches: None,
tool_trace: None,
})
}
"autonomous_tasks" => {
let (summary, handoff) = handle_autonomous_handoff(&bridge, prepared).await?;
let (detected_emotion, emotion_confidence, response_style) = default_emotion_metadata();
Ok(ChatExecutionResult {
response: summary,
model: fallback_chat_model(&prepared.route.capability),
handoff: Some(handoff),
vision_context: None,
detected_emotion,
emotion_confidence,
response_style,
persona_id: prepared.persona_id.clone(),
persona_title: None,
persona_summary: None,
request_id: None,
session_id: prepared.session_id.clone(),
latency_ms: None,
tokens_used: None,
prompt_messages: None,
memory_matches: None,
tool_trace: None,
})
}
"image_generation" => {
let (summary, handoff) = handle_image_handoff(&bridge, &prepared.message).await?;
let (detected_emotion, emotion_confidence, response_style) = default_emotion_metadata();
Ok(ChatExecutionResult {
response: summary,
model: fallback_chat_model(&prepared.route.capability),
handoff: Some(handoff),
vision_context: None,
detected_emotion,
emotion_confidence,
response_style,
persona_id: None,
persona_title: None,
persona_summary: None,
request_id: None,
session_id: None,
latency_ms: None,
tokens_used: None,
prompt_messages: None,
memory_matches: None,
tool_trace: None,
})
}
"music_generation" => {
let (summary, handoff) = handle_music_handoff(&bridge, &prepared.message).await?;
let (detected_emotion, emotion_confidence, response_style) = default_emotion_metadata();
Ok(ChatExecutionResult {
response: summary,
model: fallback_chat_model(&prepared.route.capability),
handoff: Some(handoff),
vision_context: None,
detected_emotion,
emotion_confidence,
response_style,
persona_id: None,
persona_title: None,
persona_summary: None,
request_id: None,
session_id: None,
latency_ms: None,
tokens_used: None,
prompt_messages: None,
memory_matches: None,
tool_trace: None,
})
}
"video_generation" => {
let (summary, handoff) = handle_video_handoff(&bridge, &prepared.message).await?;
let (detected_emotion, emotion_confidence, response_style) = default_emotion_metadata();
Ok(ChatExecutionResult {
response: summary,
model: fallback_chat_model(&prepared.route.capability),
handoff: Some(handoff),
vision_context: None,
detected_emotion,
emotion_confidence,
response_style,
persona_id: None,
persona_title: None,
persona_summary: None,
request_id: None,
session_id: None,
latency_ms: None,
tokens_used: None,
prompt_messages: None,
memory_matches: None,
tool_trace: None,
})
}
_ => {
let payload = build_text_generation_payload(prepared);
let result = bridge
.call::<TextGenerationPayload>("text/generate", &payload)
.await
.map_err(|err| {
error!("Chat bridge call failed: {err}");
MarisError::Bridge(
"Čata serviss šobrīd nav pieejams. Lūdzu mēģiniet vēlreiz.".to_string(),
)
})?;
Ok(ChatExecutionResult {
response: result.response,
model: if result.model.is_empty() {
fallback_chat_model(&prepared.route.capability)
} else {
result.model
},
handoff: None,
vision_context: prepared.vision_context.clone(),
detected_emotion: result.detected_emotion,
emotion_confidence: result.emotion_confidence,
response_style: result.response_style,
persona_id: (!result.persona_id.is_empty()).then_some(result.persona_id),
persona_title: (!result.persona_title.is_empty()).then_some(result.persona_title),
persona_summary: (!result.persona_summary.is_empty())
.then_some(result.persona_summary),
request_id: (!result.request_id.is_empty()).then_some(result.request_id),
session_id: (!result.session_id.is_empty()).then_some(result.session_id),
latency_ms: (result.latency_ms > 0).then_some(result.latency_ms),
tokens_used: (result.tokens_used > 0).then_some(result.tokens_used),
prompt_messages: (result.prompt_messages > 0).then_some(result.prompt_messages),
memory_matches: (result.memory_matches > 0).then_some(result.memory_matches),
tool_trace: result.tool_trace,
})
}
}
}
/// Builds the JSON body sent to the text generation endpoints (both the plain
/// and the streaming variant) from a prepared request.
///
/// `vision_context` is `null` when the request had no visual input.
fn build_text_generation_payload(prepared: &PreparedChatRequest) -> serde_json::Value {
    let vision_context = match prepared.vision_context.as_ref() {
        Some(context) => Some(serde_json::json!({
            "summary": &context.summary,
            "source": &context.source,
            "model": &context.model,
            "fallback_used": context.fallback_used,
            "camera_id": &context.camera_id,
            "image_url": &context.image_url,
            "width": context.width,
            "height": context.height,
            "detections": context.detections,
        })),
        None => None,
    };

    serde_json::json!({
        "message": &prepared.message,
        "history": &prepared.history,
        "profile": &prepared.route.profile,
        "persona_id": &prepared.persona_id,
        "session_id": &prepared.session_id,
        "fallback_model": &prepared.fallback_model,
        "vision_context": vision_context,
    })
}
/// Parses one server-sent-event frame from the core text stream.
///
/// Recognised fields are `event:` (defaults to `message`) and `data:`; multiple
/// `data:` lines are joined with `\n` before JSON decoding. Lines beginning
/// with `:` are comments and are skipped individually, so a frame that starts
/// with a keep-alive comment but also carries a real event is still delivered.
///
/// Returns `Ok(None)` for frames without data, for unknown event names, and
/// for `delta` events that carry no text.
///
/// # Errors
/// `MarisError::Bridge` when the JSON payload of a `delta`, `complete` or
/// `error` event cannot be decoded, or when a `complete` payload fails
/// validation.
fn parse_core_text_stream_event(
    raw_event: &str,
) -> Result<Option<CoreTextStreamEvent>, MarisError> {
    let normalized = if raw_event.contains("\r\n") {
        Cow::Owned(raw_event.replace("\r\n", "\n"))
    } else {
        Cow::Borrowed(raw_event)
    };

    let mut event_name = "message";
    let mut data_lines = Vec::new();
    for line in normalized.lines() {
        if line.starts_with(':') {
            // SSE comment line (e.g. a keep-alive ping): ignore just this line.
            continue;
        }
        if let Some(value) = line.strip_prefix("event:") {
            event_name = value.trim();
        } else if let Some(value) = line.strip_prefix("data:") {
            data_lines.push(value.trim_start());
        }
    }
    if data_lines.is_empty() {
        return Ok(None);
    }

    let payload = data_lines.join("\n");
    match event_name {
        "delta" => {
            let parsed: CoreTextDeltaPayload = serde_json::from_str(&payload).map_err(|err| {
                MarisError::Bridge(format!("Neizdevās parsēt core text delta eventu: {err}"))
            })?;
            Ok(parsed.delta.map(CoreTextStreamEvent::Delta))
        }
        "complete" => {
            let parsed: TextGenerationPayload = serde_json::from_str(&payload).map_err(|err| {
                MarisError::Bridge(format!("Neizdevās parsēt core text complete eventu: {err}"))
            })?;
            let validated = parsed.validate().map_err(|err| {
                MarisError::Bridge(format!("Core text stream atgrieza nederīgu payload: {err}"))
            })?;
            Ok(Some(CoreTextStreamEvent::Complete(Box::new(validated))))
        }
        "error" => {
            let parsed: CoreTextErrorPayload = serde_json::from_str(&payload).map_err(|err| {
                MarisError::Bridge(format!("Neizdevās parsēt core text error eventu: {err}"))
            })?;
            Ok(Some(CoreTextStreamEvent::Error(
                parsed
                    .error
                    .unwrap_or_else(|| "Core text stream kļūda.".to_string()),
            )))
        }
        _ => Ok(None),
    }
}
/// Removes and returns the next complete SSE frame from `buffer`, or `None`
/// when no blank-line terminator has been received yet.
///
/// Line endings are normalized to `\n` (`\r\n` and lone `\r` are both
/// accepted). A `\r` at the very end of the buffer is left untouched until
/// more data arrives: it may be the first half of a `\r\n` pair split across
/// two network chunks, and converting it early would fabricate a blank line
/// (and thus a bogus frame boundary) once the `\n` shows up.
///
/// The returned frame excludes the terminating blank line.
fn take_next_core_text_stream_frame(buffer: &mut String) -> Option<String> {
    // Hold back a trailing CR whose meaning is still ambiguous.
    let deferred_cr = buffer.ends_with('\r');
    if deferred_cr {
        buffer.pop();
    }

    if buffer.contains('\r') {
        *buffer = buffer.replace("\r\n", "\n").replace('\r', "\n");
    }

    let frame = buffer.find("\n\n").map(|index| {
        let frame = buffer[..index].to_string();
        // Drop the frame and its terminator in place, keeping the allocation.
        buffer.drain(..index + 2);
        frame
    });

    if deferred_cr {
        buffer.push('\r');
    }
    frame
}
/// Converts a text generation payload into a `ChatExecutionResult`.
///
/// Empty strings and zero counters coming from the backend are treated as
/// "not provided" and mapped to `None`; an empty model name is replaced by the
/// fallback label for the route's capability. The request's vision context is
/// attached unchanged.
fn build_stream_text_chat_result(
    prepared: &PreparedChatRequest,
    result: TextGenerationPayload,
) -> ChatExecutionResult {
    let non_empty = |text: String| Some(text).filter(|text| !text.is_empty());
    let positive = |count: u32| Some(count).filter(|count| *count > 0);

    let model = match result.model.is_empty() {
        true => fallback_chat_model(&prepared.route.capability),
        false => result.model,
    };

    ChatExecutionResult {
        response: result.response,
        model,
        handoff: None,
        vision_context: prepared.vision_context.clone(),
        detected_emotion: result.detected_emotion,
        emotion_confidence: result.emotion_confidence,
        response_style: result.response_style,
        persona_id: non_empty(result.persona_id),
        persona_title: non_empty(result.persona_title),
        persona_summary: non_empty(result.persona_summary),
        request_id: non_empty(result.request_id),
        session_id: non_empty(result.session_id),
        latency_ms: positive(result.latency_ms),
        tokens_used: positive(result.tokens_used),
        prompt_messages: positive(result.prompt_messages),
        memory_matches: positive(result.memory_matches),
        tool_trace: result.tool_trace,
    }
}
/// Moves every fully received UTF-8 character out of `pending` and returns the
/// decoded text.
///
/// An incomplete multi-byte sequence at the end of `pending` is left in place
/// so it can be completed by the next network chunk. Bytes that are invalid
/// regardless of what follows are decoded lossily (as U+FFFD).
fn drain_complete_utf8(pending: &mut Vec<u8>) -> String {
    let mut checked = 0;
    let complete_len = loop {
        match std::str::from_utf8(&pending[checked..]) {
            Ok(_) => break pending.len(),
            Err(err) => match err.error_len() {
                // Input ended in the middle of a character: keep the tail.
                None => break checked + err.valid_up_to(),
                // Genuinely invalid bytes: skip past them and keep scanning.
                Some(invalid_len) => checked += err.valid_up_to() + invalid_len,
            },
        }
    };
    let text = String::from_utf8_lossy(&pending[..complete_len]).into_owned();
    pending.drain(..complete_len);
    text
}

/// Streams a text reply from `text/generate/stream` to the client.
///
/// Each `delta` event from the core stream is forwarded as an SSE `delta`
/// event on `tx`. The function returns:
/// * `Ok(Some(result))` once the `complete` event arrives,
/// * `Ok(None)` when the client went away (channel closed or send failed),
/// * `Err(..)` when the stream cannot be opened or read, reports an `error`
///   event, delivers an unparsable event, or ends without a `complete` event.
///
/// Incoming bytes are decoded incrementally so that a multi-byte UTF-8
/// character split across two network chunks is reassembled instead of being
/// replaced by U+FFFD.
async fn stream_text_chat_response(
    bridge: &PythonBridge,
    prepared: &PreparedChatRequest,
    tx: &mpsc::Sender<Result<Event, Infallible>>,
) -> Result<Option<ChatExecutionResult>, MarisError> {
    let payload = build_text_generation_payload(prepared);
    let response = bridge
        .stream("text/generate/stream", &payload)
        .await
        .map_err(|err| {
            error!("Chat bridge stream call failed: {err}");
            MarisError::Bridge(
                "Čata serviss šobrīd nav pieejams. Lūdzu mēģiniet vēlreiz.".to_string(),
            )
        })?;
    let mut stream = response.bytes_stream();
    // Raw bytes not yet decoded (at most one incomplete character after draining).
    let mut pending_bytes: Vec<u8> = Vec::new();
    // Decoded text that has not yet formed a complete SSE frame.
    let mut buffer = String::new();
    while let Some(next_chunk) = tokio::select! {
        // Stop reading from the backend as soon as the client disconnects.
        _ = tx.closed() => return Ok(None),
        next = stream.next() => next,
    } {
        let chunk = next_chunk.map_err(|err| {
            MarisError::Bridge(format!("Neizdevās nolasīt core text straumi: {err}"))
        })?;
        pending_bytes.extend_from_slice(&chunk);
        buffer.push_str(&drain_complete_utf8(&mut pending_bytes));
        while let Some(raw_event) = take_next_core_text_stream_frame(&mut buffer) {
            match parse_core_text_stream_event(&raw_event)? {
                Some(CoreTextStreamEvent::Delta(delta)) => {
                    if !try_send_stream_event(
                        tx,
                        build_sse_event("delta", &serde_json::json!({ "delta": delta })),
                    )
                    .await
                    {
                        return Ok(None);
                    }
                }
                Some(CoreTextStreamEvent::Complete(payload)) => {
                    return Ok(Some(build_stream_text_chat_result(prepared, *payload)));
                }
                Some(CoreTextStreamEvent::Error(message)) => {
                    return Err(MarisError::Bridge(message));
                }
                None => {}
            }
        }
    }
    // The stream ended; whatever bytes are left can no longer be completed.
    if !pending_bytes.is_empty() {
        buffer.push_str(&String::from_utf8_lossy(&pending_bytes));
    }
    // A final frame may lack its terminating blank line.
    if !buffer.trim().is_empty() {
        match parse_core_text_stream_event(&buffer)? {
            Some(CoreTextStreamEvent::Delta(delta)) => {
                if !try_send_stream_event(
                    tx,
                    build_sse_event("delta", &serde_json::json!({ "delta": delta })),
                )
                .await
                {
                    return Ok(None);
                }
            }
            Some(CoreTextStreamEvent::Complete(payload)) => {
                return Ok(Some(build_stream_text_chat_result(prepared, *payload)));
            }
            Some(CoreTextStreamEvent::Error(message)) => {
                return Err(MarisError::Bridge(message));
            }
            None => {}
        }
    }
    Err(MarisError::Bridge(
        "Core text straume noslēdzās bez final payload.".to_string(),
    ))
}
fn build_chat_response(
route: OrchestrationRoutePayload,
chat_result: ChatExecutionResult,
) -> ChatResponse {
ChatResponse {
response: chat_result.response,
model: chat_result.model,
route: ChatRouteInfo {
capability: route.capability,
branch: route.branch,
profile: route.profile,
studio: route.target_studio,
reasoning: route.reasoning,
confidence: route.confidence,
},
handoff: chat_result.handoff,
vision_context: chat_result.vision_context,
detected_emotion: chat_result.detected_emotion,
emotion_confidence: chat_result.emotion_confidence,
response_style: chat_result.response_style,
persona_id: chat_result.persona_id,
persona_title: chat_result.persona_title,
persona_summary: chat_result.persona_summary,
request_id: chat_result.request_id,
session_id: chat_result.session_id,
latency_ms: chat_result.latency_ms,
tokens_used: chat_result.tokens_used,
prompt_messages: chat_result.prompt_messages,
memory_matches: chat_result.memory_matches,
tool_trace: chat_result.tool_trace,
}
}
/// Creates a named SSE event whose data is `payload` serialized as JSON.
///
/// If serialization fails the data falls back to an empty JSON object (`{}`).
fn build_sse_event<T: Serialize>(event_name: &str, payload: &T) -> Event {
    let data = match serde_json::to_string(payload) {
        Ok(json) => json,
        Err(_) => String::from("{}"),
    };
    Event::default().event(event_name).data(data)
}
/// Creates a `status` SSE event carrying the given stage and message.
fn build_status_event(stage: &str, message: &str) -> Event {
    let status = ChatStatusPayload {
        stage: stage.to_owned(),
        message: message.to_owned(),
    };
    build_sse_event("status", &status)
}
/// Sends an event to the SSE channel and reports whether it was accepted.
///
/// `false` means the send failed, so the caller should stop producing events.
async fn try_send_stream_event(tx: &mpsc::Sender<Result<Event, Infallible>>, event: Event) -> bool {
    let delivery = tx.send(Ok(event)).await;
    delivery.is_ok()
}
/// Splits `text` into chunks of roughly `CHAT_STREAM_CHUNK_CHARS` characters
/// for replaying an already generated reply as a stream of deltas.
///
/// A chunk is flushed once it reaches the limit. When the chunk contains
/// whitespace, the split happens right after the last whitespace character so
/// words stay intact and the unfinished word is carried into the next chunk;
/// otherwise the chunk is cut at the limit. Concatenating the returned chunks
/// always reproduces `text` exactly. Empty input yields no chunks.
///
/// The limit is measured in characters, not UTF-8 bytes, so text with
/// multi-byte characters (e.g. Latvian diacritics) gets chunks of the same
/// visible length as ASCII text.
fn chunk_response_for_streaming(text: &str) -> Vec<String> {
    let mut chunks = Vec::new();
    let mut current = String::new();
    let mut current_chars = 0usize;
    // (byte offset, character count) just past the most recent whitespace.
    let mut last_boundary: Option<(usize, usize)> = None;

    for ch in text.chars() {
        current.push(ch);
        current_chars += 1;
        if ch.is_whitespace() {
            last_boundary = Some((current.len(), current_chars));
        }
        if current_chars < CHAT_STREAM_CHUNK_CHARS {
            continue;
        }

        // A boundary at the very end means the chunk already ends on whitespace,
        // so it can be flushed whole.
        let split_at = last_boundary
            .take()
            .filter(|(byte_offset, _)| *byte_offset < current.len());
        match split_at {
            Some((byte_offset, char_count)) => {
                let remainder = current.split_off(byte_offset);
                chunks.push(std::mem::replace(&mut current, remainder));
                current_chars -= char_count;
            }
            None => {
                chunks.push(std::mem::take(&mut current));
                current_chars = 0;
            }
        }
    }

    if !current.is_empty() {
        chunks.push(current);
    }
    chunks
}
/// Turns a finished response text into a queue of `delta` SSE events, one per
/// chunk produced by `chunk_response_for_streaming`, in original text order.
fn build_stream_delta_events(response: &str) -> VecDeque<Event> {
    let mut events = VecDeque::new();
    for delta in chunk_response_for_streaming(response) {
        let payload = serde_json::json!({ "delta": delta });
        events.push_back(build_sse_event("delta", &payload));
    }
    events
}
/// Handles a non-streaming chat request.
///
/// The request is prepared, executed, and the combined route and result are
/// returned as a single JSON `ChatResponse`.
///
/// # Errors
///
/// Propagates any error raised while preparing or executing the request.
pub async fn chat(Json(req): Json<ChatRequest>) -> Result<Json<ChatResponse>, MarisError> {
    let prepared = prepare_chat_request(req).await?;
    let outcome = execute_prepared_chat(&prepared).await?;
    let body = build_chat_response(prepared.route, outcome);
    Ok(Json(body))
}
pub async fn chat_stream(
Json(req): Json<ChatRequest>,
) -> Result<Sse<impl futures_util::Stream<Item = Result<Event, Infallible>>>, MarisError> {
let message = normalize_message(&req.message)?;
let (tx, rx) = mpsc::channel::<Result<Event, Infallible>>(32);
let history = req.history.clone().unwrap_or_default();
tokio::spawn(async move {
let bridge = PythonBridge::new();
let session_id = normalize_optional_text(req.session_id.clone());
let persona_id = normalize_optional_text(req.persona_id.clone());
let autonomous_mode = is_autonomous_mode_enabled(req.autonomous_mode);
let fallback_model = normalize_optional_text(req.fallback_model.clone());
if !try_send_stream_event(
&tx,
build_status_event("presence", "Savienojums izveidots — Maris ir klāt."),
)
.await
{
return;
}
let has_vision_input = req.image_url.is_some()
|| req.image_base64.is_some()
|| req
.camera_id
.as_ref()
.is_some_and(|camera_id| !camera_id.trim().is_empty());
if has_vision_input
&& !try_send_stream_event(
&tx,
build_status_event("vision", "Analizēju vizuālo kontekstu."),
)
.await
{
return;
}
let vision_context = match analyze_vision_context(&bridge, &req).await {
Ok(vision_context) => vision_context,
Err(err) => {
let _ = try_send_stream_event(
&tx,
build_sse_event("error", &serde_json::json!({ "error": err.to_string() })),
)
.await;
return;
}
};
if let Some(vision_context) = vision_context.as_ref() {
if !try_send_stream_event(&tx, build_sse_event("vision_context", vision_context)).await
{
return;
}
}
if !try_send_stream_event(
&tx,
build_status_event("routing", "Izvēlos labāko atbildes režīmu."),
)
.await
{
return;
}
let route = resolve_chat_route(
&bridge,
&message,
session_id.as_deref(),
persona_id.as_deref(),
vision_context.is_some(),
)
.await;
let route = if autonomous_mode {
force_autonomous_route(route, session_id.as_deref(), persona_id.as_deref())
} else {
route
};
if !try_send_stream_event(&tx, build_sse_event("route", &route)).await {
return;
}
if !try_send_stream_event(
&tx,
build_status_event("generation", generation_status_message(&route.capability)),
)
.await
{
return;
}
let prepared = PreparedChatRequest {
message,
history,
session_id,
persona_id,
fallback_model,
route,
vision_context,
};
if prepared.route.capability == "text_chat" {
match stream_text_chat_response(&bridge, &prepared, &tx).await {
Ok(Some(chat_result)) => {
let response = build_chat_response(prepared.route, chat_result);
let _ =
try_send_stream_event(&tx, build_sse_event("complete", &response)).await;
}
Ok(None) => {}
Err(err) => {
let _ = try_send_stream_event(
&tx,
build_sse_event("error", &serde_json::json!({ "error": err.to_string() })),
)
.await;
}
}
return;
}
match execute_prepared_chat(&prepared).await {
Ok(chat_result) => {
let response = build_chat_response(prepared.route, chat_result);
for delta in build_stream_delta_events(&response.response) {
if !try_send_stream_event(&tx, delta).await {
return;
}
tokio::time::sleep(Duration::from_millis(CHAT_STREAM_INTERVAL_MS)).await;
}
let _ = try_send_stream_event(&tx, build_sse_event("complete", &response)).await;
}
Err(err) => {
let _ = try_send_stream_event(
&tx,
build_sse_event("error", &serde_json::json!({ "error": err.to_string() })),
)
.await;
}
}
});
let stream = stream::unfold(rx, |mut rx| async move {
rx.recv().await.map(|event| (event, rx))
});
Ok(Sse::new(stream).keep_alive(
KeepAlive::new()
.interval(Duration::from_secs(10))
.text("keepalive"),
))
}
#[cfg(test)]
mod tests {
    use super::{
        chunk_response_for_streaming, generation_status_message, normalize_message,
        parse_core_text_stream_event, take_next_core_text_stream_frame, CoreTextStreamEvent,
    };
    use crate::utils::errors::MarisError;

    /// A whitespace-only message is rejected as a validation error.
    #[test]
    fn rejects_empty_messages() {
        let rejection = normalize_message(" ").unwrap_err();
        assert!(matches!(rejection, MarisError::Validation(_)));
        assert_eq!(
            rejection.to_string(),
            "Derīguma kļūda: Ziņa nedrīkst būt tukša."
        );
    }

    /// Surrounding whitespace is stripped from an otherwise valid message.
    #[test]
    fn trims_valid_messages() {
        let normalized = normalize_message(" Sveiki, Maris! ").unwrap();
        assert_eq!(normalized, "Sveiki, Maris!");
    }

    /// Chunking splits long text into several pieces that concatenate back to
    /// the original.
    #[test]
    fn chunks_streaming_response_without_losing_text() {
        let text =
            "Šī ir garāka atbilde ar vairākiem vārdiem, lai pārbaudītu straumēšanas chunkus.";
        let pieces = chunk_response_for_streaming(text);
        assert!(pieces.len() >= 2);
        assert_eq!(pieces.concat(), text);
    }

    /// The generation status text depends on the routed capability.
    #[test]
    fn maps_generation_status_by_capability() {
        let code_status = generation_status_message("code_generation");
        assert_eq!(code_status, "Veidoju koda draftu.");

        let text_status = generation_status_message("text_chat");
        assert_eq!(text_status, "Domāju un veidoju atbildi.");
    }

    /// A `delta` frame parses into a delta event carrying its text.
    #[test]
    fn parses_core_text_delta_event() {
        let parsed = parse_core_text_stream_event("event: delta\ndata: {\"delta\":\"Sveiki\"}\n")
            .unwrap()
            .unwrap();
        let CoreTextStreamEvent::Delta(delta) = parsed else {
            panic!("expected delta event");
        };
        assert_eq!(delta, "Sveiki");
    }

    /// A `complete` frame parses into a complete event with its payload fields.
    #[test]
    fn parses_core_text_complete_event() {
        let parsed = parse_core_text_stream_event(
            "event: complete\ndata: {\"response\":\"Sveiki\",\"model\":\"m\",\"detected_emotion\":\"neutral\",\"emotion_confidence\":0.1,\"response_style\":\"clear_grounded\"}\n",
        )
        .unwrap()
        .unwrap();
        let CoreTextStreamEvent::Complete(payload) = parsed else {
            panic!("expected complete event");
        };
        assert_eq!(payload.response, "Sveiki");
        assert_eq!(payload.model, "m");
    }

    /// Two CRLF-delimited frames are extracted one after another, leaving the
    /// buffer empty, and each parses into the expected event.
    #[test]
    fn extracts_multiple_core_text_events_with_crlf_delimiters() {
        let mut pending = String::from(concat!(
            "event: delta\r\n",
            "data: {\"delta\":\"Sveiki\"}\r\n\r\n",
            "event: complete\r\n",
            "data: {\"response\":\"Sveiki\",\"model\":\"m\",\"detected_emotion\":\"neutral\",\"emotion_confidence\":0.1,\"response_style\":\"clear_grounded\"}\r\n\r\n"
        ));

        let delta_frame = take_next_core_text_stream_frame(&mut pending).expect("first event");
        let complete_frame = take_next_core_text_stream_frame(&mut pending).expect("second event");
        assert!(pending.is_empty());

        let delta_event = parse_core_text_stream_event(&delta_frame).unwrap();
        assert!(matches!(
            delta_event,
            Some(CoreTextStreamEvent::Delta(delta)) if delta == "Sveiki"
        ));

        let complete_event = parse_core_text_stream_event(&complete_frame).unwrap();
        assert!(matches!(
            complete_event,
            Some(CoreTextStreamEvent::Complete(payload)) if payload.response == "Sveiki"
        ));
    }
}