import concurrent.futures
import gc
import hashlib
import json
import logging
import multiprocessing
import os
import re
import shutil
import subprocess
import sys
import tempfile
import time
import warnings
from concurrent.futures import ThreadPoolExecutor, as_completed
from contextlib import contextmanager
from pathlib import Path
from typing import Optional, Dict, Any, List, Tuple

import librosa
import numpy as np
import pandas as pd
import soundfile as sf
import streamlit as st
import torch
import whisper
from faster_whisper import WhisperModel
from openai import OpenAI

# Set up logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


class AudioProcessingError(Exception):
    """Custom exception for audio processing errors"""
    pass


@contextmanager
def temporary_file(suffix: Optional[str] = None):
    """Context manager yielding a temporary file path that is removed on exit.

    The path is placed inside a freshly created private directory
    (``tempfile.mkdtemp``) instead of being produced by the deprecated
    ``tempfile.mktemp``, whose returned name can be claimed by another
    process before the caller creates the file. As before, the file itself
    is NOT created here; the caller is expected to write it.

    Args:
        suffix: Optional file-name suffix (for example ``".wav"``).

    Yields:
        The path the caller may create and use inside the ``with`` block.
    """
    temp_dir = tempfile.mkdtemp()
    temp_path = os.path.join(temp_dir, f"tmp{suffix or ''}")
    try:
        yield temp_path
    finally:
        # Best-effort cleanup: a failure here must not mask the caller's error.
        try:
            shutil.rmtree(temp_dir)
        except Exception as e:
            logger.warning(f"Failed to remove temporary file {temp_path}: {e}")


class ProgressTracker:
    """Tracks progress across multiple processing steps"""

    # NOTE(review): the status strings below contain only blank lines around
    # the text. It looks like HTML markup was stripped from this source at
    # some point - confirm against the original file before relying on them.

    def __init__(self, status_container, progress_bar):
        """Store the Streamlit widgets this tracker writes to.

        Args:
            status_container: Object whose ``markdown`` method renders status text.
            progress_bar: Object whose ``progress`` method takes a 0-1 fraction.
        """
        self.status = status_container
        self.progress = progress_bar
        self.current_step = 0
        self.total_steps = 5  # Total number of main processing steps
        self.substep_container = st.empty()  # Container for substep details
        self.metrics_container = st.container()  # Container for metrics

    def update(self, progress: float, message: str, substep: str = "",
               metrics: Optional[Dict[str, Any]] = None):
        """Update progress bar and status message.

        Args:
            progress: Progress within current step (0-1)
            message: Main status message
            substep: Optional substep detail
            metrics: Optional dictionary of metrics to display
        """
        # Each step contributes an equal share of the bar. The value is
        # clamped to [0, 1] so an out-of-range `progress` cannot reach the
        # progress widget.
        overall_progress = (self.current_step + progress) / self.total_steps
        overall_progress = max(0.0, min(overall_progress, 1.0))
        self.progress.progress(overall_progress)

        status_html = f"""

{message}

"""
        if substep:
            status_html += f"\n\n{substep}\n\n"
        status_html += "\n"
        self.status.markdown(status_html, unsafe_allow_html=True)

        # Display metrics if provided, one column per metric
        if metrics:
            with self.metrics_container:
                cols = st.columns(len(metrics))
                for col, (metric_name, metric_value) in zip(cols, metrics.items()):
                    with col:
                        st.metric(
                            label=metric_name,
                            value=metric_value
                            if isinstance(metric_value, (int, float))
                            else str(metric_value)
                        )

    def next_step(self):
        """Move to next processing step; show a completion message on the last one."""
        self.current_step = min(self.current_step + 1, self.total_steps)

        # Clear substep container for new step
        self.substep_container.empty()

        if self.current_step == self.total_steps:
            self.progress.progress(1.0)
            self.status.markdown("""

✅ Processing Complete!

""", unsafe_allow_html=True)

    def error(self, message: str):
        """Display error message in the status container."""
        self.status.markdown(f"""

❌ Error

{message}

""", unsafe_allow_html=True)
class AudioFeatureExtractor:
    """Handles audio feature extraction with improved pause detection"""

    def __init__(self):
        self.sr = 16000
        self.hop_length = 512
        self.n_fft = 2048
        self.chunk_duration = 300
        # Parameters for pause detection
        self.min_pause_duration = 4  # minimum pause duration in seconds
        self.silence_threshold = -40  # dB threshold for silence

    def _analyze_pauses(self, silent_frames, frame_time, min_pause: float = 0.5):
        """Count runs of consecutive silent frames that are long enough to be pauses.

        Args:
            silent_frames: Iterable of truthy/falsy flags, one per frame.
            frame_time: Duration of one frame in seconds.
            min_pause: A run must last strictly longer than this many seconds
                to count as a pause (0.5 s, the threshold the code always used).

        Returns:
            Dict with ``total_pauses`` (int) and ``mean_pause_duration`` (float,
            seconds; 0.0 when there are no pauses).
        """
        pause_durations = []
        run_length = 0
        for is_silent in silent_frames:
            if is_silent:
                run_length += 1
                continue
            if run_length:
                duration = run_length * frame_time
                if duration > min_pause:
                    pause_durations.append(duration)
                run_length = 0

        # A silent run that reaches the end of the input has no closing
        # non-silent frame; account for it here so it is not lost.
        if run_length:
            duration = run_length * frame_time
            if duration > min_pause:
                pause_durations.append(duration)

        if not pause_durations:
            return {'total_pauses': 0, 'mean_pause_duration': 0.0}
        return {
            'total_pauses': len(pause_durations),
            'mean_pause_duration': float(np.mean(pause_durations))
        }

    def extract_features(self, audio_path: str, progress_callback=None) -> Dict[str, float]:
        """Load an audio file and compute amplitude, pitch and pause statistics.

        Args:
            audio_path: Path of the audio file, loaded via ``librosa.load``.
            progress_callback: Optional callable ``(fraction, message)``.

        Returns:
            Dict of pitch statistics, a 0-1 ``monotone_score`` (higher means
            more monotonous), amplitude statistics, ``pauses_per_minute``,
            ``duration`` in seconds and pitch-movement counts.

        Raises:
            AudioProcessingError: If loading or any computation fails.
        """
        try:
            if progress_callback:
                progress_callback(0.1, "Loading audio file...")

            # Load audio resampled to the extractor's sample rate
            audio, sr = librosa.load(audio_path, sr=self.sr)
            duration_seconds = len(audio) / sr
            duration_minutes = duration_seconds / 60

            # Amplitude features; hop_length is passed explicitly so the
            # frame_time used for pause detection below matches the RMS frames.
            rms = librosa.feature.rms(y=audio, hop_length=self.hop_length)[0]
            mean_rms = np.mean(rms)
            mean_amplitude = float(mean_rms) * 100  # Scale for better readability

            # Pitch track for monotone detection
            f0, voiced_flag, _ = librosa.pyin(
                audio,
                sr=sr,
                fmin=70,
                fmax=400,
                frame_length=self.n_fft
            )
            # Keep only voiced, non-NaN frames
            valid_f0 = f0[np.logical_and(voiced_flag == 1, ~np.isnan(f0))]
            has_pitch = len(valid_f0) > 0

            pitch_mean = float(np.mean(valid_f0)) if has_pitch else 0
            pitch_std = float(np.std(valid_f0)) if has_pitch else 0
            pitch_range = float(np.ptp(valid_f0)) if has_pitch else 0  # Peak-to-peak range

            # Coefficient of variation of pitch, in percent
            pitch_variation_coeff = (pitch_std / pitch_mean * 100) if pitch_mean > 0 else 0

            # Factor 1: low pitch variation pushes the factor towards 1
            variation_factor = min(1.0, max(0.0, 1.0 - (pitch_variation_coeff / 30.0)))

            # Factor 2: small pitch range relative to the mean pitch
            range_ratio = (pitch_range / pitch_mean * 100) if pitch_mean > 0 else 0
            range_factor = min(1.0, max(0.0, 1.0 - (range_ratio / 100.0)))

            # Factor 3: few changes of pitch direction (sign flips of the deltas)
            pitch_changes = np.diff(valid_f0) if len(valid_f0) > 1 else np.array([])
            if len(pitch_changes) > 0:
                direction_changes = np.sum(np.diff(np.signbit(pitch_changes)))
            else:
                direction_changes = 0
            changes_per_minute = (
                float(direction_changes / duration_minutes) if len(audio) > 0 else 0
            )
            changes_factor = min(1.0, max(0.0, 1.0 - (changes_per_minute / 300.0)))

            # Final monotone score (0-1, higher means more monotonous)
            monotone_score = (variation_factor * 0.4 +
                              range_factor * 0.3 +
                              changes_factor * 0.3)

            # Log the factors for debugging
            logger.info(f"""Monotone score calculation:
                Pitch variation coeff: {pitch_variation_coeff:.2f}
                Variation factor: {variation_factor:.2f}
                Range ratio: {range_ratio:.2f}
                Range factor: {range_factor:.2f}
                Changes per minute: {changes_per_minute:.2f}
                Changes factor: {changes_factor:.2f}
                Final monotone score: {monotone_score:.2f}
            """)

            # Pause detection on the RMS envelope expressed in dB
            rms_db = librosa.amplitude_to_db(rms, ref=np.max)
            silence_frames = rms_db < self.silence_threshold
            frame_time = self.hop_length / sr
            pause_analysis = self._analyze_pauses(silence_frames, frame_time)

            pauses_per_minute = float(
                pause_analysis['total_pauses'] / duration_minutes
                if duration_minutes > 0 else 0
            )

            return {
                "pitch_mean": pitch_mean,
                "pitch_std": pitch_std,
                "pitch_range": pitch_range,
                "pitch_variation_coeff": pitch_variation_coeff,
                "monotone_score": monotone_score,
                "mean_amplitude": mean_amplitude,
                "amplitude_deviation": float(np.std(rms) / mean_rms) if mean_rms > 0 else 0,
                "pauses_per_minute": pauses_per_minute,
                "duration": float(duration_seconds),
                "rising_patterns": int(np.sum(pitch_changes > 0)) if len(valid_f0) > 1 else 0,
                "falling_patterns": int(np.sum(pitch_changes < 0)) if len(valid_f0) > 1 else 0,
                "variations_per_minute": (
                    float(len(valid_f0) / duration_minutes) if len(audio) > 0 else 0
                ),
                "direction_changes_per_min": changes_per_minute
            }

        except Exception as e:
            logger.error(f"Error in feature extraction: {e}")
            raise AudioProcessingError(f"Feature extraction failed: {str(e)}") from e

    def _process_chunk(self, chunk: np.ndarray) -> Dict[str, Any]:
        """Process a single chunk of audio with improved pause detection

        Args:
            chunk: Audio samples, interpreted at ``self.sr``.

        Returns:
            Dict with the chunk's RMS envelope (``rms``), voiced pitch values
            (``f0``), ``duration`` in seconds, ``pause_count`` and
            ``mean_pause_duration``.
        """
        # Magnitude spectrogram
        spectrum = np.abs(librosa.stft(chunk, n_fft=self.n_fft, hop_length=self.hop_length))

        # RMS energy in dB relative to the loudest frame
        rms = librosa.feature.rms(S=spectrum)[0]
        rms_db = librosa.amplitude_to_db(rms, ref=np.max)

        # Detect pauses using the silence threshold
        is_silence = rms_db < self.silence_threshold
        frame_time = self.hop_length / self.sr
        pause_analysis = self._analyze_pauses(is_silence, frame_time)

        # Pitch features
        f0, voiced_flag, _ = librosa.pyin(
            chunk,
            sr=self.sr,
            fmin=70,
            fmax=400,
            frame_length=self.n_fft
        )

        return {
            "rms": rms,
            "f0": f0[voiced_flag == 1] if f0 is not None else np.array([]),
            "duration": len(chunk) / self.sr,
            "pause_count": pause_analysis['total_pauses'],
            "mean_pause_duration": pause_analysis['mean_pause_duration']
        }

    def _combine_features(self, features: List[Dict[str, Any]]) -> Dict[str, float]:
        """Combine features from multiple chunks

        Args:
            features: Per-chunk dicts as produced by ``_process_chunk``.

        Returns:
            Aggregated pitch and amplitude statistics. Pitch values are 0 when
            no chunk contains voiced frames, and rates are 0 when the total
            duration is 0.
        """
        # np.concatenate raises on an empty sequence, so guard both inputs.
        f0_parts = [f["f0"] for f in features if len(f["f0"]) > 0]
        rms_parts = [f["rms"] for f in features]
        all_f0 = np.concatenate(f0_parts) if f0_parts else np.array([])
        all_rms = np.concatenate(rms_parts) if rms_parts else np.array([])

        has_pitch = len(all_f0) > 0
        pitch_mean = np.mean(all_f0) if has_pitch else 0
        pitch_std = np.std(all_f0) if has_pitch else 0
        mean_rms = np.mean(all_rms) if len(all_rms) > 0 else 0.0

        pitch_deltas = np.diff(all_f0) if len(all_f0) > 1 else np.array([])
        total_minutes = sum(f["duration"] for f in features) / 60
        variations = np.sum(pitch_deltas != 0) if len(pitch_deltas) > 0 else 0

        return {
            "pitch_mean": float(pitch_mean),
            "pitch_std": float(pitch_std),
            "mean_amplitude": float(mean_rms),
            "amplitude_deviation": float(np.std(all_rms) / mean_rms) if mean_rms > 0 else 0,
            "rising_patterns": int(np.sum(pitch_deltas > 0)),
            "falling_patterns": int(np.sum(pitch_deltas < 0)),
            "variations_per_minute": float(variations / total_minutes) if total_minutes > 0 else 0.0
        }
