Spaces:
Sleeping
Sleeping
| """One-time per-product concept extraction. | |
| For each of the 377 SHL assessments in data/products.jsonl, ask the LLM to | |
| distill its description into a structured set of concrete concepts: | |
| { | |
| "key_concepts": [<short noun phrase>, ...], # 4–10 items, specific | |
| "primary_domain": "technical" | "behavioral" | "cognitive" | |
| | "language" | "sales" | "admin" | |
| | "leadership" | "managerial" | "other", | |
| "use_cases": [<one-line situation>, ...] # 1–3 items | |
| } | |
| This is cheap (one-shot per product) and amortizes across every future | |
| search. Cached per slug at data/concepts_cache/<slug>.json so re-runs are | |
| free, and we can resume after rate-limit interruptions. | |
| Reads provider/model from env (LLM_PROVIDER / LLM_MODEL). Defaults to | |
| whatever .env says. | |
| Usage: | |
| LLM_PROVIDER=gemini python3 -m scripts.extract_concepts # default | |
| LLM_PROVIDER=openai LLM_MODEL=gpt-5-mini python3 -m scripts.extract_concepts | |
| """ | |
| from __future__ import annotations | |
| import argparse | |
| import json | |
| import os | |
| import re | |
| import time | |
| from pathlib import Path | |
| from langchain_core.prompts import ChatPromptTemplate | |
| from langchain_core.output_parsers import StrOutputParser | |
| from tqdm import tqdm | |
| from recsys.pipeline import get_llm | |
| from recsys.tracing import callbacks | |
# Project root: the parent of the directory that contains this file.
ROOT = Path(__file__).resolve().parent.parent

# All inputs and outputs live under <ROOT>/data.
PRODUCTS = ROOT.joinpath("data", "products.jsonl")  # input catalog, one JSON object per line
CACHE_DIR = ROOT.joinpath("data", "concepts_cache")  # one <slug>.json per extracted product
OUT = ROOT.joinpath("data", "products_with_concepts.jsonl")  # merged catalog + concepts
# System instructions for the extraction call. The doubled braces ("{{" / "}}")
# are presumably escapes that render as literal braces under the prompt
# template's formatting — confirm against the ChatPromptTemplate docs.
_SYSTEM_MESSAGE = (
    "You are an SHL assessment cataloger. Given the name, test type, and "
    "description of one assessment, extract the concrete skills, tools, "
    "and topics the test actually measures.\n\n"
    "Output STRICT JSON, no prose, no code fences:\n"
    "{{\n"
    ' "key_concepts": [<short noun phrase>, ...],\n'
    ' "primary_domain": "<technical|behavioral|cognitive|language|sales|admin|leadership|managerial|other>",\n'
    ' "use_cases": [<one-line situation when this test is appropriate>, ...]\n'
    "}}\n\n"
    "Rules:\n"
    " • 4–10 key_concepts. Be specific: 'Java OOP', not 'programming'.\n"
    " • Drop legal / compliance / advertising boilerplate.\n"
    " • Strip duplicates and near-synonyms.\n"
    " • use_cases: 1–3 items, each describing a concrete recruiter need."
)

# Per-product message; {name}, {test_type} and {description} are the template
# variables supplied with each chain invocation.
_HUMAN_MESSAGE = (
    "Name: {name}\n"
    "Test type: {test_type}\n"
    "Description:\n{description}\n\n"
    "Return the JSON now."
)

# Two-message chat prompt: fixed system instructions + per-product human turn.
PROMPT = ChatPromptTemplate.from_messages(
    [
        ("system", _SYSTEM_MESSAGE),
        ("human", _HUMAN_MESSAGE),
    ]
)
# Greedy match with "." also matching newlines: captures everything from the
# first "{" to the last "}" in the searched text.
_JSON_RE = re.compile(r"\{.*\}", flags=re.DOTALL)
def _slug(url: str) -> str:
    """Return the lower-cased catalog slug found in ``url``, or ``""`` if none.

    The slug is the path segment following ``/product-catalog/view/`` (matched
    case-insensitively), ending at the next ``/``, ``?`` or ``#``.
    """
    match = re.search(r"/product-catalog/view/([^/?#]+)/?", url, re.I)
    if match is None:
        return ""
    return match.group(1).lower()
