| import json |
| import os |
| import shutil |
| import subprocess |
| import threading |
| import uuid |
| from datetime import datetime, timedelta |
| from pathlib import Path |
| from typing import List, Optional |
|
|
| from fastapi import FastAPI, File, Form, HTTPException, Request, UploadFile |
| from fastapi.middleware.cors import CORSMiddleware |
| from fastapi.responses import FileResponse, HTMLResponse, JSONResponse |
| from fastapi.staticfiles import StaticFiles |
| from fastapi.templating import Jinja2Templates |
| from faster_whisper import WhisperModel |
| from pydantic import BaseModel, Field |
|
|
|
|
# Filesystem layout: every directory is resolved relative to this source file.
APP_DIR = Path(__file__).resolve().parent
WORK_DIR = APP_DIR.joinpath("workspace")  # one sub-folder per transcription job
TEMPLATES_DIR = APP_DIR.joinpath("templates")
STATIC_DIR = APP_DIR.joinpath("static")
FONTS_DIR = APP_DIR.joinpath("fonts")

# Only the workspace and fonts folders are created at import time; the
# templates and static folders are left untouched here.
for _writable_dir in (WORK_DIR, FONTS_DIR):
    _writable_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
# ASGI application object; route handlers are registered on it further down.
app = FastAPI(title="Viet AutoSub Editor")

# CORS: the API is callable from any origin. Credentialed requests are
# disabled because a wildcard origin must not be combined with
# allow_credentials=True (browsers reject that pairing, and reflecting every
# origin with credentials would be unsafe). No handler in this file reads
# cookies or auth headers.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Static assets are served under /static; HTML pages come from TEMPLATES_DIR.
app.mount("/static", StaticFiles(directory=STATIC_DIR), name="static")
templates = Jinja2Templates(directory=str(TEMPLATES_DIR))
|
|
|
|
# Loaded Whisper models are cached per size name; MODEL_LOCK guards the cache
# (see get_model).
MODEL_CACHE = {}
MODEL_LOCK = threading.Lock()

# Runtime settings, each overridable through an environment variable.
DEFAULT_MODEL_SIZE = os.environ.get("WHISPER_MODEL_SIZE", "small")
MAX_UPLOAD_MB = int(os.environ.get("MAX_UPLOAD_MB", "250"))  # upload size cap
KEEP_HOURS = int(os.environ.get("KEEP_HOURS", "24"))  # job folder retention
FFMPEG_TIMEOUT = int(os.environ.get("FFMPEG_TIMEOUT", "600"))  # seconds
|
|
| |
| |
| |
|
|
# Font families that ensure_font_available() can fetch on demand.
_DOWNLOADABLE_FONT_FAMILIES = (
    "Bangers",
    "Bebas Neue",
    "Lobster",
    "Permanent Marker",
    "Pacifico",
    "Dancing Script",
    "Playfair Display",
)

# Family name -> download URL of the family archive (spaces become "+").
GOOGLE_FONT_URLS = {
    family: "https://fonts.google.com/download?family=" + family.replace(" ", "+")
    for family in _DOWNLOADABLE_FONT_FAMILIES
}

# Family name -> expected file name of the regular-weight TTF (spaces removed).
FONT_FILE_MAP = {
    family: family.replace(" ", "") + "-Regular.ttf"
    for family in _DOWNLOADABLE_FONT_FAMILIES
}
|
|
|
|
def ensure_font_available(font_name: str) -> str:
    """Make sure *font_name* is installed for FFmpeg and return the usable name.

    Families listed in ``FONT_FILE_MAP`` are looked up in ``~/.fonts``. When
    missing, the family archive is downloaded from ``GOOGLE_FONT_URLS``, every
    ``.ttf``/``.otf`` member is extracted into ``~/.fonts`` (flattened to its
    base name) and ``fc-cache`` is run. Any failure is printed and swallowed.

    Args:
        font_name: Requested font family.

    Returns:
        ``font_name`` when the font is (now) installed, otherwise the fallback
        ``"DejaVu Sans"``.
    """
    fallback = "DejaVu Sans"
    if font_name == fallback or font_name not in FONT_FILE_MAP:
        return fallback

    try:
        import io
        import urllib.request
        import zipfile

        user_fonts_dir = Path.home() / ".fonts"
        user_fonts_dir.mkdir(parents=True, exist_ok=True)
        target = user_fonts_dir / FONT_FILE_MAP[font_name]
        family_glob = f"*{font_name.replace(' ', '')}*"

        def _is_installed() -> bool:
            # Accept either the expected "-Regular" file or any file of the
            # family (the archive may ship differently named files).
            return target.exists() or any(user_fonts_dir.glob(family_glob))

        # Use the same test before and after downloading, so a family that
        # was installed under another file name is not fetched again on
        # every call.
        if _is_installed():
            return font_name

        url = GOOGLE_FONT_URLS.get(font_name)
        if not url:
            return fallback

        request = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"})
        with urllib.request.urlopen(request, timeout=30) as resp:
            archive_bytes = resp.read()

        with zipfile.ZipFile(io.BytesIO(archive_bytes)) as zf:
            for member in zf.namelist():
                # Only the base name is used, so archive members cannot
                # write outside the fonts directory.
                basename = Path(member).name
                if basename.lower().endswith((".ttf", ".otf")):
                    (user_fonts_dir / basename).write_bytes(zf.read(member))

        # Refresh the fontconfig cache for the fonts directory.
        subprocess.run(
            ["fc-cache", "-f", str(user_fonts_dir)],
            capture_output=True,
            timeout=30,
        )

        if _is_installed():
            return font_name
    except Exception as exc:
        # Best effort by design: any problem degrades to the fallback font.
        print(f"[FONT] Không tải được font '{font_name}': {exc}")

    return fallback