class ContentAnalyzer:
    """Analyzes teaching content using OpenAI API"""

    # Assessment categories and the criteria scored inside each of them.
    _CRITERIA = {
        "Concept Assessment": (
            "Subject Matter Accuracy",
            "First Principles Approach",
            "Examples and Business Context",
            "Cohesive Storytelling",
            "Engagement and Interaction",
            "Professional Tone",
        ),
        "Code Assessment": (
            "Depth of Explanation",
            "Output Interpretation",
            "Breaking down Complexity",
        ),
    }
    _DEFAULT_CITATION = "[00:00] Unable to assess - insufficient evidence"

    def __init__(self, api_key: str):
        self.client = OpenAI(api_key=api_key)
        self.retry_count = 3
        self.retry_delay = 1

    def _default_assessment(self) -> Dict[str, Any]:
        """Build a fresh all-zero assessment used when analysis is unavailable."""
        return {
            category: {
                criterion: {"Score": 0, "Citations": [self._DEFAULT_CITATION]}
                for criterion in criteria
            }
            for category, criteria in self._CRITERIA.items()
        }

    def analyze_content(self, transcript: str, progress_callback=None) -> Dict[str, Any]:
        """Analyze teaching content with strict validation and robust JSON handling

        The model is called up to ``self.retry_count`` times with exponential
        backoff after a failed call. A reply that is not valid JSON triggers
        an immediate retry; on the last attempt it is parsed heuristically
        with ``_extract_structured_data`` instead.

        Args:
            transcript: Transcript text to assess.
            progress_callback: Optional callable ``(fraction, message)``.

        Returns:
            Nested dict ``{category: {criterion: {"Score", "Citations"}}}``.
            The all-zero default structure is returned if every attempt fails.
        """
        default_structure = self._default_assessment()

        for attempt in range(self.retry_count):
            is_last_attempt = attempt == self.retry_count - 1
            try:
                if progress_callback:
                    progress_callback(0.2, "Preparing content analysis...")

                prompt = self._create_analysis_prompt(transcript)

                if progress_callback:
                    progress_callback(0.5, "Processing with AI model...")

                try:
                    response = self.client.chat.completions.create(
                        model="gpt-4o-mini",
                        messages=[
                            {"role": "system", "content": """You are a strict teaching evaluator focusing on core teaching competencies.
                            For each assessment point, you MUST include specific timestamps [MM:SS] from the transcript.
                            Never use [00:00] as a placeholder - only use actual timestamps from the transcript.
                            Each citation must include both the timestamp and a relevant quote showing evidence.

                            Score of 1 requires meeting ALL criteria below with clear evidence.
                            Score of 0 if ANY major teaching deficiency is present.

                            Citations format: "[MM:SS] Exact quote from transcript showing evidence"

                            Maintain high standards and require clear evidence of quality teaching."""},
                            {"role": "user", "content": prompt}
                        ],
                        # JSON mode, as used by the other API calls in this file,
                        # so the reply is parseable by json.loads below.
                        response_format={"type": "json_object"},
                        temperature=0.3
                    )
                    logger.info("API call successful")
                except Exception as api_error:
                    logger.error(f"API call failed: {str(api_error)}")
                    raise

                result_text = response.choices[0].message.content.strip()
                logger.info(f"Raw API response: {result_text[:500]}...")

                try:
                    result = json.loads(result_text)
                except json.JSONDecodeError as json_error:
                    logger.error(f"JSON parsing error: {json_error}")
                    if is_last_attempt:
                        # On final attempt, try to extract structured data
                        return self._extract_structured_data(result_text)
                    continue

                return self._normalize_assessment(result, transcript, default_structure)

            except Exception as e:
                logger.error(f"Content analysis attempt {attempt + 1} failed: {str(e)}")
                if is_last_attempt:
                    return default_structure
                time.sleep(self.retry_delay * (2 ** attempt))

        return default_structure

    def _normalize_assessment(self, result: Any, transcript: str,
                              default_structure: Dict[str, Any]) -> Dict[str, Any]:
        """Fill in missing parts of a parsed reply and force every Score to 0 or 1."""
        if not isinstance(result, dict):
            raise ValueError("Model response is not a JSON object")

        for category, defaults in default_structure.items():
            section = result.get(category)
            if not isinstance(section, dict):
                result[category] = defaults
                continue

            for criterion, default_entry in defaults.items():
                if criterion not in section:
                    section[criterion] = default_entry
                    continue

                entry = section[criterion]
                if not isinstance(entry, dict):
                    entry = {"Score": 0, "Citations": []}

                citations = entry.get("Citations")
                if isinstance(citations, str) and citations:
                    # A single citation given as a bare string
                    citations = [citations]
                if not citations:
                    citations = [
                        f"[{self._get_timestamp(transcript)}] Insufficient evidence for assessment"
                    ]
                entry["Citations"] = citations

                # Ensure Score is either 0 or 1
                entry["Score"] = 1 if entry.get("Score") == 1 else 0
                section[criterion] = entry

        return result

    def _get_timestamp(self, transcript: str) -> str:
        """Generate a reasonable timestamp based on transcript length

        Assumes a speaking rate of 150 words per minute and returns the
        position of the end of the transcript formatted as ``MM:SS``.
        """
        words = len(transcript.split())
        minutes, remainder = divmod(words, 150)
        seconds = remainder * 60 // 150
        return f"{minutes:02d}:{seconds:02d}"

    def _extract_structured_data(self, text: str) -> Dict[str, Any]:
        """Extract structured data from text response when JSON parsing fails

        The text is split on blank lines. A section naming a category selects
        it; every later section containing ``:`` is read as
        ``criterion: details`` and scored 1 when the details contain "pass" or
        "score: 1", otherwise 0.

        Returns:
            Dict with the two category keys, filled with whatever criteria
            could be recovered (possibly none).
        """
        extracted = {
            "Concept Assessment": {},
            "Code Assessment": {}
        }

        try:
            current_category = None

            for section in text.split('\n\n'):
                if "Concept Assessment" in section:
                    current_category = "Concept Assessment"
                    continue
                if "Code Assessment" in section:
                    current_category = "Code Assessment"
                    continue
                if not current_category or ':' not in section:
                    continue

                title, content = section.split(':', 1)
                subcategory = title.strip()
                lowered = content.lower()

                # Extract score (assuming 0 or 1 is mentioned)
                score = 1 if "pass" in lowered or "score: 1" in lowered else 0

                # Extract citations (assuming they're in [MM:SS] format)
                found = re.findall(r'\[\d{2}:\d{2}\].*?(?=\[|$)', content)
                citations = [c.strip() for c in found if c.strip()]
                if not citations:
                    citations = ["No specific citations found"]

                if subcategory:
                    extracted[current_category][subcategory] = {
                        "Score": score,
                        "Citations": citations
                    }

            return extracted

        except Exception as e:
            logger.error(f"Error extracting structured data: {e}")
            return extracted

    def _create_analysis_prompt(self, transcript: str) -> str:
        """Create the analysis prompt with the scoring rubric and required JSON shape

        If the transcript already contains ``[MM:SS]`` markers the model is told
        to reuse them; otherwise it is told how to estimate timestamps from the
        word position (150 words per minute).
        """
        # First try to extract existing timestamps
        timestamps = re.findall(r'\[(\d{2}:\d{2})\]', transcript)

        if timestamps:
            timestamp_instruction = f"""Use the EXACT timestamps from the transcript (e.g. {', '.join(timestamps[:3])}).
Do not create new timestamps."""
        else:
            # Calculate approximate timestamps based on word position
            timestamp_instruction = """Generate timestamps based on word position:
1. Count words from start of transcript
2. Calculate time: (word_count / 150) minutes
3. Format as [MM:SS]"""

        prompt_template = """Analyze this teaching content with balanced standards. Each criterion should be evaluated fairly, avoiding both excessive strictness and leniency.

Score 1 if MOST key requirements are met with clear evidence. Score 0 if MULTIPLE significant requirements are not met.
You MUST provide specific citations with timestamps [MM:SS] for each assessment point.

Transcript:
{transcript}

Timestamp Instructions:
{timestamp_instruction}

Required JSON response format:
{{
    "Concept Assessment": {{
        "Subject Matter Accuracy": {{
            "Score": 0 or 1,
            "Citations": ["[MM:SS] Exact quote showing evidence"]
        }},
        "First Principles Approach": {{
            "Score": 0 or 1,
            "Citations": ["[MM:SS] Exact quote showing evidence"]
        }},
        "Examples and Business Context": {{
            "Score": 0 or 1,
            "Citations": ["[MM:SS] Exact quote showing evidence"]
        }},
        "Cohesive Storytelling": {{
            "Score": 0 or 1,
            "Citations": ["[MM:SS] Exact quote showing evidence"]
        }},
        "Engagement and Interaction": {{
            "Score": 0 or 1,
            "Citations": ["[MM:SS] Exact quote showing evidence"]
        }},
        "Professional Tone": {{
            "Score": 0 or 1,
            "Citations": ["[MM:SS] Exact quote showing evidence"]
        }}
    }},
    "Code Assessment": {{
        "Depth of Explanation": {{
            "Score": 0 or 1,
            "Citations": ["[MM:SS] Exact quote showing evidence"]
        }},
        "Output Interpretation": {{
            "Score": 0 or 1,
            "Citations": ["[MM:SS] Exact quote showing evidence"]
        }},
        "Breaking down Complexity": {{
            "Score": 0 or 1,
            "Citations": ["[MM:SS] Exact quote showing evidence"]
        }}
    }}
}}

Balanced Scoring Criteria:

Subject Matter Accuracy:
✓ Score 1 if MOST:
- Shows good technical knowledge
- Uses appropriate terminology
- Explains concepts correctly
✗ Score 0 if MULTIPLE:
- Contains significant technical errors
- Uses consistently incorrect terminology
- Misrepresents core concepts

First Principles Approach:
✓ Score 1 if MOST:
- Introduces fundamental concepts
- Shows logical progression
- Connects related concepts
✗ Score 0 if MULTIPLE:
- Skips essential fundamentals
- Shows unclear progression
- Fails to connect concepts

Examples and Business Context:
✓ Score 1 if MOST:
- Provides relevant examples
- Shows business application
- Demonstrates practical value
✗ Score 0 if MULTIPLE:
- Lacks meaningful examples
- Missing practical context
- Examples don't aid learning

Cohesive Storytelling:
✓ Score 1 if MOST:
- Shows clear structure
- Has logical transitions
- Maintains consistent theme
✗ Score 0 if MULTIPLE:
- Has unclear structure
- Shows jarring transitions
- Lacks coherent theme

Engagement and Interaction:
✓ Score 1 if MOST:
- Encourages participation
- Shows audience awareness
- Uses engaging techniques
✗ Score 0 if MULTIPLE:
- Shows minimal interaction
- Ignores audience
- Lacks engagement attempts

Professional Tone:
✓ Score 1 if MOST:
- Uses appropriate language
- Shows confidence
- Maintains clarity
✗ Score 0 if MULTIPLE:
- Uses inappropriate language
- Shows consistent uncertainty
- Is frequently unclear

Depth of Explanation:
✓ Score 1 if MOST:
- Explains core concepts
- Covers key details
- Discusses implementation
✗ Score 0 if MULTIPLE:
- Misses core concepts
- Skips important details
- Lacks implementation depth

Output Interpretation:
✓ Score 1 if MOST:
- Explains key results
- Covers common errors
- Discusses performance
✗ Score 0 if MULTIPLE:
- Unclear about results
- Ignores error cases
- Misses performance aspects

Breaking down Complexity:
✓ Score 1 if MOST:
- Breaks down concepts
- Shows clear steps
- Builds understanding
✗ Score 0 if MULTIPLE:
- Keeps concepts too complex
- Skips important steps
- Creates confusion

Important:
- Each citation must include timestamp and relevant quote
- Score 1 requires meeting MOST (not all) criteria
- Score 0 requires MULTIPLE significant issues
- Use specific evidence from transcript
- Balance between being overly strict and too lenient
"""

        return prompt_template.format(
            transcript=transcript,
            timestamp_instruction=timestamp_instruction
        )

    @staticmethod
    def _band_score(value: float, ideal: Tuple[float, float],
                    acceptable: Tuple[float, float]) -> float:
        """Return 1.0 inside the ideal band, 0.5 inside the acceptable band, else 0.0."""
        if ideal[0] <= value <= ideal[1]:
            return 1.0
        if acceptable[0] <= value <= acceptable[1]:
            return 0.5
        return 0.0

    def _evaluate_speech_metrics(self, transcript: str, audio_features: Dict[str, float],
                                 progress_callback=None) -> Dict[str, Any]:
        """Evaluate speech metrics from the transcript and extracted audio features

        Args:
            transcript: Transcript text.
            audio_features: Feature dict; the keys read here are ``duration``,
                ``pitch_mean``, ``pitch_std``, ``pitch_range``,
                ``direction_changes_per_min``, ``pauses_per_minute``,
                ``variations_per_minute``, ``mean_amplitude`` and
                ``amplitude_deviation`` (all default to 0 when missing).
            progress_callback: Optional callable ``(fraction, message)``.

        Returns:
            Dict with ``speed``, ``fluency``, ``flow``, ``intonation`` and
            ``energy`` sections, each carrying a 0/1 score plus raw values.

        Raises:
            Exception: Any error is logged and re-raised unchanged.
        """
        try:
            if progress_callback:
                progress_callback(0.2, "Calculating speech metrics...")

            # Words, duration and speaking rate
            words = len(transcript.split())
            duration_minutes = float(audio_features.get('duration', 0)) / 60
            words_per_minute = float(words / duration_minutes) if duration_minutes > 0 else 0.0

            lowered = transcript.lower()
            grammatical_errors = []

            # Subject-verb agreement errors. "there are <plural>" is correct
            # English and is therefore deliberately not listed.
            sv_errors = re.findall(r'\b(they is|he are|she are|it are|there is \w+s)\b', lowered)
            grammatical_errors.extend(("Subject-Verb Agreement", err) for err in sv_errors)

            # Article misuse
            article_errors = re.findall(r'\b(a [aeiou]\w+|an [^aeiou\s]\w+)\b', lowered)
            grammatical_errors.extend(("Article Misuse", err) for err in article_errors)

            # Double negatives
            double_neg = re.findall(
                r'\b(don\'t.*no|doesn\'t.*no|didn\'t.*no|never.*no)\b', lowered
            )
            grammatical_errors.extend(("Double Negative", err) for err in double_neg)

            # Preposition errors
            prep_errors = re.findall(r'\b(depend of|different than|identical than)\b', lowered)
            grammatical_errors.extend(("Preposition Error", err) for err in prep_errors)

            # Incomplete sentences: a terminator NOT followed by a capitalised
            # word or the end of the text, a dangling dash, or an ellipsis.
            # The whitespace sits inside the lookahead so backtracking cannot
            # turn every ordinary sentence ending into a match.
            incomplete = re.findall(
                r'[a-zA-Z]+\s*[.!?](?!\s*[A-Z]|\s*$)|[a-zA-Z]+\s*-\s+|[a-zA-Z]+\s*\.\.\.',
                transcript
            )
            grammatical_errors.extend(("Incomplete Sentence", err) for err in incomplete)

            errors_count = len(grammatical_errors)
            errors_per_minute = float(
                errors_count / duration_minutes if duration_minutes > 0 else 0
            )
            max_errors = 1.0  # at most one detected error per minute

            # Pitch statistics for the monotone score
            pitch_mean = float(audio_features.get("pitch_mean", 0))
            pitch_std = float(audio_features.get("pitch_std", 0))
            pitch_variation_coeff = (pitch_std / pitch_mean * 100) if pitch_mean > 0 else 0
            direction_changes = float(audio_features.get("direction_changes_per_min", 0))
            pitch_range = float(audio_features.get("pitch_range", 0))
            range_ratio = (pitch_range / pitch_mean * 100) if pitch_mean > 0 else 0

            # Variation: 20-40% is good, 15-45% acceptable
            variation_factor = self._band_score(pitch_variation_coeff, (20, 40), (15, 45))
            # Range: 200-300% is good, 150-350% acceptable
            range_factor = self._band_score(range_ratio, (200, 300), (150, 350))
            # Direction changes: 450-650/min is good, 350-750/min acceptable
            changes_factor = self._band_score(direction_changes, (450, 650), (350, 750))

            # NOTE(review): these indicators are a reconstruction - the name was
            # referenced by the result below but never defined. Each flag is
            # True when the corresponding factor falls outside even the
            # acceptable band; confirm against the intended definition.
            monotone_indicators = {
                "pitch_variation_out_of_range": variation_factor == 0.0,
                "pitch_range_out_of_range": range_factor == 0.0,
                "direction_changes_out_of_range": changes_factor == 0.0,
            }

            # Final monotone score (0-1, higher means more monotonous),
            # weighted towards pitch variation
            weights = [0.4, 0.3, 0.3]
            monotone_score = 1.0 - (
                variation_factor * weights[0] +
                range_factor * weights[1] +
                changes_factor * weights[2]
            )

            # Add debug logging
            logger.info(f"""Monotone score calculation:
                Pitch variation coeff: {pitch_variation_coeff:.2f}
                Pitch range ratio: {range_ratio:.2f}%
                Changes per minute: {direction_changes:.2f}
                Variation factor: {variation_factor:.2f}
                Range factor: {range_factor:.2f}
                Changes factor: {changes_factor:.2f}
                Final score: {monotone_score:.2f}
            """)

            pauses_per_minute = audio_features.get("pauses_per_minute", 0)
            mean_amplitude = audio_features.get("mean_amplitude", 0)
            amplitude_deviation = audio_features.get("amplitude_deviation", 0)

            return {
                "speed": {
                    "score": 1 if 120 <= words_per_minute <= 180 else 0,
                    "wpm": words_per_minute,
                    "total_words": words,
                    "duration_minutes": duration_minutes
                },
                "fluency": {
                    "score": 1 if errors_per_minute <= max_errors else 0,
                    "errorsPerMin": errors_per_minute,
                    "maxErrorsThreshold": max_errors,
                    "detectedErrors": [
                        {"type": error_type, "context": error_text}
                        for error_type, error_text in grammatical_errors
                    ]
                },
                "flow": {
                    "score": 1 if pauses_per_minute <= 12 else 0,
                    "pausesPerMin": pauses_per_minute
                },
                "intonation": {
                    "pitch": pitch_mean,
                    "pitchScore": 1 if not any(monotone_indicators.values()) else 0,
                    "pitchVariation": pitch_variation_coeff,
                    "monotoneScore": monotone_score,
                    "monotoneIndicators": monotone_indicators,
                    "directionChanges": direction_changes,
                    "variationsPerMin": audio_features.get("variations_per_minute", 0)
                },
                "energy": {
                    "score": 1 if 60 <= mean_amplitude <= 75 else 0,
                    "meanAmplitude": mean_amplitude,
                    "amplitudeDeviation": amplitude_deviation,
                    "variationScore": 1 if 0.05 <= amplitude_deviation <= 0.15 else 0
                }
            }

        except Exception as e:
            logger.error(f"Error in speech metrics evaluation: {e}")
            raise

    def generate_suggestions(self, category: str, citations: List[str]) -> List[str]:
        """Generate contextual suggestions based on category and citations

        Args:
            category: Name of the teaching category.
            citations: Citations describing the issues found.

        Returns:
            List of suggestion strings. On any failure a single-item list
            describing the error is returned instead of raising.
        """
        try:
            response = self.client.chat.completions.create(
                model="gpt-4o-mini",
                messages=[
                    {"role": "system", "content": """You are a teaching expert providing specific, actionable suggestions
                    for improvement. Focus on the single most important, practical advice based on the teaching category
                    and cited issues. Keep suggestions under 25 words."""},
                    {"role": "user", "content": f"""
                    Teaching Category: {category}
                    Issues identified in citations:
                    {json.dumps(citations, indent=2)}

                    Please provide 2 or 3 specific, actionable suggestions for improvement.
                    Respond with a JSON object of the form {{"suggestions": ["...", "..."]}}."""}
                ],
                response_format={"type": "json_object"},
                temperature=0.7
            )

            parsed = json.loads(response.choices[0].message.content)
            # The prompt asks for {"suggestions": [...]}; a bare array or a
            # single string is still accepted.
            suggestions = parsed.get("suggestions", []) if isinstance(parsed, dict) else parsed
            if isinstance(suggestions, str):
                suggestions = [suggestions]
            if not isinstance(suggestions, list):
                return []
            return [str(item) for item in suggestions]

        except Exception as e:
            logger.error(f"Error generating suggestions: {e}")
            return [f"Unable to generate specific suggestions: {str(e)}"]
