| | from __future__ import annotations |
| |
|
| | import logging |
| | from os import PathLike |
| | from typing import BinaryIO |
| |
|
| | from .cd import ( |
| | coherence_ratio, |
| | encoding_languages, |
| | mb_encoding_languages, |
| | merge_coherence_ratios, |
| | ) |
| | from .constant import IANA_SUPPORTED, TOO_BIG_SEQUENCE, TOO_SMALL_SEQUENCE, TRACE |
| | from .md import mess_ratio |
| | from .models import CharsetMatch, CharsetMatches |
| | from .utils import ( |
| | any_specified_encoding, |
| | cut_sequence_chunks, |
| | iana_name, |
| | identify_sig_or_bom, |
| | is_cp_similar, |
| | is_multi_byte_encoding, |
| | should_strip_sig_or_bom, |
| | ) |
| |
|
# Package-level logger used by every detection entry point in this module.
logger = logging.getLogger("charset_normalizer")

# Stream handler that from_bytes() attaches to `logger` for the duration of a
# call made with explain=True, and detaches again before returning.
explain_handler = logging.StreamHandler()
explain_handler.setFormatter(
    logging.Formatter(fmt="%(asctime)s | %(levelname)s | %(message)s")
)
| |
|
| |
|
def from_bytes(
    sequences: bytes | bytearray,
    steps: int = 5,
    chunk_size: int = 512,
    threshold: float = 0.2,
    cp_isolation: list[str] | None = None,
    cp_exclusion: list[str] | None = None,
    preemptive_behaviour: bool = True,
    explain: bool = False,
    language_threshold: float = 0.1,
    enable_fallback: bool = True,
) -> CharsetMatches:
    """
    Given a raw bytes sequence, return the best possible charset(s) usable to render str objects.
    If there are no results, it is a strong indicator that the source is binary/not text.
    By default, the process will extract 5 blocks of 512 bytes each to assess the mess and coherence of a given
    sequence, and will give up a particular code page after 20% of measured mess. Those criteria are customizable
    at will (steps, chunk_size, threshold).

    The preemptive behavior DOES NOT replace the traditional detection workflow, it prioritizes a particular code page
    but never takes it for granted. Can improve the performance.

    You may want to focus your attention on some code pages and/or not others, use cp_isolation and cp_exclusion for
    that purpose.

    This function will strip the SIG in the payload/sequence every time except on UTF-16, UTF-32.
    By default the library does not setup any handler other than the NullHandler, if you choose to set the 'explain'
    toggle to True it will alter the logger configuration to add a StreamHandler that is suitable for debugging.
    The handler is removed and the previous logger level is restored before this function returns, including when
    the detection raises.
    Custom logging format and handler can be set manually.

    language_threshold is forwarded to the coherence (language) measurement. enable_fallback controls whether an
    ascii/utf_8/declared-encoding match that failed the mess probing may still be returned when nothing else fits.

    Raises TypeError if sequences is neither bytes nor bytearray.
    """
    if not isinstance(sequences, (bytearray, bytes)):
        raise TypeError(
            "Expected object of type bytes or bytearray, got: {}".format(
                type(sequences)
            )
        )

    if not explain:
        return _detect_from_bytes(
            sequences,
            steps,
            chunk_size,
            threshold,
            cp_isolation,
            cp_exclusion,
            preemptive_behaviour,
            explain,
            language_threshold,
            enable_fallback,
        )

    previous_logger_level: int = logger.level
    logger.addHandler(explain_handler)
    logger.setLevel(TRACE)

    try:
        return _detect_from_bytes(
            sequences,
            steps,
            chunk_size,
            threshold,
            cp_isolation,
            cp_exclusion,
            preemptive_behaviour,
            explain,
            language_threshold,
            enable_fallback,
        )
    finally:
        # Single clean-up point: whichever way the detection exits (early
        # return, normal return or exception) the logger is put back exactly
        # as it was found.
        logger.removeHandler(explain_handler)
        logger.setLevel(previous_logger_level)


