File size: 3,927 Bytes
3370983 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 |
"""Section parsing and processing utilities."""
import json
import re
from collections import OrderedDict
from pathlib import Path
from typing import Dict, List
from ftfy import fix_text
from .text import normalize_bullets, tag_contacts
def _coerce_sections(items: list) -> List[Dict[str, str]]:
    """Convert a decoded JSON list into section dicts.

    Non-dict entries are silently skipped; missing 'title'/'body' keys
    default to "" and values are stringified and stripped.
    """
    out: List[Dict[str, str]] = []
    for item in items:
        if isinstance(item, dict):
            out.append(
                {
                    "title": str(item.get("title", "")).strip(),
                    "body": str(item.get("body", "")).strip(),
                }
            )
    return out


def parse_sections_from_json_text(text: str) -> List[Dict[str, str]]:
    """
    Parse STRICT JSON from the API response.
    Attempts direct JSON parsing first, then falls back to
    extracting JSON array from surrounding text.
    Args:
        text: Raw text that should contain a JSON array.
    Returns:
        List of section dicts with 'title' and 'body' keys.
        Empty list when no JSON array can be decoded.
    """
    # Candidate payloads in priority order: the whole text, then the
    # first bracketed array of objects embedded in surrounding prose.
    candidates = [text]
    m = re.search(r"\[\s*\{[\s\S]*\}\s*\]", text)
    if m:
        candidates.append(m.group(0))
    for candidate in candidates:
        try:
            data = json.loads(candidate)
        except (ValueError, TypeError):
            # ValueError covers json.JSONDecodeError; TypeError guards
            # against non-string input. Fall through to the next candidate.
            continue
        if isinstance(data, list):
            return _coerce_sections(data)
    return []
def normalize_sections(sections: List[Dict[str, str]]) -> List[Dict[str, str]]:
    """Repair text encoding of every section with ftfy (fixes mojibake, etc.)."""
    return [
        {
            "title": fix_text((section.get("title") or "").strip()),
            "body": fix_text((section.get("body") or "").strip()),
        }
        for section in sections
    ]
def merge_duplicate_titles(sections: List[Dict[str, str]]) -> List[Dict[str, str]]:
    """Merge sections that share a title, keeping first-seen order.

    Bodies of later duplicates are appended to the first occurrence,
    separated by a blank line; empty duplicate bodies are ignored.
    """
    combined: "OrderedDict[str, str]" = OrderedDict()
    for section in sections:
        key = section.get("title", "").strip()
        text = (section.get("body", "") or "").strip()
        if key not in combined:
            combined[key] = text
        elif text:
            existing = combined[key]
            joined = f"{existing}\n\n{text}" if existing else text
            combined[key] = joined.strip()
    return [{"title": key, "body": body} for key, body in combined.items()]
def build_contact_section_from_filename(pdf_file: Path) -> Dict[str, str]:
    """
    Create a simple 'Adresse' section based on the PDF filename.
    Useful as a fallback when contact info isn't parsed from the document.
    A single leading alphabetic character (e.g. an initial used as a
    filename prefix) is dropped from the derived name.
    """
    raw = pdf_file.stem.replace("_", " ").strip()
    parts = raw.split(maxsplit=1)
    if parts and parts[0].isalpha() and len(parts[0]) == 1:
        # Drop the one-letter prefix; keep the remainder if any.
        raw = parts[1] if len(parts) > 1 else ""
    display = raw.strip() or pdf_file.name
    return {"title": "Adresse", "body": f"Name: {display}"}
def process_section(section: Dict[str, str]) -> Dict[str, str]:
    """Normalize bullets and tag contact info for a single section."""
    def _clean(text: str) -> str:
        # Bullets first, then contact tagging — same order for both fields.
        return tag_contacts(normalize_bullets(text))

    return {
        "title": _clean(section.get("title", "")),
        "body": _clean(section.get("body", "")),
    }
def apply_postprocessing(sections: List[Dict[str, str]]) -> List[Dict[str, str]]:
    """Run process_section over every section, preserving order."""
    return list(map(process_section, sections))
|