| import os |
| import io |
| import json |
| import asyncio |
| import numpy as np |
| import tempfile |
| import gradio as gr |
| from dotenv import load_dotenv |
| import markdown |
| from selectolax.parser import HTMLParser |
| from loguru import logger |
| from pathlib import Path |
| import edge_tts |
| import soundfile as sf |
| import sys |
| |
| from lightrag import LightRAG, QueryParam |
| from lightrag.llm.openai import gpt_4o_mini_complete, openai_embed |
|
|
| |
# Configure logging: a rotating file sink plus a concise console sink.
logger.remove()  # drop loguru's default handler so we control both formats
logger.add(
    "legal_assistant.log",
    rotation="10 MB",  # roll over to a new file once the log reaches 10 MB
    level="INFO",
    format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} - {message}"
)
# Console sink: write to sys.stderr directly instead of a `print` lambda.
# Loguru records already end with a newline, so print() added a blank line
# after every message, and a plain callable sink is never colorized (the
# <level> markup was stripped rather than rendered).
logger.add(sys.stderr, level="INFO", format="{time:HH:mm:ss} | <level>{level: <8}</level> | <level>{message}</level>")
|
|
| |
# Load environment variables from a local .env file, if one exists.
load_dotenv()
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")


# Fail fast at startup: the LightRAG LLM/embedding functions used below are
# OpenAI-backed and cannot work without a key.
if not OPENAI_API_KEY:
    logger.critical("OPENAI_API_KEY environment variable is required")
    raise ValueError("OPENAI_API_KEY environment variable is required")


logger.info("Using Edge TTS for audio generation and Web Speech API for recognition")
|
|
class LocalLegalRAG:
    """Thin wrapper around LightRAG for querying a local legal knowledge base.

    If LightRAG initialization fails, ``self.rag`` is left as ``None`` and
    every subsequent query returns an explanatory message instead of raising.
    """

    def __init__(self, working_dir: str = "./laws_storage"):
        """Create (or reuse) the storage directory and initialize LightRAG.

        Args:
            working_dir: Directory holding the LightRAG index/storage files.
        """
        self.working_dir = Path(working_dir)
        # parents=True so a nested path (e.g. ./data/laws_storage) works
        # instead of raising FileNotFoundError on a missing parent.
        self.working_dir.mkdir(parents=True, exist_ok=True)

        logger.info(f"Initializing LegalRAG with working_dir: {working_dir}")
        try:
            self.rag = LightRAG(
                working_dir=str(self.working_dir),
                llm_model_func=gpt_4o_mini_complete,
                embedding_func=openai_embed,
            )
            logger.success("LocalLegalRAG initialized successfully")
        except Exception as e:
            # Degrade gracefully: callers check self.rag before querying.
            logger.error(f"Failed to initialize LightRAG: {e}")
            self.rag = None

    async def query(self, query: str, mode: str = "mix") -> str:
        """Query the local RAG system.

        Args:
            query: The user's legal question.
            mode: LightRAG retrieval mode ("mix", "local", "global", "naive").

        Returns:
            The answer text, or an error message string on failure.
        """
        if not self.rag:
            return "RAG system not initialized properly."

        try:
            # Wrap the raw question in a system-style instruction so the LLM
            # answers as a Ghanaian-law specialist with citations.
            custom_prompt = f"""As an expert legal assistant specializing in Ghanaian law, please provide accurate, detailed responses with specific legal citations when available. Format your responses clearly with relevant legal provisions, interpretations, and practical implications.

Question: {query}"""

            result = await self.rag.aquery(
                custom_prompt,
                param=QueryParam(mode=mode)
            )
            return str(result)
        except Exception as e:
            logger.error(f"Query error: {e}")
            return f"Query failed: {str(e)}"
|
|
| |
# Build the module-level RAG singleton at import time so the Gradio handlers
# can use it. If construction raises, fall back to None; handlers check for
# this and report the system as unavailable instead of crashing.
try:
    local_rag = LocalLegalRAG()
    logger.info("Local RAG system ready")
except Exception as e:
    logger.error(f"Failed to initialize local RAG: {e}")
    local_rag = None
|
|
| |
def format_response(text):
    """Render markdown text as HTML for display, enabling table support
    only when the text appears to contain a markdown table."""
    try:
        has_table = '|' in text and '-|' in text
        extensions = ['tables'] if has_table else []
        return markdown.markdown(text, extensions=extensions)
    except Exception as e:
        logger.exception(f"Error formatting text: {e}")
        return text
