habulaj commited on
Commit
7463c33
·
verified ·
1 Parent(s): da37cf5

Create srt_utils.py

Browse files
Files changed (1) hide show
  1. srt_utils.py +445 -0
srt_utils.py ADDED
@@ -0,0 +1,445 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import re
2
+
3
def srt_time_to_seconds(timestamp):
    """Convert an SRT timestamp (HH:MM:SS,mmm) to seconds.

    Returns 0.0 for malformed input instead of raising, so callers can
    treat unparseable cues as starting at the origin.
    """
    try:
        time_part, ms_part = timestamp.split(",")
        h, m, s = map(int, time_part.split(":"))
        return h * 3600 + m * 60 + s + int(ms_part) / 1000.0
    except (ValueError, AttributeError):
        # ValueError: wrong field count or non-numeric parts.
        # AttributeError: non-string input (e.g. None).
        # Narrowed from a bare `except:` so real bugs / KeyboardInterrupt
        # are not silently swallowed.
        return 0.0
12
+
13
def seconds_to_srt_time(seconds):
    """Convert seconds to an SRT timestamp (HH:MM:SS,mmm).

    Works in whole milliseconds to avoid float truncation artifacts:
    the old `int((seconds % 1) * 1000)` turned e.g. 1.9995s into
    "...,999" instead of rounding up to 2s. Negative inputs are
    clamped to 00:00:00,000.
    """
    total_ms = max(0, round(seconds * 1000))
    hours, rem = divmod(total_ms, 3_600_000)
    minutes, rem = divmod(rem, 60_000)
    secs, ms = divmod(rem, 1000)
    return f"{hours:02d}:{minutes:02d}:{secs:02d},{ms:03d}"
20
+
21
def shift_srt_timestamps(srt_content, offset_seconds):
    """Shift every timestamp in SRT content by offset_seconds.

    Cues are renumbered sequentially from 1. Shifted start times are
    clamped to be non-negative and end times strictly positive.
    Returns the original content unchanged if it cannot be parsed.
    """
    subs = parse_srt(srt_content)
    if not subs:
        return srt_content

    blocks = []  # joined once at the end instead of quadratic `+=`
    for i, sub in enumerate(subs, 1):
        start = max(0.0, sub['start'] + offset_seconds)
        # Keep the end strictly > 0 so zero-length cues don't trip
        # downstream overlap logic.
        end = max(1e-3, sub['end'] + offset_seconds)

        start_str = seconds_to_srt_time(start)
        end_str = seconds_to_srt_time(end)
        blocks.append(f"{i}\n{start_str} --> {end_str}\n{sub['text']}\n")

    return "\n".join(blocks).strip()
42
+
43
def parse_srt(srt_content):
    """Parse SRT text into a validated list of cue dictionaries.

    Each entry has 'start' and 'end' in seconds plus the stripped
    'text'. Unparseable content yields an empty list.
    """
    pattern = re.compile(r"(\d+)\s*\n([^-\n]+?) --> ([^-\n]+?)\s*\n((?:(?!\d+\s*\n\d{1,2}:\d{2}).+\n?)*)", re.MULTILINE)
    return [
        {
            'start': srt_time_to_seconds(start.strip()),
            'end': srt_time_to_seconds(end.strip()),
            'text': body.strip(),
        }
        for _index, start, end, body in pattern.findall(srt_content)
    ]
