# =========================================================
# MODULE: licensing_client.py
# SYSTEM: ADVANCED RESOURCE & CONCURRENCY THREAD MANAGER
# FIX: DYNAMIC FILE-BASED TRANSACTION SHARING FOR MULTI-WORKERS
# DEBUGGING: CONTAINER RAW LOGS TRACER WITH STDOUT FLUSHING
# FALLBACK INTEGRATION: AUTOMATIC ROUTE PATH RECOVERY (FIX 404)
# AUTHOR: Abu Alone © 2026
# =========================================================
import json
import os
import signal
import subprocess
import sys
import time
from datetime import datetime

import requests
from loguru import logger

# Storage path for verified keys / free-device quotas, and licensing server settings.
STORAGE_FILE = "Hugging AbuAlone09/AI-Video-Engine-storage"
# Thread-pool status file shared by every worker process so that all of them
# report the same live slot counts.
THREAD_STATUS_FILE = "Hugging AbuAlone09/AI-Video-Engine-threads"
SERVER_URL = os.getenv("LICENSIFY_SERVER_URL", "https://abualone09-my-licensify-server.hf.space").strip()
SECRET_API_KEY = os.getenv("SECRET_API_KEY", "YOUR_SECRET_API_KEY_HERE").strip()

# Create the storage directory at import time.
os.makedirs(os.path.dirname(STORAGE_FILE), exist_ok=True)


def _idle_slot() -> dict:
    """Return a fresh record describing an unoccupied render slot."""
    return {"status": "idle", "key": None, "type": None, "pid": None, "device": None, "start_time": 0}


def _default_pool() -> dict:
    """Return the default pool: six idle slots keyed "1".."6"."""
    return {str(i): _idle_slot() for i in range(1, 7)}


def _atomic_write_json(path: str, payload: dict) -> None:
    """Serialize *payload* to *path* via a temp file and ``os.replace``.

    Writing in place lets another worker read a half-written file; replacing
    the file in one step means readers see either the old or the new content.
    Raises whatever ``open`` / ``json.dump`` / ``os.replace`` raise.
    """
    tmp_path = f"{path}.{os.getpid()}.{time.monotonic_ns()}.tmp"
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(payload, f, indent=4)
        os.replace(tmp_path, path)
    except BaseException:
        # Do not leave a stray temp file behind when the write fails.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        raise


def _init_system_files():
    """Create the storage file and the shared thread-status file if missing."""
    if not os.path.exists(STORAGE_FILE):
        _atomic_write_json(STORAGE_FILE, {"keys": {}, "free_devices": {}})

    # Seed the shared file with the six-slot pool when it does not exist yet.
    if not os.path.exists(THREAD_STATUS_FILE):
        _atomic_write_json(THREAD_STATUS_FILE, _default_pool())


_init_system_files()


# =========================================================
# CORE COMPONENT: REAL-TIME MULTI-PROCESS SYNC (FILE-BASED MONITOR)
# =========================================================
def _load_threads_state() -> dict:
    """Read the slot pool from the shared status file.

    Returns the default all-idle pool when the file is missing or unreadable.
    Slots "1".."6" are always present in the result and every slot record
    carries all expected fields, so callers can index them directly.
    """
    try:
        with open(THREAD_STATUS_FILE, "r", encoding="utf-8") as f:
            loaded = json.load(f)
    except FileNotFoundError:
        return _default_pool()
    except (OSError, ValueError) as e:
        logger.warning(f"Failed to read thread state cluster, using idle pool: {e}")
        return _default_pool()

    if not isinstance(loaded, dict):
        logger.warning("Thread state file does not contain a JSON object, using idle pool.")
        return _default_pool()

    pool = _default_pool()
    for slot_id, slot in loaded.items():
        if isinstance(slot, dict):
            # Backfill any field missing from the stored record.
            pool[str(slot_id)] = {**_idle_slot(), **slot}
    return pool


def _save_threads_state(state: dict):
    """Write the slot pool to the shared status file (best effort, errors are logged)."""
    try:
        _atomic_write_json(THREAD_STATUS_FILE, state)
    except (OSError, TypeError, ValueError) as e:
        logger.error(f"Failed to write thread state cluster: {e}")


def get_active_threads_count() -> int:
    """Release slots held by dead processes, then count slots marked "rendering"."""
    clean_dead_or_zombie_threads()
    state = _load_threads_state()
    return sum(1 for t in state.values() if t.get("status") == "rendering")


