|
|
import html |
|
|
import os |
|
|
import json |
|
|
import re |
|
|
from rapidfuzz import fuzz |
|
|
import requests |
|
|
from scripts.regulatory_change_foundation import ( |
|
|
CLASSIFICATION_INFO, |
|
|
FEW_SHOT_EXAMPLES, |
|
|
BASE_PROMPT_TEMPLATE, |
|
|
) |
|
|
|
|
|
|
|
|
# Float RGB triples (each channel in 0.0-1.0) marking each change type in the
# rendered output; to_rgb() / get_color_mapping_hex() convert them for CSS use.
color_mapping = {
    "addition": (0, 0.4, 0),  # dark green
    "deletion": (1, 0, 0),  # red
    "modification": (0, 0.6, 1),  # light blue
}
|
|
|
|
|
|
|
|
def to_rgb(color_tuple):
    """Convert a float RGB triple (channels in 0.0-1.0) to a CSS 'rgb(r, g, b)' string."""
    r, g, b = (int(channel * 255) for channel in color_tuple[:3])
    return f"rgb({r}, {g}, {b})"
|
|
|
|
|
|
|
|
# Inline CSS injected into the rendered HTML page: a button-like link style and
# the per-change-type tooltip colors (derived from color_mapping via to_rgb()).
css_styles = f"""
<style>
.custom-link {{
    display: inline-block;
    padding: 8px 16px;
    color: white !important;
    text-decoration: none;
    border-radius: 8px;
    transition: background-color 0.3s ease;
}}
.custom-link:hover {{
    background-color: #45a049;
}}
.tooltip {{
    font-weight: bold;
    cursor: help;
    background-color: white;
}}
.addition-tooltip {{
    color: {to_rgb(color_mapping["addition"])};
}}
.modification-tooltip {{
    color: {to_rgb(color_mapping["modification"])};
}}
.deletion-tooltip {{
    color: {to_rgb(color_mapping["deletion"])};
}}
.default-tooltip {{
    color: yellow;
}}
</style>
"""
|
|
|
|
|
|
|
|
def get_color_mapping_hex(mapping=None):
    """Return the color mapping with 0-255 integer channels.

    Parameters:
    - mapping: optional dict of {key: float RGB triple (0.0-1.0 channels)};
      defaults to the module-level color_mapping, so existing callers are
      unaffected.

    Returns a dict of {key: (r, g, b)} with each channel an int in 0-255.
    """
    source = color_mapping if mapping is None else mapping
    return {key: tuple(int(c * 255) for c in rgb) for key, rgb in source.items()}
|
|
|
|
|
|
|
|
def get_tooltip_text(change):
    """Build the tooltip string '<type> - <category>\\n<context>' for a change dict.

    Missing keys fall back to "Type unspecified" / "Category unspecified" / "".
    """
    change_type = change.get("type", "Type unspecified")
    category = change.get("category", "Category unspecified")
    context = change.get("context", "")
    return f"{change_type} - {category}\n{context}"
