File size: 17,511 Bytes
e696463
cf5dbc0
755ec14
 
cf5dbc0
09a324c
 
 
 
 
4691828
 
cf5dbc0
bdc7d9a
ce77033
3fef715
755ec14
 
 
 
 
 
09a324c
cf5dbc0
5f5dd48
cf5dbc0
 
 
 
 
5f5dd48
 
 
e696463
5f5dd48
 
bdc7d9a
cf5dbc0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
bdc7d9a
 
 
cf5dbc0
3fef715
cf5dbc0
3fef715
cf5dbc0
 
 
 
3fef715
 
09a324c
 
 
 
 
 
 
 
 
cf5dbc0
 
 
 
 
3fef715
cf5dbc0
 
 
 
 
 
 
 
 
ce77033
3fef715
cf5dbc0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3fef715
cf5dbc0
 
755ec14
ce77033
 
3fef715
5820bde
 
3fef715
 
ce77033
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
755ec14
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ce77033
 
09a324c
 
 
 
 
 
 
 
ce77033
09a324c
 
37df49d
09a324c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e696463
 
7b92765
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e696463
 
4691828
 
 
 
 
 
 
 
 
 
e696463
 
 
 
4691828
d6b760c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e696463
 
 
 
 
 
 
 
 
 
 
4691828
 
e696463
 
 
 
 
 
 
 
 
 
 
 
 
4691828
 
 
 
 
 
 
 
 
 
 
 
e696463
4691828
 
 
 
 
 
e696463
 
4691828
 
 
 
 
 
e696463
 
 
 
 
3fef715
 
bdc7d9a
 
 
3fef715
d6b760c
 
3fef715
bdc7d9a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3fef715
 
 
 
 
 
bdc7d9a
3fef715
d6b760c
3fef715
d6b760c
3fef715
bdc7d9a
0259fee
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
import difflib
import html
import os
import json
import re
import time
import random
import asyncio
import httpx
from dotenv import load_dotenv
import pymupdf
import pymupdf4llm
from rapidfuzz import fuzz
from agentic_doc.parse import parse
import requests
from scripts.models import RegulatoryChange
from scripts.regulatory_change_foundation import (
    CLASSIFICATION_INFO,
    FEW_SHOT_EXAMPLES,
    BASE_PROMPT_TEMPLATE,
)

# Load environment variables (python-dotenv) before any os.getenv() lookups below.
load_dotenv()
# Colors are RGB float tuples with each channel in the 0–1 range (not hex strings).
# Earlier palette; not referenced anywhere else in the visible part of this module.
color_mapping_old = {
    "addition": (0, 0.4, 0),  # green
    "deletion": (1, 0, 0),  # red
    "modification": (0, 0.6, 1),  # blue
}

# Active palette keyed by change type; consumed by the tooltip CSS classes in
# css_styles and by get_color_mapping_hex().
color_mapping = {
    "addition": (0.0, 0.45, 0.7),  # blue
    "deletion": (0.9, 0.6, 0.0),  # orange
    "modification": (0.5, 0.5, 0.5),  # gray
}


def to_rgb(color_tuple):
    """Format a 0–1 float RGB triple as a CSS ``rgb(r, g, b)`` string.

    Each of the first three components is multiplied by 255 and truncated
    to an integer (no rounding).
    """
    red, green, blue = (int(color_tuple[idx] * 255) for idx in range(3))
    return f"rgb({red}, {green}, {blue})"


# Inline <style> block for the rendered HTML: link styling plus one tooltip
# class per change type. The tooltip text colors are taken from color_mapping
# via to_rgb() when this module is imported; doubled braces are literal CSS
# braces inside the f-string.
css_styles = f"""
    <style>
        .custom-link {{
            display: inline-block;
            padding: 8px 16px;
            color: white !important;
            text-decoration: none;
            border-radius: 8px;
            transition: background-color 0.3s ease;
        }}
        .custom-link:hover {{
            background-color: #45a049;
        }}
        .tooltip {{
            font-weight: bold;
            cursor: help;
            background-color: white;
        }}
        .addition-tooltip {{
            color: {to_rgb(color_mapping["addition"])};
        }}
        .modification-tooltip {{
            color: {to_rgb(color_mapping["modification"])};
        }}
        .deletion-tooltip {{
            color: {to_rgb(color_mapping["deletion"])};
        }}
        .default-tooltip {{
            color: yellow;
        }}
    </style>
    """


