Spaces:
Sleeping
Sleeping
| import os | |
| import fitz # PyMuPDF | |
| import json | |
| import re | |
| import difflib | |
| import base64 # β added for base64 encoding | |
| from simple_salesforce import Salesforce | |
| from dotenv import load_dotenv | |
| from datetime import datetime # β added for date conversion | |
| # Load .env variables | |
| load_dotenv() | |
def get_salesforce_client():
    """Create a Salesforce client from credentials in the environment.

    Reads ``SF_USERNAME``, ``SF_PASSWORD``, ``SF_SECURITY_TOKEN`` and
    ``SF_DOMAIN``. ``SF_DOMAIN`` may be given either as a bare domain
    (e.g. ``test``) or as a full URL (e.g. ``https://test.salesforce.com/``);
    the scheme, the ``.salesforce.com`` suffix and any trailing slash are
    removed before it is handed to the client.

    Returns:
        ``(client, None)`` on success, or ``(None, error_message)`` if the
        client could not be created.
    """
    try:
        # An unset SF_DOMAIN used to fail with an opaque
        # "'NoneType' object has no attribute 'replace'" error.
        raw_domain = (os.getenv("SF_DOMAIN") or "").strip()
        domain = (
            raw_domain.replace("https://", "")
            .replace(".salesforce.com", "")
            .rstrip("/")
        )
        sf = Salesforce(
            username=os.getenv("SF_USERNAME"),
            password=os.getenv("SF_PASSWORD"),
            security_token=os.getenv("SF_SECURITY_TOKEN"),
            # "login" is the production login host; used when no domain is set.
            domain=domain or "login",
        )
        return sf, None
    except Exception as e:
        return None, str(e)
def get_salesforce_objects(sf):
    """List the names of the org's visible objects in sorted order.

    Calls ``sf.describe()`` and keeps the ``name`` of every entry under
    ``'sobjects'`` whose ``deprecatedAndHidden`` flag is falsy.

    Returns:
        ``(sorted_names, None)`` on success, or ``([], error_message)`` if
        anything raises along the way.
    """
    try:
        catalog = sf.describe()['sobjects']
        visible_names = sorted(
            entry['name']
            for entry in catalog
            if not entry['deprecatedAndHidden']  # skip hidden/deprecated
        )
    except Exception as failure:
        return [], str(failure)
    return visible_names, None
def get_object_fields(sf, object_name):
    """Return the field names reported by ``describe()`` for one object.

    The object handle is obtained through ``sf.__getattr__(object_name)`` and
    the ``name`` of each entry under ``'fields'`` is collected in order.

    Returns:
        ``(field_names, None)`` on success, or ``([], error_message)`` if
        anything raises along the way.
    """
    try:
        sobject = sf.__getattr__(object_name)
        description = sobject.describe()
        field_names = []
        for entry in description['fields']:
            field_names.append(entry['name'])
    except Exception as failure:
        return [], str(failure)
    return field_names, None
def extract_text_from_pdf(pdf_path):
    """Extract the text of every page of a PDF as one string.

    Args:
        pdf_path: Path of the PDF file to open with PyMuPDF.

    Returns:
        ``(text, None)`` with surrounding whitespace stripped on success, or
        ``(None, error_message)`` if the file cannot be opened or read.
    """
    try:
        # The context manager closes the document (and its file handle) even
        # when a page fails to extract; previously it was never closed.
        with fitz.open(pdf_path) as doc:
            text = "".join(page.get_text() for page in doc)
        return text.strip(), None
    except Exception as e:
        return None, str(e)
| # Helper: Convert date to YYYY-MM-DD | |
def convert_to_salesforce_date(date_str):
    """Reformat a US-style date string as ``YYYY-MM-DD``.

    ``MM/DD/YYYY`` is tried first, then ``MM/DD/YY``. When neither format
    parses, the input is handed back unchanged.
    """
    accepted_formats = ("%m/%d/%Y", "%m/%d/%y")
    for pattern in accepted_formats:
        try:
            return datetime.strptime(date_str, pattern).strftime("%Y-%m-%d")
        except ValueError:
            pass  # not this format; try the next one
    # Nothing matched: return the original text untouched.
    return date_str