def _cache_path(slug: str) -> Path:
    """Return the cache file location for ``slug``: ``CACHE_DIR/<slug>.json``."""
    filename = f"{slug}.json"
    return CACHE_DIR / filename
def _normalize_concepts(obj: dict) -> dict | None:
    """Coerce one decoded JSON object into the cached concept schema.

    Returns ``None`` when ``obj`` has no non-empty ``key_concepts`` list.
    """

    def _clean(value, limit: int) -> list[str]:
        # Non-list values are treated as missing; items are stringified,
        # stripped, and blank entries dropped before applying the cap.
        if not isinstance(value, list):
            return []
        stripped = (str(item).strip() for item in value)
        return [s for s in stripped if s][:limit]

    key_concepts = _clean(obj.get("key_concepts"), 12)
    if not key_concepts:
        return None

    domain = obj.get("primary_domain")
    domain = domain.strip().lower() if isinstance(domain, str) else ""

    return {
        "key_concepts": key_concepts,
        # A missing, non-string, or blank domain falls back to "other".
        "primary_domain": domain or "other",
        "use_cases": _clean(obj.get("use_cases"), 3),
    }


def _parse(raw: str) -> dict | None:
    """Extract the concept payload from a raw LLM reply.

    Scans ``raw`` for JSON objects and returns the first one that carries a
    non-empty ``key_concepts`` list, normalized to::

        {"key_concepts": [str, ...],   # at most 12, stripped, no blanks
         "primary_domain": str,        # lower-cased; "other" if absent/blank
         "use_cases": [str, ...]}      # at most 3, stripped, no blanks

    Returns ``None`` when ``raw`` is empty/``None`` or contains no such object.

    Each "{" is tried as a possible object start with ``raw_decode``, which
    stops at the end of the object. Unlike a greedy first-"{"-to-last-"}"
    match, this tolerates code fences, leading prose with braces, and trailing
    text that happens to contain "}".
    """
    text = raw or ""
    decoder = json.JSONDecoder()

    start = text.find("{")
    while start != -1:
        try:
            # Decoding starts at "{", so a successful result is always a dict.
            candidate, _end = decoder.raw_decode(text, start)
        except json.JSONDecodeError:
            candidate = None
        if isinstance(candidate, dict):
            normalized = _normalize_concepts(candidate)
            if normalized is not None:
                return normalized
        start = text.find("{", start + 1)
    return None
def _process_one(chain, p: dict) -> tuple[str, str, dict | None]:
    """Send a single product through the extraction chain.

    Returns a ``(slug, raw_or_err, parsed)`` triple:

    * ``("", "no-slug", None)`` when the product URL yields no slug (the
      chain is not invoked);
    * ``(slug, "<ExceptionName>: <first 120 chars of message>", None)`` when
      building the request or invoking the chain raises;
    * ``(slug, raw, parsed)`` otherwise, where ``parsed`` is ``_parse(raw)``
      and may itself be ``None`` if the reply could not be parsed.
    """
    product_slug = _slug(p["url"])
    if not product_slug:
        return ("", "no-slug", None)

    try:
        # Built inside the try so malformed product fields are reported as a
        # per-product failure rather than raised to the caller.
        payload = {
            "name": p.get("name", ""),
            "test_type": ", ".join(p.get("test_type") or []),
            # Only the first 2000 characters of the description are sent.
            "description": (p.get("description") or "")[:2000],
        }
        run_config = {"callbacks": callbacks()}
        raw = chain.invoke(payload, config=run_config)
    except Exception as e:
        error_text = f"{type(e).__name__}: {str(e)[:120]}"
        return (product_slug, error_text, None)

    return (product_slug, raw, _parse(raw))
