| |
| use anyhow::{anyhow, Result}; |
| use reqwest::{Client, Response}; |
| use serde::de::DeserializeOwned; |
| use serde_json::Value; |
| use std::time::Duration; |
| use tracing::warn; |
|
|
| use crate::inference::response_validator::ValidateCoreResponse; |
|
|
| pub struct PythonBridge { |
| client: Client, |
| base_urls: Vec<String>, |
| request_timeout: Duration, |
| router_timeout: Duration, |
| } |
|
|
// Timeout defaults, in seconds; each one can be overridden through an env variable.

/// Connect timeout default; overridden by `CORE_PYTHON_CONNECT_TIMEOUT_SECS`.
const DEFAULT_CONNECT_TIMEOUT_SECS: u64 = 5;
/// Request timeout default; overridden by `CORE_PYTHON_TIMEOUT_SECS`.
const DEFAULT_REQUEST_TIMEOUT_SECS: u64 = 45;
/// Router timeout default; overridden by `CORE_PYTHON_ROUTER_TIMEOUT_SECS`.
const DEFAULT_ROUTER_TIMEOUT_SECS: u64 = 8;

/// Base URL used when `CORE_PYTHON_URL` is unset and no private URL can be inferred.
const DEFAULT_CORE_PYTHON_URL: &str = "http://localhost:8000";

/// Host suffix recognised as a Railway public domain.
const RAILWAY_PUBLIC_SUFFIX: &str = ".up.railway.app";
/// Host suffix recognised as a Railway private-network domain.
const RAILWAY_PRIVATE_SUFFIX: &str = ".railway.internal";

