"""Utilities for highlighting regulatory changes, rendering prompts, calling the
NLP pre-processing microservice and extracting markdown from PDFs."""

import asyncio
import difflib
import html
import json
import os
import random
import re
import time

import httpx
import pymupdf
import pymupdf4llm
import requests
from agentic_doc.parse import parse
from dotenv import load_dotenv
from rapidfuzz import fuzz

from scripts.models import RegulatoryChange
from scripts.regulatory_change_foundation import (
    CLASSIFICATION_INFO,
    FEW_SHOT_EXAMPLES,
    BASE_PROMPT_TEMPLATE,
)

load_dotenv()

# Colors as RGB tuples with channels in the 0–1 range.
color_mapping_old = {
    "addition": (0, 0.4, 0),  # green
    "deletion": (1, 0, 0),  # red
    "modification": (0, 0.6, 1),  # blue
}

color_mapping = {
    "addition": (0.0, 0.45, 0.7),  # blue
    "deletion": (0.9, 0.6, 0.0),  # orange
    "modification": (0.5, 0.5, 0.5),  # gray
}


def to_rgb(color_tuple):
    """Convert a 0–1 range (r, g, b) tuple into a CSS ``rgb(r, g, b)`` string."""
    red, green, blue = (int(channel * 255) for channel in color_tuple[:3])
    return f"rgb({red}, {green}, {blue})"


# NOTE(review): this stylesheet is empty in the reviewed source; the rules for
# the highlight/tooltip classes appear to be missing — confirm and restore.
css_styles = f""" """


def get_color_mapping_hex():
    """Return ``color_mapping`` with every channel scaled to a 0–255 integer.

    Despite the name, the values are integer tuples, not hex strings.
    """
    return {
        key: tuple(int(channel * 255) for channel in rgb)
        for key, rgb in color_mapping.items()
    }


def get_tooltip_text(change):
    """Build the tooltip text ``"<type> - <category>\\n<context>"`` for a change.

    Missing (or empty) attributes fall back to a placeholder. The previous
    implementation relied on a conditional expression that bound looser than
    ``+`` and therefore returned only ``change.type`` whenever it existed.
    """
    change_type = getattr(change, "type", None) or "Type unspecified"
    category = getattr(change, "category", None) or "Category unspecified"
    context = getattr(change, "context", None) or ""
    return f"{change_type} - {category}\n{context}"


def _occurrence_index(change):
    """Return the non-negative occurrence index of ``change`` (0 when absent)."""
    return max(getattr(change, "occurrence_index", 0) or 0, 0)


def highlight_nth(text, change, skip_failed=False):
    """Highlight the n-th regex occurrence of ``change.text`` inside ``text``.

    The change text is matched literally (case-insensitively); runs of
    whitespace in it may match any characters, so line-wrapping differences
    do not prevent a hit. When fewer than n+1 occurrences exist the call
    falls back to ``highlight_fuzzy_match``.

    Args:
        text: The document text to search.
        change: Object with a ``text`` attribute and optional
            ``occurrence_index``, ``type``, ``category`` and ``context``.
        skip_failed: Forwarded to the fuzzy fallback.

    Returns:
        ``text`` with the matched section replaced by its highlighted form.
    """
    n = _occurrence_index(change)

    # Escape first so regex metacharacters in the change text (parentheses,
    # dots, brackets ...) are matched literally instead of raising re.error;
    # then turn every run of escaped whitespace into a lazy wildcard.
    target = re.sub(r"(?:\\\s)+", lambda _: ".*?", re.escape(change.text))
    pattern = re.compile(target, flags=re.IGNORECASE | re.DOTALL)

    # Stop scanning as soon as the n-th occurrence is reached.
    match = None
    for index, candidate in enumerate(pattern.finditer(text)):
        if index == n:
            match = candidate
            break

    if match is None:
        return highlight_fuzzy_match(text, change, n, skip_failed=skip_failed)

    start, end = match.start(), match.end()
    tooltip_raw = get_tooltip_text(change)
    tooltip_escaped = html.escape(tooltip_raw, quote=True)
    # NOTE(review): the wrapper markup that should carry `tooltip_escaped`
    # appears to be missing from this template — confirm and restore.
    highlighted_span = f""" {text[start:end]} """
    return text[:start] + highlighted_span + text[end:]


def _fuzzy_candidates(text, target, threshold):
    """Score every ``len(target)``-sized window of ``text`` against ``target``.

    Returns:
        ``(score, start, end)`` tuples whose score reaches ``threshold``,
        sorted best first. Empty when ``target`` is empty or longer than
        ``text``.
    """
    window_size = len(target)
    if window_size == 0:
        return []

    needle = target.lower()
    candidates = []
    # "+ 1" so the final window (and a text exactly as long as the target)
    # is scored as well.
    for start in range(len(text) - window_size + 1):
        end = start + window_size
        score = fuzz.partial_ratio(text[start:end].lower(), needle)
        if score >= threshold:
            candidates.append((score, start, end))

    candidates.sort(reverse=True)
    return candidates


