Spaces:
Sleeping
Sleeping
| import logging | |
| import os | |
| import sys | |
| import time | |
| import uuid | |
| import gradio as gr | |
| from daggr import FnNode, Graph | |
# Send INFO-and-above records to stdout in a pipe-delimited layout:
# timestamp | level | logger name | message.
logging.basicConfig(
    stream=sys.stdout,
    format="%(asctime)s | %(levelname)s | %(name)s | %(message)s",
    level=logging.INFO,
)

# Module-level logger for this file.
log = logging.getLogger("daggr.micro")
| import hashlib | |
def _stamp(*parts) -> str:
    """Return a short fingerprint of *parts*.

    Each part is converted with ``str``, the pieces are joined with ``"||"``,
    and the first 10 hex characters of the SHA-1 digest of the UTF-8 encoded
    result are returned.
    """
    joined = "||".join(str(part) for part in parts)
    digest = hashlib.sha1(joined.encode("utf-8"))
    return digest.hexdigest()[:10]
def _phase(title: str):
    """Log *title* at INFO level inside a ``==========`` banner."""
    banner = "========== %s =========="
    log.info(banner, title)
def _drv(lines, msg: str):
    """Log *msg* with a ``[DRV]`` prefix, then append it to *lines*.

    ``lines`` is mutated in place; the stored entry is the raw message
    without the ``[DRV]`` prefix.
    """
    entry = msg
    log.info("[DRV] %s", entry)
    lines.append(entry)
def _assert_drv(lines, cond: bool, msg_ok: str, msg_bad: str):
    """Record the outcome of a soft check through ``_drv``.

    Emits ``"OK | <msg_ok>"`` when *cond* is truthy and
    ``"FAIL | <msg_bad>"`` otherwise. Nothing is raised on failure.
    """
    verdict = f"OK | {msg_ok}" if cond else f"FAIL | {msg_bad}"
    _drv(lines, verdict)
def make_d(seed_d: int, salt_d: int) -> tuple[int, str]:
    """Build the ``(value, tag)`` pair for node D.

    Args:
        seed_d: Seed; converted with ``int`` and multiplied by 10.
        salt_d: Salt; converted with ``int`` and its last digit
            (``% 10``) is added to the scaled seed.

    Returns:
        ``(val, tag)`` where ``val = int(seed_d) * 10 + int(salt_d) % 10``
        and ``tag`` is ``"D(seed=<seed>,salt=<salt>)"``.

    Raises:
        TypeError, ValueError: Propagated from ``int()`` when an input
            cannot be converted.
    """
    run_id = uuid.uuid4().hex[:8]
    # perf_counter() is monotonic and high-resolution; wall-clock time.time()
    # can jump (e.g. clock adjustments) and produce bogus or negative durations.
    started = time.perf_counter()
    log.info("[D] start run=%s seed_d=%s salt_d=%s", run_id, seed_d, salt_d)

    # Convert once (after the start log, so bad input is still logged first).
    seed = int(seed_d)
    salt = int(salt_d)
    out = (seed * 10 + salt % 10, f"D(seed={seed},salt={salt})")

    elapsed_ms = (time.perf_counter() - started) * 1000
    log.info("[D] done run=%s dt=%.1fms out=%s", run_id, elapsed_ms, out)
    return out
def make_e(seed_e: int, salt_e: int) -> tuple[int, str]:
    """Build the ``(value, tag)`` pair for node E.

    Args:
        seed_e: Seed; converted with ``int`` and multiplied by 100.
        salt_e: Salt; converted with ``int`` and its last digit
            (``% 10``) is added to the scaled seed.

    Returns:
        ``(val, tag)`` where ``val = int(seed_e) * 100 + int(salt_e) % 10``
        and ``tag`` is ``"E(seed=<seed>,salt=<salt>)"``.

    Raises:
        TypeError, ValueError: Propagated from ``int()`` when an input
            cannot be converted.
    """
    run_id = uuid.uuid4().hex[:8]
    # perf_counter() is monotonic and high-resolution; wall-clock time.time()
    # can jump (e.g. clock adjustments) and produce bogus or negative durations.
    started = time.perf_counter()
    log.info("[E] start run=%s seed_e=%s salt_e=%s", run_id, seed_e, salt_e)

    # Convert once (after the start log, so bad input is still logged first).
    seed = int(seed_e)
    salt = int(salt_e)
    out = (seed * 100 + salt % 10, f"E(seed={seed},salt={salt})")

    elapsed_ms = (time.perf_counter() - started) * 1000
    log.info("[E] done run=%s dt=%.1fms out=%s", run_id, elapsed_ms, out)
    return out
