Spaces:
Running
Running
| import pymupdf | |
| from scripts.models import RegulatoryChange | |
| from scripts.utility_functions import color_mapping, get_best_fuzzy_match | |
def add_infos_to_pdf_agentic(doc, changes, successful_annotations, extraction_method="Landing AI", nlp_preprocessing=True):
    """
    Doc is edited in place.
    Writes a summary text box onto the first page and updates the document metadata.

    :param doc: The PyMuPDF document object.
    :type doc: pymupdf.Document
    :param changes: The processed changes; each is counted by its ``type``
        attribute (anything missing or unknown counts as "unspecified").
    :param successful_annotations: Number of highlighted changes, printed in the summary.
    :param extraction_method: Label printed in the summary.
    :param nlp_preprocessing: Printed as "yes" or "no" in the summary.
    """
    # Tally changes per type; unknown or absent types fall into "unspecified".
    tally = dict.fromkeys(("addition", "modification", "deletion", "unspecified"), 0)
    for entry in changes:
        kind = getattr(entry, "type", "unspecified")
        bucket = kind if kind in tally else "unspecified"
        tally[bucket] += 1

    nlp_flag = "yes" if nlp_preprocessing else "no"
    summary_lines = [
        "Regulatory Summary:",
        f"- Extraction Method: {extraction_method}",
        f"- Nlp preprocessing: {nlp_flag}",
        f"- Total Changes: {len(changes)}, Successful Annotations: {successful_annotations}",
        f"- Additions: {tally['addition']}",
        f"- Deletions: {tally['deletion']}",
        f"- Modifications: {tally['modification']}",
    ]
    # Every line, including the last one, is newline-terminated.
    summary_text = "".join(line + "\n" for line in summary_lines)

    # The summary is drawn on top of the existing content of the first page.
    first_page = doc.load_page(0)
    summary_box = pymupdf.Rect(10, 10, 550, 150)
    first_page.insert_textbox(
        summary_box,
        summary_text,
        fontsize=9,
        fontname="helv",
        align=pymupdf.TEXT_ALIGN_LEFT,
        color=(0, 0, 0.7),
        overlay=True,
    )

    # Prefix the existing title/author instead of discarding them.
    info = doc.metadata
    info["title"] = "Annotated " + (info["title"] or "PDF")
    credit = "Fortiss ReguLens"
    if info["author"]:
        credit += " & " + info["author"]
    info["author"] = credit
    info["subject"] = "Annotated PDF with regulatory changes"
    info["keywords"] = "regulatory, changes, annotations, pdf"
    doc.set_metadata(info)
