from __future__ import annotations

from html.parser import HTMLParser

from app.processing.text_utils import (
    looks_like_heading,
    looks_like_table,
    looks_like_widget,
    normalize_text,
    rows_to_table_text,
)
from app.schemas import RawDocument, StructureBlock

# Tags whose text becomes a "heading" block.
_HEADING_TAGS = frozenset({"h1", "h2", "h3", "h4", "h5", "h6"})
# Tags whose text is captured as a standalone heading/paragraph block.
_CAPTURE_TAGS = _HEADING_TAGS | {"p", "li"}
# Table cell tags.
_CELL_TAGS = frozenset({"td", "th"})
# File suffixes routed to the HTML parser / treated as images.
_HTML_SUFFIXES = frozenset({".html", ".htm"})
_IMAGE_SUFFIXES = frozenset({".png", ".jpg", ".jpeg", ".webp"})
# Deepest heading level tracked for "#"-prefixed text headings (mirrors h1-h6).
_MAX_HEADING_LEVEL = 6


class HTMLStructureParser(HTMLParser):
    """Collect headings, paragraphs/list items and tables from HTML.

    Feed markup with ``feed()`` and finish with ``close()``; the resulting
    ``StructureBlock`` objects accumulate in ``self.blocks`` in the order
    their closing point is reached. ``self.heading_path`` holds the chain of
    headings that is copied onto every emitted block.
    """

    def __init__(self) -> None:
        super().__init__()
        self.blocks: list[StructureBlock] = []
        self.heading_path: list[str] = []
        # NOTE: this stack is only pushed/popped, never consulted, and tags
        # without a matching end tag (e.g. <br>) stay on it.
        self._tag_stack: list[str] = []
        # Tag currently being captured as heading/paragraph text, if any.
        self._capture_tag: str | None = None
        self._capture_parts: list[str] = []
        # State of the innermost open table.
        self._table_depth = 0
        self._table_rows: list[list[str]] = []
        self._current_row: list[str] | None = None
        self._current_cell: list[str] | None = None
        # Saved (rows, open row, open cell) of enclosing tables while a
        # nested table is being parsed.
        self._outer_tables: list[
            tuple[list[list[str]], list[str] | None, list[str] | None]
        ] = []

    def handle_starttag(self, tag: str, attrs: list[tuple[str, str | None]]) -> None:
        """Open a table/row/cell or start capturing heading/paragraph text."""
        self._tag_stack.append(tag)
        if tag == "table":
            self._flush_capture()
            if self._table_depth:
                # Park the enclosing table so the nested one cannot clobber it.
                self._outer_tables.append(
                    (self._table_rows, self._current_row, self._current_cell)
                )
            self._table_depth += 1
            self._table_rows = []
            self._current_row = None
            self._current_cell = None
        elif self._table_depth and tag == "tr":
            # </tr> is optional in HTML: a new row implicitly ends the old one.
            self._finish_row()
            self._current_row = []
        elif self._table_depth and tag in _CELL_TAGS:
            # </td> and </th> are optional too: commit the previous cell first.
            self._finish_cell()
            self._current_cell = []
        elif tag in _CAPTURE_TAGS:
            self._flush_capture()
            self._capture_tag = tag
            self._capture_parts = []
        elif tag == "br" and self._capture_tag:
            self._capture_parts.append("\n")

    def handle_endtag(self, tag: str) -> None:
        """Close the cell, row, table or captured element named by *tag*."""
        if self._table_depth and tag in _CELL_TAGS:
            self._finish_cell()
        elif self._table_depth and tag == "tr":
            self._finish_row()
        elif tag == "table" and self._table_depth:
            self._finish_table()
        elif tag == self._capture_tag:
            self._flush_capture()
        if self._tag_stack and self._tag_stack[-1] == tag:
            self._tag_stack.pop()

    def handle_data(self, data: str) -> None:
        """Route text to the open table cell, else to the open capture."""
        if self._current_cell is not None:
            self._current_cell.append(data)
        elif self._capture_tag:
            self._capture_parts.append(data)

    def close(self) -> None:
        """Process buffered input, then emit any still-open capture or table."""
        super().close()
        self._flush_capture()
        # Tables missing their </table> would otherwise be dropped silently.
        while self._table_depth:
            self._finish_table()

    def _finish_cell(self) -> None:
        """Commit the open cell (if any) to the open row."""
        if self._current_cell is None:
            return
        cell = normalize_text(" ".join(self._current_cell))
        if self._current_row is not None:
            self._current_row.append(cell)
        self._current_cell = None

    def _finish_row(self) -> None:
        """Commit the open row (if any); rows with only empty cells are skipped."""
        self._finish_cell()
        if self._current_row is None:
            return
        if any(self._current_row):
            self._table_rows.append(self._current_row)
        self._current_row = None

    def _finish_table(self) -> None:
        """Emit the innermost table as a block and restore the enclosing one."""
        self._finish_row()
        self._table_depth -= 1
        table_text = rows_to_table_text(self._table_rows)
        if table_text:
            self.blocks.append(
                StructureBlock(
                    text=table_text,
                    structure_type="table",
                    heading_path=list(self.heading_path),
                    metadata={"row_count": len(self._table_rows)},
                )
            )
        if self._outer_tables:
            (
                self._table_rows,
                self._current_row,
                self._current_cell,
            ) = self._outer_tables.pop()
        else:
            self._table_rows = []

    def _flush_capture(self) -> None:
        """Emit the captured heading/paragraph text, if there is any."""
        if not self._capture_tag:
            return
        text = normalize_text(" ".join(self._capture_parts))
        tag = self._capture_tag
        self._capture_tag = None
        self._capture_parts = []
        if not text:
            return
        if tag in _HEADING_TAGS:
            level = int(tag[1])
            # Keep the ancestors above this level and replace everything below.
            self.heading_path = self.heading_path[: level - 1] + [text]
            self.blocks.append(
                StructureBlock(
                    text=text,
                    structure_type="heading",
                    heading_path=list(self.heading_path),
                    metadata={"heading_level": level},
                )
            )
            return
        self.blocks.append(
            StructureBlock(
                text=text,
                structure_type="paragraph",
                heading_path=list(self.heading_path),
                metadata={"html_tag": tag},
            )
        )


