Spaces:
Running
Running
File size: 5,288 Bytes
ce77033 d6b760c ce77033 d6b760c ce77033 d6b760c ce77033 d6b760c ce77033 d6b760c ce77033 d6b760c ce77033 d6b760c ce77033 d6b760c bdc7d9a d6b760c ce77033 d6b760c ce77033 d6b760c ce77033 d6b760c ce77033 d6b760c ce77033 d6b760c ce77033 d6b760c ce77033 bdc7d9a ce77033 bdc7d9a d6b760c bdc7d9a d6b760c ce77033 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 | import json
import os
import asyncio
from dotenv import load_dotenv
from openai import AsyncOpenAI
from scripts.regulatory_change_foundation import CONTEXT_CATEGORIES
from scripts.utility_functions import render_prompt
from scripts.pymupdf_nlp_preprocessing import extract_hierarchical_text
# Load environment variables from .env file
load_dotenv()
# NOTE: spaCy model loading is currently commented out in this module.
#nlp = spacy.load("de_core_news_sm")
# Read the API key from the environment; os.getenv returns None when
# OPENAI_API_KEY is not set.
api_key = os.getenv("OPENAI_API_KEY")
# Module-level async client used by classify_changes_without_nlp_insights below.
# timeout=60 is the request timeout handed to the client
# (presumably seconds, per the OpenAI SDK -- confirm against the SDK version in use).
openai_client = AsyncOpenAI(api_key=api_key, timeout=60)
def create_prompt_without_nlp_insights(text):
    """Build the user prompt for ``text`` with NLP insights switched off.

    Delegates to ``render_prompt`` with ``include_nlp=False`` and returns
    whatever that helper produces.
    """
    prompt = render_prompt(text, include_nlp=False)
    return prompt
async def classify_changes_without_nlp_insights(text_content, location_info):
    """Classify regulatory changes in one text chunk using the OpenAI chat API.

    The chunk is rendered into a prompt by
    ``create_prompt_without_nlp_insights`` and sent to the ``gpt-4o-mini``
    model through the module-level ``openai_client``. The reply is parsed as
    JSON.

    Args:
        text_content: Text of the chunk to analyse.
        location_info: Caller-supplied location metadata; stored unchanged
            under the ``"location"`` key of a positive result.

    Returns:
        The parsed JSON object, extended with ``"location"`` and
        ``"source_text"``, when its ``"changes_detected"`` entry is truthy.
        ``None`` when no change was detected, when the request failed, or
        when the reply was not a usable JSON object. Failures are logged
        rather than raised so that one bad chunk does not abort the caller.
    """
    # Function-scope import keeps this block self-contained.
    import logging

    log = logging.getLogger(__name__)

    try:
        response = await openai_client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {
                    "role": "system",
                    "content": "You are a legal expert specializing in analyzing German regulatory documents with a focus on identifying regulatory changes. Only return JSON output.",
                },
                {
                    "role": "user",
                    "content": create_prompt_without_nlp_insights(text_content),
                },
            ],
            temperature=0.7,
            max_tokens=1024,
        )
        result = json.loads(response.choices[0].message.content)
    except json.JSONDecodeError:
        log.warning("Model reply was not valid JSON; skipping chunk.", exc_info=True)
        return None
    except Exception:
        # Deliberate best-effort boundary: request errors, timeouts or an
        # empty reply must not take down the whole document analysis.
        log.exception("Change classification failed; skipping chunk.")
        return None

    # Valid JSON that is not an object (e.g. a list) cannot carry the
    # expected keys, so it is treated like any other unusable reply.
    if not isinstance(result, dict):
        log.warning("Model reply was JSON but not an object; skipping chunk.")
        return None

    if not result.get("changes_detected", False):
        return None

    result["location"] = location_info
    result["source_text"] = text_content
    return result
# Backward-compatible alias kept for callers that still use the "_async" name.
async def classify_changes_without_nlp_insights_async(text_content, location_info):
    """Forward to ``classify_changes_without_nlp_insights`` and return its result."""
    outcome = await classify_changes_without_nlp_insights(text_content, location_info)
    return outcome
