# pavansuresh's picture
# Update utils.py
# 0a810c4 verified
import os
import fitz # PyMuPDF
import json
import re
import difflib
import base64 # ✅ added for base64 encoding
from simple_salesforce import Salesforce
from dotenv import load_dotenv
from datetime import datetime # ✅ added for date conversion
# Load .env variables into the process environment at import time, before any
# function in this module runs; get_salesforce_client() reads its credentials
# with os.getenv().
load_dotenv()
def get_salesforce_client():
    """Create a Salesforce client from credentials in the environment.

    Reads ``SF_USERNAME``, ``SF_PASSWORD``, ``SF_SECURITY_TOKEN`` and
    ``SF_DOMAIN`` with ``os.getenv``. ``SF_DOMAIN`` may be a bare domain
    prefix (for example ``login`` or ``test``) or a URL such as
    ``https://test.salesforce.com/``; the ``https://`` scheme, the
    ``.salesforce.com`` suffix, surrounding whitespace and a trailing ``/``
    are removed before the value is handed to ``Salesforce``.

    When ``SF_DOMAIN`` is unset or blank, ``"login"`` is used instead of
    failing with an ``AttributeError`` on ``None``.

    Returns:
        ``(client, None)`` on success, or ``(None, error_message)`` when
        anything raised while building the client.
    """
    try:
        # os.getenv returns None for an unset variable; normalise to "" so the
        # string clean-up below is always safe.
        raw_domain = (os.getenv("SF_DOMAIN") or "").strip()
        domain = (
            raw_domain.replace("https://", "")
            .replace(".salesforce.com", "")
            .rstrip("/")
        )
        sf = Salesforce(
            username=os.getenv("SF_USERNAME"),
            password=os.getenv("SF_PASSWORD"),
            security_token=os.getenv("SF_SECURITY_TOKEN"),
            # "login" is simple_salesforce's documented default domain.
            domain=domain or "login",
        )
        return sf, None
    except Exception as e:
        return None, str(e)
def get_salesforce_objects(sf):
    """List the names of the sObjects reported by ``sf.describe()``.

    Entries whose ``deprecatedAndHidden`` flag is truthy are left out, and
    the remaining names are returned in ascending sorted order.

    Args:
        sf: Client object exposing ``describe()``, whose result contains an
            ``'sobjects'`` list of dicts with ``'name'`` and
            ``'deprecatedAndHidden'`` keys.

    Returns:
        ``(names, None)`` on success, or ``([], error_message)`` when
        anything raised.
    """
    try:
        global_description = sf.describe()
        visible_names = sorted(
            entry['name']
            for entry in global_description['sobjects']
            if not entry['deprecatedAndHidden']  # Skip hidden/deprecated
        )
    except Exception as exc:
        return [], str(exc)
    return visible_names, None
def get_object_fields(sf, object_name):
    """Return the field names of one Salesforce object.

    Args:
        sf: Client whose ``__getattr__(object_name)`` yields an object with a
            ``describe()`` method.
        object_name: Name of the object to describe.

    Returns:
        ``(field_names, None)`` on success, with names in the order they
        appear under ``'fields'`` in the describe result, or
        ``([], error_message)`` when anything raised.
    """
    try:
        sobject = sf.__getattr__(object_name)
        description = sobject.describe()
        field_names = []
        for field_info in description['fields']:
            field_names.append(field_info['name'])
    except Exception as exc:
        return [], str(exc)
    return field_names, None
def extract_text_from_pdf(pdf_path):
    """Extract the text of every page of a PDF as one string.

    The document is opened with PyMuPDF inside a ``with`` block so it is
    closed even when a page fails to extract.

    Args:
        pdf_path: Path of the PDF file to read.

    Returns:
        ``(text, None)`` on success, where ``text`` is the page texts
        concatenated in page order with leading/trailing whitespace stripped,
        or ``(None, error_message)`` when anything raised.
    """
    try:
        with fitz.open(pdf_path) as doc:
            # join() avoids repeatedly copying the growing string.
            text = "".join(page.get_text() for page in doc)
        return text.strip(), None
    except Exception as e:
        return None, str(e)
# ✅ Helper: Convert date to YYYY-MM-DD
def convert_to_salesforce_date(date_str):
    """Convert a US-style ``M/D/YYYY`` or ``M/D/YY`` date to ``YYYY-MM-DD``.

    Surrounding whitespace is ignored when parsing. The result is produced
    with ``date.isoformat()``, so the year is always written with four
    digits.

    Args:
        date_str: The date text to convert.

    Returns:
        The ISO formatted date when ``date_str`` matches one of the supported
        formats; otherwise ``date_str`` itself, unchanged. Non-string input
        (for example ``None``) is also returned unchanged instead of raising.
    """
    if not isinstance(date_str, str):
        # strptime raises TypeError (not ValueError) for non-strings.
        return date_str

    candidate = date_str.strip()
    # Four-digit years are tried first; "%Y" rejects two-digit years, which
    # then fall through to "%y".
    for fmt in ("%m/%d/%Y", "%m/%d/%y"):
        try:
            parsed = datetime.strptime(candidate, fmt)
        except ValueError:
            continue
        return parsed.date().isoformat()
    return date_str  # Return original if no match
