File size: 7,692 Bytes
8fea9ab
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
import asyncio
import json
from typing import Literal
from fastapi import HTTPException, Request, status
from fastapi.responses import StreamingResponse
from app.models import ChatCompletionRequest
from app.services import GeminiClient
from app.utils import protect_from_abuse, handle_gemini_error, handle_api_error
from .logging_utils import log
from .stream_handlers import process_stream_request
from .nonstream_handlers import process_nonstream_request

# Request handling entry point
async def process_request(
    chat_request: ChatCompletionRequest,
    http_request: Request,
    request_type: Literal['stream', 'non-stream'],
    key_manager,
    response_cache_manager,
    active_requests_manager,
    safety_settings,
    safety_settings_g2,
    api_call_stats,
    FAKE_STREAMING,
    FAKE_STREAMING_INTERVAL,
    MAX_REQUESTS_PER_MINUTE,
    MAX_REQUESTS_PER_DAY_PER_IP,
    cache_key: str = None,
    client_ip: str = None
):
    """Dispatch a chat request to the stream or non-stream handler, rotating API keys on failure.

    Steps, in order:
      1. Run ``protect_from_abuse`` on the incoming HTTP request
         (presumably raises when a rate limit is exceeded -- confirm in app.utils).
      2. Reject models that are not listed in ``GeminiClient.AVAILABLE_MODELS``.
      3. Reset the key manager's per-request "tried keys" state and convert
         the chat messages via ``GeminiClient.convert_messages``.
      4. Try up to ``len(key_manager.api_keys)`` keys (at least one attempt).
         For each key, call the handler up to 3 times; after a failure the
         dict returned by ``handle_api_error`` decides whether to retry with
         the same key (``should_retry``) or move on to the next key.

    Args:
        chat_request: Parsed request body. ``chat_request.stream`` selects the
            handler; ``chat_request.model`` is validated and logged.
        http_request: Incoming request, forwarded to the abuse check and handlers.
        request_type: Label used in log records and passed to the non-stream
            handler and ``handle_api_error``. NOTE(review): the handler choice
            is driven by ``chat_request.stream``, not by this value.
        key_manager: Provides ``api_keys``, ``get_available_key()`` and
            ``reset_tried_keys_for_request()``.
        response_cache_manager: Object exposing a ``cache`` mapping; an entry
            for ``cache_key`` is deleted when ``handle_api_error`` asks for it.
        active_requests_manager: Forwarded to the non-stream handler.
        safety_settings: Forwarded to the handlers.
        safety_settings_g2: Forwarded to the handlers.
        api_call_stats: Forwarded to the handlers.
        FAKE_STREAMING: Forwarded to the stream handler.
        FAKE_STREAMING_INTERVAL: Forwarded to the stream handler.
        MAX_REQUESTS_PER_MINUTE: Forwarded to ``protect_from_abuse``.
        MAX_REQUESTS_PER_DAY_PER_IP: Forwarded to ``protect_from_abuse``.
        cache_key: Optional cache key for this request.
        client_ip: Optional client IP, forwarded to the non-stream handler.

    Returns:
        Whatever the selected handler returns. If every attempt fails for a
        stream request, a ``StreamingResponse`` that emits a single SSE error
        event followed by ``[DONE]``.

    Raises:
        HTTPException: 400 for an unknown model; any ``HTTPException`` raised
            by a handler is re-raised unchanged; 500 when every attempt fails
            for a non-stream request.
    """
    # Kept only for backward compatibility with code that may read the
    # module-level name. All logic below uses the local ``api_key`` instead:
    # this coroutine awaits between selecting a key and handling its errors,
    # so a concurrent request could overwrite the global in the meantime and
    # the wrong key would be logged or penalised.
    global current_api_key

    # Basic pre-flight checks.
    protect_from_abuse(
        http_request, MAX_REQUESTS_PER_MINUTE, MAX_REQUESTS_PER_DAY_PER_IP)
    if chat_request.model not in GeminiClient.AVAILABLE_MODELS:
        error_msg = "无效的模型"
        extra_log = {'request_type': request_type, 'model': chat_request.model, 'status_code': 400, 'error_message': error_msg}
        log('error', error_msg, extra=extra_log)
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST, detail=error_msg)

    # Forget which keys were tried by earlier requests.
    key_manager.reset_tried_keys_for_request()

    # Convert the chat messages into the format the Gemini client expects.
    contents, system_instruction = GeminiClient.convert_messages(
        GeminiClient, chat_request.messages)

    # One attempt per configured API key, but always at least one attempt.
    retry_attempts = len(key_manager.api_keys) if key_manager.api_keys else 1

    # Number of tries with the same key before moving on to the next one.
    server_error_retries = 3

    for attempt in range(1, retry_attempts + 1):
        api_key = key_manager.get_available_key()
        current_api_key = api_key  # mirror only; never read back below

        # No usable key left: stop trying and fall through to the failure path.
        if api_key is None:
            log('warning', "没有可用的 API 密钥,跳过本次尝试",
                extra={'request_type': request_type, 'model': chat_request.model, 'status_code': 'N/A'})
            break

        key_prefix = api_key[:8]  # only a short prefix is ever logged
        log('info', f"第 {attempt}/{retry_attempts} 次尝试 ... 使用密钥: {key_prefix}...",
            extra={'key': key_prefix, 'request_type': request_type, 'model': chat_request.model})

        for server_retry in range(1, server_error_retries + 1):
            try:
                if chat_request.stream:
                    try:
                        return await process_stream_request(
                            chat_request,
                            http_request,
                            contents,
                            system_instruction,
                            api_key,
                            key_manager,
                            safety_settings,
                            safety_settings_g2,
                            api_call_stats,
                            FAKE_STREAMING,
                            FAKE_STREAMING_INTERVAL
                        )
                    except Exception as stream_exc:
                        # Record the stream failure here, then re-raise so the
                        # shared handlers below decide whether to retry.
                        # NOTE(review): this also runs for HTTPException
                        # (including 408) before it is re-raised -- confirm
                        # that handle_gemini_error is meant to see those.
                        error_detail = handle_gemini_error(stream_exc, api_key, key_manager)
                        log('error', f"流式请求失败: {error_detail}",
                            extra={'key': key_prefix, 'request_type': 'stream', 'model': chat_request.model})
                        raise
                else:
                    return await process_nonstream_request(
                        chat_request,
                        http_request,
                        request_type,
                        contents,
                        system_instruction,
                        api_key,
                        response_cache_manager,
                        active_requests_manager,
                        safety_settings,
                        safety_settings_g2,
                        api_call_stats,
                        cache_key,
                        client_ip
                    )
            except HTTPException as http_exc:
                # HTTP errors are never retried; a 408 additionally gets a
                # "client disconnected" log entry before propagating.
                if http_exc.status_code == status.HTTP_408_REQUEST_TIMEOUT:
                    log('error', "客户端连接中断",
                        extra={'key': key_prefix, 'request_type': request_type,
                              'model': chat_request.model, 'status_code': 408})
                raise
            except Exception as exc:
                # Shared error handling; the returned dict carries the
                # 'remove_cache', 'should_retry' and 'should_switch_key' flags.
                error_result = await handle_api_error(
                    exc,
                    api_key,
                    key_manager,
                    request_type,
                    chat_request.model,
                    server_retry - 1
                )

                # Drop the cached response for this request when asked to.
                if error_result.get('remove_cache', False) and cache_key and cache_key in response_cache_manager.cache:
                    log('info', f"因API错误,删除缓存: {cache_key[:8]}...",
                        extra={'cache_operation': 'remove-on-error', 'request_type': request_type})
                    del response_cache_manager.cache[cache_key]

                if error_result.get('should_retry', False):
                    # Retry with the same key (per the original comment, any
                    # wait already happened inside handle_api_error).
                    continue

                if error_result.get('should_switch_key', False) and attempt < retry_attempts:
                    log('info', f"API密钥 {key_prefix}... 失败,准备尝试下一个密钥",
                        extra={'key': key_prefix, 'request_type': request_type})

                # Either switching keys or giving up on this key: in both
                # cases leave the per-key retry loop.
                break

    # Every attempt failed (or no key was available).
    msg = "所有API密钥均请求失败,请稍后重试"
    log('error', "API key 替换失败,所有API key都已尝试,请重新配置或稍后重试", extra={'key': 'N/A', 'request_type': 'switch_key', 'status_code': 'N/A'})

    # Stream requests get the error as an SSE event instead of an HTTP error.
    if chat_request.stream:
        async def error_generator():
            error_json = json.dumps({'error': {'message': msg, 'type': 'api_error'}})
            yield f"data: {error_json}\n\n"
            yield "data: [DONE]\n\n"

        return StreamingResponse(error_generator(), media_type="text/event-stream")

    # Non-stream requests use a standard HTTP exception.
    raise HTTPException(
        status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=msg)