| import requests |
| import os |
| import json |
| from langchain_groq import ChatGroq |
| from langchain_community.embeddings import HuggingFaceEmbeddings |
| from langchain_community.vectorstores import Qdrant |
| from langchain.prompts import PromptTemplate |
| from langchain.chains import LLMChain |
| from langchain.retrievers import ContextualCompressionRetriever |
| from langchain.retrievers.document_compressors import CohereRerank |
| from qdrant_client import QdrantClient |
| import cohere |
| import json |
| import re |
| import time |
| from collections import defaultdict |
|
|
|
|
| from qdrant_client.http import models |
| from qdrant_client.models import PointStruct |
| from sklearn.feature_extraction.text import TfidfVectorizer |
| from sklearn.neighbors import NearestNeighbors |
| from transformers import AutoTokenizer |
| |
| from langchain_community.embeddings import HuggingFaceEmbeddings |
| import numpy as np |
| import os |
| from dotenv import load_dotenv |
| from enum import Enum |
| import time |
| from inputimeout import inputimeout, TimeoutOccurred |
|
|
|
|
| |
| from qdrant_client import QdrantClient |
| from qdrant_client.http.models import VectorParams, Distance, Filter, FieldCondition, MatchValue |
| from qdrant_client.http.models import PointStruct, Filter, FieldCondition, MatchValue, SearchRequest |
| import traceback |
| from transformers import pipeline |
|
|
| from textwrap import dedent |
| import json |
| import logging |
|
|
| from transformers import pipeline,BitsAndBytesConfig |
|
|
|
|
| import os |
|
|
| cohere_api_key = os.getenv("COHERE_API_KEY") |
| chat_groq_api = os.getenv("GROQ_API_KEY") |
| hf_api_key = os.getenv("HF_API_KEY") |
| qdrant_api = os.getenv("QDRANT_API_KEY") |
| qdrant_url = os.getenv("QDRANT_API_URL") |
|
|
| print("GROQ API Key:", chat_groq_api) |
| print("QDRANT API Key:", qdrant_api) |
| print("QDRANT API URL:", qdrant_url) |
| print("Cohere API Key:", cohere_api_key) |
|
|
|
|
| from qdrant_client import QdrantClient |
|
|
| qdrant_client = QdrantClient( |
| url="https://313b1ceb-057f-4b7b-89f5-7b19a213fe65.us-east-1-0.aws.cloud.qdrant.io:6333", |
| api_key="eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJhY2Nlc3MiOiJtIn0.w13SPZbljbSvt9Ch_0r034QhMFlmEr4ctXqLo2zhxm4", |
| ) |
|
|
| print(qdrant_client.get_collections()) |
|
|
| class CustomChatGroq: |
| def __init__(self, temperature, model_name, api_key): |
| self.temperature = temperature |
| self.model_name = model_name |
| self.api_key = api_key |
| self.api_url = "https://api.groq.com/openai/v1/chat/completions" |
|
|
| def predict(self, prompt): |
| """Send a request to the Groq API and return the generated response.""" |
| try: |
| headers = { |
| "Authorization": f"Bearer {self.api_key}", |
| "Content-Type": "application/json" |
| } |
|
|
| payload = { |
| "model": self.model_name, |
| "messages": [{"role": "system", "content": "You are an AI interviewer."}, |
| {"role": "user", "content": prompt}], |
| "temperature": self.temperature, |
| "max_tokens": 150 |
| } |
|
|
| response = requests.post(self.api_url, headers=headers, json=payload, timeout=10) |
| response.raise_for_status() |
|
|
| data = response.json() |
|
|
| |
| if "choices" in data and len(data["choices"]) > 0: |
| return data["choices"][0]["message"]["content"].strip() |
|
|
| logging.warning("Unexpected response structure from Groq API") |
| return "Interviewer: Could you tell me more about your relevant experience?" |
|
|
| except requests.exceptions.RequestException as e: |
| logging.error(f"ChatGroq API error: {e}") |
| return "Interviewer: Due to a system issue, let's move on to another question." |
|
|
| groq_llm = ChatGroq( |
| temperature=0.7, |
| model_name="llama-3.3-70b-versatile", |
| api_key=chat_groq_api |
| ) |
|
|
| from huggingface_hub import login |
| import os |
|
|
| HF_TOKEN = os.getenv("HF_TOKEN") |
|
|
| if HF_TOKEN: |
| login(HF_TOKEN) |
| else: |
| raise EnvironmentError("Missing HF_TOKEN environment variable.") |
|
|
| |
| from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline |
| import torch |
| print(torch.cuda.is_available()) |
|
|
| MODEL_PATH = "mistralai/Mistral-7B-Instruct-v0.3" |
| |
|
|
| bnb_config = BitsAndBytesConfig( |
| load_in_8bit=True, |
| ) |
|
|
| mistral_tokenizer = AutoTokenizer.from_pretrained(MODEL_PATH,token=hf_api_key) |
|
|
| judge_llm = AutoModelForCausalLM.from_pretrained( |
| MODEL_PATH, |
| quantization_config=bnb_config,torch_dtype=torch.float16, |
| device_map="auto", |
| token=hf_api_key |
| ) |
| judge_llm.config.pad_token_id = judge_llm.config.eos_token_id |
|
|
|
|
| print(judge_llm.hf_device_map) |
|
|
| judge_pipeline = pipeline( |
| "text-generation", |
| model=judge_llm, |
| tokenizer=mistral_tokenizer, |
| max_new_tokens=128, |
| temperature=0.3, |
| top_p=0.9, |
| do_sample=True, |
| repetition_penalty=1.1, |
| ) |
|
|
|
|
| output = judge_pipeline("Q: What is Python?\nA:", max_new_tokens=128)[0]['generated_text'] |
| print(output) |
|
|
|
|
|
|
| |
| from sentence_transformers import SentenceTransformer |
|
|
| class LocalEmbeddings: |
| def __init__(self, model_name="all-MiniLM-L6-v2"): |
| self.model = SentenceTransformer(model_name) |
|
|
| def embed_query(self, text): |
| return self.model.encode(text).tolist() |
|
|
| def embed_documents(self, documents): |
| return self.model.encode(documents).tolist() |
|
|
|
|
| embeddings = LocalEmbeddings() |
|
|
| |
| qdrant_client = QdrantClient(url=qdrant_url, api_key=qdrant_api,check_compatibility=False) |
| co = cohere.Client(api_key=cohere_api_key) |
|
|
| class EvaluationScore(str, Enum): |
| POOR = "Poor" |
| MEDIUM = "Medium" |
| GOOD = "Good" |
| EXCELLENT = "Excellent" |
|
|
| |
| class CohereReranker: |
| def __init__(self, client): |
| self.client = client |
|
|
| def compress_documents(self, documents, query): |
| if not documents: |
| return [] |
| doc_texts = [doc.page_content for doc in documents] |
| try: |
| reranked = self.client.rerank( |
| query=query, |
| documents=doc_texts, |
| model="rerank-english-v2.0", |
| top_n=5 |
| ) |
| return [documents[result.index] for result in reranked.results] |
| except Exception as e: |
| logging.error(f"Error in CohereReranker.compress_documents: {e}") |
| return documents[:5] |
|
|
| reranker = CohereReranker(co) |
|
|
| def load_data_from_json(file_path): |
| """Load interview Q&A data from a JSON file.""" |
| try: |
| with open(file_path, "r", encoding="utf-8") as f: |
| data = json.load(f) |
| job_role_buckets = defaultdict(list) |
| for idx, item in enumerate(data): |
| try: |
| job_role = item["Job Role"].lower().strip() |
| question = item["Questions"].strip() |
| answer = item["Answers"].strip() |
| job_role_buckets[job_role].append({"question": question, "answer": answer}) |
| except KeyError as e: |
| logging.warning(f"Skipping item {idx}: missing key {e}") |
| return job_role_buckets |
| except Exception as e: |
| logging.error(f"Error loading data: {e}") |
| raise |
|
|
|
|
| def verify_qdrant_collection(collection_name='interview_questions'): |
| """Verify if a Qdrant collection exists with the correct configuration.""" |
| try: |
| collection_info = qdrant_client.get_collection(collection_name) |
| vector_size = collection_info.config.params.vectors.size |
| logging.info(f"Collection '{collection_name}' exists with vector size: {vector_size}") |
| return True |
| except Exception as e: |
| logging.warning(f"Collection '{collection_name}' not found: {e}") |
| return False |
|
|
|
|
|
|
|
|
| def store_data_to_qdrant(data, collection_name='interview_questions', batch_size=100): |
| """Store interview data in the Qdrant vector database.""" |
| try: |
| |
| if not verify_qdrant_collection(collection_name): |
| try: |
| qdrant_client.create_collection( |
| collection_name=collection_name, |
| vectors_config=VectorParams(size=384, distance=Distance.COSINE) |
| ) |
| logging.info(f"Created collection '{collection_name}'") |
| except Exception as e: |
| logging.error(f"Error creating collection: {e}\n{traceback.format_exc()}") |
| return False |
|
|
| points = [] |
| point_id = 0 |
| total_points = sum(len(qa_list) for qa_list in data.values()) |
| processed = 0 |
|
|
| for job_role, qa_list in data.items(): |
| for entry in qa_list: |
| try: |
| emb = embeddings.embed_query(entry["question"]) |
| print(f"Embedding shape: {len(emb)}") |
|
|
| if not emb or len(emb) != 384: |
| logging.warning(f"Skipping point {point_id} due to invalid embedding length: {len(emb)}") |
| continue |
|
|
| points.append(PointStruct( |
| id=point_id, |
| vector=emb, |
| payload={ |
| "job_role": job_role, |
| "question": entry["question"], |
| "answer": entry["answer"] |
| } |
| )) |
| point_id += 1 |
| processed += 1 |
|
|
| |
| if len(points) >= batch_size: |
| try: |
| qdrant_client.upsert(collection_name=collection_name, points=points) |
| logging.info(f"Stored {processed}/{total_points} points ({processed/total_points*100:.1f}%)") |
| except Exception as upsert_err: |
| logging.error(f"Error during upsert: {upsert_err}\n{traceback.format_exc()}") |
| points = [] |
|
|
| except Exception as embed_err: |
| logging.error(f"Embedding error for point {point_id}: {embed_err}\n{traceback.format_exc()}") |
|
|
| |
| if points: |
| try: |
| qdrant_client.upsert(collection_name=collection_name, points=points) |
| logging.info(f"Stored final batch of {len(points)} points") |
| except Exception as final_upsert_err: |
| logging.error(f"Error during final upsert: {final_upsert_err}\n{traceback.format_exc()}") |
|
|
| |
| try: |
| count = qdrant_client.count(collection_name=collection_name, exact=True).count |
| print("Current count:", count) |
| logging.info(f"✅ Successfully stored {count} points in Qdrant") |
| if count != total_points: |
| logging.warning(f"Expected {total_points} points but stored {count}") |
| except Exception as count_err: |
| logging.error(f"Error verifying stored points: {count_err}\n{traceback.format_exc()}") |
|
|
| return True |
|
|
| except Exception as e: |
| logging.error(f"Error storing data to Qdrant: {e}\n{traceback.format_exc()}") |
| return False |
|
|
| |
| info = qdrant_client.get_collection("interview_questions") |
| print(info.config.params.vectors.distance) |
|
|
| def extract_all_roles_from_qdrant(collection_name='interview_questions'): |
| """ Extract all unique job roles from the Qdrant vector store """ |
| try: |
| all_roles = set() |
| scroll_offset = None |
|
|
| while True: |
| response = qdrant_client.scroll( |
| collection_name=collection_name, |
| limit=200, |
| offset=scroll_offset, |
| with_payload=True |
| ) |
| points, next_page_offset = response |
|
|
| if not points: |
| break |
|
|
| for point in points: |
| role = point.payload.get("job_role", "").strip().lower() |
| if role: |
| all_roles.add(role) |
|
|
| if not next_page_offset: |
| break |
|
|
| scroll_offset = next_page_offset |
|
|
| if not all_roles: |
| logging.warning("[Qdrant] No roles found in payloads.") |
| else: |
| logging.info(f"[Qdrant] Extracted {len(all_roles)} unique job roles.") |
|
|
| return list(all_roles) |
| except Exception as e: |
| logging.error(f"Error extracting roles from Qdrant: {e}") |
| return [] |
|
|
| import numpy as np |
| import logging |
| from sklearn.metrics.pairwise import cosine_similarity |
|
|
| def find_similar_roles(user_role, all_roles, top_k=3): |
| """ |
| Find the most similar job roles to the given user_role using embeddings. |
| """ |
| try: |
| |
| user_role = user_role.strip().lower() |
| if not user_role or not all_roles or not isinstance(all_roles, list): |
| logging.warning("Invalid input for role similarity") |
| return [] |
|
|
| |
| try: |
| user_embedding = embeddings.embed_query(user_role) |
| if user_embedding is None: |
| logging.error("User embedding is None") |
| return [] |
| except Exception as e: |
| logging.error(f"Error embedding user role: {type(e).__name__}: {e}") |
| return [] |
|
|
| |
| try: |
| role_embeddings = [] |
| valid_roles = [] |
| for role in all_roles: |
| emb = embeddings.embed_query(role.lower()) |
| if emb is not None: |
| role_embeddings.append(emb) |
| valid_roles.append(role) |
| else: |
| logging.warning(f"Skipping role with no embedding: {role}") |
| except Exception as e: |
| logging.error(f"Error embedding all roles: {type(e).__name__}: {e}") |
| return [] |
|
|
| if not role_embeddings: |
| logging.error("All role embeddings failed") |
| return [] |
|
|
| |
| similarities = cosine_similarity([user_embedding], role_embeddings)[0] |
| top_indices = np.argsort(similarities)[::-1][:top_k] |
|
|
| similar_roles = [valid_roles[i] for i in top_indices] |
| logging.debug(f"Similar roles to '{user_role}': {similar_roles}") |
| return similar_roles |
|
|
| except Exception as e: |
| logging.error(f"Error finding similar roles: {type(e).__name__}: {e}", exc_info=True) |
| return [] |
|
|
| |
| def get_role_questions(job_role): |
| try: |
| if not job_role: |
| logging.warning("Job role is empty.") |
| return [] |
|
|
| filter_by_role = Filter( |
| must=[FieldCondition( |
| key="job_role", |
| match=MatchValue(value=job_role.lower()) |
| )] |
| ) |
|
|
| all_results = [] |
| offset = None |
| while True: |
| results, next_page_offset = qdrant_client.scroll( |
| collection_name="interview_questions", |
| scroll_filter=filter_by_role, |
| with_payload=True, |
| with_vectors=False, |
| limit=100, |
| offset=offset |
| ) |
| all_results.extend(results) |
|
|
| if not next_page_offset: |
| break |
| offset = next_page_offset |
|
|
| parsed_results = [{ |
| "question": r.payload.get("question"), |
| "answer": r.payload.get("answer"), |
| "job_role": r.payload.get("job_role") |
| } for r in all_results] |
|
|
| return parsed_results |
|
|
| except Exception as e: |
| logging.error(f"Error fetching role questions: {type(e).__name__}: {e}", exc_info=True) |
| return [] |
|
|
| def retrieve_interview_data(job_role, all_roles): |
| """ |
| Retrieve all interview Q&A for a given job role. |
| Falls back to similar roles if no data found. |
| Args: |
| job_role (str): Input job role (can be misspelled) |
| all_roles (list): Full list of available job roles |
| Returns: |
| list: List of QA dicts with keys: 'question', 'answer', 'job_role' |
| """ |
| import logging |
| logging.basicConfig(level=logging.INFO) |
|
|
| job_role = job_role.strip().lower() |
| seen_questions = set() |
| final_results = [] |
|
|
| |
| logging.info(f"Trying to fetch all data for exact role: '{job_role}'") |
| exact_matches = get_role_questions(job_role) |
|
|
| for qa in exact_matches: |
| question = qa["question"] |
| if question and question not in seen_questions: |
| seen_questions.add(question) |
| final_results.append(qa) |
|
|
| if final_results: |
| logging.info(f"Found {len(final_results)} QA pairs for exact role '{job_role}'") |
| return final_results |
|
|
| logging.warning(f"No data found for role '{job_role}'. Trying similar roles...") |
|
|
| |
| similar_roles = find_similar_roles(job_role, all_roles, top_k=3) |
|
|
| if not similar_roles: |
| logging.warning("No similar roles found.") |
| return [] |
|
|
| logging.info(f"Found similar roles: {similar_roles}") |
|
|
| |
| for role in similar_roles: |
| logging.info(f"Fetching data for similar role: '{role}'") |
| role_qa = get_role_questions(role) |
|
|
| for qa in role_qa: |
| question = qa["question"] |
| if question and question not in seen_questions: |
| seen_questions.add(question) |
| final_results.append(qa) |
|
|
| logging.info(f"Retrieved total {len(final_results)} QA pairs from similar roles") |
| return final_results |
|
|
| import random |
|
|
| def random_context_chunks(retrieved_data, k=3): |
| chunks = random.sample(retrieved_data, k) |
| return "\n\n".join([f"Q: {item['question']}\nA: {item['answer']}" for item in chunks]) |
|
|
| import json |
| import logging |
| import re |
| from typing import Dict |
|
|
| def eval_question_quality( |
| question: str, |
| job_role: str, |
| seniority: str |
| ) -> Dict[str, str]: |
| """ |
| Evaluate the quality of a generated interview question using Groq LLM. |
| Returns a structured JSON with score, reasoning, and suggestions. |
| """ |
| import time, json |
|
|
| prompt = f""" |
| You are a senior AI hiring expert evaluating the quality of an interview question for a {seniority} {job_role} role. |
| |
| Evaluate the question based on: |
| - Relevance to the role and level |
| - Clarity and conciseness |
| - Depth of technical insight |
| |
| --- |
| Question: {question} |
| --- |
| |
| Respond only with a valid JSON like: |
| {{ |
| "Score": "Poor" | "Medium" | "Good" | "Excellent", |
| "Reasoning": "short justification", |
| "Improvements": ["tip1", "tip2"] |
| }} |
| """ |
|
|
| try: |
| start = time.time() |
| response = groq_llm.invoke(prompt) |
| print("⏱️ eval_question_quality duration:", round(time.time() - start, 2), "s") |
|
|
| |
| start_idx = response.rfind("{") |
| end_idx = response.rfind("}") + 1 |
| json_str = response[start_idx:end_idx] |
| result = json.loads(json_str) |
|
|
| if result.get("Score") in {"Poor", "Medium", "Good", "Excellent"}: |
| return result |
| else: |
| raise ValueError("Invalid Score value in model output") |
|
|
| except Exception as e: |
| print(f"⚠️ eval_question_quality fallback: {e}") |
| return { |
| "Score": "Poor", |
| "Reasoning": "Evaluation failed, using fallback.", |
| "Improvements": [ |
| "Ensure the question is relevant and clear.", |
| "Avoid vague or overly generic phrasing.", |
| "Include role-specific context if needed." |
| ] |
| } |
| |
| def evaluate_answer( |
| question: str, |
| answer: str, |
| ref_answer: str, |
| job_role: str, |
| seniority: str, |
| ) -> Dict[str, str]: |
| """ |
| Fast and structured answer evaluation using Groq LLM (e.g. Mixtral or LLaMA 3). |
| """ |
| import time, json |
| from langchain_core.messages import AIMessage |
|
|
| prompt = f""" |
| You are a technical interviewer evaluating a candidate for a {seniority} {job_role} role. |
| |
| Evaluate the response based on: |
| - Technical correctness |
| - Clarity |
| - Relevance |
| - Structure |
| |
| --- |
| Question: {question} |
| Candidate Answer: {answer} |
| Reference Answer: {ref_answer} |
| --- |
| |
| Respond ONLY with valid JSON in the following format: |
| {{ |
| "Score": "Poor" | "Medium" | "Good" | "Excellent", |
| "Reasoning": "short justification", |
| "Improvements": ["tip1", "tip2"] |
| }} |
| """ |
|
|
| try: |
| start = time.time() |
| raw = groq_llm.invoke(prompt) |
| print("⏱️ evaluate_answer duration:", round(time.time() - start, 2), "s") |
|
|
| if isinstance(raw, AIMessage): |
| output = raw.content |
| else: |
| output = str(raw) |
|
|
| print("🔍 Raw Groq Response:\n", output) |
|
|
| start_idx = output.rfind("{") |
| end_idx = output.rfind("}") + 1 |
| json_str = output[start_idx:end_idx] |
|
|
| result = json.loads(json_str) |
| if result.get("Score") in {"Poor", "Medium", "Good", "Excellent"}: |
| return result |
| else: |
| raise ValueError("Invalid score value") |
|
|
| except Exception as e: |
| print(f"⚠️ evaluate_answer fallback: {e}") |
| return { |
| "Score": "Poor", |
| "Reasoning": "Failed to evaluate properly. Defaulted to Poor.", |
| "Improvements": [ |
| "Be more specific", |
| "Add technical details", |
| "Structure the answer clearly" |
| ] |
| } |
|
|
| |
| def generate_reference_answer(question, job_role, seniority): |
| """ |
| Generates a high-quality reference answer using Groq-hosted LLaMA model. |
| Args: |
| question (str): Interview question to answer. |
| job_role (str): Target job role (e.g., "Frontend Developer"). |
| seniority (str): Experience level (e.g., "Mid-Level"). |
| Returns: |
| str: Clean, generated reference answer or error message. |
| """ |
| try: |
| |
| prompt = f"""You are a {seniority} {job_role}. |
| Q: {question} |
| A:""" |
|
|
| |
| ref_answer = groq_llm.predict(prompt) |
|
|
| if not ref_answer.strip(): |
| return "Reference answer not generated." |
|
|
| return ref_answer.strip() |
|
|
| except Exception as e: |
| logging.error(f"Error generating reference answer: {e}", exc_info=True) |
| return "Unable to generate reference answer due to an error" |
|
|
|
|
|
|
| def build_interview_prompt(conversation_history, user_response, context, job_role, skills, seniority, |
| difficulty_adjustment=None, voice_label=None, face_label=None, effective_confidence=None): |
| """Build a prompt for generating the next interview question with adaptive difficulty and fairness logic.""" |
| |
| interview_template = """ |
| You are an AI interviewer conducting a real-time interview for a {job_role} position. |
| Your objective is to thoroughly evaluate the candidate's suitability for the role using smart, structured, and adaptive questioning. |
| --- |
| Interview Rules and Principles: |
| - The **baseline difficulty** of questions must match the candidate’s seniority level (e.g., junior, mid-level, senior). |
| - Use your judgment to increase difficulty **slightly** if the candidate performs well, or simplify if they struggle — but never drop below the expected baseline for their level. |
| - Avoid asking extremely difficult questions to junior candidates unless they’ve clearly demonstrated advanced knowledge. |
| - Be fair: candidates for the same role should be evaluated within a consistent difficulty range. |
| - Adapt your line of questioning gradually and logically based on the **overall flow**, not just the last answer. |
| - Include real-world problem-solving scenarios to test how the candidate thinks and behaves practically. |
| - You must **lead** the interview and make intelligent decisions about what to ask next. |
| --- |
| Context Use: |
| {context_instruction} |
| Note: |
| If no relevant context was retrieved or the previous answer is unclear, you must still generate a thoughtful interview question using your own knowledge. Do not skip generation. Avoid default or fallback responses — always try to generate a meaningful and fair next question. |
| --- |
| Job Role: {job_role} |
| Seniority Level: {seniority} |
| Skills Focus: {skills} |
| Difficulty Setting: {difficulty} (based on {difficulty_adjustment}) |
| --- |
| Recent Conversation History: |
| {history} |
| Candidate's Last Response: |
| "{user_response}" |
| Evaluation of Last Response: |
| {response_evaluation} |
| Voice Tone: {voice_label} |
| --- |
| --- |
| Important: |
| If no relevant context was retrieved or the previous answer is unclear or off-topic, |
| you must still generate a meaningful and fair interview question using your own knowledge and best practices. |
| Do not skip question generation or fall back to default/filler responses. |
| --- |
| Guidelines for Next Question: |
| - If this is the beginning of the interview, start with a question about the candidate’s background or experience. |
| - Base the difficulty primarily on the seniority level, with light adjustment from recent performance. |
| - Focus on core skills, real-world applications, and depth of reasoning. |
| - Ask only one question. Be clear and concise. |
| Generate the next interview question now: |
| """ |
|
|
| |
| if difficulty_adjustment == "harder": |
| difficulty = f"slightly more challenging than typical for {seniority}" |
| elif difficulty_adjustment == "easier": |
| difficulty = f"slightly easier than typical for {seniority}" |
| else: |
| difficulty = f"appropriate for {seniority}" |
|
|
| |
| if context.strip(): |
| context_instruction = ( |
| "Use both your own expertise and the provided context from relevant interview datasets. " |
| "You can either build on questions from the dataset or generate your own." |
| ) |
| context = context.strip() |
| else: |
| context_instruction = ( |
| "No specific context retrieved. Use your own knowledge and best practices to craft a question." |
| ) |
| context = "" |
| |
|
|
| |
| recent_history = conversation_history[-6:] if len(conversation_history) > 6 else conversation_history |
| formatted_history = "\n".join([f"{msg['role'].capitalize()}: {msg['content']}" for msg in recent_history]) |
|
|
| |
| |
| if conversation_history and conversation_history[-1].get("evaluation"): |
| eval_data = conversation_history[-1]["evaluation"][-1] |
| response_evaluation = f""" |
| - Score: {eval_data.get('Score', 'N/A')} |
| - Reasoning: {eval_data.get('Reasoning', 'N/A')} |
| - Improvements: {eval_data.get('Improvements', 'N/A')} |
| """ |
| else: |
| response_evaluation = "No evaluation available yet." |
|
|
|
|
| |
| prompt = interview_template.format( |
| job_role=job_role, |
| seniority=seniority, |
| skills=skills, |
| difficulty=difficulty, |
| difficulty_adjustment=difficulty_adjustment if difficulty_adjustment else "default seniority", |
| context_instruction=context_instruction, |
| context=context, |
| history=formatted_history, |
| user_response=user_response, |
| response_evaluation=response_evaluation.strip(), |
| voice_label=voice_label or "unknown", |
| ) |
|
|
| return prompt |
|
|
|
|
| def generate_llm_interview_report( |
| interview_state, logged_samples, job_role, seniority |
| ): |
| from collections import Counter |
|
|
| |
| def score_label(label): |
| mapping = { |
| "confident": 5, "calm": 4, "neutral": 3, "nervous": 2, "anxious": 1, "unknown": 3 |
| } |
| return mapping.get(label.lower(), 3) |
|
|
| def section_score(vals): |
| return round(sum(vals)/len(vals), 2) if vals else "N/A" |
|
|
| |
| scores, voice_conf, face_conf, comm_scores = [], [], [], [] |
| tech_details, comm_details, emotion_details, relevance_details, problem_details = [], [], [], [], [] |
|
|
| for entry in logged_samples: |
| answer_eval = entry.get("answer_evaluation", {}) |
| score = answer_eval.get("Score", "Not Evaluated") |
| reasoning = answer_eval.get("Reasoning", "") |
| if score.lower() in ["excellent", "good", "medium", "poor"]: |
| score_map = {"excellent": 5, "good": 4, "medium": 3, "poor": 2} |
| scores.append(score_map[score.lower()]) |
| |
| tech_details.append(reasoning) |
| comm_details.append(reasoning) |
| |
| voice_conf.append(score_label(entry.get("voice_label", "unknown"))) |
| face_conf.append(score_label(entry.get("face_label", "unknown"))) |
| |
| if entry["user_answer"]: |
| length = len(entry["user_answer"].split()) |
| comm_score = min(5, max(2, length // 30)) |
| comm_scores.append(comm_score) |
|
|
| |
| avg_problem = section_score(scores) |
| avg_tech = section_score(scores) |
| avg_comm = section_score(comm_scores) |
| avg_emotion = section_score([(v+f)/2 for v, f in zip(voice_conf, face_conf)]) |
|
|
| |
| section_averages = [avg_problem, avg_tech, avg_comm, avg_emotion] |
| numeric_avgs = [v for v in section_averages if isinstance(v, (float, int))] |
| avg_overall = round(sum(numeric_avgs) / len(numeric_avgs), 2) if numeric_avgs else 0 |
|
|
| |
| if avg_overall >= 4.5: |
| verdict = "Strong Hire" |
| elif avg_overall >= 4.0: |
| verdict = "Hire" |
| elif avg_overall >= 3.0: |
| verdict = "Conditional Hire" |
| else: |
| verdict = "No Hire" |
|
|
| |
| transcript = "\n\n".join([ |
| f"Q: {e['generated_question']}\nA: {e['user_answer']}\nScore: {e.get('answer_evaluation',{}).get('Score','')}\nReasoning: {e.get('answer_evaluation',{}).get('Reasoning','')}" |
| for e in logged_samples |
| ]) |
|
|
| prompt = f""" |
| You are a senior technical interviewer at a major tech company. |
| Write a structured, realistic hiring report for this {seniority} {job_role} interview, using these section scores (scale 1–5, with 5 best): |
| Section-wise Evaluation |
| 1. *Problem Solving & Critical Thinking*: {avg_problem} |
| 2. *Technical Depth & Knowledge*: {avg_tech} |
| 3. *Communication & Clarity*: {avg_comm} |
| 4. *Emotional Composure & Confidence*: {avg_emotion} |
| 5. *Role Relevance*: 5 |
| *Transcript* |
| {transcript} |
| Your report should have the following sections: |
| 1. *Executive Summary* (realistic, hiring-committee style) |
| 2. *Section-wise Comments* (for each numbered category above, with short paragraph citing specifics) |
| 3. *Strengths & Weaknesses* (list at least 2 for each) |
| 4. *Final Verdict*: {verdict} |
| 5. *Recommendations* (2–3 for future improvement) |
| Use realistic language. If some sections are N/A or lower than others, comment honestly. |
| Interview Report: |
| """ |
| |
| return groq_llm.predict(prompt) |
|
|
| def get_user_info(): |
| """ |
| Collects essential information from the candidate before starting the interview. |
| Returns a dictionary with keys: name, job_role, seniority, skills |
| """ |
| import logging |
| logging.info("Collecting user information...") |
|
|
| print("Welcome to the AI Interview Simulator!") |
| print("Let’s set up your mock interview.\n") |
|
|
| |
| name = input("What is your name? ").strip() |
| while not name: |
| print("Please enter your name.") |
| name = input("What is your name? ").strip() |
|
|
| |
| job_role = input(f"Hi {name}, what job role are you preparing for? (e.g. Frontend Developer) ").strip() |
| while not job_role: |
| print("Please specify the job role.") |
| job_role = input("What job role are you preparing for? ").strip() |
|
|
| |
| seniority_options = ["Entry-level", "Junior", "Mid-Level", "Senior", "Lead"] |
| print("\nSelect your experience level:") |
| for i, option in enumerate(seniority_options, 1): |
| print(f"{i}. {option}") |
|
|
| seniority_choice = None |
| while seniority_choice not in range(1, len(seniority_options)+1): |
| try: |
| seniority_choice = int(input("Enter the number corresponding to your level: ")) |
| except ValueError: |
| print(f"Please enter a number between 1 and {len(seniority_options)}") |
|
|
| seniority = seniority_options[seniority_choice - 1] |
|
|
| |
| skills_input = input(f"\nWhat are your top skills relevant to {job_role}? (Separate with commas): ") |
| skills = [skill.strip() for skill in skills_input.split(",") if skill.strip()] |
|
|
| while not skills: |
| print("Please enter at least one skill.") |
| skills_input = input("Your top skills (comma-separated): ") |
| skills = [skill.strip() for skill in skills_input.split(",") if skill.strip()] |
|
|
| |
| print("\n Interview Setup Complete!") |
| print(f"Name: {name}") |
| print(f"Job Role: {job_role}") |
| print(f"Experience Level: {seniority}") |
| print(f"Skills: {', '.join(skills)}") |
| print("\nStarting your mock interview...\n") |
|
|
| return { |
| "name": name, |
| "job_role": job_role, |
| "seniority": seniority, |
| "skills": skills |
| } |
|
|
| import threading |
|
|
| def wait_for_user_response(timeout=200): |
| """Wait for user input with timeout. Returns '' if no response.""" |
| user_input = [] |
|
|
| def get_input(): |
| answer = input("Your Answer (within timeout): ").strip() |
| user_input.append(answer) |
|
|
| thread = threading.Thread(target=get_input) |
| thread.start() |
| thread.join(timeout) |
|
|
| return user_input[0] if user_input else "" |
|
|
| import json |
| from datetime import datetime |
| from time import time |
| import random |
|
|
| def interview_loop(max_questions, timeout_seconds=300, collection_name="interview_questions", judge_pipeline=None, save_path="interview_log.json"): |
|
|
|
|
| user_info = get_user_info() |
| job_role = user_info['job_role'] |
| seniority = user_info['seniority'] |
| skills = user_info['skills'] |
|
|
| all_roles = extract_all_roles_from_qdrant(collection_name=collection_name) |
| retrieved_data = retrieve_interview_data(job_role, all_roles) |
| context_data = random_context_chunks(retrieved_data, k=4) |
|
|
| conversation_history = [] |
| interview_state = { |
| "questions": [], |
| "user_answer": [], |
| "job_role": job_role, |
| "seniority": seniority, |
| "start_time": time() |
| } |
|
|
| |
| logged_samples = [] |
|
|
| difficulty_adjustment = None |
|
|
| for i in range(max_questions): |
| last_user_response = conversation_history[-1]['content'] if conversation_history else "" |
|
|
| |
| prompt = build_interview_prompt( |
| conversation_history=conversation_history, |
| user_response=last_user_response, |
| context=context_data, |
| job_role=job_role, |
| skills=skills, |
| seniority=seniority, |
| difficulty_adjustment=difficulty_adjustment |
| ) |
| question = groq_llm.predict(prompt) |
| question_eval = eval_question_quality(question, job_role, seniority) |
|
|
| conversation_history.append({'role': "Interviewer", "content": question}) |
| print(f"Interviewer: Q{i + 1} : {question}") |
|
|
| |
| start_time = time() |
| user_answer = wait_for_user_response(timeout=timeout_seconds) |
| response_time = time() - start_time |
|
|
| skipped = False |
| answer_eval = None |
| ref_answer = None |
|
|
| if not user_answer: |
| print("No Response Received, moving to next question.") |
| user_answer = None |
| skipped = True |
| difficulty_adjustment = "medium" |
| else: |
| conversation_history.append({"role": "Candidate", "content": user_answer}) |
|
|
| ref_answer = generate_reference_answer(question, job_role, seniority) |
| answer_eval = evaluate_answer( |
| question=question, |
| answer=user_answer, |
| ref_answer=ref_answer, |
| job_role=job_role, |
| seniority=seniority, |
| judge_pipeline=judge_pipeline |
| ) |
| |
|
|
| interview_state["user_answer"].append(user_answer) |
| |
| conversation_history[-1].setdefault('evaluation', []).append({ |
| "technical_depth": { |
| "score": answer_eval['Score'], |
| "Reasoning": answer_eval['Reasoning'] |
| } |
| }) |
|
|
| |
| score = answer_eval['Score'].lower() |
| if score == "excellent": |
| difficulty_adjustment = "harder" |
| elif score in ['poor', 'medium']: |
| difficulty_adjustment = "easier" |
| else: |
| difficulty_adjustment = None |
|
|
| |
| logged_samples.append({ |
| "job_role": job_role, |
| "seniority": seniority, |
| "skills": skills, |
| "context": context_data, |
| "prompt": prompt, |
| "generated_question": question, |
| "question_evaluation": question_eval, |
| "user_answer": user_answer, |
| "reference_answer": ref_answer, |
| "answer_evaluation": answer_eval, |
| "skipped": skipped |
| }) |
|
|
| |
| interview_state['questions'].append({ |
| "question": question, |
| "question_evaluation": question_eval, |
| "user_answer": user_answer, |
| "answer_evaluation": answer_eval, |
| "skipped": skipped |
| }) |
|
|
| interview_state['end_time'] = time() |
| report = generate_llm_interview_report(interview_state, job_role, seniority) |
| print("Report : _____________________\n") |
| print(report) |
| print('______________________________________________') |
|
|
| |
| timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") |
| filename = f"{save_path.replace('.json', '')}_{timestamp}.json" |
| with open(filename, "w", encoding="utf-8") as f: |
| json.dump(logged_samples, f, indent=2, ensure_ascii=False) |
|
|
| print(f" Interview log saved to {filename}") |
| print("____________________________________\n") |
| |
| print(f"interview state : {interview_state}") |
| return interview_state, report |
|
|
| from sklearn.metrics import precision_score, recall_score, f1_score |
| import numpy as np |
| |
|
|
| def build_ground_truth(all_roles): |
| gt = {} |
| for role in all_roles: |
| qa_list = get_role_questions(role) |
| gt[role] = set(q["question"] for q in qa_list if q["question"]) |
| return gt |
|
|
|
|
| def evaluate_retrieval(job_role, all_roles, k=10): |
| """ |
| Evaluate retrieval quality using Precision@k, Recall@k, and F1@k. |
| Args: |
| job_role (str): The input job role to search for. |
| all_roles (list): List of all available job roles in the system. |
| k (int): Top-k retrieved questions to evaluate. |
| Returns: |
| dict: Evaluation metrics including precision, recall, and f1. |
| """ |
|
|
| |
| ground_truth_qs = set( |
| q["question"].strip() |
| for q in get_role_questions(job_role) |
| if q.get("question") |
| ) |
|
|
| if not ground_truth_qs: |
| print(f"[!] No ground truth found for role: {job_role}") |
| return {} |
|
|
| |
| retrieved_qas = retrieve_interview_data(job_role, all_roles) |
| retrieved_qs = [q["question"].strip() for q in retrieved_qas if q.get("question")] |
|
|
| |
| retrieved_top_k = retrieved_qs[:k] |
|
|
| |
| y_true = [1 if q in ground_truth_qs else 0 for q in retrieved_top_k] |
| y_pred = [1] * len(y_true) |
|
|
| precision = precision_score(y_true, y_pred, zero_division=0) |
| recall = recall_score(y_true, y_pred, zero_division=0) |
| f1 = f1_score(y_true, y_pred, zero_division=0) |
|
|
| print(f" Retrieval Evaluation for role: '{job_role}' (Top-{k})") |
| print(f"Precision@{k}: {precision:.2f}") |
| print(f"Recall@{k}: {recall:.2f}") |
| print(f"F1@{k}: {f1:.2f}") |
| print(f"Relevant Retrieved: {sum(y_true)}/{len(y_true)}") |
| print("–" * 40) |
|
|
| return { |
| "job_role": job_role, |
| "precision": precision, |
| "recall": recall, |
| "f1": f1, |
| "relevant_retrieved": sum(y_true), |
| "total_retrieved": len(y_true), |
| "ground_truth_count": len(ground_truth_qs), |
| } |
|
|
|
|
| k_values = [5, 10, 20] |
| all_roles = extract_all_roles_from_qdrant(collection_name="interview_questions") |
|
|
| results = [] |
|
|
| for k in k_values: |
| for role in all_roles: |
| metrics = evaluate_retrieval(role, all_roles, k=k) |
| if metrics: |
| metrics["k"] = k |
| results.append(metrics) |
|
|
| import pandas as pd |
|
|
| df = pd.DataFrame(results) |
| summary = df.groupby("k")[["precision", "recall", "f1"]].mean().round(3) |
| print(summary) |
|
|
|
|
| def extract_job_details(job_description): |
| """Extract job details such as title, skills, experience level, and years of experience from the job description.""" |
| title_match = re.search(r"(?i)(?:seeking|hiring) a (.+?) to", job_description) |
| job_title = title_match.group(1) if title_match else "Unknown" |
|
|
| skills_match = re.findall(r"(?i)(?:Proficiency in|Experience with|Knowledge of) (.+?)(?:,|\.| and| or)", job_description) |
| skills = list(set([skill.strip() for skill in skills_match])) if skills_match else [] |
|
|
| experience_match = re.search(r"(\d+)\+? years of experience", job_description) |
| if experience_match: |
| years_experience = int(experience_match.group(1)) |
| experience_level = "Senior" if years_experience >= 5 else "Mid" if years_experience >= 3 else "Junior" |
| else: |
| years_experience = None |
| experience_level = "Unknown" |
|
|
| return { |
| "job_title": job_title, |
| "skills": skills, |
| "experience_level": experience_level, |
| "years_experience": years_experience |
| } |
|
|
| import re |
| from docx import Document |
| import textract |
| from PyPDF2 import PdfReader |
|
|
| JOB_TITLES = [ |
| "Accountant", "Data Scientist", "Machine Learning Engineer", "Software Engineer", |
| "Developer", "Analyst", "Researcher", "Intern", "Consultant", "Manager", |
| "Engineer", "Specialist", "Project Manager", "Product Manager", "Administrator", |
| "Director", "Officer", "Assistant", "Coordinator", "Supervisor" |
| ] |
|
|
| def clean_filename_name(filename): |
| |
| base = re.sub(r"\.[^.]+$", "", filename) |
| base = base.strip() |
| |
| |
| base_clean = re.sub(r"\bcv\b", "", base, flags=re.IGNORECASE).strip() |
| |
| |
| if not base_clean: |
| return None |
| |
| |
| if re.search(r"\d", base_clean): |
| return None |
| |
| |
| base_clean = base_clean.replace("_", " ").replace("-", " ") |
| return base_clean.title() |
|
|
| def looks_like_job_title(line): |
| for title in JOB_TITLES: |
| pattern = r"\b" + re.escape(title.lower()) + r"\b" |
| if re.search(pattern, line.lower()): |
| return True |
| return False |
|
|
| def extract_name_from_text(lines): |
| |
| for i in range(min(1, len(lines))): |
| line = lines[i].strip() |
| if looks_like_job_title(line): |
| return "unknown" |
| if re.search(r"\d", line): |
| continue |
| if len(line.split()) > 4 or len(line) > 40: |
| continue |
| |
| if line.isupper(): |
| continue |
| |
| return line.title() |
| return None |
|
|
| def extract_text_from_file(file_path): |
| if file_path.endswith('.pdf'): |
| reader = PdfReader(file_path) |
| text = "\n".join(page.extract_text() or '' for page in reader.pages) |
| elif file_path.endswith('.docx'): |
| doc = Document(file_path) |
| text = "\n".join([para.text for para in doc.paragraphs]) |
| else: |
| text = textract.process(file_path).decode('utf-8') |
| return text.strip() |
|
|
| def extract_candidate_details(file_path): |
| text = extract_text_from_file(file_path) |
| lines = [line.strip() for line in text.splitlines() if line.strip()] |
|
|
| |
| filename = file_path.split("/")[-1] |
| name = clean_filename_name(filename) |
| if not name: |
| name = extract_name_from_text(lines) |
| if not name: |
| name = "Unknown" |
|
|
| |
| skills = [] |
| skills_section = re.search(r"Skills\s*[:\-]?\s*(.+)", text, re.IGNORECASE) |
| if skills_section: |
| raw_skills = skills_section.group(1) |
| skills = [s.strip() for s in re.split(r",|\n|•|-", raw_skills) if s.strip()] |
|
|
| return { |
| "name": name, |
| "skills": skills |
| } |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
| |
|
|
|
|
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
|
|
|
|
|
|
|
|
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
|
|
|
|
| |
|
|
| |
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
|
|
| |
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
| |
| |
|
|
| |
| |
| |
| |
|
|
| |
| |
| |
|
|
| |
| |
| |
|
|
| |
| |
| |
| |
|
|
| |
| |
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| import gradio as gr |
| import time |
| import tempfile |
| import numpy as np |
| import scipy.io.wavfile as wavfile |
| import os |
| import json |
| import edge_tts |
| import torch, gc |
| from faster_whisper import WhisperModel |
| import asyncio |
| import threading |
| from concurrent.futures import ThreadPoolExecutor |
|
|
| print(torch.cuda.is_available()) |
| torch.cuda.empty_cache() |
| gc.collect() |
|
|
| |
| faster_whisper_model = None |
| tts_voice = "en-US-AriaNeural" |
|
|
|
|
| |
| executor = ThreadPoolExecutor(max_workers=2) |
|
|
| |
| if torch.cuda.is_available(): |
| print(f"🔥 CUDA Available: {torch.cuda.get_device_name(0)}") |
| print(f"🔥 CUDA Memory: {torch.cuda.get_device_properties(0).total_memory / 1024**3:.1f} GB") |
| |
| torch.cuda.set_device(0) |
| else: |
| print("⚠️ CUDA not available, using CPU") |
|
|
| def load_models_lazy(): |
| """Load models only when needed""" |
| global faster_whisper_model |
| |
| device = "cuda" if torch.cuda.is_available() else "cpu" |
| print(f"🔁 Using device: {device}") |
|
|
| if faster_whisper_model is None: |
| print("🔁 Loading Faster-Whisper model...") |
| compute_type = "float16" if device == "cuda" else "int8" |
| faster_whisper_model = WhisperModel("base", device=device, compute_type=compute_type) |
| print(f"✅ Faster-Whisper model loaded on {device}") |
|
|
|
|
| async def edge_tts_to_file(text, output_path="tts.wav", voice=tts_voice): |
| communicate = edge_tts.Communicate(text, voice) |
| await communicate.save(output_path) |
| return output_path |
|
|
| def tts_async(text): |
| loop = asyncio.new_event_loop() |
| asyncio.set_event_loop(loop) |
| return executor.submit(loop.run_until_complete, edge_tts_to_file(text)) |
|
|
|
|
|
|
|
|
| def whisper_stt(audio_path): |
| """STT using Faster-Whisper""" |
| if not audio_path or not os.path.exists(audio_path): |
| return "" |
|
|
| load_models_lazy() |
| print("🔁 Transcribing with Faster-Whisper") |
|
|
| segments, _ = faster_whisper_model.transcribe(audio_path) |
| transcript = " ".join(segment.text for segment in segments) |
| return transcript.strip() |
|
|
|
|
| seniority_mapping = { |
| "Entry-level": 1, "Junior": 2, "Mid-Level": 3, "Senior": 4, "Lead": 5 |
| } |
|
|
| with gr.Blocks(theme=gr.themes.Soft()) as demo: |
| user_data = gr.State({}) |
| interview_state = gr.State({}) |
| missing_fields_state = gr.State([]) |
| tts_future = gr.State(None) |
|
|
| with gr.Column(visible=True) as user_info_section: |
| gr.Markdown("## Candidate Information") |
| cv_file = gr.File(label="Upload CV") |
| job_desc = gr.Textbox(label="Job Description") |
| start_btn = gr.Button("Continue", interactive=False) |
|
|
| with gr.Column(visible=False) as missing_section: |
| gr.Markdown("## Missing Information") |
| name_in = gr.Textbox(label="Name", visible=False) |
| role_in = gr.Textbox(label="Job Role", visible=False) |
| seniority_in = gr.Dropdown(list(seniority_mapping.keys()), label="Seniority", visible=False) |
| skills_in = gr.Textbox(label="Skills", visible=False) |
| submit_btn = gr.Button("Submit", interactive=False) |
|
|
| with gr.Column(visible=False) as interview_pre_section: |
| pre_interview_greeting_md = gr.Markdown() |
| start_interview_final_btn = gr.Button("Start Interview") |
| loading_status = gr.Markdown("", visible=False) |
|
|
| with gr.Column(visible=False) as interview_section: |
| gr.Markdown("## Interview in Progress") |
| question_audio = gr.Audio(label="Listen", interactive=False, autoplay=True) |
| question_text = gr.Markdown() |
| user_audio_input = gr.Audio(sources=["microphone"], type="filepath", label="1. Record Audio Answer") |
| stt_transcript = gr.Textbox(label="Transcribed Answer (edit if needed)") |
| confirm_btn = gr.Button("Confirm Answer") |
| evaluation_display = gr.Markdown() |
| interview_summary = gr.Markdown(visible=False) |
|
|
| def validate_start_btn(cv_file, job_desc): |
| return gr.update(interactive=(cv_file is not None and hasattr(cv_file, "name") and bool(job_desc and job_desc.strip()))) |
| |
| cv_file.change(validate_start_btn, [cv_file, job_desc], start_btn) |
| job_desc.change(validate_start_btn, [cv_file, job_desc], start_btn) |
|
|
| def process_and_route_initial(cv_file, job_desc): |
| details = extract_candidate_details(cv_file.name) |
| job_info = extract_job_details(job_desc) |
| data = { |
| "name": details.get("name", "unknown"), |
| "job_role": job_info.get("job_title", "unknown"), |
| "seniority": job_info.get("experience_level", "unknown"), |
| "skills": job_info.get("skills", []) |
| } |
| missing = [k for k, v in data.items() if (isinstance(v, str) and v.lower() == "unknown") or not v] |
| if missing: |
| return data, missing, gr.update(visible=False), gr.update(visible=True), gr.update(visible=False) |
| else: |
| greeting = f"Hello {data['name']}, your profile is ready. Click 'Start Interview' when ready." |
| return data, missing, gr.update(visible=False), gr.update(visible=False), gr.update(visible=True, value=greeting) |
| |
| start_btn.click(process_and_route_initial, [cv_file, job_desc], [user_data, missing_fields_state, user_info_section, missing_section, pre_interview_greeting_md]) |
|
|
| def show_missing(missing): |
| if missing is None: missing = [] |
| return gr.update(visible="name" in missing), gr.update(visible="job_role" in missing), gr.update(visible="seniority" in missing), gr.update(visible="skills" in missing) |
| |
| missing_fields_state.change(show_missing, missing_fields_state, [name_in, role_in, seniority_in, skills_in]) |
|
|
| def validate_fields(name, role, seniority, skills, missing): |
| if not missing: return gr.update(interactive=False) |
| all_filled = all([(not ("name" in missing) or bool(name.strip())), (not ("job_role" in missing) or bool(role.strip())), (not ("seniority" in missing) or bool(seniority)), (not ("skills" in missing) or bool(skills.strip()))]) |
| return gr.update(interactive=all_filled) |
| |
| for inp in [name_in, role_in, seniority_in, skills_in]: |
| inp.change(validate_fields, [name_in, role_in, seniority_in, skills_in, missing_fields_state], submit_btn) |
|
|
| def complete_manual(data, name, role, seniority, skills): |
| if data["name"].lower() == "unknown": data["name"] = name |
| if data["job_role"].lower() == "unknown": data["job_role"] = role |
| if data["seniority"].lower() == "unknown": data["seniority"] = seniority |
| if not data["skills"]: data["skills"] = [s.strip() for s in skills.split(",")] |
| greeting = f"Hello {data['name']}, your profile is ready. Click 'Start Interview' to begin." |
| return data, gr.update(visible=False), gr.update(visible=True), gr.update(value=greeting) |
| |
| submit_btn.click(complete_manual, [user_data, name_in, role_in, seniority_in, skills_in], [user_data, missing_section, interview_pre_section, pre_interview_greeting_md]) |
|
|
| async def start_interview(data): |
| |
| state = { |
| "questions": [], |
| "answers": [], |
| "timings": [], |
| "question_evaluations": [], |
| "answer_evaluations": [], |
| "conversation_history": [], |
| "difficulty_adjustment": None, |
| "question_idx": 0, |
| "max_questions": 3, |
| "q_start_time": time.time(), |
| "log": [] |
| } |
| |
| |
| context = "" |
| prompt = build_interview_prompt( |
| conversation_history=[], |
| user_response="", |
| context=context, |
| job_role=data["job_role"], |
| skills=data["skills"], |
| seniority=data["seniority"], |
| difficulty_adjustment=None, |
| voice_label="neutral" |
| ) |
| |
| |
| start = time.time() |
| first_q = groq_llm.predict(prompt) |
| print("⏱️ Groq LLM Response Time:", round(time.time() - start, 2), "seconds") |
| q_eval = { |
| "Score": "N/A", |
| "Reasoning": "Skipped to reduce processing time", |
| "Improvements": [] |
| } |
| |
| state["questions"].append(first_q) |
| state["question_evaluations"].append(q_eval) |
| state["conversation_history"].append({'role': 'Interviewer', 'content': first_q}) |
| |
| |
| start = time.perf_counter() |
| cleaned_text = first_q.strip().replace("\n", " ") |
| audio_path = await edge_tts_to_file(first_q) |
| print("⏱️ TTS (edge-tts) took", round(time.perf_counter() - start, 2), "seconds") |
|
|
| |
| state["log"].append({ |
| "type": "question", |
| "question": first_q, |
| "question_eval": q_eval, |
| "timestamp": time.time() |
| }) |
| |
| return ( |
| state, |
| gr.update(visible=False), |
| gr.update(visible=True), |
| audio_path, |
| f"*Question 1:* {first_q}" |
| ) |
| |
| |
| start_interview_final_btn.click( |
| fn=start_interview, |
| inputs=[user_data], |
| outputs=[interview_state, interview_pre_section, interview_section, question_audio, question_text], |
| concurrency_limit=1 |
| ) |
|
|
|
|
| def transcribe(audio_path): |
| return whisper_stt(audio_path) |
| |
| user_audio_input.change(transcribe, user_audio_input, stt_transcript) |
|
|
| async def process_answer(transcript, audio_path, state, data): |
| start = time.time() |
| if not transcript: |
| return state, gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), gr.update() |
|
|
| elapsed = round(time.time() - state.get("q_start_time", time.time()), 2) |
| state["timings"].append(elapsed) |
| state["answers"].append(transcript) |
| state["conversation_history"].append({'role': 'Candidate', 'content': transcript}) |
|
|
| last_q = state["questions"][-1] |
| q_eval = state["question_evaluations"][-1] |
| ref_answer = generate_reference_answer(last_q, data["job_role"], data["seniority"]) |
| answer_eval = await asyncio.get_event_loop().run_in_executor( |
| executor, |
| evaluate_answer, |
| last_q, transcript, ref_answer, data["job_role"], data["seniority"] |
| ) |
|
|
| state["answer_evaluations"].append(answer_eval) |
| answer_score = answer_eval.get("Score", "medium") if answer_eval else "medium" |
|
|
| if answer_score == "excellent": |
| state["difficulty_adjustment"] = "harder" |
| elif answer_score in ("medium", "poor"): |
| state["difficulty_adjustment"] = "easier" |
| else: |
| state["difficulty_adjustment"] = None |
|
|
| state["log"].append({ |
| "type": "answer", "question": last_q, "answer": transcript, |
| "answer_eval": answer_eval, "ref_answer": ref_answer, |
| "timing": elapsed, "timestamp": time.time() |
| }) |
|
|
| qidx = state["question_idx"] + 1 |
| if qidx >= state["max_questions"]: |
| timestamp = time.strftime("%Y%m%d_%H%M%S") |
| log_file = f"interview_log_{timestamp}.json" |
| with open(log_file, "w", encoding="utf-8") as f: |
| json.dump(state["log"], f, indent=2, ensure_ascii=False) |
| summary = "# Interview Summary\n" |
| for i, q in enumerate(state["questions"]): |
| summary += (f"\n### Q{i + 1}: {q}\n" |
| f"- *Answer*: {state['answers'][i]}\n" |
| f"- *Q Eval*: {state['question_evaluations'][i]}\n" |
| f"- *A Eval*: {state['answer_evaluations'][i]}\n" |
| f"- *Time*: {state['timings'][i]}s\n") |
| summary += f"\n\n⏺ Full log saved as {log_file}." |
| return state, gr.update(visible=True, value=summary), gr.update(value=None), gr.update(value=None), gr.update(value=None), gr.update(value=None), gr.update(visible=False) |
| else: |
| state["question_idx"] = qidx |
| state["q_start_time"] = time.time() |
| context = "" |
| prompt = build_interview_prompt( |
| conversation_history=state["conversation_history"], |
| user_response=transcript, context=context, |
| job_role=data["job_role"], skills=data["skills"], |
| seniority=data["seniority"], difficulty_adjustment=state["difficulty_adjustment"], |
| voice_label="neutral" |
| ) |
| start = time.time() |
| next_q = groq_llm.predict(prompt) |
| print("⏱️ Groq LLM Response Time:", round(time.time() - start, 2), "seconds") |
| start = time.time() |
| q_eval_future = executor.submit( |
| eval_question_quality, |
| next_q, data["job_role"], data["seniority"] |
| ) |
| q_eval = q_eval_future.result() |
| print("⏱️ Evaluation time:", round(time.time() - start, 2), "seconds") |
| state["questions"].append(next_q) |
| state["question_evaluations"].append(q_eval) |
| state["conversation_history"].append({'role': 'Interviewer', 'content': next_q}) |
| state["log"].append({"type": "question", "question": next_q, "question_eval": q_eval, "timestamp": time.time()}) |
| |
| audio_path = await edge_tts_to_file(next_q) |
|
|
| |
| eval_md = f"*Last Answer Eval:* {answer_eval}" |
| print("✅ process_answer time:", round(time.time() - start, 2), "s") |
| return state, gr.update(visible=False), audio_path, f"*Question {qidx + 1}:* {next_q}", gr.update(value=None), gr.update(value=None), gr.update(visible=True, value=eval_md) |
|
|
|
|
| confirm_btn.click( |
| fn=process_answer, |
| inputs=[stt_transcript, user_audio_input, interview_state, user_data], |
| outputs=[interview_state, interview_summary, question_audio, question_text, user_audio_input, stt_transcript, |
| evaluation_display], |
| concurrency_limit=1 |
| ).then( |
| lambda: (gr.update(value=None), gr.update(value=None)), None, [user_audio_input, stt_transcript] |
| ) |
|
|
| demo.launch(debug=True) |