# subtitle-medium / app.py
# Last commit: "Update app.py" (3cbd91e, verified) by Gagandeep12
import os
import uuid
import threading
import time
from datetime import timedelta
from queue import Queue
from flask import Flask, render_template, request, send_file, jsonify
import whisper
import ffmpeg
import requests
# Flask application object; the route decorators below register handlers on it.
app = Flask(__name__)
# Relative directory that receives uploaded videos, generated .ass files and
# the rendered (subtitled) videos. Created at import time if missing.
UPLOAD_FOLDER = 'uploads'
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
# Load the Whisper model once at import time so every job reuses it.
# The worker passes language=None to transcribe() when the user picks "auto".
model = whisper.load_model("large-v3") # use "large-v3" for best accuracy
# Jobs are put on this queue by the upload route and consumed by process_queue().
video_queue = Queue()
# Maps video_id -> status dict ("status", "language", and later "model_used"/"elapsed").
jobs_status = {} # Track jobs by video_id
@app.route('/')
def index():
    """Serve the landing page rendered from the ``index.html`` template."""
    landing_page = render_template('index.html')
    return landing_page
@app.route('/upload', methods=['POST'])
def upload_video():
    """Accept a video upload plus subtitle settings and queue it for processing.

    Expects a multipart POST with a ``video`` file part. Optional form fields:
    ``subtitle_position`` (default ``"bottom"``), ``text_size`` (48),
    ``words_per_line`` (5), ``lines_per_display`` (1), ``highlight_color``
    (``"#FFFFFF"``) and ``language`` (``"auto"``).

    Returns:
        ``({"video_id": ...}, 200)`` once the job is queued, or
        ``({"error": ...}, 400)`` when the file part is missing/unnamed or a
        numeric setting is not a positive integer.
    """
    video = request.files.get('video')
    if video is None:
        return jsonify({"error": "No video file provided"}), 400
    if video.filename == '':
        return jsonify({"error": "Empty filename"}), 400

    # User settings
    form = request.form
    subtitle_position = form.get("subtitle_position", "bottom")
    highlight_color = form.get("highlight_color", "#FFFFFF")  # default white
    language = form.get("language", "auto")  # chosen language

    # Numeric settings come from untrusted form input: a blank field falls
    # back to its default, anything that is not a positive integer is a
    # client error (400) instead of an unhandled ValueError (500).
    numeric_settings = {}
    for field, default in (
        ("text_size", 48),
        ("words_per_line", 5),
        ("lines_per_display", 1),
    ):
        raw = form.get(field)
        if raw is None or raw.strip() == "":
            numeric_settings[field] = default
            continue
        try:
            value = int(raw)
        except (TypeError, ValueError):
            return jsonify({"error": f"{field} must be an integer"}), 400
        if value < 1:
            return jsonify({"error": f"{field} must be at least 1"}), 400
        numeric_settings[field] = value

    # Map Hinglish properly
    if language.lower() in ("hinglish", "hi-roman", "romanized"):
        # Whisper doesn't produce Hinglish romanization directly;
        # best option is Hindi model output (Devanagari) — can transliterate later if needed.
        language = "hi"

    # Every artefact of a job shares the same random id inside UPLOAD_FOLDER.
    video_id = str(uuid.uuid4())
    input_path = os.path.join(UPLOAD_FOLDER, f"{video_id}.mp4")
    output_path = os.path.join(UPLOAD_FOLDER, f"{video_id}_subtitled.mp4")
    ass_path = os.path.join(UPLOAD_FOLDER, f"{video_id}.ass")
    video.save(input_path)

    job = {
        "video_id": video_id,
        "input": input_path,
        "output": output_path,
        "ass": ass_path,
        "position": subtitle_position,
        "size": numeric_settings["text_size"],
        "words_per_line": numeric_settings["words_per_line"],
        "lines_per_display": numeric_settings["lines_per_display"],
        "color": highlight_color,
        "language": language,
    }
    # Publish the status before enqueueing so a status poll never sees
    # "not_found" for a job that is already queued.
    jobs_status[video_id] = {"status": "queued", "language": language}
    video_queue.put(job)
    return jsonify({"video_id": video_id}), 200
@app.route('/result/<video_id>')
def result(video_id):
    """Render ``result.html`` for a job, passing its id as ``filename``."""
    template_context = {"filename": video_id}
    return render_template('result.html', **template_context)
