import os import sys import time import shutil import logging import subprocess from uuid import uuid4 from loguru import logger from fastapi import FastAPI, Request, Form, HTTPException, BackgroundTasks from fastapi.responses import HTMLResponse, JSONResponse from fastapi.staticfiles import StaticFiles from fastapi.middleware.cors import CORSMiddleware from app.config import config from app.models.schema import VideoParams, VideoAspect, VideoConcatMode from app.services import voice from app.services import task as tm from app.utils import utils import licensing_client app = FastAPI(title="AI Video Engine Commercial") app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # --- BỘ NHỚ LƯU TRỮ LOG TẠM THỜI ĐC TỐI ƯU HÓA PHÂN LUỒNG --- GLOBAL_TERMINAL_LOGS = {} def secure_terminal_log_dispatcher(msg): """ Bộ điều phối log thông minh: Phân tích cú pháp dòng log, chỉ đẩy log của TASK nào vào đúng hộp lưu trữ của TASK đó. """ log_text = msg.strip() # Tìm kiếm xem dòng log có chứa nhãn định danh [TASK-xxxx] hay không for task_id in list(GLOBAL_TERMINAL_LOGS.keys()): if task_id in log_text: GLOBAL_TERMINAL_LOGS[task_id].append(log_text) # Hoặc nếu là log khởi tạo luồng chung của chính hệ thống cấp slot elif "allocated Thread Slot" in log_text and task_id in log_text: GLOBAL_TERMINAL_LOGS[task_id].append(log_text) # Đăng ký bộ cấu hình log chuẩn hóa của Loguru logger.remove() # Loại bỏ handler mặc định để tránh in trùng lặp log logger.add(sys.stderr, format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}") logger.add(secure_terminal_log_dispatcher, format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}") # --- 1. SECRETS & CLOUDFLARE DEFAULTS --- PEXELS_KEY = os.getenv("PEXELS_KEY", "").strip() CF_TOKEN = os.getenv("CF_API_TOKEN", "").strip() CF_ID = os.getenv("CF_ACCOUNT_ID", "").strip() if PEXELS_KEY: config.app["pexels_api_keys"] = [PEXELS_KEY] if CF_TOKEN and CF_ID: config.app["llm_provider"] = "cloudflare" config.app["cloudflare_account_id"] = CF_ID config.app["cloudflare_api_key"] = CF_TOKEN config.app["cloudflare_model_name"] = "@cf/meta/llama-3-8b-instruct" OUTPUT_DIR = utils.storage_dir("videos", True) if os.path.exists(OUTPUT_DIR): app.mount("/static_videos", StaticFiles(directory=OUTPUT_DIR), name="static_videos") def get_voice_list(): path = os.path.join(os.path.dirname(__file__), "voice-list.txt") if os.path.exists(path): try: with open(path, "r", encoding="utf-8") as f: voices = [line.split("Name:")[-1].strip() for line in f if "Name:" in line] if voices: return voices except Exception as e: logger.error(f"Error reading voice list: {e}") return ["en-US-AvaNeural", "vi-VN-HoaiMyNeural"] # --- API CẬP NHẬT TRẠNG THÁI LUỒNG THỜI GIAN THỰC ĐỒNG BỘ MỖI GIÂY --- @app.get("/api/system-status") async def get_system_status(key: str = "", request: Request = None): device_id = request.client.host if request else "MOBILE_DEFAULT_NODE" try: thread_data = licensing_client.get_thread_status_json() active_str = thread_data["busy_channels"] is_valid, key_info = licensing_client.verify_and_get_license_info(key.strip(), device_id) if is_valid: return { "thread_string": active_str, "key_info": { "status": "success", "type": key_info.get("tier", "FREE").lower(), "tx_id": key_info.get("tx_name", "N/A"), "amount": key_info.get("amount", "N/A"), "issued_date": key_info.get("tx_date", "N/A"), "expiry_date": key_info.get("expiry", "N/A"), "days_left": key_info.get("days_left", 0), "show_test_panel": key_info.get("show_test_panel", False) } } else: return { "thread_string": active_str, "key_info": { "status": "failed", "type": "free", "show_test_panel": False } } except Exception: return { "thread_string": "0/6", "key_info": { "status": "failed", "type": "free", "show_test_panel": False } } # --- API CHUYÊN TRÁCH XỬ LÝ NÚT BẤM VALIDATE TOKEN --- @app.post("/api/validate-key") async def validate_user_key(request: Request): try: data = await request.json() token = data.get("key", "").strip() except Exception: return JSONResponse(status_code=400, content={"status": "error", "message": "Invalid raw payload structure."}) device_id = request.client.host if request else "MOBILE_DEFAULT_NODE" if not token: return JSONResponse(status_code=400, content={"status": "error", "message": "Access token cannot be empty!"}) is_valid, key_info = licensing_client.verify_and_get_license_info(token, device_id) if not is_valid: return JSONResponse(status_code=401, content={"status": "error", "message": "Authentication Failed: Invalid or expired security token."}) return { "status": "success", "message": "Credentials verification payload synchronized.", "tier": key_info.get("tier", "FREE").lower(), "days_left": key_info.get("days_left", 0) } @app.get("/api/logs/{task_id}") async def get_task_logs(task_id: str): logs = GLOBAL_TERMINAL_LOGS.get(task_id, ["Connecting to master container log channel..."]) return {"logs": "\n".join(logs)} @app.post("/api/admin/run-test") async def run_admin_test(): report = licensing_client.execute_admin_diagnostic_test() return {"status": "success", "report": report} # --- API NGẮT TIẾN TRÌNH KHẨN CẤP KHI NHẤN NÚT STOP HOẶC RỚT MẠNG --- @app.post("/api/cancel-task") async def cancel_task(license_key: str = Form(""), request: Request = None): device_id = request.client.host if request else "MOBILE_NODE_2026" token = license_key.strip() try: released = licensing_client.force_abort_user_session(token, device_id) if released: return {"status": "success", "msg": "Task rendering forcefully terminated."} return {"status": "error", "msg": "No active tasks found for this session."} except Exception as e: return {"status": "error", "msg": str(e)} async def monitor_disconnect_stream(request: Request, token: str, device_id: str, slot: int): while True: if await request.is_disconnected(): logger.warning(f"🔌 Disconnect Event Detected (F5 or Tab Closed) for Device {device_id}. Terminating stream.") licensing_client.force_abort_user_session(token, device_id) licensing_client.release_thread_slot(slot) break await time.sleep(1) # --- 3. CORE PROCESSING INTERFACE WITH WATERMARK & LIMITS --- @app.post("/api/generate") async def api_generate( request: Request, background_tasks: BackgroundTasks, video_script: str = Form(...), clip_duration: int = Form(10), voice_rate: float = Form(1.0), selected_voice: str = Form("en-US-AvaNeural"), enable_subtitles: bool = Form(True), enable_bgm: bool = Form(True), license_key: str = Form("") ): if not video_script.strip(): return JSONResponse(status_code=400, content={"status": "error", "msg": "Please enter your script before generating!"}) token = license_key.strip() device_id = request.client.host if request.client else "MOBILE_NODE_2026" is_key_valid, key_info = licensing_client.verify_and_get_license_info(token, device_id) is_vip_key = (key_info.get("tier") in ["VIP", "ADMIN"]) if is_key_valid else False bypass_limits = key_info.get("bypass_limits", False) if not bypass_limits: allowed, limit_res = licensing_client.check_generation_limits(token, device_id, is_vip_key) if not allowed: return JSONResponse(status_code=400, content={"status": "error", "msg": limit_res}) success_alloc, slot_or_err = licensing_client.allocate_render_thread(token, device_id, is_vip_key) if not success_alloc: return JSONResponse(status_code=400, content={"status": "error", "msg": slot_or_err}) allocated_slot = slot_or_err current_pid = os.getpid() licensing_client.register_process_to_slot(allocated_slot, token, device_id, is_vip_key, current_pid) background_tasks.add_task(monitor_disconnect_stream, request, token, device_id, allocated_slot) task_id = f"TASK-{int(time.time())}" GLOBAL_TERMINAL_LOGS[task_id] = [f"🚀 System allocated Thread Slot #{allocated_slot} for PID {current_pid}"] params = VideoParams( video_subject=video_script[:30].strip(), video_script=video_script, video_aspect=VideoAspect.portrait, video_concat_mode=VideoConcatMode.random, video_clip_duration=clip_duration, voice_name=selected_voice, voice_rate=voice_rate, subtitle_enabled=enable_subtitles, bgm_type="random" if enable_bgm else "", n_threads=2 ) try: logger.info(f"[{task_id}] Starting Video Pipeline workflow...") GLOBAL_TERMINAL_LOGS[task_id].append("⚡ Fetching assets from Pexels API and preparing TTS narrative structure...") result = tm.start(task_id=task_id, params=params) if result and "videos" in result and len(result["videos"]) > 0: video_path = result["videos"][0] if os.path.exists(video_path): filename = os.path.basename(video_path) if not is_vip_key: GLOBAL_TERMINAL_LOGS[task_id].append("⚠️ Free tier session detected: Injecting mobile fluid watermark layer...") wm_filename = f"wm_{filename}" watermarked_path = os.path.join(OUTPUT_DIR, wm_filename) drawtext_filter = "drawtext=text='Hugging/AiVideoEngine':x='mod(t*35,w)':y='mod(t*15,h)':fontsize=24:fontcolor=white@0.35" ffmpeg_cmd = [ 'ffmpeg', '-y', '-i', video_path, '-vf', drawtext_filter, '-c:v', 'libx264', '-pix_fmt', 'yuv420p', '-c:a', 'copy', watermarked_path ] subprocess.run(ffmpeg_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) if os.path.exists(watermarked_path): filename = wm_filename licensing_client.commit_generation_success(token, device_id, is_vip_key) GLOBAL_TERMINAL_LOGS[task_id].append("✅ Rendering success. Exporting video stream to player node.") return { "status": "success", "task_id": task_id, "video_url": f"/static_videos/{filename}" } return JSONResponse(status_code=500, content={"status": "error", "task_id": task_id, "msg": "Render completed but output file not found."}) except Exception as e: logger.error(f"Execution Error: {str(e)}") return JSONResponse(status_code=500, content={"status": "error", "task_id": task_id, "msg": f"Render Error: {str(e)}"}) finally: licensing_client.release_thread_slot(allocated_slot) background_tasks.add_task(lambda: [time.sleep(30), GLOBAL_TERMINAL_LOGS.pop(task_id, None)]) # --- 4. GIAO DIỆN CHÍNH --- @app.get("/", response_class=HTMLResponse) async def index_page(): voices_options = "".join([f'' for v in get_voice_list()]) html_content = f""" AI VIDEO ENGINE | Control Center
Commercial Video System

