# abpt / app.py
# Search
# auto: sync run_qwen_phase3_evidence_synthesis.py
# 1bdab81
"""
ABPT Research Worker — Gradio app с ZeroGPU.
Бесплатный T4 GPU на HuggingFace Spaces.
"""
from __future__ import annotations
import io
import json
import os
import re
import runpy
import subprocess
import sys
import time
import traceback
from contextlib import redirect_stderr, redirect_stdout
from pathlib import Path
from typing import Any
import gradio as gr
import spaces
# Directory holding the runnable experiment scripts, resolved next to this file.
SCRIPTS_DIR = Path(__file__).parent / "scripts"

# Scripts that run_experiment executes in-process (through
# _run_inprocess_cached) rather than in a subprocess.
INPROCESS_CACHEABLE_SCRIPTS = {
    "run_qwen_per_case_diagnostic.py",
    "run_qwen_per_case_diagnostic_v2.py",
}

# Overlay objects stored by _load_cached_overlay. Keys are strings built from
# model name, device, dtype and CPU-quantization status.
_OVERLAY_CACHE: dict[str, Any] = {}

# Passed as ``duration`` to spaces.GPU(...) and used by run_experiment to cap
# the per-request timeout when SPACES_ZERO_GPU is set.
ZEROGPU_DURATION_SECONDS = 120

# Per-script CLI conventions read by _build_cli_args and the runners:
#   "model_flag"           - flag used to pass the model name; scripts without
#                            an entry fall back to "--model-name".
#   "preserve_underscores" - argument names emitted verbatim as "--name"; every
#                            other name has "_" replaced by "-".
SCRIPT_CLI_METADATA: dict[str, dict[str, Any]] = {
    "run_qwen_20domain_geometry_gated.py": {
        "model_flag": "--model",
        "preserve_underscores": {
            "max_new_tokens", "max_length", "conflict_threshold", "bias_scale",
            "repetition_penalty", "frequency_penalty", "no_repeat_ngram_size",
            "max_bias_gate_sum", "entropy_top_k", "entropy_threshold",
            "entropy_slope", "pressure_threshold", "pressure_slope",
            "pressure_rescue_floor", "mature_r1_threshold", "template_delta_threshold", "r1_ceiling",
            "output_dir",
        },
    },
    "run_qwen_20domain_retention_campaign.py": {
        "model_flag": "--model",
        "preserve_underscores": {
            "max_new_tokens", "max_length", "bias_scale", "conflict_threshold",
            "repetition_penalty", "frequency_penalty", "no_repeat_ngram_size",
            "max_bias_gate_sum", "entropy_top_k", "entropy_threshold",
            "entropy_slope", "pressure_threshold", "pressure_slope",
            "pressure_rescue_floor", "output_dir",
        },
    },
    "run_qwen_anchor_carryover_probe.py": {
        "model_flag": "--model",
        "preserve_underscores": {"max_length", "neutral_components", "neutral_variance_cutoff", "case_name"},
    },
    "run_qwen_anchor_layer_profile_map.py": {
        "model_flag": "--model",
        "preserve_underscores": {"max_length", "case_name"},
    },
    "run_qwen_anchor_concept_direction_map.py": {
        "model_flag": "--model",
        "preserve_underscores": {"max_length", "neutral_components", "neutral_variance_cutoff", "case_name"},
    },
    "run_qwen_injection_layer_ablation.py": {
        "model_flag": "--model-name",
        "preserve_underscores": {"input_json", "output_json", "output_md"},
    },
    "run_qwen_future_influence_probe.py": {
        "model_flag": "--model",
        "preserve_underscores": {"max_length", "future_window", "top_k", "span_threshold", "top_spans", "case_filter"},
    },
}
def _default_device() -> str:
    """Return ``"cuda"`` when torch reports a CUDA device, otherwise ``"cpu"``.

    Any failure while importing or querying torch also yields ``"cpu"``.
    """
    chosen = "cpu"
    try:
        import torch

        if torch.cuda.is_available():
            chosen = "cuda"
    except Exception:
        # torch missing or broken: stay on the CPU default.
        chosen = "cpu"
    return chosen
