hsg_rag_eea / src /scraping /scraper.py
Pygmales
synched versions
698965e
raw
history blame
30.4 kB
import os, shutil, json
from datetime import datetime
from collections import Counter, defaultdict
from urllib.parse import urlsplit
from urllib.robotparser import RobotFileParser
from usp.objects.sitemap import InvalidSitemap
from usp.tree import sitemap_tree_for_homepage
from src.notification.notification_center import NotificationCenter
from .utils import *
from .types import *
from .html_processor import HTMLProcessor
from .content_cleaner import ContentCleaner
from .url_normalizer import UrlNormalizer
from ..utils.lang import detect_language
from ..utils.logging import get_logger
from ..utils.tools import call_with_exponential_backoff
from ..config import config
# Logger for the general scraping workflow (domain analysis, fetching, saving results).
logger = get_logger('scraper.core')
# Separate logger used for messages about temp chunk storage/restoration and loaded persisted data.
incupd_logger = get_logger('scraper.incremental_updates')
class Scraper:
def __init__(self, scrape_all: bool = True) -> None:
    """Set up helper components, output directories and persisted state.

    Args:
        scrape_all: When True, the modification and priority checks in
            `_scrape_page` / `_is_fetch_valid` are bypassed for this session.
    """
    paths = config.paths
    self._scrape_all = scrape_all
    self._path = paths

    # Helper components used throughout the scraping pipeline.
    self._processor: HTMLProcessor = HTMLProcessor()
    self._normalizer: UrlNormalizer = UrlNormalizer()
    self._content_cleaner: ContentCleaner = ContentCleaner(scrape_all)
    self._notif_center: NotificationCenter = NotificationCenter()

    # Output directories must exist before persisted data is loaded or stored.
    self._make_directories()

    # Timestamps collected in this session are kept separately until the
    # chunks of the corresponding URL are stored (see `_store_temp_chunks`).
    self._url_temp_timestamps: dict[str, UrlTimestamps] = {}
    self._url_timestamps: dict[str, UrlTimestamps] = self._load_data(paths.SCRAPING_OUTPUT, 'url_timestamps')
    self._url_priorities: dict[str, list[str]] = self._load_data(paths.URLS_OUTPUT, 'url_priorities')

    logger.info(f'Successfully initialized the scraper')
    if scrape_all:
        logger.info("Initialized with SCRAPE_ALL=True. Timestamps and priorities will be ignored for this scraping session")
def _make_directories(self) -> None:
    """Create every output directory used by the scraper; existing ones are left as they are."""
    paths = self._path
    output_directories = (
        paths.URLS_OUTPUT,
        paths.CHUNKS_OUTPUT,
        paths.TEMP_CHUNKS_OUTPUT,
        paths.SCRAPING_OUTPUT,
        paths.RAW_HTML_OUTPUT,
        paths.RAW_TEXT_OUTPUT,
        paths.METADATA_OUTPUT,
        paths.EXTRACTED_TEXT_OUTPUT,
    )
    for directory in output_directories:
        os.makedirs(directory, exist_ok=True)
