# lexibot-api / ingest_optimized.py
# Uploaded by Mrigank005 (commit 56e1ad9, "Upload 12 files")
"""
LexiBot Data Ingestion Script with Context Injection
Processes legal documents and uploads to Pinecone with Act-prefixed chunks.
CRITICAL: This script implements "Context Injection" to solve the section overlap issue
where queries about "Section 3" would confuse sections from different Acts.
Uses Pinecone's built-in embeddings to avoid Google API rate limits.
"""
import os
import re
import time
from pathlib import Path
from typing import List, Dict, Any
from dotenv import load_dotenv
from langchain.schema import Document
from langchain_pinecone import PineconeEmbeddings, PineconeVectorStore
from pinecone import Pinecone, ServerlessSpec
load_dotenv()  # Pull API keys and settings from a local .env file into os.environ

# Configuration
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")  # NOTE(review): read but never used in this script
PINECONE_API_KEY = os.getenv("PINECONE_API_KEY")  # required; validated in main()
PINECONE_INDEX_NAME = os.getenv("PINECONE_INDEX_NAME", "lexibot-legal-docs")
RAW_DATA_DIR = "./RawData"  # directory scanned for *.txt legal documents

# Act name mapping from filename.
# Keys must match the exact on-disk file names (including the existing
# "Proctection" spelling); values are the human-readable Act titles that
# get injected into every chunk for context.
ACT_NAME_MAP = {
    "Consumer-Protection-Act.txt": "Consumer Protection Act, 2019",
    "IPC-SECTIONS-FOR-HARRASMENT.txt": "Indian Penal Code (Harassment Sections)",
    "Motor-Vehicles-Act.txt": "Motor Vehicles (Amendment) Act, 2019",
    "The-Proctection-Of-Women-From-Domestic-Violence.txt": "Protection of Women from Domestic Violence Act, 2005",
    "The-Protection-Of-Children-From-Sexual-Offences.txt": "Protection of Children from Sexual Offences Act, 2012",
    "The-sexual-harassment-of-women-at-workplace.txt": "Sexual Harassment of Women at Workplace Act, 2013"
}
def clean_government_headers(text: str) -> str:
    """Strip standardized gazette boilerplate from a legal document.

    Removes ministry banners, the legislative-department tag, gazette
    date lines, enactment notices, and bracketed year citations, then
    trims surrounding whitespace.
    """
    noise_patterns = (
        r"MINISTRY OF LAW AND JUSTICE.*?(?=\n\n|\d+\.)",
        r"\(Legislative Department\)",
        r"New Delhi, the \d+.*?Saka\)",
        r"The following Act of Parliament.*?information:β€”",
        r"\[.*?\d{4}\.\]",
    )
    cleaned = text
    for noise in noise_patterns:
        # DOTALL lets a banner span multiple lines; headers vary in case.
        cleaned = re.sub(noise, "", cleaned, flags=re.DOTALL | re.IGNORECASE)
    return cleaned.strip()
def split_by_legal_sections(text: str) -> List[Dict[str, str]]:
    """Partition *text* into legal sections keyed by section number.

    A new section begins on any line matching ``<digits><optional
    letters>.`` followed by whitespace at column 0 (e.g. "10.", "215D.",
    "354A.") — the alphanumeric form covers numbering used by the Motor
    Vehicles Act and the IPC. Text before the first heading is dropped.

    Returns:
        A list of ``{"section_number": ..., "content": ...}`` dicts, where
        content includes the heading line itself and is whitespace-trimmed.
    """
    heading_re = re.compile(r"^(\d+[A-Za-z]*)\.\s+")
    parsed: List[Dict[str, str]] = []
    number = None
    buffer: List[str] = []

    for line in text.split("\n"):
        hit = heading_re.match(line)
        if hit is None:
            buffer.append(line)
            continue
        # A heading line: close out the section accumulated so far.
        if number is not None:
            parsed.append(
                {"section_number": number, "content": "\n".join(buffer).strip()}
            )
        number = hit.group(1)
        buffer = [line]

    # Flush the trailing section, if any heading was ever seen.
    if number is not None:
        parsed.append(
            {"section_number": number, "content": "\n".join(buffer).strip()}
        )
    return parsed
