Nimadi-Rag / app.py
pranavinani
added text file
3f24fca
#!/usr/bin/env python3
"""
Hindi RAG Voice Demo - Gradio Implementation (Groq Whisper API Version)
A streamlined voice-enabled RAG system for Hindi content using Gradio
Uses Groq Whisper API for transcription and assumes PDFs have selectable text
"""
import gradio as gr
import os
import tempfile
import time
import uuid
from datetime import datetime
import fitz # PyMuPDF
import requests
import json
import numpy as np
from sentence_transformers import SentenceTransformer
import faiss
from groq import Groq
from gtts import gTTS
import subprocess
import warnings
warnings.filterwarnings("ignore")
# Global configuration
CONFIG = {
    # Passcode that authenticate() compares user input against (PASSCODE env var; None if unset)
    'PASSCODE': os.getenv('PASSCODE'),
    # Upload size cap in bytes, enforced by process_document()
    'MAX_FILE_SIZE': 10 * 1024 * 1024, # 10MB
    # Query budget checked by process_query(); the counter is reset whenever a document is loaded
    'MAX_QUERIES_PER_SESSION': 5,
    # NOTE(review): not referenced by any code visible in this file
    'MAX_AUDIO_DURATION': 120, # 2 minutes
    # Groq API key; note it is read from the GAPI environment variable (None if unset)
    'GROQ_API_KEY': os.getenv('GAPI'),
    # Seconds of audio that transcribe_audio() keeps before sending it for transcription
    'AUDIO_CLIP_DURATION': 10, # First 10 seconds only
    # Directory scanned by get_available_books() for cover images
    'BOOK_THUMBNAILS_DIR': './book_thumbnails',
    # Directory scanned by get_available_books() for library PDF/TXT files
    'OCR_BOOKS_DIR': './ocr_books',
}
# Global session storage
# A single module-level dict: every function in this file reads and writes the same
# instance, so the state below is shared by all callers within one process.
SESSION_DATA = {
    # Set to True by authenticate() on a matching passcode
    'authenticated': False,
    # Random identifier; regenerated by reset_session()
    'session_id': str(uuid.uuid4()),
    # Number of answered queries, compared with CONFIG['MAX_QUERIES_PER_SESSION']
    'query_count': 0,
    # Text chunks of the currently loaded document (list of str)
    'document_chunks': [],
    # FAISS index built by create_embeddings() over document_chunks, or None
    'faiss_index': None,
    # Metadata of the loaded document, interpolated into the LLM prompt
    'author_name': '',
    'book_title': '',
    # Lazily created by load_models() and reused afterwards
    'embedding_model': None,
    'groq_client': None
}
# Initialize models and clients (cached)
def load_models():
    """Create the embedding model and Groq client once and cache them.

    Both objects are stored in SESSION_DATA, so repeated calls are cheap and
    simply return the cached instances.

    Returns:
        tuple: ``(embedding_model, groq_client)``. ``groq_client`` is None
        when no API key is configured.
    """
    if SESSION_DATA['embedding_model'] is None:
        print("Loading embedding model...")
        SESSION_DATA['embedding_model'] = SentenceTransformer(
            'paraphrase-multilingual-MiniLM-L12-v2'
        )

    if SESSION_DATA['groq_client'] is None:
        if CONFIG['GROQ_API_KEY']:
            print("Initializing Groq client...")
            SESSION_DATA['groq_client'] = Groq(api_key=CONFIG['GROQ_API_KEY'])
        else:
            # CONFIG['GROQ_API_KEY'] is populated from the GAPI environment
            # variable, so name that variable in the diagnostic.
            print("Warning: Groq API key not found (set the GAPI environment variable)")

    return SESSION_DATA['embedding_model'], SESSION_DATA['groq_client']
# Audio processing functions
def trim_audio_to_duration(input_path, output_path, duration=10, timeout=60):
    """Copy the first ``duration`` seconds of an audio file using ffmpeg.

    Args:
        input_path: Path of the source audio file.
        output_path: Path to write the trimmed audio to (overwritten if present).
        duration: Number of seconds to keep from the start of the input.
        timeout: Maximum number of seconds ffmpeg may run before it is killed.

    Returns:
        bool: True when ffmpeg exits successfully; False on any failure
        (ffmpeg missing, non-zero exit status, or timeout).
    """
    cmd = [
        'ffmpeg',
        '-nostdin',          # never wait for interactive input
        '-i', input_path,
        '-t', str(duration),
        '-acodec', 'copy',   # stream copy, no re-encoding
        '-y',                # overwrite output file
        output_path,
    ]
    try:
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            stdin=subprocess.DEVNULL,
            timeout=timeout,  # a stuck ffmpeg must not block the request forever
        )
    except subprocess.TimeoutExpired:
        print(f"Error trimming audio: ffmpeg timed out after {timeout} seconds")
        return False
    except Exception as e:
        print(f"Error trimming audio: {str(e)}")
        return False

    if result.returncode != 0:
        print(f"FFmpeg error: {result.stderr}")
        return False
    return True
