Spaces:
Running
Running
| #!/usr/bin/env python3 | |
| """ | |
| Hindi RAG Voice Demo - Gradio Implementation (Groq Whisper API Version) | |
| A streamlined voice-enabled RAG system for Hindi content using Gradio | |
| Uses Groq Whisper API for transcription and assumes PDFs have selectable text | |
| """ | |
| import gradio as gr | |
| import os | |
| import tempfile | |
| import time | |
| import uuid | |
| from datetime import datetime | |
| import fitz # PyMuPDF | |
| import requests | |
| import json | |
| import numpy as np | |
| from sentence_transformers import SentenceTransformer | |
| import faiss | |
| from groq import Groq | |
| from gtts import gTTS | |
| import subprocess | |
| import warnings | |
| warnings.filterwarnings("ignore") | |
# Global configuration
# NOTE(review): both secrets come from the environment and are None when the
# corresponding variable is unset.
CONFIG = {
    'PASSCODE': os.getenv('PASSCODE'),  # demo access passcode (env: PASSCODE)
    'MAX_FILE_SIZE': 10 * 1024 * 1024,  # 10MB
    'MAX_QUERIES_PER_SESSION': 5,  # hard per-session query cap (enforced in process_query)
    'MAX_AUDIO_DURATION': 120,  # 2 minutes
    # NOTE(review): the key is read from env var "GAPI", but the error message in
    # call_groq_api tells users to set "GROQ_API_KEY" — confirm which is intended.
    'GROQ_API_KEY': os.getenv('GAPI'),
    'AUDIO_CLIP_DURATION': 10,  # First 10 seconds only
    'BOOK_THUMBNAILS_DIR': './book_thumbnails',
    'OCR_BOOKS_DIR': './ocr_books',
}
# Global session storage
# Single shared dict: one Gradio worker process serves one logical "session".
SESSION_DATA = {
    'authenticated': False,
    'session_id': str(uuid.uuid4()),
    'query_count': 0,           # queries consumed in the current session
    'document_chunks': [],      # text chunks of the active document
    'faiss_index': None,        # FAISS index over the chunk embeddings
    'author_name': '',
    'book_title': '',
    'embedding_model': None,    # cached SentenceTransformer (see load_models)
    'groq_client': None         # cached Groq API client (see load_models)
}
| # Initialize models and clients (cached) | |
def load_models():
    """Lazily initialize and cache the embedding model and Groq client.

    Both objects are stored in SESSION_DATA so repeated calls are cheap.

    Returns:
        tuple: (SentenceTransformer instance, Groq client or None when no
        API key is configured).
    """
    model = SESSION_DATA['embedding_model']
    if model is None:
        print("Loading embedding model...")
        model = SentenceTransformer('paraphrase-multilingual-MiniLM-L12-v2')
        SESSION_DATA['embedding_model'] = model

    client = SESSION_DATA['groq_client']
    if client is None:
        api_key = CONFIG['GROQ_API_KEY']
        if api_key:
            print("Initializing Groq client...")
            SESSION_DATA['groq_client'] = Groq(api_key=api_key)
        else:
            print("Warning: GROQ_API_KEY not found")

    return SESSION_DATA['embedding_model'], SESSION_DATA['groq_client']
| # Audio processing functions | |
def trim_audio_to_duration(input_path, output_path, duration=10):
    """Copy the first `duration` seconds of `input_path` into `output_path`.

    Invokes the ffmpeg CLI with stream copy (no re-encode), overwriting any
    existing output file.

    Returns:
        bool: True on success, False if ffmpeg fails or is unavailable.
    """
    command = [
        'ffmpeg', '-i', input_path,
        '-t', str(duration),
        '-acodec', 'copy',
        '-y',  # overwrite the output if it already exists
        output_path,
    ]
    try:
        proc = subprocess.run(command, capture_output=True, text=True)
    except Exception as e:
        # e.g. ffmpeg binary not installed
        print(f"Error trimming audio: {str(e)}")
        return False
    if proc.returncode != 0:
        print(f"FFmpeg error: {proc.stderr}")
        return False
    return True
def transcribe_audio(audio_file):
    """Transcribe `audio_file` (first ~10 seconds) via Groq's Whisper API.

    Args:
        audio_file: Path to a recorded audio file, or None.

    Returns:
        str: The transcription text; "" when no audio is given; otherwise a
        string starting with "Error"/"Transcription error" on failure.
    """
    if audio_file is None:
        return ""
    if not CONFIG['GROQ_API_KEY'] or SESSION_DATA['groq_client'] is None:
        return "Error: Groq API key not configured"
    trimmed_audio_path = audio_file  # fallback: use the full recording
    tmp_path = None
    try:
        # Reserve a temporary file to hold the trimmed clip.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as tmp_file:
            tmp_path = tmp_file.name
        if trim_audio_to_duration(audio_file, tmp_path, CONFIG['AUDIO_CLIP_DURATION']):
            trimmed_audio_path = tmp_path
        else:
            # Trimming failed: keep the original file, and delete the unused
            # temp file (the previous version leaked it on this path).
            print("Warning: Could not trim audio, using full duration")
            try:
                os.unlink(tmp_path)
            except OSError:
                pass
            tmp_path = None
        # Transcribe using the Groq Whisper API.
        with open(trimmed_audio_path, "rb") as file:
            transcription = SESSION_DATA['groq_client'].audio.transcriptions.create(
                file=(os.path.basename(trimmed_audio_path), file.read()),
                model="whisper-large-v3",
                response_format="verbose_json",
                language="hi"  # force Hindi decoding
            )
        return transcription.text
    except Exception as e:
        return f"Transcription error: {str(e)}"
    finally:
        # Remove the trimmed temp file on both success and failure.
        if tmp_path is not None and tmp_path != audio_file:
            try:
                os.unlink(tmp_path)
            except OSError:
                pass
