Spaces:
Running
Running
File size: 5,440 Bytes
ce77033 d6b760c ce77033 d6b760c ce77033 d6b760c ce77033 09a324c ce77033 d6b760c ce77033 d6b760c ce77033 d6b760c ce77033 d6b760c ce77033 d6b760c ce77033 09a324c ce77033 d6b760c ce77033 bdc7d9a ce77033 d6b760c bdc7d9a d6b760c ce77033 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 | import json
import os
import asyncio
from dotenv import load_dotenv
from openai import AsyncOpenAI
from scripts.regulatory_change_foundation import CONTEXT_CATEGORIES
from scripts.utility_functions import call_nlp_service, render_prompt
# Load environment variables from the .env file so OPENAI_API_KEY can be read below.
load_dotenv()
# os.getenv returns None when OPENAI_API_KEY is not set; the value is passed to the client as-is.
api_key = os.getenv("OPENAI_API_KEY")
# Module-level async OpenAI client shared by the chunk-analysis coroutines in this file.
openai_client = AsyncOpenAI(api_key=api_key, timeout=60)
async def preprocess_text_with_nlp(text, max_chunk_size=512, overlap=50):
    """Ask the NLP service to preprocess *text* and return its chunked output.

    Args:
        text: Raw text to preprocess.
        max_chunk_size: Accepted for interface compatibility; this function
            does not forward it to the service.
        overlap: Accepted for interface compatibility; this function does not
            forward it to the service.

    Returns:
        tuple: ``(chunks, preprocessed_data)`` taken from the ``"chunks"`` and
        ``"preprocessed_data"`` keys of the service response.
    """
    payload = {"text": text}
    service_reply = await call_nlp_service(payload, "preprocess_text_with_nlp_llm")
    text_chunks = service_reply["chunks"]
    nlp_data = service_reply["preprocessed_data"]
    return text_chunks, nlp_data
def create_prompt(chunk, preprocessed_data):
    """Render the user prompt for one chunk with the NLP data included.

    Args:
        chunk: The text chunk the prompt is built for.
        preprocessed_data: NLP preprocessing output handed to the renderer.

    Returns:
        Whatever ``render_prompt`` returns for this chunk with
        ``include_nlp=True``.
    """
    render_options = {
        "include_nlp": True,
        "preprocessed_data": preprocessed_data,
    }
    return render_prompt(chunk, **render_options)
async def search_for_regulatory_changes(chunks, preprocessed_data, subtitle):
    """Query the LLM for regulatory changes in every chunk, concurrently.

    Each chunk is sent as its own chat-completion request. A chunk contributes
    a result only when the model's reply parses as a JSON object whose
    ``"changes_detected"`` value is truthy; that object is then annotated with
    ``"location"`` (``{"subtitle": subtitle}``) and ``"source_text"`` (the
    chunk itself).

    Args:
        chunks: Iterable of text chunks to analyse.
        preprocessed_data: NLP preprocessing output passed to ``create_prompt``.
        subtitle: Subtitle of the section the chunks belong to.

    Returns:
        list: The annotated result objects, in chunk order, for chunks where
        changes were detected. Chunks that failed (request error, invalid
        JSON, unexpected reply shape) or reported no changes are omitted.
    """
    # Function-scope import so the block does not depend on a file-level import.
    import logging

    logger = logging.getLogger(__name__)

    async def process_chunk(chunk):
        # Only the request and the JSON parse can fail for external reasons,
        # so only they sit inside the try body.
        try:
            response = await openai_client.chat.completions.create(
                model="gpt-4o-mini",
                messages=[
                    {
                        "role": "system",
                        "content": "You are a legal expert specializing in analyzing German regulatory documents with a focus on identifying regulatory changes. Only return JSON output.",
                    },
                    {"role": "user", "content": create_prompt(chunk, preprocessed_data)},
                ],
                temperature=0.7,
                max_tokens=1024,
            )
            result = json.loads(response.choices[0].message.content)
        except Exception:
            # Best-effort per chunk: one failure must not abort the other
            # chunks, but it is logged instead of being silently dropped.
            logger.warning(
                "Regulatory change detection failed for a chunk of section %r",
                subtitle,
                exc_info=True,
            )
            return None

        # The model may return valid JSON that is not an object (list, string).
        if not isinstance(result, dict):
            logger.warning(
                "Ignoring non-object JSON reply for a chunk of section %r",
                subtitle,
            )
            return None

        # NOTE(review): assumes the original returned a result only when
        # changes were detected (its indentation was ambiguous) - confirm.
        if not result.get("changes_detected", False):
            return None

        result["location"] = {"subtitle": subtitle}
        result["source_text"] = chunk
        return result

    results = await asyncio.gather(*(process_chunk(chunk) for chunk in chunks))
    return [r for r in results if r is not None]
