Spaces:
Running
Running
| import base64 | |
| import pymupdf | |
| import pymupdf4llm | |
| # from agentic_doc.parse import parse | |
| from scripts.models import RegulatoryChange | |
| from scripts.llm_nlp_preprocessing import llm_regulatory_change_detector | |
| from scripts.llm_no_nlp_preprocessing import ( | |
| llm_regulatory_change_detector_without_nlp_insights, | |
| ) | |
| from scripts.pymupdf_nlp_preprocessing import ( | |
| pymupdf_regulatory_change_detector_with_nlp_insights, | |
| ) | |
| from scripts.pymupdf_no_nlp_preprocessing import ( | |
| pymupdf_regulatory_change_detector_without_nlp_insights, | |
| ) | |
| from scripts.pdf_text_extractor import ( | |
| create_hierarchical_structure_by_llm, | |
| create_hierarchical_structure_by_pymupdf, | |
| ) | |
# Highlight colours per change type, given as RGB tuples whose components
# are in the 0–1 range (not hex strings).
color_mapping = dict(
    addition=(0, 1, 0),  # green
    deletion=(1, 0, 0),  # red
    modification=(0, 0.6, 1),  # blue
)
def add_infos_to_pdf(doc, analysis_summary, extraction_method, do_nlp_preprocessing):
    """
    Stamp an analysis summary onto the first page and update the PDF metadata.

    The document is modified in place; nothing is returned.

    :param doc: The PyMuPDF document object.
    :type doc: pymupdf.Document
    :param analysis_summary: The summary of the analysis results. The keys read here are
        "changes_by_type", "total_changes_detected" and "successful_annotations".
    :type analysis_summary: dict
    :param extraction_method: The method used for text extraction from the PDF. Options are "PyMuPDF" or "LLM".
    :type extraction_method: str
    :param do_nlp_preprocessing: Flag indicating whether NLP preprocessing was used.
    :type do_nlp_preprocessing: bool
    """
    per_type = analysis_summary.get("changes_by_type", {})

    def count_for(singular):
        # The counters may be keyed in singular or plural form; a missing or
        # falsy value is reported as 0.
        return per_type.get(singular) or per_type.get(singular + "s") or 0

    nlp_flag = "yes" if do_nlp_preprocessing else "no"
    total_changes = analysis_summary.get("total_changes_detected", "0")
    annotated = analysis_summary.get("successful_annotations", "0")
    summary_lines = [
        "Regulatory Summary:",
        f"- Extraction Method: {extraction_method}, NLP Preprocessing: {nlp_flag}",
        f"- Total Changes: {total_changes}, Successful Annotations: {annotated}",
        f"- Additions: {count_for('addition')}",
        f"- Deletions: {count_for('deletion')}",
        f"- Modifications: {count_for('modification')}",
    ]
    summary_text = "\n".join(summary_lines) + "\n"

    # The summary is drawn on top of whatever is already in the top area of page 1.
    first_page = doc.load_page(0)
    summary_box = pymupdf.Rect(10, 10, 550, 150)
    first_page.insert_textbox(
        summary_box,
        summary_text,
        fontsize=9,
        fontname="helv",
        align=pymupdf.TEXT_ALIGN_LEFT,
        color=(0, 0, 0.7),
        overlay=True,
    )

    # Prefix the existing title/author instead of discarding them.
    meta = doc.metadata
    previous_title = meta["title"]
    meta["title"] = "Annotated " + (previous_title if previous_title else "PDF")
    previous_author = meta["author"]
    author = "Fortiss Regulatory Change Detector"
    if previous_author:
        author += " & " + previous_author
    meta["author"] = author
    meta["subject"] = "Annotated PDF with regulatory changes"
    meta["keywords"] = "regulatory, changes, annotations, pdf"
    doc.set_metadata(meta)
