Spaces:
Running
Running
| """ | |
| LexiBot Data Ingestion Script with Context Injection | |
| Processes legal documents and uploads to Pinecone with Act-prefixed chunks. | |
| CRITICAL: This script implements "Context Injection" to solve the section overlap issue | |
| where queries about "Section 3" would confuse sections from different Acts. | |
| Uses Pinecone's built-in embeddings to avoid Google API rate limits. | |
| """ | |
| import os | |
| import re | |
| import time | |
| from pathlib import Path | |
| from typing import List, Dict, Any | |
| from dotenv import load_dotenv | |
| from langchain.schema import Document | |
| from langchain_pinecone import PineconeEmbeddings, PineconeVectorStore | |
| from pinecone import Pinecone, ServerlessSpec | |
# Run load_dotenv() before the lookups below so that any variables it loads
# are visible to them.
load_dotenv()

# --- Configuration -----------------------------------------------------------
# API keys are read from the environment and are None when unset.
GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY")
PINECONE_API_KEY = os.environ.get("PINECONE_API_KEY")
# The index name falls back to a default when the variable is unset.
PINECONE_INDEX_NAME = os.environ.get("PINECONE_INDEX_NAME", "lexibot-legal-docs")
# Directory scanned for source documents (relative to the working directory).
RAW_DATA_DIR = "./RawData"
# Filename -> Act title that gets prepended to every chunk from that file.
# Keys are looked up verbatim by filename, so their spelling must not be
# "corrected" here without renaming the files as well.
ACT_NAME_MAP = {
    "Consumer-Protection-Act.txt":
        "Consumer Protection Act, 2019",
    "IPC-SECTIONS-FOR-HARRASMENT.txt":
        "Indian Penal Code (Harassment Sections)",
    "Motor-Vehicles-Act.txt":
        "Motor Vehicles (Amendment) Act, 2019",
    "The-Proctection-Of-Women-From-Domestic-Violence.txt":
        "Protection of Women from Domestic Violence Act, 2005",
    "The-Protection-Of-Children-From-Sexual-Offences.txt":
        "Protection of Children from Sexual Offences Act, 2012",
    "The-sexual-harassment-of-women-at-workplace.txt":
        "Sexual Harassment of Women at Workplace Act, 2013",
}
def clean_government_headers(text: str) -> str:
    """Remove standardized government document headers and noise.

    Each pattern below is deleted from ``text`` (case-insensitively, with ``.``
    allowed to span newlines), and the result is stripped of leading and
    trailing whitespace.

    Args:
        text: Raw text of one Act.

    Returns:
        The text with every matching header fragment removed.
    """
    patterns_to_remove = [
        # Ministry banner, up to the next blank line or the first "<digits>.".
        r"MINISTRY OF LAW AND JUSTICE.*?(?=\n\n|\d+\.)",
        r"\(Legislative Department\)",
        # Publication date line ending in the Saka-calendar date.
        r"New Delhi, the \d+.*?Saka\)",
        # "...published for general information:" plus the dash that follows.
        # The dash is written as escapes (em dash, en dash, hyphen) so the
        # pattern survives a mis-decoded source file, and it is optional
        # because extracted text does not always keep it.
        r"The following Act of Parliament.*?information:[\u2014\u2013-]*",
        # Bracketed date such as "[9th August, 2019.]". The body may not
        # contain brackets, so a match can never start at an unrelated
        # earlier "[" and swallow the text in between.
        r"\[[^\[\]]*?\d{4}\.\]",
    ]
    for pattern in patterns_to_remove:
        text = re.sub(pattern, "", text, flags=re.DOTALL | re.IGNORECASE)
    return text.strip()
def split_by_legal_sections(text: str) -> List[Dict[str, str]]:
    """
    Break an Act's text into sections keyed by their section number.

    A section starts at any line that begins with digits, optionally followed
    by letters, then a period and whitespace (e.g. "10. ", "215D. ", "354A. ").
    The letter suffix matters: such numbers occur in the Motor Vehicles Act and
    the IPC. Every following line belongs to that section until the next such
    line. Lines that appear before the first section heading are discarded.

    Returns a list of dicts, in document order, each with:
        "section_number": the number as written, without the period
        "content": the heading line plus its body, whitespace-stripped
    """
    heading = re.compile(r"^(\d+[A-Za-z]*)\.\s+")

    # One (section number, lines) pair per heading, in the order encountered.
    grouped = []
    for raw_line in text.split("\n"):
        found = heading.match(raw_line)
        if found is not None:
            grouped.append((found.group(1), [raw_line]))
        elif grouped:
            # Body line of the most recently opened section.
            grouped[-1][1].append(raw_line)
        # else: text ahead of the first heading is intentionally dropped.

    return [
        {"section_number": number, "content": "\n".join(lines).strip()}
        for number, lines in grouped
    ]