def scrape_target(self, target_url: str) -> dict[str, list]:
    """Run the complete scraping pipeline for a single target URL.

    The pipeline analyzes the domain (availability, robots, sitemap), scrapes
    the sitemap pages, follows URLs discovered on those pages, tags the
    scraped documents and finally collects chunks. Intermediate results are
    written to the configured output directories along the way.

    Args:
        target_url: URL the scraping starts from.

    Returns:
        The 'final' entry produced by `_collect_chunks` (a dict keyed by
        language), or an empty dict when the domain analysis fails or when
        there is neither newly scraped content nor stored temp chunks.
    """
    # Step 1: Analyze the target URL for availability, robots and sitemap
    analyzed_domain = self._analyze_domain(target_url)
    if not analyzed_domain:
        logger.error(f"Failed to scrape target URL {target_url}")
        return {}
    sitemap_urls = analyzed_domain.urls
    self._save_results(self._path.URLS_OUTPUT, 'sitemap_urls', sitemap_urls, target_url)

    # Step 2: Validate and scrape URLs listed in the sitemap
    analyzed_sitemap = self._analyze_sitemap(analyzed_domain)
    documents = analyzed_sitemap.documents
    logger.info(f"Indexed {len(sitemap_urls)} sitemap URLs for target URL {target_url}")
    logger.info(f"Scraped {len(documents)} unique URLs (others were either redirects or blacklisted)")

    # Step 3: Analyze discovered URLs and search for the new ones
    discovered_urls = analyzed_sitemap.discovered_urls
    logger.info(f"Discovered {len(discovered_urls)} new URLs during sitemap analysis")
    analyzed_discoveries = self._analyze_discoveries(discovered_urls, sitemap_urls, analyzed_domain)
    discovered_urls = analyzed_discoveries.discovered_urls
    self._save_results(self._path.URLS_OUTPUT, 'discovered_urls', discovered_urls, target_url)
    documents.extend(analyzed_discoveries.documents)
    logger.info(f"Indexed {len(discovered_urls)} new URLs for target URL {target_url}")

    # Step 4: Load temp chunks first so resume works even when there are no new documents.
    temp_filename = self._get_temp_chunks_filename(target_url)
    temp_merged_chunks = self._load_data(self._path.TEMP_CHUNKS_OUTPUT, temp_filename)
    if not documents and not temp_merged_chunks:
        logger.info(f"No new content was scraped from the target URL {target_url}")
        return {}

    tagged_documents = []
    # Step 5: Analyze the converted URLs
    if documents:
        self._content_cleaner.perform_content_analysis(target_url, self._normalizer.url_to_filename(target_url))
        analyzed_documents = self._analyze_url_documents(documents)
        self._save_results(self._path.URLS_OUTPUT, 'url_tags', analyzed_documents.url_tags)
        self._save_results(self._path.URLS_OUTPUT, 'url_priorities', analyzed_documents.url_priorities)
        tagged_documents = analyzed_documents.tagged_documents

    # Step 6: Collect and save chunks
    chunk_metadatas = self._collect_chunks(tagged_documents, target_url, temp_merged_chunks)
    metadata_dir = self._path.METADATA_OUTPUT
    self._save_results(metadata_dir, 'raw_chunk_metadata', chunk_metadatas['raw'], target_url)
    self._save_results(metadata_dir, 'merged_chunk_metadata', chunk_metadatas['merged'], target_url)
    self._save_results(metadata_dir, 'deleted_chunk_metadata', chunk_metadatas['deleted'], target_url)
    logger.info(f"Collected {len(chunk_metadatas['merged'])} chunks from target URL {target_url}")
    logger.info(f"Scraping finished for target URL '{target_url}'")
    return chunk_metadatas['final']
def _analyze_domain(self, target_url: str) -> DomainAnalysisReport | None:
    """Check that the target is reachable and gather its robots rules and sitemap pages.

    Returns:
        A `DomainAnalysisReport` with the target's netloc, the de-duplicated
        sitemap pages allowed for the 'scraper' user agent and the crawl
        delay from robots.txt; None when the URL is empty, cannot be fetched,
        has no parsable robots.txt or has an invalid sitemap.
    """
    if not target_url:
        logger.warning('The target URL string is empty!')
        return None

    # Step 1: Test whether the target URL is even accessible before initializing the scraping procedure
    probe = call_with_exponential_backoff(fetch_url, args=(target_url,))
    if probe['status'] == 'FAIL':
        logger.error(f"Unaccessible target URL '{target_url}': {probe['last_error']}")
        return None
    if not probe['result']:
        logger.warning(f"Unnaccessible target URL '{target_url}': Recieved client/server error!")
        return None

    # Step 2: Fetch and parse robots
    logger.info(f"Fetching 'robots.txt' for the target URL '{target_url}'...")
    robots_parser: RobotFileParser = parse_robots(target_url)
    if not robots_parser:
        message = (
            f"Could not fetch the 'robots.txt' file for the target URL '{target_url}'! "
            "(Are you sure the scraping begins from root?)"
        )
        logger.warning(message)
        return None
    logger.info(f"Parsed the 'robots.txt' file for target URL '{target_url}'")
    delay = robots_parser.crawl_delay('scraper')
    target_domain = urlsplit(target_url).netloc

    # Step 3: Fetch and parse sitemaps
    logger.info(f"Fetching sitemaps for target URL {target_url}...")
    sitemap_tree = sitemap_tree_for_homepage(target_url)
    if isinstance(sitemap_tree, InvalidSitemap):
        logger.error(f"Cannot fetch sitemap for target URL '{target_url}': Invalid sitemap structure!")
        return None

    # Keep each allowed page once; the set tracks what has already been accepted.
    accepted_pages = []
    accepted_urls = set()
    for page in sitemap_tree.all_pages():
        candidate = page.url
        allowed = robots_parser.can_fetch('scraper', candidate)
        if allowed and candidate not in accepted_urls:
            accepted_urls.add(candidate)
            accepted_pages.append(PageData(candidate, page.last_modified))
    logger.info(f'Loaded sitemaps with {len(accepted_pages)} pages')

    return DomainAnalysisReport(
        target=target_domain,
        urls=list(accepted_urls),
        pages=accepted_pages,
        delay=delay,
    )
