kkyygg hank9999 committed on
Commit
3a50c78
·
unverified ·
1 Parent(s): 280bd89

fix: 改进凭据故障转移策略,避免瞬态错误误禁用所有凭据(502错误高负载不禁用凭据, 继续重试) (#15)

Browse files

* fix: 改进凭据故障转移策略,避免瞬态错误误禁用所有凭据

* fix: 瞬态错误(429/5xx/网络)不切换凭据,保持与eec4f69设计一致

- 网络错误:不切换凭据,只重试+指数退避
- 429/408/5xx:不切换凭据,只重试+指数退避
- 兜底错误:不切换凭据,只重试+指数退避

这与 commit eec4f69 的设计一致:429 high traffic / 502 high load 等
瞬态错误不应切换凭据,因为切换到其他凭据可能遇到同样的上游问题。

---------

Co-authored-by: hank9999 <hank9999@qq.com>

Files changed (2) hide show
  1. src/kiro/provider.rs +67 -38
  2. src/kiro/token_manager.rs +73 -3
src/kiro/provider.rs CHANGED
@@ -7,6 +7,8 @@
7
  use reqwest::Client;
8
  use reqwest::header::{AUTHORIZATION, CONNECTION, CONTENT_TYPE, HOST, HeaderMap, HeaderValue};
9
  use std::sync::Arc;
 
 
10
  use uuid::Uuid;
11
 
12
  use crate::http_client::{ProxyConfig, build_client};
@@ -123,7 +125,8 @@ impl KiroProvider {
123
  ///
124
  /// 支持多凭据故障转移:
125
  /// - 400 Bad Request: 直接返回错误,不计入凭据失败
126
- /// - 其他错误: 计入失败次数,达到阈值后切换凭据重试
 
127
  ///
128
  /// # Arguments
129
  /// * `request_body` - JSON 格式的请求体字符串
@@ -138,7 +141,8 @@ impl KiroProvider {
138
  ///
139
  /// 支持多凭据故障转移:
140
  /// - 400 Bad Request: 直接返回错误,不计入凭据失败
141
- /// - 其他错误: 计入失败次数,达到阈值后切换凭据重试
 
142
  ///
143
  /// # Arguments
144
  /// * `request_body` - JSON 格式的请求体字符串
@@ -163,6 +167,7 @@ impl KiroProvider {
163
  let total_credentials = self.token_manager.total_count();
164
  let max_retries = (total_credentials * MAX_RETRIES_PER_CREDENTIAL).min(MAX_TOTAL_RETRIES);
165
  let mut last_error: Option<anyhow::Error> = None;
 
166
 
167
  for attempt in 0..max_retries {
168
  // 获取调用上下文(绑定 index、credentials、token)
@@ -200,11 +205,12 @@ impl KiroProvider {
200
  max_retries,
201
  e
202
  );
203
- // 网络错误,报告失败并重试(使用绑定的 id)
204
- if !self.token_manager.report_failure(ctx.id) {
205
- return Err(e.into());
206
- }
207
  last_error = Some(e.into());
 
 
 
208
  continue;
209
  }
210
  };
@@ -217,63 +223,75 @@ impl KiroProvider {
217
  return Ok(response);
218
  }
219
 
220
- // 400 Bad Request - 不算凭据错误,直接返回
 
 
 
221
  if status.as_u16() == 400 {
222
- let body = response.text().await.unwrap_or_default();
223
- let api_type = if is_stream { "流式" } else { "非流式" };
224
  anyhow::bail!("{} API 请求失败: {} {}", api_type, status, body);
225
  }
226
 
227
- // 429 Too Many Requests - 限流错误,不算凭据错误,重试但不禁用凭据
228
- if status.as_u16() == 429 {
229
- let body = response.text().await.unwrap_or_default();
230
  tracing::warn!(
231
- "API 请求被限流(尝试 {}/{}): {} {}",
232
  attempt + 1,
233
  max_retries,
234
  status,
235
  body
236
  );
237
- last_error = Some(anyhow::anyhow!(
238
- "{} API 请求被限流: {} {}",
239
- if is_stream { "流式" } else { "非流式" },
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
240
  status,
241
  body
242
- ));
 
 
 
 
243
  continue;
244
  }
245
 
246
- // 其他错误 - 记录失败并可能重试(使用绑定的 id)
247
- let body = response.text().await.unwrap_or_default();
 
 
 
 
248
  tracing::warn!(
249
- "API 请求失败(尝试 {}/{}): {} {}",
250
  attempt + 1,
251
  max_retries,
252
  status,
253
  body
254
  );
255
-
256
- let has_available = self.token_manager.report_failure(ctx.id);
257
- if !has_available {
258
- let api_type = if is_stream { "流式" } else { "非流式" };
259
- anyhow::bail!(
260
- "{} API 请求失败(所有凭据已用尽): {} {}",
261
- api_type,
262
- status,
263
- body
264
- );
265
  }
266
-
267
- last_error = Some(anyhow::anyhow!(
268
- "{} API 请求失败: {} {}",
269
- if is_stream { "流式" } else { "非流式" },
270
- status,
271
- body
272
- ));
273
  }
274
 
275
  // 所有重试都失败
276
- let api_type = if is_stream { "流式" } else { "非流式" };
277
  Err(last_error.unwrap_or_else(|| {
278
  anyhow::anyhow!(
279
  "{} API 请求失败:已达到最大重试次数({}次)",
@@ -282,6 +300,17 @@ impl KiroProvider {
282
  )
283
  }))
284
  }
 
 
 
 
 
 
 
 
 
 
 
285
  }
286
 
287
  #[cfg(test)]
 
7
  use reqwest::Client;
8
  use reqwest::header::{AUTHORIZATION, CONNECTION, CONTENT_TYPE, HOST, HeaderMap, HeaderValue};
9
  use std::sync::Arc;
10
+ use std::time::Duration;
11
+ use tokio::time::sleep;
12
  use uuid::Uuid;
13
 
14
  use crate::http_client::{ProxyConfig, build_client};
 
125
  ///
126
  /// 支持多凭据故障转移:
127
  /// - 400 Bad Request: 直接返回错误,不计入凭据失败
128
+ /// - 401/403: 视为凭据/权限问题,计入失败次数并允许故障转移
129
+ /// - 429/5xx/网络等瞬态错误: 重试但不禁用或切换凭据(避免误把所有凭据锁死)
130
  ///
131
  /// # Arguments
132
  /// * `request_body` - JSON 格式的请求体字符串
 
141
  ///
142
  /// 支持多凭据故障转移:
143
  /// - 400 Bad Request: 直接返回错误,不计入凭据失败
144
+ /// - 401/403: 视为凭据/权限问题,计入失败次数并允许故障转移
145
+ /// - 429/5xx/网络等瞬态错误: 重试但不禁用或切换凭据(避免误把所有凭据锁死)
146
  ///
147
  /// # Arguments
148
  /// * `request_body` - JSON 格式的请求体字符串
 
167
  let total_credentials = self.token_manager.total_count();
168
  let max_retries = (total_credentials * MAX_RETRIES_PER_CREDENTIAL).min(MAX_TOTAL_RETRIES);
169
  let mut last_error: Option<anyhow::Error> = None;
170
+ let api_type = if is_stream { "流式" } else { "非流式" };
171
 
172
  for attempt in 0..max_retries {
173
  // 获取调用上下文(绑定 index、credentials、token)
 
205
  max_retries,
206
  e
207
  );
208
+ // 网络错误通常是上游/链路瞬态问题,不应导致"禁用凭据"或"切换凭据"
209
+ // (否则一段时间网络抖动会把所有凭据都误禁用,需要重启才能恢复)
 
 
210
  last_error = Some(e.into());
211
+ if attempt + 1 < max_retries {
212
+ sleep(Self::retry_delay(attempt)).await;
213
+ }
214
  continue;
215
  }