def add_failed_annotations_to_pdf(doc, failed_annotations):
    """
    Append a listing of the failed annotations to the end of the PDF document.

    The document is modified in place. Nothing is added when
    ``failed_annotations`` is empty. The listing continues on further appended
    pages when it does not fit on one page.

    :param doc: The PyMuPDF document object.
    :type doc: pymupdf.Document
    :param failed_annotations: The failed annotations to be added. Each entry is a dict with a
        "page" value and a "change" dict holding "relevant_text", "change_type" and "change".
    :type failed_annotations: list
    """
    if not failed_annotations:
        return

    lines = ["Failed Annotations:"]
    for failed_annotation in failed_annotations:
        change = failed_annotation["change"]
        lines.append(
            f"Page {failed_annotation['page']}: {change['relevant_text']} "
            f"({change['change_type']}) Change: {change['change']}"
        )
    remaining = "\n".join(lines) + "\n"

    rect = pymupdf.Rect(20, 20, 580, 822)
    while remaining.strip():
        page = doc.new_page(pno=-1)
        end = len(remaining)
        while end > 0:
            # insert_textbox returns a negative value and writes NOTHING when
            # the text does not fit, so a failed attempt can be retried on the
            # same page with a shorter prefix.
            unused_space = page.insert_textbox(
                rect,
                remaining[:end],
                fontsize=9,
                fontname="helv",
                align=pymupdf.TEXT_ALIGN_LEFT,
                color=(0, 0, 0.7),
            )
            if unused_space >= 0:
                break
            # Halve the amount of text, cutting just after a line break when
            # one is available so entries are not split mid-line.
            half = end // 2
            newline_at = remaining.rfind("\n", 0, half)
            end = newline_at + 1 if newline_at > 0 else half
        if end == 0:
            # Not even a single character fits; give up instead of looping forever.
            break
        remaining = remaining[end:]
def get_data_dict_pymupdf(pdf_input: str, do_nlp_preprocessing: bool = True, progress_callback=None, status_callback=None):
    """
    Extract the document structure with PyMuPDF and run the regulatory change detector on it.

    :param pdf_input: The PDF to analyse; passed unchanged to ``create_hierarchical_structure_by_pymupdf``.
    :type pdf_input: str
    :param do_nlp_preprocessing: Selects the detector variant with (True) or without (False) NLP insights.
    :type do_nlp_preprocessing: bool
    :param progress_callback: Optional callback forwarded to the detector.
    :type progress_callback: callable
    :param status_callback: Optional callback that receives status messages; also forwarded to the detector.
    :type status_callback: callable
    :return: The first element of the tuple returned by the selected detector.
    :raises Exception: If structure extraction or change detection fails; the original
        error is attached as ``__cause__``.
    """
    try:
        if status_callback:
            status_callback("Extracting document structure...")
        pymupdf_structure = create_hierarchical_structure_by_pymupdf(pdf_input)
    except Exception as e:
        raise Exception(f"Error extracting text from PDF: {e}") from e

    try:
        if status_callback:
            status_callback("Detecting regulatory changes...")
        if do_nlp_preprocessing:
            detector = pymupdf_regulatory_change_detector_with_nlp_insights
        else:
            detector = pymupdf_regulatory_change_detector_without_nlp_insights
        # The detectors return a 2-tuple; only the first element is used here.
        data_dict, _ = detector(pymupdf_structure, progress_callback, status_callback)
        return data_dict
    except Exception as e:
        raise Exception(f"Error querying the pymupdf: {e}") from e
def extract_document_pymupdf(
    uploaded_document: bytes, do_nlp_preprocessing=True, progress_callback=None, status_callback=None
) -> tuple[list[RegulatoryChange], str]:
    """
    Detect regulatory changes in an uploaded PDF and return them with a markdown rendering.

    :param uploaded_document: The PDF content; opened as an in-memory stream for the markdown
        conversion. NOTE(review): it is also passed to ``get_data_dict_pymupdf``, whose
        parameter is annotated ``str`` — confirm the extractor accepts bytes.
    :type uploaded_document: bytes
    :param do_nlp_preprocessing: Flag forwarded to ``get_data_dict_pymupdf``.
    :type do_nlp_preprocessing: bool
    :param progress_callback: Optional callback forwarded to ``get_data_dict_pymupdf``.
    :type progress_callback: callable
    :param status_callback: Optional callback forwarded to ``get_data_dict_pymupdf``.
    :type status_callback: callable
    :return: The detected changes and the document as markdown; ``([], "")`` when no data was produced.
    :rtype: tuple[list[RegulatoryChange], str]
    """
    data = get_data_dict_pymupdf(uploaded_document, do_nlp_preprocessing, progress_callback, status_callback)
    if not data:
        return [], ""

    flattened_changes = []
    for page_num_str, changes in data.get("changes_by_page", {}).items():
        page_num = int(page_num_str)
        for change in changes:
            flattened_changes.append(
                RegulatoryChange.from_dict(
                    {
                        "text": change.get("relevant_text", ""),
                        "validated": False,
                        "confirmed": False,
                        "reviewed": False,
                        "category": change.get("change", ""),
                        "type": change.get("change_type", ""),
                        "context": change.get("explanation", ""),
                        # Only the page is known; -1 marks the line as unknown.
                        "grounding": [{"page": page_num, "line": -1}],
                    }
                )
            )

    # Close the temporary document even if the markdown conversion fails.
    markdown_doc = pymupdf.open(stream=uploaded_document, filetype="pdf")
    try:
        markdown = pymupdf4llm.to_markdown(markdown_doc)
    finally:
        markdown_doc.close()
    return flattened_changes, markdown
