"""Ese — Ghana's AI legal assistant.

Gradio front-end over a local LightRAG knowledge base, with Edge TTS for
spoken responses and the browser Web Speech API for voice input.
"""

import os
import io
import json
import asyncio
import numpy as np
import tempfile
import gradio as gr
from dotenv import load_dotenv
import markdown
from selectolax.parser import HTMLParser
from loguru import logger
from pathlib import Path
import edge_tts
import soundfile as sf
import sys

# Import LightRAG components
from lightrag import LightRAG, QueryParam
from lightrag.llm.openai import gpt_4o_mini_complete, openai_embed

# Configure loguru: rotating file sink plus a console sink.
logger.remove()
logger.add(
    "legal_assistant.log",
    rotation="10 MB",
    level="INFO",
    format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} - {message}",
)
# FIX: the original console sink was `lambda msg: print(msg)`, which appended a
# second newline to every record (loguru messages already end with "\n").
logger.add(
    sys.stdout,
    level="INFO",
    format="{time:HH:mm:ss} | {level: <8} | {message}",
)

# Load environment variables
load_dotenv()
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
if not OPENAI_API_KEY:
    logger.critical("OPENAI_API_KEY environment variable is required")
    raise ValueError("OPENAI_API_KEY environment variable is required")

logger.info("Using Edge TTS for audio generation and Web Speech API for recognition")


class LocalLegalRAG:
    """Thin wrapper around LightRAG configured for the local legal knowledge base."""

    def __init__(self, working_dir: str = "./laws_storage"):
        self.working_dir = Path(working_dir)
        # FIX: parents=True so a nested working_dir (e.g. "./data/laws") does
        # not raise FileNotFoundError on first run.
        self.working_dir.mkdir(parents=True, exist_ok=True)
        logger.info(f"Initializing LegalRAG with working_dir: {working_dir}")
        try:
            self.rag = LightRAG(
                working_dir=str(self.working_dir),
                llm_model_func=gpt_4o_mini_complete,
                embedding_func=openai_embed,
            )
            logger.success("LocalLegalRAG initialized successfully")
        except Exception as e:
            logger.error(f"Failed to initialize LightRAG: {e}")
            # Leave self.rag unset-able callers see None and degrade gracefully.
            self.rag = None

    async def query(self, query: str, mode: str = "mix") -> str:
        """Run *query* against the RAG index.

        Args:
            query: The user's legal question.
            mode: LightRAG retrieval mode ("mix", "local", "global" or "naive").

        Returns:
            The model's answer as a string, or a human-readable error message.
        """
        if not self.rag:
            return "RAG system not initialized properly."
        try:
            # Prompt prefix steers the LLM toward Ghanaian-law expertise;
            # the trailing space + newline before "Question:" is intentional.
            custom_prompt = f"""As an expert legal assistant specializing in Ghanaian law, please provide accurate, detailed responses with specific legal citations when available. Format your responses clearly with relevant legal provisions, interpretations, and practical implications. 
Question: {query}"""
            result = await self.rag.aquery(custom_prompt, param=QueryParam(mode=mode))
            return str(result)
        except Exception as e:
            logger.error(f"Query error: {e}")
            return f"Query failed: {str(e)}"


# Initialize the local RAG system
try:
    local_rag = LocalLegalRAG()
    logger.info("Local RAG system ready")
except Exception as e:
    logger.error(f"Failed to initialize local RAG: {e}")
    local_rag = None


# Text processing functions
def format_response(text):
    """Format text for display in HTML format."""
    try:
        # Heuristic: "|" plus "-|" strongly suggests a markdown table, which
        # needs the 'tables' extension to render as HTML.
        if '|' in text and '-|' in text:
            return markdown.markdown(text, extensions=['tables'])
        return markdown.markdown(text)
    except Exception as e:
        logger.exception(f"Error formatting text: {e}")
        return text


