Spaces:
Sleeping
Sleeping
| from transformers import AutoModelForSeq2SeqLM, AutoTokenizer | |
| import torch | |
| import logging | |
| from typing import Dict, List | |
| import time | |
# Module-wide logging: timestamped, level-tagged messages at INFO and above.
_LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
logging.basicConfig(level=logging.INFO, format=_LOG_FORMAT)
logger = logging.getLogger(__name__)
def get_weather_condition(score: int) -> str:
    """Map weather impact score (0-100) to descriptive weather condition.

    Bands are inclusive upper bounds; any score above 85 is a severe storm.
    """
    # (upper_bound, label) pairs, checked in ascending order.
    bands = (
        (10, "Sunny"),
        (30, "Partly Cloudy"),
        (50, "Cloudy"),
        (70, "Light Rain"),
        (85, "Heavy Rain"),
    )
    for upper_bound, label in bands:
        if score <= upper_bound:
            return label
    return "Severe Storm"
def _get_cached_t5(model_name: str):
    """Load the T5 tokenizer/model pair once and reuse it across calls.

    The original code re-downloaded/re-loaded the model on every call AND on
    every retry attempt, which is very expensive on a CPU Space. The pair is
    cached as a function attribute after the first successful load.
    """
    cached = getattr(_get_cached_t5, "_cache", None)
    if cached is None:
        tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=False, use_fast=True)
        model = AutoModelForSeq2SeqLM.from_pretrained(
            model_name,
            torch_dtype=torch.float32,
            use_safetensors=True,
            trust_remote_code=False,
            low_cpu_mem_usage=True,
        )
        cached = (tokenizer, model)
        _get_cached_t5._cache = cached
    return cached


def call_ai_model_for_insights(input_data: Dict, delay_risk: float) -> List[str]:
    """Use T5-Small in Hugging Face Space (CPU) to generate insights based on input data and delay risk.

    Args:
        input_data: Raw project/task fields; missing keys default to 0 / "".
        delay_risk: Pre-computed delay risk percentage (0-100).

    Returns:
        Up to four insight strings from the model. If all retries fail, a
        rule-based fallback list is returned, so callers always receive at
        least one string.
    """
    model_name = "t5-small"
    max_retries = 3
    retry_delay = 5  # seconds between attempts

    # The prompt does not depend on the attempt, so build it once up front.
    prompt = f"""
    Summarize the following project delay risk data into 2-4 concise insights or mitigation strategies as a list:
    Project: {input_data.get('project_name', 'Unnamed Project')}
    Phase: {input_data.get('phase', '')}
    Task: {input_data.get('task', '')}
    Expected Duration: {input_data.get('task_expected_duration', 0)} days
    Actual Duration: {input_data.get('task_actual_duration', 0)} days
    Current Progress: {input_data.get('current_progress', 0)}%
    Workforce Gap: {input_data.get('workforce_gap', 0)}%
    Workforce Skill Level: {input_data.get('workforce_skill_level', '').lower()}
    Shift Hours: {input_data.get('workforce_shift_hours', 0)} hours
    Weather Impact Score: {input_data.get('weather_impact_score', 0)} (Condition: {get_weather_condition(input_data.get('weather_impact_score', 0))})
    Calculated Delay Risk: {delay_risk:.1f}%
    Format the response as a list, e.g., ["Insight 1", "Insight 2"].
    """

    for attempt in range(max_retries):
        try:
            logger.info(f"Attempt {attempt + 1}/{max_retries} - Loading model: {model_name}")
            tokenizer, model = _get_cached_t5(model_name)
            logger.info("Model loaded successfully. Generating insights...")
            with torch.no_grad():
                inputs = tokenizer(prompt, return_tensors="pt", max_length=512, truncation=True).to("cpu")
                # NOTE(review): num_beams=4 combined with do_sample=True selects
                # beam-multinomial sampling, so output varies run to run.
                outputs = model.generate(
                    **inputs,
                    max_new_tokens=100,
                    num_beams=4,
                    temperature=0.7,
                    do_sample=True,
                )
            response = tokenizer.decode(outputs[0], skip_special_tokens=True)
            # Keep non-empty lines only. (The previous extra condition
            # `line.strip() not in [prompt]` could never match: a single
            # stripped line is never equal to the full multi-line prompt.)
            insights = [line.strip() for line in response.split("\n") if line.strip()]
            logger.info(f"Generated insights: {insights}")
            return insights[:4] or ["No insights generated; review input data."]
        except Exception as e:
            logger.error(f"Attempt {attempt + 1}/{max_retries} - Model inference failed: {str(e)}")
            if attempt < max_retries - 1:
                logger.info(f"Retrying in {retry_delay} seconds...")
                time.sleep(retry_delay)

    # All retries exhausted: fall back to simple rule-based insights.
    logger.error("Max retries reached. Using fallback insights.")
    fallback_insights = []
    if delay_risk > 75:
        fallback_insights.append("High risk detected; allocate additional resources urgently.")
    elif delay_risk > 50:
        fallback_insights.append("Moderate risk; consider extending shift hours or hiring staff.")
    if input_data.get('workforce_gap', 0) > 20:
        fallback_insights.append("Significant workforce gap; recruit additional workers.")
    if input_data.get('weather_impact_score', 0) > 50:
        fallback_insights.append("Adverse weather; prioritize indoor tasks.")
    return fallback_insights or ["AI model failed to generate insights; check system resources."]
