Auto-Guardian-Core / scripts /aggregate_results.py
AbdulElahGwaith's picture
Upload folder using huggingface_hub
7270c96 verified
# استخراج نتائج الفحص وتجميعها في ملف JSON واحد
import json
import os
from datetime import datetime
from pathlib import Path
def load_json_file(filepath, default=None):
"""تحميل ملف JSON مع معالجة الأخطاء"""
if os.path.exists(filepath):
try:
with open(filepath, 'r', encoding='utf-8') as f:
return json.load(f)
except (json.JSONDecodeError, IOError):
return default if default is not None else []
return default if default is not None else []
def normalize_bandit_result(issue):
"""تطبيع نتيجة Bandit"""
return {
"tool": "bandit",
"tool_name": "Bandit",
"severity": issue.get("issue_severity", "MEDIUM").upper(),
"severity_level": get_severity_level(issue.get("issue_severity", "MEDIUM")),
"file": issue.get("filename", ""),
"line": issue.get("line_number", 0),
"column": issue.get("line_range", [0, 0])[0] if issue.get("line_range") else 0,
"rule_id": issue.get("test_id", ""),
"rule_name": issue.get("test_name", ""),
"message": issue.get("issue_text", ""),
"description": issue.get("issue_text", ""),
"confidence": issue.get("issue_confidence", "MEDIUM").upper(),
"remediation": issue.get("more_info", ""),
"code_snippet": "",
"language": "python"
}
def normalize_pylint_result(issue):
"""تطبيع نتيجة Pylint"""
return {
"tool": "pylint",
"tool_name": "Pylint",
"severity": map_pylint_severity(issue.get("type", "info")),
"severity_level": get_severity_level(map_pylint_severity(issue.get("type", "info"))),
"file": issue.get("path", ""),
"line": issue.get("line", 0),
"column": issue.get("column", 0),
"rule_id": issue.get("symbol", ""),
"rule_name": issue.get("message-id", ""),
"message": issue.get("message", ""),
"description": issue.get("message", ""),
"confidence": "HIGH",
"remediation": "",
"code_snippet": issue.get("body", ""),
"language": "python"
}
def normalize_eslint_result(issue):
"""تطبيع نتيجة ESLint"""
return {
"tool": "eslint",
"tool_name": "ESLint",
"severity": map_eslint_severity(issue.get("severity", 1)),
"severity_level": get_severity_level(map_eslint_severity(issue.get("severity", 1))),
"file": issue.get("filePath", ""),
"line": issue.get("line", 0),
"column": issue.get("column", 0),
"rule_id": issue.get("ruleId", ""),
"rule_name": issue.get("ruleId", ""),
"message": issue.get("message", ""),
"description": issue.get("message", ""),
"confidence": "HIGH",
"remediation": "",
"code_snippet": issue.get("source", ""),
"language": "javascript"
}
def normalize_gosec_result(issue):
"""تطبيع نتيجة Gosec"""
return {
"tool": "gosec",
"tool_name": "Gosec",
"severity": map_gosec_severity(issue.get("severity", "MEDIUM")),
"severity_level": get_severity_level(map_gosec_severity(issue.get("severity", "MEDIUM"))),
"file": issue.get("file", ""),
"line": issue.get("line", 0),
"column": 0,
"rule_id": issue.get("rule_id", ""),
"rule_name": issue.get("func_name", ""),
"message": issue.get("details", ""),
"description": issue.get("details", ""),
"confidence": "HIGH",
"remediation": "",
"code_snippet": "",
"language": "go"
}
def normalize_cargo_audit_result(vuln):
"""تطبيع نتيجة Cargo Audit"""
severity_map = {
"high": "CRITICAL",
"medium": "HIGH",
"low": "MEDIUM",
"unknown": "LOW"
}
return {
"tool": "cargo-audit",
"tool_name": "Cargo Audit",
"severity": severity_map.get(vuln.get("severity", "unknown"), "MEDIUM"),
"severity_level": get_severity_level(severity_map.get(vuln.get("severity", "unknown"), "MEDIUM")),
"file": vuln.get("package", {}).get("name", "unknown"),
"line": 0,
"column": 0,
"rule_id": vuln.get("id", ""),
"rule_name": vuln.get("advisory_id", ""),
"message": vuln.get("title", ""),
"description": vuln.get("description", ""),
"confidence": "HIGH",
"remediation": vuln.get("url", ""),
"code_snippet": "",
"language": "rust"
}
def map_pylint_severity(pylint_type):
"""تحويل نوع Pylint إلى شدة موحدة"""
severity_map = {
"error": "CRITICAL",
"warning": "HIGH",
"refactor": "MEDIUM",
"convention": "LOW",
"info": "INFO"
}
return severity_map.get(pylint_type.lower(), "MEDIUM")
def map_eslint_severity(eslint_severity):
"""تحويل شدة ESLint إلى شدة موحدة"""
severity_map = {
2: "CRITICAL", # Error
1: "HIGH", # Warning
0: "INFO" # Info
}
return severity_map.get(eslint_severity, "MEDIUM")
def map_gosec_severity(gosec_severity):
"""تحويل شدة Gosec إلى شدة موحدة"""
severity_map = {
"HIGH": "CRITICAL",
"MEDIUM": "HIGH",
"LOW": "MEDIUM",
"UNKNOWN": "LOW"
}
return severity_map.get(gosec_severity.upper(), "MEDIUM")
def get_severity_level(severity):
"""الحصول على مستوى الشدة للفرز"""
level_map = {
"CRITICAL": 1,
"HIGH": 2,
"MEDIUM": 3,
"LOW": 4,
"INFO": 5
}
return level_map.get(severity.upper(), 3)
def get_git_info():
"""استخراج معلومات Git"""
commit_sha = os.environ.get("GITHUB_SHA", "")[:7]
branch = os.environ.get("GITHUB_REF", "").replace("refs/heads/", "")
return {
"commit_sha": commit_sha,
"branch": branch,
"workflow": "auto-guardian-scan"
}
def detect_languages():
"""اكتشاف اللغات الموجودة في المشروع"""
languages = []
extensions = {
".py": "python",
".js": "javascript",
".ts": "typescript",
".jsx": "javascript",
".tsx": "typescript",
".go": "go",
".rs": "rust",
".java": "java",
".cpp": "cpp",
".c": "c"
}
for ext, lang in extensions.items():
if any(Path(".").rglob(f"*{ext}")):
if lang not in languages:
languages.append(lang)
return languages
def count_lines_by_language():
"""عدم الأسطر حسب اللغة"""
counts = {}
extensions = {
".py": "python",
".js": "javascript",
".ts": "typescript",
".go": "go",
".rs": "rust"
}
for ext, lang in extensions.items():
files = list(Path(".").rglob(f"*{ext}"))
total_lines = 0
for f in files:
try:
with open(f, 'r', encoding='utf-8', errors='ignore') as file:
total_lines += sum(1 for _ in file)
except:
pass
if total_lines > 0:
counts[lang] = total_lines
return counts
def main():
"""الدالة الرئيسية"""
print("=" * 50)
print("Auto-Guardian: تجميع نتائج الفحص")
print("=" * 50)
# استخراج معلومات Git
git_info = get_git_info()
print(f"Commit: {git_info['commit_sha']}")
print(f"Branch: {git_info['branch']}")
# اكتشاف اللغات
languages = detect_languages()
print(f"Languages detected: {', '.join(languages)}")
# تجميع جميع النتائج
all_findings = []
summary_by_tool = {}
findings_count = 0
# Bandit Results
print("\n[1/5] Processing Bandit results...")
bandit_results = load_json_file("bandit_results.json")
if isinstance(bandit_results, list):
for result in bandit_results:
if "results" in result:
for issue in result["results"]:
finding = normalize_bandit_result(issue)
all_findings.append(finding)
findings_count += 1
summary_by_tool["bandit"] = {"issues": findings_count, "status": "completed"}
else:
summary_by_tool["bandit"] = {"issues": 0, "status": "no_data"}
print(f" Found {findings_count} Bandit issues")
# Pylint Results
print("\n[2/5] Processing Pylint results...")
pylint_results = load_json_file("pylint_results.json")
pylint_count = 0
if isinstance(pylint_results, list):
for issue in pylint_results:
if isinstance(issue, dict):
finding = normalize_pylint_result(issue)
all_findings.append(finding)
pylint_count += 1
summary_by_tool["pylint"] = {"issues": pylint_count, "status": "completed"}
else:
summary_by_tool["pylint"] = {"issues": 0, "status": "no_data"}
print(f" Found {pylint_count} Pylint issues")
# ESLint Results
print("\n[3/5] Processing ESLint results...")
eslint_results = load_json_file("eslint_results.json")
eslint_count = 0
if isinstance(eslint_results, list):
for result in eslint_results:
if "messages" in result:
for issue in result["messages"]:
if isinstance(issue, dict):
finding = normalize_eslint_result(issue)
all_findings.append(finding)
eslint_count += 1
summary_by_tool["eslint"] = {"issues": eslint_count, "status": "completed"}
else:
summary_by_tool["eslint"] = {"issues": 0, "status": "no_data"}
print(f" Found {eslint_count} ESLint issues")
# Gosec Results
print("\n[4/5] Processing Gosec results...")
gosec_results = load_json_file("gosec_results.json")
gosec_count = 0
if isinstance(gosec_results, dict) and "Issues" in gosec_results:
for issue in gosec_results["Issues"]:
finding = normalize_gosec_result(issue)
all_findings.append(finding)
gosec_count += 1
summary_by_tool["gosec"] = {"issues": gosec_count, "status": "completed"}
else:
summary_by_tool["gosec"] = {"issues": 0, "status": "no_data"}
print(f" Found {gosec_count} Gosec issues")
# Cargo Audit Results
print("\n[5/5] Processing Cargo Audit results...")
cargo_results = load_json_file("cargo_audit_results.json")
cargo_count = 0
if isinstance(cargo_results, dict) and "vulnerabilities" in cargo_results:
for vuln in cargo_results["vulnerabilities"]:
finding = normalize_cargo_audit_result(vuln)
all_findings.append(finding)
cargo_count += 1
summary_by_tool["cargo-audit"] = {"issues": cargo_count, "status": "completed"}
else:
summary_by_tool["cargo-audit"] = {"issues": 0, "status": "no_data"}
print(f" Found {cargo_count} Cargo Audit issues")
# حساب الإحصائيات
severity_counts = {
"CRITICAL": 0,
"HIGH": 0,
"MEDIUM": 0,
"LOW": 0,
"INFO": 0
}
for finding in all_findings:
severity = finding.get("severity", "MEDIUM")
if severity in severity_counts:
severity_counts[severity] += 1
# حساب النتيجة
total_findings = len(all_findings)
critical_penalty = severity_counts["CRITICAL"] * 10
high_penalty = severity_counts["HIGH"] * 5
medium_penalty = severity_counts["MEDIUM"] * 2
base_score = 100
security_score = max(0, base_score - critical_penalty - high_penalty - medium_penalty)
# عدد الملفات الممسوحة ضوئياً
file_count = 0
files_set = set()
for finding in all_findings:
file_path = finding.get("file", "")
if file_path and file_path not in files_set:
files_set.add(file_path)
file_count = len(files_set)
# إنشاء التقرير النهائي
scan_results = {
"metadata": {
"timestamp": datetime.utcnow().isoformat() + "Z",
"version": "1.0.0",
"tool": "auto-guardian",
"git": git_info
},
"summary": {
"total_issues": total_findings,
"critical_issues": severity_counts["CRITICAL"],
"high_issues": severity_counts["HIGH"],
"medium_issues": severity_counts["MEDIUM"],
"low_issues": severity_counts["LOW"],
"info_issues": severity_counts["INFO"],
"files_scanned": file_count,
"security_score": security_score,
"grade": get_grade(security_score)
},
"languages": {
"detected": languages,
"lines_of_code": count_lines_by_language(),
"tools_used": summary_by_tool
},
"findings": sorted(all_findings, key=lambda x: x.get("severity_level", 99)),
"trends": {
"previous_scan": None,
"improvement": None
}
}
# حفظ النتائج
output_file = "scan_results.json"
with open(output_file, "w", encoding="utf-8") as f:
json.dump(scan_results, f, indent=2, ensure_ascii=False)
print("\n" + "=" * 50)
print("ملخص الفحص:")
print("=" * 50)
print(f" إجمالي المشاكل: {total_findings}")
print(f" مشاكل حرجة: {severity_counts['CRITICAL']}")
print(f" مشاكل عالية: {severity_counts['HIGH']}")
print(f" مشاكل متوسطة: {severity_counts['MEDIUM']}")
print(f" مشاكل منخفضة: {severity_counts['LOW']}")
print(f" الملفات الممسوحة: {file_count}")
print(f" نقاط الأمان: {security_score}/100 (الدرجة: {get_grade(security_score)})")
print("=" * 50)
print(f"✅ النتائج saved to: {output_file}")
print("=" * 50)
def get_grade(score):
"""الحصول على التقدير من النقاط"""
if score >= 90:
return "A"
elif score >= 80:
return "B"
elif score >= 70:
return "C"
elif score >= 60:
return "D"
else:
return "F"
if __name__ == "__main__":
main()