import json import os from dotenv import load_dotenv from openai import OpenAI from tqdm import tqdm from scripts.regulatory_change_foundation import CONTEXT_CATEGORIES from scripts.utility_functions import render_prompt # Load environment variables from .env file load_dotenv() api_key = os.getenv("OPENAI_API_KEY") openai_client = OpenAI(api_key=api_key) def create_prompt_without_nlp_insights(text): return render_prompt(text, include_nlp=False) def classify_changes_without_nlp_insights(text_content, subtitle): """Classify changes in text chunks using OpenAI.""" chunks = text_content.split("\n\n") results = [] for chunk in chunks: response = openai_client.chat.completions.create( model="gpt-4o-mini", messages=[ { "role": "system", "content": "You are a legal expert specializing in analyzing German regulatory documents with a focus on identifying regulatory changes. Only return JSON output.", }, {"role": "user", "content": create_prompt_without_nlp_insights(chunk)}, ], temperature=0.7, max_tokens=1024, ) try: result = json.loads(response.choices[0].message.content) if result.get("changes_detected", False): result["location"] = {"subtitle": subtitle} # Use subtitle as location result["source_text"] = chunk results.append(result) except json.JSONDecodeError: continue return results def llm_regulatory_change_detector_without_nlp_insights(hierarchical_structure): if hierarchical_structure: analysis_summary = { "total_changes_detected": 0, "changes_by_type": {"additions": 0, "deletions": 0, "modifications": 0}, } subtitles = {} # Iterate over sections and analyze content for section in tqdm( hierarchical_structure["sections"], desc="Analyzing Sections" ): subtitle = section["subtitle"] content = section["content"] if isinstance(content, list): content = "\n".join(content) # Detect changes for this subtitle changes = classify_changes_without_nlp_insights(content, subtitle) # Update analysis summary for change in changes: analysis_summary["total_changes_detected"] += len( change["classifications"] ) for classification in change["classifications"]: change_type = classification["change_type"] analysis_summary["changes_by_type"][f"{change_type}s"] += 1 # Group changes by subtitle subtitles[subtitle] = [] for change in changes: for classification in change["classifications"]: change_subtype = ( "context" if classification["change"] in CONTEXT_CATEGORIES else "scope" ) subtitles[subtitle].append( { "change": classification["change"], "change_type": classification["change_type"], "change_subtype": change_subtype, "relevant_text": classification["relevant_text"], "explanation": classification["explanation"], } ) # Combine analysis summary and grouped changes final_output = {"analysis_summary": analysis_summary, "results": subtitles} return final_output