| import re |
|
|
def srt_time_to_seconds(timestamp):
    """Convert an SRT timestamp (``HH:MM:SS,mmm``) to seconds.

    A ``.`` is accepted in place of the ``,`` before the milliseconds, since
    some tools write dot-separated timestamps into SRT files.

    Args:
        timestamp: Timestamp string such as ``"01:02:03,450"``.

    Returns:
        The time in seconds as a float, or ``0.0`` when ``timestamp`` is not
        a string in the expected format.
    """
    try:
        time_part, ms_part = timestamp.replace(".", ",").split(",")
        h, m, s = map(int, time_part.split(":"))
        ms = int(ms_part)
    except (AttributeError, TypeError, ValueError):
        # Malformed or non-string input keeps the historical "fall back to
        # zero" contract, but KeyboardInterrupt/SystemExit now propagate.
        return 0.0
    return h * 3600 + m * 60 + s + ms / 1000.0
|
|
def seconds_to_srt_time(seconds):
    """Convert a number of seconds to an SRT timestamp (``HH:MM:SS,mmm``).

    The value is rounded to the nearest millisecond before being split into
    fields, so binary floating-point noise (e.g. ``1.001`` being stored as
    ``1.000999...``) cannot drop a millisecond, and a carry such as
    ``59.9996`` rolls over into the next second/minute correctly.

    Args:
        seconds: Time offset in seconds (int or float). Negative values are
            clamped to zero because SRT has no negative timestamps.

    Returns:
        The formatted timestamp, e.g. ``"01:01:01,500"``.
    """
    total_ms = max(0, int(round(seconds * 1000)))
    total_secs, ms = divmod(total_ms, 1000)
    total_minutes, secs = divmod(total_secs, 60)
    hours, minutes = divmod(total_minutes, 60)
    return f"{hours:02d}:{minutes:02d}:{secs:02d},{ms:03d}"
|
|
def shift_srt_timestamps(srt_content, offset_seconds):
    """Return ``srt_content`` with every cue moved by ``offset_seconds``.

    Cues are re-parsed with ``parse_srt`` and renumbered from 1. A shifted
    start time is never allowed below 0 and a shifted end time is never
    allowed below 1 ms. If nothing can be parsed, the input is returned
    unchanged.

    Args:
        srt_content: SRT document as a string.
        offset_seconds: Amount to add to each start/end time (may be
            negative).

    Returns:
        The rebuilt SRT text, stripped of leading/trailing whitespace.
    """
    cues = parse_srt(srt_content)
    if not cues:
        return srt_content

    blocks = []
    for index, cue in enumerate(cues, 1):
        # Clamp after shifting so a negative offset cannot yield negative times.
        begin = max(cue['start'] + offset_seconds, 0)
        finish = max(cue['end'] + offset_seconds, 1e-3)

        begin_label = seconds_to_srt_time(begin)
        finish_label = seconds_to_srt_time(finish)
        blocks.append(f"{index}\n{begin_label} --> {finish_label}\n{cue['text']}\n\n")

    return "".join(blocks).strip()
|
|
def parse_srt(srt_content):
    """Parse SRT text into a list of cue dictionaries.

    Each cue found by the regular expression becomes a dict with:
      - ``'start'`` / ``'end'``: the stripped timestamp strings converted by
        ``srt_time_to_seconds``;
      - ``'text'``: the cue body with surrounding whitespace stripped.

    The cue's sequence number is matched but not kept. Text that contains no
    recognisable cue yields an empty list.
    """
    cue_re = re.compile(
        r"(\d+)\s*\n([^-\n]+?) --> ([^-\n]+?)\s*\n((?:(?!\d+\s*\n\d{1,2}:\d{2}).+\n?)*)",
        re.MULTILINE,
    )
    return [
        {
            'start': srt_time_to_seconds(begin.strip()),
            'end': srt_time_to_seconds(finish.strip()),
            'text': body.strip(),
        }
        for _number, begin, finish, body in cue_re.findall(srt_content)
    ]
