VeuReu commited on
Commit
e653427
·
1 Parent(s): 9167d9a

Upload utils.py

Browse files
Files changed (1) hide show
  1. utils.py +211 -211
utils.py CHANGED
@@ -1,214 +1,214 @@
1
- # utils.py
2
- import os
3
- import yaml
4
- import subprocess
5
- from pathlib import Path
6
- from dataclasses import dataclass
7
- import shlex # Para manejar argumentos de línea de comandos de forma segura
8
- from __future__ import annotations
9
- from dataclasses import dataclass
10
- from typing import List, Optional, Callable
11
- import re
12
- import xml.etree.ElementTree as ET
13
-
14
-
15
- def incrustar_subtitulos_ffmpeg(
16
- input_video_path: str,
17
- srt_path: str,
18
- output_video_path: str,
19
- codificar_audio: bool = False,
20
- sobreescribir: bool = False
21
- ):
22
- """
23
- Incrusta (quema/hardsub) un archivo SRT sobre un vídeo MP4 usando FFmpeg.
24
-
25
- IMPORTANTE: Requiere que FFmpeg esté instalado en el sistema.
26
-
27
- :param input_video_path: Ruta al archivo de vídeo MP4 de entrada.
28
- :param srt_path: Ruta al archivo de subtítulos SRT.
29
- :param output_video_path: Ruta donde se guardará el nuevo vídeo con subtítulos.
30
- :param codificar_audio: Si es True, recodifica el audio (más lento, pero a veces soluciona problemas).
31
- Si es False (por defecto), copia el stream de audio (-c:a copy).
32
- :param sobreescribir: Si es True, permite sobreescribir el archivo de salida si ya existe.
33
- :raises FileNotFoundError: Si no se encuentra FFmpeg o alguno de los archivos de entrada.
34
- :raises subprocess.CalledProcessError: Si FFmpeg devuelve un error durante el proceso.
35
- """
36
-
37
- if not os.path.exists(input_video_path):
38
- raise FileNotFoundError(f"Vídeo de entrada no encontrado: {input_video_path}")
39
- if not os.path.exists(srt_path):
40
- raise FileNotFoundError(f"Archivo SRT no encontrado: {srt_path}")
41
-
42
- # Comando base de FFmpeg para incrustar subtítulos
43
- # -i: archivo de entrada
44
- # -vf: filtro de vídeo, usamos 'subtitles=' para quemar el srt
45
- # -c:v: copia el stream de vídeo original (rápido, sin recodificar el vídeo)
46
-
47
- # IMPORTANTE: Para que FFmpeg pueda quemar subtítulos, debe usar el filtro 'subtitles'
48
- # y *no* copiar el stream de vídeo (-c:v copy). Debemos recodificar el vídeo (-c:v libx264).
49
- # Solo copiaremos el audio para ahorrar tiempo a menos que se indique lo contrario.
50
-
51
- audio_codec_arg = ["-c:a", "copy"] if not codificar_audio else []
52
-
53
- # Se recomienda usar -c:v libx264 y -preset fast para una buena velocidad/calidad
54
- video_codec_arg = ["-c:v", "libx264", "-preset", "fast"]
55
-
56
- ffmpeg_command = [
57
- "ffmpeg",
58
- "-i", input_video_path,
59
- "-vf", f"subtitles={srt_path}", # Filtro para quemar el SRT
60
- *video_codec_arg,
61
- *audio_codec_arg,
62
- output_video_path
63
- ]
64
-
65
- if sobreescribir:
66
- ffmpeg_command.insert(1, "-y") # Añade la opción de sobreescribir
67
-
68
- print(f"Ejecutando comando FFmpeg: {' '.join(shlex.quote(arg) for arg in ffmpeg_command)}")
69
-
70
- try:
71
- # Ejecutar el comando FFmpeg
72
- subprocess.run(
73
- ffmpeg_command,
74
- check=True, # Lanza CalledProcessError si el código de retorno no es 0
75
- stdout=subprocess.PIPE, # Captura la salida estándar (para no saturar)
76
- stderr=subprocess.PIPE, # Captura la salida de error (donde FFmpeg imprime el progreso)
77
- text=True
78
- )
79
- print(f"\nÉxito: Vídeo con subtítulos guardado en: {output_video_path}")
80
-
81
- except FileNotFoundError:
82
- print("ERROR: El comando 'ffmpeg' no fue encontrado.")
83
- print("Asegúrate de que FFmpeg esté instalado y accesible en tu PATH.")
84
- raise
85
- except subprocess.CalledProcessError as e:
86
- print(f"ERROR: FFmpeg falló con el código {e.returncode}")
87
- print("Salida de error de FFmpeg:")
88
- print(e.stderr)
89
- raise
90
-
91
-
92
- @dataclass
93
- class AppConfig:
94
- app: dict
95
- api: dict
96
- storage: dict
97
- sqlite: dict
98
- security: dict
99
- ui: dict
100
-
101
- def load_config(path: str = "config.yaml") -> AppConfig:
102
- path = Path(path)
103
- if not path.exists():
104
- raise FileNotFoundError(f"No encuentro {path.resolve()}. Sube un config.yaml válido.")
105
- with path.open("r", encoding="utf-8") as f:
106
- cfg = yaml.safe_load(f) or {}
107
- for k in ["app", "api", "storage", "sqlite", "security", "ui"]:
108
- if k not in cfg:
109
- raise ValueError(f"Falta la clave '{k}' en config.yaml")
110
- return AppConfig(**cfg)
111
-
112
- def ensure_dirs(root: str | Path):
113
- root = Path(root)
114
- (root / "uploads").mkdir(parents=True, exist_ok=True)
115
- (root / "results").mkdir(parents=True, exist_ok=True)
116
-
117
- def save_bytes(path: str | Path, data: bytes):
118
- path = Path(path)
119
- path.parent.mkdir(parents=True, exist_ok=True)
120
- with open(path, "wb") as f:
121
- f.write(data)
122
-
123
- def save_text(path: str | Path, text: str):
124
- path = Path(path)
125
- path.parent.mkdir(parents=True, exist_ok=True)
126
- with open(path, "w", encoding="utf-8") as f:
127
- f.write(text)
128
-
129
- def human_size(num_bytes: int) -> str:
130
- units = ["B", "KB", "MB", "GB", "TB", "PB"]
131
- n = float(num_bytes)
132
- for u in units:
133
- if n < 1024.0:
134
- return f"{n:.1f} {u}"
135
- n /= 1024.0
136
- return f"{n:.1f} EB"
137
-
138
- def recortar_video(input_path: str, output_path: str, duracion_segundos: int = 240):
139
- """
140
- Corta los primeros `duracion_segundos` de un MP4 sin recodificar (rápido y sin pérdida).
141
- Requiere ffmpeg en PATH (en conda-forge ya viene).
142
- """
143
- input_path = str(Path(input_path))
144
- output_path = str(Path(output_path))
145
-
146
- cmd = [
147
- "ffmpeg",
148
- "-y", # sobrescribe salida
149
- "-hide_banner",
150
- "-loglevel", "error", # mensajes limpios
151
- "-ss", "0", # inicio
152
- "-i", input_path, # entrada
153
- "-t", str(duracion_segundos), # duración
154
- "-c", "copy", # copia streams sin recodificar
155
- output_path
156
- ]
157
- subprocess.run(cmd, check=True)
158
-
159
-
160
- # ---- Núcleo: SRT -> ESF (XML string) ----
161
-
162
- TIME_RE = re.compile(
163
- r"(?P<start>\d{2}:\d{2}:\d{2}[,\.]\d{3})\s*-->\s*(?P<end>\d{2}:\d{2}:\d{2}[,\.]\d{3})"
164
- )
165
-
166
- @dataclass
167
- class Cue:
168
- index: int
169
- start: str # "HH:MM:SS.mmm"
170
- end: str # "HH:MM:SS.mmm"
171
- text: str
172
-
173
- def _norm_ts(ts: str) -> str:
174
- """Convierte '01:02:03,456' -> '01:02:03.456'."""
175
- return ts.replace(",", ".")
176
-
177
- def _parse_srt(srt_text: str) -> List[Cue]:
178
- """Parsea SRT a una lista de cues normalizados."""
179
- srt_text = srt_text.replace("\r\n", "\n").replace("\r", "\n")
180
- blocks = [b.strip() for b in re.split(r"\n\s*\n", srt_text) if b.strip()]
181
- cues: List[Cue] = []
182
-
183
- for block in blocks:
184
- lines = block.split("\n")
185
- # Detectar si la primera línea es índice
186
- idx = None
187
- if lines and lines[0].strip().isdigit():
188
- idx = int(lines[0].strip())
189
- time_candidates = lines[1:]
190
- else:
191
- idx = len(cues) + 1
192
- time_candidates = lines
193
-
194
- m = None
195
- time_line_idx = None
196
- for i, ln in enumerate(time_candidates[:3]): # robustez
197
- mm = TIME_RE.search(ln)
198
- if mm:
199
- m = mm
200
- time_line_idx = i
201
- break
202
- if not m:
203
- raise ValueError(f"Bloque SRT sin tiempos válidos (índice {idx}):\n{block}")
204
-
205
- start = _norm_ts(m.group("start"))
206
- end = _norm_ts(m.group("end"))
207
- text_lines = time_candidates[time_line_idx + 1 :]
208
- text = "\n".join(text_lines).strip()
209
-
210
- cues.append(Cue(index=idx, start=start, end=end, text=text))
211
-
212
  # Re-indexar por si venía desordenado