def text_to_speech(text):
    """Render `text` as Hindi speech with gTTS.

    Returns:
        str | None: Path to a temporary mp3 file, or None when the input is
        empty/whitespace or synthesis fails.
    """
    if not text or not text.strip():
        return None
    try:
        speech = gTTS(text=text, lang='hi', slow=False)
        # Reserve a temp file, then write the mp3 into it.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as tmp_file:
            out_path = tmp_file.name
        speech.save(out_path)
        return out_path
    except Exception as e:
        print(f"TTS Error: {str(e)}")
        return None
| # Text extraction functions | |
def extract_text_from_txt(txt_path):
    """Read a text file, trying a sequence of common encodings.

    Args:
        txt_path: Path to the .txt file.

    Returns:
        str: File contents, or a string starting with "Error" on failure.
    """
    try:
        # cp1252 must come before latin-1: latin-1 decodes *any* byte
        # sequence, so any encoding listed after it would be unreachable.
        encodings = ['utf-8', 'utf-8-sig', 'cp1252', 'latin-1']
        decoded = None
        for encoding in encodings:
            try:
                with open(txt_path, 'r', encoding=encoding) as file:
                    decoded = file.read()
            except UnicodeDecodeError:
                continue
            if decoded.strip():
                print(f"Successfully extracted {len(decoded)} characters from TXT file using {encoding} encoding")
                return decoded
        if decoded is not None:
            # File decoded fine but contains no visible text — report that
            # instead of the misleading "could not decode" message.
            return "Error: TXT file is empty"
        return "Error: Could not decode TXT file with any supported encoding"
    except Exception as e:
        print(f"TXT extraction error: {str(e)}")
        return f"Error extracting text: {str(e)}"
def extract_text_from_pdf(pdf_path):
    """Pull selectable text out of a PDF using PyMuPDF.

    Image-only (scanned) pages contribute nothing; if no page yields text
    an explanatory "Error..." string is returned instead of raising.

    Returns:
        str: Concatenated page text, or a string starting with "Error".
    """
    try:
        pdf_document = fitz.open(pdf_path)
        total_pages = len(pdf_document)
        print(f"Processing PDF with {total_pages} pages...")
        collected = []
        for page_num in range(total_pages):
            page_text = pdf_document.load_page(page_num).get_text()
            if page_text.strip():
                collected.append(page_text + "\n")
            else:
                print(f"Warning: Page {page_num + 1} appears to have no selectable text")
        pdf_document.close()
        text_content = "".join(collected)
        if not text_content.strip():
            return "Error: No selectable text found in PDF. Please ensure the PDF contains selectable text, not just images."
        print(f"Successfully extracted {len(text_content)} characters from PDF")
        return text_content
    except Exception as e:
        print(f"PDF extraction error: {str(e)}")
        return f"Error extracting text: {str(e)}"
def extract_text_from_file(file_path):
    """Dispatch text extraction on file extension (.pdf or .txt).

    Returns:
        str: Extracted text, or a string starting with "Error" for
        unsupported extensions or extraction failures.
    """
    extension = os.path.splitext(file_path)[1].lower()
    if extension == '.pdf':
        return extract_text_from_pdf(file_path)
    if extension == '.txt':
        return extract_text_from_txt(file_path)
    return f"Error: Unsupported file format {extension}. Only PDF and TXT files are supported."
def extract_metadata(text):
    """Guess author name and book title from the first lines of a document.

    Heuristics only: a line containing an author keyword becomes the author
    (first match wins); the first "substantial" line (10-100 chars, no digit
    in its first 20 chars) becomes the title. Hindi fallback strings are
    returned when nothing matches.

    Returns:
        tuple[str, str]: (author_name, book_title).
    """
    lines = [line.strip() for line in text.split('\n')[:25] if line.strip()]
    author_name = "अज्ञात लेखक"
    book_title = "अनाम पुस्तक"
    hindi_markers = ('लेखक', 'द्वारा', 'रचयिता')
    english_markers = {'author', 'by'}
    for line in lines:
        lowered = line.lower()
        # Whole-word match for the English keywords so that words like
        # "baby" or "ruby" are not mistaken for an author line (the old
        # substring check had that bug).
        is_author_line = (
            any(marker in lowered for marker in hindi_markers)
            or bool(english_markers & set(lowered.split()))
        )
        if is_author_line:
            # Keep the first author line found instead of overwriting it.
            if author_name == "अज्ञात लेखक":
                author_name = line
        elif 10 < len(line) < 100 and not any(ch.isdigit() for ch in line[:20]):
            if book_title == "अनाम पुस्तक":
                book_title = line
    return author_name, book_title