def _pid_is_alive(pid) -> bool:
    """Probe *pid* with signal 0 and report whether the process exists."""
    try:
        os.kill(pid, 0)
    except ProcessLookupError:
        return False
    except PermissionError:
        # The process exists but belongs to someone else: it is still alive.
        return True
    return True


def clean_dead_or_zombie_threads():
    """Reset every "rendering" slot whose recorded PID no longer exists.

    Slots without a recorded PID are left untouched. The state file is only
    rewritten when at least one slot was released.
    """
    state = _load_threads_state()
    changed = False
    for slot_id, thread in state.items():
        if thread.get("status") != "rendering":
            continue
        pid = thread.get("pid")
        if pid and not _pid_is_alive(pid):
            logger.warning(f"🧹 Clean-up monitor detected dead PID {pid} on Slot {slot_id}. Reverting to idle.")
            state[slot_id] = _idle_slot()
            changed = True
    if changed:
        _save_threads_state(state)


# =========================================================
# CORE 1: STORAGE READ / WRITE & REMOTE API KEY VERIFICATION
# =========================================================
def _load_storage():
    """Load the storage JSON; the "keys" and "free_devices" sections always exist in the result.

    Raises ``OSError`` / ``ValueError`` when the file is missing or not valid JSON.
    """
    with open(STORAGE_FILE, "r", encoding="utf-8") as f:
        data = json.load(f)
    data.setdefault("keys", {})
    data.setdefault("free_devices", {})
    return data


def _save_storage(data):
    """Persist the storage dict to ``STORAGE_FILE``."""
    _atomic_write_json(STORAGE_FILE, data)


def clean_expired_keys():
    """Purge keys past their expiry timestamp and reset free-device counters on a new day.

    Keys without an ``expiry_timestamp`` are treated as expired. For free
    devices only ``daily_batches`` and ``last_date`` are reset here.
    """
    data = _load_storage()
    now_ts = time.time()
    changed = False

    for k, info in list(data["keys"].items()):
        if now_ts > info.get("expiry_timestamp", 0):
            del data["keys"][k]
            changed = True
            logger.warning(f"Key {k} expired and has been automatically purged from storage.")

    today_str = datetime.now().strftime("%Y-%m-%d")
    for info in data["free_devices"].values():
        if info.get("last_date") != today_str:
            info["daily_batches"] = 0
            info["last_date"] = today_str
            changed = True

    if changed:
        _save_storage(data)


def _license_denied(msg: str) -> tuple:
    """Build the ``(False, info)`` result returned for every rejected verification."""
    return False, {"msg": msg, "show_test_panel": False, "remove_watermark": False, "tier": "FREE"}


