MARAdvancedRag / app.py
AliceRolan's picture
Update app.py
df52d5f verified
import gradio as gr
from huggingface_hub import InferenceClient
import os
import torch
import transformers
from tensorflow import keras
from transformers import AutoTokenizer, pipeline, AutoModelForSeq2SeqLM,AutoModelForCausalLM
from langchain.document_loaders import PyPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.vectorstores import Chroma
from langchain.prompts import ChatPromptTemplate
from langchain.schema.runnable import RunnablePassthrough
from langchain.schema.output_parser import StrOutputParser
from langchain.llms import HuggingFacePipeline
import gradio as gr
from sentence_transformers import SentenceTransformer
import chromadb
from rank_bm25 import BM25Okapi
import nltk
from collections import deque
from sentence_transformers import CrossEncoder
from transformers import T5Tokenizer, T5ForConditionalGeneration
import numpy as np
from guardrails.validators import Validator, register_validator, ValidationResult, FailResult, PassResult
from presidio_analyzer import AnalyzerEngine
from presidio_analyzer.nlp_engine import SpacyNlpEngine, NlpEngineProvider
from better_profanity import profanity
import inflection
from presidio_analyzer import PatternRecognizer, Pattern
from guardrails import Guard
import re
from bs4 import BeautifulSoup
import warnings
# Suppress all warnings
warnings.filterwarnings("ignore")
from rank_bm25 import BM25Okapi
# Download the NLTK tokenizer resources used by the nltk.word_tokenize
# calls later in this file.
for _nltk_resource in ("punkt", "punkt_tab"):
    nltk.download(_nltk_resource)
"""
For more information on `huggingface_hub` Inference API support, please check the docs: https://huggingface.co/docs/huggingface_hub/v0.22.2/en/guides/inference
"""
# Hugging Face inference client bound to the Zephyr model, created at import time.
client = InferenceClient("HuggingFaceH4/zephyr-7b-beta")

# Report CUDA availability and, when a device is present, its name.
_cuda_ready = torch.cuda.is_available()
_gpu_label = torch.cuda.get_device_name(0) if _cuda_ready else "No GPU Found"
print("GPU Available:", _cuda_ready)
print("GPU Name:", _gpu_label)
"""# 2. Data Collection & Preprocessing"""
# 10-K report PDFs to ingest. The paths are relative, so they are resolved
# against the process's working directory.
pdf_files = ["Apple-10K-2023.pdf", "Apple-10K-2024.pdf"]
"""### πŸ“Œ Step 1: Load Multiple 10-K Financial Report PDFs
This step loads multiple financial reports (10-K filings) from local PDF files.
- It initializes a list of PDF file paths.
- Each PDF is processed using `PyPDFLoader` to extract text content.
- The text of each loaded document is cleaned: HTML tags are removed and extra spaces and newlines are collapsed.
- The extracted documents are combined into a single list (`all_documents`) for further processing.
- This ensures that all relevant financial data is available for retrieval in the RAG pipeline.

Each loaded document, and every chunk later split from it, also carries metadata describing its location in the source PDF, which can be used for citation.
"""
def preprocess_text(text):
    """Return ``text`` with HTML markup removed and whitespace collapsed.

    The text is parsed with BeautifulSoup's ``html.parser`` to drop any tags;
    afterwards every run of whitespace (newlines included) is replaced by a
    single space and leading/trailing whitespace is trimmed.
    """
    without_tags = BeautifulSoup(text, "html.parser").get_text()
    return re.sub(r'\s+', ' ', without_tags).strip()


# Load every PDF, clean each loaded document's text in place, and pool the
# documents from all files into one list.
all_documents = []
for pdf_path in pdf_files:
    loaded_docs = PyPDFLoader(pdf_path).load()
    for loaded_doc in loaded_docs:
        loaded_doc.page_content = preprocess_text(loaded_doc.page_content)
    all_documents += loaded_docs