|
|
def clean_text_for_speech(text):
    """Strip markdown/HTML markup from *text*, returning plain prose for TTS."""
    try:
        rendered = markdown.markdown(text)
        parsed = HTMLParser(rendered)
        if parsed.body is None:
            # No <body> produced; fall back to the raw input.
            return text
        return parsed.body.text(separator=" ", strip=True)
    except Exception as e:
        logger.exception(f"Error cleaning text for speech: {e}")
        return text
|
|
async def get_legal_response_local(query, mode="mix"):
    """Fetch an answer for *query* from the local LightRAG knowledge base."""
    if not local_rag:
        return "Local RAG system not available. Please check the initialization."

    try:
        logger.debug(f"Fetching response from local RAG (mode: {mode})")
        result = await local_rag.query(query, mode)
        logger.debug("Response from local RAG fetched successfully")
    except Exception as e:
        logger.error(f"Error querying local RAG: {e}")
        return f"I apologize, but I couldn't retrieve information from the local knowledge base. Error: {str(e)}"
    return result
|
|
async def text_to_speech_edge(text, voice="en-GB-SoniaNeural"):
    """Convert text to speech with Edge TTS; return the audio file path, or
    None on failure.

    Fixes two defects of the previous version:
    - a unique temp file is created per call (the old fixed filename meant
      concurrent requests overwrote each other's audio);
    - the suffix is ``.mp3`` to match Edge TTS's actual default output
      format (the old ``.wav`` name mislabeled MP3 data).
    """
    try:
        logger.info("Converting response to speech with Edge TTS")
        clean_text = clean_text_for_speech(text)

        # Keep the request within a safe length for the TTS service.
        if len(clean_text) > 3000:
            clean_text = clean_text[:2997] + "..."

        communicate = edge_tts.Communicate(clean_text, voice)

        # delete=False so the file survives the context manager and Gradio
        # can serve it afterwards; we only need the unique path here.
        with tempfile.NamedTemporaryFile(
            prefix="response_audio_", suffix=".mp3", delete=False
        ) as tmp:
            audio_path = tmp.name

        await communicate.save(audio_path)

        logger.debug(f"Speech synthesis completed, saved to {audio_path}")
        return audio_path

    except Exception as e:
        logger.exception(f"Error converting text to speech: {e}")
        return None
|
|
def text_to_speech(text, voice="en-GB-SoniaNeural"):
    """Sync wrapper for text_to_speech_edge.

    Uses asyncio.run(), which creates and tears down a fresh event loop per
    call. The previous asyncio.get_event_loop() juggling is deprecated since
    Python 3.10 and leaked loops between calls.
    """
    return asyncio.run(text_to_speech_edge(text, voice))
|
|
| |
# Display name -> Edge TTS voice identifier, offered in the voice dropdown.
EDGE_VOICES = {
    "British Female (Sonia)": "en-GB-SoniaNeural",
    "British Male (Ryan)": "en-GB-RyanNeural",
    "US Female (Aria)": "en-US-AriaNeural",
    "US Male (Guy)": "en-US-GuyNeural",
    "Nigerian Female (Ezinne)": "en-NG-EzinneNeural",
    "Nigerian Male (Abeo)": "en-NG-AbeoNeural",
    "South African Female (Leah)": "en-ZA-LeahNeural",
    "South African Male (Luke)": "en-ZA-LukeNeural"
}
|
|
def get_mode_value(mode_text):
    """Map a UI display label to its LightRAG mode code ("mix" by default)."""
    return {
        "Mix (recommended)": "mix",
        "Local (specific entities)": "local",
        "Global (broad concepts)": "global",
        "Naive (simple search)": "naive",
    }.get(mode_text, "mix")
|
|
def update_transcription(transcribed_text, query_input):
    """Gradio callback: forward transcribed speech into the text input.

    ``query_input`` is required by the callback signature but intentionally
    unused — the transcription always wins.
    """
    return transcribed_text
|
|
def process_transcribed_query(query_text, mode_text, voice_selection, audio_enabled=True):
    """Validate transcribed text, then delegate to process_query.

    Guards against ``None`` (Gradio passes None for an untouched Textbox),
    which previously crashed on ``.strip()``.

    Returns:
        (response_html, audio_filepath_or_None)
    """
    if not query_text or not query_text.strip():
        return "Please provide a question via speech or text.", None

    return process_query(query_text, mode_text, voice_selection, audio_enabled)
