import requests import base64 import os from dotenv import load_dotenv # Load environment variables from .env file load_dotenv() # Retrieve credentials from environment variables USERNAME = os.getenv("SF_USERNAME") PASSWORD = os.getenv("SF_PASSWORD") SECURITY_TOKEN = os.getenv("SF_SECURITY_TOKEN") DOMAIN = os.getenv("SF_DOMAIN", "test") # Default to sandbox; use "login" for production # Validate environment variables if not all([USERNAME, PASSWORD, SECURITY_TOKEN]): raise ValueError("❌ Missing required environment variables. Ensure SF_USERNAME, SF_PASSWORD, and SF_SECURITY_TOKEN are set.") LOGIN_URL = f"https://sathkruthatechsolutionspr45-dev-ed.develop.lightning.force.com/lightning/setup/SetupOneHome/home" def get_token(): url = f"https://sathkruthatechsolutionspr45-dev-ed.develop.lightning.force.com/lightning/setup/SetupOneHome/home/services/oauth2/token" payload = { 'grant_type': 'password', 'username': USERNAME, 'password': PASSWORD + SECURITY_TOKEN } try: response = requests.post(url, data=payload) print(f"Response Status: {response.status_code}") print(f"Response Body: {response.text}") result = response.json() if response.status_code == 200 and 'access_token' in result: return result['access_token'], result['instance_url'] else: print(f"Login failed with response: {result}") raise Exception(f"❌ Salesforce login failed: {result}") except requests.RequestException as e: raise Exception(f"🚨 Network error during Salesforce login: {str(e)}") def get_salesforce_objects(token, instance_url): headers = {'Authorization': f'Bearer {token}'} response = requests.get(f"{instance_url}/services/data/v56.0/sobjects", headers=headers) return response.json().get('sobjects', []) def create_record(object_name, data, token, instance_url): headers = { 'Authorization': f'Bearer {token}', 'Content-Type': 'application/json' } url = f"{instance_url}/services/data/v56.0/sobjects/{object_name}/" response = requests.post(url, json=data, headers=headers) return response.json() def attach_pdf(record_id, file_path, token, instance_url): try: with open(file_path, "rb") as f: body = base64.b64encode(f.read()).decode() data = { "ParentId": record_id, "Name": os.path.basename(file_path), "Body": body } headers = { 'Authorization': f'Bearer {token}', 'Content-Type': 'application/json' } url = f"{instance_url}/services/data/v56.0/sobjects/Attachment" response = requests.post(url, json=data, headers=headers) return response.json() except FileNotFoundError: raise Exception(f"❌ File not found: {file_path}") except Exception as e: raise Exception(f"🚨 Error attaching PDF: {str(e)}") # Example usage if __name__ == "__main__": try: # Get access token and instance URL token, instance_url = get_token() print(f"✅ Successfully authenticated.") # Example: List Salesforce objects objects = get_salesforce_objects(token, instance_url) print(f"Found {len(objects)} Salesforce objects") # Example: Create a record sample_data = {"Name": "Test Account"} result = create_record("Account", sample_data, token, instance_url) print(f"Created record: {result}") # Example: Attach a PDF record_id = result.get("id") if record_id: attach_pdf(record_id, "sample.pdf", token, instance_url) print("✅ PDF attached successfully") except Exception as e: print(f"Error: {str(e)}")