"""FastAPI entry point for the VieNeu video/TTS processing API.

Serves the HTML front-end pages and exposes endpoints that store uploaded
audio in Supabase storage, record a job row in the ``video_jobs`` table and
dispatch the matching Celery task from ``worker``.
"""
import logging
import os
import re
import unicodedata
import uuid
from functools import lru_cache
from typing import Optional

from dotenv import load_dotenv

# Load .env before the remaining imports, as the original module did, so that
# anything imported below sees the variables at import time.
load_dotenv(override=True)

from fastapi import FastAPI, UploadFile, File, Form, HTTPException, Request
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from pydantic import BaseModel
from supabase import create_client, Client

from worker import render_video_task, generate_tts_task, render_studio_task

logger = logging.getLogger(__name__)

# Setup Supabase
SUPABASE_URL = os.getenv("SUPABASE_URL", "https://your-project.supabase.co")
SUPABASE_KEY = os.getenv("SUPABASE_SERVICE_ROLE_KEY", "your-service-key")
supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY)

app = FastAPI(title="VieNeu Video AI processing API")

# Mount the static assets directory and the HTML templates
app.mount("/static", StaticFiles(directory="static"), name="static")
templates = Jinja2Templates(directory="templates")

# "đ"/"Đ" have no Unicode decomposition, so NFD + ASCII-encode would silently
# drop them; map them to their base letter first.
_VIETNAMESE_D = str.maketrans({"đ": "d", "Đ": "D"})


def slugify(text: str) -> str:
    """Return an ASCII, lowercase, underscore-separated slug of *text*.

    Diacritics are stripped (Vietnamese ``đ``/``Đ`` become ``d``), characters
    other than word characters, whitespace and hyphens are removed, runs of
    whitespace/hyphens collapse to a single underscore, and the result is
    truncated to 40 characters. The result may be empty.
    """
    text = text.translate(_VIETNAMESE_D)
    # Strip accents: decompose, then drop everything that is not ASCII.
    text = unicodedata.normalize("NFD", text).encode("ascii", "ignore").decode("utf-8")
    # Remove special characters, lowercase, replace whitespace with underscores.
    text = re.sub(r"[^\w\s-]", "", text).strip().lower()
    text = re.sub(r"[-\s]+", "_", text)
    return text[:40]


def _safe_extension(filename: Optional[str], default: str) -> str:
    """Return the lowercase alphanumeric extension of *filename*, or *default*."""
    _, dot, ext = (filename or "").rpartition(".")
    if not dot:
        return default
    ext = re.sub(r"[^a-z0-9]", "", ext.lower())[:10]
    return ext or default


def _unique_storage_path(folder: str, filename: Optional[str], extension: str) -> str:
    """Build ``<folder>/<slug>_<8 hex chars>.<extension>`` from a client filename.

    The client-supplied name is slugified and suffixed with a random id so the
    storage key cannot contain path separators or accents and does not collide
    with earlier uploads. A missing filename is treated as an empty string.
    """
    stem = slugify((filename or "").rsplit(".", 1)[0])
    return f"{folder}/{stem}_{uuid.uuid4().hex[:8]}.{extension}"


def _create_job(script: str, raw_video_path: str) -> str:
    """Insert a pending row into ``video_jobs`` and return its id.

    Raises:
        HTTPException: 500 if the insert returned no row. Previously the
            endpoints dispatched a task with the placeholder id "unknown",
            which could never be tracked.
    """
    db_resp = supabase.table("video_jobs").insert({
        "status": "pending",
        "script": script,
        "raw_video_path": raw_video_path,
    }).execute()
    if not db_resp.data:
        raise HTTPException(status_code=500, detail="Failed to create job record")
    return db_resp.data[0]["id"]


async def _resolve_bgm_path(bgm_audio: Optional[UploadFile], bgm_preset: Optional[str]) -> Optional[str]:
    """Upload *bgm_audio* if given and return its storage path.

    Falls back to *bgm_preset* when no file was uploaded, and to ``None`` when
    neither is provided.
    """
    if bgm_audio:
        bgm_bytes = await bgm_audio.read()
        bgm_path = _unique_storage_path("bgm", bgm_audio.filename, "mp3")
        supabase.storage.from_("content").upload(path=bgm_path, file=bgm_bytes)
        return bgm_path
    return bgm_preset or None


@lru_cache(maxsize=1)
def _get_redis_client():
    """Return a process-wide Redis client, created on first use.

    ``redis`` is imported lazily so a missing package only disables progress
    reporting. A failed creation is not cached (``lru_cache`` does not cache
    exceptions), so it is retried on the next call.
    """
    import redis

    return redis.from_url(os.getenv("REDIS_URL", "redis://localhost:6379/0"))


def _read_progress(job_id: str) -> Optional[dict]:
    """Return ``{"elapsed", "remaining"}`` read from Redis, or ``None``.

    The value under ``progress_<job_id>`` is expected to be
    ``"<elapsed>|<remaining>"``. This is best-effort: any error is logged and
    results in ``None``.
    """
    try:
        raw = _get_redis_client().get(f"progress_{job_id}")
        if not raw:
            return None
        progress_str = raw.decode("utf-8") if isinstance(raw, bytes) else raw
        parts = progress_str.split("|")
        if len(parts) == 2:
            return {"elapsed": parts[0], "remaining": parts[1]}
    except Exception as e:  # progress is optional; never fail the status call
        logger.warning("Redis error: %s", e)
    return None