def transcribe_audio(audio_file):
    """Transcribe the start of an audio file with the Groq Whisper API.

    The audio is first trimmed to CONFIG['AUDIO_CLIP_DURATION'] seconds; if
    trimming fails the full recording is sent instead.

    Args:
        audio_file: Path to the recorded audio, or None.

    Returns:
        str: The transcript; "" when ``audio_file`` is None; otherwise a
        message starting with "Error:" or "Transcription error:" on failure.
    """
    if audio_file is None:
        return ""
    if not CONFIG['GROQ_API_KEY'] or SESSION_DATA['groq_client'] is None:
        return "Error: Groq API key not configured"

    temp_path = None
    try:
        # Reserve a path for the trimmed clip; ffmpeg overwrites it.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as tmp_file:
            temp_path = tmp_file.name

        source_path = temp_path
        if not trim_audio_to_duration(audio_file, temp_path, CONFIG['AUDIO_CLIP_DURATION']):
            # Fall back to the untrimmed recording rather than failing the query.
            print("Warning: Could not trim audio, using full duration")
            source_path = audio_file

        with open(source_path, "rb") as file:
            transcription = SESSION_DATA['groq_client'].audio.transcriptions.create(
                file=(os.path.basename(source_path), file.read()),
                model="whisper-large-v3",
                response_format="verbose_json",
                language="hi"  # Specify Hindi language
            )
        return transcription.text
    except Exception as e:
        return f"Transcription error: {str(e)}"
    finally:
        # Always remove the temp file, including when trimming failed and the
        # original recording was used instead.
        if temp_path is not None:
            try:
                os.unlink(temp_path)
            except OSError:
                pass
def text_to_speech(text):
    """Synthesize Hindi speech for ``text`` with gTTS.

    Args:
        text: Text to speak. None, empty or whitespace-only text is ignored.

    Returns:
        str | None: Path to a temporary ``.mp3`` file, or None when there is
        nothing to speak or synthesis fails.
    """
    if not text or not text.strip():
        return None

    audio_path = None
    try:
        tts = gTTS(text=text, lang='hi', slow=False)
        # Reserve the path and close the handle before gTTS writes to it, so
        # the file is not opened twice at the same time.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as tmp_file:
            audio_path = tmp_file.name
        tts.save(audio_path)
        return audio_path
    except Exception as e:
        print(f"TTS Error: {str(e)}")
        # Do not leave an empty or partial file behind on failure.
        if audio_path is not None:
            try:
                os.unlink(audio_path)
            except OSError:
                pass
        return None
# Text extraction functions
def extract_text_from_txt(txt_path):
    """Read a text file, trying several encodings in turn.

    Args:
        txt_path: Path of the TXT file.

    Returns:
        str: The decoded file content, or a message starting with "Error"
        when the file is empty, unreadable or cannot be decoded.
    """
    # Order matters:
    # - 'utf-8-sig' decodes plain UTF-8 too and strips a leading BOM, so it
    #   must come first or the BOM would end up in the text.
    # - 'latin-1' accepts every byte sequence, so it must be the last resort;
    #   anything listed after it could never be reached.
    encodings = ['utf-8-sig', 'cp1252', 'latin-1']
    try:
        for encoding in encodings:
            try:
                with open(txt_path, 'r', encoding=encoding) as file:
                    text_content = file.read()
            except UnicodeDecodeError:
                continue

            if not text_content.strip():
                # Decoding worked; the file simply has no content.
                return "Error: TXT file is empty"

            print(f"Successfully extracted {len(text_content)} characters from TXT file using {encoding} encoding")
            return text_content

        return "Error: Could not decode TXT file with any supported encoding"
    except Exception as e:
        print(f"TXT extraction error: {str(e)}")
        return f"Error extracting text: {str(e)}"
def extract_text_from_pdf(pdf_path):
    """Extract the selectable text of every page of a PDF with PyMuPDF.

    Pages without selectable text are skipped with a warning; no OCR is done.

    Args:
        pdf_path: Path of the PDF file.

    Returns:
        str: Page texts joined in page order (each followed by a newline), or
        a message starting with "Error" when the PDF cannot be read or
        contains no selectable text.
    """
    try:
        pdf_document = fitz.open(pdf_path)
        try:
            total_pages = len(pdf_document)
            print(f"Processing PDF with {total_pages} pages...")

            page_texts = []
            for page_num in range(total_pages):
                page_text = pdf_document.load_page(page_num).get_text()
                if page_text.strip():
                    page_texts.append(page_text + "\n")
                else:
                    print(f"Warning: Page {page_num + 1} appears to have no selectable text")
        finally:
            # Release the document even if a page fails to load.
            pdf_document.close()

        text_content = "".join(page_texts)
        if not text_content.strip():
            return "Error: No selectable text found in PDF. Please ensure the PDF contains selectable text, not just images."

        print(f"Successfully extracted {len(text_content)} characters from PDF")
        return text_content
    except Exception as e:
        print(f"PDF extraction error: {str(e)}")
        return f"Error extracting text: {str(e)}"
def extract_text_from_file(file_path):
    """Dispatch text extraction on the file extension (case-insensitive).

    ``.pdf`` files go to extract_text_from_pdf and ``.txt`` files to
    extract_text_from_txt; any other extension yields an "Error: ..." string.
    """
    _, raw_suffix = os.path.splitext(file_path)
    suffix = raw_suffix.lower()

    if suffix == '.txt':
        return extract_text_from_txt(file_path)
    if suffix == '.pdf':
        return extract_text_from_pdf(file_path)
    return f"Error: Unsupported file format {suffix}. Only PDF and TXT files are supported."
def extract_metadata(text):
    """Guess the author line and book title from the start of a document.

    Only the first 25 lines are inspected (blank ones are ignored). A line
    mentioning an author keyword becomes the author; if several lines match,
    the last one wins. The first other line that is 11-99 characters long and
    has no digit among its first 20 characters becomes the title.

    Args:
        text: Full document text.

    Returns:
        tuple[str, str]: ``(author_name, book_title)``; Hindi placeholder
        strings are returned for whichever part was not found.
    """
    import re

    # English keywords must match as whole words, otherwise "by" also hits
    # words such as "Nearby" or "baby" and the title is mistaken for the author.
    english_author = re.compile(r'\b(?:author|by)\b', re.IGNORECASE)
    hindi_author_markers = ('लेखक', 'द्वारा', 'रचयिता')

    lines = [line.strip() for line in text.split('\n')[:25] if line.strip()]
    author_name = "अज्ञात लेखक"
    book_title = "अनाम पुस्तक"
    title_found = False

    for line in lines:
        if english_author.search(line) or any(marker in line for marker in hindi_author_markers):
            author_name = line
        elif (
            not title_found
            and 10 < len(line) < 100
            and not any(char.isdigit() for char in line[:20])
        ):
            # First substantial line is taken as the title.
            book_title = line
            title_found = True

    return author_name, book_title
