Spaces:
Sleeping
Sleeping
| import os | |
| import uuid | |
| import threading | |
| import time | |
| from datetime import timedelta | |
| from queue import Queue | |
| from flask import Flask, render_template, request, send_file, jsonify | |
| import whisper | |
| import ffmpeg | |
| import requests | |
# Flask application object used by the rest of this module.
app = Flask(__name__)

# Folder that holds uploaded videos and every file generated from them;
# it is created at import time when it does not exist yet.
UPLOAD_FOLDER = 'uploads'
os.makedirs(UPLOAD_FOLDER, exist_ok=True)

# In-memory job bookkeeping: the queue hands jobs to the background worker,
# and the dict maps video_id -> status information for that job.
video_queue = Queue()
jobs_status = {}

# Load the Whisper "large-v3" model a single time so every job reuses it.
model = whisper.load_model("large-v3")
def index():
    """Render the ``index.html`` template and return the result."""
    page = render_template('index.html')
    return page
def _positive_int_field(form, name, default):
    """Read form field *name* as an integer that is at least 1.

    A missing or blank field yields *default*. Raises ``ValueError`` with a
    user-presentable message when the value is not an integer or is below 1.
    """
    raw = form.get(name)
    if raw is None or not str(raw).strip():
        return default
    try:
        value = int(raw)
    except (TypeError, ValueError):
        raise ValueError(f"{name} must be an integer") from None
    if value < 1:
        raise ValueError(f"{name} must be at least 1")
    return value


def upload_video():
    """Accept an uploaded video plus subtitle settings and queue it for processing.

    Reads the ``video`` file and the optional form fields
    ``subtitle_position``, ``text_size``, ``words_per_line``,
    ``lines_per_display``, ``highlight_color`` and ``language`` from the
    current request, saves the file under ``UPLOAD_FOLDER`` with a fresh UUID
    name, records the job as ``queued`` in ``jobs_status`` and puts it on
    ``video_queue``.

    Returns:
        ``({"video_id": ...}, 200)`` as JSON on success, or a JSON ``error``
        message with status 400 when the file is missing/unnamed or a numeric
        setting is invalid.
    """
    if 'video' not in request.files:
        return jsonify({"error": "No video file provided"}), 400
    video = request.files['video']
    if video.filename == '':
        return jsonify({"error": "Empty filename"}), 400

    # Validate the numeric settings before anything is written to disk, so a
    # bad value becomes a 400 response instead of an unhandled ValueError.
    try:
        text_size = _positive_int_field(request.form, "text_size", 48)
        words_per_line = _positive_int_field(request.form, "words_per_line", 5)
        lines_per_display = _positive_int_field(request.form, "lines_per_display", 1)
    except ValueError as exc:
        return jsonify({"error": str(exc)}), 400

    subtitle_position = request.form.get("subtitle_position", "bottom")
    highlight_color = request.form.get("highlight_color", "#FFFFFF")  # default white

    # Normalise the language so values such as "Auto" or " hi " still match
    # the worker's exact comparison against "auto".
    language = (request.form.get("language", "auto") or "auto").strip().lower() or "auto"
    if language in ("hinglish", "hi-roman", "romanized"):
        # Whisper doesn't produce Hinglish romanization directly;
        # best option is Hindi model output (Devanagari) — can transliterate later if needed.
        language = "hi"

    video_id = str(uuid.uuid4())
    input_path = os.path.join(UPLOAD_FOLDER, f"{video_id}.mp4")
    output_path = os.path.join(UPLOAD_FOLDER, f"{video_id}_subtitled.mp4")
    ass_path = os.path.join(UPLOAD_FOLDER, f"{video_id}.ass")
    video.save(input_path)

    job = {
        "video_id": video_id,
        "input": input_path,
        "output": output_path,
        "ass": ass_path,
        "position": subtitle_position,
        "size": text_size,
        "words_per_line": words_per_line,
        "lines_per_display": lines_per_display,
        "color": highlight_color,
        "language": language,
    }
    jobs_status[video_id] = {"status": "queued", "language": language}
    video_queue.put(job)
    return jsonify({"video_id": video_id}), 200
