from __future__ import annotations import json import os import sqlite3 from dataclasses import dataclass from pathlib import Path from typing import Any, Dict, Iterable, List, Optional import structlog from crawler.utils import canonicalize_url, make_assessment_id, now_iso logger = structlog.get_logger(__name__) PAGE_TYPE_CATALOG = "CATALOG" PAGE_TYPE_DETAIL = "DETAIL" PARSE_PENDING = "PENDING" PARSE_PARSED = "PARSED" PARSE_FAILED = "FAILED" @dataclass class PageRecord: url: str page_type: str http_status: Optional[int] = None html: Optional[str] = None error: Optional[str] = None retry_count: int = 0 parse_status: str = PARSE_PENDING class Storage: def __init__(self, db_path: str) -> None: self.db_path = db_path Path(db_path).parent.mkdir(parents=True, exist_ok=True) self.conn = sqlite3.connect(self.db_path) self.conn.row_factory = sqlite3.Row self.ensure_schema() def ensure_schema(self) -> None: logger.info("storage.schema.ensure", db_path=self.db_path) cur = self.conn.cursor() cur.execute( """ CREATE TABLE IF NOT EXISTS pages ( url TEXT PRIMARY KEY, url_canonical TEXT UNIQUE, page_type TEXT, http_status INTEGER, fetched_at TEXT, html TEXT, error TEXT, retry_count INTEGER DEFAULT 0, parse_status TEXT DEFAULT 'PENDING' ) """ ) cur.execute( """ CREATE TABLE IF NOT EXISTS assessments ( assessment_id TEXT PRIMARY KEY, url TEXT UNIQUE, name TEXT, description TEXT, test_type TEXT, test_type_full TEXT, remote_support INTEGER, adaptive_support INTEGER, duration_minutes INTEGER, job_levels TEXT, languages TEXT, downloads TEXT, source_catalog_page TEXT, discovered_at TEXT, last_updated_at TEXT ) """ ) cur.execute( """ CREATE TABLE IF NOT EXISTS crawl_meta ( run_id TEXT, started_at TEXT, finished_at TEXT, total_catalog_pages INTEGER, total_detail_pages INTEGER, individual_assessment_count INTEGER, notes TEXT ) """ ) self.conn.commit() def upsert_page(self, record: PageRecord) -> None: canonical = canonicalize_url(record.url) logger.debug("storage.page.upsert", url=record.url, page_type=record.page_type) self.conn.execute( """ INSERT INTO pages (url, url_canonical, page_type, http_status, fetched_at, html, error, retry_count, parse_status) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(url) DO UPDATE SET page_type=excluded.page_type, http_status=excluded.http_status, fetched_at=excluded.fetched_at, html=excluded.html, error=excluded.error, retry_count=excluded.retry_count, parse_status=excluded.parse_status """, ( record.url, canonical, record.page_type, record.http_status, now_iso(), record.html, record.error, record.retry_count, record.parse_status, ), ) self.conn.commit() def update_parse_status(self, url: str, status: str) -> None: self.conn.execute("UPDATE pages SET parse_status=? WHERE url=?", (status, url)) self.conn.commit() def get_pages_by_type(self, page_type: str, parse_status: Optional[str] = None) -> List[sqlite3.Row]: cur = self.conn.cursor() if parse_status: cur.execute( "SELECT * FROM pages WHERE page_type=? AND parse_status=? ORDER BY url", (page_type, parse_status) ) else: cur.execute("SELECT * FROM pages WHERE page_type=? ORDER BY url", (page_type,)) return cur.fetchall() def upsert_assessment(self, data: Dict[str, Any]) -> None: url = data["url"] assessment_id = data.get("assessment_id") or make_assessment_id(url) data = {**data, "assessment_id": assessment_id} downloads = data.get("downloads") if downloads is not None and not isinstance(downloads, str): downloads = json.dumps(downloads) job_levels = data.get("job_levels") if isinstance(job_levels, (list, tuple)): job_levels = json.dumps(job_levels) languages = data.get("languages") if isinstance(languages, (list, tuple)): languages = json.dumps(languages) logger.debug("storage.assessment.upsert", url=url) self.conn.execute( """ INSERT INTO assessments ( assessment_id, url, name, description, test_type, test_type_full, remote_support, adaptive_support, duration_minutes, job_levels, languages, downloads, source_catalog_page, discovered_at, last_updated_at ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(assessment_id) DO UPDATE SET url=excluded.url, name=COALESCE(excluded.name, assessments.name), description=COALESCE(excluded.description, assessments.description), test_type=COALESCE(excluded.test_type, assessments.test_type), test_type_full=COALESCE(excluded.test_type_full, assessments.test_type_full), remote_support=COALESCE(excluded.remote_support, assessments.remote_support), adaptive_support=COALESCE(excluded.adaptive_support, assessments.adaptive_support), duration_minutes=COALESCE(excluded.duration_minutes, assessments.duration_minutes), job_levels=COALESCE(excluded.job_levels, assessments.job_levels), languages=COALESCE(excluded.languages, assessments.languages), downloads=COALESCE(excluded.downloads, assessments.downloads), source_catalog_page=COALESCE(excluded.source_catalog_page, assessments.source_catalog_page), last_updated_at=excluded.last_updated_at """, ( data["assessment_id"], url, data.get("name"), data.get("description"), data.get("test_type"), data.get("test_type_full"), data.get("remote_support"), data.get("adaptive_support"), data.get("duration_minutes"), job_levels, languages, downloads, data.get("source_catalog_page"), data.get("discovered_at") or now_iso(), data.get("last_updated_at") or now_iso(), ), ) self.conn.commit() def fetch_assessments(self) -> List[sqlite3.Row]: cur = self.conn.cursor() cur.execute("SELECT * FROM assessments ORDER BY name") return cur.fetchall() def count_assessments(self) -> int: cur = self.conn.cursor() cur.execute("SELECT COUNT(*) FROM assessments") return cur.fetchone()[0] def close(self) -> None: self.conn.close()