Spaces:
Sleeping
Sleeping
| import os | |
| import gradio as gr | |
| import json | |
| from datetime import datetime | |
| from typing import List, Dict, Tuple | |
| from dotenv import load_dotenv | |
| import shutil | |
| import tempfile | |
| import google.generativeai as genai | |
| import traceback | |
| import numpy as np | |
| import scipy.io.wavfile as wavfile | |
# Pull variables from a local .env file before any key lookups.
load_dotenv()

# OpenAI SDK is used for Whisper transcription only.
from openai import OpenAI

# Build the Whisper client from the OPENAI_API_KEY environment variable.
# NOTE(review): OpenAI() may raise when the key is absent — TODO confirm
# desired startup behavior without a configured key.
_openai_key = os.getenv("OPENAI_API_KEY")
openai_client = OpenAI(api_key=_openai_key)
# Configure Gemini for analysis.  Selection strategy:
#   1. list available models and pick the first match from a priority list,
#   2. else fall back to direct init of gemini-1.5-pro, then gemini-pro,
#   3. else leave gemini_model as None (analysis features disabled).
gemini_api_key = os.getenv("GEMINI_API_KEY")
if gemini_api_key:
    genai.configure(api_key=gemini_api_key)
    try:
        # List available models
        available_models = genai.list_models()
        print("π Available Gemini models:")
        gemini_models = []
        for model in available_models:
            if 'generateContent' in model.supported_generation_methods:
                print(f" - {model.name}")
                gemini_models.append(model.name)
        # Priority order: Try the best models first
        model_priority = [
            'models/gemini-1.5-pro-latest',  # Latest 1.5 Pro
            'models/gemini-1.5-pro',         # Stable 1.5 Pro
            'models/gemini-1.5-pro-002',     # Specific version
            'models/gemini-1.5-flash',       # Faster but still good
            'models/gemini-pro'              # Original Pro
        ]
        gemini_model = None
        for model_name in model_priority:
            if model_name in gemini_models:
                try:
                    gemini_model = genai.GenerativeModel(
                        model_name.replace('models/', ''),
                        generation_config={
                            'temperature': 0.7,  # Balance creativity and consistency
                            'top_p': 0.95,
                            'top_k': 40,
                            'max_output_tokens': 8192,  # Increased for detailed analysis
                        }
                    )
                    print(f"β Using {model_name} - Best available model!")
                    break
                except Exception as e:
                    print(f" Could not initialize {model_name}: {e}")
        # Fallback if none of the preferred models work
        if not gemini_model and gemini_models:
            model_name = gemini_models[0].replace('models/', '')
            gemini_model = genai.GenerativeModel(model_name)
            print(f"β Using {model_name}")
        if not gemini_model:
            print("β No suitable Gemini models found!")
    except Exception as e:
        print(f"β οΈ Error listing Gemini models: {e}")
        # Try direct initialization with best model
        try:
            gemini_model = genai.GenerativeModel(
                'gemini-1.5-pro',
                generation_config={
                    'temperature': 0.7,
                    'top_p': 0.95,
                    'top_k': 40,
                    'max_output_tokens': 8192,
                }
            )
            print("β Gemini 1.5 Pro initialized (direct)")
        except Exception:  # was a bare `except:` — don't swallow KeyboardInterrupt/SystemExit
            try:
                gemini_model = genai.GenerativeModel('gemini-pro')
                print("β Gemini Pro initialized (fallback)")
            except Exception:  # was a bare `except:`
                print("β Could not initialize any Gemini model!")
                gemini_model = None
else:
    print("β οΈ No Gemini API key found!")
    gemini_model = None
class InterviewCoPilot:
    """State container for a live qualitative-interview analysis session."""

    def __init__(self):
        """Initialize an empty session with a fresh temp working directory."""
        # Transcript / framework state
        self.transcript_history = []
        self.research_questions = []
        self.interview_protocol = []
        self.detected_codes = []
        self.coverage_status = {"rq_covered": [], "protocol_covered": []}

        # File tracking
        self.processed_files = []
        self.current_file_info = {}
        self.current_audio_path = None  # Store the current audio path

        # Enhanced framework support — all attributes initialized up front
        self.theoretical_framework = ""
        self.predefined_codes = {}  # {category: [codes]}
        self.analysis_focus = []
        self.is_continuation = False
        self.segment_number = 1

        # Session memory for Phase 1
        self.session_segments = []  # list of processed-segment dicts
        self.session_name = f"Interview_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
        self.framework_loaded = False

        # Persistent per-session temp directory for uploaded/derived audio
        self.temp_dir = tempfile.mkdtemp(prefix="interview_copilot_")
        print(f"π Created temp directory: {self.temp_dir}")

        # Multi-view analysis support: per-segment analysis results
        self.segment_analyses = {}
def __del__(self):
    """Best-effort removal of the per-session temp directory at GC time.

    Guarded with hasattr because __init__ may have failed before temp_dir
    was assigned.
    """
    if hasattr(self, 'temp_dir') and os.path.exists(self.temp_dir):
        try:
            shutil.rmtree(self.temp_dir)
            print(f"π§Ή Cleaned up temp directory: {self.temp_dir}")
        except OSError:
            # was a bare `except:`; only filesystem errors are expected here,
            # and a destructor must never propagate exceptions
            pass
def setup_research_context(self, research_questions: str, interview_protocol: str,
                           theoretical_framework: str = "", predefined_codes: str = "",
                           analysis_focus: str = ""):
    """Parse and store the research framework before interviews start.

    Returns a human-readable status string summarizing what was loaded,
    or an error message when no research questions were supplied.
    """
    if not research_questions.strip():
        return "β Please provide at least research questions"

    def _nonblank_lines(text):
        # One entry per non-empty line, whitespace-trimmed.
        return [line.strip() for line in text.split('\n') if line.strip()]

    self.research_questions = _nonblank_lines(research_questions)
    self.interview_protocol = _nonblank_lines(interview_protocol)
    self.theoretical_framework = theoretical_framework.strip()

    # Predefined codes arrive as "Category: code1, code2, code3" lines.
    self.predefined_codes = {}
    if predefined_codes.strip():
        for line in predefined_codes.split('\n'):
            if ':' in line:
                category, codes = line.split(':', 1)
                parsed = [c.strip() for c in codes.split(',') if c.strip()]
                self.predefined_codes[category.strip()] = parsed

    self.analysis_focus = _nonblank_lines(analysis_focus)

    # Fresh coverage flags, one per question.
    self.coverage_status = {
        "rq_covered": [False] * len(self.research_questions),
        "protocol_covered": [False] * len(self.interview_protocol),
    }

    status_parts = [
        f"β Setup complete!",
        f"π Research Questions: {len(self.research_questions)}",
        f"π Protocol Questions: {len(self.interview_protocol)}",
    ]
    if self.theoretical_framework:
        status_parts.append(f"π Theoretical Framework: Yes")
    if self.predefined_codes:
        total_codes = sum(len(codes) for codes in self.predefined_codes.values())
        status_parts.append(f"π·οΈ Predefined Codes: {total_codes} codes in {len(self.predefined_codes)} categories")
    if self.analysis_focus:
        status_parts.append(f"π― Analysis Focus Areas: {len(self.analysis_focus)}")

    self.framework_loaded = True
    return "\n".join(status_parts)
def add_segment_to_session(self, file_name, duration, transcript_length):
    """Record a processed audio segment in the session log and return it."""
    stamped_at = datetime.now().strftime("%H:%M:%S")
    entry = dict(
        number=len(self.session_segments) + 1,  # 1-based position in session
        file_name=file_name,
        duration=duration,
        transcript_length=transcript_length,
        timestamp=stamped_at,
        codes_found=len(self.detected_codes),  # total codes detected so far
    )
    self.session_segments.append(entry)
    return entry
