# Source: Hugging Face Space commit 6682d9a ("Update app.py") by Devarshi
"""
CareerCompass AI - Complete Multi-Agent RAG System
Streamlit version for HuggingFace Spaces
"""
import os
import streamlit as st
from typing import List, Dict, Any, Optional
from pathlib import Path
import pandas as pd
import chromadb
from chromadb.config import Settings as ChromaSettings
from openai import OpenAI
from groq import Groq
import google.generativeai as genai
from PyPDF2 import PdfReader
from docx import Document as DocxDocument
from abc import ABC, abstractmethod
import uuid
# ==================== CONFIGURATION ====================
class Settings:
    """Central configuration: API keys read from the environment plus
    model names and vector-store constants used across the app."""
    # Provider credentials come from environment variables (None if unset);
    # downstream clients will fail at call time when a key is missing.
    GROQ_API_KEY = os.environ.get("GROQ_API_KEY")
    OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
    GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY")
    # Chat model per provider.
    GROQ_MODEL = "llama-3.3-70b-versatile"
    OPENAI_MODEL = "gpt-4o-mini"
    GEMINI_MODEL = "gemini-2.5-flash"
    # Embedding model used for both indexing jobs and embedding queries.
    OPENAI_EMBEDDING_MODEL = "text-embedding-3-small"
    # ChromaDB on-disk location and collection name.
    CHROMA_PERSIST_DIR = "data/vector_store"
    CHROMA_COLLECTION_JOBS = "job_descriptions"
    # Default number of nearest-neighbour results to retrieve.
    TOP_K_RETRIEVAL = 5
# Shared settings instance plus the data directories the app expects.
settings = Settings()
for _required_dir in ("data", "data/kaggle", "data/vector_store"):
    os.makedirs(_required_dir, exist_ok=True)