def _detect_from_bytes(
    sequences: bytes | bytearray,
    steps: int,
    chunk_size: int,
    threshold: float,
    cp_isolation: list[str] | None,
    cp_exclusion: list[str] | None,
    preemptive_behaviour: bool,
    explain: bool,
    language_threshold: float,
    enable_fallback: bool,
) -> CharsetMatches:
    """
    Detection workflow behind from_bytes. Expects sequences to be already type-checked and leaves the
    logger configuration untouched (the caller owns the 'explain' handler set-up and tear-down).
    """
    length: int = len(sequences)

    if length == 0:
        logger.debug("Encoding detection on empty bytes, assuming utf_8 intention.")
        return CharsetMatches([CharsetMatch(sequences, "utf_8", 0.0, False, [], "")])

    if cp_isolation is not None:
        logger.log(
            TRACE,
            "cp_isolation is set. use this flag for debugging purpose. "
            "limited list of encoding allowed : %s.",
            ", ".join(cp_isolation),
        )
        cp_isolation = [iana_name(cp, False) for cp in cp_isolation]
    else:
        cp_isolation = []

    if cp_exclusion is not None:
        logger.log(
            TRACE,
            "cp_exclusion is set. use this flag for debugging purpose. "
            "limited list of encoding excluded : %s.",
            ", ".join(cp_exclusion),
        )
        cp_exclusion = [iana_name(cp, False) for cp in cp_exclusion]
    else:
        cp_exclusion = []

    # Flag handed to mess_ratio for every chunk; it does not change while
    # iterating so it is computed once here.
    mess_debug: bool = explain is True and 1 <= len(cp_isolation) <= 2

    # The payload is too short for the requested sampling: probe it whole.
    if length <= (chunk_size * steps):
        logger.log(
            TRACE,
            "override steps (%i) and chunk_size (%i) as content does not fit (%i byte(s) given) parameters.",
            steps,
            chunk_size,
            length,
        )
        steps = 1
        chunk_size = length

    if steps > 1 and length / steps < chunk_size:
        chunk_size = int(length / steps)

    is_too_small_sequence: bool = length < TOO_SMALL_SEQUENCE
    is_too_large_sequence: bool = length >= TOO_BIG_SEQUENCE

    if is_too_small_sequence:
        logger.log(
            TRACE,
            "Trying to detect encoding from a tiny portion of ({}) byte(s).".format(
                length
            ),
        )
    elif is_too_large_sequence:
        logger.log(
            TRACE,
            "Using lazy str decoding because the payload is quite large, ({}) byte(s).".format(
                length
            ),
        )

    # Encodings tried before the generic IANA_SUPPORTED list, in this order:
    # declared encoding (if any), BOM/SIG encoding (if any), ascii, utf_8.
    prioritized_encodings: list[str] = []

    specified_encoding: str | None = (
        any_specified_encoding(sequences) if preemptive_behaviour else None
    )

    if specified_encoding is not None:
        prioritized_encodings.append(specified_encoding)
        logger.log(
            TRACE,
            "Detected declarative mark in sequence. Priority +1 given for %s.",
            specified_encoding,
        )

    tested: set[str] = set()
    tested_but_hard_failure: list[str] = []
    tested_but_soft_failure: list[str] = []

    fallback_ascii: CharsetMatch | None = None
    fallback_u8: CharsetMatch | None = None
    fallback_specified: CharsetMatch | None = None

    results: CharsetMatches = CharsetMatches()

    early_stop_results: CharsetMatches = CharsetMatches()

    sig_encoding, sig_payload = identify_sig_or_bom(sequences)

    if sig_encoding is not None:
        prioritized_encodings.append(sig_encoding)
        logger.log(
            TRACE,
            "Detected a SIG or BOM mark on first %i byte(s). Priority +1 given for %s.",
            len(sig_payload),
            sig_encoding,
        )

    prioritized_encodings.append("ascii")

    if "utf_8" not in prioritized_encodings:
        prioritized_encodings.append("utf_8")

    for encoding_iana in prioritized_encodings + IANA_SUPPORTED:
        if cp_isolation and encoding_iana not in cp_isolation:
            continue

        if cp_exclusion and encoding_iana in cp_exclusion:
            continue

        if encoding_iana in tested:
            continue

        tested.add(encoding_iana)

        decoded_payload: str | None = None
        bom_or_sig_available: bool = sig_encoding == encoding_iana
        strip_sig_or_bom: bool = bom_or_sig_available and should_strip_sig_or_bom(
            encoding_iana
        )

        if encoding_iana in {"utf_16", "utf_32"} and not bom_or_sig_available:
            logger.log(
                TRACE,
                "Encoding %s won't be tested as-is because it require a BOM. Will try some sub-encoder LE/BE.",
                encoding_iana,
            )
            continue
        if encoding_iana == "utf_7" and not bom_or_sig_available:
            logger.log(
                TRACE,
                "Encoding %s won't be tested as-is because detection is unreliable without BOM/SIG.",
                encoding_iana,
            )
            continue

        try:
            is_multi_byte_decoder: bool = is_multi_byte_encoding(encoding_iana)
        except (ModuleNotFoundError, ImportError):
            logger.log(
                TRACE,
                "Encoding %s does not provide an IncrementalDecoder",
                encoding_iana,
            )
            continue

        # First gate: the payload must decode strictly with this code page.
        # For very large payloads and single-byte decoders only a leading
        # slice is decoded here and the result is discarded (decoded_payload
        # stays None); the remainder is verified further below.
        try:
            if is_too_large_sequence and is_multi_byte_decoder is False:
                str(
                    (
                        sequences[: int(50e4)]
                        if strip_sig_or_bom is False
                        else sequences[len(sig_payload) : int(50e4)]
                    ),
                    encoding=encoding_iana,
                )
            else:
                decoded_payload = str(
                    (
                        sequences
                        if strip_sig_or_bom is False
                        else sequences[len(sig_payload) :]
                    ),
                    encoding=encoding_iana,
                )
        except (UnicodeDecodeError, LookupError) as e:
            if not isinstance(e, LookupError):
                logger.log(
                    TRACE,
                    "Code page %s does not fit given bytes sequence at ALL. %s",
                    encoding_iana,
                    str(e),
                )
            tested_but_hard_failure.append(encoding_iana)
            continue

        # Skip code pages deemed similar to one that already failed the mess probing.
        similar_soft_failure: str | None = next(
            (
                encoding_soft_failed
                for encoding_soft_failed in tested_but_soft_failure
                if is_cp_similar(encoding_iana, encoding_soft_failed)
            ),
            None,
        )

        if similar_soft_failure is not None:
            logger.log(
                TRACE,
                "%s is deemed too similar to code page %s and was consider unsuited already. Continuing!",
                encoding_iana,
                similar_soft_failure,
            )
            continue

        # Offsets at which chunks are sampled; sampling starts right after
        # the BOM/SIG when one was detected for this encoding.
        r_ = range(
            0 if not bom_or_sig_available else len(sig_payload),
            length,
            int(length / steps),
        )

        multi_byte_bonus: bool = (
            is_multi_byte_decoder
            and decoded_payload is not None
            and len(decoded_payload) < length
        )

        if multi_byte_bonus:
            logger.log(
                TRACE,
                "Code page %s is a multi byte encoding table and it appear that at least one character "
                "was encoded using n-bytes.",
                encoding_iana,
            )

        # Give up on this code page once a quarter of the planned chunks
        # (but never fewer than 2) reached the mess threshold.
        max_chunk_gave_up: int = max(int(len(r_) / 4), 2)
        early_stop_count: int = 0
        lazy_str_hard_failure = False

        md_chunks: list[str] = []
        md_ratios = []

        try:
            for chunk in cut_sequence_chunks(
                sequences,
                encoding_iana,
                r_,
                chunk_size,
                bom_or_sig_available,
                strip_sig_or_bom,
                sig_payload,
                is_multi_byte_decoder,
                decoded_payload,
            ):
                md_chunks.append(chunk)

                md_ratios.append(
                    mess_ratio(
                        chunk,
                        threshold,
                        mess_debug,
                    )
                )

                if md_ratios[-1] >= threshold:
                    early_stop_count += 1

                # Also stops after the first chunk when a BOM/SIG is present
                # for this encoding and is not stripped from the payload.
                if (early_stop_count >= max_chunk_gave_up) or (
                    bom_or_sig_available and strip_sig_or_bom is False
                ):
                    break
        except UnicodeDecodeError as e:
            # Chunks may be decoded lazily, so a decoding error can still
            # surface here even though the first gate passed.
            logger.log(
                TRACE,
                "LazyStr Loading: After MD chunk decode, code page %s does not fit given bytes sequence at ALL. %s",
                encoding_iana,
                str(e),
            )
            early_stop_count = max_chunk_gave_up
            lazy_str_hard_failure = True

        # Large payload + single-byte decoder: only a leading slice was
        # decoded by the first gate, so strictly decode the rest now.
        # NOTE(review): the first gate stops at byte 50e4 while this pass
        # starts at byte 50e3, so part of the payload is decoded twice --
        # confirm whether the overlap is intended.
        if (
            not lazy_str_hard_failure
            and is_too_large_sequence
            and not is_multi_byte_decoder
        ):
            try:
                sequences[int(50e3) :].decode(encoding_iana, errors="strict")
            except UnicodeDecodeError as e:
                logger.log(
                    TRACE,
                    "LazyStr Loading: After final lookup, code page %s does not fit given bytes sequence at ALL. %s",
                    encoding_iana,
                    str(e),
                )
                tested_but_hard_failure.append(encoding_iana)
                continue

        mean_mess_ratio: float = sum(md_ratios) / len(md_ratios) if md_ratios else 0.0
        if mean_mess_ratio >= threshold or early_stop_count >= max_chunk_gave_up:
            tested_but_soft_failure.append(encoding_iana)
            logger.log(
                TRACE,
                "%s was excluded because of initial chaos probing. Gave up %i time(s). "
                "Computed mean chaos is %f %%.",
                encoding_iana,
                early_stop_count,
                round(mean_mess_ratio * 100, ndigits=3),
            )
            # Remember a fallback candidate in case nothing passes at all.
            if (
                enable_fallback
                and encoding_iana
                in ["ascii", "utf_8", specified_encoding, "utf_16", "utf_32"]
                and not lazy_str_hard_failure
            ):
                fallback_entry = CharsetMatch(
                    sequences,
                    encoding_iana,
                    threshold,
                    bom_or_sig_available,
                    [],
                    decoded_payload,
                    preemptive_declaration=specified_encoding,
                )
                if encoding_iana == specified_encoding:
                    fallback_specified = fallback_entry
                elif encoding_iana == "ascii":
                    fallback_ascii = fallback_entry
                else:
                    fallback_u8 = fallback_entry
            continue

        logger.log(
            TRACE,
            "%s passed initial chaos probing. Mean measured chaos is %f %%",
            encoding_iana,
            round(mean_mess_ratio * 100, ndigits=3),
        )

        if not is_multi_byte_decoder:
            target_languages: list[str] = encoding_languages(encoding_iana)
        else:
            target_languages = mb_encoding_languages(encoding_iana)

        if target_languages:
            logger.log(
                TRACE,
                "{} should target any language(s) of {}".format(
                    encoding_iana, str(target_languages)
                ),
            )

        cd_ratios = []

        # The coherence (language) measurement is not run for ascii.
        if encoding_iana != "ascii":
            for chunk in md_chunks:
                chunk_languages = coherence_ratio(
                    chunk,
                    language_threshold,
                    ",".join(target_languages) if target_languages else None,
                )

                cd_ratios.append(chunk_languages)

        cd_ratios_merged = merge_coherence_ratios(cd_ratios)

        if cd_ratios_merged:
            logger.log(
                TRACE,
                "We detected language {} using {}".format(
                    cd_ratios_merged, encoding_iana
                ),
            )

        current_match = CharsetMatch(
            sequences,
            encoding_iana,
            mean_mess_ratio,
            bom_or_sig_available,
            cd_ratios_merged,
            (
                decoded_payload
                if (
                    is_too_large_sequence is False
                    or encoding_iana in [specified_encoding, "ascii", "utf_8"]
                )
                else None
            ),
            preemptive_declaration=specified_encoding,
        )

        results.append(current_match)

        if (
            encoding_iana in [specified_encoding, "ascii", "utf_8"]
            and mean_mess_ratio < 0.1
        ):
            # A prioritized encoding with no measured mess: stop immediately.
            if mean_mess_ratio == 0.0:
                logger.debug(
                    "Encoding detection: %s is most likely the one.",
                    current_match.encoding,
                )
                return CharsetMatches([current_match])

            early_stop_results.append(current_match)

        # Once the declared encoding (if any), ascii and utf_8 have all been
        # tried, a low-mess candidate among them ends the search.
        if (
            len(early_stop_results)
            and (specified_encoding is None or specified_encoding in tested)
            and "ascii" in tested
            and "utf_8" in tested
        ):
            probable_result: CharsetMatch = early_stop_results.best()
            logger.debug(
                "Encoding detection: %s is most likely the one.",
                probable_result.encoding,
            )
            return CharsetMatches([probable_result])

        if encoding_iana == sig_encoding:
            logger.debug(
                "Encoding detection: %s is most likely the one as we detected a BOM or SIG within "
                "the beginning of the sequence.",
                encoding_iana,
            )
            return CharsetMatches([results[encoding_iana]])

    if len(results) == 0:
        if fallback_u8 or fallback_ascii or fallback_specified:
            logger.log(
                TRACE,
                "Nothing got out of the detection process. Using ASCII/UTF-8/Specified fallback.",
            )

        # Preference order: declared encoding, then utf_8, then ascii.
        # The utf_8 branch is taken whenever a utf_8 fallback exists; the
        # previous compound condition reduced to exactly this test.
        if fallback_specified:
            logger.debug(
                "Encoding detection: %s will be used as a fallback match",
                fallback_specified.encoding,
            )
            results.append(fallback_specified)
        elif fallback_u8 is not None:
            logger.debug("Encoding detection: utf_8 will be used as a fallback match")
            results.append(fallback_u8)
        elif fallback_ascii:
            logger.debug("Encoding detection: ascii will be used as a fallback match")
            results.append(fallback_ascii)

    if results:
        logger.debug(
            "Encoding detection: Found %s as plausible (best-candidate) for content. With %i alternatives.",
            results.best().encoding,
            len(results) - 1,
        )
    else:
        logger.debug("Encoding detection: Unable to determine any suitable charset.")

    return results
