import json import re from pathlib import Path from typing import Any REQUIRED_COLUMNS = ["date", "description", "voucher_type", "amount", "closing"] def extract_json_object(text: str) -> dict[str, Any]: cleaned = text.strip() cleaned = re.sub(r"^```(?:json)?\s*", "", cleaned, flags=re.IGNORECASE) cleaned = re.sub(r"\s*```$", "", cleaned) decoder = json.JSONDecoder() try: parsed = json.loads(cleaned) except json.JSONDecodeError: parsed = None for match in re.finditer(r"\{", cleaned): try: candidate, _ = decoder.raw_decode(cleaned[match.start() :]) except json.JSONDecodeError: continue if ( isinstance(candidate, dict) and candidate.get("success") is True and isinstance(candidate.get("columns"), list) and isinstance(candidate.get("data"), list) ): parsed = candidate break if parsed is None: raise ValueError("The AI response did not contain a complete JSON object.") if not isinstance(parsed, dict): raise ValueError("The AI response JSON must be an object.") if not isinstance(parsed.get("data"), list): raise ValueError("The AI response JSON must contain a data array.") if parsed["data"] == [ { "date": "DD/MM/YYYY", "description": "Full transaction narration exactly as shown", "voucher_type": "Payment or Receipt", "amount": "0.00", "closing": "0.00", } ]: raise ValueError("The AI returned the placeholder example row instead of extracted transactions.") return parsed def combine_batch_files(batch_files: list[Path], output_path: Path) -> dict[str, Any]: combined = {"success": True, "columns": REQUIRED_COLUMNS, "data": []} for batch_file in batch_files: parsed = json.loads(batch_file.read_text(encoding="utf-8")) rows = parsed.get("data", []) if not isinstance(rows, list): raise ValueError(f"{batch_file.name} does not contain a data array.") combined["data"].extend(rows) output_path.write_text( json.dumps(combined, ensure_ascii=False, indent=2), encoding="utf-8", ) return combined