Spaces:
Sleeping
Sleeping
| import requests | |
| import base64 | |
| import json | |
| import os | |
| from simple_salesforce import Salesforce | |
| from pdf2image import convert_from_path | |
| import pytesseract | |
| from transformers import LayoutLMv3Processor, LayoutLMv3ForTokenClassification | |
| from dotenv import load_dotenv | |
| # Load environment variables | |
| load_dotenv() | |
| # Salesforce Authentication | |
def get_salesforce_client():
    """Log in to Salesforce with credentials read from environment variables.

    Reads ``SF_USERNAME``, ``SF_PASSWORD`` and ``SF_SECURITY_TOKEN`` plus the
    optional ``SF_DOMAIN`` (defaults to ``login.salesforce.com``; an empty
    value is treated as unset).

    Returns:
        A ``(client, error)`` pair: ``(Salesforce, None)`` on success, or
        ``(None, message)`` when a credential is missing or the login fails.
    """
    try:
        username = os.getenv('SF_USERNAME')
        password = os.getenv('SF_PASSWORD')
        security_token = os.getenv('SF_SECURITY_TOKEN')
        host = (os.getenv('SF_DOMAIN') or 'login.salesforce.com').strip()
        instance_url = f"https://{host}"

        # Only the credentials can actually be missing: instance_url is built
        # from a non-empty host, so checking it would never fail.
        if not all([username, password, security_token]):
            raise ValueError("Missing required Salesforce credentials or instance URL")

        # NOTE(review): simple_salesforce's username/password login appears to
        # pick the login host from ``domain`` (the part in front of
        # ".salesforce.com", e.g. "login" or "test") and not from
        # ``instance_url`` -- confirm against the installed version. Passing
        # the prefix makes SF_DOMAIN=test.salesforce.com reach the sandbox.
        suffix = '.salesforce.com'
        if host.lower().endswith(suffix):
            domain = host[:-len(suffix)]
        else:
            domain = host

        sf = Salesforce(
            username=username,
            password=password,
            security_token=security_token,
            instance_url=instance_url,  # kept so the call stays a superset of the old one
            domain=domain,
        )
        print("Salesforce client connected successfully")
        return sf, None
    except Exception as e:
        # Top-level boundary: report the failure instead of raising so callers
        # can surface the message to the user.
        print(f"Salesforce connection failed: {str(e)}")
        return None, str(e)
| # Fetch Salesforce Objects | |
def get_salesforce_objects(sf):
    """List the names of Salesforce objects flagged as createable.

    Calls ``sf.restful('sobjects')`` and keeps the ``name`` of every entry in
    the response's ``sobjects`` list whose ``createable`` value is truthy.

    Returns:
        ``(names, None)`` on success, or ``([], message)`` if anything raises.
    """
    try:
        catalogue = sf.restful('sobjects')
        creatable_names = []
        for entry in catalogue['sobjects']:
            if entry['createable']:
                creatable_names.append(entry['name'])
    except Exception as exc:
        return [], str(exc)
    return creatable_names, None
| # Fetch Object Fields | |
def get_object_fields(sf, object_name):
    """Return the field names reported by ``describe()`` for one object.

    The object handle is obtained by calling ``sf.__getattr__(object_name)``
    explicitly, so the lookup always goes through the client's dynamic
    attribute hook.

    Returns:
        ``(field_names, None)`` on success, or ``([], message)`` on any error.
    """
    try:
        sobject = sf.__getattr__(object_name)
        description = sobject.describe()
        field_names = []
        for field_info in description['fields']:
            field_names.append(field_info['name'])
    except Exception as exc:
        return [], str(exc)
    return field_names, None
| # OCR for Text Extraction | |
def extract_text_from_pdf(pdf_path):
    """OCR every page of a PDF and return the text page by page.

    The PDF is rendered with ``convert_from_path`` and each resulting image is
    passed to ``pytesseract.image_to_string``.

    Returns:
        ``({"pages": [text, ...]}, None)`` on success, or ``({}, message)`` on
        failure. When the error text mentions "poppler", the message is
        replaced by an installation hint.
    """
    try:
        page_texts = []
        for page_image in convert_from_path(pdf_path):
            page_texts.append(pytesseract.image_to_string(page_image))
    except Exception as exc:
        message = str(exc)
        if "poppler" in message.lower():
            message = "Error: Unable to process PDF. Please ensure Poppler is installed and in PATH (e.g., 'apt-get install poppler-utils' on Ubuntu)."
        return {}, message
    return {"pages": page_texts}, None
