owenkaplinsky's picture
update from github stable code (#3)
3370983 verified
"""Section parsing and processing utilities."""
import json
import re
from collections import OrderedDict
from pathlib import Path
from typing import Dict, List
from ftfy import fix_text
from .text import normalize_bullets, tag_contacts
# Greedy match from the first "[ {" to the last "} ]" so a JSON array of
# objects can be pulled out of text that surrounds it.
_JSON_ARRAY_RE = re.compile(r"\[\s*\{[\s\S]*\}\s*\]")


def _load_json_or_none(raw: str) -> object:
    """Decode *raw* as JSON, returning None when decoding is not possible."""
    try:
        return json.loads(raw)
    except (ValueError, TypeError, RecursionError):
        # ValueError covers json.JSONDecodeError and undecodable bytes,
        # TypeError covers non-text input (e.g. None), and RecursionError
        # covers pathologically nested documents.
        return None


def _field_text(value: object) -> str:
    """Return *value* as stripped text; None (JSON null) becomes ''."""
    # str(None) would yield the literal text "None", which is never wanted
    # as a section title or body.
    return "" if value is None else str(value).strip()


def _coerce_sections(data: list) -> List[Dict[str, str]]:
    """Keep only the dict items of *data*, reduced to 'title' and 'body'."""
    return [
        {
            "title": _field_text(item.get("title")),
            "body": _field_text(item.get("body")),
        }
        for item in data
        if isinstance(item, dict)
    ]


def parse_sections_from_json_text(text: str) -> List[Dict[str, str]]:
    """
    Parse a JSON array of sections from the API response text.

    The whole text is decoded as JSON first. If that fails, or the decoded
    value is not a list, the first "[{ ... }]" span found inside the text is
    decoded instead.

    Args:
        text: Raw text that should contain a JSON array.

    Returns:
        List of section dicts with 'title' and 'body' keys. Items that are
        not JSON objects are skipped, missing or null fields become empty
        strings, and other values are converted with ``str()`` and stripped.
        An empty list is returned when no JSON array can be recovered.
    """
    # Try direct parse
    data = _load_json_or_none(text)
    if isinstance(data, list):
        return _coerce_sections(data)

    # The regex fallback only makes sense for str input; anything else
    # (e.g. None) simply has no recoverable sections.
    if not isinstance(text, str):
        return []

    # Try to extract JSON array from text
    match = _JSON_ARRAY_RE.search(text)
    if match is None:
        return []
    data = _load_json_or_none(match.group(0))
    if isinstance(data, list):
        return _coerce_sections(data)
    return []
def normalize_sections(sections: List[Dict[str, str]]) -> List[Dict[str, str]]:
    """
    Run every section's title and body through ftfy's ``fix_text``.

    Args:
        sections: Section dicts; a missing or falsy 'title'/'body' is
            treated as an empty string.

    Returns:
        New list of dicts holding only the repaired 'title' and 'body'.
    """

    def _repair(raw):
        # Strip first, then let ftfy fix the text encoding.
        return fix_text((raw or "").strip())

    return [
        {"title": _repair(entry.get("title")), "body": _repair(entry.get("body"))}
        for entry in sections
    ]
def merge_duplicate_titles(sections: List[Dict[str, str]]) -> List[Dict[str, str]]:
    """
    Merge sections with duplicate titles while preserving order.

    Titles and bodies are stripped before comparison. The first occurrence
    of a title fixes its position in the output; non-empty bodies of later
    occurrences are appended to it, separated by a blank line.

    Args:
        sections: Section dicts; a missing or None 'title'/'body' is
            treated as an empty string.

    Returns:
        One dict per distinct title, in first-seen order.
    """
    merged: "OrderedDict[str, str]" = OrderedDict()
    for section in sections:
        # `or ""` covers both a missing key and an explicit None, so neither
        # field can trip over None.strip().
        title = (section.get("title") or "").strip()
        body = (section.get("body") or "").strip()
        if title not in merged:
            merged[title] = body
        elif body:
            previous = merged[title]
            # Only insert the separator when there is already text to
            # separate from.
            merged[title] = f"{previous}\n\n{body}" if previous else body
    return [{"title": t, "body": b} for t, b in merged.items()]
def build_contact_section_from_filename(pdf_file: Path) -> Dict[str, str]:
    """
    Build a fallback 'Adresse' section whose body names the PDF.

    Underscores in the file stem are turned into spaces. If the first
    whitespace-separated token is a single alphabetic character it is
    dropped. When nothing remains, the full file name is used instead.

    Args:
        pdf_file: Path of the PDF the section is derived from.

    Returns:
        Dict with 'title' set to "Adresse" and 'body' set to "Name: <name>".
    """
    label = pdf_file.stem.replace("_", " ").strip()
    parts = label.split(maxsplit=1)

    # A leading one-letter token is treated as a prefix and removed.
    starts_with_letter = bool(parts) and len(parts[0]) == 1 and parts[0].isalpha()
    if starts_with_letter:
        label = parts[1] if len(parts) > 1 else ""

    display = label.strip()
    if not display:
        display = pdf_file.name
    return {"title": "Adresse", "body": f"Name: {display}"}
def process_section(section: Dict[str, str]) -> Dict[str, str]:
    """
    Post-process the title and body of one section.

    Each field is passed through ``normalize_bullets`` and the result
    through ``tag_contacts``. A missing field is treated as an empty string.

    Args:
        section: Section dict with 'title' and 'body' entries.

    Returns:
        New dict with the processed 'title' and 'body'.
    """
    processed: Dict[str, str] = {}
    for field in ("title", "body"):
        bulleted = normalize_bullets(section.get(field, ""))
        processed[field] = tag_contacts(bulleted)
    return processed
def apply_postprocessing(sections: List[Dict[str, str]]) -> List[Dict[str, str]]:
    """
    Run ``process_section`` over every section.

    Args:
        sections: Section dicts to post-process.

    Returns:
        New list with one processed dict per input section, in input order.
    """
    processed: List[Dict[str, str]] = []
    for entry in sections:
        processed.append(process_section(entry))
    return processed