"""
LexiBot Data Ingestion Script with Context Injection

Processes legal documents and uploads them to Pinecone as Act-prefixed chunks.

CRITICAL: This script implements "Context Injection" to solve the section
overlap issue where queries about "Section 3" would confuse sections from
different Acts.

Uses Pinecone's built-in embeddings to avoid Google API rate limits.
"""

import os
import re
import time
from pathlib import Path
from typing import List, Dict, Any

from dotenv import load_dotenv
from langchain.schema import Document
from langchain_pinecone import PineconeEmbeddings, PineconeVectorStore
from pinecone import Pinecone, ServerlessSpec

load_dotenv()

# Configuration
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
PINECONE_API_KEY = os.getenv("PINECONE_API_KEY")
PINECONE_INDEX_NAME = os.getenv("PINECONE_INDEX_NAME", "lexibot-legal-docs")
RAW_DATA_DIR = "./RawData"

# Embedding model and the vector size the index is created with. The two must
# agree, so they are defined side by side and used by every code path.
EMBEDDING_MODEL = "multilingual-e5-large"
EMBEDDING_DIMENSION = 1024

# Number of documents sent to the vector store per upload call.
BATCH_SIZE = 100

# Upper bound (seconds) on how long to wait for a freshly created index.
INDEX_READY_TIMEOUT = 120

# Act name mapping from filename
ACT_NAME_MAP = {
    "Consumer-Protection-Act.txt": "Consumer Protection Act, 2019",
    "IPC-SECTIONS-FOR-HARRASMENT.txt": "Indian Penal Code (Harassment Sections)",
    "Motor-Vehicles-Act.txt": "Motor Vehicles (Amendment) Act, 2019",
    "The-Proctection-Of-Women-From-Domestic-Violence.txt": "Protection of Women from Domestic Violence Act, 2005",
    "The-Protection-Of-Children-From-Sexual-Offences.txt": "Protection of Children from Sexual Offences Act, 2012",
    "The-sexual-harassment-of-women-at-workplace.txt": "Sexual Harassment of Women at Workplace Act, 2013",
}

# Boilerplate removed from every document before section splitting. Compiled
# once at import time; DOTALL lets a pattern span line breaks.
_HEADER_PATTERNS = tuple(
    re.compile(pattern, flags=re.DOTALL | re.IGNORECASE)
    for pattern in (
        r"MINISTRY OF LAW AND JUSTICE.*?(?=\n\n|\d+\.)",
        r"\(Legislative Department\)",
        r"New Delhi, the \d+.*?Saka\)",
        r"The following Act of Parliament.*?information:—",
        r"\[.*?\d{4}\.\]",
    )
)

# A section heading: start of line, digits, optional letter suffix, a period,
# then whitespace. Examples: "10. ", "215D. ", "354A. ", "182B. "
_SECTION_PATTERN = re.compile(r"^(\d+[A-Za-z]*)\.\s+")


def clean_government_headers(text: str) -> str:
    """Remove standardized government document headers and noise.

    Args:
        text: Raw text of one legal document.

    Returns:
        The text with every header pattern removed and surrounding
        whitespace stripped.
    """
    for pattern in _HEADER_PATTERNS:
        text = pattern.sub("", text)
    return text.strip()


def split_by_legal_sections(text: str) -> List[Dict[str, str]]:
    """Split text into legal sections.

    A section starts on any line that begins with a number (with an optional
    letter suffix) followed by a period and whitespace. The alphanumeric
    suffix is needed for sections such as "215D." or "354A.".

    Text that appears before the first section heading is not returned.

    Args:
        text: Cleaned document text.

    Returns:
        One dict per section, in document order, with the keys
        ``section_number`` (e.g. ``"354A"``) and ``content`` (the heading
        line plus every following line up to the next heading, stripped).
    """
    sections = []
    current_section = None
    current_content = []

    def flush() -> None:
        # Emit the section collected so far; anything gathered before the
        # first heading has no section number and is skipped.
        if current_section is not None:
            sections.append({
                "section_number": current_section,
                "content": "\n".join(current_content).strip(),
            })

    for line in text.split("\n"):
        match = _SECTION_PATTERN.match(line)
        if match:
            flush()
            current_section = match.group(1)
            current_content = [line]
        else:
            current_content.append(line)

    # Don't forget the last section
    flush()
    return sections