def get_session_summary(self):
    """Render a markdown summary of everything processed this session."""
    if not self.session_segments:
        return "No segments processed yet"

    minutes = sum(seg.get("duration", 0) for seg in self.session_segments)
    chars = sum(seg.get("transcript_length", 0) for seg in self.session_segments)

    header = f"""### π Current Session: {self.session_name}
**Segments Processed:** {len(self.session_segments)}
**Total Duration:** {minutes:.1f} minutes
**Total Transcript:** {chars:,} characters
**Unique Codes Found:** {len(set(self.detected_codes))}
**Processed Files:**
"""
    # One line per processed segment.
    rows = [
        f"\nβ Segment {seg['number']} - {seg['file_name']} ({seg['timestamp']})"
        for seg in self.session_segments
    ]
    return header + "".join(rows)
def reset_session(self, keep_framework=True):
    """Clear per-session state; optionally wipe the research framework too.

    Returns a status message describing what was cleared.
    """
    # Per-session accumulators always reset.
    self.session_segments = []
    self.transcript_history = []
    self.detected_codes = []
    self.processed_files = []
    self.segment_number = 1
    self.is_continuation = False
    self.segment_analyses = {}

    if keep_framework:
        # Keep questions/protocol; just clear their coverage flags.
        self.coverage_status = {
            "rq_covered": [False] * len(self.research_questions),
            "protocol_covered": [False] * len(self.interview_protocol),
        }
    else:
        self.research_questions = []
        self.interview_protocol = []
        self.theoretical_framework = ""
        self.predefined_codes = {}
        self.analysis_focus = []
        self.framework_loaded = False
        self.coverage_status = {"rq_covered": [], "protocol_covered": []}

    self.session_name = f"Interview_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
    suffix = "Framework kept." if keep_framework else "Everything cleared."
    return "β Session reset. " + suffix
def save_uploaded_file(self, audio_path):
    """Copy an uploaded file into the session temp dir so it persists.

    Returns the new path, or None when the source is missing or the copy
    fails (errors are reported on stdout, not raised).
    """
    if not audio_path or not os.path.exists(audio_path):
        return None
    try:
        base_name = os.path.basename(audio_path)
        destination = os.path.join(self.temp_dir, base_name)
        if os.path.exists(destination):
            # Name collision: disambiguate with a time suffix.
            stem, extension = os.path.splitext(base_name)
            suffix = datetime.now().strftime("%H%M%S")
            base_name = f"{stem}_{suffix}{extension}"
            destination = os.path.join(self.temp_dir, base_name)
        shutil.copy2(audio_path, destination)
        print(f"πΎ Saved file to: {destination}")
        return destination
    except Exception as e:
        print(f"β Error saving file: {str(e)}")
        return None
def check_audio_file(self, audio_path):
    """Validate an uploaded audio file and stage it in the temp dir.

    Returns (usable_path_or_None, status_markdown, saved_path_or_None);
    usable_path is None when the file is missing, could not be saved, or
    exceeds the 25 MB Whisper upload limit.
    """
    if not audio_path:
        return None, "No file selected", None
    try:
        saved_path = self.save_uploaded_file(audio_path)
        if not saved_path:
            return None, "β Error saving uploaded file", None

        size_mb = os.path.getsize(saved_path) / (1024 * 1024)
        file_name = os.path.basename(saved_path)

        # Remember both paths so later processing can fall back to them.
        self.current_file_info = {
            "name": file_name,
            "size_mb": size_mb,
            "path": saved_path,
            "original_path": audio_path
        }

        # Debug trace
        print(f"π File check:")
        print(f" - Original path: {audio_path}")
        print(f" - Saved path: {saved_path}")
        print(f" - Size: {size_mb:.2f} MB")
        print(f" - Exists: {os.path.exists(saved_path)}")

        if size_mb > 25:
            # Over the 25 MB upload limit — offer remediation options.
            status = f"""β οΈ **File too large for direct processing**
- File: {file_name}
- Size: {size_mb:.1f} MB
- Maximum: 25 MB
**Options:**
1. Compress the file using the compression tool below
2. Split into smaller segments
3. Use a different recording with lower quality settings"""
            return None, status, saved_path

        status = f"""β **File ready for processing**
- File: {file_name}
- Size: {size_mb:.1f} MB
- Status: Within limits
- Saved to: {os.path.basename(self.temp_dir)}/"""
        return saved_path, status, saved_path
    except Exception as e:
        print(f"β Error in check_audio_file: {traceback.format_exc()}")
        return None, f"β Error checking file: {str(e)}", None
def compress_audio(self, audio_path, quality="medium"):
    """Compress an audio file with ffmpeg at a quality preset.

    audio_path may be a file path or a (sample_rate, ndarray) tuple from a
    live recording (written to a temp WAV first).  quality is one of
    "high" / "medium" / "low"; unknown values fall back to "medium".
    Returns (output_path_or_None, status_message).
    """
    # Hoisted: this import used to live inside the `try`, so the
    # `except subprocess.SubprocessError` clause below could itself raise
    # NameError if anything failed before/at the import.
    import subprocess

    # Normalize the input to a concrete file path.
    actual_path = None
    if isinstance(audio_path, tuple) and len(audio_path) == 2:
        sample_rate, audio_data = audio_path
        # Persist the raw recording so ffmpeg can read it.
        temp_path = os.path.join(self.temp_dir, f"temp_audio_{datetime.now().strftime('%H%M%S')}.wav")
        wavfile.write(temp_path, sample_rate, audio_data)
        actual_path = temp_path
    elif isinstance(audio_path, str):
        actual_path = audio_path
    else:
        return None, "No valid audio file to compress"
    if not actual_path or not os.path.exists(actual_path):
        return None, "No file to compress or file not found"
    try:
        # Quality presets
        quality_settings = {
            "high": {"bitrate": "128k", "sample_rate": "44100"},
            "medium": {"bitrate": "64k", "sample_rate": "22050"},
            "low": {"bitrate": "32k", "sample_rate": "16000"}
        }
        settings = quality_settings.get(quality, quality_settings["medium"])

        # Output goes next to the session's other temp artifacts.
        input_name = os.path.basename(actual_path)
        name, ext = os.path.splitext(input_name)
        output_path = os.path.join(self.temp_dir, f"{name}_compressed{ext}")

        # Re-encode mono at the preset bitrate / sample rate.
        cmd = [
            'ffmpeg', '-i', actual_path,
            '-b:a', settings["bitrate"],
            '-ar', settings["sample_rate"],
            '-ac', '1',  # Mono
            '-y', output_path
        ]
        result = subprocess.run(cmd, capture_output=True, text=True)
        if result.returncode == 0:
            new_size = os.path.getsize(output_path) / (1024 * 1024)
            old_size = os.path.getsize(actual_path) / (1024 * 1024)
            # Point later processing at the compressed file.
            self.current_file_info["path"] = output_path
            self.current_file_info["size_mb"] = new_size
            return output_path, f"""β **Compression successful!**
- Original size: {old_size:.1f} MB
- Compressed size: {new_size:.1f} MB
- Reduction: {((old_size - new_size) / old_size * 100):.0f}%
- Quality setting: {quality}
- Saved to: {os.path.basename(output_path)}"""
        else:
            return None, f"β Compression failed: {result.stderr}"
    except subprocess.SubprocessError as e:
        return None, f"β FFmpeg error: {str(e)}\n\nMake sure ffmpeg is installed."
    except Exception as e:
        return None, f"β Error: {str(e)}"
