Spaces:
Sleeping
Sleeping
| # Streamlit stuff | |
| import streamlit as st | |
| from streamlit_lottie import st_lottie | |
| # Misc | |
| import pandas as pd | |
| from io import StringIO | |
| import zipfile | |
| import io | |
| from reportlab.pdfgen import canvas | |
| from reportlab.lib.pagesizes import letter | |
| from reportlab.pdfgen import canvas | |
| import os | |
| from dotenv import load_dotenv | |
| import PyPDF2 | |
| import re | |
| from uuid import uuid4 | |
| # Type hinting | |
| from typing import List | |
| # Langchain | |
| # from langchain.schema import Document | |
| from langchain_core.documents import Document | |
| from langchain.text_splitter import RecursiveCharacterTextSplitter | |
| from langchain_community.vectorstores import Pinecone; from pinecone import Pinecone as PC | |
| from langchain_openai import OpenAIEmbeddings | |
| from langchain_openai import OpenAI | |
| from langchain.chat_models import ChatOpenAI | |
| from langchain.chains import LLMChain | |
| from langchain.prompts import PromptTemplate | |
def summarize_chain():
    """
    Build a LangChain LLMChain that condenses a resume into a short summary.

    The chain expects a single input variable, "text" (the raw resume
    content), and asks gpt-3.5-turbo for at most a 5-sentence summary.

    Returns: an LLMChain ready to be invoked with run(text=...)
    """
    summary_prompt = PromptTemplate(
        input_variables=["text"],
        template="""
        You are an AI expert in summarizing resumes. Given the following resume text:
        {text}
        Please provide a concise summary of the candidate's qualifications, experience, and skills in no more than 5 sentences.
        """
    )
    chat_model = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.7, max_tokens=200)
    return LLMChain(llm=chat_model, prompt=summary_prompt)
def check_missing_keys():
    """
    Verify that the API keys the app needs are present in the environment.

    Side effects:
        - Sets st.session_state.KEYS_ARE_MISSING (bool); other widgets
          read this flag to disable themselves.
        - Shows an st.error banner listing any missing providers.

    Fix: the original raised a KeyError only to catch it in the same
    function and pass the exception object to st.error, so the banner
    displayed the message wrapped in quotes (KeyError's str() quotes its
    argument). The docstring also claimed the function raises, which it
    never did from the caller's perspective. Report the message directly.
    """
    required_keys = {
        "OpenAI": os.getenv("OPENAI_API_KEY"),
        "Pinecone": os.getenv("PINECONE_API_KEY"),
    }
    missing_keys = [name for name, value in required_keys.items() if not value]
    st.session_state.KEYS_ARE_MISSING = bool(missing_keys)
    if missing_keys:
        st.error(f"Missing required API keys: {', '.join(missing_keys)}")
def file_uploader() -> List[Document]:
    """
    Render a sidebar uploader for PDF resumes and split the extracted
    text into token-based chunks.

    Returns:
        list[Document]: chunks (300 tokens each, 50 overlap) carrying the
        originating file name in metadata["source"], or None when nothing
        was uploaded / no text could be extracted.
    """
    # FIXME: Duplicate documents can be uploaded
    uploaded_files = st.sidebar.file_uploader(
        "Upload resumes for screening (PDF)",
        accept_multiple_files=True,
        type=['pdf']
    )
    docs = []
    text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
        chunk_size=300,
        chunk_overlap=50
    )
    for uploaded_file in uploaded_files:
        pdf_reader = PyPDF2.PdfReader(uploaded_file)
        # extract_text() can return None for pages without a text layer;
        # the original crashed with a TypeError ("can't concat NoneType")
        # on such pages. Treat them as empty.
        text = "".join(page.extract_text() or "" for page in pdf_reader.pages)
        docs.append(
            Document(
                page_content=text,
                metadata={"source": uploaded_file.name}
            )
        )
    split_docs = text_splitter.split_documents(docs)
    return split_docs if split_docs else None
def pinecone_vector_store(embedding: str, index: str) -> Pinecone:
    """
    Connect to an existing Pinecone index as a LangChain vector store.

    Params:
        embedding (str): The name of the OpenAI model for embedding
        index (str): The name of the existing Pinecone index
    Returns: Pinecone object or None if an error occurs
    """
    try:
        embedding_fn = OpenAIEmbeddings(model=embedding)
        return Pinecone.from_existing_index(
            index_name=index,
            embedding=embedding_fn,
        )
    except Exception as err:
        st.error(f"Error connecting to Pinecone: {err}")
        return None