|
|
def format_text_lines(text, max_chars=42):
    """Lay out subtitle text on at most two lines.

    Whitespace (including embedded newlines) is first collapsed to single
    spaces so the result can never contain more than one line break.

    Rules, in order:
      1. Empty/blank text returns ``""``.
      2. A single word is returned as-is; there is nowhere to break it.
      3. Text no longer than both ``max_chars`` and 30 characters stays on
         one line.
      4. Otherwise the text is split at the word boundary that best balances
         the two line lengths while keeping both within ``max_chars``; a
         second line at least as long as the first is favoured by a
         5-character bonus.
      5. If no split keeps both lines within ``max_chars``, the text is
         split at the middle word and the lines may exceed ``max_chars``.

    Args:
        text: The subtitle text to format.
        max_chars: Maximum preferred characters per line.

    Returns:
        The formatted text, containing at most one ``"\\n"``.
    """
    # Texts longer than this are broken in two even if they fit on one line.
    FORCE_SPLIT_THRESHOLD = 30
    # Score bonus for a second line that is at least as long as the first.
    BOTTOM_HEAVY_BONUS = 5

    words = text.split()
    if not words:
        return ""

    text = " ".join(words)
    total = len(text)

    # A lone word cannot be split; returning it directly avoids emitting a
    # leading blank line, which would terminate the cue early in an SRT file.
    if len(words) == 1:
        return text

    if total <= max_chars and total <= FORCE_SPLIT_THRESHOLD:
        return text

    best_split_idx = None
    best_balance = float('inf')

    # Track the first line's length incrementally instead of re-joining the
    # words for every candidate split. Starts at -1 to cancel the separator
    # counted for the first word.
    len1 = -1
    for i in range(1, len(words)):
        len1 += len(words[i - 1]) + 1
        len2 = total - len1 - 1  # the remaining words minus the split space

        if len1 > max_chars or len2 > max_chars:
            continue

        balance = abs(len2 - len1)
        if len2 >= len1:
            balance -= BOTTOM_HEAVY_BONUS

        # Strict comparison keeps the earliest split on ties.
        if balance < best_balance:
            best_balance = balance
            best_split_idx = i

    if best_split_idx is None:
        # Nothing fits within max_chars: fall back to the middle word.
        best_split_idx = len(words) // 2

    line1 = " ".join(words[:best_split_idx])
    line2 = " ".join(words[best_split_idx:])
    return f"{line1}\n{line2}"
|
|
def fix_word_timing(words):
    """Make word timings sequential and non-degenerate, in place.

    Strategy:
      1. When a word starts before the previous word ends, prefer trimming
         the END of the previous word so the START of the current word is
         preserved.
      2. Only delay the current word (to the previous word's end) when
         trimming would leave the previous word with 10 ms or less.
      3. Any word whose end is not after its start, including the first
         word, gets a 0.1 s duration.

    Args:
        words: List of dicts with numeric ``'start'`` and ``'end'`` keys.
            The dicts are modified in place.

    Returns:
        The same list object that was passed in, or ``[]`` for empty input.
    """
    if not words:
        return []

    MIN_KEPT_DURATION = 0.01   # the previous word must keep more than this after a trim
    FALLBACK_DURATION = 0.1    # duration given to inverted / zero-length words

    # The pairwise loop below only repairs the second element of each pair,
    # so the first word needs its own degenerate-duration check.
    first = words[0]
    if first['end'] <= first['start']:
        first['end'] = first['start'] + FALLBACK_DURATION

    for prev, curr in zip(words, words[1:]):
        if curr['start'] < prev['end']:
            trimmed_end = max(prev['start'], curr['start'])

            if trimmed_end <= prev['start'] + MIN_KEPT_DURATION:
                # Trimming would (nearly) erase the previous word: delay this one.
                curr['start'] = prev['end']
            else:
                prev['end'] = trimmed_end

        if curr['end'] <= curr['start']:
            curr['end'] = curr['start'] + FALLBACK_DURATION

    return words
