# (page-scrape artifact removed: "Spaces: Sleeping" hosting-status banner)
| from typing import List, Dict, Optional | |
| from datetime import datetime, timedelta, date | |
| from collections import defaultdict | |
| import numpy as np | |
| from models.jira_models import JiraIssue, Sprint, WorklogEntry | |
| from models.intelligence_models import ( | |
| DeliveryHealthMetrics, | |
| ProductivityMetrics, | |
| CostEfficiencyMetrics, | |
| TeamCapacityMetrics, | |
| RiskAlert, | |
| InsightRecommendation, | |
| KanbanFlowMetrics, | |
| KanbanColumnAnalysis, | |
| KanbanCumulativeFlow, | |
| WIPLimitRecommendation | |
| ) | |
| import logging | |
| import uuid | |
| logger = logging.getLogger(__name__) | |
| class IntelligenceService: | |
| """Service for generating business intelligence from engineering data""" | |
| def calculate_delivery_health( | |
| self, | |
| issues: List[JiraIssue], | |
| sprint: Optional[Sprint] = None, | |
| period_start: Optional[date] = None, | |
| period_end: Optional[date] = None | |
| ) -> DeliveryHealthMetrics: | |
| """Calculate delivery health metrics""" | |
| if not period_start and sprint: | |
| period_start = sprint.start_date.date() if sprint.start_date else date.today() | |
| if not period_end and sprint: | |
| period_end = sprint.end_date.date() if sprint.end_date else date.today() | |
| if not period_start: | |
| period_start = date.today() - timedelta(days=14) | |
| if not period_end: | |
| period_end = date.today() | |
| # Filter issues within the period | |
| period_issues = [ | |
| issue for issue in issues | |
| if period_start <= issue.created.date() <= period_end | |
| ] | |
| # Calculate metrics | |
| total_issues = len(period_issues) | |
| completed_issues = len([i for i in period_issues if i.status.lower() in ['done', 'closed']]) | |
| blocked_issues = len([i for i in period_issues if i.status.lower() == 'blocked']) | |
| # Story points | |
| planned_points = sum([i.story_points or 0 for i in period_issues]) | |
| completed_points = sum([ | |
| i.story_points or 0 for i in period_issues | |
| if i.status.lower() in ['done', 'closed'] | |
| ]) | |
| # Cycle time calculation | |
| cycle_times = [] | |
| for issue in period_issues: | |
| if issue.resolved and issue.created: | |
| cycle_time = (issue.resolved - issue.created).total_seconds() / 3600 | |
| cycle_times.append(cycle_time) | |
| avg_cycle_time = np.mean(cycle_times) if cycle_times else 0 | |
| # Calculate completion rate | |
| completion_rate = (completed_issues / total_issues * 100) if total_issues > 0 else 0 | |
| # Calculate health score (0-100) | |
| health_score = self._calculate_health_score( | |
| completion_rate=completion_rate, | |
| blocked_ratio=blocked_issues / total_issues if total_issues > 0 else 0, | |
| velocity_ratio=completed_points / planned_points if planned_points > 0 else 0 | |
| ) | |
| return DeliveryHealthMetrics( | |
| sprint_id=str(sprint.sprint_id) if sprint else None, | |
| sprint_name=sprint.sprint_name if sprint else None, | |
| period_start=period_start, | |
| period_end=period_end, | |
| planned_story_points=planned_points, | |
| completed_story_points=completed_points, | |
| velocity=completed_points, | |
| velocity_trend=0, # TODO: Calculate from historical data | |
| total_issues=total_issues, | |
| completed_issues=completed_issues, | |
| completion_rate=completion_rate, | |
| avg_cycle_time_hours=avg_cycle_time, | |
| avg_lead_time_hours=avg_cycle_time, # Simplified | |
| blocked_issues_count=blocked_issues, | |
| overdue_issues_count=0, # TODO: Calculate based on due dates | |
| reopened_issues_count=0, # TODO: Track issue history | |
| at_risk_issues=blocked_issues, | |
| health_score=health_score | |
| ) | |
| def calculate_productivity_metrics( | |
| self, | |
| issues: List[JiraIssue], | |
| worklogs: List[WorklogEntry], | |
| team_member_id: str, | |
| team_member_name: str, | |
| period_start: date, | |
| period_end: date | |
| ) -> ProductivityMetrics: | |
| """Calculate productivity metrics for a team member""" | |
| # Filter issues assigned to this member | |
| member_issues = [ | |
| i for i in issues | |
| if i.assignee == team_member_name | |
| and period_start <= i.created.date() <= period_end | |
| ] | |
| # Filter worklogs for this member | |
| member_worklogs = [ | |
| w for w in worklogs | |
| if w.author == team_member_name | |
| and period_start <= w.started.date() <= period_end | |
| ] | |
| # Calculate metrics | |
| completed_issues = len([i for i in member_issues if i.status.lower() in ['done', 'closed']]) | |
| story_points_completed = sum([ | |
| i.story_points or 0 for i in member_issues | |
| if i.status.lower() in ['done', 'closed'] | |
| ]) | |
| # Time metrics | |
| total_hours = sum([w.time_spent_seconds / 3600 for w in member_worklogs]) | |
| days_in_period = (period_end - period_start).days + 1 | |
| avg_hours_per_day = total_hours / days_in_period if days_in_period > 0 else 0 | |
| # Completion time | |
| completion_times = [] | |
| for issue in member_issues: | |
| if issue.resolved and issue.created: | |
| completion_time = (issue.resolved - issue.created).total_seconds() / 3600 | |
| completion_times.append(completion_time) | |
| avg_completion_time = np.mean(completion_times) if completion_times else 0 | |
| # Productivity score | |
| productivity_score = self._calculate_productivity_score( | |
| completed_issues=completed_issues, | |
| story_points_completed=story_points_completed, | |
| avg_completion_time=avg_completion_time, | |
| hours_logged=total_hours | |
| ) | |
| # Current workload | |
| current_assigned = len([i for i in issues if i.assignee == team_member_name and i.status.lower() not in ['done', 'closed']]) | |
| current_points = sum([i.story_points or 0 for i in issues if i.assignee == team_member_name and i.status.lower() not in ['done', 'closed']]) | |
| return ProductivityMetrics( | |
| team_member_id=team_member_id, | |
| team_member_name=team_member_name, | |
| period_start=period_start, | |
| period_end=period_end, | |
| issues_completed=completed_issues, | |
| story_points_completed=story_points_completed, | |
| code_commits=0, # TODO: Integrate with GitHub | |
| pull_requests=0, # TODO: Integrate with GitHub | |
| total_hours_logged=total_hours, | |
| avg_hours_per_day=avg_hours_per_day, | |
| avg_issue_completion_time_hours=avg_completion_time, | |
| productivity_score=productivity_score, | |
| current_assigned_issues=current_assigned, | |
| current_story_points=current_points, | |
| utilization_rate=min(100, (total_hours / (days_in_period * 8)) * 100) if days_in_period > 0 else 0 | |
| ) | |
| def calculate_cost_efficiency( | |
| self, | |
| issues: List[JiraIssue], | |
| worklogs: List[WorklogEntry], | |
| period_start: date, | |
| period_end: date, | |
| avg_hourly_rate: float = 75.0 # Default rate | |
| ) -> CostEfficiencyMetrics: | |
| """Calculate cost efficiency metrics""" | |
| # Filter data for period | |
| period_issues = [ | |
| i for i in issues | |
| if period_start <= i.created.date() <= period_end | |
| ] | |
| period_worklogs = [ | |
| w for w in worklogs | |
| if period_start <= w.started.date() <= period_end | |
| ] | |
| # Get unique team members | |
| team_members = set([w.author for w in period_worklogs]) | |
| total_team_members = len(team_members) | |
| # Calculate hours and cost | |
| total_hours = sum([w.time_spent_seconds / 3600 for w in period_worklogs]) | |
| estimated_cost = total_hours * avg_hourly_rate | |
| # Output metrics | |
| features_delivered = len([i for i in period_issues if i.status.lower() in ['done', 'closed'] and i.issue_type.lower() in ['story', 'feature']]) | |
| story_points_delivered = sum([ | |
| i.story_points or 0 for i in period_issues | |
| if i.status.lower() in ['done', 'closed'] | |
| ]) | |
| # Efficiency ratios | |
| cost_per_feature = estimated_cost / features_delivered if features_delivered > 0 else 0 | |
| cost_per_story_point = estimated_cost / story_points_delivered if story_points_delivered > 0 else 0 | |
| hours_per_story_point = total_hours / story_points_delivered if story_points_delivered > 0 else 0 | |
| # Waste calculation (blocked time) | |
| blocked_hours = sum([ | |
| (w.time_spent_seconds / 3600) for w in period_worklogs | |
| if any(i.issue_key == w.issue_key and i.status.lower() == 'blocked' for i in period_issues) | |
| ]) | |
| waste_percentage = (blocked_hours / total_hours * 100) if total_hours > 0 else 0 | |
| return CostEfficiencyMetrics( | |
| period_start=period_start, | |
| period_end=period_end, | |
| total_team_members=total_team_members, | |
| total_hours_logged=total_hours, | |
| estimated_cost=estimated_cost, | |
| features_delivered=features_delivered, | |
| story_points_delivered=story_points_delivered, | |
| cost_per_feature=cost_per_feature, | |
| cost_per_story_point=cost_per_story_point, | |
| hours_per_story_point=hours_per_story_point, | |
| blocked_time_hours=blocked_hours, | |
| rework_hours=0, # TODO: Track rework | |
| waste_percentage=waste_percentage | |
| ) | |
| def generate_risk_alerts( | |
| self, | |
| delivery_health: DeliveryHealthMetrics, | |
| productivity_metrics: List[ProductivityMetrics], | |
| cost_metrics: CostEfficiencyMetrics | |
| ) -> List[RiskAlert]: | |
| """Generate risk alerts based on metrics""" | |
| alerts = [] | |
| # Delivery health alerts | |
| if delivery_health.health_score < 50: | |
| alerts.append(RiskAlert( | |
| alert_id=str(uuid.uuid4()), | |
| alert_type="delivery_delay", | |
| severity="critical" if delivery_health.health_score < 30 else "high", | |
| title="Low Delivery Health Score", | |
| description=f"Sprint health score is {delivery_health.health_score:.1f}/100, indicating significant delivery risks.", | |
| affected_entity="sprint", | |
| entity_id=delivery_health.sprint_id or "unknown", | |
| detected_at=datetime.now(), | |
| suggested_action="Review blocked issues, reassign workload, and identify bottlenecks.", | |
| metrics={"health_score": delivery_health.health_score} | |
| )) | |
| if delivery_health.completion_rate < 60: | |
| alerts.append(RiskAlert( | |
| alert_id=str(uuid.uuid4()), | |
| alert_type="delivery_delay", | |
| severity="high", | |
| title="Low Completion Rate", | |
| description=f"Only {delivery_health.completion_rate:.1f}% of planned work is completed.", | |
| affected_entity="sprint", | |
| entity_id=delivery_health.sprint_id or "unknown", | |
| detected_at=datetime.now(), | |
| suggested_action="Reduce scope or extend timeline to meet commitments.", | |
| metrics={"completion_rate": delivery_health.completion_rate} | |
| )) | |
| # Productivity alerts | |
| overworked = [p for p in productivity_metrics if p.avg_hours_per_day > 10] | |
| if overworked: | |
| for member in overworked: | |
| alerts.append(RiskAlert( | |
| alert_id=str(uuid.uuid4()), | |
| alert_type="resource_shortage", | |
| severity="medium", | |
| title="Team Member Overworked", | |
| description=f"{member.team_member_name} is logging {member.avg_hours_per_day:.1f} hours/day.", | |
| affected_entity="team_member", | |
| entity_id=member.team_member_id, | |
| detected_at=datetime.now(), | |
| suggested_action="Redistribute workload to prevent burnout.", | |
| metrics={"avg_hours_per_day": member.avg_hours_per_day} | |
| )) | |
| # Cost alerts | |
| if cost_metrics.waste_percentage > 20: | |
| alerts.append(RiskAlert( | |
| alert_id=str(uuid.uuid4()), | |
| alert_type="cost_overrun", | |
| severity="high", | |
| title="High Waste Percentage", | |
| description=f"{cost_metrics.waste_percentage:.1f}% of time is wasted on blocked work.", | |
| affected_entity="project", | |
| entity_id="project", | |
| detected_at=datetime.now(), | |
| suggested_action="Identify and remove blockers urgently.", | |
| metrics={"waste_percentage": cost_metrics.waste_percentage} | |
| )) | |
| return alerts | |
    def generate_insights(
        self,
        delivery_health: DeliveryHealthMetrics,
        productivity_metrics: List[ProductivityMetrics],
        cost_metrics: CostEfficiencyMetrics
    ) -> List[InsightRecommendation]:
        """Generate AI-powered insights and recommendations.

        Produces up to three insights (delivery velocity, team
        productivity, cost efficiency); each is emitted only when its
        underlying metric carries data.
        """
        insights = []
        # Velocity insight: only when some story points were completed.
        if delivery_health.velocity > 0:
            insights.append(InsightRecommendation(
                insight_id=str(uuid.uuid4()),
                category="delivery",
                title="Velocity Analysis",
                description=f"Team completed {delivery_health.completed_story_points:.1f} story points with {delivery_health.completion_rate:.1f}% completion rate.",
                confidence_score=0.85,
                impact_level="medium",
                recommendations=[
                    "Maintain current sprint planning strategy",
                    "Consider increasing capacity for higher throughput"
                ],
                supporting_data={
                    "completed_points": delivery_health.completed_story_points,
                    "completion_rate": delivery_health.completion_rate
                },
                generated_at=datetime.now()
            ))
        # Productivity insight: averages the per-member scores; impact and
        # recommendation text switch on the 60- and 70-point thresholds.
        if productivity_metrics:
            avg_productivity = np.mean([p.productivity_score for p in productivity_metrics])
            insights.append(InsightRecommendation(
                insight_id=str(uuid.uuid4()),
                category="productivity",
                title="Team Productivity Overview",
                description=f"Average team productivity score is {avg_productivity:.1f}/100.",
                confidence_score=0.80,
                impact_level="high" if avg_productivity < 60 else "low",
                recommendations=[
                    "Identify top performers and share best practices",
                    "Provide additional support to team members scoring below 50",
                    "Review tool and process efficiency"
                ] if avg_productivity < 70 else [
                    "Team is performing well",
                    "Focus on maintaining current practices"
                ],
                supporting_data={"avg_productivity_score": avg_productivity},
                generated_at=datetime.now()
            ))
        # Cost insight: only when at least one story point was delivered
        # (cost_per_story_point stays 0 otherwise).
        if cost_metrics.cost_per_story_point > 0:
            insights.append(InsightRecommendation(
                insight_id=str(uuid.uuid4()),
                category="cost",
                title="Cost Efficiency Analysis",
                description=f"Current cost per story point is ${cost_metrics.cost_per_story_point:.2f}.",
                confidence_score=0.75,
                impact_level="medium",
                recommendations=[
                    "Track this metric over time to identify trends",
                    "Compare with industry benchmarks",
                    "Focus on reducing waste to improve efficiency"
                ],
                supporting_data={
                    "cost_per_story_point": cost_metrics.cost_per_story_point,
                    "waste_percentage": cost_metrics.waste_percentage
                },
                generated_at=datetime.now()
            ))
        return insights
