from __future__ import annotations

import json
import math
from pathlib import Path
from typing import Any, Optional

import pandas as pd
import structlog

from crawler.storage import Storage
from crawler.utils import now_iso

logger = structlog.get_logger(__name__)


def _parse_json(value: Any) -> Any:
    """Parse *value* as JSON when it is a string; return None if parsing fails.

    Non-string values (already-parsed objects, None) pass through unchanged.
    """
    if not isinstance(value, str):
        return value
    try:
        return json.loads(value)
    except (ValueError, TypeError):
        return None


def _parse_list(value: Any) -> Any:
    """Parse a string as JSON, falling back to comma-splitting on failure.

    The fallback strips whitespace and drops empty segments, so legacy
    rows stored as "A, B, C" still come back as a clean list.
    Non-string values pass through unchanged.
    """
    if not isinstance(value, str):
        return value
    try:
        return json.loads(value)
    except (ValueError, TypeError):
        return [part.strip() for part in value.split(",") if part.strip()]


def _coerce_bool(value: Any) -> Optional[bool]:
    """Coerce a SQLite 0/1/None flag to bool, preserving None (unknown)."""
    return bool(value) if value is not None else None


def _duration_hours(minutes: Any) -> Optional[float]:
    """Convert a minutes value to hours, returning None when unusable.

    Fix: NaN (how pandas/SQLite NULLs often surface) previously slipped
    past the ``is not None`` check and produced NaN hours; it now maps
    to None like any other unparseable value.
    """
    if minutes is None:
        return None
    try:
        hours = float(minutes) / 60.0
    except (TypeError, ValueError):
        return None
    return None if math.isnan(hours) else hours


def _normalize_row(row) -> dict:
    """Normalize one raw assessment row (dict) into the export record schema.

    Required keys (url, name, description, test_type, remote_support,
    adaptive_support, duration_minutes, job_levels, downloads) raise
    KeyError if absent; optional keys (test_type_full, languages) default
    to None. JSON-encoded columns are decoded; list-like columns fall
    back to comma-splitting for legacy rows.
    """
    duration_minutes = row["duration_minutes"]
    return {
        "url": row["url"],
        "name": row["name"],
        "description": row["description"],
        "test_type": row["test_type"],
        "test_type_full": row.get("test_type_full"),
        "remote_support": _coerce_bool(row["remote_support"]),
        "adaptive_support": _coerce_bool(row["adaptive_support"]),
        "duration": duration_minutes,
        "duration_hours": _duration_hours(duration_minutes),
        "job_levels": _parse_list(row["job_levels"]),
        "languages": _parse_list(row.get("languages")),
        # downloads is pure JSON; a malformed value becomes None, not a split list.
        "downloads": _parse_json(row["downloads"]),
        "source": "shl_product_catalog",
        "crawled_at": now_iso(),
    }


def export_catalog(
    storage: Storage,
    parquet_path: str,
    jsonl_path: Optional[str] = None,
    min_count: int = 377,
    limit: Optional[int] = None,
) -> None:
    """Export crawled assessments to Parquet (and optionally JSONL).

    Args:
        storage: Storage backend providing ``fetch_assessments()``.
        parquet_path: Destination Parquet file; parent dirs are created.
        jsonl_path: Optional destination for a JSON-lines copy.
        min_count: Minimum row count required before exporting
            (sanity check against partial crawls).
        limit: If truthy, truncate the export to the first N rows
            (applied after validation, so the full crawl is still checked).

    Raises:
        RuntimeError: If fewer than *min_count* assessments were crawled.
    """
    rows = storage.fetch_assessments()
    logger.info("export.assessments.fetched", count=len(rows))
    if len(rows) < min_count:
        raise RuntimeError(
            f"Validation failed: expected at least {min_count} assessments, got {len(rows)}"
        )

    records = [_normalize_row(dict(r)) for r in rows]
    df = pd.DataFrame.from_records(records)

    if limit:
        df = df.head(limit)
        logger.info("export.limit.applied", limit=limit, rows=len(df))

    Path(parquet_path).parent.mkdir(parents=True, exist_ok=True)
    df.to_parquet(parquet_path, index=False)
    logger.info("export.parquet.write", path=parquet_path, rows=len(df))

    if jsonl_path:
        df.to_json(jsonl_path, orient="records", lines=True, force_ascii=False)
        logger.info("export.jsonl.write", path=jsonl_path, rows=len(df))

    # int(...) keeps numpy scalars out of the structured log payload.
    missing_desc = int(df["description"].isna().sum())
    missing_duration = int(df["duration"].isna().sum())
    logger.info(
        "export.summary",
        missing_description=missing_desc,
        missing_duration=missing_duration,
        test_type_counts=df["test_type"].value_counts(dropna=False).to_dict(),
    )