216
  };
 
223
  return Ok(response);
224
  }
225
 
226
+ // 失败响应:读取 body 用于日志/错误信息
227
+ let body = response.text().await.unwrap_or_default();
228
+
229
+ // 400 Bad Request - 请求问题,重试/切换凭据无意义
230
  if status.as_u16() == 400 {
 
 
231
  anyhow::bail!("{} API 请求失败: {} {}", api_type, status, body);
232
  }
233
 
234
+ // 401/403 - 更可能是凭据/权限问题:计入失败并允许故障转移
235
+ if matches!(status.as_u16(), 401 | 403) {
 
236
  tracing::warn!(
237
+ "API 请求失败(可能为凭据错误,尝试 {}/{}): {} {}",
238
  attempt + 1,
239
  max_retries,
240
  status,
241
  body
242
  );
243
+
244
+ let has_available = self.token_manager.report_failure(ctx.id);
245
+ if !has_available {
246
+ anyhow::bail!(
247
+ "{} API 请求失败(所有凭据已用尽): {} {}",
248
+ api_type,
249
+ status,
250
+ body
251
+ );
252
+ }
253
+
254
+ last_error = Some(anyhow::anyhow!("{} API 请求失败: {} {}", api_type, status, body));
255
+ continue;
256
+ }
257
+
258
+ // 429/408/5xx - 瞬态上游错误:重试但不禁用或切换凭据
259
+ // (避免 429 high traffic / 502 high load 等瞬态错误把所有凭据锁死)
260
+ if matches!(status.as_u16(), 408 | 429) || status.is_server_error() {
261
+ tracing::warn!(
262
+ "API 请求失败(上游瞬态错误,尝试 {}/{}): {} {}",
263
+ attempt + 1,
264
+ max_retries,
265
  status,
266
  body
267
+ );
268
+ last_error = Some(anyhow::anyhow!("{} API 请求失败: {} {}", api_type, status, body));
269
+ if attempt + 1 < max_retries {
270
+ sleep(Self::retry_delay(attempt)).await;
271
+ }
272
  continue;
273
  }
