# File size: 6,344 Bytes
# d00203b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174

import os
import subprocess
import sys
import json
import time
import socket
from concurrent.futures import ThreadPoolExecutor

# ANSI escape sequences used to colour terminal output.
# RESET restores the terminal's default attributes and must follow every
# coloured span.
RESET = "\033[0m"
RED = "\033[91m"      # errors / skipped stages
GREEN = "\033[92m"    # success messages and banners
YELLOW = "\033[93m"   # progress notes and warnings
CYAN = "\033[96m"     # stage headers

def run_command(command, cwd=None, capture=True):
    """Run a shell command and return its stdout.

    Args:
        command: Command line string; it is executed through the shell.
        cwd: Working directory for the command, or None to inherit the
            current one.
        capture: When True, stdout/stderr are captured and the stripped
            stdout is returned. When False, the command writes straight to
            the terminal and an empty string is returned.

    Returns:
        The stripped stdout (capture=True), "" (capture=False), or None if
        the command exited with a non-zero status or could not be started.
    """
    # Without capture the child simply inherits our stdout/stderr.
    pipe = subprocess.PIPE if capture else None
    try:
        result = subprocess.run(
            command,
            cwd=cwd,
            shell=True,
            check=True,
            stdout=pipe,
            stderr=pipe,
            text=True,
        )
    except subprocess.CalledProcessError as e:
        # e.stderr is None when output was not captured (or may be empty),
        # so fall back to the exit code instead of printing "None".
        detail = (e.stderr or "").strip() or f"exit code {e.returncode}"
        print(f"{RED}Error running '{command}': {detail}{RESET}")
        return None
    except OSError as e:
        # Raised before the command even starts, e.g. when cwd does not exist.
        print(f"{RED}Error running '{command}': {e}{RESET}")
        return None

    return result.stdout.strip() if capture else ""

def audit_code_quality():
    """Run Radon's cyclomatic-complexity and maintainability-index reports.

    Both reports are printed as-is. If the complexity report contains an
    "Average complexity:" line, the numeric score is echoed separately:
    green when it is below the target of 10, yellow otherwise.
    """
    print(f"\n{CYAN}[1/4] Running Code Complexity Audit (Radon)...{RESET}")

    # The radon executable is addressed relative to cwd="backend" below, but
    # the code directory is given relative to the directory the script was
    # started from. Resolve it to an absolute path so it does not get
    # re-interpreted relative to "backend" (i.e. backend/backend/app).
    backend_dir = os.path.abspath("backend/app")
    radon = "..\\.venv\\Scripts\\radon"
    complexity_target = 10

    # Cyclomatic Complexity
    print(f"  {YELLOW}Checking Cyclomatic Complexity (CC)...{RESET}")
    # -a: print the average complexity, -s: show the numeric score per block
    cc_report = run_command(f'{radon} cc "{backend_dir}" -a -s', cwd="backend")

    if cc_report:
        print(cc_report)
        avg_line = next(
            (line for line in cc_report.splitlines() if "Average complexity:" in line),
            None,
        )
        if avg_line:
            # The score is the last token of the line, wrapped in parentheses.
            score = avg_line.split()[-1].strip("()")
            try:
                within_target = float(score) < complexity_target
            except ValueError:
                # Unexpected format: report the raw token without judging it.
                within_target = True
            color = GREEN if within_target else YELLOW
            print(f"  {color}Average Complexity Score: {score}{RESET}")

    # Maintainability Index
    print(f"  {YELLOW}Checking Maintainability Index (MI)...{RESET}")
    mi_report = run_command(f'{radon} mi "{backend_dir}"', cwd="backend")
    if mi_report:
        print(mi_report)

def audit_security():
    """Scan backend/app with Bandit and print any reported findings.

    Bandit is invoked directly rather than through run_command because it
    exits with status 1 when it finds issues, which run_command would treat
    as an error. A run that produces no findings but did not exit cleanly is
    reported as a failure instead of a clean result.
    """
    print(f"\n{CYAN}[2/4] Running Security Audit (Bandit)...{RESET}")
    print(f"  {YELLOW}Scanning for vulnerabilities...{RESET}")

    # -r: recurse into the directory, -ll: report medium severity and above.
    # The template is wrapped in double quotes: cmd.exe does not treat single
    # quotes as quoting characters and would split the template on spaces.
    # NOTE(review): the executable path is relative to the current working
    # directory here, whereas the other audits use it with cwd="backend" -
    # confirm where .venv actually lives.
    cmd = (
        "..\\.venv\\Scripts\\bandit -r backend/app -ll -f custom "
        '--msg-template "{abspath}:{line}: {test_id}: {severity}: {msg}"'
    )

    try:
        result = subprocess.run(
            cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
        )
    except (OSError, ValueError, subprocess.SubprocessError) as e:
        print(f"{RED}Bandit failed: {e}{RESET}")
        return

    output = result.stdout.strip()
    if output:
        print(output)
        print(f"  {YELLOW}Review above security warnings.{RESET}")
    elif result.returncode == 0:
        print(f"  {GREEN}No medium/high severity issues found.{RESET}")
    else:
        # Nothing on stdout and a non-zero status: the scan itself did not
        # run (missing executable, bad arguments, ...). Do not report it as
        # a clean result.
        detail = result.stderr.strip() or f"exit code {result.returncode}"
        print(f"{RED}Bandit failed: {detail}{RESET}")

