Spaces:
Sleeping
Sleeping
| import requests | |
| import base64 | |
| import os | |
| from dotenv import load_dotenv | |
# Load environment variables from .env file
load_dotenv()

# Retrieve credentials from environment variables
USERNAME = os.getenv("SF_USERNAME")
PASSWORD = os.getenv("SF_PASSWORD")
SECURITY_TOKEN = os.getenv("SF_SECURITY_TOKEN")
# Login host prefix: "test" targets sandboxes, "login" targets production.
DOMAIN = os.getenv("SF_DOMAIN", "test")

# Validate environment variables: fail at import time rather than on first login.
if not all([USERNAME, PASSWORD, SECURITY_TOKEN]):
    raise ValueError("❌ Missing required environment variables. Ensure SF_USERNAME, SF_PASSWORD, and SF_SECURITY_TOKEN are set.")

# Base URL of the Salesforce login host, derived from DOMAIN so that SF_DOMAIN
# actually takes effect. The previous value was a hard-coded Lightning Setup
# page, which is a browser UI address and not an API/login endpoint.
LOGIN_URL = f"https://{DOMAIN}.salesforce.com"
def get_token(timeout=30):
    """Authenticate against Salesforce using the OAuth 2.0 password grant.

    The token endpoint host is built from the module-level ``DOMAIN``
    ("test" for sandboxes, "login" for production). The security token is
    appended to the password, and connected-app credentials are sent when
    ``SF_CLIENT_ID`` / ``SF_CLIENT_SECRET`` are present in the environment.

    Args:
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        A ``(access_token, instance_url)`` tuple taken from the response.

    Raises:
        Exception: On a network error, a non-JSON response, or any response
            that is not HTTP 200 with an ``access_token`` field.
    """
    # Build the endpoint from DOMAIN instead of a hard-coded Lightning UI page.
    url = f"https://{DOMAIN}.salesforce.com/services/oauth2/token"
    payload = {
        'grant_type': 'password',
        'username': USERNAME,
        'password': PASSWORD + SECURITY_TOKEN,
    }

    # The username-password flow is documented as requiring connected-app
    # credentials; include them only when configured so existing setups
    # send exactly the same fields as before.
    client_id = os.getenv("SF_CLIENT_ID")
    client_secret = os.getenv("SF_CLIENT_SECRET")
    if client_id:
        payload['client_id'] = client_id
    if client_secret:
        payload['client_secret'] = client_secret

    try:
        response = requests.post(url, data=payload, timeout=timeout)
    except requests.RequestException as e:
        raise Exception(f"🚨 Network error during Salesforce login: {str(e)}") from e

    # Only the status is printed: a successful body contains the access token
    # and must not end up in stdout or logs.
    print(f"Response Status: {response.status_code}")

    try:
        result = response.json()
    except ValueError as e:
        # e.g. an HTML page returned by a wrong host or a proxy
        raise Exception(
            f"❌ Salesforce login failed: non-JSON response (HTTP {response.status_code})"
        ) from e

    if response.status_code == 200 and 'access_token' in result:
        return result['access_token'], result['instance_url']

    print(f"Login failed with response: {result}")
    raise Exception(f"❌ Salesforce login failed: {result}")
def get_salesforce_objects(token, instance_url, timeout=30):
    """Fetch the list of sObject descriptions from the REST API (v56.0).

    Args:
        token: OAuth access token sent as a Bearer credential.
        instance_url: Base URL of the Salesforce instance.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        The value of the ``sobjects`` key in the response, or an empty list
        when the key is absent.

    Raises:
        Exception: When the response body is not a JSON object (Salesforce
            reports API errors as a JSON list of error entries).
    """
    headers = {'Authorization': f'Bearer {token}'}
    response = requests.get(
        f"{instance_url}/services/data/v56.0/sobjects",
        headers=headers,
        timeout=timeout,
    )
    payload = response.json()
    # An error list has no .get(); surface the server's message instead of an
    # opaque AttributeError.
    if not isinstance(payload, dict):
        raise Exception(
            f"❌ Failed to list Salesforce objects (HTTP {response.status_code}): {payload}"
        )
    return payload.get('sobjects', [])
