UI-VieNeu / main.py
HuuDatLego's picture
Upload folder using huggingface_hub
911c66e verified
import os
import re
import uuid
import unicodedata
from dotenv import load_dotenv
load_dotenv(override=True)
from fastapi import FastAPI, UploadFile, File, Form, HTTPException, Request
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from pydantic import BaseModel
from supabase import create_client, Client
from worker import render_video_task, generate_tts_task, render_studio_task
# Setup Supabase
# Connection settings are read from the environment; the fallback values are
# placeholders, not working credentials.
SUPABASE_URL = os.getenv("SUPABASE_URL", "https://your-project.supabase.co")
SUPABASE_KEY = os.getenv("SUPABASE_SERVICE_ROLE_KEY", "your-service-key")
# Module-level client used by the route handlers in this module.
supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY)
app = FastAPI(title="VieNeu Video AI processing API")
# Mount the static-assets directory and set up the HTML template directory
app.mount("/static", StaticFiles(directory="static"), name="static")
templates = Jinja2Templates(directory="templates")
def slugify(text: str) -> str:
    """Convert text into a short, ASCII-only, lowercase identifier.

    Vietnamese diacritics are removed, characters other than word
    characters, whitespace and hyphens are dropped, and every run of
    whitespace/hyphens becomes a single underscore.

    Args:
        text: Arbitrary input text, e.g. the stem of an uploaded file name.

    Returns:
        The slug, truncated to at most 40 characters. May be empty when the
        input contains no characters that survive the conversion.
    """
    # "đ"/"Đ" have no Unicode decomposition, so NFD + ASCII encoding would
    # silently drop them ("đường" -> "uong"); map them to "d"/"D" first.
    text = text.replace("đ", "d").replace("Đ", "D")
    # Strip diacritics: decompose, then discard the non-ASCII combining marks.
    text = unicodedata.normalize("NFD", text).encode("ascii", "ignore").decode("utf-8")
    # Remove special characters, lowercase, and replace whitespace/hyphen
    # runs with underscores.
    text = re.sub(r"[^\w\s-]", "", text).strip().lower()
    text = re.sub(r"[-\s]+", "_", text)
    return text[:40]
class RenderJobRequest(BaseModel):
    """Request schema carrying a script and a voice preset identifier.

    NOTE(review): no route visible in this file references this model;
    confirm whether it is still used elsewhere.
    """

    # Script text (required).
    script_text: str
    # Voice preset identifier; "default" is used when the field is omitted.
    voice_preset_id: str = "default"
@app.get("/", response_class=HTMLResponse)
async def read_root(request: Request):
    """Serve the page rendered from the ``index.html`` template."""
    page = templates.TemplateResponse(request=request, name="index.html")
    return page
@app.get("/tts", response_class=HTMLResponse)
async def read_tts(request: Request):
    """Serve the page rendered from the ``tts.html`` template."""
    page = templates.TemplateResponse(request=request, name="tts.html")
    return page
@app.get("/studio", response_class=HTMLResponse)
async def read_studio(request: Request):
    """Serve the page rendered from the ``studio.html`` template."""
    page = templates.TemplateResponse(request=request, name="studio.html")
    return page
