# app.py
# Revision 58e2ca7 (verified) - commit message: "Update app.py"
# Author avatar caption from the hosting page: "rodunia's picture"
import os
import gradio as gr
import json
from datetime import datetime
from typing import List, Dict, Tuple
from dotenv import load_dotenv
import shutil
import tempfile
import google.generativeai as genai
import traceback
import numpy as np
import scipy.io.wavfile as wavfile
# Load environment variables
load_dotenv()
# Import OpenAI for Whisper transcription
from openai import OpenAI
# Initialize OpenAI client.
# A missing OPENAI_API_KEY is passed as an empty string rather than None so
# that the app can still start; transcribe_audio() checks
# `openai_client.api_key` and reports the missing key to the user.
# NOTE(review): relies on the client only rejecting `api_key=None` at
# construction time - confirm against the installed openai version.
openai_client = OpenAI(api_key=os.getenv("OPENAI_API_KEY") or "")
# Configure Gemini for analysis
gemini_api_key = os.getenv("GEMINI_API_KEY")

# Sampling settings shared by every "preferred model" initialization below.
GEMINI_GENERATION_CONFIG = {
    'temperature': 0.7,  # Balance creativity and consistency
    'top_p': 0.95,
    'top_k': 40,
    'max_output_tokens': 8192,  # Increased for detailed analysis
}

# Always defined, so later code can simply test `if not gemini_model`.
gemini_model = None

if gemini_api_key:
    genai.configure(api_key=gemini_api_key)
    # Try to use the best available Gemini model
    try:
        # List available models
        available_models = genai.list_models()
        print("πŸ“‹ Available Gemini models:")
        gemini_models = []
        for model in available_models:
            if 'generateContent' in model.supported_generation_methods:
                print(f" - {model.name}")
                gemini_models.append(model.name)
        # Priority order: Try the best models first
        model_priority = [
            'models/gemini-1.5-pro-latest',  # Latest 1.5 Pro
            'models/gemini-1.5-pro',  # Stable 1.5 Pro
            'models/gemini-1.5-pro-002',  # Specific version
            'models/gemini-1.5-flash',  # Faster but still good
            'models/gemini-pro',  # Original Pro
        ]
        gemini_model = None
        for model_name in model_priority:
            if model_name not in gemini_models:
                continue
            try:
                gemini_model = genai.GenerativeModel(
                    model_name.replace('models/', ''),
                    generation_config=dict(GEMINI_GENERATION_CONFIG),
                )
                print(f"βœ… Using {model_name} - Best available model!")
                break
            except Exception as e:
                print(f" Could not initialize {model_name}: {e}")
        # Fallback if none of the preferred models work: take the first
        # listed model that supports generateContent (library defaults).
        if not gemini_model and gemini_models:
            model_name = gemini_models[0].replace('models/', '')
            gemini_model = genai.GenerativeModel(model_name)
            print(f"βœ… Using {model_name}")
        if not gemini_model:
            print("❌ No suitable Gemini models found!")
    except Exception as e:
        print(f"⚠️ Error listing Gemini models: {e}")
        # Listing failed, so try direct initialization with the best model.
        try:
            gemini_model = genai.GenerativeModel(
                'gemini-1.5-pro',
                generation_config=dict(GEMINI_GENERATION_CONFIG),
            )
            print("βœ… Gemini 1.5 Pro initialized (direct)")
        except Exception as direct_error:
            print(f" Could not initialize gemini-1.5-pro: {direct_error}")
            try:
                gemini_model = genai.GenerativeModel('gemini-pro')
                print("βœ… Gemini Pro initialized (fallback)")
            except Exception as fallback_error:
                print(f" Could not initialize gemini-pro: {fallback_error}")
                print("❌ Could not initialize any Gemini model!")
                gemini_model = None
else:
    print("⚠️ No Gemini API key found!")
    gemini_model = None