def pymupdf_pdf_annotator(pdf_path, do_nlp_preprocessing=True, progress_callback=None, status_callback=None):
    """
    Annotate a PDF document with highlights and comments for the detected regulatory changes.

    The text is extracted using PyMuPDF and the changes come from ``get_data_dict_pymupdf``.
    Each change's text is searched on the page it is reported for and highlighted with a
    colour and comment based on the change type (addition, deletion, or modification).
    Changes whose text cannot be found are listed on a page appended to the document.

    NOTE: besides returning the annotated PDF, ``doc.saveIncr()`` is called, which
    saves the changes incrementally to the file the document was opened from.

    :param pdf_path: The file path to the PDF document that will be annotated.
    :type pdf_path: str
    :param do_nlp_preprocessing: Flag indicating whether to use NLP preprocessing for change detection. Default is True.
    :type do_nlp_preprocessing: bool
    :param progress_callback: Optional callback function to report progress (0.0 to 1.0)
    :type progress_callback: callable
    :param status_callback: Optional callback function to report status messages
    :type status_callback: callable
    :return: Base64-encoded string of the annotated PDF document suitable for embedding in HTML.
    :rtype: str
    :raises Exception: If the PDF cannot be opened or no data is produced for it.
    """
    try:
        doc = pymupdf.open(pdf_path)
    except Exception as e:
        raise Exception(f"Error opening PDF file: {e}") from e

    try:
        data = get_data_dict_pymupdf(pdf_path, do_nlp_preprocessing, progress_callback, status_callback)
        if not data:
            raise Exception("No data found in the PDF document. Please check the file.")

        successful_annotations = 0
        failed_annotations = []
        for page_num_str, changes in data.get("changes_by_page", {}).items():
            # Page keys are 1-based; PyMuPDF page indices are 0-based.
            page_num = int(page_num_str)
            doc_page = doc.load_page(page_num - 1)
            # Longest text first so that shorter snippets cannot claim an area
            # that a longer passage needs.
            changes = sorted(changes, key=lambda c: len(c["relevant_text"]), reverse=True)
            annotated_areas = []
            for change in changes:
                text = change["relevant_text"]
                change_type = change["change_type"]
                change_str = change["change"]
                comment = change["explanation"]

                # Keep only hits that do not overlap an already annotated area.
                results = [
                    hit
                    for hit in doc_page.search_for(text)
                    if not any(hit.intersects(area) for area in annotated_areas)
                ]
                if not results:
                    print(
                        f"No non-overlapping match found on page {page_num} for: '{text}'"
                    )
                    failed_annotations.append({"change": change, "page": page_num})
                    continue

                # Unknown change types fall back to yellow.
                color = color_mapping.get(change_type, (1, 1, 0))
                annotated_areas.append(results[0])
                highlight = doc_page.add_highlight_annot(results[0])
                highlight.set_colors({"stroke": color})
                highlight.set_info(
                    info={
                        "title": "Comment",
                        "content": f"{change_type} - {change_str}\n{comment}",
                        "name": change_type,
                    }
                )
                highlight.update()
                successful_annotations += 1

                # A multi-line hit yields one rect per line. A further rect is
                # treated as a continuation line when its text is a proper part
                # of the search text (contained in it, but not equal to it).
                # TODO test with multiple instances of multiline text on same page
                wanted = text.strip()
                for result in results[1:]:
                    fragment = doc_page.get_textbox(result).strip()
                    if (
                        fragment != wanted
                        and fragment in wanted
                        and not any(result.intersects(area) for area in annotated_areas)
                    ):
                        highlight = doc_page.add_highlight_annot(result)
                        highlight.set_colors({"stroke": color})
                        highlight.update()
                        annotated_areas.append(result)

        # Tolerate detector output that lacks an "analysis_summary" entry.
        analysis_summary = data.setdefault("analysis_summary", {})
        analysis_summary["successful_annotations"] = successful_annotations
        add_infos_to_pdf(doc, analysis_summary, "PyMuPDF", do_nlp_preprocessing)
        add_failed_annotations_to_pdf(doc, failed_annotations)
        base64_pdf = base64.b64encode(doc.tobytes()).decode("utf-8")
        doc.saveIncr()
        return base64_pdf
    finally:
        # Release the document on success and on every error path.
        doc.close()
