|
|
import os |
|
|
import torch |
|
|
import numpy as np |
|
|
import uuid |
|
|
import requests |
|
|
import time |
|
|
import json |
|
|
from pydub import AudioSegment |
|
|
import wave |
|
|
import pycrfsuite |
|
|
from pyannote.audio import Pipeline |
|
|
from nemo.collections.asr.models import EncDecSpeakerLabelModel |
|
|
from pinecone import Pinecone, ServerlessSpec |
|
|
import pandas as pd |
|
|
from sklearn.ensemble import RandomForestClassifier |
|
|
from sklearn.preprocessing import StandardScaler |
|
|
from sklearn.feature_extraction.text import TfidfVectorizer |
|
|
import re |
|
|
from typing import Dict, List, Tuple |
|
|
import logging |
|
|
from reportlab.lib.pagesizes import letter |
|
|
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Table, TableStyle |
|
|
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle |
|
|
from reportlab.lib.units import inch |
|
|
from reportlab.lib import colors |
|
|
import matplotlib.pyplot as plt |
|
|
import matplotlib |
|
|
matplotlib.use('Agg') |
|
|
from reportlab.platypus import Image |
|
|
import io |
|
|
from transformers import AutoTokenizer, AutoModel |
|
|
import spacy |
|
|
import google.generativeai as genai |
|
|
import joblib |
|
|
from concurrent.futures import ThreadPoolExecutor |
|
|
import librosa |
|
|
|
|
|
|
|
|
try: |
|
|
from textblob import TextBlob |
|
|
except ImportError: |
|
|
TextBlob = None |
|
|
|
|
|
|
|
|
# --- Logging ------------------------------------------------------------------
# Root logging at INFO; the two NeMo loggers are restricted to ERROR.
logging.basicConfig(level=logging.INFO)
logging.getLogger("nemo").setLevel(logging.ERROR)
logging.getLogger("nemo_logging").setLevel(logging.ERROR)
logger = logging.getLogger(__name__)

# --- Filesystem locations -------------------------------------------------------
AUDIO_DIR = "./Uploads"
OUTPUT_DIR = "./processed_audio"
# The output directory is created as an import-time side effect.
os.makedirs(OUTPUT_DIR, exist_ok=True)

# --- Credentials, read from the environment (None when a variable is unset) -----
PINECONE_KEY = os.environ.get("PINECONE_KEY")
ASSEMBLYAI_KEY = os.environ.get("ASSEMBLYAI_KEY")
GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY")
HF_TOKEN = os.environ.get("HF_TOKEN")

# --- Module-level model caches; load_models() fills these on first use ----------
_speaker_model = _nlp = _tokenizer = _llm_model = None
|
|
|
|
|
|
|
|
def initialize_services():
    """Connect to Pinecone and Gemini and return their handles.

    Makes sure the "interview-speaker-embeddings" Pinecone index exists
    (creating a 192-dimensional cosine serverless index in aws/us-east-1 when
    it is missing), configures the Gemini SDK with GEMINI_API_KEY and builds a
    'gemini-1.5-flash' generative model.

    Returns:
        tuple: (Pinecone index handle, Gemini GenerativeModel).

    Raises:
        Exception: Any failure is logged and re-raised unchanged.
    """
    index_name = "interview-speaker-embeddings"
    try:
        client = Pinecone(api_key=PINECONE_KEY)
        existing_names = client.list_indexes().names()
        if index_name not in existing_names:
            serverless = ServerlessSpec(cloud="aws", region="us-east-1")
            client.create_index(
                name=index_name,
                dimension=192,
                metric="cosine",
                spec=serverless,
            )
        speaker_index = client.Index(index_name)

        genai.configure(api_key=GEMINI_API_KEY)
        return speaker_index, genai.GenerativeModel('gemini-1.5-flash')
    except Exception as e:
        logger.error(f"Error initializing services: {str(e)}")
        raise


# Shared service handles, created once when the module is imported.
index, gemini_model = initialize_services()
|
|
|
|
|
|
|
|
# Prefer the GPU when CUDA is available; everything below moves tensors here.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
logger.info(f"Using device: {device}")


def load_speaker_model():
    """Load the pretrained TitaNet-large speaker verification model.

    Sets the torch intra-op thread count to 5, loads
    "nvidia/speakerverification_en_titanet_large" onto ``device`` and switches
    the model to evaluation mode.

    Returns:
        EncDecSpeakerLabelModel: The loaded model in eval mode.

    Raises:
        RuntimeError: If loading fails for any reason. The original exception
            is attached as ``__cause__`` so the root failure stays visible.
    """
    try:
        torch.set_num_threads(5)
        model = EncDecSpeakerLabelModel.from_pretrained(
            "nvidia/speakerverification_en_titanet_large",
            map_location=device
        )
        model.eval()
        return model
    except Exception as e:
        logger.error(f"Model loading failed: {str(e)}")
        # Chain explicitly so the traceback shows what actually went wrong.
        raise RuntimeError("Could not load speaker verification model") from e
|
|
|
|
|
|
|
|
def load_models():
    """Load every model the pipeline needs, reusing already-loaded instances.

    Each model is stored in a module-level cache variable and is only loaded
    when that variable is still None: the speaker verification model, the
    spaCy "en_core_web_sm" pipeline, and the "distilbert-base-uncased"
    tokenizer and encoder (moved to ``device`` and put into eval mode).

    Returns:
        tuple: (speaker model, spaCy pipeline, tokenizer, DistilBERT model).

    Raises:
        Exception: Any loading failure is logged and re-raised unchanged.
    """
    global _speaker_model, _nlp, _tokenizer, _llm_model
    checkpoint = "distilbert-base-uncased"
    try:
        _speaker_model = load_speaker_model() if _speaker_model is None else _speaker_model
        _nlp = spacy.load("en_core_web_sm") if _nlp is None else _nlp
        _tokenizer = AutoTokenizer.from_pretrained(checkpoint) if _tokenizer is None else _tokenizer
        if _llm_model is None:
            _llm_model = AutoModel.from_pretrained(checkpoint).to(device)
            _llm_model.eval()
        return _speaker_model, _nlp, _tokenizer, _llm_model
    except Exception as e:
        logger.error(f"Model loading failed: {str(e)}")
        raise


# Public handles used by the rest of the module; loaded at import time.
speaker_model, nlp, tokenizer, llm_model = load_models()
|
|
|
|
|
|
|
|
def convert_to_wav(audio_path: str, output_dir: str = OUTPUT_DIR) -> str:
    """Convert an audio file to a normalized mono 16 kHz WAV using pydub.

    Args:
        audio_path (str): Path to the input audio file.
        output_dir (str): Directory to store the output WAV file. It is
            created when it does not exist yet.

    Returns:
        str: Path to the converted WAV file (a fresh UUID-based name).

    Raises:
        ValueError: If the input file does not exist or is empty.
        Exception: For any other conversion error (logged, then re-raised).
    """
    try:
        # Validation failures are logged once by the handler below, which
        # already includes the reason in its message.
        if not os.path.exists(audio_path):
            raise ValueError(f"Audio file {audio_path} does not exist")
        if os.path.getsize(audio_path) == 0:
            raise ValueError(f"Audio file {audio_path} is empty")

        # Only the default OUTPUT_DIR is created at import time; a
        # caller-supplied directory may not exist yet.
        os.makedirs(output_dir, exist_ok=True)

        audio = AudioSegment.from_file(audio_path)
        logger.info(f"Input audio: {audio_path}, duration: {len(audio)/1000:.2f}s, channels: {audio.channels}")
        audio = audio.set_channels(1).set_frame_rate(16000)
        audio = audio.normalize()

        wav_file = os.path.join(output_dir, f"{uuid.uuid4()}.wav")
        audio.export(wav_file, format="wav")
        logger.info(f"Successfully converted {audio_path} to {wav_file}")
        return wav_file
    except Exception as e:
        logger.error(f"Audio conversion failed for {audio_path}: {str(e)}")
        raise
