import os
import io
import json
import asyncio
import numpy as np
import tempfile
import gradio as gr
from dotenv import load_dotenv
import markdown
from selectolax.parser import HTMLParser
from loguru import logger
from pathlib import Path
import edge_tts
import soundfile as sf
import sys
# Import LightRAG components
from lightrag import LightRAG, QueryParam
from lightrag.llm.openai import gpt_4o_mini_complete, openai_embed
# Configure loguru
logger.remove()
# File sink: rotated at 10 MB, verbose format including source location.
logger.add(
    "legal_assistant.log",
    rotation="10 MB",
    level="INFO",
    format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} - {message}"
)
# Console sink: shorter timestamp, message only (no source location).
logger.add(lambda msg: print(msg), level="INFO", format="{time:HH:mm:ss} | {level: <8} | {message}")
# Load environment variables
load_dotenv()
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
if not OPENAI_API_KEY:
    # Fail fast: the LightRAG OpenAI LLM/embedding functions require this key.
    logger.critical("OPENAI_API_KEY environment variable is required")
    raise ValueError("OPENAI_API_KEY environment variable is required")
logger.info("Using Edge TTS for audio generation and Web Speech API for recognition")
class LocalLegalRAG:
    """LightRAG-backed question answering over a local legal knowledge base.

    On construction the storage directory is created (if missing) and a
    LightRAG engine is built with OpenAI-backed LLM and embedding functions.
    If engine construction fails, ``self.rag`` is left as ``None`` and every
    query returns a fixed error message instead of raising.
    """

    def __init__(self, working_dir: str = "./laws_storage"):
        """Prepare the storage directory and build the LightRAG engine."""
        self.working_dir = Path(working_dir)
        self.working_dir.mkdir(exist_ok=True)
        logger.info(f"Initializing LegalRAG with working_dir: {working_dir}")
        try:
            self.rag = LightRAG(
                working_dir=str(self.working_dir),
                llm_model_func=gpt_4o_mini_complete,
                embedding_func=openai_embed,
            )
        except Exception as exc:
            # Degrade gracefully: queries will report the failure instead of crashing.
            logger.error(f"Failed to initialize LightRAG: {exc}")
            self.rag = None
        else:
            logger.success("LocalLegalRAG initialized successfully")

    async def query(self, query: str, mode: str = "mix") -> str:
        """Run *query* through the local RAG system and return the answer text."""
        if not self.rag:
            return "RAG system not initialized properly."
        # Wrap the user's question in a fixed expert-persona prompt.
        prompt = f"""As an expert legal assistant specializing in Ghanaian law, please provide accurate, detailed responses with specific legal citations when available. Format your responses clearly with relevant legal provisions, interpretations, and practical implications.
Question: {query}"""
        try:
            answer = await self.rag.aquery(prompt, param=QueryParam(mode=mode))
        except Exception as exc:
            logger.error(f"Query error: {exc}")
            return f"Query failed: {str(exc)}"
        return str(answer)
# Initialize the local RAG system
# Module-level singleton shared by all request handlers; None if setup fails,
# in which case handlers return a "not available" message instead of crashing.
try:
    local_rag = LocalLegalRAG()
    logger.info("Local RAG system ready")
except Exception as e:
    logger.error(f"Failed to initialize local RAG: {e}")
    local_rag = None
# Text processing functions
def format_response(text):
    """Render markdown *text* as HTML for display.

    Enables the markdown 'tables' extension only when the text looks like it
    contains a pipe table; falls back to returning the raw text on any error.
    """
    try:
        # A pipe plus a dashed separator row suggests a markdown table.
        if '|' in text and '-|' in text:
            return markdown.markdown(text, extensions=['tables'])
        return markdown.markdown(text)
    except Exception as err:
        logger.exception(f"Error formatting text: {err}")
        return text
def clean_text_for_speech(text):
    """Strip markdown/HTML formatting from *text* so it reads naturally as speech.

    Renders the markdown to HTML, then extracts plain text from the body;
    returns the original text unchanged if parsing fails or yields no body.
    """
    try:
        rendered = markdown.markdown(text)
        body = HTMLParser(rendered).body
        if body is None:
            return text
        return body.text(separator=" ", strip=True)
    except Exception as err:
        logger.exception(f"Error cleaning text for speech: {err}")
        return text
