import os import torch import numpy as np import uuid import requests import time import json import re import logging import io import subprocess from contextlib import contextmanager import tempfile from typing import Dict, List # Core AI & Audio Processing Libraries from pydub import AudioSegment from nemo.collections.asr.models import EncDecSpeakerLabelModel from pinecone import Pinecone, ServerlessSpec import librosa import parselmouth from parselmouth.praat import call from transformers import AutoTokenizer, AutoModel import spacy import google.generativeai as genai from sklearn.metrics.pairwise import cosine_similarity # Reporting & Visualization from reportlab.lib.pagesizes import letter from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Image, PageBreak from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle from reportlab.lib import colors from reportlab.lib.enums import TA_CENTER, TA_JUSTIFY from reportlab.lib.units import inch import matplotlib.pyplot as plt import matplotlib matplotlib.use('Agg') # Concurrency from concurrent.futures import ThreadPoolExecutor import joblib # ============================================================================== # 2. CONFIGURATION AND INITIALIZATION # ============================================================================== logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) logging.getLogger("nemo_logging").setLevel(logging.ERROR) logging.getLogger("nemo").setLevel(logging.ERROR) logging.getLogger("transformers").setLevel(logging.ERROR) OUTPUT_DIR = "./static/outputs" JSON_DIR = os.path.join(OUTPUT_DIR, "json") PDF_DIR = os.path.join(OUTPUT_DIR, "pdf") os.makedirs(JSON_DIR, exist_ok=True) os.makedirs(PDF_DIR, exist_ok=True) device = torch.device("cuda" if torch.cuda.is_available() else "cpu") PINECONE_KEY = os.getenv("PINECONE_KEY") ASSEMBLYAI_KEY = os.getenv("ASSEMBLYAI_KEY") GEMINI_API_KEY = os.getenv("GEMINI_API_KEY") if not all([PINECONE_KEY, ASSEMBLYAI_KEY, GEMINI_API_KEY]): raise ValueError("One or more required environment variables are missing.") # Global variables for models and services index, gemini_model, speaker_model, nlp, tokenizer, text_embedding_model = (None,) * 6 def initialize_all_services_and_models(): global index, gemini_model, speaker_model, nlp, tokenizer, text_embedding_model logger.info("Initializing all services and loading all models...") pc = Pinecone(api_key=PINECONE_KEY) index_name = "interview-speaker-embeddings" if index_name not in pc.list_indexes().names(): pc.create_index(name=index_name, dimension=192, metric="cosine", spec=ServerlessSpec(cloud="aws", region="us-east-1")) index = pc.Index(index_name) genai.configure(api_key=GEMINI_API_KEY) gemini_model = genai.GenerativeModel('gemini-1.5-flash') speaker_model = EncDecSpeakerLabelModel.from_pretrained("nvidia/speakerverification_en_titanet_large", map_location=device).eval() nlp = spacy.load("en_core_web_sm") tokenizer = AutoTokenizer.from_pretrained("distilbert-base-uncased") text_embedding_model = AutoModel.from_pretrained("distilbert-base-uncased").to(device).eval() logger.info("All services and models are ready.") initialize_all_services_and_models() # ============================================================================== # 3. HELPER AND UTILITY FUNCTIONS # ============================================================================== @contextmanager def temp_audio_file(suffix='.wav'): temp_file_path = None try: fd, temp_file_path = tempfile.mkstemp(suffix=suffix) os.close(fd) yield temp_file_path finally: if temp_file_path and os.path.exists(temp_file_path): os.remove(temp_file_path) def convert_to_wav(input_path: str) -> str: temp_wav_file = tempfile.NamedTemporaryFile(suffix='.wav', delete=False).name try: command = ['ffmpeg', '-y', '-i', input_path, '-vn', '-acodec', 'pcm_s16le', '-ar', '16000', '-ac', '1', temp_wav_file] subprocess.run(command, check=True, capture_output=True, text=True) return temp_wav_file except Exception as e: if os.path.exists(temp_wav_file): os.remove(temp_wav_file) logger.error(f"Audio conversion failed: {e}", exc_info=True) raise def transcribe(audio_path: str) -> Dict: try: headers = {"authorization": ASSEMBLYAI_KEY} with open(audio_path, 'rb') as f: upload_response = requests.post("https://api.assemblyai.com/v2/upload", headers=headers, data=f) upload_response.raise_for_status() audio_url = upload_response.json()['upload_url'] transcript_response = requests.post("https://api.assemblyai.com/v2/transcript", headers=headers, json={"audio_url": audio_url, "speaker_labels": True, "filter_profanity": True}) transcript_response.raise_for_status() transcript_id = transcript_response.json()['id'] logger.info(f"Transcription submitted. Polling for results (ID: {transcript_id})...") while True: result = requests.get(f"https://api.assemblyai.com/v2/transcript/{transcript_id}", headers=headers).json() if result['status'] == 'completed': return result if result['status'] == 'error': raise Exception(f"Transcription failed: {result['error']}") time.sleep(5) except Exception as e: logger.error(f"Transcription failed: {e}", exc_info=True) raise def identify_speakers(transcript: Dict, wav_file_path: str) -> List[Dict]: try: full_audio = AudioSegment.from_wav(wav_file_path) def process_utterance(utterance): start_ms, end_ms = utterance['start'], utterance['end'] if end_ms - start_ms < 1000: # Skip short utterances return {**utterance, 'speaker_id': 'unknown_short_utterance'} with temp_audio_file() as temp_path: full_audio[start_ms:end_ms].export(temp_path, format="wav") with torch.no_grad(): embedding = speaker_model.get_embedding(temp_path).cpu().numpy().flatten().tolist() query_result = index.query(vector=embedding, top_k=1, include_metadata=True) if query_result.get('matches') and query_result['matches'][0]['score'] > 0.75: match = query_result['matches'][0] return {**utterance, 'speaker_id': match['id'], 'speaker_name': match['metadata'].get('speaker_name', 'Unknown Speaker')} else: speaker_id = f"speaker_{uuid.uuid4().hex[:8]}" vector_count = index.describe_index_stats()['namespaces'].get('default', {}).get('vector_count', 0) speaker_name = f"Speaker {vector_count + 1 if vector_count >= 0 else 1}" index.upsert(vectors=[(speaker_id, embedding, {"speaker_name": speaker_name})]) return {**utterance, 'speaker_id': speaker_id, 'speaker_name': speaker_name} with ThreadPoolExecutor() as executor: return list(executor.map(process_utterance, transcript.get('utterances', []))) except Exception as e: logger.error(f"Speaker identification failed: {e}", exc_info=True) raise def get_text_embedding(text: str) -> np.ndarray: with torch.no_grad(): inputs = tokenizer(text, return_tensors="pt", truncation=True, max_length=128, padding=True).to(device) outputs = text_embedding_model(**inputs) return outputs.last_hidden_state[0, 0, :].cpu().numpy() def extract_detailed_prosodic_features(audio_segment: AudioSegment) -> Dict: try: with temp_audio_file() as temp_path: audio_segment.export(temp_path, format="wav") y, sr = librosa.load(temp_path, sr=16000) if len(y) == 0: return {'pitch_std': 0} f0, _, _ = librosa.pyin(y, fmin=80, fmax=400, sr=sr) f0_values = f0[~np.isnan(f0)] return {'pitch_std': float(np.std(f0_values)) if len(f0_values) > 1 else 0} except Exception: return {'pitch_std': 0} def extract_duration_feature(utterances: List[Dict]) -> List[Dict]: for u in utterances: u['prosodic_features'] = {'duration': (u['end'] - u['start']) / 1000.0} return utterances def convert_to_serializable(obj): if isinstance(obj, (np.integer, np.floating)): return obj.item() if isinstance(obj, np.ndarray): return obj.tolist() if isinstance(obj, bytes): import base64 return base64.b64encode(obj).decode('utf-8') if isinstance(obj, dict): return {k: convert_to_serializable(v) for k, v in obj.items()} if isinstance(obj, list): return [convert_to_serializable(item) for item in obj] return obj # ============================================================================== # 4. CORE LOGIC - ULTIMATE ROLE CLASSIFIER # ============================================================================== def classify_roles_ultimate(utterances: List[Dict], audio_path: str) -> List[Dict]: logger.info("Starting ULTIMATE role classification with prosodic analysis...") full_audio = AudioSegment.from_wav(audio_path) speakers = {u['speaker_id'] for u in utterances if 'speaker_id' in u and not u['speaker_id'].startswith('unknown')} if len(speakers) < 2: return utterances speaker_data = {sid: {'rule_score': 0, 'prosodic_score': 0, 'utterance_count': 0, 'embeddings': []} for sid in speakers} interviewer_keywords = r'\b(what|why|how|when|where|who|which|tell me about|can you explain|describe|give me an example)\b' for u in utterances: sid, text = u.get('speaker_id'), u.get('text', '').lower() if sid not in speaker_data or not text or sid.startswith('unknown'): continue rule_score = 10 if text.endswith('?') else 0 rule_score += 5 * len(re.findall(interviewer_keywords, text)) rule_score += 2 if len(text.split()) < 10 else -5 if len(text.split()) > 30 else 0 speaker_data[sid]['rule_score'] += rule_score segment = full_audio[u['start']:u['end']] prosodic_features = extract_detailed_prosodic_features(segment) speaker_data[sid]['prosodic_score'] += -5 if prosodic_features['pitch_std'] > 40 else 2 speaker_data[sid]['embeddings'].append(get_text_embedding(u['text'])) speaker_data[sid]['utterance_count'] += 1 canonical_question_embedding = get_text_embedding("Tell me about your experience and skills.") for sid, data in speaker_data.items(): if not data['embeddings']: data['semantic_score'] = 0 continue avg_embedding = np.mean(data['embeddings'], axis=0).reshape(1, -1) data['semantic_score'] = cosine_similarity(avg_embedding, canonical_question_embedding.reshape(1, -1))[0][0] final_scores = {} for sid, data in speaker_data.items(): if data['utterance_count'] == 0: final_scores[sid] = -999 continue avg_rule_score = data['rule_score'] / data['utterance_count'] avg_prosodic_score = data['prosodic_score'] / data['utterance_count'] final_scores[sid] = (avg_rule_score * 0.5) + (data['semantic_score'] * 0.3) + (avg_prosodic_score * 0.2) sorted_speakers = sorted(final_scores.items(), key=lambda item: item[1], reverse=True) interviewer_id, interviewee_id = sorted_speakers[0][0], sorted_speakers[1][0] logger.info(f"Ultimate Role Classification: Interviewer -> {interviewer_id}, Interviewee -> {interviewee_id}") for u in utterances: u['role'] = 'Interviewer' if u.get('speaker_id') == interviewer_id else 'Interviewee' if u.get('speaker_id') == interviewee_id else 'Unknown' return utterances # ============================================================================== # 5. YOUR CUSTOM ANALYSIS & REPORTING FUNCTIONS # ============================================================================== def analyze_interviewee_voice(audio_path: str, utterances: List[Dict]) -> Dict: logger.info("Performing detailed voice analysis using your custom function...") try: y, sr = librosa.load(audio_path, sr=16000) interviewee_utterances = [u for u in utterances if u.get('role') == 'Interviewee' and not u['speaker_id'].startswith('unknown')] if not interviewee_utterances: return {'error': 'No valid interviewee utterances found'} segments = [y[int(u['start'] * sr / 1000):int(u['end'] * sr / 1000)] for u in interviewee_utterances] if not segments: return {'error': 'No valid interviewee segments to analyze.'} combined_audio = np.concatenate(segments) total_duration = sum(u['prosodic_features']['duration'] for u in interviewee_utterances) total_words = sum(len(u['text'].split()) for u in interviewee_utterances) speaking_rate = total_words / total_duration if total_duration > 0 else 0 filler_words = ['um', 'uh', 'like', 'you know', 'so', 'i mean'] filler_count = sum(sum(u['text'].lower().count(fw) for fw in filler_words) for u in interviewee_utterances) filler_ratio = filler_count / total_words if total_words > 0 else 0 all_words = ' '.join(u['text'].lower() for u in interviewee_utterances).split() word_counts = {tuple(all_words[i:i + 2]): all_words.count(tuple(all_words[i:i + 2])) for i in range(len(all_words) - 1)} repetition_score = sum(1 for count in word_counts.values() if count > 1) / len(word_counts) if word_counts else 0 f0, voiced_flag, _ = librosa.pyin(combined_audio, fmin=80, fmax=300, sr=sr) f0_values = f0[voiced_flag & ~np.isnan(f0)] pitch_mean = np.mean(f0_values) if len(f0_values) > 0 else 0 pitch_std = np.std(f0_values) if len(f0_values) > 0 else 0 jitter = np.mean(np.abs(np.diff(f0_values))) / pitch_mean if len(f0_values) > 1 and pitch_mean > 0 else 0 rms = librosa.feature.rms(y=combined_audio)[0] intensity_mean = np.mean(rms) if len(rms) > 0 else 0 intensity_std = np.std(rms) if len(rms) > 0 else 0 shimmer = np.mean(np.abs(np.diff(rms))) / intensity_mean if len(rms) > 1 and intensity_mean > 0 else 0 anxiety_score = 0.6 * (pitch_std / pitch_mean if pitch_mean > 0 else 0) + 0.4 * (jitter + shimmer) confidence_score = 0.7 * (1 / (1 + intensity_std)) + 0.3 * (1 / (1 + filler_ratio)) hesitation_score = filler_ratio + repetition_score return {'speaking_rate': round(speaking_rate, 2), 'filler_ratio': round(filler_ratio, 4), 'repetition_score': round(repetition_score, 4), 'pitch_analysis': {'mean': float(pitch_mean), 'std_dev': float(pitch_std), 'jitter': float(jitter)}, 'intensity_analysis': {'mean': float(intensity_mean), 'std_dev': float(intensity_std), 'shimmer': float(shimmer)}, 'composite_scores': {'anxiety': float(anxiety_score), 'confidence': float(confidence_score), 'hesitation': float(hesitation_score)}} except Exception as e: logger.error(f"Error in detailed voice analysis: {e}", exc_info=True) return {'error': str(e)} def generate_voice_interpretation(analysis: Dict) -> str: if 'error' in analysis: return "Detailed Vocal Metrics:
Analysis not available." scores = analysis.get('composite_scores', {}) pitch = analysis.get('pitch_analysis', {}) intensity = analysis.get('intensity_analysis', {}) return (f"Detailed Vocal Metrics Interpretation:
" f"- Speaking Rate: {analysis.get('speaking_rate', 0):.2f} words/sec
" f"- Filler Word Ratio: {analysis.get('filler_ratio', 0) * 100:.1f}%
" f"-----------------------------------
" f"- Pitch Mean: {pitch.get('mean', 0):.2f} Hz (Std Dev: {pitch.get('std_dev', 0):.2f})
" f"- Jitter (Vocal Stability): {pitch.get('jitter', 0):.4f}
" f"- Intensity (Loudness) Std Dev: {intensity.get('std_dev', 0):.4f}
" f"-----------------------------------
" f"- Anxiety Score: {scores.get('anxiety', 0):.3f}
" f"- Confidence Score: {scores.get('confidence', 0):.3f}
" f"- Hesitation Score: {scores.get('hesitation', 0):.3f}") def generate_anxiety_confidence_chart(composite_scores: Dict, chart_path_or_buffer): try: labels = ['Anxiety', 'Confidence', 'Hesitation'] scores = [composite_scores.get(k.lower(), 0) for k in labels] fig, ax = plt.subplots(figsize=(6, 4)) ax.bar(labels, scores, color=['#FF6B6B', '#4ECDC4', '#FFA500'], edgecolor='black', width=0.5) ax.set_ylabel('Score') ax.set_title('Candidate Vocal Dynamics') ax.set_ylim(0, max(scores) * 1.2 if scores and max(scores) > 0 else 1) for bar in ax.patches: ax.text(bar.get_x() + bar.get_width() / 2, bar.get_height() + 0.01, f"{bar.get_height():.2f}", ha='center', color='black') plt.tight_layout() plt.savefig(chart_path_or_buffer, format='png', dpi=150) plt.close(fig) except Exception as e: logger.error(f"Error generating chart: {e}") def calculate_acceptance_probability(analysis_data: Dict) -> float: logger.info("Calculating final acceptance probability...") voice_metrics = analysis_data.get('voice_analysis_metrics', {}) if 'error' in voice_metrics or not voice_metrics.get('composite_scores'): return 30.0 scores = voice_metrics['composite_scores'] confidence = scores.get('confidence', 0.5) anxiety = scores.get('anxiety', 0.5) hesitation = scores.get('hesitation', 0.5) raw_score = (confidence * 0.6) + ((1 - anxiety) * 0.2) + ((1 - hesitation) * 0.2) max_score = 0.6 + 0.2 + 0.2 return round(max(10.0, min(99.0, (raw_score / max_score if max_score > 0 else 0) * 100)), 2) # ============================================================================== # 6. AI-POWERED NARRATIVE AND PDF REPORTING # ============================================================================== def generate_gemini_report_text(analysis_data: Dict) -> str: logger.info("Generating AI-powered narrative report with Gemini...") voice = analysis_data.get('voice_analysis_metrics', {}) interviewee_text = "\n".join([f"- {u['text']}" for u in analysis_data['transcript_with_roles'] if u.get('role') == 'Interviewee']) acceptance_prob = analysis_data.get('acceptance_probability', 50.0) def format_value(val): return f"{val:.2f}" if isinstance(val, (int, float)) else val confidence = voice.get('composite_scores', {}).get('confidence', 'N/A') anxiety = voice.get('composite_scores', {}).get('anxiety', 'N/A') speaking_rate = voice.get('speaking_rate', 'N/A') prompt = f""" You are EvalBot, a highly experienced senior HR analyst generating a comprehensive interview evaluation report. Analyze deeply based on actual responses provided below. Avoid generic analysis. Maintain professional, HR-standard language with clear structure and bullet points. **Suitability Score: {format_value(acceptance_prob)}%** ### Interviewee Full Responses: {interviewee_text if interviewee_text else "No responses recorded."} ### Key Metrics: - Confidence Score: {format_value(confidence)} - Anxiety Score: {format_value(anxiety)} - Speaking Rate: {format_value(speaking_rate)} words/sec ### Report Sections to Generate (Follow this structure exactly): **1. Executive Summary:** - 3 bullets summarizing performance, key strengths, and hiring recommendation. **2. Communication and Vocal Dynamics:** - Analyze delivery: speaking rate, filler words, confidence, anxiety. Provide 3-4 insightful bullets and 1 actionable recommendation. **3. Competency and Content:** - Identify 5-8 strengths (e.g., leadership, teamwork) with concrete examples from their responses. - Identify 5-10 weaknesses or development areas with actionable feedback. **4. Role Fit and Potential:** - Analyze role fit, cultural fit, and growth potential in 3 bullets. **5. Recommendations & Next Steps for Hiring Managers:** - Provide 5 actionable recommendations and 5 clear next steps. """ try: response = gemini_model.generate_content(prompt) return response.text except Exception as e: logger.error(f"Gemini report generation failed: {e}") return "Error: Could not generate AI analysis report." def create_pdf_report(analysis_data: Dict, output_path: str): logger.info(f"Generating comprehensive PDF report at {output_path}...") doc = SimpleDocTemplate(output_path, pagesize=letter, topMargin=inch, bottomMargin=inch) styles = getSampleStyleSheet() styles.add(ParagraphStyle(name='H1', fontSize=18, leading=22, spaceAfter=12, textColor=colors.HexColor('#003087'), fontName='Helvetica-Bold', alignment=TA_CENTER)) styles.add(ParagraphStyle(name='H2', fontSize=14, leading=18, spaceBefore=12, spaceAfter=8, textColor=colors.HexColor('#0050BC'), fontName='Helvetica-Bold')) styles.add(ParagraphStyle(name='H3', fontSize=12, leading=16, spaceBefore=10, spaceAfter=6, textColor=colors.HexColor('#333333'), fontName='Helvetica-Bold')) styles.add(ParagraphStyle(name='Body', fontSize=10, leading=14, spaceAfter=6, alignment=TA_JUSTIFY, leftIndent=10)) story = [] try: # Cover Page story.append(Paragraph("Candidate Interview Analysis Report", styles['H1'])) story.append(Spacer(1, 0.2 * inch)) story.append(Paragraph(f"Candidate ID: {analysis_data.get('user_id', 'N/A')}", styles['Body'])) story.append(Paragraph(f"Date of Analysis: {time.strftime('%B %d, %Y')}", styles['Body'])) prob = analysis_data.get('acceptance_probability', 0) prob_color = 'green' if prob >= 75 else 'orange' if prob >= 50 else 'red' story.append(Paragraph(f"Overall Suitability Score: {prob}%", styles['H2'])) story.append(PageBreak()) # Quantitative Analysis Page story.append(Paragraph("Quantitative Vocal Analysis", styles['H2'])) if analysis_data.get('chart_image_bytes'): logger.debug("Adding chart image to PDF") img_buffer = io.BytesIO(analysis_data['chart_image_bytes']) story.append(Image(img_buffer, width=5.5 * inch, height=3.3 * inch)) else: story.append(Paragraph("No chart data available.", styles['Body'])) story.append(Spacer(1, 0.2 * inch)) voice_text = analysis_data.get('voice_interpretation_text', 'Not available.').replace('\n', '
') story.append(Paragraph(voice_text, styles['Body'])) story.append(Spacer(1, 0.2 * inch)) # AI-Generated Narrative Page story.append(Paragraph("Qualitative AI-Powered Report", styles['H2'])) gemini_text = analysis_data.get('gemini_report_text', 'Not available.') for line in gemini_text.split('\n'): line = line.strip() if not line: continue if line.startswith('**') and line.endswith('**'): story.append(Paragraph(line.strip('*'), styles['H3'])) elif line.startswith('- ') or line.startswith('* '): story.append(Paragraph(f"• {line[2:]}", styles['Body'])) else: story.append(Paragraph(line, styles['Body'])) doc.build(story) logger.info("PDF report generated successfully.") except Exception as e: logger.error(f"Error generating PDF: {e}", exc_info=True) raise # ============================================================================== # 7. MAIN PROCESSING PIPELINE # ============================================================================== def process_interview(audio_path: str, user_id: str = "candidate-123") -> Dict: try: logger.info(f"Starting processing for {audio_path} (User ID: {user_id})") wav_file = convert_to_wav(audio_path) logger.debug(f"Created WAV file: {wav_file}") logger.info("Starting transcription") transcript = transcribe(wav_file) if not transcript or 'utterances' not in transcript or not transcript['utterances']: logger.error("Transcription failed or returned empty utterances") raise ValueError("Transcription failed or returned empty utterances") logger.info("Extracting prosodic features") full_audio = AudioSegment.from_wav(wav_file) for utterance in transcript['utterances']: segment = full_audio[utterance['start']:utterance['end']] utterance['prosodic_features'] = extract_detailed_prosodic_features(segment) logger.info("Identifying speakers") utterances_with_speakers = identify_speakers(transcript, wav_file) logger.info("Extracting duration features") utterances_with_duration = extract_duration_feature(utterances_with_speakers) logger.info("Classifying roles") classified_utterances = classify_roles_ultimate(utterances_with_duration, wav_file) logger.info("Analyzing interviewee voice") voice_analysis = analyze_interviewee_voice(wav_file, classified_utterances) logger.info("Generating chart and voice interpretation") voice_interpretation = generate_voice_interpretation(voice_analysis) chart_buffer = io.BytesIO() generate_anxiety_confidence_chart(voice_analysis.get('composite_scores', {}), chart_buffer) chart_buffer.seek(0) analysis_data = { 'user_id': user_id, 'transcript_with_roles': classified_utterances, 'voice_analysis_metrics': voice_analysis, 'speakers': list(set(u['speaker_id'] for u in classified_utterances)), 'text_analysis': { 'total_duration': sum(u['prosodic_features']['duration'] for u in classified_utterances), 'speaker_turns': len(classified_utterances) }, 'acceptance_probability': calculate_acceptance_probability({'voice_analysis_metrics': voice_analysis}), 'voice_interpretation_text': voice_interpretation, 'chart_image_bytes': chart_buffer.getvalue() if chart_buffer.tell() > 0 else None } logger.info("Generating report text using Gemini") gemini_report_text = generate_gemini_report_text(analysis_data) analysis_data['gemini_report_text'] = gemini_report_text base_name = f"{user_id}_{uuid.uuid4().hex}" pdf_path = os.path.join(PDF_DIR, f"{base_name}_report.pdf") create_pdf_report(analysis_data, pdf_path) json_path = os.path.join(JSON_DIR, f"{base_name}_analysis.json") logger.debug(f"JSON path before write: {json_path}") if os.path.exists(json_path): logger.warning(f"JSON file {json_path} already exists, overwriting.") with open(json_path, 'w') as f: logger.debug(f"Writing to JSON file: {json_path}") serializable_data = convert_to_serializable(analysis_data) json.dump(serializable_data, f, indent=2) os.remove(wav_file) logger.info(f"Processing completed for {audio_path} (User ID: {user_id})") return { 'summary': f"User ID: {user_id}\nspeakers: {', '.join(analysis_data['speakers'])}", 'json_path': json_path, 'pdf_path': pdf_path } except Exception as e: logger.error(f"Processing failed: {str(e)}", exc_info=True) if 'wav_file' in locals() and os.path.exists(wav_file): os.remove(wav_file) raise