trans / transcription_server.py
factorstudios's picture
Update transcription_server.py
e5e57a4 verified
#!/usr/bin/env python3
import os
import tempfile
import shutil
from pathlib import Path
from datetime import datetime
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException, BackgroundTasks
from fastapi.responses import HTMLResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
import uvicorn
# Heavy third-party dependencies are imported behind a guard so that a missing
# package yields a short, readable message instead of a raw traceback.
try:
    from huggingface_hub import hf_hub_download, upload_file, list_repo_files
    import whisper
except ImportError as e:
    print(f"Missing dependency: {e}")
    # ``exit`` is a convenience injected by the ``site`` module for interactive
    # sessions and is not guaranteed to exist (e.g. ``python -S``); raising
    # SystemExit is the reliable way to stop with a non-zero status.
    raise SystemExit(1)
# Load environment variables (values from a local .env file are picked up too)
load_dotenv()
HF_TOKEN = os.getenv("HF_TOKEN")
if not HF_TOKEN:
    print("Error: HF_TOKEN not found in .env file")
    # ``exit`` is a ``site``-module helper intended for interactive shells and
    # may be absent; raise SystemExit directly to stop with status 1.
    raise SystemExit(1)

app = FastAPI(title="Movie Transcription Service")

# In-memory job tracking: maps job_id -> dict holding "status" plus whatever
# the worker adds along the way ("repo_id", "filename", "transcript_path",
# "error"). Nothing is persisted, so entries disappear when the process exits.
jobs = {}
class TranscriptionRequest(BaseModel):
    """JSON body accepted by the ``POST /transcribe`` endpoint."""

    # Hugging Face dataset URL, or an ``owner/repo/path`` string, naming the
    # video file to transcribe.
    dataset_link: str
    # Whisper model name; "small" is used when the client omits the field.
    model_size: str = "small"
def format_timestamp(seconds: float) -> str:
    """Render a duration given in seconds as a zero-padded ``HH:MM:SS`` string.

    Fractions of a second are truncated. The hours field is not capped, so
    durations of 100 hours or more simply use more than two digits.
    """
    whole_hours, within_hour = divmod(seconds, 3600)
    whole_minutes = within_hour // 60
    leftover_seconds = seconds % 60
    fields = (int(whole_hours), int(whole_minutes), int(leftover_seconds))
    return ":".join(f"{field:02d}" for field in fields)
def transcribe_with_timestamps(video_path: str, model_size: str) -> str:
    """Run Whisper on a media file and return a timestamped text transcript.

    The returned text starts with a banner and a "Generated:" line, followed
    by one ``[HH:MM:SS] text`` line per non-empty segment. If the Whisper
    result carries no "segments" entry, its plain "text" value is emitted
    instead.

    Args:
        video_path: Path of the media file handed to ``model.transcribe``.
        model_size: Value handed to ``whisper.load_model``.

    Returns:
        The full transcript as a single newline-joined string.
    """
    print(f"Loading Whisper model: {model_size}")
    whisper_model = whisper.load_model(model_size)

    print(f"Transcribing audio from: {video_path}")
    result = whisper_model.transcribe(video_path)

    # Header block written at the top of every transcript.
    rule = "=" * 80
    generated_at = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    lines = [
        rule,
        "MOVIE TRANSCRIPTION WITH TIMESTAMPS",
        rule,
        f"Generated: {generated_at}",
        "",
    ]

    if "segments" not in result:
        # Fallback if segments not available
        lines.append(result.get("text", ""))
        return "\n".join(lines)

    # Pair each segment's start time with its trimmed text, then keep only
    # the segments that actually contain text.
    stamped_segments = (
        (format_timestamp(segment["start"]), segment["text"].strip())
        for segment in result["segments"]
    )
    lines.extend(
        f"[{stamp}] {spoken}" for stamp, spoken in stamped_segments if spoken
    )
    return "\n".join(lines)