async def get_legal_response_local(query, mode="mix"):
    """Ask the module-level LightRAG instance and return its textual answer.

    Returns an apology string (never raises) if the RAG system is missing
    or the query fails.
    """
    if not local_rag:
        return "Local RAG system not available. Please check the initialization."
    try:
        logger.debug(f"Fetching response from local RAG (mode: {mode})")
        result = await local_rag.query(query, mode)
    except Exception as err:
        logger.error(f"Error querying local RAG: {err}")
        return f"I apologize, but I couldn't retrieve information from the local knowledge base. Error: {str(err)}"
    logger.debug("Response from local RAG fetched successfully")
    return result
async def text_to_speech_edge(text, voice="en-GB-SoniaNeural"):
    """Convert *text* to speech with Edge TTS and return the audio filepath.

    Returns None on any failure (TTS errors are logged, not raised).

    Fixes over the previous version:
    - every call writes to a unique temp file; the old fixed
      "response_audio.wav" name was overwritten by concurrent requests, and
    - the file carries a .mp3 suffix matching the MP3 stream edge-tts emits
      by default (it was previously mislabeled .wav).
    """
    try:
        logger.info("Converting response to speech with Edge TTS")
        clean_text = clean_text_for_speech(text)
        # Truncate text if too long (Edge TTS has limits)
        if len(clean_text) > 3000:
            clean_text = clean_text[:2997] + "..."
        # Generate audio with Edge TTS
        communicate = edge_tts.Communicate(clean_text, voice)
        # Reserve a unique temp file so concurrent users don't clobber
        # each other's audio; delete=False keeps it for Gradio to serve.
        with tempfile.NamedTemporaryFile(
            prefix="response_audio_", suffix=".mp3", delete=False
        ) as tmp:
            audio_path = tmp.name
        # Save audio to file
        await communicate.save(audio_path)
        logger.debug(f"Speech synthesis completed, saved to {audio_path}")
        return audio_path
    except Exception as e:
        logger.exception(f"Error converting text to speech: {e}")
        return None
def text_to_speech(text, voice="en-GB-SoniaNeural"):
    """Sync wrapper for text_to_speech_edge; returns the audio filepath or None.

    Uses asyncio.run(), which creates a fresh event loop, runs the coroutine,
    and closes the loop afterwards. The previous get_event_loop() dance is
    deprecated since Python 3.10, fails in worker threads that have no current
    loop, and leaked the replacement loops it created (never closed them).
    """
    return asyncio.run(text_to_speech_edge(text, voice))
# Available Edge TTS voices
# Maps the dropdown display label -> Edge TTS voice short name.
EDGE_VOICES = {
    "British Female (Sonia)": "en-GB-SoniaNeural",
    "British Male (Ryan)": "en-GB-RyanNeural",
    "US Female (Aria)": "en-US-AriaNeural",
    "US Male (Guy)": "en-US-GuyNeural",
    "Nigerian Female (Ezinne)": "en-NG-EzinneNeural",
    "Nigerian Male (Abeo)": "en-NG-AbeoNeural",
    "South African Female (Leah)": "en-ZA-LeahNeural",
    "South African Male (Luke)": "en-ZA-LukeNeural"
}
def get_mode_value(mode_text):
    """Translate a UI display label into a LightRAG query-mode code.

    Unrecognized labels fall back to "mix".
    """
    label_to_code = (
        ("Mix (recommended)", "mix"),
        ("Local (specific entities)", "local"),
        ("Global (broad concepts)", "global"),
        ("Naive (simple search)", "naive"),
    )
    for label, code in label_to_code:
        if mode_text == label:
            return code
    return "mix"
def update_transcription(transcribed_text, query_input):
    """Update the text input with transcribed speech.

    The second argument (the textbox's current value) is accepted to match a
    Gradio callback signature but is deliberately discarded: the transcription
    always replaces whatever was typed. NOTE(review): this function is not
    referenced by any event wiring visible in this file — confirm it is used.
    """
    return transcribed_text
def process_transcribed_query(query_text, mode_text, voice_selection, audio_enabled=True):
    """Validate a transcribed/typed question, then delegate to process_query.

    Returns (message, None) when the question is empty or whitespace-only.
    """
    if query_text.strip():
        return process_query(query_text, mode_text, voice_selection, audio_enabled)
    return "Please provide a question via speech or text.", None
