| from asyncio import create_subprocess_exec | |
| from asyncio.subprocess import PIPE | |
| import os | |
| from os import path as ospath, walk | |
| from aiofiles.os import path as aiopath, remove | |
| from aioshutil import move | |
| from .. import LOGGER, cpu_eater_lock, task_dict, task_dict_lock | |
| from ..core.config_manager import BinConfig | |
| from ..helper.ext_utils.bot_utils import sync_to_async | |
| from ..helper.ext_utils.files_utils import get_path_size | |
| from ..helper.ext_utils.media_utils import ( | |
| FFMpeg, | |
| get_document_type, | |
| get_media_info, | |
| get_streams, | |
| ) | |
| from ..helper.mirror_leech_utils.status_utils.metadata_status import MetadataStatus | |
| async def apply_metadata_title( | |
| self, | |
| dl_path, | |
| gid, | |
| metadata_dict, | |
| audio_metadata_dict=None, | |
| video_metadata_dict=None, | |
| subtitle_metadata_dict=None, | |
| ): | |
| if not any( | |
| [ | |
| metadata_dict, | |
| audio_metadata_dict, | |
| video_metadata_dict, | |
| subtitle_metadata_dict, | |
| ] | |
| ): | |
| return dl_path | |
| LOGGER.info(f"Applying metadata to {self.name}") | |
| ffmpeg = FFMpeg(self) | |
| is_file = await aiopath.isfile(dl_path) | |
| files = ( | |
| [(dl_path, *await get_document_type(dl_path))] | |
| if is_file | |
| else [ | |
| (ospath.join(d, f), *await get_document_type(ospath.join(d, f))) | |
| for d, _, fs in await sync_to_async(walk, dl_path, topdown=False) | |
| for f in fs | |
| ] | |
| ) | |
| files = [(f, v, a) for f, v, a, _ in files if v or a] | |
| if not files: | |
| LOGGER.info(f"No audio/video files found in {dl_path} to apply metadata.") | |
| return dl_path | |
| async with task_dict_lock: | |
| task_dict[self.mid] = MetadataStatus(self, ffmpeg, gid, "up") | |
| self.progress = False | |
| await cpu_eater_lock.acquire() | |
| self.progress = True | |
| try: | |
| for file_path, is_video, is_audio in files: | |
| if self.is_cancelled: | |
| break | |
| self.subname = ospath.basename(file_path) | |
| self.subsize = await get_path_size(file_path) | |
| meta = await self.metadata_processor.process_all( | |
| video_metadata_dict or {}, | |
| audio_metadata_dict or {}, | |
| subtitle_metadata_dict or {}, | |
| file_path, | |
| ) | |
| if metadata_dict: | |
| meta["global"].update( | |
| await self.metadata_processor.process(metadata_dict, file_path) | |
| ) | |
| ext = ospath.splitext(file_path)[1] | |
| temp_out = f"{ospath.splitext(file_path)[0]}.meta_temp{ext}" | |
| streams = await get_streams(file_path) | |
| if not streams: | |
| LOGGER.error(f"Error getting streams for {file_path}. Skipping.") | |
| if is_file: | |
| cpu_eater_lock.release() | |
| return dl_path | |
| continue | |
| met_cmd = [ | |
| BinConfig.FFMPEG_NAME, | |
| "-hide_banner", | |
| "-loglevel", | |
| "error", | |
| "-i", | |
| file_path, | |
| ] | |
| maps, meta_maps = [], [] | |
| v, a, s = 0, 0, 0 | |
| for stream in streams: | |
| idx, typ = stream["index"], stream["codec_type"] | |
| maps += ["-map", f"0:{idx}"] | |
| if typ == "video": | |
| maps += [f"-c:v:{v}", "copy"] | |
| if "tags" in stream and "language" in stream["tags"]: | |
| meta_maps += [ | |
| f"-metadata:s:v:{v}", | |
| f"language={stream['tags']['language']}", | |
| ] | |
| for k, v_ in meta["video"].items(): | |
| meta_maps += [f"-metadata:s:v:{v}", f"{k}={v_}"] | |
| v += 1 | |
| elif typ == "audio": | |
| maps += [f"-c:a:{a}", "copy"] | |
| if "tags" in stream and "language" in stream["tags"]: | |
| meta_maps += [ | |
| f"-metadata:s:a:{a}", | |
| f"language={stream['tags']['language']}", | |
| ] | |
| audio_meta = next( | |
| ( | |
| m["metadata"] | |
| for m in meta["audio_streams"] | |
| if m["index"] == idx | |
| ), | |
| {}, | |
| ) | |
| for k, v_ in audio_meta.items(): | |
| meta_maps += [f"-metadata:s:a:{a}", f"{k}={v_}"] | |
| a += 1 | |
| elif typ == "subtitle": | |
| maps += [f"-c:s:{s}", "copy"] | |
| if "tags" in stream and "language" in stream["tags"]: | |
| meta_maps += [ | |
| f"-metadata:s:s:{s}", | |
| f"language={stream['tags']['language']}", | |
| ] | |
| sub_meta = next( | |
| ( | |
| m["metadata"] | |
| for m in meta["subtitle_streams"] | |
| if m["index"] == idx | |
| ), | |
| {}, | |
| ) | |
| for k, v_ in sub_meta.items(): | |
| meta_maps += [f"-metadata:s:s:{s}", f"{k}={v_}"] | |
| s += 1 | |
| else: | |
| maps += [f"-c:{idx}", "copy"] | |
| met_cmd += maps | |
| met_cmd += ["-map_metadata", "-1"] | |
| for item in meta_maps: | |
| met_cmd.append(item) | |
| for k, v_ in meta["global"].items(): | |
| met_cmd += ["-metadata", f"{k}={v_}"] | |
| met_cmd += ["-threads", str(max(1, (os.cpu_count() or 2) // 2)), temp_out] | |
| ffmpeg.clear() | |
| media_info = await get_media_info(file_path) | |
| if media_info: | |
| ffmpeg._total_time = media_info[0] | |
| LOGGER.debug(f"FFmpeg command: {' '.join(met_cmd)}") | |
| self.subproc = await create_subprocess_exec( | |
| *met_cmd, stdout=PIPE, stderr=PIPE | |
| ) | |
| await ffmpeg._ffmpeg_progress() | |
| _, stderr = await self.subproc.communicate() | |
| stderr_text = stderr.decode().strip() if stderr else "" | |
| if self.is_cancelled: | |
| if await aiopath.exists(temp_out): | |
| await remove(temp_out) | |
| break | |
| if self.subproc.returncode == 0: | |
| LOGGER.info(f"Successfully applied metadata to {file_path}") | |
| await remove(file_path) | |
| await move(temp_out, file_path) | |
| else: | |
| LOGGER.error(f"Error applying metadata to {file_path}: {stderr_text}") | |
| if await aiopath.exists(temp_out): | |
| await remove(temp_out) | |
| finally: | |
| cpu_eater_lock.release() | |
| return dl_path | |