def create_context_injected_chunks(
    filename: str,
    sections: List[Dict[str, str]],
    max_chunk_size: int = 1500
) -> List[Document]:
    """
    Create LangChain Document objects with Context Injection.

    CRITICAL: Prepends the Act name to every chunk to solve the section
    overlap issue (a query about "Section 3" must not mix up sections
    from different Acts).

    Format of each chunk:
        Act: Motor Vehicles Act, 2019
        Section: 3
        Content: ...driving license...

    Args:
        filename: Source file name; mapped to an Act title via
            ACT_NAME_MAP (unknown names fall back to a cleaned filename).
        sections: Parsed sections from split_by_legal_sections().
        max_chunk_size: Approximate character budget per chunk, including
            the injected header.

    Returns:
        Documents carrying source/act_name/section_number/chunk_index/type
        metadata. Sections under 50 characters are dropped as noise; long
        sections are split into several chunks that each repeat the header.
    """
    act_name = ACT_NAME_MAP.get(filename, filename.replace("-", " ").replace(".txt", ""))
    documents = []
    for section in sections:
        section_num = section["section_number"]
        content = section["content"]
        # Skip very short sections (likely noise)
        if len(content) < 50:
            continue
        # Context Injection: prepend Act and Section info
        injected_content = f"Act: {act_name}\nSection: {section_num}\nContent: {content}"
        if len(injected_content) <= max_chunk_size:
            documents.append(Document(
                page_content=injected_content,
                metadata={
                    "source": filename,
                    "act_name": act_name,
                    "section_number": section_num,
                    "chunk_index": 0,
                    "type": "legal_section"
                }
            ))
            continue
        # Split large sections while repeating the context header so every
        # chunk keeps its Act/Section identity.
        header = f"Act: {act_name}\nSection: {section_num} (continued)\nContent: "
        # BUGFIX: clamp payload to at least 1 character per chunk. If the
        # header alone reached max_chunk_size, the old per-iteration
        # `max_chunk_size - len(header)` went non-positive, the slice
        # consumed nothing, and `while remaining` looped forever.
        chunk_size = max(1, max_chunk_size - len(header))
        remaining = content
        chunk_idx = 0
        while remaining:
            chunk_text, remaining = remaining[:chunk_size], remaining[chunk_size:]
            documents.append(Document(
                page_content=header + chunk_text,
                metadata={
                    "source": filename,
                    "act_name": act_name,
                    "section_number": section_num,
                    "chunk_index": chunk_idx,
                    "type": "legal_section"
                }
            ))
            chunk_idx += 1
    return documents
def process_all_documents() -> List[Document]:
    """Run the clean/split/chunk pipeline over every .txt file in RawData.

    Returns:
        The combined list of context-injected Document chunks produced
        from all legal-document text files found in RAW_DATA_DIR.
    """
    corpus: List[Document] = []
    for source_file in Path(RAW_DATA_DIR).glob("*.txt"):
        print(f"πŸ“„ Processing: {source_file.name}")
        raw_text = source_file.read_text(encoding="utf-8")
        # Strip gazette boilerplate before attempting section detection.
        sections = split_by_legal_sections(clean_government_headers(raw_text))
        print(f" Found {len(sections)} sections")
        chunks = create_context_injected_chunks(source_file.name, sections)
        print(f" Created {len(chunks)} chunks")
        corpus.extend(chunks)
    return corpus
