Spaces:
Running
Running
import asyncio
import json
import logging
import os

from dotenv import load_dotenv
from openai import AsyncOpenAI

from scripts.regulatory_change_foundation import CONTEXT_CATEGORIES
from scripts.utility_functions import call_nlp_service, render_prompt
# Pull variables from a local .env file (if one exists) so the OpenAI key can be read below.
load_dotenv()
api_key = os.environ.get("OPENAI_API_KEY")
# Single async client shared by every chunk-classification request, with a 60-second timeout.
# NOTE(review): this client is built at import time and reused across separate asyncio.run()
# event loops; confirm that is safe for the installed openai/httpx versions.
openai_client = AsyncOpenAI(timeout=60, api_key=api_key)
async def preprocess_text_with_nlp(text, max_chunk_size=512, overlap=50):
    """Run the remote NLP preprocessing step over ``text``.

    Sends ``{"text": text}`` to the ``preprocess_text_with_nlp_pymupdf``
    endpoint via ``call_nlp_service`` and returns its response unchanged.

    Note:
        ``max_chunk_size`` and ``overlap`` are accepted for signature
        compatibility only; they are not forwarded to the service.
    """
    endpoint = "preprocess_text_with_nlp_pymupdf"
    payload = {"text": text}
    return await call_nlp_service(payload, endpoint)
def create_prompt_with_nlp(chunk, preprocessed_data):
    """Render the user prompt for one text chunk with NLP context enabled.

    Thin wrapper around ``render_prompt`` that always sets ``include_nlp=True``.
    """
    prompt = render_prompt(
        chunk,
        preprocessed_data=preprocessed_data,
        include_nlp=True,
    )
    return prompt
async def classify_changes_with_nlp(text_content, location_info):
    """Classify regulatory changes in ``text_content``, one chunk at a time.

    The text goes to the NLP service twice: once for preprocessing
    (``preprocess_text_with_nlp``) and once for chunking via the
    ``recursive_character_text_splitter`` endpoint. Each chunk is then sent to
    ``gpt-4o-mini`` together with the shared preprocessing output. All chunk
    requests run concurrently.

    Args:
        text_content: Text to analyse.
        location_info: Attached unchanged as ``"location"`` to every result
            that reports a change.

    Returns:
        A list of the model's parsed JSON objects whose ``"changes_detected"``
        is truthy. Each is augmented with ``"location"`` and ``"source_text"``
        (the chunk it came from). Returns ``None`` when no chunk reports a
        change. A chunk whose request or JSON parsing fails is logged at
        WARNING level and skipped.
    """
    logger = logging.getLogger(__name__)

    # Preprocessing and splitting are independent service calls, so issue them together.
    preprocessed_data, split_result = await asyncio.gather(
        preprocess_text_with_nlp(text_content),
        call_nlp_service({"text": text_content}, "recursive_character_text_splitter"),
    )
    chunks = split_result["chunks"]

    async def process_chunk(chunk):
        try:
            response = await openai_client.chat.completions.create(
                model="gpt-4o-mini",
                messages=[
                    {
                        "role": "system",
                        "content": "You are a legal expert analyzing German regulatory changes. Return only JSON.",
                    },
                    {
                        "role": "user",
                        "content": create_prompt_with_nlp(chunk, preprocessed_data),
                    },
                ],
                temperature=0.7,
                max_tokens=1024,
                # JSON mode: replies are emitted as a JSON object instead of
                # free text (e.g. markdown-fenced), which json.loads would reject.
                response_format={"type": "json_object"},
            )
            payload = json.loads(response.choices[0].message.content)
        except Exception:
            # Best effort: one failed chunk must not abort the others, but the
            # failure (API error, rate limit, truncated JSON, ...) must be visible.
            logger.warning(
                "Skipping chunk at %s: classification failed",
                location_info,
                exc_info=True,
            )
            return None

        if not isinstance(payload, dict) or not payload.get("changes_detected", False):
            return None
        payload["location"] = location_info
        payload["source_text"] = chunk
        return payload

    results = await asyncio.gather(*(process_chunk(chunk) for chunk in chunks))
    detected = [r for r in results if r is not None]
    return detected or None
# Alias retained for backward compatibility.
async def classify_changes_with_nlp_async(text_content, location_info):
    """Await ``classify_changes_with_nlp`` and return its result unchanged.

    Both names are coroutines; this one exists only under the older name.
    """
    outcome = await classify_changes_with_nlp(text_content, location_info)
    return outcome
