# File: leech/bot/helper/ext_utils/metadata_utils.py
# Commit db78256 (dragxd): "Initial commit: Push project to Hugging Face"
from logging import getLogger
from os.path import basename, splitext
from re import compile as re_compile

from pycountry import languages

from ..ext_utils.media_utils import get_streams
class MetadataProcessor:
    """Expand ``{placeholder}`` metadata templates with values taken from a media file.

    ``extract_file_vars`` fills ``self.vars`` (``filename``, ``basename``,
    ``extension``, ``audiolang``, ``sublang`` and, when the file name contains
    one, ``year``) and records the audio/subtitle streams reported by
    ``get_streams``.  The ``apply_*``/``get_*`` methods then substitute those
    values into user supplied ``{key: template}`` dictionaries.
    """

    # Non-capturing group: with a capturing group ``findall`` would return only
    # the century prefix ("19"/"20") instead of the full four-digit year.
    _year_pattern = re_compile(r"\b(?:19|20)\d{2}\b")
    _sanitize_pattern = re_compile(r'[<>:"/\\?*]')
    # Language markers that carry no information and are returned untouched.
    _undefined_langs = frozenset({"unknown", "und", "none"})
    _logger = getLogger(__name__)

    class _TemplateVars(dict):
        """Mapping for ``str.format_map`` that keeps unknown placeholders as-is."""

        def __missing__(self, key):
            return "{" + str(key) + "}"

    def __init__(self):
        self.vars = {}
        self.audio_streams = []
        self.subtitle_streams = []

    @staticmethod
    def convert_lang_code(lang_code):
        """Return the language name for a 2- or 3-letter code.

        The lookup is best-effort: empty values, non-strings, the markers
        ``unknown``/``und``/``none``, codes of any other length and codes that
        pycountry does not know are returned unchanged.
        """
        if not lang_code or not isinstance(lang_code, str):
            return lang_code
        code = lang_code.lower()
        if code in MetadataProcessor._undefined_langs:
            return lang_code
        try:
            if len(code) == 2:
                lang = languages.get(alpha_2=code)
            elif len(code) == 3:
                # Container tags frequently use the bibliographic variant
                # ("ger", "fre"), which is not the ``alpha_3`` code.
                lang = languages.get(alpha_3=code) or languages.get(
                    bibliographic=code
                )
            else:
                return lang_code
            return lang.name if lang else lang_code
        except Exception:
            # Deliberately broad: a failed lookup must never break processing.
            return lang_code

    async def extract_file_vars(self, file_path):
        """Populate ``self.vars`` and the stream lists for ``file_path``.

        Stream probing is best-effort: if ``get_streams`` fails the file-name
        derived variables are still set and the stream lists stay empty.
        """
        fname = basename(file_path)
        bname, ext = splitext(fname)
        self.vars = {
            "filename": fname,
            "basename": bname,
            "extension": ext.lstrip("."),
            "audiolang": "unknown",
            "sublang": "none",
        }
        self.audio_streams, self.subtitle_streams = [], []

        try:
            streams = list(await get_streams(file_path) or [])
        except Exception:
            # Keep going with file-name variables only, but leave a trace.
            self._logger.warning(
                "Could not read streams of %s", file_path, exc_info=True
            )
            streams = []

        for s in streams:
            # Skip malformed entries instead of aborting the remaining streams.
            if not isinstance(s, dict):
                continue
            ctype = str(s.get("codec_type") or "").lower()
            tags = s.get("tags")
            slang = tags.get("language") if isinstance(tags, dict) else None
            slang = str(slang) if slang else "unknown"
            full_lang = self.convert_lang_code(slang)
            entry = {
                "index": s.get("index", 0),
                "language": slang,
                "full_language": full_lang,
            }
            if ctype == "audio":
                self.audio_streams.append(entry)
                # The first audio stream with a defined language wins.
                if self.vars["audiolang"] == "unknown" and slang != "und":
                    self.vars["audiolang"] = full_lang
            elif ctype == "subtitle":
                self.subtitle_streams.append(entry)
                if self.vars["sublang"] == "none" and slang != "und":
                    self.vars["sublang"] = full_lang

        years = self._year_pattern.findall(bname)
        if years:
            # Use the last year-looking token of the base name.
            self.vars["year"] = years[-1]

    @staticmethod
    def parse_string(metadata_str):
        """Parse ``"key=value|key2=value2"`` into a dict.

        ``\\|`` is an escaped, literal ``|``.  A part without ``=`` maps to an
        empty string.  Parts whose key is empty or whitespace-only (for example
        those produced by ``||`` or a leading ``|``) are dropped.  Returns ``{}``
        for empty or non-string input.
        """
        if not metadata_str or not isinstance(metadata_str, str):
            return {}

        parts, buf = [], []
        i, n = 0, len(metadata_str)
        while i < n:
            ch = metadata_str[i]
            if ch == "\\" and i + 1 < n and metadata_str[i + 1] == "|":
                buf.append("|")
                i += 2
                continue
            if ch == "|":
                parts.append("".join(buf))
                buf = []
            else:
                buf.append(ch)
            i += 1
        parts.append("".join(buf))

        result = {}
        for part in parts:
            key, _, value = part.partition("=")
            if key.strip():
                result[key] = value
        return result

    @staticmethod
    def merge_dicts(default_dict, cmd_dict):
        """Return a new dict where ``cmd_dict`` entries override ``default_dict``."""
        return {**(default_dict or {}), **(cmd_dict or {})}

    def _render(self, template, variables):
        """Format ``template`` with ``variables``; return it unchanged if malformed."""
        try:
            return template.format_map(variables)
        except (ValueError, IndexError, KeyError, AttributeError, TypeError):
            self._logger.warning(
                "Leaving malformed metadata template unformatted: %r", template
            )
            return template

    def apply_vars_to_stream(
        self, metadata_dict, stream_lang=None, full_lang=None, stream_type="audio"
    ):
        """Substitute ``self.vars`` into every string value of ``metadata_dict``.

        When ``stream_lang`` is given and is not ``"unknown"``, the
        ``audiolang`` (for ``stream_type == "audio"``) or ``sublang`` (any other
        ``stream_type``) variable is overridden with ``full_lang`` or, if that is
        empty, with the converted ``stream_lang``.

        Keys are sanitized; non-string values are converted with ``str``.
        Placeholders with no matching variable are left in the output verbatim
        and templates that cannot be parsed are returned unformatted instead of
        raising.  Returns ``{}`` when ``metadata_dict`` is not a dict.
        """
        if not isinstance(metadata_dict, dict):
            return {}

        variables = self._TemplateVars(self.vars)
        if stream_lang and stream_lang != "unknown":
            key = "audiolang" if stream_type == "audio" else "sublang"
            variables[key] = full_lang or self.convert_lang_code(stream_lang)

        return {
            self.sanitize(k): (
                self._render(v, variables) if isinstance(v, str) else str(v)
            )
            for k, v in metadata_dict.items()
        }

    def apply_vars(self, metadata_dict):
        """Substitute the file-level variables into ``metadata_dict``."""
        return self.apply_vars_to_stream(metadata_dict)

    def _stream_metadata(self, streams, metadata_dict, stream_type):
        """Render ``metadata_dict`` once per entry of ``streams``."""
        return [
            {
                "index": s["index"],
                "metadata": self.apply_vars_to_stream(
                    metadata_dict, s["language"], s["full_language"], stream_type
                ),
            }
            for s in streams
        ]

    def get_audio_metadata(self, audio_metadata_dict):
        """Return ``[{"index", "metadata"}, ...]`` for every recorded audio stream."""
        return self._stream_metadata(
            self.audio_streams, audio_metadata_dict, "audio"
        )

    def get_subtitle_metadata(self, subtitle_metadata_dict):
        """Return ``[{"index", "metadata"}, ...]`` for every recorded subtitle stream."""
        return self._stream_metadata(
            self.subtitle_streams, subtitle_metadata_dict, "subtitle"
        )

    def sanitize(self, value):
        """Replace ``< > : " / \\ ? *`` with ``_`` and cap the result at 100 chars."""
        return self._sanitize_pattern.sub("_", str(value))[:100]

    async def process_all(
        self,
        video_metadata_dict,
        audio_metadata_dict,
        subtitle_metadata_dict,
        file_path,
    ):
        """Probe ``file_path`` and render video, audio and subtitle templates.

        Returns a dict with the keys ``video`` (dict), ``audio_streams`` and
        ``subtitle_streams`` (lists of per-stream dicts) and ``global`` (always
        an empty dict).  A falsy template dict yields an empty result for its
        section.
        """
        await self.extract_file_vars(file_path)
        return {
            "video": (
                self.apply_vars(video_metadata_dict) if video_metadata_dict else {}
            ),
            "audio_streams": (
                self.get_audio_metadata(audio_metadata_dict)
                if audio_metadata_dict
                else []
            ),
            "subtitle_streams": (
                self.get_subtitle_metadata(subtitle_metadata_dict)
                if subtitle_metadata_dict
                else []
            ),
            "global": {},
        }

    async def process(self, metadata_dict, file_path):
        """Probe ``file_path`` and render ``metadata_dict`` with its variables."""
        await self.extract_file_vars(file_path)
        return self.apply_vars(metadata_dict)