scvcoder commited on
Commit
4f8a16e
·
verified ·
1 Parent(s): f160777

Use generator pattern (cross-process yield via spaces res_queue) — ZeroGPU forks process so streamer queue isn't shared

Browse files
Files changed (1) hide show
  1. src/kpaa/llm/zerogpu_backend.py +40 -48
src/kpaa/llm/zerogpu_backend.py CHANGED
@@ -109,39 +109,38 @@ class ZeroGPUBackend:
109
  add_generation_prompt=True,
110
  return_tensors="pt",
111
  )
112
- # transformers 5.x 는 BatchEncoding(dict-like) 을, 4.x 는 Tensor 를
113
- # 반환. model.generate(input_ids=...) 는 Tensor 만 받으므로 추출.
114
  input_ids = _encoded["input_ids"] if hasattr(_encoded, "input_ids") else _encoded
115
 
116
- from transformers import TextIteratorStreamer
117
-
118
- streamer = TextIteratorStreamer(
119
- tok,
120
- skip_prompt=True,
121
- skip_special_tokens=True,
122
- )
123
-
124
  @_gpu_decorator(self._gpu_duration)
125
- def _run_generate() -> None:
126
- """ZeroGPU 점유 동안 generate 완료까지 블로킹.
127
 
128
- 내부에서 cuda 모델·입력을 옮기고, 별도 threadgenerate
129
- 실행 → 메인 코루틴은 streamer 에서 토큰 빨아냄. 이 함수가
130
- 반환되어야 ZeroGPU 가 GPU 회수.
131
  """
132
  import torch as _t
133
- from transformers import GenerationConfig
134
 
135
  device = "cuda" if _t.cuda.is_available() else "cpu"
136
- print(f"[kpaa.zerogpu] _run_generate start, device={device}", flush=True)
137
  if device == "cuda":
138
  model.to(device)
139
  print(f"[kpaa.zerogpu] model moved to cuda", flush=True)
140
  ids = input_ids.to(device)
141
  print(f"[kpaa.zerogpu] input shape={tuple(ids.shape)}, max_new_tokens={opts.max_tokens}", flush=True)
142
 
143
- # transformers 5.x generate(temperature=...) 직접 호출을 무시하고
144
- # GenerationConfig 객체로만 받음. 명시적으로 묶어서 전달.
 
 
 
 
 
145
  gen_cfg = GenerationConfig(
146
  max_new_tokens=opts.max_tokens,
147
  do_sample=opts.temperature > 0.0,
@@ -161,10 +160,6 @@ class ZeroGPUBackend:
161
  print(f"[kpaa.zerogpu] generate() raised: {type(e).__name__}: {e}", flush=True)
162
  raise
163
  finally:
164
- # transformers 5.x 에서 streamer.end() 가 generate 종료 시점에
165
- # 자동 호출되지 않는 케이스가 있음 → next(streamer) 가 영원히
166
- # 블록. 성공/실패 모두 명시적으로 end() 호출해 stop_signal
167
- # 을 큐에 넣어 consumer 의 StopIteration 을 보장.
168
  try:
169
  streamer.end()
170
  print(f"[kpaa.zerogpu] streamer.end() called", flush=True)
@@ -173,43 +168,40 @@ class ZeroGPUBackend:
173
 
174
  gen_thread = Thread(target=_generate_target, daemon=True)
175
  gen_thread.start()
176
- gen_thread.join() # GPU 점유 중 generate 완료 대기
177
- print(f"[kpaa.zerogpu] _run_generate end", flush=True)
178
 
179
- # generate 호출 자체는 별도 thread (run_in_executor) 그래야
180
- # 코루틴이 streamer 에서 토큰을 비동기로 빨아낼 수 있음.
181
- gen_future = loop.run_in_executor(None, _run_generate)
 
 
 
 
 
182
 
183
- _tok_count = [0]
 
184
 
185
- def _next_token() -> str | None:
 
186
  try:
187
- t = next(streamer)
188
- _tok_count[0] += 1
189
- if _tok_count[0] <= 3 or _tok_count[0] % 100 == 0:
190
- print(
191
- f"[kpaa.zerogpu] streamer #{_tok_count[0]}: {t!r}",
192
- flush=True,
193
- )
194
- return t
195
  except StopIteration:
196
- print(
197
- f"[kpaa.zerogpu] streamer exhausted, total={_tok_count[0]}",
198
- flush=True,
199
- )
200
- return None
201
 
 
202
  try:
203
  while True:
204
- token = await loop.run_in_executor(None, _next_token)
205
- if token is None:
206
  break
207
  if token:
208
  yield token
209
  finally:
210
- # generate 가 아직 안 끝났으면 마무리 대기 (GPU 회수 보장).
211
- if not gen_future.done():
212
- await gen_future
 
 
213
 
214
  async def close(self) -> None:
215
  # CUDA cache 는 ZeroGPU 가 자동 정리. transformers 는 GC.
 
109
  add_generation_prompt=True,
110
  return_tensors="pt",
111
  )