def create_context_injected_chunks(
    filename: str,
    sections: List[Dict[str, str]],
    max_chunk_size: int = 1500
) -> List[Document]:
    """Create LangChain Document objects with Context Injection.

    CRITICAL: The Act name is prepended to every chunk so that identically
    numbered sections from different Acts stay distinguishable.

    Format:
        Act: Motor Vehicles Act, 2019
        Section: 3
        Content: ...driving license...

    Sections whose content is shorter than 50 characters are skipped as
    likely noise. A section that does not fit in ``max_chunk_size`` is cut
    into several chunks; the first keeps the plain header and the following
    ones are marked "(continued)".

    Args:
        filename: Source file name; looked up in ``ACT_NAME_MAP``, falling
            back to a name derived from the file name itself.
        sections: Output of ``split_by_legal_sections``.
        max_chunk_size: Maximum length of ``page_content`` in characters,
            header included.

    Returns:
        Documents in section order, each carrying ``source``, ``act_name``,
        ``section_number``, ``chunk_index`` and ``type`` metadata.

    Raises:
        ValueError: If a section has to be split but ``max_chunk_size``
            leaves no room for content after the header.
    """
    act_name = ACT_NAME_MAP.get(filename, filename.replace("-", " ").replace(".txt", ""))
    documents = []

    for section in sections:
        section_num = section["section_number"]
        content = section["content"]

        # Skip very short sections (likely noise)
        if len(content) < 50:
            continue

        base_metadata = {
            "source": filename,
            "act_name": act_name,
            "section_number": section_num,
            "type": "legal_section",
        }
        first_header = f"Act: {act_name}\nSection: {section_num}\nContent: "

        if len(first_header) + len(content) <= max_chunk_size:
            documents.append(Document(
                page_content=first_header + content,
                metadata={**base_metadata, "chunk_index": 0},
            ))
            continue

        # Split large sections while maintaining the context header.
        continued_header = f"Act: {act_name}\nSection: {section_num} (continued)\nContent: "
        if max_chunk_size <= len(continued_header):
            # A zero or negative slice width would never consume the content.
            raise ValueError(
                f"max_chunk_size={max_chunk_size} leaves no room for content "
                f"after the {len(continued_header)}-character context header"
            )

        remaining = content
        chunk_idx = 0
        while remaining:
            header = first_header if chunk_idx == 0 else continued_header
            room = max_chunk_size - len(header)
            documents.append(Document(
                page_content=header + remaining[:room],
                metadata={**base_metadata, "chunk_index": chunk_idx},
            ))
            remaining = remaining[room:]
            chunk_idx += 1

    return documents


def process_all_documents() -> List[Document]:
    """Process all legal documents in the RawData directory.

    Every ``*.txt`` file directly inside ``RAW_DATA_DIR`` is read as UTF-8,
    cleaned, split into sections and converted to context-injected chunks.

    Returns:
        The chunks of all files, concatenated in file-name order.
    """
    all_documents = []
    raw_data_path = Path(RAW_DATA_DIR)

    # Sorted so that chunk order (and therefore batching) is the same on
    # every run, independent of filesystem listing order.
    for txt_file in sorted(raw_data_path.glob("*.txt")):
        print(f"📄 Processing: {txt_file.name}")

        with open(txt_file, "r", encoding="utf-8") as f:
            raw_text = f.read()

        # Clean headers
        cleaned_text = clean_government_headers(raw_text)

        # Split by legal sections
        sections = split_by_legal_sections(cleaned_text)
        print(f"   Found {len(sections)} sections")

        # Create context-injected chunks
        documents = create_context_injected_chunks(txt_file.name, sections)
        print(f"   Created {len(documents)} chunks")

        all_documents.extend(documents)

    return all_documents