def extract_hierarchical_text(block):
    """Build a context string for ``block`` from itself and up to two ancestors.

    Pieces are ordered outermost first and joined by a blank line (``"\\n\\n"``):

    * the grandparent's ``"text"``, when ``block["parent"]`` is not None and
      has a non-None ``"parent"`` of its own;
    * the parent's ``"text"``, when ``block["parent"]`` is present and not None;
    * the block's own ``"text"``.
    """
    parent = block.get("parent")
    grandparent = None if parent is None else parent.get("parent")

    pieces = [node["text"] for node in (grandparent, parent) if node is not None]
    pieces.append(block["text"])
    return "\n\n".join(pieces)
async def traverse_blocks_with_nlp(blocks, parent=None):
    """Classify changes in every leaf block of a hierarchy, concurrently.

    Walks ``blocks`` recursively. Each block is given a ``"parent"`` key
    pointing at its parent block (``None`` at the top level). This mutates the
    input dicts and creates parent <-> child reference cycles.

    A block whose ``"children"`` is missing, ``None`` or empty is treated as a
    leaf. Its text, with ancestor context from ``extract_hierarchical_text``,
    is passed to ``classify_changes_with_nlp``, and every returned change is
    tagged with that combined text as ``"full_text"``. Sibling blocks are
    processed concurrently via ``asyncio.gather``.

    Args:
        blocks: Sequence of block dicts; leaves must provide ``"text"`` and
            ``"page_number"``.
        parent: Parent block of ``blocks``; ``None`` for the root level.

    Returns:
        A flat list of change dicts from all leaves, in block order.
    """

    async def process_block(block, parent):
        block["parent"] = parent
        children = block.get("children")
        if children:
            # Interior node: recurse with this block as its children's parent.
            return await traverse_blocks_with_nlp(children, block)

        # Leaf node (also when the "children" key is absent, which previously
        # fell through to block["children"] and raised KeyError).
        text_content = extract_hierarchical_text(block)
        location_info = {
            "page_number": block["page_number"],
            "block_text": block["text"],
        }
        changes = await classify_changes_with_nlp(text_content, location_info)
        if not changes:
            return []
        for change in changes:
            change["full_text"] = text_content
        return changes

    per_block = await asyncio.gather(*(process_block(block, parent) for block in blocks))
    # Every process_block call returns a list, so a nested comprehension flattens them.
    return [change for block_changes in per_block for change in block_changes]
def pymupdf_regulatory_change_detector_with_nlp_insights(
    hierarchical_structure, progress_callback=None, status_callback=None
):
    """Detect and summarise regulatory changes in a parsed PDF hierarchy.

    Runs ``traverse_blocks_with_nlp`` over ``hierarchical_structure["blocks"]``
    in a fresh event loop (``asyncio.run``), then tallies every classification
    the model returned.

    Args:
        hierarchical_structure: Mapping with a ``"blocks"`` list. Falsy input
            short-circuits with an error result.
        progress_callback: Accepted for interface compatibility; currently unused.
        status_callback: Optional callable that receives one status string.

    Returns:
        ``(report, results)``.

        * ``report["analysis_summary"]`` holds ``"total_changes_detected"`` and
          ``"changes_by_type"``: per-``change_type`` counts, seeded with
          addition/deletion/modification, plus any other label the model emits.
        * ``report["changes_by_page"]`` maps page number to a list of
          classification summaries.
        * ``results`` is the raw flat change list.

        For falsy input the result is ``({"error": "No structure provided"}, [])``.

    Note:
        ``asyncio.run`` raises ``RuntimeError`` if called from a thread that
        already has a running event loop.
    """
    if not hierarchical_structure:
        return {"error": "No structure provided"}, []

    changes_by_type = {"addition": 0, "deletion": 0, "modification": 0}
    analysis_summary = {
        "total_changes_detected": 0,
        "changes_by_type": changes_by_type,
    }
    changes_by_page = {}

    if status_callback:
        status_callback("Analyzing all document blocks concurrently with NLP...")

    results = asyncio.run(traverse_blocks_with_nlp(hierarchical_structure["blocks"]))

    for change in results:
        # Classifications are model output: tolerate missing/unexpected fields
        # instead of losing the whole report after every API call has finished.
        classifications = change.get("classifications") or []
        analysis_summary["total_changes_detected"] += len(classifications)
        page_entries = changes_by_page.setdefault(change["location"]["page_number"], []) if classifications else None
        for classification in classifications:
            change_type = classification.get("change_type")
            # An unexpected label gets its own counter rather than raising KeyError.
            changes_by_type[change_type] = changes_by_type.get(change_type, 0) + 1
            change_name = classification.get("change")
            page_entries.append(
                {
                    "change": change_name,
                    "change_type": change_type,
                    "change_subtype": "context" if change_name in CONTEXT_CATEGORIES else "scope",
                    "relevant_text": classification.get("relevant_text"),
                    "explanation": classification.get("explanation"),
                    "nlp_evidence": classification.get("evidence"),
                }
            )

    return {
        "analysis_summary": analysis_summary,
        "changes_by_page": changes_by_page,
    }, results