import os
import sys
import time
import shutil
import logging
import subprocess
from uuid import uuid4
from loguru import logger
from fastapi import FastAPI, Request, Form, HTTPException, BackgroundTasks
from fastapi.responses import HTMLResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from fastapi.middleware.cors import CORSMiddleware
from app.config import config
from app.models.schema import VideoParams, VideoAspect, VideoConcatMode
from app.services import voice
from app.services import task as tm
from app.utils import utils
import licensing_client
app = FastAPI(title="AI Video Engine Commercial")
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# --- BỘ NHỚ LƯU TRỮ LOG TẠM THỜI ĐC TỐI ƯU HÓA PHÂN LUỒNG ---
GLOBAL_TERMINAL_LOGS = {}
def secure_terminal_log_dispatcher(msg):
"""
Bộ điều phối log thông minh: Phân tích cú pháp dòng log,
chỉ đẩy log của TASK nào vào đúng hộp lưu trữ của TASK đó.
"""
log_text = msg.strip()
# Tìm kiếm xem dòng log có chứa nhãn định danh [TASK-xxxx] hay không
for task_id in list(GLOBAL_TERMINAL_LOGS.keys()):
if task_id in log_text:
GLOBAL_TERMINAL_LOGS[task_id].append(log_text)
# Hoặc nếu là log khởi tạo luồng chung của chính hệ thống cấp slot
elif "allocated Thread Slot" in log_text and task_id in log_text:
GLOBAL_TERMINAL_LOGS[task_id].append(log_text)
# Đăng ký bộ cấu hình log chuẩn hóa của Loguru
logger.remove() # Loại bỏ handler mặc định để tránh in trùng lặp log
logger.add(sys.stderr, format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}")
logger.add(secure_terminal_log_dispatcher, format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}")
# --- 1. SECRETS & CLOUDFLARE DEFAULTS ---
PEXELS_KEY = os.getenv("PEXELS_KEY", "").strip()
CF_TOKEN = os.getenv("CF_API_TOKEN", "").strip()
CF_ID = os.getenv("CF_ACCOUNT_ID", "").strip()
if PEXELS_KEY:
config.app["pexels_api_keys"] = [PEXELS_KEY]
if CF_TOKEN and CF_ID:
config.app["llm_provider"] = "cloudflare"
config.app["cloudflare_account_id"] = CF_ID
config.app["cloudflare_api_key"] = CF_TOKEN
config.app["cloudflare_model_name"] = "@cf/meta/llama-3-8b-instruct"
OUTPUT_DIR = utils.storage_dir("videos", True)
if os.path.exists(OUTPUT_DIR):
app.mount("/static_videos", StaticFiles(directory=OUTPUT_DIR), name="static_videos")
def get_voice_list():
path = os.path.join(os.path.dirname(__file__), "voice-list.txt")
if os.path.exists(path):
try:
with open(path, "r", encoding="utf-8") as f:
voices = [line.split("Name:")[-1].strip() for line in f if "Name:" in line]
if voices: return voices
except Exception as e:
logger.error(f"Error reading voice list: {e}")
return ["en-US-AvaNeural", "vi-VN-HoaiMyNeural"]
# --- API CẬP NHẬT TRẠNG THÁI LUỒNG THỜI GIAN THỰC ĐỒNG BỘ MỖI GIÂY ---
@app.get("/api/system-status")
async def get_system_status(key: str = "", request: Request = None):
device_id = request.client.host if request else "MOBILE_DEFAULT_NODE"
try:
thread_data = licensing_client.get_thread_status_json()
active_str = thread_data["busy_channels"]
is_valid, key_info = licensing_client.verify_and_get_license_info(key.strip(), device_id)
if is_valid:
return {
"thread_string": active_str,
"key_info": {
"status": "success",
"type": key_info.get("tier", "FREE").lower(),
"tx_id": key_info.get("tx_name", "N/A"),
"amount": key_info.get("amount", "N/A"),
"issued_date": key_info.get("tx_date", "N/A"),
"expiry_date": key_info.get("expiry", "N/A"),
"days_left": key_info.get("days_left", 0),
"show_test_panel": key_info.get("show_test_panel", False)
}
}
else:
return {
"thread_string": active_str,
"key_info": { "status": "failed", "type": "free", "show_test_panel": False }
}
except Exception:
return { "thread_string": "0/6", "key_info": { "status": "failed", "type": "free", "show_test_panel": False } }
# --- API CHUYÊN TRÁCH XỬ LÝ NÚT BẤM VALIDATE TOKEN ---
@app.post("/api/validate-key")
async def validate_user_key(request: Request):
try:
data = await request.json()
token = data.get("key", "").strip()
except Exception:
return JSONResponse(status_code=400, content={"status": "error", "message": "Invalid raw payload structure."})
device_id = request.client.host if request else "MOBILE_DEFAULT_NODE"
if not token:
return JSONResponse(status_code=400, content={"status": "error", "message": "Access token cannot be empty!"})
is_valid, key_info = licensing_client.verify_and_get_license_info(token, device_id)
if not is_valid:
return JSONResponse(status_code=401, content={"status": "error", "message": "Authentication Failed: Invalid or expired security token."})
return {
"status": "success",
"message": "Credentials verification payload synchronized.",
"tier": key_info.get("tier", "FREE").lower(),
"days_left": key_info.get("days_left", 0)
}
@app.get("/api/logs/{task_id}")
async def get_task_logs(task_id: str):
logs = GLOBAL_TERMINAL_LOGS.get(task_id, ["Connecting to master container log channel..."])
return {"logs": "\n".join(logs)}
@app.post("/api/admin/run-test")
async def run_admin_test():
report = licensing_client.execute_admin_diagnostic_test()
return {"status": "success", "report": report}
# --- API NGẮT TIẾN TRÌNH KHẨN CẤP KHI NHẤN NÚT STOP HOẶC RỚT MẠNG ---
@app.post("/api/cancel-task")
async def cancel_task(license_key: str = Form(""), request: Request = None):
device_id = request.client.host if request else "MOBILE_NODE_2026"
token = license_key.strip()
try:
released = licensing_client.force_abort_user_session(token, device_id)
if released:
return {"status": "success", "msg": "Task rendering forcefully terminated."}
return {"status": "error", "msg": "No active tasks found for this session."}
except Exception as e:
return {"status": "error", "msg": str(e)}
async def monitor_disconnect_stream(request: Request, token: str, device_id: str, slot: int):
while True:
if await request.is_disconnected():
logger.warning(f"🔌 Disconnect Event Detected (F5 or Tab Closed) for Device {device_id}. Terminating stream.")
licensing_client.force_abort_user_session(token, device_id)
licensing_client.release_thread_slot(slot)
break
await time.sleep(1)
# --- 3. CORE PROCESSING INTERFACE WITH WATERMARK & LIMITS ---
@app.post("/api/generate")
async def api_generate(
request: Request,
background_tasks: BackgroundTasks,
video_script: str = Form(...),
clip_duration: int = Form(10),
voice_rate: float = Form(1.0),
selected_voice: str = Form("en-US-AvaNeural"),
enable_subtitles: bool = Form(True),
enable_bgm: bool = Form(True),
license_key: str = Form("")
):
if not video_script.strip():
return JSONResponse(status_code=400, content={"status": "error", "msg": "Please enter your script before generating!"})
token = license_key.strip()
device_id = request.client.host if request.client else "MOBILE_NODE_2026"
is_key_valid, key_info = licensing_client.verify_and_get_license_info(token, device_id)
is_vip_key = (key_info.get("tier") in ["VIP", "ADMIN"]) if is_key_valid else False
bypass_limits = key_info.get("bypass_limits", False)
if not bypass_limits:
allowed, limit_res = licensing_client.check_generation_limits(token, device_id, is_vip_key)
if not allowed:
return JSONResponse(status_code=400, content={"status": "error", "msg": limit_res})
success_alloc, slot_or_err = licensing_client.allocate_render_thread(token, device_id, is_vip_key)
if not success_alloc:
return JSONResponse(status_code=400, content={"status": "error", "msg": slot_or_err})
allocated_slot = slot_or_err
current_pid = os.getpid()
licensing_client.register_process_to_slot(allocated_slot, token, device_id, is_vip_key, current_pid)
background_tasks.add_task(monitor_disconnect_stream, request, token, device_id, allocated_slot)
task_id = f"TASK-{int(time.time())}"
GLOBAL_TERMINAL_LOGS[task_id] = [f"🚀 System allocated Thread Slot #{allocated_slot} for PID {current_pid}"]
params = VideoParams(
video_subject=video_script[:30].strip(),
video_script=video_script,
video_aspect=VideoAspect.portrait,
video_concat_mode=VideoConcatMode.random,
video_clip_duration=clip_duration,
voice_name=selected_voice,
voice_rate=voice_rate,
subtitle_enabled=enable_subtitles,
bgm_type="random" if enable_bgm else "",
n_threads=2
)
try:
logger.info(f"[{task_id}] Starting Video Pipeline workflow...")
GLOBAL_TERMINAL_LOGS[task_id].append("⚡ Fetching assets from Pexels API and preparing TTS narrative structure...")
result = tm.start(task_id=task_id, params=params)
if result and "videos" in result and len(result["videos"]) > 0:
video_path = result["videos"][0]
if os.path.exists(video_path):
filename = os.path.basename(video_path)
if not is_vip_key:
GLOBAL_TERMINAL_LOGS[task_id].append("⚠️ Free tier session detected: Injecting mobile fluid watermark layer...")
wm_filename = f"wm_{filename}"
watermarked_path = os.path.join(OUTPUT_DIR, wm_filename)
drawtext_filter = "drawtext=text='Hugging/AiVideoEngine':x='mod(t*35,w)':y='mod(t*15,h)':fontsize=24:fontcolor=white@0.35"
ffmpeg_cmd = [
'ffmpeg', '-y',
'-i', video_path,
'-vf', drawtext_filter,
'-c:v', 'libx264',
'-pix_fmt', 'yuv420p',
'-c:a', 'copy',
watermarked_path
]
subprocess.run(ffmpeg_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if os.path.exists(watermarked_path):
filename = wm_filename
licensing_client.commit_generation_success(token, device_id, is_vip_key)
GLOBAL_TERMINAL_LOGS[task_id].append("✅ Rendering success. Exporting video stream to player node.")
return {
"status": "success",
"task_id": task_id,
"video_url": f"/static_videos/{filename}"
}
return JSONResponse(status_code=500, content={"status": "error", "task_id": task_id, "msg": "Render completed but output file not found."})
except Exception as e:
logger.error(f"Execution Error: {str(e)}")
return JSONResponse(status_code=500, content={"status": "error", "task_id": task_id, "msg": f"Render Error: {str(e)}"})
finally:
licensing_client.release_thread_slot(allocated_slot)
background_tasks.add_task(lambda: [time.sleep(30), GLOBAL_TERMINAL_LOGS.pop(task_id, None)])
# --- 4. GIAO DIỆN CHÍNH ---
@app.get("/", response_class=HTMLResponse)
async def index_page():
voices_options = "".join([f'' for v in get_voice_list()])
html_content = f"""
AI VIDEO ENGINE | Control Center
Commercial Video System
AI VIDEO ENGINE
v3.1.0 Production Workshop
🌟 Automated Production Suite: An instant AI video factory engineered tailored for Shorts, TikTok, and Reels. It automatically aggregates premium background clips from Pexels, structures high-fidelity AI narrative voices, blends automated audio beds, and burns fully aligned dynamic subtitles—all rendered seamlessly with just 1-click.
💡 Our Mission: We are committed to engineering a highly accessible, low-cost automated content rendering architecture to help creators maximize efficiency while cutting software expenses down to zero.
⚠️ Hosting Environment Notice: This platform is deployed on a Free Tier Shared Server (2 vCPU, 16GB RAM, Shared Core Containers) on Hugging Face. Due to unallocated shared hardware limits, video generation cycles can be quite lengthy. To actively protect our instance against memory crashes or sudden cluster restarts, strict thread concurrency thresholds have been applied. We sincerely apologize for any processing delays and thank you for your understanding.
☕ Empower the Infrastructure: To expand our server performance capacities, maintain uninterrupted deployment pipelines, and invest in our upcoming formal platform architectures, your contributions are crucial! Our commercial premium license matrix remains incredibly cost-effective, with the highest tier priced under the cost of two standard cups of coffee. Secure your high-speed access keys here:
Free tier users: 3 video creations/day (1 video per batch, 3-hour cooldown) with mobile fluid watermark. For comprehensive service tier details and premium pricing models, please click our licensing gateway.
Purchase Premium Keys
🔐 System Authentication
Validate credentials to remove output limitations and stream watermarks
Status: Unverified (Free Tier System Active)
Admin Diagnostic Interface
Waiting for testing process pipeline...
⚙️ Video Configuration
Tune rendering parameters and narration variables
🎬 Studio Result
Realtime output file delivery node
Pipeline Rendering Operational...
Your video is being generated. This process takes a minimum of 5 minutes, please be patient. You can monitor the live rendering logs directly inside your Hugging Face space log console.