def chunk_text(text, chunk_size=400, overlap=50):
    """Split `text` into word-based chunks that overlap by `overlap` words.

    Args:
        text: Source text (whitespace-tokenized).
        chunk_size: Words per chunk.
        overlap: Words shared between consecutive chunks.

    Returns:
        list[str]: Non-empty chunk strings; [] for empty input.
    """
    words = text.split()
    step = chunk_size - overlap
    pieces = []
    start = 0
    while start < len(words):
        piece = ' '.join(words[start:start + chunk_size])
        if piece.strip():
            pieces.append(piece)
        start += step
    return pieces
| # Vector search functions | |
def create_embeddings(chunks):
    """Embed `chunks` and build an inner-product FAISS index over them.

    Embeddings are L2-normalized before insertion, so inner-product search
    is equivalent to cosine similarity.

    Returns:
        faiss.IndexFlatIP: Index containing one vector per chunk.
    """
    model, _ = load_models()
    vectors = model.encode(chunks, show_progress_bar=False)
    index = faiss.IndexFlatIP(vectors.shape[1])
    faiss.normalize_L2(vectors)
    index.add(vectors.astype('float32'))
    return index
def search_similar_chunks(query, top_k=3):
    """Return up to `top_k` indexed chunks most similar to `query`.

    Returns:
        list[dict]: Each entry is {'text': chunk, 'score': cosine sim};
        [] when no document has been indexed yet.
    """
    if SESSION_DATA['faiss_index'] is None or not SESSION_DATA['document_chunks']:
        return []
    model, _ = load_models()
    query_vec = model.encode([query], show_progress_bar=False)
    faiss.normalize_L2(query_vec)
    scores, indices = SESSION_DATA['faiss_index'].search(query_vec.astype('float32'), top_k)
    chunks = SESSION_DATA['document_chunks']
    # FAISS pads missing results with index -1; guard the range explicitly.
    return [
        {'text': chunks[idx], 'score': float(score)}
        for idx, score in zip(indices[0], scores[0])
        if 0 <= idx < len(chunks)
    ]
| # LLM functions | |
def call_groq_api(prompt, model="llama-3.1-8b-instant"):
    """Send `prompt` to Groq's OpenAI-compatible chat completions endpoint.

    Args:
        prompt: User message content.
        model: Groq model identifier.

    Returns:
        str: The model's reply, or a human-readable error string when the
        key is missing or the HTTP request fails.
    """
    api_key = CONFIG['GROQ_API_KEY']
    if not api_key or api_key == 'your_groq_api_key_here':
        return "⚠️ Groq API key not configured. Please set GROQ_API_KEY environment variable."
    payload = {
        "model": model,
        "messages": [{"role": "user", "content": prompt}],
        "temperature": 0.7,
        "max_tokens": 800,
    }
    try:
        response = requests.post(
            "https://api.groq.com/openai/v1/chat/completions",
            headers={
                "Authorization": f"Bearer {api_key}",
                "Content-Type": "application/json",
            },
            json=payload,
            timeout=30,
        )
        response.raise_for_status()
        return response.json()['choices'][0]['message']['content']
    except Exception as e:
        return f"Error calling LLM: {str(e)}"
def generate_rag_response(query, context_chunks):
    """Compose a Hindi RAG prompt from retrieved chunks and query the LLM.

    Args:
        query: The user's question.
        context_chunks: Results from search_similar_chunks().

    Returns:
        str: The LLM's answer, or a Hindi fallback message when no context
        is available.
    """
    if not context_chunks:
        return "मुझे इस प्रश्न का उत्तर देने के लिए पर्याप्त जानकारी नहीं मिली।"
    context = "\n\n".join(chunk['text'] for chunk in context_chunks)
    prompt = f"""आप एक हिंदी पुस्तक सहायक हैं। निम्नलिखित जानकारी के आधार पर प्रश्न का उत्तर दें:
पुस्तक: {SESSION_DATA['book_title']}
लेखक: {SESSION_DATA['author_name']}
संदर्भ:
{context}
प्रश्न: {query}
निर्देश:
- हिंदी में संक्षिप्त और सटीक उत्तर दें
- उत्तर की शुरुआत में पुस्तक और लेखक का संदर्भ शामिल करें
- केवल दिए गए संदर्भ के आधार पर ही उत्तर दें
"""
    return call_groq_api(prompt)
| # Authentication function | |
def authenticate(passcode):
    """Gate access behind the shared passcode.

    Returns:
        tuple: (auth-section visibility update, main-section visibility
        update, bilingual status message).
    """
    if passcode == CONFIG['PASSCODE']:
        SESSION_DATA['authenticated'] = True
        return gr.update(visible=False), gr.update(visible=True), "✅ Access granted! / पहुंच मिली!"
    return gr.update(visible=True), gr.update(visible=False), "❌ Invalid passcode / गलत पासकोड"
