|
|
""" |
|
|
Query rewriting / understanding utilities. |
|
|
|
|
|
Goal: |
|
|
- Produce two strings per user input: |
|
|
* retrieval_query: short, keyword-heavy, good for BM25 + embedding retrievers. |
|
|
* rerank_query: full intent/constraints (close to original) for cross-encoder rerank. |
|
|
|
|
|
Design: pure rule-based (Phase 1, no LLM). Easily swappable later. |
|
|
""" |
|
|
from __future__ import annotations |
|
|
|
|
|
import re |
|
|
import json |
|
|
from dataclasses import dataclass, asdict |
|
|
from typing import List, Optional, Dict, Set |
|
|
from collections import Counter |
|
|
|
|
|
# Optional NLTK support. When NLTK (or its data) is unavailable, tokenize()
# falls back to a regex tokenizer and a naive suffix-stripping stemmer.
try:
    from nltk.stem import PorterStemmer
    from nltk.tokenize import wordpunct_tokenize

    _NLTK_AVAILABLE = True
    _STEMMER = PorterStemmer()
except Exception:
    # Broad catch on purpose: any import/initialization failure (missing
    # package, missing corpora) downgrades to the pure-stdlib path.
    _NLTK_AVAILABLE = False
    _STEMMER = None
|
|
|
|
|
|
|
|
# Hard/technical skill keywords. Matched with word boundaries against the
# space-joined (stemmed) token stream in extract_skills(), and by membership
# tests in classify_intent() and build_catalog_vocab().
# NOTE(review): punctuation-bearing entries ("c++", "c#", ".net") may not
# survive tokenization and \b word-boundary matching intact -- confirm they
# are ever actually matched.
TECH_SKILLS = {
    "java", "python", "c++", "c#", ".net", "dotnet",
    "sql", "mysql", "postgres",
    "react", "node", "javascript", "typescript",
    "spring", "django", "flask",
    "aws", "azure", "gcp", "kubernetes", "docker",
    "html", "css", "jquery", "selenium", "angular", "vue",
    "ruby", "rails", "php", "laravel",
    "terraform", "ansible", "jenkins", "ci", "cd", "cicd",
    "ml", "ai", "data", "science",
}
|
|
|
|
|
# Soft-skill keywords matched (word-bounded) against the stemmed token join
# in extract_skills().
# Fix: the original literal listed "communication" twice; set literals
# silently drop duplicates, so the second entry was dead weight.
# NOTE(review): multi-word entries ("problem solving", "analytical thinking",
# "people management") only match if the stemmed token stream still contains
# the exact phrase -- verify they ever fire after stemming.
SOFT_SKILLS = {
    "communication",
    "collaboration",
    "collaborate",
    "collaborative",
    "teamwork",
    "team",
    "stakeholder",
    "leadership",
    "analytical",
    "problem solving",
    "analytical thinking",
    "people management",
    "customer",
    "client",
    "negotiation",
    "presentation",
    "adaptability",
    "emotional",
    "conflict",
}
|
|
|
|
|
# Canonical job level -> trigger words searched (word-bounded) by
# parse_job_levels(). The level name itself is title-cased in the output.
JOB_LEVEL_HINTS = {
    "entry": "entry graduate junior".split(),
    "mid": "mid mid-level midlevel".split(),
    "senior": "senior sr lead".split(),
    "manager": "manager management leadership".split(),
}
|
|
|
|
|
# Single-token role words used to harvest role terms from the keyword list
# in rewrite_query() and to bucket catalog tokens in build_catalog_vocab().
ROLE_HINTS = set(
    "developer dev engineer manager analyst writer content "
    "designer consultant architect lead sales".split()
)
|
|
|
|
|
# Multi-word role titles matched as literal substrings of the lowercased
# query (order matters: earlier phrases win the limited role_terms slots).
ROLE_PHRASES = [
    line.strip()
    for line in """
        java developer
        software engineer
        software developer
        content writer
        technical writer
        product manager
        project manager
        data analyst
        data scientist
        business analyst
    """.strip().splitlines()
]
|
|
|
|
|
|
|
|
# Literal phrases found in the raw lowercased query -> canonical catalog
# terms appended to the retrieval query (see intent_canonical_terms()).
# Fix: "culture fit" appeared twice with identical values; duplicate dict
# keys are silently collapsed by Python, so the second entry was dead code.
INTENT_CANONICAL_MAP = {
    "culture fit": ["personality", "behavioral", "values", "situational judgement"],
    "cultural fit": ["personality", "behavioral", "values", "situational judgement"],
    "leadership": ["leadership", "management", "executive"],
    "coo": ["executive", "leadership", "management"],
    "chief operating officer": ["executive", "leadership", "management"],
    "content writer": ["english comprehension", "verbal ability", "writing"],
    "seo": ["verbal reasoning", "english comprehension", "writing"],
    "collaborate": ["communication", "teamwork", "interpersonal communications"],
    "collaboration": ["communication", "teamwork", "interpersonal communications"],
    "communication": ["communication", "interpersonal communications"],
    "business team": ["communication", "teamwork", "interpersonal communications"],
}
|
|
|
|
|
# Tokens removed after stemming in tokenize(), and filtered again when
# assembling the retrieval query. First group: ordinary English function
# words. Second group: hiring-domain filler with no retrieval signal.
# "compani" is intentional -- it is the Porter stem of "company"/"companies".
STOPWORDS = {
    "the", "and", "or", "for", "to", "with", "of", "a", "an", "in", "on",
    "at", "by", "is", "are", "be", "that", "this", "these", "those", "as",
    "from", "we", "our", "their", "your", "i", "you", "they", "he", "she",
    "it", "was", "were", "will", "can", "could", "should", "would", "who",
    # domain-specific filler
    "want", "hiring", "hire", "new", "role", "company", "compani", "my",
    "budget", "option", "options", "give", "some", "about", "each",
    "test", "tests",
}
|
|
|
|
|
# Abbreviation -> expansion applied per raw token in tokenize() (only when
# the token is not a known misspelling; misspelling fixes take precedence).
SYNONYMS = dict(
    [
        ("js", "javascript"),
        ("ts", "typescript"),
        ("k8s", "kubernetes"),
        ("db", "database"),
        ("ml", "machine learning"),
        ("ai", "artificial intelligence"),
        ("comm", "communication"),
    ]
)
|
|
|
|
|
# Common typo -> corrected token, applied before synonym expansion
# in tokenize().
MISSPELLINGS = dict(
    [
        ("pythn", "python"),
        ("javscript", "javascript"),
        ("dockr", "docker"),
        ("kubernets", "kubernetes"),
    ]
)
|
|
|
|
|
|
|
|
@dataclass
class DurationConstraint:
    """A parsed time-budget constraint for an assessment."""

    # "TARGET" = aim for roughly `minutes`; "MAX" = hard upper bound
    # (see parse_duration()).
    mode: str
    minutes: int
