regulens / scripts /pymupdf_no_nlp_preprocessing.py
Maximilian Amougou
Upload 6 files
d6b760c verified
import json
import os
import asyncio
from dotenv import load_dotenv
from openai import AsyncOpenAI
from scripts.regulatory_change_foundation import CONTEXT_CATEGORIES
from scripts.utility_functions import render_prompt
from scripts.pymupdf_nlp_preprocessing import extract_hierarchical_text
# Load environment variables from .env file
load_dotenv()
#nlp = spacy.load("de_core_news_sm")
api_key = os.getenv("OPENAI_API_KEY")
openai_client = AsyncOpenAI(api_key=api_key, timeout=60)
def create_prompt_without_nlp_insights(text):
return render_prompt(text, include_nlp=False)
async def classify_changes_without_nlp_insights(text_content, location_info):
"""Classify changes in text chunks using OpenAI."""
try:
response = await openai_client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{
"role": "system",
"content": "You are a legal expert specializing in analyzing German regulatory documents with a focus on identifying regulatory changes. Only return JSON output.",
},
{
"role": "user",
"content": create_prompt_without_nlp_insights(text_content),
},
],
temperature=0.7,
max_tokens=1024,
)
result = json.loads(response.choices[0].message.content)
if result.get("changes_detected", False):
result["location"] = location_info
result["source_text"] = text_content
return result
except (json.JSONDecodeError, Exception):
pass
return None
# Async wrapper for backward compatibility
async def classify_changes_without_nlp_insights_async(text_content, location_info):
return await classify_changes_without_nlp_insights(text_content, location_info)
async def traverse_blocks(blocks, parent=None):
"""Traverse the hierarchical structure and analyze leaf nodes using asyncio.gather()."""
async def process_block(block, parent):
block["parent"] = parent
if "children" in block and (not block["children"] or len(block["children"]) == 0): # Leaf node
# Extract hierarchical text
text_content = extract_hierarchical_text(block)
# Define location info
location_info = {
"page_number": block["page_number"],
"block_text": block["text"],
}
# Analyze the text for changes
changes = await classify_changes_without_nlp_insights(text_content, location_info)
if changes:
# Add the full hierarchical text to the result
changes["text"] = text_content
return [changes]
else:
# Process children recursively
return await traverse_blocks(block["children"], block)
return []
# Process all blocks concurrently
tasks = [process_block(block, parent) for block in blocks]
results = await asyncio.gather(*tasks)
# Flatten results
flattened = []
for result in results:
if isinstance(result, list):
flattened.extend(result)
return flattened
def pymupdf_regulatory_change_detector_without_nlp_insights(hierarchical_structure, progress_callback=None, status_callback=None):
"""Main function to detect regulatory changes in the hierarchical structure."""
if not hierarchical_structure:
return {"error": "No hierarchical structure provided"}
analysis_summary = {
"total_changes_detected": 0,
"changes_by_type": {"addition": 0, "deletion": 0, "modification": 0},
}
changes_by_page = {}
if status_callback:
status_callback("Analyzing all document blocks concurrently...")
# Run async processing
results = asyncio.run(traverse_blocks(hierarchical_structure["blocks"]))
# Update analysis summary
for change in results:
analysis_summary["total_changes_detected"] += len(change["classifications"])
for classification in change["classifications"]:
change_type = classification["change_type"]
analysis_summary["changes_by_type"][change_type] += 1
# Group changes by page number
page_number = change["location"]["page_number"]
if page_number not in changes_by_page:
changes_by_page[page_number] = []
change_subtype = (
"context" if classification["change"] in CONTEXT_CATEGORIES else "scope"
)
changes_by_page[page_number].append(
{
"change": classification["change"],
"change_type": classification["change_type"],
"change_subtype": change_subtype,
"relevant_text": classification["relevant_text"],
"text": change["text"],
"explanation": classification["explanation"],
}
)
# Combine analysis summary and grouped changes
final_output = {
"analysis_summary": analysis_summary,
"changes_by_page": changes_by_page,
}
return final_output, results