|
|
import os |
|
|
import torch |
|
|
import numpy as np |
|
|
import uuid |
|
|
import requests |
|
|
import time |
|
|
import json |
|
|
import re |
|
|
import logging |
|
|
import io |
|
|
import subprocess |
|
|
from contextlib import contextmanager |
|
|
import tempfile |
|
|
from typing import Dict, List |
|
|
|
|
|
|
|
|
from pydub import AudioSegment |
|
|
from nemo.collections.asr.models import EncDecSpeakerLabelModel |
|
|
from pinecone import Pinecone, ServerlessSpec |
|
|
import librosa |
|
|
import parselmouth |
|
|
from parselmouth.praat import call |
|
|
from transformers import AutoTokenizer, AutoModel |
|
|
import spacy |
|
|
import google.generativeai as genai |
|
|
from sklearn.metrics.pairwise import cosine_similarity |
|
|
|
|
|
|
|
|
from reportlab.lib.pagesizes import letter |
|
|
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Image, PageBreak |
|
|
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle |
|
|
from reportlab.lib import colors |
|
|
from reportlab.lib.enums import TA_CENTER, TA_JUSTIFY |
|
|
from reportlab.lib.units import inch |
|
|
import matplotlib.pyplot as plt |
|
|
import matplotlib |
|
|
|
|
|
matplotlib.use('Agg') |
|
|
|
|
|
|
|
|
from concurrent.futures import ThreadPoolExecutor |
|
|
import joblib |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Module-wide logging: timestamped INFO records for this module, while the
# listed third-party loggers are limited to ERROR to keep the output readable.
logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s', level=logging.INFO)
logger = logging.getLogger(__name__)
for _noisy_logger_name in ("nemo_logging", "nemo", "transformers"):
    logging.getLogger(_noisy_logger_name).setLevel(logging.ERROR)
del _noisy_logger_name  # keep the module namespace unchanged
|
|
|
|
|
# Generated artefacts are written below OUTPUT_DIR: JSON analyses and PDF
# reports each get their own sub-directory, created on import if missing.
OUTPUT_DIR = "./static/outputs"
JSON_DIR, PDF_DIR = (os.path.join(OUTPUT_DIR, sub_dir) for sub_dir in ("json", "pdf"))
os.makedirs(JSON_DIR, exist_ok=True)
os.makedirs(PDF_DIR, exist_ok=True)

# Run the models on the GPU when CUDA is available, otherwise on the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
|
|
|
|
|
# Credentials for the external services are read from the environment; the
# module refuses to import when any of them is unset or empty.
PINECONE_KEY = os.getenv("PINECONE_KEY")
ASSEMBLYAI_KEY = os.getenv("ASSEMBLYAI_KEY")
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
if not (PINECONE_KEY and ASSEMBLYAI_KEY and GEMINI_API_KEY):
    raise ValueError("One or more required environment variables are missing.")

# Placeholders for the shared service clients and models; they are populated
# by initialize_all_services_and_models().
index = None
gemini_model = None
speaker_model = None
nlp = None
tokenizer = None
text_embedding_model = None
|
|
|
|
|
def initialize_all_services_and_models():
    """Create the service clients and load every model used by the pipeline.

    Populates the module-level globals ``index``, ``gemini_model``,
    ``speaker_model``, ``nlp``, ``tokenizer`` and ``text_embedding_model``.
    The Pinecone index is created first when it does not exist yet.
    """
    global index, gemini_model, speaker_model, nlp, tokenizer, text_embedding_model
    logger.info("Initializing all services and loading all models...")

    # Vector index holding the speaker embeddings.
    pinecone_client = Pinecone(api_key=PINECONE_KEY)
    index_name = "interview-speaker-embeddings"
    existing_indexes = pinecone_client.list_indexes().names()
    if index_name not in existing_indexes:
        serverless_spec = ServerlessSpec(cloud="aws", region="us-east-1")
        pinecone_client.create_index(name=index_name, dimension=192, metric="cosine",
                                     spec=serverless_spec)
    index = pinecone_client.Index(index_name)

    # Generative model used for the narrative report.
    genai.configure(api_key=GEMINI_API_KEY)
    gemini_model = genai.GenerativeModel('gemini-1.5-flash')

    # Speaker-embedding model, switched to evaluation mode.
    pretrained_speaker_model = EncDecSpeakerLabelModel.from_pretrained(
        "nvidia/speakerverification_en_titanet_large", map_location=device)
    speaker_model = pretrained_speaker_model.eval()

    # Text models: spaCy pipeline plus tokenizer/encoder for text embeddings.
    nlp = spacy.load("en_core_web_sm")
    tokenizer = AutoTokenizer.from_pretrained("distilbert-base-uncased")
    text_encoder = AutoModel.from_pretrained("distilbert-base-uncased")
    text_embedding_model = text_encoder.to(device).eval()

    logger.info("All services and models are ready.")


# Everything is loaded eagerly when the module is imported.
initialize_all_services_and_models()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@contextmanager
def temp_audio_file(suffix='.wav'):
    """Yield the path of a fresh temporary file and delete it afterwards.

    The file is created with ``tempfile.mkstemp`` and its descriptor is closed
    right away, so only the path is handed to the caller. On exit the file is
    removed if it still exists, whether or not the body raised.
    """
    descriptor, path = tempfile.mkstemp(suffix=suffix)
    try:
        os.close(descriptor)
        yield path
    finally:
        if os.path.exists(path):
            os.remove(path)
