import os
import gradio as gr
import pandas as pd
import numpy as np
from datetime import datetime
from simple_salesforce import Salesforce
from dotenv import load_dotenv
import plotly.express as px
import plotly.graph_objects as go
import io
import base64
from matplotlib.backends.backend_pdf import PdfPages
import matplotlib.pyplot as plt

# Load environment variables
load_dotenv()

# Salesforce credentials
SF_USERNAME = os.getenv('SF_USERNAME')
SF_PASSWORD = os.getenv('SF_PASSWORD')
SF_SECURITY_TOKEN = os.getenv('SF_SECURITY_TOKEN')

# Connect to Salesforce at import time; leave sf as None on failure so the
# helpers below can degrade gracefully instead of crashing.
try:
    sf = Salesforce(
        username=SF_USERNAME,
        password=SF_PASSWORD,
        security_token=SF_SECURITY_TOKEN
    )
except Exception as e:
    sf = None
    print(f"Error connecting to Salesforce: {str(e)}")


def weighted_moving_average_forecast(df, trade, site_calendar_date):
    """Forecast headcount for the 3 days after *site_calendar_date* for one trade.

    Uses CSV rows directly when a future date is present in the data;
    otherwise falls back to a weighted moving average of the most recent
    attendance, dampened by the latest weather and a weekend factor.

    Args:
        df: DataFrame with Date, Trade, Attendance, Weather, Shortage_risk columns.
        trade: Trade name to forecast.
        site_calendar_date: Anchor date ('YYYY-MM-DD' string or date-like).

    Returns:
        Tuple (predictions, shortage_prob, site_calendar_value, error):
        predictions is a list of {'date', 'headcount'} dicts, shortage_prob
        is in [0, 1], site_calendar_value is a display string, and error is
        None on success or a message on failure.
    """
    df['Date'] = pd.to_datetime(df['Date'], format='%Y-%m-%d', errors='coerce').dt.date
    trade_df = df[df['Trade'] == trade].copy()
    if trade_df.empty:
        return [], 0.5, None, f"No data found for trade: {trade}"

    # Parse site calendar date and classify weekday vs weekend
    try:
        site_calendar_date = pd.to_datetime(site_calendar_date, format='%Y-%m-%d').date()
        is_weekday = site_calendar_date.weekday() < 5
        site_calendar = 1 if is_weekday else 0
    except ValueError:
        return [], 0.5, None, f"Invalid site calendar date: {site_calendar_date}"

    # The 3 days following the anchor date
    future_dates = pd.date_range(site_calendar_date, periods=4, freq='D')[1:]
    predictions = []
    shortage_prob = 0.5  # Default shortage probability

    # Historical context: rows up to and including the anchor date
    trade_df = trade_df[trade_df['Date'] <= site_calendar_date]
    recent_data = trade_df.tail(30)[['Date', 'Attendance', 'Weather', 'Shortage_risk']]
    if recent_data.empty:
        return [], 0.5, None, (
            f"No data available for trade {trade} on or before {site_calendar_date}"
        )

    # Fix: num_days must be defined even when every future date is served
    # from the CSV (it was previously only assigned inside the fallback
    # branch, causing a NameError on the trend computation).
    num_days = len(recent_data)

    for date in future_dates:
        date = date.date()  # Normalize to date-only
        future_data = df[(df['Trade'] == trade) & (df['Date'] == date)]
        if not future_data.empty:
            # Use CSV data if available
            record = future_data.iloc[0]
            headcount = int(record['Attendance']) if pd.notna(record['Attendance']) else 0
            shortage_prob = record['Shortage_risk'] if pd.notna(record['Shortage_risk']) else 0.5
            predictions.append({
                "date": date.strftime('%Y-%m-%d'),
                "headcount": headcount
            })
        else:
            # Fallback to weighted moving average over the last 1-3 samples.
            # NOTE(review): the weights put 0.5 on the *oldest* of the three
            # samples — confirm this ordering is intended.
            recent_attendance = recent_data['Attendance'].values
            if num_days >= 3:
                weights = np.array([0.5, 0.3, 0.2])
                recent_attendance = recent_attendance[-3:]
            elif num_days == 2:
                weights = np.array([0.6, 0.4])
                recent_attendance = recent_attendance[-2:]
            else:
                weights = np.array([1.0])
                recent_attendance = recent_attendance[-1:]
            forecast_value = np.average(recent_attendance, weights=weights)
            # Dampen the forecast on bad weather. Fix: use fillna(0.5) for
            # unknown/missing weather — a np.nan dict key never matches the
            # 'N/A' strings produced upstream and is unreliable for NaN, which
            # previously propagated NaN into round() and crashed.
            latest_weather = (
                recent_data['Weather']
                .map({'Sunny': 0, 'Rainy': 1, 'Cloudy': 0.5})
                .fillna(0.5)
                .iloc[-1]
            )
            forecast_value *= (1 - 0.1 * latest_weather)
            # Weekends get a 20% reduction
            headcount = round(forecast_value * (1 if site_calendar == 1 else 0.8))
            predictions.append({
                "date": date.strftime('%Y-%m-%d'),
                "headcount": headcount
            })
            # Use historical shortage risk for future dates if no CSV data
            shortage_prob = recent_data['Shortage_risk'].tail(30).mean()
            attendance_trend = (
                recent_data['Attendance'].pct_change().mean() if num_days > 1 else 0
            )
            shortage_prob = min(max(shortage_prob + attendance_trend * 0.1, 0), 1)

    site_calendar_value = (
        site_calendar_date.strftime('%Y-%m-%d')
        + f" ({'Weekday' if is_weekday else 'Weekend'})"
    )
    return predictions, shortage_prob, site_calendar_value, None


