habulaj commited on
Commit
0ce9d36
·
verified ·
1 Parent(s): 0898be9

Delete srt_utils.py

Browse files
Files changed (1) hide show
  1. srt_utils.py +0 -445
srt_utils.py DELETED
@@ -1,445 +0,0 @@
1
- import re
2
-
3
def srt_time_to_seconds(timestamp):
    """Convert an SRT timestamp (HH:MM:SS,mmm) to seconds.

    Returns 0.0 for malformed input instead of raising, matching the
    forgiving behavior the SRT parser relies on.
    """
    try:
        time_part, ms_part = timestamp.split(",")
        h, m, s = map(int, time_part.split(":"))
        ms = int(ms_part)
    except (ValueError, AttributeError):
        # ValueError: wrong separators / non-numeric fields.
        # AttributeError: a non-string argument (no .split).
        # Previously a bare `except:` — that also swallowed
        # KeyboardInterrupt/SystemExit, which we must not do.
        return 0.0
    return h * 3600 + m * 60 + s + ms / 1000.0
12
-
13
def seconds_to_srt_time(seconds):
    """Convert non-negative seconds to an SRT timestamp (HH:MM:SS,mmm).

    Rounds to the nearest millisecond FIRST: the old per-field
    truncation turned e.g. 1.9999999 into "00:00:01,999" instead of
    "00:00:02,000" because of float representation error.
    """
    total_ms = round(seconds * 1000)
    hours, remainder = divmod(total_ms, 3_600_000)
    minutes, remainder = divmod(remainder, 60_000)
    secs, ms = divmod(remainder, 1000)
    return f"{hours:02d}:{minutes:02d}:{secs:02d},{ms:03d}"
20
-
21
def shift_srt_timestamps(srt_content, offset_seconds):
    """Shift every timestamp in an SRT document by offset_seconds.

    Returns the original content unchanged when nothing parses.
    """
    subs = parse_srt(srt_content)
    if not subs:
        return srt_content

    blocks = []
    for index, sub in enumerate(subs, start=1):
        # Clamp shifted times: starts may not go negative, and ends are
        # kept at least 1 ms to avoid zero-duration overlap issues.
        start = max(sub['start'] + offset_seconds, 0)
        end = max(sub['end'] + offset_seconds, 1e-3)

        blocks.append(
            f"{index}\n"
            f"{seconds_to_srt_time(start)} --> {seconds_to_srt_time(end)}\n"
            f"{sub['text']}\n"
        )

    # Joining with "\n" recreates the blank line between cues; strip the
    # trailing newline exactly like the original concatenation did.
    return "\n".join(blocks).strip()
42
-
43
def parse_srt(srt_content):
    """Parse SRT content into a validated list of subtitle dicts.

    Each entry carries 'start' and 'end' in seconds (float) plus the
    stripped 'text'. Invalid cues simply don't match and are dropped.
    """
    # Text lines are consumed until the next "index\ntimestamp" header
    # (the negative lookahead stops the text capture there).
    pattern = re.compile(
        r"(\d+)\s*\n([^-\n]+?) --> ([^-\n]+?)\s*\n((?:(?!\d+\s*\n\d{1,2}:\d{2}).+\n?)*)",
        re.MULTILINE,
    )

    return [
        {
            'start': srt_time_to_seconds(raw_start.strip()),
            'end': srt_time_to_seconds(raw_end.strip()),
            'text': body.strip(),
        }
        for _num, raw_start, raw_end, body in pattern.findall(srt_content)
    ]
