Spaces:
Sleeping
Sleeping
import logging
import os
from contextlib import asynccontextmanager
from typing import Optional

import requests
import uvicorn
from fastapi import FastAPI, HTTPException
from fastapi.responses import FileResponse
from pydantic import BaseModel
from simple_salesforce import Salesforce, format_soql
# Configure logging first so that any start-up failure below is reported.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
logger.info("Starting application initialization")

# Salesforce credentials are read from the process environment.
SF_USERNAME = os.environ.get('SF_USERNAME')
SF_PASSWORD = os.environ.get('SF_PASSWORD')
SF_SECURITY_TOKEN = os.environ.get('SF_SECURITY_TOKEN')

# Abort start-up when any credential is unset or empty.
required_vars = dict(
    SF_USERNAME=SF_USERNAME,
    SF_PASSWORD=SF_PASSWORD,
    SF_SECURITY_TOKEN=SF_SECURITY_TOKEN,
)
missing_vars = [name for name in required_vars if not required_vars[name]]
if missing_vars:
    missing_message = f"Missing required environment variables: {', '.join(missing_vars)}"
    logger.error(missing_message)
    raise ValueError(missing_message)

# Shared Salesforce client; None until a connection has been established.
sf = None
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Manage application start-up and shutdown.

    On start-up, try to open the Salesforce connection and store it in the
    module-level ``sf``. A connection failure is logged and leaves ``sf`` as
    ``None`` instead of aborting start-up; the request handlers check for
    that and answer with an error. Control is yielded while the application
    serves requests, and a message is logged on shutdown.

    The function is wrapped with ``asynccontextmanager`` because FastAPI's
    ``lifespan`` argument expects an async context manager factory, not a
    bare async generator function.

    Args:
        app: The FastAPI application whose lifecycle is being managed.
    """
    global sf
    logger.info("Starting application lifecycle")
    # Initialize Salesforce connection
    try:
        sf = Salesforce(
            username=SF_USERNAME,
            password=SF_PASSWORD,
            security_token=SF_SECURITY_TOKEN,
            instance_url='https://aicoachforsitesupervisors-dev-ed.develop.my.salesforce.com',
            version='63.0',
        )
        logger.info("Successfully connected to Salesforce")
    except Exception as e:
        # Best effort: keep the app up so static routes still work.
        logger.error(f"Failed to connect to Salesforce: {str(e)}")
        sf = None
    logger.info("Application initialized successfully")
    yield
    logger.info("Shutting down application")
# Create the FastAPI application; `lifespan` runs the start-up/shutdown logic.
app = FastAPI(lifespan=lifespan)

# OpenWeatherMap API key. The WEATHER_API_KEY environment variable takes
# precedence so the secret can live outside the source tree; the literal is
# kept only as a backward-compatible fallback.
# NOTE(review): a key committed to source should be rotated and the fallback
# removed once the environment variable is configured.
WEATHER_API_KEY = os.getenv("WEATHER_API_KEY") or "60c00e1b8293d3c0482f8d6ca1a37003"
# Request body accepted by generate_coaching().
# (Documented with comments rather than a docstring so the generated schema
# for the model is unchanged.)
class ProjectRequest(BaseModel):
    # Matched against Project__c.Name when the project is looked up.
    projectId: str
    projectName: str
    # Optional free-text fields; they default to None when omitted.
    milestones: Optional[str] = None
    weatherLogs: Optional[str] = None
    safetyLogs: Optional[str] = None
    # Role used to tailor the generated checklist and tips.
    role: str
def fetch_weather_data(location: str) -> str:
    """Fetch the current weather for a location from OpenWeatherMap.

    Args:
        location: Place name passed as the ``q`` query parameter.

    Returns:
        A string of the form ``"<description>, <temp>°C"``, or
        ``"Weather data unavailable"`` if the request or the parsing of the
        response fails for any reason (the failure is logged, not raised).
    """
    try:
        # `params` lets requests URL-encode the location (spaces, '&', '#',
        # ...) instead of splicing it raw into the query string. The timeout
        # stops an unresponsive API from blocking the caller indefinitely.
        # HTTPS keeps the API key out of clear text on the wire.
        response = requests.get(
            "https://api.openweathermap.org/data/2.5/weather",
            params={"q": location, "appid": WEATHER_API_KEY, "units": "metric"},
            timeout=10,
        )
        response.raise_for_status()
        data = response.json()
        weather = data['weather'][0]['description']
        temp = data['main']['temp']
        return f"{weather}, {temp}°C"
    except Exception as e:
        # Deliberately broad: weather is optional enrichment, so any failure
        # degrades to a placeholder rather than failing the request.
        logger.error(f"Failed to fetch weather data: {str(e)}")
        return "Weather data unavailable"
def generate_coaching_data(project: dict, role: str) -> dict:
    """Mock AI model to generate checklist, tips, and engagement score.

    Args:
        project: Project record; the keys ``Name``, ``Project_Name__c`` and
            ``Weather_Logs__c`` are read, and all of them are optional.
        role: Role name interpolated into the checklist and tips.

    Returns:
        A dict with the keys ``checklist`` (str), ``tips`` (str) and
        ``engagementScore`` (float, currently a fixed mock value).
    """
    logger.info(f"Generating coaching data for project {project.get('Name')}")
    # `dict.get(key, default)` only falls back when the key is absent. A key
    # that is present with a None or empty value would otherwise render as
    # "None" or an empty gap in the text, so fall back on any falsy value.
    project_name = project.get('Project_Name__c') or 'project'
    conditions = project.get('Weather_Logs__c') or 'conditions'
    checklist = f"1. Review safety protocols for {project_name}\n2. Check {role} tasks\n3. Update milestones"
    tips = f"1. Prioritize safety due to {conditions}\n2. Focus on {role} responsibilities\n3. Communicate progress"
    engagement_score = 85.0  # Mock score; replace with real AI model
    return {
        "checklist": checklist,
        "tips": tips,
        "engagementScore": engagement_score,
    }
# NOTE(review): no route decorator is visible on this handler in this view;
# confirm that it is registered on `app`.
async def get_latest_project():
    """Fetch the latest Project__c record based on CreatedDate.

    Returns:
        A dict with the keys ``projectId`` (the record's ``Name``),
        ``projectName``, ``milestones``, ``weatherLogs``, ``safetyLogs`` and
        ``role`` (always ``"Site Manager"``).

    Raises:
        HTTPException: 500 if the Salesforce connection is not initialized or
            the query fails; 404 if no Project__c record exists.
    """
    if sf is None:
        logger.error("Salesforce connection not initialized")
        raise HTTPException(status_code=500, detail="Salesforce connection not initialized")
    try:
        query = """
            SELECT Id, Name, Project_Name__c, Milestones__c, Weather_Logs__c, Safety_Logs__c, Location__c, Supervisor_ID__c
            FROM Project__c
            ORDER BY CreatedDate DESC
            LIMIT 1
        """
        logger.info(f"Executing SOQL query for latest project: {query}")
        result = sf.query(query)
        if result['totalSize'] == 0:
            logger.warning("No Project__c records found")
            raise HTTPException(status_code=404, detail="No projects found")
        project = result['records'][0]
        logger.info(f"Fetched latest project: {project['Name']}")
        # Prepare response with extracted fields
        return {
            "projectId": project['Name'],
            "projectName": project['Project_Name__c'],
            "milestones": project.get('Milestones__c', ''),
            "weatherLogs": project.get('Weather_Logs__c', ''),
            "safetyLogs": project.get('Safety_Logs__c', ''),
            "role": "Site Manager",  # Default role; can be updated based on Supervisor_ID__c if needed
        }
    except HTTPException:
        # Let deliberate HTTP errors (the 404 above) reach the client as-is;
        # without this they would be re-wrapped as a generic 500 below.
        raise
    except Exception as e:
        logger.error(f"Error fetching latest project: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e)) from e
# NOTE(review): no route decorator is visible on this handler in this view;
# confirm that it is registered on `app`.
async def generate_coaching(request: ProjectRequest):
    """Generate coaching data for a project and store it in Salesforce.

    Looks up the Project__c record whose ``Name`` equals
    ``request.projectId``, refreshes its weather text when the record has a
    location, generates the checklist/tips/score, inserts a
    Supervisor_AI_Coaching__c record and returns the combined data.

    Args:
        request: Request body; only ``projectId`` and ``role`` are read here.

    Returns:
        A dict with the project fields (``projectId``, ``projectName``,
        ``milestones``, ``weatherLogs``, ``safetyLogs``), the ``role`` and
        the generated ``checklist``, ``tips`` and ``engagementScore``.

    Raises:
        HTTPException: 500 if the Salesforce connection is not initialized,
            the query fails, or the coaching record cannot be saved; 404 if
            the project does not exist.
    """
    if sf is None:
        logger.error("Salesforce connection not initialized")
        raise HTTPException(status_code=500, detail="Salesforce connection not initialized")
    try:
        # Query Project__c by projectId to get the full record. format_soql
        # quotes and escapes the client-supplied value, so a projectId that
        # contains quotes cannot alter the query (SOQL injection).
        query = format_soql(
            "SELECT Id, Name, Project_Name__c, Milestones__c, Weather_Logs__c, "
            "Safety_Logs__c, Location__c, Supervisor_ID__c "
            "FROM Project__c WHERE Name = {} LIMIT 1",
            request.projectId,
        )
        logger.info(f"Executing SOQL query: {query}")
        result = sf.query(query)
        if result['totalSize'] == 0:
            logger.warning(f"Project {request.projectId} not found")
            raise HTTPException(status_code=404, detail="Project not found")
        project = result['records'][0]
        logger.info(f"Retrieved project: {project['Name']}")
        # Update weather logs if location available
        if project.get('Location__c'):
            project['Weather_Logs__c'] = fetch_weather_data(project['Location__c'])
        # Generate coaching data
        coaching_data = generate_coaching_data(project, request.role)
        # Prepare response with extracted fields and generated results
        response = {
            "projectId": project['Name'],
            "projectName": project['Project_Name__c'],
            "milestones": project.get('Milestones__c', ''),
            "weatherLogs": project.get('Weather_Logs__c', ''),
            "safetyLogs": project.get('Safety_Logs__c', ''),
            "role": request.role,
            "checklist": coaching_data['checklist'],
            "tips": coaching_data['tips'],
            "engagementScore": coaching_data['engagementScore'],
        }
        # Insert into Supervisor_AI_Coaching__c
        try:
            coaching_record = {
                'Project_ID__c': project['Id'],  # Lookup field: takes the record Id, not its Name
                'Checklist__c': coaching_data['checklist'],
                'Tips__c': coaching_data['tips'],
                'Engagement_Score__c': coaching_data['engagementScore'],
                'Role__c': request.role,
                'Project_Name__c': project['Project_Name__c'],
                'Milestones__c': project.get('Milestones__c', ''),
                'Weather_Logs__c': project.get('Weather_Logs__c', ''),
                'Safety_Logs__c': project.get('Safety_Logs__c', ''),
            }
            sf.Supervisor_AI_Coaching__c.create(coaching_record)
            logger.info(f"Inserted coaching data into Supervisor_AI_Coaching__c for project {project['Name']}")
        except Exception as e:
            logger.error(f"Failed to insert into Supervisor_AI_Coaching__c: {str(e)}")
            raise HTTPException(status_code=500, detail=f"Failed to save coaching data: {str(e)}") from e
        logger.info(f"Generated coaching response: {response}")
        return response
    except HTTPException:
        # Let deliberate HTTP errors (404, save failure) reach the client with
        # their own status and detail instead of being re-wrapped below.
        raise
    except Exception as e:
        logger.error(f"Error generating coaching data: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e)) from e
# Serve the front-end page. The path is relative, so it is resolved against
# the process working directory.
# NOTE(review): no route decorator is visible on this handler in this view;
# confirm that it is registered on `app`.
async def root():
    logger.info("Serving index.html from root directory")
    index_page = FileResponse("index.html")
    return index_page
# Serve the stylesheet with an explicit CSS media type. The path is relative,
# so it is resolved against the process working directory.
# NOTE(review): no route decorator is visible on this handler in this view;
# confirm that it is registered on `app`.
async def serve_styles():
    logger.info("Serving styles.css from root directory")
    stylesheet = FileResponse("styles.css", media_type="text/css")
    return stylesheet
# Launch the Uvicorn server for Hugging Face Spaces, listening on all
# interfaces on port 7860.
# NOTE(review): this runs at import time (there is no `__main__` guard), so
# importing the module also starts the server; confirm that is intended.
logger.info("Starting Uvicorn server for Hugging Face Spaces")
uvicorn.run(
    app,
    host="0.0.0.0",
    port=7860,
)