async def traverse_blocks(blocks, parent=None):
    """Walk the block hierarchy and classify every leaf block concurrently.

    Each block dict is mutated in place: its ``"parent"`` entry is set to the
    enclosing block (``None`` at the top level). Blocks with children are
    descended into; blocks without children are treated as leaves, their
    hierarchical text is extracted and sent for classification.

    Args:
        blocks: Iterable of block dicts. Leaf blocks must provide
            ``"page_number"`` and ``"text"``.
        parent: The block that owns ``blocks``, or ``None`` for the root level.

    Returns:
        A flat list of classification results (one per leaf in which changes
        were detected), each extended with a ``"text"`` entry holding the
        hierarchical text that was analysed.
    """

    async def process_block(block, parent):
        block["parent"] = parent

        # A missing, None or empty "children" entry all mean "leaf". Using
        # .get() avoids a KeyError for blocks that omit the key entirely.
        children = block.get("children")
        if children:
            return await traverse_blocks(children, block)

        text_content = extract_hierarchical_text(block)
        location_info = {
            "page_number": block["page_number"],
            "block_text": block["text"],
        }
        changes = await classify_changes_without_nlp_insights(text_content, location_info)
        if not changes:
            return []

        # Attach the full hierarchical text to the result.
        changes["text"] = text_content
        return [changes]

    # Siblings are independent, so they are awaited together.
    per_block = await asyncio.gather(*(process_block(block, parent) for block in blocks))

    # process_block always returns a list; flatten one level.
    return [change for found in per_block for change in found]
def pymupdf_regulatory_change_detector_without_nlp_insights(hierarchical_structure, progress_callback=None, status_callback=None):
    """Detect regulatory changes in a hierarchical document structure.

    Runs ``traverse_blocks`` over ``hierarchical_structure["blocks"]`` in a
    fresh event loop (``asyncio.run``), then summarises the classifications
    by change type and groups them by page number.

    Args:
        hierarchical_structure: Dict with a ``"blocks"`` entry holding the
            top-level block list.
        progress_callback: Accepted for interface compatibility; not used by
            this function.
        status_callback: Optional callable invoked once with a status message
            before the analysis starts.

    Returns:
        ``{"error": "No hierarchical structure provided"}`` when
        ``hierarchical_structure`` is falsy. Otherwise a tuple
        ``(final_output, results)`` where ``final_output`` holds
        ``"analysis_summary"`` and ``"changes_by_page"`` and ``results`` is
        the raw list returned by ``traverse_blocks``. Note that the two
        return shapes differ; callers must check for the error dict.
    """
    if not hierarchical_structure:
        return {"error": "No hierarchical structure provided"}

    if status_callback:
        status_callback("Analyzing all document blocks concurrently...")

    # Run async processing
    results = asyncio.run(traverse_blocks(hierarchical_structure["blocks"]))

    changes_by_type = {"addition": 0, "deletion": 0, "modification": 0}
    total_changes = 0
    changes_by_page = {}

    for change in results:
        # The classifications come from model output; tolerate a missing or
        # null entry instead of raising.
        classifications = change.get("classifications") or []
        total_changes += len(classifications)

        for classification in classifications:
            change_type = classification["change_type"]
            # Count change types outside the three pre-seeded ones under
            # their own key rather than failing with KeyError.
            changes_by_type[change_type] = changes_by_type.get(change_type, 0) + 1

            # Group changes by page number
            page_number = change["location"]["page_number"]
            change_subtype = (
                "context" if classification["change"] in CONTEXT_CATEGORIES else "scope"
            )
            changes_by_page.setdefault(page_number, []).append(
                {
                    "change": classification["change"],
                    "change_type": classification["change_type"],
                    "change_subtype": change_subtype,
                    "relevant_text": classification["relevant_text"],
                    "text": change["text"],
                    "explanation": classification["explanation"],
                }
            )

    # Combine analysis summary and grouped changes
    final_output = {
        "analysis_summary": {
            "total_changes_detected": total_changes,
            "changes_by_type": changes_by_type,
        },
        "changes_by_page": changes_by_page,
    }
    return final_output, results
|