| | import os |
| | import torch |
| | import numpy as np |
| | import uuid |
| | import requests |
| | import time |
| | import json |
| | from pydub import AudioSegment |
| | import wave |
| | from nemo.collections.asr.models import EncDecSpeakerLabelModel |
| | from pinecone import Pinecone, ServerlessSpec |
| | import librosa |
| | import pandas as pd |
| | from sklearn.ensemble import RandomForestClassifier |
| | from sklearn.preprocessing import StandardScaler |
| | from sklearn.feature_extraction.text import TfidfVectorizer |
| | import re |
| | from typing import Dict, List, Tuple |
| | import logging |
| | import tempfile |
| | from reportlab.lib.pagesizes import letter |
| | from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Table, TableStyle, PageBreak, Image |
| | from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle |
| | from reportlab.lib.units import inch |
| | from reportlab.lib import colors |
| | import matplotlib.pyplot as plt |
| | import matplotlib |
| | matplotlib.use('Agg') |
| | import io |
| | from transformers import AutoTokenizer, AutoModel |
| | import spacy |
| | import google.generativeai as genai |
| | import joblib |
| | from concurrent.futures import ThreadPoolExecutor |
| |
|
| | |
# Configure application-wide logging; quiet NeMo's very chatty internal logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
logging.getLogger("nemo_logger").setLevel(logging.WARNING)

# Working directories: uploads are read from AUDIO_DIR; intermediate and final
# artifacts (wav conversions, temp slices, trained models) go to OUTPUT_DIR.
AUDIO_DIR = "./Uploads"
OUTPUT_DIR = "./processed_audio"
os.makedirs(OUTPUT_DIR, exist_ok=True)

# External service credentials, read from the environment.
# NOTE(review): the fallback strings are placeholders, not working keys —
# every downstream API call will fail unless the env vars are actually set.
PINECONE_KEY = os.getenv("PINECONE_KEY", "your-pinecone-key")
ASSEMBLYAI_KEY = os.getenv("ASSEMBLYAI_KEY", "your-assemblyai-key")
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY", "your-gemini-key")
| |
|
def validate_url(url: str) -> bool:
    """Check whether *url* is reachable with an HTTP HEAD request.

    Follows redirects (``requests.head`` does not by default) and accepts any
    non-error status (< 400): the old strict ``== 200`` check rejected URLs
    that redirect or whose servers answer HEAD with other success codes even
    though the subsequent GET download would have worked.

    Returns:
        True if the server answered with a success/redirect status,
        False on an error status or any network failure.
    """
    try:
        # allow_redirects=True: head() does not follow redirects by default.
        response = requests.head(url, timeout=5, allow_redirects=True)
        return response.status_code < 400
    except requests.RequestException as e:
        logger.error(f"URL validation failed for {url}: {str(e)}")
        return False
| |
|
def download_audio_from_url(url: str) -> str:
    """Download an audio file from *url* to a unique temp path and return it.

    Raises:
        ValueError: if the URL fails the pre-flight accessibility check.
        requests.HTTPError: if the server answers with an error status.

    On failure after the temp file was created, the partial file is removed
    (the original left half-written files accumulating in the temp dir).
    """
    if not validate_url(url):
        logger.error(f"Invalid or inaccessible URL: {url}")
        raise ValueError(f"Audio file not found at {url}")
    temp_dir = tempfile.gettempdir()
    temp_path = os.path.join(temp_dir, f"{uuid.uuid4()}.tmp_audio")
    try:
        logger.info(f"Downloading audio from {url} to {temp_path}")
        with requests.get(url, stream=True, timeout=10) as r:
            r.raise_for_status()
            with open(temp_path, 'wb') as f:
                for chunk in r.iter_content(chunk_size=8192):
                    f.write(chunk)
        return temp_path
    except requests.HTTPError as e:
        logger.error(f"HTTP error downloading audio from {url}: {str(e)}")
        if os.path.exists(temp_path):
            os.remove(temp_path)  # drop partial download
        raise
    except Exception as e:
        logger.error(f"Failed to download audio from URL {url}: {str(e)}")
        if os.path.exists(temp_path):
            os.remove(temp_path)  # drop partial download
        raise
| |
|
def initialize_services():
    """Set up the Pinecone speaker-embedding index and the Gemini model.

    Creates the index on first run, then returns ``(index, gemini_model)``.
    Any failure is logged and re-raised so startup aborts loudly.
    """
    try:
        client = Pinecone(api_key=PINECONE_KEY)
        name = "interview-speaker-embeddings"
        if name not in client.list_indexes().names():
            # 192 dimensions matches the TitaNet speaker-embedding size.
            client.create_index(
                name=name,
                dimension=192,
                metric="cosine",
                spec=ServerlessSpec(cloud="aws", region="us-east-1"),
            )
        embedding_index = client.Index(name)
        genai.configure(api_key=GEMINI_API_KEY)
        model = genai.GenerativeModel('gemini-1.5-flash')
        return embedding_index, model
    except Exception as exc:
        logger.error(f"Error initializing services: {str(exc)}")
        raise
| |
|
# Module-level service handles, created once at import time.
index, gemini_model = initialize_services()

# Run all models on GPU when available, otherwise fall back to CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
logger.info(f"Using device: {device}")
| |
|
def load_speaker_model():
    """Load NVIDIA's TitaNet speaker-verification model in eval mode."""
    try:
        # Cap intra-op threads so NeMo does not oversubscribe the CPU.
        torch.set_num_threads(5)
        titanet = EncDecSpeakerLabelModel.from_pretrained(
            "nvidia/speakerverification_en_titanet_large",
            map_location=device,
        )
        titanet.eval()
        return titanet
    except Exception as exc:
        logger.error(f"Model loading failed: {str(exc)}")
        raise RuntimeError("Could not load speaker verification model")
| |
|
def load_models():
    """Load every model the pipeline needs.

    Returns:
        Tuple of (TitaNet speaker model, spaCy pipeline, DistilBERT
        tokenizer, DistilBERT model on the active device, in eval mode).
    """
    verifier = load_speaker_model()
    spacy_nlp = spacy.load("en_core_web_sm")
    bert_tokenizer = AutoTokenizer.from_pretrained("distilbert-base-uncased")
    bert_model = AutoModel.from_pretrained("distilbert-base-uncased").to(device)
    bert_model.eval()
    return verifier, spacy_nlp, bert_tokenizer, bert_model