def _analyze_sitemap(self, domain: DomainAnalysisReport) -> UrlAnalysisReport:
    """Scrape every sitemap page of the analyzed domain.

    Successfully scraped documents are collected, their timestamps are stored
    as temporary timestamps and the URLs found on the pages are gathered.
    When more than 10% of the sitemap pages are rejected, a Slack
    notification listing the rejected URLs is sent.

    Returns:
        A `UrlAnalysisReport` with the scraped documents and the discovered
        URLs that were not visited during this pass.
    """
    pages = domain.pages
    scraped_documents = []
    seen = set()
    found_urls = set()
    rejected = []

    logger.info(f'Starting validation and scraping for sitemap URLs...')
    for page in pages:
        outcome = self._scrape_page(page.url, domain.delay, seen, last_modified=page.last_modified)
        seen.add(page.url)

        if outcome.status == ScrapingStatus.OK:
            landed_url = outcome.final_url
            scraped_documents.append(outcome.document)
            seen.add(landed_url)
            self._store_timestamps(landed_url, outcome.timestamps, temp=True)
            found_urls |= self._normalizer.filter_discovered_urls(outcome.discovered_urls, seen, domain.target)
        elif outcome.status == ScrapingStatus.REJECTED:
            rejected.append(page.url)

    # Alert when more than a tenth of the sitemap pages could not be scraped.
    if len(rejected) > len(pages)*0.1:
        rejection_rate = len(rejected)/len(pages)
        logger.warning(f"Rejection rate is {rejection_rate}")
        rejected_listing = "\n".join([f"\t- {url}" for url in rejected])
        self._notif_center.send_notification(
            subject="⚠ WARNING: Scraping rejection rate is >10%!",
            body=(
                f"Rejection rate: {int(rejection_rate*100)}%\n"
                + f"Failed to scrape following URLs for target domain {domain.target}:\n"
                + rejected_listing
            ),
            channel="slack",
        )

    # URLs visited later in the loop may still be in the discovered set; drop them.
    unvisited = [url for url in found_urls if url not in seen]
    return UrlAnalysisReport(
        documents=scraped_documents,
        discovered_urls=unvisited,
    )
def _analyze_discoveries(
    self,
    discovered_urls: list,
    sitemap_urls: list,
    domain: DomainAnalysisReport
) -> UrlAnalysisReport:
    """Scrape URLs discovered outside the sitemap and follow the links found on them.

    Args:
        discovered_urls: URLs found on sitemap pages that were not visited yet.
        sitemap_urls: URLs from the sitemap; treated as already visited.
        domain: Domain report providing the crawl delay and the target domain.

    Returns:
        A `UrlAnalysisReport` with the documents of successfully scraped
        pages and the list of discovered URLs (the initial ones plus the
        final URLs of pages scraped here).
    """
    if not discovered_urls:
        return UrlAnalysisReport([], [])

    documents = []
    discoveries = list(discovered_urls)
    listed = set(discoveries)
    visited_urls = set(sitemap_urls)
    # Work list of (url, depth) pairs; popping from the end processes the newest entry first.
    pending = [(url, 0) for url in discovered_urls]

    logger.info(f"Starting validation and scraping for discovered URLs...")
    while pending:
        url, depth = pending.pop()
        result = self._scrape_page(url, domain.delay, visited_urls, discovery_depth=depth)
        visited_urls.add(url)

        # `_scrape_page` reports failures through `status`; only an OK result
        # carries a document, final URL and timestamps.
        if not result or result.status != ScrapingStatus.OK:
            continue

        final_url = result.final_url
        documents.append(result.document)
        visited_urls.add(final_url)
        # Avoid listing a URL twice when it was already part of the discoveries.
        if final_url not in listed:
            listed.add(final_url)
            discoveries.append(final_url)
        self._store_timestamps(final_url, result.timestamps, temp=True)

        new_urls = self._normalizer.filter_discovered_urls(result.discovered_urls, visited_urls, domain.target)
        pending.extend((new_url, result.discovery_depth) for new_url in new_urls)

    return UrlAnalysisReport(
        documents=documents,
        discovered_urls=discoveries,
    )