def get_color_mapping_hex():
    """Return the active palette with every channel scaled to a 0–255 integer.

    Despite the name, the values are ``(r, g, b)`` integer tuples, not hex
    strings. Channels are truncated, not rounded.
    """
    scaled = {}
    for change_type, channels in color_mapping.items():
        scaled[change_type] = tuple(int(channel * 255) for channel in channels)
    return scaled


def get_tooltip_text(change):
    """Build the tooltip text for a highlighted change.

    The result is the change type, the separator " - ", the category, a
    newline, and the context. Missing (or ``None``) attributes fall back to
    "Type unspecified", "Category unspecified" and an empty context.

    Parameters:
    - change: any object that may expose ``type``, ``category`` and ``context``

    Returns:
    - the tooltip string (not HTML-escaped; callers escape it)
    """

    def _field(name, fallback):
        # A None attribute is treated like a missing one so the string
        # concatenation below cannot fail on optional fields.
        value = getattr(change, name, None)
        return fallback if value is None else value

    # Each part is resolved separately. A single conditional expression in
    # front of the "+" chain would bind the whole chain to its else-branch
    # and return only the type whenever it is present.
    return (
        _field("type", "Type unspecified")
        + " - "
        + _field("category", "Category unspecified")
        + "\n"
        + _field("context", "")
    )


def highlight_nth(text, change, skip_failed=False):
    """Wrap the n-th occurrence of ``change.text`` in ``text`` with a tooltip span.

    ``change.text`` is interpreted as a regular expression (case-insensitive,
    dot matches newlines) after every backslash followed by whitespace has
    been replaced with a lazy wildcard. ``n`` is ``change.occurrence_index``
    when present, otherwise 0.

    Parameters:
    - text: the document text to search and annotate
    - change: object with ``text`` and optionally ``occurrence_index``,
      ``type``, ``category`` and ``context``
    - skip_failed: forwarded to highlight_fuzzy_match for the fallback path

    Returns:
    - ``text`` with one highlighted span inserted, or the result of
      highlight_fuzzy_match when the regex finds fewer than n+1 matches or
      cannot be compiled
    """
    n = change.occurrence_index if hasattr(change, "occurrence_index") else 0
    target = re.sub(r"\\\s+", r".*?", change.text)

    try:
        pattern = re.compile(target, flags=re.IGNORECASE | re.DOTALL)
    except re.error:
        # The change text is not escaped, so ordinary prose containing e.g. an
        # unbalanced "(" or "[" is not a valid pattern. Fall back to fuzzy
        # matching instead of crashing the whole rendering.
        return highlight_fuzzy_match(text, change, n, skip_failed=skip_failed)

    # Collect at most n+1 matches; later ones are irrelevant.
    matches = []
    for match in pattern.finditer(text):
        matches.append(match)
        if len(matches) > n:
            break

    if len(matches) > n:
        match = matches[n]
        start, end = match.start(), match.end()
        tooltip_raw = get_tooltip_text(change)
        tooltip_escaped = html.escape(tooltip_raw, quote=True)
        css_kind = change.type if hasattr(change, "type") else "default"
        highlighted_span = f"""<span id='marked_section' class='tooltip {css_kind}-tooltip' title='{tooltip_escaped}'>
    {text[start:end]}
</span>"""
        return text[:start] + highlighted_span + text[end:]
    else:
        return highlight_fuzzy_match(text, change, n, skip_failed=skip_failed)