def process_query(query, mode_text, voice_selection, audio_enabled=True):
    """Process a legal query and return (html_response, audio_filepath).

    Args:
        query: the user's question; blank input short-circuits with a prompt.
        mode_text: UI display label for the retrieval mode (see get_mode_value).
        voice_selection: display label looked up in EDGE_VOICES.
        audio_enabled: when True, also synthesize the answer to speech; audio
            failures are logged and the text answer is still returned.

    Returns a (str, str|None) tuple suitable for the Gradio outputs.
    """
    if not query.strip():
        return "Please enter a query.", None
    if not local_rag:
        return "Local RAG system not available. Please check the configuration.", None
    mode = get_mode_value(mode_text)
    try:
        logger.info(f"Processing query with {mode}: {query[:50]}...")
        # asyncio.run() replaces the previous deprecated get_event_loop()
        # juggling: it creates a fresh loop, runs the coroutine, and closes
        # the loop — works in Gradio worker threads with no current loop and
        # leaks nothing.
        response = asyncio.run(get_legal_response_local(query, mode))
        formatted_response = format_response(response)
        # Generate audio if enabled
        audio_data = None
        if audio_enabled:
            try:
                voice_code = EDGE_VOICES.get(voice_selection, "en-GB-SoniaNeural")
                audio_data = text_to_speech(response, voice_code)
                logger.info("Audio generated successfully")
            except Exception as e:
                # Best-effort: a TTS failure must not lose the text answer.
                logger.exception(f"Failed to generate audio: {e}")
        return formatted_response, audio_data
    except Exception as e:
        logger.exception(f"Error in process_query: {e}")
        return f"An error occurred: {str(e)}", None
