File size: 7,835 Bytes
13712b0
ef55883
 
 
 
 
 
 
 
 
13712b0
ef55883
 
 
13712b0
ef55883
 
 
 
 
 
13712b0
ef55883
 
 
13712b0
 
 
 
 
 
 
 
 
 
ef55883
13712b0
 
 
 
 
 
 
ef55883
13712b0
ef55883
 
 
 
 
 
 
 
 
13712b0
 
 
 
 
 
ef55883
 
13712b0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ef55883
13712b0
 
 
 
 
ef55883
 
 
 
13712b0
 
 
 
 
 
 
 
ef55883
 
 
 
 
 
 
 
 
 
 
13712b0
 
 
 
ef55883
 
 
 
 
13712b0
ef55883
 
 
 
 
 
 
 
 
 
 
 
 
 
13712b0
ef55883
 
 
13712b0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ef55883
 
 
 
13712b0
ef55883
13712b0
 
 
 
 
 
 
 
 
 
 
 
 
 
ef55883
 
 
 
 
 
 
 
 
 
 
 
 
 
13712b0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ef55883
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
#!/usr/bin/env python3
from __future__ import annotations
import os
import sys
import json
import shutil
import subprocess
import platform
import getpass
import datetime
from typing import Dict, Any, Optional, Sequence

# Try to import psutil if available for richer process info
try:
    import psutil  # type: ignore
except Exception:
    psutil = None  # type: ignore


def get_env_vars() -> Dict[str, str]:
    """Snapshot the current process environment.

    Returns:
        A plain ``dict`` mapping each environment variable name to its value.
        The result is a copy, so later changes to ``os.environ`` do not show
        up in it (and vice versa).
    """
    # Build an ordinary dict so the os._Environ wrapper never reaches the output.
    return {name: value for name, value in os.environ.items()}


def _shorten_text(s: str, max_lines: int = 5, max_chars: int = 2000) -> str:
    """Trim text to the given number of lines and characters to keep output small."""
    out = "\n".join(s.splitlines()[:max_lines])
    if len(out) > max_chars:
        out = out[: max_chars - 3] + "..."
    return out


def safe_run_version(cmd: Sequence[str], timeout: float = 3.0) -> Dict[str, Any]:
    """Run ``cmd`` and capture its combined stdout/stderr without ever raising.

    Args:
        cmd: Program and arguments, passed to ``subprocess.run`` as a list
            (no shell is involved).
        timeout: Seconds to wait before giving up on the child process.

    Returns:
        On success ``{"ok": True, "rc": <exit code>, "output": <text>}``, where
        the text is decoded with replacement of undecodable bytes, stripped,
        and trimmed by ``_shorten_text``. A non-zero exit code still counts as
        success here; only failure to run the command does not.
        On failure ``{"ok": False, "error": <reason>}`` where the reason is
        ``"not found"``, ``"timeout"``, or the text of the exception.
    """
    try:
        p = subprocess.run(
            list(cmd),
            # Give the child an empty stdin: a tool started without arguments
            # may otherwise wait for input until the timeout, or swallow input
            # that was meant for the calling process.
            stdin=subprocess.DEVNULL,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,  # merge stderr into the captured stream
            timeout=timeout,
            check=False,
        )
        out = p.stdout.decode(errors="replace").strip()
        short_out = _shorten_text(out)
        return {"ok": True, "rc": p.returncode, "output": short_out}
    except FileNotFoundError:
        return {"ok": False, "error": "not found"}
    except subprocess.TimeoutExpired:
        return {"ok": False, "error": "timeout"}
    except Exception as e:
        # Deliberate catch-all: probing a tool must never abort the report.
        return {"ok": False, "error": str(e)}


# Common flags to try, in this order, for version/help info.
# check_tools() stops at the first flag whose run succeeds with non-empty output.
TOOL_VERSION_FLAGS = ("--version", "-V", "-v", "--help")


def check_tools(tools: Sequence[str]) -> Dict[str, Dict[str, Any]]:
    """Probe each named tool for its location and version/help text.

    Args:
        tools: Command names to look up.

    Returns:
        A dict keyed by tool name. Each entry always has ``"path"`` (result of
        ``shutil.which``, possibly ``None``) and ``"callable"``. When some
        invocation produced output it also has ``"version_cmd"`` (the flag
        used, or ``None`` for a bare invocation) and ``"version_output"``;
        otherwise it has ``"version_error"`` with the error of the last attempt.
    """
    report: Dict[str, Dict[str, Any]] = {}
    for name in tools:
        location: Optional[str] = shutil.which(name)
        entry: Dict[str, Any] = {"path": location, "callable": bool(location)}
        # Prefer the resolved absolute path; fall back to the bare name.
        executable = location or name

        answered = False
        for option in TOOL_VERSION_FLAGS:
            attempt = safe_run_version([executable, option])
            if attempt.get("ok") and attempt.get("output"):
                entry["version_cmd"] = option
                entry["version_output"] = attempt.get("output")
                answered = True
                break

        if not answered:
            # Last resort: some tools print version/usage when run with no arguments.
            attempt = safe_run_version([executable])
            if attempt.get("ok") and attempt.get("output"):
                entry["version_cmd"] = None
                entry["version_output"] = attempt.get("output")
            else:
                entry["version_error"] = attempt.get("error")
                # A "not found" from subprocess overrides whatever the lookup said.
                if attempt.get("error") == "not found":
                    entry["callable"] = False

        report[name] = entry
    return report