def transcribe_audio(self, audio_path: str, progress_callback=None) -> str:
    """Transcribe an audio file with the OpenAI Whisper API.

    Returns the transcript text, or a string starting with "Error:" when
    validation or the API call fails.
    """
    # --- Validation guards ---
    if not audio_path:
        return "Error: No audio file provided"
    if not os.path.exists(audio_path):
        return f"Error: Audio file not found at path: {audio_path}"
    if not openai_client.api_key:
        return "Error: OpenAI API key not found (needed for transcription)"
    try:
        size_bytes = os.path.getsize(audio_path)
        size_mb = size_bytes / (1024 * 1024)
        print(f"π Transcribing file: {audio_path}")
        print(f"π File size: {size_mb:.2f} MB ({size_bytes} bytes)")

        # OpenAI rejects uploads above 25 MB.
        if size_mb > 25:
            return f"Error: Audio file too large. File size: {size_mb:.1f} MB (limit: 25 MB)"

        if progress_callback:
            progress_callback(f"π΅ Transcribing {size_mb:.1f} MB file with OpenAI Whisper...")

        with open(audio_path, "rb") as audio_file:
            print("π Sending to OpenAI Whisper API...")
            # OpenAI v1.x client API
            transcript = openai_client.audio.transcriptions.create(
                model="whisper-1",
                file=audio_file,
                response_format="text"
            )

        # With response_format="text" the response is the transcript itself.
        text = transcript if isinstance(transcript, str) else str(transcript)

        # Track which uploads have been transcribed this session.
        file_name = self.current_file_info.get("name", "unknown")
        if file_name not in self.processed_files:
            self.processed_files.append(file_name)

        print(f"β Transcription successful! Length: {len(text)} characters")
        return text
    except Exception as e:
        error_msg = str(e)
        print(f"β OpenAI API error: {error_msg}")
        # Map known failure modes to friendlier messages.
        if "Invalid file format" in error_msg:
            return "Error: Invalid audio file format. Supported formats: mp3, mp4, mpeg, mpga, m4a, wav, webm"
        if "too large" in error_msg.lower():
            return "Error: Audio file too large. Please use files under 25MB."
        if "Incorrect API key" in error_msg or "Authentication" in error_msg:
            return "Error: Invalid OpenAI API key. Please check your .env file."
        if "Rate limit" in error_msg:
            return "Error: OpenAI rate limit reached. Please wait a moment and try again."
        return f"Error: {error_msg}"
def analyze_transcript_with_gemini(self, text: str) -> Dict:
    """Delegate to the enhanced analysis, targeting the current segment."""
    current = self.segment_number
    return self.analyze_transcript_with_gemini_enhanced(text, segment_num=current)
def analyze_transcript_with_gemini_enhanced(self, text: str, segment_num: int = None) -> Dict:
    """Enhanced analysis that tracks individual segments and can combine them.

    segment_num=None requests a COMBINED analysis across all session
    segments; an integer analyzes that single segment.  Returns the parsed
    analysis dict, or {"error": ...} on any failure.  Side effects: stores
    per-segment results in self.segment_analyses, updates
    self.coverage_status flags and extends self.detected_codes.
    """
    # Guard clauses: need usable text, a configured framework, and a model.
    if not text or len(text.strip()) < 10:
        return {"error": "Text too short to analyze"}
    if not self.research_questions:
        return {"error": "Please set up research questions first"}
    if not gemini_model:
        return {"error": "Gemini API not configured"}
    # Determine if this is a specific segment or combined analysis
    is_combined = segment_num is None
    current_segment = segment_num if segment_num else self.segment_number
    # Build context section telling the model what it is looking at.
    context_parts = []
    if is_combined:
        context_parts.append("This is a COMBINED ANALYSIS of all segments.")
        context_parts.append(f"Total segments: {len(self.session_segments)}")
    else:
        context_parts.append(f"This is Segment {current_segment} of the interview.")
        if current_segment > 1:
            context_parts.append("Previous segments have covered:")
            covered_rqs = [f"RQ{i + 1}" for i, covered in enumerate(self.coverage_status["rq_covered"]) if covered]
            if covered_rqs:
                context_parts.append(f"- Research Questions: {', '.join(covered_rqs)}")
    context_section = "\n".join(context_parts)
    # Build framework section (only the parts the researcher supplied).
    framework_section = ""
    if self.theoretical_framework:
        framework_section += f"\nTHEORETICAL FRAMEWORK:\n{self.theoretical_framework}\n"
    if self.predefined_codes:
        framework_section += "\nPREDEFINED CODES:\n"
        for category, codes in self.predefined_codes.items():
            framework_section += f"- {category}: {', '.join(codes)}\n"
    if self.analysis_focus:
        framework_section += "\nANALYSIS FOCUS:\n"
        framework_section += "\n".join([f"- {focus}" for focus in self.analysis_focus])
    # Modified prompt for combined vs individual analysis
    analysis_type = "COMBINED TRANSCRIPT" if is_combined else f"SEGMENT {current_segment}"
    # NOTE: chr(10) is '\n' — used inside f-string expressions because
    # backslashes are not permitted there before Python 3.12.  The doubled
    # braces {{ }} below are literal braces in the requested JSON schema.
    prompt = f"""You are a Qualitative Research Analysis Assistant.
{context_section}
{analysis_type}: "{text}"
RESEARCH FRAMEWORK:
- Research Questions:
{chr(10).join([f" RQ{i + 1}: {q}" for i, q in enumerate(self.research_questions)])}
- Interview Protocol:
{chr(10).join([f" Q{i + 1}: {q}" for i, q in enumerate(self.interview_protocol)])}
{framework_section}
ANALYSIS TASKS:
1. Apply predefined codes where relevant
2. Identify emergent codes not in the framework
3. Track research question coverage
4. Note theoretical alignments or challenges
5. Consider the analysis focus areas
{"6. Identify patterns across segments" if is_combined else ""}
{"7. Note evolution of themes" if is_combined else ""}
PROVIDE YOUR ANALYSIS IN THIS EXACT JSON FORMAT:
{{
"segment_number": {current_segment if not is_combined else '"combined"'},
"analysis_type": "{"combined" if is_combined else "individual"}",
"alerts": [
{{"type": "supports", "code": "Code Name", "text": "β Supports [Theory/Concept]: ..."}},
{{"type": "challenges", "text": "β οΈ Challenges [Framework]: ..."}},
{{"type": "missing", "text": "π Missing [Dimension]: ..."}},
{{"type": "emergent", "code": "New Code", "text": "β³οΈ Emergent theme: ..."}},
{{"type": "noteworthy", "text": "π Noteworthy: ..."}}
],
"rq_addressed": [1, 2],
"codes_applied": ["Code 1", "Code 2"],
"emergent_codes": ["New Theme 1"],
"coverage": {{
"protocol_covered": [1, 3, 5],
"completion_percent": 40,
"missing_topics": ["Topic A", "Topic B"]
}},
"follow_ups": [
"π§ To explore [concept], ask: 'Question?'",
"π§ RQ3 needs data on [topic]"
],
"insights": [
"Key pattern or finding",
"Theoretical implication"
],
"segment_summary": "Brief summary of {"all segments combined" if is_combined else "this segment's contribution"}"{', "cross_segment_patterns": ["Pattern 1", "Pattern 2"],' if is_combined else ""}{'"theme_evolution": "Description of how themes evolved across segments"' if is_combined else ""}
}}
Return ONLY the JSON."""
    try:
        print(f"π€ Analyzing {analysis_type} with Gemini...")
        response = gemini_model.generate_content(prompt)
        content = response.text.strip()
        # Parse JSON response — tolerate extra prose around the JSON by
        # slicing from the first '{' to the last '}'.
        try:
            start = content.find('{')
            end = content.rfind('}') + 1
            if start >= 0 and end > start:
                json_str = content[start:end]
                analysis = json.loads(json_str)
            else:
                analysis = json.loads(content)
        except json.JSONDecodeError:
            print(f"JSON parsing error. Raw response: {content[:200]}...")
            # Return a default structure so callers can still render output.
            analysis = {
                "segment_number": current_segment if not is_combined else "combined",
                "analysis_type": "combined" if is_combined else "individual",
                "alerts": [],
                "rq_addressed": [],
                "codes_applied": [],
                "emergent_codes": [],
                "coverage": {
                    "protocol_covered": [],
                    "completion_percent": 0,
                    "missing_topics": []
                },
                "follow_ups": ["Please try again"],
                "insights": ["Unable to parse response"],
                "segment_summary": "Analysis failed"
            }
        # Store individual segment analysis for the multi-view features.
        if not is_combined:
            self.segment_analyses[current_segment] = analysis
        # Update coverage tracking (the model reports 1-based indices).
        for rq_num in analysis.get("rq_addressed", []):
            if isinstance(rq_num, int) and 0 < rq_num <= len(self.research_questions):
                self.coverage_status["rq_covered"][rq_num - 1] = True
        for pq_num in analysis.get("coverage", {}).get("protocol_covered", []):
            if isinstance(pq_num, int) and 0 < pq_num <= len(self.interview_protocol):
                self.coverage_status["protocol_covered"][pq_num - 1] = True
        # Add codes to master list
        self.detected_codes.extend(analysis.get("codes_applied", []))
        self.detected_codes.extend(analysis.get("emergent_codes", []))
        return analysis
    except Exception as e:
        print(f"β Gemini error: {type(e).__name__}: {str(e)}")
        return {"error": f"Analysis error: {str(e)}"}
