|
|
import re |
|
|
|
|
|
def srt_time_to_seconds(timestamp):
    """Convert an SRT timestamp (``HH:MM:SS,mmm``) to seconds.

    Args:
        timestamp: Timestamp string such as ``"00:01:02,500"``.

    Returns:
        The timestamp as a float number of seconds, or ``0.0`` when the
        value cannot be parsed (wrong type, missing fields, or non-numeric
        components).
    """
    try:
        time_part, ms_part = timestamp.split(",")
        h, m, s = map(int, time_part.split(":"))
        ms = int(ms_part)
    except (AttributeError, TypeError, ValueError):
        # Malformed input is deliberately mapped to 0.0 (best-effort parsing).
        # Only parsing errors are caught so that KeyboardInterrupt and
        # SystemExit are no longer swallowed.
        return 0.0
    return h * 3600 + m * 60 + s + ms / 1000.0
|
|
|
|
|
def seconds_to_srt_time(seconds):
    """Convert a number of seconds to an SRT timestamp (``HH:MM:SS,mmm``).

    Args:
        seconds: Time offset in seconds (int or float).

    Returns:
        The formatted timestamp string. The value is rounded to the nearest
        millisecond; negative values are clamped to ``00:00:00,000`` because
        SRT has no representation for negative times.
    """
    # Work in integer milliseconds: truncating `(seconds % 1) * 1000` loses a
    # millisecond to binary float error (e.g. 1.001 would render as ",000").
    total_ms = max(0, int(round(seconds * 1000)))
    total_secs, ms = divmod(total_ms, 1000)
    total_mins, secs = divmod(total_secs, 60)
    hours, minutes = divmod(total_mins, 60)
    return f"{hours:02d}:{minutes:02d}:{secs:02d},{ms:03d}"
|
|
|
|
|
def shift_srt_timestamps(srt_content, offset_seconds):
    """Return ``srt_content`` with every cue moved by ``offset_seconds``.

    Cues are re-numbered from 1. A shifted start time is never allowed below
    zero and a shifted end time never below one millisecond. When no cue can
    be parsed, the input is returned unchanged.
    """
    entries = parse_srt(srt_content)
    if not entries:
        return srt_content

    rendered = []
    for index, entry in enumerate(entries, 1):
        # Clamp so that a negative offset cannot produce negative timestamps.
        new_start = max(entry['start'] + offset_seconds, 0)
        new_end = max(entry['end'] + offset_seconds, 1e-3)

        timing = f"{seconds_to_srt_time(new_start)} --> {seconds_to_srt_time(new_end)}"
        rendered.append(f"{index}\n{timing}\n{entry['text']}")

    # Cues are separated by one blank line; outer whitespace is trimmed.
    return "\n\n".join(rendered).strip()
|
|
|
|
|
def parse_srt(srt_content):
    """Parse SRT text into a list of cue dictionaries.

    Each dictionary has the keys ``'start'`` and ``'end'`` (seconds, as
    produced by ``srt_time_to_seconds``) and ``'text'`` (the cue text with
    surrounding whitespace stripped). The numeric cue index found in the
    input is matched but discarded.
    """
    cue_re = re.compile(
        r"(\d+)\s*\n([^-\n]+?) --> ([^-\n]+?)\s*\n((?:(?!\d+\s*\n\d{1,2}:\d{2}).+\n?)*)",
        re.MULTILINE,
    )

    return [
        {
            'start': srt_time_to_seconds(begin.strip()),
            'end': srt_time_to_seconds(finish.strip()),
            'text': body.strip(),
        }
        for _index, begin, finish, body in cue_re.findall(srt_content)
    ]
