Spaces:
Runtime error
Runtime error
import json
import os
import re
from concurrent.futures import ThreadPoolExecutor

import faiss
import fitz
import gradio as gr
import numpy as np
import pandas as pd
import requests
from docx import Document
from pptx import Presentation
from sentence_transformers import SentenceTransformer
# Configuration
# The Groq API key is read from the environment (for example a Space secret)
# so that no credential lives in the source. Any key that was previously
# committed in this file must be treated as compromised and rotated.
GROQ_API_KEY = os.environ.get("GROQ_API_KEY", "")
MODEL_NAME = "all-MiniLM-L6-v2"  # sentence-transformers model used for embeddings
CHUNK_SIZE = 1024  # each text chunk holds CHUNK_SIZE // 2 whitespace-separated words
MAX_TOKENS = 4096  # sent as "max_tokens" in the chat-completion request
MODEL = SentenceTransformer(MODEL_NAME)  # loaded once at import time and shared
WORKERS = 8  # number of threads used for per-file text extraction
class DocumentProcessor:
    """Extract text from uploaded files, index it in FAISS and answer questions.

    Workflow: ``process_documents`` extracts and chunks the text of every
    file, embeds the chunks with the module-level ``MODEL`` and stores the
    vectors in ``self.index``; ``query`` embeds a question, retrieves the
    closest chunks and sends them with the question to the Groq
    chat-completions endpoint.
    """

    def __init__(self):
        # Inner-product index sized to the embedding model's output dimension.
        self.index = faiss.IndexFlatIP(MODEL.get_sentence_embedding_dimension())
        # Chunk texts; position i corresponds to vector i stored in self.index.
        self.chunks = []
        # Pool used to extract text from several files concurrently.
        self.processor_pool = ThreadPoolExecutor(max_workers=WORKERS)

    def extract_text_from_pptx(self, file_path):
        """Return the text of every shape exposing ``.text``, or "" on failure."""
        try:
            prs = Presentation(file_path)
            return " ".join(
                shape.text
                for slide in prs.slides
                for shape in slide.shapes
                if hasattr(shape, "text")
            )
        except Exception as e:
            print(f"PPTX Error: {str(e)}")
            return ""

    def extract_text_from_xls_csv(self, file_path):
        """Return all cell values of a spreadsheet/CSV joined by spaces, or "" on failure."""
        try:
            # Case-insensitive so that e.g. "REPORT.XLSX" is not parsed as CSV.
            if file_path.lower().endswith(('.xls', '.xlsx')):
                df = pd.read_excel(file_path)
            else:
                df = pd.read_csv(file_path)
            return " ".join(df.astype(str).values.flatten())
        except Exception as e:
            print(f"Spreadsheet Error: {str(e)}")
            return ""

    def extract_text_from_pdf(self, file_path):
        """Return the text of all PDF pages joined by spaces, or "" on failure."""
        try:
            # The context manager closes the document handle even if a page fails.
            with fitz.open(file_path) as doc:
                return " ".join(
                    page.get_text("text", flags=fitz.TEXT_PRESERVE_WHITESPACE)
                    for page in doc
                )
        except Exception as e:
            print(f"PDF Error: {str(e)}")
            return ""

    def process_file(self, file):
        """Extract whitespace-normalised text from one uploaded file.

        Args:
            file: either a filesystem path string or an object whose ``name``
                attribute is the path (both forms are accepted).

        Returns:
            The extracted text with every whitespace run collapsed to one
            space, or "" for unsupported extensions and on any error.
        """
        try:
            file_path = file if isinstance(file, str) else file.name
            print(f"Processing: {file_path}")  # Debug print
            # Match extensions case-insensitively (".PDF", ".Docx", ...).
            lowered = file_path.lower()
            if lowered.endswith('.pdf'):
                text = self.extract_text_from_pdf(file_path)
            elif lowered.endswith('.docx'):
                text = " ".join(p.text for p in Document(file_path).paragraphs)
            elif lowered.endswith('.txt'):
                # Undecodable bytes are replaced rather than discarding the file.
                with open(file_path, 'r', encoding='utf-8', errors='replace') as f:
                    text = f.read()
            elif lowered.endswith('.pptx'):
                text = self.extract_text_from_pptx(file_path)
            elif lowered.endswith(('.xls', '.xlsx', '.csv')):
                text = self.extract_text_from_xls_csv(file_path)
            else:
                return ""
            clean_text = re.sub(r'\s+', ' ', text).strip()
            print(f"Extracted {len(clean_text)} characters from {file_path}")  # Debug
            return clean_text
        except Exception as e:
            print(f"Processing Error: {str(e)}")  # Debug
            return ""

    def semantic_chunking(self, text):
        """Split ``text`` into consecutive chunks of ``CHUNK_SIZE // 2`` words.

        Words are whitespace-delimited and keep their trailing whitespace, so
        concatenating the chunks reproduces the input. No cap is applied to
        the number of chunks. Empty input yields an empty list.
        """
        words = re.findall(r'\S+\s*', text)
        step = CHUNK_SIZE // 2
        return [''.join(words[i:i + step]) for i in range(0, len(words), step)]

    def process_documents(self, files):
        """Extract, chunk, embed and index ``files``; return a status message.

        Any previously indexed content is discarded first, so after a failed
        run the processor is empty rather than holding stale data.
        """
        self.chunks = []
        self.index.reset()
        if not files:
            return "No files uploaded!"
        print("\n" + "="*40 + " PROCESSING DOCUMENTS " + "="*40)
        # Text extraction is I/O heavy, so it is spread over the thread pool.
        texts = list(self.processor_pool.map(self.process_file, files))
        # Chunking is cheap pure-Python work; threads would add only overhead.
        chunk_lists = [self.semantic_chunking(text) for text in texts]
        all_chunks = [chunk for chunk_list in chunk_lists for chunk in chunk_list]
        print(f"Total chunks generated: {len(all_chunks)}")  # Debug
        if not all_chunks:
            return "Error: No chunks generated from documents"
        try:
            embeddings = MODEL.encode(
                all_chunks,
                batch_size=256,
                convert_to_tensor=True,
                show_progress_bar=False,
            ).cpu().numpy().astype('float32')
            self.index.add(embeddings)
            self.chunks = all_chunks
            return f"✅ Processed {len(all_chunks)} chunks from {len(files)} files"
        except Exception as e:
            print(f"Embedding Error: {str(e)}")
            return f"Error: {str(e)}"

    def query(self, question):
        """Answer ``question`` using the indexed chunks as context.

        Returns:
            ``(text, ok)`` where ``ok`` is True and ``text`` is the model's
            answer on success, otherwise ``ok`` is False and ``text`` is an
            error or guidance message.
        """
        if not self.chunks:
            return "Please process documents first", False
        try:
            print("\n" + "="*40 + " QUERY PROCESSING " + "="*40)
            print(f"Question: {question}")
            # Generate embedding for the question
            question_embedding = MODEL.encode(
                [question], convert_to_tensor=True
            ).cpu().numpy().astype('float32')
            # Search FAISS index; never ask for more neighbours than exist.
            top_k = min(3, len(self.chunks))
            _, indices = self.index.search(question_embedding, top_k)
            print(f"Top indices: {indices}")
            # FAISS reports missing neighbours as -1, which must not be
            # used as a (negative) list index into self.chunks.
            context = "\n".join(
                self.chunks[i] for i in indices[0] if 0 <= i < len(self.chunks)
            )
            print(f"Context length: {len(context)} characters")
            # API Call with error handling
            headers = {
                "Authorization": f"Bearer {GROQ_API_KEY}",
                "Content-Type": "application/json"
            }
            payload = {
                "messages": [{
                    "role": "user",
                    "content": f"Answer concisely: {question}\nContext: {context}"
                }],
                "model": "mixtral-8x7b-32768",
                "temperature": 0.3,
                "max_tokens": MAX_TOKENS,
                "stream": True
            }
            # stream=True lets the event lines be read as they arrive; the
            # context manager releases the connection in every case.
            with requests.post(
                "https://api.groq.com/openai/v1/chat/completions",
                headers=headers,
                json=payload,
                timeout=20,
                stream=True,
            ) as response:
                print(f"API Status Code: {response.status_code}")  # Debug
                if response.status_code != 200:
                    return f"API Error: {response.text}", False
                full_answer = []
                for raw_line in response.iter_lines():
                    if not raw_line:
                        continue
                    try:
                        decoded = raw_line.decode('utf-8').strip()
                        if not decoded.startswith('data:'):
                            continue
                        event = decoded[5:].strip()
                        # End-of-stream sentinel, not a JSON document.
                        if event == '[DONE]':
                            break
                        data = json.loads(event)
                        choices = data.get('choices') or [{}]
                        content = (choices[0].get('delta') or {}).get('content')
                        if content:
                            full_answer.append(content)
                    except (ValueError, AttributeError, IndexError, TypeError) as e:
                        # Best effort: skip a malformed event, keep the rest.
                        print(f"Chunk Error: {str(e)}")
                        continue
            final_answer = ''.join(full_answer)
            print(f"Final Answer: {final_answer}")  # Debug
            return final_answer, True
        except Exception as e:
            print(f"Query Error: {str(e)}")  # Debug
            return f"Error: {str(e)}", False
