# shl-recommender-api / scripts/extract_concepts.py
# pankaj
# SHL recommender — initial deploy
# 870800f
"""One-time per-product concept extraction.
For each of the 377 SHL assessments in data/products.jsonl, ask the LLM to
distill its description into a structured set of concrete concepts:
{
"key_concepts": [<short noun phrase>, ...], # 4–10 items, specific
"primary_domain": "technical" | "behavioral" | "cognitive"
| "language" | "sales" | "admin"
| "leadership" | "managerial" | "other",
"use_cases": [<one-line situation>, ...] # 1–3 items
}
This is cheap (one-shot per product) and amortizes across every future
search. Cached per slug at data/concepts_cache/<slug>.json so re-runs are
free, and we can resume after rate-limit interruptions.
Reads provider/model from env (LLM_PROVIDER / LLM_MODEL). Defaults to
whatever .env says.
Usage:
LLM_PROVIDER=gemini python3 -m scripts.extract_concepts # default
LLM_PROVIDER=openai LLM_MODEL=gpt-5-mini python3 -m scripts.extract_concepts
"""
from __future__ import annotations
import argparse
import json
import os
import re
import time
from pathlib import Path
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from tqdm import tqdm
from recsys.pipeline import get_llm
from recsys.tracing import callbacks
# Repository root: this script lives one level below it, in scripts/.
ROOT = Path(__file__).resolve().parent.parent
_DATA_DIR = ROOT.joinpath("data")
# Input catalog, read as one JSON object per line.
PRODUCTS = _DATA_DIR.joinpath("products.jsonl")
# Per-slug extraction cache, one <slug>.json file per product.
CACHE_DIR = _DATA_DIR.joinpath("concepts_cache")
# Merged output: catalog records combined with their cached concepts.
OUT = _DATA_DIR.joinpath("products_with_concepts.jsonl")
# System instructions for the cataloger. Literal JSON braces are doubled
# ("{{" / "}}") because ChatPromptTemplate treats single braces as template
# variables.
_SYSTEM_PROMPT = (
    "You are an SHL assessment cataloger. Given the name, test type, and "
    "description of one assessment, extract the concrete skills, tools, "
    "and topics the test actually measures.\n\n"
    "Output STRICT JSON, no prose, no code fences:\n"
    "{{\n"
    ' "key_concepts": [<short noun phrase>, ...],\n'
    ' "primary_domain": "<technical|behavioral|cognitive|language|sales|admin|leadership|managerial|other>",\n'
    ' "use_cases": [<one-line situation when this test is appropriate>, ...]\n'
    "}}\n\n"
    "Rules:\n"
    " • 4–10 key_concepts. Be specific: 'Java OOP', not 'programming'.\n"
    " • Drop legal / compliance / advertising boilerplate.\n"
    " • Strip duplicates and near-synonyms.\n"
    " • use_cases: 1–3 items, each describing a concrete recruiter need."
)

# Per-product user turn; {name}, {test_type} and {description} are filled
# from the invoke() payload.
_HUMAN_PROMPT = (
    "Name: {name}\n"
    "Test type: {test_type}\n"
    "Description:\n{description}\n\n"
    "Return the JSON now."
)

PROMPT = ChatPromptTemplate.from_messages(
    [
        ("system", _SYSTEM_PROMPT),
        ("human", _HUMAN_PROMPT),
    ]
)
_JSON_RE = re.compile(r"\{.*\}", re.S)
def _slug(url: str) -> str:
m = re.search(r"/product-catalog/view/([^/?#]+)/?", url, re.I)
return m.group(1).lower() if m else ""
def _cache_path(slug: str) -> Path:
    """Return the cache file location for *slug*: ``CACHE_DIR/<slug>.json``."""
    filename = f"{slug}.json"
    return CACHE_DIR.joinpath(filename)
def _parse(raw: str) -> dict | None:
m = _JSON_RE.search(raw or "")
if not m:
return None
try:
obj = json.loads(m.group(0))
except json.JSONDecodeError:
return None
out = {}
if isinstance(obj.get("key_concepts"), list):
out["key_concepts"] = [
str(x).strip() for x in obj["key_concepts"] if str(x).strip()
][:12]
if isinstance(obj.get("primary_domain"), str):
out["primary_domain"] = obj["primary_domain"].strip().lower()
if isinstance(obj.get("use_cases"), list):
out["use_cases"] = [
str(x).strip() for x in obj["use_cases"] if str(x).strip()
][:3]
if not out.get("key_concepts"):
return None
out.setdefault("primary_domain", "other")
out.setdefault("use_cases", [])
return out
def _process_one(chain, p: dict) -> tuple[str, str, dict | None]:
    """Run one product through the extraction chain.

    Args:
        chain: runnable whose ``invoke`` returns the model's text output.
        p: one catalog record. Reads ``url``, ``name``, ``test_type`` and
            ``description``; any of them may be missing.

    Returns:
        ``(slug, raw_or_err, parsed)``:
        - ``slug`` is "" when the URL yields no catalog slug, and
          ``raw_or_err`` is then "no-slug";
        - ``raw_or_err`` is the raw model output, or ``"ExcType: message"``
          (message cut to 120 chars) when the call raised;
        - ``parsed`` is ``_parse(raw)``, or None on any failure.
    """
    slug = _slug(p.get("url") or "")
    if not slug:
        return ("", "no-slug", None)

    test_type = p.get("test_type") or []
    if isinstance(test_type, str):
        # A bare string would otherwise be joined character by character.
        test_type = [test_type]

    try:
        raw = chain.invoke(
            {
                "name": p.get("name", ""),
                "test_type": ", ".join(str(t) for t in test_type),
                "description": (p.get("description") or "")[:2000],
            },
            config={"callbacks": callbacks()},
        )
    except Exception as e:
        # Deliberately broad: any provider/network error is a per-product
        # failure reported by the caller, not a reason to abort the batch.
        return (slug, f"{type(e).__name__}: {str(e)[:120]}", None)
    return (slug, raw, _parse(raw))
