| """This module provides a query interface for media streams and captions.""" |
| from collections.abc import Mapping, Sequence |
| from typing import Callable, List, Optional, Union |
|
|
| from pytubefix import Caption, Stream |
| from pytubefix.helpers import deprecated |
|
|
|
|
class StreamQuery(Sequence):
    """Interface for querying the available media streams.

    Wraps a list of :class:`Stream <Stream>` objects and exposes it as a
    read-only sequence. Filtering and ordering methods return a
    ``StreamQuery`` so that calls can be chained, e.g.
    ``query.filter(only_audio=True).order_by("abr").last()``.
    """

    def __init__(self, fmt_streams):
        """Construct a :class:`StreamQuery <StreamQuery>`.

        :param list fmt_streams:
            list of :class:`Stream <Stream>` instances.
        """
        self.fmt_streams = fmt_streams
        # Lookup table used by get_by_itag(); keys are normalised to int.
        self.itag_index = {int(s.itag): s for s in fmt_streams}

    def filter(
        self,
        fps=None,
        res=None,
        resolution=None,
        mime_type=None,
        type=None,
        subtype=None,
        file_extension=None,
        abr=None,
        bitrate=None,
        video_codec=None,
        audio_codec=None,
        only_audio=None,
        only_video=None,
        progressive=None,
        adaptive=None,
        is_dash=None,
        is_drc=None,
        audio_track_name=None,
        custom_filter_functions=None,
    ):
        """Apply the given filtering criterion.

        Criteria left at a falsy value are ignored, so e.g.
        ``progressive=False`` applies no filter at all. Only ``is_dash``
        and ``is_drc`` distinguish ``False`` from ``None``.

        :param fps:
            (optional) The frames per second.
        :type fps:
            int or None

        :param res:
            (optional) The video resolution, or a collection (list, tuple,
            set) of acceptable resolutions.
        :type res:
            str, collection of str or None

        :param resolution:
            (optional) Alias to ``res``. ``res`` wins when both are given.
        :type resolution:
            str, collection of str or None

        :param mime_type:
            (optional) Two-part identifier for file formats and format
            contents composed of a "type", a "subtype".
        :type mime_type:
            str or None

        :param type:
            (optional) Type part of the ``mime_type`` (e.g.: audio, video).
        :type type:
            str or None

        :param subtype:
            (optional) Sub-type part of the ``mime_type`` (e.g.: mp4, mov).
        :type subtype:
            str or None

        :param file_extension:
            (optional) Alias to ``subtype``.
        :type file_extension:
            str or None

        :param abr:
            (optional) Average bitrate (ABR) refers to the average amount of
            data transferred per unit of time (e.g.: 64kbps, 192kbps).
        :type abr:
            str or None

        :param bitrate:
            (optional) Alias to ``abr``.
        :type bitrate:
            str or None

        :param video_codec:
            (optional) Video compression format.
        :type video_codec:
            str or None

        :param audio_codec:
            (optional) Audio compression format.
        :type audio_codec:
            str or None

        :param bool only_audio:
            Excludes streams with video tracks.

        :param bool only_video:
            Excludes streams with audio tracks.

        :param bool progressive:
            Excludes adaptive streams (one file contains both audio and video
            tracks).

        :param bool adaptive:
            Excludes progressive streams (audio and video are on separate
            tracks).

        :param bool is_dash:
            Include/exclude dash streams.

        :param bool is_drc:
            Include/exclude stable volume streams.

        :param audio_track_name:
            (optional) Name of the dubbed audio track.
        :type audio_track_name:
            str or None

        :param custom_filter_functions:
            (optional) Interface for defining complex filters without
            subclassing. Each callable receives a stream and returns a
            truthy value to keep it.
        :type custom_filter_functions:
            list or None

        :rtype: :class:`StreamQuery <StreamQuery>`
        :returns: A new query holding only the streams that satisfy every
            supplied criterion.
        """
        filters = []

        # Resolve the res/resolution alias once so that the type check and
        # the comparison always look at the same value.
        wanted_res = res or resolution
        if wanted_res:
            if isinstance(wanted_res, str):
                filters.append(lambda s: s.resolution == wanted_res)
            elif isinstance(wanted_res, (list, tuple, set, frozenset)):
                filters.append(lambda s: s.resolution in wanted_res)
            # Any other type is ignored, as before.

        # Plain "attribute equals value" criteria, in application order.
        equality_criteria = (
            ("fps", fps),
            ("mime_type", mime_type),
            ("type", type),
            ("subtype", subtype or file_extension),
            ("abr", abr or bitrate),
            ("video_codec", video_codec),
            ("audio_codec", audio_codec),
        )
        for attr, wanted in equality_criteria:
            if wanted:
                # Bind the loop variables as defaults to dodge late binding.
                filters.append(
                    lambda s, attr=attr, wanted=wanted: (
                        getattr(s, attr) == wanted
                    )
                )

        if only_audio:
            filters.append(
                lambda s: (
                    s.includes_audio_track and not s.includes_video_track
                ),
            )

        if only_video:
            filters.append(
                lambda s: (
                    s.includes_video_track and not s.includes_audio_track
                ),
            )

        if progressive:
            filters.append(lambda s: s.is_progressive)

        if adaptive:
            filters.append(lambda s: s.is_adaptive)

        if audio_track_name:
            filters.append(lambda s: s.audio_track_name == audio_track_name)

        if custom_filter_functions:
            filters.extend(custom_filter_functions)

        # Tri-state flags: None means "don't care", True/False are both
        # meaningful selections.
        if is_dash is not None:
            filters.append(lambda s: s.is_dash == is_dash)

        if is_drc is not None:
            filters.append(lambda s: s.is_drc == is_drc)

        return self._filter(filters)

    def _filter(self, filters: List[Callable]) -> "StreamQuery":
        """Return a query of the streams accepted by every predicate.

        Predicates are evaluated per stream, in list order, stopping at the
        first one that rejects the stream.

        :param list filters:
            Callables taking a stream and returning a truthy value to keep it.
        :rtype: :class:`StreamQuery <StreamQuery>`
        """
        return StreamQuery(
            [
                stream
                for stream in self.fmt_streams
                if all(predicate(stream) for predicate in filters)
            ]
        )

    def order_by(self, attribute_name: str) -> "StreamQuery":
        """Apply a sort order. Filters out streams whose attribute is None.

        String attributes are ordered by the integer formed from their
        digits (so ``"720p"`` sorts before ``"1080p"``); if a value has no
        digits the plain ordering of the values is used instead.

        :param str attribute_name:
            The name of the attribute to sort by.
        :rtype: :class:`StreamQuery <StreamQuery>`
        """

        def attribute_of(stream):
            return getattr(stream, attribute_name)

        def numeric_part(stream):
            # int("") raises ValueError when the value holds no digits.
            return int("".join(filter(str.isdigit, attribute_of(stream))))

        has_attribute = [
            s for s in self.fmt_streams if attribute_of(s) is not None
        ]

        # The first element decides whether the numeric ordering is tried.
        if has_attribute and isinstance(attribute_of(has_attribute[0]), str):
            try:
                return StreamQuery(sorted(has_attribute, key=numeric_part))
            except ValueError:
                # Some value has no digits: fall back to natural ordering.
                pass

        return StreamQuery(sorted(has_attribute, key=attribute_of))

    def desc(self) -> "StreamQuery":
        """Sort streams in descending order.

        This reverses the current order, so it is meant to follow
        :meth:`order_by`.

        :rtype: :class:`StreamQuery <StreamQuery>`

        """
        return StreamQuery(self.fmt_streams[::-1])

    def asc(self) -> "StreamQuery":
        """Sort streams in ascending order.

        The current order is kept as is and this same object is returned.

        :rtype: :class:`StreamQuery <StreamQuery>`

        """
        return self

    def get_by_itag(self, itag: Union[int, str]) -> Optional[Stream]:
        """Get the corresponding :class:`Stream <Stream>` for a given itag.

        :param itag:
            YouTube format identifier code, as an int or a decimal string.
        :type itag:
            int or str
        :rtype: :class:`Stream <Stream>` or None
        :returns:
            The :class:`Stream <Stream>` matching the given itag or None if
            not found (or if ``itag`` is not an int / decimal string).

        """
        if isinstance(itag, int):
            return self.itag_index.get(itag)
        # isdecimal() rather than isdigit(): the latter also accepts
        # characters such as superscript digits that int() cannot parse.
        if isinstance(itag, str) and itag.isdecimal():
            return self.itag_index.get(int(itag))
        return None

    def get_by_resolution(self, resolution: str) -> Optional[Stream]:
        """Get the corresponding :class:`Stream <Stream>` for a given resolution.

        Stream must be a progressive mp4.

        :param str resolution:
            Video resolution i.e. "720p", "480p", "360p", "240p", "144p"
        :rtype: :class:`Stream <Stream>` or None
        :returns:
            The first progressive mp4 :class:`Stream <Stream>` with the given
            resolution or None if not found.

        """
        return self.filter(
            progressive=True, subtype="mp4", resolution=resolution
        ).first()

    def get_default_audio_track(self) -> "StreamQuery":
        """Takes the standard audio tracks, will return all audio tracks if there is no dubbing.

        :rtype: :class:`StreamQuery <StreamQuery>`
        :returns: A StreamQuery object with filtered default dubbing streams.
        """
        return self._filter([lambda s: s.is_default_audio_track])

    def get_extra_audio_track(self) -> Optional["StreamQuery"]:
        """Get only dubbed audio tracks.

        Keeps audio-only streams that are not the default audio track.

        :rtype: :class:`StreamQuery <StreamQuery>`
        :returns: A StreamQuery object with filtering only the dubbing
            streams (empty when there are none).
        """
        return self._filter(
            [
                lambda s: (
                    not s.is_default_audio_track
                    and s.includes_audio_track
                    and not s.includes_video_track
                )
            ]
        )

    def get_extra_audio_track_by_name(self, name) -> Optional["StreamQuery"]:
        """Filter dubbed audio streams by name.

        :param name:
            Value compared against each stream's ``audio_track_name``.
        :rtype: :class:`StreamQuery <StreamQuery>`
        :returns: A StreamQuery object filtering dubbed audio streams by name
            (empty when nothing matches).
        """
        return self._filter([lambda s: s.audio_track_name == name])

    def get_lowest_resolution(self, progressive=True) -> Optional[Stream]:
        """Get the lowest resolution mp4 stream (progressive by default).

        :param bool progressive:
            Filter only progressive streams (video and audio in the same
            file), default is True. When False no progressive/adaptive
            restriction is applied, so every mp4 stream is a candidate.
        :rtype: :class:`Stream <Stream>` or None
        :returns:
            The lowest resolution :class:`Stream <Stream>` among the
            candidates or None if there is none.

        """
        return (
            self.filter(progressive=progressive, subtype="mp4")
            .order_by("resolution")
            .first()
        )

    def get_highest_resolution(self, progressive=True, mime_type=None) -> Optional[Stream]:
        """Get the highest resolution stream (progressive by default).

        :param bool progressive:
            Filter only progressive streams (video and audio in the same
            file), default is True. When False no progressive/adaptive
            restriction is applied, so every stream is a candidate.
        :param str mime_type:
            Filter by mime_type. Leave as None to accept any mime_type.
        :rtype: :class:`Stream <Stream>` or None
        :returns:
            The highest resolution :class:`Stream <Stream>` among the
            candidates or None if there is none.

        """
        return (
            self.filter(progressive=progressive, mime_type=mime_type)
            .order_by("resolution")
            .last()
        )

    def get_audio_only(self, subtype: str = "mp4") -> Optional[Stream]:
        """Get highest bitrate audio stream for given codec (defaults to mp4)

        :param str subtype:
            Audio subtype, defaults to mp4
        :rtype: :class:`Stream <Stream>` or None
        :returns:
            The audio-only :class:`Stream <Stream>` with the highest ``abr``
            for the subtype or None if not found.
        """
        return (
            self.filter(only_audio=True, subtype=subtype)
            .order_by("abr")
            .last()
        )

    def otf(self, is_otf: bool = False) -> "StreamQuery":
        """Filter stream by OTF, useful if some streams have 404 URLs

        :param bool is_otf: Set to False to retrieve only non-OTF streams
        :rtype: :class:`StreamQuery <StreamQuery>`
        :returns: A StreamQuery object with otf filtered streams
        """
        return self._filter([lambda s: s.is_otf == is_otf])

    def first(self) -> Optional[Stream]:
        """Get the first :class:`Stream <Stream>` in the results.

        :rtype: :class:`Stream <Stream>` or None
        :returns:
            the first result of this query or None if the result doesn't
            contain any streams.

        """
        try:
            return self.fmt_streams[0]
        except IndexError:
            return None

    def last(self) -> Optional[Stream]:
        """Get the last :class:`Stream <Stream>` in the results.

        :rtype: :class:`Stream <Stream>` or None
        :returns:
            Return the last result of this query or None if the result
            doesn't contain any streams.

        """
        try:
            return self.fmt_streams[-1]
        except IndexError:
            # Empty query: mirror first() and report "no stream" explicitly.
            return None

    @deprecated("Get the size of this list directly using len()")
    def count(self, value: Optional[str] = None) -> int:
        """Get the count of items in the list.

        :param value:
            (optional) When truthy, count the occurrences of this value in
            the underlying list instead of returning its length.
        :rtype: int
        """
        return self.fmt_streams.count(value) if value else len(self)

    @deprecated("This object can be treated as a list, all() is useless")
    def all(self) -> List[Stream]:
        """Get all the results represented by this query as a list.

        :rtype: list

        """
        return self.fmt_streams

    def __getitem__(self, i: Union[slice, int]):
        """Return the stream at index ``i`` (or a list for a slice)."""
        return self.fmt_streams[i]

    def __len__(self) -> int:
        """Return the number of streams in this query."""
        return len(self.fmt_streams)

    def __repr__(self) -> str:
        """Return the representation of the underlying stream list."""
        return f"{self.fmt_streams}"