|
|
|
|
|
def extract_prosodic_features(audio_path: str, start_ms: int, end_ms: int) -> Dict:
    """Extract prosodic features from an audio segment.

    The segment is exported to a temporary WAV under OUTPUT_DIR, reloaded with
    librosa at 16 kHz and summarized by pitch statistics (from
    ``librosa.piptrack``), RMS intensity statistics and the share of frames
    whose RMS is below 10% of the mean RMS.

    Args:
        audio_path (str): Path to the audio file.
        start_ms (int): Start time in milliseconds.
        end_ms (int): End time in milliseconds.

    Returns:
        Dict: Keys 'duration', 'mean_pitch', 'min_pitch', 'max_pitch',
        'pitch_sd', 'intensityMean', 'intensityMin', 'intensityMax',
        'intensitySD' and 'silence_ratio'. All values are 0.0 when the times
        are invalid or extraction fails; this function never raises.
    """
    def _default_features() -> Dict:
        # Fresh dict per call so callers may mutate the result safely.
        return {
            'duration': 0.0, 'mean_pitch': 0.0, 'min_pitch': 0.0, 'max_pitch': 0.0, 'pitch_sd': 0.0,
            'intensityMean': 0.0, 'intensityMin': 0.0, 'intensityMax': 0.0, 'intensitySD': 0.0,
            'silence_ratio': 0.0
        }

    temp_path = None
    try:
        if start_ms >= end_ms or end_ms <= 0:
            logger.warning("Invalid audio segment times, returning default features")
            return _default_features()

        audio = AudioSegment.from_file(audio_path)
        segment = audio[start_ms:end_ms]
        temp_path = os.path.join(OUTPUT_DIR, f"temp_{uuid.uuid4()}.wav")
        segment.export(temp_path, format="wav")
        y, sr = librosa.load(temp_path, sr=16000)

        # piptrack returns (pitches, magnitudes); the pitch statistics must be
        # taken from the first array, not from the magnitudes.
        pitch_track, _magnitudes = librosa.piptrack(y=y, sr=sr)
        pitches = pitch_track[pitch_track > 0]
        has_pitch = len(pitches) > 0

        rms = librosa.feature.rms(y=y)[0]
        silence_threshold = np.mean(rms) * 0.1
        silence = rms < silence_threshold
        silence_ratio = np.sum(silence) / len(rms) if len(rms) > 0 else 0.0

        return {
            'duration': (end_ms - start_ms) / 1000,
            'mean_pitch': float(np.mean(pitches)) if has_pitch else 0.0,
            'min_pitch': float(np.min(pitches)) if has_pitch else 0.0,
            'max_pitch': float(np.max(pitches)) if has_pitch else 0.0,
            'pitch_sd': float(np.std(pitches)) if has_pitch else 0.0,
            'intensityMean': float(np.mean(rms)),
            'intensityMin': float(np.min(rms)),
            'intensityMax': float(np.max(rms)),
            'intensitySD': float(np.std(rms)),
            'silence_ratio': float(silence_ratio)
        }
    except Exception as e:
        logger.error(f"Feature extraction failed: {str(e)}")
        return _default_features()
    finally:
        # Single cleanup point for both the success and the failure path.
        if temp_path and os.path.exists(temp_path):
            try:
                os.remove(temp_path)
            except OSError as cleanup_error:
                logger.warning(f"Could not remove temporary file {temp_path}: {cleanup_error}")
|
|
|
|
|
def transcribe(audio_path: str, poll_interval: float = 5.0,
               max_wait: "float | None" = None,
               request_timeout: float = 300.0) -> Dict:
    """Transcribe audio using the AssemblyAI API.

    Uploads the file, requests a transcript with speaker labels and profanity
    filtering, then polls until the transcript is completed or failed.

    Args:
        audio_path (str): Path to the audio file.
        poll_interval (float): Seconds to sleep between status polls.
        max_wait (float | None): Upper bound in seconds for the polling phase;
            None (the default) polls until the service reports a final status.
        request_timeout (float): Timeout in seconds passed to each HTTP request
            so a stalled connection cannot hang the pipeline forever.

    Returns:
        Dict: The completed transcript JSON as returned by the service.

    Raises:
        requests.HTTPError: If any request returns an error status code.
        RuntimeError: If the service reports status 'error'.
        TimeoutError: If ``max_wait`` elapses before a final status.
        Exception: Any other failure; everything is logged and re-raised.
    """
    api_base = "https://api.assemblyai.com/v2"
    headers = {"authorization": ASSEMBLYAI_KEY}
    try:
        with open(audio_path, 'rb') as f:
            upload_response = requests.post(
                f"{api_base}/upload",
                headers=headers,
                data=f,
                timeout=request_timeout
            )
        # Surface HTTP failures directly instead of a KeyError on the JSON body.
        upload_response.raise_for_status()
        audio_url = upload_response.json()['upload_url']

        transcript_response = requests.post(
            f"{api_base}/transcript",
            headers=headers,
            json={
                "audio_url": audio_url,
                "speaker_labels": True,
                "filter_profanity": True
            },
            timeout=request_timeout
        )
        transcript_response.raise_for_status()
        transcript_id = transcript_response.json()['id']

        deadline = None if max_wait is None else time.monotonic() + max_wait
        while True:
            poll_response = requests.get(
                f"{api_base}/transcript/{transcript_id}",
                headers=headers,
                timeout=request_timeout
            )
            poll_response.raise_for_status()
            result = poll_response.json()
            status = result.get('status')
            if status == 'completed':
                return result
            if status == 'error':
                raise RuntimeError(result.get('error', 'Unknown transcription error'))
            if deadline is not None and time.monotonic() >= deadline:
                raise TimeoutError(
                    f"Transcript {transcript_id} not finished after {max_wait} seconds"
                )
            time.sleep(poll_interval)
    except Exception as e:
        logger.error(f"Transcription failed: {str(e)}")
        raise
|
|
|
|
|
def enhance_diarization(wav_file: str, transcript: Dict) -> List[Dict]:
    """Re-label transcript utterances with pyannote speaker turns.

    Runs the "pyannote/speaker-diarization" pipeline on the WAV file and, for
    each utterance, picks the diarization turn with the largest positive time
    overlap. The utterance dicts are updated in place with 'speaker_id'.

    Args:
        wav_file (str): Path to the WAV file.
        transcript (Dict): Transcription result; 'utterances' entries carry
            'start' and 'end' in milliseconds.

    Returns:
        List[Dict]: The transcript's utterances. When no turn overlaps an
        utterance its existing 'speaker_id' is kept, or 'unknown' if absent.
        If diarization fails, a warning is logged and the transcript's
        utterances are returned as they are at that point.
    """
    try:
        diarizer = Pipeline.from_pretrained("pyannote/speaker-diarization", use_auth_token=HF_TOKEN)
        annotation = diarizer(wav_file)
        utterances = transcript.get('utterances', [])
        for utt in utterances:
            # Diarization turns are in seconds; utterance times are in ms.
            utt_start, utt_end = utt['start'] / 1000, utt['end'] / 1000
            best_label, best_overlap = None, 0
            for turn, _, label in annotation.itertracks(yield_label=True):
                shared = min(utt_end, turn.end) - max(utt_start, turn.start)
                if shared > best_overlap:
                    best_overlap, best_label = shared, label
            if best_label:
                utt['speaker_id'] = best_label
            else:
                utt['speaker_id'] = utt.get('speaker_id', 'unknown')
        return utterances
    except Exception as e:
        logger.warning(f"Pyannote diarization failed: {str(e)}")
        return transcript.get('utterances', [])