class InterviewCoPilot:
    """Session state and processing pipeline for the interview co-pilot.

    An instance holds the research framework (research questions, interview
    protocol, optional theory/codes/focus areas), the running transcript,
    per-segment analyses and coverage flags. Audio that is uploaded or
    recorded is copied into a temp directory owned by the instance.

    Transcription uses the module-level ``openai_client`` and analysis uses
    the module-level ``gemini_model``.
    """

    # Size ceiling (MB) applied before upload; the file treats this as the
    # OpenAI transcription limit.
    MAX_AUDIO_MB = 25

    def __init__(self):
        """Create an empty session and its private temp directory."""
        self.transcript_history = []
        self.research_questions = []
        self.interview_protocol = []
        self.detected_codes = []
        self.coverage_status = {
            "rq_covered": [],
            "protocol_covered": []
        }
        # Add file tracking
        self.processed_files = []
        self.current_file_info = {}
        self.current_audio_path = None  # Store the current audio path
        # Enhanced framework support - Initialize all attributes
        self.theoretical_framework = ""
        self.predefined_codes = {}  # {category: [codes]}
        self.analysis_focus = []
        self.is_continuation = False
        self.segment_number = 1
        # Session memory for Phase 1
        self.session_segments = []  # List of processed segments
        self.session_name = f"Interview_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
        self.framework_loaded = False
        # Create a persistent temp directory for this session
        self.temp_dir = tempfile.mkdtemp(prefix="interview_copilot_")
        print(f"πŸ“ Created temp directory: {self.temp_dir}")
        # Multi-view analysis support
        self.segment_analyses = {}  # Store individual segment analyses

    def __del__(self):
        """Best-effort removal of the session temp directory."""
        try:
            temp_dir = getattr(self, 'temp_dir', None)
            if temp_dir and os.path.exists(temp_dir):
                shutil.rmtree(temp_dir)
                print(f"🧹 Cleaned up temp directory: {temp_dir}")
        except Exception:
            # Finalizers may run while the interpreter is shutting down and
            # module globals are already gone; cleanup failure is not fatal.
            pass

    @staticmethod
    def _as_list(value):
        """Normalize a JSON field to a list (None -> [], scalar -> [scalar])."""
        if value is None:
            return []
        if isinstance(value, (list, tuple)):
            return list(value)
        return [value]

    def setup_research_context(self, research_questions: str, interview_protocol: str,
                               theoretical_framework: str = "", predefined_codes: str = "",
                               analysis_focus: str = ""):
        """Parse and store the research framework entered in the Setup tab.

        Args:
            research_questions: One question per line; required.
            interview_protocol: One protocol question per line.
            theoretical_framework: Free-text framework description.
            predefined_codes: Lines of the form ``Category: code1, code2``;
                lines without a colon are ignored.
            analysis_focus: One focus area per line.

        Returns:
            A multi-line status message, or an error message when no
            research questions were given (state is left untouched then).
        """
        # Treat a missing value like an empty textbox.
        research_questions = research_questions or ""
        interview_protocol = interview_protocol or ""
        theoretical_framework = theoretical_framework or ""
        predefined_codes = predefined_codes or ""
        analysis_focus = analysis_focus or ""

        if not research_questions.strip():
            return "❌ Please provide at least research questions"
        # Parse research questions
        self.research_questions = [q.strip() for q in research_questions.split('\n') if q.strip()]
        # Parse interview protocol
        self.interview_protocol = [q.strip() for q in interview_protocol.split('\n') if q.strip()]
        # Store theoretical framework
        self.theoretical_framework = theoretical_framework.strip()
        # Parse predefined codes (format: "Category: code1, code2, code3")
        self.predefined_codes = {}
        if predefined_codes.strip():
            for line in predefined_codes.split('\n'):
                if ':' not in line:
                    continue
                category, codes = line.split(':', 1)
                self.predefined_codes[category.strip()] = [
                    code.strip() for code in codes.split(',') if code.strip()
                ]
        # Parse analysis focus areas
        self.analysis_focus = [f.strip() for f in analysis_focus.split('\n') if f.strip()]
        # Initialize coverage tracking
        self.coverage_status = {
            "rq_covered": [False] * len(self.research_questions),
            "protocol_covered": [False] * len(self.interview_protocol)
        }
        # Build status message
        status_parts = [
            f"βœ… Setup complete!",
            f"πŸ“‹ Research Questions: {len(self.research_questions)}",
            f"πŸ“ Protocol Questions: {len(self.interview_protocol)}"
        ]
        if self.theoretical_framework:
            status_parts.append(f"πŸ“š Theoretical Framework: Yes")
        if self.predefined_codes:
            total_codes = sum(len(codes) for codes in self.predefined_codes.values())
            status_parts.append(f"🏷️ Predefined Codes: {total_codes} codes in {len(self.predefined_codes)} categories")
        if self.analysis_focus:
            status_parts.append(f"🎯 Analysis Focus Areas: {len(self.analysis_focus)}")
        # Mark framework as loaded
        self.framework_loaded = True
        return "\n".join(status_parts)

    def add_segment_to_session(self, file_name, duration, transcript_length):
        """Record a processed segment and return its info dict.

        ``codes_found`` is the size of the cumulative code list at the time
        the segment is added, not a per-segment count.
        """
        segment_info = {
            "number": len(self.session_segments) + 1,
            "file_name": file_name,
            "duration": duration,
            "transcript_length": transcript_length,
            "timestamp": datetime.now().strftime("%H:%M:%S"),
            "codes_found": len(self.detected_codes)
        }
        self.session_segments.append(segment_info)
        return segment_info

    def get_session_summary(self):
        """Return a Markdown summary of the segments processed so far."""
        if not self.session_segments:
            return "No segments processed yet"
        total_duration = sum(seg.get("duration", 0) for seg in self.session_segments)
        total_transcript = sum(seg.get("transcript_length", 0) for seg in self.session_segments)
        summary = f"""### πŸ“Š Current Session: {self.session_name}
**Segments Processed:** {len(self.session_segments)}
**Total Duration:** {total_duration:.1f} minutes
**Total Transcript:** {total_transcript:,} characters
**Unique Codes Found:** {len(set(self.detected_codes))}
**Processed Files:**
"""
        segment_lines = [
            f"\nβœ“ Segment {seg['number']} - {seg['file_name']} ({seg['timestamp']})"
            for seg in self.session_segments
        ]
        return summary + "".join(segment_lines)

    def reset_session(self, keep_framework=True):
        """Clear transcripts, codes and analyses; optionally drop the framework too.

        Args:
            keep_framework: When true, research questions/protocol/codes are
                kept and only their coverage flags are reset.

        Returns:
            A status message describing what was cleared.
        """
        self.session_segments = []
        self.transcript_history = []
        self.detected_codes = []
        self.processed_files = []
        self.segment_number = 1
        self.is_continuation = False
        self.segment_analyses = {}  # Reset segment analyses
        if not keep_framework:
            self.research_questions = []
            self.interview_protocol = []
            self.theoretical_framework = ""
            self.predefined_codes = {}
            self.analysis_focus = []
            self.framework_loaded = False
            self.coverage_status = {
                "rq_covered": [],
                "protocol_covered": []
            }
        else:
            # Reset only coverage status
            self.coverage_status = {
                "rq_covered": [False] * len(self.research_questions),
                "protocol_covered": [False] * len(self.interview_protocol)
            }
        self.session_name = f"Interview_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
        return "βœ… Session reset. " + ("Framework kept." if keep_framework else "Everything cleared.")

    def save_uploaded_file(self, audio_path):
        """Copy an uploaded file into the session temp directory.

        Returns:
            The path of the copy, or None when the source is missing or the
            copy fails. A name clash is resolved by appending an HHMMSS stamp.
        """
        if not audio_path or not os.path.exists(audio_path):
            return None
        try:
            # Copy file to our temp directory
            file_name = os.path.basename(audio_path)
            saved_path = os.path.join(self.temp_dir, file_name)
            # If file already exists, add timestamp to make unique
            if os.path.exists(saved_path):
                name, ext = os.path.splitext(file_name)
                timestamp = datetime.now().strftime("%H%M%S")
                file_name = f"{name}_{timestamp}{ext}"
                saved_path = os.path.join(self.temp_dir, file_name)
            shutil.copy2(audio_path, saved_path)
            print(f"πŸ’Ύ Saved file to: {saved_path}")
            return saved_path
        except Exception as e:
            print(f"❌ Error saving file: {str(e)}")
            return None

    def check_audio_file(self, audio_path):
        """Save an upload and check it against the size limit.

        Returns:
            ``(usable_path, status_markdown, saved_path)``. ``usable_path``
            is None when the file is missing, could not be saved, or exceeds
            ``MAX_AUDIO_MB``; ``saved_path`` is still returned for oversized
            files so they can be compressed.
        """
        if not audio_path:
            return None, "No file selected", None
        try:
            # Save the file to our temp directory
            saved_path = self.save_uploaded_file(audio_path)
            if not saved_path:
                return None, "❌ Error saving uploaded file", None
            file_size = os.path.getsize(saved_path)
            file_size_mb = file_size / (1024 * 1024)
            file_name = os.path.basename(saved_path)
            # Store file info
            self.current_file_info = {
                "name": file_name,
                "size_mb": file_size_mb,
                "path": saved_path,
                "original_path": audio_path
            }
            # Debug info
            print(f"πŸ“Š File check:")
            print(f" - Original path: {audio_path}")
            print(f" - Saved path: {saved_path}")
            print(f" - Size: {file_size_mb:.2f} MB")
            print(f" - Exists: {os.path.exists(saved_path)}")
            # Check file size
            if file_size_mb > self.MAX_AUDIO_MB:
                status = f"""⚠️ **File too large for direct processing**
- File: {file_name}
- Size: {file_size_mb:.1f} MB
- Maximum: {self.MAX_AUDIO_MB} MB
**Options:**
1. Compress the file using the compression tool below
2. Split into smaller segments
3. Use a different recording with lower quality settings"""
                return None, status, saved_path
            # Good to go
            status = f"""βœ… **File ready for processing**
- File: {file_name}
- Size: {file_size_mb:.1f} MB
- Status: Within limits
- Saved to: {os.path.basename(self.temp_dir)}/"""
            return saved_path, status, saved_path
        except Exception as e:
            print(f"❌ Error in check_audio_file: {traceback.format_exc()}")
            return None, f"❌ Error checking file: {str(e)}", None

    def _write_recording(self, recording, prefix):
        """Write a ``(sample_rate, samples)`` tuple to a WAV file in the temp dir."""
        sample_rate, audio_data = recording
        wav_path = os.path.join(
            self.temp_dir, f"{prefix}_{datetime.now().strftime('%H%M%S')}.wav")
        wavfile.write(wav_path, sample_rate, audio_data)
        return wav_path

    def compress_audio(self, audio_path, quality="medium"):
        """Re-encode audio to mono at a lower bitrate/sample rate via ffmpeg.

        Args:
            audio_path: A file path, or a ``(sample_rate, samples)`` tuple
                which is first written to a WAV file.
            quality: ``"high"``, ``"medium"`` or ``"low"``; unknown values
                fall back to ``"medium"``.

        Returns:
            ``(output_path, status_markdown)``; ``output_path`` is None on
            any failure. On success ``current_file_info`` is pointed at the
            compressed file.
        """
        # Local import: subprocess is only needed here.
        import subprocess

        # Handle different input types
        if isinstance(audio_path, tuple) and len(audio_path) == 2:
            actual_path = self._write_recording(audio_path, "temp_audio")
        elif isinstance(audio_path, str):
            actual_path = audio_path
        else:
            return None, "No valid audio file to compress"
        if not actual_path or not os.path.exists(actual_path):
            return None, "No file to compress or file not found"
        try:
            # Quality presets
            quality_settings = {
                "high": {"bitrate": "128k", "sample_rate": "44100"},
                "medium": {"bitrate": "64k", "sample_rate": "22050"},
                "low": {"bitrate": "32k", "sample_rate": "16000"}
            }
            settings = quality_settings.get(quality, quality_settings["medium"])
            # Create output filename in our temp directory
            input_name = os.path.basename(actual_path)
            name, ext = os.path.splitext(input_name)
            output_path = os.path.join(self.temp_dir, f"{name}_compressed{ext}")
            # Compress
            cmd = [
                'ffmpeg', '-i', actual_path,
                '-b:a', settings["bitrate"],
                '-ar', settings["sample_rate"],
                '-ac', '1',  # Mono
                '-y', output_path
            ]
            result = subprocess.run(cmd, capture_output=True, text=True)
            if result.returncode != 0:
                return None, f"❌ Compression failed: {result.stderr}"
            # Check new size
            new_size = os.path.getsize(output_path) / (1024 * 1024)
            old_size = os.path.getsize(actual_path) / (1024 * 1024)
            # A zero-byte input has no meaningful reduction percentage.
            reduction = (old_size - new_size) / old_size * 100 if old_size else 0
            # Update file info
            self.current_file_info["path"] = output_path
            self.current_file_info["size_mb"] = new_size
            return output_path, f"""βœ… **Compression successful!**
- Original size: {old_size:.1f} MB
- Compressed size: {new_size:.1f} MB
- Reduction: {reduction:.0f}%
- Quality setting: {quality}
- Saved to: {os.path.basename(output_path)}"""
        except (FileNotFoundError, subprocess.SubprocessError) as e:
            # A missing ffmpeg binary surfaces as FileNotFoundError, which is
            # not a SubprocessError, so both are mapped to the install hint.
            return None, f"❌ FFmpeg error: {str(e)}\n\nMake sure ffmpeg is installed."
        except Exception as e:
            return None, f"❌ Error: {str(e)}"

    def transcribe_audio(self, audio_path: str, progress_callback=None) -> str:
        """Transcribe an audio file with the ``whisper-1`` model.

        Args:
            audio_path: Path of the file to send.
            progress_callback: Optional callable receiving a status string.

        Returns:
            The transcript text, or a string starting with ``"Error:"`` on
            failure (callers test for that prefix; nothing is raised).
        """
        if not audio_path:
            return "Error: No audio file provided"
        if not os.path.exists(audio_path):
            return f"Error: Audio file not found at path: {audio_path}"
        # getattr keeps this graceful even if the client object is None.
        if not getattr(openai_client, "api_key", None):
            return "Error: OpenAI API key not found (needed for transcription)"
        try:
            file_size = os.path.getsize(audio_path)
            file_size_mb = file_size / (1024 * 1024)
            print(f"πŸ“Š Transcribing file: {audio_path}")
            print(f"πŸ“Š File size: {file_size_mb:.2f} MB ({file_size} bytes)")
            # Check if it's actually over the upload limit
            if file_size_mb > self.MAX_AUDIO_MB:
                return f"Error: Audio file too large. File size: {file_size_mb:.1f} MB (limit: {self.MAX_AUDIO_MB} MB)"
            # Update progress if callback provided
            if progress_callback:
                progress_callback(f"🎡 Transcribing {file_size_mb:.1f} MB file with OpenAI Whisper...")
            with open(audio_path, "rb") as audio_file:
                print("πŸ“Š Sending to OpenAI Whisper API...")
                # New OpenAI v1.x syntax
                transcript = openai_client.audio.transcriptions.create(
                    model="whisper-1",
                    file=audio_file,
                    response_format="text"
                )
            # With response_format="text" the response is expected to be the
            # text itself; anything else is stringified.
            text = transcript if isinstance(transcript, str) else str(transcript)
            # Remember which file names have been transcribed
            file_name = self.current_file_info.get("name", "unknown")
            if file_name not in self.processed_files:
                self.processed_files.append(file_name)
            print(f"βœ… Transcription successful! Length: {len(text)} characters")
            return text
        except Exception as e:
            error_msg = str(e)
            print(f"❌ OpenAI API error: {error_msg}")
            # Check for specific error types
            if "Invalid file format" in error_msg:
                return "Error: Invalid audio file format. Supported formats: mp3, mp4, mpeg, mpga, m4a, wav, webm"
            if "too large" in error_msg.lower():
                return "Error: Audio file too large. Please use files under 25MB."
            if "Incorrect API key" in error_msg or "Authentication" in error_msg:
                return "Error: Invalid OpenAI API key. Please check your .env file."
            if "Rate limit" in error_msg:
                return "Error: OpenAI rate limit reached. Please wait a moment and try again."
            return f"Error: {error_msg}"

    def analyze_transcript_with_gemini(self, text: str) -> Dict:
        """Analyze ``text`` as the current segment (delegates to the enhanced version)."""
        return self.analyze_transcript_with_gemini_enhanced(text, segment_num=self.segment_number)

    @staticmethod
    def _parse_analysis_json(content):
        """Parse the outermost ``{...}`` span of ``content`` as JSON.

        Raises:
            json.JSONDecodeError: When no valid JSON can be extracted.
        """
        start = content.find('{')
        end = content.rfind('}') + 1
        if start >= 0 and end > start:
            return json.loads(content[start:end])
        return json.loads(content)

    def analyze_transcript_with_gemini_enhanced(self, text: str, segment_num: int = None) -> Dict:
        """Run the Gemini analysis prompt on a transcript.

        Args:
            text: Transcript text (at least 10 non-blank characters).
            segment_num: Segment being analyzed; ``None`` requests a combined
                analysis of all segments.

        Returns:
            The parsed analysis dict, a default "Analysis failed" structure
            when the reply is not valid JSON, or ``{"error": ...}`` when the
            preconditions fail or the API call raises.

        Side effects:
            Individual analyses are stored in ``segment_analyses`` and mark
            coverage flags; applied and emergent codes are appended to
            ``detected_codes`` for both individual and combined runs.
        """
        if not text or len(text.strip()) < 10:
            return {"error": "Text too short to analyze"}
        if not self.research_questions:
            return {"error": "Please set up research questions first"}
        if not gemini_model:
            return {"error": "Gemini API not configured"}
        # Determine if this is a specific segment or combined analysis
        is_combined = segment_num is None
        current_segment = segment_num if segment_num else self.segment_number
        # Build context section
        context_parts = []
        if is_combined:
            context_parts.append("This is a COMBINED ANALYSIS of all segments.")
            context_parts.append(f"Total segments: {len(self.session_segments)}")
        else:
            context_parts.append(f"This is Segment {current_segment} of the interview.")
            if current_segment > 1:
                context_parts.append("Previous segments have covered:")
                covered_rqs = [
                    f"RQ{i}"
                    for i, covered in enumerate(self.coverage_status["rq_covered"], start=1)
                    if covered
                ]
                if covered_rqs:
                    context_parts.append(f"- Research Questions: {', '.join(covered_rqs)}")
        context_section = "\n".join(context_parts)
        # Build framework section
        framework_section = ""
        if self.theoretical_framework:
            framework_section += f"\nTHEORETICAL FRAMEWORK:\n{self.theoretical_framework}\n"
        if self.predefined_codes:
            framework_section += "\nPREDEFINED CODES:\n"
            for category, codes in self.predefined_codes.items():
                framework_section += f"- {category}: {', '.join(codes)}\n"
        if self.analysis_focus:
            framework_section += "\nANALYSIS FOCUS:\n"
            framework_section += "\n".join([f"- {focus}" for focus in self.analysis_focus])
        # Pieces that differ between combined and individual analysis
        analysis_type = "COMBINED TRANSCRIPT" if is_combined else f"SEGMENT {current_segment}"
        rq_lines = "\n".join(
            f" RQ{i}: {q}" for i, q in enumerate(self.research_questions, start=1))
        protocol_lines = "\n".join(
            f" Q{i}: {q}" for i, q in enumerate(self.interview_protocol, start=1))
        task_six = "6. Identify patterns across segments" if is_combined else ""
        task_seven = "7. Note evolution of themes" if is_combined else ""
        segment_field = '"combined"' if is_combined else current_segment
        analysis_kind = "combined" if is_combined else "individual"
        summary_scope = "all segments combined" if is_combined else "this segment's contribution"
        combined_fields = (
            ', "cross_segment_patterns": ["Pattern 1", "Pattern 2"],'
            '"theme_evolution": "Description of how themes evolved across segments"'
        ) if is_combined else ""
        prompt = f"""You are a Qualitative Research Analysis Assistant.
{context_section}
{analysis_type}: "{text}"
RESEARCH FRAMEWORK:
- Research Questions:
{rq_lines}
- Interview Protocol:
{protocol_lines}
{framework_section}
ANALYSIS TASKS:
1. Apply predefined codes where relevant
2. Identify emergent codes not in the framework
3. Track research question coverage
4. Note theoretical alignments or challenges
5. Consider the analysis focus areas
{task_six}
{task_seven}
PROVIDE YOUR ANALYSIS IN THIS EXACT JSON FORMAT:
{{
"segment_number": {segment_field},
"analysis_type": "{analysis_kind}",
"alerts": [
{{"type": "supports", "code": "Code Name", "text": "βœ… Supports [Theory/Concept]: ..."}},
{{"type": "challenges", "text": "⚠️ Challenges [Framework]: ..."}},
{{"type": "missing", "text": "πŸ” Missing [Dimension]: ..."}},
{{"type": "emergent", "code": "New Code", "text": "✳️ Emergent theme: ..."}},
{{"type": "noteworthy", "text": "πŸ“Œ Noteworthy: ..."}}
],
"rq_addressed": [1, 2],
"codes_applied": ["Code 1", "Code 2"],
"emergent_codes": ["New Theme 1"],
"coverage": {{
"protocol_covered": [1, 3, 5],
"completion_percent": 40,
"missing_topics": ["Topic A", "Topic B"]
}},
"follow_ups": [
"🧭 To explore [concept], ask: 'Question?'",
"🧭 RQ3 needs data on [topic]"
],
"insights": [
"Key pattern or finding",
"Theoretical implication"
],
"segment_summary": "Brief summary of {summary_scope}"{combined_fields}
}}
Return ONLY the JSON."""
        try:
            print(f"πŸ€– Analyzing {analysis_type} with Gemini...")
            response = gemini_model.generate_content(prompt)
            content = response.text.strip()
            # Parse JSON response
            try:
                analysis = self._parse_analysis_json(content)
            except json.JSONDecodeError:
                print(f"JSON parsing error. Raw response: {content[:200]}...")
                # Return a default structure
                analysis = {
                    "segment_number": current_segment if not is_combined else "combined",
                    "analysis_type": "combined" if is_combined else "individual",
                    "alerts": [],
                    "rq_addressed": [],
                    "codes_applied": [],
                    "emergent_codes": [],
                    "coverage": {
                        "protocol_covered": [],
                        "completion_percent": 0,
                        "missing_topics": []
                    },
                    "follow_ups": ["Please try again"],
                    "insights": ["Unable to parse response"],
                    "segment_summary": "Analysis failed"
                }
            # Store individual segment analysis
            if not is_combined:
                self.segment_analyses[current_segment] = analysis
                # Update coverage tracking (model numbering is 1-based)
                for rq_num in self._as_list(analysis.get("rq_addressed")):
                    if isinstance(rq_num, int) and 0 < rq_num <= len(self.research_questions):
                        self.coverage_status["rq_covered"][rq_num - 1] = True
                coverage = analysis.get("coverage") or {}
                for pq_num in self._as_list(coverage.get("protocol_covered")):
                    if isinstance(pq_num, int) and 0 < pq_num <= len(self.interview_protocol):
                        self.coverage_status["protocol_covered"][pq_num - 1] = True
            # Add codes to master list
            self.detected_codes.extend(self._as_list(analysis.get("codes_applied")))
            self.detected_codes.extend(self._as_list(analysis.get("emergent_codes")))
            return analysis
        except Exception as e:
            print(f"❌ Gemini error: {type(e).__name__}: {str(e)}")
            return {"error": f"Analysis error: {str(e)}"}

    def format_analysis_output(self, analysis: Dict, show_segment_info: bool = True) -> str:
        """Render an analysis dict as Markdown.

        Args:
            analysis: Result of ``analyze_transcript_with_gemini_enhanced``.
            show_segment_info: Whether to include the segment summary line
                (or the segment count for combined analyses).

        Returns:
            Markdown text; an ``{"error": ...}`` dict renders as a single
            error line. Non-string list entries from the model are
            stringified instead of raising.
        """
        if "error" in analysis:
            return f"❌ {analysis['error']}"
        # Determine analysis type
        is_combined = analysis.get("analysis_type") == "combined"
        segment_num = analysis.get("segment_number", "Unknown")
        coverage = analysis.get('coverage') or {}
        # Format alerts section
        alerts_text = ""
        if "alerts" in analysis:
            alerts_text = "### πŸ“’ Analysis Alerts:\n"
            for alert in self._as_list(analysis.get("alerts")):
                alert_line = alert.get('text', '') if isinstance(alert, dict) else str(alert)
                alerts_text += f"{alert_line}\n"
        # Format codes section
        codes_section = ""
        applied_codes = self._as_list(analysis.get("codes_applied"))
        emergent_codes = self._as_list(analysis.get("emergent_codes"))
        if applied_codes:
            codes_section += f"**Applied Codes:** {', '.join(map(str, applied_codes))}\n"
        if emergent_codes:
            codes_section += f"**✳️ Emergent Codes:** {', '.join(map(str, emergent_codes))}\n"
        # Build header based on type
        if is_combined:
            header = "### πŸ“Š Combined Analysis Results (All Segments)"
            segment_info = f"**Total Segments Analyzed:** {len(self.session_segments)}\n"
        else:
            header = f"### πŸ“Š Analysis Results - Segment {segment_num}"
            segment_info = f"**πŸ“ Segment {segment_num} Summary:** {analysis.get('segment_summary', 'Analysis of this segment')}\n"
        # Get file name for current segment (segment numbers are 1-based)
        file_info = ""
        if (not is_combined and isinstance(segment_num, int)
                and 1 <= segment_num <= len(self.session_segments)):
            file_info = f"**File:** {self.session_segments[segment_num - 1].get('file_name', 'unknown')}\n"
        # Build main analysis text
        rq_text = ', '.join(f"RQ{n}" for n in self._as_list(analysis.get('rq_addressed')))
        protocol_text = ', '.join(f"Q{n}" for n in self._as_list(coverage.get('protocol_covered')))
        insights_text = "\n".join(
            'β€’ ' + str(insight) for insight in self._as_list(analysis.get('insights')))
        analysis_text = "\n".join([
            header,
            f"{segment_info if show_segment_info else ''}{file_info}"
            f"**Research Questions Addressed:** {rq_text}",
            alerts_text,
            "**Codes/Themes:**",
            codes_section,
            f"**Protocol Coverage:** {protocol_text}",
            f"**Completion:** {coverage.get('completion_percent', 0)}% of protocol addressed",
            "**Key Insights:**",
            insights_text,
        ])
        # Add combined-specific sections
        if is_combined:
            if "cross_segment_patterns" in analysis:
                analysis_text += "\n\n**Cross-Segment Patterns:**\n"
                analysis_text += "\n".join(
                    'β€’ ' + str(pattern)
                    for pattern in self._as_list(analysis.get('cross_segment_patterns')))
            if "theme_evolution" in analysis:
                analysis_text += f"\n\n**Theme Evolution:**\n{analysis.get('theme_evolution', '')}"
        missing_topics = self._as_list(coverage.get('missing_topics'))
        if missing_topics:
            topic_lines = "\n".join('β€’ ' + str(topic) for topic in missing_topics)
            analysis_text += f"\n\n**Missing Topics:**\n{topic_lines}"
        return analysis_text

    def generate_multi_view_analysis(self):
        """Build the individual, combined and comparison views.

        Returns:
            ``(individual_markdown, combined_markdown, comparison_markdown)``.
            The combined view triggers a new Gemini call when more than one
            segment has been analyzed.
        """
        if not getattr(self, 'segment_analyses', None):
            return "No segments analyzed yet", "", ""
        # Format individual segment analyses
        individual_analyses = "## πŸ“‘ Individual Segment Analyses\n\n"
        for seg_num in sorted(self.segment_analyses):
            formatted = self.format_analysis_output(
                self.segment_analyses[seg_num], show_segment_info=True)
            individual_analyses += f"{formatted}\n\n{'=' * 50}\n\n"
        # Generate combined analysis if multiple segments
        if len(self.segment_analyses) > 1:
            # Combine all transcripts
            all_transcripts = "\n\n".join(self.transcript_history)
            # Run combined analysis
            combined_result = self.analyze_transcript_with_gemini_enhanced(all_transcripts, segment_num=None)
            combined_analysis = "## πŸ”— Combined Analysis (All Segments Together)\n\n"
            combined_analysis += self.format_analysis_output(combined_result, show_segment_info=True)
        else:
            combined_analysis = "Combined analysis requires at least 2 segments"
        # Generate comparison view
        comparison_view = self.generate_comparison_view()
        return individual_analyses, combined_analysis, comparison_view

    def generate_comparison_view(self):
        """Return a Markdown table comparing segments plus per-code frequencies."""
        if not getattr(self, 'segment_analyses', None):
            return "No segments to compare"
        comparison = "## πŸ“Š Segment Comparison\n\n"
        # Create comparison table
        comparison += "| Segment | RQs Addressed | Codes Applied | Emergent Codes | Completion % |\n"
        comparison += "|---------|---------------|---------------|----------------|-------------|\n"
        # code -> {segment number -> occurrences}
        code_by_segment = {}
        for seg_num in sorted(self.segment_analyses):
            analysis = self.segment_analyses[seg_num]
            applied_codes = self._as_list(analysis.get('codes_applied'))
            emergent_codes = self._as_list(analysis.get('emergent_codes'))
            rqs = ', '.join(f"RQ{n}" for n in self._as_list(analysis.get('rq_addressed')))
            completion = (analysis.get('coverage') or {}).get('completion_percent', 0)
            comparison += (
                f"| {seg_num} | {rqs} | {len(applied_codes)} | {len(emergent_codes)} | {completion}% |\n")
            for code in applied_codes + emergent_codes:
                per_segment = code_by_segment.setdefault(code, {})
                per_segment[seg_num] = per_segment.get(seg_num, 0) + 1
        # Add theme tracking
        comparison += "\n### πŸ“ˆ Theme Frequency Across Segments\n\n"
        for code, segments in sorted(code_by_segment.items()):
            seg_info = ', '.join(f"Seg{s}: {count}x" for s, count in sorted(segments.items()))
            comparison += f"- **{code}**: {seg_info}\n"
        return comparison

    def _resolve_audio_input(self, audio_path):
        """Turn a Gradio audio value into a file path, or None if unusable."""
        # Case 1: (sample_rate, audio_data) tuple from a recording
        if isinstance(audio_path, tuple) and len(audio_path) == 2:
            print(" Detected audio data tuple (recording)")
            wav_path = self._write_recording(audio_path, "recorded")
            print(f" Saved recording to: {wav_path}")
            return wav_path
        # Case 2: a file path
        if isinstance(audio_path, str):
            return audio_path
        # Case 3: nothing passed, fall back to the last checked file
        if audio_path is None and self.current_file_info:
            saved_path = self.current_file_info.get("path")
            print(f" Using saved path: {saved_path}")
            return saved_path
        return None

    def process_interview_segment(self, audio_path, progress_callback=None):
        """Transcribe one audio segment and analyze it.

        Args:
            audio_path: A file path, a ``(sample_rate, samples)`` recording
                tuple, or None to reuse the last checked/compressed file.
            progress_callback: Optional callable receiving status text.

        Returns:
            ``(full_transcript, analysis_markdown, follow_ups, coverage,
            progress)`` - five strings, also on failure.
        """
        print(f"\n🎯 Starting process_interview_segment")
        print(f" Audio path provided: {audio_path}")
        print(f" Type of audio_path: {type(audio_path)}")
        actual_audio_path = self._resolve_audio_input(audio_path)
        # Validate we have a valid path
        if not actual_audio_path or not os.path.exists(actual_audio_path):
            return "", "❌ No audio file found. Please upload a file or record audio first.", "", "", "No file to process"
        # Get file info. Stored info is reused only when it describes this
        # very file; otherwise it is refreshed so a previously checked file
        # cannot lend its name and size to a different input.
        stored = self.current_file_info or {}
        is_recording = isinstance(audio_path, tuple)
        if not is_recording and actual_audio_path in (stored.get("path"), stored.get("original_path")):
            file_name = stored.get("name", os.path.basename(actual_audio_path))
            file_size = stored.get("size_mb", os.path.getsize(actual_audio_path) / (1024 * 1024))
        else:
            # Name comes from the file actually written/received.
            file_name = os.path.basename(actual_audio_path)
            file_size = os.path.getsize(actual_audio_path) / (1024 * 1024)
            self.current_file_info = {
                "name": file_name,
                "size_mb": file_size,
                "path": actual_audio_path
            }
        # Progress update
        progress = f"""πŸ”„ Processing: {file_name} ({file_size:.1f} MB)
πŸ“Š Current Step: Transcribing audio with Whisper...
⏱️ Estimated time: {int(file_size * 0.5)}-{int(file_size * 1)} minutes for transcription
πŸ’‘ Tip: Larger files take longer. A 10MB file typically takes 5-10 minutes."""
        # Update progress callback if provided
        if progress_callback:
            progress_callback(progress)
        # Transcribe with Whisper
        print(f"🎡 Starting transcription of {file_size:.1f} MB file...")
        start_time = datetime.now()
        transcript = self.transcribe_audio(actual_audio_path, progress_callback)
        transcription_time = (datetime.now() - start_time).total_seconds()
        print(f"βœ… Transcription completed in {transcription_time:.1f} seconds")
        if transcript.startswith("Error:"):
            return transcript, "❌ Transcription failed", "", "", progress + "\n\n❌ Transcription failed"
        # Add to history with file info
        timestamp = datetime.now().strftime("%H:%M:%S")
        segment_label = f"Segment {self.segment_number}" if self.is_continuation else "Segment 1"
        self.transcript_history.append(f"[{timestamp}] [{file_name}] [{segment_label}] {transcript}")
        # Check if research context is set up
        if not self.research_questions:
            full_transcript = "\n\n".join(self.transcript_history)
            return full_transcript, "⚠️ Please set up research questions first", "", "", progress
        # Update progress for analysis phase
        progress = f"""βœ… Transcription complete! ({transcription_time:.1f} seconds)
πŸ“Š Current Step: Analyzing with Gemini 1.5 Pro...
πŸ” Analyzing {segment_label}
⏱️ This usually takes 10-30 seconds..."""
        if progress_callback:
            progress_callback(progress)
        # Analyze with Gemini
        print(f"πŸ€– Starting Gemini analysis...")
        analysis_start = datetime.now()
        analysis = self.analyze_transcript_with_gemini(transcript)
        analysis_time = (datetime.now() - analysis_start).total_seconds()
        print(f"βœ… Analysis completed in {analysis_time:.1f} seconds")
        # Format outputs
        full_transcript = "\n\n".join(self.transcript_history)
        if "error" in analysis:
            analysis_text = f"❌ {analysis['error']}"
            follow_ups = "Unable to generate follow-ups"
            coverage = "Unable to calculate coverage"
            progress = f"❌ Failed: {file_name}"
            return full_transcript, analysis_text, follow_ups, coverage, progress
        # Format analysis output
        analysis_text = self.format_analysis_output(analysis)
        follow_ups = "### πŸ’‘ Suggested Follow-ups:\n" + \
            '\n'.join(map(str, self._as_list(analysis.get('follow_ups'))))
        rq_done = sum(self.coverage_status["rq_covered"])
        protocol_done = sum(self.coverage_status["protocol_covered"])
        rq_total = len(self.research_questions)
        protocol_total = len(self.interview_protocol)
        rq_coverage = rq_done / rq_total * 100 if rq_total else 0
        protocol_coverage = protocol_done / protocol_total * 100 if protocol_total else 0
        # Track unique codes
        all_codes = set(self.detected_codes)
        applied_unique = set(self._as_list(analysis.get("codes_applied")))
        emergent_unique = set(self._as_list(analysis.get("emergent_codes")))
        coverage = f"""### πŸ“ˆ Overall Progress:
- **Research Questions:** {rq_coverage:.0f}% ({rq_done}/{rq_total})
- **Protocol Questions:** {protocol_coverage:.0f}% ({protocol_done}/{protocol_total})
- **Total Unique Codes:** {len(all_codes)}
- Framework Codes: {len(applied_unique)}
- Emergent Codes: {len(emergent_unique)}
- **Segments Processed:** {len(self.processed_files)}"""
        progress = f"βœ… Completed: {file_name} ({segment_label})"
        return full_transcript, analysis_text, follow_ups, coverage, progress