| # Extract key-value pairs smartly from the PDF text | |
# A "Key: Value" or "Key - Value" line: the key starts with an uppercase
# letter followed by 3-50 letters, digits, spaces or ()/_- characters.
_KEY_VALUE_LINE_RE = re.compile(
    r"^([A-Z][A-Za-z0-9 ()/_\-]{3,50})\s*[:\-]\s*(.+)$"
)
# Dollar amount that follows the phrase "Total Agreement Value".
_TOTAL_VALUE_RE = re.compile(
    r"Total Agreement Value[^\$]*\$\s?([\d,]+(?:\.\d{2})?)", re.IGNORECASE
)
# Agreement name of the form "MSA ..." that follows the word "Agreement".
_AGREEMENT_NAME_RE = re.compile(
    r"Agreement\s+(MSA\s+[A-Za-z0-9 _\-]+)", re.IGNORECASE
)
# Start/end dates in "effective as of <date> ... until <date>".
_AGREEMENT_DATES_RE = re.compile(
    r"effective as of\s*([0-9]{1,2}/[0-9]{1,2}/[0-9]{2,4}).*?until\s*[<\(]?([0-9]{1,2}/[0-9]{1,2}/[0-9]{2,4})",
    re.IGNORECASE | re.DOTALL,
)


def _parse_key_value_text(text):
    """Return parallel ``(keys, values)`` lists parsed from raw PDF text."""
    keys = []
    values = []

    # Pattern 1: direct "Key: Value" lines.
    for line in text.splitlines():
        match = _KEY_VALUE_LINE_RE.match(line.strip())
        if not match:
            continue
        key = match.group(1).strip()
        value = match.group(2).strip()
        if len(value) > 1:  # single-character values are ignored
            keys.append(key)
            values.append(value)

    # Pattern 2: special extractions over the whole text.
    match_val = _TOTAL_VALUE_RE.search(text)
    if match_val:
        keys.append("Total Agreement Value")
        # Drop thousands separators so the value is a plain number string.
        values.append(match_val.group(1).replace(",", ""))

    match_name = _AGREEMENT_NAME_RE.search(text)
    if match_name:
        keys.append("Agreement Name")
        values.append(match_name.group(1).strip())

    match_dates = _AGREEMENT_DATES_RE.search(text)
    if match_dates:
        keys.append("Agreement Start Date")
        values.append(convert_to_salesforce_date(match_dates.group(1).strip()))
        keys.append("Agreement End Date")
        values.append(convert_to_salesforce_date(match_dates.group(2).strip()))

    return keys, values


def extract_key_value_pairs(pdf_path):
    """Extract key/value pairs from the text of a PDF.

    Two kinds of pairs are collected, in this order:

    1. every ``Key: Value`` / ``Key - Value`` line whose value is longer than
       one character;
    2. special fields found anywhere in the text: ``Total Agreement Value``
       (commas removed), ``Agreement Name``, and ``Agreement Start Date`` /
       ``Agreement End Date`` (passed through ``convert_to_salesforce_date``).

    Args:
        pdf_path: Path of the PDF file to open with PyMuPDF.

    Returns:
        ``([{"keys": [...], "values": [...]}], None)`` on success, where the
        two lists are index-aligned, or ``(None, error_message)`` on failure.
    """
    try:
        # The context manager closes the document even if extraction fails;
        # previously the document was never closed.
        with fitz.open(pdf_path) as doc:
            text = "".join(page.get_text() for page in doc)
        keys, values = _parse_key_value_text(text)
        return [{"keys": keys, "values": values}], None
    except Exception as e:
        return None, str(e)