|
|
|
|
# One subtitle row as sent by the client in an export request.
class SegmentIn(BaseModel):
    id: int
    # start/end are time strings; they are converted with parse_time_string().
    start: str
    end: str
    text: str = ""
|
|
|
|
# Client-selected look of the burned-in subtitles.
class SubtitleStyle(BaseModel):
    font_name: str = Field(default="DejaVu Sans")
    # Colours are "#RRGGBB" strings, converted by hex_to_ass_color().
    font_color: str = Field(default="#FFFFFF")
    highlight_color: str = Field(default="#FFD700")
    outline_color: str = Field(default="#000000")
    outline_width: int = Field(default=2)
    # Percentage applied to the base font size of 20.
    font_size_pct: int = Field(default=100)
    # Vertical position in percent; values below 40 select top alignment.
    position_pct: int = Field(default=90)
    # When true, burn_subtitles() renders an ASS karaoke file instead of SRT.
    karaoke_mode: bool = Field(default=False)
|
|
|
|
# JSON body of POST /api/export.
class ExportRequest(BaseModel):
    job_id: str
    segments: List[SegmentIn]
    # When false only the SRT file is produced, no video is rendered.
    burn_in: bool = Field(default=True)
    style: Optional[SubtitleStyle] = Field(default=None)
|
|
|
|
# One transcribed subtitle row produced by the transcribe_video_* functions.
class SegmentOut(BaseModel):
    id: int
    # start/end are offsets in seconds.
    start: float
    end: float
    text: str
|
|
|
|
|
|
def cleanup_old_jobs() -> None:
    """Delete job folders in ``WORK_DIR`` older than ``KEEP_HOURS`` hours.

    Age is judged by the folder's modification time. Non-directories are
    ignored, and a folder that cannot be inspected is skipped.
    """
    # Compare POSIX timestamps directly; this avoids the deprecated
    # datetime.utcnow()/utcfromtimestamp() helpers.
    max_age_seconds = timedelta(hours=KEEP_HOURS).total_seconds()
    cutoff_ts = datetime.now().timestamp() - max_age_seconds

    for folder in WORK_DIR.iterdir():
        if not folder.is_dir():
            continue
        try:
            is_expired = folder.stat().st_mtime < cutoff_ts
        except OSError:
            # The folder may have disappeared in the meantime.
            continue
        if is_expired:
            shutil.rmtree(folder, ignore_errors=True)
|
|
|
|
|
|
def get_model(model_size: str = DEFAULT_MODEL_SIZE) -> WhisperModel:
    """Return the cached Whisper model for *model_size*, loading it on first use.

    Models are created for CPU with int8 compute. The whole lookup-and-load
    runs while holding ``MODEL_LOCK``.
    """
    with MODEL_LOCK:
        model = MODEL_CACHE.get(model_size)
        if model is None:
            model = WhisperModel(model_size, device="cpu", compute_type="int8")
            MODEL_CACHE[model_size] = model
        return model
|
|
|
|
|
|
def ffmpeg_exists() -> bool:
    """Return True when both ``ffmpeg`` and ``ffprobe`` are found on PATH."""
    return all(shutil.which(tool) is not None for tool in ("ffmpeg", "ffprobe"))
