Spaces:
Sleeping
Sleeping
"""
French Conversation Tutor - Main Application
Practice French through natural conversation with Mr. Mistral!
"""
import gradio as gr
import numpy as np
import os
import io
import wave
import tempfile
import time
from datetime import datetime
from typing import List, Dict, Tuple
import re
import random
import shutil
from dotenv import load_dotenv
import soundfile as sf  # Added missing import

# Load environment variables from a local .env file, if present.
load_dotenv()

# Model/provider SDK imports.
from mistralai import Mistral
import google.generativeai as genai
from groq import Groq
import openai

# Load API keys from the environment.
mistral_api_key = os.environ.get("MISTRAL_API_KEY")
gemini_api_key = os.environ.get("GEMINI_API_KEY")
groq_api_key = os.environ.get("GROQ_API_KEY")
openai_api_key = os.environ.get("OPENAI_API_KEY")

# Debug: report which keys were found without leaking their values.
print(f"Mistral API key loaded: {'Yes' if mistral_api_key else 'No'}")
print(f"Gemini API key loaded: {'Yes' if gemini_api_key else 'No'}")
print(f"Groq API key loaded: {'Yes' if groq_api_key else 'No'}")
print(f"OpenAI API key loaded: {'Yes' if openai_api_key else 'No'}")

# Track which LLM is currently in use.
# BUG FIX: this default must be assigned *before* client initialization.
# The original code re-assigned current_llm = "Unknown" *after* the init
# block below, clobbering the "Mistral AI" / "Google Gemini (Fallback)"
# value chosen there.
current_llm = "Unknown"

# Initialize clients
mistral_client = None
if mistral_api_key:
    mistral_client = Mistral(api_key=mistral_api_key)
    current_llm = "Mistral AI"
elif gemini_api_key:
    genai.configure(api_key=gemini_api_key)
    current_llm = "Google Gemini (Fallback)"
else:
    raise ValueError("Neither MISTRAL_API_KEY nor GEMINI_API_KEY found in environment variables.")

# Configure Gemini as a fallback even when Mistral is the primary LLM.
if gemini_api_key and mistral_api_key:
    genai.configure(api_key=gemini_api_key)

if not groq_api_key:
    raise ValueError("GROQ_API_KEY not found in environment variables.")
groq_client = Groq(api_key=groq_api_key)

# Global list of generated temp audio files, kept so files are not deleted
# while Gradio may still be serving them to the browser.
temp_audio_files = []
def cleanup_old_audio_files():
    """Prune old temp TTS audio files tracked in ``temp_audio_files``.

    Runs only once more than 20 files are tracked, and only deletes files
    older than 60 seconds, so a file Gradio is still serving is never
    yanked out from under the player.
    """
    global temp_audio_files
    if len(temp_audio_files) <= 20:
        return
    # Candidates: everything except the 20 most recent entries.
    for file_path in temp_audio_files[:-20]:
        try:
            if not os.path.exists(file_path):
                # BUG FIX: stale entries (file already removed externally)
                # previously stayed in the list forever; drop them here.
                temp_audio_files.remove(file_path)
                continue
            file_age = datetime.now().timestamp() - os.path.getmtime(file_path)
            if file_age > 60:  # only delete files older than 60 seconds
                os.remove(file_path)
                temp_audio_files.remove(file_path)
        except OSError:
            # Best-effort cleanup: a filesystem race (file vanished between
            # the existence check and removal) must never crash the app.
            # Narrowed from the original bare ``except:``.
            pass
