Spaces:
Running
Running
| import json | |
| import os | |
| import asyncio | |
| from dotenv import load_dotenv | |
| from openai import AsyncOpenAI | |
| from scripts.regulatory_change_foundation import CONTEXT_CATEGORIES | |
| from scripts.utility_functions import call_nlp_service, render_prompt | |
# Populate the process environment from a local .env file before reading settings.
load_dotenv()

# os.environ.get yields None when OPENAI_API_KEY is not set; the value is handed
# to the client unchanged.
api_key = os.environ.get("OPENAI_API_KEY")

# Single shared async client used for every chat-completion request in this module.
openai_client = AsyncOpenAI(timeout=60, api_key=api_key)
async def preprocess_text_with_nlp(text, max_chunk_size=512, overlap=50):
    """Send *text* to the NLP service and return its chunks and preprocessing data.

    Args:
        text: Raw text forwarded to the service under the ``"text"`` key.
        max_chunk_size: Accepted for interface compatibility; not forwarded
            to the service by this function.
        overlap: Accepted for interface compatibility; not forwarded to the
            service by this function.

    Returns:
        tuple: ``(chunks, preprocessed_data)`` taken from the ``"chunks"`` and
        ``"preprocessed_data"`` entries of the service response.
    """
    payload = {"text": text}
    service_reply = await call_nlp_service(payload, "preprocess_text_with_nlp_llm")
    chunks = service_reply["chunks"]
    nlp_data = service_reply["preprocessed_data"]
    return chunks, nlp_data
def create_prompt(chunk, preprocessed_data):
    """Build the user prompt for one chunk, with NLP preprocessing data included.

    Args:
        chunk: The text chunk handed to ``render_prompt``.
        preprocessed_data: NLP preprocessing output passed through to
            ``render_prompt``.

    Returns:
        Whatever ``render_prompt`` produces for the chunk.
    """
    prompt = render_prompt(
        chunk,
        preprocessed_data=preprocessed_data,
        include_nlp=True,
    )
    return prompt
async def search_for_regulatory_changes(chunks, preprocessed_data, subtitle):
    """Query the chat model once per chunk and collect the chunks with detected changes.

    One request per chunk is issued and all requests are awaited together with
    ``asyncio.gather``. Processing is best-effort: a chunk whose request fails
    or whose reply cannot be used is dropped, and the failure is logged rather
    than raised, so one bad chunk does not abort the batch.

    Args:
        chunks: Iterable of text chunks to analyse.
        preprocessed_data: NLP preprocessing output passed to ``create_prompt``
            for every chunk.
        subtitle: Section subtitle stored in each result under
            ``result["location"]["subtitle"]``.

    Returns:
        list[dict]: Parsed model replies whose ``"changes_detected"`` entry is
        truthy, each extended with ``"location"`` and ``"source_text"``.
        Empty when no chunk produced a usable positive reply.
    """
    import logging  # function-scope import keeps this block self-contained

    logger = logging.getLogger(__name__)

    async def process_chunk(chunk):
        try:
            response = await openai_client.chat.completions.create(
                model="gpt-4o-mini",
                messages=[
                    {
                        "role": "system",
                        "content": "You are a legal expert specializing in analyzing German regulatory documents with a focus on identifying regulatory changes. Only return JSON output.",
                    },
                    {"role": "user", "content": create_prompt(chunk, preprocessed_data)},
                ],
                temperature=0.7,
                max_tokens=1024,
            )
            result = json.loads(response.choices[0].message.content)
        except json.JSONDecodeError as exc:
            # The model answered, but not with parseable JSON.
            logger.warning(
                "Discarding non-JSON model reply for section %r: %s", subtitle, exc
            )
            return None
        except Exception:
            # Deliberate catch-all: keep the batch alive, but leave a trace so
            # systematic failures (auth, timeout, rate limit) are visible.
            logger.exception(
                "Regulatory-change request failed for section %r", subtitle
            )
            return None

        if not isinstance(result, dict):
            logger.warning(
                "Discarding model reply for section %r: expected a JSON object, got %s",
                subtitle,
                type(result).__name__,
            )
            return None

        # NOTE(review): assumes replies without detected changes are dropped
        # (i.e. the original `return result` belonged to this branch) - confirm.
        if not result.get("changes_detected", False):
            return None

        result["location"] = {"subtitle": subtitle}
        result["source_text"] = chunk
        return result

    outcomes = await asyncio.gather(*(process_chunk(chunk) for chunk in chunks))
    return [outcome for outcome in outcomes if outcome is not None]
