| | from __future__ import annotations
|
| |
|
| | import importlib
|
| | import logging
|
| | import unicodedata
|
| | from codecs import IncrementalDecoder
|
| | from encodings.aliases import aliases
|
| | from functools import lru_cache
|
| | from re import findall
|
| | from typing import Generator
|
| |
|
| | from _multibytecodec import (
|
| | MultibyteIncrementalDecoder,
|
| | )
|
| |
|
| | from .constant import (
|
| | ENCODING_MARKS,
|
| | IANA_SUPPORTED_SIMILAR,
|
| | RE_POSSIBLE_ENCODING_INDICATION,
|
| | UNICODE_RANGES_COMBINED,
|
| | UNICODE_SECONDARY_RANGE_KEYWORD,
|
| | UTF8_MAXIMAL_ALLOCATION,
|
| | COMMON_CJK_CHARACTERS,
|
| | )
|
| |
|
| |
|
@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def is_accentuated(character: str) -> bool:
    """
    Tell whether the Unicode name of a character mentions a known accent.

    :param character: A single character.
    :return: True when the character name contains one of the accent markers
             listed below; False otherwise, including for unnamed characters.
    """
    # An empty default stands in for characters that have no Unicode name,
    # so none of the markers can match.
    unicode_name: str = unicodedata.name(character, "")

    accent_markers: tuple[str, ...] = (
        "WITH GRAVE",
        "WITH ACUTE",
        "WITH CEDILLA",
        "WITH DIAERESIS",
        "WITH CIRCUMFLEX",
        "WITH TILDE",
        "WITH MACRON",
        "WITH RING ABOVE",
    )

    return any(marker in unicode_name for marker in accent_markers)
|
| |
|
| |
|
@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def remove_accent(character: str) -> str:
    """
    Return the base character of a decomposable character.

    The first code point of the character's Unicode decomposition mapping is
    returned. Characters without a decomposition mapping are returned as-is.

    :param character: A single character.
    :return: The leading character of the decomposition, or the input itself.
    """
    decomposed: str = unicodedata.decomposition(character)
    if not decomposed:
        return character

    # Compatibility mappings are prefixed with a formatting tag such as
    # "<compat>" or "<fraction>". The tag is not a code point, so it must be
    # skipped instead of being parsed as hexadecimal (which raised ValueError).
    codes: list[str] = [
        code for code in decomposed.split() if not code.startswith("<")
    ]
    if not codes:
        return character

    return chr(int(codes[0], 16))
|
| |
|
| |
|
@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def unicode_range(character: str) -> str | None:
    """
    Look up the official Unicode range name of a single character.

    :param character: A single character.
    :return: The name of the first entry of UNICODE_RANGES_COMBINED whose
             range contains the character ordinal, or None if there is none.
    """
    code_point: int = ord(character)

    return next(
        (
            name
            for name, span in UNICODE_RANGES_COMBINED.items()
            if code_point in span
        ),
        None,
    )
|
| |
|
| |
|
@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def is_latin(character: str) -> bool:
    """
    Tell whether the Unicode name of a character contains "LATIN".

    Characters without a Unicode name are reported as not Latin.
    """
    # The empty default replaces the ValueError raised for unnamed characters.
    return "LATIN" in unicodedata.name(character, "")
|
| |
|
| |
|
@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def is_punctuation(character: str) -> bool:
    """
    Tell whether a character counts as punctuation.

    A character qualifies when its Unicode general category contains "P",
    or, failing that, when the name of its Unicode range contains
    "Punctuation".
    """
    if "P" in unicodedata.category(character):
        return True

    # Fall back on the range name for characters whose category is not "P*".
    range_name: str | None = unicode_range(character)

    return range_name is not None and "Punctuation" in range_name
|
| |
|
| |
|
@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def is_symbol(character: str) -> bool:
    """
    Tell whether a character counts as a symbol.

    A character qualifies when its Unicode general category contains "S" or
    "N". Otherwise it qualifies only if its Unicode range name contains
    "Forms" and its category is not "Lo".
    """
    category: str = unicodedata.category(character)

    if any(flag in category for flag in ("S", "N")):
        return True

    range_name: str | None = unicode_range(character)
    if range_name is None:
        return False

    return category != "Lo" and "Forms" in range_name
|
| |
|
| |
|
@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def is_emoticon(character: str) -> bool:
    """
    Tell whether a character belongs to an emoticon or pictograph range.

    The decision is based on the Unicode range name containing "Emoticons"
    or "Pictographs"; characters outside any known range yield False.
    """
    range_name: str | None = unicode_range(character)
    if range_name is None:
        return False

    return any(keyword in range_name for keyword in ("Emoticons", "Pictographs"))