def upload_button(documents: List[Document], vector_store: Pinecone):
    """
    Render a sidebar button that stores the given documents in the
    vector store, reporting success or failure in the sidebar.

    Params:
        documents (list of Document()): chunks to upload; may be None
        vector_store (Pinecone): destination vector store
    """
    class EmptyUpload(Exception):
        """
        A custom exception for when an an upload component
        is empty.
        """
        def __init__(self, message='No files were uploaded'):
            self.message = message
            super().__init__(self.message)

    clicked = st.sidebar.button(
        'Store to the Database',
        key='vector-store',
        disabled=st.session_state.KEYS_ARE_MISSING
    )
    if not clicked:
        return
    try:
        if documents is None:
            raise EmptyUpload
        # One random UUID per chunk as the vector id.
        doc_ids = [str(uuid4()) for _ in documents]
        vector_store.add_documents(documents=documents, ids=doc_ids)
    except EmptyUpload as err:
        st.sidebar.error(f"Error storing documents: {err}")
    except Exception as err:
        st.sidebar.error(f"An error in function upload_button has occurred: {err}")
    else:
        st.sidebar.success('Documents stored successfully!')
def match_resumes(
    job_description: str,
    k: int,
    vector_store: Pinecone,
    summarization_chain: LLMChain
):
    """
    Retrieve the k resumes most relevant to a job description and render
    them in two Streamlit tabs: an AI-summarized overview and a detailed
    view showing the full chunk content.

    Params:
        job_description (str): free-text query used for similarity search
        k (int): number of matches to retrieve
        vector_store (Pinecone): store queried for matching chunks
        summarization_chain (LLMChain): chain used to summarize each match

    Side effects: writes all output to the Streamlit UI and caches the
    raw (Document, score) pairs in st.session_state.VECTOR_SCORE.
    All failures are reported via st.error; nothing is returned.
    """
    class EmptyText(Exception):
        """
        A custom exception for when an input component
        like st.text_area is empty.
        """
        def __init__(self, message = 'Text area cannot be empty'):
            self.message = message
            super().__init__(self.message)
    class NoResults(Exception):
        """
        A custom exception for when there no results retrieved.
        """
        def __init__(self, message = 'No matched results found'):
            self.message = message
            super().__init__(self.message)
    try:
        with st.status("Fetching matching resumes...") as status:
            # Guard: an empty job description cannot be matched.
            if job_description == '':
                status.update(
                    label="Failed",
                    state="error",
                    expanded=False
                )
                raise EmptyText
            # NOTE(review): this init is immediately overwritten by the
            # search below; it only ensures the session key exists.
            if 'VECTOR_SCORE' not in st.session_state:
                st.session_state.VECTOR_SCORE = None
            st.session_state.VECTOR_SCORE = vector_store.similarity_search_with_relevance_scores(job_description, k=k)
            vector_score = st.session_state.VECTOR_SCORE #variable alias
            if len(vector_score) == 0:
                status.update(
                    label="Failed",
                    state="error",
                    expanded=False
                )
                raise NoResults
            status.update(
                label="Found matches...",
                state="running",
                expanded=False
            )
            tab1, tab2 = st.tabs(["Summary", "Detailed View"])
            with tab1:
                # Aggregate stats over the relevance scores (x[1] of each pair).
                st.write(f"""
                **Matched resumes:** {len(vector_score)}
                **Highest score:** {round(max([x[1] for x in vector_score]) * 100, 2)}%
                **Lowest score:** {round(min([x[1] for x in vector_score]) * 100, 2)}%
                \n\n\n
                ### **Quick Overview**
                ---
                """)
                status.update(
                    label="Summarizing results...",
                    state="running",
                    expanded=False
                )
                for i in range(0, len(vector_score)):
                    doc, score = vector_score[i]
                    # Keep only the trailing "<name>.pdf" part of the source path.
                    resume = re.search(r'([^/]+\.pdf)$', doc.metadata['source'])
                    st.write(f"#### **Match Number:** {i+1}")
                    st.write(f"**Resume:** {resume.group(1)}" + \
                             f"\n\n**Relevance:** {round(float(score) * 100, 2)}%")
                    # One LLM call per match — this loop dominates runtime.
                    st.write(f"**AI Generated Summary:**\n\n{summarization_chain.run(text=doc.page_content)}")
                    # st.write(f"**AI Generated Summary:**\n\n{doc.page_content[0:50]}") # debugging and testing purposes
                    st.write("---")
                status.update(
                    label="Done",
                    state="complete",
                    expanded=True
                )
            with tab2:
                # Detailed view: same matches, full chunk text, no LLM calls.
                for i in range(0, len(vector_score)):
                    doc, score = vector_score[i]
                    resume = re.search(r'([^/]+\.pdf)$', doc.metadata['source'])
                    st.write(f"#### **Match Number:** {i+1}")
                    st.write(f"**Resume:** {resume.group(1)}" )
                    st.write(f"**Relevance:** {round(float(score) * 100, 2)}%")
                    st.write(f"**Content:**\n\n{doc.page_content}")
                    st.write("---")
    except EmptyText as e:
        st.error(e)
    except NoResults as e:
        st.error(e)
    except Exception as e:
        st.error(f"An error in function match_resumes has occurred: {e}")