# Single module-level co-pilot instance; the UI callbacks defined below read
# and mutate its state (e.g. segment number, temp directory).
copilot = InterviewCoPilot()
# Create improved interface
with gr.Blocks(title="Research Interview Co-Pilot", theme=gr.themes.Soft(), css="""
.file-info { background-color: #f0f0f0; padding: 10px; border-radius: 5px; margin: 10px 0; }
.success { color: #28a745; }
.warning { color: #ffc107; }
.error { color: #dc3545; }
h1 { text-align: center; }
.contain { max-width: 1200px; margin: auto; }
""") as app:
gr.Markdown("""
# πŸŽ™οΈ Research Interview Co-Pilot - Enhanced with Multi-View Analysis
**Transcription:** OpenAI Whisper | **Analysis:** Google Gemini Pro
Now with individual segment analysis, combined analysis, and segment comparison!
""")
with gr.Tab("πŸ“‹ Setup"):
gr.Markdown("### Set up your research context")
with gr.Row():
with gr.Column():
rq_input = gr.Textbox(
label="Research Questions (one per line) *",
placeholder="What pedagogical strategies are evident in AI educators?\nHow do AI tools emphasize practical applications?\nWhat are the differences between various AI approaches?",
lines=6
)
protocol_input = gr.Textbox(
label="Interview Protocol Questions (one per line)",
placeholder="Tell me about your experience with AI\nHow do you use AI tools?\nWhat challenges have you faced?",
lines=6
)
with gr.Column():
framework_input = gr.Textbox(
label="Theoretical Framework (optional)",
placeholder="e.g., Technology Acceptance Model (TAM)\nGrounded Theory approach\nActivity Theory lens",
lines=3
)
codes_input = gr.Textbox(
label="Predefined Codes (optional - format: 'Category: code1, code2')",
placeholder="Pedagogical: Scaffolding, Direct Instruction, Guided Practice\nPractical: Application, Implementation, Real-world Use\nEthical: Privacy Concerns, Bias Awareness, Transparency",
lines=6
)
focus_input = gr.Textbox(
label="Analysis Focus Areas (optional - one per line)",
placeholder="Look for emotional responses\nPay attention to metaphors used\nNote any resistance or enthusiasm",
lines=3
)
# Segment continuation option
with gr.Row():
continue_interview = gr.Checkbox(
label="This is a continuation of a previous interview segment",
value=False
)
segment_info = gr.Textbox(
label="Segment Info",
value="Segment 1",
interactive=False
)
setup_btn = gr.Button("Setup Research Context", variant="primary", size="lg")
setup_output = gr.Textbox(label="Setup Status", interactive=False, lines=6)
# Save/Load framework buttons
with gr.Row():
save_framework_btn = gr.Button("πŸ’Ύ Save Framework", size="sm")
load_framework_btn = gr.Button("πŸ“‚ Load Framework", size="sm")
framework_file = gr.File(label="Framework File", visible=False, file_types=[".json"])
def update_segment_info(is_continuation):
    """Update the copilot's segment bookkeeping from the continuation checkbox.

    When the box is unchecked the copilot goes back to segment 1; when it is
    checked the segment counter is advanced by one. Returns the label text to
    show in the read-only "Segment Info" box.
    """
    # Guard clause: a fresh interview always restarts the numbering.
    if not is_continuation:
        copilot.is_continuation = False
        copilot.segment_number = 1
        return "Segment 1"

    copilot.is_continuation = True
    copilot.segment_number = copilot.segment_number + 1
    return f"Segment {copilot.segment_number} (Continuing from previous)"