56
-
57
def format_text_lines(text, max_chars=42):
    """Format text into at most two lines of <= max_chars each.

    Lines longer than FORCE_SPLIT_THRESHOLD are split even when they
    would still fit on one line, preferring a balanced, slightly
    bottom-heavy ("pyramid") shape. Lines are joined with '\\n'.

    Fix: removed a dead-code block that recomputed line1/line2 and then
    fell through to `pass` before recomputing them again identically.
    """
    words = text.split()
    if not words:
        return ""

    # A single line longer than this reads poorly on screen, so we try
    # to split even though it would fit within max_chars.
    FORCE_SPLIT_THRESHOLD = 30

    if len(text) <= max_chars and len(text) <= FORCE_SPLIT_THRESHOLD:
        return text

    # Find the word boundary that produces the most balanced two lines.
    best_split_idx = -1
    best_balance = float('inf')

    for i in range(1, len(words)):
        line1 = " ".join(words[:i])
        line2 = " ".join(words[i:])

        # Only splits where BOTH halves fit are candidates.
        if len(line1) <= max_chars and len(line2) <= max_chars:
            balance = abs(len(line2) - len(line1))
            # Bonus for a bottom-heavy split (pyramid shape reads better).
            if len(line2) >= len(line1):
                balance -= 5

            if balance < best_balance:
                best_balance = balance
                best_split_idx = i

    if best_split_idx != -1:
        line1 = " ".join(words[:best_split_idx])
        line2 = " ".join(words[best_split_idx:])
        return f"{line1}\n{line2}"

    # No valid split (e.g. a single word exceeds max_chars on one side)
    # but the whole text still fits on one line.
    if len(text) <= max_chars:
        return text

    # Last resort for very long unsplittable text: split at the middle word.
    mid = len(words) // 2
    return " ".join(words[:mid]) + "\n" + " ".join(words[mid:])
117
-
118
def fix_word_timing(words):
    """Make word timings strictly sequential (mutates and returns `words`).

    On overlap, the previous word's end is trimmed back to the current
    word's start; if that trim would leave the previous word (almost)
    zero-length, the current word is delayed instead. Every word is then
    guaranteed a minimum 100 ms duration.
    """
    if not words:
        return []

    for prev, curr in zip(words, words[1:]):
        if curr['start'] < prev['end']:
            # Preferred fix: shorten prev so curr keeps its start time.
            trimmed_end = max(prev['start'], curr['start'])

            if trimmed_end > prev['start'] + 0.01:
                prev['end'] = trimmed_end
            else:
                # curr starts at/before prev's start; trimming is
                # impossible, so delay curr to keep text order sequential.
                curr['start'] = prev['end']

        # Enforce a minimum duration of 100 ms per word.
        if curr['end'] <= curr['start']:
            curr['end'] = curr['start'] + 0.1

    return words