|
|
|
|
|
|
|
|
@dataclass
class ParsedConstraints:
    """Structured constraints extracted from the raw query text."""

    # Time budget, or None when the query mentions no duration.
    duration: Optional[DurationConstraint]
    # Title-cased level names ("Entry", "Senior", ...), sorted.
    job_levels: List[str]
    # Canonical language names ("English", ...) mentioned in the query.
    languages: List[str]
    # Free-form experience range such as "3-5 years", or None.
    experience: Optional[str]
    # Tri-state hints, e.g. {"remote": True|None, "adaptive": True|None};
    # None means "not mentioned" (see parse_flags()).
    flags: Dict[str, Optional[bool]]
|
|
|
|
|
|
|
|
@dataclass
class QueryRewrite:
    """Result of rewriting one user query for the retrieval pipeline."""

    # Short keyword-heavy string for BM25/embedding retrieval.
    retrieval_query: str
    # Full-intent text (close to the original) for cross-encoder reranking.
    rerank_query: str
    # One of "TECH", "BEHAVIORAL", "MIXED", "UNKNOWN".
    intent: str
    must_have_skills: List[str]
    soft_skills: List[str]
    role_terms: List[str]
    negated_skills: List[str]
    constraints: ParsedConstraints
    # Optional diagnostics from the LLM path (prompt, raw output, errors).
    llm_debug: Optional[dict] = None

    def to_dict(self):
        """Return a plain-dict view suitable for JSON serialization."""
        d = asdict(self)

        # NOTE(review): asdict() already recurses into nested dataclasses
        # and includes llm_debug, so both branches below look redundant --
        # the llm_debug one does change semantics slightly (it re-inserts
        # the original dict object instead of asdict()'s deep copy).
        # Kept as-is; confirm no caller relies on that before simplifying.
        if self.constraints and self.constraints.duration:
            d["constraints"]["duration"] = asdict(self.constraints.duration)
        if self.llm_debug is not None:
            d["llm_debug"] = self.llm_debug
        return d
