# regulens / scripts /utility_functions.py
# amougou-mbida's picture
# Update scripts/utility_functions.py
# 37df49d verified
import difflib
import html
import os
import json
import re
import time
import random
import asyncio
import httpx
from dotenv import load_dotenv
import pymupdf
import pymupdf4llm
from rapidfuzz import fuzz
from agentic_doc.parse import parse
import requests
from scripts.models import RegulatoryChange
from scripts.regulatory_change_foundation import (
CLASSIFICATION_INFO,
FEW_SHOT_EXAMPLES,
BASE_PROMPT_TEMPLATE,
)
# Load settings via python-dotenv before any os.getenv lookups further down.
load_dotenv()
# Highlight colors as RGB tuples with every channel in the 0–1 range
# (not hex strings), keyed by change type.
# Earlier palette; nothing in the visible part of this file reads it.
color_mapping_old = {
    "addition": (0, 0.4, 0),  # green
    "deletion": (1, 0, 0),  # red
    "modification": (0, 0.6, 1),  # blue
}
# Palette the tooltip CSS classes in css_styles are generated from.
color_mapping = {
    "addition": (0.0, 0.45, 0.7),  # blue
    "deletion": (0.9, 0.6, 0.0),  # orange
    "modification": (0.5, 0.5, 0.5),  # gray
}
def to_rgb(color_tuple):
    """Convert an RGB tuple with 0–1 channels into a CSS ``rgb(r, g, b)`` string.

    Each of the first three channels is multiplied by 255 and truncated to an
    integer (no rounding).
    """
    red = int(color_tuple[0] * 255)
    green = int(color_tuple[1] * 255)
    blue = int(color_tuple[2] * 255)
    return "rgb({}, {}, {})".format(red, green, blue)
# Inline <style> block for the link button and the highlight tooltips.
# The per-change-type text colors are filled in from color_mapping through
# to_rgb() once, when this module is imported; doubled braces are literal
# CSS braces inside the f-string.
css_styles = f"""
<style>
.custom-link {{
display: inline-block;
padding: 8px 16px;
color: white !important;
text-decoration: none;
border-radius: 8px;
transition: background-color 0.3s ease;
}}
.custom-link:hover {{
background-color: #45a049;
}}
.tooltip {{
font-weight: bold;
cursor: help;
background-color: white;
}}
.addition-tooltip {{
color: {to_rgb(color_mapping["addition"])};
}}
.modification-tooltip {{
color: {to_rgb(color_mapping["modification"])};
}}
.deletion-tooltip {{
color: {to_rgb(color_mapping["deletion"])};
}}
.default-tooltip {{
color: yellow;
}}
</style>
"""
def get_color_mapping_hex():
    """Return ``color_mapping`` with every channel scaled to an integer in 0–255.

    Despite the name, the values are integer tuples such as ``(0, 114, 178)``,
    not hex strings. Channels are truncated, not rounded.
    """
    scaled = {}
    for change_type, channels in color_mapping.items():
        scaled[change_type] = tuple(int(channel * 255) for channel in channels)
    return scaled
def get_tooltip_text(change):
    """Build the tooltip text ``"<type> - <category>\\n<context>"`` for a change.

    Missing (or empty/None) attributes fall back to "Type unspecified",
    "Category unspecified" and an empty context respectively.

    The previous implementation chained the pieces with ``+`` after an
    unparenthesised conditional expression, so whenever ``change.type``
    existed the function returned only the type and dropped category and
    context. Each piece is now resolved separately before formatting.
    """
    change_type = getattr(change, "type", None) or "Type unspecified"
    category = getattr(change, "category", None) or "Category unspecified"
    context = getattr(change, "context", None) or ""
    return f"{change_type} - {category}\n{context}"