|
|
|
|
|
|
|
|
def highlight_nth(text, change, skip_failed=False):
    """Wrap the n-th occurrence of change["text"] inside text in a tooltip span.

    Parameters:
    - text: document text to annotate
    - change: dict with at least "text"; optional "occurrence_index"
      (default 0), "type", "category", "context"
    - skip_failed: forwarded to the fuzzy fallback; when True, failures return
      the text unchanged instead of a visible warning

    Returns text with one <span id='marked_section' ...> inserted, or the
    result of highlight_fuzzy_match() when no literal occurrence is found.
    """
    n = change.get("occurrence_index", 0)
    # Build a whitespace-tolerant pattern: escape regex metacharacters in the
    # extracted text (unescaped input could raise re.error or mis-match) and
    # allow any whitespace run between words, so reflowed text still matches.
    # (The previous pattern r"\\\s+" only rewrote a literal backslash followed
    # by whitespace, which essentially never occurs in extracted text.)
    target = r"\s+".join(re.escape(word) for word in change["text"].split())
    matches = list(re.finditer(target, text, flags=re.IGNORECASE | re.DOTALL))
    if len(matches) > n:
        match = matches[n]
        start, end = match.start(), match.end()
        tooltip_raw = get_tooltip_text(change)
        # Escape quotes so the tooltip is safe inside the title='...' attribute.
        tooltip_escaped = html.escape(tooltip_raw, quote=True)
        highlighted_span = f"""<span id='marked_section' class='tooltip {change.get("type", "default")}-tooltip' title='{tooltip_escaped}'>
{text[start:end]}
</span>"""
        return text[:start] + highlighted_span + text[end:]
    else:
        # Not enough literal occurrences — fall back to fuzzy window matching.
        return highlight_fuzzy_match(text, change, n, skip_failed=skip_failed)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def highlight_fuzzy_match(text, change, n=0, threshold=80, skip_failed=False):
    """Fuzzy-highlight change["text"] in text when exact matching failed.

    Slides a window of len(change["text"]) characters over text, scores each
    window with rapidfuzz partial_ratio, and wraps the n-th best-scoring
    window in a tooltip span.

    Parameters:
    - n: rank of the candidate window to highlight (best score first)
    - threshold: minimum partial_ratio score (0-100) for a window to qualify
    - skip_failed: when True and no window reaches the threshold, return text
      unchanged instead of prepending a visible red warning

    Returns the annotated text, the warning + original text, or text as-is.
    """
    target = change["text"]
    window_size = len(target)
    step = 1
    target_lower = target.lower()  # hoisted: invariant across all windows

    candidates = []
    # "+ 1" so the final window (flush with the end of text) is also scored;
    # the previous bound excluded it — and produced no candidates at all when
    # window_size == len(text), even for a perfect match.
    for i in range(0, len(text) - window_size + 1, step):
        window = text[i : i + window_size]
        score = fuzz.partial_ratio(window.lower(), target_lower)
        if score >= threshold:
            candidates.append((score, i, i + window_size))

    if not candidates and not skip_failed:
        return (
            f"""
<span class='hover-tooltip' title='No match found'>
<strong style='color: red;'>No match found for: "{target}"</strong>
<br>
</span>
<span style="color: red;">Please verify if it is part of the original text or if it was extracted incorrectly.</span><br>
"""
            + text
        )
    if not candidates and skip_failed:
        return text

    # Best score first; n is clamped so an out-of-range rank still returns
    # the weakest qualifying candidate rather than raising.
    candidates.sort(reverse=True)
    _, start_norm, end_norm = candidates[min(n, len(candidates) - 1)]

    tooltip_raw = get_tooltip_text(change)
    # Escape quotes so the tooltip is safe inside the title='...' attribute.
    tooltip_escaped = html.escape(tooltip_raw, quote=True)
    highlighted_span = f"""<span id='marked_section' class='tooltip {change.get("type", "default")}-tooltip' title='{tooltip_escaped}'>{text[start_norm:end_norm]}</span>"""
    return text[:start_norm] + highlighted_span + text[end_norm:]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_best_fuzzy_match(text, change, threshold=65):
    """Find the best fuzzy match for a change in the text and return the matched section.

    Slides a window of len(change["text"]) characters over text, scores each
    window with rapidfuzz partial_ratio, and returns the substring of the
    n-th best-scoring window, where n = change.get("occurrence_index", 0)
    (clamped to the number of candidates).

    Caller needs to account for a potentially None return value (no window
    reached the threshold).
    """
    n = change.get("occurrence_index", 0)
    target = change["text"]
    window_size = len(target)
    step = 1
    target_lower = target.lower()  # hoisted: invariant across all windows

    candidates = []
    # "+ 1" so the final window (flush with the end of text) is also scored;
    # the previous bound excluded it — and produced no candidates at all when
    # window_size == len(text), even for a perfect match.
    for i in range(0, len(text) - window_size + 1, step):
        window = text[i : i + window_size]
        score = fuzz.partial_ratio(window.lower(), target_lower)
        if score >= threshold:
            candidates.append((score, i, i + window_size))

    if not candidates:
        return None

    # Best score first; ties resolved by the tuple's start offset.
    candidates.sort(reverse=True)
    _, start_norm, end_norm = candidates[min(n, len(candidates) - 1)]

    return text[start_norm:end_norm]