| |
|
| |
|
def from_fp(
    fp: BinaryIO,
    steps: int = 5,
    chunk_size: int = 512,
    threshold: float = 0.20,
    cp_isolation: list[str] | None = None,
    cp_exclusion: list[str] | None = None,
    preemptive_behaviour: bool = True,
    explain: bool = False,
    language_threshold: float = 0.1,
    enable_fallback: bool = True,
) -> CharsetMatches:
    """
    Same as from_bytes, but the payload is read from an already opened file pointer.
    The whole remaining content is read with a single fp.read() call and every option is forwarded unchanged.
    Will not close the file pointer.
    """
    payload = fp.read()

    return from_bytes(
        payload,
        steps=steps,
        chunk_size=chunk_size,
        threshold=threshold,
        cp_isolation=cp_isolation,
        cp_exclusion=cp_exclusion,
        preemptive_behaviour=preemptive_behaviour,
        explain=explain,
        language_threshold=language_threshold,
        enable_fallback=enable_fallback,
    )
| |
|
| |
|
def from_path(
    path: str | bytes | PathLike,
    steps: int = 5,
    chunk_size: int = 512,
    threshold: float = 0.20,
    cp_isolation: list[str] | None = None,
    cp_exclusion: list[str] | None = None,
    preemptive_behaviour: bool = True,
    explain: bool = False,
    language_threshold: float = 0.1,
    enable_fallback: bool = True,
) -> CharsetMatches:
    """
    Same as from_bytes, with one extra step: the file at the given path is opened in binary mode and handed
    to from_fp. The file is closed once the detection is done.
    Can raise IOError.
    """
    with open(path, "rb") as handle:
        matches = from_fp(
            handle,
            steps=steps,
            chunk_size=chunk_size,
            threshold=threshold,
            cp_isolation=cp_isolation,
            cp_exclusion=cp_exclusion,
            preemptive_behaviour=preemptive_behaviour,
            explain=explain,
            language_threshold=language_threshold,
            enable_fallback=enable_fallback,
        )

    return matches