"""### πŸ“Œ Step 2: Adaptive Chunking with Different Sizes
Uses two chunk sizes (500 and 1000 characters) and later selects the better one dynamically for each query.
"""
# Candidate chunk lengths, measured with len(), i.e. in characters.
chunk_sizes = [500, 1000]

# Maps chunk size -> list of chunks produced by the splitter for that size.
chunked_texts = {}
for size in chunk_sizes:
    overlap = int(size * 0.2)  # neighbouring chunks share 20% of the chunk size
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=size,
        chunk_overlap=overlap,
        length_function=len,
    )
    size_chunks = text_splitter.split_documents(all_documents)
    chunked_texts[size] = size_chunks
    print(f"πŸ”Ή Chunk size {size}: {len(chunked_texts[size])} chunks")
"""### πŸ“Œ Step 2: Create Embeddings using Sentence Transformers"""
# Sentence-embedding model; the same instance encodes both chunks and queries.
embedding_model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")

# Maps chunk size -> embeddings of that size's chunks, in chunk order.
embeddings_dict = {}
for size in chunk_sizes:
    chunk_strings = [doc.page_content for doc in chunked_texts[size]]
    embeddings_dict[size] = embedding_model.encode(chunk_strings, convert_to_numpy=True)
print("βœ… Generated embeddings for all chunk sizes.")
"""### πŸ“Œ Step 3: Store and retrieve using a basic vector database"""
# On-disk ChromaDB client; data under ./chroma_financials outlives the process.
chroma_client = chromadb.PersistentClient(path="./chroma_financials")

# One collection per chunk size so each granularity can be queried separately.
collections = {
    size: chroma_client.get_or_create_collection(name=f"financial_chunks_{size}")
    for size in chunk_sizes
}

# Write the chunks in batches instead of one call per chunk. `upsert` is used
# rather than `add` because the ids are deterministic and the store is
# persistent: on a restart the same ids already exist, and upsert overwrites
# them with the current text/embedding instead of skipping or rejecting them.
_CHROMA_BATCH_SIZE = 500
for size in chunk_sizes:
    texts = [doc.page_content for doc in chunked_texts[size]]
    embeddings = embeddings_dict[size]
    for start in range(0, len(texts), _CHROMA_BATCH_SIZE):
        stop = min(start + _CHROMA_BATCH_SIZE, len(texts))
        collections[size].upsert(
            ids=[f"{size}_{idx}" for idx in range(start, stop)],
            embeddings=[vector.tolist() for vector in embeddings[start:stop]],
            # The chunk text is stored in metadata; hybrid_search reads it back
            # from the "text" key.
            metadatas=[{"text": chunk} for chunk in texts[start:stop]],
        )
print("βœ… Stored all chunk sizes in ChromaDB.")
"""# 3. Advanced RAG Implementation
### 📌 Step 1: BM25 for keyword-based search alongside embeddings
"""
# Lower-cased, word-tokenized copy of every chunk, keyed by chunk size.
bm25_corpus = {}
for size in chunk_sizes:
    bm25_corpus[size] = [
        nltk.word_tokenize(doc.page_content.lower()) for doc in chunked_texts[size]
    ]

# One BM25 index per chunk size, built over the matching tokenized corpus.
bm25_models = {size: BM25Okapi(tokens) for size, tokens in bm25_corpus.items()}
print("βœ… Initialized BM25 models.")
"""### πŸ“Œ Step 2: Memory-Augmented Retrieval with Hybrid Search"""
# Memory store for past queries. Entries are (query, (chunk_size, results))
# tuples; the deque drops the oldest entry once 10 are stored.
memory_store = deque(maxlen=10)  # Stores last 10 queries


