"""Flask service that scores subcontractor vendor logs stored in Salesforce.

Flow: the user authorizes via Salesforce OAuth (``/authorize_salesforce`` ->
``/callback``), ``/process_logs`` pulls ``Vendor_Log__c`` records, runs them
through ``model.process_vendor_logs`` and writes one
``Subcontractor_Performance_Score__c`` record per score back to Salesforce.
"""

from flask import Flask, request, jsonify
import os
import json
from urllib.parse import urlencode

import requests  # For making HTTP requests to Salesforce

from model import process_vendor_logs  # Import your scoring logic from model.py

app = Flask(__name__)

# --- Configuration ---
# These should be set as environment variables in your Hugging Face Space for security
SALESFORCE_CONSUMER_KEY = os.environ.get('SALESFORCE_CONSUMER_KEY')
SALESFORCE_CONSUMER_SECRET = os.environ.get('SALESFORCE_CONSUMER_SECRET')
SALESFORCE_CALLBACK_URL = os.environ.get('SALESFORCE_CALLBACK_URL')  # Needs to match your Connected App
SALESFORCE_TOKEN_URL = "https://login.salesforce.com/services/oauth2/token"  # Or your sandbox URL
SALESFORCE_AUTH_URL = "https://login.salesforce.com/services/oauth2/authorize"  # Or your sandbox URL

# Upper bound (seconds) for every outbound Salesforce HTTP call, so a stalled
# connection cannot block a worker indefinitely.
REQUEST_TIMEOUT_SECONDS = 30

# Placeholder for your Salesforce instance URL. This will be obtained dynamically
SALESFORCE_INSTANCE_URL = None

# Hugging Face Space doesn't persist data, so use a very basic in-memory storage for demonstration.
# Replace with a database (e.g., PostgreSQL) for a production environment.
auth_code = None
access_token = None
refresh_token = None


# --- Helper Functions ---

def _request_salesforce_tokens(token_data, action):
    """POST *token_data* to the Salesforce token endpoint.

    Args:
        token_data: Form fields for the OAuth token request.
        action: Verb used in the error log line (e.g. ``"getting"``).

    Returns:
        ``(access_token, refresh_token, instance_url)``; all three are
        ``None`` on any transport, HTTP or decoding error.
    """
    try:
        response = requests.post(
            SALESFORCE_TOKEN_URL,
            data=token_data,
            timeout=REQUEST_TIMEOUT_SECONDS,
        )
        if response.status_code != 200:
            print(f"Error {action} access token: {response.text}")  # Log the error
            return None, None, None
        token_response = response.json()
    except (requests.RequestException, ValueError) as exc:
        # Network failure, timeout, or a body that is not valid JSON.
        print(f"Error {action} access token: {exc}")  # Log the error
        return None, None, None

    return (
        token_response.get('access_token'),
        token_response.get('refresh_token'),
        token_response.get('instance_url'),
    )


def get_salesforce_access_token(code):
    """
    Exchanges an authorization code for an access token and refresh token from Salesforce.

    Args:
        code: The authorization code received from Salesforce.

    Returns:
        A tuple ``(access_token, refresh_token, instance_url)``, or
        ``(None, None, None)`` on error.
    """
    token_data = {
        'grant_type': 'authorization_code',
        'code': code,
        'client_id': SALESFORCE_CONSUMER_KEY,
        'client_secret': SALESFORCE_CONSUMER_SECRET,
        'redirect_uri': SALESFORCE_CALLBACK_URL,
    }
    return _request_salesforce_tokens(token_data, "getting")


def refresh_salesforce_access_token(refresh_token):
    """
    Refreshes the access token using the refresh token.

    Args:
        refresh_token: The refresh token from Salesforce.

    Returns:
        A tuple ``(access_token, refresh_token, instance_url)``, or
        ``(None, None, None)`` on error. The ``refresh_token`` element is
        ``None`` whenever the response does not carry a new one, so callers
        should keep their existing refresh token in that case.
    """
    token_data = {
        'grant_type': 'refresh_token',
        'refresh_token': refresh_token,
        'client_id': SALESFORCE_CONSUMER_KEY,
        'client_secret': SALESFORCE_CONSUMER_SECRET,
        'redirect_uri': SALESFORCE_CALLBACK_URL,  # important for refresh
    }
    return _request_salesforce_tokens(token_data, "refreshing")