class RecommendationGenerator:
    """Generates teaching recommendations using OpenAI API"""

    def __init__(self, api_key: str):
        self.client = OpenAI(api_key=api_key)
        self.retry_count = 3
        self.retry_delay = 1

    @staticmethod
    def _fallback_recommendations(message: str) -> Dict[str, Any]:
        """Build the placeholder result returned when no recommendations are available."""
        return {
            "geographyFit": "Unknown",
            "improvements": [
                {
                    "category": "TECHNICAL",
                    "message": message
                }
            ],
            "rigor": "Undetermined",
            "profileMatches": []
        }

    @staticmethod
    def _format_improvements(improvements: Any) -> List[Dict[str, Any]]:
        """Normalise the model's ``improvements`` value to ``{"category", "message"}`` dicts.

        A single string or dict is treated as a one-item list so a bare string
        is not iterated character by character. Items that are neither strings
        nor dicts are dropped. Uncategorised items default to "TECHNICAL".
        """
        if isinstance(improvements, (str, dict)):
            improvements = [improvements]
        if not isinstance(improvements, list):
            return []

        formatted = []
        for item in improvements:
            if isinstance(item, str):
                # Default categorization for legacy format
                formatted.append({"category": "TECHNICAL", "message": item})
            elif isinstance(item, dict):
                formatted.append({
                    "category": item.get("category", "TECHNICAL"),
                    "message": item.get("message", str(item))
                })
        return formatted

    def generate_recommendations(self,
                                 metrics: Dict[str, Any],
                                 content_analysis: Dict[str, Any],
                                 progress_callback=None) -> Dict[str, Any]:
        """Generate recommendations with robust JSON handling

        The model is called up to ``self.retry_count`` times with exponential
        backoff between failed attempts.

        Args:
            metrics: Speech metrics; must be JSON-serialisable.
            content_analysis: Content assessment; must be JSON-serialisable.
            progress_callback: Optional callable ``(fraction, message)``.

        Returns:
            The model's JSON object with ``improvements`` normalised, or a
            placeholder result if the reply cannot be parsed or every attempt
            fails.
        """
        unavailable = "Unable to generate specific recommendations"

        for attempt in range(self.retry_count):
            try:
                if progress_callback:
                    progress_callback(0.2, "Preparing recommendation analysis...")

                prompt = self._create_recommendation_prompt(metrics, content_analysis)

                if progress_callback:
                    progress_callback(0.5, "Generating recommendations...")

                response = self.client.chat.completions.create(
                    model="gpt-4o-mini",
                    messages=[
                        {"role": "system", "content": """You are a teaching expert providing actionable recommendations.
                        Each improvement must be categorized as one of:
                        - COMMUNICATION: Related to speaking, pace, tone, clarity, delivery
                        - TEACHING: Related to explanation, examples, engagement, structure
                        - TECHNICAL: Related to code, implementation, technical concepts

                        Always respond with a valid JSON object containing categorized improvements."""},
                        {"role": "user", "content": prompt}
                    ],
                    response_format={"type": "json_object"}
                )

                if progress_callback:
                    progress_callback(0.8, "Formatting recommendations...")

                result_text = response.choices[0].message.content.strip()

                try:
                    result = json.loads(result_text)
                except json.JSONDecodeError:
                    result = self._fallback_recommendations(unavailable)
                else:
                    if not isinstance(result, dict):
                        # Handled by the retry logic below
                        raise ValueError("Model response is not a JSON object")
                    if "improvements" in result:
                        result["improvements"] = self._format_improvements(
                            result["improvements"]
                        )

                if progress_callback:
                    progress_callback(1.0, "Recommendations complete!")

                return result

            except Exception as e:
                logger.error(f"Recommendation generation attempt {attempt + 1} failed: {e}")
                if attempt == self.retry_count - 1:
                    return self._fallback_recommendations(f"{unavailable}: {str(e)}")
                time.sleep(self.retry_delay * (2 ** attempt))

        # Only reachable when retry_count is 0 or negative
        return self._fallback_recommendations(unavailable)

    def _create_recommendation_prompt(self, metrics: Dict[str, Any],
                                      content_analysis: Dict[str, Any]) -> str:
        """Create the recommendation prompt

        Both arguments are embedded as JSON text, followed by the required
        response structure.
        """
        return f"""Based on the following metrics and analysis, provide recommendations:
Metrics: {json.dumps(metrics)}
Content Analysis: {json.dumps(content_analysis)}

Analyze the teaching style and provide:
1. A concise performance summary (2-3 paragraphs highlighting key strengths and areas for improvement)
2. Geography fit assessment
3. Specific improvements needed (each must be categorized as COMMUNICATION, TEACHING, or TECHNICAL)
4. Profile matching for different learner types (choose ONLY ONE best match)
5. Overall teaching rigor assessment

Required JSON structure:
{{
    "summary": "Comprehensive summary of teaching performance, strengths, and areas for improvement",
    "geographyFit": "String describing geographical market fit",
    "improvements": [
        {{
            "category": "COMMUNICATION",
            "message": "Specific improvement recommendation"
        }},
        {{
            "category": "TEACHING",
            "message": "Specific improvement recommendation"
        }},
        {{
            "category": "TECHNICAL",
            "message": "Specific improvement recommendation"
        }}
    ],
    "rigor": "Assessment of teaching rigor",
    "profileMatches": [
        {{
            "profile": "junior_technical",
            "match": false,
            "reason": "Detailed explanation why this profile is not the best match"
        }},
        {{
            "profile": "senior_non_technical",
            "match": false,
            "reason": "Detailed explanation why this profile is not the best match"
        }},
        {{
            "profile": "junior_expert",
            "match": false,
            "reason": "Detailed explanation why this profile is not the best match"
        }},
        {{
            "profile": "senior_expert",
            "match": false,
            "reason": "Detailed explanation why this profile is not the best match"
        }}
    ]
}}

Consider:
- Teaching pace and complexity level
- Balance of technical vs business context
- Depth of code explanations
- Use of examples and analogies
- Engagement style
- Communication metrics
- Teaching assessment scores"""
class CostCalculator:
    """Calculates API and processing costs"""

    def __init__(self):
        self.GPT4_INPUT_COST = 0.15 / 1_000_000  # $0.15 per 1M tokens input
        self.GPT4_OUTPUT_COST = 0.60 / 1_000_000  # $0.60 per 1M tokens output
        # $0.006 per minute of audio, stored as a per-SECOND rate
        self.WHISPER_COST = 0.006 / 60
        self.costs = {
            'transcription': 0.0,
            'content_analysis': 0.0,
            'recommendations': 0.0,
            'total': 0.0
        }

    def _refresh_total(self) -> None:
        """Recompute ``total`` from the parts so repeated updates cannot drift."""
        self.costs['total'] = sum(
            cost for key, cost in self.costs.items() if key != 'total'
        )

    def estimate_tokens(self, text: str) -> int:
        """Rough estimation of token count based on words

        Uses about 1.3 tokens per whitespace-separated word, rounded to the
        nearest integer.
        """
        return int(round(len(text.split()) * 1.3))

    def add_transcription_cost(self, duration_seconds: float):
        """Calculate Whisper transcription cost

        Args:
            duration_seconds: Length of the transcribed audio in seconds.
        """
        # WHISPER_COST is already per second, so the duration must not be
        # converted to minutes first.
        cost = duration_seconds * self.WHISPER_COST
        self.costs['transcription'] = cost
        self._refresh_total()
        print(f"\nTranscription Cost: ${cost:.4f}")

    def add_gpt4_cost(self, input_text: str, output_text: str, operation: str):
        """Calculate GPT-4 API cost for a single operation

        Args:
            input_text: Text sent to the model.
            output_text: Text returned by the model.
            operation: Key under which the cost is stored in ``self.costs``;
                recording the same operation again replaces its earlier cost.
        """
        input_tokens = self.estimate_tokens(input_text)
        output_tokens = self.estimate_tokens(output_text)

        input_cost = input_tokens * self.GPT4_INPUT_COST
        output_cost = output_tokens * self.GPT4_OUTPUT_COST
        total_cost = input_cost + output_cost

        self.costs[operation] = total_cost
        self._refresh_total()

        print(f"\n{operation.replace('_', ' ').title()} Cost:")
        print(f"Input tokens: {input_tokens:.0f} (${input_cost:.4f})")
        print(f"Output tokens: {output_tokens:.0f} (${output_cost:.4f})")
        print(f"Operation total: ${total_cost:.4f}")

    def print_total_cost(self):
        """Print total cost breakdown"""
        print("\n=== Cost Breakdown ===")
        for key, cost in self.costs.items():
            if key != 'total':
                print(f"{key.replace('_', ' ').title()}: ${cost:.4f}")
        print(f"\nTotal Cost: ${self.costs['total']:.4f}")