def format_analysis_output(self, analysis: Dict, show_segment_info: bool = True) -> str:
    """Render a Gemini analysis dict as markdown for the UI."""
    if "error" in analysis:
        return f"β {analysis['error']}"

    is_combined = analysis.get("analysis_type") == "combined"
    segment_num = analysis.get("segment_number", "Unknown")

    # Alerts block — emitted whenever the key is present, even if empty.
    alerts_text = ""
    if "alerts" in analysis:
        alert_lines = [f"{alert.get('text', '')}\n" for alert in analysis.get("alerts", [])]
        alerts_text = "### π’ Analysis Alerts:\n" + "".join(alert_lines)

    # Codes block.
    applied_codes = analysis.get("codes_applied", [])
    emergent_codes = analysis.get("emergent_codes", [])
    codes_section = ""
    if applied_codes:
        codes_section += f"**Applied Codes:** {', '.join(applied_codes)}\n"
    if emergent_codes:
        codes_section += f"**β³οΈ Emergent Codes:** {', '.join(emergent_codes)}\n"

    # Header and per-segment metadata differ between the two views.
    if is_combined:
        header = "### π Combined Analysis Results (All Segments)"
        segment_info = f"**Total Segments Analyzed:** {len(self.session_segments)}\n"
    else:
        header = f"### π Analysis Results - Segment {segment_num}"
        segment_info = f"**π Segment {segment_num} Summary:** {analysis.get('segment_summary', 'Analysis of this segment')}\n"

    # Source file for an individual segment, when we can look it up.
    file_info = ""
    if not is_combined and segment_num != "Unknown" and isinstance(segment_num, int):
        if segment_num <= len(self.session_segments):
            file_info = f"**File:** {self.session_segments[segment_num - 1].get('file_name', 'unknown')}\n"

    coverage = analysis.get('coverage', {})
    rq_list = ', '.join([f"RQ{n}" for n in analysis.get('rq_addressed', [])])
    pq_list = ', '.join([f"Q{n}" for n in coverage.get('protocol_covered', [])])
    insights = chr(10).join(['β’ ' + insight for insight in analysis.get('insights', [])])

    analysis_text = f"""{header}
{segment_info if show_segment_info else ""}{file_info}**Research Questions Addressed:** {rq_list}
{alerts_text}
**Codes/Themes:**
{codes_section}
**Protocol Coverage:** {pq_list}
**Completion:** {coverage.get('completion_percent', 0)}% of protocol addressed
**Key Insights:**
{insights}"""

    # Combined-only trailing sections.
    if is_combined:
        if "cross_segment_patterns" in analysis:
            analysis_text += "\n\n**Cross-Segment Patterns:**\n"
            analysis_text += chr(10).join(
                ['β’ ' + pattern for pattern in analysis.get('cross_segment_patterns', [])])
        if "theme_evolution" in analysis:
            analysis_text += f"\n\n**Theme Evolution:**\n{analysis.get('theme_evolution', '')}"

    missing_topics = coverage.get('missing_topics', [])
    if missing_topics:
        analysis_text += f"\n\n**Missing Topics:**\n{chr(10).join(['β’ ' + topic for topic in missing_topics])}"
    return analysis_text
def generate_multi_view_analysis(self):
    """Produce (individual, combined, comparison) markdown views.

    The combined view re-runs Gemini over the concatenated transcript
    history and is only generated when 2+ segments exist.
    """
    if not hasattr(self, 'segment_analyses') or not self.segment_analyses:
        return "No segments analyzed yet", "", ""

    # One formatted section per segment, separated by a rule of '=' chars.
    individual_analyses = "## π Individual Segment Analyses\n\n"
    for seg_num in sorted(self.segment_analyses.keys()):
        formatted = self.format_analysis_output(self.segment_analyses[seg_num], show_segment_info=True)
        individual_analyses += f"{formatted}\n\n{'=' * 50}\n\n"

    if len(self.segment_analyses) > 1:
        # segment_num=None signals a combined (all-segments) analysis.
        all_transcripts = "\n\n".join(self.transcript_history)
        combined_result = self.analyze_transcript_with_gemini_enhanced(all_transcripts, segment_num=None)
        combined_analysis = "## π Combined Analysis (All Segments Together)\n\n"
        combined_analysis += self.format_analysis_output(combined_result, show_segment_info=True)
    else:
        combined_analysis = "Combined analysis requires at least 2 segments"

    comparison_view = self.generate_comparison_view()
    return individual_analyses, combined_analysis, comparison_view
def generate_comparison_view(self):
    """Build a markdown table comparing segments plus theme frequencies."""
    if not hasattr(self, 'segment_analyses') or not self.segment_analyses:
        return "No segments to compare"

    parts = ["## π Segment Comparison\n\n"]
    parts.append("| Segment | RQs Addressed | Codes Applied | Emergent Codes | Completion % |\n")
    parts.append("|---------|---------------|---------------|----------------|-------------|\n")
    for seg_num in sorted(self.segment_analyses.keys()):
        analysis = self.segment_analyses[seg_num]
        rqs = ', '.join([f"RQ{n}" for n in analysis.get('rq_addressed', [])])
        applied = len(analysis.get('codes_applied', []))
        emergent = len(analysis.get('emergent_codes', []))
        completion = analysis.get('coverage', {}).get('completion_percent', 0)
        parts.append(f"| {seg_num} | {rqs} | {applied} | {emergent} | {completion}% |\n")

    parts.append("\n### π Theme Frequency Across Segments\n\n")

    # code -> {segment number: occurrence count}
    code_by_segment = {}
    for seg_num, analysis in self.segment_analyses.items():
        all_codes = analysis.get('codes_applied', []) + analysis.get('emergent_codes', [])
        for code in all_codes:
            counts = code_by_segment.setdefault(code, {})
            counts[seg_num] = counts.get(seg_num, 0) + 1

    for code, segments in sorted(code_by_segment.items()):
        seg_info = ', '.join([f"Seg{s}: {count}x" for s, count in sorted(segments.items())])
        parts.append(f"- **{code}**: {seg_info}\n")

    return "".join(parts)