# Memory-augmented retrieval
def memory_augmented_retrieval(query):
    """Look up ``query`` in ``memory_store``.

    Every stored entry is compared by exact string equality, not just the
    first one.

    Args:
        query: The query string to look up.

    Returns:
        ``{chunk_size: results}`` for the first stored entry whose query equals
        ``query``, or ``None`` when no entry matches (including when the store
        is empty).
    """
    for stored_query, stored_data in memory_store:
        if stored_query == query:
            stored_size, stored_results = stored_data
            print("πŸ”Ή Retrieved from memory store.")
            return {stored_size: stored_results}
    # Only reached after all entries were checked without a match.
    return None
# Hybrid search with memory-augmented retrieval
def hybrid_search(query, top_k=5):
    """Retrieve candidate chunks for ``query`` with BM25 and embedding search.

    A cached answer from ``memory_augmented_retrieval`` is returned directly
    when one exists; in that case the dict holds only the chunk size that was
    cached. Otherwise both retrievers run for every entry in ``chunk_sizes``.

    Args:
        query: The user query string.
        top_k: Number of hits requested from each retriever per chunk size.

    Returns:
        Dict mapping chunk size to a list of unique chunk texts, BM25 hits
        first and embedding hits after, each in the order its retriever
        returned them.
    """
    mar_result = memory_augmented_retrieval(query)
    if mar_result is not None:
        return mar_result

    # Neither query representation depends on the chunk size: build them once.
    query_embedding = embedding_model.encode(query).tolist()
    tokenized_query = nltk.word_tokenize(query.lower())

    results = {}
    for size in chunk_sizes:
        # BM25 retrieval: indices of the top_k highest-scoring chunks.
        bm25_scores = bm25_models[size].get_scores(tokenized_query)
        bm25_top_idxs = sorted(
            range(len(bm25_scores)), key=lambda i: bm25_scores[i], reverse=True
        )[:top_k]
        bm25_results = [chunked_texts[size][idx].page_content for idx in bm25_top_idxs]

        # Embedding retrieval: the chunk text is stored under the "text" metadata key.
        retrieved = collections[size].query(query_embeddings=[query_embedding], n_results=top_k)
        embedding_results = [item["text"] for item in retrieved["metadatas"][0]]

        # Merge and de-duplicate. dict.fromkeys keeps first-seen order, so the
        # result is reproducible (a set's string ordering varies between runs).
        results[size] = list(dict.fromkeys(bm25_results + embedding_results))
    return results
"""### πŸ“Œ Step 3: Testing different chunk sizes & retrieval methods for better accuracy"""
def score_retrieval_results(results):
    """Give each chunk size a heuristic score for its retrieved texts.

    The score is the total character count of the texts plus 10 points per
    distinct text, so longer and more varied result sets score higher. The
    score table is printed before it is returned.

    Args:
        results: Dict mapping chunk size to a list of retrieved text strings.

    Returns:
        Dict mapping each chunk size to its integer score.
    """
    scores = {
        size: sum(map(len, texts)) + 10 * len(set(texts))
        for size, texts in results.items()
    }
    print(f"Retrieval Scores:{scores}")
    return scores
def search_with_dynamic_chunk(query, top_k=5):
    """Run the hybrid search and keep the results of the best-scoring chunk size.

    Args:
        query: The user query string.
        top_k: Number of hits requested from each retriever per chunk size.

    Returns:
        ``(best_size, texts)``: the winning chunk size and its retrieved texts.
    """
    results = hybrid_search(query, top_k)
    # Score and select the best chunk size dynamically
    scores = score_retrieval_results(results)
    best_size = max(scores, key=scores.get)
    best_results = results[best_size]

    # Remember the outcome only for queries not already stored. hybrid_search
    # answers repeated queries from memory_store, so appending again would fill
    # the bounded store with duplicates and push out other queries.
    if all(stored_query != query for stored_query, _ in memory_store):
        memory_store.append((query, (best_size, best_results)))
    return best_size, best_results
