Spaces:
Paused
Paused
| import requests | |
| import os | |
| import json | |
| from langchain_groq import ChatGroq | |
| from langchain_community.embeddings import HuggingFaceEmbeddings | |
| from langchain_community.vectorstores import Qdrant | |
| from langchain.prompts import PromptTemplate | |
| from langchain.chains import LLMChain | |
| from langchain.retrievers import ContextualCompressionRetriever | |
| from langchain.retrievers.document_compressors import CohereRerank | |
| from qdrant_client import QdrantClient | |
| import cohere | |
| import json | |
| import re | |
| import time | |
| from collections import defaultdict | |
| from qdrant_client.http import models | |
| from qdrant_client.models import PointStruct | |
| from sklearn.feature_extraction.text import TfidfVectorizer | |
| from sklearn.neighbors import NearestNeighbors | |
| from transformers import AutoTokenizer | |
| #from langchain_huggingface import HuggingFaceEndpoint | |
| from langchain_community.embeddings import HuggingFaceEmbeddings | |
| import numpy as np | |
| import os | |
| from dotenv import load_dotenv | |
| from enum import Enum | |
| import time | |
| from inputimeout import inputimeout, TimeoutOccurred | |
| # Import Qdrant client and models (adjust based on your environment) | |
| from qdrant_client import QdrantClient | |
| from qdrant_client.http.models import VectorParams, Distance, Filter, FieldCondition, MatchValue | |
| from qdrant_client.http.models import PointStruct, Filter, FieldCondition, MatchValue, SearchRequest | |
| import traceback | |
| from transformers import pipeline | |
| from textwrap import dedent | |
| import json | |
| import logging | |
| from transformers import pipeline,BitsAndBytesConfig | |
| import os | |
# Credentials and endpoints come from the environment so that no secret is
# ever committed together with the source.
cohere_api_key = os.getenv("COHERE_API_KEY")
chat_groq_api = os.getenv("GROQ_API_KEY")
hf_api_key = os.getenv("HF_API_KEY")
qdrant_api = os.getenv("QDRANT_API_KEY")
qdrant_url = os.getenv("QDRANT_API_URL")

# Report only whether each value is present; echoing the raw keys would leak
# them into console output and hosted build logs.
print("GROQ API Key:", "set" if chat_groq_api else "MISSING")
print("QDRANT API Key:", "set" if qdrant_api else "MISSING")
print("QDRANT API URL:", "set" if qdrant_url else "MISSING")
print("Cohere API Key:", "set" if cohere_api_key else "MISSING")

from qdrant_client import QdrantClient

# Connect with the environment-provided endpoint and key. A cluster URL and
# API key were previously embedded here as string literals; that key should be
# treated as compromised and rotated.
qdrant_client = QdrantClient(
    url=qdrant_url,
    api_key=qdrant_api,
)
print(qdrant_client.get_collections())
class CustomChatGroq:
    """Minimal HTTP client for Groq's OpenAI-compatible chat-completions API.

    Args:
        temperature: Sampling temperature forwarded in every request.
        model_name: Identifier of the Groq-hosted model to query.
        api_key: Bearer token sent in the ``Authorization`` header.
    """

    # Canned interviewer lines returned instead of raising, so a conversation
    # can continue even when the API misbehaves.
    _FALLBACK_UNEXPECTED = "Interviewer: Could you tell me more about your relevant experience?"
    _FALLBACK_ERROR = "Interviewer: Due to a system issue, let's move on to another question."

    def __init__(self, temperature, model_name, api_key):
        self.temperature = temperature
        self.model_name = model_name
        self.api_key = api_key
        self.api_url = "https://api.groq.com/openai/v1/chat/completions"

    @staticmethod
    def _extract_content(data):
        """Return the stripped text of the first choice, or None if malformed.

        Any missing key, empty ``choices`` list, non-mapping payload or
        non-string content yields ``None`` rather than an exception.
        """
        try:
            content = data["choices"][0]["message"]["content"]
        except (KeyError, IndexError, TypeError):
            return None
        return content.strip() if isinstance(content, str) else None

    def predict(self, prompt):
        """Send a request to the Groq API and return the generated response.

        Args:
            prompt: User message text; a fixed system message is prepended.

        Returns:
            The model's reply, or a canned interviewer sentence when the
            request fails or the response has an unexpected structure.
        """
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }
        payload = {
            "model": self.model_name,
            "messages": [{"role": "system", "content": "You are an AI interviewer."},
                         {"role": "user", "content": prompt}],
            "temperature": self.temperature,
            "max_tokens": 150
        }
        try:
            response = requests.post(self.api_url, headers=headers, json=payload, timeout=10)
            response.raise_for_status()  # Raise an error for HTTP codes 4xx/5xx
            data = response.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            # ValueError covers invalid JSON bodies on requests versions whose
            # decode error is not a RequestException subclass.
            logging.error(f"ChatGroq API error: {e}")
            return self._FALLBACK_ERROR

        content = self._extract_content(data)
        if content is not None:
            return content
        logging.warning("Unexpected response structure from Groq API")
        return self._FALLBACK_UNEXPECTED
# Groq-hosted chat model shared by the question/answer evaluation and
# report-generation helpers in this module.
groq_llm = ChatGroq(temperature=0.7, model_name="llama-3.3-70b-versatile", api_key=chat_groq_api)
from huggingface_hub import login
import os

# Authenticate against the Hugging Face Hub; abort early when no token is set.
HF_TOKEN = os.getenv("HF_TOKEN")
if not HF_TOKEN:
    raise EnvironmentError("Missing HF_TOKEN environment variable.")
login(HF_TOKEN)

# Load the Mistral model
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
import torch

print(torch.cuda.is_available())

MODEL_PATH = "mistralai/Mistral-7B-Instruct-v0.3"
# Alternative checkpoint: MODEL_PATH = "tiiuae/falcon-rw-1b"

# Request 8-bit weights via bitsandbytes.
bnb_config = BitsAndBytesConfig(load_in_8bit=True)

mistral_tokenizer = AutoTokenizer.from_pretrained(MODEL_PATH, token=hf_api_key)
judge_llm = AutoModelForCausalLM.from_pretrained(
    MODEL_PATH,
    quantization_config=bnb_config,
    torch_dtype=torch.float16,
    device_map="auto",
    token=hf_api_key,
)
# Reuse the end-of-sequence token id as the padding token id.
judge_llm.config.pad_token_id = judge_llm.config.eos_token_id
print(judge_llm.hf_device_map)

judge_pipeline = pipeline(
    "text-generation",
    model=judge_llm,
    tokenizer=mistral_tokenizer,
    max_new_tokens=128,
    temperature=0.3,
    top_p=0.9,
    do_sample=True,  # Optional but recommended with temperature/top_p
    repetition_penalty=1.1,
)

# One generation at import time as a quick check that the pipeline responds.
output = judge_pipeline("Q: What is Python?\nA:", max_new_tokens=128)[0]['generated_text']
print(output)
| # embedding model | |
| from sentence_transformers import SentenceTransformer | |
class LocalEmbeddings:
    """Expose a SentenceTransformer model through embed_query / embed_documents.

    Both methods return plain Python lists (via ``.tolist()``) instead of the
    array object produced by the underlying ``encode`` call.
    """

    def __init__(self, model_name="all-MiniLM-L6-v2"):
        self.model = SentenceTransformer(model_name)

    def embed_query(self, text):
        """Encode a single text and return its vector as a list."""
        vector = self.model.encode(text)
        return vector.tolist()

    def embed_documents(self, documents):
        """Encode a collection of texts and return the vectors as nested lists."""
        matrix = self.model.encode(documents)
        return matrix.tolist()
# Module-level singletons: local embedding model, Qdrant connection built from
# the environment-provided URL/key, and the Cohere client used for reranking.
embeddings = LocalEmbeddings()
qdrant_client = QdrantClient(
    url=qdrant_url,
    api_key=qdrant_api,
    check_compatibility=False,
)
co = cohere.Client(api_key=cohere_api_key)
class EvaluationScore(str, Enum):
    """Closed set of rating labels.

    The ``str`` mixin makes each member compare equal to its label text.
    """

    POOR = "Poor"
    MEDIUM = "Medium"
    GOOD = "Good"
    EXCELLENT = "Excellent"
| # Cohere Reranker | |
class CohereReranker:
    """Rerank retrieved documents through a Cohere-style ``rerank`` call.

    Args:
        client: Object exposing ``rerank(query=, documents=, model=, top_n=)``
            whose result has a ``results`` sequence of items with ``index``.
        model: Rerank model name sent with each request.
        top_n: Maximum number of documents to keep; also the size of the
            unranked fallback slice returned when the rerank call fails.
    """

    def __init__(self, client, model="rerank-english-v2.0", top_n=5):
        self.client = client
        self.model = model
        self.top_n = top_n

    def compress_documents(self, documents, query):
        """Return up to ``top_n`` documents ordered by relevance to ``query``.

        Args:
            documents: Sequence of objects with a ``page_content`` attribute.
            query: Text the documents are ranked against.

        Returns:
            The reranked documents, ``[]`` for empty input, or the first
            ``top_n`` documents in their original order if reranking fails.
        """
        if not documents:
            return []
        doc_texts = [doc.page_content for doc in documents]
        try:
            reranked = self.client.rerank(
                query=query,
                documents=doc_texts,
                model=self.model,
                top_n=self.top_n
            )
            return [documents[result.index] for result in reranked.results]
        except Exception as e:
            # Best effort: a reranker outage must not break retrieval.
            logging.error(f"Error in CohereReranker.compress_documents: {e}")
            return documents[:self.top_n]
# Shared reranker instance backed by the module-level Cohere client.
reranker = CohereReranker(client=co)
def load_data_from_json(file_path):
    """Load interview Q&A data from a JSON file and group it by job role.

    Each record is expected to carry the string fields ``"Job Role"``,
    ``"Questions"`` and ``"Answers"``. Records that lack a key, hold a
    non-string value, or are not mappings are skipped with a warning instead
    of aborting the whole load.

    Args:
        file_path: Path of a UTF-8 encoded JSON file containing the records.

    Returns:
        A ``defaultdict(list)`` mapping the lowercased, stripped job role to a
        list of ``{"question": ..., "answer": ...}`` dicts.

    Raises:
        Exception: Any error opening, decoding or iterating the file is
            logged and re-raised unchanged.
    """
    try:
        with open(file_path, "r", encoding="utf-8") as f:
            data = json.load(f)
        job_role_buckets = defaultdict(list)
        for idx, item in enumerate(data):
            try:
                job_role = item["Job Role"].lower().strip()
                question = item["Questions"].strip()
                answer = item["Answers"].strip()
            except KeyError as e:
                logging.warning(f"Skipping item {idx}: missing key {e}")
                continue
            except (AttributeError, TypeError) as e:
                # Non-string field (e.g. null) or a record that is not a dict.
                logging.warning(f"Skipping item {idx}: malformed record ({e})")
                continue
            job_role_buckets[job_role].append({"question": question, "answer": answer})
        return job_role_buckets
    except Exception as e:
        logging.error(f"Error loading data: {e}")
        raise
def verify_qdrant_collection(collection_name='interview_questions'):
    """Report whether ``collection_name`` can be read from Qdrant.

    Fetches the collection through the module-level client and reads its
    vector size. Any exception along the way is logged as a warning and
    treated as "collection not available".

    Returns:
        True when the collection info and vector size were read, else False.
    """
    try:
        info = qdrant_client.get_collection(collection_name)
        size = info.config.params.vectors.size
    except Exception as e:
        logging.warning(f"Collection '{collection_name}' not found: {e}")
        return False
    logging.info(f"Collection '{collection_name}' exists with vector size: {size}")
    return True
def _upsert_batch(collection_name, points, label="upsert"):
    """Upsert one batch of points; log and swallow any failure.

    Returns True when the upsert call completed, False otherwise.
    """
    try:
        qdrant_client.upsert(collection_name=collection_name, points=points)
    except Exception as err:
        logging.error(f"Error during {label}: {err}\n{traceback.format_exc()}")
        return False
    return True