def result(video_id):
    """Render ``result.html``, passing *video_id* to the template as ``filename``."""
    context = {"filename": video_id}
    return render_template('result.html', **context)
def status(video_id):
    """Return the tracked status entry for *video_id* as a JSON response.

    Ids that are not present in ``jobs_status`` produce
    ``{"status": "not_found"}``.
    """
    try:
        info = jobs_status[video_id]
    except KeyError:
        info = {"status": "not_found"}
    return jsonify(info)
def download(filename):
    """Send *filename* from ``UPLOAD_FOLDER`` to the client as an attachment.

    ``send_from_directory`` is used instead of joining the path by hand: it
    refuses names that would resolve outside ``UPLOAD_FOLDER`` and answers
    with a 404 when the file does not exist, rather than raising.
    """
    # Imported here because the module-level flask import does not include it.
    from flask import send_from_directory

    return send_from_directory(UPLOAD_FOLDER, filename, as_attachment=True)
| # ---------------- Helper functions ---------------- | |
# Opaque yellow in ASS byte order (&HAABBGGRR): blue=00, green=FF, red=FF.
_ASS_FALLBACK_COLOR = "&H0000FFFF"
_HEX_DIGITS = frozenset("0123456789abcdefABCDEF")


def hex_to_ass_color(hex_color):
    """Convert ``#RRGGBB`` to the ASS colour form ``&H00BBGGRR``.

    Args:
        hex_color: Colour string such as ``"#FFAA00"``; the leading ``#`` is
            optional and surrounding whitespace is ignored.

    Returns:
        The upper-cased ASS colour string. Anything that is not a string of
        exactly six hexadecimal digits yields the yellow fallback, so stray
        characters can never end up inside an ASS ``Style:`` line.
    """
    if not isinstance(hex_color, str):
        return _ASS_FALLBACK_COLOR
    digits = hex_color.strip().lstrip("#")
    if len(digits) != 6 or not _HEX_DIGITS.issuperset(digits):
        return _ASS_FALLBACK_COLOR
    digits = digits.upper()
    r, g, b = digits[0:2], digits[2:4], digits[4:6]
    # ASS stores colour channels in reverse (blue, green, red) order.
    return f"&H00{b}{g}{r}"
def escape_ass_text(text: str) -> str:
    """Make *text* safe to place in an ASS dialogue line.

    ``None`` becomes an empty string. Carriage returns turn into spaces, the
    result is stripped, remaining newlines become the ASS line break ``\\N``,
    and curly braces are removed.
    """
    if text is None:
        return ""
    flattened = text.replace("\r", " ").strip()
    # One pass: newline -> "\N", braces dropped.
    table = str.maketrans({"\n": "\\N", "{": None, "}": None})
    return flattened.translate(table)
def create_word_fallback_from_segment(seg):
    """Build evenly timed word entries for a segment without per-word timestamps.

    The segment's ``text`` is split on whitespace and its time span (``start``
    to ``end``, at least 0.001 s) is divided equally between the words.

    Returns:
        A list of ``{"word", "start", "end"}`` dicts, or ``[]`` when the
        segment has no text.
    """
    tokens = seg.get("text", "").strip().split()
    if not tokens:
        return []
    begin = seg.get("start", 0.0)
    finish = seg.get("end", begin + 0.001)
    # Guard against zero-length or inverted spans.
    step = max(finish - begin, 0.001) / len(tokens)
    return [
        {"word": token, "start": begin + idx * step, "end": begin + idx * step + step}
        for idx, token in enumerate(tokens)
    ]
def format_ass_time(seconds):
    """Format *seconds* as an ASS timestamp ``H:MM:SS.cc``.

    The value is rounded to whole centiseconds first and only then split into
    fields, so a fraction that rounds up carries into the seconds (1.999 ->
    ``0:00:02.00``) instead of producing a three-digit centisecond field.
    Negative input is clamped to zero.
    """
    total_cs = max(0, int(round(float(seconds) * 100)))
    total_seconds, cs = divmod(total_cs, 100)
    total_minutes, s = divmod(total_seconds, 60)
    h, m = divmod(total_minutes, 60)
    return f"{h}:{m:02d}:{s:02d}.{cs:02d}"