# Sample query executed at import time. `query`, `best_chunk_size` and
# `retrieved_docs` stay bound as module globals and are read again by the
# re-ranking call further down.
query = "Apple's revenue in 2023"
best_chunk_size, retrieved_docs = search_with_dynamic_chunk(query)
# Display the chosen chunk size and how many texts were retrieved
print(f"πŸ”Ή Best Chunk Size Selected: {best_chunk_size}")
print("πŸ”Ž Retrieved Results:",len(retrieved_docs))
"""### πŸ“Œ Step 4: Re-Ranking with Cross-Encoders"""
# Load re-ranking model
reranker = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2")


def rerank(query, retrieved_docs):
    """Order ``retrieved_docs`` by cross-encoder score for ``query``, best first.

    Args:
        query: The user query string.
        retrieved_docs: Candidate document texts.

    Returns:
        A new list with the same documents, highest score first. Documents
        with equal scores keep their input order. An empty input yields an
        empty list without calling the model.
    """
    if not retrieved_docs:
        return []
    query_doc_pairs = [[query, doc] for doc in retrieved_docs]
    scores = reranker.predict(query_doc_pairs)
    # Sort on the score alone so ties are never decided by comparing the
    # document text.
    ranked_pairs = sorted(
        zip(scores, retrieved_docs), key=lambda pair: pair[0], reverse=True
    )
    return [doc for _, doc in ranked_pairs]
# Apply re-ranking to the sample query's results, then print the re-ranked
# list followed by the list in its original retrieval order.
reranked_docs = rerank(query, retrieved_docs)
print("Re-ranked Documents:", reranked_docs)
print(retrieved_docs)
"""### πŸ“Œ Step 5: Load and Configure the Language Model for Text Generation
This step initializes and configures the **FLAN-T5-Large** model for text generation.
- **Model Selection**:
- The `"google/flan-t5-large"` model is chosen, which is fine-tuned for instruction-following tasks.
- This model is well-suited for answering questions based on retrieved context.
"""
# Load Flan-T5 model
model_name = "google/flan-t5-large"
tokenizer = T5Tokenizer.from_pretrained(model_name)
model = T5ForConditionalGeneration.from_pretrained(model_name)


def generate_answer_with_confidence(context, query):
    """Generate an answer using Flan-T5 and compute a confidence score.

    The prompt is capped at 512 tokens. Truncation removes tokens from the end
    of the prompt, which is where the question sits, so an over-long context
    is shortened first to keep the question inside the window.

    Args:
        context: Retrieved passage text used to ground the answer.
        query: The user's question.

    Returns:
        ``(generated_text, confidence)`` where ``confidence`` is the mean over
        generation steps of the highest token probability at each step.
    """
    max_input_tokens = 512
    prompt_head = "Given the following financial context, answer the question:\n\nContext:\n"
    prompt_tail = f"\n\nQuestion: {query}"
    prompt = f"{prompt_head}{context}{prompt_tail}"

    # Only touch the context when the prompt would actually be truncated, so
    # prompts that already fit are passed through unchanged.
    if len(tokenizer(prompt).input_ids) > max_input_tokens:
        overhead = len(tokenizer(prompt_head + prompt_tail).input_ids)
        # Small margin: re-tokenizing the joined prompt can split pieces
        # slightly differently from tokenizing the parts separately.
        context_budget = max_input_tokens - overhead - 8
        if context_budget > 0:
            context_ids = tokenizer(
                context,
                add_special_tokens=False,
                truncation=True,
                max_length=context_budget,
            ).input_ids
            context = tokenizer.decode(context_ids, skip_special_tokens=True)
        else:
            context = ""
        prompt = f"{prompt_head}{context}{prompt_tail}"

    # The final truncation stays as a hard cap.
    inputs = tokenizer(prompt, return_tensors="pt", truncation=True, max_length=max_input_tokens)
    with torch.no_grad():
        outputs = model.generate(
            **inputs, max_length=200, return_dict_in_generate=True, output_scores=True
        )
    generated_text = tokenizer.decode(outputs.sequences[0], skip_special_tokens=True)

    # Compute confidence score from token probabilities: softmax each step's
    # scores, take the top probability per step, and average.
    probs = torch.stack(outputs.scores).softmax(dim=-1)
    token_confidences = probs.max(dim=-1).values.mean().item()
    return generated_text, token_confidences