@app.route('/status/<video_id>')
def status(video_id):
    """Return the tracked status dict for a job as JSON.

    Unknown ids yield ``{"status": "not_found"}`` (still as a normal JSON
    response).
    """
    try:
        payload = jobs_status[video_id]
    except KeyError:
        payload = {"status": "not_found"}
    return jsonify(payload)
@app.route('/download/<filename>')
def download(filename):
    """Send a file from the upload folder as an attachment.

    ``filename`` comes straight from the URL, so it is resolved with
    ``send_from_directory`` rather than a hand-built path: names that would
    escape the upload folder, and files that do not exist, produce a 404
    instead of a path-traversal risk or an unhandled error.
    """
    # Local import keeps this block self-contained without touching the
    # module's import list.
    from flask import send_from_directory

    # Files are written relative to the working directory, so anchor the
    # lookup there explicitly instead of relying on the app root path.
    upload_dir = os.path.abspath(UPLOAD_FOLDER)
    return send_from_directory(upload_dir, filename, as_attachment=True)
# ---------------- Helper functions ----------------
def hex_to_ass_color(hex_color):
    """Convert ``#RRGGBB`` to the ASS colour form ``&H00BBGGRR``.

    Args:
        hex_color: Colour string such as ``"#FF8800"`` (leading ``#`` optional).

    Returns:
        The ASS colour string with fully opaque alpha (``00``). Any value that
        is not exactly six hexadecimal digits (including ``None`` or other
        non-string input) yields the yellow fallback ``&H0000FFFF``.
    """
    # ASS stores colours as alpha-blue-green-red, so yellow (R=FF, G=FF, B=00)
    # is 00 00 FF FF.
    fallback = "&H0000FFFF"
    if not isinstance(hex_color, str):
        return fallback

    digits = hex_color.strip().lstrip("#")
    # Validate every character: the result is interpolated into an ASS Style
    # line, where stray characters such as commas would corrupt the fields.
    if len(digits) != 6 or any(ch not in "0123456789abcdefABCDEF" for ch in digits):
        return fallback

    red, green, blue = digits[0:2], digits[2:4], digits[4:6]
    return f"&H00{blue}{green}{red}".upper()
def escape_ass_text(text: str) -> str:
    """Sanitise a piece of text for use inside an ASS dialogue line.

    ``None`` becomes an empty string. Carriage returns turn into spaces, the
    result is trimmed, remaining newlines become the ASS line break ``\\N``
    and curly braces (ASS override-tag delimiters) are dropped.
    """
    if text is None:
        return ""
    trimmed = text.replace("\r", " ").strip()
    # One pass over the trimmed text: newline -> \N, braces removed.
    substitutions = str.maketrans({"\n": "\\N", "{": None, "}": None})
    return trimmed.translate(substitutions)
def create_word_fallback_from_segment(seg):
    """Build evenly timed pseudo-words for a segment without word timestamps.

    The segment's ``text`` is split on whitespace and its ``start``..``end``
    span (at least 0.001 s) is divided equally between the resulting words.

    Returns:
        A list of ``{"word", "start", "end"}`` dicts, or ``[]`` when the
        segment has no text.
    """
    tokens = seg.get("text", "").strip().split()
    if not tokens:
        return []

    begin = seg.get("start", 0.0)
    finish = seg.get("end", begin + 0.001)
    # Never let a zero/negative span collapse the per-word duration.
    step = max(finish - begin, 0.001) / len(tokens)

    return [
        {
            "word": token,
            "start": begin + idx * step,
            "end": begin + idx * step + step,
        }
        for idx, token in enumerate(tokens)
    ]
def format_ass_time(seconds):
    """Format a time in seconds as an ASS timestamp ``H:MM:SS.cc``.

    The value is rounded to whole centiseconds *before* being split into
    fields, so a fraction that rounds up carries into the seconds (and
    minutes/hours) instead of producing an out-of-range ``.100`` field.
    Negative inputs are clamped to zero.
    """
    total_cs = max(0, int(round(float(seconds) * 100)))
    total_seconds, cs = divmod(total_cs, 100)
    total_minutes, s = divmod(total_seconds, 60)
    h, m = divmod(total_minutes, 60)
    return f"{h}:{m:02d}:{s:02d}.{cs:02d}"