157
-
158
def apply_netflix_style_filter(srt_content):
    """
    Group word-level subtitles into Netflix-style phrase subtitles.

    Rules enforced:
    - Max 42 chars/line, max 2 lines (84 chars total, soft-capped at 70)
    - Max cue duration 7 s
    - New cue on silences > 0.5 s and after sentence-ending punctuation
    - Short "orphan" cues are merged back into the previous cue

    Returns the regrouped content as an SRT string; returns the input
    unchanged when nothing parses.
    """
    words = parse_srt(srt_content)
    if not words:
        return srt_content

    # Normalize overlapping/inverted word timings before grouping.
    words = fix_word_timing(words)

    grouped_events = []
    current_group = []

    MAX_CHARS_PER_LINE = 42
    MAX_LINES = 2
    MAX_TOTAL_CHARS = MAX_CHARS_PER_LINE * MAX_LINES
    MAX_DURATION = 7.0
    MIN_GAP_FOR_SPLIT = 0.5  # seconds of silence that force a new cue

    def get_group_text(group):
        # Joined display text for a group of word entries.
        return " ".join(w['text'] for w in group)

    def get_group_duration(group):
        # Total span from first word start to last word end (0 if empty).
        if not group: return 0
        return group[-1]['end'] - group[0]['start']

    for i, word in enumerate(words):
        if not current_group:
            current_group.append(word)
            continue

        last_word = current_group[-1]

        # 1. Silence check: a long gap ends the current cue.
        gap = word['start'] - last_word['end']
        if gap > MIN_GAP_FOR_SPLIT:
            grouped_events.append(current_group)
            current_group = [word]
            continue

        # 2. Size/duration limits, projected as if `word` were added.
        current_text = get_group_text(current_group)
        new_text_proj = current_text + " " + word['text']
        current_duration = last_word['end'] - current_group[0]['start']
        new_duration_proj = word['end'] - current_group[0]['start']

        # Prefer single-line cues: when adding the word crosses the
        # 42-char single-line boundary, decide between breaking the cue
        # now or allowing it to grow onto a second line.
        if len(new_text_proj) > MAX_CHARS_PER_LINE:
            # Reason A to break: the cue already has enough screen time.
            is_long_enough_dur = current_duration > 1.0

            # Reason B to break: the projected text is getting huge.
            # Netflix allows up to 84 chars over 2 lines, but we cap the
            # two-line total at 70 for tighter separation.
            is_too_huge = len(new_text_proj) > 70

            if is_long_enough_dur or is_too_huge:
                grouped_events.append(current_group)
                current_group = [word]
                continue

            # Otherwise allow the merge onto a 2nd line (fast speech,
            # short-duration cue).

        # Hard limits (absolute 2-line char cap and max duration).
        if len(new_text_proj) > MAX_TOTAL_CHARS or new_duration_proj > MAX_DURATION:
            grouped_events.append(current_group)
            current_group = [word]
            continue

        # 3. Sentence endings: if the previous word ended a sentence,
        # always split — unless the current cue is tiny (<= 3 chars).
        if re.search(r'[.!?]$', last_word['text']):
            if len(current_text) > 3:
                grouped_events.append(current_group)
                current_group = [word]
                continue

        current_group.append(word)

    if current_group:
        grouped_events.append(current_group)

    # --- POST-PROCESSING: merge orphans ---
    # Fold single/short cues into the previous cue when they are close
    # in time and the combined text still formats within limits.
    merged_events = []
    if grouped_events:
        merged_events.append(grouped_events[0])

    for i in range(1, len(grouped_events)):
        prev_group = merged_events[-1]
        curr_group = grouped_events[i]

        # Orphan candidate: a single word, or very short text (< 10 chars).
        curr_text = get_group_text(curr_group)
        is_orphan = len(curr_group) == 1 or len(curr_text) < 10

        if is_orphan:
            gap = curr_group[0]['start'] - prev_group[-1]['end']

            # Treat < 1.0 s as "close enough" to be a continuation.
            if gap < 1.0:
                # Simulate the merge to check it still fits 2 lines.
                combined_text = get_group_text(prev_group + curr_group)
                formatted = format_text_lines(combined_text, MAX_CHARS_PER_LINE)
                lines = formatted.split('\n')

                # format_text_lines normally yields at most 2 lines; we
                # still verify per-line length (with a small tolerance)
                # in case its fallback produced an over-long line.
                valid_merge = True
                for line in lines:
                    if len(line) > MAX_CHARS_PER_LINE + 5:  # tolerance
                        valid_merge = False
                        break

                if valid_merge:
                    # Merge the orphan into the previous cue.
                    prev_group.extend(curr_group)
                    continue

        # Not merged: keep as its own cue.
        merged_events.append(curr_group)

    # Emit the final SRT document.
    output_srt = ""
    for i, group in enumerate(merged_events, 1):
        if not group: continue

        start_time = seconds_to_srt_time(group[0]['start'])
        end_time = seconds_to_srt_time(group[-1]['end'])

        text = get_group_text(group)
        formatted_text = format_text_lines(text, MAX_CHARS_PER_LINE)

        output_srt += f"{i}\n{start_time} --> {end_time}\n{formatted_text}\n\n"

    return output_srt.strip()