| |
|
# Load all models once at import time; reused by every request.
speaker_model, nlp, tokenizer, llm_model = load_models()
| |
|
def convert_to_wav(audio_path: str, output_dir: str = OUTPUT_DIR) -> str:
    """Convert any audio file to 16 kHz mono WAV; return the new file's path."""
    try:
        sound = AudioSegment.from_file(audio_path)
        # Downmix to mono only when needed, then resample for the ASR stack.
        if sound.channels > 1:
            sound = sound.set_channels(1)
        sound = sound.set_frame_rate(16000)
        target = os.path.join(output_dir, f"{uuid.uuid4()}.wav")
        sound.export(target, format="wav")
        return target
    except Exception as exc:
        logger.error(f"Audio conversion failed: {str(exc)}")
        raise
| |
|
def extract_prosodic_features(audio_path: str, start_ms: int, end_ms: int) -> Dict:
    """Extract pitch/intensity statistics for one time slice of an audio file.

    The slice [start_ms, end_ms) is exported to a temp WAV so librosa can load
    it at 16 kHz. Returns a dict of floats; on any failure a dict of zeros is
    returned so downstream feature vectors keep a fixed shape.
    """
    temp_path = None
    try:
        audio = AudioSegment.from_file(audio_path)
        segment = audio[start_ms:end_ms]
        temp_path = os.path.join(OUTPUT_DIR, f"temp_{uuid.uuid4()}.wav")
        segment.export(temp_path, format="wav")
        y, sr = librosa.load(temp_path, sr=16000)
        pitches = librosa.piptrack(y=y, sr=sr)[0]
        pitches = pitches[pitches > 0]  # keep only bins with detected pitch
        has_pitch = len(pitches) > 0
        # Compute RMS once — the original called librosa.feature.rms() four
        # times on the same signal, once per statistic.
        rms = librosa.feature.rms(y=y)[0]
        return {
            'duration': (end_ms - start_ms) / 1000,
            'mean_pitch': float(np.mean(pitches)) if has_pitch else 0.0,
            'min_pitch': float(np.min(pitches)) if has_pitch else 0.0,
            'max_pitch': float(np.max(pitches)) if has_pitch else 0.0,
            'pitch_sd': float(np.std(pitches)) if has_pitch else 0.0,
            'intensityMean': float(np.mean(rms)),
            'intensityMin': float(np.min(rms)),
            'intensityMax': float(np.max(rms)),
            'intensitySD': float(np.std(rms)),
        }
    except Exception as e:
        logger.error(f"Feature extraction failed: {str(e)}")
        return {
            'duration': 0.0, 'mean_pitch': 0.0, 'min_pitch': 0.0, 'max_pitch': 0.0,
            'pitch_sd': 0.0, 'intensityMean': 0.0, 'intensityMin': 0.0,
            'intensityMax': 0.0, 'intensitySD': 0.0
        }
    finally:
        # Always remove the temp slice, even when extraction fails midway
        # (the original leaked the file on any exception after export).
        if temp_path and os.path.exists(temp_path):
            os.remove(temp_path)
| |
|
def transcribe(audio_path: str) -> Dict:
    """Transcribe *audio_path* with AssemblyAI (speaker labels enabled).

    Uploads the file, submits a transcription job, then polls every 5 s.

    Returns:
        The completed transcript JSON from AssemblyAI.

    Raises:
        requests.HTTPError: if upload/submission is rejected (e.g. bad key).
        RuntimeError: if the job does not finish within the polling budget
            (the original looped forever on a hung job).
        Exception: if AssemblyAI reports the job as failed.
    """
    try:
        headers = {"authorization": ASSEMBLYAI_KEY}
        with open(audio_path, 'rb') as f:
            upload_response = requests.post(
                "https://api.assemblyai.com/v2/upload",
                headers=headers,
                data=f,
                timeout=300,  # large files can take a while to upload
            )
        # Fail fast with a clear HTTP error instead of a KeyError on
        # 'upload_url' when authentication or quota fails.
        upload_response.raise_for_status()
        audio_url = upload_response.json()['upload_url']
        transcript_response = requests.post(
            "https://api.assemblyai.com/v2/transcript",
            headers=headers,
            json={
                "audio_url": audio_url,
                "speaker_labels": True,
                "filter_profanity": True
            },
            timeout=30,
        )
        transcript_response.raise_for_status()
        transcript_id = transcript_response.json()['id']
        # Poll with a hard deadline so a stuck job cannot hang the pipeline.
        deadline = time.time() + 1800  # 30-minute budget
        while time.time() < deadline:
            result = requests.get(
                f"https://api.assemblyai.com/v2/transcript/{transcript_id}",
                headers=headers,
                timeout=30,
            ).json()
            if result['status'] == 'completed':
                return result
            elif result['status'] == 'error':
                raise Exception(result['error'])
            time.sleep(5)
        raise RuntimeError(f"Transcription timed out for id {transcript_id}")
    except Exception as e:
        logger.error(f"Transcription failed: {str(e)}")
        raise
| |
|
def process_utterance(utterance: Dict, full_audio: AudioSegment, wav_file: str) -> Dict:
    """Attach a speaker identity to one transcript utterance.

    Exports the utterance's audio span to a temp WAV, embeds it with TitaNet,
    and matches the embedding against the Pinecone index (cosine score > 0.7
    counts as a known speaker). Unknown speakers get a fresh id and are
    upserted for future matching. On any failure a fallback record with
    speaker 'Unknown' is returned so the pipeline keeps going.
    """
    temp_path = None
    try:
        start = utterance['start']
        end = utterance['end']
        segment = full_audio[start:end]
        temp_path = os.path.join(OUTPUT_DIR, f"temp_{uuid.uuid4()}.wav")
        segment.export(temp_path, format="wav")
        with torch.no_grad():
            embedding = speaker_model.get_embedding(temp_path).cpu().numpy()
        embedding_list = embedding.flatten().tolist()
        query_result = index.query(
            vector=embedding_list,
            top_k=1,
            include_metadata=True
        )
        if query_result['matches'] and query_result['matches'][0]['score'] > 0.7:
            speaker_id = query_result['matches'][0]['id']
            speaker_name = query_result['matches'][0]['metadata']['speaker_name']
        else:
            speaker_id = f"unknown_{uuid.uuid4().hex[:6]}"
            speaker_name = f"Speaker_{speaker_id[-4:]}"
            index.upsert([(speaker_id, embedding_list, {"speaker_name": speaker_name})])
        return {
            **utterance,
            'speaker': speaker_name,
            'speaker_id': speaker_id,
            'embedding': embedding_list
        }
    except Exception as e:
        logger.error(f"Utterance processing failed: {str(e)}")
        return {
            **utterance,
            'speaker': 'Unknown',
            'speaker_id': 'unknown',
            'embedding': None
        }
    finally:
        # Clean up the temp slice even on failure — the original only removed
        # it on the success path, leaking a file per failed utterance.
        if temp_path and os.path.exists(temp_path):
            os.remove(temp_path)