def _render_dialogue(display_lines):
    """Return one ASS ``Dialogue`` event that shows every entry of *display_lines*."""
    block_start = display_lines[0][0]
    block_end = display_lines[-1][1]
    block_text = "\\N".join(text for _, _, text in display_lines)
    return (
        f"Dialogue: 0,{format_ass_time(block_start)},{format_ass_time(block_end)},"
        f"CustomStyle,,0,0,0,,{block_text}\n"
    )


def generate_karaoke_ass(
    segments,
    position,
    text_size,
    words_per_line=5,
    lines_per_display=1,
    color="#FFFF00"
):
    """Build a complete ASS karaoke script from transcription segments.

    Args:
        segments: Iterable of segment dicts. A non-empty ``words`` list is
            used when present; otherwise evenly spaced words are derived from
            the segment text.
        position: ``"top"``, ``"center"`` or ``"bottom"``; any other value is
            laid out like ``"bottom"``.
        text_size: Font size written into the style line (converted to int).
        words_per_line: A line is closed after this many words, or earlier
            when a word ends with ``.``, ``!``, ``?`` or ``,``.
        lines_per_display: Number of closed lines merged into one dialogue
            event.
        color: ``#RRGGBB`` colour used as the style's primary colour.

    Returns:
        The ASS file content (header plus dialogue events) as a string.
    """
    alignment_map = {"top": 8, "center": 5, "bottom": 2}
    margin_v_map = {"top": 100, "center": 350, "bottom": 100}
    alignment = alignment_map.get(position, 2)
    margin_v = margin_v_map.get(position, 100)
    ass_color = hex_to_ass_color(color)
    header = f"""[Script Info]
Title: Karaoke
ScriptType: v4.00+
PlayResX: 1280
PlayResY: 720
[V4+ Styles]
Format: Name, Fontname, Fontsize, PrimaryColour, SecondaryColour, OutlineColour, BackColour, Bold, Italic, Underline, StrikeOut, ScaleX, ScaleY, Spacing, Angle, BorderStyle, Outline, Shadow, Alignment, MarginL, MarginR, MarginV, Encoding
Style: CustomStyle,Arial,{int(text_size)},{ass_color},&H00FFFFFF,&H00000000,&H64000000,-1,0,0,0,100,100,0,0,1,2,0,{alignment},0,0,{margin_v},1
[Events]
Format: Layer, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text
"""
    events = []
    # Closed lines waiting to be merged into the next dialogue event; this
    # deliberately carries over from one segment to the next.
    display_lines = []

    for seg in segments:
        seg_words = seg.get("words")
        if isinstance(seg_words, list) and seg_words:
            words = seg_words
        else:
            words = create_word_fallback_from_segment(seg)

        line_words = []
        line_start = None
        line_end = None
        for w in words:
            if "start" not in w or "end" not in w or not w.get("word"):
                continue
            if line_start is None:
                line_start = w["start"]
            # Remember the end of the last word that was actually emitted, so a
            # trailing skipped entry cannot supply (or lack) the line's end time.
            line_end = w["end"]
            # Clamp so an inverted timestamp pair never yields a negative \k.
            duration_cs = max(0, int(round((w["end"] - w["start"]) * 100)))
            word_text = escape_ass_text(w["word"])
            line_words.append(f"{{\\k{duration_cs}}}{word_text} ")

            if len(line_words) >= words_per_line or word_text.endswith((".", "!", "?", ",")):
                display_lines.append((line_start, line_end, "".join(line_words).strip()))
                line_words = []
                line_start = None
                if len(display_lines) >= lines_per_display:
                    events.append(_render_dialogue(display_lines))
                    display_lines = []

        # Words left over at the end of the segment form a final, shorter line.
        if line_words:
            display_lines.append((line_start, line_end, "".join(line_words).strip()))
            if len(display_lines) >= lines_per_display:
                events.append(_render_dialogue(display_lines))
                display_lines = []

    if display_lines:
        events.append(_render_dialogue(display_lines))
    return header + "".join(events)
