Spaces:
Running
Running
| import difflib | |
| import html | |
| import os | |
| import json | |
| import re | |
| import time | |
| import random | |
| import asyncio | |
| import httpx | |
| from dotenv import load_dotenv | |
| import pymupdf | |
| import pymupdf4llm | |
| from rapidfuzz import fuzz | |
| from agentic_doc.parse import parse | |
| import requests | |
| from scripts.models import RegulatoryChange | |
| from scripts.regulatory_change_foundation import ( | |
| CLASSIFICATION_INFO, | |
| FEW_SHOT_EXAMPLES, | |
| BASE_PROMPT_TEMPLATE, | |
| ) | |
| load_dotenv() | |
# Highlight colours per change type, stored as RGB tuples with channels in
# the 0–1 range.
# Palette kept under the `_old` name (green / red / blue).
color_mapping_old = dict(
    addition=(0, 0.4, 0),  # green
    deletion=(1, 0, 0),  # red
    modification=(0, 0.6, 1),  # blue
)
# Palette read by css_styles and get_color_mapping_hex (blue / orange / gray).
color_mapping = dict(
    addition=(0.0, 0.45, 0.7),  # blue
    deletion=(0.9, 0.6, 0.0),  # orange
    modification=(0.5, 0.5, 0.5),  # gray
)
def to_rgb(color_tuple):
    """Format a 0–1 range RGB tuple as a CSS ``rgb(r, g, b)`` string.

    Each of the first three channels is multiplied by 255 and truncated
    with ``int`` (no rounding).
    """
    red, green, blue = (int(color_tuple[idx] * 255) for idx in range(3))
    return f"rgb({red}, {green}, {blue})"
# CSS colour string for each change type, derived once from color_mapping.
_tooltip_rgb = {kind: to_rgb(rgb) for kind, rgb in color_mapping.items()}
# Stylesheet for the link button and the per-change-type tooltip spans.
# Literal CSS braces are doubled because this is an f-string.
css_styles = f"""
<style>
.custom-link {{
display: inline-block;
padding: 8px 16px;
color: white !important;
text-decoration: none;
border-radius: 8px;
transition: background-color 0.3s ease;
}}
.custom-link:hover {{
background-color: #45a049;
}}
.tooltip {{
font-weight: bold;
cursor: help;
background-color: white;
}}
.addition-tooltip {{
color: {_tooltip_rgb["addition"]};
}}
.modification-tooltip {{
color: {_tooltip_rgb["modification"]};
}}
.deletion-tooltip {{
color: {_tooltip_rgb["deletion"]};
}}
.default-tooltip {{
color: yellow;
}}
</style>
"""
def get_color_mapping_hex():
    """Return ``color_mapping`` with every channel scaled to a 0–255 int.

    Despite the name, the values are integer tuples, not hex strings.
    """
    scaled = {}
    for change_type, channels in color_mapping.items():
        scaled[change_type] = tuple(int(level * 255) for level in channels)
    return scaled
def get_tooltip_text(change):
    """Build the tooltip text for a change.

    The result is "<type> - <category>", a newline, then "<context>".
    Missing attributes fall back to "Type unspecified",
    "Category unspecified" and an empty context respectively.

    The previous implementation chained a conditional expression with ``+``
    without parentheses, so whenever ``change.type`` existed only the type
    was returned and the category and context were silently dropped.
    """
    change_type = getattr(change, "type", "Type unspecified")
    category = getattr(change, "category", "Category unspecified")
    context = getattr(change, "context", "")
    return f"{change_type} - {category}\n{context}"