| |
|
def identify_speakers(transcript: Dict, wav_file: str) -> List[Dict]:
    """Run speaker identification over every utterance, in parallel.

    Returns the utterances in their original order, each augmented with
    speaker fields by process_utterance.
    """
    try:
        full_audio = AudioSegment.from_wav(wav_file)
        turns = transcript['utterances']
        with ThreadPoolExecutor(max_workers=5) as pool:
            # map preserves input order, matching the original futures list.
            labelled = list(
                pool.map(lambda u: process_utterance(u, full_audio, wav_file), turns)
            )
        return labelled
    except Exception as exc:
        logger.error(f"Speaker identification failed: {str(exc)}")
        raise
| |
|
def train_role_classifier(utterances: List[Dict]):
    """Fit a RandomForest that separates interviewer from interviewee turns.

    Features per utterance: 9 prosodic statistics, a 500-dim TF-IDF vector,
    and 5 linguistic cues (question mark, wh-words, word count, verb/noun
    counts). Labels are a bootstrap heuristic: even-indexed turns -> 0,
    odd-indexed turns -> 1. The fitted classifier, vectorizer and scaler are
    persisted to OUTPUT_DIR and returned.
    """
    try:
        texts = [u['text'] for u in utterances]
        vectorizer = TfidfVectorizer(max_features=500, ngram_range=(1, 2))
        tfidf = vectorizer.fit_transform(texts)
        feature_rows = []
        labels = []
        for idx, utt in enumerate(utterances):
            pros = utt['prosodic_features']
            row = [
                pros['duration'], pros['mean_pitch'], pros['min_pitch'],
                pros['max_pitch'], pros['pitch_sd'], pros['intensityMean'],
                pros['intensityMin'], pros['intensityMax'], pros['intensitySD'],
            ]
            row += tfidf[idx].toarray()[0].tolist()
            parsed = nlp(utt['text'])
            row += [
                int(utt['text'].endswith('?')),
                len(re.findall(r'\b(why|how|what|when|where|who|which)\b', utt['text'].lower())),
                len(utt['text'].split()),
                sum(1 for tok in parsed if tok.pos_ == 'VERB'),
                sum(1 for tok in parsed if tok.pos_ == 'NOUN'),
            ]
            feature_rows.append(row)
            labels.append(idx % 2)  # alternating bootstrap labels
        scaler = StandardScaler()
        X = scaler.fit_transform(feature_rows)
        clf = RandomForestClassifier(
            n_estimators=150, max_depth=10, random_state=42, class_weight='balanced'
        )
        clf.fit(X, labels)
        for obj, fname in (
            (clf, 'role_classifier.pkl'),
            (vectorizer, 'text_vectorizer.pkl'),
            (scaler, 'feature_scaler.pkl'),
        ):
            joblib.dump(obj, os.path.join(OUTPUT_DIR, fname))
        return clf, vectorizer, scaler
    except Exception as exc:
        logger.error(f"Classifier training failed: {str(exc)}")
        raise
| |
|
def classify_roles(utterances: List[Dict], clf, vectorizer, scaler):
    """Label each utterance 'Interviewer' (class 0) or 'Interviewee' (class 1).

    Builds exactly the same feature layout used in training, then scales and
    predicts the whole batch in one call — the original called
    scaler.transform() and clf.predict() once per utterance inside the loop,
    which is much slower and yields identical predictions.

    Returns the utterances in order, each with a 'role' key added.
    """
    try:
        texts = [u['text'] for u in utterances]
        X_text = vectorizer.transform(texts)
        features = []
        for i, utterance in enumerate(utterances):
            prosodic = utterance['prosodic_features']
            feat = [
                prosodic['duration'], prosodic['mean_pitch'], prosodic['min_pitch'],
                prosodic['max_pitch'], prosodic['pitch_sd'], prosodic['intensityMean'],
                prosodic['intensityMin'], prosodic['intensityMax'], prosodic['intensitySD'],
            ]
            feat.extend(X_text[i].toarray()[0].tolist())
            doc = nlp(utterance['text'])
            feat.extend([
                int(utterance['text'].endswith('?')),
                len(re.findall(r'\b(why|how|what|when|where|who|which)\b', utterance['text'].lower())),
                len(utterance['text'].split()),
                sum(1 for token in doc if token.pos_ == 'VERB'),
                sum(1 for token in doc if token.pos_ == 'NOUN')
            ])
            features.append(feat)
        if not features:
            return []  # nothing to classify
        predictions = clf.predict(scaler.transform(features))
        return [
            {**utterance, 'role': 'Interviewer' if pred == 0 else 'Interviewee'}
            for utterance, pred in zip(utterances, predictions)
        ]
    except Exception as e:
        logger.error(f"Role classification failed: {str(e)}")
        raise
