LordXido commited on
Commit
c2ab996
·
verified ·
1 Parent(s): abd9fb7

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +503 -24
app.py CHANGED
@@ -1,29 +1,508 @@
1
  import gradio as gr
2
- from compiler import compile_codexbyte
3
- from codexbyte_vm import CodexByteVM
 
 
 
 
 
4
 
5
- vm = CodexByteVM()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6
 
7
- def run_codexbyte(source: str):
8
- try:
9
- program = compile_codexbyte(source.strip().splitlines())
10
- ledger = vm.run(program)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
11
  return {
12
- "registers": vm.reg,
13
- "memory": vm.mem,
14
- "omega_ledger": ledger
 
 
 
15
  }
16
- except Exception as e:
17
- return {"error": str(e)}
18
-
19
- gr.Interface(
20
- fn=run_codexbyte,
21
- inputs=gr.Textbox(
22
- lines=16,
23
- label="CodexByte Program",
24
- placeholder="LOAD_IMM 0 1000\nLOAD_MEM 1 0x20\nCMP 1 0\nJZ 10\nHALT"
25
- ),
26
- outputs=gr.JSON(label="Ω-State Proof"),
27
- title="CodexByte ΩΞ Runtime",
28
- description="Bytecode execution is enforcement. Execution trace is proof."
29
- ).launch()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  import gradio as gr
2
+ import hashlib
3
+ import json
4
+ import time
5
+ import io
6
+ import zipfile
7
+ from dataclasses import dataclass, asdict
8
+ from typing import List, Dict, Any, Tuple
9
 