def extract_dataset_info(dataset_link: str) -> tuple:
    """Split a dataset link into ``(repo_id, filename)``.

    Accepted forms::

        https://huggingface.co/datasets/<owner>/<repo>/blob/<revision>/<path>
        https://huggingface.co/datasets/<owner>/<repo>/resolve/<revision>/<path>
        https://huggingface.co/datasets/<owner>/<repo>/<path>
        <owner>/<repo>/<path>

    For URLs, the ``<revision>`` segment is discarded (the return value has no
    slot for it) and percent-escapes in ``<path>`` are decoded, so a link
    copied from a browser (``My%20Movie.mkv``) yields the real file name.
    ``<path>`` may contain sub-directories in every form.

    Args:
        dataset_link: URL or ``owner/repo/path`` string; surrounding
            whitespace is ignored.

    Returns:
        A ``(repo_id, filename)`` tuple where ``repo_id`` is ``owner/repo``
        and ``filename`` is the path of the file inside the repository.

    Raises:
        ValueError: If the link is empty, looks like a pasted secret or
            ``KEY=value`` line, or does not name a file inside a dataset repo.
    """
    # Local import: the only place in the file that needs URL decoding.
    from urllib.parse import unquote

    link = dataset_link.strip()
    if not link:
        raise ValueError("Dataset link cannot be empty")

    # Guard against users pasting an environment line or credential instead
    # of a link; these substrings are treated as a sign of such input.
    suspicious_markers = ("=", "\n", "\r", "DASHSCOPE", "API", "TOKEN")
    if any(marker in link for marker in suspicious_markers):
        raise ValueError(
            "Invalid dataset link format. Please provide a valid Hugging Face dataset URL or path.\n"
            "Examples:\n"
            " https://huggingface.co/datasets/factorstudios/movs/blob/main/movie.mkv\n"
            " factorstudios/movs/movie.mkv"
        )

    parts = link.split("/")

    if "huggingface.co" in link:
        if "datasets" in parts:
            after_datasets = parts[parts.index("datasets") + 1:]
            if len(after_datasets) < 2 or not all(after_datasets[:2]):
                raise ValueError(
                    "Invalid Hugging Face dataset URL format: "
                    "missing owner or repository name"
                )
            owner, repo, *path_parts = after_datasets
            # Web UI links carry "/blob/<revision>/" and direct-download links
            # "/resolve/<revision>/" between the repo and the file path.
            if path_parts and path_parts[0] in ("blob", "resolve"):
                path_parts = path_parts[2:]
            # Everything left is the in-repo path. Taking only the last URL
            # segment would turn a repo-only URL into a bogus file name and
            # would drop sub-directories.
            filename = unquote("/".join(path_parts))
            if not filename:
                raise ValueError(
                    "Invalid Hugging Face dataset URL format: "
                    "No filename found in URL"
                )
            return f"{owner}/{repo}", filename
    elif len(parts) >= 3:
        # Assume it's in format: owner/repo/filename
        filename = "/".join(parts[2:])
        if not filename:
            raise ValueError("No filename found in path")
        return f"{parts[0]}/{parts[1]}", filename

    raise ValueError(
        "Cannot parse dataset link. Please use:\n"
        " https://huggingface.co/datasets/owner/repo/blob/main/file.mkv\n"
        " or: owner/repo/file.mkv"
    )
