# Scraped Hugging Face page header (not part of the program): msmaje's picture
# Commit message: "Update app.py"
# Commit: 1ae38bf (verified)
"""
Qwen2.5 PDF RAG System for Hugging Face Spaces
Adapted for deployment on Hugging Face Spaces with optimizations for the cloud environment.
"""
import os
import time
import gradio as gr
from typing import List, Dict, Any, Tuple
import torch
# LangChain imports - updated to avoid deprecation warnings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import Chroma
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.document_loaders import PyPDFLoader
from langchain.schema import Document
# Transformers for Qwen2.5 models (more compatible with HF Spaces)
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
import warnings
warnings.filterwarnings("ignore")
class PDFRagSystem:
    """PDF RAG System using Qwen2.5, ChromaDB, and LangChain - HF Spaces optimized.

    The instance owns three pieces of state:

    * ``self.embeddings`` - the embedding model handed to the vector store,
    * ``self.vectorstore`` - a Chroma store persisted under
      ``persist_directory``; it stays ``None`` until ``create_vectorstore``
      has ingested at least one PDF,
    * ``self.pipe`` / ``self.tokenizer`` / ``self.model`` - the transformers
      text-generation pipeline and its parts.
    """

    def __init__(self, model_name: str = "Qwen/Qwen2.5-1.5B-Instruct", persist_directory: str = "db"):
        """Initialize the RAG system.

        Loads the embedding model and then the LLM, so construction can take
        a while and may raise if the LLM cannot be loaded.

        Args:
            model_name: Name of the Qwen model to use.
            persist_directory: Directory to store the vector database.
        """
        self.model_name = model_name
        self.persist_directory = persist_directory
        self.pipe = None
        self.tokenizer = None
        self.model = None
        self.vectorstore = None
        self.embeddings = None
        # Sources retrieved for the most recent query (see get_top_sources).
        self.top_sources = []

        # Check available device
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        print(f"Using device: {self.device}")

        # Initialize embedding model
        print("Loading embedding model...")
        try:
            self.embeddings = HuggingFaceEmbeddings(
                model_name="sentence-transformers/all-MiniLM-L6-v2",
                model_kwargs={"device": self.device},
                encode_kwargs={"normalize_embeddings": True},
            )
        except Exception as e:
            print(f"Warning: Error loading HuggingFaceEmbeddings, trying alternative: {e}")
            # Fall back to using sentence-transformers directly. Imported
            # lazily because it is only needed on this path.
            from sentence_transformers import SentenceTransformer

            self.embedding_model = SentenceTransformer('all-MiniLM-L6-v2')
            self.embeddings = self._create_custom_embeddings()

        # Load LLM
        self._load_llm()

    def _create_custom_embeddings(self):
        """Create a custom embeddings wrapper if HuggingFaceEmbeddings fails.

        Returns:
            An object exposing ``embed_documents`` and ``embed_query``, backed
            by ``self.embedding_model``.
        """

        class CustomEmbeddings:
            """Minimal adapter exposing the two embedding methods used here."""

            def __init__(self, model):
                self.model = model

            def embed_documents(self, texts):
                return self.model.encode(texts).tolist()

            def embed_query(self, text):
                return self.model.encode([text])[0].tolist()

        return CustomEmbeddings(self.embedding_model)

    def change_model(self, model_name: str) -> str:
        """Change the LLM model.

        Args:
            model_name: New model name to use.

        Returns:
            Status message. It names the model that is actually loaded, which
            differs from ``model_name`` when ``_load_llm`` had to fall back.
        """
        # Only short-circuit when a pipeline is really loaded; after a failed
        # switch the name matches but nothing is usable, so allow a retry.
        if self.model_name == model_name and self.pipe is not None:
            return f"Already using model: {model_name}"

        import gc

        self.model_name = model_name
        try:
            # Drop the references instead of `del`-ing the attributes so the
            # instance keeps all three attributes even if loading fails below.
            self.pipe = None
            self.model = None
            self.tokenizer = None
            # Collect first so the old weights are actually released before
            # the CUDA cache is emptied.
            gc.collect()
            if torch.cuda.is_available():
                torch.cuda.empty_cache()
            self._load_llm()
        except Exception as e:
            return f"Error switching model: {str(e)}"

        if self.model_name != model_name:
            # _load_llm rewrote self.model_name while falling back.
            return f"Could not load {model_name}; fell back to model: {self.model_name}"
        return f"Successfully switched to model: {model_name}"

    def _load_llm(self):
        """Load the Qwen2.5 model with optimized settings for HF Spaces.

        Populates ``self.tokenizer``, ``self.model`` and ``self.pipe``. If
        loading fails and the requested model name does not contain "1.5B",
        retries once with ``Qwen/Qwen2.5-1.5B-Instruct`` (updating
        ``self.model_name``); otherwise the original exception propagates.
        """
        print(f"\nLoading {self.model_name} model...")
        start_time = time.time()
        on_gpu = self.device == "cuda"
        dtype = torch.float16 if on_gpu else torch.float32

        try:
            # Load tokenizer
            # NOTE(review): trust_remote_code=True lets the model repository
            # run its own Python code; confirm it is still required.
            self.tokenizer = AutoTokenizer.from_pretrained(
                self.model_name,
                trust_remote_code=True,
            )

            # Configure model loading for limited resources
            model_kwargs = {
                "trust_remote_code": True,
                "torch_dtype": dtype,
                "low_cpu_mem_usage": True,
            }
            if on_gpu:
                model_kwargs["device_map"] = "auto"

            # Load model
            self.model = AutoModelForCausalLM.from_pretrained(
                self.model_name,
                **model_kwargs,
            )

            pipeline_kwargs = {}
            if not on_gpu:
                self.model = self.model.to(self.device)
                pipeline_kwargs["device"] = -1
            # On GPU the model was already placed by device_map="auto";
            # passing `device` as well conflicts with that placement, so it
            # is deliberately omitted there.

            # Create pipeline
            self.pipe = pipeline(
                "text-generation",
                model=self.model,
                tokenizer=self.tokenizer,
                torch_dtype=dtype,
                return_full_text=False,
                **pipeline_kwargs,
            )

            load_time = time.time() - start_time
            print(f"Model loaded in {load_time:.2f} seconds")
        except Exception as e:
            print(f"Error loading model: {e}")
            # Fallback to a smaller model if the requested one fails
            if "1.5B" not in self.model_name:
                print("Falling back to Qwen2.5-1.5B-Instruct...")
                self.model_name = "Qwen/Qwen2.5-1.5B-Instruct"
                self._load_llm()
            else:
                raise

    def process_pdf(self, pdf_file: str) -> "List[Document]":
        """Process a PDF file into documents for the vectorstore.

        Args:
            pdf_file: Path to the PDF file.

        Returns:
            List of document chunks; an empty list if loading or splitting
            fails (the error is printed, not raised).
        """
        try:
            loader = PyPDFLoader(pdf_file)
            documents = loader.load()

            # Split documents into chunks (sizes are in characters).
            text_splitter = RecursiveCharacterTextSplitter(
                chunk_size=800,  # Smaller chunks for better performance
                chunk_overlap=150,
                separators=["\n\n", "\n", ". ", " ", ""],
            )
            return text_splitter.split_documents(documents)
        except Exception as e:
            print(f"Error processing PDF {pdf_file}: {e}")
            return []

    def create_vectorstore(self, pdf_files: List[str]) -> str:
        """Create or update the vector store with documents from PDF files.

        Args:
            pdf_files: List of paths to PDF files.

        Returns:
            Status message describing what was ingested or what went wrong.
        """
        if not pdf_files:
            return "No files provided."

        all_chunks = []
        processed_files = 0
        for pdf_file in pdf_files:
            if not os.path.exists(pdf_file):
                print(f"Warning: File {pdf_file} does not exist. Skipping.")
                continue
            print(f"Processing {pdf_file}...")
            chunks = self.process_pdf(pdf_file)
            if chunks:
                print(f"Created {len(chunks)} chunks from {pdf_file}")
                all_chunks.extend(chunks)
                processed_files += 1
            else:
                print(f"Failed to process {pdf_file}")

        if not all_chunks:
            return "No documents were successfully processed."

        try:
            # Create or update vectorstore
            has_existing_store = (
                os.path.isdir(self.persist_directory)
                and len(os.listdir(self.persist_directory)) > 0
            )
            if has_existing_store:
                print("Loading existing vectorstore...")
                self.vectorstore = Chroma(
                    persist_directory=self.persist_directory,
                    embedding_function=self.embeddings,
                )
                print("Adding new documents to existing vectorstore...")
                self.vectorstore.add_documents(all_chunks)
            else:
                print("Creating new vectorstore...")
                self.vectorstore = Chroma.from_documents(
                    documents=all_chunks,
                    embedding=self.embeddings,
                    persist_directory=self.persist_directory,
                )

            # Persist to disk. Newer Chroma wrappers persist automatically
            # and may not expose persist() at all, so only call it if present
            # rather than turning a successful ingest into an error.
            persist = getattr(self.vectorstore, "persist", None)
            if callable(persist):
                persist()

            return f"Successfully processed {processed_files} PDFs with {len(all_chunks)} chunks."
        except Exception as e:
            return f"Error creating vectorstore: {str(e)}"

    def retrieve_context(self, query: str, k: int = 4) -> Tuple[str, List[Dict]]:
        """Retrieve relevant context for a query.

        Also stores the retrieved sources in ``self.top_sources``; that list
        is cleared first, so a failed or empty retrieval never leaves the
        sources of an earlier query behind.

        Args:
            query: User query.
            k: Number of top documents to retrieve.

        Returns:
            Tuple of (concatenated context string, list of source dicts with
            the keys ``content``, ``metadata``, ``score`` and ``source_id``).
            Returns ``("", [])`` when no store is loaded or the search fails.
        """
        self.top_sources = []
        if self.vectorstore is None:
            return "", []

        try:
            # Search for relevant documents
            docs_with_scores = self.vectorstore.similarity_search_with_score(query, k=k)

            context_parts = []
            sources = []
            for source_id, (doc, score) in enumerate(docs_with_scores, start=1):
                context_parts.append(f"Document {source_id}:\n{doc.page_content}\n")

                # Clean metadata for serialization: keep plain scalars,
                # stringify everything else.
                clean_metadata = {
                    str(key): value
                    if isinstance(value, (str, int, float, bool, type(None)))
                    else str(value)
                    for key, value in doc.metadata.items()
                }
                sources.append(
                    {
                        "content": str(doc.page_content),
                        "metadata": clean_metadata,
                        "score": float(score),
                        "source_id": source_id,
                    }
                )

            self.top_sources = sources
            return "\n".join(context_parts), sources
        except Exception as e:
            print(f"Error retrieving context: {e}")
            return "", []

    def _build_prompt(self, system_prompt: str, user_prompt: str) -> str:
        """Combine system instructions and the RAG prompt into model input.

        Uses the tokenizer's chat template when it offers one; otherwise, or
        if templating fails, falls back to plain text with the system prompt
        prepended.
        """
        has_system = bool(system_prompt and system_prompt.strip())
        apply_template = getattr(self.tokenizer, "apply_chat_template", None)
        if callable(apply_template):
            messages = []
            if has_system:
                messages.append({"role": "system", "content": system_prompt})
            messages.append({"role": "user", "content": user_prompt})
            try:
                return apply_template(messages, tokenize=False, add_generation_prompt=True)
            except Exception as e:
                print(f"Warning: could not apply chat template, using plain prompt: {e}")
        if has_system:
            return f"{system_prompt}\n\n{user_prompt}"
        return user_prompt

    def generate_response(self, query: str, system_prompt: str = "You are a helpful assistant that answers questions based on the provided documents.") -> str:
        """Generate a response using RAG.

        Args:
            query: User query.
            system_prompt: System prompt to set assistant behavior; it is
                passed to the model together with the retrieved context.

        Returns:
            Model response, or a human-readable message when nothing was
            retrieved, the model is not loaded, or generation failed.
        """
        # Retrieve relevant context
        context, _ = self.retrieve_context(query)
        if not context:
            return "No relevant documents found in the database. Please upload some PDF files first."

        if self.pipe is None or self.tokenizer is None:
            # Happens after a failed model switch; say so instead of failing
            # with an opaque "'NoneType' object is not callable".
            return "Error generating response: the language model is not loaded."

        # Create RAG prompt
        rag_prompt = (
            "Based on the following context, please answer the question. "
            "If the answer is not in the context, say that you don't know.\n"
            f"Context:\n{context}\n"
            f"Question: {query}\n"
            "Answer:"
        )

        try:
            prompt = self._build_prompt(system_prompt, rag_prompt)

            # Generate response
            print(f"Running inference for query: {query}")
            start_time = time.time()
            response = self.pipe(
                prompt,
                max_new_tokens=300,
                temperature=0.7,
                top_p=0.9,
                do_sample=True,
                pad_token_id=self.tokenizer.eos_token_id,
            )
            inference_time = time.time() - start_time
            print(f"Inference completed in {inference_time:.2f} seconds")

            # Extract the generated text
            if isinstance(response, list) and len(response) > 0:
                result = response[0].get('generated_text', '').strip()
            else:
                result = str(response).strip()
            return result if result else "I couldn't generate a response. Please try again."
        except Exception as e:
            print(f"Error generating response: {e}")
            return f"Error generating response: {str(e)}"

    def get_top_sources(self) -> List[Dict]:
        """Get the top sources used for the last query."""
        return self.top_sources
