Spaces:
Build error
Build error
| import streamlit as st | |
| import io | |
| from io import BytesIO | |
| import pandas as pd | |
| import plotly.express as px | |
| import plotly.graph_objects as go | |
| import google.generativeai as genai | |
| from datetime import datetime | |
| import json | |
| import numpy as np | |
| from docx import Document | |
| import re | |
| from prompts import SESSION_EVALUATION_PROMPT, MI_SYSTEM_PROMPT | |
| def show_session_analysis(): | |
| st.title("MI Session Analysis Dashboard") | |
| # Initialize session state for analysis results | |
| if 'analysis_results' not in st.session_state: | |
| st.session_state.analysis_results = None | |
| if 'current_transcript' not in st.session_state: | |
| st.session_state.current_transcript = None | |
| # Main layout | |
| col1, col2 = st.columns([1, 2]) | |
| with col1: | |
| show_upload_section() | |
| with col2: | |
| if st.session_state.analysis_results: | |
| show_analysis_results() | |
| def show_upload_section(): | |
| st.header("Session Data Upload") | |
| upload_type = st.radio( | |
| "Select Input Method:", | |
| ["Audio Recording", "Video Recording", "Text Transcript", "Session Notes", "Previous Session Data"] | |
| ) | |
| if upload_type in ["Audio Recording", "Video Recording"]: | |
| file = st.file_uploader( | |
| f"Upload {upload_type}", | |
| type=["wav", "mp3", "mp4"] if upload_type == "Audio Recording" else ["mp4", "avi", "mov"] | |
| ) | |
| if file: | |
| process_media_file(file, upload_type) | |
| elif upload_type == "Text Transcript": | |
| file = st.file_uploader("Upload Transcript", type=["txt", "doc", "docx", "json"]) | |
| if file: | |
| process_text_file(file) | |
| elif upload_type == "Session Notes": | |
| show_manual_input_form() | |
| else: # Previous Session Data | |
| show_previous_sessions_selector() | |
| def process_video_file(video_file): | |
| """Process uploaded video file""" | |
| try: | |
| # Create a unique temporary file name | |
| temp_path = f"temp_video_{datetime.now().strftime('%Y%m%d_%H%M%S')}.mp4" | |
| # Save video temporarily | |
| with open(temp_path, "wb") as f: | |
| f.write(video_file.getbuffer()) | |
| # Display video | |
| st.video(temp_path) | |
| # Add transcript input | |
| transcript = st.text_area( | |
| "Enter the session transcript:", | |
| height=300, | |
| help="Paste or type the transcript of the session here." | |
| ) | |
| # Add analyze button | |
| if st.button("Analyze Transcript"): | |
| if transcript.strip(): | |
| with st.spinner('Analyzing transcript...'): | |
| analyze_session_content(transcript) | |
| else: | |
| st.warning("Please enter a transcript before analyzing.") | |
| # Clean up temporary file | |
| try: | |
| os.remove(temp_path) | |
| except: | |
| pass | |
| except Exception as e: | |
| st.error(f"Error processing video: {str(e)}") | |
| def process_audio_file(audio_file): | |
| """Process uploaded audio file""" | |
| try: | |
| # Save audio temporarily | |
| temp_path = f"temp_audio_{datetime.now().strftime('%Y%m%d_%H%M%S')}.mp3" | |
| with open(temp_path, "wb") as f: | |
| f.write(audio_file.getbuffer()) | |
| st.audio(temp_path) | |
| st.info("Audio uploaded successfully. Please provide transcript.") | |
| # Add manual transcript input | |
| transcript = st.text_area("Enter the session transcript:", height=300) | |
| # Add analyze button | |
| if st.button("Analyze Transcript"): | |
| if transcript: | |
| with st.spinner('Analyzing transcript...'): | |
| st.session_state.current_transcript = transcript | |
| analyze_session_content(transcript) | |
| else: | |
| st.warning("Please enter a transcript before analyzing.") | |
| except Exception as e: | |
| st.error(f"Error processing audio: {str(e)}") | |
| def process_media_file(file, type): | |
| st.write(f"Processing {type}...") | |
| # Add processing status | |
| status = st.empty() | |
| progress_bar = st.progress(0) | |
| try: | |
| # Read file content | |
| file_content = file.read() | |
| status.text("Generating transcript...") | |
| progress_bar.progress(50) | |
| # Generate transcript using Gemini | |
| model = genai.GenerativeModel('gemini-pro') | |
| # Convert file content to text | |
| if type == "Audio Recording": | |
| # For audio files, create a prompt that describes the audio | |
| prompt = f""" | |
| This is an audio recording of a therapy session. | |
| Please transcribe the conversation and include speaker labels where possible. | |
| Focus on capturing: | |
| 1. The therapist's questions and reflections | |
| 2. The client's responses and statements | |
| 3. Any significant pauses or non-verbal sounds | |
| """ | |
| else: # Video Recording | |
| # For video files, create a prompt that describes the video | |
| prompt = f""" | |
| This is a video recording of a therapy session. | |
| Please transcribe the conversation and include: | |
| 1. Speaker labels | |
| 2. Verbal communication | |
| 3. Relevant non-verbal cues and body language | |
| 4. Significant pauses or interactions | |
| """ | |
| # Generate transcript | |
| response = model.generate_content(prompt) | |
| transcript = response.text | |
| if transcript: | |
| st.session_state.current_transcript = transcript | |
| status.text("Analyzing content...") | |
| progress_bar.progress(80) | |
| analyze_session_content(transcript) | |
| progress_bar.progress(100) | |
| status.text("Processing complete!") | |
| except Exception as e: | |
| st.error(f"Error processing file: {str(e)}") | |
| finally: | |
| status.empty() | |
| progress_bar.empty() | |
| def get_processing_step_name(step): | |
| steps = [ | |
| "Loading media file", | |
| "Converting to audio", | |
| "Performing speech recognition", | |
| "Generating transcript", | |
| "Preparing analysis" | |
| ] | |
| return steps[step] | |
| def process_text_file(file): | |
| """Process uploaded text file""" | |
| try: | |
| # Read file content | |
| content = file.getvalue().decode("utf-8") | |
| st.session_state.current_transcript = content | |
| # Display transcript with edit option | |
| edited_transcript = st.text_area( | |
| "Review and edit transcript if needed:", | |
| value=content, | |
| height=300 | |
| ) | |
| # Add analyze button | |
| if st.button("Analyze Transcript"): | |
| with st.spinner('Analyzing transcript...'): | |
| st.session_state.current_transcript = edited_transcript | |
| analyze_session_content(edited_transcript) | |
| except Exception as e: | |
| st.error(f"Error processing file: {str(e)}") | |
| def parse_analysis_results(raw_results): | |
| """Parse the raw analysis results into structured format""" | |
| if isinstance(raw_results, dict): | |
| return raw_results # Already parsed | |
| try: | |
| # If it's a string, try to extract structured data | |
| analysis = { | |
| 'mi_adherence_score': 0, | |
| 'key_themes': [], | |
| 'technique_usage': {}, | |
| 'strengths': [], | |
| 'areas_for_improvement': [], | |
| 'session_summary': '' | |
| } | |
| # Extract score (assuming it's in format "Score: XX") | |
| score_match = re.search(r'Score:\s*(\d+)', raw_results) | |
| if score_match: | |
| analysis['mi_adherence_score'] = int(score_match.group(1)) | |
| # Extract themes (assuming they're listed after "Key Themes:") | |
| themes_match = re.search(r'Key Themes:(.*?)(?=\n\n|\Z)', raw_results, re.DOTALL) | |
| if themes_match: | |
| themes = themes_match.group(1).strip().split('\n') | |
| analysis['key_themes'] = [t.strip('- ') for t in themes if t.strip()] | |
| # Extract techniques (assuming they're listed with counts) | |
| techniques = re.findall(r'(\w+\s*\w*)\s*:\s*(\d+)', raw_results) | |
| if techniques: | |
| analysis['technique_usage'] = {t[0]: int(t[1]) for t in techniques} | |
| # Extract strengths | |
| strengths_match = re.search(r'Strengths:(.*?)(?=Areas for Improvement|\Z)', raw_results, re.DOTALL) | |
| if strengths_match: | |
| strengths = strengths_match.group(1).strip().split('\n') | |
| analysis['strengths'] = [s.strip('- ') for s in strengths if s.strip()] | |
| # Extract areas for improvement | |
| improvements_match = re.search(r'Areas for Improvement:(.*?)(?=\n\n|\Z)', raw_results, re.DOTALL) | |
| if improvements_match: | |
| improvements = improvements_match.group(1).strip().split('\n') | |
| analysis['areas_for_improvement'] = [i.strip('- ') for i in improvements if i.strip()] | |
| # Extract summary | |
| summary_match = re.search(r'Summary:(.*?)(?=\n\n|\Z)', raw_results, re.DOTALL) | |
| if summary_match: | |
| analysis['session_summary'] = summary_match.group(1).strip() | |
| return analysis | |
| except Exception as e: | |
| st.error(f"Error parsing analysis results: {str(e)}") | |
| return None | |
| def show_manual_input_form(): | |
| st.subheader("Session Details") | |
| with st.form("session_notes_form"): | |
| # Basic session information | |
| session_date = st.date_input("Session Date", datetime.now()) | |
| session_duration = st.number_input("Duration (minutes)", min_value=15, max_value=120, value=50) | |
| # Session content | |
| session_notes = st.text_area( | |
| "Session Notes", | |
| height=300, | |
| placeholder="Enter detailed session notes here..." | |
| ) | |
| # Key themes and observations | |
| key_themes = st.text_area( | |
| "Key Themes", | |
| height=100, | |
| placeholder="Enter key themes identified during the session..." | |
| ) | |
| # MI specific elements | |
| mi_techniques_used = st.multiselect( | |
| "MI Techniques Used", | |
| ["Open Questions", "Affirmations", "Reflections", "Summaries", | |
| "Change Talk", "Commitment Language", "Planning"] | |
| ) | |
| # Submit button | |
| submitted = st.form_submit_button("Analyze Session") | |
| if submitted and session_notes: | |
| # Combine all input into a structured format | |
| session_data = { | |
| 'date': session_date, | |
| 'duration': session_duration, | |
| 'notes': session_notes, | |
| 'themes': key_themes, | |
| 'techniques': mi_techniques_used | |
| } | |
| # Process the session data | |
| st.session_state.current_transcript = format_session_data(session_data) | |
| analyze_session_content(st.session_state.current_transcript) | |
| def analyze_session_content(transcript): | |
| """Analyze the session transcript using Gemini""" | |
| try: | |
| if not transcript: | |
| st.warning("Please provide a transcript for analysis.") | |
| return | |
| # Configure the model | |
| model = genai.GenerativeModel('gemini-pro') | |
| # Structured prompt for MI analysis | |
| prompt = f""" | |
| As an MI (Motivational Interviewing) expert, analyze this therapy session transcript and provide detailed feedback in the following format: | |
| === MI Adherence === | |
| Score: [Provide a score from 0-100] | |
| Strengths: | |
| - [List 3 specific strengths with examples] | |
| Areas for Growth: | |
| - [List 3 specific areas needing improvement with examples] | |
| === Technical Analysis === | |
| OARS Usage Count: | |
| - Open Questions: [number] | |
| - Affirmations: [number] | |
| - Reflections: [number] | |
| - Summaries: [number] | |
| === Client Language Analysis === | |
| Change Talk Examples: | |
| - [List 3-4 specific quotes showing change talk] | |
| Sustain Talk Examples: | |
| - [List 2-3 specific quotes showing sustain talk] | |
| Change Talk/Sustain Talk Ratio: [X:Y] | |
| === Session Flow === | |
| Key Moments: | |
| 1. [Describe key moment 1] | |
| 2. [Describe key moment 2] | |
| 3. [Describe key moment 3] | |
| Therapeutic Process: | |
| - [Describe how the session progressed] | |
| - [Note any significant shifts] | |
| === Recommendations === | |
| Priority Actions: | |
| 1. [Specific recommendation 1] | |
| 2. [Specific recommendation 2] | |
| 3. [Specific recommendation 3] | |
| Development Strategies: | |
| - [Practical strategy 1] | |
| - [Practical strategy 2] | |
| Analyze this transcript: | |
| {transcript} | |
| """ | |
| # Generate response | |
| response = model.generate_content(prompt) | |
| # Store results | |
| st.session_state.analysis_results = response.text | |
| return True | |
| except Exception as e: | |
| st.error(f"Error in analysis: {str(e)}") | |
| return False | |
| def generate_transcript(audio_content): | |
| """ | |
| Generate transcript from audio content using Google Speech-to-Text | |
| Note: This requires the Google Cloud Speech-to-Text API | |
| """ | |
| try: | |
| # Initialize Speech-to-Text client | |
| client = speech_v1.SpeechClient() | |
| # Configure audio and recognition settings | |
| audio = speech_v1.RecognitionAudio(content=audio_content) | |
| config = speech_v1.RecognitionConfig( | |
| encoding=speech_v1.RecognitionConfig.AudioEncoding.LINEAR16, | |
| sample_rate_hertz=16000, | |
| language_code="en-US", | |
| enable_automatic_punctuation=True, | |
| ) | |
| # Perform the transcription | |
| response = client.recognize(config=config, audio=audio) | |
| # Combine all transcriptions | |
| transcript = "" | |
| for result in response.results: | |
| transcript += result.alternatives[0].transcript + " " | |
| return transcript.strip() | |
| except Exception as e: | |
| st.error(f"Error in transcript generation: {str(e)}") | |
| return None | |
| def convert_video_to_audio(video_file): | |
| """ | |
| Convert video file to audio content | |
| Note: This is a placeholder - you'll need to implement actual video to audio conversion | |
| """ | |
| # Placeholder for video to audio conversion | |
| # You might want to use libraries like moviepy or ffmpeg-python | |
| st.warning("Video to audio conversion not implemented yet") | |
| return None | |
| def process_analysis_results(raw_analysis): | |
| """Process and structure the analysis results""" | |
| # Parse the raw analysis text and extract structured data | |
| sections = extract_analysis_sections(raw_analysis) | |
| # Calculate metrics | |
| metrics = calculate_mi_metrics(raw_analysis) | |
| return { | |
| "raw_analysis": raw_analysis, | |
| "structured_sections": sections, | |
| "metrics": metrics, | |
| "timestamp": datetime.now().isoformat() | |
| } | |
| def show_mi_metrics_dashboard(metrics): | |
| st.subheader("MI Performance Dashboard") | |
| col1, col2, col3, col4 = st.columns(4) | |
| with col1: | |
| show_metric_card( | |
| "MI Spirit Score", | |
| metrics.get('mi_spirit_score', 0), | |
| "0-5 scale" | |
| ) | |
| with col2: | |
| show_metric_card( | |
| "Change Talk Ratio", | |
| metrics.get('change_talk_ratio', 0), | |
| "Change vs Sustain" | |
| ) | |
| with col3: | |
| show_metric_card( | |
| "Reflection Ratio", | |
| metrics.get('reflection_ratio', 0), | |
| "Reflections/Questions" | |
| ) | |
| with col4: | |
| show_metric_card( | |
| "Overall Adherence", | |
| metrics.get('overall_adherence', 0), | |
| "Percentage" | |
| ) | |
| def show_metric_card(title, value, subtitle): | |
| st.markdown( | |
| f""" | |
| <div style="border:1px solid #ccc; padding:10px; border-radius:5px; text-align:center;"> | |
| <h3>{title}</h3> | |
| <h2>{value:.2f}</h2> | |
| <p>{subtitle}</p> | |
| </div> | |
| """, | |
| unsafe_allow_html=True | |
| ) | |
| def show_mi_adherence_analysis(results): | |
| st.subheader("MI Adherence Analysis") | |
| # OARS Implementation | |
| st.write("### OARS Implementation") | |
| show_oars_chart(results['metrics'].get('oars_metrics', {})) | |
| # MI Spirit Components | |
| st.write("### MI Spirit Components") | |
| show_mi_spirit_chart(results['metrics'].get('mi_spirit_metrics', {})) | |
| # Detailed breakdown | |
| st.write("### Detailed Analysis") | |
| st.markdown(results['structured_sections'].get('mi_adherence', '')) | |
| def show_technical_skills_analysis(results): | |
| st.subheader("Technical Skills Analysis") | |
| # Question Analysis | |
| col1, col2 = st.columns(2) | |
| with col1: | |
| show_question_type_chart(results['metrics'].get('question_metrics', {})) | |
| with col2: | |
| show_reflection_depth_chart(results['metrics'].get('reflection_metrics', {})) | |
| # Detailed analysis | |
| st.markdown(results['structured_sections'].get('technical_skills', '')) | |
| def show_client_language_analysis(results): | |
| st.subheader("Client Language Analysis") | |
| # Change Talk Timeline | |
| show_change_talk_timeline(results['metrics'].get('change_talk_timeline', [])) | |
| # Language Categories | |
| show_language_categories_chart(results['metrics'].get('language_categories', {})) | |
| # Detailed analysis | |
| st.markdown(results['structured_sections'].get('client_language', '')) | |
| def show_session_flow_analysis(results): | |
| st.subheader("Session Flow Analysis") | |
| # Session Flow Timeline | |
| show_session_flow_timeline(results['metrics'].get('session_flow', [])) | |
| # Engagement Metrics | |
| show_engagement_metrics(results['metrics'].get('engagement_metrics', {})) | |
| # Detailed analysis | |
| st.markdown(results['structured_sections'].get('session_flow', '')) | |
| def show_recommendations(results): | |
| st.subheader("Recommendations and Next Steps") | |
| col1, col2 = st.columns(2) | |
| with col1: | |
| st.write("### Strengths") | |
| strengths = results['structured_sections'].get('strengths', []) | |
| for strength in strengths: | |
| st.markdown(f"✓ {strength}") | |
| with col2: | |
| st.write("### Growth Areas") | |
| growth_areas = results['structured_sections'].get('growth_areas', []) | |
| for area in growth_areas: | |
| st.markdown(f"→ {area}") | |
| st.write("### Suggested Interventions") | |
| st.markdown(results['structured_sections'].get('suggested_interventions', '')) | |
| st.write("### Next Session Planning") | |
| st.markdown(results['structured_sections'].get('next_session_plan', '')) | |
| # Utility functions for charts and visualizations | |
| def show_oars_chart(oars_metrics): | |
| # Create OARS radar chart using plotly | |
| categories = ['Open Questions', 'Affirmations', 'Reflections', 'Summaries'] | |
| values = [ | |
| oars_metrics.get('open_questions', 0), | |
| oars_metrics.get('affirmations', 0), | |
| oars_metrics.get('reflections', 0), | |
| oars_metrics.get('summaries', 0) | |
| ] | |
| fig = go.Figure(data=go.Scatterpolar( | |
| r=values, | |
| theta=categories, | |
| fill='toself' | |
| )) | |
| fig.update_layout( | |
| polar=dict( | |
| radialaxis=dict( | |
| visible=True, | |
| range=[0, max(values) + 1] | |
| )), | |
| showlegend=False | |
| ) | |
| st.plotly_chart(fig) | |
| def save_analysis_results(): | |
| """Save analysis results to file""" | |
| if st.session_state.analysis_results: | |
| try: | |
| timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") | |
| filename = f"analysis_results_{timestamp}.json" | |
| with open(filename, "w") as f: | |
| json.dump(st.session_state.analysis_results, f, indent=4) | |
| st.success(f"Analysis results saved to {filename}") | |
| except Exception as e: | |
| st.error(f"Error saving analysis results: {str(e)}") | |
| def show_upload_section(): | |
| """Display the upload section of the dashboard""" | |
| st.subheader("Upload Session") | |
| upload_type = st.radio( | |
| "Choose input method:", | |
| ["Text Transcript", "Video Recording", "Audio Recording", "Session Notes", "Previous Sessions"] | |
| ) | |
| if upload_type == "Text Transcript": | |
| file = st.file_uploader("Upload transcript file", type=['txt', 'doc', 'docx']) | |
| if file: | |
| process_text_file(file) | |
| elif upload_type == "Video Recording": | |
| video_file = st.file_uploader("Upload video file", type=['mp4', 'mov', 'avi']) | |
| if video_file: | |
| process_video_file(video_file) | |
| elif upload_type == "Audio Recording": | |
| audio_file = st.file_uploader("Upload audio file", type=['mp3', 'wav', 'm4a']) | |
| if audio_file: | |
| process_audio_file(audio_file) | |
| elif upload_type == "Session Notes": | |
| show_manual_input_form() | |
| else: | |
| show_previous_sessions_selector() | |
| def process_text_file(file): | |
| try: | |
| if file.name.endswith('.json'): | |
| content = json.loads(file.read().decode()) | |
| transcript = extract_transcript_from_json(content) | |
| elif file.name.endswith('.docx'): | |
| doc = Document(file) | |
| transcript = '\n'.join([paragraph.text for paragraph in doc.paragraphs]) | |
| else: | |
| transcript = file.read().decode() | |
| if transcript: | |
| st.session_state.current_transcript = transcript | |
| analyze_session_content(transcript) | |
| except Exception as e: | |
| st.error(f"Error processing file: {str(e)}") | |
| def show_export_options(): | |
| st.sidebar.subheader("Export Options") | |
| if st.sidebar.button("Export Analysis Report"): | |
| save_analysis_results() | |
| report_format = st.sidebar.selectbox( | |
| "Report Format", | |
| ["PDF", "DOCX", "JSON"] | |
| ) | |
| if st.sidebar.button("Generate Report"): | |
| generate_report(report_format) | |
| def generate_report(format): | |
| """Generate analysis report in specified format""" | |
| # Add report generation logic here | |
| st.info(f"Generating {format} report... (Feature coming soon)") | |
| def show_previous_sessions_selector(): | |
| """Display selector for previous session data""" | |
| st.subheader("Previous Sessions") | |
| # Load or initialize previous sessions data | |
| if 'previous_sessions' not in st.session_state: | |
| st.session_state.previous_sessions = load_previous_sessions() | |
| if not st.session_state.previous_sessions: | |
| st.info("No previous sessions found.") | |
| return | |
| # Create session selector | |
| sessions = st.session_state.previous_sessions | |
| session_dates = [session['date'] for session in sessions] | |
| selected_date = st.selectbox( | |
| "Select Session Date:", | |
| session_dates, | |
| format_func=lambda x: x.strftime("%Y-%m-%d %H:%M") | |
| ) | |
| # Show selected session data | |
| if selected_date: | |
| selected_session = next( | |
| (session for session in sessions if session['date'] == selected_date), | |
| None | |
| ) | |
| if selected_session: | |
| st.session_state.current_transcript = selected_session['transcript'] | |
| analyze_session_content(selected_session['transcript']) | |
| def load_previous_sessions(): | |
| """Load previous session data from storage""" | |
| try: | |
| # Initialize empty list for sessions | |
| sessions = [] | |
| # Here you would typically load from your database or file storage | |
| # For demonstration, we'll create some sample data | |
| sample_sessions = [ | |
| { | |
| 'date': datetime.now(), | |
| 'transcript': "Sample transcript 1...", | |
| 'analysis': "Sample analysis 1..." | |
| }, | |
| { | |
| 'date': datetime.now(), | |
| 'transcript': "Sample transcript 2...", | |
| 'analysis': "Sample analysis 2..." | |
| } | |
| ] | |
| return sample_sessions | |
| except Exception as e: | |
| st.error(f"Error loading previous sessions: {str(e)}") | |
| return [] | |
| def format_session_data(session_data): | |
| """Format session data into analyzable transcript""" | |
| formatted_text = f""" | |
| Session Date: {session_data['date']} | |
| Duration: {session_data['duration']} minutes | |
| SESSION NOTES: | |
| {session_data['notes']} | |
| KEY THEMES: | |
| {session_data['themes']} | |
| MI TECHNIQUES USED: | |
| {', '.join(session_data['techniques'])} | |
| """ | |
| return formatted_text | |
| def show_analysis_results(): | |
| """Display the analysis results in organized tabs""" | |
| if 'analysis_results' not in st.session_state or not st.session_state.analysis_results: | |
| st.info("Please analyze a transcript first.") | |
| return | |
| results = st.session_state.analysis_results | |
| # Create tabs | |
| tabs = st.tabs([ | |
| "MI Adherence", | |
| "Technical Skills", | |
| "Client Language", | |
| "Session Flow", | |
| "Recommendations" | |
| ]) | |
| # MI Adherence Tab | |
| with tabs[0]: | |
| st.subheader("MI Adherence Analysis") | |
| # Extract score | |
| score_match = re.search(r'Score:\s*(\d+)', results) | |
| if score_match: | |
| score = int(score_match.group(1)) | |
| # Create score gauge | |
| fig = go.Figure(go.Indicator( | |
| mode="gauge+number", | |
| value=score, | |
| domain={'x': [0, 1], 'y': [0, 1]}, | |
| gauge={ | |
| 'axis': {'range': [0, 100]}, | |
| 'bar': {'color': "rgb(26, 118, 255)"}, | |
| 'steps': [ | |
| {'range': [0, 33], 'color': "lightgray"}, | |
| {'range': [33, 66], 'color': "gray"}, | |
| {'range': [66, 100], 'color': "darkgray"} | |
| ] | |
| } | |
| )) | |
| st.plotly_chart(fig) | |
| # Display strengths and areas for growth | |
| col1, col2 = st.columns(2) | |
| with col1: | |
| st.subheader("Strengths") | |
| strengths = re.findall(r'Strengths:\n((?:- .*\n)*)', results) | |
| if strengths: | |
| for strength in strengths[0].strip().split('\n'): | |
| if strength.startswith('- '): | |
| st.markdown(f"✅ {strength[2:]}") | |
| with col2: | |
| st.subheader("Areas for Growth") | |
| growth = re.findall(r'Areas for Growth:\n((?:- .*\n)*)', results) | |
| if growth: | |
| for area in growth[0].strip().split('\n'): | |
| if area.startswith('- '): | |
| st.markdown(f"🔄 {area[2:]}") | |
| # Technical Skills Tab | |
| with tabs[1]: | |
| st.subheader("OARS Technique Analysis") | |
| # Extract OARS counts | |
| oars_pattern = r'OARS Usage Count:\n- Open Questions: (\d+)\n- Affirmations: (\d+)\n- Reflections: (\d+)\n- Summaries: (\d+)' | |
| oars_match = re.search(oars_pattern, results) | |
| if oars_match: | |
| open_q = int(oars_match.group(1)) | |
| affirm = int(oars_match.group(2)) | |
| reflect = int(oars_match.group(3)) | |
| summ = int(oars_match.group(4)) | |
| # Create bar chart | |
| fig = go.Figure(data=[ | |
| go.Bar( | |
| x=['Open Questions', 'Affirmations', 'Reflections', 'Summaries'], | |
| y=[open_q, affirm, reflect, summ], | |
| marker_color=['#1f77b4', '#ff7f0e', '#2ca02c', '#d62728'] | |
| ) | |
| ]) | |
| fig.update_layout( | |
| title="OARS Techniques Usage", | |
| xaxis_title="Technique Type", | |
| yaxis_title="Frequency", | |
| showlegend=False, | |
| height=400 | |
| ) | |
| st.plotly_chart(fig) | |
| # Display detailed breakdown | |
| col1, col2 = st.columns(2) | |
| with col1: | |
| st.markdown("### Technique Counts") | |
| st.markdown(f"🔹 **Open Questions:** {open_q}") | |
| st.markdown(f"🔹 **Affirmations:** {affirm}") | |
| st.markdown(f"🔹 **Reflections:** {reflect}") | |
| st.markdown(f"🔹 **Summaries:** {summ}") | |
| with col2: | |
| # Calculate total and percentages | |
| total = open_q + affirm + reflect + summ | |
| st.markdown("### Technique Distribution") | |
| st.markdown(f"🔸 **Open Questions:** {(open_q/total*100):.1f}%") | |
| st.markdown(f"🔸 **Affirmations:** {(affirm/total*100):.1f}%") | |
| st.markdown(f"🔸 **Reflections:** {(reflect/total*100):.1f}%") | |
| st.markdown(f"🔸 **Summaries:** {(summ/total*100):.1f}%") | |
| # Add reflection-to-question ratio | |
| st.markdown("### Key Metrics") | |
| if open_q > 0: | |
| r_to_q = reflect / open_q | |
| st.metric( | |
| label="Reflection-to-Question Ratio", | |
| value=f"{r_to_q:.2f}", | |
| help="Target ratio is 2:1 or higher" | |
| ) | |
| # Add MI best practice guidelines | |
| st.markdown("### MI Best Practices") | |
| st.info(""" | |
| 📌 **Ideal OARS Distribution:** | |
| - Reflections should exceed questions (2:1 ratio) | |
| - Regular use of affirmations (at least 1-2 per session) | |
| - Strategic use of summaries at transition points | |
| - Open questions > 70% of all questions | |
| """) | |
| else: | |
| st.warning("Technical skills analysis data not found in the results.") | |
| # Client Language Tab | |
| with tabs[2]: | |
| st.subheader("Client Language Analysis") | |
| col1, col2 = st.columns(2) | |
| with col1: | |
| st.markdown("### Change Talk 🌱") | |
| change_talk = re.findall(r'Change Talk Examples:\n((?:- .*\n)*)', results) | |
| if change_talk: | |
| for talk in change_talk[0].strip().split('\n'): | |
| if talk.startswith('- '): | |
| st.markdown(f"- {talk[2:]}") | |
| with col2: | |
| st.markdown("### Sustain Talk 🔄") | |
| sustain_talk = re.findall(r'Sustain Talk Examples:\n((?:- .*\n)*)', results) | |
| if sustain_talk: | |
| for talk in sustain_talk[0].strip().split('\n'): | |
| if talk.startswith('- '): | |
| st.markdown(f"- {talk[2:]}") | |
| # Session Flow Tab | |
| with tabs[3]: | |
| st.subheader("Session Flow Analysis") | |
| # Key Moments | |
| st.markdown("### Key Moments") | |
| key_moments = re.findall(r'Key Moments:\n((?:\d\. .*\n)*)', results) | |
| if key_moments: | |
| for moment in key_moments[0].strip().split('\n'): | |
| if moment.strip(): | |
| st.markdown(f"{moment}") | |
| # Therapeutic Process | |
| st.markdown("### Therapeutic Process") | |
| process = re.findall(r'Therapeutic Process:\n((?:- .*\n)*)', results) | |
| if process: | |
| for item in process[0].strip().split('\n'): | |
| if item.startswith('- '): | |
| st.markdown(f"- {item[2:]}") | |
| # Recommendations Tab | |
| with tabs[4]: | |
| st.subheader("Recommendations") | |
| # Priority Actions | |
| st.markdown("### Priority Actions 🎯") | |
| priorities = re.findall(r'Priority Actions:\n((?:\d\. .*\n)*)', results) | |
| if priorities: | |
| for priority in priorities[0].strip().split('\n'): | |
| if priority.strip(): | |
| st.markdown(f"{priority}") | |
| # Development Strategies | |
| st.markdown("### Development Strategies 📈") | |
| strategies = re.findall(r'Development Strategies:\n((?:- .*\n)*)', results) | |
| if strategies: | |
| for strategy in strategies[0].strip().split('\n'): | |
| if strategy.startswith('- '): | |
| st.markdown(f"- {strategy[2:]}") | |
| def get_technique_description(technique): | |
| """Return description for MI techniques""" | |
| descriptions = { | |
| "Open Questions": "Questions that allow for elaboration and cannot be answered with a simple yes/no.", | |
| "Reflections": "Statements that mirror, rephrase, or elaborate on the client's speech.", | |
| "Affirmations": "Statements that recognize client strengths and acknowledge behaviors that lead to positive change.", | |
| "Summaries": "Statements that collect, link, and transition between client statements.", | |
| "Information Giving": "Providing information with permission and in response to client needs.", | |
| # Add more techniques as needed | |
| } | |
| return descriptions.get(technique, "Description not available") | |
| def create_session_timeline(timeline_data): | |
| """Create a visual timeline of the session""" | |
| if not timeline_data: | |
| st.info("Detailed timeline not available") | |
| return | |
| fig = go.Figure() | |
| # Add timeline visualization code here | |
| st.plotly_chart(fig) | |
| def get_improvement_suggestion(area): | |
| """Return specific suggestions for improvement areas""" | |
| suggestions = { | |
| "Open Questions": "Try replacing closed questions with open-ended ones. Instead of 'Did you exercise?', ask 'What kinds of physical activity have you been doing?'", | |
| "Reflections": "Practice using more complex reflections by adding meaning or emotion to what the client has said.", | |
| "Empathy": "Focus on seeing the situation from the client's perspective and verbalize your understanding.", | |
| # Add more suggestions as needed | |
| } | |
| return suggestions.get(area, "Work on incorporating this element more intentionally in your sessions.") | |
| def create_action_items(analysis): | |
| """Create specific action items based on analysis""" | |
| st.write("Based on the analysis, consider focusing on these specific actions:") | |
| # Example action items | |
| action_items = [ | |
| "Practice one new MI skill each session", | |
| "Record and review your sessions", | |
| "Focus on developing complex reflections", | |
| "Track change talk/sustain talk ratio" | |
| ] | |
| for item in action_items: | |
| st.checkbox(item) | |
| def show_relevant_resources(analysis): | |
| """Display relevant resources based on analysis""" | |
| resources = [ | |
| {"title": "MI Practice Exercises", "url": "#"}, | |
| {"title": "Reflection Templates", "url": "#"}, | |
| {"title": "Change Talk Recognition Guide", "url": "#"}, | |
| {"title": "MI Community of Practice", "url": "#"} | |
| ] | |
| for resource in resources: | |
| st.markdown(f"[{resource['title']}]({resource['url']})") | |
| def parse_analysis_response(response_text): | |
| """Parse the AI response into structured analysis results""" | |
| try: | |
| # Initialize default structure for analysis results | |
| analysis = { | |
| 'mi_adherence_score': 0.0, | |
| 'key_themes': [], | |
| 'technique_usage': {}, | |
| 'strengths': [], | |
| 'areas_for_improvement': [], | |
| 'recommendations': [], | |
| 'change_talk_instances': [], | |
| 'session_summary': "" | |
| } | |
| # Extract MI adherence score | |
| score_match = re.search(r'MI Adherence Score:\s*(\d+\.?\d*)', response_text) | |
| if score_match: | |
| analysis['mi_adherence_score'] = float(score_match.group(1)) | |
| # Extract key themes | |
| themes_section = re.search(r'Key Themes:(.*?)(?=\n\n|\Z)', response_text, re.DOTALL) | |
| if themes_section: | |
| themes = themes_section.group(1).strip().split('\n') | |
| analysis['key_themes'] = [theme.strip('- ') for theme in themes if theme.strip()] | |
| # Extract technique usage | |
| technique_section = re.search(r'Technique Usage:(.*?)(?=\n\n|\Z)', response_text, re.DOTALL) | |
| if technique_section: | |
| techniques = technique_section.group(1).strip().split('\n') | |
| for technique in techniques: | |
| if ':' in technique: | |
| name, count = technique.split(':') | |
| analysis['technique_usage'][name.strip()] = int(count.strip()) | |
| # Extract strengths | |
| strengths_section = re.search(r'Strengths:(.*?)(?=\n\n|\Z)', response_text, re.DOTALL) | |
| if strengths_section: | |
| strengths = strengths_section.group(1).strip().split('\n') | |
| analysis['strengths'] = [s.strip('- ') for s in strengths if s.strip()] | |
| # Extract areas for improvement | |
| improvements_section = re.search(r'Areas for Improvement:(.*?)(?=\n\n|\Z)', response_text, re.DOTALL) | |
| if improvements_section: | |
| improvements = improvements_section.group(1).strip().split('\n') | |
| analysis['areas_for_improvement'] = [i.strip('- ') for i in improvements if i.strip()] | |
| # Extract session summary | |
| summary_section = re.search(r'Session Summary:(.*?)(?=\n\n|\Z)', response_text, re.DOTALL) | |
| if summary_section: | |
| analysis['session_summary'] = summary_section.group(1).strip() | |
| return analysis | |
| except Exception as e: | |
| st.error(f"Error parsing analysis response: {str(e)}") | |
| return None | |
| def get_improvement_suggestion(area): | |
| """Return specific suggestions for improvement areas""" | |
| suggestions = { | |
| "open questions": "Practice replacing closed questions with open-ended ones. For example:\n- Instead of: 'Did you exercise?'\n- Try: 'What kinds of physical activity have you been doing?'", | |
| "reflections": "Work on using more complex reflections by adding meaning or emotion to what the client has said. Try to make at least two complex reflections for every simple reflection.", | |
| "empathy": "Focus on seeing the situation from the client's perspective. Take time to verbalize your understanding of their emotions and experiences.", | |
| "summaries": "Use more collecting summaries to gather key points discussed and transition summaries to move between topics.", | |
| "affirmations": "Look for opportunities to genuinely affirm client strengths and efforts, not just outcomes." | |
| } | |
| # Look for matching suggestions using partial string matching | |
| for key, value in suggestions.items(): | |
| if key in area.lower(): | |
| return value | |
| return "Focus on incorporating this element more intentionally in your sessions. Consider recording your sessions and reviewing them with a supervisor or peer." | |
| def create_gauge_chart(score): | |
| """Create a gauge chart for MI Adherence Score""" | |
| fig = go.Figure(go.Indicator( | |
| mode = "gauge+number", | |
| value = score, | |
| domain = {'x': [0, 1], 'y': [0, 1]}, | |
| title = {'text': "MI Adherence"}, | |
| gauge = { | |
| 'axis': {'range': [0, 100]}, | |
| 'bar': {'color': "darkblue"}, | |
| 'steps': [ | |
| {'range': [0, 40], 'color': "lightgray"}, | |
| {'range': [40, 70], 'color': "gray"}, | |
| {'range': [70, 100], 'color': "darkgray"} | |
| ], | |
| 'threshold': { | |
| 'line': {'color': "red", 'width': 4}, | |
| 'thickness': 0.75, | |
| 'value': 90 | |
| } | |
| } | |
| )) | |
| st.plotly_chart(fig) | |
| def create_technique_usage_chart(technique_usage): | |
| """Create a bar chart for MI technique usage""" | |
| df = pd.DataFrame(list(technique_usage.items()), columns=['Technique', 'Count']) | |
| fig = px.bar( | |
| df, | |
| x='Technique', | |
| y='Count', | |
| title='MI Technique Usage Frequency' | |
| ) | |
| fig.update_layout( | |
| xaxis_title="Technique", | |
| yaxis_title="Frequency", | |
| showlegend=False | |
| ) | |
| st.plotly_chart(fig) | |
| def extract_transcript_from_json(content): | |
| """Extract transcript from JSON content""" | |
| if isinstance(content, dict): | |
| return json.dumps(content, indent=2) | |
| return str(content) | |
| # Analysis display functions | |
| def show_mi_adherence_analysis(analysis): | |
| st.subheader("MI Adherence Analysis") | |
| st.write(analysis.get('raw_text', 'No analysis available')) | |
| def show_technical_skills_analysis(analysis): | |
| st.subheader("Technical Skills Analysis") | |
| st.write(analysis.get('raw_text', 'No analysis available')) | |
| def show_client_language_analysis(analysis): | |
| st.subheader("Client Language Analysis") | |
| st.write(analysis.get('raw_text', 'No analysis available')) | |
| def show_session_flow_analysis(analysis): | |
| st.subheader("Session Flow Analysis") | |
| st.write(analysis.get('raw_text', 'No analysis available')) | |
| def show_recommendations(analysis): | |
| st.subheader("Recommendations") | |
| st.write(analysis.get('raw_text', 'No recommendations available')) | |
| if __name__ == "__main__": | |
| show_session_analysis() |