/// Token replaced in the first label of `RAILWAY_PRIVATE_DOMAIN`.
const BACKEND_SERVICE_NAME: &str = "backend";
/// Token substituted in to derive the core-python sibling hostname.
const CORE_PYTHON_SERVICE_NAME: &str = "core-python";
|
|
/// Reads `key` from the environment as a positive integer, falling back to `default`.
///
/// Surrounding whitespace is ignored. A missing variable, an unparsable value, or `0`
/// all yield `default`.
fn env_u64_or_default(key: &str, default: u64) -> u64 {
    match std::env::var(key).map(|raw| raw.trim().parse::<u64>()) {
        Ok(Ok(parsed)) if parsed > 0 => parsed,
        _ => default,
    }
}
|
|
| |
| |
| |
| |
/// Reads `key` as a positive number of seconds.
///
/// Returns `None` when the variable is missing, not a whole number after trimming,
/// or zero.
fn env_optional_duration(key: &str) -> Option<Duration> {
    let raw = std::env::var(key).ok()?;
    let seconds: u64 = raw.trim().parse().ok()?;
    (seconds > 0).then(|| Duration::from_secs(seconds))
}
|
|
| fn resolve_request_timeout() -> Duration { |
| env_optional_duration("CORE_PYTHON_TIMEOUT_SECS") |
| .unwrap_or_else(|| Duration::from_secs(DEFAULT_REQUEST_TIMEOUT_SECS)) |
| } |
|
|
| fn resolve_router_timeout(request_timeout: Duration) -> Duration { |
| env_optional_duration("CORE_PYTHON_ROUTER_TIMEOUT_SECS") |
| .unwrap_or_else(|| Duration::from_secs(DEFAULT_ROUTER_TIMEOUT_SECS)) |
| .min(request_timeout) |
| } |
|
|
/// Returns the first of `keys` whose environment value is non-empty after trimming.
///
/// Keys are checked in order. A variable that is unset, not valid Unicode, or only
/// whitespace is skipped, so a later key can still supply the value. The returned
/// string is trimmed.
fn first_non_empty_env_value(keys: &[&str]) -> Option<String> {
    keys.iter().find_map(|key| {
        let raw = std::env::var(key).ok()?;
        let trimmed = raw.trim();
        // Filtering inside `find_map` (rather than after it) keeps the search going
        // past blank values instead of giving up on the first key that is merely set.
        (!trimmed.is_empty()).then(|| trimmed.to_string())
    })
}
|
|
/// Removes a leading `https://`, `http://`, `wss://` or `ws://` from `value`.
///
/// The match is case-sensitive; input without one of these prefixes is returned
/// unchanged.
fn strip_url_scheme(value: &str) -> &str {
    const SCHEMES: [&str; 4] = ["https://", "http://", "wss://", "ws://"];

    SCHEMES
        .iter()
        .find_map(|&scheme| value.strip_prefix(scheme))
        .unwrap_or(value)
}
|
|
/// Replaces the first `-`-separated token of `label` equal to `source` with `target`.
///
/// Returns `None` when no token matches exactly; only the first match is replaced.
fn replace_service_token(label: &str, source: &str, target: &str) -> Option<String> {
    let mut parts: Vec<&str> = label.split('-').collect();
    let position = parts.iter().position(|part| *part == source)?;
    parts[position] = target;
    Some(parts.join("-"))
}
|
|
/// Reports whether `service` appears in `label` as a whole `-`-delimited run.
///
/// True when the label equals the service, starts with `service-`, ends with
/// `-service`, or contains `-service-`.
fn label_contains_service(label: &str, service: &str) -> bool {
    // Padding both strings with '-' folds the four boundary cases into one substring test.
    format!("-{label}-").contains(&format!("-{service}-"))
}
|
|
| fn infer_railway_sibling_hostname(domain: &str, source: &str, target: &str) -> Option<String> { |
| let hostname = strip_url_scheme(domain.trim()) |
| .trim_end_matches('/') |
| .trim() |
| .to_string(); |
| let (label, rest) = hostname.split_once('.')?; |
| let replaced = replace_service_token(label, source, target)?; |
| Some(format!("{replaced}.{rest}")) |
| } |
|
|
| fn infer_private_core_python_url() -> Option<String> { |
| first_non_empty_env_value(&["RAILWAY_PRIVATE_DOMAIN"]) |
| .and_then(|domain| { |
| infer_railway_sibling_hostname(&domain, BACKEND_SERVICE_NAME, CORE_PYTHON_SERVICE_NAME) |
| }) |
| .map(|hostname| format!("http://{hostname}")) |
| } |
|
|
| fn is_public_railway_core_python_url(base_url: &str) -> bool { |
| let hostname = strip_url_scheme(base_url).trim_end_matches('/'); |
|
|
| hostname.ends_with(RAILWAY_PUBLIC_SUFFIX) |
| && hostname |
| .split('.') |
| .next() |
| .is_some_and(|label| label_contains_service(label, CORE_PYTHON_SERVICE_NAME)) |
| } |
|
|
| fn resolve_core_python_base_url() -> String { |
| let configured_value = first_non_empty_env_value(&["CORE_PYTHON_URL"]); |
| let normalized = normalize_http_base_url( |
| configured_value |
| .as_deref() |
| .unwrap_or(DEFAULT_CORE_PYTHON_URL), |
| ); |
|
|
| if let Some(private_url) = infer_private_core_python_url() { |
| if configured_value.is_none() |
| || normalized == DEFAULT_CORE_PYTHON_URL |
| || is_public_railway_core_python_url(&normalized) |
| { |
| return private_url; |
| } |
| } |
|
|
| normalized |
| } |
|
|
/// Appends `candidate` unless an identical URL is already present, keeping order.
fn add_url_candidate(candidates: &mut Vec<String>, candidate: String) {
    if candidates.contains(&candidate) {
        return;
    }
    candidates.push(candidate);
}
|
|
| fn railway_private_url_with_default_port(base_url: &str, default_port: u16) -> Option<String> { |
| let (scheme, rest) = base_url |
| .strip_prefix("http://") |
| .map(|rest| ("http://", rest)) |
| .or_else(|| { |
| base_url |
| .strip_prefix("https://") |
| .map(|rest| ("https://", rest)) |
| })?; |
| let (authority, suffix) = match rest.find('/') { |
| Some(index) => (&rest[..index], &rest[index..]), |
| None => (rest, ""), |
| }; |
|
|
| if authority.contains(':') || !authority.ends_with(RAILWAY_PRIVATE_SUFFIX) { |
| return None; |
| } |
|
|
| Some(format!("{scheme}{authority}:{default_port}{suffix}")) |
| } |
|
|
| fn resolve_core_python_base_urls() -> Vec<String> { |
| let configured_value = first_non_empty_env_value(&["CORE_PYTHON_URL"]); |
| let primary = resolve_core_python_base_url(); |
| let mut candidates = vec![primary.clone()]; |
|
|
| if let Some(port_variant) = railway_private_url_with_default_port(&primary, 8000) { |
| add_url_candidate(&mut candidates, port_variant); |
| } |
|
|
| if let Some(configured) = configured_value { |
| let normalized = normalize_http_base_url(&configured); |
| add_url_candidate(&mut candidates, normalized.clone()); |
|
|
| if let Some(port_variant) = railway_private_url_with_default_port(&normalized, 8000) { |
| add_url_candidate(&mut candidates, port_variant); |
| } |
| } |
|
|
| candidates |
| } |
|
|
| pub(crate) fn normalize_http_base_url(base_url: &str) -> String { |
| let trimmed = base_url.trim().trim_end_matches('/'); |
|
|
| if trimmed.is_empty() { |
| return DEFAULT_CORE_PYTHON_URL.to_string(); |
| } |
|
|
| let normalized = if trimmed.starts_with("http://") || trimmed.starts_with("https://") { |
| trimmed.to_string() |
| } else if trimmed.starts_with("localhost") |
| || trimmed.starts_with("127.") |
| || trimmed.starts_with("0.0.0.0") |
| || trimmed.contains(':') |
| || trimmed.ends_with(".local") |
| || trimmed.ends_with(RAILWAY_PRIVATE_SUFFIX) |
| { |
| format!("http://{trimmed}") |
| } else { |
| format!("https://{trimmed}") |
| }; |
|
|
| normalized |
| } |
|
|
| impl PythonBridge { |
| fn timeout_for_endpoint(&self, endpoint: &str) -> Duration { |
| if endpoint == "orchestrator/route" { |
| self.router_timeout |
| } else { |
| self.request_timeout |
| } |
| } |
|
|
| pub fn new() -> Self { |
| let base_urls = resolve_core_python_base_urls(); |
|
|
| let request_timeout = resolve_request_timeout(); |
| let router_timeout = resolve_router_timeout(request_timeout); |
| let connect_timeout = Duration::from_secs(env_u64_or_default( |
| "CORE_PYTHON_CONNECT_TIMEOUT_SECS", |
| DEFAULT_CONNECT_TIMEOUT_SECS, |
| )); |
|
|
| let client_builder = Client::builder() |
| .connect_timeout(connect_timeout) |
| |
| |
| .read_timeout(request_timeout); |
| let client = client_builder.build().expect("Failed to build HTTP client"); |
|
|
| Self { |
| client, |
| base_urls, |
| request_timeout, |
| router_timeout, |
| } |
| } |
|
|
| |
| |
| pub async fn call<T>(&self, endpoint: &str, payload: &Value) -> Result<T> |
| where |
| T: DeserializeOwned + ValidateCoreResponse, |
| { |
| let mut last_error = None; |
|
|
| for (index, base_url) in self.base_urls.iter().enumerate() { |
| let url = format!("{base_url}/v1/{endpoint}"); |
| let request_timeout = self.timeout_for_endpoint(endpoint); |
| let response = match self |
| .client |
| .post(&url) |
| .timeout(request_timeout) |
| .json(payload) |
| .send() |
| .await |
| { |
| Ok(response) => response, |
| Err(error) => { |
| let error = if error.is_timeout() { |
| anyhow!( |
| "Python bridge timeout pēc {}s ({url})", |
| request_timeout.as_secs() |
| ) |
| } else { |
| anyhow!("Python bridge HTTP kļūda: {}", error) |
| }; |
|
|
| if let Some(next_base_url) = self.base_urls.get(index + 1) { |
| warn!( |
| "Python bridge request to {url} failed, retrying {}: {error}", |
| format!("{next_base_url}/v1/{endpoint}") |
| ); |
| } |
|
|
| last_error = Some(error); |
| continue; |
| } |
| }; |
|
|
| if !response.status().is_success() { |
| let status = response.status(); |
| let body = response.text().await.unwrap_or_default(); |
| return Err(anyhow!("Python bridge atbildēja ar {}: {}", status, body)); |
| } |
|
|
| let json: T = response |
| .json() |
| .await |
| .map_err(|e| anyhow!("Python bridge JSON kļūda: {}", e))?; |
|
|
| return json |
| .validate() |
| .map_err(|e| anyhow!("Python bridge atgrieza nederīgu payload: {}", e)); |
| } |
|
|
| Err(last_error.unwrap_or_else(|| anyhow!("Python bridge URL nav konfigurēts"))) |
| } |
|
|
| pub async fn get<T>(&self, endpoint: &str) -> Result<T> |
| where |
| T: DeserializeOwned + ValidateCoreResponse, |
| { |
| let mut last_error = None; |
|
|
| for (index, base_url) in self.base_urls.iter().enumerate() { |
| let url = format!("{base_url}/v1/{endpoint}"); |
| let request_timeout = self.timeout_for_endpoint(endpoint); |
| let response = match self.client.get(&url).timeout(request_timeout).send().await { |
| Ok(response) => response, |
| Err(error) => { |
| let error = if error.is_timeout() { |
| anyhow!( |
| "Python bridge timeout pēc {}s ({url})", |
| request_timeout.as_secs() |
| ) |
| } else { |
| anyhow!("Python bridge HTTP kļūda: {}", error) |
| }; |
|
|
| if let Some(next_base_url) = self.base_urls.get(index + 1) { |
| warn!( |
| "Python bridge GET request to {url} failed, retrying {}: {error}", |
| format!("{next_base_url}/v1/{endpoint}") |
| ); |
| } |
|
|
| last_error = Some(error); |
| continue; |
| } |
| }; |
|
|
| if !response.status().is_success() { |
| let status = response.status(); |
| let body = response.text().await.unwrap_or_default(); |
| return Err(anyhow!("Python bridge atbildēja ar {}: {}", status, body)); |
| } |
|
|
| let json: T = response |
| .json() |
| .await |
| .map_err(|e| anyhow!("Python bridge JSON kļūda: {}", e))?; |
|
|
| return json |
| .validate() |
| .map_err(|e| anyhow!("Python bridge atgrieza nederīgu payload: {}", e)); |
| } |
|
|
| Err(last_error.unwrap_or_else(|| anyhow!("Python bridge URL nav konfigurēts"))) |
| } |
|
|
| pub async fn stream(&self, endpoint: &str, payload: &Value) -> Result<Response> { |
| let mut last_error = None; |
|
|
| for (index, base_url) in self.base_urls.iter().enumerate() { |
| let url = format!("{base_url}/v1/{endpoint}"); |
| let request_timeout = self.timeout_for_endpoint(endpoint); |
| let response = match self |
| .client |
| .post(&url) |
| .timeout(request_timeout) |
| .header("Accept", "text/event-stream") |
| .json(payload) |
| .send() |
| .await |
| { |
| Ok(response) => response, |
| Err(error) => { |
| let error = if error.is_timeout() { |
| anyhow!( |
| "Python bridge timeout pēc {}s ({url})", |
| request_timeout.as_secs() |
| ) |
| } else { |
| anyhow!("Python bridge HTTP kļūda: {}", error) |
| }; |
|
|
| if let Some(next_base_url) = self.base_urls.get(index + 1) { |
| warn!( |
| "Python bridge stream request to {url} failed, retrying {}: {error}", |
| format!("{next_base_url}/v1/{endpoint}") |
| ); |
| } |
|
|
| last_error = Some(error); |
| continue; |
| } |
| }; |
|
|
| if !response.status().is_success() { |
| let status = response.status(); |
| let body = response.text().await.unwrap_or_default(); |
| return Err(anyhow!("Python bridge atbildēja ar {}: {}", status, body)); |
| } |
|
|
| return Ok(response); |
| } |
|
|
| Err(last_error.unwrap_or_else(|| anyhow!("Python bridge URL nav konfigurēts"))) |
| } |
| } |
|
|
| impl Default for PythonBridge { |
| fn default() -> Self { |
| Self::new() |
| } |
| } |
|
|
#[cfg(test)]
mod tests {
    use super::{
        env_optional_duration, env_u64_or_default, normalize_http_base_url,
        resolve_core_python_base_url, resolve_core_python_base_urls, resolve_request_timeout,
        resolve_router_timeout, DEFAULT_CORE_PYTHON_URL, DEFAULT_REQUEST_TIMEOUT_SECS,
        DEFAULT_ROUTER_TIMEOUT_SECS,
    };
    use std::env;
    use std::sync::{LazyLock, Mutex, MutexGuard, PoisonError};
    use std::time::Duration;