| |
|
def analyze_interviewee_voice(audio_path: str, utterances: List[Dict]) -> Dict:
    """Compute vocal metrics (rate, fillers, pitch, intensity) for the interviewee.

    Only utterances labelled role == 'Interviewee' are analysed. Returns a
    nested dict of rounded floats plus categorical interpretations, or a
    dict with an 'error' key when there is nothing usable to analyse.
    """
    try:
        # Full recording at 16 kHz; utterance start/end are in milliseconds,
        # so sample offsets are ms * sr / 1000.
        y, sr = librosa.load(audio_path, sr=16000)
        interviewee_utterances = [u for u in utterances if u['role'] == 'Interviewee']
        if not interviewee_utterances:
            logger.warning("No interviewee utterances found")
            return {'error': 'No interviewee utterances found'}
        # Collect the raw sample slices for each interviewee turn.
        segments = []
        for u in interviewee_utterances:
            start = int(u['start'] * sr / 1000)
            end = int(u['end'] * sr / 1000)
            if end > start and len(y[start:end]) > 0:
                segments.append(y[start:end])
            else:
                logger.warning(f"Invalid segment for utterance: start={start}, end={end}")
        if not segments:
            logger.warning("No valid audio segments for voice analysis")
            return {'error': 'No valid audio segments found'}
        # Speaking rate = total words / total spoken seconds (from prosodic durations).
        total_duration = sum(u['prosodic_features']['duration'] for u in interviewee_utterances)
        total_words = sum(len(u['text'].split()) for u in interviewee_utterances)
        speaking_rate = total_words / total_duration if total_duration > 0 else 0
        # Filler ratio: substring counts, so e.g. 'so' also matches inside words —
        # a known over-count kept from the original heuristic.
        filler_words = ['um', 'uh', 'like', 'you know', 'so', 'i mean']
        filler_count = sum(sum(u['text'].lower().count(fw) for fw in filler_words) for u in interviewee_utterances)
        filler_ratio = filler_count / total_words if total_words > 0 else 0
        # Repetition: fraction of distinct word bigrams that occur more than once.
        all_words = ' '.join(u['text'].lower() for u in interviewee_utterances).split()
        word_counts = {}
        for i in range(len(all_words) - 1):
            bigram = (all_words[i], all_words[i + 1])
            word_counts[bigram] = word_counts.get(bigram, 0) + 1
        repetition_score = sum(1 for count in word_counts.values() if count > 1) / len(word_counts) if word_counts else 0
        # Pitch track per segment via pyin (80-300 Hz band); keep voiced frames only.
        pitches = []
        for segment in segments:
            f0, voiced_flag, _ = librosa.pyin(segment, fmin=80, fmax=300, sr=sr)
            pitches.extend(f0[voiced_flag])
        pitch_mean = np.mean(pitches) if len(pitches) > 0 else 0
        pitch_std = np.std(pitches) if len(pitches) > 0 else 0
        # Jitter proxy: mean absolute frame-to-frame pitch change, normalised by mean pitch.
        jitter = np.mean(np.abs(np.diff(pitches))) / pitch_mean if len(pitches) > 1 and pitch_mean > 0 else 0
        # One mean-RMS value per segment (utterance-level loudness).
        intensities = []
        for segment in segments:
            rms = np.mean(librosa.feature.rms(y=segment)[0]) if len(segment) > 0 else 0.0
            intensities.append(float(rms))
        intensity_mean = np.mean(intensities) if intensities else 0
        intensity_std = np.std(intensities) if intensities else 0
        # Shimmer proxy: mean absolute segment-to-segment loudness change / mean loudness.
        shimmer = np.mean(np.abs(np.diff(intensities))) / intensity_mean if len(intensities) > 1 and intensity_mean > 0 else 0
        # Heuristic composites; weights and thresholds below are hand-tuned,
        # not derived from any published scale.
        anxiety_score = 0.6 * (pitch_std / pitch_mean) + 0.4 * (jitter + shimmer) if pitch_mean > 0 else 0
        confidence_score = 0.7 * (1 / (1 + intensity_std)) + 0.3 * (1 / (1 + filler_ratio))
        hesitation_score = filler_ratio + repetition_score
        anxiety_level = 'High' if anxiety_score > 0.15 else 'Moderate' if anxiety_score > 0.07 else 'Low'
        confidence_level = 'High' if confidence_score > 0.7 else 'Moderate' if confidence_score > 0.5 else 'Low'
        fluency_level = 'Fluent' if (filler_ratio < 0.05 and repetition_score < 0.1) else 'Moderate' if (filler_ratio < 0.1 and repetition_score < 0.2) else 'Disfluent'
        return {
            'speaking_rate': float(round(speaking_rate, 2)),
            'filler_ratio': float(round(filler_ratio, 4)),
            'repetition_score': float(round(repetition_score, 4)),
            'pitch_analysis': {'mean': float(round(pitch_mean, 2)), 'std_dev': float(round(pitch_std, 2)), 'jitter': float(round(jitter, 4))},
            'intensity_analysis': {'mean': float(round(intensity_mean, 2)), 'std_dev': float(round(intensity_std, 2)), 'shimmer': float(round(shimmer, 4))},
            'composite_scores': {'anxiety': float(round(anxiety_score, 4)), 'confidence': float(round(confidence_score, 4)), 'hesitation': float(round(hesitation_score, 4))},
            'interpretation': {'anxiety_level': anxiety_level, 'confidence_level': confidence_level, 'fluency_level': fluency_level}
        }
    except Exception as e:
        logger.error(f"Voice analysis failed: {str(e)}", exc_info=True)
        return {'error': f'Voice analysis incomplete due to audio processing issues: {str(e)}'}
| |
|
def generate_voice_interpretation(analysis: Dict) -> str:
    """Render the voice-analysis dict as a human-readable bullet summary.

    Returns a single error line when *analysis* carries an 'error' key;
    otherwise five metric bullets followed by a fixed HR-insights section.
    """
    if 'error' in analysis:
        return f"Voice analysis unavailable: {analysis['error']}"
    scores = analysis['composite_scores']
    levels = analysis['interpretation']
    metric_lines = [
        f"- Speaking Rate: {analysis['speaking_rate']} words/sec (Benchmark: 2.0-3.0 wps; affects clarity)",
        f"- Filler Words: {analysis['filler_ratio'] * 100:.1f}% (High usage reduces credibility)",
        f"- Anxiety: {levels['anxiety_level']} (Score: {scores['anxiety']:.3f}; stress response)",
        f"- Confidence: {levels['confidence_level']} (Score: {scores['confidence']:.3f}; vocal strength)",
        f"- Fluency: {levels['fluency_level']} (Drives engagement)",
    ]
    insight_lines = [
        "",
        "HR Insights:",
        "- Rapid speech (>3.0 wps) may reduce clarity; slower pacing enhances professionalism.",
        "- High filler word usage undermines perceived confidence.",
        "- Elevated anxiety suggests pressure; training can improve resilience.",
        "- Strong confidence supports leadership presence.",
        "- Fluent speech enhances engagement in team settings.",
    ]
    return "\n".join(metric_lines + insight_lines)
| |
|
def generate_anxiety_confidence_chart(composite_scores: Dict, chart_buffer):
    """Render a two-bar anxiety/confidence chart as a PNG into *chart_buffer*.

    Failures are logged, never raised — the PDF pipeline continues without
    the chart.
    """
    try:
        names = ['Anxiety', 'Confidence']
        values = [composite_scores.get('anxiety', 0), composite_scores.get('confidence', 0)]
        fig, chart_ax = plt.subplots(figsize=(5, 3.5))
        drawn = chart_ax.bar(names, values, color=['#FF5252', '#26A69A'], edgecolor='black', width=0.45)
        chart_ax.set_ylabel('Score', fontsize=12)
        chart_ax.set_title('Vocal Dynamics: Anxiety vs. Confidence', fontsize=14, pad=15)
        chart_ax.set_ylim(0, 1.3)
        # Annotate each bar with its value just above the top edge.
        for rect in drawn:
            top = rect.get_height()
            chart_ax.text(rect.get_x() + rect.get_width() / 2, top + 0.05, f"{top:.2f}",
                          ha='center', color='black', fontweight='bold', fontsize=10)
        chart_ax.grid(True, axis='y', linestyle='--', alpha=0.7)
        plt.tight_layout()
        plt.savefig(chart_buffer, format='png', bbox_inches='tight', dpi=100)
        plt.close(fig)
    except Exception as exc:
        logger.error(f"Error generating chart: {str(exc)}")
