github-actions
Sync from GitHub 2025-12-17T12:18:53Z
5a3b322
from __future__ import annotations
import json
from pathlib import Path
from typing import Optional
import pandas as pd
import structlog
from crawler.storage import Storage
from crawler.utils import now_iso
logger = structlog.get_logger(__name__)
def _normalize_row(row) -> dict:
downloads = row["downloads"]
if isinstance(downloads, str):
try:
downloads = json.loads(downloads)
except Exception:
downloads = None
job_levels = row["job_levels"]
if isinstance(job_levels, str):
try:
job_levels = json.loads(job_levels)
except Exception:
job_levels = [j.strip() for j in job_levels.split(",") if j.strip()]
languages = row.get("languages")
if isinstance(languages, str):
try:
languages = json.loads(languages)
except Exception:
languages = [l.strip() for l in languages.split(",") if l.strip()]
duration_minutes = row["duration_minutes"]
duration_hours = None
if duration_minutes is not None:
try:
duration_hours = float(duration_minutes) / 60.0
except Exception:
duration_hours = None
return {
"url": row["url"],
"name": row["name"],
"description": row["description"],
"test_type": row["test_type"],
"test_type_full": row.get("test_type_full"),
"remote_support": bool(row["remote_support"]) if row["remote_support"] is not None else None,
"adaptive_support": bool(row["adaptive_support"]) if row["adaptive_support"] is not None else None,
"duration": duration_minutes,
"duration_hours": duration_hours,
"job_levels": job_levels,
"languages": languages,
"downloads": downloads,
"source": "shl_product_catalog",
"crawled_at": now_iso(),
}
def export_catalog(
storage: Storage,
parquet_path: str,
jsonl_path: Optional[str] = None,
min_count: int = 377,
limit: Optional[int] = None,
) -> None:
rows = storage.fetch_assessments()
logger.info("export.assessments.fetched", count=len(rows))
if len(rows) < min_count:
raise RuntimeError(f"Validation failed: expected at least {min_count} assessments, got {len(rows)}")
records = [_normalize_row(dict(r)) for r in rows]
df = pd.DataFrame.from_records(records)
if limit:
df = df.head(limit)
logger.info("export.limit.applied", limit=limit, rows=len(df))
Path(parquet_path).parent.mkdir(parents=True, exist_ok=True)
df.to_parquet(parquet_path, index=False)
logger.info("export.parquet.write", path=parquet_path, rows=len(df))
if jsonl_path:
df.to_json(jsonl_path, orient="records", lines=True, force_ascii=False)
logger.info("export.jsonl.write", path=jsonl_path, rows=len(df))
missing_desc = df["description"].isna().sum()
missing_duration = df["duration"].isna().sum()
logger.info(
"export.summary",
missing_description=missing_desc,
missing_duration=missing_duration,
test_type_counts=df["test_type"].value_counts(dropna=False).to_dict(),
)