# $0.006 per minute self.costs = { 'transcription': 0.0, 'content_analysis': 0.0, 'recommendations': 0.0, 'total': 0.0 } def estimate_tokens(self, text: str) -> int: """Rough estimation of token count based on words""" return len(text.split()) * 1.3 # Approximate tokens per word def add_transcription_cost(self, duration_seconds: float): """Calculate Whisper transcription cost""" cost = (duration_seconds / 60) * self.WHISPER_COST self.costs['transcription'] = cost self.costs['total'] += cost print(f"\nTranscription Cost: ${cost:.4f}") def add_gpt4_cost(self, input_text: str, output_text: str, operation: str): """Calculate GPT-4 API cost for a single operation""" input_tokens = self.estimate_tokens(input_text) output_tokens = self.estimate_tokens(output_text) input_cost = input_tokens * self.GPT4_INPUT_COST output_cost = output_tokens * self.GPT4_OUTPUT_COST total_cost = input_cost + output_cost self.costs[operation] = total_cost self.costs['total'] += total_cost print(f"\n{operation.replace('_', ' ').title()} Cost:") print(f"Input tokens: {input_tokens:.0f} (${input_cost:.4f})") print(f"Output tokens: {output_tokens:.0f} (${output_cost:.4f})") print(f"Operation total: ${total_cost:.4f}") def print_total_cost(self): """Print total cost breakdown""" print("\n=== Cost Breakdown ===") for key, cost in self.costs.items(): if key != 'total': print(f"{key.replace('_', ' ').title()}: ${cost:.4f}") print(f"\nTotal Cost: ${self.costs['total']:.4f}") class MentorEvaluator: """Main class for video evaluation""" def __init__(self, model_cache_dir: Optional[str] = None): # Fix potential API key issue self.api_key = st.secrets.get("OPENAI_API_KEY") # Use get() method if not self.api_key: raise ValueError("OpenAI API key not found in secrets") # Add error handling for model cache directory try: if model_cache_dir: self.model_cache_dir = Path(model_cache_dir) else: self.model_cache_dir = Path.home() / ".cache" / "whisper" self.model_cache_dir.mkdir(parents=True, exist_ok=True) 
class MentorEvaluator:
    """Main class for video evaluation.

    Runs the pipeline visible in ``evaluate_video``: extract audio with
    ffmpeg, preprocess it with librosa, extract audio features, obtain a
    transcript (uploaded or via Whisper), analyse the content, generate
    recommendations and compute speech metrics.
    """

    # Lifetime in seconds of entries stored through _set_cached_result.
    # NOTE(review): the original never defined this value; one hour is an
    # assumed default - adjust if a different TTL was intended.
    cache_ttl: float = 3600.0

    def __init__(self, model_cache_dir: Optional[str] = None):
        """Read the API key, prepare the model cache directory and components.

        Args:
            model_cache_dir: Directory for Whisper model files; defaults to
                ``~/.cache/whisper``.

        Raises:
            ValueError: If ``OPENAI_API_KEY`` is missing from Streamlit secrets.
            RuntimeError: If the cache directory or a component cannot be created.
        """
        self.api_key = st.secrets.get("OPENAI_API_KEY")
        if not self.api_key:
            raise ValueError("OpenAI API key not found in secrets")

        # Backing store for _get_cached_result / _set_cached_result.
        self._cache = {}

        try:
            if model_cache_dir:
                self.model_cache_dir = Path(model_cache_dir)
            else:
                self.model_cache_dir = Path.home() / ".cache" / "whisper"
            self.model_cache_dir.mkdir(parents=True, exist_ok=True)
        except Exception as e:
            raise RuntimeError(f"Failed to create model cache directory: {e}") from e

        try:
            self.feature_extractor = AudioFeatureExtractor()
            self.content_analyzer = ContentAnalyzer(self.api_key)
            self.recommendation_generator = RecommendationGenerator(self.api_key)
            self.cost_calculator = CostCalculator()
        except Exception as e:
            raise RuntimeError(f"Failed to initialize components: {e}") from e

    def _cache_store(self) -> Dict[str, Tuple[float, Any]]:
        """Return the cache dict, creating it if the instance has none yet."""
        cache = getattr(self, "_cache", None)
        if cache is None:
            cache = self._cache = {}
        return cache

    def _get_cached_result(self, key: str) -> Optional[Any]:
        """Get cached result if available and not expired"""
        entry = self._cache_store().get(key)
        if entry is not None:
            timestamp, value = entry
            if time.time() - timestamp < self.cache_ttl:
                return value
        return None

    def _set_cached_result(self, key: str, value: Any):
        """Cache result with timestamp"""
        self._cache_store()[key] = (time.time(), value)

    def _extract_audio(self, video_path: str, output_path: str, progress_callback=None) -> str:
        """Extract a 16 kHz mono WAV track from *video_path* using ffmpeg.

        Args:
            video_path: Source video file.
            output_path: Destination WAV path (overwritten if present).
            progress_callback: Optional ``callback(progress, message)``.

        Returns:
            ``output_path`` once ffmpeg has written the file.

        Raises:
            AudioProcessingError: If ffmpeg fails or produces no output file.
        """
        try:
            if progress_callback:
                progress_callback(0.1, "Checking dependencies...")

            ffmpeg_cmd = [
                'ffmpeg',
                '-i', video_path,
                '-ar', '16000',  # Set sample rate to 16kHz
                '-ac', '1',  # Convert to mono
                '-f', 'wav',  # Output format
                '-v', 'warning',  # Reduce verbosity
                '-y',  # Overwrite output file
                '-c:a', 'pcm_s16le',  # Use simple audio codec
                '-movflags', 'faststart',  # Optimize for streaming
                '-threads', str(max(1, multiprocessing.cpu_count() - 1)),  # Use multiple threads
                output_path
            ]

            result = subprocess.run(
                ffmpeg_cmd,
                capture_output=True,
                text=True,
                bufsize=10*1024*1024  # 10MB buffer
            )

            if result.returncode != 0:
                raise AudioProcessingError(f"FFmpeg Error: {result.stderr}")

            if not os.path.exists(output_path):
                raise AudioProcessingError("Audio extraction failed: output file not created")

            if progress_callback:
                progress_callback(1.0, "Audio extraction complete!")

            return output_path

        except Exception as e:
            logger.error(f"Error in audio extraction: {e}")
            raise AudioProcessingError(f"Audio extraction failed: {str(e)}") from e

    def _preprocess_audio(self, input_path: str, output_path: Optional[str] = None) -> str:
        """Normalize the audio, trim leading/trailing silence and save it.

        Args:
            input_path: Audio file to load (resampled to 16 kHz).
            output_path: Where to write the result; defaults to ``input_path``.

        Returns:
            The path the processed audio was written to.

        Raises:
            AudioProcessingError: If the input is missing or processing fails.
        """
        try:
            if not os.path.exists(input_path):
                raise FileNotFoundError(f"Input audio file not found: {input_path}")

            if output_path is None:
                output_path = input_path

            audio, sr = librosa.load(input_path, sr=16000)
            audio = librosa.util.normalize(audio)
            non_silent = librosa.effects.trim(audio, top_db=20)[0]
            sf.write(output_path, non_silent, sr)

            return output_path

        except Exception as e:
            logger.error(f"Error in audio preprocessing: {e}")
            raise AudioProcessingError(f"Audio preprocessing failed: {str(e)}") from e

    def evaluate_video(self, video_path: str, transcript_file: Optional[Any] = None) -> Dict[str, Any]:
        """Run the full evaluation pipeline on a video file.

        Args:
            video_path: Path to an MP4, AVI or MOV file.
            transcript_file: Optional uploaded transcript; must expose
                ``getvalue()`` returning UTF-8 bytes. When omitted the audio
                is transcribed with Whisper.

        Returns:
            Dict with keys ``audio_features``, ``transcript``, ``teaching``,
            ``recommendations`` and ``speech_metrics``.

        Raises:
            RuntimeError: Wrapping any failure during the analysis.
        """
        # Initialised up front so the error handler can tell whether the UI
        # placeholders were ever created.
        status = None
        progress = None
        try:
            if not os.path.exists(video_path):
                raise FileNotFoundError(f"Video file not found: {video_path}")

            valid_extensions = ('.mp4', '.avi', '.mov')
            if not video_path.lower().endswith(valid_extensions):
                raise ValueError("Unsupported video format. Use MP4, AVI, or MOV")

            try:
                status = st.empty()
                progress = st.progress(0)
                tracker = ProgressTracker(status, progress)
            except Exception as e:
                logger.error(f"Failed to create progress trackers: {e}")
                raise

            # temporary_file removes both files on exit, so no extra cleanup
            # pass is needed afterwards.
            with temporary_file(suffix=".wav") as temp_audio, \
                 temporary_file(suffix=".wav") as processed_audio:

                # Step 1: Extract audio from video
                tracker.update(0.1, "Extracting audio from video")
                self._extract_audio(video_path, temp_audio)
                tracker.next_step()

                # Step 2: Preprocess audio
                tracker.update(0.2, "Preprocessing audio")
                self._preprocess_audio(temp_audio, processed_audio)
                tracker.next_step()

                # Step 3: Extract features
                tracker.update(0.4, "Extracting audio features")
                audio_features = self.feature_extractor.extract_features(processed_audio)
                tracker.next_step()

                # Step 4: Get transcript
                tracker.update(0.6, "Processing transcript")
                if transcript_file:
                    transcript = transcript_file.getvalue().decode('utf-8')
                else:
                    tracker.update(0.6, "Transcribing audio")
                    # _transcribe_audio may pass a third argument on error.
                    transcript = self._transcribe_audio(
                        processed_audio,
                        lambda p, m, extra=None: tracker.update(0.6 + p * 0.2, m)
                    )
                tracker.next_step()

                # Step 5: Analyze content
                tracker.update(0.8, "Analyzing teaching content")
                content_analysis = self.content_analyzer.analyze_content(transcript)

                # Step 6: Generate recommendations
                tracker.update(0.9, "Generating recommendations")
                recommendations = self.recommendation_generator.generate_recommendations(
                    audio_features,
                    content_analysis
                )
                tracker.next_step()

                speech_metrics = self._evaluate_speech_metrics(transcript, audio_features)

                # Clear progress indicators
                status.empty()
                progress.empty()

                return {
                    "audio_features": audio_features,
                    "transcript": transcript,
                    "teaching": content_analysis,
                    "recommendations": recommendations,
                    "speech_metrics": speech_metrics
                }

        except Exception as e:
            logger.error(f"Error in video evaluation: {e}")
            # Clean up UI elements on error
            if status is not None:
                status.empty()
            if progress is not None:
                progress.empty()
            raise RuntimeError(f"Analysis failed: {str(e)}") from e

    def _transcribe_audio(self, audio_path: str, progress_callback=None) -> str:
        """Transcribe audio with Whisper, caching by file content hash.

        Args:
            audio_path: Audio file to transcribe.
            progress_callback: Optional ``callback(progress, message[, detail])``;
                the third argument is only passed when reporting an error.

        Returns:
            The transcript text (from ``st.session_state`` when cached).

        Raises:
            FileNotFoundError: If *audio_path* does not exist.
            RuntimeError: If loading the model or transcribing fails.
        """
        try:
            if progress_callback:
                progress_callback(0.1, "Loading transcription model...")

            # Validate before opening the file for hashing.
            if not os.path.exists(audio_path):
                raise FileNotFoundError(f"Audio file not found: {audio_path}")

            # Hash in chunks inside a context manager so the handle is closed
            # and large files are not read into memory at once.
            digest = hashlib.md5()
            with open(audio_path, 'rb') as audio_file:
                for chunk in iter(lambda: audio_file.read(1024 * 1024), b''):
                    digest.update(chunk)
            cache_key = f"transcript_{digest.hexdigest()}"

            if cache_key in st.session_state:
                logger.info("Using cached transcription")
                if progress_callback:
                    progress_callback(1.0, "Retrieved from cache")
                return st.session_state[cache_key]

            if progress_callback:
                progress_callback(0.2, "Initializing model...")

            start_time = time.time()

            try:
                # Honour the configured cache directory for model downloads.
                model = whisper.load_model("medium", download_root=str(self.model_cache_dir))
                result = model.transcribe(audio_path)
                transcript = result["text"]

                elapsed_time = time.time() - start_time
                logger.info(f"Transcription completed in {elapsed_time:.2f} seconds")

                if progress_callback:
                    progress_callback(0.9, f"Transcription completed in {elapsed_time:.2f} seconds")

                if not transcript.strip():
                    raise ValueError("Transcription produced empty result")

                st.session_state[cache_key] = transcript

                if progress_callback:
                    progress_callback(1.0, "Transcription complete!")

                return transcript

            except Exception as e:
                logger.error(f"Error during transcription: {e}")
                raise RuntimeError(f"Transcription failed: {str(e)}") from e

        except Exception as e:
            logger.error(f"Error in transcription: {e}")
            if progress_callback:
                progress_callback(1.0, "Error in transcription", str(e))
            raise

    def _merge_transcripts(self, transcripts: List[str]) -> str:
        """Merge transcripts with overlap deduplication.

        Consecutive chunks may repeat up to 20 trailing words of the previous
        chunk; the repeated prefix of each later chunk is dropped. Whitespace
        is normalised to single spaces.
        """
        if not transcripts:
            return ""

        max_overlap = 20  # Check up to 20 words

        merged_words = transcripts[0].split()
        for chunk in transcripts[1:]:
            chunk_words = chunk.split()
            overlap = 0
            # Longest suffix of the merged text that is a prefix of the chunk.
            for size in range(min(len(merged_words), len(chunk_words), max_overlap), 0, -1):
                if merged_words[-size:] == chunk_words[:size]:
                    overlap = size
                    break
            # Slicing words (rather than str.split with maxsplit) avoids
            # re-appending the last word when the chunk is fully overlapped.
            merged_words.extend(chunk_words[overlap:])

        return ' '.join(merged_words)

    def calculate_speech_metrics(self, transcript: str, audio_duration: float) -> Dict[str, float]:
        """Calculate words per minute and other speech metrics.

        Args:
            transcript: Transcript text.
            audio_duration: Audio length in seconds.
        """
        words = len(transcript.split())
        minutes = audio_duration / 60

        return {
            'words_per_minute': words / minutes if minutes > 0 else 0,
            'total_words': words,
            'duration_minutes': minutes
        }

    def _evaluate_speech_metrics(self, transcript: str, audio_features: Dict[str, float],
                                 progress_callback=None) -> Dict[str, Any]:
        """Score speed, fluency, flow, intonation and energy.

        Args:
            transcript: Transcript text.
            audio_features: Feature dict; ``duration`` is read in seconds and
                the remaining keys default to 0 when absent.
            progress_callback: Optional ``callback(progress, message)``.

        Returns:
            Nested dict with ``speed``, ``fluency``, ``flow``, ``intonation``
            and ``energy`` sections; each score is 1 (pass) or 0.
        """
        try:
            if progress_callback:
                progress_callback(0.2, "Calculating speech metrics...")

            lowered = transcript.lower()
            words = len(transcript.split())
            duration_minutes = float(audio_features.get('duration', 0)) / 60

            def per_minute(count):
                # Rates are reported as 0 when the duration is unknown.
                return float(count / duration_minutes if duration_minutes > 0 else 0)

            words_per_minute = per_minute(words)

            # Filler word detection (2-3 per minute is acceptable)
            filler_words = re.findall(r'\b(um|uh|like|you\s+know|basically|actually|literally)\b', lowered)
            fillers_per_minute = per_minute(len(filler_words))

            # Error detection (1-2 per minute is acceptable)
            repeated_words = len(re.findall(r'\b(\w+)\s+\1\b', lowered))
            incomplete_sentences = len(re.findall(r'[a-zA-Z]+\s*\.\.\.|\b[a-zA-Z]+\s*-\s+', transcript))
            errors_per_minute = per_minute(repeated_words + incomplete_sentences)

            max_errors = 1.0
            max_fillers = 3.0
            threshold_explanation = "Using standard thresholds"
            grammatical_errors = []

            fluency_score = 1 if (errors_per_minute <= max_errors and fillers_per_minute <= max_fillers) else 0

            pitch_mean = audio_features.get("pitch_mean", 0)
            pitch_std = audio_features.get("pitch_std", 0)
            # Coefficient of variation of pitch, in percent.
            pitch_variation_pct = pitch_std / pitch_mean * 100 if pitch_mean > 0 else 0
            pauses_per_minute = audio_features.get("pauses_per_minute", 0)
            variations_per_minute = audio_features.get("variations_per_minute", 0)
            mean_amplitude = audio_features.get("mean_amplitude", 0)
            amplitude_deviation = audio_features.get("amplitude_deviation", 0)

            return {
                "speed": {
                    "score": 1 if 120 <= words_per_minute <= 180 else 0,
                    "wpm": words_per_minute,
                    "total_words": words,
                    "duration_minutes": duration_minutes
                },
                "fluency": {
                    "score": fluency_score,
                    "errorsPerMin": errors_per_minute,
                    "fillersPerMin": fillers_per_minute,
                    "maxErrorsThreshold": max_errors,
                    "maxFillersThreshold": max_fillers,
                    "thresholdExplanation": threshold_explanation,
                    "detectedErrors": [
                        {
                            "type": "Grammar",
                            "context": error,
                        } for error in grammatical_errors
                    ],
                    "detectedFillers": filler_words
                },
                "flow": {
                    "score": 1 if pauses_per_minute <= 12 else 0,
                    "pausesPerMin": pauses_per_minute
                },
                "intonation": {
                    "pitch": pitch_mean,
                    "pitchScore": 1 if 20 <= pitch_variation_pct <= 40 else 0,
                    "pitchVariation": pitch_std,
                    "patternScore": 1 if variations_per_minute >= 120 else 0,
                    "risingPatterns": audio_features.get("rising_patterns", 0),
                    "fallingPatterns": audio_features.get("falling_patterns", 0),
                    "variationsPerMin": variations_per_minute,
                    "mu": pitch_mean
                },
                "energy": {
                    "score": 1 if 60 <= mean_amplitude <= 75 else 0,
                    "meanAmplitude": mean_amplitude,
                    "amplitudeDeviation": amplitude_deviation,
                    "variationScore": 1 if 0.05 <= amplitude_deviation <= 0.15 else 0
                }
            }

        except Exception as e:
            logger.error(f"Error in speech metrics evaluation: {e}")
            raise