# TODO: tune the threshold -> a value of 51 would always yield a result.
# A lower threshold guarantees a match, but the match may differ from the original target; a threshold that is too high may find no match at all, e.g. when a word is missing.
def highlight_fuzzy_match(text, change, n=0, threshold=80, skip_failed=False):
    """Highlight the window of ``text`` that best fuzzy-matches ``change.text``.

    A window as long as the target slides over ``text`` one character at a
    time; every window whose ``fuzz.partial_ratio`` score (case-insensitive)
    reaches ``threshold`` becomes a candidate.

    Parameters:
    - text: the document text to search and annotate
    - change: object with ``text`` and optionally ``type``, ``category``, ``context``
    - n: index into the candidates sorted by (score, start) descending; clamped
      to the last candidate
    - threshold: minimum score (0-100) for a window to count as a candidate
    - skip_failed: when no candidate exists, return ``text`` unchanged instead
      of prepending a red "No match found" notice

    Returns:
    - ``text`` with a highlighted span, ``text`` with a notice prepended, or
      ``text`` unchanged (see skip_failed)
    """
    target = change.text
    window_size = len(target)
    target_lower = target.lower()  # loop-invariant

    candidates = []
    # "+ 1" so the final full-length window is examined as well; without it a
    # text exactly as long as the target would never be compared at all.
    for i in range(len(text) - window_size + 1):
        window = text[i : i + window_size]
        score = fuzz.partial_ratio(window.lower(), target_lower)
        if score >= threshold:
            candidates.append((score, i, i + window_size))

    if not candidates and not skip_failed:
        return (
            f"""
        <span class='hover-tooltip' title='No match found'>
        <strong style='color: red;'>No match found for: "{target}"</strong> 
        <br>
        </span>
        <span style="color: red;">Please verify if it is part of the original text or if it was extracted incorrectly.</span><br>
        """
            + text
        )
    if not candidates and skip_failed:
        return text
    # Pick top-N match
    candidates.sort(reverse=True)
    _, start_norm, end_norm = candidates[min(n, len(candidates) - 1)]

    tooltip_raw = get_tooltip_text(change)
    tooltip_escaped = html.escape(tooltip_raw, quote=True)
    highlighted_span = f"""<span id='marked_section' class='tooltip {change.type if hasattr(change, "type") else "default"}-tooltip' title='{tooltip_escaped}'>{text[start_norm:end_norm]}</span>"""
    return text[:start_norm] + highlighted_span + text[end_norm:]


# TODO: tune the threshold -> a value of 51 would always yield a result.
# A lower threshold guarantees a match, but the match may differ from the original target; a threshold that is too high may find no match at all, e.g. when a word is missing.
def get_best_fuzzy_match(text, change: RegulatoryChange, threshold=65):
    """Return the section of ``text`` that best fuzzy-matches ``change.text``.

    A window as long as the target slides over ``text`` one character at a
    time; windows whose case-insensitive ``fuzz.partial_ratio`` score reaches
    ``threshold`` are candidates. Candidates are sorted by (score, start)
    descending and the one at ``change.occurrence_index`` (0 when absent,
    clamped to the last candidate) is returned.

    Returns:
    - the matched substring of ``text``, or ``None`` when no window reaches
      the threshold; callers must handle ``None``
    """
    n = change.occurrence_index if hasattr(change, "occurrence_index") else 0
    target = change.text
    window_size = len(target)
    target_lower = target.lower()  # loop-invariant

    candidates = []
    # "+ 1" so the final full-length window is examined as well; without it a
    # text exactly as long as the target would never be compared at all.
    for i in range(len(text) - window_size + 1):
        window = text[i : i + window_size]
        score = fuzz.partial_ratio(window.lower(), target_lower)
        if score >= threshold:
            candidates.append((score, i, i + window_size))

    if not candidates:
        return None
    # Pick top-N match
    candidates.sort(reverse=True)
    _, start_norm, end_norm = candidates[min(n, len(candidates) - 1)]

    return text[start_norm:end_norm]