# ==================== DATA PROCESSOR ====================
class DataProcessor:
    """Loads the Kaggle LinkedIn job-postings CSVs and converts them into
    embed-ready documents (full job text + flat metadata per posting)."""

    def __init__(self, data_dir: str = "data/kaggle"):
        self.data_dir = Path(data_dir)

    def load_postings(self, max_rows: Optional[int] = None) -> pd.DataFrame:
        """Read postings.csv, optionally capped at ``max_rows`` rows.

        Returns an empty DataFrame when the file is absent.
        """
        postings_path = self.data_dir / "postings.csv"
        if not postings_path.exists():
            return pd.DataFrame()
        return pd.read_csv(postings_path, nrows=max_rows)

    def _load_optional_csv(self, filename: str) -> pd.DataFrame:
        """Read a CSV from the data dir, or an empty frame if it is missing."""
        path = self.data_dir / filename
        return pd.read_csv(path) if path.exists() else pd.DataFrame()

    def load_companies(self) -> pd.DataFrame:
        """companies.csv (currently unused by process_jobs)."""
        return self._load_optional_csv("companies.csv")

    def load_skills(self) -> pd.DataFrame:
        """skills.csv: maps skill_abr -> skill_name."""
        return self._load_optional_csv("skills.csv")

    def load_job_skills(self) -> pd.DataFrame:
        """job_skills.csv: maps job_id -> skill_abr."""
        return self._load_optional_csv("job_skills.csv")

    def combine_job_skills(self, postings_df, job_skills_df, skills_df):
        """Attach a comma-separated ``required_skills`` column to postings.

        Joins job_skills (job_id -> skill_abr) with skills
        (skill_abr -> skill_name) and groups names per job; jobs without
        skills get an empty string.
        """
        if job_skills_df.empty or skills_df.empty:
            postings_df['required_skills'] = ""
            return postings_df
        job_skills_with_names = job_skills_df.merge(skills_df, on='skill_abr', how='left')
        skills_grouped = job_skills_with_names.groupby('job_id')['skill_name'].apply(
            lambda names: ', '.join(names.dropna().astype(str))
        ).reset_index()
        skills_grouped.columns = ['job_id', 'required_skills']
        postings_with_skills = postings_df.merge(skills_grouped, on='job_id', how='left')
        postings_with_skills['required_skills'] = postings_with_skills['required_skills'].fillna('')
        return postings_with_skills

    @staticmethod
    def _first_valid(row: pd.Series, *keys, default=None):
        """Return the first non-null value among ``keys`` in ``row``.

        Bug fix: the original used ``row.get(a) or row.get(b)`` — NaN is
        truthy, so a NaN first value short-circuited the chain and leaked a
        literal NaN (later stringified as "nan") into output.
        """
        for key in keys:
            value = row.get(key)
            if value is not None and pd.notna(value):
                return value
        return default

    def process_jobs(self, max_jobs: Optional[int] = None) -> List[Dict]:
        """Load, enrich, and clean postings; return embed-ready job dicts.

        Each dict has 'id' ("job_<job_id>"), 'description' (full text block),
        and 'metadata' (flat strings for Chroma filters).
        """
        postings_df = self.load_postings(max_rows=max_jobs)
        if postings_df.empty:
            return []
        skills_df = self.load_skills()
        job_skills_df = self.load_job_skills()
        if not job_skills_df.empty and not skills_df.empty:
            postings_df = self.combine_job_skills(postings_df, job_skills_df, skills_df)
        # Drop unusable rows: missing / too-short descriptions, exact repeats.
        postings_df = postings_df[postings_df['description'].notna()]
        postings_df = postings_df[postings_df['description'].str.len() > 50]
        postings_df = postings_df.drop_duplicates(subset=['title', 'description'])
        jobs = []
        for _, row in postings_df.iterrows():
            jobs.append({
                'id': f"job_{row['job_id']}",
                'description': self._build_job_text(row),
                'metadata': self._build_metadata(row),
            })
        return jobs

    def _build_job_text(self, row: pd.Series) -> str:
        """Assemble the human-readable text block that gets embedded."""
        parts = []
        if pd.notna(row.get('title')):
            parts.append(f"Job Title: {row['title']}")
        company_name = self._first_valid(row, 'company_name', 'name_company')
        if company_name is not None:
            parts.append(f"Company: {company_name}")
        if pd.notna(row.get('location')):
            parts.append(f"Location: {row['location']}")
        if pd.notna(row.get('formatted_work_type')):
            parts.append(f"Work Type: {row['formatted_work_type']}")
        if pd.notna(row.get('formatted_experience_level')):
            parts.append(f"Experience Level: {row['formatted_experience_level']}")
        if pd.notna(row.get('required_skills')) and row['required_skills']:
            parts.append(f"Required Skills: {row['required_skills']}")
        if pd.notna(row.get('description')):
            parts.append(f"\nJob Description:\n{row['description']}")
        return "\n".join(parts)

    def _build_metadata(self, row: pd.Series) -> Dict:
        """Flat metadata dict stored alongside the embedding in Chroma."""
        return {
            'job_id': str(row['job_id']),
            'title': str(self._first_valid(row, 'title', default='Unknown')),
            'company': str(self._first_valid(row, 'company_name', 'name_company',
                                             default='Unknown')),
            'location': str(self._first_valid(row, 'location', default='Unknown')),
        }
# ==================== DOCUMENT PARSER ====================
class DocumentParser:
    """Extract plain text from an uploaded resume (.pdf, .docx, or .txt)."""

    @staticmethod
    def parse_file(file_path: str) -> str:
        """Return the text content of ``file_path`` based on its extension.

        Raises:
            ValueError: if the extension is not .pdf, .docx, or .txt.
        """
        ext = Path(file_path).suffix.lower()
        if ext == '.pdf':
            reader = PdfReader(file_path)
            # Bug fix: extract_text() may return None (e.g. image-only
            # pages), which crashed "\n".join; coerce None to "".
            return "\n".join(page.extract_text() or "" for page in reader.pages)
        elif ext == '.docx':
            doc = DocxDocument(file_path)
            return "\n".join(p.text for p in doc.paragraphs)
        elif ext == '.txt':
            with open(file_path, 'r', encoding='utf-8') as f:
                return f.read()
        raise ValueError(f"Unsupported format: {ext}")
