maris-ai-master / backend-rust /src /inference /python_bridge.rs
MarisUK's picture
Maris AI model sync
f440f03 verified
/// PythonBridge — sends HTTP requests to the core-python FastAPI service.
use anyhow::{anyhow, Result};
use reqwest::{Client, Response};
use serde::de::DeserializeOwned;
use serde_json::Value;
use std::time::Duration;
use tracing::warn;
use crate::inference::response_validator::ValidateCoreResponse;
/// HTTP bridge to the core-python service.
///
/// Bundles the shared HTTP client, the ordered list of candidate base URLs,
/// and the two timeouts that `timeout_for_endpoint` chooses between.
pub struct PythonBridge {
/// Shared `reqwest` client, built once in `new()`.
client: Client,
/// Candidate base URLs; requests walk them in order and move on to the next
/// one only when sending to the current one fails.
base_urls: Vec<String>,
/// Timeout for every endpoint except `orchestrator/route`; also used as the
/// client's read timeout.
request_timeout: Duration,
/// Timeout for the `orchestrator/route` endpoint.
router_timeout: Duration,
}
/// Connect timeout (seconds) used when `CORE_PYTHON_CONNECT_TIMEOUT_SECS` is
/// missing, unparsable, or zero.
const DEFAULT_CONNECT_TIMEOUT_SECS: u64 = 5;
/// Request timeout (seconds) used when `CORE_PYTHON_TIMEOUT_SECS` does not
/// hold a positive integer.
const DEFAULT_REQUEST_TIMEOUT_SECS: u64 = 45;
/// Router timeout (seconds) used when `CORE_PYTHON_ROUTER_TIMEOUT_SECS` does
/// not hold a positive integer; the result is still capped by the request timeout.
const DEFAULT_ROUTER_TIMEOUT_SECS: u64 = 8;
/// Base URL used when `CORE_PYTHON_URL` is unset or blank.
const DEFAULT_CORE_PYTHON_URL: &str = "http://localhost:8000";
/// Hostname suffix that `is_public_railway_core_python_url` treats as a public Railway domain.
const RAILWAY_PUBLIC_SUFFIX: &str = ".up.railway.app";
/// Hostname suffix that marks a Railway private-network host; such hosts get
/// an `http://` scheme and a default-port fallback candidate.
const RAILWAY_PRIVATE_SUFFIX: &str = ".railway.internal";
/// Hyphen-separated token looked up in the first label of `RAILWAY_PRIVATE_DOMAIN`.
const BACKEND_SERVICE_NAME: &str = "backend";
/// Token that replaces `BACKEND_SERVICE_NAME` to derive the sibling hostname;
/// also the service name searched for in public Railway hostnames.
const CORE_PYTHON_SERVICE_NAME: &str = "core-python";
/// Reads `key` from the environment as a strictly positive integer.
///
/// Surrounding whitespace is ignored. Returns `default` when the variable is
/// missing, cannot be parsed as `u64`, or is zero.
fn env_u64_or_default(key: &str, default: u64) -> u64 {
    let Ok(raw) = std::env::var(key) else {
        return default;
    };
    match raw.trim().parse::<u64>() {
        Ok(parsed) if parsed > 0 => parsed,
        _ => default,
    }
}
/// Reads an optional timeout, expressed in whole seconds, from the environment.
///
/// Yields `Some(Duration)` only when `key` holds a positive integer (whitespace
/// around it is ignored). A missing, unparsable, or zero value yields `None`.
fn env_optional_duration(key: &str) -> Option<Duration> {
    let raw = std::env::var(key).ok()?;
    let seconds = raw.trim().parse::<u64>().ok()?;
    if seconds == 0 {
        return None;
    }
    Some(Duration::from_secs(seconds))
}
/// Resolves the general request timeout.
///
/// Uses `CORE_PYTHON_TIMEOUT_SECS` when it holds a positive integer, otherwise
/// `DEFAULT_REQUEST_TIMEOUT_SECS`.
fn resolve_request_timeout() -> Duration {
    match env_optional_duration("CORE_PYTHON_TIMEOUT_SECS") {
        Some(configured) => configured,
        None => Duration::from_secs(DEFAULT_REQUEST_TIMEOUT_SECS),
    }
}
/// Resolves the router timeout, never exceeding `request_timeout`.
///
/// Uses `CORE_PYTHON_ROUTER_TIMEOUT_SECS` when it holds a positive integer,
/// otherwise `DEFAULT_ROUTER_TIMEOUT_SECS`; the smaller of that value and
/// `request_timeout` is returned.
fn resolve_router_timeout(request_timeout: Duration) -> Duration {
    let router_timeout = match env_optional_duration("CORE_PYTHON_ROUTER_TIMEOUT_SECS") {
        Some(configured) => configured,
        None => Duration::from_secs(DEFAULT_ROUTER_TIMEOUT_SECS),
    };
    std::cmp::min(router_timeout, request_timeout)
}
/// Returns the trimmed value of the first variable in `keys` that is set to a
/// non-blank string.
///
/// Keys are tried in order. A key that is unset, or set to an empty or
/// whitespace-only value, is skipped so that a later key can still supply the
/// value. Returns `None` when no key yields a non-blank value.
fn first_non_empty_env_value(keys: &[&str]) -> Option<String> {
    keys.iter().find_map(|key| {
        let value = std::env::var(key).ok()?;
        let trimmed = value.trim();
        // The emptiness check must happen per key; checking after `find_map`
        // would let a blank first key mask every following key.
        (!trimmed.is_empty()).then(|| trimmed.to_string())
    })
}
/// Removes one leading `https://`, `http://`, `wss://`, or `ws://` prefix.
///
/// Matching is case-sensitive and at most one prefix is removed; input without
/// any of these prefixes is returned unchanged.
fn strip_url_scheme(value: &str) -> &str {
    const SCHEMES: [&str; 4] = ["https://", "http://", "wss://", "ws://"];
    SCHEMES
        .iter()
        .find_map(|scheme| value.strip_prefix(*scheme))
        .unwrap_or(value)
}
/// Swaps the first hyphen-separated token of `label` equal to `source` for `target`.
///
/// Returns the re-joined label, or `None` when no token matches `source`.
/// Only the first matching token is replaced.
fn replace_service_token(label: &str, source: &str, target: &str) -> Option<String> {
    let mut tokens: Vec<&str> = label.split('-').collect();
    let position = tokens.iter().position(|token| *token == source)?;
    tokens[position] = target;
    Some(tokens.join("-"))
}
/// Reports whether `service` occurs in `label` delimited by hyphens or by the
/// label's own start/end.
///
/// `service` may itself contain hyphens, so the check is done on text rather
/// than on individual tokens.
fn label_contains_service(label: &str, service: &str) -> bool {
    // Padding both sides with '-' folds the "equal", "prefix", "suffix", and
    // "infix" cases into a single substring search.
    let padded_label = format!("-{label}-");
    let padded_service = format!("-{service}-");
    padded_label.contains(&padded_service)
}
/// Derives a sibling service hostname from `domain`.
///
/// The scheme, trailing slashes, and surrounding whitespace are removed, then
/// the first `source` token in the leftmost DNS label is replaced by `target`.
/// Returns `None` when the hostname has no `.` or the label has no `source` token.
fn infer_railway_sibling_hostname(domain: &str, source: &str, target: &str) -> Option<String> {
    let hostname = strip_url_scheme(domain.trim())
        .trim_end_matches('/')
        .trim();
    let (label, rest) = hostname.split_once('.')?;
    replace_service_token(label, source, target).map(|sibling| format!("{sibling}.{rest}"))
}
/// Infers the core-python URL on the private network from `RAILWAY_PRIVATE_DOMAIN`.
///
/// Returns `http://<sibling-hostname>` when the variable is set and its first
/// label contains the backend service token; otherwise `None`.
fn infer_private_core_python_url() -> Option<String> {
    let own_domain = first_non_empty_env_value(&["RAILWAY_PRIVATE_DOMAIN"])?;
    let sibling = infer_railway_sibling_hostname(
        &own_domain,
        BACKEND_SERVICE_NAME,
        CORE_PYTHON_SERVICE_NAME,
    )?;
    Some(format!("http://{sibling}"))
}
/// Reports whether `base_url` is a public Railway hostname of the core-python service.
///
/// True when the host (scheme and trailing slashes removed) ends with
/// `RAILWAY_PUBLIC_SUFFIX` and its leftmost label contains the core-python
/// service name.
fn is_public_railway_core_python_url(base_url: &str) -> bool {
    let hostname = strip_url_scheme(base_url).trim_end_matches('/');
    if !hostname.ends_with(RAILWAY_PUBLIC_SUFFIX) {
        return false;
    }
    let first_label = match hostname.split_once('.') {
        Some((label, _)) => label,
        None => hostname,
    };
    label_contains_service(first_label, CORE_PYTHON_SERVICE_NAME)
}
/// Resolves the primary core-python base URL.
///
/// Starts from `CORE_PYTHON_URL` (or the local default) in normalized form.
/// When a private sibling URL can be inferred, it replaces the configured
/// value if nothing was configured, the value equals the local default, or the
/// value is a public Railway core-python hostname.
fn resolve_core_python_base_url() -> String {
    let configured = first_non_empty_env_value(&["CORE_PYTHON_URL"]);
    let normalized = match configured.as_deref() {
        Some(raw) => normalize_http_base_url(raw),
        None => normalize_http_base_url(DEFAULT_CORE_PYTHON_URL),
    };

    let Some(private_url) = infer_private_core_python_url() else {
        return normalized;
    };

    let prefer_private = configured.is_none()
        || normalized == DEFAULT_CORE_PYTHON_URL
        || is_public_railway_core_python_url(&normalized);
    if prefer_private {
        private_url
    } else {
        normalized
    }
}
/// Appends `candidate` to `candidates` unless an equal entry is already present.
fn add_url_candidate(candidates: &mut Vec<String>, candidate: String) {
    if candidates.contains(&candidate) {
        return;
    }
    candidates.push(candidate);
}
/// Builds a copy of `base_url` with `default_port` inserted after the host.
///
/// Only applies to `http://` / `https://` URLs whose authority ends with
/// `RAILWAY_PRIVATE_SUFFIX` and contains no `:` (i.e. no explicit port).
/// Any path after the authority is preserved. Returns `None` otherwise.
fn railway_private_url_with_default_port(base_url: &str, default_port: u16) -> Option<String> {
    const SCHEMES: [&str; 2] = ["http://", "https://"];
    let (scheme, rest) = SCHEMES.iter().find_map(|scheme| {
        base_url
            .strip_prefix(*scheme)
            .map(|remainder| (*scheme, remainder))
    })?;

    // Everything before the first '/' is the authority; the rest (including
    // that '/') is carried over untouched.
    let authority_end = rest.find('/').unwrap_or(rest.len());
    let (authority, suffix) = rest.split_at(authority_end);

    let is_portless_private_host =
        !authority.contains(':') && authority.ends_with(RAILWAY_PRIVATE_SUFFIX);
    if !is_portless_private_host {
        return None;
    }
    Some(format!("{scheme}{authority}:{default_port}{suffix}"))
}
/// Builds the ordered, de-duplicated list of base URLs to try.
///
/// Order: the resolved primary URL, its port-8000 variant (private Railway
/// hosts without a port only), then the normalized `CORE_PYTHON_URL` value and
/// its port-8000 variant when that variable is configured.
fn resolve_core_python_base_urls() -> Vec<String> {
    let configured = first_non_empty_env_value(&["CORE_PYTHON_URL"]);
    let primary = resolve_core_python_base_url();
    let configured_base = configured.map(|raw| normalize_http_base_url(&raw));

    let mut candidates = Vec::new();
    for base in std::iter::once(primary).chain(configured_base) {
        let port_variant = railway_private_url_with_default_port(&base, 8000);
        add_url_candidate(&mut candidates, base);
        if let Some(with_port) = port_variant {
            add_url_candidate(&mut candidates, with_port);
        }
    }
    candidates
}
/// Normalizes a configured base URL into `scheme://host[:port][/path]` form.
///
/// Whitespace and trailing slashes are removed. A blank value becomes
/// `DEFAULT_CORE_PYTHON_URL`. Values that already start with `http://` or
/// `https://` are kept. Otherwise `http://` is prepended for local-looking
/// values (`localhost*`, `127.*`, `0.0.0.0*`, anything containing `:`, or
/// ending in `.local` / `RAILWAY_PRIVATE_SUFFIX`) and `https://` for the rest.
pub(crate) fn normalize_http_base_url(base_url: &str) -> String {
    let trimmed = base_url.trim().trim_end_matches('/');
    if trimmed.is_empty() {
        return DEFAULT_CORE_PYTHON_URL.to_string();
    }
    if trimmed.starts_with("http://") || trimmed.starts_with("https://") {
        return trimmed.to_string();
    }

    let has_local_prefix = ["localhost", "127.", "0.0.0.0"]
        .iter()
        .any(|prefix| trimmed.starts_with(*prefix));
    let has_private_suffix = [".local", RAILWAY_PRIVATE_SUFFIX]
        .iter()
        .any(|suffix| trimmed.ends_with(*suffix));
    let use_plain_http = has_local_prefix || trimmed.contains(':') || has_private_suffix;

    let scheme = if use_plain_http { "http" } else { "https" };
    format!("{scheme}://{trimmed}")
}
impl PythonBridge {
fn timeout_for_endpoint(&self, endpoint: &str) -> Duration {
if endpoint == "orchestrator/route" {
self.router_timeout
} else {
self.request_timeout
}
}
pub fn new() -> Self {
let base_urls = resolve_core_python_base_urls();
let request_timeout = resolve_request_timeout();
let router_timeout = resolve_router_timeout(request_timeout);
let connect_timeout = Duration::from_secs(env_u64_or_default(
"CORE_PYTHON_CONNECT_TIMEOUT_SECS",
DEFAULT_CONNECT_TIMEOUT_SECS,
));
let client_builder = Client::builder()
.connect_timeout(connect_timeout)
// Use a per-read timeout so SSE/text streams can stay open while bytes
// continue flowing, but stalled model generations still fail cleanly.
.read_timeout(request_timeout);
let client = client_builder.build().expect("Failed to build HTTP client");
Self {
client,
base_urls,
request_timeout,
router_timeout,
}
}
/// Izsauc core-python API endpoint.
/// `endpoint` — ceļš bez slīpsvītras sākumā, piem. "text/generate"
pub async fn call<T>(&self, endpoint: &str, payload: &Value) -> Result<T>
where
T: DeserializeOwned + ValidateCoreResponse,
{
let mut last_error = None;
for (index, base_url) in self.base_urls.iter().enumerate() {
let url = format!("{base_url}/v1/{endpoint}");
let request_timeout = self.timeout_for_endpoint(endpoint);
let response = match self
.client
.post(&url)
.timeout(request_timeout)
.json(payload)
.send()
.await
{
Ok(response) => response,
Err(error) => {
let error = if error.is_timeout() {
anyhow!(
"Python bridge timeout pēc {}s ({url})",
request_timeout.as_secs()
)
} else {
anyhow!("Python bridge HTTP kļūda: {}", error)
};
if let Some(next_base_url) = self.base_urls.get(index + 1) {
warn!(
"Python bridge request to {url} failed, retrying {}: {error}",
format!("{next_base_url}/v1/{endpoint}")
);
}
last_error = Some(error);
continue;
}
};
if !response.status().is_success() {
let status = response.status();
let body = response.text().await.unwrap_or_default();
return Err(anyhow!("Python bridge atbildēja ar {}: {}", status, body));
}
let json: T = response
.json()
.await
.map_err(|e| anyhow!("Python bridge JSON kļūda: {}", e))?;
return json
.validate()
.map_err(|e| anyhow!("Python bridge atgrieza nederīgu payload: {}", e));
}
Err(last_error.unwrap_or_else(|| anyhow!("Python bridge URL nav konfigurēts")))
}
pub async fn get<T>(&self, endpoint: &str) -> Result<T>
where
T: DeserializeOwned + ValidateCoreResponse,
{
let mut last_error = None;
for (index, base_url) in self.base_urls.iter().enumerate() {
let url = format!("{base_url}/v1/{endpoint}");
let request_timeout = self.timeout_for_endpoint(endpoint);
let response = match self.client.get(&url).timeout(request_timeout).send().await {
Ok(response) => response,
Err(error) => {
let error = if error.is_timeout() {
anyhow!(
"Python bridge timeout pēc {}s ({url})",
request_timeout.as_secs()
)
} else {
anyhow!("Python bridge HTTP kļūda: {}", error)
};
if let Some(next_base_url) = self.base_urls.get(index + 1) {
warn!(
"Python bridge GET request to {url} failed, retrying {}: {error}",
format!("{next_base_url}/v1/{endpoint}")
);
}
last_error = Some(error);
continue;
}
};
if !response.status().is_success() {
let status = response.status();
let body = response.text().await.unwrap_or_default();
return Err(anyhow!("Python bridge atbildēja ar {}: {}", status, body));
}
let json: T = response
.json()
.await
.map_err(|e| anyhow!("Python bridge JSON kļūda: {}", e))?;
return json
.validate()
.map_err(|e| anyhow!("Python bridge atgrieza nederīgu payload: {}", e));
}
Err(last_error.unwrap_or_else(|| anyhow!("Python bridge URL nav konfigurēts")))
}
pub async fn stream(&self, endpoint: &str, payload: &Value) -> Result<Response> {
let mut last_error = None;
for (index, base_url) in self.base_urls.iter().enumerate() {
let url = format!("{base_url}/v1/{endpoint}");
let request_timeout = self.timeout_for_endpoint(endpoint);
let response = match self
.client
.post(&url)
.timeout(request_timeout)
.header("Accept", "text/event-stream")
.json(payload)
.send()
.await
{
Ok(response) => response,
Err(error) => {
let error = if error.is_timeout() {
anyhow!(
"Python bridge timeout pēc {}s ({url})",
request_timeout.as_secs()
)
} else {
anyhow!("Python bridge HTTP kļūda: {}", error)
};
if let Some(next_base_url) = self.base_urls.get(index + 1) {
warn!(
"Python bridge stream request to {url} failed, retrying {}: {error}",
format!("{next_base_url}/v1/{endpoint}")
);
}
last_error = Some(error);
continue;
}
};
if !response.status().is_success() {
let status = response.status();
let body = response.text().await.unwrap_or_default();
return Err(anyhow!("Python bridge atbildēja ar {}: {}", status, body));
}
return Ok(response);
}
Err(last_error.unwrap_or_else(|| anyhow!("Python bridge URL nav konfigurēts")))
}
}
impl Default for PythonBridge {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::{
        env_optional_duration, env_u64_or_default, normalize_http_base_url,
        resolve_core_python_base_url, resolve_core_python_base_urls, resolve_request_timeout,
        resolve_router_timeout, DEFAULT_CORE_PYTHON_URL, DEFAULT_REQUEST_TIMEOUT_SECS,
        DEFAULT_ROUTER_TIMEOUT_SECS,
    };
    use std::env;
    use std::sync::{LazyLock, Mutex, MutexGuard};
    use std::time::Duration;

