/// PythonBridge — nosūta HTTP pieprasījumus uz core-python FastAPI servisu. use anyhow::{anyhow, Result}; use reqwest::{Client, Response}; use serde::de::DeserializeOwned; use serde_json::Value; use std::time::Duration; use tracing::warn; use crate::inference::response_validator::ValidateCoreResponse; pub struct PythonBridge { client: Client, base_urls: Vec, request_timeout: Duration, router_timeout: Duration, } const DEFAULT_CONNECT_TIMEOUT_SECS: u64 = 5; const DEFAULT_REQUEST_TIMEOUT_SECS: u64 = 45; const DEFAULT_ROUTER_TIMEOUT_SECS: u64 = 8; const DEFAULT_CORE_PYTHON_URL: &str = "http://localhost:8000"; const RAILWAY_PUBLIC_SUFFIX: &str = ".up.railway.app"; const RAILWAY_PRIVATE_SUFFIX: &str = ".railway.internal"; const BACKEND_SERVICE_NAME: &str = "backend"; const CORE_PYTHON_SERVICE_NAME: &str = "core-python"; fn env_u64_or_default(key: &str, default: u64) -> u64 { std::env::var(key) .ok() .and_then(|value| value.trim().parse::().ok()) .filter(|value| *value > 0) .unwrap_or(default) } /// Parse an optional timeout from the environment. /// /// Returns `Some(Duration)` when the variable contains a positive integer value /// in seconds, otherwise returns `None` for missing, invalid, or zero values. fn env_optional_duration(key: &str) -> Option { std::env::var(key) .ok() .and_then(|value| value.trim().parse::().ok()) .filter(|value| *value > 0) .map(Duration::from_secs) } fn resolve_request_timeout() -> Duration { env_optional_duration("CORE_PYTHON_TIMEOUT_SECS") .unwrap_or_else(|| Duration::from_secs(DEFAULT_REQUEST_TIMEOUT_SECS)) } fn resolve_router_timeout(request_timeout: Duration) -> Duration { env_optional_duration("CORE_PYTHON_ROUTER_TIMEOUT_SECS") .unwrap_or_else(|| Duration::from_secs(DEFAULT_ROUTER_TIMEOUT_SECS)) .min(request_timeout) } fn first_non_empty_env_value(keys: &[&str]) -> Option { keys.iter() .find_map(|key| std::env::var(key).ok()) .map(|value| value.trim().to_string()) .filter(|value| !value.is_empty()) } fn strip_url_scheme(value: &str) -> &str { value .strip_prefix("https://") .or_else(|| value.strip_prefix("http://")) .or_else(|| value.strip_prefix("wss://")) .or_else(|| value.strip_prefix("ws://")) .unwrap_or(value) } fn replace_service_token(label: &str, source: &str, target: &str) -> Option { let mut replaced = false; let tokens = label .split('-') .map(|token| { if !replaced && token == source { replaced = true; target } else { token } }) .collect::>(); replaced.then(|| tokens.join("-")) } fn label_contains_service(label: &str, service: &str) -> bool { label == service || label.starts_with(&format!("{service}-")) || label.ends_with(&format!("-{service}")) || label.contains(&format!("-{service}-")) } fn infer_railway_sibling_hostname(domain: &str, source: &str, target: &str) -> Option { let hostname = strip_url_scheme(domain.trim()) .trim_end_matches('/') .trim() .to_string(); let (label, rest) = hostname.split_once('.')?; let replaced = replace_service_token(label, source, target)?; Some(format!("{replaced}.{rest}")) } fn infer_private_core_python_url() -> Option { first_non_empty_env_value(&["RAILWAY_PRIVATE_DOMAIN"]) .and_then(|domain| { infer_railway_sibling_hostname(&domain, BACKEND_SERVICE_NAME, CORE_PYTHON_SERVICE_NAME) }) .map(|hostname| format!("http://{hostname}")) } fn is_public_railway_core_python_url(base_url: &str) -> bool { let hostname = strip_url_scheme(base_url).trim_end_matches('/'); hostname.ends_with(RAILWAY_PUBLIC_SUFFIX) && hostname .split('.') .next() .is_some_and(|label| label_contains_service(label, CORE_PYTHON_SERVICE_NAME)) } fn resolve_core_python_base_url() -> String { let configured_value = first_non_empty_env_value(&["CORE_PYTHON_URL"]); let normalized = normalize_http_base_url( configured_value .as_deref() .unwrap_or(DEFAULT_CORE_PYTHON_URL), ); if let Some(private_url) = infer_private_core_python_url() { if configured_value.is_none() || normalized == DEFAULT_CORE_PYTHON_URL || is_public_railway_core_python_url(&normalized) { return private_url; } } normalized } fn add_url_candidate(candidates: &mut Vec, candidate: String) { if !candidates.iter().any(|existing| existing == &candidate) { candidates.push(candidate); } } fn railway_private_url_with_default_port(base_url: &str, default_port: u16) -> Option { let (scheme, rest) = base_url .strip_prefix("http://") .map(|rest| ("http://", rest)) .or_else(|| { base_url .strip_prefix("https://") .map(|rest| ("https://", rest)) })?; let (authority, suffix) = match rest.find('/') { Some(index) => (&rest[..index], &rest[index..]), None => (rest, ""), }; if authority.contains(':') || !authority.ends_with(RAILWAY_PRIVATE_SUFFIX) { return None; } Some(format!("{scheme}{authority}:{default_port}{suffix}")) } fn resolve_core_python_base_urls() -> Vec { let configured_value = first_non_empty_env_value(&["CORE_PYTHON_URL"]); let primary = resolve_core_python_base_url(); let mut candidates = vec![primary.clone()]; if let Some(port_variant) = railway_private_url_with_default_port(&primary, 8000) { add_url_candidate(&mut candidates, port_variant); } if let Some(configured) = configured_value { let normalized = normalize_http_base_url(&configured); add_url_candidate(&mut candidates, normalized.clone()); if let Some(port_variant) = railway_private_url_with_default_port(&normalized, 8000) { add_url_candidate(&mut candidates, port_variant); } } candidates } pub(crate) fn normalize_http_base_url(base_url: &str) -> String { let trimmed = base_url.trim().trim_end_matches('/'); if trimmed.is_empty() { return DEFAULT_CORE_PYTHON_URL.to_string(); } let normalized = if trimmed.starts_with("http://") || trimmed.starts_with("https://") { trimmed.to_string() } else if trimmed.starts_with("localhost") || trimmed.starts_with("127.") || trimmed.starts_with("0.0.0.0") || trimmed.contains(':') || trimmed.ends_with(".local") || trimmed.ends_with(RAILWAY_PRIVATE_SUFFIX) { format!("http://{trimmed}") } else { format!("https://{trimmed}") }; normalized } impl PythonBridge { fn timeout_for_endpoint(&self, endpoint: &str) -> Duration { if endpoint == "orchestrator/route" { self.router_timeout } else { self.request_timeout } } pub fn new() -> Self { let base_urls = resolve_core_python_base_urls(); let request_timeout = resolve_request_timeout(); let router_timeout = resolve_router_timeout(request_timeout); let connect_timeout = Duration::from_secs(env_u64_or_default( "CORE_PYTHON_CONNECT_TIMEOUT_SECS", DEFAULT_CONNECT_TIMEOUT_SECS, )); let client_builder = Client::builder() .connect_timeout(connect_timeout) // Use a per-read timeout so SSE/text streams can stay open while bytes // continue flowing, but stalled model generations still fail cleanly. .read_timeout(request_timeout); let client = client_builder.build().expect("Failed to build HTTP client"); Self { client, base_urls, request_timeout, router_timeout, } } /// Izsauc core-python API endpoint. /// `endpoint` — ceļš bez slīpsvītras sākumā, piem. "text/generate" pub async fn call(&self, endpoint: &str, payload: &Value) -> Result where T: DeserializeOwned + ValidateCoreResponse, { let mut last_error = None; for (index, base_url) in self.base_urls.iter().enumerate() { let url = format!("{base_url}/v1/{endpoint}"); let request_timeout = self.timeout_for_endpoint(endpoint); let response = match self .client .post(&url) .timeout(request_timeout) .json(payload) .send() .await { Ok(response) => response, Err(error) => { let error = if error.is_timeout() { anyhow!( "Python bridge timeout pēc {}s ({url})", request_timeout.as_secs() ) } else { anyhow!("Python bridge HTTP kļūda: {}", error) }; if let Some(next_base_url) = self.base_urls.get(index + 1) { warn!( "Python bridge request to {url} failed, retrying {}: {error}", format!("{next_base_url}/v1/{endpoint}") ); } last_error = Some(error); continue; } }; if !response.status().is_success() { let status = response.status(); let body = response.text().await.unwrap_or_default(); return Err(anyhow!("Python bridge atbildēja ar {}: {}", status, body)); } let json: T = response .json() .await .map_err(|e| anyhow!("Python bridge JSON kļūda: {}", e))?; return json .validate() .map_err(|e| anyhow!("Python bridge atgrieza nederīgu payload: {}", e)); } Err(last_error.unwrap_or_else(|| anyhow!("Python bridge URL nav konfigurēts"))) } pub async fn get(&self, endpoint: &str) -> Result where T: DeserializeOwned + ValidateCoreResponse, { let mut last_error = None; for (index, base_url) in self.base_urls.iter().enumerate() { let url = format!("{base_url}/v1/{endpoint}"); let request_timeout = self.timeout_for_endpoint(endpoint); let response = match self.client.get(&url).timeout(request_timeout).send().await { Ok(response) => response, Err(error) => { let error = if error.is_timeout() { anyhow!( "Python bridge timeout pēc {}s ({url})", request_timeout.as_secs() ) } else { anyhow!("Python bridge HTTP kļūda: {}", error) }; if let Some(next_base_url) = self.base_urls.get(index + 1) { warn!( "Python bridge GET request to {url} failed, retrying {}: {error}", format!("{next_base_url}/v1/{endpoint}") ); } last_error = Some(error); continue; } }; if !response.status().is_success() { let status = response.status(); let body = response.text().await.unwrap_or_default(); return Err(anyhow!("Python bridge atbildēja ar {}: {}", status, body)); } let json: T = response .json() .await .map_err(|e| anyhow!("Python bridge JSON kļūda: {}", e))?; return json .validate() .map_err(|e| anyhow!("Python bridge atgrieza nederīgu payload: {}", e)); } Err(last_error.unwrap_or_else(|| anyhow!("Python bridge URL nav konfigurēts"))) } pub async fn stream(&self, endpoint: &str, payload: &Value) -> Result { let mut last_error = None; for (index, base_url) in self.base_urls.iter().enumerate() { let url = format!("{base_url}/v1/{endpoint}"); let request_timeout = self.timeout_for_endpoint(endpoint); let response = match self .client .post(&url) .timeout(request_timeout) .header("Accept", "text/event-stream") .json(payload) .send() .await { Ok(response) => response, Err(error) => { let error = if error.is_timeout() { anyhow!( "Python bridge timeout pēc {}s ({url})", request_timeout.as_secs() ) } else { anyhow!("Python bridge HTTP kļūda: {}", error) }; if let Some(next_base_url) = self.base_urls.get(index + 1) { warn!( "Python bridge stream request to {url} failed, retrying {}: {error}", format!("{next_base_url}/v1/{endpoint}") ); } last_error = Some(error); continue; } }; if !response.status().is_success() { let status = response.status(); let body = response.text().await.unwrap_or_default(); return Err(anyhow!("Python bridge atbildēja ar {}: {}", status, body)); } return Ok(response); } Err(last_error.unwrap_or_else(|| anyhow!("Python bridge URL nav konfigurēts"))) } } impl Default for PythonBridge { fn default() -> Self { Self::new() } } #[cfg(test)] mod tests { use super::{ env_optional_duration, env_u64_or_default, normalize_http_base_url, resolve_core_python_base_url, resolve_core_python_base_urls, resolve_request_timeout, resolve_router_timeout, DEFAULT_CORE_PYTHON_URL, DEFAULT_REQUEST_TIMEOUT_SECS, DEFAULT_ROUTER_TIMEOUT_SECS, }; use std::env; use std::sync::{LazyLock, Mutex}; use std::time::Duration; static ENV_LOCK: LazyLock> = LazyLock::new(|| Mutex::new(())); struct EnvGuard { previous: Vec<(&'static str, Option)>, } impl EnvGuard { fn set(entries: &[(&'static str, Option<&str>)]) -> Self { let previous = entries .iter() .map(|(key, value)| { let original = env::var(key).ok(); match value { Some(value) => env::set_var(key, value), None => env::remove_var(key), } (*key, original) }) .collect(); Self { previous } } } impl Drop for EnvGuard { fn drop(&mut self) { for (key, value) in self.previous.drain(..) { match value { Some(value) => env::set_var(key, value), None => env::remove_var(key), } } } } #[test] fn adds_https_for_public_host_values() { assert_eq!( normalize_http_base_url("maris-core-python.up.railway.app"), "https://maris-core-python.up.railway.app" ); } #[test] fn adds_http_for_localhost_values() { assert_eq!( normalize_http_base_url("localhost:8000"), "http://localhost:8000" ); } #[test] fn adds_http_for_bare_hostport_values() { assert_eq!( normalize_http_base_url("maris-core-python:8000"), "http://maris-core-python:8000" ); } #[test] fn keeps_explicit_port_on_railway_private_url() { assert_eq!( normalize_http_base_url("http://maris-core-python.railway.internal:8000/"), "http://maris-core-python.railway.internal:8000" ); } #[test] fn reads_timeout_value_from_env() { let _env_lock = ENV_LOCK.lock().unwrap(); let _env_guard = EnvGuard::set(&[("MARIS_TEST_TIMEOUT_SECS_VALID", Some("42"))]); assert_eq!(env_u64_or_default("MARIS_TEST_TIMEOUT_SECS_VALID", 30), 42); } #[test] fn env_optional_duration_returns_none_when_missing() { let _env_lock = ENV_LOCK.lock().unwrap(); let _env_guard = EnvGuard::set(&[("MARIS_TEST_TIMEOUT_SECS_OPTIONAL", None)]); assert_eq!( env_optional_duration("MARIS_TEST_TIMEOUT_SECS_OPTIONAL"), None ); } #[test] fn core_python_timeout_uses_default_when_env_is_missing() { let _env_lock = ENV_LOCK.lock().unwrap(); let _env_guard = EnvGuard::set(&[("CORE_PYTHON_TIMEOUT_SECS", None)]); assert_eq!( resolve_request_timeout(), Duration::from_secs(DEFAULT_REQUEST_TIMEOUT_SECS) ); } #[test] fn router_timeout_uses_default_when_env_is_missing() { let _env_lock = ENV_LOCK.lock().unwrap(); let _env_guard = EnvGuard::set(&[("CORE_PYTHON_ROUTER_TIMEOUT_SECS", None)]); assert_eq!( resolve_router_timeout(Duration::from_secs(DEFAULT_REQUEST_TIMEOUT_SECS)), Duration::from_secs(DEFAULT_ROUTER_TIMEOUT_SECS) ); } #[test] fn router_timeout_is_capped_by_global_request_timeout() { let _env_lock = ENV_LOCK.lock().unwrap(); let _env_guard = EnvGuard::set(&[("CORE_PYTHON_ROUTER_TIMEOUT_SECS", Some("20"))]); assert_eq!( resolve_router_timeout(Duration::from_secs(6)), Duration::from_secs(6) ); } #[test] fn reads_optional_timeout_value_from_env() { let _env_lock = ENV_LOCK.lock().unwrap(); let _env_guard = EnvGuard::set(&[("MARIS_TEST_TIMEOUT_SECS_OPTIONAL", Some("42"))]); assert_eq!( env_optional_duration("MARIS_TEST_TIMEOUT_SECS_OPTIONAL"), Some(Duration::from_secs(42)) ); } #[test] fn falls_back_when_env_timeout_invalid() { let _env_lock = ENV_LOCK.lock().unwrap(); let _env_guard = EnvGuard::set(&[("MARIS_TEST_TIMEOUT_SECS_INVALID", Some("abc"))]); assert_eq!( env_u64_or_default("MARIS_TEST_TIMEOUT_SECS_INVALID", 30), 30 ); } #[test] fn infers_private_core_python_url_when_env_is_missing_on_railway() { let _env_lock = ENV_LOCK.lock().unwrap(); let _env_guard = EnvGuard::set(&[ ("CORE_PYTHON_URL", None), ( "RAILWAY_PRIVATE_DOMAIN", Some("maris-backend.railway.internal"), ), ]); assert_eq!( resolve_core_python_base_url(), "http://maris-core-python.railway.internal" ); } #[test] fn rewrites_public_railway_core_python_url_to_private_sibling_when_available() { let _env_lock = ENV_LOCK.lock().unwrap(); let _env_guard = EnvGuard::set(&[ ( "CORE_PYTHON_URL", Some("https://maris-core-python.up.railway.app"), ), ( "RAILWAY_PRIVATE_DOMAIN", Some("maris-backend.railway.internal"), ), ]); assert_eq!( resolve_core_python_base_url(), "http://maris-core-python.railway.internal" ); } #[test] fn keeps_explicit_non_railway_core_python_url() { let _env_lock = ENV_LOCK.lock().unwrap(); let _env_guard = EnvGuard::set(&[ ("CORE_PYTHON_URL", Some("https://example.com/core")), ( "RAILWAY_PRIVATE_DOMAIN", Some("maris-backend.railway.internal"), ), ]); assert_eq!(resolve_core_python_base_url(), "https://example.com/core"); } #[test] fn adds_default_private_port_fallback_for_railway_internal_targets() { let _env_lock = ENV_LOCK.lock().unwrap(); let _env_guard = EnvGuard::set(&[ ("CORE_PYTHON_URL", None), ( "RAILWAY_PRIVATE_DOMAIN", Some("maris-backend.railway.internal"), ), ]); assert_eq!( resolve_core_python_base_urls(), vec![ "http://maris-core-python.railway.internal".to_string(), "http://maris-core-python.railway.internal:8000".to_string() ] ); } #[test] fn keeps_explicit_ported_private_url_as_primary_candidate() { let _env_lock = ENV_LOCK.lock().unwrap(); let _env_guard = EnvGuard::set(&[ ( "CORE_PYTHON_URL", Some("http://maris-core-python.railway.internal:8000/"), ), ("RAILWAY_PRIVATE_DOMAIN", None), ]); assert_eq!( resolve_core_python_base_urls(), vec!["http://maris-core-python.railway.internal:8000".to_string()] ); } #[test] fn keeps_local_default_when_not_running_on_railway() { let _env_lock = ENV_LOCK.lock().unwrap(); let _env_guard = EnvGuard::set(&[("CORE_PYTHON_URL", None), ("RAILWAY_PRIVATE_DOMAIN", None)]); assert_eq!(resolve_core_python_base_url(), DEFAULT_CORE_PYTHON_URL); } }