Spaces:
Sleeping
Sleeping
File size: 12,133 Bytes
5c08ef5 2b51034 4c19d6c 2b51034 5c08ef5 b9ae2ff 5ad4e9d acee22d 5c08ef5 5ad4e9d 2b51034 8822f53 acee22d 6e4819e 2b51034 94d30f1 2b51034 6e4819e acee22d 5ad4e9d 6e4819e 5ad4e9d 8822f53 0cb4c94 5ad4e9d 0cb4c94 a40fdc8 17db344 0cb4c94 a40fdc8 0cb4c94 a40fdc8 0cb4c94 a40fdc8 6e4819e a40fdc8 6e4819e a40fdc8 dde2ff7 6e4819e dde2ff7 a40fdc8 6e4819e 91704ec a40fdc8 b9ae2ff 6e4819e 5ad4e9d 0cb4c94 5ad4e9d 2b51034 6e4819e 0cb4c94 dde2ff7 a40fdc8 dde2ff7 a40fdc8 dde2ff7 5ad4e9d dde2ff7 2b51034 5ad4e9d 2b51034 91704ec a40fdc8 91704ec 5ad4e9d 91704ec 2b51034 6e4819e 2b51034 b9ae2ff 2b51034 5ad4e9d 2b51034 dde2ff7 2b51034 6e4819e 2b51034 acee22d 2b51034 acee22d 6e4819e acee22d 5ad4e9d acee22d 2b51034 6e4819e acee22d |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 |
from transformers import LayoutLMv3Tokenizer, LayoutLMv3ForTokenClassification, LayoutLMv3ImageProcessor
import torch
from PIL import Image
import fitz # PyMuPDF
from typing import Dict, List
import os
import re
import logging
# Set up module-level logging.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Load pre-trained LayoutLMv3 components once at import time.
# Bind the globals to None first so that a failed load leaves them defined:
# downstream code can then test for None instead of hitting a NameError.
tokenizer = None
feature_extractor = None
model = None
try:
    tokenizer = LayoutLMv3Tokenizer.from_pretrained("microsoft/layoutlmv3-base")
    # apply_ocr=False: words and bounding boxes are supplied by the caller,
    # not re-OCR'd by the image processor.
    feature_extractor = LayoutLMv3ImageProcessor(apply_ocr=False)
    model = LayoutLMv3ForTokenClassification.from_pretrained("microsoft/layoutlmv3-base")
    logger.info("LayoutLMv3 models loaded successfully.")
except Exception as e:
    logger.error(f"Failed to load LayoutLMv3 models: {str(e)}")
    # Reset all three so a partial load never leaves a half-usable pipeline.
    tokenizer = feature_extractor = model = None
def extract_key_values_with_layoutlm(page_data: list, pdf_path: str) -> Dict[str, str]:
    """
    Extract key-value pairs (Agreement Name, Agreement Start Date,
    Agreement End Date, Total Agreement Value) from PDF text.

    A regex pass over the concatenated page text runs first; when the
    LayoutLMv3 components loaded successfully, a token-classification pass
    may refine/overwrite those results.

    Args:
        page_data (list): Per-page dicts with 'text' (str), 'words' (list of str),
            'bbox' (list of [x0, y0, x1, y1] normalized to 0-1000), and
            'image_dims' ([width, height]).
        pdf_path (str): Path to the PDF file.

    Returns:
        dict: The extracted key-value pairs, or a
        {"status": "failed", "error": ..., "key_values": ...} dict when
        nothing was extracted or an error occurred.
    """
    log = logging.getLogger(__name__)
    key_values = {
        "Agreement Name": "Unknown",
        "Agreement Start Date": "",
        "Agreement End Date": "",
        "Total Agreement Value": "",
    }
    try:
        # Regex pass over the concatenated text of all pages.
        text_data = " ".join(page.get("text", "") for page in page_data)
        log.info("Starting regex-based extraction.")
        _regex_extract(text_data, key_values)

        # Only attempt the LayoutLMv3 pass when every component is available.
        # globals().get() keeps this safe even if model loading failed and the
        # names were never bound (the old truthiness check raised NameError,
        # which silently discarded the regex results via the except path).
        if all(globals().get(n) is not None for n in ("tokenizer", "feature_extractor", "model")):
            _layoutlm_extract(page_data, pdf_path, key_values)
        else:
            log.warning("LayoutLMv3 model components not available, skipping advanced extraction.")

        if any(key_values.values()):
            return key_values
        return {"status": "failed", "error": "No key-value pairs extracted", "key_values": {}}
    except Exception as e:
        log.error(f"Error in extract_key_values_with_layoutlm: {str(e)}")
        return {"status": "failed", "error": str(e), "key_values": key_values}