|
|
|
|
|
def process_utterance(utterance, full_audio, wav_file):
    """Process a single utterance to extract its speaker embedding and ID.

    The utterance's audio slice is exported to a temporary WAV, embedded with
    the speaker model and looked up in the Pinecone index. A match scoring
    above 0.7 reuses the stored speaker; otherwise a new "unknown_<hex>"
    speaker is created and upserted.

    Args:
        utterance (Dict): Utterance data with 'start' and 'end' in ms.
        full_audio (AudioSegment): Full audio segment.
        wav_file (str): Path to the WAV file (not used by this function).

    Returns:
        Dict: The utterance extended with 'speaker', 'speaker_id' and
        'embedding'. On invalid times or any processing error the values are
        'Unknown', 'unknown' and None; processing errors are logged, not raised.
    """
    def _unidentified():
        return {
            **utterance,
            'speaker': 'Unknown',
            'speaker_id': 'unknown',
            'embedding': None
        }

    temp_path = None
    try:
        start = utterance['start']
        end = utterance['end']
        if start >= end or end <= 0:
            logger.warning(f"Invalid utterance times: start={start}, end={end}")
            return _unidentified()

        segment = full_audio[start:end]
        temp_path = os.path.join(OUTPUT_DIR, f"temp_{uuid.uuid4()}.wav")
        segment.export(temp_path, format="wav")

        with torch.no_grad():
            embedding = speaker_model.get_embedding(temp_path).cpu().numpy()
        embedding_list = embedding.flatten().tolist()

        query_result = index.query(
            vector=embedding_list,
            top_k=1,
            include_metadata=True
        )
        matches = query_result['matches']
        if matches and matches[0]['score'] > 0.7:
            speaker_id = matches[0]['id']
            speaker_name = matches[0]['metadata']['speaker_name']
        else:
            speaker_id = f"unknown_{uuid.uuid4().hex[:6]}"
            speaker_name = f"Speaker_{speaker_id[-4:]}"
            index.upsert([(speaker_id, embedding_list, {"speaker_name": speaker_name})])

        return {
            **utterance,
            'speaker': speaker_name,
            'speaker_id': speaker_id,
            'embedding': embedding_list
        }
    except Exception as e:
        logger.error(f"Utterance processing failed: {str(e)}")
        return _unidentified()
    finally:
        # Cleanup happens exactly once, and a failed delete must not turn a
        # handled utterance into an exception for the caller.
        if temp_path and os.path.exists(temp_path):
            try:
                os.remove(temp_path)
            except OSError as cleanup_error:
                logger.warning(f"Could not remove temporary file {temp_path}: {cleanup_error}")
|
|
|
|
|
def identify_speakers(transcript: Dict, wav_file: str) -> List[Dict]:
    """Attach speaker information to every utterance of a transcript.

    Loads the WAV file, refines the utterances through enhance_diarization and
    then runs process_utterance for each of them on a pool of 5 threads.

    Args:
        transcript (Dict): Transcription result.
        wav_file (str): Path to the WAV file.

    Returns:
        List[Dict]: Utterances with speaker information, in transcript order.

    Raises:
        ValueError: If the transcript has no utterances.
        Exception: Any other failure; everything is logged and re-raised.
    """
    try:
        full_audio = AudioSegment.from_wav(wav_file)
        if not transcript.get('utterances', []):
            logger.error("No utterances found in transcript")
            raise ValueError("Empty transcript")

        diarized = enhance_diarization(wav_file, transcript)

        def _identify(utterance):
            return process_utterance(utterance, full_audio, wav_file)

        with ThreadPoolExecutor(max_workers=5) as executor:
            # map() yields results in submission order, like the futures list.
            return list(executor.map(_identify, diarized))
    except Exception as e:
        logger.error(f"Speaker identification failed: {str(e)}")
        raise
|
|
|
|
|
def get_sentiment_score(text: str) -> float:
    """Return the TextBlob sentiment polarity of a text.

    Args:
        text (str): Text to analyze.

    Returns:
        float: The polarity reported by TextBlob, or 0.0 (neutral) when
        TextBlob is not installed or the analysis raises. Both fallbacks log
        a warning.
    """
    if TextBlob is not None:
        try:
            return TextBlob(text).sentiment.polarity
        except Exception as e:
            logger.warning(f"Sentiment analysis failed for text '{text}': {str(e)}")
            return 0.0

    logger.warning("TextBlob not installed. Returning neutral sentiment score (0.0).")
    return 0.0
|
|
|
|
|
def get_text_embedding(text: str) -> List[float]:
    """Embed a text with DistilBERT by mean-pooling the last hidden state.

    Args:
        text (str): Text to embed (truncated to 128 tokens).

    Returns:
        List[float]: The flattened embedding, or a list of 768 zeros when
        embedding fails (a warning is logged in that case).
    """
    try:
        encoded = tokenizer(text, return_tensors="pt", padding=True, truncation=True, max_length=128)
        encoded = encoded.to(device)
        with torch.no_grad():
            model_output = llm_model(**encoded)
        pooled = model_output.last_hidden_state.mean(dim=1)
        return pooled.cpu().numpy().flatten().tolist()
    except Exception as e:
        logger.warning(f"Text embedding failed for '{text}': {str(e)}")
        return [0.0] * 768
|
|
|
|
|
def group_speakers_by_role(utterances: List[Dict]) -> Dict[str, str]:
    """Assign each speaker the role suggested by most of their utterances.

    An utterance counts as interviewer-like when it starts with an open
    question starter, or when it ends with '?' and does not start with one of
    the exploratory question phrases. A speaker is an 'Interviewer' when more
    than half of their utterances are interviewer-like, else 'Interviewee'.

    Args:
        utterances (List[Dict]): Utterances with 'speaker_id' and 'text'.

    Returns:
        Dict[str, str]: Mapping of speaker IDs to roles; an empty dict when
        grouping fails (the error is logged).
    """
    # Tuples so str.startswith can test all prefixes in one call.
    open_question_starters = (
        'tell me', 'describe', 'walk me through', 'explain', 'share',
        'give me', 'talk about', 'discuss'
    )
    exploratory_question_indicators = (
        'can i ask', 'what about', 'could you tell', 'is it', 'are there',
        'what’s the', 'how does', 'may i'
    )

    def _sounds_like_interviewer(text):
        lowered = text.lower()
        if lowered.startswith(open_question_starters):
            return True
        return text.endswith('?') and not lowered.startswith(exploratory_question_indicators)

    try:
        # Bucket utterances per speaker in a single pass.
        by_speaker = {}
        for utt in utterances:
            by_speaker.setdefault(utt['speaker_id'], []).append(utt)

        roles = {}
        for speaker_id, spoken in by_speaker.items():
            hits = sum(1 for utt in spoken if _sounds_like_interviewer(utt['text']))
            roles[speaker_id] = 'Interviewer' if hits / len(spoken) > 0.5 else 'Interviewee'
        return roles
    except Exception as e:
        logger.error(f"Speaker grouping failed: {str(e)}")
        return {}
