File size: 3,684 Bytes
94ec243
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
import logging
from itertools import chain
from re import compile as re_compile
from contextvars import ContextVar, Token

from lxml import html

from scrapling.core._types import Any, Dict, Iterable, List

# Using cache on top of a class is a brilliant way to achieve a Singleton design pattern without much code
from functools import lru_cache  # isort:skip

# Node types that `_StorageTools.element_to_dict` skips when listing an element's children
html_forbidden = (html.HtmlComment,)

# Translation table used by `clean_spaces`: tabs become spaces; newlines and carriage returns are deleted
__CLEANING_TABLE__ = str.maketrans({"\t": " ", "\n": None, "\r": None})
# Matches a run of one or more space characters (collapsed to a single space by `clean_spaces`)
__CONSECUTIVE_SPACES_REGEX__ = re_compile(r" +")


@lru_cache(1, typed=True)
def setup_logger():
    """Build the "scrapling" logger and give it a console handler.

    The logger level is set to INFO. A stream handler with a timestamped
    format is attached only when the logger has no handlers yet. The result
    is cached, so repeated calls hand back the same logger object.

    :returns: logging.Logger: The configured "scrapling" logger
    """
    scrapling_logger = logging.getLogger("scrapling")
    scrapling_logger.setLevel(logging.INFO)

    stream_handler = logging.StreamHandler()
    stream_handler.setFormatter(
        logging.Formatter(
            fmt="[%(asctime)s] %(levelname)s: %(message)s",
            datefmt="%Y-%m-%d %H:%M:%S",
        )
    )

    # Skip attaching when handlers are already present, so output is not duplicated
    already_configured = bool(scrapling_logger.handlers)
    if not already_configured:
        scrapling_logger.addHandler(stream_handler)

    return scrapling_logger


# Context variable holding the active logger. Its default is the logger returned by
# `setup_logger()`, which is called once here when the module is imported.
_current_logger: ContextVar[logging.Logger] = ContextVar("scrapling_logger", default=setup_logger())


class LoggerProxy:
    """Stand-in object that forwards attribute lookups to the current context logger."""

    def __getattr__(self, name: str):
        # Resolved on every access, so the proxy always reflects whichever
        # logger `_current_logger` holds at the moment of the lookup.
        active_logger = _current_logger.get()
        return getattr(active_logger, name)


# Module-level proxy instance; attribute access on it is forwarded to the logger currently held by `_current_logger`
log = LoggerProxy()


def set_logger(logger: logging.Logger) -> Token:
    """Install ``logger`` as the logger for the current context.

    :param logger: Logger to store in the ``_current_logger`` context variable
    :returns: Token produced by ``ContextVar.set``; pass it to ``reset_logger`` to undo this call
    """
    reset_token = _current_logger.set(logger)
    return reset_token


def reset_logger(token: Token) -> None:
    """Restore the context logger to the value it had before ``token`` was issued.

    :param token: Token previously returned by ``set_logger``
    """
    _current_logger.reset(token)
    return None


def flatten(lst: Iterable[Any]) -> List[Any]:
    """Flatten one level of nesting.

    :param lst: An iterable whose items are themselves iterable
    :returns: A new list holding the items of every inner iterable, in order
    """
    return [item for inner in lst for item in inner]


def _is_iterable(obj: Any) -> bool:
    """Tell whether ``obj`` is a list or a tuple.

    Only these two container types count; strings, bytes and every other
    iterable are rejected.
    """
    # Meant for the regex helpers, which need a sequence of values rather than a single string/bytes
    return isinstance(obj, list) or isinstance(obj, tuple)


class _StorageTools:
    """Helpers that describe an lxml HTML element as a plain dictionary."""

    @staticmethod
    def __clean_attributes(element: html.HtmlElement, forbidden: tuple = ()) -> Dict:
        """Return the element's attributes with whitespace-stripped values.

        Attributes whose value is empty (or only whitespace) and attributes
        whose name appears in ``forbidden`` are left out.
        """
        cleaned = {}
        if not element.attrib:
            return cleaned

        for key, value in element.attrib.items():
            if not value:
                continue
            stripped = value.strip()
            if stripped and key not in forbidden:
                cleaned[key] = stripped
        return cleaned

    @classmethod
    def element_to_dict(cls, element: html.HtmlElement) -> Dict:
        """Describe ``element`` as a dictionary.

        Always present: ``tag``, ``attributes``, ``text`` and ``path``.
        When the element has a parent, ``parent_name``, ``parent_attribs`` and
        ``parent_text`` are added, plus ``siblings`` if the parent has other children.
        ``children`` is added when the element has children that are not of a
        type listed in ``html_forbidden``.
        """
        own_text = element.text
        snapshot = {
            "tag": str(element.tag),
            "attributes": cls.__clean_attributes(element),
            "text": own_text.strip() if own_text else None,
            "path": cls._get_element_path(element),
        }

        parent = element.getparent()
        if parent is not None:
            parent_text = parent.text
            snapshot["parent_name"] = parent.tag
            snapshot["parent_attribs"] = dict(parent.attrib)
            snapshot["parent_text"] = parent_text.strip() if parent_text else None

            # Every other child of the parent, in document order
            sibling_tags = tuple(node.tag for node in parent.iterchildren() if node != element)
            if sibling_tags:
                snapshot["siblings"] = sibling_tags

        child_tags = tuple(node.tag for node in element.iterchildren() if not isinstance(node, html_forbidden))
        if child_tags:
            snapshot["children"] = child_tags

        return snapshot

    @classmethod
    def _get_element_path(cls, element: html.HtmlElement):
        """Return the tags from the topmost ancestor down to ``element`` as a tuple."""
        tags = []
        node = element
        # Walk upwards collecting tags, then flip so the topmost ancestor comes first
        while node is not None:
            tags.append(node.tag)
            node = node.getparent()
        tags.reverse()
        return tuple(tags)


@lru_cache(128, typed=True)
def clean_spaces(string):
    """Normalise whitespace in ``string``.

    Tabs are turned into spaces, newlines and carriage returns are removed,
    and each run of consecutive spaces is collapsed into a single space.
    Results are cached for up to 128 distinct inputs.
    """
    without_breaks = string.translate(__CLEANING_TABLE__)
    collapsed = __CONSECUTIVE_SPACES_REGEX__.sub(" ", without_breaks)
    return collapsed