#!/usr/bin/env python3
"""Movie transcription web service.

Exposes a small FastAPI application that downloads a video file from a
Hugging Face dataset repository, transcribes it with Whisper, and uploads a
timestamped transcript back to the same repository under ``transcriptions/``.
Job state is kept in an in-process dictionary and is lost on restart.
"""
import asyncio
import os
import shutil
import tempfile
import uuid
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Tuple

from dotenv import load_dotenv
from fastapi import BackgroundTasks, FastAPI, HTTPException
from fastapi.responses import HTMLResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
import uvicorn

try:
    from huggingface_hub import hf_hub_download, list_repo_files, upload_file
    import whisper
except ImportError as e:
    print(f"Missing dependency: {e}")
    raise SystemExit(1)

# Load environment variables
load_dotenv()

HF_TOKEN = os.getenv("HF_TOKEN")
if not HF_TOKEN:
    print("Error: HF_TOKEN not found in .env file")
    raise SystemExit(1)

app = FastAPI(title="Movie Transcription Service")

# In-memory job tracking: job_id -> mutable dict of job fields.
jobs: Dict[str, Dict[str, Any]] = {}

# Transcription jobs run on this executor so the event loop stays free to
# answer /status requests. A single worker keeps jobs strictly one-at-a-time,
# so at most one Whisper model is loaded at any moment.
_JOB_EXECUTOR = ThreadPoolExecutor(max_workers=1)

# Substrings that cause a dataset link to be rejected outright.
# NOTE(review): "API" and "TOKEN" are plain case-sensitive substring checks,
# so a legitimate file name such as "CAPITAL.mkv" is rejected too. Kept as-is
# because it looks like a deliberate guard against pasted .env lines - confirm.
_SUSPICIOUS_MARKERS = ("=", "\n", "\r", "DASHSCOPE", "API", "TOKEN")

# URL path segments that are followed by a revision and then the file path.
_REVISION_MARKERS = ("blob", "resolve")

# NOTE(review): only the visible text of the UI page was available during
# review; the markup below is a minimal page wired to POST /transcribe and
# GET /status/{job_id}. Replace it with the original template if one exists.
_UI_HTML = """<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>Movie Transcription Service</title>
</head>
<body>
<h1>🎬 Movie Transcription Service</h1>
<p>Download, transcribe, and upload movie transcriptions with timestamps</p>
<form id="job-form">
  <label>Dataset link
    <input id="dataset-link" type="text" size="80" required>
  </label>
  <p>Format: https://huggingface.co/datasets/owner/repo/blob/main/filename.mkv<br>
  or: owner/repo/filename.mkv</p>
  <label>Model size
    <select id="model-size">
      <option>tiny</option>
      <option>base</option>
      <option selected>small</option>
      <option>medium</option>
      <option>large</option>
    </select>
  </label>
  <button type="submit">Transcribe</button>
</form>
<pre id="status"></pre>
<script>
const statusBox = document.getElementById("status");

async function poll(jobId) {
  const res = await fetch("/status/" + jobId);
  const job = await res.json();
  statusBox.textContent = JSON.stringify(job, null, 2);
  if (res.ok && job.status !== "completed" && job.status !== "failed") {
    setTimeout(() => poll(jobId), 2000);
  }
}

document.getElementById("job-form").addEventListener("submit", async (event) => {
  event.preventDefault();
  const res = await fetch("/transcribe", {
    method: "POST",
    headers: {"Content-Type": "application/json"},
    body: JSON.stringify({
      dataset_link: document.getElementById("dataset-link").value,
      model_size: document.getElementById("model-size").value,
    }),
  });
  const data = await res.json();
  if (!res.ok) {
    statusBox.textContent = JSON.stringify(data, null, 2);
    return;
  }
  poll(data.job_id);
});
</script>
</body>
</html>
"""


class TranscriptionRequest(BaseModel):
    """Request body for ``POST /transcribe``."""

    # Hugging Face dataset URL or "owner/repo/path" reference to the video.
    dataset_link: str
    # Whisper model name passed to ``whisper.load_model``.
    model_size: str = "small"