|
| |
|
| |
|
@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def is_separator(character: str) -> bool:
    """
    Tell whether a character acts as a separator.

    Whitespace and the characters "|", "+", "<" and ">" always qualify.
    Otherwise the Unicode general category decides: any category containing
    "Z", or one of "Po", "Pd", "Pc".
    """
    if character.isspace():
        return True
    if character in ("|", "+", "<", ">"):
        return True

    category: str = unicodedata.category(character)
    if "Z" in category:
        return True

    return category in ("Po", "Pd", "Pc")
|
| |
|
| |
|
@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def is_case_variable(character: str) -> bool:
    """
    Tell whether a character is cased.

    True when exactly one of ``str.islower`` / ``str.isupper`` holds.
    """
    lowercase: bool = character.islower()
    uppercase: bool = character.isupper()

    # Exclusive-or of two booleans: exactly one of them must be set.
    return lowercase ^ uppercase
|
| |
|
| |
|
@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def is_cjk(character: str) -> bool:
    """
    Tell whether the Unicode name of a character contains "CJK".

    Characters without a Unicode name yield False.
    """
    # The empty default replaces the ValueError raised for unnamed characters.
    return "CJK" in unicodedata.name(character, "")
|
| |
|
| |
|
@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def is_hiragana(character: str) -> bool:
    """
    Tell whether the Unicode name of a character contains "HIRAGANA".

    Characters without a Unicode name yield False.
    """
    # The empty default replaces the ValueError raised for unnamed characters.
    return "HIRAGANA" in unicodedata.name(character, "")
|
| |
|
| |
|
@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def is_katakana(character: str) -> bool:
    """
    Tell whether the Unicode name of a character contains "KATAKANA".

    Characters without a Unicode name yield False.
    """
    # The empty default replaces the ValueError raised for unnamed characters.
    return "KATAKANA" in unicodedata.name(character, "")
|
| |
|
| |
|
@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def is_hangul(character: str) -> bool:
    """
    Tell whether the Unicode name of a character contains "HANGUL".

    Characters without a Unicode name yield False.
    """
    # The empty default replaces the ValueError raised for unnamed characters.
    return "HANGUL" in unicodedata.name(character, "")
|
| |
|
| |
|
@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def is_thai(character: str) -> bool:
    """
    Tell whether the Unicode name of a character contains "THAI".

    Characters without a Unicode name yield False.
    """
    # The empty default replaces the ValueError raised for unnamed characters.
    return "THAI" in unicodedata.name(character, "")
|
| |
|
| |
|
@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def is_arabic(character: str) -> bool:
    """
    Tell whether the Unicode name of a character contains "ARABIC".

    Characters without a Unicode name yield False.
    """
    # The empty default replaces the ValueError raised for unnamed characters.
    return "ARABIC" in unicodedata.name(character, "")
|
| |
|
| |
|
@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def is_arabic_isolated_form(character: str) -> bool:
    """
    Tell whether a character is named as an Arabic isolated form.

    True when the Unicode name contains both "ARABIC" and "ISOLATED FORM";
    characters without a Unicode name yield False.
    """
    # The empty default replaces the ValueError raised for unnamed characters.
    unicode_name: str = unicodedata.name(character, "")

    return all(token in unicode_name for token in ("ARABIC", "ISOLATED FORM"))
|
| |
|
| |
|
@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def is_cjk_uncommon(character: str) -> bool:
    """
    Tell whether a character is absent from COMMON_CJK_CHARACTERS.

    Note that the check is a plain membership test: it does not verify that
    the character is CJK in the first place.
    """
    is_common: bool = character in COMMON_CJK_CHARACTERS
    return not is_common
|
| |
|
| |
|
@lru_cache(maxsize=len(UNICODE_RANGES_COMBINED))
def is_unicode_range_secondary(range_name: str) -> bool:
    """
    Tell whether a Unicode range name designates a secondary range.

    A range is secondary when its name contains at least one of the keywords
    listed in UNICODE_SECONDARY_RANGE_KEYWORD.
    """
    for keyword in UNICODE_SECONDARY_RANGE_KEYWORD:
        if keyword in range_name:
            return True

    return False
|
| |
|
| |
|
@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def is_unprintable(character: str) -> bool:
    """
    Tell whether a character is neither whitespace nor printable.

    Two code points are deliberately never reported as unprintable:
    U+001A and U+FEFF.
    """
    # U+001A is the ASCII SUBSTITUTE control character and U+FEFF is
    # ZERO WIDTH NO-BREAK SPACE (also used as byte order mark); both are
    # exempted from the check.
    if character == "\x1a" or character == "\ufeff":
        return False

    return not (character.isspace() or character.isprintable())
