# transcribe / app.py
# bigbossmonster's picture
# Update app.py
# bc87dbf verified
import asyncio
import datetime
import json
import mimetypes
import os
import re
import shutil
import subprocess
import tempfile
import time

from fastapi import FastAPI, UploadFile, Form
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse
# NEW SDK IMPORT
from google import genai
from google.genai import types
# The ASGI application object; routes below are registered on it.
app = FastAPI()

# Cross-origin policy handed to CORSMiddleware: every origin, method and
# header is accepted.
# NOTE(review): wildcard origins combined with allow_credentials=True is a
# very permissive policy -- confirm this is intended for the deployment.
_cors_options = {
    "allow_origins": ["*"],
    "allow_credentials": True,
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **_cors_options)
# Lower-case file extension (with leading dot) -> MIME type to declare when
# the file is uploaded. Extensions not listed here are handled by the caller.
_EXTENSION_MIME_PAIRS = (
    (".mp4", "video/mp4"),
    (".mp3", "audio/mpeg"),
    (".wav", "audio/wav"),
    (".mov", "video/quicktime"),
    (".mkv", "video/x-matroska"),
    (".flac", "audio/flac"),
    (".ogg", "audio/ogg"),
    (".webm", "video/webm"),
    (".m4a", "audio/mp4"),
    (".aac", "audio/aac"),
)
MIME_MAP = dict(_EXTENSION_MIME_PAIRS)
# --- UTILITY FUNCTIONS ---
def log_msg(message: str) -> str:
    """Return ``message`` as one newline-terminated ``STATUS:`` line.

    The line is stamped with the current local wall-clock time in
    ``HH:MM:SS`` form, e.g. ``STATUS:[14:03:27] Uploading\\n``.
    """
    now = datetime.datetime.now()
    return f"STATUS:[{now:%H:%M:%S}] {message}\n"
def get_accurate_duration(file_path: str) -> float:
    """Return the container duration of ``file_path`` in seconds via ``ffprobe``.

    This is a best-effort probe: it never raises for the expected failure
    modes. Instead it prints a ``FFprobe Error: ...`` diagnostic and returns
    ``0.0``, which callers treat as "duration unknown".

    Args:
        file_path: Path of the media file to inspect.

    Returns:
        The duration in seconds, or ``0.0`` when ffprobe is missing, times
        out (60 s), produces unparsable output, or reports a duration that
        is negative or not a finite number.
    """
    cmd = [
        "ffprobe",
        "-v", "error",
        "-show_entries", "format=duration",
        "-of", "json",
        file_path,
    ]
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=60)
    except (OSError, subprocess.SubprocessError, ValueError) as e:
        # OSError: ffprobe not installed / not executable.
        # SubprocessError: covers the 60 s timeout.
        # ValueError: undecodable output or an invalid path (e.g. NUL byte).
        print(f"FFprobe Error: {e}")
        return 0.0

    try:
        duration = float(json.loads(result.stdout)["format"]["duration"])
    except (ValueError, KeyError, TypeError) as e:
        # Prefer ffprobe's own stderr: it names the real problem, whereas the
        # parse exception only says that the JSON was empty or incomplete.
        detail = result.stderr.strip() or repr(e)
        print(f"FFprobe Error: {detail}")
        return 0.0

    # Rejects NaN (all comparisons false), infinities and negative values.
    if not 0.0 <= duration < float("inf"):
        print(f"FFprobe Error: implausible duration {duration!r}")
        return 0.0
    return duration