def save_framework(rq, protocol, framework, codes, focus):
    """Write the setup form fields to a timestamped JSON file in the copilot temp dir.

    Returns a Gradio update that makes the file component visible and points it
    at the file that was just written.
    """
    snapshot = {
        "research_questions": rq,
        "interview_protocol": protocol,
        "theoretical_framework": framework,
        "predefined_codes": codes,
        "analysis_focus": focus,
        "saved_date": datetime.now().isoformat()
    }

    # The file name carries a second-resolution timestamp.
    target_path = os.path.join(
        copilot.temp_dir,
        f"framework_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json",
    )
    with open(target_path, 'w') as out_file:
        out_file.write(json.dumps(snapshot, indent=2))

    return gr.update(visible=True, value=target_path)
def load_framework(file):
    """Load a saved framework JSON file and return its fields for the setup form.

    Args:
        file: The selected framework file. May be a path string, an
            ``os.PathLike``, or an object exposing the path as ``.name``.
            Any falsy value means nothing was selected.

    Returns:
        A 6-tuple ``(research_questions, interview_protocol,
        theoretical_framework, predefined_codes, analysis_focus, status)``.
        Keys that are missing or ``null`` in the file load as empty strings.
        On any failure the five fields are empty strings and ``status``
        describes the error; no exception escapes to the UI.
    """
    field_keys = (
        "research_questions",
        "interview_protocol",
        "theoretical_framework",
        "predefined_codes",
        "analysis_focus",
    )
    blank_fields = ("",) * len(field_keys)

    if not file:
        return (*blank_fields, "No file selected")

    try:
        # Path-likes are checked first: pathlib.Path also has a ``.name``
        # attribute, but that is only the base name, not the full path.
        if isinstance(file, (str, os.PathLike)):
            path = os.fspath(file)
        else:
            path = file.name

        # JSON text is UTF-8 by specification; do not depend on the locale.
        with open(path, 'r', encoding="utf-8") as f:
            data = json.load(f)

        if not isinstance(data, dict):
            raise ValueError("framework file must contain a JSON object")

        loaded = tuple(
            "" if data.get(key) is None else data[key] for key in field_keys
        )
        return (*loaded, f"βœ… Loaded framework from {os.path.basename(path)}")
    except Exception as e:
        # UI boundary: report the problem in the status box instead of raising.
        return (*blank_fields, f"❌ Error loading file: {str(e)}")