|
|
|
|
class CaptionQuery(Mapping):
    """Interface for querying the available captions.

    Maps a caption's language code to its :class:`Caption <Caption>`
    object, e.g. ``captions['en']``.
    """

    def __init__(self, captions: List[Caption]):
        """Construct a :class:`CaptionQuery <CaptionQuery>`.

        :param list captions:
            list of :class:`Caption <Caption>` instances.

        """
        # Index by language code; a later caption with the same code
        # replaces an earlier one.
        self.lang_code_index = {c.code: c for c in captions}

    @deprecated(
        "This object can be treated as a dictionary, i.e. captions['en']"
    )
    def get_by_language_code(
        self, lang_code: str
    ) -> Optional[Caption]:
        """Get the :class:`Caption <Caption>` for a given ``lang_code``.

        :param str lang_code:
            The code that identifies the caption language.
        :rtype: :class:`Caption <Caption>` or None
        :returns:
            The :class:`Caption <Caption>` matching the given ``lang_code`` or
            None if it does not exist.
        """
        return self.lang_code_index.get(lang_code)

    @deprecated("This object can be treated as a dictionary")
    def all(self) -> List[Caption]:
        """Get all the results represented by this query as a list.

        :rtype: list

        """
        return list(self.lang_code_index.values())

    def __getitem__(self, i: str):
        """Return the caption for language code ``i``.

        :raises KeyError: if no caption exists for that code.
        """
        return self.lang_code_index[i]

    def __len__(self) -> int:
        """Return the number of captions."""
        return len(self.lang_code_index)

    def __iter__(self):
        """Iterate over the :class:`Caption <Caption>` objects.

        NOTE: unlike a regular mapping this yields the values, not the
        language codes. It is kept that way because ``for caption in
        captions`` relies on it; use :meth:`keys` to get the codes.
        """
        return iter(self.lang_code_index.values())

    # The Mapping mixins for keys()/values()/items() assume __iter__ yields
    # keys. Since it yields Caption objects here, the inherited versions
    # would try to look captions up by Caption object and fail, so these
    # views delegate to the underlying dict instead.
    def keys(self):
        """Return a view of the language codes."""
        return self.lang_code_index.keys()

    def values(self):
        """Return a view of the :class:`Caption <Caption>` objects."""
        return self.lang_code_index.values()

    def items(self):
        """Return a view of ``(language code, caption)`` pairs."""
        return self.lang_code_index.items()

    def __repr__(self) -> str:
        """Return the representation of the underlying code-to-caption dict."""
        return f"{self.lang_code_index}"
|
|