leech / bot /modules /metadata.py
dragxd's picture
Initial commit: Push project to Hugging Face
db78256
from asyncio import create_subprocess_exec
from asyncio.subprocess import PIPE
import os
from os import path as ospath, walk
from aiofiles.os import path as aiopath, remove
from aioshutil import move
from .. import LOGGER, cpu_eater_lock, task_dict, task_dict_lock
from ..core.config_manager import BinConfig
from ..helper.ext_utils.bot_utils import sync_to_async
from ..helper.ext_utils.files_utils import get_path_size
from ..helper.ext_utils.media_utils import (
FFMpeg,
get_document_type,
get_media_info,
get_streams,
)
from ..helper.mirror_leech_utils.status_utils.metadata_status import MetadataStatus
async def apply_metadata_title(
self,
dl_path,
gid,
metadata_dict,
audio_metadata_dict=None,
video_metadata_dict=None,
subtitle_metadata_dict=None,
):
if not any(
[
metadata_dict,
audio_metadata_dict,
video_metadata_dict,
subtitle_metadata_dict,
]
):
return dl_path
LOGGER.info(f"Applying metadata to {self.name}")
ffmpeg = FFMpeg(self)
is_file = await aiopath.isfile(dl_path)
files = (
[(dl_path, *await get_document_type(dl_path))]
if is_file
else [
(ospath.join(d, f), *await get_document_type(ospath.join(d, f)))
for d, _, fs in await sync_to_async(walk, dl_path, topdown=False)
for f in fs
]
)
files = [(f, v, a) for f, v, a, _ in files if v or a]
if not files:
LOGGER.info(f"No audio/video files found in {dl_path} to apply metadata.")
return dl_path
async with task_dict_lock:
task_dict[self.mid] = MetadataStatus(self, ffmpeg, gid, "up")
self.progress = False
await cpu_eater_lock.acquire()
self.progress = True
try:
for file_path, is_video, is_audio in files:
if self.is_cancelled:
break
self.subname = ospath.basename(file_path)
self.subsize = await get_path_size(file_path)
meta = await self.metadata_processor.process_all(
video_metadata_dict or {},
audio_metadata_dict or {},
subtitle_metadata_dict or {},
file_path,
)
if metadata_dict:
meta["global"].update(
await self.metadata_processor.process(metadata_dict, file_path)
)
ext = ospath.splitext(file_path)[1]
temp_out = f"{ospath.splitext(file_path)[0]}.meta_temp{ext}"
streams = await get_streams(file_path)
if not streams:
LOGGER.error(f"Error getting streams for {file_path}. Skipping.")
if is_file:
cpu_eater_lock.release()
return dl_path
continue
met_cmd = [
BinConfig.FFMPEG_NAME,
"-hide_banner",
"-loglevel",
"error",
"-i",
file_path,
]
maps, meta_maps = [], []
v, a, s = 0, 0, 0
for stream in streams:
idx, typ = stream["index"], stream["codec_type"]
maps += ["-map", f"0:{idx}"]
if typ == "video":
maps += [f"-c:v:{v}", "copy"]
if "tags" in stream and "language" in stream["tags"]:
meta_maps += [
f"-metadata:s:v:{v}",
f"language={stream['tags']['language']}",
]
for k, v_ in meta["video"].items():
meta_maps += [f"-metadata:s:v:{v}", f"{k}={v_}"]
v += 1
elif typ == "audio":
maps += [f"-c:a:{a}", "copy"]
if "tags" in stream and "language" in stream["tags"]:
meta_maps += [
f"-metadata:s:a:{a}",
f"language={stream['tags']['language']}",
]
audio_meta = next(
(
m["metadata"]
for m in meta["audio_streams"]
if m["index"] == idx
),
{},
)
for k, v_ in audio_meta.items():
meta_maps += [f"-metadata:s:a:{a}", f"{k}={v_}"]
a += 1
elif typ == "subtitle":
maps += [f"-c:s:{s}", "copy"]
if "tags" in stream and "language" in stream["tags"]:
meta_maps += [
f"-metadata:s:s:{s}",
f"language={stream['tags']['language']}",
]
sub_meta = next(
(
m["metadata"]
for m in meta["subtitle_streams"]
if m["index"] == idx
),
{},
)
for k, v_ in sub_meta.items():
meta_maps += [f"-metadata:s:s:{s}", f"{k}={v_}"]
s += 1
else:
maps += [f"-c:{idx}", "copy"]
met_cmd += maps
met_cmd += ["-map_metadata", "-1"]
for item in meta_maps:
met_cmd.append(item)
for k, v_ in meta["global"].items():
met_cmd += ["-metadata", f"{k}={v_}"]
met_cmd += ["-threads", str(max(1, (os.cpu_count() or 2) // 2)), temp_out]
ffmpeg.clear()
media_info = await get_media_info(file_path)
if media_info:
ffmpeg._total_time = media_info[0]
LOGGER.debug(f"FFmpeg command: {' '.join(met_cmd)}")
self.subproc = await create_subprocess_exec(
*met_cmd, stdout=PIPE, stderr=PIPE
)
await ffmpeg._ffmpeg_progress()
_, stderr = await self.subproc.communicate()
stderr_text = stderr.decode().strip() if stderr else ""
if self.is_cancelled:
if await aiopath.exists(temp_out):
await remove(temp_out)
break
if self.subproc.returncode == 0:
LOGGER.info(f"Successfully applied metadata to {file_path}")
await remove(file_path)
await move(temp_out, file_path)
else:
LOGGER.error(f"Error applying metadata to {file_path}: {stderr_text}")
if await aiopath.exists(temp_out):
await remove(temp_out)
finally:
cpu_eater_lock.release()
return dl_path