File size: 2,428 Bytes
db06ffa
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
"""Normalize Unstructured partition elements into the canonical parse schema."""

from __future__ import annotations

from typing import Any, Iterable

from zsgdp.normalize.markdown import normalize_markdown_candidate
from zsgdp.schema import DocumentProfile, ParseCandidate


def normalize_unstructured_parts(
    *,
    parts: Iterable[Any],
    profile: DocumentProfile,
    source_path: str,
) -> ParseCandidate:
    """Turn Unstructured partition elements into a ``ParseCandidate``.

    The elements are rendered into one markdown string, which is then handed
    to ``normalize_markdown_candidate`` labelled with the ``unstructured``
    parser name, a fixed confidence of 0.78 and a provenance record naming
    this normalizer.

    Args:
        parts: Elements to render; each one is stringified to get its text.
        profile: Provides the ``doc_id`` and ``file_type`` for the candidate.
        source_path: Passed through unchanged to the markdown normalizer.

    Returns:
        The candidate produced by ``normalize_markdown_candidate``.
    """
    provenance = {"backend": "unstructured", "normalizer": "normalize_unstructured_parts"}
    return normalize_markdown_candidate(
        markdown=_parts_to_markdown(parts),
        doc_id=profile.doc_id,
        source_path=source_path,
        file_type=profile.file_type,
        parser_name="unstructured",
        confidence=0.78,
        provenance=provenance,
    )


def _parts_to_markdown(parts: Iterable[Any]) -> str:
    """Render the given parts as one markdown string.

    Parts whose stringified text is blank are dropped. Whenever a part
    reports a truthy page number different from the one currently tracked,
    a ``<!-- page:N -->`` marker is written before it. Every rendered part is
    followed by a blank line; the result is stripped and, when anything was
    emitted, terminated with a single newline.
    """
    out: list[str] = []
    last_page: int | None = None
    for element in parts:
        body = str(element).strip()
        if body == "":
            continue
        page = _part_page_num(element)
        if page and page != last_page:
            last_page = page
            marker = [f"<!-- page:{page} -->", ""]
            # Keep an extra blank line between earlier content and the marker.
            out.extend(["", *marker] if out else marker)
        out.extend((_part_to_markdown(element, body), ""))
    if not out:
        return ""
    return "\n".join(out).strip() + "\n"


def _part_to_markdown(part: Any, text: str) -> str:
    """Render one part as markdown based on its lower-cased category.

    * ``table``: the ``text_as_html`` metadata value (stringified and
      stripped) when it is truthy, otherwise ``text``.
    * ``title`` / ``header``: ``text`` prefixed with ``# `` unless it already
      starts with ``#``.
    * anything else: ``text`` unchanged.
    """
    kind = _part_category(part).lower()
    if kind == "table":
        table_html = _part_metadata_value(part, "text_as_html")
        if table_html:
            return str(table_html).strip()
        return text
    if kind in ("title", "header") and not text.startswith("#"):
        return f"# {text}"
    return text


def _part_category(part: Any) -> str:
    """Return the part's category name.

    Uses the stringified ``category`` attribute when it exists and is truthy;
    otherwise falls back to the name of the part's class.
    """
    declared = getattr(part, "category", None)
    return str(declared) if declared else part.__class__.__name__


def _part_page_num(part: Any) -> int | None:
    """Return the part's ``page_number`` metadata as an ``int``, or ``None``.

    ``None`` is returned when the metadata value is missing or cannot be
    converted with ``int()``.
    """
    raw = _part_metadata_value(part, "page_number")
    if raw is None:
        return None
    try:
        return int(raw)
    except (TypeError, ValueError, OverflowError):
        # int(float("inf")) raises OverflowError; treat it like any other
        # unusable page number instead of letting it escape.
        return None


def _part_metadata_value(part: Any, key: str) -> Any:
    """Look up ``key`` in the part's ``metadata``.

    The metadata may be a dict (read with ``.get``) or any other object (read
    with ``getattr``). Returns ``None`` when the part has no metadata or the
    key is not present.
    """
    meta = getattr(part, "metadata", None)
    if meta is None:
        return None
    found = meta.get(key) if isinstance(meta, dict) else getattr(meta, key, None)
    return found