async def detect_regulatory_changes(text_content, subtitle):
    """Detect regulatory changes in the text of a single section.

    The text is preprocessed into chunks by ``preprocess_text_with_nlp`` and
    the chunks are then analysed by ``search_for_regulatory_changes``.

    Args:
        text_content (str): The raw text content to analyze.
        subtitle (str): The subtitle associated with the content.

    Returns:
        list[dict]: The per-chunk results returned by
        ``search_for_regulatory_changes``. An empty list is returned
        immediately when ``text_content`` is empty, ``None`` or whitespace
        only, without contacting the NLP service.
    """
    # Nothing to analyse: skip the preprocessing and model round trips.
    if not text_content or not text_content.strip():
        return []

    # Preprocess text with enhanced NLP
    chunks, preprocessed_data = await preprocess_text_with_nlp(text_content)

    # Classify changes using NLP insights
    return await search_for_regulatory_changes(chunks, preprocessed_data, subtitle)
def llm_regulatory_change_detector(hierarchical_structure, progress_callback=None, status_callback=None):
    """Run change detection over every section of a document and summarise the findings.

    All sections are analysed concurrently inside a fresh event loop started
    with ``asyncio.run``, so this function must be called from synchronous code.

    Args:
        hierarchical_structure (dict): Document structure with a ``"sections"``
            list. Each section is a dict with a ``"subtitle"`` and a
            ``"content"`` entry; list content is joined with newlines.
        progress_callback: Accepted for interface compatibility; never invoked
            by this function.
        status_callback: Optional callable that receives one status message
            string before the sections are processed.

    Returns:
        dict | None: ``None`` when ``hierarchical_structure`` is falsy.
        Otherwise ``{"analysis_summary": ..., "results": ...}`` where
        ``analysis_summary`` holds the total number of classifications and a
        per-type count, and ``results`` maps each subtitle to its list of
        flattened classification records.

    Raises:
        KeyError: If ``"sections"``, a section's ``"subtitle"``/``"content"``,
            or a classification's ``"change"``/``"change_type"`` is missing.
    """
    if not hierarchical_structure:
        return None

    analysis_summary = {
        "total_changes_detected": 0,
        "changes_by_type": {"additions": 0, "deletions": 0, "modifications": 0},
    }
    counts_by_type = analysis_summary["changes_by_type"]
    subtitles = {}

    async def process_all_sections():
        async def process_section(section):
            subtitle = section["subtitle"]
            content = section["content"]
            if isinstance(content, list):
                content = "\n".join(content)
            # Detect changes for this subtitle
            changes = await detect_regulatory_changes(content, subtitle)
            return subtitle, changes

        if status_callback:
            status_callback(f"Processing all {len(hierarchical_structure['sections'])} sections concurrently...")
        pending = [process_section(section) for section in hierarchical_structure["sections"]]
        return await asyncio.gather(*pending)

    # Run async processing
    section_results = asyncio.run(process_all_sections())

    for subtitle, changes in section_results:
        # setdefault (not assignment) so sections sharing a subtitle accumulate
        # instead of the later one erasing the earlier one's records.
        grouped = subtitles.setdefault(subtitle, [])
        for change in changes:
            # Model output is not guaranteed to carry the key; treat absence as empty.
            classifications = change.get("classifications") or []
            analysis_summary["total_changes_detected"] += len(classifications)
            for classification in classifications:
                change_type = classification["change_type"]

                # Map "addition" -> "additions"; accept an already-plural label
                # as is, and count unexpected types in their own bucket rather
                # than failing the whole run with a KeyError.
                label = str(change_type)
                bucket = label if label in counts_by_type else f"{label}s"
                counts_by_type[bucket] = counts_by_type.get(bucket, 0) + 1

                change_subtype = (
                    "context"
                    if classification["change"] in CONTEXT_CATEGORIES
                    else "scope"
                )
                grouped.append(
                    {
                        "change": classification["change"],
                        "change_type": change_type,
                        "change_subtype": change_subtype,
                        # Descriptive fields are optional in practice; a missing
                        # one becomes None instead of aborting aggregation.
                        "relevant_text": classification.get("relevant_text"),
                        "explanation": classification.get("explanation"),
                        "nlp_evidence": classification.get("evidence"),
                    }
                )

    # Combine analysis summary and grouped changes
    return {"analysis_summary": analysis_summary, "results": subtitles}