def store_data_to_qdrant(data, collection_name='interview_questions', batch_size=100, vector_size=384):
    """Store interview data in the Qdrant vector database.

    Each question is embedded with the module-level ``embeddings`` object and
    uploaded together with its job role, question and answer as payload.
    Point ids are assigned sequentially starting at 0 on every call.

    Args:
        data: Mapping of job role -> list of ``{"question", "answer"}`` dicts.
        collection_name: Target collection; created with cosine distance when
            ``verify_qdrant_collection`` reports it as unavailable.
        batch_size: Number of points accumulated before each upsert.
        vector_size: Expected embedding length; entries whose embedding has a
            different length (or is empty/None) are skipped.

    Returns:
        False if the collection could not be created or an unexpected error
        escaped; True otherwise. Individual embedding or upsert failures are
        logged and do not change the return value.
    """
    try:
        # Check if collection exists, otherwise create it
        if not verify_qdrant_collection(collection_name):
            try:
                qdrant_client.create_collection(
                    collection_name=collection_name,
                    vectors_config=VectorParams(size=vector_size, distance=Distance.COSINE)
                )
                logging.info(f"Created collection '{collection_name}'")
            except Exception as e:
                logging.error(f"Error creating collection: {e}\n{traceback.format_exc()}")
                return False

        points = []
        point_id = 0
        total_points = sum(len(qa_list) for qa_list in data.values())
        processed = 0

        for job_role, qa_list in data.items():
            for entry in qa_list:
                try:
                    emb = embeddings.embed_query(entry["question"])
                    # Guard the length lookup so a None embedding is reported
                    # as length 0 instead of raising inside the log message.
                    emb_len = len(emb) if emb else 0
                    logging.debug(f"Embedding shape: {emb_len}")
                    if emb_len != vector_size:
                        logging.warning(f"Skipping point {point_id} due to invalid embedding length: {emb_len}")
                        continue
                    points.append(PointStruct(
                        id=point_id,
                        vector=emb,
                        payload={
                            "job_role": job_role,
                            "question": entry["question"],
                            "answer": entry["answer"]
                        }
                    ))
                    point_id += 1
                    processed += 1
                except Exception as embed_err:
                    logging.error(f"Embedding error for point {point_id}: {embed_err}\n{traceback.format_exc()}")
                    continue

                # Batch upload; the buffer is cleared even when the upsert
                # fails so one bad batch cannot block the following ones.
                if len(points) >= batch_size:
                    if _upsert_batch(collection_name, points):
                        logging.info(f"Stored {processed}/{total_points} points ({processed/total_points*100:.1f}%)")
                    points = []

        # Final batch upload
        if points and _upsert_batch(collection_name, points, label="final upsert"):
            logging.info(f"Stored final batch of {len(points)} points")

        # Final verification
        try:
            count = qdrant_client.count(collection_name=collection_name, exact=True).count
            print("Current count:", count)
            logging.info(f"✅ Successfully stored {count} points in Qdrant")
            if count != total_points:
                logging.warning(f"Expected {total_points} points but stored {count}")
        except Exception as count_err:
            logging.error(f"Error verifying stored points: {count_err}\n{traceback.format_exc()}")
        return True
    except Exception as e:
        logging.error(f"Error storing data to Qdrant: {e}\n{traceback.format_exc()}")
        return False
# Print the distance metric of the "interview_questions" collection to confirm
# that cosine similarity is in use.
info = qdrant_client.get_collection("interview_questions")
print(info.config.params.vectors.distance)
def extract_all_roles_from_qdrant(collection_name='interview_questions'):
    """Extract all unique job roles from the Qdrant vector store.

    Scrolls through the collection in pages of 200 points and collects the
    lowercased, stripped ``job_role`` payload value of each point. Points
    without a payload or with a non-string ``job_role`` are ignored rather
    than aborting the scan.

    Args:
        collection_name: Collection to scan.

    Returns:
        A list of unique role names (unordered); ``[]`` when nothing was
        found or when the scan raised an error.
    """
    try:
        all_roles = set()
        scroll_offset = None
        while True:
            points, next_page_offset = qdrant_client.scroll(
                collection_name=collection_name,
                limit=200,
                offset=scroll_offset,
                with_payload=True
            )
            if not points:
                break
            for point in points:
                payload = point.payload or {}
                role = payload.get("job_role", "")
                if not isinstance(role, str):
                    continue
                role = role.strip().lower()
                if role:
                    all_roles.add(role)
            if not next_page_offset:
                break
            scroll_offset = next_page_offset

        if not all_roles:
            logging.warning("[Qdrant] No roles found in payloads.")
        else:
            logging.info(f"[Qdrant] Extracted {len(all_roles)} unique job roles.")
        return list(all_roles)
    except Exception as e:
        logging.error(f"Error extracting roles from Qdrant: {e}")
        return []
| import numpy as np | |
| import logging | |
| from sklearn.metrics.pairwise import cosine_similarity | |
def find_similar_roles(user_role, all_roles, top_k=3):
    """Return the ``top_k`` known roles closest to ``user_role`` by embedding.

    The user role and every candidate role are embedded with the module-level
    ``embeddings`` object and ranked by cosine similarity, best first.

    Args:
        user_role: Free-text role entered by the user.
        all_roles: List of candidate role names.
        top_k: Maximum number of roles to return.

    Returns:
        The most similar role names, or ``[]`` on invalid input or any error.
    """
    try:
        # Normalise the query text before validating it.
        query = user_role.strip().lower()
        if not (query and all_roles and isinstance(all_roles, list)):
            logging.warning("Invalid input for role similarity")
            return []

        # Embed the user's role.
        try:
            query_vec = embeddings.embed_query(query)
            if query_vec is None:
                logging.error("User embedding is None")
                return []
        except Exception as e:
            logging.error(f"Error embedding user role: {type(e).__name__}: {e}")
            return []

        # Embed every candidate, keeping names and vectors aligned by index.
        try:
            candidate_vecs, candidate_names = [], []
            for candidate in all_roles:
                vec = embeddings.embed_query(candidate.lower())
                if vec is None:
                    logging.warning(f"Skipping role with no embedding: {candidate}")
                    continue
                candidate_vecs.append(vec)
                candidate_names.append(candidate)
        except Exception as e:
            logging.error(f"Error embedding all roles: {type(e).__name__}: {e}")
            return []

        if not candidate_vecs:
            logging.error("All role embeddings failed")
            return []

        # Rank candidates by descending cosine similarity to the query.
        scores = cosine_similarity([query_vec], candidate_vecs)[0]
        ranked = np.argsort(scores)[::-1]
        similar_roles = [candidate_names[i] for i in ranked[:top_k]]
        logging.debug(f"Similar roles to '{query}': {similar_roles}")
        return similar_roles
    except Exception as e:
        logging.error(f"Error finding similar roles: {type(e).__name__}: {e}", exc_info=True)
        return []
| # RETRIEVE ALL DATA RELATED TO THE JOB ROLE, NOT JUST THE TOP_K RESULTS | |
def get_role_questions(job_role, collection_name="interview_questions"):
    """Fetch every stored Q&A pair whose ``job_role`` payload matches exactly.

    Scrolls the collection in pages of 100 points, filtered on the
    lowercased, stripped ``job_role`` value, until no further page is
    reported.

    Args:
        job_role: Role name to match.
        collection_name: Collection to query; defaults to the collection the
            other helpers in this module use.

    Returns:
        A list of dicts with keys ``question``, ``answer`` and ``job_role``;
        ``[]`` for an empty role or when the query raised an error.
    """
    try:
        if not job_role:
            logging.warning("Job role is empty.")
            return []

        filter_by_role = Filter(
            must=[FieldCondition(
                key="job_role",
                match=MatchValue(value=job_role.strip().lower())
            )]
        )

        all_results = []
        offset = None
        while True:
            results, next_page_offset = qdrant_client.scroll(
                collection_name=collection_name,
                scroll_filter=filter_by_role,
                with_payload=True,
                with_vectors=False,
                limit=100,  # batch size
                offset=offset
            )
            all_results.extend(results)
            if not next_page_offset:
                break
            offset = next_page_offset

        return [{
            "question": r.payload.get("question"),
            "answer": r.payload.get("answer"),
            "job_role": r.payload.get("job_role")
        } for r in all_results]
    except Exception as e:
        logging.error(f"Error fetching role questions: {type(e).__name__}: {e}", exc_info=True)
        return []
def retrieve_interview_data(job_role, all_roles):
    """Collect the Q&A pairs for a job role, falling back to similar roles.

    An exact (lowercased, stripped) role match is tried first. Only when it
    yields nothing are the three most similar roles looked up and their
    questions merged. Questions are de-duplicated by their text.

    Args:
        job_role (str): Input job role (can be misspelled).
        all_roles (list): Full list of available job roles.

    Returns:
        list: QA dicts with keys 'question', 'answer', 'job_role'.
    """
    import logging
    logging.basicConfig(level=logging.INFO)

    job_role = job_role.strip().lower()
    seen_questions = set()
    final_results = []

    def collect_unique(qa_items):
        # Append each QA pair whose question text is non-empty and unseen.
        for qa in qa_items:
            text = qa["question"]
            if not text or text in seen_questions:
                continue
            seen_questions.add(text)
            final_results.append(qa)

    # Step 1: exact role match (all questions stored for that role).
    logging.info(f"Trying to fetch all data for exact role: '{job_role}'")
    collect_unique(get_role_questions(job_role))
    if final_results:
        logging.info(f"Found {len(final_results)} QA pairs for exact role '{job_role}'")
        return final_results

    logging.warning(f"No data found for role '{job_role}'. Trying similar roles...")

    # Step 2: nothing matched, so look for the closest known roles.
    similar_roles = find_similar_roles(job_role, all_roles, top_k=3)
    if not similar_roles:
        logging.warning("No similar roles found.")
        return []
    logging.info(f"Found similar roles: {similar_roles}")

    # Step 3: merge the questions of every similar role.
    for role in similar_roles:
        logging.info(f"Fetching data for similar role: '{role}'")
        collect_unique(get_role_questions(role))

    logging.info(f"Retrieved total {len(final_results)} QA pairs from similar roles")
    return final_results
| import random | |
def random_context_chunks(retrieved_data, k=3):
    """Format up to ``k`` randomly chosen Q&A pairs as a context string.

    Args:
        retrieved_data: List of dicts with ``question`` and ``answer`` keys.
        k: Desired number of pairs. Clamped to the amount of data available,
            because ``random.sample`` raises ``ValueError`` when asked for
            more items than exist.

    Returns:
        The pairs rendered as ``"Q: ...\\nA: ..."`` blocks separated by blank
        lines, or ``""`` when there is no data.
    """
    if not retrieved_data:
        return ""
    sample_size = max(0, min(k, len(retrieved_data)))
    chunks = random.sample(retrieved_data, sample_size)
    return "\n\n".join(f"Q: {item['question']}\nA: {item['answer']}" for item in chunks)
| import json | |
| import logging | |
| import re | |
| from typing import Dict | |
def eval_question_quality(
    question: str,
    job_role: str,
    seniority: str
) -> Dict[str, str]:
    """
    Evaluate the quality of a generated interview question using Groq LLM.

    The model is asked for a JSON object; the text between the last ``{`` and
    the last ``}`` of its reply is parsed and validated.

    Args:
        question: The interview question to assess.
        job_role: Role the interview targets.
        seniority: Experience level the interview targets.

    Returns:
        A dict with keys ``Score`` ("Poor", "Medium", "Good" or "Excellent"),
        ``Reasoning`` and ``Improvements``. If the call, parsing or validation
        fails, a fixed fallback dict with ``Score`` "Poor" is returned.
    """
    # Imported locally: the module-level name `time` is rebound to the
    # function `time.time` by a later `from time import time`.
    import time, json
    prompt = f"""
You are a senior AI hiring expert evaluating the quality of an interview question for a {seniority} {job_role} role.
Evaluate the question based on:
- Relevance to the role and level
- Clarity and conciseness
- Depth of technical insight
---
Question: {question}
---
Respond only with a valid JSON like:
{{
"Score": "Poor" | "Medium" | "Good" | "Excellent",
"Reasoning": "short justification",
"Improvements": ["tip1", "tip2"]
}}
"""
    try:
        start = time.time()
        raw = groq_llm.invoke(prompt)
        print("⏱️ eval_question_quality duration:", round(time.time() - start, 2), "s")

        # A chat model's invoke() returns a message object, not a string;
        # take its text content before doing any string operations.
        response = getattr(raw, "content", raw)
        if not isinstance(response, str):
            response = str(response)

        # Extract JSON safely
        start_idx = response.rfind("{")
        end_idx = response.rfind("}") + 1
        json_str = response[start_idx:end_idx]
        result = json.loads(json_str)
        if result.get("Score") in {"Poor", "Medium", "Good", "Excellent"}:
            return result
        raise ValueError("Invalid Score value in model output")
    except Exception as e:
        print(f"⚠️ eval_question_quality fallback: {e}")
        return {
            "Score": "Poor",
            "Reasoning": "Evaluation failed, using fallback.",
            "Improvements": [
                "Ensure the question is relevant and clear.",
                "Avoid vague or overly generic phrasing.",
                "Include role-specific context if needed."
            ]
        }