# Keep the read-only segment label in sync with the continuation checkbox.
continue_interview.change(
    update_segment_info,
    inputs=[continue_interview],
    outputs=[segment_info]
)

# Push the five setup fields into the copilot and show its status message.
setup_btn.click(
    fn=copilot.setup_research_context,
    inputs=[rq_input, protocol_input, framework_input, codes_input, focus_input],
    outputs=setup_output
)

# Saving reveals the file component holding the freshly written JSON.
# There is deliberately no handler that hides the component when its value
# changes: a value set by save_framework also counts as a change, so such a
# handler would hide the saved file before it could be downloaded.
save_framework_btn.click(
    save_framework,
    inputs=[rq_input, protocol_input, framework_input, codes_input, focus_input],
    outputs=[framework_file]
)


def _load_framework_if_selected(file):
    """Load the chosen framework file, leaving the form untouched when none is chosen."""
    if not file:
        # gr.update() with no arguments keeps whatever the user already typed;
        # load_framework would return empty strings here and wipe the form.
        return (gr.update(),) * 5 + ("No file selected",)
    return load_framework(file)


# "Load" reveals the upload widget; if a file is already present it is loaded.
load_framework_btn.click(
    lambda: gr.update(visible=True),
    outputs=[framework_file]
).then(
    _load_framework_if_selected,
    inputs=[framework_file],
    outputs=[rq_input, protocol_input, framework_input, codes_input, focus_input, setup_output]
)