def extract_document_llm(
    uploaded_document: bytes, do_nlp_preprocessing=True, progress_callback=None, status_callback=None
) -> tuple[list[RegulatoryChange], str]:
    """
    Detect regulatory changes in an uploaded PDF using LLM-based extraction.

    :param uploaded_document: The PDF content; passed to ``create_hierarchical_structure_by_llm``
        and opened as an in-memory stream for the markdown conversion.
    :type uploaded_document: bytes
    :param do_nlp_preprocessing: Selects the detector variant with (True) or without (False) NLP insights.
    :type do_nlp_preprocessing: bool
    :param progress_callback: Optional callback forwarded to the detector.
    :type progress_callback: callable
    :param status_callback: Optional callback that receives status messages; also forwarded to the detector.
    :type status_callback: callable
    :return: The detected changes and the document as markdown.
    :rtype: tuple[list[RegulatoryChange], str]
    :raises Exception: If structure extraction or the LLM query fails; the original
        error is attached as ``__cause__``.
    """
    try:
        if status_callback:
            status_callback("Extracting document structure...")
        llm_structure = create_hierarchical_structure_by_llm(uploaded_document)
    except Exception as e:
        raise Exception(f"Error extracting text from PDF: {e}") from e

    try:
        if status_callback:
            status_callback("Detecting regulatory changes...")
        if do_nlp_preprocessing:
            data = llm_regulatory_change_detector(llm_structure, progress_callback, status_callback)
        else:
            data = llm_regulatory_change_detector_without_nlp_insights(
                llm_structure, progress_callback, status_callback
            )
    except Exception as e:
        raise Exception(f"Error querying the LLM: {e}") from e

    flattened_changes = []
    for changes in data.get("results", {}).values():
        for change in changes:
            flattened_changes.append(
                RegulatoryChange.from_dict(
                    {
                        "text": change.get("relevant_text", ""),
                        "validated": False,
                        "confirmed": False,
                        "reviewed": False,
                        "category": change.get("change", ""),
                        "type": change.get("change_type", ""),
                        "context": change.get("explanation", ""),
                        # The LLM output is not read for a page or line here; -1 marks both as unknown.
                        "grounding": [{"page": -1, "line": -1}],
                    }
                )
            )

    # Close the temporary document even if the markdown conversion fails.
    markdown_doc = pymupdf.open(stream=uploaded_document, filetype="pdf")
    try:
        markdown = pymupdf4llm.to_markdown(markdown_doc)
    finally:
        markdown_doc.close()
    return flattened_changes, markdown