    /// Serializes tests that touch process-wide environment variables.
    static ENV_LOCK: LazyLock<Mutex<()>> = LazyLock::new(|| Mutex::new(()));

    /// Acquires `ENV_LOCK`, recovering from poisoning.
    ///
    /// The mutex guards no data, so a panic in one test must not cascade into
    /// spurious failures of every later environment test.
    fn lock_env() -> MutexGuard<'static, ()> {
        ENV_LOCK
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
    }

    /// Sets `key` to `value`, or removes it when `value` is `None`.
    fn apply_env(key: &str, value: Option<&str>) {
        match value {
            Some(value) => env::set_var(key, value),
            None => env::remove_var(key),
        }
    }

    /// Applies environment overrides and restores the previous values on drop.
    struct EnvGuard {
        previous: Vec<(&'static str, Option<String>)>,
    }

    impl EnvGuard {
        /// Applies `entries` in order, remembering what each key held before.
        fn set(entries: &[(&'static str, Option<&str>)]) -> Self {
            let previous = entries
                .iter()
                .map(|(key, value)| {
                    let original = env::var(key).ok();
                    apply_env(key, *value);
                    (*key, original)
                })
                .collect();
            Self { previous }
        }
    }

    impl Drop for EnvGuard {
        fn drop(&mut self) {
            // Undo in reverse so a key listed twice ends up with its true
            // original value rather than an intermediate override.
            for (key, value) in self.previous.drain(..).rev() {
                apply_env(key, value.as_deref());
            }
        }
    }

    #[test]
    fn adds_https_for_public_host_values() {
        assert_eq!(
            normalize_http_base_url("maris-core-python.up.railway.app"),
            "https://maris-core-python.up.railway.app"
        );
    }

    #[test]
    fn adds_http_for_localhost_values() {
        assert_eq!(
            normalize_http_base_url("localhost:8000"),
            "http://localhost:8000"
        );
    }

    #[test]
    fn adds_http_for_bare_hostport_values() {
        assert_eq!(
            normalize_http_base_url("maris-core-python:8000"),
            "http://maris-core-python:8000"
        );
    }

    #[test]
    fn keeps_explicit_port_on_railway_private_url() {
        assert_eq!(
            normalize_http_base_url("http://maris-core-python.railway.internal:8000/"),
            "http://maris-core-python.railway.internal:8000"
        );
    }

    #[test]
    fn reads_timeout_value_from_env() {
        let _env_lock = lock_env();
        let _env_guard = EnvGuard::set(&[("MARIS_TEST_TIMEOUT_SECS_VALID", Some("42"))]);
        assert_eq!(env_u64_or_default("MARIS_TEST_TIMEOUT_SECS_VALID", 30), 42);
    }

    #[test]
    fn env_optional_duration_returns_none_when_missing() {
        let _env_lock = lock_env();
        let _env_guard = EnvGuard::set(&[("MARIS_TEST_TIMEOUT_SECS_OPTIONAL", None)]);
        assert_eq!(
            env_optional_duration("MARIS_TEST_TIMEOUT_SECS_OPTIONAL"),
            None
        );
    }

    #[test]
    fn core_python_timeout_uses_default_when_env_is_missing() {
        let _env_lock = lock_env();
        let _env_guard = EnvGuard::set(&[("CORE_PYTHON_TIMEOUT_SECS", None)]);
        assert_eq!(
            resolve_request_timeout(),
            Duration::from_secs(DEFAULT_REQUEST_TIMEOUT_SECS)
        );
    }

    #[test]
    fn router_timeout_uses_default_when_env_is_missing() {
        let _env_lock = lock_env();
        let _env_guard = EnvGuard::set(&[("CORE_PYTHON_ROUTER_TIMEOUT_SECS", None)]);
        assert_eq!(
            resolve_router_timeout(Duration::from_secs(DEFAULT_REQUEST_TIMEOUT_SECS)),
            Duration::from_secs(DEFAULT_ROUTER_TIMEOUT_SECS)
        );
    }

    #[test]
    fn router_timeout_is_capped_by_global_request_timeout() {
        let _env_lock = lock_env();
        let _env_guard = EnvGuard::set(&[("CORE_PYTHON_ROUTER_TIMEOUT_SECS", Some("20"))]);
        assert_eq!(
            resolve_router_timeout(Duration::from_secs(6)),
            Duration::from_secs(6)
        );
    }

    #[test]
    fn reads_optional_timeout_value_from_env() {
        let _env_lock = lock_env();
        let _env_guard = EnvGuard::set(&[("MARIS_TEST_TIMEOUT_SECS_OPTIONAL", Some("42"))]);
        assert_eq!(
            env_optional_duration("MARIS_TEST_TIMEOUT_SECS_OPTIONAL"),
            Some(Duration::from_secs(42))
        );
    }

    #[test]
    fn falls_back_when_env_timeout_invalid() {
        let _env_lock = lock_env();
        let _env_guard = EnvGuard::set(&[("MARIS_TEST_TIMEOUT_SECS_INVALID", Some("abc"))]);
        assert_eq!(
            env_u64_or_default("MARIS_TEST_TIMEOUT_SECS_INVALID", 30),
            30
        );
    }

    #[test]
    fn infers_private_core_python_url_when_env_is_missing_on_railway() {
        let _env_lock = lock_env();
        let _env_guard = EnvGuard::set(&[
            ("CORE_PYTHON_URL", None),
            (
                "RAILWAY_PRIVATE_DOMAIN",
                Some("maris-backend.railway.internal"),
            ),
        ]);
        assert_eq!(
            resolve_core_python_base_url(),
            "http://maris-core-python.railway.internal"
        );
    }

    #[test]
    fn rewrites_public_railway_core_python_url_to_private_sibling_when_available() {
        let _env_lock = lock_env();
        let _env_guard = EnvGuard::set(&[
            (
                "CORE_PYTHON_URL",
                Some("https://maris-core-python.up.railway.app"),
            ),
            (
                "RAILWAY_PRIVATE_DOMAIN",
                Some("maris-backend.railway.internal"),
            ),
        ]);
        assert_eq!(
            resolve_core_python_base_url(),
            "http://maris-core-python.railway.internal"
        );
    }

    #[test]
    fn keeps_explicit_non_railway_core_python_url() {
        let _env_lock = lock_env();
        let _env_guard = EnvGuard::set(&[
            ("CORE_PYTHON_URL", Some("https://example.com/core")),
            (
                "RAILWAY_PRIVATE_DOMAIN",
                Some("maris-backend.railway.internal"),
            ),
        ]);
        assert_eq!(resolve_core_python_base_url(), "https://example.com/core");
    }

    #[test]
    fn adds_default_private_port_fallback_for_railway_internal_targets() {
        let _env_lock = lock_env();
        let _env_guard = EnvGuard::set(&[
            ("CORE_PYTHON_URL", None),
            (
                "RAILWAY_PRIVATE_DOMAIN",
                Some("maris-backend.railway.internal"),
            ),
        ]);
        assert_eq!(
            resolve_core_python_base_urls(),
            vec![
                "http://maris-core-python.railway.internal".to_string(),
                "http://maris-core-python.railway.internal:8000".to_string()
            ]
        );
    }

    #[test]
    fn keeps_explicit_ported_private_url_as_primary_candidate() {
        let _env_lock = lock_env();
        let _env_guard = EnvGuard::set(&[
            (
                "CORE_PYTHON_URL",
                Some("http://maris-core-python.railway.internal:8000/"),
            ),
            ("RAILWAY_PRIVATE_DOMAIN", None),
        ]);
        assert_eq!(
            resolve_core_python_base_urls(),
            vec!["http://maris-core-python.railway.internal:8000".to_string()]
        );
    }

    #[test]
    fn keeps_local_default_when_not_running_on_railway() {
        let _env_lock = lock_env();
        let _env_guard =
            EnvGuard::set(&[("CORE_PYTHON_URL", None), ("RAILWAY_PRIVATE_DOMAIN", None)]);
        assert_eq!(resolve_core_python_base_url(), DEFAULT_CORE_PYTHON_URL);
    }
}