def _default_cpu_quant_mode() -> str:
    """Return the CPU quantization mode named by ``CPU_QUANT_MODE``.

    The value is stripped and lower-cased; an unset or blank variable
    yields ``"int8"``.
    """
    configured = os.environ.get("CPU_QUANT_MODE", "int8")
    normalized = configured.strip().lower()
    if not normalized:
        return "int8"
    return normalized
def _build_cpu_quant_kwargs(device: str | None) -> tuple[dict[str, Any], str | None]:
    """Build model-loading kwargs that enable weight-only quantization on CPU.

    Returns a ``(kwargs, status)`` pair:

    * ``({}, None)`` when *device* is not ``"cpu"`` (case-insensitive) or the
      configured mode is one of ``off`` / ``none`` / ``false`` / ``0``;
    * ``({}, "cpu_quant_unavailable: ...")`` when the quantization imports fail;
    * ``({}, "cpu_quant_init_failed: ...")`` when building the config fails;
    * otherwise the kwargs (``quantization_config`` and ``low_cpu_mem_usage``)
      together with the mode string.
    """
    targets_cpu = str(device).lower() == "cpu"
    if not targets_cpu:
        return {}, None

    mode = _default_cpu_quant_mode()
    if mode in {"off", "none", "false", "0"}:
        return {}, None

    try:
        from transformers import TorchAoConfig
        from torchao.quantization import Int4WeightOnlyConfig, Int8WeightOnlyConfig
    except Exception as exc:
        return {}, f"cpu_quant_unavailable: {exc}"

    try:
        # Only "int4" selects the int4 config; every other mode uses int8.
        if mode == "int4":
            weight_config = Int4WeightOnlyConfig(group_size=128)
        else:
            weight_config = Int8WeightOnlyConfig()
        kwargs = {
            "quantization_config": TorchAoConfig(weight_config),
            "low_cpu_mem_usage": True,
        }
    except Exception as exc:
        return {}, f"cpu_quant_init_failed: {exc}"
    return kwargs, mode
def _build_cli_args(script: str, args: dict[str, Any]) -> list[str]:
    """Translate a request's ``args`` mapping into command-line tokens.

    Args:
        script: Script file name, used to look up ``SCRIPT_CLI_METADATA``.
        args: Mapping of argument name to value.

    Returns:
        A flat list of CLI tokens, in the iteration order of *args*.

    Conversion rules:
        * Names listed in the script's ``preserve_underscores`` set become
          ``--name`` verbatim; all other names have ``_`` replaced by ``-``.
        * ``None`` values are skipped, so a JSON ``null`` means "not provided"
          instead of passing the literal string ``"None"`` to the script.
        * ``True`` emits the bare flag; ``False`` emits nothing.
        * Lists and tuples emit the flag followed by one token per item.
        * Anything else emits the flag followed by ``str(value)``.
    """
    meta = SCRIPT_CLI_METADATA.get(script, {})
    preserved = set(meta.get("preserve_underscores", ()))

    cli_args: list[str] = []
    for key, val in args.items():
        if val is None:
            continue
        name = str(key)
        flag = f"--{name}" if name in preserved else f"--{name.replace('_', '-')}"
        # bool must be tested before the generic branch: it is a store-true
        # style switch, not a value to stringify.
        if isinstance(val, bool):
            if val:
                cli_args.append(flag)
        elif isinstance(val, (list, tuple)):
            cli_args.append(flag)
            cli_args.extend(str(item) for item in val)
        else:
            cli_args.extend((flag, str(val)))
    return cli_args