|
|
|
|
|
def convert_to_wav(input_path: str) -> str:
    """Convert an audio/video file to 16 kHz mono 16-bit PCM WAV using ffmpeg.

    Args:
        input_path: Path of the media file to convert.

    Returns:
        Path of a newly created temporary ``.wav`` file. The caller owns the
        file and is responsible for deleting it.

    Raises:
        Exception: Whatever ``subprocess.run`` raises (for example
            ``CalledProcessError`` on a non-zero ffmpeg exit status or
            ``FileNotFoundError`` when ffmpeg is not installed). The temporary
            file is removed before the exception propagates.
    """
    # Reserve the output path and close the handle straight away; the original
    # code kept the handle open, leaking a descriptor per call.
    with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as placeholder:
        temp_wav_file = placeholder.name
    try:
        command = ['ffmpeg', '-y', '-i', input_path, '-vn', '-acodec', 'pcm_s16le', '-ar', '16000', '-ac', '1',
                   temp_wav_file]
        subprocess.run(command, check=True, capture_output=True, text=True)
        return temp_wav_file
    except Exception as e:
        if os.path.exists(temp_wav_file):
            os.remove(temp_wav_file)
        logger.error(f"Audio conversion failed: {e}", exc_info=True)
        # str(CalledProcessError) omits the captured output, so surface
        # ffmpeg's own diagnostics explicitly.
        ffmpeg_stderr = getattr(e, 'stderr', None)
        if ffmpeg_stderr:
            logger.error(f"ffmpeg stderr: {ffmpeg_stderr}")
        raise
|
|
|
|
|
def transcribe(audio_path: str) -> Dict:
    """Upload an audio file to AssemblyAI and wait for its transcript.

    The file is uploaded, a transcription job with speaker labels and
    profanity filtering is submitted, and the job is polled every five seconds
    until it reports ``completed`` or ``error``.

    Args:
        audio_path: Path of the audio file to upload.

    Returns:
        The JSON body of the completed transcript.

    Raises:
        requests.RequestException: On a network failure, a timeout, or a
            non-success HTTP status from any of the API calls.
        RuntimeError: If the service reports the transcription as failed.
    """
    # (connect, read) timeouts so that a stalled connection cannot hang the
    # pipeline forever; the upload gets a longer read allowance.
    upload_timeout = (10, 300)
    request_timeout = (10, 60)
    try:
        headers = {"authorization": ASSEMBLYAI_KEY}
        with open(audio_path, 'rb') as f:
            upload_response = requests.post("https://api.assemblyai.com/v2/upload", headers=headers, data=f,
                                            timeout=upload_timeout)
        upload_response.raise_for_status()
        audio_url = upload_response.json()['upload_url']

        transcript_response = requests.post("https://api.assemblyai.com/v2/transcript", headers=headers,
                                            json={"audio_url": audio_url, "speaker_labels": True,
                                                  "filter_profanity": True},
                                            timeout=request_timeout)
        transcript_response.raise_for_status()
        transcript_id = transcript_response.json()['id']
        logger.info(f"Transcription submitted. Polling for results (ID: {transcript_id})...")

        poll_url = f"https://api.assemblyai.com/v2/transcript/{transcript_id}"
        while True:
            poll_response = requests.get(poll_url, headers=headers, timeout=request_timeout)
            # Fail with the HTTP error instead of a confusing KeyError on an
            # error payload that has no 'status' field.
            poll_response.raise_for_status()
            result = poll_response.json()
            status = result.get('status')
            if status == 'completed':
                return result
            if status == 'error':
                raise RuntimeError(f"Transcription failed: {result.get('error')}")
            time.sleep(5)
    except Exception as e:
        logger.error(f"Transcription failed: {e}", exc_info=True)
        raise
|
|
|
|
|
def identify_speakers(transcript: Dict, wav_file_path: str) -> List[Dict]:
    """Attach a speaker identity to every utterance of a transcript.

    Each utterance of at least one second is cut out of the WAV file, embedded
    with the speaker model and looked up in the Pinecone index. A best match
    scoring above 0.75 reuses the stored speaker; otherwise a new speaker is
    registered in the index. Shorter utterances are labelled
    ``unknown_short_utterance`` and get no ``speaker_name``.

    Args:
        transcript: Transcript dict; its ``utterances`` entries need ``start``
            and ``end`` (used as millisecond offsets into the audio).
        wav_file_path: Path of the WAV file the transcript was made from.

    Returns:
        Copies of the utterances, in their original order, with ``speaker_id``
        (and ``speaker_name`` where available) added.

    Raises:
        Exception: Any failure is logged and re-raised.
    """
    try:
        full_audio = AudioSegment.from_wav(wav_file_path)

        def process_utterance(utterance):
            start_ms, end_ms = utterance['start'], utterance['end']
            if end_ms - start_ms < 1000:
                return {**utterance, 'speaker_id': 'unknown_short_utterance'}

            # The temporary clip is only needed to compute the embedding, so it
            # is released before any network call is made.
            with temp_audio_file() as temp_path:
                full_audio[start_ms:end_ms].export(temp_path, format="wav")
                with torch.no_grad():
                    embedding = speaker_model.get_embedding(temp_path).cpu().numpy().flatten().tolist()

            query_result = index.query(vector=embedding, top_k=1, include_metadata=True)
            matches = query_result.get('matches')
            if matches and matches[0]['score'] > 0.75:
                match = matches[0]
                return {**utterance, 'speaker_id': match['id'],
                        'speaker_name': match['metadata'].get('speaker_name', 'Unknown Speaker')}

            speaker_id = f"speaker_{uuid.uuid4().hex[:8]}"
            # Vectors are upserted without a namespace, and the stats response
            # does not list that namespace under the key 'default', so the old
            # lookup always yielded 0 and every new speaker became "Speaker 1".
            # The index-wide total is independent of the namespace key.
            vector_count = index.describe_index_stats()['total_vector_count'] or 0
            speaker_name = f"Speaker {vector_count + 1}"
            index.upsert(vectors=[(speaker_id, embedding, {"speaker_name": speaker_name})])
            return {**utterance, 'speaker_id': speaker_id, 'speaker_name': speaker_name}

        # NOTE(review): utterances are processed concurrently and each one does
        # query-then-upsert, so two utterances of the same unseen speaker may
        # both miss the index and register separate IDs — confirm whether that
        # is acceptable.
        with ThreadPoolExecutor() as executor:
            return list(executor.map(process_utterance, transcript.get('utterances', [])))
    except Exception as e:
        logger.error(f"Speaker identification failed: {e}", exc_info=True)
        raise