# ✅ Extract key-value pairs smartly from the PDF text
# Pattern 1: a line of the form "Key: Value" or "Key - Value". The key starts
# with an uppercase letter followed by 3-50 letters, digits, spaces or any of
# ( ) / _ -.
_KEY_VALUE_LINE_RE = re.compile(
    r"^([A-Z][A-Za-z0-9 ()/_\-]{3,50})\s*[:\-]\s*(.+)$"
)
# First "$" amount that follows the phrase "Total Agreement Value".
_TOTAL_AGREEMENT_VALUE_RE = re.compile(
    r"Total Agreement Value[^\$]*\$\s?([\d,]+(?:\.\d{2})?)", re.IGNORECASE
)
# "Agreement MSA <name>": the captured name starts at "MSA".
_AGREEMENT_NAME_RE = re.compile(
    r"Agreement\s+(MSA\s+[A-Za-z0-9 _\-]+)", re.IGNORECASE
)
# "effective as of <date> ... until <date>"; DOTALL lets the gap span lines.
_AGREEMENT_DATES_RE = re.compile(
    r"effective as of\s*([0-9]{1,2}/[0-9]{1,2}/[0-9]{2,4}).*?until\s*[<\(]?([0-9]{1,2}/[0-9]{1,2}/[0-9]{2,4})",
    re.IGNORECASE | re.DOTALL,
)


def _parse_key_value_text(text):
    """Return parallel ``(keys, values)`` lists parsed from PDF text."""
    keys = []
    values = []

    # Pattern 1: Direct "Key: Value"
    for line in text.splitlines():
        match = _KEY_VALUE_LINE_RE.match(line.strip())
        if not match:
            continue
        value = match.group(2).strip()
        if len(value) > 1:  # ignore empty / single-character values
            keys.append(match.group(1).strip())
            values.append(value)

    # Pattern 2: Special extractions
    # Total Agreement Value (thousands separators removed)
    match_val = _TOTAL_AGREEMENT_VALUE_RE.search(text)
    if match_val:
        keys.append("Total Agreement Value")
        values.append(match_val.group(1).replace(",", ""))

    # Agreement Name
    match_name = _AGREEMENT_NAME_RE.search(text)
    if match_name:
        keys.append("Agreement Name")
        values.append(match_name.group(1).strip())

    # Agreement Start and End Dates
    match_dates = _AGREEMENT_DATES_RE.search(text)
    if match_dates:
        keys.append("Agreement Start Date")
        values.append(convert_to_salesforce_date(match_dates.group(1).strip()))
        keys.append("Agreement End Date")
        values.append(convert_to_salesforce_date(match_dates.group(2).strip()))

    return keys, values


def extract_key_value_pairs(pdf_path):
    """Extract key/value pairs from the text of a PDF.

    The document is opened with PyMuPDF inside a ``with`` block so it is
    closed even when extraction fails. Pairs are collected in this order:

    1. every line shaped like ``Key: Value`` or ``Key - Value`` whose value
       is longer than one character;
    2. ``Total Agreement Value`` - the first ``$`` amount after that phrase,
       with commas removed;
    3. ``Agreement Name`` - the ``MSA ...`` text following ``Agreement``;
    4. ``Agreement Start Date`` / ``Agreement End Date`` - taken from
       ``effective as of <date> ... until <date>`` and passed through
       ``convert_to_salesforce_date``.

    Args:
        pdf_path: Path of the PDF file to read.

    Returns:
        ``([{"keys": [...], "values": [...]}], None)`` on success, where the
        two lists are index-aligned, or ``(None, error_message)`` when
        anything raised.
    """
    try:
        with fitz.open(pdf_path) as doc:
            text = "".join(page.get_text() for page in doc)
        keys, values = _parse_key_value_text(text)
        return [{"keys": keys, "values": values}], None
    except Exception as e:
        return None, str(e)
