"""Structure-aware, token-bounded chunking of raw documents."""

from __future__ import annotations

from app.config import DEFAULT_CHUNK_OVERLAP, DEFAULT_CHUNK_SIZE, RAW_DIR
from app.processing.readers import read_csv_rows
from app.processing.structures import parse_document_structures
from app.processing.text_utils import detokenize, stable_id, token_count, tokenize
from app.schemas import Chunk, RawDocument, StructureBlock


def split_block_by_tokens(block: StructureBlock, max_tokens: int, overlap: int) -> list[StructureBlock]:
    """Split one structure block into parts of at most ``max_tokens`` tokens.

    The block text is tokenized with ``tokenize``; if it already fits, the
    original block object is returned unchanged as a single-element list.
    Otherwise consecutive token windows are rebuilt with ``detokenize`` and
    wrapped in new ``StructureBlock`` objects that copy the structure type and
    heading path and add a ``"split_part"`` index to a copy of the metadata.

    Args:
        block: The block to split.
        max_tokens: Maximum number of tokens per part; must be positive.
        overlap: Number of tokens shared between consecutive parts. Negative
            values are treated as 0 so that no tokens are skipped between
            parts. Values >= ``max_tokens`` still advance by one token per
            part so the loop always makes progress.

    Returns:
        The list of parts, in order. ``[block]`` when no split is needed.

    Raises:
        ValueError: If ``max_tokens`` is not positive (a non-positive window
            would produce empty parts and lose the block's content).
    """
    if max_tokens <= 0:
        raise ValueError(f"max_tokens must be positive, got {max_tokens}")
    # A negative overlap would move the next window start *past* the previous
    # window end and silently drop the tokens in between.
    overlap = max(overlap, 0)

    tokens = tokenize(block.text)
    total = len(tokens)
    if total <= max_tokens:
        return [block]

    # Every non-final window is exactly max_tokens long, so the start always
    # advances by the same stride; the floor of 1 guarantees termination.
    stride = max(max_tokens - overlap, 1)

    parts: list[StructureBlock] = []
    for part_index, start in enumerate(range(0, total, stride)):
        end = min(start + max_tokens, total)
        parts.append(
            StructureBlock(
                text=detokenize(tokens[start:end]),
                structure_type=block.structure_type,
                heading_path=block.heading_path,
                metadata={**block.metadata, "split_part": part_index},
            )
        )
        if end >= total:
            # The window reached the last token; further windows would only
            # repeat the tail.
            break
    return parts


def chunk_blocks(
    blocks: list[StructureBlock],
    max_tokens: int,
    overlap: int,
) -> list[tuple[str, str, list[str], int, dict]]:
    """Greedily pack structure blocks into chunks of roughly ``max_tokens``.

    Each block is first passed through ``split_block_by_tokens``. Parts are
    accumulated until adding the next part would push the running token total
    above ``max_tokens``; the accumulated parts are then emitted as one chunk.
    A part is always appended to an empty accumulator, so a single part is
    emitted on its own even if its token count exceeds ``max_tokens``.

    Args:
        blocks: Blocks in document order.
        max_tokens: Token budget per chunk.
        overlap: Token overlap used when an oversized block has to be split.

    Returns:
        A list of ``(text, primary_structure_type, heading_path, token_count,
        metadata)`` tuples. ``text`` joins the part texts with blank lines,
        the primary structure type comes from the first part of the chunk,
        the heading path comes from the last part, and ``token_count`` is
        recomputed on the joined text.
    """
    chunks: list[tuple[str, str, list[str], int, dict]] = []
    current_blocks: list[StructureBlock] = []
    current_tokens = 0

    def flush() -> None:
        """Emit the accumulated parts as one chunk and reset the accumulator."""
        nonlocal current_blocks, current_tokens
        if not current_blocks:
            return
        text = "\n\n".join(block.text for block in current_blocks)
        structure_types = [block.structure_type for block in current_blocks]
        # NOTE(review): heading path is taken from the LAST part while the
        # primary structure type is taken from the FIRST — confirm intended.
        heading_path = current_blocks[-1].heading_path
        metadata = {
            "structure_types": structure_types,
            "primary_structure_type": structure_types[0],
            "block_count": len(current_blocks),
            "block_metadata": [block.metadata for block in current_blocks],
        }
        chunks.append((text, structure_types[0], heading_path, token_count(text), metadata))
        current_blocks = []
        current_tokens = 0

    for block in blocks:
        for part in split_block_by_tokens(block, max_tokens, overlap):
            part_tokens = token_count(part.text)
            # Only flush when something is already buffered, so an oversized
            # part never produces an empty chunk in front of it.
            if current_blocks and current_tokens + part_tokens > max_tokens:
                flush()
            current_blocks.append(part)
            current_tokens += part_tokens

    flush()
    return chunks


def enrich_chunk_metadata(document: RawDocument, block_metadata: dict) -> dict:
    """Merge document-level and chunk-level metadata into one dict.

    Keys are applied in this order, later entries overriding earlier ones:
    the document's own metadata, then the fixed ``document_id`` /
    ``source_file`` / ``parser`` keys, then ``block_metadata``.

    Args:
        document: The document the chunk was produced from.
        block_metadata: Chunk-level metadata (as built by ``chunk_blocks``).

    Returns:
        A new dict; neither input mapping is modified.
    """
    return {
        **document.metadata,
        "document_id": document.id,
        "source_file": document.source_path.name,
        "parser": "structure-aware-token-chunker",
        **block_metadata,
    }


def chunk_documents(
    documents: list[RawDocument],
    chunk_size: int = DEFAULT_CHUNK_SIZE,
    overlap: int = DEFAULT_CHUNK_OVERLAP,
) -> list[Chunk]:
    """Turn raw documents into a flat list of ``Chunk`` records.

    For each document the structure blocks are parsed (CSV rows are read
    first when the source file has a ``.csv`` suffix, case-insensitively),
    packed with ``chunk_blocks``, and wrapped in ``Chunk`` objects whose id is
    derived from the document id, the chunk index and the first 120
    characters of the chunk text.

    Args:
        documents: Documents to chunk, processed in the given order.
        chunk_size: Token budget per chunk.
        overlap: Token overlap used when splitting oversized blocks.

    Returns:
        All chunks of all documents, in document order then chunk order.

    Raises:
        ValueError: If ``chunk_size`` is not positive and a document yields at
            least one block (raised by ``split_block_by_tokens``).
    """
    chunks: list[Chunk] = []
    for document in documents:
        # Stored path is relative to the parent of RAW_DIR, with forward
        # slashes. NOTE(review): source_path is presumably a pathlib.Path, in
        # which case relative_to raises ValueError for files outside that
        # directory — confirm callers guarantee this.
        source_path = document.source_path.relative_to(RAW_DIR.parent).as_posix()
        is_csv = document.source_path.suffix.lower() == ".csv"
        csv_rows = read_csv_rows(document.source_path) if is_csv else None
        blocks = parse_document_structures(document, csv_rows=csv_rows)
        for index, (text, structure_type, heading_path, tokens, block_metadata) in enumerate(
            chunk_blocks(blocks, chunk_size, overlap)
        ):
            chunks.append(
                Chunk(
                    id=stable_id(document.id, str(index), text[:120]),
                    text=text,
                    ticker=document.ticker,
                    modality=document.modality,
                    source_path=source_path,
                    chunk_index=index,
                    structure_type=structure_type,
                    heading_path=heading_path,
                    token_count=tokens,
                    metadata=enrich_chunk_metadata(document, block_metadata),
                    scope=document.scope,
                )
            )
    return chunks