Spaces:
Paused
Paused
| import os | |
| import subprocess | |
| import sys | |
| sys.path.append(os.getcwd()) | |
| import glob | |
| import json | |
| import time | |
| from typing import Any | |
| from src.models.taskers.tasker import Tasker | |
| from src.models.utils.dirs import * | |
| from src.models.utils.logging import get_logger | |
| logger = get_logger(name='Embedder', is_stream=True,) | |
| class Embedder(Tasker): | |
| def __init__(self): | |
| super().__init__() | |
| def do(self, samples: dict, *args, **kwargs) -> Any: | |
| _f_output_paths = [] | |
| _o_output_paths = [] | |
| decode_file = glob.glob(_DECODE_DIR + '/hypo.json')[0] | |
| with open(decode_file, 'r') as f: | |
| _hypo_dict = json.load(f) | |
| _output_path = os.path.join(_FINAL_RESULT_DIR, 'output.mp4') | |
| _srt_path = os.path.join(_FINAL_RESULT_DIR, 'subtitle.srt') | |
| if not any(_hypo_dict['hypo']): | |
| logger.critical('Empty transcript is predicted') | |
| raise RuntimeError() | |
| f = open(_srt_path, 'w') | |
| for utt_id, hypo in sorted(zip(_hypo_dict['utt_id'], _hypo_dict['hypo']), key=lambda x: x[0]): | |
| index = int(utt_id.split('/')[0]) | |
| timestamp = samples[index]['timestamp'] | |
| self._append_to_srt(f=f, index=index, subtitle=hypo, timestamp=timestamp) | |
| f.close() | |
| self._embed_subtitle(samples['result_video_path'], _srt_path, _output_path) | |
| return _output_path | |
| def _embed_subtitle( | |
| self, | |
| video_path: str, | |
| subtitle_path: str, | |
| output_path: str, | |
| ) -> str: | |
| if not os.path.isfile(video_path): | |
| logger.critical("Video file is not exist.") | |
| raise RuntimeError() | |
| if not os.path.isfile(subtitle_path): | |
| logger.critical("Subtitle file is not exist.") | |
| raise RuntimeError() | |
| _cmd = [ | |
| 'ffmpeg', '-y', | |
| '-loglevel', 'panic', | |
| '-i', video_path, | |
| '-vf', f"subtitles={subtitle_path}:force_style='PrimaryColour=&HFFFFFF,BorderStyle=4,BackColour=0'", | |
| output_path, | |
| ] | |
| subprocess.run(_cmd, shell=False, capture_output=False, stdout=None) | |
| if not os.path.isfile(output_path): | |
| logger.warning(f"Add subtitle into video '{video_path}' fail.") | |
| raise RuntimeError() | |
| return output_path | |
| def _append_to_srt(self, f, index: int, subtitle: str, timestamp: tuple): | |
| _start_time_stamp = time.strftime('%H:%M:%S,000', time.gmtime(timestamp[0])) | |
| _end_time_stamp = time.strftime('%H:%M:%S,000', time.gmtime(timestamp[1])) | |
| f.write(str(index) + '\n') | |
| f.write(_start_time_stamp + ' --> ' + _end_time_stamp + '\n') | |
| f.write(subtitle + '\n') | |
| f.write('\n') |