"""Detect and classify German regulatory changes in a PDF's block hierarchy.

Each leaf block (together with its parent and grandparent text as context) is
sent through a remote NLP service for preprocessing and chunking, and every
chunk is classified by an OpenAI chat model that is asked to return JSON.
"""

import asyncio
import json
import logging
import os

from dotenv import load_dotenv
from openai import AsyncOpenAI

from scripts.regulatory_change_foundation import CONTEXT_CATEGORIES
from scripts.utility_functions import call_nlp_service, render_prompt

logger = logging.getLogger(__name__)

# Load environment variables (including OPENAI_API_KEY) from a .env file.
load_dotenv()

api_key = os.getenv("OPENAI_API_KEY")
# NOTE(review): this client is created once at import time and reused by every
# asyncio.run() call made by the detector below. Reusing an async HTTP client
# across separate event loops may fail on later runs in the same process —
# confirm, and consider creating the client per run if that happens.
openai_client = AsyncOpenAI(api_key=api_key, timeout=60)

_MODEL = "gpt-4o-mini"
_SYSTEM_PROMPT = (
    "You are a legal expert analyzing German regulatory changes. "
    "Return only JSON."
)


async def preprocess_text_with_nlp(text, max_chunk_size=512, overlap=50):
    """Run the remote NLP preprocessing pipeline on *text*.

    Delegates to the ``preprocess_text_with_nlp_pymupdf`` endpoint of the NLP
    service and returns whatever that service returns.

    Args:
        text: Raw text extracted from the document.
        max_chunk_size: Currently unused; kept for backward compatibility.
        overlap: Currently unused; kept for backward compatibility.
    """
    return await call_nlp_service({"text": text}, "preprocess_text_with_nlp_pymupdf")


def create_prompt_with_nlp(chunk, preprocessed_data):
    """Render the user prompt for *chunk*, including the NLP preprocessing output."""
    return render_prompt(chunk, include_nlp=True, preprocessed_data=preprocessed_data)


async def _classify_chunk(chunk, preprocessed_data, location_info):
    """Classify one text chunk with the LLM.

    Returns the parsed JSON object, annotated with ``location`` and
    ``source_text``, when the model reports ``changes_detected``. Returns
    ``None`` when no change is detected or the request/response is unusable.
    Failures are logged instead of silently discarded, so that a failed
    request is distinguishable from "no change found".
    """
    try:
        response = await openai_client.chat.completions.create(
            model=_MODEL,
            messages=[
                {"role": "system", "content": _SYSTEM_PROMPT},
                {
                    "role": "user",
                    "content": create_prompt_with_nlp(chunk, preprocessed_data),
                },
            ],
            temperature=0.7,
            max_tokens=1024,
        )
        content = response.choices[0].message.content
    except Exception:
        # Best effort: one failed chunk must not abort the whole document,
        # but the failure must be visible.
        logger.exception("LLM classification request failed for %.200r", location_info)
        return None

    if content is None:
        logger.warning("LLM returned no content for %.200r", location_info)
        return None

    try:
        result = json.loads(content)
    except json.JSONDecodeError:
        logger.warning(
            "LLM returned invalid JSON for %.200r: %.200r", location_info, content
        )
        return None

    # Only chunks the model flags as changed are kept; everything downstream
    # relies on "location" being present on every returned result.
    if not isinstance(result, dict) or not result.get("changes_detected", False):
        return None

    result["location"] = location_info
    result["source_text"] = chunk
    return result


async def classify_changes_with_nlp(text_content, location_info):
    """Classify regulatory changes in *text_content* with NLP preprocessing.

    The text is preprocessed and split into chunks by the NLP service, and
    these two calls run concurrently. Every chunk is then classified by the
    LLM concurrently.

    Args:
        text_content: Text to analyse (a leaf block plus its ancestor context).
        location_info: Dict attached as ``"location"`` to every detected change.

    Returns:
        A non-empty list of per-chunk result dicts for chunks where changes
        were detected, or ``None`` if no chunk reported a change.
    """
    preprocessed_data, split_result = await asyncio.gather(
        preprocess_text_with_nlp(text_content),
        call_nlp_service({"text": text_content}, "recursive_character_text_splitter"),
    )
    chunks = split_result["chunks"]

    results = await asyncio.gather(
        *(_classify_chunk(chunk, preprocessed_data, location_info) for chunk in chunks)
    )
    detected = [result for result in results if result is not None]
    return detected or None