# Fetch Project ID from Salesforce
def get_project_id():
    """Return (project_id, error) for the most recently created Project__c."""
    if not sf:
        return None, "Salesforce connection failed."
    try:
        query = "SELECT Id FROM Project__c ORDER BY CreatedDate DESC LIMIT 1"
        result = sf.query(query)
        if result['totalSize'] > 0:
            return result['records'][0]['Id'], None
        return None, "No project found in Salesforce."
    except Exception as e:
        return None, f"Error fetching Project ID: {str(e)}"


# Save to Salesforce
def save_to_salesforce(record):
    """Create a Labour_Attendance_Forecast__c record; return a status dict."""
    if not sf:
        return {"error": "Salesforce connection failed."}
    try:
        result = sf.Labour_Attendance_Forecast__c.create(record)
        return {"success": f"Record created for {record['Trade__c']}",
                "record_id": result['id']}
    except Exception as e:
        return {"error": f"Error uploading to Salesforce for {record['Trade__c']}: {str(e)}"}


# Create heatmap for shortfall risk
def create_heatmap(df, predictions_dict, shortage_probs, site_calendar_date):
    """Build a Plotly heatmap of shortage probability per trade per date.

    Covers the anchor date (from CSV, when present) plus the next 3 days
    (CSV value when present, otherwise the forecast shortage probability).
    """
    heatmap_data = []
    site_calendar_date = pd.to_datetime(site_calendar_date, format='%Y-%m-%d').date()
    future_dates = pd.date_range(site_calendar_date, periods=4, freq='D')[1:]
    for trade in predictions_dict.keys():
        # Get shortage risk for the specified date from CSV
        trade_df = df[(df['Trade'] == trade) & (df['Date'] == site_calendar_date)]
        if not trade_df.empty:
            prob = (trade_df.iloc[0]['Shortage_risk']
                    if pd.notna(trade_df.iloc[0]['Shortage_risk']) else 0.5)
            heatmap_data.append({
                'Date': site_calendar_date.strftime('%Y-%m-%d'),
                'Trade': trade,
                'Shortage_Probability': prob
            })
        # Get shortage probabilities for future dates
        for date in future_dates:
            date = date.date()
            future_data = df[(df['Trade'] == trade) & (df['Date'] == date)]
            if not future_data.empty:
                prob = (future_data.iloc[0]['Shortage_risk']
                        if pd.notna(future_data.iloc[0]['Shortage_risk']) else 0.5)
            else:
                prob = shortage_probs.get(trade, 0.5)
            heatmap_data.append({
                'Date': date.strftime('%Y-%m-%d'),
                'Trade': trade,
                'Shortage_Probability': prob
            })
    heatmap_df = pd.DataFrame(heatmap_data)
    if heatmap_df.empty:
        return go.Figure().update_layout(title="Shortage Risk Heatmap (No Data)")

    # Create heatmap with improved styling
    fig = go.Figure(data=go.Heatmap(
        x=heatmap_df['Date'],
        y=heatmap_df['Trade'],
        z=heatmap_df['Shortage_Probability'],
        colorscale='Blues',
        zmin=0, zmax=1,
        text=heatmap_df['Shortage_Probability'].round(2),
        texttemplate="%{text}",
        textfont={"size": 12},
        colorbar=dict(title="Shortage Risk", tickvals=[0, 0.5, 1],
                      ticktext=["0%", "50%", "100%"])
    ))
    fig.update_layout(
        title="Shortage Risk Heatmap",
        xaxis_title="Date",
        yaxis_title="Trade",
        xaxis=dict(tickangle=45, tickformat="%Y-%m-%d"),
        yaxis=dict(autorange="reversed"),
        font=dict(size=14),
        margin=dict(l=100, r=50, t=100, b=100),
        plot_bgcolor="white",
        paper_bgcolor="white",
        showlegend=False,
        grid=dict(rows=1, columns=1)
    )
    fig.update_xaxes(showgrid=True, gridcolor="lightgray")
    fig.update_yaxes(showgrid=True, gridcolor="lightgray")
    return fig


