Spaces:
No application file
No application file
| # Importing libraries for web scraping | |
| import requests # For making HTTP requests | |
| from bs4 import BeautifulSoup # For parsing HTML content | |
| # Importing library for data handling | |
| import pandas as pd | |
| # OS and file handling libraries | |
| import os | |
| import shutil | |
| # LlamaIndex imports for document indexing and retrieval | |
| from llama_index.core import VectorStoreIndex, SimpleDirectoryReader | |
| from llama_index.core.node_parser import HierarchicalNodeParser | |
| # Importing ChromaDB for persistent vector storage | |
| import chromadb | |
| # LlamaIndex wrapper for using Chroma as a vector store | |
| from llama_index.vector_stores.chroma import ChromaVectorStore | |
| # HuggingFace embedding model for generating vector representations | |
| from llama_index.embeddings.huggingface import HuggingFaceEmbedding | |
| # Ingestion pipeline to preprocess and ingest documents into a vector store | |
| from llama_index.core.ingestion import IngestionPipeline | |
| # Tools for creating complex metadata-based filters for search and retrieval | |
| from llama_index.core.vector_stores import MetadataFilters, ExactMatchFilter, MetadataFilter, FilterOperator, FilterCondition | |
| # For retrieving relevant documents using a vector index | |
| from llama_index.core.retrievers import VectorIndexRetriever | |
| # OpenRouter LLM wrapper to use models via OpenRouter platform | |
| from llama_index.llms.openrouter import OpenRouter | |
| # Synthesizer to generate responses from retrieved documents | |
| from llama_index.core.response_synthesizers import get_response_synthesizer | |
| # Query engine that combines retriever and synthesizer for answering queries | |
| from llama_index.core.query_engine import RetrieverQueryEngine | |
| # Import core classes from CrewAI | |
| from crewai import Crew, Agent, Task | |
def fetch_and_download_policy_documents(insurer, UIN, results, save_path):
    """
    Fetches health insurance policy documents from the IRDAI website using the insurer name and UIN.
    Downloads the associated PDF files and saves metadata as a CSV.

    Args:
        insurer (str): Name of the insurance provider.
        UIN (str): Unique Identification Number for the insurance product.
        results (int): Number of search results to fetch.
        save_path (str): Local directory path where documents will be downloaded.
            NOTE: this directory is deleted and recreated on every call.

    Returns:
        pd.DataFrame: DataFrame containing metadata of the downloaded documents.

    Raises:
        requests.exceptions.RequestException: If the search request fails, times out,
            or returns an HTTP error status.
        ValueError: If the page contains no table or the table has no rows.
    """
    # Imported locally so the function does not depend on a new file-level import.
    from urllib.parse import urljoin

    # Seconds to wait for the server; without a timeout a stalled connection hangs forever.
    request_timeout = 30
    # Zero-based index of the table cell that holds the document link.
    doc_column_index = 7

    # Query parameters for the IRDAI document search. Passing them via `params`
    # lets requests percent-encode values (spaces, '&', '/') correctly.
    base_url = 'https://irdai.gov.in/health-insurance-products'
    portlet = '_com_irdai_document_media_IRDAIDocumentMediaPortlet_'
    query = {
        'p_p_id': 'com_irdai_document_media_IRDAIDocumentMediaPortlet',
        'p_p_lifecycle': '0',
        'p_p_state': 'normal',
        'p_p_mode': 'view',
        portlet + 'filterInsurer': insurer,
        portlet + 'filterUIN': UIN,
        portlet + 'filterApprovalDateFrom': '01/01/2020',
        portlet + 'resetCur': 'false',
        portlet + 'delta': results,
    }

    # Set headers to mimic a browser request
    headers = {
        "User-Agent": "Mozilla/5.0"
    }

    # Fetch the search page and fail loudly on HTTP errors instead of parsing an error page
    response = requests.get(base_url, params=query, headers=headers, timeout=request_timeout)
    response.raise_for_status()
    soup = BeautifulSoup(response.content, "html.parser")

    # Find the table containing policy data
    table = soup.find("table")
    if not table:
        raise ValueError("No table found – the content structure may have changed.")

    rows = table.find_all("tr")
    if not rows:
        raise ValueError("Policy table contains no rows.")

    # Column headers from the first row, plus the two metadata columns added below
    header_cols = [th.text.strip() for th in rows[0].find_all("th")]
    header_cols.append("Document URL")
    header_cols.append("Document Name")

    # The last row is skipped exactly as before.
    # NOTE(review): presumably it is a footer/pagination row - confirm against the live page.
    data = []
    for row in rows[1:-1]:
        cols = row.find_all("td")
        if len(cols) <= doc_column_index:
            # Row is too short to carry a document link; skip it rather than crash.
            continue
        text_data = [ele.text.strip() for ele in cols]

        # Extract the document link and name from the relevant column
        link_tag = cols[doc_column_index].find("a")
        href = link_tag.get('href') if link_tag else None
        if href:
            # Resolve relative links against the page URL so they can be downloaded.
            href = urljoin(response.url, href)
        doc_name = link_tag.text.strip() if link_tag else None

        text_data.append(href)
        text_data.append(doc_name)
        data.append(text_data)

    # Create a DataFrame from the extracted data
    df = pd.DataFrame(data, columns=header_cols)

    # Remove the directory if it already exists to avoid old file conflicts
    try:
        shutil.rmtree(save_path)
    except FileNotFoundError:
        pass  # Ignore if directory does not exist
    os.makedirs(save_path, exist_ok=True)

    # Download each document using the extracted URLs
    for index, row in df.iterrows():
        document_url = row['Document URL']
        if not document_url:
            # Skip rows without a valid document link
            print(f"Skipping row {index}: No document URL found.")
            continue

        # File name is "<UIN>.pdf"; rows sharing a UIN overwrite each other.
        filepath = os.path.join(save_path, row['UIN'] + '.pdf')
        try:
            # Stream to disk to avoid loading the whole file in memory; the
            # context manager releases the connection even on failure.
            with requests.get(
                document_url, stream=True, headers=headers, timeout=request_timeout
            ) as download:
                download.raise_for_status()
                with open(filepath, 'wb') as file:
                    for chunk in download.iter_content(chunk_size=8192):
                        file.write(chunk)
        except requests.exceptions.RequestException as e:
            print(f"Error downloading {document_url}: {e}")

    # Save the DataFrame with document metadata to a CSV file
    csv_file_path = './policy_documents_metadata.csv'
    df.to_csv(csv_file_path, index=False)
    print('./policy_documents_metadata.csv has been saved')
    return df