def verify_and_get_license_info(key_input: str, device_id: str) -> tuple:
    """Verify an access key and return ``(ok, info)``.

    Resolution order:
      1. the admin secret (``ADMIN_KEY`` env var),
      2. the promo VIP secret (``VIP_KEY`` env var),
      3. a key already cached in local storage,
      4. the remote server at ``{SERVER_URL}/api/verify-key``, retried once on
         ``{SERVER_URL}/verify-key`` when the first route answers 404.

    A key accepted by the server is cached in local storage. On any failure
    the result is ``(False, info)`` with ``info["tier"] == "FREE"``; network
    errors are reported that way too and never raised to the caller.
    """
    clean_expired_keys()
    token = (key_input or "").strip()

    # NOTE(review): both secrets fall back to hard-coded defaults when the
    # environment variables are unset; confirm this is acceptable in production.
    admin_secret = os.getenv("ADMIN_KEY", "ADMIN_ABUALONE_2026").strip()
    vip_secret = os.getenv("VIP_KEY", "VIP_PROMO_2026").strip()

    # An empty token must never match an (accidentally) empty configured secret.
    if token and token == admin_secret:
        return True, {
            "tier": "ADMIN",
            "tx_name": "System Master Administrator",
            "amount": "$0.00 (Root Privilege)",
            "tx_date": datetime.now().strftime("%Y-%m-%d"),
            "expiry": "Permanent Access",
            "days_left": 9999,
            "msg": "📋 SYSTEM STATUS: Admin Master Active. Full hardware diagnostic test layer unlocked.",
            "show_test_panel": True,
            "remove_watermark": True,
            "bypass_limits": True
        }

    if token and token == vip_secret:
        return True, {
            "tier": "VIP",
            "tx_name": "VIP Promotional Access",
            "amount": "$0.00 (Promo Tier)",
            "tx_date": datetime.now().strftime("%Y-%m-%d"),
            "expiry": "2026-12-31",
            "days_left": 200,
            "msg": "👑 VIP STATUS: Promo Key Verified. Watermarks and daily rendering limits removed.",
            "show_test_panel": False,
            "remove_watermark": True,
            "bypass_limits": True
        }

    data = _load_storage()
    if token in data["keys"]:
        info = data["keys"][token]
        days_left = int((info["expiry_timestamp"] - time.time()) / 86400)
        return True, {
            "tier": "VIP",
            "tx_name": info["tx_name"],
            "amount": info["amount"],
            "tx_date": info["tx_date"],
            "expiry": info["expiry"],
            "days_left": max(0, days_left),
            "msg": f"👑 VIP ACTIVE | User: {info['tx_name']} | {max(0, days_left)} days remaining.",
            "show_test_panel": False,
            "remove_watermark": True,
            "bypass_limits": False
        }

    # --------------------------------------------------
    # CONTAINER DIAGNOSTIC TRACE (FORCED TO STDOUT)
    # NOTE(review): this trace prints the user key in full, the first seven
    # characters of the secret API key and, below, the raw response headers
    # and body. Confirm that is acceptable wherever container logs are visible.
    # --------------------------------------------------
    print("\n" + "🚀 " + "="*55, flush=True)
    print(f"📡 [LICENSING SYSTEM DEBUG] ĐANG GỌI CỔNG XÁC THỰC TỪ XA", flush=True)
    print(f" • API Target Endpoint : {SERVER_URL}/api/verify-key", flush=True)
    print(f" • Secret Header Token : {SECRET_API_KEY[:7]}*** (Ẩn một phần)", flush=True)
    print(f" • Checking User Key : '{token}'", flush=True)
    print(f" • Hardware Device ID : '{device_id}'", flush=True)
    print("🚀 " + "="*55, flush=True)

    try:
        headers = {"X-API-Key": SECRET_API_KEY, "Content-Type": "application/json"}
        payload = {"key": token, "hwid": device_id}

        start_time = time.time()
        response = requests.post(f"{SERVER_URL}/api/verify-key", json=payload, headers=headers, timeout=10)
        duration = time.time() - start_time

        print("\n" + "📥 " + "—"*55, flush=True)
        print(f"📡 [RESPONSE RECEIVED] Phản hồi trả về sau {duration:.3f} giây:", flush=True)
        print(f" • HTTP Status Code : {response.status_code}", flush=True)
        print(f" • Response Headers : {dict(response.headers)}", flush=True)
        print(f" • Raw Plain Text Content (Dữ liệu thô từ Server):\n{response.text}", flush=True)
        print("📥 " + "—"*55 + "\n", flush=True)

        # --------------------------------------------------
        # AUTOMATIC ROUTE FALLBACK WHEN THE PRIMARY ENDPOINT ANSWERS 404 NOT FOUND
        # --------------------------------------------------
        if response.status_code == 404:
            fallback_url = f"{SERVER_URL}/verify-key"
            print(f"⚠️ [404 DETECTED] Không tìm thấy endpoint /api/verify-key. Đang tự động kích hoạt Route dự phòng: {fallback_url}", flush=True)
            start_time = time.time()
            response = requests.post(fallback_url, json=payload, headers=headers, timeout=10)
            duration = time.time() - start_time
            print(f"📡 [FALLBACK RESPONSE] Kết quả từ route dự phòng sau {duration:.3f} giây: Code {response.status_code}", flush=True)
            print(f" • Raw Content dự phòng:\n{response.text}", flush=True)

        if response.status_code == 200:
            res_data = response.json()
            if res_data.get("status") == "success":
                tx_info = res_data.get("data") or {}
                expiry_str = tx_info.get("expiry_date", datetime.now().strftime("%Y-%m-%d"))
                try:
                    expiry_ts = time.mktime(time.strptime(expiry_str, "%Y-%m-%d"))
                except Exception:
                    # Default to three days of access when the date cannot be parsed.
                    expiry_ts = time.time() + (3 * 86400)

                record = {
                    "tx_name": tx_info.get("buyer_name", "Anonymous User"),
                    "amount": tx_info.get("amount_paid", "$2.99"),
                    "tx_date": tx_info.get("payment_date", datetime.now().strftime("%Y-%m-%d")),
                    "expiry": expiry_str,
                    "expiry_timestamp": expiry_ts
                }
                # Reload before writing: the snapshot taken before the network
                # round-trip may be stale by now.
                data = _load_storage()
                data["keys"][token] = record
                _save_storage(data)

                days_left = int((expiry_ts - time.time()) / 86400)
                # Report the stored record so this first answer matches what
                # later cache hits return for the same key.
                return True, {
                    "tier": "VIP",
                    "tx_name": record["tx_name"],
                    "amount": record["amount"],
                    "tx_date": record["tx_date"],
                    "expiry": expiry_str,
                    "days_left": max(0, days_left),
                    "msg": f"👑 VIP KEY VERIFIED | Owner: {record['tx_name']} | Expiry: {expiry_str}.",
                    "show_test_panel": False,
                    "remove_watermark": True,
                    "bypass_limits": False
                }
            else:
                print(f"⚠️ [LOGIC FAILURE] Server phản hồi HTTP 200 thành công nhưng 'status' bị từ chối: '{res_data.get('status')}'", flush=True)
        else:
            print(f"❌ [HTTP ERROR DETECTED] Server từ chối xử lý kết nối với mã lỗi: {response.status_code}", flush=True)

        return _license_denied("❌ Invalid Access Key or communication failure with payment gateway!")

    except requests.exceptions.Timeout:
        print("🚨 [TIMEOUT ERROR] Cổng thanh toán hoặc máy chủ Licensify không phản hồi sau 10 giây!", flush=True)
        return _license_denied("⚠️ Connection timeout to Licensify Server!")
    except requests.exceptions.RequestException as req_err:
        print(f"🚨 [NETWORK EXCEPTION] Lỗi kết nối mạng/DNS vật lý: {str(req_err)}", flush=True)
        return _license_denied("⚠️ Network request communication failure!")
    except Exception as e:
        # Last-resort boundary: verification must answer, never raise.
        print(f"🚨 [UNKNOWN CRASH] Lỗi xử lý logic ngầm: {str(e)}", flush=True)
        return _license_denied(f"⚠️ Connection error to Licensify Server! Info: {str(e)}")