def highlight_nth(text, change, skip_failed=False):
    """Wrap the n-th occurrence of ``change.text`` in ``text`` in a tooltip span.

    ``n`` is ``change.occurrence_index`` when the attribute exists, else 0.
    Matching is case-insensitive. Whitespace between the words of
    ``change.text`` is allowed to match any run of characters (lazily), so
    differing line breaks or spacing in ``text`` do not prevent a match.

    When fewer than ``n + 1`` matches exist, the call is delegated to
    ``highlight_fuzzy_match`` with the same ``skip_failed`` flag.

    Returns the text with the highlighted span inserted.
    """
    n = change.occurrence_index if hasattr(change, "occurrence_index") else 0
    # Escape every word so regex metacharacters in the change text
    # ("(a)", "1.2", "[...]") are matched literally instead of raising
    # re.error or matching the wrong thing, then join the words with a lazy
    # wildcard in place of the original whitespace.
    target = ".*?".join(re.escape(word) for word in change.text.split())
    pattern = re.compile(target, flags=re.IGNORECASE | re.DOTALL)
    # Collect only as many matches as needed to reach index n.
    matches = []
    for match in pattern.finditer(text):
        matches.append(match)
        if len(matches) > n:
            break
    if len(matches) > n:
        match = matches[n]
        start, end = match.start(), match.end()
        tooltip_raw = get_tooltip_text(change)
        tooltip_escaped = html.escape(tooltip_raw, quote=True)
        highlighted_span = f"""<span id='marked_section' class='tooltip {change.type if hasattr(change, "type") else "default"}-tooltip' title='{tooltip_escaped}'>
{text[start:end]}
</span>"""
        return text[:start] + highlighted_span + text[end:]
    return highlight_fuzzy_match(text, change, n, skip_failed=skip_failed)
# TODO: tune the threshold -> a value of 51 would always produce a result.
# Lowering it guarantees a match, but the match may differ from the original target; setting it too high may find no match at all, e.g. when a word is missing.
def highlight_fuzzy_match(text, change, n=0, threshold=80, skip_failed=False):
    """Highlight the window of ``text`` that best fuzzy-matches ``change.text``.

    A window as long as the target slides over ``text`` one character at a
    time; windows scoring at least ``threshold`` with
    ``fuzz.partial_ratio`` become candidates.

    Parameters:
    - text: the text to search and annotate
    - change: object with a ``text`` attribute (and optionally ``type``)
    - n: index into the candidates sorted best-first; clamped to the last one
    - threshold: minimum score for a window to count as a match
    - skip_failed: when True, return ``text`` unchanged if nothing matches;
      otherwise prepend a red "No match found" notice

    Returns the text with one tooltip span (or the notice) inserted.
    """
    target = change.text
    window_size = len(target)
    target_lower = target.lower()
    # "+ 1" so the final window (ending at the last character) is scored too;
    # max(..., 0) so a text no longer than the target is still compared once.
    last_start = max(len(text) - window_size, 0)
    candidates = []
    for i in range(0, last_start + 1):
        # Lower-case per window: lowering the whole text up front could
        # change its length and shift the recorded offsets.
        window = text[i : i + window_size]
        score = fuzz.partial_ratio(window.lower(), target_lower)
        if score >= threshold:
            candidates.append((score, i, i + window_size))
    if not candidates:
        if skip_failed:
            return text
        # The target is escaped because it is interpolated into HTML.
        return (
            f"""
<span class='hover-tooltip' title='No match found'>
<strong style='color: red;'>No match found for: "{html.escape(target)}"</strong>
<br>
</span>
<span style="color: red;">Please verify if it is part of the original text or if it was extracted incorrectly.</span><br>
"""
            + text
        )
    # Best score first; equal scores are ordered by later start position.
    candidates.sort(reverse=True)
    _, start_norm, end_norm = candidates[min(n, len(candidates) - 1)]
    tooltip_raw = get_tooltip_text(change)
    tooltip_escaped = html.escape(tooltip_raw, quote=True)
    highlighted_span = f"""<span id='marked_section' class='tooltip {change.type if hasattr(change, "type") else "default"}-tooltip' title='{tooltip_escaped}'>{text[start_norm:end_norm]}</span>"""
    return text[:start_norm] + highlighted_span + text[end_norm:]
# TODO: tune the threshold -> a value of 51 would always produce a result.
# Lowering it guarantees a match, but the match may differ from the original target; setting it too high may find no match at all, e.g. when a word is missing.
def get_best_fuzzy_match(text, change: RegulatoryChange, threshold=65):
    """Find the best fuzzy match for a change in the text and return the matched section.

    A window as long as ``change.text`` slides over ``text`` one character at
    a time; windows scoring at least ``threshold`` with
    ``fuzz.partial_ratio`` are candidates. The candidate at index
    ``change.occurrence_index`` (0 when the attribute is absent, clamped to
    the last candidate) of the best-first ordering is returned.

    Returns the matched slice of ``text``, or None when no window reaches the
    threshold. Callers need to account for the None return value.
    """
    n = change.occurrence_index if hasattr(change, "occurrence_index") else 0
    target = change.text
    window_size = len(target)
    target_lower = target.lower()
    # "+ 1" so the final window (ending at the last character) is scored too;
    # max(..., 0) so a text no longer than the target is still compared once.
    last_start = max(len(text) - window_size, 0)
    candidates = []
    for i in range(0, last_start + 1):
        window = text[i : i + window_size]
        score = fuzz.partial_ratio(window.lower(), target_lower)
        if score >= threshold:
            candidates.append((score, i, i + window_size))
    if not candidates:
        return None
    # Best score first; equal scores are ordered by later start position.
    candidates.sort(reverse=True)
    _, start_norm, end_norm = candidates[min(n, len(candidates) - 1)]
    return text[start_norm:end_norm]