def initialize_pinecone(dimension: int = 768):
    """Initialize the Pinecone client and return a handle to the index.

    Creates the index (cosine metric, AWS us-east-1 serverless) if it
    does not already exist; otherwise reuses it.

    Args:
        dimension: Embedding dimension used when creating a new index.
            Defaults to 768 (Google embedding-001), preserving this
            helper's original behavior. NOTE(review): main() does not
            call this helper and creates the index at dimension 1024 for
            multilingual-e5-large instead β€” pass dimension=1024 to stay
            consistent with that pipeline.

    Returns:
        A pinecone Index object for PINECONE_INDEX_NAME.
    """
    pc = Pinecone(api_key=PINECONE_API_KEY)
    # Check if index exists
    existing_indexes = [idx.name for idx in pc.list_indexes()]
    if PINECONE_INDEX_NAME not in existing_indexes:
        print(f"πŸ”§ Creating Pinecone index: {PINECONE_INDEX_NAME}")
        pc.create_index(
            name=PINECONE_INDEX_NAME,
            dimension=dimension,
            metric="cosine",
            spec=ServerlessSpec(
                cloud="aws",
                region="us-east-1"
            )
        )
    else:
        print(f"βœ… Using existing Pinecone index: {PINECONE_INDEX_NAME}")
    return pc.Index(PINECONE_INDEX_NAME)
def _ensure_index(pc: Pinecone) -> None:
    """Create the 1024-dim serverless index for multilingual-e5-large if missing."""
    existing_indexes = [idx.name for idx in pc.list_indexes()]
    if PINECONE_INDEX_NAME not in existing_indexes:
        print(f"πŸ”§ Creating Pinecone index: {PINECONE_INDEX_NAME}")
        pc.create_index(
            name=PINECONE_INDEX_NAME,
            dimension=1024,  # multilingual-e5-large dimension
            metric="cosine",
            spec=ServerlessSpec(
                cloud="aws",
                region="us-east-1"
            )
        )
        # Wait for index to be ready
        print(" ⏳ Waiting for index to be ready...")
        time.sleep(10)
    else:
        print(f"βœ… Using existing Pinecone index: {PINECONE_INDEX_NAME}")


def _upload_in_batches(documents: List[Document], embeddings: PineconeEmbeddings) -> None:
    """Embed and upsert *documents* into the vector store in fixed-size batches."""
    batch_size = 100  # Pinecone embeddings have no rate limits
    total_batches = (len(documents) + batch_size - 1) // batch_size
    vectorstore = None
    for start in range(0, len(documents), batch_size):
        batch = documents[start:start + batch_size]
        batch_num = (start // batch_size) + 1
        print(f" πŸ“¦ Uploading batch {batch_num}/{total_batches} ({len(batch)} documents)...")
        if vectorstore is None:
            # First batch creates the vector store
            vectorstore = PineconeVectorStore.from_documents(
                documents=batch,
                embedding=embeddings,
                index_name=PINECONE_INDEX_NAME
            )
        else:
            # Subsequent batches add to existing
            vectorstore.add_documents(batch)


def main():
    """Main ingestion pipeline: process documents, ensure index, upload."""
    print("πŸš€ LexiBot Data Ingestion with Context Injection")
    print("=" * 50)
    # Validate environment
    if not PINECONE_API_KEY:
        raise ValueError("PINECONE_API_KEY not set in environment")
    # Process documents
    documents = process_all_documents()
    print(f"\nπŸ“š Total documents to ingest: {len(documents)}")
    if len(documents) == 0:
        print("❌ No documents found. Check RawData directory.")
        return
    # Initialize Pinecone embeddings (FREE - no rate limits!)
    print("\nπŸ”— Initializing Pinecone Embeddings (multilingual-e5-large)...")
    embeddings = PineconeEmbeddings(
        model="multilingual-e5-large",
        pinecone_api_key=PINECONE_API_KEY
    )
    # Initialize Pinecone index (dimension 1024 for multilingual-e5-large)
    print("🌲 Initializing Pinecone...")
    _ensure_index(Pinecone(api_key=PINECONE_API_KEY))
    # Upload to Pinecone in batches
    print("\n⬆️ Uploading to Pinecone Vector Store...")
    _upload_in_batches(documents, embeddings)
    print("\nβœ… Ingestion Complete!")
    print(f" Index: {PINECONE_INDEX_NAME}")
    print(f" Documents: {len(documents)}")
    print("\nπŸ’‘ You can now start the API with: uvicorn app:app --reload --port 7860")
# Script entry point: run the full ingestion pipeline.
if __name__ == "__main__":
    main()