|
|
def apply_netflix_style_filter(srt_content):
    """Regroup word-level SRT cues into phrase-level subtitles.

    Pipeline:
      1. Parse the cues with ``parse_srt`` and repair their timing with
         ``fix_word_timing``. If nothing parses, the input is returned
         unchanged.
      2. Walk the cues in order, starting a new phrase when any of these
         holds for the cue about to be added:
           - it starts more than 0.5 s after the previous cue ends;
           - the phrase would exceed 42 characters and either the phrase
             already lasts more than 1 s or the result would exceed 70
             characters;
           - the phrase would exceed 84 characters (2 lines x 42) or 7 s;
           - the previous cue ends with ``.``, ``!`` or ``?`` and the phrase
             so far is longer than 3 characters.
      3. Fold "orphan" phrases (a single cue, or fewer than 10 characters)
         back into the preceding phrase when they start less than 1 s after
         it and the combined text still formats into lines of at most 47
         characters.
      4. Render the phrases as numbered SRT cues, with the text laid out by
         ``format_text_lines``.

    Returns:
        The regrouped SRT text, stripped of leading/trailing whitespace.
    """
    cues = parse_srt(srt_content)
    if not cues:
        return srt_content

    cues = fix_word_timing(cues)

    MAX_CHARS_PER_LINE = 42
    MAX_LINES = 2
    MAX_TOTAL_CHARS = MAX_CHARS_PER_LINE * MAX_LINES
    MAX_DURATION = 7.0
    MIN_GAP_FOR_SPLIT = 0.5

    def join_text(group):
        return " ".join(entry['text'] for entry in group)

    def should_break(group, upcoming):
        """Tell whether ``upcoming`` must open a new phrase after ``group``."""
        tail = group[-1]

        # A silence between cues always ends the phrase.
        if upcoming['start'] - tail['end'] > MIN_GAP_FOR_SPLIT:
            return True

        so_far = join_text(group)
        projected_len = len(so_far + " " + upcoming['text'])
        elapsed = tail['end'] - group[0]['start']
        projected_duration = upcoming['end'] - group[0]['start']

        # Past one line: break if the phrase has had enough screen time, or
        # if it is growing well beyond a single line.
        if projected_len > MAX_CHARS_PER_LINE and (elapsed > 1.0 or projected_len > 70):
            return True

        # Hard limits on total size and on-screen duration.
        if projected_len > MAX_TOTAL_CHARS or projected_duration > MAX_DURATION:
            return True

        # Sentence-ending punctuation closes the phrase unless it is tiny.
        if re.search(r'[.!?]$', tail['text']) and len(so_far) > 3:
            return True

        return False

    # Pass 1: greedy grouping of consecutive cues into phrases.
    phrases = []
    active = []
    for cue in cues:
        if active and should_break(active, cue):
            phrases.append(active)
            active = [cue]
        else:
            active.append(cue)
    if active:
        phrases.append(active)

    # Pass 2: fold orphan phrases back into their predecessor when possible.
    merged = phrases[:1]
    for candidate in phrases[1:]:
        anchor = merged[-1]
        is_orphan = len(candidate) == 1 or len(join_text(candidate)) < 10

        if is_orphan and candidate[0]['start'] - anchor[-1]['end'] < 1.0:
            layout = format_text_lines(join_text(anchor + candidate), MAX_CHARS_PER_LINE)
            fits = all(
                len(row) <= MAX_CHARS_PER_LINE + 5 for row in layout.split('\n')
            )
            if fits:
                anchor.extend(candidate)
                continue

        merged.append(candidate)

    # Pass 3: render the phrases as SRT cues.
    blocks = []
    for index, group in enumerate(merged, 1):
        if not group:
            continue

        begin_label = seconds_to_srt_time(group[0]['start'])
        finish_label = seconds_to_srt_time(group[-1]['end'])
        body = format_text_lines(join_text(group), MAX_CHARS_PER_LINE)

        blocks.append(f"{index}\n{begin_label} --> {finish_label}\n{body}\n\n")

    return "".join(blocks).strip()
