policylens-rag-api / rag_engine /chunking /table_chunker.py
DEVJHAWAR11
sync: deploy to HuggingFace Space
5b7955a
"""Chunker for markdown tables in policy schedules."""
from __future__ import annotations
from typing import List, Optional, Tuple
from rag_engine.chunking.token_utils import count_tokens
from rag_engine.schemas.chunk_metadata import ChunkMetadata, ClauseType
from rag_engine.utils.logger import get_logger
# Module-level logger, named after this module.
logger = get_logger(__name__)
# Token budget for one table chunk. TableChunker starts a new batch of rows
# when adding the next row would push the running total (preamble + header
# rows + data rows) above this value; a single oversized row is still emitted.
MAX_TABLE_TOKENS: int = 1024
# Lowercase substrings; if any occurs in the lower-cased table text, the
# resulting chunk metadata is flagged deductible_related.
_DEDUCTIBLE_KEYWORDS = {"deductible", "excess", "retention", "self-insured"}
# Lowercase substrings; if any occurs in the lower-cased table text, the
# resulting chunk metadata is flagged limit_related.
_LIMIT_KEYWORDS = {"limit", "maximum", "sum insured", "aggregate", "sub-limit", "cap"}
class TableChunker:
    """Split a markdown table into chunks that fit a token budget.

    Each chunk repeats the text preceding the table (the preamble) and the
    header + separator rows, followed by a batch of data rows, so every
    chunk can be read on its own.
    """

    def chunk_table(
        self,
        text: str,
        policy_id: str,
        source_file: str,
        section_name: str = "Table",
        section_number: Optional[str] = None,
        page_number: int = 1,
        base_chunk_index: int = 0,
    ) -> "List[Tuple[str, ChunkMetadata]]":
        """Chunk *text*, which is expected to contain one markdown table.

        Args:
            text: Raw section text; may contain lines before the table.
            policy_id: Copied into every chunk's metadata.
            source_file: Copied into every chunk's metadata.
            section_name: Copied into every chunk's metadata.
            section_number: Copied into every chunk's metadata.
            page_number: Copied into every chunk's metadata.
            base_chunk_index: ``chunk_index`` of the first chunk produced;
                later chunks count up from it.

        Returns:
            A list of ``(chunk_text, metadata)`` pairs. When no header +
            separator pair is found, or the table has no non-blank rows
            after the separator, the whole *text* is returned as a single
            chunk.

        Notes:
            Every non-blank line after the separator row is treated as a
            data row. The ``deductible_related`` / ``limit_related`` flags
            are computed once from the full *text* and shared by all chunks.
        """
        lines = text.strip().splitlines()

        located = self._locate_table(lines)
        if located is None:
            logger.debug("No markdown table found — single chunk fallback.")
            return self._single_chunk(
                text, policy_id, source_file, section_name,
                section_number, page_number, base_chunk_index,
            )
        header_idx, separator_idx = located

        preamble = "\n".join(lines[:header_idx]).strip()
        header_block = f"{lines[header_idx]}\n{lines[separator_idx]}"

        # Tokens every chunk pays before any data row is added; computed once
        # because it does not change between batches.
        base_tokens = count_tokens(header_block) + (
            count_tokens(preamble) if preamble else 0
        )

        # Group data rows into batches that fit under MAX_TABLE_TOKENS.
        batches: List[List[str]] = []
        current_batch: List[str] = []
        current_tokens = base_tokens
        for row in lines[separator_idx + 1:]:
            row_stripped = row.strip()
            if not row_stripped:
                continue
            row_tokens = count_tokens(row_stripped)
            # Only flush a non-empty batch, so a single oversized row still
            # ends up in a chunk of its own instead of being dropped.
            if current_batch and (current_tokens + row_tokens) > MAX_TABLE_TOKENS:
                batches.append(current_batch)
                current_batch = []
                current_tokens = base_tokens
            current_batch.append(row_stripped)
            current_tokens += row_tokens
        if current_batch:
            batches.append(current_batch)

        if not batches:
            return self._single_chunk(
                text, policy_id, source_file, section_name,
                section_number, page_number, base_chunk_index,
            )

        deductible_related, limit_related = self._keyword_flags(text)

        # Build chunks — preamble and header are re-included in every split.
        results = []
        for batch_idx, batch_rows in enumerate(batches):
            parts = [preamble] if preamble else []
            parts.append(header_block)
            parts.extend(batch_rows)
            chunk_text = "\n".join(parts)
            meta = ChunkMetadata(
                policy_id=policy_id,
                source_file=source_file,
                section_name=section_name,
                section_number=section_number,
                page_number=page_number,
                chunk_index=base_chunk_index + batch_idx,
                clause_type=ClauseType.SCHEDULE,
                table_chunk=True,
                deductible_related=deductible_related,
                limit_related=limit_related,
                token_count=count_tokens(chunk_text),
            )
            results.append((chunk_text, meta))
        return results

    @staticmethod
    def _is_separator_row(line: str) -> bool:
        """Return True if *line* looks like a markdown table delimiter row.

        The row may contain only ``-``, ``:``, ``|`` and spaces, and must
        contain at least one ``-`` so that blank lines do not qualify.
        """
        cells = line.strip().replace("|", "").strip()
        return "-" in cells and set(cells) <= {"-", " ", ":"}

    @staticmethod
    def _locate_table(lines: List[str]) -> Optional[Tuple[int, int]]:
        """Find the first header/separator pair in *lines*.

        The header is a line containing ``|`` that is immediately followed
        by a separator row. Requiring adjacency keeps pipe characters in the
        preamble from being mistaken for the header, which would otherwise
        drop the lines between that false header and the separator.

        Returns:
            ``(header_index, separator_index)``, or ``None`` if no such pair
            exists.
        """
        for idx, (candidate, following) in enumerate(zip(lines, lines[1:])):
            if "|" in candidate and TableChunker._is_separator_row(following):
                return idx, idx + 1
        return None

    @staticmethod
    def _keyword_flags(text: str) -> Tuple[bool, bool]:
        """Return ``(deductible_related, limit_related)`` for *text*.

        A flag is set when any keyword of the corresponding module-level set
        occurs as a substring of the lower-cased text.
        """
        text_lower = text.lower()
        return (
            any(kw in text_lower for kw in _DEDUCTIBLE_KEYWORDS),
            any(kw in text_lower for kw in _LIMIT_KEYWORDS),
        )

    @staticmethod
    def _single_chunk(
        text: str,
        policy_id: str,
        source_file: str,
        section_name: str,
        section_number: Optional[str],
        page_number: int,
        chunk_index: int,
    ) -> "List[Tuple[str, ChunkMetadata]]":
        """Wrap the whole *text* as one chunk with schedule/table metadata.

        Used as the fallback when the text cannot be split as a table. The
        text is returned unmodified; keyword flags and the token count are
        computed from it.
        """
        deductible_related, limit_related = TableChunker._keyword_flags(text)
        meta = ChunkMetadata(
            policy_id=policy_id,
            source_file=source_file,
            section_name=section_name,
            section_number=section_number,
            page_number=page_number,
            chunk_index=chunk_index,
            clause_type=ClauseType.SCHEDULE,
            table_chunk=True,
            deductible_related=deductible_related,
            limit_related=limit_related,
            token_count=count_tokens(text),
        )
        return [(text, meta)]