Spaces:
Sleeping
Sleeping
| """Parse and merge structured OCR JSON from MiniCPM-V.""" | |
| from __future__ import annotations | |
| import json | |
| import re | |
| from typing import Any, Dict, List, Optional, Tuple | |
# Placeholder key names the model sometimes copies verbatim from schema
# examples; a key that matches this pattern in full is treated as generic.
_GENERIC_KEY_PATTERN = re.compile(
    r"^(label|value|field\d*|column\d*|cell\d*|key|example|sample|placeholder|"
    r"header\d*|row\d*|item\d*|data\d*|text\d*|name\d*)$",
    flags=re.IGNORECASE,
)

# Section titles considered generic. Entries are lower-case because titles
# are lower-cased before the membership test.
_GENERIC_SECTION_TITLES = {
    "key value",
    "key_value",
    "details",
    "section name",
    "table section name",
    "account information",
    "balance summary",
    "line items",
    "transactions",
}
def _strip_json_fence(text: str) -> str:
    """Return *text* without a surrounding Markdown code fence.

    A leading ``` (optionally tagged ``json``, any letter case) and a
    trailing ``` are removed, along with the whitespace around them.
    """
    without_opening = re.sub(
        r"^```(?:json)?\s*", "", text.strip(), flags=re.IGNORECASE
    )
    return re.sub(r"\s*```$", "", without_opening).strip()
def _is_generic_key(key: str) -> bool:
    """Return True when *key* is blank or matches a placeholder key name."""
    candidate = key.strip()
    # A blank key carries no information, so it is rejected as generic too.
    return not candidate or _GENERIC_KEY_PATTERN.match(candidate) is not None
def _normalize_section_title(title: str, fallback: str = "Extracted fields") -> str:
    """Return the trimmed *title*, or *fallback* when it is blank or generic.

    A title is generic when its lower-cased form appears in
    ``_GENERIC_SECTION_TITLES``.
    """
    candidate = title.strip()
    if candidate and candidate.lower() not in _GENERIC_SECTION_TITLES:
        return candidate
    return fallback
def _coerce_fields(section: Dict[str, Any]) -> Dict[str, str]:
    """Accept fields dict or list-of-pairs formats from the model.

    Two shapes are read from *section* and merged into one mapping:

    * ``section["fields"]`` as a ``{label: value}`` dict;
    * ``section["pairs"]``, ``["key_values"]``, ``["key_value_pairs"]`` or
      ``["items"]`` as a list of dicts, where the label is taken from
      ``key``/``label``/``name``/``field`` and the value from
      ``value``/``text``/``content``.

    Entries with a blank or generic label, or with a missing/blank value,
    are skipped. Later entries overwrite earlier ones with the same label.

    Returns:
        Mapping of trimmed label to trimmed string value.
    """

    def _first_text(item: Dict[str, Any], names: Tuple[str, ...]) -> str:
        # Pick the first candidate that is present and non-blank. Testing for
        # None/blank instead of truthiness keeps legitimate falsy values such
        # as 0 or False, matching what the dict path below already does.
        for name in names:
            candidate = item.get(name)
            if candidate is None:
                continue
            text = str(candidate).strip()
            if text:
                return text
        return ""

    fields: Dict[str, str] = {}

    raw_fields = section.get("fields")
    if isinstance(raw_fields, dict):
        for key, value in raw_fields.items():
            key_str = str(key).strip()
            if not key_str or value is None or _is_generic_key(key_str):
                continue
            value_str = str(value).strip()
            if value_str:
                fields[key_str] = value_str

    for list_key in ("pairs", "key_values", "key_value_pairs", "items"):
        raw_list = section.get(list_key)
        if not isinstance(raw_list, list):
            continue
        for item in raw_list:
            if not isinstance(item, dict):
                continue
            label_str = _first_text(item, ("key", "label", "name", "field"))
            value_str = _first_text(item, ("value", "text", "content"))
            if label_str and value_str and not _is_generic_key(label_str):
                fields[label_str] = value_str

    return fields