async def create_vDB(doc_path, vDB_path, vDB_colection, embedding_model):
    """
    Asynchronously creates a vector database (vDB) using ChromaDB and stores embedded document data.

    Args:
        doc_path (str): Path to the folder containing input documents.
        vDB_path (str): Directory path for storing the persistent ChromaDB vector database.
        vDB_colection (str): Name of the vector collection inside ChromaDB.
            Any existing collection with this name is deleted first.
        embedding_model (str): Name of the HuggingFace model used for embedding text.

    Returns:
        ChromaVectorStore: An instance of the vector store containing embedded document nodes.
    """
    # Load all documents from the specified directory
    documents = SimpleDirectoryReader(doc_path).load_data()

    # Tag each document with a 'UIN' taken from its file name without the extension.
    # splitext handles extensions of any length (".pdf", ".docx", none at all),
    # unlike slicing off a fixed four characters.
    for doc in documents:
        doc.metadata['UIN'] = os.path.splitext(doc.metadata['file_name'])[0]

    # Parse documents into hierarchical nodes
    node_parser = HierarchicalNodeParser.from_defaults()
    nodes = node_parser.get_nodes_from_documents(documents)

    # Create a persistent Chroma client using the specified vector DB path
    db = chromadb.PersistentClient(path=vDB_path)

    # Best-effort removal of a previous collection so ingestion starts fresh.
    # NOTE(review): the exception raised for a missing collection appears to depend
    # on the installed chromadb version, hence the broad catch - narrow once pinned.
    try:
        db.delete_collection(name=vDB_colection)
    except Exception:
        pass

    # Create the (now empty) collection and wrap it for LlamaIndex
    chroma_collection = db.get_or_create_collection(name=vDB_colection)
    vector_store = ChromaVectorStore(chroma_collection=chroma_collection)

    # Ingestion pipeline: embed nodes with the HuggingFace model and write them to the store
    pipeline = IngestionPipeline(
        transformations=[
            HuggingFaceEmbedding(model_name=embedding_model),
        ],
        vector_store=vector_store,
    )

    # Ingest in fixed-size batches to bound memory usage
    BATCH_SIZE = 1000
    for start in range(0, len(nodes), BATCH_SIZE):
        batch = nodes[start:start + BATCH_SIZE]
        print(f"Ingesting batch {start // BATCH_SIZE + 1} ({len(batch)} nodes)...")
        await pipeline.arun(nodes=batch)

    # Return the vector store instance for further querying or retrieval
    return vector_store