| # Key-Value Pair Extraction using LayoutLMv3 | |
def extract_key_value_pairs(pdf_path):
    """Run LayoutLMv3 over each page of a PDF (placeholder extraction).

    Loads the processor and token-classification model, renders the PDF with
    ``convert_from_path`` and runs the model on every page. The model output
    is currently discarded: each page contributes the same hard-coded
    ``{"keys": [...], "values": [...]}`` entry until real post-processing is
    implemented.

    Returns:
        ``(pages, None)`` with one dict per page on success, or
        ``([], message)`` on failure. When the error text mentions "poppler",
        the message is replaced by an installation hint.
    """
    try:
        processor = LayoutLMv3Processor.from_pretrained("microsoft/layoutlmv3-base")
        model = LayoutLMv3ForTokenClassification.from_pretrained("microsoft/layoutlmv3-base-finetuned-funsd")
        per_page = []
        for page_image in convert_from_path(pdf_path):
            model_inputs = processor(page_image, truncation=True, return_tensors="pt")
            # The forward pass still runs, but its result is not used yet.
            model(**model_inputs)
            # Simplified: dummy key-value pairs (real implementation needs post-processing)
            per_page.append({"keys": ["Contract Number", "Date"], "values": ["12345", "2025-01-01"]})
    except Exception as exc:
        message = str(exc)
        if "poppler" in message.lower():
            message = "Error: Unable to process PDF. Please ensure Poppler is installed and in PATH (e.g., 'apt-get install poppler-utils' on Ubuntu)."
        return [], message
    return per_page, None
| # Map Extracted Data to Salesforce Fields | |
def _normalize_label(text):
    """Lower-case *text* and drop every non-alphanumeric character."""
    return "".join(ch for ch in text.lower() if ch.isalnum())


def map_fields(extracted_data, salesforce_fields):
    """Match extracted keys to Salesforce field names.

    Only the keys of the first page (``extracted_data[0]["keys"]``) are
    considered. A key maps to a field when the lower-cased key is a substring
    of the lower-cased field name; if several fields qualify, the last one
    wins. When no field matches literally, a fallback comparison ignores
    spaces and punctuation, so "Contract Number" can map to
    ``ContractNumber`` or ``Contract_Number__c``.

    Args:
        extracted_data: List of per-page dicts carrying a ``"keys"`` list.
        salesforce_fields: Iterable of Salesforce field names.

    Returns:
        ``(mappings, confidence_scores, error)``. ``error`` is ``None`` on
        success and a message when there is no extracted data to map, in
        which case both dicts are empty. Every mapped key gets the
        placeholder confidence ``0.9``.
    """
    mappings = {}
    confidence_scores = {}

    # An upstream extraction failure yields an empty list; report it rather
    # than raising IndexError on extracted_data[0].
    if not extracted_data:
        return mappings, confidence_scores, "No extracted data available to map"

    # Precompute both comparison forms once per field instead of once per key.
    prepared_fields = [
        (field, field.lower(), _normalize_label(field))
        for field in (salesforce_fields or [])
    ]

    for key in extracted_data[0].get("keys", []):  # Simplified: using first page
        literal_key = key.lower()
        normalized_key = _normalize_label(key)
        literal_match = None
        loose_match = None
        for field, field_lower, field_normalized in prepared_fields:
            if literal_key in field_lower:
                literal_match = field
            elif normalized_key and normalized_key in field_normalized:
                # Guard on normalized_key: an empty string would match everything.
                loose_match = field
        # Literal matches take precedence so previously working mappings
        # are unchanged.
        chosen = literal_match if literal_match is not None else loose_match
        if chosen is not None:
            mappings[key] = chosen
            confidence_scores[key] = 0.9  # Dummy confidence score
    return mappings, confidence_scores, None
| # Create Salesforce Record | |
def create_record(sf, object_api_name, data):
    """Create one record of the given object type.

    The object handle comes from ``sf.__getattr__(object_api_name)`` and the
    record is created with its ``create(data)`` method.

    Returns:
        ``(record_id, None)`` using the ``'id'`` entry of the create result,
        or ``(None, message)`` if anything raises.
    """
    try:
        sobject = sf.__getattr__(object_api_name)
        outcome = sobject.create(data)
        new_id = outcome['id']
    except Exception as exc:
        return None, str(exc)
    return new_id, None
| # Attach PDF to Salesforce Record | |
def attach_pdf(sf, record_id, file_path):
    """Upload a file as an ``Attachment`` on an existing record.

    The file is read in binary mode, base64-encoded and sent through
    ``sf.Attachment.create`` with ``ParentId`` set to *record_id* and ``Name``
    set to the file's base name.

    Returns:
        ``("PDF Attached", None)`` on success, or ``(None, message)`` if
        reading the file or creating the attachment raises.
    """
    try:
        with open(file_path, "rb") as pdf_file:
            raw_bytes = pdf_file.read()
        payload = {
            "ParentId": record_id,
            "Name": os.path.basename(file_path),
            "Body": base64.b64encode(raw_bytes).decode(),
        }
        sf.Attachment.create(payload)
    except Exception as exc:
        return None, str(exc)
    return "PDF Attached", None
| # Log Failed Migration | |
def log_failure(pdf_path, object_name, error):
    """Append one failed-migration record to ``failures.json``.

    Each call adds a single JSON object on its own line with the keys
    ``pdf``, ``object`` and ``error``.

    Args:
        pdf_path: Path of the PDF that failed.
        object_name: Salesforce object the migration targeted.
        error: Error description; values that are not JSON-serializable
            (for example an exception instance) are stored as ``str(value)``.
    """
    log_entry = {"pdf": pdf_path, "object": object_name, "error": error}
    # Serialize before touching the file so a serialization problem can never
    # leave a truncated line in the log. default=str is deliberate: this is a
    # diagnostic log and a stringified value beats losing the record.
    line = json.dumps(log_entry, default=str) + "\n"
    with open("failures.json", "a", encoding="utf-8") as f:
        f.write(line)