""" 🔍 Code Duplication Detector Find copy-pasted code blocks and suggest refactoring """ from typing import Dict, List, Any, Set, Tuple import re import hashlib from difflib import SequenceMatcher import logging logger = logging.getLogger(__name__) class DuplicationDetector: """Detect code duplication and suggest DRY refactoring""" def __init__(self, min_lines: int = 5, similarity_threshold: float = 0.85): """ Args: min_lines: Minimum number of lines to consider as duplication similarity_threshold: Similarity ratio (0-1) to flag as duplicate """ self.min_lines = min_lines self.similarity_threshold = similarity_threshold @staticmethod def normalize_code(code: str) -> str: """Normalize code for comparison (remove comments, whitespace)""" # Remove single-line comments code = re.sub(r'//.*$', '', code, flags=re.MULTILINE) code = re.sub(r'#.*$', '', code, flags=re.MULTILINE) # Remove multi-line comments code = re.sub(r'/\*.*?\*/', '', code, flags=re.DOTALL) code = re.sub(r'""".*?"""', '', code, flags=re.DOTALL) code = re.sub(r"'''.*?'''", '', code, flags=re.DOTALL) # Normalize whitespace code = re.sub(r'\s+', ' ', code) return code.strip() @staticmethod def get_code_blocks(code: str, min_lines: int) -> List[Dict[str, Any]]: """Extract code blocks using efficient sliding window (fixed-size only)""" lines = code.splitlines() blocks = [] # Only use sliding windows of specific sizes to avoid O(n^4) explosion # Check blocks of min_lines, min_lines*2, and min_lines*3 window_sizes = [min_lines, min_lines * 2, min_lines * 3] for window_size in window_sizes: if window_size > len(lines): continue for start in range(len(lines) - window_size + 1): end = start + window_size block_lines = lines[start:end] block_code = '\n'.join(block_lines) # Skip if mostly whitespace or comments if len(block_code.strip()) < min_lines * 5: continue normalized = DuplicationDetector.normalize_code(block_code) blocks.append({ "start_line": start + 1, "end_line": end, "code": block_code, "normalized": normalized, "hash": hashlib.md5(normalized.encode()).hexdigest() }) return blocks @staticmethod def calculate_similarity(code1: str, code2: str) -> float: """Calculate similarity ratio between two code blocks""" return SequenceMatcher(None, code1, code2).ratio() @classmethod def find_duplicates(cls, code: str, min_lines: int = 5, similarity_threshold: float = 0.85) -> List[Dict[str, Any]]: """Find duplicate code blocks using efficient hash bucketing""" blocks = cls.get_code_blocks(code, min_lines) duplicates = [] seen_pairs: Set[Tuple[int, int]] = set() # Group blocks by hash for faster comparison hash_buckets: Dict[str, List[int]] = {} for i, block in enumerate(blocks): hash_key = block["hash"] if hash_key not in hash_buckets: hash_buckets[hash_key] = [] hash_buckets[hash_key].append(i) # Only compare blocks with similar hashes or within same bucket for i, block1 in enumerate(blocks): # Check exact hash matches first (fastest) for j in hash_buckets.get(block1["hash"], []): if j <= i: continue block2 = blocks[j] # Skip if we've already seen this pair pair = (i, j) if pair in seen_pairs: continue # Check if blocks overlap if not (block1["end_line"] < block2["start_line"] or block2["end_line"] < block1["start_line"]): continue # Exact hash match = 100% similar seen_pairs.add(pair) duplicates.append({ "block1": { "start": block1["start_line"], "end": block1["end_line"], "code": block1["code"] }, "block2": { "start": block2["start_line"], "end": block2["end_line"], "code": block2["code"] }, "similarity": 100.0, "lines": block1["end_line"] - block1["start_line"], "suggestion": cls.generate_refactor_suggestion(block1, block2) }) # Sort by severity (longer duplicates first) duplicates.sort(key=lambda x: x["lines"], reverse=True) return duplicates @staticmethod def generate_refactor_suggestion(block1: Dict, block2: Dict) -> str: """Generate refactoring suggestion""" lines = block1["end_line"] - block1["start_line"] if lines > 20: return "Extract to a separate module or class" elif lines > 10: return "Extract to a reusable function" else: return "Extract to a helper function" @classmethod def analyze_duplication(cls, code: str) -> Dict[str, Any]: """Full duplication analysis with fixed line counting""" duplicates = cls.find_duplicates(code) total_lines = len(code.splitlines()) # Count unique duplicated lines (avoid double counting) duplicated_line_set = set() for dup in duplicates: # Add lines from both blocks block1_lines = range(dup["block1"]["start"], dup["block1"]["end"]) block2_lines = range(dup["block2"]["start"], dup["block2"]["end"]) duplicated_line_set.update(block1_lines) duplicated_line_set.update(block2_lines) duplicated_lines = len(duplicated_line_set) return { "duplicates_found": len(duplicates), "duplicated_lines": duplicated_lines, "total_lines": total_lines, "duplication_percentage": round(duplicated_lines / total_lines * 100, 1) if total_lines > 0 else 0, "duplicates": duplicates, "statistics": { "total_lines": total_lines, "duplicated_lines": duplicated_lines, "duplication_percentage": f"{round(duplicated_lines / total_lines * 100, 1) if total_lines > 0 else 0}%" }, "severity": cls.assess_severity(len(duplicates), duplicated_lines, total_lines) } @staticmethod def assess_severity(count: int, dup_lines: int, total_lines: int) -> str: """Assess duplication severity""" percentage = (dup_lines / total_lines * 100) if total_lines > 0 else 0 if percentage > 30 or count > 10: return "critical" elif percentage > 15 or count > 5: return "high" elif percentage > 5 or count > 2: return "medium" else: return "low" def format_duplication_report(analysis: Dict[str, Any]) -> str: """Format duplication analysis into readable report""" count = analysis.get("duplicates_found", 0) dup_lines = analysis.get("duplicated_lines", 0) total = analysis.get("total_lines", 0) percentage = analysis.get("duplication_percentage", 0) severity = analysis.get("severity", "low") severity_emoji = { "critical": "🔴", "high": "🟠", "medium": "🟡", "low": "🟢" } report = f""" # 🔍 Code Duplication Report ## 📊 Summary {severity_emoji.get(severity, '⚪')} **Severity**: {severity.title()} - **Duplicates Found**: {count} - **Duplicated Lines**: {dup_lines} / {total} ({percentage}%) """ if count == 0: report += "✅ **No significant code duplication detected!**\n" return report duplicates = analysis.get("duplicates", []) report += "## 🔄 Duplicate Blocks\n\n" for i, dup in enumerate(duplicates[:5], 1): # Show top 5 lines = dup.get("lines") similarity = dup.get("similarity") block1 = dup.get("block1", {}) block2 = dup.get("block2", {}) suggestion = dup.get("suggestion") report += f"### {i}. Duplicate Block ({lines} lines, {similarity}% similar)\n\n" report += f"**Location 1**: Lines {block1.get('start')}-{block1.get('end')}\n" report += f"**Location 2**: Lines {block2.get('start')}-{block2.get('end')}\n\n" report += f"💡 **Suggestion**: {suggestion}\n\n" # Show first few lines code_preview = block1.get('code', '').splitlines()[:3] report += "**Preview**:\n```\n" report += '\n'.join(code_preview) report += "\n...\n```\n\n" report += "\n## 🎯 Recommendations\n" report += "- Apply DRY (Don't Repeat Yourself) principle\n" report += "- Extract common logic to reusable functions\n" report += "- Consider using design patterns (Template, Strategy, etc.)\n" return report