|
|
|
|
|
import os |
|
|
import subprocess |
|
|
import sys |
|
|
import json |
|
|
import time |
|
|
import socket |
|
|
from concurrent.futures import ThreadPoolExecutor |
|
|
|
|
|
|
|
|
# ANSI escape sequences used to colour console output.
# RESET switches the terminal back to its default attributes and is
# appended after every coloured message printed by this script.
CYAN = "\033[96m"
GREEN = "\033[92m"
RED = "\033[91m"
YELLOW = "\033[93m"
RESET = "\033[0m"
|
|
|
|
|
def run_command(command, cwd=None, capture=True):
    """Run a shell command and return its standard output.

    Args:
        command: Command line handed to the shell (``shell=True``).
        cwd: Working directory for the child process, or ``None`` to
            inherit the current one.
        capture: When true, stdout/stderr are captured and the stripped
            stdout is returned. When false, the child writes straight to
            the terminal.

    Returns:
        The stripped stdout when ``capture`` is true, ``""`` when it is
        false, or ``None`` if the command exited with a non-zero status
        (an error line is printed in that case).
    """
    # Only attach pipes when the caller wants the output back.
    stream = subprocess.PIPE if capture else None
    try:
        result = subprocess.run(
            command,
            cwd=cwd,
            shell=True,
            check=True,
            stdout=stream,
            stderr=stream,
            text=True,
        )
    except subprocess.CalledProcessError as e:
        # e.stderr is None when the output was not captured; fall back to
        # the exit code so the message never reads "None".
        detail = (e.stderr or "").strip() or f"exit code {e.returncode}"
        print(f"{RED}Error running '{command}': {detail}{RESET}")
        return None

    return result.stdout.strip() if capture else ""
|
|
|
|
|
def audit_code_quality():
    """Print Radon's cyclomatic-complexity and maintainability reports.

    Runs ``radon cc`` and ``radon mi`` against the backend application
    package through ``run_command`` and echoes whatever they print. When
    the complexity report contains an "Average complexity:" line, the
    score at the end of that line is printed separately.
    """
    print(f"\n{CYAN}[1/4] Running Code Complexity Audit (Radon)...{RESET}")

    # The commands below run with cwd="backend", so a path relative to the
    # caller's working directory would be resolved against the wrong
    # folder (backend/backend/app). Resolve it to an absolute path first.
    backend_dir = os.path.abspath("backend/app")

    print(f"  {YELLOW}Checking Cyclomatic Complexity (CC)...{RESET}")
    cc_report = run_command(
        f'..\\.venv\\Scripts\\radon cc "{backend_dir}" -a -s', cwd="backend"
    )

    if cc_report:
        print(cc_report)

        # Take the last whitespace-separated token of the summary line
        # and drop the surrounding parentheses to get the bare score.
        average_line = next(
            (
                line
                for line in cc_report.splitlines()
                if "Average complexity:" in line
            ),
            None,
        )
        if average_line is not None:
            score = average_line.split()[-1].strip("()")
            print(f"  {GREEN}Average Complexity Score: {score}{RESET}")

    print(f"  {YELLOW}Checking Maintainability Index (MI)...{RESET}")
    mi_report = run_command(
        f'..\\.venv\\Scripts\\radon mi "{backend_dir}"', cwd="backend"
    )
    if mi_report:
        print(mi_report)
|
|
|
|
|
def audit_security():
    """Run Bandit over ``backend/app`` and print its findings.

    Anything Bandit writes to stdout is echoed followed by a reminder to
    review it. A non-zero exit status with nothing on stdout is reported
    as a failed run rather than as a clean scan. A zero exit status with
    no output prints the "no issues" message.
    """
    print(f"\n{CYAN}[2/4] Running Security Audit (Bandit)...{RESET}")
    print(f"  {YELLOW}Scanning for vulnerabilities...{RESET}")

    # The template is wrapped in double quotes: cmd.exe does not treat
    # single quotes as quoting characters, so a single-quoted template
    # would be split on its spaces into separate arguments.
    # NOTE(review): the executable path is relative to the directory the
    # script is launched from (no cwd is set here) - confirm it matches
    # the intended launch directory.
    cmd = (
        "..\\.venv\\Scripts\\bandit -r backend/app -ll -f custom "
        '--msg-template "{abspath}:{line}: {test_id}: {severity}: {msg}"'
    )

    try:
        result = subprocess.run(
            cmd,
            shell=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )
    except (OSError, ValueError) as e:
        # OSError: the shell itself could not be started.
        # ValueError: covers UnicodeDecodeError while decoding the output.
        print(f"{RED}Bandit failed: {e}{RESET}")
        return

    output = result.stdout.strip()
    if output:
        print(output)
        print(f"  {YELLOW}Review above security warnings.{RESET}")
    elif result.returncode != 0:
        # With shell=True a missing or crashing executable does not raise;
        # the shell just exits non-zero with an empty stdout. Do not
        # mistake that for a clean scan.
        detail = result.stderr.strip() or f"exit code {result.returncode}"
        print(f"{RED}Bandit failed: {detail}{RESET}")
    else:
        print(f"  {GREEN}No medium/high severity issues found.{RESET}")