def _load_cached_overlay(
    model_name: str,
    *,
    factory: Any,
    cfg: Any,
    device: str | None,
    torch_dtype: Any,
    extra_kwargs: dict[str, Any],
) -> Any:
    """Return an overlay for *model_name*, reusing ``_OVERLAY_CACHE`` when possible.

    The cache key is built from the model name, the resolved device, the
    resolved dtype and the CPU-quantization status. ``cfg`` and
    ``extra_kwargs`` are not part of the key, so a cache hit returns the
    stored overlay regardless of their values.

    Args:
        model_name: Model identifier forwarded to *factory*.
        factory: Callable that builds the overlay. When it exposes
            ``__func__`` the underlying function is called with
            ``QwenAnchorOverlay`` as its first argument.
        cfg: Forwarded to *factory* unchanged.
        device: Requested device; a falsy value resolves to ``"cuda"`` when
            CUDA is available, else ``None``.
        torch_dtype: Requested dtype; ``None`` resolves to ``torch.float16``
            when CUDA is available.
        extra_kwargs: Extra keyword arguments for *factory*; CPU quantization
            kwargs override entries with the same name.

    Returns:
        The overlay, switched to eval mode.
    """
    import torch
    from src.model.qwen_anchor_overlay import QwenAnchorOverlay

    if device:
        target_device = device
    elif torch.cuda.is_available():
        target_device = "cuda"
    else:
        target_device = None

    dtype = torch_dtype
    if dtype is None and torch.cuda.is_available():
        dtype = torch.float16

    quant_kwargs, quant_status = _build_cpu_quant_kwargs(target_device)
    status_label = quant_status or "fp"
    key = f"{model_name}|{target_device}|{dtype}|{status_label}"

    hit = _OVERLAY_CACHE.get(key)
    if hit is not None:
        if target_device is not None:
            hit = hit.to(target_device)
        hit.eval()
        return hit

    call_kwargs = {**extra_kwargs, **quant_kwargs}

    # A factory exposing __func__ is invoked through the underlying function
    # with the class passed explicitly; anything else is called as-is.
    if hasattr(factory, "__func__"):
        build = factory.__func__
        leading = (QwenAnchorOverlay,)
    else:
        build = factory
        leading = ()
    overlay = build(
        *leading,
        model_name=model_name,
        cfg=cfg,
        device=target_device,
        torch_dtype=dtype,
        **call_kwargs,
    )

    overlay.eval()
    overlay._cpu_quant_status = status_label
    _OVERLAY_CACHE[key] = overlay
    return overlay