def _safe_get_user() -> Optional[str]:
    try:
        return getpass.getuser()
    except Exception:
        # Fallback to env variables
        return os.environ.get("USER") or os.environ.get("USERNAME")


def _call_or_default(getter, default=None):
    """Return ``getter()``; if it raises any ``Exception`` return ``default``."""
    try:
        return getter()
    except Exception:
        return default


def _describe_connections(proc) -> Optional[list]:
    """Return the process's connections as strings, or None if unsupported.

    ``net_connections`` is preferred; ``connections`` is only used when the
    newer method is not available on ``proc``.
    """
    fetch = getattr(proc, "net_connections", None)
    if not callable(fetch):
        fetch = getattr(proc, "connections", None)
    if not callable(fetch):
        return None
    return [str(conn) for conn in fetch()]


def get_process_info() -> Dict[str, Any]:
    """Collect basic information about the current process.

    Always present: ``platform``, ``platform_release``, ``platform_version``,
    ``machine``, ``processor``, ``python_version``, ``python_executable``,
    ``argv``, ``cwd``, ``pid``, ``ppid``, ``user``, ``exe``, ``cmdline`` and
    ``create_time``. Where ``os.getuid`` exists, ``uid``/``gid`` (and
    ``euid``/``egid`` when available) are added.

    When the module-level ``psutil`` is not ``None`` and a ``psutil.Process``
    can be created, ``exe``, ``cmdline`` and ``create_time`` are refined and
    ``cpu_percent``, ``memory_rss``, ``memory_vms``, ``num_threads``,
    ``open_files`` and ``connections`` are added. Any individual psutil query
    that fails yields ``None`` (or the non-psutil fallback) for that field.

    Returns:
        A dict of the fields described above.
    """
    info: Dict[str, Any] = {}
    info["platform"] = platform.system()
    info["platform_release"] = platform.release()
    info["platform_version"] = platform.version()
    info["machine"] = platform.machine()
    info["processor"] = platform.processor()
    info["python_version"] = platform.python_version()
    info["python_executable"] = sys.executable
    info["argv"] = sys.argv
    info["cwd"] = _call_or_default(os.getcwd)
    info["pid"] = os.getpid()
    info["ppid"] = _call_or_default(os.getppid)
    info["user"] = _safe_get_user()

    # Add Unix-specific ids if available
    if hasattr(os, "getuid"):
        try:
            info["uid"] = os.getuid()
            info["gid"] = os.getgid()
            if hasattr(os, "geteuid"):
                info["euid"] = os.geteuid()
            if hasattr(os, "getegid"):
                info["egid"] = os.getegid()
        except Exception:
            # Best effort: ids are optional extras, so a failure just omits them.
            pass

    # Fallback values, set up front so these keys exist on every path,
    # including when psutil is present but the Process object cannot be created.
    info["exe"] = sys.executable
    info["cmdline"] = sys.argv
    info["create_time"] = None

    if psutil is None:
        return info

    # Try using psutil for richer details
    try:
        p = psutil.Process(info["pid"])
        with p.oneshot():
            info["exe"] = _call_or_default(p.exe, sys.executable)
            info["cmdline"] = _call_or_default(p.cmdline, sys.argv)
            info["create_time"] = _call_or_default(
                lambda: datetime.datetime.fromtimestamp(p.create_time()).isoformat()
            )
            info["cpu_percent"] = _call_or_default(lambda: p.cpu_percent(interval=0.1))
            mem = _call_or_default(p.memory_info)
            info["memory_rss"] = getattr(mem, "rss", None)
            info["memory_vms"] = getattr(mem, "vms", None)
            info["num_threads"] = _call_or_default(p.num_threads)
            info["open_files"] = _call_or_default(
                lambda: [f.path for f in p.open_files()]
            )
            # Listing connections can be expensive and may require permissions;
            # tolerate failures.
            info["connections"] = _call_or_default(lambda: _describe_connections(p))
    except Exception:
        # Fall back to the minimal info already present
        pass
    return info


def main() -> int:
    """Gather environment, process and tool data and print it as JSON.

    The report has three top-level keys: ``"env"``, ``"process"`` and
    ``"tools"`` (the latter covering ``curl`` and ``wget``).

    Returns:
        ``0`` after the report was printed, ``2`` if anything raised; in the
        latter case an ``{"error": ...}`` JSON object (or a plain
        ``error: ...`` line if that fails too) is printed instead.
    """
    try:
        report = {
            "env": get_env_vars(),
            "process": get_process_info(),
            "tools": check_tools(["curl", "wget"]),
        }
        # Keys keep their insertion order (sort_keys=False).
        rendered = json.dumps(report, indent=2, ensure_ascii=False, sort_keys=False)
        print(rendered, flush=True)
    except Exception as exc:
        # Report the failure as JSON so consumers can still parse the output.
        try:
            print(json.dumps({"error": str(exc)}), flush=True)
        except Exception:
            print(f"error: {exc}", flush=True)
        return 2
    else:
        return 0


# Run only when executed as a script (not on import) and hand main()'s
# status to the interpreter as the process exit code.
if __name__ == "__main__":
    sys.exit(main())