# Uploading a file into the revealed widget fills the form straight away.
framework_file.upload(
    load_framework,
    inputs=[framework_file],
    outputs=[rq_input, protocol_input, framework_input, codes_input, focus_input, setup_output]
)
# Interview Processing tab: audio input and controls, plus a tabbed results area.
# NOTE(review): indentation is missing in this listing, so the nesting of the
# Row/Column/Accordion/Tabs contexts below is not verifiable from the text - confirm.
with gr.Tab("🎀 Interview Processing"):
gr.Markdown("### Process interview audio with multi-view analysis")
# Session info at the top
with gr.Row():
# Initial text comes from copilot.get_session_summary() at build time.
session_info = gr.Markdown(copilot.get_session_summary())
with gr.Row():
# Session control buttons
new_file_btn = gr.Button("πŸ“ New File, Keep Setup", variant="secondary")
reset_session_btn = gr.Button("πŸ”„ Reset Session", variant="secondary")
reset_all_btn = gr.Button("πŸ—‘οΈ Reset Everything", variant="stop")
with gr.Row():
with gr.Column(scale=1):
# File upload with preview
# type="filepath": handlers receive a path to the audio rather than sample data.
audio_input = gr.Audio(
sources=["upload", "microphone"],
type="filepath",
label="πŸ“ Upload Audio File or 🎀 Record",
interactive=True
)
file_status = gr.Markdown("*Upload a file to see its status*")
# Compression tool
with gr.Accordion("πŸ”§ Audio Compression Tool", open=False):
gr.Markdown("Compress large audio files")
quality_select = gr.Radio(
choices=["high", "medium", "low"],
value="medium",
label="Compression Quality"
)
compress_btn = gr.Button("Compress Audio", variant="secondary")
compress_output = gr.Markdown()
# Hidden until the compression handler chain makes it visible.
compressed_audio = gr.Audio(
label="Compressed Audio",
visible=False
)
process_btn = gr.Button("πŸ” Process & Analyze", variant="primary", size="lg")
# Add visual processing indicator
processing_status = gr.Markdown(
value="",
visible=True
)
# Add progress bar
with gr.Row():
# NOTE(review): the processing handler takes its own gr.Progress() as a default
# argument, and this variable is not referenced elsewhere in the visible code -
# confirm whether this instance is needed.
progress_bar = gr.Progress()
progress_status = gr.Textbox(
label="Progress",
interactive=False,
lines=4,
value="Ready to process audio..."
)
# Add multi-view analysis button AFTER progress status
generate_multiview_btn = gr.Button(
"πŸ“Š Generate Multi-View Analysis",
variant="secondary",
size="lg",
visible=True # Always visible for now
)
with gr.Column(scale=2):
# Results area with enhanced tabs
with gr.Tabs():
with gr.Tab("πŸ“ Transcript"):
transcript_output = gr.Textbox(
label="Full Transcript",
lines=15,
max_lines=25,
interactive=False
)
with gr.Tab("πŸ” Current Segment"):
current_analysis_output = gr.Markdown(
value="*Process a segment to see analysis*"
)
with gr.Tab("πŸ“‘ All Segments"):
all_segments_output = gr.Markdown(
value="*Individual analyses will appear here*"
)
with gr.Tab("πŸ”— Combined Analysis"):
combined_analysis_output = gr.Markdown(
value="*Combined analysis will appear here after 2+ segments*"
)
with gr.Tab("πŸ“Š Comparison"):
comparison_output = gr.Markdown(
value="*Segment comparison will appear here*"
)
with gr.Tab("πŸ’‘ Follow-ups"):
followup_output = gr.Markdown()
with gr.Tab("πŸ“ˆ Coverage"):
coverage_output = gr.Markdown()
# Hidden state to store file path
# Written by the audio-change and compression handlers, read by the process handler.
audio_state = gr.State()
# Session management functions
def new_file_keep_setup():
    """Prepare the UI for the next audio file of the same interview.

    Marks the copilot as continuing and numbers the upcoming segment one past
    the segments already recorded in the session. Returns, in order, the values
    for the audio input (cleared), the file status text, the progress text and
    the session summary.
    """
    copilot.is_continuation = True
    copilot.segment_number = 1 + len(copilot.session_segments)

    cleared_audio = None
    upload_hint = "*Upload a new file to continue the interview*"
    ready_text = f"Ready for Segment {copilot.segment_number}"
    return cleared_audio, upload_hint, ready_text, copilot.get_session_summary()