| # FIXED: Use original key as mapping key (not value) | |
def map_fields(extracted_data, object_fields):
    """Map each extracted key to the most similar Salesforce field name.

    Similarity is the ``difflib.SequenceMatcher`` ratio between the
    lower-cased key and the lower-cased field name. The same score is used
    both to pick the field and to report the confidence, so the reported
    value always belongs to the best-scoring field. Keys equal to ``name``
    or ``email`` (case-insensitively) are skipped.

    Args:
        extracted_data: One-element list holding a dict with index-aligned
            ``"keys"`` and ``"values"`` lists.
        object_fields: Candidate Salesforce field names.

    Returns:
        ``(mappings, confidence_scores, None)`` on success, where both dicts
        are keyed by the original extracted key and confidences are rounded
        to two decimals; ``(None, None, error_message)`` on failure,
        including when a key needs mapping but ``object_fields`` is empty.
    """
    try:
        mappings = {}
        confidence_scores = {}
        keys = extracted_data[0]["keys"]
        values = extracted_data[0]["values"]
        # Lower-case every candidate once instead of once per key.
        candidates = [(field, field.lower()) for field in (object_fields or [])]

        # zip() keeps the original pairing: keys without a value are ignored.
        for key, _value in zip(keys, values):
            lowered_key = key.lower()
            if lowered_key in ("name", "email"):
                continue  # Skip these
            if not candidates:
                # Previously this surfaced as "list index out of range".
                return None, None, "No Salesforce fields available to map extracted keys to"

            best_field = None
            best_score = -1.0
            for field, lowered_field in candidates:
                score = difflib.SequenceMatcher(
                    None, lowered_key, lowered_field
                ).ratio()
                if score > best_score:  # first field wins ties
                    best_field, best_score = field, score

            mappings[key] = best_field
            confidence_scores[key] = round(best_score, 2)
        return mappings, confidence_scores, None
    except Exception as e:
        return None, None, str(e)
def create_record(sf, object_name, data):
    """Create one record of ``object_name`` from ``data``.

    The object handle is obtained through ``sf.__getattr__(object_name)`` and
    its ``create`` method is called with ``data``.

    Returns:
        ``(record_id, None)`` on success, where ``record_id`` is the ``"id"``
        entry of the response or ``"Unknown ID"`` when that entry is absent;
        ``(None, error_message)`` if anything raises along the way.
    """
    try:
        sobject = sf.__getattr__(object_name)
        response = sobject.create(data)
        record_id = response.get("id", "Unknown ID")
    except Exception as failure:
        return None, str(failure)
    return record_id, None
| # FIXED: Properly encode PDF as base64 string | |
def attach_pdf(sf, record_id, pdf_path):
    """Upload a PDF and link it to a record.

    Steps, in order:
      1. read the file and base64-encode its bytes as a UTF-8 string;
      2. create a ``ContentVersion`` titled with the file's base name;
      3. query that version's ``ContentDocumentId``;
      4. create a ``ContentDocumentLink`` to ``record_id`` with share type
         ``"V"``.

    Returns:
        ``("PDF attached successfully", None)`` on success, or
        ``(None, error_message)`` if any step raises.
    """
    try:
        with open(pdf_path, "rb") as pdf_file:
            raw_bytes = pdf_file.read()
            encoded_pdf = base64.b64encode(raw_bytes).decode("utf-8")

        version_payload = {
            "Title": os.path.basename(pdf_path),
            "PathOnClient": os.path.basename(pdf_path),
            "VersionData": encoded_pdf,
        }
        content_version = sf.ContentVersion.create(version_payload)

        # Look up the document that owns the version just created.
        version_id = content_version['id']
        lookup = sf.query(
            f"SELECT ContentDocumentId FROM ContentVersion WHERE Id = '{version_id}'"
        )
        document_id = lookup["records"][0]["ContentDocumentId"]

        link_payload = {
            "ContentDocumentId": document_id,
            "LinkedEntityId": record_id,
            "ShareType": "V",
        }
        sf.ContentDocumentLink.create(link_payload)
    except Exception as failure:
        return None, str(failure)
    return "PDF attached successfully", None
def log_failure(pdf_path, object_name, error):
    """Append one failure record to ``failures.json``.

    Each call adds a single JSON object with the keys ``pdf``, ``object``
    and ``error``, followed by a newline, so the file holds one record per
    line. The file is opened in append mode relative to the current working
    directory.
    """
    record = {"pdf": pdf_path, "object": object_name, "error": error}
    with open("failures.json", "a") as log_file:
        json.dump(record, log_file)
        log_file.write("\n")