|
|
import logging |
|
|
from typing import Dict, List |
|
|
import torch |
|
|
import torch.nn as nn |
|
|
import numpy as np |
|
|
import pickle |
|
|
from sklearn.preprocessing import StandardScaler |
|
|
from datetime import datetime, timedelta |
|
|
|
|
|
|
|
|
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') |
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
|
|
|
class DelayPredictor(nn.Module): |
|
|
def __init__(self, input_size, hidden_size, num_layers): |
|
|
super(DelayPredictor, self).__init__() |
|
|
self.hidden_size = hidden_size |
|
|
self.num_layers = num_layers |
|
|
self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True) |
|
|
self.attention = nn.Linear(hidden_size, 1) |
|
|
self.fc = nn.Linear(hidden_size, 1) |
|
|
self.sigmoid = nn.Sigmoid() |
|
|
|
|
|
def forward(self, x): |
|
|
lstm_out, _ = self.lstm(x) |
|
|
attn_weights = torch.softmax(self.attention(lstm_out).squeeze(-1), dim=1) |
|
|
context = torch.bmm(attn_weights.unsqueeze(1), lstm_out).squeeze(1) |
|
|
out = self.fc(context) |
|
|
return self.sigmoid(out) * 100 |
|
|
|
|
|
|
|
|
try: |
|
|
device = torch.device("cuda" if torch.cuda.is_available() else "cpu") |
|
|
model = DelayPredictor(input_size=7, hidden_size=64, num_layers=2).to(device) |
|
|
model.load_state_dict(torch.load("models/delay_model.pth", map_location=device)) |
|
|
model.eval() |
|
|
with open("models/scaler.pkl", "rb") as f: |
|
|
scaler = pickle.load(f) |
|
|
logger.info("LSTM model and scaler loaded successfully") |
|
|
except Exception as e: |
|
|
logger.error(f"Failed to load model or scaler: {str(e)}") |
|
|
model = None |
|
|
scaler = None |
|
|
|
|
|
def get_weather_condition(score: int) -> str: |
|
|
"""Map weather impact score (0-100) to descriptive weather condition.""" |
|
|
if score <= 10: |
|
|
return "Sunny" |
|
|
elif score <= 30: |
|
|
return "Partly Cloudy" |
|
|
elif score <= 50: |
|
|
return "Cloudy" |
|
|
elif score <= 70: |
|
|
return "Light Rain" |
|
|
elif score <= 85: |
|
|
return "Heavy Rain" |
|
|
else: |
|
|
return "Severe Storm" |
|
|
|
|
|
def call_ai_model_for_insights(input_data: Dict, delay_risk: float) -> List[str]: |
|
|
""" |
|
|
Generate detailed hardcoded insights based on input data and delay risk. |
|
|
Includes a 2-week risk alert if weather_forecast_date is within 14 days. |
|
|
Returns 3-5 prioritized, phase/task-specific insights. |
|
|
""" |
|
|
logger.info("Generating detailed hardcoded AI insights") |
|
|
phase = input_data.get("phase", "") |
|
|
task = input_data.get("task", "") |
|
|
current_progress = input_data.get("current_progress", 0) |
|
|
expected_duration = input_data.get("task_expected_duration", 0) |
|
|
actual_duration = input_data.get("task_actual_duration", 0) |
|
|
workforce_gap = input_data.get("workforce_gap", 0) |
|
|
skill_level = input_data.get("workforce_skill_level", "").lower() |
|
|
shift_hours = input_data.get("workforce_shift_hours", 0) |
|
|
weather_score = input_data.get("weather_impact_score", 0) |
|
|
weather_condition = input_data.get("weather_condition", get_weather_condition(weather_score)) |
|
|
project_location = input_data.get("project_location", "Unknown") |
|
|
weather_forecast_date = input_data.get("weather_forecast_date", "") |
|
|
|
|
|
|
|
|
insights = [] |
|
|
|
|
|
|
|
|
def add_insight(message: str, priority: float): |
|
|
insights.append((message, priority)) |
|
|
|
|
|
|
|
|
try: |
|
|
forecast_date = datetime.strptime(weather_forecast_date, "%Y-%m-%d") |
|
|
current_date = datetime(2025, 5, 26) |
|
|
two_weeks_later = current_date + timedelta(days=14) |
|
|
if current_date <= forecast_date <= two_weeks_later: |
|
|
if delay_risk > 75 or weather_score > 75: |
|
|
add_insight( |
|
|
f"⚠️ Critical 2-Week Risk Alert: High risk of delay for {phase}: {task} in {project_location} by {weather_forecast_date} due to {'severe weather' if weather_score > 75 else 'high delay probability'}. Implement contingency plans immediately.", |
|
|
1.2 |
|
|
) |
|
|
elif delay_risk > 50 or weather_score > 50: |
|
|
add_insight( |
|
|
f"⚠️ 2-Week Risk Alert: Moderate risk of delay for {phase}: {task} in {project_location} by {weather_forecast_date}. Monitor closely and prepare mitigation measures.", |
|
|
1.1 |
|
|
) |
|
|
except ValueError: |
|
|
logger.warning("Invalid weather_forecast_date format; skipping 2-week risk alert") |
|
|
|
|
|
|
|
|
if delay_risk > 75: |
|
|
add_insight(f"Urgent: High delay risk detected for {phase}: {task} in {project_location}. Take immediate action.", 1.0) |
|
|
elif delay_risk > 50: |
|
|
add_insight(f"Monitor {phase}: {task} closely in {project_location} to prevent delays.", 0.9) |
|
|
elif delay_risk > 25: |
|
|
add_insight(f"Maintain steady progress for {phase}: {task} in {project_location}.", 0.7) |
|
|
else: |
|
|
add_insight(f"Optimize resources for {phase}: {task} in {project_location} to maintain schedule.", 0.6) |
|
|
|
|
|
|
|
|
if weather_score > 85: |
|
|
add_insight(f"Critical: Severe storm forecast in {project_location} for {phase}: {task}. Consider halting outdoor activities.", 1.1) |
|
|
elif weather_score > 70: |
|
|
add_insight(f"Reschedule outdoor tasks for {phase}: {task} in {project_location} due to heavy rain forecast.", 1.0) |
|
|
elif weather_score > 50: |
|
|
add_insight(f"Shift to indoor or weather-resistant tasks for {phase}: {task} in {project_location} due to light rain.", 0.9) |
|
|
elif weather_score > 30: |
|
|
add_insight(f"Monitor cloudy conditions in {project_location} for {phase}: {task} to avoid unexpected delays.", 0.7) |
|
|
else: |
|
|
add_insight(f"Proceed with {phase}: {task} in {project_location} under favorable weather conditions.", 0.6) |
|
|
|
|
|
|
|
|
task_specific = { |
|
|
"Planning": { |
|
|
"Define Scope": f"Ensure stakeholder alignment for Planning: Define Scope in {project_location}, considering weather impacts.", |
|
|
"Resource Allocation": f"Secure budget and resources early for Planning: Resource Allocation in {project_location}.", |
|
|
"Permit Acquisition": f"Expedite permits for Planning: Permit Acquisition in {project_location} to avoid weather-related delays." |
|
|
}, |
|
|
"Design": { |
|
|
"Architectural Drafting": f"Engage architects early for Design: Architectural Drafting in {project_location}, accounting for weather.", |
|
|
"Engineering Analysis": f"Hire additional engineers for Design: Engineering Analysis in {project_location} to meet deadlines.", |
|
|
"Design Review": f"Schedule thorough reviews for Design: Design Review in {project_location}, considering forecast." |
|
|
}, |
|
|
"Construction": { |
|
|
"Foundation Work": f"Optimize material delivery for Construction: Foundation Work in {project_location}, avoiding {weather_condition.lower()}.", |
|
|
"Structural Build": f"Ensure equipment availability for Construction: Structural Build in {project_location} under {weather_condition.lower()}.", |
|
|
"Utility Installation": f"Coordinate subcontractors for Construction: Utility Installation in {project_location}, monitoring weather." |
|
|
} |
|
|
} |
|
|
if phase in task_specific and task in task_specific[phase]: |
|
|
add_insight(task_specific[phase][task], 0.8) |
|
|
|
|
|
|
|
|
if workforce_gap > 30: |
|
|
add_insight(f"Urgently hire subcontractors in {project_location} to address {workforce_gap}% workforce shortage.", 1.0) |
|
|
elif workforce_gap > 15: |
|
|
add_insight(f"Recruit additional workers in {project_location} to reduce {workforce_gap}% workforce gap.", 0.9) |
|
|
elif workforce_gap > 5: |
|
|
add_insight(f"Consider temporary staff in {project_location} to address minor workforce gap.", 0.7) |
|
|
|
|
|
if skill_level == "low": |
|
|
add_insight(f"Provide training in {project_location} to improve low skill levels for {phase}: {task}.", 0.9) |
|
|
elif skill_level == "medium" and delay_risk > 50: |
|
|
add_insight(f"Upskill workforce in {project_location} for efficiency in {phase}: {task}.", 0.8) |
|
|
elif skill_level == "high" and delay_risk < 25: |
|
|
add_insight(f"Leverage high skill levels in {project_location} to maintain {phase}: {task} progress.", 0.6) |
|
|
|
|
|
if shift_hours < 6: |
|
|
add_insight(f"Extend shift hours beyond {shift_hours} in {project_location} to meet {phase}: {task} deadlines.", 0.9) |
|
|
elif shift_hours < 8 and delay_risk > 50: |
|
|
add_insight(f"Increase shift hours to 8 in {project_location} for {phase}: {task}.", 0.8) |
|
|
elif shift_hours > 10: |
|
|
add_insight(f"Balance shifts in {project_location} to prevent burnout during {phase}: {task}.", 0.7) |
|
|
|
|
|
|
|
|
if expected_duration > 0 and actual_duration > expected_duration: |
|
|
overrun_pct = ((actual_duration - expected_duration) / expected_duration) * 100 |
|
|
if overrun_pct > 20: |
|
|
add_insight(f"Address significant duration overrun ({overrun_pct:.1f}%) for {phase}: {task} in {project_location}.", 1.0) |
|
|
elif overrun_pct > 10: |
|
|
add_insight(f"Review scheduling to address {overrun_pct:.1f}% overrun in {phase}: {task} in {project_location}.", 0.8) |
|
|
|
|
|
if expected_duration > 0: |
|
|
expected_progress = min((actual_duration / expected_duration) * 100, 100) |
|
|
if current_progress < expected_progress * 0.8: |
|
|
add_insight(f"Accelerate task progress for {phase}: {task} in {project_location} to align with schedule.", 0.9) |
|
|
elif current_progress < 50 and delay_risk > 50: |
|
|
add_insight(f"Increase resources to boost {current_progress}% progress in {phase}: {task} in {project_location}.", 0.8) |
|
|
|
|
|
|
|
|
if workforce_gap >= 90: |
|
|
add_insight(f"Critical: Halt non-essential tasks in {project_location} until workforce gap for {phase}: {task} is resolved.", 1.1) |
|
|
if current_progress == 0 and delay_risk > 50: |
|
|
add_insight(f"Initiate {phase}: {task} in {project_location} immediately to avoid further delays.", 1.0) |
|
|
if expected_duration == 0 or actual_duration == 0: |
|
|
add_insight(f"Provide accurate duration estimates for {phase}: {task} in {project_location} for reliable predictions.", 0.7) |
|
|
if weather_score > 50 and phase == "Construction": |
|
|
add_insight(f"Prepare contingency plans for {phase}: {task} in {project_location} due to adverse weather forecast.", 0.95) |
|
|
|
|
|
|
|
|
insights.sort(key=lambda x: x[1], reverse=True) |
|
|
selected_insights = [insight[0] for insight in insights[:5]] |
|
|
|
|
|
logger.info(f"Generated insights: {selected_insights}") |
|
|
return selected_insights or [f"No significant delay factors detected for {phase}: {task} in {project_location}."] |
|
|
|
|
|
def predict_delay(input_data: Dict) -> Dict: |
|
|
""" |
|
|
Predict delay probability using LSTM model. |
|
|
Inputs: Project task data (phase, progress, duration, workforce, weather). |
|
|
Outputs: Delay probability, AI insights, high-risk phases, weather condition. |
|
|
""" |
|
|
logger.info("Starting LSTM delay prediction") |
|
|
if model is None or scaler is None: |
|
|
logger.error("Model or scaler not loaded; falling back to default") |
|
|
return { |
|
|
"project": input_data.get("project_name", "Unnamed Project"), |
|
|
"phase": input_data.get("phase", ""), |
|
|
"task": input_data.get("task", ""), |
|
|
"delay_probability": 50.0, |
|
|
"ai_insights": "Model unavailable; please check deployment.", |
|
|
"high_risk_phases": [], |
|
|
"weather_condition": "Unknown" |
|
|
} |
|
|
|
|
|
phase = input_data.get("phase", "") |
|
|
task = input_data.get("task", "") |
|
|
weather_condition = input_data.get("weather_condition", get_weather_condition(input_data.get("weather_impact_score", 0))) |
|
|
|
|
|
|
|
|
phase_mapping = {"Planning": 0, "Design": 1, "Construction": 2} |
|
|
skill_mapping = {"Low": 0, "Medium": 1, "High": 2} |
|
|
try: |
|
|
features = np.array([[ |
|
|
input_data.get("current_progress", 0), |
|
|
input_data.get("task_expected_duration", 0), |
|
|
input_data.get("task_actual_duration", 0), |
|
|
input_data.get("workforce_gap", 0), |
|
|
input_data.get("weather_impact_score", 0), |
|
|
skill_mapping.get(input_data.get("workforce_skill_level", "Medium"), 1), |
|
|
phase_mapping.get(phase, 0) |
|
|
]]) |
|
|
except KeyError as e: |
|
|
logger.error(f"Invalid input data: {str(e)}") |
|
|
return { |
|
|
"project": input_data.get("project_name", "Unnamed Project"), |
|
|
"phase": phase, |
|
|
"task": task, |
|
|
"delay_probability": 50.0, |
|
|
"ai_insights": f"Invalid input: {str(e)}", |
|
|
"high_risk_phases": [], |
|
|
"weather_condition": weather_condition |
|
|
} |
|
|
|
|
|
|
|
|
try: |
|
|
features_scaled = scaler.transform(features) |
|
|
features_tensor = torch.tensor(features_scaled.reshape(1, 1, -1), dtype=torch.float32).to(device) |
|
|
except Exception as e: |
|
|
logger.error(f"Feature preprocessing failed: {str(e)}") |
|
|
return { |
|
|
"project": input_data.get("project_name", "Unnamed Project"), |
|
|
"phase": phase, |
|
|
"task": task, |
|
|
"delay_probability": 50.0, |
|
|
"ai_insights": f"Preprocessing error: {str(e)}", |
|
|
"high_risk_phases": [], |
|
|
"weather_condition": weather_condition |
|
|
} |
|
|
|
|
|
|
|
|
with torch.no_grad(): |
|
|
delay_risk = model(features_tensor).cpu().numpy().item() |
|
|
delay_risk = round(max(0, min(delay_risk, 100)), 1) |
|
|
|
|
|
|
|
|
task_options = { |
|
|
"Planning": ["Define Scope", "Resource Allocation", "Permit Acquisition"], |
|
|
"Design": ["Architectural Drafting", "Engineering Analysis", "Design Review"], |
|
|
"Construction": ["Foundation Work", "Structural Build", "Utility Installation"] |
|
|
} |
|
|
high_risk_phases = [] |
|
|
if phase in task_options: |
|
|
for t in task_options[phase]: |
|
|
task_risk = delay_risk |
|
|
if t != task: |
|
|
task_risk = min(max(task_risk + (hash(t) % 10 - 5), 0), 100) |
|
|
high_risk_phases.append({ |
|
|
"phase": phase, |
|
|
"task": t, |
|
|
"risk": round(task_risk, 1) |
|
|
}) |
|
|
|
|
|
|
|
|
insights = call_ai_model_for_insights(input_data, delay_risk) |
|
|
|
|
|
logger.info(f"Prediction completed: Delay risk = {delay_risk:.1f}%") |
|
|
return { |
|
|
"project": input_data.get("project_name", "Unnamed Project"), |
|
|
"phase": phase, |
|
|
"task": task, |
|
|
"delay_probability": delay_risk, |
|
|
"ai_insights": "; ".join(insights) if insights else "No significant delay factors detected.", |
|
|
"high_risk_phases": high_risk_phases, |
|
|
"weather_condition": weather_condition |
|
|
} |