"""Turn data/products.jsonl into data/documents.jsonl. Each output line is one product flattened into: - id: URL slug (stable, matches train labels) - text: a structured blob to feed an embedding model - metadata: fields needed at query time (name, url, duration, etc.) The text blob includes name, test type, duration, description, job levels — each on its own line. We give the embedding model as much signal as we have. """ from __future__ import annotations import json from pathlib import Path from recsys.urls import slug ROOT = Path(__file__).resolve().parent.parent IN = ROOT / "data" / "products.jsonl" IN_ENRICHED = ROOT / "data" / "products_with_concepts.jsonl" IN_PRE = ROOT / "data" / "prepackaged_products.jsonl" OUT = ROOT / "data" / "documents.jsonl" def build_text(p: dict) -> str: parts: list[str] = [f"Name: {p['name']}"] tt = p.get("test_type") or [] if tt: parts.append("Test type: " + ", ".join(tt)) if p.get("duration") is not None: parts.append(f"Duration: {p['duration']} minutes") parts.append(f"Remote support: {p.get('remote_support', 'No')}") parts.append(f"Adaptive support: {p.get('adaptive_support', 'No')}") if p.get("job_levels"): parts.append(f"Job levels: {p['job_levels']}") # LLM-extracted structured concepts (if present) — these compress the # description into the actual signal an embedder can match against. if p.get("key_concepts"): parts.append("Key concepts: " + ", ".join(p["key_concepts"])) if p.get("primary_domain"): parts.append(f"Primary domain: {p['primary_domain']}") if p.get("use_cases"): parts.append("Use cases: " + " | ".join(p["use_cases"])) if p.get("description"): parts.append(f"Description: {p['description']}") return "\n".join(parts) def main() -> None: import argparse, os ap = argparse.ArgumentParser() ap.add_argument( "--include-pre-packaged", action="store_true", default=os.environ.get("INCLUDE_PRE_PACKAGED", "0") == "1", help="Include Pre-packaged Job Solutions in the index (off by default per spec).", ) args = ap.parse_args() # Prefer the enriched file (with key_concepts) when it exists; fall back # to plain products.jsonl. This makes the change reversible: if the # enriched file is deleted or renamed, the pipeline silently downgrades # to description-only. src = IN_ENRICHED if IN_ENRICHED.exists() else IN print(f"reading products from: {src.name}") products = [json.loads(l) for l in src.read_text(encoding="utf-8").splitlines() if l.strip()] pre: list[dict] = [] if args.include_pre_packaged and IN_PRE.exists(): pre = [json.loads(l) for l in IN_PRE.read_text(encoding="utf-8").splitlines() if l.strip()] for p in pre: p.setdefault("category", "pre_packaged") for p in products: p.setdefault("category", "individual") all_products = products + pre written = 0 seen: set[str] = set() with OUT.open("w", encoding="utf-8") as f: for p in all_products: sid = slug(p["url"]) if sid is None or sid in seen: continue seen.add(sid) doc = { "id": sid, "text": build_text(p), "metadata": { "name": p["name"], "url": p["url"], "category": p.get("category", "individual"), "remote_support": p.get("remote_support", "No"), "adaptive_support": p.get("adaptive_support", "No"), "test_type": ", ".join(p.get("test_type") or []), "duration": p["duration"] if p.get("duration") is not None else -1, "description": p.get("description", ""), }, } f.write(json.dumps(doc, ensure_ascii=False) + "\n") written += 1 n_ind = sum(1 for p in products if slug(p["url"])) n_pre = sum(1 for p in pre if slug(p["url"])) print(f"wrote {written} documents ({n_ind} individual + {n_pre} pre-packaged) to {OUT}") sample = json.loads(OUT.read_text(encoding="utf-8").splitlines()[0]) print("\nsample id:", sample["id"]) print("sample text:") print(sample["text"]) if __name__ == "__main__": main()