| def process_interview_segment(self, audio_path, progress_callback=None): | |
| """Process an audio segment and return transcript and analysis""" | |
| print(f"\nπ― Starting process_interview_segment") | |
| print(f" Audio path provided: {audio_path}") | |
| print(f" Type of audio_path: {type(audio_path)}") | |
| # Handle different types of audio input | |
| actual_audio_path = None | |
| # Case 1: audio_path is a tuple (sample_rate, audio_data) from recording | |
| if isinstance(audio_path, tuple) and len(audio_path) == 2: | |
| print(" Detected audio data tuple (recording)") | |
| sample_rate, audio_data = audio_path | |
| # Save the audio data to a temporary file | |
| temp_path = os.path.join(self.temp_dir, f"recorded_{datetime.now().strftime('%H%M%S')}.wav") | |
| wavfile.write(temp_path, sample_rate, audio_data) | |
| actual_audio_path = temp_path | |
| print(f" Saved recording to: {temp_path}") | |
| # Case 2: audio_path is a string (file path) | |
| elif isinstance(audio_path, str): | |
| actual_audio_path = audio_path | |
| # Case 3: audio_path is None, check if we have a saved file | |
| elif audio_path is None and self.current_file_info: | |
| actual_audio_path = self.current_file_info.get("path") | |
| print(f" Using saved path: {actual_audio_path}") | |
| # Validate we have a valid path | |
| if not actual_audio_path or not os.path.exists(actual_audio_path): | |
| return "", "β No audio file found. Please upload a file or record audio first.", "", "", "No file to process" | |
| # Get file info | |
| if isinstance(audio_path, tuple): | |
| file_name = f"recorded_{datetime.now().strftime('%H%M%S')}.wav" | |
| file_size = os.path.getsize(actual_audio_path) / (1024 * 1024) | |
| # Update current file info for recording | |
| self.current_file_info = { | |
| "name": file_name, | |
| "size_mb": file_size, | |
| "path": actual_audio_path | |
| } | |
| else: | |
| file_name = self.current_file_info.get("name", os.path.basename(actual_audio_path)) | |
| file_size = self.current_file_info.get("size_mb", os.path.getsize(actual_audio_path) / (1024 * 1024)) | |
| # Progress update | |
| progress = f"""π Processing: {file_name} ({file_size:.1f} MB) | |
| π Current Step: Transcribing audio with Whisper... | |
| β±οΈ Estimated time: {int(file_size * 0.5)}-{int(file_size * 1)} minutes for transcription | |
| π‘ Tip: Larger files take longer. A 10MB file typically takes 5-10 minutes.""" | |
| # Update progress callback if provided | |
| if progress_callback: | |
| progress_callback(progress) | |
| # Transcribe with Whisper | |
| print(f"π΅ Starting transcription of {file_size:.1f} MB file...") | |
| start_time = datetime.now() | |
| transcript = self.transcribe_audio(actual_audio_path, progress_callback) | |
| transcription_time = (datetime.now() - start_time).total_seconds() | |
| print(f"β Transcription completed in {transcription_time:.1f} seconds") | |
| if transcript.startswith("Error:"): | |
| return transcript, "β Transcription failed", "", "", progress + "\n\nβ Transcription failed" | |
| # Add to history with file info | |
| timestamp = datetime.now().strftime("%H:%M:%S") | |
| # Safely check for continuation attributes | |
| is_continuation = getattr(self, 'is_continuation', False) | |
| segment_number = getattr(self, 'segment_number', 1) | |
| segment_label = f"Segment {segment_number}" if is_continuation else "Segment 1" | |
| self.transcript_history.append(f"[{timestamp}] [{file_name}] [{segment_label}] {transcript}") | |
| # Check if research context is set up | |
| if not self.research_questions: | |
| full_transcript = "\n\n".join(self.transcript_history) | |
| return full_transcript, "β οΈ Please set up research questions first", "", "", progress | |
| # Update progress for analysis phase | |
| progress = f"""β Transcription complete! ({transcription_time:.1f} seconds) | |
| π Current Step: Analyzing with Gemini 1.5 Pro... | |
| π Analyzing {segment_label} | |
| β±οΈ This usually takes 10-30 seconds...""" | |
| if progress_callback: | |
| progress_callback(progress) | |
| # Analyze with Gemini | |
| print(f"π€ Starting Gemini analysis...") | |
| analysis_start = datetime.now() | |
| analysis = self.analyze_transcript_with_gemini(transcript) | |
| analysis_time = (datetime.now() - analysis_start).total_seconds() | |
| print(f"β Analysis completed in {analysis_time:.1f} seconds") | |
| # Format outputs | |
| full_transcript = "\n\n".join(self.transcript_history) | |
| if "error" not in analysis: | |
| # Format analysis output | |
| analysis_text = self.format_analysis_output(analysis) | |
| follow_ups = "### π‘ Suggested Follow-ups:\n" + \ | |
| '\n'.join(analysis.get('follow_ups', [])) | |
| rq_coverage = sum(self.coverage_status["rq_covered"]) / len( | |
| self.research_questions) * 100 if self.research_questions else 0 | |
| protocol_coverage = sum(self.coverage_status["protocol_covered"]) / len( | |
| self.interview_protocol) * 100 if self.interview_protocol else 0 | |
| # Track unique codes | |
| all_codes = list(set(self.detected_codes)) | |
| applied_unique = list(set(analysis.get("codes_applied", []))) | |
| emergent_unique = list(set(analysis.get("emergent_codes", []))) | |
| coverage = f"""### π Overall Progress: | |
| - **Research Questions:** {rq_coverage:.0f}% ({sum(self.coverage_status["rq_covered"])}/{len(self.research_questions)}) | |
| - **Protocol Questions:** {protocol_coverage:.0f}% ({sum(self.coverage_status["protocol_covered"])}/{len(self.interview_protocol)}) | |
| - **Total Unique Codes:** {len(all_codes)} | |
| - Framework Codes: {len(applied_unique)} | |
| - Emergent Codes: {len(emergent_unique)} | |
| - **Segments Processed:** {len(self.processed_files)}""" | |
| progress = f"β Completed: {file_name} ({segment_label})" | |
| else: | |
| analysis_text = f"β {analysis['error']}" | |
| follow_ups = "Unable to generate follow-ups" | |
| coverage = "Unable to calculate coverage" | |
| progress = f"β Failed: {file_name}" | |
| return full_transcript, analysis_text, follow_ups, coverage, progress | |
# Initialize the single shared co-pilot instance. Every Gradio callback below
# reads and mutates this object's state (framework, transcripts, session info).
copilot = InterviewCoPilot()
# Create improved interface.
# Custom CSS tweaks the file-info box, status colors, heading, and max width.
with gr.Blocks(title="Research Interview Co-Pilot", theme=gr.themes.Soft(), css="""
.file-info { background-color: #f0f0f0; padding: 10px; border-radius: 5px; margin: 10px 0; }
.success { color: #28a745; }
.warning { color: #ffc107; }
.error { color: #dc3545; }
h1 { text-align: center; }
.contain { max-width: 1200px; margin: auto; }
""") as app:
    gr.Markdown("""
# ποΈ Research Interview Co-Pilot - Enhanced with Multi-View Analysis
**Transcription:** OpenAI Whisper | **Analysis:** Google Gemini Pro
Now with individual segment analysis, combined analysis, and segment comparison!
""")
    # --- Setup tab: research context entry fields ---
    with gr.Tab("π Setup"):
        gr.Markdown("### Set up your research context")
        with gr.Row():
            with gr.Column():
                # Required: one research question per line.
                rq_input = gr.Textbox(
                    label="Research Questions (one per line) *",
                    placeholder="What pedagogical strategies are evident in AI educators?\nHow do AI tools emphasize practical applications?\nWhat are the differences between various AI approaches?",
                    lines=6
                )
                protocol_input = gr.Textbox(
                    label="Interview Protocol Questions (one per line)",
                    placeholder="Tell me about your experience with AI\nHow do you use AI tools?\nWhat challenges have you faced?",
                    lines=6
                )
            with gr.Column():
                framework_input = gr.Textbox(
                    label="Theoretical Framework (optional)",
                    placeholder="e.g., Technology Acceptance Model (TAM)\nGrounded Theory approach\nActivity Theory lens",
                    lines=3
                )
                # Free-text code list; format parsed elsewhere as 'Category: code1, code2'.
                codes_input = gr.Textbox(
                    label="Predefined Codes (optional - format: 'Category: code1, code2')",
                    placeholder="Pedagogical: Scaffolding, Direct Instruction, Guided Practice\nPractical: Application, Implementation, Real-world Use\nEthical: Privacy Concerns, Bias Awareness, Transparency",
                    lines=6
                )
                focus_input = gr.Textbox(
                    label="Analysis Focus Areas (optional - one per line)",
                    placeholder="Look for emotional responses\nPay attention to metaphors used\nNote any resistance or enthusiasm",
                    lines=3
                )
        # Segment continuation option: lets a long interview be processed in parts.
        with gr.Row():
            continue_interview = gr.Checkbox(
                label="This is a continuation of a previous interview segment",
                value=False
            )
            # Read-only label updated by update_segment_info when the box toggles.
            segment_info = gr.Textbox(
                label="Segment Info",
                value="Segment 1",
                interactive=False
            )
        setup_btn = gr.Button("Setup Research Context", variant="primary", size="lg")
        setup_output = gr.Textbox(label="Setup Status", interactive=False, lines=6)
        # Save/Load framework buttons: persist/restore the fields above as JSON.
        with gr.Row():
            save_framework_btn = gr.Button("πΎ Save Framework", size="sm")
            load_framework_btn = gr.Button("π Load Framework", size="sm")
        # Hidden until a save/load action makes it relevant.
        framework_file = gr.File(label="Framework File", visible=False, file_types=[".json"])