| # ---------------- Gentle alignment integration ---------------- | |
def format_time(seconds):
    """Format *seconds* as an ASS timestamp ``H:MM:SS.cc``.

    The value is rounded to whole centiseconds and split arithmetically, so
    the result always has exactly three colon-separated fields and two
    centisecond digits, including for whole seconds and for durations of a day
    or more. Negative input is clamped to zero.
    """
    total_cs = max(0, int(round(float(seconds) * 100)))
    total_seconds, centis = divmod(total_cs, 100)
    total_minutes, secs = divmod(total_seconds, 60)
    hours, minutes = divmod(total_minutes, 60)
    return f"{hours}:{minutes:02d}:{secs:02d}.{centis:02d}"
def generate_ass(words, words_per_line=5):
    """Build an ASS karaoke script from a flat list of timed words.

    Args:
        words: Iterable of dicts with ``word``, ``start`` and ``end`` keys.
            Entries missing a timestamp or the word text are skipped.
        words_per_line: A dialogue line is closed after this many words, or
            earlier when a word ends with ``.``, ``!``, ``?`` or ``,``.

    Returns:
        The ASS file content (header plus dialogue events) as a string.
    """
    header = """[Script Info]
Title: Karaoke Lyrics
ScriptType: v4.00+
PlayDepth: 0
[V4+ Styles]
Format: Name, Fontname, Fontsize, PrimaryColour, SecondaryColour, OutlineColour, BackColour, Bold, Italic, Underline, StrikeOut, ScaleX, ScaleY, Spacing, Angle, BorderStyle, Outline, Shadow, Alignment, MarginL, MarginR, MarginV, Encoding
Style: Default,Arial,40,&H00FFFFFF,&H00FFFFFF,&H00000000,&H80000000,-1,0,0,0,100,100,0,0,1,2,0,2,10,10,10,1
[Events]
Format: Layer, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text
"""
    finished = []  # (start, end, text) for every closed line
    line = []
    start_time = None
    end_time = None

    for word in words:
        if "start" not in word or "end" not in word:
            continue
        raw_text = word.get("word")
        if not raw_text:
            continue
        # Strip braces/newlines so the word cannot open an ASS override block.
        text = escape_ass_text(raw_text)
        if start_time is None:
            start_time = word["start"]
        end_time = word["end"]
        # Round (not truncate) to centiseconds and never emit a negative \k.
        duration = max(0, int(round((word["end"] - word["start"]) * 100)))
        line.append(f"{{\\k{duration}}}{text} ")

        if text.endswith((".", "!", "?", ",")) or len(line) >= words_per_line:
            finished.append((start_time, end_time, "".join(line).strip()))
            line = []
            start_time = None

    if line:
        finished.append((start_time, end_time, "".join(line).strip()))

    dialogues = "".join(
        f"Dialogue: 0,{format_time(start)},{format_time(end)},Default,,0,0,0,,{text}\n"
        for start, end, text in finished
    )
    return header + dialogues
def gentle_align(
    audio_file,
    transcript_file,
    output_ass="gentle_output.ass",
    gentle_url="http://localhost:8765/transcriptions?async=false",
    timeout=(10, 1800),
):
    """Align a transcript to audio with a Gentle server and write an ASS file.

    Args:
        audio_file: Path of the media file uploaded to Gentle.
        transcript_file: Path of the UTF-8 text file holding the transcript.
        output_ass: Path the generated ASS script is written to.
        gentle_url: Endpoint of the Gentle transcription service.
        timeout: ``(connect, read)`` timeout in seconds passed to
            ``requests.post`` so a stalled server cannot block the caller
            forever.

    Returns:
        *output_ass* on success. ``None`` when the request fails, the response
        is not HTTP 200 or not valid JSON, or no word was aligned; in all of
        those cases *output_ass* is left untouched.
    """
    print("Aligning with Gentle...")
    with open(transcript_file, "r", encoding="utf-8") as f:
        lyrics = f.read()

    try:
        with open(audio_file, "rb") as audio:
            files = {"audio": audio, "transcript": (None, lyrics)}
            response = requests.post(gentle_url, files=files, timeout=timeout)
    except requests.RequestException as exc:
        # Network-level failures are reported the same way as HTTP errors.
        print("Gentle error:", exc)
        return None

    if response.status_code != 200:
        print("Gentle error:", response.text)
        return None

    try:
        aligned = response.json()
    except ValueError as exc:
        print("Gentle error:", exc)
        return None

    words = aligned.get("words", []) if isinstance(aligned, dict) else []
    # Without at least one timed word the script would contain no events;
    # do not overwrite an existing subtitle file with an empty one.
    if not any("start" in w and "end" in w for w in words):
        print("Gentle error: no aligned words in response")
        return None

    ass_content = generate_ass(words)
    with open(output_ass, "w", encoding="utf-8") as f:
        f.write(ass_content)
    return output_ass
