shlproj / vecdb.py
levanell's picture
Upload 5 files
b3d9840 verified
"""
vecdb.py — 4-layer hybrid retrieval with scored ranking.
Changes from original:
1. FAISS k raised from 50 → 150 (catalog ~377; old k missed items that drift semantically)
2. Lexical search now hits BOTH name AND description columns
3. New SQL layers for job_level and test_category that hard-filter the DB
4. All candidates scored before the context string is built so the synthesizer
sees a ranked top-N, not a flat dump of 50+ items
5. Context string now includes scores and job_levels to help the synthesizer choose
"""
import json
import sqlite3
from langchain_community.vectorstores import FAISS
from langchain_huggingface import HuggingFaceEmbeddings
EMBEDDING_MODEL = "all-MiniLM-L6-v2"
# ==========================================
# PART 1: Build Databases (Run Once)
# ==========================================
def build_databases(json_filepath="dataset.json"):
with open(json_filepath, 'r') as f:
data = json.load(f)
conn = sqlite3.connect("shl_catalog.db")
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS assessments (
entity_id TEXT PRIMARY KEY,
name TEXT,
link TEXT,
description TEXT,
job_levels TEXT,
keys TEXT
)
""")
texts = []
metadatas = []
str_ids = []
for item in data:
e_id = str(item["entity_id"])
cursor.execute("""
INSERT OR REPLACE INTO assessments
(entity_id, name, link, description, job_levels, keys)
VALUES (?, ?, ?, ?, ?, ?)
""", (
e_id,
item['name'],
item['link'],
item['description'],
",".join(item.get('job_levels', [])),
",".join(item.get('keys', []))
))
# Embed name + description + job_levels + keys so FAISS sees all metadata
embed_text = (
f"Name: {item['name']}\n"
f"Description: {item['description']}\n"
f"Job Levels: {','.join(item.get('job_levels', []))}\n"
f"Category: {','.join(item.get('keys', []))}"
)
texts.append(embed_text)
metadatas.append({
"entity_id": e_id,
"name": item["name"],
"link": item["link"],
"test_type": item.get("keys", [""])[0] if item.get("keys") else "",
"job_levels": ",".join(item.get('job_levels', [])),
})
str_ids.append(e_id)
conn.commit()
conn.close()
embeddings = HuggingFaceEmbeddings(model_name=EMBEDDING_MODEL)
vector_store = FAISS.from_texts(
texts=texts,
embedding=embeddings,
metadatas=metadatas,
ids=str_ids
)
vector_store.save_local("faiss_index")
print("Databases built successfully!")
# ==========================================
# PART 2: Scored Candidate Accumulator
# ==========================================
def _accumulate(candidates: dict, name: str, row: tuple, score_delta: float):
"""
candidates is a dict keyed by assessment name.
Each value: {"name", "link", "test_type", "job_levels", "score"}
Score is additive — the same item can be found by multiple layers.
"""
if name not in candidates:
candidates[name] = {
"name": row[0],
"link": row[2],
"test_type": row[3],
"job_levels": row[4] if len(row) > 4 else "",
"score": 0.0,
}
candidates[name]["score"] += score_delta
# ==========================================
# PART 3: The Search Execution Engine
# ==========================================
def execute_rag_search(extracted_filters):
"""
4-layer retrieval:
Layer 1 — Keyword exact match on name (score +5 per hit)
Layer 2 — Keyword match on description (score +3 per hit)
Layer 3 — SQL filter on job_level (score +2)
Layer 4 — SQL filter on test_category (score +2)
Layer 5 — FAISS semantic search k=150 (score = 1 / (rank+1), max +1)
Mandatory flagships injected at the end if not already present.
"""
conn = sqlite3.connect("shl_catalog.db")
cursor = conn.cursor()
candidates: dict = {} # name → candidate dict
# ------------------------------------------------------------------
# Layer 1 & 2: Keyword lexical search (name + description)
# ------------------------------------------------------------------
if extracted_filters.exact_keywords:
for kw in extracted_filters.exact_keywords:
# Name match — high signal
cursor.execute("""
SELECT name, description, link, keys, job_levels
FROM assessments
WHERE name LIKE ?
""", (f"%{kw}%",))
for row in cursor.fetchall():
_accumulate(candidates, row[0], row, score_delta=5.0)
# Description match — medium signal (catches items where
# keyword only appears in the body, e.g. "Linux Programming (General)")
cursor.execute("""
SELECT name, description, link, keys, job_levels
FROM assessments
WHERE name NOT LIKE ?
AND description LIKE ?
""", (f"%{kw}%", f"%{kw}%"))
for row in cursor.fetchall():
_accumulate(candidates, row[0], row, score_delta=3.0)
# ------------------------------------------------------------------
# Layer 3: Job-level SQL filter
# Previously this was only appended to the FAISS query string, so it
# never enforced hard SQL filtering. Now it pulls real DB matches.
# ------------------------------------------------------------------
if extracted_filters.job_level:
cursor.execute("""
SELECT name, description, link, keys, job_levels
FROM assessments
WHERE job_levels LIKE ?
""", (f"%{extracted_filters.job_level}%",))
for row in cursor.fetchall():
_accumulate(candidates, row[0], row, score_delta=2.0)
# ------------------------------------------------------------------
# Layer 4: Test-category SQL filter
# "keys" column stores the category (e.g. "Knowledge & Skills")
# ------------------------------------------------------------------
if extracted_filters.test_category:
cursor.execute("""
SELECT name, description, link, keys, job_levels
FROM assessments
WHERE keys LIKE ?
""", (f"%{extracted_filters.test_category}%",))
for row in cursor.fetchall():
_accumulate(candidates, row[0], row, score_delta=2.0)
conn.close()
# ------------------------------------------------------------------
# Layer 5: FAISS semantic search
# k=150 (~40% of catalog) vs old k=50 (~13%).
# Items that drift semantically (e.g. "Global Skills Assessment" for
# a broad skills query) now have room to surface.
# ------------------------------------------------------------------
embeddings = HuggingFaceEmbeddings(model_name=EMBEDDING_MODEL)
vector_store = FAISS.load_local(
"faiss_index", embeddings, allow_dangerous_deserialization=True
)
# Build a rich composite query
query_parts = [extracted_filters.semantic_query]
if extracted_filters.job_level:
query_parts.append(f"Job Level: {extracted_filters.job_level}")
if extracted_filters.test_category:
query_parts.append(f"Category: {extracted_filters.test_category}")
if extracted_filters.exact_keywords:
query_parts.append("Tools: " + ", ".join(extracted_filters.exact_keywords))
rich_query = " | ".join(query_parts)
faiss_results = vector_store.similarity_search(query=rich_query, k=150)
for rank, res in enumerate(faiss_results):
name = res.metadata["name"]
# Reciprocal rank score: rank 0 → +1.0, rank 149 → +0.0067
rr_score = 1.0 / (rank + 1)
# Build a stub row compatible with _accumulate
stub = (
name,
"", # description not needed after retrieval
res.metadata["link"],
res.metadata.get("test_type", ""),
res.metadata.get("job_levels", ""),
)
_accumulate(candidates, name, stub, score_delta=rr_score)
# ------------------------------------------------------------------
# Flagship injection (OPQ32r + Verify G+)
# Always included; given a modest baseline score so they don't
# outrank genuinely specific matches.
# ------------------------------------------------------------------
conn = sqlite3.connect("shl_catalog.db")
cursor = conn.cursor()
cursor.execute("""
SELECT name, description, link, keys, job_levels
FROM assessments
WHERE name = 'Occupational Personality Questionnaire OPQ32r'
OR name = 'SHL Verify Interactive G+'
""")
for row in cursor.fetchall():
# Score 1.5 — present but won't outrank a perfect keyword match
_accumulate(candidates, row[0], row, score_delta=1.5)
conn.close()
if not candidates:
return "No assessments found matching those constraints."
# ------------------------------------------------------------------
# Sort by score and build context string (top 25 to keep prompt lean)
# ------------------------------------------------------------------
ranked = sorted(candidates.values(), key=lambda x: x["score"], reverse=True)
top_candidates = ranked[:25]
context_lines = ["RANKED CANDIDATE ASSESSMENTS (higher score = better match):"]
context_lines.append(
f"{'Rank':<5} {'Score':<7} {'Name':<55} {'Type':<30} {'Job Levels'}"
)
context_lines.append("-" * 130)
for i, c in enumerate(top_candidates, 1):
context_lines.append(
f"{i:<5} {c['score']:<7.2f} {c['name']:<55} {c['test_type']:<30} {c['job_levels']}"
)
# Append the URL on a sub-line so synthesizer can pick it up easily
context_lines.append(f" URL: {c['link']}")
return "\n".join(context_lines)
# ==========================================
# PART 4: Comparison Lookup (unchanged logic, minor hardening)
# ==========================================
def execute_comparison_lookup(test_names: list) -> str:
conn = sqlite3.connect("shl_catalog.db")
cursor = conn.cursor()
retrieved_text = "COMPARISON DATA RETRIEVED:\n"
found_any = False
for name in test_names:
cursor.execute("""
SELECT name, description, link, keys
FROM assessments
WHERE name LIKE ? LIMIT 1
""", (f"%{name}%",))
row = cursor.fetchone()
if row:
found_any = True
retrieved_text += (
f"- Name: {row[0]}\n"
f" Type: {row[3]}\n"
f" URL: {row[2]}\n"
f" Description: {row[1]}\n\n"
)
conn.close()
if not found_any:
return "NO DATA RETRIEVED. Could not find those specific tests in the catalog."
return retrieved_text
if __name__ == "__main__":
build_databases()