Spaces:
Running
Running
import asyncio
import json
import logging
import os

from dotenv import load_dotenv
from openai import AsyncOpenAI

from scripts.regulatory_change_foundation import CONTEXT_CATEGORIES
from scripts.utility_functions import render_prompt
from scripts.pymupdf_nlp_preprocessing import extract_hierarchical_text
# Load environment variables from the .env file before the API key is read.
load_dotenv()

api_key = os.environ.get("OPENAI_API_KEY")

# Shared async OpenAI client used by the classification coroutine in this module.
openai_client = AsyncOpenAI(timeout=60, api_key=api_key)
def create_prompt_without_nlp_insights(text):
    """Render the prompt for *text* via ``render_prompt`` with ``include_nlp=False``.

    Returns whatever ``render_prompt`` returns, unchanged.
    """
    prompt = render_prompt(text, include_nlp=False)
    return prompt
async def classify_changes_without_nlp_insights(text_content, location_info):
    """Classify changes in a text chunk using OpenAI.

    Sends ``text_content`` (wrapped in the prompt built by
    ``create_prompt_without_nlp_insights``) to the chat-completions API and
    parses the reply as JSON.

    Args:
        text_content: Text to analyse.
        location_info: Value stored under ``"location"`` on a positive result.

    Returns:
        The parsed JSON object, extended with ``"location"`` and
        ``"source_text"`` keys, when its ``"changes_detected"`` entry is
        truthy. ``None`` when no change is reported, or when the request or
        the JSON parsing fails (failures are logged rather than raised).
    """
    logger = logging.getLogger(__name__)

    try:
        response = await openai_client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {
                    "role": "system",
                    "content": "You are a legal expert specializing in analyzing German regulatory documents with a focus on identifying regulatory changes. Only return JSON output.",
                },
                {
                    "role": "user",
                    "content": create_prompt_without_nlp_insights(text_content),
                },
            ],
            temperature=0.7,
            max_tokens=1024,
        )
        content = response.choices[0].message.content
        # The message content may be absent; treat that like an unusable reply.
        result = json.loads(content) if content else None
    except json.JSONDecodeError:
        logger.warning(
            "Model reply was not valid JSON; skipping this block", exc_info=True
        )
        return None
    except Exception:
        # Deliberately best-effort: a single failed request must not abort the
        # analysis of the remaining blocks, but it should leave a trace.
        logger.exception("Change classification request failed")
        return None

    if not isinstance(result, dict):
        logger.warning("Model reply was not a JSON object; skipping this block")
        return None

    if result.get("changes_detected", False):
        result["location"] = location_info
        result["source_text"] = text_content
        return result
    return None
# Async wrapper kept for backward compatibility with the older *_async name.
async def classify_changes_without_nlp_insights_async(text_content, location_info):
    """Await ``classify_changes_without_nlp_insights`` and return its result."""
    outcome = await classify_changes_without_nlp_insights(text_content, location_info)
    return outcome
async def traverse_blocks(blocks, parent=None):
    """Traverse the hierarchical structure and classify its leaf blocks.

    All blocks of one level are processed concurrently with
    ``asyncio.gather()``; blocks with children recurse into those children.

    Args:
        blocks: Iterable of block dicts. Each block is mutated in place: a
            ``"parent"`` key is set on it.
        parent: Block that owns ``blocks`` (``None`` at the top level).

    Returns:
        A flat list of the classification results reported for leaf blocks,
        each with the block's hierarchical text stored under ``"text"``.
    """

    async def process_block(block, parent):
        # Back-reference to the enclosing block.
        # NOTE(review): presumably consumed by extract_hierarchical_text — confirm.
        block["parent"] = parent

        # A missing, None or empty "children" entry all mean "leaf". The
        # previous check sent blocks without the key down the recursive branch,
        # where block["children"] raised KeyError.
        children = block.get("children")
        if children:
            return await traverse_blocks(children, block)

        text_content = extract_hierarchical_text(block)
        location_info = {
            "page_number": block["page_number"],
            "block_text": block["text"],
        }
        changes = await classify_changes_without_nlp_insights(text_content, location_info)
        if not changes:
            return []

        # Attach the full hierarchical text to the result.
        changes["text"] = text_content
        return [changes]

    # Process all blocks of this level concurrently.
    per_block = await asyncio.gather(*(process_block(block, parent) for block in blocks))

    # Every task returns a list, so flattening is a plain nested iteration.
    return [change for group in per_block for change in group]
def pymupdf_regulatory_change_detector_without_nlp_insights(
    hierarchical_structure, progress_callback=None, status_callback=None
):
    """Detect regulatory changes in a hierarchical document structure.

    Runs ``traverse_blocks`` over ``hierarchical_structure["blocks"]`` and
    aggregates the reported classifications into a summary and a per-page
    grouping.

    Args:
        hierarchical_structure: Mapping with a ``"blocks"`` entry that is
            handed to ``traverse_blocks``.
        progress_callback: Accepted for interface compatibility; not used by
            this function.
        status_callback: Optional callable invoked once with a status string
            before the analysis starts.

    Returns:
        ``{"error": "No hierarchical structure provided"}`` (a bare dict) when
        ``hierarchical_structure`` is falsy. Otherwise a tuple
        ``(final_output, results)`` where ``final_output`` holds
        ``"analysis_summary"`` and ``"changes_by_page"`` and ``results`` is
        the raw list returned by ``traverse_blocks``. Callers must therefore
        check for the error dict before unpacking.
    """
    if not hierarchical_structure:
        return {"error": "No hierarchical structure provided"}

    changes_by_type = {"addition": 0, "deletion": 0, "modification": 0}
    analysis_summary = {
        "total_changes_detected": 0,
        "changes_by_type": changes_by_type,
    }
    changes_by_page = {}

    if status_callback:
        status_callback("Analyzing all document blocks concurrently...")

    # Run async processing
    results = asyncio.run(traverse_blocks(hierarchical_structure["blocks"]))

    for change in results:
        # The classification payload comes from the model, so tolerate a
        # missing or empty "classifications" entry instead of raising KeyError.
        classifications = change.get("classifications") or []
        analysis_summary["total_changes_detected"] += len(classifications)

        # The page is the same for every classification of this change.
        page_number = change["location"]["page_number"]

        for classification in classifications:
            change_type = classification.get("change_type")
            # Count unexpected types under their own key rather than crashing
            # on a label outside addition/deletion/modification.
            changes_by_type[change_type] = changes_by_type.get(change_type, 0) + 1

            change_label = classification.get("change")
            change_subtype = "context" if change_label in CONTEXT_CATEGORIES else "scope"

            # Group changes by page number
            changes_by_page.setdefault(page_number, []).append(
                {
                    "change": change_label,
                    "change_type": change_type,
                    "change_subtype": change_subtype,
                    "relevant_text": classification.get("relevant_text"),
                    "text": change["text"],
                    "explanation": classification.get("explanation"),
                }
            )

    # Combine analysis summary and grouped changes
    final_output = {
        "analysis_summary": analysis_summary,
        "changes_by_page": changes_by_page,
    }
    return final_output, results