|
|
|
|
|
def get_text_embedding(text: str) -> np.ndarray:
    """Embed a piece of text with the module's transformer encoder.

    The text is tokenized (truncated to 128 tokens), run through
    ``text_embedding_model`` without gradient tracking, and the last hidden
    state at the first token position of the first sequence is returned as a
    NumPy array (presumably the [CLS] position — verify against the tokenizer).
    """
    with torch.no_grad():
        encoded = tokenizer(text, return_tensors="pt", truncation=True, max_length=128, padding=True)
        encoded = encoded.to(device)
        model_output = text_embedding_model(**encoded)
        first_token_state = model_output.last_hidden_state[0, 0, :]
        return first_token_state.cpu().numpy()
|
|
|
|
|
def extract_detailed_prosodic_features(audio_segment: AudioSegment) -> Dict:
    """Estimate the pitch variability of an audio segment.

    The segment is exported to a temporary WAV file, loaded at 16 kHz and its
    fundamental frequency is tracked with ``librosa.pyin`` (80-400 Hz). The
    standard deviation of the non-NaN f0 values is returned.

    This is a best-effort helper: any failure is logged and a neutral result
    is returned instead of propagating the error.

    Args:
        audio_segment: The pydub segment to analyse.

    Returns:
        ``{'pitch_std': value}`` where the value is 0 for empty audio, for
        fewer than two f0 values, or when extraction fails.
    """
    try:
        with temp_audio_file() as temp_path:
            audio_segment.export(temp_path, format="wav")
            y, sr = librosa.load(temp_path, sr=16000)
        if len(y) == 0:
            return {'pitch_std': 0}
        f0, _, _ = librosa.pyin(y, fmin=80, fmax=400, sr=sr)
        # pyin yields NaN for some frames; only the numeric values are used.
        f0_values = f0[~np.isnan(f0)]
        return {'pitch_std': float(np.std(f0_values)) if len(f0_values) > 1 else 0}
    except Exception as e:
        # Previously swallowed silently, which hid broken audio or library
        # problems; keep the fallback but leave a trace in the log.
        logger.warning(f"Prosodic feature extraction failed, using pitch_std=0: {e}")
        return {'pitch_std': 0}
|
|
|
|
|
def extract_duration_feature(utterances: List[Dict]) -> List[Dict]:
    """Add each utterance's duration in seconds to its ``prosodic_features``.

    The duration is ``(end - start) / 1000``. Features already stored under
    ``prosodic_features`` (for example ``pitch_std``) are kept; previously the
    whole dict was replaced, discarding them.

    Args:
        utterances: Utterance dicts with numeric ``start`` and ``end`` keys.

    Returns:
        The same list, with every utterance updated in place.
    """
    for u in utterances:
        existing = u.get('prosodic_features')
        # Copy so that a features dict shared with another utterance copy is
        # not mutated behind its back.
        features = dict(existing) if isinstance(existing, dict) else {}
        features['duration'] = (u['end'] - u['start']) / 1000.0
        u['prosodic_features'] = features
    return utterances
