use tokio::time::{sleep, Duration};
use tracing::{debug, info};
use axum::{http::StatusCode, response::{IntoResponse, Response}, Json, extract::State};
use serde_json::{json, Value};
use crate::proxy::server::AppState;
// ===== 统一重试与退避策略 =====
/// Backoff policy to apply before retrying a failed upstream request.
///
/// `PartialEq`/`Eq` are derived so that callers and tests can compare
/// strategies directly (`==`, `assert_eq!`) instead of pattern-matching.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RetryStrategy {
    /// Do not retry; surface the error immediately.
    NoRetry,
    /// Wait a fixed amount of time before retrying.
    FixedDelay(Duration),
    /// Linear backoff: `base_ms * (attempt + 1)`.
    LinearBackoff { base_ms: u64 },
    /// Exponential backoff: `base_ms * 2^attempt`, capped at `max_ms`.
    ExponentialBackoff { base_ms: u64, max_ms: u64 },
    /// Grace retry: wait a short window and retry on the current account,
    /// without counting as a regular account switch.
    GraceRetry(Duration),
}
/// Chooses a [`RetryStrategy`] from the upstream status code and error body.
///
/// * `status_code` - HTTP status returned by the upstream.
/// * `error_text` - error body; scanned for thinking-signature markers on a
///   400 and passed to `parse_retry_delay` on a 429.
/// * `retried_without_thinking` - when `true`, a 400 signature failure is not
///   retried again.
///
/// Any status code without a dedicated arm yields [`RetryStrategy::NoRetry`].
pub fn determine_retry_strategy(
    status_code: u16,
    error_text: &str,
    retried_without_thinking: bool,
) -> RetryStrategy {
    use crate::proxy::upstream::retry;

    // Substrings that identify a thinking-signature failure in a 400 body.
    const SIGNATURE_ERROR_MARKERS: [&str; 4] = [
        "Invalid `signature`",
        "thinking.signature",
        "thinking.thinking",
        "Corrupted thought signature",
    ];

    match status_code {
        // 400: retried once, and only for a thinking-signature failure.
        400 => {
            let is_signature_failure = SIGNATURE_ERROR_MARKERS
                .iter()
                .any(|marker| error_text.contains(*marker));
            if is_signature_failure && !retried_without_thinking {
                RetryStrategy::FixedDelay(Duration::from_millis(200))
            } else {
                RetryStrategy::NoRetry
            }
        }
        // 429 rate limiting: prefer the delay extracted from the error text
        // (Retry-After / quotaResetDelay) when one is available.
        429 => match retry::parse_retry_delay(error_text) {
            // Delay is short enough for a grace retry on the same account.
            Some(delay_ms) if retry::should_grace_retry(delay_ms) => {
                // Add a 100ms safety buffer on top of the reported delay.
                let actual_delay = delay_ms.saturating_add(100);
                tracing::info!("Grace Retry Triggered: Delay {}ms is within window, using same account", actual_delay);
                RetryStrategy::GraceRetry(Duration::from_millis(actual_delay))
            }
            // Longer delay: add 200ms of slack, but never wait more than 30s.
            Some(delay_ms) => {
                let actual_delay = delay_ms.saturating_add(200).min(30_000);
                RetryStrategy::FixedDelay(Duration::from_millis(actual_delay))
            }
            // No delay hint: linear backoff starting at 5s.
            None => RetryStrategy::LinearBackoff { base_ms: 5000 },
        },
        // 503 unavailable / 529 overloaded: exponential backoff from 10s up
        // to 60s (aimed at overloaded Google edge nodes).
        503 | 529 => RetryStrategy::ExponentialBackoff {
            base_ms: 10000,
            max_ms: 60000,
        },
        // 500 internal server error: linear backoff starting at 3s.
        500 => RetryStrategy::LinearBackoff { base_ms: 3000 },
        // 401/403 auth or permission errors: very short pause before the
        // account is switched.
        401 | 403 => RetryStrategy::FixedDelay(Duration::from_millis(200)),
        // 404: on the Google Cloud Code API this is usually an intermittent
        // account-level issue (staged rollout, permissions out of sync), so
        // rotating the account often resolves it.
        404 => RetryStrategy::FixedDelay(Duration::from_millis(300)),
        // Everything else is treated as non-retryable.
        _ => RetryStrategy::NoRetry,
    }
}
/// Waits according to `strategy` and reports whether the caller should retry.
///
/// * `strategy` - the backoff policy to execute.
/// * `attempt` - zero-based attempt index; scales the linear and exponential
///   delays and is logged as `attempt + 1`.
/// * `max_attempts` - used for log output only.
/// * `status_code` - used for log output only.
/// * `trace_id` - request identifier prefixed to every log line.
///
/// Returns `false` for [`RetryStrategy::NoRetry`] (without sleeping) and
/// `true` for every other strategy once its delay has elapsed.
///
/// All delay arithmetic saturates instead of overflowing, so a very large
/// `attempt` or `base_ms` can neither panic (debug builds) nor wrap around to
/// a near-zero delay (release builds).
pub async fn apply_retry_strategy(
    strategy: RetryStrategy,
    attempt: usize,
    max_attempts: usize,
    status_code: u16,
    trace_id: &str,
) -> bool {
    // One-based attempt number for log output; saturating so that
    // `usize::MAX` cannot overflow.
    let attempt_number = attempt.saturating_add(1);

    match strategy {
        RetryStrategy::NoRetry => {
            debug!("[{}] Non-retryable error {}, stopping", trace_id, status_code);
            false
        }
        RetryStrategy::FixedDelay(duration) => {
            info!(
                "[{}] ⏱️ Retry with fixed delay: status={}, attempt={}/{}, delay={}ms",
                trace_id,
                status_code,
                attempt_number,
                max_attempts,
                duration.as_millis()
            );
            sleep(duration).await;
            true
        }
        RetryStrategy::LinearBackoff { base_ms } => {
            // `usize -> u64` is lossless on targets with pointers of at most
            // 64 bits; the multiplication saturates at `u64::MAX`.
            let multiplier = (attempt as u64).saturating_add(1);
            let calculated_ms = base_ms.saturating_mul(multiplier);
            info!(
                "[{}] ⏱️ Retry with linear backoff: status={}, attempt={}/{}, delay={}ms",
                trace_id,
                status_code,
                attempt_number,
                max_attempts,
                calculated_ms
            );
            sleep(Duration::from_millis(calculated_ms)).await;
            true
        }
        RetryStrategy::ExponentialBackoff { base_ms, max_ms } => {
            // `attempt.min(64)` always fits in a `u32`; 2^64 does not fit in a
            // `u64`, so `checked_pow` returns `None` there and the factor
            // saturates. The result is then capped at `max_ms` as before.
            let factor = 2_u64
                .checked_pow(attempt.min(64) as u32)
                .unwrap_or(u64::MAX);
            let calculated_ms = base_ms.saturating_mul(factor).min(max_ms);
            info!(
                "[{}] ⏱️ Retry with exponential backoff: status={}, attempt={}/{}, delay={}ms",
                trace_id,
                status_code,
                attempt_number,
                max_attempts,
                calculated_ms
            );
            sleep(Duration::from_millis(calculated_ms)).await;
            true
        }
        RetryStrategy::GraceRetry(duration) => {
            info!(
                "[{}] ⚡ Grace Retry: Performing micro-wait ({}ms) on current account...",
                trace_id,
                duration.as_millis()
            );
            sleep(duration).await;
            // Whether the retry stays on the same account is decided by the
            // handlers through `should_rotate_account`.
            true
        }
    }
}
/// Decides whether the next attempt should use a different account.
///
/// * `status_code` - HTTP status returned by the upstream.
/// * `strategy` - the retry strategy chosen for this failure, if any.
///
/// A [`RetryStrategy::GraceRetry`] always keeps the current account.
/// Otherwise rotation is requested for 429, 401, 403, 404 and 500, which are
/// treated as account-level or node-quota errors. 503/529 are usually backend
/// overload where switching accounts helps little, so they, like every other
/// status, do not rotate.
pub fn should_rotate_account(status_code: u16, strategy: Option<&RetryStrategy>) -> bool {
    let is_grace_retry = matches!(strategy, Some(RetryStrategy::GraceRetry(_)));
    let is_account_level_error = matches!(status_code, 429 | 401 | 403 | 404 | 500);

    !is_grace_retry && is_account_level_error
}
/// Detects model capabilities and configuration
/// POST /v1/models/detect
pub async fn handle_detect_model(
State(state): State<AppState>,
Json(body): Json<Value>,
) -> Response {
let model_name = body.get("model").and_then(|v| v.as_str()).unwrap_or("");
if model_name.is_empty() {
return (StatusCode::BAD_REQUEST, "Missing 'model' field").into_response();
}
// 1. Resolve mapping
let mapped_model = crate::proxy::common::model_mapping::resolve_model_route(
model_name,
&*state.custom_mapping.read().await,
);
// 2. Resolve capabilities
let config = crate::proxy::mappers::common_utils::resolve_request_config(
model_name,
&mapped_model,
&None, // We don't check tools for static capability detection
None, // size
None, // quality
None, // image_size
None, // body (not needed for static detection)
);
// 3. Construct response
let mut response = json!({
"model": model_name,
"mapped_model": mapped_model,
"type": config.request_type,
"features": {
"has_web_search": config.inject_google_search,
"is_image_gen": config.request_type == "image_gen"
}
});
if let Some(img_conf) = config.image_config {
if let Some(obj) = response.as_object_mut() {
obj.insert("config".to_string(), img_conf);
}
}
Json(response).into_response()
}
|