def render_prompt(text, include_nlp=False, preprocessed_data=None):
    """Fill ``BASE_PROMPT_TEMPLATE`` for one chunk of text.

    Parameters:
    - text: the chunk to be analysed
    - include_nlp: whether to add the NLP insights section
    - preprocessed_data: dict with "entities" and "noun_chunks" lists whose
      items carry a "text" key; only used when ``include_nlp`` is truthy

    The NLP section is added only when both ``include_nlp`` and
    ``preprocessed_data`` are truthy, and lists only the entities and noun
    chunks whose text occurs in ``text``. Otherwise the three NLP-related
    placeholders are filled with empty strings.
    """
    classification_json = json.dumps(CLASSIFICATION_INFO, indent=2)
    few_shot_json = json.dumps(FEW_SHOT_EXAMPLES, indent=2)

    # Defaults for the prompt without NLP insights.
    nlp_section = ""
    nlp_insights = ""
    evidence_block = ""

    if include_nlp and preprocessed_data:

        def mentioned_in_chunk(items):
            # Keep only the items whose surface text occurs in this chunk.
            return [item for item in items if item["text"] in text]

        insights = {
            "entities": mentioned_in_chunk(preprocessed_data["entities"]),
            "key_noun_phrases": mentioned_in_chunk(preprocessed_data["noun_chunks"]),
        }
        nlp_section = ", and NLP insights"
        nlp_insights = "\n\nNLP Insights:\n" + json.dumps(insights, indent=2)
        evidence_block = ',\n "evidence": {\n "entities_involved": ["relevant named entities"],\n "key_phrases": ["relevant noun phrases or key terms"]\n }'

    return BASE_PROMPT_TEMPLATE.format(
        classification_info=classification_json,
        few_shot_examples=few_shot_json,
        nlp_section=nlp_section,
        nlp_insights=nlp_insights,
        text=text,
        evidence_block=evidence_block,
    )
def save_json_to_file(data, output_dir, output_file):
    """Save ``data`` as JSON to ``output_dir/output_file`` and print the file path.

    Parameters:
    - data: any JSON-serialisable object
    - output_dir: target directory, created (with parents) when missing; an
      empty string means the current working directory
    - output_file: file name inside ``output_dir``

    The file is written as UTF-8 with 4-space indentation and non-ASCII
    characters kept as-is.
    """
    # exist_ok avoids the check-then-create race of a separate exists() test;
    # the guard avoids os.makedirs("") failing for an empty directory name.
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)
    file_path = os.path.join(output_dir, output_file)
    with open(file_path, "w", encoding="utf-8") as f:
        json.dump(data, f, indent=4, ensure_ascii=False)
    print(f"JSON data saved successfully at: {file_path}")
