| | from datetime import datetime, timezone |
| | from typing import List, Optional |
| |
|
| | from config import get_api_password, get_panel_password |
| | from fastapi import Depends, HTTPException, Header, Query, Request, status |
| | from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer |
| | from log import log |
| |
|
| | |
| | security = HTTPBearer() |
| |
|
| | |
| |
|
| | GEMINICLI_USER_AGENT = "GeminiCLI/0.1.5 (Windows; AMD64)" |
| |
|
| | ANTIGRAVITY_USER_AGENT = "antigravity/1.11.3 windows/amd64" |
| |
|
| | |
| | CLIENT_ID = "681255809395-oo8ft2oprdrnp9e3aqf6av3hmdib135j.apps.googleusercontent.com" |
| | CLIENT_SECRET = "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" |
| | SCOPES = [ |
| | "https://www.googleapis.com/auth/cloud-platform", |
| | "https://www.googleapis.com/auth/userinfo.email", |
| | "https://www.googleapis.com/auth/userinfo.profile", |
| | ] |
| |
|
| | |
| | ANTIGRAVITY_CLIENT_ID = "1071006060591-tmhssin2h21lcre235vtolojh4g403ep.apps.googleusercontent.com" |
| | ANTIGRAVITY_CLIENT_SECRET = "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" |
| | ANTIGRAVITY_SCOPES = [ |
| | 'https://www.googleapis.com/auth/cloud-platform', |
| | 'https://www.googleapis.com/auth/userinfo.email', |
| | 'https://www.googleapis.com/auth/userinfo.profile', |
| | 'https://www.googleapis.com/auth/cclog', |
| | 'https://www.googleapis.com/auth/experimentsandconfigs' |
| | ] |
| |
|
| | |
| | TOKEN_URL = "https://oauth2.googleapis.com/token" |
| |
|
| | |
| | CALLBACK_HOST = "localhost" |
| |
|
| | |
| |
|
| | |
| | DEFAULT_SAFETY_SETTINGS = [ |
| | {"category": "HARM_CATEGORY_HARASSMENT", "threshold": "BLOCK_NONE"}, |
| | {"category": "HARM_CATEGORY_HATE_SPEECH", "threshold": "BLOCK_NONE"}, |
| | {"category": "HARM_CATEGORY_SEXUALLY_EXPLICIT", "threshold": "BLOCK_NONE"}, |
| | {"category": "HARM_CATEGORY_DANGEROUS_CONTENT", "threshold": "BLOCK_NONE"}, |
| | {"category": "HARM_CATEGORY_CIVIC_INTEGRITY", "threshold": "BLOCK_NONE"}, |
| | {"category": "HARM_CATEGORY_IMAGE_HATE", "threshold": "BLOCK_NONE"}, |
| | {"category": "HARM_CATEGORY_IMAGE_DANGEROUS_CONTENT", "threshold": "BLOCK_NONE"}, |
| | {"category": "HARM_CATEGORY_IMAGE_HARASSMENT", "threshold": "BLOCK_NONE"}, |
| | {"category": "HARM_CATEGORY_IMAGE_SEXUALLY_EXPLICIT", "threshold": "BLOCK_NONE"}, |
| | {"category": "HARM_CATEGORY_JAILBREAK", "threshold": "BLOCK_NONE"}, |
| | ] |
| |
|
| | |
| | BASE_MODELS = [ |
| | "gemini-2.5-pro", |
| | "gemini-2.5-flash", |
| | "gemini-3-pro-preview", |
| | "gemini-3-flash-preview" |
| | ] |
| |
|
| |
|
| | |
| |
|
| | def is_fake_streaming_model(model_name: str) -> bool: |
| | """Check if model name indicates fake streaming should be used.""" |
| | return model_name.startswith("假流式/") |
| |
|
| |
|
| | def is_anti_truncation_model(model_name: str) -> bool: |
| | """Check if model name indicates anti-truncation should be used.""" |
| | return model_name.startswith("流式抗截断/") |
| |
|
| |
|
| | def get_base_model_from_feature_model(model_name: str) -> str: |
| | """Get base model name from feature model name.""" |
| | |
| | for prefix in ["假流式/", "流式抗截断/"]: |
| | if model_name.startswith(prefix): |
| | return model_name[len(prefix) :] |
| | return model_name |
| |
|
| |
|
| | def get_available_models(router_type: str = "openai") -> List[str]: |
| | """ |
| | Get available models with feature prefixes. |
| | |
| | Args: |
| | router_type: "openai" or "gemini" |
| | |
| | Returns: |
| | List of model names with feature prefixes |
| | """ |
| | models = [] |
| |
|
| | for base_model in BASE_MODELS: |
| | |
| | models.append(base_model) |
| |
|
| | |
| | models.append(f"假流式/{base_model}") |
| |
|
| | |
| | models.append(f"流式抗截断/{base_model}") |
| |
|
| | |
| | |
| | thinking_suffixes = ["-maxthinking", "-nothinking"] |
| | search_suffix = "-search" |
| |
|
| | |
| | for thinking_suffix in thinking_suffixes: |
| | models.append(f"{base_model}{thinking_suffix}") |
| | models.append(f"假流式/{base_model}{thinking_suffix}") |
| | models.append(f"流式抗截断/{base_model}{thinking_suffix}") |
| |
|
| | |
| | models.append(f"{base_model}{search_suffix}") |
| | models.append(f"假流式/{base_model}{search_suffix}") |
| | models.append(f"流式抗截断/{base_model}{search_suffix}") |
| |
|
| | |
| | for thinking_suffix in thinking_suffixes: |
| | combined_suffix = f"{thinking_suffix}{search_suffix}" |
| | models.append(f"{base_model}{combined_suffix}") |
| | models.append(f"假流式/{base_model}{combined_suffix}") |
| | models.append(f"流式抗截断/{base_model}{combined_suffix}") |
| |
|
| | return models |
| |
|
| |
|
| | |
| |
|
| | async def authenticate_flexible( |
| | request: Request, |
| | authorization: Optional[str] = Header(None), |
| | x_api_key: Optional[str] = Header(None, alias="x-api-key"), |
| | access_token: Optional[str] = Header(None, alias="access_token"), |
| | x_goog_api_key: Optional[str] = Header(None, alias="x-goog-api-key"), |
| | key: Optional[str] = Query(None) |
| | ) -> str: |
| | """ |
| | 统一的灵活认证函数,支持多种认证方式 |
| | |
| | 此函数可以直接用作 FastAPI 的 Depends 依赖 |
| | |
| | 支持的认证方式: |
| | - URL 参数: key |
| | - HTTP 头部: Authorization (Bearer token) |
| | - HTTP 头部: x-api-key |
| | - HTTP 头部: access_token |
| | - HTTP 头部: x-goog-api-key |
| | |
| | Args: |
| | request: FastAPI Request 对象 |
| | authorization: Authorization 头部值(自动注入) |
| | x_api_key: x-api-key 头部值(自动注入) |
| | access_token: access_token 头部值(自动注入) |
| | x_goog_api_key: x-goog-api-key 头部值(自动注入) |
| | key: URL 参数 key(自动注入) |
| | |
| | Returns: |
| | 验证通过的token |
| | |
| | Raises: |
| | HTTPException: 认证失败时抛出异常 |
| | |
| | 使用示例: |
| | @router.post("/endpoint") |
| | async def endpoint(token: str = Depends(authenticate_flexible)): |
| | # token 已验证通过 |
| | pass |
| | """ |
| | password = await get_api_password() |
| | token = None |
| | auth_method = None |
| | |
| | |
| | if key: |
| | token = key |
| | auth_method = "URL parameter 'key'" |
| | |
| | |
| | elif x_goog_api_key: |
| | token = x_goog_api_key |
| | auth_method = "x-goog-api-key header" |
| | |
| | |
| | elif x_api_key: |
| | token = x_api_key |
| | auth_method = "x-api-key header" |
| | |
| | |
| | elif access_token: |
| | token = access_token |
| | auth_method = "access_token header" |
| | |
| | |
| | elif authorization: |
| | if not authorization.startswith("Bearer "): |
| | raise HTTPException( |
| | status_code=status.HTTP_401_UNAUTHORIZED, |
| | detail="Invalid authentication scheme. Use 'Bearer <token>'", |
| | headers={"WWW-Authenticate": "Bearer"}, |
| | ) |
| | token = authorization[7:] |
| | auth_method = "Authorization Bearer header" |
| | |
| | |
| | if not token: |
| | raise HTTPException( |
| | status_code=status.HTTP_401_UNAUTHORIZED, |
| | detail="Missing authentication credentials. Use 'key' URL parameter, 'x-goog-api-key', 'x-api-key', 'access_token' header, or 'Authorization: Bearer <token>'", |
| | headers={"WWW-Authenticate": "Bearer"}, |
| | ) |
| | |
| | |
| | if token != password: |
| | log.error(f"Authentication failed using {auth_method}") |
| | raise HTTPException( |
| | status_code=status.HTTP_403_FORBIDDEN, |
| | detail="密码错误" |
| | ) |
| | |
| | log.debug(f"Authentication successful using {auth_method}") |
| | return token |
| |
|
| |
|
| | |
| | authenticate_bearer = authenticate_flexible |
| | authenticate_gemini_flexible = authenticate_flexible |
| |
|
| |
|
| | |
| |
|
| | async def verify_panel_token(credentials: HTTPAuthorizationCredentials = Depends(security)) -> str: |
| | """ |
| | 简化的控制面板密码验证函数 |
| | |
| | 直接验证Bearer token是否等于控制面板密码 |
| | |
| | Args: |
| | credentials: HTTPAuthorizationCredentials 自动注入 |
| | |
| | Returns: |
| | 验证通过的token |
| | |
| | Raises: |
| | HTTPException: 密码错误时抛出401异常 |
| | """ |
| |
|
| | password = await get_panel_password() |
| | if credentials.credentials != password: |
| | raise HTTPException(status_code=401, detail="密码错误") |
| | return credentials.credentials |
| |
|