# Source: nono / process_interview.py (Hugging Face repository file, uploaded by norhan12)
# Last commit: "Update process_interview.py" (2366713, verified)
import os
import torch
import numpy as np
import uuid
import requests
import time
import json
from pydub import AudioSegment
import wave
import pycrfsuite
from pyannote.audio import Pipeline
from nemo.collections.asr.models import EncDecSpeakerLabelModel
from pinecone import Pinecone, ServerlessSpec
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.feature_extraction.text import TfidfVectorizer
import re
from typing import Dict, List, Tuple
import logging
from reportlab.lib.pagesizes import letter
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Table, TableStyle
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.lib.units import inch
from reportlab.lib import colors
import matplotlib.pyplot as plt
import matplotlib
matplotlib.use('Agg')
from reportlab.platypus import Image
import io
from transformers import AutoTokenizer, AutoModel
import spacy
import google.generativeai as genai
import joblib
from concurrent.futures import ThreadPoolExecutor
import librosa # Needed for voice analysis and prosodic features
# Try importing TextBlob, with fallback
try:
from textblob import TextBlob
except ImportError:
TextBlob = None
# Setup logging
# The root logger is configured at INFO; the two NeMo logger names are raised to ERROR.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
logging.getLogger("nemo_logging").setLevel(logging.ERROR)
logging.getLogger("nemo").setLevel(logging.ERROR)
# Configuration
# NOTE(review): AUDIO_DIR is not referenced in the visible part of this file;
# presumably it is the upload folder used by a caller -- confirm.
AUDIO_DIR = "./Uploads"
# OUTPUT_DIR receives converted WAV files, temporary segment files and saved role models.
OUTPUT_DIR = "./processed_audio"
os.makedirs(OUTPUT_DIR, exist_ok=True)
# API Keys (read from the environment; os.getenv returns None when a variable is unset)
PINECONE_KEY = os.getenv("PINECONE_KEY")
ASSEMBLYAI_KEY = os.getenv("ASSEMBLYAI_KEY")
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
HF_TOKEN = os.getenv("HF_TOKEN") # Hugging Face token for pyannote
# Global model cache (filled lazily by load_models so each model is loaded once per process)
_speaker_model = None
_nlp = None
_tokenizer = None
_llm_model = None
# Initialize services
def initialize_services():
    """Connect to Pinecone and Gemini and return the handles used by this module.

    The Pinecone index "interview-speaker-embeddings" is created (192
    dimensions, cosine metric, AWS us-east-1 serverless spec) when it is not
    already listed; the Gemini client is configured with GEMINI_API_KEY.

    Returns:
        tuple: (Pinecone index handle, Gemini GenerativeModel instance).

    Raises:
        Exception: Any failure is logged and re-raised unchanged.
    """
    speaker_index_name = "interview-speaker-embeddings"
    try:
        client = Pinecone(api_key=PINECONE_KEY)
        already_present = speaker_index_name in client.list_indexes().names()
        if not already_present:
            serverless = ServerlessSpec(cloud="aws", region="us-east-1")
            client.create_index(
                name=speaker_index_name,
                dimension=192,
                metric="cosine",
                spec=serverless,
            )
        speaker_index = client.Index(speaker_index_name)

        genai.configure(api_key=GEMINI_API_KEY)
        report_model = genai.GenerativeModel('gemini-1.5-flash')
    except Exception as e:
        logger.error(f"Error initializing services: {str(e)}")
        raise
    return speaker_index, report_model
# Runs at import time: importing this module contacts Pinecone and configures Gemini.
index, gemini_model = initialize_services()
# Device setup
# Prefer CUDA when torch reports it as available, otherwise fall back to CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
logger.info(f"Using device: {device}")
def load_speaker_model():
    """Load the pretrained NeMo speaker-verification model and put it in eval mode.

    The model "nvidia/speakerverification_en_titanet_large" is loaded onto the
    module-level ``device``. torch is limited to 5 intra-op threads first.

    Returns:
        EncDecSpeakerLabelModel: The loaded model, in eval mode.

    Raises:
        RuntimeError: If loading fails; the original exception is attached as
            the explicit cause so the root error stays visible in tracebacks.
    """
    try:
        torch.set_num_threads(5)
        model = EncDecSpeakerLabelModel.from_pretrained(
            "nvidia/speakerverification_en_titanet_large",
            map_location=device,
        )
        model.eval()
        return model
    except Exception as e:
        logger.error(f"Model loading failed: {str(e)}")
        raise RuntimeError("Could not load speaker verification model") from e
# Load ML models with caching
def load_models():
    """Return the shared ML models, loading each one only on first use.

    Models are cached in the module-level ``_speaker_model``, ``_nlp``,
    ``_tokenizer`` and ``_llm_model`` globals; a model that is already cached
    is returned as-is.

    Returns:
        tuple: (speaker model, spaCy pipeline, DistilBERT tokenizer,
        DistilBERT model in eval mode on ``device``).

    Raises:
        Exception: Any loading failure is logged and re-raised unchanged.
    """
    global _speaker_model, _nlp, _tokenizer, _llm_model
    transformer_name = "distilbert-base-uncased"
    try:
        _speaker_model = load_speaker_model() if _speaker_model is None else _speaker_model
        _nlp = spacy.load("en_core_web_sm") if _nlp is None else _nlp
        _tokenizer = AutoTokenizer.from_pretrained(transformer_name) if _tokenizer is None else _tokenizer
        if _llm_model is None:
            _llm_model = AutoModel.from_pretrained(transformer_name).to(device)
            _llm_model.eval()
    except Exception as e:
        logger.error(f"Model loading failed: {str(e)}")
        raise
    return _speaker_model, _nlp, _tokenizer, _llm_model
# Runs at import time: all four models are loaded (or fetched from the cache) when this module is imported.
speaker_model, nlp, tokenizer, llm_model = load_models()
# Audio processing functions
def convert_to_wav(audio_path: str, output_dir: str = OUTPUT_DIR) -> str:
    """Convert an audio file to a normalized 16 kHz mono WAV using pydub.

    Args:
        audio_path (str): Path to the input audio file.
        output_dir (str): Directory to store the output WAV file. It is
            created when it does not exist yet.

    Returns:
        str: Path to the converted WAV file (a fresh UUID-based file name).

    Raises:
        ValueError: If the input audio file does not exist or is empty.
        Exception: For other conversion errors (logged, then re-raised).
    """
    # Validate before entering the try block so each problem is logged once;
    # inside it, the generic handler would log the same failure a second time.
    if not os.path.exists(audio_path):
        logger.error(f"Input audio file {audio_path} does not exist")
        raise ValueError(f"Audio file {audio_path} does not exist")
    if os.path.getsize(audio_path) == 0:
        logger.error(f"Input audio file {audio_path} is empty")
        raise ValueError(f"Audio file {audio_path} is empty")

    try:
        # Only the default OUTPUT_DIR is created at import time; a caller-supplied
        # directory may not exist yet.
        os.makedirs(output_dir, exist_ok=True)

        audio = AudioSegment.from_file(audio_path)
        logger.info(f"Input audio: {audio_path}, duration: {len(audio)/1000:.2f}s, channels: {audio.channels}")
        audio = audio.set_channels(1).set_frame_rate(16000)
        audio = audio.normalize()

        wav_file = os.path.join(output_dir, f"{uuid.uuid4()}.wav")
        audio.export(wav_file, format="wav")
        logger.info(f"Successfully converted {audio_path} to {wav_file}")
        return wav_file
    except Exception as e:
        logger.error(f"Audio conversion failed for {audio_path}: {str(e)}")
        raise