| # Document processing function | |
def process_document(document_file):
    """Ingest an uploaded PDF/TXT: extract text, chunk, embed, and index it.

    Args:
        document_file: Path string (Gradio `gr.File(type="filepath")` passes
            a plain str) or a tempfile-like object exposing `.name`, or None.

    Returns:
        tuple: (status message, book title, author name, gr.update toggling
        the query section's visibility).
    """
    if document_file is None:
        return "कृपया एक PDF या TXT फ़ाइल अपलोड करें।", "", "", gr.update(visible=False)
    try:
        # gr.File(type="filepath") hands the handler a plain string; older
        # Gradio versions passed an object with a `.name` attribute. The
        # previous code only handled the latter and crashed on the former.
        file_path = document_file if isinstance(document_file, str) else document_file.name
        print(f"Processing uploaded file: {file_path}")
        # Check file extension
        file_extension = os.path.splitext(file_path)[1].lower()
        if file_extension not in ('.pdf', '.txt'):
            return "केवल PDF और TXT फ़ाइलें समर्थित हैं।", "", "", gr.update(visible=False)
        # Check file size
        file_size = os.path.getsize(file_path)
        print(f"File size: {file_size} bytes")
        if file_size > CONFIG['MAX_FILE_SIZE']:
            return f"फ़ाइल बहुत बड़ी है! अधिकतम आकार: {CONFIG['MAX_FILE_SIZE'] // (1024*1024)}MB", "", "", gr.update(visible=False)
        # Extract text using the unified dispatcher
        print(f"Extracting text from {file_extension.upper()} file...")
        text_content = extract_text_from_file(file_path)
        # Extraction failures are encoded as "Error..." strings
        if not text_content.strip():
            return "Error: फ़ाइल से टेक्स्ट निकालने में असफल।", "", "", gr.update(visible=False)
        if text_content.startswith("Error"):
            return text_content, "", "", gr.update(visible=False)
        print(f"Text extraction successful. Length: {len(text_content)} characters")
        # Extract metadata
        print("Extracting metadata...")
        author_name, book_title = extract_metadata(text_content)
        SESSION_DATA['author_name'] = author_name
        SESSION_DATA['book_title'] = book_title
        # Create chunks
        print("Creating text chunks...")
        chunks = chunk_text(text_content)
        SESSION_DATA['document_chunks'] = chunks
        # Create embeddings and index
        print("Creating embeddings and search index...")
        SESSION_DATA['faiss_index'] = create_embeddings(chunks)
        # Reset query count for the freshly loaded document
        SESSION_DATA['query_count'] = 0
        # Calculate statistics
        word_count = len(text_content.split())
        char_count = len(text_content)
        print(f"Processing complete. Chunks: {len(chunks)}, Words: {word_count}")
        success_msg = f"""✅ दस्तावेज़ सफलतापूर्वक प्रसंस्करित!
📖 पुस्तक: {book_title}
✍️ लेखक: {author_name}
📄 टेक्स्ट खंड: {len(chunks)}
📊 शब्द संख्या: {word_count:,}
📝 अक्षर संख्या: {char_count:,}
अब आप प्रश्न पूछ सकते हैं।"""
        return success_msg, book_title, author_name, gr.update(visible=True)
    except Exception as e:
        error_msg = f"दस्तावेज़ प्रसंस्करण में त्रुटि: {str(e)}"
        print(f"Error in process_document: {str(e)}")
        return error_msg, "", "", gr.update(visible=False)
| # Query processing function | |
def process_query(audio_input, text_input):
    """Answer one user question supplied by voice and/or text.

    Audio takes priority; a failed transcription falls back to the text box.
    Enforces the per-session query limit and requires a loaded document.

    Args:
        audio_input: Recorded audio filepath from Gradio, or None.
        text_input: Typed question from Gradio, possibly None or "".

    Returns:
        tuple: (formatted answer markdown, TTS audio path or None,
        "questions used" counter string).
    """
    limit = CONFIG['MAX_QUERIES_PER_SESSION']
    if SESSION_DATA['query_count'] >= limit:
        return "⚠️ प्रश्न सीमा समाप्त (5 प्रश्न प्रति सत्र)", None, f"प्रश्न: {SESSION_DATA['query_count']}/{limit}"
    if not SESSION_DATA['document_chunks']:
        return "कृपया पहले एक PDF दस्तावेज़ अपलोड करें।", None, f"प्रश्न: {SESSION_DATA['query_count']}/{limit}"
    # Gradio may pass None for an untouched textbox; the previous code
    # crashed on `None.strip()` here. Normalize first.
    text_input = (text_input or "").strip()
    # Prefer the voice question when one was recorded.
    query_text = ""
    if audio_input:
        query_text = transcribe_audio(audio_input)
        if "error" in query_text.lower():
            # Transcription failed; discard and fall back to typed text.
            query_text = ""
    if not query_text.strip() and text_input:
        query_text = text_input
    if not query_text.strip():
        return "कृपया आवाज़ या टेक्स्ट के माध्यम से प्रश्न दें।", None, f"प्रश्न: {SESSION_DATA['query_count']}/{limit}"
    try:
        # Retrieve context, generate the answer, and synthesize speech.
        similar_chunks = search_similar_chunks(query_text)
        response_text = generate_rag_response(query_text, similar_chunks)
        audio_response = text_to_speech(response_text)
        SESSION_DATA['query_count'] += 1
        formatted_response = f"""**प्रश्न:** {query_text}
**उत्तर:**
{response_text}
**संदर्भ स्रोत:**
"""
        for i, chunk in enumerate(similar_chunks):
            formatted_response += f"\n{i+1}. {chunk['text'][:150]}... (स्कोर: {chunk['score']:.3f})"
        return formatted_response, audio_response, f"प्रश्न: {SESSION_DATA['query_count']}/{limit}"
    except Exception as e:
        return f"प्रश्न प्रसंस्करण में त्रुटि: {str(e)}", None, f"प्रश्न: {SESSION_DATA['query_count']}/{limit}"
