import os
import torch
import numpy as np
import uuid
import requests
import time
import json
from pydub import AudioSegment
from nemo.collections.asr.models import EncDecSpeakerLabelModel
from pinecone import Pinecone, ServerlessSpec
import librosa
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.feature_extraction.text import TfidfVectorizer
import re
from typing import Dict, List
import logging

# --- Imports for enhanced PDF ---
from reportlab.lib.pagesizes import letter
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Table, TableStyle
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.lib.units import inch
from reportlab.lib import colors
# --- End Imports for enhanced PDF ---

import matplotlib
matplotlib.use("Agg")  # headless backend so chart rendering works without a display
import matplotlib.pyplot as plt

from transformers import AutoTokenizer, AutoModel
import spacy
import google.generativeai as genai
import joblib
from concurrent.futures import ThreadPoolExecutor

# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
logging.getLogger("nemo_logging").setLevel(logging.ERROR)

# Configuration
AUDIO_DIR = "./uploads"
OUTPUT_DIR = "./processed_audio"
os.makedirs(OUTPUT_DIR, exist_ok=True)

# API Keys
PINECONE_KEY = os.getenv("PINECONE_KEY")
ASSEMBLYAI_KEY = os.getenv("ASSEMBLYAI_KEY")
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")


# Initialize services
def initialize_services():
    try:
        pc = Pinecone(api_key=PINECONE_KEY)
        index_name = "interview-speaker-embeddings"
        if index_name not in pc.list_indexes().names():
            pc.create_index(
                name=index_name,
                dimension=192,  # TitaNet-large embedding size
                metric="cosine",
                spec=ServerlessSpec(cloud="aws", region="us-east-1")
            )
        index = pc.Index(index_name)
        genai.configure(api_key=GEMINI_API_KEY)
        gemini_model = genai.GenerativeModel('gemini-1.5-flash')
        return index, gemini_model
    except Exception as e:
        logger.error(f"Error initializing services: {str(e)}")
        raise


index, gemini_model = initialize_services()

# Device setup
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
logger.info(f"Using device: {device}")


def load_speaker_model():
    try:
        torch.set_num_threads(5)
        model = EncDecSpeakerLabelModel.from_pretrained(
            "nvidia/speakerverification_en_titanet_large",
            map_location=torch.device('cpu')
        )
        model.eval()
        return model
    except Exception as e:
        logger.error(f"Model loading failed: {str(e)}")
        raise RuntimeError("Could not load speaker verification model")


# Load ML models
def load_models():
    speaker_model = load_speaker_model()
    nlp = spacy.load("en_core_web_sm")
    tokenizer = AutoTokenizer.from_pretrained("distilbert-base-uncased")
    llm_model = AutoModel.from_pretrained("distilbert-base-uncased").to(device)
    llm_model.eval()
    return speaker_model, nlp, tokenizer, llm_model


speaker_model, nlp, tokenizer, llm_model = load_models()


# Audio processing functions
def convert_to_wav(audio_path: str, output_dir: str = OUTPUT_DIR) -> str:
    """Convert any input audio to 16 kHz mono WAV for the downstream models."""
    try:
        audio = AudioSegment.from_file(audio_path)
        if audio.channels > 1:
            audio = audio.set_channels(1)
        audio = audio.set_frame_rate(16000)
        wav_file = os.path.join(output_dir, f"{uuid.uuid4()}.wav")
        audio.export(wav_file, format="wav")
        return wav_file
    except Exception as e:
        logger.error(f"Audio conversion failed: {str(e)}")
        raise


def extract_prosodic_features(audio_path: str, start_ms: int, end_ms: int) -> Dict:
    """Extract pitch and intensity statistics for one utterance span."""
    try:
        audio = AudioSegment.from_file(audio_path)
        segment = audio[start_ms:end_ms]
        temp_path = os.path.join(OUTPUT_DIR, f"temp_{uuid.uuid4()}.wav")
        segment.export(temp_path, format="wav")
        y, sr = librosa.load(temp_path, sr=16000)
        pitches = librosa.piptrack(y=y, sr=sr)[0]
        pitches = pitches[pitches > 0]
        rms = librosa.feature.rms(y=y)[0]  # compute RMS once and reuse
        features = {
            'duration': (end_ms - start_ms) / 1000,
            'mean_pitch': float(np.mean(pitches)) if len(pitches) > 0 else 0.0,
            'min_pitch': float(np.min(pitches)) if len(pitches) > 0 else 0.0,
            'max_pitch': float(np.max(pitches)) if len(pitches) > 0 else 0.0,
            'pitch_sd': float(np.std(pitches)) if len(pitches) > 0 else 0.0,
            'intensityMean': float(np.mean(rms)),
            'intensityMin': float(np.min(rms)),
            'intensityMax': float(np.max(rms)),
            'intensitySD': float(np.std(rms)),
        }
        os.remove(temp_path)
        return features
    except Exception as e:
        logger.error(f"Feature extraction failed: {str(e)}")
        return {
            'duration': (end_ms - start_ms) / 1000,
            'mean_pitch': 0.0,
            'min_pitch': 0.0,
            'max_pitch': 0.0,
            'pitch_sd': 0.0,
            'intensityMean': 0.0,
            'intensityMin': 0.0,
            'intensityMax': 0.0,
            'intensitySD': 0.0,
        }


def transcribe(audio_path: str) -> Dict:
    """Upload audio to AssemblyAI and poll until the transcript is ready."""
    try:
        with open(audio_path, 'rb') as f:
            upload_response = requests.post(
                "https://api.assemblyai.com/v2/upload",
                headers={"authorization": ASSEMBLYAI_KEY},
                data=f
            )
        audio_url = upload_response.json()['upload_url']
        transcript_response = requests.post(
            "https://api.assemblyai.com/v2/transcript",
            headers={"authorization": ASSEMBLYAI_KEY},
            json={
                "audio_url": audio_url,
                "speaker_labels": True,
                "filter_profanity": True
            }
        )
        transcript_id = transcript_response.json()['id']
        while True:
            result = requests.get(
                f"https://api.assemblyai.com/v2/transcript/{transcript_id}",
                headers={"authorization": ASSEMBLYAI_KEY}
            ).json()
            if result['status'] == 'completed':
                return result
            elif result['status'] == 'error':
                raise Exception(result['error'])
            time.sleep(5)
    except Exception as e:
        logger.error(f"Transcription failed: {str(e)}")
        raise


def process_utterance(utterance, full_audio, wav_file):
    try:
        start = utterance['start']
        end = utterance['end']
        segment = full_audio[start:end]
        temp_path = os.path.join(OUTPUT_DIR, f"temp_{uuid.uuid4()}.wav")
        segment.export(temp_path, format="wav")
        with torch.no_grad():
            embedding = speaker_model.get_embedding(temp_path).to(device)
        # get_embedding returns a batched tensor; flatten it once and reuse the
        # same vector for the query, the upsert, and the returned utterance.
        vector = embedding.squeeze().cpu().numpy().tolist()
        query_result = index.query(
            vector=vector,
            top_k=1,
            include_metadata=True
        )
        if query_result['matches'] and query_result['matches'][0]['score'] > 0.7:
            speaker_id = query_result['matches'][0]['id']
            speaker_name = query_result['matches'][0]['metadata']['speaker_name']
        else:
            speaker_id = f"unknown_{uuid.uuid4().hex[:6]}"
            speaker_name = f"Speaker_{speaker_id[-4:]}"
            index.upsert([(speaker_id, vector, {"speaker_name": speaker_name})])
        os.remove(temp_path)
        return {
            **utterance,
            'speaker': speaker_name,
            'speaker_id': speaker_id,
            'embedding': vector
        }
    except Exception as e:
        logger.error(f"Utterance processing failed: {str(e)}")
        return {
            **utterance,
            'speaker': 'Unknown',
            'speaker_id': 'unknown',
            'embedding': None
        }


def identify_speakers(transcript: Dict, wav_file: str) -> List[Dict]:
    try:
        full_audio = AudioSegment.from_wav(wav_file)
        utterances = transcript['utterances']
        with ThreadPoolExecutor(max_workers=5) as executor:
            futures = [
                executor.submit(process_utterance, utterance, full_audio, wav_file)
                for utterance in utterances
            ]
            results = [f.result() for f in futures]
        return results
    except Exception as e:
        logger.error(f"Speaker identification failed: {str(e)}")
        raise
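

# --- Speaker enrollment sketch (assumption: this helper is not part of the
# original pipeline). process_utterance falls back to generated "Speaker_xxxx"
# names when no Pinecone match scores above 0.7; enrolling a reference clip
# under a real name beforehand lets that voice resolve by name. The function
# name and ID scheme here are illustrative.
def enroll_speaker(reference_wav: str, speaker_name: str) -> str:
    """Embed a reference clip and upsert it so future queries match by name."""
    with torch.no_grad():
        embedding = speaker_model.get_embedding(reference_wav).to(device)
    vector = embedding.squeeze().cpu().numpy().tolist()
    speaker_id = f"enrolled_{uuid.uuid4().hex[:6]}"
    index.upsert([(speaker_id, vector, {"speaker_name": speaker_name})])
    return speaker_id

# Example: enroll_speaker("./uploads/alice_reference.wav", "Alice")  # hypothetical path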
def train_role_classifier(utterances: List[Dict]):
    try:
        texts = [u['text'] for u in utterances]
        vectorizer = TfidfVectorizer(max_features=500, ngram_range=(1, 2))
        X_text = vectorizer.fit_transform(texts)
        features = []
        labels = []
        for i, utterance in enumerate(utterances):
            prosodic = utterance['prosodic_features']
            feat = [
                prosodic['duration'],
                prosodic['mean_pitch'],
                prosodic['min_pitch'],
                prosodic['max_pitch'],
                prosodic['pitch_sd'],
                prosodic['intensityMean'],
                prosodic['intensityMin'],
                prosodic['intensityMax'],
                prosodic['intensitySD'],
            ]
            feat.extend(X_text[i].toarray()[0].tolist())
            doc = nlp(utterance['text'])
            feat.extend([
                int(utterance['text'].endswith('?')),
                len(re.findall(r'\b(why|how|what|when|where|who|which)\b', utterance['text'].lower())),
                len(utterance['text'].split()),
                sum(1 for token in doc if token.pos_ == 'VERB'),
                sum(1 for token in doc if token.pos_ == 'NOUN')
            ])
            features.append(feat)
            # Weak bootstrap labels: assume strict turn-taking with the interviewer
            # speaking first. Replace with hand-labeled data when available.
            labels.append(0 if i % 2 == 0 else 1)
        scaler = StandardScaler()
        X = scaler.fit_transform(features)
        clf = RandomForestClassifier(
            n_estimators=150,
            max_depth=10,
            random_state=42,
            class_weight='balanced'
        )
        clf.fit(X, labels)
        joblib.dump(clf, os.path.join(OUTPUT_DIR, 'role_classifier.pkl'))
        joblib.dump(vectorizer, os.path.join(OUTPUT_DIR, 'text_vectorizer.pkl'))
        joblib.dump(scaler, os.path.join(OUTPUT_DIR, 'feature_scaler.pkl'))
        return clf, vectorizer, scaler
    except Exception as e:
        logger.error(f"Classifier training failed: {str(e)}")
        raise


def classify_roles(utterances: List[Dict], clf, vectorizer, scaler):
    try:
        texts = [u['text'] for u in utterances]
        X_text = vectorizer.transform(texts)
        results = []
        for i, utterance in enumerate(utterances):
            # The feature vector must mirror train_role_classifier exactly.
            prosodic = utterance['prosodic_features']
            feat = [
                prosodic['duration'],
                prosodic['mean_pitch'],
                prosodic['min_pitch'],
                prosodic['max_pitch'],
                prosodic['pitch_sd'],
                prosodic['intensityMean'],
                prosodic['intensityMin'],
                prosodic['intensityMax'],
                prosodic['intensitySD'],
            ]
            feat.extend(X_text[i].toarray()[0].tolist())
            doc = nlp(utterance['text'])
            feat.extend([
                int(utterance['text'].endswith('?')),
                len(re.findall(r'\b(why|how|what|when|where|who|which)\b', utterance['text'].lower())),
                len(utterance['text'].split()),
                sum(1 for token in doc if token.pos_ == 'VERB'),
                sum(1 for token in doc if token.pos_ == 'NOUN')
            ])
            X = scaler.transform([feat])
            role = 'Interviewer' if clf.predict(X)[0] == 0 else 'Interviewee'
            results.append({**utterance, 'role': role})
        return results
    except Exception as e:
        logger.error(f"Role classification failed: {str(e)}")
        raise


def analyze_interviewee_voice(audio_path: str, utterances: List[Dict]) -> Dict:
    try:
        y, sr = librosa.load(audio_path, sr=16000)
        interviewee_utterances = [u for u in utterances if u['role'] == 'Interviewee']
        if not interviewee_utterances:
            return {'error': 'No interviewee utterances found'}
        segments = []
        for u in interviewee_utterances:
            start = int(u['start'] * sr / 1000)  # utterance times are in ms
            end = int(u['end'] * sr / 1000)
            segments.append(y[start:end])
        total_duration = sum(u['prosodic_features']['duration'] for u in interviewee_utterances)
        total_words = sum(len(u['text'].split()) for u in interviewee_utterances)
        speaking_rate = total_words / total_duration if total_duration > 0 else 0
        filler_words = ['um', 'uh', 'like', 'you know', 'so', 'i mean']
        filler_count = sum(
            sum(u['text'].lower().count(fw) for fw in filler_words)
            for u in interviewee_utterances
        )
        filler_ratio = filler_count / total_words if total_words > 0 else 0
        # Bigram repetition: the share of distinct bigrams that occur more than once
        all_words = ' '.join(u['text'].lower() for u in interviewee_utterances).split()
        word_counts = {}
        for i in range(len(all_words) - 1):
            bigram = (all_words[i], all_words[i + 1])
            word_counts[bigram] = word_counts.get(bigram, 0) + 1
        repetition_score = (
            sum(1 for count in word_counts.values() if count > 1) / len(word_counts)
            if word_counts else 0
        )
        pitches = []
        for segment in segments:
            # pyin returns NaN for unvoiced frames; keep only voiced estimates
            f0, voiced_flag, _ = librosa.pyin(segment, fmin=80, fmax=300, sr=sr)
            pitches.extend(f0[voiced_flag])
        pitch_mean = np.mean(pitches) if len(pitches) > 0 else 0
        pitch_std = np.std(pitches) if len(pitches) > 0 else 0
        jitter = np.mean(np.abs(np.diff(pitches))) / pitch_mean if len(pitches) > 1 and pitch_mean > 0 else 0
        intensities = []
        for segment in segments:
            rms = librosa.feature.rms(y=segment)[0]
            intensities.extend(rms)
        intensity_mean = np.mean(intensities) if intensities else 0
        intensity_std = np.std(intensities) if intensities else 0
        shimmer = (
            np.mean(np.abs(np.diff(intensities))) / intensity_mean
            if len(intensities) > 1 and intensity_mean > 0 else 0
        )
        # Composite scores: the weights and thresholds below are hand-tuned heuristics
        anxiety_score = 0.6 * (pitch_std / pitch_mean) + 0.4 * (jitter + shimmer) if pitch_mean > 0 else 0
        confidence_score = 0.7 * (1 / (1 + intensity_std)) + 0.3 * (1 / (1 + filler_ratio))
        hesitation_score = filler_ratio + repetition_score
        anxiety_level = 'high' if anxiety_score > 0.15 else 'moderate' if anxiety_score > 0.07 else 'low'
        confidence_level = 'high' if confidence_score > 0.7 else 'moderate' if confidence_score > 0.5 else 'low'
        fluency_level = (
            'fluent' if (filler_ratio < 0.05 and repetition_score < 0.1)
            else 'moderate' if (filler_ratio < 0.1 and repetition_score < 0.2)
            else 'disfluent'
        )
        return {
            'speaking_rate': float(round(speaking_rate, 2)),
            'filler_ratio': float(round(filler_ratio, 4)),
            'repetition_score': float(round(repetition_score, 4)),
            'pitch_analysis': {
                'mean': float(round(pitch_mean, 2)),
                'std_dev': float(round(pitch_std, 2)),
                'jitter': float(round(jitter, 4))
            },
            'intensity_analysis': {
                'mean': float(round(intensity_mean, 2)),
                'std_dev': float(round(intensity_std, 2)),
                'shimmer': float(round(shimmer, 4))
            },
            'composite_scores': {
                'anxiety': float(round(anxiety_score, 4)),
                'confidence': float(round(confidence_score, 4)),
                'hesitation': float(round(hesitation_score, 4))
            },
            'interpretation': {
                'anxiety_level': anxiety_level,
                'confidence_level': confidence_level,
                'fluency_level': fluency_level
            }
        }
    except Exception as e:
        logger.error(f"Voice analysis failed: {str(e)}")
        return {'error': str(e)}


def generate_voice_interpretation(analysis: Dict) -> str:
    """Build the plain-text voice summary that feeds into Gemini's prompt."""
    if 'error' in analysis:
        return "Voice analysis not available."
    interpretation_lines = []
    interpretation_lines.append("Voice Analysis Summary:")
    interpretation_lines.append(f"- Speaking Rate: {analysis['speaking_rate']} words/sec (average)")
    interpretation_lines.append(f"- Filler Words: {analysis['filler_ratio'] * 100:.1f}% of words")
    interpretation_lines.append(f"- Repetition Score: {analysis['repetition_score']:.3f}")
    interpretation_lines.append(
        f"- Anxiety Level: {analysis['interpretation']['anxiety_level'].upper()} (score: {analysis['composite_scores']['anxiety']:.3f})")
    interpretation_lines.append(
        f"- Confidence Level: {analysis['interpretation']['confidence_level'].upper()} (score: {analysis['composite_scores']['confidence']:.3f})")
    interpretation_lines.append(f"- Fluency: {analysis['interpretation']['fluency_level'].upper()}")
    interpretation_lines.append("")
    interpretation_lines.append("Detailed Interpretation:")
    interpretation_lines.append(
        "1. A higher speaking rate indicates faster speech, which can suggest nervousness or enthusiasm.")
    interpretation_lines.append("2. Filler words and repetitions reduce speech clarity and professionalism.")
    interpretation_lines.append("3. Anxiety is measured through pitch variability and voice instability.")
    interpretation_lines.append("4. Confidence is assessed through voice intensity and stability.")
    interpretation_lines.append("5. Fluency combines filler-word and repetition metrics.")
    return "\n".join(interpretation_lines)


# --- Chart Generation ---
# Charts are no longer embedded in the PDF report; this function is kept for
# callers that want a standalone anxiety/confidence figure.
def generate_anxiety_confidence_chart(composite_scores: Dict, chart_path: str):
    try:
        labels = ['Anxiety', 'Confidence']
        scores = [composite_scores.get('anxiety', 0), composite_scores.get('confidence', 0)]
        fig, ax = plt.subplots(figsize=(4, 2.5))
        ax.bar(labels, scores, color=['lightcoral', 'lightskyblue'])
        ax.set_ylabel('Score')
        ax.set_title('Anxiety vs. Confidence Scores')
        ax.set_ylim(0, 1.0)
        for i, v in enumerate(scores):
            ax.text(i, v + 0.05, f"{v:.2f}", color='black', ha='center', fontweight='bold')
        plt.tight_layout()
        plt.savefig(chart_path)
        plt.close(fig)
    except Exception as e:
        logger.error(f"Error generating chart: {str(e)}")


# --- Acceptance Probability Calculation ---
def calculate_acceptance_probability(analysis_data: Dict) -> float:
    """
    Calculate a hypothetical acceptance probability from voice and content analysis.
    This is a simplified heuristic model; refine it with more data or a trained ML model.
    """
    voice = analysis_data.get('voice_analysis', {})
    if 'error' in voice:
        return 0.0  # cannot calculate if voice analysis failed

    # Weights for the different factors (adjust these to fine-tune the model)
    w_confidence = 0.4
    w_anxiety = -0.3            # negative: anxiety counts against the candidate
    w_fluency = 0.2
    w_speaking_rate = 0.1       # closer to the ideal rate scores higher
    w_filler_repetition = -0.1  # negative: fillers/repetition count against
    w_content_strengths = 0.2   # placeholder; ideally from deeper content analysis

    # Normalize/interpret scores
    confidence_score = voice.get('composite_scores', {}).get('confidence', 0.0)
    anxiety_score = voice.get('composite_scores', {}).get('anxiety', 0.0)
    fluency_level = voice.get('interpretation', {}).get('fluency_level', 'disfluent')
    speaking_rate = voice.get('speaking_rate', 0.0)
    filler_ratio = voice.get('filler_ratio', 0.0)
    repetition_score = voice.get('repetition_score', 0.0)

    # Fluency mapping (higher score for more fluent speech)
    fluency_map = {'fluent': 1.0, 'moderate': 0.5, 'disfluent': 0.0}
    fluency_val = fluency_map.get(fluency_level, 0.0)

    # Speaking-rate score: 1.0 at the ideal rate (~2.5 words/sec), falling off
    # linearly with deviation in either direction
    ideal_speaking_rate = 2.5
    speaking_rate_deviation = abs(speaking_rate - ideal_speaking_rate)
    speaking_rate_score = max(0, 1 - (speaking_rate_deviation / ideal_speaking_rate))

    # Filler/repetition score (lower raw values are better, so invert)
    filler_repetition_composite = (filler_ratio + repetition_score) / 2
    filler_repetition_score = max(0, 1 - filler_repetition_composite)

    # Simplified content-strength score; a more sophisticated NLP method belongs here.
    content_strength_val = 0.0
    # Ideally this would come from structured output of Gemini's content analysis.
    # For now, make a simplified assumption: if the interview produced any analyzable
    # content, assume moderate content strength. A refinement would be to parse the
    # Gemini report text for a "Strengths:" section and count its items.
    if analysis_data.get('text_analysis', {}).get('total_duration', 0) > 0:
        content_strength_val = 0.8

    # Weighted sum of the factor scores. Anxiety and filler/repetition are already
    # inverted to "higher is better", so their weights enter as absolute values.
    raw_score = (
        confidence_score * w_confidence +
        (1 - anxiety_score) * abs(w_anxiety) +
        fluency_val * w_fluency +
        speaking_rate_score * w_speaking_rate +
        filler_repetition_score * abs(w_filler_repetition) +
        content_strength_val * w_content_strengths
    )

    # Normalize to [0, 1]. Every factor score lies in [0, 1], so the extremes are
    # 0 and the sum of the absolute weights. These bounds are rough estimates and
    # should be calibrated with real data.
    min_possible_score = 0.0
    max_possible_score = (w_confidence + abs(w_anxiety) + w_fluency +
                          w_speaking_rate + abs(w_filler_repetition) + w_content_strengths)
    if max_possible_score == min_possible_score:
        normalized_score = 0.5  # degenerate case: all weights are zero
    else:
        normalized_score = (raw_score - min_possible_score) / (max_possible_score - min_possible_score)
    acceptance_probability = max(0.0, min(1.0, normalized_score))  # clamp to [0, 1]
    return round(acceptance_probability * 100, 2)  # return as a percentage
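

# --- Worked example for the heuristic above (assumption: illustrative values,
# not part of the pipeline). A fairly confident, fluent candidate speaking near
# the ideal rate:
#   0.72*0.4 + (1-0.05)*0.3 + 1.0*0.2 + 0.96*0.1 + 0.955*0.1 + 0.8*0.2 = 1.1245
#   normalized by the weight sum 1.3 -> roughly 86.5%
_EXAMPLE_ANALYSIS = {
    'voice_analysis': {
        'composite_scores': {'confidence': 0.72, 'anxiety': 0.05},
        'interpretation': {'fluency_level': 'fluent'},
        'speaking_rate': 2.4,
        'filler_ratio': 0.03,
        'repetition_score': 0.06,
    },
    'text_analysis': {'total_duration': 310.0},
}
# calculate_acceptance_probability(_EXAMPLE_ANALYSIS) -> 86.5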
def generate_report(analysis_data: Dict) -> str:
    try:
        voice = analysis_data.get('voice_analysis', {})
        voice_interpretation = generate_voice_interpretation(voice)
        interviewee_responses = [
            f"Speaker {u['speaker']} ({u['role']}): {u['text']}"
            for u in analysis_data['transcript'] if u['role'] == 'Interviewee'
        ][:5]  # limit to the first five for prompt brevity
        acceptance_prob = analysis_data.get('acceptance_probability', None)
        acceptance_line = ""
        if acceptance_prob is not None:
            acceptance_line = f"\n**Estimated Acceptance Probability: {acceptance_prob:.2f}%**\n"
            if acceptance_prob >= 80:
                acceptance_line += "This indicates a very strong candidate. Well done!"
            elif acceptance_prob >= 50:
                acceptance_line += "This indicates a solid candidate with potential for improvement."
            else:
                acceptance_line += "This candidate may require significant development or may not be a strong fit."
        prompt = f"""
As EvalBot, an AI interview analysis system, generate a highly professional, well-structured, and concise interview analysis report.
The report should be suitable for a professional setting and clearly highlight key findings and actionable recommendations.
Use clear headings and subheadings. For bullet points, use '- '.
{acceptance_line}
**1. Executive Summary**
Provide a brief, high-level overview of the interview.
- Overall interview duration: {analysis_data['text_analysis']['total_duration']:.2f} seconds
- Number of speaker turns: {analysis_data['text_analysis']['speaker_turns']}
- Main participants: {', '.join(analysis_data['speakers'])}

**2. Voice Analysis Insights**
Analyze key voice metrics and provide a detailed interpretation.
{voice_interpretation}

**3. Content Analysis & Strengths/Areas for Development**
Analyze the key themes and identify both strengths and areas for development in the interviewee's responses.
Key responses from interviewee (for context):
{chr(10).join(interviewee_responses)}

**4. Actionable Recommendations**
Offer specific, actionable suggestions for improvement. Focus on:
- Communication Skills (e.g., pacing, clarity, filler words)
- Content Delivery (e.g., quantifying achievements, structuring answers)
- Professional Presentation (e.g., research, specific examples, mock interviews)
"""
        response = gemini_model.generate_content(prompt)
        return response.text
    except Exception as e:
        logger.error(f"Report generation failed: {str(e)}")
        return f"Error generating report: {str(e)}"


# --- Enhanced PDF generation (without logo or charts) ---
def create_pdf_report(analysis_data: Dict, output_path: str, gemini_report_text: str):
    try:
        doc = SimpleDocTemplate(output_path, pagesize=letter)
        styles = getSampleStyleSheet()

        # Define custom styles
        h1 = ParagraphStyle(name='Heading1', parent=styles['h1'], fontSize=16, spaceAfter=14,
                            alignment=1, textColor=colors.HexColor('#003366'))
        h2 = ParagraphStyle(name='Heading2', parent=styles['h2'], fontSize=12, spaceBefore=10,
                            spaceAfter=8, textColor=colors.HexColor('#336699'))
        h3 = ParagraphStyle(name='Heading3', parent=styles['h3'], fontSize=10, spaceBefore=8,
                            spaceAfter=4, textColor=colors.HexColor('#0055AA'))
        body_text = ParagraphStyle(name='BodyText', parent=styles['Normal'], fontSize=9,
                                   leading=12, spaceAfter=4)
        bullet_style = ParagraphStyle(name='Bullet', parent=styles['Normal'], fontSize=9,
                                      leading=12, leftIndent=18, bulletIndent=9)

        story = []

        # Title and date
        story.append(Paragraph("EvalBot Interview Analysis Report", h1))
        story.append(Spacer(1, 0.2 * inch))
        story.append(Paragraph(f"Date: {time.strftime('%Y-%m-%d')}", body_text))
        story.append(Spacer(1, 0.3 * inch))

        # --- Acceptance probability ---
        acceptance_prob = analysis_data.get('acceptance_probability', None)
        if acceptance_prob is not None:
            story.append(Paragraph("Candidate Evaluation Summary", h2))
            story.append(Spacer(1, 0.1 * inch))
            prob_color = colors.green if acceptance_prob >= 70 else (
                colors.orange if acceptance_prob >= 40 else colors.red)
            story.append(Paragraph(
                f"Estimated Acceptance Probability: {acceptance_prob:.2f}%",
                ParagraphStyle(name='AcceptanceProbability', parent=styles['Normal'],
                               fontSize=12, spaceAfter=10, alignment=1, textColor=prob_color)
            ))
            if acceptance_prob >= 80:
                story.append(Paragraph(
                    "This indicates a very strong candidate with high potential. Well done!",
                    body_text))
Well done!", body_text)) elif acceptance_prob >= 50: story.append(Paragraph( "This candidate shows solid potential but has areas for improvement to become an even stronger fit.", body_text)) else: story.append(Paragraph( "This candidate may require significant development or may not be the ideal fit at this time.", body_text)) story.append(Spacer(1, 0.3 * inch)) # --- End Acceptance Probability --- # Parse Gemini's report into sections for better PDF structuring sections = {} current_section = None # Use regex to robustly identify sections, especially with varied bullet points section_patterns = { r'^\s*\*\*\s*1\.\s*Executive Summary\s*\*\*': 'Executive Summary', r'^\s*\*\*\s*2\.\s*Voice Analysis Insights\s*\*\*': 'Voice Analysis Insights', r'^\s*\*\*\s*3\.\s*Content Analysis & Strengths/Areas for Development\s*\*\*': 'Content Analysis & Strengths/Areas for Development', r'^\s*\*\*\s*4\.\s*Actionable Recommendations\s*\*\*': 'Actionable Recommendations' } for line in gemini_report_text.split('\n'): matched_section = False for pattern, section_name in section_patterns.items(): if re.match(pattern, line): current_section = section_name sections[current_section] = [] matched_section = True break if not matched_section and current_section: sections[current_section].append(line) # 1. Executive Summary story.append(Paragraph("1. Executive Summary", h2)) story.append(Spacer(1, 0.1 * inch)) if 'Executive Summary' in sections: for line in sections['Executive Summary']: if line.strip(): story.append(Paragraph(line.strip(), body_text)) story.append(Spacer(1, 0.2 * inch)) # 2. Voice Analysis (Detailed - using Table for summary) story.append(Paragraph("2. Voice Analysis", h2)) voice_analysis = analysis_data.get('voice_analysis', {}) if voice_analysis and 'error' not in voice_analysis: # Voice Analysis Summary Table table_data = [ ['Metric', 'Value', 'Interpretation'], ['Speaking Rate', f"{voice_analysis['speaking_rate']:.2f} words/sec", 'Average rate'], ['Filler Words', f"{voice_analysis['filler_ratio'] * 100:.1f}%", 'Percentage of total words'], ['Repetition Score', f"{voice_analysis['repetition_score']:.3f}", 'Lower is better articulation'], ['Anxiety Level', voice_analysis['interpretation']['anxiety_level'].upper(), f"Score: {voice_analysis['composite_scores']['anxiety']:.3f}"], ['Confidence Level', voice_analysis['interpretation']['confidence_level'].upper(), f"Score: {voice_analysis['composite_scores']['confidence']:.3f}"], ['Fluency', voice_analysis['interpretation']['fluency_level'].upper(), 'Overall speech flow'] ] table_style = TableStyle([ ('BACKGROUND', (0, 0), (-1, 0), colors.HexColor('#6699CC')), ('TEXTCOLOR', (0, 0), (-1, 0), colors.whitesmoke), ('ALIGN', (0, 0), (-1, -1), 'LEFT'), ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'), ('BOTTOMPADDING', (0, 0), (-1, 0), 10), ('BACKGROUND', (0, 1), (-1, -1), colors.HexColor('#EFEFEF')), ('GRID', (0, 0), (-1, -1), 0.5, colors.HexColor('#CCCCCC')), ('LEFTPADDING', (0, 0), (-1, -1), 6), ('RIGHTPADDING', (0, 0), (-1, -1), 6), ('TOPPADDING', (0, 0), (-1, -1), 6), ('BOTTOMPADDING', (0, 0), (-1, -1), 6), ]) table = Table(table_data) table.setStyle(table_style) story.append(table) story.append(Spacer(1, 0.2 * inch)) # Detailed Interpretation from Gemini (if present) if 'Voice Analysis Insights' in sections: story.append(Paragraph("Detailed Interpretation:", h3)) for line in sections['Voice Analysis Insights']: if line.strip(): # Handle numbered lists from Gemini if re.match(r'^\d+\.\s', line.strip()): story.append( Paragraph(line.strip(), bullet_style)) 
                        else:
                            story.append(Paragraph(line.strip(), body_text))
                story.append(Spacer(1, 0.2 * inch))
        else:
            story.append(Paragraph("Voice analysis not available or encountered an error.", body_text))
        story.append(Spacer(1, 0.3 * inch))

        # 3. Content Analysis
        story.append(Paragraph("3. Content Analysis", h2))
        if 'Content Analysis & Strengths/Areas for Development' in sections:
            for line in sections['Content Analysis & Strengths/Areas for Development']:
                if line.strip():
                    if line.strip().startswith('-'):
                        story.append(Paragraph(line.strip(), bullet_style))
                    else:
                        story.append(Paragraph(line.strip(), body_text))
        story.append(Spacer(1, 0.2 * inch))

        # Key interviewee responses, formatted as a bulleted list
        story.append(Paragraph("Key Interviewee Responses (Contextual):", h3))
        interviewee_responses = [
            f"Speaker {u['speaker']} ({u['role']}): {u['text']}"
            for u in analysis_data['transcript'] if u['role'] == 'Interviewee'
        ][:5]
        for res in interviewee_responses:
            story.append(Paragraph(res, bullet_style))
        story.append(Spacer(1, 0.3 * inch))

        # 4. Recommendations
        story.append(Paragraph("4. Recommendations", h2))
        if 'Actionable Recommendations' in sections:
            for line in sections['Actionable Recommendations']:
                if line.strip():
                    if line.strip().startswith('-'):
                        story.append(Paragraph(line.strip(), bullet_style))
                    else:
                        story.append(Paragraph(line.strip(), body_text))
        story.append(Spacer(1, 0.2 * inch))

        # Footer
        story.append(Spacer(1, 0.5 * inch))
        story.append(Paragraph("--- Analysis by EvalBot ---", ParagraphStyle(
            name='FooterText', parent=styles['Normal'], fontSize=8, alignment=1,
            textColor=colors.HexColor('#666666')
        )))

        doc.build(story)
        return True
    except Exception as e:
        logger.error(f"PDF creation failed: {str(e)}", exc_info=True)
        return False


def convert_to_serializable(obj):
    """Recursively convert numpy types to plain Python types for json.dump."""
    if isinstance(obj, np.generic):
        return obj.item()
    elif isinstance(obj, dict):
        return {key: convert_to_serializable(value) for key, value in obj.items()}
    elif isinstance(obj, list):
        return [convert_to_serializable(item) for item in obj]
    elif isinstance(obj, np.ndarray):
        return obj.tolist()
    return obj


def process_interview(audio_path: str):
    try:
        logger.info(f"Starting processing for {audio_path}")
        wav_file = convert_to_wav(audio_path)

        logger.info("Starting transcription")
        transcript = transcribe(wav_file)

        logger.info("Extracting prosodic features")
        for utterance in transcript['utterances']:
            utterance['prosodic_features'] = extract_prosodic_features(
                wav_file, utterance['start'], utterance['end']
            )

        logger.info("Identifying speakers")
        utterances_with_speakers = identify_speakers(transcript, wav_file)

        logger.info("Classifying roles")
        # Reuse persisted classifier artifacts if present; otherwise bootstrap-train
        # on this interview. If this ever runs in a multi-threaded context, the
        # load/train step should be guarded so it happens only once.
        if os.path.exists(os.path.join(OUTPUT_DIR, 'role_classifier.pkl')):
            clf = joblib.load(os.path.join(OUTPUT_DIR, 'role_classifier.pkl'))
            vectorizer = joblib.load(os.path.join(OUTPUT_DIR, 'text_vectorizer.pkl'))
            scaler = joblib.load(os.path.join(OUTPUT_DIR, 'feature_scaler.pkl'))
        else:
            clf, vectorizer, scaler = train_role_classifier(utterances_with_speakers)
        classified_utterances = classify_roles(utterances_with_speakers, clf, vectorizer, scaler)

        logger.info("Analyzing interviewee voice")
        voice_analysis = analyze_interviewee_voice(wav_file, classified_utterances)

        analysis_data = {
            'transcript': classified_utterances,
            'speakers': list(set(u['speaker'] for u in classified_utterances)),
            'voice_analysis': voice_analysis,
            'text_analysis': {
                'total_duration': sum(u['prosodic_features']['duration'] for u in classified_utterances),
                'speaker_turns': len(classified_utterances)
            }
        }

        # Calculate acceptance probability
        analysis_data['acceptance_probability'] = calculate_acceptance_probability(analysis_data)

        logger.info("Generating report text using Gemini")
        gemini_report_text = generate_report(analysis_data)

        base_name = os.path.splitext(os.path.basename(audio_path))[0]
        pdf_path = os.path.join(OUTPUT_DIR, f"{base_name}_report.pdf")
        create_pdf_report(analysis_data, pdf_path, gemini_report_text=gemini_report_text)

        json_path = os.path.join(OUTPUT_DIR, f"{base_name}_analysis.json")
        with open(json_path, 'w') as f:
            serializable_data = convert_to_serializable(analysis_data)
            json.dump(serializable_data, f, indent=2)

        os.remove(wav_file)  # clean up the intermediate WAV after processing
        logger.info(f"Processing completed for {audio_path}")
        return {
            'pdf_path': pdf_path,
            'json_path': json_path
        }
    except Exception as e:
        logger.error(f"Processing failed: {str(e)}", exc_info=True)
        # Clean up the intermediate WAV on failure as well
        if 'wav_file' in locals() and os.path.exists(wav_file):
            os.remove(wav_file)
        raise
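

# --- Usage sketch (assumption: this entry point is not part of the original
# module). A minimal command-line driver showing how process_interview is meant
# to be called; the default path below is hypothetical.
if __name__ == "__main__":
    import sys

    input_path = sys.argv[1] if len(sys.argv) > 1 else os.path.join(AUDIO_DIR, "interview.mp3")
    outputs = process_interview(input_path)
    print(f"PDF report: {outputs['pdf_path']}")
    print(f"JSON analysis: {outputs['json_path']}")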