def validate_video_file(file_path: str, max_size: int = 1024 * 1024 * 1024):
    """Validate video file before processing.

    Checks, in order: the file exists, it is no larger than *max_size*, it has
    a supported extension, and ``ffprobe`` accepts it.

    Args:
        file_path: Path of the video to validate.
        max_size: Maximum allowed size in bytes (default 1 GB).

    Raises:
        ValueError: If any check fails or ffprobe cannot be run.
    """
    # Existence must be checked first: os.path.getsize raises OSError (not
    # ValueError) on a missing file.
    if not os.path.exists(file_path):
        raise ValueError("Video file does not exist")

    if os.path.getsize(file_path) > max_size:
        raise ValueError(f"File size exceeds {max_size/1024/1024}MB limit")

    valid_extensions = {'.mp4', '.avi', '.mov'}
    if os.path.splitext(file_path)[1].lower() not in valid_extensions:
        raise ValueError("Unsupported video format")

    try:
        probe = subprocess.run(
            ['ffprobe', '-v', 'quiet', file_path],
            capture_output=True,
            text=True
        )
    except (OSError, subprocess.SubprocessError) as e:
        # OSError covers a missing ffprobe executable.
        raise ValueError("Unable to validate video file") from e

    if probe.returncode != 0:
        raise ValueError("Invalid video file")