56
+
57
def format_text_lines(text, max_chars=42):
    """Format text into at most 2 lines with balanced lengths.

    A single line is returned only when the text both fits max_chars
    AND is short enough to read comfortably; longer text is split at
    the word boundary that best balances the two lines, with a slight
    preference for a bottom-heavy (pyramid) shape.

    Args:
        text: The text to wrap.
        max_chars: Maximum characters allowed per line.

    Returns:
        The formatted text ("" for empty input), lines joined by a
        newline character.
    """
    words = text.split()
    if not words:
        return ""

    # Even text that fits max_chars is split once it exceeds this
    # threshold, for readability (single 42-char lines read too long).
    FORCE_SPLIT_THRESHOLD = 30

    if len(text) <= max_chars and len(text) <= FORCE_SPLIT_THRESHOLD:
        return text

    # Search every word boundary for the best 2-line split.
    best_split_idx = -1
    best_balance = float('inf')

    for i in range(1, len(words)):
        len1 = len(" ".join(words[:i]))
        len2 = len(" ".join(words[i:]))

        # Only splits where BOTH lines fit max_chars are candidates.
        if len1 <= max_chars and len2 <= max_chars:
            balance = abs(len2 - len1)
            # Prefer bottom-heavy splits (line2 >= line1): pyramid shape.
            if len2 >= len1:
                balance -= 5

            if balance < best_balance:
                best_balance = balance
                best_split_idx = i

    if best_split_idx != -1:
        # (A dead branch here used to recompute line1/line2 and discard
        # them; removed.)
        line1 = " ".join(words[:best_split_idx])
        line2 = " ".join(words[best_split_idx:])
        return f"{line1}\n{line2}"

    # No valid split found (e.g. a single very long word) but the whole
    # text still fits on one line.
    if len(text) <= max_chars:
        return text

    # Last resort for very long unsplittable text: split at the middle word.
    mid = len(words) // 2
    return " ".join(words[:mid]) + "\n" + " ".join(words[mid:])
117
+
118
def fix_word_timing(words):
    """
    Make word timings strictly sequential.

    When two consecutive words overlap, the previous word's end is
    trimmed back to the current word's start whenever that leaves the
    previous word with a non-trivial duration; otherwise the current
    word is delayed to start where the previous one ends. Every word
    is also guaranteed a minimum duration of 100ms. The list is
    modified in place and returned.
    """
    if not words:
        return []

    # Walk consecutive pairs; mutations carry forward because the
    # dicts are shared between iterations.
    for prev, curr in zip(words, words[1:]):
        if curr['start'] < prev['end']:
            # Overlap: prefer trimming prev's end to curr's start.
            trimmed_end = max(prev['start'], curr['start'])

            # Trimming would leave prev (near-)empty only when curr
            # starts at or before prev does — then delay curr instead.
            if trimmed_end <= prev['start'] + 0.01:
                curr['start'] = prev['end']
            else:
                prev['end'] = trimmed_end

        # Enforce a minimum 100ms duration on the current word.
        if curr['end'] <= curr['start']:
            curr['end'] = curr['start'] + 0.1

    return words