# =========================================================
# CORE 2: SLOT ALLOCATION (VIP PREFERS 1-5, FREE PREFERS 6, VIP MAY EVICT FREE)
# =========================================================
def get_thread_status_json():
    """Return a summary of the shared pool after releasing dead slots.

    The result holds ``busy_channels`` ("<busy>/6"), ``vip_active``,
    ``free_active`` and the full ``pool`` mapping.
    """
    clean_dead_or_zombie_threads()
    state = _load_threads_state()

    busy_count = sum(1 for t in state.values() if t.get("status") == "rendering")
    vip_count = sum(1 for t in state.values() if t.get("type") == "VIP")
    free_count = sum(1 for t in state.values() if t.get("type") == "FREE")

    return {
        "busy_channels": f"{busy_count}/6",
        "vip_active": vip_count,
        "free_active": free_count,
        "pool": state
    }


def allocate_render_thread(key_input: str, device_id: str, is_vip: bool) -> tuple:
    """Pick a render slot for a new job.

    Returns ``(True, slot_number)`` on success or ``(False, message)`` when the
    request is refused. The slot is only chosen here; it is marked busy later
    by ``register_process_to_slot``.

    Policy as implemented:
      * a VIP key, or a free device, that is already rendering is refused;
      * VIP takes the first idle slot among 1-5, then slot 6; when nothing is
        idle, the first slot held by a FREE job (checked in the order 6, 1-5)
        is evicted by killing its process;
      * free users take slot 6 when idle, otherwise the first idle slot of 1-5.
    """
    clean_dead_or_zombie_threads()
    state = _load_threads_state()
    token = (key_input or "").strip()

    # Refuse a duplicate render for the same VIP key or the same free device.
    for thread in state.values():
        if thread["status"] != "rendering":
            continue
        if is_vip and token and thread["key"] == token:
            return False, "❌ This VIP Key is already running a rendering process! Multi-thread allocation denied."
        if not is_vip and device_id and thread["device"] == device_id:
            return False, "❌ Your device is currently rendering a video. Please wait until it completes!"

    target_slot = None
    vip_slots = [str(i) for i in range(1, 6)]

    if is_vip:
        for slot_id in vip_slots:
            if state[slot_id]["status"] == "idle":
                target_slot = slot_id
                break

        if not target_slot and state["6"]["status"] == "idle":
            target_slot = "6"

        if not target_slot:
            for slot_id in ["6"] + vip_slots:
                if state[slot_id]["type"] != "FREE":
                    continue
                free_pid = state[slot_id]["pid"]
                logger.warning(f"👑 VIP Eviction Triggered: Terminating Free PID {free_pid} at Slot {slot_id} to liberate resources.")
                try:
                    if free_pid:
                        os.kill(free_pid, signal.SIGKILL)
                except ProcessLookupError:
                    pass
                target_slot = slot_id
                state[slot_id] = _idle_slot()
                # Persist the eviction so other workers stop seeing the killed job.
                _save_threads_state(state)
                break
    else:
        if state["6"]["status"] == "idle":
            target_slot = "6"
        else:
            for slot_id in vip_slots:
                if state[slot_id]["status"] == "idle":
                    target_slot = slot_id
                    break

    if not target_slot:
        return False, "⚠️ All rendering channels are currently full. Please try again in a few moments!"

    return True, int(target_slot)