@app.post("/api/v1/jobs/submit")
async def submit_job(
    script: str = Form(...),
    ref_audio: UploadFile = File(None),
    aspect_ratio: str = Form("9:16"),
    sub_style: str = Form("karaoke"),
    font_name: str = Form("Arial"),
    highlight_color: str = Form("#00FDFF")
):
    """
    Receives frontend parameters, tracks them, and dispatches a Celery task.

    Args:
        script: Script text stored on the job record and passed to the task.
        ref_audio: Optional reference audio; when provided it is uploaded to
            the "content" storage bucket under ``references/``.
        aspect_ratio: Forwarded unchanged to ``render_video_task``.
        sub_style: Forwarded unchanged to ``render_video_task``.
        font_name: Forwarded unchanged to ``render_video_task``.
        highlight_color: Forwarded unchanged to ``render_video_task``.

    Returns:
        A dict with the new ``job_id`` and the status ``"processing_queued"``.

    Raises:
        HTTPException: 500 when the job record could not be created.
    """
    # 1. Store the optional reference audio
    ref_audio_path = None
    # An upload part with an empty file name carries no usable file.
    if ref_audio and ref_audio.filename:
        ref_audio_bytes = await ref_audio.read()
        # Never use the client-supplied name as the storage key: it may hold
        # path separators or accents, and a repeated name would collide with
        # an existing object. Use a slugified stem plus a unique suffix, the
        # same scheme the TTS endpoint uses.
        stem, ext = os.path.splitext(ref_audio.filename)
        suffix = ext[1:]
        # Keep the original extension only when it is plain ASCII alphanumerics.
        ext = f".{suffix.lower()}" if suffix.isascii() and suffix.isalnum() else ""
        safe_filename = f"{slugify(stem)}_{uuid.uuid4().hex[:8]}{ext}"
        ref_audio_path = f"references/{safe_filename}"
        supabase.storage.from_("content").upload(path=ref_audio_path, file=ref_audio_bytes)
    # 2. Create DB record to track the job
    db_resp = supabase.table("video_jobs").insert({
        "status": "pending",
        "script": script,
        "raw_video_path": "green_screen"
    }).execute()
    if not db_resp.data:
        # Without a real id the job could never be polled, so fail instead of
        # queueing work under a placeholder id.
        raise HTTPException(status_code=500, detail="Failed to create job record")
    job_id = db_resp.data[0]["id"]
    # 3. Dispatch to Celery queue
    render_video_task.delay(job_id, script, ref_audio_path, aspect_ratio, sub_style, font_name, highlight_color)
    return {"job_id": job_id, "status": "processing_queued"}
@app.post("/api/v1/tts/generate")
async def submit_tts_job(
    script: str = Form(...),
    temperature: float = Form(0.5),
    voice_preset: str = Form("default"),
    ref_audio: UploadFile = File(None),
    existing_ref_path: str = Form(None),
    bgm_audio: UploadFile = File(None),
    bgm_volume: float = Form(0.1),
    bgm_preset: str = Form(None)
):
    """
    Submits a pure Text-To-Speech task to Celery.

    Args:
        script: Text stored on the job record and passed to the task.
        temperature: Forwarded unchanged to ``generate_tts_task``.
        voice_preset: Forwarded unchanged to ``generate_tts_task``.
        ref_audio: Optional reference audio; when provided it is uploaded
            under ``references/`` and takes precedence over
            ``existing_ref_path``.
        existing_ref_path: Reference path used when no ``ref_audio`` file is
            uploaded.
        bgm_audio: Optional background-music file; when provided it is
            uploaded under ``bgm/`` and takes precedence over ``bgm_preset``.
        bgm_volume: Forwarded unchanged to ``generate_tts_task``.
        bgm_preset: Background-music path used when no ``bgm_audio`` file is
            uploaded.

    Returns:
        A dict with the new ``job_id`` and the status ``"processing_queued"``.

    Raises:
        HTTPException: 500 when the job record could not be created.
    """
    ref_audio_path = existing_ref_path
    # An upload part with an empty/missing file name carries no usable file.
    if ref_audio and ref_audio.filename:
        ref_audio_bytes = await ref_audio.read()
        # Clean the filename and add a unique ID to prevent conflicts/accents issues
        clean_name = slugify(ref_audio.filename.rsplit('.', 1)[0])
        safe_filename = f"{clean_name}_{uuid.uuid4().hex[:8]}.wav"
        ref_audio_path = f"references/{safe_filename}"
        supabase.storage.from_("content").upload(
            path=ref_audio_path,
            file=ref_audio_bytes,
            file_options={"content-type": "audio/wav"}
        )
    # Upload the background music BEFORE creating the job record, so a failed
    # upload cannot leave an orphaned "pending" job behind.
    bgm_path = None
    if bgm_audio and bgm_audio.filename:
        bgm_bytes = await bgm_audio.read()
        bgm_filename = f"bgm/{slugify(bgm_audio.filename.rsplit('.', 1)[0])}_{uuid.uuid4().hex[:8]}.mp3"
        supabase.storage.from_("content").upload(path=bgm_filename, file=bgm_bytes)
        bgm_path = bgm_filename
    elif bgm_preset:
        bgm_path = bgm_preset
    # Note: Using generic "video_jobs" table to track TTS jobs as well to save setup time.
    db_resp = supabase.table("video_jobs").insert({
        "status": "pending",
        "script": script,
        "raw_video_path": ref_audio_path if ref_audio_path else "audio_only"
    }).execute()
    if not db_resp.data:
        # Without a real id the job could never be polled, so fail instead of
        # queueing work under a placeholder id.
        raise HTTPException(status_code=500, detail="Failed to create job record")
    job_id = db_resp.data[0]["id"]
    generate_tts_task.delay(job_id, script, voice_preset, temperature, ref_audio_path, bgm_path, bgm_volume)
    return {"job_id": job_id, "status": "processing_queued"}
