# AIdvisor / functions.py
# Krishna Kumar S
# cc
# 2f971a9
# Importing libraries for web scraping
import requests # For making HTTP requests
from bs4 import BeautifulSoup # For parsing HTML content
# Importing library for data handling
import pandas as pd
# OS and file handling libraries
import os
import shutil
# LlamaIndex imports for document indexing and retrieval
from llama_index.core import VectorStoreIndex, SimpleDirectoryReader
from llama_index.core.node_parser import HierarchicalNodeParser
# Importing ChromaDB for persistent vector storage
import chromadb
# LlamaIndex wrapper for using Chroma as a vector store
from llama_index.vector_stores.chroma import ChromaVectorStore
# HuggingFace embedding model for generating vector representations
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
# Ingestion pipeline to preprocess and ingest documents into a vector store
from llama_index.core.ingestion import IngestionPipeline
# Tools for creating complex metadata-based filters for search and retrieval
from llama_index.core.vector_stores import MetadataFilters, ExactMatchFilter, MetadataFilter, FilterOperator, FilterCondition
# For retrieving relevant documents using a vector index
from llama_index.core.retrievers import VectorIndexRetriever
# OpenRouter LLM wrapper to use models via OpenRouter platform
from llama_index.llms.openrouter import OpenRouter
# Synthesizer to generate responses from retrieved documents
from llama_index.core.response_synthesizers import get_response_synthesizer
# Query engine that combines retriever and synthesizer for answering queries
from llama_index.core.query_engine import RetrieverQueryEngine
# Import core classes from CrewAI
from crewai import Crew, Agent, Task
def fetch_and_download_policy_documents(insurer, UIN, results, save_path,
                                        timeout=30,
                                        csv_file_path='./policy_documents_metadata.csv'):
    """
    Fetches health insurance policy documents from the IRDAI website using the insurer name and UIN.
    Downloads the associated PDF files and saves metadata as a CSV.

    The directory at ``save_path`` is deleted (if present) and recreated before
    downloading, so anything already stored there is lost.

    Args:
        insurer (str): Name of the insurance provider.
        UIN (str): Unique Identification Number for the insurance product.
        results (int): Number of search results to fetch.
        save_path (str): Local directory path where documents will be downloaded.
        timeout (float): Seconds to wait on each HTTP request before giving up.
        csv_file_path (str): Where the metadata CSV is written.

    Returns:
        pd.DataFrame: DataFrame containing metadata of the downloaded documents.

    Raises:
        ValueError: If the results page contains no table or the table has no rows.
        requests.exceptions.RequestException: If the search request itself fails
            (individual document download failures are only printed).
    """
    # Function-scope import so this block does not depend on file-level changes
    from urllib.parse import urljoin

    base_url = 'https://irdai.gov.in/health-insurance-products'

    # Construct the URL for IRDAI document search with filters applied.
    # NOTE(review): insurer and UIN are interpolated as given; a value containing
    # '&' or '#' would corrupt the query string. Left unchanged because callers
    # may already pass pre-encoded values — confirm before adding quoting.
    url = (
        f'{base_url}'
        f'?p_p_id=com_irdai_document_media_IRDAIDocumentMediaPortlet'
        f'&p_p_lifecycle=0&p_p_state=normal&p_p_mode=view'
        f'&_com_irdai_document_media_IRDAIDocumentMediaPortlet_filterInsurer={insurer}'
        f'&_com_irdai_document_media_IRDAIDocumentMediaPortlet_filterUIN={UIN}'
        f'&_com_irdai_document_media_IRDAIDocumentMediaPortlet_filterApprovalDateFrom=01%2F01%2F2020'
        f'&_com_irdai_document_media_IRDAIDocumentMediaPortlet_resetCur=false'
        f'&_com_irdai_document_media_IRDAIDocumentMediaPortlet_delta={results}'
    )

    # Set headers to mimic a browser request
    headers = {
        "User-Agent": "Mozilla/5.0"
    }

    # Make a GET request and parse the HTML content; the timeout prevents an
    # unresponsive server from blocking the caller indefinitely
    response = requests.get(url, headers=headers, timeout=timeout)
    soup = BeautifulSoup(response.content, "html.parser")

    # Find the table containing policy data
    table = soup.find("table")
    if not table:
        raise ValueError("No table found – the content structure may have changed.")

    # Extract all rows in the table
    rows = table.find_all("tr")
    if not rows:
        raise ValueError("Policy table has no rows – the content structure may have changed.")

    # Extract column headers and append additional metadata columns
    header_cols = [th.text.strip() for th in rows[0].find_all("th")]
    header_cols.append("Document URL")
    header_cols.append("Document Name")

    # Zero-based index of the cell that holds the document link
    doc_col_index = 7

    # Parse each row to extract text data and document link info.
    # The first row is the header; the last row is skipped as well
    # (presumably a footer/pagination row — TODO confirm against the live page).
    data = []
    for row in rows[1:-1]:
        cols = row.find_all("td")
        if len(cols) <= doc_col_index:
            # A row without the document column cannot be processed
            print(f"Skipping malformed row with {len(cols)} cells.")
            continue
        text_data = [ele.text.strip() for ele in cols]
        # Extract the document link and name from the relevant column
        link_tag = cols[doc_col_index].find("a")
        href = link_tag['href'] if link_tag and 'href' in link_tag.attrs else None
        doc_name = link_tag.text.strip() if link_tag else None
        text_data.append(href)
        text_data.append(doc_name)
        data.append(text_data)

    # Create a DataFrame from the extracted data
    df = pd.DataFrame(data, columns=header_cols)

    # Remove the directory if it already exists to avoid old file conflicts
    try:
        shutil.rmtree(save_path)
    except FileNotFoundError:
        pass  # Ignore if directory does not exist

    # Create directory for saving documents
    os.makedirs(save_path, exist_ok=True)

    # Download each document using the extracted URLs
    for index, row in df.iterrows():
        document_url = row['Document URL']
        if not document_url:
            # Skip rows without a valid document link
            print(f"Skipping row {index}: No document URL found.")
            continue

        # Resolve relative links against the site; absolute links pass through
        document_url = urljoin(base_url, document_url)
        # Construct filename using UIN
        filepath = os.path.join(save_path, row['UIN'] + '.pdf')
        try:
            # Stream download to avoid loading entire file in memory; the
            # context manager releases the connection even on failure
            with requests.get(document_url, stream=True, headers=headers,
                              timeout=timeout) as download:
                download.raise_for_status()
                with open(filepath, 'wb') as file:
                    for chunk in download.iter_content(chunk_size=8192):
                        file.write(chunk)
        except requests.exceptions.RequestException as e:
            print(f"Error downloading {document_url}: {e}")

    # Save the DataFrame with document metadata to a CSV file
    df.to_csv(csv_file_path, index=False)
    print(f'{csv_file_path} has been saved')
    return df
