import html
import json
import os
from urllib.parse import quote

import dotenv
import requests
import streamlit as st

dotenv.load_dotenv()
# ─────────────────────────────────────────────
# CONFIG
# ─────────────────────────────────────────────
# Root URL of the backend API. An unset or empty BACKEND_BASE_URL falls back
# to the local default, and a trailing slash is dropped so that
# f"{BASE_URL}/path" never produces a double slash.
BASE_URL = (os.environ.get("BACKEND_BASE_URL") or "http://localhost:8000").rstrip("/")

# Page-level settings: title, icon, wide layout, sidebar open by default.
st.set_page_config(
    page_title="Candidate Explorer",
    page_icon="🔍",
    layout="wide",
    initial_sidebar_state="expanded",
)
# ─────────────────────────────────────────────
# GLOBAL CSS
# ─────────────────────────────────────────────
# Global stylesheet hook: the markup injected here is currently a single newline.
st.markdown("\n", unsafe_allow_html=True)
# ─────────────────────────────────────────────
# SESSION STATE INIT
# ─────────────────────────────────────────────
# Seed every session key the app reads. Keys that already hold a value
# (carried over from an earlier rerun) are left untouched.
_SESSION_DEFAULTS = {
    "token": None,
    "user": None,
    "upload_results": [],
    "extract_result": None,
    "user_files": [],
}
for _name, _initial in _SESSION_DEFAULTS.items():
    if _name not in st.session_state:
        st.session_state[_name] = _initial
# ─────────────────────────────────────────────
# API HELPERS
# ─────────────────────────────────────────────
def _headers():
    """Build the Authorization header carrying the session's bearer token."""
    bearer = st.session_state.token
    return {"Authorization": f"Bearer {bearer}"}
def api_login(email: str, password: str):
    """Authenticate against ``POST /admin/login``.

    The credentials are sent as form fields named ``username`` and
    ``password``.

    Returns:
        A ``(token, error_msg)`` pair: ``(access_token, None)`` on success and
        ``(None, message)`` on any failure. ``message`` is always a non-empty
        string.
    """
    try:
        resp = requests.post(
            f"{BASE_URL}/admin/login",
            data={"username": email, "password": password},
            timeout=15,
        )
        if resp.status_code == 200:
            token = resp.json().get("access_token")
            if not token:
                # A 200 without a token must not look like success: the
                # caller treats a falsy error message as "logged in".
                return None, "Login response did not contain an access token."
            return token, None
        try:
            detail = resp.json().get("detail", resp.text)
        except (ValueError, AttributeError):
            # The error body is not a JSON object (e.g. an HTML error page).
            detail = resp.text
        return None, str(detail or f"HTTP {resp.status_code}")
    except requests.exceptions.ConnectionError:
        return None, "Cannot connect to backend. Check BACKEND_BASE_URL."
    except Exception as e:
        # Best-effort boundary: the UI shows the message instead of crashing.
        return None, str(e)
def api_get_me():
    """Fetch the current user via ``GET /admin/me``.

    Returns:
        A ``(user, error_msg)`` pair: ``(decoded JSON body, None)`` on HTTP
        200, otherwise ``(None, message)`` with a non-empty message.
    """
    try:
        resp = requests.get(f"{BASE_URL}/admin/me", headers=_headers(), timeout=10)
        if resp.status_code == 200:
            return resp.json(), None
        try:
            detail = resp.json().get("detail", resp.text)
        except (ValueError, AttributeError):
            # The error body is not a JSON object (e.g. an HTML error page).
            detail = resp.text
        # Never hand back a falsy error: callers test ``if err`` to detect failure.
        return None, detail or f"HTTP {resp.status_code}"
    except Exception as e:
        # Best-effort boundary: report the problem to the UI instead of raising.
        return None, str(e)
def api_get_scorecard():
    """Fetch the scorecard via ``GET /file/score_card``.

    Returns:
        A ``(data, error_msg)`` pair. On HTTP 200 ``data`` is the ``"data"``
        member of the response (an empty dict when absent or null) and
        ``error_msg`` is ``None``; otherwise ``(None, message)`` with a
        non-empty message.
    """
    try:
        resp = requests.get(f"{BASE_URL}/file/score_card", headers=_headers(), timeout=10)
        if resp.status_code == 200:
            # ``or {}`` also covers an explicit null, so callers can use .get().
            return resp.json().get("data") or {}, None
        try:
            detail = resp.json().get("detail", resp.text)
        except (ValueError, AttributeError):
            # The error body is not a JSON object (e.g. an HTML error page).
            detail = resp.text
        # Never hand back a falsy error: callers test ``if err`` to detect failure.
        return None, detail or f"HTTP {resp.status_code}"
    except Exception as e:
        # Best-effort boundary: report the problem to the UI instead of raising.
        return None, str(e)