def chunk_text(text, chunk_size=400, overlap=50):
    """Split text into chunks of whitespace-separated words that overlap.

    Args:
        text: Text to split.
        chunk_size: Maximum number of words per chunk; must be positive.
        overlap: Number of words shared by consecutive chunks; must be
            smaller than ``chunk_size``.

    Returns:
        list[str]: Chunks in document order; empty when ``text`` has no words.

    Raises:
        ValueError: If ``chunk_size`` is not positive or ``overlap`` is not
            smaller than ``chunk_size`` (the window could never advance).
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be a positive integer")
    step = chunk_size - overlap
    if step <= 0:
        raise ValueError("overlap must be smaller than chunk_size")

    words = text.split()
    chunks = []
    for start in range(0, len(words), step):
        chunks.append(' '.join(words[start:start + chunk_size]))
        # Stop once a chunk reaches the last word; a further window would only
        # repeat words already contained in this chunk.
        if start + chunk_size >= len(words):
            break
    return chunks
# Vector search functions
def create_embeddings(chunks):
    """Embed text chunks and build a FAISS inner-product index over them.

    Vectors are L2-normalized before insertion, so inner-product scores
    returned by the index are cosine similarities.

    Args:
        chunks: Non-empty list of text chunks.

    Returns:
        faiss.IndexFlatIP: Index whose row ``i`` corresponds to ``chunks[i]``.

    Raises:
        ValueError: If ``chunks`` is empty.
    """
    if not chunks:
        raise ValueError("Cannot build a search index from an empty chunk list")

    embedding_model, _ = load_models()
    embeddings = embedding_model.encode(chunks, show_progress_bar=False)

    # Cast to contiguous float32 *before* normalizing: normalize_L2 works in
    # place and the index stores float32 vectors.
    embeddings = np.ascontiguousarray(embeddings, dtype='float32')
    faiss.normalize_L2(embeddings)

    index = faiss.IndexFlatIP(embeddings.shape[1])
    index.add(embeddings)
    return index
def search_similar_chunks(query, top_k=3):
    """Return the stored chunks most similar to ``query``.

    Args:
        query: Query text.
        top_k: Maximum number of chunks to return.

    Returns:
        list[dict]: Up to ``top_k`` items with keys ``'text'`` (the chunk) and
        ``'score'`` (float similarity), in the order returned by the index.
        Empty when no document is loaded or ``top_k`` is not positive.
    """
    chunks = SESSION_DATA['document_chunks']
    index = SESSION_DATA['faiss_index']
    if index is None or not chunks:
        return []

    # Never ask for more neighbours than there are chunks.
    k = min(top_k, len(chunks))
    if k <= 0:
        return []

    embedding_model, _ = load_models()
    query_embedding = embedding_model.encode([query], show_progress_bar=False)
    # Cast to float32 before the in-place normalization, as for indexed vectors.
    query_embedding = np.ascontiguousarray(query_embedding, dtype='float32')
    faiss.normalize_L2(query_embedding)

    scores, indices = index.search(query_embedding, k)
    return [
        {'text': chunks[idx], 'score': float(score)}
        for score, idx in zip(scores[0], indices[0])
        # Skip any position that does not map to a stored chunk.
        if 0 <= idx < len(chunks)
    ]
# LLM functions
def call_groq_api(prompt, model="llama-3.1-8b-instant"):
    """Send a single-turn chat completion request to the Groq HTTP API.

    Args:
        prompt: User message content.
        model: Groq model identifier.

    Returns:
        str: The content of the first choice, or a human-readable message
        when the key is missing or the request fails. This function does not
        raise; failures are reported through the returned string.
    """
    api_key = CONFIG['GROQ_API_KEY']
    if not api_key or api_key == 'your_groq_api_key_here':
        # CONFIG['GROQ_API_KEY'] is read from the GAPI environment variable,
        # so that is the variable the operator has to set.
        return "⚠️ Groq API key not configured. Please set the GAPI environment variable."

    url = "https://api.groq.com/openai/v1/chat/completions"
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json"
    }
    data = {
        "model": model,
        "messages": [{"role": "user", "content": prompt}],
        "temperature": 0.7,
        "max_tokens": 800
    }
    try:
        response = requests.post(url, headers=headers, json=data, timeout=30)
        response.raise_for_status()
        return response.json()['choices'][0]['message']['content']
    except Exception as e:
        return f"Error calling LLM: {str(e)}"
def generate_rag_response(query, context_chunks):
    """Answer ``query`` with the LLM, grounded in the retrieved chunks.

    Returns a fixed Hindi "not enough information" message when no chunks
    were retrieved; otherwise the text returned by call_groq_api for a prompt
    built from the book title, author, chunk texts and the question.
    """
    # Nothing retrieved: answer without calling the LLM.
    if not context_chunks:
        return "मुझे इस प्रश्न का उत्तर देने के लिए पर्याप्त जानकारी नहीं मिली।"

    # Chunk texts separated by blank lines form the context section.
    context = "\n\n".join(item['text'] for item in context_chunks)

    prompt = f"""आप एक हिंदी पुस्तक सहायक हैं। निम्नलिखित जानकारी के आधार पर प्रश्न का उत्तर दें:
