import os import subprocess import cv2 import json import math import torch import librosa import ffmpeg import numpy as np import soundfile as sf import mediapipe as mp from PIL import Image from transformers import AutoImageProcessor, AutoModelForImageClassification, pipeline from sentence_transformers import SentenceTransformer, CrossEncoder from sklearn.metrics.pairwise import cosine_similarity from mediapipe.tasks import python from mediapipe.tasks.python import vision # Ignore unnecessary warnings import warnings warnings.filterwarnings("ignore", category=UserWarning) warnings.filterwarnings("ignore", category=FutureWarning) TONE_MAPPING = { "Hesitant": 0, "Confident": 1, "Unstable": 2, "Natural": 3, "Excited": 3 } device = torch.device("cuda" if torch.cuda.is_available() else "cpu") # 2. Download and Initialize Mediapipe once (Global) MODEL_PATH = "face_landmarker.task" if not os.path.exists(MODEL_PATH): os.system(f"wget -O {MODEL_PATH} -q https://storage.googleapis.com/mediapipe-models/face_landmarker/face_landmarker/float16/1/face_landmarker.task") # 3. Initialize Models asr = pipeline("automatic-speech-recognition", model="openai/whisper-small", device=0 if torch.cuda.is_available() else -1) semantic_model = SentenceTransformer("all-MiniLM-L6-v2") cross_encoder = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2") FACE_MODEL_NAME = "dima806/facial_emotions_image_detection" face_processor = AutoImageProcessor.from_pretrained(FACE_MODEL_NAME) face_model = AutoModelForImageClassification.from_pretrained(FACE_MODEL_NAME).to(device).eval() # Emotion Mapping for Wheel emotion_va = { "happy": (0.8, 0.2), "fear": (0.2, 0.8), "angry": (-0.7, 0.65), "sad": (-0.65, -0.55), "surprise": (0.1, -0.75), "disgust": (0.6, -0.4), "neutral": (0.0, 0.0) } EMOTION_RING = [ ("Happy", 0, 0.84), ("Surprise", 45, 0.84), ("Fear", 100, 0.84), ("Sad", 160, 0.84), ("Disgust", 215, 0.84), ("Angry", 270, 0.84) ] ##Utility functions def normalize(v, mn, mx): return np.clip((v - mn) / (mx - mn), 0, 1) if mx - mn != 0 else 0.0 def extract_audio(v_in, a_out): ffmpeg.input(v_in).output(a_out, ac=1, ar=16000).overwrite_output().run(quiet=True) def merge_audio_video(v_in, a_in, v_out): ffmpeg.output(ffmpeg.input(v_in).video, ffmpeg.input(a_in).audio, v_out, vcodec="libx264", acodec="aac").overwrite_output().run(quiet=True) def draw_face_box(frame, x, y, w, h, emotion_name=""): color, th, cl = (0, 255, 100), 2, 20 # Green color cv2.rectangle(frame, (x, y), (x+w, y+h), color, 1) # Add emotion name above face box if emotion_name: cv2.putText( frame, emotion_name.upper(), (x + 10, y - 15), cv2.FONT_HERSHEY_DUPLEX, 0.7, (0, 255, 100), 2, cv2.LINE_AA ) # Corners for px, py, dx, dy in [(x,y,cl,0), (x,y,0,cl), (x+w,y,-cl,0), (x+w,y,0,cl), (x,y+h,cl,0), (x,y+h,0,-cl), (x+w,y+h,-cl,0), (x+w,y+h,0,-cl)]: cv2.line(frame, (px, py), (px+dx, py+dy), color, 5) return frame def compute_eye_contact_ratio(frame, landmarks): h, w, _ = frame.shape def ear(idx): p = [np.array([landmarks[i].x * w, landmarks[i].y * h]) for i in idx] return (np.linalg.norm(p[1]-p[5]) + np.linalg.norm(p[2]-p[4])) / (2.0 * np.linalg.norm(p[0]-p[3])) avg_ear = (ear([33, 160, 158, 133, 153, 144]) + ear([362, 385, 387, 263, 373, 380])) / 2.0 return min(max(avg_ear * 3, 0), 1) def analyze_face_emotion(frame): img = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)) inputs = face_processor(images=img, return_tensors="pt").to(device) with torch.no_grad(): outputs = face_model(**inputs) probs = torch.nn.functional.softmax(outputs.logits, dim=-1)[0] return {face_model.config.id2label[i].lower(): float(probs[i]) for i in range(len(probs))} ##Audio analysis def extract_audio_features(y, sr): duration = librosa.get_duration(y=y, sr=sr) if duration == 0: return {"pitch_std": 0, "jitter": 0, "energy_std": 0, "pause_ratio": 0, "speech_rate": 0} # Pitch & Jitter f0 = librosa.yin(y, fmin=75, fmax=300, sr=sr) f0 = f0[~np.isnan(f0)] pitch_std = np.std(f0) if len(f0) else 0 jitter = np.mean(np.abs(np.diff(f0)) / np.maximum(f0[:-1], 1e-6)) if len(f0) > 1 else 0 # Energy rms = librosa.feature.rms(y=y)[0] energy_std = np.std(rms) intervals = librosa.effects.split(y, top_db=20) speech_duration = sum((e - s) for s, e in intervals) / sr pause_ratio = 1 - (speech_duration / duration) if duration > 0 else 0 # Speech Rate oenv = librosa.onset.onset_strength(y=y, sr=sr) onsets = librosa.onset.onset_detect(onset_envelope=oenv, sr=sr) speech_rate = len(onsets) / duration if duration > 0 else 0 return { "pitch_std": pitch_std, "jitter": jitter, "energy_std": energy_std, "pause_ratio": pause_ratio, "speech_rate": speech_rate } def compute_audio_scores(features, baseline=None): """ Fairness-aware audio scoring with personal baseline comparison """ # Use standard defaults if no baseline provided if baseline is None: baseline = {"pitch_std": 30.0, "energy_std": 0.05, "jitter": 0.02, "pause_ratio": 0.2, "speech_rate": 4.0} # Calculate Relative Ratios (Current / Baseline) pitch_ratio = features["pitch_std"] / max(baseline["pitch_std"], 1e-6) energy_ratio = features["energy_std"] / max(baseline["energy_std"], 1e-6) rate_ratio = features["speech_rate"] / max(baseline["speech_rate"], 1e-6) # Stress Score (Relative) pitch_dev = abs(1 - pitch_ratio) energy_dev = abs(1 - energy_ratio) stress_val = (pitch_dev * 0.4 + energy_dev * 0.4 + features["jitter"] * 0.2) * 150 stress = np.clip(stress_val + 20, 0, 100) # Clarity Score (Relative) pause_dev = max(0, features["pause_ratio"] - baseline["pause_ratio"]) clarity = 100 - (pause_dev * 120 + features["jitter"] * 400) # Confidence Score (Relative) rate_dev = abs(1 - rate_ratio) confidence_audio = 100 - (rate_dev * 40 + energy_dev * 30 + features["pause_ratio"] * 50) # Tone classification based on relative shifts tones = { "Confident": confidence_audio, "Hesitant": features["pause_ratio"] * 150, "Excited": (energy_ratio - 1) * 100 if energy_ratio > 1 else 0, "Unstable": stress, "Natural": 100 - (pitch_dev * 60 + rate_dev * 40) } dominant_tone = max(tones, key=tones.get) return { "confidence_audio": round(float(np.clip(confidence_audio, 0, 100)), 2), "clarity": round(float(np.clip(clarity, 0, 100)), 2), "stress": round(float(np.clip(stress, 0, 100)), 2), "pauses": round(float(features["pause_ratio"] * 100), 2), "tone_of_voice": TONE_MAPPING.get(dominant_tone, 3) } def analyze_audio_segment(audio_path, baseline=None): """ Main entry point for audio segment analysis """ y, sr = librosa.load(audio_path, sr=16000) features = extract_audio_features(y, sr) return compute_audio_scores(features, baseline) ##Text analysis def get_user_answer(audio_path): """Transcribe audio using Whisper""" result = asr(audio_path, chunk_length_s=20) return result["text"].strip() def compute_similarity_score(user_answer, ideal_answer): emb = semantic_model.encode([user_answer, ideal_answer]) sim = cosine_similarity([emb[0]], [emb[1]])[0][0] score = float(sim * 100) return round(max(0, score), 2) def compute_relevance_score(question, user_answer): raw_score = cross_encoder.predict([(question, user_answer)])[0] prob = 1 / (1 + np.exp(-raw_score)) score = float(prob * 100) return round(max(0, score), 2) ##Video # Eye indices LEFT_EYE = [33, 160, 158, 133, 153, 144] RIGHT_EYE = [362, 385, 387, 263, 373, 380] # Eye Contact Function def compute_eye_contact_ratio(frame, landmarks): """ Compute eye contact ratio from detected face landmarks """ if not landmarks: return 0.5 h, w, _ = frame.shape def ear(indices): points = [ np.array([ landmarks[i].x * w, landmarks[i].y * h ]) for i in indices ] v1 = np.linalg.norm(points[1] - points[5]) v2 = np.linalg.norm(points[2] - points[4]) h_dist = np.linalg.norm(points[0] - points[3]) return (v1 + v2) / (2.0 * h_dist) ear_left = ear(LEFT_EYE) ear_right = ear(RIGHT_EYE) avg_ear = (ear_left + ear_right) / 2.0 eye_score = min(max(avg_ear * 3, 0), 1) return eye_score def analyze_face_emotion(frame): """ Predict facial emotion probabilities from single frame """ # Convert BGR to RGB rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) image = Image.fromarray(rgb) # Preprocess inputs = face_processor(images=image, return_tensors="pt").to(device) with torch.no_grad(): outputs = face_model(**inputs) probs = torch.nn.functional.softmax(outputs.logits, dim=-1)[0] labels = face_model.config.id2label emotion_probs = { labels[i].lower(): float(probs[i]) for i in range(len(probs)) } return emotion_probs def draw_face_box(frame, x, y, w, h, emotion_label="Neutral"): """ Draw face bounding box with emotion label above it """ # Green color for face box color = (0, 255, 0) thickness = 2 corner_len = 22 # Main rectangle cv2.rectangle(frame, (x, y), (x+w, y+h), color, thickness) # Decorative corner lines for (px, py, dx, dy) in [ (x, y, corner_len, 0), (x, y, 0, corner_len), (x+w, y, -corner_len, 0), (x+w, y, 0, corner_len), (x, y+h, corner_len, 0), (x, y+h, 0, -corner_len), (x+w, y+h, -corner_len, 0), (x+w, y+h, 0, -corner_len), ]: cv2.line(frame, (px, py), (px+dx, py+dy), color, 4) # Draw emotion text above the face box label_text = emotion_label.capitalize() (tw, th), _ = cv2.getTextSize( label_text, cv2.FONT_HERSHEY_SIMPLEX, 0.7, 2 ) text_x = x + (w - tw) // 2 text_y = y - 10 cv2.putText( frame, label_text, (text_x, text_y), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2, cv2.LINE_AA ) return frame def compute_valence_arousal_from_probs(emotion_probs): """Computing Valence and Arousal from emotion probabilities""" v, a, total = 0.0, 0.0, 0.0 for emo, score in emotion_probs.items(): emo = emo.lower() if emo in emotion_va: v += emotion_va[emo][0] * score a += emotion_va[emo][1] * score total += score if total == 0: return 0.0, 0.0 return v / total, a / total def draw_full_emotion_wheel(panel, center, radius, valence, arousal, dominant_emotion="neutral"): cx, cy = center # Circle background cv2.circle(panel, center, radius + 5, (15, 15, 25), -1) cv2.circle(panel, center, radius, (60, 60, 85), 2) for rf in [0.33, 0.66]: cv2.circle(panel, center, int(radius * rf), (35, 35, 50), 1) # Drawing dividing lines between emotions for angle_deg in range(0, 360, 60): rad = math.radians(angle_deg) x1 = int(cx + radius * math.cos(rad)) y1 = int(cy - radius * math.sin(rad)) cv2.line(panel, (cx, cy), (x1, y1), (40, 40, 60), 1) # Drawing emotion labels ef, es, et = cv2.FONT_HERSHEY_SIMPLEX, 0.40, 1 for emotion_data in EMOTION_RING: if emotion_data[1] is None: continue label, angle_deg, rf = emotion_data rad = math.radians(angle_deg) lx = int(cx + rf * radius * math.cos(rad)) ly = int(cy - rf * radius * math.sin(rad)) (tw, th), _ = cv2.getTextSize(label, ef, es, et) tx, ty = lx - tw//2, ly + th//2 # Highlight active emotion if label.lower() == dominant_emotion.lower(): cv2.putText(panel, label, (tx, ty), ef, es+0.08, (0, 255, 200), 2, cv2.LINE_AA) else: cv2.putText(panel, label, (tx, ty), ef, es, (190, 190, 255), et, cv2.LINE_AA) # Neutral in center nc = (0, 255, 200) if dominant_emotion == "neutral" else (160, 160, 160) (tw, th), _ = cv2.getTextSize("Neutral", ef, es, et) cv2.putText(panel, "Neutral", (cx-tw//2, cy+th//2), ef, es, nc, et, cv2.LINE_AA) # Animated dot with glow dot_x = int(cx + valence * radius * 0.88) dot_y = int(cy - arousal * radius * 0.88) cv2.circle(panel, (dot_x, dot_y), 15, (160, 120, 0), -1) cv2.circle(panel, (dot_x, dot_y), 11, (220, 180, 0), -1) cv2.circle(panel, (dot_x, dot_y), 7, (255, 230, 60), -1) return panel BAR_CONFIGS = [ ("Confidence", (70, 180, 255), (30, 50, 100)), # light blue ("Clarity", (100, 220, 150), (25, 70, 50)), # light cyan ("Stress", (255, 120, 100), (100, 40, 30)), # light coral ] def draw_metric_bars(panel, bars_x_start, bar_y_top, bar_height, bar_width, bar_gap, confidence, clarity, stress): """ Draw horizontal metric bars with label above each bar """ values = [confidence, clarity, stress] labels_list = ["Confidence", "Clarity", "Stress"] # Extra vertical space for labels label_space = 20 for i, value in enumerate(values): label, fill_color, bg_color = BAR_CONFIGS[i] # Each bar block height = label + bar + gap y = bar_y_top + i * (bar_height + label_space + bar_gap) x_right = bars_x_start + bar_width filled = int((value / 100) * bar_width) # Draw label above bar cv2.putText( panel, label, (bars_x_start, y), cv2.FONT_HERSHEY_DUPLEX, 0.6, (230, 230, 230), 1, cv2.LINE_AA ) # Move bar slightly down to leave space for label bar_y = y + 8 # Draw background bar cv2.rectangle( panel, (bars_x_start, bar_y), (x_right, bar_y + bar_height), bg_color, -1 ) # Draw filled portion cv2.rectangle( panel, (bars_x_start, bar_y), (bars_x_start + filled, bar_y + bar_height), fill_color, -1 ) # Draw percentage text cv2.putText( panel, f"{int(value)}%", (bars_x_start + 12, bar_y + bar_height - 6), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2, cv2.LINE_AA ) return panel ##Integrated Video Processing (Analysis + Annotation) def process_full_video(video_path, output_dir, questions_config, audio_results_map=None): """ Enhanced video processing with: 1. Real-time (Live) Audio Metric Bars using a sliding window. 2. Dynamic Emotion Wheel and Face Tracking. 3. Auto-wrapping Question Text that avoids UI overlap. """ cap = cv2.VideoCapture(video_path) fps = cap.get(cv2.CAP_PROP_FPS) width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) # Load full audio for live segment analysis full_audio, sr = librosa.load(video_path, sr=16000) temp_output = os.path.join(output_dir, "annotated_full_raw.mp4") fourcc = cv2.VideoWriter_fourcc(*'mp4v') out = cv2.VideoWriter(temp_output, fourcc, fps, (width, height)) # Mediapipe Setup base_options = python.BaseOptions(model_asset_path="face_landmarker.task") options = vision.FaceLandmarkerOptions( base_options=base_options, running_mode=vision.RunningMode.VIDEO, num_faces=1 ) frame_idx = 0 smooth_v, smooth_a = 0.0, 0.0 dom_emo = "neutral" last_landmarks = None # Live Scores Buffering live_scores = {"confidence_audio": 0.0, "clarity": 0.0, "stress": 0.0} smoothing_factor = 0.15 # Controls how "bouncy" the bars are with vision.FaceLandmarker.create_from_options(options) as landmarker: while cap.isOpened(): ret, frame = cap.read() if not ret: break current_time = frame_idx / fps active_answer = next((q for q in questions_config if q["start_time"] <= current_time <= q["end_time"]), None) next_q = next((q for q in questions_config if current_time < q["start_time"]), None) next_text = f"Q) {next_q['question_text']}" if next_q else "Preparing..." # --- 1. LIVE AUDIO ANALYSIS (Every 10 frames) --- if frame_idx % 10 == 0: # Analyze the last 3 seconds of audio for "Live" feel start_sample = max(0, int((current_time - 3) * sr)) end_sample = int(current_time * sr) audio_segment = full_audio[start_sample:end_sample] if len(audio_segment) > sr * 0.5: # At least 0.5s of audio to analyze feats = extract_audio_features(audio_segment, sr) # Use global baseline if available instant_scores = compute_audio_scores(feats, baseline=None) # Apply smoothing to prevent jittery bars live_scores["confidence_audio"] += smoothing_factor * (instant_scores["confidence_audio"] - live_scores["confidence_audio"]) live_scores["clarity"] += smoothing_factor * (instant_scores["clarity"] - live_scores["clarity"]) live_scores["stress"] += smoothing_factor * (instant_scores["stress"] - live_scores["stress"]) # --- 2. VISUAL AI (Face & Emotion) --- if frame_idx % 4 == 0: mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)) results = landmarker.detect_for_video(mp_image, int(current_time * 1000)) if results.face_landmarks: last_landmarks = results.face_landmarks[0] emo_probs = analyze_face_emotion(frame) dom_emo = max(emo_probs, key=emo_probs.get) v_target, a_target = compute_valence_arousal_from_probs(emo_probs) smooth_v += 0.15 * (v_target - smooth_v) smooth_a += 0.15 * (a_target - smooth_a) # --- 3. RENDERING UI ELEMENTS --- # Face Box if last_landmarks: xs = [lm.x * width for lm in last_landmarks] ys = [lm.y * height for lm in last_landmarks] draw_face_box(frame, int(min(xs)), int(min(ys)), int(max(xs)-min(xs)), int(max(ys)-min(ys)), dom_emo) # Emotion Wheel draw_full_emotion_wheel(frame, (width - 130, height - 100), 90, smooth_v, smooth_a, dom_emo) # Live Metric Bars draw_metric_bars( frame, 30, height - 160, 28, 200, 6, live_scores["confidence_audio"], live_scores["clarity"], live_scores["stress"] ) # Question Overlay (Wrapped Text) if not active_answer: frame = draw_question_overlay(frame, next_text, width, height) out.write(frame) frame_idx += 1 cap.release() out.release() return temp_output def draw_question_overlay(frame, text, width, height): """Draws a wrapped text box above the Wheel and Bars.""" font = cv2.FONT_HERSHEY_DUPLEX font_scale = 0.65 thickness = 1 side_margin = 50 bottom_limit = height - 270 # Ensure it stays above the bars/wheel line_height = 35 # Text Wrapping Logic max_w = width - (2 * side_margin) words = text.split(' ') lines, current_line = [], "" for word in words: test = current_line + word + " " (w, _), _ = cv2.getTextSize(test, font, font_scale, thickness) if w < max_w: current_line = test else: lines.append(current_line) current_line = word + " " lines.append(current_line) # Box dimensions rect_h = (len(lines) * line_height) + 20 y2 = bottom_limit y1 = y2 - rect_h # Transparent Background overlay = frame.copy() cv2.rectangle(overlay, (side_margin - 10, y1), (width - side_margin + 10, y2), (20, 20, 20), -1) cv2.addWeighted(overlay, 0.7, frame, 0.3, 0, frame) # Draw Lines for i, line in enumerate(lines): (tw, th), _ = cv2.getTextSize(line.strip(), font, font_scale, thickness) tx = (width - tw) // 2 ty = y1 + 25 + (i * line_height) cv2.putText(frame, line.strip(), (tx, ty), font, font_scale, (255, 255, 255), thickness, cv2.LINE_AA) return frame ##Main pipeline def run_intervision_pipeline(video_path, questions_config, output_dir): """ Run the full Intervision analysis pipeline. Steps: 1. Extract baseline audio 2. Run video annotation 3. Merge annotated video with original audio 4. Generate report """ os.makedirs(output_dir, exist_ok=True) print("[PIPELINE] Starting pipeline") print("[PIPELINE] Video path:", video_path) # --------------------------------------------------- #Extract baseline audio (first 10 seconds) # --------------------------------------------------- baseline_wav = os.path.join(output_dir, "baseline.wav") print("[PIPELINE] Extracting baseline audio") subprocess.run([ "ffmpeg", "-y", "-i", video_path, "-t", "10", "-vn", "-acodec", "pcm_s16le", "-ar", "16000", baseline_wav ], check=True) if not os.path.exists(baseline_wav): raise Exception("Baseline audio extraction failed") y_b, sr_b = librosa.load(baseline_wav, sr=16000) baseline_features = extract_audio_features(y_b, sr_b) # --------------------------------------------------- #Process video frames and annotate # --------------------------------------------------- print("[PIPELINE] Running video annotation") annotated_video_raw = process_full_video( video_path, output_dir, questions_config ) if not os.path.exists(annotated_video_raw): raise Exception("Annotated video was not generated") # --------------------------------------------------- #Merge annotated video with original audio # --------------------------------------------------- final_output = os.path.join( output_dir, "Intervision_Final_Report.mp4" ) print("[PIPELINE] Merging audio and annotated video") subprocess.run([ 'ffmpeg', '-y', '-i', annotated_video_raw, '-i', video_path, '-map', '0:v:0', '-map', '1:a:0', '-c:v', 'libx264', '-preset', 'veryfast', '-crf', '23', '-c:a', 'aac', '-b:a', '160k', '-shortest', final_output ], check=True) if not os.path.exists(final_output): raise Exception("Final video merge failed") print("[PIPELINE] Final video created:", final_output) # --------------------------------------------------- # Generate report JSON # --------------------------------------------------- report = { "status": "completed", "questionsAnalyzed": len(questions_config) } report_path = os.path.join(output_dir, "report.json") with open(report_path, "w") as f: json.dump(report, f, indent=2) print("[PIPELINE] Report saved:", report_path) return final_output, report_path