regulens / scripts /llm_nlp_preprocessing.py
Maximilian Amougou
Upload 7 files
09a324c verified
import json
import os
import asyncio
from dotenv import load_dotenv
from openai import AsyncOpenAI
from scripts.regulatory_change_foundation import CONTEXT_CATEGORIES
from scripts.utility_functions import call_nlp_service, render_prompt
# Load environment variables from .env file. This has to happen before the
# os.getenv() lookup below, otherwise a key defined only in .env is not seen.
load_dotenv()
# os.getenv returns None when OPENAI_API_KEY is not set.
api_key = os.getenv("OPENAI_API_KEY")
# Module-level async client shared by every chunk request issued in
# search_for_regulatory_changes. timeout=60 is presumably seconds -- confirm
# against the installed openai package.
openai_client = AsyncOpenAI(api_key=api_key, timeout=60)
async def preprocess_text_with_nlp(text, max_chunk_size=512, overlap=50):
    """Send *text* to the NLP service and unpack its response.

    Args:
        text: Raw text to preprocess.
        max_chunk_size: Not used by this function; only ``text`` is sent to
            the service.
        overlap: Not used by this function; only ``text`` is sent to the
            service.

    Returns:
        tuple: ``(chunks, preprocessed_data)`` read from the ``"chunks"`` and
        ``"preprocessed_data"`` keys of the service response.
    """
    # NOTE(review): max_chunk_size / overlap are accepted but never forwarded;
    # chunking is presumably decided by the service itself -- confirm.
    payload = {"text": text}
    response = await call_nlp_service(payload, "preprocess_text_with_nlp_llm")
    chunk_list = response["chunks"]
    nlp_data = response["preprocessed_data"]
    return chunk_list, nlp_data
def create_prompt(chunk, preprocessed_data):
    """Render the user prompt for one chunk with the NLP data included.

    Args:
        chunk: Text chunk to be analyzed.
        preprocessed_data: NLP preprocessing output handed to the template.

    Returns:
        Whatever ``render_prompt`` produces for these arguments.
    """
    prompt_options = {
        "include_nlp": True,
        "preprocessed_data": preprocessed_data,
    }
    return render_prompt(chunk, **prompt_options)
async def search_for_regulatory_changes(chunks, preprocessed_data, subtitle):
    """Ask the LLM, one request per chunk, whether a chunk contains changes.

    All chunk requests are started together and awaited with
    ``asyncio.gather``. Processing is best-effort: a chunk whose request
    fails, or whose reply is not a JSON object, is skipped. Each skipped
    chunk is logged so that an outage (bad API key, timeout, malformed reply)
    is not mistaken for "no changes found".

    Args:
        chunks: Iterable of text chunks to analyze.
        preprocessed_data: NLP preprocessing output embedded in every prompt.
        subtitle: Subtitle of the section the chunks belong to; stored under
            ``result["location"]["subtitle"]``.

    Returns:
        list: The parsed LLM replies whose ``"changes_detected"`` value is
        truthy, each extended with ``"location"`` and ``"source_text"``.
        Empty when no chunk reported a change (or when ``chunks`` is empty).
    """
    # Function-scope import: the module itself does not import logging.
    import logging

    logger = logging.getLogger(__name__)

    async def process_chunk(chunk):
        try:
            response = await openai_client.chat.completions.create(
                model="gpt-4o-mini",
                messages=[
                    {
                        "role": "system",
                        "content": "You are a legal expert specializing in analyzing German regulatory documents with a focus on identifying regulatory changes. Only return JSON output.",
                    },
                    {"role": "user", "content": create_prompt(chunk, preprocessed_data)},
                ],
                temperature=0.7,
                max_tokens=1024,
            )
            result = json.loads(response.choices[0].message.content)
        except json.JSONDecodeError:
            logger.warning(
                "Skipping chunk under subtitle %r: LLM reply was not valid JSON",
                subtitle,
            )
            return None
        except Exception:
            # Deliberately broad: one failing chunk must not abort the other
            # chunks, but the failure is recorded with its traceback.
            logger.exception(
                "Skipping chunk under subtitle %r: LLM request failed",
                subtitle,
            )
            return None

        if not isinstance(result, dict):
            # Valid JSON but not an object (e.g. a list or string): unusable.
            logger.warning(
                "Skipping chunk under subtitle %r: LLM reply was not a JSON object",
                subtitle,
            )
            return None
        if not result.get("changes_detected", False):
            return None

        result["location"] = {"subtitle": subtitle}
        result["source_text"] = chunk
        return result

    tasks = [process_chunk(chunk) for chunk in chunks]
    results = await asyncio.gather(*tasks)
    return [r for r in results if r is not None]
