"""One-time per-product concept extraction. For each of the 377 SHL assessments in data/products.jsonl, ask the LLM to distill its description into a structured set of concrete concepts: { "key_concepts": [, ...], # 4–10 items, specific "primary_domain": "technical" | "behavioral" | "cognitive" | "language" | "sales" | "admin" | "leadership" | "managerial" | "other", "use_cases": [, ...] # 1–3 items } This is cheap (one-shot per product) and amortizes across every future search. Cached per slug at data/concepts_cache/.json so re-runs are free, and we can resume after rate-limit interruptions. Reads provider/model from env (LLM_PROVIDER / LLM_MODEL). Defaults to whatever .env says. Usage: LLM_PROVIDER=gemini python3 -m scripts.extract_concepts # default LLM_PROVIDER=openai LLM_MODEL=gpt-5-mini python3 -m scripts.extract_concepts """ from __future__ import annotations import argparse import json import os import re import time from pathlib import Path from langchain_core.prompts import ChatPromptTemplate from langchain_core.output_parsers import StrOutputParser from tqdm import tqdm from recsys.pipeline import get_llm from recsys.tracing import callbacks ROOT = Path(__file__).resolve().parent.parent PRODUCTS = ROOT / "data" / "products.jsonl" CACHE_DIR = ROOT / "data" / "concepts_cache" OUT = ROOT / "data" / "products_with_concepts.jsonl" PROMPT = ChatPromptTemplate.from_messages( [ ( "system", "You are an SHL assessment cataloger. Given the name, test type, and " "description of one assessment, extract the concrete skills, tools, " "and topics the test actually measures.\n\n" "Output STRICT JSON, no prose, no code fences:\n" "{{\n" ' "key_concepts": [, ...],\n' ' "primary_domain": "",\n' ' "use_cases": [, ...]\n' "}}\n\n" "Rules:\n" " • 4–10 key_concepts. Be specific: 'Java OOP', not 'programming'.\n" " • Drop legal / compliance / advertising boilerplate.\n" " • Strip duplicates and near-synonyms.\n" " • use_cases: 1–3 items, each describing a concrete recruiter need." ), ( "human", "Name: {name}\n" "Test type: {test_type}\n" "Description:\n{description}\n\n" "Return the JSON now." ), ] ) _JSON_RE = re.compile(r"\{.*\}", re.S) def _slug(url: str) -> str: m = re.search(r"/product-catalog/view/([^/?#]+)/?", url, re.I) return m.group(1).lower() if m else "" def _cache_path(slug: str) -> Path: return CACHE_DIR / f"{slug}.json" def _parse(raw: str) -> dict | None: m = _JSON_RE.search(raw or "") if not m: return None try: obj = json.loads(m.group(0)) except json.JSONDecodeError: return None out = {} if isinstance(obj.get("key_concepts"), list): out["key_concepts"] = [ str(x).strip() for x in obj["key_concepts"] if str(x).strip() ][:12] if isinstance(obj.get("primary_domain"), str): out["primary_domain"] = obj["primary_domain"].strip().lower() if isinstance(obj.get("use_cases"), list): out["use_cases"] = [ str(x).strip() for x in obj["use_cases"] if str(x).strip() ][:3] if not out.get("key_concepts"): return None out.setdefault("primary_domain", "other") out.setdefault("use_cases", []) return out def _process_one(chain, p: dict) -> tuple[str, str, dict | None]: """Run one product through the chain. Returns (slug, raw_or_err, parsed).""" slug = _slug(p["url"]) if not slug: return ("", "no-slug", None) try: raw = chain.invoke( { "name": p.get("name", ""), "test_type": ", ".join(p.get("test_type") or []), "description": (p.get("description") or "")[:2000], }, config={"callbacks": callbacks()}, ) except Exception as e: return (slug, f"{type(e).__name__}: {str(e)[:120]}", None) parsed = _parse(raw) return (slug, raw, parsed) def main() -> None: from concurrent.futures import ThreadPoolExecutor, as_completed ap = argparse.ArgumentParser() ap.add_argument("--workers", type=int, default=20, help="parallel LLM calls (OpenAI handles this fine; lower for free Gemini)") ap.add_argument("--limit", type=int, default=0, help="stop after N products (0 = all)") args = ap.parse_args() CACHE_DIR.mkdir(parents=True, exist_ok=True) products = [ json.loads(l) for l in PRODUCTS.read_text(encoding="utf-8").splitlines() if l.strip() ] # Split into already-cached vs to-fetch todo: list[dict] = [] cached_count = 0 for p in products: slug = _slug(p["url"]) if slug and _cache_path(slug).exists(): cached_count += 1 else: todo.append(p) if args.limit: todo = todo[: args.limit] print(f"products: {len(products)} | cached: {cached_count} | to fetch: {len(todo)} " f"| workers: {args.workers}") chain = PROMPT | get_llm() | StrOutputParser() fetched = 0 failed = 0 with ThreadPoolExecutor(max_workers=args.workers) as ex: futs = {ex.submit(_process_one, chain, p): p for p in todo} bar = tqdm(as_completed(futs), total=len(futs), desc="extract") for fut in bar: slug, raw_or_err, parsed = fut.result() if not slug: failed += 1 continue if parsed is None: tqdm.write(f" FAIL {slug}: {raw_or_err[:80]}") failed += 1 continue _cache_path(slug).write_text(json.dumps(parsed, ensure_ascii=False, indent=2)) fetched += 1 bar.set_postfix(fetched=fetched, failed=failed) extracted = cached_count + fetched print(f"\nfetched: {fetched} | from cache: {cached_count} | failed: {failed}") print(f"merging {extracted}/{len(products)} into {OUT}") # Merge cache into single output file with OUT.open("w", encoding="utf-8") as f: for p in products: slug = _slug(p["url"]) if not slug: continue cache = _cache_path(slug) if cache.exists(): try: extra = json.loads(cache.read_text(encoding="utf-8")) p = {**p, **extra} except Exception: pass f.write(json.dumps(p, ensure_ascii=False) + "\n") print(f"wrote {OUT}") if __name__ == "__main__": main()