# Async wrapper for backward compatibility
async def classify_changes_with_nlp_async(text_content, location_info):
    """Alias of :func:`classify_changes_with_nlp`, kept for older callers."""
    return await classify_changes_with_nlp(text_content, location_info)


def extract_hierarchical_text(block):
    """Return the grandparent, parent and block texts joined by blank lines.

    Ancestors are read from the ``"parent"`` links set by
    :func:`traverse_blocks_with_nlp`. A missing or ``None`` ancestor is skipped.
    """
    parent = block.get("parent")
    grandparent = parent.get("parent") if parent is not None else None

    text_parts = [
        ancestor["text"] for ancestor in (grandparent, parent) if ancestor is not None
    ]
    text_parts.append(block["text"])
    return "\n\n".join(text_parts)


async def traverse_blocks_with_nlp(blocks, parent=None):
    """Classify every leaf block under *blocks* concurrently.

    A block whose ``"children"`` is missing, ``None`` or empty is treated as a
    leaf. Its text, with ancestor context, is classified. Other blocks are
    recursed into.

    Side effect: sets ``block["parent"]`` on every visited block. This makes
    the caller's structure cyclic, so it is no longer directly JSON-serializable.

    Returns:
        A flat list of detected-change dicts. Each carries ``"full_text"``,
        the hierarchical text that was analysed.
    """

    async def process_block(block):
        block["parent"] = parent
        children = block.get("children")
        if children:
            return await traverse_blocks_with_nlp(children, block)

        # Leaf node: analyse its text together with its ancestors' text.
        text_content = extract_hierarchical_text(block)
        location_info = {
            "page_number": block["page_number"],
            "block_text": block["text"],
        }
        changes = await classify_changes_with_nlp(text_content, location_info)
        if not changes:
            return []
        for change in changes:
            change["full_text"] = text_content
        return changes

    results = await asyncio.gather(*(process_block(block) for block in blocks))
    return [change for block_changes in results for change in block_changes]


def pymupdf_regulatory_change_detector_with_nlp_insights(
    hierarchical_structure, progress_callback=None, status_callback=None
):
    """Detect regulatory changes across a document hierarchy and summarise them.

    Must be called from synchronous code: it drives the async pipeline with
    ``asyncio.run``, which raises ``RuntimeError`` inside a running event loop.

    Args:
        hierarchical_structure: Dict with a ``"blocks"`` list of block dicts
            (``text``, ``page_number``, optional ``children``). It is mutated
            in place (``parent`` links are added).
        progress_callback: Currently unused; accepted for API compatibility.
        status_callback: Optional callable that receives a status message.

    Returns:
        A tuple ``(report, results)``. ``report`` holds ``analysis_summary``
        (total count and per-``change_type`` counts) and ``changes_by_page``.
        ``results`` is the raw list of detected changes. An empty or ``None``
        structure yields ``({"error": "No structure provided"}, [])``.
    """
    if not hierarchical_structure:
        return {"error": "No structure provided"}, []

    analysis_summary = {
        "total_changes_detected": 0,
        "changes_by_type": {"addition": 0, "deletion": 0, "modification": 0},
    }
    changes_by_page = {}

    if status_callback:
        status_callback("Analyzing all document blocks concurrently with NLP...")

    results = asyncio.run(traverse_blocks_with_nlp(hierarchical_structure["blocks"]))

    counts_by_type = analysis_summary["changes_by_type"]
    for change in results:
        # LLM output is untrusted. Tolerate missing keys and unexpected
        # change types rather than crashing after all API calls have been made.
        classifications = change.get("classifications") or []
        analysis_summary["total_changes_detected"] += len(classifications)
        page_num = change["location"]["page_number"]

        for classification in classifications:
            change_type = classification.get("change_type", "unknown")
            counts_by_type[change_type] = counts_by_type.get(change_type, 0) + 1

            change_name = classification.get("change")
            change_subtype = (
                "context" if change_name in CONTEXT_CATEGORIES else "scope"
            )
            changes_by_page.setdefault(page_num, []).append(
                {
                    "change": change_name,
                    "change_type": change_type,
                    "change_subtype": change_subtype,
                    "relevant_text": classification.get("relevant_text", ""),
                    "explanation": classification.get("explanation", ""),
                    "nlp_evidence": classification.get("evidence", ""),
                }
            )

    return {
        "analysis_summary": analysis_summary,
        "changes_by_page": changes_by_page,
    }, results