def get_system_prompt():
    """Return the system prompt that constrains the tutor LLM's behavior.

    The prompt forces single-sentence replies in a fixed 3-line format
    (French / pronunciation / translation) and forbids the model from
    inventing dialogue on the student's behalf. Downstream,
    validate_response_format() enforces this same 3-line contract.
    """
    return """You are Mr. Mistral, a French tutor having a conversation with ONE student.
CRITICAL: You are ONLY the tutor. The student will speak to you, and you respond ONLY to what they actually said.
NEVER:
- Create dialogue for the student
- Imagine what the student might say
- Write "You:" or "Student:" or any dialogue
- Continue the conversation by yourself
ALWAYS:
- Wait for the student's actual input
- Respond with ONE French sentence only
- Use exactly 3 lines:
French sentence
(pronunciation)
[translation]
Example - if student says "Bonjour":
Bonjour! Comment allez-vous?
(bohn-ZHOOR! koh-mahn tah-lay VOO?)
[Hello! How are you?]
ONE sentence response only. NO additional dialogue."""
def validate_response_format(response: str) -> Tuple[bool, str]:
    """Normalize an LLM reply into the 3-line French/pronunciation/translation format.

    Strips lines that look like invented student dialogue, then tries to
    identify the French sentence, the (pronunciation) line and the
    [translation] line. Returns (True, normalized_text) on success, or
    (False, original_response) when no French line can be found.
    """
    banned_markers = ('you:', 'user:', 'student:', 'me:', 'moi:')
    kept = []
    for raw in response.strip().split('\n'):
        candidate = raw.strip()
        if not candidate:
            continue
        lowered = candidate.lower()
        # Drop lines where the model role-played the student.
        if any(marker in lowered for marker in banned_markers):
            continue
        if 'what do you' in lowered or "qu'est-ce que" in lowered:
            continue
        kept.append(candidate)

    french = None
    pronunciation = None
    translation = None
    for idx, candidate in enumerate(kept):
        if '(' in candidate and ')' in candidate and pronunciation is None:
            pronunciation = candidate
            # The French sentence conventionally precedes its pronunciation.
            if idx > 0 and french is None:
                french = kept[idx - 1]
        elif '[' in candidate and ']' in candidate and translation is None:
            translation = candidate

    if french is None:
        # Fallback: first line free of any formatting punctuation.
        for candidate in kept:
            if candidate and not any(ch in candidate for ch in '()[]*'):
                french = candidate
                break

    if not french:
        return False, response

    if pronunciation is None:
        pronunciation = "(pronunciation guide not available)"
    if translation is None:
        translation = "[translation not available]"
    return True, "\n".join([french, pronunciation, translation])
