import hashlib
import json
import datetime
import difflib

import requests
from email.utils import parsedate_to_datetime
from functools import lru_cache
from collections import defaultdict
from urllib.robotparser import RobotFileParser
from urllib.error import URLError

from fake_useragent import UserAgent
from bs4 import BeautifulSoup

from src.scraping.types import FetchResult
from ..config import config
from ..const.page_priority import *
from ..utils.logging import get_logger
from ..utils.tools import call_with_exponential_backoff

logger = get_logger('scraper.utils')
ua = UserAgent()

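# NOTE: PAGE_PRIORITY_KEYWORDS and CHUNK_TOPIC_KEYWORDS come from the
# ..const.page_priority wildcard import above. The functions below only assume
# the rough shape sketched here (a priority level or topic mapped to a list of
# keywords); the actual constants live in that module and may differ.
#
#   PAGE_PRIORITY_KEYWORDS = {
#       "high":   ["pricing", "contact", "product"],
#       "medium": ["blog", "news"],
#       "low":    ["archive", "legal"],
#   }
#   CHUNK_TOPIC_KEYWORDS = {
#       "pricing": ["price", "cost", "plan"],
#       "support": ["faq", "help", "contact"],
#   }
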
def _fuzzy_match(word, keyword, threshold=0.8):
    """
    Check if word fuzzy matches keyword using difflib ratio.
    """
    return difflib.SequenceMatcher(None, word.lower(), keyword.lower()).ratio() >= threshold

def detect_page_topic_and_priority(text: str) -> dict[str, str]:
    """
    Detect the dominant priority level and topic keyword for a page text
    by fuzzy-matching its words against PAGE_PRIORITY_KEYWORDS.
    """
    result = {
        'priority': 'low',
        'topic': 'none',
    }
    if not text: return result
    text_lower = text.lower()
    words = text_lower.split()
    topic_counter = {prio: defaultdict(int) for prio in PAGE_PRIORITY_KEYWORDS.keys()}
    prio_counter = {prio: 0 for prio in PAGE_PRIORITY_KEYWORDS.keys()}
    # Count per-keyword fuzzy matches across all words.
    for word in words:
        for prio, kws in PAGE_PRIORITY_KEYWORDS.items():
            for kw in kws:
                if _fuzzy_match(word, kw):
                    topic_counter[prio][kw] += 1
    # Aggregate keyword hits into one score per priority level (done once,
    # after the word loop, so counts are not compounded per word).
    for prio in PAGE_PRIORITY_KEYWORDS.keys():
        prio_counter[prio] += sum(topic_counter[prio].values())
    if max(prio_counter.values()) == 0:
        return result
    top_prio = max(prio_counter.keys(), key=lambda k: prio_counter[k])
    top_topic = max(topic_counter[top_prio].keys(), key=lambda k: topic_counter[top_prio][k])
    result['priority'] = top_prio
    result['topic'] = top_topic
    return result

def detect_chunk_topic(text: str) -> str:
    """
    Return the topic from CHUNK_TOPIC_KEYWORDS whose keywords fuzzy-match the
    chunk text most often, or 'none' when nothing matches.
    """
    if not text: return 'none'
    text_lower = text.lower()
    words = text_lower.split()
    topic_counter = {topic: 0 for topic in CHUNK_TOPIC_KEYWORDS.keys()}
    for word in words:
        for topic, kws in CHUNK_TOPIC_KEYWORDS.items():
            topic_counter[topic] += sum(1 for kw in kws if _fuzzy_match(word, kw))
    if max(topic_counter.values()) == 0:
        return 'none'
    top_topic = max(topic_counter.keys(), key=lambda k: topic_counter[k])
    return top_topic

def hash_html(html: str) -> str:
    soup = BeautifulSoup(html, "html.parser")
    for tag in soup(["script", "style"]):
        tag.decompose()
    text = soup.get_text()
    return hashlib.sha256(text.encode()).hexdigest()

def parse_isoformat(data: str) -> datetime.datetime | None:
    """
    Parse an RFC 2822 or ISO 8601 date string; return None if parsing fails.
    """
    if not data:
        return None
    try:
        return parsedate_to_datetime(data)
    except (TypeError, ValueError):
        pass
    try:
        return datetime.datetime.fromisoformat(data)
    except ValueError:
        pass
    return None

def extract_last_modified(response, html) -> datetime.datetime | None:
    # Prefer the Last-Modified header, then <meta> tags, then JSON-LD metadata.
    last_modified = response.headers.get("Last-Modified", None)
    soup = BeautifulSoup(html, "html.parser")
    if not last_modified:
        for key in [("name", "last-modified"), ("property", "article:modified_time")]:
            tag = soup.find("meta", {key[0]: key[1]})
            if tag:
                last_modified = tag.get("content")
                break
    if not last_modified:
        scripts = soup.find_all("script", {"type": "application/ld+json"})
        for script in scripts:
            try:
                data = json.loads(script.string)
            except (json.JSONDecodeError, TypeError):
                continue
            graph = data.get("@graph") if isinstance(data, dict) else None
            if graph:
                for item in graph:
                    if item.get("@type") in ["WebPage", "Article"]:
                        last_modified = item.get("dateModified")
                        if last_modified:
                            break
    return parse_isoformat(last_modified)

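# NOTE: FetchResult is imported from src.scraping.types. The fetchers below only
# rely on the fields they pass explicitly and on attribute access; a minimal
# sketch of such a container, assuming it is a dataclass with optional fields
# (the real definition may differ), would be:
#
#   @dataclass
#   class FetchResult:
#       text: str | None = None
#       final_url: str | None = None
#       page_hash: str | None = None
#       last_modified: str | datetime.datetime | None = None
#       etag: str | None = None
#       not_modified: bool = False
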
def fetch_head(url: str, etag: str | None = None) -> FetchResult:
    try:
        headers = {"User-Agent": ua.chrome}
        if etag:
            # Ask the server to answer 304 Not Modified if the resource is unchanged.
            headers["If-None-Match"] = etag
        response = requests.head(
            url,
            allow_redirects=True,
            timeout=15,
            headers=headers
        )
        if response.status_code == 304:
            return FetchResult(not_modified=True)
        if response.status_code >= 400:
            logger.warning(f"HTTP {response.status_code} for URL '{url}'")
            response.raise_for_status()
        return FetchResult(
            final_url=response.url,
            last_modified=response.headers.get('Last-Modified'),
            etag=response.headers.get('ETag')
        )
    except Exception:
        logger.exception(f"Head fetch failed: {url}")
        raise

def fetch_url(url: str, etag: str | None = None) -> FetchResult:
    try:
        headers = {"User-Agent": ua.chrome}
        if etag:
            headers["If-None-Match"] = etag
        response = requests.get(
            url,
            allow_redirects=True,
            timeout=15,
            headers=headers
        )
        if response.status_code == 304:
            return FetchResult(not_modified=True)
        if response.status_code >= 400:
            logger.warning(f"HTTP {response.status_code} for URL '{url}'")
            response.raise_for_status()
        html = response.text
        etag = response.headers.get("ETag")
        last_modified = extract_last_modified(response, html)
        page_hash = hash_html(html)
        return FetchResult(
            text=html,
            final_url=response.url,
            page_hash=page_hash,
            last_modified=last_modified,
            etag=etag,
        )
    except Exception:
        logger.exception(f"Fetch failed: {url}")
        raise

def _robots_exist(robots_url) -> bool:
    try:
        logger.info(f"Checking if 'robots.txt' is accessible at '{robots_url}'...")
        response = requests.head(robots_url, allow_redirects=True, timeout=config.scraping.TIMEOUT)
        if response.status_code >= 400:
            logger.error(f"Cannot access the 'robots.txt' - received status code {response.status_code}!")
            return False
        return True
    except requests.RequestException as e:
        raise requests.RequestException(f"An error occurred while requesting the URL '{robots_url}': {e}") from e

def parse_robots(base_url: str) -> RobotFileParser | None:
    robots_url = f"{base_url.rstrip('/')}/robots.txt"
    # Check whether the robots.txt file is accessible from this URL
    response = call_with_exponential_backoff(_robots_exist, args=(robots_url,))
    if not response['result']: return None
    logger.info(f"File 'robots.txt' found for the target url '{base_url}'")
    rp = RobotFileParser()
    rp.set_url(robots_url)
    # Parse the existing robots.txt file into the parser
    def fetch_robots():
        try:
            rp.read()
        except URLError as e:
            raise URLError(f"Failed to fetch the 'robots.txt': {e}") from e
    response = call_with_exponential_backoff(fetch_robots)
    if response['status'] == 'FAIL':
        logger.error(f"Failed to fetch the 'robots.txt': {response['last_error']}")
        return None
    return rp
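

# Hypothetical usage sketch (not part of the module's API): how these helpers
# might be composed when crawling a single page. The base URL, the can_fetch()
# check against the parsed robots.txt, and attribute access on FetchResult are
# illustrative assumptions rather than guarantees about the surrounding project.
if __name__ == "__main__":
    base_url = "https://example.com"
    rp = parse_robots(base_url)
    # Only fetch when robots.txt is absent or explicitly allows the URL.
    if rp is None or rp.can_fetch("*", base_url):
        result = fetch_url(base_url)
        if not result.not_modified:
            info = detect_page_topic_and_priority(result.text)
            logger.info(f"hash={result.page_hash} priority={info['priority']} topic={info['topic']}")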