|
|
|
|
|
def convert_to_serializable(obj):
    """Recursively convert *obj* into something ``json.dump`` can serialise.

    Conversions:
        * NumPy integer, floating and boolean scalars -> Python scalars
        * NumPy arrays -> nested lists
        * ``bytes`` -> base64-encoded ASCII string
        * dicts, lists and tuples -> same container type with converted items

    Any other object is returned unchanged.
    """
    import base64

    # np.bool_ is neither np.integer nor np.floating, so it has to be listed
    # explicitly; json cannot encode it as-is.
    if isinstance(obj, (np.integer, np.floating, np.bool_)):
        return obj.item()
    if isinstance(obj, np.ndarray):
        return obj.tolist()
    if isinstance(obj, bytes):
        return base64.b64encode(obj).decode('utf-8')
    if isinstance(obj, dict):
        return {k: convert_to_serializable(v) for k, v in obj.items()}
    if isinstance(obj, list):
        return [convert_to_serializable(item) for item in obj]
    if isinstance(obj, tuple):
        # Tuples used to be passed through untouched, leaving NumPy values
        # nested inside them unconverted.
        return tuple(convert_to_serializable(item) for item in obj)
    return obj
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def classify_roles_ultimate(utterances: List[Dict], audio_path: str) -> List[Dict]:
    """Label each utterance as Interviewer, Interviewee or Unknown.

    Every identified speaker (IDs not starting with ``unknown``) receives a
    combined score built from three per-speaker signals:

    * rule score - question marks, question keywords and utterance length;
    * prosodic score - penalised when an utterance's pitch_std exceeds 40;
    * semantic score - cosine similarity between the speaker's mean text
      embedding and the embedding of a canonical interview question.

    The top-scoring speaker becomes the Interviewer and the runner-up the
    Interviewee. With fewer than two identified speakers the utterances are
    returned untouched (no ``role`` key is added).

    Args:
        utterances: Utterance dicts carrying ``speaker_id``, ``text``,
            ``start`` and ``end``.
        audio_path: WAV file the utterances refer to.

    Returns:
        The same list, with ``role`` set on every utterance when at least two
        speakers were identified.
    """
    logger.info("Starting ULTIMATE role classification with prosodic analysis...")
    full_audio = AudioSegment.from_wav(audio_path)

    speakers = {u['speaker_id'] for u in utterances if 'speaker_id' in u and not u['speaker_id'].startswith('unknown')}
    if len(speakers) < 2:
        return utterances

    speaker_data = {}
    for speaker in speakers:
        speaker_data[speaker] = {'rule_score': 0, 'prosodic_score': 0, 'utterance_count': 0, 'embeddings': []}

    interviewer_keywords = r'\b(what|why|how|when|where|who|which|tell me about|can you explain|describe|give me an example)\b'

    for utt in utterances:
        sid = utt.get('speaker_id')
        text = utt.get('text', '').lower()
        if not (sid in speaker_data and text and not sid.startswith('unknown')):
            continue
        stats = speaker_data[sid]

        # Rule-based evidence: questions, question words and short turns point
        # towards the interviewer; long turns point away.
        word_count = len(text.split())
        points = 10 if text.endswith('?') else 0
        points += 5 * len(re.findall(interviewer_keywords, text))
        if word_count < 10:
            points += 2
        elif word_count > 30:
            points -= 5
        stats['rule_score'] += points

        # Prosodic evidence from the utterance's own audio slice.
        clip = full_audio[utt['start']:utt['end']]
        pitch_std = extract_detailed_prosodic_features(clip)['pitch_std']
        stats['prosodic_score'] += -5 if pitch_std > 40 else 2

        stats['embeddings'].append(get_text_embedding(utt['text']))
        stats['utterance_count'] += 1

    canonical_question_embedding = get_text_embedding("Tell me about your experience and skills.")

    final_scores = {}
    for sid, stats in speaker_data.items():
        turns = stats['utterance_count']
        if turns == 0:
            # Embeddings and the counter grow together, so no turns also means
            # no embeddings to compare.
            stats['semantic_score'] = 0
            final_scores[sid] = -999
            continue
        mean_embedding = np.mean(stats['embeddings'], axis=0).reshape(1, -1)
        stats['semantic_score'] = cosine_similarity(mean_embedding, canonical_question_embedding.reshape(1, -1))[0][0]
        avg_rule_score = stats['rule_score'] / turns
        avg_prosodic_score = stats['prosodic_score'] / turns
        final_scores[sid] = (avg_rule_score * 0.5) + (stats['semantic_score'] * 0.3) + (avg_prosodic_score * 0.2)

    ranking = sorted(final_scores, key=final_scores.get, reverse=True)
    interviewer_id, interviewee_id = ranking[0], ranking[1]
    logger.info(f"Ultimate Role Classification: Interviewer -> {interviewer_id}, Interviewee -> {interviewee_id}")

    role_by_speaker = {interviewer_id: 'Interviewer', interviewee_id: 'Interviewee'}
    for utt in utterances:
        utt['role'] = role_by_speaker.get(utt.get('speaker_id'), 'Unknown')
    return utterances
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def analyze_interviewee_voice(audio_path: str, utterances: List[Dict]) -> Dict:
    """Compute vocal delivery metrics for the interviewee's utterances.

    The interviewee's audio slices are concatenated and analysed for pitch
    (``librosa.pyin``) and loudness (RMS). Speaking rate, filler-word ratio
    and bigram repetition are derived from the utterance texts, and three
    composite scores are built from these measurements:

    * anxiety    = 0.6 * (pitch_std / pitch_mean) + 0.4 * (jitter + shimmer)
    * confidence = 0.7 / (1 + intensity_std) + 0.3 / (1 + filler_ratio)
    * hesitation = filler_ratio + repetition_score

    Args:
        audio_path: Path of the audio file to analyse.
        utterances: Utterances with ``role``, ``speaker_id``, ``text``,
            ``start``, ``end`` and ``prosodic_features['duration']``.

    Returns:
        A dict with ``speaking_rate``, ``filler_ratio``, ``repetition_score``,
        ``pitch_analysis``, ``intensity_analysis`` and ``composite_scores``,
        or ``{'error': message}`` when there is nothing to analyse or the
        analysis fails.
    """
    logger.info("Performing detailed voice analysis using your custom function...")
    try:
        y, sr = librosa.load(audio_path, sr=16000)
        interviewee_utterances = [u for u in utterances if u.get('role') == 'Interviewee' and not u['speaker_id'].startswith('unknown')]
        if not interviewee_utterances:
            return {'error': 'No valid interviewee utterances found'}

        segments = [y[int(u['start'] * sr / 1000):int(u['end'] * sr / 1000)] for u in interviewee_utterances]
        combined_audio = np.concatenate(segments)
        # The old `not segments` check could never trigger once utterances
        # exist; what actually matters is whether any samples were selected.
        if combined_audio.size == 0:
            return {'error': 'No valid interviewee segments to analyze.'}

        total_duration = sum(u['prosodic_features']['duration'] for u in interviewee_utterances)
        total_words = sum(len(u['text'].split()) for u in interviewee_utterances)
        speaking_rate = total_words / total_duration if total_duration > 0 else 0

        # Match fillers as whole words/phrases. Plain substring counting also
        # hit 'so' in "also", 'um' in "number" or 'like' in "likely".
        filler_pattern = re.compile(r'\b(?:you know|i mean|um|uh|like|so)\b')
        filler_count = sum(len(filler_pattern.findall(u['text'].lower())) for u in interviewee_utterances)
        filler_ratio = filler_count / total_words if total_words > 0 else 0

        # Share of distinct word bigrams that occur more than once. The old
        # code counted a tuple inside a list of strings, which is always 0.
        all_words = ' '.join(u['text'].lower() for u in interviewee_utterances).split()
        bigram_counts = {}
        for bigram in zip(all_words, all_words[1:]):
            bigram_counts[bigram] = bigram_counts.get(bigram, 0) + 1
        repeated_bigrams = sum(1 for count in bigram_counts.values() if count > 1)
        repetition_score = repeated_bigrams / len(bigram_counts) if bigram_counts else 0

        # Pitch statistics over frames flagged as voiced with a numeric f0.
        f0, voiced_flag, _ = librosa.pyin(combined_audio, fmin=80, fmax=300, sr=sr)
        f0_values = f0[voiced_flag & ~np.isnan(f0)]
        pitch_mean = np.mean(f0_values) if len(f0_values) > 0 else 0
        pitch_std = np.std(f0_values) if len(f0_values) > 0 else 0
        jitter = np.mean(np.abs(np.diff(f0_values))) / pitch_mean if len(f0_values) > 1 and pitch_mean > 0 else 0

        # Loudness statistics from the frame-wise RMS energy.
        rms = librosa.feature.rms(y=combined_audio)[0]
        intensity_mean = np.mean(rms) if len(rms) > 0 else 0
        intensity_std = np.std(rms) if len(rms) > 0 else 0
        shimmer = np.mean(np.abs(np.diff(rms))) / intensity_mean if len(rms) > 1 and intensity_mean > 0 else 0

        anxiety_score = 0.6 * (pitch_std / pitch_mean if pitch_mean > 0 else 0) + 0.4 * (jitter + shimmer)
        confidence_score = 0.7 * (1 / (1 + intensity_std)) + 0.3 * (1 / (1 + filler_ratio))
        hesitation_score = filler_ratio + repetition_score

        return {'speaking_rate': round(speaking_rate, 2), 'filler_ratio': round(filler_ratio, 4),
                'repetition_score': round(repetition_score, 4),
                'pitch_analysis': {'mean': float(pitch_mean), 'std_dev': float(pitch_std), 'jitter': float(jitter)},
                'intensity_analysis': {'mean': float(intensity_mean), 'std_dev': float(intensity_std),
                                       'shimmer': float(shimmer)},
                'composite_scores': {'anxiety': float(anxiety_score), 'confidence': float(confidence_score),
                                     'hesitation': float(hesitation_score)}}
    except Exception as e:
        logger.error(f"Error in detailed voice analysis: {e}", exc_info=True)
        return {'error': str(e)}