# TODO: check threshold -> 51 would always get a result.
# If we make it lower we get guaranteed matches but they might be different
# from the original target; if the threshold is too high we might not find
# any match, e.g. when a word is missing.
def highlight_fuzzy_match(text, change, n=0, threshold=80, skip_failed=False):
    """Highlight the n-th best fuzzy match of ``change.text`` inside ``text``.

    Args:
        text: The document text to search.
        change: Object with a ``text`` attribute (plus optional tooltip fields).
        n: Rank of the candidate to use; clamped to the last candidate.
        threshold: Minimum ``fuzz.partial_ratio`` score for a candidate.
        skip_failed: When true, return ``text`` unchanged if nothing matches;
            otherwise a "No match found" notice is prepended.

    Returns:
        The (possibly annotated) text.
    """
    target = change.text
    candidates = _fuzzy_candidates(text, target, threshold)

    if not candidates:
        if skip_failed:
            return text
        return (
            f""" No match found for: "{target}"
Please verify if it is part of the original text or if it was extracted incorrectly.
"""
            + text
        )

    # Pick top-N match
    _, start_norm, end_norm = candidates[min(n, len(candidates) - 1)]
    tooltip_raw = get_tooltip_text(change)
    tooltip_escaped = html.escape(tooltip_raw, quote=True)
    # NOTE(review): the wrapper markup that should carry `tooltip_escaped`
    # appears to be missing from this template — confirm and restore.
    highlighted_span = f"""{text[start_norm:end_norm]}"""
    return text[:start_norm] + highlighted_span + text[end_norm:]


# TODO: check threshold -> 51 would always get a result.
# If we make it lower we get guaranteed matches but they might be different
# from the original target; if the threshold is too high we might not find
# any match, e.g. when a word is missing.
def get_best_fuzzy_match(text, change: RegulatoryChange, threshold=65):
    """Return the section of ``text`` that best fuzzily matches ``change.text``.

    The candidate rank is taken from ``change.occurrence_index`` (0 when
    absent) and clamped to the last candidate.

    Returns:
        The matched substring, or ``None`` when no window reaches
        ``threshold`` — callers must handle the ``None`` case.
    """
    n = _occurrence_index(change)
    candidates = _fuzzy_candidates(text, change.text, threshold)
    if not candidates:
        return None

    # Pick top-N match
    _, start_norm, end_norm = candidates[min(n, len(candidates) - 1)]
    return text[start_norm:end_norm]


def render_prompt(text, include_nlp=False, preprocessed_data=None):
    """Fill ``BASE_PROMPT_TEMPLATE`` for one chunk of text.

    Args:
        text: The chunk to analyse.
        include_nlp: Whether to add NLP insights to the prompt.
        preprocessed_data: Mapping with ``"entities"`` and ``"noun_chunks"``
            lists of dicts that each carry a ``"text"`` key. Only items whose
            text occurs in ``text`` are included.

    Returns:
        The formatted prompt string.
    """
    classification_json = json.dumps(CLASSIFICATION_INFO, indent=2)
    few_shot_json = json.dumps(FEW_SHOT_EXAMPLES, indent=2)

    if include_nlp and preprocessed_data:
        chunk_entities = [
            ent for ent in preprocessed_data["entities"] if ent["text"] in text
        ]
        chunk_nouns = [
            nc for nc in preprocessed_data["noun_chunks"] if nc["text"] in text
        ]
        nlp_insights_json = json.dumps(
            {"entities": chunk_entities, "key_noun_phrases": chunk_nouns}, indent=2
        )
        nlp_section = ", and NLP insights"
        nlp_insights = f"\n\nNLP Insights:\n{nlp_insights_json}"
        evidence_block = ',\n "evidence": {\n "entities_involved": ["relevant named entities"],\n "key_phrases": ["relevant noun phrases or key terms"]\n }'
    else:
        nlp_section = ""
        nlp_insights = ""
        evidence_block = ""

    return BASE_PROMPT_TEMPLATE.format(
        classification_info=classification_json,
        few_shot_examples=few_shot_json,
        nlp_section=nlp_section,
        nlp_insights=nlp_insights,
        text=text,
        evidence_block=evidence_block,
    )


def save_json_to_file(data, output_dir, output_file):
    """Save the JSON data to a file and print the file path."""
    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(output_dir, exist_ok=True)

    file_path = os.path.join(output_dir, output_file)
    with open(file_path, "w", encoding="utf-8") as f:
        json.dump(data, f, indent=4, ensure_ascii=False)

    print(f"JSON data saved successfully at: {file_path}")