def _zero_prosodic_features() -> Dict:
    """Return the all-zero feature dict used for invalid or failed segments."""
    return {
        'duration': 0.0, 'mean_pitch': 0.0, 'min_pitch': 0.0, 'max_pitch': 0.0, 'pitch_sd': 0.0,
        'intensityMean': 0.0, 'intensityMin': 0.0, 'intensityMax': 0.0, 'intensitySD': 0.0,
        'silence_ratio': 0.0
    }


def extract_prosodic_features(audio_path: str, start_ms: int, end_ms: int) -> Dict:
    """Extract prosodic features from an audio segment.

    The segment is cut with pydub, written to a temporary WAV in OUTPUT_DIR,
    reloaded with librosa at 16 kHz and summarized as pitch, RMS-intensity
    and silence statistics.

    Args:
        audio_path (str): Path to the audio file.
        start_ms (int): Start time in milliseconds.
        end_ms (int): End time in milliseconds.

    Returns:
        Dict: Dictionary of prosodic features. All values are 0.0 when the
        time range is invalid, the segment is empty or extraction fails
        (failures are logged, never raised).
    """
    if start_ms >= end_ms or end_ms <= 0:
        logger.warning("Invalid audio segment times, returning default features")
        return _zero_prosodic_features()

    temp_path = None
    try:
        audio = AudioSegment.from_file(audio_path)
        segment = audio[start_ms:end_ms]
        temp_path = os.path.join(OUTPUT_DIR, f"temp_{uuid.uuid4()}.wav")
        segment.export(temp_path, format="wav")
        y, sr = librosa.load(temp_path, sr=16000)

        # piptrack returns (pitches, magnitudes). The previous code took index
        # [1], i.e. the magnitudes, and reported them as pitch values.
        pitch_bins, magnitudes = librosa.piptrack(y=y, sr=sr)
        n_frames = pitch_bins.shape[1]
        if n_frames > 0:
            # One pitch per frame: the frequency bin with the strongest magnitude.
            strongest_bin = magnitudes.argmax(axis=0)
            frame_pitch = pitch_bins[strongest_bin, np.arange(n_frames)]
            pitches = frame_pitch[frame_pitch > 0]
        else:
            pitches = np.array([])

        rms = librosa.feature.rms(y=y)[0]
        if len(rms) == 0:
            # No frames at all: the statistics below would be NaN.
            return _zero_prosodic_features()
        silence_threshold = np.mean(rms) * 0.1
        silence_ratio = np.sum(rms < silence_threshold) / len(rms)

        has_pitch = len(pitches) > 0
        return {
            # Duration is the requested span, not the length actually decoded.
            'duration': (end_ms - start_ms) / 1000,
            'mean_pitch': float(np.mean(pitches)) if has_pitch else 0.0,
            'min_pitch': float(np.min(pitches)) if has_pitch else 0.0,
            'max_pitch': float(np.max(pitches)) if has_pitch else 0.0,
            'pitch_sd': float(np.std(pitches)) if has_pitch else 0.0,
            'intensityMean': float(np.mean(rms)),
            'intensityMin': float(np.min(rms)),
            'intensityMax': float(np.max(rms)),
            'intensitySD': float(np.std(rms)),
            'silence_ratio': float(silence_ratio)
        }
    except Exception as e:
        logger.error(f"Feature extraction failed: {str(e)}")
        return _zero_prosodic_features()
    finally:
        # Single cleanup point for success and failure; a cleanup error must
        # not replace the features that were already computed.
        if temp_path and os.path.exists(temp_path):
            try:
                os.remove(temp_path)
            except OSError as cleanup_error:
                logger.warning(f"Could not remove temporary file {temp_path}: {str(cleanup_error)}")
def transcribe(audio_path: str) -> Dict:
    """Transcribe audio using AssemblyAI API.

    Uploads the file, requests a transcript with speaker labels and profanity
    filtering, then polls every 5 seconds until the job finishes.

    Args:
        audio_path (str): Path to the audio file.

    Returns:
        Dict: Transcription result (the JSON body of the completed transcript).

    Raises:
        requests.HTTPError: If any API call returns an error status.
        RuntimeError: If AssemblyAI reports the transcript as failed.
        TimeoutError: If the transcript is not ready within the maximum wait.
        Exception: Any other failure; everything is logged and re-raised.
    """
    api_base = "https://api.assemblyai.com/v2"
    headers = {"authorization": ASSEMBLYAI_KEY}
    request_timeout = (10, 300)  # (connect, read) seconds per HTTP call
    poll_interval_s = 5
    max_wait_s = 2 * 60 * 60  # upper bound so a stuck job cannot block forever

    try:
        with open(audio_path, 'rb') as f:
            upload_response = requests.post(
                f"{api_base}/upload",
                headers=headers,
                data=f,
                timeout=request_timeout
            )
        # Surface auth/quota problems as HTTP errors instead of a KeyError on
        # the missing 'upload_url' field.
        upload_response.raise_for_status()
        audio_url = upload_response.json()['upload_url']

        transcript_response = requests.post(
            f"{api_base}/transcript",
            headers=headers,
            json={
                "audio_url": audio_url,
                "speaker_labels": True,
                "filter_profanity": True
            },
            timeout=request_timeout
        )
        transcript_response.raise_for_status()
        transcript_id = transcript_response.json()['id']

        deadline = time.monotonic() + max_wait_s
        while True:
            poll_response = requests.get(
                f"{api_base}/transcript/{transcript_id}",
                headers=headers,
                timeout=request_timeout
            )
            poll_response.raise_for_status()
            result = poll_response.json()
            status = result.get('status')
            if status == 'completed':
                return result
            if status == 'error':
                raise RuntimeError(result.get('error', 'Transcription failed without an error message'))
            if time.monotonic() >= deadline:
                raise TimeoutError(
                    f"Transcript {transcript_id} not completed after {max_wait_s} seconds"
                )
            time.sleep(poll_interval_s)
    except Exception as e:
        logger.error(f"Transcription failed: {str(e)}")
        raise
def enhance_diarization(wav_file: str, transcript: Dict) -> List[Dict]:
    """Enhance speaker diarization using pyannote.

    Each utterance gets a 'speaker_id': the pyannote speaker whose turn
    overlaps the utterance the most. When pyannote fails or no turn overlaps,
    the utterance keeps its existing 'speaker_id', else the transcript's own
    'speaker' label, else 'unknown' -- so every returned utterance carries a
    'speaker_id' on both the success and the failure path.

    Args:
        wav_file (str): Path to the WAV file.
        transcript (Dict): Transcription result; its 'utterances' list is
            updated in place ('start'/'end' are read as milliseconds).

    Returns:
        List[Dict]: Enhanced utterances with speaker IDs.
    """
    utterances = transcript.get('utterances') or []

    # Materialize the diarization turns once instead of re-iterating the
    # annotation for every utterance.
    turns = []
    try:
        pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization", use_auth_token=HF_TOKEN)
        diarization = pipeline(wav_file)
        turns = [
            (turn.start, turn.end, spk)
            for turn, _, spk in diarization.itertracks(yield_label=True)
        ]
    except Exception as e:
        logger.warning(f"Pyannote diarization failed: {str(e)}")

    for u in utterances:
        best_speaker = None
        start_ms = u.get('start')
        end_ms = u.get('end')
        if start_ms is not None and end_ms is not None:
            start = start_ms / 1000
            end = end_ms / 1000
            max_overlap = 0
            for turn_start, turn_end, spk in turns:
                overlap = min(end, turn_end) - max(start, turn_start)
                if overlap > max_overlap:
                    max_overlap = overlap
                    best_speaker = spk
        # Fall back to labels already present rather than discarding them.
        u['speaker_id'] = best_speaker or u.get('speaker_id') or u.get('speaker') or 'unknown'
    return utterances
