Spaces:
Sleeping
Sleeping
| import os | |
| import time | |
| import shutil | |
| import subprocess | |
| import json | |
| import mimetypes | |
| import datetime | |
| import re | |
| from fastapi import FastAPI, UploadFile, Form | |
| from fastapi.responses import StreamingResponse | |
| from fastapi.middleware.cors import CORSMiddleware | |
| # NEW SDK IMPORT | |
| from google import genai | |
| from google.genai import types | |
# ASGI application object for this service.
app = FastAPI()

# Cross-origin policy: every origin, method and header is accepted.
# NOTE(review): wildcard origins combined with allow_credentials=True is a
# very permissive setup -- confirm it is intended before exposing publicly.
app.add_middleware(
    CORSMiddleware,
    allow_headers=["*"],
    allow_methods=["*"],
    allow_credentials=True,
    allow_origins=["*"],
)
# Lower-cased file extension (with leading dot) -> MIME type reported for the
# uploaded media. Extensions not listed here are handled by the caller.
MIME_MAP = dict(
    [
        (".mp4", "video/mp4"),
        (".mp3", "audio/mpeg"),
        (".wav", "audio/wav"),
        (".mov", "video/quicktime"),
        (".mkv", "video/x-matroska"),
        (".flac", "audio/flac"),
        (".ogg", "audio/ogg"),
        (".webm", "video/webm"),
        (".m4a", "audio/mp4"),
        (".aac", "audio/aac"),
    ]
)
| # --- UTILITY FUNCTIONS --- | |
def log_msg(message: str) -> str:
    """Build one status line for the response stream.

    The result has the form ``STATUS:[HH:MM:SS] <message>`` followed by a
    newline, where the clock value is the local wall-clock time of the call.
    """
    clock = datetime.datetime.now()
    return "STATUS:[{}] {}\n".format(clock.strftime("%H:%M:%S"), message)
def get_accurate_duration(file_path: str) -> float:
    """Ask ``ffprobe`` for the container duration of *file_path*, in seconds.

    Any failure (ffprobe missing, timeout after 60 s, unparsable or
    incomplete JSON output) is printed and reported as ``0.0`` instead of
    being raised.
    """
    probe_args = [
        "ffprobe",
        "-v", "error",
        "-show_entries", "format=duration",
        "-of", "json",
        file_path,
    ]
    try:
        completed = subprocess.run(
            probe_args,
            capture_output=True,
            text=True,
            timeout=60,
        )
        # ffprobe prints {"format": {"duration": "<seconds>"}} on success.
        payload = json.loads(completed.stdout)
        return float(payload["format"]["duration"])
    except Exception as e:
        print(f"FFprobe Error: {e}")
        return 0.0
def format_timestamp(seconds: float) -> str:
    """Convert a duration in seconds to the SRT timestamp form ``HH:MM:SS,mmm``.

    The value is rounded to the nearest whole millisecond *once* and then
    split with integer arithmetic. Truncating each field separately loses a
    millisecond to binary float error (``1.001`` would render as ``,000``).

    Args:
        seconds: Duration in seconds. Negative values are clamped to zero.

    Returns:
        The zero-padded timestamp string, e.g. ``"01:01:01,500"``.
    """
    total_ms = max(0, int(round(seconds * 1000)))
    total_seconds, ms = divmod(total_ms, 1000)
    total_minutes, s = divmod(total_seconds, 60)
    h, m = divmod(total_minutes, 60)
    return f"{h:02}:{m:02}:{s:02},{ms:03}"
# One timestamp: optional "H:" prefix, minutes of any width, two-digit seconds
# and optional three-digit milliseconds after "," or ".". The look-arounds stop
# a match from starting or ending in the middle of a longer digit run.
_SRT_TIME_RE = re.compile(
    r"(?<![\d:])(?:(\d+):)?(\d+):(\d{2})(?:[,.](\d{3}))?(?!\d)"
)


def normalize_srt_line(line: str) -> str:
    """Rewrite every timestamp on an SRT timing line as ``HH:MM:SS,mmm``.

    Only lines containing ``-->`` are touched; all other lines are returned
    unchanged. Both notations the model may emit are accepted:

    * total minutes: ``125:05 --> 125:10`` -> ``02:05:05,000 --> 02:05:10,000``
    * standard SRT: ``01:05:05,000`` is kept intact rather than being
      misread as "1 minute 5 seconds" followed by stray text.

    Milliseconds may be separated by ``,`` or ``.`` and default to ``000``.
    Minute/second overflow (e.g. ``00:65:00``) is carried into the next unit.
    """
    if "-->" not in line:
        return line

    def _to_srt(match):
        hours_txt, minutes_txt, seconds_txt, ms_txt = match.groups()
        total = (
            int(hours_txt or 0) * 3600
            + int(minutes_txt) * 60
            + int(seconds_txt)
        )
        hours, remainder = divmod(total, 3600)
        minutes, secs = divmod(remainder, 60)
        return f"{hours:02}:{minutes:02}:{secs:02},{ms_txt or '000'}"

    return _SRT_TIME_RE.sub(_to_srt, line)