# Create line chart for forecasts
def create_chart(df, predictions_dict):
    """Build a Plotly line chart combining historical and forecast attendance."""
    combined_df = pd.DataFrame()
    for trade, predictions in predictions_dict.items():
        trade_df = df[df['Trade'] == trade].copy()
        if trade_df.empty:
            continue
        trade_df['Type'] = 'Historical'
        trade_df['Trade'] = trade
        forecast_df = pd.DataFrame(predictions)
        if forecast_df.empty:
            continue
        forecast_df['Date'] = pd.to_datetime(forecast_df['date'], format='%Y-%m-%d').dt.date
        forecast_df['Attendance'] = forecast_df['headcount']
        forecast_df['Type'] = 'Forecast'
        forecast_df['Trade'] = trade
        combined_df = pd.concat([
            combined_df,
            trade_df[['Date', 'Attendance', 'Type', 'Trade']],
            forecast_df[['Date', 'Attendance', 'Type', 'Trade']]
        ])
    if combined_df.empty:
        return go.Figure().update_layout(title="Labour Attendance Forecast (No Data)")
    fig = px.line(
        combined_df,
        x='Date', y='Attendance',
        color='Trade',
        line_dash='Type',
        markers=True,
        title='Labour Attendance Forecast by Trade'
    )
    return fig


# Generate PDF summary
def generate_pdf_summary(trade_results, project_id):
    """Render a one-page PDF summary of actual attendance per trade.

    Returns the PDF as a base64-encoded string (for embedding in a data URI).
    """
    buffer = io.BytesIO()
    with PdfPages(buffer) as pdf:
        fig, ax = plt.subplots(figsize=(10, 6))
        if not trade_results:
            ax.text(0.1, 0.5, "No data available for summary", fontsize=12)
        else:
            for i, (trade, data) in enumerate(trade_results.items()):
                ax.text(0.1, 0.9 - 0.1 * i,
                        f"{trade}: {data['Attendance']} (Actual)", fontsize=12)
        ax.set_title(f"Weekly Summary for Project {project_id}")
        ax.axis('off')
        # Save the figure explicitly so a concurrently created figure cannot
        # be picked up as "current", then free it.
        pdf.savefig(fig)
        plt.close(fig)
    pdf_base64 = base64.b64encode(buffer.getvalue()).decode()
    return pdf_base64


# Notify contractor (mock)
def notify_contractor(trade, alert_status):
    """Mock notification hook; returns the message that would be sent."""
    return f"Notification sent to contractor for {trade} with alert status: {alert_status}"


# Format output to display CSV file values and Forecast_Next_3_Days__c
def format_output(trade_results, site_calendar_date):
    """Render trade_results as a human-readable multi-line string."""
    csv_columns = ['Date', 'Trade', 'Weather', 'Alert_status', 'Shortage_risk',
                   'Suggested_actions', 'Attendance', 'Forecast_Next_3_Days__c']
    output = []
    for trade, data in trade_results.items():
        output.append(f"Trade: {trade}")
        for key in csv_columns:
            if key == 'Date':
                value = (pd.to_datetime(site_calendar_date, format='%Y-%m-%d')
                         .strftime('%Y-%m-%d')
                         if pd.notna(site_calendar_date) else 'N/A')
            elif key == 'Forecast_Next_3_Days__c':
                value = (', '.join(f"{item['date']}: {item['headcount']}"
                                   for item in data.get(key, []))
                         if data.get(key) else 'N/A')
            else:
                value = data.get(key, 'N/A')
            if key in ['Weather', 'Alert_status', 'Suggested_actions', 'Trade'] and value is not None:
                value = str(value)
            elif key == 'Shortage_risk' and value is not None:
                # Fix: the value can be the string 'N/A'; round() would raise.
                try:
                    value = str(round(float(value), 2))
                except (TypeError, ValueError):
                    value = str(value)
            elif key == 'Attendance' and value is not None:
                # Fix: same guard — int('N/A') would raise.
                try:
                    value = str(int(value))
                except (TypeError, ValueError):
                    value = str(value)
            output.append(f"  • {key}: {value}")
        output.append("")
    return "\n".join(output) if trade_results else "No valid trade data available."
