import os, shutil, json from datetime import datetime from collections import Counter, defaultdict from urllib.parse import urlsplit from urllib.robotparser import RobotFileParser from usp.objects.sitemap import InvalidSitemap from usp.tree import sitemap_tree_for_homepage from src.notification.notification_center import NotificationCenter from .utils import * from .types import * from .html_processor import HTMLProcessor from .content_cleaner import ContentCleaner from .url_normalizer import UrlNormalizer from ..utils.lang import detect_language from ..utils.logging import get_logger from ..utils.tools import call_with_exponential_backoff from ..config import config logger = get_logger('scraper.core') incupd_logger = get_logger('scraper.incremental_updates') class Scraper: def __init__(self, scrape_all: bool = True) -> None: self._scrape_all = scrape_all self._path = config.paths self._processor: HTMLProcessor = HTMLProcessor() self._normalizer: UrlNormalizer = UrlNormalizer() self._content_cleaner: ContentCleaner = ContentCleaner(self._scrape_all) self._notif_center: NotificationCenter = NotificationCenter() self._make_directories() self._url_temp_timestamps: dict[str, UrlTimestamps] = {} self._url_timestamps: dict[str, UrlTimestamps] = self._load_data(self._path.SCRAPING_OUTPUT, 'url_timestamps') self._url_priorities: dict[str, list[str]] = self._load_data(self._path.URLS_OUTPUT, 'url_priorities') logger.info(f'Successfully initialized the scraper') if scrape_all: logger.info("Initialized with SCRAPE_ALL=True. Timestamps and priorities will be ignored for this scraping session") def _make_directories(self) -> None: os.makedirs(self._path.URLS_OUTPUT, exist_ok=True) os.makedirs(self._path.CHUNKS_OUTPUT, exist_ok=True) os.makedirs(self._path.TEMP_CHUNKS_OUTPUT, exist_ok=True) os.makedirs(self._path.SCRAPING_OUTPUT, exist_ok=True) os.makedirs(self._path.RAW_HTML_OUTPUT, exist_ok=True) os.makedirs(self._path.RAW_TEXT_OUTPUT, exist_ok=True) os.makedirs(self._path.METADATA_OUTPUT, exist_ok=True) os.makedirs(self._path.EXTRACTED_TEXT_OUTPUT, exist_ok=True) def scrape_target(self, target_url: str) -> list[ChunkMetadata]: # Step 1: Analyze the target URL for availability, robots and sitemap analyzed_domain = self._analyze_domain(target_url) if not analyzed_domain: logger.error(f"Failed to scrape target URL {target_url}") return {} sitemap_urls = analyzed_domain.urls self._save_results(self._path.URLS_OUTPUT, 'sitemap_urls', sitemap_urls, target_url) # Step 2: Validate and scrape URLs listed in the sitemap analyzed_sitemap = self._analyze_sitemap(analyzed_domain) documents = analyzed_sitemap.documents logger.info(f"Indexed {len(sitemap_urls)} sitemap URLs for target URL {target_url}") logger.info(f"Scraped {len(documents)} unique URLs (others were either redirects or blacklisted)") # Step 3: Analyze discovered URLs and search for the new ones discovered_urls = analyzed_sitemap.discovered_urls logger.info(f"Discovered {len(discovered_urls)} new URLs during sitemap analysis") analyzed_discoveries = self._analyze_discoveries(discovered_urls, sitemap_urls, analyzed_domain) discovered_urls = analyzed_discoveries.discovered_urls self._save_results(self._path.URLS_OUTPUT, 'discovered_urls', discovered_urls, target_url) documents.extend(analyzed_discoveries.documents) logger.info(f"Indexed {len(discovered_urls)} new URLs for target URL {target_url}") # Step 4: Load temp chunks first so resume works even when there are no new documents. temp_filename = self._get_temp_chunks_filename(target_url) temp_merged_chunks = self._load_data(self._path.TEMP_CHUNKS_OUTPUT, temp_filename) if not documents and not temp_merged_chunks: logger.info(f"No new content was scraped from the target URL {target_url}") return {} tagged_documents = [] # Step 5: Analyze the converted URLs if documents: self._content_cleaner.perform_content_analysis(target_url, self._normalizer.url_to_filename(target_url)) analyzied_documents = self._analyze_url_documents(documents) self._save_results(self._path.URLS_OUTPUT, 'url_tags', analyzied_documents.url_tags) self._save_results(self._path.URLS_OUTPUT, 'url_priorities', analyzied_documents.url_priorities) tagged_documents = analyzied_documents.tagged_documents # Step 6: Collect and save chunks chunk_metadatas = self._collect_chunks(tagged_documents, target_url, temp_merged_chunks) self._save_results(self._path.METADATA_OUTPUT, 'raw_chunk_metadata', chunk_metadatas['raw'], target_url) self._save_results(self._path.METADATA_OUTPUT, 'merged_chunk_metadata', chunk_metadatas['merged'], target_url) self._save_results(self._path.METADATA_OUTPUT, 'deleted_chunk_metadata', chunk_metadatas['deleted'], target_url) logger.info(f"Collected {len(chunk_metadatas['merged'])} chunks from target URL {target_url}") logger.info(f"Scraping finished for target URL '{target_url}'") return chunk_metadatas['final'] def _analyze_domain(self, target_url: str) -> DomainAnalysisReport | None: if not target_url: logger.warning('The target URL string is empty!') return None # Step 1: Test whether the target URL is even accessible before initializing the scraping procedure response = call_with_exponential_backoff(fetch_url, args=(target_url,)) if response['status'] == 'FAIL': logger.error(f"Unaccessible target URL '{target_url}': {response['last_error']}") return None if not response['result']: logger.warning(f"Unnaccessible target URL '{target_url}': Recieved client/server error!") return None # Step 2: Fetch and parse robots logger.info(f"Fetching 'robots.txt' for the target URL '{target_url}'...") robots_parser: RobotFileParser = parse_robots(target_url) if not robots_parser: logger.warning( f"Could not fetch the 'robots.txt' file for the target URL '{target_url}'! " + "(Are you sure the scraping begins from root?)" ) return None logger.info(f"Parsed the 'robots.txt' file for target URL '{target_url}'") delay = robots_parser.crawl_delay('scraper') target_domain = urlsplit(target_url).netloc # Step 3: Fetch and parse sitemaps logger.info(f"Fetching sitemaps for target URL {target_url}...") sitemap_tree = sitemap_tree_for_homepage(target_url) if isinstance(sitemap_tree, InvalidSitemap): logger.error(f"Cannot fetch sitemap for target URL '{target_url}': Invalid sitemap structure!") return None page_data = [] page_urls = set() for page in sitemap_tree.all_pages(): page_url = page.url if not robots_parser.can_fetch('scraper', page_url) or page_url in page_urls: continue page_urls.add(page_url) page_data.append(PageData(page_url, page.last_modified)) logger.info(f'Loaded sitemaps with {len(page_data)} pages') return DomainAnalysisReport( target = target_domain, urls = list(page_urls), pages = page_data, delay = delay, ) def _analyze_sitemap(self, domain: DomainAnalysisReport) -> UrlAnalysisReport: documents = [] visited_urls = set() discovered_urls = set() rejected_urls = [] sitemap_pages = domain.pages logger.info(f'Starting validation and scraping for sitemap URLs...') for page in sitemap_pages: result = self._scrape_page(page.url, domain.delay, visited_urls, last_modified=page.last_modified) visited_urls.add(page.url) if result.status != ScrapingStatus.OK: if result.status == ScrapingStatus.REJECTED: rejected_urls.append(page.url) continue final_url = result.final_url documents.append(result.document) visited_urls.add(final_url) self._store_timestamps(final_url, result.timestamps, temp=True) new_urls = self._normalizer.filter_discovered_urls(result.discovered_urls, visited_urls, domain.target) discovered_urls |= new_urls if len(rejected_urls) > len(sitemap_pages)*0.1: rejection_rate = len(rejected_urls)/len(sitemap_pages) logger.warning(f"Rejection rate is {rejection_rate}") self._notif_center.send_notification( subject = "⚠ WARNING: Scraping rejection rate is >10%!", body = f"Rejection rate: {int(rejection_rate*100)}%\n" + f"Failed to scrape following URLs for target domain {domain.target}:\n" + "\n".join([f"\t- {url}" for url in rejected_urls]), channel = "slack", ) discovered_urls = [url for url in discovered_urls if url not in visited_urls] return UrlAnalysisReport( documents = documents, discovered_urls = discovered_urls, ) def _analyze_discoveries( self, discovered_urls: list, sitemap_urls: list, domain: DomainAnalysisReport ) -> UrlAnalysisReport: if len(discovered_urls) == 0: return UrlAnalysisReport([], []) documents = [] discoveries = discovered_urls.copy() visited_urls = set(sitemap_urls.copy()) discovered_urls = [{'url': url, 'depth': 0} for url in discovered_urls] logger.info(f"Starting validation and scraping for discovered URLs...") while discovered_urls: discovered_url = discovered_urls.pop() url = discovered_url['url'] result = self._scrape_page(url, domain.delay, visited_urls, discovery_depth=discovered_url['depth']) visited_urls.add(url) if not result: continue final_url = result.final_url documents.append(result.document) visited_urls.add(final_url) discoveries.append(final_url) self._store_timestamps(final_url, result.timestamps, temp=True) for new_url in self._normalizer.filter_discovered_urls(result.discovered_urls, visited_urls, domain.target): discovered_urls.append({'url': new_url, 'depth': result.discovery_depth}) return UrlAnalysisReport( documents = documents, discovered_urls = discoveries, ) def _analyze_url_documents(self, documents: list) -> DocumentAnalysisReport: url_tags = {} url_priorities = defaultdict(list) tagged_documents = [] logger.info(f"Analyzing scraped contents of {len(documents)} pages...") for document in documents: url = document.name self._content_cleaner.clean_document(document) extracted_text = self._processor.convert_to_txt(document) if extracted_text.strip() == '': logger.warning(f'No text extracted from {url}. Skipping ...') continue url_filename = self._normalizer.url_to_filename(url) extracted_text_file_path = os.path.join(self._path.EXTRACTED_TEXT_OUTPUT, url_filename + '.txt') with open(extracted_text_file_path, 'w', encoding="utf-8") as f: f.write(extracted_text) logger.info(f"Saved extracted text for URL '{url}' under '{extracted_text_file_path}'") language = detect_language(extracted_text) tp_result = detect_page_topic_and_priority(extracted_text) programs = self._processor.strategies_processor.apply_strategy( strategy_name='programs', arguments={'document_content': extracted_text}, ) program = programs[0] if programs else 'no program' tags = UrlTags( topic = tp_result['topic'], priority = tp_result['priority'], language = language, program = program, ) url_tags[url] = tags url_priorities[tp_result['priority']].append(url) tagged_documents.append(TaggedDocument(document, DocumentTags(program, language))) return DocumentAnalysisReport( url_tags = url_tags, url_priorities = url_priorities, tagged_documents = tagged_documents, ) def _collect_chunks( self, tagged_documents: list[dict], target_url: str, temp_chunks: dict[str, list[ChunkMetadata]] | None = None, ) -> dict[str, list[ChunkMetadata]]: raw_chunks = [] deleted_chunks = [] merged_chunks, final_chunks = self._read_temp_chunks(temp_chunks, tagged_documents) program_counter = self._build_program_counter_from_merged_chunks(merged_chunks) if merged_chunks: incupd_logger.info(f"Restored {len(merged_chunks)} chunks from temp") for entry in tagged_documents: document = entry.document program = entry.tags.program language = entry.tags.language url = document.name url_filename = self._normalizer.url_to_filename(url) program_counter[program] += 1 doc_chunks_dir_path = os.path.join(config.paths.CHUNKS_OUTPUT, url_filename) if os.path.exists(doc_chunks_dir_path): shutil.rmtree(doc_chunks_dir_path) os.makedirs(doc_chunks_dir_path) mergible_chunks_metadatas = [] raw_chunk_count = 0 for i, chunk in enumerate(self._processor.chunk(document), start=1): raw_chunk_count = i chunk_file_path = os.path.join(doc_chunks_dir_path, f"chunk_{i}.txt") with open(chunk_file_path, 'w', encoding="utf-8") as f: f.write(chunk['text']) chunk_topic = detect_chunk_topic(chunk['text']) chunk_metadata = ChunkMetadata( chunk_id = f"{program.lower()}_{program_counter[program]:03d}_{i:02d}", text = chunk['text'], source_url = url, program = program, language = language, topic = chunk_topic, last_scraped = datetime.now(), page_title = self._processor.extract_title(document), section_heading = chunk['title'], token_size = chunk['size'], ) raw_chunks.append(chunk_metadata) if chunk_topic == 'none': deleted_chunks.append(chunk_metadata) else: mergible_chunks_metadatas.append(chunk_metadata) logger.info(f"Collected {raw_chunk_count} raw chunks and saved under '{doc_chunks_dir_path}'") merged_chunk_metadatas = self._processor.merge_chunks_by_topic(mergible_chunks_metadatas) merged_chunks.extend(merged_chunk_metadatas) self._store_temp_chunks(target_url, url, merged_chunk_metadatas) logger.info(f"Merged {raw_chunk_count} raw chunks into {len(merged_chunk_metadatas)} chunks by topic") prepared_chunks = self._processor.prepare_chunks(url, self._processor.convert_to_txt(document), merged_chunk_metadatas) for lang in final_chunks.keys(): if lang in prepared_chunks.keys(): final_chunks[lang].extend(prepared_chunks[lang]) return { 'raw': raw_chunks, 'merged': merged_chunks, 'deleted': deleted_chunks, 'final': final_chunks, } def _read_temp_chunks( self, temp_chunks: dict[str, list[ChunkMetadata]], tagged_documents: list[TaggedDocument] ) -> set[list, list[dict]]: loaded_temp_chunks = temp_chunks.copy() prepared_temp_chunks = {lang: [] for lang in config.get('AVAILABLE_LANGUAGES', ['en', 'de'])} for url in [entry.document.name for entry in tagged_documents]: if url in temp_chunks.keys(): incupd_logger.info(f"Deleted stored temp data for URL {url} as it was newly scraped") del loaded_temp_chunks[url] restored_temp_chunks = [] for url, chunks in loaded_temp_chunks.items(): url_filename = self._normalizer.url_to_filename(url) extracted_text_path = os.path.join(self._path.EXTRACTED_TEXT_OUTPUT, url_filename + '.txt') if not os.path.exists(extracted_text_path): incupd_logger.warning(f"Cannot restore chunks for URL {url}: Failed to locate previously extracted contents!") incupd_logger.warning(f"This URL will has to be rescraped in the next session") continue with open(extracted_text_path, 'r') as f: url_text = f.read() prepared_chunks = self._processor.prepare_chunks(url, url_text, chunks) for lang in prepared_temp_chunks.keys(): if lang in prepared_chunks.keys(): prepared_temp_chunks[lang].extend(prepared_chunks[lang]) restored_temp_chunks.extend(chunks) incupd_logger.info(f"Restored {len(chunks)} chunks for URL {url} from temp") return restored_temp_chunks, prepared_temp_chunks def _store_temp_chunks(self, target_url: str, url: str, chunks: list[ChunkMetadata]) -> None: self._url_timestamps[url] = self._url_temp_timestamps[url] temp_chunks = {url: chunks} self._save_results(self._path.TEMP_CHUNKS_OUTPUT, self._get_temp_chunks_filename(target_url), temp_chunks) self._save_results(self._path.SCRAPING_OUTPUT, 'url_timestamps', self._url_timestamps) incupd_logger.info(f"Stored {len(chunks)} chunks in temp for URL {url}") def _build_program_counter_from_merged_chunks(self, merged_chunks: list[ChunkMetadata]) -> Counter: counter = Counter() seen = set() for chunk in merged_chunks: key = (chunk.program, chunk.source_url) if key not in seen: counter[chunk.program] += 1 seen.add(key) return counter def _is_url_modified( self, url: str, new_last_modified: datetime | None = None, new_page_hash: str | None = None ) -> bool: if url not in self._url_timestamps.keys(): return True stored = self._url_timestamps[url] if stored.last_modified and new_last_modified: return stored.last_modified < new_last_modified if new_page_hash and stored.page_hash: return new_page_hash != stored.page_hash return True def _store_timestamps(self, url: str, timestamps: UrlTimestamps, temp=False) -> None: if temp: self._url_temp_timestamps[url] = timestamps else: self._url_timestamps[url] = timestamps def _get_temp_chunks_filename(self, target_url: str) -> str: return self._normalizer.url_to_filename(target_url) + '_merged_chunks' def delete_temp_merged_chunks(self, target_url: str) -> None: temp_path = os.path.join( self._path.TEMP_CHUNKS_OUTPUT, self._get_temp_chunks_filename(target_url) + '.json' ) if os.path.exists(temp_path): os.remove(temp_path) incupd_logger.info(f"Deleted temp merged chunks file '{temp_path}'") def _get_etag(self, url: str) -> str | None: if url not in self._url_timestamps.keys(): return None return self._url_timestamps[url].etag def _is_fetch_valid(self, url: str, visited_urls: list[str], fetch_result: FetchResult) -> ScrapingStatus: if not fetch_result: logger.warning(f"Cannot fetch {url}! Skipping...") return ScrapingStatus.REJECTED if fetch_result.not_modified: logger.info("No updates on the page, skipping...") return ScrapingStatus.NO_UPDATES final_url = fetch_result.final_url if final_url != url: logger.info(f"Redirect detected: '{url}' --> '{final_url}'") if final_url in visited_urls: logger.info(f"'{final_url}' was already visited, skipping...") return ScrapingStatus.VISITED logger.info(f"Continuing with URL '{final_url}'...") last_modified = fetch_result.last_modified page_hash = fetch_result.page_hash if not self._scrape_all and not self._is_url_modified(final_url, new_last_modified=last_modified, new_page_hash=page_hash): logger.info(f"URL {final_url} was not modified since last scraping session, skipping...") return ScrapingStatus.NO_UPDATES return ScrapingStatus.OK def _is_url_prioritized(self, url) -> bool: if url not in self._url_timestamps.keys(): return True for prio, urls in self._url_priorities.items(): if url in urls: return self._is_scraping_scheduled(url, prio) return True def _is_scraping_scheduled(self, url, prio) -> bool: current_timestamp = datetime.now() saved_timestamp = self._url_timestamps[url].last_scraped time_difference = current_timestamp - saved_timestamp if not saved_timestamp: return True for interval_prio, interval in config.scraping.INTERVALS.items(): if prio == interval_prio: return time_difference.days >= interval return True def _scrape_page( self, url: str, crawl_delay: float, visited_urls: list[str], discovery_depth: int = 0, last_modified: datetime | None = None ) -> ScrapingResult | None: if not url: return ScrapingResult(status=ScrapingStatus.REJECTED) if self._normalizer.is_url_blacklisted(url): logger.info(f"URL {url} is blacklisted by scraper, skipping...") return ScrapingResult(status=ScrapingStatus.BLACKLISTED) if url in visited_urls: logger.info(f'URL {url} was already analyzed via redirect, skipping...') return ScrapingResult(status=ScrapingStatus.VISITED) if not self._scrape_all and last_modified and not self._is_url_modified(url, new_last_modified=last_modified): logger.info(f"URL '{url}' was not modified since last scraping session, skipping...") self._url_timestamps[url].last_modified = last_modified return ScrapingResult(status=ScrapingStatus.NO_UPDATES) if not self._scrape_all and not self._is_url_prioritized(url): logger.info(f"URL {url} is not prioritized, skipping") return ScrapingResult(status=ScrapingStatus.NO_UPDATES) logger.info(f"Fetching head for URL '{url}'...") etag = self._get_etag(url) response = call_with_exponential_backoff(fetch_head, args=(url, etag), delay=crawl_delay) if response['status'] == 'FAIL': logger.warning(f"Failed to fetch head for URL {url}: {response['last_error']}! Skipping...") return ScrapingResult(status=ScrapingStatus.REJECTED) fetch_result = response['result'] validation = self._is_fetch_valid(url, visited_urls, fetch_result) if validation != ScrapingStatus.OK: return ScrapingResult(status=validation) response = call_with_exponential_backoff(fetch_url, args=(url, etag), delay=crawl_delay) if response['status'] == 'FAIL': logger.warning(f"Failed to fetch URL {url}: {response['last_error']}! Skipping...") return ScrapingResult(status=ScrapingStatus.REJECTED) fetch_result = response['result'] validation = self._is_fetch_valid(url, visited_urls, fetch_result) if validation != ScrapingStatus.OK: return ScrapingResult(status=validation) if not fetch_result.last_modified: logger.warning("No information about URL last modification date exists!") timestamps = UrlTimestamps( last_modified = fetch_result.last_modified, last_scraped = datetime.now(), etag = fetch_result.etag, page_hash = fetch_result.page_hash, ) raw_html = fetch_result.text final_url = fetch_result.final_url url_filename = self._normalizer.url_to_filename(final_url) raw_html_file_path = os.path.join(config.paths.RAW_HTML_OUTPUT, url_filename + '.html') with open(raw_html_file_path, 'w', encoding="utf-8") as f: f.write(raw_html) logger.info(f"Saved fetched HTML under '{raw_html_file_path}'") logger.info(f"Cleaning URL {final_url} from mobile data...") cleaned_html = self._content_cleaner.clean_mobile_content(raw_html) logger.info(f"Processing URL {final_url}...") document = self._processor.process(final_url, cleaned_html) if not document: logger.warning(f"Failed to process URL '{final_url}'! Skipping...") return ScrapingResult(status=ScrapingStatus.REJECTED) discovered_urls = self._content_cleaner.extract_urls(document) if discovery_depth <= 3 else [] self._content_cleaner.collect_repetitive_content(document) raw_text = self._processor.convert_to_txt(document) raw_text_file_path = os.path.join(config.paths.RAW_TEXT_OUTPUT, url_filename + '.txt') with open(raw_text_file_path, 'w', encoding="utf-8") as f: f.write(raw_text) logger.info(f"Saved raw text for URL '{final_url}' under '{raw_text_file_path}'") return ScrapingResult( document = document, discovered_urls = discovered_urls, final_url = final_url, timestamps = timestamps, discovery_depth = discovery_depth + 1, status = ScrapingStatus.OK, ) def _save_results(self, path: str, filename: str, results, target_url: str | None = None) -> None: results_path = os.path.join(path, filename + '.json') results_dict = {} if os.path.exists(results_path): try: with open(results_path, 'r', encoding='utf-8') as f: results_dict = json.load(f) except Exception: logger.warning(f"Failed to load existing {results_path}, will overwrite") match filename: case 'url_tags': results_dict |= results case 'url_timestamps': for url, ts in results.items(): results_dict[url] = dataclass_to_dict(ts) case 'url_priorities': for prio, urls in results.items(): prev = set(results_dict.get(prio, [])) results_dict[prio] = list(prev.union(urls)) case _ if filename.endswith('_merged_chunks'): for url, chunks in results.items(): results_dict[url] = [dataclass_to_dict(chunk) for chunk in chunks] case _: results = [dataclass_to_dict(r) for r in results] if target_url: results_dict[target_url] = results else: results_dict = results try: with open(results_path, 'w', encoding='utf-8') as f: json.dump( results_dict, f, indent=4, default=lambda o: o.isoformat() if isinstance(o, datetime) else None, ) except Exception as e: logger.error(f"Failed to store results '{filename}'") raise e logger.debug(f"Stored results in file {results_path}") def _load_data(self, path: str, filename: str): datapath = os.path.join(path, filename + '.json') if not os.path.exists(datapath): logger.warning(f"Failed to locate file {datapath}; new data will be recorded") return defaultdict(dict) try: with open(datapath, 'r', encoding='utf-8') as f: loaded_data = json.load(f) match filename: case 'url_timestamps': for url, ts_dict in loaded_data.items(): loaded_data[url] = dict_to_dataclass(ts_dict, UrlTimestamps) incupd_logger.debug(f"Loaded {len(loaded_data)} URL timestamps") return loaded_data case _ if filename.endswith('_merged_chunks'): for url, chunk_metadata in loaded_data.items(): loaded_data[url] = [dict_to_dataclass(chunk, ChunkMetadata) for chunk in chunk_metadata] incupd_logger.debug(f"Loaded {len(loaded_data)} temp merged chunks") return loaded_data case _: incupd_logger.info(f"Loaded data '{filename}'") return loaded_data except Exception as e: logger.error(f"Failed trying to load data '{filename}': {e}") logger.info("New data will be recorded") return defaultdict(dict)