पुस्तक: {SESSION_DATA['book_title']}
लेखक: {SESSION_DATA['author_name']}
संदर्भ:
{context}
प्रश्न: {query}
निर्देश:
- हिंदी में संक्षिप्त और सटीक उत्तर दें
- उत्तर की शुरुआत में पुस्तक और लेखक का संदर्भ शामिल करें
- केवल दिए गए संदर्भ के आधार पर ही उत्तर दें
"""
    return call_groq_api(prompt)
# Authentication function
def authenticate(passcode):
    """Validate the entered passcode and toggle the login/main sections.

    Args:
        passcode: Text entered by the user.

    Returns:
        tuple: ``(auth_section_update, main_section_update, status_message)``.
        On success the login section is hidden and the main section shown.
    """
    import hmac

    expected = CONFIG['PASSCODE']
    supplied = passcode if isinstance(passcode, str) else ""

    # Fail closed when no passcode is configured: a plain `==` would treat a
    # missing input (None) as equal to a missing PASSCODE (None).
    # compare_digest avoids leaking how many leading characters matched;
    # bytes are used because it rejects non-ASCII str arguments.
    is_valid = expected is not None and hmac.compare_digest(
        supplied.encode('utf-8'), expected.encode('utf-8')
    )

    if is_valid:
        SESSION_DATA['authenticated'] = True
        return gr.update(visible=False), gr.update(visible=True), "✅ Access granted! / पहुंच मिली!"
    return gr.update(visible=True), gr.update(visible=False), "❌ Invalid passcode / गलत पासकोड"
# Document processing function
def process_document(document_file):
    """Validate, extract, chunk and index an uploaded PDF or TXT file.

    On success the extracted chunks, FAISS index and metadata are stored in
    SESSION_DATA and the query counter is reset to 0.

    Args:
        document_file: Path string of the upload, an object whose ``.name``
            attribute is that path, or None when nothing was uploaded.

    Returns:
        tuple: ``(status_message, book_title, author_name, query_section_update)``.
        On any failure the title and author are "" and the query section is
        hidden.
    """
    if document_file is None:
        return "कृपया एक PDF या TXT फ़ाइल अपलोड करें।", "", "", gr.update(visible=False)
    try:
        # Accept a plain path string as well as an object exposing the path
        # through `.name`.
        file_path = document_file if isinstance(document_file, str) else document_file.name
        print(f"Processing uploaded file: {file_path}")

        # Check file extension
        file_extension = os.path.splitext(file_path)[1].lower()
        if file_extension not in ['.pdf', '.txt']:
            return "केवल PDF और TXT फ़ाइलें समर्थित हैं।", "", "", gr.update(visible=False)

        # Check file size
        file_size = os.path.getsize(file_path)
        print(f"File size: {file_size} bytes")
        if file_size > CONFIG['MAX_FILE_SIZE']:
            return f"फ़ाइल बहुत बड़ी है! अधिकतम आकार: {CONFIG['MAX_FILE_SIZE'] // (1024*1024)}MB", "", "", gr.update(visible=False)

        # Extract text using unified function
        print(f"Extracting text from {file_extension.upper()} file...")
        text_content = extract_text_from_file(file_path)

        # The extractors report failures as strings starting with "Error".
        if not text_content.strip():
            return "Error: फ़ाइल से टेक्स्ट निकालने में असफल।", "", "", gr.update(visible=False)
        if text_content.startswith("Error"):
            return text_content, "", "", gr.update(visible=False)
        print(f"Text extraction successful. Length: {len(text_content)} characters")

        print("Extracting metadata...")
        author_name, book_title = extract_metadata(text_content)

        print("Creating text chunks...")
        chunks = chunk_text(text_content)

        print("Creating embeddings and search index...")
        faiss_index = create_embeddings(chunks)

        # Commit everything only after indexing succeeded, so a failure above
        # cannot leave new chunks paired with the previous document's index.
        SESSION_DATA.update({
            'author_name': author_name,
            'book_title': book_title,
            'document_chunks': chunks,
            'faiss_index': faiss_index,
            'query_count': 0,
        })

        # Calculate statistics
        word_count = len(text_content.split())
        char_count = len(text_content)
        print(f"Processing complete. Chunks: {len(chunks)}, Words: {word_count}")
        success_msg = f"""✅ दस्तावेज़ सफलतापूर्वक प्रसंस्करित!
📖 पुस्तक: {book_title}
✍️ लेखक: {author_name}
📄 टेक्स्ट खंड: {len(chunks)}
📊 शब्द संख्या: {word_count:,}
📝 अक्षर संख्या: {char_count:,}
अब आप प्रश्न पूछ सकते हैं।"""
        return success_msg, book_title, author_name, gr.update(visible=True)
    except Exception as e:
        error_msg = f"दस्तावेज़ प्रसंस्करण में त्रुटि: {str(e)}"
        print(f"Error in process_document: {str(e)}")
        return error_msg, "", "", gr.update(visible=False)
# Query processing function
def process_query(audio_input, text_input):
    """Answer one question given by voice and/or text about the loaded document.

    A usable audio transcript takes precedence; the typed text is used when
    there is no audio or transcription failed.

    Args:
        audio_input: Path of the recorded question, or None.
        text_input: Typed question, possibly empty or None.

    Returns:
        tuple: ``(response_markdown, response_audio_path_or_None, counter_text)``.
    """
    max_queries = CONFIG['MAX_QUERIES_PER_SESSION']

    def counter_text():
        # Rendered at call time so it reflects the current count.
        return f"प्रश्न: {SESSION_DATA['query_count']}/{max_queries}"

    if SESSION_DATA['query_count'] >= max_queries:
        return f"⚠️ प्रश्न सीमा समाप्त ({max_queries} प्रश्न प्रति सत्र)", None, counter_text()
    if not SESSION_DATA['document_chunks']:
        return "कृपया पहले एक PDF दस्तावेज़ अपलोड करें।", None, counter_text()

    # Get query text
    query_text = ""
    if audio_input:
        transcript = transcribe_audio(audio_input)
        # transcribe_audio reports failures as strings with these prefixes.
        # Match the prefixes only, so a real question that merely contains the
        # word "error" is not thrown away.
        if not transcript.startswith(("Error:", "Transcription error:")):
            query_text = transcript

    typed_text = (text_input or "").strip()
    if not query_text.strip() and typed_text:
        query_text = typed_text
    if not query_text.strip():
        return "कृपया आवाज़ या टेक्स्ट के माध्यम से प्रश्न दें।", None, counter_text()

    try:
        # Search similar chunks
        similar_chunks = search_similar_chunks(query_text)
        # Generate response
        response_text = generate_rag_response(query_text, similar_chunks)
        # Generate TTS
        audio_response = text_to_speech(response_text)
        # Update query count
        SESSION_DATA['query_count'] += 1

        # Format response with context
        formatted_response = f"""**प्रश्न:** {query_text}