274
 
275
+ // 其他 4xx - 通常为请求/配置问题:直接返回,不计入凭据失败
276
+ if status.is_client_error() {
277
+ anyhow::bail!("{} API 请求失败: {} {}", api_type, status, body);
278
+ }
279
+
280
+ // 兜底:当作可重试的瞬态错误处理(不切换凭据)
281
  tracing::warn!(
282
+ "API 请求失败(未知错误,尝试 {}/{}): {} {}",
283
  attempt + 1,
284
  max_retries,
285
  status,
286
  body
287
  );
288
+ last_error = Some(anyhow::anyhow!("{} API 请求失败: {} {}", api_type, status, body));
289
+ if attempt + 1 < max_retries {
290
+ sleep(Self::retry_delay(attempt)).await;
 
 
 
 
 
 
 
291
  }
 
 
 
 
 
 
 
292
  }
293
 
294
  // 所有重试都失败
 
295
  Err(last_error.unwrap_or_else(|| {
296
  anyhow::anyhow!(
297
  "{} API 请求失败:已达到最大重试次数({}次)",
 
300
  )
301
  }))
302
  }
303
+
304
+ fn retry_delay(attempt: usize) -> Duration {
305
+ // Exponential backoff plus a small random jitter, so that retries do not amplify a failure while the upstream is flapping.
306
+ const BASE_MS: u64 = 200; // backoff for attempt 0, in milliseconds
307
+ const MAX_MS: u64 = 2_000; // cap applied to the backoff before jitter is added
308
+ let exp = BASE_MS.saturating_mul(2u64.saturating_pow(attempt.min(6) as u32)); // BASE_MS * 2^attempt, with the exponent clamped to 6
309
+ let backoff = exp.min(MAX_MS);
310
+ let jitter_max = (backoff / 4).max(1); // jitter is drawn from 0..=backoff/4; the upper bound is never below 1
311
+ let jitter = fastrand::u64(0..=jitter_max);
312
+ Duration::from_millis(backoff.saturating_add(jitter))
313
+ }
314
  }