|
|
|
|
|
|
|
|
def _normalize(text: str) -> str:
    """Collapse runs of whitespace into single spaces and trim the ends."""
    collapsed = re.sub(r"\s+", " ", text)
    return collapsed.strip()
|
|
|
|
|
|
|
|
def _lower(text: str) -> str:
    """Lowercase *text* and normalize its whitespace in one pass."""
    # Inlined whitespace normalization (same regex as _normalize).
    return re.sub(r"\s+", " ", text.lower()).strip()
|
|
|
|
|
|
|
|
def parse_duration(text: str) -> Optional[DurationConstraint]:
    """Parse a duration hint ("an hour", "1.5 hours", "under 40 min") from text.

    Returns a DurationConstraint, or None when no duration is mentioned.
    """
    lowered = text.lower()

    # Colloquial hour expressions first.
    if (
        "about an hour" in lowered
        or "around an hour" in lowered
        or re.search(r"\ban hour\b", lowered)
    ):
        return DurationConstraint(mode="TARGET", minutes=60)
    if "half hour" in lowered:
        return DurationConstraint(mode="TARGET", minutes=30)

    # Numeric hours (possibly fractional) -> minutes.
    hours = re.search(r"(\d+(?:\.\d+)?)\s*(hour|hours|hr|hrs)", lowered)
    if hours:
        total = int(round(float(hours.group(1)) * 60))
        return DurationConstraint(mode="TARGET", minutes=total)

    # Plain minutes; "at most"/"within"/"under"/... makes it an upper bound.
    mins = re.search(r"(\d{1,3})\s*(minute|min|minutes|mins)", lowered)
    if mins:
        value = int(mins.group(1))
        bounded = re.search(r"(at most|within|under|<=|less than)", lowered)
        return DurationConstraint(mode="MAX" if bounded else "TARGET", minutes=value)
    return None
|
|
|
|
|
|
|
|
def parse_flags(text: str) -> Dict[str, Optional[bool]]:
    """Detect remote/adaptive hints: True when mentioned, None when absent."""
    lowered = text.lower()

    def _mentioned(pattern: str) -> Optional[bool]:
        # Tri-state on purpose: only "mentioned" (True) or "unknown" (None).
        return True if re.search(pattern, lowered) else None

    return {
        "remote": _mentioned(r"\bremote\b"),
        "adaptive": _mentioned(r"\badaptive\b|\birt\b"),
    }
|
|
|
|
|
|
|
|
def parse_languages(text: str) -> List[str]:
    """Return canonical language names mentioned anywhere in *text*.

    Matching is a plain lowercase substring test, in fixed dictionary order.
    """
    known = {
        "english": "English",
        "spanish": "Spanish",
        "french": "French",
        "german": "German",
        "mandarin": "Mandarin",
    }
    lowered = text.lower()
    return [canonical for key, canonical in known.items() if key in lowered]
|
|
|
|
|
|
|
|
def parse_experience(text: str) -> Optional[str]:
    """Extract an experience range like "3-5 years" from free text.

    A single number ("5 years") becomes a small band: low .. low+2.
    "fresher"/"entry-level" style wording maps to "0-2 years".
    """
    lowered = text.lower()
    match = re.search(r"(\d{1,2})(?:\s*-\s*(\d{1,2}))?\s*(year|years|yr|yrs)", lowered)
    if match:
        low = int(match.group(1))
        high = int(match.group(2)) if match.group(2) else low + 2
        return f"{low}-{high} years"
    if any(marker in lowered for marker in ("fresher", "0 years", "entry-level")):
        return "0-2 years"
    return None
|
|
|
|
|
|
|
|
def parse_job_levels(text: str) -> List[str]:
    """Return sorted, title-cased job levels whose hint words occur in *text*."""
    lowered = text.lower()
    matched = {
        level.title()
        for level, hints in JOB_LEVEL_HINTS.items()
        for hint in hints
        if re.search(rf"\b{re.escape(hint)}\b", lowered)
    }
    return sorted(matched)
