Spaces:
Sleeping
Sleeping
| """ | |
| ABPT Research Worker — Gradio app с ZeroGPU. | |
| Бесплатный T4 GPU на HuggingFace Spaces. | |
| """ | |
| from __future__ import annotations | |
| import io | |
| import json | |
| import os | |
| import re | |
| import runpy | |
| import subprocess | |
| import sys | |
| import time | |
| import traceback | |
| from contextlib import redirect_stderr, redirect_stdout | |
| from pathlib import Path | |
| from typing import Any | |
| import gradio as gr | |
| import spaces | |
| SCRIPTS_DIR = Path(__file__).parent / "scripts" | |
| INPROCESS_CACHEABLE_SCRIPTS = { | |
| "run_qwen_per_case_diagnostic.py", | |
| "run_qwen_per_case_diagnostic_v2.py", | |
| } | |
| _OVERLAY_CACHE: dict[str, Any] = {} | |
| ZEROGPU_DURATION_SECONDS = 120 | |
| SCRIPT_CLI_METADATA: dict[str, dict[str, Any]] = { | |
| "run_qwen_20domain_geometry_gated.py": { | |
| "model_flag": "--model", | |
| "preserve_underscores": { | |
| "max_new_tokens", "max_length", "conflict_threshold", "bias_scale", | |
| "repetition_penalty", "frequency_penalty", "no_repeat_ngram_size", | |
| "max_bias_gate_sum", "entropy_top_k", "entropy_threshold", | |
| "entropy_slope", "pressure_threshold", "pressure_slope", | |
| "pressure_rescue_floor", "mature_r1_threshold", "template_delta_threshold", "r1_ceiling", | |
| "output_dir", | |
| }, | |
| }, | |
| "run_qwen_20domain_retention_campaign.py": { | |
| "model_flag": "--model", | |
| "preserve_underscores": { | |
| "max_new_tokens", "max_length", "bias_scale", "conflict_threshold", | |
| "repetition_penalty", "frequency_penalty", "no_repeat_ngram_size", | |
| "max_bias_gate_sum", "entropy_top_k", "entropy_threshold", | |
| "entropy_slope", "pressure_threshold", "pressure_slope", | |
| "pressure_rescue_floor", "output_dir", | |
| }, | |
| }, | |
| "run_qwen_anchor_carryover_probe.py": { | |
| "model_flag": "--model", | |
| "preserve_underscores": {"max_length", "neutral_components", "neutral_variance_cutoff", "case_name"}, | |
| }, | |
| "run_qwen_anchor_layer_profile_map.py": { | |
| "model_flag": "--model", | |
| "preserve_underscores": {"max_length", "case_name"}, | |
| }, | |
| "run_qwen_anchor_concept_direction_map.py": { | |
| "model_flag": "--model", | |
| "preserve_underscores": {"max_length", "neutral_components", "neutral_variance_cutoff", "case_name"}, | |
| }, | |
| "run_qwen_injection_layer_ablation.py": { | |
| "model_flag": "--model-name", | |
| "preserve_underscores": {"input_json", "output_json", "output_md"}, | |
| }, | |
| "run_qwen_future_influence_probe.py": { | |
| "model_flag": "--model", | |
| "preserve_underscores": {"max_length", "future_window", "top_k", "span_threshold", "top_spans", "case_filter"}, | |
| }, | |
| } | |
| def _default_device() -> str: | |
| try: | |
| import torch | |
| return "cuda" if torch.cuda.is_available() else "cpu" | |
| except Exception: | |
| return "cpu" | |
| def _default_cpu_quant_mode() -> str: | |
| return os.environ.get("CPU_QUANT_MODE", "int8").strip().lower() or "int8" | |
| def _build_cpu_quant_kwargs(device: str | None) -> tuple[dict[str, Any], str | None]: | |
| if str(device).lower() != "cpu": | |
| return {}, None | |
| quant_mode = _default_cpu_quant_mode() | |
| if quant_mode in {"off", "none", "false", "0"}: | |
| return {}, None | |
| try: | |
| from transformers import TorchAoConfig | |
| from torchao.quantization import Int4WeightOnlyConfig, Int8WeightOnlyConfig | |
| except Exception as exc: | |
| return {}, f"cpu_quant_unavailable: {exc}" | |
| try: | |
| if quant_mode == "int4": | |
| quantization_config = TorchAoConfig(Int4WeightOnlyConfig(group_size=128)) | |
| else: | |
| quantization_config = TorchAoConfig(Int8WeightOnlyConfig()) | |
| return { | |
| "quantization_config": quantization_config, | |
| "low_cpu_mem_usage": True, | |
| }, quant_mode | |
| except Exception as exc: | |
| return {}, f"cpu_quant_init_failed: {exc}" | |
| def _build_cli_args(script: str, args: dict[str, Any]) -> list[str]: | |
| cli_args: list[str] = [] | |
| meta = SCRIPT_CLI_METADATA.get(script, {}) | |
| preserve_underscores = set(meta.get("preserve_underscores", set())) | |
| for key, val in args.items(): | |
| key_str = str(key) | |
| cli_key = f"--{key_str}" if key_str in preserve_underscores else f"--{key_str.replace('_', '-')}" | |
| if isinstance(val, bool): | |
| if val: | |
| cli_args.append(cli_key) | |
| continue | |
| if isinstance(val, (list, tuple)): | |
| cli_args.append(cli_key) | |
| cli_args.extend(str(item) for item in val) | |
| else: | |
| cli_args.extend([cli_key, str(val)]) | |
| return cli_args | |
| def _load_cached_overlay( | |
| model_name: str, | |
| *, | |
| factory: Any, | |
| cfg: Any, | |
| device: str | None, | |
| torch_dtype: Any, | |
| extra_kwargs: dict[str, Any], | |
| ) -> Any: | |
| import torch | |
| from src.model.qwen_anchor_overlay import QwenAnchorOverlay | |
| resolved_device = device or ("cuda" if torch.cuda.is_available() else None) | |
| resolved_dtype = torch_dtype | |
| if resolved_dtype is None and torch.cuda.is_available(): | |
| resolved_dtype = torch.float16 | |
| cpu_quant_kwargs, cpu_quant_status = _build_cpu_quant_kwargs(resolved_device) | |
| cache_key = f"{model_name}|{resolved_device}|{resolved_dtype}|{cpu_quant_status or 'fp'}" | |
| cached = _OVERLAY_CACHE.get(cache_key) | |
| if cached is not None: | |
| if resolved_device is not None: | |
| cached = cached.to(resolved_device) | |
| cached.eval() | |
| return cached | |
| load_kwargs = dict(extra_kwargs) | |
| load_kwargs.update(cpu_quant_kwargs) | |
| if hasattr(factory, "__func__"): | |
| overlay = factory.__func__( | |
| QwenAnchorOverlay, | |
| model_name=model_name, | |
| cfg=cfg, | |
| device=resolved_device, | |
| torch_dtype=resolved_dtype, | |
| **load_kwargs, | |
| ) | |
| else: | |
| overlay = factory( | |
| model_name=model_name, | |
| cfg=cfg, | |
| device=resolved_device, | |
| torch_dtype=resolved_dtype, | |
| **load_kwargs, | |
| ) | |
| overlay.eval() | |
| setattr(overlay, "_cpu_quant_status", cpu_quant_status or "fp") | |
| _OVERLAY_CACHE[cache_key] = overlay | |
| return overlay | |
| def _run_inprocess_cached( | |
| *, | |
| script_path: Path, | |
| script: str, | |
| args: dict[str, Any], | |
| model: str, | |
| ) -> dict[str, Any]: | |
| from src.model.qwen_anchor_overlay import QwenAnchorOverlay | |
| original_factory = QwenAnchorOverlay.from_pretrained | |
| original_argv = list(sys.argv) | |
| stdout_buffer = io.StringIO() | |
| stderr_buffer = io.StringIO() | |
| def cached_from_pretrained( | |
| cls, | |
| model_name: str = "Qwen/Qwen3.5-4B", | |
| cfg: Any | None = None, | |
| device: str | None = None, | |
| torch_dtype: Any | None = None, | |
| **kwargs: Any, | |
| ) -> Any: | |
| return _load_cached_overlay( | |
| model_name=model_name, | |
| factory=original_factory, | |
| cfg=cfg, | |
| device=device, | |
| torch_dtype=torch_dtype, | |
| extra_kwargs=kwargs, | |
| ) | |
| model_flag = str(SCRIPT_CLI_METADATA.get(script, {}).get("model_flag", "--model-name")) | |
| cli_args = [str(script_path), *_build_cli_args(script, args), model_flag, model] | |
| returncode = 0 | |
| t0 = time.time() | |
| try: | |
| QwenAnchorOverlay.from_pretrained = classmethod(cached_from_pretrained) | |
| sys.argv = cli_args | |
| with redirect_stdout(stdout_buffer), redirect_stderr(stderr_buffer): | |
| try: | |
| runpy.run_path(str(script_path), run_name="__main__") | |
| except SystemExit as exc: | |
| code = exc.code | |
| if code in (None, 0): | |
| returncode = 0 | |
| elif isinstance(code, int): | |
| returncode = code | |
| else: | |
| returncode = 1 | |
| print(code, file=sys.stderr) | |
| except Exception: | |
| returncode = 1 | |
| traceback.print_exc() | |
| finally: | |
| QwenAnchorOverlay.from_pretrained = original_factory | |
| sys.argv = original_argv | |
| stdout = stdout_buffer.getvalue() | |
| stderr = stderr_buffer.getvalue() | |
| return { | |
| "status": "success" if returncode == 0 else "failed", | |
| "returncode": returncode, | |
| "elapsed_seconds": round(time.time() - t0, 1), | |
| "stdout_tail": stdout[-3000:], | |
| "stderr_tail": stderr[-1000:], | |
| "result_json": _extract_result_json(stdout, stderr), | |
| "runner": "inprocess_cached", | |
| "cached_models": len(_OVERLAY_CACHE), | |
| } | |
| def _gpu_decorator(fn): | |
| """Apply @spaces.GPU only when ZeroGPU is available.""" | |
| try: | |
| import torch | |
| if torch.cuda.is_available() or os.environ.get("SPACES_ZERO_GPU"): | |
| return spaces.GPU(duration=ZEROGPU_DURATION_SECONDS)(fn) | |
| except Exception: | |
| pass | |
| return fn | |
| def run_experiment(request_json: str) -> str: | |
| """ | |
| Запускает эксперимент на GPU. Принимает и возвращает JSON строки. | |
| Input JSON: | |
| {"script": "run_qwen_phase_probe.py", "args": {...}, "model": "Qwen/Qwen3.5-4B"} | |
| Output JSON: | |
| {"status": "success", "returncode": 0, "elapsed_seconds": 120, | |
| "stdout_tail": "...", "stderr_tail": "...", "result_json": {...}} | |
| """ | |
| try: | |
| req = json.loads(request_json) | |
| except json.JSONDecodeError as e: | |
| return json.dumps({"status": "error", "stderr_tail": f"Invalid JSON: {e}"}) | |
| script = req.get("script", "") | |
| args = dict(req.get("args", {})) | |
| model = req.get("model", "Qwen/Qwen3.5-4B") | |
| max_timeout = (ZEROGPU_DURATION_SECONDS - 20) if os.environ.get("SPACES_ZERO_GPU") else 7200 | |
| timeout = min(req.get("timeout", max_timeout), max_timeout) | |
| args.setdefault("device", _default_device()) | |
| os.environ.setdefault("ATTN_IMPL", "sdpa") # save VRAM on ZeroGPU | |
| script_path = SCRIPTS_DIR / script | |
| if not script_path.exists(): | |
| return json.dumps({"status": "error", "stderr_tail": f"Script not found: {script}"}) | |
| # Sanitize | |
| if ".." in script or "/" in script: | |
| return json.dumps({"status": "error", "stderr_tail": "Invalid script name"}) | |
| if script in INPROCESS_CACHEABLE_SCRIPTS: | |
| print(f"[Worker] Running cached in-process path for {script}") | |
| try: | |
| output = _run_inprocess_cached( | |
| script_path=script_path, | |
| script=script, | |
| args=args, | |
| model=model, | |
| ) | |
| return json.dumps(output, ensure_ascii=False) | |
| except Exception as e: | |
| return json.dumps({"status": "error", "stderr_tail": f"in-process runner failed: {e}"}) | |
| # Build command | |
| model_flag = str(SCRIPT_CLI_METADATA.get(script, {}).get("model_flag", "--model-name")) | |
| cmd = [sys.executable, str(script_path), *_build_cli_args(script, args), model_flag, model] | |
| print(f"[Worker] Running: {' '.join(cmd)}") | |
| t0 = time.time() | |
| try: | |
| result = subprocess.run( | |
| cmd, | |
| capture_output=True, | |
| text=True, | |
| cwd=str(SCRIPTS_DIR.parent), | |
| timeout=timeout, | |
| ) | |
| except subprocess.TimeoutExpired: | |
| return json.dumps({"status": "timeout", "elapsed_seconds": timeout}) | |
| except Exception as e: | |
| return json.dumps({"status": "error", "stderr_tail": str(e)}) | |
| elapsed = time.time() - t0 | |
| # Try to find and read result JSON | |
| result_json = _extract_result_json(result.stdout or "", result.stderr or "") | |
| output = { | |
| "status": "success" if result.returncode == 0 else "failed", | |
| "returncode": result.returncode, | |
| "elapsed_seconds": round(elapsed, 1), | |
| "stdout_tail": (result.stdout or "")[-3000:], | |
| "stderr_tail": (result.stderr or "")[-1000:], | |
| "result_json": result_json, | |
| } | |
| return json.dumps(output, ensure_ascii=False) | |
| def health_check() -> str: | |
| """Проверка состояния worker.""" | |
| import torch | |
| scripts = [p.name for p in SCRIPTS_DIR.glob("run_qwen_*.py")] if SCRIPTS_DIR.exists() else [] | |
| return json.dumps({ | |
| "status": "ok", | |
| "gpu": torch.cuda.is_available(), | |
| "gpu_name": torch.cuda.get_device_name(0) if torch.cuda.is_available() else None, | |
| "default_device": _default_device(), | |
| "cpu_quant_mode": _default_cpu_quant_mode(), | |
| "scripts": scripts, | |
| "cached_models": len(_OVERLAY_CACHE), | |
| "inprocess_cacheable_scripts": sorted(INPROCESS_CACHEABLE_SCRIPTS), | |
| }, indent=2) | |
| def _extract_result_json(stdout: str, stderr: str) -> dict[str, Any] | None: | |
| combined = stdout + "\n" + stderr | |
| for pattern in ( | |
| r"saved_json=(?P<path>[^\r\n]+)", | |
| r"saved json:\s*(?P<path>[^\r\n]+)", | |
| r"saved_json:\s*(?P<path>[^\r\n]+)", | |
| ): | |
| match = re.search(pattern, combined, flags=re.IGNORECASE) | |
| if match: | |
| raw_path = match.group("path").strip().strip("'\"") | |
| p = Path(raw_path) | |
| if p.exists(): | |
| try: | |
| return json.loads(p.read_text(encoding="utf-8")) | |
| except Exception: | |
| pass | |
| marker = "===FINAL_RESULT===" | |
| marker_index = combined.rfind(marker) | |
| if marker_index >= 0: | |
| tail = combined[marker_index + len(marker):].strip() | |
| decoder = json.JSONDecoder() | |
| for i, ch in enumerate(tail): | |
| if ch not in "{[": | |
| continue | |
| try: | |
| payload, _ = decoder.raw_decode(tail[i:]) | |
| except Exception: | |
| continue | |
| if isinstance(payload, dict): | |
| return payload | |
| return None | |
| # Gradio UI + API | |
| with gr.Blocks(title="ABPT Research Worker") as demo: | |
| gr.Markdown("# ABPT Research Worker (ZeroGPU)") | |
| gr.Markdown("GPU worker for autonomous research loop. Use via API.") | |
| with gr.Row(): | |
| with gr.Column(): | |
| input_box = gr.Textbox( | |
| label="Request JSON", | |
| placeholder='{"script": "run_qwen_phase_probe.py", "args": {}, "model": "Qwen/Qwen3.5-4B"}', | |
| lines=5, | |
| ) | |
| run_btn = gr.Button("Run Experiment", variant="primary") | |
| with gr.Column(): | |
| output_box = gr.Textbox(label="Result JSON", lines=10) | |
| run_btn.click(fn=run_experiment, inputs=input_box, outputs=output_box, api_name="run") | |
| health_btn = gr.Button("Health Check") | |
| health_output = gr.Textbox(label="Health", lines=5) | |
| health_btn.click(fn=health_check, outputs=health_output, api_name="health") | |
| demo.launch() | |