def format_timestamp(seconds: float) -> str:
    """Convert seconds to HH:MM:SS format (fractions are truncated)."""
    hours = int(seconds // 3600)
    minutes = int((seconds % 3600) // 60)
    secs = int(seconds % 60)
    return f"{hours:02d}:{minutes:02d}:{secs:02d}"


def transcribe_with_timestamps(video_path: str, model_size: str) -> str:
    """Transcribe a media file and return the transcript as one string.

    Args:
        video_path: Path of the local media file handed to Whisper.
        model_size: Whisper model name to load.

    Returns:
        A banner header followed by one ``[HH:MM:SS] text`` line per
        non-empty segment, or the plain text when the result carries no
        ``segments`` key.
    """
    print(f"Loading Whisper model: {model_size}")
    model = whisper.load_model(model_size)

    print(f"Transcribing audio from: {video_path}")
    result = model.transcribe(video_path)

    # Format transcript with timestamps
    banner = "=" * 80
    transcript_lines = [
        banner,
        "MOVIE TRANSCRIPTION WITH TIMESTAMPS",
        banner,
        f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
        "",
    ]

    if "segments" in result:
        for segment in result["segments"]:
            text = segment["text"].strip()
            if text:
                timestamp = format_timestamp(segment["start"])
                transcript_lines.append(f"[{timestamp}] {text}")
    else:
        # Fallback if segments not available
        transcript_lines.append(result.get("text", ""))

    return "\n".join(transcript_lines)


def extract_dataset_info(dataset_link: str) -> Tuple[str, str]:
    """Extract ``(repo_id, filename)`` from a dataset link.

    Accepted forms::

        https://huggingface.co/datasets/owner/repo/blob/main/dir/file.mkv
        https://huggingface.co/datasets/owner/repo/resolve/main/dir/file.mkv
        https://huggingface.co/datasets/owner/repo/dir/file.mkv
        owner/repo/dir/file.mkv

    Args:
        dataset_link: URL or ``owner/repo/path`` reference; surrounding
            whitespace is ignored.

    Returns:
        ``("owner/repo", "path/inside/repo")``.

    Raises:
        ValueError: If the link is empty, contains a rejected marker, or
            does not name both a repository and a file.
    """
    link = dataset_link.strip()

    # Validate input
    if not link:
        raise ValueError("Dataset link cannot be empty")

    if any(marker in link for marker in _SUSPICIOUS_MARKERS):
        raise ValueError(
            "Invalid dataset link format. Please provide a valid Hugging Face dataset URL or path.\n"
            "Examples:\n"
            " https://huggingface.co/datasets/factorstudios/movs/blob/main/movie.mkv\n"
            " factorstudios/movs/movie.mkv"
        )

    parts = link.split("/")

    if "huggingface.co" in link:
        if "datasets" in parts:
            idx = parts.index("datasets")
            owner_repo = parts[idx + 1:idx + 3]
            if len(owner_repo) < 2 or not all(owner_repo):
                raise ValueError(
                    "Invalid Hugging Face dataset URL format: "
                    "missing owner or repository name"
                )

            # Everything after the repository is the file path, optionally
            # prefixed by "<blob|resolve>/<revision>". Taking the whole tail
            # (instead of only the last segment) keeps sub-directories and
            # stops a bare repository URL from yielding the repo name as
            # the file name.
            tail = parts[idx + 3:]
            if tail and tail[0] in _REVISION_MARKERS:
                tail = tail[2:]
            filename = "/".join(tail)
            if not filename:
                raise ValueError(
                    "Invalid Hugging Face dataset URL format: No filename found in URL"
                )
            return "/".join(owner_repo), filename
    elif len(parts) >= 3:
        # Assume it's in format: owner/repo/filename
        repo_id = f"{parts[0]}/{parts[1]}"
        filename = "/".join(parts[2:])
        if not filename:
            raise ValueError("No filename found in path")
        return repo_id, filename

    raise ValueError(
        "Cannot parse dataset link. Please use:\n"
        " https://huggingface.co/datasets/owner/repo/blob/main/file.mkv\n"
        " or: owner/repo/file.mkv"
    )


def _run_transcription_job(job_id: str, dataset_link: str, model_size: str) -> None:
    """Run the blocking download -> transcribe -> upload pipeline for a job.

    Progress and failures are recorded in ``jobs[job_id]``; no exception
    escapes this function.
    """
    # setdefault keeps a direct call with an unregistered id from failing
    # with KeyError before any status can be recorded.
    job = jobs.setdefault(job_id, {})
    try:
        job["status"] = "extracting_info"

        # Parse and validate dataset link
        try:
            repo_id, filename = extract_dataset_info(dataset_link)
        except ValueError as e:
            raise ValueError(f"Invalid dataset link: {e}") from e

        job["repo_id"] = repo_id
        job["filename"] = filename

        # Create temp directory
        temp_dir = tempfile.mkdtemp()
        try:
            job["status"] = "downloading"
            print(f"Downloading {filename} from {repo_id}...")

            # Download video
            local_path = hf_hub_download(
                repo_id=repo_id,
                filename=filename,
                repo_type="dataset",
                token=HF_TOKEN,
            )

            # Resolve symlink if needed
            if os.path.islink(local_path):
                local_path = os.path.realpath(local_path)

            # Copy to temp location
            base_name = os.path.basename(filename)
            video_path = os.path.join(temp_dir, base_name)
            shutil.copy2(local_path, video_path)

            job["status"] = "transcribing"
            print("Starting transcription...")

            # Transcribe with timestamps
            transcript = transcribe_with_timestamps(video_path, model_size)

            # Prepare transcript file
            transcript_filename = os.path.splitext(base_name)[0] + ".transcript.txt"
            transcript_path = os.path.join(temp_dir, transcript_filename)
            with open(transcript_path, "w", encoding="utf-8") as f:
                f.write(transcript)

            job["status"] = "uploading"
            print("Uploading transcript to dataset...")

            # Upload transcript to transcriptions folder
            repo_upload_path = f"transcriptions/{transcript_filename}"
            upload_file(
                path_or_fileobj=transcript_path,
                path_in_repo=repo_upload_path,
                repo_id=repo_id,
                repo_type="dataset",
                token=HF_TOKEN,
                commit_message=f"Add transcription for {base_name}",
            )

            job["status"] = "completed"
            job["transcript_path"] = repo_upload_path
            print(f"✓ Transcription completed and uploaded to {repo_upload_path}")
        finally:
            # Cleanup temp directory
            shutil.rmtree(temp_dir, ignore_errors=True)
    except Exception as e:
        # Top-level boundary of the background job: record the failure so
        # the client can read it from /status instead of losing it.
        job["status"] = "failed"
        job["error"] = str(e)
        print(f"✗ Error: {e}")


async def process_transcription(job_id: str, dataset_link: str, model_size: str):
    """Background task to process transcription and upload.

    The pipeline is entirely blocking (network download, Whisper inference,
    upload), so it is handed to a worker thread; running it directly in
    this coroutine would stall the event loop and every other request for
    the duration of the job.
    """
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(
        _JOB_EXECUTOR, _run_transcription_job, job_id, dataset_link, model_size
    )


@app.get("/", response_class=HTMLResponse)
async def serve_ui():
    """Serve the transcription UI."""
    return _UI_HTML


@app.post("/transcribe")
async def start_transcription(request: TranscriptionRequest, background_tasks: BackgroundTasks):
    """Start a transcription job and return its ``job_id``."""
    job_id = str(uuid.uuid4())
    jobs[job_id] = {
        "status": "queued",
        "dataset_link": request.dataset_link,
        "model_size": request.model_size,
    }

    background_tasks.add_task(
        process_transcription,
        job_id,
        request.dataset_link,
        request.model_size,
    )

    return JSONResponse({"job_id": job_id})


@app.get("/status/{job_id}")
async def get_status(job_id: str):
    """Get the status of a transcription job (404 if the id is unknown)."""
    if job_id not in jobs:
        raise HTTPException(status_code=404, detail="Job not found")
    # Serialize a snapshot: the worker thread may add keys to the live dict
    # while the response is being encoded.
    return JSONResponse(dict(jobs[job_id]))


if __name__ == "__main__":
    print("Starting Movie Transcription Service...")
    print("Open http://localhost:7860 in your browser")
    uvicorn.run(app, host="0.0.0.0", port=7860)