|
|
|
|
|
|
def save_upload(upload: UploadFile, target_dir: Path) -> Path:
    """Stream an uploaded file to ``target_dir/source<ext>`` and return its path.

    The extension is taken from the client file name (``.mp4`` when absent).
    Data is copied in 1 MiB chunks.

    Raises:
        HTTPException: 413 once the written size exceeds ``MAX_UPLOAD_MB``.
    """
    chunk_size = 1024 * 1024
    extension = Path(upload.filename or "video.mp4").suffix or ".mp4"
    video_path = target_dir / f"source{extension}"

    with video_path.open("wb") as out:
        chunk = upload.file.read(chunk_size)
        while chunk:
            out.write(chunk)
            # The limit is checked after each write, against bytes on disk.
            if out.tell() > MAX_UPLOAD_MB * 1024 * 1024:
                raise HTTPException(status_code=413, detail=f"File quá lớn. Giới hạn {MAX_UPLOAD_MB} MB.")
            chunk = upload.file.read(chunk_size)
    return video_path
|
|
|
|
|
|
def run_ffprobe_duration(video_path: Path) -> Optional[float]:
    """Return the container duration of *video_path* in seconds via ffprobe.

    Returns None when ffprobe cannot be run, exits with an error, or prints
    something that is not a number.
    """
    try:
        probe = subprocess.run(
            [
                "ffprobe",
                "-v", "error",
                "-show_entries", "format=duration",
                "-of", "default=noprint_wrappers=1:nokey=1",
                str(video_path),
            ],
            capture_output=True,
            text=True,
            check=True,
        )
        return float(probe.stdout.strip())
    except Exception:
        # Duration is optional for callers, so every failure maps to None.
        return None
|
|
|
|
| |
| |
| |
|
|
def merge_segments_music(raw_segments: list, max_gap: float = 0.8, max_len: float = 8.0) -> list:
    """Join consecutive short segments into longer, lyric-like lines.

    A segment is appended to the current line when the pause before it is at
    most *max_gap* seconds and the combined line would last at most *max_len*
    seconds; otherwise it starts a new line.

    Args:
        raw_segments: Dicts with ``start``, ``end`` and ``text`` keys.
        max_gap: Largest pause (seconds) that may still be bridged.
        max_len: Longest allowed duration (seconds) of a merged line.

    Returns:
        A new list of new dicts; the input dicts are not modified.
    """
    if not raw_segments:
        return []

    def _fresh(seg: dict) -> dict:
        return {"start": seg["start"], "end": seg["end"], "text": seg["text"]}

    merged = [_fresh(raw_segments[0])]
    for seg in raw_segments[1:]:
        line = merged[-1]
        pause = seg["start"] - line["end"]
        combined_length = seg["end"] - line["start"]

        if pause > max_gap or combined_length > max_len:
            merged.append(_fresh(seg))
            continue

        line["end"] = seg["end"]
        line["text"] = line["text"] + " " + seg["text"]

    return merged
|
|
|
|
def fill_timeline_gaps(segments: list, total_duration: Optional[float] = None, min_gap: float = 0.3) -> list:
    """Shrink silent gaps between consecutive segments.

    - A gap of up to 1.5 s is closed by pulling the next segment's start back
      to the previous segment's end.
    - A larger gap is narrowed by extending the previous segment and starting
      the next one earlier, each by ``min(gap / 2, 0.5)`` seconds.
    - When *total_duration* is given and the last segment ends at most 2 s
      before it, the last segment is stretched to the end.

    Args:
        segments: Dicts with at least ``start`` and ``end`` keys.
        total_duration: Media length in seconds, if known.
        min_gap: Accepted for interface compatibility; not read by this
            function.

    Returns:
        A new list of shallow copies (the input is returned as-is when empty).
    """
    if not segments:
        return segments

    filled = []
    for seg in segments:
        row = dict(seg)
        if filled:
            previous = filled[-1]
            gap = row["start"] - previous["end"]
            if gap > 1.5:
                pad = min(gap / 2, 0.5)
                previous["end"] = previous["end"] + pad
                row["start"] = row["start"] - pad
            elif gap > 0:
                row["start"] = previous["end"]
        filled.append(row)

    if total_duration and filled:
        tail = filled[-1]
        leftover = total_duration - tail["end"]
        if 0 < leftover <= 2.0:
            tail["end"] = total_duration

    return filled