| |
|
def calculate_acceptance_probability(analysis_data: Dict) -> float:
    """Combine vocal metrics into a 0-100 suitability percentage.

    A weighted sum of confidence, inverted anxiety, fluency, closeness of the
    speaking rate to 2.5 wps, inverted filler/repetition, and a coarse content
    proxy (interview longer than 60 s), normalised by the maximum attainable
    score. Returns 50.0 when the voice analysis failed.
    """
    voice = analysis_data.get('voice_analysis', {})
    if 'error' in voice:
        return 50.0
    # Weights; the negative ones mark metrics where "more" is worse.
    w_confidence, w_anxiety, w_fluency, w_speaking_rate, w_filler_repetition, w_content_strengths = 0.35, -0.25, 0.2, 0.15, -0.15, 0.25
    composite = voice.get('composite_scores', {})
    confidence_score = composite.get('confidence', 0.0)
    anxiety_score = composite.get('anxiety', 0.0)
    fluency_level = voice.get('interpretation', {}).get('fluency_level', 'Disfluent')
    speaking_rate = voice.get('speaking_rate', 0.0)
    filler_ratio = voice.get('filler_ratio', 0.0)
    repetition_score = voice.get('repetition_score', 0.0)
    fluency_val = {'Fluent': 1.0, 'Moderate': 0.6, 'Disfluent': 0.2}.get(fluency_level, 0.2)
    # Penalise deviation from an ideal 2.5 words/sec, clipped at zero.
    ideal_speaking_rate = 2.5
    speaking_rate_deviation = abs(speaking_rate - ideal_speaking_rate)
    speaking_rate_score = max(0, 1 - (speaking_rate_deviation / ideal_speaking_rate))
    filler_repetition_composite = (filler_ratio + repetition_score) / 2
    filler_repetition_score = max(0, 1 - filler_repetition_composite)
    # Coarse content proxy: interviews longer than a minute score higher.
    content_strength_val = 0.85 if analysis_data.get('text_analysis', {}).get('total_duration', 0) > 60 else 0.4
    raw_score = (
        confidence_score * w_confidence
        + (1 - anxiety_score) * abs(w_anxiety)
        + fluency_val * w_fluency
        + speaking_rate_score * w_speaking_rate
        + filler_repetition_score * abs(w_filler_repetition)
        + content_strength_val * w_content_strengths
    )
    max_possible_score = (w_confidence + abs(w_anxiety) + w_fluency + w_speaking_rate + abs(w_filler_repetition) + w_content_strengths)
    normalized_score = raw_score / max_possible_score if max_possible_score > 0 else 0.5
    acceptance_probability = max(0.0, min(1.0, normalized_score))
    # Round to two decimals via string formatting, matching report display.
    return float(f"{acceptance_probability * 100:.2f}")
| |
|
def generate_report(analysis_data: Dict) -> str:
    """Ask Gemini to write the narrative HR report for this interview.

    Builds a structured prompt from the transcript, voice interpretation and
    the precomputed acceptance probability, then strips non-ASCII characters
    and parentheses from Gemini's answer before returning it. Returns an
    error string (rather than raising) on failure.
    """
    try:
        voice = analysis_data.get('voice_analysis', {})
        voice_interpretation = generate_voice_interpretation(voice)
        # Up to three sample interviewee answers to ground the content section.
        interviewee_responses = [f"- {u['text']}" for u in analysis_data['transcript'] if u['role'] == 'Interviewee'][:3]
        acceptance_prob = analysis_data.get('acceptance_probability', 50.0)
        acceptance_line = f"\n**Suitability Score: {acceptance_prob:.2f}%**\n"
        # Verdict tiers (80/60/40) match the thresholds used by the PDF builder.
        if acceptance_prob >= 80:
            acceptance_line += "HR Verdict: Outstanding candidate, recommended for immediate advancement."
        elif acceptance_prob >= 60:
            acceptance_line += "HR Verdict: Strong candidate, suitable for further evaluation."
        elif acceptance_prob >= 40:
            acceptance_line += "HR Verdict: Moderate potential, needs additional assessment."
        else:
            acceptance_line += "HR Verdict: Limited fit, significant improvement required."
        prompt = f"""
You are EvalBot, a senior HR consultant delivering a concise, professional interview analysis report. Use clear headings, bullet points ('-'), complete sentences, and formal language. Avoid redundancy, vague terms, and special characters that could break formatting. Ensure each section is unique and actionable.
{acceptance_line}
**1. Executive Summary**
- Provide a narrative overview of the candidate’s performance, highlighting key strengths and fit.
- Duration: {analysis_data['text_analysis']['total_duration']:.2f} seconds
- Speaker Turns: {analysis_data['text_analysis']['speaker_turns']}
- Participants: {', '.join(sorted(set(u['speaker'] for u in analysis_data['transcript'])))}
**2. Communication and Vocal Dynamics**
- Evaluate vocal delivery (rate, fluency, confidence) with specific insights.
{voice_interpretation}
**3. Competency and Content**
- Assess leadership, problem-solving, communication, and adaptability with clear examples.
- List strengths and growth areas separately, using quantifiable achievements where possible.
- Sample responses:
{chr(10).join(interviewee_responses)}
**4. Role Fit and Potential**
- Analyze cultural fit, role readiness, and long-term growth potential with specific alignment to role requirements.
**5. Recommendations**
- Provide prioritized development strategies (e.g., communication training, technical assessments).
- Suggest specific next steps for hiring managers (e.g., advance, schedule tests).
"""
        response = gemini_model.generate_content(prompt)
        # Remove non-ASCII runs and parentheses: both break the downstream
        # PDF section parser's formatting assumptions.
        return re.sub(r'[^\x00-\x7F]+|[()]+', '', response.text)
    except Exception as e:
        logger.error(f"Report generation failed: {str(e)}")
        return f"Error generating report: {str(e)}"