def create_record(object_name, data, token, instance_url, timeout=30):
    """Create a record of the given sObject type via the REST API (v56.0).

    Args:
        object_name: API name of the sObject, e.g. ``"Account"``.
        data: Dict of field values, sent as the JSON request body.
        token: OAuth access token sent as a Bearer credential.
        instance_url: Base URL of the Salesforce instance.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        The parsed JSON response body, returned as-is whatever the HTTP
        status. Callers should check its shape before reading fields such
        as ``id``, since an error response may not be a dict.
    """
    headers = {
        'Authorization': f'Bearer {token}',
        'Content-Type': 'application/json',
    }
    url = f"{instance_url}/services/data/v56.0/sobjects/{object_name}/"
    # Without a timeout, requests can block indefinitely on a stalled connection.
    response = requests.post(url, json=data, headers=headers, timeout=timeout)
    return response.json()
def attach_pdf(record_id, file_path, token, instance_url, timeout=30):
    """Upload a local file as an Attachment on an existing record.

    The file is read in binary mode, base64-encoded, and posted as the
    ``Body`` of a new ``Attachment`` whose ``ParentId`` is ``record_id`` and
    whose ``Name`` is the file's base name.

    Args:
        record_id: Id of the record the attachment belongs to.
        file_path: Path of the file to upload.
        token: OAuth access token sent as a Bearer credential.
        instance_url: Base URL of the Salesforce instance.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        The parsed JSON response body, returned as-is whatever the HTTP status.

    Raises:
        Exception: When the file does not exist, or when reading, encoding,
            or the HTTP request fails. The original error is attached as
            ``__cause__``.
    """
    try:
        with open(file_path, "rb") as f:
            body = base64.b64encode(f.read()).decode()
        data = {
            "ParentId": record_id,
            "Name": os.path.basename(file_path),
            "Body": body,
        }
        headers = {
            'Authorization': f'Bearer {token}',
            'Content-Type': 'application/json',
        }
        url = f"{instance_url}/services/data/v56.0/sobjects/Attachment"
        # Without a timeout, requests can block indefinitely on a stalled connection.
        response = requests.post(url, json=data, headers=headers, timeout=timeout)
        return response.json()
    except FileNotFoundError as e:
        raise Exception(f"❌ File not found: {file_path}") from e
    except Exception as e:
        # Chain explicitly so the underlying failure stays visible in tracebacks.
        raise Exception(f"🚨 Error attaching PDF: {str(e)}") from e
# Example usage: authenticate, list objects, create an Account, attach a PDF.
if __name__ == "__main__":
    try:
        # Get access token and instance URL
        token, instance_url = get_token()
        print("✅ Successfully authenticated.")

        # Example: List Salesforce objects
        objects = get_salesforce_objects(token, instance_url)
        print(f"Found {len(objects)} Salesforce objects")

        # Example: Create a record
        sample_data = {"Name": "Test Account"}
        result = create_record("Account", sample_data, token, instance_url)
        print(f"Created record: {result}")

        # create_record returns the raw response body, which is not
        # necessarily a dict on failure, so guard before reading "id".
        record_id = result.get("id") if isinstance(result, dict) else None
        if record_id:
            # Example: Attach a PDF
            attachment = attach_pdf(record_id, "sample.pdf", token, instance_url)
            # Report success only when the response carries the new attachment id.
            if isinstance(attachment, dict) and attachment.get("id"):
                print("✅ PDF attached successfully")
            else:
                print(f"PDF attachment failed: {attachment}")
        else:
            print("Record creation did not return an id; skipping PDF attachment")
    except Exception as e:
        # Top-level boundary of the example script: report and exit normally.
        print(f"Error: {str(e)}")