def display_evaluation(evaluation: Dict[str, Any]):
    """Display evaluation results with improved metrics visualization.

    Renders four Streamlit tabs (Communication, Teaching, Recommendations,
    Transcript) from the dict produced by ``MentorEvaluator.evaluate_video``.
    Any exception while rendering is logged and shown with ``st.error``
    instead of being raised.

    Args:
        evaluation: Dict read for the keys ``audio_features``,
            ``speech_metrics``, ``teaching``, ``recommendations`` and
            ``transcript``; missing keys fall back to empty values.
    """
    try:
        tabs = st.tabs(["Communication", "Teaching", "Recommendations", "Transcript"])

        with tabs[0]:
            st.header("Communication Metrics")

            audio_features = evaluation.get("audio_features", {})
            speech_metrics = evaluation.get("speech_metrics", {})

            # Speed Metrics
            with st.expander("🏃 Speed", expanded=True):
                speed_data = speech_metrics.get("speed", {})
                words_per_minute = speed_data.get("wpm", 0)

                col1, col2 = st.columns(2)
                with col1:
                    st.metric("Score", "✅ Pass" if 120 <= words_per_minute <= 180 else "❌ Needs Improvement")
                    st.metric("Words per Minute", f"{words_per_minute:.1f}")
                with col2:
                    st.info("""
                    **Acceptable Range:** 120-180 WPM
                    - Optimal teaching pace: 130-160 WPM
                    """)

            # Fluency Metrics
            with st.expander("🗣️ Fluency", expanded=True):
                fluency_data = speech_metrics.get("fluency", {})
                fillers_per_minute = float(fluency_data.get("fillersPerMin", 0))
                errors_per_minute = float(fluency_data.get("errorsPerMin", 0))

                col1, col2 = st.columns(2)
                with col1:
                    st.metric("Score", "✅ Pass" if fillers_per_minute <= 3 and errors_per_minute <= 1 else "❌ Needs Improvement")
                    st.metric("Fillers per Minute", f"{fillers_per_minute:.1f}")
                    st.metric("Errors per Minute", f"{errors_per_minute:.1f}")
                with col2:
                    st.info("""
                    **Acceptable Ranges:**
                    - Fillers per Minute: <3
                    - Errors per Minute: <1
                    """)

            # Flow Metrics
            with st.expander("🌊 Flow", expanded=True):
                pauses_per_minute = float(audio_features.get("pauses_per_minute", 0))

                col1, col2 = st.columns(2)
                with col1:
                    st.metric("Score", "✅ Pass" if pauses_per_minute <= 12 else "❌ Needs Improvement")
                    st.metric("Pauses per Minute", f"{pauses_per_minute:.1f}")
                with col2:
                    st.info("""
                    **Acceptable Range:**
                    - Pauses per Minute: <12
                    - Strategic pauses (8-12 PPM) aid comprehension
                    """)

                # Explanation card
                st.markdown("""

                📊 Understanding Flow Metrics

                """, unsafe_allow_html=True)

            # Intonation Metrics
            with st.expander("🎵 Intonation", expanded=True):
                pitch_mean = float(audio_features.get("pitch_mean", 0))
                pitch_std = float(audio_features.get("pitch_std", 0))
                pitch_variation_coeff = float(audio_features.get("pitch_variation_coeff", 0))
                monotone_score = float(audio_features.get("monotone_score", 0))
                direction_changes = float(audio_features.get("direction_changes_per_min", 0))

                col1, col2 = st.columns(2)
                with col1:
                    st.metric("Monotone Score", f"{monotone_score:.2f}")
                    st.metric("Pitch Variation", f"{pitch_variation_coeff:.1f}%")
                    st.metric("Direction Changes/Min", f"{direction_changes:.1f}")
                with col2:
                    st.info("""
                    **Monotone Analysis:**
                    - Pitch Variation: 20-40% is optimal
                    - Direction Changes: 300-600/min is optimal

                    **Recommendations:**
                    - Aim for pitch variation 20-40%
                    - Target 300-600 direction changes/min
                    - Use stress patterns for key points
                    """)

                    # Visual indicator only for warning cases
                    if monotone_score > 0.4 or pitch_variation_coeff < 20 or pitch_variation_coeff > 40 or direction_changes < 300 or direction_changes > 600:
                        st.warning("⚠️ Speech patterns need adjustment. Consider varying pitch and pace more naturally.")

            # Energy Metrics
            with st.expander("⚡ Energy", expanded=True):
                mean_amplitude = float(audio_features.get("mean_amplitude", 0))
                amplitude_deviation = float(audio_features.get("amplitude_deviation", 0))

                col1, col2 = st.columns(2)
                with col1:
                    st.metric("Mean Amplitude", f"{mean_amplitude:.1f}")
                    st.metric("Amplitude Deviation (σ)", f"{amplitude_deviation:.3f}")
                with col2:
                    st.info("""
                    **Acceptable Ranges:**
                    - Mean Amplitude: 60-75
                    - Amplitude Deviation: 0.05-0.15
                    """)

                # Explanation card
                st.markdown("""

                📊 Understanding Energy Metrics

                """, unsafe_allow_html=True)

        with tabs[1]:
            st.header("Teaching Analysis")

            teaching_data = evaluation.get("teaching", {})
            content_analyzer = ContentAnalyzer(st.secrets["OPENAI_API_KEY"])

            # NOTE(review): citations and suggestions below are interpolated
            # into markdown rendered with unsafe_allow_html=True; consider
            # html.escape() if they can contain markup - confirm against the
            # original templates.

            # Concept Assessment with AI-generated suggestions
            with st.expander("📚 Concept Assessment", expanded=True):
                concept_data = teaching_data.get("Concept Assessment", {})

                for category, details in concept_data.items():
                    score = details.get("Score", 0)
                    citations = details.get("Citations", [])

                    # Suggestions are only requested for failing categories.
                    suggestions = []
                    if score == 0:
                        suggestions = content_analyzer.generate_suggestions(category, citations)

                    st.markdown(f"""
                    {category} {'✅ Pass' if score == 1 else '❌ Needs Work'}
                    """, unsafe_allow_html=True)

                    for citation in citations:
                        st.markdown(f"""
                        {citation}
                        """, unsafe_allow_html=True)

                    if score == 0 and suggestions:
                        st.markdown("""

                        🎯 Suggestions for Improvement:

                        """, unsafe_allow_html=True)
                        for suggestion in suggestions:
                            st.markdown(f"""
                            • {suggestion}
                            """, unsafe_allow_html=True)
                        st.markdown("\n", unsafe_allow_html=True)

                    st.markdown("---")

            # Code Assessment with AI-generated suggestions
            with st.expander("💻 Code Assessment", expanded=True):
                code_data = teaching_data.get("Code Assessment", {})

                for category, details in code_data.items():
                    score = details.get("Score", 0)
                    citations = details.get("Citations", [])

                    suggestions = []
                    if score == 0:
                        suggestions = content_analyzer.generate_suggestions(category, citations)

                    st.markdown(f"""
                    {category} {'✅ Pass' if score == 1 else '❌ Needs Work'}
                    """, unsafe_allow_html=True)

                    for citation in citations:
                        st.markdown(f"""
                        {citation}
                        """, unsafe_allow_html=True)

                    if score == 0 and suggestions:
                        st.markdown("""

                        🎯Suggestions for Improvement:

                        """, unsafe_allow_html=True)
                        for suggestion in suggestions:
                            st.markdown(f"""
                            • {suggestion}
                            """, unsafe_allow_html=True)
                        st.markdown("\n", unsafe_allow_html=True)

                    st.markdown("---")

        with tabs[2]:
            st.header("Recommendations")
            recommendations = evaluation.get("recommendations", {})

            # Summary card
            if "summary" in recommendations:
                st.markdown("""

                📊 Overall Summary

                """, unsafe_allow_html=True)
                st.markdown(recommendations["summary"])
                st.markdown("\n", unsafe_allow_html=True)

            st.markdown("""

            💡 Areas for Improvement

            """, unsafe_allow_html=True)
            improvements = recommendations.get("improvements", [])

            if isinstance(improvements, list):
                categories = {
                    "🗣️ Communication": [],
                    "📚 Teaching": [],
                    "💻 Technical": []
                }

                for improvement in improvements:
                    if isinstance(improvement, dict):
                        # Missing, empty or non-string categories fall back to
                        # Technical instead of raising on .upper().
                        category = str(improvement.get("category") or "TECHNICAL").upper()
                        message = improvement.get("message", str(improvement))

                        if "COMMUNICATION" in category:
                            categories["🗣️ Communication"].append(message)
                        elif "TEACHING" in category:
                            categories["📚 Teaching"].append(message)
                        else:
                            # Unrecognised categories are shown under
                            # Technical rather than silently dropped.
                            categories["💻 Technical"].append(message)
                    else:
                        # Handle legacy format or plain strings
                        categories["💻 Technical"].append(improvement)

                # Display categorized improvements in columns
                cols = st.columns(len(categories))
                for col, (category, items) in zip(cols, categories.items()):
                    with col:
                        st.markdown(f"""
                        {category}
                        """, unsafe_allow_html=True)
                        for item in items:
                            st.markdown(f"""
                            • {item}
                            """, unsafe_allow_html=True)
                        st.markdown("\n", unsafe_allow_html=True)

            # Additional CSS for new components
            st.markdown(""" """, unsafe_allow_html=True)

        with tabs[3]:
            st.header("Transcript with Timestamps")
            transcript = evaluation.get("transcript", "")

            # Timestamps are approximations derived from the number of words
            # preceding each sentence at an assumed 150 words per minute.
            sentences = re.split(r'(?<=[.!?])\s+', transcript)
            words_before = 0  # running count keeps the loop linear
            for sentence in sentences:
                timestamp = words_before / 150
                minutes = int(timestamp)
                seconds = int((timestamp - minutes) * 60)

                st.markdown(f"**[{minutes:02d}:{seconds:02d}]** {sentence}")
                words_before += len(sentence.split())

    except Exception as e:
        logger.error(f"Error displaying evaluation: {e}")
        st.error(f"Error displaying results: {str(e)}")
        st.error("Please check the evaluation data structure and try again.")

    # NOTE(review): style block whose markup is not visible in this copy of
    # the source; its nesting level here is presumed - confirm.
    st.markdown(""" """, unsafe_allow_html=True)