def query_salesforce(soql_query, access_token, instance_url):
    """
    Executes a SOQL query against Salesforce.

    The query is passed as a URL parameter so that it is percent-encoded, and
    result pages are followed via ``nextRecordsUrl`` so large result sets are
    returned in full.

    Args:
        soql_query: The SOQL query string.
        access_token: The Salesforce access token.
        instance_url: The Salesforce instance URL.

    Returns:
        A list of records from Salesforce, or None on error.
    """
    headers = {'Authorization': f'Bearer {access_token}'}
    url = f"{instance_url}/services/data/v59.0/query/"  # Use API version v59.0
    params = {'q': soql_query}
    records = []

    try:
        while url:
            response = requests.get(
                url,
                headers=headers,
                params=params,
                timeout=REQUEST_TIMEOUT_SECONDS,
            )
            if response.status_code != 200:
                print(f"Error querying Salesforce: {response.text}")  # Log the error
                return None
            payload = response.json()
            records.extend(payload.get('records') or [])
            # nextRecordsUrl is a server-relative path that already encodes the
            # query cursor, so the 'q' parameter must not be sent again.
            next_path = payload.get('nextRecordsUrl')
            url = f"{instance_url}{next_path}" if next_path else None
            params = None
    except (requests.RequestException, ValueError) as exc:
        print(f"Error querying Salesforce: {exc}")  # Log the error
        return None

    return records


def send_data_to_salesforce(sobject_name, data, access_token, instance_url):
    """
    Sends data to Salesforce to create a new record.

    Args:
        sobject_name: The Salesforce object name (e.g., 'Subcontractor_Performance_Score__c').
        data: A dictionary containing the data to send.
        access_token: The Salesforce access token.
        instance_url: The Salesforce instance URL.

    Returns:
        The ID of the created record, or None on error.
    """
    headers = {
        'Authorization': f'Bearer {access_token}',
        'Content-Type': 'application/json',
    }
    url = f"{instance_url}/services/data/v59.0/sobjects/{sobject_name}/"  # Use API version v59.0

    try:
        response = requests.post(
            url,
            headers=headers,
            json=data,
            timeout=REQUEST_TIMEOUT_SECONDS,
        )
        if 200 <= response.status_code < 300:  # Successful creation (201 Created)
            return response.json().get('id')
        print(f"Error sending data to Salesforce ({sobject_name}): {response.text}")  # Log error
    except (requests.RequestException, ValueError) as exc:
        print(f"Error sending data to Salesforce ({sobject_name}): {exc}")  # Log error
    return None


# --- Flask Routes ---

@app.route('/')
def index():
    """
    Displays a welcome message and instructions.
    """
    return ("Welcome to the Subcontractor Performance Scorer API!\n"
            "1. To authorize with Salesforce, visit /authorize_salesforce.\n"
            "2. After authorization, vendor logs will be processed automatically (in a real implementation).\n"
            "3. Scores will be sent to Salesforce (in a real implementation).\n")


@app.route('/authorize_salesforce')
def authorize_salesforce():
    """
    Returns the Salesforce authorization URL the user must visit to obtain an
    authorization code.

    The URL is returned in a JSON body (``{"authorization_url": ...}``) rather
    than as an HTTP redirect.
    """
    # urlencode percent-escapes the callback URL, which itself contains
    # characters (':', '/', possibly '?' and '&') that are reserved in a query.
    query_string = urlencode({
        'response_type': 'code',
        'client_id': SALESFORCE_CONSUMER_KEY,
        'redirect_uri': SALESFORCE_CALLBACK_URL,
        'scope': 'api',  # Add other scopes as needed (e.g., 'refresh_token')
    })
    authorization_url = f"{SALESFORCE_AUTH_URL}?{query_string}"
    return jsonify({"authorization_url": authorization_url})  # Return as JSON for easier handling


@app.route('/callback')
def callback():
    """
    Handles the Salesforce callback after authorization.
    Exchanges the authorization code for an access token and refresh token.
    """
    global auth_code, access_token, refresh_token, SALESFORCE_INSTANCE_URL

    code = request.args.get('code')
    if not code:
        return jsonify({'error': 'Authorization code not provided.'}), 400

    auth_code = code  # Store the auth code
    access_token, refresh_token, SALESFORCE_INSTANCE_URL = get_salesforce_access_token(code)

    if access_token:
        return jsonify({'message': 'Successfully authorized with Salesforce! Access token and refresh token obtained.'}), 200
    return jsonify({'error': 'Failed to obtain access token.'}), 500