# ==================== EMBEDDING GENERATOR ====================
class EmbeddingGenerator:
    """Wraps the OpenAI embeddings endpoint for single and batched texts."""

    def __init__(self):
        self.client = OpenAI(api_key=settings.OPENAI_API_KEY)
        self.model = settings.OPENAI_EMBEDDING_MODEL

    def generate_embedding(self, text: str) -> List[float]:
        """Embed a single string and return its raw vector."""
        result = self.client.embeddings.create(model=self.model, input=text)
        return result.data[0].embedding

    def generate_embeddings(self, texts: List[str]) -> List[List[float]]:
        """Embed a batch of strings in one API call, preserving order."""
        result = self.client.embeddings.create(model=self.model, input=texts)
        return [entry.embedding for entry in result.data]
# ==================== CHROMA MANAGER ====================
class ChromaManager:
    """Persistent ChromaDB store of job postings with OpenAI embeddings."""

    def __init__(self):
        self.client = chromadb.PersistentClient(
            path=settings.CHROMA_PERSIST_DIR,
            settings=ChromaSettings(anonymized_telemetry=False)
        )
        self.embedding_generator = EmbeddingGenerator()
        self.jobs_collection = self._get_or_create_collection(settings.CHROMA_COLLECTION_JOBS)

    def _get_or_create_collection(self, name: str):
        """Fetch the named collection, creating it (cosine space) if absent.

        Bug fix: replaces a bare ``except:`` fallback — which also swallowed
        KeyboardInterrupt/SystemExit — with Chroma's atomic
        get_or_create_collection; metadata is applied only on creation.
        """
        return self.client.get_or_create_collection(
            name=name, metadata={"hnsw:space": "cosine"}
        )

    def add_jobs_batch(self, jobs: List[Dict[str, Any]]):
        """Embed and insert a batch of job dicts ({'id','description','metadata'})."""
        if not jobs:
            return
        documents = [job['description'] for job in jobs]
        metadatas = [job['metadata'] for job in jobs]
        ids = [job['id'] for job in jobs]
        embeddings = self.embedding_generator.generate_embeddings(documents)
        self.jobs_collection.add(
            documents=documents,
            embeddings=embeddings,
            metadatas=metadatas,
            ids=ids
        )

    def search_jobs(self, query: str, n_results: int = 5) -> Dict[str, Any]:
        """Semantic search for ``query``; returns flattened result lists."""
        query_embedding = self.embedding_generator.generate_embedding(query)
        results = self.jobs_collection.query(
            query_embeddings=[query_embedding],
            n_results=n_results
        )
        # Chroma returns one list per query; unwrap the single-query results.
        return {
            'documents': results['documents'][0] if results['documents'] else [],
            'metadatas': results['metadatas'][0] if results['metadatas'] else [],
            'distances': results['distances'][0] if results['distances'] else [],
            'ids': results['ids'][0] if results['ids'] else []
        }

    def get_stats(self):
        """Return {'total_jobs': <number of postings in the collection>}."""
        return {'total_jobs': self.jobs_collection.count()}
# ==================== LLM FACTORY ====================
class LLMFactory:
    """Uniform text-generation facade over Groq, OpenAI, and Gemini."""

    SUPPORTED_PROVIDERS = ("groq", "openai", "gemini")

    def __init__(self, provider: str):
        """Build a client for ``provider``.

        Raises:
            ValueError: for an unknown provider. Bug fix: the original left
            ``self.client``/``self.model`` unset in that case, deferring the
            failure to an AttributeError inside generate().
        """
        self.provider = provider
        if provider == "groq":
            self.client = Groq(api_key=settings.GROQ_API_KEY)
            self.model = settings.GROQ_MODEL
        elif provider == "openai":
            self.client = OpenAI(api_key=settings.OPENAI_API_KEY)
            self.model = settings.OPENAI_MODEL
        elif provider == "gemini":
            genai.configure(api_key=settings.GOOGLE_API_KEY)
            self.model = settings.GEMINI_MODEL
        else:
            raise ValueError(
                f"Unknown LLM provider: {provider!r}; "
                f"expected one of {self.SUPPORTED_PROVIDERS}"
            )

    def generate(self, prompt: str, system_prompt: str = "", temp: float = 0.7, max_tok: int = 2000) -> str:
        """Generate a completion for ``prompt``.

        Gemini takes a single prompt string (system prompt prepended);
        Groq/OpenAI use the chat-completions message format.
        """
        if self.provider == "gemini":
            full_prompt = f"{system_prompt}\n\n{prompt}" if system_prompt else prompt
            model = genai.GenerativeModel(self.model)
            response = model.generate_content(
                full_prompt,
                generation_config=genai.GenerationConfig(temperature=temp, max_output_tokens=max_tok)
            )
            return response.text
        messages = []
        if system_prompt:
            messages.append({"role": "system", "content": system_prompt})
        messages.append({"role": "user", "content": prompt})
        response = self.client.chat.completions.create(
            model=self.model,
            messages=messages,
            temperature=temp,
            max_tokens=max_tok
        )
        return response.choices[0].message.content