def clean_text_for_speech(text):
    """Strip markdown/HTML markup so the TTS engine reads plain prose."""
    try:
        html = markdown.markdown(text)
        tree = HTMLParser(html)
        return tree.body.text(separator=" ", strip=True) if tree.body else text
    except Exception as e:
        logger.exception(f"Error cleaning text for speech: {e}")
        return text


async def get_legal_response_local(query, mode="mix"):
    """Get response from local LightRAG."""
    if not local_rag:
        return "Local RAG system not available. Please check the initialization."
    try:
        logger.debug(f"Fetching response from local RAG (mode: {mode})")
        answer = await local_rag.query(query, mode)
        logger.debug("Response from local RAG fetched successfully")
        return answer
    except Exception as e:
        logger.error(f"Error querying local RAG: {e}")
        return f"I apologize, but I couldn't retrieve information from the local knowledge base. Error: {str(e)}"


async def text_to_speech_edge(text, voice="en-GB-SoniaNeural"):
    """Convert text to speech using Edge TTS and return the audio filepath.

    Returns None on failure so callers can skip audio output gracefully.
    """
    try:
        logger.info("Converting response to speech with Edge TTS")
        clean_text = clean_text_for_speech(text)
        # Truncate text if too long (Edge TTS has limits)
        if len(clean_text) > 3000:
            clean_text = clean_text[:2997] + "..."

        communicate = edge_tts.Communicate(clean_text, voice)

        # FIX: Edge TTS emits MP3 audio, but the original wrote it to a fixed
        # "response_audio.wav" path — mislabelled format, and concurrent
        # requests clobbered each other's file. Use a unique .mp3 temp file.
        with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as tmp:
            audio_path = tmp.name

        await communicate.save(audio_path)
        logger.debug(f"Speech synthesis completed, saved to {audio_path}")
        return audio_path
    except Exception as e:
        logger.exception(f"Error converting text to speech: {e}")
        return None


def _run_async(coro):
    """Run *coro* to completion on a (possibly fresh) event loop.

    Gradio handlers execute in worker threads that may not have a usable
    event loop; create one when the current loop is missing or closed.
    """
    try:
        loop = asyncio.get_event_loop()
        if loop.is_closed():
            raise RuntimeError("event loop closed")
    except RuntimeError:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
    return loop.run_until_complete(coro)


def text_to_speech(text, voice="en-GB-SoniaNeural"):
    """Sync wrapper for text_to_speech_edge."""
    return _run_async(text_to_speech_edge(text, voice))


# Available Edge TTS voices (display name -> Edge voice identifier)
EDGE_VOICES = {
    "British Female (Sonia)": "en-GB-SoniaNeural",
    "British Male (Ryan)": "en-GB-RyanNeural",
    "US Female (Aria)": "en-US-AriaNeural",
    "US Male (Guy)": "en-US-GuyNeural",
    "Nigerian Female (Ezinne)": "en-NG-EzinneNeural",
    "Nigerian Male (Abeo)": "en-NG-AbeoNeural",
    "South African Female (Leah)": "en-ZA-LeahNeural",
    "South African Male (Luke)": "en-ZA-LukeNeural",
}


def get_mode_value(mode_text):
    """Convert a UI display name into the LightRAG mode code (default "mix")."""
    mode_map = {
        "Mix (recommended)": "mix",
        "Local (specific entities)": "local",
        "Global (broad concepts)": "global",
        "Naive (simple search)": "naive",
    }
    return mode_map.get(mode_text, "mix")


def update_transcription(transcribed_text, query_input):
    """Update the text input with transcribed speech."""
    return transcribed_text


def process_transcribed_query(query_text, mode_text, voice_selection, audio_enabled=True):
    """Process the transcribed text query."""
    if not query_text.strip():
        return "Please provide a question via speech or text.", None
    return process_query(query_text, mode_text, voice_selection, audio_enabled)