|
|
def process_query(query, mode_text, voice_selection, audio_enabled=True):
    """Process a legal question end-to-end: RAG lookup, HTML formatting,
    and optional TTS audio.

    Args:
        query: The user's question (may be None/empty; validated here).
        mode_text: Display label for the query mode (see get_mode_value).
        voice_selection: Display label for the Edge TTS voice.
        audio_enabled: When True, also synthesize an audio response.

    Returns:
        (response_html, audio_filepath_or_None)
    """
    # Guard None as well as empty/whitespace: Gradio passes None for an
    # untouched Textbox, which previously crashed on .strip().
    if not query or not query.strip():
        return "Please enter a query.", None

    if not local_rag:
        return "Local RAG system not available. Please check the configuration.", None

    mode = get_mode_value(mode_text)

    try:
        logger.info(f"Processing query with {mode}: {query[:50]}...")
        # asyncio.run creates a fresh event loop per call, replacing the
        # deprecated get_event_loop()/new_event_loop() juggling.
        response = asyncio.run(get_legal_response_local(query, mode))
        formatted_response = format_response(response)

        # Audio is best-effort: a TTS failure must not lose the text answer.
        audio_data = None
        if audio_enabled:
            try:
                voice_code = EDGE_VOICES.get(voice_selection, "en-GB-SoniaNeural")
                audio_data = text_to_speech(response, voice_code)
                logger.info("Audio generated successfully")
            except Exception as e:
                logger.exception(f"Failed to generate audio: {e}")

        return formatted_response, audio_data
    except Exception as e:
        logger.exception(f"Error in process_query: {e}")
        return f"An error occurred: {str(e)}", None