def _analyze_url_documents(self, documents: list) -> DocumentAnalysisReport:
    """Clean the scraped documents, persist their extracted text and tag them.

    Documents that yield no text after cleaning are skipped. For the others
    the language, topic, priority and program are determined.

    Returns:
        A `DocumentAnalysisReport` holding the tags per URL, the URLs grouped
        by priority and the tagged documents.
    """
    tags_by_url = {}
    urls_by_priority = defaultdict(list)
    tagged = []

    logger.info(f"Analyzing scraped contents of {len(documents)} pages...")
    for doc in documents:
        url = doc.name
        self._content_cleaner.clean_document(doc)
        text = self._processor.convert_to_txt(doc)
        if not text.strip():
            logger.warning(f'No text extracted from {url}. Skipping ...')
            continue

        # Persist the cleaned text; `_read_temp_chunks` reads it back on resume.
        text_path = os.path.join(
            self._path.EXTRACTED_TEXT_OUTPUT,
            self._normalizer.url_to_filename(url) + '.txt',
        )
        with open(text_path, 'w', encoding="utf-8") as text_file:
            text_file.write(text)
        logger.info(f"Saved extracted text for URL '{url}' under '{text_path}'")

        language = detect_language(text)
        topic_and_priority = detect_page_topic_and_priority(text)
        detected_programs = self._processor.strategies_processor.apply_strategy(
            strategy_name='programs',
            arguments={'document_content': text},
        )
        # Only the first detected program is used.
        program = detected_programs[0] if detected_programs else 'no program'
        priority = topic_and_priority['priority']

        tags_by_url[url] = UrlTags(
            topic=topic_and_priority['topic'],
            priority=priority,
            language=language,
            program=program,
        )
        urls_by_priority[priority].append(url)
        tagged.append(TaggedDocument(doc, DocumentTags(program, language)))

    return DocumentAnalysisReport(
        url_tags=tags_by_url,
        url_priorities=urls_by_priority,
        tagged_documents=tagged,
    )
def _collect_chunks(
    self,
    tagged_documents: list,
    target_url: str,
    temp_chunks: dict[str, list[ChunkMetadata]] | None = None,
) -> dict[str, list[ChunkMetadata]]:
    """Chunk the tagged documents and combine them with chunks restored from temp storage.

    For every document the raw chunks are written to
    `<CHUNKS_OUTPUT>/<url filename>/chunk_<i>.txt`, chunks whose topic is
    'none' are set aside as deleted, the remaining chunks are merged by topic
    and the merged chunks are stored in temp storage so an interrupted
    session can resume.

    Args:
        tagged_documents: Entries exposing `.document` and `.tags`
            (`program`, `language`).
        target_url: Target URL of the session; selects the temp chunks file.
        temp_chunks: Previously stored merged chunks per URL, or None.

    Returns:
        A dict with the keys 'raw', 'merged' and 'deleted' (lists of chunk
        metadata) and 'final' (prepared chunks grouped by language).
    """
    raw_chunks = []
    deleted_chunks = []

    # The parameter defaults to None; normalise it so restoring works without temp data.
    merged_chunks, final_chunks = self._read_temp_chunks(temp_chunks or {}, tagged_documents)
    program_counter = self._build_program_counter_from_merged_chunks(merged_chunks)
    if merged_chunks:
        incupd_logger.info(f"Restored {len(merged_chunks)} chunks from temp")

    for entry in tagged_documents:
        document = entry.document
        program = entry.tags.program
        language = entry.tags.language
        url = document.name
        url_filename = self._normalizer.url_to_filename(url)
        # The per-program document counter is part of the chunk id.
        program_counter[program] += 1

        # Start from an empty chunk directory so stale chunk files do not survive.
        doc_chunks_dir_path = os.path.join(config.paths.CHUNKS_OUTPUT, url_filename)
        if os.path.exists(doc_chunks_dir_path):
            shutil.rmtree(doc_chunks_dir_path)
        os.makedirs(doc_chunks_dir_path)

        mergible_chunks_metadatas = []
        raw_chunk_count = 0
        for i, chunk in enumerate(self._processor.chunk(document), start=1):
            raw_chunk_count = i
            chunk_file_path = os.path.join(doc_chunks_dir_path, f"chunk_{i}.txt")
            with open(chunk_file_path, 'w', encoding="utf-8") as f:
                f.write(chunk['text'])

            chunk_topic = detect_chunk_topic(chunk['text'])
            chunk_metadata = ChunkMetadata(
                chunk_id=f"{program.lower()}_{program_counter[program]:03d}_{i:02d}",
                text=chunk['text'],
                source_url=url,
                program=program,
                language=language,
                topic=chunk_topic,
                last_scraped=datetime.now(),
                page_title=self._processor.extract_title(document),
                section_heading=chunk['title'],
                token_size=chunk['size'],
            )
            raw_chunks.append(chunk_metadata)
            # Chunks without a topic are recorded as deleted and excluded from merging.
            if chunk_topic == 'none':
                deleted_chunks.append(chunk_metadata)
            else:
                mergible_chunks_metadatas.append(chunk_metadata)
        logger.info(f"Collected {raw_chunk_count} raw chunks and saved under '{doc_chunks_dir_path}'")

        merged_chunk_metadatas = self._processor.merge_chunks_by_topic(mergible_chunks_metadatas)
        merged_chunks.extend(merged_chunk_metadatas)
        self._store_temp_chunks(target_url, url, merged_chunk_metadatas)
        logger.info(f"Merged {raw_chunk_count} raw chunks into {len(merged_chunk_metadatas)} chunks by topic")

        prepared_chunks = self._processor.prepare_chunks(url, self._processor.convert_to_txt(document), merged_chunk_metadatas)
        # Only languages already present in `final_chunks` are kept.
        for lang, collected in final_chunks.items():
            if lang in prepared_chunks:
                collected.extend(prepared_chunks[lang])

    return {
        'raw': raw_chunks,
        'merged': merged_chunks,
        'deleted': deleted_chunks,
        'final': final_chunks,
    }