|
|
|
|
|
def format_text_lines(text, max_chars=42, force_split_threshold=30):
    """Format ``text`` into at most two lines of roughly balanced length.

    Args:
        text: The subtitle text to lay out.
        max_chars: Maximum number of characters allowed per line.
        force_split_threshold: Text longer than this is split into two lines
            even when it would fit on a single line of ``max_chars``.

    Returns:
        ``""`` for blank input; ``text`` unchanged when it is short enough to
        stay on one line; otherwise two lines joined by ``"\\n"``. A single
        word is never split, even when it exceeds ``max_chars``.
    """
    words = text.split()
    if not words:
        return ""

    # Short text stays on a single line.
    if len(text) <= max_chars and len(text) <= force_split_threshold:
        return text

    # Try every word boundary and keep the most balanced split in which both
    # lines respect max_chars. Ties keep the earliest boundary.
    best_split_idx = -1
    best_balance = float('inf')
    for i in range(1, len(words)):
        len1 = len(" ".join(words[:i]))
        len2 = len(" ".join(words[i:]))
        if len1 > max_chars or len2 > max_chars:
            continue

        balance = abs(len2 - len1)
        if len2 >= len1:
            # Bias toward a second line that is at least as long as the first.
            balance -= 5

        if balance < best_balance:
            best_balance = balance
            best_split_idx = i

    if best_split_idx != -1:
        line1 = " ".join(words[:best_split_idx])
        line2 = " ".join(words[best_split_idx:])
        return f"{line1}\n{line2}"

    if len(text) <= max_chars:
        return text

    if len(words) == 1:
        # A lone over-long word cannot be split; splitting at index 0 would
        # emit an empty first line ("\n" + word).
        return words[0]

    # No boundary keeps both lines within max_chars: fall back to splitting
    # at the middle word.
    mid = len(words) // 2
    return " ".join(words[:mid]) + "\n" + " ".join(words[mid:])
|
|
|
|
|
def fix_word_timing(words):
    """Make word timings sequential and non-degenerate.

    Strategy:
    1. When a word starts before the previous word ends, trim the END of the
       previous word so the START of the current word is preserved.
    2. Only when trimming would leave the previous word (almost) without
       duration is the current word delayed to the previous word's end.
    3. Every word, including the first one, gets a fallback duration when its
       end is not after its start.

    Args:
        words: List of dicts with numeric ``'start'`` and ``'end'`` keys, in
            playback order. The dicts are modified in place.

    Returns:
        The same list object (or a new empty list for empty input).
    """
    if not words:
        return []

    MIN_KEPT_DURATION = 0.01  # shortest duration a trimmed word may keep
    FALLBACK_DURATION = 0.1   # duration given to zero/negative-length words

    # The pairwise loop below only repairs the *second* word of each pair, so
    # the first word has to be checked explicitly.
    first = words[0]
    if first['end'] <= first['start']:
        first['end'] = first['start'] + FALLBACK_DURATION

    for prev, curr in zip(words, words[1:]):
        if curr['start'] < prev['end']:
            trimmed_end = max(prev['start'], curr['start'])
            if trimmed_end <= prev['start'] + MIN_KEPT_DURATION:
                # Trimming would (nearly) erase the previous word: delay the
                # current word instead.
                curr['start'] = prev['end']
            else:
                prev['end'] = trimmed_end

        if curr['end'] <= curr['start']:
            curr['end'] = curr['start'] + FALLBACK_DURATION

    return words