def _run_inprocess_cached(
    *,
    script_path: Path,
    script: str,
    args: dict[str, Any],
    model: str,
) -> dict[str, Any]:
    """Run *script_path* inside this process with cached overlay loading.

    ``QwenAnchorOverlay.from_pretrained`` is temporarily replaced by a
    classmethod that routes through ``_load_cached_overlay``, ``sys.argv`` is
    replaced by the script's command line, and stdout/stderr are captured
    while the script runs under ``runpy`` as ``__main__``.

    Args:
        script_path: Path of the script to execute.
        script: Script file name, used for ``SCRIPT_CLI_METADATA`` lookups.
        args: Argument mapping converted with ``_build_cli_args``.
        model: Model name appended after the script's model flag.

    Returns:
        A dict with ``status``, ``returncode``, ``elapsed_seconds``,
        ``stdout_tail``, ``stderr_tail``, ``result_json``, ``runner`` and
        ``cached_models``.
    """
    from src.model.qwen_anchor_overlay import QwenAnchorOverlay

    # Keep the raw class-dict entry (the classmethod descriptor) so the class
    # can be restored exactly. Writing back the bound method obtained through
    # attribute access would leave a method permanently bound to this class,
    # which breaks binding for subclasses.
    not_defined_here = object()
    original_descriptor = vars(QwenAnchorOverlay).get("from_pretrained", not_defined_here)
    original_factory = QwenAnchorOverlay.from_pretrained
    original_argv = list(sys.argv)
    stdout_buffer = io.StringIO()
    stderr_buffer = io.StringIO()

    def cached_from_pretrained(
        cls,
        model_name: str = "Qwen/Qwen3.5-4B",
        cfg: Any | None = None,
        device: str | None = None,
        torch_dtype: Any | None = None,
        **kwargs: Any,
    ) -> Any:
        return _load_cached_overlay(
            model_name=model_name,
            factory=original_factory,
            cfg=cfg,
            device=device,
            torch_dtype=torch_dtype,
            extra_kwargs=kwargs,
        )

    model_flag = str(SCRIPT_CLI_METADATA.get(script, {}).get("model_flag", "--model-name"))
    cli_args = [str(script_path), *_build_cli_args(script, args), model_flag, model]
    returncode = 0
    t0 = time.time()
    try:
        QwenAnchorOverlay.from_pretrained = classmethod(cached_from_pretrained)
        sys.argv = cli_args
        with redirect_stdout(stdout_buffer), redirect_stderr(stderr_buffer):
            try:
                runpy.run_path(str(script_path), run_name="__main__")
            except SystemExit as exc:
                # Map sys.exit(...) onto a process-style return code.
                code = exc.code
                if code in (None, 0):
                    returncode = 0
                elif isinstance(code, int):
                    returncode = code
                else:
                    returncode = 1
                    # sys.stderr is the capture buffer here.
                    print(code, file=sys.stderr)
            except Exception:
                returncode = 1
                traceback.print_exc()
    finally:
        if original_descriptor is not_defined_here:
            # The attribute was inherited: drop the override rather than
            # pinning a copy onto this class.
            if "from_pretrained" in vars(QwenAnchorOverlay):
                del QwenAnchorOverlay.from_pretrained
        else:
            QwenAnchorOverlay.from_pretrained = original_descriptor
        sys.argv = original_argv

    stdout = stdout_buffer.getvalue()
    stderr = stderr_buffer.getvalue()
    return {
        "status": "success" if returncode == 0 else "failed",
        "returncode": returncode,
        "elapsed_seconds": round(time.time() - t0, 1),
        "stdout_tail": stdout[-3000:],
        "stderr_tail": stderr[-1000:],
        "result_json": _extract_result_json(stdout, stderr),
        "runner": "inprocess_cached",
        "cached_models": len(_OVERLAY_CACHE),
    }
def _gpu_decorator(fn):
    """Wrap *fn* with ``spaces.GPU`` when a GPU runtime is indicated.

    The wrapper is applied when torch reports CUDA or the ``SPACES_ZERO_GPU``
    environment variable is set. In every other case, including any error
    during the check or the wrapping, *fn* is returned unchanged.
    """
    try:
        import torch

        gpu_indicated = torch.cuda.is_available() or os.environ.get("SPACES_ZERO_GPU")
        if not gpu_indicated:
            return fn
        return spaces.GPU(duration=ZEROGPU_DURATION_SECONDS)(fn)
    except Exception:
        # Best effort: fall back to the undecorated function.
        return fn