def process_query(query, mode_text, voice_selection, audio_enabled=True):
    """Answer *query* and return (formatted HTML, audio filepath or None)."""
    if not query.strip():
        return "Please enter a query.", None
    if not local_rag:
        return "Local RAG system not available. Please check the configuration.", None

    mode = get_mode_value(mode_text)
    try:
        logger.info(f"Processing query with {mode}: {query[:50]}...")
        response = _run_async(get_legal_response_local(query, mode))
        formatted_response = format_response(response)

        # Generate audio if enabled; audio failure must not block the text answer.
        audio_data = None
        if audio_enabled:
            try:
                voice_code = EDGE_VOICES.get(voice_selection, "en-GB-SoniaNeural")
                audio_data = text_to_speech(response, voice_code)
                logger.info("Audio generated successfully")
            except Exception as e:
                logger.exception(f"Failed to generate audio: {e}")

        return formatted_response, audio_data
    except Exception as e:
        logger.exception(f"Error in process_query: {e}")
        return f"An error occurred: {str(e)}", None


# Custom CSS with speech recognition styling
custom_css = """
.mode-selector { margin-bottom: 20px; }
.voice-selector { margin-bottom: 15px; }
.speech-input { background: linear-gradient(90deg, #f0f8ff, #e6f3ff); border-radius: 8px; padding: 10px; }
table { border-collapse: collapse; width: 100%; margin: 15px 0; }
th, td { border: 1px solid #ddd; padding: 8px; text-align: left; }
th { background-color: #f7f9fc; }
"""

