Spaces:
Sleeping
Sleeping
import json
import os

import requests
# Salesforce credentials.
# Secrets are read from environment variables so they are never stored in
# source control. A missing variable yields an empty string, in which case
# Salesforce rejects the login and authenticate_salesforce() raises.
# NOTE(review): any credential values that were previously committed in this
# file should be treated as exposed and rotated in Salesforce.
CONSUMER_KEY = os.environ.get("SALESFORCE_CONSUMER_KEY", "")
CONSUMER_SECRET = os.environ.get("SALESFORCE_CONSUMER_SECRET", "")
USERNAME = os.environ.get("SALESFORCE_USERNAME", "")
PASSWORD = os.environ.get("SALESFORCE_PASSWORD", "")
SECURITY_TOKEN = os.environ.get("SALESFORCE_SECURITY_TOKEN", "")

# Salesforce authentication.
AUTH_URL = "https://login.salesforce.com/services/oauth2/token"
# Both are populated by authenticate_salesforce() after a successful login.
BASE_URL = None
HEADERS = None
def authenticate_salesforce():
    """Authenticate against Salesforce using the OAuth 2.0 password grant.

    Posts the module-level credentials to ``AUTH_URL``. On an HTTP 200
    response, stores the returned ``instance_url`` in the global ``BASE_URL``
    and a bearer-token header dict in the global ``HEADERS`` so later REST
    calls can use them.

    Raises:
        RuntimeError: If Salesforce answers with a non-200 status. This is a
            subclass of ``Exception``, so existing ``except Exception``
            handlers keep working.
        requests.RequestException: If the HTTP request fails or times out.
    """
    global BASE_URL, HEADERS

    data = {
        "grant_type": "password",
        "client_id": CONSUMER_KEY,
        "client_secret": CONSUMER_SECRET,
        "username": USERNAME,
        # The password grant expects the security token appended to the password.
        "password": PASSWORD + SECURITY_TOKEN,
    }

    # Without an explicit timeout, requests waits indefinitely on a stalled
    # connection, which would hang every login/sign-up call.
    response = requests.post(AUTH_URL, data=data, timeout=30)
    if response.status_code != 200:
        raise RuntimeError(f"Failed to authenticate with Salesforce: {response.text}")

    auth_data = response.json()
    BASE_URL = auth_data["instance_url"]
    HEADERS = {
        "Authorization": f"Bearer {auth_data['access_token']}",
        "Content-Type": "application/json",
    }
def check_credentials(email, password):
    """Check user credentials during login.

    Authenticates with Salesforce, looks up the ``User_Login__c`` record whose
    ``Email__c`` matches ``email`` and compares its ``Password__c`` field with
    ``password``.

    Args:
        email: Email address entered by the user.
        password: Password entered by the user.

    Returns:
        True if a record was found and its stored password equals
        ``password``; False if there is no match, the query did not return
        HTTP 200, or any error occurred (the error is printed).
    """
    # NOTE(review): this compares against a password stored in plain text in
    # Password__c; consider storing a salted hash instead.
    try:
        authenticate_salesforce()

        # Escape backslashes and single quotes so user input cannot break out
        # of the quoted SOQL string literal (SOQL injection).
        safe_email = email.replace("\\", "\\\\").replace("'", "\\'")
        query = f"SELECT Password__c FROM User_Login__c WHERE Email__c = '{safe_email}'"

        # Pass the query through `params` so requests URL-encodes it; pasting
        # it into the URL corrupts emails containing '+', '&' or '#'.
        response = requests.get(
            f"{BASE_URL}/services/data/v56.0/query",
            headers=HEADERS,
            params={"q": query},
            timeout=30,
        )
        if response.status_code != 200:
            return False

        records = response.json().get("records", [])
        return bool(records) and records[0]["Password__c"] == password
    except Exception as e:
        # Best-effort boundary: any failure is reported and treated as a
        # failed login rather than propagated to the caller.
        print(f"Error checking credentials: {e}")
        return False
def save_user(name, phone, email, password):
    """Save user details to Salesforce.

    Authenticates with Salesforce, verifies that no ``User_Login__c`` record
    already uses ``email``, and then creates a new record.

    Args:
        name: Value stored in ``Name__c``.
        phone: Value stored in ``Phone__c``.
        email: Value stored in ``Email__c``; must not already exist.
        password: Value stored in ``Password__c``.

    Returns:
        True if the record was created (HTTP 201). False if the email already
        exists, the duplicate check or the create request failed, or any
        error occurred (errors are printed).
    """
    # NOTE(review): the password is written to Password__c in plain text;
    # consider storing a salted hash instead.
    try:
        authenticate_salesforce()

        # Check if email already exists. Backslashes and single quotes are
        # escaped so user input cannot break out of the SOQL string literal.
        safe_email = email.replace("\\", "\\\\").replace("'", "\\'")
        query = f"SELECT Id FROM User_Login__c WHERE Email__c = '{safe_email}'"

        # `params` lets requests URL-encode the query; pasting it into the URL
        # corrupts emails containing '+', '&' or '#'.
        lookup = requests.get(
            f"{BASE_URL}/services/data/v56.0/query",
            headers=HEADERS,
            params={"q": query},
            timeout=30,
        )
        if lookup.status_code != 200:
            # If uniqueness cannot be verified, do not create the record:
            # proceeding blindly could produce duplicate accounts.
            print(f"Error checking for existing user: {lookup.text}")
            return False
        if lookup.json().get("totalSize", 0) > 0:
            return False  # Email already exists

        # Create new user
        user_data = {
            "Name__c": name,
            "Phone__c": phone,
            "Email__c": email,
            "Password__c": password,
        }
        response = requests.post(
            f"{BASE_URL}/services/data/v56.0/sobjects/User_Login__c/",
            headers=HEADERS,
            json=user_data,
            timeout=30,
        )
        if response.status_code == 201:
            return True

        print(f"Error saving user: {response.text}")
        return False
    except Exception as e:
        # Best-effort boundary: any failure is reported and surfaced to the
        # caller as an unsuccessful save.
        print(f"Error saving user: {e}")
        return False