def create_query_engine(UIN, embedding_model, vector_store, similarity_top_k, llm_model, api_key):
    """
    Builds a RetrieverQueryEngine that restricts retrieval to one UIN and answers with an OpenRouter LLM.

    Args:
        UIN (str): Value the "UIN" metadata key must match exactly for a node to be retrieved.
        embedding_model (str): Name of the HuggingFace model used to embed queries.
        vector_store (ChromaVectorStore): Existing vector store holding the embedded documents.
        similarity_top_k (int): How many of the most similar nodes the retriever returns.
        llm_model (str): Model identifier passed to the OpenRouter LLM wrapper.
        api_key (str): API key passed to the OpenRouter LLM wrapper.

    Returns:
        RetrieverQueryEngine: Engine combining the filtered retriever with the LLM-backed synthesizer.
    """
    # Index view over the already-populated vector store
    embedder = HuggingFaceEmbedding(model_name=embedding_model)
    vector_index = VectorStoreIndex.from_vector_store(
        vector_store=vector_store,
        embed_model=embedder,
    )

    # Only nodes whose "UIN" metadata equals the requested UIN are eligible
    uin_only = MetadataFilters(filters=[ExactMatchFilter(key="UIN", value=UIN)])

    # Retriever: vector similarity constrained by the metadata filter
    node_retriever = VectorIndexRetriever(
        index=vector_index,
        filters=uin_only,
        similarity_top_k=similarity_top_k,
    )

    # Synthesizer: turns retrieved nodes into an answer via the OpenRouter-served model
    open_router_llm = OpenRouter(api_key=api_key, model=llm_model)
    synthesizer = get_response_synthesizer(llm=open_router_llm)

    return RetrieverQueryEngine(
        retriever=node_retriever,
        response_synthesizer=synthesizer,
    )
def archive_vdb(vdb_path, archive_path):
    """
    Archives the vDB (vector database) directory into a ZIP file.

    Errors are reported on stdout rather than raised.

    Args:
        vdb_path (str): Path to the directory containing the vector database to archive.
        archive_path (str): Path where the archive will be saved. A trailing ".zip"
            is optional; when absent it is appended to the given path.

    Returns:
        None
    """
    # Check up front: not every Python release raises when the source directory is
    # missing, and an empty archive would silently hide the problem.
    if not os.path.isdir(vdb_path):
        print(f"Error: vDB directory not found at {vdb_path}")
        return

    # shutil.make_archive wants the target WITHOUT the extension. Strip only a real
    # ".zip" suffix; blindly cutting four characters mangles paths that lack it.
    base_name, extension = os.path.splitext(archive_path)
    if extension.lower() != '.zip':
        base_name = archive_path

    try:
        # make_archive returns the path of the file it actually wrote
        created = shutil.make_archive(base_name, 'zip', vdb_path)
        print(f"vDB successfully archived to {created}")
    except FileNotFoundError:
        # Directory vanished between the check and the archive call
        print(f"Error: vDB directory not found at {vdb_path}")
    except Exception as e:
        # Catch-all for any unexpected errors during archiving
        print(f"An error occurred during archiving: {e}")
def load_vdb_from_archive(archive_path, vdb_path, collection):
    """
    Extracts and loads a Chroma-based vector database (vDB) from a ZIP archive.

    Errors are reported on stdout rather than raised.

    Args:
        archive_path (str): Full path to the ZIP archive containing the vDB.
        vdb_path (str): Destination directory where the archive contents will be extracted.
        collection (str): Name of the Chroma collection within the vDB. It is created
            (empty) if the extracted database does not contain it.

    Returns:
        ChromaVectorStore or None: A vector store object ready for use, or None if loading fails.
    """
    # Explicit existence check: shutil.unpack_archive reports a missing .zip as
    # shutil.ReadError ("... is not a zip file"), not FileNotFoundError, so without
    # this the dedicated "not found" message would never be shown.
    if not os.path.isfile(archive_path):
        print(f"Error: vDB archive not found at {archive_path}")
        return None

    try:
        # Extract the archive to the specified vdb_path directory
        shutil.unpack_archive(archive_path, vdb_path)
        print(f"vDB archive extracted to {vdb_path}")

        # Initialize a persistent ChromaDB client from the extracted directory
        db = chromadb.PersistentClient(path=vdb_path)

        # Retrieve (or create) the requested collection from the ChromaDB
        chroma_collection = db.get_or_create_collection(name=collection)

        # Wrap the Chroma collection in a ChromaVectorStore object for use with LlamaIndex
        vector_store = ChromaVectorStore(chroma_collection=chroma_collection)
        print("ChromaDB loaded successfully from archive.")
        return vector_store
    except FileNotFoundError:
        # Archive disappeared between the check and the extraction
        print(f"Error: vDB archive not found at {archive_path}")
        return None
    except Exception as e:
        # Catch-all for any unexpected errors during extraction or loading
        print(f"An error occurred during loading: {e}")
        return None