|
|
|
|
def transcribe_video_music(video_path: Path, duration: Optional[float] = None,
                           model_size: str = DEFAULT_MODEL_SIZE) -> List[SegmentOut]:
    """Transcribe a video in LYRICS mode.

    Compared with speech mode this call disables the VAD filter, uses a
    larger beam, requests word timestamps and passes custom no-speech,
    log-probability and compression-ratio thresholds. The raw segments are
    then merged with merge_segments_music() and passed through
    fill_timeline_gaps().

    Args:
        video_path: Media file to transcribe.
        duration: Media length in seconds, forwarded to fill_timeline_gaps().
        model_size: Whisper model size name.

    Raises:
        HTTPException: 400 when no non-empty segment was recognised.
    """
    whisper = get_model(model_size)
    segment_iter, _info = whisper.transcribe(
        str(video_path),
        language="vi",
        vad_filter=False,
        beam_size=8,
        best_of=5,
        patience=1.5,
        condition_on_previous_text=True,
        word_timestamps=True,
        no_speech_threshold=0.3,
        log_prob_threshold=-1.5,
        compression_ratio_threshold=2.8,
    )

    raw: list = []
    for seg in segment_iter:
        lyric = (seg.text or "").strip()
        if lyric:
            raw.append({"start": float(seg.start), "end": float(seg.end), "text": lyric})

    if not raw:
        raise HTTPException(status_code=400, detail="Không nhận diện được lời thoại/lời hát trong video.")

    merged = merge_segments_music(raw, max_gap=0.8, max_len=8.0)
    filled = fill_timeline_gaps(merged, total_duration=duration)

    # Ids are assigned after merging, so they are contiguous from 1.
    return [
        SegmentOut(id=number, start=row["start"], end=row["end"], text=row["text"])
        for number, row in enumerate(filled, start=1)
    ]
|
|
|
|
def transcribe_video_speech(video_path: Path, model_size: str = DEFAULT_MODEL_SIZE) -> List[SegmentOut]:
    """Transcribe a video in SPEECH mode (dialogue / presentations).

    The VAD filter is enabled and a moderate beam size is used.

    Args:
        video_path: Media file to transcribe.
        model_size: Whisper model size name.

    Returns:
        Non-empty segments with ids numbered contiguously from 1.

    Raises:
        HTTPException: 400 when no non-empty segment was recognised.
    """
    model = get_model(model_size)
    segments, _info = model.transcribe(
        str(video_path),
        language="vi",
        vad_filter=True,
        beam_size=5,
        condition_on_previous_text=True,
    )

    rows: List[SegmentOut] = []
    for seg in segments:
        text = (seg.text or "").strip()
        if not text:
            continue
        rows.append(
            SegmentOut(
                # Number by kept rows, not by raw index, so skipped empty
                # segments leave no holes (music mode numbers the same way).
                id=len(rows) + 1,
                start=float(seg.start),
                end=float(seg.end),
                text=text,
            )
        )

    if not rows:
        raise HTTPException(status_code=400, detail="Không nhận diện được lời thoại trong video.")
    return rows
|
|
|
|
|
|
def format_srt_time(seconds: float) -> str:
    """Format a second offset as an SRT timestamp ``HH:MM:SS,mmm``.

    The value is rounded to whole milliseconds; negative input is clamped
    to zero.
    """
    remaining_ms = max(0, int(round(seconds * 1000)))
    hours, remaining_ms = divmod(remaining_ms, 3600000)
    minutes, remaining_ms = divmod(remaining_ms, 60000)
    secs, millis = divmod(remaining_ms, 1000)
    return f"{hours:02d}:{minutes:02d}:{secs:02d},{millis:03d}"
|
|
|
|
|
|
def parse_time_string(value: str) -> float:
    """Parse ``MM:SS`` or ``HH:MM:SS`` with optional fraction into seconds.

    The fraction may follow either ``,`` or ``.``; it is read as
    milliseconds, right-padded or truncated to three digits. A blank string
    yields ``0.0``.

    Raises:
        HTTPException: 400 when the text does not match these shapes.
    """
    value = value.strip()
    if not value:
        return 0.0

    # Normalise the decimal separator so only "," has to be handled.
    value = value.replace(".", ",")
    try:
        if "," in value:
            clock, fraction = value.split(",")
        else:
            clock, fraction = value, "0"

        fields = clock.split(":")
        if len(fields) not in (2, 3):
            raise ValueError
        if len(fields) == 2:
            fields = ["0"] + fields

        hours, minutes, secs = (int(field) for field in fields)
        millis = int(fraction.ljust(3, "0")[:3])
        return hours * 3600 + minutes * 60 + secs + millis / 1000.0
    except Exception as exc:
        raise HTTPException(status_code=400, detail=f"Sai định dạng thời gian: {value}") from exc
|
|
|
|
|
|
def write_srt(job_dir: Path, segments: List[SegmentIn]) -> Path:
    """Write ``edited.srt`` into *job_dir* from the edited segments.

    Segments are sorted by start time. A segment whose end is not after its
    start is given a one-second duration, and segments with blank text are
    left out.

    Returns:
        Path of the written SRT file.

    Raises:
        HTTPException: 400 for an unparsable timestamp or when no segment
            has text.
    """
    srt_path = job_dir / "edited.srt"
    lines: List[str] = []
    ordered = sorted(segments, key=lambda s: parse_time_string(s.start))

    cue_number = 0
    for seg in ordered:
        start_sec = parse_time_string(seg.start)
        end_sec = parse_time_string(seg.end)
        if end_sec <= start_sec:
            end_sec = start_sec + 1.0

        text = (seg.text or "").strip()
        if not text:
            continue

        # Count only cues that are actually written, so the SRT numbering
        # stays sequential when blank segments are skipped.
        cue_number += 1
        lines.extend(
            [
                str(cue_number),
                f"{format_srt_time(start_sec)} --> {format_srt_time(end_sec)}",
                text,
                "",
            ]
        )

    if not lines:
        raise HTTPException(status_code=400, detail="Không có subtitle hợp lệ để xuất SRT.")
    srt_path.write_text("\n".join(lines), encoding="utf-8")
    return srt_path