class RagUI:
    """Gradio UI for the PDF RAG System - HF Spaces optimized."""

    # Maximum number of characters of a source chunk shown in the Sources tab.
    MAX_PREVIEW_CHARS = 500

    def __init__(self, rag_system: "PDFRagSystem"):
        """Store the backing RAG system and the list of selectable models.

        Args:
            rag_system: The system that ingests PDFs and answers queries.
        """
        self.rag_system = rag_system
        self.interface = None
        # Define available models (optimized for HF Spaces):
        # dropdown label -> Hugging Face model id.
        self.models = {
            "Qwen2.5-1.5B (Recommended)": "Qwen/Qwen2.5-1.5B-Instruct",
            "Qwen2.5-3B": "Qwen/Qwen2.5-3B-Instruct",
        }
        self.current_model = "Qwen2.5-1.5B (Recommended)"

    def _upload_files(self, files) -> str:
        """Handle file upload.

        Args:
            files: Value of the file input; each entry may be a path string
                or an object exposing the path as ``.name``, depending on the
                Gradio version.

        Returns:
            Status message from the RAG system, or an error description.
        """
        if not files:
            return "No files selected."
        try:
            if isinstance(files, (str, os.PathLike)):
                files = [files]  # tolerate a single, un-listed upload
            file_paths = [
                os.fspath(f) if isinstance(f, (str, os.PathLike)) else f.name
                for f in files
            ]
            return self.rag_system.create_vectorstore(file_paths)
        except Exception as e:
            return f"Error processing files: {str(e)}"

    def _switch_model(self, model_name: str) -> str:
        """Switch the model.

        Args:
            model_name: Dropdown label (a key of ``self.models``).

        Returns:
            Status message to show in the "Model Status" box.
        """
        if model_name not in self.models:
            return f"Unknown model: {model_name}"
        full_model_name = self.models[model_name]
        self.current_model = model_name
        return self.rag_system.change_model(full_model_name)

    def _query(self, query: str, system_prompt: str) -> Tuple[str, str]:
        """Process a query.

        Args:
            query: The user's question.
            system_prompt: System instructions forwarded to the RAG system.

        Returns:
            Tuple of (answer text, HTML for the Sources tab).
        """
        if not query or not query.strip():
            return "Please enter a question.", ""
        response = self.rag_system.generate_response(query, system_prompt)
        sources = self.rag_system.get_top_sources()
        return response, self._format_source_display(sources)

    def _render_source_card(self, source: Dict, position: int) -> str:
        """Render one retrieved source as an HTML card.

        Everything that originates from the PDF or its metadata is
        HTML-escaped, because document text is untrusted and would otherwise
        be interpreted as markup by the HTML component.
        """
        from html import escape

        metadata = source.get("metadata", {})
        if not isinstance(metadata, dict):
            metadata = {}
        page_num = metadata.get("page", "Unknown")
        file_name = os.path.basename(str(metadata.get("source", "Unknown")))

        content = str(source.get("content", "No content available"))
        if len(content) > self.MAX_PREVIEW_CHARS:
            # Only mark the text as truncated when it actually was.
            content = content[: self.MAX_PREVIEW_CHARS] + "..."

        score = source.get("score", 0.0)
        source_id = source.get("source_id", position)

        # Determine relevance class
        if score <= 0.5:  # Lower is better for distance-based similarity
            relevance_class = "relevance-high"
        elif score <= 0.8:
            relevance_class = "relevance-medium"
        else:
            relevance_class = "relevance-low"

        return (
            '<div class="source-card">'
            '<div class="source-header">'
            f'Source {escape(str(source_id))} '
            f'(<span class="{relevance_class}">Distance: {score:.2f}</span>)'
            '</div>'
            f'<div class="source-meta"><strong>File:</strong> {escape(file_name)}</div>'
            f'<div class="source-meta"><strong>Page:</strong> {escape(str(page_num))}</div>'
            f'<div class="source-content">{escape(content)}</div>'
            '</div>'
        )

    def _format_source_display(self, sources: List[Dict]) -> str:
        """Format sources for display.

        Args:
            sources: Source dicts as produced by the RAG system (``content``,
                ``metadata``, ``score``, ``source_id``). Non-dict entries are
                skipped; a source that cannot be rendered becomes an error
                card instead of aborting the whole list.

        Returns:
            HTML for the Sources tab.
        """
        from html import escape

        if not sources:
            return "<div class='source-container'>No sources available.</div>"

        cards = []
        for position, source in enumerate(sources, start=1):
            if not isinstance(source, dict):
                continue
            try:
                cards.append(self._render_source_card(source, position))
            except Exception as e:
                cards.append(
                    f'<div class="source-card">Error displaying source {position}: {escape(str(e))}</div>'
                )
        return "<div class='source-container'>" + "".join(cards) + "</div>"

    def build_interface(self):
        """Build the Gradio interface.

        Returns:
            The ``gr.Blocks`` app, which is also stored on ``self.interface``.
        """
        # Custom CSS for better appearance
        css = """
        .source-container {
            max-height: 600px;
            overflow-y: auto;
            padding: 10px;
        }
        .source-card {
            margin-bottom: 15px;
            padding: 12px;
            border: 1px solid #ddd;
            border-radius: 6px;
            background-color: #f8f9fa;
            box-shadow: 0 1px 3px rgba(0,0,0,0.1);
        }
        .source-header {
            font-size: 16px;
            font-weight: bold;
            margin-bottom: 8px;
            color: #333;
        }
        .source-meta {
            color: #666;
            margin-bottom: 6px;
            font-size: 14px;
        }
        .source-content {
            background-color: #fff;
            padding: 10px;
            border-radius: 4px;
            border-left: 3px solid #007bff;
            font-family: 'Segoe UI', sans-serif;
            line-height: 1.4;
            font-size: 14px;
        }
        .relevance-high { color: #28a745; font-weight: bold; }
        .relevance-medium { color: #ffc107; font-weight: bold; }
        .relevance-low { color: #dc3545; font-weight: bold; }
        """

        # Markdown is assembled line by line so that the code's indentation
        # can never leak into the rendered text.
        header_md = "\n".join(
            [
                "# 🤖 Qwen2.5 PDF RAG System",
                "",
                "Upload PDF documents and ask questions about their content using advanced AI.",
                "",
                "**⚡ Powered by Qwen2.5 Language Models**",
            ]
        )
        info_md = "\n".join(
            [
                "### About This System",
                "",
                "This is a **Retrieval-Augmented Generation (RAG)** system that:",
                "",
                "- 📤 **Processes PDF documents** and stores them in a vector database",
                "- 🔍 **Searches** for relevant content based on your questions",
                "- 🤖 **Generates answers** using Qwen2.5 language models",
                "- 📚 **Shows sources** so you can verify the information",
                "",
                "### Available Models",
                "",
                "- **Qwen2.5-1.5B** ⚡ - Fast and efficient (Recommended for HF Spaces)",
                "- **Qwen2.5-3B** 🧠 - More capable but slower",
                "",
                "### Tips for Best Results",
                "",
                "1. 📄 Upload clear, text-based PDFs (not scanned images)",
                "2. ❓ Ask specific questions rather than broad topics",
                '3. 🔍 Check the "Sources" tab to see what documents were used',
                "4. 🔄 Try rephrasing your question if you don't get good results",
                "",
                "### Technical Details",
                "",
                "- **Vector Store**: ChromaDB (distance-based search; lower distance = closer match)",
                "- **Embeddings**: sentence-transformers/all-MiniLM-L6-v2",
                "- **Chunk Size**: 800 characters with 150 character overlap",
                "- **Context Window**: Up to 4 most relevant document chunks",
            ]
        )

        with gr.Blocks(title="Qwen2.5 PDF RAG System", css=css) as interface:
            gr.Markdown(header_md)

            with gr.Tab("📚 Main Interface"):
                with gr.Row():
                    with gr.Column(scale=1):
                        gr.Markdown("### 🔧 Settings")
                        # Model selection
                        model_dropdown = gr.Dropdown(
                            choices=list(self.models.keys()),
                            value=self.current_model,
                            label="AI Model",
                            info="1.5B model recommended for stability",
                        )
                        model_switch_btn = gr.Button("🔄 Switch Model", size="sm")
                        model_status = gr.Textbox(
                            label="Model Status",
                            value=f"Using: {self.current_model}",
                            interactive=False,
                        )

                        gr.Markdown("### 📄 Upload Documents")
                        file_input = gr.File(
                            file_count="multiple",
                            file_types=[".pdf"],
                            label="PDF Files",
                        )
                        upload_button = gr.Button("📤 Process PDFs", variant="primary")
                        upload_status = gr.Textbox(
                            label="Status",
                            interactive=False,
                            placeholder="Upload status will appear here...",
                        )

                    with gr.Column(scale=2):
                        gr.Markdown("### 💬 Ask Questions")
                        system_prompt = gr.Textbox(
                            label="System Instructions",
                            value="You are a helpful AI assistant. Answer questions based only on the provided documents. Be concise and cite relevant information.",
                            lines=3,
                        )
                        query_input = gr.Textbox(
                            label="Your Question",
                            placeholder="What would you like to know about your documents?",
                            lines=2,
                        )
                        query_button = gr.Button("🔍 Ask Question", variant="primary")
                        answer_output = gr.Textbox(
                            label="Answer",
                            interactive=False,
                            lines=8,
                            placeholder="Answers will appear here...",
                        )

            with gr.Tab("📖 Sources"):
                gr.Markdown("### 📚 Reference Documents")
                gr.Markdown("View the source documents used to generate answers.")
                sources_display = gr.HTML(
                    label="Sources",
                    value="<p>No sources available yet. Ask a question first!</p>",
                )

            with gr.Tab("ℹ️ Info"):
                gr.Markdown(info_md)

            # Event handlers
            upload_button.click(
                fn=self._upload_files,
                inputs=[file_input],
                outputs=[upload_status],
            )
            query_button.click(
                fn=self._query,
                inputs=[query_input, system_prompt],
                outputs=[answer_output, sources_display],
            )
            query_input.submit(
                fn=self._query,
                inputs=[query_input, system_prompt],
                outputs=[answer_output, sources_display],
            )
            model_switch_btn.click(
                fn=self._switch_model,
                inputs=[model_dropdown],
                outputs=[model_status],
            )

        self.interface = interface
        return interface

    def launch(self, **kwargs):
        """Launch the Gradio interface, building it first if necessary.

        Args:
            **kwargs: Forwarded unchanged to the interface's ``launch``.

        Returns:
            Whatever the interface's ``launch`` returns.
        """
        if not self.interface:
            self.build_interface()
        return self.interface.launch(**kwargs)