|
|
|
|
|
def generate_voice_interpretation(analysis: Dict) -> str:
    """Render the voice-analysis metrics as a markup summary.

    Args:
        analysis: Result dict of the voice analysis. When it contains an
            ``error`` key a short "not available" notice is returned.

    Returns:
        A single string using ``<b>`` and ``<br/>`` tags; metrics missing from
        *analysis* are shown as zero.
    """
    if 'error' in analysis:
        return "<b>Detailed Vocal Metrics:</b><br/>Analysis not available."

    composite = analysis.get('composite_scores', {})
    pitch_stats = analysis.get('pitch_analysis', {})
    loudness_stats = analysis.get('intensity_analysis', {})
    divider = "-----------------------------------<br/>"

    parts = [
        "<b>Detailed Vocal Metrics Interpretation:</b><br/>",
        f"- Speaking Rate: {analysis.get('speaking_rate', 0):.2f} words/sec<br/>",
        f"- Filler Word Ratio: {analysis.get('filler_ratio', 0) * 100:.1f}%<br/>",
        divider,
        f"- Pitch Mean: {pitch_stats.get('mean', 0):.2f} Hz (Std Dev: {pitch_stats.get('std_dev', 0):.2f})<br/>",
        f"- Jitter (Vocal Stability): {pitch_stats.get('jitter', 0):.4f}<br/>",
        f"- Intensity (Loudness) Std Dev: {loudness_stats.get('std_dev', 0):.4f}<br/>",
        divider,
        f"- <b>Anxiety Score:</b> {composite.get('anxiety', 0):.3f}<br/>",
        f"- <b>Confidence Score:</b> {composite.get('confidence', 0):.3f}<br/>",
        f"- <b>Hesitation Score:</b> {composite.get('hesitation', 0):.3f}",
    ]
    return "".join(parts)