**उत्तर:**
{response_text}
**संदर्भ स्रोत:**
"""
        formatted_response += "".join(
            f"\n{i+1}. {chunk['text'][:150]}... (स्कोर: {chunk['score']:.3f})"
            for i, chunk in enumerate(similar_chunks)
        )
        return formatted_response, audio_response, counter_text()
    except Exception as e:
        return f"प्रश्न प्रसंस्करण में त्रुटि: {str(e)}", None, counter_text()
def reset_session():
    """Clear the loaded document and query counter and issue a new session id.

    The authentication flag and the cached model/client are left untouched.

    Returns:
        tuple: ``(status_message, book_title, author_name,
        query_section_update, counter_text)`` for the bound UI outputs.
    """
    SESSION_DATA.update({
        'query_count': 0,
        'document_chunks': [],
        'faiss_index': None,
        'author_name': '',
        'book_title': '',
        'session_id': str(uuid.uuid4())
    })
    # Derive the limit from CONFIG so the counter stays correct if it changes.
    counter_text = f"प्रश्न: 0/{CONFIG['MAX_QUERIES_PER_SESSION']}"
    return "✅ नया सत्र शुरू किया गया!", "", "", gr.update(visible=False), counter_text
# Book management functions
def get_available_books():
    """List the library documents together with a thumbnail for each.

    Documents are the PDF/TXT files in CONFIG['OCR_BOOKS_DIR']. A thumbnail is
    an image in CONFIG['BOOK_THUMBNAILS_DIR'] whose file name (without
    extension, case-insensitive) equals the document's; when none exists a
    placeholder image is generated.

    Returns:
        list[dict]: One dict per document with keys ``'name'``,
        ``'display_name'``, ``'document_file'``, ``'file_type'`` and
        ``'thumbnail'`` (may be None), sorted by file name. Empty on error.
    """
    image_extensions = ('.png', '.jpg', '.jpeg', '.gif', '.bmp')
    document_extensions = ('.pdf', '.txt')
    books = []
    try:
        thumbnail_dir = CONFIG['BOOK_THUMBNAILS_DIR']
        ocr_dir = CONFIG['OCR_BOOKS_DIR']

        # Map lower-cased stem -> thumbnail path for a direct lookup per book.
        thumbnails_by_stem = {}
        if os.path.exists(thumbnail_dir):
            for thumb_file in sorted(os.listdir(thumbnail_dir)):
                if thumb_file.lower().endswith(image_extensions):
                    stem = os.path.splitext(thumb_file)[0].lower()
                    thumbnails_by_stem.setdefault(stem, os.path.join(thumbnail_dir, thumb_file))

        # os.listdir() order is unspecified. Sort so repeated calls agree: the
        # gallery click handler maps a gallery position back to a book by
        # calling this function again.
        document_files = []
        if os.path.exists(ocr_dir):
            document_files = sorted(
                f for f in os.listdir(ocr_dir) if f.lower().endswith(document_extensions)
            )

        for doc_file in document_files:
            book_name, raw_extension = os.path.splitext(doc_file)
            file_extension = raw_extension.lower()

            thumbnail_path = thumbnails_by_stem.get(book_name.lower())
            if not thumbnail_path:
                # No matching cover image: fall back to a generated placeholder.
                thumbnail_path = create_text_placeholder(book_name)

            books.append({
                'name': book_name,
                'display_name': f"{book_name.replace('_', ' ').title()} ({file_extension.upper()})",
                'document_file': os.path.join(ocr_dir, doc_file),
                'file_type': file_extension,
                'thumbnail': thumbnail_path
            })
        return books
    except Exception as e:
        print(f"Error getting available books: {str(e)}")
        return []
def create_text_placeholder(book_name):
    """Render a simple cover image showing the book name.

    Used for books that have no thumbnail. Underscores in the name become
    line breaks in the rendered text.

    Args:
        book_name: Book name; also used to build the output file name.

    Returns:
        str | None: Path of ``<book_name>_placeholder.png`` in the system temp
        directory, or None if matplotlib is unavailable or rendering fails.
    """
    fig = None
    try:
        import matplotlib.pyplot as plt
        import matplotlib.patches as patches

        # Create a simple text-based image
        fig, ax = plt.subplots(1, 1, figsize=(3, 4))
        ax.set_xlim(0, 1)
        ax.set_ylim(0, 1)
        ax.axis('off')

        # Add background
        rect = patches.Rectangle((0, 0), 1, 1, linewidth=2, edgecolor='#2E86AB', facecolor='#E8F4FD')
        ax.add_patch(rect)

        # Add text
        ax.text(0.5, 0.5, book_name.replace('_', '\n'),
                ha='center', va='center', fontsize=10, weight='bold', color='#2E86AB')

        # Save to temporary file
        placeholder_path = os.path.join(tempfile.gettempdir(), f"{book_name}_placeholder.png")
        fig.savefig(placeholder_path, dpi=100, bbox_inches='tight')
        return placeholder_path
    except Exception as e:
        print(f"Error creating placeholder: {str(e)}")
        return None
    finally:
        # Close this specific figure even when drawing or saving failed;
        # pyplot keeps figures alive until they are explicitly closed.
        if fig is not None:
            plt.close(fig)
def load_book_document(book_info):
"""Load text content from a pre-existing document (PDF or TXT)"""
try:
# Extract text from document using the unified function
text_content = extract_text_from_file(book_info['document_file'])
if not text_content.strip() or "Error" in text_content:
return text_content
return text_content
except Exception as e:
return f"Error loading document: {str(e)}"
def process_selected_book(selected_book_name):
    """Load, chunk and index a book chosen from the library.

    On success the chunks, FAISS index and metadata are stored in SESSION_DATA
    and the query counter is reset to 0.

    Args:
        selected_book_name: The ``'name'`` of a book from
            get_available_books(), or "None"/empty when nothing is selected.

    Returns:
        tuple: ``(status_message, book_title, author_name, query_section_update)``.
        On any failure the title and author are "" and the query section is
        hidden.
    """
    if not selected_book_name or selected_book_name == "None":
        return "कृपया एक पुस्तक चुनें।", "", "", gr.update(visible=False)
    try:
        # Find the selected book among the available ones.
        selected_book = next(
            (book for book in get_available_books() if book['name'] == selected_book_name),
            None,
        )
        if not selected_book:
            return "चुनी गई पुस्तक नहीं मिली।", "", "", gr.update(visible=False)

        # Load document content and extract text
        text_content = load_book_document(selected_book)

        # Failures are reported as strings *starting with* "Error". Checking
        # for the word anywhere would reject any book that merely mentions it
        # and would show the whole book text as the status message.
        if not text_content.strip():
            return "Error: फ़ाइल से टेक्स्ट निकालने में असफल।", "", "", gr.update(visible=False)
        if text_content.startswith("Error"):
            return text_content, "", "", gr.update(visible=False)

        # Extract metadata; fall back to library values when nothing was found.
        author_name, book_title = extract_metadata(text_content)
        if author_name == "अज्ञात लेखक":
            author_name = "संग्रहित पुस्तक"
        if book_title == "अनाम पुस्तक":
            book_title = selected_book['display_name']

        # Create chunks
        chunks = chunk_text(text_content)

        # Create embeddings and index
        print("Creating embeddings and search index for selected book...")
        faiss_index = create_embeddings(chunks)

        # Commit everything only after indexing succeeded, so a failure above
        # cannot leave new chunks paired with the previous document's index.
        SESSION_DATA.update({
            'author_name': author_name,
            'book_title': book_title,
            'document_chunks': chunks,
            'faiss_index': faiss_index,
            'query_count': 0,
        })

        # Calculate statistics
        word_count = len(text_content.split())
        char_count = len(text_content)
        success_msg = f"""✅ पुस्तक सफलतापूर्वक लोड की गई!
