Spaces:
Running
Running
| import streamlit as st | |
| try: | |
| from PyPDF2 import PdfReader | |
| except Exception: | |
| PdfReader = None | |
| # Optional AI SDKs | |
| try: | |
| import google.generativeai as genai | |
| except Exception: | |
| genai = None | |
| try: | |
| from langchain_google_genai import ChatGoogleGenerativeAI | |
| from langchain_core.prompts import PromptTemplate | |
| except Exception: | |
| ChatGoogleGenerativeAI = None | |
| PromptTemplate = None | |
| from pydantic import BaseModel, Field | |
| from typing import Optional | |
| import os | |
| # --- Pydantic Models (from your code) --- | |
| class questions(BaseModel): | |
| questions: list[str] = Field(description="List of questions") | |
| class introduction(BaseModel): | |
| intro: Optional[str] = Field(description="Give AI agent's intro") | |
| question: str = Field(description="Question asked by AI agent") | |
| followup: Optional[str] = Field(description="The followup question to user's answer") | |
| class evaluation(BaseModel): | |
| marks: int = Field(description="Marks out of 100") | |
| followup: Optional[str] = Field(description="The followup question") | |
| review: Optional[str] = Field(description="Short Review of the answer") | |
| # --- AI & Logic Functions (from your code) --- | |
| def get_llm(api_key): | |
| """Cached function to initialize the LLM.""" | |
| return ChatGoogleGenerativeAI( | |
| model="gemini-2.5-flash", | |
| temperature=1.0, | |
| google_api_key=api_key | |
| ) | |
| def get_models(_llm_model): | |
| """Cached function to get structured output models.""" | |
| generate_questions_resume_model = _llm_model.with_structured_output(questions) | |
| intro_model = _llm_model.with_structured_output(introduction) | |
| evaluate_answers_model = _llm_model.with_structured_output(evaluation) | |
| return generate_questions_resume_model, intro_model, evaluate_answers_model | |
| def read_resume(uploaded_file): | |
| """Reads a PDF file uploaded via Streamlit.""" | |
| try: | |
| if PdfReader is None: | |
| st.warning("PyPDF2 is not installed; resume text extraction disabled.") | |
| return None | |
| reader = PdfReader(uploaded_file) | |
| text = "" | |
| for page in reader.pages: | |
| text += page.extract_text() or "" | |
| return text | |
| except Exception as e: | |
| st.error(f"Error reading PDF: {e}") | |
| return None | |
| def generate_questions_from_resume(resume_text, model): | |
| """Generates interview questions from resume text.""" | |
| if PromptTemplate is None or model is None or not st.session_state.get('enable_llm', False): | |
| # Simple fallback | |
| questions = ["Tell me about your most significant project.", "Describe a challenging bug you fixed.", "How do you design for scalability?", "Which technologies are you most comfortable with?"] | |
| return questions | |
| parse_resume_prompt_template = PromptTemplate( | |
| template="""Generate 4-8 interview questions about the Experience and Projects section from this given text of from a resume. | |
| Try to cover all projects and experience. Generate some conceptual questions too. Don't generate unnecessary questions. | |
| Resume:\n{text}""", | |
| input_variables=['text'] | |
| ) | |
| try: | |
| if not st.session_state.get('enable_llm', False): | |
| raise RuntimeError('LLM disabled') | |
| generate_question_from_resume_chain = parse_resume_prompt_template | model | |
| output = generate_question_from_resume_chain.invoke({'text': resume_text}) | |
| return getattr(output, 'questions', output) | |
| except Exception as e: | |
| st.warning(f"LLM question generation failed or disabled, using fallback: {e}") | |
| questions = ["Tell me about your most significant project.", "Describe a challenging bug you fixed.", "How do you design for scalability?", "Which technologies are you most comfortable with?"] | |
| return questions | |
| def get_introduction(model): | |
| """Gets the AI's intro and first question.""" | |
| if PromptTemplate is None or model is None or not st.session_state.get('enable_llm', False): | |
| return type('O', (), {'intro': "Hello, I'm Interviewer.AI. Please introduce yourself.", 'question': "Can you briefly introduce yourself?"})() | |
| introduction_prompt = PromptTemplate(template="""Introduce yourself to the user telling the user that you are a AI agent. And ask the user to give introduction""") | |
| try: | |
| if not st.session_state.get('enable_llm', False): | |
| raise RuntimeError('LLM disabled') | |
| intro_chain = introduction_prompt | model | |
| output = intro_chain.invoke({}) | |
| return output | |
| except Exception as e: | |
| st.warning(f"LLM intro generation failed or disabled: {e}") | |
| return type('O', (), {'intro': "Hello, I'm Interviewer.AI. Please introduce yourself.", 'question': "Can you briefly introduce yourself?"})() | |
| def ask_followup(user_intro, model): | |
| """Asks a followup to the user's intro.""" | |
| if PromptTemplate is None or model is None or not st.session_state.get('enable_llm', False): | |
| return "Thanks — could you tell me one achievement you're most proud of?" | |
| intro_followup = PromptTemplate(template="""The user has given the following introduction of himself/herself. Ask a followup about his intro to make the user comfortable. Intro given by the user: {intro}""", | |
| input_variables=['intro']) | |
| try: | |
| if not st.session_state.get('enable_llm', False): | |
| raise RuntimeError('LLM disabled') | |
| followup_chain = intro_followup | model | |
| output = followup_chain.invoke({'intro': user_intro}) | |
| return getattr(output, 'followup', None) | |
| except Exception as e: | |
| st.warning(f"LLM followup generation failed or disabled: {e}") | |
| return "Could you tell me about a specific result from that experience?" | |
| def evaluate_answer(question, answer, model): | |
| """Evaluates the user's answer.""" | |
| if PromptTemplate is None or model is None or not st.session_state.get('enable_llm', False): | |
| # Simple heuristic evaluator | |
| score = 50 | |
| review = "Thank you for your answer. Provide more details next time." | |
| followup = None | |
| if answer and len(answer.split()) > 50: | |
| score = 80 | |
| review = "Good answer — you covered several points." | |
| elif answer and len(answer.split()) > 20: | |
| score = 65 | |
| review = "Decent answer; add more concrete examples." | |
| return type('O', (), {'marks': score, 'review': review, 'followup': followup})() | |
| evaluate_answer_prompt = PromptTemplate(template="""You are given a question and an answer. Evaluate the answer honestly on the question out of 100. | |
| Also generate a very short review on the answer telling the candidate about his answer. If he is wrong but close to the correct answer, give subtle hints. | |
| If a good followup question can be asked generate it but only if it is a genuine question.\nQuestion: {question}\n\n Answer: {answer}""", | |
| input_variables=['question', 'answer']) | |
| try: | |
| if not st.session_state.get('enable_llm', False): | |
| raise RuntimeError('LLM disabled') | |
| evaluate_chain = evaluate_answer_prompt | model | |
| output = evaluate_chain.invoke({'question': question, 'answer': answer}) | |
| return output | |
| except Exception as e: | |
| st.warning(f"LLM evaluation failed or disabled: {e}") | |
| score = 50 | |
| review = "Thank you for your answer. Provide more details next time." | |
| followup = None | |
| if answer and len(answer.split()) > 50: | |
| score = 80 | |
| elif answer and len(answer.split()) > 20: | |
| score = 65 | |
| return type('O', (), {'marks': score, 'review': review, 'followup': followup})() | |
| # --- MODIFIED Streamlit Audio/Visual Function --- | |
| def text_to_speech_and_display(text, autoplay=True): | |
| """ | |
| MODIFIED: This function no longer plays audio. | |
| It just displays the text in the chat history. | |
| """ | |
| if not text: | |
| return | |
| try: | |
| if 'chat_history' not in st.session_state: | |
| st.session_state.chat_history = [] | |
| st.session_state.chat_history.append(f"**Interviewer:** {text}") | |
| except Exception as e: | |
| st.error(f"Error in text_to_speech_and_display: {e}") | |
| # --- DELETED speech_to_text function --- | |
| # We are replacing it with a text_input | |
| # --- Main Streamlit App --- | |
| st.set_page_config(page_title="AI Interviewer", layout="wide") | |
| st.title("Interviewer.AI") | |
| # Initialize LLM and models | |
| llm = None | |
| gen_q_model = None | |
| intro_model = None | |
| eval_model = None | |
| # First, load the key from the environment variable if genai is available | |
| if genai is None or ChatGoogleGenerativeAI is None: | |
| st.warning("Google GenAI or LangChain wrappers not available. App will use deterministic fallbacks.") | |
| if 'enable_llm' not in st.session_state: | |
| st.session_state.enable_llm = False | |
| GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY") | |
| api_key_exists = bool(GOOGLE_API_KEY) | |
| if not api_key_exists: | |
| st.warning("⚠️ GOOGLE_API_KEY not found in environment variables.") | |
| st.info("Add GOOGLE_API_KEY to your Hugging Face Space secrets to enable AI features.") | |
| # LLM Enable Checkbox | |
| enable_llm_checkbox = st.checkbox( | |
| "Enable LLM features (requires GOOGLE_API_KEY)", | |
| value=st.session_state.enable_llm, | |
| disabled=not api_key_exists, | |
| help="AI-powered question generation and evaluation" | |
| ) | |
| st.session_state.enable_llm = enable_llm_checkbox | |
| # Initialize LLM if enabled | |
| if st.session_state.enable_llm and api_key_exists: | |
| try: | |
| genai.configure(api_key=GOOGLE_API_KEY) | |
| llm = get_llm(GOOGLE_API_KEY) | |
| gen_q_model, intro_model, eval_model = get_models(llm) | |
| st.success("✅ LLM features enabled successfully") | |
| except Exception as e: | |
| st.error(f"❌ Could not initialize LLM: {e}") | |
| st.info("Check your API key and try again.") | |
| st.session_state.enable_llm = False | |
| llm = None | |
| gen_q_model = None | |
| intro_model = None | |
| eval_model = None | |
| # Test API Button (AFTER initialization) | |
| if st.button("Test Google API Connection"): | |
| if not st.session_state.enable_llm: | |
| st.error("❌ LLM features are not enabled. Check the checkbox above first.") | |
| elif llm is None: | |
| st.error("❌ LLM is not initialized. Check API key configuration.") | |
| else: | |
| try: | |
| with st.spinner("Testing API connection..."): | |
| test_response = llm.invoke("Say 'Hello' if you can hear me.") | |
| st.success("✅ SUCCESS! API is working correctly.") | |
| st.info(f"Response: {test_response.content if hasattr(test_response, 'content') else str(test_response)}") | |
| except Exception as e: | |
| st.error(f"❌ API call FAILED with error: {e}") | |
| st.info("This usually means: invalid API key, quota exceeded, or network issues.") | |
| st.divider() | |
| # --- Session State Initialization --- | |
| if 'stage' not in st.session_state: | |
| st.session_state.stage = 'start' | |
| if 'chat_history' not in st.session_state: | |
| st.session_state.chat_history = [] | |
| if 'questions' not in st.session_state: | |
| st.session_state.questions = [] | |
| if 'q_index' not in st.session_state: | |
| st.session_state.q_index = 0 | |
| if 'current_question' not in st.session_state: | |
| st.session_state.current_question = "" | |
| if 'total_marks' not in st.session_state: | |
| st.session_state.total_marks = 0 | |
| if 'num_questions' not in st.session_state: | |
| st.session_state.num_questions = 0 | |
| # --- App Logic (State Machine) --- | |
| # --- STAGE 0: Start (File Upload) --- | |
| if st.session_state.stage == 'start': | |
| st.info("Welcome! Please upload your resume (PDF) to begin the interview.") | |
| uploaded_file = st.file_uploader("Upload your Resume (PDF)", type=["pdf"]) | |
| if uploaded_file: | |
| with st.spinner("Analyzing your resume... This may take a moment."): | |
| try: | |
| resume_text = read_resume(uploaded_file) | |
| if not resume_text: | |
| st.error("Could not extract text from the resume. Please try another file.") | |
| st.session_state.stage = 'start' | |
| else: | |
| # 1. Generate Questions | |
| st.session_state.questions = generate_questions_from_resume(resume_text, gen_q_model) | |
| if not st.session_state.questions: | |
| st.warning("No AI-generated questions returned; using fallback questions.") | |
| st.session_state.questions = generate_questions_from_resume(resume_text, None) | |
| # 2. Get AI Introduction | |
| intro_output = get_introduction(intro_model) | |
| st.session_state.current_question = getattr(intro_output, 'question', "Can you introduce yourself?") | |
| # 3. Move to next stage and display intro | |
| st.session_state.stage = 'awaiting_intro' | |
| # --- MODIFIED: Display text directly --- | |
| text_to_speech_and_display(getattr(intro_output, 'intro', "Hello, I'm Interviewer.AI. Please introduce yourself.")) | |
| text_to_speech_and_display(getattr(intro_output, 'question', "Can you introduce yourself?")) | |
| st.rerun() | |
| except Exception as e: | |
| st.error(f"An error occurred while processing the resume. Using fallback behaviour. Error: {e}") | |
| fallback_qs = ["Tell me about your most significant project.", "Describe a challenging bug you fixed.", "How do you design for scalability?", "Which technologies are you most comfortable with?"] | |
| st.session_state.questions = fallback_qs | |
| st.session_state.stage = 'asking_question' | |
| st.rerun() | |
| # --- Main Interview Area (Stages > 0) --- | |
| if st.session_state.stage != 'start': | |
| # --- Chat History Display --- | |
| st.subheader("Interview Transcript") | |
| chat_container = st.container(height=400) # Added height for scrolling | |
| with chat_container: | |
| for entry in st.session_state.chat_history: | |
| st.markdown(entry) | |
| try: | |
| st.divider() | |
| except Exception: | |
| st.markdown('---') | |
| # --- End Interview Button --- | |
| if st.button("End Interview", type="primary"): | |
| st.session_state.stage = 'finished' | |
| st.rerun() | |
| # --- REPLACEMENT: Text Input Area --- | |
| user_text = None # Initialize user_text | |
| is_disabled = (st.session_state.stage == 'finished') | |
| with st.form(key="answer_form", clear_on_submit=True): | |
| answer = st.text_input("Your answer:", disabled=is_disabled) | |
| submit_button = st.form_submit_button(label="Submit Answer", disabled=is_disabled) | |
| if submit_button and answer: | |
| user_text = answer | |
| st.session_state.chat_history.append(f"**You:** {user_text}") | |
| # --- END OF REPLACEMENT --- | |
| # --- Process Submitted Text --- | |
| if user_text: | |
| # --- STAGE 1: Process User's Introduction --- | |
| if st.session_state.stage == 'awaiting_intro': | |
| with st.spinner("Thinking of a followup..."): | |
| followup = ask_followup(user_text, intro_model) | |
| st.session_state.current_question = followup | |
| text_to_speech_and_display(followup) # This now just displays text | |
| st.session_state.stage = 'awaiting_intro_followup' | |
| st.rerun() | |
| # --- STAGE 2: Process Followup to Intro --- | |
| elif st.session_state.stage == 'awaiting_intro_followup': | |
| text_to_speech_and_display("OK, Great. Let's start the interview with questions from your resume.") | |
| st.session_state.stage = 'asking_question' # Move to main questions | |
| st.rerun() | |
| # --- STAGE 4: Process Answer to a Main Question --- | |
| elif st.session_state.stage == 'awaiting_answer': | |
| with st.spinner("Evaluating your answer..."): | |
| question_asked = st.session_state.current_question | |
| output = evaluate_answer(question_asked, user_text, eval_model) | |
| st.session_state.total_marks += output.marks | |
| st.session_state.num_questions += 1 | |
| if output.review: | |
| text_to_speech_and_display(output.review) # This now just displays text | |
| if output.followup: | |
| st.session_state.current_question = output.followup | |
| text_to_speech_and_display(output.followup) # This now just displays text | |
| st.session_state.stage = 'awaiting_followup_answer' | |
| else: | |
| st.session_state.q_index += 1 | |
| st.session_state.stage = 'asking_question' | |
| st.rerun() | |
| # --- STAGE 5: Process Answer to a Followup Question --- | |
| elif st.session_state.stage == 'awaiting_followup_answer': | |
| with st.spinner("Evaluating your answer..."): | |
| question_asked = st.session_state.current_question | |
| output = evaluate_answer(question_asked, user_text, eval_model) | |
| st.session_state.total_marks += output.marks | |
| st.session_state.num_questions += 1 | |
| if output.review: | |
| text_to_speech_and_display(output.review) # This now just displays text | |
| st.session_state.q_index += 1 | |
| st.session_state.stage = 'asking_question' | |
| st.rerun() | |
| # --- STAGE 3: Ask a New Question --- | |
| # This runs when the page loads into this state, *before* user input | |
| if st.session_state.stage == 'asking_question': | |
| if st.session_state.q_index < len(st.session_state.questions): | |
| question = st.session_state.questions[st.session_state.q_index] | |
| st.session_state.current_question = question | |
| text_to_speech_and_display(question) # This now just displays text | |
| st.session_state.stage = 'awaiting_answer' | |
| else: | |
| text_to_speech_and_display("That's all the questions I have. Thank you!") | |
| st.session_state.stage = 'finished' | |
| st.rerun() | |
| # --- STAGE 6: Finished --- | |
| if st.session_state.stage == 'finished': | |
| st.balloons() | |
| st.success("Interview Complete!") | |
| final_score = 0 | |
| if st.session_state.num_questions > 0: | |
| final_score = st.session_state.total_marks / st.session_state.num_questions | |
| st.subheader("Final Report") | |
| st.markdown(f"**Total Questions Answered:** {st.session_state.num_questions}") | |
| st.markdown(f"**Average Score:** {final_score:.2f} / 100") | |
| # Transcript is already shown above, but we can show it again | |
| st.subheader("Full Transcript") | |
| for entry in st.session_state.chat_history: | |
| st.markdown(entry) | |
| if st.button("Start New Interview"): | |
| for key in st.session_state.keys(): | |
| del st.session_state[key] | |
| st.rerun() |