def llm_pdf_annotator(pdf_path, do_nlp_preprocessing=True, progress_callback=None, status_callback=None):
    """
    Annotate a PDF document with highlights and comments for the detected regulatory changes.

    The text is extracted using an LLM. Because no page number is read from the LLM output,
    every change's text is searched for in the whole document. Matches are highlighted with
    a colour and comment based on the change type (addition, deletion, or modification).
    Changes whose text cannot be found are listed on a page appended to the document.

    NOTE: besides returning the annotated PDF, ``doc.saveIncr()`` is called, which
    saves the changes incrementally to the file the document was opened from.

    :param pdf_path: The file path to the PDF document that will be annotated.
    :type pdf_path: str
    :param do_nlp_preprocessing: Flag indicating whether to use NLP preprocessing for change detection. Default is True.
    :type do_nlp_preprocessing: bool
    :param progress_callback: Optional callback function to report progress (0.0 to 1.0)
    :type progress_callback: callable
    :param status_callback: Optional callback function to report status messages
    :type status_callback: callable
    :return: Base64-encoded string of the annotated PDF document suitable for embedding in HTML.
    :rtype: str
    :raises Exception: If the PDF cannot be opened, or structure extraction or the LLM query fails.
    """
    try:
        doc = pymupdf.open(pdf_path)
    except Exception as e:
        raise Exception(f"Error opening PDF file: {e}") from e

    try:
        try:
            if status_callback:
                status_callback("Extracting document structure...")
            llm_structure = create_hierarchical_structure_by_llm(pdf_path)
        except Exception as e:
            raise Exception(f"Error extracting text from PDF: {e}") from e

        try:
            if status_callback:
                status_callback("Detecting regulatory changes...")
            if do_nlp_preprocessing:
                data = llm_regulatory_change_detector(llm_structure, progress_callback, status_callback)
            else:
                data = llm_regulatory_change_detector_without_nlp_insights(
                    llm_structure, progress_callback, status_callback
                )
        except Exception as e:
            raise Exception(f"Error querying the LLM: {e}") from e

        # Load every page once; each change is searched across all of them.
        pages = [doc.load_page(index) for index in range(doc.page_count)]

        successful_annotations = 0
        failed_annotations = []
        for changes in data.get("results", {}).values():
            # Longest text first so that shorter snippets cannot claim an area
            # that a longer passage needs.
            changes = sorted(changes, key=lambda c: len(c["relevant_text"]), reverse=True)
            # (page index, rect) pairs: rects are only comparable on the same page.
            annotated_areas = []

            def overlaps_annotated(page_index, rect):
                return any(
                    page_index == done_page and rect.intersects(done_rect)
                    for done_page, done_rect in annotated_areas
                )

            for change in changes:
                text = change["relevant_text"]
                change_type = change["change_type"]
                comment = change["explanation"]
                change_str = change["change"]

                # Collect all hits in the document that do not overlap an
                # already annotated area on the same page.
                matches = [
                    (page_index, rect)
                    for page_index, page in enumerate(pages)
                    for rect in page.search_for(text)
                    if not overlaps_annotated(page_index, rect)
                ]
                if not matches:
                    print(
                        f"No non-overlapping match found in the document for: '{text}'"
                    )
                    # No page is known for a change that was not found anywhere.
                    failed_annotations.append({"change": change, "page": "unknown"})
                    continue

                # Unknown change types fall back to yellow.
                color = color_mapping.get(change_type, (1, 1, 0))
                # Only the first hit carries the comment; continuation lines of
                # a multi-line hit are highlighted without one below.
                first_page_index, first_rect = matches[0]
                annotated_areas.append((first_page_index, first_rect))
                highlight = pages[first_page_index].add_highlight_annot(first_rect)
                highlight.set_colors({"stroke": color})
                highlight.set_info(
                    info={
                        "title": "Comment",
                        "content": f"{change_type} - {change_str}\n{comment}",
                        "name": change_type,
                    }
                )
                highlight.update()
                successful_annotations += 1

                # A multi-line hit yields one rect per line. A further rect is
                # treated as a continuation line when its text is a proper part
                # of the search text (contained in it, but not equal to it).
                # Each rect is read from and highlighted on its OWN page.
                wanted = text.strip()
                for page_index, rect in matches[1:]:
                    page = pages[page_index]
                    fragment = page.get_textbox(rect).strip()
                    if (
                        fragment != wanted
                        and fragment in wanted
                        and not overlaps_annotated(page_index, rect)
                    ):
                        highlight = page.add_highlight_annot(rect)
                        highlight.set_colors({"stroke": color})
                        highlight.update()
                        annotated_areas.append((page_index, rect))

        # Tolerate detector output that lacks an "analysis_summary" entry.
        analysis_summary = data.setdefault("analysis_summary", {})
        analysis_summary["successful_annotations"] = successful_annotations
        add_infos_to_pdf(doc, analysis_summary, "LLM", do_nlp_preprocessing)
        add_failed_annotations_to_pdf(doc, failed_annotations)
        base64_pdf = base64.b64encode(doc.tobytes()).decode("utf-8")
        doc.saveIncr()
        return base64_pdf
    finally:
        # Release the document on success and on every error path.
        doc.close()