def _run_transcription_job(job_id: str, dataset_link: str, model_size: str) -> None:
    """Blocking worker: download the video, transcribe it, upload the transcript.

    Progress is published through ``jobs[job_id]["status"]``, which moves
    through "extracting_info", "downloading", "transcribing", "uploading" and
    ends at "completed" (with "transcript_path" set) or "failed" (with "error"
    set). Exceptions are recorded on the job instead of being propagated.
    """
    # setdefault keeps the failure handler below from raising KeyError when
    # the job was never registered by the caller.
    job = jobs.setdefault(job_id, {})
    try:
        job["status"] = "extracting_info"
        # Parse and validate dataset link
        try:
            repo_id, filename = extract_dataset_info(dataset_link)
        except ValueError as e:
            raise ValueError(f"Invalid dataset link: {str(e)}") from e
        job["repo_id"] = repo_id
        job["filename"] = filename

        # Scratch directory for the video copy and the transcript file.
        temp_dir = tempfile.mkdtemp()
        try:
            job["status"] = "downloading"
            print(f"Downloading {filename} from {repo_id}...")
            # Download video
            local_path = hf_hub_download(
                repo_id=repo_id,
                filename=filename,
                repo_type="dataset",
                token=HF_TOKEN,
            )
            # Resolve symlink if needed
            if os.path.islink(local_path):
                local_path = os.path.realpath(local_path)
            # Copy to temp location
            base_name = os.path.basename(filename)
            video_path = os.path.join(temp_dir, base_name)
            shutil.copy2(local_path, video_path)

            job["status"] = "transcribing"
            print("Starting transcription...")
            # Transcribe with timestamps
            transcript = transcribe_with_timestamps(video_path, model_size)

            # Prepare transcript file
            transcript_filename = os.path.splitext(base_name)[0] + ".transcript.txt"
            transcript_path = os.path.join(temp_dir, transcript_filename)
            with open(transcript_path, "w", encoding="utf-8") as f:
                f.write(transcript)

            job["status"] = "uploading"
            print("Uploading transcript to dataset...")
            # Upload transcript to transcriptions folder
            repo_upload_path = f"transcriptions/{transcript_filename}"
            upload_file(
                path_or_fileobj=transcript_path,
                path_in_repo=repo_upload_path,
                repo_id=repo_id,
                repo_type="dataset",
                token=HF_TOKEN,
                commit_message=f"Add transcription for {base_name}",
            )

            job["status"] = "completed"
            job["transcript_path"] = repo_upload_path
            print(f"✓ Transcription completed and uploaded to {repo_upload_path}")
        finally:
            # Cleanup temp directory (best effort: never mask the real outcome)
            shutil.rmtree(temp_dir, ignore_errors=True)
    except Exception as e:
        # Top-level boundary of the background job: record the failure so the
        # status endpoint can report it.
        job["status"] = "failed"
        job["error"] = str(e)
        print(f"✗ Error: {e}")


async def process_transcription(job_id: str, dataset_link: str, model_size: str):
    """Background task to process transcription and upload.

    The download, Whisper inference and upload are all blocking calls. Running
    them directly inside this coroutine would stall the event loop for the
    whole job, leaving the server unable to answer ``/status`` polls. The work
    is therefore handed to the loop's default thread pool and merely awaited
    here.

    Args:
        job_id: Key of the job entry in the module-level ``jobs`` registry.
        dataset_link: Link understood by ``extract_dataset_info``.
        model_size: Whisper model name passed to ``transcribe_with_timestamps``.
    """
    import asyncio  # local import keeps this block self-contained

    loop = asyncio.get_running_loop()
    await loop.run_in_executor(
        None, _run_transcription_job, job_id, dataset_link, model_size
    )