    /// Serializes every test that reads or mutates process environment variables.
    static ENV_LOCK: LazyLock<Mutex<()>> = LazyLock::new(|| Mutex::new(()));

    /// Acquires `ENV_LOCK`, recovering the guard if an earlier test panicked while
    /// holding it.
    ///
    /// A panicking test still drops its `EnvGuard` during unwinding (it is declared
    /// after the lock guard, so it is dropped first), which restores the variables.
    /// Without recovery, one failure would make every later env test fail with a
    /// `PoisonError` instead of reporting its own result.
    fn lock_env() -> MutexGuard<'static, ()> {
        ENV_LOCK.lock().unwrap_or_else(PoisonError::into_inner)
    }

    /// Sets or removes the given variables and restores their previous values on drop.
    struct EnvGuard {
        previous: Vec<(&'static str, Option<String>)>,
    }

    impl EnvGuard {
        /// `Some(value)` sets the variable and `None` removes it.
        fn set(entries: &[(&'static str, Option<&str>)]) -> Self {
            let previous = entries
                .iter()
                .map(|(key, value)| {
                    let original = env::var(key).ok();
                    match value {
                        Some(value) => env::set_var(key, value),
                        None => env::remove_var(key),
                    }
                    (*key, original)
                })
                .collect();

            Self { previous }
        }
    }

    impl Drop for EnvGuard {
        fn drop(&mut self) {
            for (key, value) in self.previous.drain(..) {
                match value {
                    Some(value) => env::set_var(key, value),
                    None => env::remove_var(key),
                }
            }
        }
    }

    #[test]
    fn adds_https_for_public_host_values() {
        assert_eq!(
            normalize_http_base_url("maris-core-python.up.railway.app"),
            "https://maris-core-python.up.railway.app"
        );
    }

    #[test]
    fn adds_http_for_localhost_values() {
        assert_eq!(
            normalize_http_base_url("localhost:8000"),
            "http://localhost:8000"
        );
    }

    #[test]
    fn adds_http_for_bare_hostport_values() {
        assert_eq!(
            normalize_http_base_url("maris-core-python:8000"),
            "http://maris-core-python:8000"
        );
    }

    #[test]
    fn keeps_explicit_port_on_railway_private_url() {
        assert_eq!(
            normalize_http_base_url("http://maris-core-python.railway.internal:8000/"),
            "http://maris-core-python.railway.internal:8000"
        );
    }

    #[test]
    fn reads_timeout_value_from_env() {
        let _env_lock = lock_env();
        let _env_guard = EnvGuard::set(&[("MARIS_TEST_TIMEOUT_SECS_VALID", Some("42"))]);
        assert_eq!(env_u64_or_default("MARIS_TEST_TIMEOUT_SECS_VALID", 30), 42);
    }

    #[test]
    fn env_optional_duration_returns_none_when_missing() {
        let _env_lock = lock_env();
        let _env_guard = EnvGuard::set(&[("MARIS_TEST_TIMEOUT_SECS_OPTIONAL", None)]);
        assert_eq!(
            env_optional_duration("MARIS_TEST_TIMEOUT_SECS_OPTIONAL"),
            None
        );
    }

    #[test]
    fn core_python_timeout_uses_default_when_env_is_missing() {
        let _env_lock = lock_env();
        let _env_guard = EnvGuard::set(&[("CORE_PYTHON_TIMEOUT_SECS", None)]);
        assert_eq!(
            resolve_request_timeout(),
            Duration::from_secs(DEFAULT_REQUEST_TIMEOUT_SECS)
        );
    }

    #[test]
    fn router_timeout_uses_default_when_env_is_missing() {
        let _env_lock = lock_env();
        let _env_guard = EnvGuard::set(&[("CORE_PYTHON_ROUTER_TIMEOUT_SECS", None)]);
        assert_eq!(
            resolve_router_timeout(Duration::from_secs(DEFAULT_REQUEST_TIMEOUT_SECS)),
            Duration::from_secs(DEFAULT_ROUTER_TIMEOUT_SECS)
        );
    }

    #[test]
    fn router_timeout_is_capped_by_global_request_timeout() {
        let _env_lock = lock_env();
        let _env_guard = EnvGuard::set(&[("CORE_PYTHON_ROUTER_TIMEOUT_SECS", Some("20"))]);
        assert_eq!(
            resolve_router_timeout(Duration::from_secs(6)),
            Duration::from_secs(6)
        );
    }

    #[test]
    fn reads_optional_timeout_value_from_env() {
        let _env_lock = lock_env();
        let _env_guard = EnvGuard::set(&[("MARIS_TEST_TIMEOUT_SECS_OPTIONAL", Some("42"))]);
        assert_eq!(
            env_optional_duration("MARIS_TEST_TIMEOUT_SECS_OPTIONAL"),
            Some(Duration::from_secs(42))
        );
    }

    #[test]
    fn falls_back_when_env_timeout_invalid() {
        let _env_lock = lock_env();
        let _env_guard = EnvGuard::set(&[("MARIS_TEST_TIMEOUT_SECS_INVALID", Some("abc"))]);
        assert_eq!(
            env_u64_or_default("MARIS_TEST_TIMEOUT_SECS_INVALID", 30),
            30
        );
    }

    #[test]
    fn infers_private_core_python_url_when_env_is_missing_on_railway() {
        let _env_lock = lock_env();
        let _env_guard = EnvGuard::set(&[
            ("CORE_PYTHON_URL", None),
            (
                "RAILWAY_PRIVATE_DOMAIN",
                Some("maris-backend.railway.internal"),
            ),
        ]);

        assert_eq!(
            resolve_core_python_base_url(),
            "http://maris-core-python.railway.internal"
        );
    }

    #[test]
    fn rewrites_public_railway_core_python_url_to_private_sibling_when_available() {
        let _env_lock = lock_env();
        let _env_guard = EnvGuard::set(&[
            (
                "CORE_PYTHON_URL",
                Some("https://maris-core-python.up.railway.app"),
            ),
            (
                "RAILWAY_PRIVATE_DOMAIN",
                Some("maris-backend.railway.internal"),
            ),
        ]);

        assert_eq!(
            resolve_core_python_base_url(),
            "http://maris-core-python.railway.internal"
        );
    }

    #[test]
    fn keeps_explicit_non_railway_core_python_url() {
        let _env_lock = lock_env();
        let _env_guard = EnvGuard::set(&[
            ("CORE_PYTHON_URL", Some("https://example.com/core")),
            (
                "RAILWAY_PRIVATE_DOMAIN",
                Some("maris-backend.railway.internal"),
            ),
        ]);

        assert_eq!(resolve_core_python_base_url(), "https://example.com/core");
    }

    #[test]
    fn adds_default_private_port_fallback_for_railway_internal_targets() {
        let _env_lock = lock_env();
        let _env_guard = EnvGuard::set(&[
            ("CORE_PYTHON_URL", None),
            (
                "RAILWAY_PRIVATE_DOMAIN",
                Some("maris-backend.railway.internal"),
            ),
        ]);

        assert_eq!(
            resolve_core_python_base_urls(),
            vec![
                "http://maris-core-python.railway.internal".to_string(),
                "http://maris-core-python.railway.internal:8000".to_string()
            ]
        );
    }

    #[test]
    fn keeps_explicit_ported_private_url_as_primary_candidate() {
        let _env_lock = lock_env();
        let _env_guard = EnvGuard::set(&[
            (
                "CORE_PYTHON_URL",
                Some("http://maris-core-python.railway.internal:8000/"),
            ),
            ("RAILWAY_PRIVATE_DOMAIN", None),
        ]);

        assert_eq!(
            resolve_core_python_base_urls(),
            vec!["http://maris-core-python.railway.internal:8000".to_string()]
        );
    }

    #[test]
    fn keeps_local_default_when_not_running_on_railway() {
        let _env_lock = lock_env();
        let _env_guard =
            EnvGuard::set(&[("CORE_PYTHON_URL", None), ("RAILWAY_PRIVATE_DOMAIN", None)]);

        assert_eq!(resolve_core_python_base_url(), DEFAULT_CORE_PYTHON_URL);
    }
}
|
|