|
|
| |
# Inline CSS injected into the Gradio Blocks app: spacing for the mode and
# voice selectors, a highlighted panel for the speech-input group, and basic
# borders for markdown-rendered tables in responses.
custom_css = """
.mode-selector {
    margin-bottom: 20px;
}
.voice-selector {
    margin-bottom: 15px;
}
.speech-input {
    background: linear-gradient(90deg, #f0f8ff, #e6f3ff);
    border-radius: 8px;
    padding: 10px;
}
table {
    border-collapse: collapse;
    width: 100%;
    margin: 15px 0;
}
th, td {
    border: 1px solid #ddd;
    padding: 8px;
    text-align: left;
}
th {
    background-color: #f7f9fc;
}
"""
|
|
| |
# Gradio UI: left column holds inputs (mode, voice/text question entry,
# voice options), right column shows the HTML answer and audio playback.
with gr.Blocks(title="Local Legal Assistant", theme=gr.themes.Soft(), css=custom_css) as demo:
    gr.Markdown("# Ese - Ghana's AI Legal Assistant")
    gr.Markdown("Ask questions about Ghanaian laws using voice or text with your local knowledge base.")

    with gr.Row():
        with gr.Column(scale=3):
            # Retrieval mode; display labels are mapped to LightRAG mode
            # codes by get_mode_value().
            mode_selector = gr.Radio(
                label="Select Query Mode",
                choices=[
                    "Mix (recommended)",
                    "Local (specific entities)",
                    "Global (broad concepts)",
                    "Naive (simple search)"
                ],
                value="Mix (recommended)",
                container=True,
                elem_classes="mode-selector"
            )

            # Two voice-input paths: a recorded-audio component (for
            # browsers without the Web Speech API) and a textbox filled by
            # the browser-side speech recognition wired up in demo.load().
            with gr.Group(elem_classes="speech-input"):
                gr.Markdown("### 🎤 Voice Input")

                audio_input = gr.Audio(
                    label="Record question (Safari/Mac users)",
                    sources=["microphone"],
                    type="filepath"
                )

                speech_input = gr.Textbox(
                    label="Or use Web Speech (Chrome/Edge)",
                    placeholder="Click the microphone and speak...",
                    lines=2,
                    interactive=True
                )

                # NOTE(review): transcribe_btn and audio_input are not wired
                # to any handler below — recorded audio is never transcribed.
                with gr.Row():
                    speech_btn = gr.Button("🎤 Start Speaking", variant="secondary")
                    transcribe_btn = gr.Button("🎧 Transcribe Audio", variant="secondary")

            # Plain typed-question fallback.
            query_input = gr.Textbox(
                label="Or type your legal question",
                placeholder="Enter your legal question here...",
                lines=3
            )

            with gr.Row():
                submit_btn = gr.Button("Submit", variant="primary")
                clear_btn = gr.Button("Clear")

            # TTS output controls.
            audio_toggle = gr.Checkbox(
                label="Enable speech output",
                value=True
            )

            voice_selector = gr.Dropdown(
                label="Select Voice",
                choices=list(EDGE_VOICES.keys()),
                value="British Female (Sonia)",
                visible=True,
                elem_classes="voice-selector"
            )

        with gr.Column(scale=4):
            response_output = gr.HTML(label="Response")
            audio_output = gr.Audio(
                label="Audio Response",
                type="filepath"
            )

    # Browser-side glue: on page load, attach the Web Speech API to the
    # "Start Speaking" button and pipe recognized text into speech_input.
    # The setTimeout waits for Gradio to finish rendering its DOM.
    demo.load(
        None,
        None,
        None,
        js="""
        function() {
            setTimeout(() => {
                // Find elements by looking for button text
                const buttons = Array.from(document.querySelectorAll('button'));
                const speechBtn = buttons.find(btn => btn.textContent.includes('Start Speaking'));
                const speechInput = document.querySelector('textarea[placeholder*="microphone"]');

                if (!speechBtn || !speechInput) {
                    console.log('Speech elements not found');
                    return;
                }

                if ('webkitSpeechRecognition' in window || 'SpeechRecognition' in window) {
                    const SpeechRecognition = window.SpeechRecognition || window.webkitSpeechRecognition;
                    const recognition = new SpeechRecognition();

                    recognition.continuous = false;
                    recognition.interimResults = false;
                    recognition.lang = 'en-US';

                    let isListening = false;

                    recognition.onstart = () => {
                        isListening = true;
                        speechBtn.textContent = '🎤 Listening...';
                        speechBtn.style.backgroundColor = '#ff4444';
                        speechBtn.style.color = 'white';
                    };

                    recognition.onresult = (event) => {
                        const transcript = event.results[0][0].transcript;
                        speechInput.value = transcript;
                        speechInput.dispatchEvent(new Event('input', { bubbles: true }));
                        speechInput.dispatchEvent(new Event('change', { bubbles: true }));
                    };

                    recognition.onend = () => {
                        isListening = false;
                        speechBtn.textContent = '🎤 Start Speaking';
                        speechBtn.style.backgroundColor = '';
                        speechBtn.style.color = '';
                    };

                    recognition.onerror = (event) => {
                        console.error('Speech error:', event.error);
                        isListening = false;
                        speechBtn.textContent = '🎤 Error - Try Again';
                        speechBtn.style.backgroundColor = '';
                        speechBtn.style.color = '';
                    };

                    speechBtn.addEventListener('click', (e) => {
                        e.preventDefault();
                        e.stopPropagation();
                        if (!isListening) {
                            try {
                                recognition.start();
                            } catch (err) {
                                console.error('Recognition start error:', err);
                                if (err.name === 'NotAllowedError') {
                                    speechBtn.textContent = '🎤 Permission Denied';
                                    alert('Please allow microphone access in browser settings and refresh');
                                }
                            }
                        }
                    });
                } else {
                    speechBtn.textContent = '🎤 Not Supported';
                    speechBtn.disabled = true;
                }
            }, 2000);
        }
        """
    )

    # Prefer the Web Speech transcription when present; otherwise use the
    # typed question.
    # NOTE(review): Gradio may pass None for untouched Textboxes, in which
    # case .strip() would raise AttributeError here — confirm against the
    # Gradio version in use.
    def handle_submit_speech(speech_text, query_text, mode, voice, audio_enabled):

        final_query = speech_text.strip() if speech_text.strip() else query_text.strip()
        return process_transcribed_query(final_query, mode, voice, audio_enabled)

    # Reset both question inputs and the audio player.
    def handle_clear():
        return "", "", None

    submit_btn.click(
        fn=handle_submit_speech,
        inputs=[speech_input, query_input, mode_selector, voice_selector, audio_toggle],
        outputs=[response_output, audio_output],
        queue=False
    )

    clear_btn.click(
        fn=handle_clear,
        inputs=[],
        outputs=[speech_input, query_input, audio_output],
        queue=False
    )

    gr.Markdown("### How to use")
    gr.Markdown(f"""
    **Voice Input:** Click 🎤 Start Speaking, ask your question, then Submit
    **Text Input:** Type directly in the text box
    **Browser Speech:** Uses your browser's built-in speech recognition (Chrome/Edge recommended)

    **Knowledge Base:** `{local_rag.working_dir if local_rag else 'Not available'}`
    """)
|
|
| |
# Script entry point: start the Gradio server.
if __name__ == "__main__":
    logger.info("Starting Ese - Ghana's Legal Assistant with Web Speech API")
    demo.launch(
        server_name="0.0.0.0",  # listen on all interfaces
        server_port=7860,
        show_error=True,  # surface handler exceptions in the UI
        ssr_mode=False,
        # NOTE(review): share=True also opens a public gradio.live tunnel in
        # addition to the LAN binding above — confirm this exposure is intended.
        share=True
    )