"""Detect regulatory changes in a hierarchical document structure using OpenAI.

Leaf blocks of the hierarchy are sent to the chat-completions API for
classification; the results are aggregated into a summary and grouped by page.
"""

import asyncio
import json
import logging
import os

from dotenv import load_dotenv
from openai import AsyncOpenAI

from scripts.pymupdf_nlp_preprocessing import extract_hierarchical_text
from scripts.regulatory_change_foundation import CONTEXT_CATEGORIES
from scripts.utility_functions import render_prompt

logger = logging.getLogger(__name__)

# Load environment variables from .env file
load_dotenv()

api_key = os.getenv("OPENAI_API_KEY")
openai_client = AsyncOpenAI(api_key=api_key, timeout=60)

# Keys every classification entry must carry to be included in the output.
_REQUIRED_CLASSIFICATION_KEYS = ("change", "change_type", "relevant_text", "explanation")


def create_prompt_without_nlp_insights(text):
    """Render the user prompt for *text* with NLP insights disabled."""
    return render_prompt(text, include_nlp=False)


async def classify_changes_without_nlp_insights(text_content, location_info):
    """Classify changes in a text chunk using OpenAI.

    Args:
        text_content: The text sent to the model for classification.
        location_info: Metadata stored under ``"location"`` on a positive result.

    Returns:
        The parsed JSON object from the model, extended with ``"location"`` and
        ``"source_text"``, when it reports ``"changes_detected"``; otherwise
        ``None``. Failures (request errors, invalid JSON, non-object JSON) are
        logged and also yield ``None`` so one bad chunk does not abort the run.
    """
    try:
        response = await openai_client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {
                    "role": "system",
                    "content": "You are a legal expert specializing in analyzing German regulatory documents with a focus on identifying regulatory changes. Only return JSON output.",
                },
                {
                    "role": "user",
                    "content": create_prompt_without_nlp_insights(text_content),
                },
            ],
            temperature=0.7,
            max_tokens=1024,
        )
        result = json.loads(response.choices[0].message.content)
    except json.JSONDecodeError as exc:
        logger.warning("Model response was not valid JSON (%s); skipping chunk at %s", exc, location_info)
        return None
    except Exception:
        # Best-effort by design: keep processing other chunks, but leave a trace.
        logger.exception("Change classification failed for chunk at %s", location_info)
        return None

    if not isinstance(result, dict):
        logger.warning("Model returned non-object JSON; skipping chunk at %s", location_info)
        return None
    if not result.get("changes_detected", False):
        return None

    result["location"] = location_info
    result["source_text"] = text_content
    return result


# Async wrapper for backward compatibility
async def classify_changes_without_nlp_insights_async(text_content, location_info):
    """Delegate to :func:`classify_changes_without_nlp_insights` unchanged."""
    return await classify_changes_without_nlp_insights(text_content, location_info)


async def traverse_blocks(blocks, parent=None):
    """Traverse the hierarchical structure and analyze leaf nodes concurrently.

    Every block is mutated in place: its ``"parent"`` key is set to the
    enclosing block (``None`` at the top level). A block whose ``"children"``
    entry is missing, ``None`` or empty is treated as a leaf and classified;
    any other block is descended into recursively.

    Args:
        blocks: Iterable of block dicts at the current level.
        parent: The block that contains ``blocks``, or ``None`` for the root.

    Returns:
        A flat list of classification results for all leaves below ``blocks``
        in which changes were detected.
    """

    async def process_block(block, parent):
        block["parent"] = parent

        children = block.get("children")
        if children:
            # Inner node: process children recursively
            return await traverse_blocks(children, block)

        # Leaf node: extract hierarchical text and record where it came from
        text_content = extract_hierarchical_text(block)
        location_info = {
            "page_number": block["page_number"],
            "block_text": block["text"],
        }

        # Analyze the text for changes
        changes = await classify_changes_without_nlp_insights(text_content, location_info)
        if not changes:
            return []

        # Add the full hierarchical text to the result
        changes["text"] = text_content
        return [changes]

    # Process all blocks concurrently
    nested = await asyncio.gather(*(process_block(block, parent) for block in blocks))

    # Flatten results
    return [item for group in nested for item in group]


def pymupdf_regulatory_change_detector_without_nlp_insights(hierarchical_structure, progress_callback=None, status_callback=None):
    """Main function to detect regulatory changes in the hierarchical structure.

    Args:
        hierarchical_structure: Mapping with a ``"blocks"`` entry holding the
            top-level blocks to analyze.
        progress_callback: Accepted for interface compatibility; not used here.
        status_callback: Optional callable invoked once with a status message
            before analysis starts.

    Returns:
        ``{"error": ...}`` when ``hierarchical_structure`` is empty. Otherwise a
        tuple ``(final_output, results)`` where ``final_output`` holds
        ``"analysis_summary"`` and ``"changes_by_page"`` and ``results`` is the
        raw list returned by :func:`traverse_blocks`.

    Note:
        Uses ``asyncio.run``, so it must not be called from a running event loop.
    """
    if not hierarchical_structure:
        return {"error": "No hierarchical structure provided"}

    analysis_summary = {
        "total_changes_detected": 0,
        "changes_by_type": {"addition": 0, "deletion": 0, "modification": 0},
    }
    changes_by_page = {}
    type_counts = analysis_summary["changes_by_type"]

    if status_callback:
        status_callback("Analyzing all document blocks concurrently...")

    # Run async processing
    results = asyncio.run(traverse_blocks(hierarchical_structure["blocks"]))

    # Update analysis summary and group changes by page number
    for change in results:
        page_number = change["location"]["page_number"]

        # The model output is untrusted: tolerate a missing/None list.
        for classification in change.get("classifications") or []:
            if not isinstance(classification, dict) or any(
                key not in classification for key in _REQUIRED_CLASSIFICATION_KEYS
            ):
                logger.warning("Skipping malformed classification on page %s: %r", page_number, classification)
                continue

            change_type = classification["change_type"]
            analysis_summary["total_changes_detected"] += 1
            # Unknown change types are counted under their own key instead of crashing.
            type_counts[change_type] = type_counts.get(change_type, 0) + 1

            change_subtype = (
                "context" if classification["change"] in CONTEXT_CATEGORIES else "scope"
            )
            changes_by_page.setdefault(page_number, []).append(
                {
                    "change": classification["change"],
                    "change_type": change_type,
                    "change_subtype": change_subtype,
                    "relevant_text": classification["relevant_text"],
                    "text": change["text"],
                    "explanation": classification["explanation"],
                }
            )

    # Combine analysis summary and grouped changes
    final_output = {
        "analysis_summary": analysis_summary,
        "changes_by_page": changes_by_page,
    }

    return final_output, results