def _wait_for_index_ready(pc, index_name: str, timeout: float = INDEX_READY_TIMEOUT) -> None:
    """Block until the Pinecone index reports ready, or raise on timeout."""
    deadline = time.monotonic() + timeout
    while not pc.describe_index(index_name).status["ready"]:
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"Pinecone index '{index_name}' was not ready after {timeout} seconds"
            )
        time.sleep(1)


def initialize_pinecone():
    """Initialize the Pinecone client and create the index if needed.

    A missing index is created as a serverless cosine index sized for
    ``EMBEDDING_MODEL`` and the call waits until Pinecone reports it ready.
    An existing index is used as is.

    Returns:
        The Pinecone index handle for ``PINECONE_INDEX_NAME``.

    Raises:
        TimeoutError: If a newly created index does not become ready within
            ``INDEX_READY_TIMEOUT`` seconds.
    """
    pc = Pinecone(api_key=PINECONE_API_KEY)

    # Check if index exists
    existing_indexes = [idx.name for idx in pc.list_indexes()]

    if PINECONE_INDEX_NAME not in existing_indexes:
        print(f"🔧 Creating Pinecone index: {PINECONE_INDEX_NAME}")
        pc.create_index(
            name=PINECONE_INDEX_NAME,
            dimension=EMBEDDING_DIMENSION,  # must match EMBEDDING_MODEL
            metric="cosine",
            spec=ServerlessSpec(
                cloud="aws",
                region="us-east-1"
            )
        )
        # Wait for index to be ready
        print("   ⏳ Waiting for index to be ready...")
        _wait_for_index_ready(pc, PINECONE_INDEX_NAME)
    else:
        print(f"✅ Using existing Pinecone index: {PINECONE_INDEX_NAME}")

    return pc.Index(PINECONE_INDEX_NAME)


def main():
    """Run the ingestion pipeline: parse, chunk, embed and upload.

    Raises:
        ValueError: If ``PINECONE_API_KEY`` is not set in the environment.
    """
    print("🚀 LexiBot Data Ingestion with Context Injection")
    print("=" * 50)

    # Validate environment
    if not PINECONE_API_KEY:
        raise ValueError("PINECONE_API_KEY not set in environment")

    # Process documents
    documents = process_all_documents()
    print(f"\n📚 Total documents to ingest: {len(documents)}")

    if not documents:
        print("❌ No documents found. Check RawData directory.")
        return

    # Initialize Pinecone embeddings
    print("\n🔗 Initializing Pinecone Embeddings (multilingual-e5-large)...")
    embeddings = PineconeEmbeddings(
        model=EMBEDDING_MODEL,
        pinecone_api_key=PINECONE_API_KEY
    )

    # Make sure the index exists (dimension 1024 for multilingual-e5-large)
    print("🌲 Initializing Pinecone...")
    initialize_pinecone()

    # Upload to Pinecone in batches
    print("\n⬆️ Uploading to Pinecone Vector Store...")
    total_batches = (len(documents) + BATCH_SIZE - 1) // BATCH_SIZE

    vectorstore = None
    for batch_num, start in enumerate(range(0, len(documents), BATCH_SIZE), start=1):
        batch = documents[start:start + BATCH_SIZE]
        print(f"   📦 Uploading batch {batch_num}/{total_batches} ({len(batch)} documents)...")

        if vectorstore is None:
            # First batch creates the vector store
            vectorstore = PineconeVectorStore.from_documents(
                documents=batch,
                embedding=embeddings,
                index_name=PINECONE_INDEX_NAME
            )
        else:
            # Subsequent batches add to the existing store
            vectorstore.add_documents(batch)

    print("\n✅ Ingestion Complete!")
    print(f"   Index: {PINECONE_INDEX_NAME}")
    print(f"   Documents: {len(documents)}")
    print("\n💡 You can now start the API with: uvicorn app:app --reload --port 7860")


if __name__ == "__main__":
    main()