def reset_session():
    """Clear per-session state and issue a fresh session id.

    The cached embedding model and Groq client are intentionally kept.

    Returns:
        tuple: Status message, cleared title/author fields, query-section
        visibility update, and the reset query counter string.
    """
    fresh_state = {
        'query_count': 0,
        'document_chunks': [],
        'faiss_index': None,
        'author_name': '',
        'book_title': '',
        'session_id': str(uuid.uuid4()),
    }
    SESSION_DATA.update(fresh_state)
    return "✅ नया सत्र शुरू किया गया!", "", "", gr.update(visible=False), "प्रश्न: 0/5"
| # Book management functions | |
def get_available_books():
    """Discover bundled books and pair each with a cover image.

    Scans the OCR books directory for PDF/TXT files; each is matched to a
    thumbnail in the thumbnails directory by case-insensitive base name, or
    given a generated text placeholder when no artwork exists.

    Returns:
        list[dict]: Entries with keys name / display_name / document_file /
        file_type / thumbnail; [] on any error.
    """
    try:
        thumbnail_dir = CONFIG['BOOK_THUMBNAILS_DIR']
        ocr_dir = CONFIG['OCR_BOOKS_DIR']
        image_exts = ('.png', '.jpg', '.jpeg', '.gif', '.bmp')

        thumbnail_files = []
        if os.path.exists(thumbnail_dir):
            thumbnail_files = [f for f in os.listdir(thumbnail_dir)
                               if f.lower().endswith(image_exts)]

        document_files = []
        if os.path.exists(ocr_dir):
            document_files = [f for f in os.listdir(ocr_dir)
                              if f.lower().endswith(('.pdf', '.txt'))]

        books = []
        for doc_file in document_files:
            book_name, file_extension = os.path.splitext(doc_file)
            file_extension = file_extension.lower()
            # Match a thumbnail by base name (case-insensitive).
            thumbnail_path = None
            for thumb_file in thumbnail_files:
                if os.path.splitext(thumb_file)[0].lower() == book_name.lower():
                    thumbnail_path = os.path.join(thumbnail_dir, thumb_file)
                    break
            if not thumbnail_path:
                # No artwork shipped for this book: render a text card.
                thumbnail_path = create_text_placeholder(book_name)
            books.append({
                'name': book_name,
                'display_name': f"{book_name.replace('_', ' ').title()} ({file_extension.upper()})",
                'document_file': os.path.join(ocr_dir, doc_file),
                'file_type': file_extension,
                'thumbnail': thumbnail_path,
            })
        return books
    except Exception as e:
        print(f"Error getting available books: {str(e)}")
        return []
def create_text_placeholder(book_name):
    """Render a simple bordered card with the book name as a PNG.

    Requires matplotlib (imported lazily so the app still runs without it).

    Returns:
        str | None: Path of the PNG in the temp directory, or None when
        rendering fails.
    """
    try:
        import matplotlib.pyplot as plt
        import matplotlib.patches as patches

        fig, ax = plt.subplots(1, 1, figsize=(3, 4))
        ax.set_xlim(0, 1)
        ax.set_ylim(0, 1)
        ax.axis('off')
        # Light blue card with a darker border.
        ax.add_patch(patches.Rectangle((0, 0), 1, 1, linewidth=2,
                                       edgecolor='#2E86AB', facecolor='#E8F4FD'))
        # Underscores become line breaks so long names wrap.
        ax.text(0.5, 0.5, book_name.replace('_', '\n'),
                ha='center', va='center', fontsize=10, weight='bold', color='#2E86AB')
        placeholder_path = os.path.join(tempfile.gettempdir(), f"{book_name}_placeholder.png")
        plt.savefig(placeholder_path, dpi=100, bbox_inches='tight')
        plt.close()
        return placeholder_path
    except Exception as e:
        print(f"Error creating placeholder: {str(e)}")
        return None
def load_book_document(book_info):
    """Extract the text for a library book entry.

    Args:
        book_info: Dict with a 'document_file' path (see get_available_books).

    Returns:
        str: The extracted text, or a string starting with "Error".
        extract_text_from_file already encodes its failures as "Error..."
        strings, so they pass straight through; callers inspect the result.
    """
    try:
        # The previous version special-cased empty/"Error" content here but
        # returned the identical value on both branches — dead code removed.
        return extract_text_from_file(book_info['document_file'])
    except Exception as e:
        return f"Error loading document: {str(e)}"
