| |
| import asyncio |
| import aiohttp |
| import os |
| from fastapi import FastAPI, Header, Depends, HTTPException |
| from pydantic import BaseModel, Field |
| from typing import List, Optional, Dict, Any |
| from datetime import datetime |
| import logging |
|
|
| |
| logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') |
| logger = logging.getLogger(__name__) |
|
|
| |
| CHECK_KEY_TIMEOUT = 10 |
|
|
| |
| API_AUTH_KEY = os.getenv("AUTH_KEY", "default-secret-key-change-me") |
|
|
| async def api_key_auth(x_api_key: str = Header(...)): |
| if x_api_key != API_AUTH_KEY: |
| raise HTTPException(status_code=401, detail="Invalid or Missing API Key") |
|
|
| |
| class APIKeyPayload(BaseModel): |
| api_keys: List[str] |
|
|
| class KeyStatusResult(BaseModel): |
| key: str |
| status: str |
| credit_remaining: int |
| credit_limit: int |
| reset_date: Optional[str] = None |
| message: str |
|
|
| |
| app = FastAPI( |
| title="ElevenLabs Credit Checker (Thợ)", |
| description="Nhận việc từ Load Balancer và thực hiện kiểm tra key tốc độ cao.", |
| version="2.0.0", |
| ) |
|
|
| |
| async def check_single_key_status(session: aiohttp.ClientSession, api_key: str) -> Dict[str, Any]: |
| url = "https://api.elevenlabs.io/v1/user/subscription" |
| headers = {"xi-api-key": api_key} |
| result = {"key": api_key, "status": "Error", "credit_remaining": 0, "credit_limit": 0, "reset_date": None, "message": ""} |
|
|
| if not api_key: |
| result["message"], result["status"] = "Key rỗng.", "Invalid" |
| return result |
| |
| try: |
| timeout = aiohttp.ClientTimeout(total=CHECK_KEY_TIMEOUT) |
| async with session.get(url, headers=headers, timeout=timeout) as response: |
| if response.status == 200: |
| data = await response.json() |
| char_count = data.get("character_count", 0) |
| char_limit = data.get("character_limit", 0) |
| reset_unix = data.get("next_character_count_reset_unix") |
| |
| result["status"] = "OK" |
| result["credit_remaining"] = max(0, char_limit - char_count) |
| result["credit_limit"] = char_limit |
| result["reset_date"] = datetime.fromtimestamp(reset_unix).strftime('%Y-%m-%d %H:%M:%S') if reset_unix else "N/A" |
| result["message"] = f"OK ({result['credit_remaining']:,}/{result['credit_limit']:,})" |
| elif response.status == 401: |
| result["message"], result["status"] = "Invalid/Unauthorized", "Invalid" |
| else: |
| body = await response.text() |
| result["message"], result["status"] = f"API Error {response.status}: {body[:100]}", "API Error" |
| return result |
| except (aiohttp.ClientError, asyncio.TimeoutError) as e: |
| result["message"], result["status"] = f"Network/Timeout Error", "Network Error" |
| return result |
| except Exception as e: |
| logger.exception(f"Lỗi không xác định khi kiểm tra key ...{api_key[-4:]}") |
| result["message"], result["status"] = f"Unknown Error: {str(e)}", "Unknown Error" |
| return result |
|
|
| |
|
|
| |
| async def check_all_keys_logic(api_keys: List[str]) -> List[Dict[str, Any]]: |
| unique_keys = sorted(list(set(k.strip() for k in api_keys if k.strip()))) |
| if not unique_keys: |
| return [] |
|
|
| logger.info(f"Bắt đầu kiểm tra {len(unique_keys)} key...") |
| |
| final_results: Dict[str, Any] = {} |
| keys_to_retry: List[str] = [] |
|
|
| |
| logger.info(">>> Vòng 1: Kiểm tra toàn bộ lô key...") |
| async with aiohttp.ClientSession() as session: |
| tasks_round_1 = [check_single_key_status(session, key) for key in unique_keys] |
| results_round_1 = await asyncio.gather(*tasks_round_1, return_exceptions=True) |
|
|
| for i, res in enumerate(results_round_1): |
| key_in_check = unique_keys[i] |
| |
| |
| if isinstance(res, Exception): |
| logger.error(f"Lỗi task nghiêm trọng cho key ...{key_in_check[-5:]}: {res}") |
| |
| final_results[key_in_check] = {"key": key_in_check, "status": "Task Error", "credit_remaining": 0, "credit_limit": 0, "reset_date": None, "message": str(res)} |
| keys_to_retry.append(key_in_check) |
| continue |
|
|
| |
| final_results[key_in_check] = res |
| |
| |
| |
| |
| if res.get("status") in ["Network Error", "API Error", "Unknown Error", "Task Error"]: |
| |
| if "401" in res.get("message", "") or "Invalid" in res.get("status", ""): |
| logger.info(f"Key ...{key_in_check[-5:]} bị lỗi 401/Invalid, sẽ KHÔNG thử lại.") |
| else: |
| logger.info(f"Key ...{key_in_check[-5:]} bị lỗi tạm thời ({res.get('status')}), sẽ được thử lại.") |
| keys_to_retry.append(key_in_check) |
|
|
| |
| if keys_to_retry: |
| logger.info(f">>> Vòng 2: Thử lại {len(keys_to_retry)} key bị lỗi...") |
| await asyncio.sleep(1) |
|
|
| async with aiohttp.ClientSession() as session: |
| tasks_round_2 = [check_single_key_status(session, key) for key in keys_to_retry] |
| results_round_2 = await asyncio.gather(*tasks_round_2, return_exceptions=True) |
| |
| for i, res_retry in enumerate(results_round_2): |
| key_in_retry = keys_to_retry[i] |
|
|
| if isinstance(res_retry, Exception): |
| logger.error(f"Lỗi task nghiêm trọng (Vòng 2) cho key ...{key_in_retry[-5:]}: {res_retry}") |
| |
| continue |
| |
| |
| |
| final_results[key_in_retry] = res_retry |
| logger.info(f"Đã cập nhật kết quả Vòng 2 cho key ...{key_in_retry[-5:]}. Status mới: {res_retry.get('status')}") |
|
|
| logger.info("Kiểm tra key hoàn tất.") |
| |
| return [final_results[key] for key in unique_keys] |
| |
| |
| @app.post("/check-credits", response_model=List[KeyStatusResult], dependencies=[Depends(api_key_auth)]) |
| async def check_credits_endpoint(payload: APIKeyPayload): |
| """Nhận danh sách API key và trả về trạng thái của từng key.""" |
| return await check_all_keys_logic(payload.api_keys) |
|
|
| @app.get("/", include_in_schema=False) |
| def root(): |
| return {"message": "Chào mừng đến với Worker Thợ!"} |
|
|
| |