AI-Video-Engine / licensing_client.py
AbuAlone09's picture
Update licensing_client.py
4c273b4 verified
# =========================================================
# MODULE: licensing_client.py
# SYSTEM: ADVANCED RESOURCE & CONCURRENCY THREAD MANAGER
# FIX: DYNAMIC FILE-BASED TRANSACTION SHARING FOR MULTI-WORKERS
# DEBUGGING: CONTAINER RAW LOGS TRACER WITH STDOUT FLUSHING
# FALLBACK INTEGRATION: AUTOMATIC ROUTE PATH RECOVERY (FIX 404)
# AUTHOR: Abu Alone © 2026
# =========================================================
import os
import json
import time
import signal
import requests
import subprocess
from datetime import datetime
from loguru import logger
# Cấu hình đường dẫn lưu trữ và thông tin Server cổng thanh toán dịch vụ
STORAGE_FILE = "Hugging AbuAlone09/AI-Video-Engine-storage"
# File trạng thái phân luồng dùng chung giữa toàn bộ các Worker tiến trình để số nhảy thực tế từng giây
THREAD_STATUS_FILE = "Hugging AbuAlone09/AI-Video-Engine-threads"
SERVER_URL = os.getenv("LICENSIFY_SERVER_URL", "https://abualone09-my-licensify-server.hf.space").strip()
SECRET_API_KEY = os.getenv("SECRET_API_KEY", "YOUR_SECRET_API_KEY_HERE").strip()
# Khởi tạo thư mục và tệp cấu trúc lưu trữ
os.makedirs(os.path.dirname(STORAGE_FILE), exist_ok=True)
def _init_system_files():
if not os.path.exists(STORAGE_FILE):
with open(STORAGE_FILE, "w", encoding="utf-8") as f:
json.dump({"keys": {}, "free_devices": {}}, f, indent=4)
# Khởi tạo pool 6 luồng thực tế vào file dùng chung nếu chưa có
if not os.path.exists(THREAD_STATUS_FILE):
initial_pool = {str(i): {"status": "idle", "key": None, "type": None, "pid": None, "device": None, "start_time": 0} for i in range(1, 7)}
with open(THREAD_STATUS_FILE, "w", encoding="utf-8") as f:
json.dump(initial_pool, f, indent=4)
_init_system_files()
# =========================================================
# THÀNH PHẦN CORE: ĐỒNG BỘ THỜI GIAN THỰC ĐA TIẾN TRÌNH (FILE-BASED MONITOR)
# =========================================================
def _load_threads_state() -> dict:
"""Đọc trực tiếp trạng thái luồng từ tệp dùng chung để tránh lệch số giữa các API worker"""
try:
if os.path.exists(THREAD_STATUS_FILE):
with open(THREAD_STATUS_FILE, "r", encoding="utf-8") as f:
return json.load(f)
except Exception:
pass
return {str(i): {"status": "idle", "key": None, "type": None, "pid": None, "device": None, "start_time": 0} for i in range(1, 7)}
def _save_threads_state(state: dict):
"""Ghi trạng thái luồng xuống tệp khóa ngay lập tức để đồng bộ hóa"""
try:
with open(THREAD_STATUS_FILE, "w", encoding="utf-8") as f:
json.dump(state, f, indent=4)
except Exception as e:
logger.error(f"Failed to write thread state cluster: {e}")
def get_active_threads_count() -> int:
"""Quét sạch tiến trình chết rớt mạng rồi tính tổng số luồng thực tế đang bận render"""
clean_dead_or_zombie_threads()
state = _load_threads_state()
return sum(1 for t in state.values() if t["status"] == "rendering")
def clean_dead_or_zombie_threads():
"""Kiểm tra PID hệ thống Linux để giải phóng luồng ngay lập tức nếu người dùng đóng tab, rớt mạng, F5"""
state = _load_threads_state()
changed = False
for slot_id, thread in state.items():
if thread["status"] == "rendering":
pid = thread.get("pid")
if pid:
try:
# Gửi tín hiệu 0 để kiểm tra xem tiến trình tạo video đó còn sống thực tế không
os.kill(pid, 0)
except (ProcessLookupError, PermissionError):
# Tiến trình đã chết (người dùng hủy tab hoặc crash giữa chừng) -> Trả luồng về idle lập tức
logger.warning(f"🧹 Clean-up monitor detected dead PID {pid} on Slot {slot_id}. Reverting to idle.")
state[slot_id] = {"status": "idle", "key": None, "type": None, "pid": None, "device": None, "start_time": 0}
changed = True
if changed:
_save_threads_state(state)
# =========================================================
# CORE 1: XỬ LÝ ĐỌC / GHI & XÁC THỰC API KEY TỪ XA TỚI SERVER
# =========================================================
def _load_storage():
with open(STORAGE_FILE, "r", encoding="utf-8") as f:
return json.load(f)
def _save_storage(data):
with open(STORAGE_FILE, "w", encoding="utf-8") as f:
json.dump(data, f, indent=4)
def clean_expired_keys():
data = _load_storage()
now_ts = time.time()
changed = False
for k, info in list(data["keys"].items()):
if now_ts > info.get("expiry_timestamp", 0):
del data["keys"][k]
changed = True
logger.warning(f"Key {k} expired and has been automatically purged from storage.")
today_str = datetime.now().strftime("%Y-%m-%d")
for dev, info in list(data["free_devices"].items()):
if info.get("last_date") != today_str:
info["daily_batches"] = 0
info["last_date"] = today_str
changed = True
if changed:
_save_storage(data)
def verify_and_get_license_info(key_input: str, device_id: str) -> tuple:
clean_expired_keys()
token = key_input.strip()
admin_secret = os.getenv("ADMIN_KEY", "ADMIN_ABUALONE_2026").strip()
vip_secret = os.getenv("VIP_KEY", "VIP_PROMO_2026").strip()
if token == admin_secret:
return True, {
"tier": "ADMIN",
"tx_name": "System Master Administrator",
"amount": "$0.00 (Root Privilege)",
"tx_date": datetime.now().strftime("%Y-%m-%d"),
"expiry": "Permanent Access",
"days_left": 9999,
"msg": "📋 SYSTEM STATUS: Admin Master Active. Full hardware diagnostic test layer unlocked.",
"show_test_panel": True,
"remove_watermark": True,
"bypass_limits": True
}
if token == vip_secret:
return True, {
"tier": "VIP",
"tx_name": "VIP Promotional Access",
"amount": "$0.00 (Promo Tier)",
"tx_date": datetime.now().strftime("%Y-%m-%d"),
"expiry": "2026-12-31",
"days_left": 200,
"msg": "👑 VIP STATUS: Promo Key Verified. Watermarks and daily rendering limits removed.",
"show_test_panel": False,
"remove_watermark": True,
"bypass_limits": True
}
data = _load_storage()
if token in data["keys"]:
info = data["keys"][token]
days_left = int((info["expiry_timestamp"] - time.time()) / 86400)
return True, {
"tier": "VIP", "tx_name": info["tx_name"], "amount": info["amount"],
"tx_date": info["tx_date"], "expiry": info["expiry"], "days_left": max(0, days_left),
"msg": f"👑 VIP ACTIVE | User: {info['tx_name']} | {max(0, days_left)} days remaining.",
"show_test_panel": False,
"remove_watermark": True,
"bypass_limits": False
}
# --------------------------------------------------
# BỘ KHỞI CHẠY CHẨN ĐOÁN LỖI HUGGING FACE CONTAINER (STDOUT FORCED)
# --------------------------------------------------
print("\n" + "🚀 " + "="*55, flush=True)
print(f"📡 [LICENSING SYSTEM DEBUG] ĐANG GỌI CỔNG XÁC THỰC TỪ XA", flush=True)
print(f" • API Target Endpoint : {SERVER_URL}/api/verify-key", flush=True)
print(f" • Secret Header Token : {SECRET_API_KEY[:7]}*** (Ẩn một phần)", flush=True)
print(f" • Checking User Key : '{token}'", flush=True)
print(f" • Hardware Device ID : '{device_id}'", flush=True)
print("🚀 " + "="*55, flush=True)
try:
headers = {"X-API-Key": SECRET_API_KEY, "Content-Type": "application/json"}
payload = {"key": token, "hwid": device_id}
start_time = time.time()
response = requests.post(f"{SERVER_URL}/api/verify-key", json=payload, headers=headers, timeout=10)
duration = time.time() - start_time
print("\n" + "📥 " + "—"*55, flush=True)
print(f"📡 [RESPONSE RECEIVED] Phản hồi trả về sau {duration:.3f} giây:", flush=True)
print(f" • HTTP Status Code : {response.status_code}", flush=True)
print(f" • Response Headers : {dict(response.headers)}", flush=True)
print(f" • Raw Plain Text Content (Dữ liệu thô từ Server):\n{response.text}", flush=True)
print("📥 " + "—"*55 + "\n", flush=True)
# --------------------------------------------------
# THÊM LOGIC: TỰ ĐỘNG CHUYỂN ĐỔI TUYẾN ĐƯỜNG KHI DÍNH LỖI ĐỊNH VỊ 404 NOT FOUND
# --------------------------------------------------
if response.status_code == 404:
fallback_url = f"{SERVER_URL}/verify-key"
print(f"⚠️ [404 DETECTED] Không tìm thấy endpoint /api/verify-key. Đang tự động kích hoạt Route dự phòng: {fallback_url}", flush=True)
start_time = time.time()
response = requests.post(fallback_url, json=payload, headers=headers, timeout=10)
duration = time.time() - start_time
print(f"📡 [FALLBACK RESPONSE] Kết quả từ route dự phòng sau {duration:.3f} giây: Code {response.status_code}", flush=True)
print(f" • Raw Content dự phòng:\n{response.text}", flush=True)
if response.status_code == 200:
res_data = response.json()
if res_data.get("status") == "success":
tx_info = res_data.get("data", {})
expiry_str = tx_info.get("expiry_date", datetime.now().strftime("%Y-%m-%d"))
try:
expiry_ts = time.mktime(time.strptime(expiry_str, "%Y-%m-%d"))
except Exception:
expiry_ts = time.time() + (3 * 86400) # Dự phòng mặc định 3 ngày nếu parse lỗi
data["keys"][token] = {
"tx_name": tx_info.get("buyer_name", "Anonymous User"),
"amount": tx_info.get("amount_paid", "$2.99"),
"tx_date": tx_info.get("payment_date", datetime.now().strftime("%Y-%m-%d")),
"expiry": expiry_str,
"expiry_timestamp": expiry_ts
}
_save_storage(data)
days_left = int((expiry_ts - time.time()) / 86400)
return True, {
"tier": "VIP", "tx_name": tx_info.get("buyer_name"), "amount": tx_info.get("amount_paid"),
"tx_date": tx_info.get("payment_date"), "expiry": expiry_str, "days_left": max(0, days_left),
"msg": f"👑 VIP KEY VERIFIED | Owner: {tx_info.get('buyer_name')} | Expiry: {expiry_str}.",
"show_test_panel": False,
"remove_watermark": True,
"bypass_limits": False
}
else:
print(f"⚠️ [LOGIC FAILURE] Server phản hồi HTTP 200 thành công nhưng 'status' bị từ chối: '{res_data.get('status')}'", flush=True)
else:
print(f"❌ [HTTP ERROR DETECTED] Server từ chối xử lý kết nối với mã lỗi: {response.status_code}", flush=True)
return False, {"msg": "❌ Invalid Access Key or communication failure with payment gateway!", "show_test_panel": False, "remove_watermark": False, "tier": "FREE"}
except requests.exceptions.Timeout:
print("🚨 [TIMEOUT ERROR] Cổng thanh toán hoặc máy chủ Licensify không phản hồi sau 10 giây!", flush=True)
return False, {"msg": "⚠️ Connection timeout to Licensify Server!", "show_test_panel": False, "remove_watermark": False, "tier": "FREE"}
except requests.exceptions.RequestException as req_err:
print(f"🚨 [NETWORK EXCEPTION] Lỗi kết nối mạng/DNS vật lý: {str(req_err)}", flush=True)
return False, {"msg": "⚠️ Network request communication failure!", "show_test_panel": False, "remove_watermark": False, "tier": "FREE"}
except Exception as e:
print(f"🚨 [UNKNOWN CRASH] Lỗi xử lý logic ngầm: {str(e)}", flush=True)
return False, {"msg": f"⚠️ Connection error to Licensify Server! Info: {str(e)}", "show_test_panel": False, "remove_watermark": False, "tier": "FREE"}
# =========================================================
# CORE 2: CƠ CHẾ PHÂN LUỒNG THÔNG MINH (5 VIP - 1 FREE, ÉP TRỤC XUẤT)
# =========================================================
def get_thread_status_json():
"""Hàm này nhảy số thực tế từng giây dựa trên trạng thái tệp tập trung"""
clean_dead_or_zombie_threads()
state = _load_threads_state()
busy_count = sum(1 for t in state.values() if t["status"] == "rendering")
vip_count = sum(1 for t in state.values() if t["type"] == "VIP")
free_count = sum(1 for t in state.values() if t["type"] == "FREE")
return {
"busy_channels": f"{busy_count}/6",
"vip_active": vip_count,
"free_active": free_count,
"pool": state
}
def allocate_render_thread(key_input: str, device_id: str, is_vip: bool) -> tuple:
"""Cơ chế phân luồng nghiêm ngặt: 5 luồng VIP độc quyền (1-5), luồng 6 cho Free. """
clean_dead_or_zombie_threads()
state = _load_threads_state()
token = key_input.strip()
# Chặn đứng trường hợp chính Key đó hoặc Device đó đang render trùng lặp
for slot_id, thread in state.items():
if thread["status"] == "rendering":
if is_vip and token and thread["key"] == token:
return False, "❌ This VIP Key is already running a rendering process! Multi-thread allocation denied."
if not is_vip& device_id and thread["device"] == device_id:
return False, "❌ Your device is currently rendering a video. Please wait until it completes!"
target_slot = None
if is_vip:
for i in range(1, 6):
if state[str(i)]["status"] == "idle":
target_slot = str(i)
break
if not target_slot and state["6"]["status"] == "idle":
target_slot = "6"
if not target_slot:
for i in ["6", "1", "2", "3", "4", "5"]:
if state[i]["type"] == "FREE":
free_pid = state[i]["pid"]
logger.warning(f"👑 VIP Eviction Triggered: Terminating Free PID {free_pid} at Slot {i} to liberate resources.")
try:
if free_pid:
os.kill(free_pid, signal.SIGKILL)
except ProcessLookupError:
pass
target_slot = i
state[i] = {"status": "idle", "key": None, "type": None, "pid": None, "device": None, "start_time": 0}
break
else:
if state["6"]["status"] == "idle":
target_slot = "6"
else:
for i in range(1, 6):
if state[str(i)]["status"] == "idle":
target_slot = str(i)
break
if not target_slot:
return False, "⚠️ All rendering channels are currently full. Please try again in a few moments!"
return True, int(target_slot)
def register_process_to_slot(slot: int, key_input: str, device_id: str, is_vip: bool, pid: int):
state = _load_threads_state()
state[str(slot)] = {
"status": "rendering",
"key": key_input.strip() if is_vip else None,
"type": "VIP" if is_vip else "FREE",
"pid": pid,
"device": device_id,
"start_time": time.time()
}
_save_threads_state(state)
def release_thread_slot(slot: int):
state = _load_threads_state()
slot_str = str(slot)
if slot_str in state:
state[slot_str] = {"status": "idle", "key": None, "type": None, "pid": None, "device": None, "start_time": 0}
_save_threads_state(state)
# =========================================================
# CORE 3: GIÁM SÁT HẠN MỨC CHẶN CHẶT CHẼ
# =========================================================
def check_generation_limits(key_input: str, device_id: str, is_vip: bool) -> tuple:
clean_expired_keys()
data = _load_storage()
today_str = datetime.now().strftime("%Y-%m-%d")
now_ts = time.time()
if is_vip:
if key_input not in data["keys"]:
return False, "License error: Key missing from verified list."
vip_data = data["keys"][key_input]
if vip_data.get("last_date_used") != today_str:
vip_data["last_date_used"] = today_str
vip_data["daily_batches"] = 0
vip_data["videos_this_batch"] = 0
vip_data["cooldown_until"] = 0
if now_ts < vip_data.get("cooldown_until", 0):
wait_min = int((vip_data["cooldown_until"] - now_ts) / 60)
return False, f"⏱️ Cooldown active! Please return in {wait_min} minutes."
if vip_data.get("daily_batches", 0) >= 5:
return False, "❌ You have exhausted your daily limit of 5 batches. Please return tomorrow!"
return True, {"status_str": f"VIP Usage: Batch {vip_data.get('daily_batches', 0)}/5"}
else:
if device_id not in data["free_devices"]:
data["free_devices"][device_id] = {
"last_date": today_str,
"daily_batches": 0,
"cooldown_until": 0
}
free_data = data["free_devices"][device_id]
if free_data.get("last_date") != today_str:
free_data["last_date"] = today_str
free_data["daily_batches"] = 0
free_data["cooldown_until"] = 0
if now_ts < free_data.get("cooldown_until", 0):
wait_hours = int((free_data["cooldown_until"] - now_ts) / 3600)
return False, f"⏱️ Free Tier Cooldown: Please wait {wait_hours + 1} hours before generating again, or upgrade to VIP Premium!"
if free_data.get("daily_batches", 0) >= 3:
return False, "❌ Free Tier Exhausted! Maximum 3 daily videos reached."
return True, {"status_str": f"Free Tier Usage: {free_data.get('daily_batches', 0)}/3 Videos"}
def commit_generation_success(key_input: str, device_id: str, is_vip: bool):
data = _load_storage()
now_ts = time.time()
if is_vip:
if key_input in data["keys"]:
vip_data = data["keys"][key_input]
vip_data["videos_this_batch"] = vip_data.get("videos_this_batch", 0) + 1
if vip_data["videos_this_batch"] >= 2:
vip_data["daily_batches"] = vip_data.get("daily_batches", 0) + 1
vip_data["videos_this_batch"] = 0
vip_data["cooldown_until"] = now_ts + 3600
else:
if device_id in data["free_devices"]:
free_data = data["free_devices"][device_id]
free_data["daily_batches"] = free_data.get("daily_batches", 0) + 1
free_data["cooldown_until"] = now_ts + (3 * 3600)
_save_storage(data)
# =========================================================
# CORE 4: FORCE STOP - KHÔNG TÍNH LƯỢT KHI HỦY HOẶC NÉT MẠNG RỚT
# =========================================================
def force_abort_user_session(key_input: str, device_id: str) -> bool:
state = _load_threads_state()
token = key_input.strip()
released = False
for slot_id, thread in state.items():
if thread["status"] == "rendering":
is_match = (thread["key"] == token) if thread["type"] == "VIP" else (thread["device"] == device_id)
if is_match:
pid = thread.get("pid")
if pid:
try:
logger.warning(f"🛑 Manual Force Abort triggered. Killing Video Engine Process PID {pid} instantly.")
os.kill(pid, signal.SIGKILL)
except ProcessLookupError:
pass
state[slot_id] = {"status": "idle", "key": None, "type": None, "pid": None, "device": None, "start_time": 0}
released = True
if released:
_save_threads_state(state)
return released
def execute_admin_diagnostic_test() -> str:
import sys
current_dir = os.path.dirname(os.path.abspath(__file__))
test_script = os.path.join(current_dir, "tester.py")
if not os.path.exists(test_script):
return f"❌ FILE MISSING: Tệp 'tester.py' không tồn tại tại vị trí cấu trúc: {test_script}!"
try:
result = subprocess.run([sys.executable, test_script], capture_output=True, text=True, timeout=30)
if result.returncode == 0:
return f"✅ [TESTER REPORT SUCCESS]:\n{result.stdout.strip()}"
else:
return f"❌ [TESTER REPORT CRASHED WITH EXIT CODE {result.returncode}]:\n{result.stderr.strip()}"
except subprocess.TimeoutExpired:
return "⚠️ [TESTER TIMEOUT]: Tiến trình kiểm thử vượt ngưỡng thời gian chờ 30 giây!"
except Exception as e:
return f"❌ [SYSTEM CRASH]: Lỗi phát sinh từ hệ thống gọi file: {str(e)}"
def check_local_cache_validity(*args, **kwargs): return True
def clean_expired_keys_from_storage(*args, **kwargs): return clean_expired_keys()
def check_and_reset_daily_quota(*args, **kwargs): return True
def execute_preemption_kick(*args, **kwargs): return True
def is_session_already_rendering(key_input: str, device_id: str, is_vip: bool = False) -> bool:
state = _load_threads_state()
token = key_input.strip() if key_input else ""
if not token and not device_id:
return False
for slot_id, thread in state.items():
if thread.get("status") == "rendering":
if is_vip and token and thread.get("key") == token: return True
if not is_vip and device_id and thread.get("device") == device_id: return True
if not is_vip and device_id:
for i in range(1, 6):
if state[str(i)].get("device") == device_id and state[str(i)].get("status") == "rendering":
return True
return False