"""### πŸ“Œ Step 6: Confidence score retrieval"""
def compute_retrieval_confidence(query, retrieved_docs, best_chunk_size):
    """Compute retrieval confidence from BM25 scores and embedding similarity.

    Args:
        query: The user query string.
        retrieved_docs: Texts of the retrieved documents.
        best_chunk_size: Chunk size whose BM25 index is scored against.

    Returns:
        ``0.5 * bm25_confidence + 0.5 * embedding_confidence`` as a float.
        The BM25 part is the mean score over the whole index divided by the
        top score. The embedding part is the mean cosine similarity between
        the query and the retrieved documents. Each part is 0.0 when it
        cannot be computed (no positive BM25 score, or no documents).
    """
    # BM25 score normalization
    tokenized_query = nltk.word_tokenize(query.lower())
    bm25_scores = np.asarray(
        bm25_models[best_chunk_size].get_scores(tokenized_query), dtype=float
    )
    top_score = bm25_scores.max() if bm25_scores.size else 0.0
    # If no query token occurs in the corpus every score is 0; dividing would
    # give NaN and poison the final confidence.
    bm25_confidence = float(bm25_scores.mean() / top_score) if top_score > 0 else 0.0

    # Embedding similarity confidence
    if len(retrieved_docs) == 0:
        embedding_confidence = 0.0
    else:
        query_embedding = np.asarray(embedding_model.encode(query))
        # Encode all documents in one call instead of one call per document.
        doc_embeddings = np.asarray(embedding_model.encode(list(retrieved_docs)))
        norms = np.linalg.norm(doc_embeddings, axis=1) * np.linalg.norm(query_embedding)
        similarities = (doc_embeddings @ query_embedding) / norms
        embedding_confidence = float(np.mean(similarities))

    # Combined retrieval confidence (equal-weight sum)
    retrieval_confidence = 0.5 * bm25_confidence + 0.5 * embedding_confidence
    return retrieval_confidence
"""# 4. Guard Rail Implementation
### 📌 Step 1: Input-Side: Validate and filter user queries to prevent irrelevant/harmful inputs
"""
## GuardRail validators
# NLP engine configuration for Presidio: spaCy with the large English model.
nlp_configuration = {
    "nlp_engine_name": "spacy",
    "models": [{"lang_code": "en", "model_name": "en_core_web_lg"}],
}

# Define SSN Pattern
ssn_regex = r"\b\d{3}-\d{2}-\d{4}\b"  # Matches US SSN format (123-45-6789)
ssn_pattern = Pattern(name="SSN Pattern", regex=ssn_regex, score=0.85)  # Score between 0-1

# Create Custom SSN Recognizer
ssn_recognizer = PatternRecognizer(supported_entity="SSN", patterns=[ssn_pattern])

# Build the analyzer from nlp_configuration. Previously the configuration was
# defined but never passed on, so edits to it had no effect.
_nlp_engine = NlpEngineProvider(nlp_configuration=nlp_configuration).create_engine()
analyzer = AnalyzerEngine(nlp_engine=_nlp_engine, supported_languages=["en"])
analyzer.registry.add_recognizer(ssn_recognizer)
@register_validator(name="custom_pii_detector", data_type="string")
class CustomPIIDetector(Validator):
    """Validator that fails when the text contains phone numbers, e-mail
    addresses, credit-card numbers or SSNs, as reported by the module-level
    Presidio ``analyzer``."""

    def validate(self, value, metadata=None) -> ValidationResult:
        """Check ``value`` for PII.

        Args:
            value: The text to check.
            metadata: Unused; accepted for the validator interface.

        Returns:
            ``FailResult`` naming the detected entity types (alphabetical, so
            the message is deterministic), otherwise ``PassResult``.
        """
        # Analyze text for PII
        results = analyzer.analyze(
            text=value,
            entities=["PHONE_NUMBER", "EMAIL_ADDRESS", "CREDIT_CARD", "SSN"],
            language="en",
        )
        if results:
            detected_entities = ", ".join(sorted({res.entity_type for res in results}))
            return FailResult(
                error_message=f"Query contains PII: {detected_entities}."
            )
        return PassResult()