# Initialize and launch the application
def main():
    """Main function optimized for Hugging Face Spaces.

    Builds the RAG system with the lightweight default model and serves the
    Gradio UI on 0.0.0.0:7860. If initialization or launch raises, the full
    traceback is logged and a minimal error page is served on the same
    address instead, so the failure is visible where the app was expected.
    """
    print("🚀 Starting Qwen2.5 PDF RAG System...")
    print(f"📱 Device: {'GPU' if torch.cuda.is_available() else 'CPU'}")

    # Use the lightweight model by default for HF Spaces
    model_name = "Qwen/Qwen2.5-1.5B-Instruct"
    server_name = "0.0.0.0"
    server_port = 7860

    # Create RAG system
    try:
        rag_system = PDFRagSystem(model_name, persist_directory="chroma_db")

        # Create and launch UI
        ui = RagUI(rag_system)
        # NOTE(review): share=True is kept from the original; Gradio appears
        # to ignore it when running inside a Space - confirm whether it is
        # still wanted for local runs.
        ui.launch(
            share=True,
            server_name=server_name,
            server_port=server_port,
            show_error=True,
        )
    except Exception as e:
        import traceback

        print(f"❌ Error starting application: {e}")
        # The error page tells the user to check the logs, so make sure the
        # logs contain the full traceback and not just the message.
        traceback.print_exc()

        # Create a simple error interface
        def error_interface():
            return "❌ Failed to initialize the RAG system. Please check the logs."

        error_app = gr.Interface(
            fn=error_interface,
            inputs=[],
            outputs="text",
            title="Error - Qwen2.5 PDF RAG System",
        )
        # Bind to the same host/port as the real app so the error page is
        # reachable at the address the deployment expects.
        error_app.launch(server_name=server_name, server_port=server_port)
# Run the app only when executed as a script, not when imported.
if __name__ == "__main__":
    main()