Codelint-MCP / src /utils /duplication_detector.py
OsamaAliMid's picture
Add CodeLint MCP Premium Edition application
ec37394
"""
πŸ” Code Duplication Detector
Find copy-pasted code blocks and suggest refactoring
"""
from typing import Dict, List, Any, Set, Tuple
import re
import hashlib
from difflib import SequenceMatcher
import logging
logger = logging.getLogger(__name__)
class DuplicationDetector:
"""Detect code duplication and suggest DRY refactoring"""
def __init__(self, min_lines: int = 5, similarity_threshold: float = 0.85):
"""
Args:
min_lines: Minimum number of lines to consider as duplication
similarity_threshold: Similarity ratio (0-1) to flag as duplicate
"""
self.min_lines = min_lines
self.similarity_threshold = similarity_threshold
@staticmethod
def normalize_code(code: str) -> str:
"""Normalize code for comparison (remove comments, whitespace)"""
# Remove single-line comments
code = re.sub(r'//.*$', '', code, flags=re.MULTILINE)
code = re.sub(r'#.*$', '', code, flags=re.MULTILINE)
# Remove multi-line comments
code = re.sub(r'/\*.*?\*/', '', code, flags=re.DOTALL)
code = re.sub(r'""".*?"""', '', code, flags=re.DOTALL)
code = re.sub(r"'''.*?'''", '', code, flags=re.DOTALL)
# Normalize whitespace
code = re.sub(r'\s+', ' ', code)
return code.strip()
@staticmethod
def get_code_blocks(code: str, min_lines: int) -> List[Dict[str, Any]]:
"""Extract code blocks using efficient sliding window (fixed-size only)"""
lines = code.splitlines()
blocks = []
# Only use sliding windows of specific sizes to avoid O(n^4) explosion
# Check blocks of min_lines, min_lines*2, and min_lines*3
window_sizes = [min_lines, min_lines * 2, min_lines * 3]
for window_size in window_sizes:
if window_size > len(lines):
continue
for start in range(len(lines) - window_size + 1):
end = start + window_size
block_lines = lines[start:end]
block_code = '\n'.join(block_lines)
# Skip if mostly whitespace or comments
if len(block_code.strip()) < min_lines * 5:
continue
normalized = DuplicationDetector.normalize_code(block_code)
blocks.append({
"start_line": start + 1,
"end_line": end,
"code": block_code,
"normalized": normalized,
"hash": hashlib.md5(normalized.encode()).hexdigest()
})
return blocks
@staticmethod
def calculate_similarity(code1: str, code2: str) -> float:
"""Calculate similarity ratio between two code blocks"""
return SequenceMatcher(None, code1, code2).ratio()
@classmethod
def find_duplicates(cls, code: str, min_lines: int = 5,
similarity_threshold: float = 0.85) -> List[Dict[str, Any]]:
"""Find duplicate code blocks using efficient hash bucketing"""
blocks = cls.get_code_blocks(code, min_lines)
duplicates = []
seen_pairs: Set[Tuple[int, int]] = set()
# Group blocks by hash for faster comparison
hash_buckets: Dict[str, List[int]] = {}
for i, block in enumerate(blocks):
hash_key = block["hash"]
if hash_key not in hash_buckets:
hash_buckets[hash_key] = []
hash_buckets[hash_key].append(i)
# Only compare blocks with similar hashes or within same bucket
for i, block1 in enumerate(blocks):
# Check exact hash matches first (fastest)
for j in hash_buckets.get(block1["hash"], []):
if j <= i:
continue
block2 = blocks[j]
# Skip if we've already seen this pair
pair = (i, j)
if pair in seen_pairs:
continue
# Check if blocks overlap
if not (block1["end_line"] < block2["start_line"] or
block2["end_line"] < block1["start_line"]):
continue
# Exact hash match = 100% similar
seen_pairs.add(pair)
duplicates.append({
"block1": {
"start": block1["start_line"],
"end": block1["end_line"],
"code": block1["code"]
},
"block2": {
"start": block2["start_line"],
"end": block2["end_line"],
"code": block2["code"]
},
"similarity": 100.0,
"lines": block1["end_line"] - block1["start_line"],
"suggestion": cls.generate_refactor_suggestion(block1, block2)
})
# Sort by severity (longer duplicates first)
duplicates.sort(key=lambda x: x["lines"], reverse=True)
return duplicates
@staticmethod
def generate_refactor_suggestion(block1: Dict, block2: Dict) -> str:
"""Generate refactoring suggestion"""
lines = block1["end_line"] - block1["start_line"]
if lines > 20:
return "Extract to a separate module or class"
elif lines > 10:
return "Extract to a reusable function"
else:
return "Extract to a helper function"
@classmethod
def analyze_duplication(cls, code: str) -> Dict[str, Any]:
"""Full duplication analysis with fixed line counting"""
duplicates = cls.find_duplicates(code)
total_lines = len(code.splitlines())
# Count unique duplicated lines (avoid double counting)
duplicated_line_set = set()
for dup in duplicates:
# Add lines from both blocks
block1_lines = range(dup["block1"]["start"], dup["block1"]["end"])
block2_lines = range(dup["block2"]["start"], dup["block2"]["end"])
duplicated_line_set.update(block1_lines)
duplicated_line_set.update(block2_lines)
duplicated_lines = len(duplicated_line_set)
return {
"duplicates_found": len(duplicates),
"duplicated_lines": duplicated_lines,
"total_lines": total_lines,
"duplication_percentage": round(duplicated_lines / total_lines * 100, 1) if total_lines > 0 else 0,
"duplicates": duplicates,
"statistics": {
"total_lines": total_lines,
"duplicated_lines": duplicated_lines,
"duplication_percentage": f"{round(duplicated_lines / total_lines * 100, 1) if total_lines > 0 else 0}%"
},
"severity": cls.assess_severity(len(duplicates), duplicated_lines, total_lines)
}
@staticmethod
def assess_severity(count: int, dup_lines: int, total_lines: int) -> str:
"""Assess duplication severity"""
percentage = (dup_lines / total_lines * 100) if total_lines > 0 else 0
if percentage > 30 or count > 10:
return "critical"
elif percentage > 15 or count > 5:
return "high"
elif percentage > 5 or count > 2:
return "medium"
else:
return "low"
def format_duplication_report(analysis: Dict[str, Any]) -> str:
"""Format duplication analysis into readable report"""
count = analysis.get("duplicates_found", 0)
dup_lines = analysis.get("duplicated_lines", 0)
total = analysis.get("total_lines", 0)
percentage = analysis.get("duplication_percentage", 0)
severity = analysis.get("severity", "low")
severity_emoji = {
"critical": "πŸ”΄",
"high": "🟠",
"medium": "🟑",
"low": "🟒"
}
report = f"""
# πŸ” Code Duplication Report
## πŸ“Š Summary
{severity_emoji.get(severity, 'βšͺ')} **Severity**: {severity.title()}
- **Duplicates Found**: {count}
- **Duplicated Lines**: {dup_lines} / {total} ({percentage}%)
"""
if count == 0:
report += "βœ… **No significant code duplication detected!**\n"
return report
duplicates = analysis.get("duplicates", [])
report += "## πŸ”„ Duplicate Blocks\n\n"
for i, dup in enumerate(duplicates[:5], 1): # Show top 5
lines = dup.get("lines")
similarity = dup.get("similarity")
block1 = dup.get("block1", {})
block2 = dup.get("block2", {})
suggestion = dup.get("suggestion")
report += f"### {i}. Duplicate Block ({lines} lines, {similarity}% similar)\n\n"
report += f"**Location 1**: Lines {block1.get('start')}-{block1.get('end')}\n"
report += f"**Location 2**: Lines {block2.get('start')}-{block2.get('end')}\n\n"
report += f"πŸ’‘ **Suggestion**: {suggestion}\n\n"
# Show first few lines
code_preview = block1.get('code', '').splitlines()[:3]
report += "**Preview**:\n```\n"
report += '\n'.join(code_preview)
report += "\n...\n```\n\n"
report += "\n## 🎯 Recommendations\n"
report += "- Apply DRY (Don't Repeat Yourself) principle\n"
report += "- Extract common logic to reusable functions\n"
report += "- Consider using design patterns (Template, Strategy, etc.)\n"
return report