import json
import os
from typing import Any, Dict, List, Optional, Tuple

import faiss
import google.generativeai as genai
import numpy as np


# Gemini API configuration.
# The key is read from the environment once, at import time. Importing this
# module without GOOGLE_API_KEY set raises ValueError, so a misconfigured
# process fails immediately instead of on the first embedding request.
API_KEY = os.getenv("GOOGLE_API_KEY")
if not API_KEY:
    raise ValueError("Error: Set GOOGLE_API_KEY environment variable before running.")

genai.configure(api_key=API_KEY)


# Locations of the pre-computed embedding files (one JSON record per line).
DATA_DIR = "data"
PROFILE_EMBEDDINGS = os.path.join(DATA_DIR, "embeddings_profiles.jsonl")
JOB_EMBEDDINGS = os.path.join(DATA_DIR, "embeddings_jobs.jsonl")

# Lazily populated module-level caches. ``None`` means "not loaded yet";
# they are filled in by initialize_profile_data() / initialize_job_data().
_profile_data: Optional[List[Dict[str, Any]]] = None
_job_data: Optional[List[Dict[str, Any]]] = None
_profile_index: Optional["faiss.Index"] = None
_job_index: Optional["faiss.Index"] = None


def load_embeddings_data(file_path: str) -> List[Dict[str, Any]]:
    """
    Load embeddings data from a JSONL file.

    Each non-blank line must be a JSON object containing the keys ``id``,
    ``text`` and ``embedding``. Lines that are not valid JSON, are not JSON
    objects, or lack one of those keys are reported with a warning and
    skipped. Blank lines are skipped silently.

    Args:
        file_path: Path to the JSONL file containing embeddings

    Returns:
        List of dictionaries containing id, text, and embedding. An empty
        list is returned when the file is missing or cannot be read.
    """
    required_fields = ('id', 'text', 'embedding')
    data = []

    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            for line_num, raw_line in enumerate(f, 1):
                line = raw_line.strip()
                if not line:
                    # Blank separator / trailing newline: nothing to parse.
                    continue

                try:
                    record = json.loads(line)
                except json.JSONDecodeError as e:
                    print(f"Warning: JSON decode error in line {line_num} of {file_path}: {e}")
                    continue

                # The isinstance check matters: ``'id' in record`` is a
                # substring test when the JSON value is a string, so a
                # non-object line could otherwise slip through.
                if isinstance(record, dict) and all(key in record for key in required_fields):
                    data.append(record)
                else:
                    print(f"Warning: Missing required fields in line {line_num} of {file_path}")
    except FileNotFoundError:
        print(f"Warning: Embeddings file not found: {file_path}")
        return []
    except (OSError, UnicodeDecodeError) as e:
        # A read failure part-way through discards the partial result, so
        # callers never index a truncated data set.
        print(f"Error reading {file_path}: {e}")
        return []

    return data


def build_faiss_index(embeddings: List[List[float]]) -> Optional["faiss.Index"]:
    """
    Build a FAISS index from embeddings.

    The vectors are stored in a flat (exhaustive search) L2 index, so the
    distances returned by ``index.search`` are L2-based: smaller is closer.

    Args:
        embeddings: List of embedding vectors; all vectors must be numeric
            and have the same non-zero length

    Returns:
        FAISS index for similarity search, or None when ``embeddings`` is
        empty

    Raises:
        ValueError: If the vectors are ragged, non-numeric, empty, or do not
            form a 2-D matrix.
    """
    if not embeddings:
        return None

    try:
        # FAISS consumes C-contiguous float32 matrices.
        embedding_matrix = np.ascontiguousarray(embeddings, dtype=np.float32)
    except (ValueError, TypeError) as err:
        raise ValueError(
            "Embeddings must be numeric vectors of identical length"
        ) from err

    if embedding_matrix.ndim != 2 or embedding_matrix.shape[1] == 0:
        raise ValueError(
            "Embeddings must form a 2-D matrix of shape (n_vectors, dimension); "
            f"got shape {embedding_matrix.shape}"
        )

    dimension = embedding_matrix.shape[1]
    index = faiss.IndexFlatL2(dimension)
    index.add(embedding_matrix)
    return index


def get_query_embedding(query: str, model: str = "models/text-embedding-004") -> List[float]:
    """
    Get embedding for a query string using Gemini.

    Args:
        query: Query string to embed
        model: Embedding model to use

    Returns:
        Embedding vector as list of floats, or an empty list when the query
        is blank or the embedding request fails. An empty list (rather than
        a placeholder vector) lets callers detect the failure with a simple
        truthiness check instead of searching with a meaningless vector.
    """
    if not query or not query.strip():
        # Nothing to embed; skip the remote call entirely.
        return []

    try:
        response = genai.embed_content(
            model=model,
            content=query,
            task_type="retrieval_query",
        )
        return response['embedding']
    except Exception as e:
        # Deliberately broad: this is the boundary to a remote service and
        # any failure (network, quota, malformed response) should degrade
        # to "no embedding" rather than crash the caller.
        print(f"Error getting query embedding: {e}")
        return []


