# Semantic_Search_CVs / semantic_search.py
# rbbist's picture
# Update semantic_search.py
# 422f3cc verified
import os
import tempfile
import chromadb
from chromadb.utils import embedding_functions
from sentence_transformers import SentenceTransformer
import numpy as np
from typing import List, Dict, Tuple
import logging
import re
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
import requests
import pdfplumber
from io import BytesIO
from pdf2image import convert_from_bytes
import pytesseract
import PyPDF2
# Set up logging: configure the root logger at INFO level as a side effect of
# importing this module, then create the module-level logger that every
# method below writes to.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class CVSemanticSearch:
    """Semantic search over CV PDFs backed by a ChromaDB collection.

    CVs are listed in and downloaded from a Google Drive folder, converted to
    text (pdfplumber, then OCR, then PyPDF2), cleaned, split into overlapping
    chunks and stored in a ChromaDB collection. A job description is then
    matched against the stored chunks and the hits are aggregated per CV file.
    """

    # Characters removed by preprocess_text: everything except word
    # characters, whitespace and the punctuation . , ; : - ( )
    _DISALLOWED_CHARS_RE = re.compile(r'[^\w\s\.\,\;\:\-\(\)]')
    _WHITESPACE_RE = re.compile(r'\s+')
    # Upper bound (seconds) on a single Drive download so a stalled
    # connection cannot block the whole loading run.
    _DOWNLOAD_TIMEOUT_SECONDS = 60
    # Maximum length of the "best_match_text" preview returned by search_cvs.
    _SNIPPET_LENGTH = 200

    def __init__(self, model_name: str = "sentence-transformers/all-MiniLM-L6-v2"):
        """
        Initialize the CV Semantic Search system
        Args:
            model_name: Name of the sentence transformer model to use
        """
        self.model_name = model_name
        # Kept as a public attribute; the collection below embeds documents
        # through its own embedding function built from the same model name.
        self.model = SentenceTransformer(model_name)
        # Filled by load_cvs_from_google_drive; defined here so the attribute
        # exists even before any CVs have been loaded.
        self.file_mapping: Dict[str, Dict] = {}
        # Initialize ChromaDB client (in-memory)
        self.chroma_client = chromadb.Client()
        self.collection_name = "cv_collection"
        sentence_transformer_ef = embedding_functions.SentenceTransformerEmbeddingFunction(
            model_name=model_name
        )
        # get_or_create_collection avoids the error create_collection raises
        # when a collection with this name already exists, e.g. when a second
        # instance is built in the same process.
        # NOTE(review): whether in-memory clients share state depends on the
        # installed chromadb version - confirm.
        self.collection = self.chroma_client.get_or_create_collection(
            name=self.collection_name,
            embedding_function=sentence_transformer_ef
        )
        logger.info(f"Collection ready: {self.collection_name}")

    @staticmethod
    def _extract_with_pdfplumber(pdf_bytes: bytes) -> str:
        """Return the text pdfplumber finds on each page, one page per line block."""
        pages = []
        with pdfplumber.open(BytesIO(pdf_bytes)) as pdf:
            for page in pdf.pages:
                page_text = page.extract_text()
                if page_text:
                    pages.append(page_text)
        return "".join(f"{page_text}\n" for page_text in pages)

    @staticmethod
    def _extract_with_ocr(pdf_bytes: bytes) -> str:
        """Render every page to an image and return the text pytesseract reads."""
        images = convert_from_bytes(pdf_bytes)
        return "".join(f"{pytesseract.image_to_string(img)}\n" for img in images)

    @staticmethod
    def _extract_with_pypdf2(pdf_bytes: bytes) -> str:
        """Return the text PyPDF2 extracts from each page, newline-terminated."""
        reader = PyPDF2.PdfReader(BytesIO(pdf_bytes))
        # extract_text() may yield nothing for a page; treat that as empty
        # rather than failing the whole document.
        return "".join(f"{page.extract_text() or ''}\n" for page in reader.pages)

    def _snippet(self, doc: str) -> str:
        """Return doc truncated to the preview length, with "..." when cut."""
        if len(doc) > self._SNIPPET_LENGTH:
            return doc[:self._SNIPPET_LENGTH] + "..."
        return doc

    def list_all_pdfs_in_folder(self, folder_id: str, api_key: str) -> List[Dict]:
        """
        List all PDF files in a Google Drive folder using Drive API
        Args:
            folder_id: Google Drive folder ID
            api_key: Google Drive API key
        Returns:
            List of file dictionaries with id, name, webViewLink; an empty
            list when the API call fails (the error is logged).
        """
        try:
            service = build("drive", "v3", developerKey=api_key)
            # Escape backslashes and single quotes so a malformed folder ID
            # cannot change the meaning of the search query.
            safe_folder_id = folder_id.replace("\\", "\\\\").replace("'", "\\'")
            query = f"'{safe_folder_id}' in parents and mimeType='application/pdf'"
            all_files: List[Dict] = []
            page_token = None
            # Follow nextPageToken until the API stops returning one.
            while True:
                results = (
                    service.files()
                    .list(
                        q=query,
                        fields="nextPageToken, files(id, name, webViewLink)",
                        pageSize=100,
                        pageToken=page_token
                    )
                    .execute()
                )
                all_files.extend(results.get("files", []))
                page_token = results.get("nextPageToken")
                if not page_token:
                    break
            logger.info(f"Found {len(all_files)} PDF files in Google Drive folder")
            return all_files
        except HttpError as error:
            logger.error(f"Google Drive API error: {error}")
            return []
        except Exception as e:
            logger.error(f"Error listing PDFs from folder: {str(e)}")
            return []

    def extract_text_from_drive_pdf(self, file_id: str, filename: str) -> str:
        """
        Download and extract text from a Google Drive PDF with OCR fallback

        Extraction is attempted with pdfplumber, then OCR, then PyPDF2; the
        first stage that yields non-blank text wins.
        Args:
            file_id: Google Drive file ID
            filename: Name of the PDF file
        Returns:
            Extracted text with surrounding whitespace removed, or "" when
            the download or every extraction stage fails.
        """
        try:
            logger.info(f"Downloading and reading PDF: {filename}")
            # Passing the ID through params lets requests URL-encode it.
            response = requests.get(
                "https://drive.google.com/uc",
                params={"export": "download", "id": file_id},
                timeout=self._DOWNLOAD_TIMEOUT_SECONDS,
            )
            if response.status_code != 200:
                logger.error(f"Failed to download {filename}: HTTP {response.status_code}")
                return ""
            pdf_bytes = response.content

            # First try with pdfplumber
            try:
                text = self._extract_with_pdfplumber(pdf_bytes).strip()
                if text:
                    logger.info(f"Successfully extracted text using pdfplumber from {filename}")
                    return text
            except Exception as e:
                logger.warning(f"pdfplumber failed for {filename}: {e}")

            # No embedded text found: try OCR
            logger.info(f"Falling back to OCR for {filename}")
            try:
                text = self._extract_with_ocr(pdf_bytes).strip()
                if text:
                    logger.info(f"Successfully extracted text using OCR from {filename}")
                    return text
            except Exception as ocr_error:
                logger.error(f"OCR also failed for {filename}: {ocr_error}")

            # Final fallback to PyPDF2
            logger.info(f"Trying PyPDF2 as final fallback for {filename}")
            try:
                return self._extract_with_pypdf2(pdf_bytes).strip()
            except Exception as pypdf_error:
                logger.error(f"PyPDF2 also failed for {filename}: {pypdf_error}")
                return ""
        except Exception as e:
            logger.error(f"Error extracting text from {filename} (ID: {file_id}): {str(e)}")
            return ""

    def extract_text_from_pdf_bytes(self, pdf_bytes: bytes) -> str:
        """
        Extract text from PDF bytes (for uploaded JD PDFs)
        Args:
            pdf_bytes: PDF file content as bytes
        Returns:
            Extracted text as string, or "" when both pdfplumber and PyPDF2
            fail (the errors are logged).
        """
        # First try pdfplumber; a failure here must not skip the fallback.
        try:
            text = self._extract_with_pdfplumber(pdf_bytes).strip()
            if text:
                return text
        except Exception as e:
            logger.warning(f"pdfplumber failed for PDF bytes: {e}")
        # Fallback to PyPDF2
        try:
            return self._extract_with_pypdf2(pdf_bytes).strip()
        except Exception as e:
            logger.error(f"Error extracting text from PDF bytes: {str(e)}")
            return ""

    def preprocess_text(self, text: str) -> str:
        """
        Clean and preprocess extracted text
        Args:
            text: Raw text from PDF
        Returns:
            Lower-cased text containing only word characters, single spaces
            and the punctuation . , ; : - ( )
        """
        # Drop special characters first so that the whitespace pass below
        # also collapses the gaps that removed symbols leave behind.
        text = self._DISALLOWED_CHARS_RE.sub('', text)
        text = self._WHITESPACE_RE.sub(' ', text)
        return text.lower().strip()

    def chunk_text(self, text: str, chunk_size: int = 500, overlap: int = 50) -> List[str]:
        """
        Split text into overlapping chunks for better semantic search
        Args:
            text: Input text
            chunk_size: Maximum characters per chunk
            overlap: Number of characters to overlap between chunks; values
                outside 0..chunk_size-1 are clamped into that range
        Returns:
            List of text chunks
        Raises:
            ValueError: If chunk_size is not positive
        """
        if chunk_size <= 0:
            raise ValueError("chunk_size must be a positive integer")
        # An overlap of chunk_size or more would never advance the window.
        overlap = max(0, min(overlap, chunk_size - 1))
        text_length = len(text)
        if text_length <= chunk_size:
            return [text]
        chunks = []
        start = 0
        while start < text_length:
            end = min(start + chunk_size, text_length)
            if end < text_length:
                # Prefer to cut at the last space, but only when the next
                # window (end - overlap) would still start after this one;
                # otherwise a long unbroken token stalls the loop.
                last_space = text.rfind(' ', start, end)
                if last_space > start + overlap:
                    end = last_space
            chunk = text[start:end].strip()
            if chunk:
                chunks.append(chunk)
            if end >= text_length:
                # The tail is covered; a further window would only repeat it.
                break
            start = end - overlap
        return chunks

    def add_cv_text_to_database(self, text: str, filename: str) -> bool:
        """
        Add a CV text to the vector database
        Args:
            text: CV text content
            filename: Name of the CV file
        Returns:
            True if successful, False otherwise
        """
        try:
            if not text:
                logger.error(f"No text provided for {filename}")
                return False
            clean_text = self.preprocess_text(text)
            if not clean_text:
                logger.error(f"No text after preprocessing for {filename}")
                return False
            chunks = self.chunk_text(clean_text)
            if not chunks:
                logger.error(f"No text after preprocessing for {filename}")
                return False
            total_chunks = len(chunks)
            # One batched add instead of one call per chunk.
            self.collection.add(
                documents=chunks,
                ids=[f"{filename}_chunk_{i}" for i in range(total_chunks)],
                metadatas=[
                    {
                        "filename": filename,
                        "chunk_index": i,
                        "total_chunks": total_chunks
                    }
                    for i in range(total_chunks)
                ]
            )
            logger.info(f"Successfully added {filename} with {total_chunks} chunks to database")
            return True
        except Exception as e:
            logger.error(f"Error adding CV {filename}: {str(e)}")
            return False

    def load_cvs_from_google_drive(self, folder_id: str, api_key: str) -> Tuple[int, int, Dict]:
        """
        Load all CVs from a Google Drive folder into the database
        Args:
            folder_id: Google Drive folder ID
            api_key: Google Drive API key
        Returns:
            Tuple of (successful_uploads, total_files, file_mapping), where
            file_mapping maps each filename to its id, name and webViewLink
        """
        logger.info("Starting to load CVs from Google Drive...")
        pdf_files = self.list_all_pdfs_in_folder(folder_id, api_key)
        if not pdf_files:
            logger.error("No PDF files found in Google Drive folder")
            return 0, 0, {}
        successful = 0
        total = len(pdf_files)
        file_mapping = {}  # Map filename to file info for links
        for i, file_info in enumerate(pdf_files, 1):
            file_id = file_info['id']
            filename = file_info['name']
            # Recorded for every listed file, even if extraction fails below.
            file_mapping[filename] = {
                'id': file_id,
                'name': filename,
                'webViewLink': file_info.get('webViewLink', f"https://drive.google.com/file/d/{file_id}/view")
            }
            logger.info(f"Processing CV {i}/{total}: {filename}")
            text = self.extract_text_from_drive_pdf(file_id, filename)
            if not (text and text.strip()):
                logger.error(f"No text extracted from {filename}")
                continue
            if self.add_cv_text_to_database(text, filename):
                successful += 1
            else:
                logger.error(f"Failed to add {filename} to database")
        # Store file mapping for access by search function
        self.file_mapping = file_mapping
        logger.info(f"Completed loading CVs: {successful}/{total} successful")
        return successful, total, file_mapping

    def search_cvs(self, job_description: str, top_k: int = 5) -> List[Dict]:
        """
        Search for CVs matching the job description
        Args:
            job_description: Job description text to match against
            top_k: Number of top CVs to return
        Returns:
            List of dictionaries containing CV information and scores,
            sorted by descending weighted_score; empty on error, for an
            empty query, an empty database or a non-positive top_k
        """
        try:
            clean_jd = self.preprocess_text(job_description)
            if not clean_jd:
                logger.error("Empty job description after preprocessing")
                return []
            if top_k <= 0:
                return []
            stored_chunks = self.collection.count()
            if stored_chunks == 0:
                return []
            # Fetch more chunks than CVs wanted so hits can be aggregated per
            # CV: three per requested CV, capped at 50 but never fewer than
            # top_k, and never more than the collection holds.
            n_results = min(max(min(top_k * 3, 50), top_k), stored_chunks)
            results = self.collection.query(
                query_texts=[clean_jd],
                n_results=n_results
            )
            documents = (results.get('documents') or [[]])[0]
            if not documents:
                return []
            # Aggregate results by CV filename
            cv_scores: Dict[str, Dict] = {}
            for doc, metadata, distance in zip(
                documents,
                results['metadatas'][0],
                results['distances'][0]
            ):
                filename = metadata['filename']
                # Lower distance = higher similarity; clipped at zero.
                # NOTE(review): "1 - distance" is a cosine-style conversion,
                # while the collection is created without an explicit
                # distance metric - confirm the two agree.
                similarity = max(0, 1 - distance)
                entry = cv_scores.get(filename)
                if entry is None:
                    cv_scores[filename] = {
                        'filename': filename,
                        'max_similarity': similarity,
                        'avg_similarity': similarity,
                        'chunk_count': 1,
                        'best_match_text': self._snippet(doc),
                        'total_similarity': similarity
                    }
                    continue
                entry['total_similarity'] += similarity
                entry['chunk_count'] += 1
                entry['avg_similarity'] = entry['total_similarity'] / entry['chunk_count']
                # Keep the preview of the single best-matching chunk.
                if similarity > entry['max_similarity']:
                    entry['max_similarity'] = similarity
                    entry['best_match_text'] = self._snippet(doc)
            # Rank by a blend that favours the best chunk over the average.
            cv_list = list(cv_scores.values())
            for cv in cv_list:
                cv['weighted_score'] = (cv['max_similarity'] * 0.7 + cv['avg_similarity'] * 0.3)
            cv_list.sort(key=lambda x: x['weighted_score'], reverse=True)
            return cv_list[:top_k]
        except Exception as e:
            logger.error(f"Error searching CVs: {str(e)}")
            return []

    def get_database_info(self) -> Dict:
        """
        Get information about the current database
        Returns:
            Dictionary with total_chunks, unique_cvs and cv_filenames
            (sorted); all zero/empty when the lookup fails
        """
        try:
            count = self.collection.count()
            filenames = set()
            if count > 0:
                # Only metadata is needed to collect the filenames.
                results = self.collection.get(include=["metadatas"])
                filenames = {metadata['filename'] for metadata in results['metadatas']}
            return {
                'total_chunks': count,
                'unique_cvs': len(filenames),
                'cv_filenames': sorted(filenames)
            }
        except Exception as e:
            logger.error(f"Error getting database info: {str(e)}")
            return {'total_chunks': 0, 'unique_cvs': 0, 'cv_filenames': []}