Spaces:
Sleeping
Sleeping
File size: 5,248 Bytes
"""Chunker for markdown tables in policy schedules."""
from __future__ import annotations
from typing import List, Optional, Tuple
from rag_engine.chunking.token_utils import count_tokens
from rag_engine.schemas.chunk_metadata import ChunkMetadata, ClauseType
from rag_engine.utils.logger import get_logger
# Module-level logger, named after this module.
logger = get_logger(__name__)
# Token budget used when batching table rows: preamble + header block + rows
# are accumulated against this value. A batch is only flushed when it already
# holds at least one row, so a single oversized row still becomes its own chunk.
MAX_TABLE_TOKENS: int = 1024
# Substrings looked up in the lower-cased table text; any hit marks the
# resulting chunks as deductible-related.
_DEDUCTIBLE_KEYWORDS = {"deductible", "excess", "retention", "self-insured"}
# Substrings looked up in the lower-cased table text; any hit marks the
# resulting chunks as limit-related.
# NOTE(review): matching is plain substring containment, so short entries such
# as "cap" also match inside longer words (e.g. "capacity") — confirm intended.
_LIMIT_KEYWORDS = {"limit", "maximum", "sum insured", "aggregate", "sub-limit", "cap"}
class TableChunker:
    """Split the markdown table of a policy-schedule section into chunks.

    The header row and delimiter row of the table (plus any text that
    precedes the table) are repeated at the top of every chunk, so each
    chunk can be read without the others.
    """

    def chunk_table(
        self,
        text: str,
        policy_id: str,
        source_file: str,
        section_name: str = "Table",
        section_number: Optional[str] = None,
        page_number: int = 1,
        base_chunk_index: int = 0,
    ) -> List[Tuple[str, ChunkMetadata]]:
        """Chunk *text*, which is expected to contain one markdown table.

        Args:
            text: Raw section text containing the table.
            policy_id: Copied into every chunk's metadata.
            source_file: Copied into every chunk's metadata.
            section_name: Copied into every chunk's metadata.
            section_number: Copied into every chunk's metadata.
            page_number: Copied into every chunk's metadata.
            base_chunk_index: Index given to the first chunk; later chunks
                count up from it.

        Returns:
            A list of ``(chunk_text, metadata)`` pairs. When no table is
            found, or the table has no data rows, the whole of *text* is
            returned as a single chunk.
        """
        lines = text.strip().splitlines()

        located = self._find_table(lines)
        if located is None:
            # no table found, just return the whole thing as one chunk
            logger.debug("No markdown table found — single chunk fallback.")
            return self._single_chunk(
                text, policy_id, source_file, section_name,
                section_number, page_number, base_chunk_index,
            )
        header_idx, separator_idx = located

        # Everything above the header row travels with every chunk.
        preamble = "\n".join(lines[:header_idx]).strip()
        header_block = f"{lines[header_idx]}\n{lines[separator_idx]}"

        # Fixed cost paid by every chunk before any row is added. Parts are
        # counted separately, so the joined chunk's own count may differ
        # slightly from this running total.
        base_tokens = count_tokens(header_block) + (
            count_tokens(preamble) if preamble else 0
        )

        # Group data rows into batches that fit under MAX_TABLE_TOKENS.
        # Every non-blank line after the delimiter row is treated as a row.
        batches: List[List[str]] = []
        current_batch: List[str] = []
        current_tokens = base_tokens
        for row in lines[separator_idx + 1:]:
            row_stripped = row.strip()
            if not row_stripped:
                continue
            row_tokens = count_tokens(row_stripped)
            # Only flush a non-empty batch: a row that is over budget on its
            # own still has to be emitted somewhere.
            if current_batch and (current_tokens + row_tokens) > MAX_TABLE_TOKENS:
                batches.append(current_batch)
                current_batch = []
                current_tokens = base_tokens
            current_batch.append(row_stripped)
            current_tokens += row_tokens
        if current_batch:
            batches.append(current_batch)

        if not batches:
            # Header without any data rows: keep the text intact.
            return self._single_chunk(
                text, policy_id, source_file, section_name,
                section_number, page_number, base_chunk_index,
            )

        # Keyword flags are derived from the whole text, so every chunk of
        # the same table carries the same flags.
        text_lower = text.lower()
        deductible_related = any(kw in text_lower for kw in _DEDUCTIBLE_KEYWORDS)
        limit_related = any(kw in text_lower for kw in _LIMIT_KEYWORDS)

        # build chunks — header re-included in every split
        results: List[Tuple[str, ChunkMetadata]] = []
        for batch_idx, batch_rows in enumerate(batches):
            parts = [preamble] if preamble else []
            parts.append(header_block)
            parts.extend(batch_rows)
            chunk_text = "\n".join(parts)
            meta = ChunkMetadata(
                policy_id=policy_id,
                source_file=source_file,
                section_name=section_name,
                section_number=section_number,
                page_number=page_number,
                chunk_index=base_chunk_index + batch_idx,
                clause_type=ClauseType.SCHEDULE,
                table_chunk=True,
                deductible_related=deductible_related,
                limit_related=limit_related,
                token_count=count_tokens(chunk_text),
            )
            results.append((chunk_text, meta))
        return results

    @staticmethod
    def _find_table(lines: List[str]) -> Optional[Tuple[int, int]]:
        """Locate the header row and delimiter row of the first table.

        A header is a line containing ``|`` that is *immediately* followed
        by a delimiter row: a line made only of ``|``, ``-``, ``:`` and
        spaces with at least one ``-``. Requiring adjacency keeps a stray
        ``|`` in the preamble from being mistaken for the header (which
        would drop the lines between it and the real delimiter row), and
        requiring a dash keeps a blank line from counting as a delimiter.

        Returns:
            ``(header_index, separator_index)`` or ``None`` when *lines*
            holds no such pair.
        """
        separator_chars = set("|-: ")
        for idx, (head, below) in enumerate(zip(lines, lines[1:])):
            if "|" not in head:
                continue
            candidate = below.strip()
            if "-" in candidate and set(candidate) <= separator_chars:
                return idx, idx + 1
        return None

    @staticmethod
    def _single_chunk(
        text: str,
        policy_id: str,
        source_file: str,
        section_name: str,
        section_number: Optional[str],
        page_number: int,
        chunk_index: int,
    ) -> List[Tuple[str, ChunkMetadata]]:
        """Wrap *text*, unchanged, as a one-element chunk list.

        The metadata mirrors what ``chunk_table`` produces for split
        tables: schedule clause type, ``table_chunk=True`` and keyword
        flags derived from the lower-cased text.
        """
        text_lower = text.lower()
        meta = ChunkMetadata(
            policy_id=policy_id,
            source_file=source_file,
            section_name=section_name,
            section_number=section_number,
            page_number=page_number,
            chunk_index=chunk_index,
            clause_type=ClauseType.SCHEDULE,
            table_chunk=True,
            deductible_related=any(kw in text_lower for kw in _DEDUCTIBLE_KEYWORDS),
            limit_related=any(kw in text_lower for kw in _LIMIT_KEYWORDS),
            token_count=count_tokens(text),
        )
        return [(text, meta)]
|