|
|
|
|
|
|
def hex_to_ass_color(hex_color: str) -> str:
    """Convert ``#RRGGBB`` into an ASS colour ``&H00BBGGRR&``.

    ASS stores the channels in blue-green-red order, preceded here by a
    ``00`` alpha byte. The leading ``#`` is optional and surrounding
    whitespace is ignored.

    Anything that is not exactly six hexadecimal digits falls back to white.
    The value comes from the client and is embedded into the ASS file and
    the FFmpeg filter string, so no other character may pass through.
    """
    digits = hex_color.strip().lstrip("#")
    is_hex = len(digits) == 6 and all(ch in "0123456789abcdefABCDEF" for ch in digits)
    if not is_hex:
        digits = "FFFFFF"

    red, green, blue = digits[0:2], digits[2:4], digits[4:6]
    return f"&H00{blue.upper()}{green.upper()}{red.upper()}&"
|
|
|
|
def build_force_style(style: Optional["SubtitleStyle"] = None) -> str:
    """Build the ``force_style`` value for FFmpeg's ``subtitles`` filter.

    Without a style a fixed default string is returned. Otherwise:
    - font size is ``font_size_pct`` percent of 20, never below 10;
    - outline width is clamped to 0..6;
    - the vertical margin is ``(100 - position_pct) * 3`` clamped to 5..280;
    - alignment is 8 when ``position_pct`` is below 40, else 2;
    - bold is always on and shadow always off.
    """
    if style is None:
        return "FontName=DejaVu Sans,FontSize=20,Outline=1,Shadow=0,MarginV=18,Alignment=2"

    # Insertion order of this dict is the order of the emitted fields.
    settings = {
        "FontName": style.font_name or "DejaVu Sans",
        "FontSize": max(10, int(20 * style.font_size_pct / 100)),
        "PrimaryColour": hex_to_ass_color(style.font_color),
        "OutlineColour": hex_to_ass_color(style.outline_color),
        "Outline": max(0, min(6, style.outline_width)),
        "Shadow": 0,
        "MarginV": max(5, min(280, int((100 - style.position_pct) * 3))),
        "Alignment": 8 if style.position_pct < 40 else 2,
        "Bold": 1,
    }
    return ",".join(f"{key}={val}" for key, val in settings.items())