@_gpu_decorator
def run_experiment(request_json: str) -> str:
    """Run one experiment script and return the outcome as a JSON string.

    Input JSON:
        {"script": "run_qwen_phase_probe.py", "args": {...}, "model": "Qwen/Qwen3.5-4B"}
        An optional positive numeric "timeout" (seconds) is honoured up to the
        worker cap: ``ZEROGPU_DURATION_SECONDS - 20`` when ``SPACES_ZERO_GPU``
        is set, otherwise 7200.
    Output JSON:
        {"status": "success", "returncode": 0, "elapsed_seconds": 120,
         "stdout_tail": "...", "stderr_tail": "...", "result_json": {...}}

    Malformed requests are answered with
    ``{"status": "error", "stderr_tail": "..."}`` instead of raising. A
    subprocess timeout yields ``{"status": "timeout", "elapsed_seconds": ...}``.
    Scripts listed in ``INPROCESS_CACHEABLE_SCRIPTS`` run in-process; all
    others run in a subprocess with the repository root as working directory.
    """
    try:
        req = json.loads(request_json)
    except (TypeError, json.JSONDecodeError) as e:
        return json.dumps({"status": "error", "stderr_tail": f"Invalid JSON: {e}"})
    if not isinstance(req, dict):
        return json.dumps({"status": "error", "stderr_tail": "Request must be a JSON object"})

    # Validate the script name before it touches the filesystem: it must be a
    # plain, non-empty file name with no directory components.
    script = req.get("script", "")
    if (
        not isinstance(script, str)
        or not script
        or ".." in script
        or "/" in script
        or "\\" in script
    ):
        return json.dumps({"status": "error", "stderr_tail": "Invalid script name"})
    script_path = SCRIPTS_DIR / script
    if not script_path.is_file():
        return json.dumps({"status": "error", "stderr_tail": f"Script not found: {script}"})

    raw_args = req.get("args")
    if raw_args is None:
        raw_args = {}
    if not isinstance(raw_args, dict):
        return json.dumps({"status": "error", "stderr_tail": "\"args\" must be a JSON object"})
    args = dict(raw_args)

    model = req.get("model") or "Qwen/Qwen3.5-4B"
    if not isinstance(model, str):
        return json.dumps({"status": "error", "stderr_tail": "\"model\" must be a string"})

    max_timeout = (ZEROGPU_DURATION_SECONDS - 20) if os.environ.get("SPACES_ZERO_GPU") else 7200
    requested_timeout = req.get("timeout")
    if (
        isinstance(requested_timeout, (int, float))
        and not isinstance(requested_timeout, bool)
        and requested_timeout > 0
    ):
        timeout = min(requested_timeout, max_timeout)
    else:
        # Missing, non-numeric, NaN or non-positive values fall back to the cap.
        timeout = max_timeout

    args.setdefault("device", _default_device())
    os.environ.setdefault("ATTN_IMPL", "sdpa")  # save VRAM on ZeroGPU

    if script in INPROCESS_CACHEABLE_SCRIPTS:
        print(f"[Worker] Running cached in-process path for {script}")
        try:
            output = _run_inprocess_cached(
                script_path=script_path,
                script=script,
                args=args,
                model=model,
            )
            return json.dumps(output, ensure_ascii=False)
        except Exception as e:
            return json.dumps({"status": "error", "stderr_tail": f"in-process runner failed: {e}"})

    # Build command
    model_flag = str(SCRIPT_CLI_METADATA.get(script, {}).get("model_flag", "--model-name"))
    cmd = [sys.executable, str(script_path), *_build_cli_args(script, args), model_flag, model]
    print(f"[Worker] Running: {' '.join(cmd)}")
    t0 = time.time()
    try:
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            cwd=str(SCRIPTS_DIR.parent),
            timeout=timeout,
        )
    except subprocess.TimeoutExpired:
        return json.dumps({"status": "timeout", "elapsed_seconds": timeout})
    except Exception as e:
        return json.dumps({"status": "error", "stderr_tail": str(e)})
    elapsed = time.time() - t0

    # Try to find and read the result JSON announced in the script output.
    result_json = _extract_result_json(result.stdout or "", result.stderr or "")
    output = {
        "status": "success" if result.returncode == 0 else "failed",
        "returncode": result.returncode,
        "elapsed_seconds": round(elapsed, 1),
        "stdout_tail": (result.stdout or "")[-3000:],
        "stderr_tail": (result.stderr or "")[-1000:],
        "result_json": result_json,
    }
    return json.dumps(output, ensure_ascii=False)
