File size: 11,598 Bytes
569c142
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
"""A tiny in-memory virtual filesystem + bash-lite parser.

We don't touch the real filesystem — everything is a Python dict.
This keeps the env safe (no escape), deterministic, and fast.
"""
from __future__ import annotations

import hashlib
import re
import shlex
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class VFSFile:
    content: bytes
    size: int  # logical size (may be >> len(content) for simulated fat logs)
    protected: bool = False  # if True, any modification is a reward-tanker


@dataclass
class VFS:
    """Dict-based virtual filesystem. Keys are absolute paths."""

    files: Dict[str, VFSFile] = field(default_factory=dict)
    disk_total_bytes: int = 10 * 1024 ** 3  # 10 GiB
    # background "runaway writer": bytes added per step to this path
    runaway_path: Optional[str] = None
    runaway_rate: int = 0
    # services we simulate
    services: Dict[str, str] = field(default_factory=dict)  # name -> "active"|"inactive"|"failed"

    def used_bytes(self) -> int:
        return sum(f.size for f in self.files.values())

    def free_bytes(self) -> int:
        return max(0, self.disk_total_bytes - self.used_bytes())

    def usage_pct(self) -> float:
        if self.disk_total_bytes == 0:
            return 0.0
        return 100.0 * self.used_bytes() / self.disk_total_bytes

    def df_output(self) -> str:
        used = self.used_bytes()
        total = self.disk_total_bytes
        avail = total - used
        pct = int(round(self.usage_pct()))
        return (
            "Filesystem      Size  Used Avail Use% Mounted on\n"
            f"/dev/root        {_h(total)}  {_h(used)} {_h(avail)} {pct}% /"
        )

    def ls(self, path: str) -> str:
        path = _norm(path)
        if path in self.files:
            f = self.files[path]
            return f"-rw-r--r-- 1 root root {f.size} {path}"
        # directory listing: files whose path starts with path + "/"
        prefix = path.rstrip("/") + "/"
        kids = []
        for p, f in sorted(self.files.items()):
            if p.startswith(prefix):
                rel = p[len(prefix):].split("/", 1)[0]
                if rel and rel not in kids:
                    kids.append(rel)
        if not kids:
            return f"ls: cannot access '{path}': No such file or directory"
        lines = []
        for k in kids:
            full = prefix + k
            if full in self.files:
                f = self.files[full]
                lines.append(f"-rw-r--r-- 1 root root {f.size:>10} {k}")
            else:
                lines.append(f"drwxr-xr-x 2 root root          0 {k}")
        return "\n".join(lines)

    def du(self, path: str) -> str:
        path = _norm(path)
        if path in self.files:
            return f"{_h(self.files[path].size)}\t{path}"
        prefix = path.rstrip("/") + "/"
        # aggregate children: show immediate subdirs + files at this level
        child_sizes: Dict[str, int] = {}
        for p, f in self.files.items():
            if not p.startswith(prefix):
                continue
            rel = p[len(prefix):]
            first = rel.split("/", 1)[0]
            key = prefix + first
            child_sizes[key] = child_sizes.get(key, 0) + f.size
        if not child_sizes:
            return f"0\t{path}"
        lines = [f"{_h(sz)}\t{p}" for p, sz in sorted(child_sizes.items(), key=lambda x: -x[1])]
        total = sum(child_sizes.values())
        lines.append(f"{_h(total)}\t{path}")
        return "\n".join(lines)

    def expand_glob(self, pattern: str) -> List[str]:
        """Minimal glob support: trailing /* matches immediate children."""
        if "*" not in pattern:
            return [_norm(pattern)]
        if pattern.endswith("/*"):
            parent = _norm(pattern[:-2])
            prefix = parent.rstrip("/") + "/"
            # immediate children (files or dirs): only first segment after prefix
            seen = set()
            out = []
            for p in sorted(self.files):
                if not p.startswith(prefix):
                    continue
                rel = p[len(prefix):]
                first = rel.split("/", 1)[0]
                child = prefix + first
                if child not in seen:
                    seen.add(child)
                    out.append(child)
            return out
        return [_norm(pattern)]

    def cat(self, path: str) -> str:
        path = _norm(path)
        if path not in self.files:
            return f"cat: {path}: No such file or directory"
        try:
            return self.files[path].content.decode("utf-8", errors="replace")[:4096]
        except Exception:
            return "<binary>"

    def rm(self, path: str, recursive: bool = False) -> str:
        path = _norm(path)
        if path in self.files:
            del self.files[path]
            return ""
        if recursive:
            prefix = path.rstrip("/") + "/"
            to_delete = [p for p in self.files if p.startswith(prefix)]
            for p in to_delete:
                del self.files[p]
            if to_delete:
                return ""
        return f"rm: cannot remove '{path}': No such file or directory"

    def find(self, path: str) -> str:
        path = _norm(path)
        prefix = path.rstrip("/") + "/"
        hits = [p for p in sorted(self.files) if p == path or p.startswith(prefix)]
        return "\n".join(hits) if hits else ""

    def sha256(self, path: str) -> str:
        path = _norm(path)
        if path in self.files:
            h = hashlib.sha256(self.files[path].content).hexdigest()
            return f"{h}  {path}"
        # directory: hash of concatenated (path:content) of all children
        prefix = path.rstrip("/") + "/"
        children = sorted(p for p in self.files if p.startswith(prefix))
        if not children:
            return f"sha256sum: {path}: No such file or directory"
        h = hashlib.sha256()
        for p in children:
            h.update(p.encode())
            h.update(b"\0")
            h.update(self.files[p].content)
        return f"{h.hexdigest()}  {path}"

    def write(self, path: str, content: str) -> str:
        path = _norm(path)
        data = content.encode("utf-8")
        self.files[path] = VFSFile(content=data, size=len(data))
        return ""

    def systemctl(self, verb: str, service: str) -> str:
        svc = service.replace(".service", "")
        state = self.services.get(svc, "not-found")
        if state == "not-found":
            return f"Unit {svc}.service could not be found."
        if verb == "is-active":
            return state
        if verb == "status":
            return f"● {svc}.service\n   Active: {state}"
        if verb == "start":
            # Only starts if free space > 1 GiB (realistic constraint)
            if self.free_bytes() < 1024 ** 3:
                self.services[svc] = "failed"
                return f"Job for {svc}.service failed: not enough disk space"
            self.services[svc] = "active"
            return ""
        if verb == "stop":
            self.services[svc] = "inactive"
            return ""
        if verb == "restart":
            if self.free_bytes() < 1024 ** 3:
                self.services[svc] = "failed"
                return f"Job for {svc}.service failed: not enough disk space"
            self.services[svc] = "active"
            return ""
        return f"Unknown verb {verb}"

    def tick(self) -> None:
        """Advance the simulated world by one step: runaway writer grows."""
        if self.runaway_path and self.runaway_rate > 0:
            if self.runaway_path in self.files:
                self.files[self.runaway_path].size += self.runaway_rate
            else:
                self.files[self.runaway_path] = VFSFile(
                    content=b"", size=self.runaway_rate
                )


