"""Flask service that burns Whisper-generated karaoke subtitles into videos.

Uploads are queued, transcribed by a single background worker thread, turned
into an ASS karaoke subtitle file, and rendered into a new video with ffmpeg.
"""

import os
import threading
import time
import uuid
from datetime import timedelta  # unused below; kept so no existing import is removed
from queue import Queue

import ffmpeg
import requests
import whisper
from flask import (
    Flask,
    jsonify,
    render_template,
    request,
    send_file,  # unused below; kept so no existing import is removed
    send_from_directory,
)

app = Flask(__name__)

UPLOAD_FOLDER = 'uploads'
os.makedirs(UPLOAD_FOLDER, exist_ok=True)

# Load Whisper model once (auto-detects language)
model = whisper.load_model("large-v3")  # use "large-v3" for best accuracy

video_queue = Queue()
jobs_status = {}  # Track jobs by video_id

# Language names that the upload form may send and that are mapped to Hindi.
_HINGLISH_ALIASES = ("hinglish", "hi-roman", "romanized")

# Characters accepted in a #RRGGBB colour value.
_HEX_DIGITS = frozenset("0123456789abcdefABCDEF")

# Words ending in one of these close the current subtitle line early.
_LINE_BREAK_PUNCTUATION = (".", "!", "?", ",")

# ASS colour used when the requested colour cannot be parsed (yellow, BGR order).
_FALLBACK_ASS_COLOR = "&H0000FFFF"


@app.route('/')
def index():
    """Render the upload page."""
    return render_template('index.html')


def _form_int(name, default):
    """Read a positive integer form field.

    A missing or blank field yields ``default``; values below 1 are clamped
    to 1. Raises ``ValueError`` when the field is not an integer.
    """
    raw = (request.form.get(name) or "").strip()
    if not raw:
        return default
    return max(int(raw), 1)


@app.route('/upload', methods=['POST'])
def upload_video():
    """Accept a video upload plus subtitle settings and queue it for processing.

    Returns JSON ``{"video_id": ...}`` on success, or a JSON error with
    status 400 when the file is missing or a numeric setting is invalid.
    """
    if 'video' not in request.files:
        return jsonify({"error": "No video file provided"}), 400

    video = request.files['video']
    if video.filename == '':
        return jsonify({"error": "Empty filename"}), 400

    # User settings
    subtitle_position = request.form.get("subtitle_position", "bottom")
    try:
        text_size = _form_int("text_size", 48)
        words_per_line = _form_int("words_per_line", 5)
        lines_per_display = _form_int("lines_per_display", 1)
    except ValueError:
        # Bad numeric input is a client error, not a server crash.
        return jsonify({"error": "Numeric settings must be integers"}), 400
    highlight_color = request.form.get("highlight_color", "#FFFFFF")  # default white
    language = request.form.get("language", "auto")  # chosen language

    # Map Hinglish properly
    if language.lower() in _HINGLISH_ALIASES:
        # Whisper doesn't produce Hinglish romanization directly;
        # best option is Hindi model output (Devanagari) — can transliterate later if needed.
        language = "hi"

    video_id = str(uuid.uuid4())
    input_path = os.path.join(UPLOAD_FOLDER, f"{video_id}.mp4")
    output_path = os.path.join(UPLOAD_FOLDER, f"{video_id}_subtitled.mp4")
    ass_path = os.path.join(UPLOAD_FOLDER, f"{video_id}.ass")
    video.save(input_path)

    job = {
        "video_id": video_id,
        "input": input_path,
        "output": output_path,
        "ass": ass_path,
        "position": subtitle_position,
        "size": text_size,
        "words_per_line": words_per_line,
        "lines_per_display": lines_per_display,
        "color": highlight_color,
        "language": language,
    }
    jobs_status[video_id] = {"status": "queued", "language": language}
    video_queue.put(job)

    return jsonify({"video_id": video_id}), 200


@app.route('/result/<video_id>')
def result(video_id):
    """Render the result page for a job."""
    return render_template('result.html', filename=video_id)


@app.route('/status/<video_id>')
def status(video_id):
    """Return the tracked status of a job as JSON (``not_found`` if unknown)."""
    info = jobs_status.get(video_id, {"status": "not_found"})
    return jsonify(info)