def highlight_nth(text, change, skip_failed=False):
    """Wrap the n-th regex occurrence of ``change.text`` in a tooltip span.

    ``n`` is ``change.occurrence_index`` (0 when the attribute is absent).
    ``change.text`` is used as a case-insensitive, dot-matches-newline
    pattern in which every backslash followed by whitespace is relaxed to
    ``.*?``. When the pattern does not compile, or has fewer than ``n + 1``
    matches, the work is delegated to ``highlight_fuzzy_match``.

    Parameters:
    - text: the document text to search and annotate
    - change: object providing ``text`` and optionally ``occurrence_index``,
      ``type``, ``category`` and ``context``
    - skip_failed: forwarded to ``highlight_fuzzy_match``
    Returns:
    - ``text`` with the matched section wrapped in a ``<span>``, or whatever
      ``highlight_fuzzy_match`` returns on fallback
    """
    n = getattr(change, "occurrence_index", 0)
    target = re.sub(r"\\\s+", r".*?", change.text)
    try:
        pattern = re.compile(target, flags=re.IGNORECASE | re.DOTALL)
    except re.error:
        # change.text is free text, not a vetted pattern: unbalanced
        # brackets etc. must not crash the highlighting, so fall back.
        return highlight_fuzzy_match(text, change, n, skip_failed=skip_failed)

    # Walk matches lazily and stop as soon as the n-th one is reached.
    match = None
    for index, candidate in enumerate(pattern.finditer(text)):
        if index == n:
            match = candidate
            break
    if match is None:
        return highlight_fuzzy_match(text, change, n, skip_failed=skip_failed)

    start, end = match.start(), match.end()
    # quote=True also escapes quotes, keeping the title attribute intact.
    tooltip_escaped = html.escape(get_tooltip_text(change), quote=True)
    css_kind = getattr(change, "type", "default")
    highlighted_span = f"""<span id='marked_section' class='tooltip {css_kind}-tooltip' title='{tooltip_escaped}'>
{text[start:end]}
</span>"""
    return text[:start] + highlighted_span + text[end:]
# TODO: tune the threshold. A low value (around 51) would always yield a
# result, but the match may differ from the original target; a value that is
# too high may find no match at all, e.g. when a single word is missing.
def highlight_fuzzy_match(text, change, n=0, threshold=80, skip_failed=False):
    """Highlight the fuzzy match of ``change.text`` inside ``text``.

    A window as long as the target slides over ``text`` one character at a
    time; every window scoring at least ``threshold`` with
    ``fuzz.partial_ratio`` (case-insensitive) is a candidate. Candidates are
    sorted best-first and the one at index ``n`` (clamped to the last
    candidate) is wrapped in a tooltip ``<span>``.

    Parameters:
    - text: the document text to search and annotate
    - change: object providing ``text`` and optionally ``type``,
      ``category`` and ``context``
    - n: index into the score-sorted candidate list
    - threshold: minimum ``partial_ratio`` score for a candidate
    - skip_failed: when True, return ``text`` unchanged if nothing matches;
      otherwise prepend a red "No match found" notice to ``text``
    Returns:
    - the annotated text
    """
    target = change.text
    window_size = len(target)
    target_lower = target.lower()  # loop-invariant
    candidates = []
    # "+ 1" so the final full-length window is scored too; without it a
    # target at the very end of the text (or equal to it) is never found.
    for i in range(len(text) - window_size + 1):
        window = text[i : i + window_size]
        score = fuzz.partial_ratio(window.lower(), target_lower)
        if score >= threshold:
            candidates.append((score, i, i + window_size))
    if not candidates:
        if skip_failed:
            return text
        # The target is arbitrary text: escape it before embedding in HTML.
        return (
            f"""
<span class='hover-tooltip' title='No match found'>
<strong style='color: red;'>No match found for: "{html.escape(target)}"</strong>
<br>
</span>
<span style="color: red;">Please verify if it is part of the original text or if it was extracted incorrectly.</span><br>
"""
            + text
        )
    # Pick top-N match
    candidates.sort(reverse=True)
    _, start_norm, end_norm = candidates[min(n, len(candidates) - 1)]
    tooltip_escaped = html.escape(get_tooltip_text(change), quote=True)
    css_kind = getattr(change, "type", "default")
    highlighted_span = f"""<span id='marked_section' class='tooltip {css_kind}-tooltip' title='{tooltip_escaped}'>{text[start_norm:end_norm]}</span>"""
    return text[:start_norm] + highlighted_span + text[end_norm:]
# TODO: tune the threshold. A low value (around 51) would always yield a
# result, but the match may differ from the original target; a value that is
# too high may find no match at all, e.g. when a single word is missing.
def get_best_fuzzy_match(text, change: RegulatoryChange, threshold=65):
    """Return the section of ``text`` that best fuzzy-matches ``change.text``.

    A window as long as the target slides over ``text`` one character at a
    time; windows scoring at least ``threshold`` with ``fuzz.partial_ratio``
    (case-insensitive) are candidates. Candidates are sorted best-first and
    the one at index ``change.occurrence_index`` (0 when absent, clamped to
    the last candidate) is returned.

    Returns:
    - the matched substring of ``text``, or None when no window reaches the
      threshold; callers must handle the None case
    """
    n = getattr(change, "occurrence_index", 0)
    target = change.text
    window_size = len(target)
    target_lower = target.lower()  # loop-invariant
    candidates = []
    # "+ 1" so the final full-length window is scored too; without it a
    # target at the very end of the text (or equal to it) is never found.
    for i in range(len(text) - window_size + 1):
        window = text[i : i + window_size]
        score = fuzz.partial_ratio(window.lower(), target_lower)
        if score >= threshold:
            candidates.append((score, i, i + window_size))
    if not candidates:
        return None
    # Pick top-N match
    candidates.sort(reverse=True)
    _, start_norm, end_norm = candidates[min(n, len(candidates) - 1)]
    return text[start_norm:end_norm]