def generate_scenario():
    """Pick a random conversation topic and format it as the opening scenario.

    Pure data/formatting — no LLM call. Returns a markdown string with the
    topic name, a bullet list of helper phrases, and the tutor's opening
    question (3 lines: French / pronunciation / translation). On any
    unexpected failure, returns an "Error generating scenario: ..." string
    instead of raising.
    """
    try:
        # List of diverse topics
        topics = [
            {
                "name": "Daily Routine",
                "phrases": [
                    "Je me rΓ©veille Γ ... (zhuh muh ray-vay ah) [I wake up at...]",
                    "Je prends le petit dΓ©jeuner (zhuh prahn luh puh-tee day-zhuh-nay) [I have breakfast]",
                    "Je travaille de... Γ ... (zhuh trah-vay duh... ah) [I work from... to...]",
                    "Le soir, je... (luh swahr, zhuh) [In the evening, I...]"
                ],
                "opening": "Γ quelle heure vous levez-vous le matin?\n(ah kel uhr voo luh-vay voo luh mah-tahn?)\n[What time do you get up in the morning?]"
            },
            {
                "name": "Favorite Foods",
                "phrases": [
                    "Mon plat prΓ©fΓ©rΓ© est... (mohn plah pray-fay-ray ay) [My favorite dish is...]",
                    "J'adore... (zhah-dohr) [I love...]",
                    "Je n'aime pas... (zhuh nehm pah) [I don't like...]",
                    "C'est dΓ©licieux! (say day-lee-see-uh) [It's delicious!]"
                ],
                "opening": "Quel est votre plat prΓ©fΓ©rΓ©?\n(kel ay voh-truh plah pray-fay-ray?)\n[What is your favorite dish?]"
            },
            {
                "name": "Work and Career",
                "phrases": [
                    "Je travaille comme... (zhuh trah-vay kohm) [I work as...]",
                    "Mon bureau est... (mohn bew-roh ay) [My office is...]",
                    "J'aime mon travail (zhehm mohn trah-vay) [I like my job]",
                    "Mes collègues sont... (may koh-lehg sohn) [My colleagues are...]"
                ],
                "opening": "Qu'est-ce que vous faites comme travail?\n(kess-kuh voo feht kohm trah-vay?)\n[What do you do for work?]"
            },
            {
                "name": "Music and Hobbies",
                "phrases": [
                    "J'Γ©coute... (zhay-koot) [I listen to...]",
                    "Mon chanteur prΓ©fΓ©rΓ© est... (mohn shahn-tuhr pray-fay-ray ay) [My favorite singer is...]",
                    "Je joue de... (zhuh zhoo duh) [I play (instrument)...]",
                    "Dans mon temps libre... (dahn mohn tahn lee-bruh) [In my free time...]"
                ],
                "opening": "Quel type de musique aimez-vous?\n(kel teep duh mew-zeek ay-may voo?)\n[What type of music do you like?]"
            },
            {
                "name": "Weekend Plans",
                "phrases": [
                    "Ce weekend, je vais... (suh wee-kehnd, zhuh vay) [This weekend, I'm going to...]",
                    "J'aimerais... (zheh-muh-ray) [I would like to...]",
                    "Avec mes amis... (ah-vek may zah-mee) [With my friends...]",
                    "Γa sera amusant! (sah suh-rah ah-mew-zahn) [It will be fun!]"
                ],
                "opening": "Qu'est-ce que vous faites ce weekend?\n(kess-kuh voo feht suh wee-kehnd?)\n[What are you doing this weekend?]"
            },
            {
                "name": "Family and Friends",
                "phrases": [
                    "Ma famille habite... (mah fah-mee ah-beet) [My family lives...]",
                    "J'ai... frères/soeurs (zhay... frehr/suhr) [I have... brothers/sisters]",
                    "Mon meilleur ami... (mohn may-yuhr ah-mee) [My best friend...]",
                    "Nous aimons... ensemble (noo zeh-mohn... ahn-sahm-bluh) [We like to... together]"
                ],
                "opening": "Parlez-moi de votre famille!\n(pahr-lay mwah duh voh-truh fah-mee!)\n[Tell me about your family!]"
            },
            {
                "name": "Weather and Seasons",
                "phrases": [
                    "Il fait beau/mauvais (eel feh boh/moh-veh) [The weather is nice/bad]",
                    "J'aime l'Γ©tΓ©/l'hiver (zhehm lay-tay/lee-vehr) [I like summer/winter]",
                    "Il pleut souvent (eel pluh soo-vahn) [It rains often]",
                    "Ma saison prΓ©fΓ©rΓ©e est... (mah seh-zohn pray-fay-ray ay) [My favorite season is...]"
                ],
                "opening": "Quel temps fait-il aujourd'hui?\n(kel tahn feh-teel oh-zhoor-dwee?)\n[What's the weather like today?]"
            },
            {
                "name": "Travel and Vacations",
                "phrases": [
                    "J'ai visitΓ©... (zhay vee-zee-tay) [I visited...]",
                    "Je voudrais aller Γ ... (zhuh voo-dray ah-lay ah) [I would like to go to...]",
                    "En vacances, je... (ahn vah-kahns, zhuh) [On vacation, I...]",
                    "C'Γ©tait magnifique! (say-teh mahn-yee-feek) [It was magnificent!]"
                ],
                "opening": "OΓΉ aimez-vous voyager?\n(oo ay-may voo vwah-yah-zhay?)\n[Where do you like to travel?]"
            }
        ]
        # Select a random topic
        selected_topic = random.choice(topics)
        # Build the bullet list from *all* phrases instead of hard-coding
        # indices 0-3, so topics with any number of phrases format correctly
        # (output is identical for the existing 4-phrase topics).
        phrase_lines = "\n".join(f"- {phrase}" for phrase in selected_topic['phrases'])
        # Format the scenario directly without using LLM
        scenario = f"""**Topic: {selected_topic['name']}**
**Helpful phrases:**
{phrase_lines}
{selected_topic['opening']}"""
        return scenario
    except Exception as e:
        return f"Error generating scenario: {str(e)}"
