Spaces:
Sleeping
Sleeping
| import os | |
| import tempfile | |
| import chromadb | |
| from chromadb.utils import embedding_functions | |
| from sentence_transformers import SentenceTransformer | |
| import numpy as np | |
| from typing import List, Dict, Tuple | |
| import logging | |
| import re | |
| from googleapiclient.discovery import build | |
| from googleapiclient.errors import HttpError | |
| import requests | |
| import pdfplumber | |
| from io import BytesIO | |
| from pdf2image import convert_from_bytes | |
| import pytesseract | |
| import PyPDF2 | |
| # Set up logging | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
class CVSemanticSearch:
    """Semantic search over CV/resume PDFs stored in a Google Drive folder.

    CVs are downloaded and text-extracted through a three-stage fallback
    chain (pdfplumber -> OCR via pdf2image/pytesseract -> PyPDF2), cleaned,
    split into overlapping chunks, and stored in an in-memory ChromaDB
    collection using sentence-transformer embeddings.  A job description can
    then be matched against the stored chunks with search_cvs().
    """

    def __init__(self, model_name: str = "sentence-transformers/all-MiniLM-L6-v2"):
        """
        Initialize the CV Semantic Search system

        Args:
            model_name: Name of the sentence transformer model to use
        """
        self.model_name = model_name
        self.model = SentenceTransformer(model_name)
        # Initialize ChromaDB client (in-memory; data is lost on restart)
        self.chroma_client = chromadb.Client()
        # filename -> Drive file info; populated by load_cvs_from_google_drive().
        # Initialized here so the attribute always exists.
        self.file_mapping: Dict = {}
        # Create collection
        self.collection_name = "cv_collection"
        sentence_transformer_ef = embedding_functions.SentenceTransformerEmbeddingFunction(
            model_name=model_name
        )
        self.collection = self.chroma_client.create_collection(
            name=self.collection_name,
            embedding_function=sentence_transformer_ef
        )
        logger.info(f"Created new collection: {self.collection_name}")

    def list_all_pdfs_in_folder(self, folder_id: str, api_key: str) -> List[Dict]:
        """
        List all PDF files in a Google Drive folder using Drive API

        Args:
            folder_id: Google Drive folder ID
            api_key: Google Drive API key

        Returns:
            List of file dictionaries with id, name, webViewLink
            (empty list on any error)
        """
        try:
            # Initialize the Drive API client (API-key auth: public files only)
            service = build("drive", "v3", developerKey=api_key)
            all_files = []
            page_token = None
            # Only PDFs that are direct children of the folder
            query = f"'{folder_id}' in parents and mimeType='application/pdf'"
            while True:
                results = (
                    service.files()
                    .list(
                        q=query,
                        fields="nextPageToken, files(id, name, webViewLink)",
                        pageSize=100,
                        pageToken=page_token
                    )
                    .execute()
                )
                all_files.extend(results.get("files", []))
                # Keep paging until Drive stops returning a nextPageToken
                page_token = results.get("nextPageToken")
                if not page_token:
                    break
            logger.info(f"Found {len(all_files)} PDF files in Google Drive folder")
            return all_files
        except HttpError as error:
            logger.error(f"Google Drive API error: {error}")
            return []
        except Exception as e:
            logger.error(f"Error listing PDFs from folder: {str(e)}")
            return []

    def extract_text_from_drive_pdf(self, file_id: str, filename: str) -> str:
        """
        Download and extract text from a Google Drive PDF with OCR fallback

        Args:
            file_id: Google Drive file ID
            filename: Name of the PDF file (used for logging)

        Returns:
            Extracted text ("" on failure)
        """
        try:
            logger.info(f"Downloading and reading PDF: {filename}")
            url = f"https://drive.google.com/uc?export=download&id={file_id}"
            # Fixed: timeout so one unreachable file cannot hang the whole batch
            response = requests.get(url, timeout=60)
            if response.status_code != 200:
                logger.error(f"Failed to download {filename}: HTTP {response.status_code}")
                return ""
            text = ""
            # Stage 1: pdfplumber (best for digitally generated PDFs)
            try:
                with pdfplumber.open(BytesIO(response.content)) as pdf:
                    for page in pdf.pages:
                        page_text = page.extract_text()
                        if page_text:
                            text += page_text + "\n"
                if text.strip():
                    logger.info(f"Successfully extracted text using pdfplumber from {filename}")
                    return text
            except Exception as e:
                logger.warning(f"pdfplumber failed for {filename}: {e}")
            # Stage 2: no embedded text (likely a scanned PDF) -> OCR fallback
            if not text.strip():
                logger.info(f"Falling back to OCR for {filename}")
                try:
                    images = convert_from_bytes(response.content)
                    for img in images:
                        text += pytesseract.image_to_string(img) + "\n"
                    if text.strip():
                        logger.info(f"Successfully extracted text using OCR from {filename}")
                        return text
                except Exception as ocr_error:
                    logger.error(f"OCR also failed for {filename}: {ocr_error}")
            # Stage 3: final fallback to PyPDF2
            if not text.strip():
                logger.info(f"Trying PyPDF2 as final fallback for {filename}")
                try:
                    pdf_reader = PyPDF2.PdfReader(BytesIO(response.content))
                    for page in pdf_reader.pages:
                        # Fixed: extract_text() may return None for image-only
                        # pages; None + "\n" raised TypeError
                        text += (page.extract_text() or "") + "\n"
                except Exception as pypdf_error:
                    logger.error(f"PyPDF2 also failed for {filename}: {pypdf_error}")
            return text.strip()
        except Exception as e:
            logger.error(f"Error extracting text from {filename} (ID: {file_id}): {str(e)}")
            return ""

    def extract_text_from_pdf_bytes(self, pdf_bytes: bytes) -> str:
        """
        Extract text from PDF bytes (for uploaded JD PDFs)

        Args:
            pdf_bytes: PDF file content as bytes

        Returns:
            Extracted text as string ("" on failure)
        """
        try:
            # First try pdfplumber
            text = ""
            with pdfplumber.open(BytesIO(pdf_bytes)) as pdf:
                for page in pdf.pages:
                    page_text = page.extract_text()
                    if page_text:
                        text += page_text + "\n"
            if text.strip():
                return text.strip()
            # Fallback to PyPDF2.
            # Fixed: original used io.BytesIO, but only `from io import BytesIO`
            # is imported at module level -> NameError whenever this path ran.
            pdf_reader = PyPDF2.PdfReader(BytesIO(pdf_bytes))
            text = ""
            for page in pdf_reader.pages:
                # Fixed: extract_text() may return None for image-only pages
                text += (page.extract_text() or "") + "\n"
            return text.strip()
        except Exception as e:
            logger.error(f"Error extracting text from PDF bytes: {str(e)}")
            return ""

    def preprocess_text(self, text: str) -> str:
        """
        Clean and preprocess extracted text

        Args:
            text: Raw text from PDF

        Returns:
            Cleaned, lower-cased text
        """
        # Collapse all whitespace runs into single spaces
        text = re.sub(r'\s+', ' ', text)
        # Remove special characters but keep word chars and basic punctuation
        text = re.sub(r'[^\w\s\.\,\;\:\-\(\)]', '', text)
        # Lowercase for embedding consistency
        text = text.lower()
        return text.strip()

    def chunk_text(self, text: str, chunk_size: int = 500, overlap: int = 50) -> List[str]:
        """
        Split text into overlapping chunks for better semantic search

        Args:
            text: Input text
            chunk_size: Maximum characters per chunk
            overlap: Number of characters to overlap between chunks

        Returns:
            List of text chunks
        """
        if len(text) <= chunk_size:
            return [text]
        chunks = []
        start = 0
        while start < len(text):
            end = start + chunk_size
            # Try to break at a word boundary instead of mid-word
            if end < len(text):
                last_space = text.rfind(' ', start, end)
                if last_space > start:
                    end = last_space
            chunk = text[start:end].strip()
            if chunk:
                chunks.append(chunk)
            # Fixed: always advance.  Plain `end - overlap` can move start
            # backwards (infinite loop) when a chunk ends within `overlap`
            # characters of its start — e.g. text with no spaces, or
            # overlap >= chunk_size.  Normal inputs behave identically.
            start = max(end - overlap, start + 1)
        return chunks

    def add_cv_text_to_database(self, text: str, filename: str) -> bool:
        """
        Add a CV text to the vector database

        Args:
            text: CV text content
            filename: Name of the CV file

        Returns:
            True if successful, False otherwise
        """
        try:
            if not text:
                logger.error(f"No text provided for {filename}")
                return False
            # Preprocess text
            clean_text = self.preprocess_text(text)
            if not clean_text:
                logger.error(f"No text after preprocessing for {filename}")
                return False
            # Create chunks for better semantic search
            chunks = self.chunk_text(clean_text)
            # Fixed: chunk ids must embed the filename — a constant prefix
            # produced identical ids for every CV, so ChromaDB collided/ignored
            # all but the first CV's chunks.  Also batched into one add() call.
            self.collection.add(
                documents=chunks,
                ids=[f"{filename}_chunk_{i}" for i in range(len(chunks))],
                metadatas=[
                    {
                        "filename": filename,
                        "chunk_index": i,
                        "total_chunks": len(chunks)
                    }
                    for i in range(len(chunks))
                ]
            )
            logger.info(f"Successfully added {filename} with {len(chunks)} chunks to database")
            return True
        except Exception as e:
            logger.error(f"Error adding CV {filename}: {str(e)}")
            return False

    def load_cvs_from_google_drive(self, folder_id: str, api_key: str) -> Tuple[int, int, Dict]:
        """
        Load all CVs from a Google Drive folder into the database

        Args:
            folder_id: Google Drive folder ID
            api_key: Google Drive API key

        Returns:
            Tuple of (successful_uploads, total_files, file_mapping)
        """
        logger.info("Starting to load CVs from Google Drive...")
        # Get all PDF files from the folder
        pdf_files = self.list_all_pdfs_in_folder(folder_id, api_key)
        if not pdf_files:
            logger.error("No PDF files found in Google Drive folder")
            return 0, 0, {}
        successful = 0
        total = len(pdf_files)
        file_mapping = {}  # Map filename to file info for links
        for i, file_info in enumerate(pdf_files, 1):
            file_id = file_info['id']
            filename = file_info['name']
            # Store file mapping for later use; synthesize a view link when
            # the API response omitted webViewLink
            file_mapping[filename] = {
                'id': file_id,
                'name': filename,
                'webViewLink': file_info.get('webViewLink', f"https://drive.google.com/file/d/{file_id}/view")
            }
            logger.info(f"Processing CV {i}/{total}: {filename}")
            # Download and extract text
            text = self.extract_text_from_drive_pdf(file_id, filename)
            if text and text.strip():
                # Add to database
                if self.add_cv_text_to_database(text, filename):
                    successful += 1
                else:
                    logger.error(f"Failed to add {filename} to database")
            else:
                logger.error(f"No text extracted from {filename}")
        # Store file mapping for access by search function
        self.file_mapping = file_mapping
        logger.info(f"Completed loading CVs: {successful}/{total} successful")
        return successful, total, file_mapping

    def search_cvs(self, job_description: str, top_k: int = 5) -> List[Dict]:
        """
        Search for CVs matching the job description

        Args:
            job_description: Job description text to match against
            top_k: Number of top CVs to return

        Returns:
            List of dictionaries containing CV information and scores,
            sorted best-first ([] on error or no matches)
        """
        try:
            # Preprocess the JD the same way CV text was preprocessed
            clean_jd = self.preprocess_text(job_description)
            if not clean_jd:
                logger.error("Empty job description after preprocessing")
                return []
            # Over-fetch chunks so several chunks per CV can be aggregated
            results = self.collection.query(
                query_texts=[clean_jd],
                n_results=min(top_k * 3, 50)
            )
            # Fixed: also guard an empty outer list, not just an empty inner one
            if not results['documents'] or not results['documents'][0]:
                return []
            # Aggregate chunk hits per CV filename
            cv_scores = {}
            for doc, metadata, distance in zip(
                results['documents'][0],
                results['metadatas'][0],
                results['distances'][0]
            ):
                filename = metadata['filename']
                # Convert distance to similarity (lower distance = higher
                # similarity); clamp at 0 so scores stay non-negative
                similarity = max(0, 1 - distance)
                if filename not in cv_scores:
                    cv_scores[filename] = {
                        'filename': filename,
                        'max_similarity': similarity,
                        'avg_similarity': similarity,
                        'chunk_count': 1,
                        'best_match_text': doc[:200] + "..." if len(doc) > 200 else doc,
                        'total_similarity': similarity
                    }
                else:
                    cv_scores[filename]['total_similarity'] += similarity
                    cv_scores[filename]['chunk_count'] += 1
                    cv_scores[filename]['avg_similarity'] = (
                        cv_scores[filename]['total_similarity'] / cv_scores[filename]['chunk_count']
                    )
                    # Update max similarity and best match if this chunk is better
                    if similarity > cv_scores[filename]['max_similarity']:
                        cv_scores[filename]['max_similarity'] = similarity
                        cv_scores[filename]['best_match_text'] = doc[:200] + "..." if len(doc) > 200 else doc
            # Rank by a weighted blend favoring the single best chunk (70%)
            # over the average across chunks (30%)
            cv_list = list(cv_scores.values())
            for cv in cv_list:
                cv['weighted_score'] = (cv['max_similarity'] * 0.7 + cv['avg_similarity'] * 0.3)
            cv_list.sort(key=lambda x: x['weighted_score'], reverse=True)
            return cv_list[:top_k]
        except Exception as e:
            logger.error(f"Error searching CVs: {str(e)}")
            return []

    def get_database_info(self) -> Dict:
        """
        Get information about the current database

        Returns:
            Dictionary with total_chunks, unique_cvs, cv_filenames
            (zeros/empty on error)
        """
        try:
            count = self.collection.count()
            # Derive unique CV names from chunk metadata
            if count > 0:
                results = self.collection.get()
                filenames = set(metadata['filename'] for metadata in results['metadatas'])
                unique_cvs = len(filenames)
            else:
                unique_cvs = 0
                filenames = set()
            return {
                'total_chunks': count,
                'unique_cvs': unique_cvs,
                'cv_filenames': list(filenames)
            }
        except Exception as e:
            logger.error(f"Error getting database info: {str(e)}")
            return {'total_chunks': 0, 'unique_cvs': 0, 'cv_filenames': []}