def render_prompt(text, include_nlp=False, preprocessed_data=None):
    """Fill BASE_PROMPT_TEMPLATE for one piece of text.

    The classification info and few-shot examples are always embedded as
    indented JSON. When ``include_nlp`` is set and ``preprocessed_data`` is
    truthy, the entities and noun chunks whose ``"text"`` occurs in ``text``
    are embedded as well, together with an extra "evidence" block; otherwise
    those three template slots are left empty.
    """
    # Slot values for the variant without NLP insights.
    nlp_section = nlp_insights = evidence_block = ""

    if include_nlp and preprocessed_data:

        def _mentioned(items):
            # Keep only the items whose surface text appears in this chunk.
            return [item for item in items if item["text"] in text]

        insights = {
            "entities": _mentioned(preprocessed_data["entities"]),
            "key_noun_phrases": _mentioned(preprocessed_data["noun_chunks"]),
        }
        nlp_section = ", and NLP insights"
        nlp_insights = "\n\nNLP Insights:\n" + json.dumps(insights, indent=2)
        evidence_block = ',\n "evidence": {\n "entities_involved": ["relevant named entities"],\n "key_phrases": ["relevant noun phrases or key terms"]\n }'

    return BASE_PROMPT_TEMPLATE.format(
        classification_info=json.dumps(CLASSIFICATION_INFO, indent=2),
        few_shot_examples=json.dumps(FEW_SHOT_EXAMPLES, indent=2),
        nlp_section=nlp_section,
        nlp_insights=nlp_insights,
        text=text,
        evidence_block=evidence_block,
    )
def save_json_to_file(data, output_dir, output_file):
    """Write ``data`` as JSON to ``output_dir/output_file`` and print the path.

    Parameters:
    - data: any JSON-serialisable object
    - output_dir: target directory, created (with parents) when missing; an
      empty string means the current working directory
    - output_file: file name inside ``output_dir``

    The file is UTF-8 encoded, indented by 4 spaces, and non-ASCII characters
    are written as-is rather than escaped.
    """
    # exist_ok avoids the check-then-create race of a separate exists() test.
    # os.makedirs("") raises, so an empty directory is simply skipped.
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)
    file_path = os.path.join(output_dir, output_file)
    with open(file_path, "w", encoding="utf-8") as f:
        json.dump(data, f, indent=4, ensure_ascii=False)
    # Print the location of the saved file
    print(f"JSON data saved successfully at: {file_path}")
# Bearer token sent to the NLP service; None when the variable is unset.
MICROSERVICE_KEY = os.environ.get("MICROSERVICE_KEY")
# Caps call_nlp_service at 100 requests in flight at once.
nlp_semaphore = asyncio.Semaphore(value=100)
# Per-phase HTTP timeouts, in seconds.
timeout = httpx.Timeout(
    pool=80.0,  # waiting for a free connection from the pool
    connect=20.0,  # establishing the connection
    write=30.0,  # sending the request
    read=60.0,  # reading the response
)
def _nlp_retry_delay(retry_after, attempt, base_delay):
    """Return the number of seconds to wait before the next attempt.

    A numeric ``Retry-After`` value wins; otherwise (header absent, or not a
    plain number such as an HTTP-date) exponential backoff with up to 0.5 s
    of random jitter is used.
    """
    if retry_after:
        try:
            return float(retry_after)
        except ValueError:
            pass  # not delta-seconds; fall through to backoff
    return base_delay * (2**attempt) + random.uniform(0, 0.5)


