"""Streamlit front end for Candidate Explorer: sign in, upload CVs, extract profiles.

NOTE(review): most ``st.markdown(..., unsafe_allow_html=True)`` literals in this
file hold plain text only and the global CSS literal is blank. The markup looks
like it was lost before this review -- confirm against the deployed copy. The
visible text of every literal is preserved here.
"""

import html
import json
import os
from urllib.parse import quote

import dotenv
import requests
import streamlit as st

# Load .env before BACKEND_BASE_URL is read below.
dotenv.load_dotenv()

# ─────────────────────────────────────────────
# CONFIG
# ─────────────────────────────────────────────
BASE_URL = os.environ.get("BACKEND_BASE_URL", "http://localhost:8000")

st.set_page_config(
    page_title="Candidate Explorer",
    page_icon="🔍",
    layout="wide",
    initial_sidebar_state="expanded",
)

# ─────────────────────────────────────────────
# GLOBAL CSS
# ─────────────────────────────────────────────
st.markdown(
    """ """,
    unsafe_allow_html=True,
)

# ─────────────────────────────────────────────
# SESSION STATE INIT
# ─────────────────────────────────────────────
# Single source of truth for both first-run initialisation and logout.
_SESSION_DEFAULTS = {
    "token": None,
    "user": None,
    "upload_results": [],
    "extract_result": None,
    "user_files": [],
    # True once the file list has been fetched successfully, so an empty
    # workspace does not trigger a backend call on every rerun.
    "user_files_loaded": False,
}


def _reset_session(only_missing=False):
    """Write the default value for every session key.

    With ``only_missing=True`` existing keys are left untouched (first-run
    initialisation); otherwise every key is overwritten (logout).
    """
    for key, default in _SESSION_DEFAULTS.items():
        if only_missing and key in st.session_state:
            continue
        # Copy list defaults so session keys never share one list object.
        st.session_state[key] = list(default) if isinstance(default, list) else default


_reset_session(only_missing=True)


# ─────────────────────────────────────────────
# API HELPERS
# ─────────────────────────────────────────────
_CONNECTION_ERROR = "Cannot connect to backend. Check BACKEND_BASE_URL."


def _headers():
    """Return the Authorization header built from the session token."""
    return {"Authorization": f"Bearer {st.session_state.token}"}


def _esc(value) -> str:
    """HTML-escape *value* for interpolation into unsafe_allow_html markdown."""
    return html.escape(str(value), quote=True)


def _field(payload, key, default=None):
    """Return ``payload[key]`` if payload is a dict and the value is not None."""
    if isinstance(payload, dict):
        value = payload.get(key)
        if value is not None:
            return value
    return default


def _error_detail(resp) -> str:
    """Return a non-empty error message for a failed response.

    Prefers the JSON ``detail`` field, then the raw body, then the status
    code. The result is never empty, because callers treat a falsy error as
    success.
    """
    try:
        body = resp.json()
    except ValueError:
        body = None
    detail = _field(body, "detail")
    if detail:
        return str(detail)
    return resp.text or f"HTTP {resp.status_code}"


def _call_api(method, path, *, ok_status=200, auth=True, timeout=10,
              expect_json=True, **kwargs):
    """Send one request to the backend.

    Returns ``(payload, None)`` when the response status equals *ok_status*
    (payload is the decoded JSON, or ``None`` when ``expect_json`` is False),
    otherwise ``(None, error_message)``. Extra keyword arguments are passed to
    ``requests.request``.
    """
    try:
        resp = requests.request(
            method,
            f"{BASE_URL}{path}",
            headers=_headers() if auth else None,
            timeout=timeout,
            **kwargs,
        )
    except requests.exceptions.ConnectionError:
        return None, _CONNECTION_ERROR
    except requests.exceptions.RequestException as exc:
        return None, str(exc) or type(exc).__name__

    if resp.status_code != ok_status:
        return None, _error_detail(resp)
    if not expect_json:
        return None, None
    try:
        return resp.json(), None
    except ValueError:
        return None, f"Backend returned a non-JSON response (HTTP {resp.status_code})."


def api_login(email: str, password: str):
    """POST /admin/login — returns (token, error_msg)"""
    payload, err = _call_api(
        "POST",
        "/admin/login",
        auth=False,
        timeout=15,
        data={"username": email, "password": password},
    )
    if err:
        return None, err
    token = _field(payload, "access_token")
    if not token:
        # A 200 without a token would otherwise look like a silent failed login.
        return None, "Login response did not include an access token."
    return token, None


