| """ | |
| Qwen2.5 PDF RAG System for Hugging Face Spaces | |
| Adapted for deployment on Hugging Face Spaces with optimizations for the cloud environment. | |
| """ | |
import os
import time
import warnings
from html import escape
from typing import List, Dict, Any, Tuple

import torch
import gradio as gr

# LangChain imports - updated to avoid deprecation warnings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import Chroma
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.document_loaders import PyPDFLoader
from langchain.schema import Document

# Transformers for Qwen2.5 models (more compatible with HF Spaces)
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline

warnings.filterwarnings("ignore")
class PDFRagSystem:
    """PDF RAG System using Qwen2.5, ChromaDB, and LangChain - HF Spaces optimized"""

    def __init__(self, model_name: str = "Qwen/Qwen2.5-1.5B-Instruct", persist_directory: str = "db"):
        """
        Initialize the RAG system

        Args:
            model_name: Name of the Qwen model to use
            persist_directory: Directory to store the vector database
        """
        self.model_name = model_name
        self.persist_directory = persist_directory
        self.pipe = None
        self.tokenizer = None
        self.model = None
        self.vectorstore = None
        self.embeddings = None
        self.top_sources = []

        # Check available device
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        print(f"Using device: {self.device}")

        # Initialize embedding model
        print("Loading embedding model...")
        try:
            self.embeddings = HuggingFaceEmbeddings(
                model_name="sentence-transformers/all-MiniLM-L6-v2",
                model_kwargs={"device": self.device},
                encode_kwargs={"normalize_embeddings": True}
            )
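            # normalize_embeddings=True yields unit-length vectors, which keeps
            # the distance scores reported later comparable across queries.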
        except Exception as e:
            print(f"Warning: Error loading HuggingFaceEmbeddings, trying alternative: {e}")
            # Fallback to basic embeddings if HuggingFaceEmbeddings fails
            from sentence_transformers import SentenceTransformer
            self.embedding_model = SentenceTransformer("all-MiniLM-L6-v2")
            self.embeddings = self._create_custom_embeddings()

        # Load LLM
        self._load_llm()
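    # The fallback wrapper below works in practice because LangChain's Chroma
    # wrapper is duck-typed: it only ever calls embed_documents/embed_query on
    # the embeddings object it is given.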
    def _create_custom_embeddings(self):
        """Create custom embeddings wrapper if HuggingFaceEmbeddings fails"""
        class CustomEmbeddings:
            def __init__(self, model):
                self.model = model

            def embed_documents(self, texts):
                return self.model.encode(texts).tolist()

            def embed_query(self, text):
                return self.model.encode([text])[0].tolist()

        return CustomEmbeddings(self.embedding_model)
    def change_model(self, model_name: str) -> str:
        """
        Change the LLM model

        Args:
            model_name: New model name to use

        Returns:
            Status message
        """
        if self.model_name == model_name:
            return f"Already using model: {model_name}"

        self.model_name = model_name
        try:
            # Clear GPU memory held by the previous model
            if hasattr(self, 'model') and self.model is not None:
                del self.model
                del self.tokenizer
                del self.pipe
            if torch.cuda.is_available():
                torch.cuda.empty_cache()

            self._load_llm()
            return f"Successfully switched to model: {model_name}"
        except Exception as e:
            return f"Error switching model: {str(e)}"
    def _load_llm(self):
        """Load the Qwen2.5 model with optimized settings for HF Spaces"""
        print(f"\nLoading {self.model_name} model...")
        start_time = time.time()
        try:
            # Load tokenizer
            self.tokenizer = AutoTokenizer.from_pretrained(
                self.model_name,
                trust_remote_code=True
            )

            # Configure model loading for limited resources
            model_kwargs = {
                "trust_remote_code": True,
                "torch_dtype": torch.float16 if self.device == "cuda" else torch.float32,
                "low_cpu_mem_usage": True,
            }
            if self.device == "cuda":
                model_kwargs["device_map"] = "auto"

            # Load model
            self.model = AutoModelForCausalLM.from_pretrained(
                self.model_name,
                **model_kwargs
            )
            if self.device == "cpu":
                self.model = self.model.to(self.device)

            # Create pipeline. When the model was placed with device_map="auto",
            # accelerate manages device placement and the pipeline must not be
            # given an explicit `device` argument, so only pass one on CPU.
            pipe_kwargs = {"device": -1} if self.device == "cpu" else {}
            self.pipe = pipeline(
                "text-generation",
                model=self.model,
                tokenizer=self.tokenizer,
                torch_dtype=torch.float16 if self.device == "cuda" else torch.float32,
                return_full_text=False,
                **pipe_kwargs
            )

            load_time = time.time() - start_time
            print(f"Model loaded in {load_time:.2f} seconds")
        except Exception as e:
            print(f"Error loading model: {e}")
            # Fall back to the smaller model if the requested one fails
            if "1.5B" not in self.model_name:
                print("Falling back to Qwen2.5-1.5B-Instruct...")
                self.model_name = "Qwen/Qwen2.5-1.5B-Instruct"
                self._load_llm()
            else:
                raise
    def process_pdf(self, pdf_file: str) -> List[Document]:
        """
        Process a PDF file into documents for the vectorstore

        Args:
            pdf_file: Path to the PDF file

        Returns:
            List of document chunks
        """
        try:
            loader = PyPDFLoader(pdf_file)
            documents = loader.load()

            # Split documents into chunks
            text_splitter = RecursiveCharacterTextSplitter(
                chunk_size=800,  # Smaller chunks for better performance
                chunk_overlap=150,
                separators=["\n\n", "\n", ". ", " ", ""]
            )
            chunks = text_splitter.split_documents(documents)
            return chunks
        except Exception as e:
            print(f"Error processing PDF {pdf_file}: {e}")
            return []
    def create_vectorstore(self, pdf_files: List[str]) -> str:
        """
        Create or update the vector store with documents from PDF files

        Args:
            pdf_files: List of paths to PDF files

        Returns:
            Status message
        """
        if not pdf_files:
            return "No files provided."

        all_chunks = []
        processed_files = 0
        for pdf_file in pdf_files:
            if not os.path.exists(pdf_file):
                print(f"Warning: File {pdf_file} does not exist. Skipping.")
                continue
            print(f"Processing {pdf_file}...")
            chunks = self.process_pdf(pdf_file)
            if chunks:
                print(f"Created {len(chunks)} chunks from {pdf_file}")
                all_chunks.extend(chunks)
                processed_files += 1
            else:
                print(f"Failed to process {pdf_file}")

        if not all_chunks:
            return "No documents were successfully processed."

        try:
            # Create or update vectorstore
            if os.path.exists(self.persist_directory) and len(os.listdir(self.persist_directory)) > 0:
                print("Loading existing vectorstore...")
                self.vectorstore = Chroma(
                    persist_directory=self.persist_directory,
                    embedding_function=self.embeddings
                )
                print("Adding new documents to existing vectorstore...")
                self.vectorstore.add_documents(all_chunks)
            else:
                print("Creating new vectorstore...")
                self.vectorstore = Chroma.from_documents(
                    documents=all_chunks,
                    embedding=self.embeddings,
                    persist_directory=self.persist_directory
                )
            # Persist to disk
            self.vectorstore.persist()
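            # Note: recent Chroma clients persist automatically when a
            # persist_directory is set; the explicit persist() call above is
            # kept for compatibility with older releases.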
| return f"Successfully processed {processed_files} PDFs with {len(all_chunks)} chunks." | |
| except Exception as e: | |
| return f"Error creating vectorstore: {str(e)}" | |
    def retrieve_context(self, query: str, k: int = 4) -> Tuple[str, List[Dict]]:
        """
        Retrieve relevant context for a query

        Args:
            query: User query
            k: Number of top documents to retrieve

        Returns:
            Tuple of (concatenated context string, list of source documents)
        """
        if not self.vectorstore:
            return "", []
        try:
            # Search for relevant documents
            docs_with_scores = self.vectorstore.similarity_search_with_score(query, k=k)
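            # similarity_search_with_score returns (document, distance) pairs;
            # smaller distances mean closer matches.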
            context_parts = []
            sources = []
            for i, (doc, score) in enumerate(docs_with_scores):
                context_part = f"Document {i+1}:\n{doc.page_content}\n"
                context_parts.append(context_part)

                # Clean metadata for serialization
                clean_metadata = {}
                for key, value in doc.metadata.items():
                    str_key = str(key)
                    if isinstance(value, (str, int, float, bool, type(None))):
                        clean_metadata[str_key] = value
                    else:
                        clean_metadata[str_key] = str(value)

                source_info = {
                    "content": str(doc.page_content),
                    "metadata": clean_metadata,
                    "score": float(score),
                    "source_id": i + 1
                }
                sources.append(source_info)

            self.top_sources = sources
            context = "\n".join(context_parts)
            return context, sources
        except Exception as e:
            print(f"Error retrieving context: {e}")
            return "", []
    def generate_response(self, query: str, system_prompt: str = "You are a helpful assistant that answers questions based on the provided documents.") -> str:
        """
        Generate a response using RAG

        Args:
            query: User query
            system_prompt: System prompt to set assistant behavior

        Returns:
            Model response
        """
        # Retrieve relevant context
        context, _ = self.retrieve_context(query)
        if not context:
            return "No relevant documents found in the database. Please upload some PDF files first."

        # Create RAG prompt (prepend the system instructions so they actually
        # steer the answer)
        rag_prompt = f"""{system_prompt}

Based on the following context, please answer the question. If the answer is not in the context, say that you don't know.

Context:
{context}

Question: {query}

Answer:"""
        try:
            # Generate response
            print(f"Running inference for query: {query}")
            start_time = time.time()

            # Use the pipeline for generation
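            # Sampling settings favor fluent but grounded answers; pad_token_id
            # falls back to EOS, the usual choice for causal LMs that define no
            # dedicated padding token.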
            response = self.pipe(
                rag_prompt,
                max_new_tokens=300,
                temperature=0.7,
                top_p=0.9,
                do_sample=True,
                pad_token_id=self.tokenizer.eos_token_id
            )
            inference_time = time.time() - start_time
            print(f"Inference completed in {inference_time:.2f} seconds")

            # Extract the generated text
            if isinstance(response, list) and len(response) > 0:
                result = response[0].get('generated_text', '').strip()
            else:
                result = str(response).strip()
            return result if result else "I couldn't generate a response. Please try again."
        except Exception as e:
            print(f"Error generating response: {e}")
            return f"Error generating response: {str(e)}"
    def get_top_sources(self) -> List[Dict]:
        """Get the top sources used for the last query"""
        return self.top_sources
class RagUI:
    """Gradio UI for the PDF RAG System - HF Spaces optimized"""

    def __init__(self, rag_system: PDFRagSystem):
        self.rag_system = rag_system
        self.interface = None
        # Define available models (optimized for HF Spaces)
        self.models = {
            "Qwen2.5-1.5B (Recommended)": "Qwen/Qwen2.5-1.5B-Instruct",
            "Qwen2.5-3B": "Qwen/Qwen2.5-3B-Instruct"
        }
        self.current_model = "Qwen2.5-1.5B (Recommended)"
    def _upload_files(self, files) -> str:
        """Handle file upload"""
        if not files:
            return "No files selected."
        try:
            # Depending on the Gradio version, gr.File yields plain path strings
            # or tempfile wrappers with a .name attribute; handle both.
            file_paths = [f if isinstance(f, str) else f.name for f in files]
            return self.rag_system.create_vectorstore(file_paths)
        except Exception as e:
            return f"Error processing files: {str(e)}"
    def _switch_model(self, model_name: str) -> str:
        """Switch the model"""
        if model_name not in self.models:
            return f"Unknown model: {model_name}"
        full_model_name = self.models[model_name]
        self.current_model = model_name
        return self.rag_system.change_model(full_model_name)
    def _query(self, query: str, system_prompt: str) -> Tuple[str, str]:
        """Process a query"""
        if not query.strip():
            return "Please enter a question.", ""
        response = self.rag_system.generate_response(query, system_prompt)
        sources = self.rag_system.get_top_sources()
        sources_html = self._format_source_display(sources)
        return response, sources_html
    def _format_source_display(self, sources: List[Dict]) -> str:
        """Format sources for display"""
        if not sources:
            return "<div class='source-container'>No sources available.</div>"

        html = "<div class='source-container'>"
        for i, source in enumerate(sources):
            try:
                if not isinstance(source, dict):
                    continue
                metadata = source.get("metadata", {})
                if not isinstance(metadata, dict):
                    metadata = {}
                page_num = metadata.get("page", "Unknown")
                source_file = metadata.get("source", "Unknown")

                # Limit content length, and escape it so raw PDF text cannot
                # break the surrounding HTML
                content = source.get("content", "No content available")
                if len(content) > 500:
                    content = content[:500] + "..."
                content = escape(content)

                score = source.get("score", 0.0)
                source_id = source.get("source_id", i + 1)

                # Determine relevance class
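                # The cutoffs below are heuristic: with normalized embeddings a
                # distance of 0 means an identical vector, so smaller is better.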
                if score <= 0.5:  # Lower is better for distance-based similarity
                    relevance_class = "relevance-high"
                elif score <= 0.8:
                    relevance_class = "relevance-medium"
                else:
                    relevance_class = "relevance-low"

                html += f"""
                <div class="source-card">
                    <div class="source-header">
                        Source {source_id} (<span class="{relevance_class}">Distance: {score:.2f}</span>)
                    </div>
                    <div class="source-meta">
                        <strong>File:</strong> {os.path.basename(str(source_file))}
                    </div>
                    <div class="source-meta">
                        <strong>Page:</strong> {page_num}
                    </div>
                    <div class="source-content">
                        {content}
                    </div>
                </div>
                """
            except Exception as e:
                html += f'<div class="source-card">Error displaying source {i+1}: {str(e)}</div>'

        html += "</div>"
        return html
    def build_interface(self):
        """Build the Gradio interface"""
        # Custom CSS for better appearance
        css = """
        .source-container {
            max-height: 600px;
            overflow-y: auto;
            padding: 10px;
        }
        .source-card {
            margin-bottom: 15px;
            padding: 12px;
            border: 1px solid #ddd;
            border-radius: 6px;
            background-color: #f8f9fa;
            box-shadow: 0 1px 3px rgba(0,0,0,0.1);
        }
        .source-header {
            font-size: 16px;
            font-weight: bold;
            margin-bottom: 8px;
            color: #333;
        }
        .source-meta {
            color: #666;
            margin-bottom: 6px;
            font-size: 14px;
        }
        .source-content {
            background-color: #fff;
            padding: 10px;
            border-radius: 4px;
            border-left: 3px solid #007bff;
            font-family: 'Segoe UI', sans-serif;
            line-height: 1.4;
            font-size: 14px;
        }
        .relevance-high { color: #28a745; font-weight: bold; }
        .relevance-medium { color: #ffc107; font-weight: bold; }
        .relevance-low { color: #dc3545; font-weight: bold; }
        """
        with gr.Blocks(title="Qwen2.5 PDF RAG System", css=css) as interface:
            gr.Markdown("""
            # 🤖 Qwen2.5 PDF RAG System

            Upload PDF documents and ask questions about their content using advanced AI.

            **⚡ Powered by Qwen2.5 Language Models**
            """)

            with gr.Tab("📄 Main Interface"):
                with gr.Row():
                    with gr.Column(scale=1):
                        gr.Markdown("### 🔧 Settings")
                        # Model selection
                        model_dropdown = gr.Dropdown(
                            choices=list(self.models.keys()),
                            value=self.current_model,
                            label="AI Model",
                            info="1.5B model recommended for stability"
                        )
                        model_switch_btn = gr.Button("🔄 Switch Model", size="sm")
                        model_status = gr.Textbox(
                            label="Model Status",
                            value=f"Using: {self.current_model}",
                            interactive=False
                        )

                        gr.Markdown("### 📄 Upload Documents")
                        file_input = gr.File(
                            file_count="multiple",
                            file_types=[".pdf"],
                            label="PDF Files"
                        )
                        upload_button = gr.Button("📤 Process PDFs", variant="primary")
                        upload_status = gr.Textbox(
                            label="Status",
                            interactive=False,
                            placeholder="Upload status will appear here..."
                        )

                    with gr.Column(scale=2):
                        gr.Markdown("### 💬 Ask Questions")
                        system_prompt = gr.Textbox(
                            label="System Instructions",
                            value="You are a helpful AI assistant. Answer questions based only on the provided documents. Be concise and cite relevant information.",
                            lines=3
                        )
                        query_input = gr.Textbox(
                            label="Your Question",
                            placeholder="What would you like to know about your documents?",
                            lines=2
                        )
                        query_button = gr.Button("🔍 Ask Question", variant="primary")
                        answer_output = gr.Textbox(
                            label="Answer",
                            interactive=False,
                            lines=8,
                            placeholder="Answers will appear here..."
                        )

            with gr.Tab("📚 Sources"):
                gr.Markdown("### 📚 Reference Documents")
                gr.Markdown("View the source documents used to generate answers.")
                sources_display = gr.HTML(
                    label="Sources",
                    value="<p>No sources available yet. Ask a question first!</p>"
                )

            with gr.Tab("ℹ️ Info"):
                gr.Markdown("""
                ### About This System
                This is a **Retrieval-Augmented Generation (RAG)** system that:
                - 📤 **Processes PDF documents** and stores them in a vector database
                - 🔍 **Searches** for relevant content based on your questions
                - 🤖 **Generates answers** using Qwen2.5 language models
                - 📚 **Shows sources** so you can verify the information

                ### Available Models
                - **Qwen2.5-1.5B** ⚡ - Fast and efficient (recommended for HF Spaces)
                - **Qwen2.5-3B** 🧠 - More capable but slower

                ### Tips for Best Results
                1. 📄 Upload clear, text-based PDFs (not scanned images)
                2. ❓ Ask specific questions rather than broad topics
                3. 📚 Check the "Sources" tab to see which documents were used
                4. 🔄 Try rephrasing your question if you don't get good results

                ### Technical Details
                - **Vector Store**: ChromaDB (distance-based similarity search; lower scores are more relevant)
                - **Embeddings**: sentence-transformers/all-MiniLM-L6-v2
                - **Chunk Size**: 800 characters with 150-character overlap
                - **Context Window**: Up to 4 most relevant document chunks
                """)
            # Event handlers
            upload_button.click(
                fn=self._upload_files,
                inputs=[file_input],
                outputs=[upload_status]
            )
            query_button.click(
                fn=self._query,
                inputs=[query_input, system_prompt],
                outputs=[answer_output, sources_display]
            )
            query_input.submit(
                fn=self._query,
                inputs=[query_input, system_prompt],
                outputs=[answer_output, sources_display]
            )
            model_switch_btn.click(
                fn=self._switch_model,
                inputs=[model_dropdown],
                outputs=[model_status]
            )

        self.interface = interface
        return interface
    def launch(self, **kwargs):
        """Launch the Gradio interface"""
        if not self.interface:
            self.build_interface()
        return self.interface.launch(**kwargs)
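# Minimal programmatic usage sketch (outside the Gradio UI). The PDF path is
# illustrative only -- substitute a real file:
#
#   rag = PDFRagSystem(persist_directory="chroma_db")
#   print(rag.create_vectorstore(["example.pdf"]))
#   print(rag.generate_response("What is this document about?"))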
# Initialize and launch the application
def main():
    """Main function optimized for Hugging Face Spaces"""
    print("🚀 Starting Qwen2.5 PDF RAG System...")
    print(f"📱 Device: {'GPU' if torch.cuda.is_available() else 'CPU'}")

    # Use the lightweight model by default for HF Spaces
    model_name = "Qwen/Qwen2.5-1.5B-Instruct"

    # Create RAG system
    try:
        rag_system = PDFRagSystem(model_name, persist_directory="chroma_db")

        # Create and launch UI (share links are not supported on HF Spaces,
        # where the app is already publicly reachable on port 7860)
        ui = RagUI(rag_system)
        ui.launch(
            server_name="0.0.0.0",
            server_port=7860,
            show_error=True
        )
    except Exception as e:
        print(f"❌ Error starting application: {e}")

        # Create a simple error interface
        def error_interface():
            return "❌ Failed to initialize the RAG system. Please check the logs."

        error_app = gr.Interface(
            fn=error_interface,
            inputs=[],
            outputs="text",
            title="Error - Qwen2.5 PDF RAG System"
        )
        error_app.launch()


if __name__ == "__main__":
    main()