regulens / scripts /llm_no_nlp_preprocessing.py
Maximilian Amougou
Upload 6 files
d6b760c verified
import json
import os
import asyncio
from dotenv import load_dotenv
from openai import AsyncOpenAI
from scripts.regulatory_change_foundation import CONTEXT_CATEGORIES
from scripts.utility_functions import render_prompt
# Load environment variables from .env file
load_dotenv()
api_key = os.getenv("OPENAI_API_KEY")
openai_client = AsyncOpenAI(api_key=api_key, timeout=60)
def create_prompt_without_nlp_insights(text):
return render_prompt(text, include_nlp=False)
async def classify_changes_without_nlp_insights(text_content, subtitle):
"""Classify changes in text chunks using OpenAI."""
chunks = text_content.split("\n\n")
async def process_chunk(chunk):
try:
response = await openai_client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{
"role": "system",
"content": "You are a legal expert specializing in analyzing German regulatory documents with a focus on identifying regulatory changes. Only return JSON output.",
},
{"role": "user", "content": create_prompt_without_nlp_insights(chunk)},
],
temperature=0.7,
max_tokens=1024,
)
result = json.loads(response.choices[0].message.content)
if result.get("changes_detected", False):
result["location"] = {"subtitle": subtitle}
result["source_text"] = chunk
return result
except (json.JSONDecodeError, Exception):
return None
tasks = [process_chunk(chunk) for chunk in chunks]
results = await asyncio.gather(*tasks)
return [r for r in results if r is not None]
# Async wrapper for backward compatibility
async def classify_changes_without_nlp_insights_async(text_content, subtitle):
return await classify_changes_without_nlp_insights(text_content, subtitle)
def llm_regulatory_change_detector_without_nlp_insights(hierarchical_structure, progress_callback=None, status_callback=None):
if hierarchical_structure:
analysis_summary = {
"total_changes_detected": 0,
"changes_by_type": {"additions": 0, "deletions": 0, "modifications": 0},
}
subtitles = {}
async def process_all_sections():
async def process_section(section):
subtitle = section["subtitle"]
content = section["content"]
if isinstance(content, list):
content = "\n".join(content)
# Detect changes for this subtitle
changes = await classify_changes_without_nlp_insights(content, subtitle)
return subtitle, changes
if status_callback:
status_callback(f"Processing all {len(hierarchical_structure['sections'])} sections concurrently...")
tasks = [process_section(section) for section in hierarchical_structure["sections"]]
results = await asyncio.gather(*tasks)
return results
# Run async processing
section_results = asyncio.run(process_all_sections())
# Process results
for subtitle, changes in section_results:
# Update analysis summary
for change in changes:
analysis_summary["total_changes_detected"] += len(
change["classifications"]
)
for classification in change["classifications"]:
change_type = classification["change_type"]
analysis_summary["changes_by_type"][f"{change_type}s"] += 1
# Group changes by subtitle
subtitles[subtitle] = []
for change in changes:
for classification in change["classifications"]:
change_subtype = (
"context"
if classification["change"] in CONTEXT_CATEGORIES
else "scope"
)
subtitles[subtitle].append(
{
"change": classification["change"],
"change_type": classification["change_type"],
"change_subtype": change_subtype,
"relevant_text": classification["relevant_text"],
"explanation": classification["explanation"],
}
)
# Combine analysis summary and grouped changes
final_output = {"analysis_summary": analysis_summary, "results": subtitles}
return final_output