def register_process_to_slot(slot: int, key_input: str, device_id: str, is_vip: bool, pid: int):
    """Mark *slot* as rendering for the given owner and PID, and persist the pool.

    The key is recorded only for VIP jobs; free jobs are identified by device.
    """
    state = _load_threads_state()
    state[str(slot)] = {
        "status": "rendering",
        "key": key_input.strip() if is_vip else None,
        "type": "VIP" if is_vip else "FREE",
        "pid": pid,
        "device": device_id,
        "start_time": time.time()
    }
    _save_threads_state(state)


def release_thread_slot(slot: int):
    """Return *slot* to idle and persist the pool; unknown slot numbers are ignored."""
    state = _load_threads_state()
    slot_str = str(slot)
    if slot_str in state:
        state[slot_str] = _idle_slot()
        _save_threads_state(state)


# =========================================================
# CORE 3: QUOTA AND COOLDOWN ENFORCEMENT
# =========================================================
def check_generation_limits(key_input: str, device_id: str, is_vip: bool) -> tuple:
    """Check whether the caller may start another generation.

    Returns ``(True, {"status_str": ...})`` when allowed, otherwise
    ``(False, message)``.

    VIP keys: at most 5 batches per day, with a cooldown between batches.
    Free devices: at most 3 videos per day, with a cooldown between videos.
    Quota records created or reset here are written back to storage so that
    ``commit_generation_success`` updates the same records.
    """
    clean_expired_keys()
    data = _load_storage()
    today_str = datetime.now().strftime("%Y-%m-%d")
    now_ts = time.time()
    changed = False

    if is_vip:
        # Keys are stored stripped by verify_and_get_license_info.
        token = (key_input or "").strip()
        if token not in data["keys"]:
            return False, "License error: Key missing from verified list."

        vip_data = data["keys"][token]
        if vip_data.get("last_date_used") != today_str:
            vip_data["last_date_used"] = today_str
            vip_data["daily_batches"] = 0
            vip_data["videos_this_batch"] = 0
            vip_data["cooldown_until"] = 0
            changed = True

        if changed:
            _save_storage(data)

        if now_ts < vip_data.get("cooldown_until", 0):
            wait_min = int((vip_data["cooldown_until"] - now_ts) / 60)
            return False, f"⏱️ Cooldown active! Please return in {wait_min} minutes."

        if vip_data.get("daily_batches", 0) >= 5:
            return False, "❌ You have exhausted your daily limit of 5 batches. Please return tomorrow!"

        return True, {"status_str": f"VIP Usage: Batch {vip_data.get('daily_batches', 0)}/5"}

    if device_id not in data["free_devices"]:
        data["free_devices"][device_id] = {
            "last_date": today_str,
            "daily_batches": 0,
            "cooldown_until": 0
        }
        changed = True

    free_data = data["free_devices"][device_id]
    if free_data.get("last_date") != today_str:
        free_data["last_date"] = today_str
        free_data["daily_batches"] = 0
        free_data["cooldown_until"] = 0
        changed = True

    if changed:
        _save_storage(data)

    if now_ts < free_data.get("cooldown_until", 0):
        wait_hours = int((free_data["cooldown_until"] - now_ts) / 3600)
        return False, f"⏱️ Free Tier Cooldown: Please wait {wait_hours + 1} hours before generating again, or upgrade to VIP Premium!"

    if free_data.get("daily_batches", 0) >= 3:
        return False, "❌ Free Tier Exhausted! Maximum 3 daily videos reached."

    return True, {"status_str": f"Free Tier Usage: {free_data.get('daily_batches', 0)}/3 Videos"}