def initialize_profile_data():
    """Initialize profile data and FAISS index if not already loaded.

    Populates the module-level ``_profile_data`` and ``_profile_index``
    caches on the first call; later calls return immediately. When no
    profile records can be loaded the data cache becomes an empty list and
    the index stays ``None``.
    """
    global _profile_data, _profile_index

    if _profile_data is not None:
        # Already attempted; do not reload from disk.
        return

    print("Loading profile embeddings...")
    _profile_data = load_embeddings_data(PROFILE_EMBEDDINGS)

    if not _profile_data:
        print("No profile embeddings found")
        _profile_index = None
        return

    embeddings = [record['embedding'] for record in _profile_data]
    _profile_index = build_faiss_index(embeddings)
    print(f"Loaded {len(_profile_data)} profile embeddings")


def initialize_job_data():
    """Initialize job data and FAISS index if not already loaded.

    Populates the module-level ``_job_data`` and ``_job_index`` caches on
    the first call; later calls return immediately. When no job records can
    be loaded the data cache becomes an empty list and the index stays
    ``None``.
    """
    global _job_data, _job_index

    if _job_data is not None:
        # Already attempted; do not reload from disk.
        return

    print("Loading job embeddings...")
    _job_data = load_embeddings_data(JOB_EMBEDDINGS)

    if not _job_data:
        print("No job embeddings found")
        _job_index = None
        return

    embeddings = [record['embedding'] for record in _job_data]
    _job_index = build_faiss_index(embeddings)
    print(f"Loaded {len(_job_data)} job embeddings")


def retrieve_profiles(query: str, top_k: int = 5) -> List[Dict[str, Any]]:
    """
    Retrieve the most similar profiles based on a query.

    Args:
        query: Search query string
        top_k: Number of top results to return; values <= 0 yield no results

    Returns:
        List of dictionaries containing profile information. Each entry is
        a shallow copy of the stored record with an added
        ``similarity_score`` key holding the distance reported by the FAISS
        index (smaller means closer). An empty list is returned when no
        data is loaded, the query cannot be embedded, or the search fails.
    """
    if top_k <= 0:
        # Avoid a pointless embedding request and an invalid k for FAISS.
        return []

    initialize_profile_data()

    if not _profile_data or _profile_index is None:
        print("No profile data available for search")
        return []

    query_embedding = get_query_embedding(query)
    if not query_embedding:
        return []

    query_vector = np.array([query_embedding], dtype=np.float32)
    k = min(top_k, len(_profile_data))

    try:
        distances, indices = _profile_index.search(query_vector, k)
    except Exception as e:
        # Broad on purpose: FAISS signals problems such as a dimension
        # mismatch with assorted exception types.
        print(f"Error during profile search: {e}")
        return []

    results = []
    for distance, idx in zip(distances[0], indices[0]):
        # FAISS uses -1 as the label for "no neighbour found"; without the
        # lower bound a -1 would silently select the last record.
        if 0 <= idx < len(_profile_data):
            profile = _profile_data[int(idx)].copy()
            profile['similarity_score'] = float(distance)
            results.append(profile)

    return results


def retrieve_jobs(query: str, top_k: int = 5) -> List[Dict[str, Any]]:
    """
    Retrieve the most similar job listings based on a query.

    Args:
        query: Search query string
        top_k: Number of top results to return; values <= 0 yield no results

    Returns:
        List of dictionaries containing job information. Each entry is a
        shallow copy of the stored record with an added
        ``similarity_score`` key holding the distance reported by the FAISS
        index (smaller means closer). An empty list is returned when no
        data is loaded, the query cannot be embedded, or the search fails.
    """
    if top_k <= 0:
        # Avoid a pointless embedding request and an invalid k for FAISS.
        return []

    initialize_job_data()

    if not _job_data or _job_index is None:
        print("No job data available for search")
        return []

    query_embedding = get_query_embedding(query)
    if not query_embedding:
        return []

    query_vector = np.array([query_embedding], dtype=np.float32)
    k = min(top_k, len(_job_data))

    try:
        distances, indices = _job_index.search(query_vector, k)
    except Exception as e:
        # Broad on purpose: FAISS signals problems such as a dimension
        # mismatch with assorted exception types.
        print(f"Error during job search: {e}")
        return []

    results = []
    for distance, idx in zip(distances[0], indices[0]):
        # FAISS uses -1 as the label for "no neighbour found"; without the
        # lower bound a -1 would silently select the last record.
        if 0 <= idx < len(_job_data):
            job = _job_data[int(idx)].copy()
            job['similarity_score'] = float(distance)
            results.append(job)

    return results