async def detect_regulatory_changes(text_content, subtitle):
    """
    Run NLP preprocessing and the LLM change search for one piece of text.

    Args:
        text_content (str): The raw text content to analyze
        subtitle (str): The subtitle associated with the content
    Returns:
        list: The value returned by search_for_regulatory_changes, i.e. one
        entry per chunk for which the LLM reported a change
    """
    # Stage 1: split the text into chunks and collect the NLP data
    nlp_output = await preprocess_text_with_nlp(text_content)
    text_chunks, nlp_data = nlp_output
    # Stage 2: let the LLM inspect every chunk, using the NLP data as context
    return await search_for_regulatory_changes(text_chunks, nlp_data, subtitle)
def llm_regulatory_change_detector(hierarchical_structure, progress_callback=None, status_callback=None):
    """Detect regulatory changes in every section and summarize them.

    All sections are analyzed concurrently inside a single ``asyncio.run``
    call, then the per-section results are folded into one report.

    Args:
        hierarchical_structure: Mapping with a ``"sections"`` list. Each
            section provides ``"subtitle"`` and ``"content"``; a list
            ``content`` is joined with newlines before analysis.
        progress_callback: Accepted for interface compatibility; this
            function never calls it.
        status_callback: Optional callable that receives one status message
            before the sections are processed.

    Returns:
        dict | None: ``None`` when ``hierarchical_structure`` is falsy.
        Otherwise ``{"analysis_summary": ..., "results": ...}`` where
        ``analysis_summary`` holds ``"total_changes_detected"`` and the
        per-type counters in ``"changes_by_type"``, and ``results`` maps each
        subtitle to the list of its flattened classifications.

    Raises:
        KeyError: If ``"sections"`` or a required section, change or
            classification key is missing.
    """
    if not hierarchical_structure:
        # Nothing to analyze: keep returning None as callers have always seen.
        return None

    sections = hierarchical_structure["sections"]

    async def process_section(section):
        subtitle = section["subtitle"]
        content = section["content"]
        if isinstance(content, list):
            content = "\n".join(content)
        # Detect changes for this subtitle
        changes = await detect_regulatory_changes(content, subtitle)
        return subtitle, changes

    async def process_all_sections():
        if status_callback:
            status_callback(f"Processing all {len(sections)} sections concurrently...")
        tasks = [process_section(section) for section in sections]
        return await asyncio.gather(*tasks)

    # Run async processing
    section_results = asyncio.run(process_all_sections())

    analysis_summary = {
        "total_changes_detected": 0,
        "changes_by_type": {"additions": 0, "deletions": 0, "modifications": 0},
    }
    counts_by_type = analysis_summary["changes_by_type"]
    subtitles = {}

    for subtitle, changes in section_results:
        # setdefault (not assignment): two sections sharing a subtitle are
        # merged, so "results" stays consistent with the summary counters
        # instead of the later section wiping out the earlier one.
        grouped = subtitles.setdefault(subtitle, [])
        for change in changes:
            classifications = change["classifications"]
            analysis_summary["total_changes_detected"] += len(classifications)
            for classification in classifications:
                change_type = classification["change_type"]
                # The change type comes from unvalidated LLM output; a value
                # outside the three predefined ones gets its own counter
                # rather than raising KeyError and discarding the report.
                type_key = f"{change_type}s"
                counts_by_type[type_key] = counts_by_type.get(type_key, 0) + 1

                change_subtype = (
                    "context"
                    if classification["change"] in CONTEXT_CATEGORIES
                    else "scope"
                )
                grouped.append(
                    {
                        "change": classification["change"],
                        "change_type": change_type,
                        "change_subtype": change_subtype,
                        "relevant_text": classification["relevant_text"],
                        "explanation": classification["explanation"],
                        "nlp_evidence": classification["evidence"],
                    }
                )

    # Combine analysis summary and grouped changes
    return {"analysis_summary": analysis_summary, "results": subtitles}