112
+ # transformers 5.x 는 BatchEncoding 을, 4.x 는 Tensor 를 반환.
 
113
  input_ids = _encoded["input_ids"] if hasattr(_encoded, "input_ids") else _encoded
114
 
115
+ # ZeroGPU @spaces.GPU 함수를 fork() 자식 프로세스에서 실행한다.
116
+ # 자식 프로세스의 streamer.text_queue 는 부모 프로세스에서 접근 불가.
117
+ # @spaces.GPU 를 generator 함수에 적용하면 spaces 가 res_queue 로
118
+ # yield 값을 부모로 전송 (multiprocessing-safe). streamer 도 자식
119
+ # 프로세스 내에서 생성·소비.
 
 
 
120
  @_gpu_decorator(self._gpu_duration)
121
+ def _run_generate_gen():
122
+ """ZeroGPU 자식 프로세스에서 토큰 generator.
123
 
124
+ yield spaces res_queue 통해 부모 프세스로 전달된다.
 
 
125
  """
126
  import torch as _t
127
+ from transformers import GenerationConfig, TextIteratorStreamer
128
 
129
  device = "cuda" if _t.cuda.is_available() else "cpu"
130
+ print(f"[kpaa.zerogpu] _gen start, device={device}", flush=True)
131
  if device == "cuda":
132
  model.to(device)
133
  print(f"[kpaa.zerogpu] model moved to cuda", flush=True)
134
  ids = input_ids.to(device)
135
  print(f"[kpaa.zerogpu] input shape={tuple(ids.shape)}, max_new_tokens={opts.max_tokens}", flush=True)
136
 
137
+ # streamer자식 프로세스 내에서 생성 — 자식 프로세스 thread 간 통신.
138
+ streamer = TextIteratorStreamer(
139
+ tok,
140
+ skip_prompt=True,
141
+ skip_special_tokens=True,
142
+ )
143
+
144
  gen_cfg = GenerationConfig(
145
  max_new_tokens=opts.max_tokens,
146
  do_sample=opts.temperature > 0.0,
 
160
  print(f"[kpaa.zerogpu] generate() raised: {type(e).__name__}: {e}", flush=True)
161
  raise
162
  finally:
 
 
 
 
163
  try:
164
  streamer.end()
165
  print(f"[kpaa.zerogpu] streamer.end() called", flush=True)
 
168
 
169
  gen_thread = Thread(target=_generate_target, daemon=True)
170
  gen_thread.start()
 
 
171
 
172
+ # streamer 에서 토큰 빨아내며 yield spaces 가 res_queue 통해 부모 전송.
173
+ tok_n = 0
174
+ for token in streamer:
175
+ if token:
176
+ tok_n += 1
177
+ if tok_n <= 3 or tok_n % 100 == 0:
178
+ print(f"[kpaa.zerogpu] yield #{tok_n}: {token!r}", flush=True)
179
+ yield token
180
 
181
+ gen_thread.join()
182
+ print(f"[kpaa.zerogpu] _gen end (total yielded={tok_n})", flush=True)
183
 
184
+ # 부모: 자식의 generator async iterate. spaces 가 cross-process 직렬화 처리.
185
+ def _safe_next(gen):
186
  try:
187
+ return next(gen), False
 
 
 
 
 
 
 
188
  except StopIteration:
189
+ return None, True
 
 
 
 
190
 
191
+ gen = _run_generate_gen()
192
  try:
193
  while True:
194
+ token, done = await loop.run_in_executor(None, _safe_next, gen)
195
+ if done:
196
  break
197
  if token:
198
  yield token
199
  finally:
200
+ # gen 기 (자식 프로세스 종료 보장).
201
+ try:
202
+ gen.close()
203
+ except Exception:
204
+ pass
205
 
206
  async def close(self) -> None:
207
  # CUDA cache 는 ZeroGPU 가 자동 정리. transformers 는 GC.