def process_selected_book(selected_book_name):
    """Load a bundled library book into the session RAG state.

    Args:
        selected_book_name (str): Book `name` as listed by
            get_available_books(), or "None"/empty for no selection.

    Returns:
        tuple: (status message, book title, author name, gr.update toggling
        the query section's visibility).
    """
    if not selected_book_name or selected_book_name == "None":
        return "कृपया एक पुस्तक चुनें।", "", "", gr.update(visible=False)
    try:
        # Get available books
        available_books = get_available_books()
        # Find the selected book
        selected_book = None
        for book in available_books:
            if book['name'] == selected_book_name:
                selected_book = book
                break
        if not selected_book:
            return "चुनी गई पुस्तक नहीं मिली।", "", "", gr.update(visible=False)
        # Load document content and extract text
        text_content = load_book_document(selected_book)
        # NOTE(review): the substring test means a book whose *text* happens
        # to contain the word "Error" is rejected — confirm that's acceptable.
        if not text_content.strip() or "Error" in text_content:
            return text_content, "", "", gr.update(visible=False)
        # Extract metadata (use book name if no metadata found in text)
        author_name, book_title = extract_metadata(text_content)
        # If metadata extraction didn't work well, use the book name
        if author_name == "अज्ञात लेखक":
            author_name = "संग्रहित पुस्तक"
        if book_title == "अनाम पुस्तक":
            book_title = selected_book['display_name']
        SESSION_DATA['author_name'] = author_name
        SESSION_DATA['book_title'] = book_title
        # Create chunks
        chunks = chunk_text(text_content)
        SESSION_DATA['document_chunks'] = chunks
        # Create embeddings and index
        print("Creating embeddings and search index for selected book...")
        SESSION_DATA['faiss_index'] = create_embeddings(chunks)
        # Reset query count for the freshly loaded book
        SESSION_DATA['query_count'] = 0
        # Calculate statistics
        word_count = len(text_content.split())
        char_count = len(text_content)
        success_msg = f"""✅ पुस्तक सफलतापूर्वक लोड की गई!
📖 पुस्तक: {book_title}
✍️ लेखक: {author_name}
📄 टेक्स्ट खंड: {len(chunks)}
📊 शब्द संख्या: {word_count:,}
📝 अक्षर संख्या: {char_count:,}
अब आप प्रश्न पूछ सकते हैं।"""
        return success_msg, book_title, author_name, gr.update(visible=True)
    except Exception as e:
        return f"पुस्तक लोड करने में त्रुटि: {str(e)}", "", "", gr.update(visible=False)
def create_book_gallery():
    """Build gallery entries and the dropdown choice list for the library.

    Only books whose thumbnail file actually exists are included, in the
    same order, so gallery indices line up with the returned name list.

    Returns:
        tuple: (list of (thumbnail_path, display_name) pairs,
                list of book names starting with the "None" sentinel).

    Note: an earlier version returned a status *string* as the second item
    when no books existed, breaking the (gallery, choices) contract; both
    elements are now always lists.
    """
    gallery_data = []
    book_names = ["None"]  # sentinel "nothing selected" option
    for book in get_available_books():
        thumb = book['thumbnail']
        if thumb and os.path.exists(thumb):
            gallery_data.append((thumb, book['display_name']))
            book_names.append(book['name'])
    return gallery_data, book_names
def handle_gallery_selection(evt: gr.SelectData):
    """Map a gallery click back to the underlying book name.

    Returns:
        str: The selected book's name, or "None" when the click can't be
        resolved to a book.
    """
    if evt.index is None:
        return "None"
    # Rebuild the same ordered, thumbnail-filtered list that populated the
    # gallery so the clicked index lines up with the right book.
    valid_books = [
        book for book in get_available_books()
        if book['thumbnail'] and os.path.exists(book['thumbnail'])
    ]
    if 0 <= evt.index < len(valid_books):
        return valid_books[evt.index]['name']
    return "None"
