Audio-EvalBot / process_interview.py
norhan12's picture
Initial project setup with multi-URL API
5327928
raw
history blame
39.4 kB
import os
import torch
import numpy as np
import uuid
import requests
import time
import json
from pydub import AudioSegment
import wave
from nemo.collections.asr.models import EncDecSpeakerLabelModel
from pinecone import Pinecone, ServerlessSpec
import librosa
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.feature_extraction.text import TfidfVectorizer
import re
from typing import Dict, List, Tuple
import logging
# --- Imports for enhanced PDF ---
from reportlab.lib.pagesizes import letter
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Table, TableStyle, PageBreak
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.lib.units import inch
from reportlab.lib import colors
# --- End Imports for enhanced PDF ---
from transformers import AutoTokenizer, AutoModel
import spacy
import google.generativeai as genai
import joblib
from concurrent.futures import ThreadPoolExecutor
# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
logging.getLogger("nemo_logging").setLevel(logging.ERROR)
# Configuration
AUDIO_DIR = "./uploads"
OUTPUT_DIR = "./processed_audio"
os.makedirs(OUTPUT_DIR, exist_ok=True)
# API Keys
PINECONE_KEY = os.getenv("PINECONE_KEY")
ASSEMBLYAI_KEY = os.getenv("ASSEMBLYAI_KEY")
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
# Initialize services
def initialize_services():
try:
pc = Pinecone(api_key=PINECONE_KEY)
index_name = "interview-speaker-embeddings"
if index_name not in pc.list_indexes().names():
pc.create_index(
name=index_name,
dimension=192,
metric="cosine",
spec=ServerlessSpec(cloud="aws", region="us-east-1")
)
index = pc.Index(index_name)
genai.configure(api_key=GEMINI_API_KEY)
gemini_model = genai.GenerativeModel('gemini-1.5-flash')
return index, gemini_model
except Exception as e:
logger.error(f"Error initializing services: {str(e)}")
raise
index, gemini_model = initialize_services()
# Device setup
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
logger.info(f"Using device: {device}")
def load_speaker_model():
try:
import torch
torch.set_num_threads(5)
model = EncDecSpeakerLabelModel.from_pretrained(
"nvidia/speakerverification_en_titanet_large",
map_location=torch.device('cpu')
)
model.eval()
return model
except Exception as e:
logger.error(f"Model loading failed: {str(e)}")
raise RuntimeError("Could not load speaker verification model")
# Load ML models
def load_models():
speaker_model = load_speaker_model()
nlp = spacy.load("en_core_web_sm")
tokenizer = AutoTokenizer.from_pretrained("distilbert-base-uncased")
llm_model = AutoModel.from_pretrained("distilbert-base-uncased").to(device)
llm_model.eval()
return speaker_model, nlp, tokenizer, llm_model
speaker_model, nlp, tokenizer, llm_model = load_models()
# Audio processing functions
def convert_to_wav(audio_path: str, output_dir: str = OUTPUT_DIR) -> str:
try:
audio = AudioSegment.from_file(audio_path)
if audio.channels > 1:
audio = audio.set_channels(1)
audio = audio.set_frame_rate(16000)
wav_file = os.path.join(output_dir, f"{uuid.uuid4()}.wav")
audio.export(wav_file, format="wav")
return wav_file
except Exception as e:
logger.error(f"Audio conversion failed: {str(e)}")
raise
def extract_prosodic_features(audio_path: str, start_ms: int, end_ms: int) -> Dict:
try:
audio = AudioSegment.from_file(audio_path)
segment = audio[start_ms:end_ms]
temp_path = os.path.join(OUTPUT_DIR, f"temp_{uuid.uuid4()}.wav")
segment.export(temp_path, format="wav")
y, sr = librosa.load(temp_path, sr=16000)
pitches = librosa.piptrack(y=y, sr=sr)[0]
pitches = pitches[pitches > 0]
features = {
'duration': (end_ms - start_ms) / 1000,
'mean_pitch': float(np.mean(pitches)) if len(pitches) > 0 else 0.0,
'min_pitch': float(np.min(pitches)) if len(pitches) > 0 else 0.0,
'max_pitch': float(np.max(pitches)) if len(pitches) > 0 else 0.0,
'pitch_sd': float(np.std(pitches)) if len(pitches) > 0 else 0.0,
'intensityMean': float(np.mean(librosa.feature.rms(y=y)[0])),
'intensityMin': float(np.min(librosa.feature.rms(y=y)[0])),
'intensityMax': float(np.max(librosa.feature.rms(y=y)[0])),
'intensitySD': float(np.std(librosa.feature.rms(y=y)[0])),
}
os.remove(temp_path)
return features
except Exception as e:
logger.error(f"Feature extraction failed: {str(e)}")
return {
'duration': (end_ms - start_ms) / 1000,
'mean_pitch': 0.0,
'min_pitch': 0.0,
'max_pitch': 0.0,
'pitch_sd': 0.0,
'intensityMean': 0.0,
'intensityMin': 0.0,
'intensityMax': 0.0,
'intensitySD': 0.0,
}
def transcribe(audio_path: str) -> Dict:
try:
with open(audio_path, 'rb') as f:
upload_response = requests.post(
"https://api.assemblyai.com/v2/upload",
headers={"authorization": ASSEMBLYAI_KEY},
data=f
)
audio_url = upload_response.json()['upload_url']
transcript_response = requests.post(
"https://api.assemblyai.com/v2/transcript",
headers={"authorization": ASSEMBLYAI_KEY},
json={
"audio_url": audio_url,
"speaker_labels": True,
"filter_profanity": True
}
)
transcript_id = transcript_response.json()['id']
while True:
result = requests.get(
f"https://api.assemblyai.com/v2/transcript/{transcript_id}",
headers={"authorization": ASSEMBLYAI_KEY}
).json()
if result['status'] == 'completed':
return result
elif result['status'] == 'error':
raise Exception(result['error'])
time.sleep(5)
except Exception as e:
logger.error(f"Transcription failed: {str(e)}")
raise
def process_utterance(utterance, full_audio, wav_file):
try:
start = utterance['start']
end = utterance['end']
segment = full_audio[start:end]
temp_path = os.path.join(OUTPUT_DIR, f"temp_{uuid.uuid4()}.wav")
segment.export(temp_path, format="wav")
with torch.no_grad():
embedding = speaker_model.get_embedding(temp_path).to(device)
query_result = index.query(
vector=embedding.cpu().numpy().tolist(),
top_k=1,
include_metadata=True
)
if query_result['matches'] and query_result['matches'][0]['score'] > 0.7:
speaker_id = query_result['matches'][0]['id']
speaker_name = query_result['matches'][0]['metadata']['speaker_name']
else:
speaker_id = f"unknown_{uuid.uuid4().hex[:6]}"
speaker_name = f"Speaker_{speaker_id[-4:]}"
index.upsert([(speaker_id, embedding.tolist(), {"speaker_name": speaker_name})])
os.remove(temp_path)
return {
**utterance,
'speaker': speaker_name,
'speaker_id': speaker_id,
'embedding': embedding.cpu().numpy().tolist()
}
except Exception as e:
logger.error(f"Utterance processing failed: {str(e)}")
return {
**utterance,
'speaker': 'Unknown',
'speaker_id': 'unknown',
'embedding': None
}
def identify_speakers(transcript: Dict, wav_file: str) -> List[Dict]:
try:
full_audio = AudioSegment.from_wav(wav_file)
utterances = transcript['utterances']
with ThreadPoolExecutor(max_workers=5) as executor: # Changed to 5 workers
futures = [
executor.submit(process_utterance, utterance, full_audio, wav_file)
for utterance in utterances
]
results = [f.result() for f in futures]
return results
except Exception as e:
logger.error(f"Speaker identification failed: {str(e)}")
raise
def train_role_classifier(utterances: List[Dict]):
try:
texts = [u['text'] for u in utterances]
vectorizer = TfidfVectorizer(max_features=500, ngram_range=(1, 2))
X_text = vectorizer.fit_transform(texts)
features = []
labels = []
for i, utterance in enumerate(utterances):
prosodic = utterance['prosodic_features']
feat = [
prosodic['duration'],
prosodic['mean_pitch'],
prosodic['min_pitch'],
prosodic['max_pitch'],
prosodic['pitch_sd'],
prosodic['intensityMean'],
prosodic['intensityMin'],
prosodic['intensityMax'],
prosodic['intensitySD'],
]
feat.extend(X_text[i].toarray()[0].tolist())
doc = nlp(utterance['text'])
feat.extend([
int(utterance['text'].endswith('?')),
len(re.findall(r'\b(why|how|what|when|where|who|which)\b', utterance['text'].lower())),
len(utterance['text'].split()),
sum(1 for token in doc if token.pos_ == 'VERB'),
sum(1 for token in doc if token.pos_ == 'NOUN')
])
features.append(feat)
labels.append(0 if i % 2 == 0 else 1)
scaler = StandardScaler()
X = scaler.fit_transform(features)
clf = RandomForestClassifier(
n_estimators=150,
max_depth=10,
random_state=42,
class_weight='balanced'
)
clf.fit(X, labels)
joblib.dump(clf, os.path.join(OUTPUT_DIR, 'role_classifier.pkl'))
joblib.dump(vectorizer, os.path.join(OUTPUT_DIR, 'text_vectorizer.pkl'))
joblib.dump(scaler, os.path.join(OUTPUT_DIR, 'feature_scaler.pkl'))
return clf, vectorizer, scaler
except Exception as e:
logger.error(f"Classifier training failed: {str(e)}")
raise
def classify_roles(utterances: List[Dict], clf, vectorizer, scaler):
try:
texts = [u['text'] for u in utterances]
X_text = vectorizer.transform(texts)
results = []
for i, utterance in enumerate(utterances):
prosodic = utterance['prosodic_features']
feat = [
prosodic['duration'],
prosodic['mean_pitch'],
prosodic['min_pitch'],
prosodic['max_pitch'],
prosodic['pitch_sd'],
prosodic['intensityMean'],
prosodic['intensityMin'],
prosodic['intensityMax'],
prosodic['intensitySD'],
]
feat.extend(X_text[i].toarray()[0].tolist())
doc = nlp(utterance['text'])
feat.extend([
int(utterance['text'].endswith('?')),
len(re.findall(r'\b(why|how|what|when|where|who|which)\b', utterance['text'].lower())),
len(utterance['text'].split()),
sum(1 for token in doc if token.pos_ == 'VERB'),
sum(1 for token in doc if token.pos_ == 'NOUN')
])
X = scaler.transform([feat])
role = 'Interviewer' if clf.predict(X)[0] == 0 else 'Interviewee'
results.append({**utterance, 'role': role})
return results
except Exception as e:
logger.error(f"Role classification failed: {str(e)}")
raise
def analyze_interviewee_voice(audio_path: str, utterances: List[Dict]) -> Dict:
try:
y, sr = librosa.load(audio_path, sr=16000)
interviewee_utterances = [u for u in utterances if u['role'] == 'Interviewee']
if not interviewee_utterances:
return {'error': 'No interviewee utterances found'}
segments = []
for u in interviewee_utterances:
start = int(u['start'] * sr / 1000)
end = int(u['end'] * sr / 1000)
segments.append(y[start:end])
combined_audio = np.concatenate(segments)
total_duration = sum(u['prosodic_features']['duration'] for u in interviewee_utterances)
total_words = sum(len(u['text'].split()) for u in interviewee_utterances)
speaking_rate = total_words / total_duration if total_duration > 0 else 0
filler_words = ['um', 'uh', 'like', 'you know', 'so', 'i mean']
filler_count = sum(
sum(u['text'].lower().count(fw) for fw in filler_words)
for u in interviewee_utterances
)
filler_ratio = filler_count / total_words if total_words > 0 else 0
all_words = ' '.join(u['text'].lower() for u in interviewee_utterances).split()
word_counts = {}
for i in range(len(all_words) - 1):
bigram = (all_words[i], all_words[i + 1])
word_counts[bigram] = word_counts.get(bigram, 0) + 1
repetition_score = sum(1 for count in word_counts.values() if count > 1) / len(
word_counts) if word_counts else 0
pitches = []
for segment in segments:
f0, voiced_flag, _ = librosa.pyin(segment, fmin=80, fmax=300, sr=sr)
pitches.extend(f0[voiced_flag])
pitch_mean = np.mean(pitches) if len(pitches) > 0 else 0
pitch_std = np.std(pitches) if len(pitches) > 0 else 0
jitter = np.mean(np.abs(np.diff(pitches))) / pitch_mean if len(pitches) > 1 and pitch_mean > 0 else 0
intensities = []
for segment in segments:
rms = librosa.feature.rms(y=segment)[0]
intensities.extend(rms)
intensity_mean = np.mean(intensities) if intensities else 0
intensity_std = np.std(intensities) if intensities else 0
shimmer = np.mean(np.abs(np.diff(intensities))) / intensity_mean if len(
intensities) > 1 and intensity_mean > 0 else 0
anxiety_score = 0.6 * (pitch_std / pitch_mean) + 0.4 * (jitter + shimmer) if pitch_mean > 0 else 0
confidence_score = 0.7 * (1 / (1 + intensity_std)) + 0.3 * (1 / (1 + filler_ratio))
hesitation_score = filler_ratio + repetition_score
anxiety_level = 'high' if anxiety_score > 0.15 else 'moderate' if anxiety_score > 0.07 else 'low'
confidence_level = 'high' if confidence_score > 0.7 else 'moderate' if confidence_score > 0.5 else 'low'
fluency_level = 'fluent' if (filler_ratio < 0.05 and repetition_score < 0.1) else 'moderate' if (
filler_ratio < 0.1 and repetition_score < 0.2) else 'disfluent'
return {
'speaking_rate': float(round(speaking_rate, 2)),
'filler_ratio': float(round(filler_ratio, 4)),
'repetition_score': float(round(repetition_score, 4)),
'pitch_analysis': {
'mean': float(round(pitch_mean, 2)),
'std_dev': float(round(pitch_std, 2)),
'jitter': float(round(jitter, 4))
},
'intensity_analysis': {
'mean': float(round(intensity_mean, 2)),
'std_dev': float(round(intensity_std, 2)),
'shimmer': float(round(shimmer, 4))
},
'composite_scores': {
'anxiety': float(round(anxiety_score, 4)),
'confidence': float(round(confidence_score, 4)),
'hesitation': float(round(hesitation_score, 4))
},
'interpretation': {
'anxiety_level': anxiety_level,
'confidence_level': confidence_level,
'fluency_level': fluency_level
}
}
except Exception as e:
logger.error(f"Voice analysis failed: {str(e)}")
return {'error': str(e)}
def generate_voice_interpretation(analysis: Dict) -> str:
# This function is used to provide the text interpretation for Gemini's prompt.
if 'error' in analysis:
return "Voice analysis not available."
interpretation_lines = []
interpretation_lines.append("Voice Analysis Summary:")
interpretation_lines.append(f"- Speaking Rate: {analysis['speaking_rate']} words/sec (average)")
interpretation_lines.append(f"- Filler Words: {analysis['filler_ratio'] * 100:.1f}% of words")
interpretation_lines.append(f"- Repetition Score: {analysis['repetition_score']:.3f}")
interpretation_lines.append(
f"- Anxiety Level: {analysis['interpretation']['anxiety_level'].upper()} (score: {analysis['composite_scores']['anxiety']:.3f})")
interpretation_lines.append(
f"- Confidence Level: {analysis['interpretation']['confidence_level'].upper()} (score: {analysis['composite_scores']['confidence']:.3f})")
interpretation_lines.append(f"- Fluency: {analysis['interpretation']['fluency_level'].upper()}")
interpretation_lines.append("")
interpretation_lines.append("Detailed Interpretation:")
interpretation_lines.append(
"1. A higher speaking rate indicates faster speech, which can suggest nervousness or enthusiasm.")
interpretation_lines.append("2. Filler words and repetitions reduce speech clarity and professionalism.")
interpretation_lines.append("3. Anxiety is measured through pitch variability and voice instability.")
interpretation_lines.append("4. Confidence is assessed through voice intensity and stability.")
interpretation_lines.append("5. Fluency combines filler words and repetition metrics.")
return "\n".join(interpretation_lines)
# --- Chart Generation Function ---
# Removed function as charts are no longer included
def generate_anxiety_confidence_chart(composite_scores: Dict, chart_path: str):
try:
labels = ['Anxiety', 'Confidence']
scores = [composite_scores.get('anxiety', 0), composite_scores.get('confidence', 0)]
fig, ax = plt.subplots(figsize=(4, 2.5))
ax.bar(labels, scores, color=['lightcoral', 'lightskyblue'])
ax.set_ylabel('Score')
ax.set_title('Anxiety vs. Confidence Scores')
ax.set_ylim(0, 1.0)
for i, v in enumerate(scores):
ax.text(i, v + 0.05, f"{v:.2f}", color='black', ha='center', fontweight='bold')
plt.tight_layout()
plt.savefig(chart_path)
plt.close(fig)
except Exception as e:
logger.error(f"Error generating chart: {str(e)}")
# --- Acceptance Probability Calculation ---
def calculate_acceptance_probability(analysis_data: Dict) -> float:
"""
Calculates a hypothetical acceptance probability based on voice and content analysis.
This is a simplified, heuristic model and can be refined with more data/ML.
"""
voice = analysis_data.get('voice_analysis', {})
if 'error' in voice:
return 0.0 # Cannot calculate if voice analysis failed
# Weights for different factors (adjust these to fine-tune the model)
w_confidence = 0.4
w_anxiety = -0.3 # Negative weight for anxiety
w_fluency = 0.2
w_speaking_rate = 0.1 # Ideal rate gets higher score
w_filler_repetition = -0.1 # Negative weight for filler/repetition
w_content_strengths = 0.2 # Placeholder, ideally from deeper content analysis
# Normalize/interpret scores
confidence_score = voice.get('composite_scores', {}).get('confidence', 0.0)
anxiety_score = voice.get('composite_scores', {}).get('anxiety', 0.0)
fluency_level = voice.get('interpretation', {}).get('fluency_level', 'disfluent')
speaking_rate = voice.get('speaking_rate', 0.0)
filler_ratio = voice.get('filler_ratio', 0.0)
repetition_score = voice.get('repetition_score', 0.0)
# Fluency mapping (higher score for more fluent)
fluency_map = {'fluent': 1.0, 'moderate': 0.5, 'disfluent': 0.0}
fluency_val = fluency_map.get(fluency_level, 0.0)
# Speaking rate scoring (e.g., ideal is around 2.5 words/sec, gets lower for too fast/slow)
# This is a simple inverse of deviation from ideal
ideal_speaking_rate = 2.5
speaking_rate_deviation = abs(speaking_rate - ideal_speaking_rate)
speaking_rate_score = max(0, 1 - (speaking_rate_deviation / ideal_speaking_rate)) # Max 1.0, min 0.0
# Filler/Repetition score (lower is better, so 1 - score)
filler_repetition_composite = (filler_ratio + repetition_score) / 2 # Average them
filler_repetition_score = max(0, 1 - filler_repetition_composite)
# Simplified content strength score (you might need a more sophisticated NLP method here)
# For now, based on presence of strengths in Gemini's content analysis
content_strength_val = 0.0
# This part would ideally come from a structured output from Gemini's content analysis.
# For now, we'll make a simplified assumption based on the analysis data:
# If content analysis found "strengths" (which is likely if Gemini generates a full report)
# This needs refinement if Gemini output is not structured for this.
if analysis_data.get('text_analysis', {}).get('total_duration', 0) > 0: # Basic check if interview happened
content_strength_val = 0.8 # Assume moderate strength if analysis went through
# You could parse gemini_report_text for specific phrases like "Strengths:" and count items.
# Calculate raw score
raw_score = (
confidence_score * w_confidence +
(1 - anxiety_score) * abs(w_anxiety) + # (1 - anxiety) because lower anxiety is better
fluency_val * w_fluency +
speaking_rate_score * w_speaking_rate +
filler_repetition_score * abs(w_filler_repetition) + # Use abs weight as score is already inverted
content_strength_val * w_content_strengths
)
# Normalize to 0-1 and then to percentage
# These max/min values are rough estimates and should be calibrated with real data
min_possible_score = (0 * w_confidence) + (0 * abs(w_anxiety)) + (0 * w_fluency) + (0 * w_speaking_rate) + (
0 * abs(w_filler_repetition)) + (0 * w_content_strengths)
max_possible_score = (1 * w_confidence) + (1 * abs(w_anxiety)) + (1 * w_fluency) + (1 * w_speaking_rate) + (
1 * abs(w_filler_repetition)) + (1 * w_content_strengths)
# Prevent division by zero if all weights are zero or min/max are same
if max_possible_score == min_possible_score:
normalized_score = 0.5 # Default if no variation
else:
normalized_score = (raw_score - min_possible_score) / (max_possible_score - min_possible_score)
acceptance_probability = max(0.0, min(1.0, normalized_score)) # Clamp between 0 and 1
return float(f"{acceptance_probability * 100:.2f}") # Return as percentage
def generate_report(analysis_data: Dict) -> str:
try:
voice = analysis_data.get('voice_analysis', {})
voice_interpretation = generate_voice_interpretation(voice)
interviewee_responses = [
f"Speaker {u['speaker']} ({u['role']}): {u['text']}"
for u in analysis_data['transcript']
if u['role'] == 'Interviewee'
][:5] # Limit to first 5 for prompt brevity
acceptance_prob = analysis_data.get('acceptance_probability', None)
acceptance_line = ""
if acceptance_prob is not None:
acceptance_line = f"\n**Estimated Acceptance Probability: {acceptance_prob:.2f}%**\n"
if acceptance_prob >= 80:
acceptance_line += "This indicates a very strong candidate. Well done!"
elif acceptance_prob >= 50:
acceptance_line += "This indicates a solid candidate with potential for improvement."
else:
acceptance_line += "This candidate may require significant development or may not be a strong fit."
prompt = f"""
As EvalBot, an AI interview analysis system, generate a highly professional, well-structured, and concise interview analysis report.
The report should be suitable for a professional setting and clearly highlight key findings and actionable recommendations.
Use clear headings and subheadings. For bullet points, use '- '.
{acceptance_line}
**1. Executive Summary**
Provide a brief, high-level overview of the interview.
- Overall interview duration: {analysis_data['text_analysis']['total_duration']:.2f} seconds
- Number of speaker turns: {analysis_data['text_analysis']['speaker_turns']}
- Main participants: {', '.join(analysis_data['speakers'])}
**2. Voice Analysis Insights**
Analyze key voice metrics and provide a detailed interpretation.
{voice_interpretation}
**3. Content Analysis & Strengths/Areas for Development**
Analyze the key themes and identify both strengths and areas for development in the interviewee's responses.
Key responses from interviewee (for context):
{chr(10).join(interviewee_responses)}
**4. Actionable Recommendations**
Offer specific, actionable suggestions for improvement.
Focus on:
- Communication Skills (e.g., pacing, clarity, filler words)
- Content Delivery (e.g., quantifying achievements, structuring answers)
- Professional Presentation (e.g., research, specific examples, mock interviews)
"""
response = gemini_model.generate_content(prompt)
return response.text
except Exception as e:
logger.error(f"Report generation failed: {str(e)}")
return f"Error generating report: {str(e)}"
# --- ENHANCED PDF GENERATION FUNCTION (without logo or charts) ---
def create_pdf_report(analysis_data: Dict, output_path: str, gemini_report_text: str):
try:
doc = SimpleDocTemplate(output_path, pagesize=letter)
styles = getSampleStyleSheet()
# Define custom styles
h1 = ParagraphStyle(name='Heading1', parent=styles['h1'], fontSize=16, spaceAfter=14, alignment=1,
textColor=colors.HexColor('#003366'))
h2 = ParagraphStyle(name='Heading2', parent=styles['h2'], fontSize=12, spaceBefore=10, spaceAfter=8,
textColor=colors.HexColor('#336699'))
h3 = ParagraphStyle(name='Heading3', parent=styles['h3'], fontSize=10, spaceBefore=8, spaceAfter=4,
textColor=colors.HexColor('#0055AA'))
body_text = ParagraphStyle(name='BodyText', parent=styles['Normal'], fontSize=9, leading=12, spaceAfter=4)
bullet_style = ParagraphStyle(name='Bullet', parent=styles['Normal'], fontSize=9, leading=12, leftIndent=18,
bulletIndent=9)
story = []
# Title and Date
story.append(Paragraph(f"<b>EvalBot Interview Analysis Report</b>", h1))
story.append(Spacer(1, 0.2 * inch))
story.append(Paragraph(f"<b>Date:</b> {time.strftime('%Y-%m-%d')}", body_text))
story.append(Spacer(1, 0.3 * inch))
# --- Acceptance Probability (New Section) ---
acceptance_prob = analysis_data.get('acceptance_probability', None)
if acceptance_prob is not None:
story.append(Paragraph("<b>Candidate Evaluation Summary</b>", h2))
story.append(Spacer(1, 0.1 * inch))
prob_color = colors.green if acceptance_prob >= 70 else (
colors.orange if acceptance_prob >= 40 else colors.red)
story.append(Paragraph(
f"<font size='12' color='{prob_color.hexval}'><b>Estimated Acceptance Probability: {acceptance_prob:.2f}%</b></font>",
ParagraphStyle(name='AcceptanceProbability', parent=styles['Normal'], fontSize=12, spaceAfter=10,
alignment=1)
))
if acceptance_prob >= 80:
story.append(
Paragraph("This indicates a very strong candidate with high potential. Well done!", body_text))
elif acceptance_prob >= 50:
story.append(Paragraph(
"This candidate shows solid potential but has areas for improvement to become an even stronger fit.",
body_text))
else:
story.append(Paragraph(
"This candidate may require significant development or may not be the ideal fit at this time.",
body_text))
story.append(Spacer(1, 0.3 * inch))
# --- End Acceptance Probability ---
# Parse Gemini's report into sections for better PDF structuring
sections = {}
current_section = None
# Use regex to robustly identify sections, especially with varied bullet points
section_patterns = {
r'^\s*\*\*\s*1\.\s*Executive Summary\s*\*\*': 'Executive Summary',
r'^\s*\*\*\s*2\.\s*Voice Analysis Insights\s*\*\*': 'Voice Analysis Insights',
r'^\s*\*\*\s*3\.\s*Content Analysis & Strengths/Areas for Development\s*\*\*': 'Content Analysis & Strengths/Areas for Development',
r'^\s*\*\*\s*4\.\s*Actionable Recommendations\s*\*\*': 'Actionable Recommendations'
}
for line in gemini_report_text.split('\n'):
matched_section = False
for pattern, section_name in section_patterns.items():
if re.match(pattern, line):
current_section = section_name
sections[current_section] = []
matched_section = True
break
if not matched_section and current_section:
sections[current_section].append(line)
# 1. Executive Summary
story.append(Paragraph("1. Executive Summary", h2))
story.append(Spacer(1, 0.1 * inch))
if 'Executive Summary' in sections:
for line in sections['Executive Summary']:
if line.strip():
story.append(Paragraph(line.strip(), body_text))
story.append(Spacer(1, 0.2 * inch))
# 2. Voice Analysis (Detailed - using Table for summary)
story.append(Paragraph("2. Voice Analysis", h2))
voice_analysis = analysis_data.get('voice_analysis', {})
if voice_analysis and 'error' not in voice_analysis:
# Voice Analysis Summary Table
table_data = [
['Metric', 'Value', 'Interpretation'],
['Speaking Rate', f"{voice_analysis['speaking_rate']:.2f} words/sec", 'Average rate'],
['Filler Words', f"{voice_analysis['filler_ratio'] * 100:.1f}%", 'Percentage of total words'],
['Repetition Score', f"{voice_analysis['repetition_score']:.3f}", 'Lower is better articulation'],
['Anxiety Level', voice_analysis['interpretation']['anxiety_level'].upper(),
f"Score: {voice_analysis['composite_scores']['anxiety']:.3f}"],
['Confidence Level', voice_analysis['interpretation']['confidence_level'].upper(),
f"Score: {voice_analysis['composite_scores']['confidence']:.3f}"],
['Fluency', voice_analysis['interpretation']['fluency_level'].upper(), 'Overall speech flow']
]
table_style = TableStyle([
('BACKGROUND', (0, 0), (-1, 0), colors.HexColor('#6699CC')),
('TEXTCOLOR', (0, 0), (-1, 0), colors.whitesmoke),
('ALIGN', (0, 0), (-1, -1), 'LEFT'),
('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'),
('BOTTOMPADDING', (0, 0), (-1, 0), 10),
('BACKGROUND', (0, 1), (-1, -1), colors.HexColor('#EFEFEF')),
('GRID', (0, 0), (-1, -1), 0.5, colors.HexColor('#CCCCCC')),
('LEFTPADDING', (0, 0), (-1, -1), 6),
('RIGHTPADDING', (0, 0), (-1, -1), 6),
('TOPPADDING', (0, 0), (-1, -1), 6),
('BOTTOMPADDING', (0, 0), (-1, -1), 6),
])
table = Table(table_data)
table.setStyle(table_style)
story.append(table)
story.append(Spacer(1, 0.2 * inch))
# Detailed Interpretation from Gemini (if present)
if 'Voice Analysis Insights' in sections:
story.append(Paragraph("Detailed Interpretation:", h3))
for line in sections['Voice Analysis Insights']:
if line.strip():
# Handle numbered lists from Gemini
if re.match(r'^\d+\.\s', line.strip()):
story.append(
Paragraph(line.strip(), bullet_style))
else:
story.append(Paragraph(line.strip(), body_text))
story.append(Spacer(1, 0.2 * inch))
else:
story.append(Paragraph("Voice analysis not available or encountered an error.", body_text))
story.append(Spacer(1, 0.3 * inch))
# 3. Content Analysis
story.append(Paragraph("3. Content Analysis", h2))
if 'Content Analysis & Strengths/Areas for Development' in sections:
for line in sections['Content Analysis & Strengths/Areas for Development']:
if line.strip():
if line.strip().startswith('-'):
story.append(Paragraph(line.strip(), bullet_style))
else:
story.append(Paragraph(line.strip(), body_text))
story.append(Spacer(1, 0.2 * inch))
# Add some interviewee responses to the report (can be formatted as a list)
story.append(Paragraph("Key Interviewee Responses (Contextual):", h3))
interviewee_responses = [
f"Speaker {u['speaker']} ({u['role']}): {u['text']}"
for u in analysis_data['transcript']
if u['role'] == 'Interviewee'
][:5]
for res in interviewee_responses:
story.append(Paragraph(res, bullet_style))
story.append(Spacer(1, 0.3 * inch))
# 4. Recommendations
story.append(Paragraph("4. Recommendations", h2))
if 'Actionable Recommendations' in sections:
for line in sections['Actionable Recommendations']:
if line.strip():
if line.strip().startswith('-'):
story.append(Paragraph(line.strip(), bullet_style))
else:
story.append(Paragraph(line.strip(), body_text))
story.append(Spacer(1, 0.2 * inch))
# Footer Text
story.append(Spacer(1, 0.5 * inch))
story.append(Paragraph("--- Analysis by EvalBot ---", ParagraphStyle(
name='FooterText', parent=styles['Normal'], fontSize=8, alignment=1, textColor=colors.HexColor('#666666')
)))
doc.build(story)
return True
except Exception as e:
logger.error(f"PDF creation failed: {str(e)}", exc_info=True)
return False
def convert_to_serializable(obj):
if isinstance(obj, np.generic):
return obj.item()
elif isinstance(obj, dict):
return {key: convert_to_serializable(value) for key, value in obj.items()}
elif isinstance(obj, list):
return [convert_to_serializable(item) for item in obj]
elif isinstance(obj, np.ndarray):
return obj.tolist()
return obj
def process_interview(audio_path: str):
try:
logger.info(f"Starting processing for {audio_path}")
wav_file = convert_to_wav(audio_path)
logger.info("Starting transcription")
transcript = transcribe(wav_file)
logger.info("Extracting prosodic features")
for utterance in transcript['utterances']:
utterance['prosodic_features'] = extract_prosodic_features(
wav_file,
utterance['start'],
utterance['end']
)
logger.info("Identifying speakers")
utterances_with_speakers = identify_speakers(transcript, wav_file)
logger.info("Classifying roles")
# Ensure role classifier models are loaded/trained only once if possible,
# or handled carefully in a multi-threaded context.
# For simplicity, keeping it inside process_interview for now.
if os.path.exists(os.path.join(OUTPUT_DIR, 'role_classifier.pkl')):
clf = joblib.load(os.path.join(OUTPUT_DIR, 'role_classifier.pkl'))
vectorizer = joblib.load(os.path.join(OUTPUT_DIR, 'text_vectorizer.pkl'))
scaler = joblib.load(os.path.join(OUTPUT_DIR, 'feature_scaler.pkl'))
else:
clf, vectorizer, scaler = train_role_classifier(utterances_with_speakers)
classified_utterances = classify_roles(utterances_with_speakers, clf, vectorizer, scaler)
logger.info("Analyzing interviewee voice")
voice_analysis = analyze_interviewee_voice(wav_file, classified_utterances)
analysis_data = {
'transcript': classified_utterances,
'speakers': list(set(u['speaker'] for u in classified_utterances)),
'voice_analysis': voice_analysis,
'text_analysis': {
'total_duration': sum(u['prosodic_features']['duration'] for u in classified_utterances),
'speaker_turns': len(classified_utterances)
}
}
# --- Calculate Acceptance Probability ---
acceptance_probability = calculate_acceptance_probability(analysis_data)
analysis_data['acceptance_probability'] = acceptance_probability
# --- End Acceptance Probability ---
logger.info("Generating report text using Gemini")
gemini_report_text = generate_report(analysis_data)
base_name = os.path.splitext(os.path.basename(audio_path))[0]
pdf_path = os.path.join(OUTPUT_DIR, f"{base_name}_report.pdf")
create_pdf_report(analysis_data, pdf_path, gemini_report_text=gemini_report_text)
json_path = os.path.join(OUTPUT_DIR, f"{base_name}_analysis.json")
with open(json_path, 'w') as f:
serializable_data = convert_to_serializable(analysis_data)
json.dump(serializable_data, f, indent=2)
os.remove(wav_file) # Clean up WAV file after processing
logger.info(f"Processing completed for {audio_path}")
return {
'pdf_path': pdf_path,
'json_path': json_path
}
except Exception as e:
logger.error(f"Processing failed: {str(e)}", exc_info=True)
# Clean up wav_file in case of error
if 'wav_file' in locals() and os.path.exists(wav_file):
os.remove(wav_file)
raise