def format_timestamp(seconds: float) -> str:
    """Convert a duration in seconds to the SRT timestamp form ``HH:MM:SS,mmm``.

    The value is rounded to the nearest millisecond *before* being split into
    fields. Splitting first and truncating afterwards loses a millisecond to
    binary float error (e.g. ``8.2`` would render as ``00:00:08,199``).
    Rounding on the whole value also carries correctly into the seconds,
    minutes and hours fields.

    Args:
        seconds: Non-negative duration in seconds. Negative values are
            clamped to zero because SRT has no negative timestamps.

    Returns:
        The formatted timestamp; the hours field grows beyond two digits for
        durations of 100 hours or more.
    """
    total_ms = max(0, round(seconds * 1000))
    total_seconds, ms = divmod(total_ms, 1000)
    total_minutes, s = divmod(total_seconds, 60)
    h, m = divmod(total_minutes, 60)
    return f"{h:02}:{m:02}:{s:02},{ms:03}"
# One timestamp, written either as total minutes (``MMM:SS``) or with an
# hours field (``H:MM:SS``), optionally followed by ``,mmm`` or ``.mmm``.
# The look-arounds keep the pattern from matching a slice out of the middle
# of a longer digit/colon run.
_SRT_TIMESTAMP_RE = re.compile(
    r"(?<![\d:])(\d+):(\d{2})(?::(\d{2}))?(?:[,.](\d{1,3}))?(?![\d:])"
)


def _to_srt_timestamp(match) -> str:
    """Render one ``_SRT_TIMESTAMP_RE`` match as ``HH:MM:SS,mmm``."""
    first, second, third, fraction = match.groups()
    if third is None:
        # Two fields: total minutes and seconds.
        hours, minutes, secs = 0, int(first), int(second)
    else:
        # Three fields: the timestamp already carries hours.
        hours, minutes, secs = int(first), int(second), int(third)

    # Re-derive every field from the total so that out-of-range parts
    # (minutes >= 60, seconds >= 60) carry into the next unit.
    total_seconds = hours * 3600 + minutes * 60 + secs
    hours, remainder = divmod(total_seconds, 3600)
    minutes, secs = divmod(remainder, 60)

    # The fraction is a decimal part of a second, so ",5" means 500 ms.
    millis = (fraction or "").ljust(3, "0")
    return f"{hours:02}:{minutes:02}:{secs:02},{millis}"


def normalize_srt_line(line: str) -> str:
    """Rewrite the timestamps of an SRT timing line as ``HH:MM:SS,mmm``.

    Only lines containing ``-->`` are touched; any other line is returned
    unchanged. Both "total minutes" timestamps and timestamps that already
    have an hours field are accepted, so a well-formed line passes through
    intact instead of being re-interpreted as minutes and seconds.

    Input:  "125:05 --> 125:10"              (125 mins, 5 secs)
    Output: "02:05:05,000 --> 02:05:10,000"
    Input:  "01:05:05,000 --> 01:05:10,000"
    Output: "01:05:05,000 --> 01:05:10,000"
    """
    if "-->" not in line:
        return line
    return _SRT_TIMESTAMP_RE.sub(_to_srt_timestamp, line)
# --- CORE GENERATOR ---
def _build_subtitle_prompt(total_duration_str: str, include_songs: bool, include_on_screen_text: bool) -> str:
    """Assemble the text prompt that asks the model for Burmese SRT subtitles.

    Args:
        total_duration_str: Media duration as ``HH:MM:SS,mmm`` or the literal
            ``"Unknown"`` when it could not be measured.
        include_songs: Ask for lyrics (prefixed with a music emoji) instead of
            telling the model to ignore music.
        include_on_screen_text: Additionally ask for on-screen text.
    """
    start_time_str = "00:00:00,000"
    end_time_str = total_duration_str if total_duration_str != "Unknown" else "the final frame"
    instruction_set = f"""
1. Transcribe the audio from {start_time_str} to the VERY END of the file ({end_time_str}).
2. Translate to natural Burmese (Myanmar).
3. Do NOT summarize. Transcribe verbatim.
"""
    if include_songs:
        instruction_set += "\n4. **LYRICS**: Prefix with 🎶."
    else:
        instruction_set += "\n4. **MUSIC**: Ignore music."
    if include_on_screen_text:
        instruction_set += "\n5. **VISUALS**: Translate significant text using 📝."
    # The model is asked for total-minute timestamps; they are converted back
    # to HH:MM:SS,mmm line by line while the response is streamed.
    instruction_set += f"""
---
**CRITICAL TIMESTAMP RULES (TO PREVENT ERRORS)**:
- **DO NOT** use Hours. Use **TOTAL MINUTES** format only.
- **Format**: `MMM:SS --> MMM:SS`
- **Example**: For 1 hour 5 minutes, write `65:00`, NOT `01:05:00`.
- **Example**: `118:50 --> 118:52` (This means 1hr 58m 50s).
- **Constraint**: The video is {total_duration_str} long. Do not exceed this.
"""
    return f"""
Task: Create full Burmese (Myanmar) SRT subtitles for this video.
Video Duration: {total_duration_str}
Instructions: {instruction_set}
Output the subtitle file content directly.
"""