def _coerce_table(section: Dict[str, Any]) -> Tuple[List[str], List[List[str]]]:
    """Extract ``(headers, rows)`` from a table section.

    Two layouts are understood:

    * ``headers`` (list) plus ``rows`` (list of lists);
    * ``columns`` (list of dicts with ``header``/``name`` and
      ``values``/``cells``), used only when no rows were found.

    JSON nulls become empty cells, and values that are not lists where a
    list is expected are ignored instead of raising.

    Returns:
        A tuple of header strings and row lists of cell strings.
    """

    def _cell_text(cell: Any) -> str:
        # A JSON null is an empty cell, not the literal text "None".
        return "" if cell is None else str(cell).strip()

    raw_headers = section.get("headers")
    if not isinstance(raw_headers, list):
        raw_headers = []
    # NOTE(review): blank and generic headers are dropped rather than blanked,
    # so the remaining headers can shift relative to the row cells.
    headers = [
        text
        for text in (_cell_text(h) for h in raw_headers)
        if text and not _is_generic_key(text)
    ]

    rows: List[List[str]] = []
    raw_rows = section.get("rows")
    if isinstance(raw_rows, list):
        for row in raw_rows:
            if not isinstance(row, list):
                continue
            cells = [_cell_text(cell) for cell in row]
            if any(cells):
                rows.append(cells)

    # Some models return columns as objects instead of headers+rows.
    columns = section.get("columns")
    if isinstance(columns, list) and columns and not rows:
        col_headers: List[str] = []
        col_values: List[List[str]] = []
        for col in columns:
            if not isinstance(col, dict):
                continue
            header = str(col.get("header") or col.get("name") or "").strip()
            values = col.get("values") or col.get("cells") or []
            if not isinstance(values, list):
                values = []
            if header and not _is_generic_key(header):
                col_headers.append(header)
                # Keep nulls as empty cells so later values stay on their row.
                col_values.append([_cell_text(v) for v in values])
        if col_headers and col_values:
            max_len = max(len(values) for values in col_values)
            headers = col_headers
            # Transpose columns into rows, padding short columns with "".
            rows = [
                [values[idx] if idx < len(values) else "" for values in col_values]
                for idx in range(max_len)
            ]

    return headers, rows
def _normalize_sections(sections: Any) -> List[Dict[str, Any]]:
    """Convert the model's ``sections`` value into a clean list of sections.

    Non-list input yields ``[]`` and non-dict entries are skipped. Each kept
    entry is either a table (``title``, ``type``, ``headers``, ``rows``) or a
    key/value section (``title``, ``type``, ``fields``). Sections with no
    usable content are dropped. Blank or generic titles are replaced by a
    numbered ``"Extracted fields N"`` fallback.

    Returns:
        The normalized sections, in input order.
    """
    if not isinstance(sections, list):
        return []

    normalized: List[Dict[str, Any]] = []
    fallback_idx = 1
    for section in sections:
        if not isinstance(section, dict):
            continue

        section_type = str(section.get("type") or "key_value").strip().lower()
        fallback_title = f"Extracted fields {fallback_idx}"
        title = _normalize_section_title(
            str(section.get("title") or ""),
            fallback=fallback_title,
        )

        entry: Dict[str, Any]
        if section_type == "table":
            headers, rows = _coerce_table(section)
            if not (headers or rows):
                continue
            entry = {
                "title": title,
                "type": "table",
                "headers": headers,
                "rows": rows,
            }
        else:
            fields = _coerce_fields(section)
            if not fields:
                continue
            entry = {
                "title": title,
                "type": "key_value",
                "fields": fields,
            }

        # Advance the counter whenever an emitted section (table or key/value)
        # used the fallback title, so no two untitled sections share a title.
        if title == fallback_title:
            fallback_idx += 1
        normalized.append(entry)

    return normalized