def generate_karaoke_ass(
    segments,
    position,
    text_size,
    words_per_line=5,
    lines_per_display=1,
    color="#FFFF00"
):
    """Build a complete ASS subtitle script with per-word karaoke (``\\k``) tags.

    Args:
        segments: Iterable of segment dicts. A segment's non-empty ``words``
            list is used when present; otherwise evenly timed words are
            derived from the segment's text, start and end.
        position: ``"top"``, ``"center"`` or ``"bottom"``; anything else is
            treated as bottom.
        text_size: Font size for the style (coerced with ``int``).
        words_per_line: A line is closed after this many words, or earlier
            when a word ends with ``.``, ``!``, ``?`` or ``,``.
        lines_per_display: Number of lines joined with ``\\N`` into a single
            Dialogue event.
        color: ``#RRGGBB`` colour used as the style's primary colour.

    Returns:
        The ASS script (header plus Dialogue events) as one string.
    """
    alignment_map = {"top": 8, "center": 5, "bottom": 2}
    margin_v_map = {"top": 100, "center": 350, "bottom": 100}
    alignment = alignment_map.get(position, 2)
    margin_v = margin_v_map.get(position, 100)
    ass_color = hex_to_ass_color(color)

    header = f"""[Script Info]
Title: Karaoke
ScriptType: v4.00+
PlayResX: 1280
PlayResY: 720
[V4+ Styles]
Format: Name, Fontname, Fontsize, PrimaryColour, SecondaryColour, OutlineColour, BackColour, Bold, Italic, Underline, StrikeOut, ScaleX, ScaleY, Spacing, Angle, BorderStyle, Outline, Shadow, Alignment, MarginL, MarginR, MarginV, Encoding
Style: CustomStyle,Arial,{int(text_size)},{ass_color},&H00FFFFFF,&H00000000,&H64000000,-1,0,0,0,100,100,0,0,1,2,0,{alignment},0,0,{margin_v},1
[Events]
Format: Layer, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text
"""

    events = []         # finished Dialogue lines, joined once at the end
    display_lines = []  # (start, end, text) lines waiting to form a block

    def flush_block():
        """Emit the pending lines as one Dialogue event and reset the buffer."""
        block_start = display_lines[0][0]
        block_end = display_lines[-1][1]
        block_text = "\\N".join(dl[2] for dl in display_lines)
        events.append(
            f"Dialogue: 0,{format_ass_time(block_start)},{format_ass_time(block_end)},CustomStyle,,0,0,0,,{block_text}\n"
        )
        display_lines.clear()

    for seg in segments:
        seg_words = seg.get("words")
        if isinstance(seg_words, list) and seg_words:
            words = seg_words
        else:
            words = create_word_fallback_from_segment(seg)

        line_words = []
        line_start = None
        line_end = None  # end of the last *timed* word placed on the line
        for w in words:
            # Words without timing or text cannot be highlighted; skip them.
            if "start" not in w or "end" not in w or not w.get("word"):
                continue
            if line_start is None:
                line_start = w["start"]
            line_end = w["end"]
            # \k takes centiseconds; clamp so bad timing never yields a
            # negative (invalid) karaoke duration.
            duration_cs = max(0, int(round((w["end"] - w["start"]) * 100)))
            word_text = escape_ass_text(w["word"])
            line_words.append(f"{{\\k{duration_cs}}}{word_text} ")

            if len(line_words) >= words_per_line or word_text.endswith((".", "!", "?", ",")):
                display_lines.append((line_start, line_end, "".join(line_words).strip()))
                line_words = []
                line_start = None
                if len(display_lines) >= lines_per_display:
                    flush_block()

        # Words left over at the end of the segment form a final, shorter line.
        # Its end time is the last word actually placed on it, which stays
        # correct even when the segment's final entry had no timestamps.
        if line_words:
            display_lines.append((line_start, line_end, "".join(line_words).strip()))
            if len(display_lines) >= lines_per_display:
                flush_block()

    # Lines that never filled a whole block are still shown.
    if display_lines:
        flush_block()

    return header + "".join(events)
