Spaces:
Sleeping
Sleeping
| import json | |
| import os | |
| import shutil | |
| import subprocess | |
| import threading | |
| import uuid | |
| from datetime import datetime, timedelta | |
| from pathlib import Path | |
| from typing import List, Optional | |
| from fastapi import FastAPI, File, HTTPException, Request, UploadFile | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.responses import FileResponse, HTMLResponse, JSONResponse | |
| from fastapi.staticfiles import StaticFiles | |
| from fastapi.templating import Jinja2Templates | |
| from faster_whisper import WhisperModel | |
| from pydantic import BaseModel, Field | |
| APP_DIR = Path(__file__).resolve().parent | |
| WORK_DIR = APP_DIR / "workspace" | |
| TEMPLATES_DIR = APP_DIR / "templates" | |
| STATIC_DIR = APP_DIR / "static" | |
| WORK_DIR.mkdir(parents=True, exist_ok=True) | |
| app = FastAPI(title="Viet AutoSub Editor") | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=["*"], | |
| allow_credentials=True, | |
| allow_methods=["*"], | |
| allow_headers=["*"], | |
| ) | |
| app.mount("/static", StaticFiles(directory=STATIC_DIR), name="static") | |
| templates = Jinja2Templates(directory=str(TEMPLATES_DIR)) | |
| MODEL_LOCK = threading.Lock() | |
| MODEL_CACHE = {} | |
| DEFAULT_MODEL_SIZE = os.getenv("WHISPER_MODEL_SIZE", "small") | |
| MAX_UPLOAD_MB = int(os.getenv("MAX_UPLOAD_MB", "250")) | |
| KEEP_HOURS = int(os.getenv("KEEP_HOURS", "24")) | |
| class SegmentIn(BaseModel): | |
| id: int | |
| start: str | |
| end: str | |
| text: str = Field(default="") | |
| class ExportRequest(BaseModel): | |
| job_id: str | |
| segments: List[SegmentIn] | |
| burn_in: bool = True | |
| class SegmentOut(BaseModel): | |
| id: int | |
| start: float | |
| end: float | |
| text: str | |
| def cleanup_old_jobs() -> None: | |
| cutoff = datetime.utcnow() - timedelta(hours=KEEP_HOURS) | |
| for folder in WORK_DIR.iterdir(): | |
| if not folder.is_dir(): | |
| continue | |
| try: | |
| modified = datetime.utcfromtimestamp(folder.stat().st_mtime) | |
| if modified < cutoff: | |
| shutil.rmtree(folder, ignore_errors=True) | |
| except Exception: | |
| continue | |
| def get_model(model_size: str = DEFAULT_MODEL_SIZE) -> WhisperModel: | |
| with MODEL_LOCK: | |
| if model_size not in MODEL_CACHE: | |
| MODEL_CACHE[model_size] = WhisperModel( | |
| model_size, | |
| device="cpu", | |
| compute_type="int8", | |
| ) | |
| return MODEL_CACHE[model_size] | |
| def ffmpeg_exists() -> bool: | |
| return shutil.which("ffmpeg") is not None and shutil.which("ffprobe") is not None | |
| def save_upload(upload: UploadFile, target_dir: Path) -> Path: | |
| suffix = Path(upload.filename or "video.mp4").suffix or ".mp4" | |
| video_path = target_dir / f"source{suffix}" | |
| with video_path.open("wb") as f: | |
| while True: | |
| chunk = upload.file.read(1024 * 1024) | |
| if not chunk: | |
| break | |
| f.write(chunk) | |
| if f.tell() > MAX_UPLOAD_MB * 1024 * 1024: | |
| raise HTTPException(status_code=413, detail=f"File quá lớn. Giới hạn {MAX_UPLOAD_MB} MB.") | |
| return video_path | |
| def run_ffprobe_duration(video_path: Path) -> Optional[float]: | |
| try: | |
| cmd = [ | |
| "ffprobe", | |
| "-v", | |
| "error", | |
| "-show_entries", | |
| "format=duration", | |
| "-of", | |
| "default=noprint_wrappers=1:nokey=1", | |
| str(video_path), | |
| ] | |
| result = subprocess.run(cmd, capture_output=True, text=True, check=True) | |
| return float(result.stdout.strip()) | |
| except Exception: | |
| return None | |
| def transcribe_video(video_path: Path, model_size: str = DEFAULT_MODEL_SIZE) -> List[SegmentOut]: | |
| model = get_model(model_size) | |
| segments, _info = model.transcribe( | |
| str(video_path), | |
| language="vi", | |
| vad_filter=True, | |
| beam_size=5, | |
| condition_on_previous_text=True, | |
| ) | |
| rows: List[SegmentOut] = [] | |
| for idx, seg in enumerate(segments, start=1): | |
| text = (seg.text or "").strip() | |
| if not text: | |
| continue | |
| rows.append( | |
| SegmentOut( | |
| id=idx, | |
| start=float(seg.start), | |
| end=float(seg.end), | |
| text=text, | |
| ) | |
| ) | |
| if not rows: | |
| raise HTTPException(status_code=400, detail="Không nhận diện được lời thoại trong video.") | |
| return rows | |
| def format_srt_time(seconds: float) -> str: | |
| total_ms = max(0, int(round(seconds * 1000))) | |
| hours = total_ms // 3600000 | |
| total_ms %= 3600000 | |
| minutes = total_ms // 60000 | |
| total_ms %= 60000 | |
| secs = total_ms // 1000 | |
| millis = total_ms % 1000 | |
| return f"{hours:02d}:{minutes:02d}:{secs:02d},{millis:03d}" | |
| def parse_time_string(value: str) -> float: | |
| value = value.strip() | |
| if not value: | |
| return 0.0 | |
| value = value.replace(".", ",") | |
| try: | |
| hhmmss, ms = value.split(",") if "," in value else (value, "0") | |
| parts = hhmmss.split(":") | |
| if len(parts) == 2: | |
| hours = 0 | |
| minutes, secs = parts | |
| elif len(parts) == 3: | |
| hours, minutes, secs = parts | |
| else: | |
| raise ValueError | |
| return int(hours) * 3600 + int(minutes) * 60 + int(secs) + int(ms.ljust(3, "0")[:3]) / 1000.0 | |
| except Exception as exc: | |
| raise HTTPException(status_code=400, detail=f"Sai định dạng thời gian: {value}") from exc | |
| def write_srt(job_dir: Path, segments: List[SegmentIn]) -> Path: | |
| srt_path = job_dir / "edited.srt" | |
| lines: List[str] = [] | |
| cleaned = sorted(segments, key=lambda s: parse_time_string(s.start)) | |
| for idx, seg in enumerate(cleaned, start=1): | |
| start_sec = parse_time_string(seg.start) | |
| end_sec = parse_time_string(seg.end) | |
| if end_sec <= start_sec: | |
| end_sec = start_sec + 1.0 | |
| text = (seg.text or "").strip() | |
| if not text: | |
| continue | |
| lines.extend( | |
| [ | |
| str(idx), | |
| f"{format_srt_time(start_sec)} --> {format_srt_time(end_sec)}", | |
| text, | |
| "", | |
| ] | |
| ) | |
| if not lines: | |
| raise HTTPException(status_code=400, detail="Không có subtitle hợp lệ để xuất SRT.") | |
| srt_path.write_text("\n".join(lines), encoding="utf-8") | |
| return srt_path | |
| def burn_subtitles(job_dir: Path, video_path: Path, srt_path: Path) -> Path: | |
| output_path = job_dir / "output_subtitled.mp4" | |
| subtitle_filter = ( | |
| "subtitles=edited.srt:" | |
| "force_style='FontName=DejaVu Sans,FontSize=20,Outline=1,Shadow=0,MarginV=18,Alignment=2'" | |
| ) | |
| cmd = [ | |
| "ffmpeg", | |
| "-y", | |
| "-i", | |
| video_path.name, | |
| "-vf", | |
| subtitle_filter, | |
| "-c:v", | |
| "libx264", | |
| "-preset", | |
| "veryfast", | |
| "-crf", | |
| "23", | |
| "-c:a", | |
| "aac", | |
| "-b:a", | |
| "192k", | |
| output_path.name, | |
| ] | |
| try: | |
| subprocess.run(cmd, cwd=job_dir, capture_output=True, text=True, check=True) | |
| except subprocess.CalledProcessError as exc: | |
| stderr = (exc.stderr or "").strip() | |
| raise HTTPException(status_code=500, detail=f"FFmpeg lỗi khi xuất MP4: {stderr[:1200]}") from exc | |
| return output_path | |
| def job_meta_path(job_dir: Path) -> Path: | |
| return job_dir / "meta.json" | |
| def save_job_meta(job_dir: Path, data: dict) -> None: | |
| job_meta_path(job_dir).write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8") | |
| def load_job_meta(job_id: str) -> dict: | |
| meta = job_meta_path(WORK_DIR / job_id) | |
| if not meta.exists(): | |
| raise HTTPException(status_code=404, detail="Không tìm thấy job.") | |
| return json.loads(meta.read_text(encoding="utf-8")) | |
| def home(request: Request): | |
| return templates.TemplateResponse("index.html", {"request": request}) | |
| def health(): | |
| return { | |
| "ok": True, | |
| "ffmpeg": ffmpeg_exists(), | |
| "workspace": str(WORK_DIR), | |
| "default_model": DEFAULT_MODEL_SIZE, | |
| } | |
| def api_transcribe(file: UploadFile = File(...)): | |
| cleanup_old_jobs() | |
| if not ffmpeg_exists(): | |
| raise HTTPException(status_code=500, detail="Máy chủ chưa có FFmpeg.") | |
| filename = file.filename or "video.mp4" | |
| if not filename.lower().endswith((".mp4", ".mov", ".mkv", ".avi", ".webm", ".m4v")): | |
| raise HTTPException(status_code=400, detail="Chỉ hỗ trợ video mp4, mov, mkv, avi, webm, m4v.") | |
| job_id = uuid.uuid4().hex | |
| job_dir = WORK_DIR / job_id | |
| job_dir.mkdir(parents=True, exist_ok=True) | |
| try: | |
| video_path = save_upload(file, job_dir) | |
| duration = run_ffprobe_duration(video_path) | |
| segments = transcribe_video(video_path) | |
| save_job_meta( | |
| job_dir, | |
| { | |
| "job_id": job_id, | |
| "video_path": video_path.name, | |
| "duration": duration, | |
| "created_at": datetime.utcnow().isoformat() + "Z", | |
| }, | |
| ) | |
| return JSONResponse( | |
| { | |
| "job_id": job_id, | |
| "duration": duration, | |
| "segments": [ | |
| { | |
| "id": seg.id, | |
| "start": format_srt_time(seg.start), | |
| "end": format_srt_time(seg.end), | |
| "text": seg.text, | |
| } | |
| for seg in segments | |
| ], | |
| } | |
| ) | |
| except Exception: | |
| shutil.rmtree(job_dir, ignore_errors=True) | |
| raise | |
| def api_export(payload: ExportRequest): | |
| job_dir = WORK_DIR / payload.job_id | |
| if not job_dir.exists(): | |
| raise HTTPException(status_code=404, detail="Job đã hết hạn hoặc không tồn tại.") | |
| meta = load_job_meta(payload.job_id) | |
| video_path = job_dir / meta["video_path"] | |
| if not video_path.exists(): | |
| raise HTTPException(status_code=404, detail="Không tìm thấy video gốc để xuất lại.") | |
| srt_path = write_srt(job_dir, payload.segments) | |
| response = { | |
| "job_id": payload.job_id, | |
| "srt_url": f"/download/{payload.job_id}/srt", | |
| "mp4_url": None, | |
| } | |
| if payload.burn_in: | |
| mp4_path = burn_subtitles(job_dir, video_path, srt_path) | |
| response["mp4_url"] = f"/download/{payload.job_id}/mp4" | |
| response["mp4_size_mb"] = round(mp4_path.stat().st_size / (1024 * 1024), 2) | |
| return JSONResponse(response) | |
| def download_srt(job_id: str): | |
| path = WORK_DIR / job_id / "edited.srt" | |
| if not path.exists(): | |
| raise HTTPException(status_code=404, detail="Chưa có file SRT.") | |
| return FileResponse(path, media_type="application/x-subrip", filename=f"{job_id}.srt") | |
| def download_mp4(job_id: str): | |
| path = WORK_DIR / job_id / "output_subtitled.mp4" | |
| if not path.exists(): | |
| raise HTTPException(status_code=404, detail="Chưa có file MP4.") | |
| return FileResponse(path, media_type="video/mp4", filename=f"{job_id}.mp4") | |
| if __name__ == "__main__": | |
| import uvicorn | |
| port = int(os.getenv("PORT", "7860")) | |
| uvicorn.run("app:app", host="0.0.0.0", port=port, reload=False) | |