def update_segment_info(is_continuation):
    """Sync the continuation checkbox with the co-pilot's segment counter.

    Unchecking resets the counter to 1; checking bumps it by one.
    Returns the label shown in the read-only "Segment Info" textbox.
    """
    copilot.is_continuation = bool(is_continuation)
    if not is_continuation:
        # Fresh interview: start the numbering over.
        copilot.segment_number = 1
        return "Segment 1"
    copilot.segment_number += 1
    return f"Segment {copilot.segment_number} (Continuing from previous)"
def save_framework(rq, protocol, framework, codes, focus):
    """Persist the current research framework as a timestamped JSON file.

    Writes the five setup fields plus a save date into copilot.temp_dir and
    returns a gr.update that reveals the File component pointing at it.
    """
    framework_data = {
        "research_questions": rq,
        "interview_protocol": protocol,
        "theoretical_framework": framework,
        "predefined_codes": codes,
        "analysis_focus": focus,
        "saved_date": datetime.now().isoformat()
    }
    filename = f"framework_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
    filepath = os.path.join(copilot.temp_dir, filename)
    # Explicit utf-8 (fix): the previous code used the locale default encoding,
    # which can corrupt non-ASCII research questions/codes on some platforms.
    # ensure_ascii=False keeps that text human-readable in the saved file.
    with open(filepath, 'w', encoding='utf-8') as f:
        json.dump(framework_data, f, indent=2, ensure_ascii=False)
    return gr.update(visible=True, value=filepath)
def load_framework(file):
    """Load a previously saved framework JSON back into the setup fields.

    Args:
        file: a Gradio file object (has a .name path attribute) or None.

    Returns a 6-tuple matching the setup outputs: (research_questions,
    interview_protocol, theoretical_framework, predefined_codes,
    analysis_focus, status_message). Missing keys fall back to "".
    """
    if not file:
        return "", "", "", "", "", "No file selected"
    try:
        # Explicit utf-8 (fix): must match the encoding save_framework writes;
        # the locale default could fail on non-ASCII content.
        with open(file.name, 'r', encoding='utf-8') as f:
            data = json.load(f)
        return (
            data.get("research_questions", ""),
            data.get("interview_protocol", ""),
            data.get("theoretical_framework", ""),
            data.get("predefined_codes", ""),
            data.get("analysis_focus", ""),
            f"β Loaded framework from {os.path.basename(file.name)}"
        )
    except Exception as e:
        # Broad catch is deliberate: any user-supplied file problem (bad JSON,
        # permissions, missing path) is reported in the status field, not raised.
        return "", "", "", "", "", f"β Error loading file: {str(e)}"
# Toggling the continuation checkbox updates the segment counter/label.
continue_interview.change(
    update_segment_info,
    inputs=[continue_interview],
    outputs=[segment_info]
)
# Main setup: pushes the five framework fields into the co-pilot.
setup_btn.click(
    fn=copilot.setup_research_context,
    inputs=[rq_input, protocol_input, framework_input, codes_input, focus_input],
    outputs=setup_output
)
# Save writes a JSON file and reveals the File component for download.
save_framework_btn.click(
    save_framework,
    inputs=[rq_input, protocol_input, framework_input, codes_input, focus_input],
    outputs=[framework_file]
)
# NOTE(review): this hides the File component again whenever its value changes
# — including right after save_framework populates it, and after the user
# picks a file to load. Confirm this is the intended visibility behavior.
framework_file.change(
    lambda x: gr.update(visible=False),
    inputs=[framework_file],
    outputs=[framework_file]
)
# NOTE(review): the .then() here runs immediately after the click, i.e. it
# reads framework_file before the user has had a chance to pick a new file in
# the just-revealed component — verify the intended load flow against the
# Gradio event model in use.
load_framework_btn.click(
    lambda: gr.update(visible=True),
    outputs=[framework_file]
).then(
    load_framework,
    inputs=[framework_file],
    outputs=[rq_input, protocol_input, framework_input, codes_input, focus_input, setup_output]
)
# --- Interview Processing tab: upload/record, compress, process, results ---
with gr.Tab("π€ Interview Processing"):
    gr.Markdown("### Process interview audio with multi-view analysis")
    # Session info at the top (refreshed by the processing/reset callbacks).
    with gr.Row():
        session_info = gr.Markdown(copilot.get_session_summary())
    with gr.Row():
        # Session control buttons
        new_file_btn = gr.Button("π New File, Keep Setup", variant="secondary")
        reset_session_btn = gr.Button("π Reset Session", variant="secondary")
        reset_all_btn = gr.Button("ποΈ Reset Everything", variant="stop")
    with gr.Row():
        with gr.Column(scale=1):
            # File upload with preview; type="filepath" means callbacks get a
            # path string (or a (rate, data) tuple for raw mic recordings).
            audio_input = gr.Audio(
                sources=["upload", "microphone"],
                type="filepath",
                label="π Upload Audio File or π€ Record",
                interactive=True
            )
            file_status = gr.Markdown("*Upload a file to see its status*")
            # Compression tool for files over the Whisper upload limit.
            with gr.Accordion("π§ Audio Compression Tool", open=False):
                gr.Markdown("Compress large audio files")
                quality_select = gr.Radio(
                    choices=["high", "medium", "low"],
                    value="medium",
                    label="Compression Quality"
                )
                compress_btn = gr.Button("Compress Audio", variant="secondary")
                compress_output = gr.Markdown()
                compressed_audio = gr.Audio(
                    label="Compressed Audio",
                    visible=False
                )
            process_btn = gr.Button("π Process & Analyze", variant="primary", size="lg")
            # Visual processing indicator, filled in while a run is active.
            processing_status = gr.Markdown(
                value="",
                visible=True
            )
            # Progress area.
            # NOTE(review): gr.Progress() is a progress *tracker* (normally a
            # callback default argument), not a layout component, and
            # progress_bar is never referenced again — confirm whether placing
            # it in a Row has any effect in this Gradio version.
            with gr.Row():
                progress_bar = gr.Progress()
                progress_status = gr.Textbox(
                    label="Progress",
                    interactive=False,
                    lines=4,
                    value="Ready to process audio..."
                )
            # Multi-view analysis button AFTER progress status; its visibility
            # is updated by the processing callback once 2+ segments exist.
            generate_multiview_btn = gr.Button(
                "π Generate Multi-View Analysis",
                variant="secondary",
                size="lg",
                visible=True  # Always visible for now
            )
        with gr.Column(scale=2):
            # Results area with enhanced tabs (one output component per view).
            with gr.Tabs():
                with gr.Tab("π Transcript"):
                    transcript_output = gr.Textbox(
                        label="Full Transcript",
                        lines=15,
                        max_lines=25,
                        interactive=False
                    )
                with gr.Tab("π Current Segment"):
                    current_analysis_output = gr.Markdown(
                        value="*Process a segment to see analysis*"
                    )
                with gr.Tab("π All Segments"):
                    all_segments_output = gr.Markdown(
                        value="*Individual analyses will appear here*"
                    )
                with gr.Tab("π Combined Analysis"):
                    combined_analysis_output = gr.Markdown(
                        value="*Combined analysis will appear here after 2+ segments*"
                    )
                with gr.Tab("π Comparison"):
                    comparison_output = gr.Markdown(
                        value="*Segment comparison will appear here*"
                    )
                with gr.Tab("π‘ Follow-ups"):
                    followup_output = gr.Markdown()
                with gr.Tab("π Coverage"):
                    coverage_output = gr.Markdown()
    # Hidden state holding the current audio file path (upload or compressed).
    audio_state = gr.State()