|
|
|
|
|
|
|
|
def tokenize(text: str) -> List[str]:
    """Lowercase, typo-correct, (optionally) stem, and stopword-filter text."""

    def _fallback_stem(word: str) -> str:
        # Crude suffix stripping, used only when NLTK's Porter stemmer
        # is unavailable.
        if word.endswith("ing") and len(word) > 4:
            return word[:-3]
        if word.endswith("s") and len(word) > 3:
            return word[:-1]
        return word

    lowered = text.lower()
    if _NLTK_AVAILABLE:
        raw_tokens = wordpunct_tokenize(lowered)
    else:
        raw_tokens = re.findall(r"[a-zA-Z0-9\+#\.]+", lowered)

    # Misspelling fixes take precedence over synonym expansion per token.
    corrected = [MISSPELLINGS.get(tok, SYNONYMS.get(tok, tok)) for tok in raw_tokens]

    if _NLTK_AVAILABLE and _STEMMER is not None:
        stemmed = [_STEMMER.stem(tok) for tok in corrected]
    else:
        stemmed = [_fallback_stem(tok) for tok in corrected]

    return [tok for tok in stemmed if tok and tok not in STOPWORDS]
|
|
|
|
|
|
|
|
def extract_phrases(tokens: List[str], max_phrases: int = 5) -> List[str]:
    """Return a few informative bigrams/trigrams preserved as phrases."""
    signal_terms = (
        set(TECH_SKILLS)
        | set(ROLE_HINTS)
        | set(SOFT_SKILLS)
        | {
            "graduate", "junior", "entry", "senior", "manager", "leadership",
            "culture", "values", "personality", "behavior", "behaviour",
            "sales", "marketing",
        }
    )

    def _informative(gram: List[str]) -> bool:
        # Keep only grams that contain at least one signal word, are not
        # pure stopwords, and are not dominated by tiny fragments.
        if not any(term in signal_terms for term in gram):
            return False
        if all(term in STOPWORDS for term in gram):
            return False
        if sum(len(term) <= 2 for term in gram) >= 2:
            return False
        return True

    collected: List[str] = []
    for width in (3, 2):  # prefer trigrams, then fill with bigrams
        for start in range(len(tokens) - width + 1):
            gram = tokens[start : start + width]
            if not _informative(gram):
                continue
            phrase = " ".join(gram)
            if phrase not in collected:
                collected.append(phrase)
                if len(collected) >= max_phrases:
                    return collected
    return collected[:max_phrases]
|
|
|
|
|
|
|
|
def extract_skills(tokens: List[str]) -> tuple[Set[str], Set[str], Set[str]]:
    """Extract (must-have tech skills, soft skills, negated tech skills).

    Skills are found by word-bounded regex search over the space-joined
    token stream; a tech skill directly preceded by a negation word is
    moved from *must* to *negated*.

    Fix: the return annotation was a tuple *expression* of three
    ``Set[str]`` values -- not a valid type hint. It is now a proper
    ``tuple[...]`` annotation (lazily evaluated via the module's
    ``from __future__ import annotations``).
    """
    toks_join = " ".join(tokens)
    must: Set[str] = set()
    soft: Set[str] = set()
    negated: Set[str] = set()

    # NOTE(review): \b boundaries around punctuation-bearing skills
    # ("c++", "c#", ".net") may never match -- confirm separately.
    for skill in TECH_SKILLS:
        if re.search(rf"\b{re.escape(skill)}\b", toks_join):
            must.add(skill)
    for s in SOFT_SKILLS:
        if re.search(rf"\b{re.escape(s)}\b", toks_join):
            soft.add(s)

    # "no java", "without python", ... -> excluded rather than required.
    for i, tok in enumerate(tokens):
        if tok in TECH_SKILLS and i > 0 and tokens[i - 1] in {"no", "without", "exclude", "not"}:
            negated.add(tok)
            must.discard(tok)
    return must, soft, negated
|
|
|
|
|
|
|
|
def top_keywords(tokens: List[str], k: int = 15) -> List[str]:
    """Most frequent *k* tokens; ties keep first-seen order (Counter semantics)."""
    frequencies = Counter(tokens)
    return [word for word, _count in frequencies.most_common(k)]
