import requests import os from dotenv import load_dotenv # Load environment variables from .env file load_dotenv() def get_token(): """ Retrieve Salesforce OAuth2 access token using password flow. Returns: tuple: (access_token, instance_url) or raises Exception on failure. """ try: is_sandbox = os.getenv("SF_IS_SANDBOX", "false").lower() == "true" domain = "test" if is_sandbox else "login" oauth_url = f"https://{domain}.salesforce.com/services/oauth2/token" print(f"Attempting authentication with URL: {oauth_url}") client_id = os.getenv("SF_CLIENT_ID") client_secret = os.getenv("SF_CLIENT_SECRET") username = os.getenv("SF_USERNAME") password = os.getenv("SF_PASSWORD") security_token = os.getenv("SF_SECURITY_TOKEN", "") if not all([client_id, client_secret, username, password]): raise Exception("Missing required environment variables: SF_CLIENT_ID, SF_CLIENT_SECRET, SF_USERNAME, or SF_PASSWORD") # Append security token to password if provided full_password = f"{password}{security_token}" if security_token else password payload = { "grant_type": "password", "client_id": client_id, "client_secret": client_secret, "username": username, "password": full_password } print(f"Payload: {payload}") response = requests.post(oauth_url, data=payload, timeout=10) response.raise_for_status() data = response.json() print(f"Authentication successful. Response: {data}") return data["access_token"], data["instance_url"] except requests.exceptions.HTTPError as e: print(f"HTTP Error: {str(e)} - Response: {e.response.text if e.response is not None else 'No response'}") raise Exception(f"Authentication failed: {str(e)}") except KeyError as e: print(f"Unexpected response format: Missing key {str(e)} - Response: {data if 'data' in locals() else 'No data'}") raise Exception(f"Unexpected response format: Missing key {str(e)}") except Exception as e: print(f"General error during authentication: {str(e)}") raise Exception(f"Authentication failed: {str(e)}") def get_salesforce_objects(token, instance_url): """ Fetch a list of createable Salesforce objects. Args: token (str): Salesforce access token. instance_url (str): Salesforce instance URL. Returns: list: List of dictionaries containing object details. """ try: url = f"{instance_url}/services/data/v58.0/sobjects/" headers = {"Authorization": f"Bearer {token}"} response = requests.get(url, headers=headers) response.raise_for_status() data = response.json() return [obj for obj in data["sobjects"] if obj.get("createable")] except requests.exceptions.RequestException as e: raise Exception(f"Failed to fetch objects: {str(e)}") def get_salesforce_object_fields(token, instance_url, object_name): """ Fetch createable fields for a specific Salesforce object. Args: token (str): Salesforce access token. instance_url (str): Salesforce instance URL. object_name (str): Salesforce object API name. Returns: list: List of dictionaries containing field details. """ try: url = f"{instance_url}/services/data/v58.0/sobjects/{object_name}/describe" headers = {"Authorization": f"Bearer {token}"} response = requests.get(url, headers=headers) response.raise_for_status() data = response.json() return [field for field in data["fields"] if field.get("createable")] except requests.exceptions.RequestException as e: raise Exception(f"Failed to fetch fields for {object_name}: {str(e)}") def create_or_update_record(object_name, mappings, token, instance_url, record_id=None): """ Create a new record or update an existing one in Salesforce. Args: object_name (str): Salesforce object API name (e.g., 'Account'). mappings (dict): Field-value mappings for the record. token (str): Salesforce access token. instance_url (str): Salesforce instance URL. record_id (str, optional): ID of the record to update; if None, creates a new record. Returns: dict: API response containing the record ID or error details. """ try: url = f"{instance_url}/services/data/v58.0/sobjects/{object_name}/" headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"} if record_id: url += f"{record_id}" response = requests.patch(url, headers=headers, json=mappings) else: response = requests.post(url, headers=headers, json=mappings) response.raise_for_status() response_data = response.json() if response.status_code in [200, 201]: response_data['isUpdate'] = bool(record_id) return response_data except requests.exceptions.RequestException as e: return {"error": f"API request failed: {str(e)}", "status_code": response.status_code} if 'response' in locals() else {"error": str(e)} def attach_pdf(record_id, pdf_path, token, instance_url): """ Attach a PDF to a Salesforce record as a File. Args: record_id (str): Salesforce record ID. pdf_path (str): Path to the PDF file. token (str): Salesforce access token. instance_url (str): Salesforce instance URL. Returns: dict: API response or error details. """ try: url = f"{instance_url}/services/data/v58.0/sobjects/ContentVersion/" headers = {"Authorization": f"Bearer {token}"} with open(pdf_path, 'rb') as pdf_file: files = {'entity_content': pdf_file} data = { "Title": os.path.basename(pdf_path), "PathOnClient": os.path.basename(pdf_path), "VersionData": pdf_file.read(), "FirstPublishLocationId": record_id } response = requests.post(url, headers=headers, files=files, data=data) response.raise_for_status() return response.json() except requests.exceptions.RequestException as e: return {"error": f"Failed to attach PDF: {str(e)}"}