def new_file_keep_setup():
    """Prepare the UI for the next audio segment without touching the framework.

    Marks the session as a continuation and sets the segment counter to one
    past the segments already recorded. Output order matches the wiring:
    (audio_input, file_status, progress_status, session_info).
    """
    copilot.is_continuation = True
    next_segment = len(copilot.session_segments) + 1
    copilot.segment_number = next_segment
    status_line = f"Ready for Segment {next_segment}"
    # None clears the audio component.
    return None, "*Upload a new file to continue the interview*", status_line, copilot.get_session_summary()
def reset_session():
    """Reset the interview session (segments, transcripts) but keep the framework.

    Output order matches the wiring: (audio_input, file_status,
    progress_status, session_info, transcript_output).
    """
    # Called for its side effects only; the previous code bound the return
    # value to an unused local, which is dropped here.
    copilot.reset_session(keep_framework=True)
    return (
        None,  # Clear audio
        "*Session reset. Framework kept.*",
        "Ready to process audio...",
        copilot.get_session_summary(),
        ""  # Clear transcript
    )
def reset_everything():
    """Reset the entire app state, including the research framework.

    Output order matches the wiring: (audio_input, file_status,
    progress_status, session_info, transcript_output, current_analysis_output).
    """
    # Side effect only; the unused `result` local from the previous version
    # is dropped.
    copilot.reset_session(keep_framework=False)
    return (
        None,  # Clear audio
        "*Everything reset. Please set up framework again.*",
        "Ready to process audio...",
        copilot.get_session_summary(),
        "",  # Clear transcript
        "β Framework cleared. Please go to Setup tab."
    )
# File status update - validates the upload and stores the path in audio_state
# so later callbacks use a stable path even if the component value changes.
audio_input.change(
    fn=copilot.check_audio_file,
    inputs=[audio_input],
    outputs=[audio_input, file_status, audio_state]
)
# Compression: run the compressor, then (if it produced a file) reveal the
# compressed-audio player and swap audio_state over to the compressed path.
compress_btn.click(
    fn=copilot.compress_audio,
    inputs=[audio_state, quality_select],
    outputs=[compressed_audio, compress_output]
).then(
    # msg (compress_output) is accepted but unused; only the file decides.
    fn=lambda x, msg: (gr.update(visible=True), x) if x else (gr.update(visible=False), None),
    inputs=[compressed_audio, compress_output],
    outputs=[compressed_audio, audio_state]
)
# Modified process function to handle multi-view.
def process_and_update_session_multiview(audio_path, progress=gr.Progress()):
    """Process one audio segment and refresh session info + multi-view state.

    Returns a 7-tuple whose order must match the outputs list wired to
    process_btn: (transcript, current-segment analysis, follow-ups, coverage,
    progress text, session summary, multi-view button visibility update).
    """
    # Progress callback passed down to the transcription step. Pinned at 50%
    # because the underlying call reports no incremental progress.
    def update_progress(message):
        progress(0.5, desc=message)
        return message
    # Initialize progress
    progress(0, desc="Starting audio processing...")
    # First, process the current segment with progress callback
    results = copilot.process_interview_segment(audio_path, progress_callback=update_progress)
    # Update progress to complete
    progress(1.0, desc="Processing complete!")
    # Add to session only on success: results[4] is the progress/status string,
    # which begins with the success marker when processing completed.
    if results[4].startswith("β "):
        file_name = copilot.current_file_info.get("name", "unknown")
        # Rough estimate of duration from file size; presumably minutes per MB
        # of compressed audio — TODO confirm the intended unit.
        duration = copilot.current_file_info.get("size_mb", 0) * 0.5  # Rough estimate
        transcript_length = len(results[0])
        copilot.add_segment_to_session(file_name, duration, transcript_length)
    # Get current segment analysis
    current_segment_analysis = results[1]
    # Check if we should show multi-view button (only after 2+ segments for meaningful comparison)
    show_multiview = len(copilot.session_segments) >= 2
    # Return results plus updated session info
    return (
        results[0],  # transcript
        current_segment_analysis,  # current segment analysis
        results[2],  # follow-ups
        results[3],  # coverage
        results[4],  # progress
        copilot.get_session_summary(),  # session info
        gr.update(visible=show_multiview)  # multi-view button visibility
    )
# Multi-view generation function.
def generate_all_views():
    """Return the (individual, combined, comparison) analysis views.

    The co-pilot already yields the three views in the exact order the
    Gradio outputs expect, so the tuple is forwarded unchanged.
    """
    return copilot.generate_multi_view_analysis()