def combine(d_val: int, d_tag: str, e_val: int, e_tag: str, alpha: float) -> tuple[int, str]:
    """Blend the D and E values into a single integer for node A.

    Args:
        d_val: D's value; converted with ``int`` and weighted by ``alpha``.
        d_tag: D's tag, copied into the summary.
        e_val: E's value; converted with ``int`` and weighted by ``1 - alpha``.
        e_tag: E's tag, copied into the summary.
        alpha: Weight given to D; converted with ``float``.

    Returns:
        ``(mix, summary)`` where ``mix`` is the weighted sum truncated toward
        zero by ``int()`` and ``summary`` is
        ``"mix=<mix> | <d_tag> + <e_tag> | alpha=<alpha with 2 decimals>"``.

    Raises:
        TypeError, ValueError: Propagated from ``int()`` / ``float()`` when an
            input cannot be converted.
    """
    run_id = uuid.uuid4().hex[:8]
    # perf_counter() is monotonic and high-resolution; wall-clock time.time()
    # can jump (e.g. clock adjustments) and produce bogus or negative durations.
    started = time.perf_counter()
    a = float(alpha)
    log.info("[A] start run=%s d=%s e=%s alpha=%s", run_id, (d_val, d_tag), (e_val, e_tag), a)

    # int() truncates (does not round) the float blend; callers compare
    # against the same truncating formula, so keep it as-is.
    mix = int((int(d_val) * a) + (int(e_val) * (1.0 - a)))
    summary = f"mix={mix} | {d_tag} + {e_tag} | alpha={a:.2f}"

    elapsed_ms = (time.perf_counter() - started) * 1000
    log.info("[A] done run=%s dt=%.1fms mix=%s", run_id, elapsed_ms, mix)
    return mix, summary
def postprocess(mix: int, a_summary: str, mode: str, bump: int) -> str:
    """Apply node B's optional adjustment to ``mix`` and describe the result.

    Args:
        mix: Value to adjust; converted with ``int``.
        a_summary: Upstream summary text, used as the prefix of the result.
        mode: ``"add"`` adds ``bump``; ``"mul"`` multiplies by
            ``max(1, bump)`` (so a bump below 1 leaves the value unchanged);
            any other value leaves ``mix`` unchanged.
        bump: Adjustment amount; converted with ``int``.

    Returns:
        ``"<a_summary> || B(<mode>,<bump>) => <adjusted mix>"``.

    Raises:
        TypeError, ValueError: Propagated from ``int()`` when ``mix`` or
            ``bump`` cannot be converted.
    """
    run_id = uuid.uuid4().hex[:8]
    # perf_counter() is monotonic and high-resolution; wall-clock time.time()
    # can jump (e.g. clock adjustments) and produce bogus or negative durations.
    started = time.perf_counter()
    bump_i = int(bump)
    log.info("[B] start run=%s mode=%s bump=%s mix=%s", run_id, mode, bump_i, mix)

    base = int(mix)
    if mode == "add":
        mix2 = base + bump_i
    elif mode == "mul":
        mix2 = base * max(1, bump_i)
    else:
        # Unrecognized modes (including "none") pass the value through.
        mix2 = base

    text = f"{a_summary} || B({mode},{bump_i}) => {mix2}"
    elapsed_ms = (time.perf_counter() - started) * 1000
    log.info("[B] done run=%s dt=%.1fms mix_out=%s", run_id, elapsed_ms, mix2)
    return text
