from hashlib import sha256
from threading import RLock
from functools import lru_cache
from abc import ABC, abstractmethod
from sqlite3 import connect as db_connect
from orjson import dumps, loads
from lxml.html import HtmlElement
from scrapling.core.utils import _StorageTools, log
from scrapling.core._types import Dict, Optional, Any, cast
class StorageSystemMixin(ABC):  # pragma: no cover
    """Abstract base for element-storage backends.

    If you want to make your own storage system, you have to inherit from this
    class and implement `save` and `retrieve`.
    """

    def __init__(self, url: Optional[str] = None):
        """
        :param url: URL of the website we are working on to separate it from other websites data
        """
        # Make the url in lowercase to handle this edge case until it's updated: https://github.com/barseghyanartur/tld/issues/124
        self.url = url.lower() if (url and isinstance(url, str)) else None

    def _get_base_url(self, default_value: str = "default") -> str:
        """Return the registrable domain of `self.url`, or `default_value` when it can't be determined.

        :param default_value: Fallback returned when no url was given or domain extraction fails.
        """
        if not self.url:
            return default_value
        return self._extract_domain(self.url, default_value)

    # Caching a url-keyed staticmethod instead of the bound method avoids `lru_cache`
    # holding a reference to `self` and keeping every instance alive (ruff B019).
    @staticmethod
    @lru_cache(64, typed=True)
    def _extract_domain(url: str, default_value: str) -> str:
        """Extract the first-level domain from `url` via the `tld` package, falling back to `default_value`."""
        try:
            from tld import get_tld, Result

            # Fixing the inaccurate return type hint in `get_tld`
            extracted: Result | None = cast(
                Result, get_tld(url, as_object=True, fail_silently=True, fix_protocol=True)
            )
            if not extracted:
                return default_value
            return extracted.fld or extracted.domain or default_value
        except AttributeError:
            # NOTE(review): presumably guards against a result object missing
            # `fld`/`domain` — kept as in the original; confirm against `tld` docs.
            return default_value

    @abstractmethod
    def save(self, element: HtmlElement, identifier: str) -> None:
        """Saves the element's unique properties to the storage for retrieval and relocation later

        :param element: The element itself which we want to save to storage.
        :param identifier: This is the identifier that will be used to retrieve the element later from the storage. See
            the docs for more info.
        """
        raise NotImplementedError("Storage system must implement `save` method")

    @abstractmethod
    def retrieve(self, identifier: str) -> Optional[Dict]:
        """Using the identifier, we search the storage and return the unique properties of the element

        :param identifier: This is the identifier that will be used to retrieve the element from the storage. See
            the docs for more info.
        :return: A dictionary of the unique properties
        """
        # Fixed: the message previously (incorrectly) named the `save` method.
        raise NotImplementedError("Storage system must implement `retrieve` method")

    @staticmethod
    @lru_cache(128, typed=True)
    def _get_hash(identifier: str) -> str:
        """If you want to hash identifier in your storage system, use this safer"""
        _identifier = identifier.lower().strip()
        # Hash functions have to take bytes
        _identifier_bytes = _identifier.encode("utf-8")
        hash_value = sha256(_identifier_bytes).hexdigest()
        return f"{hash_value}_{len(_identifier_bytes)}"  # Length to reduce collision chance
@lru_cache(1, typed=True)
class SQLiteStorageSystem(StorageSystemMixin):
    """The recommended system to use, it's race condition safe and thread safe.
    Mainly built, so the library can run in threaded frameworks like scrapy or threaded tools
    > It's optimized for threaded applications, but running it without threads shouldn't make it slow."""

    def __init__(self, storage_file: str, url: Optional[str] = None):
        """
        :param storage_file: File to be used to store elements' data.
        :param url: URL of the website we are working on to separate it from other websites data
        """
        super().__init__(url)
        self.storage_file = storage_file
        self.lock = RLock()  # Better than Lock for reentrancy
        self._closed = False  # Makes `close` idempotent and `__del__` safe after an explicit close
        # >SQLite default mode in the earlier version is 1 not 2 (1=thread-safe 2=serialized)
        # `check_same_thread=False` to allow it to be used across different threads.
        self.connection = db_connect(self.storage_file, check_same_thread=False)
        # WAL (Write-Ahead Logging) allows for better concurrency.
        self.connection.execute("PRAGMA journal_mode=WAL")
        self.cursor = self.connection.cursor()
        self._setup_database()
        log.debug(f'Storage system loaded with arguments (storage_file="{storage_file}", url="{url}")')

    def _setup_database(self) -> None:
        """Create the `storage` table if it doesn't exist; rows are unique per (url, identifier)."""
        self.cursor.execute("""
            CREATE TABLE IF NOT EXISTS storage (
                id INTEGER PRIMARY KEY,
                url TEXT,
                identifier TEXT,
                element_data TEXT,
                UNIQUE (url, identifier)
            )
        """)
        self.connection.commit()

    def save(self, element: HtmlElement, identifier: str) -> None:
        """Saves the elements unique properties to the storage for retrieval and relocation later

        :param element: The element itself which we want to save to storage.
        :param identifier: This is the identifier that will be used to retrieve the element later from the storage. See
            the docs for more info.
        """
        url = self._get_base_url()
        element_data = _StorageTools.element_to_dict(element)
        with self.lock:
            # `INSERT OR REPLACE` keeps only the latest data per (url, identifier) pair.
            self.cursor.execute(
                """
                INSERT OR REPLACE INTO storage (url, identifier, element_data)
                VALUES (?, ?, ?)
                """,
                (url, identifier, dumps(element_data)),
            )
            # Note: the no-op `cursor.fetchall()` that used to follow the INSERT was removed.
            self.connection.commit()

    def retrieve(self, identifier: str) -> Optional[Dict[str, Any]]:
        """Using the identifier, we search the storage and return the unique properties of the element

        :param identifier: This is the identifier that will be used to retrieve the element from the storage. See
            the docs for more info.
        :return: A dictionary of the unique properties, or None if nothing was stored for this identifier
        """
        url = self._get_base_url()
        with self.lock:
            self.cursor.execute(
                "SELECT element_data FROM storage WHERE url = ? AND identifier = ?",
                (url, identifier),
            )
            result = self.cursor.fetchone()
        if result:
            return loads(result[0])
        return None

    def close(self):
        """Close all connections. It will be useful when with some things like scrapy Spider.closed() function/signal

        Safe to call more than once: subsequent calls are no-ops, so an explicit
        `close()` followed by `__del__` no longer raises `sqlite3.ProgrammingError`.
        """
        with self.lock:
            if self._closed:
                return
            self.connection.commit()
            self.cursor.close()
            self.connection.close()
            self._closed = True

    def __del__(self):
        """To ensure all connections are closed when the object is destroyed."""
        try:
            self.close()
        except Exception:
            # `__init__` may have failed before `self.connection` existed, or the
            # interpreter may already be shutting down — nothing useful to do here.
            pass