def commit_generation_success(key_input: str, device_id: str, is_vip: bool):
    """Record one finished video against the caller's quota.

    VIP: every second video closes a batch, which increments ``daily_batches``
    and starts a one-hour cooldown. Free: each video increments
    ``daily_batches`` and starts a three-hour cooldown. Owners without a
    stored record are left untouched.
    """
    data = _load_storage()
    now_ts = time.time()

    if is_vip:
        token = (key_input or "").strip()
        if token in data["keys"]:
            vip_data = data["keys"][token]
            vip_data["videos_this_batch"] = vip_data.get("videos_this_batch", 0) + 1
            if vip_data["videos_this_batch"] >= 2:
                vip_data["daily_batches"] = vip_data.get("daily_batches", 0) + 1
                vip_data["videos_this_batch"] = 0
                vip_data["cooldown_until"] = now_ts + 3600
    else:
        if device_id in data["free_devices"]:
            free_data = data["free_devices"][device_id]
            free_data["daily_batches"] = free_data.get("daily_batches", 0) + 1
            free_data["cooldown_until"] = now_ts + (3 * 3600)

    _save_storage(data)


# =========================================================
# CORE 4: FORCE STOP - CANCELLED OR DISCONNECTED JOBS ARE NOT COUNTED
# =========================================================
def force_abort_user_session(key_input: str, device_id: str) -> bool:
    """Kill and release every rendering slot owned by the caller.

    VIP slots are matched by key, all other slots by device id. Returns True
    when at least one slot was released.
    """
    state = _load_threads_state()
    token = (key_input or "").strip()
    released = False

    for slot_id, thread in state.items():
        if thread["status"] != "rendering":
            continue
        if thread["type"] == "VIP":
            is_match = thread["key"] == token
        else:
            is_match = thread["device"] == device_id
        if not is_match:
            continue

        pid = thread.get("pid")
        if pid:
            try:
                logger.warning(f"🛑 Manual Force Abort triggered. Killing Video Engine Process PID {pid} instantly.")
                os.kill(pid, signal.SIGKILL)
            except ProcessLookupError:
                pass
        state[slot_id] = _idle_slot()
        released = True

    if released:
        _save_threads_state(state)
    return released


def execute_admin_diagnostic_test() -> str:
    """Run ``tester.py`` located next to this module and return a text report.

    The script runs with the current interpreter and a 30-second timeout.
    Failures are reported in the returned string and never raised.
    """
    current_dir = os.path.dirname(os.path.abspath(__file__))
    test_script = os.path.join(current_dir, "tester.py")

    if not os.path.exists(test_script):
        return f"❌ FILE MISSING: Tệp 'tester.py' không tồn tại tại vị trí cấu trúc: {test_script}!"

    try:
        result = subprocess.run([sys.executable, test_script], capture_output=True, text=True, timeout=30)
        if result.returncode == 0:
            return f"✅ [TESTER REPORT SUCCESS]:\n{result.stdout.strip()}"
        else:
            return f"❌ [TESTER REPORT CRASHED WITH EXIT CODE {result.returncode}]:\n{result.stderr.strip()}"
    except subprocess.TimeoutExpired:
        return "⚠️ [TESTER TIMEOUT]: Tiến trình kiểm thử vượt ngưỡng thời gian chờ 30 giây!"
    except Exception as e:
        return f"❌ [SYSTEM CRASH]: Lỗi phát sinh từ hệ thống gọi file: {str(e)}"


def check_local_cache_validity(*args, **kwargs):
    """Placeholder that accepts any arguments and always returns True."""
    return True


def clean_expired_keys_from_storage(*args, **kwargs):
    """Alias for ``clean_expired_keys``; any arguments are ignored."""
    return clean_expired_keys()


def check_and_reset_daily_quota(*args, **kwargs):
    """Placeholder that accepts any arguments and always returns True."""
    return True


def execute_preemption_kick(*args, **kwargs):
    """Placeholder that accepts any arguments and always returns True."""
    return True


def is_session_already_rendering(key_input: str, device_id: str, is_vip: bool = False) -> bool:
    """Report whether the caller already owns a rendering slot.

    VIP callers are matched by key, free callers by device id. Returns False
    when neither a key nor a device id is supplied.
    """
    state = _load_threads_state()
    token = key_input.strip() if key_input else ""

    if not token and not device_id:
        return False

    for thread in state.values():
        if thread.get("status") != "rendering":
            continue
        if is_vip and token and thread.get("key") == token:
            return True
        if not is_vip and device_id and thread.get("device") == device_id:
            return True

    return False