# ---------------- Gentle alignment integration ----------------
def format_time(seconds):
    """Format a time in seconds as an ASS timestamp ``H:MM:SS.cc``.

    The result always has exactly three colon-separated fields and two
    centisecond digits, including for whole-second values. The value is
    rounded to whole centiseconds; negative inputs are clamped to zero.
    """
    total_cs = max(0, int(round(float(seconds) * 100)))
    total_seconds, cs = divmod(total_cs, 100)
    total_minutes, secs = divmod(total_seconds, 60)
    hours, minutes = divmod(total_minutes, 60)
    return f"{hours}:{minutes:02d}:{secs:02d}.{cs:02d}"
def generate_ass(words, words_per_line=5):
    """Build an ASS karaoke script from a flat list of aligned words.

    Each word dict needs ``start``, ``end`` and ``word``; entries missing
    either timestamp are skipped. A Dialogue line is closed after
    ``words_per_line`` words or when a word ends with ``.``, ``!``, ``?`` or
    ``,``; leftover words form a final line.

    Returns:
        The script header followed by the Dialogue events, as one string.
    """
    header = """[Script Info]
Title: Karaoke Lyrics
ScriptType: v4.00+
PlayDepth: 0
[V4+ Styles]
Format: Name, Fontname, Fontsize, PrimaryColour, SecondaryColour, OutlineColour, BackColour, Bold, Italic, Underline, StrikeOut, ScaleX, ScaleY, Spacing, Angle, BorderStyle, Outline, Shadow, Alignment, MarginL, MarginR, MarginV, Encoding
Style: Default,Arial,40,&H00FFFFFF,&H00FFFFFF,&H00000000,&H80000000,-1,0,0,0,100,100,0,0,1,2,0,2,10,10,10,1
[Events]
Format: Layer, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text
"""

    def dialogue(begin, finish, parts):
        """Render one Dialogue event from the tagged word fragments."""
        start = format_time(begin)
        end = format_time(finish)
        text = "".join(parts).strip()
        return f"Dialogue: 0,{start},{end},Default,,0,0,0,,{text}\n"

    events = []
    chunk = []          # karaoke-tagged words of the line being built
    chunk_start = None  # start of the first word in the current line
    chunk_end = None    # end of the most recent timed word

    for entry in words:
        if not ("start" in entry and "end" in entry):
            continue
        if chunk_start is None:
            chunk_start = entry["start"]
        chunk_end = entry["end"]

        # \k duration in centiseconds, truncated toward zero.
        centis = int((entry["end"] - entry["start"]) * 100)
        chunk.append(f"{{\\k{centis}}}{entry['word']} ")

        if entry["word"].endswith((".", "!", "?", ",")) or len(chunk) >= words_per_line:
            events.append(dialogue(chunk_start, chunk_end, chunk))
            chunk = []
            chunk_start = None

    if chunk:
        events.append(dialogue(chunk_start, chunk_end, chunk))

    return header + "".join(events)
def gentle_align(audio_file, transcript_file, output_ass="gentle_output.ass", timeout=600):
    """Align a transcript to audio with a local Gentle server and write an ASS file.

    Posts the audio and transcript to the Gentle HTTP endpoint at
    ``http://localhost:8765/transcriptions?async=false`` and converts the
    returned word list into a karaoke ASS script via ``generate_ass``.

    Args:
        audio_file: Path of the media file to upload.
        transcript_file: Path of the UTF-8 transcript text.
        output_ass: Destination path for the generated ASS script.
        timeout: Seconds to wait for the server before giving up, so an
            unresponsive server cannot block the caller forever.

    Returns:
        ``output_ass`` on success. ``None`` when the server cannot be
        reached, answers with a non-200 status or invalid JSON, or returns
        no timed words; in all of these cases ``output_ass`` is left
        untouched so an existing subtitle file is not clobbered.
    """
    print("Aligning with Gentle...")
    with open(transcript_file, "r", encoding="utf-8") as f:
        lyrics = f.read()

    try:
        with open(audio_file, "rb") as audio:
            files = {"audio": audio, "transcript": (None, lyrics)}
            response = requests.post(
                "http://localhost:8765/transcriptions?async=false",
                files=files,
                timeout=timeout,
            )
    except requests.RequestException as exc:
        # Alignment is best-effort: report the failure the same way as a
        # non-200 answer instead of raising into the caller.
        print("Gentle error:", exc)
        return None

    if response.status_code != 200:
        print("Gentle error:", response.text)
        return None

    try:
        aligned = response.json()
    except ValueError as exc:
        print("Gentle error:", exc)
        return None

    words = aligned.get("words", [])
    # generate_ass skips words without timestamps; if none are timed the
    # script would contain no events, so keep whatever output_ass holds.
    if not any("start" in w and "end" in w for w in words):
        print("Gentle error:", "no aligned words returned")
        return None

    ass_content = generate_ass(words)
    with open(output_ass, "w", encoding="utf-8") as f:
        f.write(ass_content)
    return output_ass