def render_prompt(text, include_nlp=False, preprocessed_data=None):
    """Fill BASE_PROMPT_TEMPLATE for one piece of text.

    The classification info and few-shot examples are always embedded as
    indented JSON. When ``include_nlp`` is true and ``preprocessed_data`` is
    non-empty, the entities and noun chunks whose ``text`` occurs in ``text``
    are embedded too, together with an extra "evidence" block; otherwise the
    three NLP-related placeholders are filled with empty strings.
    """
    classification_json = json.dumps(CLASSIFICATION_INFO, indent=2)
    few_shot_json = json.dumps(FEW_SHOT_EXAMPLES, indent=2)

    # Defaults for the "no NLP insights" case.
    nlp_section = ""
    nlp_insights = ""
    evidence_block = ""

    if include_nlp and preprocessed_data:
        # Keep only the insights that actually occur in this piece of text.
        insights = {
            "entities": [
                entity
                for entity in preprocessed_data["entities"]
                if entity["text"] in text
            ],
            "key_noun_phrases": [
                chunk
                for chunk in preprocessed_data["noun_chunks"]
                if chunk["text"] in text
            ],
        }
        nlp_section = ", and NLP insights"
        nlp_insights = "\n\nNLP Insights:\n" + json.dumps(insights, indent=2)
        evidence_block = (
            ',\n            "evidence": {\n'
            '                "entities_involved": ["relevant named entities"],\n'
            '                "key_phrases": ["relevant noun phrases or key terms"]\n'
            "            }"
        )

    placeholders = {
        "classification_info": classification_json,
        "few_shot_examples": few_shot_json,
        "nlp_section": nlp_section,
        "nlp_insights": nlp_insights,
        "text": text,
        "evidence_block": evidence_block,
    }
    return BASE_PROMPT_TEMPLATE.format(**placeholders)


def save_json_to_file(data, output_dir, output_file):
    """Write ``data`` as indented UTF-8 JSON to ``output_dir/output_file``.

    Parameters:
    - data: any JSON-serializable object
    - output_dir: target directory; created (including parents) if missing.
      An empty string means the current working directory.
    - output_file: file name inside ``output_dir``

    Side effects:
    - creates or overwrites the file and prints its path
    """
    # exist_ok=True avoids the check-then-create race: another process may
    # create the directory between an exists() test and makedirs().
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    file_path = os.path.join(output_dir, output_file)
    with open(file_path, "w", encoding="utf-8") as f:
        # ensure_ascii=False keeps non-ASCII characters readable in the file.
        json.dump(data, f, indent=4, ensure_ascii=False)

    # Print the location of the saved file
    print(f"JSON data saved successfully at: {file_path}")


# Bearer token sent by call_nlp_service(); None when the variable is not set.
MICROSERVICE_KEY = os.getenv("MICROSERVICE_KEY")
# Shared by every call_nlp_service() invocation in this process.
# NOTE(review): created at import time; on Python < 3.10 asyncio primitives
# bind to the event loop current at construction — confirm the target version.
nlp_semaphore = asyncio.Semaphore(100)  # Limit to 100 concurrent requests
# Per-phase timeouts (seconds) applied to every request to the NLP service.
timeout = httpx.Timeout(
    connect=20.0,  # time to establish connection
    read=60.0,  # time to read the response
    write=30.0,  # time to send the request
    pool=80.0,  # time to acquire a connection from the pool
)


async def call_nlp_service(payload, method, max_retries=5, base_delay=1.0):
    """POST ``payload`` to the NLP preprocessing service and return its JSON.

    The request is retried on HTTP 429 and on connect/read timeouts or network
    errors, waiting ``base_delay * 2**attempt`` seconds plus up to 0.5 s of
    jitter between attempts. For 429 responses a numeric ``Retry-After``
    header takes precedence over the computed delay. Any other non-200 status
    fails immediately. The semaphore slot is held for the whole call,
    including the back-off sleeps.

    Parameters:
    - payload: form data passed to ``client.post(..., data=payload)``
    - method: path segment appended to the service base URL
    - max_retries: maximum number of attempts
    - base_delay: base of the exponential back-off, in seconds

    Returns:
    - the decoded JSON body of the 200 response

    Raises:
    - Exception: on a non-200/non-429 status, on a network error in the last
      attempt, or when every attempt was rate limited
    """
    url = f"https://amougou-mbida-nlp-preprocessor.hf.space/{method}"
    headers = {"Authorization": f"Bearer {MICROSERVICE_KEY}"}

    def _backoff(attempt):
        # Exponential back-off with a little jitter to de-synchronize callers.
        return base_delay * (2**attempt) + random.uniform(0, 0.5)

    async with nlp_semaphore:
        for attempt in range(max_retries):
            try:
                async with httpx.AsyncClient(timeout=timeout) as client:
                    response = await client.post(url, data=payload, headers=headers)
            except (httpx.ConnectTimeout, httpx.ReadTimeout, httpx.NetworkError) as e:
                # Retry on network issues
                if attempt == max_retries - 1:
                    raise Exception(
                        f"NLP service network error after {max_retries} attempts: {e}"
                    ) from e
                await asyncio.sleep(_backoff(attempt))
                continue

            # Success
            if response.status_code == 200:
                return response.json()

            # Rate limited
            if response.status_code == 429:
                if attempt == max_retries - 1:
                    break
                retry_after = response.headers.get("Retry-After")
                try:
                    delay = (
                        max(0.0, float(retry_after))
                        if retry_after
                        else _backoff(attempt)
                    )
                except ValueError:
                    # Retry-After may also be an HTTP-date, which float()
                    # cannot parse; use the computed back-off in that case.
                    delay = _backoff(attempt)
                await asyncio.sleep(delay)
                continue

            # Other HTTP errors
            raise Exception(
                f"NLP service error: {response.status_code} - {response.text}"
            )

    raise Exception(f"NLP service error: failed after {max_retries} retries")