def _read_temp_chunks(
    self,
    temp_chunks: dict[str, list[ChunkMetadata]] | None,
    tagged_documents: list[TaggedDocument]
) -> tuple[list[ChunkMetadata], dict[str, list]]:
    """Restore merged chunks from temp storage for URLs that were not rescraped.

    Temp entries of URLs present in `tagged_documents` are dropped because
    those URLs were scraped anew. For the remaining URLs the previously
    extracted text is read back and the chunks are prepared again; URLs whose
    extracted text file is missing are skipped.

    Args:
        temp_chunks: Stored merged chunks per URL; None is treated as empty.
        tagged_documents: Documents scraped in the current session.

    Returns:
        A tuple of (restored merged chunks, prepared chunks grouped by
        configured language).
    """
    # Work on a copy so the caller's mapping stays untouched.
    loaded_temp_chunks = dict(temp_chunks) if temp_chunks else {}
    prepared_temp_chunks = {lang: [] for lang in config.get('AVAILABLE_LANGUAGES', ['en', 'de'])}

    for entry in tagged_documents:
        url = entry.document.name
        # Checking the copy (not the input) keeps a repeated URL from raising KeyError.
        if url in loaded_temp_chunks:
            incupd_logger.info(f"Deleted stored temp data for URL {url} as it was newly scraped")
            del loaded_temp_chunks[url]

    restored_temp_chunks = []
    for url, chunks in loaded_temp_chunks.items():
        url_filename = self._normalizer.url_to_filename(url)
        extracted_text_path = os.path.join(self._path.EXTRACTED_TEXT_OUTPUT, url_filename + '.txt')
        if not os.path.exists(extracted_text_path):
            incupd_logger.warning(f"Cannot restore chunks for URL {url}: Failed to locate previously extracted contents!")
            incupd_logger.warning(f"This URL will has to be rescraped in the next session")
            continue

        # The extracted text is written as UTF-8, so it must be read back as UTF-8.
        with open(extracted_text_path, 'r', encoding="utf-8") as f:
            url_text = f.read()

        prepared_chunks = self._processor.prepare_chunks(url, url_text, chunks)
        for lang, collected in prepared_temp_chunks.items():
            if lang in prepared_chunks:
                collected.extend(prepared_chunks[lang])
        restored_temp_chunks.extend(chunks)
        incupd_logger.info(f"Restored {len(chunks)} chunks for URL {url} from temp")

    return restored_temp_chunks, prepared_temp_chunks
def _store_temp_chunks(self, target_url: str, url: str, chunks: list[ChunkMetadata]) -> None:
    """Persist the merged chunks of one URL together with its timestamps.

    The URL's temporary timestamps become its stored timestamps, then the
    chunks are written to the target's temp chunks file and the full
    timestamp table is written out.
    """
    # The URL counts as scraped only once its chunks are stored, so its timestamps are promoted here.
    self._url_timestamps[url] = self._url_temp_timestamps[url]

    temp_filename = self._get_temp_chunks_filename(target_url)
    self._save_results(self._path.TEMP_CHUNKS_OUTPUT, temp_filename, {url: chunks})
    self._save_results(self._path.SCRAPING_OUTPUT, 'url_timestamps', self._url_timestamps)
    incupd_logger.info(f"Stored {len(chunks)} chunks in temp for URL {url}")
def _build_program_counter_from_merged_chunks(self, merged_chunks: list[ChunkMetadata]) -> Counter:
    """Count, per program, the distinct source URLs among the given chunks."""
    # dict.fromkeys de-duplicates the (program, url) pairs while keeping first-seen order.
    distinct_pairs = dict.fromkeys((chunk.program, chunk.source_url) for chunk in merged_chunks)
    return Counter(program for program, _source_url in distinct_pairs)