|
| |
|
| |
|
def any_specified_encoding(sequence: bytes, search_zone: int = 8192) -> str | None:
    """
    Search the head of a payload for an explicitly declared encoding.

    The first ``search_zone`` bytes are decoded as ASCII (undecodable bytes
    are dropped) and scanned with RE_POSSIBLE_ENCODING_INDICATION.

    :param sequence: Raw payload to inspect.
    :param search_zone: Number of leading bytes to scan.
    :return: The Python codec name of the first declaration that matches an
             entry of ``encodings.aliases.aliases``, or None.
    :raises TypeError: If ``sequence`` is not a ``bytes`` instance.
    """
    if not isinstance(sequence, bytes):
        raise TypeError

    zone_end: int = min(len(sequence), search_zone)
    ascii_head: str = sequence[:zone_end].decode("ascii", errors="ignore")

    for declared in findall(RE_POSSIBLE_ENCODING_INDICATION, ascii_head):
        # Normalize to the spelling used by the alias table.
        normalized: str = declared.lower().replace("-", "_")

        # The declaration may be either an alias or the codec name itself.
        for alias, python_name in aliases.items():
            if normalized in (alias, python_name):
                return python_name

    return None
|
| |
|
| |
|
@lru_cache(maxsize=128)
def is_multi_byte_encoding(name: str) -> bool:
    """
    Tell whether an encoding is a multi-byte one, given its Python codec name.

    The UTF family is listed explicitly. Any other codec is multi-byte when
    the IncrementalDecoder of its ``encodings`` module derives from
    MultibyteIncrementalDecoder.

    :param name: Python codec name, e.g. "utf_8" or "cp1252".
    :return: True for multi-byte encodings, False otherwise.
    """
    unicode_transformation_formats = {
        "utf_8",
        "utf_8_sig",
        "utf_16",
        "utf_16_be",
        "utf_16_le",
        "utf_32",
        "utf_32_le",
        "utf_32_be",
        "utf_7",
    }

    if name in unicode_transformation_formats:
        return True

    decoder_class = importlib.import_module(f"encodings.{name}").IncrementalDecoder

    return issubclass(decoder_class, MultibyteIncrementalDecoder)
|
| |
|
| |
|
def identify_sig_or_bom(sequence: bytes) -> tuple[str | None, bytes]:
    """
    Detect a signature or byte order mark at the start of a payload.

    :param sequence: Raw payload to inspect.
    :return: A pair (encoding name, mark bytes) for the first entry of
             ENCODING_MARKS with a mark that the payload starts with,
             or (None, b"") when nothing matches.
    """
    for iana_encoding, declared_marks in ENCODING_MARKS.items():
        # An entry holds either a single mark or a list of candidate marks.
        candidates: list[bytes] = (
            [declared_marks] if isinstance(declared_marks, bytes) else declared_marks
        )

        for candidate in candidates:
            if sequence.startswith(candidate):
                return iana_encoding, candidate

    return None, b""
|
| |
|
| |
|
def should_strip_sig_or_bom(iana_encoding: str) -> bool:
    """
    Tell whether the signature/BOM should be stripped for a given encoding.

    :param iana_encoding: Python codec name.
    :return: False for "utf_16" and "utf_32", True for anything else.
    """
    keeps_mark: bool = iana_encoding in {"utf_16", "utf_32"}
    return not keeps_mark
|
| |
|
| |
|
def iana_name(cp_name: str, strict: bool = True) -> str:
    """
    Return the Python normalized encoding name (not the official IANA name).

    :param cp_name: Encoding name or alias; case and "-" / "_" are normalized.
    :param strict: When True, unknown names raise instead of being returned.
    :return: The codec name found in ``encodings.aliases.aliases``, or the
             normalized input when ``strict`` is False and nothing matched.
    :raises ValueError: If the name is unknown and ``strict`` is True.
    """
    cp_name = cp_name.lower().replace("-", "_")

    # The input may be either an alias or an already-normalized codec name.
    for alias, python_name in aliases.items():
        if cp_name == alias or cp_name == python_name:
            return python_name

    if not strict:
        return cp_name

    raise ValueError(f"Unable to retrieve IANA for '{cp_name}'")