| # LFS file handling for Hugging Face Spaces | |
def ensure_lfs_files_downloaded():
    """Materialize Git LFS assets when running inside Hugging Face Spaces.

    In a Space checkout, large files may exist only as tiny LFS pointer
    files. This scans the OCR books and thumbnails directories; any
    suspiciously small file (< 200 bytes) is treated as a pointer and
    fetched with `git lfs pull`. Best-effort: every failure is logged,
    nothing is raised.
    """
    try:
        # Heuristic environment check — these variables are set in Spaces.
        if os.getenv('SPACE_ID') or os.getenv('HUGGINGFACE_HUB_CACHE'):
            print("🔄 Detected Hugging Face Spaces environment, checking LFS files...")
            # Check if document files exist and are not LFS pointers
            ocr_dir = CONFIG['OCR_BOOKS_DIR']
            if os.path.exists(ocr_dir):
                document_files = [f for f in os.listdir(ocr_dir) if f.lower().endswith(('.pdf', '.txt'))]
                for doc_file in document_files:
                    doc_path = os.path.join(ocr_dir, doc_file)
                    # Check if file is an LFS pointer (small text file)
                    if os.path.exists(doc_path):
                        file_size = os.path.getsize(doc_path)
                        # LFS pointer files are typically very small (< 200 bytes),
                        # but TXT files might legitimately be small, so only PDFs
                        # are treated as pointer candidates.
                        if file_size < 200 and doc_file.lower().endswith('.pdf'):
                            print(f"📁 {doc_file} appears to be an LFS pointer, attempting download...")
                            # Try to download using git lfs pull for this specific file
                            try:
                                result = subprocess.run(
                                    ['git', 'lfs', 'pull', '--include', f"ocr_books/{doc_file}"],
                                    cwd=os.getcwd(),
                                    capture_output=True,
                                    text=True,
                                    timeout=60
                                )
                                if result.returncode == 0:
                                    print(f"✅ Successfully downloaded {doc_file}")
                                else:
                                    print(f"⚠️ Could not download {doc_file}: {result.stderr}")
                            except subprocess.TimeoutExpired:
                                print(f"⏰ Timeout downloading {doc_file}")
                            except Exception as e:
                                print(f"❌ Error downloading {doc_file}: {str(e)}")
                        else:
                            file_type = "PDF" if doc_file.lower().endswith('.pdf') else "TXT"
                            print(f"✅ {doc_file} ({file_type}) already available ({file_size:,} bytes)")
            # Also check thumbnails for LFS pointers (shorter timeout; any
            # image under 200 bytes is assumed to be a pointer).
            thumbnail_dir = CONFIG['BOOK_THUMBNAILS_DIR']
            if os.path.exists(thumbnail_dir):
                image_files = [f for f in os.listdir(thumbnail_dir)
                               if f.lower().endswith(('.png', '.jpg', '.jpeg', '.gif', '.bmp'))]
                for img_file in image_files:
                    img_path = os.path.join(thumbnail_dir, img_file)
                    if os.path.exists(img_path):
                        file_size = os.path.getsize(img_path)
                        if file_size < 200:  # Likely an LFS pointer
                            print(f"📁 {img_file} appears to be an LFS pointer, attempting download...")
                            try:
                                result = subprocess.run(
                                    ['git', 'lfs', 'pull', '--include', f"book_thumbnails/{img_file}"],
                                    cwd=os.getcwd(),
                                    capture_output=True,
                                    text=True,
                                    timeout=30
                                )
                                if result.returncode == 0:
                                    print(f"✅ Successfully downloaded {img_file}")
                            except Exception as e:
                                print(f"❌ Error downloading {img_file}: {str(e)}")
    except Exception as e:
        print(f"⚠️ Error checking LFS files: {str(e)}")