318
-
319
- import subprocess
320
- import shutil
321
- import os
322
-
323
def process_audio_for_transcription(input_file: str, has_bg_music: bool = False, time_start: float = None, time_end: float = None) -> str:
    """
    Process audio to maximize speech clarity before transcription.

    Args:
        input_file: Path to the input audio file.
        has_bg_music: If True, runs Demucs to strip background music
            (slow). If False, skips Demucs; FFmpeg voice-enhancement
            filters are applied either way.
        time_start: Optional trim start in seconds (None = no trim).
        time_end: Optional trim end in seconds (None = no trim).

    Returns:
        Path to the processed mono 16 kHz .mp3, or the input path if
        FFmpeg is unavailable, or the (possibly Demucs-separated) vocals
        path if the FFmpeg conversion fails.
    """

    # Output directory for processed files.
    output_dir = os.path.join("static", "processed")
    os.makedirs(output_dir, exist_ok=True)

    input_filename = os.path.basename(input_file)
    input_stem = os.path.splitext(input_filename)[0]

    # Encode trim bounds into the filename so differently-trimmed runs
    # don't collide on a cached/stale output file.
    suffix = ""
    if time_start is not None: suffix += f"_s{int(time_start)}"
    if time_end is not None: suffix += f"_e{int(time_end)}"

    final_output = os.path.join(output_dir, f"{input_stem}{suffix}.processed.mp3")

    ffmpeg_cmd = shutil.which("ffmpeg")
    if not ffmpeg_cmd:
        # Without FFmpeg we cannot process at all — hand back the input.
        print("⚠️ FFmpeg não encontrado!")
        return input_file

    vocals_path = input_file

    # 1. Background-music removal (Demucs) — optional, best-effort:
    # any failure falls through with the original audio.
    if has_bg_music:
        print(f"🔊 [Demucs] Iniciando isolamento de voz via AI (has_bg_music=True)...")
        demucs_output_dir = os.path.join("static", "separated")
        os.makedirs(demucs_output_dir, exist_ok=True)

        # Prefer an absolute path; fall back to the bare command name
        # and let the OS resolve it via PATH.
        demucs_cmd = shutil.which("demucs")
        if not demucs_cmd:
            demucs_cmd = "demucs" # Fallback to path alias

        try:
            model = "htdemucs"
            command = [
                demucs_cmd,
                "--two-stems=vocals",
                "-n", model,
                "-d", "cpu",
                "--mp3",
                "--mp3-bitrate", "128",
                input_file,
                "-o", demucs_output_dir
            ]

            print(f"🔊 Executando Demucs...")
            # check=False: a non-zero exit is handled below, not raised.
            result = subprocess.run(command, check=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)

            if result.returncode == 0:
                # Demucs layout: <out_dir>/<model>/<input_stem>/vocals.mp3
                demucs_vocals = os.path.join(demucs_output_dir, model, input_stem, "vocals.mp3")
                if os.path.exists(demucs_vocals):
                    print(f"✅ Demucs sucesso: {demucs_vocals}")
                    vocals_path = demucs_vocals
            else:
                print(f"⚠️ Erro no Demucs (Code {result.returncode}), continuando com audio original.")

        except Exception as e:
            # e.g. demucs binary missing entirely — keep the original audio.
            print(f"⚠️ Falha no Demucs: {e}")

    else:
        print(f"⏩ [Demucs] Pulando remoção de música (has_bg_music=False).")

    # 2. Voice enhancement (FFmpeg filters) — always runs.
    print(f"🔊 [FFmpeg] Aplicando filtros de melhoria de voz...")

    # Filter chain: high-pass (rumble), spectral denoise, compand
    # (dynamic-range compression), presence EQ boost around 3 kHz,
    # and loudness normalization.
    filter_chain = (
        "highpass=f=100,"
        "afftdn=nr=10:nf=-50:tn=1,"
        "compand=attacks=0:points=-80/-90|-45/-25|-27/-9|0/-7:gain=5,"
        "equalizer=f=3000:width_type=h:width=1000:g=5,"
        "loudnorm"
    )

    cmd_convert = [
        ffmpeg_cmd, "-y",
        "-i", vocals_path,
    ]

    # Trim if requested. -ss/-to are placed AFTER -i (output seeking)
    # for frame-accurate cuts at the cost of decoding from the start.
    if time_start is not None:
        cmd_convert.extend(["-ss", str(time_start)])
    if time_end is not None:
        cmd_convert.extend(["-to", str(time_end)])

    # Mono, 16 kHz — the shape speech-to-text models typically expect.
    cmd_convert.extend([
        "-ac", "1", "-ar", "16000",
        "-af", filter_chain,
        "-c:a", "libmp3lame", "-q:a", "2",
        final_output
    ])

    try:
        subprocess.run(cmd_convert, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)

        # Best-effort cleanup of the Demucs separation folder if used.
        if has_bg_music and "separated" in vocals_path:
            try:
                # vocals.mp3's parent directory is the per-song folder.
                song_folder = os.path.dirname(vocals_path)
                shutil.rmtree(song_folder)
            except: pass  # NOTE(review): bare except — cleanup is deliberately best-effort

        return final_output

    except Exception as e:
        # FFmpeg failed: return the best audio we have so far.
        print(f"⚠️ Erro no FFmpeg: {e}")
        return vocals_path