File size: 4,399 Bytes
870800f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
"""Turn data/products.jsonl into data/documents.jsonl.

Each output line is one product flattened into:
  - id:       URL slug (stable, matches train labels)
  - text:     a structured blob to feed an embedding model
  - metadata: fields needed at query time (name, url, duration, etc.)

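The text blob includes name, test type, duration, remote/adaptive support,
job levels, any LLM-extracted concepts (key concepts, primary domain, use
cases) and the description — each on its own line. We give the embedding
model as much signal as we have.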
"""
from __future__ import annotations

import json
from pathlib import Path

from recsys.urls import slug

# Directory two levels above this file; the data/ folder is resolved under it.
ROOT = Path(__file__).resolve().parent.parent

# Inputs. The enriched file is preferred over the plain one when it exists;
# the pre-packaged file is only read when explicitly requested.
IN = ROOT.joinpath("data", "products.jsonl")
IN_ENRICHED = ROOT.joinpath("data", "products_with_concepts.jsonl")
IN_PRE = ROOT.joinpath("data", "prepackaged_products.jsonl")

# Output: one JSON document per line.
OUT = ROOT.joinpath("data", "documents.jsonl")


def build_text(p: dict) -> str:
    """Render one product record as the multi-line text blob to embed.

    The blob always starts with the name and always carries the remote- and
    adaptive-support lines (falling back to "No" when the key is absent).
    Every other line is emitted only when its field is present and non-empty;
    a duration of 0 still counts as present. Lines are joined with newlines
    in a fixed order, with the free-text description last.

    Raises:
        KeyError: if the record has no ``name``.
    """

    def _lines():
        yield f"Name: {p['name']}"

        test_types = p.get("test_type") or []
        if test_types:
            yield "Test type: " + ", ".join(test_types)

        duration = p.get("duration")
        if duration is not None:
            yield f"Duration: {duration} minutes"

        # The support flags are unconditional, so every document carries them.
        remote = p.get("remote_support", "No")
        adaptive = p.get("adaptive_support", "No")
        yield f"Remote support: {remote}"
        yield f"Adaptive support: {adaptive}"

        job_levels = p.get("job_levels")
        if job_levels:
            yield f"Job levels: {job_levels}"

        # LLM-extracted structured concepts (if present) come before the
        # description: they are the condensed signal an embedder can match.
        concepts = p.get("key_concepts")
        if concepts:
            yield "Key concepts: " + ", ".join(concepts)

        domain = p.get("primary_domain")
        if domain:
            yield f"Primary domain: {domain}"

        use_cases = p.get("use_cases")
        if use_cases:
            yield "Use cases: " + " | ".join(use_cases)

        description = p.get("description")
        if description:
            yield f"Description: {description}"

    return "\n".join(_lines())


def _read_jsonl(path: Path) -> list[dict]:
    """Parse a JSON Lines file into a list of records, skipping blank lines.

    The file object is iterated instead of calling ``str.splitlines()``:
    ``splitlines`` also breaks on U+0085, U+2028 and U+2029, which can appear
    unescaped inside JSON strings (``json.dumps(..., ensure_ascii=False)``
    leaves them as-is) and would cut such a record in half.
    """
    with path.open(encoding="utf-8") as f:
        return [json.loads(line) for line in f if line.strip()]


def main() -> None:
    """Build the documents file from the product catalogue.

    Reads the enriched product file when it exists (the plain products file
    otherwise), optionally appends the pre-packaged products, and writes one
    JSON document per unique URL slug to ``OUT``. Records whose slug is
    ``None`` or was already written are skipped. Prints a summary and, when
    at least one document was written, the first document as a sample.

    Pre-packaged products are included when ``--include-pre-packaged`` is
    passed or the ``INCLUDE_PRE_PACKAGED`` environment variable equals ``1``.

    Raises:
        KeyError: if a record lacks ``url`` or ``name``.
        json.JSONDecodeError: if an input line is not valid JSON.
    """
    import argparse
    import os

    ap = argparse.ArgumentParser()
    ap.add_argument(
        "--include-pre-packaged",
        action="store_true",
        default=os.environ.get("INCLUDE_PRE_PACKAGED", "0") == "1",
        help="Include Pre-packaged Job Solutions in the index (off by default per spec).",
    )
    args = ap.parse_args()

    # Prefer the enriched file (with key_concepts) when it exists; fall back
    # to plain products.jsonl. This makes the change reversible: if the
    # enriched file is deleted or renamed, the pipeline silently downgrades
    # to description-only.
    src = IN_ENRICHED if IN_ENRICHED.exists() else IN
    print(f"reading products from: {src.name}")
    products = _read_jsonl(src)
    for p in products:
        p.setdefault("category", "individual")

    pre: list[dict] = []
    if args.include_pre_packaged:
        if IN_PRE.exists():
            pre = _read_jsonl(IN_PRE)
            for p in pre:
                p.setdefault("category", "pre_packaged")
        else:
            # Stay best-effort, but say so instead of silently ignoring the flag.
            print(f"warning: {IN_PRE.name} not found; no pre-packaged products included")

    # Count per source list while writing, so the reported breakdown always
    # adds up to the number of documents actually written (duplicates and
    # slug-less records are excluded from both).
    n_ind = 0
    n_pre = 0
    seen: set[str] = set()
    with OUT.open("w", encoding="utf-8") as f:
        for is_pre, group in ((False, products), (True, pre)):
            for p in group:
                sid = slug(p["url"])
                if sid is None or sid in seen:
                    continue
                seen.add(sid)
                doc = {
                    "id": sid,
                    "text": build_text(p),
                    "metadata": {
                        "name": p["name"],
                        "url": p["url"],
                        "category": p.get("category", "individual"),
                        "remote_support": p.get("remote_support", "No"),
                        "adaptive_support": p.get("adaptive_support", "No"),
                        "test_type": ", ".join(p.get("test_type") or []),
                        # -1 marks "duration unknown" in the metadata.
                        "duration": p["duration"] if p.get("duration") is not None else -1,
                        "description": p.get("description", ""),
                    },
                }
                f.write(json.dumps(doc, ensure_ascii=False) + "\n")
                if is_pre:
                    n_pre += 1
                else:
                    n_ind += 1

    written = n_ind + n_pre
    print(f"wrote {written} documents ({n_ind} individual + {n_pre} pre-packaged) to {OUT}")

    # Nothing to sample when the inputs produced no documents.
    if written == 0:
        return

    # Read the first record back from disk; readline() only stops at a real
    # newline, so the record comes back whole.
    with OUT.open(encoding="utf-8") as f:
        sample = json.loads(f.readline())
    print("\nsample id:", sample["id"])
    print("sample text:")
    print(sample["text"])

# Run the build only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()