regulens / scripts /pymupdf_nlp_preprocessing.py
Maximilian Amougou
Upload 7 files
09a324c verified
import json
import os
import asyncio
from dotenv import load_dotenv
from openai import AsyncOpenAI
from scripts.regulatory_change_foundation import CONTEXT_CATEGORIES
from scripts.utility_functions import call_nlp_service, render_prompt
# Load environment variables from the .env file so OPENAI_API_KEY can be read below.
load_dotenv()
# os.getenv returns None when OPENAI_API_KEY is not set; the value is handed to
# the client unchanged (no validation happens here).
api_key = os.getenv("OPENAI_API_KEY")
# Single module-level async client shared by every completion request in this module.
# NOTE(review): timeout=60 is presumably in seconds — confirm against the openai client docs.
openai_client = AsyncOpenAI(api_key=api_key, timeout=60)
async def preprocess_text_with_nlp(text, max_chunk_size=512, overlap=50):
    """Run the NLP service's preprocessing step on PyMuPDF-extracted text.

    The text is sent to the NLP service under the operation name
    ``preprocess_text_with_nlp_pymupdf`` and the service's answer is returned
    unchanged.

    Args:
        text: Text to preprocess.
        max_chunk_size: Kept in the signature; not used by this function.
        overlap: Kept in the signature; not used by this function.

    Returns:
        Whatever ``call_nlp_service`` returns for this operation.
    """
    payload = {"text": text}
    operation = "preprocess_text_with_nlp_pymupdf"
    return await call_nlp_service(payload, operation)
def create_prompt_with_nlp(chunk, preprocessed_data):
    """Render the prompt for one chunk with the NLP preprocessing data included."""
    prompt = render_prompt(
        chunk,
        include_nlp=True,
        preprocessed_data=preprocessed_data,
    )
    return prompt
async def classify_changes_with_nlp(text_content, location_info):
    """Classify regulatory changes in a text with NLP preprocessing and an LLM.

    The text is preprocessed once, split into chunks by the NLP service, and
    every chunk is sent concurrently to the chat-completion model. Chunks whose
    reply cannot be obtained or parsed are skipped (and logged) rather than
    aborting the whole classification.

    Args:
        text_content: Text to analyse.
        location_info: Location metadata attached unchanged to every returned
            result under the ``"location"`` key.

    Returns:
        A list of parsed model replies (dicts) for the chunks where the model
        reported ``changes_detected``; each has ``"location"`` and
        ``"source_text"`` added. ``None`` when no chunk produced such a reply.

    Raises:
        KeyError: If the splitter response has no ``"chunks"`` entry.
        Exception: Anything raised by the preprocessing or splitting calls.
    """
    import logging  # local import keeps this function self-contained

    logger = logging.getLogger(__name__)

    # Preprocess once; the same data is embedded in every chunk's prompt.
    preprocessed_data = await preprocess_text_with_nlp(text_content)

    # Chunking is delegated to the NLP service.
    split_response = await call_nlp_service(
        {"text": text_content}, "recursive_character_text_splitter"
    )
    chunks = split_response["chunks"]

    async def process_chunk(chunk):
        """Classify one chunk; return the enriched reply or None."""
        try:
            response = await openai_client.chat.completions.create(
                model="gpt-4o-mini",
                messages=[
                    {
                        "role": "system",
                        "content": "You are a legal expert analyzing German regulatory changes. Return only JSON.",
                    },
                    {
                        "role": "user",
                        "content": create_prompt_with_nlp(chunk, preprocessed_data),
                    },
                ],
                temperature=0.7,
                max_tokens=1024,
            )
            parsed = json.loads(response.choices[0].message.content)
            if not parsed.get("changes_detected", False):
                return None
            parsed["location"] = location_info
            parsed["source_text"] = chunk
            return parsed
        except json.JSONDecodeError as exc:
            # The model was asked for JSON only but replied with something else.
            logger.warning("Skipping chunk: model reply is not valid JSON (%s)", exc)
            return None
        except Exception:
            # Best-effort by design: one failed chunk must not sink the others,
            # but the failure is recorded instead of vanishing silently.
            logger.exception("Skipping chunk: classification failed")
            return None

    outcomes = await asyncio.gather(*(process_chunk(chunk) for chunk in chunks))
    detected = [outcome for outcome in outcomes if outcome is not None]
    return detected or None
# Backward-compatibility alias: simply delegates to classify_changes_with_nlp
async def classify_changes_with_nlp_async(text_content, location_info):
    """Await ``classify_changes_with_nlp`` with the same arguments and return its result."""
    outcome = await classify_changes_with_nlp(text_content, location_info)
    return outcome
def extract_hierarchical_text(block):
    """Return the block's text preceded by its grandparent's and parent's text.

    Ancestors are read from the ``"parent"`` key; a missing key or a ``None``
    value means there is no ancestor at that level. The collected texts are
    ordered grandparent, parent, block and separated by a blank line.
    """
    parent = block.get("parent")
    grandparent = parent.get("parent") if parent is not None else None

    # Outermost ancestor first, the block itself last.
    lineage = [node for node in (grandparent, parent) if node is not None]
    lineage.append(block)

    return "\n\n".join([node["text"] for node in lineage])