async def call_nlp_service(payload, method, max_retries=5, base_delay=1.0):
    """POST ``payload`` to the NLP microservice and return the decoded JSON.

    Parameters:
    - payload: sent as the request's ``data``
    - method: path segment appended to the service base URL
    - max_retries: total number of attempts
    - base_delay: base of the exponential backoff, in seconds
    Returns:
    - the parsed JSON body of a 200 response
    Raises:
    - Exception: on any status other than 200/429, when a connect/read
      timeout or network error persists through the last attempt, or when
      every attempt was rate limited (429)

    Concurrency is bounded by ``nlp_semaphore``; the slot is held for the
    whole call, including backoff sleeps.
    """
    url = f"https://amougou-mbida-nlp-preprocessor.hf.space/{method}"
    headers = {"Authorization": f"Bearer {MICROSERVICE_KEY}"}
    async with nlp_semaphore:
        for attempt in range(max_retries):
            is_last_attempt = attempt == max_retries - 1
            try:
                async with httpx.AsyncClient(timeout=timeout) as client:
                    response = await client.post(url, data=payload, headers=headers)
            except (httpx.ConnectTimeout, httpx.ReadTimeout, httpx.NetworkError) as e:
                # Retry on network issues
                if is_last_attempt:
                    raise Exception(
                        f"NLP service network error after {max_retries} attempts: {e}"
                    ) from e
                await asyncio.sleep(_nlp_retry_delay(None, attempt, base_delay))
                continue
            # Success
            if response.status_code == 200:
                return response.json()
            # Rate limited
            if response.status_code == 429:
                if is_last_attempt:
                    break
                delay = _nlp_retry_delay(
                    response.headers.get("Retry-After"), attempt, base_delay
                )
                await asyncio.sleep(delay)
                continue
            # Other HTTP errors are not retried
            raise Exception(
                f"NLP service error: {response.status_code} - {response.text}"
            )
    raise Exception(f"NLP service error: failed after {max_retries} retries")
def lerp_color(value, start_color=(255, 0, 0), end_color=(0, 255, 0)):
    """
    Blend start_color into end_color and format the result for CSS.
    Parameters:
    - value: float between 0 and 1 (0 gives start_color, 1 gives end_color)
    - start_color: tuple (r, g, b), default red
    - end_color: tuple (r, g, b), default green
    Returns:
    - CSS rgb color string, e.g. 'rgb(255, 0, 0)'
    """
    # Interpolate the three channels independently; int() truncates.
    channels = [
        int(start_color[idx] + (end_color[idx] - start_color[idx]) * value)
        for idx in range(3)
    ]
    return "rgb({}, {}, {})".format(*channels)
def extract_markdown(file_bytes: bytes) -> str:
    """Extract markdown text from PDF bytes using pymupdf4llm.

    The in-memory document is opened as a context manager so it is closed
    once the conversion finishes, instead of being left open.
    """
    with pymupdf.open(stream=file_bytes, filetype="pdf") as document:
        return pymupdf4llm.to_markdown(document)
def remove_html_comments(text: str) -> str:
    """Return ``text`` with every ``<!-- ... -->`` comment removed.

    Matching is non-greedy and spans newlines, so multi-line comments are
    removed and separate comments are not merged into one.
    """
    comment_pattern = re.compile(r"<!--.*?-->", flags=re.DOTALL)
    return comment_pattern.sub("", text)