# Gradio forecast function
def forecast_labour(csv_file, trade_filter=None, site_calendar_date=None):
    """Main Gradio callback: parse the CSV, forecast each selected trade,
    push records to Salesforce, and build the chart/heatmap/PDF outputs.

    Args:
        csv_file: Uploaded file object (uses .name for the path).
        trade_filter: Optional comma-separated trade names; blank = all trades.
        site_calendar_date: Anchor date string 'YYYY-MM-DD'.

    Returns:
        5-tuple (result_text, line_chart, heatmap, pdf_html, notification);
        on error the text slot carries the message and the rest are None.
    """
    try:
        # Try a few common encodings before giving up
        encodings = ['utf-8', 'latin1', 'iso-8859-1', 'utf-16']
        df = None
        for encoding in encodings:
            try:
                df = pd.read_csv(csv_file.name, encoding=encoding,
                                 dtype_backend='numpy_nullable')
                break
            except UnicodeDecodeError:
                continue
        if df is None:
            return "Error: Could not decode CSV file.", None, None, None, None

        # Normalize header spelling (e.g. 'Alert_Status' -> 'Alert_status')
        df.columns = df.columns.str.strip().str.capitalize()
        required_columns = ['Date', 'Attendance', 'Trade', 'Weather',
                            'Alert_status', 'Shortage_risk', 'Suggested_actions']
        missing_columns = [col for col in required_columns if col not in df.columns]
        if missing_columns:
            return (f"Error: CSV missing columns: {', '.join(missing_columns)}",
                    None, None, None, None)

        # Parse dates with explicit format
        df['Date'] = pd.to_datetime(df['Date'], format='%Y-%m-%d', errors='coerce').dt.date
        if df['Date'].isna().all():
            return "Error: All dates in CSV are invalid.", None, None, None, None
        df['Attendance'] = pd.to_numeric(df['Attendance'], errors='coerce').fillna(0).astype('Int64')
        # Shortage risk arrives as e.g. "22%"; strip the sign and scale to [0, 1]
        df['Shortage_risk'] = df['Shortage_risk'].replace('%', '', regex=True)
        df['Shortage_risk'] = pd.to_numeric(df['Shortage_risk'], errors='coerce').fillna(0.5) / 100
        df['Weather'] = df['Weather'].astype(str).replace('nan', 'N/A')
        df['Alert_status'] = df['Alert_status'].astype(str).replace('nan', 'N/A')
        df['Suggested_actions'] = df['Suggested_actions'].astype(str).replace('nan', 'N/A')
        df['Trade'] = df['Trade'].astype(str).replace('nan', 'N/A')

        unique_trades = df['Trade'].dropna().unique()
        if trade_filter:
            selected_trades = [t.strip() for t in trade_filter.split(',') if t.strip()]
            selected_trades = [t for t in selected_trades if t in unique_trades]
            if not selected_trades:
                return (f"Error: None of the specified trades '{trade_filter}' found in CSV.",
                        None, None, None, None)
        else:
            selected_trades = unique_trades

        trade_results = {}
        predictions_dict = {}
        shortage_probs = {}
        errors = []
        project_id, error = get_project_id()
        if error:
            return f"Error: {error}", None, None, None, None

        # Parse site_calendar_date with explicit format.
        # Fix: check for NaT *before* calling .date() — NaT.date() raises,
        # so the original coerce-then-isna check was unreachable.
        parsed_date = pd.to_datetime(site_calendar_date, format='%Y-%m-%d', errors='coerce')
        if pd.isna(parsed_date):
            error_msg = f"Invalid site calendar date: {site_calendar_date}"
            errors.append(error_msg)
            return f"Error: {error_msg}", None, None, None, None
        site_calendar_date = parsed_date.date()

        for trade in selected_trades:
            trade_df = df[df['Trade'] == trade].copy()
            if trade_df.empty:
                errors.append(f"No data for trade: {trade}")
                continue
            # Debug: Print trade_df to verify data
            print(f"Trade: {trade}, Data for {site_calendar_date}:")
            print(trade_df[trade_df['Date'] == site_calendar_date])
            date_match = trade_df[trade_df['Date'] == site_calendar_date]
            if date_match.empty:
                errors.append(f"No data found for trade {trade} on {site_calendar_date}")
                continue
            if len(date_match) > 1:
                errors.append(
                    f"Warning: Multiple rows found for trade {trade} on "
                    f"{site_calendar_date}. Using first row."
                )
            predictions, shortage_prob, site_calendar, forecast_error = \
                weighted_moving_average_forecast(trade_df, trade, site_calendar_date)
            if forecast_error:
                errors.append(forecast_error)
                continue
            predictions_dict[trade] = predictions
            shortage_probs[trade] = shortage_prob
            record = date_match.iloc[0]
            result_data = {
                'Date': site_calendar_date,
                'Trade': record['Trade'],
                'Weather': record['Weather'],
                'Alert_status': record['Alert_status'],
                'Shortage_risk': record['Shortage_risk'],
                'Suggested_actions': record['Suggested_actions'],
                'Attendance': record['Attendance'],
                'Forecast': predictions,
                'Shortage_Probability': round(shortage_prob, 2),
                'Forecast_Next_3_Days__c': predictions,
                'Project__c': project_id
            }
            salesforce_record = {
                'Trade__c': trade,
                'Shortage_Risk__c': record['Shortage_risk'],
                'Suggested_Actions__c': record['Suggested_actions'],
                'Expected_Headcount__c': predictions[0]['headcount'] if predictions else 0,
                'Actual_Headcount__c': int(record['Attendance']) if pd.notna(record['Attendance']) else 0,
                'Forecast_Next_3_Days__c': str(predictions),
                'Project_ID__c': project_id,
                'Alert_Status__c': record['Alert_status'],
                'Dashboard_Display__c': True,
                'Date__c': pd.Timestamp(site_calendar_date).isoformat()
            }
            sf_result = save_to_salesforce(salesforce_record)
            result_data.update(sf_result)
            trade_results[trade] = result_data

        if not trade_results:
            error_msg = "No valid trade data processed for the specified date."
            if errors:
                error_msg += " Errors: " + "; ".join(errors)
            return error_msg, None, None, None, None

        line_chart = create_chart(df, predictions_dict)
        heatmap = create_heatmap(df, predictions_dict, shortage_probs, site_calendar_date)
        pdf_summary = generate_pdf_summary(trade_results, project_id)
        # Fix: notify on a trade that actually produced results — the first
        # selected trade may have been skipped with an error (KeyError before).
        notification_trade = next(iter(trade_results))
        notification = notify_contractor(
            notification_trade, trade_results[notification_trade]['Alert_status']
        )
        error_msg = "; ".join(errors) if errors else None
        # Fix: actually link the generated PDF — the base64 payload was
        # previously discarded and a dead text label shown instead.
        pdf_link = (
            f'<a download="summary.pdf" '
            f'href="data:application/pdf;base64,{pdf_summary}">Download Summary PDF</a>'
        )
        return (
            format_output(trade_results, site_calendar_date)
            + (f"\nWarnings: {error_msg}" if error_msg else ""),
            line_chart,
            heatmap,
            pdf_link,
            notification
        )
    except Exception as e:
        return f"Error processing file: {str(e)}", None, None, None, None


