|
|
import json |
|
|
import os |
|
|
from dotenv import load_dotenv |
|
|
from openai import OpenAI |
|
|
from tqdm import tqdm |
|
|
from scripts.regulatory_change_foundation import CONTEXT_CATEGORIES |
|
|
from scripts.utility_functions import call_nlp_service, render_prompt |
|
|
|
|
|
|
|
|
|
|
|
# Load environment variables from a local .env file (if present) so the
# OpenAI API key can live outside source control.
load_dotenv()


# None if the variable is unset; the OpenAI client will then fail at call time.
api_key = os.getenv("OPENAI_API_KEY")


# Module-level client shared by all classification calls in this file.
openai_client = OpenAI(api_key=api_key)
|
|
|
|
|
|
|
|
def preprocess_text_with_nlp(text, max_chunk_size=512, overlap=50):
    """Run PyMuPDF-style NLP preprocessing on *text* via the shared NLP service.

    NOTE(review): ``max_chunk_size`` and ``overlap`` are accepted for interface
    compatibility but are NOT forwarded in the service payload — confirm
    whether the ``preprocess_text_with_nlp_pymupdf`` endpoint applies its own
    defaults before relying on these arguments.
    """
    payload = {"text": text}
    return call_nlp_service(payload, "preprocess_text_with_nlp_pymupdf")
|
|
|
|
|
|
|
|
def create_prompt_with_nlp(chunk, preprocessed_data):
    """Build the NLP-enriched classification prompt for a single text chunk."""
    return render_prompt(
        chunk,
        include_nlp=True,
        preprocessed_data=preprocessed_data,
    )
|
|
|
|
|
|
|
|
def classify_changes_with_nlp(text_content, location_info):
    """Classify regulatory changes in *text_content* using NLP-enriched prompts.

    The text is preprocessed through the NLP service, split into model-sized
    chunks, and each chunk is sent to the chat model for change classification.

    Args:
        text_content: Full (hierarchy-aware) text of a document block.
        location_info: Dict describing where the block came from; attached to
            every positive classification result under ``"location"``.

    Returns:
        A list of parsed classification dicts (each augmented with
        ``"location"`` and ``"source_text"``), or ``None`` when no chunk
        reported changes.
    """
    preprocessed_data = preprocess_text_with_nlp(text_content)

    # Split into chunks via the shared NLP service. (Distinct name from the
    # per-chunk parse result below — the original reused `result` for both.)
    split_response = call_nlp_service(
        {"text": text_content}, "recursive_character_text_splitter"
    )
    chunks = split_response["chunks"]

    results = []
    for chunk in chunks:
        response = openai_client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {
                    "role": "system",
                    "content": "You are a legal expert analyzing German regulatory changes. Return only JSON.",
                },
                {
                    "role": "user",
                    "content": create_prompt_with_nlp(chunk, preprocessed_data),
                },
            ],
            temperature=0.7,
            max_tokens=1024,
        )

        # Models occasionally wrap the JSON payload in markdown code fences
        # despite the "Return only JSON" instruction; strip them first so the
        # parse below does not silently discard otherwise-valid answers.
        raw = response.choices[0].message.content.strip()
        if raw.startswith("```"):
            raw = raw.strip("`").strip()
            if raw.lower().startswith("json"):
                raw = raw[4:]

        # Keep the try body minimal: only json.loads can raise here.
        try:
            parsed = json.loads(raw)
        except json.JSONDecodeError:
            # Unparseable model output: skip this chunk rather than abort.
            continue

        if parsed.get("changes_detected", False):
            parsed["location"] = location_info
            parsed["source_text"] = chunk
            results.append(parsed)

    return results if results else None
|
|
|
|
|
|
|
|
def extract_hierarchical_text(block):
    """Return the block's text prefixed by its grandparent and parent text.

    Ancestors are joined oldest-first with blank lines in between; missing
    ancestors are simply omitted.
    """
    ancestor = block.get("parent")
    grand_ancestor = ancestor.get("parent") if ancestor else None

    parts = []
    if grand_ancestor is not None:
        parts.append(grand_ancestor["text"])
    if ancestor is not None:
        parts.append(ancestor["text"])
    parts.append(block["text"])

    return "\n\n".join(parts)
|
|
|
|
|
|
|
|
def traverse_blocks_with_nlp(blocks, parent=None, results=None, is_top_level=True):
    """Walk the block hierarchy depth-first, classifying leaf blocks with NLP.

    Args:
        blocks: List of block dicts; each may carry a "children" list.
        parent: The enclosing block (set on each child so that
            extract_hierarchical_text can walk up the hierarchy).
        results: Accumulator list, shared across the recursion.
        is_top_level: When True, wrap iteration in a tqdm progress bar.

    Returns:
        The accumulated list of detected-change dicts.
    """
    if results is None:
        results = []

    iterable = (
        tqdm(blocks, desc="Processing Text blocks with NLP") if is_top_level else blocks
    )

    for block in iterable:
        # Link each block to its parent so ancestor text can be collected.
        block["parent"] = parent

        # BUGFIX: the original accessed block["children"] unconditionally in
        # the non-leaf branch, raising KeyError for blocks without the key.
        # A missing "children" key is now treated the same as an empty one.
        children = block.get("children")
        if children:
            traverse_blocks_with_nlp(children, block, results, is_top_level=False)
            continue

        # Leaf block: classify its text (with ancestor context) for changes.
        text_content = extract_hierarchical_text(block)
        location_info = {
            "page_number": block["page_number"],
            "block_text": block["text"],
        }

        changes = classify_changes_with_nlp(text_content, location_info)
        if changes:
            for change in changes:
                change["full_text"] = text_content
                results.append(change)

    return results
|
|
|
|
|
|
|
|
def pymupdf_regulatory_change_detector_with_nlp_insights(hierarchical_structure):
    """Detect regulatory changes across a hierarchical document structure.

    Args:
        hierarchical_structure: Dict with a "blocks" list (PyMuPDF-derived
            hierarchy). Falsy input short-circuits to an error tuple.

    Returns:
        A 2-tuple of (summary dict with "analysis_summary" and
        "changes_by_page", raw results list). On missing input, returns
        ({"error": ...}, []).
    """
    if not hierarchical_structure:
        return {"error": "No structure provided"}, []

    analysis_summary = {
        "total_changes_detected": 0,
        "changes_by_type": {"addition": 0, "deletion": 0, "modification": 0},
    }
    changes_by_page = {}

    results = traverse_blocks_with_nlp(hierarchical_structure["blocks"])

    for change in results:
        # Model output is not guaranteed to contain "classifications";
        # treat a missing key as "no classifications" instead of crashing.
        classifications = change.get("classifications", [])
        analysis_summary["total_changes_detected"] += len(classifications)

        by_type = analysis_summary["changes_by_type"]
        for classification in classifications:
            # Count unknown change types from the model instead of raising
            # KeyError on anything outside the three pre-seeded keys.
            change_type = classification["change_type"]
            by_type[change_type] = by_type.get(change_type, 0) + 1

            change_subtype = (
                "context" if classification["change"] in CONTEXT_CATEGORIES else "scope"
            )
            page_num = change["location"]["page_number"]
            changes_by_page.setdefault(page_num, []).append(
                {
                    "change": classification["change"],
                    "change_type": classification["change_type"],
                    "change_subtype": change_subtype,
                    "relevant_text": classification["relevant_text"],
                    "explanation": classification["explanation"],
                    "nlp_evidence": classification["evidence"],
                }
            )

    return {
        "analysis_summary": analysis_summary,
        "changes_by_page": changes_by_page,
    }, results
|
|
|