|
|
|
|
|
|
|
|
def render_prompt(text, include_nlp=False, preprocessed_data=None):
    """Fill BASE_PROMPT_TEMPLATE for the given text chunk.

    Embeds the classification taxonomy and few-shot examples as JSON; when
    include_nlp is True and preprocessed_data is provided, also embeds the
    entities and noun chunks that actually occur in this chunk, plus an
    evidence block for the expected answer schema.
    """
    classification_json = json.dumps(CLASSIFICATION_INFO, indent=2)
    few_shot_json = json.dumps(FEW_SHOT_EXAMPLES, indent=2)

    # Defaults: no NLP material in the prompt.
    nlp_section = ""
    nlp_insights = ""
    evidence_block = ""

    if include_nlp and preprocessed_data:
        # Keep only the precomputed annotations whose surface text appears
        # in this chunk.
        relevant_entities = [
            entity for entity in preprocessed_data["entities"] if entity["text"] in text
        ]
        relevant_noun_chunks = [
            chunk for chunk in preprocessed_data["noun_chunks"] if chunk["text"] in text
        ]
        insights_json = json.dumps(
            {"entities": relevant_entities, "key_noun_phrases": relevant_noun_chunks},
            indent=2,
        )

        nlp_section = ", and NLP insights"
        nlp_insights = f"\n\nNLP Insights:\n{insights_json}"
        evidence_block = ',\n "evidence": {\n "entities_involved": ["relevant named entities"],\n "key_phrases": ["relevant noun phrases or key terms"]\n }'

    return BASE_PROMPT_TEMPLATE.format(
        classification_info=classification_json,
        few_shot_examples=few_shot_json,
        nlp_section=nlp_section,
        nlp_insights=nlp_insights,
        text=text,
        evidence_block=evidence_block,
    )
|
|
|
|
|
|
|
|
def save_json_to_file(data, output_dir, output_file):
    """Save the JSON data to a file and print the file path.

    Parameters:
    - data: any JSON-serializable object
    - output_dir: directory to write into (created if it does not exist)
    - output_file: file name within output_dir
    """
    # exist_ok=True removes the check-then-create race of the previous
    # os.path.exists() guard (another process could create the directory
    # between the check and the makedirs call).
    os.makedirs(output_dir, exist_ok=True)

    file_path = os.path.join(output_dir, output_file)
    with open(file_path, "w", encoding="utf-8") as f:
        # ensure_ascii=False keeps non-ASCII text readable in the output file.
        json.dump(data, f, indent=4, ensure_ascii=False)

    print(f"JSON data saved successfully at: {file_path}")
|
|
|
|
|
|
|
|
def call_nlp_service(payload, method):
    """POST payload to the hosted NLP preprocessor service and return its JSON.

    Parameters:
    - payload: form data forwarded as the request body
    - method: endpoint path appended to the service base URL

    Raises Exception on any non-200 response.
    """
    url = f"https://amougou-fortiss-nlp-preprocessor.hf.space/{method}"
    response = requests.post(url, data=payload)

    # Guard clause: surface service failures immediately.
    if response.status_code != 200:
        raise Exception(f"NLP service error: {response.status_code} - {response.text}")
    return response.json()
|
|
|
|
|
def lerp_color(value, start_color=(255, 0, 0), end_color=(0, 255, 0)): |
|
|
""" |
|
|
Linearly interpolate between start_color and end_color by value. |
|
|
|
|
|
Parameters: |
|
|
- value: float between 0 and 1 |
|
|
- start_color: tuple (r, g, b), default red |
|
|
- end_color: tuple (r, g, b), default green |
|
|
|
|
|
Returns: |
|
|
- CSS rgb color string, e.g. 'rgb(255, 0, 0)' |
|
|
""" |
|
|
r = int(start_color[0] + (end_color[0] - start_color[0]) * value) |
|
|
g = int(start_color[1] + (end_color[1] - start_color[1]) * value) |
|
|
b = int(start_color[2] + (end_color[2] - start_color[2]) * value) |
|
|
return f"rgb({r}, {g}, {b})" |
|
|
|