|
|
|
|
|
def apply_netflix_style_filter(srt_content):
    """Group word-level subtitles into Netflix-style phrases.

    Rules:
    - Max 42 chars/line
    - Max 2 lines
    - Max duration 7s
    - Consecutive words are merged into one event until a rule, a pause or
      sentence-ending punctuation forces a break

    Args:
        srt_content: SRT text, expected to contain one cue per word.

    Returns:
        Re-grouped SRT text with cues numbered from 1, or ``srt_content``
        unchanged when no cue can be parsed from it.
    """
    words = parse_srt(srt_content)
    if not words:
        return srt_content

    words = fix_word_timing(words)

    MAX_CHARS_PER_LINE = 42
    MAX_LINES = 2
    MAX_TOTAL_CHARS = MAX_CHARS_PER_LINE * MAX_LINES
    MAX_DURATION = 7.0
    MIN_GAP_FOR_SPLIT = 0.5            # pause (s) that always starts a new event
    MIN_DURATION_FOR_LINE_BREAK = 1.0  # an event this long may break once one line is full
    HARD_CHAR_LIMIT = 70               # always break beyond this many characters
    ORPHAN_MAX_CHARS = 10              # events shorter than this count as orphans
    MAX_ORPHAN_GAP = 1.0               # orphans closer than this (s) may be merged back
    LINE_OVERFLOW_TOLERANCE = 5        # extra chars per line tolerated when merging an orphan

    def get_group_text(group):
        return " ".join(w['text'] for w in group)

    # Pass 1: walk the words in order and cut a new event whenever a break
    # rule fires.
    grouped_events = []
    current_group = [words[0]]
    for word in words[1:]:
        last_word = current_group[-1]
        group_start = current_group[0]['start']
        current_text = get_group_text(current_group)
        projected_len = len(current_text) + 1 + len(word['text'])

        # A pause in speech always ends the event.
        silence_break = word['start'] - last_word['end'] > MIN_GAP_FOR_SPLIT

        # Past one line of text: break if the event has already been on
        # screen long enough, or if it is getting far too long regardless.
        overflow_break = projected_len > MAX_CHARS_PER_LINE and (
            last_word['end'] - group_start > MIN_DURATION_FOR_LINE_BREAK
            or projected_len > HARD_CHAR_LIMIT
        )

        # Hard limits: two full lines of text or the maximum duration.
        limit_break = (
            projected_len > MAX_TOTAL_CHARS
            or word['end'] - group_start > MAX_DURATION
        )

        # Sentence-ending punctuation closes the event unless it is tiny.
        sentence_break = (
            re.search(r'[.!?]$', last_word['text']) is not None
            and len(current_text) > 3
        )

        if silence_break or overflow_break or limit_break or sentence_break:
            grouped_events.append(current_group)
            current_group = [word]
        else:
            current_group.append(word)
    grouped_events.append(current_group)

    # Pass 2: fold "orphan" events (a single word or very short text) back
    # into the preceding event when they follow closely and the result still
    # fits.
    merged_events = [grouped_events[0]]
    for curr_group in grouped_events[1:]:
        prev_group = merged_events[-1]
        is_orphan = (
            len(curr_group) == 1
            or len(get_group_text(curr_group)) < ORPHAN_MAX_CHARS
        )

        if is_orphan:
            gap = curr_group[0]['start'] - prev_group[-1]['end']
            # The merged event must still respect the maximum duration rule,
            # which is otherwise enforced only in pass 1.
            merged_duration = curr_group[-1]['end'] - prev_group[0]['start']
            if gap < MAX_ORPHAN_GAP and merged_duration <= MAX_DURATION:
                combined_text = get_group_text(prev_group + curr_group)
                formatted = format_text_lines(combined_text, MAX_CHARS_PER_LINE)
                line_limit = MAX_CHARS_PER_LINE + LINE_OVERFLOW_TOLERANCE
                if all(len(line) <= line_limit for line in formatted.split('\n')):
                    prev_group.extend(curr_group)
                    continue

        merged_events.append(curr_group)

    # Pass 3: render the events back to SRT.
    blocks = []
    for index, group in enumerate(merged_events, 1):
        start_time = seconds_to_srt_time(group[0]['start'])
        end_time = seconds_to_srt_time(group[-1]['end'])
        formatted_text = format_text_lines(get_group_text(group), MAX_CHARS_PER_LINE)
        blocks.append(f"{index}\n{start_time} --> {end_time}\n{formatted_text}")

    return "\n\n".join(blocks).strip()
