Codelint-MCP / src /utils /prioritization.py
OsamaAliMid's picture
Add CodeLint MCP Premium Edition application
ec37394
"""
🎯 Smart Prioritization System
Severity-based sorting with impact analysis and fix effort estimation
"""
from typing import Dict, List, Any
import re
class IssuePrioritizer:
"""Smart issue prioritization with severity, impact, and effort analysis"""
# Severity weights
SEVERITY_WEIGHTS = {
"error": 10,
"critical": 10,
"warning": 5,
"info": 1
}
# Impact categories (security > reliability > performance > style)
IMPACT_WEIGHTS = {
"security": 100,
"reliability": 50,
"performance": 25,
"maintainability": 10,
"style": 5
}
# Fix effort estimation keywords
QUICK_FIX_PATTERNS = [
"missing semicolon", "unused variable", "console.log",
"trailing whitespace", "missing docstring", "import order"
]
MEDIUM_FIX_PATTERNS = [
"complexity", "duplicate code", "function too long",
"too many parameters", "nested loop"
]
MAJOR_REFACTOR_PATTERNS = [
"sql injection", "xss vulnerability", "race condition",
"memory leak", "architecture", "design pattern"
]
@staticmethod
def classify_severity(issue: Dict[str, Any]) -> str:
"""Classify issue severity"""
severity = issue.get("severity", "").lower()
message = issue.get("message", "").lower()
rule_id = issue.get("rule_id", "").lower()
# Security issues are always critical
if any(word in message or word in rule_id for word in
["security", "vulnerability", "injection", "xss", "csrf"]):
return "critical"
# Map existing severities
if severity in ["error", "critical"]:
return "critical"
elif severity == "warning":
return "high"
elif severity == "info":
return "low"
return "medium"
@staticmethod
def classify_impact(issue: Dict[str, Any]) -> tuple[str, int]:
"""Classify issue impact category and score"""
message = issue.get("message", "").lower()
rule_id = issue.get("rule_id", "").lower()
text = f"{message} {rule_id}"
# Security issues
if any(word in text for word in
["security", "vulnerability", "injection", "xss", "csrf",
"hardcoded", "secret", "token", "password", "weak"]):
return "security", 100
# Reliability issues
if any(word in text for word in
["null", "undefined", "exception", "error handling",
"race condition", "memory leak", "resource leak"]):
return "reliability", 50
# Performance issues
if any(word in text for word in
["performance", "slow", "inefficient", "complexity",
"nested loop", "blocking", "synchronous"]):
return "performance", 25
# Maintainability issues
if any(word in text for word in
["duplicate", "complexity", "maintainability", "readability",
"naming", "function length", "parameters"]):
return "maintainability", 10
# Style issues
return "style", 5
@staticmethod
def estimate_fix_effort(issue: Dict[str, Any]) -> tuple[str, int]:
"""Estimate effort to fix (quick/medium/major) with time in minutes"""
message = issue.get("message", "").lower()
rule_id = issue.get("rule_id", "").lower()
text = f"{message} {rule_id}"
# Quick fixes (1-5 minutes)
for pattern in IssuePrioritizer.QUICK_FIX_PATTERNS:
if pattern in text:
return "quick-fix", 2
# Major refactors (30+ minutes)
for pattern in IssuePrioritizer.MAJOR_REFACTOR_PATTERNS:
if pattern in text:
return "major-refactor", 45
# Medium fixes (5-15 minutes)
for pattern in IssuePrioritizer.MEDIUM_FIX_PATTERNS:
if pattern in text:
return "medium-fix", 10
# Default to medium
return "medium-fix", 10
@classmethod
def calculate_priority_score(cls, issue: Dict[str, Any]) -> int:
"""Calculate overall priority score (higher = more urgent)"""
severity = cls.classify_severity(issue)
impact_category, impact_score = cls.classify_impact(issue)
effort_category, effort_minutes = cls.estimate_fix_effort(issue)
# Base score from severity
severity_map = {
"critical": 1000,
"high": 500,
"medium": 100,
"low": 10
}
base_score = severity_map.get(severity, 100)
# Add impact score
total_score = base_score + impact_score
# Boost quick fixes (we want to encourage quick wins)
if effort_category == "quick-fix":
total_score += 50
return total_score
@classmethod
def enrich_issue(cls, issue: Dict[str, Any]) -> Dict[str, Any]:
"""Enrich issue with priority metadata"""
enriched = issue.copy()
# Add classifications
enriched["priority_severity"] = cls.classify_severity(issue)
impact_cat, impact_score = cls.classify_impact(issue)
enriched["impact_category"] = impact_cat
enriched["impact_score"] = impact_score
effort_cat, effort_minutes = cls.estimate_fix_effort(issue)
enriched["fix_effort"] = effort_cat
enriched["estimated_fix_time_minutes"] = effort_minutes
# Calculate priority score
enriched["priority_score"] = cls.calculate_priority_score(issue)
return enriched
@classmethod
def prioritize_issues(cls, issues: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Sort issues by priority (highest first) and enrich with metadata"""
# Enrich all issues
enriched_issues = [cls.enrich_issue(issue) for issue in issues]
# Sort by priority score (descending)
sorted_issues = sorted(
enriched_issues,
key=lambda x: x.get("priority_score", 0),
reverse=True
)
return sorted_issues
@staticmethod
def get_statistics(issues: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Generate statistics from prioritized issues"""
if not issues:
return {
"total": 0,
"by_severity": {},
"by_impact": {},
"by_effort": {},
"total_fix_time_minutes": 0,
"quick_wins": 0
}
stats = {
"total": len(issues),
"by_severity": {},
"by_impact": {},
"by_effort": {},
"total_fix_time_minutes": 0,
"quick_wins": 0
}
for issue in issues:
# Count by severity
severity = issue.get("priority_severity", "medium")
stats["by_severity"][severity] = stats["by_severity"].get(severity, 0) + 1
# Count by impact
impact = issue.get("impact_category", "style")
stats["by_impact"][impact] = stats["by_impact"].get(impact, 0) + 1
# Count by effort
effort = issue.get("fix_effort", "medium-fix")
stats["by_effort"][effort] = stats["by_effort"].get(effort, 0) + 1
# Sum fix time
stats["total_fix_time_minutes"] += issue.get("estimated_fix_time_minutes", 0)
# Count quick wins
if effort == "quick-fix":
stats["quick_wins"] += 1
# Add human-readable time estimate
total_minutes = stats["total_fix_time_minutes"]
if total_minutes < 60:
stats["estimated_fix_time"] = f"{total_minutes} minutes"
else:
hours = total_minutes // 60
minutes = total_minutes % 60
stats["estimated_fix_time"] = f"{hours}h {minutes}m"
return stats
def format_priority_report(issues: List[Dict[str, Any]]) -> str:
"""Format prioritized issues into a readable report"""
if not issues:
return "βœ… No issues found!"
stats = IssuePrioritizer.get_statistics(issues)
report = f"""
# πŸ“Š Issue Priority Report
## πŸ“ˆ Summary
- **Total Issues**: {stats['total']}
- **Estimated Fix Time**: {stats['estimated_fix_time']}
- **Quick Wins**: {stats['quick_wins']} issues
## 🚨 By Severity
"""
severity_emojis = {
"critical": "πŸ”΄",
"high": "🟠",
"medium": "🟑",
"low": "πŸ”΅"
}
for severity in ["critical", "high", "medium", "low"]:
count = stats["by_severity"].get(severity, 0)
if count > 0:
emoji = severity_emojis.get(severity, "βšͺ")
report += f"- {emoji} **{severity.title()}**: {count}\n"
report += "\n## 🎯 By Impact\n"
impact_emojis = {
"security": "πŸ›‘οΈ",
"reliability": "⚑",
"performance": "πŸš€",
"maintainability": "πŸ”§",
"style": "🎨"
}
for impact in ["security", "reliability", "performance", "maintainability", "style"]:
count = stats["by_impact"].get(impact, 0)
if count > 0:
emoji = impact_emojis.get(impact, "πŸ“Œ")
report += f"- {emoji} **{impact.title()}**: {count}\n"
report += "\n## ⏱️ By Fix Effort\n"
effort_labels = {
"quick-fix": "⚑ Quick Fix (< 5 min)",
"medium-fix": "πŸ”§ Medium Fix (5-15 min)",
"major-refactor": "πŸ—οΈ Major Refactor (30+ min)"
}
for effort in ["quick-fix", "medium-fix", "major-refactor"]:
count = stats["by_effort"].get(effort, 0)
if count > 0:
label = effort_labels.get(effort, effort)
report += f"- {label}: {count}\n"
# Top 5 highest priority issues
report += "\n## πŸ”₯ Top Priority Issues\n\n"
for i, issue in enumerate(issues[:5], 1):
severity_emoji = severity_emojis.get(issue.get("priority_severity", "medium"), "βšͺ")
impact_emoji = impact_emojis.get(issue.get("impact_category", "style"), "πŸ“Œ")
line = issue.get("line", "?")
message = issue.get("message", "No description")
effort = issue.get("fix_effort", "medium-fix")
fix_time = issue.get("estimated_fix_time_minutes", "?")
report += f"{i}. {severity_emoji} {impact_emoji} **Line {line}**: {message}\n"
report += f" ⏱️ ~{fix_time} min to fix\n\n"
return report