# Redis client shared by all status requests; created on first use.
_redis_client = None


def _get_redis_client():
    """Return the shared Redis client, creating it on the first call."""
    global _redis_client
    if _redis_client is None:
        # Imported here so that an ImportError surfaces inside the caller's
        # try block and is reported like any other Redis failure.
        import redis
        _redis_client = redis.from_url(os.getenv("REDIS_URL", "redis://localhost:6379/0"))
    return _redis_client


@app.get("/api/v1/jobs/{job_id}")
async def get_job_status(job_id: str):
    """Return the ``video_jobs`` row for a job, plus progress when available.

    Args:
        job_id: Value matched against the ``id`` column of ``video_jobs``.

    Returns:
        The matching row as a dict. When Redis holds a ``progress_<job_id>``
        value of the form ``"<elapsed>|<remaining>"``, a ``progress`` entry
        with ``elapsed`` and ``remaining`` keys is added. When no row
        matches, ``{"error": "Job not found"}`` is returned.
    """
    response = supabase.table("video_jobs").select("*").eq("id", job_id).execute()
    if not response.data:
        # NOTE(review): this is sent with the default 200 status; kept as-is
        # so existing clients that check for the "error" key keep working.
        return {"error": "Job not found"}
    data = response.data[0]
    # Fetch progress from Redis. This is best-effort: any failure is logged
    # and the job row is still returned without progress information.
    try:
        # Reuse one client instead of opening a new connection pool on every
        # poll of this endpoint.
        redis_client = _get_redis_client()
        progress_data = redis_client.get(f"progress_{job_id}")
        if progress_data:
            progress_str = progress_data.decode("utf-8")
            parts = progress_str.split("|")
            if len(parts) == 2:
                data["progress"] = {
                    "elapsed": parts[0],
                    "remaining": parts[1]
                }
    except Exception as e:
        print(f"Redis error: {e}")
    return data
@app.post("/api/v1/studio/generate")
async def submit_studio_job(
    script: str = Form(...),
    temperature: float = Form(0.5),
    voice_preset: str = Form("default"),
    bgm_audio: UploadFile = File(None),
    bgm_volume: float = Form(0.1),
    bgm_preset: str = Form(None)
):
    """
    Submits a Studio MP4 rendering task to Celery.

    Args:
        script: Text stored on the job record and passed to the task.
        temperature: Forwarded unchanged to ``render_studio_task``.
        voice_preset: Forwarded unchanged to ``render_studio_task``.
        bgm_audio: Optional background-music file; when provided it is
            uploaded under ``bgm/`` and takes precedence over ``bgm_preset``.
        bgm_volume: Forwarded unchanged to ``render_studio_task``.
        bgm_preset: Background-music path used when no ``bgm_audio`` file is
            uploaded.

    Returns:
        A dict with the new ``job_id`` and the status ``"processing_queued"``.

    Raises:
        HTTPException: 500 when the job record could not be created.
    """
    # Upload the background music BEFORE creating the job record, so a failed
    # upload cannot leave an orphaned "pending" job behind.
    bgm_path = None
    # An upload part with an empty/missing file name carries no usable file.
    if bgm_audio and bgm_audio.filename:
        bgm_bytes = await bgm_audio.read()
        bgm_filename = f"bgm/{slugify(bgm_audio.filename.rsplit('.', 1)[0])}_{uuid.uuid4().hex[:8]}.mp3"
        supabase.storage.from_("content").upload(path=bgm_filename, file=bgm_bytes)
        bgm_path = bgm_filename
    elif bgm_preset:
        bgm_path = bgm_preset
    db_resp = supabase.table("video_jobs").insert({
        "status": "pending",
        "script": script,
        "raw_video_path": "studio_render"
    }).execute()
    if not db_resp.data:
        # Without a real id the job could never be polled, so fail instead of
        # queueing work under a placeholder id.
        raise HTTPException(status_code=500, detail="Failed to create job record")
    job_id = db_resp.data[0]["id"]
    render_studio_task.delay(job_id, script, temperature, voice_preset, bgm_path, bgm_volume)
    return {"job_id": job_id, "status": "processing_queued"}