from os.path import basename, splitext from re import compile as re_compile from pycountry import languages from ..ext_utils.media_utils import get_streams class MetadataProcessor: _year_pattern = re_compile(r"\b(19|20)\d{2}\b") _sanitize_pattern = re_compile(r'[<>:"/\\?*]') def __init__(self): self.vars = {} self.audio_streams = [] self.subtitle_streams = [] @staticmethod def convert_lang_code(lang_code): if not lang_code or lang_code in {"unknown", "und", "none"}: return lang_code try: if len(lang_code) == 2: lang = languages.get(alpha_2=lang_code.lower()) elif len(lang_code) == 3: lang = languages.get(alpha_3=lang_code.lower()) else: return lang_code return lang.name if lang else lang_code except Exception: return lang_code async def extract_file_vars(self, file_path): fname = basename(file_path) bname, ext = splitext(fname) self.vars = { "filename": fname, "basename": bname, "extension": ext.lstrip("."), "audiolang": "unknown", "sublang": "none", } self.audio_streams, self.subtitle_streams = [], [] try: for s in await get_streams(file_path) or []: ctype = s.get("codec_type", "").lower() slang = s.get("tags", {}).get("language", "unknown") full_lang = self.convert_lang_code(slang) entry = { "index": s.get("index", 0), "language": slang, "full_language": full_lang, } if ctype == "audio": self.audio_streams.append(entry) if self.vars["audiolang"] == "unknown" and slang != "und": self.vars["audiolang"] = full_lang elif ctype == "subtitle": self.subtitle_streams.append(entry) if self.vars["sublang"] == "none" and slang != "und": self.vars["sublang"] = full_lang except Exception: pass m = self._year_pattern.findall(bname) if m: self.vars["year"] = m[-1] @staticmethod def parse_string(metadata_str): if not metadata_str or not isinstance(metadata_str, str): return {} parts, current, i = [], "", 0 while i < len(metadata_str): if ( metadata_str[i] == "\\" and i + 1 < len(metadata_str) and metadata_str[i + 1] == "|" ): current += "|" i += 2 elif metadata_str[i] == "|": parts.append(current) current = "" i += 1 else: current += metadata_str[i] i += 1 if current: parts.append(current) return dict(p.split("=", 1) if "=" in p else (p, "") for p in parts) @staticmethod def merge_dicts(default_dict, cmd_dict): return {**(default_dict or {}), **(cmd_dict or {})} def apply_vars_to_stream( self, metadata_dict, stream_lang=None, full_lang=None, stream_type="audio" ): if not isinstance(metadata_dict, dict): return {} vars_with_stream = self.vars.copy() if stream_lang and stream_lang != "unknown": key = "audiolang" if stream_type == "audio" else "sublang" vars_with_stream[key] = full_lang or self.convert_lang_code(stream_lang) return { self.sanitize(k): ( str(v).format(**vars_with_stream) if isinstance(v, str) else str(v) ) for k, v in metadata_dict.items() } def apply_vars(self, metadata_dict): return self.apply_vars_to_stream(metadata_dict) def get_audio_metadata(self, audio_metadata_dict): return [ { "index": s["index"], "metadata": self.apply_vars_to_stream( audio_metadata_dict, s["language"], s["full_language"], "audio" ), } for s in self.audio_streams ] def get_subtitle_metadata(self, subtitle_metadata_dict): return [ { "index": s["index"], "metadata": self.apply_vars_to_stream( subtitle_metadata_dict, s["language"], s["full_language"], "subtitle", ), } for s in self.subtitle_streams ] def sanitize(self, value): return self._sanitize_pattern.sub("_", str(value))[:100] async def process_all( self, video_metadata_dict, audio_metadata_dict, subtitle_metadata_dict, file_path, ): await self.extract_file_vars(file_path) return { "video": ( self.apply_vars(video_metadata_dict) if video_metadata_dict else {} ), "audio_streams": ( self.get_audio_metadata(audio_metadata_dict) if audio_metadata_dict else [] ), "subtitle_streams": ( self.get_subtitle_metadata(subtitle_metadata_dict) if subtitle_metadata_dict else [] ), "global": {}, } async def process(self, metadata_dict, file_path): await self.extract_file_vars(file_path) return self.apply_vars(metadata_dict)