async def stream_generator(file_path: str, display_name: str, api_key: str, model_id: str, include_songs: bool, include_on_screen_text: bool, mime_type: str):
    """Upload a media file to Gemini and stream back Burmese SRT subtitles.

    Yields protocol strings, in order of appearance:
      * ``STATUS:[HH:MM:SS] <message>\\n``  progress lines built by ``log_msg``;
      * ``DATA:<json>\\n\\n``  one per subtitle line, where ``<json>`` is an
        object with a single ``chunk`` key holding the timestamp-normalized
        text;
      * ``DONE:Complete\\n``  after a successful run;
      * ``ERROR:<message>\\n``  followed by a STATUS line if any step raises.

    Blocking work (ffprobe, the SDK's synchronous upload/poll calls and the
    response iterator) is run in worker threads so the event loop stays free
    to serve other requests while a transcription is in progress.

    The uploaded remote file and the local file at ``file_path`` are both
    removed when the generator finishes, whether it succeeded or not.

    Args:
        file_path: Local path of the media file; deleted on exit.
        display_name: Name given to the uploaded file and shown in status lines.
        api_key: Gemini API key used to build the client.
        model_id: Model identifier passed to the generation call.
        include_songs: Forwarded to the prompt builder.
        include_on_screen_text: Forwarded to the prompt builder.
        mime_type: MIME type declared for the upload.
    """
    client = None
    file_uri = None
    try:
        # Built inside the try block so that a failure here is still reported
        # as an ERROR line and the local file is still cleaned up.
        client = genai.Client(api_key=api_key)

        file_size_mb = os.path.getsize(file_path) / (1024 * 1024)
        yield log_msg(f"Initializing: {display_name} ({file_size_mb:.2f} MB)")

        # 1. FFmpeg Duration Check
        yield log_msg("Analyzing exact duration with FFmpeg...")
        duration = await asyncio.to_thread(get_accurate_duration, file_path)
        total_duration_str = format_timestamp(duration) if duration > 0 else "Unknown"
        yield log_msg(f"Exact Duration: {total_duration_str}")

        # 2. Upload File
        yield log_msg(f"Uploading to Google ({mime_type})...")
        video_file = await asyncio.to_thread(
            client.files.upload,
            file=file_path,
            config=types.UploadFileConfig(display_name=display_name, mime_type=mime_type),
        )
        file_uri = video_file.name

        # 3. Wait for Processing
        yield log_msg("Waiting for Google AI processing...")
        while video_file.state.name == "PROCESSING":
            await asyncio.sleep(2)
            video_file = await asyncio.to_thread(client.files.get, name=file_uri)
        if video_file.state.name == "FAILED":
            raise ValueError(f"Gemini processing failed: {video_file.state.name}")
        yield log_msg("Ready. Generating subtitles...")

        # 4. Prompt
        prompt = _build_subtitle_prompt(total_duration_str, include_songs, include_on_screen_text)

        # 5. Streaming
        response_stream = await asyncio.to_thread(
            client.models.generate_content_stream,
            model=model_id,
            contents=[video_file, prompt],
            config=types.GenerateContentConfig(temperature=0.2),
        )
        chunks = iter(response_stream)
        stream_end = object()  # sentinel: next() returns it instead of raising
        buffer = ""
        while True:
            # Each next() waits on the network, so it runs off the event loop.
            chunk = await asyncio.to_thread(next, chunks, stream_end)
            if chunk is stream_end:
                break
            if not chunk.text:
                continue
            buffer += chunk.text
            # Everything before the last newline is complete; the tail stays
            # buffered until the rest of that line arrives.
            *complete_lines, buffer = buffer.split("\n")
            for line in complete_lines:
                # Fix timestamp logic on the fly
                fixed_line = normalize_srt_line(line)
                json_data = json.dumps({'chunk': fixed_line + '\n'})
                yield f"DATA:{json_data}\n\n"

        # Yield remaining buffer (a final line without a trailing newline)
        if buffer:
            fixed_line = normalize_srt_line(buffer)
            json_data = json.dumps({'chunk': fixed_line})
            yield f"DATA:{json_data}\n\n"

        yield log_msg("Generation finished.")
        yield "DONE:Complete\n"
    except Exception as e:
        yield f"ERROR:{str(e)}\n"
        yield log_msg(f"EXCEPTION: {str(e)}")
    finally:
        # Cleanup is deliberately synchronous: it must run to completion even
        # when the generator is being closed or its task cancelled, and it is
        # best-effort, so failures are reported but never raised.
        if file_uri:
            try:
                client.files.delete(name=file_uri)
            except Exception as cleanup_error:
                print(f"Cleanup Error: could not delete remote file {file_uri}: {cleanup_error}")
        try:
            os.remove(file_path)
        except FileNotFoundError:
            pass
        except OSError as cleanup_error:
            print(f"Cleanup Error: could not delete {file_path}: {cleanup_error}")