|
| |
|
| |
|
def cp_similarity(iana_name_a: str, iana_name_b: str) -> float:
    """
    Measure how similar two single-byte code pages are.

    Every possible byte value (0x00 to 0xFF) is decoded with both codecs and
    the share of bytes that decode to the same text is returned.

    :param iana_name_a: Python codec name of the first code page.
    :param iana_name_b: Python codec name of the second code page.
    :return: Ratio between 0.0 and 1.0; always 0.0 when either encoding is
             a multi-byte one.
    """
    if is_multi_byte_encoding(iana_name_a) or is_multi_byte_encoding(iana_name_b):
        return 0.0

    decoder_a = importlib.import_module(f"encodings.{iana_name_a}").IncrementalDecoder
    decoder_b = importlib.import_module(f"encodings.{iana_name_b}").IncrementalDecoder

    # Undefined bytes decode to an empty string rather than raising.
    id_a: IncrementalDecoder = decoder_a(errors="ignore")
    id_b: IncrementalDecoder = decoder_b(errors="ignore")

    # A byte has 256 possible values. The previous loop stopped at 0xFE and
    # divided the count by 254, so identical code pages scored above 1.0.
    byte_value_count: int = 256

    character_match_count: int = 0

    for i in range(byte_value_count):
        to_be_decoded: bytes = bytes([i])
        if id_a.decode(to_be_decoded) == id_b.decode(to_be_decoded):
            character_match_count += 1

    return character_match_count / byte_value_count
|
| |
|
| |
|
def is_cp_similar(iana_name_a: str, iana_name_b: str) -> bool:
    """
    Tell whether two code pages are at least 80% similar.

    The answer is read from IANA_SUPPORTED_SIMILAR, a table that was
    generated using the function cp_similarity.
    """
    if iana_name_a not in IANA_SUPPORTED_SIMILAR:
        return False

    return iana_name_b in IANA_SUPPORTED_SIMILAR[iana_name_a]
|
| |
|
| |
|
def set_logging_handler(
    name: str = "charset_normalizer",
    level: int = logging.INFO,
    format_string: str = "%(asctime)s | %(levelname)s | %(message)s",
) -> None:
    """
    Set the level of a named logger and attach a new stream handler to it.

    Each call adds one more ``logging.StreamHandler`` to the logger.

    :param name: Name of the logger to configure.
    :param level: Logging level applied to the logger.
    :param format_string: Format applied to records emitted by the handler.
    """
    target_logger = logging.getLogger(name)
    target_logger.setLevel(level)

    stream_handler = logging.StreamHandler()
    formatter = logging.Formatter(format_string)
    stream_handler.setFormatter(formatter)

    target_logger.addHandler(stream_handler)
|
| |
|
| |
|
def cut_sequence_chunks(
    sequences: bytes,
    encoding_iana: str,
    offsets: range,
    chunk_size: int,
    bom_or_sig_available: bool,
    strip_sig_or_bom: bool,
    sig_payload: bytes,
    is_multi_byte_decoder: bool,
    decoded_payload: str | None = None,
) -> Generator[str, None, None]:
    """
    Yield decoded chunks of a payload, one per requested offset.

    :param sequences: Raw payload.
    :param encoding_iana: Codec used to decode the raw chunks.
    :param offsets: Start positions of the chunks.
    :param chunk_size: Length of each chunk.
    :param bom_or_sig_available: Whether a signature/BOM was detected.
    :param strip_sig_or_bom: Whether that mark is stripped; when it is not,
                             it is prepended to every raw chunk before decoding.
    :param sig_payload: Bytes of the detected signature/BOM.
    :param is_multi_byte_decoder: Whether the codec is a multi-byte one.
    :param decoded_payload: Already decoded payload, if available.
    :return: Generator of decoded text chunks.
    """
    # With a decoded payload and a single-byte codec, the text can be sliced
    # directly: no decoding is required.
    if decoded_payload and is_multi_byte_decoder is False:
        for start in offsets:
            piece = decoded_payload[start : start + chunk_size]
            if not piece:
                break
            yield piece
        return

    reattach_mark = bom_or_sig_available and strip_sig_or_bom is False
    error_policy: str = "ignore" if is_multi_byte_decoder else "strict"
    payload_size: int = len(sequences)
    probe_size: int = min(chunk_size, 16)

    for start in offsets:
        stop = start + chunk_size

        # Skip chunks that would end more than 8 bytes past the payload.
        if stop > payload_size + 8:
            continue

        raw = sequences[start:stop]
        if reattach_mark:
            raw = sig_payload + raw

        piece = raw.decode(encoding_iana, errors=error_policy)

        # A multi-byte chunk may start in the middle of a character. When the
        # head of the chunk cannot be found in the decoded payload, retry with
        # a start moved back by up to three bytes until it can.
        misaligned = (
            is_multi_byte_decoder
            and start > 0
            and decoded_payload
            and piece[:probe_size] not in decoded_payload
        )

        if misaligned:
            for realigned_start in range(start, start - 4, -1):
                raw = sequences[realigned_start:stop]
                if reattach_mark:
                    raw = sig_payload + raw

                piece = raw.decode(encoding_iana, errors="ignore")

                if piece[:probe_size] in decoded_payload:
                    break

        yield piece
|
| |
|