| # ---------------- Queue worker ---------------- | |
def process_queue():
    """Worker loop: take jobs from ``video_queue`` and produce subtitled videos.

    For each job the input is transcribed with the shared Whisper model, a
    karaoke ASS file is written, an optional Gentle re-alignment runs when a
    ``<video_id>.txt`` transcript exists in ``UPLOAD_FOLDER``, and ffmpeg
    burns the subtitles into the output video. Progress is published through
    ``jobs_status[video_id]`` (``processing`` -> ``done`` or ``error``). On
    failure the entry also carries an ``error`` message. The loop never
    returns.
    """
    while True:
        job = video_queue.get()
        video_id = job["video_id"]
        try:
            jobs_status[video_id] = {
                "status": "processing",
                "model_used": None,
                "elapsed": None,
                "language": job["language"]
            }
            start_time = time.time()

            # "auto" means: let Whisper detect the language itself.
            whisper_lang = None if job["language"] == "auto" else job["language"]
            try:
                result = model.transcribe(
                    job["input"],
                    language=whisper_lang,
                    word_timestamps=True
                )
            except TypeError:
                # Retry without the keyword — presumably for Whisper versions
                # whose transcribe() does not accept word_timestamps.
                result = model.transcribe(job["input"], language=whisper_lang)

            detected_lang = result.get("language", "unknown")
            print(f"🌐 Detected language for {video_id}: {detected_lang}")
            jobs_status[video_id]["model_used"] = getattr(model, "name", "Whisper")
            jobs_status[video_id]["detected_language"] = detected_lang

            segments = result.get("segments", [])
            if not segments and "words" in result:
                # Only a flat word list is available: wrap it in one segment.
                segments = [{
                    "start": 0.0,
                    "end": result.get("duration", 0.0),
                    "words": result["words"]
                }]

            ass_content = generate_karaoke_ass(
                segments,
                job["position"],
                job["size"],
                job["words_per_line"],
                job["lines_per_display"],
                job["color"]
            )
            with open(job["ass"], "w", encoding="utf-8") as f:
                f.write(ass_content)

            transcript_file = os.path.join(UPLOAD_FOLDER, f"{video_id}.txt")
            if os.path.exists(transcript_file):
                # Re-alignment is optional: the Whisper-based subtitles are
                # already on disk, so a Gentle failure must not fail the job.
                try:
                    gentle_align(job["input"], transcript_file, output_ass=job["ass"])
                except Exception as align_error:
                    print("Gentle alignment skipped:", align_error)

            ffmpeg.input(job["input"]).output(
                job["output"],
                vf=f"ass={job['ass'].replace(os.sep, '/')}"
            ).run(overwrite_output=True)

            elapsed = round(time.time() - start_time, 2)
            jobs_status[video_id]["elapsed"] = elapsed
            jobs_status[video_id]["status"] = "done"
        except Exception as e:
            print("Error:", e)
            jobs_status[video_id] = {
                "status": "error",
                "model_used": None,
                "elapsed": None,
                "language": job["language"],
                # Keep the reason so the status entry can explain the failure.
                "error": f"{type(e).__name__}: {e}"
            }
        finally:
            video_queue.task_done()
# Launch the background queue worker as a daemon thread when the module loads.
_worker_thread = threading.Thread(target=process_queue, daemon=True)
_worker_thread.start()

if __name__ == '__main__':
    # Listen on every interface; the port comes from $PORT, defaulting to 7860.
    app.run(
        host="0.0.0.0",
        port=int(os.environ.get("PORT", 7860)),
        debug=False,
    )