@app.route('/process_logs')  # You might trigger this from a Salesforce outbound message or scheduled job.
def process_logs():
    """
    Retrieves vendor logs from Salesforce, processes them using the scoring model,
    and sends the results to Salesforce.

    Responds 401 when no access token is available, 500 when the instance URL
    is missing, the query fails/returns nothing, any record fails to be
    created, or an unexpected exception occurs, and 200 otherwise.
    """
    global access_token, refresh_token, SALESFORCE_INSTANCE_URL

    if not access_token:
        if auth_code:
            # NOTE(review): authorization codes are normally single-use, so this
            # retry is expected to succeed only if the earlier exchange never
            # reached Salesforce -- confirm whether it is still wanted.
            access_token, refresh_token, SALESFORCE_INSTANCE_URL = get_salesforce_access_token(auth_code)
        else:
            return jsonify({'error': 'Not authorized with Salesforce.  Please visit /authorize_salesforce first.'}), 401

    if not access_token:
        return jsonify({'error': 'Not authorized with Salesforce.  Please visit /authorize_salesforce first.'}), 401

    if not SALESFORCE_INSTANCE_URL:
        return jsonify({'error': 'Salesforce instance URL is missing.'}), 500

    try:
        # 1. Fetch vendor logs from Salesforce (replace with your actual SOQL query)
        soql_query = "SELECT Id, Vendor__c, Work_Completion_Details__c, Delay_Reports__c, Incident_Logs__c, Log_Date__c FROM Vendor_Log__c"
        vendor_logs = query_salesforce(soql_query, access_token, SALESFORCE_INSTANCE_URL)

        if not vendor_logs:
            return jsonify({'error': 'Failed to retrieve vendor logs from Salesforce or no logs found.'}), 500

        # 2. Process the vendor logs using the model
        scores_for_salesforce = process_vendor_logs(vendor_logs)  # Call your model.py function

        if not scores_for_salesforce:
            return jsonify({'warning': 'No scores to send to Salesforce.'}), 200  # No error, but no data sent

        # 3. Send the scores to Salesforce
        all_updates_successful = True
        for score_data in scores_for_salesforce:
            # Map the score_data to the fields in your Subcontractor_Performance_Score__c object.
            # Custom-field API names end in a double-underscore "__c"; verify
            # each name against the object's schema in your org.
            salesforce_data = {
                'Vendor__c': score_data['vendor_id'],  # You might need to look up the Salesforce ID
                'Month__c': score_data['month'],
                'Quality_Score__c': score_data['quality'],
                'Timeliness_Score__c': score_data['timeliness'],
                'Safety_Score__c': score_data['safety'],
                'Communication_Score__c': score_data['communication'],
                'Final_Score__c': score_data['final_score'],
                'Alert_Flag__c': score_data['alert_flag'],
                'Certification_URL__c': score_data['certificate_url'],
                'Trend_Deviation__c': score_data['trend_deviation'],
            }
            record_id = send_data_to_salesforce(
                'Subcontractor_Performance_Score__c',
                salesforce_data,
                access_token,
                SALESFORCE_INSTANCE_URL,
            )
            if not record_id:
                # Keep going so one bad record does not block the rest.
                all_updates_successful = False
                print(f"Failed to send score for vendor {score_data['vendor_id']} to Salesforce.")

        if all_updates_successful:
            return jsonify({'message': 'Successfully processed vendor logs and sent scores to Salesforce.'}), 200
        return jsonify({'error': 'Failed to send some scores to Salesforce. Check logs for details.'}), 500

    except Exception as e:
        # Top-level boundary for this route: report the failure as JSON
        # instead of letting Flask render an HTML error page.
        return jsonify({'error': f'An error occurred: {e}'}), 500


@app.route('/refresh_token')
def refresh_token_route():
    """
    Endpoint to refresh the Salesforce access token. This would be called periodically
    (e.g., by a scheduled task) in a real application.

    On failure the stored access token, refresh token and instance URL are
    left untouched.
    """
    global access_token, refresh_token, SALESFORCE_INSTANCE_URL

    if not refresh_token:
        return jsonify({'error': 'No refresh token available.'}), 400

    new_access_token, new_refresh_token, new_instance_url = refresh_salesforce_access_token(refresh_token)

    if not new_access_token:
        return jsonify({'error': 'Failed to refresh access token.'}), 500

    access_token = new_access_token
    # A refresh response does not necessarily include a new refresh token or
    # instance URL; keep the current values rather than overwriting with None.
    refresh_token = new_refresh_token or refresh_token
    SALESFORCE_INSTANCE_URL = new_instance_url or SALESFORCE_INSTANCE_URL
    return jsonify({'message': 'Access token refreshed successfully.'}), 200


if __name__ == '__main__':
    # The port is automatically configured by Hugging Face Spaces
    port = int(os.environ.get("PORT", 8080))
    # The Werkzeug debugger allows arbitrary code execution, so it must never
    # be on by default for a server bound to all interfaces; opt in explicitly.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").lower() in ("1", "true")
    app.run(host='0.0.0.0', port=port, debug=debug_enabled)