# ==================== BASE AGENT ====================
class BaseAgent(ABC):
    """Common plumbing for all agents: an LLM client plus a prompt contract."""

    def __init__(self, llm_provider: str):
        self.llm = LLMFactory(provider=llm_provider)

    @abstractmethod
    def get_system_prompt(self) -> str:
        """Return the system prompt framing this agent's role."""
        pass

    @abstractmethod
    def process(self, query: str, context: Dict[str, Any] = None) -> str:
        """Handle a user query (with optional context) and return a reply."""
        pass

    def generate_response(self, user_prompt: str, system_prompt: str = None,
                          temperature: float = 0.7, max_tokens: int = 2000) -> str:
        """Call the LLM, defaulting to this agent's own system prompt."""
        # Falsy (None or "") system_prompt falls back to the agent default.
        chosen_prompt = system_prompt or self.get_system_prompt()
        return self.llm.generate(user_prompt, chosen_prompt, temperature, max_tokens)
# ==================== SUPERVISOR AGENT ====================
class SupervisorAgent(BaseAgent):
    """Routes a user query to the most relevant downstream agent by keyword."""

    # Phrase → agent routing table, checked first and in order of specificity.
    _PHRASE_ROUTES = (
        (('find job', 'match job', 'job search', 'which job', 'best job',
          'recommend job'), 'job_matcher'),
        (('review resume', 'resume feedback', 'improve resume', 'fix resume'),
         'resume_coach'),
        (('interview prep', 'interview question', 'prepare interview'),
         'interview_prep'),
    )

    def __init__(self):
        super().__init__(llm_provider='openai')

    def get_system_prompt(self) -> str:
        return "Query router"

    def process(self, query: str, context: Dict[str, Any] = None) -> str:
        """Return one of: 'job_matcher', 'resume_coach', 'interview_prep', 'general'."""
        normalized = query.lower().strip()
        # Pass 1: specific multi-word phrases.
        for phrases, agent in self._PHRASE_ROUTES:
            if any(phrase in normalized for phrase in phrases):
                return agent
        # Pass 2: looser single-word hints.
        tokens = normalized.split()
        if 'job' in tokens or 'jobs' in tokens:
            return 'job_matcher'
        if 'resume' in normalized:
            return 'resume_coach'
        if 'interview' in normalized:
            return 'interview_prep'
        return 'general'
# ==================== JOB MATCHER AGENT ====================
class JobMatcherAgent(BaseAgent):
    """RAG agent: retrieves similar postings from Chroma and asks the LLM to score fit."""

    def __init__(self, chroma_manager):
        super().__init__(llm_provider='groq')
        self.chroma_manager = chroma_manager

    def get_system_prompt(self) -> str:
        return """You are an expert job matching advisor. Analyze resumes against job postings.
For each job provide:
**Overall Match Score:** X%
- Why you're a great fit (cite evidence from resume)
- Skills to highlight
- Potential gaps
- Recommendation"""

    def process(self, query: str, context: Dict[str, Any] = None) -> str:
        """Retrieve top-k similar jobs for the resume and ask the LLM to rank them."""
        if not context or 'resume_text' not in context:
            return "Please provide your resume."
        resume_text = context['resume_text']
        top_k = context.get('n_results', 5)
        if self.chroma_manager.get_stats()['total_jobs'] == 0:
            return "⚠️ No jobs loaded. Please upload CSV files to data/kaggle/ folder."
        hits = self.chroma_manager.search_jobs(resume_text, n_results=top_k)
        if not hits['documents']:
            return "No matching jobs found."
        # Build the prompt: resume first, then each retrieved job (truncated).
        lines = ["=== CANDIDATE'S RESUME ===", resume_text, "\n=== RELEVANT JOBS ==="]
        job_number = 0
        for doc, meta, dist in zip(hits['documents'], hits['metadatas'], hits['distances']):
            job_number += 1
            lines.extend([
                f"\n[Job {job_number}: {meta.get('title', 'Unknown')}]",
                f"Company: {meta.get('company', 'Unknown')}",
                # Cosine distance → rough similarity percentage.
                f"Similarity: {(1-dist)*100:.1f}%",
                f"\n{doc[:1000]}",
                "-" * 60,
            ])
        return self.generate_response(
            user_prompt="\n".join(lines) + f"\n\nUser Query: {query}",
            max_tokens=4000
        )