| |
|
| | def create_pdf_report(analysis_data: Dict, output_path: str, gemini_report_text: str) -> bool: |
| | try: |
| | doc = SimpleDocTemplate(output_path, pagesize=letter, |
| | rightMargin=0.75*inch, leftMargin=0.75*inch, |
| | topMargin=1*inch, bottomMargin=1*inch) |
| | styles = getSampleStyleSheet() |
| | h1 = ParagraphStyle(name='Heading1', fontSize=18, leading=22, spaceAfter=16, alignment=1, textColor=colors.HexColor('#003087'), fontName='Helvetica-Bold') |
| | h2 = ParagraphStyle(name='Heading2', fontSize=13, leading=15, spaceBefore=10, spaceAfter=6, textColor=colors.HexColor('#0050BC'), fontName='Helvetica-Bold') |
| | h3 = ParagraphStyle(name='Heading3', fontSize=9, leading=11, spaceBefore=6, spaceAfter=4, textColor=colors.HexColor('#3F7CFF'), fontName='Helvetica') |
| | body_text = ParagraphStyle(name='BodyText', fontSize=8, leading=10, spaceAfter=4, fontName='Helvetica', textColor=colors.HexColor('#333333')) |
| | bullet_style = ParagraphStyle(name='Bullet', parent=body_text, leftIndent=16, bulletIndent=6, fontName='Helvetica', bulletFontName='Helvetica', bulletFontSize=8) |
| | |
| | story = [] |
| |
|
| | def header_footer(canvas, doc): |
| | canvas.saveState() |
| | canvas.setFont('Helvetica', 7) |
| | canvas.setFillColor(colors.HexColor('#666666')) |
| | canvas.drawString(doc.leftMargin, 0.5*inch, f"Page {doc.page} | EvalBot HR Interview Report | Confidential") |
| | canvas.setStrokeColor(colors.HexColor('#0050BC')) |
| | canvas.setLineWidth(0.5) |
| | canvas.line(doc.leftMargin, doc.height + 0.9*inch, doc.width + doc.leftMargin, doc.height + 0.9*inch) |
| | canvas.setFont('Helvetica-Bold', 8) |
| | canvas.drawString(doc.leftMargin, doc.height + 0.95*inch, "Candidate Interview Analysis") |
| | canvas.drawRightString(doc.width + doc.leftMargin, doc.height + 0.95*inch, time.strftime('%B %d, %Y')) |
| | canvas.restoreState() |
| |
|
| | |
| | story.append(Paragraph("Candidate Interview Analysis", h1)) |
| | story.append(Paragraph(f"Generated: {time.strftime('%B %d, %Y')}", ParagraphStyle(name='Date', alignment=1, fontSize=8, textColor=colors.HexColor('#666666'), fontName='Helvetica'))) |
| | story.append(Spacer(1, 0.3*inch)) |
| | acceptance_prob = float(np.mean([np.mean([np.mean([analysis_data['acceptance_probability'], 0.0])])])) |
| | story.append(Paragraph("Hiring Suitability Snapshot", h2)) |
| | prob_color = colors.HexColor('#2E7D32') if acceptance_prob >= 80 else (colors.HexColor('#F57C00') if acceptance_prob >= 60 else colors.HexColor('#D32F2F')) |
| | story.append(Paragraph(f"Suitability Score: <font size=14 color='{prob_color.hexval()}'>{acceptance_prob:.2f}%</font>", |
| | ParagraphStyle(name='Prob', fontSize=10, spaceAfter=8, alignment=1, fontName='Helvetica-Bold'))) |
| | if acceptance_prob >= 80: |
| | story.append(Paragraph("<b>HR Verdict:</b> Outstanding candidate, recommended for immediate advancement.", body_text)) |
| | elif acceptance_prob >= 60: |
| | story.append(Paragraph("<b>HR Verdict:</b> Strong candidate, suitable for further evaluation.", body_text)) |
| | elif acceptance_prob >= 40: |
| | story.append(Paragraph("<b>HR Verdict:</b> Moderate potential, needs additional assessment.", body_text)) |
| | else: |
| | story.append(Paragraph("<b>HR Verdict:</b> Limited fit, significant improvement required.", body_text)) |
| | story.append(Spacer(1, 0.2*inch)) |
| | participants = sorted([p for p in set(u['speaker'] for u in analysis_data['transcript']) if p != 'Unknown']) |
| | table_data = [ |
| | ['Metric', 'Value'], |
| | ['Interview Duration', f"{analysis_data['text_analysis']['total_duration']:.1f} seconds"], |
| | ['Speaker Turns', f"{analysis_data['text_analysis']['speaker_turns']}"], |
| | ['Participants', f"{', '.join(participants)}"], |
| | ] |
| | table = Table(table_data, colWidths=[2.2*inch, 3.8*inch]) |
| | table.setStyle(TableStyle([ |
| | ('BACKGROUND', (0,0), (-1,0), colors.HexColor('#0050BC')), |
| | ('TEXTCOLOR', (0,0), (-1,0), colors.white), |
| | ('ALIGN', (0,0), (-1,-1), 'LEFT'), |
| | ('VALIGN', (0,0), (-1,-1), 'MIDDLE'), |
| | ('FONTNAME', (0,0), (-1,0), 'Helvetica-Bold'), |
| | ('FONTSIZE', (0,0), (-1,-1), 8), |
| | ('BOTTOMPADDING', (0,0), (-1,0), 6), |
| | ('TOPPADDING', (0,0), (-1,0), 6), |
| | ('BACKGROUND', (0,1), (-1,-1), colors.HexColor('#F5F6FA'),), |
| | ('GRID', (0,0), (-1,-1), 0.4, colors.HexColor('#DDE4EB')), |
| | ])) |
| | story.append(table) |
| | story.append(Spacer(1, 0.3*inch)) |
| | story.append(Paragraph("Prepared by: EvalBot - AI-Powered HR Analysis", body_text)) |
| | story.append(PageBreak()) |
| |
|
| | |
| | story.append(Paragraph("Detailed Candidate Evaluation", h1)) |
| | |
| | |
| | story.append(Paragraph("1. Communication & Vocal Dynamics", h2)) |
| | voice_analysis = analysis_data.get('voice_analysis', {}) |
| | if voice_analysis' and 'error' not in voice_analysis: |
| | table_data = [ |
| | ['Metric', 'Value', 'HR Insight'], |
| | ['Speaking Rate', f"{voice_analysis.get('speaking_rate', 0):.2f} words/sec", 'Benchmark: 2.0-3.0 wps; impacts clarity'], |
| | ['Filler Words', f"{voice_analysis.get('filler_ratio', 0) * 100:.1f}%', 'High usage reduces credibility'], |
| | ['Anxiety', voice_analysis.get('interpretation', {}).get('anxiety_level', 'N/A'), f"Score: {voice_analysis.get('composite_scores', {}).get('anxiety', 0):.3f}"], |
| | ['Confidence', voice_analysis.get('interpretation', {}).get('confidence_level', 'N/A'), f"Score: {voice_analysis.get('composite_scores', {}).get('confidence', 0):.3f}"], |
| | ['Fluency', voice_analysis.get('interpretation', {}).get('fluency_level', 'N/A'), 'Drives engagement'], |
| | ] |
| | table = Table(table_data, colWidths=[1.5*inch, 1.3*inch, 3.2*inch]) |
| | table.setStyle(TableStyle([ |
| | ('BACKGROUND', (0,0), (-1,0)), colors.HexColor('#0050BC')), |
| | ('TEXTCOLOR', (0,0), (-1,-0)), colors.white), |
| | ('ALIGN', (0,0), (-1,-1), 'LEFT'), |
| | ('VALIGN', (0,0), (-1,-1), 'MIDDLE'), |
| | ('FONTNAME', (0,0), (-1,-0)), 'Helvetica-Bold'), |
| | ('FONTSIZE', (0,0), (-1,-1), 8), |
| | ('BOTTOMPADDING', (0,0), (-1,-0)), 6), |
| | ('TOPPADDING', (0,0), (0,-1), 6), |
| | ('BACKGROUND', (0,1), (-1,-1), colors.HexColor('#F5F6FA'))), |
| | ('GRID', (0,0), (-1,-1), 0.4, colors.HexColor('#DDE4EB'))), |
| | ])) |
| | story.append(table) |
| | story.append(Spacer(1, 0.15*inch)) |
| | chart_buffer = io.BytesIO() |
| | generate_anxiety_chart(voice_analysis.get('composite_scores', {}), chart_buffer) |
| | chart_buffer.seek(0) |
| | img = Image(chart_buffer, width=4.2*inch, height=2.8*inch) |
| | img.hAlign = 'CENTER' |
| | story.append(img) |
| | else: |
| | story.append(Paragraph("Voice analysis unavailable.", body_text)) |
| | story.append(Spacer(1, 0.15*inch)) |
| |
|
| | |
| | sections = { |
| | "Executive Summary": [], |
| | "Communication": [], |
| | "Competency": {"Strengths": [], "Growth Areas": []}, |
| | "Recommendations": {"Development": [], "Next Steps": []}, |
| | "Role Fit": [], |
| | } |
| | current_section = None |
| | current_subsection = None |
| | lines = gemini_report_text.split('\n') |
| | for line in lines: |
| | line = line.strip() |
| | if not line: continue |
| | if line.startswith('**') and line.endswith('**'): |
| | section_title = line.strip('**').strip() |
| | if section_title.startswith(('1.', '2.', '3.', '4.', '5.')): |
| | section_title = section_title[2:].strip() |
| | if 'Executive Summary' in section_title: |
| | current_section = 'Executive Summary' |
| | current_subsection = None |
| | elif 'Communication' in section_title: |
| | current_section = 'Communication' |
| | current_subsection = None |
| | elif 'Competency' in section_title: |
| | current_section = 'Competency' |
| | current_subsection = None |
| | elif 'Role Fit' in section_title: |
| | current_section = 'Role Fit' |
| | current_subsection = None |
| | elif 'Recommendations' in section_title: |
| | current_section = 'Recommendations' |
| | current_subsection = None |
| | elif line.startswith('-') and current_section: |
| | clean_line = line.lstrip('-').strip() |
| | if not clean_line: continue |
| | clean_line = re.sub(r'[()]', '', clean_line) |
| | if current_section == 'Competency': |
| | if any(k in clean_line.lower() for k in ['leader', 'leadership', 'problem', 'commun', 'adapt', 'strength']): |
| | current_subsection = 'Strengths' |
| | elif any(k in clean_line.lower() for k in ['improv', 'grow', 'depth']): |
| | current_subsection = 'Growth Areas' |
| | if current_subsection: |
| | sections[current_section][current_subsection].append(clean_line) |
| | elif current_section == 'Recommendations': |
| | if any(k in clean_line.lower() for k in ['commun', 'tech', 'depth', 'pres']): |
| | current_subsection = 'Development' |
| | elif any(k in clean_line.lower() for k in ['adv', 'train', 'assess', 'next', 'mentor']): |
| | current_subsection = 'Next Steps' |
| | if current_subsection: |
| | sections[current_section][current_subsection].append(clean_line) |
| | else: |
| | sections[current_section].append(clean_line) |
| |
|
| | |
| | story.append(Paragraph("2. Executive Summary", h2)) |
| | if sections['Executive Summary']: |
| | for line in sections['Executive Summary']: |
| | story.append(Paragraph(line, bullet_style)) |
| | else: |
| | story.append(Paragraph("No summary provided.", body_text)) |
| | story.append(Spacer(1, 0.15*inch)) |
| |
|
| | |
| | story.append(Paragraph("3. Competency & Content", h2)) |
| | story.append(Paragraph("Strengths", h3)) |
| | if sections['Competency']['Strengths']: |
| | for line in sections['Competency']['Strengths']: |
| | story.append(Paragraph(line, bullet_style)) |
| | else: |
| | story.append(Paragraph("No strengths identified.", body_text)) |
| | story.append(Spacer(1, 0.1*inch)) |
| | story.append(Paragraph("Growth Areas", h3)) |
| | if sections['Competency']['Growth Areas']: |
| | for line in sections['Competency']['Growth Areas']: |
| | story.append(Paragraph(line, bullet_style)) |
| | else: |
| | story.append(Paragraph("No growth areas identified; maintain current strengths.", body_text)) |
| | story.append(Spacer(1, 0.15*inch)) |
| |
|
| | |
| | story.append(Paragraph("4. Role Fit & Potential", h2)) |
| | if sections['Role Fit']: |
| | for line in sections['Role Fit']: |
| | story.append(Paragraph(line, bullet_style)) |
| | else: |
| | story.append(Paragraph("No fit analysis provided.", body_text)) |
| | story.append(Spacer(1, 0.15*inch)) |
| |
|
| | |
| | story.append(Paragraph("5. Recommendations", h2)) |
| | story.append(Paragraph("Development Priorities", h3)) |
| | if sections['Recommendations']['Development']: |
| | for line in sections['Recommendations']['Development']: |
| | story.append(Paragraph(line, bullet_style)) |
| | else: |
| | story.append(Paragraph("No development priorities specified.", body_text)) |
| | story.append(Spacer(1, 0.1*inch)) |
| | story.append(Paragraph("Next Steps for Hiring Managers", h3)) |
| | if sections['Recommendations']['Next Steps']: |
| | for line in sections['Recommendations']['Next Steps']: |
| | story.append(Paragraph(line, bullet_style)) |
| | else: |
| | story.append(Paragraph("No next steps provided.", body_text)) |
| | story.append(Spacer(1, 0.15*inch)) |
| | story.append(Paragraph("This report provides actionable insights to support hiring and candidate development.", body_text)) |
| |
|
| | doc.build(story, onFirstPage=header_footer, onLaterPages=header_footer) |
| | logger.info(f"PDF report successfully generated at {output_path}") |
| | return True |
| | except Exception as e: |
| | logger.error(f"PDF generation failed: {str(e)}", exc_info=True) |
| | return False |
| |
|
def convert_to_serializable(obj):
    """Recursively convert numpy types and containers into JSON-serializable builtins.

    Handles numpy scalars (``np.generic`` -> Python scalar via ``.item()``),
    numpy arrays (-> nested lists), dicts, lists, tuples, and sets
    (tuples/sets -> lists, since JSON has no tuple/set type and ``json.dump``
    rejects sets outright). Anything else is returned unchanged.

    Args:
        obj: Arbitrary object, typically the analysis-results dict.

    Returns:
        An equivalent structure containing only JSON-serializable builtins.
    """
    if isinstance(obj, np.generic):
        return obj.item()
    # Check ndarray before the container branches: arrays are iterable but
    # must be converted wholesale with tolist() to preserve nesting/dtype.
    if isinstance(obj, np.ndarray):
        return obj.tolist()
    if isinstance(obj, dict):
        return {k: convert_to_serializable(v) for k, v in obj.items()}
    # Tuples and sets would either change type silently (tuple) or crash
    # json.dump (set); normalize all three sequence kinds to lists.
    if isinstance(obj, (list, tuple, set)):
        return [convert_to_serializable(item) for item in obj]
    return obj