@app.route('/download/<filename>')
def download(filename):
    """Send a file from the upload folder as an attachment.

    ``send_from_directory`` rejects names that would escape the folder, so a
    client-supplied name cannot be used to read arbitrary files. The folder is
    made absolute so it matches where uploads and outputs are written
    (relative to the working directory).
    """
    return send_from_directory(
        os.path.abspath(UPLOAD_FOLDER), filename, as_attachment=True
    )


# ---------------- Helper functions ----------------
def hex_to_ass_color(hex_color):
    """Convert #RRGGBB -> &H00BBGGRR (ASS format).

    Anything that is not exactly six hexadecimal digits (with an optional
    leading ``#``) yields the yellow fallback colour.
    """
    if not isinstance(hex_color, str):
        return _FALLBACK_ASS_COLOR
    digits = hex_color.strip().lstrip("#")
    if len(digits) != 6 or not all(ch in _HEX_DIGITS for ch in digits):
        return _FALLBACK_ASS_COLOR
    digits = digits.upper()
    r, g, b = digits[0:2], digits[2:4], digits[4:6]
    return f"&H00{b}{g}{r}"


def escape_ass_text(text: str) -> str:
    """Clean up text for ASS.

    Carriage returns become spaces, newlines become the ASS line break
    ``\\N``, and braces are removed because they delimit override tags.
    """
    if text is None:
        return ""
    text = text.replace("\r", " ").strip()
    text = text.replace("\n", "\\N")
    text = text.replace("{", "").replace("}", "")
    return text


def create_word_fallback_from_segment(seg):
    """If Whisper doesn't provide per-word timestamps, create fake words with even timing.

    Returns a list of ``{"word", "start", "end"}`` dicts that split the
    segment's duration equally between its whitespace-separated words.
    """
    words = (seg.get("text") or "").split()
    if not words:
        return []
    seg_start = seg.get("start", 0.0)
    seg_end = seg.get("end", seg_start + 0.001)
    # Guard against zero or negative durations.
    total_dur = max(seg_end - seg_start, 0.001)
    per_word = total_dur / len(words)
    return [
        {
            "word": w,
            "start": seg_start + i * per_word,
            "end": seg_start + (i + 1) * per_word,
        }
        for i, w in enumerate(words)
    ]


def format_ass_time(seconds):
    """ASS time format H:MM:SS.cc

    Rounds to whole centiseconds first so the fraction can never reach 100
    (e.g. 1.999 -> ``0:00:02.00``). Negative values are clamped to zero.
    """
    total_cs = max(int(round(float(seconds) * 100)), 0)
    total_s, cs = divmod(total_cs, 100)
    h, rem = divmod(total_s, 3600)
    m, s = divmod(rem, 60)
    return f"{h}:{m:02d}:{s:02d}.{cs:02d}"


def _karaoke_word_tag(word):
    """Return one ``{\\kNN}word `` karaoke fragment for a timed word."""
    duration_cs = max(int(round((word["end"] - word["start"]) * 100)), 0)
    return f"{{\\k{duration_cs}}}{escape_ass_text(word['word'])} "


def _block_to_dialogue(display_lines, style):
    """Build one ASS Dialogue event from ``(start, end, text)`` line tuples."""
    block_start = display_lines[0][0]
    block_end = display_lines[-1][1]
    block_text = "\\N".join(dl[2] for dl in display_lines)
    return (
        f"Dialogue: 0,{format_ass_time(block_start)},{format_ass_time(block_end)},"
        f"{style},,0,0,0,,{block_text}\n"
    )


