# HR_assistant_ver-3 / ver3_streamlit.py
# (Hugging Face Space file — uploaded by Tarun-intellentech, commit 9831972)
# --- 1. SQLITE FIX FOR HUGGING FACE ---
# Chroma needs a newer SQLite than the HF Spaces base image provides.
# Alias the bundled pysqlite3 binary as the stdlib `sqlite3` module BEFORE
# anything downstream (e.g. langchain_chroma) imports sqlite3.
__import__('pysqlite3')
import sys
sys.modules['sqlite3'] = sys.modules.pop('pysqlite3')
import os
import json
import re
import gdown
import hashlib
import shutil
from datetime import datetime
from typing import List
import streamlit as st
from huggingface_hub import HfApi, snapshot_download
# LangChain & Mistral
from langchain_mistralai import ChatMistralAI, MistralAIEmbeddings
from langchain_chroma import Chroma
from langchain_core.documents import Document
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.messages import HumanMessage, AIMessage
from langchain_text_splitters import RecursiveCharacterTextSplitter
from pydantic import BaseModel, Field
# =================================================================
# 2. CONFIG & HF SYNC
# =================================================================
HF_TOKEN = os.environ.get("HF_TOKEN")  # HF write token; cloud sync is skipped when unset
REPO_ID = os.environ.get("DATASET_REPO_ID")  # target HF *dataset* repo for persistence
DB_PATH = "./hr_scout_db"  # local Chroma vector-store directory
MANIFEST_PATH = "project_manifest.json"  # maps project id -> cached leaderboard
def sync_from_hf():
    """Pull the persisted manifest and vector DB from the HF dataset repo.

    Returns True on a successful download; False when credentials are
    missing or the download fails (a UI warning is shown in that case).
    """
    # No credentials configured: cloud persistence is simply disabled.
    if not (HF_TOKEN and REPO_ID):
        return False
    try:
        snapshot_download(repo_id=REPO_ID, repo_type="dataset", local_dir=".", token=HF_TOKEN)
    except Exception as e:
        st.warning(f"Note: Could not sync from Cloud Dataset. {e}")
        return False
    return True
def sync_to_hf():
    """Push the manifest file and the Chroma DB folder to the HF dataset repo.

    Best-effort: does nothing without credentials, and surfaces any upload
    failure as a UI error instead of raising.
    """
    if not (HF_TOKEN and REPO_ID):
        return
    try:
        api = HfApi()
        # Upload the (tiny) manifest first, then the vector-store folder.
        if os.path.exists(MANIFEST_PATH):
            api.upload_file(path_or_fileobj=MANIFEST_PATH, path_in_repo=MANIFEST_PATH, repo_id=REPO_ID, repo_type="dataset", token=HF_TOKEN)
        if os.path.exists(DB_PATH):
            api.upload_folder(folder_path=DB_PATH, path_in_repo="hr_scout_db", repo_id=REPO_ID, repo_type="dataset", token=HF_TOKEN)
    except Exception as e:
        st.error(f"Cloud Sync Failed: {e}")
# =================================================================
# 3. SCHEMAS & UTILS
# =================================================================
class ScoredCandidate(BaseModel):
    """Structured output the LLM must return when scoring one resume against the JD."""
    name: str = Field(description="Full name")
    phone: str = Field(description="Phone")
    email: str = Field(description="Email")
    score: int = Field(description="0-100")
    review: str = Field(description="Summary")
class SearchIntent(BaseModel):
    """Candidate names a chat question refers to; an empty list means 'no metadata filter'."""
    specific_candidates: List[str] = Field(default_factory=list)
def get_md5(text: str) -> str:
    """Hex MD5 digest of *text* (UTF-8 encoded); used to de-duplicate resume content."""
    digest = hashlib.md5(text.encode('utf-8'))
    return digest.hexdigest()
def get_project_id(link, jd):
    """Stable hex ID for a (drive link, job description) pair; keys the manifest cache."""
    combined = f"{link}{jd}".encode('utf-8')
    return hashlib.md5(combined).hexdigest()
