|
|
from datetime import datetime, timezone |
|
|
from typing import List, Optional |
|
|
|
|
|
from config import get_api_password, get_panel_password |
|
|
from fastapi import Depends, HTTPException, Header, Query, Request, status |
|
|
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer |
|
|
from log import log |
|
|
|
|
|
|
|
|
# Module-level HTTP Bearer scheme instance; verify_panel_token receives its
# credentials through Depends(security).
security = HTTPBearer()
|
|
|
|
|
|
|
|
|
|
|
# Client identification strings, one per upstream client flavour.
# NOTE(review): presumably sent as the User-Agent header on outgoing
# requests -- confirm against the callers.
GEMINICLI_USER_AGENT = "GeminiCLI/0.1.5 (Windows; AMD64)"
ANTIGRAVITY_USER_AGENT = "antigravity/1.11.3 windows/amd64"
|
|
|
|
|
|
|
|
# OAuth client credentials and the scopes requested with them.
# NOTE(review): the client secret is embedded in source; presumably these are
# installed-application credentials -- confirm that committing them is intended.
CLIENT_ID = "681255809395-oo8ft2oprdrnp9e3aqf6av3hmdib135j.apps.googleusercontent.com"
CLIENT_SECRET = "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"
SCOPES = [
    "https://www.googleapis.com/auth/cloud-platform",
    "https://www.googleapis.com/auth/userinfo.email",
    "https://www.googleapis.com/auth/userinfo.profile",
]
|
|
|
|
|
|
|
|
# OAuth client credentials and scopes for the Antigravity client. The scope
# list is the three scopes of SCOPES plus "cclog" and "experimentsandconfigs".
# NOTE(review): the client secret is embedded in source -- confirm that
# committing it is intended.
ANTIGRAVITY_CLIENT_ID = "1071006060591-tmhssin2h21lcre235vtolojh4g403ep.apps.googleusercontent.com"
ANTIGRAVITY_CLIENT_SECRET = "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"
ANTIGRAVITY_SCOPES = [
    "https://www.googleapis.com/auth/cloud-platform",
    "https://www.googleapis.com/auth/userinfo.email",
    "https://www.googleapis.com/auth/userinfo.profile",
    "https://www.googleapis.com/auth/cclog",
    "https://www.googleapis.com/auth/experimentsandconfigs",
]
|
|
|
|
|
|
|
|
# Google OAuth 2.0 token endpoint URL.
TOKEN_URL = "https://oauth2.googleapis.com/token"

# NOTE(review): presumably the host used for the local OAuth redirect
# callback -- confirm against the callers.
CALLBACK_HOST = "localhost"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Default safety settings: one {"category", "threshold"} dict per harm
# category, each with the threshold "BLOCK_NONE". The order of the list
# follows the order of the category names below.
DEFAULT_SAFETY_SETTINGS = [
    {"category": harm_category, "threshold": "BLOCK_NONE"}
    for harm_category in (
        "HARM_CATEGORY_HARASSMENT",
        "HARM_CATEGORY_HATE_SPEECH",
        "HARM_CATEGORY_SEXUALLY_EXPLICIT",
        "HARM_CATEGORY_DANGEROUS_CONTENT",
        "HARM_CATEGORY_CIVIC_INTEGRITY",
        "HARM_CATEGORY_IMAGE_HATE",
        "HARM_CATEGORY_IMAGE_DANGEROUS_CONTENT",
        "HARM_CATEGORY_IMAGE_HARASSMENT",
        "HARM_CATEGORY_IMAGE_SEXUALLY_EXPLICIT",
        "HARM_CATEGORY_JAILBREAK",
    )
]
|
|
|
|
|
|
|
|
# Base model names. get_available_models() expands each entry into its
# prefixed and suffixed variants, in this order.
BASE_MODELS = [
    "gemini-2.5-pro",
    "gemini-2.5-flash",
    "gemini-3-pro-preview",
    "gemini-3-flash-preview",
]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def is_fake_streaming_model(model_name: str) -> bool:
    """Return True when *model_name* begins with the fake-streaming prefix ``假流式/``."""
    fake_streaming_prefix = "假流式/"
    return model_name.startswith(fake_streaming_prefix)
|
|
|
|
|
|
|
|
def is_anti_truncation_model(model_name: str) -> bool:
    """Return True when *model_name* begins with the anti-truncation prefix ``流式抗截断/``."""
    anti_truncation_prefix = "流式抗截断/"
    return model_name.startswith(anti_truncation_prefix)
|
|
|
|
|
|
|
|
def get_base_model_from_feature_model(model_name: str) -> str:
    """
    Strip a leading feature prefix from a model name.

    The prefixes ``假流式/`` and ``流式抗截断/`` are tried in that order and only
    the first one that matches is removed; anything after it is returned
    untouched, even if it starts with another prefix.

    Args:
        model_name: Model name, with or without a feature prefix.

    Returns:
        The name without its leading feature prefix, or ``model_name``
        unchanged when it starts with neither prefix.
    """
    feature_prefixes = ("假流式/", "流式抗截断/")
    matched = next(
        (candidate for candidate in feature_prefixes if model_name.startswith(candidate)),
        None,
    )
    if matched is None:
        return model_name
    return model_name[len(matched):]