AI VIDEO ENGINE

v3.1.0 Production Workshop

🌟 Automated Production Suite: An instant AI video factory engineered tailored for Shorts, TikTok, and Reels. It automatically aggregates premium background clips from Pexels, structures high-fidelity AI narrative voices, blends automated audio beds, and burns fully aligned dynamic subtitles—all rendered seamlessly with just 1-click.

💡 Our Mission: We are committed to engineering a highly accessible, low-cost automated content rendering architecture to help creators maximize efficiency while cutting software expenses down to zero.

⚠️ Hosting Environment Notice: This platform is deployed on a Free Tier Shared Server (2 vCPU, 16GB RAM, Shared Core Containers) on Hugging Face. Due to unallocated shared hardware limits, video generation cycles can be quite lengthy. To actively protect our instance against memory crashes or sudden cluster restarts, strict thread concurrency thresholds have been applied. We sincerely apologize for any processing delays and thank you for your understanding.

Empower the Infrastructure: To expand our server performance capacities, maintain uninterrupted deployment pipelines, and invest in our upcoming formal platform architectures, your contributions are crucial! Our commercial premium license matrix remains incredibly cost-effective, with the highest tier priced under the cost of two standard cups of coffee. Secure your high-speed access keys here:

Free tier users: 3 video creations/day (1 video per batch, 3-hour cooldown) with mobile fluid watermark. For comprehensive service tier details and premium pricing models, please click our licensing gateway. Purchase Premium Keys

🔐 System Authentication

Validate credentials to remove output limitations and stream watermarks

Status: Unverified (Free Tier System Active)

⚙️ Video Configuration

Tune rendering parameters and narration variables

0 / 1500 Chars

🎬 Studio Result

Realtime output file delivery node

Pipeline Rendering Operational...
Your video is being generated. This process takes a minimum of 5 minutes, please be patient. You can monitor the live rendering logs directly inside your Hugging Face space log console.
""" return HTMLResponse(content=html_content) if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=7860)