| |
|
| |
|
def is_binary(
    fp_or_path_or_payload: PathLike | str | BinaryIO | bytes,
    steps: int = 5,
    chunk_size: int = 512,
    threshold: float = 0.20,
    cp_isolation: list[str] | None = None,
    cp_exclusion: list[str] | None = None,
    preemptive_behaviour: bool = True,
    explain: bool = False,
    language_threshold: float = 0.1,
    enable_fallback: bool = False,
) -> bool:
    """
    Tell whether the given input (file, bytes, or path) looks like binary content, i.e. not a string.
    Relies on the same heuristics and default kwargs as the detection functions, with the sole exception that
    fallback matches are disabled by default, to be stricter around content that is ASCII-compatible but unlikely
    to be a string.

    A str or PathLike is handled as a path, bytes or bytearray as the payload itself, and anything else as a
    file pointer. Returns True when the detection yields no match.
    """
    # Every detection entry point takes the same options; gather them once.
    options = {
        "steps": steps,
        "chunk_size": chunk_size,
        "threshold": threshold,
        "cp_isolation": cp_isolation,
        "cp_exclusion": cp_exclusion,
        "preemptive_behaviour": preemptive_behaviour,
        "explain": explain,
        "language_threshold": language_threshold,
        "enable_fallback": enable_fallback,
    }

    if isinstance(fp_or_path_or_payload, (str, PathLike)):
        guesses = from_path(fp_or_path_or_payload, **options)
    elif isinstance(fp_or_path_or_payload, (bytes, bytearray)):
        guesses = from_bytes(fp_or_path_or_payload, **options)
    else:
        guesses = from_fp(fp_or_path_or_payload, **options)

    return not guesses
| |
|