# ==================== RESUME COACH AGENT ====================
class ResumeCoachAgent(BaseAgent):
    """Gives structured, actionable feedback on an uploaded resume."""

    def __init__(self):
        super().__init__(llm_provider='groq')

    def get_system_prompt(self) -> str:
        return """Professional resume coach. Provide:
1. Overall Assessment
2. Section-by-Section Analysis (quote text)
3. Quick Wins
4. ATS Optimization"""

    def process(self, query: str, context: Dict[str, Any] = None) -> str:
        """Send the resume plus the user's request to the LLM for feedback."""
        if not context or 'resume_text' not in context:
            return "Please provide your resume."
        # Assembled to exactly match the original prompt layout.
        user_prompt = (
            "=== RESUME ===\n"
            f"{context['resume_text']}\n"
            f"Query: {query}\n"
            "Provide detailed feedback."
        )
        return self.generate_response(user_prompt=user_prompt, max_tokens=3000)
# ==================== INTERVIEW PREP AGENT ====================
class InterviewPrepAgent(BaseAgent):
    """Generates a tailored interview-preparation plan from the resume."""

    def __init__(self):
        super().__init__(llm_provider='gemini')

    def get_system_prompt(self) -> str:
        return """Interview coach. Generate:
1. Interview Strategy
2. Technical Questions (4-5)
3. Behavioral Questions (4-5 STAR)
4. Questions to Ask"""

    def process(self, query: str, context: Dict[str, Any] = None) -> str:
        """Send the resume plus the user's request to the LLM for prep material."""
        if not context or 'resume_text' not in context:
            return "Please provide your resume."
        # Assembled to exactly match the original prompt layout.
        user_prompt = (
            "=== RESUME ===\n"
            f"{context['resume_text']}\n"
            f"Request: {query}\n"
            "Generate comprehensive interview prep."
        )
        # Higher temperature for varied questions; large budget for long output.
        return self.generate_response(user_prompt=user_prompt, temperature=0.8, max_tokens=8192)
# ==================== INITIALIZE SYSTEM ====================
@st.cache_resource
def initialize_system():
    """Build the vector store (seeding it from CSVs on first run) and all agents.

    Cached by Streamlit so the heavy setup runs once per server process.
    Returns (chroma, supervisor, job_matcher, resume_coach, interview_prep).
    """
    chroma = ChromaManager()
    if chroma.get_stats()['total_jobs'] == 0:
        # First run: ingest up to 500 postings from data/kaggle in batches.
        seed_jobs = DataProcessor().process_jobs(max_jobs=500)
        if seed_jobs:
            with st.spinner(f"Loading {len(seed_jobs)} jobs into vector store..."):
                batch_size = 50
                for start in range(0, len(seed_jobs), batch_size):
                    chroma.add_jobs_batch(seed_jobs[start:start + batch_size])
            st.success(f"βœ… Loaded {len(seed_jobs)} jobs!")
    return (
        chroma,
        SupervisorAgent(),
        JobMatcherAgent(chroma),
        ResumeCoachAgent(),
        InterviewPrepAgent(),
    )