def api_upload_files(uploaded_files):
    """Upload files via multipart ``POST /file/upload``.

    Args:
        uploaded_files: Iterable of file-like objects exposing ``name`` and
            either ``getvalue()`` or ``read()``. Each one is sent under the
            form field ``files`` with content type ``application/pdf``.

    Returns:
        A ``(results, error_msg)`` pair: the ``"files"`` list of the response
        and ``None`` on HTTP 201, otherwise ``(None, message)`` with a
        non-empty message.
    """
    try:
        files = [
            (
                "files",
                (
                    f.name,
                    # getvalue() returns the whole buffer regardless of the
                    # current read position, so a retry after a failed attempt
                    # does not send empty bodies the way a second read() would.
                    f.getvalue() if hasattr(f, "getvalue") else f.read(),
                    "application/pdf",
                ),
            )
            for f in uploaded_files
        ]
        resp = requests.post(
            f"{BASE_URL}/file/upload",
            headers=_headers(),
            files=files,
            timeout=60,
        )
        if resp.status_code == 201:
            return resp.json().get("files", []), None
        try:
            detail = resp.json().get("detail", resp.text)
        except (ValueError, AttributeError):
            # The error body is not a JSON object (e.g. an HTML error page).
            detail = resp.text
        # Never hand back a falsy error: callers test ``if err`` to detect failure.
        return None, detail or f"HTTP {resp.status_code}"
    except Exception as e:
        # Best-effort boundary: report the problem to the UI instead of raising.
        return None, str(e)
def api_get_user_files(user_id: str):
    """List a user's files via ``GET /file/user/{user_id}``.

    Args:
        user_id: Identifier placed in the URL path; it is percent-encoded so
            it always stays a single path segment.

    Returns:
        A ``(files, error_msg)`` pair: the ``"files"`` list of the response
        and ``None`` on HTTP 200, otherwise ``(None, message)`` with a
        non-empty message.
    """
    try:
        resp = requests.get(
            f"{BASE_URL}/file/user/{quote(str(user_id), safe='')}",
            headers=_headers(),
            timeout=10,
        )
        if resp.status_code == 200:
            return resp.json().get("files", []), None
        try:
            detail = resp.json().get("detail", resp.text)
        except (ValueError, AttributeError):
            # The error body is not a JSON object (e.g. an HTML error page).
            detail = resp.text
        # Never hand back a falsy error: callers test ``if err`` to detect failure.
        return None, detail or f"HTTP {resp.status_code}"
    except Exception as e:
        # Best-effort boundary: report the problem to the UI instead of raising.
        return None, str(e)
def api_extract_profile(filename: str):
    """Request profile extraction via ``POST /profile/extract_profile``.

    Args:
        filename: Sent as the ``filename`` query parameter.

    Returns:
        A ``(profile, error_msg)`` pair: ``(decoded JSON body, None)`` on HTTP
        200, otherwise ``(None, message)`` with a non-empty message.
    """
    try:
        resp = requests.post(
            f"{BASE_URL}/profile/extract_profile",
            headers=_headers(),
            params={"filename": filename},
            timeout=120,  # the longest timeout of all the API helpers
        )
        if resp.status_code == 200:
            return resp.json(), None
        try:
            detail = resp.json().get("detail", resp.text)
        except (ValueError, AttributeError):
            # The error body is not a JSON object (e.g. an HTML error page).
            detail = resp.text
        # Never hand back a falsy error: callers test ``if err`` to detect failure.
        return None, detail or f"HTTP {resp.status_code}"
    except Exception as e:
        # Best-effort boundary: report the problem to the UI instead of raising.
        return None, str(e)