def search_profiles_by_keywords(keywords: List[str], top_k: int = 5) -> List[Dict[str, Any]]:
    """
    Search profiles using keyword matching as a fallback method.

    Matching is a case-insensitive substring test against each record's
    ``text``. Blank keywords are ignored and duplicates (after trimming and
    lower-casing) count once, so the score is the number of distinct
    keywords found in the text.

    Args:
        keywords: List of keywords to search for; a single string is
            treated as one keyword
        top_k: Number of top results to return; values <= 0 yield no results

    Returns:
        List of matching profiles (shallow copies with an added
        ``keyword_score`` key), best score first. Ties keep file order.
    """
    if isinstance(keywords, str):
        # Iterating a bare string would match on individual characters.
        keywords = [keywords]

    # An empty keyword is a substring of every text, so drop blanks.
    keywords_lower = {kw.strip().lower() for kw in keywords if kw and kw.strip()}
    if top_k <= 0 or not keywords_lower:
        return []

    initialize_profile_data()

    if not _profile_data:
        return []

    results = []
    for profile in _profile_data:
        text = profile.get('text')
        if not isinstance(text, str):
            # The loader only guarantees the key exists, not its type.
            continue

        text_lower = text.lower()
        score = sum(1 for kw in keywords_lower if kw in text_lower)
        if score > 0:
            profile_copy = profile.copy()
            profile_copy['keyword_score'] = score
            results.append(profile_copy)

    results.sort(key=lambda x: x['keyword_score'], reverse=True)
    return results[:top_k]


def search_jobs_by_keywords(keywords: List[str], top_k: int = 5) -> List[Dict[str, Any]]:
    """
    Search jobs using keyword matching as a fallback method.

    Matching is a case-insensitive substring test against each record's
    ``text``. Blank keywords are ignored and duplicates (after trimming and
    lower-casing) count once, so the score is the number of distinct
    keywords found in the text.

    Args:
        keywords: List of keywords to search for; a single string is
            treated as one keyword
        top_k: Number of top results to return; values <= 0 yield no results

    Returns:
        List of matching jobs (shallow copies with an added
        ``keyword_score`` key), best score first. Ties keep file order.
    """
    if isinstance(keywords, str):
        # Iterating a bare string would match on individual characters.
        keywords = [keywords]

    # An empty keyword is a substring of every text, so drop blanks.
    keywords_lower = {kw.strip().lower() for kw in keywords if kw and kw.strip()}
    if top_k <= 0 or not keywords_lower:
        return []

    initialize_job_data()

    if not _job_data:
        return []

    results = []
    for job in _job_data:
        text = job.get('text')
        if not isinstance(text, str):
            # The loader only guarantees the key exists, not its type.
            continue

        text_lower = text.lower()
        score = sum(1 for kw in keywords_lower if kw in text_lower)
        if score > 0:
            job_copy = job.copy()
            job_copy['keyword_score'] = score
            results.append(job_copy)

    results.sort(key=lambda x: x['keyword_score'], reverse=True)
    return results[:top_k]


def get_stats() -> Dict[str, Any]:
    """Get statistics about loaded data.

    Triggers the lazy loading of both data sets, then reports what is
    available.

    Returns:
        Dictionary with the keys ``profiles_loaded`` and ``jobs_loaded``
        (record counts) and ``profile_index_ready`` / ``job_index_ready``
        (whether the corresponding FAISS index exists).
    """
    initialize_profile_data()
    initialize_job_data()

    profile_count = len(_profile_data) if _profile_data else 0
    job_count = len(_job_data) if _job_data else 0

    return {
        "profiles_loaded": profile_count,
        "jobs_loaded": job_count,
        "profile_index_ready": _profile_index is not None,
        "job_index_ready": _job_index is not None,
    }


def test_profile_search():
    """Test profile search functionality.

    Manual smoke test: runs a few sample queries through
    retrieve_profiles() and prints the first 100 characters of each hit.
    It makes no assertions.
    """
    test_queries = [
        "Python developer",
        "React frontend engineer",
        "Data scientist with machine learning",
        "Remote backend developer",
    ]

    print("Testing profile search...")
    for query in test_queries:
        print(f"\nQuery: '{query}'")
        results = retrieve_profiles(query, top_k=3)
        for i, result in enumerate(results, 1):
            print(f" {i}. {result['text'][:100]}...")


def test_job_search():
    """Test job search functionality.

    Manual smoke test: runs a few sample queries through retrieve_jobs()
    and prints the first 100 characters of each hit. It makes no
    assertions.
    """
    test_queries = [
        "Remote React developer",
        "Python backend engineer",
        "Data science position",
        "Full stack developer",
    ]

    print("Testing job search...")
    for query in test_queries:
        print(f"\nQuery: '{query}'")
        results = retrieve_jobs(query, top_k=3)
        for i, result in enumerate(results, 1):
            print(f" {i}. {result['text'][:100]}...")


if __name__ == "__main__":
    # Script entry point: print what was loaded, then run the manual smoke
    # tests for whichever data sets are actually available.
    stats = get_stats()

    print("RAG Utils Statistics:")
    print(f" Profiles loaded: {stats['profiles_loaded']}")
    print(f" Jobs loaded: {stats['jobs_loaded']}")
    print(f" Profile index ready: {stats['profile_index_ready']}")
    print(f" Job index ready: {stats['job_index_ready']}")

    if stats['profiles_loaded'] > 0:
        test_profile_search()

    if stats['jobs_loaded'] > 0:
        test_job_search()