def observe(b_text: str, note: str) -> str:
    """Prefix node B's text with a ``[C]`` marker and a free-form note.

    Args:
        b_text: Upstream text to pass through.
        note: Note embedded in the output and in the start log line.

    Returns:
        ``"[C] note=<note> | <b_text>"``.
    """
    run_id = uuid.uuid4().hex[:8]
    # perf_counter() is monotonic and high-resolution; wall-clock time.time()
    # can jump (e.g. clock adjustments) and produce bogus or negative durations.
    started = time.perf_counter()
    log.info("[C] start run=%s note=%s", run_id, note)

    out = f"[C] note={note} | {b_text}"

    elapsed_ms = (time.perf_counter() - started) * 1000
    log.info("[C] done run=%s dt=%.1fms", run_id, elapsed_ms)
    return out
def driver(run_id: int = 1) -> str:
    """Run a fixed sequence of direct Python calls and return a text report.

    Phases PH0 through PH3 call the node functions directly to exercise the
    pipeline logic. Every report line is logged and collected; the joined
    lines are returned.

    IMPORTANT: this does not update the UI history of the D/E/A/B/C nodes.
    """
    report: list[str] = []
    sid = _stamp("driver", run_id, time.time_ns())
    _phase(f"DRIVER start sid={sid}")
    _drv(report, f"sid={sid}")
    _drv(report, "NOTE: driver calls functions directly; UI history for D/E/A/B/C will NOT change.")

    # ---- PH0: baseline run through the whole chain ----
    _phase("PH0 baseline")
    d0 = make_d(0, 0)
    e0 = make_e(0, 0)
    a0 = combine(*d0, *e0, 0.5)
    b0 = postprocess(*a0, "none", 1)
    c0 = observe(b0, "hello")
    for entry in (
        f"PH0 | D={d0} | E={e0}",
        f"PH0 | A(mix,summary)={a0}",
        f"PH0 | B(text)={b0}",
        f"PH0 | C(text)={c0}",
    ):
        _drv(report, entry)

    # Simple invariants on the baseline A output.
    a0_mix, a0_summary = a0
    _assert_drv(report, isinstance(a0_mix, int), "A.mix is int", f"A.mix not int: {type(a0[0])}")
    _assert_drv(report, "alpha=0.50" in a0_summary, "A.summary contains alpha", "A.summary missing alpha")
    _assert_drv(report, "mix=" in a0_summary, "A.summary contains mix", "A.summary missing mix")

    # ---- PH1A: vary D while E stays at the baseline ----
    _phase("PH1A D versions (E fixed)")
    for sd in (1, 2):
        d = make_d(sd, 0)
        a = combine(*d, *e0, 0.5)
        _drv(report, f"PH1A | seed_d={sd} => D={d} | A={a}")
        expected = int(d[0] * 0.5 + e0[0] * 0.5)
        _assert_drv(report, a[0] == expected, "A.mix matches weighted sum", "A.mix mismatch")

    # ---- PH1B: vary E while D stays at seed=2 ----
    _phase("PH1B E versions (D fixed to seed=2)")
    d2 = make_d(2, 0)
    for se in (1, 2):
        e = make_e(se, 0)
        a = combine(*d2, *e, 0.5)
        _drv(report, f"PH1B | seed_e={se} => E={e} | A={a}")
        expected = int(d2[0] * 0.5 + e[0] * 0.5)
        _assert_drv(report, a[0] == expected, "A.mix matches weighted sum", "A.mix mismatch")

    # ---- PH2: vary alpha over one fixed upstream pair ----
    _phase("PH2 alpha variations (D=2,E=1)")
    d = make_d(2, 0)
    e = make_e(1, 0)
    for alpha in (0.2, 0.8, 0.35):
        a = combine(*d, *e, alpha)
        _drv(report, f"PH2 | alpha={alpha:.2f} => A={a}")
        _assert_drv(report, f"alpha={alpha:.2f}" in a[1], "A.summary alpha matches", "A.summary alpha mismatch")

    # ---- PH3: vary only the downstream B/C settings ----
    _phase("PH3 downstream noise")
    a = combine(*d, *e, 0.35)
    for mode, bump in (("add", 1), ("add", 2), ("mul", 2)):
        b = postprocess(*a, mode, bump)
        c = observe(b, f"note-{mode}-{bump}")
        _drv(report, f"PH3 | B({mode},{bump}) => {b}")
        _drv(report, f"PH3 | C(note-{mode}-{bump}) => {c}")

    _phase(f"DRIVER done sid={sid}")
    return "\n".join(report)
