multimodalart HF Staff Claude Opus 4.8 (1M context) committed on
Commit
9dbbb30
·
1 Parent(s): 2eeb78f

Add in-process FastVideo executor (no worker spawn) for ZeroGPU

Browse files

inproc.py: InProcessExecutor builds the pipeline in-process (build_pipeline) and
calls pipeline.forward directly, removing the spawned worker whose CUDA init
bypasses ZeroGPU's spaces hijack. Validated locally: in-process, AOTI+text-cache
work, 2.9s warm.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

Files changed (1) hide show
  1. inproc.py +83 -0
inproc.py ADDED
@@ -0,0 +1,83 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Run FastVideo's pipeline IN-PROCESS (no spawned worker) — for ZeroGPU.
2
+
3
+ FastVideo's `VideoGenerator` always spawns a worker subprocess
4
+ (MultiprocExecutor), whose CUDA init + `.to("cuda")` happen in a separate torch
5
+ that ZeroGPU's `spaces` hijack never sees, and which grabs a GPU outside any
6
+ `@spaces.GPU` window. That's incompatible with ZeroGPU.
7
+
8
+ This swaps in an in-process Executor: it builds a `Worker` (→ `build_pipeline`)
9
+ in the SAME process and calls `pipeline.forward` directly. All of
10
+ VideoGenerator's request→ForwardBatch translation is reused unchanged; only the
11
+ execution backend changes. Combined with lazy init inside `@spaces.GPU`, the
12
+ whole pipeline lives in the GPU-allocated process — the ZeroGPU shape.
13
+
14
+ `install()` monkeypatches `Executor.get_class` to return this backend.
15
+ """
16
+ from __future__ import annotations
17
+ import os
18
+ from typing import Any
19
+
20
# Switch for the in-process executor: on unless DREAMVERSE_INPROC is set to
# something other than "1". Evaluated once, when this module is imported.
ENABLED = os.environ.get("DREAMVERSE_INPROC", "1") == "1"
21
+
22
+
23
def install() -> None:
    """Replace FastVideo's executor backend with an in-process one.

    Returns without doing anything when ``ENABLED`` is false, when the
    ``fastvideo`` worker modules cannot be imported in this process, or when
    ``Executor`` already carries the ``_inproc_patched`` marker. Otherwise it
    defines ``InProcessExecutor``, rebinds ``Executor.get_class`` so that it
    always returns that class, and sets ``Executor._inproc_patched = True`` so
    later calls are no-ops.
    """
    if not ENABLED:
        return
    try:
        from fastvideo.worker.executor import Executor
        from fastvideo.worker.gpu_worker import Worker
    except Exception as e:
        # Broad catch kept as-is: any failure while importing is reported and
        # treated as "fastvideo unavailable" rather than raised.
        print(f"[inproc] fastvideo not importable here ({e}); skipping", flush=True)
        return
    if getattr(Executor, "_inproc_patched", False):
        return

    class InProcessExecutor(Executor):
        """Executor that drives a single ``Worker`` living in this process."""

        def _init_executor(self) -> None:
            """Create the worker and initialise its device in-process.

            NOTE(review): relies on ``self.fastvideo_args`` having been set by
            the base ``Executor`` before this hook runs -- confirm upstream.
            """
            # Single-rank env:// rendezvous settings. setdefault() leaves any
            # value already present in the environment untouched.
            os.environ.setdefault("RANK", "0")
            os.environ.setdefault("LOCAL_RANK", "0")
            os.environ.setdefault("WORLD_SIZE", "1")
            os.environ.setdefault("MASTER_ADDR", "127.0.0.1")
            os.environ.setdefault("MASTER_PORT", "29591")
            self.worker = Worker(
                self.fastvideo_args,
                local_rank=0,
                rank=0,
                distributed_init_method="env://",
            )
            self.worker.init_device()  # maybe_init_distributed + build_pipeline (in-process)
            print("[inproc] pipeline built in-process (no worker subprocess)", flush=True)

        # Override the collective path: call the worker method directly.
        def execute_forward(self, forward_batch, fastvideo_args):
            """Run one forward pass on the local worker and return its result."""
            return self.worker.execute_forward(forward_batch, fastvideo_args)

        def collective_rpc(self, method: str, timeout=None, args=(), kwargs=None) -> list[Any]:
            """Call ``method`` on the local worker; return a one-element list.

            ``timeout`` is accepted but not used: the call is a plain
            synchronous method call on ``self.worker``.
            """
            return [getattr(self.worker, method)(*args, **(kwargs or {}))]

        def set_lora_adapter(self, lora_nickname: str, lora_path: str | None = None) -> None:
            """Forward the LoRA adapter selection to the local worker."""
            self.worker.set_lora_adapter(lora_nickname, lora_path)

        def unmerge_lora_weights(self) -> None:
            """Forward the LoRA unmerge request to the local worker."""
            self.worker.unmerge_lora_weights()

        def merge_lora_weights(self) -> None:
            """Forward the LoRA merge request to the local worker."""
            self.worker.merge_lora_weights()

        def set_log_queue(self, log_queue) -> None:
            """No-op (presumably: no worker subprocess needs log forwarding)."""

        def clear_log_queue(self) -> None:
            """No-op counterpart of ``set_log_queue``."""

        def shutdown(self) -> None:
            """Best-effort worker shutdown: failures are reported, never raised."""
            try:
                self.worker.shutdown()
            except Exception as e:
                # Still best-effort, but no longer silent, so a failed teardown
                # (or a worker that was never created) shows up in the logs.
                print(f"[inproc] worker shutdown failed ({e}); ignoring", flush=True)

    def _patched_get_class(fastvideo_args):
        """Ignore ``fastvideo_args`` and always select the in-process backend."""
        return InProcessExecutor

    # Wrapped as a staticmethod so it can be called on the class with only
    # ``fastvideo_args`` as its argument.
    Executor.get_class = staticmethod(_patched_get_class)
    Executor._inproc_patched = True
    print("[inproc] installed in-process executor (no spawn)", flush=True)