File size: 3,159 Bytes
5a3b322
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
from __future__ import annotations

import json
from pathlib import Path
from typing import Optional

import pandas as pd
import structlog

from crawler.storage import Storage
from crawler.utils import now_iso

logger = structlog.get_logger(__name__)


def _normalize_row(row) -> dict:
    downloads = row["downloads"]
    if isinstance(downloads, str):
        try:
            downloads = json.loads(downloads)
        except Exception:
            downloads = None
    job_levels = row["job_levels"]
    if isinstance(job_levels, str):
        try:
            job_levels = json.loads(job_levels)
        except Exception:
            job_levels = [j.strip() for j in job_levels.split(",") if j.strip()]
    languages = row.get("languages")
    if isinstance(languages, str):
        try:
            languages = json.loads(languages)
        except Exception:
            languages = [l.strip() for l in languages.split(",") if l.strip()]
    duration_minutes = row["duration_minutes"]
    duration_hours = None
    if duration_minutes is not None:
        try:
            duration_hours = float(duration_minutes) / 60.0
        except Exception:
            duration_hours = None
    return {
        "url": row["url"],
        "name": row["name"],
        "description": row["description"],
        "test_type": row["test_type"],
        "test_type_full": row.get("test_type_full"),
        "remote_support": bool(row["remote_support"]) if row["remote_support"] is not None else None,
        "adaptive_support": bool(row["adaptive_support"]) if row["adaptive_support"] is not None else None,
        "duration": duration_minutes,
        "duration_hours": duration_hours,
        "job_levels": job_levels,
        "languages": languages,
        "downloads": downloads,
        "source": "shl_product_catalog",
        "crawled_at": now_iso(),
    }


def export_catalog(
    storage: Storage,
    parquet_path: str,
    jsonl_path: Optional[str] = None,
    min_count: int = 377,
    limit: Optional[int] = None,
) -> None:
    rows = storage.fetch_assessments()
    logger.info("export.assessments.fetched", count=len(rows))

    if len(rows) < min_count:
        raise RuntimeError(f"Validation failed: expected at least {min_count} assessments, got {len(rows)}")

    records = [_normalize_row(dict(r)) for r in rows]
    df = pd.DataFrame.from_records(records)
    if limit:
        df = df.head(limit)
        logger.info("export.limit.applied", limit=limit, rows=len(df))

    Path(parquet_path).parent.mkdir(parents=True, exist_ok=True)
    df.to_parquet(parquet_path, index=False)
    logger.info("export.parquet.write", path=parquet_path, rows=len(df))

    if jsonl_path:
        df.to_json(jsonl_path, orient="records", lines=True, force_ascii=False)
        logger.info("export.jsonl.write", path=jsonl_path, rows=len(df))

    missing_desc = df["description"].isna().sum()
    missing_duration = df["duration"].isna().sum()
    logger.info(
        "export.summary",
        missing_description=missing_desc,
        missing_duration=missing_duration,
        test_type_counts=df["test_type"].value_counts(dropna=False).to_dict(),
    )