Spaces:
Build error
Build error
| import gradio as gr | |
| import fitz | |
| import numpy as np | |
| import requests | |
| import faiss | |
| import re | |
| import json | |
| import pandas as pd | |
| from docx import Document | |
| from pptx import Presentation | |
| from sentence_transformers import SentenceTransformer | |
| from concurrent.futures import ThreadPoolExecutor | |
| import os | |
| # os.environ.get() | |
| # Configuration - Get API key from environment variables | |
| GEMINI_API_KEY = "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" | |
| MODEL_NAME = "all-MiniLM-L6-v2" | |
| CHUNK_SIZE = 1024 | |
| MAX_TOKENS = 4096 | |
| WORKERS = 8 | |
| # Initialize model with error handling | |
| try: | |
| MODEL = SentenceTransformer(MODEL_NAME, device='cpu') | |
| except Exception as e: | |
| raise RuntimeError(f"Failed to initialize model: {str(e)}") | |
| class DocumentProcessor: | |
| def __init__(self): | |
| self.index = faiss.IndexFlatIP(MODEL.get_sentence_embedding_dimension()) | |
| self.chunks = [] | |
| self.processor_pool = ThreadPoolExecutor(max_workers=WORKERS) | |
| # File processing methods | |
| def extract_text_from_pptx(self, file_path): | |
| try: | |
| prs = Presentation(file_path) | |
| return " ".join([shape.text for slide in prs.slides for shape in slide.shapes if hasattr(shape, "text")]) | |
| except Exception as e: | |
| print(f"PPTX Error: {str(e)}") | |
| return "" | |
| def extract_text_from_xls_csv(self, file_path): | |
| try: | |
| if file_path.endswith(('.xls', '.xlsx')): | |
| df = pd.read_excel(file_path) | |
| else: | |
| df = pd.read_csv(file_path) | |
| return " ".join(df.astype(str).values.flatten()) | |
| except Exception as e: | |
| print(f"Spreadsheet Error: {str(e)}") | |
| return "" | |
| def extract_text_from_pdf(self, file_path): | |
| try: | |
| doc = fitz.open(file_path) | |
| return " ".join(page.get_text("text", flags=fitz.TEXT_PRESERVE_WHITESPACE) for page in doc) | |
| except Exception as e: | |
| print(f"PDF Error: {str(e)}") | |
| return "" | |
| def process_file(self, file): | |
| try: | |
| file_path = file.name | |
| print(f"Processing: {file_path}") | |
| if file_path.endswith('.pdf'): | |
| text = self.extract_text_from_pdf(file_path) | |
| elif file_path.endswith('.docx'): | |
| text = " ".join(p.text for p in Document(file_path).paragraphs) | |
| elif file_path.endswith('.txt'): | |
| with open(file_path, 'r', encoding='utf-8') as f: | |
| text = f.read() | |
| elif file_path.endswith('.pptx'): | |
| text = self.extract_text_from_pptx(file_path) | |
| elif file_path.endswith(('.xls', '.xlsx', '.csv')): | |
| text = self.extract_text_from_xls_csv(file_path) | |
| else: | |
| return "" | |
| clean_text = re.sub(r'\s+', ' ', text).strip() | |
| print(f"Extracted {len(clean_text)} characters from {file_path}") | |
| return clean_text | |
| except Exception as e: | |
| print(f"Processing Error: {str(e)}") | |
| return "" | |
| def semantic_chunking(self, text): | |
| words = re.findall(r'\S+\s*', text) | |
| chunks = [''.join(words[i:i+CHUNK_SIZE//2]) for i in range(0, len(words), CHUNK_SIZE//2)] | |
| return chunks[:1000] | |
| def process_documents(self, files): | |
| self.chunks = [] | |
| if not files: | |
| return "No files uploaded!" | |
| print("\n" + "="*40 + " PROCESSING DOCUMENTS " + "="*40) | |
| texts = list(self.processor_pool.map(self.process_file, files)) | |
| with ThreadPoolExecutor(max_workers=WORKERS) as executor: | |
| chunk_lists = list(executor.map(self.semantic_chunking, texts)) | |
| all_chunks = [chunk for chunk_list in chunk_lists for chunk in chunk_list] | |
| print(f"Total chunks generated: {len(all_chunks)}") | |
| if not all_chunks: | |
| return "Error: No chunks generated from documents" | |
| try: | |
| embeddings = MODEL.encode( | |
| all_chunks, | |
| batch_size=256, | |
| convert_to_tensor=True, | |
| show_progress_bar=False | |
| ).cpu().numpy().astype('float32') | |
| self.index.reset() | |
| self.index.add(embeddings) | |
| self.chunks = all_chunks | |
| return f"✅ Processed {len(all_chunks)} chunks from {len(files)} files" | |
| except Exception as e: | |
| print(f"Embedding Error: {str(e)}") | |
| return f"Error: {str(e)}" | |
| def query(self, question): | |
| if not self.chunks: | |
| return "Please process documents first", False | |
| try: | |
| print("\n" + "="*40 + " QUERY PROCESSING " + "="*40) | |
| print(f"Question: {question}") | |
| # Generate embedding for the question | |
| question_embedding = MODEL.encode([question], convert_to_tensor=True).cpu().numpy().astype('float32') | |
| # Search FAISS index | |
| _, indices = self.index.search(question_embedding, 3) | |
| print(f"Top indices: {indices}") | |
| # Get context from top chunks | |
| context = "\n".join([self.chunks[i] for i in indices[0] if i < len(self.chunks)]) | |
| print(f"Context length: {len(context)} characters") | |
| # Gemini API Call | |
| url = f"https://generativelanguage.googleapis.com/v1beta/models/gemini-pro:generateContent?key={GEMINI_API_KEY}" | |
| headers = {"Content-Type": "application/json"} | |
| payload = { | |
| "contents": [{ | |
| "parts": [{ | |
| "text": f"Answer concisely based on this context: {context}\n\nQuestion: {question}" | |
| }] | |
| }], | |
| "generationConfig": { | |
| "temperature": 0.3, | |
| "maxOutputTokens": MAX_TOKENS | |
| } | |
| } | |
| response = requests.post( | |
| url, | |
| headers=headers, | |
| json=payload, | |
| timeout=20 | |
| ) | |
| if response.status_code != 200: | |
| return f"API Error: {response.text}", False | |
| # Parse response | |
| try: | |
| response_json = response.json() | |
| final_answer = response_json['candidates'][0]['content']['parts'][0]['text'] | |
| except (KeyError, IndexError) as e: | |
| print(f"Response parsing error: {str(e)}") | |
| return "Error: Could not parse API response", False | |
| return final_answer, True | |
| except Exception as e: | |
| print(f"Query Error: {str(e)}") | |
| return f"Error: {str(e)}", False | |
| # Initialize processor | |
| processor = DocumentProcessor() | |
| # Gradio interface with improved error handling | |
| with gr.Blocks(theme=gr.themes.Soft(), title="Document Chatbot") as app: | |
| gr.Markdown("## Multi-Format Document Chatbot") | |
| with gr.Row(): | |
| with gr.Column(scale=2): | |
| files = gr.File( | |
| file_count="multiple", | |
| file_types=[".pdf", ".docx", ".txt", ".pptx", ".xls", ".xlsx", ".csv"], | |
| label="Upload Documents" | |
| ) | |
| process_btn = gr.Button("Process Documents", variant="primary") | |
| status = gr.Textbox(label="Processing Status") | |
| with gr.Column(scale=3): | |
| chatbot = gr.Chatbot(height=500, label="Chat History") | |
| question = gr.Textbox( | |
| label="Ask a question", | |
| placeholder="Type your question here...", | |
| max_lines=3 | |
| ) | |
| with gr.Row(): | |
| ask_btn = gr.Button("Ask", variant="primary") | |
| clear_btn = gr.Button("Clear Chat") | |
| process_btn.click( | |
| fn=processor.process_documents, | |
| inputs=files, | |
| outputs=status | |
| ) | |
| def ask_question(question, chat_history): | |
| if not question.strip(): | |
| return chat_history + [("", "Please enter a valid question")] | |
| answer, success = processor.query(question) | |
| return chat_history + [(question, answer)] | |
| ask_btn.click( | |
| fn=ask_question, | |
| inputs=[question, chatbot], | |
| outputs=chatbot | |
| ).then(lambda: "", None, question) | |
| clear_btn.click( | |
| fn=lambda: [], | |
| inputs=None, | |
| outputs=chatbot | |
| ) | |
| if __name__ == "__main__": | |
| app.launch(debug=True) | |