async def create_vDB(doc_path, vDB_path, vDB_colection, embedding_model, batch_size=1000):
    """
    Asynchronously creates a vector database (vDB) using ChromaDB and stores embedded document data.

    Any existing collection with the same name is deleted first, so the
    collection is rebuilt from the documents currently in ``doc_path``.

    Args:
        doc_path (str): Path to the folder containing input documents.
        vDB_path (str): Directory path for storing the persistent ChromaDB vector database.
        vDB_colection (str): Name of the vector collection inside ChromaDB.
        embedding_model (str): Name of the HuggingFace model used for embedding text.
        batch_size (int): Number of nodes passed to the ingestion pipeline per batch.

    Returns:
        ChromaVectorStore: An instance of the vector store containing embedded document nodes.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    if batch_size < 1:
        # A non-positive step would silently ingest nothing (or fail inside range)
        raise ValueError("batch_size must be at least 1")

    # Load all documents from the specified directory
    documents = SimpleDirectoryReader(doc_path).load_data()

    # Add 'UIN' metadata to each document using the filename without its
    # extension; splitext handles extensions of any length
    for doc in documents:
        doc.metadata['UIN'] = os.path.splitext(doc.metadata['file_name'])[0]

    # Parse documents into hierarchical nodes
    node_parser = HierarchicalNodeParser.from_defaults()
    nodes = node_parser.get_nodes_from_documents(documents)

    # Create a persistent Chroma client using the specified vector DB path
    db = chromadb.PersistentClient(path=vDB_path)

    # Remove the existing collection if it exists (for a fresh start).
    # The catch stays broad on purpose: this is best-effort cleanup and the
    # exception type for a missing collection is not pinned down here.
    try:
        db.delete_collection(name=vDB_colection)
    except Exception as exc:
        print(f"Collection '{vDB_colection}' was not removed before rebuild: {exc}")

    # Create or retrieve a vector collection in ChromaDB
    chroma_collection = db.get_or_create_collection(name=vDB_colection)

    # Initialize the Chroma-based vector store
    vector_store = ChromaVectorStore(chroma_collection=chroma_collection)

    # Set up an ingestion pipeline that includes HuggingFace embedding transformation
    pipeline = IngestionPipeline(
        transformations=[
            HuggingFaceEmbedding(model_name=embedding_model),
        ],
        vector_store=vector_store,
    )

    # Ingest nodes into the vector store in batches to bound the work per pipeline run
    for start in range(0, len(nodes), batch_size):
        batch = nodes[start:start + batch_size]
        print(f"Ingesting batch {start // batch_size + 1} ({len(batch)} nodes)...")
        await pipeline.arun(nodes=batch)

    # Return the vector store instance for further querying or retrieval
    return vector_store
def create_query_engine(UIN, embedding_model, vector_store, similarity_top_k, llm_model, api_key):
    """
    Builds a RetrieverQueryEngine that retrieves nodes for a single UIN and answers with an OpenRouter LLM.

    Args:
        UIN (str): Unique Identification Number; only nodes whose 'UIN' metadata equals this value are retrieved.
        embedding_model (str): Name of the HuggingFace model used to embed queries.
        vector_store (ChromaVectorStore): Existing vector store holding the embedded documents.
        similarity_top_k (int): How many of the most similar nodes the retriever returns.
        llm_model (str): Name of the model requested from OpenRouter.
        api_key (str): API key passed to the OpenRouter client.

    Returns:
        RetrieverQueryEngine: Query engine combining the filtered retriever with an LLM response synthesizer.
    """
    # Wrap the existing vector store in an index that embeds with the given model
    embed_model = HuggingFaceEmbedding(model_name=embedding_model)
    vector_index = VectorStoreIndex.from_vector_store(
        vector_store=vector_store,
        embed_model=embed_model,
    )

    # Restrict retrieval to nodes tagged with the requested UIN
    uin_filter = ExactMatchFilter(key="UIN", value=UIN)
    uin_only = MetadataFilters(filters=[uin_filter])

    # Retriever applying the metadata filter on top of vector similarity
    uin_retriever = VectorIndexRetriever(
        index=vector_index,
        filters=uin_only,
        similarity_top_k=similarity_top_k,
    )

    # LLM served through OpenRouter, used by the synthesizer to write the answer
    openrouter_llm = OpenRouter(
        api_key=api_key,
        model=llm_model,
    )

    # Combine the retriever and the LLM-backed synthesizer into one engine
    return RetrieverQueryEngine(
        retriever=uin_retriever,
        response_synthesizer=get_response_synthesizer(llm=openrouter_llm),
    )
def archive_vdb(vdb_path, archive_path):
    """
    Archives the vDB (vector database) directory into a ZIP file.

    Errors are reported with ``print`` rather than raised.

    Args:
        vdb_path (str): Path to the directory containing the vector database to archive.
        archive_path (str): Path where the archive will be saved. A trailing
            ``.zip`` extension is optional; the file written always ends in ``.zip``.

    Returns:
        None
    """
    # shutil.make_archive requires the archive path without the extension.
    # Strip the suffix only when it really is ".zip", so a path given without
    # an extension is not truncated by four arbitrary characters.
    archive_path = os.fspath(archive_path)
    base_name, extension = os.path.splitext(archive_path)
    if extension.lower() != '.zip':
        base_name = archive_path
    # make_archive appends ".zip" to the base name it is given
    target = base_name + '.zip'

    try:
        # Create a ZIP archive of the vDB directory
        shutil.make_archive(base_name, 'zip', vdb_path)
        print(f"vDB successfully archived to {target}")
    except FileNotFoundError:
        # Handle case where vDB path does not exist
        print(f"Error: vDB directory not found at {vdb_path}")
    except Exception as e:
        # Catch-all for any unexpected errors during archiving
        print(f"An error occurred during archiving: {e}")
def load_vdb_from_archive(archive_path, vdb_path, collection):
    """
    Extracts and loads a Chroma-based vector database (vDB) from a ZIP archive.

    Errors are reported with ``print`` and signalled by a ``None`` return
    rather than raised. The collection is opened with
    ``get_or_create_collection``, so a name that is not present in the
    extracted database yields a newly created collection instead of an error.

    Args:
        archive_path (str): Full path to the ZIP archive containing the vDB.
        vdb_path (str): Destination directory where the archive contents will be extracted.
        collection (str): Name of the Chroma collection within the vDB.

    Returns:
        ChromaVectorStore or None: A vector store object ready for use, or None if loading fails.
    """
    # Check for the archive explicitly: shutil.unpack_archive reports a missing
    # .zip as shutil.ReadError ("not a zip file"), not FileNotFoundError, so
    # without this check the user would get a misleading generic message.
    if not os.path.isfile(archive_path):
        print(f"Error: vDB archive not found at {archive_path}")
        return None

    try:
        # Extract the archive to the specified vdb_path directory
        shutil.unpack_archive(archive_path, vdb_path)
        print(f"vDB archive extracted to {vdb_path}")

        # Initialize a persistent ChromaDB client from the extracted directory
        db = chromadb.PersistentClient(path=vdb_path)

        # Retrieve or create the requested collection from the ChromaDB
        chroma_collection = db.get_or_create_collection(name=collection)

        # Wrap the Chroma collection in a ChromaVectorStore object for use with LlamaIndex
        vector_store = ChromaVectorStore(chroma_collection=chroma_collection)
        print("ChromaDB loaded successfully from archive.")
        return vector_store
    except FileNotFoundError:
        # Archive disappeared between the check above and extraction
        print(f"Error: vDB archive not found at {archive_path}")
        return None
    except Exception as e:
        # Catch-all for any unexpected errors during extraction or loading
        print(f"An error occurred during loading: {e}")
        return None