157
+
158
def apply_netflix_style_filter(srt_content):
    """
    Group word-level subtitles into Netflix-style phrase cues.

    Rules applied (in order, per word):
      - A silence gap > 0.5s between words always starts a new cue.
      - Crossing 42 chars starts a new cue if the current cue already
        lasts > 1s or the projected text exceeds 70 chars; otherwise a
        second line is allowed.
      - Hard limits: 84 chars total (2 x 42) and 7s duration per cue.
      - Sentence-ending punctuation (. ! ?) ends a cue unless the cue
        text is still tiny (<= 3 chars).
    A post-pass merges short "orphan" cues back into the previous cue
    when the gap is < 1s and the merged text still formats within the
    line limits.

    Returns a rebuilt SRT string; the input is returned unchanged when
    no words can be parsed from it.
    """
    words = parse_srt(srt_content)
    if not words:
        return srt_content

    # Repair overlapping/inverted word timings before any grouping.
    words = fix_word_timing(words)

    grouped_events = []   # list of word-groups, each becoming one cue
    current_group = []    # words accumulated for the cue being built

    MAX_CHARS_PER_LINE = 42
    MAX_LINES = 2
    MAX_TOTAL_CHARS = MAX_CHARS_PER_LINE * MAX_LINES
    MAX_DURATION = 7.0          # seconds, hard cap per cue
    MIN_GAP_FOR_SPLIT = 0.5     # seconds of silence that forces a new cue

    def get_group_text(group):
        # Plain space-join of the words' text fields.
        return " ".join(w['text'] for w in group)

    def get_group_duration(group):
        # NOTE(review): currently unused; durations below are computed inline.
        if not group: return 0
        return group[-1]['end'] - group[0]['start']

    for i, word in enumerate(words):
        if not current_group:
            current_group.append(word)
            continue

        last_word = current_group[-1]

        # 1. Silence gap: a pause longer than MIN_GAP_FOR_SPLIT ends the cue.
        gap = word['start'] - last_word['end']
        if gap > MIN_GAP_FOR_SPLIT:
            grouped_events.append(current_group)
            current_group = [word]
            continue

        # 2. Size/duration projections if this word were appended.
        current_text = get_group_text(current_group)
        new_text_proj = current_text + " " + word['text']
        current_duration = last_word['end'] - current_group[0]['start']
        new_duration_proj = word['end'] - current_group[0]['start']

        # Prefer single-line cues: crossing the one-line boundary is a
        # candidate break point, not an automatic one.
        if len(new_text_proj) > MAX_CHARS_PER_LINE:
            # Break now if the cue already has enough screen time...
            is_long_enough_dur = current_duration > 1.0

            # ...or the projected text is getting huge. Netflix allows up
            # to 84 chars over 2 lines, but we cap 2-line cues at 70 for
            # better separation.
            is_too_huge = len(new_text_proj) > 70

            if is_long_enough_dur or is_too_huge:
                grouped_events.append(current_group)
                current_group = [word]
                continue

            # Otherwise allow spilling into a 2nd line (fast speech).

        # Absolute limits (total chars for 2 lines, max duration).
        if len(new_text_proj) > MAX_TOTAL_CHARS or new_duration_proj > MAX_DURATION:
            grouped_events.append(current_group)
            current_group = [word]
            continue

        # 3. Sentence endings: if the previous word ends a sentence,
        # split — unless the cue is still tiny (e.g. just "No.").
        if re.search(r'[.!?]$', last_word['text']):
            if len(current_text) > 3:
                grouped_events.append(current_group)
                current_group = [word]
                continue

        current_group.append(word)

    if current_group:
        grouped_events.append(current_group)

    # --- Post-processing: merge orphan cues ---
    # Fold single-word / very short cues into the previous cue when they
    # are close in time and the merged text still fits the line limits.
    merged_events = []
    if grouped_events:
        merged_events.append(grouped_events[0])

    for i in range(1, len(grouped_events)):
        prev_group = merged_events[-1]
        curr_group = grouped_events[i]

        # Orphan candidate: a single word, or under 10 chars of text.
        curr_text = get_group_text(curr_group)
        is_orphan = len(curr_group) == 1 or len(curr_text) < 10

        if is_orphan:
            gap = curr_group[0]['start'] - prev_group[-1]['end']

            # < 1.0s gap counts as a "continuation" of the previous cue.
            if gap < 1.0:
                # Simulate the merge and check the formatted result.
                combined_text = get_group_text(prev_group + curr_group)
                formatted = format_text_lines(combined_text, MAX_CHARS_PER_LINE)
                lines = formatted.split('\n')

                # format_text_lines normally emits at most 2 lines; we
                # only sanity-check per-line length here (with a small
                # tolerance for its fallback paths).
                valid_merge = True
                for line in lines:
                    if len(line) > MAX_CHARS_PER_LINE + 5:  # tolerance
                        valid_merge = False
                        break

                if valid_merge:
                    # Merge into the previous cue and drop the orphan.
                    prev_group.extend(curr_group)
                    continue

        # Not merged: keep as its own cue.
        merged_events.append(curr_group)

    # --- Emit the rebuilt SRT ---
    output_srt = ""
    for i, group in enumerate(merged_events, 1):
        if not group: continue

        start_time = seconds_to_srt_time(group[0]['start'])
        end_time = seconds_to_srt_time(group[-1]['end'])

        text = get_group_text(group)
        formatted_text = format_text_lines(text, MAX_CHARS_PER_LINE)

        output_srt += f"{i}\n{start_time} --> {end_time}\n{formatted_text}\n\n"

    return output_srt.strip()