| # Create Gradio interface | |
def create_interface():
    """Assemble the Gradio Blocks UI and wire up all event handlers.

    Flow: passcode gate -> main section (library tab / upload tab) ->
    query section (voice or text) -> reset controls. Returns the Blocks
    app; the caller launches it.
    """
    with gr.Blocks(
        title="Hindi RAG Voice Demo - Groq Whisper",
        theme=gr.themes.Soft(),
        css="""
        .main-header { text-align: center; color: #2E86AB; margin-bottom: 2rem; }
        .section-header { color: #A23B72; font-weight: bold; margin: 1rem 0; }
        .info-box { background: #F18F01; color: white; padding: 1rem; border-radius: 8px; margin: 1rem 0; }
        """
    ) as demo:
        gr.HTML("""
        <div class="main-header">
            <h1>📚 Hindi RAG Voice Demo - Groq Whisper</h1>
            <h3>हिंदी पुस्तक आवाज़ सहायक</h3>
            <p><em>Audio transcription limited to first 10 seconds</em></p>
        </div>
        """)
        # Authentication section (visible until the correct passcode is given)
        with gr.Group(visible=True) as auth_section:
            gr.Markdown("### 🔐 Access Control / पहुंच नियंत्रण")
            gr.Markdown("Please enter the passcode to access the demo / कृपया डेमो एक्सेस करने के लिए पासकोड दर्ज करें")
            passcode_input = gr.Textbox(
                label="Passcode / पासकोड",
                type="password",
                placeholder="Enter passcode here..."
            )
            auth_button = gr.Button("🔓 Access Demo / डेमो एक्सेस करें", variant="primary")
            auth_status = gr.Textbox(label="Status", interactive=False)
        # Main application section (revealed after authentication)
        with gr.Group(visible=False) as main_section:
            # Session info
            with gr.Row():
                with gr.Column(scale=3):
                    gr.Markdown("### 📊 Session Information")
                with gr.Column(scale=1):
                    query_counter = gr.Textbox(
                        label="Query Usage",
                        value="प्रश्न: 0/5",
                        interactive=False
                    )
            # Document selection/upload section
            gr.Markdown("### 📁 Step 1: Choose Your Book / अपनी पुस्तक चुनें")
            # Book selection section
            with gr.Tab("📚 Select from Library / पुस्तकालय से चुनें"):
                gr.Markdown("**Choose from available books / उपलब्ध पुस्तकों में से चुनें**")
                # Initialize book gallery and dropdown
                available_books = get_available_books()
                gallery_data, book_options = create_book_gallery()
                # Always create these components, even if no books are available
                if available_books:
                    book_gallery = gr.Gallery(
                        value=gallery_data,
                        label="Available Books / उपलब्ध पुस्तकें",
                        show_label=True,
                        elem_id="book_gallery",
                        columns=3,
                        rows=2,
                        height="auto",
                        allow_preview=True
                    )
                    book_dropdown = gr.Dropdown(
                        choices=book_options,
                        label="Select Book / पुस्तक चुनें",
                        value="None",
                        interactive=True
                    )
                    select_book_btn = gr.Button("📖 Load Selected Book / चुनी गई पुस्तक लोड करें", variant="primary")
                else:
                    gr.Markdown("⚠️ No books available in library / पुस्तकालय में कोई पुस्तक उपलब्ध नहीं है")
                    book_gallery = None
                    book_dropdown = gr.Dropdown(choices=["None"], value="None", visible=False)
                    select_book_btn = gr.Button("No books available", interactive=False)
            # PDF/TXT upload section
            with gr.Tab("📄 Upload Document / दस्तावेज़ अपलोड करें"):
                gr.Markdown("**Upload your own PDF or TXT file / अपनी PDF या TXT फ़ाइल अपलोड करें**")
                gr.Markdown("**Note:** For PDF files, please ensure they contain selectable text (not scanned images)")
                document_upload = gr.File(
                    label="Upload PDF or TXT / PDF या TXT अपलोड करें",
                    file_types=[".pdf", ".txt"],
                    type="filepath"
                )
                process_document_btn = gr.Button("📖 Process Document / दस्तावेज़ प्रसंस्करित करें", variant="primary")
            # Status/metadata displays shared by both tabs
            doc_status = gr.Textbox(label="Processing Status / प्रसंस्करण स्थिति", interactive=False)
            with gr.Row():
                book_title_display = gr.Textbox(label="Book Title / पुस्तक शीर्षक", interactive=False)
                author_display = gr.Textbox(label="Author / लेखक", interactive=False)
            # Query section (revealed once a document has been indexed)
            with gr.Group(visible=False) as query_section:
                gr.Markdown("### 🎤 Step 2: Ask Questions / प्रश्न पूछें")
                gr.Markdown("**Note:** Audio recordings are limited to first 10 seconds for transcription")
                with gr.Row():
                    with gr.Column():
                        audio_input = gr.Audio(
                            label="🎙️ Record Voice Question / आवाज़ प्रश्न रिकॉर्ड करें",
                            sources=["microphone"],
                            type="filepath"
                        )
                    with gr.Column():
                        text_input = gr.Textbox(
                            label="💬 Or Type Question / या प्रश्न टाइप करें",
                            placeholder="उदाहरण: इस पुस्तक में मुख्य विषय क्या है?",
                            lines=3
                        )
                ask_button = gr.Button("🔍 Get Answer / उत्तर पाएं", variant="primary", size="lg")
                # Response section
                with gr.Column():
                    response_text = gr.Textbox(
                        label="📝 Response / उत्तर",
                        lines=8,
                        interactive=False
                    )
                    response_audio = gr.Audio(
                        label="🔊 Audio Response / आवाज़ उत्तर",
                        interactive=False
                    )
            # Reset section
            gr.Markdown("---")
            with gr.Row():
                reset_btn = gr.Button("🔄 Start New Session / नया सत्र शुरू करें", variant="secondary")
                with gr.Column():
                    gr.Markdown("""
                    **Requirements & Limits / आवश्यकताएं और सीमा:**
                    - PDF with selectable text (no scanned images) or TXT files
                    - Max file size: 10MB
                    - Max queries: 5 per session
                    - Audio transcription: First 10 seconds only
                    - Supported: Hindi & English text
                    - Requires: Groq API key and ffmpeg
                    """)
        # Event handlers
        auth_button.click(
            authenticate,
            inputs=[passcode_input],
            outputs=[auth_section, main_section, auth_status]
        )
        # Document upload event handler - Always available
        process_document_btn.click(
            process_document,
            inputs=[document_upload],
            outputs=[doc_status, book_title_display, author_display, query_section]
        )
        # Book selection event handler - Only if books are available
        if available_books:
            select_book_btn.click(
                process_selected_book,
                inputs=[book_dropdown],
                outputs=[doc_status, book_title_display, author_display, query_section]
            )
        # Gallery selection event handler - Only if gallery exists
        if book_gallery is not None:
            book_gallery.select(
                handle_gallery_selection,
                outputs=[book_dropdown]
            )
        ask_button.click(
            process_query,
            inputs=[audio_input, text_input],
            outputs=[response_text, response_audio, query_counter]
        )
        reset_btn.click(
            reset_session,
            outputs=[doc_status, book_title_display, author_display, query_section, query_counter]
        )
        # Load models on startup
        demo.load(load_models)
    return demo
| # Main function | |
def main():
    """Entry point: prepare assets, warm up models, and launch the UI."""
    print("🚀 Starting Hindi RAG Voice Demo (Groq Whisper API Version)...")
    # Ensure LFS files are available (important for Hugging Face Spaces)
    ensure_lfs_files_downloaded()
    print("📋 Loading AI models (this may take a moment)...")
    # Pre-load models so the first request is fast
    load_models()
    # Create and launch interface
    demo = create_interface()
    print("✅ Models loaded successfully!")
    # SECURITY: do not echo the actual passcode into stdout/logs — it gates
    # access to the demo. Only report whether one is configured.
    print(f"🔑 Demo passcode configured: {'yes' if CONFIG['PASSCODE'] else 'no (set PASSCODE env var)'}")
    print("🌐 Launching web interface...")
    demo.launch(
        share=True,
        show_error=True,
    )
if __name__ == "__main__":
    main()