def check_dependencies() -> List[str]:
    """Return display names of required external tools missing from PATH.

    Currently only ``ffmpeg`` is checked; an empty list means every
    requirement was found.
    """
    # Executable looked up on PATH -> name reported to the user.
    required_tools = {'ffmpeg': "FFmpeg"}
    return [
        label
        for executable, label in required_tools.items()
        if not shutil.which(executable)
    ]
def generate_pdf_report(evaluation_data: Dict[str, Any]) -> bytes:
    """Generate a formatted PDF report from evaluation data.

    Args:
        evaluation_data: Evaluation dict. Communication metrics are read from
            ``"communication"`` or, when absent, from ``"speech_metrics"``
            (the key ``MentorEvaluator.evaluate_video`` returns). The
            ``"teaching"`` and ``"recommendations"`` sections are optional.

    Returns:
        The PDF document as bytes.

    Raises:
        RuntimeError: If reportlab is unavailable or building the PDF fails.
    """
    try:
        from io import BytesIO
        from xml.sax.saxutils import escape

        from reportlab.lib import colors
        from reportlab.lib.pagesizes import letter
        from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
        from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Table, TableStyle

        def text_paragraph(text, style):
            # Paragraph parses XML-like markup, so free text ('&', '<', ...)
            # must be escaped before it is rendered.
            return Paragraph(escape(str(text)), style)

        styles = getSampleStyleSheet()
        story = []

        # Title
        title_style = ParagraphStyle(
            'CustomTitle',
            parent=styles['Heading1'],
            fontSize=24,
            spaceAfter=30
        )
        story.append(Paragraph("Mentor Demo Evaluation Report", title_style))
        story.append(Spacer(1, 20))

        # Communication Metrics Section
        story.append(Paragraph("Communication Metrics", styles['Heading2']))
        comm_metrics = (
            evaluation_data.get("communication")
            or evaluation_data.get("speech_metrics")
            or {}
        )

        metric_table_style = TableStyle([
            ('BACKGROUND', (0, 0), (-1, 0), colors.grey),
            ('TEXTCOLOR', (0, 0), (-1, 0), colors.whitesmoke),
            ('ALIGN', (0, 0), (-1, -1), 'CENTER'),
            ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'),
            ('FONTSIZE', (0, 0), (-1, 0), 14),
            ('BOTTOMPADDING', (0, 0), (-1, 0), 12),
            ('BACKGROUND', (0, 1), (-1, -1), colors.beige),
            ('TEXTCOLOR', (0, 1), (-1, -1), colors.black),
            ('FONTNAME', (0, 1), (-1, -1), 'Helvetica'),
            ('FONTSIZE', (0, 1), (-1, -1), 12),
            ('GRID', (0, 0), (-1, -1), 1, colors.black)
        ])

        for category in ["speed", "fluency", "flow", "intonation", "energy"]:
            metrics = comm_metrics.get(category)
            # An empty table cannot be built; skip absent or empty sections.
            if not isinstance(metrics, dict) or not metrics:
                continue

            story.append(Paragraph(category.title(), styles['Heading3']))
            # Row 0 is styled as the header, so give it real header cells
            # instead of letting the first metric take that styling.
            data = [["Metric", "Value"]] + [
                [k.replace('_', ' ').title(), str(v)] for k, v in metrics.items()
            ]
            t = Table(data, colWidths=[200, 200])
            t.setStyle(metric_table_style)
            story.append(t)
            story.append(Spacer(1, 20))

        # Teaching Analysis Section
        story.append(Paragraph("Teaching Analysis", styles['Heading2']))
        teaching_data = evaluation_data.get("teaching", {})

        for assessment_type in ["Concept Assessment", "Code Assessment"]:
            if assessment_type not in teaching_data:
                continue

            story.append(Paragraph(assessment_type, styles['Heading3']))
            for category, details in teaching_data[assessment_type].items():
                score = details.get("Score", 0)
                citations = details.get("Citations", [])

                data = [
                    [category, "Score: " + ("Pass" if score == 1 else "Needs Improvement")],
                    ["Citations:", ""]
                ] + [
                    # Paragraph cells wrap long quotes inside the column.
                    ["-", text_paragraph(citation, styles['Normal'])]
                    for citation in citations
                ]

                t = Table(data, colWidths=[200, 300])
                t.setStyle(TableStyle([
                    ('BACKGROUND', (0, 0), (-1, 0), colors.grey),
                    ('TEXTCOLOR', (0, 0), (-1, 0), colors.whitesmoke),
                    ('ALIGN', (0, 0), (-1, -1), 'LEFT'),
                    ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'),
                    ('GRID', (0, 0), (-1, -1), 1, colors.black)
                ]))
                story.append(t)
                story.append(Spacer(1, 20))

        # Recommendations Section
        story.append(Paragraph("Recommendations", styles['Heading2']))
        recommendations = evaluation_data.get("recommendations", {})

        if "summary" in recommendations:
            story.append(Paragraph("Overall Summary:", styles['Heading3']))
            story.append(text_paragraph(recommendations["summary"], styles['Normal']))
            story.append(Spacer(1, 20))

        if "improvements" in recommendations:
            story.append(Paragraph("Areas for Improvement:", styles['Heading3']))
            for improvement in recommendations["improvements"]:
                # Handle both string and dictionary improvement formats
                if isinstance(improvement, dict):
                    message = improvement.get("message", "")
                    category = improvement.get("category", "")
                    story.append(text_paragraph(f"• [{category}] {message}", styles['Normal']))
                else:
                    story.append(text_paragraph(f"• {improvement}", styles['Normal']))

        # Build PDF into an in-memory buffer that is closed on exit.
        with BytesIO() as buffer:
            SimpleDocTemplate(buffer, pagesize=letter).build(story)
            return buffer.getvalue()

    except Exception as e:
        logger.error(f"Error generating PDF report: {e}")
        raise RuntimeError(f"Failed to generate PDF report: {str(e)}") from e