def reset_session():
    """Reset the copilot session while asking it to keep the framework.

    Returns, in order, the values for the audio input (cleared), the file
    status text, the progress text, the refreshed session summary and an empty
    transcript.
    """
    # The copilot's return value is not used by the UI.
    copilot.reset_session(keep_framework=True)

    session_summary = copilot.get_session_summary()
    return (
        None,
        "*Session reset. Framework kept.*",
        "Ready to process audio...",
        session_summary,
        "",
    )
def reset_everything():
    """Reset the copilot session without keeping the framework.

    Returns, in order, the values for the audio input (cleared), the file
    status text, the progress text, the refreshed session summary, an empty
    transcript and a notice telling the user to redo the setup.
    """
    # The copilot's return value is not used by the UI.
    copilot.reset_session(keep_framework=False)

    session_summary = copilot.get_session_summary()
    setup_notice = "❌ Framework cleared. Please go to Setup tab."
    return (
        None,
        "*Everything reset. Please set up framework again.*",
        "Ready to process audio...",
        session_summary,
        "",
        setup_notice,
    )
# File status update - store the path in state
# copilot.check_audio_file's three results are routed back to the audio widget,
# the status text and audio_state, in that order.
audio_input.change(
fn=copilot.check_audio_file,
inputs=[audio_input],
outputs=[audio_input, file_status, audio_state]
)
# Compression - update state with compressed file
# Step 1 compresses whatever audio_state currently holds at the selected quality.
# Step 2 shows the compressed player and copies its value into audio_state when
# there is one; otherwise it hides the player and sets audio_state to None.
# NOTE(review): the None branch overwrites audio_state, so the previously stored
# input appears to be lost after a failed compression - confirm this is intended.
# NOTE(review): the lambda's second parameter (msg) is accepted but not used.
compress_btn.click(
fn=copilot.compress_audio,
inputs=[audio_state, quality_select],
outputs=[compressed_audio, compress_output]
).then(
fn=lambda x, msg: (gr.update(visible=True), x) if x else (gr.update(visible=False), None),
inputs=[compressed_audio, compress_output],
outputs=[compressed_audio, audio_state]
)
# Modified process function to handle multi-view
def process_and_update_session_multiview(audio_path, progress=gr.Progress()):
    """Run the copilot on one audio segment and collect the values for the UI.

    Args:
        audio_path: Value of the audio state, handed to
            ``copilot.process_interview_segment`` unchanged.
        progress: Gradio progress tracker used to report status.

    Returns:
        A 7-tuple: transcript, current segment analysis, follow-ups, coverage,
        progress text, session summary, and a visibility update for the
        multi-view button (shown once the session holds two or more segments).
    """

    def report(message):
        # Intermediate updates always sit at the halfway mark; only the text changes.
        progress(0.5, desc=message)
        return message

    progress(0, desc="Starting audio processing...")
    results = copilot.process_interview_segment(audio_path, progress_callback=report)
    progress(1.0, desc="Processing complete!")

    # A status starting with the success marker means the segment can be
    # recorded in the session.
    if results[4].startswith("βœ…"):
        segment_file = copilot.current_file_info.get("name", "unknown")
        # Rough estimate derived from the file size in MB.
        estimated_duration = copilot.current_file_info.get("size_mb", 0) * 0.5
        copilot.add_segment_to_session(segment_file, estimated_duration, len(results[0]))

    segment_analysis = results[1]

    # Comparison views only make sense once at least two segments exist.
    enough_segments = len(copilot.session_segments) >= 2

    return (
        results[0],
        segment_analysis,
        results[2],
        results[3],
        results[4],
        copilot.get_session_summary(),
        gr.update(visible=enough_segments),
    )
# Multi-view generation function
def generate_all_views():
    """Ask the copilot for its three multi-view results and pass them through.

    Returns the individual, combined and comparison results in that order.
    """
    per_segment_view, combined_view, comparison_view = copilot.generate_multi_view_analysis()
    return (per_segment_view, combined_view, comparison_view)