def process_utterance(utterance, full_audio, wav_file):
    """Process a single utterance to extract speaker embedding and ID.

    The utterance's audio is exported to a temporary WAV, embedded with the
    speaker model and looked up in the Pinecone index. A top match scoring
    above 0.7 reuses that speaker; otherwise a new "unknown_<hex>" speaker is
    created and upserted.

    Args:
        utterance (Dict): Utterance data; 'start' and 'end' are used as
            slice bounds on ``full_audio``.
        full_audio (AudioSegment): Full audio segment.
        wav_file (str): Path to the WAV file (not used by this function; kept
            for interface compatibility).

    Returns:
        Dict: The utterance plus 'speaker', 'speaker_id' and 'embedding'.
        On invalid times or any failure these are 'Unknown', 'unknown', None.
    """
    def unknown_speaker():
        return {
            **utterance,
            'speaker': 'Unknown',
            'speaker_id': 'unknown',
            'embedding': None
        }

    temp_path = None
    try:
        start = utterance['start']
        end = utterance['end']
        if start >= end or end <= 0:
            logger.warning(f"Invalid utterance times: start={start}, end={end}")
            return unknown_speaker()

        segment = full_audio[start:end]
        temp_path = os.path.join(OUTPUT_DIR, f"temp_{uuid.uuid4()}.wav")
        segment.export(temp_path, format="wav")

        with torch.no_grad():
            embedding = speaker_model.get_embedding(temp_path).cpu().numpy()
        embedding_list = embedding.flatten().tolist()

        query_result = index.query(
            vector=embedding_list,
            top_k=1,
            include_metadata=True
        )
        matches = query_result['matches']
        if matches and matches[0]['score'] > 0.7:
            speaker_id = matches[0]['id']
            speaker_name = matches[0]['metadata']['speaker_name']
        else:
            speaker_id = f"unknown_{uuid.uuid4().hex[:6]}"
            speaker_name = f"Speaker_{speaker_id[-4:]}"
            index.upsert([(speaker_id, embedding_list, {"speaker_name": speaker_name})])

        return {
            **utterance,
            'speaker': speaker_name,
            'speaker_id': speaker_id,
            'embedding': embedding_list
        }
    except Exception as e:
        logger.error(f"Utterance processing failed: {str(e)}")
        return unknown_speaker()
    finally:
        # Single cleanup point. Previously a failing os.remove on the success
        # path threw away the finished result and could escape the function,
        # aborting the whole batch in the caller.
        if temp_path and os.path.exists(temp_path):
            try:
                os.remove(temp_path)
            except OSError as cleanup_error:
                logger.warning(f"Could not remove temporary file {temp_path}: {str(cleanup_error)}")
def identify_speakers(transcript: Dict, wav_file: str) -> List[Dict]:
    """Attach speaker information to every utterance of a transcript.

    The utterances are first passed through enhance_diarization and then
    handed to process_utterance on a pool of 5 worker threads; results keep
    the order of the utterances.

    Args:
        transcript (Dict): Transcription result.
        wav_file (str): Path to the WAV file.

    Returns:
        List[Dict]: Utterances with speaker information.

    Raises:
        ValueError: If the transcript contains no utterances.
        Exception: Any other failure, logged and re-raised.
    """
    try:
        full_audio = AudioSegment.from_wav(wav_file)
        if not transcript.get('utterances', []):
            logger.error("No utterances found in transcript")
            raise ValueError("Empty transcript")

        diarized = enhance_diarization(wav_file, transcript)

        def run_one(single_utterance):
            return process_utterance(single_utterance, full_audio, wav_file)

        with ThreadPoolExecutor(max_workers=5) as pool:
            # map() submits every utterance up front and yields results in input order
            return list(pool.map(run_one, diarized))
    except Exception as e:
        logger.error(f"Speaker identification failed: {str(e)}")
        raise
def get_sentiment_score(text: str) -> float:
    """Return the TextBlob sentiment polarity of ``text``.

    Args:
        text (str): Text to analyze.

    Returns:
        float: Sentiment polarity, or 0.0 (neutral) when TextBlob is not
        installed or the analysis raises.
    """
    if TextBlob is not None:
        try:
            polarity = TextBlob(text).sentiment.polarity
        except Exception as e:
            logger.warning(f"Sentiment analysis failed for text '{text}': {str(e)}")
            polarity = 0.0
        return polarity

    logger.warning("TextBlob not installed. Returning neutral sentiment score (0.0).")
    return 0.0
def get_text_embedding(text: str) -> List[float]:
    """Embed ``text`` with DistilBERT by mean-pooling the last hidden state.

    Args:
        text (str): Text to embed (truncated to 128 tokens).

    Returns:
        List[float]: The pooled embedding, or 768 zeros when embedding fails.
    """
    try:
        encoded = tokenizer(
            text,
            return_tensors="pt",
            padding=True,
            truncation=True,
            max_length=128,
        )
        encoded = encoded.to(device)
        with torch.no_grad():
            model_output = llm_model(**encoded)
        pooled = model_output.last_hidden_state.mean(dim=1)
        flat = pooled.cpu().numpy().flatten()
        return flat.tolist()
    except Exception as e:
        logger.warning(f"Text embedding failed for '{text}': {str(e)}")
        return [0.0] * 768 # DistilBERT dimension
def group_speakers_by_role(utterances: List[Dict]) -> Dict[str, str]:
    """Assign each speaker the role suggested by most of their utterances.

    An utterance counts as interviewer-like when it ends with '?' without
    opening on an exploratory phrase, or when it opens on an open-question
    starter. A speaker is an 'Interviewer' when more than half of their
    utterances are interviewer-like, otherwise an 'Interviewee'.

    Args:
        utterances (List[Dict]): Utterances with 'speaker_id' and 'text'.

    Returns:
        Dict[str, str]: Mapping of speaker IDs to roles; empty on any error.
    """
    try:
        # str.startswith accepts a tuple, so the phrase lists are kept as tuples
        open_starters = (
            'tell me', 'describe', 'walk me through', 'explain', 'share',
            'give me', 'talk about', 'discuss'
        )
        exploratory_openers = (
            'can i ask', 'what about', 'could you tell', 'is it', 'are there',
            'what’s the', 'how does', 'may i'
        )

        # speaker_id -> [interviewer-like utterances, total utterances]
        tallies = {}
        for entry in utterances:
            tally = tallies.setdefault(entry['speaker_id'], [0, 0])
            raw_text = entry['text']
            lowered = raw_text.lower()
            plain_question = raw_text.endswith('?') and not lowered.startswith(exploratory_openers)
            if plain_question or lowered.startswith(open_starters):
                tally[0] += 1
            tally[1] += 1

        roles = {}
        for speaker, (interviewer_like, total) in tallies.items():
            if interviewer_like / total > 0.5:
                roles[speaker] = 'Interviewer'
            else:
                roles[speaker] = 'Interviewee'
        return roles
    except Exception as e:
        logger.error(f"Speaker grouping failed: {str(e)}")
        return {}
