toanatp's picture
Initial commit for Elevenlabs Worker 14
ffd3cfe
#<--BẮT ĐẦU TOÀN BỘ CODE CHO FILE app.py (WORKER THỢ) - PHIÊN BẢN CUỐI CÙNG-->
import asyncio
import aiohttp
import os
from fastapi import FastAPI, Header, Depends, HTTPException
from pydantic import BaseModel, Field
from typing import List, Optional, Dict, Any
from datetime import datetime
import logging
# --- Cấu hình Logging ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# --- Hằng số ---
CHECK_KEY_TIMEOUT = 10
# --- Xác thực ---
API_AUTH_KEY = os.getenv("AUTH_KEY", "default-secret-key-change-me")
async def api_key_auth(x_api_key: str = Header(...)):
if x_api_key != API_AUTH_KEY:
raise HTTPException(status_code=401, detail="Invalid or Missing API Key")
# --- Pydantic Models ---
class APIKeyPayload(BaseModel):
api_keys: List[str]
class KeyStatusResult(BaseModel):
key: str
status: str
credit_remaining: int
credit_limit: int
reset_date: Optional[str] = None
message: str
# --- Khởi tạo ứng dụng FastAPI ---
app = FastAPI(
title="ElevenLabs Credit Checker (Thợ)",
description="Nhận việc từ Load Balancer và thực hiện kiểm tra key tốc độ cao.",
version="2.0.0",
)
# --- Logic kiểm tra key ---
async def check_single_key_status(session: aiohttp.ClientSession, api_key: str) -> Dict[str, Any]:
url = "https://api.elevenlabs.io/v1/user/subscription"
headers = {"xi-api-key": api_key}
result = {"key": api_key, "status": "Error", "credit_remaining": 0, "credit_limit": 0, "reset_date": None, "message": ""}
if not api_key:
result["message"], result["status"] = "Key rỗng.", "Invalid"
return result
try:
timeout = aiohttp.ClientTimeout(total=CHECK_KEY_TIMEOUT)
async with session.get(url, headers=headers, timeout=timeout) as response:
if response.status == 200:
data = await response.json()
char_count = data.get("character_count", 0)
char_limit = data.get("character_limit", 0)
reset_unix = data.get("next_character_count_reset_unix")
result["status"] = "OK"
result["credit_remaining"] = max(0, char_limit - char_count)
result["credit_limit"] = char_limit
result["reset_date"] = datetime.fromtimestamp(reset_unix).strftime('%Y-%m-%d %H:%M:%S') if reset_unix else "N/A"
result["message"] = f"OK ({result['credit_remaining']:,}/{result['credit_limit']:,})"
elif response.status == 401:
result["message"], result["status"] = "Invalid/Unauthorized", "Invalid"
else:
body = await response.text()
result["message"], result["status"] = f"API Error {response.status}: {body[:100]}", "API Error"
return result
except (aiohttp.ClientError, asyncio.TimeoutError) as e:
result["message"], result["status"] = f"Network/Timeout Error", "Network Error"
return result
except Exception as e:
logger.exception(f"Lỗi không xác định khi kiểm tra key ...{api_key[-4:]}")
result["message"], result["status"] = f"Unknown Error: {str(e)}", "Unknown Error"
return result
# Trong file app.py của Worker Thợ
#<--BẮT ĐẦU THAY THẾ HÀM check_all_keys_logic -->
async def check_all_keys_logic(api_keys: List[str]) -> List[Dict[str, Any]]:
unique_keys = sorted(list(set(k.strip() for k in api_keys if k.strip())))
if not unique_keys:
return []
logger.info(f"Bắt đầu kiểm tra {len(unique_keys)} key...")
final_results: Dict[str, Any] = {}
keys_to_retry: List[str] = []
# --- VÒNG 1: KIỂM TRA TẤT CẢ CÁC KEY ---
logger.info(">>> Vòng 1: Kiểm tra toàn bộ lô key...")
async with aiohttp.ClientSession() as session:
tasks_round_1 = [check_single_key_status(session, key) for key in unique_keys]
results_round_1 = await asyncio.gather(*tasks_round_1, return_exceptions=True)
for i, res in enumerate(results_round_1):
key_in_check = unique_keys[i]
# Xử lý trường hợp task bị lỗi nghiêm trọng
if isinstance(res, Exception):
logger.error(f"Lỗi task nghiêm trọng cho key ...{key_in_check[-5:]}: {res}")
# Ghi nhận kết quả lỗi và thêm vào danh sách retry
final_results[key_in_check] = {"key": key_in_check, "status": "Task Error", "credit_remaining": 0, "credit_limit": 0, "reset_date": None, "message": str(res)}
keys_to_retry.append(key_in_check)
continue
# Ghi nhận kết quả từ vòng 1
final_results[key_in_check] = res
# Quyết định xem key này có cần retry hay không
# Chỉ retry các lỗi tạm thời như Network, Timeout, hoặc lỗi Server (5xx)
# Không retry các lỗi chắc chắn như Invalid key (401)
if res.get("status") in ["Network Error", "API Error", "Unknown Error", "Task Error"]:
# Kiểm tra chi tiết hơn message của API Error để loại trừ lỗi 401
if "401" in res.get("message", "") or "Invalid" in res.get("status", ""):
logger.info(f"Key ...{key_in_check[-5:]} bị lỗi 401/Invalid, sẽ KHÔNG thử lại.")
else:
logger.info(f"Key ...{key_in_check[-5:]} bị lỗi tạm thời ({res.get('status')}), sẽ được thử lại.")
keys_to_retry.append(key_in_check)
# --- VÒNG 2: CHỈ KIỂM TRA LẠI CÁC KEY BỊ LỖI ---
if keys_to_retry:
logger.info(f">>> Vòng 2: Thử lại {len(keys_to_retry)} key bị lỗi...")
await asyncio.sleep(1) # Thêm một khoảng nghỉ nhỏ trước khi retry
async with aiohttp.ClientSession() as session:
tasks_round_2 = [check_single_key_status(session, key) for key in keys_to_retry]
results_round_2 = await asyncio.gather(*tasks_round_2, return_exceptions=True)
for i, res_retry in enumerate(results_round_2):
key_in_retry = keys_to_retry[i]
if isinstance(res_retry, Exception):
logger.error(f"Lỗi task nghiêm trọng (Vòng 2) cho key ...{key_in_retry[-5:]}: {res_retry}")
# Giữ nguyên kết quả lỗi từ vòng 1 nếu vòng 2 cũng lỗi
continue
# GHI ĐÈ kết quả lỗi từ vòng 1 bằng kết quả mới từ vòng 2
# Bất kể kết quả vòng 2 là gì (OK hoặc vẫn là Error), nó đều mới hơn và chính xác hơn.
final_results[key_in_retry] = res_retry
logger.info(f"Đã cập nhật kết quả Vòng 2 cho key ...{key_in_retry[-5:]}. Status mới: {res_retry.get('status')}")
logger.info("Kiểm tra key hoàn tất.")
# Chuyển dictionary kết quả về lại dạng list theo đúng thứ tự ban đầu
return [final_results[key] for key in unique_keys]
#<--KẾT THÚC THAY THẾ HÀM check_all_keys_logic -->
# --- API Endpoint ---
@app.post("/check-credits", response_model=List[KeyStatusResult], dependencies=[Depends(api_key_auth)])
async def check_credits_endpoint(payload: APIKeyPayload):
"""Nhận danh sách API key và trả về trạng thái của từng key."""
return await check_all_keys_logic(payload.api_keys)
@app.get("/", include_in_schema=False)
def root():
return {"message": "Chào mừng đến với Worker Thợ!"}
#<--KẾT THÚC TOÀN BỘ CODE CHO FILE app.py (WORKER THỢ) -->