def extract_french_for_tts(text: str) -> str:
    """Return the first plain French line of *text*, for speech synthesis.

    Skips empty lines and any line containing pronunciation parentheses,
    translation brackets, or markdown asterisks. Returns "" when no such
    line exists.
    """
    for raw_line in text.strip().split('\n'):
        candidate = raw_line.strip()
        if not candidate:
            continue
        if any(symbol in candidate for symbol in ('(', '[', '*')):
            continue
        return candidate
    return ""
def process_speech_to_text(audio_tuple) -> Tuple[str, bool]:
    """Transcribe a Gradio microphone recording via Groq Whisper.

    *audio_tuple* is the (sample_rate, numpy_samples) pair produced by
    gr.Audio(type="numpy"), or None if nothing was recorded. Returns
    (transcribed_text, True) on success, or (error_message, False).
    """
    if audio_tuple is None:
        return "No audio received", False
    try:
        rate, samples = audio_tuple
        # Serialize the raw samples to an in-memory WAV for upload.
        buffer = io.BytesIO()
        sf.write(buffer, samples, rate, format='WAV')
        buffer.seek(0)
        result = groq_client.audio.transcriptions.create(
            file=("audio.wav", buffer),
            model="whisper-large-v3-turbo",
            language="fr",
        )
        return result.text, True
    except Exception as exc:
        message = str(exc)
        # Map common failure modes to friendlier messages.
        if "401" in message or "Invalid API Key" in message:
            return "Error: Invalid Groq API key. Please check your GROQ_API_KEY.", False
        if "quota" in message.lower():
            return "Error: Groq API quota exceeded. Please check your account.", False
        return f"Error in speech recognition: {message}", False
def generate_tutor_response(conversation_history: List[Dict], user_text: str) -> str:
    """Generate the tutor's next reply to *user_text*.

    Tries Mistral first, falling back to Gemini on failure. The raw LLM
    output is normalized through validate_response_format() so the UI
    always shows the 3-line French/pronunciation/translation format.
    Updates the module-level ``current_llm`` to reflect which backend
    actually answered. Returns an "Error: ..." string instead of raising.
    """
    global current_llm
    # Try Mistral first
    if mistral_client:
        try:
            messages = [
                {"role": "system", "content": get_system_prompt()}
            ]
            for msg in conversation_history:
                role = "user" if msg["role"] == "user" else "assistant"
                messages.append({"role": role, "content": msg["content"]})
            messages.append({"role": "user", "content": user_text})
            response = mistral_client.chat.complete(
                model="mistral-large-latest",
                messages=messages
            )
            raw_response = response.choices[0].message.content
            current_llm = "Mistral AI"
            is_valid, cleaned_response = validate_response_format(raw_response)
            if not is_valid:
                # Salvage at least the French line if the format check failed.
                french_text = extract_french_for_tts(raw_response)
                if french_text:
                    cleaned_response = f"{french_text}\n(pronunciation not available)\n[translation not available]"
            return cleaned_response
        except Exception as e:
            print(f"Mistral error: {str(e)}, falling back to Gemini")
            if not gemini_api_key:
                return f"Error: Mistral failed and no Gemini fallback available: {str(e)}"
    # Fallback to Gemini
    if gemini_api_key:
        try:
            genai.configure(api_key=gemini_api_key)
            model = genai.GenerativeModel("models/gemini-1.5-flash-latest")
            messages = [
                {"role": "user", "parts": [get_system_prompt()]}
            ]
            for msg in conversation_history:
                # BUG FIX: the Gemini API only accepts roles "user" and
                # "model". The history stores the tutor's turns as
                # "assistant", which Gemini rejects — map it to "model".
                gemini_role = "user" if msg["role"] == "user" else "model"
                messages.append({"role": gemini_role, "parts": [msg["content"]]})
            messages.append({"role": "user", "parts": [user_text]})
            response = model.generate_content(messages)
            raw_response = response.text
            current_llm = "Google Gemini (Fallback)"
            is_valid, cleaned_response = validate_response_format(raw_response)
            if not is_valid:
                french_text = extract_french_for_tts(raw_response)
                if french_text:
                    cleaned_response = f"{french_text}\n(pronunciation not available)\n[translation not available]"
            return cleaned_response
        except Exception as e:
            return f"Error: Both Mistral and Gemini failed: {str(e)}"
    return "Error: No LLM available"
