Spaces:
Sleeping
Sleeping
import os

import requests
# Salesforce credentials.
# Secrets are read from the environment so they never live in source control.
# A missing variable yields "", which makes get_salesforce_access_token() fail
# fast with a clear message instead of sending a doomed request.
SALESFORCE_URL = os.environ.get(
    "SALESFORCE_URL", "https://login.salesforce.com/services/oauth2/token"
)
CLIENT_ID = os.environ.get("SALESFORCE_CLIENT_ID", "")  # Consumer Key
CLIENT_SECRET = os.environ.get("SALESFORCE_CLIENT_SECRET", "")  # Consumer Secret
USERNAME = os.environ.get("SALESFORCE_USERNAME", "")
PASSWORD = os.environ.get("SALESFORCE_PASSWORD", "")
SECURITY_TOKEN = os.environ.get("SALESFORCE_SECURITY_TOKEN", "")


# Authenticate with Salesforce
def get_salesforce_access_token():
    """Authenticate with Salesforce and return ``(access_token, instance_url)``.

    Posts a ``grant_type=password`` request to ``SALESFORCE_URL`` using the
    module-level credentials; the security token is appended to the password.

    Returns:
        A ``(access_token, instance_url)`` tuple on success, or
        ``(None, None)`` when a required credential is missing, the HTTP
        request fails, the response is not valid JSON, or it carries no
        access token.
    """
    required = {
        "CLIENT_ID": CLIENT_ID,
        "CLIENT_SECRET": CLIENT_SECRET,
        "USERNAME": USERNAME,
        "PASSWORD": PASSWORD,
    }
    # SECURITY_TOKEN is deliberately not required: it may be legitimately empty.
    missing = [label for label, value in required.items() if not value]
    if missing:
        print(
            "Error obtaining Salesforce access token: "
            f"missing credentials: {', '.join(missing)}"
        )
        return None, None

    payload = {
        "grant_type": "password",
        "client_id": CLIENT_ID,
        "client_secret": CLIENT_SECRET,
        "username": USERNAME,
        "password": PASSWORD + SECURITY_TOKEN,  # Password + Security Token
    }

    try:
        # A timeout keeps a stalled connection from hanging the caller forever.
        response = requests.post(SALESFORCE_URL, data=payload, timeout=30)
        print("Response Status Code:", response.status_code)
        if not response.ok:
            # Only the error body is logged; a success body contains the
            # access token and must not end up in logs.
            print("Response Text:", response.text)
        response.raise_for_status()
        token_data = response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers a non-JSON body on requests versions whose
        # JSON decode error is not a RequestException.
        print(f"Error obtaining Salesforce access token: {e}")
        return None, None

    access_token = token_data.get("access_token")
    instance_url = token_data.get("instance_url")
    if not access_token:
        print("Error obtaining Salesforce access token: no access_token in response")
        return None, None

    print(f"Successfully authenticated. Instance URL: {instance_url}")
    return access_token, instance_url
# Save user to Salesforce
def save_user(name, phone, email, password):
    """Create a ``User_Login__c`` record unless the email is already present.

    Args:
        name: Value stored in ``Name__c``.
        phone: Value stored in ``Phone__c``.
        email: Value stored in ``Email__c``; also used for the duplicate check.
        password: Plain-text password; only its hash is sent to Salesforce.

    Returns:
        True when Salesforce answers the create request with HTTP 201.
        False when authentication fails, the email already exists, the
        duplicate lookup fails, or any request/JSON error occurs.
    """
    access_token, instance_url = get_salesforce_access_token()
    if not access_token:
        print("Failed to authenticate with Salesforce.")
        return False

    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json",
    }

    # Escape backslashes and single quotes so a crafted email cannot break out
    # of the SOQL string literal (SOQL injection).
    safe_email = str(email).replace("\\", "\\\\").replace("'", "\\'")
    query = f"SELECT Id, Email__c FROM User_Login__c WHERE Email__c = '{safe_email}'"

    # Check if email already exists
    try:
        query_response = requests.get(
            f"{instance_url}/services/data/v53.0/query",
            headers=headers,
            params={"q": query},
            timeout=30,
        )
        query_result = query_response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        print(f"Error querying Salesforce: {e}")
        return False

    print("Query Response:", query_result)  # Log the query response
    if query_response.status_code != 200:
        # Without a successful lookup we cannot rule out a duplicate, so do
        # not create the record.
        print("Failed to query Salesforce:", query_result)
        return False
    if query_result.get("records"):
        print(f"Email already exists in Salesforce: {email}")
        return False  # Email already exists

    # Save new user with hashed password.
    # NOTE(review): hashpw/gensalt are not imported in the visible part of
    # this file - presumably `from bcrypt import hashpw, gensalt`; confirm.
    hashed_password = hashpw(password.encode(), gensalt()).decode()
    data = {
        "Name__c": name,
        "Phone__c": phone,
        "Email__c": email,
        "Password__c": hashed_password,
    }

    try:
        response = requests.post(
            f"{instance_url}/services/data/v53.0/sobjects/User_Login__c",
            headers=headers,
            json=data,
            timeout=30,
        )
        save_result = response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        print(f"Error saving user to Salesforce: {e}")
        return False

    print("Save Response:", response.status_code, save_result)  # Log the save response
    if response.status_code == 201:
        print("User created successfully in Salesforce.")
        return True

    print("Failed to create user in Salesforce:", save_result)
    return False
def check_credentials(email, password):
    """Check user credentials during login.

    Looks up the ``User_Login__c`` record whose ``Email__c`` matches *email*
    and compares *password* with the stored ``Password__c`` hash.

    Args:
        email: Email address identifying the user.
        password: Plain-text password supplied at login.

    Returns:
        True when a record is found and the password matches its hash.
        False when authentication with Salesforce fails, the query fails,
        no record is found, the password does not match, or the stored
        hash is missing or malformed.
    """
    access_token, instance_url = get_salesforce_access_token()
    if not access_token:
        print("Failed to authenticate with Salesforce.")
        return False

    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json",
    }

    # Escape backslashes and single quotes so a crafted email cannot break out
    # of the SOQL string literal (SOQL injection).
    safe_email = str(email).replace("\\", "\\\\").replace("'", "\\'")
    query = f"SELECT Password__c FROM User_Login__c WHERE Email__c = '{safe_email}'"

    try:
        query_response = requests.get(
            f"{instance_url}/services/data/v53.0/query",
            headers=headers,
            params={"q": query},
            timeout=30,
        )
        query_result = query_response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        print(f"Error querying Salesforce: {e}")
        return False

    if query_response.status_code != 200:
        print("Failed to query Salesforce:", query_result)
        return False

    # The successful response is intentionally not printed: it contains the
    # stored password hash, which must not end up in logs.
    records = query_result.get("records", [])
    if not records:
        print("Email not found in Salesforce.")
        return False

    stored_password = records[0].get("Password__c")
    if not stored_password:
        # A record without a hash can never match any password.
        print("Incorrect password.")
        return False

    try:
        # NOTE(review): checkpw is not imported in the visible part of this
        # file - presumably `from bcrypt import checkpw`; confirm.
        password_matches = checkpw(password.encode(), stored_password.encode())
    except ValueError:
        # Raised when the stored value is not a well-formed hash; treat it as
        # a failed login instead of crashing.
        password_matches = False

    if password_matches:
        print("Login successful.")
        return True

    print("Incorrect password.")
    return False