Spaces:
Paused
Paused
| #!/usr/bin/env python3 | |
| """ | |
| Security Audit for Authentication Flow | |
| Analyzes authentication implementation for security vulnerabilities | |
| """ | |
| import json | |
| import os | |
| import re | |
| from datetime import datetime | |
| from typing import Any, Dict, List | |
class AuthSecurityAuditor:
    """Heuristic, regex-based security auditor for authentication source files.

    Every finding is a dict with the keys ``risk``, ``title``, ``file``,
    ``description`` and ``timestamp``. Findings are kept in discovery order in
    ``self.findings`` and are also grouped by severity in ``self.risk_levels``.
    """

    def __init__(self):
        # All findings, in the order they were recorded.
        self.findings: List[Dict[str, Any]] = []
        # The same findings bucketed by severity; the keys are the only
        # risk names accepted by _add_finding().
        self.risk_levels: Dict[str, List[Dict[str, Any]]] = {
            "critical": [],
            "high": [],
            "medium": [],
            "low": [],
        }

    def audit_file(self, file_path: str, content: str) -> None:
        """Audit one file, choosing the rule set from its name and extension.

        Args:
            file_path: Path of the file; used for rule selection and reporting.
            content: Full text of the file.

        Files matching neither rule below are ignored silently.
        """
        filename = os.path.basename(file_path).lower()

        # NOTE: the backend rule is tested first, so a ``.ts`` file whose name
        # contains "auth" (e.g. ``auth.ts``) receives the backend checks.
        if "auth" in filename and file_path.endswith((".py", ".ts")):
            self._audit_backend_auth(file_path, content)
        elif "client" in filename and file_path.endswith((".ts", ".tsx")):
            self._audit_frontend_client(file_path, content)

    def _audit_backend_auth(self, file_path: str, content: str) -> None:
        """Run the backend checks against ``content``.

        Each check is a whole-file keyword/regex heuristic: a trigger keyword
        must be present and a mitigating keyword must be absent for a finding
        to be recorded.
        """
        lowered = content.lower()  # computed once instead of per check

        def mentions(*words: str) -> bool:
            """Return True if any of ``words`` occurs in the lower-cased file."""
            return any(word in lowered for word in words)

        def matches(pattern: str) -> bool:
            """Return True if ``pattern`` matches anywhere, ignoring case."""
            return re.search(pattern, content, re.IGNORECASE) is not None

        # Hardcoded secrets: a quoted literal assigned to a password/secret/key
        # name, unless the file mentions a hashing scheme anywhere.
        if matches(r"(password|secret|key).*=\s*['\"][^'\"]*['\"]") and not mentions(
            "hash", "bcrypt", "argon2", "pbkdf2"
        ):
            self._add_finding(
                "high",
                "Potential hardcoded credentials",
                file_path,
                "Found what appears to be hardcoded password/secret/key",
            )

        # Weak password policies
        if "password" in lowered and not matches(r"min.*length|strength|complexity"):
            self._add_finding(
                "medium", "Weak password policy", file_path, "No apparent password strength requirements found"
            )

        # Proper hashing
        if "hash" in lowered and not mentions("argon2", "bcrypt", "pbkdf2", "scrypt"):
            self._add_finding(
                "high",
                "Weak password hashing",
                file_path,
                "No secure password hashing algorithm (Argon2, bcrypt, PBKDF2, scrypt) detected",
            )

        # Token expiration
        if "token" in lowered and not matches(r"exp|expire|timeout"):
            self._add_finding(
                "medium", "Missing token expiration", file_path, "JWT tokens should have expiration times"
            )

        # MFA implementation
        if mentions("mfa", "2fa") and not matches(r"totp|otp|pyotp"):
            self._add_finding(
                "medium",
                "Incomplete MFA implementation",
                file_path,
                "MFA mentioned but no TOTP/OTP implementation found",
            )

        # Account lockout
        if "login" in lowered and not matches(r"lock|attempt|brute"):
            self._add_finding(
                "medium",
                "No account lockout protection",
                file_path,
                "No apparent brute force protection via account lockout",
            )

        # Secure cookie settings
        if "cookie" in lowered and not matches(r"httponly|secure|samesite"):
            self._add_finding(
                "high", "Insecure cookie settings", file_path, "Cookies should be HttpOnly, Secure, and SameSite"
            )

        # Error handling: the "except" trigger is case-sensitive on purpose
        # (it is tested against the raw content, not the lower-cased copy).
        if (
            "except" in content
            and matches(r"password.*except|auth.*except")
            and not matches(r"log|security|alert")
        ):
            self._add_finding(
                "medium",
                "Insufficient auth error handling",
                file_path,
                "Authentication errors should be logged for security monitoring",
            )

    def _audit_frontend_client(self, file_path: str, content: str) -> None:
        """Run the frontend (client-side) checks against ``content``."""

        def matches(pattern: str) -> bool:
            """Return True if ``pattern`` matches anywhere, ignoring case."""
            return re.search(pattern, content, re.IGNORECASE) is not None

        # Token storage security ("localStorage" is matched case-sensitively).
        if "localStorage" in content and matches(r"token|auth"):
            self._add_finding(
                "high",
                "Insecure token storage",
                file_path,
                "Authentication tokens stored in localStorage are vulnerable to XSS",
            )

        # Token handling
        if "token" in content.lower() and not matches(r"refresh|renew|expire"):
            self._add_finding("medium", "Missing token refresh", file_path, "No apparent token refresh mechanism")

        # Credential exposure. The alternation must be grouped: without the
        # group, "password" or "secret" anywhere in the file matched on its
        # own and every such file was reported as critical.
        if matches(r"(?:password|secret|key).*=.*['\"]"):
            self._add_finding(
                "critical",
                "Credentials in client code",
                file_path,
                "Client-side code should not contain credentials or secrets",
            )

    def _add_finding(self, risk: str, title: str, file: str, description: str) -> None:
        """Record a finding under the given risk level.

        Args:
            risk: One of the keys of ``self.risk_levels``.
            title: Short finding title (also used to derive recommendations).
            file: Path of the audited file.
            description: Human-readable explanation.

        Raises:
            KeyError: If ``risk`` is not a known risk level. Nothing is
                recorded in that case.
        """
        # Resolve the bucket first so an unknown risk cannot leave
        # self.findings and self.risk_levels out of sync.
        bucket = self.risk_levels[risk]
        finding = {
            "risk": risk,
            "title": title,
            "file": file,
            "description": description,
            "timestamp": datetime.now().isoformat(),
        }
        self.findings.append(finding)
        bucket.append(finding)

    def generate_report(self) -> Dict[str, Any]:
        """Return the audit report as a JSON-serialisable dict.

        The dict holds the report timestamp, the total number of findings,
        a per-risk count, the findings themselves and the recommendations.
        """
        return {
            "audit_timestamp": datetime.now().isoformat(),
            "total_findings": len(self.findings),
            "findings_by_risk": {risk: len(findings) for risk, findings in self.risk_levels.items()},
            "findings": self.findings,
            "recommendations": self._generate_recommendations(),
        }

    def _generate_recommendations(self) -> List[str]:
        """Derive recommendation strings from the recorded findings.

        Only some finding titles map to a recommendation; when none apply a
        single fallback message is returned instead.
        """
        high_titles = [f["title"] for f in self.risk_levels["high"]]
        medium_titles = [f["title"] for f in self.risk_levels["medium"]]

        recommendations = []
        if self.risk_levels["critical"]:
            recommendations.append(
                "CRITICAL: Address all critical findings immediately - these pose immediate security risks"
            )
        if "Insecure token storage" in high_titles:
            recommendations.append("HIGH: Move authentication tokens from localStorage to HttpOnly cookies")
        if any("password hashing" in title for title in high_titles):
            recommendations.append("HIGH: Implement secure password hashing (Argon2 recommended)")
        if any("MFA" in title for title in medium_titles):
            recommendations.append("MEDIUM: Implement proper MFA using TOTP standard")
        if any("lockout" in title for title in medium_titles):
            recommendations.append("MEDIUM: Implement account lockout protection against brute force attacks")
        if not recommendations:
            recommendations.append("β No critical security issues found in authentication flow")
        return recommendations
