File size: 6,450 Bytes
8fea9ab
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
import asyncio
from fastapi import HTTPException, status, Request
from app.models import ChatCompletionRequest
from app.services import GeminiClient
from app.utils import cache_response, update_api_call_stats
from .logging_utils import log
from .client_disconnect import check_client_disconnect, handle_client_disconnect
from .gemini_handlers import run_gemini_completion

def _delete_cache_entry(response_cache_manager, cache_key, request_type, operation, message):
    """Safely remove a single-use cache entry (if present) and log the removal."""
    if cache_key and cache_key in response_cache_manager.cache:
        del response_cache_manager.cache[cache_key]
        log('info', f"{message}: {cache_key[:8]}...",
            extra={'cache_operation': operation, 'request_type': request_type})


# Non-streaming request handler
async def process_nonstream_request(
    chat_request: ChatCompletionRequest,
    http_request: Request,
    request_type: str,
    contents,
    system_instruction,
    current_api_key: str,
    response_cache_manager,
    active_requests_manager,
    safety_settings,
    safety_settings_g2,
    api_call_stats,
    cache_key: str = None,
    client_ip: str = None
):
    """处理非流式API请求

    Runs the Gemini completion and a client-disconnect watcher concurrently.
    If the client disconnects first, the API call is still completed so its
    result can be cached (delegated to handle_client_disconnect). Cached
    responses are strictly single-use: every cache hit (and every freshly
    stored entry) is deleted immediately after being read/written.

    Args:
        chat_request: Parsed chat-completion request body.
        http_request: FastAPI request, used to detect client disconnects.
        request_type: Label used in logs and stats (e.g. stream vs non-stream).
        contents: Prepared conversation contents for the Gemini API.
        system_instruction: Optional system prompt for the Gemini API.
        current_api_key: Gemini API key used for this attempt.
        response_cache_manager: Cache with .get(key) -> (response, hit) and a
            .cache mapping (entries are deleted directly on that mapping).
        active_requests_manager: Passed through for interface compatibility;
            not used in this function.
        safety_settings / safety_settings_g2: Safety configs forwarded to the API.
        api_call_stats: Call statistics (updated via cache_response).
        cache_key: Optional single-use cache key; None disables cache handling.
        client_ip: Optional client IP recorded alongside the cached response.

    Returns:
        The response object built by app.utils.response.create_response, or a
        previously cached response.

    Raises:
        HTTPException: Propagated from the underlying handlers; 408 (client
            disconnect) is logged before re-raising so no retry follows.
    """
    gemini_client = GeminiClient(current_api_key)

    # Start the API call and the disconnect watcher concurrently.
    gemini_task = asyncio.create_task(
        run_gemini_completion(
            gemini_client,
            chat_request,
            contents,
            system_instruction,
            request_type,
            current_api_key,
            safety_settings,
            safety_settings_g2
        )
    )

    disconnect_task = asyncio.create_task(
        check_client_disconnect(
            http_request,
            current_api_key,
            request_type,
            chat_request.model
        )
    )

    try:
        # Wait for whichever finishes first: the API call or a client disconnect.
        done, pending = await asyncio.wait(
            [gemini_task, disconnect_task],
            return_when=asyncio.FIRST_COMPLETED
        )

        if disconnect_task in done:
            # Client disconnected; keep the API request running so the result
            # can still be cached for a follow-up request.
            return await handle_client_disconnect(
                gemini_task,
                chat_request,
                request_type,
                current_api_key,
                response_cache_manager,
                cache_key,
                client_ip
            )

        # API task finished first: stop watching for disconnects.
        disconnect_task.cancel()

        response_content = await gemini_task

        # If a cached response already exists, return it instead of creating
        # a new one. Guarded on cache_key: it may be None (caching disabled).
        if cache_key:
            cached_response, cache_hit = response_cache_manager.get(cache_key)
            if cache_hit:
                log('info', f"缓存已存在,直接返回: {cache_key[:8]}...",
                    extra={'cache_operation': 'use-existing', 'request_type': request_type})
                _delete_cache_entry(response_cache_manager, cache_key, request_type,
                                    'used-and-removed', "缓存使用后已删除")
                return cached_response

        # Build the response object.
        from app.utils.response import create_response
        response = create_response(chat_request, response_content)

        # Cache it (also updates API call stats) ...
        cache_response(response, cache_key, client_ip, response_cache_manager,
                       update_api_call_stats, api_key=current_api_key)

        # ... then delete the entry immediately so the cache stays single-use.
        _delete_cache_entry(response_cache_manager, cache_key, request_type,
                            'store-and-remove', "缓存创建后立即删除")

        return response

    except asyncio.CancelledError:
        # Stop the disconnect watcher so it is not left as a pending task.
        disconnect_task.cancel()

        extra_log = {'key': current_api_key[:8], 'request_type': request_type,
                     'model': chat_request.model, 'error_message': "请求被取消"}
        log('info', "请求取消", extra=extra_log)

        # On cancellation, first check whether a cached result already exists.
        if cache_key:
            cached_response, cache_hit = response_cache_manager.get(cache_key)
            if cache_hit:
                log('info', f"请求取消但找到有效缓存,使用缓存响应: {cache_key[:8]}...",
                    extra={'cache_operation': 'use-cache-on-cancel', 'request_type': request_type})
                _delete_cache_entry(response_cache_manager, cache_key, request_type,
                                    'used-and-removed', "缓存使用后已删除")
                return cached_response

        if not gemini_task.done():
            log('info', "请求取消但API请求尚未完成,继续等待...",
                extra={'key': current_api_key[:8], 'request_type': request_type})

        # shield() protects a still-running task from this cancellation and is
        # a no-op on an already-finished one, so a single await covers both cases.
        response_content = await asyncio.shield(gemini_task)

        # Build and return the response without caching it.
        from app.utils.response import create_response
        return create_response(chat_request, response_content)

    except HTTPException as e:
        if e.status_code == status.HTTP_408_REQUEST_TIMEOUT:
            extra_log = {'key': current_api_key[:8], 'request_type': request_type,
                         'model': chat_request.model,
                         'status_code': 408, 'error_message': '客户端连接中断'}
            log('error', "客户端连接中断,终止后续重试", extra=extra_log)
        raise