| def _calculate_health_score( | |
| self, | |
| completion_rate: float, | |
| blocked_ratio: float, | |
| velocity_ratio: float | |
| ) -> float: | |
| """Calculate overall health score (0-100)""" | |
| # Weighted scoring | |
| completion_weight = 0.4 | |
| blocked_weight = 0.3 | |
| velocity_weight = 0.3 | |
| completion_score = completion_rate | |
| blocked_score = max(0, 100 - (blocked_ratio * 200)) # Penalize blocked items | |
| velocity_score = min(100, velocity_ratio * 100) | |
| health_score = ( | |
| completion_score * completion_weight + | |
| blocked_score * blocked_weight + | |
| velocity_score * velocity_weight | |
| ) | |
| return max(0, min(100, health_score)) | |
| def _calculate_productivity_score( | |
| self, | |
| completed_issues: int, | |
| story_points_completed: float, | |
| avg_completion_time: float, | |
| hours_logged: float | |
| ) -> float: | |
| """Calculate productivity score (0-100)""" | |
| # Simple scoring based on output | |
| output_score = min(100, (completed_issues * 10) + (story_points_completed * 5)) | |
| # Efficiency score (inverse of completion time) | |
| efficiency_score = min(100, 100 - (avg_completion_time / 10)) if avg_completion_time > 0 else 50 | |
| # Average the scores | |
| productivity_score = (output_score * 0.6) + (efficiency_score * 0.4) | |
| return max(0, min(100, productivity_score)) | |
| # ===== KANBAN INTELLIGENCE METHODS ===== | |
| def calculate_kanban_flow_metrics( | |
| self, | |
| board_id: int, | |
| board_name: str, | |
| issues: List[JiraIssue], | |
| columns: List, # List of KanbanIssuesByColumn | |
| period_start: date, | |
| period_end: date | |
| ) -> KanbanFlowMetrics: | |
| """Calculate Kanban flow efficiency metrics""" | |
| # Filter issues within period | |
| period_issues = [ | |
| issue for issue in issues | |
| if period_start <= issue.created.date() <= period_end | |
| ] | |
| # Calculate throughput (completed in period) | |
| completed_issues = [ | |
| i for i in period_issues | |
| if i.status.lower() in ['done', 'closed'] and i.resolved | |
| and period_start <= i.resolved.date() <= period_end | |
| ] | |
| throughput = len(completed_issues) | |
| # Calculate cycle time for completed issues | |
| cycle_times = [] | |
| for issue in completed_issues: | |
| if issue.resolved and issue.created: | |
| cycle_time = (issue.resolved.date() - issue.created.date()).days | |
| cycle_times.append(cycle_time) | |
| avg_cycle_time = np.mean(cycle_times) if cycle_times else 0 | |
| cycle_time_variance = np.std(cycle_times) if len(cycle_times) > 1 else 0 | |
| # Calculate WIP | |
| current_wip = sum([col.issue_count for col in columns if col.column_name.lower() != 'done']) | |
| # Calculate WIP violations | |
| wip_violations = sum([ | |
| 1 for col in columns | |
| if col.wip_limit_max and col.issue_count > col.wip_limit_max | |
| ]) | |
| # Identify bottleneck | |
| bottleneck_column = None | |
| bottleneck_score = 0 | |
| for col in columns: | |
| if col.column_name.lower() not in ['backlog', 'done']: | |
| # Bottleneck score based on utilization and throughput | |
| utilization = col.issue_count / col.wip_limit_max if col.wip_limit_max else 0 | |
| score = utilization * col.issue_count | |
| if score > bottleneck_score: | |
| bottleneck_score = score | |
| bottleneck_column = col.column_name | |
| # Calculate flow efficiency (simplified) | |
| flow_efficiency = min(100, (throughput / max(1, current_wip)) * 100) | |
| # Calculate flow health score | |
| flow_health_score = self._calculate_flow_health_score( | |
| throughput=throughput, | |
| avg_cycle_time=avg_cycle_time, | |
| wip_violations=wip_violations, | |
| current_wip=current_wip | |
| ) | |
| return KanbanFlowMetrics( | |
| board_id=board_id, | |
| board_name=board_name, | |
| period_start=period_start, | |
| period_end=period_end, | |
| throughput=throughput, | |
| avg_cycle_time_days=avg_cycle_time, | |
| avg_lead_time_days=avg_cycle_time, # Simplified | |
| flow_efficiency=flow_efficiency, | |
| current_wip=current_wip, | |
| avg_wip=current_wip, # TODO: Calculate historical average | |
| wip_violations=wip_violations, | |
| bottleneck_column=bottleneck_column, | |
| bottleneck_score=bottleneck_score, | |
| throughput_variance=0, # TODO: Calculate from historical data | |
| cycle_time_variance=cycle_time_variance, | |
| flow_health_score=flow_health_score | |
| ) | |
| def analyze_kanban_columns( | |
| self, | |
| columns: List, # List of KanbanIssuesByColumn | |
| issues: List[JiraIssue] | |
| ) -> List[KanbanColumnAnalysis]: | |
| """Analyze each Kanban column for bottlenecks and efficiency""" | |
| analyses = [] | |
| max_issue_count = max([col.issue_count for col in columns]) if columns else 1 | |
| for col in columns: | |
| # Calculate average time in column (simplified) | |
| column_issues = col.issues | |
| times_in_column = [] | |
| for issue in column_issues: | |
| if issue.resolved and issue.created: | |
| time_in_col = (issue.resolved - issue.created).total_seconds() / (3600 * 24) | |
| times_in_column.append(time_in_col) | |
| avg_time = np.mean(times_in_column) if times_in_column else 0 | |
| # Calculate utilization | |
| utilization = 0 | |
| if col.wip_limit_max and col.wip_limit_max > 0: | |
| utilization = (col.issue_count / col.wip_limit_max) * 100 | |
| # Determine if bottleneck (high issue count relative to others) | |
| is_bottleneck = col.issue_count >= (max_issue_count * 0.7) and col.column_name.lower() not in ['backlog', 'done'] | |
| bottleneck_score = (col.issue_count / max_issue_count) * 100 | |
| analyses.append(KanbanColumnAnalysis( | |
| column_name=col.column_name, | |
| statuses=col.statuses, | |
| current_issue_count=col.issue_count, | |
| wip_limit_min=col.wip_limit_min, | |
| wip_limit_max=col.wip_limit_max, | |
| is_over_wip_limit=bool(col.wip_limit_max and col.issue_count > col.wip_limit_max), | |
| avg_time_in_column_days=avg_time, | |
| throughput=len(column_issues), | |
| utilization_rate=utilization, | |
| is_bottleneck=is_bottleneck, | |
| bottleneck_score=bottleneck_score | |
| )) | |
| return analyses | |
| def generate_wip_recommendations( | |
| self, | |
| column_analyses: List[KanbanColumnAnalysis], | |
| flow_metrics: KanbanFlowMetrics | |
| ) -> List[WIPLimitRecommendation]: | |
| """Generate WIP limit recommendations for columns""" | |
| recommendations = [] | |
| for analysis in column_analyses: | |
| if analysis.column_name.lower() in ['backlog', 'done']: | |
| continue # Skip backlog and done columns | |
| current_limit = analysis.wip_limit_max | |
| current_count = analysis.current_issue_count | |
| # Calculate recommended limits | |
| if analysis.is_bottleneck: | |
| # Bottleneck: recommend lower WIP to force upstream to slow down | |
| recommended_max = max(2, int(current_count * 0.7)) | |
| recommended_min = max(1, int(recommended_max * 0.5)) | |
| reasoning = f"Column is a bottleneck. Reducing WIP limit will help identify and resolve issues faster." | |
| confidence = 0.8 | |
| elif analysis.is_over_wip_limit: | |
| # Over limit: recommend current count or slightly higher | |
| recommended_max = max(current_limit or 5, current_count) | |
| recommended_min = max(1, int(recommended_max * 0.5)) | |
| reasoning = f"Currently over WIP limit. Recommend increasing limit to {recommended_max} or focusing on moving work out of this column." | |
| confidence = 0.7 | |
| elif current_limit and analysis.utilization_rate < 50: | |
| # Underutilized: recommend lower limit | |
| recommended_max = max(2, int(current_limit * 0.7)) | |
| recommended_min = max(1, int(recommended_max * 0.5)) | |
| reasoning = f"Column is underutilized ({analysis.utilization_rate:.1f}%). Consider reducing WIP limit to improve focus." | |
| confidence = 0.6 | |
| else: | |
| # Optimal: keep current or suggest based on team size | |
| recommended_max = current_limit or max(3, int(current_count * 1.2)) | |
| recommended_min = max(1, int(recommended_max * 0.5)) | |
| reasoning = f"Current WIP appears optimal. Maintain current limits and monitor flow." | |
| confidence = 0.5 | |
| recommendations.append(WIPLimitRecommendation( | |
| column_name=analysis.column_name, | |
| current_limit=current_limit, | |
| recommended_min=recommended_min, | |
| recommended_max=recommended_max, | |
| reasoning=reasoning, | |
| confidence_score=confidence | |
| )) | |
| return recommendations | |
    def generate_kanban_insights(
        self,
        flow_metrics: KanbanFlowMetrics,
        column_analyses: List[KanbanColumnAnalysis],
        wip_recommendations: List[WIPLimitRecommendation]
    ) -> List[InsightRecommendation]:
        """Generate Kanban-specific insights and recommendations.

        Always emits a flow-efficiency summary; adds WIP-violation,
        bottleneck and cycle-time-variability insights when the metrics
        warrant them.  ``wip_recommendations`` is currently unused; it is
        kept for interface stability.
        """
        insights = []
        # Flow efficiency insight -- always emitted as the board summary.
        insights.append(InsightRecommendation(
            insight_id=str(uuid.uuid4()),
            category="kanban_flow",
            title="Kanban Flow Efficiency",
            description=f"Board throughput is {flow_metrics.throughput} items with average cycle time of {flow_metrics.avg_cycle_time_days:.1f} days.",
            confidence_score=0.85,
            impact_level="high" if flow_metrics.flow_health_score < 60 else "medium",
            recommendations=[
                f"Current flow health score: {flow_metrics.flow_health_score:.1f}/100",
                "Focus on reducing WIP to improve flow" if flow_metrics.current_wip > 10 else "WIP levels are healthy",
                f"Address bottleneck in '{flow_metrics.bottleneck_column}' column" if flow_metrics.bottleneck_column else "No major bottlenecks detected"
            ],
            supporting_data={
                "throughput": flow_metrics.throughput,
                "cycle_time": flow_metrics.avg_cycle_time_days,
                "wip": flow_metrics.current_wip,
                "health_score": flow_metrics.flow_health_score
            },
            generated_at=datetime.now()
        ))
        # WIP violations insight -- any column over its configured limit.
        if flow_metrics.wip_violations > 0:
            insights.append(InsightRecommendation(
                insight_id=str(uuid.uuid4()),
                category="kanban_wip",
                title="WIP Limit Violations Detected",
                description=f"{flow_metrics.wip_violations} column(s) are exceeding their WIP limits.",
                confidence_score=0.95,
                impact_level="high",
                recommendations=[
                    "Review and enforce WIP limits to maintain flow",
                    "Investigate why work is accumulating in these columns",
                    "Consider if WIP limits need adjustment or if there are blockers"
                ],
                supporting_data={"wip_violations": flow_metrics.wip_violations},
                generated_at=datetime.now()
            ))
        # Bottleneck insight -- enriched with the matching column analysis;
        # skipped if the named column has no analysis entry.
        if flow_metrics.bottleneck_column:
            bottleneck_analysis = next(
                (a for a in column_analyses if a.column_name == flow_metrics.bottleneck_column),
                None
            )
            if bottleneck_analysis:
                insights.append(InsightRecommendation(
                    insight_id=str(uuid.uuid4()),
                    category="kanban_bottleneck",
                    title=f"Bottleneck Identified: {flow_metrics.bottleneck_column}",
                    description=f"The '{flow_metrics.bottleneck_column}' column has {bottleneck_analysis.current_issue_count} items with {bottleneck_analysis.avg_time_in_column_days:.1f} days average dwell time.",
                    confidence_score=0.80,
                    impact_level="high",
                    recommendations=[
                        "Add more resources to this stage of the workflow",
                        "Review and optimize processes in this column",
                        "Consider splitting this column into smaller steps",
                        "Implement swarming: have team members help clear this column"
                    ],
                    supporting_data={
                        "column": flow_metrics.bottleneck_column,
                        "issue_count": bottleneck_analysis.current_issue_count,
                        "avg_time_days": bottleneck_analysis.avg_time_in_column_days
                    },
                    generated_at=datetime.now()
                ))
        # Predictability insight -- fires when the cycle-time spread exceeds
        # half the average cycle time.
        if flow_metrics.cycle_time_variance > flow_metrics.avg_cycle_time_days * 0.5:
            insights.append(InsightRecommendation(
                insight_id=str(uuid.uuid4()),
                category="kanban_predictability",
                title="High Cycle Time Variability",
                description=f"Cycle times are inconsistent (variance: {flow_metrics.cycle_time_variance:.1f} days), making delivery predictions difficult.",
                confidence_score=0.75,
                impact_level="medium",
                recommendations=[
                    "Standardize work item sizes for better predictability",
                    "Identify and address sources of variation",
                    "Consider using work item types or classes of service",
                    "Break down larger items into smaller, more predictable pieces"
                ],
                supporting_data={
                    "avg_cycle_time": flow_metrics.avg_cycle_time_days,
                    "variance": flow_metrics.cycle_time_variance
                },
                generated_at=datetime.now()
            ))
        return insights
| def _calculate_flow_health_score( | |
| self, | |
| throughput: int, | |
| avg_cycle_time: float, | |
| wip_violations: int, | |
| current_wip: int | |
| ) -> float: | |
| """Calculate Kanban flow health score (0-100)""" | |
| # Throughput score (higher is better) | |
| throughput_score = min(100, throughput * 10) | |
| # Cycle time score (lower is better, assuming < 7 days is good) | |
| cycle_time_score = max(0, 100 - (avg_cycle_time * 10)) | |
| # WIP score (penalize violations and high WIP) | |
| wip_score = max(0, 100 - (wip_violations * 30) - (current_wip * 2)) | |
| # Weighted average | |
| health_score = ( | |
| throughput_score * 0.4 + | |
| cycle_time_score * 0.4 + | |
| wip_score * 0.2 | |
| ) | |
| return max(0, min(100, health_score)) | |
# Module-level singleton; import this instance rather than constructing
# IntelligenceService (the service is stateless, so sharing is safe here).
intelligence_service = IntelligenceService()