def predict_delay(input_data: Dict) -> Dict:
    """Predict delay probability based on project task data.

    Combines duration overrun, progress lag, workforce gap/skill/shift hours,
    and weather impact into a 0-100 delay risk, then asks T5-Small (CPU) for
    mitigation insights.

    Args:
        input_data: Raw project/task fields; missing keys default to 0 / "".

    Returns:
        Dict with: project/phase/task echoes, delay_probability (0-100, one
        decimal), ai_insights ("; "-joined strings), high_risk_phases
        (per-task risk estimates for the phase), and the resolved
        weather_condition.
    """
    logger.info("Starting delay prediction")
    phase = input_data.get("phase", "")
    task = input_data.get("task", "")
    expected_duration = input_data.get("task_expected_duration", 0)
    actual_duration = input_data.get("task_actual_duration", 0)
    current_progress = input_data.get("current_progress", 0)  # in %
    workforce_gap_pct = input_data.get("workforce_gap", 0)  # in %
    skill_level = input_data.get("workforce_skill_level", "").lower()
    shift_hours = input_data.get("workforce_shift_hours", 0)
    weather_score = input_data.get("weather_impact_score", 0)  # 0-100 scale

    # Auto-set weather condition if missing (derived from the numeric score).
    weather_condition = input_data.get("weather_condition", "")
    if not weather_condition:
        weather_condition = get_weather_condition(weather_score)

    # Task options for phase (hardcoded to match app.py).
    task_options = {
        "Planning": ["Define Scope", "Resource Allocation", "Permit Acquisition"],
        "Design": ["Architectural Drafting", "Engineering Analysis", "Design Review"],
        "Construction": ["Foundation Work", "Structural Build", "Utility Installation"],
    }

    delay_risk = 0
    # 1. Duration overrun risk: up to +30 for running past the expected duration.
    if expected_duration > 0 and actual_duration > expected_duration:
        overrun_pct = ((actual_duration - expected_duration) / expected_duration) * 100
        delay_risk += min(overrun_pct, 30)
    # 2. Progress lag risk: up to +25 when completion trails elapsed time.
    if expected_duration > 0 and current_progress >= 0:
        expected_progress = (actual_duration / expected_duration) * 100
        if current_progress < expected_progress:
            progress_gap = expected_progress - current_progress
            delay_risk += min(progress_gap, 25)
    # 3. Workforce gap impact: up to +20 (half a point per gap percent).
    if workforce_gap_pct > 0:
        delay_risk += min(workforce_gap_pct * 0.5, 20)
    # 4. Skill level effect: flat penalty for low/medium-skill crews.
    if skill_level == "low":
        delay_risk += 15
    elif skill_level == "medium":
        delay_risk += 7
    # 5. Shift hours effect: short shifts add risk, long shifts subtract up to 10.
    if shift_hours < 8:
        delay_risk += (8 - shift_hours) * 3
    elif shift_hours > 8:
        delay_risk -= min((shift_hours - 8) * 2, 10)
    # 6. Weather impact effect: scores above 50 add up to +20.
    if weather_score > 50:
        delay_risk += min(weather_score / 2, 20)
    # Ensure delay_risk is between 0 and 100.
    delay_risk = max(0, min(delay_risk, 100))

    # Estimate risk for every task in the phase. Sibling tasks get a small
    # +/-5-point jitter derived from the task name. BUGFIX: the builtin
    # hash() used previously is salted per interpreter run (PYTHONHASHSEED),
    # so the same input produced different risks across restarts; a stable
    # character-sum keeps the jitter deterministic.
    high_risk_phases = []
    if phase in task_options:
        for t in task_options[phase]:
            task_risk = delay_risk
            if t != task:
                jitter = sum(ord(c) for c in t) % 10 - 5  # deterministic, in [-5, 4]
                task_risk = min(max(task_risk + jitter, 0), 100)
            high_risk_phases.append({
                "phase": phase,
                "task": t,
                "risk": round(task_risk, 1),
            })

    # Generate AI-driven insights (falls back to rule-based text on failure).
    insights = call_ai_model_for_insights(input_data, delay_risk)
    logger.info(f"Prediction completed: Delay risk = {delay_risk:.1f}%")
    return {
        "project": input_data.get("project_name", "Unnamed Project"),
        "phase": phase,
        "task": task,
        "delay_probability": round(delay_risk, 1),
        "ai_insights": "; ".join(insights) if insights else "No significant delay factors detected.",
        "high_risk_phases": high_risk_phases,
        "weather_condition": weather_condition,
    }