def text_to_speech(text: str) -> str:
    """Synthesize the French line of *text* to an MP3 file.

    Tries the Groq speech endpoint first, falling back to gTTS on any
    failure. Returns the path of the written MP3 (also appended to the
    module-level ``temp_audio_files`` for later cleanup), or None when
    there is no French line to speak or both backends fail.
    """
    global temp_audio_files
    # BUG FIX: extract the French line *before* the try block. The original
    # computed it inside the try, so a failure before/during extraction made
    # the gTTS fallback in the except handler hit a NameError.
    french_text = extract_french_for_tts(text)
    if not french_text:
        return None

    def _new_audio_path() -> str:
        # BUG FIX: mkstemp replaces the original mkdtemp()-per-call scheme,
        # which leaked one directory per synthesis (cleanup only removed
        # files), and whose second-resolution timestamped names could collide.
        fd, path = tempfile.mkstemp(suffix=".mp3", prefix="tutor_audio_")
        os.close(fd)
        return path

    try:
        # NOTE(review): "tts-1"/"alloy" are OpenAI TTS identifiers — confirm
        # the Groq endpoint accepts them; if not, every call lands in the
        # gTTS fallback below (the UI banner does advertise gTTS).
        tts_response = groq_client.audio.speech.create(
            model="tts-1",  # or "tts-1-hd" for higher quality
            voice="alloy",  # or another supported voice, e.g., "echo", "fable", "onyx", "nova"
            input=french_text
        )
        temp_path = _new_audio_path()
        with open(temp_path, "wb") as f:
            f.write(tts_response.content)
        temp_audio_files.append(temp_path)
        cleanup_old_audio_files()
        return temp_path
    except Exception as e:
        error_msg = str(e)
        if "401" in error_msg or "Invalid API Key" in error_msg:
            print(f"Groq TTS Error: Invalid API key, falling back to gTTS")
        else:
            print(f"Groq TTS Error: {error_msg}, falling back to gTTS")
        # Fallback to gTTS if Groq fails
        try:
            from gtts import gTTS
            tts = gTTS(text=french_text, lang='fr')
            temp_path = _new_audio_path()
            tts.save(temp_path)
            temp_audio_files.append(temp_path)
            cleanup_old_audio_files()
            return temp_path
        except Exception as e2:
            print(f"gTTS Fallback Error: {str(e2)}")
            return None
def analyze_conversation(full_transcript: List[Dict]) -> str:
    """Produce an end-of-session review of the student's French.

    Flattens *full_transcript* into "role: content" lines and asks the LLM
    (Mistral first, Gemini as fallback) for grammar, pronunciation and
    vocabulary feedback. Updates the module-level ``current_llm``.
    Returns an "Error ..." string instead of raising.
    """
    global current_llm
    transcript_text = "\n".join(
        f"{msg['role']}: {msg['content']}" for msg in full_transcript
    )
    analysis_prompt = """Analyze this French conversation and provide:\n1. Grammar corrections with specific examples\n2. Pronunciation tips for common mistakes\n3. Vocabulary suggestions to improve fluency\n4. Overall assessment with encouragement\n\nBe specific, constructive, and encouraging. Format clearly with sections."""
    user_request = f"Analyze this conversation:\n{transcript_text}"

    # Primary backend: Mistral.
    if mistral_client:
        try:
            response = mistral_client.chat.complete(
                model="mistral-large-latest",
                messages=[
                    {"role": "system", "content": analysis_prompt},
                    {"role": "user", "content": user_request},
                ],
            )
            current_llm = "Mistral AI"
            return response.choices[0].message.content
        except Exception as err:
            print(f"Mistral error in analysis: {str(err)}, falling back to Gemini")

    # Fallback backend: Gemini.
    if gemini_api_key:
        try:
            genai.configure(api_key=gemini_api_key)
            model = genai.GenerativeModel("models/gemini-1.5-flash-latest")
            response = model.generate_content([
                {"role": "user", "parts": [analysis_prompt]},
                {"role": "user", "parts": [user_request]},
            ])
            current_llm = "Google Gemini (Fallback)"
            return response.text
        except Exception as err:
            return f"Error generating analysis: {str(err)}"

    return "Error: No LLM available for analysis"