MICROSERVICE_KEY = os.getenv("MICROSERVICE_KEY")
nlp_semaphore = asyncio.Semaphore(100)  # Limit to 100 concurrent requests
timeout = httpx.Timeout(
    connect=20.0,  # time to establish connection
    read=60.0,  # time to read the response
    write=30.0,  # time to send the request
    pool=80.0,  # time to acquire a connection from the pool
)


def _parse_retry_after(value):
    """Return ``Retry-After`` as non-negative finite seconds, else ``None``.

    The header may also be an HTTP-date, which ``float`` cannot parse; in
    that case the caller falls back to exponential backoff.
    """
    try:
        seconds = float(value)
    except (TypeError, ValueError):
        return None
    # The chained comparison is False for NaN and infinity.
    return seconds if 0 <= seconds < float("inf") else None


async def call_nlp_service(payload, method, max_retries=5, base_delay=1.0):
    """POST ``payload`` to the NLP microservice endpoint ``method``.

    Retries on HTTP 429 (honouring a numeric ``Retry-After`` header) and on
    connect/read timeouts and network errors, using exponential backoff with
    jitter. Concurrency is bounded by ``nlp_semaphore``.

    Args:
        payload: Form data sent as the request body.
        method: Path segment appended to the service base URL.
        max_retries: Maximum number of attempts.
        base_delay: Base delay in seconds for the exponential backoff.

    Returns:
        The decoded JSON body of a 200 response.

    Raises:
        Exception: On a non-200/429 status, or when all attempts fail.
    """
    url = f"https://amougou-mbida-nlp-preprocessor.hf.space/{method}"
    headers = {"Authorization": f"Bearer {MICROSERVICE_KEY}"}

    def backoff(attempt):
        return base_delay * (2**attempt) + random.uniform(0, 0.5)

    async with nlp_semaphore:
        async with httpx.AsyncClient(timeout=timeout) as client:
            for attempt in range(max_retries):
                is_last_attempt = attempt == max_retries - 1
                try:
                    response = await client.post(url, data=payload, headers=headers)
                except (
                    httpx.ConnectTimeout,
                    httpx.ReadTimeout,
                    httpx.NetworkError,
                ) as e:
                    # Retry on network issues
                    if is_last_attempt:
                        raise Exception(
                            f"NLP service network error after {max_retries} attempts: {e}"
                        ) from e
                    await asyncio.sleep(backoff(attempt))
                    continue

                # Success
                if response.status_code == 200:
                    return response.json()

                # Rate limited
                if response.status_code == 429:
                    if is_last_attempt:
                        break
                    delay = _parse_retry_after(response.headers.get("Retry-After"))
                    if delay is None:
                        delay = backoff(attempt)
                    await asyncio.sleep(delay)
                    continue

                # Other HTTP errors are not retried
                raise Exception(
                    f"NLP service error: {response.status_code} - {response.text}"
                )

    raise Exception(f"NLP service error: failed after {max_retries} retries")


def lerp_color(value, start_color=(255, 0, 0), end_color=(0, 255, 0)):
    """
    Linearly interpolate between start_color and end_color by value.

    Parameters:
    - value: float between 0 and 1 (values outside the range are clamped so
      the result is always a valid CSS color)
    - start_color: tuple (r, g, b), default red
    - end_color: tuple (r, g, b), default green

    Returns:
    - CSS rgb color string, e.g. 'rgb(255, 0, 0)'
    """
    value = min(max(value, 0.0), 1.0)
    r, g, b = (
        int(start + (end - start) * value)
        for start, end in zip(start_color[:3], end_color[:3])
    )
    return f"rgb({r}, {g}, {b})"


def extract_markdown(file_bytes: bytes) -> str:
    """Extract markdown text from PDF bytes using pymupdf4llm."""
    # The context manager closes the document once conversion is done.
    with pymupdf.open(stream=file_bytes, filetype="pdf") as document:
        return pymupdf4llm.to_markdown(document)


def remove_html_comments(text: str) -> str:
    """Return ``text`` with every ``<!-- ... -->`` comment removed.

    Comments may span multiple lines.
    """
    return re.sub(r"<!--.*?-->", "", text, flags=re.DOTALL)


