"""Download IRDAI health-insurance policy documents and query them semantically.

The module scrapes the IRDAI product listing, downloads the linked policy PDFs,
embeds them into a persistent ChromaDB collection through LlamaIndex, and builds
an LLM-backed query engine restricted to a single product (UIN). Helpers for
archiving and restoring the vector database directory are included.
"""

# OS and file handling libraries
import os
import shutil
# URL helper used to resolve relative document links
from urllib.parse import urljoin

# Importing libraries for web scraping
import requests  # For making HTTP requests
from bs4 import BeautifulSoup  # For parsing HTML content

# Importing library for data handling
import pandas as pd

# LlamaIndex imports for document indexing and retrieval
from llama_index.core import VectorStoreIndex, SimpleDirectoryReader
from llama_index.core.node_parser import HierarchicalNodeParser

# Importing ChromaDB for persistent vector storage
import chromadb

# LlamaIndex wrapper for using Chroma as a vector store
from llama_index.vector_stores.chroma import ChromaVectorStore

# HuggingFace embedding model for generating vector representations
from llama_index.embeddings.huggingface import HuggingFaceEmbedding

# Ingestion pipeline to preprocess and ingest documents into a vector store
from llama_index.core.ingestion import IngestionPipeline

# Tools for creating complex metadata-based filters for search and retrieval
from llama_index.core.vector_stores import MetadataFilters, ExactMatchFilter, MetadataFilter, FilterOperator, FilterCondition

# For retrieving relevant documents using a vector index
from llama_index.core.retrievers import VectorIndexRetriever

# OpenRouter LLM wrapper to use models via OpenRouter platform
from llama_index.llms.openrouter import OpenRouter

# Synthesizer to generate responses from retrieved documents
from llama_index.core.response_synthesizers import get_response_synthesizer

# Query engine that combines retriever and synthesizer for answering queries
from llama_index.core.query_engine import RetrieverQueryEngine

# Import core classes from CrewAI
from crewai import Crew, Agent, Task


# Landing page of the IRDAI health-insurance product search.
_IRDAI_PRODUCTS_URL = 'https://irdai.gov.in/health-insurance-products'

# Zero-based index of the table cell that holds the document link.
_DOCUMENT_COLUMN_INDEX = 7

# Seconds to wait for the server (connect and each read) before giving up.
_REQUEST_TIMEOUT_SECONDS = 30

# Number of nodes handed to the ingestion pipeline per call.
_INGEST_BATCH_SIZE = 1000


def _parse_policy_table(html):
    """Parse the first HTML table in *html* into a DataFrame of policy metadata."""
    soup = BeautifulSoup(html, "html.parser")

    table = soup.find("table")
    if table is None:
        raise ValueError("No table found – the content structure may have changed.")

    rows = table.find_all("tr")
    if not rows:
        raise ValueError("The table has no rows – the content structure may have changed.")

    # Column names come from the header row, plus two columns derived from the link cell
    header_cols = [th.text.strip() for th in rows[0].find_all("th")]
    header_cols += ["Document URL", "Document Name"]

    data = []
    # NOTE(review): the last row is excluded, presumably because it is a
    # footer/pagination row — confirm against the live page.
    for row in rows[1:-1]:
        cols = row.find_all("td")
        if len(cols) <= _DOCUMENT_COLUMN_INDEX:
            # Rows without a document cell (e.g. a single-cell placeholder row)
            # carry no policy data and would otherwise raise IndexError.
            continue

        text_data = [cell.text.strip() for cell in cols]

        link_tag = cols[_DOCUMENT_COLUMN_INDEX].find("a")
        href = link_tag.get("href") if link_tag is not None else None
        doc_name = link_tag.text.strip() if link_tag is not None else None

        data.append(text_data + [href, doc_name])

    return pd.DataFrame(data, columns=header_cols)


def _download_file(url, filepath, headers):
    """Stream *url* to *filepath* in 8 KiB chunks, raising on HTTP or network errors."""
    # The context manager releases the pooled connection even if the body is
    # not fully consumed (e.g. when writing to disk fails half-way).
    with requests.get(url, stream=True, headers=headers,
                      timeout=_REQUEST_TIMEOUT_SECONDS) as response:
        response.raise_for_status()
        with open(filepath, 'wb') as file:
            for chunk in response.iter_content(chunk_size=8192):
                file.write(chunk)