def lerp_color(value, start_color=(255, 0, 0), end_color=(0, 255, 0)):
    """
    Blend two RGB colors and return the result as a CSS color string.

    Parameters:
    - value: blend factor; 0 yields start_color, 1 yields end_color
    - start_color: tuple (r, g, b), default red
    - end_color: tuple (r, g, b), default green

    Returns:
    - CSS rgb color string, e.g. 'rgb(255, 0, 0)'; channels are truncated to int
    """
    channels = [
        int(start_color[idx] + (end_color[idx] - start_color[idx]) * value)
        for idx in range(3)
    ]
    return "rgb({}, {}, {})".format(*channels)


def extract_markdown(file_bytes: bytes) -> str:
    """Extract markdown text from PDF bytes using pymupdf4llm.

    Parameters:
    - file_bytes: the raw content of a PDF file

    Returns:
    - the markdown produced by ``pymupdf4llm.to_markdown`` for the document
    """
    # Open the in-memory PDF as a context manager so the document is closed
    # once conversion finishes, instead of staying open until garbage collection.
    with pymupdf.open(stream=file_bytes, filetype="pdf") as document:
        return pymupdf4llm.to_markdown(document)


def remove_html_comments(text: str) -> str:
    """Return ``text`` with every ``<!-- ... -->`` comment removed.

    Matching is non-greedy and spans newlines, so multi-line comments are
    removed and text between two separate comments is kept.
    """
    comment_pattern = re.compile(r"<!--.*?-->", flags=re.DOTALL)
    return comment_pattern.sub("", text)