|
|
|
|
|
|
|
|
def classify_intent(tokens: List[str]) -> str:
    """Classify tokens as TECH / BEHAVIORAL / MIXED / UNKNOWN."""
    unique = set(tokens)
    behavioral_markers = {
        "communication", "collaboration", "teamwork", "stakeholder",
        "leadership", "personality", "values", "culture", "cultural",
        "fit", "behavioral", "behavioural",
    }
    has_tech = any(tok in TECH_SKILLS for tok in unique)
    has_behavioral = any(
        tok in behavioral_markers or tok.startswith("sales") for tok in unique
    )
    if has_tech and has_behavioral:
        return "MIXED"
    if has_tech:
        return "TECH"
    if has_behavioral:
        return "BEHAVIORAL"
    return "UNKNOWN"
|
|
|
|
|
|
|
|
# Country/region tokens stripped from queries before retrieval
# (see strip_locations()).
LOCATION_TOKENS = set(
    "china india usa uk europe us canada germany france".split()
)
|
|
|
|
|
|
|
|
def strip_locations(tokens: List[str]) -> List[str]:
    """Drop country/region tokens so locations don't pollute retrieval."""
    def _keep(tok: str) -> bool:
        return tok not in LOCATION_TOKENS

    return list(filter(_keep, tokens))
|
|
|
|
|
|
|
|
def intent_canonical_terms(text_lower: str) -> List[str]:
    """Expand intent phrases found in the query into canonical catalog terms.

    Terms are returned de-duplicated in first-seen order.
    """
    collected: List[str] = []
    for trigger, canonical in INTENT_CANONICAL_MAP.items():
        if trigger in text_lower:
            collected.extend(canonical)

    # dict.fromkeys preserves insertion order while dropping repeats.
    return list(dict.fromkeys(collected))
|
|
|
|
|
|
|
|
def build_retrieval_query(
    role_terms: List[str],
    must_skills: List[str],
    soft_skills: List[str],
    constraints: ParsedConstraints,
    extra_terms: List[str],
    phrases: Optional[List[str]] = None,
    canonical_terms: Optional[List[str]] = None,
) -> str:
    """Assemble the short keyword-heavy retrieval string.

    Parts are appended in priority order (phrases, roles, skills, boosts,
    canonical terms, constraints), then de-duplicated, stopword-filtered,
    and capped at 40 whitespace tokens.
    """
    parts = []
    if phrases:
        # Each phrase is emitted twice: underscore-joined (single-token
        # form for exact matching) and as the plain phrase.
        for p in phrases:
            p_norm = p.strip()
            if not p_norm:
                continue
            parts.append(p_norm.replace(" ", "_"))
            parts.append(p_norm)
    parts.extend(role_terms)
    parts.extend(must_skills)

    parts.extend(soft_skills)
    parts.extend(extra_terms)
    if canonical_terms:
        parts.extend(canonical_terms)
    if constraints.experience:
        parts.append(constraints.experience)
    if constraints.duration:
        # Duration always contributes "<N> minutes"; MAX adds a qualifier.
        if constraints.duration.mode == "MAX":
            parts.append("duration under")
        parts.append(f"{constraints.duration.minutes} minutes")
    if constraints.languages:
        parts.extend([l.lower() for l in constraints.languages])

    def _is_stop(tok: str) -> bool:
        # A part is "stop" when every word of it (after undoing the
        # underscore join) is a stopword.
        if not tok:
            return True
        base = tok.replace("_", " ")
        base_parts = base.split()

        return all(p in STOPWORDS for p in base_parts)

    # De-duplicate while preserving the priority order established above.
    deduped = []
    seen = set()
    for p in parts:
        if p and p not in seen and not _is_stop(p):
            deduped.append(p)
            seen.add(p)
    query = _normalize(" ".join(deduped))

    # Hard cap so the retrieval query stays short for BM25/embeddings.
    toks = query.split()
    if len(toks) > 40:
        query = " ".join(toks[:40])
    return query
|
|
|
|
|
|
|
|
def _boost_from_vocab(tokens: Set[str], vocab: Dict[str, List[str]], intent: str, max_terms: int = 5) -> List[str]:
    """Pick up to *max_terms* catalog-vocab words that also occur in *tokens*.

    Buckets are consulted in priority order: "technical" (for TECH/MIXED
    intents), "behavioral" (for BEHAVIORAL/MIXED), then "roles" (always).
    """
    if not vocab:
        return []

    buckets: List[List[str]] = []
    if intent in ("TECH", "MIXED") and "technical" in vocab:
        buckets.append(vocab["technical"])
    if intent in ("BEHAVIORAL", "MIXED") and "behavioral" in vocab:
        buckets.append(vocab["behavioral"])
    if "roles" in vocab:
        buckets.append(vocab["roles"])

    picked: List[str] = []
    for bucket in buckets:
        for word in bucket:
            if word in tokens and word not in picked:
                picked.append(word)
                if len(picked) >= max_terms:
                    return picked
    return picked[:max_terms]