def _is_url_modified(
    self,
    url: str,
    new_last_modified: datetime | None = None,
    new_page_hash: str | None = None
) -> bool:
    """Tell whether a URL changed compared to its stored timestamps.

    Unknown URLs count as modified. Otherwise the last-modified dates are
    compared when both are available, then the page hashes when both are
    available; if neither comparison is possible the URL counts as modified.
    """
    known = self._url_timestamps
    if url in known.keys():
        previous = known[url]
        if previous.last_modified and new_last_modified:
            return previous.last_modified < new_last_modified
        if new_page_hash and previous.page_hash:
            return new_page_hash != previous.page_hash
    # Unknown URL, or nothing to compare against.
    return True
def _store_timestamps(self, url: str, timestamps: UrlTimestamps, temp=False) -> None:
    """Record timestamps for a URL in the temporary table (temp=True) or the stored table."""
    table = self._url_temp_timestamps if temp else self._url_timestamps
    table[url] = timestamps
def _get_temp_chunks_filename(self, target_url: str) -> str:
    """Return the extension-less name of the temp merged-chunks file for a target URL."""
    base_name = self._normalizer.url_to_filename(target_url)
    return base_name + '_merged_chunks'
def delete_temp_merged_chunks(self, target_url: str) -> None:
    """Remove the temp merged-chunks file of a target URL if it exists."""
    json_name = self._get_temp_chunks_filename(target_url) + '.json'
    temp_path = os.path.join(self._path.TEMP_CHUNKS_OUTPUT, json_name)
    if not os.path.exists(temp_path):
        return
    os.remove(temp_path)
    incupd_logger.info(f"Deleted temp merged chunks file '{temp_path}'")
def _get_etag(self, url: str) -> str | None:
    """Return the stored ETag of a URL, or None when the URL has no stored timestamps."""
    known = self._url_timestamps
    return known[url].etag if url in known.keys() else None
def _is_fetch_valid(self, url: str, visited_urls: list[str], fetch_result: FetchResult) -> ScrapingStatus:
    """Classify a fetch result before the page is processed.

    Returns:
        REJECTED when there is no fetch result, NO_UPDATES when the response
        is marked as not modified or (outside scrape-all mode) the page did
        not change, VISITED when a redirect leads to an already visited URL,
        and OK otherwise.
    """
    if not fetch_result:
        logger.warning(f"Cannot fetch {url}! Skipping...")
        return ScrapingStatus.REJECTED

    if fetch_result.not_modified:
        logger.info("No updates on the page, skipping...")
        return ScrapingStatus.NO_UPDATES

    final_url = fetch_result.final_url
    was_redirected = final_url != url
    if was_redirected:
        logger.info(f"Redirect detected: '{url}' --> '{final_url}'")
        if final_url in visited_urls:
            logger.info(f"'{final_url}' was already visited, skipping...")
            return ScrapingStatus.VISITED
        logger.info(f"Continuing with URL '{final_url}'...")

    # The modification check only applies to incremental sessions.
    if not self._scrape_all:
        changed = self._is_url_modified(
            final_url,
            new_last_modified=fetch_result.last_modified,
            new_page_hash=fetch_result.page_hash,
        )
        if not changed:
            logger.info(f"URL {final_url} was not modified since last scraping session, skipping...")
            return ScrapingStatus.NO_UPDATES

    return ScrapingStatus.OK
def _is_url_prioritized(self, url) -> bool:
    """Decide whether a URL is due for scraping according to its stored priority.

    URLs without stored timestamps or without a stored priority are always
    due; otherwise the decision is delegated to `_is_scraping_scheduled`
    using the first priority group that lists the URL.
    """
    if url not in self._url_timestamps.keys():
        return True

    not_found = object()
    priority = next(
        (prio for prio, urls in self._url_priorities.items() if url in urls),
        not_found,
    )
    if priority is not_found:
        return True
    return self._is_scraping_scheduled(url, priority)
def _is_scraping_scheduled(self, url, prio) -> bool:
    """Tell whether enough days have passed since the URL was last scraped.

    Args:
        url: URL with an entry in the stored timestamps.
        prio: Priority of the URL; selects the interval (in days) from
            `config.scraping.INTERVALS`.

    Returns:
        True when the URL has no `last_scraped` value, when its priority has
        no configured interval, or when the number of elapsed days reaches
        the interval; False otherwise.
    """
    saved_timestamp = self._url_timestamps[url].last_scraped
    # Check for a missing timestamp before doing date arithmetic on it.
    if not saved_timestamp:
        return True

    elapsed = datetime.now() - saved_timestamp
    for interval_prio, interval in config.scraping.INTERVALS.items():
        if prio == interval_prio:
            return elapsed.days >= interval
    return True