# Bearer token sent to the NLP microservice; None when the variable is unset.
MICROSERVICE_KEY = os.getenv("MICROSERVICE_KEY")
# Shared by every call_nlp_service() invocation in this process.
nlp_semaphore = asyncio.Semaphore(100)  # Limit to 100 concurrent requests
# Per-phase timeouts (seconds) applied to the httpx client used for the service.
timeout = httpx.Timeout(
    connect=20.0,  # time to establish connection
    read=60.0,  # time to read the response
    write=30.0,  # time to send the request
    pool=80.0,  # time to acquire a connection from the pool
)
async def call_nlp_service(payload, method, max_retries=5, base_delay=1.0):
    """POST ``payload`` to the NLP microservice endpoint ``method`` and return its JSON.

    Parameters:
    - payload: form data sent as the request body
    - method: path segment appended to the service base URL
    - max_retries: maximum number of attempts
    - base_delay: base (seconds) of the exponential backoff between attempts

    Retries happen on HTTP 429 and on connect/read timeouts and network
    errors, with exponential backoff plus up to 0.5 s of jitter. A numeric
    ``Retry-After`` header on a 429 overrides the computed delay.

    Raises:
    - Exception: on any other non-200 status, on a network error in the last
      attempt (chained to the httpx error), or when all attempts are used up.
    """
    url = f"https://amougou-mbida-nlp-preprocessor.hf.space/{method}"
    headers = {"Authorization": f"Bearer {MICROSERVICE_KEY}"}

    def backoff_delay(attempt):
        # Exponential backoff with a little jitter to spread out retries.
        return base_delay * (2**attempt) + random.uniform(0, 0.5)

    async with nlp_semaphore:
        # One client for all attempts so connections can be reused.
        async with httpx.AsyncClient(timeout=timeout) as client:
            for attempt in range(max_retries):
                is_last_attempt = attempt == max_retries - 1
                try:
                    response = await client.post(url, data=payload, headers=headers)
                except (
                    httpx.ConnectTimeout,
                    httpx.ReadTimeout,
                    httpx.NetworkError,
                ) as e:
                    # Retry on network issues
                    if is_last_attempt:
                        raise Exception(
                            f"NLP service network error after {max_retries} attempts: {e}"
                        ) from e
                    await asyncio.sleep(backoff_delay(attempt))
                    continue
                # Success
                if response.status_code == 200:
                    return response.json()
                # Rate limited
                if response.status_code == 429:
                    if is_last_attempt:
                        break
                    retry_after = response.headers.get("Retry-After")
                    try:
                        delay = (
                            float(retry_after)
                            if retry_after
                            else backoff_delay(attempt)
                        )
                    except ValueError:
                        # Retry-After may also be an HTTP-date; fall back to
                        # the computed backoff instead of crashing.
                        delay = backoff_delay(attempt)
                    await asyncio.sleep(delay)
                    continue
                # Other HTTP errors
                raise Exception(
                    f"NLP service error: {response.status_code} - {response.text}"
                )
    raise Exception(f"NLP service error: failed after {max_retries} retries")
def lerp_color(value, start_color=(255, 0, 0), end_color=(0, 255, 0)):
    """
    Blend linearly from start_color to end_color and format the result for CSS.

    Parameters:
    - value: float between 0 and 1 (0 gives start_color, 1 gives end_color)
    - start_color: tuple (r, g, b), default red
    - end_color: tuple (r, g, b), default green

    Returns:
    - CSS rgb color string, e.g. 'rgb(255, 0, 0)'; channels are truncated
      to integers
    """

    def blend(index):
        # Interpolate one channel and truncate it to an integer.
        low = start_color[index]
        high = end_color[index]
        return int(low + (high - low) * value)

    red, green, blue = blend(0), blend(1), blend(2)
    return "rgb({}, {}, {})".format(red, green, blue)
def extract_markdown(file_bytes: bytes) -> str:
    """Extract markdown text from PDF bytes using pymupdf4llm.

    The in-memory PDF is opened as a context manager so the document is
    closed once the conversion is done, instead of being left open.
    """
    with pymupdf.open(stream=file_bytes, filetype="pdf") as document:
        return pymupdf4llm.to_markdown(document)
def remove_html_comments(text: str) -> str:
    """Return ``text`` with every ``<!-- ... -->`` comment removed.

    Comments may span several lines; each one ends at the first ``-->``
    that follows its opening marker.
    """
    comment_pattern = re.compile(r"<!--.*?-->", re.DOTALL)
    return comment_pattern.sub("", text)