# Node wrapping driver(): a run_id slider in, the text report out.
DRIVER = FnNode(
    fn=driver,
    inputs=dict(
        run_id=gr.Slider(label="Driver run_id", minimum=1, maximum=20, step=1, value=1),
    ),
    outputs=dict(
        report=gr.Textbox(label="Driver report", lines=30),
    ),
)
# Node wrapping make_d(): integer seed + 0-9 salt slider in, (d_val, d_tag) out.
D = FnNode(
    fn=make_d,
    inputs=dict(
        seed_d=gr.Number(label="D seed", value=0, precision=0),
        salt_d=gr.Slider(label="D salt", minimum=0, maximum=9, step=1, value=0),
    ),
    outputs=dict(
        d_val=gr.Number(label="D val"),
        d_tag=gr.Textbox(label="D tag"),
    ),
)
# Node wrapping make_e(): integer seed + 0-9 salt slider in, (e_val, e_tag) out.
E = FnNode(
    fn=make_e,
    inputs=dict(
        seed_e=gr.Number(label="E seed", value=0, precision=0),
        salt_e=gr.Slider(label="E salt", minimum=0, maximum=9, step=1, value=0),
    ),
    outputs=dict(
        e_val=gr.Number(label="E val"),
        e_tag=gr.Textbox(label="E tag"),
    ),
)
# Node wrapping combine(): fed from D's and E's outputs plus its own alpha slider.
# NOTE(review): D.d_val / E.e_val etc. are presumably daggr output-port
# references that wire the nodes together -- confirm against the daggr docs.
A = FnNode(
    fn=combine,
    inputs=dict(
        d_val=D.d_val,
        d_tag=D.d_tag,
        e_val=E.e_val,
        e_tag=E.e_tag,
        alpha=gr.Slider(label="A alpha", minimum=0.0, maximum=1.0, step=0.05, value=0.5),
    ),
    outputs=dict(
        mix=gr.Number(label="A mix"),
        a_summary=gr.Textbox(label="A summary"),
    ),
)
# Node wrapping postprocess(): fed from A's outputs plus a mode radio and bump slider.
B = FnNode(
    fn=postprocess,
    inputs=dict(
        mix=A.mix,
        a_summary=A.a_summary,
        mode=gr.Radio(label="B mode", choices=["none", "add", "mul"], value="none"),
        bump=gr.Slider(label="B bump", minimum=0, maximum=5, step=1, value=1),
    ),
    outputs=dict(b_text=gr.Textbox(label="B text")),
)
# Node wrapping observe(): fed from B's text plus a free-form note textbox.
C = FnNode(
    fn=observe,
    inputs=dict(
        b_text=B.b_text,
        note=gr.Textbox(label="C note", value="hello"),
    ),
    outputs=dict(c_text=gr.Textbox(label="C text")),
)
# Assemble the pipeline nodes and the standalone driver node into one graph.
graph = Graph(
    name="micro-restore-debug-nojson",
    nodes=[D, E, A, B, C, DRIVER],
    persist_key=False,
)

# Listen on $PORT when it is set, otherwise fall back to 7860.
port = int(os.environ.get("PORT", "7860"))
graph.launch(
    host="0.0.0.0",
    port=port,
    open_browser=False,
    access_log=True,
)