|
|
|
|
|
def train_role_classifier(utterances: List[Dict], use_crf: bool = True) -> Tuple:
    """Train a role classifier, using CRF if specified, else RandomForest.

    Labels are heuristic pseudo-labels derived from the text itself: an
    utterance is treated as interviewer speech when it is a non-exploratory
    question, starts with an open question starter, is one of the first two
    utterances and shorter than 10 words, or is a short (< 5 words) question
    following a question. Utterances with empty text are skipped.

    Args:
        utterances (List[Dict]): List of utterances.
        use_crf (bool): Whether to use CRF or RandomForest.

    Returns:
        Tuple: (classifier, vectorizer, scaler) for RandomForest, which are
        also dumped with joblib into OUTPUT_DIR; (None, None, None) for CRF,
        whose model is written to OUTPUT_DIR/role_crf.model.

    Raises:
        ValueError: If no utterance has usable text.
        Exception: Any other training failure; everything is logged and
            re-raised.
    """
    # Tuples so str.startswith can test all prefixes in one call.
    open_question_starters = (
        'tell me', 'describe', 'walk me through', 'explain', 'share',
        'give me', 'talk about', 'discuss'
    )
    exploratory_question_indicators = (
        'can i ask', 'what about', 'could you tell', 'is it', 'are there',
        'what’s the', 'how does', 'may i'
    )
    prosodic_keys = (
        'duration', 'mean_pitch', 'min_pitch', 'max_pitch', 'pitch_sd',
        'intensityMean', 'intensityMin', 'intensityMax', 'intensitySD',
        'silence_ratio'
    )

    def _previous_is_question(position: int) -> bool:
        # Uses the raw text of the preceding utterance, whatever its content.
        return position > 0 and utterances[position - 1].get('text', '').endswith('?')

    def _heuristic_is_interviewer(position, text, word_count, prev_is_question) -> bool:
        is_question = text.endswith('?')
        return bool(
            (is_question and not text.startswith(exploratory_question_indicators)) or
            text.startswith(open_question_starters) or
            (position < 2 and word_count < 10) or
            (prev_is_question and word_count < 5 and is_question)
        )

    try:
        if use_crf:
            trainer = pycrfsuite.Trainer(verbose=False)
            sequence = []
            labels = []
            sequences_added = 0
            for i, u in enumerate(utterances):
                # A speaker change closes the running sequence. This is checked
                # before the empty-text skip so an empty utterance can neither
                # hide a boundary nor swallow the pending sequence.
                if sequence and i > 0 and u.get('speaker_id') != utterances[i - 1].get('speaker_id'):
                    trainer.append(sequence, labels)
                    sequences_added += 1
                    sequence = []
                    labels = []

                text = u.get('text', '').lower().strip()
                if not text:
                    continue
                prosodic = u.get('prosodic_features', {})
                word_count = len(text.split())
                prev_is_question = _previous_is_question(i)
                sequence.append({
                    'text': text,
                    'is_question': str(text.endswith('?')),
                    'word_count': str(word_count),
                    'starts_with_tell': str(text.startswith(open_question_starters)),
                    'prev_is_question': str(prev_is_question),
                    'sentiment': str(get_sentiment_score(text)),
                    'silence_ratio': str(prosodic.get('silence_ratio', 0.0))
                })
                is_interviewer = _heuristic_is_interviewer(i, text, word_count, prev_is_question)
                labels.append('Interviewer' if is_interviewer else 'Interviewee')

            # Flush whatever is still pending after the last utterance.
            if sequence:
                trainer.append(sequence, labels)
                sequences_added += 1
            if not sequences_added:
                logger.error("No features or labels generated for training")
                raise ValueError("No valid training data")

            trainer.train(os.path.join(OUTPUT_DIR, 'role_crf.model'))
            return None, None, None

        texts = [u['text'] for u in utterances if u.get('text', '').strip()]
        if not texts:
            logger.error("No valid texts found for role classifier training")
            raise ValueError("Empty text data for training")
        vectorizer = TfidfVectorizer(max_features=500, ngram_range=(1, 2))
        X_text = vectorizer.fit_transform(texts)

        features = []
        labels = []
        speaker_counts = {}  # running utterance count per speaker, empty ones included
        row = 0              # row of X_text; advances only for non-empty utterances
        for i, utterance in enumerate(utterances):
            speaker = utterance.get('speaker_id')
            speaker_counts[speaker] = speaker_counts.get(speaker, 0) + 1

            text = utterance.get('text', '').lower().strip()
            if not text:
                continue
            prosodic = utterance.get('prosodic_features', {})
            word_count = len(text.split())
            is_question = text.endswith('?')
            prev_is_question = _previous_is_question(i)
            # Share of the utterances so far that belong to this speaker.
            speaker_frequency = speaker_counts[speaker] / (i + 1)
            sentiment_score = get_sentiment_score(text)

            feat = [prosodic.get(key, 0.0) for key in prosodic_keys]
            feat.extend([
                word_count,
                speaker_frequency,
                int(prev_is_question),
                sentiment_score
            ])
            # X_text has one row per non-empty text, so it is indexed by `row`,
            # not by the utterance position `i`.
            feat.extend(X_text[row].toarray()[0].tolist())
            row += 1
            feat.extend(get_text_embedding(text))
            doc = nlp(text)
            feat.extend([
                int(is_question),
                len(re.findall(r'\b(why|how|what|when|where|who|which)\b', text)),
                sum(1 for token in doc if token.pos_ == 'VERB'),
                sum(1 for token in doc if token.pos_ == 'NOUN'),
                int(i < 2),
                int(text.startswith(open_question_starters)),
                int(text.startswith(exploratory_question_indicators)),
                prosodic.get('duration', 0.0) / word_count if word_count > 0 else 0.0,
                1 if utterance.get('initial_role', 'Interviewee') == 'Interviewer' else 0
            ])

            is_interviewer = _heuristic_is_interviewer(i, text, word_count, prev_is_question)
            # Label encoding: 0 = Interviewer, 1 = Interviewee.
            labels.append(0 if is_interviewer else 1)
            features.append(feat)

        if not features or not labels:
            logger.error("No features or labels generated for training")
            raise ValueError("No valid training data")

        scaler = StandardScaler()
        X = scaler.fit_transform(features)
        clf = RandomForestClassifier(
            n_estimators=150,
            max_depth=10,
            random_state=42,
            class_weight='balanced'
        )
        clf.fit(X, labels)

        joblib.dump(clf, os.path.join(OUTPUT_DIR, 'role_classifier.pkl'))
        joblib.dump(vectorizer, os.path.join(OUTPUT_DIR, 'text_vectorizer.pkl'))
        joblib.dump(scaler, os.path.join(OUTPUT_DIR, 'feature_scaler.pkl'))
        return clf, vectorizer, scaler
    except Exception as e:
        logger.error(f"Classifier training failed: {str(e)}")
        raise
|
|
|
|
|
def enforce_conversation_flow(results: List[Dict]) -> List[Dict]:
    """Adjust roles so they follow a question/answer conversation flow.

    Walking the utterances in order: whenever the preceding utterance is an
    'Interviewer' one, the current utterance becomes 'Interviewer' if its text
    ends with '?' and 'Interviewee' otherwise. Roles are rewritten in place,
    so an adjusted role influences the utterance after it.

    Args:
        results (List[Dict]): Utterances with 'role' and 'text'.

    Returns:
        List[Dict]: The same list object with updated roles. If an error
        occurs it is logged and the list is returned as modified so far.
    """
    try:
        # results[1:] holds the same dict objects, so in-place edits cascade.
        for previous, current in zip(results, results[1:]):
            if previous['role'] != 'Interviewer':
                continue
            if current['text'].endswith('?'):
                current['role'] = 'Interviewer'
            else:
                current['role'] = 'Interviewee'
        return results
    except Exception as e:
        logger.error(f"Conversation flow enforcement failed: {str(e)}")
        return results