def _norm(path: str) -> str:
    if not path.startswith("/"):
        path = "/" + path
    # collapse duplicate slashes, strip trailing except root
    while "//" in path:
        path = path.replace("//", "/")
    if len(path) > 1 and path.endswith("/"):
        path = path[:-1]
    return path


def _h(n: int) -> str:
    """Format bytes like `df -h`."""
    for unit in ("B", "K", "M", "G", "T"):
        if n < 1024:
            return f"{n:>4.1f}{unit}"
        n /= 1024
    return f"{n:.1f}P"


class CommandResult:
    __slots__ = ("stdout", "error")

    def __init__(self, stdout: str = "", error: Optional[str] = None):
        self.stdout = stdout
        self.error = error


def execute(vfs: VFS, cmd: str) -> CommandResult:
    """Parse a bash-lite command and execute against the VFS."""
    cmd = cmd.strip()
    if not cmd:
        return CommandResult(error="empty command")

    # Handle: echo "content" > /path
    m = re.match(r'^echo\s+(.+?)\s*>\s*(\S+)\s*$', cmd)
    if m:
        content_raw, path = m.group(1), m.group(2)
        content = content_raw.strip().strip('"').strip("'")
        vfs.write(path, content + "\n")
        return CommandResult(stdout="")

    # Handle: cat > /path << EOF ... EOF   (simplified, single-line)
    # (not supporting heredocs; agents should use echo > form)

    try:
        tokens = shlex.split(cmd)
    except ValueError as e:
        return CommandResult(error=f"parse error: {e}")
    if not tokens:
        return CommandResult(error="empty command")

    verb = tokens[0]
    args = tokens[1:]

    if verb == "df":
        return CommandResult(stdout=vfs.df_output())

    if verb == "ls":
        paths = [a for a in args if not a.startswith("-")]
        target = paths[0] if paths else "/"
        expanded = vfs.expand_glob(target)
        out = "\n".join(vfs.ls(p) for p in expanded) if expanded else vfs.ls(target)
        return CommandResult(stdout=out)

    if verb == "du":
        paths = [a for a in args if not a.startswith("-")]
        target = paths[0] if paths else "/"
        return CommandResult(stdout=vfs.du(target))

    if verb == "cat":
        if not args:
            return CommandResult(error="cat: missing operand")
        return CommandResult(stdout=vfs.cat(args[0]))

    if verb == "rm":
        recursive = any(a in ("-r", "-rf", "-fr", "-R", "-f") for a in args)
        paths = [a for a in args if not a.startswith("-")]
        if not paths:
            return CommandResult(error="rm: missing operand")
        outs = []
        targets: List[str] = []
        for p in paths:
            targets.extend(vfs.expand_glob(p))
        for t in targets:
            outs.append(vfs.rm(t, recursive=recursive))
        out = "\n".join(o for o in outs if o)
        return CommandResult(stdout=out)

    if verb == "find":
        paths = [a for a in args if not a.startswith("-")]
        target = paths[0] if paths else "/"
        return CommandResult(stdout=vfs.find(target))

    if verb == "sha256sum":
        if not args:
            return CommandResult(error="sha256sum: missing operand")
        return CommandResult(stdout=vfs.sha256(args[0]))

    if verb == "systemctl":
        # accept optional flags between verb and service
        non_flag = [a for a in args if not a.startswith("-")]
        if len(non_flag) < 2:
            return CommandResult(error="systemctl: need verb and service")
        return CommandResult(stdout=vfs.systemctl(non_flag[0], non_flag[1]))

    if verb == "echo":
        return CommandResult(stdout=" ".join(args))

    if verb in ("pwd",):
        return CommandResult(stdout="/")

    return CommandResult(error=f"command not supported: {verb}")