# Custom Profanity Detector using better-profanity
@register_validator(name="custom_profanity_detector", data_type="string")
class CustomProfanityDetector(Validator):
    """Validator that fails when ``better_profanity`` flags the text."""

    def validate(self, value, metadata=None) -> ValidationResult:
        """Check ``value`` for profanity.

        Args:
            value: The text to check.
            metadata: Unused; accepted for the validator interface.

        Returns:
            ``FailResult`` when profanity is detected, otherwise ``PassResult``.
        """
        if not profanity.contains_profanity(value):
            return PassResult()
        return FailResult(
            error_message="Query contains profanity."
        )
# Custom Relevance Validator for Finance and Apple-related Queries
@register_validator(name="custom_relevance_detector", data_type="string")
class CustomRelevanceDetector(Validator):
    """Validator that fails unless the text mentions a finance- or
    Apple-related keyword."""

    def validate(self, value, metadata=None) -> ValidationResult:
        """Check ``value`` for topical relevance.

        A keyword counts only when it begins at a word boundary. Inflected
        forms such as "profits" or "macbooks" still match, while keywords
        buried inside unrelated words ("mac" in "pharmacy", "ios" in
        "scenarios", "loss" in "glossary") do not.

        Args:
            value: The text to check.
            metadata: Unused; accepted for the validator interface.

        Returns:
            ``PassResult`` when a keyword is found, otherwise ``FailResult``.
        """
        finance_keywords = {"revenue", "profit", "expenses", "balance sheet", "earnings", "financial", "investment", "dividends", "assets", "liabilities", "cash flow", "loss","turnover"}
        apple_keywords = {"apple", "iphone", "macbook", "tim cook", "apple inc", "ios", "mac", "ipad"}
        text_lower = value.lower()

        # One alternation over all keywords, anchored at the start of a word.
        keyword_pattern = r"\b(?:" + "|".join(
            re.escape(keyword) for keyword in sorted(finance_keywords | apple_keywords)
        ) + ")"
        if re.search(keyword_pattern, text_lower) is None:
            return FailResult(
                error_message="Query is not related to finance or Apple."
            )
        return PassResult()
# Input-side guard chaining the PII, profanity and relevance validators; the
# validator classes are passed uninstantiated.
# NOTE(review): whether a failed validation raises or is only reported on the
# returned outcome presumably depends on the validators' default on_fail
# setting in the installed guardrails version - confirm.
guard = Guard().use(CustomPIIDetector).use(CustomProfanityDetector).use(CustomRelevanceDetector)
"""# 3. Testing & Validation
### 📌 Step 1: Test with simple Financial Questions
"""
def to_camel_case(text):
    """Return ``text`` passed through ``inflection.camelize`` with the first
    letter upper-cased (``uppercase_first_letter=True``)."""
    return inflection.camelize(text, uppercase_first_letter=True)