def _regex_extract(text_data: str, key_values: Dict[str, str]) -> None:
    """Regex fallback pass: fill key_values in place from the raw text."""
    # Agreement name from explicit labels, rejecting boilerplate/party names.
    name_context = re.findall(
        r'(?:Agreement\s+Name|Contract\s+Title|Agreement\s+Title)\s*[:\s]*([A-Za-z0-9\s]+?)(?=\s*(?:Exhibit|\n\n|\Z))',
        text_data, re.IGNORECASE)
    if name_context:
        key_values["Agreement Name"] = next(
            (name.strip() for name in name_context
             if len(name.split()) > 1
             and "MASTER SUBSCRIPTION AGREEMENT" not in name.upper()
             and "Customer" not in name),
            "Unknown")
    else:
        # Fall back to "between X and Y" party phrasing, avoiding single names.
        party_match = re.search(r'(?:between\s+([A-Za-z\s]+)\s+and\s+([A-Za-z\s]+))', text_data, re.IGNORECASE)
        if party_match:
            key_values["Agreement Name"] = (
                f"{party_match.group(1).strip()} and {party_match.group(2).strip()}"
                if party_match.group(2) else "Unknown")

    # Start/end dates, including "executed as of" phrasing.
    date_patterns = [
        r'(?:Agreement\s+Start\s+Date|Effective\s+Date|executed\s+as\s+of)\s*[:\s]*(\d{1,2}/\d{1,2}/\d{2,4})',
        r'(?:Agreement\s+End\s+Date|Termination\s+Date)\s*[:\s]*(\d{1,2}/\d{1,2}/\d{2,4})',
    ]
    for pattern in date_patterns:
        matches = re.findall(pattern, text_data, re.IGNORECASE)
        if matches:
            lowered = pattern.lower()
            is_start = "start" in lowered or "effective" in lowered or "executed" in lowered
            key = "Agreement Start Date" if is_start else "Agreement End Date"
            # Only fill a field that is still empty; first match wins.
            if matches[0] and not key_values.get(key):
                key_values[key] = matches[0]

    # Monetary total: keep only the value portion after the label.
    amount_pattern = r'(?:Total\s+Agreement\s+Value|Total\s+Amount|Contract\s+Value|List\s+Price)\s*[:\s]*\$?\d{1,3}(?:,\d{3})*(?:\.\d{2})?'
    amounts = re.findall(amount_pattern, text_data, re.IGNORECASE)
    if amounts:
        key_values["Total Agreement Value"] = next(
            (amt.split(":")[-1].strip() if ":" in amt else amt.strip()
             for amt in amounts
             if any(k.lower() in amt.lower() for k in ["total", "value", "price"])),
            "")


def _assign_kv(current_key: str, current_value: List[str], key_values: Dict[str, str]) -> None:
    """Map one accumulated (key tokens, value tokens) pair onto the target fields."""
    key = " ".join(current_value).strip()
    ck = current_key.lower()
    if "agreement name" in ck and "MASTER SUBSCRIPTION AGREEMENT" not in key.upper() and "Customer" not in key:
        key_values["Agreement Name"] = key
    elif "start date" in ck or "effective date" in ck or "executed as of" in ck:
        key_values["Agreement Start Date"] = key
    elif "end date" in ck or "termination date" in ck:
        key_values["Agreement End Date"] = key
    elif "total agreement value" in ck or "amount" in ck or "price" in ck:
        key_values["Total Agreement Value"] = key


def _layoutlm_extract(page_data: list, pdf_path: str, key_values: Dict[str, str]) -> None:
    """LayoutLMv3 token-classification pass; updates key_values in place.

    NOTE(review): labels 1 (key) and 2 (value) are hypothetical and must
    match the fine-tuned model's actual label map — confirm before relying
    on this pass.
    """
    doc = fitz.open(pdf_path)
    try:
        for page_num, page_info in enumerate(page_data):
            text = page_info.get("text", "")
            if not text.strip() or "No text detected" in text:
                continue
            page = doc[page_num]
            pix = page.get_pixmap(matrix=fitz.Matrix(300 / 72, 300 / 72))  # render at 300 DPI
            img_path = f"{pdf_path}_page_{page_num}.png"
            pix.save(img_path)
            try:
                image = Image.open(img_path).convert("RGB")
                words = page_info.get("words", [])
                bboxes = page_info.get("bbox", [])
                if not (words and bboxes):
                    continue
                encoding = tokenizer(
                    words,
                    boxes=bboxes,
                    return_tensors="pt",
                    truncation=True,
                    padding=True,
                    max_length=512,
                )
                pixel_values = feature_extractor(image, return_tensors="pt")["pixel_values"]
                with torch.no_grad():
                    outputs = model(
                        input_ids=encoding["input_ids"],
                        attention_mask=encoding["attention_mask"],
                        bbox=encoding["bbox"],
                        pixel_values=pixel_values,
                    )
                predictions = torch.argmax(outputs.logits, dim=2)
                tokens = tokenizer.convert_ids_to_tokens(encoding["input_ids"][0])
                labels = predictions[0].tolist()
                current_key = None
                current_value = []
                for token, label in zip(tokens, labels):
                    if label == 1:  # key token: flush the previous pair, start a new one
                        if current_key and current_value:
                            _assign_kv(current_key, current_value, key_values)
                        current_key = token
                        current_value = []
                    elif label == 2 and current_key:  # value token for the current key
                        current_value.append(token)
                # Flush the final pair of the page.
                if current_key and current_value:
                    _assign_kv(current_key, current_value, key_values)
            finally:
                # Always remove the temporary page render, even on failure
                # (the original leaked the PNG when an exception was raised).
                if os.path.exists(img_path):
                    os.unlink(img_path)
    finally:
        doc.close()