|
|
|
|
def write_ass_karaoke(job_dir: Path, segments: List["SegmentIn"], style: Optional["SubtitleStyle"] = None, resolved_font: Optional[str] = None) -> Path:
    r"""Write ``karaoke.ass`` with a word-by-word ``\kf`` highlight sweep.

    Each segment becomes one Dialogue line whose words share the segment
    duration evenly. Segments are sorted by start time, blank ones are
    skipped, and a segment whose end is not after its start lasts one second.

    Args:
        job_dir: Folder that receives ``karaoke.ass``.
        segments: Edited subtitle rows.
        style: Visual settings; defaults to ``SubtitleStyle()``.
        resolved_font: Font name already checked by ensure_font_available();
            takes precedence over ``style.font_name``.

    Returns:
        Path of the written ASS file.
    """

    def _ass_time(seconds: float) -> str:
        # ASS timestamps are H:MM:SS.cc (centiseconds).
        total_cs = max(0, int(round(seconds * 100)))
        hours, rest = divmod(total_cs, 360000)
        minutes, rest = divmod(rest, 6000)
        secs, centis = divmod(rest, 100)
        return f"{hours}:{minutes:02d}:{secs:02d}.{centis:02d}"

    ass_path = job_dir / "karaoke.ass"
    s = style or SubtitleStyle()

    font_name = resolved_font or s.font_name or "DejaVu Sans"
    font_size = max(10, int(20 * s.font_size_pct / 100))
    # Karaoke text is drawn in SecondaryColour before it is sung and in
    # PrimaryColour afterwards, so the highlight colour must be the primary
    # one and the normal font colour the secondary one.
    sung_color = hex_to_ass_color(s.highlight_color)
    unsung_color = hex_to_ass_color(s.font_color)
    outline_color = hex_to_ass_color(s.outline_color)
    outline = max(0, min(6, s.outline_width))
    margin_v = max(5, min(280, int((100 - s.position_pct) * 3)))
    alignment = 8 if s.position_pct < 40 else 2

    lines_out: List[str] = [
        "[Script Info]",
        "Title: Viet AutoSub Karaoke",
        "ScriptType: v4.00+",
        "PlayResX: 1280",
        "PlayResY: 720",
        "ScaledBorderAndShadow: yes",
        "",
        "[V4+ Styles]",
        "Format: Name, Fontname, Fontsize, PrimaryColour, SecondaryColour, OutlineColour, BackColour, Bold, Italic, Underline, StrikeOut, ScaleX, ScaleY, Spacing, Angle, BorderStyle, Outline, Shadow, Alignment, MarginL, MarginR, MarginV, Encoding",
        f"Style: Default,{font_name},{font_size},{sung_color},{unsung_color},{outline_color},&H80000000&,1,0,0,0,100,100,0,0,1,{outline},0,{alignment},20,20,{margin_v},1",
        "",
        "[Events]",
        "Format: Layer, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text",
    ]

    ordered = sorted(segments, key=lambda seg: parse_time_string(seg.start))
    for seg in ordered:
        text = (seg.text or "").strip()
        if not text:
            continue

        start_sec = parse_time_string(seg.start)
        end_sec = parse_time_string(seg.end)
        if end_sec <= start_sec:
            end_sec = start_sec + 1.0

        words = text.split()
        if not words:
            continue

        # Share the duration evenly; the first `extra` words get one more
        # centisecond so the sweep ends with the line instead of early.
        duration_cs = max(1, int(round((end_sec - start_sec) * 100)))
        base_cs, extra = divmod(duration_cs, len(words))
        karaoke_parts = []
        for position, word in enumerate(words):
            word_cs = max(1, base_cs + (1 if position < extra else 0))
            karaoke_parts.append(f"{{\\kf{word_cs}}}{word}")

        karaoke_text = " ".join(karaoke_parts)
        lines_out.append(
            f"Dialogue: 0,{_ass_time(start_sec)},{_ass_time(end_sec)},Default,,0,0,0,,{karaoke_text}"
        )

    ass_path.write_text("\n".join(lines_out), encoding="utf-8")
    return ass_path
|
|
|
|
def burn_subtitles(job_dir: Path, video_path: Path, srt_path: Path,
                   segments: Optional[List["SegmentIn"]] = None,
                   style: Optional["SubtitleStyle"] = None) -> Path:
    r"""Burn subtitles into the video with FFmpeg; return the MP4 path.

    - Karaoke mode (``style.karaoke_mode`` with segments): an ASS file with
      ``\kf`` tags is generated and applied through the ``ass=`` filter.
    - Otherwise the SRT file is applied through the ``subtitles=`` filter
      together with a ``force_style`` string.

    The requested font is first resolved with ensure_font_available(), which
    may substitute the fallback font.

    Raises:
        HTTPException: 500 when FFmpeg times out, exits with an error, or
            leaves a missing or very small (< 1000 bytes) output file.
    """
    output_path = job_dir / "output_subtitled.mp4"

    actual_font = "DejaVu Sans"
    if style and style.font_name:
        actual_font = ensure_font_available(style.font_name)
        if actual_font != style.font_name:
            print(f"[FONT] Fallback: '{style.font_name}' → '{actual_font}'")

    if style and style.karaoke_mode and segments:
        write_ass_karaoke(job_dir, segments, style, resolved_font=actual_font)
        # Forward slashes and escaped colons keep the path intact inside the
        # filter argument.
        ass_abs = str((job_dir / "karaoke.ass").resolve()).replace("\\", "/").replace(":", r"\\:")
        subtitle_filter = f"ass='{ass_abs}'"
    else:
        effective_style = style
        if effective_style and effective_style.font_name != actual_font:
            # Work on a copy so the caller's style object keeps its font.
            effective_style = effective_style.model_copy()
            effective_style.font_name = actual_font

        force_style = build_force_style(effective_style)
        srt_abs = str(srt_path.resolve()).replace("\\", "/").replace(":", r"\\:")
        subtitle_filter = f"subtitles='{srt_abs}':force_style='{force_style}'"

    cmd = [
        "ffmpeg",
        "-y",
        "-i",
        str(video_path.resolve()),
        "-vf",
        subtitle_filter,
        "-c:v",
        "libx264",
        "-preset",
        "veryfast",
        "-crf",
        "23",
        "-c:a",
        "aac",
        "-b:a",
        "192k",
        "-movflags",
        "+faststart",
        str(output_path.resolve()),
    ]
    try:
        subprocess.run(
            cmd,
            cwd=str(job_dir),
            capture_output=True,
            text=True,
            check=True,
            timeout=FFMPEG_TIMEOUT,
        )
    except subprocess.TimeoutExpired as exc:
        raise HTTPException(
            status_code=500,
            detail=f"FFmpeg quá thời gian ({FFMPEG_TIMEOUT}s). Video có thể quá lớn."
        ) from exc
    except subprocess.CalledProcessError as exc:
        stderr = (exc.stderr or "").strip()
        print(f"[FFMPEG ERROR] cmd: {' '.join(cmd)}")
        print(f"[FFMPEG STDERR] {stderr}")
        raise HTTPException(
            status_code=500,
            detail=f"FFmpeg lỗi khi xuất MP4: {stderr[:1200]}"
        ) from exc

    # A zero exit status alone is not trusted; the output must exist and be
    # larger than a stub.
    if not output_path.exists() or output_path.stat().st_size < 1000:
        raise HTTPException(
            status_code=500,
            detail="FFmpeg chạy xong nhưng file MP4 bị lỗi hoặc trống."
        )

    return output_path