| # --- CORE GENERATOR --- | |
async def stream_generator(file_path: str, display_name: str, api_key: str, model_id: str, include_songs: bool, include_on_screen_text: bool, mime_type: str):
    """Upload a media file to Gemini and stream back Burmese SRT subtitles.

    Yields text records, each starting with one of these prefixes:

    * ``STATUS:`` - progress line produced by ``log_msg``
    * ``DATA:``   - JSON object ``{"chunk": "<subtitle text>"}``
    * ``DONE:``   - emitted once after the model stream ends normally
    * ``ERROR:``  - emitted when any step raises; the stream then ends

    Args:
        file_path: Local path of the uploaded media; deleted when the
            generator finishes, whether it succeeded or not.
        display_name: Name shown in status output and passed to the upload.
        api_key: API key used to construct the ``genai.Client``.
        model_id: Model name passed to ``generate_content_stream``.
        include_songs: When true, lyrics are requested (prefixed with the
            music emoji); otherwise the model is told to ignore music.
        include_on_screen_text: When true, on-screen text is also requested.
        mime_type: MIME type declared for the upload.

    Blocking work (ffprobe, upload, status polling, pulling stream chunks) is
    pushed to worker threads so the event loop keeps serving other requests.
    """
    # Local import keeps this block self-contained; the module header does
    # not import asyncio.
    import asyncio

    client = None
    file_uri = None
    try:
        # Built inside the try so a constructor failure still reaches the
        # cleanup below and is reported to the caller as an ERROR record.
        client = genai.Client(api_key=api_key)

        file_size_mb = os.path.getsize(file_path) / (1024 * 1024)
        yield log_msg(f"Initializing: {display_name} ({file_size_mb:.2f} MB)")

        # 1. FFmpeg Duration Check
        yield log_msg("Analyzing exact duration with FFmpeg...")
        duration = await asyncio.to_thread(get_accurate_duration, file_path)
        total_duration_str = format_timestamp(duration) if duration > 0 else "Unknown"
        yield log_msg(f"Exact Duration: {total_duration_str}")

        # 2. Upload File
        yield log_msg(f"Uploading to Google ({mime_type})...")
        video_file = await asyncio.to_thread(
            client.files.upload,
            file=file_path,
            config=types.UploadFileConfig(display_name=display_name, mime_type=mime_type),
        )
        file_uri = video_file.name

        # 3. Wait for Processing
        yield log_msg("Waiting for Google AI processing...")
        while video_file.state.name == "PROCESSING":
            # Non-blocking pause; time.sleep here would freeze every request.
            await asyncio.sleep(2)
            video_file = await asyncio.to_thread(client.files.get, name=file_uri)
        if video_file.state.name == "FAILED":
            raise ValueError(f"Gemini processing failed: {video_file.state.name}")
        yield log_msg("Ready. Generating subtitles...")

        # 4. === INSTRUCTIONS ===
        # The prompt text is kept flush-left because its whitespace is sent
        # to the model verbatim.
        start_time_str = "00:00:00,000"
        end_time_str = total_duration_str if total_duration_str != "Unknown" else "the final frame"
        instruction_set = f"""
1. Transcribe the audio from {start_time_str} to the VERY END of the file ({end_time_str}).
2. Translate to natural Burmese (Myanmar).
3. Do NOT summarize. Transcribe verbatim.
"""
        if include_songs:
            instruction_set += "\n4. **LYRICS**: Prefix with 🎶."
        else:
            instruction_set += "\n4. **MUSIC**: Ignore music."
        if include_on_screen_text:
            instruction_set += "\n5. **VISUALS**: Translate significant text using 📝."

        # The model is asked for total-minute timestamps; normalize_srt_line
        # converts them back to HH:MM:SS,mmm as lines are streamed out.
        instruction_set += f"""
---
**CRITICAL TIMESTAMP RULES (TO PREVENT ERRORS)**:
- **DO NOT** use Hours. Use **TOTAL MINUTES** format only.
- **Format**: `MMM:SS --> MMM:SS`
- **Example**: For 1 hour 5 minutes, write `65:00`, NOT `01:05:00`.
- **Example**: `118:50 --> 118:52` (This means 1hr 58m 50s).
- **Constraint**: The video is {total_duration_str} long. Do not exceed this.
"""
        prompt = f"""
Task: Create full Burmese (Myanmar) SRT subtitles for this video.
Video Duration: {total_duration_str}
Instructions: {instruction_set}
Output the subtitle file content directly.
"""

        # 5. Streaming
        response_stream = client.models.generate_content_stream(
            model=model_id,
            contents=[video_file, prompt],
            config=types.GenerateContentConfig(temperature=0.2)
        )
        # Each chunk is fetched in a worker thread. The sentinel default
        # avoids raising StopIteration across the thread boundary.
        stream_end = object()
        chunk_iter = iter(response_stream)
        pending = ""
        while True:
            chunk = await asyncio.to_thread(next, chunk_iter, stream_end)
            if chunk is stream_end:
                break
            if not chunk.text:
                continue
            pending += chunk.text
            # Emit only complete lines; a partial trailing line waits for
            # the next chunk so timestamps are never split mid-pattern.
            while '\n' in pending:
                line, pending = pending.split('\n', 1)
                fixed_line = normalize_srt_line(line)
                json_data = json.dumps({'chunk': fixed_line + '\n'})
                yield f"DATA:{json_data}\n\n"

        # Flush whatever is left after the final newline.
        if pending:
            fixed_line = normalize_srt_line(pending)
            json_data = json.dumps({'chunk': fixed_line})
            yield f"DATA:{json_data}\n\n"

        yield log_msg("Generation finished.")
        yield "DONE:Complete\n"
    except Exception as e:
        yield f"ERROR:{str(e)}\n"
        yield log_msg(f"EXCEPTION: {str(e)}")
    finally:
        # Best-effort cleanup: failures are reported but never raised, and
        # nothing is awaited here so it also runs cleanly on early close.
        if client is not None and file_uri:
            try:
                client.files.delete(name=file_uri)
            except Exception as cleanup_err:
                print(f"Remote file cleanup failed for {file_uri}: {cleanup_err}")
        try:
            os.remove(file_path)
        except FileNotFoundError:
            pass
        except OSError as cleanup_err:
            print(f"Temp file cleanup failed for {file_path}: {cleanup_err}")