def api_delete_file(filename: str):
    """Delete a file via ``DELETE /file/{filename}``.

    Args:
        filename: Name placed in the URL path. It is percent-encoded so that
            characters such as ``#``, ``?`` or ``%`` are sent as part of the
            name instead of being parsed as URL syntax.

    Returns:
        A ``(ok, error_msg)`` pair: ``(True, None)`` on HTTP 200, otherwise
        ``(False, message)`` with a non-empty message.
    """
    try:
        resp = requests.delete(
            f"{BASE_URL}/file/{quote(str(filename), safe='')}",
            headers=_headers(),
            timeout=15,
        )
        if resp.status_code == 200:
            return True, None
        try:
            detail = resp.json().get("detail", resp.text)
        except (ValueError, AttributeError):
            # The error body is not a JSON object (e.g. an HTML error page).
            detail = resp.text
        return False, detail or f"HTTP {resp.status_code}"
    except Exception as e:
        # Best-effort boundary: report the problem to the caller instead of raising.
        return False, str(e)
# ─────────────────────────────────────────────
# UI COMPONENTS
# ─────────────────────────────────────────────
def render_scorecard():
    """Fetch the scorecard and render its three headline figures.

    Shows a warning and renders nothing else when the scorecard cannot be
    loaded. Missing values are displayed as zero.
    """
    sc, err = api_get_scorecard()
    if err:
        st.warning(f"Could not load scorecard: {err}")
        return

    sc = sc or {}  # tolerate an absent payload instead of failing on .get()
    total_file = sc.get("total_file", 0)
    total_extracted = sc.get("total_extracted", 0)

    # percent_extracted may be a float like 75.0 or a string like "75%".
    pct = sc.get("percent_extracted", 0)
    if pct is None:
        pct = 0  # an explicit null cannot be formatted with ".1f"
    if isinstance(pct, str):
        pct_display = pct
    else:
        pct_display = f"{pct:.1f}%"

    st.markdown(
        "\n"
        "📁 Total CVs Uploaded\n"
        f"{total_file}\n"
        "files in your workspace\n"
        "✅ Profiles Extracted\n"
        f"{total_extracted}\n"
        "structured profiles\n"
        "📊 Extraction Rate\n"
        f"{pct_display}\n"
        "of uploaded CVs processed\n",
        unsafe_allow_html=True,
    )
def render_sidebar():
    """Render the sidebar: the signed-in user's details and a logout button.

    Logging out resets the session keys to their initial values and reruns
    the script.
    """
    user = st.session_state.user or {}
    # ``or`` also covers keys that are present but null; the values are
    # escaped because they are rendered with unsafe_allow_html=True.
    full_name = html.escape(str(user.get("full_name") or "User"))
    email = html.escape(str(user.get("email") or ""))
    role = html.escape(str(user.get("role") or "user").upper())

    with st.sidebar:
        st.markdown(
            "\n"
            "👤\n"
            f"{full_name}\n"
            f"{email}\n"
            f"{role}\n",
            unsafe_allow_html=True,
        )
        if st.button("🚪 Logout", use_container_width=True):
            for key in ("token", "user", "extract_result"):
                st.session_state[key] = None
            for key in ("upload_results", "user_files"):
                st.session_state[key] = []
            st.rerun()
# ─────────────────────────────────────────────
# PAGES
# ─────────────────────────────────────────────
def page_login():
    """Render the sign-in page and log the user in on submit.

    On a successful login the token and the user record are stored in
    ``st.session_state`` and the script is rerun.
    """
    # Center the login card
    _, col, _ = st.columns([1, 1.4, 1])
    with col:
        st.markdown(
            "\n"
            "🔍 Candidate Explorer\n"
            "Sign in to manage and analyze candidate CVs.\n",
            unsafe_allow_html=True,
        )
        with st.form("login_form", clear_on_submit=False):
            st.markdown("Sign In\n", unsafe_allow_html=True)
            email = st.text_input("Email address", placeholder="you@company.com")
            password = st.text_input("Password", type="password", placeholder="••••••••")
            submitted = st.form_submit_button("Sign In →", use_container_width=True)

        if not submitted:
            return
        if not email or not password:
            st.error("Please enter both email and password.")
            return

        with st.spinner("Signing in…"):
            token, err = api_login(email, password)
        if err or not token:
            # A missing token with no error must not be stored as a login.
            st.error(f"Login failed: {err or 'no access token returned'}")
            return

        st.session_state.token = token
        user, err2 = api_get_me()
        if err2:
            # Profile lookup failed: fall back to a minimal record built from
            # the email that was just used to sign in.
            st.session_state.user = {"email": email, "full_name": email, "role": "user"}
        else:
            st.session_state.user = user
        st.rerun()