# Connect the process button with loading state: show a banner, run the
# (potentially long) processing chain, then clear the banner.
process_btn.click(
    fn=lambda: gr.update(
        value="π **Processing in progress...** Please wait, this may take several minutes for large files."),
    outputs=[processing_status]
).then(
    fn=process_and_update_session_multiview,
    inputs=[audio_state],
    # Order must match the 7-tuple returned by the callback.
    outputs=[
        transcript_output,
        current_analysis_output,
        followup_output,
        coverage_output,
        progress_status,
        session_info,
        generate_multiview_btn
    ]
).then(
    fn=lambda: gr.update(value=""),
    outputs=[processing_status]
)
# Connect the multi-view button: fills the three comparison tabs.
generate_multiview_btn.click(
    fn=generate_all_views,
    outputs=[
        all_segments_output,
        combined_analysis_output,
        comparison_output
    ]
)
# Session control buttons. Each callback's return tuple matches its outputs
# list in length and order (4, 5, and 6 values respectively).
new_file_btn.click(
    fn=new_file_keep_setup,
    outputs=[audio_input, file_status, progress_status, session_info]
)
reset_session_btn.click(
    fn=reset_session,
    outputs=[audio_input, file_status, progress_status, session_info, transcript_output]
)
reset_all_btn.click(
    fn=reset_everything,
    outputs=[audio_input, file_status, progress_status, session_info, transcript_output,
             current_analysis_output]
)
# --- Summary & Export tab: one-click report generation in three formats ---
with gr.Tab("π Summary & Export"):
    gr.Markdown("### Generate comprehensive summary with multi-view analysis")
def generate_enhanced_summary():
    """Build the summary report in three formats: Markdown, CSV, JSON.

    Returns (markdown_summary, csv_codes, json_export). When no segments
    have been processed yet, returns a placeholder message and empty strings.
    """
    if not copilot.transcript_history:
        return "No interview data yet.", "", ""
    unique_codes = list(set(copilot.detected_codes))
    # Fix: the markdown branch previously dereferenced copilot.segment_analyses
    # directly while the JSON branch guarded with hasattr — an AttributeError
    # waiting to happen. Resolve the attribute once with a safe default.
    segment_analyses = getattr(copilot, 'segment_analyses', {})
    # Generate different formats
    markdown_summary = f"""# Interview Summary Report
**Generated:** {datetime.now().strftime("%Y-%m-%d %H:%M")}
**Analysis Engine:** Google Gemini Pro
**Files Processed:** {', '.join(copilot.processed_files)}
**Total Segments:** {len(copilot.session_segments)}
## Research Question Coverage
{chr(10).join([f"- {'β ' if covered else 'β'} {q}" for q, covered in zip(copilot.research_questions, copilot.coverage_status["rq_covered"])])}
## Detected Codes/Themes ({len(unique_codes)} unique)
{chr(10).join(['- ' + code for code in unique_codes])}
## Segment-by-Segment Analysis
{"Included in multi-view analysis - see Interview Processing tab" if segment_analyses else "No individual analyses yet"}
## Full Transcript
{chr(10).join(copilot.transcript_history)}"""
    # CSV format for codes: frequency table, most frequent first.
    csv_codes = "Code,Frequency\n"
    code_freq = {}
    for code in copilot.detected_codes:
        code_freq[code] = code_freq.get(code, 0) + 1
    for code, freq in sorted(code_freq.items(), key=lambda x: x[1], reverse=True):
        csv_codes += f'"{code}",{freq}\n'
    # JSON format with segment analyses (keys stringified for JSON safety).
    json_export = json.dumps({
        "metadata": {
            "date": datetime.now().isoformat(),
            "files": copilot.processed_files,
            "total_segments": len(copilot.transcript_history),
            "analysis_engine": "Gemini Pro"
        },
        "research_questions": {
            "questions": copilot.research_questions,
            "coverage": copilot.coverage_status["rq_covered"]
        },
        "codes": unique_codes,
        "transcripts": copilot.transcript_history,
        "segment_analyses": {str(k): v for k, v in segment_analyses.items()}
    }, indent=2)
    return markdown_summary, csv_codes, json_export
with gr.Row():
    summary_btn = gr.Button("Generate All Formats", variant="primary", size="lg")
with gr.Row():
    with gr.Column():
        # Rendered Markdown preview of the report.
        summary_display = gr.Markdown(label="Summary Preview")
    with gr.Column():
        # Raw text export fields; interactive so users can select/copy them.
        with gr.Accordion("π₯ Export Options", open=True):
            csv_export = gr.Textbox(
                label="CSV Export (Codes)",
                lines=10,
                interactive=True
            )
            json_export = gr.Textbox(
                label="JSON Export (Complete Data)",
                lines=10,
                interactive=True
            )
# One click fills all three formats from the session state.
summary_btn.click(
    fn=generate_enhanced_summary,
    outputs=[summary_display, csv_export, json_export]
)
# --- Help tab: static reference page; the only dynamic value is temp_dir ---
with gr.Tab("βΉοΈ Help"):
    gr.Markdown(f"""
### System Information
**Temp Directory:** {copilot.temp_dir}
**Transcription Engine:** OpenAI Whisper
- Requires: OPENAI_API_KEY in .env file
- Max file size: 25 MB
- Supported formats: MP3, WAV, M4A, OGG, WEBM, MP4, MPEG, MPGA
**Analysis Engine:** Google Gemini Pro
- Requires: GEMINI_API_KEY in .env file
- Free tier: 60 requests per minute
- No file size limits (only processes text)
### Multi-View Analysis Features
**Current Segment View:** Shows analysis of the just-processed segment
**All Segments View:** Shows individual analyses for each segment
**Combined Analysis:** Analyzes all segments together to find patterns
**Comparison View:** Side-by-side comparison of all segments
### File Handling Tips
**To reduce file size:**
1. Use the built-in compression tool
2. Record at lower quality (16kHz, mono)
3. Split long recordings into segments
**Best practices:**
- Process 3-5 minute segments for optimal results
- Use clear file names for easy tracking
- Check file size before processing
### Troubleshooting
**If recording doesn't work:**
- Check browser permissions for microphone
- Try a different browser (Chrome/Edge work best)
- Use upload instead of recording
**If processing fails:**
- Check the console for detailed error messages
- Verify your API keys are correct
- Ensure the audio file format is supported
### Required API Keys
Add to your `.env` file:
```
OPENAI_API_KEY=sk-your-openai-key
GEMINI_API_KEY=your-gemini-key
```
""")
# Launch: startup diagnostics (disk space, ffmpeg, API keys) then serve the app.
if __name__ == "__main__":
    print("\n" + "=" * 50)
    print("π Starting Enhanced Research Interview Co-Pilot with Multi-View Analysis")
    print("=" * 50)
    # Check temp directory and available disk space for audio artifacts.
    print(f"π Temp directory: {copilot.temp_dir}")
    print(f" - Free space: {shutil.disk_usage(tempfile.gettempdir()).free / (1024 ** 3):.1f} GB")
    # Check dependencies: ffmpeg is only needed by the compression tool.
    if shutil.which('ffmpeg'):
        print("β FFmpeg found - compression available")
    else:
        print("β οΈ FFmpeg not found - compression unavailable")
    # Check API keys (presence only; validity is not verified here).
    if not os.getenv("OPENAI_API_KEY"):
        print("β No OpenAI API key found (required for transcription)")
    else:
        print("β OpenAI API key loaded (Whisper transcription)")
        # Test OpenAI client initialization.
        # NOTE(review): constructing the client only checks local setup; it
        # makes no network call, so an invalid key still "succeeds" here.
        # The instance is discarded — the module-level client is used instead.
        try:
            test_client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
            print("β OpenAI client initialized successfully")
        except Exception as e:
            print(f"β Error initializing OpenAI client: {e}")
    if not os.getenv("GEMINI_API_KEY"):
        print("β No Gemini API key found (required for analysis)")
    else:
        print("β Gemini API key loaded (analysis)")
    if not os.getenv("OPENAI_API_KEY") or not os.getenv("GEMINI_API_KEY"):
        print("\nβ οΈ Please add missing API keys to your .env file")
    else:
        print("\nβ All systems ready!")
    print("\nπ Launching application...")
    # queue() enables request queuing so long transcription jobs don't block.
    app.queue().launch()