315
 
316
  #[cfg(test)]
src/kiro/token_manager.rs CHANGED
@@ -373,6 +373,17 @@ struct CredentialEntry {
373
  failure_count: u32,
374
  /// 是否已禁用
375
  disabled: bool,
 
 
 
 
 
 
 
 
 
 
 
376
  }
377
 
378
  // ============================================================================
@@ -485,6 +496,7 @@ impl MultiTokenManager {
485
  credentials: cred,
486
  failure_count: 0,
487
  disabled: false,
 
488
  }
489
  })
490
  .collect();
@@ -577,7 +589,7 @@ impl MultiTokenManager {
577
  }
578
 
579
  let (id, credentials) = {
580
- let entries = self.entries.lock();
581
  let current_id = *self.current_id.lock();
582
 
583
  // 找到当前凭据
@@ -585,11 +597,34 @@ impl MultiTokenManager {
585
  (entry.id, entry.credentials.clone())
586
  } else {
587
  // 当前凭据不可用,选择优先级最高的可用凭据
588
- if let Some(entry) = entries
589
  .iter()
590
  .filter(|e| !e.disabled)
591
- .min_by_key(|e| e.credentials.priority)
 
 
 
 
 
 
592
  {
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
593
  // 先提取数据
594
  let new_id = entry.id;
595
  let new_creds = entry.credentials.clone();
@@ -829,6 +864,7 @@ impl MultiTokenManager {
829
 
830
  if failure_count >= MAX_FAILURES_PER_CREDENTIAL {
831
  entry.disabled = true;
 
832
  tracing::error!("凭据 #{} 已连续失败 {} 次,已被禁用", id, failure_count);
833
 
834
  // 切换到优先级最高的可用凭据
@@ -932,6 +968,9 @@ impl MultiTokenManager {
932
  if !disabled {
933
  // 启用时重置失败计数
934
  entry.failure_count = 0;
 
 
 
935
  }
936
  }
937
  // 持久化更改
@@ -969,6 +1008,7 @@ impl MultiTokenManager {
969
  .ok_or_else(|| anyhow::anyhow!("凭据不存在: {}", id))?;
970
  entry.failure_count = 0;
971
  entry.disabled = false;
 
972
  }
973
  // 持久化更改
974
  self.persist_credentials()?;
@@ -1079,6 +1119,7 @@ impl MultiTokenManager {
1079
  credentials: validated_cred,
1080
  failure_count: 0,
1081
  disabled: false,
 
1082
  });
1083
  }
1084
 
@@ -1340,4 +1381,33 @@ mod tests {
1340
  Some("token2".to_string())
1341
  );
1342
  }
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1343
  }
 
373
  failure_count: u32,
374
  /// 是否已禁用
375
  disabled: bool,
376
+ /// 禁用原因(用于区分手动禁用 vs 自动禁用,便于自愈)
377
+ disabled_reason: Option<DisabledReason>,
378
+ }
379
+
380
+ /// Reason why a credential was disabled.
381
+ #[derive(Debug, Clone, Copy, PartialEq, Eq)]
382
+ enum DisabledReason {
383
+ /// Disabled manually through the Admin API.
384
+ Manual,
385
+ /// Disabled automatically after consecutive failures reached the threshold.
386
+ TooManyFailures,
387
  }
388
 
389
  // ============================================================================
 
496
  credentials: cred,
497
  failure_count: 0,
498
  disabled: false,
499
+ disabled_reason: None,
500
  }
501
  })
502
  .collect();
 
589
  }
590
 