async def transcribe_endpoint(
    file: UploadFile,
    api_key: str = Form(...),
    model: str = Form("gemini-2.0-flash"),
    include_songs: bool = Form(False),
    include_on_screen_text: bool = Form(False)
):
    """Save the uploaded media to a temp file and stream subtitles for it.

    Args:
        file: Uploaded audio/video file.
        api_key: Gemini API key forwarded to ``stream_generator``.
        model: Model identifier forwarded to ``stream_generator``.
        include_songs: Forwarded flag; request lyric transcription.
        include_on_screen_text: Forwarded flag; request on-screen text.

    Returns:
        A ``StreamingResponse`` (``text/event-stream``) wrapping
        ``stream_generator``, which removes the temp file when it finishes.
    """
    # NOTE(review): no route decorator is visible on this handler in this
    # view of the file -- confirm it is registered with ``app`` elsewhere.

    # Local import keeps this block self-contained; the module header does
    # not import tempfile.
    import tempfile

    filename = file.filename or "upload"
    _, ext = os.path.splitext(filename)
    if not ext:
        ext = mimetypes.guess_extension(file.content_type or "") or ".mp4"
    ext = ext.lower()
    mime_type = MIME_MAP.get(ext, "video/mp4")

    # The extension comes from the client, so only a plain alphanumeric one
    # is allowed into the on-disk name.
    suffix = ext if re.fullmatch(r"\.[a-z0-9]{1,16}", ext) else ".mp4"

    os.makedirs("/tmp/uploads", exist_ok=True)
    # mkstemp gives a unique name per request. A second-resolution timestamp
    # let concurrent uploads overwrite, and later delete, each other's file.
    fd, temp_path = tempfile.mkstemp(prefix="temp_", suffix=suffix, dir="/tmp/uploads")
    try:
        with os.fdopen(fd, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)
    except Exception:
        # Do not leave a partial upload behind if the copy fails.
        try:
            os.remove(temp_path)
        except OSError:
            pass
        raise

    return StreamingResponse(
        stream_generator(temp_path, filename, api_key, model, include_songs, include_on_screen_text, mime_type),
        media_type="text/event-stream"
    )
if __name__ == "__main__":
    # Script entry point: serve the app on all interfaces, port 7860.
    # uvicorn is imported here so it is only needed when run as a script.
    import uvicorn

    uvicorn.run(
        app,
        port=7860,
        host="0.0.0.0",
    )