def normalize_markdown_indentation(content):
    """Normalize excessive indentation to prevent code block interpretation.

    Lines starting (after indentation) with ``-``, ``*`` or ``+`` that are
    indented by more than 4 characters are re-indented as nested list items,
    at most two levels deep. All other lines are returned unchanged.
    """
    normalized_lines = []
    for line in content.split("\n"):
        stripped = line.lstrip()
        leading_spaces = len(line) - len(stripped)
        if stripped.startswith(("-", "*", "+")) and leading_spaces > 4:
            # Convert to proper nested list (2 spaces per level), max 2 levels
            nest_level = min(leading_spaces // 6, 2)
            normalized_lines.append("  " * nest_level + stripped)
        else:
            normalized_lines.append(line)
    return "\n".join(normalized_lines)


def _wrap_tokens(tokens, css_class):
    """HTML-escape each token and wrap it in a ``<span>`` with ``css_class``."""
    return [f'<span class="{css_class}">{html.escape(token)}</span>' for token in tokens]


def highlight_differences_words(text1: str, text2: str):
    """
    Return two HTML strings: highlighted version of text1 and text2.
    Highlights:
    - deletion-tooltip for words deleted from text1 => appear in highlighted_text1 only
    - addition-tooltip for words inserted into text2 => appear in highlighted_text2 only
    - modification-tooltip for words replaced (both sides)
    Preserves newlines.
    """
    # Split into words and whitespace runs, keeping the whitespace as tokens
    words1 = re.split(r"(\s+)", text1)
    words2 = re.split(r"(\s+)", text2)

    sm = difflib.SequenceMatcher(a=words1, b=words2, isjunk=lambda x: x in " \t")

    out1 = []
    out2 = []
    for tag, i1, i2, j1, j2 in sm.get_opcodes():
        if tag == "equal":
            out1.extend(html.escape(w) for w in words1[i1:i2])
            out2.extend(html.escape(w) for w in words2[j1:j2])
        elif tag == "replace":
            out1.extend(_wrap_tokens(words1[i1:i2], "modification-tooltip"))
            out2.extend(_wrap_tokens(words2[j1:j2], "modification-tooltip"))
        elif tag == "delete":
            # deleted words are not added to out2
            out1.extend(_wrap_tokens(words1[i1:i2], "deletion-tooltip"))
        elif tag == "insert":
            # inserted words are not added to out1
            out2.extend(_wrap_tokens(words2[j1:j2], "addition-tooltip"))

    return "".join(out1), "".join(out2)


# Expected impact-assessment action labels per change category.
_CATEGORY_ACTION_LABELS = {
    "Textual and Editorial Changes": (
        "Update documentation",
        "Adjust UI wording",
        "Inform stakeholders",
    ),
    "Data and Field Changes": (
        "Add/modify fields",
        "Create migration scripts",
        "Update forms/APIs/test cases",
    ),
    "Procedural Changes": (
        "Update process automation",
        "Adjust workflow steps",
        "Reassign roles or access",
    ),
    "Compliance and Enforcement Changes": (
        "Implement logging or alerts",
        "Update compliance documentation",
        "Conduct internal review",
    ),
    "Policy Changes": (
        "Adjust rule sets",
        "Revalidate configurations",
        "Run simulations or validations",
    ),
}


def map_categorical_impact_assessment(
    changes: list[RegulatoryChange],
) -> list[RegulatoryChange]:
    """Map categorical impact assessment actions based on change category.

    For every change whose ``category`` is known, the labels of its current
    actions are compared with the expected ones. On a mismatch the actions
    are replaced by fresh, uncompleted defaults; when the labels already
    match, the existing actions (and their completion status) are kept.

    Args:
        changes: Changes to update in place.

    Returns:
        The same list that was passed in.
    """
    for change in changes:
        expected_labels = _CATEGORY_ACTION_LABELS.get(change.category)
        if expected_labels is None:
            continue

        # `actions` may be unset; treat that as "no actions yet".
        current_labels = [action.get("label") for action in (change.actions or [])]
        if current_labels != list(expected_labels):
            # Build new dicts per change so no state is shared between changes.
            change.actions = [
                {"label": label, "completed": False} for label in expected_labels
            ]
    return changes


def landing_ai_available() -> bool:
    """Check if we have Landing AI credits available.

    Parses a one-byte probe document and inspects the reported errors; an
    error mentioning "402" is treated as exhausted credits. Any exception
    raised during the probe is reported and treated as "not available".
    """
    try:
        result = json.loads(parse(b"c")[0].model_dump_json())
        errors = result.get("errors", [])
        if any("402" in error.get("error", "") for error in errors or []):
            print("Landing AI credits exhausted.")
            return False
        return True
    except Exception as e:
        print(f"Error checking Landing AI credits: {e}")
        return False


# NOTE: evaluated once at import time, which triggers the probe above.
landing_ai_available_flag = landing_ai_available()

extraction_methods = (
    {
        "agentic": "Agentic (Landing AI)",
        "llm": "LLM (gpt-4o-mini)",
        "pymupdf": "PyMuPDF (PDF Parsing Library)",
    }
    if landing_ai_available_flag
    else {
        "pymupdf": "PyMuPDF (PDF Parsing Library)",
        "llm": "LLM (gpt-4o-mini)",
    }
)