def train_role_classifier(utterances: List[Dict], use_crf: bool = True) -> Tuple:
    """Train a role classifier, using CRF if specified, else RandomForest.

    No labels are supplied by the caller: every utterance with non-empty text
    is labelled 'Interviewer' or 'Interviewee' by the question/keyword
    heuristics below and the model is fitted to those labels.

    Args:
        utterances (List[Dict]): List of utterances. 'text', 'speaker_id',
            'prosodic_features' and 'initial_role' are read when present.
        use_crf (bool): Whether to use CRF or RandomForest. The CRF is saved
            to OUTPUT_DIR/role_crf.model; the RandomForest, its TF-IDF
            vectorizer and its scaler are saved as .pkl files in OUTPUT_DIR.

    Returns:
        Tuple: Classifier, vectorizer, scaler (all None for CRF).

    Raises:
        ValueError: If no utterance has usable text.
        Exception: Any other training failure (logged, then re-raised).
    """
    open_question_starters = (
        'tell me', 'describe', 'walk me through', 'explain', 'share',
        'give me', 'talk about', 'discuss'
    )
    exploratory_question_indicators = (
        'can i ask', 'what about', 'could you tell', 'is it', 'are there',
        'what’s the', 'how does', 'may i'
    )

    def clean_text(u):
        return (u.get('text') or '').lower().strip()

    def prev_is_question(i):
        # Looks at the raw previous text, as the feature always has.
        return i > 0 and (utterances[i - 1].get('text') or '').endswith('?')

    def looks_like_interviewer(i, text):
        word_count = len(text.split())
        is_question = text.endswith('?')
        return (
            (is_question and not text.startswith(exploratory_question_indicators))
            or text.startswith(open_question_starters)
            or (i < 2 and word_count < 10)
            or (prev_is_question(i) and word_count < 5 and is_question)
        )

    try:
        if use_crf:
            trainer = pycrfsuite.Trainer(verbose=False)
            sequence = []
            labels = []
            appended_any = False
            last_index = len(utterances) - 1
            for i, u in enumerate(utterances):
                text = clean_text(u)
                if text:
                    prosodic = u.get('prosodic_features') or {}
                    sequence.append({
                        'text': text,
                        'is_question': str(text.endswith('?')),
                        'word_count': str(len(text.split())),
                        'starts_with_tell': str(text.startswith(open_question_starters)),
                        'prev_is_question': str(prev_is_question(i)),
                        'sentiment': str(get_sentiment_score(text)),
                        'silence_ratio': str(prosodic.get('silence_ratio', 0.0))
                    })
                    labels.append('Interviewer' if looks_like_interviewer(i, text) else 'Interviewee')
                # The end-of-turn check must also run for utterances skipped
                # above; otherwise a trailing or turn-final empty utterance
                # leaves the pending sequence unflushed (lost or merged into
                # the next speaker's turn).
                turn_ends = i == last_index or u.get('speaker_id') != utterances[i + 1].get('speaker_id')
                if turn_ends and sequence:
                    trainer.append(sequence, labels)
                    appended_any = True
                    sequence = []
                    labels = []
            if not appended_any:
                logger.error("No features or labels generated for training")
                raise ValueError("No valid training data")
            trainer.train(os.path.join(OUTPUT_DIR, 'role_crf.model'))
            return None, None, None # CRF doesn't use clf, vectorizer, scaler
        else:
            # Keep each utterance's original position next to it: TF-IDF rows
            # exist only for non-empty texts, so the row number and the
            # utterance index diverge as soon as one text is empty.
            indexed = [(i, u) for i, u in enumerate(utterances) if clean_text(u)]
            if not indexed:
                logger.error("No valid texts found for role classifier training")
                raise ValueError("Empty text data for training")
            texts = [u['text'] for _, u in indexed]
            vectorizer = TfidfVectorizer(max_features=500, ngram_range=(1, 2))
            X_text = vectorizer.fit_transform(texts)
            features = []
            labels = []
            for row, (i, utterance) in enumerate(indexed):
                text = clean_text(utterance)
                prosodic = utterance.get('prosodic_features') or {}
                word_count = len(text.split())
                is_question = text.endswith('?')
                speaker_id = utterance.get('speaker_id')
                # Share of the utterances so far that belong to this speaker
                speaker_frequency = sum(
                    1 for u in utterances[:i + 1] if u.get('speaker_id') == speaker_id
                ) / (i + 1)
                feat = [
                    prosodic.get('duration', 0.0),
                    prosodic.get('mean_pitch', 0.0),
                    prosodic.get('min_pitch', 0.0),
                    prosodic.get('max_pitch', 0.0),
                    prosodic.get('pitch_sd', 0.0),
                    prosodic.get('intensityMean', 0.0),
                    prosodic.get('intensityMin', 0.0),
                    prosodic.get('intensityMax', 0.0),
                    prosodic.get('intensitySD', 0.0),
                    prosodic.get('silence_ratio', 0.0),
                    word_count,
                    speaker_frequency,
                    int(prev_is_question(i)),
                    get_sentiment_score(text)
                ]
                feat.extend(X_text[row].toarray()[0].tolist())
                feat.extend(get_text_embedding(text))
                doc = nlp(text)
                feat.extend([
                    int(is_question),
                    len(re.findall(r'\b(why|how|what|when|where|who|which)\b', text)),
                    sum(1 for token in doc if token.pos_ == 'VERB'),
                    sum(1 for token in doc if token.pos_ == 'NOUN'),
                    int(i < 2),
                    int(text.startswith(open_question_starters)),
                    int(text.startswith(exploratory_question_indicators)),
                    prosodic.get('duration', 0.0) / word_count if word_count > 0 else 0.0,
                    1 if utterance.get('initial_role', 'Interviewee') == 'Interviewer' else 0
                ])
                labels.append(0 if looks_like_interviewer(i, text) else 1) # 0: Interviewer, 1: Interviewee
                features.append(feat)
            if not features or not labels:
                logger.error("No features or labels generated for training")
                raise ValueError("No valid training data")
            scaler = StandardScaler()
            X = scaler.fit_transform(features)
            clf = RandomForestClassifier(
                n_estimators=150,
                max_depth=10,
                random_state=42,
                class_weight='balanced'
            )
            clf.fit(X, labels)
            joblib.dump(clf, os.path.join(OUTPUT_DIR, 'role_classifier.pkl'))
            joblib.dump(vectorizer, os.path.join(OUTPUT_DIR, 'text_vectorizer.pkl'))
            joblib.dump(scaler, os.path.join(OUTPUT_DIR, 'feature_scaler.pkl'))
            return clf, vectorizer, scaler
    except Exception as e:
        logger.error(f"Classifier training failed: {str(e)}")
        raise
def enforce_conversation_flow(results: List[Dict]) -> List[Dict]:
    """Re-assign roles so that each utterance following an interviewer fits the turn.

    Walking the list in order: when the previous utterance is an
    'Interviewer', the current one becomes 'Interviewer' if its text ends
    with '?', otherwise 'Interviewee'. Utterances following any other role
    are left alone. The dicts are modified in place.

    Args:
        results (List[Dict]): Utterances with 'role' and 'text'.

    Returns:
        List[Dict]: The same list; on error it is returned as modified so far.
    """
    try:
        # Pairs share the underlying dicts, so a role changed here is what
        # the next pair sees as its "previous" role.
        for before, current in zip(results, results[1:]):
            if before['role'] != 'Interviewer':
                continue
            asks_question = current['text'].endswith('?')
            current['role'] = 'Interviewer' if asks_question else 'Interviewee'
    except Exception as e:
        logger.error(f"Conversation flow enforcement failed: {str(e)}")
    return results
