TheEnd / process_interview.py
norhan12's picture
Update process_interview.py
381f8b7 verified
import os
import torch
import numpy as np
import uuid
import requests
import time
import json
import re
import logging
import io
import subprocess
from contextlib import contextmanager
import tempfile
from typing import Dict, List
# Core AI & Audio Processing Libraries
from pydub import AudioSegment
from nemo.collections.asr.models import EncDecSpeakerLabelModel
from pinecone import Pinecone, ServerlessSpec
import librosa
import parselmouth
from parselmouth.praat import call
from transformers import AutoTokenizer, AutoModel
import spacy
import google.generativeai as genai
from sklearn.metrics.pairwise import cosine_similarity
# Reporting & Visualization
from reportlab.lib.pagesizes import letter
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Image, PageBreak
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.lib import colors
from reportlab.lib.enums import TA_CENTER, TA_JUSTIFY
from reportlab.lib.units import inch
import matplotlib.pyplot as plt
import matplotlib
matplotlib.use('Agg')
# Concurrency
from concurrent.futures import ThreadPoolExecutor
import joblib
# ==============================================================================
# 2. CONFIGURATION AND INITIALIZATION
# ==============================================================================
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
logging.getLogger("nemo_logging").setLevel(logging.ERROR)
logging.getLogger("nemo").setLevel(logging.ERROR)
logging.getLogger("transformers").setLevel(logging.ERROR)
OUTPUT_DIR = "./static/outputs"
JSON_DIR = os.path.join(OUTPUT_DIR, "json")
PDF_DIR = os.path.join(OUTPUT_DIR, "pdf")
os.makedirs(JSON_DIR, exist_ok=True)
os.makedirs(PDF_DIR, exist_ok=True)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
PINECONE_KEY = os.getenv("PINECONE_KEY")
ASSEMBLYAI_KEY = os.getenv("ASSEMBLYAI_KEY")
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
if not all([PINECONE_KEY, ASSEMBLYAI_KEY, GEMINI_API_KEY]):
raise ValueError("One or more required environment variables are missing.")
# Global variables for models and services
index, gemini_model, speaker_model, nlp, tokenizer, text_embedding_model = (None,) * 6
def initialize_all_services_and_models():
global index, gemini_model, speaker_model, nlp, tokenizer, text_embedding_model
logger.info("Initializing all services and loading all models...")
pc = Pinecone(api_key=PINECONE_KEY)
index_name = "interview-speaker-embeddings"
if index_name not in pc.list_indexes().names():
pc.create_index(name=index_name, dimension=192, metric="cosine",
spec=ServerlessSpec(cloud="aws", region="us-east-1"))
index = pc.Index(index_name)
genai.configure(api_key=GEMINI_API_KEY)
gemini_model = genai.GenerativeModel('gemini-1.5-flash')
speaker_model = EncDecSpeakerLabelModel.from_pretrained("nvidia/speakerverification_en_titanet_large",
map_location=device).eval()
nlp = spacy.load("en_core_web_sm")
tokenizer = AutoTokenizer.from_pretrained("distilbert-base-uncased")
text_embedding_model = AutoModel.from_pretrained("distilbert-base-uncased").to(device).eval()
logger.info("All services and models are ready.")
initialize_all_services_and_models()
# ==============================================================================
# 3. HELPER AND UTILITY FUNCTIONS
# ==============================================================================
@contextmanager
def temp_audio_file(suffix='.wav'):
temp_file_path = None
try:
fd, temp_file_path = tempfile.mkstemp(suffix=suffix)
os.close(fd)
yield temp_file_path
finally:
if temp_file_path and os.path.exists(temp_file_path):
os.remove(temp_file_path)
def convert_to_wav(input_path: str) -> str:
temp_wav_file = tempfile.NamedTemporaryFile(suffix='.wav', delete=False).name
try:
command = ['ffmpeg', '-y', '-i', input_path, '-vn', '-acodec', 'pcm_s16le', '-ar', '16000', '-ac', '1',
temp_wav_file]
subprocess.run(command, check=True, capture_output=True, text=True)
return temp_wav_file
except Exception as e:
if os.path.exists(temp_wav_file):
os.remove(temp_wav_file)
logger.error(f"Audio conversion failed: {e}", exc_info=True)
raise
def transcribe(audio_path: str) -> Dict:
try:
headers = {"authorization": ASSEMBLYAI_KEY}
with open(audio_path, 'rb') as f:
upload_response = requests.post("https://api.assemblyai.com/v2/upload", headers=headers, data=f)
upload_response.raise_for_status()
audio_url = upload_response.json()['upload_url']
transcript_response = requests.post("https://api.assemblyai.com/v2/transcript", headers=headers,
json={"audio_url": audio_url, "speaker_labels": True,
"filter_profanity": True})
transcript_response.raise_for_status()
transcript_id = transcript_response.json()['id']
logger.info(f"Transcription submitted. Polling for results (ID: {transcript_id})...")
while True:
result = requests.get(f"https://api.assemblyai.com/v2/transcript/{transcript_id}", headers=headers).json()
if result['status'] == 'completed':
return result
if result['status'] == 'error':
raise Exception(f"Transcription failed: {result['error']}")
time.sleep(5)
except Exception as e:
logger.error(f"Transcription failed: {e}", exc_info=True)
raise
def identify_speakers(transcript: Dict, wav_file_path: str) -> List[Dict]:
try:
full_audio = AudioSegment.from_wav(wav_file_path)
def process_utterance(utterance):
start_ms, end_ms = utterance['start'], utterance['end']
if end_ms - start_ms < 1000: # Skip short utterances
return {**utterance, 'speaker_id': 'unknown_short_utterance'}
with temp_audio_file() as temp_path:
full_audio[start_ms:end_ms].export(temp_path, format="wav")
with torch.no_grad():
embedding = speaker_model.get_embedding(temp_path).cpu().numpy().flatten().tolist()
query_result = index.query(vector=embedding, top_k=1, include_metadata=True)
if query_result.get('matches') and query_result['matches'][0]['score'] > 0.75:
match = query_result['matches'][0]
return {**utterance, 'speaker_id': match['id'],
'speaker_name': match['metadata'].get('speaker_name', 'Unknown Speaker')}
else:
speaker_id = f"speaker_{uuid.uuid4().hex[:8]}"
vector_count = index.describe_index_stats()['namespaces'].get('default', {}).get('vector_count', 0)
speaker_name = f"Speaker {vector_count + 1 if vector_count >= 0 else 1}"
index.upsert(vectors=[(speaker_id, embedding, {"speaker_name": speaker_name})])
return {**utterance, 'speaker_id': speaker_id, 'speaker_name': speaker_name}
with ThreadPoolExecutor() as executor:
return list(executor.map(process_utterance, transcript.get('utterances', [])))
except Exception as e:
logger.error(f"Speaker identification failed: {e}", exc_info=True)
raise
def get_text_embedding(text: str) -> np.ndarray:
with torch.no_grad():
inputs = tokenizer(text, return_tensors="pt", truncation=True, max_length=128, padding=True).to(device)
outputs = text_embedding_model(**inputs)
return outputs.last_hidden_state[0, 0, :].cpu().numpy()
def extract_detailed_prosodic_features(audio_segment: AudioSegment) -> Dict:
try:
with temp_audio_file() as temp_path:
audio_segment.export(temp_path, format="wav")
y, sr = librosa.load(temp_path, sr=16000)
if len(y) == 0:
return {'pitch_std': 0}
f0, _, _ = librosa.pyin(y, fmin=80, fmax=400, sr=sr)
f0_values = f0[~np.isnan(f0)]
return {'pitch_std': float(np.std(f0_values)) if len(f0_values) > 1 else 0}
except Exception:
return {'pitch_std': 0}
def extract_duration_feature(utterances: List[Dict]) -> List[Dict]:
for u in utterances:
u['prosodic_features'] = {'duration': (u['end'] - u['start']) / 1000.0}
return utterances
def convert_to_serializable(obj):
if isinstance(obj, (np.integer, np.floating)):
return obj.item()
if isinstance(obj, np.ndarray):
return obj.tolist()
if isinstance(obj, bytes):
import base64
return base64.b64encode(obj).decode('utf-8')
if isinstance(obj, dict):
return {k: convert_to_serializable(v) for k, v in obj.items()}
if isinstance(obj, list):
return [convert_to_serializable(item) for item in obj]
return obj
# ==============================================================================
# 4. CORE LOGIC - ULTIMATE ROLE CLASSIFIER
# ==============================================================================
def classify_roles_ultimate(utterances: List[Dict], audio_path: str) -> List[Dict]:
logger.info("Starting ULTIMATE role classification with prosodic analysis...")
full_audio = AudioSegment.from_wav(audio_path)
speakers = {u['speaker_id'] for u in utterances if 'speaker_id' in u and not u['speaker_id'].startswith('unknown')}
if len(speakers) < 2:
return utterances
speaker_data = {sid: {'rule_score': 0, 'prosodic_score': 0, 'utterance_count': 0, 'embeddings': []} for sid in speakers}
interviewer_keywords = r'\b(what|why|how|when|where|who|which|tell me about|can you explain|describe|give me an example)\b'
for u in utterances:
sid, text = u.get('speaker_id'), u.get('text', '').lower()
if sid not in speaker_data or not text or sid.startswith('unknown'):
continue
rule_score = 10 if text.endswith('?') else 0
rule_score += 5 * len(re.findall(interviewer_keywords, text))
rule_score += 2 if len(text.split()) < 10 else -5 if len(text.split()) > 30 else 0
speaker_data[sid]['rule_score'] += rule_score
segment = full_audio[u['start']:u['end']]
prosodic_features = extract_detailed_prosodic_features(segment)
speaker_data[sid]['prosodic_score'] += -5 if prosodic_features['pitch_std'] > 40 else 2
speaker_data[sid]['embeddings'].append(get_text_embedding(u['text']))
speaker_data[sid]['utterance_count'] += 1
canonical_question_embedding = get_text_embedding("Tell me about your experience and skills.")
for sid, data in speaker_data.items():
if not data['embeddings']:
data['semantic_score'] = 0
continue
avg_embedding = np.mean(data['embeddings'], axis=0).reshape(1, -1)
data['semantic_score'] = cosine_similarity(avg_embedding, canonical_question_embedding.reshape(1, -1))[0][0]
final_scores = {}
for sid, data in speaker_data.items():
if data['utterance_count'] == 0:
final_scores[sid] = -999
continue
avg_rule_score = data['rule_score'] / data['utterance_count']
avg_prosodic_score = data['prosodic_score'] / data['utterance_count']
final_scores[sid] = (avg_rule_score * 0.5) + (data['semantic_score'] * 0.3) + (avg_prosodic_score * 0.2)
sorted_speakers = sorted(final_scores.items(), key=lambda item: item[1], reverse=True)
interviewer_id, interviewee_id = sorted_speakers[0][0], sorted_speakers[1][0]
logger.info(f"Ultimate Role Classification: Interviewer -> {interviewer_id}, Interviewee -> {interviewee_id}")
for u in utterances:
u['role'] = 'Interviewer' if u.get('speaker_id') == interviewer_id else 'Interviewee' if u.get('speaker_id') == interviewee_id else 'Unknown'
return utterances
# ==============================================================================
# 5. YOUR CUSTOM ANALYSIS & REPORTING FUNCTIONS
# ==============================================================================
def analyze_interviewee_voice(audio_path: str, utterances: List[Dict]) -> Dict:
logger.info("Performing detailed voice analysis using your custom function...")
try:
y, sr = librosa.load(audio_path, sr=16000)
interviewee_utterances = [u for u in utterances if u.get('role') == 'Interviewee' and not u['speaker_id'].startswith('unknown')]
if not interviewee_utterances:
return {'error': 'No valid interviewee utterances found'}
segments = [y[int(u['start'] * sr / 1000):int(u['end'] * sr / 1000)] for u in interviewee_utterances]
if not segments:
return {'error': 'No valid interviewee segments to analyze.'}
combined_audio = np.concatenate(segments)
total_duration = sum(u['prosodic_features']['duration'] for u in interviewee_utterances)
total_words = sum(len(u['text'].split()) for u in interviewee_utterances)
speaking_rate = total_words / total_duration if total_duration > 0 else 0
filler_words = ['um', 'uh', 'like', 'you know', 'so', 'i mean']
filler_count = sum(sum(u['text'].lower().count(fw) for fw in filler_words) for u in interviewee_utterances)
filler_ratio = filler_count / total_words if total_words > 0 else 0
all_words = ' '.join(u['text'].lower() for u in interviewee_utterances).split()
word_counts = {tuple(all_words[i:i + 2]): all_words.count(tuple(all_words[i:i + 2])) for i in range(len(all_words) - 1)}
repetition_score = sum(1 for count in word_counts.values() if count > 1) / len(word_counts) if word_counts else 0
f0, voiced_flag, _ = librosa.pyin(combined_audio, fmin=80, fmax=300, sr=sr)
f0_values = f0[voiced_flag & ~np.isnan(f0)]
pitch_mean = np.mean(f0_values) if len(f0_values) > 0 else 0
pitch_std = np.std(f0_values) if len(f0_values) > 0 else 0
jitter = np.mean(np.abs(np.diff(f0_values))) / pitch_mean if len(f0_values) > 1 and pitch_mean > 0 else 0
rms = librosa.feature.rms(y=combined_audio)[0]
intensity_mean = np.mean(rms) if len(rms) > 0 else 0
intensity_std = np.std(rms) if len(rms) > 0 else 0
shimmer = np.mean(np.abs(np.diff(rms))) / intensity_mean if len(rms) > 1 and intensity_mean > 0 else 0
anxiety_score = 0.6 * (pitch_std / pitch_mean if pitch_mean > 0 else 0) + 0.4 * (jitter + shimmer)
confidence_score = 0.7 * (1 / (1 + intensity_std)) + 0.3 * (1 / (1 + filler_ratio))
hesitation_score = filler_ratio + repetition_score
return {'speaking_rate': round(speaking_rate, 2), 'filler_ratio': round(filler_ratio, 4),
'repetition_score': round(repetition_score, 4),
'pitch_analysis': {'mean': float(pitch_mean), 'std_dev': float(pitch_std), 'jitter': float(jitter)},
'intensity_analysis': {'mean': float(intensity_mean), 'std_dev': float(intensity_std),
'shimmer': float(shimmer)},
'composite_scores': {'anxiety': float(anxiety_score), 'confidence': float(confidence_score),
'hesitation': float(hesitation_score)}}
except Exception as e:
logger.error(f"Error in detailed voice analysis: {e}", exc_info=True)
return {'error': str(e)}
def generate_voice_interpretation(analysis: Dict) -> str:
if 'error' in analysis:
return "<b>Detailed Vocal Metrics:</b><br/>Analysis not available."
scores = analysis.get('composite_scores', {})
pitch = analysis.get('pitch_analysis', {})
intensity = analysis.get('intensity_analysis', {})
return (f"<b>Detailed Vocal Metrics Interpretation:</b><br/>"
f"- Speaking Rate: {analysis.get('speaking_rate', 0):.2f} words/sec<br/>"
f"- Filler Word Ratio: {analysis.get('filler_ratio', 0) * 100:.1f}%<br/>"
f"-----------------------------------<br/>"
f"- Pitch Mean: {pitch.get('mean', 0):.2f} Hz (Std Dev: {pitch.get('std_dev', 0):.2f})<br/>"
f"- Jitter (Vocal Stability): {pitch.get('jitter', 0):.4f}<br/>"
f"- Intensity (Loudness) Std Dev: {intensity.get('std_dev', 0):.4f}<br/>"
f"-----------------------------------<br/>"
f"- <b>Anxiety Score:</b> {scores.get('anxiety', 0):.3f}<br/>"
f"- <b>Confidence Score:</b> {scores.get('confidence', 0):.3f}<br/>"
f"- <b>Hesitation Score:</b> {scores.get('hesitation', 0):.3f}")
def generate_anxiety_confidence_chart(composite_scores: Dict, chart_path_or_buffer):
try:
labels = ['Anxiety', 'Confidence', 'Hesitation']
scores = [composite_scores.get(k.lower(), 0) for k in labels]
fig, ax = plt.subplots(figsize=(6, 4))
ax.bar(labels, scores, color=['#FF6B6B', '#4ECDC4', '#FFA500'], edgecolor='black', width=0.5)
ax.set_ylabel('Score')
ax.set_title('Candidate Vocal Dynamics')
ax.set_ylim(0, max(scores) * 1.2 if scores and max(scores) > 0 else 1)
for bar in ax.patches:
ax.text(bar.get_x() + bar.get_width() / 2, bar.get_height() + 0.01,
f"{bar.get_height():.2f}", ha='center', color='black')
plt.tight_layout()
plt.savefig(chart_path_or_buffer, format='png', dpi=150)
plt.close(fig)
except Exception as e:
logger.error(f"Error generating chart: {e}")
def calculate_acceptance_probability(analysis_data: Dict) -> float:
logger.info("Calculating final acceptance probability...")
voice_metrics = analysis_data.get('voice_analysis_metrics', {})
if 'error' in voice_metrics or not voice_metrics.get('composite_scores'):
return 30.0
scores = voice_metrics['composite_scores']
confidence = scores.get('confidence', 0.5)
anxiety = scores.get('anxiety', 0.5)
hesitation = scores.get('hesitation', 0.5)
raw_score = (confidence * 0.6) + ((1 - anxiety) * 0.2) + ((1 - hesitation) * 0.2)
max_score = 0.6 + 0.2 + 0.2
return round(max(10.0, min(99.0, (raw_score / max_score if max_score > 0 else 0) * 100)), 2)
# ==============================================================================
# 6. AI-POWERED NARRATIVE AND PDF REPORTING
# ==============================================================================
def generate_gemini_report_text(analysis_data: Dict) -> str:
logger.info("Generating AI-powered narrative report with Gemini...")
voice = analysis_data.get('voice_analysis_metrics', {})
interviewee_text = "\n".join([f"- {u['text']}" for u in analysis_data['transcript_with_roles'] if u.get('role') == 'Interviewee'])
acceptance_prob = analysis_data.get('acceptance_probability', 50.0)
def format_value(val):
return f"{val:.2f}" if isinstance(val, (int, float)) else val
confidence = voice.get('composite_scores', {}).get('confidence', 'N/A')
anxiety = voice.get('composite_scores', {}).get('anxiety', 'N/A')
speaking_rate = voice.get('speaking_rate', 'N/A')
prompt = f"""
You are EvalBot, a highly experienced senior HR analyst generating a comprehensive interview evaluation report.
Analyze deeply based on actual responses provided below. Avoid generic analysis.
Maintain professional, HR-standard language with clear structure and bullet points.
**Suitability Score: {format_value(acceptance_prob)}%**
### Interviewee Full Responses:
{interviewee_text if interviewee_text else "No responses recorded."}
### Key Metrics:
- Confidence Score: {format_value(confidence)}
- Anxiety Score: {format_value(anxiety)}
- Speaking Rate: {format_value(speaking_rate)} words/sec
### Report Sections to Generate (Follow this structure exactly):
**1. Executive Summary:**
- 3 bullets summarizing performance, key strengths, and hiring recommendation.
**2. Communication and Vocal Dynamics:**
- Analyze delivery: speaking rate, filler words, confidence, anxiety. Provide 3-4 insightful bullets and 1 actionable recommendation.
**3. Competency and Content:**
- Identify 5-8 strengths (e.g., leadership, teamwork) with concrete examples from their responses.
- Identify 5-10 weaknesses or development areas with actionable feedback.
**4. Role Fit and Potential:**
- Analyze role fit, cultural fit, and growth potential in 3 bullets.
**5. Recommendations & Next Steps for Hiring Managers:**
- Provide 5 actionable recommendations and 5 clear next steps.
"""
try:
response = gemini_model.generate_content(prompt)
return response.text
except Exception as e:
logger.error(f"Gemini report generation failed: {e}")
return "Error: Could not generate AI analysis report."
def create_pdf_report(analysis_data: Dict, output_path: str):
logger.info(f"Generating comprehensive PDF report at {output_path}...")
doc = SimpleDocTemplate(output_path, pagesize=letter, topMargin=inch, bottomMargin=inch)
styles = getSampleStyleSheet()
styles.add(ParagraphStyle(name='H1', fontSize=18, leading=22, spaceAfter=12, textColor=colors.HexColor('#003087'),
fontName='Helvetica-Bold', alignment=TA_CENTER))
styles.add(ParagraphStyle(name='H2', fontSize=14, leading=18, spaceBefore=12, spaceAfter=8,
textColor=colors.HexColor('#0050BC'), fontName='Helvetica-Bold'))
styles.add(ParagraphStyle(name='H3', fontSize=12, leading=16, spaceBefore=10, spaceAfter=6,
textColor=colors.HexColor('#333333'), fontName='Helvetica-Bold'))
styles.add(ParagraphStyle(name='Body', fontSize=10, leading=14, spaceAfter=6, alignment=TA_JUSTIFY,
leftIndent=10))
story = []
try:
# Cover Page
story.append(Paragraph("Candidate Interview Analysis Report", styles['H1']))
story.append(Spacer(1, 0.2 * inch))
story.append(Paragraph(f"Candidate ID: {analysis_data.get('user_id', 'N/A')}", styles['Body']))
story.append(Paragraph(f"Date of Analysis: {time.strftime('%B %d, %Y')}", styles['Body']))
prob = analysis_data.get('acceptance_probability', 0)
prob_color = 'green' if prob >= 75 else 'orange' if prob >= 50 else 'red'
story.append(Paragraph(f"<b>Overall Suitability Score:</b> <font size=16 color='{prob_color}'>{prob}%</font>", styles['H2']))
story.append(PageBreak())
# Quantitative Analysis Page
story.append(Paragraph("Quantitative Vocal Analysis", styles['H2']))
if analysis_data.get('chart_image_bytes'):
logger.debug("Adding chart image to PDF")
img_buffer = io.BytesIO(analysis_data['chart_image_bytes'])
story.append(Image(img_buffer, width=5.5 * inch, height=3.3 * inch))
else:
story.append(Paragraph("No chart data available.", styles['Body']))
story.append(Spacer(1, 0.2 * inch))
voice_text = analysis_data.get('voice_interpretation_text', 'Not available.').replace('\n', '<br/>')
story.append(Paragraph(voice_text, styles['Body']))
story.append(Spacer(1, 0.2 * inch))
# AI-Generated Narrative Page
story.append(Paragraph("Qualitative AI-Powered Report", styles['H2']))
gemini_text = analysis_data.get('gemini_report_text', 'Not available.')
for line in gemini_text.split('\n'):
line = line.strip()
if not line:
continue
if line.startswith('**') and line.endswith('**'):
story.append(Paragraph(line.strip('*'), styles['H3']))
elif line.startswith('- ') or line.startswith('* '):
story.append(Paragraph(f"• {line[2:]}", styles['Body']))
else:
story.append(Paragraph(line, styles['Body']))
doc.build(story)
logger.info("PDF report generated successfully.")
except Exception as e:
logger.error(f"Error generating PDF: {e}", exc_info=True)
raise
# ==============================================================================
# 7. MAIN PROCESSING PIPELINE
# ==============================================================================
def process_interview(audio_path: str, user_id: str = "candidate-123") -> Dict:
try:
logger.info(f"Starting processing for {audio_path} (User ID: {user_id})")
wav_file = convert_to_wav(audio_path)
logger.debug(f"Created WAV file: {wav_file}")
logger.info("Starting transcription")
transcript = transcribe(wav_file)
if not transcript or 'utterances' not in transcript or not transcript['utterances']:
logger.error("Transcription failed or returned empty utterances")
raise ValueError("Transcription failed or returned empty utterances")
logger.info("Extracting prosodic features")
full_audio = AudioSegment.from_wav(wav_file)
for utterance in transcript['utterances']:
segment = full_audio[utterance['start']:utterance['end']]
utterance['prosodic_features'] = extract_detailed_prosodic_features(segment)
logger.info("Identifying speakers")
utterances_with_speakers = identify_speakers(transcript, wav_file)
logger.info("Extracting duration features")
utterances_with_duration = extract_duration_feature(utterances_with_speakers)
logger.info("Classifying roles")
classified_utterances = classify_roles_ultimate(utterances_with_duration, wav_file)
logger.info("Analyzing interviewee voice")
voice_analysis = analyze_interviewee_voice(wav_file, classified_utterances)
logger.info("Generating chart and voice interpretation")
voice_interpretation = generate_voice_interpretation(voice_analysis)
chart_buffer = io.BytesIO()
generate_anxiety_confidence_chart(voice_analysis.get('composite_scores', {}), chart_buffer)
chart_buffer.seek(0)
analysis_data = {
'user_id': user_id,
'transcript_with_roles': classified_utterances,
'voice_analysis_metrics': voice_analysis,
'speakers': list(set(u['speaker_id'] for u in classified_utterances)),
'text_analysis': {
'total_duration': sum(u['prosodic_features']['duration'] for u in classified_utterances),
'speaker_turns': len(classified_utterances)
},
'acceptance_probability': calculate_acceptance_probability({'voice_analysis_metrics': voice_analysis}),
'voice_interpretation_text': voice_interpretation,
'chart_image_bytes': chart_buffer.getvalue() if chart_buffer.tell() > 0 else None
}
logger.info("Generating report text using Gemini")
gemini_report_text = generate_gemini_report_text(analysis_data)
analysis_data['gemini_report_text'] = gemini_report_text
base_name = f"{user_id}_{uuid.uuid4().hex}"
pdf_path = os.path.join(PDF_DIR, f"{base_name}_report.pdf")
create_pdf_report(analysis_data, pdf_path)
json_path = os.path.join(JSON_DIR, f"{base_name}_analysis.json")
logger.debug(f"JSON path before write: {json_path}")
if os.path.exists(json_path):
logger.warning(f"JSON file {json_path} already exists, overwriting.")
with open(json_path, 'w') as f:
logger.debug(f"Writing to JSON file: {json_path}")
serializable_data = convert_to_serializable(analysis_data)
json.dump(serializable_data, f, indent=2)
os.remove(wav_file)
logger.info(f"Processing completed for {audio_path} (User ID: {user_id})")
return {
'summary': f"User ID: {user_id}\nspeakers: {', '.join(analysis_data['speakers'])}",
'json_path': json_path,
'pdf_path': pdf_path
}
except Exception as e:
logger.error(f"Processing failed: {str(e)}", exc_info=True)
if 'wav_file' in locals() and os.path.exists(wav_file):
os.remove(wav_file)
raise