# =================================================================
# 4. STREAMLIT UI
# =================================================================
st.set_page_config(page_title="AI Talent Scout", page_icon="🎯", layout="wide")
st.title("🎯 AI Talent Scout")
api_key = os.environ.get("MISTRAL_API_KEY")
# One-time cloud pull per browser session; flag survives Streamlit reruns.
if "synced" not in st.session_state:
    with st.spinner("πŸ”„ Checking Cloud Storage..."):
        sync_from_hf()
    st.session_state.synced = True
# Chat history as a list of {"role": ..., "content": ...} dicts.
if "messages" not in st.session_state:
    st.session_state.messages = []
with st.sidebar:
    st.header("1. Input Sources")
    drive_link = st.text_input("G-Drive Folder Link", placeholder="https://drive.google.com/...")
    st.write("--- OR ---")
    manual_files = st.file_uploader("Upload PDFs manually", type="pdf", accept_multiple_files=True)
    st.header("2. Job Details")
    job_desc = st.text_area("Job Description", height=200)
    process_btn = st.button("πŸš€ Analyze Candidates", use_container_width=True)
    # Destructive reset: wipe the vector DB and the manifest, then rerun the app.
    if st.button("πŸ—‘οΈ Reset All Data"):
        if os.path.exists(DB_PATH): shutil.rmtree(DB_PATH)
        if os.path.exists(MANIFEST_PATH): os.remove(MANIFEST_PATH)
        st.rerun()
# =================================================================
# 5. FIXED CORE ENGINE
# =================================================================
# Runs only when the user clicks Analyze with a JD plus at least one source.
if process_btn and (drive_link or manual_files) and job_desc:
    # Project identity = hash of (source link, JD) so reruns hit the cache.
    p_id = get_project_id(drive_link if drive_link else "manual", job_desc)
    if os.path.exists(MANIFEST_PATH):
        with open(MANIFEST_PATH, "r") as f:
            projects = json.load(f)
    else:
        projects = {}
    # Check if we already have this Project cached
    if p_id in projects:
        st.success(f"πŸ“¦ Found in Cloud Dataset (Processed: {projects[p_id]['date']})")
        st.session_state.name_list = projects[p_id]['names']
        st.session_state.summary = projects[p_id]['summary']
    else:
        with st.status("πŸ” Processing...", expanded=True) as status:
            local_resumes = "gdrive_resumes"
            os.makedirs(local_resumes, exist_ok=True)
            # --- GDRIVE DOWNLOAD (best-effort; manual uploads still work) ---
            if drive_link:
                status.write("πŸ“‘ Connecting to Google Drive...")
                match = re.search(r'folders/([\w-]+)', drive_link)
                if match:
                    try:
                        gdown.download_folder(id=match.group(1), output=local_resumes, quiet=True, use_cookies=True)
                    except Exception:
                        status.warning("⚠️ G-Drive link blocked by Google. Checking local/manual files...")
            # Save manually uploaded files alongside any drive downloads.
            if manual_files:
                for uploaded_file in manual_files:
                    with open(os.path.join(local_resumes, uploaded_file.name), "wb") as f:
                        f.write(uploaded_file.getbuffer())
            # --- AI ANALYSIS ---
            try:
                embeddings = MistralAIEmbeddings(mistral_api_key=api_key)
                vs = Chroma(persist_directory=DB_PATH, embedding_function=embeddings)
                llm = ChatMistralAI(model="mistral-large-latest", api_key=api_key, temperature=0).with_structured_output(ScoredCandidate)
                splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
                from PyPDF2 import PdfReader
                files = [name for name in os.listdir(local_resumes) if name.lower().endswith(".pdf")]
                if not files:
                    st.error("No PDF files found to process.")
                    st.stop()
                # Fetch the store once (was fetched twice) and collect known
                # content hashes so already-scored resumes are skipped.
                existing_meta = vs.get()["metadatas"] or []
                existing_hashes = {m.get("file_hash") for m in existing_meta}
                for fname in files:
                    try:
                        reader = PdfReader(os.path.join(local_resumes, fname))
                        # extract_text() may return None for image-only pages;
                        # coalesce to "" so the join never raises TypeError.
                        txt = "".join((page.extract_text() or "") for page in reader.pages)
                        f_hash = get_md5(txt)
                        if f_hash not in existing_hashes:
                            status.write(f"🧠 AI Scoring: {fname}")
                            # Truncate the resume to keep the prompt within limits.
                            res = llm.invoke(f"JD: {job_desc}\n\nRESUME: {txt[:8000]}")
                            chunks = splitter.split_text(txt)
                            docs = [Document(page_content=f"CANDIDATE: {res.name}\n---\n{c}",
                                             metadata={"candidate_name": res.name, "file_hash": f_hash, "score": res.score}) for c in chunks]
                            vs.add_documents(docs)
                            # Prevent re-scoring an identical file later in this batch.
                            existing_hashes.add(f_hash)
                    except Exception as e:
                        status.write(f"❌ Error skipping {fname}: {e}")
                # Leaderboard Logic: one (name, score) entry per candidate,
                # highest score first. Skip metadata rows without a name.
                all_meta = vs.get()["metadatas"] or []
                unique = {m['candidate_name']: m['score'] for m in all_meta if 'candidate_name' in m}
                sorted_ranks = sorted(unique.items(), key=lambda x: x[1], reverse=True)
                sum_text = "\n".join([f"- {n}: {s}/100" for n, s in sorted_ranks])
                st.session_state.name_list = list(unique.keys())
                st.session_state.summary = sum_text
                projects[p_id] = {"date": str(datetime.now()), "names": st.session_state.name_list, "summary": sum_text}
                with open(MANIFEST_PATH, "w") as f:
                    json.dump(projects, f)
                status.write("☁️ Saving results to Cloud Dataset...")
                sync_to_hf()
                status.update(label="βœ… Analysis Complete!", state="complete")
            except Exception as api_err:
                st.error(f"AI Service Error: {api_err}")