213
  for i, c in enumerate(cues, 1):
214
  c.index = i
 
1
+ from __future__ import annotations
2
+
3
+ # utils.py
4
+ import os
5
+ import yaml
6
+ import subprocess
7
+ from pathlib import Path
8
+ from dataclasses import dataclass
9
+ import shlex # Para manejar argumentos de línea de comandos de forma segura
10
+ from typing import List, Optional, Callable
11
+ import re
12
+ import xml.etree.ElementTree as ET
13
+
14
+
15
+ def incrustar_subtitulos_ffmpeg(
16
+ input_video_path: str,
17
+ srt_path: str,
18
+ output_video_path: str,
19
+ codificar_audio: bool = False,
20
+ sobreescribir: bool = False
21
+ ):
22
+ """
23
+ Incrusta (quema/hardsub) un archivo SRT sobre un vídeo MP4 usando FFmpeg.
24
+
25
+ IMPORTANTE: Requiere que FFmpeg esté instalado en el sistema.
26
+
27
+ :param input_video_path: Ruta al archivo de vídeo MP4 de entrada.
28
+ :param srt_path: Ruta al archivo de subtítulos SRT.
29
+ :param output_video_path: Ruta donde se guardará el nuevo vídeo con subtítulos.
30
+ :param codificar_audio: Si es True, recodifica el audio (más lento, pero a veces soluciona problemas).
31
+ Si es False (por defecto), copia el stream de audio (-c:a copy).
32
+ :param sobreescribir: Si es True, permite sobreescribir el archivo de salida si ya existe.
33
+ :raises FileNotFoundError: Si no se encuentra FFmpeg o alguno de los archivos de entrada.
34
+ :raises subprocess.CalledProcessError: Si FFmpeg devuelve un error durante el proceso.
35
+ """
36
+
37
+ if not os.path.exists(input_video_path):
38
+ raise FileNotFoundError(f"Vídeo de entrada no encontrado: {input_video_path}")
39
+ if not os.path.exists(srt_path):
40
+ raise FileNotFoundError(f"Archivo SRT no encontrado: {srt_path}")
41
+
42
+ # Comando base de FFmpeg para incrustar subtítulos
43
+ # -i: archivo de entrada
44
+ # -vf: filtro de vídeo, usamos 'subtitles=' para quemar el srt
45
+ # -c:v: copia el stream de vídeo original (rápido, sin recodificar el vídeo)
46
+
47
+ # IMPORTANTE: Para que FFmpeg pueda quemar subtítulos, debe usar el filtro 'subtitles'
48
+ # y *no* copiar el stream de vídeo (-c:v copy). Debemos recodificar el vídeo (-c:v libx264).
49
+ # Solo copiaremos el audio para ahorrar tiempo a menos que se indique lo contrario.
50
+
51
+ audio_codec_arg = ["-c:a", "copy"] if not codificar_audio else []
52
+
53
+ # Se recomienda usar -c:v libx264 y -preset fast para una buena velocidad/calidad
54
+ video_codec_arg = ["-c:v", "libx264", "-preset", "fast"]
55
+
56
+ ffmpeg_command = [
57
+ "ffmpeg",
58
+ "-i", input_video_path,
59
+ "-vf", f"subtitles={srt_path}", # Filtro para quemar el SRT
60
+ *video_codec_arg,
61
+ *audio_codec_arg,
62
+ output_video_path
63
+ ]
64
+
65
+ if sobreescribir:
66
+ ffmpeg_command.insert(1, "-y") # Añade la opción de sobreescribir
67
+
68
+ print(f"Ejecutando comando FFmpeg: {' '.join(shlex.quote(arg) for arg in ffmpeg_command)}")
69
+
70
+ try:
71
+ # Ejecutar el comando FFmpeg
72
+ subprocess.run(
73
+ ffmpeg_command,
74
+ check=True, # Lanza CalledProcessError si el código de retorno no es 0
75
+ stdout=subprocess.PIPE, # Captura la salida estándar (para no saturar)
76
+ stderr=subprocess.PIPE, # Captura la salida de error (donde FFmpeg imprime el progreso)
77
+ text=True
78
+ )
79
+ print(f"\nÉxito: Vídeo con subtítulos guardado en: {output_video_path}")
80
+
81
+ except FileNotFoundError:
82
+ print("ERROR: El comando 'ffmpeg' no fue encontrado.")
83
+ print("Asegúrate de que FFmpeg esté instalado y accesible en tu PATH.")
84
+ raise
85
+ except subprocess.CalledProcessError as e:
86
+ print(f"ERROR: FFmpeg falló con el código {e.returncode}")
87
+ print("Salida de error de FFmpeg:")
88
+ print(e.stderr)
89
+ raise
90
+
91
+
92
+ @dataclass
93
+ class AppConfig:
94
+ app: dict
95
+ api: dict
96
+ storage: dict
97
+ sqlite: dict
98
+ security: dict
99
+ ui: dict
100
+
101
+ def load_config(path: str = "config.yaml") -> AppConfig:
102
+ path = Path(path)
103
+ if not path.exists():
104
+ raise FileNotFoundError(f"No encuentro {path.resolve()}. Sube un config.yaml válido.")
105
+ with path.open("r", encoding="utf-8") as f:
106
+ cfg = yaml.safe_load(f) or {}
107
+ for k in ["app", "api", "storage", "sqlite", "security", "ui"]:
108
+ if k not in cfg:
109
+ raise ValueError(f"Falta la clave '{k}' en config.yaml")
110
+ return AppConfig(**cfg)
111
+
112
+ def ensure_dirs(root: str | Path):
113
+ root = Path(root)
114
+ (root / "uploads").mkdir(parents=True, exist_ok=True)
115
+ (root / "results").mkdir(parents=True, exist_ok=True)
116
+
117
+ def save_bytes(path: str | Path, data: bytes):
118
+ path = Path(path)
119
+ path.parent.mkdir(parents=True, exist_ok=True)
120
+ with open(path, "wb") as f:
121
+ f.write(data)
122
+
123
+ def save_text(path: str | Path, text: str):
124
+ path = Path(path)
125
+ path.parent.mkdir(parents=True, exist_ok=True)
126
+ with open(path, "w", encoding="utf-8") as f:
127
+ f.write(text)
128
+
129
+ def human_size(num_bytes: int) -> str:
130
+ units = ["B", "KB", "MB", "GB", "TB", "PB"]
131
+ n = float(num_bytes)
132
+ for u in units:
133
+ if n < 1024.0:
134
+ return f"{n:.1f} {u}"
135
+ n /= 1024.0
136
+ return f"{n:.1f} EB"
137
+
138
+ def recortar_video(input_path: str, output_path: str, duracion_segundos: int = 240):
139
+ """
140
+ Corta los primeros `duracion_segundos` de un MP4 sin recodificar (rápido y sin pérdida).
141
+ Requiere ffmpeg en PATH (en conda-forge ya viene).
142
+ """
143
+ input_path = str(Path(input_path))
144
+ output_path = str(Path(output_path))
145
+
146
+ cmd = [
147
+ "ffmpeg",
148
+ "-y", # sobrescribe salida
149
+ "-hide_banner",
150
+ "-loglevel", "error", # mensajes limpios
151
+ "-ss", "0", # inicio
152
+ "-i", input_path, # entrada
153
+ "-t", str(duracion_segundos), # duración
154
+ "-c", "copy", # copia streams sin recodificar
155
+ output_path
156
+ ]
157
+ subprocess.run(cmd, check=True)
158
+
159
+
160
+ # ---- Núcleo: SRT -> ESF (XML string) ----
161
+
162
+ TIME_RE = re.compile(
163
+ r"(?P<start>\d{2}:\d{2}:\d{2}[,\.]\d{3})\s*-->\s*(?P<end>\d{2}:\d{2}:\d{2}[,\.]\d{3})"
164
+ )
165
+
166
+ @dataclass
167
+ class Cue:
168
+ index: int
169
+ start: str # "HH:MM:SS.mmm"
170
+ end: str # "HH:MM:SS.mmm"
171
+ text: str
172
+
173
+ def _norm_ts(ts: str) -> str:
174
+ """Convierte '01:02:03,456' -> '01:02:03.456'."""
175
+ return ts.replace(",", ".")
176
+
177
+ def _parse_srt(srt_text: str) -> List[Cue]:
178
+ """Parsea SRT a una lista de cues normalizados."""
179
+ srt_text = srt_text.replace("\r\n", "\n").replace("\r", "\n")
180
+ blocks = [b.strip() for b in re.split(r"\n\s*\n", srt_text) if b.strip()]
181
+ cues: List[Cue] = []
182
+
183
+ for block in blocks:
184
+ lines = block.split("\n")
185
+ # Detectar si la primera línea es índice
186
+ idx = None
187
+ if lines and lines[0].strip().isdigit():
188
+ idx = int(lines[0].strip())
189
+ time_candidates = lines[1:]
190
+ else:
191
+ idx = len(cues) + 1
192
+ time_candidates = lines
193
+
194
+ m = None
195
+ time_line_idx = None
196
+ for i, ln in enumerate(time_candidates[:3]): # robustez
197
+ mm = TIME_RE.search(ln)
198
+ if mm:
199
+ m = mm
200
+ time_line_idx = i
201
+ break
202
+ if not m:
203
+ raise ValueError(f"Bloque SRT sin tiempos válidos (índice {idx}):\n{block}")
204
+
205
+ start = _norm_ts(m.group("start"))
206
+ end = _norm_ts(m.group("end"))
207
+ text_lines = time_candidates[time_line_idx + 1 :]
208
+ text = "\n".join(text_lines).strip()
209
+
210
+ cues.append(Cue(index=idx, start=start, end=end, text=text))
211
+
212
  # Re-indexar por si venía desordenado
213
  for i, c in enumerate(cues, 1):
214
  c.index = i