import asyncio
import json
import logging
import os

from dotenv import load_dotenv
from openai import AsyncOpenAI

from scripts.regulatory_change_foundation import CONTEXT_CATEGORIES
from scripts.utility_functions import render_prompt

logger = logging.getLogger(__name__)

# Load environment variables from .env file
load_dotenv()
api_key = os.getenv("OPENAI_API_KEY")
openai_client = AsyncOpenAI(api_key=api_key, timeout=60)


def create_prompt_without_nlp_insights(text):
    """Render the classification prompt for ``text`` with NLP insights disabled."""
    return render_prompt(text, include_nlp=False)


async def classify_changes_without_nlp_insights(text_content, subtitle):
    """Classify changes in text chunks using OpenAI.

    ``text_content`` is split on blank lines (``"\\n\\n"``) and every non-blank
    chunk is sent to the chat-completions endpoint concurrently.

    Args:
        text_content: Text of one section.
        subtitle: Section subtitle; stored under ``result["location"]``.

    Returns:
        A list with one parsed JSON result per chunk for which the model
        reported ``changes_detected``. Each result is augmented with
        ``"location"`` and ``"source_text"``. Chunks whose request failed,
        whose reply was not a JSON object, or that reported no changes are
        omitted.
    """
    # Blank chunks carry nothing to classify; skipping them avoids a
    # pointless API round trip per empty paragraph.
    chunks = [chunk for chunk in text_content.split("\n\n") if chunk.strip()]

    async def process_chunk(chunk):
        try:
            response = await openai_client.chat.completions.create(
                model="gpt-4o-mini",
                messages=[
                    {
                        "role": "system",
                        "content": "You are a legal expert specializing in analyzing German regulatory documents with a focus on identifying regulatory changes. Only return JSON output.",
                    },
                    {"role": "user", "content": create_prompt_without_nlp_insights(chunk)},
                ],
                temperature=0.7,
                max_tokens=1024,
            )
            result = json.loads(response.choices[0].message.content)
        except json.JSONDecodeError:
            logger.warning(
                "Model reply for a chunk under %r was not valid JSON; skipping chunk.",
                subtitle,
            )
            return None
        except Exception:
            # Best effort: one failed chunk must not abort the whole batch,
            # but the failure is logged instead of vanishing silently.
            logger.exception(
                "Classification request failed for a chunk under %r; skipping chunk.",
                subtitle,
            )
            return None

        if not isinstance(result, dict):
            logger.warning(
                "Model reply for a chunk under %r was not a JSON object; skipping chunk.",
                subtitle,
            )
            return None
        if not result.get("changes_detected", False):
            return None

        result["location"] = {"subtitle": subtitle}
        result["source_text"] = chunk
        return result

    results = await asyncio.gather(*(process_chunk(chunk) for chunk in chunks))
    return [r for r in results if r is not None]


# Async wrapper for backward compatibility
async def classify_changes_without_nlp_insights_async(text_content, subtitle):
    """Alias of :func:`classify_changes_without_nlp_insights`."""
    return await classify_changes_without_nlp_insights(text_content, subtitle)


def llm_regulatory_change_detector_without_nlp_insights(
    hierarchical_structure, progress_callback=None, status_callback=None
):
    """Detect regulatory changes in every section of a document.

    All sections are classified concurrently inside a fresh event loop
    started with ``asyncio.run``.

    Args:
        hierarchical_structure: Mapping with a ``"sections"`` list; each
            section provides ``"subtitle"`` and ``"content"`` (a string or a
            list of strings, which is joined with newlines).
        progress_callback: Accepted for interface compatibility; it is not
            invoked by this implementation.
        status_callback: Optional callable that receives one human-readable
            status string before processing starts.

    Returns:
        ``None`` when ``hierarchical_structure`` is falsy. Otherwise a dict
        with ``"analysis_summary"`` (total count and counts per change type)
        and ``"results"`` (classification entries grouped by subtitle).
    """
    if not hierarchical_structure:
        return None

    analysis_summary = {
        "total_changes_detected": 0,
        "changes_by_type": {"additions": 0, "deletions": 0, "modifications": 0},
    }
    subtitles = {}
    sections = hierarchical_structure["sections"]

    async def process_section(section):
        subtitle = section["subtitle"]
        content = section["content"]
        if isinstance(content, list):
            content = "\n".join(content)
        # Detect changes for this subtitle
        changes = await classify_changes_without_nlp_insights(content, subtitle)
        return subtitle, changes

    async def process_all_sections():
        if status_callback:
            status_callback(f"Processing all {len(sections)} sections concurrently...")
        return await asyncio.gather(*(process_section(section) for section in sections))

    # Run async processing
    section_results = asyncio.run(process_all_sections())

    counts_by_type = analysis_summary["changes_by_type"]
    for subtitle, changes in section_results:
        # setdefault keeps entries from an earlier section that happens to
        # share the same subtitle instead of overwriting them.
        grouped = subtitles.setdefault(subtitle, [])
        for change in changes:
            for classification in change.get("classifications") or []:
                try:
                    change_name = classification["change"]
                    change_type = classification["change_type"]
                    entry = {
                        "change": change_name,
                        "change_type": change_type,
                        "change_subtype": (
                            "context" if change_name in CONTEXT_CATEGORIES else "scope"
                        ),
                        "relevant_text": classification["relevant_text"],
                        "explanation": classification["explanation"],
                    }
                except (KeyError, TypeError):
                    # The model's output is not schema-validated; drop a
                    # malformed entry rather than lose every other result.
                    logger.warning(
                        "Skipping malformed classification under %r: %r",
                        subtitle,
                        classification,
                    )
                    continue

                # Update analysis summary; tolerate change types beyond the
                # three pre-seeded ones instead of raising KeyError.
                type_key = f"{change_type}s"
                counts_by_type[type_key] = counts_by_type.get(type_key, 0) + 1
                analysis_summary["total_changes_detected"] += 1

                # Group changes by subtitle
                grouped.append(entry)

    # Combine analysis summary and grouped changes
    return {"analysis_summary": analysis_summary, "results": subtitles}