@app.get("/", response_class=HTMLResponse)
async def serve_ui() -> str:
    """Serve the single-page transcription UI.

    The page posts the form to ``/transcribe`` and then polls
    ``/status/<job_id>`` every two seconds until the job reports "completed"
    or "failed". Status messages are inserted with ``innerHTML`` so that the
    spinner markup renders; every value that comes from the server (error
    text, transcript path, status name) is passed through ``escapeHtml``
    first so it cannot inject markup into the page.
    """
    return """
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Movie Transcription Service</title>
<style>
* {
margin: 0;
padding: 0;
box-sizing: border-box;
}
body {
font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
min-height: 100vh;
display: flex;
align-items: center;
justify-content: center;
padding: 20px;
}
.container {
background: white;
border-radius: 12px;
box-shadow: 0 20px 60px rgba(0, 0, 0, 0.3);
max-width: 600px;
width: 100%;
padding: 40px;
}
.header {
text-align: center;
margin-bottom: 30px;
}
.header h1 {
color: #333;
font-size: 28px;
margin-bottom: 10px;
}
.header p {
color: #666;
font-size: 14px;
}
.form-group {
margin-bottom: 20px;
}
label {
display: block;
margin-bottom: 8px;
color: #333;
font-weight: 500;
font-size: 14px;
}
input, select {
width: 100%;
padding: 12px;
border: 2px solid #e0e0e0;
border-radius: 6px;
font-size: 14px;
transition: border-color 0.3s;
}
input:focus, select:focus {
outline: none;
border-color: #667eea;
}
button {
width: 100%;
padding: 12px;
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
color: white;
border: none;
border-radius: 6px;
font-size: 16px;
font-weight: 600;
cursor: pointer;
transition: transform 0.2s;
}
button:hover {
transform: translateY(-2px);
}
button:disabled {
opacity: 0.6;
cursor: not-allowed;
transform: none;
}
.status-section {
margin-top: 30px;
padding-top: 30px;
border-top: 2px solid #f0f0f0;
}
.status-item {
display: none;
padding: 16px;
border-radius: 6px;
margin-bottom: 12px;
font-size: 14px;
}
.status-item.active {
display: block;
}
.status-item.info {
background: #e3f2fd;
color: #1976d2;
border-left: 4px solid #1976d2;
}
.status-item.success {
background: #e8f5e9;
color: #388e3c;
border-left: 4px solid #388e3c;
}
.status-item.error {
background: #ffebee;
color: #d32f2f;
border-left: 4px solid #d32f2f;
}
.spinner {
display: inline-block;
width: 12px;
height: 12px;
border: 2px solid #ccc;
border-top-color: #1976d2;
border-radius: 50%;
animation: spin 0.6s linear infinite;
margin-right: 8px;
}
@keyframes spin {
to { transform: rotate(360deg); }
}
.job-id {
font-family: 'Courier New', monospace;
font-size: 12px;
color: #999;
margin-top: 8px;
word-break: break-all;
}
</style>
</head>
<body>
<div class="container">
<div class="header">
<h1>🎬 Movie Transcription Service</h1>
<p>Download, transcribe, and upload movie transcriptions with timestamps</p>
</div>
<form id="transcriptionForm">
<div class="form-group">
<label for="datasetLink">Dataset Link or URL</label>
<input
type="text"
id="datasetLink"
placeholder="https://huggingface.co/datasets/factorstudios/movs/blob/main/movie.mkv"
title="Enter a Hugging Face dataset URL or path (owner/repo/filename.mkv)"
required
>
<small style="display: block; margin-top: 6px; color: #999; font-size: 12px;">
Format: https://huggingface.co/datasets/owner/repo/blob/main/filename.mkv<br>
or: owner/repo/filename.mkv
</small>
</div>
<div class="form-group">
<label for="modelSize">Whisper Model Size</label>
<select id="modelSize">
<option value="tiny">Tiny (Fast)</option>
<option value="base">Base</option>
<option value="small" selected>Small (Recommended)</option>
<option value="medium">Medium</option>
<option value="large">Large (Slow but Accurate)</option>
</select>
</div>
<button type="submit" id="submitBtn">Start Transcription</button>
</form>
<div class="status-section" id="statusSection" style="display: none;">
<div id="statusMessages"></div>
<div class="job-id" id="jobId"></div>
</div>
</div>
<script>
const form = document.getElementById('transcriptionForm');
const statusSection = document.getElementById('statusSection');
const statusMessages = document.getElementById('statusMessages');
const jobId = document.getElementById('jobId');
const submitBtn = document.getElementById('submitBtn');
form.addEventListener('submit', async (e) => {
e.preventDefault();
const datasetLink = document.getElementById('datasetLink').value;
const modelSize = document.getElementById('modelSize').value;
submitBtn.disabled = true;
statusSection.style.display = 'block';
statusMessages.innerHTML = '';
try {
// Submit transcription request
const response = await fetch('/transcribe', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
dataset_link: datasetLink,
model_size: modelSize
})
});
if (!response.ok) {
throw new Error(await response.text());
}
const data = await response.json();
const currentJobId = data.job_id;
jobId.textContent = `Job ID: ${currentJobId}`;
addStatus('info', '<span class="spinner"></span>Transcription started...', true);
// Poll for status updates
let completed = false;
while (!completed) {
await new Promise(resolve => setTimeout(resolve, 2000));
const statusResponse = await fetch(`/status/${currentJobId}`);
// A non-2xx reply (e.g. unknown job) carries no status field
if (!statusResponse.ok) {
throw new Error(await statusResponse.text());
}
const statusData = await statusResponse.json();
const status = statusData.status;
if (status === 'completed') {
addStatus('success', '✓ Transcription completed and uploaded!');
addStatus('info', `📁 File: ${escapeHtml(statusData.transcript_path)}`);
completed = true;
} else if (status === 'failed') {
addStatus('error', `✗ Error: ${escapeHtml(statusData.error)}`);
completed = true;
} else {
const statusText = status.charAt(0).toUpperCase() + status.slice(1).replace(/_/g, ' ');
addStatus('info', `<span class="spinner"></span>${escapeHtml(statusText)}...`, true);
}
}
} catch (error) {
addStatus('error', `✗ Error: ${escapeHtml(error.message)}`);
} finally {
submitBtn.disabled = false;
}
});
// Escape untrusted text so it is safe to embed in markup passed to innerHTML
function escapeHtml(value) {
const holder = document.createElement('div');
holder.textContent = String(value);
return holder.innerHTML;
}
function addStatus(type, message, replace = false) {
if (replace) {
statusMessages.innerHTML = '';
}
const div = document.createElement('div');
div.className = `status-item active ${type}`;
div.innerHTML = message;
statusMessages.appendChild(div);
statusMessages.parentElement.scrollIntoView({ behavior: 'smooth', block: 'nearest' });
}
</script>
</body>
</html>
"""
@app.post("/transcribe")
async def start_transcription(request: TranscriptionRequest, background_tasks: BackgroundTasks):
    """Start a transcription job.

    Registers a new entry in the in-memory ``jobs`` registry with status
    "queued", schedules ``process_transcription`` as a background task and
    returns immediately.

    Args:
        request: Parsed request body (dataset link and Whisper model name).
        background_tasks: FastAPI hook used to schedule the job.

    Returns:
        JSON of the form ``{"job_id": "<uuid4>"}``; poll ``/status/{job_id}``
        for progress.

    Raises:
        HTTPException: 400 when ``model_size`` is not an official Whisper
            model name.
    """
    import uuid

    # whisper.load_model() accepts either an official model name or a path to
    # a checkpoint file on disk. A value coming from the network must never be
    # treated as a path, so only the official names are let through.
    allowed_models = whisper.available_models()
    if request.model_size not in allowed_models:
        raise HTTPException(
            status_code=400,
            detail=(
                f"Unknown model_size {request.model_size!r}. "
                f"Choose one of: {', '.join(allowed_models)}"
            ),
        )

    job_id = str(uuid.uuid4())
    jobs[job_id] = {
        "status": "queued",
        "dataset_link": request.dataset_link,
        "model_size": request.model_size,
    }
    background_tasks.add_task(
        process_transcription,
        job_id,
        request.dataset_link,
        request.model_size,
    )
    return JSONResponse({"job_id": job_id})
@app.get("/status/{job_id}")
async def get_status(job_id: str):
    """Report the current state of one transcription job.

    Returns the job's entry from the in-memory ``jobs`` registry as JSON, or
    responds with HTTP 404 ("Job not found") when the id is unknown.
    """
    try:
        job_record = jobs[job_id]
    except KeyError:
        raise HTTPException(status_code=404, detail="Job not found")
    return JSONResponse(job_record)
if __name__ == "__main__":
    # Script entry point: print a short banner, then serve the app on all
    # interfaces at port 7860.
    startup_banner = (
        "Starting Movie Transcription Service...",
        "Open http://localhost:7860 in your browser",
    )
    for banner_line in startup_banner:
        print(banner_line)
    uvicorn.run(app, host="0.0.0.0", port=7860)