def _write_failed_entries(page, rect, entries):
    """Write as many leading ``entries`` as fit into ``rect``; return how many were consumed."""

    def attempt(text):
        # insert_textbox returns a negative number and writes nothing when
        # the text does not fit, so a failed attempt can safely be retried.
        return page.insert_textbox(
            rect,
            text,
            fontsize=9,
            fontname="helv",
            align=pymupdf.TEXT_ALIGN_LEFT,
            color=(0, 0, 0.7),
        ) >= 0

    count = len(entries)
    while count > 1:
        if attempt("".join(entries[:count])):
            return count
        count //= 2

    # A single entry that is larger than the page is shortened until it fits,
    # so one oversized text cannot block the remaining entries.
    entry = entries[0]
    while entry and not attempt(entry):
        entry = entry[: len(entry) // 2]
    return 1


def add_failed_annotations_to_pdf_agentic(doc, failed_annotations):
    """
    Doc is edited in place.
    Adds failed annotations to the end of the PDF document.

    The list starts on one new page appended to the document. If it does not
    fit there, it continues on further appended pages instead of being
    dropped (a text box that overflows is not drawn at all).

    :param doc: The PyMuPDF document object.
    :type doc: pymupdf.Document
    :param failed_annotations: The failed annotations to be added. Each item is
        a dict with a "change" object (``text``, ``type`` and ``category`` are
        read) and a "page" number.
    :type failed_annotations: array
    """
    if not failed_annotations:
        return

    entries = ["Failed Annotations:\n"]
    for failed in failed_annotations:
        change = failed["change"]
        entries.append(
            f"Page {failed['page']}: {change.text} ({change.type}) Change: {change.category}\n"
        )

    rect = pymupdf.Rect(20, 20, 580, 822)
    position = 0
    while position < len(entries):
        page = doc.new_page(pno=-1)
        position += _write_failed_entries(page, rect, entries[position:])
def _search_unannotated(page_cache, page_numbers, needle, annotated_areas):
    """Return ``(page_number, rect)`` hits for ``needle`` that do not overlap earlier highlights."""
    hits = []
    for pno in page_numbers:
        taken = annotated_areas.setdefault(pno, [])
        for rect in page_cache[pno].search_for(needle):
            if not any(rect.intersects(area) for area in taken):
                hits.append((pno, rect))
    return hits


def _add_highlight(page, rect, color, info=None):
    """Add one highlight annotation in ``color`` to ``page``, optionally with popup info."""
    highlight = page.add_highlight_annot(rect)
    highlight.set_colors({"stroke": color})
    if info is not None:
        highlight.set_info(info=info)
    highlight.update()


def agentic_pdf_annotator(changes: list[RegulatoryChange], file_bytes, extraction_method="Landing AI", nlp_preprocessing=True):
    """
    Highlights confirmed and validated changes in a PDF and returns the annotated file.

    For every change the text is searched on the page named by its first
    grounding entry. If that page number is outside the document (or the
    change has no grounding), all pages are searched. When the exact text is
    not found, the result of ``get_best_fuzzy_match`` is searched instead.
    Matches that intersect an area highlighted earlier are ignored. Changes
    that cannot be placed are listed on pages appended to the document.

    :param changes: The candidate changes; only those with truthy ``confirmed``
        and ``validated`` are annotated.
    :type changes: list[RegulatoryChange]
    :param file_bytes: The PDF content, passed to ``pymupdf.open`` as a stream.
    :param extraction_method: Label forwarded to the first-page summary.
    :param nlp_preprocessing: Flag forwarded to the first-page summary.
    :return: The annotated PDF as bytes, or ``""`` when there is nothing to
        annotate or the PDF cannot be opened.
    """
    changes = [c for c in changes if c.confirmed and c.validated]
    if not changes:
        return ""

    try:
        doc = pymupdf.open(stream=file_bytes, filetype="pdf")
    except Exception as exc:
        # Callers rely on the empty-string result; report the cause instead of hiding it.
        print(f"Could not open PDF for annotation: {exc}")
        return ""

    try:
        # Longest texts first, so a short text cannot claim part of a longer match.
        changes = sorted(changes, key=lambda c: -len(c.text))

        # Load every page and its text once instead of once per change.
        page_count = len(doc)
        page_cache = {pno: doc[pno] for pno in range(page_count)}
        page_text_cache = {pno: page.get_text() for pno, page in page_cache.items()}
        full_text = "".join(page_text_cache.values())
        all_pages = range(page_count)

        annotated_areas = {}  # page number -> rects that already carry a highlight
        failed_annotations = []
        successful_annotations = 0

        for change in changes:
            text = change.text
            change_type = change.type
            change_str = change.category
            comment = change.context
            # Without grounding there is no page hint; -1 selects the document-wide search.
            page_num = int(change.grounding[0].page) if change.grounding else -1

            if 0 <= page_num < page_count:
                search_pages, haystack = [page_num], page_text_cache[page_num]
            else:
                search_pages, haystack = all_pages, full_text

            hits = _search_unannotated(page_cache, search_pages, text, annotated_areas)
            if not hits:
                # NOTE(review): get_best_fuzzy_match presumably returns the closest
                # substring of the haystack, or a falsy value when none qualifies.
                best_match = get_best_fuzzy_match(haystack, change)
                if best_match:
                    print("found best fuzzy match: ", best_match)
                    hits = _search_unannotated(
                        page_cache, search_pages, best_match, annotated_areas
                    )

            if not hits:
                # page_num is still the page the change asked for, so the report is meaningful.
                print(f"No non-overlapping match found on page {page_num} for: '{text}'")
                failed_annotations.append({"change": change, "page": page_num})
                continue

            # Annotate on the page of the first hit and ignore hits on other pages.
            page_num = hits[0][0]
            doc_page = page_cache[page_num]
            first_rect, *continuation_rects = [rect for pno, rect in hits if pno == page_num]

            color = color_mapping.get(change_type, (1, 1, 0))  # fallback colour: yellow
            annotated_areas[page_num].append(first_rect)
            _add_highlight(
                doc_page,
                first_rect,
                color,
                info={
                    "title": "Comment",
                    "content": f"{change_type} - {change_str}\n{comment}",
                    "name": change_type,
                },
            )
            successful_annotations += 1

            # A match spanning several lines yields one rect per line. Such a rect
            # holds a proper fragment of the search text, whereas a rect holding the
            # complete text is a separate occurrence and is left alone.
            wanted = text.strip()
            for rect in continuation_rects:
                fragment = doc_page.get_textbox(rect).strip()
                if fragment != wanted and fragment in wanted:
                    _add_highlight(doc_page, rect, color)
                    annotated_areas[page_num].append(rect)

        add_infos_to_pdf_agentic(doc, changes, successful_annotations, extraction_method, nlp_preprocessing)
        add_failed_annotations_to_pdf_agentic(doc, failed_annotations)
        return doc.tobytes()
    finally:
        # Release the document even when annotation raises.
        doc.close()