def page_main():
    """Render the authenticated workspace: sidebar, header, scorecard, tabs."""
    render_sidebar()

    # ── Page header ──────────────────────────────
    st.markdown(
        "🔍 Candidate Explorer\n"
        "Upload CVs, extract candidate profiles, and track your workspace.\n",
        unsafe_allow_html=True,
    )

    # ── Scorecard ─────────────────────────────────
    st.markdown("📊 Dashboard Overview\n", unsafe_allow_html=True)
    render_scorecard()
    st.markdown("\n", unsafe_allow_html=True)

    # ── Tabs ──────────────────────────────────────
    tab_upload, tab_extract = st.tabs(["📁 Upload CV", "🧠 Extract Profile"])
    with tab_upload:
        _render_upload_tab()
    with tab_extract:
        _render_extract_tab()


def _esc(value):
    """Return *value* as text that is safe inside HTML-enabled markdown."""
    return html.escape(str(value))


def _current_user_id():
    """Return the signed-in user's id as a string, or "" when it is unknown."""
    user = st.session_state.user or {}
    raw = user.get("user_id")
    # A null id must stay falsy; str(None) would yield the truthy "None".
    return "" if raw is None else str(raw)


def _refresh_user_files(uid):
    """Reload the user's file list into the session, ignoring load errors."""
    files, _ = api_get_user_files(uid)
    st.session_state.user_files = files or []


def _render_upload_tab():
    """Render the upload tab: uploader, last upload results, and file list."""
    st.markdown("\n", unsafe_allow_html=True)
    st.markdown("Upload Candidate CVs\n", unsafe_allow_html=True)
    uploaded = st.file_uploader(
        "Drop PDF files here or click to browse",
        type=["pdf"],
        accept_multiple_files=True,
        help="Only PDF files are accepted.",
    )
    col_btn, _ = st.columns([1, 3])
    with col_btn:
        do_upload = st.button("⬆️ Upload", use_container_width=True, disabled=not uploaded)

    uid = _current_user_id()
    if do_upload and uploaded:
        with st.spinner(f"Uploading {len(uploaded)} file(s)…"):
            results, err = api_upload_files(uploaded)
        if err:
            st.error(f"Upload failed: {err}")
        else:
            st.session_state.upload_results = results
            # Refresh user files list
            if uid:
                _refresh_user_files(uid)
            st.rerun()

    # ── Results table ──
    if st.session_state.upload_results:
        st.markdown("\n", unsafe_allow_html=True)
        st.markdown("Upload Results\n", unsafe_allow_html=True)
        for r in st.session_state.upload_results:
            fname = r.get("filename", r.get("name", ""))
            status = r.get("status", "uploaded")
            # File names are user-supplied, so they are escaped before being
            # rendered with unsafe_allow_html=True.
            st.markdown(
                f"✓ {_esc(fname)} {_esc(status)}",
                unsafe_allow_html=True,
            )

    # ── Existing files ──
    st.markdown("\n", unsafe_allow_html=True)
    st.markdown("Your Uploaded Files\n", unsafe_allow_html=True)
    col_refresh, _ = st.columns([1, 5])
    with col_refresh:
        if st.button("🔄 Refresh List", use_container_width=True) and uid:
            files, err = api_get_user_files(uid)
            if err:
                # Keep the current list on failure; only report the problem.
                st.warning(f"Could not load files: {err}")
            else:
                st.session_state.user_files = files or []

    if not st.session_state.user_files and uid:
        # Auto-load on first visit
        _refresh_user_files(uid)

    if st.session_state.user_files:
        rows = [
            {
                "Filename": f.get("filename", ""),
                "Type": f.get("file_type", ""),
                "Extracted": "✅" if f.get("is_extracted") else "⏳",
                # Keep only the first 19 characters of the timestamp text.
                "Uploaded": str(f.get("uploaded_at", ""))[:19],
            }
            for f in st.session_state.user_files
        ]
        st.dataframe(rows, use_container_width=True, hide_index=True)
    else:
        st.info("No files uploaded yet.")


