Kalpokoch committed on
Commit b91ce4b · verified · 1 Parent(s): 3c0b78f

Update create_granular_chunks.py

Files changed (1)
  1. create_granular_chunks.py +76 -62
create_granular_chunks.py CHANGED
@@ -5,25 +5,51 @@ import re
 from typing import List, Dict, Any
 import nltk
 
+# --- Tokenizer Import ---
+import tiktoken  # pip install tiktoken
+
 # Download punkt tokenizer if not already done (Ensure this runs once in your environment setup)
 nltk.download('punkt')
-nltk.download('punkt_tab')  # Also download punkt_tab to avoid LookupError
 
 # --- Configuration ---
 INPUT_FILE = "combined_context.jsonl"
-OUTPUT_FILE = "granular_chunks_final.jsonl"  # Keep filename consistent
-
+OUTPUT_FILE = "granular_chunks_final.jsonl"
+
+# Token-based chunking parameters (typical LLM embedding context ~512 tokens)
+MAX_TOKENS = 400
+OVERLAP_TOKENS = 50
+TOKENIZER_MODEL = "cl100k_base"  # use "cl100k_base" for OpenAI, adjust as needed
+
+# --- Keyword Enhancement ---
+FINANCIAL_KEYWORDS = [
+    "₹", "INR", "crore", "lakh", "limit", "delegation", "expenditure", "budget", "revenue", "capital",
+    "surplus", "investment", "write-off", "dividend", "pay", "salary", "contract value"
+]
+AUTHORITY_KEYWORDS = [
+    "CMD", "Chairman", "Board", "Director", "ED", "Executive Director", "CGM", "GM", "DGM", "Sr. M",
+    "Manager", "HOD", "Head of Finance", "Finance Head", "Project Head"
+]
+
+def get_encoding():
+    return tiktoken.get_encoding(TOKENIZER_MODEL)
 
 # --- Global State ---
 chunk_counter = 0
 
-
 def get_unique_id() -> str:
-    """Returns a unique, incrementing ID for each chunk."""
     global chunk_counter
     chunk_counter += 1
     return f"chunk-{chunk_counter}"
 
+def enhance_chunk_with_keywords(text: str, metadata: dict) -> dict:
+    """Add keywords (financial and authority) to metadata if present in text."""
+    present_financial = [kw for kw in FINANCIAL_KEYWORDS if kw.lower() in text.lower()]
+    present_authority = [kw for kw in AUTHORITY_KEYWORDS if kw.lower() in text.lower()]
+    if present_financial:
+        metadata['financial_keywords'] = present_financial
+    if present_authority:
+        metadata['authority_keywords'] = present_authority
+    return metadata
 
 def create_chunk(context: Dict, text: str) -> Dict:
     """Creates a standardized chunk dictionary with rich metadata."""
@@ -33,31 +59,27 @@ def create_chunk(context: Dict, text: str) -> Dict:
         "title": context.get("title"),
         "source_description": context.get("description"),
     }
-    # Add other primitive metadata keys
     for key, value in context.items():
         if key not in metadata and isinstance(value, (str, int, float, bool)):
             metadata[key] = value
 
+    # --- Keyword Enhancement ---
+    metadata = enhance_chunk_with_keywords(text, metadata)
+
     return {
         "id": get_unique_id(),
        "text": text.strip(),
        "metadata": {k: v for k, v in metadata.items() if v is not None}
    }
 
-
 def format_delegation_text(delegation: Any) -> str:
-    """
-    Formats a delegation dictionary or string into a readable string.
-    Explicitly includes "NIL" or "---" to capture no power cases.
-    """
     if not isinstance(delegation, dict):
         return str(delegation)
-    parts = [f"the limit for {auth} is {limit if limit and str(limit) != '---' else 'NIL'}" for auth, limit in delegation.items()]
+    parts = [f"the limit for {auth} is {limit if limit and str(limit) != '---' else 'NIL'}"
+             for auth, limit in delegation.items()]
     return ", ".join(parts) if parts else "No specific delegation provided."
 
-
 def format_remarks(remarks: Any) -> str:
-    """Safely formats the 'remarks' field, handling various data types."""
     if isinstance(remarks, list):
         remark_parts = []
         for item in remarks:
@@ -69,21 +91,13 @@ def format_remarks(remarks: Any) -> str:
         return " ".join(remark_parts)
     return str(remarks)
 
-
 def build_descriptive_text(context: Dict) -> str:
-    """
-    Builds a clear, descriptive, natural language text by combining fields.
-    Focused for best relevance and contextual richness.
-    """
     text_parts = []
-
     if context.get("title"):
         text_parts.append(f"Regarding the policy '{context['title']}'")
-
     specific_desc = context.get('description') or context.get('method')
     if specific_desc and specific_desc != context.get('title'):
         text_parts.append(f"specifically for '{specific_desc}'")
-
     if "delegation" in context:
         delegation_text = format_delegation_text(context["delegation"])
         text_parts.append(f", financial delegations are: {delegation_text}.")
@@ -96,68 +110,72 @@ def build_descriptive_text(context: Dict) -> str:
                        else f"the {role} are: {', '.join(members)}")
             composition_parts.append(member_text)
         text_parts.append(f", the composition is: {'; '.join(composition_parts)}.")
-
     if "remarks" in context and context["remarks"]:
         remarks_text = format_remarks(context["remarks"])
         text_parts.append(f" Important remarks include: {remarks_text}")
-
-    # Join all parts into a flowing sentence
     return " ".join(text_parts).strip()
 
-
-def split_text_into_chunks(text: str, max_char_length: int = 1500, overlap: int = 200) -> List[str]:
-    """
-    Splits a long text into smaller chunks with controlled overlap.
-    Uses sentence tokenization for natural splits.
-    """
-    text = text.strip()
-    if len(text) <= max_char_length:
-        return [text]
-
-    # Explicitly specify language to avoid punkt_tab error
-    sentences = nltk.tokenize.sent_tokenize(text, language='english')
+def count_tokens(text: str) -> int:
+    encoding = get_encoding()
+    return len(encoding.encode(text))
+
+def get_token_overlap(text: str, overlap_tokens: int) -> str:
+    """Return the last `overlap_tokens` worth of text from the input string."""
+    encoding = get_encoding()
+    tokens = encoding.encode(text)
+    if len(tokens) <= overlap_tokens:
+        return text
+    # Decode only the last overlap_tokens tokens
+    overlapped = encoding.decode(tokens[-overlap_tokens:])
+    # Trim a possible mid-word split by keeping only the text after the last
+    # sentence boundary (optional: the raw overlapped text could be returned as-is)
+    last_period = overlapped.rfind('.')
+    if last_period != -1 and last_period < len(overlapped) - 2:
+        return overlapped[last_period+1:].strip()
+    return overlapped.strip()
+
+def split_text_by_tokens(text: str, max_tokens: int = MAX_TOKENS, overlap_tokens: int = OVERLAP_TOKENS) -> List[str]:
+    """Split text into chunks based on token count, with specified overlap."""
+    encoding = get_encoding()
+    sents = nltk.tokenize.sent_tokenize(text, language='english')
     chunks = []
     current_chunk = ""
-
-    for sentence in sentences:
-        # +1 for space/newline likely added between sentences
-        if len(current_chunk) + len(sentence) + 1 <= max_char_length:
+    current_tokens = 0
+    for sentence in sents:
+        sentence_tokens = len(encoding.encode(sentence))
+        if current_tokens + sentence_tokens <= max_tokens:
             current_chunk += (" " + sentence) if current_chunk else sentence
+            current_tokens += sentence_tokens
         else:
             chunks.append(current_chunk.strip())
-            # Start next chunk with overlap from end of previous chunk (by characters)
-            if overlap < len(current_chunk):
-                current_chunk = current_chunk[-overlap:] + " " + sentence
+            # Overlap logic: seed the next chunk with the tail of the previous one
+            if overlap_tokens < current_tokens:
+                overlap_text = get_token_overlap(current_chunk, overlap_tokens)
+                current_chunk = overlap_text + " " + sentence
+                current_tokens = len(encoding.encode(current_chunk))
             else:
                 current_chunk = sentence
-
+                current_tokens = sentence_tokens
     if current_chunk:
         chunks.append(current_chunk.strip())
     return chunks
 
-
 def process_entry(data: Dict, parent_context: Dict = None) -> List[Dict]:
-    """
-    Processes a JSON policy entry and returns granular, context-rich chunks.
-    Applies recursive traversal and implements chunk size limiting.
-    """
     context = {**(parent_context or {}), **data}
     chunks = []
 
-    # Handler 1: Simple Item Lists (ex: rules, exclusions)
+    # Handler 1: Simple Item Lists
     list_key = next((key for key in ["items", "exclusions"] if key in data and isinstance(data.get(key), list)), None)
     if list_key:
         base_title = context.get('title', 'a policy')
         for item in data[list_key]:
             if isinstance(item, str):
-                # Build chunk text with clear descriptive prefix for relevance
                 text = f"A rule regarding '{base_title}' is: {item}."
-                # Split if too long
-                for sub_chunk in split_text_into_chunks(text):
+                for sub_chunk in split_text_by_tokens(text):
                     chunks.append(create_chunk(context, sub_chunk))
         return chunks
 
-    # Handler 2: Recursive traversal for nested dictionaries/lists
+    # Handler 2: Recursive traversal for nested dicts/lists
     has_recursed = False
     for key, value in data.items():
         if isinstance(value, list) and value and all(isinstance(item, dict) for item in value):
@@ -168,16 +186,13 @@ def process_entry(data: Dict, parent_context: Dict = None) -> List[Dict]:
     # Handler 3: Leaf nodes with delegation, composition or description
     if not has_recursed and ("delegation" in data or "composition" in data or "description" in data):
         text = build_descriptive_text(context)
-        # Split long descriptive text intelligently
-        for chunk_text in split_text_into_chunks(text):
+        for chunk_text in split_text_by_tokens(text):
            chunks.append(create_chunk(context, chunk_text))
 
    return chunks
 
-
 def main():
-    """Main orchestration to read input, process, and write chunks."""
-    print(f"Starting to process '{INPUT_FILE}' for improved granular chunking...")
+    print(f"Starting to process '{INPUT_FILE}' with token-based chunking and keyword enhancement...")
     all_chunks = []
 
     try:
@@ -208,10 +223,9 @@ def main():
     # Write output in JSONL format for later vector DB ingestion
     with open(OUTPUT_FILE, 'w', encoding='utf-8') as outf:
         for chunk in unique_chunks:
-            outf.write(json.dumps(chunk, ensure_ascii=False) + "\n")
+            outf.write(json.dumps(chunk, ensure_ascii=False) + "\n")
 
     print(f"Successfully wrote improved granular chunks to '{OUTPUT_FILE}'.")
 
-
 if __name__ == "__main__":
     main()
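
The new token-based splitter and keyword tagging can be exercised in isolation. A minimal smoke-test sketch, assuming the script is importable as create_granular_chunks and that tiktoken and the NLTK punkt data are installed; the sample text is hypothetical:

# Smoke test for the helpers added in this commit (hypothetical sample text).
from create_granular_chunks import (
    split_text_by_tokens, count_tokens, enhance_chunk_with_keywords,
)

sample = ("The CMD may approve capital expenditure up to 50 crore. "
          "Proposals beyond that limit require Board approval. ") * 40

chunks = split_text_by_tokens(sample)  # defaults: MAX_TOKENS=400, OVERLAP_TOKENS=50
# Overlap is re-encoded into the next chunk, so sizes hover near MAX_TOKENS.
print(len(chunks), max(count_tokens(c) for c in chunks))

# Keyword tagging should surface both financial and authority terms here.
print(enhance_chunk_with_keywords(chunks[0], {}))

Two edge cases worth noting from the code as committed: a single sentence longer than MAX_TOKENS still passes through as one oversized chunk (splitting happens only at sentence boundaries), and if the very first sentence exceeds the budget an empty leading chunk can be appended. Also, this commit drops the punkt_tab download while still calling sent_tokenize; recent NLTK releases require punkt_tab for that call, so the removed line may need to come back depending on the pinned NLTK version.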