|
|
|
|
|
|
def job_meta_path(job_dir: Path) -> Path:
    """Return the location of the ``meta.json`` file inside *job_dir*."""
    return job_dir.joinpath("meta.json")
|
|
|
|
|
|
def save_job_meta(job_dir: Path, data: dict) -> None:
    """Write *data* as indented UTF-8 JSON to the job's ``meta.json``.

    Non-ASCII characters are written as-is rather than escaped.
    """
    serialized = json.dumps(data, ensure_ascii=False, indent=2)
    job_meta_path(job_dir).write_text(serialized, encoding="utf-8")
|
|
|
|
|
|
def load_job_meta(job_id: str) -> dict:
    """Load the ``meta.json`` of job *job_id* from ``WORK_DIR``.

    Job ids are created with ``uuid.uuid4().hex`` (32 lowercase hex digits).
    Any other value is rejected before it is joined onto ``WORK_DIR``, so
    ``..`` or an absolute path cannot reach outside the workspace.

    Raises:
        HTTPException: 404 when the id is malformed or the job has no
            metadata file.
    """
    is_valid_id = len(job_id) == 32 and all(ch in "0123456789abcdef" for ch in job_id)
    if not is_valid_id:
        raise HTTPException(status_code=404, detail="Không tìm thấy job.")

    meta = job_meta_path(WORK_DIR / job_id)
    if not meta.exists():
        raise HTTPException(status_code=404, detail="Không tìm thấy job.")
    return json.loads(meta.read_text(encoding="utf-8"))
|
|
|
|
# Landing page: renders templates/index.html with the request in its context.
@app.get("/", response_class=HTMLResponse)
def home(request: Request):
    context = {"request": request}
    return templates.TemplateResponse("index.html", context)
|
|
|
|
# Health probe: reports FFmpeg availability, the workspace path and the
# default Whisper model size.
@app.get("/health")
def health():
    status = {"ok": True}
    status["ffmpeg"] = ffmpeg_exists()
    status["workspace"] = str(WORK_DIR)
    status["default_model"] = DEFAULT_MODEL_SIZE
    return status
|
|
|
|
@app.post("/api/transcribe")
def api_transcribe(
    file: UploadFile = File(...),
    mode: str = Form(default="music"),
):
    """
    mode: "music" (song lyrics) or "speech" (spoken voice / presentation).
    Any other value is treated as "music".
    """
    cleanup_old_jobs()
    if not ffmpeg_exists():
        raise HTTPException(status_code=500, detail="Máy chủ chưa có FFmpeg.")

    accepted_suffixes = (".mp4", ".mov", ".mkv", ".avi", ".webm", ".m4v")
    upload_name = file.filename or "video.mp4"
    if not upload_name.lower().endswith(accepted_suffixes):
        raise HTTPException(status_code=400, detail="Chỉ hỗ trợ video mp4, mov, mkv, avi, webm, m4v.")

    if mode != "speech":
        mode = "music"

    job_id = uuid.uuid4().hex
    job_dir = WORK_DIR / job_id
    job_dir.mkdir(parents=True, exist_ok=True)
    try:
        video_path = save_upload(file, job_dir)
        duration = run_ffprobe_duration(video_path)

        if mode == "speech":
            segments = transcribe_video_speech(video_path)
        else:
            segments = transcribe_video_music(video_path, duration=duration)

        # Share of the media length covered by subtitles (0 when unknown).
        subtitled_seconds = sum(seg.end - seg.start for seg in segments)
        if duration and duration > 0:
            coverage_pct = round((subtitled_seconds / duration * 100), 1)
        else:
            coverage_pct = 0

        meta = {
            "job_id": job_id,
            "video_path": video_path.name,
            "duration": duration,
            "mode": mode,
            "created_at": datetime.utcnow().isoformat() + "Z",
        }
        save_job_meta(job_dir, meta)

        # Timestamps go to the client as SRT-style strings.
        rows = []
        for seg in segments:
            rows.append(
                {
                    "id": seg.id,
                    "start": format_srt_time(seg.start),
                    "end": format_srt_time(seg.end),
                    "text": seg.text,
                }
            )
        body = {
            "job_id": job_id,
            "duration": duration,
            "mode": mode,
            "coverage_pct": coverage_pct,
            "segments": rows,
        }
        return JSONResponse(body)
    except Exception:
        # Any failure discards the job folder before the error propagates.
        shutil.rmtree(job_dir, ignore_errors=True)
        raise