📖 पुस्तक: {book_title}
✍️ लेखक: {author_name}
📄 टेक्स्ट खंड: {len(chunks)}
📊 शब्द संख्या: {word_count:,}
📝 अक्षर संख्या: {char_count:,}
अब आप प्रश्न पूछ सकते हैं।"""
        return success_msg, book_title, author_name, gr.update(visible=True)
    except Exception as e:
        return f"पुस्तक लोड करने में त्रुटि: {str(e)}", "", "", gr.update(visible=False)
def create_book_gallery():
    """Build the gallery items and dropdown choices for the book library.

    Returns ``(gallery_items, book_names)``: ``gallery_items`` holds
    ``(thumbnail_path, display_name)`` pairs for books whose thumbnail file
    exists, and ``book_names`` is "None" followed by the name of every book.
    When the library is empty the result is ``([], <Hindi message string>)``,
    i.e. the second element is a message rather than a list.
    """
    library = get_available_books()
    if not library:
        return [], "कोई पुस्तक उपलब्ध नहीं है।"

    # Only books with an existing thumbnail file can be shown in the gallery.
    gallery_items = [
        (entry['thumbnail'], entry['display_name'])
        for entry in library
        if entry['thumbnail'] and os.path.exists(entry['thumbnail'])
    ]
    # The dropdown lists every book, preceded by the "None" option.
    dropdown_names = ["None"] + [entry['name'] for entry in library]
    return gallery_items, dropdown_names
def handle_gallery_selection(evt: gr.SelectData):
    """Translate a gallery click into the matching book name.

    Returns the ``'name'`` of the clicked book, or "None" when the event has
    no index or the index does not correspond to a displayed book.
    """
    position = evt.index
    if position is None:
        return "None"

    # Rebuild the list of books shown in the gallery, applying the same
    # thumbnail filter as create_book_gallery so positions line up.
    displayed = [
        entry
        for entry in get_available_books()
        if entry['thumbnail'] and os.path.exists(entry['thumbnail'])
    ]

    if not (0 <= position < len(displayed)):
        return "None"
    return displayed[position]['name']
# LFS file handling for Hugging Face Spaces
def _is_lfs_pointer(path):
    """Return True if ``path`` holds a Git LFS pointer instead of real content."""
    try:
        # Pointer files are tiny text files with a fixed first line; checking
        # the content (not only the size) also works for small TXT files.
        if os.path.getsize(path) >= 1024:
            return False
        with open(path, 'rb') as handle:
            return handle.read(64).startswith(b"version https://git-lfs.github.com/spec/")
    except OSError:
        return False


def _pull_lfs_file(path, timeout):
    """Run ``git lfs pull`` for a single file and log the outcome."""
    file_name = os.path.basename(path)
    # git expects the include pattern relative to the repository root, which
    # is assumed to be the current working directory, with forward slashes.
    include_pattern = os.path.relpath(path, os.getcwd()).replace(os.sep, '/')
    try:
        result = subprocess.run(
            ['git', 'lfs', 'pull', '--include', include_pattern],
            cwd=os.getcwd(),
            capture_output=True,
            text=True,
            timeout=timeout
        )
    except subprocess.TimeoutExpired:
        print(f"⏰ Timeout downloading {file_name}")
        return
    except Exception as e:
        print(f"❌ Error downloading {file_name}: {str(e)}")
        return

    if result.returncode == 0:
        print(f"✅ Successfully downloaded {file_name}")
    else:
        print(f"⚠️ Could not download {file_name}: {result.stderr}")


def ensure_lfs_files_downloaded():
    """Fetch library files that are still Git LFS pointers.

    Does nothing unless the SPACE_ID or HUGGINGFACE_HUB_CACHE environment
    variable is set. Otherwise every document in CONFIG['OCR_BOOKS_DIR'] and
    every image in CONFIG['BOOK_THUMBNAILS_DIR'] that looks like an LFS
    pointer is pulled with ``git lfs pull``. Best effort: failures are logged
    and never raised.
    """
    try:
        # Check if we're in a Hugging Face Spaces environment
        if not (os.getenv('SPACE_ID') or os.getenv('HUGGINGFACE_HUB_CACHE')):
            return
        print("🔄 Detected Hugging Face Spaces environment, checking LFS files...")

        # (directory, accepted extensions, per-file pull timeout in seconds)
        targets = (
            (CONFIG['OCR_BOOKS_DIR'], ('.pdf', '.txt'), 60),
            (CONFIG['BOOK_THUMBNAILS_DIR'], ('.png', '.jpg', '.jpeg', '.gif', '.bmp'), 30),
        )
        for directory, extensions, timeout in targets:
            if not os.path.exists(directory):
                continue
            for file_name in sorted(os.listdir(directory)):
                if not file_name.lower().endswith(extensions):
                    continue
                path = os.path.join(directory, file_name)
                if _is_lfs_pointer(path):
                    print(f"📁 {file_name} appears to be an LFS pointer, attempting download...")
                    _pull_lfs_file(path, timeout)
                else:
                    file_type = os.path.splitext(file_name)[1].lstrip('.').upper()
                    print(f"✅ {file_name} ({file_type}) already available ({os.path.getsize(path):,} bytes)")
    except Exception as e:
        print(f"⚠️ Error checking LFS files: {str(e)}")
# Create Gradio interface
def create_interface():
    """Build the Gradio Blocks UI and wire its event handlers.

    Layout: a passcode section, and a main section (hidden until
    authentication succeeds) containing the query counter, the library and
    upload tabs, the document status fields, the query section (hidden until
    a document is loaded) and the reset controls.

    Returns:
        gr.Blocks: The assembled, not yet launched, application.
    """
    with gr.Blocks(
        title="Hindi RAG Voice Demo - Groq Whisper",
        theme=gr.themes.Soft(),
        css="""