def extract_clauses(page_data: list) -> Dict[str, str]:
    """
    Extract key clauses (NO WAIVER, Termination) from per-page PDF text.

    Args:
        page_data (list): Per-page dicts with a 'text' (str) entry.

    Returns:
        dict: Clause name -> clause text; a single explanatory entry when
        nothing relevant was found; partial results on error.
    """
    log = logging.getLogger(__name__)
    clauses = {}
    try:
        text_data = "\n".join(page.get("text", "") for page in page_data)
        log.info("Starting clause extraction.")
        # Preferred: NO WAIVER located inside a "General Provisions" section.
        no_waiver_match = re.search(
            r'(?:General\s+Provisions\s*[\s\S]*?NO\s+WAIVER\s*[:\s]*)([\s\S]*?)(?=\n\n|\Z)',
            text_data, re.IGNORECASE)
        if no_waiver_match:
            clause_text = no_waiver_match.group(1).strip()
            clauses["NO WAIVER"] = clause_text or "NO WAIVER clause found but no content extracted"
        elif "NO WAIVER" in text_data.upper():
            # Fallback: grab the clause wherever it appears. Search once and
            # reuse the match (the original ran the same regex twice).
            fallback = re.search(r'(NO\s+WAIVER\s*[:\s]*[\s\S]*?)(?=\n\n|\Z)', text_data, re.IGNORECASE)
            clauses["NO WAIVER"] = (
                fallback.group(1).strip() if fallback
                else "NO WAIVER clause identified but no detailed content extracted")
        # Termination clause: first occurrence up to a blank line or end of text.
        termination_match = re.search(r'(?:Termination\s*[:\s]*)([\s\S]*?)(?=\n\n|\Z)', text_data, re.IGNORECASE)
        if termination_match:
            clauses["Termination"] = termination_match.group(1).strip()
        return clauses if clauses else {"No clauses extracted": "No relevant clauses found in the document"}
    except Exception as e:
        log.error(f"Error in extract_clauses: {str(e)}")
        return clauses
def run_ai_mapping_with_layoutlm(key_values: Dict[str, str], object_field_names: List[str], pdf_path: str, page_data: list = None) -> Dict:
    """
    Map extracted key-values to object fields, prioritizing Agreement Name,
    Agreement Start Date, Agreement End Date, and Total Agreement Value.

    Args:
        key_values (dict): Extracted key-value pairs.
        object_field_names (list): List of object field names.
        pdf_path (str): Path to the PDF file (for context if needed).
        page_data (list, optional): Per-page text dicts; when provided,
            clauses are extracted and included in the result. (Bug fix:
            the original referenced an undefined `page_data` name here,
            so every call raised NameError and returned the failure dict.)

    Returns:
        dict: {"status", "mappings", "unmapped_fields", "error", "clauses"}.
    """
    log = logging.getLogger(__name__)
    try:
        mappings = {}
        unmapped_fields = object_field_names.copy()
        log.info("Starting mapping process.")
        for field in object_field_names:
            for key, value in key_values.items():
                # Case-insensitive substring match; first non-empty value wins.
                if field.lower() in key.lower() and value:
                    mappings[field] = value
                    if field in unmapped_fields:
                        unmapped_fields.remove(field)
                    break
        return {
            "status": "success",
            "mappings": mappings,
            "unmapped_fields": unmapped_fields,
            "error": None,
            # Clause extraction needs page text; skip it when none was given.
            "clauses": extract_clauses(page_data) if page_data is not None else {},
        }
    except Exception as e:
        log.error(f"Error in run_ai_mapping_with_layoutlm: {str(e)}")
        return {
            "status": "failed",
            "error": str(e),
            "mappings": {},
            "unmapped_fields": object_field_names,
            "clauses": {},
        }