from __future__ import annotations

import importlib
import logging
import unicodedata
from codecs import IncrementalDecoder
from encodings.aliases import aliases
from functools import lru_cache
from re import findall
from typing import Generator

from _multibytecodec import (
    MultibyteIncrementalDecoder,
)

from .constant import (
    ENCODING_MARKS,
    IANA_SUPPORTED_SIMILAR,
    RE_POSSIBLE_ENCODING_INDICATION,
    UNICODE_RANGES_COMBINED,
    UNICODE_SECONDARY_RANGE_KEYWORD,
    UTF8_MAXIMAL_ALLOCATION,
    COMMON_CJK_CHARACTERS,
)


@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def is_accentuated(character: str) -> bool:
    try:
        description: str = unicodedata.name(character)
    except ValueError:
        return False
    return (
        "WITH GRAVE" in description
        or "WITH ACUTE" in description
        or "WITH CEDILLA" in description
        or "WITH DIAERESIS" in description
        or "WITH CIRCUMFLEX" in description
        or "WITH TILDE" in description
        or "WITH MACRON" in description
        or "WITH RING ABOVE" in description
    )
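# Example (informal): the test relies on the official Unicode character name,
# e.g. U+00E9 is named "LATIN SMALL LETTER E WITH ACUTE":
#   is_accentuated("é")  # True
#   is_accentuated("e")  # False (no "WITH ..." accent marker in its name)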


@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def remove_accent(character: str) -> str:
    decomposed: str = unicodedata.decomposition(character)
    if not decomposed:
        return character

    codes: list[str] = decomposed.split(" ")

    return chr(int(codes[0], 16))
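# Example (informal): unicodedata.decomposition("é") is "0065 0301", and the
# first code point of the decomposition is the base character:
#   remove_accent("é")  # "e"
#   remove_accent("e")  # "e" (no decomposition, returned unchanged)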


@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def unicode_range(character: str) -> str | None:
    """
    Retrieve the Unicode range official name from a single character.
    """
    character_ord: int = ord(character)

    for range_name, ord_range in UNICODE_RANGES_COMBINED.items():
        if character_ord in ord_range:
            return range_name

    return None
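# Example (informal, assuming UNICODE_RANGES_COMBINED keys follow the official
# Unicode block names):
#   unicode_range("a")  # "Basic Latin"
#   unicode_range("あ")  # "Hiragana"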


@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def is_latin(character: str) -> bool:
    try:
        description: str = unicodedata.name(character)
    except ValueError:
        return False
    return "LATIN" in description


@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def is_punctuation(character: str) -> bool:
    character_category: str = unicodedata.category(character)

    if "P" in character_category:
        return True

    character_range: str | None = unicode_range(character)

    if character_range is None:
        return False

    return "Punctuation" in character_range


@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def is_symbol(character: str) -> bool:
    character_category: str = unicodedata.category(character)

    if "S" in character_category or "N" in character_category:
        return True

    character_range: str | None = unicode_range(character)

    if character_range is None:
        return False

    return "Forms" in character_range and character_category != "Lo"


@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def is_emoticon(character: str) -> bool:
    character_range: str | None = unicode_range(character)

    if character_range is None:
        return False

    return "Emoticons" in character_range or "Pictographs" in character_range


@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def is_separator(character: str) -> bool:
    if character.isspace() or character in {"|", "+", "<", ">"}:
        return True

    character_category: str = unicodedata.category(character)

    return "Z" in character_category or character_category in {"Po", "Pd", "Pc"}


@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def is_case_variable(character: str) -> bool:
    return character.islower() != character.isupper()


@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def is_cjk(character: str) -> bool:
    try:
        character_name = unicodedata.name(character)
    except ValueError:
        return False

    return "CJK" in character_name


@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def is_hiragana(character: str) -> bool:
    try:
        character_name = unicodedata.name(character)
    except ValueError:
        return False

    return "HIRAGANA" in character_name


@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def is_katakana(character: str) -> bool:
    try:
        character_name = unicodedata.name(character)
    except ValueError:
        return False

    return "KATAKANA" in character_name


@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def is_hangul(character: str) -> bool:
    try:
        character_name = unicodedata.name(character)
    except ValueError:
        return False

    return "HANGUL" in character_name


@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def is_thai(character: str) -> bool:
    try:
        character_name = unicodedata.name(character)
    except ValueError:
        return False

    return "THAI" in character_name


@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def is_arabic(character: str) -> bool:
    try:
        character_name = unicodedata.name(character)
    except ValueError:
        return False

    return "ARABIC" in character_name


@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def is_arabic_isolated_form(character: str) -> bool:
    try:
        character_name = unicodedata.name(character)
    except ValueError:
        return False

    return "ARABIC" in character_name and "ISOLATED FORM" in character_name


@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def is_cjk_uncommon(character: str) -> bool:
    return character not in COMMON_CJK_CHARACTERS


@lru_cache(maxsize=len(UNICODE_RANGES_COMBINED))
def is_unicode_range_secondary(range_name: str) -> bool:
    return any(keyword in range_name for keyword in UNICODE_SECONDARY_RANGE_KEYWORD)


@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def is_unprintable(character: str) -> bool:
    return (
        character.isspace() is False
        and character.isprintable() is False
        and character != "\x1a"  # SUB, the ASCII substitute character
        and character != "\ufeff"  # BOM / ZERO WIDTH NO-BREAK SPACE
    )
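# Example (informal): control characters other than whitespace are flagged,
# while the SUB character and the BOM are deliberately tolerated:
#   is_unprintable("\x00")    # True (NUL)
#   is_unprintable("\n")      # False (whitespace)
#   is_unprintable("\ufeff")  # False (explicitly allowed)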


def any_specified_encoding(sequence: bytes, search_zone: int = 8192) -> str | None:
    """
    Extract any specified encoding in the first n bytes using an ASCII-only decoder.
    """
    if not isinstance(sequence, bytes):
        raise TypeError

    seq_len: int = len(sequence)

    results: list[str] = findall(
        RE_POSSIBLE_ENCODING_INDICATION,
        sequence[: min(seq_len, search_zone)].decode("ascii", errors="ignore"),
    )

    if len(results) == 0:
        return None

    for specified_encoding in results:
        specified_encoding = specified_encoding.lower().replace("-", "_")

        encoding_alias: str
        encoding_iana: str

        for encoding_alias, encoding_iana in aliases.items():
            if encoding_alias == specified_encoding:
                return encoding_iana
            if encoding_iana == specified_encoding:
                return encoding_iana

    return None
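# Example (informal sketch, assuming RE_POSSIBLE_ENCODING_INDICATION matches the
# usual charset/encoding declarations found in XML and HTML prologues):
#   any_specified_encoding(b'<?xml version="1.0" encoding="ISO-8859-1"?>')
#   # -> "latin_1" (normalized through encodings.aliases)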


@lru_cache(maxsize=128)
def is_multi_byte_encoding(name: str) -> bool:
    """
    Verify whether a specific encoding is a multi-byte one based on its IANA name.
    """
    return name in {
        "utf_8",
        "utf_8_sig",
        "utf_16",
        "utf_16_be",
        "utf_16_le",
        "utf_32",
        "utf_32_le",
        "utf_32_be",
        "utf_7",
    } or issubclass(
        importlib.import_module(f"encodings.{name}").IncrementalDecoder,
        MultibyteIncrementalDecoder,
    )
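# Example (informal): Unicode transformation formats are whitelisted, while CJK
# codecs such as gb2312 are detected through their C-level incremental decoder:
#   is_multi_byte_encoding("utf_8")   # True (whitelist)
#   is_multi_byte_encoding("gb2312")  # True (MultibyteIncrementalDecoder subclass)
#   is_multi_byte_encoding("cp1252")  # False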


def identify_sig_or_bom(sequence: bytes) -> tuple[str | None, bytes]:
    """
    Identify and extract SIG/BOM in given sequence.
    """

    for iana_encoding in ENCODING_MARKS:
        marks: bytes | list[bytes] = ENCODING_MARKS[iana_encoding]

        if isinstance(marks, bytes):
            marks = [marks]

        for mark in marks:
            if sequence.startswith(mark):
                return iana_encoding, mark

    return None, b""
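# Example (informal, assuming ENCODING_MARKS maps "utf_8" to the standard UTF-8
# BOM b"\xef\xbb\xbf"):
#   identify_sig_or_bom(b"\xef\xbb\xbfhello")  # ("utf_8", b"\xef\xbb\xbf")
#   identify_sig_or_bom(b"hello")              # (None, b"")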


def should_strip_sig_or_bom(iana_encoding: str) -> bool:
    return iana_encoding not in {"utf_16", "utf_32"}


def iana_name(cp_name: str, strict: bool = True) -> str:
    """Returns the Python normalized encoding name (Not the IANA official name)."""
    cp_name = cp_name.lower().replace("-", "_")

    encoding_alias: str
    encoding_iana: str

    for encoding_alias, encoding_iana in aliases.items():
        if cp_name in [encoding_alias, encoding_iana]:
            return encoding_iana

    if strict:
        raise ValueError(f"Unable to retrieve IANA for '{cp_name}'")

    return cp_name
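# Example (informal): names are lower-cased and dashes become underscores before
# the lookup in encodings.aliases:
#   iana_name("UTF-8")    # "utf_8"
#   iana_name("latin-1")  # "latin_1"
#   iana_name("nope", strict=False)  # "nope" (returned unchanged)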


def cp_similarity(iana_name_a: str, iana_name_b: str) -> float:
    if is_multi_byte_encoding(iana_name_a) or is_multi_byte_encoding(iana_name_b):
        return 0.0

    decoder_a = importlib.import_module(f"encodings.{iana_name_a}").IncrementalDecoder
    decoder_b = importlib.import_module(f"encodings.{iana_name_b}").IncrementalDecoder

    id_a: IncrementalDecoder = decoder_a(errors="ignore")
    id_b: IncrementalDecoder = decoder_b(errors="ignore")

    character_match_count: int = 0

    # Compare how both single-byte codecs decode each byte value 0x00-0xFE.
    for i in range(255):
        to_be_decoded: bytes = bytes([i])
        if id_a.decode(to_be_decoded) == id_b.decode(to_be_decoded):
            character_match_count += 1

    return character_match_count / 254
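# Example (informal): closely related single-byte code pages score high, e.g.
# cp1252 and latin_1 differ only inside the 0x80-0x9F block, while any pairing
# that involves a multi-byte codec is defined as 0.0.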


def is_cp_similar(iana_name_a: str, iana_name_b: str) -> bool:
    """
    Determine if two code pages are at least 80% similar. The IANA_SUPPORTED_SIMILAR
    dict was generated using the function cp_similarity.
    """
    return (
        iana_name_a in IANA_SUPPORTED_SIMILAR
        and iana_name_b in IANA_SUPPORTED_SIMILAR[iana_name_a]
    )


def set_logging_handler(
    name: str = "charset_normalizer",
    level: int = logging.INFO,
    format_string: str = "%(asctime)s | %(levelname)s | %(message)s",
) -> None:
    logger = logging.getLogger(name)
    logger.setLevel(level)

    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter(format_string))
    logger.addHandler(handler)
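# Example (informal): attach a stream handler to surface the library's internal
# tracing, e.g. before calling the detection APIs:
#   set_logging_handler(level=logging.DEBUG)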


def cut_sequence_chunks(
    sequences: bytes,
    encoding_iana: str,
    offsets: range,
    chunk_size: int,
    bom_or_sig_available: bool,
    strip_sig_or_bom: bool,
    sig_payload: bytes,
    is_multi_byte_decoder: bool,
    decoded_payload: str | None = None,
) -> Generator[str, None, None]:
    if decoded_payload and is_multi_byte_decoder is False:
        for i in offsets:
            chunk = decoded_payload[i : i + chunk_size]
            if not chunk:
                break
            yield chunk
    else:
        for i in offsets:
            chunk_end = i + chunk_size
            if chunk_end > len(sequences) + 8:
                continue

            cut_sequence = sequences[i : i + chunk_size]

            if bom_or_sig_available and strip_sig_or_bom is False:
                cut_sequence = sig_payload + cut_sequence

            chunk = cut_sequence.decode(
                encoding_iana,
                errors="ignore" if is_multi_byte_decoder else "strict",
            )

            # Multi-byte misalignment detector: a chunk cut mid-character will
            # not reappear in the reference decoded payload, so nudge the start
            # back by up to three bytes until it does.
            if is_multi_byte_decoder and i > 0:
                chunk_partial_size_chk: int = min(chunk_size, 16)

                if (
                    decoded_payload
                    and chunk[:chunk_partial_size_chk] not in decoded_payload
                ):
                    for j in range(i, i - 4, -1):
                        cut_sequence = sequences[j:chunk_end]

                        if bom_or_sig_available and strip_sig_or_bom is False:
                            cut_sequence = sig_payload + cut_sequence

                        chunk = cut_sequence.decode(encoding_iana, errors="ignore")

                        if chunk[:chunk_partial_size_chk] in decoded_payload:
                            break

            yield chunk
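# Example (informal sketch of how a probing loop might slice input; `payload`
# is a hypothetical bytes value, and the 64-byte window size is arbitrary):
#   chunks = cut_sequence_chunks(
#       sequences=payload,
#       encoding_iana="utf_8",
#       offsets=range(0, len(payload), 64),
#       chunk_size=64,
#       bom_or_sig_available=False,
#       strip_sig_or_bom=False,
#       sig_payload=b"",
#       is_multi_byte_decoder=True,
#   )
#   for chunk in chunks:
#       ...  # inspect each decoded window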