#!/usr/bin/env python3 """ Code Quality Improvement Service Automated technical debt reduction and code smell remediation """ import ast import logging import os from collections.abc import Callable from dataclasses import dataclass, field from datetime import datetime from enum import Enum from typing import Any logger = logging.getLogger(__name__) class CodeSmell(Enum): LONG_METHOD = "long_method" LARGE_CLASS = "large_class" DUPLICATE_CODE = "duplicate_code" COMPLEX_CONDITIONAL = "complex_conditional" LONG_PARAMETER_LIST = "long_parameter_list" DATA_CLASS = "data_class" FEATURE_ENVY = "feature_envy" MESSAGE_CHAIN = "message_chain" MIDDLE_MAN = "middle_man" INAPPROPRIATE_INTIMACY = "inappropriate_intimacy" class DebtPriority(Enum): LOW = "low" MEDIUM = "medium" HIGH = "high" CRITICAL = "critical" @dataclass class CodeIssue: """Represents a code quality issue""" issue_id: str file_path: str line_number: int smell_type: CodeSmell severity: DebtPriority description: str code_snippet: str estimated_effort: str # "quick_fix", "refactor", "major_rework" automated_fix_available: bool impact_score: float # 0.0 to 1.0 identified_at: datetime @dataclass class RefactoringTask: """Represents a refactoring task""" task_id: str issue_ids: list[str] title: str description: str priority: DebtPriority estimated_effort_days: float status: str # "pending", "in_progress", "completed", "blocked" assigned_to: str | None = None created_at: datetime = field(default_factory=datetime.now) completed_at: datetime | None = None automated: bool = False @dataclass class CodeQualityMetrics: """Code quality metrics""" total_lines: int cyclomatic_complexity_avg: float duplication_percentage: float test_coverage: float technical_debt_ratio: float maintainability_index: float issues_count: int issues_fixed: int class CodeQualityImprovementService: """Automated code quality improvement and technical debt reduction""" def __init__(self): self.code_issues: dict[str, CodeIssue] = {} self.refactoring_tasks: dict[str, RefactoringTask] = {} self.code_metrics: dict[str, CodeQualityMetrics] = {} self.automated_fixes: dict[CodeSmell, Callable] = {} self._initialize_automated_fixes() self._setup_code_analysis() def _initialize_automated_fixes(self): """Initialize automated code fixes""" self.automated_fixes = { CodeSmell.LONG_METHOD: self._fix_long_method, CodeSmell.DUPLICATE_CODE: self._fix_duplicate_code, CodeSmell.LONG_PARAMETER_LIST: self._fix_long_parameter_list, CodeSmell.COMPLEX_CONDITIONAL: self._fix_complex_conditional, } def _setup_code_analysis(self): """Setup code analysis tools""" self.analysis_rules = { "max_method_length": 30, "max_class_length": 300, "max_parameters": 5, "max_complexity": 10, "duplicate_threshold": 0.8, # 80% similarity } async def analyze_codebase( self, root_path: str = "/Users/Arief/Desktop/Zenith" ) -> dict[str, Any]: """Comprehensive codebase analysis""" logger.info(f"Starting codebase analysis for: {root_path}") analysis_results = { "files_analyzed": 0, "issues_found": 0, "automated_fixes_available": 0, "technical_debt_estimate": 0, "issues_by_type": {}, "issues_by_severity": {}, } # Find Python files python_files = [] for root, dirs, files in os.walk(root_path): # Skip certain directories dirs[:] = [ d for d in dirs if not d.startswith(".") and d not in ["node_modules", "__pycache__", ".git"] ] for file in files: if file.endswith(".py"): python_files.append(os.path.join(root, file)) logger.info(f"Found {len(python_files)} Python files to analyze") for file_path in python_files[:50]: # Limit for performance try: issues = await self._analyze_file(file_path) analysis_results["files_analyzed"] += 1 for issue in issues: self.code_issues[issue.issue_id] = issue analysis_results["issues_found"] += 1 if issue.automated_fix_available: analysis_results["automated_fixes_available"] += 1 # Categorize issues issue_type = issue.smell_type.value severity = issue.severity.value analysis_results["issues_by_type"][issue_type] = ( analysis_results["issues_by_type"].get(issue_type, 0) + 1 ) analysis_results["issues_by_severity"][severity] = ( analysis_results["issues_by_severity"].get(severity, 0) + 1 ) # Estimate technical debt effort_multiplier = { "quick_fix": 0.5, "refactor": 2, "major_rework": 5, } analysis_results[ "technical_debt_estimate" ] += issue.impact_score * effort_multiplier.get( issue.estimated_effort, 1 ) except Exception as e: logger.error(f"Failed to analyze {file_path}: {e}") # Calculate overall metrics analysis_results["technical_debt_hours"] = analysis_results[ "technical_debt_estimate" ] analysis_results["code_quality_score"] = max( 0, 100 - (analysis_results["issues_found"] * 2) ) logger.info( f"Analysis complete: {analysis_results['issues_found']} issues found in {analysis_results['files_analyzed']} files" ) return analysis_results async def _analyze_file(self, file_path: str) -> list[CodeIssue]: """Analyze a single Python file for code smells""" issues = [] try: with open(file_path, encoding="utf-8") as f: content = f.read() lines = content.split("\n") tree = ast.parse(content, file_path) # Analyze AST for various smells issues.extend(self._detect_long_methods(tree, file_path, lines)) issues.extend(self._detect_large_classes(tree, file_path, lines)) issues.extend(self._detect_long_parameter_lists(tree, file_path, lines)) issues.extend(self._detect_complex_conditionals(tree, file_path, lines)) # Text-based analysis issues.extend(self._detect_duplicate_code(content, file_path, lines)) except SyntaxError: logger.warning(f"Syntax error in {file_path}, skipping AST analysis") except Exception as e: logger.error(f"Error analyzing {file_path}: {e}") return issues def _detect_long_methods( self, tree: ast.AST, file_path: str, lines: list[str] ) -> list[CodeIssue]: """Detect methods that are too long""" issues = [] for node in ast.walk(tree): if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)): method_length = node.end_lineno - node.lineno if method_length > self.analysis_rules["max_method_length"]: severity = ( DebtPriority.HIGH if method_length > 50 else DebtPriority.MEDIUM ) issue = CodeIssue( issue_id=f"long_method_{file_path}_{node.lineno}", file_path=file_path, line_number=node.lineno, smell_type=CodeSmell.LONG_METHOD, severity=severity, description=f"Method '{node.name}' is {method_length} lines long (max recommended: {self.analysis_rules['max_method_length']})", code_snippet="\n".join( lines[node.lineno - 1 : node.lineno + 5] ), estimated_effort="refactor", automated_fix_available=True, impact_score=min(1.0, method_length / 100), identified_at=datetime.now(), ) issues.append(issue) return issues def _detect_large_classes( self, tree: ast.AST, file_path: str, lines: list[str] ) -> list[CodeIssue]: """Detect classes that are too large""" issues = [] for node in ast.walk(tree): if isinstance(node, ast.ClassDef): class_length = node.end_lineno - node.lineno if class_length > self.analysis_rules["max_class_length"]: issue = CodeIssue( issue_id=f"large_class_{file_path}_{node.lineno}", file_path=file_path, line_number=node.lineno, smell_type=CodeSmell.LARGE_CLASS, severity=DebtPriority.HIGH, description=f"Class '{node.name}' is {class_length} lines long (max recommended: {self.analysis_rules['max_class_length']})", code_snippet="\n".join( lines[node.lineno - 1 : node.lineno + 3] ), estimated_effort="major_rework", automated_fix_available=False, impact_score=min(1.0, class_length / 500), identified_at=datetime.now(), ) issues.append(issue) return issues def _detect_long_parameter_lists( self, tree: ast.AST, file_path: str, lines: list[str] ) -> list[CodeIssue]: """Detect functions with too many parameters""" issues = [] for node in ast.walk(tree): if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)): param_count = len(node.args.args) if param_count > self.analysis_rules["max_parameters"]: issue = CodeIssue( issue_id=f"long_params_{file_path}_{node.lineno}", file_path=file_path, line_number=node.lineno, smell_type=CodeSmell.LONG_PARAMETER_LIST, severity=DebtPriority.MEDIUM, description=f"Function '{node.name}' has {param_count} parameters (max recommended: {self.analysis_rules['max_parameters']})", code_snippet="\n".join( lines[node.lineno - 1 : node.lineno + 2] ), estimated_effort="refactor", automated_fix_available=True, impact_score=min(1.0, param_count / 10), identified_at=datetime.now(), ) issues.append(issue) return issues def _detect_complex_conditionals( self, tree: ast.AST, file_path: str, lines: list[str] ) -> list[CodeIssue]: """Detect complex conditional statements""" issues = [] for node in ast.walk(tree): if isinstance(node, ast.If): # Calculate complexity based on nested conditions complexity = self._calculate_conditional_complexity(node) if complexity > self.analysis_rules["max_complexity"]: issue = CodeIssue( issue_id=f"complex_conditional_{file_path}_{node.lineno}", file_path=file_path, line_number=node.lineno, smell_type=CodeSmell.COMPLEX_CONDITIONAL, severity=DebtPriority.MEDIUM, description=f"Complex conditional with complexity score {complexity} (max recommended: {self.analysis_rules['max_complexity']})", code_snippet="\n".join( lines[node.lineno - 1 : node.lineno + 3] ), estimated_effort="refactor", automated_fix_available=True, impact_score=min(1.0, complexity / 20), identified_at=datetime.now(), ) issues.append(issue) return issues def _calculate_conditional_complexity(self, node: ast.If, depth: int = 1) -> int: """Calculate complexity of conditional statement""" complexity = depth # Check for and/or operators if hasattr(node.test, "left"): complexity += 1 # Check nested if statements if node.orelse: for child in node.orelse: if isinstance(child, ast.If): complexity += self._calculate_conditional_complexity( child, depth + 1 ) return complexity def _detect_duplicate_code( self, content: str, file_path: str, lines: list[str] ) -> list[CodeIssue]: """Detect duplicate code blocks""" issues = [] # Simple duplicate detection - check for repeated code blocks code_blocks = [] for i, line in enumerate(lines): if line.strip() and not line.strip().startswith("#"): # Extract code blocks of 3-5 lines if i + 2 < len(lines): block = "\n".join(lines[i : i + 3]).strip() if len(block) > 20: # Minimum block size code_blocks.append((i + 1, block)) # Find duplicates seen_blocks = {} for line_num, block in code_blocks: if block in seen_blocks: # Found duplicate original_line = seen_blocks[block] issue = CodeIssue( issue_id=f"duplicate_code_{file_path}_{line_num}", file_path=file_path, line_number=line_num, smell_type=CodeSmell.DUPLICATE_CODE, severity=DebtPriority.MEDIUM, description=f"Duplicate code block found (original at line {original_line})", code_snippet=block[:100] + "..." if len(block) > 100 else block, estimated_effort="refactor", automated_fix_available=True, impact_score=0.6, identified_at=datetime.now(), ) issues.append(issue) else: seen_blocks[block] = line_num return issues async def apply_automated_fixes(self) -> dict[str, Any]: """Apply all available automated fixes""" results = { "fixes_attempted": 0, "fixes_successful": 0, "fixes_failed": 0, "issues_resolved": [], } for issue in self.code_issues.values(): if issue.automated_fix_available and not self._is_issue_resolved(issue): results["fixes_attempted"] += 1 try: success = await self._apply_fix(issue) if success: results["fixes_successful"] += 1 results["issues_resolved"].append(issue.issue_id) logger.info(f"Successfully fixed issue: {issue.issue_id}") else: results["fixes_failed"] += 1 logger.warning(f"Failed to fix issue: {issue.issue_id}") except Exception as e: results["fixes_failed"] += 1 logger.error(f"Error fixing issue {issue.issue_id}: {e}") return results async def _apply_fix(self, issue: CodeIssue) -> bool: """Apply automated fix for a specific issue""" if issue.smell_type in self.automated_fixes: return await self.automated_fixes[issue.smell_type](issue) return False async def _fix_long_method(self, issue: CodeIssue) -> bool: """Apply automated fix for long method""" # This would require more sophisticated code analysis and transformation # For now, we'll create a refactoring task task = RefactoringTask( task_id=f"refactor_{issue.issue_id}", issue_ids=[issue.issue_id], title=f"Refactor long method: {issue.description}", description="Break down long method into smaller, focused functions", priority=issue.severity, estimated_effort_days=2.0, status="pending", automated=False, ) self.refactoring_tasks[task.task_id] = task return True async def _fix_duplicate_code(self, issue: CodeIssue) -> bool: """Apply automated fix for duplicate code""" # Extract common functionality to a utility function task = RefactoringTask( task_id=f"refactor_{issue.issue_id}", issue_ids=[issue.issue_id], title=f"Extract duplicate code: {issue.description}", description="Create utility function for duplicated code block", priority=issue.severity, estimated_effort_days=1.0, status="pending", automated=False, ) self.refactoring_tasks[task.task_id] = task return True async def _fix_long_parameter_list(self, issue: CodeIssue) -> bool: """Apply automated fix for long parameter list""" # Introduce parameter object pattern task = RefactoringTask( task_id=f"refactor_{issue.issue_id}", issue_ids=[issue.issue_id], title=f"Refactor parameter list: {issue.description}", description="Introduce parameter object to reduce parameter count", priority=issue.severity, estimated_effort_days=1.5, status="pending", automated=False, ) self.refactoring_tasks[task.task_id] = task return True async def _fix_complex_conditional(self, issue: CodeIssue) -> bool: """Apply automated fix for complex conditional""" # Extract method or use strategy pattern task = RefactoringTask( task_id=f"refactor_{issue.issue_id}", issue_ids=[issue.issue_id], title=f"Simplify complex conditional: {issue.description}", description="Extract conditional logic into separate method or use strategy pattern", priority=issue.severity, estimated_effort_days=1.5, status="pending", automated=False, ) self.refactoring_tasks[task.task_id] = task return True def _is_issue_resolved(self, issue: CodeIssue) -> bool: """Check if an issue has been resolved""" # In production, this would check if the issue still exists in the code # For now, assume issues are not resolved return False async def generate_refactoring_plan(self) -> dict[str, Any]: """Generate comprehensive refactoring plan""" plan = { "total_issues": len(self.code_issues), "automated_fixes": len( [i for i in self.code_issues.values() if i.automated_fix_available] ), "refactoring_tasks": len(self.refactoring_tasks), "estimated_effort_days": sum( t.estimated_effort_days for t in self.refactoring_tasks.values() ), "tasks_by_priority": {}, "tasks_by_type": {}, } # Group tasks by priority and type for task in self.refactoring_tasks.values(): priority = task.priority.value plan["tasks_by_priority"][priority] = ( plan["tasks_by_priority"].get(priority, 0) + 1 ) # Determine task type from title if "duplicate" in task.title.lower(): task_type = "duplicate_code" elif "parameter" in task.title.lower(): task_type = "parameter_refactor" elif "conditional" in task.title.lower(): task_type = "conditional_simplify" elif "method" in task.title.lower(): task_type = "method_refactor" else: task_type = "general_refactor" plan["tasks_by_type"][task_type] = ( plan["tasks_by_type"].get(task_type, 0) + 1 ) return plan def get_quality_dashboard(self) -> dict[str, Any]: """Get code quality dashboard""" total_issues = len(self.code_issues) resolved_issues = len( [i for i in self.code_issues.values() if self._is_issue_resolved(i)] ) return { "total_issues": total_issues, "resolved_issues": resolved_issues, "resolution_rate": ( resolved_issues / total_issues if total_issues > 0 else 0 ), "refactoring_tasks": len(self.refactoring_tasks), "issues_by_severity": self._get_issues_by_severity(), "issues_by_type": self._get_issues_by_type(), "estimated_debt_hours": sum( i.impact_score * 8 for i in self.code_issues.values() ), # Rough estimate } def _get_issues_by_severity(self) -> dict[str, int]: """Get issues count by severity""" severities = {} for issue in self.code_issues.values(): severity = issue.severity.value severities[severity] = severities.get(severity, 0) + 1 return severities def _get_issues_by_type(self) -> dict[str, int]: """Get issues count by type""" types = {} for issue in self.code_issues.values(): smell_type = issue.smell_type.value types[smell_type] = types.get(smell_type, 0) + 1 return types # Global instance code_quality_improvement = CodeQualityImprovementService()