| |
|
def process_interview(audio_url: str) -> Dict:
    """Process a single audio URL or local path and generate an analysis report.

    Pipeline: (optional) download -> WAV conversion -> transcription ->
    per-utterance prosodic features -> speaker identification -> role
    classification -> interviewee voice analysis -> Gemini narrative report ->
    PDF + JSON artifacts written to OUTPUT_DIR.

    Args:
        audio_url: HTTP(S) URL or local filesystem path to an audio file.

    Returns:
        Dict with 'pdf_path' and 'json_path' on success. On failure
        'pdf_path' is None, 'json_path' points to an error JSON, and an
        'error' key describes the problem. Never raises: all exceptions are
        caught and reported via the returned dict.
    """
    local_audio_path = None
    wav_file = None
    is_downloaded = False
    try:
        if not isinstance(audio_url, str):
            raise ValueError("Input must be a single URL string")
        logger.info(f"Starting processing for {audio_url}")
        if audio_url.startswith(('http://', 'https://')):
            local_audio_path = download_audio_from_url(audio_url)
            is_downloaded = True  # only delete files we created ourselves
        else:
            local_audio_path = audio_url
            if not os.path.exists(local_audio_path):
                raise FileNotFoundError(f"Local audio file not found: {local_audio_path}")
        wav_file = convert_to_wav(local_audio_path)
        transcript = transcribe(wav_file)
        # Annotate each utterance in place with prosodic features for its time span.
        for utterance in transcript['utterances']:
            utterance['prosodic_features'] = extract_prosodic_features(wav_file, utterance['start'], utterance['end'])
        utterances_with_speakers = identify_speakers(transcript, wav_file)
        # Reuse cached classifier artifacts only if ALL THREE are present;
        # a partial cache (e.g. interrupted previous run) must trigger a
        # retrain rather than crashing on a missing vectorizer/scaler file.
        artifact_paths = [
            os.path.join(OUTPUT_DIR, name)
            for name in ('role_classifier.pkl', 'text_vectorizer.pkl', 'feature_scaler.pkl')
        ]
        if all(os.path.exists(p) for p in artifact_paths):
            clf, vectorizer, scaler = (joblib.load(p) for p in artifact_paths)
        else:
            clf, vectorizer, scaler = train_role_classifier(utterances_with_speakers)
        classified_utterances = classify_roles(utterances_with_speakers, clf, vectorizer, scaler)
        voice_analysis = analyze_interviewee_voice(wav_file, classified_utterances)
        analysis_data = {
            'transcript': classified_utterances,
            'speakers': list(set(u['speaker'] for u in classified_utterances)),
            'voice_analysis': voice_analysis,
            'text_analysis': {
                'total_duration': sum(u['prosodic_features']['duration'] for u in classified_utterances),
                'speaker_turns': len(classified_utterances)
            }
        }
        analysis_data['acceptance_probability'] = calculate_acceptance_probability(analysis_data)
        gemini_report_text = generate_report(analysis_data)
        # Random base name keeps concurrent/repeated runs from clobbering output.
        base_name = str(uuid.uuid4())
        pdf_path = os.path.join(OUTPUT_DIR, f"{base_name}_report.pdf")
        json_path = os.path.join(OUTPUT_DIR, f"{base_name}_analysis.json")
        pdf_success = create_pdf_report(analysis_data, pdf_path, gemini_report_text)
        # Write the JSON even if PDF generation failed — it is independently useful.
        with open(json_path, 'w') as f:
            json.dump(convert_to_serializable(analysis_data), f, indent=2)
        if not pdf_success:
            logger.warning(f"PDF report failed to generate for {audio_url}")
            return {
                'pdf_path': None,
                'json_path': json_path,
                'error': 'PDF generation failed'
            }
        logger.info(f"Processing completed for {audio_url}")
        return {'pdf_path': pdf_path, 'json_path': json_path}
    except Exception as e:
        logger.error(f"Processing failed for {audio_url}: {str(e)}", exc_info=True)
        # Persist the error so callers always get a json_path to inspect.
        base_name = str(uuid.uuid4())
        json_path = os.path.join(OUTPUT_DIR, f"{base_name}_analysis.json")
        with open(json_path, 'w') as f:
            json.dump({'error': str(e)}, f, indent=2)
        return {
            'pdf_path': None,
            'json_path': json_path,
            'error': str(e)
        }
    finally:
        # Best-effort cleanup of intermediates; failures are logged, never raised.
        if wav_file and os.path.exists(wav_file):
            try:
                os.remove(wav_file)
            except Exception as e:
                logger.error(f"Failed to clean up wav file {wav_file}: {str(e)}")
        # Remove the source audio only when we downloaded it (never a caller's local file).
        if is_downloaded and local_audio_path and os.path.exists(local_audio_path):
            try:
                os.remove(local_audio_path)
                logger.info(f"Cleaned up temporary audio file: {local_audio_path}")
            except Exception as e:
                logger.error(f"Failed to clean up local audio file {local_audio_path}: {str(e)}")