10
+ # ============================================================
11
+ # CodexByte ΩΞ Runtime — Single-File, HF-Safe, Proof-First
12
+ # ============================================================
13
+
14
+ # ----------------------------
15
+ # ISA (stable core)
16
+ # ----------------------------
17
+ OPCODES = {
18
+ "HALT": 0x00,
19
+ "LOAD_IMM": 0x01, # r, v
20
+ "LOAD_MEM": 0x02, # r, a
21
+ "STORE": 0x03, # r, a
22
+ "ADD": 0x04, # r1, r2
23
+ "SUB": 0x05, # r1, r2
24
+ "MUL": 0x06, # r1, r2
25
+ "DIV": 0x07, # r1, r2
26
+ "CMP": 0x08, # r1, r2 -> sets Z
27
+ "JMP": 0x09, # a
28
+ "JZ": 0x0A, # a
29
+ "JNZ": 0x0B, # a
30
+ "HASH": 0x0C, # r -> r becomes 64-bit int derived from sha256
31
+ "TIME": 0x10, # r -> monotonic ms (deterministic-ish per run; used for trace only)
32
+ "COMMIT": 0x0F, # -> append chained hash to Ω-ledger
33
+ "EMIT": 0x11, # r -> LAST_EMIT in mem
34
+ }
35
+
36
+ OPNAMES = {v: k for k, v in OPCODES.items()}
37
+
38
+ # ----------------------------
39
+ # Utilities
40
+ # ----------------------------
41
+ def sha256_hex(b: bytes) -> str:
42
+ return hashlib.sha256(b).hexdigest()
43
+
44
+ def stable_json(obj: Any) -> bytes:
45
+ return json.dumps(obj, sort_keys=True, separators=(",", ":"), ensure_ascii=False).encode("utf-8")
46
+
47
+ def clamp_int64(x: int) -> int:
48
+ # Keep values bounded (prevents unbounded growth)
49
+ return int(max(min(x, (1 << 63) - 1), -(1 << 63)))
50
+
51
+ def parse_int(token: str) -> int:
52
+ # Accept decimal or 0x.. forms
53
+ return int(token, 0)
54
+
55
+ # ----------------------------
56
+ # Compiler: Text -> Bytecode
57
+ # ----------------------------
58
+ def compile_codexbyte(source: str) -> List[int]:
59
+ """
60
+ Assembles a simple assembly-like form into a flat integer bytecode list.
61
+ Each instruction is encoded as: [opcode, operand1, operand2, ...] with fixed arity per op.
62
+ """
63
+ lines = source.splitlines()
64
+ bytecode: List[int] = []
65
+
66
+ for raw in lines:
67
+ line = raw.strip()
68
+ if not line or line.startswith(";") or line.startswith("#"):
69
+ continue
70
+
71
+ # strip inline comments
72
+ if ";" in line:
73
+ line = line.split(";", 1)[0].strip()
74
+ if "#" in line:
75
+ line = line.split("#", 1)[0].strip()
76
+ if not line:
77
+ continue
78
+
79
+ parts = line.split()
80
+ op = parts[0].upper()
81
+ if op not in OPCODES:
82
+ raise ValueError(f"Unknown opcode: {op}")
83
+
84
+ bytecode.append(OPCODES[op])
85
+
86
+ # encode operands
87
+ for tok in parts[1:]:
88
+ bytecode.append(parse_int(tok))
89
+
90
+ return bytecode
91
+
92
+ # ----------------------------
93
+ # Trace record
94
+ # ----------------------------
95
+ @dataclass
96
+ class TraceStep:
97
+ step: int
98
+ pc_before: int
99
+ op: str
100
+ operands: List[int]
101
+ reg_before: List[int]
102
+ reg_after: List[int]
103
+ flags_before: Dict[str, bool]
104
+ flags_after: Dict[str, bool]
105
+ mem_writes: List[Tuple[str, int]] # (addr, value)
106
+ ledger_added: str | None
107
+
108
+ # ----------------------------
109
+ # VM: Bytecode -> State/Proof
110
+ # ----------------------------
111
+ class CodexByteVM:
112
+ def __init__(self):
113
+ self.reset()
114
+
115
+ def reset(self):
116
+ self.reg = [0] * 8
117
+ self.mem: Dict[Any, int] = {}
118
+ self.flags = {"Z": False}
119
+ self.pc = 0
120
+ self.omega_ledger: List[str] = []
121
+ self._ledger_head = "0" * 64 # chained hash head
122
+ self.last_trace: List[TraceStep] = []
123
+
124
+ def _state_digest(self) -> str:
125
+ # Deterministic digest of current state (regs + mem + flags + pc)
126
+ obj = {
127
+ "pc": self.pc,
128
+ "reg": self.reg,
129
+ "mem": self.mem,
130
+ "flags": self.flags,
131
+ "ledger_head": self._ledger_head,
132
+ }
133
+ return sha256_hex(stable_json(obj))
134
+
135
+ def _commit(self) -> str:
136
+ # Chain: head <- sha256(head || state_digest)
137
+ sd = self._state_digest()
138
+ new_head = sha256_hex((self._ledger_head + sd).encode("utf-8"))
139
+ self._ledger_head = new_head
140
+ self.omega_ledger.append(new_head)
141
+ return new_head
142
+
143
+ def run(self, prog: List[int], step_limit: int = 20000, trace: bool = True) -> Dict[str, Any]:
144
+ self.last_trace = []
145
+ steps = 0
146
+
147
+ while True:
148
+ if steps >= step_limit:
149
+ raise RuntimeError(f"Step limit exceeded ({step_limit}). Possible infinite loop.")
150
+
151
+ if self.pc < 0 or self.pc >= len(prog):
152
+ raise RuntimeError(f"PC out of bounds: pc={self.pc}, program_len={len(prog)}")
153
+
154
+ pc_before = self.pc
155
+ op = prog[self.pc]
156
+ self.pc += 1
157
+
158
+ opname = OPNAMES.get(op, f"OP_{op:02X}")
159
+ reg_before = self.reg.copy()
160
+ flags_before = dict(self.flags)
161
+ mem_writes: List[Tuple[str, int]] = []
162
+ ledger_added = None
163
+
164
+ def need(n: int):
165
+ if self.pc + n > len(prog):
166
+ raise RuntimeError(f"Truncated operands for {opname} at pc={pc_before}")
167
+
168
+ def read1() -> int:
169
+ nonlocal prog
170
+ v = prog[self.pc]
171
+ self.pc += 1
172
+ return v
173
+
174
+ def read2() -> Tuple[int, int]:
175
+ need(2)
176
+ a = read1()
177
+ b = read1()
178
+ return a, b
179
+
180
+ # ---- Execute ----
181
+ if op == 0x00: # HALT
182
+ pass
183
+
184
+ elif op == 0x01: # LOAD_IMM r v
185
+ r, v = read2()
186
+ self._check_reg(r)
187
+ self.reg[r] = clamp_int64(v)
188
+
189
+ elif op == 0x02: # LOAD_MEM r a
190
+ r, a = read2()
191
+ self._check_reg(r)
192
+ self.reg[r] = clamp_int64(self.mem.get(self._addr(a), 0))
193
+
194
+ elif op == 0x03: # STORE r a
195
+ r, a = read2()
196
+ self._check_reg(r)
197
+ addr = self._addr(a)
198
+ self.mem[addr] = clamp_int64(self.reg[r])
199
+ mem_writes.append((str(addr), self.mem[addr]))
200
+
201
+ elif op == 0x04: # ADD r1 r2
202
+ r1, r2 = read2()
203
+ self._check_reg(r1); self._check_reg(r2)
204
+ self.reg[r1] = clamp_int64(self.reg[r1] + self.reg[r2])
205
+
206
+ elif op == 0x05: # SUB r1 r2
207
+ r1, r2 = read2()
208
+ self._check_reg(r1); self._check_reg(r2)
209
+ self.reg[r1] = clamp_int64(self.reg[r1] - self.reg[r2])
210
+
211
+ elif op == 0x06: # MUL r1 r2
212
+ r1, r2 = read2()
213
+ self._check_reg(r1); self._check_reg(r2)
214
+ self.reg[r1] = clamp_int64(self.reg[r1] * self.reg[r2])
215
+
216
+ elif op == 0x07: # DIV r1 r2
217
+ r1, r2 = read2()
218
+ self._check_reg(r1); self._check_reg(r2)
219
+ if self.reg[r2] == 0:
220
+ raise ZeroDivisionError("DIV by zero")
221
+ self.reg[r1] = clamp_int64(int(self.reg[r1] / self.reg[r2]))
222
+
223
+ elif op == 0x08: # CMP r1 r2
224
+ r1, r2 = read2()
225
+ self._check_reg(r1); self._check_reg(r2)
226
+ self.flags["Z"] = (self.reg[r1] == self.reg[r2])
227
+
228
+ elif op == 0x09: # JMP a
229
+ a = read1()
230
+ self.pc = self._pc(a, len(prog))
231
+
232
+ elif op == 0x0A: # JZ a
233
+ a = read1()
234
+ if self.flags["Z"]:
235
+ self.pc = self._pc(a, len(prog))
236
 