|
|
|
|
|
def generate_anxiety_confidence_chart(composite_scores: Dict, chart_path_or_buffer):
    """Draw a bar chart of the anxiety, confidence and hesitation scores.

    The chart is saved as a 150-dpi PNG. Failures are logged and swallowed so
    that a chart problem cannot abort the surrounding pipeline; in that case
    nothing (or only partial data) is written to the target.

    Args:
        composite_scores: Mapping with ``anxiety``, ``confidence`` and
            ``hesitation`` values; missing entries are plotted as 0.
        chart_path_or_buffer: Target handed to ``plt.savefig``.
    """
    fig = None
    try:
        labels = ['Anxiety', 'Confidence', 'Hesitation']
        scores = [composite_scores.get(k.lower(), 0) for k in labels]
        fig, ax = plt.subplots(figsize=(6, 4))
        ax.bar(labels, scores, color=['#FF6B6B', '#4ECDC4', '#FFA500'], edgecolor='black', width=0.5)
        ax.set_ylabel('Score')
        ax.set_title('Candidate Vocal Dynamics')
        # Leave headroom above the tallest bar for its value label.
        ax.set_ylim(0, max(scores) * 1.2 if scores and max(scores) > 0 else 1)
        for bar in ax.patches:
            ax.text(bar.get_x() + bar.get_width() / 2, bar.get_height() + 0.01,
                    f"{bar.get_height():.2f}", ha='center', color='black')
        plt.tight_layout()
        plt.savefig(chart_path_or_buffer, format='png', dpi=150)
    except Exception as e:
        logger.error(f"Error generating chart: {e}")
    finally:
        # Close the figure on failure too; previously an exception skipped
        # plt.close() and the figure stayed registered with pyplot.
        if fig is not None:
            plt.close(fig)
|
|
|
|
|
def calculate_acceptance_probability(analysis_data: Dict) -> float:
    """Turn the composite voice scores into a suitability percentage.

    Confidence is weighted 0.6; the complements of anxiety and hesitation are
    weighted 0.2 each. The weighted sum is normalised by the total weight,
    scaled to a percentage, clamped to the range 10-99 and rounded to two
    decimals. When the voice analysis failed or produced no composite scores
    a fixed 30.0 is returned.

    Args:
        analysis_data: Dict carrying ``voice_analysis_metrics``.

    Returns:
        The suitability percentage.
    """
    logger.info("Calculating final acceptance probability...")

    metrics = analysis_data.get('voice_analysis_metrics', {})
    if 'error' in metrics:
        return 30.0
    composite = metrics.get('composite_scores')
    if not composite:
        return 30.0

    confidence_weight, anxiety_weight, hesitation_weight = 0.6, 0.2, 0.2
    confidence = composite.get('confidence', 0.5)
    anxiety = composite.get('anxiety', 0.5)
    hesitation = composite.get('hesitation', 0.5)

    weighted_sum = (confidence * confidence_weight) + ((1 - anxiety) * anxiety_weight) + ((1 - hesitation) * hesitation_weight)
    total_weight = confidence_weight + anxiety_weight + hesitation_weight
    normalised = weighted_sum / total_weight if total_weight > 0 else 0
    percentage = normalised * 100
    return round(max(10.0, min(99.0, percentage)), 2)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def generate_gemini_report_text(analysis_data: Dict) -> str:
    """Ask the Gemini model for a narrative interview evaluation report.

    The prompt contains the suitability score, every utterance whose role is
    ``Interviewee`` and the key voice metrics, followed by the report
    structure the model is asked to follow.

    Args:
        analysis_data: Analysis dict; ``transcript_with_roles`` is required,
            ``voice_analysis_metrics`` and ``acceptance_probability`` are
            optional.

    Returns:
        The text of the model's response, or a fixed error message when the
        request fails.
    """
    logger.info("Generating AI-powered narrative report with Gemini...")

    voice_metrics = analysis_data.get('voice_analysis_metrics', {})
    answers = [f"- {u['text']}" for u in analysis_data['transcript_with_roles'] if u.get('role') == 'Interviewee']
    interviewee_text = "\n".join(answers)
    acceptance_prob = analysis_data.get('acceptance_probability', 50.0)

    def fmt(value):
        # Numbers are shown with two decimals; placeholders such as 'N/A'
        # pass through unchanged.
        if isinstance(value, (int, float)):
            return f"{value:.2f}"
        return value

    composite = voice_metrics.get('composite_scores', {})
    confidence = composite.get('confidence', 'N/A')
    anxiety = composite.get('anxiety', 'N/A')
    speaking_rate = voice_metrics.get('speaking_rate', 'N/A')
    responses_section = interviewee_text if interviewee_text else "No responses recorded."

    prompt = f"""
You are EvalBot, a highly experienced senior HR analyst generating a comprehensive interview evaluation report.
Analyze deeply based on actual responses provided below. Avoid generic analysis.
Maintain professional, HR-standard language with clear structure and bullet points.
**Suitability Score: {fmt(acceptance_prob)}%**
### Interviewee Full Responses:
{responses_section}
### Key Metrics:
- Confidence Score: {fmt(confidence)}
- Anxiety Score: {fmt(anxiety)}
- Speaking Rate: {fmt(speaking_rate)} words/sec
### Report Sections to Generate (Follow this structure exactly):
**1. Executive Summary:**
- 3 bullets summarizing performance, key strengths, and hiring recommendation.
**2. Communication and Vocal Dynamics:**
- Analyze delivery: speaking rate, filler words, confidence, anxiety. Provide 3-4 insightful bullets and 1 actionable recommendation.
**3. Competency and Content:**
- Identify 5-8 strengths (e.g., leadership, teamwork) with concrete examples from their responses.
- Identify 5-10 weaknesses or development areas with actionable feedback.
**4. Role Fit and Potential:**
- Analyze role fit, cultural fit, and growth potential in 3 bullets.
**5. Recommendations & Next Steps for Hiring Managers:**
- Provide 5 actionable recommendations and 5 clear next steps.
"""
    try:
        return gemini_model.generate_content(prompt).text
    except Exception as e:
        logger.error(f"Gemini report generation failed: {e}")
        return "Error: Could not generate AI analysis report."