|
|
|
|
|
|
|
|
def _extract_json_text(raw):
    """Best-effort extraction of a JSON payload string from an LLM response.

    Accepts a dict (response wrapper or already-parsed payload), a string,
    or None; always returns a string.
    """
    if isinstance(raw, dict):
        # Prefer cleaned output fields, then fall back to the raw generation.
        for key in ("clean_output", "output", "text", "raw_output"):
            candidate = raw.get(key)
            if isinstance(candidate, str) and candidate.strip():
                return candidate

        # The dict may already *be* the parsed rewrite payload.
        if "retrieval_query" in raw or "rerank_query" in raw:
            return json.dumps(raw)
        return ""

    if isinstance(raw, str):
        return raw
    if raw is None:
        return ""

    return str(raw)
|
|
|
|
|
def _coerce_json(s: str) -> dict:
    """Parse JSON out of possibly fenced / prose-wrapped LLM text.

    Raises ValueError on empty input; json.JSONDecodeError on bad JSON.
    """
    text = (s or "").strip()
    if not text:
        raise ValueError("LLM returned empty output")

    # Strip markdown code fences if the model wrapped its answer.
    if "```" in text:
        text = text.replace("```json", "").replace("```", "").strip()

    # Trim any prose surrounding the outermost JSON object.
    start, end = text.find("{"), text.rfind("}")
    if start != -1 and end != -1 and end > start:
        text = text[start : end + 1]

    return json.loads(text)
|
|
|
|
|
|
|
|
def _rewrite_with_llm(raw_text: str, catalog_vocab: Optional[Dict[str, List[str]]], llm_extractor) -> Optional[QueryRewrite]:
    # NOTE(review): dead stub -- a second, full definition of
    # _rewrite_with_llm appears later in this module and shadows this one
    # at import time, so this body never runs. Safe to delete in a
    # follow-up; kept byte-identical here.
    return None
