"""
Financial Chatbot Utilities
Core functionality for a RAG-based financial chatbot
"""

import os
import re
import sys
import warnings
from collections import deque
from typing import Tuple

import nltk
import torch
from nltk.corpus import stopwords

from langchain_community.document_loaders import PyPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import Chroma
from langchain_huggingface import HuggingFaceEmbeddings

from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline
from rank_bm25 import BM25Okapi
from sentence_transformers import CrossEncoder
from sklearn.metrics.pairwise import cosine_similarity

# Use the bundled NLTK data directory so stopwords resolve without a download.
nltk.data.path.append('./nltk_data')
stop_words = set(stopwords.words('english'))

# Make the deployed app checkout importable (e.g., a Streamlit Cloud mount).
sys.path.append('/mount/src/gen_ai_dev')

# --- Configuration ---
DATA_PATH = "./Infy financial report/"
DATA_FILES = ["INFY_2022_2023.pdf", "INFY_2023_2024.pdf"]
EMBEDDING_MODEL = "sentence-transformers/all-MiniLM-L6-v2"
LLM_MODEL = "gpt2"

# Silence tokenizer fork warnings and Chroma telemetry.
os.environ["TOKENIZERS_PARALLELISM"] = "false"
os.environ["CHROMA_DISABLE_TELEMETRY"] = "true"

# Suppress known-noisy backend warnings.
warnings.filterwarnings("ignore", message=".*oneDNN custom operations.*")
warnings.filterwarnings("ignore", message=".*cuBLAS factory.*")


# --- Document processing ---

def load_and_chunk_documents():
    """Load the PDF reports and split them into manageable, overlapping chunks."""
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=500,
        chunk_overlap=100,
        separators=["\n\n", "\n", ".", " ", ""]
    )

    all_chunks = []
    for file in DATA_FILES:
        try:
            loader = PyPDFLoader(os.path.join(DATA_PATH, file))
            pages = loader.load()
            all_chunks.extend(text_splitter.split_documents(pages))
        except Exception as e:
            print(f"Error loading {file}: {e}")

    return all_chunks
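

# Illustrative sketch (not called anywhere in this module): how the splitter
# settings above behave. chunk_size counts characters, and chunk_overlap
# repeats trailing text across neighbouring chunks so context is not cut
# mid-sentence. The sample string is made up for the demo.
def _demo_chunking() -> None:
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=60,
        chunk_overlap=15,
        separators=["\n\n", "\n", ".", " ", ""]
    )
    sample = ("Revenue grew 15% year over year. Operating margin was 21%. "
              "Free cash flow remained strong across both fiscal years.")
    for i, piece in enumerate(splitter.split_text(sample)):
        print(i, repr(piece))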


# --- Vector store setup ---

# Build the chunk index once at import time.
text_chunks = load_and_chunk_documents()
embeddings = HuggingFaceEmbeddings(model_name=EMBEDDING_MODEL)

vector_db = Chroma.from_documents(
    documents=text_chunks,
    embedding=embeddings,
    persist_directory="./chroma_db"
)
vector_db.persist()

# Keyword index: BM25 over whitespace-tokenized chunk text.
bm25_corpus = [chunk.page_content for chunk in text_chunks]
bm25_tokenized = [doc.split() for doc in bm25_corpus]
bm25 = BM25Okapi(bm25_tokenized)
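
# Illustrative sketch (not called anywhere): BM25 scores documents by term
# overlap weighted by term rarity, so it complements embedding search on
# exact figures and names. The toy corpus is made up for the demo.
def _demo_bm25() -> None:
    corpus = [
        "revenue for fiscal 2023 was 18.2 billion dollars",
        "the board declared a final dividend",
        "operating margin guidance for fiscal 2024",
    ]
    index = BM25Okapi([doc.split() for doc in corpus])
    top = index.get_top_n("revenue 2023".split(), corpus, n=2)
    print(top)  # the revenue sentence should rank first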

# Cross-encoder used to re-rank the merged retrieval candidates.
cross_encoder = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2')
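
# Illustrative sketch (not called anywhere): unlike the bi-encoder embeddings,
# the cross-encoder reads each (query, passage) pair jointly, which is slower
# but much sharper for ordering a short candidate list.
def _demo_cross_encoder() -> None:
    query = "what was revenue in fiscal 2023"
    passages = [
        "revenue for fiscal 2023 was 18.2 billion dollars",
        "the board declared a final dividend",
    ]
    scores = cross_encoder.predict([(query, p) for p in passages])
    print(scores)  # the first passage should receive the higher score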


# --- Conversation memory ---

class ConversationMemory:
    """Stores recent conversation context"""

    def __init__(self, max_size=5):
        self.buffer = deque(maxlen=max_size)

    def add_interaction(self, query: str, response: str) -> None:
        self.buffer.append((query, response))

    def get_context(self) -> str:
        return "\n".join(
            f"Previous Q: {q}\nPrevious A: {r}" for q, r in self.buffer
        )


memory = ConversationMemory(max_size=3)
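
# Illustrative sketch (not called anywhere): the deque's maxlen evicts the
# oldest turn automatically once the buffer is full.
def _demo_memory_eviction() -> None:
    demo = ConversationMemory(max_size=2)
    demo.add_interaction("Q1", "A1")
    demo.add_interaction("Q2", "A2")
    demo.add_interaction("Q3", "A3")  # ("Q1", "A1") is dropped here
    print(demo.get_context())        # only Q2 and Q3 remain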


# --- Hybrid retrieval ---

def hybrid_retrieval(query: str, top_k: int = 5) -> str:
    """Combine semantic and keyword search, then re-rank with the cross-encoder."""
    try:
        # Stage 1: semantic candidates from the vector store.
        semantic_results = vector_db.similarity_search(query, k=top_k * 2)
        print(f"\n\n[For Debug Only] Semantic Results: {semantic_results}")

        # Stage 2: keyword candidates from BM25.
        keyword_results = bm25.get_top_n(query.split(), bm25_corpus, n=top_k * 2)
        print(f"\n\n[For Debug Only] Keyword Results: {keyword_results}\n\n")

        # Merge both candidate lists, deduplicating by chunk content.
        combined = []
        seen = set()

        for doc in semantic_results:
            content = doc.page_content
            if content not in seen:
                combined.append((content, "semantic"))
                seen.add(content)

        for doc in keyword_results:
            if doc not in seen:
                combined.append((doc, "keyword"))
                seen.add(doc)

        # Stage 3: score every candidate against the query with the cross-encoder.
        pairs = [(query, content) for content, _ in combined]
        scores = cross_encoder.predict(pairs)

        # Keep the top_k highest-scoring chunks, tagged with their source.
        sorted_results = sorted(
            zip(combined, scores),
            key=lambda x: x[1],
            reverse=True
        )
        final_results = [
            f"[{source}] {content}"
            for (content, source), _ in sorted_results[:top_k]
        ]

        # Append recent conversation turns so the LLM sees dialogue context.
        memory_context = memory.get_context()
        if memory_context:
            final_results.append(f"[memory] {memory_context}")

        return "\n\n".join(final_results)

    except Exception as e:
        print(f"Retrieval error: {e}")
        return ""


# --- Safety guard ---

class SafetyGuard:
    """Validates input and filters output"""

    def __init__(self):
        # Topics the bot refuses to discuss. Matching below is by substring,
        # so e.g. "personality" would also trip the "personal" filter.
        self.blocked_topics = {
            'politics', 'sports', 'entertainment', 'religion',
            'medical', 'hypothetical', 'opinion', 'personal'
        }

    def validate_input(self, query: str) -> Tuple[bool, str]:
        query_lower = query.lower()
        if any(topic in query_lower for topic in self.blocked_topics):
            return False, "I only discuss financial topics."
        return True, ""

    def filter_output(self, response: str) -> str:
        # Strip hedging phrases so answers read decisively.
        phrases_to_remove = {
            "I'm not sure", "I don't know", "maybe",
            "possibly", "could be", "uncertain", "perhaps"
        }
        for phrase in phrases_to_remove:
            response = response.replace(phrase, "")

        # Truncate to at most two sentences.
        sentences = re.split(r'[.!?]', response)
        if len(sentences) > 2:
            response = '. '.join(sentences[:2]) + '.'

        return response.strip()


guard = SafetyGuard()
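
# Illustrative sketch (not called anywhere): the guard in action.
def _demo_safety_guard() -> None:
    ok, msg = guard.validate_input("Who won the sports match?")
    print(ok, msg)  # -> False "I only discuss financial topics."
    print(guard.filter_output("Revenue maybe grew. Margins held. Cash was flat."))
    # -> hedge word removed and the reply trimmed to two sentences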


# --- LLM setup ---

try:
    tokenizer = AutoTokenizer.from_pretrained(LLM_MODEL)
    model = AutoModelForCausalLM.from_pretrained(
        LLM_MODEL,
        device_map="cpu",
        torch_dtype=torch.float32
    )

    generator = pipeline(
        "text-generation",
        model=model,
        tokenizer=tokenizer,
        max_new_tokens=400,
        do_sample=True,
        temperature=0.3,
        top_k=30,
        top_p=0.9,
        repetition_penalty=1.2,
        pad_token_id=tokenizer.eos_token_id  # GPT-2 has no pad token; reuse EOS
    )
except Exception as e:
    print(f"Error loading model: {e}")
    raise
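
# GPT-2's context window is 1,024 tokens and max_new_tokens reserves 400 of
# them, so a long retrieved context can overflow the prompt. A minimal sketch
# (not wired into generate_answer) of one way to guard against that:
def _truncate_prompt(prompt: str, max_prompt_tokens: int = 600) -> str:
    ids = tokenizer.encode(prompt)
    if len(ids) <= max_prompt_tokens:
        return prompt
    # Keep the tail of the prompt: it carries the user question and the
    # assistant marker that extract_final_response splits on.
    return tokenizer.decode(ids[-max_prompt_tokens:])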


# --- Response post-processing ---

def extract_final_response(full_response: str) -> str:
    """Return only the text generated after the assistant marker."""
    parts = full_response.split("<|im_start|>assistant")
    if len(parts) > 1:
        response = parts[-1].split("<|im_end|>")[0]
        return re.sub(r'\s+', ' ', response).strip()
    return full_response
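
# Illustrative sketch (not called anywhere): GPT-2 echoes the prompt back in
# its output, so the ChatML-style markers act purely as string delimiters
# here, not as special tokens the model was trained on.
def _demo_extract_final_response() -> None:
    raw = "<|im_start|>user\nQ<|im_end|>\n<|im_start|>assistant\nAnswer: 42"
    print(extract_final_response(raw))  # -> "Answer: 42"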


def generate_answer(query: str) -> Tuple[str, float]:
    """Answer a query and return (response, confidence)."""
    try:
        is_valid, msg = guard.validate_input(query)
        if not is_valid:
            return msg, 0.0

        context = hybrid_retrieval(query)

        # The ChatML-style markers below are plain delimiters for GPT-2; see
        # extract_final_response, which splits the echoed prompt on them.
        prompt = f"""<|im_start|>system
You are a financial analyst. Provide a brief answer using the context.
Context: {context}<|im_end|>
<|im_start|>user
{query}<|im_end|>
<|im_start|>assistant
Answer:"""

        print(f"\n\n[For Debug Only] Prompt: {prompt}\n\n")
        response = generator(prompt)[0]['generated_text']
        print(f"\n\n[For Debug Only] response: {response}\n\n")

        clean_response = extract_final_response(response)
        clean_response = guard.filter_output(clean_response)
        print(f"\n\n[For Debug Only] clean_response: {clean_response}\n\n")

        # Heuristic confidence: cosine similarity between query and response
        # embeddings measures semantic relatedness, not factual accuracy.
        query_embed = embeddings.embed_query(query)
        print(f"\n\n[For Debug Only] query_embed: {query_embed}\n\n")

        response_embed = embeddings.embed_query(clean_response)
        print(f"\n\n[For Debug Only] response_embed: {response_embed}\n\n")

        confidence = cosine_similarity([query_embed], [response_embed])[0][0]
        print(f"\n\n[For Debug Only] confidence: {confidence}\n\n")

        memory.add_interaction(query, clean_response)

        print("\n\n[For Debug Only] I'm Done\n\n")
        return clean_response, round(confidence, 2)

    except Exception as e:
        return f"Error processing request: {e}", 0.0