# ✅ FIXED: Use original key as mapping key (not value)
def map_fields(extracted_data, object_fields):
    """Map each extracted key to the most similar object field name.

    Similarity is ``difflib.SequenceMatcher(None, key.lower(),
    field.lower()).ratio()``. The same case-insensitive score is used both to
    pick the field and as the reported confidence, so the confidence always
    describes the match that was actually chosen. When several fields tie,
    the one that appears first in ``object_fields`` wins.

    Keys equal to ``"name"`` or ``"email"`` (ignoring case) are skipped.

    Args:
        extracted_data: A sequence whose first item is a dict with
            index-aligned ``"keys"`` and ``"values"`` lists.
        object_fields: Candidate field names to map onto.

    Returns:
        ``(mappings, confidence_scores, None)`` on success, where both dicts
        are keyed by the original extracted key and confidences are rounded
        to two decimals; or ``(None, None, error_message)`` when anything
        raised, including when a key needs mapping but ``object_fields`` is
        empty.
    """
    try:
        keys = extracted_data[0]["keys"]
        values = extracted_data[0]["values"]

        # Lower-case every candidate once instead of once per key.
        candidates = [(field, field.lower()) for field in (object_fields or [])]

        mappings = {}
        confidence_scores = {}
        # zip() keeps the original pairing rule: keys without a value are
        # ignored.
        for key, _value in zip(keys, values):
            key_lower = key.lower()
            if key_lower in ("name", "email"):
                continue  # Skip these
            if not candidates:
                raise ValueError("No object fields available to map against")

            best_field = None
            best_score = -1.0
            for field, field_lower in candidates:
                score = difflib.SequenceMatcher(
                    None, key_lower, field_lower
                ).ratio()
                if score > best_score:  # strict: first best candidate wins
                    best_field = field
                    best_score = score

            mappings[key] = best_field
            confidence_scores[key] = round(best_score, 2)
        return mappings, confidence_scores, None
    except Exception as e:
        return None, None, str(e)
def create_record(sf, object_name, data):
    """Create one record of ``object_name`` from ``data``.

    Args:
        sf: Client whose ``__getattr__(object_name)`` yields an object with a
            ``create(data)`` method returning a dict-like result.
        object_name: Name of the object to create a record for.
        data: Payload passed unchanged to ``create``.

    Returns:
        ``(record_id, None)`` on success, where ``record_id`` is the
        result's ``"id"`` entry or the placeholder ``"Unknown ID"`` when that
        entry is absent; ``(None, error_message)`` when anything raised.
    """
    try:
        sobject = sf.__getattr__(object_name)
        response = sobject.create(data)
        record_id = response.get("id", "Unknown ID")
    except Exception as exc:
        return None, str(exc)
    return record_id, None
# ✅ FIXED: Properly encode PDF as base64 string
def attach_pdf(sf, record_id, pdf_path):
    """Upload a PDF as a ContentVersion and link it to a record.

    Steps, in order: read the file and base64-encode it; create a
    ``ContentVersion`` titled with the file's base name; query that version
    for its ``ContentDocumentId``; create a ``ContentDocumentLink`` from the
    document to ``record_id`` with ``ShareType`` ``"V"``.

    Args:
        sf: Client exposing ``ContentVersion.create``, ``query`` and
            ``ContentDocumentLink.create``.
        record_id: Id used as the link's ``LinkedEntityId``.
        pdf_path: Path of the PDF file to upload.

    Returns:
        ``("PDF attached successfully", None)`` on success, or
        ``(None, error_message)`` when anything raised.
    """
    try:
        with open(pdf_path, "rb") as pdf_file:
            encoded_pdf = base64.b64encode(pdf_file.read()).decode("utf-8")

        content_versions = sf.ContentVersion
        file_name = os.path.basename(pdf_path)
        version = content_versions.create({
            "Title": file_name,
            "PathOnClient": file_name,
            "VersionData": encoded_pdf
        })

        version_id = version['id']
        lookup = sf.query(
            f"SELECT ContentDocumentId FROM ContentVersion WHERE Id = '{version_id}'"
        )
        document_id = lookup["records"][0]["ContentDocumentId"]

        sf.ContentDocumentLink.create({
            "ContentDocumentId": document_id,
            "LinkedEntityId": record_id,
            "ShareType": "V"
        })
    except Exception as exc:
        return None, str(exc)
    return "PDF attached successfully", None
def log_failure(pdf_path, object_name, error):
    """Append one failure record to ``failures.json`` as a JSON line.

    The record is ``{"pdf": pdf_path, "object": object_name,
    "error": error}``. It is serialized completely before the file is
    opened, so a value that cannot be serialized raises ``TypeError``
    without leaving a truncated line in the log.

    Args:
        pdf_path: Path of the PDF that failed.
        object_name: Name of the object being processed.
        error: Error description; must be JSON-serializable.

    Raises:
        TypeError: If any value is not JSON-serializable.
        OSError: If ``failures.json`` cannot be opened or written.
    """
    line = json.dumps({"pdf": pdf_path, "object": object_name, "error": error})
    with open("failures.json", "a") as f:
        f.write(line + "\n")