|
|
|
|
|
|
|
|
|
|
|
# JSON "shape" description passed to the LLM extractor as its schema prompt.
# The string values are instructions, not real data; _rewrite_with_llm()
# treats the model echoing them back ("string", "TECH|BEHAVIORAL|...") as a
# placeholder failure and falls back to the rule-based rewrite.
LLM_SCHEMA = json.dumps(
    {
        "retrieval_query": "concise keyword-heavy string",
        "rerank_query": "full query text (keep intent/constraints)",
        "intent": "one of TECH, BEHAVIORAL, MIXED, UNKNOWN",
        "must_have_skills": ["string"],
        "soft_skills": ["string"],
        "role_terms": ["string"],
        "negated_skills": ["string"],
        "constraints": {
            "duration": {"mode": "one of MAX, TARGET, or null", "minutes": "integer or null"},
            "job_levels": ["string"],
            "languages": ["string"],
            "experience": "string or null",
            "flags": {"remote": "boolean or null", "adaptive": "boolean or null"},
        },
    },
    indent=2,
)
|
|
|
|
|
|
|
|
def _rewrite_with_llm(raw_text: str, catalog_vocab: Optional[Dict[str, List[str]]], llm_extractor) -> Optional[QueryRewrite]:
    """
    Preferred LLM rewrite using Qwen (or other local LLM). Returns None on failure so the
    caller can fall back to deterministic rewrite.

    Fixes in this revision:
    - Removed a large unreachable tail (an old NuExtract prototype) that
      followed the try/except; both branches returned, so it never ran,
      and it would have crashed if reached (its ``schema`` literal was a
      *set* of dicts, which is unhashable, and it printed ``raw_json``
      before assigning it).
    - Replaced the always-true ``"..." in globals()`` guards with direct
      calls: _extract_json_text and _coerce_json are defined
      unconditionally earlier in this module.
    - ``duration_error`` is now only attached when llm_debug exists,
      instead of risking a TypeError on a None llm_debug.
    """
    try:
        raw = llm_extractor.predict(text=raw_text, schema=LLM_SCHEMA, return_full=True)
        raw_json = _extract_json_text(raw)
        data = _coerce_json(raw_json)

        # Duration: tolerate "40", 40, or 40.0; keep parse failures for debug.
        dur = data.get("constraints", {}).get("duration") or {}
        duration_obj = None
        duration_error = None
        if dur.get("minutes") is not None:
            mode = dur.get("mode") or "TARGET"
            try:
                duration_obj = DurationConstraint(mode=mode, minutes=int(float(dur["minutes"])))
            except Exception as e:
                duration_error = f"duration_parse_error: {e}"
                duration_obj = None

        constraints = ParsedConstraints(
            duration=duration_obj,
            job_levels=data.get("constraints", {}).get("job_levels") or [],
            languages=data.get("constraints", {}).get("languages") or [],
            experience=data.get("constraints", {}).get("experience"),
            flags=data.get("constraints", {}).get("flags") or {"remote": None, "adaptive": None},
        )

        # Validate the model's intent label; fall back to the deterministic
        # classifier when it is outside the allowed set.
        intent_raw = data.get("intent")
        allowed_intents = {"TECH", "BEHAVIORAL", "MIXED", "UNKNOWN"}
        intent_final = intent_raw if intent_raw in allowed_intents else None
        if intent_final is None:
            toks_src = data.get("retrieval_query") or raw_text
            toks = tokenize(_lower(toks_src))
            intent_final = classify_intent(toks)

        retrieval_q = data.get("retrieval_query") or raw_text
        rerank_q = data.get("rerank_query") or raw_text

        # Detect the model echoing the schema back instead of answering.
        placeholder = False
        if retrieval_q.strip().lower() in {"string", ""} or rerank_q.strip().lower() in {"string", ""}:
            placeholder = True
        if intent_raw and "|" in str(intent_raw):
            placeholder = True

        rw = QueryRewrite(
            retrieval_query=retrieval_q,
            rerank_query=rerank_q,
            intent=intent_final or "UNKNOWN",
            must_have_skills=data.get("must_have_skills") or [],
            soft_skills=data.get("soft_skills") or [],
            role_terms=data.get("role_terms") or [],
            negated_skills=data.get("negated_skills") or [],
            constraints=constraints,
        )

        if isinstance(raw, dict):
            rw.llm_debug = {
                "prompt": raw.get("prompt"),
                "raw_output": raw.get("raw_output"),
                "clean_output": raw_json,
                "intent_raw": intent_raw,
                "model": getattr(llm_extractor, "model_name", "llm"),
            }
            if duration_error:
                rw.llm_debug["duration_error"] = duration_error

        if placeholder:
            # Record the failure so the caller can surface it, but signal
            # "no usable rewrite" by returning None.
            if rw.llm_debug is None:
                rw.llm_debug = {}
            rw.llm_debug["error"] = "placeholder_output"
            return None
        return rw
    except Exception:
        # Any failure (model call, JSON parsing, schema mismatch) means the
        # caller should use the deterministic rule-based rewrite instead.
        return None