def normalize_markdown_indentation(content):
    """Reduce deep indentation on bullet lines so they are not rendered as code.

    A line is rewritten only if, after stripping leading whitespace, it starts
    with "-", "*" or "+" and it had more than 4 leading whitespace characters.
    Its new indent is two spaces per level, where the level is the old indent
    divided by 6 (integer division) and capped at 2. All other lines are
    returned unchanged.
    """
    bullet_markers = ("-", "*", "+")

    def _normalize(raw_line):
        body = raw_line.lstrip()
        indent = len(raw_line) - len(body)
        if indent <= 4 or not body.startswith(bullet_markers):
            return raw_line
        depth = min(indent // 6, 2)  # at most two nesting levels
        return "  " * depth + body

    return "\n".join(_normalize(raw_line) for raw_line in content.split("\n"))


def highlight_differences_words(text1: str, text2: str):
    """
    Diff two texts token by token and return both sides as HTML.

    Both texts are split on whitespace runs, with the whitespace kept as
    tokens so the output preserves spacing and newlines. Every token is
    HTML-escaped. Tokens are wrapped as follows:
      - deletion-tooltip: present only in text1 (first result only)
      - addition-tooltip: present only in text2 (second result only)
      - modification-tooltip: replaced tokens (both results)

    Returns a tuple (highlighted_text1, highlighted_text2).
    """
    tokens_old = re.split(r"(\s+)", text1)
    tokens_new = re.split(r"(\s+)", text2)

    def _plain(tokens):
        return [html.escape(token) for token in tokens]

    def _wrapped(open_tag, tokens):
        return [open_tag + html.escape(token) + "</span>" for token in tokens]

    changed_tag = '<span class="tooltip modification-tooltip" title="Changed">'
    removed_tag = '<span class="tooltip deletion-tooltip" title="Removed">'
    added_tag = '<span class="tooltip addition-tooltip" title="Added">'

    matcher = difflib.SequenceMatcher(
        a=tokens_old, b=tokens_new, isjunk=lambda x: x in " \t"
    )
    left_parts = []
    right_parts = []

    for opcode, old_lo, old_hi, new_lo, new_hi in matcher.get_opcodes():
        old_slice = tokens_old[old_lo:old_hi]
        new_slice = tokens_new[new_lo:new_hi]
        if opcode == "equal":
            left_parts += _plain(old_slice)
            right_parts += _plain(new_slice)
        elif opcode == "replace":
            left_parts += _wrapped(changed_tag, old_slice)
            right_parts += _wrapped(changed_tag, new_slice)
        elif opcode == "delete":
            # Removed tokens appear on the old side only.
            left_parts += _wrapped(removed_tag, old_slice)
        elif opcode == "insert":
            # Added tokens appear on the new side only.
            right_parts += _wrapped(added_tag, new_slice)

    return "".join(left_parts), "".join(right_parts)


def map_categorical_impact_assessment(
    changes: list[RegulatoryChange],
) -> list[RegulatoryChange]:
    """Align each change's action list with the standard actions for its category.

    For a change whose category is known, the labels of its current actions
    are compared (in order) with the category's standard labels. If they
    differ, the actions are replaced by fresh, not-completed standard actions.
    If they already match, the actions are left untouched, which keeps any
    completion flags already set. Changes with an unknown category are not
    modified. The input list is mutated in place and returned.
    """
    standard_labels = {
        "Textual and Editorial Changes": (
            "Update documentation",
            "Adjust UI wording",
            "Inform stakeholders",
        ),
        "Data and Field Changes": (
            "Add/modify fields",
            "Create migration scripts",
            "Update forms/APIs/test cases",
        ),
        "Procedural Changes": (
            "Update process automation",
            "Adjust workflow steps",
            "Reassign roles or access",
        ),
        "Compliance and Enforcement Changes": (
            "Implement logging or alerts",
            "Update compliance documentation",
            "Conduct internal review",
        ),
        "Policy Changes": (
            "Adjust rule sets",
            "Revalidate configurations",
            "Run simulations or validations",
        ),
    }

    for change in changes:
        if change.category not in standard_labels:
            continue
        expected = list(standard_labels[change.category])
        present = [action.get("label") for action in change.actions]
        if present == expected:
            # Labels already match: keep the existing entries and their progress.
            continue
        # New dicts per change, so no action objects are shared between changes.
        change.actions = [{"label": label, "completed": False} for label in expected]
    return changes

def landing_ai_available() -> bool:
    """Probe Landing AI by parsing a one-byte document.

    Returns False when the parse result reports an error containing "402"
    (credits exhausted) or when the probe raises any exception; returns True
    otherwise. A message is printed in both failure cases.
    """
    try:
        parsed_documents = parse("c".encode("utf-8"))
        result = json.loads(parsed_documents[0].model_dump_json())
        errors = result.get("errors", [])
        if errors and any("402" in error.get("error", "") for error in errors):
            print("Landing AI credits exhausted.")
            return False
    except Exception as e:
        # Best-effort check: any failure is reported as "not available".
        print(f"Error checking Landing AI credits: {e}")
        return False
    return True


# Evaluated once at import time: this runs the Landing AI probe while the
# module is being imported.
landing_ai_available_flag = landing_ai_available()
# Extraction method id -> display label. The "agentic" entry is offered only
# when the probe succeeded; note that the key order differs between the two
# variants (the fallback lists "pymupdf" first).
extraction_methods = (
    {
        "agentic": "Agentic (Landing AI)",
        "llm": "LLM (gpt-4o-mini)",
        "pymupdf": "PyMuPDF (PDF Parsing Library)",
    }
    if landing_ai_available_flag
    else {
        "pymupdf": "PyMuPDF (PDF Parsing Library)",
        "llm": "LLM (gpt-4o-mini)",
    }
)