def _scrape_page(
    self, url: str,
    crawl_delay: float,
    visited_urls: list[str],
    discovery_depth: int = 0,
    last_modified: datetime | None = None
) -> ScrapingResult | None:
    """Fetch, store and process a single page.

    Args:
        url: URL to scrape.
        crawl_delay: Passed as `delay` to `call_with_exponential_backoff`.
        visited_urls: URLs already handled in this pass (callers in this
            class pass a set).
        discovery_depth: Depth at which the URL was discovered; links are
            only extracted while this is at most 3.
        last_modified: Modification date taken from the sitemap, if any.

    Returns:
        A `ScrapingResult`. Only results with status OK carry a document,
        discovered URLs, the final URL, timestamps and the next discovery
        depth; all other results carry just the status.
    """
    # --- Cheap pre-checks that need no network access ---
    if not url:
        return ScrapingResult(status=ScrapingStatus.REJECTED)
    if self._normalizer.is_url_blacklisted(url):
        logger.info(f"URL {url} is blacklisted by scraper, skipping...")
        return ScrapingResult(status=ScrapingStatus.BLACKLISTED)
    if url in visited_urls:
        logger.info(f'URL {url} was already analyzed via redirect, skipping...')
        return ScrapingResult(status=ScrapingStatus.VISITED)
    # Incremental mode: skip pages whose sitemap date shows no change.
    if not self._scrape_all and last_modified and not self._is_url_modified(url, new_last_modified=last_modified):
        logger.info(f"URL '{url}' was not modified since last scraping session, skipping...")
        self._url_timestamps[url].last_modified = last_modified
        return ScrapingResult(status=ScrapingStatus.NO_UPDATES)
    # Incremental mode: skip pages that are not yet due according to their priority.
    if not self._scrape_all and not self._is_url_prioritized(url):
        logger.info(f"URL {url} is not prioritized, skipping")
        return ScrapingResult(status=ScrapingStatus.NO_UPDATES)

    # --- HEAD request: validate before downloading the page body ---
    logger.info(f"Fetching head for URL '{url}'...")
    etag = self._get_etag(url)
    response = call_with_exponential_backoff(fetch_head, args=(url, etag), delay=crawl_delay)
    if response['status'] == 'FAIL':
        logger.warning(f"Failed to fetch head for URL {url}: {response['last_error']}! Skipping...")
        return ScrapingResult(status=ScrapingStatus.REJECTED)
    fetch_result = response['result']
    validation = self._is_fetch_valid(url, visited_urls, fetch_result)
    if validation != ScrapingStatus.OK:
        return ScrapingResult(status=validation)

    # --- Full request: the same validation is applied to the actual response ---
    response = call_with_exponential_backoff(fetch_url, args=(url, etag), delay=crawl_delay)
    if response['status'] == 'FAIL':
        logger.warning(f"Failed to fetch URL {url}: {response['last_error']}! Skipping...")
        return ScrapingResult(status=ScrapingStatus.REJECTED)
    fetch_result = response['result']
    validation = self._is_fetch_valid(url, visited_urls, fetch_result)
    if validation != ScrapingStatus.OK:
        return ScrapingResult(status=validation)
    if not fetch_result.last_modified:
        logger.warning("No information about URL last modification date exists!")

    # Timestamps are returned to the caller, which stores them for later comparison.
    timestamps = UrlTimestamps(
        last_modified = fetch_result.last_modified,
        last_scraped = datetime.now(),
        etag = fetch_result.etag,
        page_hash = fetch_result.page_hash,
    )

    # --- Persist the raw HTML under a filename derived from the final URL ---
    raw_html = fetch_result.text
    final_url = fetch_result.final_url
    url_filename = self._normalizer.url_to_filename(final_url)
    raw_html_file_path = os.path.join(config.paths.RAW_HTML_OUTPUT, url_filename + '.html')
    with open(raw_html_file_path, 'w', encoding="utf-8") as f:
        f.write(raw_html)
    logger.info(f"Saved fetched HTML under '{raw_html_file_path}'")

    # --- Clean and convert the HTML into a document ---
    logger.info(f"Cleaning URL {final_url} from mobile data...")
    cleaned_html = self._content_cleaner.clean_mobile_content(raw_html)
    logger.info(f"Processing URL {final_url}...")
    document = self._processor.process(final_url, cleaned_html)
    if not document:
        logger.warning(f"Failed to process URL '{final_url}'! Skipping...")
        return ScrapingResult(status=ScrapingStatus.REJECTED)

    # Links are no longer followed once the discovery depth exceeds 3.
    discovered_urls = self._content_cleaner.extract_urls(document) if discovery_depth <= 3 else []
    self._content_cleaner.collect_repetitive_content(document)

    # --- Persist the raw (uncleaned) text of the document ---
    raw_text = self._processor.convert_to_txt(document)
    raw_text_file_path = os.path.join(config.paths.RAW_TEXT_OUTPUT, url_filename + '.txt')
    with open(raw_text_file_path, 'w', encoding="utf-8") as f:
        f.write(raw_text)
    logger.info(f"Saved raw text for URL '{final_url}' under '{raw_text_file_path}'")

    return ScrapingResult(
        document = document,
        discovered_urls = discovered_urls,
        final_url = final_url,
        timestamps = timestamps,
        # URLs found on this page are one level deeper than the page itself.
        discovery_depth = discovery_depth + 1,
        status = ScrapingStatus.OK,
    )
