| import os |
| import uuid |
| import subprocess |
| import requests |
| import re |
| import math |
| import datetime |
| from fastapi import FastAPI, File, UploadFile, HTTPException, Form |
| from fastapi.middleware.cors import CORSMiddleware |
| from fastapi.staticfiles import StaticFiles |
| from dotenv import load_dotenv |
|
|
# Read settings from a local .env file before any of them are looked up.
load_dotenv()

# Server-side default key; an empty string means none is configured.
SARVAM_API_KEY = os.environ.get("SARVAM_API_KEY", "")

# Scratch directory for uploaded videos and the audio extracted from them.
TEMP_DIR = "temp"
os.makedirs(TEMP_DIR, exist_ok=True)

app = FastAPI(title="ReelText AI Transcription API")

# NOTE(review): wildcard origins are combined with allow_credentials=True.
# FastAPI's CORS documentation advises listing origins explicitly when
# credentials are allowed -- confirm whether credentialed requests are needed.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
    allow_credentials=True,
)

print("ReelText AI Backend ready!")
|
|
|
|
def format_time(seconds):
    """Format an offset in seconds as an SRT timestamp (``HH:MM:SS,mmm``).

    Args:
        seconds: Offset in seconds; any value accepted by ``float()``.
            Negative offsets are clamped to zero because SRT has no
            representation for them.

    Returns:
        The timestamp string, e.g. ``"00:01:05,250"``.
    """
    # Convert to whole milliseconds first. Subtracting the integer part from
    # the float and truncating loses a millisecond for values such as 1.001,
    # whose fractional part is stored as 0.000999...
    total_ms = max(0, round(float(seconds) * 1000))
    total_secs, millisecs = divmod(total_ms, 1000)
    total_minutes, secs = divmod(total_secs, 60)
    hours, minutes = divmod(total_minutes, 60)
    return f"{hours:02d}:{minutes:02d}:{secs:02d},{millisecs:03d}"
|
|
|
|
def find_ffmpeg():
    """Locate an ffmpeg executable.

    The search order is: the system PATH, any ``ffmpeg.exe`` found inside
    WinGet package folders whose name mentions FFmpeg, then a few fixed
    Windows install locations.

    Returns:
        The path of the first candidate that exists as a file, or ``None``
        when no candidate is found.
    """
    import shutil

    on_path = shutil.which("ffmpeg")
    if on_path:
        return on_path

    winget_hits = []
    packages_dir = os.path.expandvars(r"%LOCALAPPDATA%\Microsoft\WinGet\Packages")
    if os.path.isdir(packages_dir):
        for entry in os.listdir(packages_dir):
            if "FFmpeg" not in entry and "ffmpeg" not in entry:
                continue
            for current_dir, _subdirs, filenames in os.walk(os.path.join(packages_dir, entry)):
                if "ffmpeg.exe" in filenames:
                    winget_hits.append(os.path.join(current_dir, "ffmpeg.exe"))

    # WinGet binaries are tried before the fixed locations, most recently
    # discovered first.
    candidates = winget_hits[::-1] + [
        r"C:\Program Files\ffmpeg\bin\ffmpeg.exe",
        r"C:\ffmpeg\bin\ffmpeg.exe",
        r"C:\tools\ffmpeg\bin\ffmpeg.exe",
    ]
    return next((candidate for candidate in candidates if os.path.isfile(candidate)), None)
|
|
|
|
def get_audio_duration(audio_path: str, ffmpeg_path: str) -> float:
    """Return the length of an audio file in seconds.

    Runs ``ffmpeg -i <audio_path>`` and reads the ``Duration:`` line from its
    stderr output. If the command raises or no duration line is present, a
    warning is printed and 120 seconds is returned instead.

    Args:
        audio_path: File to inspect.
        ffmpeg_path: ffmpeg executable to run.

    Returns:
        The detected duration, or ``120.0`` as a fallback.
    """
    duration_pattern = r"Duration:\s*(\d+):(\d+):(\d+\.?\d*)"
    try:
        probe = subprocess.run(
            [ffmpeg_path, "-i", audio_path],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            timeout=15,
        )
        stderr_text = probe.stderr.decode("utf-8", errors="ignore")
        found = re.search(duration_pattern, stderr_text)
        if found is not None:
            hours, minutes, secs = found.groups()
            duration = int(hours) * 3600 + int(minutes) * 60 + float(secs)
            print(f"Audio duration detected: {duration:.1f}s")
            return duration
    except Exception as e:
        print(f"Duration detection failed: {e}")

    print("Warning: Could not detect duration, assuming 120s")
    return 120.0
