Secure_IaC / src /analyzer.py
Waseem-k's picture
Upload 9 files
f4ff0e8 verified
Raw
History Blame Contribute Delete
9.11 kB
"""
Core security analyzer for Infrastructure as Code files
"""
import re
from typing import List
from models import SecurityVulnerability
from security_rules import get_all_security_rules
class IaCSecurityAnalyzer:
"""Main security analyzer class"""
def __init__(self):
self.security_rules = get_all_security_rules()
def analyze_code(self, code: str, iac_type: str) -> List[SecurityVulnerability]:
"""Analyze IaC code for security vulnerabilities with improved context awareness"""
vulnerabilities = []
rules = self.security_rules.get(iac_type, [])
# Remove comments and normalize whitespace
lines = code.split('\n')
clean_lines = []
for line in lines:
# Remove inline comments but preserve the line structure
if '#' in line:
line_without_comment = line.split('#')[0].rstrip()
else:
line_without_comment = line
clean_lines.append(line_without_comment)
# Join lines to analyze blocks properly
clean_code = '\n'.join(clean_lines)
for rule in rules:
vulnerabilities.extend(
self._analyze_rule(rule, clean_code, lines, clean_lines, iac_type)
)
# Remove duplicates based on rule_id and line_number
seen = set()
unique_vulnerabilities = []
for vuln in vulnerabilities:
key = (vuln.rule_id, vuln.line_number)
if key not in seen:
seen.add(key)
unique_vulnerabilities.append(vuln)
# Sort by severity and CVSS score
severity_order = {'critical': 4, 'high': 3, 'medium': 2, 'low': 1}
unique_vulnerabilities.sort(
key=lambda x: (severity_order.get(x.severity, 0), x.cvss_score or 0),
reverse=True
)
return unique_vulnerabilities
def _analyze_rule(self, rule, clean_code: str, lines: List[str],
clean_lines: List[str], iac_type: str) -> List[SecurityVulnerability]:
"""Analyze a specific security rule against the code"""
vulnerabilities = []
if rule.rule_id == "TF002": # Special handling for EBS encryption
vulnerabilities.extend(
self._check_ebs_encryption(rule, clean_code, lines)
)
elif rule.rule_id == "TF003": # Special handling for security groups
vulnerabilities.extend(
self._check_security_groups(rule, clean_code, lines)
)
elif rule.rule_id == "TF004": # Special handling for RDS encryption
vulnerabilities.extend(
self._check_rds_encryption(rule, clean_code, lines)
)
else: # Default pattern matching for other rules
vulnerabilities.extend(
self._check_default_patterns(rule, lines, clean_lines, iac_type)
)
return vulnerabilities
def _check_ebs_encryption(self, rule, clean_code: str, lines: List[str]) -> List[SecurityVulnerability]:
"""Check for EBS encryption issues"""
vulnerabilities = []
# Check for resource blocks that don't have encryption enabled
ebs_volume_pattern = r'resource\s+["\']aws_ebs_volume["\'][^{]*{([^{}]*(?:{[^{}]*}[^{}]*)*)[^{}]*}'
ebs_instance_pattern = r'ebs_block_device\s*{([^{}]*(?:{[^{}]*}[^{}]*)*)[^{}]*}'
for match_pattern, resource_type in [(ebs_volume_pattern, "aws_ebs_volume"),
(ebs_instance_pattern, "ebs_block_device")]:
matches = re.finditer(match_pattern, clean_code, re.DOTALL | re.IGNORECASE)
for match in matches:
block_content = match.group(1)
if not re.search(r'encrypted\s*=\s*true', block_content, re.IGNORECASE):
line_start = clean_code[:match.start()].count('\n') + 1
context_line = lines[line_start - 1] if line_start <= len(lines) else ""
vulnerability = SecurityVulnerability(
title=rule.title,
severity=rule.severity,
description=rule.description,
line_number=line_start,
context=context_line.strip(),
recommendation=rule.recommendation,
fix_example=rule.fix_example,
rule_id=rule.rule_id,
category=rule.category,
cwe_id=rule.cwe_id,
cvss_score=rule.cvss_score
)
vulnerabilities.append(vulnerability)
return vulnerabilities
def _check_security_groups(self, rule, clean_code: str, lines: List[str]) -> List[SecurityVulnerability]:
"""Check for security group issues"""
vulnerabilities = []
# Only flag ingress rules with 0.0.0.0/0
ingress_pattern = r'ingress\s*{([^{}]*(?:{[^{}]*}[^{}]*)*)[^{}]*}'
matches = re.finditer(ingress_pattern, clean_code, re.DOTALL | re.IGNORECASE)
for match in matches:
ingress_block = match.group(1)
if re.search(r'cidr_blocks\s*=\s*\[[^]]*["\']0\.0\.0\.0/0["\']', ingress_block, re.IGNORECASE):
line_start = clean_code[:match.start()].count('\n') + 1
context_line = lines[line_start - 1] if line_start <= len(lines) else ""
vulnerability = SecurityVulnerability(
title=rule.title,
severity=rule.severity,
description=rule.description,
line_number=line_start,
context=context_line.strip(),
recommendation=rule.recommendation,
fix_example=rule.fix_example,
rule_id=rule.rule_id,
category=rule.category,
cwe_id=rule.cwe_id,
cvss_score=rule.cvss_score
)
vulnerabilities.append(vulnerability)
return vulnerabilities
def _check_rds_encryption(self, rule, clean_code: str, lines: List[str]) -> List[SecurityVulnerability]:
"""Check for RDS encryption issues"""
vulnerabilities = []
rds_pattern = r'resource\s+["\']aws_db_instance["\'][^{]*{([^{}]*(?:{[^{}]*}[^{}]*)*)[^{}]*}'
matches = re.finditer(rds_pattern, clean_code, re.DOTALL | re.IGNORECASE)
for match in matches:
rds_block = match.group(1)
if not re.search(r'storage_encrypted\s*=\s*true', rds_block, re.IGNORECASE):
line_start = clean_code[:match.start()].count('\n') + 1
context_line = lines[line_start - 1] if line_start <= len(lines) else ""
vulnerability = SecurityVulnerability(
title=rule.title,
severity=rule.severity,
description=rule.description,
line_number=line_start,
context=context_line.strip(),
recommendation=rule.recommendation,
fix_example=rule.fix_example,
rule_id=rule.rule_id,
category=rule.category,
cwe_id=rule.cwe_id,
cvss_score=rule.cvss_score
)
vulnerabilities.append(vulnerability)
return vulnerabilities
def _check_default_patterns(self, rule, lines: List[str], clean_lines: List[str],
iac_type: str) -> List[SecurityVulnerability]:
"""Check default pattern matching for rules"""
vulnerabilities = []
for pattern in rule.patterns:
for line_num, line in enumerate(lines, 1):
clean_line = clean_lines[line_num - 1]
if clean_line.strip() and re.search(pattern, clean_line, re.IGNORECASE):
vulnerability = SecurityVulnerability(
title=rule.title,
severity=rule.severity,
description=rule.description,
line_number=line_num,
context=line.strip(),
recommendation=rule.recommendation,
fix_example=rule.fix_example,
rule_id=rule.rule_id,
category=rule.category,
cwe_id=rule.cwe_id,
cvss_score=rule.cvss_score
)
vulnerabilities.append(vulnerability)
return vulnerabilities