📚 Hindi RAG Voice Demo - Groq Whisper
हिंदी पुस्तक आवाज़ सहायक
Audio transcription limited to first 10 seconds
#!/usr/bin/env python3 """ Hindi RAG Voice Demo - Gradio Implementation (Groq Whisper API Version) A streamlined voice-enabled RAG system for Hindi content using Gradio Uses Groq Whisper API for transcription and assumes PDFs have selectable text """ import gradio as gr import os import tempfile import time import uuid from datetime import datetime import fitz # PyMuPDF import requests import json import numpy as np from sentence_transformers import SentenceTransformer import faiss from groq import Groq from gtts import gTTS import subprocess import warnings warnings.filterwarnings("ignore") # Global configuration CONFIG = { 'PASSCODE': os.getenv('PASSCODE'), 'MAX_FILE_SIZE': 10 * 1024 * 1024, # 10MB 'MAX_QUERIES_PER_SESSION': 5, 'MAX_AUDIO_DURATION': 120, # 2 minutes 'GROQ_API_KEY': os.getenv('GAPI'), 'AUDIO_CLIP_DURATION': 10, # First 10 seconds only 'BOOK_THUMBNAILS_DIR': './book_thumbnails', 'OCR_BOOKS_DIR': './ocr_books', } # Global session storage SESSION_DATA = { 'authenticated': False, 'session_id': str(uuid.uuid4()), 'query_count': 0, 'document_chunks': [], 'faiss_index': None, 'author_name': '', 'book_title': '', 'embedding_model': None, 'groq_client': None } # Initialize models and clients (cached) def load_models(): """Load and cache models and clients""" if SESSION_DATA['embedding_model'] is None: print("Loading embedding model...") SESSION_DATA['embedding_model'] = SentenceTransformer('paraphrase-multilingual-MiniLM-L12-v2') if SESSION_DATA['groq_client'] is None: if CONFIG['GROQ_API_KEY']: print("Initializing Groq client...") SESSION_DATA['groq_client'] = Groq(api_key=CONFIG['GROQ_API_KEY']) else: print("Warning: GROQ_API_KEY not found") return SESSION_DATA['embedding_model'], SESSION_DATA['groq_client'] # Audio processing functions def trim_audio_to_duration(input_path, output_path, duration=10): """Trim audio to specified duration using ffmpeg""" try: # Use ffmpeg to trim audio to first N seconds cmd = [ 'ffmpeg', '-i', input_path, '-t', str(duration), '-acodec', 'copy', '-y', # Overwrite output file output_path ] result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode == 0: return True else: print(f"FFmpeg error: {result.stderr}") return False except Exception as e: print(f"Error trimming audio: {str(e)}") return False def transcribe_audio(audio_file): """Transcribe audio using Groq Whisper API (first 10 seconds only)""" if audio_file is None: return "" if not CONFIG['GROQ_API_KEY'] or SESSION_DATA['groq_client'] is None: return "Error: Groq API key not configured" try: # Create temporary file for trimmed audio with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as tmp_file: trimmed_audio_path = tmp_file.name # Trim audio to first 10 seconds if not trim_audio_to_duration(audio_file, trimmed_audio_path, CONFIG['AUDIO_CLIP_DURATION']): # If trimming fails, use original file but warn user print("Warning: Could not trim audio, using full duration") trimmed_audio_path = audio_file # Transcribe using Groq Whisper API with open(trimmed_audio_path, "rb") as file: transcription = SESSION_DATA['groq_client'].audio.transcriptions.create( file=(os.path.basename(trimmed_audio_path), file.read()), model="whisper-large-v3", response_format="verbose_json", language="hi" # Specify Hindi language ) # Clean up temporary file if we created one if trimmed_audio_path != audio_file: try: os.unlink(trimmed_audio_path) except: pass return transcription.text except Exception as e: # Clean up on error try: if 'trimmed_audio_path' in locals() and trimmed_audio_path != audio_file: os.unlink(trimmed_audio_path) except: pass return f"Transcription error: {str(e)}" def text_to_speech(text): """Convert text to speech in Hindi""" if not text or len(text.strip()) == 0: return None try: tts = gTTS(text=text, lang='hi', slow=False) # Save to temporary file with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as tmp_file: tts.save(tmp_file.name) return tmp_file.name except Exception as e: print(f"TTS Error: {str(e)}") return None # Text extraction functions def extract_text_from_txt(txt_path): """Extract text from TXT file""" try: # Try different encodings encodings = ['utf-8', 'utf-8-sig', 'latin-1', 'cp1252'] for encoding in encodings: try: with open(txt_path, 'r', encoding=encoding) as file: text_content = file.read() if text_content.strip(): print(f"Successfully extracted {len(text_content)} characters from TXT file using {encoding} encoding") return text_content except UnicodeDecodeError: continue return "Error: Could not decode TXT file with any supported encoding" except Exception as e: print(f"TXT extraction error: {str(e)}") return f"Error extracting text: {str(e)}" def extract_text_from_pdf(pdf_path): """Extract text from PDF using PyMuPDF (assumes selectable text)""" text_content = "" try: pdf_document = fitz.open(pdf_path) total_pages = len(pdf_document) print(f"Processing PDF with {total_pages} pages...") # Process all pages (removed page limit for production use) for page_num in range(total_pages): page = pdf_document.load_page(page_num) page_text = page.get_text() # Add page text if it exists if page_text.strip(): text_content += page_text + "\n" else: print(f"Warning: Page {page_num + 1} appears to have no selectable text") pdf_document.close() if not text_content.strip(): return "Error: No selectable text found in PDF. Please ensure the PDF contains selectable text, not just images." print(f"Successfully extracted {len(text_content)} characters from PDF") return text_content except Exception as e: print(f"PDF extraction error: {str(e)}") return f"Error extracting text: {str(e)}" def extract_text_from_file(file_path): """Extract text from file (supports PDF and TXT)""" file_extension = os.path.splitext(file_path)[1].lower() if file_extension == '.pdf': return extract_text_from_pdf(file_path) elif file_extension == '.txt': return extract_text_from_txt(file_path) else: return f"Error: Unsupported file format {file_extension}. Only PDF and TXT files are supported." def extract_metadata(text): """Extract author name and book title from text""" lines = [line.strip() for line in text.split('\n')[:25] if line.strip()] author_name = "अज्ञात लेखक" book_title = "अनाम पुस्तक" # Simple heuristics for metadata extraction for i, line in enumerate(lines): # Look for author patterns if any(word in line.lower() for word in ['लेखक', 'author', 'by', 'द्वारा', 'रचयिता']): author_name = line # First substantial line might be title elif 10 < len(line) < 100 and not any(char.isdigit() for char in line[:20]): if book_title == "अनाम पुस्तक": book_title = line return author_name, book_title def chunk_text(text, chunk_size=400, overlap=50): """Split text into overlapping chunks""" words = text.split() chunks = [] for i in range(0, len(words), chunk_size - overlap): chunk = ' '.join(words[i:i + chunk_size]) if chunk.strip(): chunks.append(chunk) return chunks # Vector search functions def create_embeddings(chunks): """Create embeddings and FAISS index""" embedding_model, _ = load_models() embeddings = embedding_model.encode(chunks, show_progress_bar=False) # Create FAISS index dimension = embeddings.shape[1] index = faiss.IndexFlatIP(dimension) # Normalize embeddings for cosine similarity faiss.normalize_L2(embeddings) index.add(embeddings.astype('float32')) return index def search_similar_chunks(query, top_k=3): """Search for similar chunks""" if SESSION_DATA['faiss_index'] is None or not SESSION_DATA['document_chunks']: return [] embedding_model, _ = load_models() query_embedding = embedding_model.encode([query], show_progress_bar=False) faiss.normalize_L2(query_embedding) scores, indices = SESSION_DATA['faiss_index'].search(query_embedding.astype('float32'), top_k) results = [] for i, idx in enumerate(indices[0]): if idx >= 0 and idx < len(SESSION_DATA['document_chunks']): results.append({ 'text': SESSION_DATA['document_chunks'][idx], 'score': float(scores[0][i]) }) return results # LLM functions def call_groq_api(prompt, model="llama-3.1-8b-instant"): """Call Groq API for LLM inference""" if not CONFIG['GROQ_API_KEY'] or CONFIG['GROQ_API_KEY'] == 'your_groq_api_key_here': return "⚠️ Groq API key not configured. Please set GROQ_API_KEY environment variable." url = "https://api.groq.com/openai/v1/chat/completions" headers = { "Authorization": f"Bearer {CONFIG['GROQ_API_KEY']}", "Content-Type": "application/json" } data = { "model": model, "messages": [{"role": "user", "content": prompt}], "temperature": 0.7, "max_tokens": 800 } try: response = requests.post(url, headers=headers, json=data, timeout=30) response.raise_for_status() return response.json()['choices'][0]['message']['content'] except Exception as e: return f"Error calling LLM: {str(e)}" def generate_rag_response(query, context_chunks): """Generate response using RAG""" if not context_chunks: return "मुझे इस प्रश्न का उत्तर देने के लिए पर्याप्त जानकारी नहीं मिली।" context = "\n\n".join([chunk['text'] for chunk in context_chunks]) prompt = f"""आप एक हिंदी पुस्तक सहायक हैं। निम्नलिखित जानकारी के आधार पर प्रश्न का उत्तर दें: पुस्तक: {SESSION_DATA['book_title']} लेखक: {SESSION_DATA['author_name']} संदर्भ: {context} प्रश्न: {query} निर्देश: - हिंदी में संक्षिप्त और सटीक उत्तर दें - उत्तर की शुरुआत में पुस्तक और लेखक का संदर्भ शामिल करें - केवल दिए गए संदर्भ के आधार पर ही उत्तर दें """ response = call_groq_api(prompt) return response # Authentication function def authenticate(passcode): """Check passcode authentication""" if passcode == CONFIG['PASSCODE']: SESSION_DATA['authenticated'] = True return gr.update(visible=False), gr.update(visible=True), "✅ Access granted! / पहुंच मिली!" else: return gr.update(visible=True), gr.update(visible=False), "❌ Invalid passcode / गलत पासकोड" # Document processing function def process_document(document_file): """Process uploaded document (PDF or TXT)""" if document_file is None: return "कृपया एक PDF या TXT फ़ाइल अपलोड करें।", "", "", gr.update(visible=False) try: print(f"Processing uploaded file: {document_file.name}") # Check file extension file_extension = os.path.splitext(document_file.name)[1].lower() if file_extension not in ['.pdf', '.txt']: return "केवल PDF और TXT फ़ाइलें समर्थित हैं।", "", "", gr.update(visible=False) # Check file size file_size = os.path.getsize(document_file.name) print(f"File size: {file_size} bytes") if file_size > CONFIG['MAX_FILE_SIZE']: return f"फ़ाइल बहुत बड़ी है! अधिकतम आकार: {CONFIG['MAX_FILE_SIZE'] // (1024*1024)}MB", "", "", gr.update(visible=False) # Extract text using unified function print(f"Extracting text from {file_extension.upper()} file...") text_content = extract_text_from_file(document_file.name) # Check if extraction failed if not text_content.strip(): return "Error: फ़ाइल से टेक्स्ट निकालने में असफल।", "", "", gr.update(visible=False) if text_content.startswith("Error"): return text_content, "", "", gr.update(visible=False) print(f"Text extraction successful. Length: {len(text_content)} characters") # Extract metadata print("Extracting metadata...") author_name, book_title = extract_metadata(text_content) SESSION_DATA['author_name'] = author_name SESSION_DATA['book_title'] = book_title # Create chunks print("Creating text chunks...") chunks = chunk_text(text_content) SESSION_DATA['document_chunks'] = chunks # Create embeddings and index print("Creating embeddings and search index...") SESSION_DATA['faiss_index'] = create_embeddings(chunks) # Reset query count SESSION_DATA['query_count'] = 0 # Calculate statistics word_count = len(text_content.split()) char_count = len(text_content) print(f"Processing complete. Chunks: {len(chunks)}, Words: {word_count}") success_msg = f"""✅ दस्तावेज़ सफलतापूर्वक प्रसंस्करित! 📖 पुस्तक: {book_title} ✍️ लेखक: {author_name} 📄 टेक्स्ट खंड: {len(chunks)} 📊 शब्द संख्या: {word_count:,} 📝 अक्षर संख्या: {char_count:,} अब आप प्रश्न पूछ सकते हैं।""" return success_msg, book_title, author_name, gr.update(visible=True) except Exception as e: error_msg = f"दस्तावेज़ प्रसंस्करण में त्रुटि: {str(e)}" print(f"Error in process_document: {str(e)}") return error_msg, "", "", gr.update(visible=False) # Query processing function def process_query(audio_input, text_input): """Process user query (audio or text)""" if SESSION_DATA['query_count'] >= CONFIG['MAX_QUERIES_PER_SESSION']: return "⚠️ प्रश्न सीमा समाप्त (5 प्रश्न प्रति सत्र)", None, f"प्रश्न: {SESSION_DATA['query_count']}/{CONFIG['MAX_QUERIES_PER_SESSION']}" if not SESSION_DATA['document_chunks']: return "कृपया पहले एक PDF दस्तावेज़ अपलोड करें।", None, f"प्रश्न: {SESSION_DATA['query_count']}/{CONFIG['MAX_QUERIES_PER_SESSION']}" # Get query text query_text = "" if audio_input: query_text = transcribe_audio(audio_input) if "error" in query_text.lower(): query_text = "" if not query_text.strip() and text_input.strip(): query_text = text_input.strip() if not query_text.strip(): return "कृपया आवाज़ या टेक्स्ट के माध्यम से प्रश्न दें।", None, f"प्रश्न: {SESSION_DATA['query_count']}/{CONFIG['MAX_QUERIES_PER_SESSION']}" try: # Search similar chunks similar_chunks = search_similar_chunks(query_text) # Generate response response_text = generate_rag_response(query_text, similar_chunks) # Generate TTS audio_response = text_to_speech(response_text) # Update query count SESSION_DATA['query_count'] += 1 # Format response with context formatted_response = f"""**प्रश्न:** {query_text} **उत्तर:** {response_text} **संदर्भ स्रोत:** """ for i, chunk in enumerate(similar_chunks): formatted_response += f"\n{i+1}. {chunk['text'][:150]}... (स्कोर: {chunk['score']:.3f})" return formatted_response, audio_response, f"प्रश्न: {SESSION_DATA['query_count']}/{CONFIG['MAX_QUERIES_PER_SESSION']}" except Exception as e: return f"प्रश्न प्रसंस्करण में त्रुटि: {str(e)}", None, f"प्रश्न: {SESSION_DATA['query_count']}/{CONFIG['MAX_QUERIES_PER_SESSION']}" def reset_session(): """Reset the session""" SESSION_DATA.update({ 'query_count': 0, 'document_chunks': [], 'faiss_index': None, 'author_name': '', 'book_title': '', 'session_id': str(uuid.uuid4()) }) return "✅ नया सत्र शुरू किया गया!", "", "", gr.update(visible=False), "प्रश्न: 0/5" # Book management functions def get_available_books(): """Get list of available books with their thumbnails and document files (PDF/TXT)""" books = [] try: # Get all image files from thumbnails directory thumbnail_dir = CONFIG['BOOK_THUMBNAILS_DIR'] ocr_dir = CONFIG['OCR_BOOKS_DIR'] if os.path.exists(thumbnail_dir): thumbnail_files = [f for f in os.listdir(thumbnail_dir) if f.lower().endswith(('.png', '.jpg', '.jpeg', '.gif', '.bmp'))] else: thumbnail_files = [] # Get all supported document files from OCR directory if os.path.exists(ocr_dir): document_files = [f for f in os.listdir(ocr_dir) if f.lower().endswith(('.pdf', '.txt'))] else: document_files = [] # Create book entries for document files for doc_file in document_files: book_name = os.path.splitext(doc_file)[0] file_extension = os.path.splitext(doc_file)[1].lower() # Look for matching thumbnail thumbnail_path = None for thumb_file in thumbnail_files: thumb_name = os.path.splitext(thumb_file)[0] if thumb_name.lower() == book_name.lower(): thumbnail_path = os.path.join(thumbnail_dir, thumb_file) break # If no matching thumbnail found, use a default placeholder if not thumbnail_path: # Create a simple text-based placeholder placeholder_path = create_text_placeholder(book_name) thumbnail_path = placeholder_path books.append({ 'name': book_name, 'display_name': f"{book_name.replace('_', ' ').title()} ({file_extension.upper()})", 'document_file': os.path.join(ocr_dir, doc_file), 'file_type': file_extension, 'thumbnail': thumbnail_path }) return books except Exception as e: print(f"Error getting available books: {str(e)}") return [] def create_text_placeholder(book_name): """Create a simple text placeholder image for books without thumbnails""" try: import matplotlib.pyplot as plt import matplotlib.patches as patches # Create a simple text-based image fig, ax = plt.subplots(1, 1, figsize=(3, 4)) ax.set_xlim(0, 1) ax.set_ylim(0, 1) ax.axis('off') # Add background rect = patches.Rectangle((0, 0), 1, 1, linewidth=2, edgecolor='#2E86AB', facecolor='#E8F4FD') ax.add_patch(rect) # Add text ax.text(0.5, 0.5, book_name.replace('_', '\n'), ha='center', va='center', fontsize=10, weight='bold', color='#2E86AB') # Save to temporary file placeholder_path = os.path.join(tempfile.gettempdir(), f"{book_name}_placeholder.png") plt.savefig(placeholder_path, dpi=100, bbox_inches='tight') plt.close() return placeholder_path except Exception as e: print(f"Error creating placeholder: {str(e)}") return None def load_book_document(book_info): """Load text content from a pre-existing document (PDF or TXT)""" try: # Extract text from document using the unified function text_content = extract_text_from_file(book_info['document_file']) if not text_content.strip() or "Error" in text_content: return text_content return text_content except Exception as e: return f"Error loading document: {str(e)}" def process_selected_book(selected_book_name): """Process a pre-selected book""" if not selected_book_name or selected_book_name == "None": return "कृपया एक पुस्तक चुनें।", "", "", gr.update(visible=False) try: # Get available books available_books = get_available_books() # Find the selected book selected_book = None for book in available_books: if book['name'] == selected_book_name: selected_book = book break if not selected_book: return "चुनी गई पुस्तक नहीं मिली।", "", "", gr.update(visible=False) # Load document content and extract text text_content = load_book_document(selected_book) if not text_content.strip() or "Error" in text_content: return text_content, "", "", gr.update(visible=False) # Extract metadata (use book name if no metadata found in text) author_name, book_title = extract_metadata(text_content) # If metadata extraction didn't work well, use the book name if author_name == "अज्ञात लेखक": author_name = "संग्रहित पुस्तक" if book_title == "अनाम पुस्तक": book_title = selected_book['display_name'] SESSION_DATA['author_name'] = author_name SESSION_DATA['book_title'] = book_title # Create chunks chunks = chunk_text(text_content) SESSION_DATA['document_chunks'] = chunks # Create embeddings and index print("Creating embeddings and search index for selected book...") SESSION_DATA['faiss_index'] = create_embeddings(chunks) # Reset query count SESSION_DATA['query_count'] = 0 # Calculate statistics word_count = len(text_content.split()) char_count = len(text_content) success_msg = f"""✅ पुस्तक सफलतापूर्वक लोड की गई! 📖 पुस्तक: {book_title} ✍️ लेखक: {author_name} 📄 टेक्स्ट खंड: {len(chunks)} 📊 शब्द संख्या: {word_count:,} 📝 अक्षर संख्या: {char_count:,} अब आप प्रश्न पूछ सकते हैं।""" return success_msg, book_title, author_name, gr.update(visible=True) except Exception as e: return f"पुस्तक लोड करने में त्रुटि: {str(e)}", "", "", gr.update(visible=False) def create_book_gallery(): """Create a gallery of available books with thumbnails""" available_books = get_available_books() if not available_books: return [], "कोई पुस्तक उपलब्ध नहीं है।" # Create gallery data: list of (image_path, title) tuples gallery_data = [] book_names = ["None"] # Add None option for book in available_books: if book['thumbnail'] and os.path.exists(book['thumbnail']): gallery_data.append((book['thumbnail'], book['display_name'])) book_names.append(book['name']) return gallery_data, book_names def handle_gallery_selection(evt: gr.SelectData): """Handle book selection from gallery click""" if evt.index is None: return "None" # Get available books to map gallery index to book name available_books = get_available_books() # Filter books that have valid thumbnails (same as in create_book_gallery) valid_books = [] for book in available_books: if book['thumbnail'] and os.path.exists(book['thumbnail']): valid_books.append(book) # Check if the selected index is valid if 0 <= evt.index < len(valid_books): selected_book = valid_books[evt.index] return selected_book['name'] return "None" # LFS file handling for Hugging Face Spaces def ensure_lfs_files_downloaded(): """Ensure LFS files are downloaded in Hugging Face Spaces environment""" try: # Check if we're in a Hugging Face Spaces environment if os.getenv('SPACE_ID') or os.getenv('HUGGINGFACE_HUB_CACHE'): print("🔄 Detected Hugging Face Spaces environment, checking LFS files...") # Check if document files exist and are not LFS pointers ocr_dir = CONFIG['OCR_BOOKS_DIR'] if os.path.exists(ocr_dir): document_files = [f for f in os.listdir(ocr_dir) if f.lower().endswith(('.pdf', '.txt'))] for doc_file in document_files: doc_path = os.path.join(ocr_dir, doc_file) # Check if file is an LFS pointer (small text file) if os.path.exists(doc_path): file_size = os.path.getsize(doc_path) # LFS pointer files are typically very small (< 200 bytes) # But TXT files might legitimately be small, so only check PDFs for LFS if file_size < 200 and doc_file.lower().endswith('.pdf'): print(f"📁 {doc_file} appears to be an LFS pointer, attempting download...") # Try to download using git lfs pull for this specific file try: result = subprocess.run( ['git', 'lfs', 'pull', '--include', f"ocr_books/{doc_file}"], cwd=os.getcwd(), capture_output=True, text=True, timeout=60 ) if result.returncode == 0: print(f"✅ Successfully downloaded {doc_file}") else: print(f"⚠️ Could not download {doc_file}: {result.stderr}") except subprocess.TimeoutExpired: print(f"⏰ Timeout downloading {doc_file}") except Exception as e: print(f"❌ Error downloading {doc_file}: {str(e)}") else: file_type = "PDF" if doc_file.lower().endswith('.pdf') else "TXT" print(f"✅ {doc_file} ({file_type}) already available ({file_size:,} bytes)") # Also check thumbnails thumbnail_dir = CONFIG['BOOK_THUMBNAILS_DIR'] if os.path.exists(thumbnail_dir): image_files = [f for f in os.listdir(thumbnail_dir) if f.lower().endswith(('.png', '.jpg', '.jpeg', '.gif', '.bmp'))] for img_file in image_files: img_path = os.path.join(thumbnail_dir, img_file) if os.path.exists(img_path): file_size = os.path.getsize(img_path) if file_size < 200: # Likely an LFS pointer print(f"📁 {img_file} appears to be an LFS pointer, attempting download...") try: result = subprocess.run( ['git', 'lfs', 'pull', '--include', f"book_thumbnails/{img_file}"], cwd=os.getcwd(), capture_output=True, text=True, timeout=30 ) if result.returncode == 0: print(f"✅ Successfully downloaded {img_file}") except Exception as e: print(f"❌ Error downloading {img_file}: {str(e)}") except Exception as e: print(f"⚠️ Error checking LFS files: {str(e)}") # Create Gradio interface def create_interface(): """Create the Gradio interface""" with gr.Blocks( title="Hindi RAG Voice Demo - Groq Whisper", theme=gr.themes.Soft(), css=""" .main-header { text-align: center; color: #2E86AB; margin-bottom: 2rem; } .section-header { color: #A23B72; font-weight: bold; margin: 1rem 0; } .info-box { background: #F18F01; color: white; padding: 1rem; border-radius: 8px; margin: 1rem 0; } """ ) as demo: gr.HTML("""
Audio transcription limited to first 10 seconds