|
|
|
|
@app.post("/api/export")
def api_export(payload: ExportRequest):
    """Write the edited SRT for a job and optionally burn it into the video.

    The response carries the SRT download URL and, when ``burn_in`` is set,
    the MP4 download URL and its size in MiB.

    Raises:
        HTTPException: 404 when the job id is malformed, or the job or its
            source video no longer exists; errors from write_srt() and
            burn_subtitles() propagate.
    """
    job_id = payload.job_id
    # Job ids are uuid4().hex (32 lowercase hex digits). The id comes from
    # the request body, so anything else is rejected before it becomes part
    # of a path ("..", or an absolute path replacing WORK_DIR).
    is_valid_id = len(job_id) == 32 and all(ch in "0123456789abcdef" for ch in job_id)
    if not is_valid_id:
        raise HTTPException(status_code=404, detail="Job đã hết hạn hoặc không tồn tại.")

    job_dir = WORK_DIR / job_id
    if not job_dir.exists():
        raise HTTPException(status_code=404, detail="Job đã hết hạn hoặc không tồn tại.")

    meta = load_job_meta(job_id)
    video_path = job_dir / meta["video_path"]
    if not video_path.exists():
        raise HTTPException(status_code=404, detail="Không tìm thấy video gốc để xuất lại.")

    srt_path = write_srt(job_dir, payload.segments)
    response = {
        "job_id": job_id,
        "srt_url": f"/download/{job_id}/srt",
        "mp4_url": None,
    }

    if payload.burn_in:
        mp4_path = burn_subtitles(job_dir, video_path, srt_path,
                                  segments=payload.segments, style=payload.style)
        response["mp4_url"] = f"/download/{job_id}/mp4"
        response["mp4_size_mb"] = round(mp4_path.stat().st_size / (1024 * 1024), 2)

    return JSONResponse(response)
|
|
|
|
@app.get("/download/{job_id}/srt")
def download_srt(job_id: str):
    """Send the job's ``edited.srt`` as a download named ``<job_id>.srt``.

    Raises:
        HTTPException: 404 when the id is malformed or no SRT exists yet.
    """
    # Only uuid4().hex ids (32 lowercase hex digits) are accepted, so a
    # segment such as ".." cannot point outside WORK_DIR.
    is_valid_id = len(job_id) == 32 and all(ch in "0123456789abcdef" for ch in job_id)
    if not is_valid_id:
        raise HTTPException(status_code=404, detail="Chưa có file SRT.")

    path = WORK_DIR / job_id / "edited.srt"
    if not path.exists():
        raise HTTPException(status_code=404, detail="Chưa có file SRT.")
    return FileResponse(path, media_type="application/x-subrip", filename=f"{job_id}.srt")
|
|
|
|
@app.get("/download/{job_id}/mp4")
def download_mp4(job_id: str):
    """Send the job's ``output_subtitled.mp4`` as ``<job_id>.mp4``.

    Raises:
        HTTPException: 404 when the id is malformed or no MP4 exists yet.
    """
    # Only uuid4().hex ids (32 lowercase hex digits) are accepted, so a
    # segment such as ".." cannot point outside WORK_DIR.
    is_valid_id = len(job_id) == 32 and all(ch in "0123456789abcdef" for ch in job_id)
    if not is_valid_id:
        raise HTTPException(status_code=404, detail="Chưa có file MP4.")

    path = WORK_DIR / job_id / "output_subtitled.mp4"
    if not path.exists():
        raise HTTPException(status_code=404, detail="Chưa có file MP4.")
    return FileResponse(path, media_type="video/mp4", filename=f"{job_id}.mp4")
|
|
|
|
# Script entry point: serve the app with uvicorn on all interfaces. The port
# is read from the PORT environment variable and defaults to 7860.
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(
        "app:app",
        host="0.0.0.0",
        port=int(os.getenv("PORT", "7860")),
        reload=False,
    )
|
|