def normalize_markdown_indentation(content):
    """Normalize excessive indentation to prevent code block interpretation.

    Only lines whose first non-blank character is a list marker
    (``-``, ``*`` or ``+``) and that are indented by more than 4 characters
    are rewritten. Their indentation becomes 2 spaces per nesting level,
    where the level is ``leading // 6`` capped at 2. All other lines are
    returned unchanged.
    """
    normalized_lines = []
    for line in content.split("\n"):
        stripped = line.lstrip()
        leading_spaces = len(line) - len(stripped)
        if stripped.startswith(("-", "*", "+")) and leading_spaces > 4:
            nest_level = min(leading_spaces // 6, 2)  # Max 2 levels deep
            # Two spaces per level: a single space is not enough for
            # Markdown to treat the item as nested.
            line = "  " * nest_level + stripped
        normalized_lines.append(line)
    return "\n".join(normalized_lines)
def highlight_differences_words(text1: str, text2: str):
    """
    Diff two texts token by token and return both sides as HTML strings.
    Tokens are the pieces produced by splitting on whitespace runs, with the
    whitespace runs kept as tokens, so newlines survive. Every token is
    HTML-escaped.
    Highlights:
    - deletion-tooltip: tokens only in text1 (first result only)
    - addition-tooltip: tokens only in text2 (second result only)
    - modification-tooltip: replaced tokens (both results)
    Returns:
    - tuple (highlighted_text1, highlighted_text2)
    """
    span_template = '<span class="tooltip {kind}-tooltip" title="{title}">{body}</span>'

    def plain(tokens):
        # Unchanged tokens are only escaped.
        return [html.escape(token) for token in tokens]

    def marked(tokens, kind, title):
        # Changed tokens are escaped and wrapped one span per token.
        return [
            span_template.format(kind=kind, title=title, body=html.escape(token))
            for token in tokens
        ]

    tokens_a = re.split(r"(\s+)", text1)
    tokens_b = re.split(r"(\s+)", text2)
    matcher = difflib.SequenceMatcher(
        isjunk=lambda x: x in " \t", a=tokens_a, b=tokens_b
    )

    left, right = [], []
    for tag, i1, i2, j1, j2 in matcher.get_opcodes():
        old_part, new_part = tokens_a[i1:i2], tokens_b[j1:j2]
        if tag == "equal":
            left += plain(old_part)
            right += plain(new_part)
        elif tag == "replace":
            left += marked(old_part, "modification", "Changed")
            right += marked(new_part, "modification", "Changed")
        elif tag == "delete":
            # Removed tokens appear on the left side only.
            left += marked(old_part, "deletion", "Removed")
        elif tag == "insert":
            # Added tokens appear on the right side only.
            right += marked(new_part, "addition", "Added")
    return "".join(left), "".join(right)
def map_categorical_impact_assessment(
    changes: list[RegulatoryChange],
) -> list[RegulatoryChange]:
    """Give every change the standard action checklist of its category.

    For a change whose ``category`` is one of the known categories, the
    labels of ``change.actions`` are compared, in order, with the standard
    labels. On any difference ``change.actions`` is replaced by a fresh
    checklist with every item not completed. When the labels already match,
    the actions (and their completion flags) are left untouched. Changes
    with an unknown category are not modified.

    The changes are updated in place and the same list is returned.
    """
    labels_by_category = {
        "Textual and Editorial Changes": (
            "Update documentation",
            "Adjust UI wording",
            "Inform stakeholders",
        ),
        "Data and Field Changes": (
            "Add/modify fields",
            "Create migration scripts",
            "Update forms/APIs/test cases",
        ),
        "Procedural Changes": (
            "Update process automation",
            "Adjust workflow steps",
            "Reassign roles or access",
        ),
        "Compliance and Enforcement Changes": (
            "Implement logging or alerts",
            "Update compliance documentation",
            "Conduct internal review",
        ),
        "Policy Changes": (
            "Adjust rule sets",
            "Revalidate configurations",
            "Run simulations or validations",
        ),
    }
    for change in changes:
        expected = labels_by_category.get(change.category)
        if expected is None:
            continue
        present = tuple(action.get("label") for action in change.actions)
        if present == expected:
            # Labels already line up: keep the existing completion progress.
            continue
        # New dicts per change, so no checklist is shared between changes.
        change.actions = [{"label": label, "completed": False} for label in expected]
    return changes
def landing_ai_available() -> bool:
    """Report whether Landing AI can currently be used.

    A minimal one-byte document is sent through ``parse``. The check fails
    (returns False) when any reported error mentions "402", or when the
    probe raises; in both cases a message is printed. Otherwise it returns
    True.
    """
    try:
        probe = parse("c".encode("utf-8"))[0]
        report = json.loads(probe.model_dump_json())
        # A missing or empty "errors" entry means there is nothing to inspect.
        for entry in report.get("errors", []) or []:
            if "402" in entry.get("error", ""):
                print("Landing AI credits exhausted.")
                return False
    except Exception as e:
        # Best-effort probe: any failure is treated as "not available".
        print(f"Error checking Landing AI credits: {e}")
        return False
    return True
# Landing AI is probed once, when this module is imported.
landing_ai_available_flag = landing_ai_available()
# Extraction method key -> display label. The agentic option is offered
# (and listed first) only when the probe succeeded.
if landing_ai_available_flag:
    extraction_methods = {
        "agentic": "Agentic (Landing AI)",
        "llm": "LLM (gpt-4o-mini)",
        "pymupdf": "PyMuPDF (PDF Parsing Library)",
    }
else:
    extraction_methods = {
        "pymupdf": "PyMuPDF (PDF Parsing Library)",
        "llm": "LLM (gpt-4o-mini)",
    }