@app.post("/transcribe")
async def transcribe_endpoint(
    file: UploadFile,
    api_key: str = Form(...),
    model: str = Form("gemini-2.0-flash"),
    include_songs: bool = Form(False),
    include_on_screen_text: bool = Form(False)
):
    """Accept a media upload and stream its Burmese SRT transcription.

    The upload is spooled to a uniquely named file under ``/tmp/uploads`` and
    handed to ``stream_generator``, which deletes it when streaming ends.

    Args:
        file: The uploaded audio/video file.
        api_key: Gemini API key supplied by the client.
        model: Model identifier to use for generation.
        include_songs: Whether song lyrics should be subtitled.
        include_on_screen_text: Whether on-screen text should be translated.

    Returns:
        A ``text/event-stream`` response fed by ``stream_generator``.
    """
    filename = file.filename or "upload"
    _, ext = os.path.splitext(filename)
    if not ext:
        ext = mimetypes.guess_extension(file.content_type or "") or ".mp4"
    ext = ext.lower()
    mime_type = MIME_MAP.get(ext, "video/mp4")

    os.makedirs("/tmp/uploads", exist_ok=True)
    # mkstemp guarantees a fresh name. A name derived from the current second
    # collides when two uploads with the same extension arrive together, and
    # each request would then overwrite and later delete the other's file.
    fd, temp_path = tempfile.mkstemp(prefix="temp_", suffix=ext, dir="/tmp/uploads")
    try:
        with os.fdopen(fd, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)
    except Exception:
        # Do not leave a partial upload behind; the generator that normally
        # removes the file will never run in this case.
        try:
            os.remove(temp_path)
        except OSError:
            pass
        raise

    return StreamingResponse(
        stream_generator(temp_path, filename, api_key, model, include_songs, include_on_screen_text, mime_type),
        media_type="text/event-stream"
    )
if __name__ == "__main__":
    # Direct execution only: serve the app with uvicorn on all interfaces.
    # uvicorn is imported here so that importing this module does not need it.
    import uvicorn

    server_options = {"host": "0.0.0.0", "port": 7860}
    uvicorn.run(app, **server_options)