import streamlit as st try: from PyPDF2 import PdfReader except Exception: PdfReader = None # Optional AI SDKs try: import google.generativeai as genai except Exception: genai = None try: from langchain_google_genai import ChatGoogleGenerativeAI from langchain_core.prompts import PromptTemplate except Exception: ChatGoogleGenerativeAI = None PromptTemplate = None from pydantic import BaseModel, Field from typing import Optional import os try: from google.cloud import speech except Exception: speech = None try: from google.cloud import texttospeech except Exception: texttospeech = None try: from streamlit_mic_recorder import mic_recorder except Exception: mic_recorder = None # --- Pydantic Models (from your code) --- class questions(BaseModel): questions: list[str] = Field(description="List of questions") class introduction(BaseModel): intro: Optional[str] = Field(description="Give AI agent's intro") question: str = Field(description="Question asked by AI agent") followup: Optional[str] = Field(description="The followup question to user's answer") class evaluation(BaseModel): marks: int = Field(description="Marks out of 100") followup: Optional[str] = Field(description="The followup question") review: Optional[str] = Field(description="Short Review of the answer") # --- AI & Logic Functions (from your code) --- @st.cache_resource def get_llm(api_key): """Cached function to initialize the LLM.""" return ChatGoogleGenerativeAI( model="gemini-2.5-flash", temperature=1.0, google_api_key=api_key ) @st.cache_resource def get_models(_llm_model): """Cached function to get structured output models.""" generate_questions_resume_model = _llm_model.with_structured_output(questions) intro_model = _llm_model.with_structured_output(introduction) evaluate_answers_model = _llm_model.with_structured_output(evaluation) return generate_questions_resume_model, intro_model, evaluate_answers_model # def read_resume(uploaded_file): # """Reads a PDF file uploaded via Streamlit.""" # try: # if PdfReader is None: # st.warning("PyPDF2 is not installed; resume text extraction disabled.") # return None # reader = PdfReader(uploaded_file) # text = "" # for page in reader.pages: # text += page.extract_text() or "" # return text # except Exception as e: # st.error(f"Error reading PDF: {e}") # return None def generate_questions_from_resume(resume_text, model): """Generates interview questions from resume text.""" if PromptTemplate is None or model is None or not st.session_state.get('enable_llm', False): # Simple fallback questions = ["Tell me about your most significant project.", "Describe a challenging bug you fixed.", "How do you design for scalability?", "Which technologies are you most comfortable with?"] return questions parse_resume_prompt_template = PromptTemplate( template="""Generate 4-8 interview questions about the Experience and Projects section from this given text of from a resume. Try to cover all projects and experience. Generate some conceptual questions too. Don't generate unnecessary questions. Resume:\n{text}""", input_variables=['text'] ) try: if not st.session_state.get('enable_llm', False): raise RuntimeError('LLM disabled') generate_question_from_resume_chain = parse_resume_prompt_template | model output = generate_question_from_resume_chain.invoke({'text': resume_text}) return getattr(output, 'questions', output) except Exception as e: st.warning(f"LLM question generation failed or disabled, using fallback: {e}") questions = ["Tell me about your most significant project.", "Describe a challenging bug you fixed.", "How do you design for scalability?", "Which technologies are you most comfortable with?"] return questions def get_introduction(model): """Gets the AI's intro and first question.""" if PromptTemplate is None or model is None or not st.session_state.get('enable_llm', False): return type('O', (), {'intro': "Hello, I'm Interviewer.AI. Please introduce yourself.", 'question': "Can you briefly introduce yourself?"})() introduction_prompt = PromptTemplate(template="""Introduce yourself to the user telling the user that you are Heisenberg, an AI agent. And ask the user to give introduction""") try: if not st.session_state.get('enable_llm', False): raise RuntimeError('LLM disabled') intro_chain = introduction_prompt | model output = intro_chain.invoke({}) return output except Exception as e: st.warning(f"LLM intro generation failed or disabled: {e}") return type('O', (), {'intro': "Hello, I'm Interviewer.AI. Please introduce yourself.", 'question': "Can you briefly introduce yourself?"})() def ask_followup(user_intro, model): """Asks a followup to the user's intro.""" if PromptTemplate is None or model is None or not st.session_state.get('enable_llm', False): return "Thanks — could you tell me one achievement you're most proud of?" intro_followup = PromptTemplate(template="""The user has given the following introduction of himself/herself. Ask a followup about his intro to make the user comfortable. Intro given by the user: {intro}""", input_variables=['intro']) try: if not st.session_state.get('enable_llm', False): raise RuntimeError('LLM disabled') followup_chain = intro_followup | model output = followup_chain.invoke({'intro': user_intro}) return getattr(output, 'followup', None) except Exception as e: st.warning(f"LLM followup generation failed or disabled: {e}") return "Could you tell me about a specific result from that experience?" def evaluate_answer(question, answer, model): """Evaluates the user's answer.""" if PromptTemplate is None or model is None or not st.session_state.get('enable_llm', False): # Simple heuristic evaluator score = 50 review = "Thank you for your answer. Provide more details next time." followup = None if answer and len(answer.split()) > 50: score = 80 review = "Good answer — you covered several points." elif answer and len(answer.split()) > 20: score = 65 review = "Decent answer; add more concrete examples." return type('O', (), {'marks': score, 'review': review, 'followup': followup})() evaluate_answer_prompt = PromptTemplate(template="""You are given a question and an answer. Evaluate the answer honestly on the question out of 100. Also generate a very short review on the answer telling the candidate about his answer. If he is wrong but close to the correct answer, give subtle hints. If a good followup question can be asked generate it but only if it is a genuine question.\nQuestion: {question}\n\n Answer: {answer}""", input_variables=['question', 'answer']) try: if not st.session_state.get('enable_llm', False): raise RuntimeError('LLM disabled') evaluate_chain = evaluate_answer_prompt | model output = evaluate_chain.invoke({'question': question, 'answer': answer}) return output except Exception as e: st.warning(f"LLM evaluation failed or disabled: {e}") score = 50 review = "Thank you for your answer. Provide more details next time." followup = None if answer and len(answer.split()) > 50: score = 80 elif answer and len(answer.split()) > 20: score = 65 return type('O', (), {'marks': score, 'review': review, 'followup': followup})() # --- MODIFIED Streamlit Audio/Visual Function --- import io # Make sure 'import io' is at the top of your file @st.cache_data def speech_to_text(audio_bytes): """ Transcribes audio bytes using Google Cloud Speech-to-Text and returns the transcribed text. """ if speech is None: st.warning("google-cloud-speech library not found, transcription is disabled.") return None # Get the API key from the environment (where HF secrets put it) api_key = os.environ.get("GOOGLE_API_KEY") # Check if the key exists if not api_key: st.error("GOOGLE_API_KEY not found in secrets. Cannot initialize STT.") return None # Pass the key explicitly to the client client_options = {"api_key": api_key} client = speech.SpeechClient(client_options=client_options) # Configure the audio # Note: streamlit-mic-recorder outputs WAV, which is LINEAR16 audio = speech.RecognitionAudio(content=audio_bytes) config = speech.RecognitionConfig( encoding=speech.RecognitionConfig.AudioEncoding.ENCODING_UNSPECIFIED, language_code="en-US", sample_rate_hertz=48000 # This is a common sample rate ) try: # Detects speech in the audio file st.info("Transcribing audio... (this may take a moment)") response = client.recognize(config=config, audio=audio) if response.results: transcript = response.results[0].alternatives[0].transcript st.session_state.chat_history.append(f"**You:** {transcript}") return transcript else: st.warning("Could not understand audio.") return None except Exception as e: st.error(f"Error during speech-to-text: {e}") st.info("This usually means the 'Cloud Speech-to-Text API' is not enabled or your mic is not outputting the correct audio format.") return None # --- REPLACED: Official Google Cloud TTS Function --- @st.cache_data def synthesize_speech(text): """ Synthesizes speech from the given text using Google Cloud TTS and returns the audio content as bytes. """ if texttospeech is None: st.warning("google-cloud-texttospeech library not found, audio playback is disabled.") return None # --- START OF FIX --- # Get the API key from the environment (where HF secrets put it) api_key = os.environ.get("GOOGLE_API_KEY") # Check if the key exists if not api_key: st.error("GOOGLE_API_KEY not found in secrets. Cannot initialize TTS.") return None # Pass the key explicitly to the client client_options = {"api_key": api_key} client = texttospeech.TextToSpeechClient(client_options=client_options) # --- END OF FIX --- # Set the text input to be synthesized synthesis_input = texttospeech.SynthesisInput(text=text) # Build the voice request voice = texttospeech.VoiceSelectionParams( language_code="en-US", ssml_gender=texttospeech.SsmlVoiceGender.NEUTRAL ) # Select the type of audio file you want audio_config = texttospeech.AudioConfig( audio_encoding=texttospeech.AudioEncoding.MP3 ) # Perform the text-to-speech request response = client.synthesize_speech( input=synthesis_input, voice=voice, audio_config=audio_config ) return response.audio_content def text_to_speech_and_display(text, autoplay=True): """ Displays the text and plays the synthesized audio. """ if not text: return try: # 1. Display the caption in chat if 'chat_history' not in st.session_state: st.session_state.chat_history = [] st.session_state.chat_history.append(f"**Interviewer:** {text}") # 2. Synthesize speech if not st.session_state.get('audio_enabled', False): return audio_content = synthesize_speech(text) # 3. Display audio player if audio_content: st.audio(audio_content, format='audio/mp3', autoplay=autoplay) else: st.info("Audio generation is disabled or failed.") except Exception as e: # This will catch any API errors (like 403, 404, etc.) st.error(f"Error during text-to-speech: {e}") st.info("This usually means the 'Cloud Text-to-Speech API' is not enabled in your Google Cloud project.") # --- END OF REPLACEMENT --- # We are replacing it with a text_input # --- Main Streamlit App --- st.set_page_config(page_title="AI Interviewer", layout="wide") st.title("Interviewer.AI") # Initialize LLM and models llm = None gen_q_model = None intro_model = None eval_model = None # First, load the key from the environment variable if genai is available if genai is None or ChatGoogleGenerativeAI is None: st.warning("Google GenAI or LangChain wrappers not available. App will use deterministic fallbacks.") if 'enable_llm' not in st.session_state: st.session_state.enable_llm = False GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY") api_key_exists = bool(GOOGLE_API_KEY) if not api_key_exists: st.warning("⚠️ GOOGLE_API_KEY not found in environment variables.") st.info("Add GOOGLE_API_KEY to your Hugging Face Space secrets to enable AI features.") # LLM Enable Checkbox enable_llm_checkbox = st.checkbox( "Enable LLM features (Not checking this box will result in using default hard-coded questions)", value=st.session_state.enable_llm, disabled=not api_key_exists, help="AI-powered question generation and evaluation" ) st.session_state.enable_llm = enable_llm_checkbox # Initialize LLM if enabled if st.session_state.enable_llm and api_key_exists: try: genai.configure(api_key=GOOGLE_API_KEY) llm = get_llm(GOOGLE_API_KEY) gen_q_model, intro_model, eval_model = get_models(llm) st.success("✅ LLM features enabled successfully") except Exception as e: st.error(f"❌ Could not initialize LLM: {e}") st.info("Check your API key and try again.") st.session_state.enable_llm = False llm = None gen_q_model = None intro_model = None eval_model = None # Test API Button (AFTER initialization) if st.button("Test Google API Connection"): if not st.session_state.enable_llm: st.error("❌ LLM features are not enabled. Check the checkbox above first.") elif llm is None: st.error("❌ LLM is not initialized. Check API key configuration.") else: try: with st.spinner("Testing API connection..."): test_response = llm.invoke("Say 'Hello' if you can hear me.") st.success("✅ SUCCESS! API is working correctly.") st.info(f"Response: {test_response.content if hasattr(test_response, 'content') else str(test_response)}") except Exception as e: st.error(f"❌ API call FAILED with error: {e}") st.info("This usually means: invalid API key, quota exceeded, or network issues.") st.divider() # --- Session State Initialization --- if 'stage' not in st.session_state: st.session_state.stage = 'start' if 'audio_enabled' not in st.session_state: st.session_state.audio_enabled = False if 'chat_history' not in st.session_state: st.session_state.chat_history = [] if 'questions' not in st.session_state: st.session_state.questions = [] if 'q_index' not in st.session_state: st.session_state.q_index = 0 if 'current_question' not in st.session_state: st.session_state.current_question = "" if 'total_marks' not in st.session_state: st.session_state.total_marks = 0 if 'num_questions' not in st.session_state: st.session_state.num_questions = 0 # --- App Logic (State Machine) --- # --- STAGE 0: Start (File Upload) --- if st.session_state.stage == 'start': st.info("Welcome! Please paste your resume text below to begin.") st.toggle( "Enable Audio Mode (AI Voice & Microphone)", key='audio_enabled', help="If ON, the AI will speak and you can answer with your voice. If OFF, it's text-only and you have to type your answer." ) with st.form(key="resume_form"): resume_text_input = st.text_area("Paste your resume text here:", height=300) submit_button = st.form_submit_button("Start Interview") if submit_button and resume_text_input: if not resume_text_input.strip(): st.error("Please paste your resume text.") else: # Save text to session state and move to the processing stage st.session_state.resume_text = resume_text_input st.session_state.stage = 'processing_resume' st.rerun() # --- NEW STAGE 0.5: Process Resume (runs *after* file upload) --- elif st.session_state.stage == 'processing_resume': with st.spinner("Analyzing your resume... This may take a moment."): try: resume_text = st.session_state.resume_text st.session_state.questions = generate_questions_from_resume(resume_text, gen_q_model) # 2. Get DUMMY AI Introduction intro_output = get_introduction(intro_model) st.session_state.current_question = intro_output.question intr_and_ques = f"{intro_output.intro}...{intro_output.question}" # 3. Move to next stage and display intro st.session_state.stage = 'awaiting_intro' text_to_speech_and_display(intr_and_ques) # text_to_speech_and_display(intro_output.question) # Clean up the resume text from session state if 'resume_text' in st.session_state: del st.session_state.resume_text st.rerun() # --- END: TEMPORARY TEST CODE --- except Exception as e: st.error(f"An error occurred during AI processing: {e}") st.session_state.stage = 'start' # --- Main Interview Area (Stages > 0) --- if st.session_state.stage not in ['start', 'processing_resume']: # --- Chat History Display --- st.subheader("Interview Transcript") chat_container = st.container(height=400) # Added height for scrolling with chat_container: for entry in reversed(st.session_state.chat_history): st.markdown(entry) try: st.divider() except Exception: st.markdown('---') # --- End Interview Button --- if st.button("End Interview", type="primary"): st.session_state.stage = 'finished' st.rerun() # --- REPLACEMENT: Text Input Area --- user_text = None # Initialize user_text is_disabled = (st.session_state.stage == 'finished') if mic_recorder is None: st.error("streamlit_mic_recorder library failed to import. Voice input is disabled.") st.info("Please add 'streamlit-mic-recorder' to your requirements.txt") elif is_disabled: st.info("Interview is finished. Start a new interview to speak.") else: if st.session_state.audio_enabled: st.write("Your turn to speak:") audio_bytes_dict = mic_recorder( start_prompt="Start Recording ⏺️", stop_prompt="Stop Recording ⏹️", key='recorder' ) if audio_bytes_dict: # The component returns a dictionary, get the bytes audio_bytes = audio_bytes_dict['bytes'] with st.spinner("Transcribing your answer..."): # Use our NEW Google Cloud STT function user_text = speech_to_text(audio_bytes) else: # --- TEXT-ONLY MODE (Text Input) --- with st.form(key="answer_form", clear_on_submit=True): answer = st.text_input("Your answer:", disabled=is_disabled) submit_button = st.form_submit_button(label="Submit Answer", disabled=is_disabled) if submit_button and answer: user_text = answer st.session_state.chat_history.append(f"**You:** {user_text}") # --- END OF REPLACEMENT --- # --- Process Submitted Text --- if user_text: # --- STAGE 1: Process User's Introduction --- if st.session_state.stage == 'awaiting_intro': with st.spinner("Thinking of a followup..."): followup = ask_followup(user_text, intro_model) st.session_state.current_question = followup text_to_speech_and_display(followup) # This now just displays text st.session_state.stage = 'awaiting_intro_followup' # st.rerun() # --- STAGE 2: Process Followup to Intro --- elif st.session_state.stage == 'awaiting_intro_followup': text_to_speech_and_display("OK, Great. Let's start the interview with questions from your resume.") st.session_state.stage = 'asking_question' # Move to main questions # st.rerun() # --- STAGE 4: Process Answer to a Main Question --- elif st.session_state.stage == 'awaiting_answer': with st.spinner("Evaluating your answer..."): question_asked = st.session_state.current_question # text_to_speech_and_display(question_asked) output = evaluate_answer(question_asked, user_text, eval_model) st.session_state.total_marks += output.marks st.session_state.num_questions += 1 if output.review: text_to_speech_and_display(output.review) # This now just displays text if output.followup: st.session_state.current_question = output.followup text_to_speech_and_display(output.followup) # This now just displays text st.session_state.stage = 'awaiting_followup_answer' else: st.session_state.q_index += 1 st.session_state.stage = 'asking_question' # st.rerun() # --- STAGE 5: Process Answer to a Followup Question --- elif st.session_state.stage == 'awaiting_followup_answer': with st.spinner("Evaluating your answer..."): question_asked = st.session_state.current_question output = evaluate_answer(question_asked, user_text, eval_model) st.session_state.total_marks += output.marks st.session_state.num_questions += 1 if output.review: text_to_speech_and_display(output.review) # This now just displays text st.session_state.q_index += 1 st.session_state.stage = 'asking_question' # st.rerun() # --- STAGE 3: Ask a New Question --- # This runs when the page loads into this state, *before* user input if st.session_state.stage == 'asking_question': if st.session_state.q_index < len(st.session_state.questions): question = st.session_state.questions[st.session_state.q_index] st.session_state.current_question = question text_to_speech_and_display(question) # This now just displays text st.session_state.stage = 'awaiting_answer' else: text_to_speech_and_display("That's all the questions I have. Thank you!") st.session_state.stage = 'finished' st.rerun() # --- STAGE 6: Finished --- if st.session_state.stage == 'finished': st.balloons() st.success("Interview Complete!") final_score = 0 if st.session_state.num_questions > 0: final_score = st.session_state.total_marks / st.session_state.num_questions st.subheader("Final Report") st.markdown(f"**Total Questions Answered:** {st.session_state.num_questions}") st.markdown(f"**Average Score:** {final_score:.2f} / 100") # Transcript is already shown above, but we can show it again st.subheader("Full Transcript") for entry in st.session_state.chat_history: st.markdown(entry) if st.button("Start New Interview"): for key in st.session_state.keys(): del st.session_state[key] st.rerun()