def _load_products() -> list[dict]:
    """Read PRODUCTS as JSON Lines, skipping blank lines.

    Iterates the file line by line rather than using ``str.splitlines()``,
    which also splits on U+2028/U+2029/U+0085. Those characters may appear
    unescaped inside JSON strings (e.g. when written with
    ``ensure_ascii=False``) and would cut a record in half.
    """
    with PRODUCTS.open(encoding="utf-8") as fh:
        return [json.loads(line) for line in fh if line.strip()]


def _write_cache(slug: str, parsed: dict) -> None:
    """Persist one product's parsed concepts to its cache file.

    The encoding is explicitly UTF-8, because the merge reads caches as UTF-8
    and ``ensure_ascii=False`` keeps non-ASCII text raw. The data goes to a
    temp file first and is then moved with ``os.replace``, so an interrupted
    run cannot leave a truncated ``<slug>.json`` that later runs would count
    as already cached.
    """
    path = _cache_path(slug)
    tmp = path.with_name(path.name + ".tmp")
    tmp.write_text(json.dumps(parsed, ensure_ascii=False, indent=2), encoding="utf-8")
    os.replace(tmp, path)


def _read_cache(slug: str) -> dict | None:
    """Return the cached concepts for *slug*, or None if absent or unusable.

    Unreadable or malformed cache files are reported, not silently skipped,
    but they do not abort the merge.
    """
    path = _cache_path(slug)
    if not path.exists():
        return None
    try:
        extra = json.loads(path.read_text(encoding="utf-8"))
    except (OSError, ValueError) as e:  # ValueError covers JSON/Unicode decode errors
        print(f" WARN unreadable cache {path.name}: {type(e).__name__}: {e}")
        return None
    if not isinstance(extra, dict):
        print(f" WARN ignoring non-object cache {path.name}")
        return None
    return extra


def _merge_into_output(products: list[dict]) -> None:
    """Write OUT with each slugged product merged with its cached concepts.

    Products whose URL yields no slug are left out. Products with no usable
    cache entry are written unchanged.
    """
    with OUT.open("w", encoding="utf-8") as f:
        for p in products:
            slug = _slug(p.get("url") or "")
            if not slug:
                continue
            extra = _read_cache(slug)
            if extra is not None:
                p = {**p, **extra}
            f.write(json.dumps(p, ensure_ascii=False) + "\n")


def main() -> None:
    """CLI entry point: extract concepts for uncached products, then merge.

    Products that already have a cache file are skipped, so re-runs only pay
    for products that are missing or failed. All cached results are then
    merged into OUT.
    """
    from concurrent.futures import ThreadPoolExecutor, as_completed

    ap = argparse.ArgumentParser()
    ap.add_argument("--workers", type=int, default=20,
                    help="parallel LLM calls (OpenAI handles this fine; lower for free Gemini)")
    ap.add_argument("--limit", type=int, default=0,
                    help="stop after N products (0 = all)")
    args = ap.parse_args()
    if args.workers < 1:
        ap.error("--workers must be >= 1")
    if args.limit < 0:
        ap.error("--limit must be >= 0")

    CACHE_DIR.mkdir(parents=True, exist_ok=True)
    products = _load_products()

    # Split into already-cached vs to-fetch
    todo: list[dict] = []
    cached_count = 0
    for p in products:
        slug = _slug(p.get("url") or "")
        if slug and _cache_path(slug).exists():
            cached_count += 1
        else:
            todo.append(p)
    if args.limit:
        todo = todo[: args.limit]
    print(f"products: {len(products)} | cached: {cached_count} | to fetch: {len(todo)} "
          f"| workers: {args.workers}")

    chain = PROMPT | get_llm() | StrOutputParser()
    fetched = 0
    failed = 0
    with ThreadPoolExecutor(max_workers=args.workers) as ex:
        futs = {ex.submit(_process_one, chain, p): p for p in todo}
        bar = tqdm(as_completed(futs), total=len(futs), desc="extract")
        for fut in bar:
            slug, raw_or_err, parsed = fut.result()
            if not slug:
                tqdm.write(f" FAIL (no slug): {futs[fut].get('url', '')}")
                failed += 1
            elif parsed is None:
                tqdm.write(f" FAIL {slug}: {raw_or_err[:80]}")
                failed += 1
            else:
                # Cache writes happen only on this (main) thread.
                _write_cache(slug, parsed)
                fetched += 1
            bar.set_postfix(fetched=fetched, failed=failed)

    extracted = cached_count + fetched
    print(f"\nfetched: {fetched} | from cache: {cached_count} | failed: {failed}")
    print(f"merging {extracted}/{len(products)} into {OUT}")
    # Merge cache into single output file
    _merge_into_output(products)
    print(f"wrote {OUT}")
if __name__ == "__main__":
    # Script entry point. main() returns None, so this exits with status 0
    # unless main() raises.
    raise SystemExit(main())