def main():
    """Streamlit entry point: upload a video, run the evaluation, show results.

    Flow: configure the page, check that ffmpeg is installed, collect the
    video (and optionally a transcript), save the upload to a temporary
    directory, run ``MentorEvaluator.evaluate_video`` once per distinct
    upload, then render the results with JSON and PDF download buttons.
    Any unexpected exception is reported with ``st.error``.
    """
    max_upload_bytes = 1024 * 1024 * 1024  # 1GB upload limit

    def json_default(value):
        # Convert numpy scalars/arrays that json cannot serialise natively;
        # anything else is still rejected with TypeError.
        if isinstance(value, np.generic):
            return value.item()
        if isinstance(value, np.ndarray):
            return value.tolist()
        raise TypeError(f"Object of type {type(value).__name__} is not JSON serializable")

    try:
        # Set page config must be the first Streamlit command
        st.set_page_config(page_title="🎓 Mentor Demo Review System", layout="wide")

        # Initialize session state for tracking progress
        if 'processing_complete' not in st.session_state:
            st.session_state.processing_complete = False
        if 'evaluation_results' not in st.session_state:
            st.session_state.evaluation_results = None

        # Custom CSS / page title
        st.markdown("""

        🎓 Mentor Demo Review System

        """, unsafe_allow_html=True)

        # Sidebar with instructions and status
        with st.sidebar:
            st.markdown("""

            Instructions

              1. Upload your teaching video
              2. Wait for the analysis
              3. Review the detailed feedback
              4. Download the report
            """, unsafe_allow_html=True)

            st.markdown("**Supported formats:** MP4, AVI, MOV")
            st.markdown("**Maximum file size:** 1GB")

            # Placeholder for status updates in the sidebar
            status_placeholder = st.empty()
            status_placeholder.info("Upload a video to begin analysis")

        # Check dependencies with progress
        with st.status("Checking system requirements...") as status:
            progress_bar = st.progress(0)

            status.update(label="Checking FFmpeg installation...")
            progress_bar.progress(0.3)
            missing_deps = check_dependencies()

            progress_bar.progress(0.6)
            if missing_deps:
                status.update(label="Missing dependencies detected!", state="error")
                st.error(f"Missing required dependencies: {', '.join(missing_deps)}")
                st.markdown("""
                Please install the missing dependencies:
                ```bash
                sudo apt-get update
                sudo apt-get install ffmpeg
                ```
                """)
                return

            progress_bar.progress(1.0)
            status.update(label="System requirements satisfied!", state="complete")

        # Input selection styling
        st.markdown(""" """, unsafe_allow_html=True)

        # Input type selection
        st.markdown('\n', unsafe_allow_html=True)
        st.markdown("### 📤 Select Upload Method")
        input_type = st.radio(
            "Choose how you want to provide your teaching content:",
            options=[
                "Video Only (Auto-transcription)",
                "Video + Manual Transcript"
            ],
            help="Select whether you want to upload just the video (we'll transcribe it) or provide your own transcript"
        )
        st.markdown('\n', unsafe_allow_html=True)

        # Video upload section
        st.markdown('\n', unsafe_allow_html=True)
        st.markdown('\n\n📹 Upload Teaching Video\n\n', unsafe_allow_html=True)
        uploaded_file = st.file_uploader(
            "Select video file",
            type=['mp4', 'avi', 'mov'],
            help="Upload your teaching video (MP4, AVI, or MOV format, max 1GB)"
        )
        st.markdown('\n', unsafe_allow_html=True)

        # Transcript upload section (conditional)
        uploaded_transcript = None
        if input_type == "Video + Manual Transcript":
            st.markdown('\n', unsafe_allow_html=True)
            st.markdown('\n\n📝 Upload Transcript\n\n', unsafe_allow_html=True)
            uploaded_transcript = st.file_uploader(
                "Select transcript file",
                type=['txt'],
                help="Upload your transcript (TXT format)"
            )
            st.markdown('\n', unsafe_allow_html=True)

        # Process video when uploaded
        if uploaded_file:
            if input_type == "Video + Manual Transcript" and not uploaded_transcript:
                st.warning("Please upload both video and transcript files to continue.")
                return

            # A different upload must be analysed again; without this reset
            # the results of the first video would be shown for every later one.
            upload_id = (
                uploaded_file.name,
                uploaded_file.size,
                uploaded_transcript.name if uploaded_transcript else None,
                uploaded_transcript.size if uploaded_transcript else None,
            )
            if st.session_state.get('processed_upload_id') != upload_id:
                st.session_state.processing_complete = False
                st.session_state.evaluation_results = None

            # Only process if not already completed
            if not st.session_state.processing_complete:
                # Reject oversized uploads before writing anything to disk.
                if uploaded_file.size > max_upload_bytes:
                    st.error("File size exceeds 1GB limit. Please upload a smaller file.")
                    return

                status_placeholder.info("Video uploaded, beginning processing...")

                st.markdown("""

                Processing your video...

                """, unsafe_allow_html=True)

                # Create temp directory for processing
                temp_dir = tempfile.mkdtemp()
                # basename() keeps a crafted upload name from escaping temp_dir.
                video_path = os.path.join(temp_dir, os.path.basename(uploaded_file.name))

                try:
                    # Save uploaded file with progress
                    with st.status("Saving uploaded file...") as status:
                        status_placeholder.info("Saving uploaded file...")
                        progress_bar = st.progress(0)

                        # Save in chunks to show progress
                        chunk_size = 1024 * 1024  # 1MB chunks
                        data = uploaded_file.getbuffer()
                        file_size = len(data)
                        # Ceiling division; at least one pass so the progress
                        # bar completes even for an empty file.
                        chunks = max(1, -(-file_size // chunk_size))

                        with open(video_path, 'wb') as f:
                            for i in range(chunks):
                                start = i * chunk_size
                                end = min(start + chunk_size, file_size)
                                f.write(data[start:end])
                                progress = (i + 1) / chunks
                                status.update(label=f"Saving file: {progress:.1%}")
                                progress_bar.progress(progress)

                        status.update(label="File saved successfully!", state="complete")

                    # Process video
                    status_placeholder.info("Processing video and generating analysis...")

                    process_container = st.container()
                    with process_container:
                        st.markdown("""

                        🎥 Processing Video

                        """, unsafe_allow_html=True)

                        evaluator = MentorEvaluator()
                        st.session_state.evaluation_results = evaluator.evaluate_video(
                            video_path,
                            uploaded_transcript if input_type == "Video + Manual Transcript" else None
                        )
                        st.session_state.processing_complete = True
                        st.session_state.processed_upload_id = upload_id

                except Exception as e:
                    status_placeholder.error(f"Error during processing: {str(e)}")
                    st.error(f"Error during evaluation: {str(e)}")

                finally:
                    # Best-effort cleanup: a failure to delete must not mask
                    # the evaluation result or error.
                    shutil.rmtree(temp_dir, ignore_errors=True)

            # Display results if processing is complete
            if st.session_state.processing_complete and st.session_state.evaluation_results:
                status_placeholder.success("Analysis complete! Review results below.")
                st.success("Analysis complete!")
                display_evaluation(st.session_state.evaluation_results)

                # Download options
                col1, col2 = st.columns(2)

                with col1:
                    if st.download_button(
                        "📥 Download JSON Report",
                        json.dumps(st.session_state.evaluation_results, indent=2, default=json_default),
                        "evaluation_report.json",
                        "application/json",
                        help="Download the raw evaluation data in JSON format"
                    ):
                        st.success("JSON report downloaded successfully!")

                with col2:
                    if st.download_button(
                        "📄 Download Full Report (PDF)",
                        generate_pdf_report(st.session_state.evaluation_results),
                        "evaluation_report.pdf",
                        "application/pdf",
                        help="Download a formatted PDF report with detailed analysis"
                    ):
                        st.success("PDF report downloaded successfully!")

    except Exception as e:
        st.error(f"Application error: {str(e)}")


if __name__ == "__main__":
    main()