def _save_results(self, path: str, filename: str, results, target_url: str | None = None) -> None:
    """Merge results into `<path>/<filename>.json` and write the file.

    How the results are merged with the existing file content depends on the
    filename: 'url_tags', 'url_timestamps', 'url_priorities' and names ending
    in '_merged_chunks' update the stored mapping; any other name stores the
    converted results under `target_url`, or replaces the whole content when
    no target URL is given.

    Raises:
        Exception: Re-raises whatever error occurs while writing the file.
    """
    results_path = os.path.join(path, filename + '.json')

    # Start from the existing file content when it can be read.
    stored = {}
    if os.path.exists(results_path):
        try:
            with open(results_path, 'r', encoding='utf-8') as existing_file:
                stored = json.load(existing_file)
        except Exception:
            logger.warning(f"Failed to load existing {results_path}, will overwrite")

    if filename == 'url_tags':
        stored |= results
    elif filename == 'url_timestamps':
        for url, ts in results.items():
            stored[url] = dataclass_to_dict(ts)
    elif filename == 'url_priorities':
        # URLs are accumulated per priority without duplicates.
        for prio, urls in results.items():
            already_listed = set(stored.get(prio, []))
            stored[prio] = list(already_listed.union(urls))
    elif filename.endswith('_merged_chunks'):
        for url, chunks in results.items():
            stored[url] = [dataclass_to_dict(chunk) for chunk in chunks]
    else:
        converted = [dataclass_to_dict(item) for item in results]
        if target_url:
            stored[target_url] = converted
        else:
            stored = converted

    def _encode(value):
        # Datetimes are written in ISO format; anything else unknown to json becomes null.
        return value.isoformat() if isinstance(value, datetime) else None

    try:
        with open(results_path, 'w', encoding='utf-8') as out_file:
            json.dump(stored, out_file, indent=4, default=_encode)
    except Exception as e:
        logger.error(f"Failed to store results '{filename}'")
        raise e
    logger.debug(f"Stored results in file {results_path}")
def _load_data(self, path: str, filename: str):
    """Load `<path>/<filename>.json`, converting known payloads back to dataclasses.

    'url_timestamps' entries become `UrlTimestamps` and entries of files
    ending in '_merged_chunks' become lists of `ChunkMetadata`; any other
    file is returned as parsed.

    Returns:
        The loaded data, or an empty `defaultdict(dict)` when the file does
        not exist or loading fails.
    """
    datapath = os.path.join(path, filename + '.json')
    if not os.path.exists(datapath):
        logger.warning(f"Failed to locate file {datapath}; new data will be recorded")
        return defaultdict(dict)

    try:
        with open(datapath, 'r', encoding='utf-8') as source:
            loaded_data = json.load(source)

        if filename == 'url_timestamps':
            for url, ts_dict in loaded_data.items():
                loaded_data[url] = dict_to_dataclass(ts_dict, UrlTimestamps)
            incupd_logger.debug(f"Loaded {len(loaded_data)} URL timestamps")
        elif filename.endswith('_merged_chunks'):
            for url, chunk_dicts in loaded_data.items():
                loaded_data[url] = [dict_to_dataclass(chunk, ChunkMetadata) for chunk in chunk_dicts]
            incupd_logger.debug(f"Loaded {len(loaded_data)} temp merged chunks")
        else:
            incupd_logger.info(f"Loaded data '{filename}'")
        return loaded_data
    except Exception as e:
        # Unreadable or malformed data is discarded; the session starts with empty data.
        logger.error(f"Failed trying to load data '{filename}': {e}")
        logger.info("New data will be recorded")
        return defaultdict(dict)