async def detect_regulatory_changes(text_content, subtitle):
    """
    Detect regulatory changes in one section's text.

    The text is first preprocessed into chunks by the NLP service, then each
    chunk is analysed for regulatory changes.

    Args:
        text_content (str): The raw text content to analyze
        subtitle (str): The subtitle associated with the content

    Returns:
        The value returned by ``search_for_regulatory_changes`` for the
        preprocessed chunks (a list of per-chunk detection results).
    """
    # Step 1: chunk the text and collect NLP insights.
    preprocessed = await preprocess_text_with_nlp(text_content)
    text_chunks, nlp_data = preprocessed
    # Step 2: classify changes per chunk using those insights.
    return await search_for_regulatory_changes(text_chunks, nlp_data, subtitle)
def _summarize_section_results(section_results):
    """Fold per-section detections into summary counters and per-subtitle lists.

    Args:
        section_results: Iterable of ``(subtitle, changes)`` pairs, where each
            change is a mapping with a ``"classifications"`` list.

    Returns:
        tuple: ``(analysis_summary, subtitles)`` where ``analysis_summary``
        holds the total and per-type counts and ``subtitles`` maps each
        subtitle to its flattened list of classified changes.
    """
    analysis_summary = {
        "total_changes_detected": 0,
        "changes_by_type": {"additions": 0, "deletions": 0, "modifications": 0},
    }
    type_counts = analysis_summary["changes_by_type"]
    subtitles = {}

    for subtitle, changes in section_results:
        # setdefault (not assignment) so sections sharing a subtitle are
        # merged instead of the later one erasing the earlier one's changes.
        grouped = subtitles.setdefault(subtitle, [])
        for change in changes:
            for classification in change["classifications"]:
                change_type = classification["change_type"]
                analysis_summary["total_changes_detected"] += 1
                # The change type comes from model output; count unexpected
                # values under their own key instead of raising KeyError.
                type_key = f"{change_type}s"
                type_counts[type_key] = type_counts.get(type_key, 0) + 1

                change_subtype = (
                    "context"
                    if classification["change"] in CONTEXT_CATEGORIES
                    else "scope"
                )
                grouped.append(
                    {
                        "change": classification["change"],
                        "change_type": change_type,
                        "change_subtype": change_subtype,
                        "relevant_text": classification["relevant_text"],
                        "explanation": classification["explanation"],
                        "nlp_evidence": classification["evidence"],
                    }
                )

    return analysis_summary, subtitles


def llm_regulatory_change_detector(hierarchical_structure, progress_callback=None, status_callback=None):
    """Detect regulatory changes in every section of a document.

    All sections are analysed concurrently via ``detect_regulatory_changes``
    and the detections are aggregated into a summary plus a per-subtitle
    listing.

    Args:
        hierarchical_structure: Mapping with a ``"sections"`` list. Each
            section has a ``"subtitle"`` and a ``"content"`` that is either a
            string or a list of strings (joined with newlines).
        progress_callback: Accepted for interface compatibility; this function
            never calls it.
        status_callback: Optional callable invoked once with a status message
            before the sections are processed.

    Returns:
        dict | None: ``{"analysis_summary": ..., "results": ...}``, or ``None``
        when ``hierarchical_structure`` is empty or ``None``.

    Raises:
        RuntimeError: From ``asyncio.run`` if called while an event loop is
            already running in the current thread.
    """
    if not hierarchical_structure:
        return None

    sections = hierarchical_structure["sections"]

    async def process_section(section):
        subtitle = section["subtitle"]
        content = section["content"]
        if isinstance(content, list):
            content = "\n".join(content)
        # Detect changes for this subtitle
        changes = await detect_regulatory_changes(content, subtitle)
        return subtitle, changes

    async def process_all_sections():
        if status_callback:
            status_callback(f"Processing all {len(sections)} sections concurrently...")
        return await asyncio.gather(*(process_section(section) for section in sections))

    # Run async processing from this synchronous entry point.
    section_results = asyncio.run(process_all_sections())

    # Combine analysis summary and grouped changes
    analysis_summary, subtitles = _summarize_section_results(section_results)
    return {"analysis_summary": analysis_summary, "results": subtitles}
|