def normalize_markdown_indentation(content):
    """Normalize excessive indentation to prevent code block interpretation.

    Only lines whose first non-whitespace character is a list marker
    (``-``, ``*`` or ``+``) and that have more than 4 leading whitespace
    characters are rewritten. The nesting level is the leading width divided
    by 6 (integer division), capped at 2, and the line is re-indented with
    two spaces per level, giving at most 4 spaces. All other lines are kept
    unchanged.

    Two spaces per level is what the limits above describe and the minimum
    Markdown needs to treat an item as nested under a ``- `` parent; a single
    space per level would flatten the list.
    """
    normalized_lines = []
    for line in content.split("\n"):
        stripped = line.lstrip()
        leading_spaces = len(line) - len(stripped)
        if stripped.startswith(("-", "*", "+")) and leading_spaces > 4:
            # Convert to proper nested list (2 spaces per level, max 2 levels).
            nest_level = min(leading_spaces // 6, 2)
            line = "  " * nest_level + stripped
        normalized_lines.append(line)
    return "\n".join(normalized_lines)
def highlight_differences_words(text1: str, text2: str):
    """
    Diff two texts token by token and return both as HTML-escaped strings.

    The texts are split into words and whitespace runs (whitespace is kept as
    its own token, so newlines survive). Tokens are marked up as follows:
    - replaced tokens: modification-tooltip span on both sides
    - tokens only in text1: deletion-tooltip span in the first result
    - tokens only in text2: addition-tooltip span in the second result
    - unchanged tokens: escaped, without a span

    Returns a tuple (highlighted_text1, highlighted_text2).
    """
    tokens_old = re.split(r"(\s+)", text1)
    tokens_new = re.split(r"(\s+)", text2)
    matcher = difflib.SequenceMatcher(
        a=tokens_old, b=tokens_new, isjunk=lambda x: x in " \t"
    )

    # CSS class and tooltip title for every non-equal opcode.
    markup = {
        "replace": ("modification-tooltip", "Changed"),
        "delete": ("deletion-tooltip", "Removed"),
        "insert": ("addition-tooltip", "Added"),
    }

    def wrap(tokens, css_class, title):
        # One span per token, with the token text HTML-escaped.
        return [
            f'<span class="tooltip {css_class}" title="{title}">{html.escape(token)}</span>'
            for token in tokens
        ]

    left_parts = []
    right_parts = []
    for op, a_lo, a_hi, b_lo, b_hi in matcher.get_opcodes():
        if op == "equal":
            left_parts += [html.escape(token) for token in tokens_old[a_lo:a_hi]]
            right_parts += [html.escape(token) for token in tokens_new[b_lo:b_hi]]
            continue
        css_class, title = markup[op]
        # Inserted tokens exist only on the new side, deleted ones only on the old.
        if op != "insert":
            left_parts += wrap(tokens_old[a_lo:a_hi], css_class, title)
        if op != "delete":
            right_parts += wrap(tokens_new[b_lo:b_hi], css_class, title)

    return "".join(left_parts), "".join(right_parts)
def map_categorical_impact_assessment(
    changes: list[RegulatoryChange],
) -> list[RegulatoryChange]:
    """Reset each change's action checklist to the standard one for its category.

    For every change whose ``category`` is one of the known categories, the
    labels of ``change.actions`` are compared, in order, with the standard
    labels. When they differ, ``change.actions`` is replaced by a fresh list
    of ``{"label": ..., "completed": False}`` dicts that is not shared with
    any other change. When they already match, the actions (including their
    completion state) are left untouched. Changes with other categories are
    not modified.

    Returns the same ``changes`` list, modified in place.
    """
    standard_labels = {
        "Textual and Editorial Changes": [
            "Update documentation",
            "Adjust UI wording",
            "Inform stakeholders",
        ],
        "Data and Field Changes": [
            "Add/modify fields",
            "Create migration scripts",
            "Update forms/APIs/test cases",
        ],
        "Procedural Changes": [
            "Update process automation",
            "Adjust workflow steps",
            "Reassign roles or access",
        ],
        "Compliance and Enforcement Changes": [
            "Implement logging or alerts",
            "Update compliance documentation",
            "Conduct internal review",
        ],
        "Policy Changes": [
            "Adjust rule sets",
            "Revalidate configurations",
            "Run simulations or validations",
        ],
    }

    for change in changes:
        if change.category not in standard_labels:
            continue
        expected = standard_labels[change.category]
        present = [action.get("label") for action in change.actions]
        if present == expected:
            # Labels already match: keep the user's completion progress.
            continue
        # Build new dicts per change so no two changes share action objects.
        change.actions = [{"label": label, "completed": False} for label in expected]
    return changes
def landing_ai_available() -> bool:
    """Check if we have Landing AI credits available.

    Sends a one-byte probe document through ``parse`` and inspects the
    "errors" list of the first result. Returns False when any error message
    contains "402" or when the probe raises; returns True otherwise. Both
    failure paths print a short message.
    """
    try:
        first_result = parse("c".encode("utf-8"))[0]
        report = json.loads(first_result.model_dump_json())
        errors = report.get("errors", [])
        credits_exhausted = bool(errors) and any(
            "402" in entry.get("error", "") for entry in errors
        )
        if credits_exhausted:
            print("Landing AI credits exhausted.")
            return False
        return True
    except Exception as e:
        print(f"Error checking Landing AI credits: {e}")
        return False
# Evaluated once at import time: importing this module runs the Landing AI
# probe in landing_ai_available().
landing_ai_available_flag = landing_ai_available()
# Extraction method key -> display label. The "agentic" entry is offered only
# when the probe succeeded; note that the entry order differs between the two
# variants ("agentic" first vs. "pymupdf" first).
extraction_methods = (
    {
        "agentic": "Agentic (Landing AI)",
        "llm": "LLM (gpt-4o-mini)",
        "pymupdf": "PyMuPDF (PDF Parsing Library)",
    }
    if landing_ai_available_flag
    else {
        "pymupdf": "PyMuPDF (PDF Parsing Library)",
        "llm": "LLM (gpt-4o-mini)",
    }
)