def evaluate_answer(
    question: str,
    answer: str,
    ref_answer: str,
    job_role: str,
    seniority: str,
) -> Dict[str, str]:
    """
    Grade a candidate's answer against a reference answer with the Groq LLM.

    The model is asked for a JSON verdict; the text between the last ``{``
    and the last ``}`` of its reply is parsed and validated.

    Returns:
        A dict with keys ``Score`` ("Poor", "Medium", "Good" or "Excellent"),
        ``Reasoning`` and ``Improvements``. Any failure while calling the
        model or parsing its reply yields a fixed fallback with ``Score``
        "Poor".
    """
    # Local imports: the module-level name `time` is rebound to the function
    # `time.time` by a later `from time import time`.
    import time, json
    from langchain_core.messages import AIMessage

    allowed_scores = {"Poor", "Medium", "Good", "Excellent"}
    prompt = f"""
You are a technical interviewer evaluating a candidate for a {seniority} {job_role} role.
Evaluate the response based on:
- Technical correctness
- Clarity
- Relevance
- Structure
---
Question: {question}
Candidate Answer: {answer}
Reference Answer: {ref_answer}
---
Respond ONLY with valid JSON in the following format:
{{
"Score": "Poor" | "Medium" | "Good" | "Excellent",
"Reasoning": "short justification",
"Improvements": ["tip1", "tip2"]
}}
"""
    try:
        started_at = time.time()
        raw = groq_llm.invoke(prompt)
        print("⏱️ evaluate_answer duration:", round(time.time() - started_at, 2), "s")

        # Message objects carry their text in `.content`; anything else is
        # stringified as-is.
        output = raw.content if isinstance(raw, AIMessage) else str(raw)
        print("🔍 Raw Groq Response:\n", output)

        # Parse the span from the last "{" through the last "}".
        json_str = output[output.rfind("{"):output.rfind("}") + 1]
        result = json.loads(json_str)
        if result.get("Score") not in allowed_scores:
            raise ValueError("Invalid score value")
        return result
    except Exception as e:
        print(f"⚠️ evaluate_answer fallback: {e}")
        return {
            "Score": "Poor",
            "Reasoning": "Failed to evaluate properly. Defaulted to Poor.",
            "Improvements": [
                "Be more specific",
                "Add technical details",
                "Structure the answer clearly"
            ]
        }
| # Reference-answer generation using the LLaMA 3.3 model hosted on Groq | |
def generate_reference_answer(question, job_role, seniority):
    """
    Generates a high-quality reference answer using Groq-hosted LLaMA model.

    Args:
        question (str): Interview question to answer.
        job_role (str): Target job role (e.g., "Frontend Developer").
        seniority (str): Experience level (e.g., "Mid-Level").

    Returns:
        str: Clean, generated reference answer; a fixed placeholder when the
        model returns only whitespace; or an error message when the call
        raises.
    """
    try:
        # Clean, role-specific prompt
        prompt = f"""You are a {seniority} {job_role}.
Q: {question}
A:"""
        # `invoke` replaces the deprecated `predict`; it returns a message
        # object whose text lives in `.content`.
        raw = groq_llm.invoke(prompt)
        ref_answer = getattr(raw, "content", raw)
        if not isinstance(ref_answer, str):
            ref_answer = str(ref_answer)

        ref_answer = ref_answer.strip()
        if not ref_answer:
            return "Reference answer not generated."
        return ref_answer
    except Exception as e:
        logging.error(f"Error generating reference answer: {e}", exc_info=True)
        return "Unable to generate reference answer due to an error"
def build_interview_prompt(conversation_history, user_response, context, job_role, skills, seniority,
                           difficulty_adjustment=None, voice_label=None, face_label=None, effective_confidence=None):
    """Build a prompt for generating the next interview question with adaptive difficulty and fairness logic.

    Args:
        conversation_history: List of message dicts with ``role`` and
            ``content`` keys; the last one may carry an ``evaluation`` entry
            (a dict, or a list of dicts of which the last is used). Only the
            six most recent messages are rendered.
        user_response: The candidate's latest answer, quoted in the prompt.
        context: Retrieved Q&A text. When non-empty it is embedded in the
            prompt under "Retrieved Context:"; ``None`` or blank means none.
        job_role: Role being interviewed for.
        skills: Skills to focus on; a list/tuple/set is rendered
            comma-separated, anything else is rendered as given.
        seniority: Candidate's seniority level.
        difficulty_adjustment: ``"harder"``, ``"easier"`` or ``None``.
        voice_label: Voice tone label; "unknown" when falsy.
        face_label: Accepted for interface compatibility; not used here.
        effective_confidence: Accepted for interface compatibility; not used
            here.

    Returns:
        The fully formatted prompt string.
    """
    interview_template = """
You are an AI interviewer conducting a real-time interview for a {job_role} position.
Your objective is to thoroughly evaluate the candidate's suitability for the role using smart, structured, and adaptive questioning.
---
Interview Rules and Principles:
- The **baseline difficulty** of questions must match the candidate’s seniority level (e.g., junior, mid-level, senior).
- Use your judgment to increase difficulty **slightly** if the candidate performs well, or simplify if they struggle — but never drop below the expected baseline for their level.
- Avoid asking extremely difficult questions to junior candidates unless they’ve clearly demonstrated advanced knowledge.
- Be fair: candidates for the same role should be evaluated within a consistent difficulty range.
- Adapt your line of questioning gradually and logically based on the **overall flow**, not just the last answer.
- Include real-world problem-solving scenarios to test how the candidate thinks and behaves practically.
- You must **lead** the interview and make intelligent decisions about what to ask next.
---
Context Use:
{context_instruction}
{context_section}
Note:
If no relevant context was retrieved or the previous answer is unclear, you must still generate a thoughtful interview question using your own knowledge. Do not skip generation. Avoid default or fallback responses — always try to generate a meaningful and fair next question.
---
Job Role: {job_role}
Seniority Level: {seniority}
Skills Focus: {skills}
Difficulty Setting: {difficulty} (based on {difficulty_adjustment})
---
Recent Conversation History:
{history}
Candidate's Last Response:
"{user_response}"
Evaluation of Last Response:
{response_evaluation}
Voice Tone: {voice_label}
---
---
Important:
If no relevant context was retrieved or the previous answer is unclear or off-topic,
you must still generate a meaningful and fair interview question using your own knowledge and best practices.
Do not skip question generation or fall back to default/filler responses.
---
Guidelines for Next Question:
- If this is the beginning of the interview, start with a question about the candidate’s background or experience.
- Base the difficulty primarily on the seniority level, with light adjustment from recent performance.
- Focus on core skills, real-world applications, and depth of reasoning.
- Ask only one question. Be clear and concise.
Generate the next interview question now:
"""
    # Calculate difficulty phrase
    if difficulty_adjustment == "harder":
        difficulty = f"slightly more challenging than typical for {seniority}"
    elif difficulty_adjustment == "easier":
        difficulty = f"slightly easier than typical for {seniority}"
    else:
        difficulty = f"appropriate for {seniority}"

    # Choose context logic. The retrieved text itself must be placed in the
    # prompt; otherwise the instruction to "use the provided context" refers
    # to nothing the model can see.
    context = (context or "").strip()
    if context:
        context_instruction = (
            "Use both your own expertise and the provided context from relevant interview datasets. "
            "You can either build on questions from the dataset or generate your own."
        )
        context_section = f"Retrieved Context:\n{context}"
    else:
        context_instruction = (
            "No specific context retrieved. Use your own knowledge and best practices to craft a question."
        )
        context_section = ""

    # Render a collection of skills as readable text rather than a list repr.
    if isinstance(skills, (list, tuple, set)):
        skills = ", ".join(str(skill) for skill in skills)

    # Format conversation history (last 6 exchanges max)
    history_items = list(conversation_history or [])
    formatted_history = "\n".join(
        f"{msg['role'].capitalize()}: {msg['content']}" for msg in history_items[-6:]
    )

    # Add evaluation summary if available
    last_eval = history_items[-1].get("evaluation") if history_items else None
    if isinstance(last_eval, (list, tuple)):
        last_eval = last_eval[-1] if last_eval else None
    if isinstance(last_eval, dict) and last_eval:
        response_evaluation = "\n".join([
            f"- Score: {last_eval.get('Score', 'N/A')}",
            f"- Reasoning: {last_eval.get('Reasoning', 'N/A')}",
            f"- Improvements: {last_eval.get('Improvements', 'N/A')}",
        ])
    else:
        response_evaluation = "No evaluation available yet."

    # Fill the template
    return interview_template.format(
        job_role=job_role,
        seniority=seniority,
        skills=skills,
        difficulty=difficulty,
        difficulty_adjustment=difficulty_adjustment if difficulty_adjustment else "default seniority",
        context_instruction=context_instruction,
        context_section=context_section,
        history=formatted_history,
        user_response=user_response,
        response_evaluation=response_evaluation,
        voice_label=voice_label or "unknown",
    )