# Create Gradio interface
with gr.Blocks(title="Local Legal Assistant", theme=gr.themes.Soft(), css=custom_css) as demo:
    gr.Markdown("# Ese - Ghana's AI Legal Assistant")
    gr.Markdown("Ask questions about Ghanaian laws using voice or text with your local knowledge base.")

    with gr.Row():
        with gr.Column(scale=3):
            # Query mode selector
            mode_selector = gr.Radio(
                label="Select Query Mode",
                choices=[
                    "Mix (recommended)",
                    "Local (specific entities)",
                    "Global (broad concepts)",
                    "Naive (simple search)",
                ],
                value="Mix (recommended)",
                container=True,
                elem_classes="mode-selector",
            )

            # Speech input with Web Speech API
            with gr.Group(elem_classes="speech-input"):
                gr.Markdown("### 🎤 Voice Input")

                # NOTE(review): audio_input and transcribe_btn are not wired to
                # any handler — server-side transcription appears unimplemented.
                audio_input = gr.Audio(
                    label="Record question (Safari/Mac users)",
                    sources=["microphone"],
                    type="filepath",
                )

                speech_input = gr.Textbox(
                    label="Or use Web Speech (Chrome/Edge)",
                    placeholder="Click the microphone and speak...",
                    lines=2,
                    interactive=True,
                )

                with gr.Row():
                    speech_btn = gr.Button("🎤 Start Speaking", variant="secondary")
                    transcribe_btn = gr.Button("🎧 Transcribe Audio", variant="secondary")

            # Text input
            query_input = gr.Textbox(
                label="Or type your legal question",
                placeholder="Enter your legal question here...",
                lines=3,
            )

            with gr.Row():
                submit_btn = gr.Button("Submit", variant="primary")
                clear_btn = gr.Button("Clear")

            # Audio settings
            audio_toggle = gr.Checkbox(label="Enable speech output", value=True)
            voice_selector = gr.Dropdown(
                label="Select Voice",
                choices=list(EDGE_VOICES.keys()),
                value="British Female (Sonia)",
                visible=True,
                elem_classes="voice-selector",
            )

        with gr.Column(scale=4):
            response_output = gr.HTML(label="Response")
            audio_output = gr.Audio(label="Audio Response", type="filepath")

    # Add JavaScript for Web Speech API (runs in the browser after load)
    demo.load(
        None,
        None,
        None,
        js="""
        function() {
            setTimeout(() => {
                // Find elements by looking for button text
                const buttons = Array.from(document.querySelectorAll('button'));
                const speechBtn = buttons.find(btn => btn.textContent.includes('Start Speaking'));
                const speechInput = document.querySelector('textarea[placeholder*="microphone"]');
                if (!speechBtn || !speechInput) {
                    console.log('Speech elements not found');
                    return;
                }
                if ('webkitSpeechRecognition' in window || 'SpeechRecognition' in window) {
                    const SpeechRecognition = window.SpeechRecognition || window.webkitSpeechRecognition;
                    const recognition = new SpeechRecognition();
                    recognition.continuous = false;
                    recognition.interimResults = false;
                    recognition.lang = 'en-US';
                    let isListening = false;
                    recognition.onstart = () => {
                        isListening = true;
                        speechBtn.textContent = '🎤 Listening...';
                        speechBtn.style.backgroundColor = '#ff4444';
                        speechBtn.style.color = 'white';
                    };
                    recognition.onresult = (event) => {
                        const transcript = event.results[0][0].transcript;
                        speechInput.value = transcript;
                        speechInput.dispatchEvent(new Event('input', { bubbles: true }));
                        speechInput.dispatchEvent(new Event('change', { bubbles: true }));
                    };
                    recognition.onend = () => {
                        isListening = false;
                        speechBtn.textContent = '🎤 Start Speaking';
                        speechBtn.style.backgroundColor = '';
                        speechBtn.style.color = '';
                    };
                    recognition.onerror = (event) => {
                        console.error('Speech error:', event.error);
                        isListening = false;
                        speechBtn.textContent = '🎤 Error - Try Again';
                        speechBtn.style.backgroundColor = '';
                        speechBtn.style.color = '';
                    };
                    speechBtn.addEventListener('click', (e) => {
                        e.preventDefault();
                        e.stopPropagation();
                        if (!isListening) {
                            try {
                                recognition.start();
                            } catch (err) {
                                console.error('Recognition start error:', err);
                                if (err.name === 'NotAllowedError') {
                                    speechBtn.textContent = '🎤 Permission Denied';
                                    alert('Please allow microphone access in browser settings and refresh');
                                }
                            }
                        }
                    });
                } else {
                    speechBtn.textContent = '🎤 Not Supported';
                    speechBtn.disabled = true;
                }
            }, 2000);
        }
        """,
    )

    # Event handlers
    def handle_submit_speech(speech_text, query_text, mode, voice, audio_enabled):
        # Use speech input if available, otherwise use typed input
        final_query = speech_text.strip() if speech_text.strip() else query_text.strip()
        return process_transcribed_query(final_query, mode, voice, audio_enabled)

    def handle_clear():
        return "", "", None

    # Submit button handles both speech and text input
    submit_btn.click(
        fn=handle_submit_speech,
        inputs=[speech_input, query_input, mode_selector, voice_selector, audio_toggle],
        outputs=[response_output, audio_output],
        queue=False,
    )

    clear_btn.click(
        fn=handle_clear,
        inputs=[],
        outputs=[speech_input, query_input, audio_output],
        queue=False,
    )

    gr.Markdown("### How to use")
    gr.Markdown(f"""
    **Voice Input:** Click 🎤 Start Speaking, ask your question, then Submit
    **Text Input:** Type directly in the text box
    **Browser Speech:** Uses your browser's built-in speech recognition (Chrome/Edge recommended)
    **Knowledge Base:** `{local_rag.working_dir if local_rag else 'Not available'}`
    """)

# Launch the app
if __name__ == "__main__":
    logger.info("Starting Ese - Ghana's Legal Assistant with Web Speech API")
    demo.launch(
        server_name="0.0.0.0",
        server_port=7860,
        show_error=True,
        ssr_mode=False,
        share=True,
    )