|
|
|
|
|
|
|
|
def get_available_models(router_type: str = "openai") -> List[str]:
    """
    Build the list of selectable model names, including every feature variant.

    For each entry of BASE_MODELS the result contains, in this order, the
    suffix variants "" (none), "-maxthinking", "-nothinking", "-search",
    "-maxthinking-search" and "-nothinking-search"; each suffix variant is
    listed three times in a row: without a prefix, with the ``假流式/`` prefix
    and with the ``流式抗截断/`` prefix.

    Args:
        router_type: "openai" or "gemini". The value is not consulted here;
            the same list is produced for any argument.

    Returns:
        List of model names with feature prefixes and suffixes applied.
    """
    feature_prefixes = ("", "假流式/", "流式抗截断/")
    thinking_modes = ("-maxthinking", "-nothinking")
    search = "-search"

    # Suffix order mirrors the listing order described in the docstring.
    suffix_variants = (
        "",
        *thinking_modes,
        search,
        *(f"{mode}{search}" for mode in thinking_modes),
    )

    return [
        f"{prefix}{base}{suffix}"
        for base in BASE_MODELS
        for suffix in suffix_variants
        for prefix in feature_prefixes
    ]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def authenticate_flexible(
    request: Request,
    authorization: Optional[str] = Header(None),
    x_api_key: Optional[str] = Header(None, alias="x-api-key"),
    access_token: Optional[str] = Header(None, alias="access_token"),
    x_goog_api_key: Optional[str] = Header(None, alias="x-goog-api-key"),
    key: Optional[str] = Query(None),
) -> str:
    """
    Unified, flexible authentication dependency supporting several credential sources.

    Can be used directly as a FastAPI ``Depends`` dependency.

    The first non-empty source wins, checked in this order:
        - URL parameter: key
        - HTTP header: x-goog-api-key
        - HTTP header: x-api-key
        - HTTP header: access_token
        - HTTP header: Authorization (Bearer token; the scheme name is
          matched case-insensitively)

    The selected token is compared with the value returned by
    ``get_api_password()`` using a constant-time comparison.

    Args:
        request: FastAPI Request object (not read by this function).
        authorization: Authorization header value (injected automatically).
        x_api_key: x-api-key header value (injected automatically).
        access_token: access_token header value (injected automatically).
        x_goog_api_key: x-goog-api-key header value (injected automatically).
        key: URL parameter ``key`` (injected automatically).

    Returns:
        The token that passed verification.

    Raises:
        HTTPException: 401 when no credential is supplied or the Authorization
            header does not use the Bearer scheme; 403 when the token does not
            match the API password.

    Example:
        @router.post("/endpoint")
        async def endpoint(token: str = Depends(authenticate_flexible)):
            # token has already been verified
            pass
    """
    # Function-scope import so this block does not depend on the file's import header.
    import hmac

    password = await get_api_password()
    token = None
    auth_method = None

    if key:
        token = key
        auth_method = "URL parameter 'key'"
    elif x_goog_api_key:
        token = x_goog_api_key
        auth_method = "x-goog-api-key header"
    elif x_api_key:
        token = x_api_key
        auth_method = "x-api-key header"
    elif access_token:
        token = access_token
        auth_method = "access_token header"
    elif authorization:
        # HTTP auth-scheme names are case-insensitive, so "bearer x" and
        # "BEARER x" are accepted as well as "Bearer x".
        if authorization[:7].lower() != "bearer ":
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Invalid authentication scheme. Use 'Bearer <token>'",
                headers={"WWW-Authenticate": "Bearer"},
            )
        token = authorization[7:]
        auth_method = "Authorization Bearer header"

    if not token:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Missing authentication credentials. Use 'key' URL parameter, 'x-goog-api-key', 'x-api-key', 'access_token' header, or 'Authorization: Bearer <token>'",
            headers={"WWW-Authenticate": "Bearer"},
        )

    # Constant-time comparison so response timing does not reveal how much of
    # the password a guess got right. Both sides are encoded to bytes because
    # compare_digest rejects non-ASCII str arguments. A non-string password
    # (e.g. None) can never match, exactly as with the previous `!=` check.
    password_matches = isinstance(password, str) and hmac.compare_digest(
        token.encode("utf-8"), password.encode("utf-8")
    )
    if not password_matches:
        log.error(f"Authentication failed using {auth_method}")
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="密码错误",
        )

    log.debug(f"Authentication successful using {auth_method}")
    return token
|
|
|
|
|
|
|
|
|
|
|
# Alternate names bound to the very same function object as
# authenticate_flexible.
# NOTE(review): presumably kept so existing call sites keep working -- confirm.
authenticate_bearer = authenticate_flexible
authenticate_gemini_flexible = authenticate_flexible
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def verify_panel_token(credentials: HTTPAuthorizationCredentials = Depends(security)) -> str:
    """
    Simplified control-panel password check.

    Verifies that the Bearer token equals the value returned by
    ``get_panel_password()``, using a constant-time comparison.

    Args:
        credentials: HTTPAuthorizationCredentials, injected automatically
            through ``Depends(security)``.

    Returns:
        The token that passed verification.

    Raises:
        HTTPException: 401 when the token does not match the panel password.
    """
    # Function-scope import so this block does not depend on the file's import header.
    import hmac

    password = await get_panel_password()
    supplied = credentials.credentials

    # Constant-time comparison so response timing does not reveal how much of
    # the password a guess got right. Both sides are encoded to bytes because
    # compare_digest rejects non-ASCII str arguments. A non-string password
    # (e.g. None) can never match, exactly as with the previous `!=` check.
    password_matches = isinstance(password, str) and hmac.compare_digest(
        supplied.encode("utf-8"), password.encode("utf-8")
    )
    if not password_matches:
        raise HTTPException(
            status_code=401,
            detail="密码错误",
            # A 401 response should carry a challenge; this matches the 401s
            # raised by authenticate_flexible.
            headers={"WWW-Authenticate": "Bearer"},
        )
    return supplied
|
|
|