def classify_roles(utterances: List[Dict], clf=None, vectorizer=None, scaler=None) -> List[Dict]:
    """Classify roles using CRF if available, else RandomForest.

    The CRF model is used whenever OUTPUT_DIR/role_crf.model exists;
    otherwise ``clf``, ``vectorizer`` and ``scaler`` are required. Afterwards
    enforce_conversation_flow is applied and, if every utterance ended up as
    'Interviewer', one utterance is re-labelled 'Interviewee' (the longest
    statement-like first-person candidate, else the longest utterance).

    Args:
        utterances (List[Dict]): List of utterances.
        clf: Classifier model (RandomForest path only).
        vectorizer: Text vectorizer (RandomForest path only).
        scaler: Feature scaler (RandomForest path only).

    Returns:
        List[Dict]: Copies of the utterances with a 'role' key added.

    Raises:
        ValueError: If the RandomForest path is taken without its three
            artifacts, or no utterance has usable text on that path.
        Exception: Any other failure (logged, then re-raised).
    """
    open_question_starters = (
        'tell me', 'describe', 'walk me through', 'explain', 'share',
        'give me', 'talk about', 'discuss'
    )
    exploratory_question_indicators = (
        'can i ask', 'what about', 'could you tell', 'is it', 'are there',
        'what’s the', 'how does', 'may i'
    )

    def clean_text(u):
        return (u.get('text') or '').lower().strip()

    def prev_is_question(i):
        return i > 0 and (utterances[i - 1].get('text') or '').endswith('?')

    def duration_of(entry):
        return (entry.get('prosodic_features') or {}).get('duration', 0.0)

    try:
        crf_path = os.path.join(OUTPUT_DIR, 'role_crf.model')
        if os.path.exists(crf_path):
            tagger = pycrfsuite.Tagger()
            tagger.open(crf_path)
            sequence = []
            for i, u in enumerate(utterances):
                text = clean_text(u)
                if not text:
                    # Placeholder item keeps positions aligned with utterances
                    sequence.append({'text': '', 'role': 'Unknown'})
                    continue
                prosodic = u.get('prosodic_features') or {}
                sequence.append({
                    'text': text,
                    'is_question': str(text.endswith('?')),
                    'word_count': str(len(text.split())),
                    'starts_with_tell': str(text.startswith(open_question_starters)),
                    'prev_is_question': str(prev_is_question(i)),
                    'sentiment': str(get_sentiment_score(text)),
                    'silence_ratio': str(prosodic.get('silence_ratio', 0.0))
                })
            roles = tagger.tag(sequence) if sequence else []
            results = [{**u, 'role': role} for u, role in zip(utterances, roles)]
        else:
            if clf is None or vectorizer is None or scaler is None:
                raise ValueError(
                    "clf, vectorizer and scaler are required when no CRF model is available"
                )
            texts = [u['text'] for u in utterances if clean_text(u)]
            if not texts:
                logger.error("No valid texts found for role classification")
                raise ValueError("Empty text data for classification")
            X_text = vectorizer.transform(texts)
            results = []
            # TF-IDF rows exist only for non-empty texts, so they are tracked
            # with their own counter instead of the utterance index.
            text_row = 0
            for i, utterance in enumerate(utterances):
                text = clean_text(utterance)
                if not text:
                    results.append({**utterance, 'role': 'Unknown'})
                    continue
                tfidf_features = X_text[text_row].toarray()[0].tolist()
                text_row += 1
                prosodic = utterance.get('prosodic_features') or {}
                word_count = len(text.split())
                is_question = text.endswith('?')
                speaker_id = utterance.get('speaker_id')
                speaker_frequency = sum(
                    1 for u in utterances[:i + 1] if u.get('speaker_id') == speaker_id
                ) / (i + 1)
                feat = [
                    prosodic.get('duration', 0.0),
                    prosodic.get('mean_pitch', 0.0),
                    prosodic.get('min_pitch', 0.0),
                    prosodic.get('max_pitch', 0.0),
                    prosodic.get('pitch_sd', 0.0),
                    prosodic.get('intensityMean', 0.0),
                    prosodic.get('intensityMin', 0.0),
                    prosodic.get('intensityMax', 0.0),
                    prosodic.get('intensitySD', 0.0),
                    prosodic.get('silence_ratio', 0.0),
                    word_count,
                    speaker_frequency,
                    int(prev_is_question(i)),
                    get_sentiment_score(text)
                ]
                feat.extend(tfidf_features)
                feat.extend(get_text_embedding(text))
                doc = nlp(text)
                feat.extend([
                    int(is_question),
                    len(re.findall(r'\b(why|how|what|when|where|who|which)\b', text)),
                    sum(1 for token in doc if token.pos_ == 'VERB'),
                    sum(1 for token in doc if token.pos_ == 'NOUN'),
                    int(i < 2),
                    int(text.startswith(open_question_starters)),
                    int(text.startswith(exploratory_question_indicators)),
                    prosodic.get('duration', 0.0) / word_count if word_count > 0 else 0.0,
                    1 if utterance.get('initial_role', '') == 'Interviewer' else 0
                ])
                X = scaler.transform([feat])
                role = 'Interviewer' if clf.predict(X)[0] == 0 else 'Interviewee'
                if logger.isEnabledFor(logging.DEBUG):
                    # Probabilities are only needed for the debug line
                    prob = clf.predict_proba(X)[0]
                    logger.debug(f"Utterance {i}: Text='{text[:20]}...', Role={role}, Prob={prob}")
                results.append({**utterance, 'role': role})

        results = enforce_conversation_flow(results)

        # all() is True for an empty list, so guard it: there is nothing to
        # re-label and max() over an empty range would raise.
        if results and all(r['role'] == 'Interviewer' for r in results):
            logger.warning("No Interviewee detected. Applying enhanced reasoning.")
            candidates = []
            for idx, r in enumerate(results):
                raw = r.get('text') or ''
                lowered = raw.lower()
                statement_like = (not raw.endswith('?')) or lowered.startswith(exploratory_question_indicators)
                if (
                    statement_like
                    and get_sentiment_score(raw) >= 0
                    and re.search(r'\b(i|my|me)\b', lowered)
                ):
                    candidates.append(idx)
            if not candidates:
                logger.warning("No suitable Interviewee candidate found. Using longest utterance.")
                candidates = range(len(results))
            max_duration_idx = max(candidates, key=lambda idx: duration_of(results[idx]))
            results[max_duration_idx]['role'] = 'Interviewee'
        return results
    except Exception as e:
        logger.error(f"Role classification failed: {str(e)}")
        raise