def api_get_me():
    """GET /admin/me — returns (user dict, error_msg)"""
    return _call_api("GET", "/admin/me")


def api_get_scorecard():
    """GET /file/score_card — returns (data dict, error_msg)"""
    payload, err = _call_api("GET", "/file/score_card")
    if err:
        return None, err
    return _field(payload, "data", {}), None


def api_upload_files(uploaded_files):
    """POST /file/upload — returns (list of results, error_msg)"""
    files = [
        ("files", (f.name, f.read(), "application/pdf"))
        for f in uploaded_files
    ]
    payload, err = _call_api(
        "POST", "/file/upload", ok_status=201, timeout=60, files=files
    )
    if err:
        return None, err
    return _field(payload, "files", []), None


def api_get_user_files(user_id: str):
    """GET /file/user/{user_id} — returns (list of file dicts, error_msg)"""
    payload, err = _call_api("GET", f"/file/user/{quote(str(user_id), safe='')}")
    if err:
        return None, err
    return _field(payload, "files", []), None


def api_extract_profile(filename: str):
    """POST /profile/extract_profile?filename=... — returns (profile dict, error_msg)"""
    return _call_api(
        "POST",
        "/profile/extract_profile",
        timeout=120,
        params={"filename": filename},
    )


def api_delete_file(filename: str):
    """DELETE /file/{filename} — returns (ok, error_msg)"""
    # The filename is percent-encoded so '?', '#' or '%' cannot change the URL.
    _, err = _call_api(
        "DELETE",
        f"/file/{quote(filename, safe='')}",
        timeout=15,
        expect_json=False,
    )
    return err is None, err


# ─────────────────────────────────────────────
# UI COMPONENTS
# ─────────────────────────────────────────────
def render_scorecard():
    """Render the three dashboard counters from GET /file/score_card."""
    sc, err = api_get_scorecard()
    if err or not isinstance(sc, dict):
        st.warning(f"Could not load scorecard: {err or 'unexpected response'}")
        return

    total_file = _esc(sc.get("total_file", 0))
    total_extracted = _esc(sc.get("total_extracted", 0))

    # percent_extracted may be a float like 75.0 or string "75%"
    pct = sc.get("percent_extracted", 0)
    if isinstance(pct, (int, float)):
        pct_display = f"{pct:.1f}%"
    elif pct is None:
        pct_display = "0.0%"
    else:
        pct_display = _esc(pct)

    st.markdown(
        f"""
📁 Total CVs Uploaded
{total_file}
files in your workspace
✅ Profiles Extracted
{total_extracted}
structured profiles
📊 Extraction Rate
{pct_display}
of uploaded CVs processed
""",
        unsafe_allow_html=True,
    )


def render_sidebar():
    """Render the signed-in user's card and the logout button."""
    user = st.session_state.user or {}
    full_name = _esc(user.get("full_name") or "User")
    email = _esc(user.get("email") or "")
    role = _esc(str(user.get("role") or "user").upper())

    with st.sidebar:
        st.markdown(
            f"""
👤
{full_name}
{email}
{role}

""",
            unsafe_allow_html=True,
        )
        if st.button("🚪 Logout", use_container_width=True):
            _reset_session()
            st.rerun()


def _current_user_id() -> str:
    """Return the session user's id as a string, or "" when unknown."""
    user = st.session_state.user or {}
    raw = user.get("user_id")
    return "" if raw is None else str(raw)


def _refresh_user_files(uid: str):
    """Reload the user's file list into session state.

    Returns an error message, or ``None`` on success or when *uid* is empty.
    On failure the previously loaded list is kept.
    """
    if not uid:
        return None
    files, err = api_get_user_files(uid)
    if err:
        return err
    st.session_state.user_files = files or []
    st.session_state.user_files_loaded = True
    return None


def _render_tag_list(label, items, color="#c7cef5", text_color="#435cdc"):
    """Render *items* as a labelled row of coloured tags; no-op when empty."""
    if not items:
        return
    if isinstance(items, str):
        # A bare string would otherwise be iterated character by character.
        items = [items]
    st.markdown(f"\n{label}\n", unsafe_allow_html=True)
    # NOTE(review): the original tag markup is not visible; this minimal span
    # uses the colour parameters as they appear intended -- confirm styling.
    tags = " ".join(
        f'<span style="background:{color};color:{text_color}">{_esc(tag)}</span>'
        for tag in items
    )
    st.markdown(f"\n{tags}\n", unsafe_allow_html=True)