def generate_llm_interview_report(
    interview_state, logged_samples, job_role, seniority
):
    """Ask the Groq LLM for a structured hiring report on a finished interview.

    Section scores on a 1-5 scale are derived from the logged samples:
    answer ratings feed both the problem-solving and technical sections,
    answer length (words // 30, clamped to 2-5) feeds communication, and the
    mean of the voice and face labels feeds composure. Their average selects
    the verdict embedded in the prompt.

    Args:
        interview_state: Accepted for interface compatibility; not used here.
        logged_samples: List of dicts that may hold ``generated_question``,
            ``user_answer``, ``answer_evaluation`` (with ``Score`` and
            ``Reasoning``), ``voice_label`` and ``face_label``. Missing or
            ``None`` values are tolerated.
        job_role: Role the interview targeted.
        seniority: Seniority level the interview targeted.

    Returns:
        The report text produced by the model.
    """
    confidence_scale = {
        "confident": 5, "calm": 4, "neutral": 3, "nervous": 2, "anxious": 1, "unknown": 3
    }
    answer_scale = {"excellent": 5, "good": 4, "medium": 3, "poor": 2}

    # Helper for converting an emotion label to 1-5 (unrecognised or None -> 3)
    def score_label(label):
        return confidence_scale.get(str(label or "unknown").lower(), 3)

    def section_score(vals):
        return round(sum(vals) / len(vals), 2) if vals else "N/A"

    # Aggregate info
    scores, voice_conf, face_conf, comm_scores = [], [], [], []
    for entry in logged_samples:
        answer_eval = entry.get("answer_evaluation") or {}
        score = str(answer_eval.get("Score", "Not Evaluated")).lower()
        if score in answer_scale:
            scores.append(answer_scale[score])

        # Emotions/confidence
        voice_conf.append(score_label(entry.get("voice_label", "unknown")))
        face_conf.append(score_label(entry.get("face_label", "unknown")))

        # Communication estimate based on answer length
        user_answer = entry.get("user_answer")
        if user_answer:
            length = len(user_answer.split())
            comm_scores.append(min(5, max(2, length // 30)))

    # Compute averages for sections
    avg_problem = section_score(scores)
    avg_tech = section_score(scores)
    avg_comm = section_score(comm_scores)
    avg_emotion = section_score([(v + f) / 2 for v, f in zip(voice_conf, face_conf)])

    # Compute decision heuristics; "N/A" sections are left out of the average
    section_averages = [avg_problem, avg_tech, avg_comm, avg_emotion]
    numeric_avgs = [v for v in section_averages if isinstance(v, (float, int))]
    avg_overall = round(sum(numeric_avgs) / len(numeric_avgs), 2) if numeric_avgs else 0

    # Hiring logic (you can customize thresholds)
    if avg_overall >= 4.5:
        verdict = "Strong Hire"
    elif avg_overall >= 4.0:
        verdict = "Hire"
    elif avg_overall >= 3.0:
        verdict = "Conditional Hire"
    else:
        verdict = "No Hire"

    # Build LLM report prompt
    transcript = "\n\n".join(
        f"Q: {e.get('generated_question', '')}\n"
        f"A: {e.get('user_answer', '')}\n"
        f"Score: {(e.get('answer_evaluation') or {}).get('Score', '')}\n"
        f"Reasoning: {(e.get('answer_evaluation') or {}).get('Reasoning', '')}"
        for e in logged_samples
    )
    prompt = f"""
You are a senior technical interviewer at a major tech company.
Write a structured, realistic hiring report for this {seniority} {job_role} interview, using these section scores (scale 1–5, with 5 best):
Section-wise Evaluation
1. *Problem Solving & Critical Thinking*: {avg_problem}
2. *Technical Depth & Knowledge*: {avg_tech}
3. *Communication & Clarity*: {avg_comm}
4. *Emotional Composure & Confidence*: {avg_emotion}
5. *Role Relevance*: 5
*Transcript*
{transcript}
Your report should have the following sections:
1. *Executive Summary* (realistic, hiring-committee style)
2. *Section-wise Comments* (for each numbered category above, with short paragraph citing specifics)
3. *Strengths & Weaknesses* (list at least 2 for each)
4. *Final Verdict*: {verdict}
5. *Recommendations* (2–3 for future improvement)
Use realistic language. If some sections are N/A or lower than others, comment honestly.
Interview Report:
"""
    # `invoke` replaces the deprecated `predict`; it returns a message object
    # whose text lives in `.content`.
    raw = groq_llm.invoke(prompt)
    report = getattr(raw, "content", raw)
    return report if isinstance(report, str) else str(report)
def get_user_info():
    """
    Interactively gather the candidate's details before the interview starts.

    Prompts on stdin for a name, a job role, an experience level (chosen by
    number from a fixed list) and a comma-separated skill list, re-asking
    until each answer is usable, then prints a summary.

    Returns a dictionary with keys: name, job_role, seniority, skills
    """
    import logging

    def ask_until_filled(first_prompt, reminder, retry_prompt):
        # Keep asking until the stripped reply is non-empty.
        reply = input(first_prompt).strip()
        while not reply:
            print(reminder)
            reply = input(retry_prompt).strip()
        return reply

    def parse_skills(raw):
        # Split on commas and drop blank entries.
        return [part.strip() for part in raw.split(",") if part.strip()]

    logging.info("Collecting user information...")
    print("Welcome to the AI Interview Simulator!")
    print("Let’s set up your mock interview.\n")

    # Name and job role: free text, must not be blank.
    name = ask_until_filled(
        "What is your name? ",
        "Please enter your name.",
        "What is your name? ",
    )
    job_role = ask_until_filled(
        f"Hi {name}, what job role are you preparing for? (e.g. Frontend Developer) ",
        "Please specify the job role.",
        "What job role are you preparing for? ",
    )

    # Seniority: picked by its 1-based position in the menu.
    seniority_options = ["Entry-level", "Junior", "Mid-Level", "Senior", "Lead"]
    option_count = len(seniority_options)
    print("\nSelect your experience level:")
    for number, label in enumerate(seniority_options, start=1):
        print(f"{number}. {label}")

    seniority_choice = None
    while seniority_choice is None or not (1 <= seniority_choice <= option_count):
        try:
            seniority_choice = int(input("Enter the number corresponding to your level: "))
        except ValueError:
            print(f"Please enter a number between 1 and {option_count}")
    seniority = seniority_options[seniority_choice - 1]

    # Skills: at least one non-blank entry is required.
    skills = parse_skills(
        input(f"\nWhat are your top skills relevant to {job_role}? (Separate with commas): ")
    )
    while not skills:
        print("Please enter at least one skill.")
        skills = parse_skills(input("Your top skills (comma-separated): "))

    # Confirm collected info
    for line in (
        "\n Interview Setup Complete!",
        f"Name: {name}",
        f"Job Role: {job_role}",
        f"Experience Level: {seniority}",
        f"Skills: {', '.join(skills)}",
        "\nStarting your mock interview...\n",
    ):
        print(line)

    return {
        "name": name,
        "job_role": job_role,
        "seniority": seniority,
        "skills": skills
    }
| import threading | |
def wait_for_user_response(timeout=200):
    """Wait for user input with timeout. Returns '' if no response.

    The blocking ``input()`` call runs in a helper thread and this function
    waits on it for at most ``timeout`` seconds.

    Args:
        timeout: Maximum number of seconds to wait for the answer.

    Returns:
        str: The stripped answer, or '' when nothing was entered in time or
        stdin reached end-of-file.

    Note:
        A timed-out reader thread cannot be cancelled; it stays blocked on
        stdin until the next line arrives and then discards it. The thread is
        a daemon so that it never keeps the interpreter alive on exit.
    """
    user_input = []

    def get_input():
        try:
            answer = input("Your Answer (within timeout): ").strip()
        except EOFError:
            # stdin closed or exhausted: report "no answer" instead of
            # letting the exception escape inside the worker thread.
            return
        user_input.append(answer)

    # daemon=True: an abandoned reader must not block process shutdown.
    thread = threading.Thread(target=get_input, daemon=True)
    thread.start()
    thread.join(timeout)
    return user_input[0] if user_input else ""
| import json | |
| from datetime import datetime | |
| from time import time | |
| import random | |
def interview_loop(max_questions, timeout_seconds=300, collection_name="interview_questions", judge_pipeline=None, save_path="interview_log.json"):
    """
    Run a console mock interview from candidate setup to the final report.

    For each of ``max_questions`` rounds the function builds a prompt, asks the
    LLM for a question, waits for the candidate's typed answer, evaluates the
    answer against a generated reference answer and derives a difficulty hint
    for the next round. Afterwards it prints an LLM-written report and saves a
    per-question log as JSON.

    Args:
        max_questions (int): Number of questions to ask.
        timeout_seconds (int | float): Seconds to wait for each answer.
        collection_name (str): Qdrant collection passed to
            ``extract_all_roles_from_qdrant``.
        judge_pipeline: Forwarded unchanged to ``evaluate_answer``.
        save_path (str): Base path of the log; a timestamp is inserted before
            the ``.json`` suffix.

    Returns:
        tuple: ``(interview_state, report)`` where ``interview_state`` is the
        dict built below and ``report`` is whatever
        ``generate_llm_interview_report`` returned.
    """
    # The module rebinds the name `time` more than once (`from time import time`
    # before this function, `import time` further down), so by the time this
    # runs the bare name may be the module. Bind the clock locally instead.
    from time import time as _now

    user_info = get_user_info()
    job_role = user_info['job_role']
    seniority = user_info['seniority']
    skills = user_info['skills']

    all_roles = extract_all_roles_from_qdrant(collection_name=collection_name)
    retrieved_data = retrieve_interview_data(job_role, all_roles)
    context_data = random_context_chunks(retrieved_data, k=4)

    conversation_history = []
    interview_state = {
        "questions": [],
        "user_answer": [],
        "job_role": job_role,
        "seniority": seniority,
        "start_time": _now()
    }

    # Store log for evaluation
    logged_samples = []
    difficulty_adjustment = None
    # Candidate's most recent answer; '' before the first answer and after a
    # skipped question (the last history entry is then the interviewer's own
    # question, which must not be fed back as the user's response).
    last_user_response = ""

    for i in range(max_questions):
        # Generate question prompt
        prompt = build_interview_prompt(
            conversation_history=conversation_history,
            user_response=last_user_response,
            context=context_data,
            job_role=job_role,
            skills=skills,
            seniority=seniority,
            difficulty_adjustment=difficulty_adjustment
        )
        question = groq_llm.predict(prompt)
        question_eval = eval_question_quality(question, job_role, seniority)
        conversation_history.append({'role': "Interviewer", "content": question})
        print(f"Interviewer: Q{i + 1} : {question}")

        # Wait for user answer
        asked_at = _now()
        user_answer = wait_for_user_response(timeout=timeout_seconds)
        response_time = _now() - asked_at

        skipped = False
        answer_eval = None
        ref_answer = None

        if not user_answer:
            print("No Response Received, moving to next question.")
            user_answer = None
            skipped = True
            last_user_response = ""
            # NOTE(review): every other branch uses "harder"/"easier"/None;
            # confirm build_interview_prompt understands "medium".
            difficulty_adjustment = "medium"
        else:
            last_user_response = user_answer
            conversation_history.append({"role": "Candidate", "content": user_answer})
            ref_answer = generate_reference_answer(question, job_role, seniority)
            answer_eval = evaluate_answer(
                question=question,
                answer=user_answer,
                ref_answer=ref_answer,
                job_role=job_role,
                seniority=seniority,
                judge_pipeline=judge_pipeline
            )
            interview_state["user_answer"].append(user_answer)

            # Tolerate a missing/malformed evaluation instead of crashing the
            # whole interview on a KeyError.
            eval_fields = answer_eval if isinstance(answer_eval, dict) else {}

            # Append inline evaluation for history
            conversation_history[-1].setdefault('evaluation', []).append({
                "technical_depth": {
                    "score": eval_fields.get('Score'),
                    "Reasoning": eval_fields.get('Reasoning')
                }
            })

            # Adjust difficulty
            score = str(eval_fields.get('Score') or "").strip().lower()
            if score == "excellent":
                difficulty_adjustment = "harder"
            elif score in ('poor', 'medium'):
                difficulty_adjustment = "easier"
            else:
                difficulty_adjustment = None

        # Store for local logging
        logged_samples.append({
            "job_role": job_role,
            "seniority": seniority,
            "skills": skills,
            "context": context_data,
            "prompt": prompt,
            "generated_question": question,
            "question_evaluation": question_eval,
            "user_answer": user_answer,
            "reference_answer": ref_answer,
            "answer_evaluation": answer_eval,
            "skipped": skipped,
            "response_time": response_time
        })

        # Store state
        interview_state['questions'].append({
            "question": question,
            "question_evaluation": question_eval,
            "user_answer": user_answer,
            "answer_evaluation": answer_eval,
            "skipped": skipped
        })

    interview_state['end_time'] = _now()
    report = generate_llm_interview_report(interview_state, job_role, seniority)
    print("Report : _____________________\n")
    print(report)
    print('______________________________________________')

    # Save full interview logs to JSON
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    # Only a trailing ".json" is removed, never an occurrence inside the path.
    log_stem = save_path[:-len(".json")] if save_path.endswith(".json") else save_path
    filename = f"{log_stem}_{timestamp}.json"
    with open(filename, "w", encoding="utf-8") as f:
        json.dump(logged_samples, f, indent=2, ensure_ascii=False)
    print(f" Interview log saved to {filename}")
    print("____________________________________\n")
    print(f"interview state : {interview_state}")
    return interview_state, report
| from sklearn.metrics import precision_score, recall_score, f1_score | |
| import numpy as np | |
# build ground truth for retrieving data for testing
def build_ground_truth(all_roles):
    """
    Map every role to the set of question texts stored for it.

    Args:
        all_roles: Iterable of role names; each is passed to
            ``get_role_questions``.

    Returns:
        dict: ``{role: set of question strings}``. Entries without a
        ``"question"`` value are ignored, and questions are stripped of
        surrounding whitespace so they compare equal to the stripped
        questions used by ``evaluate_retrieval``.
    """
    gt = {}
    for role in all_roles:
        stripped = (
            q["question"].strip()
            for q in get_role_questions(role)
            if q.get("question")  # .get: a record without the key is skipped, not a KeyError
        )
        gt[role] = {text for text in stripped if text}
    return gt
def evaluate_retrieval(job_role, all_roles, k=10):
    """
    Evaluate retrieval quality using Precision@k, Recall@k, and F1@k.

    Precision@k is the share of the top-k retrieved questions that belong to
    the role's ground truth. Recall@k is the share of the role's ground-truth
    questions that appear among the top-k. F1@k is their harmonic mean.

    Args:
        job_role (str): The input job role to search for.
        all_roles (list): List of all available job roles in the system.
        k (int): Top-k retrieved questions to evaluate.

    Returns:
        dict: Evaluation metrics including precision, recall, and f1, plus the
        hit/total counts; an empty dict when the role has no ground truth.
    """
    # Step 1: Ground Truth (all exact questions stored for this role)
    ground_truth_qs = {
        q["question"].strip()
        for q in get_role_questions(job_role)
        if q.get("question")
    }
    if not ground_truth_qs:
        print(f"[!] No ground truth found for role: {job_role}")
        return {}

    # Step 2: Retrieved Questions (may include fallback roles)
    retrieved_qas = retrieve_interview_data(job_role, all_roles)
    retrieved_qs = [q["question"].strip() for q in retrieved_qas if q.get("question")]

    # Step 3: Take top-k retrieved
    retrieved_top_k = retrieved_qs[:k]

    # Step 4: Binary relevance (1 if in ground truth, 0 if not)
    y_true = [1 if q in ground_truth_qs else 0 for q in retrieved_top_k]
    relevant_retrieved = sum(y_true)

    # Precision: hits over what was retrieved.
    precision = relevant_retrieved / len(y_true) if y_true else 0.0
    # Recall: DISTINCT ground-truth questions found, over the ground-truth size.
    # (Scoring y_true against an all-ones prediction has no false negatives,
    # so it reports 1.0 as soon as a single hit exists.)
    distinct_hits = len(ground_truth_qs.intersection(retrieved_top_k))
    recall = distinct_hits / len(ground_truth_qs)
    if precision + recall:
        f1 = 2 * precision * recall / (precision + recall)
    else:
        f1 = 0.0

    print(f" Retrieval Evaluation for role: '{job_role}' (Top-{k})")
    print(f"Precision@{k}: {precision:.2f}")
    print(f"Recall@{k}: {recall:.2f}")
    print(f"F1@{k}: {f1:.2f}")
    print(f"Relevant Retrieved: {relevant_retrieved}/{len(y_true)}")
    print("–" * 40)

    return {
        "job_role": job_role,
        "precision": precision,
        "recall": recall,
        "f1": f1,
        "relevant_retrieved": relevant_retrieved,
        "total_retrieved": len(y_true),
        "ground_truth_count": len(ground_truth_qs),
    }
# Module-level retrieval benchmark: score every known role at several cut-offs
# and print the mean precision/recall/F1 per k.
k_values = [5, 10, 20]
all_roles = extract_all_roles_from_qdrant(collection_name="interview_questions")
results = []
for k in k_values:
    for role in all_roles:
        metrics = evaluate_retrieval(role, all_roles, k=k)
        if metrics:  # only if we found ground truth
            metrics["k"] = k
            results.append(metrics)

import pandas as pd

df = pd.DataFrame(results)
if df.empty:
    # With no rows there is no "k" column, so groupby("k") would raise KeyError.
    summary = pd.DataFrame(columns=["precision", "recall", "f1"])
    print("No retrieval metrics were produced; nothing to summarise.")
else:
    summary = df.groupby("k")[["precision", "recall", "f1"]].mean().round(3)
    print(summary)
def extract_job_details(job_description):
    """Extract job details such as title, skills, experience level, and years of experience from the job description.

    The extraction is purely pattern based:

    * title  - the words between "seeking/hiring a|an" and the next " to";
    * skills - the phrase following "Proficiency in", "Experience with" or
      "Knowledge of", up to the next comma, semicolon, sentence-ending
      period, "and"/"or" word, or end of line;
    * years  - the number in "<N>+ years of experience" (also "year",
      optional "of", any letter case).

    Args:
        job_description (str): Free-text job description; ``None`` is treated
            as an empty string.

    Returns:
        dict: ``job_title`` (str, "Unknown" if not found), ``skills`` (list of
        str in order of first appearance, no duplicates), ``experience_level``
        ("Senior" for >= 5 years, "Mid" for >= 3, otherwise "Junior";
        "Unknown" if no year count was found) and ``years_experience``
        (int or None).
    """
    job_description = job_description or ""

    # \s+to\b: stop at the word "to", not at " to" inside e.g. " tool".
    title_match = re.search(
        r"(?i)\b(?:seeking|hiring)\s+an?\s+(.+?)\s+to\b", job_description
    )
    job_title = title_match.group(1).strip() if title_match else "Unknown"

    # Terminators: "and"/"or" must be whole words (not " or" in " orchestration"),
    # and a period only ends the phrase when it ends a sentence (keeps "Node.js").
    skills_match = re.findall(
        r"(?i)(?:Proficiency in|Experience with|Knowledge of)\s+"
        r"(.+?)(?:,|;|\.(?=\s|$)|\s+and\b|\s+or\b|\n|$)",
        job_description,
    )
    # dict.fromkeys de-duplicates while keeping first-seen order (a set would
    # make the order vary between runs).
    skills = list(dict.fromkeys(s.strip() for s in skills_match if s.strip()))

    experience_match = re.search(
        r"(?i)(\d+)\s*\+?\s*years?(?:\s+of)?\s+experience", job_description
    )
    if experience_match:
        years_experience = int(experience_match.group(1))
        if years_experience >= 5:
            experience_level = "Senior"
        elif years_experience >= 3:
            experience_level = "Mid"
        else:
            experience_level = "Junior"
    else:
        years_experience = None
        experience_level = "Unknown"

    return {
        "job_title": job_title,
        "skills": skills,
        "experience_level": experience_level,
        "years_experience": years_experience
    }
| import re | |
| from docx import Document | |
| import textract | |
| from PyPDF2 import PdfReader | |
# Job-title words and phrases; `looks_like_job_title` searches a line for any
# of these as a whole word, case-insensitively.
JOB_TITLES = [
    "Accountant",
    "Data Scientist",
    "Machine Learning Engineer",
    "Software Engineer",
    "Developer",
    "Analyst",
    "Researcher",
    "Intern",
    "Consultant",
    "Manager",
    "Engineer",
    "Specialist",
    "Project Manager",
    "Product Manager",
    "Administrator",
    "Director",
    "Officer",
    "Assistant",
    "Coordinator",
    "Supervisor",
]
def clean_filename_name(filename):
    """
    Derive a candidate name from a CV file name.

    The extension is dropped, underscores and dashes become spaces, the
    standalone word "cv" (any case) is removed, and the remainder is
    title-cased.

    Args:
        filename (str): File name without directories, e.g. "John_Doe_CV.pdf".

    Returns:
        str | None: The title-cased name, or None when nothing is left after
        cleaning or when the remainder contains a digit (treated as
        unreliable).
    """
    # Remove file extension
    stem = re.sub(r"\.[^.]+$", "", filename).strip()

    # Separators must become spaces BEFORE the "cv" removal: "_" is a regex
    # word character, so r"\bcv\b" can never match the CV in "Doe_CV".
    words = re.sub(r"[_\-]+", " ", stem)

    # Remove 'cv' or 'CV' words
    words = re.sub(r"\bcv\b", " ", words, flags=re.IGNORECASE)

    # Collapse the gaps left behind by the removals.
    words = " ".join(words.split())

    # Nothing left (e.g. "cv.pdf" or "cv-.pdf")
    if not words:
        return None

    # If it contains any digit, return None (unreliable)
    if re.search(r"\d", words):
        return None

    return words.title()
def looks_like_job_title(line):
    """Return True when `line` contains any JOB_TITLES entry as a whole word, ignoring case."""
    lowered = line.lower()
    return any(
        re.search(r"\b" + re.escape(title.lower()) + r"\b", lowered) is not None
        for title in JOB_TITLES
    )
def extract_name_from_text(lines, max_lines=1):
    """
    Guess the candidate's name from the first lines of the CV text.

    Only the first ``max_lines`` lines are inspected (default 1, i.e. just
    the first line). A line is accepted as the name when it contains no
    digits, has at most 4 words and 40 characters, and is not written
    entirely in upper case.

    Args:
        lines (list[str]): Non-empty text lines of the CV, in document order.
        max_lines (int): How many leading lines to inspect.

    Returns:
        str | None: The title-cased name; the string "unknown" as soon as an
        inspected line looks like a job title; None when no inspected line
        qualifies.
    """
    for raw_line in lines[:max(max_lines, 0)]:
        line = raw_line.strip()
        # A job title this early means the document does not open with a name.
        if looks_like_job_title(line):
            return "unknown"
        if re.search(r"\d", line):  # skip lines with digits
            continue
        if len(line.split()) > 4 or len(line) > 40:  # too long or many words
            continue
        # If line has only uppercase words, it's probably not a name
        if line.isupper():
            continue
        # Passed checks, return title-cased line as name
        return line.title()
    return None
def extract_text_from_file(file_path):
    """
    Read a CV file and return its text content.

    ``.pdf`` files are read with ``PdfReader`` (pages joined by newlines),
    ``.docx`` files with ``Document`` (paragraphs joined by newlines), and
    anything else is handed to ``textract``. The extension check ignores
    letter case, so "CV.PDF" is handled like "cv.pdf".

    Args:
        file_path (str): Path to the file on disk.

    Returns:
        str: Extracted text with leading/trailing whitespace removed.
    """
    lowered_path = file_path.lower()
    if lowered_path.endswith('.pdf'):
        reader = PdfReader(file_path)
        # extract_text() may yield nothing for a page; use '' so join works.
        text = "\n".join(page.extract_text() or '' for page in reader.pages)
    elif lowered_path.endswith('.docx'):
        doc = Document(file_path)
        text = "\n".join(para.text for para in doc.paragraphs)
    else:  # For .doc or fallback
        # errors="replace": a stray non-UTF-8 byte should not abort parsing.
        text = textract.process(file_path).decode('utf-8', errors='replace')
    return text.strip()
def extract_candidate_details(file_path):
    """
    Extract the candidate's name and skills from a CV file.

    The name is taken from the file name when ``clean_filename_name`` accepts
    it, otherwise from the document text via ``extract_name_from_text``, and
    falls back to "Unknown". Skills are read from the text that follows the
    first "Skills" heading (basic, single-line extraction).

    Args:
        file_path (str): Path to the CV file.

    Returns:
        dict: ``{"name": str, "skills": list[str]}``.
    """
    text = extract_text_from_file(file_path)
    lines = [line.strip() for line in text.splitlines() if line.strip()]

    # Extract name
    # Split on both separators so Windows-style paths yield the bare file name too.
    filename = re.split(r"[\\/]", file_path)[-1]
    name = clean_filename_name(filename)
    if not name:
        name = extract_name_from_text(lines)
    if not name:
        name = "Unknown"

    # Extract skills (basic version)
    skills = []
    skills_section = re.search(r"Skills\s*[:\-]?\s*(.+)", text, re.IGNORECASE)
    if skills_section:
        raw_skills = skills_section.group(1)
        # A dash only separates items when it stands alone ("A - B", "- A");
        # hyphens inside a term such as "scikit-learn" are kept.
        skills = [
            s.strip()
            for s in re.split(r",|\n|•|;|(?:^|\s)-(?:\s|$)", raw_skills)
            if s.strip()
        ]

    return {
        "name": name,
        "skills": skills
    }
| # import gradio as gr | |
| # import time | |
| # import tempfile | |
| # import numpy as np | |
| # import scipy.io.wavfile as wavfile | |
| # import os | |
| # import json | |
| # from transformers import BarkModel, AutoProcessor | |
| # import torch, gc | |
| # import whisper | |
| # from transformers import Wav2Vec2Processor, Wav2Vec2ForSequenceClassification | |
| # import librosa | |
| # import torch | |
| # print(torch.cuda.is_available()) # ✅ Tells you if GPU is available | |
| # torch.cuda.empty_cache() | |
| # gc.collect() | |
| # # Bark TTS | |
| # print("🔁 Loading Bark model...") | |
| # model_bark = BarkModel.from_pretrained("suno/bark").to("cuda" if torch.cuda.is_available() else "cpu") | |
| # print("✅ Bark model loaded") | |
| # print("🔁 Loading Bark processor...") | |
| # processor_bark = AutoProcessor.from_pretrained("suno/bark") | |
| # print("✅ Bark processor loaded") | |
| # bark_voice_preset = "v2/en_speaker_5" | |
| # def bark_tts(text): | |
| # print(f"🔁 Synthesizing TTS for: {text}") | |
| # # Process the text | |
| # inputs = processor_bark(text, return_tensors="pt", voice_preset=bark_voice_preset) | |
| # # Move tensors to device | |
| # input_ids = inputs["input_ids"].to(model_bark.device) | |
| # start = time.time() | |
| # # Generate speech with only the required parameters | |
| # with torch.no_grad(): | |
| # speech_values = model_bark.generate( | |
| # input_ids=input_ids, | |
| # do_sample=True, | |
| # fine_temperature=0.4, | |
| # coarse_temperature=0.8 | |
| # ) | |
| # print(f"✅ Bark finished in {round(time.time() - start, 2)}s") | |
| # # Convert to audio | |
| # speech = speech_values.cpu().numpy().squeeze() | |
| # speech = (speech * 32767).astype(np.int16) | |
| # temp_wav = tempfile.NamedTemporaryFile(delete=False, suffix=".wav") | |
| # wavfile.write(temp_wav.name, 22050, speech) | |
| # return temp_wav.name | |
| # # Whisper STT | |
| # print("🔁 Loading Whisper model...") | |
| # whisper_model = whisper.load_model("base", device="cuda") | |
| # print("✅ Whisper model loaded") | |
| # def whisper_stt(audio_path): | |
| # if not audio_path or not os.path.exists(audio_path): return "" | |
| # result = whisper_model.transcribe(audio_path) | |
| # return result["text"] | |
| # seniority_mapping = { | |
| # "Entry-level": 1, "Junior": 2, "Mid-Level": 3, "Senior": 4, "Lead": 5 | |
| # } | |
| # # --- 2. Gradio App --- | |
| # with gr.Blocks(theme=gr.themes.Soft()) as demo: | |
| # user_data = gr.State({}) | |
| # interview_state = gr.State({}) | |
| # missing_fields_state = gr.State([]) | |
| # # --- UI Layout --- | |
| # with gr.Column(visible=True) as user_info_section: | |
| # gr.Markdown("## Candidate Information") | |
| # cv_file = gr.File(label="Upload CV") | |
| # job_desc = gr.Textbox(label="Job Description") | |
| # start_btn = gr.Button("Continue", interactive=False) | |
| # with gr.Column(visible=False) as missing_section: | |
| # gr.Markdown("## Missing Information") | |
| # name_in = gr.Textbox(label="Name", visible=False) | |
| # role_in = gr.Textbox(label="Job Role", visible=False) | |
| # seniority_in = gr.Dropdown(list(seniority_mapping.keys()), label="Seniority", visible=False) | |
| # skills_in = gr.Textbox(label="Skills", visible=False) | |
| # submit_btn = gr.Button("Submit", interactive=False) | |
| # with gr.Column(visible=False) as interview_pre_section: | |
| # pre_interview_greeting_md = gr.Markdown() | |
| # start_interview_final_btn = gr.Button("Start Interview") | |
| # with gr.Column(visible=False) as interview_section: | |
| # gr.Markdown("## Interview in Progress") | |
| # question_audio = gr.Audio(label="Listen", interactive=False, autoplay=True) | |
| # question_text = gr.Markdown() | |
| # user_audio_input = gr.Audio(sources=["microphone"], type="filepath", label="1. Record Audio Answer") | |
| # stt_transcript = gr.Textbox(label="Transcribed Answer (edit if needed)") | |
| # confirm_btn = gr.Button("Confirm Answer") | |
| # evaluation_display = gr.Markdown() | |
| # interview_summary = gr.Markdown(visible=False) | |
| # # --- UI Logic --- | |
| # def validate_start_btn(cv_file, job_desc): | |
| # return gr.update(interactive=(cv_file is not None and hasattr(cv_file, "name") and bool(job_desc and job_desc.strip()))) | |
| # cv_file.change(validate_start_btn, [cv_file, job_desc], start_btn) | |
| # job_desc.change(validate_start_btn, [cv_file, job_desc], start_btn) | |
| # def process_and_route_initial(cv_file, job_desc): | |
| # details = extract_candidate_details(cv_file.name) | |
| # job_info = extract_job_details(job_desc) | |
| # data = { | |
| # "name": details.get("name", "unknown"), "job_role": job_info.get("job_title", "unknown"), | |
| # "seniority": job_info.get("experience_level", "unknown"), "skills": job_info.get("skills", []) | |
| # } | |
| # missing = [k for k, v in data.items() if (isinstance(v, str) and v.lower() == "unknown") or not v] | |
| # if missing: | |
| # return data, missing, gr.update(visible=False), gr.update(visible=True), gr.update(visible=False) | |
| # else: | |
| # greeting = f"Hello {data['name']}, your profile is ready. Click 'Start Interview' when ready." | |
| # return data, missing, gr.update(visible=False), gr.update(visible=False), gr.update(visible=True, value=greeting) | |
| # start_btn.click( | |
| # process_and_route_initial, | |
| # [cv_file, job_desc], | |
| # [user_data, missing_fields_state, user_info_section, missing_section, pre_interview_greeting_md] | |
| # ) | |
| # def show_missing(missing): | |
| # if missing is None: missing = [] | |
| # return gr.update(visible="name" in missing), gr.update(visible="job_role" in missing), gr.update(visible="seniority" in missing), gr.update(visible="skills" in missing) | |
| # missing_fields_state.change(show_missing, missing_fields_state, [name_in, role_in, seniority_in, skills_in]) | |
| # def validate_fields(name, role, seniority, skills, missing): | |
| # if not missing: return gr.update(interactive=False) | |
| # all_filled = all([(not ("name" in missing) or bool(name.strip())), (not ("job_role" in missing) or bool(role.strip())), (not ("seniority" in missing) or bool(seniority)), (not ("skills" in missing) or bool(skills.strip())),]) | |
| # return gr.update(interactive=all_filled) | |
| # for inp in [name_in, role_in, seniority_in, skills_in]: | |
| # inp.change(validate_fields, [name_in, role_in, seniority_in, skills_in, missing_fields_state], submit_btn) | |
| # def complete_manual(data, name, role, seniority, skills): | |
| # if data["name"].lower() == "unknown": data["name"] = name | |
| # if data["job_role"].lower() == "unknown": data["job_role"] = role | |
| # if data["seniority"].lower() == "unknown": data["seniority"] = seniority | |
| # if not data["skills"]: data["skills"] = [s.strip() for s in skills.split(",")] | |
| # greeting = f"Hello {data['name']}, your profile is ready. Click 'Start Interview' to begin." | |
| # return data, gr.update(visible=False), gr.update(visible=True), gr.update(value=greeting) | |
| # submit_btn.click(complete_manual, [user_data, name_in, role_in, seniority_in, skills_in], [user_data, missing_section, interview_pre_section, pre_interview_greeting_md]) | |
| # def start_interview(data): | |
| # # --- Advanced state with full logging --- | |
| # state = { | |
| # "questions": [], "answers": [], "face_labels": [], "voice_labels": [], "timings": [], | |
| # "question_evaluations": [], "answer_evaluations": [], "effective_confidences": [], | |
| # "conversation_history": [], | |
| # "difficulty_adjustment": None, | |
| # "question_idx": 0, "max_questions": 3, "q_start_time": time.time(), | |
| # "log": [] | |
| # } | |
| # # --- Optionally: context retrieval here (currently just blank) --- | |
| # context = "" | |
| # prompt = build_interview_prompt( | |
| # conversation_history=[], user_response="", context=context, job_role=data["job_role"], | |
| # skills=data["skills"], seniority=data["seniority"], difficulty_adjustment=None, | |
| # voice_label="neutral", face_label="neutral" | |
| # ) | |
| # #here the original one | |
| # # first_q = groq_llm.predict(prompt) | |
| # # # Evaluate Q for quality | |
| # # q_eval = eval_question_quality(first_q, data["job_role"], data["seniority"], None) | |
| # # state["questions"].append(first_q) | |
| # # state["question_evaluations"].append(q_eval) | |
| # #here the testing one | |
| # first_q = groq_llm.predict(prompt) | |
| # q_eval = { | |
| # "Score": "N/A", | |
| # "Reasoning": "Skipped to reduce processing time", | |
| # "Improvements": [] | |
| # } | |
| # state["questions"].append(first_q) | |
| # state["question_evaluations"].append(q_eval) | |
| # state["conversation_history"].append({'role': 'Interviewer', 'content': first_q}) | |
| # start = time.perf_counter() | |
| # audio_path = bark_tts(first_q) | |
| # print("⏱️ Bark TTS took", time.perf_counter() - start, "seconds") | |
| # # LOG | |
| # state["log"].append({"type": "question", "question": first_q, "question_eval": q_eval, "timestamp": time.time()}) | |
| # return state, gr.update(visible=False), gr.update(visible=True), audio_path, f"*Question 1:* {first_q}" | |
| # start_interview_final_btn.click(start_interview, [user_data], [interview_state, interview_pre_section, interview_section, question_audio, question_text]) | |
| # def transcribe(audio_path): | |
| # return whisper_stt(audio_path) | |
| # user_audio_input.change(transcribe, user_audio_input, stt_transcript) | |
| # def process_answer(transcript, audio_path, state, data): | |
| # if not transcript: | |
| # return state, gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), gr.update() | |
| # elapsed = round(time.time() - state.get("q_start_time", time.time()), 2) | |
| # state["timings"].append(elapsed) | |
| # state["answers"].append(transcript) | |
| # state["conversation_history"].append({'role': 'Candidate', 'content': transcript}) | |
| # # --- 1. Emotion analysis (simplified for testing) --- | |
| # voice_label = "neutral" | |
| # face_label = "neutral" | |
| # state["voice_labels"].append(voice_label) | |
| # state["face_labels"].append(face_label) | |
| # # --- 2. Evaluate previous Q and Answer --- | |
| # last_q = state["questions"][-1] | |
| # q_eval = state["question_evaluations"][-1] # Already in state | |
| # ref_answer = generate_reference_answer(last_q, data["job_role"], data["seniority"]) | |
| # answer_eval = evaluate_answer(last_q, transcript, ref_answer, data["job_role"], data["seniority"], None) | |
| # state["answer_evaluations"].append(answer_eval) | |
| # answer_score = answer_eval.get("Score", "medium") if answer_eval else "medium" | |
| # # --- 3. Adaptive difficulty --- | |
| # if answer_score == "excellent": | |
| # state["difficulty_adjustment"] = "harder" | |
| # elif answer_score in ("medium", "poor"): | |
| # state["difficulty_adjustment"] = "easier" | |
| # else: | |
| # state["difficulty_adjustment"] = None | |
| # # --- 4. Effective confidence (simplified) --- | |
| # eff_conf = {"effective_confidence": 0.6} | |
| # state["effective_confidences"].append(eff_conf) | |
| # # --- LOG --- | |
| # state["log"].append({ | |
| # "type": "answer", | |
| # "question": last_q, | |
| # "answer": transcript, | |
| # "answer_eval": answer_eval, | |
| # "ref_answer": ref_answer, | |
| # "face_label": face_label, | |
| # "voice_label": voice_label, | |
| # "effective_confidence": eff_conf, | |
| # "timing": elapsed, | |
| # "timestamp": time.time() | |
| # }) | |
| # # --- Next or End --- | |
| # qidx = state["question_idx"] + 1 | |
| # if qidx >= state["max_questions"]: | |
| # # Save as JSON (optionally) | |
| # timestamp = time.strftime("%Y%m%d_%H%M%S") | |
| # log_file = f"interview_log_{timestamp}.json" | |
| # with open(log_file, "w", encoding="utf-8") as f: | |
| # json.dump(state["log"], f, indent=2, ensure_ascii=False) | |
| # # Report | |
| # summary = "# Interview Summary\n" | |
| # for i, q in enumerate(state["questions"]): | |
| # summary += (f"\n### Q{i + 1}: {q}\n" | |
| # f"- *Answer*: {state['answers'][i]}\n" | |
| # f"- *Q Eval*: {state['question_evaluations'][i]}\n" | |
| # f"- *A Eval*: {state['answer_evaluations'][i]}\n" | |
| # f"- *Time*: {state['timings'][i]}s\n") | |
| # summary += f"\n\n⏺ Full log saved as {log_file}." | |
| # return (state, gr.update(visible=True, value=summary), gr.update(value=None), gr.update(value=None), gr.update(value=None), gr.update(value=None), gr.update(visible=True, value=f"Last Detected — Face: {face_label}, Voice: {voice_label}")) | |
| # else: | |
| # # --- Build next prompt using adaptive difficulty --- | |
| # state["question_idx"] = qidx | |
| # state["q_start_time"] = time.time() | |
| # context = "" # You can add your context logic here | |
| # prompt = build_interview_prompt( | |
| # conversation_history=state["conversation_history"], | |
| # user_response=transcript, | |
| # context=context, | |
| # job_role=data["job_role"], | |
| # skills=data["skills"], | |
| # seniority=data["seniority"], | |
| # difficulty_adjustment=state["difficulty_adjustment"], | |
| # voice_label=voice_label, | |
| # ) | |
| # next_q = groq_llm.predict(prompt) | |
| # # Evaluate Q quality | |
| # q_eval = eval_question_quality(next_q, data["job_role"], data["seniority"], None) | |
| # state["questions"].append(next_q) | |
| # state["question_evaluations"].append(q_eval) | |
| # state["conversation_history"].append({'role': 'Interviewer', 'content': next_q}) | |
| # state["log"].append({"type": "question", "question": next_q, "question_eval": q_eval, "timestamp": time.time()}) | |
| # audio_path = bark_tts(next_q) | |
| # # Display evaluations | |
| # eval_md = f"*Last Answer Eval:* {answer_eval}\n\n*Effective Confidence:* {eff_conf}" | |
| # return ( | |
| # state, gr.update(visible=False), audio_path, f"*Question {qidx + 1}:* {next_q}", | |
| # gr.update(value=None), gr.update(value=None), | |
| # gr.update(visible=True, value=eval_md), | |
| # ) | |
| # # Replace your confirm_btn.click with this: | |
| # confirm_btn.click( | |
| # process_answer, | |
| # [stt_transcript, user_audio_input, interview_state, user_data], # Added None for video_path | |
| # [interview_state, interview_summary, question_audio, question_text, user_audio_input, stt_transcript, evaluation_display] | |
| # ).then( | |
| # lambda: (gr.update(value=None), gr.update(value=None)), None, [user_audio_input, stt_transcript] | |
| # ) | |
| # demo.launch(debug=True) | |
| import gradio as gr | |
| import time | |
| import tempfile | |
| import numpy as np | |
| import scipy.io.wavfile as wavfile | |
| import os | |
| import json | |
| import edge_tts | |
| import torch, gc | |
| from faster_whisper import WhisperModel | |
| import asyncio | |
| import threading | |
| from concurrent.futures import ThreadPoolExecutor | |
# --- Runtime setup for the speech helpers ---
# Report CUDA availability, then release cached GPU memory and run a GC pass.
print(torch.cuda.is_available())
torch.cuda.empty_cache()
gc.collect()

# Module-level state shared by the helpers below.
tts_voice = "en-US-AriaNeural"
faster_whisper_model = None  # created on first use by load_models_lazy()
executor = ThreadPoolExecutor(max_workers=2)  # thread pool for async operations

# Describe the device that will be used.
if not torch.cuda.is_available():
    print("⚠️ CUDA not available, using CPU")
else:
    print(f"🔥 CUDA Available: {torch.cuda.get_device_name(0)}")
    print(f"🔥 CUDA Memory: {torch.cuda.get_device_properties(0).total_memory / 1024**3:.1f} GB")
    # Set default device
    torch.cuda.set_device(0)
def load_models_lazy():
    """
    Create the global Faster-Whisper model on first call; later calls reuse it.

    The device is "cuda" when torch reports CUDA as available, otherwise
    "cpu"; the compute type is "float16" on cuda and "int8" on cpu. The
    selected device is printed on every call.
    """
    global faster_whisper_model

    device = "cuda" if torch.cuda.is_available() else "cpu"
    print(f"🔁 Using device: {device}")

    # Already loaded: nothing more to do.
    if faster_whisper_model is not None:
        return

    print("🔁 Loading Faster-Whisper model...")
    precision = {"cuda": "float16"}.get(device, "int8")
    faster_whisper_model = WhisperModel("base", device=device, compute_type=precision)
    print(f"✅ Faster-Whisper model loaded on {device}")
async def edge_tts_to_file(text, output_path="tts.wav", voice=tts_voice):
    """
    Synthesize `text` with edge-tts using `voice` and save the audio to `output_path`.

    Returns:
        The `output_path` that was written.
    """
    await edge_tts.Communicate(text, voice).save(output_path)
    return output_path
def tts_async(text):
    """
    Start text-to-speech synthesis for `text` on the shared thread pool.

    Args:
        text (str): Text to synthesize.

    Returns:
        concurrent.futures.Future: Resolves to the value returned by
        ``edge_tts_to_file`` (the path of the written audio file).
    """
    def _synthesize():
        # asyncio.run creates a fresh event loop inside the worker thread and
        # closes it when the coroutine finishes, so no loop is leaked per call
        # and the caller thread's current event loop is left untouched.
        return asyncio.run(edge_tts_to_file(text))

    return executor.submit(_synthesize)
def whisper_stt(audio_path):
    """STT using Faster-Whisper

    Args:
        audio_path (str | None): Path to the recorded audio file.

    Returns:
        str: The transcript with single spaces between segments, or '' when
        `audio_path` is empty or does not exist.
    """
    if not audio_path or not os.path.exists(audio_path):
        return ""

    load_models_lazy()
    print("🔁 Transcribing with Faster-Whisper")
    segments, _ = faster_whisper_model.transcribe(audio_path)
    # Strip each segment before joining: any whitespace a segment text already
    # carries would otherwise be doubled by the " " separator.
    pieces = (segment.text.strip() for segment in segments)
    transcript = " ".join(piece for piece in pieces if piece)
    return transcript.strip()
# Seniority labels mapped to the integers 1..5 in the order listed; the keys
# are also the choices offered by the seniority dropdown.
seniority_mapping = {
    label: rank
    for rank, label in enumerate(
        ("Entry-level", "Junior", "Mid-Level", "Senior", "Lead"), start=1
    )
}
| with gr.Blocks(theme=gr.themes.Soft()) as demo: | |
| user_data = gr.State({}) | |
| interview_state = gr.State({}) | |
| missing_fields_state = gr.State([]) | |
| tts_future = gr.State(None) # Store async TTS future | |
| with gr.Column(visible=True) as user_info_section: | |
| gr.Markdown("## Candidate Information") | |
| cv_file = gr.File(label="Upload CV") | |
| job_desc = gr.Textbox(label="Job Description") | |
| start_btn = gr.Button("Continue", interactive=False) | |
| with gr.Column(visible=False) as missing_section: | |
| gr.Markdown("## Missing Information") | |
| name_in = gr.Textbox(label="Name", visible=False) | |
| role_in = gr.Textbox(label="Job Role", visible=False) | |
| seniority_in = gr.Dropdown(list(seniority_mapping.keys()), label="Seniority", visible=False) | |
| skills_in = gr.Textbox(label="Skills", visible=False) | |
| submit_btn = gr.Button("Submit", interactive=False) | |
| with gr.Column(visible=False) as interview_pre_section: | |
| pre_interview_greeting_md = gr.Markdown() | |
| start_interview_final_btn = gr.Button("Start Interview") | |
| loading_status = gr.Markdown("", visible=False) | |
| with gr.Column(visible=False) as interview_section: | |
| gr.Markdown("## Interview in Progress") | |
| question_audio = gr.Audio(label="Listen", interactive=False, autoplay=True) | |
| question_text = gr.Markdown() | |
| user_audio_input = gr.Audio(sources=["microphone"], type="filepath", label="1. Record Audio Answer") | |
| stt_transcript = gr.Textbox(label="Transcribed Answer (edit if needed)") | |
| confirm_btn = gr.Button("Confirm Answer") | |
| evaluation_display = gr.Markdown() | |
| interview_summary = gr.Markdown(visible=False) | |
| def validate_start_btn(cv_file, job_desc): | |
| return gr.update(interactive=(cv_file is not None and hasattr(cv_file, "name") and bool(job_desc and job_desc.strip()))) | |
| cv_file.change(validate_start_btn, [cv_file, job_desc], start_btn) | |
| job_desc.change(validate_start_btn, [cv_file, job_desc], start_btn) | |
def process_and_route_initial(cv_file, job_desc):
    """Build the candidate profile and decide which screen to show next.

    Candidate details are extracted from the uploaded CV and job details from
    the job description. A field counts as missing when it is falsy or the
    string "unknown" (case-insensitive).

    Args:
        cv_file: Uploaded file object; its ``name`` attribute is passed to
            ``extract_candidate_details``.
        job_desc: Job description text passed to ``extract_job_details``.

    Returns:
        A 6-tuple matching the outputs bound below: the profile dict, the list
        of missing field names, and updates for ``user_info_section``,
        ``missing_section``, ``pre_interview_greeting_md`` and
        ``interview_pre_section``.
    """
    details = extract_candidate_details(cv_file.name)
    job_info = extract_job_details(job_desc)
    data = {
        "name": details.get("name", "unknown"),
        "job_role": job_info.get("job_title", "unknown"),
        "seniority": job_info.get("experience_level", "unknown"),
        "skills": job_info.get("skills", []),
    }
    missing = [
        field
        for field, value in data.items()
        if (isinstance(value, str) and value.lower() == "unknown") or not value
    ]
    if missing:
        # Ask the user for the missing fields; the pre-interview screen stays hidden.
        return (
            data,
            missing,
            gr.update(visible=False),
            gr.update(visible=True),
            gr.update(visible=False),
            gr.update(visible=False),
        )

    greeting = f"Hello {data['name']}, your profile is ready. Click 'Start Interview' when ready."
    # The pre-interview column is created with visible=False, so it has to be
    # revealed here; otherwise the greeting and the "Start Interview" button
    # stay unreachable when nothing is missing.
    return (
        data,
        missing,
        gr.update(visible=False),
        gr.update(visible=False),
        gr.update(visible=True, value=greeting),
        gr.update(visible=True),
    )


start_btn.click(
    process_and_route_initial,
    [cv_file, job_desc],
    [
        user_data,
        missing_fields_state,
        user_info_section,
        missing_section,
        pre_interview_greeting_md,
        interview_pre_section,
    ],
)
def show_missing(missing):
    """Return one visibility update per manual-entry widget.

    The updates are ordered name, job role, seniority, skills; each widget is
    made visible only when its field name appears in ``missing``. ``None`` is
    treated as an empty list.
    """
    flagged = [] if missing is None else missing
    field_order = ("name", "job_role", "seniority", "skills")
    return tuple(gr.update(visible=(field in flagged)) for field in field_order)
# When the list of missing fields changes, show or hide the four manual-entry widgets.
| missing_fields_state.change(show_missing, missing_fields_state, [name_in, role_in, seniority_in, skills_in]) | |
def validate_fields(name, role, seniority, skills, missing):
    """Return an update enabling the submit button once every missing field is filled.

    Only fields listed in ``missing`` are checked. Text fields must contain
    non-whitespace text and the seniority choice must be truthy. With no
    missing fields the button is kept disabled.
    """
    if not missing:
        return gr.update(interactive=False)

    # Collect one pass/fail flag per field the user was actually asked for.
    checks = []
    if "name" in missing:
        checks.append(bool(name.strip()))
    if "job_role" in missing:
        checks.append(bool(role.strip()))
    if "seniority" in missing:
        checks.append(bool(seniority))
    if "skills" in missing:
        checks.append(bool(skills.strip()))
    return gr.update(interactive=all(checks))
# Re-validate on every change of any manual-entry widget; the result is applied to submit_btn.
| for inp in [name_in, role_in, seniority_in, skills_in]: | |
| inp.change(validate_fields, [name_in, role_in, seniority_in, skills_in, missing_fields_state], submit_btn) | |
def complete_manual(data, name, role, seniority, skills):
    """Fill the missing profile fields from the manual-entry form.

    A field is replaced only when it is missing by the same rule
    ``process_and_route_initial`` uses (falsy, or the string "unknown" in any
    case). Every field the user was asked for is therefore the one written
    back, and ``None`` values no longer raise on ``.lower()``.

    Args:
        data: Profile dict with keys "name", "job_role", "seniority", "skills";
            it is updated in place and returned.
        name: Manually entered name.
        role: Manually entered job role.
        seniority: Manually selected seniority.
        skills: Comma-separated skills text.

    Returns:
        A 4-tuple: the profile dict, an update hiding the missing-info section,
        an update showing the pre-interview section, and an update that sets
        and shows the greeting.
    """

    def _is_missing(value):
        # Must stay identical to the predicate in process_and_route_initial:
        # hidden inputs hold empty values and must never overwrite real data.
        return (isinstance(value, str) and value.lower() == "unknown") or not value

    manual_values = {"name": name, "job_role": role, "seniority": seniority}
    for field, entered in manual_values.items():
        if _is_missing(data.get(field)):
            data[field] = entered

    if _is_missing(data.get("skills")):
        # Drop empty entries produced by stray or trailing commas.
        data["skills"] = [part.strip() for part in (skills or "").split(",") if part.strip()]

    greeting = f"Hello {data['name']}, your profile is ready. Click 'Start Interview' to begin."
    # The routing step hides the greeting when fields are missing, so it has to
    # be made visible again here, not just given a new value.
    return (
        data,
        gr.update(visible=False),
        gr.update(visible=True),
        gr.update(visible=True, value=greeting),
    )
# Submitting the manual form runs complete_manual; its outputs update the stored
# profile, the missing-info section, the pre-interview section and the greeting.
| submit_btn.click(complete_manual, [user_data, name_in, role_in, seniority_in, skills_in], [user_data, missing_section, interview_pre_section, pre_interview_greeting_md]) | |
async def start_interview(data):
    """Create the interview state and produce the first question with audio.

    Args:
        data: Profile dict; the keys "job_role", "skills" and "seniority" are read.

    Returns:
        A 5-tuple matching the bound outputs: the new interview state, an
        update hiding the pre-interview section, an update showing the
        interview section, the path of the generated question audio, and the
        Markdown text of the first question.
    """
    state = {
        "questions": [],
        "answers": [],
        "timings": [],
        "question_evaluations": [],
        "answer_evaluations": [],
        "conversation_history": [],
        "difficulty_adjustment": None,
        "question_idx": 0,
        "max_questions": 3,
        "q_start_time": time.time(),
        "log": [],
    }

    # Prompt for the first question: no history, no previous answer, no context.
    prompt = build_interview_prompt(
        conversation_history=[],
        user_response="",
        context="",
        job_role=data["job_role"],
        skills=data["skills"],
        seniority=data["seniority"],
        difficulty_adjustment=None,
        voice_label="neutral",
    )

    # The LLM call is blocking; run it on the shared executor (as the answer
    # handler does for evaluation) so the event loop is not stalled.
    llm_started = time.perf_counter()
    first_q = await asyncio.get_running_loop().run_in_executor(
        executor, groq_llm.predict, prompt
    )
    print("⏱️ Groq LLM Response Time:", round(time.perf_counter() - llm_started, 2), "seconds")

    # The first question is not quality-scored; a placeholder keeps the
    # evaluation list aligned with the question list.
    q_eval = {
        "Score": "N/A",
        "Reasoning": "Skipped to reduce processing time",
        "Improvements": [],
    }
    state["questions"].append(first_q)
    state["question_evaluations"].append(q_eval)
    state["conversation_history"].append({'role': 'Interviewer', 'content': first_q})

    # Synthesize the spoken question with edge-tts and wait for the file.
    tts_started = time.perf_counter()
    audio_path = await edge_tts_to_file(first_q)
    print("⏱️ TTS (edge-tts) took", round(time.perf_counter() - tts_started, 2), "seconds")

    state["log"].append({
        "type": "question",
        "question": first_q,
        "question_eval": q_eval,
        "timestamp": time.time(),
    })

    # Start the answer timer only now that the question is ready, so the
    # recorded answer time excludes LLM and TTS latency.
    state["q_start_time"] = time.time()

    return (
        state,
        gr.update(visible=False),  # Hide interview_pre_section
        gr.update(visible=True),  # Show interview_section
        audio_path,  # Set audio
        f"*Question 1:* {first_q}",  # Set question text
    )
# Clicking the final start button runs start_interview with the stored profile;
# concurrency_limit=1 is set on this listener.
| # Hook into Gradio | |
| start_interview_final_btn.click( | |
| fn=start_interview, | |
| inputs=[user_data], | |
| outputs=[interview_state, interview_pre_section, interview_section, question_audio, question_text], | |
| concurrency_limit=1 | |
| ) | |
def transcribe(audio_path):
    """Transcribe a recorded answer to text.

    Args:
        audio_path: Path of the recorded audio file, or a falsy value when the
            audio input has been cleared.

    Returns:
        The result of ``whisper_stt`` for the recording, or an empty string
        when there is no recording.
    """
    # The audio input is reset to None after each answer, which fires the
    # change event again; skip speech-to-text when there is nothing to read.
    if not audio_path:
        return ""
    return whisper_stt(audio_path)
# Whenever the recorded audio changes, run transcribe and put the result in the transcript box.
| user_audio_input.change(transcribe, user_audio_input, stt_transcript) | |
async def process_answer(transcript, audio_path, state, data):
    """Record and evaluate an answer, then finish the interview or ask the next question.

    Args:
        transcript: The (possibly edited) transcribed answer text.
        audio_path: Path of the recorded answer; accepted because it is wired
            as an input, but not read here.
        state: Interview state dict created by ``start_interview``; updated in place.
        data: Profile dict; the keys "job_role", "skills" and "seniority" are read.

    Returns:
        A 7-tuple matching the bound outputs: state, interview summary update,
        question audio, question text, recorded-audio update, transcript
        update, and evaluation display update. With an empty transcript every
        component is left unchanged.
    """
    handler_started = time.perf_counter()
    if not transcript or not transcript.strip():
        return state, gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), gr.update()

    loop = asyncio.get_running_loop()

    elapsed = round(time.time() - state.get("q_start_time", time.time()), 2)
    state["timings"].append(elapsed)
    state["answers"].append(transcript)
    state["conversation_history"].append({'role': 'Candidate', 'content': transcript})

    last_q = state["questions"][-1]
    # Both helpers are blocking; run them on the executor so the event loop
    # stays responsive.
    ref_answer = await loop.run_in_executor(
        executor, generate_reference_answer, last_q, data["job_role"], data["seniority"]
    )
    answer_eval = await loop.run_in_executor(
        executor,
        evaluate_answer,
        last_q, transcript, ref_answer, data["job_role"], data["seniority"],
    )
    state["answer_evaluations"].append(answer_eval)

    answer_score = answer_eval.get("Score", "medium") if answer_eval else "medium"
    # NOTE(review): the evaluator's exact label casing is not visible here;
    # normalise so "Excellent" and "excellent" are treated alike.
    score_label = str(answer_score).strip().lower()
    if score_label == "excellent":
        state["difficulty_adjustment"] = "harder"
    elif score_label in ("medium", "poor"):
        state["difficulty_adjustment"] = "easier"
    else:
        state["difficulty_adjustment"] = None

    state["log"].append({
        "type": "answer", "question": last_q, "answer": transcript,
        "answer_eval": answer_eval, "ref_answer": ref_answer,
        "timing": elapsed, "timestamp": time.time(),
    })

    qidx = state["question_idx"] + 1
    if qidx >= state["max_questions"]:
        # Last question answered: write the full log and build the summary.
        timestamp = time.strftime("%Y%m%d_%H%M%S")
        log_file = f"interview_log_{timestamp}.json"
        with open(log_file, "w", encoding="utf-8") as f:
            json.dump(state["log"], f, indent=2, ensure_ascii=False)

        parts = ["# Interview Summary\n"]
        rows = zip(
            state["questions"],
            state["answers"],
            state["question_evaluations"],
            state["answer_evaluations"],
            state["timings"],
        )
        for number, (question, answer, question_eval, ans_eval, seconds) in enumerate(rows, start=1):
            parts.append(
                f"\n### Q{number}: {question}\n"
                f"- *Answer*: {answer}\n"
                f"- *Q Eval*: {question_eval}\n"
                f"- *A Eval*: {ans_eval}\n"
                f"- *Time*: {seconds}s\n"
            )
        parts.append(f"\n\n⏺ Full log saved as {log_file}.")
        summary = "".join(parts)
        return (
            state,
            gr.update(visible=True, value=summary),
            gr.update(value=None),
            gr.update(value=None),
            gr.update(value=None),
            gr.update(value=None),
            gr.update(visible=False),
        )

    state["question_idx"] = qidx
    prompt = build_interview_prompt(
        conversation_history=state["conversation_history"],
        user_response=transcript,
        context="",
        job_role=data["job_role"],
        skills=data["skills"],
        seniority=data["seniority"],
        difficulty_adjustment=state["difficulty_adjustment"],
        voice_label="neutral",
    )

    llm_started = time.perf_counter()
    next_q = await loop.run_in_executor(executor, groq_llm.predict, prompt)
    print("⏱️ Groq LLM Response Time:", round(time.perf_counter() - llm_started, 2), "seconds")

    # Await the executor job rather than blocking the loop on future.result().
    eval_started = time.perf_counter()
    q_eval = await loop.run_in_executor(
        executor, eval_question_quality, next_q, data["job_role"], data["seniority"]
    )
    print("⏱️ Evaluation time:", round(time.perf_counter() - eval_started, 2), "seconds")

    state["questions"].append(next_q)
    state["question_evaluations"].append(q_eval)
    state["conversation_history"].append({'role': 'Interviewer', 'content': next_q})
    state["log"].append({"type": "question", "question": next_q, "question_eval": q_eval, "timestamp": time.time()})

    next_audio_path = await edge_tts_to_file(next_q)
    # Restart the answer timer once the question is ready, so generation and
    # TTS latency are not charged to the candidate.
    state["q_start_time"] = time.time()

    eval_md = f"*Last Answer Eval:* {answer_eval}"
    # Measured from handler entry, so this reports the whole call.
    print("✅ process_answer time:", round(time.perf_counter() - handler_started, 2), "s")
    return (
        state,
        gr.update(visible=False),
        next_audio_path,
        f"*Question {qidx + 1}:* {next_q}",
        gr.update(value=None),
        gr.update(value=None),
        gr.update(visible=True, value=eval_md),
    )
# Confirming an answer runs process_answer with the transcript, the recording,
# the interview state and the stored profile; concurrency_limit=1 is set on this
# listener. The chained .then step clears the recording and the transcript box
# once the handler has finished.
| confirm_btn.click( | |
| fn=process_answer, | |
| inputs=[stt_transcript, user_audio_input, interview_state, user_data], | |
| outputs=[interview_state, interview_summary, question_audio, question_text, user_audio_input, stt_transcript, | |
| evaluation_display], | |
| concurrency_limit=1 | |
| ).then( | |
| lambda: (gr.update(value=None), gr.update(value=None)), None, [user_audio_input, stt_transcript] | |
| ) | |
# Start the app with debug mode enabled.
| demo.launch(debug=True) | |