# Connect the process button with loading state
# Three chained steps: show a "processing" banner, run the segment handler on
# audio_state, then clear the banner again.
process_btn.click(
fn=lambda: gr.update(
value="πŸ”„ **Processing in progress...** Please wait, this may take several minutes for large files."),
outputs=[processing_status]
).then(
fn=process_and_update_session_multiview,
inputs=[audio_state],
# Order must match the 7-tuple returned by process_and_update_session_multiview.
outputs=[
transcript_output,
current_analysis_output,
followup_output,
coverage_output,
progress_status,
session_info,
generate_multiview_btn
]
).then(
fn=lambda: gr.update(value=""),
outputs=[processing_status]
)
# Connect the multi-view button
# generate_all_views returns (individual, combined, comparison) in this order.
generate_multiview_btn.click(
fn=generate_all_views,
outputs=[
all_segments_output,
combined_analysis_output,
comparison_output
]
)
# Session control buttons
# Each outputs list has one entry per value returned by its handler (4, 5 and 6).
new_file_btn.click(
fn=new_file_keep_setup,
outputs=[audio_input, file_status, progress_status, session_info]
)
reset_session_btn.click(
fn=reset_session,
outputs=[audio_input, file_status, progress_status, session_info, transcript_output]
)
# The sixth value (the "framework cleared" notice) lands in the current-analysis pane.
reset_all_btn.click(
fn=reset_everything,
outputs=[audio_input, file_status, progress_status, session_info, transcript_output,
current_analysis_output]
)
# Summary & Export tab: builds Markdown, CSV and JSON exports of the session on demand.
with gr.Tab("πŸ“Š Summary & Export"):
gr.Markdown("### Generate comprehensive summary with multi-view analysis")
def generate_enhanced_summary():
    """Build the Markdown, CSV and JSON exports for the current session.

    Returns:
        A 3-tuple ``(markdown_summary, csv_codes, json_export)`` of strings.
        When the copilot has no transcript history yet, the Markdown slot holds
        a short notice and the other two are empty strings.

    The CSV lists each detected code with its frequency, most frequent first.
    Codes are always quoted and embedded double quotes are escaped, so codes
    containing quotes or commas still produce valid CSV.
    """
    # Function-scope imports keep this nested handler self-contained.
    import csv
    import io
    from collections import Counter

    if not copilot.transcript_history:
        return "No interview data yet.", "", ""

    # First-seen order keeps the report stable between runs; set() order is arbitrary.
    unique_codes = list(dict.fromkeys(copilot.detected_codes))
    # Read once with a fallback so the Markdown and JSON branches agree when
    # the copilot has no per-segment analyses.
    segment_analyses = getattr(copilot, 'segment_analyses', None) or {}

    rq_lines = chr(10).join([
        f"- {'βœ…' if covered else '❌'} {q}"
        for q, covered in zip(copilot.research_questions, copilot.coverage_status["rq_covered"])
    ])
    code_lines = chr(10).join(['- ' + code for code in unique_codes])
    if segment_analyses:
        segment_note = "Included in multi-view analysis - see Interview Processing tab"
    else:
        segment_note = "No individual analyses yet"
    transcript_text = chr(10).join(copilot.transcript_history)

    # Generate different formats
    markdown_summary = f"""# Interview Summary Report
**Generated:** {datetime.now().strftime("%Y-%m-%d %H:%M")}
**Analysis Engine:** Google Gemini Pro
**Files Processed:** {', '.join(copilot.processed_files)}
**Total Segments:** {len(copilot.session_segments)}
## Research Question Coverage
{rq_lines}
## Detected Codes/Themes ({len(unique_codes)} unique)
{code_lines}
## Segment-by-Segment Analysis
{segment_note}
## Full Transcript
{transcript_text}"""

    # CSV format for codes. QUOTE_NONNUMERIC quotes the code text (escaping any
    # embedded quotes) and leaves the integer frequency bare, matching the
    # `"code",n` row layout; the header is written unquoted by hand.
    csv_buffer = io.StringIO()
    csv_buffer.write("Code,Frequency\n")
    csv_writer = csv.writer(csv_buffer, quoting=csv.QUOTE_NONNUMERIC, lineterminator="\n")
    csv_writer.writerows(
        (str(code), freq) for code, freq in Counter(copilot.detected_codes).most_common()
    )
    csv_codes = csv_buffer.getvalue()

    # JSON format with segment analyses
    # NOTE(review): "total_segments" counts transcript_history entries, while the
    # Markdown report counts session_segments - confirm these always match.
    json_text = json.dumps({
        "metadata": {
            "date": datetime.now().isoformat(),
            "files": copilot.processed_files,
            "total_segments": len(copilot.transcript_history),
            "analysis_engine": "Gemini Pro"
        },
        "research_questions": {
            "questions": copilot.research_questions,
            "coverage": copilot.coverage_status["rq_covered"]
        },
        "codes": unique_codes,
        "transcripts": copilot.transcript_history,
        "segment_analyses": {str(k): v for k, v in segment_analyses.items()}
    }, indent=2)

    return markdown_summary, csv_codes, json_text
# Summary tab widgets: one button, a Markdown preview and two editable export boxes.
# NOTE(review): nesting of the Row/Column/Accordion contexts is not recoverable
# from this flattened listing - confirm against the real file.
with gr.Row():
summary_btn = gr.Button("Generate All Formats", variant="primary", size="lg")
with gr.Row():
with gr.Column():
summary_display = gr.Markdown(label="Summary Preview")
with gr.Column():
with gr.Accordion("πŸ“₯ Export Options", open=True):
csv_export = gr.Textbox(
label="CSV Export (Codes)",
lines=10,
interactive=True
)
json_export = gr.Textbox(
label="JSON Export (Complete Data)",
lines=10,
interactive=True
)
# Output order matches the (markdown, csv, json) tuple from generate_enhanced_summary.
summary_btn.click(
fn=generate_enhanced_summary,
outputs=[summary_display, csv_export, json_export]
)
# Help tab: static usage notes rendered as Markdown. The text is an f-string whose
# only interpolated value is copilot.temp_dir, evaluated once when the UI is built.
with gr.Tab("ℹ️ Help"):
gr.Markdown(f"""
### System Information
**Temp Directory:** {copilot.temp_dir}
**Transcription Engine:** OpenAI Whisper
- Requires: OPENAI_API_KEY in .env file
- Max file size: 25 MB
- Supported formats: MP3, WAV, M4A, OGG, WEBM, MP4, MPEG, MPGA
**Analysis Engine:** Google Gemini Pro
- Requires: GEMINI_API_KEY in .env file
- Free tier: 60 requests per minute
- No file size limits (only processes text)
### Multi-View Analysis Features
**Current Segment View:** Shows analysis of the just-processed segment
**All Segments View:** Shows individual analyses for each segment
**Combined Analysis:** Analyzes all segments together to find patterns
**Comparison View:** Side-by-side comparison of all segments
### File Handling Tips
**To reduce file size:**
1. Use the built-in compression tool
2. Record at lower quality (16kHz, mono)
3. Split long recordings into segments
**Best practices:**
- Process 3-5 minute segments for optimal results
- Use clear file names for easy tracking
- Check file size before processing
### Troubleshooting
**If recording doesn't work:**
- Check browser permissions for microphone
- Try a different browser (Chrome/Edge work best)
- Use upload instead of recording
**If processing fails:**
- Check the console for detailed error messages
- Verify your API keys are correct
- Ensure the audio file format is supported
### Required API Keys
Add to your `.env` file:
```
OPENAI_API_KEY=sk-your-openai-key
GEMINI_API_KEY=your-gemini-key
```
""")
# Launch
if __name__ == "__main__":
    # Startup banner
    print("\n" + "=" * 50)
    print("πŸš€ Starting Enhanced Research Interview Co-Pilot with Multi-View Analysis")
    print("=" * 50)

    # Report the working directory and the free space on the system temp volume.
    print(f"πŸ“ Temp directory: {copilot.temp_dir}")
    free_bytes = shutil.disk_usage(tempfile.gettempdir()).free
    print(f" - Free space: {free_bytes / (1024 ** 3):.1f} GB")

    # FFmpeg is only looked up on PATH here; it is not invoked.
    if not shutil.which('ffmpeg'):
        print("⚠️ FFmpeg not found - compression unavailable")
    else:
        print("βœ… FFmpeg found - compression available")

    # OpenAI key: when present, also try constructing a client with it.
    if os.getenv("OPENAI_API_KEY"):
        print("βœ… OpenAI API key loaded (Whisper transcription)")
        # Test OpenAI client initialization
        try:
            test_client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
            print("βœ… OpenAI client initialized successfully")
        except Exception as e:
            print(f"❌ Error initializing OpenAI client: {e}")
    else:
        print("❌ No OpenAI API key found (required for transcription)")

    # Gemini key
    if os.getenv("GEMINI_API_KEY"):
        print("βœ… Gemini API key loaded (analysis)")
    else:
        print("❌ No Gemini API key found (required for analysis)")

    # Overall readiness: both keys must be present.
    if os.getenv("OPENAI_API_KEY") and os.getenv("GEMINI_API_KEY"):
        print("\nβœ… All systems ready!")
    else:
        print("\n⚠️ Please add missing API keys to your .env file")

    print("\nπŸ“Œ Launching application...")
    app.queue().launch()