# ─────────────────────────────────────────────
# PAGES
# ─────────────────────────────────────────────
def page_login():
    """Render the sign-in form and store token and user on success."""
    # Center the login card
    _, col, _ = st.columns([1, 1.4, 1])
    with col:
        st.markdown(
            """

🔍 Candidate Explorer

Sign in to manage and analyze candidate CVs.

""",
            unsafe_allow_html=True,
        )

        with st.form("login_form", clear_on_submit=False):
            st.markdown("\nSign In\n", unsafe_allow_html=True)
            email = st.text_input("Email address", placeholder="you@company.com")
            password = st.text_input(
                "Password", type="password", placeholder="••••••••"
            )
            submitted = st.form_submit_button("Sign In →", use_container_width=True)

        if not submitted:
            return
        if not email or not password:
            st.error("Please enter both email and password.")
            return

        with st.spinner("Signing in…"):
            token, err = api_login(email, password)
        if err:
            st.error(f"Login failed: {err}")
            return

        st.session_state.token = token
        user, err2 = api_get_me()
        if err2 or not isinstance(user, dict):
            # Fall back to a minimal profile so the UI still renders.
            user = {"email": email, "full_name": email, "role": "user"}
        st.session_state.user = user
        st.rerun()


def _render_upload_tab(uid: str):
    """Render the upload form, last upload results and the file table."""
    st.markdown("\n", unsafe_allow_html=True)
    st.markdown("\nUpload Candidate CVs\n", unsafe_allow_html=True)

    uploaded = st.file_uploader(
        "Drop PDF files here or click to browse",
        type=["pdf"],
        accept_multiple_files=True,
        help="Only PDF files are accepted.",
    )

    col_btn, _ = st.columns([1, 3])
    with col_btn:
        do_upload = st.button(
            "⬆️ Upload", use_container_width=True, disabled=not uploaded
        )

    if do_upload and uploaded:
        with st.spinner(f"Uploading {len(uploaded)} file(s)…"):
            results, err = api_upload_files(uploaded)
            if not err:
                # Refresh user files list
                _refresh_user_files(uid)
        if err:
            st.error(f"Upload failed: {err}")
        else:
            st.session_state.upload_results = results
            st.rerun()

    # ── Results table ──
    if st.session_state.upload_results:
        st.markdown("\n", unsafe_allow_html=True)
        st.markdown("\nUpload Results\n", unsafe_allow_html=True)
        for entry in st.session_state.upload_results:
            fname = entry.get("filename", entry.get("name", ""))
            status = str(entry.get("status", "uploaded"))
            succeeded = "success" in status.lower() or status == "uploaded"
            badge_cls = "badge-success" if succeeded else "badge-error"
            # NOTE(review): badge_cls was computed but unused in the visible
            # source; applied here as a span class -- confirm intended markup.
            st.markdown(
                f' {_esc(fname)} <span class="{badge_cls}">{_esc(status)}</span>',
                unsafe_allow_html=True,
            )

    # ── Existing files ──
    st.markdown("\n", unsafe_allow_html=True)
    st.markdown("\nYour Uploaded Files\n", unsafe_allow_html=True)

    col_refresh, _ = st.columns([1, 5])
    with col_refresh:
        if st.button("🔄 Refresh List", use_container_width=True):
            err = _refresh_user_files(uid)
            if err:
                st.warning(f"Could not load files: {err}")

    if st.session_state.user_files:
        rows = [
            {
                "Filename": f.get("filename", ""),
                "Type": f.get("file_type", ""),
                "Extracted": "✅" if f.get("is_extracted") else "⏳",
                "Uploaded": str(f.get("uploaded_at", ""))[:19],
            }
            for f in st.session_state.user_files
        ]
        st.dataframe(rows, use_container_width=True, hide_index=True)
    else:
        st.info("No files uploaded yet.")