def _audit_paths(auditor, paths, prefix=""):
    """Read each path and feed it to ``auditor``, printing one status line per file.

    Args:
        auditor: Object exposing ``audit_file(file_path, content)``.
        paths: Paths as they should appear in the status output.
        prefix: String prepended to each path to form the path actually opened.

    Missing or unreadable files are reported and skipped; they never abort
    the run.
    """
    for file_path in paths:
        full_path = f"{prefix}{file_path}"
        try:
            with open(full_path, "r", encoding="utf-8") as f:
                content = f.read()
        except FileNotFoundError:
            print(f"β οΈ File not found: {file_path}")
            continue
        except (OSError, UnicodeDecodeError) as e:
            print(f"β Failed to audit {file_path}: {e}")
            continue
        auditor.audit_file(full_path, content)
        print(f"β Audited: {file_path}")


def main():
    """Audit the known auth files, print a report, save it as JSON and exit.

    The process exits with status 1 when any critical or high finding was
    recorded and with status 0 otherwise.
    """
    # NOTE(review): the glyph prefixes in the console strings below look
    # mis-encoded (probably emoji); they are kept verbatim - confirm and
    # restore the intended characters.
    auditor = AuthSecurityAuditor()

    # Backend auth files, relative to the current working directory.
    _audit_paths(
        auditor,
        [
            "app/modules/auth/service.py",
            "app/modules/auth/router.py",
            "app/services/infrastructure/auth_service.py",
        ],
    )

    # Frontend auth files are looked up under two locations: a sibling
    # ``../frontend`` directory and a ``frontend/`` subdirectory.
    _audit_paths(
        auditor,
        [
            "../frontend/src/shared/api/client.ts",
            "../frontend/src/shared/api/interceptors.ts",
            "../frontend/src/features/auth/services/auth.ts",
        ],
    )
    _audit_paths(
        auditor,
        [
            "src/shared/api/client.ts",
            "src/shared/api/interceptors.ts",
            "src/features/auth/services/auth.ts",
        ],
        prefix="frontend/",
    )

    # Generate report
    report = auditor.generate_report()
    by_risk = report["findings_by_risk"]

    print("\n" + "=" * 80)
    print("π AUTHENTICATION SECURITY AUDIT REPORT")
    print("=" * 80)
    print("\nπ SUMMARY:")
    print(f"Total findings: {report['total_findings']}")
    print(f"Critical: {by_risk['critical']}")
    print(f"High: {by_risk['high']}")
    print(f"Medium: {by_risk['medium']}")
    print(f"Low: {by_risk['low']}")

    print("\nπ§ RECOMMENDATIONS:")
    for rec in report["recommendations"]:
        print(f"β’ {rec}")

    if report["findings"]:
        print("\nπ¨ DETAILED FINDINGS:")
        for finding in report["findings"]:
            print(f"\n[{finding['risk'].upper()}] {finding['title']}")
            print(f"File: {finding['file']}")
            print(f"Description: {finding['description']}")

    # Save detailed report
    report_file = "auth_security_audit_report.json"
    with open(report_file, "w", encoding="utf-8") as f:
        json.dump(report, f, indent=2)
    print(f"\nπ Detailed report saved to: {report_file}")

    # Exit non-zero when critical or high findings exist. SystemExit is raised
    # directly because the ``exit`` builtin is injected by the ``site`` module
    # and is not guaranteed to exist (e.g. under ``python -S``).
    if by_risk["critical"] > 0:
        print("\nβ CRITICAL SECURITY ISSUES FOUND!")
        raise SystemExit(1)
    if by_risk["high"] > 0:
        print("\nβ οΈ HIGH PRIORITY SECURITY ISSUES FOUND!")
        raise SystemExit(1)
    print("\nβ AUTHENTICATION SECURITY AUDIT PASSED!")
    raise SystemExit(0)
# Run the audit only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()