def parse_structured_page(raw: Optional[str], page_number: int = 1) -> Dict[str, Any]:
    """Parse model JSON for one page; return a safe default on failure.

    Args:
        raw: Model output, expected to contain one JSON object, optionally
            wrapped in a Markdown code fence or surrounded by other text.
        page_number: Page number recorded in the result.

    Returns:
        A dict with ``page_number``, ``document_type``, ``document_title``
        and ``sections``. When *raw* is missing, blank, not valid JSON, or
        valid JSON that is not an object, the result instead has empty
        sections plus ``parse_error: True`` and the stripped ``raw_text``.
    """
    # Compute the stripped text before building the fallback so that a
    # missing (None) input yields the fallback instead of raising.
    raw_text = raw.strip() if isinstance(raw, str) else ""
    fallback = {
        "page_number": page_number,
        "document_type": "other",
        "document_title": "",
        "sections": [],
        "parse_error": True,
        "raw_text": raw_text,
    }
    if not raw_text:
        return fallback

    try:
        data = json.loads(_strip_json_fence(raw))
    except json.JSONDecodeError:
        # Retry on the widest {...} span in case the JSON is embedded in prose.
        match = re.search(r"\{[\s\S]*\}", raw)
        if not match:
            return fallback
        try:
            data = json.loads(match.group(0))
        except json.JSONDecodeError:
            return fallback

    # json.loads can also yield a list, string, number, bool or None; only an
    # object supports the key lookups below.
    if not isinstance(data, dict):
        return fallback

    sections = _normalize_sections(data.get("sections"))

    # Top-level keys that are structural rather than document fields.
    meta_keys = {
        "document_type",
        "document_title",
        "sections",
        "pages",
        "fields",
        "pairs",
        "key_values",
        "key_value_pairs",
        "items",
        "columns",
        "headers",
        "rows",
        "type",
        "title",
    }
    flat_fields: Dict[str, str] = {}
    for key, value in data.items():
        if key in meta_keys or value is None:
            continue
        if isinstance(value, (str, int, float)):
            key_str = str(key).strip()
            value_str = str(value).strip()
            if key_str and value_str and not _is_generic_key(key_str):
                flat_fields[key_str] = value_str

    top_fields = _coerce_fields(data)
    flat_fields.update(top_fields)

    # Top-level fields are used only when no sections were produced.
    if flat_fields and not sections:
        sections = [
            {
                "title": _normalize_section_title(
                    str(data.get("document_title") or "Document header"),
                    fallback="Document header",
                ),
                "type": "key_value",
                "fields": flat_fields,
            }
        ]

    return {
        "page_number": page_number,
        "document_type": str(data.get("document_type") or "other"),
        "document_title": str(data.get("document_title") or "").strip(),
        "sections": sections,
    }
def merge_structured_pages(
    pages: List[Dict[str, Any]],
    filename: Optional[str] = None,
) -> Dict[str, Any]:
    """Combine per-page results into one document-level dict.

    Args:
        pages: Parsed page dicts, in page order.
        filename: Optional source file name, stored as-is.

    Returns:
        A dict with ``filename``, ``document_type`` (the first page type that
        is set and not ``"other"``, else ``"other"``), ``document_title``
        (the first non-empty page title, else ``""``), ``page_count`` and the
        original ``pages`` list.
    """
    # Use .get() throughout and a constant default: a page without a
    # "document_type" key must not raise, and an empty type must not leak
    # into the merged result.
    page_types = (p.get("document_type") for p in pages)
    doc_type = next((t for t in page_types if t and t != "other"), "other")

    page_titles = (p.get("document_title") for p in pages)
    document_title = next((t for t in page_titles if t), "")

    return {
        "filename": filename,
        "document_type": doc_type,
        "document_title": document_title,
        "page_count": len(pages),
        "pages": pages,
    }
def structured_to_plain_text(structured: Dict[str, Any]) -> str:
    """Flatten structured OCR for copy/search fallback.

    The output starts with the document title (if any) and a
    ``Document type:`` line. Each page then contributes a page marker (only
    when ``page_count`` exceeds 1), its own title when it differs from the
    document title, its sections, and the raw extraction text for pages
    flagged with ``parse_error``. Tables are rendered as pipe-separated
    lines, key/value sections as ``key: value`` lines.
    """
    out: List[str] = []

    document_title = structured.get("document_title", "")
    if document_title:
        out.append(document_title)
    out.append(f"Document type: {structured.get('document_type', 'other')}")

    for page in structured.get("pages") or []:
        number = page.get("page_number", 1)
        if structured.get("page_count", 1) > 1:
            out.append(f"\n--- Page {number} ---")

        heading = page.get("document_title")
        if heading and heading != document_title:
            out.append(heading)

        for section in page.get("sections") or []:
            out.append(f"\n## {section.get('title', 'Details')}")

            if section.get("type") != "table":
                pairs = (section.get("fields") or {}).items()
                out.extend(f"{label}: {text}" for label, text in pairs)
                continue

            header_cells = section.get("headers") or []
            body_rows = section.get("rows") or []
            if header_cells:
                out.append(" | ".join(header_cells))
                out.append(" | ".join("---" for _ in range(len(header_cells))))
            out.extend(" | ".join(cells) for cells in body_rows)

        # Keep the unparsed model output available for pages that failed.
        if page.get("parse_error") and page.get("raw_text"):
            out.extend(["\nRaw extraction:", page["raw_text"]])

    return "\n".join(out).strip()