|
|
def classify_roles(utterances: List[Dict], clf=None, vectorizer=None, scaler=None) -> List[Dict]:
    """Classify roles using CRF if available, else RandomForest.

    When OUTPUT_DIR/role_crf.model exists it is used to tag the whole
    utterance list; otherwise the supplied RandomForest classifier, TF-IDF
    vectorizer and scaler are used. Utterances with empty text get the role
    'Unknown'. Afterwards enforce_conversation_flow is applied, and if every
    utterance ended up as 'Interviewer' one utterance is relabelled
    'Interviewee' (the longest suitable candidate, else the longest overall).

    Args:
        utterances (List[Dict]): List of utterances.
        clf: Classifier model (RandomForest path only).
        vectorizer: Text vectorizer (RandomForest path only).
        scaler: Feature scaler (RandomForest path only).

    Returns:
        List[Dict]: Copies of the utterances with a 'role' key added.

    Raises:
        ValueError: If the RandomForest path is taken without a classifier,
            vectorizer and scaler, or if no utterance has usable text.
        Exception: Any other failure; everything is logged and re-raised.
    """
    # Tuples so str.startswith can test all prefixes in one call.
    open_question_starters = (
        'tell me', 'describe', 'walk me through', 'explain', 'share',
        'give me', 'talk about', 'discuss'
    )
    exploratory_question_indicators = (
        'can i ask', 'what about', 'could you tell', 'is it', 'are there',
        'what’s the', 'how does', 'may i'
    )
    prosodic_keys = (
        'duration', 'mean_pitch', 'min_pitch', 'max_pitch', 'pitch_sd',
        'intensityMean', 'intensityMin', 'intensityMax', 'intensitySD',
        'silence_ratio'
    )
    crf_model_path = os.path.join(OUTPUT_DIR, 'role_crf.model')

    def _previous_is_question(position: int) -> bool:
        # Uses the raw text of the preceding utterance, whatever its content.
        return position > 0 and utterances[position - 1].get('text', '').endswith('?')

    try:
        if os.path.exists(crf_model_path):
            tagger = pycrfsuite.Tagger()
            tagger.open(crf_model_path)
            sequence = []
            has_text = []
            for i, u in enumerate(utterances):
                text = u.get('text', '').lower().strip()
                has_text.append(bool(text))
                if not text:
                    # Placeholder item keeps the sequence aligned with
                    # `utterances`; its role is set to 'Unknown' below.
                    sequence.append({'text': '', 'role': 'Unknown'})
                    continue
                prosodic = u.get('prosodic_features', {})
                sequence.append({
                    'text': text,
                    'is_question': str(text.endswith('?')),
                    'word_count': str(len(text.split())),
                    'starts_with_tell': str(text.startswith(open_question_starters)),
                    'prev_is_question': str(_previous_is_question(i)),
                    'sentiment': str(get_sentiment_score(text)),
                    'silence_ratio': str(prosodic.get('silence_ratio', 0.0))
                })
            roles = tagger.tag(sequence)
            # Empty utterances are reported as 'Unknown', matching the
            # RandomForest path, instead of receiving a predicted role.
            results = [
                {**u, 'role': role if spoken else 'Unknown'}
                for u, role, spoken in zip(utterances, roles, has_text)
            ]
        else:
            if clf is None or vectorizer is None or scaler is None:
                logger.error("No CRF model found and no classifier/vectorizer/scaler supplied")
                raise ValueError("No role classifier available for classification")
            texts = [u['text'] for u in utterances if u.get('text', '').strip()]
            if not texts:
                logger.error("No valid texts found for role classification")
                raise ValueError("Empty text data for classification")
            X_text = vectorizer.transform(texts)

            debug_enabled = logger.isEnabledFor(logging.DEBUG)
            results = []
            speaker_counts = {}  # running utterance count per speaker, empty ones included
            row = 0              # row of X_text; advances only for non-empty utterances
            for i, utterance in enumerate(utterances):
                speaker = utterance.get('speaker_id')
                speaker_counts[speaker] = speaker_counts.get(speaker, 0) + 1

                text = utterance.get('text', '').lower().strip()
                if not text:
                    results.append({**utterance, 'role': 'Unknown'})
                    continue
                prosodic = utterance.get('prosodic_features', {})
                word_count = len(text.split())
                is_question = text.endswith('?')
                prev_is_question = _previous_is_question(i)
                # Share of the utterances so far that belong to this speaker.
                speaker_frequency = speaker_counts[speaker] / (i + 1)
                sentiment_score = get_sentiment_score(text)

                feat = [prosodic.get(key, 0.0) for key in prosodic_keys]
                feat.extend([
                    word_count,
                    speaker_frequency,
                    int(prev_is_question),
                    sentiment_score
                ])
                # X_text has one row per non-empty text, so it is indexed by
                # `row`, not by the utterance position `i`.
                feat.extend(X_text[row].toarray()[0].tolist())
                row += 1
                feat.extend(get_text_embedding(text))
                doc = nlp(text)
                feat.extend([
                    int(is_question),
                    len(re.findall(r'\b(why|how|what|when|where|who|which)\b', text)),
                    sum(1 for token in doc if token.pos_ == 'VERB'),
                    sum(1 for token in doc if token.pos_ == 'NOUN'),
                    int(i < 2),
                    int(text.startswith(open_question_starters)),
                    int(text.startswith(exploratory_question_indicators)),
                    prosodic.get('duration', 0.0) / word_count if word_count > 0 else 0.0,
                    1 if utterance.get('initial_role', '') == 'Interviewer' else 0
                ])

                X = scaler.transform([feat])
                # Label encoding from training: 0 = Interviewer, 1 = Interviewee.
                role = 'Interviewer' if clf.predict(X)[0] == 0 else 'Interviewee'
                if debug_enabled:
                    # Probabilities are only needed for the debug log line.
                    prob = clf.predict_proba(X)[0]
                    logger.debug(f"Utterance {i}: Text='{text[:20]}...', Role={role}, Prob={prob}")
                results.append({**utterance, 'role': role})

        # NOTE(review): applied after both branches; the original indentation
        # was lost, so confirm the CRF path is meant to be post-processed too.
        results = enforce_conversation_flow(results)

        if results and all(r['role'] == 'Interviewer' for r in results):
            logger.warning("No Interviewee detected. Applying enhanced reasoning.")

            def _duration(position):
                return results[position].get('prosodic_features', {}).get('duration', 0.0)

            def _is_candidate(entry):
                entry_text = entry.get('text', '')
                lowered = entry_text.lower()
                return bool(
                    (
                        not entry_text.endswith('?') or
                        lowered.startswith(exploratory_question_indicators)
                    ) and
                    get_sentiment_score(entry_text) >= 0 and
                    re.search(r'\b(i|my|me)\b', lowered)
                )

            candidates = [i for i, r in enumerate(results) if _is_candidate(r)]
            if not candidates:
                logger.warning("No suitable Interviewee candidate found. Using longest utterance.")
                candidates = range(len(results))
            max_duration_idx = max(candidates, key=_duration)
            results[max_duration_idx]['role'] = 'Interviewee'

        return results
    except Exception as e:
        logger.error(f"Role classification failed: {str(e)}")
        raise
|
|
|
|
|
def analyze_interviewee(audio_path: str, utterances: List[Dict]) -> Dict:
    """Analyze interviewee voice characteristics.

    Collects the audio of all utterances whose role is 'Interviewee' and
    derives speaking rate, filler-word ratio, bigram repetition, pitch
    (librosa.pyin, 80-300 Hz) and RMS intensity statistics, plus heuristic
    anxiety / confidence / hesitation scores and their coarse levels.

    Args:
        audio_path (str): Path to the audio file.
        utterances (List[Dict]): List of utterance data with 'role', 'text',
            'start' and 'end' (ms) and 'prosodic_features'.

    Returns:
        Dict: Keys 'speaking_rate', 'filler_ratio', 'repetition_score',
        'pitch_analysis', 'intensity', 'scores' and 'levels'; or
        {'error': message} when there is nothing to analyze or the analysis
        fails (failures are logged, not raised).
    """
    try:
        y, sr = librosa.load(audio_path, sr=16000)
        interviewee_utterances = [u for u in utterances if u.get('role') == 'Interviewee']
        if not interviewee_utterances:
            logger.warning("No interviewee utterances found for voice analysis.")
            return {'error': 'No interviewee utterances found'}

        segments = []
        for u in interviewee_utterances:
            # Utterance times are in milliseconds; convert to sample indices.
            start = int(u['start'] * sr / 1000)
            end = int(u['end'] * sr / 1000)
            if start < end and end <= len(y):
                segments.append(y[start:end])
        if not segments:
            logger.warning("No valid audio segments found for interviewee voice analysis.")
            return {'error': 'No valid audio segments found'}

        interviewee_texts = [u.get('text') or '' for u in interviewee_utterances]
        total_duration = sum(
            u.get('prosodic_features', {}).get('duration', 0) for u in interviewee_utterances
        )
        total_words = sum(len(text.split()) for text in interviewee_texts)
        speaking_rate = total_words / total_duration if total_duration > 0 else 0

        # Count fillers as whole words/phrases so that e.g. "also" or "number"
        # are not mistaken for "so" or "um".
        filler_words = ['um', 'uh', 'like', 'you know', 'so', 'i mean']
        filler_pattern = re.compile(
            r'\b(?:' + '|'.join(re.escape(fw) for fw in filler_words) + r')\b'
        )
        filler_count = sum(len(filler_pattern.findall(text.lower())) for text in interviewee_texts)
        filler_ratio = filler_count / total_words if total_words > 0 else 0

        # Repetition: share of distinct adjacent word pairs that occur more than once.
        all_words = ' '.join(text.lower() for text in interviewee_texts if text).split()
        word_pairs = {}
        for bigram_pair in zip(all_words, all_words[1:]):
            word_pairs[bigram_pair] = word_pairs.get(bigram_pair, 0) + 1
        repetition_score = (
            sum(1 for count in word_pairs.values() if count > 1) / len(word_pairs)
            if word_pairs else 0
        )

        pitches = []
        for segment in segments:
            f0 = librosa.pyin(segment, fmin=80, fmax=300, sr=sr)[0]
            pitches.extend(f0[~np.isnan(f0)])  # keep voiced frames only
        pitch_mean = np.mean(pitches) if pitches else 0
        pitch_std = np.std(pitches) if pitches else 0
        # Frame-to-frame variation needs at least two pitch samples.
        jitter = np.mean(np.abs(np.diff(pitches))) / pitch_mean if pitch_mean > 0 and len(pitches) > 1 else 0

        intensities = []
        for segment in segments:
            intensities.extend(librosa.feature.rms(y=segment)[0])
        intensity_mean = np.mean(intensities) if intensities else 0
        intensity_std = np.std(intensities) if intensities else 0
        shimmer = (
            np.mean(np.abs(np.diff(intensities))) / intensity_mean
            if len(intensities) > 1 and intensity_mean > 0 else 0
        )

        anxiety_score = 0.6 * (pitch_std / pitch_mean if pitch_mean > 0 else 0) + 0.4 * (jitter + shimmer)
        confidence_score = 0.7 * (1 / (1 + intensity_std)) + 0.3 * (1 / (1 + filler_ratio))
        hesitation_score = filler_ratio + repetition_score

        anxiety_level = 'high' if anxiety_score > 0.7 else 'moderate' if anxiety_score > 0.3 else 'low'
        confidence_level = 'high' if confidence_score > 0.7 else 'moderate' if confidence_score > 0.5 else 'low'
        if filler_ratio < 0.05 and repetition_score < 0.1:
            fluency_level = 'fluent'
        elif filler_ratio < 0.1 and repetition_score < 0.2:
            fluency_level = 'moderate'
        else:
            fluency_level = 'disfluent'

        return {
            'speaking_rate': float(round(speaking_rate, 2)),
            'filler_ratio': float(round(filler_ratio, 4)),
            'repetition_score': float(round(repetition_score, 4)),
            'pitch_analysis': {
                'mean': float(round(pitch_mean, 2)),
                'variance': float(round(pitch_std, 2)),
                'jitter': float(round(jitter, 4))
            },
            'intensity': {
                'mean': float(round(intensity_mean, 2)),
                'std_dev': float(round(intensity_std, 2)),
                'shimmer': float(round(shimmer, 4))
            },
            'scores': {
                'anxiety': float(round(anxiety_score, 4)),
                'confidence': float(round(confidence_score, 4)),
                'hesitation': float(round(hesitation_score, 4))
            },
            'levels': {
                'anxiety': anxiety_level,
                'confidence': confidence_level,
                'fluency': fluency_level
            }
        }
    except Exception as e:
        logger.error(f"Voice analysis failed: {str(e)}")
        return {'error': str(e)}