# ---------------- Queue worker ----------------
def process_queue():
    """Worker loop: take jobs off ``video_queue`` and render subtitled videos.

    For each job the input is transcribed with the shared Whisper model, a
    karaoke ASS script is written to ``job["ass"]``, optionally replaced by a
    Gentle alignment when ``<video_id>.txt`` exists in the upload folder, and
    the subtitles are burned into ``job["output"]`` with ffmpeg. Progress is
    published through ``jobs_status[video_id]``; on failure the status
    becomes ``"error"`` with the exception class name under ``"error"`` and
    the full traceback is printed. The loop never returns.
    """
    # Local import keeps this block self-contained without touching the
    # module's import list.
    import traceback

    while True:
        job = video_queue.get()
        video_id = job["video_id"]
        try:
            jobs_status[video_id] = {
                "status": "processing",
                "model_used": None,
                "elapsed": None,
                "language": job["language"]
            }
            start_time = time.time()

            # None lets Whisper detect the language itself.
            whisper_lang = None if job["language"] == "auto" else job["language"]
            try:
                result = model.transcribe(
                    job["input"],
                    language=whisper_lang,
                    word_timestamps=True
                )
            except TypeError:
                # Retry without word_timestamps in case the call rejected
                # that keyword argument.
                result = model.transcribe(job["input"], language=whisper_lang)

            detected_lang = result.get("language", "unknown")
            print(f"🌐 Detected language for {video_id}: {detected_lang}")
            jobs_status[video_id]["model_used"] = getattr(model, "name", "Whisper")

            segments = result.get("segments", [])
            if not segments and "words" in result:
                # No segments but a flat word list: wrap it in one segment.
                segments = [{
                    "start": 0.0,
                    "end": result.get("duration", 0.0),
                    "words": result["words"]
                }]

            ass_content = generate_karaoke_ass(
                segments,
                job["position"],
                job["size"],
                job["words_per_line"],
                job["lines_per_display"],
                job["color"]
            )
            with open(job["ass"], "w", encoding="utf-8") as f:
                f.write(ass_content)

            # A transcript file next to the upload triggers Gentle alignment,
            # which overwrites the Whisper-based script on success.
            transcript_file = os.path.join(UPLOAD_FOLDER, f"{video_id}.txt")
            if os.path.exists(transcript_file):
                gentle_align(job["input"], transcript_file, output_ass=job["ass"])

            # Forward slashes keep the path usable inside the filter string.
            ffmpeg.input(job["input"]).output(
                job["output"],
                vf=f"ass={job['ass'].replace(os.sep, '/')}"
            ).run(overwrite_output=True)

            elapsed = round(time.time() - start_time, 2)
            jobs_status[video_id]["elapsed"] = elapsed
            jobs_status[video_id]["status"] = "done"
        except Exception as e:
            # Top-level boundary of the worker: one failed job must not kill
            # the thread. Keep the traceback so the failure can be diagnosed.
            print("Error:", e)
            traceback.print_exc()
            jobs_status[video_id] = {
                "status": "error",
                "model_used": None,
                "elapsed": None,
                "language": job["language"],
                # Only the exception type is exposed to clients; the message
                # may contain server-side paths.
                "error": type(e).__name__,
            }
        finally:
            video_queue.task_done()
# Launch the background worker at import time. It is a daemon thread, so it
# does not keep the interpreter alive on shutdown.
_worker_thread = threading.Thread(target=process_queue, daemon=True)
_worker_thread.start()

if __name__ == '__main__':
    # Listen on all interfaces; the port comes from $PORT (default 7860).
    app.run(
        host="0.0.0.0",
        port=int(os.environ.get("PORT", 7860)),
        debug=False,
    )