import copy
import glob
import json
import os
import tempfile

from agentic_doc.parse import parse

from scripts.pymupdf_nlp_preprocessing import classify_changes_with_nlp
from scripts.pymupdf_no_nlp_preprocessing import classify_changes_without_nlp_insights
|
|
|
|
def extract_document_agentic(
    uploaded_document_name: str,
    uploaded_document_bytes: bytes,
    do_nlp_preprocessing: bool = True,
    extraction_dir: str = "text_extractions/",
):
    """Extract a document, classify its chunks for changes, and flatten the results.

    Reuses a cached extraction JSON from *extraction_dir* when one exists for
    this document (matched by filename stem); otherwise calls the agentic-doc
    parse API. Either way the extraction is run through the chunk classifier
    so both paths return the same shape.

    Parameters
    ----------
    uploaded_document_name : str
        Original filename; its stem keys the cache lookup (``{stem}_*.json``).
    uploaded_document_bytes : bytes
        Raw document bytes handed to ``parse`` when no cache hit is found.
    do_nlp_preprocessing : bool, keyword-style flag
        When True, classify with ``classify_changes_with_nlp``; otherwise use
        ``classify_changes_without_nlp_insights``.
    extraction_dir : str
        Directory scanned for cached extraction files (created if missing).

    Returns
    -------
    tuple[list[dict], str]
        ``(flattened_change_subchunks, markdown_text)`` on success, or an
        ``{"status": "error", "error": str}`` dict when the API call fails.
    """
    os.makedirs(extraction_dir, exist_ok=True)

    document_name = os.path.splitext(uploaded_document_name)[0]

    # Look for a previously cached extraction for this document.
    existing_extraction_pattern = os.path.join(
        extraction_dir, f"{document_name}_*.json"
    )
    existing_files = glob.glob(existing_extraction_pattern)

    if existing_files:
        print(f"Extraction already exists for {document_name}, using existing file...")
        # BUG FIX: the cached branch used to return the raw JSON dict while the
        # fresh branch returned (changes, markdown). Cached extractions now run
        # through the same classification pipeline below. Also closes the file
        # handle (previously leaked via json.load(open(...))).
        with open(existing_files[0], "r", encoding="utf-8") as f:
            result = json.load(f)
    else:
        try:
            print(f"No existing extraction found for {document_name}, calling API...")
            result = json.loads(parse(uploaded_document_bytes)[0].model_dump_json())
            print(f"Successfully extracted {document_name}")
        except Exception as e:
            print(f"Error extracting {document_name}: {str(e)}")
            # Return the error marker immediately; the original fell through to
            # the flattening loop and raised KeyError on the missing "chunks".
            return {"status": "error", "error": str(e)}

    if not result or not isinstance(result.get("chunks"), list):
        # Nothing classifiable in the extraction — return an empty change set.
        return [], (result or {}).get("markdown", "")

    kept_chunks = []
    for chunk in result["chunks"]:
        if do_nlp_preprocessing:
            classification_result = classify_changes_with_nlp(chunk["text"], "")
            # The NLP classifier returns a list of per-segment results; merge
            # them into one dict matching the non-NLP classifier's shape.
            if classification_result:
                merged = {
                    "changes_detected": classification_result[0].get(
                        "changes_detected", False
                    ),
                    "classifications": [],
                }
                for class_res in classification_result:
                    if class_res.get("changes_detected", False):
                        merged["classifications"].extend(
                            class_res.get("classifications", [])
                        )
                classification_result = merged
        else:
            classification_result = classify_changes_without_nlp_insights(
                chunk["text"], ""
            )

        if not (
            classification_result
            and classification_result.get("changes_detected", False)
        ):
            # BUG FIX: the original called result["chunks"].remove(chunk) while
            # iterating that same list, which silently skips the chunk that
            # follows every removed one. We build a filtered list instead.
            continue

        chunk["subchunks"] = [
            {
                "text": sub.get("relevant_text", ""),
                "validated": False,
                "confirmed": False,
                "category": sub.get("change", ""),
                "type": sub.get("change_type", ""),
                "context": sub.get("explanation", ""),
            }
            for sub in classification_result.get("classifications", [])
        ]
        kept_chunks.append(chunk)
    result["chunks"] = kept_chunks

    # Flatten every subchunk, carrying its parent chunk's grounding/chunk_id.
    flattened_changes = []
    for chunk in result["chunks"]:
        for subchunk in chunk.get("subchunks", []):
            # BUG FIX: deep-copy the grounding — the original aliased one
            # mutable list across all sibling subchunks AND the chunk itself,
            # so setting ["line"] = -1 clobbered every holder at once.
            grounding = copy.deepcopy(chunk.get("grounding", []))
            if grounding:
                grounding[0]["line"] = -1  # -1 marks "line not yet resolved"
            subchunk["grounding"] = grounding
            subchunk["chunk_id"] = chunk["chunk_id"]
            flattened_changes.append(subchunk)
    return flattened_changes, result.get("markdown", "")