|
|
|
|
|
def generate_anxiety_confidence_chart(scores: Dict, output: io.BytesIO):
    """Render a two-bar chart of the anxiety and confidence scores as a PNG.

    Args:
        scores (Dict): Scores dictionary. The 'anxiety' and 'confidence'
            entries are plotted; missing or non-numeric entries count as 0.0.
        output (io.BytesIO): Buffer that receives the PNG bytes. It is rewound
            to position 0 afterwards so it can be read straight away.

    Raises:
        Exception: Any plotting error is logged and re-raised.
    """
    scores = scores or {}

    def _score(key: str) -> float:
        # A None / non-numeric score must not break the ':.2f' label below.
        try:
            return float(scores.get(key, 0.0))
        except (TypeError, ValueError):
            return 0.0

    fig = None
    try:
        labels = ['Anxiety', 'Confidence']
        values = [_score('anxiety'), _score('confidence')]

        fig, ax = plt.subplots(figsize=(4, 2.5))
        ax.bar(labels, values, color=['lightcoral', 'lightskyblue'])
        ax.set_ylabel('Score')
        ax.set_title('Anxiety vs. Confidence')
        # Keep the usual 0..1 axis, but grow it when a score (plus its label)
        # would otherwise be clipped.
        ax.set_ylim(0, max(1.0, max(values) + 0.15))
        for position, value in enumerate(values):
            ax.text(position, value + 0.05, f"{value:.2f}",
                    color='black', ha='center', fontweight='bold')

        # Operate on this figure explicitly instead of pyplot's implicit
        # "current figure".
        fig.tight_layout()
        fig.savefig(output, format='png')
        output.seek(0)
    except Exception as e:
        logger.error(f"Error generating chart: {str(e)}")
        raise
    finally:
        # Always release the figure, including when plotting failed.
        if fig is not None:
            plt.close(fig)
|
|
|
|
|
def generate_voice_interpretation(analysis: Dict) -> str:
    """Generate a textual interpretation of voice analysis.

    Args:
        analysis (Dict): Voice analysis results. Reads 'speaking_rate',
            'filler_ratio', 'repetition_score', and the nested 'scores' and
            'levels' dictionaries.

    Returns:
        str: Multi-line interpretation text, or "Voice analysis not available."
        when the analysis is empty or carries an 'error' entry.
    """
    # An empty/missing analysis would otherwise render as a summary full of
    # zeros and blank levels, which reads like a real (and terrible) result.
    if not analysis or 'error' in analysis:
        return "Voice analysis not available."

    scores = analysis.get('scores') or {}
    levels = analysis.get('levels') or {}

    def _num(value) -> float:
        # Tolerate None / non-numeric values instead of failing the format spec.
        try:
            return float(value)
        except (TypeError, ValueError):
            return 0.0

    def _level(name: str) -> str:
        return str(levels.get(name) or 'N/A').upper()

    interpretation_lines = [
        "Voice Analysis Summary:",
        f"- Speaking Rate: {_num(analysis.get('speaking_rate', 0.0)):.2f} words/sec",
        f"- Filler Words: {_num(analysis.get('filler_ratio', 0.0)) * 100:.1f}% of total words",
        f"- Repetition Score: {_num(analysis.get('repetition_score', 0.0)):.4f}",
        f"- Anxiety Level: {_level('anxiety')} (score: {_num(scores.get('anxiety', 0.0)):.4f})",
        f"- Confidence Level: {_level('confidence')} (score: {_num(scores.get('confidence', 0.0)):.4f})",
        f"- Fluency Level: {_level('fluency')}",
        "",
        "Detailed Interpretation:",
        "1. A higher speaking rate can indicate nervousness or enthusiasm.",
        "2. Filler words and repetitions may impact speech clarity and professionalism.",
        "3. Anxiety is measured through pitch variability and voice stability.",
        "4. Confidence is assessed via vocal intensity and consistency.",
        # The fluency level is derived from filler ratio and repetition score,
        # not from the speaking rate.
        "5. Fluency combines filler word usage and repetition.",
    ]
    return "\n".join(interpretation_lines)