# Initialize processor
# Single module-level DocumentProcessor instance; ask_question() and the
# "Process" button callback both operate on it.
processor = DocumentProcessor()
# Gradio interface with improved error handling
def ask_question(question, chat_history=None):
    """Append one question/answer exchange to the chat history.

    Args:
        question: text entered by the user.
        chat_history: existing list of ``(question, answer)`` pairs; ``None``
            or any other empty value is treated as an empty history.

    Returns:
        A new list holding the previous history plus one pair: the question
        with the processor's answer, or an empty question with a prompt to
        enter a valid one when ``question`` is missing or blank. The input
        history is not modified.
    """
    # Copy into a fresh list so a None/'' history cannot break concatenation.
    history = list(chat_history) if chat_history else []
    if not question or not question.strip():
        return history + [("", "Please enter a valid question")]
    # query() also returns a success flag; the text is displayed either way.
    answer, _ = processor.query(question)
    return history + [(question, answer)]
# Build the Gradio UI: upload + process controls, a status box, the chat
# history and a question row, then wire the buttons to their callbacks.
# NOTE(review): the original indentation was lost; the nesting below (which
# components sit inside each gr.Row) is the most plausible reading — confirm.
with gr.Blocks(title="RAG System") as app:
    gr.Markdown("## 🚀 Multi-Format-Reader Chat-Bot")
    with gr.Row():
        files = gr.File(file_count="multiple",
                        file_types=[".pdf", ".docx", ".txt", ".pptx", ".xls", ".xlsx", ".csv"],
                        label="Upload Documents")
        process_btn = gr.Button("Process", variant="primary")
    status = gr.Textbox(label="Processing Status", interactive=False)
    chatbot = gr.Chatbot(height=500, label="Chat History")
    with gr.Row():
        question = gr.Textbox(label="Your Query",
                              placeholder="Enter your question...",
                              max_lines=3)
        ask_btn = gr.Button("Ask", variant="primary")
        clear_btn = gr.Button("Clear Chat")
    # "Process": index the uploaded files and show the returned status text.
    process_btn.click(
        fn=processor.process_documents,
        inputs=files,
        outputs=status
    )
    # "Ask": append the exchange to the chat history.
    ask_btn.click(
        fn=ask_question,
        inputs=[question, chatbot],
        outputs=chatbot
    ).then(lambda: "", None, question)  # Clear input after submission
    # "Clear Chat": replace the chat history with an empty list.
    clear_btn.click(
        fn=lambda: [],
        inputs=None,
        outputs=chatbot
    )
# Start the web server with a public share link and debug mode enabled.
app.launch(share=True, debug=True)