|
|
|
|
|
|
|
|
def rewrite_query(raw_text: str, catalog_vocab: Optional[Dict[str, List[str]]] = None, llm_extractor=None) -> QueryRewrite:
    """Rewrite *raw_text* into retrieval + rerank forms.

    Tries the LLM path first when an extractor is supplied; otherwise (or on
    LLM failure) falls back to the deterministic rule-based pipeline:
    tokenize -> parse constraints -> classify intent -> extract skills/roles
    -> assemble the retrieval query. The rerank query is the stripped input.
    """
    catalog_vocab = catalog_vocab or {}
    raw_clean = raw_text.strip()
    low = _lower(raw_text)

    # LLM-first: accept the LLM rewrite unless it flagged an error; keep its
    # debug payload either way so failures are visible in the fallback result.
    llm_fail_debug = None
    if llm_extractor:
        llm_rw = _rewrite_with_llm(raw_text, catalog_vocab, llm_extractor)
        if llm_rw and not (llm_rw.llm_debug and llm_rw.llm_debug.get("error")):
            return llm_rw
        if llm_rw and llm_rw.llm_debug:
            llm_fail_debug = llm_rw.llm_debug

    tokens = tokenize(low)
    tokens = strip_locations(tokens)

    # Constraint parsers work on the raw text (not tokens) so that phrasing
    # like "under 40 minutes" survives stopword removal.
    duration = parse_duration(raw_text)
    flags = parse_flags(raw_text)
    languages = parse_languages(raw_text)
    experience = parse_experience(raw_text)
    job_levels = parse_job_levels(raw_text)
    constraints = ParsedConstraints(duration=duration, job_levels=job_levels, languages=languages, experience=experience, flags=flags)

    intent = classify_intent(tokens)
    must_skills, soft_sk, neg_skills = extract_skills(tokens)
    keywords = top_keywords(tokens, k=25)

    boost = _boost_from_vocab(set(tokens), catalog_vocab, intent, max_terms=5)

    # Role terms: known multi-word phrases first, then single role hints from
    # the keyword list, then (if still sparse) generic keywords -- capped at 5.
    role_terms: List[str] = []

    for phrase in ROLE_PHRASES:
        if phrase in low and phrase not in role_terms:
            role_terms.append(phrase)
    for tok in keywords:
        if tok in ROLE_HINTS and tok not in role_terms:
            role_terms.append(tok)
        if len(role_terms) >= 5:
            break
    if len(role_terms) < 3:
        for tok in keywords:
            if tok not in role_terms and tok not in STOPWORDS:
                role_terms.append(tok)
            if len(role_terms) >= 5:
                break
    phrases = extract_phrases(tokens, max_phrases=5)
    canonical_terms = intent_canonical_terms(low)
    retrieval_query = build_retrieval_query(
        role_terms,
        sorted(must_skills),
        sorted(soft_sk),
        constraints,
        boost,
        phrases=phrases,
        canonical_terms=canonical_terms,
    )
    # Reranking uses the (near-)original text so the cross-encoder sees
    # the full intent and constraints.
    rerank_query = raw_clean

    return QueryRewrite(
        retrieval_query=retrieval_query,
        rerank_query=rerank_query,
        intent=intent,
        must_have_skills=sorted(must_skills),
        soft_skills=sorted(soft_sk),
        role_terms=role_terms,
        negated_skills=sorted(neg_skills),
        constraints=constraints,
        llm_debug=llm_fail_debug,
    )
|
|
|
|
|
|
|
|
def build_catalog_vocab(catalog_texts: List[str], min_len: int = 5, max_terms: int = 200) -> Dict[str, List[str]]:
    """Lightweight vocab from catalog, bucketed by intent."""
    counts = {
        "technical": Counter(),
        "behavioral": Counter(),
        "roles": Counter(),
    }
    for text in catalog_texts:
        for tok in tokenize(text.lower()):
            # Very short tokens carry little signal for boosting.
            if len(tok) < min_len:
                continue
            if tok in TECH_SKILLS:
                counts["technical"][tok] += 1
            elif tok in SOFT_SKILLS or "behavior" in tok or "culture" in tok:
                counts["behavioral"][tok] += 1
            elif tok in ROLE_HINTS:
                counts["roles"][tok] += 1

    # Each bucket gets an equal share of the overall term budget.
    per_bucket = max_terms // 3 or 1
    return {
        bucket: [word for word, _freq in counter.most_common(per_bucket)]
        for bucket, counter in counts.items()
    }
|
|
|
|
|
|
|
|
# CLI entry point: rewrite a single query and print the result as JSON.
# Example: python query_rewrite.py --query "java developer, 40 minutes"
if __name__ == "__main__":
    import argparse
    import json
    import sys

    parser = argparse.ArgumentParser(description="Rewrite a query into retrieval + rerank forms.")
    parser.add_argument("--query", required=True, help="Raw user query/JD text")
    parser.add_argument("--catalog", help="Optional catalog JSONL to build vocab (uses doc_text or name/description)")
    args = parser.parse_args()

    vocab = {}
    if args.catalog:
        # pandas is only needed for the optional --catalog path, so the
        # import is deferred until here.
        import pandas as pd

        df = pd.read_json(args.catalog, lines=True)
        if "doc_text" in df.columns:
            texts = df["doc_text"].astype(str).tolist()
        else:
            # Fall back to concatenating name + description columns.
            texts = (df.get("name", "").astype(str) + " " + df.get("description", "").astype(str)).tolist()
        vocab = build_catalog_vocab(texts)

    rewrite = rewrite_query(args.query, vocab)
    json.dump(rewrite.to_dict(), sys.stdout, indent=2)
    sys.stdout.write("\n")
|
|
|