async def traverse_blocks_with_nlp(blocks, parent=None):
    """Walk a block hierarchy and classify every leaf block concurrently.

    Each visited block gets a ``"parent"`` key pointing at its parent block
    (``None`` at the top level); this mutates the input and is what
    ``extract_hierarchical_text`` reads to build the leaf's context. Blocks
    with a non-empty ``"children"`` list are descended into; blocks whose
    ``"children"`` is empty or absent are treated as leaves and classified.

    Args:
        blocks: Iterable of block dicts; leaves must provide ``"text"`` and
            ``"page_number"``.
        parent: Parent block of ``blocks``, or ``None`` for the top level.

    Returns:
        A flat list of change dicts from all leaves below ``blocks``; each has
        ``"full_text"`` set to the hierarchical text that was classified.
    """

    async def process_block(block, parent):
        """Return the list of changes found in ``block`` or its descendants."""
        block["parent"] = parent

        # .get() so that a block without a "children" key is handled as a leaf
        # instead of raising KeyError when trying to recurse into it.
        children = block.get("children")
        if children:
            return await traverse_blocks_with_nlp(children, block)

        text_content = extract_hierarchical_text(block)
        location_info = {
            "page_number": block["page_number"],
            "block_text": block["text"],
        }
        changes = await classify_changes_with_nlp(text_content, location_info)
        if not changes:
            return []
        for change in changes:
            change["full_text"] = text_content
        return changes

    # Process all sibling blocks concurrently; gather preserves input order.
    per_block = await asyncio.gather(*(process_block(block, parent) for block in blocks))

    # Flatten the per-block lists into one list of changes.
    return [change for block_changes in per_block for change in block_changes]
def pymupdf_regulatory_change_detector_with_nlp_insights(
    hierarchical_structure, progress_callback=None, status_callback=None
):
    """Detect regulatory changes in a block hierarchy and summarise them.

    Runs the asynchronous traversal to completion with ``asyncio.run`` and
    aggregates the classifications it returns into totals and a per-page
    listing. The classifications come from model output, so malformed entries
    are skipped with a warning instead of aborting the aggregation.

    Args:
        hierarchical_structure: Dict with a ``"blocks"`` list describing the
            document hierarchy.
        progress_callback: Accepted for interface compatibility; not used by
            this function.
        status_callback: Optional callable invoked once with a status message
            before processing starts.

    Returns:
        A ``(report, results)`` tuple. ``report`` holds ``"analysis_summary"``
        (total count and counts per change type) and ``"changes_by_page"``
        (page number -> list of change entries); ``results`` is the raw list
        returned by the traversal. For a falsy ``hierarchical_structure`` the
        tuple is ``({"error": "No structure provided"}, [])``.

    Raises:
        KeyError: If ``hierarchical_structure`` has no ``"blocks"`` key.
    """
    import logging  # local import keeps this function self-contained

    logger = logging.getLogger(__name__)

    if not hierarchical_structure:
        return {"error": "No structure provided"}, []

    changes_by_type = {"addition": 0, "deletion": 0, "modification": 0}
    analysis_summary = {
        "total_changes_detected": 0,
        "changes_by_type": changes_by_type,
    }
    changes_by_page = {}

    if status_callback:
        status_callback("Analyzing all document blocks concurrently with NLP...")

    # Run async processing
    results = asyncio.run(traverse_blocks_with_nlp(hierarchical_structure["blocks"]))

    for change in results:
        # A reply may flag changes yet omit (or null) the classification list.
        classifications = change.get("classifications") or []
        if not classifications:
            continue

        page_num = change["location"]["page_number"]
        for classification in classifications:
            try:
                category = classification["change"]
                change_type = classification["change_type"]
                entry = {
                    "change": category,
                    "change_type": change_type,
                    "change_subtype": (
                        "context" if category in CONTEXT_CATEGORIES else "scope"
                    ),
                    # Descriptive fields are optional: a detected change is
                    # still reported when the model left one of them out.
                    "relevant_text": classification.get("relevant_text"),
                    "explanation": classification.get("explanation"),
                    "nlp_evidence": classification.get("evidence"),
                }
                # Count change types outside the three predefined ones too,
                # rather than failing with KeyError on unexpected model output.
                changes_by_type[change_type] = changes_by_type.get(change_type, 0) + 1
            except (KeyError, TypeError, AttributeError):
                logger.warning(
                    "Skipping malformed classification on page %s: %r",
                    page_num,
                    classification,
                )
                continue

            analysis_summary["total_changes_detected"] += 1
            changes_by_page.setdefault(page_num, []).append(entry)

    return {
        "analysis_summary": analysis_summary,
        "changes_by_page": changes_by_page,
    }, results