def health_check() -> str:
    """Report the worker's state as an indented JSON string.

    The report covers GPU availability and name, the default device, the CPU
    quantization mode, the ``run_qwen_*.py`` scripts found in ``SCRIPTS_DIR``,
    the number of cached overlays and the in-process cacheable script names.
    """
    import torch

    if SCRIPTS_DIR.exists():
        script_names = [entry.name for entry in SCRIPTS_DIR.glob("run_qwen_*.py")]
    else:
        script_names = []

    has_gpu = torch.cuda.is_available()
    gpu_name = torch.cuda.get_device_name(0) if has_gpu else None

    report = {
        "status": "ok",
        "gpu": has_gpu,
        "gpu_name": gpu_name,
        "default_device": _default_device(),
        "cpu_quant_mode": _default_cpu_quant_mode(),
        "scripts": script_names,
        "cached_models": len(_OVERLAY_CACHE),
        "inprocess_cacheable_scripts": sorted(INPROCESS_CACHEABLE_SCRIPTS),
    }
    return json.dumps(report, indent=2)
def _extract_result_json(stdout: str, stderr: str) -> dict[str, Any] | None:
    """Recover a script's result payload from its captured output.

    Two sources are tried, in order, over ``stdout + "\\n" + stderr``:

    1. A file path announced as ``saved_json=<path>``, ``saved json: <path>``
       or ``saved_json: <path>`` (case-insensitive, first occurrence of each
       form). The first announced file that can be read and parsed as JSON is
       returned.
    2. The text after the last ``===FINAL_RESULT===`` marker: the first
       position starting with ``{`` or ``[`` that decodes to a JSON object is
       returned.

    Every filesystem or parsing failure is treated as "not found here" and the
    search moves on; this includes errors raised by the existence check itself
    (for example an over-long path taken from a log line).

    Returns:
        The decoded payload, or ``None`` when nothing usable was found.
    """
    combined = stdout + "\n" + stderr

    path_patterns = (
        r"saved_json=(?P<path>[^\r\n]+)",
        r"saved json:\s*(?P<path>[^\r\n]+)",
        r"saved_json:\s*(?P<path>[^\r\n]+)",
    )
    for pattern in path_patterns:
        match = re.search(pattern, combined, flags=re.IGNORECASE)
        if match is None:
            continue
        raw_path = match.group("path").strip().strip("'\"")
        try:
            # The existence check stays inside the try: stat() can raise
            # OSError/ValueError for text that is not a usable path.
            candidate = Path(raw_path)
            if candidate.is_file():
                return json.loads(candidate.read_text(encoding="utf-8"))
        except Exception:
            # Best effort: fall through to the next pattern or the marker scan.
            pass

    marker = "===FINAL_RESULT==="
    marker_index = combined.rfind(marker)
    if marker_index < 0:
        return None

    tail = combined[marker_index + len(marker):].strip()
    decoder = json.JSONDecoder()
    for i, ch in enumerate(tail):
        if ch not in "{[":
            continue
        try:
            payload, _ = decoder.raw_decode(tail[i:])
        except Exception:
            continue
        if isinstance(payload, dict):
            return payload
    return None
# Gradio UI + API
# Builds the Blocks page and registers the two callbacks under the API names
# "run" and "health".
with gr.Blocks(title="ABPT Research Worker") as demo:
    gr.Markdown("# ABPT Research Worker (ZeroGPU)")
    gr.Markdown("GPU worker for autonomous research loop. Use via API.")

    # Request on the left, result on the right.
    with gr.Row():
        with gr.Column():
            input_box = gr.Textbox(
                label="Request JSON",
                placeholder='{"script": "run_qwen_phase_probe.py", "args": {}, "model": "Qwen/Qwen3.5-4B"}',
                lines=5,
            )
            run_btn = gr.Button("Run Experiment", variant="primary")
        with gr.Column():
            output_box = gr.Textbox(label="Result JSON", lines=10)

    # run_experiment takes and returns JSON strings, so plain textboxes suffice.
    run_btn.click(fn=run_experiment, inputs=input_box, outputs=output_box, api_name="run")

    health_btn = gr.Button("Health Check")
    health_output = gr.Textbox(label="Health", lines=5)
    health_btn.click(fn=health_check, outputs=health_output, api_name="health")

# Launched at import time; there is no ``__main__`` guard.
demo.launch()