|
|
|
|
|
import subprocess |
|
|
import shutil |
|
|
import os |
|
|
|
|
|
def process_audio_for_transcription(input_file: str, has_bg_music: bool = False, time_start: float = None, time_end: float = None) -> str:
    """
    Process audio to maximize speech clarity.

    Args:
        input_file: Path to input audio
        has_bg_music: If True, uses Demucs to remove background music (slow).
                      If False, skips Demucs but applies voice enhancement filters (fast).
        time_start: Optional start position passed to FFmpeg as ``-ss``.
        time_end: Optional end position passed to FFmpeg as ``-to``.

    Returns:
        Path to the processed .mp3 file under ``static/processed``.
        Falls back to ``input_file`` when FFmpeg is not installed, and to the
        unfiltered vocals/input path when the FFmpeg conversion fails.
    """
    output_dir = os.path.join("static", "processed")
    os.makedirs(output_dir, exist_ok=True)

    input_filename = os.path.basename(input_file)
    input_stem = os.path.splitext(input_filename)[0]

    # Encode the requested time window in the file name so different cuts of
    # the same input do not overwrite each other.
    suffix = ""
    if time_start is not None:
        suffix += f"_s{int(time_start)}"
    if time_end is not None:
        suffix += f"_e{int(time_end)}"

    final_output = os.path.join(output_dir, f"{input_stem}{suffix}.processed.mp3")

    ffmpeg_cmd = shutil.which("ffmpeg")
    if not ffmpeg_cmd:
        print("⚠️ FFmpeg não encontrado!")
        return input_file

    vocals_path = input_file
    # Directory created by Demucs for this track; stays None unless Demucs
    # actually produced the vocals file. Only this directory is ever removed.
    demucs_track_dir = None

    if has_bg_music:
        print("🔊 [Demucs] Iniciando isolamento de voz via AI (has_bg_music=True)...")
        demucs_output_dir = os.path.join("static", "separated")
        os.makedirs(demucs_output_dir, exist_ok=True)

        # Fall back to the bare command name and let subprocess resolve it.
        demucs_cmd = shutil.which("demucs") or "demucs"

        try:
            model = "htdemucs"
            command = [
                demucs_cmd,
                "--two-stems=vocals",
                "-n", model,
                "-d", "cpu",
                "--mp3",
                "--mp3-bitrate", "128",
                input_file,
                "-o", demucs_output_dir,
            ]

            print("🔊 Executando Demucs...")
            result = subprocess.run(command, check=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)

            if result.returncode == 0:
                demucs_vocals = os.path.join(demucs_output_dir, model, input_stem, "vocals.mp3")
                if os.path.exists(demucs_vocals):
                    print(f"✅ Demucs sucesso: {demucs_vocals}")
                    vocals_path = demucs_vocals
                    demucs_track_dir = os.path.dirname(demucs_vocals)
            else:
                print(f"⚠️ Erro no Demucs (Code {result.returncode}), continuando com audio original.")

        except Exception as e:
            # Best effort: any Demucs failure falls back to the original audio.
            print(f"⚠️ Falha no Demucs: {e}")

    else:
        print("⏩ [Demucs] Pulando remoção de música (has_bg_music=False).")

    print("🔊 [FFmpeg] Aplicando filtros de melhoria de voz...")

    # High-pass filter, FFT denoiser, compressor/expander, EQ band around
    # 3 kHz, then loudness normalization.
    filter_chain = (
        "highpass=f=100,"
        "afftdn=nr=10:nf=-50:tn=1,"
        "compand=attacks=0:points=-80/-90|-45/-25|-27/-9|0/-7:gain=5,"
        "equalizer=f=3000:width_type=h:width=1000:g=5,"
        "loudnorm"
    )

    cmd_convert = [
        ffmpeg_cmd, "-y",
        "-i", vocals_path,
    ]

    # Placed after -i, so these act as output options.
    if time_start is not None:
        cmd_convert.extend(["-ss", str(time_start)])
    if time_end is not None:
        cmd_convert.extend(["-to", str(time_end)])

    cmd_convert.extend([
        "-ac", "1", "-ar", "16000",
        "-af", filter_chain,
        "-c:a", "libmp3lame", "-q:a", "2",
        final_output,
    ])

    try:
        subprocess.run(cmd_convert, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
    except Exception as e:
        print(f"⚠️ Erro no FFmpeg: {e}")
        return vocals_path

    # Remove the intermediate Demucs output. This is keyed on the directory
    # Demucs produced rather than on a "separated" substring of the path:
    # the substring test could match the caller's own input path after a
    # Demucs failure and delete the directory containing the input file.
    if demucs_track_dir is not None:
        try:
            shutil.rmtree(demucs_track_dir)
        except OSError:
            # Cleanup is best-effort; a leftover temp folder is harmless.
            pass

    return final_output