def generate_karaoke_ass(
    segments,
    position,
    text_size,
    words_per_line=5,
    lines_per_display=1,
    color="#FFFF00"
):
    """Build a complete ASS karaoke script from transcription segments.

    Args:
        segments: iterable of dicts with ``start``/``end`` and either a
            ``words`` list (each with ``word``/``start``/``end``) or ``text``.
        position: ``"top"``, ``"center"`` or ``"bottom"``; anything else is
            treated as bottom.
        text_size: font size for the style line.
        words_per_line: maximum words before a line is closed.
        lines_per_display: lines grouped into one Dialogue event.
        color: ``#RRGGBB`` primary colour.

    Returns:
        The ASS file content as a string.
    """
    alignment_map = {"top": 8, "center": 5, "bottom": 2}
    margin_v_map = {"top": 100, "center": 350, "bottom": 100}
    alignment = alignment_map.get(position, 2)
    margin_v = margin_v_map.get(position, 100)
    ass_color = hex_to_ass_color(color)

    header = f"""[Script Info]
Title: Karaoke
ScriptType: v4.00+
PlayResX: 1280
PlayResY: 720

[V4+ Styles]
Format: Name, Fontname, Fontsize, PrimaryColour, SecondaryColour, OutlineColour, BackColour, Bold, Italic, Underline, StrikeOut, ScaleX, ScaleY, Spacing, Angle, BorderStyle, Outline, Shadow, Alignment, MarginL, MarginR, MarginV, Encoding
Style: CustomStyle,Arial,{int(text_size)},{ass_color},&H00FFFFFF,&H00000000,&H64000000,-1,0,0,0,100,100,0,0,1,2,0,{alignment},0,0,{margin_v},1

[Events]
Format: Layer, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text
"""

    dialogues = []
    display_lines = []  # completed lines waiting to be emitted as one event

    def close_line(start, end, fragments):
        """Queue a finished line and emit an event once enough lines exist."""
        display_lines.append((start, end, "".join(fragments).strip()))
        if len(display_lines) >= lines_per_display:
            dialogues.append(_block_to_dialogue(display_lines, "CustomStyle"))
            display_lines.clear()

    for seg in segments:
        words = seg.get("words")
        if not (isinstance(words, list) and words):
            words = create_word_fallback_from_segment(seg)

        line_words = []
        line_start = None
        last_end = None  # end of the last word that actually had timing

        for w in words:
            if "start" not in w or "end" not in w or not w.get("word"):
                continue
            if line_start is None:
                line_start = w["start"]
            last_end = w["end"]
            line_words.append(_karaoke_word_tag(w))

            word_text = escape_ass_text(w["word"])
            if (
                len(line_words) >= words_per_line
                or word_text.endswith(_LINE_BREAK_PUNCTUATION)
            ):
                close_line(line_start, last_end, line_words)
                line_words = []
                line_start = None

        # Words left over at the end of the segment form a final short line.
        if line_words:
            close_line(line_start, last_end, line_words)

    # Emit whatever lines remain when the count never reached lines_per_display.
    if display_lines:
        dialogues.append(_block_to_dialogue(display_lines, "CustomStyle"))

    return header + "".join(dialogues)


# ---------------- Gentle alignment integration ----------------
def format_time(seconds):
    """Format seconds as an ASS timestamp (H:MM:SS.cc) for the Gentle path."""
    return format_ass_time(seconds)


def generate_ass(words, words_per_line=5):
    """Build an ASS karaoke script from a flat list of aligned words.

    Words without ``start``/``end`` (or without text) are skipped. A line is
    closed after ``words_per_line`` words or at sentence punctuation.
    """
    header = """[Script Info]
Title: Karaoke Lyrics
ScriptType: v4.00+
PlayDepth: 0

[V4+ Styles]
Format: Name, Fontname, Fontsize, PrimaryColour, SecondaryColour, OutlineColour, BackColour, Bold, Italic, Underline, StrikeOut, ScaleX, ScaleY, Spacing, Angle, BorderStyle, Outline, Shadow, Alignment, MarginL, MarginR, MarginV, Encoding
Style: Default,Arial,40,&H00FFFFFF,&H00FFFFFF,&H00000000,&H80000000,-1,0,0,0,100,100,0,0,1,2,0,2,10,10,10,1

[Events]
Format: Layer, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text
"""

    dialogues = []
    line = []
    start_time = None
    end_time = None

    def emit_line():
        """Append the current line as one Dialogue event."""
        text = "".join(line).strip()
        dialogues.append(
            f"Dialogue: 0,{format_time(start_time)},{format_time(end_time)},"
            f"Default,,0,0,0,,{text}\n"
        )

    for word in words:
        if "start" not in word or "end" not in word or not word.get("word"):
            continue

        if start_time is None:
            start_time = word["start"]
        end_time = word["end"]

        line.append(_karaoke_word_tag(word))

        word_text = escape_ass_text(word["word"])
        if (
            word_text.endswith(_LINE_BREAK_PUNCTUATION)
            or len(line) >= words_per_line
        ):
            emit_line()
            line = []
            start_time = None

    if line:
        emit_line()

    return header + "".join(dialogues)


