import json
import os
from dotenv import load_dotenv
from openai import OpenAI
from tqdm import tqdm
from scripts.regulatory_change_foundation import CONTEXT_CATEGORIES
from scripts.utility_functions import render_prompt
from scripts.pymupdf_nlp_preprocessing import extract_hierarchical_text
# Load environment variables from .env file
load_dotenv()
#nlp = spacy.load("de_core_news_sm")
api_key = os.getenv("OPENAI_API_KEY")
openai_client = OpenAI(api_key=api_key)
def create_prompt_without_nlp_insights(text):
    """Build the change-classification prompt for *text*.

    Thin wrapper around ``render_prompt`` with NLP insights disabled
    (``include_nlp=False``); the NLP-enabled variant presumably lives
    elsewhere in the project.
    """
    return render_prompt(text, include_nlp=False)
def classify_changes_without_nlp_insights(text_content, location_info):
    """Classify regulatory changes in a text chunk using the OpenAI API.

    Args:
        text_content: Hierarchical text of a leaf block to analyze.
        location_info: Dict describing where the text came from (page
            number, block text); attached to the result for traceability.

    Returns:
        The parsed model output dict, augmented with ``location`` and
        ``source_text``, when the model reports ``changes_detected``;
        ``None`` on no changes, empty output, or unparsable output.
    """
    response = openai_client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {
                "role": "system",
                "content": "You are a legal expert specializing in analyzing German regulatory documents with a focus on identifying regulatory changes. Only return JSON output.",
            },
            {
                "role": "user",
                "content": create_prompt_without_nlp_insights(text_content),
            },
        ],
        temperature=0.7,
        max_tokens=1024,
    )
    content = response.choices[0].message.content
    # The API may return None content (refusal, truncation). The original
    # passed it straight to json.loads, raising a TypeError that the
    # JSONDecodeError handler did not catch.
    if not content:
        return None
    # Models sometimes wrap the payload in a ```json ... ``` fence despite
    # the system instruction; strip it so parsing still succeeds.
    content = content.strip()
    if content.startswith("```"):
        content = content.strip("`").strip()
        if content.startswith("json"):
            content = content[len("json"):].strip()
    try:
        result = json.loads(content)
    except json.JSONDecodeError:
        return None
    if result.get("changes_detected", False):
        result["location"] = location_info
        result["source_text"] = text_content
        return result
    return None
def traverse_blocks(
    blocks, parent=None, grandparent=None, results=None, is_top_level=True
):
    """Traverse the hierarchical structure depth-first and analyze leaf nodes.

    Args:
        blocks: List of block dicts; each may carry a ``children`` list.
        parent: The block that contains *blocks* (attached to each block
            as ``block["parent"]`` for context tracking).
        grandparent: Parent of *parent*; currently only threaded through
            the recursion, not read.
        results: Accumulator list; created on the top-level call.
        is_top_level: When True, wrap the iteration in a tqdm progress bar.

    Returns:
        The ``results`` list of change dicts collected from all leaf nodes.
    """
    if results is None:
        results = []
    iterable = (
        tqdm(blocks, desc="Processing Text blocks with NLP") if is_top_level else blocks
    )
    for block in iterable:
        # Record the parent reference so downstream code can walk upward.
        block["parent"] = parent
        children = block.get("children")
        if not children:
            # Leaf node: no children, or an empty/missing "children" key.
            # The original only accepted an *empty* "children" key and
            # crashed with KeyError on blocks lacking the key entirely.
            text_content = extract_hierarchical_text(block)
            location_info = {
                "page_number": block["page_number"],
                "block_text": block["text"],
            }
            changes = classify_changes_without_nlp_insights(text_content, location_info)
            if changes:
                # Keep the full hierarchical text alongside the result.
                changes["text"] = text_content
                results.append(changes)
        else:
            traverse_blocks(children, block, parent, results, is_top_level=False)
    return results
def pymupdf_regulatory_change_detector_without_nlp_insights(hierarchical_structure):
    """Detect regulatory changes across a hierarchical document structure.

    Args:
        hierarchical_structure: Dict with a ``"blocks"`` list as produced
            by the PyMuPDF preprocessing step.

    Returns:
        A ``(final_output, results)`` tuple, where ``final_output`` holds
        the aggregated ``analysis_summary`` and ``changes_by_page``, and
        ``results`` is the raw list of per-block change dicts.
        On empty input, a single ``{"error": ...}`` dict is returned
        instead — NOTE(review): this inconsistent return shape is kept
        for backward compatibility; callers must check for it.
    """
    if not hierarchical_structure:
        return {"error": "No hierarchical structure provided"}
    analysis_summary = {
        "total_changes_detected": 0,
        "changes_by_type": {"addition": 0, "deletion": 0, "modification": 0},
    }
    changes_by_page = {}
    # Traverse the blocks and analyze leaf nodes.
    results = traverse_blocks(hierarchical_structure["blocks"])
    for change in results:
        # The classifications come from LLM output — tolerate a missing
        # key instead of raising KeyError on a malformed result.
        classifications = change.get("classifications", [])
        analysis_summary["total_changes_detected"] += len(classifications)
        page_number = change["location"]["page_number"]
        by_type = analysis_summary["changes_by_type"]
        for classification in classifications:
            change_type = classification["change_type"]
            # Count unexpected change types too rather than crashing on
            # a value outside {addition, deletion, modification}.
            by_type[change_type] = by_type.get(change_type, 0) + 1
            change_subtype = (
                "context" if classification["change"] in CONTEXT_CATEGORIES else "scope"
            )
            # Group changes by page number.
            changes_by_page.setdefault(page_number, []).append(
                {
                    "change": classification["change"],
                    "change_type": classification["change_type"],
                    "change_subtype": change_subtype,
                    "relevant_text": classification["relevant_text"],
                    "text": change["text"],
                    "explanation": classification["explanation"],
                }
            )
    # Combine analysis summary and grouped changes.
    final_output = {
        "analysis_summary": analysis_summary,
        "changes_by_page": changes_by_page,
    }
    return final_output, results