import os
import gradio as gr
import json
from datetime import datetime
from typing import List, Dict, Tuple
from dotenv import load_dotenv
import shutil
import tempfile
import google.generativeai as genai
import traceback
import numpy as np
import scipy.io.wavfile as wavfile

# Load environment variables (expects OPENAI_API_KEY and GEMINI_API_KEY in .env)
load_dotenv()

# Import OpenAI for Whisper transcription
from openai import OpenAI

# Initialize OpenAI client (used only for Whisper transcription, not analysis)
openai_client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

# Configure Gemini for analysis
gemini_api_key = os.getenv("GEMINI_API_KEY")
if gemini_api_key:
    genai.configure(api_key=gemini_api_key)

    # Try to use the best available Gemini model.
    # Strategy: list what the API account can access, then walk a priority
    # list from strongest to weakest model.
    try:
        # List available models
        available_models = genai.list_models()
        print("๐Ÿ“‹ Available Gemini models:")
        gemini_models = []
        for model in available_models:
            if 'generateContent' in model.supported_generation_methods:
                print(f" - {model.name}")
                gemini_models.append(model.name)

        # Priority order: Try the best models first
        model_priority = [
            'models/gemini-1.5-pro-latest',  # Latest 1.5 Pro
            'models/gemini-1.5-pro',         # Stable 1.5 Pro
            'models/gemini-1.5-pro-002',     # Specific version
            'models/gemini-1.5-flash',       # Faster but still good
            'models/gemini-pro'              # Original Pro
        ]

        gemini_model = None
        for model_name in model_priority:
            if model_name in gemini_models:
                try:
                    gemini_model = genai.GenerativeModel(
                        # API expects the bare name, without the 'models/' prefix
                        model_name.replace('models/', ''),
                        generation_config={
                            'temperature': 0.7,  # Balance creativity and consistency
                            'top_p': 0.95,
                            'top_k': 40,
                            'max_output_tokens': 8192,  # Increased for detailed analysis
                        }
                    )
                    print(f"โœ… Using {model_name} - Best available model!")
                    break
                except Exception as e:
                    print(f" Could not initialize {model_name}: {e}")

        # Fallback if none of the preferred models work: take the first
        # model the account reported as supporting generateContent.
        if not gemini_model and gemini_models:
            model_name = gemini_models[0].replace('models/', '')
            gemini_model = genai.GenerativeModel(model_name)
            print(f"โœ… Using {model_name}")

        if not gemini_model:
            print("โŒ No suitable Gemini models found!")

    except Exception as e:
        print(f"โš ๏ธ Error listing Gemini models: {e}")
        # Try direct initialization with best model (skip the listing step)
        try:
            gemini_model = genai.GenerativeModel(
                'gemini-1.5-pro',
                generation_config={
                    'temperature': 0.7,
                    'top_p': 0.95,
                    'top_k': 40,
                    'max_output_tokens': 8192,
                }
            )
            print("โœ… Gemini 1.5 Pro initialized (direct)")
        # NOTE(review): bare except catches SystemExit/KeyboardInterrupt too;
        # prefer `except Exception:` here and below.
        except:
            try:
                gemini_model = genai.GenerativeModel('gemini-pro')
                print("โœ… Gemini Pro initialized (fallback)")
            except:
                print("โŒ Could not initialize any Gemini model!")
                gemini_model = None
else:
    print("โš ๏ธ No Gemini API key found!")
    gemini_model = None