def main() -> None:
    """CLI entry point: extract concepts for uncached products, then merge.

    Steps:
      1. Parse ``--workers`` (parallel LLM calls) and ``--limit`` (max number
         of products to fetch this run; 0 = all).
      2. Load every product from ``PRODUCTS`` and split them into those that
         already have a cache file and those still to fetch.
      3. Run the to-fetch products through the LLM chain on a thread pool,
         writing one ``<slug>.json`` cache file per successful extraction.
      4. Write ``OUT``: each product that has a slug, merged with its cached
         concepts when a readable cache file exists. Products without a slug
         are omitted from ``OUT``.
    """
    from concurrent.futures import ThreadPoolExecutor, as_completed

    ap = argparse.ArgumentParser()
    ap.add_argument("--workers", type=int, default=20,
                    help="parallel LLM calls (OpenAI handles this fine; lower for free Gemini)")
    ap.add_argument("--limit", type=int, default=0,
                    help="stop after N products (0 = all)")
    args = ap.parse_args()
    # ThreadPoolExecutor rejects max_workers <= 0, so clamp instead of crashing.
    workers = max(1, args.workers)

    CACHE_DIR.mkdir(parents=True, exist_ok=True)
    products = [
        json.loads(line)
        for line in PRODUCTS.read_text(encoding="utf-8").splitlines()
        if line.strip()
    ]

    # Split into already-cached vs to-fetch
    todo: list[dict] = []
    cached_count = 0
    for p in products:
        slug = _slug(p.get("url") or "")
        if slug and _cache_path(slug).exists():
            cached_count += 1
        else:
            todo.append(p)
    # Only a positive limit truncates; a negative value would otherwise slice
    # from the end of the list.
    if args.limit > 0:
        todo = todo[: args.limit]

    print(f"products: {len(products)} | cached: {cached_count} | to fetch: {len(todo)} "
          f"| workers: {workers}")

    fetched = 0
    failed = 0
    # Build the LLM client only when there is work, so a fully cached re-run
    # needs no provider credentials.
    if todo:
        chain = PROMPT | get_llm() | StrOutputParser()
        with ThreadPoolExecutor(max_workers=workers) as ex:
            futs = {ex.submit(_process_one, chain, p): p for p in todo}
            bar = tqdm(as_completed(futs), total=len(futs), desc="extract")
            for fut in bar:
                try:
                    slug, raw_or_err, parsed = fut.result()
                except Exception as e:
                    # A worker raised outside its own handler: record it as a
                    # failure instead of aborting the remaining products.
                    name = futs[fut].get("name", "?")
                    tqdm.write(f" FAIL {name}: {type(e).__name__}: {str(e)[:80]}")
                    failed += 1
                    bar.set_postfix(fetched=fetched, failed=failed)
                    continue
                if not slug:
                    failed += 1
                elif parsed is None:
                    tqdm.write(f" FAIL {slug}: {raw_or_err[:80]}")
                    failed += 1
                else:
                    # Write to a temp file and rename, so an interrupted run
                    # never leaves a truncated cache file that would count as
                    # "cached" on the next run. UTF-8 is explicit because the
                    # payload is dumped with ensure_ascii=False and is read
                    # back as UTF-8 during the merge.
                    target = _cache_path(slug)
                    tmp = target.with_name(target.name + ".tmp")
                    tmp.write_text(
                        json.dumps(parsed, ensure_ascii=False, indent=2),
                        encoding="utf-8",
                    )
                    tmp.replace(target)
                    fetched += 1
                bar.set_postfix(fetched=fetched, failed=failed)

    extracted = cached_count + fetched
    print(f"\nfetched: {fetched} | from cache: {cached_count} | failed: {failed}")
    print(f"merging {extracted}/{len(products)} into {OUT}")

    # Merge cache into single output file
    with OUT.open("w", encoding="utf-8") as f:
        for p in products:
            slug = _slug(p.get("url") or "")
            if not slug:
                continue
            cache = _cache_path(slug)
            if cache.exists():
                # Best effort: an unreadable or malformed cache file is
                # reported and the product is written without concepts.
                try:
                    extra = json.loads(cache.read_text(encoding="utf-8"))
                except (OSError, ValueError) as e:
                    print(f"warning: ignoring unreadable cache {cache.name}: {type(e).__name__}")
                else:
                    if isinstance(extra, dict):
                        p = {**p, **extra}
                    else:
                        print(f"warning: ignoring non-object cache {cache.name}")
            f.write(json.dumps(p, ensure_ascii=False) + "\n")
    print(f"wrote {OUT}")


# Run the extraction only when executed as a script / with ``python -m``.
if __name__ == "__main__":
    main()