.main-header { text-align: center; color: #2E86AB; margin-bottom: 2rem; }
.section-header { color: #A23B72; font-weight: bold; margin: 1rem 0; }
.info-box { background: #F18F01; color: white; padding: 1rem; border-radius: 8px; margin: 1rem 0; }
"""
    ) as demo:
        gr.HTML("""
<div class="main-header">
<h1>📚 Hindi RAG Voice Demo - Groq Whisper</h1>
<h3>हिंदी पुस्तक आवाज़ सहायक</h3>
<p><em>Audio transcription limited to first 10 seconds</em></p>
</div>
""")

        # Authentication section
        with gr.Group(visible=True) as auth_section:
            gr.Markdown("### 🔐 Access Control / पहुंच नियंत्रण")
            gr.Markdown("Please enter the passcode to access the demo / कृपया डेमो एक्सेस करने के लिए पासकोड दर्ज करें")
            passcode_input = gr.Textbox(
                label="Passcode / पासकोड",
                type="password",
                placeholder="Enter passcode here..."
            )
            auth_button = gr.Button("🔓 Access Demo / डेमो एक्सेस करें", variant="primary")
            auth_status = gr.Textbox(label="Status", interactive=False)

        # Main application section (revealed by authenticate())
        with gr.Group(visible=False) as main_section:
            # Session info
            with gr.Row():
                with gr.Column(scale=3):
                    gr.Markdown("### 📊 Session Information")
                with gr.Column(scale=1):
                    query_counter = gr.Textbox(
                        label="Query Usage",
                        # Derived from CONFIG so it matches the enforced limit.
                        value=f"प्रश्न: 0/{CONFIG['MAX_QUERIES_PER_SESSION']}",
                        interactive=False
                    )

            # Document selection/upload section
            gr.Markdown("### 📁 Step 1: Choose Your Book / अपनी पुस्तक चुनें")

            # Book selection section
            with gr.Tab("📚 Select from Library / पुस्तकालय से चुनें"):
                gr.Markdown("**Choose from available books / उपलब्ध पुस्तकों में से चुनें**")
                # Initialize book gallery and dropdown
                available_books = get_available_books()
                gallery_data, book_options = create_book_gallery()
                # Always create these components, even if no books are available
                if available_books:
                    book_gallery = gr.Gallery(
                        value=gallery_data,
                        label="Available Books / उपलब्ध पुस्तकें",
                        show_label=True,
                        elem_id="book_gallery",
                        columns=3,
                        rows=2,
                        height="auto",
                        allow_preview=True
                    )
                    book_dropdown = gr.Dropdown(
                        choices=book_options,
                        label="Select Book / पुस्तक चुनें",
                        value="None",
                        interactive=True
                    )
                    select_book_btn = gr.Button("📖 Load Selected Book / चुनी गई पुस्तक लोड करें", variant="primary")
                else:
                    gr.Markdown("⚠️ No books available in library / पुस्तकालय में कोई पुस्तक उपलब्ध नहीं है")
                    book_gallery = None
                    book_dropdown = gr.Dropdown(choices=["None"], value="None", visible=False)
                    select_book_btn = gr.Button("No books available", interactive=False)

            # PDF/TXT upload section
            with gr.Tab("📄 Upload Document / दस्तावेज़ अपलोड करें"):
                gr.Markdown("**Upload your own PDF or TXT file / अपनी PDF या TXT फ़ाइल अपलोड करें**")
                gr.Markdown("**Note:** For PDF files, please ensure they contain selectable text (not scanned images)")
                document_upload = gr.File(
                    label="Upload PDF or TXT / PDF या TXT अपलोड करें",
                    file_types=[".pdf", ".txt"],
                    type="filepath"
                )
                process_document_btn = gr.Button("📖 Process Document / दस्तावेज़ प्रसंस्करित करें", variant="primary")

            # Status fields shared by both tabs
            doc_status = gr.Textbox(label="Processing Status / प्रसंस्करण स्थिति", interactive=False)
            with gr.Row():
                book_title_display = gr.Textbox(label="Book Title / पुस्तक शीर्षक", interactive=False)
                author_display = gr.Textbox(label="Author / लेखक", interactive=False)

            # Query section (revealed once a document has been processed)
            with gr.Group(visible=False) as query_section:
                gr.Markdown("### 🎤 Step 2: Ask Questions / प्रश्न पूछें")
                gr.Markdown("**Note:** Audio recordings are limited to first 10 seconds for transcription")
                with gr.Row():
                    with gr.Column():
                        audio_input = gr.Audio(
                            label="🎙️ Record Voice Question / आवाज़ प्रश्न रिकॉर्ड करें",
                            sources=["microphone"],
                            type="filepath"
                        )
                    with gr.Column():
                        text_input = gr.Textbox(
                            label="💬 Or Type Question / या प्रश्न टाइप करें",
                            placeholder="उदाहरण: इस पुस्तक में मुख्य विषय क्या है?",
                            lines=3
                        )
                ask_button = gr.Button("🔍 Get Answer / उत्तर पाएं", variant="primary", size="lg")
                # Response section
                with gr.Column():
                    response_text = gr.Textbox(
                        label="📝 Response / उत्तर",
                        lines=8,
                        interactive=False
                    )
                    response_audio = gr.Audio(
                        label="🔊 Audio Response / आवाज़ उत्तर",
                        interactive=False
                    )

            # Reset section
            # NOTE(review): the nesting of this row/column (inside main_section,
            # outside query_section) is assumed — confirm against the intended layout.
            gr.Markdown("---")
            with gr.Row():
                reset_btn = gr.Button("🔄 Start New Session / नया सत्र शुरू करें", variant="secondary")
                with gr.Column():
                    gr.Markdown("""
**Requirements & Limits / आवश्यकताएं और सीमा:**
- PDF with selectable text (no scanned images) or TXT files
- Max file size: 10MB
- Max queries: 5 per session
- Audio transcription: First 10 seconds only
- Supported: Hindi & English text
- Requires: Groq API key and ffmpeg
""")

        # Event handlers
        auth_button.click(
            authenticate,
            inputs=[passcode_input],
            outputs=[auth_section, main_section, auth_status]
        )
        # Document upload event handler - Always available
        process_document_btn.click(
            process_document,
            inputs=[document_upload],
            outputs=[doc_status, book_title_display, author_display, query_section]
        )
        # Book selection event handler - Only if books are available
        if available_books:
            select_book_btn.click(
                process_selected_book,
                inputs=[book_dropdown],
                outputs=[doc_status, book_title_display, author_display, query_section]
            )
        # Gallery selection event handler - Only if gallery exists
        if book_gallery is not None:
            book_gallery.select(
                handle_gallery_selection,
                outputs=[book_dropdown]
            )
        ask_button.click(
            process_query,
            inputs=[audio_input, text_input],
            outputs=[response_text, response_audio, query_counter]
        )
        reset_btn.click(
            reset_session,
            outputs=[doc_status, book_title_display, author_display, query_section, query_counter]
        )

        def _warm_up_models():
            """Make sure models are loaded; returns nothing because no outputs are bound."""
            load_models()

        # Load models on startup. load_models() returns a 2-tuple, which would
        # not match this event's empty output list, hence the wrapper.
        demo.load(_warm_up_models)
    return demo
# Main function
def main():
    """Prepare assets and models, build the UI and launch the web server.

    Steps: ensure Git LFS assets are present, load the models, create the
    Gradio interface and launch it with public sharing and error display
    enabled.
    """
    print("🚀 Starting Hindi RAG Voice Demo (Groq Whisper API Version)...")

    # Ensure LFS files are available (important for Hugging Face Spaces)
    ensure_lfs_files_downloaded()

    print("📋 Loading AI models (this may take a moment)...")
    load_models()
    print("✅ Models loaded successfully!")

    # Create and launch interface
    demo = create_interface()

    # Never print the passcode itself: process logs may be readable by people
    # who must not learn the secret. Report only whether one is configured.
    print(f"🔑 Demo passcode configured: {'yes' if CONFIG['PASSCODE'] else 'no'}")
    print("🌐 Launching web interface...")
    demo.launch(
        share=True,
        show_error=True,
    )
# Launch the app only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()