class InterviewCoPilot:
    """Session state + processing pipeline for interview transcription and
    qualitative analysis: holds the research framework, per-segment
    transcripts/analyses, coverage tracking, and a session temp directory."""

    def __init__(self):
        self.transcript_history = []   # one formatted entry per processed segment
        self.research_questions = []   # parsed from Setup tab, one per line
        self.interview_protocol = []   # protocol questions, one per line
        self.detected_codes = []       # running list of all codes seen (may repeat)
        self.coverage_status = {
            "rq_covered": [],          # bool per research question
            "protocol_covered": []     # bool per protocol question
        }
        # Add file tracking
        self.processed_files = []
        self.current_file_info = {}
        self.current_audio_path = None  # Store the current audio path

        # Enhanced framework support - Initialize all attributes
        self.theoretical_framework = ""
        self.predefined_codes = {}  # {category: [codes]}
        self.analysis_focus = []
        self.is_continuation = False  # Initialize here
        self.segment_number = 1  # Initialize here

        # Session memory for Phase 1
        self.session_segments = []  # List of processed segments
        self.session_name = f"Interview_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
        self.framework_loaded = False

        # Create a persistent temp directory for this session; uploaded files
        # are copied here so Gradio's own temp files can be reclaimed safely.
        self.temp_dir = tempfile.mkdtemp(prefix="interview_copilot_")
        print(f"๐Ÿ“ Created temp directory: {self.temp_dir}")

        # Multi-view analysis support
        self.segment_analyses = {}  # Store individual segment analyses, keyed by segment number

    def __del__(self):
        """Cleanup temp directory on exit"""
        # hasattr guard: __del__ may run even if __init__ failed part-way.
        if hasattr(self, 'temp_dir') and os.path.exists(self.temp_dir):
            try:
                shutil.rmtree(self.temp_dir)
                print(f"๐Ÿงน Cleaned up temp directory: {self.temp_dir}")
            # NOTE(review): bare except silences all cleanup errors; consider
            # `except Exception:` at minimum.
            except:
                pass

    def setup_research_context(self, research_questions: str, interview_protocol: str,
                               theoretical_framework: str = "", predefined_codes: str =
                               "", analysis_focus: str = ""):
        """Setup the research context before starting interviews.

        All inputs are newline-separated free text from the UI; only
        research_questions is required. Returns a human-readable status string.
        """
        if not research_questions.strip():
            return "โŒ Please provide at least research questions"

        # Parse research questions
        self.research_questions = [q.strip() for q in research_questions.split('\n') if q.strip()]

        # Parse interview protocol
        self.interview_protocol = [q.strip() for q in interview_protocol.split('\n') if q.strip()]

        # Store theoretical framework
        self.theoretical_framework = theoretical_framework.strip()

        # Parse predefined codes (format: "Category: code1, code2, code3")
        self.predefined_codes = {}
        if predefined_codes.strip():
            for line in predefined_codes.split('\n'):
                if ':' in line:
                    # split only on the first ':' so codes may contain colons
                    category, codes = line.split(':', 1)
                    self.predefined_codes[category.strip()] = [
                        code.strip() for code in codes.split(',') if code.strip()
                    ]

        # Parse analysis focus areas
        self.analysis_focus = [f.strip() for f in analysis_focus.split('\n') if f.strip()]

        # Initialize coverage tracking (one flag per question, all False)
        self.coverage_status = {
            "rq_covered": [False] * len(self.research_questions),
            "protocol_covered": [False] * len(self.interview_protocol)
        }

        # Build status message
        status_parts = [
            f"โœ… Setup complete!",
            f"๐Ÿ“‹ Research Questions: {len(self.research_questions)}",
            f"๐Ÿ“ Protocol Questions: {len(self.interview_protocol)}"
        ]

        if self.theoretical_framework:
            status_parts.append(f"๐Ÿ“š Theoretical Framework: Yes")

        if self.predefined_codes:
            total_codes = sum(len(codes) for codes in self.predefined_codes.values())
            status_parts.append(f"๐Ÿท๏ธ Predefined Codes: {total_codes} codes in {len(self.predefined_codes)} categories")

        if self.analysis_focus:
            status_parts.append(f"๐ŸŽฏ Analysis Focus Areas: {len(self.analysis_focus)}")

        # Mark framework as loaded
        self.framework_loaded = True

        return "\n".join(status_parts)

    def add_segment_to_session(self, file_name, duration, transcript_length):
        """Add a processed segment to the current session and return its record."""
        segment_info = {
            "number": len(self.session_segments) + 1,
            "file_name": file_name,
            "duration": duration,
            "transcript_length": transcript_length,
            "timestamp": datetime.now().strftime("%H:%M:%S"),
            # NOTE(review): this is the cumulative count of all codes detected
            # so far, not the codes found in this segment alone.
            "codes_found": len(self.detected_codes)
        }
        self.session_segments.append(segment_info)
        return segment_info

    def get_session_summary(self):
        """Get a markdown summary of the current session."""
        if not self.session_segments:
            return "No segments processed yet"

        total_duration = sum(seg.get("duration", 0) for seg in self.session_segments)
        total_transcript = sum(seg.get("transcript_length", 0) for seg in self.session_segments)

        summary = f"""### ๐Ÿ“Š Current Session: {self.session_name}

**Segments Processed:** {len(self.session_segments)}
**Total Duration:** {total_duration:.1f} minutes
**Total Transcript:** {total_transcript:,} characters
**Unique Codes Found:** {len(set(self.detected_codes))}

**Processed Files:**
"""
        for seg in self.session_segments:
            summary += f"\nโœ“ Segment {seg['number']} - {seg['file_name']} ({seg['timestamp']})"

        return summary

    def reset_session(self, keep_framework=True):
        """Reset the session but optionally keep the framework."""
        self.session_segments = []
        self.transcript_history = []
        self.detected_codes = []
        self.processed_files = []
        self.segment_number = 1
        self.is_continuation = False
        self.segment_analyses = {}  # Reset segment analyses

        if not keep_framework:
            self.research_questions = []
            self.interview_protocol = []
            self.theoretical_framework = ""
            self.predefined_codes = {}
            self.analysis_focus = []
            self.framework_loaded = False
            self.coverage_status = {
                "rq_covered": [],
                "protocol_covered": []
            }
        else:
            # Reset only coverage status (keep the questions themselves)
            self.coverage_status = {
                "rq_covered": [False] * len(self.research_questions),
                "protocol_covered": [False] * len(self.interview_protocol)
            }

        self.session_name = f"Interview_{datetime.now().strftime('%Y%m%d_%H%M%S')}"

        return "โœ… Session reset. " + ("Framework kept." if keep_framework else "Everything cleared.")

    def save_uploaded_file(self, audio_path):
        """Save uploaded file to our temp directory to ensure it persists.

        Returns the saved path, or None on any failure.
        """
        if not audio_path or not os.path.exists(audio_path):
            return None

        try:
            # Copy file to our temp directory
            file_name = os.path.basename(audio_path)
            saved_path = os.path.join(self.temp_dir, file_name)

            # If file already exists, add timestamp to make unique
            if os.path.exists(saved_path):
                name, ext = os.path.splitext(file_name)
                timestamp = datetime.now().strftime("%H%M%S")
                file_name = f"{name}_{timestamp}{ext}"
                saved_path = os.path.join(self.temp_dir, file_name)

            shutil.copy2(audio_path, saved_path)
            print(f"๐Ÿ’พ Saved file to: {saved_path}")
            return saved_path

        except Exception as e:
            print(f"โŒ Error saving file: {str(e)}")
            return None

    def check_audio_file(self, audio_path):
        """Pre-check audio file before processing.

        Returns (path_if_ok, status_markdown, saved_path) — path_if_ok is None
        when the file is missing, unsaveable, or over the 25 MB Whisper limit.
        """
        if not audio_path:
            return None, "No file selected", None

        try:
            # Save the file to our temp directory
            saved_path = self.save_uploaded_file(audio_path)
            if not saved_path:
                return None, "โŒ Error saving uploaded file", None

            file_size = os.path.getsize(saved_path)
            file_size_mb = file_size / (1024 * 1024)
            file_name = os.path.basename(saved_path)

            # Store file info
            self.current_file_info = {
                "name": file_name,
                "size_mb": file_size_mb,
                "path": saved_path,
                "original_path": audio_path
            }

            # Debug info
            print(f"๐Ÿ“Š File check:")
            print(f" - Original path: {audio_path}")
            print(f" - Saved path: {saved_path}")
            print(f" - Size: {file_size_mb:.2f} MB")
            print(f" - Exists: {os.path.exists(saved_path)}")

            # Check file size (25 MB is OpenAI Whisper's upload limit)
            if file_size_mb > 25:
                status = f"""โš ๏ธ **File too large for direct processing**

- File: {file_name}
- Size: {file_size_mb:.1f} MB
- Maximum: 25 MB

**Options:**
1. Compress the file using the compression tool below
2. Split into smaller segments
3. Use a different recording with lower quality settings"""
                return None, status, saved_path

            # Good to go
            status = f"""โœ… **File ready for processing**

- File: {file_name}
- Size: {file_size_mb:.1f} MB
- Status: Within limits
- Saved to: {os.path.basename(self.temp_dir)}/"""

            return saved_path, status, saved_path

        except Exception as e:
            print(f"โŒ Error in check_audio_file: {traceback.format_exc()}")
            return None, f"โŒ Error checking file: {str(e)}", None

    def compress_audio(self, audio_path, quality="medium"):
        """Compress audio file with different quality settings via ffmpeg.

        Accepts either a file path or a (sample_rate, ndarray) tuple from the
        Gradio recorder. Returns (output_path, status_markdown); output_path
        is None on failure.
        """
        # Handle different input types
        actual_path = None

        # If it's a tuple (sample_rate, audio_data), save it first
        if isinstance(audio_path, tuple) and len(audio_path) == 2:
            sample_rate, audio_data = audio_path
            # Save to temporary file
            temp_path = os.path.join(self.temp_dir, f"temp_audio_{datetime.now().strftime('%H%M%S')}.wav")
            wavfile.write(temp_path, sample_rate, audio_data)
            actual_path = temp_path
        elif isinstance(audio_path, str):
            actual_path = audio_path
        else:
            return None, "No valid audio file to compress"

        if not actual_path or not os.path.exists(actual_path):
            return None, "No file to compress or file not found"

        try:
            import subprocess

            # Quality presets (bitrate / sample rate passed to ffmpeg)
            quality_settings = {
                "high": {"bitrate": "128k", "sample_rate": "44100"},
                "medium": {"bitrate": "64k", "sample_rate": "22050"},
                "low": {"bitrate": "32k", "sample_rate": "16000"}
            }

            settings = quality_settings.get(quality, quality_settings["medium"])

            # Create output filename in our temp directory
            input_name = os.path.basename(actual_path)
            name, ext = os.path.splitext(input_name)
            # NOTE(review): output keeps the input extension; for .wav inputs
            # the bitrate flag is effectively ignored — confirm intent.
            output_path = os.path.join(self.temp_dir, f"{name}_compressed{ext}")

            # Compress (list-form argv, shell=False — safe against injection)
            cmd = [
                'ffmpeg', '-i', actual_path,
                '-b:a', settings["bitrate"],
                '-ar', settings["sample_rate"],
                '-ac', '1',  # Mono
                '-y', output_path
            ]

            result = subprocess.run(cmd, capture_output=True, text=True)

            if result.returncode == 0:
                # Check new size
                new_size = os.path.getsize(output_path) / (1024 * 1024)
                old_size = os.path.getsize(actual_path) / (1024 * 1024)

                # Update file info so later processing picks up the compressed file
                self.current_file_info["path"] = output_path
                self.current_file_info["size_mb"] = new_size

                return output_path, f"""โœ… **Compression successful!**

- Original size: {old_size:.1f} MB
- Compressed size: {new_size:.1f} MB
- Reduction: {((old_size - new_size) / old_size * 100):.0f}%
- Quality setting: {quality}
- Saved to: {os.path.basename(output_path)}"""
            else:
                return None, f"โŒ Compression failed: {result.stderr}"

        except subprocess.SubprocessError as e:
            return None, f"โŒ FFmpeg error: {str(e)}\n\nMake sure ffmpeg is installed."
        except Exception as e:
            return None, f"โŒ Error: {str(e)}"

    def transcribe_audio(self, audio_path: str, progress_callback=None) -> str:
        """Transcribe audio using Whisper API with progress updates.

        Returns the transcript text, or a string starting with "Error:" —
        callers detect failure via that prefix rather than exceptions.
        """
        if not audio_path:
            return "Error: No audio file provided"

        if not os.path.exists(audio_path):
            return f"Error: Audio file not found at path: {audio_path}"

        if not openai_client.api_key:
            return "Error: OpenAI API key not found (needed for transcription)"

        try:
            file_size = os.path.getsize(audio_path)
            file_size_mb = file_size / (1024 * 1024)

            print(f"๐Ÿ“Š Transcribing file: {audio_path}")
            print(f"๐Ÿ“Š File size: {file_size_mb:.2f} MB ({file_size} bytes)")

            # Check if it's actually over 25MB (OpenAI's limit)
            if file_size_mb > 25:
                return f"Error: Audio file too large. File size: {file_size_mb:.1f} MB (limit: 25 MB)"

            # Update progress if callback provided
            if progress_callback:
                progress_callback(f"๐ŸŽต Transcribing {file_size_mb:.1f} MB file with OpenAI Whisper...")

            with open(audio_path, "rb") as audio_file:
                print("๐Ÿ“Š Sending to OpenAI Whisper API...")
                # New OpenAI v1.x syntax
                transcript = openai_client.audio.transcriptions.create(
                    model="whisper-1",
                    file=audio_file,
                    response_format="text"
                )

            # In the new API, the response is directly the text
            text = transcript if isinstance(transcript, str) else str(transcript)

            # Add file info to transcript
            file_name = self.current_file_info.get("name", "unknown")
            if file_name not in self.processed_files:
                self.processed_files.append(file_name)

            print(f"โœ… Transcription successful! Length: {len(text)} characters")
            return text

        except Exception as e:
            error_msg = str(e)
            print(f"โŒ OpenAI API error: {error_msg}")

            # Check for specific error types (matched on the error text,
            # since the SDK raises a single exception family)
            if "Invalid file format" in error_msg:
                return "Error: Invalid audio file format. Supported formats: mp3, mp4, mpeg, mpga, m4a, wav, webm"
            elif "too large" in error_msg.lower():
                return "Error: Audio file too large. Please use files under 25MB."
            elif "Incorrect API key" in error_msg or "Authentication" in error_msg:
                return "Error: Invalid OpenAI API key. Please check your .env file."
            elif "Rate limit" in error_msg:
                return "Error: OpenAI rate limit reached. Please wait a moment and try again."
            else:
                return f"Error: {error_msg}"

    def analyze_transcript_with_gemini(self, text: str) -> Dict:
        """Analyze transcript using Gemini with advanced prompt."""
        # Use the enhanced version by default
        return self.analyze_transcript_with_gemini_enhanced(text, segment_num=self.segment_number)

    def analyze_transcript_with_gemini_enhanced(self, text: str, segment_num: int = None) -> Dict:
        """Enhanced analysis that tracks individual segments and can combine them.

        segment_num=None means "combined analysis of all segments".
        Returns the parsed analysis dict, or {"error": ...} on failure.
        """
        if not text or len(text.strip()) < 10:
            return {"error": "Text too short to analyze"}

        if not self.research_questions:
            return {"error": "Please set up research questions first"}

        if not gemini_model:
            return {"error": "Gemini API not configured"}

        # Determine if this is a specific segment or combined analysis
        is_combined = segment_num is None
        # NOTE(review): truthiness check means segment_num=0 would fall back
        # to self.segment_number; segments start at 1 so this is benign today.
        current_segment = segment_num if segment_num else self.segment_number

        # Build context section of the prompt
        context_parts = []
        if is_combined:
            context_parts.append("This is a COMBINED ANALYSIS of all segments.")
            context_parts.append(f"Total segments: {len(self.session_segments)}")
        else:
            context_parts.append(f"This is Segment {current_segment} of the interview.")
            if current_segment > 1:
                context_parts.append("Previous segments have covered:")
                covered_rqs = [f"RQ{i + 1}" for i, covered in enumerate(self.coverage_status["rq_covered"]) if covered]
                if covered_rqs:
                    context_parts.append(f"- Research Questions: {', '.join(covered_rqs)}")

        context_section = "\n".join(context_parts)

        # Build framework section (only parts the researcher actually provided)
        framework_section = ""
        if self.theoretical_framework:
            framework_section += f"\nTHEORETICAL FRAMEWORK:\n{self.theoretical_framework}\n"

        if self.predefined_codes:
            framework_section += "\nPREDEFINED CODES:\n"
            for category, codes in self.predefined_codes.items():
                framework_section += f"- {category}: {', '.join(codes)}\n"

        if self.analysis_focus:
            framework_section += "\nANALYSIS FOCUS:\n"
            framework_section += "\n".join([f"- {focus}" for focus in self.analysis_focus])

        # Modified prompt for combined vs individual analysis
        analysis_type = "COMBINED TRANSCRIPT" if is_combined else f"SEGMENT {current_segment}"

        # The prompt demands a strict JSON reply; doubled braces {{ }} are
        # literal braces in this f-string, single braces are interpolations.
        prompt = f"""You are a Qualitative Research Analysis Assistant.

{context_section}

{analysis_type}:
"{text}"

RESEARCH FRAMEWORK:
- Research Questions:
{chr(10).join([f" RQ{i + 1}: {q}" for i, q in enumerate(self.research_questions)])}
- Interview Protocol:
{chr(10).join([f" Q{i + 1}: {q}" for i, q in enumerate(self.interview_protocol)])}
{framework_section}

ANALYSIS TASKS:
1. Apply predefined codes where relevant
2. Identify emergent codes not in the framework
3. Track research question coverage
4. Note theoretical alignments or challenges
5. Consider the analysis focus areas
{"6. Identify patterns across segments" if is_combined else ""}
{"7. Note evolution of themes" if is_combined else ""}

PROVIDE YOUR ANALYSIS IN THIS EXACT JSON FORMAT:
{{
    "segment_number": {current_segment if not is_combined else '"combined"'},
    "analysis_type": "{"combined" if is_combined else "individual"}",
    "alerts": [
        {{"type": "supports", "code": "Code Name", "text": "โœ… Supports [Theory/Concept]: ..."}},
        {{"type": "challenges", "text": "โš ๏ธ Challenges [Framework]: ..."}},
        {{"type": "missing", "text": "๐Ÿ” Missing [Dimension]: ..."}},
        {{"type": "emergent", "code": "New Code", "text": "โœณ๏ธ Emergent theme: ..."}},
        {{"type": "noteworthy", "text": "๐Ÿ“Œ Noteworthy: ..."}}
    ],
    "rq_addressed": [1, 2],
    "codes_applied": ["Code 1", "Code 2"],
    "emergent_codes": ["New Theme 1"],
    "coverage": {{
        "protocol_covered": [1, 3, 5],
        "completion_percent": 40,
        "missing_topics": ["Topic A", "Topic B"]
    }},
    "follow_ups": [
        "๐Ÿงญ To explore [concept], ask: 'Question?'",
        "๐Ÿงญ RQ3 needs data on [topic]"
    ],
    "insights": [
        "Key pattern or finding",
        "Theoretical implication"
    ],
    "segment_summary": "Brief summary of {"all segments combined" if is_combined else "this segment's contribution"}"{', "cross_segment_patterns": ["Pattern 1", "Pattern 2"],' if is_combined else ""}{'"theme_evolution": "Description of how themes evolved across segments"' if is_combined else ""}
}}

Return ONLY the JSON."""

        try:
            print(f"๐Ÿค– Analyzing {analysis_type} with Gemini...")
            response = gemini_model.generate_content(prompt)
            content = response.text.strip()

            # Parse JSON response — models often wrap JSON in prose/markdown,
            # so extract the outermost {...} span before parsing.
            try:
                start = content.find('{')
                end = content.rfind('}') + 1
                if start >= 0 and end > start:
                    json_str = content[start:end]
                    analysis = json.loads(json_str)
                else:
                    analysis = json.loads(content)
            except json.JSONDecodeError:
                print(f"JSON parsing error. Raw response: {content[:200]}...")
                # Return a default structure so downstream formatting never crashes
                analysis = {
                    "segment_number": current_segment if not is_combined else "combined",
                    "analysis_type": "combined" if is_combined else "individual",
                    "alerts": [],
                    "rq_addressed": [],
                    "codes_applied": [],
                    "emergent_codes": [],
                    "coverage": {
                        "protocol_covered": [],
                        "completion_percent": 0,
                        "missing_topics": []
                    },
                    "follow_ups": ["Please try again"],
                    "insights": ["Unable to parse response"],
                    "segment_summary": "Analysis failed"
                }

            # Store individual segment analysis
            if not is_combined:
                self.segment_analyses[current_segment] = analysis

            # Update coverage tracking (validate indices — model output is untrusted)
            for rq_num in analysis.get("rq_addressed", []):
                if isinstance(rq_num, int) and 0 < rq_num <= len(self.research_questions):
                    self.coverage_status["rq_covered"][rq_num - 1] = True

            for pq_num in analysis.get("coverage", {}).get("protocol_covered", []):
                if isinstance(pq_num, int) and 0 < pq_num <= len(self.interview_protocol):
                    self.coverage_status["protocol_covered"][pq_num - 1] = True

            # Add codes to master list
            self.detected_codes.extend(analysis.get("codes_applied", []))
            self.detected_codes.extend(analysis.get("emergent_codes", []))

            return analysis

        except Exception as e:
            print(f"โŒ Gemini error: {type(e).__name__}: {str(e)}")
            return {"error": f"Analysis error: {str(e)}"}

    def format_analysis_output(self, analysis: Dict, show_segment_info: bool = True) -> str:
        """Format an analysis dict as markdown, with segment information."""
        if "error" in analysis:
            return f"โŒ {analysis['error']}"
        # Determine analysis type
        is_combined = analysis.get("analysis_type") == "combined"
        segment_num = analysis.get("segment_number", "Unknown")

        # Format alerts section
        alerts_text = ""
        if "alerts" in analysis:
            alerts_text = "### ๐Ÿ“ข Analysis Alerts:\n"
            for alert in analysis.get("alerts", []):
                alerts_text += f"{alert.get('text', '')}\n"

        # Format codes section
        codes_section = ""
        applied_codes = analysis.get("codes_applied", [])
        emergent_codes = analysis.get("emergent_codes", [])

        if applied_codes:
            codes_section += f"**Applied Codes:** {', '.join(applied_codes)}\n"
        if emergent_codes:
            codes_section += f"**โœณ๏ธ Emergent Codes:** {', '.join(emergent_codes)}\n"

        # Build header based on type
        if is_combined:
            header = "### ๐Ÿ“Š Combined Analysis Results (All Segments)"
            segment_info = f"**Total Segments Analyzed:** {len(self.session_segments)}\n"
        else:
            header = f"### ๐Ÿ“Š Analysis Results - Segment {segment_num}"
            segment_info = f"**๐Ÿ“ Segment {segment_num} Summary:** {analysis.get('segment_summary', 'Analysis of this segment')}\n"

        # Get file name for current segment
        file_info = ""
        if not is_combined and segment_num != "Unknown" and isinstance(segment_num, int):
            if segment_num <= len(self.session_segments):
                file_info = f"**File:** {self.session_segments[segment_num - 1].get('file_name', 'unknown')}\n"

        # Build main analysis text
        analysis_text = f"""{header}

{segment_info if show_segment_info else ""}{file_info}**Research Questions Addressed:** {', '.join([f"RQ{n}" for n in analysis.get('rq_addressed', [])])}

{alerts_text}
**Codes/Themes:**
{codes_section}
**Protocol Coverage:** {', '.join([f"Q{n}" for n in analysis.get('coverage', {}).get('protocol_covered', [])])}
**Completion:** {analysis.get('coverage', {}).get('completion_percent', 0)}% of protocol addressed

**Key Insights:**
{chr(10).join(['โ€ข ' + insight for insight in analysis.get('insights', [])])}"""

        # Add combined-specific sections
        if is_combined:
            if "cross_segment_patterns" in analysis:
                analysis_text += "\n\n**Cross-Segment Patterns:**\n"
                analysis_text += chr(10).join(
                    ['โ€ข ' + pattern for pattern in analysis.get('cross_segment_patterns', [])])

            if "theme_evolution" in analysis:
                analysis_text += f"\n\n**Theme Evolution:**\n{analysis.get('theme_evolution', '')}"

        missing_topics = analysis.get('coverage', {}).get('missing_topics', [])
        if missing_topics:
            analysis_text += f"\n\n**Missing Topics:**\n{chr(10).join(['โ€ข ' + topic for topic in missing_topics])}"

        return analysis_text

    def generate_multi_view_analysis(self):
        """Generate both individual segment analyses and combined analysis.

        Returns (individual_md, combined_md, comparison_md). The combined view
        re-runs Gemini over all transcripts joined together.
        """
        if not hasattr(self, 'segment_analyses') or not self.segment_analyses:
            return "No segments analyzed yet", "", ""

        # Format individual segment analyses
        individual_analyses = "## ๐Ÿ“‘ Individual Segment Analyses\n\n"

        for seg_num in sorted(self.segment_analyses.keys()):
            analysis = self.segment_analyses[seg_num]
            formatted = self.format_analysis_output(analysis, show_segment_info=True)
            individual_analyses += f"{formatted}\n\n{'=' * 50}\n\n"

        # Generate combined analysis if multiple segments
        combined_analysis = ""
        if len(self.segment_analyses) > 1:
            # Combine all transcripts
            all_transcripts = "\n\n".join(self.transcript_history)

            # Run combined analysis (segment_num=None signals "combined")
            combined_result = self.analyze_transcript_with_gemini_enhanced(all_transcripts, segment_num=None)

            combined_analysis = "## ๐Ÿ”— Combined Analysis (All Segments Together)\n\n"
            combined_analysis += self.format_analysis_output(combined_result, show_segment_info=True)
        else:
            combined_analysis = "Combined analysis requires at least 2 segments"

        # Generate comparison view
        comparison_view = self.generate_comparison_view()

        return individual_analyses, combined_analysis, comparison_view

    def generate_comparison_view(self):
        """Generate a markdown comparison table + theme-frequency view of segments."""
        if not hasattr(self, 'segment_analyses') or not self.segment_analyses:
            return "No segments to compare"

        comparison = "## ๐Ÿ“Š Segment Comparison\n\n"

        # Create comparison table
        comparison += "| Segment | RQs Addressed | Codes Applied | Emergent Codes | Completion % |\n"
        comparison += "|---------|---------------|---------------|----------------|-------------|\n"

        for seg_num in sorted(self.segment_analyses.keys()):
            analysis = self.segment_analyses[seg_num]
            rqs = ', '.join([f"RQ{n}" for n in analysis.get('rq_addressed', [])])
            applied = len(analysis.get('codes_applied', []))
            emergent = len(analysis.get('emergent_codes', []))
            completion = analysis.get('coverage', {}).get('completion_percent', 0)

            comparison += f"| {seg_num} | {rqs} | {applied} | {emergent} | {completion}% |\n"

        # Add theme tracking
        comparison += "\n### ๐Ÿ“ˆ Theme Frequency Across Segments\n\n"

        # Track code frequency by segment: {code: {segment: count}}
        code_by_segment = {}
        for seg_num, analysis in self.segment_analyses.items():
            all_codes = analysis.get('codes_applied', []) + analysis.get('emergent_codes', [])
            for code in all_codes:
                if code not in code_by_segment:
                    code_by_segment[code] = {}
                code_by_segment[code][seg_num] = code_by_segment[code].get(seg_num, 0) + 1

        # Display theme tracking
        for code, segments in sorted(code_by_segment.items()):
            seg_info = ', '.join([f"Seg{s}: {count}x" for s, count in sorted(segments.items())])
            comparison += f"- **{code}**: {seg_info}\n"

        return comparison

    def process_interview_segment(self, audio_path, progress_callback=None):
        """Process an audio segment and return transcript and analysis.

        Returns a 5-tuple for the Gradio outputs:
        (full_transcript, analysis_md, follow_ups_md, coverage_md, progress_md).
        """
        print(f"\n๐ŸŽฏ Starting process_interview_segment")
        print(f" Audio path provided: {audio_path}")
        print(f" Type of audio_path: {type(audio_path)}")

        # Handle different types of audio input
        actual_audio_path = None

        # Case 1: audio_path is a tuple (sample_rate, audio_data) from recording
        if isinstance(audio_path, tuple) and len(audio_path) == 2:
            print(" Detected audio data tuple (recording)")
            sample_rate, audio_data = audio_path

            # Save the audio data to a temporary file
            temp_path = os.path.join(self.temp_dir, f"recorded_{datetime.now().strftime('%H%M%S')}.wav")
            wavfile.write(temp_path, sample_rate,
                          audio_data)
            actual_audio_path = temp_path
            print(f" Saved recording to: {temp_path}")

        # Case 2: audio_path is a string (file path)
        elif isinstance(audio_path, str):
            actual_audio_path = audio_path

        # Case 3: audio_path is None, check if we have a saved file
        elif audio_path is None and self.current_file_info:
            actual_audio_path = self.current_file_info.get("path")
            print(f" Using saved path: {actual_audio_path}")

        # Validate we have a valid path
        if not actual_audio_path or not os.path.exists(actual_audio_path):
            return "", "โŒ No audio file found. Please upload a file or record audio first.", "", "", "No file to process"

        # Get file info
        if isinstance(audio_path, tuple):
            # NOTE(review): this regenerates the timestamped name, so it can
            # differ from the file saved above if the clock second ticks over.
            file_name = f"recorded_{datetime.now().strftime('%H%M%S')}.wav"
            file_size = os.path.getsize(actual_audio_path) / (1024 * 1024)
            # Update current file info for recording
            self.current_file_info = {
                "name": file_name,
                "size_mb": file_size,
                "path": actual_audio_path
            }
        else:
            file_name = self.current_file_info.get("name", os.path.basename(actual_audio_path))
            file_size = self.current_file_info.get("size_mb", os.path.getsize(actual_audio_path) / (1024 * 1024))

        # Progress update
        progress = f"""๐Ÿ”„ Processing: {file_name} ({file_size:.1f} MB)

๐Ÿ“Š Current Step: Transcribing audio with Whisper...
โฑ๏ธ Estimated time: {int(file_size * 0.5)}-{int(file_size * 1)} minutes for transcription

๐Ÿ’ก Tip: Larger files take longer. A 10MB file typically takes 5-10 minutes."""

        # Update progress callback if provided
        if progress_callback:
            progress_callback(progress)

        # Transcribe with Whisper
        print(f"๐ŸŽต Starting transcription of {file_size:.1f} MB file...")
        start_time = datetime.now()

        transcript = self.transcribe_audio(actual_audio_path, progress_callback)

        transcription_time = (datetime.now() - start_time).total_seconds()
        print(f"โœ… Transcription completed in {transcription_time:.1f} seconds")

        # transcribe_audio signals failure via an "Error:" prefix, not exceptions
        if transcript.startswith("Error:"):
            return transcript, "โŒ Transcription failed", "", "", progress + "\n\nโŒ Transcription failed"

        # Add to history with file info
        timestamp = datetime.now().strftime("%H:%M:%S")

        # Safely check for continuation attributes
        is_continuation = getattr(self, 'is_continuation', False)
        segment_number = getattr(self, 'segment_number', 1)

        segment_label = f"Segment {segment_number}" if is_continuation else "Segment 1"
        self.transcript_history.append(f"[{timestamp}] [{file_name}] [{segment_label}] {transcript}")

        # Check if research context is set up
        if not self.research_questions:
            full_transcript = "\n\n".join(self.transcript_history)
            return full_transcript, "โš ๏ธ Please set up research questions first", "", "", progress

        # Update progress for analysis phase
        progress = f"""โœ… Transcription complete! ({transcription_time:.1f} seconds)

๐Ÿ“Š Current Step: Analyzing with Gemini 1.5 Pro...
๐Ÿ” Analyzing {segment_label}
โฑ๏ธ This usually takes 10-30 seconds..."""

        if progress_callback:
            progress_callback(progress)

        # Analyze with Gemini
        print(f"๐Ÿค– Starting Gemini analysis...")
        analysis_start = datetime.now()

        analysis = self.analyze_transcript_with_gemini(transcript)

        analysis_time = (datetime.now() - analysis_start).total_seconds()
        print(f"โœ… Analysis completed in {analysis_time:.1f} seconds")

        # Format outputs
        full_transcript = "\n\n".join(self.transcript_history)

        if "error" not in analysis:
            # Format analysis output
            analysis_text = self.format_analysis_output(analysis)

            follow_ups = "### ๐Ÿ’ก Suggested Follow-ups:\n" + '\n'.join(analysis.get('follow_ups', []))

            # Percentages guard against empty question lists (division by zero)
            rq_coverage = sum(self.coverage_status["rq_covered"]) / len(
                self.research_questions) * 100 if self.research_questions else 0
            protocol_coverage = sum(self.coverage_status["protocol_covered"]) / len(
                self.interview_protocol) * 100 if self.interview_protocol else 0

            # Track unique codes
            all_codes = list(set(self.detected_codes))
            applied_unique = list(set(analysis.get("codes_applied", [])))
            emergent_unique = list(set(analysis.get("emergent_codes", [])))

            coverage = f"""### ๐Ÿ“ˆ Overall Progress:
- **Research Questions:** {rq_coverage:.0f}% ({sum(self.coverage_status["rq_covered"])}/{len(self.research_questions)})
- **Protocol Questions:** {protocol_coverage:.0f}% ({sum(self.coverage_status["protocol_covered"])}/{len(self.interview_protocol)})
- **Total Unique Codes:** {len(all_codes)}
  - Framework Codes: {len(applied_unique)}
  - Emergent Codes: {len(emergent_unique)}
- **Segments Processed:** {len(self.processed_files)}"""

            progress = f"โœ… Completed: {file_name} ({segment_label})"
        else:
            analysis_text = f"โŒ {analysis['error']}"
            follow_ups = "Unable to generate follow-ups"
            coverage = "Unable to calculate coverage"
            progress = f"โŒ Failed: {file_name}"

        return full_transcript, analysis_text, follow_ups, coverage, progress