|
|
|
|
|
def calculate_acceptance_probability(segments: Dict) -> float:
    """Calculate the probability of candidate acceptance.

    The result is a weighted sum of component scores, each kept in [0, 1],
    divided by the sum of the absolute weights.

    Args:
        segments (Dict): Analysis data. 'voice' holds the voice analysis
            ('scores', 'levels', 'speaking_rate', 'filler_ratio',
            'repetition_score'). 'content' holds either a 'duration' entry or
            a nested 'text_analysis' -> 'total_duration' entry.

    Returns:
        float: Acceptance probability as a percentage (0.0 - 100.0, two
        decimals). 0.0 when the voice analysis carries an 'error' entry.
    """
    def _number(value) -> float:
        # None / non-numeric / NaN inputs count as 0.0 instead of raising.
        try:
            number = float(value)
        except (TypeError, ValueError):
            return 0.0
        return number if number == number else 0.0

    def _unit(value) -> float:
        return max(0.0, min(1.0, _number(value)))

    voice = segments.get('voice') or {}
    if 'error' in voice:
        return 0.0

    # Negative weights mark "lower is better" inputs; those inputs are inverted
    # below and then weighted by the absolute value.
    weights = {
        'confidence': 0.4,
        'anxiety': -0.3,
        'fluency': 0.2,
        'speaking_rate': 0.1,
        'filler_repetition': -0.1,
        'content_strengths': 0.2,
    }

    scores = voice.get('scores') or {}
    levels = voice.get('levels') or {}

    # The anxiety score is not bounded to 1 upstream; clamping keeps
    # (1 - anxiety) from turning into a penalty below zero.
    confidence_score = _unit(scores.get('confidence', 0.0))
    anxiety_score = _unit(scores.get('anxiety', 0.0))

    fluency_map = {'fluent': 1.0, 'moderate': 0.5, 'disfluent': 0.0}
    fluency_score = fluency_map.get(levels.get('fluency', 'disfluent'), 0.5)

    # Full credit at the ideal rate, falling linearly to zero at 0 or 2x ideal.
    ideal_speaking_rate = 2.0
    rate_deviation = abs(_number(voice.get('speaking_rate', 0.0)) - ideal_speaking_rate)
    speaking_rate_score = max(0.0, 1.0 - rate_deviation / ideal_speaking_rate)

    disfluency = _number(voice.get('filler_ratio', 0.0)) + _number(voice.get('repetition_score', 0.0))
    filler_repetition_score = max(0.0, min(1.0, 1.0 - disfluency / 2.0))

    # Callers pass the duration either directly or inside the full analysis
    # dictionary; accept both so every call site yields the same probability.
    content = segments.get('content') or {}
    duration = content.get('duration')
    if duration is None:
        duration = (content.get('text_analysis') or {}).get('total_duration', 0)
    # NOTE(review): content is only scored as present/absent here; the credit
    # given for present content (1.0) should be confirmed by the model owner.
    content_score = 1.0 if _number(duration) > 0 else 0.0

    raw_score = (
        confidence_score * weights['confidence']
        + (1.0 - anxiety_score) * abs(weights['anxiety'])
        + fluency_score * weights['fluency']
        + speaking_rate_score * weights['speaking_rate']
        + filler_repetition_score * abs(weights['filler_repetition'])
        + content_score * weights['content_strengths']
    )

    max_possible_score = sum(abs(w) for w in weights.values())
    normalized_score = raw_score / max_possible_score if max_possible_score > 0 else 0.5
    acceptance_score = max(0.0, min(1.0, normalized_score))
    return float(round(acceptance_score * 100, 2))
|
|
|
|
|
def generate_report(content: Dict, segments: List[Dict]) -> str:
    """Generate a professional interview analysis report.

    Builds a prompt from the voice analysis, the first five interviewee
    utterances and basic conversation statistics, then asks the module-level
    Gemini model to write the report.

    Args:
        content (Dict): Analysis data. Reads 'voice' and, when present,
            'acceptance_probability'.
        segments (List[Dict]): List of utterance segments ('speaker_id',
            'role', 'text', 'prosodic_features').

    Returns:
        str: Report text, or "Error: <message>" when generation fails.
    """
    try:
        voice_analysis = content.get('voice', {})
        voice_interpretation = generate_voice_interpretation(voice_analysis)

        interviewee_utterances = [
            f"Speaker {u.get('speaker_id', 'Unknown')}: {u.get('role')}: {u.get('text', '')}"
            for u in segments
            if u.get('role') == 'Interviewee'
        ][:5]

        # Reuse the probability already stored with the analysis so the report
        # always quotes the same figure as the rest of the output.
        acceptance_prob = content.get('acceptance_probability')
        if acceptance_prob is None:
            acceptance_prob = calculate_acceptance_probability({
                'voice': voice_analysis,
                'content': content
            })

        # acceptance_prob is a percentage (0-100), so thresholds are too.
        if acceptance_prob >= 80:
            verdict = "This indicates a very strong candidate."
        elif acceptance_prob >= 50:
            verdict = "This indicates a solid candidate with room for improvement."
        else:
            verdict = "This candidate may require significant improvement."
        acceptance_line = (
            f"**Estimated Acceptance Probability: {acceptance_prob:.2f}%**\n"
            f"{verdict}"
        )

        total_duration = sum(
            u.get('prosodic_features', {}).get('duration', 0)
            for u in segments
        )
        speaker_turns = len([u for u in segments if u.get('text', '').strip()])
        # Sorted + stringified: stable prompt text, and join() cannot fail on
        # non-string speaker ids.
        speakers = sorted({str(u['speaker_id']) for u in segments if u.get('speaker_id', '')})
        key_responses = "\n".join(f"- {resp}" for resp in interviewee_utterances)

        prompt = "\n".join([
            "As EvalBot, an AI interview analysis system, generate a professional report.",
            "Use clear headings and bullet points with '-'.",
            acceptance_line,
            "**1. Summary**",
            f"- Duration: {total_duration:.2f} seconds",
            f"- Speaker turns: {speaker_turns}",
            f"- Participants: {', '.join(speakers)}",
            "**2. Voice Insights**",
            voice_interpretation,
            "**3. Key Responses**",
            key_responses,
            "**4. Recommendations**",
            "- Focus on:",
            "  - Communication Skills",
            "  - Content Delivery",
            "  - Professional Presentation",
        ])

        response = gemini_model.generate_content(prompt)
        return response.text
    except Exception as e:
        logger.error(f"Report generation failed: {str(e)}")
        return f"Error: {str(e)}"
|
|
|
|
|
def _split_report_sections(text: str) -> Dict[str, List[str]]:
    """Group the report's lines under the '**N. Title**' heading they follow."""
    heading_patterns = [
        (re.compile(r'^\s*\*\*1.*Summary\*\*'), 'Summary'),
        (re.compile(r'^\s*\*\*2.*Voice.*\*\*'), 'Voice Insights'),
        (re.compile(r'^\s*\*\*3.*Content.*\*\*'), 'Content & Strengths'),
        # The report prompt in this file asks for a "Key Responses" heading.
        (re.compile(r'^\s*\*\*3.*Key Responses.*\*\*'), 'Key Responses'),
        (re.compile(r'^\s*\*\*4.*Recommendations.*\*\*'), 'Recommendations'),
    ]
    sections: Dict[str, List[str]] = {}
    current_section = None
    for line in (text or '').split('\n'):
        heading = next(
            (name for pattern, name in heading_patterns if pattern.match(line)),
            None,
        )
        if heading is not None:
            current_section = heading
            sections.setdefault(current_section, [])
        elif current_section is not None:
            # Lines before the first recognised heading are dropped.
            sections[current_section].append(line)
    return sections


def _to_paragraph_markup(line: str) -> str:
    """Escape XML-special characters and turn Markdown **bold** into <b> tags."""
    from xml.sax.saxutils import escape

    # Paragraph parses its text as XML-like markup, so raw '<' or '&' coming
    # from the transcript or the model would otherwise abort the whole PDF.
    return re.sub(r'\*\*(.+?)\*\*', r'<b>\1</b>', escape(line))