|
|
|
|
def extract_audio(video_path: str, audio_path: str):
    """Extract the audio stream of a video into ``audio_path`` using ffmpeg.

    Args:
        video_path: Source video file.
        audio_path: Destination audio file; overwritten if present (``-y``).

    Returns:
        ``True`` when ffmpeg exits successfully, ``False`` when it exits with
        a non-zero status.

    Raises:
        Exception: If no ffmpeg executable can be located.
    """
    ffmpeg_bin = find_ffmpeg()
    if not ffmpeg_bin:
        raise Exception("FFmpeg is not installed. Run: winget install Gyan.FFmpeg")

    command = [ffmpeg_bin, "-i", video_path, "-q:a", "0", "-map", "a", audio_path, "-y"]
    try:
        subprocess.run(
            command,
            check=True,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
    except subprocess.CalledProcessError:
        return False
    else:
        return True
|
|
|
|
def _chunk_path_for(audio_path: str, index: int) -> str:
    """Return the temp path for chunk *index*; it never equals *audio_path*."""
    stem, _ext = os.path.splitext(audio_path)
    return f"{stem}_chunk{index}.mp3"


def _build_segments(full_transcript: str, detected_lang: str,
                    words_per_sec: float = 2.5, fallback_words: int = 12):
    """Split a transcript into display segments and SRT text with estimated timing.

    Returns:
        A ``(segments, srt_text)`` tuple.
    """
    text = full_transcript.strip()
    sentences = re.split(r'(?<=[.!?।])\s+', text)
    if len(sentences) <= 1:
        # No sentence punctuation: fall back to fixed-size groups of words.
        tokens = text.split()
        sentences = [
            ' '.join(tokens[k:k + fallback_words])
            for k in range(0, len(tokens), fallback_words)
        ]

    segments = []
    srt_blocks = []
    offset = 0.0
    for sentence in sentences:
        sentence = sentence.strip()
        if not sentence:
            continue
        # Timing is an estimate derived from a constant speaking rate.
        duration_est = len(sentence.split()) / words_per_sec
        ts_display = f"{int(offset // 60):02d}:{int(offset % 60):02d}"
        segments.append({"ts": ts_display, "text": sentence, "lang": detected_lang})
        srt_blocks.append(
            f"{len(srt_blocks) + 1}\n"
            f"{format_time(offset)} --> {format_time(offset + duration_est)}\n"
            f"{sentence}"
        )
        offset += duration_est
    return segments, "\n\n".join(srt_blocks)


def transcribe_with_sarvam(audio_path: str, api_key: str = None) -> dict:
    """Transcribe an audio file with Sarvam AI Saaras v3 in ``translit`` mode.

    The audio is always cut into 25-second chunks with ffmpeg; every chunk
    except the last is extended by a 2-second overlap. Each chunk is posted to
    the Sarvam speech-to-text endpoint and the chunk transcripts are joined.
    Segment and SRT timing is estimated from a fixed speaking rate.

    Args:
        audio_path: Audio file to transcribe.
        api_key: Sarvam subscription key; falls back to ``SARVAM_API_KEY``.

    Returns:
        A dict with the keys ``status``, ``message``, ``text``, ``srt``,
        ``words`` (always an empty list), ``segments``, ``duration`` and
        ``engine``.

    Raises:
        Exception: If ffmpeg cannot be located or the API answers with a
            non-200 status.
        subprocess.CalledProcessError: If ffmpeg fails to cut a chunk.
    """
    url = "https://api.sarvam.ai/speech-to-text"
    active_api_key = api_key if api_key else SARVAM_API_KEY
    headers = {"api-subscription-key": active_api_key}

    ffmpeg_path = find_ffmpeg()
    if not ffmpeg_path:
        # Fail with a clear message instead of a TypeError from subprocess.
        raise Exception("FFmpeg is not installed. Run: winget install Gyan.FFmpeg")

    total_duration = get_audio_duration(audio_path, ffmpeg_path)

    CHUNK_DURATION = 25
    OVERLAP = 2
    num_chunks = math.ceil(total_duration / CHUNK_DURATION)
    print(f"Processing {num_chunks} chunk(s) for {total_duration:.1f}s audio")

    # NOTE(review): word timings are gathered (and de-duplicated across the
    # overlap) but not yet surfaced in the response or used for SRT timing.
    timed_words = []
    # NOTE(review): chunk transcripts are joined as-is, so speech inside the
    # overlap window may appear twice in the combined text.
    transcript_parts = []
    detected_lang = "hi-en"

    for i in range(num_chunks):
        chunk_start = i * CHUNK_DURATION
        fetch_duration = CHUNK_DURATION + (OVERLAP if i < num_chunks - 1 else 0)
        chunk_path = _chunk_path_for(audio_path, i)

        try:
            subprocess.run(
                [ffmpeg_path, "-i", audio_path,
                 "-ss", str(chunk_start), "-t", str(fetch_duration),
                 "-q:a", "0", chunk_path, "-y"],
                check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
            )

            with open(chunk_path, "rb") as f:
                files = {"file": (os.path.basename(chunk_path), f, "audio/mpeg")}
                data = {
                    "model": "saaras:v3",
                    "language_code": "unknown",
                    "mode": "translit",
                    "with_timestamps": "true",
                }
                response = requests.post(url, headers=headers, files=files, data=data, timeout=60)

            if response.status_code != 200:
                raise Exception(f"Sarvam API error {response.status_code}: {response.text}")

            chunk_result = response.json()
            # "or" guards against keys that are present but null.
            chunk_transcript = chunk_result.get("transcript") or ""
            if chunk_transcript:
                transcript_parts.append(chunk_transcript)
            detected_lang = chunk_result.get("language_code") or detected_lang

            timestamps = chunk_result.get("timestamps") or {}
            words_raw = timestamps.get("words") or []
            starts = timestamps.get("start_time_seconds") or []
            ends_ts = timestamps.get("end_time_seconds") or []
            print(f"Chunk {i+1}/{num_chunks} done: {len(chunk_transcript)} chars | timestamps: {bool(words_raw)}")

            for j, word in enumerate(words_raw):
                local_start = starts[j] if j < len(starts) else 0
                # The first OVERLAP seconds of a later chunk were already
                # covered by the tail of the previous chunk.
                if i > 0 and local_start < OVERLAP:
                    continue
                local_end = ends_ts[j] if j < len(ends_ts) else local_start + 0.3
                timed_words.append((word, local_start + chunk_start, local_end + chunk_start))

        finally:
            if os.path.exists(chunk_path):
                os.remove(chunk_path)

    full_transcript = " ".join(transcript_parts)
    words_data = []
    print(f"Total words collected: {len(timed_words)}, full_transcript length: {len(full_transcript)}")

    segments_data, srt_content = _build_segments(full_transcript, detected_lang)

    return {
        "status": "success",
        "message": f"Transcribed via Sarvam AI. Language: {detected_lang}",
        "text": full_transcript,
        "srt": srt_content,
        "words": words_data,
        "segments": segments_data,
        "duration": total_duration,
        "engine": "sarvam",
    }
|
|
|
|
|
|
@app.post("/api/transcribe")
async def transcribe_video(
    file: UploadFile = File(...),
    api_key: str = Form(None)
):
    """Accept a video upload, extract its audio and return the transcription.

    Args:
        file: Uploaded video; MP4, MOV, AVI, WEBM or MKV (extension matched
            case-insensitively).
        api_key: Optional Sarvam key from the form; falls back to
            ``SARVAM_API_KEY``.

    Returns:
        The result dict produced by ``transcribe_with_sarvam``.

    Raises:
        HTTPException: 400 for an unsupported extension or a missing API key;
            500 when audio extraction or transcription fails.
    """
    # Local import keeps this block self-contained.
    from fastapi.concurrency import run_in_threadpool

    allowed_extensions = ('.mp4', '.mov', '.avi', '.webm', '.mkv')
    lowered_name = (file.filename or "").lower()
    extension = next((ext for ext in allowed_extensions if lowered_name.endswith(ext)), None)
    if extension is None:
        raise HTTPException(status_code=400, detail="Unsupported format. Upload MP4, MOV, WEBM, AVI or MKV.")

    # Validate the key before doing any disk or ffmpeg work.
    active_key = api_key if api_key else SARVAM_API_KEY
    if not active_key or active_key == "your_sarvam_api_key_here":
        raise HTTPException(status_code=400, detail="Sarvam API key not provided. Enter it on the website or set it in backend .env.")

    temp_id = str(uuid.uuid4())
    # Only the validated extension of the client-supplied name is reused, so
    # path separators or ".." in the upload name cannot affect the temp path.
    video_path = os.path.join(TEMP_DIR, f"{temp_id}{extension}")
    audio_path = os.path.join(TEMP_DIR, f"{temp_id}.mp3")

    try:
        # Stream the upload to disk instead of holding it all in memory.
        with open(video_path, "wb") as f:
            while True:
                piece = await file.read(1024 * 1024)
                if not piece:
                    break
                f.write(piece)

        # ffmpeg and the HTTP calls block; run them off the event loop so the
        # server keeps answering other requests meanwhile.
        success = await run_in_threadpool(extract_audio, video_path, audio_path)
        if not success:
            raise HTTPException(status_code=500, detail="Failed to extract audio from video.")

        return await run_in_threadpool(transcribe_with_sarvam, audio_path, active_key)

    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e

    finally:
        for leftover in (video_path, audio_path):
            if os.path.exists(leftover):
                os.remove(leftover)
|
|
# Serve the sibling "frontend" directory at the site root when it exists;
# otherwise answer "/" with a small JSON status message.
FRONTEND_DIR = os.path.abspath(
    os.path.join(os.path.dirname(__file__), os.pardir, "frontend")
)
if not os.path.exists(FRONTEND_DIR):
    @app.get("/")
    def read_root():
        """Report that the API is up when no frontend is bundled."""
        return {"message": "ReelText AI Transcription API is running!"}
else:
    app.mount("/", StaticFiles(directory=FRONTEND_DIR, html=True), name="frontend")


# Development entry point: run the app under uvicorn with auto-reload.
if __name__ == "__main__":
    import uvicorn

    uvicorn.run("main:app", reload=True, port=8000, host="0.0.0.0")
|
|