318
+
319
+ import subprocess
320
+ import shutil
321
+ import os
322
+
323
def process_audio_for_transcription(input_file: str, has_bg_music: bool = False, time_start: float = None, time_end: float = None) -> str:
    """
    Process audio to maximize speech clarity.

    Args:
        input_file: Path to the input audio file.
        has_bg_music: If True, run Demucs to strip background music (slow).
            If False, skip Demucs and rely only on FFmpeg voice filters (fast).
        time_start: Optional cut start in seconds (output seeking).
        time_end: Optional cut end in seconds (output seeking).

    Returns:
        Path to the processed .mp3 (vocals). Falls back to the Demucs
        vocals or the original file if FFmpeg fails or is missing.
    """
    # Output directory for processed files.
    output_dir = os.path.join("static", "processed")
    os.makedirs(output_dir, exist_ok=True)

    input_filename = os.path.basename(input_file)
    input_stem = os.path.splitext(input_filename)[0]

    # Suffix the output name with the cut window so different cuts of the
    # same source never collide in the cache.
    suffix = ""
    if time_start is not None:
        suffix += f"_s{int(time_start)}"
    if time_end is not None:
        suffix += f"_e{int(time_end)}"

    final_output = os.path.join(output_dir, f"{input_stem}{suffix}.processed.mp3")

    ffmpeg_cmd = shutil.which("ffmpeg")
    if not ffmpeg_cmd:
        print("⚠️ FFmpeg não encontrado!")
        return input_file

    vocals_path = input_file

    # 1. Background-music removal via Demucs — optional and slow.
    if has_bg_music:
        print(f"🔊 [Demucs] Iniciando isolamento de voz via AI (has_bg_music=True)...")
        demucs_output_dir = os.path.join("static", "separated")
        os.makedirs(demucs_output_dir, exist_ok=True)

        # Prefer the resolved binary; fall back to the bare PATH alias.
        demucs_cmd = shutil.which("demucs") or "demucs"

        try:
            model = "htdemucs"
            command = [
                demucs_cmd,
                "--two-stems=vocals",
                "-n", model,
                "-d", "cpu",
                "--mp3",
                "--mp3-bitrate", "128",
                input_file,
                "-o", demucs_output_dir,
            ]

            print(f"🔊 Executando Demucs...")
            result = subprocess.run(command, check=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)

            # Demucs writes: <out>/<model>/<input_stem>/vocals.mp3
            demucs_vocals = os.path.join(demucs_output_dir, model, input_stem, "vocals.mp3")
            if result.returncode == 0 and os.path.exists(demucs_vocals):
                print(f"✅ Demucs sucesso: {demucs_vocals}")
                vocals_path = demucs_vocals
            else:
                # Covers both a non-zero exit code AND a zero exit with a
                # missing output file (the latter was previously silent).
                print(f"⚠️ Erro no Demucs (Code {result.returncode}), continuando com audio original.")

        except Exception as e:
            print(f"⚠️ Falha no Demucs: {e}")

    else:
        print(f"⏩ [Demucs] Pulando remoção de música (has_bg_music=False).")

    # 2. Voice enhancement (FFmpeg filters) — always runs.
    print(f"🔊 [FFmpeg] Aplicando filtros de melhoria de voz...")

    # Mono 16kHz mp3 with aggressive speech enhancement: highpass, FFT
    # denoising, compression, presence EQ boost, and loudness normalization.
    filter_chain = (
        "highpass=f=100,"
        "afftdn=nr=10:nf=-50:tn=1,"
        "compand=attacks=0:points=-80/-90|-45/-25|-27/-9|0/-7:gain=5,"
        "equalizer=f=3000:width_type=h:width=1000:g=5,"
        "loudnorm"
    )

    cmd_convert = [
        ffmpeg_cmd, "-y",
        "-i", vocals_path,
    ]

    # Cut window placed AFTER -i (output seeking) for frame accuracy.
    if time_start is not None:
        cmd_convert.extend(["-ss", str(time_start)])
    if time_end is not None:
        cmd_convert.extend(["-to", str(time_end)])

    cmd_convert.extend([
        "-ac", "1", "-ar", "16000",
        "-af", filter_chain,
        "-c:a", "libmp3lame", "-q:a", "2",
        final_output,
    ])

    try:
        subprocess.run(cmd_convert, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)

        # Best-effort cleanup of the Demucs song folder if it was used;
        # narrowed from a bare `except:` to filesystem errors only.
        if has_bg_music and "separated" in vocals_path:
            try:
                # vocals.mp3 lives inside the per-song folder — remove it.
                shutil.rmtree(os.path.dirname(vocals_path))
            except OSError:
                pass

        return final_output

    except Exception as e:
        print(f"⚠️ Erro no FFmpeg: {e}")
        return vocals_path