def gentle_align(audio_file, transcript_file, output_ass="gentle_output.ass"):
    """Align a transcript to audio with a local Gentle server and write an ASS file.

    Returns ``output_ass`` on success, or ``None`` when the server cannot be
    reached, answers with a non-200 status, or returns a body that is not
    JSON. In those cases ``output_ass`` is left untouched.
    """
    print("Aligning with Gentle...")
    with open(transcript_file, "r", encoding="utf-8") as f:
        lyrics = f.read()

    try:
        with open(audio_file, "rb") as audio:
            files = {"audio": audio, "transcript": (None, lyrics)}
            response = requests.post(
                "http://localhost:8765/transcriptions?async=false",
                files=files
            )
    except requests.RequestException as exc:
        # Same contract as the non-200 path: report and let the caller go on.
        print("Gentle error:", exc)
        return None

    if response.status_code != 200:
        print("Gentle error:", response.text)
        return None

    try:
        aligned = response.json()
    except ValueError as exc:
        print("Gentle error:", exc)
        return None

    words = aligned.get("words", [])
    ass_content = generate_ass(words)

    with open(output_ass, "w", encoding="utf-8") as f:
        f.write(ass_content)
    return output_ass


# ---------------- Queue worker ----------------
def _process_job(job):
    """Transcribe one queued video, write its ASS file and render the output."""
    video_id = job["video_id"]
    jobs_status[video_id] = {
        "status": "processing",
        "model_used": None,
        "elapsed": None,
        "language": job["language"]
    }
    start_time = time.time()

    # "auto" means: let Whisper detect the language itself.
    whisper_lang = None if job["language"] == "auto" else job["language"]

    try:
        result = model.transcribe(
            job["input"],
            language=whisper_lang,
            word_timestamps=True
        )
    except TypeError:
        # Retry without word_timestamps in case this transcribe() rejects it.
        result = model.transcribe(job["input"], language=whisper_lang)

    detected_lang = result.get("language", "unknown")
    print(f"🌐 Detected language for {video_id}: {detected_lang}")

    jobs_status[video_id]["model_used"] = getattr(model, "name", "Whisper")

    segments = result.get("segments", [])
    if not segments and "words" in result:
        # Wrap top-level words in a single synthetic segment.
        segments = [{
            "start": 0.0,
            "end": result.get("duration", 0.0),
            "words": result["words"]
        }]

    ass_content = generate_karaoke_ass(
        segments,
        job["position"],
        job["size"],
        job["words_per_line"],
        job["lines_per_display"],
        job["color"]
    )

    with open(job["ass"], "w", encoding="utf-8") as f:
        f.write(ass_content)

    # If a transcript file exists for this job, Gentle's alignment replaces
    # the Whisper subtitles; when Gentle fails the Whisper file stays in place.
    transcript_file = os.path.join(UPLOAD_FOLDER, f"{video_id}.txt")
    if os.path.exists(transcript_file):
        gentle_align(job["input"], transcript_file, output_ass=job["ass"])

    ffmpeg.input(job["input"]).output(
        job["output"],
        vf=f"ass={job['ass'].replace(os.sep, '/')}"
    ).run(overwrite_output=True)

    jobs_status[video_id]["elapsed"] = round(time.time() - start_time, 2)
    jobs_status[video_id]["status"] = "done"


def process_queue():
    """Run forever, processing queued jobs one at a time.

    Any exception while handling a job marks that job as ``error`` (with the
    exception message) and the loop moves on to the next job.
    """
    while True:
        job = video_queue.get()
        video_id = job["video_id"]
        try:
            _process_job(job)
        except Exception as e:
            print("Error:", e)
            jobs_status[video_id] = {
                "status": "error",
                "model_used": None,
                "elapsed": None,
                "language": job["language"],
                "error": str(e),
            }
        finally:
            video_queue.task_done()


# Start worker thread
threading.Thread(target=process_queue, daemon=True).start()

if __name__ == '__main__':
    port = int(os.environ.get("PORT", 7860))
    app.run(host="0.0.0.0", port=port, debug=False)