def analyze_interviewee(audio_path: str, utterances: List[Dict]) -> Dict:
    """Analyze interviewee voice characteristics.

    Only utterances whose 'role' is 'Interviewee' are considered. Their audio
    is cut from the 16 kHz recording using 'start'/'end' (milliseconds) and
    summarized as speaking rate, filler and repetition ratios, pitch and
    intensity statistics, and derived anxiety/confidence/fluency levels.

    Args:
        audio_path (str): Path to the audio file.
        utterances (List[Dict]): List of utterance data.

    Returns:
        Dict: Analysis with 'speaking_rate', 'filler_ratio',
        'repetition_score', 'pitch_analysis', 'intensity', 'scores' and
        'levels'; or {'error': message} when there is nothing to analyze or
        the analysis fails (failures are logged, never raised).
    """
    from collections import Counter

    try:
        y, sr = librosa.load(audio_path, sr=16000)
        interviewee_utterances = [u for u in utterances if u.get('role') == 'Interviewee']
        if not interviewee_utterances:
            logger.warning("No interviewee utterances found for voice analysis.")
            return {'error': 'No interviewee utterances found'}

        segments = []
        for u in interviewee_utterances:
            start = int(u['start'] * sr / 1000)
            end = int(u['end'] * sr / 1000)
            if start < end and end <= len(y):
                segments.append(y[start:end])
        if not segments:
            logger.warning("No valid audio segments found for interviewee voice analysis.")
            return {'error': 'No valid audio segments found'}

        texts = [(u.get('text') or '').lower() for u in interviewee_utterances]
        total_duration = sum(
            (u.get('prosodic_features') or {}).get('duration', 0) or 0
            for u in interviewee_utterances
        )
        total_words = sum(len(t.split()) for t in texts)
        speaking_rate = total_words / total_duration if total_duration > 0 else 0

        # Match fillers as whole words/phrases; plain substring counting
        # also hit the "so" in "also" or the "um" in "number".
        filler_pattern = re.compile(r'\b(?:um|uh|like|you know|so|i mean)\b')
        filler_count = sum(len(filler_pattern.findall(t)) for t in texts)
        filler_ratio = filler_count / total_words if total_words > 0 else 0

        # Share of distinct adjacent word pairs that occur more than once
        all_words = ' '.join(t for t in texts if t).split()
        bigram_counts = Counter(zip(all_words, all_words[1:]))
        repetition_score = (
            sum(1 for count in bigram_counts.values() if count > 1) / len(bigram_counts)
            if bigram_counts else 0
        )

        voiced_f0 = []
        for segment in segments:
            f0 = librosa.pyin(segment, fmin=80, fmax=300, sr=sr)[0]
            voiced_f0.extend(f0[~np.isnan(f0)])  # pyin marks unvoiced frames as NaN
        pitches = np.asarray(voiced_f0, dtype=float)
        pitch_mean = np.mean(pitches) if pitches.size else 0
        pitch_std = np.std(pitches) if pitches.size else 0
        # A frame-to-frame difference needs at least two pitch values
        jitter = np.mean(np.abs(np.diff(pitches))) / pitch_mean if pitch_mean > 0 and pitches.size > 1 else 0

        frame_rms = []
        for segment in segments:
            frame_rms.extend(librosa.feature.rms(y=segment)[0])
        intensities = np.asarray(frame_rms, dtype=float)
        intensity_mean = np.mean(intensities) if intensities.size else 0
        intensity_std = np.std(intensities) if intensities.size else 0
        shimmer = (
            np.mean(np.abs(np.diff(intensities))) / intensity_mean
            if intensities.size > 1 and intensity_mean > 0 else 0
        )

        anxiety_score = 0.6 * (pitch_std / pitch_mean if pitch_mean > 0 else 0) + 0.4 * (jitter + shimmer)
        confidence_score = 0.7 * (1 / (1 + intensity_std)) + 0.3 * (1 / (1 + filler_ratio))
        hesitation_score = filler_ratio + repetition_score

        anxiety_level = 'high' if anxiety_score > 0.7 else 'moderate' if anxiety_score > 0.3 else 'low'
        confidence_level = 'high' if confidence_score > 0.7 else 'moderate' if confidence_score > 0.5 else 'low'
        if filler_ratio < 0.05 and repetition_score < 0.1:
            fluency_level = 'fluent'
        elif filler_ratio < 0.1 and repetition_score < 0.2:
            fluency_level = 'moderate'
        else:
            fluency_level = 'disfluent'

        return {
            'speaking_rate': float(round(speaking_rate, 2)),
            'filler_ratio': float(round(filler_ratio, 4)),
            'repetition_score': float(round(repetition_score, 4)),
            'pitch_analysis': {
                'mean': float(round(pitch_mean, 2)),
                'variance': float(round(pitch_std, 2)),
                'jitter': float(round(jitter, 4))
            },
            'intensity': {
                'mean': float(round(intensity_mean, 2)),
                'std_dev': float(round(intensity_std, 2)),
                'shimmer': float(round(shimmer, 4))
            },
            'scores': {
                'anxiety': float(round(anxiety_score, 4)),
                'confidence': float(round(confidence_score, 4)),
                'hesitation': float(round(hesitation_score, 4))
            },
            'levels': {
                'anxiety': anxiety_level,
                'confidence': confidence_level,
                'fluency': fluency_level
            }
        }
    except Exception as e:
        logger.error(f"Voice analysis failed: {str(e)}")
        return {'error': str(e)}
def generate_anxiety_confidence_chart(scores: Dict, output: io.BytesIO):
    """Generate a bar chart for anxiety and confidence scores.

    The chart is written to ``output`` as PNG and the buffer is rewound to
    position 0 so it can be read immediately.

    Args:
        scores (Dict): Scores dictionary; 'anxiety' and 'confidence' are read
            (0.0 when missing).
        output (io.BytesIO): Output buffer for the chart image.

    Raises:
        Exception: Any plotting error is logged and re-raised.
    """
    fig = None
    try:
        labels = ['Anxiety', 'Confidence']
        values = [scores.get('anxiety', 0.0), scores.get('confidence', 0.0)]
        fig, ax = plt.subplots(figsize=(4, 2.5))
        ax.bar(labels, values, color=['lightcoral', 'lightskyblue'])
        ax.set_ylabel('Score')
        ax.set_title('Anxiety vs. Confidence')
        # Keep the usual 0-1 axis, but grow it when a score exceeds 1 so the
        # bar and its value label are not clipped.
        ax.set_ylim(0, max(1.0, max(values) + 0.15))
        for position, value in enumerate(values):
            ax.text(position, value + 0.05, f"{value:.2f}", color='black', ha='center', fontweight='bold')
        fig.tight_layout()
        # Save this figure explicitly instead of pyplot's "current" figure
        fig.savefig(output, format='png')
        output.seek(0)
    except Exception as e:
        logger.error(f"Error generating chart: {str(e)}")
        raise
    finally:
        # Close on failure too; otherwise each failed call leaks a figure
        if fig is not None:
            plt.close(fig)
def generate_voice_interpretation(analysis: Dict) -> str:
    """Generate a textual interpretation of voice analysis.

    Args:
        analysis (Dict): Voice analysis results. Missing numeric fields are
            shown as 0 and missing levels as empty text.

    Returns:
        str: Multi-line interpretation text, or "Voice analysis not
        available." when ``analysis`` carries an 'error' key.
    """
    if 'error' in analysis:
        return "Voice analysis not available."

    # Tolerate an explicit None for the nested dictionaries
    levels = analysis.get('levels') or {}
    scores = analysis.get('scores') or {}

    interpretation_lines = [
        "Voice Analysis Summary:",
        f"- Speaking Rate: {analysis.get('speaking_rate', 0.0):.2f} words/sec",
        f"- Filler Words: {analysis.get('filler_ratio', 0.0) * 100:.1f}% of total words",
        f"- Repetition Score: {analysis.get('repetition_score', 0.0):.4f}",
        f"- Anxiety Level: {levels.get('anxiety', '').upper()} (score: {scores.get('anxiety', 0.0):.4f})",
        f"- Confidence Level: {levels.get('confidence', '').upper()} (score: {scores.get('confidence', 0.0):.4f})",
        f"- Fluency Level: {levels.get('fluency', '').upper()}",
        "",
        "Detailed Interpretation:",
        "1. A higher speaking rate can indicate nervousness or enthusiasm.",
        "2. Filler words and repetitions may impact speech clarity and professionalism.",
        "3. Anxiety is measured through pitch variability and voice stability.",
        "4. Confidence is assessed via vocal intensity and consistency.",
        # The fluency level is derived from the filler ratio and the
        # repetition score, not from the speaking rate.
        "5. Fluency combines filler word usage and repetition."
    ]
    return "\n".join(interpretation_lines)