def audit_performance_frontend():
    """Run a Lighthouse CI collection against the app on localhost:8501.

    The audit is skipped when nothing accepts connections on port 8501.
    A globally installed ``lhci`` is preferred; if ``lhci --version`` does
    not succeed, the same collection is attempted through ``npx``. Success
    is only reported when the collect command exits with status 0.
    """
    print(f"\n{CYAN}[3/4] Running Frontend Lighthouse Audit...{RESET}")

    # Check if app is running on localhost:8501. The context manager closes
    # the socket even if the connection attempt raises.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        app_down = sock.connect_ex(('localhost', 8501)) != 0

    if app_down:
        print(f"{RED}Streamlit app not running on localhost:8501. Skipping Lighthouse.{RESET}")
        print(f"{YELLOW}Tip: Run 'streamlit run frontend/Home.py' in a separate terminal.{RESET}")
        return

    print(f"  {YELLOW}Running LHCI (Desktop)...{RESET}")

    # Settings are passed on the command line to avoid creating a config file.
    # The same arguments are used for both launchers so the npx fallback also
    # runs with the desktop preset announced above.
    collect_args = "collect --url=http://localhost:8501 --numberOfRuns=1 --settings.preset=desktop"

    # lhci may be missing from PATH even after `npm install -g`; probe for it
    # and fall back to npx when the probe does not succeed.
    try:
        probe = subprocess.run(
            "lhci --version",
            shell=True,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
        has_global_lhci = probe.returncode == 0
    except OSError:
        has_global_lhci = False

    if has_global_lhci:
        cmd = f"lhci {collect_args}"
        success_msg = "Lighthouse audit complete. Check .lighthouseci/ for report."
    else:
        print(f"  {YELLOW}Global 'lhci' not found. Trying via npx...{RESET}")
        cmd = f"npx @lhci/cli {collect_args}"
        success_msg = "Lighthouse audit complete."

    try:
        result = subprocess.run(cmd, shell=True)
    except OSError as e:
        print(f"{RED}Lighthouse failed: {e}{RESET}")
        return

    if result.returncode != 0:
        print(f"{RED}Lighthouse failed: exit code {result.returncode}{RESET}")
        return

    print(f"  {GREEN}{success_msg}{RESET}")

def audit_performance_backend():
    """Run a short headless Locust load test against localhost:8000.

    The test is skipped when nothing accepts connections on port 8000.
    If backend/tests/locustfile.py does not exist, a minimal one that hits
    /health and /docs is written first (it is not removed afterwards).
    Locust then simulates 50 users, spawned at 10 per second, for 10 seconds.
    """
    print(f"\n{CYAN}[4/4] Running Backend Load Test (Locust)...{RESET}")

    # Probe the API before doing anything else so that a skipped run neither
    # writes files nor announces a simulation that never happens.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        backend_down = sock.connect_ex(('localhost', 8000)) != 0

    if backend_down:
        print(f"{RED}Backend API not running on localhost:8000. Skipping Load Test.{RESET}")
        return

    # Create a simple locustfile if not exists
    locust_file = "backend/tests/locustfile.py"
    if not os.path.exists(locust_file):
        print(f"  {YELLOW}Creating temporary locustfile...{RESET}")
        locust_source = (
            "from locust import HttpUser, task, between\n"
            "\n"
            "class APIUser(HttpUser):\n"
            "    wait_time = between(1, 3)\n"
            "\n"
            "    @task(3)\n"
            "    def health_check(self):\n"
            '        self.client.get("/health")\n'
            "\n"
            "    @task(1)\n"
            "    def api_docs(self):\n"
            '        self.client.get("/docs")\n'
        )
        # The tests directory may not exist yet.
        os.makedirs(os.path.dirname(locust_file), exist_ok=True)
        with open(locust_file, "w", encoding="utf-8") as f:
            f.write(locust_source)

    print(f"  {YELLOW}Simulating 50 users for 10 seconds...{RESET}")

    # Locust runs with cwd="backend" (the executable path is relative to it),
    # so hand it the absolute path of the file created above; the relative
    # path would otherwise be looked up under backend/backend/tests.
    locust_path = os.path.abspath(locust_file)
    cmd = (
        f'..\\.venv\\Scripts\\locust -f "{locust_path}" '
        "--headless -u 50 -r 10 --run-time 10s --host http://localhost:8000"
    )

    run_command(cmd, cwd="backend", capture=False)

def main():
    """Run all four audit stages in the order of their [n/4] banners."""
    print(f"{GREEN}=== Starting VoiceForge Deep System Audit ==={RESET}")
    audit_code_quality()
    audit_security()

    # The performance audits probe their target port first and skip
    # themselves when the app is not running, so they are always attempted.
    audit_performance_frontend()
    audit_performance_backend()

    print(f"\n{GREEN}=== Audit Complete ==={RESET}")


if __name__ == "__main__":
    main()