def fetch_and_download_policy_documents(insurer, UIN, results, save_path):
    """
    Fetches health insurance policy documents from the IRDAI website using the insurer name and UIN.
    Downloads the associated PDF files and saves metadata as a CSV.

    Any existing directory at ``save_path`` is deleted before downloading, and the
    metadata is written to ``./policy_documents_metadata.csv`` in the current
    working directory.

    Args:
        insurer (str): Name of the insurance provider.
        UIN (str): Unique Identification Number for the insurance product.
        results (int): Number of search results to fetch.
        save_path (str): Local directory path where documents will be downloaded.

    Returns:
        pd.DataFrame: DataFrame containing metadata of the downloaded documents.

    Raises:
        ValueError: If the search page contains no table, or the table has no rows.
        requests.exceptions.RequestException: If the search request itself fails
            or times out. Failures of individual document downloads are printed
            and skipped instead.
    """
    # Construct the URL for IRDAI document search with filters applied
    url = (
        f'{_IRDAI_PRODUCTS_URL}'
        f'?p_p_id=com_irdai_document_media_IRDAIDocumentMediaPortlet'
        f'&p_p_lifecycle=0&p_p_state=normal&p_p_mode=view'
        f'&_com_irdai_document_media_IRDAIDocumentMediaPortlet_filterInsurer={insurer}'
        f'&_com_irdai_document_media_IRDAIDocumentMediaPortlet_filterUIN={UIN}'
        f'&_com_irdai_document_media_IRDAIDocumentMediaPortlet_filterApprovalDateFrom=01%2F01%2F2020'
        f'&_com_irdai_document_media_IRDAIDocumentMediaPortlet_resetCur=false'
        f'&_com_irdai_document_media_IRDAIDocumentMediaPortlet_delta={results}'
    )

    # Set headers to mimic a browser request
    headers = {
        "User-Agent": "Mozilla/5.0"
    }

    # Fetch the search page; the timeout prevents hanging forever on a stalled server
    response = requests.get(url, headers=headers, timeout=_REQUEST_TIMEOUT_SECONDS)
    df = _parse_policy_table(response.content)

    # Remove the directory if it already exists to avoid old file conflicts
    try:
        shutil.rmtree(save_path)
    except FileNotFoundError:
        pass  # Nothing to clean up

    # Create directory for saving documents
    os.makedirs(save_path, exist_ok=True)

    # Download each document using the extracted URLs
    for index, row in df.iterrows():
        document_url = row['Document URL']
        if not document_url:
            # Skip rows without a valid document link
            print(f"Skipping row {index}: No document URL found.")
            continue

        # The file is named after the UIN so the indexing step can recover it
        filepath = os.path.join(save_path, row['UIN'] + '.pdf')
        try:
            # urljoin leaves absolute links untouched and resolves relative
            # ones against the search page URL.
            _download_file(urljoin(url, document_url), filepath, headers)
        except requests.exceptions.RequestException as e:
            print(f"Error downloading {document_url}: {e}")

    # Save the DataFrame with document metadata to a CSV file
    csv_file_path = './policy_documents_metadata.csv'
    df.to_csv(csv_file_path, index=False)
    print(f'{csv_file_path} has been saved')

    return df


async def create_vDB(doc_path, vDB_path, vDB_colection, embedding_model):
    """
    Asynchronously creates a vector database (vDB) using ChromaDB and stores embedded document data.

    Any existing collection with the same name is dropped first, so every call
    rebuilds the collection from the documents currently in ``doc_path``.

    Args:
        doc_path (str): Path to the folder containing input documents.
        vDB_path (str): Directory path for storing the persistent ChromaDB vector database.
        vDB_colection (str): Name of the vector collection inside ChromaDB.
        embedding_model (str): Name of the HuggingFace model used for embedding text.

    Returns:
        ChromaVectorStore: An instance of the vector store containing embedded document nodes.
    """
    # Load all documents from the specified directory
    documents = SimpleDirectoryReader(doc_path).load_data()

    # Tag each document with its UIN, taken from the file name without its
    # extension (splitext handles extensions of any length, not just ".pdf")
    for doc in documents:
        doc.metadata['UIN'] = os.path.splitext(doc.metadata['file_name'])[0]

    # Parse documents into hierarchical nodes for structured semantic representation
    node_parser = HierarchicalNodeParser.from_defaults()
    nodes = node_parser.get_nodes_from_documents(documents)

    # Create a persistent Chroma client using the specified vector DB path
    db = chromadb.PersistentClient(path=vDB_path)

    # Best-effort removal of a previous collection for a fresh start. The
    # exception raised for a missing collection differs between chromadb
    # releases, hence the deliberately broad catch.
    try:
        db.delete_collection(name=vDB_colection)
    except Exception:
        pass

    # Create the (now empty) collection and wrap it for LlamaIndex
    chroma_collection = db.get_or_create_collection(name=vDB_colection)
    vector_store = ChromaVectorStore(chroma_collection=chroma_collection)

    # Set up an ingestion pipeline that embeds nodes and writes them to the vector store
    pipeline = IngestionPipeline(
        transformations=[
            HuggingFaceEmbedding(model_name=embedding_model),
        ],
        vector_store=vector_store,
    )

    # Ingest in fixed-size batches to control memory usage
    for start in range(0, len(nodes), _INGEST_BATCH_SIZE):
        batch = nodes[start:start + _INGEST_BATCH_SIZE]
        print(f"Ingesting batch {start // _INGEST_BATCH_SIZE + 1} ({len(batch)} nodes)...")
        await pipeline.arun(nodes=batch)

    # Return the vector store instance for further querying or retrieval
    return vector_store