def execute_rag_query_with_confidence(query):
    """Run the guarded RAG pipeline for ``query`` and report a confidence score.

    Steps: validate the query with ``guard``, retrieve with the best chunk
    size, re-rank, answer from the top three chunks with Flan-T5, and combine
    generation confidence (weight 0.6) with retrieval confidence (weight 0.4).

    Args:
        query: The user query string.

    Returns:
        A response string containing the answer and the confidence scores, or
        a message starting with "❌ Guardrail" when the query is rejected.
    """
    # Input-side guardrail
    try:
        res = guard.validate(query)
    except Exception as e:
        return f"❌ Guardrail {str(e)}"
    # A failure may be reported on the outcome instead of raised, so inspect
    # the outcome too rather than discarding it.
    if getattr(res, "validation_passed", True) is False:
        failure = getattr(res, "error", None) or "validation failed"
        return f"❌ Guardrail {failure}"

    best_chunk_size, retrieved_docs = search_with_dynamic_chunk(query)
    reranked_docs = rerank(query, retrieved_docs)
    context = " ".join(reranked_docs[:3])  # Use top 3 retrieved chunks
    print(f"πŸ“ Best Chunk Size Selected: {best_chunk_size}")
    print("πŸ“– Retrieved Context:")
    print(context[:500])  # Show preview of context

    # Compute retrieval confidence
    retrieval_confidence = compute_retrieval_confidence(query, reranked_docs, best_chunk_size)

    # Generate answer with Flan-T5 confidence
    answer, generation_confidence = generate_answer_with_confidence(context, query)
    answer = to_camel_case(answer)

    # Final confidence score (weighted average)
    final_confidence = 0.6 * generation_confidence + 0.4 * retrieval_confidence
    print("\nπŸ€– Answer:", answer)
    print(f"πŸ”Ή Confidence Score: {final_confidence:.2f} (Gen: {generation_confidence:.2f}, Retrieval: {retrieval_confidence:.2f})")
    response = f"Answer: {answer}\n\nConfidence Score: {final_confidence:.2f} (Gen: {generation_confidence:.2f}, Retrieval: {retrieval_confidence:.2f})"
    return response
# Sample queries executed at import time, before the UI is created. Each call
# runs the full pipeline; the returned response strings are discarded.
# A relevant financial question (high-confidence).
user_input = "what are the biggest challenges for Apple?"
execute_rag_query_with_confidence(user_input)
query = "What was net profit of Apple's in 2024?"
execute_rag_query_with_confidence(query)
query = "What are factors impacting Apple's financial growth?"
execute_rag_query_with_confidence(query)
# A relevant financial question (low-confidence).
user_input = "What was Apple's revenue in 2023?"
execute_rag_query_with_confidence(user_input)
# An irrelevant question (e.g., "What is the capital of France?") to check system robustness.
user_input = "What is the capital of France?"
execute_rag_query_with_confidence(user_input)
"""# 5. UI Development
### 📌 Step 1: Answering user queries with confidence score
"""
# Chat callback handed to the Gradio interface
def chat_with_rag(message, history):
    """Answer ``message`` via the RAG pipeline.

    ``history`` is accepted but not used. Any exception raised by the
    pipeline is turned into an "Error: ..." string instead of propagating.
    """
    try:
        return execute_rag_query_with_confidence(message)
    except Exception as exc:
        return f"Error: {str(exc)}"
"""### πŸ“Œ Step 2: Integrate with Gradio UI"""
# Example prompts offered in the UI; each becomes a one-element example row.
_example_prompts = [
    "What was net profit of Apple's in 2024?",
    "What was Apple's revenue in 2023?",
    "What are factors impacting Apple's financial growth?",
    "Who is the prime minister of India?",
    "Is Apple's financial strategy stupid?",
    "Email Apple's 2023 revenue details to test@example.com",
]

# Gradio chat UI wired to chat_with_rag; the stop button is disabled.
demo = gr.ChatInterface(
    fn=chat_with_rag,
    title="πŸ“Š Financial Advanced RAG Chatbot with GuardRails",
    description="Ask questions about Apple's financial reports and get AI-powered answers!",
    theme="soft",
    examples=[[prompt] for prompt in _example_prompts],
    submit_btn="Ask",
    stop_btn=None,
)

# Start the web server only when this file is run as a script.
if __name__ == "__main__":
    demo.launch()