File size: 5,248 Bytes
5b7955a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
"""Chunker for markdown tables in policy schedules."""

from __future__ import annotations

from typing import List, Optional, Tuple

from rag_engine.chunking.token_utils import count_tokens
from rag_engine.schemas.chunk_metadata import ChunkMetadata, ClauseType
from rag_engine.utils.logger import get_logger

# Module-level logger obtained from the project's logging helper.
logger = get_logger(__name__)

# Token budget for one table chunk (preamble + header rows + data rows), as
# measured by count_tokens. TableChunker only checks it once a batch already
# holds a row, so a single oversized row can still exceed it.
MAX_TABLE_TOKENS: int = 1024

# Lowercase keywords matched as plain substrings against the lowercased table
# text to set the deductible_related / limit_related metadata flags.
# NOTE(review): substring matching means short keywords such as "cap" also
# match inside longer words (e.g. "capacity") — confirm this is intended.
_DEDUCTIBLE_KEYWORDS = {"deductible", "excess", "retention", "self-insured"}
_LIMIT_KEYWORDS = {"limit", "maximum", "sum insured", "aggregate", "sub-limit", "cap"}


class TableChunker:
    """Split a markdown table into token-bounded chunks.

    Each chunk repeats any text found before the table (the preamble) and
    the table's header + separator rows, followed by a batch of data rows.
    """

    def chunk_table(
        self,
        text: str,
        policy_id: str,
        source_file: str,
        section_name: str = "Table",
        section_number: Optional[str] = None,
        page_number: int = 1,
        base_chunk_index: int = 0,
    ) -> List[Tuple[str, ChunkMetadata]]:
        """Chunk *text*, which is expected to contain one markdown table.

        The table is located as the first line containing ``|`` that is
        immediately followed by a separator row (for example ``|---|:--|``).
        Every non-blank line after the separator is treated as a data row.
        Rows are grouped greedily so that preamble + header + rows stays
        within ``MAX_TABLE_TOKENS``; a single row that is too large on its
        own still gets a chunk of its own rather than being dropped.

        Args:
            text: Raw section text holding the table.
            policy_id: Copied into each chunk's metadata.
            source_file: Copied into each chunk's metadata.
            section_name: Copied into each chunk's metadata.
            section_number: Copied into each chunk's metadata.
            page_number: Copied into each chunk's metadata.
            base_chunk_index: ``chunk_index`` of the first chunk produced;
                later chunks count up from it.

        Returns:
            A list of ``(chunk_text, metadata)`` pairs. When no table is
            found, or the table has no data rows, the list holds a single
            pair whose text is the unmodified *text*.
        """
        lines = text.strip().splitlines()

        located = self._locate_table(lines)
        if located is None:
            logger.debug("No markdown table found — single chunk fallback.")
            return self._single_chunk(
                text, policy_id, source_file, section_name,
                section_number, page_number, base_chunk_index,
            )
        header_idx, separator_idx = located

        preamble = "\n".join(lines[:header_idx]).strip()
        header_block = f"{lines[header_idx]}\n{lines[separator_idx]}"

        # The preamble and header are repeated in every chunk, so their cost
        # is the starting budget of each batch; compute it only once.
        base_tokens = count_tokens(header_block) + (
            count_tokens(preamble) if preamble else 0
        )

        # Greedily group data rows into batches that fit MAX_TABLE_TOKENS.
        batches: List[List[str]] = []
        current_batch: List[str] = []
        current_tokens = base_tokens
        for row in lines[separator_idx + 1:]:
            row_stripped = row.strip()
            if not row_stripped:
                continue
            row_tokens = count_tokens(row_stripped)
            # Only close a batch that already has rows: an oversized row must
            # still land somewhere, so it is allowed to exceed the budget.
            if current_batch and (current_tokens + row_tokens) > MAX_TABLE_TOKENS:
                batches.append(current_batch)
                current_batch = []
                current_tokens = base_tokens
            current_batch.append(row_stripped)
            current_tokens += row_tokens
        if current_batch:
            batches.append(current_batch)

        # Header-only table: nothing to split, keep the text as one chunk.
        if not batches:
            return self._single_chunk(
                text, policy_id, source_file, section_name,
                section_number, page_number, base_chunk_index,
            )

        # Flags are derived from the whole input text, so every split of the
        # same table carries identical deductible/limit flags.
        deductible_related, limit_related = self._keyword_flags(text)

        prefix = [preamble, header_block] if preamble else [header_block]
        results: List[Tuple[str, ChunkMetadata]] = []
        for batch_idx, batch_rows in enumerate(batches):
            chunk_text = "\n".join(prefix + batch_rows)
            meta = ChunkMetadata(
                policy_id=policy_id,
                source_file=source_file,
                section_name=section_name,
                section_number=section_number,
                page_number=page_number,
                chunk_index=base_chunk_index + batch_idx,
                clause_type=ClauseType.SCHEDULE,
                table_chunk=True,
                deductible_related=deductible_related,
                limit_related=limit_related,
                token_count=count_tokens(chunk_text),
            )
            results.append((chunk_text, meta))

        return results

    @staticmethod
    def _is_separator_row(line: str) -> bool:
        """Return True if *line* is a markdown table separator row.

        A separator row consists solely of ``-``, ``:``, ``|`` and spaces and
        contains at least one ``-``. Requiring the dash keeps blank lines
        (and lines holding only pipes or colons) from being mistaken for a
        separator.
        """
        stripped = line.strip()
        return "-" in stripped and set(stripped) <= {"-", ":", "|", " "}

    @staticmethod
    def _locate_table(lines: List[str]) -> Optional[Tuple[int, int]]:
        """Return ``(header_idx, separator_idx)`` of the first table, or None.

        The header is the first line containing ``|`` whose very next line is
        a separator row. Insisting on adjacency means a stray ``|`` in the
        text before the table cannot be taken for the header, and no lines
        can be lost between the header and the separator.
        """
        for idx, (line, following) in enumerate(zip(lines, lines[1:])):
            if "|" in line and TableChunker._is_separator_row(following):
                return idx, idx + 1
        return None

    @staticmethod
    def _keyword_flags(text: str) -> Tuple[bool, bool]:
        """Return ``(deductible_related, limit_related)`` for *text*.

        Each flag is True when any keyword from the corresponding module-level
        keyword set occurs as a substring of the lowercased text.
        """
        text_lower = text.lower()
        return (
            any(kw in text_lower for kw in _DEDUCTIBLE_KEYWORDS),
            any(kw in text_lower for kw in _LIMIT_KEYWORDS),
        )

    @staticmethod
    def _single_chunk(
        text: str,
        policy_id: str,
        source_file: str,
        section_name: str,
        section_number: Optional[str],
        page_number: int,
        chunk_index: int,
    ) -> List[Tuple[str, ChunkMetadata]]:
        """Wrap the whole *text* as one schedule chunk.

        Used as the fallback when no table can be split. The text is returned
        unmodified, paired with metadata built from the given identifiers and
        keyword flags computed from the text.

        Returns:
            A one-element list holding ``(text, metadata)``.
        """
        deductible_related, limit_related = TableChunker._keyword_flags(text)
        meta = ChunkMetadata(
            policy_id=policy_id,
            source_file=source_file,
            section_name=section_name,
            section_number=section_number,
            page_number=page_number,
            chunk_index=chunk_index,
            clause_type=ClauseType.SCHEDULE,
            table_chunk=True,
            deductible_related=deductible_related,
            limit_related=limit_related,
            token_count=count_tokens(text),
        )
        return [(text, meta)]