def create_query_engine(UIN, embedding_model, vector_store, similarity_top_k, llm_model, api_key):
    """
    Creates a RetrieverQueryEngine that performs filtered semantic search and generates responses using an LLM.

    Args:
        UIN (str): Unique Identification Number used to filter relevant documents.
        embedding_model (str): Name of the HuggingFace model used for embedding text.
        vector_store (ChromaVectorStore): Pre-built vector store containing embedded documents.
        similarity_top_k (int): Number of most semantically similar nodes to retrieve.
        llm_model (str): Name of the language model served via OpenRouter for generating responses.
        api_key (str): API key for accessing the OpenRouter platform.

    Returns:
        RetrieverQueryEngine: A query engine capable of semantic search and LLM-powered response generation.
    """
    # Build a vector index on top of the existing vector store
    embed_model = HuggingFaceEmbedding(model_name=embedding_model)
    index = VectorStoreIndex.from_vector_store(
        vector_store=vector_store,
        embed_model=embed_model,
    )

    # Restrict retrieval to nodes whose 'UIN' metadata matches exactly
    uin_filter = MetadataFilters(filters=[ExactMatchFilter(key="UIN", value=UIN)])

    # Retriever combining vector similarity with the metadata filter
    retriever = VectorIndexRetriever(
        index=index,
        filters=uin_filter,
        similarity_top_k=similarity_top_k,
    )

    # The LLM, served through OpenRouter, turns retrieved nodes into an answer
    llm = OpenRouter(api_key=api_key, model=llm_model)
    response_synthesizer = get_response_synthesizer(llm=llm)

    return RetrieverQueryEngine(
        retriever=retriever,
        response_synthesizer=response_synthesizer,
    )


def archive_vdb(vdb_path, archive_path):
    """
    Archives the vDB (vector database) directory into a ZIP file.

    Errors are reported on stdout rather than raised.

    Args:
        vdb_path (str): Path to the directory containing the vector database to archive.
        archive_path (str): Full path where the archive will be saved. A trailing
            ``.zip`` extension is expected; if it is missing, ``.zip`` is appended.

    Returns:
        None
    """
    try:
        # shutil.make_archive wants the target path WITHOUT the extension and
        # appends ".zip" itself. Strip the extension only when it is really
        # there, instead of blindly dropping the last four characters.
        base_name, extension = os.path.splitext(archive_path)
        if extension.lower() != '.zip':
            base_name = archive_path

        shutil.make_archive(base_name, 'zip', vdb_path)
        print(f"vDB successfully archived to {base_name + '.zip'}")
    except FileNotFoundError:
        # Handle case where vDB path does not exist
        print(f"Error: vDB directory not found at {vdb_path}")
    except Exception as e:
        # Catch-all for any unexpected errors during archiving
        print(f"An error occurred during archiving: {e}")


def load_vdb_from_archive(archive_path, vdb_path, collection):
    """
    Extracts and loads a Chroma-based vector database (vDB) from a ZIP archive.

    Errors are reported on stdout rather than raised.

    Args:
        archive_path (str): Full path to the ZIP archive containing the vDB.
        vdb_path (str): Destination directory where the archive contents will be extracted.
        collection (str): Name of the Chroma collection within the vDB.

    Returns:
        ChromaVectorStore or None: A vector store object ready for use, or None if loading fails.
    """
    try:
        # shutil.unpack_archive reports a missing .zip as shutil.ReadError
        # ("... is not a zip file"), not FileNotFoundError, so check explicitly
        # to give the intended "not found" message.
        if not os.path.isfile(archive_path):
            print(f"Error: vDB archive not found at {archive_path}")
            return None

        # Extract the archive to the specified vdb_path directory
        shutil.unpack_archive(archive_path, vdb_path)
        print(f"vDB archive extracted to {vdb_path}")

        # Initialize a persistent ChromaDB client from the extracted directory
        db = chromadb.PersistentClient(path=vdb_path)

        # Retrieve the requested collection (created empty if it does not exist)
        chroma_collection = db.get_or_create_collection(name=collection)

        # Wrap the Chroma collection in a ChromaVectorStore object for use with LlamaIndex
        vector_store = ChromaVectorStore(chroma_collection=chroma_collection)

        print("ChromaDB loaded successfully from archive.")
        return vector_store
    except FileNotFoundError:
        # The archive disappeared between the check and the extraction
        print(f"Error: vDB archive not found at {archive_path}")
        return None
    except Exception as e:
        # Catch-all for any unexpected errors during extraction or loading
        print(f"An error occurred during loading: {e}")
        return None