# Initialize the single shared app-wide co-pilot instance
copilot = InterviewCoPilot()

# Create improved interface
with \
gr.Blocks(title="Research Interview Co-Pilot", theme=gr.themes.Soft(), css="""
    .file-info { background-color: #f0f0f0; padding: 10px; border-radius: 5px; margin: 10px 0; }
    .success { color: #28a745; }
    .warning { color: #ffc107; }
    .error { color: #dc3545; }
    h1 { text-align: center; }
    .contain { max-width: 1200px; margin: auto; }
""") as app:
    gr.Markdown("""
    # ๐ŸŽ™๏ธ Research Interview Co-Pilot - Enhanced with Multi-View Analysis
    **Transcription:** OpenAI Whisper | **Analysis:** Google Gemini Pro
    Now with individual segment analysis, combined analysis, and segment comparison!
    """)

    with gr.Tab("๐Ÿ“‹ Setup"):
        gr.Markdown("### Set up your research context")

        with gr.Row():
            with gr.Column():
                rq_input = gr.Textbox(
                    label="Research Questions (one per line) *",
                    placeholder="What pedagogical strategies are evident in AI educators?\nHow do AI tools emphasize practical applications?\nWhat are the differences between various AI approaches?",
                    lines=6
                )
                protocol_input = gr.Textbox(
                    label="Interview Protocol Questions (one per line)",
                    placeholder="Tell me about your experience with AI\nHow do you use AI tools?\nWhat challenges have you faced?",
                    lines=6
                )
            with gr.Column():
                framework_input = gr.Textbox(
                    label="Theoretical Framework (optional)",
                    placeholder="e.g., Technology Acceptance Model (TAM)\nGrounded Theory approach\nActivity Theory lens",
                    lines=3
                )
                codes_input = gr.Textbox(
                    label="Predefined Codes (optional - format: 'Category: code1, code2')",
                    placeholder="Pedagogical: Scaffolding, Direct Instruction, Guided Practice\nPractical: Application, Implementation, Real-world Use\nEthical: Privacy Concerns, Bias Awareness, Transparency",
                    lines=6
                )
                focus_input = gr.Textbox(
                    label="Analysis Focus Areas (optional - one per line)",
                    placeholder="Look for emotional responses\nPay attention to metaphors used\nNote any resistance or enthusiasm",
                    lines=3
                )

        # Segment continuation option
        with gr.Row():
            continue_interview = gr.Checkbox(
                label="This is a continuation of a previous interview segment",
                value=False
            )
            segment_info = gr.Textbox(
                label="Segment Info",
                value="Segment 1",
                interactive=False
            )

        setup_btn = gr.Button("Setup Research Context", variant="primary", size="lg")
        setup_output = gr.Textbox(label="Setup Status", interactive=False, lines=6)

        # Save/Load framework buttons
        with gr.Row():
            save_framework_btn = gr.Button("๐Ÿ’พ Save Framework", size="sm")
            load_framework_btn = gr.Button("๐Ÿ“‚ Load Framework", size="sm")
            framework_file = gr.File(label="Framework File", visible=False, file_types=[".json"])

        def update_segment_info(is_continuation):
            """Sync the segment counter with the continuation checkbox.

            NOTE(review): this fires on every checkbox change, so toggling the
            box on/off/on increments segment_number each time it is checked —
            the counter can drift ahead of the real segment count.
            """
            if is_continuation:
                copilot.is_continuation = True
                copilot.segment_number += 1
                return f"Segment {copilot.segment_number} (Continuing from previous)"
            else:
                copilot.is_continuation = False
                copilot.segment_number = 1
                return "Segment 1"

        def save_framework(rq, protocol, framework, codes, focus):
            """Save current framework to JSON file (in the session temp dir)."""
            framework_data = {
                "research_questions": rq,
                "interview_protocol": protocol,
                "theoretical_framework": framework,
                "predefined_codes": codes,
                "analysis_focus": focus,
                "saved_date": datetime.now().isoformat()
            }
            filename = f"framework_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
            filepath = os.path.join(copilot.temp_dir, filename)
            with open(filepath, 'w') as f:
                json.dump(framework_data, f, indent=2)
            return gr.update(visible=True, value=filepath)

        def load_framework(file):
            """Load framework from JSON file; returns the five textbox values
            plus a status message."""
            if not file:
                return "", "", "", "", "", "No file selected"
            try:
                with open(file.name, 'r') as f:
                    data = json.load(f)
                return (
                    data.get("research_questions", ""),
                    data.get("interview_protocol", ""),
                    data.get("theoretical_framework", ""),
                    data.get("predefined_codes", ""),
                    data.get("analysis_focus", ""),
                    f"โœ… Loaded framework from {os.path.basename(file.name)}"
                )
            except Exception as e:
                return "", "", "", "", "", f"โŒ Error loading file: {str(e)}"

        continue_interview.change(
            update_segment_info,
            inputs=[continue_interview],
            outputs=[segment_info]
        )

        setup_btn.click(
            fn=copilot.setup_research_context,
            inputs=[rq_input, protocol_input, framework_input, codes_input, focus_input],
            outputs=setup_output
        )

        save_framework_btn.click(
            save_framework,
            inputs=[rq_input, protocol_input, framework_input, codes_input, focus_input],
            outputs=[framework_file]
        )

        framework_file.change(
            lambda x: gr.update(visible=False),
            inputs=[framework_file],
            outputs=[framework_file]
        )

        # Show the file picker, then load whatever file it holds
        load_framework_btn.click(
            lambda: gr.update(visible=True),
            outputs=[framework_file]
        ).then(
            load_framework,
            inputs=[framework_file],
            outputs=[rq_input, protocol_input, framework_input, codes_input, focus_input, setup_output]
        )

    with gr.Tab("๐ŸŽค Interview Processing"):
        gr.Markdown("### Process interview audio with multi-view analysis")

        # Session info at the top
        with gr.Row():
            session_info = gr.Markdown(copilot.get_session_summary())

        with gr.Row():
            # Session control buttons
            new_file_btn = gr.Button("๐Ÿ“ New File, Keep Setup", variant="secondary")
            reset_session_btn = gr.Button("๐Ÿ”„ Reset Session", variant="secondary")
            reset_all_btn = gr.Button("๐Ÿ—‘๏ธ Reset Everything", variant="stop")

        with gr.Row():
            with gr.Column(scale=1):
                # File upload with preview
                audio_input = gr.Audio(
                    sources=["upload", "microphone"],
                    type="filepath",
                    label="๐Ÿ“ Upload Audio File or ๐ŸŽค Record",
                    interactive=True
                )
                file_status = gr.Markdown("*Upload a file to see its status*")

                # Compression tool
                with gr.Accordion("๐Ÿ”ง Audio Compression Tool", open=False):
                    gr.Markdown("Compress large audio files")
                    quality_select = gr.Radio(
                        choices=["high", "medium", "low"],
                        value="medium",
                        label="Compression Quality"
                    )
                    compress_btn = gr.Button("Compress Audio", variant="secondary")
                    compress_output = gr.Markdown()
                    compressed_audio = gr.Audio(
                        label="Compressed Audio",
                        visible=False
                    )

                process_btn = gr.Button("๐Ÿ” Process & Analyze", variant="primary", size="lg")

                # Add visual processing indicator
                processing_status = gr.Markdown(
                    value="",
                    visible=True
                )

                # Add progress bar
                # NOTE(review): gr.Progress is normally passed as a handler
                # argument, not instantiated as a layout component — confirm
                # this renders as intended.
                with gr.Row():
                    progress_bar = gr.Progress()