class RenderJobRequest(BaseModel):
    """JSON request model with a script and a voice preset id.

    NOTE(review): not referenced by any endpoint in this module.
    """

    script_text: str
    voice_preset_id: str = "default"


@app.get("/", response_class=HTMLResponse)
async def read_root(request: Request):
    """Render the ``index.html`` template."""
    return templates.TemplateResponse(request=request, name="index.html")


@app.get("/tts", response_class=HTMLResponse)
async def read_tts(request: Request):
    """Render the ``tts.html`` template."""
    return templates.TemplateResponse(request=request, name="tts.html")


@app.get("/studio", response_class=HTMLResponse)
async def read_studio(request: Request):
    """Render the ``studio.html`` template."""
    return templates.TemplateResponse(request=request, name="studio.html")


@app.post("/api/v1/jobs/submit")
async def submit_job(
    script: str = Form(...),
    ref_audio: UploadFile = File(None),
    aspect_ratio: str = Form("9:16"),
    sub_style: str = Form("karaoke"),
    font_name: str = Form("Arial"),
    highlight_color: str = Form("#00FDFF")
):
    """
    Receives frontend parameters, tracks them, and dispatches a Celery task.

    Returns the new job id with status ``processing_queued``.
    """
    # 1. Store the optional reference audio under a sanitised, unique key.
    #    The raw client filename is never used as a storage path.
    ref_audio_path = None
    if ref_audio:
        ref_audio_bytes = await ref_audio.read()
        ref_audio_path = _unique_storage_path(
            "references",
            ref_audio.filename,
            _safe_extension(ref_audio.filename, "wav"),
        )
        supabase.storage.from_("content").upload(path=ref_audio_path, file=ref_audio_bytes)

    # 2. Create DB record to track the job
    job_id = _create_job(script, "green_screen")

    # 3. Dispatch to Celery queue
    render_video_task.delay(job_id, script, ref_audio_path, aspect_ratio, sub_style, font_name, highlight_color)

    return {"job_id": job_id, "status": "processing_queued"}


@app.post("/api/v1/tts/generate")
async def submit_tts_job(
    script: str = Form(...),
    temperature: float = Form(0.5),
    voice_preset: str = Form("default"),
    ref_audio: UploadFile = File(None),
    existing_ref_path: str = Form(None),
    bgm_audio: UploadFile = File(None),
    bgm_volume: float = Form(0.1),
    bgm_preset: str = Form(None)
):
    """
    Submits a pure Text-To-Speech task to Celery.

    A newly uploaded ``ref_audio`` takes precedence over ``existing_ref_path``.
    An uploaded ``bgm_audio`` takes precedence over ``bgm_preset``.
    """
    ref_audio_path = existing_ref_path
    if ref_audio:
        ref_audio_bytes = await ref_audio.read()
        # Clean the filename and add a unique ID to prevent conflicts/accents issues
        ref_audio_path = _unique_storage_path("references", ref_audio.filename, "wav")
        supabase.storage.from_("content").upload(
            path=ref_audio_path,
            file=ref_audio_bytes,
            file_options={"content-type": "audio/wav"}
        )

    # Note: Using generic "video_jobs" table to track TTS jobs as well to save setup time.
    job_id = _create_job(script, ref_audio_path if ref_audio_path else "audio_only")

    bgm_path = await _resolve_bgm_path(bgm_audio, bgm_preset)

    generate_tts_task.delay(job_id, script, voice_preset, temperature, ref_audio_path, bgm_path, bgm_volume)

    return {"job_id": job_id, "status": "processing_queued"}


@app.get("/api/v1/jobs/{job_id}")
async def get_job_status(job_id: str):
    """Return the ``video_jobs`` row for *job_id*.

    Adds a ``progress`` entry when Redis holds one for the job. Returns
    ``{"error": "Job not found"}`` when no row matches.
    """
    response = supabase.table("video_jobs").select("*").eq("id", job_id).execute()
    if not response.data:
        return {"error": "Job not found"}

    data = response.data[0]

    # Fetch progress from Redis (optional; failures are logged and ignored)
    progress = _read_progress(job_id)
    if progress is not None:
        data["progress"] = progress

    return data


@app.post("/api/v1/studio/generate")
async def submit_studio_job(
    script: str = Form(...),
    temperature: float = Form(0.5),
    voice_preset: str = Form("default"),
    bgm_audio: UploadFile = File(None),
    bgm_volume: float = Form(0.1),
    bgm_preset: str = Form(None)
):
    """
    Submits a Studio MP4 rendering task to Celery.

    An uploaded ``bgm_audio`` takes precedence over ``bgm_preset``.
    """
    job_id = _create_job(script, "studio_render")

    bgm_path = await _resolve_bgm_path(bgm_audio, bgm_preset)

    render_studio_task.delay(job_id, script, temperature, voice_preset, bgm_path, bgm_volume)

    return {"job_id": job_id, "status": "processing_queued"}