File size: 3,159 Bytes
5a3b322 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 |
from __future__ import annotations
import json
from pathlib import Path
from typing import Optional
import pandas as pd
import structlog
from crawler.storage import Storage
from crawler.utils import now_iso
logger = structlog.get_logger(__name__)
def _parse_list_field(value):
    """Decode a JSON-encoded list; fall back to splitting on commas.

    Returns the input unchanged when it is not a string (already decoded,
    or ``None``).
    """
    if not isinstance(value, str):
        return value
    try:
        return json.loads(value)
    except ValueError:
        # Not valid JSON — treat it as a comma-separated plain-text list.
        return [item.strip() for item in value.split(",") if item.strip()]
def _normalize_row(row: dict) -> dict:
    """Normalize one raw storage row into an export-ready record.

    Decodes JSON-encoded columns (``downloads``, ``job_levels``,
    ``languages``), derives ``duration_hours`` from ``duration_minutes``,
    coerces the support flags to booleans (preserving ``None``), and stamps
    the record with a crawl timestamp.

    Args:
        row: Mapping of column name to raw value as fetched from storage.

    Returns:
        A dict with normalized values plus ``source`` and ``crawled_at``.
    """
    downloads = row.get("downloads")
    if isinstance(downloads, str):
        try:
            downloads = json.loads(downloads)
        except ValueError:
            # Malformed JSON; there is no sensible plain-text fallback here.
            downloads = None
    job_levels = _parse_list_field(row.get("job_levels"))
    languages = _parse_list_field(row.get("languages"))
    duration_minutes = row.get("duration_minutes")
    duration_hours = None
    if duration_minutes is not None:
        try:
            duration_hours = float(duration_minutes) / 60.0
        except (TypeError, ValueError):
            # Non-numeric duration value — leave hours unset.
            duration_hours = None
    remote_support = row["remote_support"]
    adaptive_support = row["adaptive_support"]
    return {
        "url": row["url"],
        "name": row["name"],
        "description": row["description"],
        "test_type": row["test_type"],
        "test_type_full": row.get("test_type_full"),
        # Preserve None (unknown) rather than collapsing it to False.
        "remote_support": bool(remote_support) if remote_support is not None else None,
        "adaptive_support": bool(adaptive_support) if adaptive_support is not None else None,
        "duration": duration_minutes,
        "duration_hours": duration_hours,
        "job_levels": job_levels,
        "languages": languages,
        "downloads": downloads,
        "source": "shl_product_catalog",
        "crawled_at": now_iso(),
    }
def export_catalog(
    storage: Storage,
    parquet_path: str,
    jsonl_path: Optional[str] = None,
    min_count: int = 377,
    limit: Optional[int] = None,
) -> None:
    """Export all crawled assessments to Parquet (and optionally JSONL).

    Args:
        storage: Storage backend; must provide ``fetch_assessments()``.
        parquet_path: Destination Parquet file; parent dirs are created.
        jsonl_path: Optional destination for a JSONL copy of the same data.
        min_count: Minimum number of assessments required for the export
            to proceed (sanity check against partial crawls).
        limit: If truthy, truncate the export to the first ``limit`` rows.

    Raises:
        RuntimeError: If fewer than ``min_count`` assessments were fetched.
    """
    rows = storage.fetch_assessments()
    logger.info("export.assessments.fetched", count=len(rows))
    if len(rows) < min_count:
        raise RuntimeError(f"Validation failed: expected at least {min_count} assessments, got {len(rows)}")
    records = [_normalize_row(dict(r)) for r in rows]
    df = pd.DataFrame.from_records(records)
    if limit:
        df = df.head(limit)
        logger.info("export.limit.applied", limit=limit, rows=len(df))
    Path(parquet_path).parent.mkdir(parents=True, exist_ok=True)
    df.to_parquet(parquet_path, index=False)
    logger.info("export.parquet.write", path=parquet_path, rows=len(df))
    if jsonl_path:
        # Bug fix: previously only the parquet parent was created, so writing
        # JSONL into a not-yet-existing directory raised FileNotFoundError.
        Path(jsonl_path).parent.mkdir(parents=True, exist_ok=True)
        df.to_json(jsonl_path, orient="records", lines=True, force_ascii=False)
        logger.info("export.jsonl.write", path=jsonl_path, rows=len(df))
    # Cast numpy ints to plain int so structured-log serializers stay clean.
    missing_desc = int(df["description"].isna().sum())
    missing_duration = int(df["duration"].isna().sum())
    logger.info(
        "export.summary",
        missing_description=missing_desc,
        missing_duration=missing_duration,
        test_type_counts=df["test_type"].value_counts(dropna=False).to_dict(),
    )
|