Spaces:
Sleeping
Sleeping
| """ | |
| 🎯 Smart Prioritization System | |
| Severity-based sorting with impact analysis and fix effort estimation | |
| """ | |
| from typing import Dict, List, Any | |
| import re | |
class IssuePrioritizer:
    """Smart issue prioritization with severity, impact, and effort analysis.

    The class keeps no instance state: every public method is a
    ``staticmethod`` or ``classmethod`` and is meant to be called on the class,
    e.g. ``IssuePrioritizer.prioritize_issues(issues)``.

    An issue is a plain ``dict``. The keys read from a raw issue are
    ``"severity"``, ``"message"``, ``"rule_id"`` and ``"line"``; all of them are
    optional and a missing or ``None`` value is treated as empty text.
    Classification is done by case-insensitive *substring* matching on the
    message and rule id, so a keyword also matches inside a longer word.
    """

    # Severity weights.
    # NOTE(review): not referenced by any method in this class; kept because
    # code outside this class may read it.
    SEVERITY_WEIGHTS = {
        "error": 10,
        "critical": 10,
        "warning": 5,
        "info": 1,
    }

    # Impact categories (security > reliability > performance > style).
    # classify_impact() returns these values as the impact score.
    IMPACT_WEIGHTS = {
        "security": 100,
        "reliability": 50,
        "performance": 25,
        "maintainability": 10,
        "style": 5,
    }

    # Fix effort estimation keywords.
    QUICK_FIX_PATTERNS = [
        "missing semicolon", "unused variable", "console.log",
        "trailing whitespace", "missing docstring", "import order",
    ]
    # Matching one of these yields the same result as matching nothing
    # ("medium-fix" is also the default), so estimate_fix_effort() does not
    # need to scan this list.
    MEDIUM_FIX_PATTERNS = [
        "complexity", "duplicate code", "function too long",
        "too many parameters", "nested loop",
    ]
    MAJOR_REFACTOR_PATTERNS = [
        "sql injection", "xss vulnerability", "race condition",
        "memory leak", "architecture", "design pattern",
    ]

    # Keywords that force the "critical" severity regardless of what the
    # issue itself reports.
    _SECURITY_SEVERITY_KEYWORDS = (
        "security", "vulnerability", "injection", "xss", "csrf",
    )

    # Reported severity -> priority severity; anything else maps to "medium".
    _SEVERITY_LEVELS = {
        "error": "critical",
        "critical": "critical",
        "warning": "high",
        "info": "low",
    }

    # Impact keyword groups, checked in this order; the first group with a
    # matching keyword wins ("complexity" is in two groups, so it resolves to
    # "performance"). No match means "style".
    _IMPACT_KEYWORDS = (
        ("security", (
            "security", "vulnerability", "injection", "xss", "csrf",
            "hardcoded", "secret", "token", "password", "weak",
        )),
        ("reliability", (
            "null", "undefined", "exception", "error handling",
            "race condition", "memory leak", "resource leak",
        )),
        ("performance", (
            "performance", "slow", "inefficient", "complexity",
            "nested loop", "blocking", "synchronous",
        )),
        ("maintainability", (
            "duplicate", "complexity", "maintainability", "readability",
            "naming", "function length", "parameters",
        )),
    )

    # Base priority score contributed by each priority severity.
    _SEVERITY_BASE_SCORES = {
        "critical": 1000,
        "high": 500,
        "medium": 100,
        "low": 10,
    }

    # Nominal minutes reported for each effort category.
    _EFFORT_MINUTES = {
        "quick-fix": 2,
        "medium-fix": 10,
        "major-refactor": 45,
    }

    @staticmethod
    def _lowered(issue: Dict[str, Any], key: str) -> str:
        """Return ``issue[key]`` as lower-case text; missing/``None`` -> ``""``."""
        value = issue.get(key)
        return "" if value is None else str(value).lower()

    @staticmethod
    def _searchable_text(issue: Dict[str, Any]) -> str:
        """Return the lower-cased message and rule id joined by one space."""
        message = IssuePrioritizer._lowered(issue, "message")
        rule_id = IssuePrioritizer._lowered(issue, "rule_id")
        return f"{message} {rule_id}"

    @staticmethod
    def classify_severity(issue: Dict[str, Any]) -> str:
        """Classify issue severity.

        Args:
            issue: Issue dict; reads ``"severity"``, ``"message"``, ``"rule_id"``.

        Returns:
            ``"critical"`` when the message or rule id contains a security
            keyword, or when the reported severity is ``"error"`` or
            ``"critical"``; ``"high"`` for ``"warning"``; ``"low"`` for
            ``"info"``; ``"medium"`` for anything else.
        """
        text = IssuePrioritizer._searchable_text(issue)

        # Security issues are always critical, whatever severity was reported.
        if any(word in text
               for word in IssuePrioritizer._SECURITY_SEVERITY_KEYWORDS):
            return "critical"

        reported = IssuePrioritizer._lowered(issue, "severity")
        return IssuePrioritizer._SEVERITY_LEVELS.get(reported, "medium")

    @staticmethod
    def classify_impact(issue: Dict[str, Any]) -> tuple[str, int]:
        """Classify issue impact category and score.

        Args:
            issue: Issue dict; reads ``"message"`` and ``"rule_id"``.

        Returns:
            ``(category, score)`` where ``category`` is the first of
            security, reliability, performance, maintainability whose keyword
            list matches, else ``"style"``, and ``score`` is that category's
            entry in ``IMPACT_WEIGHTS``.
        """
        text = IssuePrioritizer._searchable_text(issue)
        category = "style"
        for candidate, keywords in IssuePrioritizer._IMPACT_KEYWORDS:
            if any(word in text for word in keywords):
                category = candidate
                break
        return category, IssuePrioritizer.IMPACT_WEIGHTS[category]

    @staticmethod
    def estimate_fix_effort(issue: Dict[str, Any]) -> tuple[str, int]:
        """Estimate effort to fix (quick/medium/major) with time in minutes.

        Args:
            issue: Issue dict; reads ``"message"`` and ``"rule_id"``.

        Returns:
            ``("quick-fix", 2)`` if a quick-fix pattern matches, otherwise
            ``("major-refactor", 45)`` if a major-refactor pattern matches,
            otherwise ``("medium-fix", 10)``.
        """
        text = IssuePrioritizer._searchable_text(issue)

        # Quick-fix patterns take precedence over major-refactor patterns.
        if any(p in text for p in IssuePrioritizer.QUICK_FIX_PATTERNS):
            category = "quick-fix"
        elif any(p in text for p in IssuePrioritizer.MAJOR_REFACTOR_PATTERNS):
            category = "major-refactor"
        else:
            # Explicit medium patterns and "no match" both land here.
            category = "medium-fix"
        return category, IssuePrioritizer._EFFORT_MINUTES[category]

    @classmethod
    def calculate_priority_score(cls, issue: Dict[str, Any]) -> int:
        """Calculate overall priority score (higher = more urgent).

        The score is the severity base score (critical 1000, high 500,
        medium 100, low 10) plus the impact score, plus 50 when the issue is
        classified as a quick fix.

        Args:
            issue: Raw issue dict.

        Returns:
            The integer priority score.
        """
        severity = cls.classify_severity(issue)
        _, impact_score = cls.classify_impact(issue)
        effort_category, _ = cls.estimate_fix_effort(issue)

        total_score = cls._SEVERITY_BASE_SCORES.get(severity, 100) + impact_score

        # Boost quick fixes (we want to encourage quick wins).
        if effort_category == "quick-fix":
            total_score += 50
        return total_score

    @classmethod
    def enrich_issue(cls, issue: Dict[str, Any]) -> Dict[str, Any]:
        """Enrich issue with priority metadata.

        Args:
            issue: Raw issue dict; it is not modified.

        Returns:
            A shallow copy of ``issue`` with the added keys
            ``"priority_severity"``, ``"impact_category"``, ``"impact_score"``,
            ``"fix_effort"``, ``"estimated_fix_time_minutes"`` and
            ``"priority_score"``.
        """
        impact_cat, impact_score = cls.classify_impact(issue)
        effort_cat, effort_minutes = cls.estimate_fix_effort(issue)

        enriched = issue.copy()
        enriched["priority_severity"] = cls.classify_severity(issue)
        enriched["impact_category"] = impact_cat
        enriched["impact_score"] = impact_score
        enriched["fix_effort"] = effort_cat
        enriched["estimated_fix_time_minutes"] = effort_minutes
        enriched["priority_score"] = cls.calculate_priority_score(issue)
        return enriched

    @classmethod
    def prioritize_issues(cls, issues: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Sort issues by priority (highest first) and enrich with metadata.

        Args:
            issues: Raw issue dicts; neither the list nor its items are modified.

        Returns:
            A new list of enriched copies ordered by descending
            ``"priority_score"``. The sort is stable, so issues with equal
            scores keep their input order.
        """
        enriched_issues = [cls.enrich_issue(issue) for issue in issues]
        return sorted(
            enriched_issues,
            key=lambda entry: entry.get("priority_score", 0),
            reverse=True,
        )

    @staticmethod
    def get_statistics(issues: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Generate statistics from prioritized issues.

        Args:
            issues: Enriched issue dicts. Missing metadata falls back to
                ``"medium"`` severity, ``"style"`` impact, ``"medium-fix"``
                effort and 0 minutes.

        Returns:
            A dict with ``"total"``, the count maps ``"by_severity"``,
            ``"by_impact"`` and ``"by_effort"``, ``"total_fix_time_minutes"``,
            ``"quick_wins"`` and the human-readable ``"estimated_fix_time"``
            (``"N minutes"`` below one hour, otherwise ``"Hh Mm"``). The same
            keys are present for an empty input.
        """
        by_severity: Dict[str, int] = {}
        by_impact: Dict[str, int] = {}
        by_effort: Dict[str, int] = {}
        total_minutes = 0
        quick_wins = 0

        for issue in issues:
            severity = issue.get("priority_severity", "medium")
            by_severity[severity] = by_severity.get(severity, 0) + 1

            impact = issue.get("impact_category", "style")
            by_impact[impact] = by_impact.get(impact, 0) + 1

            effort = issue.get("fix_effort", "medium-fix")
            by_effort[effort] = by_effort.get(effort, 0) + 1

            total_minutes += issue.get("estimated_fix_time_minutes", 0)
            if effort == "quick-fix":
                quick_wins += 1

        if total_minutes < 60:
            readable = f"{total_minutes} minutes"
        else:
            hours, minutes = divmod(total_minutes, 60)
            readable = f"{hours}h {minutes}m"

        return {
            "total": len(issues),
            "by_severity": by_severity,
            "by_impact": by_impact,
            "by_effort": by_effort,
            "total_fix_time_minutes": total_minutes,
            "quick_wins": quick_wins,
            "estimated_fix_time": readable,
        }

    @staticmethod
    def format_priority_report(issues: List[Dict[str, Any]]) -> str:
        """Format prioritized issues into a readable report.

        Args:
            issues: Enriched issue dicts. The first five entries are listed as
                the top-priority issues, so the list is presumably already
                sorted by ``prioritize_issues`` - this method does not sort.

        Returns:
            A Markdown string with a summary, per-severity, per-impact and
            per-effort counts (zero counts are omitted) and up to five
            top-priority issues; a fixed "no issues" message when ``issues``
            is empty.
        """
        # NOTE(review): the emoji literals in this method look mis-decoded
        # (UTF-8 read with a single-byte code page). They are reproduced as
        # found; confirm against the original file before restoring them.
        if not issues:
            return "β No issues found!"

        stats = IssuePrioritizer.get_statistics(issues)

        parts = [
            "\n"
            "# π Issue Priority Report\n"
            "## π Summary\n"
            f"- **Total Issues**: {stats['total']}\n"
            f"- **Estimated Fix Time**: {stats['estimated_fix_time']}\n"
            f"- **Quick Wins**: {stats['quick_wins']} issues\n"
            "## π¨ By Severity\n"
        ]

        severity_emojis = {
            "critical": "π΄",
            "high": "π ",
            "medium": "π‘",
            "low": "π΅",
        }
        for severity in ["critical", "high", "medium", "low"]:
            count = stats["by_severity"].get(severity, 0)
            if count > 0:
                emoji = severity_emojis.get(severity, "βͺ")
                parts.append(f"- {emoji} **{severity.title()}**: {count}\n")

        parts.append("\n## π― By Impact\n")
        impact_emojis = {
            "security": "π‘οΈ",
            "reliability": "β‘",
            "performance": "π",
            "maintainability": "π§",
            "style": "π¨",
        }
        for impact in ["security", "reliability", "performance",
                       "maintainability", "style"]:
            count = stats["by_impact"].get(impact, 0)
            if count > 0:
                emoji = impact_emojis.get(impact, "π")
                parts.append(f"- {emoji} **{impact.title()}**: {count}\n")

        parts.append("\n## β±οΈ By Fix Effort\n")
        effort_labels = {
            "quick-fix": "β‘ Quick Fix (< 5 min)",
            "medium-fix": "π§ Medium Fix (5-15 min)",
            "major-refactor": "ποΈ Major Refactor (30+ min)",
        }
        for effort in ["quick-fix", "medium-fix", "major-refactor"]:
            count = stats["by_effort"].get(effort, 0)
            if count > 0:
                label = effort_labels.get(effort, effort)
                parts.append(f"- {label}: {count}\n")

        # Top 5 highest priority issues (first five entries of the input).
        parts.append("\n## π₯ Top Priority Issues\n\n")
        for i, issue in enumerate(issues[:5], 1):
            severity_emoji = severity_emojis.get(
                issue.get("priority_severity", "medium"), "βͺ")
            impact_emoji = impact_emojis.get(
                issue.get("impact_category", "style"), "π")
            line = issue.get("line", "?")
            message = issue.get("message", "No description")
            fix_time = issue.get("estimated_fix_time_minutes", "?")
            parts.append(
                f"{i}. {severity_emoji} {impact_emoji} **Line {line}**: {message}\n")
            parts.append(f" β±οΈ ~{fix_time} min to fix\n\n")

        return "".join(parts)