# ==================== STREAMLIT UI ====================
st.set_page_config(page_title="CareerCompass AI", page_icon="πŸ’Ό", layout="wide")
# Initialize system (cached across reruns by @st.cache_resource)
chroma, supervisor, job_matcher, resume_coach, interview_prep = initialize_system()
# Header
st.title("πŸ’Ό CareerCompass AI Assistant")
st.markdown("### Multi-Agent RAG System | OpenAI β€’ Groq β€’ Gemini")
# Sidebar: resume upload, system stats, quick actions, credits
with st.sidebar:
    st.header("πŸ“€ Upload Resume")
    uploaded_file = st.file_uploader("Choose file", type=['pdf', 'docx', 'txt'])
    if uploaded_file:
        try:
            # Persist the upload to a temp file so DocumentParser can open it by path
            temp_path = f"temp_{uploaded_file.name}"
            with open(temp_path, "wb") as f:
                f.write(uploaded_file.getbuffer())
            resume_text = DocumentParser.parse_file(temp_path)
            st.session_state['resume_text'] = resume_text
            st.success(f"βœ… Resume uploaded!\n\nπŸ“ {len(resume_text)} characters")
            # Clean up the temp file once parsed
            os.remove(temp_path)
        except Exception as e:
            st.error(f"Error: {e}")
    st.markdown("---")
    st.header("πŸ“Š System Info")
    stats = chroma.get_stats()
    st.metric("Jobs in Database", stats['total_jobs'])
    st.metric("Embedding Model", "OpenAI (1536-dim)")
    st.metric("Active Agents", "4")
    st.markdown("---")
    st.header("⚑ Quick Actions")
    # Each button queues a canned query that is handled on the next rerun
    if st.button("πŸ” Find Matching Jobs", use_container_width=True):
        st.session_state['quick_query'] = "Find matching jobs for me"
    if st.button("πŸ“ Review My Resume", use_container_width=True):
        st.session_state['quick_query'] = "Review my resume and suggest improvements"
    if st.button("🎯 Interview Prep", use_container_width=True):
        st.session_state['quick_query'] = "Help me prepare for an interview"
    st.markdown("---")
    st.caption("**Group 8**\nDevarshi Anil Mahajan\nSomya Sidharth Padhy")
# Initialize chat history and resume state once per session
if 'messages' not in st.session_state:
    st.session_state['messages'] = []
if 'resume_text' not in st.session_state:
    st.session_state['resume_text'] = None
# Replay the chat history on every rerun
for message in st.session_state['messages']:
    with st.chat_message(message["role"]):
        st.markdown(message["content"])
# ---- Query handling ----
def _answer_query(user_query: str, fallback: str) -> str:
    """Route ``user_query`` through the supervisor to the right agent.

    Factors out the agent-dispatch logic that was duplicated between the
    quick-action path and the chat-input path. ``fallback`` is returned
    when no specialized agent matches.
    """
    agent_type = supervisor.process(user_query)
    agent_context = {'resume_text': st.session_state['resume_text']}
    if agent_type == 'job_matcher':
        agent_context['n_results'] = 5
        return job_matcher.process(user_query, agent_context)
    if agent_type == 'resume_coach':
        return resume_coach.process(user_query, agent_context)
    if agent_type == 'interview_prep':
        return interview_prep.process(user_query, agent_context)
    return fallback


def _run_chat_turn(user_query: str, fallback: str) -> None:
    """Render one chat turn: echo the query, run the agents, record the reply."""
    st.session_state['messages'].append({"role": "user", "content": user_query})
    with st.chat_message("user"):
        st.markdown(user_query)
    with st.chat_message("assistant"):
        with st.spinner("Processing..."):
            response = _answer_query(user_query, fallback)
            st.markdown(response)
            st.session_state['messages'].append({"role": "assistant", "content": response})


# Handle a queued quick-action query (set by the sidebar buttons)
if 'quick_query' in st.session_state:
    query = st.session_state['quick_query']
    del st.session_state['quick_query']
    if not st.session_state['resume_text']:
        st.warning("⚠️ Please upload your resume first!")
    else:
        _run_chat_turn(
            query,
            "πŸ‘‹ Hi! Upload your resume and ask about jobs, resume review, or interview prep!",
        )

# Free-form chat input
if prompt := st.chat_input("Ask me anything about jobs, resumes, or interviews..."):
    if not st.session_state['resume_text']:
        st.warning("⚠️ Please upload your resume first!")
    else:
        _run_chat_turn(
            prompt,
            "πŸ‘‹ Hi! I can help with job search, resume review, or interview prep!",
        )
# Offer a clear-chat control only once there is history to clear.
if st.session_state['messages'] and st.sidebar.button("πŸ—‘οΈ Clear Chat", use_container_width=True):
    st.session_state['messages'] = []
    st.rerun()