progress_status = gr.Textbox( label="Progress", interactive=False, lines=4, value="Ready to process audio..." ) # Add multi-view analysis button AFTER progress status generate_multiview_btn = gr.Button( "๐Ÿ“Š Generate Multi-View Analysis", variant="secondary", size="lg", visible=True # Always visible for now ) with gr.Column(scale=2): # Results area with enhanced tabs with gr.Tabs(): with gr.Tab("๐Ÿ“ Transcript"): transcript_output = gr.Textbox( label="Full Transcript", lines=15, max_lines=25, interactive=False ) with gr.Tab("๐Ÿ” Current Segment"): current_analysis_output = gr.Markdown( value="*Process a segment to see analysis*" ) with gr.Tab("๐Ÿ“‘ All Segments"): all_segments_output = gr.Markdown( value="*Individual analyses will appear here*" ) with gr.Tab("๐Ÿ”— Combined Analysis"): combined_analysis_output = gr.Markdown( value="*Combined analysis will appear here after 2+ segments*" ) with gr.Tab("๐Ÿ“Š Comparison"): comparison_output = gr.Markdown( value="*Segment comparison will appear here*" ) with gr.Tab("๐Ÿ’ก Follow-ups"): followup_output = gr.Markdown() with gr.Tab("๐Ÿ“ˆ Coverage"): coverage_output = gr.Markdown() # Hidden state to store file path audio_state = gr.State() # Session management functions def new_file_keep_setup(): """Clear audio input but keep framework""" copilot.is_continuation = True copilot.segment_number = len(copilot.session_segments) + 1 return ( None, # Clear audio input "*Upload a new file to continue the interview*", f"Ready for Segment {copilot.segment_number}", copilot.get_session_summary() ) def reset_session(): """Reset session but keep framework""" result = copilot.reset_session(keep_framework=True) return ( None, # Clear audio "*Session reset. Framework kept.*", "Ready to process audio...", copilot.get_session_summary(), "" # Clear transcript ) def reset_everything(): """Reset everything including framework""" result = copilot.reset_session(keep_framework=False) return ( None, # Clear audio "*Everything reset. 
Please set up framework again.*", "Ready to process audio...", copilot.get_session_summary(), "", # Clear transcript "โŒ Framework cleared. Please go to Setup tab." ) # File status update - store the path in state audio_input.change( fn=copilot.check_audio_file, inputs=[audio_input], outputs=[audio_input, file_status, audio_state] ) # Compression - update state with compressed file compress_btn.click( fn=copilot.compress_audio, inputs=[audio_state, quality_select], outputs=[compressed_audio, compress_output] ).then( fn=lambda x, msg: (gr.update(visible=True), x) if x else (gr.update(visible=False), None), inputs=[compressed_audio, compress_output], outputs=[compressed_audio, audio_state] ) # Modified process function to handle multi-view def process_and_update_session_multiview(audio_path, progress=gr.Progress()): """Process audio and update session info with multi-view support""" # Create a progress callback function def update_progress(message): progress(0.5, desc=message) return message # Initialize progress progress(0, desc="Starting audio processing...") # First, process the current segment with progress callback results = copilot.process_interview_segment(audio_path, progress_callback=update_progress) # Update progress to complete progress(1.0, desc="Processing complete!") # Add to session if successful if results[4].startswith("โœ…"): file_name = copilot.current_file_info.get("name", "unknown") duration = copilot.current_file_info.get("size_mb", 0) * 0.5 # Rough estimate transcript_length = len(results[0]) copilot.add_segment_to_session(file_name, duration, transcript_length) # Get current segment analysis current_segment_analysis = results[1] # Check if we should show multi-view button (only after 2+ segments for meaningful comparison) show_multiview = len(copilot.session_segments) >= 2 # Return results plus updated session info return ( results[0], # transcript current_segment_analysis, # current segment analysis results[2], # follow-ups results[3], # 
coverage results[4], # progress copilot.get_session_summary(), # session info gr.update(visible=show_multiview) # multi-view button visibility ) # Multi-view generation function def generate_all_views(): """Generate all analysis views""" individual, combined, comparison = copilot.generate_multi_view_analysis() return individual, combined, comparison # Connect the process button with loading state process_btn.click( fn=lambda: gr.update( value="๐Ÿ”„ **Processing in progress...** Please wait, this may take several minutes for large files."), outputs=[processing_status] ).then( fn=process_and_update_session_multiview, inputs=[audio_state], outputs=[ transcript_output, current_analysis_output, followup_output, coverage_output, progress_status, session_info, generate_multiview_btn ] ).then( fn=lambda: gr.update(value=""), outputs=[processing_status] ) # Connect the multi-view button generate_multiview_btn.click( fn=generate_all_views, outputs=[ all_segments_output, combined_analysis_output, comparison_output ] ) # Session control buttons new_file_btn.click( fn=new_file_keep_setup, outputs=[audio_input, file_status, progress_status, session_info] ) reset_session_btn.click( fn=reset_session, outputs=[audio_input, file_status, progress_status, session_info, transcript_output] ) reset_all_btn.click( fn=reset_everything, outputs=[audio_input, file_status, progress_status, session_info, transcript_output, current_analysis_output] ) with gr.Tab("๐Ÿ“Š Summary & Export"): gr.Markdown("### Generate comprehensive summary with multi-view analysis") def generate_enhanced_summary(): if not copilot.transcript_history: return "No interview data yet.", "", "" unique_codes = list(set(copilot.detected_codes)) # Generate different formats markdown_summary = f"""# Interview Summary Report **Generated:** {datetime.now().strftime("%Y-%m-%d %H:%M")} **Analysis Engine:** Google Gemini Pro **Files Processed:** {', '.join(copilot.processed_files)} **Total Segments:** 
{len(copilot.session_segments)} ## Research Question Coverage {chr(10).join([f"- {'โœ…' if covered else 'โŒ'} {q}" for q, covered in zip(copilot.research_questions, copilot.coverage_status["rq_covered"])])} ## Detected Codes/Themes ({len(unique_codes)} unique) {chr(10).join(['- ' + code for code in unique_codes])} ## Segment-by-Segment Analysis {"Included in multi-view analysis - see Interview Processing tab" if copilot.segment_analyses else "No individual analyses yet"} ## Full Transcript {chr(10).join(copilot.transcript_history)}""" # CSV format for codes csv_codes = "Code,Frequency\n" code_freq = {} for code in copilot.detected_codes: code_freq[code] = code_freq.get(code, 0) + 1 for code, freq in sorted(code_freq.items(), key=lambda x: x[1], reverse=True): csv_codes += f'"{code}",{freq}\n' # JSON format with segment analyses json_export = json.dumps({ "metadata": { "date": datetime.now().isoformat(), "files": copilot.processed_files, "total_segments": len(copilot.transcript_history), "analysis_engine": "Gemini Pro" }, "research_questions": { "questions": copilot.research_questions, "coverage": copilot.coverage_status["rq_covered"] }, "codes": unique_codes, "transcripts": copilot.transcript_history, "segment_analyses": {str(k): v for k, v in copilot.segment_analyses.items()} if hasattr(copilot, 'segment_analyses') else {} }, indent=2) return markdown_summary, csv_codes, json_export with gr.Row(): summary_btn = gr.Button("Generate All Formats", variant="primary", size="lg") with gr.Row(): with gr.Column(): summary_display = gr.Markdown(label="Summary Preview") with gr.Column(): with gr.Accordion("๐Ÿ“ฅ Export Options", open=True): csv_export = gr.Textbox( label="CSV Export (Codes)", lines=10, interactive=True ) json_export = gr.Textbox( label="JSON Export (Complete Data)", lines=10, interactive=True ) summary_btn.click( fn=generate_enhanced_summary, outputs=[summary_display, csv_export, json_export] ) with gr.Tab("โ„น๏ธ Help"): gr.Markdown(f""" ### System 
Information **Temp Directory:** {copilot.temp_dir} **Transcription Engine:** OpenAI Whisper - Requires: OPENAI_API_KEY in .env file - Max file size: 25 MB - Supported formats: MP3, WAV, M4A, OGG, WEBM, MP4, MPEG, MPGA **Analysis Engine:** Google Gemini Pro - Requires: GEMINI_API_KEY in .env file - Free tier: 60 requests per minute - No file size limits (only processes text) ### Multi-View Analysis Features **Current Segment View:** Shows analysis of the just-processed segment **All Segments View:** Shows individual analyses for each segment **Combined Analysis:** Analyzes all segments together to find patterns **Comparison View:** Side-by-side comparison of all segments ### File Handling Tips **To reduce file size:** 1. Use the built-in compression tool 2. Record at lower quality (16kHz, mono) 3. Split long recordings into segments **Best practices:** - Process 3-5 minute segments for optimal results - Use clear file names for easy tracking - Check file size before processing ### Troubleshooting **If recording doesn't work:** - Check browser permissions for microphone - Try a different browser (Chrome/Edge work best) - Use upload instead of recording **If processing fails:** - Check the console for detailed error messages - Verify your API keys are correct - Ensure the audio file format is supported ### Required API Keys Add to your `.env` file: ``` OPENAI_API_KEY=sk-your-openai-key GEMINI_API_KEY=your-gemini-key ``` """) # Launch if __name__ == "__main__": print("\n" + "=" * 50) print("๐Ÿš€ Starting Enhanced Research Interview Co-Pilot with Multi-View Analysis") print("=" * 50) # Check temp directory print(f"๐Ÿ“ Temp directory: {copilot.temp_dir}") print(f" - Free space: {shutil.disk_usage(tempfile.gettempdir()).free / (1024 ** 3):.1f} GB") # Check dependencies if shutil.which('ffmpeg'): print("โœ… FFmpeg found - compression available") else: print("โš ๏ธ FFmpeg not found - compression unavailable") # Check API keys if not os.getenv("OPENAI_API_KEY"): print("โŒ 
No OpenAI API key found (required for transcription)") else: print("โœ… OpenAI API key loaded (Whisper transcription)") # Test OpenAI client initialization try: test_client = OpenAI(api_key=os.getenv("OPENAI_API_KEY")) print("โœ… OpenAI client initialized successfully") except Exception as e: print(f"โŒ Error initializing OpenAI client: {e}") if not os.getenv("GEMINI_API_KEY"): print("โŒ No Gemini API key found (required for analysis)") else: print("โœ… Gemini API key loaded (analysis)") if not os.getenv("OPENAI_API_KEY") or not os.getenv("GEMINI_API_KEY"): print("\nโš ๏ธ Please add missing API keys to your .env file") else: print("\nโœ… All systems ready!") print("\n๐Ÿ“Œ Launching application...") app.queue().launch()