def create_pdf_document(text: str, content: Dict, output_path: str) -> bool:
    """Create a PDF report from analysis data.

    The PDF contains a title and date, the acceptance probability (when
    positive), the report text split into its numbered sections, and, when a
    voice analysis without an 'error' entry is available, a metrics table plus
    the anxiety/confidence chart.

    Args:
        text (str): Report text to include.
        content (Dict): Analysis data. Reads 'acceptance_probability' and
            'voice'.
        output_path (str): Path to save the PDF file.

    Returns:
        bool: True if PDF created successfully, False otherwise (the error is
        logged).
    """
    try:
        doc = SimpleDocTemplate(output_path, pagesize=letter)
        styles = getSampleStyleSheet()
        h1_style = ParagraphStyle(
            name='Heading1',
            parent=styles['Heading1'],
            fontSize=16,
            alignment=1,
            textColor=colors.Color(0, 0.2, 0.4)
        )
        h2_style = ParagraphStyle(
            name='Heading2',
            parent=styles['Heading2'],
            fontSize=12,
            spaceBefore=10,
            # NOTE(review): blue was left at its default of 0 in the original
            # call; confirm this green is the intended heading colour.
            textColor=colors.Color(0, 0.6, 0)
        )
        dark_grey = 68 / 255  # #444444
        h3_style = ParagraphStyle(
            name='Heading3',
            parent=styles['Heading3'],
            fontSize=10,
            spaceBefore=8,
            textColor=colors.Color(dark_grey, dark_grey, dark_grey)
        )
        body_style = ParagraphStyle(
            name='BodyText',
            parent=styles['Normal'],
            fontSize=10,
            leading=12
        )
        bullet_style = ParagraphStyle(
            name='Bullet',
            parent=styles['Normal'],
            fontSize=10,
            leftIndent=18,
            bulletIndent=10
        )

        story = []
        story.append(Paragraph("EvalBot Interview Analysis Report", h1_style))
        story.append(Spacer(1, 0.2 * inch))
        story.append(Paragraph(f"Date: {time.strftime('%Y-%m-%d')}", body_style))
        story.append(Spacer(1, 0.3 * inch))

        acceptance_prob = content.get('acceptance_probability') or 0.0
        if acceptance_prob > 0:
            story.append(Paragraph("Candidate Evaluation", h2_style))
            prob_color = 'green' if acceptance_prob >= 70 else 'orange' if acceptance_prob >= 40 else 'red'
            story.append(Paragraph(
                f'<font color="{prob_color}"><b>Acceptance Probability: {acceptance_prob:.2f}%</b></font>',
                ParagraphStyle(
                    name='Prob',
                    fontSize=14,
                    alignment=1
                )
            ))
            story.append(Spacer(1, 0.2 * inch))

        sections = _split_report_sections(text)
        for section_num, (section_name, section_content) in enumerate(sections.items(), 1):
            story.append(Paragraph(_to_paragraph_markup(f"{section_num}. {section_name}"), h2_style))
            story.append(Spacer(1, 0.1 * inch))
            for line in section_content:
                stripped = line.strip()
                if not stripped:
                    continue
                if stripped.startswith('-'):
                    story.append(Paragraph(
                        _to_paragraph_markup(stripped[1:].strip()),
                        bullet_style,
                        bulletText='\u2022'
                    ))
                else:
                    story.append(Paragraph(_to_paragraph_markup(stripped), body_style))
            story.append(Spacer(1, 0.2 * inch))

        voice_data = content.get('voice', {})
        if voice_data and 'error' not in voice_data:
            levels = voice_data.get('levels', {})
            scores = voice_data.get('scores', {})
            table_data = [
                ['Metric', 'Value', 'Interpretation'],
                ['Speaking Rate', f"{voice_data.get('speaking_rate', 0.0):.2f} words/sec", 'Average'],
                ['Filler Words', f"{voice_data.get('filler_ratio', 0.0) * 100:.1f}%", ''],
                ['Repetition', f"{voice_data.get('repetition_score', 0.0):.4f}", ''],
                ['Anxiety', levels.get('anxiety', '').upper(), f"Score: {scores.get('anxiety', 0.0):.4f}"],
                ['Confidence', levels.get('confidence', '').upper(), f"Score: {scores.get('confidence', 0.0):.4f}"]
            ]
            table = Table(table_data)
            table.setStyle(TableStyle([
                # Header row: (0, 0) .. (-1, 0).
                ('BACKGROUND', (0, 0), (-1, 0), colors.grey),
                ('TEXTCOLOR', (0, 0), (-1, 0), colors.white),
                ('ALIGN', (0, 0), (-1, -1), 'LEFT'),
                ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'),
                # GRID and LEADING take a plain number (line weight / leading).
                ('GRID', (0, 0), (-1, -1), 0.5, colors.black),
                ('LEADING', (0, 0), (-1, -1), 12)
            ]))
            story.append(table)
            story.append(Spacer(1, 0.2 * inch))

            # The chart is optional: a plotting failure must not cost the PDF.
            chart_buffer = io.BytesIO()
            try:
                generate_anxiety_confidence_chart(scores, chart_buffer)
                img = Image(chart_buffer, width=3 * inch, height=1.8 * inch)
                story.append(img)
                story.append(Spacer(1, 0.2 * inch))
            except Exception as e:
                logger.warning(f"Failed to generate chart image: {e}")

        doc.build(story)
        return True
    except Exception as e:
        logger.error(f"Failed to create PDF report: {str(e)}")
        return False
|
|
|
|
|
def _json_default(value):
    """Convert numpy scalars/arrays to plain Python values for json.dump."""
    if isinstance(value, np.generic):
        return value.item()
    if isinstance(value, np.ndarray):
        return value.tolist()
    raise TypeError(f"Object of type {type(value).__name__} is not JSON serializable")


def process_interview(audio_path: str) -> Dict:
    """Process an interview audio file.

    Runs the whole pipeline: WAV conversion, transcription, prosodic feature
    extraction, speaker identification, role assignment/classification, voice
    analysis, report generation, and PDF + JSON export into OUTPUT_DIR.

    Args:
        audio_path (str): Path to the audio file.

    Returns:
        Dict: 'success', 'message', 'pdf_path' and 'analysis_path'.

    Raises:
        Exception: Any pipeline failure is logged and re-raised; a failed PDF
            build raises RuntimeError.
    """
    wav_file = None
    try:
        logger.info(f"Starting processing for: {audio_path}")
        wav_file = convert_to_wav(audio_path)

        logger.info("Starting transcription process")
        transcript = transcribe(wav_file)

        logger.info("Extracting prosodic audio features")
        utterances = transcript.get('utterances', [])
        for utterance in utterances:
            utterance['prosodic_features'] = extract_prosodic_features(
                wav_file,
                utterance['start'],
                utterance['end']
            )

        logger.info("Identifying speakers in audio")
        results = identify_speakers(transcript, wav_file)

        logger.info("Grouping speakers by role assignment")
        speaker_roles = group_speakers_by_role(results)
        for result in results:
            result['role'] = speaker_roles.get(result['speaker_id'], 'Unknown')

        logger.info("Classifying speaker roles")
        clf, vectorizer, scaler = train_role_classifier(results)
        results = classify_roles(results, clf, vectorizer, scaler)

        logger.info("Analyzing interviewee voice characteristics")
        voice_analysis = analyze_interviewee(wav_file, results)

        logger.info("Generating analysis report")
        total_duration = sum(
            u.get('prosodic_features', {}).get('duration', 0.0)
            for u in results
        )
        speaker_turns = sum(1 for u in results if u.get('text', '').strip())
        speakers = list(set(u['speaker_id'] for u in results if u.get('speaker_id', '')))

        analysis_data = {
            'transcript': results,
            'voice': voice_analysis,
            'text_analysis': {
                'total_duration': float(total_duration),
                'speaker_turns': speaker_turns,
                'speakers': speakers
            },
            'acceptance_probability': calculate_acceptance_probability({
                'voice': voice_analysis,
                'content': {'duration': total_duration}
            })
        }

        report_text = generate_report(analysis_data, results)

        # One id for both artefacts so a report can be matched to its JSON.
        run_id = uuid.uuid4()
        pdf_path = os.path.join(OUTPUT_DIR, f"report_{run_id}.pdf")
        json_path = os.path.join(OUTPUT_DIR, f"analysis_{run_id}.json")

        logger.info("Creating PDF analysis report")
        if not create_pdf_document(report_text, analysis_data, pdf_path):
            raise RuntimeError("Failed to create PDF report")

        with open(json_path, 'w', encoding='utf-8') as f:
            # NOTE(review): the analysis may carry numpy scalars/arrays from the
            # feature extraction steps (not visible here); the default hook
            # converts those and still rejects anything else.
            json.dump(analysis_data, f, indent=2, default=_json_default)

        return {
            'success': True,
            'message': 'Interview processed successfully',
            'pdf_path': str(pdf_path),
            'analysis_path': str(json_path)
        }
    except Exception as e:
        logger.error(f"Processing failed for {audio_path}: {str(e)}", exc_info=True)
        raise
    finally:
        # NOTE(review): assumes the converted WAV is a temporary artefact that
        # nothing reads after this function returns — confirm against
        # convert_to_wav. The caller's own file is never removed.
        if wav_file and wav_file != audio_path and os.path.exists(wav_file):
            try:
                os.remove(wav_file)
            except OSError as cleanup_error:
                logger.warning(f"Could not remove temporary file {wav_file}: {cleanup_error}")
|
|
|
|
|
|
|
|
|
|
|
|