def calculate_acceptance_probability(segments: Dict) -> float:
    """Calculate the probability of candidate acceptance.

    Combines the voice-analysis scores into a weighted sum, divides by the
    sum of the absolute weights and clamps the result to [0, 1].

    Args:
        segments (Dict): Analysis data; only the 'voice' entry is read.

    Returns:
        float: Acceptance probability as a percentage (0-100, two decimals).
        0.0 when the voice analysis carries an 'error' key.
    """
    voice = segments.get('voice', {})
    if 'error' in voice:
        return 0.0

    # Negative weights mark penalties; each is applied below to the inverted
    # score (1 - x) using its absolute value.
    weights = {
        'confidence': 0.4,
        'anxiety': -0.3,
        'fluency': 0.2,
        'speaking_rate': 0.1,
        'filler_repetition': -0.1,
        'content_strengths': 0.2
    }

    scores = voice.get('scores', {})
    confidence_score = scores.get('confidence', 0.0)
    anxiety_score = scores.get('anxiety', 0.0)
    fluency_level = voice.get('levels', {}).get('fluency', 'disfluent')
    speaking_rate = voice.get('speaking_rate', 0.0)
    filler_ratio = voice.get('filler_ratio', 0.0)
    repetition_score = voice.get('repetition_score', 0.0)

    # 'moderate' sits between the extremes; it was mapped to 0.0, which made
    # it indistinguishable from 'disfluent'.
    fluency_map = {'fluent': 1.0, 'moderate': 0.5, 'disfluent': 0.0}
    fluency_score = fluency_map.get(fluency_level, 0.5)

    # Score 1.0 at the ideal rate, falling linearly to 0 at twice (or zero
    # times) the ideal rate.
    ideal_speaking_rate = 2.0
    rate_deviation = abs(speaking_rate - ideal_speaking_rate)
    speaking_rate_score = max(0.0, 1.0 - (rate_deviation / ideal_speaking_rate))

    filler_repetition_score = max(0.0, 1.0 - ((filler_ratio + repetition_score) / 2.0))

    # NOTE(review): the original expression evaluated to 0.0 on both branches,
    # so content never contributes although its weight counts toward the
    # maximum. No content metric is visible here; left at 0.0 -- confirm.
    content_score = 0.0

    raw_score = (
        confidence_score * weights['confidence'] +
        (1.0 - anxiety_score) * abs(weights['anxiety']) +
        fluency_score * weights['fluency'] +
        # Use the normalized rate score only; multiplying by the raw rate
        # (squared) inflated the term far beyond its weight.
        speaking_rate_score * weights['speaking_rate'] +
        filler_repetition_score * abs(weights['filler_repetition']) +
        content_score * weights['content_strengths']
    )

    max_possible_score = sum(abs(w) for w in weights.values())
    normalized_score = raw_score / max_possible_score if max_possible_score > 0 else 0.5
    acceptance_score = max(0.0, min(1.0, normalized_score))
    return float(round(acceptance_score * 100, 2))
def generate_report(content: Dict, segments: List[Dict]) -> str:
    """Generate a professional interview analysis report.

    Builds a prompt from the voice analysis, the first five interviewee
    utterances and an acceptance estimate, then sends it to ``gemini_model``.

    Args:
        content (Dict): Analysis data. Reads 'voice' and, when present,
            'acceptance_probability' (a percentage in the 0-100 range).
        segments (List[Dict]): List of utterance segments. Reads 'speaker_id',
            'role', 'text' and 'prosodic_features'.
    Returns:
        str: Report text from the model, or "Error: <message>" if any step fails.
    """
    try:
        voice_analysis = content.get('voice', {})
        voice_interpretation = generate_voice_interpretation(voice_analysis)
        interviewee_utterances = [
            f"Speaker {u['speaker_id']}: {u['role']}: {u.get('text', '')}"
            for u in segments
            if u.get('role') == 'Interviewee'
        ][:5]

        total_duration = sum(
            u.get('prosodic_features', {}).get('duration', 0)
            for u in segments
        )
        speaker_turns = sum(1 for u in segments if u.get('text', '').strip())
        # Sorted so the participant list is stable between runs; str() keeps
        # join() working for non-string speaker ids.
        speakers = sorted({str(u['speaker_id']) for u in segments if u.get('speaker_id', '')})

        # Prefer the figure already stored in the analysis data so the report
        # text agrees with whatever else is rendered from that data.
        acceptance_prob = content.get('acceptance_probability')
        if acceptance_prob is None:
            acceptance_prob = calculate_acceptance_probability({
                'voice': voice_analysis,
                'content': {'duration': total_duration}
            })

        # acceptance_prob is a percentage, so both thresholds are on a 0-100 scale.
        if acceptance_prob >= 80:
            verdict = "This indicates a very strong candidate."
        elif acceptance_prob >= 50:
            verdict = "This indicates a solid candidate with room for improvement."
        else:
            verdict = "This candidate may require significant improvement."

        key_responses = "\n".join(f"- {resp}" for resp in interviewee_utterances)
        # Assembled line by line so source indentation never leaks into the prompt.
        prompt = "\n".join([
            "",
            "As EvalBot, an AI interview analysis system, generate a professional report.",
            "Use clear headings and bullet points with '-'.",
            "",
            f"**Estimated Acceptance Probability: {acceptance_prob:.2f}%**",
            verdict,
            "**1. Summary**",
            f"- Duration: {total_duration:.2f} seconds",
            f"- Speaker turns: {speaker_turns}",
            f"- Participants: {', '.join(speakers)}",
            "**2. Voice Insights**",
            voice_interpretation,
            "**3. Key Responses**",
            key_responses,
            "**4. Recommendations**",
            "- Focus on:",
            "- Communication Skills",
            "- Content Delivery",
            "- Professional Presentation",
            "",
        ])
        response = gemini_model.generate_content(prompt)
        return response.text
    except Exception as e:
        # Deliberate best-effort: the caller gets an error string, not an exception.
        logger.error(f"Error generating report failed: {str(e)}")
        return f"Error: {str(e)}"
def _split_report_sections(text: str) -> Dict[str, List[str]]:
    """Group report lines under the known section headings, in order of appearance."""
    section_patterns = {
        r'^\s*\*\*1.*Summary\*\*': 'Summary',
        r'^\s*\*\*2.*Voice.*\*\*': 'Voice Insights',
        # The report prompt in this file titles section 3 "Key Responses";
        # accept it as well so its lines are not folded into the previous section.
        r'^\s*\*\*3.*(?:Content|Key Responses).*\*\*': 'Content & Strengths',
        r'^\s*\*\*4.*Recommendations.*\*\*': 'Recommendations'
    }
    sections: Dict[str, List[str]] = {}
    current_section = None
    for line in (text or '').split('\n'):
        for pattern, section_name in section_patterns.items():
            if re.match(pattern, line):
                current_section = section_name
                # setdefault keeps earlier lines if a heading appears twice.
                sections.setdefault(current_section, [])
                break
        else:
            # Lines before the first recognized heading are dropped.
            if current_section:
                sections[current_section].append(line)
    return sections


def _voice_metrics_table(voice_data: Dict) -> Table:
    """Build the styled summary table of voice metrics."""
    levels = voice_data.get('levels', {})
    scores = voice_data.get('scores', {})
    table_data = [
        ['Metric', 'Value', 'Interpretation'],
        ['Speaking Rate', f"{voice_data.get('speaking_rate', 0.0):.2f} words/sec", 'Average'],
        # filler_ratio is multiplied by 100 here to display it as a percentage.
        ['Filler Words', f"{voice_data.get('filler_ratio', 0.0) * 100:.1f}%", ''],
        ['Repetition', f"{voice_data.get('repetition_score', 0.0):.4f}", ''],
        ['Anxiety', levels.get('anxiety', '').upper(), f"Score: {scores.get('anxiety', 0.0):.4f}"],
        ['Confidence', levels.get('confidence', '').upper(), f"Score: {scores.get('confidence', 0.0):.4f}"]
    ]
    table = Table(table_data)
    # Each command is (name, start_cell, stop_cell, *args); GRID takes a line
    # width then a color, LEADING takes a single number.
    table.setStyle(TableStyle([
        ('BACKGROUND', (0, 0), (-1, 0), colors.grey),
        ('TEXTCOLOR', (0, 0), (-1, 0), colors.white),
        ('ALIGN', (0, 0), (-1, -1), 'LEFT'),
        ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'),
        ('GRID', (0, 0), (-1, -1), 0.5, colors.black),
        ('LEADING', (0, 0), (-1, -1), 12)
    ]))
    return table