|
|
|
|
|
def _report_line_to_markup(text: str) -> str:
    """Escape free text for ReportLab paragraphs and turn ``**bold**`` into ``<b>``."""
    from xml.sax.saxutils import escape

    # Escape first so that '&', '<' and '>' in generated text cannot be taken
    # for paragraph markup; then add the only tag we want.
    return re.sub(r'\*\*(.+?)\*\*', r'<b>\1</b>', escape(text))


def create_pdf_report(analysis_data: Dict, output_path: str):
    """Build the candidate report PDF from the collected analysis data.

    Layout: a title page with candidate ID, date and suitability score; then
    the vocal-analysis chart (when image bytes are present) with the metric
    interpretation; then the AI-generated narrative, where ``**...**`` lines
    become sub-headings and ``- `` / ``* `` lines become bullets.

    Args:
        analysis_data: Dict with the optional keys ``user_id``,
            ``acceptance_probability``, ``chart_image_bytes``,
            ``voice_interpretation_text`` and ``gemini_report_text``.
        output_path: Destination path of the PDF.

    Raises:
        Exception: Any error while composing or building the document is
            logged and re-raised.
    """
    logger.info(f"Generating comprehensive PDF report at {output_path}...")
    doc = SimpleDocTemplate(output_path, pagesize=letter, topMargin=inch, bottomMargin=inch)
    styles = getSampleStyleSheet()
    styles.add(ParagraphStyle(name='H1', fontSize=18, leading=22, spaceAfter=12, textColor=colors.HexColor('#003087'),
                              fontName='Helvetica-Bold', alignment=TA_CENTER))
    styles.add(ParagraphStyle(name='H2', fontSize=14, leading=18, spaceBefore=12, spaceAfter=8,
                              textColor=colors.HexColor('#0050BC'), fontName='Helvetica-Bold'))
    styles.add(ParagraphStyle(name='H3', fontSize=12, leading=16, spaceBefore=10, spaceAfter=6,
                              textColor=colors.HexColor('#333333'), fontName='Helvetica-Bold'))
    styles.add(ParagraphStyle(name='Body', fontSize=10, leading=14, spaceAfter=6, alignment=TA_JUSTIFY,
                              leftIndent=10))

    story = []
    try:
        # --- Title page ---
        story.append(Paragraph("Candidate Interview Analysis Report", styles['H1']))
        story.append(Spacer(1, 0.2 * inch))
        # The ID is caller-supplied text, so it is escaped before it reaches
        # the paragraph markup parser.
        candidate_id = _report_line_to_markup(str(analysis_data.get('user_id', 'N/A')))
        story.append(Paragraph(f"Candidate ID: {candidate_id}", styles['Body']))
        story.append(Paragraph(f"Date of Analysis: {time.strftime('%B %d, %Y')}", styles['Body']))
        prob = analysis_data.get('acceptance_probability', 0)
        prob_color = 'green' if prob >= 75 else 'orange' if prob >= 50 else 'red'
        story.append(Paragraph(f"<b>Overall Suitability Score:</b> <font size=16 color='{prob_color}'>{prob}%</font>", styles['H2']))
        story.append(PageBreak())

        # --- Quantitative section ---
        story.append(Paragraph("Quantitative Vocal Analysis", styles['H2']))
        if analysis_data.get('chart_image_bytes'):
            logger.debug("Adding chart image to PDF")
            img_buffer = io.BytesIO(analysis_data['chart_image_bytes'])
            story.append(Image(img_buffer, width=5.5 * inch, height=3.3 * inch))
        else:
            story.append(Paragraph("No chart data available.", styles['Body']))
        story.append(Spacer(1, 0.2 * inch))

        # The interpretation text is produced by this module and already
        # contains intentional <b>/<br/> markup, so it is not escaped.
        voice_text = analysis_data.get('voice_interpretation_text', 'Not available.').replace('\n', '<br/>')
        story.append(Paragraph(voice_text, styles['Body']))
        story.append(Spacer(1, 0.2 * inch))

        # --- Qualitative section ---
        story.append(Paragraph("Qualitative AI-Powered Report", styles['H2']))
        gemini_text = analysis_data.get('gemini_report_text', 'Not available.')
        for line in gemini_text.split('\n'):
            line = line.strip()
            if not line:
                continue
            if line.startswith('**') and line.endswith('**'):
                story.append(Paragraph(_report_line_to_markup(line.strip('*')), styles['H3']))
            elif line.startswith('- ') or line.startswith('* '):
                story.append(Paragraph(f"• {_report_line_to_markup(line[2:])}", styles['Body']))
            else:
                story.append(Paragraph(_report_line_to_markup(line), styles['Body']))

        doc.build(story)
        logger.info("PDF report generated successfully.")
    except Exception as e:
        logger.error(f"Error generating PDF: {e}", exc_info=True)
        raise
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def process_interview(audio_path: str, user_id: str = "candidate-123") -> Dict:
    """Run the full interview-analysis pipeline for one recording.

    Steps: convert the input to WAV, transcribe it, extract per-utterance
    prosodic features, identify speakers, add durations, classify roles,
    analyse the interviewee's voice, render the chart, generate the narrative
    report, and write the PDF report plus the JSON analysis.

    Args:
        audio_path: Path of the recording to analyse.
        user_id: Candidate identifier used in the report and in the output
            file names.

    Returns:
        Dict with ``summary``, ``json_path`` and ``pdf_path``.

    Raises:
        ValueError: If the transcription yields no utterances.
        Exception: Any other pipeline failure is logged and re-raised. The
            temporary WAV file is removed in every case.
    """
    wav_file = None
    try:
        logger.info(f"Starting processing for {audio_path} (User ID: {user_id})")
        wav_file = convert_to_wav(audio_path)
        logger.debug(f"Created WAV file: {wav_file}")

        logger.info("Starting transcription")
        transcript = transcribe(wav_file)
        if not transcript or 'utterances' not in transcript or not transcript['utterances']:
            logger.error("Transcription failed or returned empty utterances")
            raise ValueError("Transcription failed or returned empty utterances")

        logger.info("Extracting prosodic features")
        full_audio = AudioSegment.from_wav(wav_file)
        for utterance in transcript['utterances']:
            segment = full_audio[utterance['start']:utterance['end']]
            utterance['prosodic_features'] = extract_detailed_prosodic_features(segment)

        logger.info("Identifying speakers")
        utterances_with_speakers = identify_speakers(transcript, wav_file)

        logger.info("Extracting duration features")
        utterances_with_duration = extract_duration_feature(utterances_with_speakers)

        logger.info("Classifying roles")
        classified_utterances = classify_roles_ultimate(utterances_with_duration, wav_file)

        logger.info("Analyzing interviewee voice")
        voice_analysis = analyze_interviewee_voice(wav_file, classified_utterances)

        logger.info("Generating chart and voice interpretation")
        voice_interpretation = generate_voice_interpretation(voice_analysis)
        chart_buffer = io.BytesIO()
        generate_anxiety_confidence_chart(voice_analysis.get('composite_scores', {}), chart_buffer)
        # Take the bytes directly. The old check used tell() right after
        # seek(0), which is always 0, so the chart was never embedded.
        chart_bytes = chart_buffer.getvalue()

        analysis_data = {
            'user_id': user_id,
            'transcript_with_roles': classified_utterances,
            'voice_analysis_metrics': voice_analysis,
            # Sorted so that the speaker list and summary are deterministic.
            'speakers': sorted({u['speaker_id'] for u in classified_utterances}),
            'text_analysis': {
                'total_duration': sum(u['prosodic_features']['duration'] for u in classified_utterances),
                'speaker_turns': len(classified_utterances)
            },
            'acceptance_probability': calculate_acceptance_probability({'voice_analysis_metrics': voice_analysis}),
            'voice_interpretation_text': voice_interpretation,
            'chart_image_bytes': chart_bytes if chart_bytes else None
        }

        logger.info("Generating report text using Gemini")
        gemini_report_text = generate_gemini_report_text(analysis_data)
        analysis_data['gemini_report_text'] = gemini_report_text

        base_name = f"{user_id}_{uuid.uuid4().hex}"
        pdf_path = os.path.join(PDF_DIR, f"{base_name}_report.pdf")
        create_pdf_report(analysis_data, pdf_path)

        json_path = os.path.join(JSON_DIR, f"{base_name}_analysis.json")
        logger.debug(f"JSON path before write: {json_path}")
        if os.path.exists(json_path):
            logger.warning(f"JSON file {json_path} already exists, overwriting.")
        with open(json_path, 'w', encoding='utf-8') as f:
            logger.debug(f"Writing to JSON file: {json_path}")
            serializable_data = convert_to_serializable(analysis_data)
            json.dump(serializable_data, f, indent=2)

        logger.info(f"Processing completed for {audio_path} (User ID: {user_id})")
        return {
            'summary': f"User ID: {user_id}\nspeakers: {', '.join(analysis_data['speakers'])}",
            'json_path': json_path,
            'pdf_path': pdf_path
        }
    except Exception as e:
        logger.error(f"Processing failed: {str(e)}", exc_info=True)
        raise
    finally:
        # Single cleanup point for the temporary WAV file; replaces the
        # duplicated removal and the `'wav_file' in locals()` check.
        if wav_file and os.path.exists(wav_file):
            os.remove(wav_file)