# regulens/scripts/text_extraction_landing_ai.py
# (Hugging Face upload artifact: uploaded by amougou-fortiss, "Upload 9 files", commit ce77033 verified)
import os
import json
import glob
from agentic_doc.parse import parse
from scripts.pymupdf_nlp_preprocessing import classify_changes_with_nlp
from scripts.pymupdf_no_nlp_preprocessing import classify_changes_without_nlp_insights
def extract_document_agentic(
    uploaded_document_name: str,
    uploaded_document_bytes: bytes,
    do_nlp_preprocessing: bool = True,
    extraction_dir: str = "text_extractions/",
):
    """Extract regulatory-change chunks from a document via the agentic parser.

    If a cached extraction JSON already exists in ``extraction_dir`` for this
    document name, it is loaded and returned as-is. Otherwise the raw bytes are
    parsed with ``agentic_doc.parse``, every chunk is classified for regulatory
    changes (with or without NLP preprocessing), chunks without detected
    changes are dropped, and a flattened list of change "subchunks" is returned
    together with the document markdown.

    Args:
        uploaded_document_name: Original file name; its stem keys the cache lookup.
        uploaded_document_bytes: Raw document bytes handed to the parser.
        do_nlp_preprocessing: When True, classify with NLP insights; otherwise
            use the plain classifier.
        extraction_dir: Directory scanned for cached ``<name>_*.json`` files.

    Returns:
        - Cache hit: the cached JSON payload (dict), unprocessed.
        - Parse error: ``{"status": "error", "error": <message>}``.
        - Fresh parse: ``(flattened_changes, markdown)`` tuple.
        NOTE(review): the three return shapes are inconsistent; callers must
        handle all of them. Preserved as-is for backward compatibility.
    """
    # Ensure extraction directory exists
    os.makedirs(extraction_dir, exist_ok=True)

    # Get the base document name (without extension)
    document_name = os.path.splitext(uploaded_document_name)[0]

    # Pattern to match existing extractions (e.g., "documentABC_*.json")
    existing_extraction_pattern = os.path.join(
        extraction_dir, f"{document_name}_*.json"
    )
    existing_files = glob.glob(existing_extraction_pattern)

    # Check if extraction already exists
    if existing_files:
        print(f"Extraction already exists for {document_name}, using existing file...")
        # Use the first matching file (assuming only one extraction per doc).
        # FIX: open via a context manager so the handle is always closed
        # (the original leaked the file object from a bare open()).
        with open(existing_files[0], "r", encoding="utf-8") as cache_file:
            return json.load(cache_file)

    try:
        print(f"No existing extraction found for {document_name}, calling API...")
        result = json.loads(parse(uploaded_document_bytes)[0].model_dump_json())
        print(f"Successfully extracted {document_name}")
    except Exception as e:
        print(f"Error extracting {document_name}: {str(e)}")
        return {"status": "error", "error": str(e)}

    if result and "chunks" in result and isinstance(result["chunks"], list):
        # FIX: the original called result["chunks"].remove(chunk) while
        # iterating the same list, which skips the element that follows every
        # removal. Build a filtered list instead and assign it back once.
        kept_chunks = []
        for chunk in result["chunks"]:
            classification_result = _classify_chunk(chunk["text"], do_nlp_preprocessing)
            if classification_result and classification_result.get(
                "changes_detected", False
            ):
                chunk["subchunks"] = [
                    {
                        "text": sub.get("relevant_text", ""),
                        "validated": False,
                        "confirmed": False,
                        "category": sub.get("change", ""),
                        "type": sub.get("change_type", ""),
                        "context": sub.get("explanation", ""),
                    }
                    for sub in classification_result.get("classifications", [])
                ]
                kept_chunks.append(chunk)
        result["chunks"] = kept_chunks

    # Create flattened list of subchunks for UI compatibility.
    # FIX: use .get() guards — the original indexed result["chunks"] directly
    # and raised KeyError on an unexpected payload shape.
    flattened_changes = []
    for chunk in result.get("chunks", []):
        for subchunk in chunk.get("subchunks", []):
            # NOTE(review): the grounding list is intentionally shared (not
            # copied) between the chunk and all of its subchunks, as in the
            # original; mutating one mutates them all.
            subchunk["grounding"] = chunk["grounding"]
            subchunk["grounding"][0]["line"] = -1
            subchunk["chunk_id"] = chunk["chunk_id"]
            flattened_changes.append(subchunk)
    return flattened_changes, result.get("markdown", "")


def _classify_chunk(text: str, do_nlp_preprocessing: bool):
    """Classify one chunk's text; normalize both classifier outputs to one dict.

    Returns a dict with ``changes_detected`` (bool) and ``classifications``
    (list), matching the shape produced by the non-NLP classifier.
    """
    if not do_nlp_preprocessing:
        return classify_changes_without_nlp_insights(text, "")

    classification_result = classify_changes_with_nlp(text, "")
    if not classification_result:
        return classification_result

    # The NLP classifier returns a list of per-insight results; flatten them
    # into a single element so downstream handling matches the non-NLP path.
    # NOTE(review): changes_detected is taken from the FIRST element only, as
    # in the original — if only a later element detects changes, the chunk is
    # still dropped. Confirm this is the intended contract of the classifier.
    flattened = {
        "changes_detected": classification_result[0].get("changes_detected", False),
        "classifications": [],
    }
    for class_res in classification_result:
        if class_res.get("changes_detected", False):
            flattened["classifications"].extend(
                class_res.get("classifications", [])
            )
    return flattened