def create_app():
    """Build and return the Gradio Blocks UI for the French tutor.

    Layout: a left sidebar (controls, current topic, post-session analysis,
    system status) and a right main column (chat transcript, progress,
    microphone input, tutor audio). Per-session conversation data lives in
    gr.State objects — not module globals — so concurrent users don't
    interfere. Sessions are capped at 3 user/tutor exchanges, after which
    an analysis is generated.
    """
    with gr.Blocks(title="French Tutor", theme=gr.themes.Soft()) as app:
        # State management (one copy per browser session)
        conversation_state = gr.State([])  # LLM chat history: [{"role", "content"}, ...]
        exchange_count = gr.State(0)       # completed exchanges so far (max 3)
        full_transcript = gr.State([])     # full transcript, kept for the final analysis
        current_scenario = gr.State("")    # scenario text prepended to the chat display
        gr.Markdown("# π«π· French Conversation Tutor")
        gr.Markdown("Practice French through natural conversation! (3 exchanges per session)")
        # Model info banner
        with gr.Row():
            model_info = gr.Markdown(
                f"**π€ Models:** LLM: {current_llm} | STT: Groq Whisper | TTS: gTTS",
                elem_id="model-info"
            )
        # Main layout with two columns
        with gr.Row():
            # Left sidebar (30% width)
            with gr.Column(scale=3):
                gr.Markdown("## π Control Panel")
                # Start/New Topic buttons
                start_btn = gr.Button("Start New Conversation", variant="primary", size="lg")
                new_topic_btn = gr.Button("π² Generate New Topic & Restart", variant="secondary", visible=False)
                # Topic display in sidebar
                with gr.Group():
                    gr.Markdown("### Current Topic")
                    sidebar_scenario = gr.Markdown("Click 'Start' to begin", elem_id="sidebar-scenario")
                # Analysis section in sidebar (hidden until the session ends)
                with gr.Group(visible=False) as analysis_group:
                    gr.Markdown("### π Your Analysis")
                    analysis_box = gr.Markdown()
                    restart_btn = gr.Button("π Start Another Conversation", variant="secondary", size="lg")
                # Status in sidebar
                status_text = gr.Textbox(
                    label="System Status",
                    value="Ready to start",
                    interactive=False
                )
            # Right main content (70% width)
            with gr.Column(scale=7):
                # Conversation interface (revealed by start_conversation)
                with gr.Column(visible=False) as conversation_ui:
                    gr.Markdown("## π¬ Conversation")
                    # Chat display - always visible
                    chat_display = gr.Markdown(value="", elem_id="chat-display")
                    # Progress indicator
                    progress_text = gr.Textbox(
                        label="Progress",
                        value="Ready to start",
                        interactive=False
                    )
                    # Audio interface
                    with gr.Row():
                        audio_input = gr.Audio(
                            sources=["microphone"],
                            type="numpy",
                            label="π€ Record your response in French"
                        )
                        record_btn = gr.Button("Send Response", variant="primary")
                    # Tutor's audio response (autoplays when a new file arrives)
                    audio_output = gr.Audio(
                        label="π Tutor's Response",
                        type="filepath",
                        autoplay=True
                    )

        def reset_conversation_states():
            """Helper to reset all conversation states"""
            # NOTE(review): defined but not wired to any event below.
            return [], 0, [], "", gr.update(value=None)

        def start_conversation(scenario_text=None):
            """Initialize a new conversation"""
            # Reset global state
            global current_llm
            print("Starting new conversation...")
            # Generate scenario if not provided
            if scenario_text is None:
                scenario = generate_scenario()
            else:
                scenario = scenario_text
            # Extract the tutor's first message for audio
            audio_path = text_to_speech(scenario)
            if audio_path is None:
                audio_path = gr.update()  # No change to audio output
            # Format the scenario for display
            scenario_display = scenario.strip()
            # Create fresh empty states
            new_conversation_state = []
            new_full_transcript = []
            new_exchange_count = 0
            print(f"Reset states - Exchange count: {new_exchange_count}, History length: {len(new_conversation_state)}")
            # 15-tuple matching the outputs= list of start_btn/restart_btn/new_topic_btn.
            return (
                gr.update(visible=True),  # conversation_ui
                scenario_display,  # sidebar_scenario
                scenario,  # current_scenario state
                "",  # clear chat_display
                new_exchange_count,  # reset exchange_count
                new_conversation_state,  # reset conversation_state
                new_full_transcript,  # reset full_transcript
                audio_path,  # play initial audio
                "Ready to start - 3 exchanges to go",  # progress
                gr.update(visible=False),  # hide analysis_group
                gr.update(visible=False),  # hide start_btn
                gr.update(visible=True),  # show new_topic_btn
                gr.update(value=None),  # clear audio input
                gr.update(interactive=True),  # enable record button
                "Ready to start"  # status text
            )

        def generate_new_topic_and_start():
            """Generate a new topic and start the conversation"""
            scenario = generate_scenario()
            # Return all the values that start_conversation returns
            result = start_conversation(scenario)
            # Update the progress text (index 8 of the 15-tuple above)
            result_list = list(result)
            result_list[8] = "New topic generated! Ready to start - 3 exchanges to go"  # Update progress text
            return tuple(result_list)

        def process_user_audio(audio, chat_text, exchanges, history, transcript, scenario):
            """Process user's audio input and generate response.

            Pipeline: guard checks -> Whisper transcription -> LLM reply ->
            TTS -> update chat text, history, transcript and counters.
            Every branch returns the same 9-tuple matching record_btn's
            outputs= list (chat, count, history, transcript, status,
            progress, audio_out, audio_in, model_info).
            """
            global current_llm
            print(f"Processing audio - Exchange count: {exchanges}, History length: {len(history) if history else 0}")
            # Ensure exchange count is an integer
            if exchanges is None:
                exchanges = 0
            # Check if conversation is complete
            if exchanges >= 3:
                return (
                    chat_text, exchanges, history, transcript,
                    "Conversation complete! Check your analysis in the sidebar.",
                    f"Exchange {exchanges} of 3 - Complete!",
                    gr.update(), gr.update(value=None),
                    gr.update()  # no change to model_info
                )
            # Ensure states are properly initialized
            if history is None:
                history = []
            if transcript is None:
                transcript = []
            if chat_text is None:
                chat_text = ""
            # Check for audio
            if audio is None:
                return (
                    chat_text, exchanges, history, transcript,
                    "Please record audio first",
                    f"Exchange {exchanges} of 3",
                    gr.update(), gr.update(value=None),
                    gr.update()  # no change to model_info
                )
            # Transcribe user's speech
            user_text, success = process_speech_to_text(audio)
            if not success:
                return (
                    chat_text, exchanges, history, transcript,
                    user_text,  # Error message
                    f"Exchange {exchanges} of 3",
                    gr.update(), gr.update(value=None),
                    gr.update()  # no change to model_info
                )
            # Update chat display with user's message
            if chat_text:
                chat_text += f"\n\n**You:** {user_text}"
            else:
                # First message - include scenario context
                chat_text = f"{scenario}\n\n---\n\n**You:** {user_text}"
            # Get tutor's response
            tutor_response = generate_tutor_response(history, user_text)
            # Generate audio for tutor's response
            audio_path = text_to_speech(tutor_response)
            if audio_path is None:
                audio_path = gr.update()  # No change to audio output
            # Update chat display with tutor's response
            chat_text += f"\n\n**Mr. Mistral:**\n{tutor_response}"
            # Update conversation history (for context)
            history.append({"role": "user", "content": user_text})
            history.append({"role": "assistant", "content": tutor_response})
            # Update transcript (for analysis)
            transcript.extend([
                {"role": "user", "content": user_text},
                {"role": "assistant", "content": tutor_response}
            ])
            # Increment exchange counter
            exchanges += 1
            # Check if this was the last exchange
            if exchanges >= 3:
                progress_msg = "Exchange 3 of 3 - Complete! Analysis ready."
            else:
                progress_msg = f"Exchange {exchanges} of 3 - Keep going!"
            # Update model info (current_llm may have changed if a fallback fired)
            model_info_text = f"**π€ Models:** LLM: {current_llm} | STT: Groq Whisper | TTS: gTTS"
            # Return updated state
            return (
                chat_text,
                exchanges,
                history,
                transcript,
                f"Great! {progress_msg}",
                progress_msg,
                audio_path,
                gr.update(value=None),  # Clear audio input properly
                gr.update(value=model_info_text)  # Update model info
            )

        def show_analysis_if_complete(exchanges, transcript):
            """Show analysis in sidebar if conversation is complete.

            Chained after process_user_audio; on the 3rd exchange it runs
            the (blocking) LLM analysis and locks the record button.
            """
            if exchanges >= 3:
                analysis = analyze_conversation(transcript)
                return (
                    gr.update(visible=True, value=analysis),  # analysis_box with content
                    gr.update(visible=True),  # analysis_group
                    gr.update(interactive=False),  # disable record button
                    gr.update(visible=False)  # hide new topic button
                )
            return (
                gr.update(),  # no change to analysis_box
                gr.update(),  # no change to analysis_group
                gr.update(interactive=True),  # keep record button enabled
                gr.update()  # no change to new topic button
            )

        # Initialize API on load
        def check_initialization():
            """Report which backends were configured at import time."""
            status_msgs = []
            if mistral_client:
                status_msgs.append("β Mistral AI ready")
            if gemini_api_key:
                status_msgs.append("β Gemini fallback ready")
            if groq_client:
                status_msgs.append("β Groq STT ready")
                status_msgs.append("β gTTS ready")
            if not status_msgs:
                return "β No APIs initialized!"
            return " | ".join(status_msgs)

        app.load(
            fn=check_initialization,
            outputs=status_text
        )
        # Start conversation
        start_btn.click(
            fn=start_conversation,
            outputs=[
                conversation_ui, sidebar_scenario, current_scenario,
                chat_display, exchange_count, conversation_state,
                full_transcript, audio_output, progress_text,
                analysis_group, start_btn, new_topic_btn,
                audio_input, record_btn, status_text
            ]
        )
        # Generate new topic and start conversation
        new_topic_btn.click(
            fn=generate_new_topic_and_start,
            outputs=[
                conversation_ui, sidebar_scenario, current_scenario,
                chat_display, exchange_count, conversation_state,
                full_transcript, audio_output, progress_text,
                analysis_group, start_btn, new_topic_btn,
                audio_input, record_btn, status_text
            ]
        )
        # Process user audio, then (chained) reveal the analysis when done
        record_btn.click(
            fn=process_user_audio,
            inputs=[
                audio_input, chat_display, exchange_count,
                conversation_state, full_transcript, current_scenario
            ],
            outputs=[
                chat_display, exchange_count, conversation_state,
                full_transcript, status_text, progress_text,
                audio_output, audio_input, model_info
            ],
            queue=False  # Disable queueing to avoid state issues
        ).then(
            fn=show_analysis_if_complete,
            inputs=[exchange_count, full_transcript],
            outputs=[analysis_box, analysis_group, record_btn, new_topic_btn],
            queue=False  # Disable queueing to avoid state issues
        )
        # Restart conversation (same reset path as the Start button)
        restart_btn.click(
            fn=start_conversation,
            outputs=[
                conversation_ui, sidebar_scenario, current_scenario,
                chat_display, exchange_count, conversation_state,
                full_transcript, audio_output, progress_text,
                analysis_group, start_btn, new_topic_btn,
                audio_input, record_btn, status_text
            ]
        )
    return app
# Launch the app when executed as a script (not on import).
if __name__ == "__main__":
    try:
        create_app().launch()
    except Exception as e:
        print(f"Failed to start app: {e}")