|
|
| import subprocess |
| import shutil |
| import os |
|
|
def process_audio_for_transcription(input_file: str, has_bg_music: bool = False, time_start: float = None, time_end: float = None) -> str:
    """
    Process audio to maximize speech clarity.

    Optionally isolates the vocals with Demucs, then runs FFmpeg to downmix
    to mono 16 kHz, apply a voice-enhancement filter chain and encode to MP3
    under ``static/processed``.

    Args:
        input_file: Path to input audio
        has_bg_music: If True, uses Demucs to remove background music (slow).
                      If False, skips Demucs but applies voice enhancement filters (fast).
        time_start: Optional start position passed to FFmpeg as ``-ss``.
        time_end: Optional end position passed to FFmpeg as ``-to``.

    Returns:
        Path to the processed .mp3 file on success. If FFmpeg is not
        installed, ``input_file`` is returned untouched. If the FFmpeg run
        fails, the best unprocessed audio available is returned (the Demucs
        vocals when they were produced, otherwise ``input_file``).
    """
    output_dir = os.path.join("static", "processed")
    os.makedirs(output_dir, exist_ok=True)

    input_filename = os.path.basename(input_file)
    input_stem = os.path.splitext(input_filename)[0]

    # Encode the requested time window in the file name so different
    # segments of the same input do not overwrite each other.
    suffix = ""
    if time_start is not None:
        suffix += f"_s{int(time_start)}"
    if time_end is not None:
        suffix += f"_e{int(time_end)}"

    final_output = os.path.join(output_dir, f"{input_stem}{suffix}.processed.mp3")

    ffmpeg_cmd = shutil.which("ffmpeg")
    if not ffmpeg_cmd:
        print("⚠️ FFmpeg não encontrado!")
        return input_file

    vocals_path = input_file
    # Set only when Demucs really produced a vocals file. Cleanup is keyed
    # on this rather than on the path text, so a failed separation can never
    # lead to deleting the directory that holds the user's input file.
    demucs_song_dir = None

    if has_bg_music:
        print("🔊 [Demucs] Iniciando isolamento de voz via AI (has_bg_music=True)...")
        demucs_output_dir = os.path.join("static", "separated")
        os.makedirs(demucs_output_dir, exist_ok=True)

        # Fall back to the bare command name and let subprocess resolve it.
        demucs_cmd = shutil.which("demucs") or "demucs"

        try:
            model = "htdemucs"
            command = [
                demucs_cmd,
                "--two-stems=vocals",
                "-n", model,
                "-d", "cpu",
                "--mp3",
                "--mp3-bitrate", "128",
                input_file,
                "-o", demucs_output_dir
            ]

            print("🔊 Executando Demucs...")
            result = subprocess.run(command, check=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)

            if result.returncode == 0:
                # Expected layout: <out>/<model>/<input stem>/vocals.mp3
                demucs_vocals = os.path.join(demucs_output_dir, model, input_stem, "vocals.mp3")
                if os.path.exists(demucs_vocals):
                    print(f"✅ Demucs sucesso: {demucs_vocals}")
                    vocals_path = demucs_vocals
                    demucs_song_dir = os.path.dirname(demucs_vocals)
            else:
                print(f"⚠️ Erro no Demucs (Code {result.returncode}), continuando com audio original.")

        except Exception as e:
            # Separation is best-effort: continue with the original audio.
            print(f"⚠️ Falha no Demucs: {e}")

    else:
        print("⏩ [Demucs] Pulando remoção de música (has_bg_music=False).")

    print("🔊 [FFmpeg] Aplicando filtros de melhoria de voz...")

    filter_chain = (
        "highpass=f=100,"
        "afftdn=nr=10:nf=-50:tn=1,"
        "compand=attacks=0:points=-80/-90|-45/-25|-27/-9|0/-7:gain=5,"
        "equalizer=f=3000:width_type=h:width=1000:g=5,"
        "loudnorm"
    )

    cmd_convert = [
        ffmpeg_cmd, "-y",
        "-i", vocals_path,
    ]

    # Placed after -i, so these act as output options.
    if time_start is not None:
        cmd_convert.extend(["-ss", str(time_start)])
    if time_end is not None:
        cmd_convert.extend(["-to", str(time_end)])

    cmd_convert.extend([
        "-ac", "1", "-ar", "16000",
        "-af", filter_chain,
        "-c:a", "libmp3lame", "-q:a", "2",
        final_output
    ])

    try:
        subprocess.run(cmd_convert, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
    except Exception as e:
        print(f"⚠️ Erro no FFmpeg: {e}")
        return vocals_path

    # Best-effort removal of the intermediate Demucs stems once the final
    # file exists; failures here must not affect the result.
    if demucs_song_dir is not None:
        shutil.rmtree(demucs_song_dir, ignore_errors=True)

    return final_output