import gradio as gr from huggingface_hub import InferenceClient import os import torch import transformers from tensorflow import keras from transformers import AutoTokenizer, pipeline, AutoModelForSeq2SeqLM,AutoModelForCausalLM from langchain.document_loaders import PyPDFLoader from langchain.text_splitter import RecursiveCharacterTextSplitter from langchain.embeddings import HuggingFaceEmbeddings from langchain.vectorstores import Chroma from langchain.prompts import ChatPromptTemplate from langchain.schema.runnable import RunnablePassthrough from langchain.schema.output_parser import StrOutputParser from langchain.llms import HuggingFacePipeline import gradio as gr from sentence_transformers import SentenceTransformer import chromadb from rank_bm25 import BM25Okapi import nltk from collections import deque from sentence_transformers import CrossEncoder from transformers import T5Tokenizer, T5ForConditionalGeneration import numpy as np from guardrails.validators import Validator, register_validator, ValidationResult, FailResult, PassResult from presidio_analyzer import AnalyzerEngine from presidio_analyzer.nlp_engine import SpacyNlpEngine, NlpEngineProvider from better_profanity import profanity import inflection from presidio_analyzer import PatternRecognizer, Pattern from guardrails import Guard import re from bs4 import BeautifulSoup import warnings # Suppress all warnings warnings.filterwarnings("ignore") from rank_bm25 import BM25Okapi nltk.download("punkt") nltk.download("punkt_tab") """ For more information on `huggingface_hub` Inference API support, please check the docs: https://huggingface.co/docs/huggingface_hub/v0.22.2/en/guides/inference """ client = InferenceClient("HuggingFaceH4/zephyr-7b-beta") print("GPU Available:", torch.cuda.is_available()) print("GPU Name:", torch.cuda.get_device_name(0) if torch.cuda.is_available() else "No GPU Found") """# 2. Data Collection & Preprocessing""" pdf_files = ["Apple-10K-2023.pdf", "Apple-10K-2024.pdf"] """### š Step 1: Load Multiple 10-K Financial Report PDFs This step loads multiple financial reports (10-K filings) from PDFs stored in Google Drive. - It initializes a list of PDF file paths. - Each PDF is processed using `PyPDFLoader` to extract text content. - The extracted documents are combined into a single list (`all_documents`) for further processing. - This ensures that all relevant financial data is available for retrieval in the RAG pipeline.
Here each split will also have a metadata defining the location of the chunk in the actual document for citation,also other details post cleaning (Removing extra spaces, newlines and html tags) the text load from the PDF Contents """ all_documents = [] def preprocess_text(text): # Remove HTML tags text = BeautifulSoup(text, "html.parser").get_text() # Remove extra whitespace and newlines text = re.sub(r'\s+', ' ', text).strip() return text for pdf_path in pdf_files: loader = PyPDFLoader(pdf_path) documents = loader.load() for doc in documents: doc.page_content = preprocess_text(doc.page_content) all_documents.extend(documents) """### š Step 2: Adaptive Chunking with Different Sizes Uses different chunk sizes (500 & 1000) and latesr selects the best one dynamically """ chunk_sizes = [500, 1000] # Create chunk dictionaries chunked_texts = {} # Generate chunks for each chunk size for size in chunk_sizes: text_splitter = RecursiveCharacterTextSplitter( chunk_size=size, chunk_overlap=int(size * 0.2), length_function=len ) chunked_texts[size] = text_splitter.split_documents(all_documents) print(f"š¹ Chunk size {size}: {len(chunked_texts[size])} chunks") """### š Step 2: Create Embeddings using Sentence Transformers""" # Load embedding model embedding_model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2") # Generate embeddings for both chunk sizes embeddings_dict = { size: embedding_model.encode( [doc.page_content for doc in chunked_texts[size]], convert_to_numpy=True ) for size in chunk_sizes } print("ā Generated embeddings for all chunk sizes.") """### š Step 3: Store and retrieve using a basic vector database""" # Initialize ChromaDB client chroma_client = chromadb.PersistentClient(path="./chroma_financials") # Create collections for each chunk size collections = { size: chroma_client.get_or_create_collection(name=f"financial_chunks_{size}") for size in chunk_sizes } # Insert chunks into ChromaDB for size in chunk_sizes: texts = [doc.page_content for doc in chunked_texts[size]] embeddings = embeddings_dict[size] for idx, (chunk, embedding) in enumerate(zip(texts, embeddings)): collections[size].add( ids=[f"{size}_{idx}"], embeddings=[embedding.tolist()], metadatas=[{"text": chunk}] ) print("ā Stored all chunk sizes in ChromaDB.") """# 3. Advanced RAG Implementation ### š Step 1: BM25 for keyword-based search alongside embeddings """ # Tokenize all documents bm25_corpus = { size: [nltk.word_tokenize(doc.page_content.lower()) for doc in chunked_texts[size]] for size in chunk_sizes } # Create BM25 index bm25_models = { size: BM25Okapi(bm25_corpus[size]) for size in chunk_sizes } print("ā Initialized BM25 models.") """### š Step 2: Memory-Augmented Retrieval with Hybrid Search""" # Memory store for past queries memory_store = deque(maxlen=10) # Stores last 10 queries # Memomry Augumented Retrievel def memory_augmented_retrieval(query): # Check memory store with correct unpacking for stored_query, stored_data in memory_store: if stored_query == query: stored_size, stored_results = stored_data print("š¹ Retrieved from memory store.") return {stored_size: stored_results} else: return None # Hybrid search with Memomry Augumented Retrievel def hybrid_search(query, top_k=5): # Check memory store with correct unpacking mar_result = memory_augmented_retrieval(query) if mar_result != None: return mar_result # Encode query for embeddings query_embedding = embedding_model.encode(query).tolist() # Perform search for both chunk sizes results = {} for size in chunk_sizes: # BM25 Retrieval tokenized_query = nltk.word_tokenize(query.lower()) bm25_scores = bm25_models[size].get_scores(tokenized_query) bm25_top_idxs = sorted(range(len(bm25_scores)), key=lambda i: bm25_scores[i], reverse=True)[:top_k] bm25_results = [chunked_texts[size][idx].page_content for idx in bm25_top_idxs] # Embedding Retrieval retrieved = collections[size].query(query_embeddings=[query_embedding], n_results=top_k) embedding_results = [item["text"] for item in retrieved["metadatas"][0]] # Merge BM25 + Embedding results combined_results = list(set(bm25_results + embedding_results)) results[size] = combined_results # ā Ensure results is a dictionary return results # ā Return results as a dictionary """### š Step 3: Testing different chunk sizes & retrieval methods for better accuracy""" def score_retrieval_results(results): """ Score retrieval results based on text length, keyword coverage, and diversity. Higher score means better retrieval quality. """ scores = {} for size, texts in results.items(): total_length = sum(len(txt) for txt in texts) # Longer retrieved text is better unique_chunks = len(set(texts)) # More unique chunks = better diversity # Assign a heuristic score (you can improve this with LLM-based re-ranking) scores[size] = total_length + (unique_chunks * 10) # Weight uniqueness higher print(f"Retrieval Scores:{scores}") return scores def search_with_dynamic_chunk(query, top_k=5): results = hybrid_search(query, top_k) # Score and select the best chunk size dynamically scores = score_retrieval_results(results) best_size = max(scores, key=scores.get) # Select chunk size with highest score # Store in memory with correct format memory_store.append((query, (best_size, results[best_size]))) return best_size, results[best_size] query = "Apple's revenue in 2023" best_chunk_size, retrieved_docs = search_with_dynamic_chunk(query) # Display results print(f"š¹ Best Chunk Size Selected: {best_chunk_size}") print("š Retrieved Results:",len(retrieved_docs)) """### š Step 4: Re-Ranking with Cross-Encoders""" # Load re-ranking model reranker = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2") def rerank(query, retrieved_docs): query_doc_pairs = [[query, doc] for doc in retrieved_docs] scores = reranker.predict(query_doc_pairs) ranked_docs = [doc for _, doc in sorted(zip(scores, retrieved_docs), reverse=True)] return ranked_docs # Apply re-ranking reranked_docs = rerank(query, retrieved_docs) print("Re-ranked Documents:", reranked_docs) print(retrieved_docs) """### š Step 5: Load and Configure the Language Model for Text Generation This step initializes and configures the **FLAN-T5-Large** model for text generation. - **Model Selection**: - The `"google/flan-t5-large"` model is chosen, which is fine-tuned for instruction-following tasks. - This model is well-suited for answering questions based on retrieved context. """ # Load Flan-T5 model model_name = "google/flan-t5-large" tokenizer = T5Tokenizer.from_pretrained(model_name) model = T5ForConditionalGeneration.from_pretrained(model_name) def generate_answer_with_confidence(context, query): """Generate an answer using Flan-T5 and compute confidence score.""" prompt = f"Given the following financial context, answer the question:\n\nContext:\n{context}\n\nQuestion: {query}" inputs = tokenizer(prompt, return_tensors="pt", truncation=True, max_length=512) with torch.no_grad(): outputs = model.generate(**inputs, max_length=200, return_dict_in_generate=True, output_scores=True) generated_text = tokenizer.decode(outputs.sequences[0], skip_special_tokens=True) # Compute confidence score from token probabilities probs = torch.stack(outputs.scores).softmax(dim=-1) token_confidences = probs.max(dim=-1).values.mean().item() return generated_text, token_confidences """### š Step 6: Confidence score retrieval""" def compute_retrieval_confidence(query, retrieved_docs, best_chunk_size): """Compute retrieval confidence based on BM25 and embedding similarity.""" # BM25 Score Normalization tokenized_query = nltk.word_tokenize(query.lower()) bm25_scores = np.array(bm25_models[best_chunk_size].get_scores(tokenized_query)) bm25_confidence = np.mean(bm25_scores) / max(bm25_scores) # Normalize # Embedding Similarity Confidence query_embedding = embedding_model.encode(query) retrieved_embeddings = [embedding_model.encode(doc) for doc in retrieved_docs] similarities = [np.dot(query_embedding, emb) / (np.linalg.norm(query_embedding) * np.linalg.norm(emb)) for emb in retrieved_embeddings] embedding_confidence = np.mean(similarities) # Combined retrieval confidence (weighted sum) retrieval_confidence = 0.5 * bm25_confidence + 0.5 * embedding_confidence return retrieval_confidence """# 4. Guard Rail Implementation ### š Step 1: Input-Side: Validate and filter user queries to prevent irrelevant/harmful inputs """ ## GuardRail validators # Define NLP Configuration with lang_code nlp_configuration = { "nlp_engine_name": "spacy", "models": [{"lang_code": "en", "model_name": "en_core_web_lg"}], } # Define SSN Pattern ssn_regex = r"\b\d{3}-\d{2}-\d{4}\b" # Matches US SSN format (123-45-6789) ssn_pattern = Pattern(name="SSN Pattern", regex=ssn_regex, score=0.85) # Score between 0-1 # Create Custom SSN Recognizer ssn_recognizer = PatternRecognizer(supported_entity="SSN", patterns=[ssn_pattern]) analyzer = AnalyzerEngine() analyzer.registry.add_recognizer(ssn_recognizer) @register_validator(name="custom_pii_detector", data_type="string") class CustomPIIDetector(Validator): def validate(self, value, metadata={}) -> ValidationResult: # Analyze text for PII results = analyzer.analyze(text=value, entities=["PHONE_NUMBER", "EMAIL_ADDRESS", "CREDIT_CARD", "SSN"], language="en") if results: detected_entities = ", ".join(set([res.entity_type for res in results])) return FailResult( error_message=f"Query contains PII: {detected_entities}." ) return PassResult() # Custom Profanity Detector using better-profanity @register_validator(name="custom_profanity_detector", data_type="string") class CustomProfanityDetector(Validator): def validate(self, value, metadata={}) -> ValidationResult: if profanity.contains_profanity(value): return FailResult( error_message="Query contains profanity." ) return PassResult() # Custom Relevance Validator for Finance and Apple-related Queries @register_validator(name="custom_relevance_detector", data_type="string") class CustomRelevanceDetector(Validator): def validate(self, value, metadata={}) -> ValidationResult: finance_keywords = {"revenue", "profit", "expenses", "balance sheet", "earnings", "financial", "investment", "dividends", "assets", "liabilities", "cash flow", "loss","turnover"} apple_keywords = {"apple", "iphone", "macbook", "tim cook", "apple inc", "ios", "mac", "ipad"} text_lower = value.lower() # Check if any finance-related or Apple-related keyword appears in the query if not any(keyword in text_lower for keyword in (finance_keywords | apple_keywords)): return FailResult( error_message="Query is not related to finance or Apple." ) return PassResult() guard = Guard().use(CustomPIIDetector).use(CustomProfanityDetector).use(CustomRelevanceDetector) """# 3. Testing & Validation ### š Step 1: Test with simple Financial Questions """ def to_camel_case(text): """Convert normal text to camelCase using inflection package.""" camel_text = inflection.camelize(text, uppercase_first_letter=True) return camel_text def execute_rag_query_with_confidence(query): """Run RAG with confidence scoring.""" ## implement input side guardrail try: res = guard.validate(query) except Exception as e: return f"ā Guardrail {str(e)}" best_chunk_size, retrieved_docs = search_with_dynamic_chunk(query) reranked_docs = rerank(query, retrieved_docs) context = " ".join(reranked_docs[:3]) # Use top 3 retrieved chunks print(f"š Best Chunk Size Selected: {best_chunk_size}") print("š Retrieved Context:") print(context[:500]) # Show preview of context # Compute retrieval confidence retrieval_confidence = compute_retrieval_confidence(query, reranked_docs, best_chunk_size) # Generate Answer with Flan-T5 Confidence answer, generation_confidence = generate_answer_with_confidence(context, query) answer = to_camel_case(answer) # Final confidence score (weighted average) final_confidence = 0.6 * generation_confidence + 0.4 * retrieval_confidence print("\nš¤ Answer:", answer) print(f"š¹ Confidence Score: {final_confidence:.2f} (Gen: {generation_confidence:.2f}, Retrieval: {retrieval_confidence:.2f})") response = f"Answer: {answer}\n\nConfidence Score: {final_confidence:.2f} (Gen: {generation_confidence:.2f}, Retrieval: {retrieval_confidence:.2f})" return response # A relevant financial question (high-confidence). user_input = "what are the biggest challenges for Apple?" execute_rag_query_with_confidence(user_input) query = "What was net profit of Apple's in 2024?" execute_rag_query_with_confidence(query) query = "What are factors impacting Apple's financial growth?" execute_rag_query_with_confidence(query) # A relevant financial question (low-confidence). user_input = "What was Apple's revenue in 2023?" execute_rag_query_with_confidence(user_input) # An irrelevant question (e.g., "What is the capital of France?") to check system robustness. user_input = "What is the capital of France?" execute_rag_query_with_confidence(user_input) """# 5. UI Development ### š Step 1: Answering user queries with confidence score """ # Define Chatbot Function def chat_with_rag(message, history): try: response = execute_rag_query_with_confidence(message) return response except Exception as e: return f"Error: {str(e)}" """### š Step 2: Integrate with Gradio UI""" # Create Gradio Chatbot UI with Auto-Clearing Input demo = gr.ChatInterface( fn= chat_with_rag, # Function to generate responses title="š Financial Advanced RAG Chatbot with GuardRails", description="Ask questions about Apple's financial reports and get AI-powered answers!", theme="soft", examples=[ ["What was net profit of Apple's in 2024?"], ["What was Apple's revenue in 2023?"], ["What are factors impacting Apple's financial growth?"], ["Who is the prime minister of India?"], ["Is Apple's financial strategy stupid?"], ["Email Apple's 2023 revenue details to test@example.com"], ], submit_btn="Ask", stop_btn=None, ) if __name__ == "__main__": demo.launch()