# Custom CSS with speech recognition styling
# Injected into gr.Blocks below: styles the mode selector, voice selector,
# the highlighted speech-input panel, and markdown tables in responses.
custom_css = """
.mode-selector {
margin-bottom: 20px;
}
.voice-selector {
margin-bottom: 15px;
}
.speech-input {
background: linear-gradient(90deg, #f0f8ff, #e6f3ff);
border-radius: 8px;
padding: 10px;
}
table {
border-collapse: collapse;
width: 100%;
margin: 15px 0;
}
th, td {
border: 1px solid #ddd;
padding: 8px;
text-align: left;
}
th {
background-color: #f7f9fc;
}
"""
# Create Gradio interface
# Layout: left column = inputs (mode, voice/text question, audio settings),
# right column = rendered HTML answer plus synthesized audio playback.
with gr.Blocks(title="Local Legal Assistant", theme=gr.themes.Soft(), css=custom_css) as demo:
    gr.Markdown("# Ese - Ghana's AI Legal Assistant")
    gr.Markdown("Ask questions about Ghanaian laws using voice or text with your local knowledge base.")
    with gr.Row():
        with gr.Column(scale=3):
            # Query mode selector — labels are mapped to LightRAG modes by get_mode_value().
            mode_selector = gr.Radio(
                label="Select Query Mode",
                choices=[
                    "Mix (recommended)",
                    "Local (specific entities)",
                    "Global (broad concepts)",
                    "Naive (simple search)"
                ],
                value="Mix (recommended)",
                container=True,
                elem_classes="mode-selector"
            )
            # Speech input with Web Speech API
            with gr.Group(elem_classes="speech-input"):
                gr.Markdown("### 🎤 Voice Input")
                # Add Gradio's built-in audio input as fallback
                audio_input = gr.Audio(
                    label="Record question (Safari/Mac users)",
                    sources=["microphone"],
                    type="filepath"
                )
                # This textbox is filled by the browser-side SpeechRecognition
                # script injected in demo.load() below.
                speech_input = gr.Textbox(
                    label="Or use Web Speech (Chrome/Edge)",
                    placeholder="Click the microphone and speak...",
                    lines=2,
                    interactive=True
                )
                with gr.Row():
                    speech_btn = gr.Button("🎤 Start Speaking", variant="secondary")
                    # NOTE(review): transcribe_btn and audio_input have no
                    # handler wired below — confirm whether server-side
                    # transcription was meant to be hooked up.
                    transcribe_btn = gr.Button("🎧 Transcribe Audio", variant="secondary")
            # Text input
            query_input = gr.Textbox(
                label="Or type your legal question",
                placeholder="Enter your legal question here...",
                lines=3
            )
            with gr.Row():
                submit_btn = gr.Button("Submit", variant="primary")
                clear_btn = gr.Button("Clear")
            # Audio settings
            audio_toggle = gr.Checkbox(
                label="Enable speech output",
                value=True
            )
            voice_selector = gr.Dropdown(
                label="Select Voice",
                choices=list(EDGE_VOICES.keys()),
                value="British Female (Sonia)",
                visible=True,
                elem_classes="voice-selector"
            )
        with gr.Column(scale=4):
            response_output = gr.HTML(label="Response")
            audio_output = gr.Audio(
                label="Audio Response",
                type="filepath"
            )
    # Add JavaScript for Web Speech API: after a 2s delay (to let Gradio
    # render), it locates the "Start Speaking" button and the speech textbox
    # in the DOM, wires up the browser's SpeechRecognition, and writes the
    # transcript into the textbox (dispatching input/change so Gradio sees it).
    demo.load(
        None,
        None,
        None,
        js="""
function() {
setTimeout(() => {
// Find elements by looking for button text
const buttons = Array.from(document.querySelectorAll('button'));
const speechBtn = buttons.find(btn => btn.textContent.includes('Start Speaking'));
const speechInput = document.querySelector('textarea[placeholder*="microphone"]');
if (!speechBtn || !speechInput) {
console.log('Speech elements not found');
return;
}
if ('webkitSpeechRecognition' in window || 'SpeechRecognition' in window) {
const SpeechRecognition = window.SpeechRecognition || window.webkitSpeechRecognition;
const recognition = new SpeechRecognition();
recognition.continuous = false;
recognition.interimResults = false;
recognition.lang = 'en-US';
let isListening = false;
recognition.onstart = () => {
isListening = true;
speechBtn.textContent = '🎤 Listening...';
speechBtn.style.backgroundColor = '#ff4444';
speechBtn.style.color = 'white';
};
recognition.onresult = (event) => {
const transcript = event.results[0][0].transcript;
speechInput.value = transcript;
speechInput.dispatchEvent(new Event('input', { bubbles: true }));
speechInput.dispatchEvent(new Event('change', { bubbles: true }));
};
recognition.onend = () => {
isListening = false;
speechBtn.textContent = '🎤 Start Speaking';
speechBtn.style.backgroundColor = '';
speechBtn.style.color = '';
};
recognition.onerror = (event) => {
console.error('Speech error:', event.error);
isListening = false;
speechBtn.textContent = '🎤 Error - Try Again';
speechBtn.style.backgroundColor = '';
speechBtn.style.color = '';
};
speechBtn.addEventListener('click', (e) => {
e.preventDefault();
e.stopPropagation();
if (!isListening) {
try {
recognition.start();
} catch (err) {
console.error('Recognition start error:', err);
if (err.name === 'NotAllowedError') {
speechBtn.textContent = '🎤 Permission Denied';
alert('Please allow microphone access in browser settings and refresh');
}
}
}
});
} else {
speechBtn.textContent = '🎤 Not Supported';
speechBtn.disabled = true;
}
}, 2000);
}
"""
    )
    # Event handlers
    def handle_submit_speech(speech_text, query_text, mode, voice, audio_enabled):
        # Use speech input if available, otherwise use typed input
        final_query = speech_text.strip() if speech_text.strip() else query_text.strip()
        return process_transcribed_query(final_query, mode, voice, audio_enabled)
    def handle_clear():
        # Reset both text inputs and the audio player.
        return "", "", None
    # Submit button handles both speech and text input
    submit_btn.click(
        fn=handle_submit_speech,
        inputs=[speech_input, query_input, mode_selector, voice_selector, audio_toggle],
        outputs=[response_output, audio_output],
        queue=False
    )
    clear_btn.click(
        fn=handle_clear,
        inputs=[],
        outputs=[speech_input, query_input, audio_output],
        queue=False
    )
    gr.Markdown("### How to use")
    gr.Markdown(f"""
**Voice Input:** Click 🎤 Start Speaking, ask your question, then Submit
**Text Input:** Type directly in the text box
**Browser Speech:** Uses your browser's built-in speech recognition (Chrome/Edge recommended)
**Knowledge Base:** `{local_rag.working_dir if local_rag else 'Not available'}`
""")
# Launch the app
if __name__ == "__main__":
    logger.info("Starting Ese - Ghana's Legal Assistant with Web Speech API")
    demo.launch(
        server_name="0.0.0.0",  # listen on all interfaces
        server_port=7860,
        show_error=True,
        ssr_mode=False,
        # NOTE(review): share=True publishes a public gradio.live tunnel to
        # this app — confirm that exposure is intended.
        share=True
    )