591
  let (id, credentials) = {
592
+ let mut entries = self.entries.lock();
593
  let current_id = *self.current_id.lock();
594
 
595
  // 找到当前凭据
 
597
  (entry.id, entry.credentials.clone())
598
  } else {
599
  // 当前凭据不可用,选择优先级最高的可用凭据
600
+ let mut best = entries
601
  .iter()
602
  .filter(|e| !e.disabled)
603
+ .min_by_key(|e| e.credentials.priority);
604
+
605
+ // 没有可用凭据:如果是“自动禁用导致全灭”,做一次类似重启的自愈
606
+ if best.is_none()
607
+ && entries.iter().any(|e| {
608
+ e.disabled && e.disabled_reason == Some(DisabledReason::TooManyFailures)
609
+ })
610
  {
611
+ tracing::warn!(
612
+ "所有凭据均已被自动禁用,执行自愈:重置失败计数并重新启用(等价于重启)"
613
+ );
614
+ for e in entries.iter_mut() {
615
+ if e.disabled_reason == Some(DisabledReason::TooManyFailures) {
616
+ e.disabled = false;
617
+ e.disabled_reason = None;
618
+ e.failure_count = 0;
619
+ }
620
+ }
621
+ best = entries
622
+ .iter()
623
+ .filter(|e| !e.disabled)
624
+ .min_by_key(|e| e.credentials.priority);
625
+ }
626
+
627
+ if let Some(entry) = best {
628
  // 先提取数据
629
  let new_id = entry.id;
630
  let new_creds = entry.credentials.clone();
 
864
 
865
  if failure_count >= MAX_FAILURES_PER_CREDENTIAL {
866
  entry.disabled = true;
867
+ entry.disabled_reason = Some(DisabledReason::TooManyFailures);
868
  tracing::error!("凭据 #{} 已连续失败 {} 次,已被禁用", id, failure_count);
869
 
870
  // 切换到优先级最高的可用凭据
 
968
  if !disabled {
969
  // 启用时重置失败计数
970
  entry.failure_count = 0;
971
+ entry.disabled_reason = None;
972
+ } else {
973
+ entry.disabled_reason = Some(DisabledReason::Manual);
974
  }
975
  }
976
  // 持久化更改
 
1008
  .ok_or_else(|| anyhow::anyhow!("凭据不存在: {}", id))?;
1009
  entry.failure_count = 0;
1010
  entry.disabled = false;
1011
+ entry.disabled_reason = None;
1012
  }
1013
  // 持久化更改
1014
  self.persist_credentials()?;
 
1119
  credentials: validated_cred,
1120
  failure_count: 0,
1121
  disabled: false,
1122
+ disabled_reason: None,
1123
  });
1124
  }
1125
 
 
1381
  Some("token2".to_string())
1382
  );
1383
  }
1384
+
1385
+ #[tokio::test]
1386
+ async fn test_multi_token_manager_acquire_context_auto_recovers_all_disabled() {
1387
+ let config = Config::default();
1388
+ let mut cred1 = KiroCredentials::default();
1389
+ cred1.access_token = Some("t1".to_string());
1390
+ cred1.expires_at = Some((Utc::now() + Duration::hours(1)).to_rfc3339());
1391
+ let mut cred2 = KiroCredentials::default();
1392
+ cred2.access_token = Some("t2".to_string());
1393
+ cred2.expires_at = Some((Utc::now() + Duration::hours(1)).to_rfc3339());
1394
+
1395
+ let manager =
1396
+ MultiTokenManager::new(config, vec![cred1, cred2], None, None, false).unwrap();
1397
+
1398
+ // Credentials are assigned IDs automatically (starting from 1).
1399
+ for _ in 0..MAX_FAILURES_PER_CREDENTIAL {
1400
+ manager.report_failure(1);
1401
+ }
1402
+ for _ in 0..MAX_FAILURES_PER_CREDENTIAL {
1403
+ manager.report_failure(2);
1404
+ }
1405
+
1406
+ assert_eq!(manager.available_count(), 0);
1407
+
1408
+ // Acquiring a context should trigger self-healing: failure counts are reset and the
1409
+ // credentials re-enabled, so the process does not have to be restarted.
1410
+ let ctx = manager.acquire_context().await.unwrap();
1411
+ assert!(ctx.token == "t1" || ctx.token == "t2");
1412
+ assert_eq!(manager.available_count(), 2);
1413
+ }
1413
  }