def create_context_injected_chunks(
    filename: str,
    sections: List[Dict[str, str]],
    max_chunk_size: int = 1500
) -> List[Document]:
    """
    Create LangChain Document objects with Context Injection.

    CRITICAL: Prepends the Act name and section number to every chunk so that
    identical section numbers from different Acts stay distinguishable.

    Format of a chunk:
        Act: Motor Vehicles Act, 2019
        Section: 3
        Content: ...driving license...

    A section whose header plus content exceeds ``max_chunk_size`` characters
    is cut into several chunks. The first keeps the plain header; later ones
    say "Section: N (continued)". Sections with fewer than 50 characters of
    content are skipped as likely noise.

    Args:
        filename: Source file name; looked up in ACT_NAME_MAP, with a
            fallback derived from the file name itself.
        sections: Dicts with "section_number" and "content" keys.
        max_chunk_size: Upper bound on ``page_content`` length, header included.

    Returns:
        One Document per chunk, with "source", "act_name", "section_number",
        "chunk_index" and "type" metadata.

    Raises:
        ValueError: If a section must be split but ``max_chunk_size`` leaves no
            room for content after the header.
    """
    act_name = ACT_NAME_MAP.get(filename, filename.replace("-", " ").replace(".txt", ""))
    documents = []

    for section in sections:
        section_num = section["section_number"]
        content = section["content"]

        # Skip very short sections (likely noise)
        if len(content) < 50:
            continue

        first_header = f"Act: {act_name}\nSection: {section_num}\nContent: "
        continued_header = f"Act: {act_name}\nSection: {section_num} (continued)\nContent: "

        needs_split = len(first_header) + len(content) > max_chunk_size
        if needs_split and max_chunk_size <= len(continued_header):
            # A zero or negative content width would make the loop below
            # consume nothing and never terminate.
            raise ValueError(
                f"max_chunk_size={max_chunk_size} leaves no room for content after "
                f"the {len(continued_header)}-character header of section "
                f"{section_num} in {filename}"
            )

        remaining = content
        chunk_idx = 0
        while remaining:
            # Only chunks after the first are continuations.
            header = first_header if chunk_idx == 0 else continued_header
            room = max_chunk_size - len(header)
            chunk_text, remaining = remaining[:room], remaining[room:]
            documents.append(
                Document(
                    page_content=header + chunk_text,
                    metadata={
                        "source": filename,
                        "act_name": act_name,
                        "section_number": section_num,
                        "chunk_index": chunk_idx,
                        "type": "legal_section",
                    },
                )
            )
            chunk_idx += 1

    return documents
def process_all_documents() -> List[Document]:
    """Turn every ``*.txt`` file directly under RAW_DATA_DIR into chunks.

    Each file is read, passed through clean_government_headers, split with
    split_by_legal_sections, and converted with create_context_injected_chunks
    using the file's name as the key for the Act title. Progress is printed
    for every file.

    Returns:
        The chunks of all files, files taken in sorted-name order. Empty when
        no ``*.txt`` file is found.
    """
    all_documents = []
    raw_data_path = Path(RAW_DATA_DIR)

    # Sorted so that chunk order is the same on every run; glob order is not
    # guaranteed.
    for txt_file in sorted(raw_data_path.glob("*.txt")):
        print(f"Processing: {txt_file.name}")

        # "utf-8-sig" decodes plain UTF-8 unchanged but drops a leading
        # byte-order mark, which would otherwise sit in front of the first
        # line and stop it from matching as a section heading.
        with open(txt_file, "r", encoding="utf-8-sig") as f:
            raw_text = f.read()

        # Clean headers
        cleaned_text = clean_government_headers(raw_text)

        # Split by legal sections
        sections = split_by_legal_sections(cleaned_text)
        print(f" Found {len(sections)} sections")

        # Create context-injected chunks
        documents = create_context_injected_chunks(txt_file.name, sections)
        print(f" Created {len(documents)} chunks")

        all_documents.extend(documents)

    return all_documents