# =================================================================
# 6. CHAT INTERFACE
# =================================================================
# Chat/leaderboard panel; only rendered once an analysis has populated state.
if "summary" in st.session_state:
    st.subheader("πŸ† Leaderboard & Discussion")
    col1, col2 = st.columns([1, 2])
    with col1:
        # Pre-formatted "name: score/100" leaderboard built by the engine above.
        st.info(st.session_state.summary)
    with col2:
        chat_container = st.container(height=400)
        # Replay prior turns so the transcript survives Streamlit reruns.
        for msg in st.session_state.messages:
            chat_container.chat_message(msg["role"]).write(msg["content"])
        if prompt := st.chat_input("Ask about a candidate's experience..."):
            st.session_state.messages.append({"role": "user", "content": prompt})
            chat_container.chat_message("user").write(prompt)
            # Retrieval: reopen the persisted store with a fresh embeddings client.
            emb = MistralAIEmbeddings(mistral_api_key=api_key)
            vs = Chroma(persist_directory=DB_PATH, embedding_function=emb)
            # First LLM pass: extract which candidates the question targets,
            # so retrieval can be filtered to just their chunks.
            intent_llm = ChatMistralAI(model="mistral-large-latest", api_key=api_key).with_structured_output(SearchIntent)
            try:
                intent = intent_llm.invoke(f"Candidates: {st.session_state.name_list}\nQuestion: {prompt}")
                # Chroma metadata filter; None means search across all candidates.
                filt = {"candidate_name": {"$in": intent.specific_candidates}} if intent.specific_candidates else None
                docs = vs.similarity_search(prompt, k=20, filter=filt)
                ctx = "\n\n".join([d.page_content for d in docs])
                # Second LLM pass: answer grounded in the retrieved context.
                chat_llm = ChatMistralAI(model="mistral-large-latest", api_key=api_key)
                full_prompt = ChatPromptTemplate.from_messages([
                    ("system", "You are an HR Assistant. Use the context to answer precisely and shortly."),
                    MessagesPlaceholder(variable_name="history"),
                    ("human", "CONTEXT:\n{context}\n\nQUESTION: {input}")
                ])
                # Last 5 turns as history (includes the just-appended user prompt).
                hist = [HumanMessage(m["content"]) if m["role"]=="user" else AIMessage(m["content"]) for m in st.session_state.messages[-5:]]
                res = (full_prompt | chat_llm).invoke({"input": prompt, "context": ctx, "history": hist})
                st.session_state.messages.append({"role": "assistant", "content": res.content})
                chat_container.chat_message("assistant").write(res.content)
            except Exception as e:
                st.error(f"Chat Error: {e}")