def _render_extract_tab():
    """Render the extraction tab: file picker, extract action, and result."""
    st.markdown("\n", unsafe_allow_html=True)
    st.markdown("Extract Structured Profile from CV\n", unsafe_allow_html=True)

    # Load files if needed
    uid = _current_user_id()
    if not st.session_state.user_files and uid:
        _refresh_user_files(uid)

    file_options = [
        f["filename"] for f in st.session_state.user_files if f.get("filename")
    ]
    if not file_options:
        st.info("No CVs found. Upload files first in the **Upload CV** tab.")
    else:
        col_sel, col_ex = st.columns([3, 1])
        with col_sel:
            chosen = st.selectbox(
                "Select a CV file to extract",
                options=file_options,
                help="Choose a PDF you have already uploaded.",
            )
        with col_ex:
            st.markdown("", unsafe_allow_html=True)
            do_extract = st.button("🧠 Extract", use_container_width=True)

        if do_extract and chosen:
            with st.spinner(f"Extracting profile from **{chosen}**… this may take a moment."):
                result, err = api_extract_profile(chosen)
            if err:
                st.error(f"Extraction failed: {err}")
                st.session_state.extract_result = None
            else:
                st.session_state.extract_result = result
                # Refresh files to update is_extracted flag
                if uid:
                    _refresh_user_files(uid)
                st.rerun()

    # ── Display extracted profile ──
    if st.session_state.extract_result:
        _render_extracted_profile(st.session_state.extract_result)


def _tag_list(label, items, color="#c7cef5", text_color="#435cdc"):
    """Render *label* followed by the entries of *items*; no-op when empty.

    ``color`` and ``text_color`` are accepted for styling but are not used by
    the markup emitted here.
    """
    if not items:
        return
    if isinstance(items, str):
        items = [items]  # a bare string would otherwise be split per character
    st.markdown(f"{label}\n", unsafe_allow_html=True)
    tags = "".join(_esc(t) for t in items)
    st.markdown(f"{tags}\n", unsafe_allow_html=True)


def _render_extracted_profile(result):
    """Render the extracted profile dict: name, education, experience, tags.

    Every value taken from *result* is escaped, because it is rendered with
    unsafe_allow_html=True.
    """
    st.markdown("\n", unsafe_allow_html=True)

    # Try to highlight key fields
    fullname = result.get("fullname") or result.get("full_name", "")
    if fullname:
        st.markdown(f"👤 {_esc(fullname)}\n", unsafe_allow_html=True)

    col_l, col_r = st.columns(2)
    with col_l:
        st.markdown("Education\n", unsafe_allow_html=True)
        # Education entries are read from the numbered keys 1 through 3.
        for i in range(1, 4):
            univ = result.get(f"univ_edu_{i}", "")
            major = result.get(f"major_edu_{i}", "")
            gpa = result.get(f"gpa_edu_{i}", "")
            if univ or major:
                gpa_str = f" · GPA {_esc(gpa)}" if gpa else ""
                st.markdown(
                    f"{_esc(univ or '—')}\n"
                    f"{_esc(major or '')}{gpa_str}\n",
                    unsafe_allow_html=True,
                )

        st.markdown("Experience\n", unsafe_allow_html=True)
        yoe = result.get("yoe")
        domicile = result.get("domicile", "")
        st.markdown(
            # ``is not None`` keeps a legitimate 0 years from showing as "—".
            f"Years of Experience: {_esc(yoe if yoe is not None else '—')}\n"
            f"Domicile: {_esc(domicile or '—')}\n",
            unsafe_allow_html=True,
        )

    with col_r:
        _tag_list("💻 Hard Skills", result.get("hardskills", []))
        _tag_list("🤝 Soft Skills", result.get("softskills", []), "#f4f6ff", "#333e4a")
        _tag_list("🏆 Certifications", result.get("certifications", []), "#fff3c4", "#a07c00")
        _tag_list("🏢 Business Domains", result.get("business_domain", []), "#c7cef5", "#435cdc")

    # Raw JSON toggle
    with st.expander("📄 Raw JSON response"):
        st.markdown(
            f"{_esc(json.dumps(result, indent=2, default=str))}\n",
            unsafe_allow_html=True,
        )
# ─────────────────────────────────────────────
# ROUTER
# ─────────────────────────────────────────────
# A session holding a token gets the main workspace; any other session gets
# the login form.
_render_page = page_main if st.session_state.token else page_login
_render_page()