def initialize_pinecone(dimension: int = 1024, ready_timeout: float = 120.0):
    """Initialize the Pinecone client and create the index if needed.

    Args:
        dimension: Vector dimension used when the index has to be created.
            Defaults to 1024, the dimension main() uses for the
            multilingual-e5-large embeddings.
        ready_timeout: Seconds to wait for a newly created index to report
            ready before giving up.

    Returns:
        An index handle for PINECONE_INDEX_NAME.

    Raises:
        TimeoutError: If a newly created index is not ready in time.
    """
    pc = Pinecone(api_key=PINECONE_API_KEY)

    # Check if index exists
    existing_indexes = [idx.name for idx in pc.list_indexes()]

    if PINECONE_INDEX_NAME not in existing_indexes:
        print(f"Creating Pinecone index: {PINECONE_INDEX_NAME}")
        pc.create_index(
            name=PINECONE_INDEX_NAME,
            dimension=dimension,
            metric="cosine",
            spec=ServerlessSpec(
                cloud="aws",
                region="us-east-1"
            )
        )
        # Poll until the index reports ready, so the returned handle is
        # usable straight away.
        deadline = time.monotonic() + ready_timeout
        while not pc.describe_index(PINECONE_INDEX_NAME).status["ready"]:
            if time.monotonic() >= deadline:
                raise TimeoutError(
                    f"Pinecone index {PINECONE_INDEX_NAME} not ready after "
                    f"{ready_timeout} seconds"
                )
            time.sleep(1)
    else:
        print(f"Using existing Pinecone index: {PINECONE_INDEX_NAME}")

    return pc.Index(PINECONE_INDEX_NAME)
def main():
    """Main ingestion pipeline.

    Steps: validate the API key, build chunks with process_all_documents,
    set up multilingual-e5-large embeddings, make sure the Pinecone index
    exists and is usable, then upload the chunks in batches.

    Raises:
        ValueError: If PINECONE_API_KEY is unset, or an existing index has a
            dimension other than the one the embedding model produces.
        TimeoutError: If a newly created index does not become ready.
    """
    print("LexiBot Data Ingestion with Context Injection")
    print("=" * 50)

    # Validate environment
    if not PINECONE_API_KEY:
        raise ValueError("PINECONE_API_KEY not set in environment")

    # Process documents
    documents = process_all_documents()
    print(f"\nTotal documents to ingest: {len(documents)}")

    if not documents:
        print("No documents found. Check RawData directory.")
        return

    # Embeddings are computed through Pinecone rather than the Google API.
    print("\nInitializing Pinecone Embeddings (multilingual-e5-large)...")
    embeddings = PineconeEmbeddings(
        model="multilingual-e5-large",
        pinecone_api_key=PINECONE_API_KEY
    )
    embedding_dimension = 1024  # multilingual-e5-large dimension

    print("Initializing Pinecone...")
    pc = Pinecone(api_key=PINECONE_API_KEY)

    # Check if index exists
    existing_indexes = [idx.name for idx in pc.list_indexes()]

    if PINECONE_INDEX_NAME not in existing_indexes:
        print(f"Creating Pinecone index: {PINECONE_INDEX_NAME}")
        pc.create_index(
            name=PINECONE_INDEX_NAME,
            dimension=embedding_dimension,
            metric="cosine",
            spec=ServerlessSpec(
                cloud="aws",
                region="us-east-1"
            )
        )
        # Poll for readiness instead of sleeping a fixed time: creation may
        # take longer than any fixed guess, or far less.
        print(" Waiting for index to be ready...")
        ready_timeout = 120
        deadline = time.monotonic() + ready_timeout
        while not pc.describe_index(PINECONE_INDEX_NAME).status["ready"]:
            if time.monotonic() >= deadline:
                raise TimeoutError(
                    f"Pinecone index {PINECONE_INDEX_NAME} not ready after "
                    f"{ready_timeout} seconds"
                )
            time.sleep(1)
    else:
        print(f"Using existing Pinecone index: {PINECONE_INDEX_NAME}")
        # Fail early with a clear message rather than at the first upsert.
        existing_dimension = pc.describe_index(PINECONE_INDEX_NAME).dimension
        if existing_dimension != embedding_dimension:
            raise ValueError(
                f"Pinecone index {PINECONE_INDEX_NAME} has dimension "
                f"{existing_dimension}, but multilingual-e5-large produces "
                f"{embedding_dimension}-dimensional vectors"
            )

    # Upload to Pinecone in batches
    print("\nUploading to Pinecone Vector Store...")

    batch_size = 100
    total_batches = (len(documents) + batch_size - 1) // batch_size

    vectorstore = None
    for batch_num, start in enumerate(range(0, len(documents), batch_size), start=1):
        batch = documents[start:start + batch_size]
        print(f" Uploading batch {batch_num}/{total_batches} ({len(batch)} documents)...")

        if vectorstore is None:
            # First batch creates the vector store
            vectorstore = PineconeVectorStore.from_documents(
                documents=batch,
                embedding=embeddings,
                index_name=PINECONE_INDEX_NAME
            )
        else:
            # Subsequent batches add to existing
            vectorstore.add_documents(batch)

    print("\nIngestion Complete!")
    print(f" Index: {PINECONE_INDEX_NAME}")
    print(f" Documents: {len(documents)}")
    print("\nYou can now start the API with: uvicorn app:app --reload --port 7860")


if __name__ == "__main__":
    main()