def create_pdf_document(text: str, content: Dict, output_path: str) -> bool:
    """Create a PDF report from analysis data.

    The PDF contains a title and date, an optional acceptance-probability
    banner, the report text split into its numbered sections, and (when voice
    data is available) a metrics table followed by an anxiety/confidence chart.

    Args:
        text (str): Report text to include; section headings are expected in
            the form ``**<n>. <Title>**``.
        content (Dict): Analysis data. Reads 'acceptance_probability' and 'voice'.
        output_path (str): Path to save the PDF file.
    Returns:
        bool: True if PDF created successfully, False otherwise.
    """
    # Local import: this block introduces the dependency and it is stdlib.
    from xml.sax.saxutils import escape

    try:
        doc = SimpleDocTemplate(output_path, pagesize=letter)
        styles = getSampleStyleSheet()
        h1_style = ParagraphStyle(
            name='Heading1',
            parent=styles['Heading1'],
            fontSize=16,
            alignment=1,
            textColor=colors.Color(0, 0.2, 0.4)  # #003366
        )
        h2_style = ParagraphStyle(
            name='Heading2',
            parent=styles['Heading2'],
            fontSize=12,
            spaceBefore=10,
            textColor=colors.Color(0.2, 0.4, 0.6)  # #336699
        )
        body_style = ParagraphStyle(
            name='BodyText',
            parent=styles['Normal'],
            fontSize=10,
            leading=12
        )
        bullet_style = ParagraphStyle(
            name='Bullet',
            parent=styles['Normal'],
            fontSize=10,
            leftIndent=18,
            bulletIndent=10
        )

        story = []
        story.append(Paragraph("EvalBot Interview Analysis Report", h1_style))
        story.append(Spacer(1, 0.2 * inch))
        story.append(Paragraph(f"Date: {time.strftime('%Y-%m-%d')}", body_style))
        story.append(Spacer(1, 0.3 * inch))

        acceptance_prob = content.get('acceptance_probability') or 0.0
        if acceptance_prob > 0:
            story.append(Paragraph("Candidate Evaluation", h2_style))
            prob_color = 'green' if acceptance_prob >= 70 else 'orange' if acceptance_prob >= 40 else 'red'
            story.append(Paragraph(
                f'<font color="{prob_color}"><b>Acceptance Probability: {acceptance_prob:.2f}%</b></font>',
                ParagraphStyle(
                    name='Prob',
                    parent=styles['Normal'],
                    fontSize=14,
                    leading=18,  # leading must exceed fontSize or lines overlap
                    alignment=1
                )
            ))
            story.append(Spacer(1, 0.2 * inch))

        sections = _split_report_sections(text)
        for section_num, (section_name, section_content) in enumerate(sections.items(), 1):
            story.append(Paragraph(escape(f"{section_num}. {section_name}"), h2_style))
            story.append(Spacer(1, 0.1 * inch))
            for line in section_content:
                stripped = line.strip()
                if not stripped:
                    continue
                # Paragraph parses its text as XML-like markup, so a stray
                # '&' or '<' in model output would otherwise abort the build.
                if stripped.startswith('-'):
                    story.append(Paragraph(escape(stripped[1:].strip()), bullet_style))
                else:
                    story.append(Paragraph(escape(stripped), body_style))
            story.append(Spacer(1, 0.2 * inch))

        voice_data = content.get('voice', {})
        if voice_data and 'error' not in voice_data:
            story.append(_voice_metrics_table(voice_data))
            story.append(Spacer(1, 0.2 * inch))
            chart_buffer = io.BytesIO()
            try:
                generate_anxiety_confidence_chart(voice_data.get('scores', {}), chart_buffer)
                # Rewind so the image reader starts at the first byte.
                chart_buffer.seek(0)
                img = Image(chart_buffer, width=3 * inch, height=1.8 * inch)
                story.append(img)
                story.append(Spacer(1, 0.2 * inch))
            except Exception as e:
                # The chart is optional; the rest of the report is still built.
                logger.warning(f"Failed to generate chart image: {e}")

        doc.build(story)
        return True
    except Exception as e:
        logger.error(f"Failed to create PDF report: {str(e)}")
        return False
def _numpy_json_default(obj):
    """Convert NumPy scalars and arrays to plain Python values for json.dump."""
    if isinstance(obj, np.generic):
        return obj.item()
    if isinstance(obj, np.ndarray):
        return obj.tolist()
    raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")


def process_interview(audio_path: str) -> Dict:
    """Process an interview audio file.

    Runs, in order: WAV conversion, transcription, prosodic feature extraction,
    speaker identification, role assignment and classification, interviewee
    voice analysis, report generation, and PDF/JSON export into OUTPUT_DIR.

    Args:
        audio_path (str): Path to the audio file.
    Returns:
        Dict: Analysis results with file paths to outputs. Keys are 'success',
            'message', 'pdf_path' and 'analysis_path'.
    Raises:
        Exception: Any failure in the pipeline is logged and re-raised. A
            failed PDF build raises RuntimeError.
    """
    wav_file = None
    try:
        logger.info(f"Starting processing for: {audio_path}")
        wav_file = convert_to_wav(audio_path)

        logger.info("Starting transcription process")
        transcript = transcribe(wav_file)

        logger.info("Extracting prosodic audio features")
        utterances = transcript.get('utterances', [])
        for utterance in utterances:
            utterance['prosodic_features'] = extract_prosodic_features(
                wav_file,
                utterance['start'],
                utterance['end']
            )

        logger.info("Identifying speakers in audio")
        results = identify_speakers(transcript, wav_file)

        logger.info("Grouping speakers by role assignment")
        speaker_roles = group_speakers_by_role(results)
        for result in results:
            result['role'] = speaker_roles.get(result['speaker_id'], 'Unknown')

        logger.info("Classifying speaker roles")
        clf, vectorizer, scaler = train_role_classifier(results)
        results = classify_roles(results, clf, vectorizer, scaler)

        logger.info("Analyzing interviewee voice characteristics")
        voice_analysis = analyze_interviewee(wav_file, results)

        logger.info("Generating analysis report")
        total_duration = sum(
            u.get('prosodic_features', {}).get('duration', 0.0)
            for u in results
        )
        speaker_turns = sum(1 for u in results if u.get('text', '').strip())
        speakers = list(set(u['speaker_id'] for u in results if u.get('speaker_id', '')))
        analysis_data = {
            'transcript': results,
            'voice': voice_analysis,
            'text_analysis': {
                'total_duration': float(total_duration),
                'speaker_turns': speaker_turns,
                'speakers': speakers
            },
            'acceptance_probability': calculate_acceptance_probability({
                'voice': voice_analysis,
                'content': {'duration': total_duration}
            })
        }
        report_text = generate_report(analysis_data, results)

        pdf_path = os.path.join(OUTPUT_DIR, f"report_{uuid.uuid4()}.pdf")
        json_path = os.path.join(OUTPUT_DIR, f"analysis_{uuid.uuid4()}.json")

        logger.info("Creating PDF analysis report")
        if not create_pdf_document(report_text, analysis_data, pdf_path):
            raise RuntimeError("Failed to create PDF report")

        with open(json_path, 'w', encoding='utf-8') as f:
            # default= is only consulted for values json cannot encode itself,
            # e.g. NumPy scalars/arrays coming out of the audio analysis.
            json.dump(analysis_data, f, indent=2, default=_numpy_json_default)

        return {
            'success': True,
            'message': 'Interview processed successfully',
            'pdf_path': str(pdf_path),
            'analysis_path': str(json_path)
        }
    except Exception as e:
        logger.error(f"Processing failed for {audio_path}: {str(e)}")
        raise
    finally:
        # The converted WAV is an intermediate artifact and is not part of the
        # returned result, so remove it on success as well as on failure.
        # NOTE(review): the path comparison guards against deleting the
        # caller's file if convert_to_wav ever returns its input unchanged.
        if (
            wav_file
            and os.path.abspath(wav_file) != os.path.abspath(audio_path)
            and os.path.exists(wav_file)
        ):
            os.remove(wav_file)