Spaces:
Sleeping
Sleeping
| """ | |
| Core security analyzer for Infrastructure as Code files | |
| """ | |
| import re | |
| from typing import List | |
| from models import SecurityVulnerability | |
| from security_rules import get_all_security_rules | |
| class IaCSecurityAnalyzer: | |
| """Main security analyzer class""" | |
| def __init__(self): | |
| self.security_rules = get_all_security_rules() | |
| def analyze_code(self, code: str, iac_type: str) -> List[SecurityVulnerability]: | |
| """Analyze IaC code for security vulnerabilities with improved context awareness""" | |
| vulnerabilities = [] | |
| rules = self.security_rules.get(iac_type, []) | |
| # Remove comments and normalize whitespace | |
| lines = code.split('\n') | |
| clean_lines = [] | |
| for line in lines: | |
| # Remove inline comments but preserve the line structure | |
| if '#' in line: | |
| line_without_comment = line.split('#')[0].rstrip() | |
| else: | |
| line_without_comment = line | |
| clean_lines.append(line_without_comment) | |
| # Join lines to analyze blocks properly | |
| clean_code = '\n'.join(clean_lines) | |
| for rule in rules: | |
| vulnerabilities.extend( | |
| self._analyze_rule(rule, clean_code, lines, clean_lines, iac_type) | |
| ) | |
| # Remove duplicates based on rule_id and line_number | |
| seen = set() | |
| unique_vulnerabilities = [] | |
| for vuln in vulnerabilities: | |
| key = (vuln.rule_id, vuln.line_number) | |
| if key not in seen: | |
| seen.add(key) | |
| unique_vulnerabilities.append(vuln) | |
| # Sort by severity and CVSS score | |
| severity_order = {'critical': 4, 'high': 3, 'medium': 2, 'low': 1} | |
| unique_vulnerabilities.sort( | |
| key=lambda x: (severity_order.get(x.severity, 0), x.cvss_score or 0), | |
| reverse=True | |
| ) | |
| return unique_vulnerabilities | |
| def _analyze_rule(self, rule, clean_code: str, lines: List[str], | |
| clean_lines: List[str], iac_type: str) -> List[SecurityVulnerability]: | |
| """Analyze a specific security rule against the code""" | |
| vulnerabilities = [] | |
| if rule.rule_id == "TF002": # Special handling for EBS encryption | |
| vulnerabilities.extend( | |
| self._check_ebs_encryption(rule, clean_code, lines) | |
| ) | |
| elif rule.rule_id == "TF003": # Special handling for security groups | |
| vulnerabilities.extend( | |
| self._check_security_groups(rule, clean_code, lines) | |
| ) | |
| elif rule.rule_id == "TF004": # Special handling for RDS encryption | |
| vulnerabilities.extend( | |
| self._check_rds_encryption(rule, clean_code, lines) | |
| ) | |
| else: # Default pattern matching for other rules | |
| vulnerabilities.extend( | |
| self._check_default_patterns(rule, lines, clean_lines, iac_type) | |
| ) | |
| return vulnerabilities | |
| def _check_ebs_encryption(self, rule, clean_code: str, lines: List[str]) -> List[SecurityVulnerability]: | |
| """Check for EBS encryption issues""" | |
| vulnerabilities = [] | |
| # Check for resource blocks that don't have encryption enabled | |
| ebs_volume_pattern = r'resource\s+["\']aws_ebs_volume["\'][^{]*{([^{}]*(?:{[^{}]*}[^{}]*)*)[^{}]*}' | |
| ebs_instance_pattern = r'ebs_block_device\s*{([^{}]*(?:{[^{}]*}[^{}]*)*)[^{}]*}' | |
| for match_pattern, resource_type in [(ebs_volume_pattern, "aws_ebs_volume"), | |
| (ebs_instance_pattern, "ebs_block_device")]: | |
| matches = re.finditer(match_pattern, clean_code, re.DOTALL | re.IGNORECASE) | |
| for match in matches: | |
| block_content = match.group(1) | |
| if not re.search(r'encrypted\s*=\s*true', block_content, re.IGNORECASE): | |
| line_start = clean_code[:match.start()].count('\n') + 1 | |
| context_line = lines[line_start - 1] if line_start <= len(lines) else "" | |
| vulnerability = SecurityVulnerability( | |
| title=rule.title, | |
| severity=rule.severity, | |
| description=rule.description, | |
| line_number=line_start, | |
| context=context_line.strip(), | |
| recommendation=rule.recommendation, | |
| fix_example=rule.fix_example, | |
| rule_id=rule.rule_id, | |
| category=rule.category, | |
| cwe_id=rule.cwe_id, | |
| cvss_score=rule.cvss_score | |
| ) | |
| vulnerabilities.append(vulnerability) | |
| return vulnerabilities | |
| def _check_security_groups(self, rule, clean_code: str, lines: List[str]) -> List[SecurityVulnerability]: | |
| """Check for security group issues""" | |
| vulnerabilities = [] | |
| # Only flag ingress rules with 0.0.0.0/0 | |
| ingress_pattern = r'ingress\s*{([^{}]*(?:{[^{}]*}[^{}]*)*)[^{}]*}' | |
| matches = re.finditer(ingress_pattern, clean_code, re.DOTALL | re.IGNORECASE) | |
| for match in matches: | |
| ingress_block = match.group(1) | |
| if re.search(r'cidr_blocks\s*=\s*\[[^]]*["\']0\.0\.0\.0/0["\']', ingress_block, re.IGNORECASE): | |
| line_start = clean_code[:match.start()].count('\n') + 1 | |
| context_line = lines[line_start - 1] if line_start <= len(lines) else "" | |
| vulnerability = SecurityVulnerability( | |
| title=rule.title, | |
| severity=rule.severity, | |
| description=rule.description, | |
| line_number=line_start, | |
| context=context_line.strip(), | |
| recommendation=rule.recommendation, | |
| fix_example=rule.fix_example, | |
| rule_id=rule.rule_id, | |
| category=rule.category, | |
| cwe_id=rule.cwe_id, | |
| cvss_score=rule.cvss_score | |
| ) | |
| vulnerabilities.append(vulnerability) | |
| return vulnerabilities | |
| def _check_rds_encryption(self, rule, clean_code: str, lines: List[str]) -> List[SecurityVulnerability]: | |
| """Check for RDS encryption issues""" | |
| vulnerabilities = [] | |
| rds_pattern = r'resource\s+["\']aws_db_instance["\'][^{]*{([^{}]*(?:{[^{}]*}[^{}]*)*)[^{}]*}' | |
| matches = re.finditer(rds_pattern, clean_code, re.DOTALL | re.IGNORECASE) | |
| for match in matches: | |
| rds_block = match.group(1) | |
| if not re.search(r'storage_encrypted\s*=\s*true', rds_block, re.IGNORECASE): | |
| line_start = clean_code[:match.start()].count('\n') + 1 | |
| context_line = lines[line_start - 1] if line_start <= len(lines) else "" | |
| vulnerability = SecurityVulnerability( | |
| title=rule.title, | |
| severity=rule.severity, | |
| description=rule.description, | |
| line_number=line_start, | |
| context=context_line.strip(), | |
| recommendation=rule.recommendation, | |
| fix_example=rule.fix_example, | |
| rule_id=rule.rule_id, | |
| category=rule.category, | |
| cwe_id=rule.cwe_id, | |
| cvss_score=rule.cvss_score | |
| ) | |
| vulnerabilities.append(vulnerability) | |
| return vulnerabilities | |
| def _check_default_patterns(self, rule, lines: List[str], clean_lines: List[str], | |
| iac_type: str) -> List[SecurityVulnerability]: | |
| """Check default pattern matching for rules""" | |
| vulnerabilities = [] | |
| for pattern in rule.patterns: | |
| for line_num, line in enumerate(lines, 1): | |
| clean_line = clean_lines[line_num - 1] | |
| if clean_line.strip() and re.search(pattern, clean_line, re.IGNORECASE): | |
| vulnerability = SecurityVulnerability( | |
| title=rule.title, | |
| severity=rule.severity, | |
| description=rule.description, | |
| line_number=line_num, | |
| context=line.strip(), | |
| recommendation=rule.recommendation, | |
| fix_example=rule.fix_example, | |
| rule_id=rule.rule_id, | |
| category=rule.category, | |
| cwe_id=rule.cwe_id, | |
| cvss_score=rule.cvss_score | |
| ) | |
| vulnerabilities.append(vulnerability) | |
| return vulnerabilities |