# Gradio UI
def gradio_interface():
    """Build and launch the Gradio app for labour attendance forecasting."""
    with gr.Blocks(theme=gr.themes.Soft()) as interface:
        gr.Markdown("# Labour Attendance Forecast")
        gr.Markdown("Upload a CSV with columns: Date, Attendance, Trade, Weather, Alert_Status, Shortage_Risk (e.g. 22%), Suggested_Actions.")
        gr.Markdown("Enter trade names (e.g., 'Painter, Electrician') separated by commas, or leave blank to process all trades.")
        gr.Markdown("Enter a specific date for the site calendar (YYYY-MM-DD) to display CSV data for that date and forecast the next 3 days.")
        with gr.Row():
            csv_input = gr.File(label="Upload CSV")
            trade_input = gr.Textbox(
                label="Filter by Trades (e.g., Painter, Electrician)",
                placeholder="Enter trade names separated by commas or leave blank for all trades"
            )
            site_calendar_input = gr.Textbox(
                label="Site Calendar Date (YYYY-MM-DD)",
                placeholder="e.g., 2025-05-24"
            )
        forecast_button = gr.Button("Generate Forecast")
        result_output = gr.Textbox(label="Forecast Result", lines=20)
        line_chart_output = gr.Plot(label="Forecast Trendline")
        heatmap_output = gr.Plot(label="Shortage Risk Heatmap")
        pdf_output = gr.HTML(label="Download Summary PDF")
        notification_output = gr.Textbox(label="Contractor Notification")
        forecast_button.click(
            fn=forecast_labour,
            inputs=[csv_input, trade_input, site_calendar_input],
            outputs=[result_output, line_chart_output, heatmap_output,
                     pdf_output, notification_output]
        )
    interface.launch(share=False)


if __name__ == '__main__':
    gradio_interface()