237
+ elif op == 0x0B: # JNZ a
238
+ a = read1()
239
+ if not self.flags["Z"]:
240
+ self.pc = self._pc(a, len(prog))
241
+
242
+ elif op == 0x0C: # HASH r (store as 64-bit int)
243
+ r = read1()
244
+ self._check_reg(r)
245
+ h = sha256_hex(str(self.reg[r]).encode("utf-8"))
246
+ self.reg[r] = int(h[:16], 16) # 64-bit-ish
247
+
248
+ elif op == 0x10: # TIME r
249
+ r = read1()
250
+ self._check_reg(r)
251
+ # Note: time is not strictly deterministic across runs; use only for observability.
252
+ self.reg[r] = int(time.time() * 1000)
253
+
254
+ elif op == 0x0F: # COMMIT
255
+ ledger_added = self._commit()
256
+
257
+ elif op == 0x11: # EMIT r
258
+ r = read1()
259
+ self._check_reg(r)
260
+ self.mem["LAST_EMIT"] = clamp_int64(self.reg[r])
261
+ mem_writes.append(("LAST_EMIT", self.mem["LAST_EMIT"]))
262
+
263
+ else:
264
+ raise RuntimeError(f"Unsupported opcode: 0x{op:02X} at pc={pc_before}")
265
+
266
+ reg_after = self.reg.copy()
267
+ flags_after = dict(self.flags)
268
+
269
+ # Trace
270
+ if trace:
271
+ # operands captured approximately: from prog slice
272
+ # best-effort decode: since pc moved, reconstruct from pc_before+1 to current pc
273
+ op_slice = prog[pc_before+1:self.pc]
274
+ self.last_trace.append(TraceStep(
275
+ step=steps,
276
+ pc_before=pc_before,
277
+ op=opname,
278
+ operands=op_slice,
279
+ reg_before=reg_before,
280
+ reg_after=reg_after,
281
+ flags_before=flags_before,
282
+ flags_after=flags_after,
283
+ mem_writes=mem_writes,
284
+ ledger_added=ledger_added
285
+ ))
286
+
287
+ steps += 1
288
+ if op == 0x00: # HALT
289
+ break
290
+
291
+ return self.snapshot()
292
+
293
+ def snapshot(self) -> Dict[str, Any]:
294
  return {
295
+ "registers": self.reg,
296
+ "memory": self.mem,
297
+ "flags": self.flags,
298
+ "omega_ledger": self.omega_ledger,
299
+ "ledger_head": self._ledger_head,
300
+ "state_digest": self._state_digest(),
301
  }
