# create_granular_chunks.py (place this in root directory)
import json
import re
import hashlib
from typing import List, Dict, Any, Set

import tiktoken


def count_tokens(text: str, model: str = "gpt-3.5-turbo") -> int:
    """Count tokens using tiktoken."""
    try:
        encoding = tiktoken.encoding_for_model(model)
        return len(encoding.encode(text))
    except Exception:
        # Fallback to a simple word-based estimate (~1.3 tokens per word)
        return int(len(text.split()) * 1.3)


def extract_financial_keywords(text: str) -> List[str]:
    """Extract financial keywords from text."""
    financial_patterns = [
        r'₹[\d,]+(?:\.\d{1,2})?(?:\s*(?:crore|lakh|thousand))?',
        r'\b(?:budget|cost|expenditure|estimate|payment|procurement)\b',
        r'\b(?:tender|contract|purchase|award)\b',
        r'\b(?:crore|lakh|thousand)\b'
    ]
    keywords = set()
    for pattern in financial_patterns:
        matches = re.findall(pattern, text, re.IGNORECASE)
        keywords.update(matches)
    return list(keywords)[:10]  # Limit to 10 keywords


def extract_authority_keywords(text: str) -> List[str]:
    """Extract authority/designation keywords from text."""
    authority_patterns = [
        r'\b(?:D\([TPF]\)|ED|CGM|GM|DGM|Sr\.?\s*M(?:anager)?)\b',
        r'\b(?:Director|Manager|Chief|Head)\b',
        r'\b(?:CMD|BOD|HOP|HOD|HOF)\b',
        r'\b(?:approval|sanction|delegation|authority|power)\b'
    ]
    keywords = set()
    for pattern in authority_patterns:
        matches = re.findall(pattern, text, re.IGNORECASE)
        keywords.update(matches)
    return list(keywords)[:10]  # Limit to 10 keywords


def create_chunk_text_from_item(item: Dict) -> str:
    """Create comprehensive chunk text from a single item."""
    parts = []

    # Add section and title context
    if item.get('section'):
        parts.append(
            f"Regarding the policy '{item.get('title', 'Unknown')}' "
            f"under section '{item['section']}':"
        )

    # Add main description
    if item.get('description'):
        parts.append(item['description'])

    # Add items if present
    if item.get('items'):
        if len(item['items']) == 1:
            parts.append(f"This covers: {item['items'][0]}")
        else:
            parts.append("This covers the following:")
            for i, sub_item in enumerate(item['items'], 1):
                parts.append(f"{i}. {sub_item}")

    # Add delegation information
    if item.get('delegation'):
        parts.append("Authority delegation:")
        for role, limit in item['delegation'].items():
            if limit and limit != "NIL":
                parts.append(f"- {role}: {limit}")

    # Add subclauses
    if item.get('subclauses'):
        parts.append("This includes:")
        for subclause in item['subclauses']:
            if subclause.get('description'):
                parts.append(f"• {subclause['description']}")
            if subclause.get('delegation'):
                for role, limit in subclause['delegation'].items():
                    if limit and limit != "NIL":
                        parts.append(f"  - {role}: {limit}")

    # Add methods (for complex delegation structures)
    if item.get('methods'):
        for method in item['methods']:
            if method.get('delegation'):
                parts.append(f"For {method.get('method', 'this method')}:")
                for role, limit in method['delegation'].items():
                    if limit and limit != "NIL":
                        parts.append(f"- {role}: {limit}")

    # Add remarks
    if item.get('remarks'):
        parts.append("Important notes:")
        if isinstance(item['remarks'], list):
            for remark in item['remarks']:
                if isinstance(remark, str):
                    parts.append(f"• {remark}")
        elif isinstance(item['remarks'], str):
            parts.append(f"• {item['remarks']}")

    return " ".join(parts)


def split_into_token_chunks(text: str, max_tokens: int = 400, overlap_tokens: int = 50) -> List[str]:
    """Split text into chunks based on token count."""
    sentences = re.split(r'[.!?]\s+', text)
    chunks = []
    current_chunk = ""
    current_tokens = 0

    for sentence in sentences:
        sentence = sentence.strip()
        if not sentence:
            continue
        sentence_tokens = count_tokens(sentence)

        # If adding this sentence would exceed max_tokens, finalize the current chunk
        if current_tokens + sentence_tokens > max_tokens and current_chunk:
            chunks.append(current_chunk.strip())
            # Start the new chunk with trailing overlap from the previous one
            if overlap_tokens > 0 and chunks:
                overlap_text = current_chunk[-overlap_tokens * 5:]  # Rough estimate: ~5 chars per token
                current_chunk = overlap_text + " " + sentence
            else:
                current_chunk = sentence
            current_tokens = count_tokens(current_chunk)
        else:
            current_chunk += (" " if current_chunk else "") + sentence
            current_tokens += sentence_tokens

    # Add the last chunk if it has content
    if current_chunk.strip():
        chunks.append(current_chunk.strip())

    return chunks


def create_chunk_hash(text: str) -> str:
    """Create a hash of the chunk text for deduplication."""
    return hashlib.md5(text.encode('utf-8')).hexdigest()[:12]


def process_jsonl_file(file_path: str, output_path: str):
    """Process the JSONL file and create granular chunks."""
    print(f"Starting to process '{file_path}' with token-based chunking and keyword enhancement...")

    all_chunks = []
    chunk_hashes = set()  # For deduplication
    chunk_id_counter = 1

    try:
        with open(file_path, 'r', encoding='utf-8') as file:
            for line_num, line in enumerate(file, 1):
                try:
                    item = json.loads(line.strip())

                    # Create comprehensive text from the item
                    chunk_text = create_chunk_text_from_item(item)
                    if not chunk_text.strip():
                        continue

                    # Split into token-based chunks
                    text_chunks = split_into_token_chunks(chunk_text)

                    for i, chunk in enumerate(text_chunks):
                        if not chunk.strip():
                            continue

                        # Skip duplicates
                        chunk_hash = create_chunk_hash(chunk)
                        if chunk_hash in chunk_hashes:
                            continue
                        chunk_hashes.add(chunk_hash)

                        # Extract keywords
                        financial_keywords = extract_financial_keywords(chunk)
                        authority_keywords = extract_authority_keywords(chunk)

                        # Create chunk object
                        chunk_obj = {
                            'id': f'chunk-{chunk_id_counter}',
                            'text': chunk,
                            'metadata': {
                                'section': item.get('section', ''),
                                'clause': item.get('clause', ''),
                                'title': item.get('title', ''),
                                'chunk_index': i,
                                'source_line': line_num,
                                'financial_keywords': financial_keywords,
                                'authority_keywords': authority_keywords,
                                'token_count': count_tokens(chunk)
                            }
                        }
                        all_chunks.append(chunk_obj)
                        chunk_id_counter += 1

                except json.JSONDecodeError as e:
                    print(f"Warning: Invalid JSON on line {line_num}: {e}")
                    continue
    except FileNotFoundError:
        print(f"Error: File '{file_path}' not found.")
        return
    except Exception as e:
        print(f"Error reading file: {e}")
        return

    # Duplicates are skipped as chunks are generated, so all_chunks is already deduplicated
    print(f"Generated {len(all_chunks)} unique chunks after deduplication.")

    # Write chunks to the output file
    try:
        with open(output_path, 'w', encoding='utf-8') as output_file:
            for chunk in all_chunks:
                json.dump(chunk, output_file, ensure_ascii=False)
                output_file.write('\n')
        print(f"Successfully wrote improved granular chunks to '{output_path}'.")
        print("Sample chunk structure:")
        if all_chunks:
            sample = all_chunks[0]
            print(f"  ID: {sample['id']}")
            print(f"  Text length: {len(sample['text'])} chars")
            print(f"  Section: {sample['metadata']['section']}")
            print(f"  Financial keywords: {sample['metadata']['financial_keywords'][:3]}...")
            print(f"  Token count: {sample['metadata']['token_count']}")
    except Exception as e:
        print(f"Error writing output file: {e}")


if __name__ == "__main__":
    input_file = "combined_context.jsonl"
    output_file = "granular_chunks_final.jsonl"
    process_jsonl_file(input_file, output_file)
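
# ---------------------------------------------------------------------------
# A minimal sketch of the record shape this script assumes for each line of
# combined_context.jsonl. The field names (section, clause, title,
# description, items, delegation, subclauses, methods, remarks) are the ones
# read by create_chunk_text_from_item(); the values below are hypothetical
# and only illustrate how a delegation-of-power clause might be encoded.
#
# {"section": "Procurement",
#  "clause": "2.1",
#  "title": "Award of works contracts",
#  "description": "Powers to approve award of works contracts on open tender basis.",
#  "items": ["Civil works", "Electrical works"],
#  "delegation": {"CMD": "Full powers", "ED": "₹50 crore", "GM": "NIL"},
#  "remarks": ["Subject to budget provision."]}
#
# Each such record is flattened into prose, split into ~400-token chunks with
# a small overlap, and written to granular_chunks_final.jsonl along with
# keyword and token-count metadata.
# ---------------------------------------------------------------------------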