def _render_profile(result):
    """Render one extracted profile: headline, education, experience, tags."""
    raw_json = json.dumps(result, indent=2, default=str)
    if not isinstance(result, dict):
        # Unexpected payload shape: show it verbatim rather than crash on .get.
        st.code(raw_json, language="json")
        return

    # Try to highlight key fields
    fullname = result.get("fullname") or result.get("full_name", "")
    if fullname:
        st.markdown(f"\n👤 {_esc(fullname)}\n", unsafe_allow_html=True)

    col_l, col_r = st.columns(2)

    with col_l:
        st.markdown("\nEducation\n", unsafe_allow_html=True)
        for i in range(1, 4):
            univ = result.get(f"univ_edu_{i}", "")
            major = result.get(f"major_edu_{i}", "")
            gpa = result.get(f"gpa_edu_{i}", "")
            if not (univ or major):
                continue
            gpa_str = f" · GPA {_esc(gpa)}" if gpa else ""
            st.markdown(
                f"\n{_esc(univ or '—')}\n{_esc(major or '')}{gpa_str}\n",
                unsafe_allow_html=True,
            )

        st.markdown("\nExperience\n", unsafe_allow_html=True)
        yoe = result.get("yoe")
        yoe_text = "—" if yoe is None else _esc(yoe)
        domicile = _esc(result.get("domicile") or "—")
        st.markdown(
            f"\nYears of Experience: {yoe_text}\nDomicile: {domicile}\n",
            unsafe_allow_html=True,
        )

    with col_r:
        _render_tag_list("💻 Hard Skills", result.get("hardskills", []))
        _render_tag_list(
            "🤝 Soft Skills", result.get("softskills", []), "#f4f6ff", "#333e4a"
        )
        _render_tag_list(
            "🏆 Certifications",
            result.get("certifications", []),
            "#fff3c4",
            "#a07c00",
        )
        _render_tag_list(
            "🏢 Business Domains",
            result.get("business_domain", []),
            "#c7cef5",
            "#435cdc",
        )

    # Raw JSON toggle
    with st.expander("📄 Raw JSON response"):
        # st.code shows the text verbatim, so no HTML in the payload is rendered.
        st.code(raw_json, language="json")


def _render_extract_tab(uid: str):
    """Render the file picker, run extraction and show the latest result."""
    st.markdown("\n", unsafe_allow_html=True)
    st.markdown("\nExtract Structured Profile from CV\n", unsafe_allow_html=True)

    file_options = [
        f.get("filename", "")
        for f in st.session_state.user_files
        if f.get("filename")
    ]
    if not file_options:
        st.info("No CVs found. Upload files first in the **Upload CV** tab.")
        return

    col_sel, col_ex = st.columns([3, 1])
    with col_sel:
        chosen = st.selectbox(
            "Select a CV file to extract",
            options=file_options,
            help="Choose a PDF you have already uploaded.",
        )
    with col_ex:
        st.markdown("\n", unsafe_allow_html=True)
        do_extract = st.button("🧠 Extract", use_container_width=True)

    if do_extract and chosen:
        with st.spinner(
            f"Extracting profile from **{chosen}**… this may take a moment."
        ):
            result, err = api_extract_profile(chosen)
            if not err:
                # Refresh files to update is_extracted flag
                _refresh_user_files(uid)
        if err:
            st.error(f"Extraction failed: {err}")
            st.session_state.extract_result = None
        else:
            st.session_state.extract_result = result
            st.rerun()

    # ── Display extracted profile ──
    if st.session_state.extract_result:
        st.markdown("\n", unsafe_allow_html=True)
        _render_profile(st.session_state.extract_result)


def page_main():
    """Render the signed-in workspace: sidebar, scorecard and both tabs."""
    render_sidebar()

    # ── Page header ──────────────────────────────
    st.markdown(
        "\n\n"
        "🔍 Candidate Explorer"
        "\n\n"
        "\n\n"
        "Upload CVs, extract candidate profiles, and track your workspace.\n\n",
        unsafe_allow_html=True,
    )

    # ── Scorecard ─────────────────────────────────
    st.markdown("\n📊 Dashboard Overview\n", unsafe_allow_html=True)
    render_scorecard()
    st.markdown("\n", unsafe_allow_html=True)

    # Load the file list once per session; both tabs read it from state.
    uid = _current_user_id()
    if uid and not st.session_state.user_files_loaded:
        _refresh_user_files(uid)

    # ── Tabs ──────────────────────────────────────
    tab_upload, tab_extract = st.tabs(["📁 Upload CV", "🧠 Extract Profile"])
    with tab_upload:
        _render_upload_tab(uid)
    with tab_extract:
        _render_extract_tab(uid)


# ─────────────────────────────────────────────
# ROUTER
# ─────────────────────────────────────────────
if st.session_state.token:
    page_main()
else:
    page_login()