302
+
303
+ def _check_reg(self, r: int):
304
+ if not (0 <= r < 8):
305
+ raise ValueError(f"Invalid register index r={r}, expected 0..7")
306
+
307
+ def _addr(self, a: int) -> str:
308
+ # normalize address into stable string
309
+ return hex(a) if isinstance(a, int) else str(a)
310
+
311
+ def _pc(self, a: int, n: int) -> int:
312
+ if not (0 <= a < n):
313
+ raise ValueError(f"Invalid jump target {a}, program_len={n}")
314
+ return a
315
+
316
+ # ----------------------------
317
+ # Proof bundle (ZIP export)
318
+ # ----------------------------
319
+ def build_proof_bundle(
320
+ source: str,
321
+ bytecode: List[int],
322
+ result: Dict[str, Any],
323
+ trace_steps: List[TraceStep],
324
+ ) -> bytes:
325
+ trace_json = [asdict(s) for s in trace_steps]
326
+ bundle = {
327
+ "meta": {
328
+ "system": "CodexByte_Runtime",
329
+ "proof_format": "OmegaTraceBundle.v1",
330
+ },
331
+ "source": source,
332
+ "bytecode": bytecode,
333
+ "result": result,
334
+ "trace": trace_json,
335
+ }
336
+
337
+ verifier_py = r'''
338
+ import json, hashlib
339
+
340
+ def sha256_hex(b: bytes) -> str:
341
+ return hashlib.sha256(b).hexdigest()
342
+
343
+ def stable_json(obj):
344
+ return json.dumps(obj, sort_keys=True, separators=(",", ":"), ensure_ascii=False).encode("utf-8")
345
+
346
+ def replay(bundle_path: str):
347
+ with open(bundle_path, "rb") as f:
348
+ z = f.read()
349
+ # This verifier expects you to open the ZIP and inspect bundle.json.
350
+ print("Open the ZIP, extract bundle.json, and use your runtime to replay bytecode.")
351
+ print("Bundle bytes sha256:", sha256_hex(z))
352
+
353
+ if __name__ == "__main__":
354
+ replay("CodexByte_ProofBundle.zip")
355
+ '''.lstrip()
356
+
357
+ mem = io.BytesIO()
358
+ with zipfile.ZipFile(mem, "w", compression=zipfile.ZIP_DEFLATED) as z:
359
+ z.writestr("bundle.json", json.dumps(bundle, indent=2, ensure_ascii=False))
360
+ z.writestr("verifier.py", verifier_py)
361
+ z.writestr("README.txt",
362
+ "CodexByte Proof Bundle\n"
363
+ "- bundle.json contains source, bytecode, trace, result\n"
364
+ "- verifier.py provides a minimal integrity hook\n"
365
+ "Replay verification: run the same bytecode in the runtime; compare ledger_head/state_digest.\n")
366
+ return mem.getvalue()
367
+
368
+ # ----------------------------
369
+ # Samples
370
+ # ----------------------------
371
+ SAMPLES: Dict[str, str] = {
372
+ "Contract: obligation satisfied (commit+emit)": """\
373
+ ; If mem[0x20] == 1000 -> satisfied (emit 0), else breach (emit 1)
374
+ LOAD_IMM 0 1000
375
+ LOAD_MEM 1 0x20
376
+ CMP 1 0
377
+ JZ 18
378
+ ; breach
379
+ LOAD_IMM 2 1
380
+ STORE 2 0x30
381
+ COMMIT
382
+ EMIT 2
383
+ HALT
384
+ ; satisfied
385
+ LOAD_IMM 2 0
386
+ STORE 2 0x30
387
+ COMMIT
388
+ EMIT 2
389
+ HALT
390
+ """,
391
+ "Ledger integrity demo (multi-commit)": """\
392
+ LOAD_IMM 0 7
393
+ COMMIT
394
+ ADD 0 0
395
+ COMMIT
396
+ HASH 0
397
+ COMMIT
398
+ EMIT 0
399
+ HALT
400
+ """,
401
+ "Loop demo (safe with step limit)": """\
402
+ LOAD_IMM 0 0
403
+ LOAD_IMM 1 1
404
+ ADD 0 1
405
+ JMP 4
406
+ HALT
407
+ """,
408
+ }
409
+
410
+ # ============================================================
411
+ # Gradio App
412
+ # ============================================================
413
+ vm = CodexByteVM()
414
+
415
+ def load_sample(name: str) -> str:
416
+ return SAMPLES.get(name, "")
417
+
418
+ def run_program(source: str, step_limit: int, enable_trace: bool, preload_mem_0x20: int):
419
+ vm.reset()
420
+ # preload for common contract patterns
421
+ vm.mem[hex(0x20)] = int(preload_mem_0x20)
422
+
423
+ bytecode = compile_codexbyte(source)
424
+ result = vm.run(bytecode, step_limit=step_limit, trace=enable_trace)
425
+
426
+ trace = [asdict(s) for s in vm.last_trace] if enable_trace else []
427
+ proof_zip = build_proof_bundle(source, bytecode, result, vm.last_trace if enable_trace else [])
428
+
429
+ # gr.File expects a path OR a tuple (name, bytes) in newer versions.
430
+ # Use (filename, bytes) which HF Gradio accepts.
431
+ return (
432
+ {
433
+ "status": "OK",
434
+ "result": result,
435
+ "bytecode_len": len(bytecode),
436
+ "preloaded_memory": {"0x20": preload_mem_0x20},
437
+ },
438
+ trace,
439
+ ("CodexByte_ProofBundle.zip", proof_zip),
440
+ )
441
+
442
+ def replay_verify(source: str, step_limit: int, preload_mem_0x20: int):
443
+ """
444
+ Replay check: run twice, compare final ledger_head & state_digest.
445
+ """
446
+ # run 1
447
+ vm1 = CodexByteVM()
448
+ vm1.mem[hex(0x20)] = int(preload_mem_0x20)
449
+ bc = compile_codexbyte(source)
450
+ r1 = vm1.run(bc, step_limit=step_limit, trace=False)
451
+
452
+ # run 2
453
+ vm2 = CodexByteVM()
454
+ vm2.mem[hex(0x20)] = int(preload_mem_0x20)
455
+ r2 = vm2.run(bc, step_limit=step_limit, trace=False)
456
+
457
+ ok = (r1["ledger_head"] == r2["ledger_head"]) and (r1["state_digest"] == r2["state_digest"])
458
+ return {
459
+ "replay_ok": ok,
460
+ "run1": {"ledger_head": r1["ledger_head"], "state_digest": r1["state_digest"]},
461
+ "run2": {"ledger_head": r2["ledger_head"], "state_digest": r2["state_digest"]},
462
+ "note": "TIME opcode makes replay non-deterministic. Avoid TIME in proofs."
463
+ }
464
+
465
+ with gr.Blocks(title="CodexByte ΩΞ Runtime") as demo:
466
+ gr.Markdown(
467
+ "# CodexByte ΩΞ Runtime\n"
468
+ "**Programs = contracts • Execution = enforcement • Trace = Ω-proof**\n\n"
469
+ "This runtime executes CodexByte deterministically and emits an intrinsic proof bundle."
470
+ )
471
+
472
+ with gr.Row():
473
+ sample = gr.Dropdown(list(SAMPLES.keys()), value="Contract: obligation satisfied (commit+emit)", label="Sample Programs")
474
+ load_btn = gr.Button("Load Sample")
475
+
476
+ program = gr.Textbox(lines=18, label="CodexByte Program")
477
+ load_btn.click(load_sample, inputs=sample, outputs=program)
478
+
479
+ with gr.Row():
480
+ preload = gr.Number(value=1000, label="Preload mem[0x20] value (common contract input)")
481
+ step_limit = gr.Slider(100, 50000, value=20000, step=100, label="Step limit (safety)")
482
+ trace_on = gr.Checkbox(value=True, label="Enable trace (Ω-step log)")
483
+
484
+ run_btn = gr.Button("Execute")
485
+ with gr.Tabs():
486
+ with gr.Tab("Ω-State Result"):
487
+ result_out = gr.JSON()
488
+ proof_file = gr.File(label="Download Proof Bundle (ZIP)")
489
+ with gr.Tab("Execution Trace"):
490
+ trace_out = gr.JSON()
491
+ with gr.Tab("Replay Verification"):
492
+ verify_btn = gr.Button("Replay verify (run twice)")
493
+ verify_out = gr.JSON()
494
+
495
+ run_btn.click(
496
+ fn=run_program,
497
+ inputs=[program, step_limit, trace_on, preload],
498
+ outputs=[result_out, trace_out, proof_file],
499
+ )
500
+
501
+ verify_btn.click(
502
+ fn=replay_verify,
503
+ inputs=[program, step_limit, preload],
504
+ outputs=verify_out
505
+ )
506
+
507
+ if __name__ == "__main__":
508
+ demo.launch()