import asyncio
import json
import logging
import os

from dotenv import load_dotenv
from openai import AsyncOpenAI

from scripts.regulatory_change_foundation import CONTEXT_CATEGORIES
from scripts.utility_functions import call_nlp_service, render_prompt

logger = logging.getLogger(__name__)

# Load environment variables from .env file
load_dotenv()
api_key = os.getenv("OPENAI_API_KEY")
openai_client = AsyncOpenAI(api_key=api_key, timeout=60)


async def preprocess_text_with_nlp(text, max_chunk_size=512, overlap=50):
    """Send *text* to the NLP service and return its chunks and metadata.

    Args:
        text: Raw text to preprocess.
        max_chunk_size: Kept for interface compatibility. NOTE(review): this
            value is not forwarded to the service call below.
        overlap: Kept for interface compatibility. NOTE(review): this value
            is not forwarded to the service call below.

    Returns:
        A ``(chunks, preprocessed_data)`` tuple read from the ``"chunks"``
        and ``"preprocessed_data"`` keys of the service response.
    """
    result = await call_nlp_service({"text": text}, "preprocess_text_with_nlp_llm")
    return result["chunks"], result["preprocessed_data"]


def create_prompt(chunk, preprocessed_data):
    """Render the user prompt for one chunk, including the NLP data."""
    return render_prompt(chunk, include_nlp=True, preprocessed_data=preprocessed_data)


async def search_for_regulatory_changes(chunks, preprocessed_data, subtitle):
    """Ask the chat model about every chunk concurrently.

    Args:
        chunks: Iterable of text chunks to analyze.
        preprocessed_data: NLP data passed to the prompt renderer.
        subtitle: Subtitle recorded in each result's ``"location"``.

    Returns:
        A list with one parsed model reply (dict) per chunk for which the
        model reported ``"changes_detected"``. Chunks whose request failed,
        whose reply was not a JSON object, or that reported no changes are
        omitted.
    """

    async def process_chunk(chunk):
        # Best-effort per chunk: one failing chunk must not abort the whole
        # gather, so failures are logged and reported as ``None``.
        try:
            response = await openai_client.chat.completions.create(
                model="gpt-4o-mini",
                messages=[
                    {
                        "role": "system",
                        "content": "You are a legal expert specializing in analyzing German regulatory documents with a focus on identifying regulatory changes. Only return JSON output.",
                    },
                    {"role": "user", "content": create_prompt(chunk, preprocessed_data)},
                ],
                temperature=0.7,
                max_tokens=1024,
            )
            result = json.loads(response.choices[0].message.content)
        except json.JSONDecodeError:
            logger.warning(
                "Model reply for a chunk under %r was not valid JSON; skipping chunk",
                subtitle,
            )
            return None
        except Exception:
            logger.exception(
                "Change detection failed for a chunk under %r; skipping chunk",
                subtitle,
            )
            return None

        # A JSON array/scalar is valid JSON but not a usable reply.
        if not isinstance(result, dict) or not result.get("changes_detected", False):
            return None

        result["location"] = {"subtitle": subtitle}
        result["source_text"] = chunk
        return result

    results = await asyncio.gather(*(process_chunk(chunk) for chunk in chunks))
    return [r for r in results if r is not None]


async def detect_regulatory_changes(text_content, subtitle):
    """Detect regulatory changes in the text of one section.

    Args:
        text_content (str): The raw text content to analyze.
        subtitle (str): The subtitle associated with the content.

    Returns:
        list: The per-chunk result dicts produced by
        ``search_for_regulatory_changes`` (empty when nothing was detected).
    """
    # Preprocess text with enhanced NLP
    chunks, preprocessed_data = await preprocess_text_with_nlp(text_content)

    # Classify changes using NLP insights
    return await search_for_regulatory_changes(chunks, preprocessed_data, subtitle)


def _aggregate_section_results(section_results):
    """Fold ``(subtitle, changes)`` pairs into the summary/results payload."""
    changes_by_type = {"additions": 0, "deletions": 0, "modifications": 0}
    analysis_summary = {
        "total_changes_detected": 0,
        "changes_by_type": changes_by_type,
    }
    subtitles = {}

    for subtitle, changes in section_results:
        # setdefault (not assignment) so that two sections sharing a subtitle
        # accumulate instead of the later one erasing the earlier one.
        grouped = subtitles.setdefault(subtitle, [])

        for change in changes:
            # The reply comes from an LLM; a missing list counts as empty.
            classifications = change.get("classifications") or []
            analysis_summary["total_changes_detected"] += len(classifications)

            for classification in classifications:
                type_key = f"{classification['change_type']}s"
                # Unknown change types get their own counter rather than
                # raising KeyError after all model calls have completed.
                changes_by_type[type_key] = changes_by_type.get(type_key, 0) + 1

                change_subtype = (
                    "context"
                    if classification["change"] in CONTEXT_CATEGORIES
                    else "scope"
                )
                grouped.append(
                    {
                        "change": classification["change"],
                        "change_type": classification["change_type"],
                        "change_subtype": change_subtype,
                        "relevant_text": classification["relevant_text"],
                        "explanation": classification["explanation"],
                        "nlp_evidence": classification["evidence"],
                    }
                )

    # Combine analysis summary and grouped changes
    return {"analysis_summary": analysis_summary, "results": subtitles}


def llm_regulatory_change_detector(hierarchical_structure, progress_callback=None, status_callback=None):
    """Run change detection over every section and summarize the findings.

    Args:
        hierarchical_structure: Mapping with a ``"sections"`` list; each
            section provides ``"subtitle"`` and ``"content"`` (a string or a
            list of strings, which is joined with newlines).
        progress_callback: Accepted for interface compatibility; not called
            by this function.
        status_callback: Optional callable that receives one status message
            before the sections are processed.

    Returns:
        ``None`` when *hierarchical_structure* is falsy; otherwise a dict
        with ``"analysis_summary"`` (total and per-type counters) and
        ``"results"`` (subtitle -> list of flattened classification dicts).

    Note:
        Uses ``asyncio.run``, so it must be called from synchronous code
        without an already running event loop in the current thread.
    """
    if not hierarchical_structure:
        return None

    sections = hierarchical_structure["sections"]

    async def process_section(section):
        subtitle = section["subtitle"]
        content = section["content"]
        if isinstance(content, list):
            content = "\n".join(content)

        # Detect changes for this subtitle
        changes = await detect_regulatory_changes(content, subtitle)
        return subtitle, changes

    async def process_all_sections():
        if status_callback:
            status_callback(f"Processing all {len(sections)} sections concurrently...")
        return await asyncio.gather(*(process_section(section) for section in sections))

    # Run async processing
    section_results = asyncio.run(process_all_sections())

    return _aggregate_section_results(section_results)