def parse_text_blocks(text: str) -> list[StructureBlock]:
    """Split plain text into heading, paragraph, table and widget blocks.

    Each line is normalized and classified with ``looks_like_heading``,
    ``looks_like_table`` and ``looks_like_widget`` (checked in that order;
    anything else is paragraph text). Consecutive lines of the same kind are
    joined with newlines into one block; a blank line, a heading, or a change
    of kind ends the current block.

    A heading's level is the number of leading ``#`` characters, clamped to
    1..6 (1 when there are none), and ``heading_path`` is maintained
    hierarchically in the same way as ``HTMLStructureParser``.

    Args:
        text: The raw document text.

    Returns:
        The blocks in document order.
    """
    blocks: list[StructureBlock] = []
    heading_path: list[str] = []
    pending_kind = "paragraph"
    pending_lines: list[str] = []

    def flush() -> None:
        # Emit the buffered run of lines as a single block of its kind.
        body = normalize_text("\n".join(pending_lines))
        pending_lines.clear()
        if body:
            blocks.append(StructureBlock(body, pending_kind, list(heading_path), {}))

    for raw_line in text.splitlines():
        line = normalize_text(raw_line)
        if not line:
            flush()
            continue

        if looks_like_heading(line):
            flush()
            stripped = line.lstrip("#")
            hashes = len(line) - len(stripped)
            level = min(max(hashes, 1), _MAX_HEADING_LEVEL)
            heading = stripped.strip()
            heading_path = heading_path[: level - 1] + [heading]
            blocks.append(
                StructureBlock(
                    heading,
                    "heading",
                    list(heading_path),
                    {"heading_level": level},
                )
            )
            continue

        if looks_like_table(line):
            kind = "table"
        elif looks_like_widget(line):
            kind = "widget"
        else:
            kind = "paragraph"

        if kind != pending_kind:
            flush()
            pending_kind = kind
        pending_lines.append(line)

    flush()
    return blocks


def parse_document_structures(
    document: RawDocument,
    csv_rows: list[list[str]] | None = None,
) -> list[StructureBlock]:
    """Turn a raw document into structure blocks based on its file suffix.

    * ``.html`` / ``.htm``: parsed with ``HTMLStructureParser``; if that
      yields no blocks the text falls back to ``parse_text_blocks``.
    * ``.csv``: a single "table" block built from ``csv_rows``.
    * ``.png`` / ``.jpg`` / ``.jpeg`` / ``.webp``: a single "widget" block
      holding ``document.text``.
    * anything else: ``parse_text_blocks(document.text)``.

    Args:
        document: Document providing ``source_path`` and ``text``.
        csv_rows: Pre-parsed CSV rows; only used for ``.csv`` documents and
            treated as empty when omitted.

    Returns:
        The list of extracted blocks.
    """
    suffix = document.source_path.suffix.lower()

    if suffix in _HTML_SUFFIXES:
        parser = HTMLStructureParser()
        parser.feed(document.text)
        # close() forces buffered trailing text through the parser and
        # flushes any capture or table left open at end of input.
        parser.close()
        if parser.blocks:
            return parser.blocks
        # No recognizable HTML structure: fall through to plain-text parsing.
    elif suffix == ".csv":
        rows = csv_rows or []
        return [
            StructureBlock(
                rows_to_table_text(rows),
                "table",
                [],
                {"row_count": len(rows), "source_format": "csv"},
            )
        ]
    elif suffix in _IMAGE_SUFFIXES:
        return [
            StructureBlock(
                document.text,
                "widget",
                [],
                {"source_format": "image"},
            )
        ]

    return parse_text_blocks(document.text)