|
|
|
|
|
def audit_performance_frontend():
    """Run a Lighthouse CI collection against the local Streamlit app.

    The audit is skipped with a hint when nothing accepts connections on
    localhost:8501. Otherwise a globally installed ``lhci`` is used when
    ``lhci --version`` succeeds, and ``npx @lhci/cli`` is tried as a
    fallback. Success is reported only when the collection command exits
    with status zero.
    """
    print(f"\n{CYAN}[3/4] Running Frontend Lighthouse Audit...{RESET}")

    # Probe the port first; the context manager closes the socket even if
    # the connection attempt raises.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        app_running = sock.connect_ex(("localhost", 8501)) == 0

    if not app_running:
        print(f"{RED}Streamlit app not running on localhost:8501. Skipping Lighthouse.{RESET}")
        print(f"{YELLOW}Tip: Run 'streamlit run frontend/Home.py' in a separate terminal.{RESET}")
        return

    print(f"  {YELLOW}Running LHCI (Desktop)...{RESET}")

    # Same arguments for both launchers so the fallback also uses the
    # desktop preset announced above.
    lhci_args = (
        "collect --url=http://localhost:8501 --numberOfRuns=1 "
        "--settings.preset=desktop"
    )

    try:
        # With shell=True a missing executable shows up as a non-zero exit
        # status, not as an exception, so test the return code.
        probe = subprocess.run(
            "lhci --version",
            shell=True,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
        if probe.returncode == 0:
            completed = subprocess.run(f"lhci {lhci_args}", shell=True)
            success = "Lighthouse audit complete. Check .lighthouseci/ for report."
        else:
            print(f"  {YELLOW}Global 'lhci' not found. Trying via npx...{RESET}")
            completed = subprocess.run(f"npx @lhci/cli {lhci_args}", shell=True)
            success = "Lighthouse audit complete."
    except OSError as e:
        # Raised only when the shell itself cannot be started.
        print(f"{RED}Lighthouse failed: {e}{RESET}")
        return

    if completed.returncode == 0:
        print(f"  {GREEN}{success}{RESET}")
    else:
        print(f"{RED}Lighthouse failed: exit code {completed.returncode}{RESET}")
|
|
|
|
|
def audit_performance_backend():
    """Run a short headless Locust load test against the local backend.

    A minimal locustfile hitting ``/health`` and ``/docs`` is written to
    ``backend/tests/locustfile.py`` when that file does not exist yet.
    The test (50 users, spawn rate 10, 10 seconds) is skipped when
    nothing accepts connections on localhost:8000.
    """
    print(f"\n{CYAN}[4/4] Running Backend Load Test (Locust)...{RESET}")

    locust_file = "backend/tests/locustfile.py"
    if not os.path.exists(locust_file):
        print(f"  {YELLOW}Creating temporary locustfile...{RESET}")
        # open() does not create missing parent directories.
        os.makedirs(os.path.dirname(locust_file), exist_ok=True)
        locust_source = "\n".join(
            [
                "",
                "from locust import HttpUser, task, between",
                "",
                "class APIUser(HttpUser):",
                "    wait_time = between(1, 3)",
                "",
                "    @task(3)",
                "    def health_check(self):",
                '        self.client.get("/health")',
                "",
                "    @task(1)",
                "    def api_docs(self):",
                '        self.client.get("/docs")',
                "",
            ]
        )
        with open(locust_file, "w", encoding="utf-8") as f:
            f.write(locust_source)

    print(f"  {YELLOW}Simulating 50 users for 10 seconds...{RESET}")

    # Locust is launched with cwd="backend", so the path checked above
    # (relative to the caller's working directory) must be made absolute
    # or it would be resolved as backend/backend/tests/locustfile.py.
    locust_path = os.path.abspath(locust_file)
    cmd = (
        f'..\\.venv\\Scripts\\locust -f "{locust_path}" '
        "--headless -u 50 -r 10 --run-time 10s --host http://localhost:8000"
    )

    # The context manager closes the probe socket even if connecting raises.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        backend_running = sock.connect_ex(("localhost", 8000)) == 0

    if not backend_running:
        print(f"{RED}Backend API not running on localhost:8000. Skipping Load Test.{RESET}")
        return

    # Let Locust stream its statistics directly to the terminal.
    run_command(cmd, cwd="backend", capture=False)
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: run every audit in sequence between a start and
    # an end banner. The backend load test runs before the frontend
    # Lighthouse audit.
    print(f"{GREEN}=== Starting VoiceForge Deep System Audit ==={RESET}")

    audit_steps = (
        audit_code_quality,
        audit_security,
        audit_performance_backend,
        audit_performance_frontend,
    )
    for run_audit in audit_steps:
        run_audit()

    print(f"\n{GREEN}=== Audit Complete ==={RESET}")
|
|
|