def create_pdf(content):
    """
    Render plain text as a single word-wrapped PDF and return its bytes.

    Params:
        content (str): text to render; words are wrapped to the page width
    Returns:
        bytes: the PDF file contents, or None on failure (reported via
        st.error).

    Fixes vs. the original:
    - width was measured on `current_line + " " + word`, which prepends a
      spurious space when current_line is empty, inflating the measured
      width and causing premature wraps;
    - the font is now pinned with setFont (and re-pinned after showPage,
      since canvas text state resets on a new page) so drawing matches
      the Helvetica-12 used for measurement instead of relying on the
      canvas default.
    """
    try:
        pdf_buffer = io.BytesIO()
        c = canvas.Canvas(pdf_buffer, pagesize=letter)
        width, height = letter  # Page dimensions in points
        # Text layout parameters
        x_margin = 50           # Left margin
        y_margin = 750          # Starting y-position on each page
        line_height = 15        # Line spacing
        bottom_margin = 50      # Start a new page below this y
        max_width = width - 2 * x_margin
        font_name, font_size = "Helvetica", 12
        c.setFont(font_name, font_size)

        from reportlab.pdfbase.pdfmetrics import stringWidth

        current_line = ""
        y_position = y_margin
        for word in content.split():
            # Only insert a separating space when the line is non-empty.
            candidate = f"{current_line} {word}" if current_line else word
            if stringWidth(candidate, font_name, font_size) <= max_width:
                current_line = candidate
            else:
                # Flush the full line and start a new one with this word.
                c.drawString(x_margin, y_position, current_line)
                y_position -= line_height
                current_line = word
                if y_position < bottom_margin:
                    c.showPage()  # New page; text state resets here
                    c.setFont(font_name, font_size)
                    y_position = y_margin
        # Flush whatever remains on the last line.
        if current_line:
            c.drawString(x_margin, y_position, current_line)
        c.save()
        pdf_buffer.seek(0)
        return pdf_buffer.getvalue()
    except Exception as e:
        st.error(f"Error creating PDF: {e}")
        return None
# Function to create a zip file with multiple PDFs
def create_zip_with_pdfs(pdf_data, create_pdf):
    """
    Bundle several generated PDFs into one in-memory ZIP archive.

    Params:
        pdf_data (dict): maps base file names (no extension) to the text
            content to render
        create_pdf (callable): turns one content string into PDF bytes
    Returns: a BytesIO positioned at the start of the archive, or None
        when an error occurred (reported via st.error)
    """
    try:
        archive = io.BytesIO()
        with zipfile.ZipFile(archive, 'w', zipfile.ZIP_DEFLATED) as bundle:
            for file_name, content in pdf_data.items():
                # Render each entry to PDF bytes and add it to the archive.
                bundle.writestr(file_name + ".pdf", create_pdf(content))
        # Rewind so callers (e.g. st.download_button) read from the start.
        archive.seek(0)
        return archive
    except Exception as e:
        st.error(f"Error creating ZIP: {e}")
        return None
def pdfs_dict(vector_score):
    """
    Build a {file name: resume text} mapping from similarity-search results.

    Params:
        vector_score (list of (Document, float)): (document, score) pairs
            as returned by similarity search; scores are ignored here.
    Returns:
        dict: base PDF file name -> full page content.
    """
    files = {}
    for doc, _score in vector_score:
        source = doc.metadata['source']
        # Strip any path prefix from the source. Fall back to the raw
        # source string when it does not end in ".pdf" — the original
        # raised AttributeError (resume.group on None) in that case.
        match = re.search(r'([^/]+\.pdf)$', source)
        files[match.group(1) if match else source] = doc.page_content
    return files