shinnosukeono Claude Opus 4.5 committed on
Commit
86da8ad
·
0 Parent(s):

Initial commit: Parallel request processing for JPharmatron

Browse files

- 8 parallel input/output boxes with streaming
- Multi-GPU architecture (one vLLM engine per GPU)
- Multiprocessing workers for true parallelism
- Individual and batch execution modes

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

Files changed (3) hide show
  1. README.md +41 -0
  2. app.py +377 -0
  3. requirements.txt +5 -0
README.md ADDED
@@ -0,0 +1,41 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ ---
2
+ title: JPharmatron Parallel Chat
3
+ emoji: 💊
4
+ colorFrom: blue
5
+ colorTo: purple
6
+ sdk: gradio
7
+ sdk_version: 5.45.0
8
+ app_file: app.py
9
+ pinned: false
10
+ hardware: nvidia-l40s-x8
11
+ ---
12
+
13
+ # JPharmatron Parallel Chat
14
+
15
+ Parallel request processing interface for JPharmatron-7B-chat model.
16
+
17
+ ## Features
18
+
19
+ - **8 Parallel Request Processing**: Submit up to 8 prompts simultaneously
20
+ - **Independent Streaming Outputs**: Each response streams independently
21
+ - **Multi-GPU Architecture**: One vLLM engine instance per L40S GPU
22
+ - **True Parallelism**: No contention between requests
23
+
24
+ ## Hardware Requirements
25
+
26
+ This Space requires **8x NVIDIA L40S** GPUs (48GB VRAM each).
27
+
28
+ - Each 7B model instance uses ~14GB VRAM in fp16
29
+ - 8 independent instances = 8x true throughput
30
+ - No inter-GPU communication overhead
31
+
32
+ ## Usage
33
+
34
+ 1. Enter prompts in any of the 8 input text boxes
35
+ 2. Select mode options (pharmaceutical expert, international standards, specific procedures)
36
+ 3. Click "Run All in Parallel" to execute all prompts simultaneously
37
+ 4. Watch responses stream in real-time in their corresponding output boxes
38
+
39
+ ## Model
40
+
41
+ Uses [EQUES/JPharmatron-7B-chat](https://huggingface.co/EQUES/JPharmatron-7B-chat) - a pharmaceutical domain expert model.
app.py ADDED
@@ -0,0 +1,377 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import asyncio
2
+ import os
3
+ import re
4
+ import uuid
5
+ import threading
6
+ import queue
7
+ from multiprocessing import Process, Queue
8
+ from typing import Generator
9
+
10
+ import gradio as gr
11
+
12
# Number of GPU worker processes (one vLLM engine instance per GPU).
NUM_GPUS = 8

# Stop strings for generation: role prefixes with ASCII (:) and full-width
# (:) colon variants so the model does not continue the dialogue by itself.
STOP_STRINGS = [
    "\nUser:", "\nユーザ:", "\nユーザー:",
    "\nAssistant:", "\nアシスタント:",
    "\nHuman:", "\nHuman:"
]
# Regex for post-processing cleanup: removes any role-prefixed turn that
# leaked past the stop strings (e.g. when a stop string is split across
# tokens). "Human" is included to stay consistent with STOP_STRINGS.
STOP_RE = re.compile(
    r"(?:^|\n)(?:User|ユーザ|ユーザー|Assistant|アシスタント|Human)[::].*",
    re.MULTILINE
)
25
+
26
+
27
def gpu_worker_main(gpu_id: int, request_queue: Queue, response_queue: Queue):
    """
    Worker-process entry point pinned to a single GPU.

    Each worker owns a private vLLM engine. Requests arrive on
    ``request_queue`` as ``(request_id, prompt)`` tuples (``None`` is the
    shutdown sentinel); streamed chunks are pushed onto the shared
    ``response_queue`` as ``(gpu_id, chunk, is_done)`` tuples.

    Args:
        gpu_id: Index of the GPU this worker is bound to.
        request_queue: Per-worker queue of incoming generation requests.
        response_queue: Shared queue for streaming chunks back to the UI.
    """
    # Must be set before CUDA is initialized so this process only ever
    # sees its assigned device.
    os.environ["CUDA_VISIBLE_DEVICES"] = str(gpu_id)

    # Deferred import: vLLM initializes CUDA on import, so it must happen
    # after the device mask above is in place, inside the child process.
    from vllm import AsyncLLMEngine, AsyncEngineArgs, SamplingParams

    # Initialize the vLLM engine on this GPU.
    engine_args = AsyncEngineArgs(
        model="EQUES/JPharmatron-7B-chat",
        enforce_eager=True,
        gpu_memory_utilization=0.85,
    )
    engine = AsyncLLMEngine.from_engine_args(engine_args)

    # Dedicated event loop: the async engine is driven synchronously from
    # this worker's request loop via run_until_complete().
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    async def generate_and_stream(request_id: str, prompt: str):
        """Generate tokens and stream incremental chunks via response_queue."""
        params = SamplingParams(
            temperature=0.0,  # greedy decoding for reproducible answers
            max_tokens=4096,
            repetition_penalty=1.2,
            stop=STOP_STRINGS,
        )

        previous_text = ""
        try:
            async for out in engine.generate(prompt, params, request_id=request_id):
                full_text = out.outputs[0].text

                # Stop patterns occasionally leak past vLLM's stop strings
                # (e.g. split across tokens); cut the output there.
                m = STOP_RE.search(full_text)
                if m:
                    cut = m.start()
                    # A negative-width slice yields "" and is skipped below.
                    chunk = full_text[len(previous_text):cut]
                    if chunk:
                        response_queue.put((gpu_id, chunk, False))
                    break

                chunk = full_text[len(previous_text):]
                previous_text = full_text
                if chunk:
                    response_queue.put((gpu_id, chunk, False))

        except Exception as e:
            # Surface the failure in the output box instead of dying silently.
            response_queue.put((gpu_id, f"\n[Error: {str(e)}]", False))

        # Completion sentinel for this request.
        response_queue.put((gpu_id, "", True))

    # Main worker loop: poll with a short timeout so the shutdown sentinel
    # is noticed promptly even when no requests are pending.
    while True:
        try:
            request = request_queue.get(timeout=1.0)
        except queue.Empty:
            continue

        if request is None:  # Shutdown signal
            break

        request_id, prompt = request
        loop.run_until_complete(generate_and_stream(request_id, prompt))
94
+
95
+
96
class ParallelInferenceManager:
    """Owns one worker process per GPU and routes prompts to them.

    Workers each get a private request queue; all of them stream chunks
    back onto a single shared response queue tagged with their gpu_id.
    """

    def __init__(self, num_gpus: int = NUM_GPUS):
        self.num_gpus = num_gpus
        self.workers = []
        self.request_queues = []
        self.response_queue = Queue()  # shared by every worker
        self._started = False

    def start(self):
        """Spawn one daemon worker process per GPU (idempotent)."""
        if self._started:
            return

        for gpu_id in range(self.num_gpus):
            inbox = Queue()
            self.request_queues.append(inbox)

            worker = Process(
                target=gpu_worker_main,
                args=(gpu_id, inbox, self.response_queue),
                daemon=True
            )
            worker.start()
            self.workers.append(worker)

        self._started = True

    def submit_request(self, gpu_id: int, prompt: str, request_id: str):
        """Queue a prompt on the worker pinned to gpu_id (no-op if out of range)."""
        if 0 <= gpu_id < self.num_gpus:
            self.request_queues[gpu_id].put((request_id, prompt))

    def shutdown(self):
        """Ask every worker to exit, then force-terminate stragglers."""
        for inbox in self.request_queues:
            inbox.put(None)  # shutdown sentinel understood by the worker loop
        for worker in self.workers:
            worker.join(timeout=5)
            if worker.is_alive():
                worker.terminate()
138
+
139
+
140
# Global manager instance (initialized lazily); see get_manager() for the
# lock-guarded construction.
_manager = None
# Guards first-time construction of _manager across Gradio handler threads.
_manager_lock = threading.Lock()
143
+
144
+
145
def get_manager() -> ParallelInferenceManager:
    """Return the process-wide inference manager, creating it on first use.

    Uses double-checked locking: the unlocked fast path avoids contention
    once the manager exists, while the locked re-check prevents two threads
    from both constructing it. The global is assigned only AFTER start()
    completes, so no thread can ever observe a manager whose worker
    processes have not been spawned yet.
    """
    global _manager
    if _manager is None:
        with _manager_lock:
            if _manager is None:
                manager = ParallelInferenceManager(NUM_GPUS)
                manager.start()
                _manager = manager  # publish fully-started instance only
    return _manager
154
+
155
+
156
def build_prompt(user_input: str, mode: list[str]) -> str:
    """Build the full prompt: system instruction, mode add-ons, user turn.

    Args:
        user_input: The user's question (callers pass it already stripped).
        mode: Selected checkbox labels; each recognized label appends one
            extra instruction line to the system prompt.

    Returns:
        The prompt string, ending with "アシスタント:" so the model
        continues in the assistant role.
    """
    # Fixed the mojibake in the original literal ("専門家���す" -> "専門家です"),
    # matching the intact copy of the same sentence used below.
    base_prompt = "あなたは製薬に関する専門家です。製薬に関するユーザーの質問に親切に回答してください。参照した文献を回答の末尾に常に提示してください。\n"

    if "製薬の専門家" in mode:
        # NOTE: largely repeats the base instruction; kept to preserve the
        # original behavior of this checkbox.
        base_prompt += "あなたは製薬に関する専門家です。製薬に関するユーザーの質問に親切に回答してください。参照した文献は常に提示してください。\n"
    if "国際基準に準拠" in mode:
        base_prompt += "回答に際して、国際基準に準拠してください。\n"
    if "具体的な手順" in mode:
        base_prompt += "回答には具体的な作業手順を含めてください。\n"

    base_prompt += f"ユーザー: {user_input}\nアシスタント:"
    return base_prompt
169
+
170
+
171
def respond_parallel(
    prompt0: str, prompt1: str, prompt2: str, prompt3: str,
    prompt4: str, prompt5: str, prompt6: str, prompt7: str,
    mode: list[str]
) -> Generator:
    """
    Process up to 8 prompts in parallel, streaming results back.

    Each non-empty prompt is dispatched to the GPU worker with the same
    index. Chunks from the shared response queue are merged into per-GPU
    buffers, and the full 8-tuple of partial results is yielded after
    every update so Gradio refreshes all output boxes.

    Args:
        prompt0..prompt7: User prompts; empty/blank slots are skipped.
        mode: Checkbox labels forwarded to build_prompt().

    Yields:
        An 8-tuple of accumulated response strings, one per output box.
    """
    prompts = [prompt0, prompt1, prompt2, prompt3, prompt4, prompt5, prompt6, prompt7]
    manager = get_manager()

    # Per-GPU accumulated results; empty slots simply stay "".
    results = [""] * NUM_GPUS
    active_gpus = set()

    # Submit non-empty prompts to their respective GPUs.
    for gpu_id, prompt in enumerate(prompts):
        if prompt and prompt.strip():
            full_prompt = build_prompt(prompt.strip(), mode)
            request_id = f"req_{gpu_id}_{uuid.uuid4().hex[:8]}"
            manager.submit_request(gpu_id, full_prompt, request_id)
            active_gpus.add(gpu_id)

    if not active_gpus:
        # No prompts to process.
        yield tuple(results)
        return

    # Drain the shared queue until every active worker has sent its
    # (gpu_id, "", True) completion sentinel.
    while active_gpus:
        try:
            gpu_id, chunk, is_done = manager.response_queue.get(timeout=0.1)

            if is_done:
                active_gpus.discard(gpu_id)
            else:
                results[gpu_id] += chunk

            # Yield current state of all results.
            yield tuple(results)

        except queue.Empty:
            # No chunk within the poll window — re-yield so the UI stays live.
            yield tuple(results)
219
+
220
+
221
def respond_single(gpu_id: int, prompt: str, mode: list[str]) -> Generator:
    """Process a single prompt on a specific GPU, yielding the growing response.

    NOTE(review): the response queue is shared by all workers, and chunks
    that arrive here for OTHER GPUs are consumed and dropped. That is only
    safe while individual runs are not interleaved with parallel runs —
    confirm, or route responses per-request, if usage changes.

    Args:
        gpu_id: Index of the worker/output slot to use.
        prompt: The user prompt; blank input yields "" and returns.
        mode: Checkbox labels forwarded to build_prompt().

    Yields:
        The accumulated response string after each received chunk.
    """
    manager = get_manager()

    if not prompt or not prompt.strip():
        yield ""
        return

    full_prompt = build_prompt(prompt.strip(), mode)
    request_id = f"single_{gpu_id}_{uuid.uuid4().hex[:8]}"
    manager.submit_request(gpu_id, full_prompt, request_id)

    result = ""
    while True:
        try:
            recv_gpu_id, chunk, is_done = manager.response_queue.get(timeout=0.1)

            # Only process responses for our request.
            if recv_gpu_id == gpu_id:
                if is_done:
                    break
                result += chunk
                yield result

        except queue.Empty:
            # Nothing yet — re-yield the current partial result.
            yield result
247
+
248
+
249
# Build the Gradio interface. `demo` is the top-level Blocks app that
# main() queues and launches.
with gr.Blocks(title="JPharmatron Parallel Chat") as demo:
    gr.Markdown("# 💊 JPharmatron - Parallel Request Processing")
    gr.Markdown(
        "Enter up to 8 prompts and process them simultaneously on dedicated GPUs. "
        "Each response streams independently."
    )

    # Mode selection: labels are matched by exact string in build_prompt().
    mode = gr.CheckboxGroup(
        label="モード (Mode)",
        choices=["製薬の専門家", "国際基準に準拠", "具体的な手順"],
        value=[],
    )

    # Preset example prompts (Japanese pharmaceutical questions).
    gr.Markdown("### 🔧 Presets (click to copy)")
    preset_list = [
        "グレープフルーツと薬を一緒に飲んじゃだめなんですか?",
        "新薬の臨床試験(Phase I〜III)の概要を、具体例つきで簡単に教えて。",
        "ジェネリック医薬品が承認されるまでの流れを、タイムラインで解説して。",
        "抗生物質の作用機序と耐性菌について説明してください。",
        "COVID-19ワクチンの開発プロセスを教えてください。",
        "薬物相互作用の主なメカニズムを教えてください。",
        "バイオシミラーと先発医薬品の違いは何ですか?",
        "製薬企業のGMP(Good Manufacturing Practice)について説明してください。",
    ]

    # Input section: 8 prompt boxes in two columns of 4; box index i maps
    # to GPU worker i.
    gr.Markdown("### 📝 Input Prompts")
    with gr.Row():
        with gr.Column():
            input_boxes = []
            for i in range(4):
                tb = gr.Textbox(
                    label=f"Prompt {i+1}",
                    placeholder=f"Enter prompt {i+1}...",
                    lines=3
                )
                input_boxes.append(tb)
        with gr.Column():
            for i in range(4, 8):
                tb = gr.Textbox(
                    label=f"Prompt {i+1}",
                    placeholder=f"Enter prompt {i+1}...",
                    lines=3
                )
                input_boxes.append(tb)

    # Examples that fill multiple boxes: each example row populates the
    # first 4 prompt boxes (row 1 = presets 1-4, row 2 = presets 5-8).
    gr.Examples(
        examples=[preset_list[:4], preset_list[4:]],
        inputs=input_boxes[:4],
        label="Fill first 4 prompts with presets"
    )

    # Control buttons.
    with gr.Row():
        run_all_btn = gr.Button("🚀 Run All in Parallel", variant="primary", scale=2)
        clear_inputs_btn = gr.Button("🗑️ Clear Inputs", scale=1)
        clear_outputs_btn = gr.Button("🗑️ Clear Outputs", scale=1)

    # Output section: 8 read-only response boxes mirroring the input layout.
    gr.Markdown("### 📤 Streaming Outputs")
    with gr.Row():
        with gr.Column():
            output_boxes = []
            for i in range(4):
                tb = gr.Textbox(
                    label=f"Response {i+1}",
                    lines=10,
                    interactive=False,
                    show_copy_button=True
                )
                output_boxes.append(tb)
        with gr.Column():
            for i in range(4, 8):
                tb = gr.Textbox(
                    label=f"Response {i+1}",
                    lines=10,
                    interactive=False,
                    show_copy_button=True
                )
                output_boxes.append(tb)

    # Wire up the "Run All" button: respond_parallel is a generator, so
    # Gradio streams each yielded 8-tuple into the output boxes.
    run_all_btn.click(
        fn=respond_parallel,
        inputs=input_boxes + [mode],
        outputs=output_boxes
    )

    # Clear buttons: reset all 8 inputs / outputs to empty strings.
    clear_inputs_btn.click(
        fn=lambda: tuple([""] * 8),
        inputs=None,
        outputs=input_boxes
    )
    clear_outputs_btn.click(
        fn=lambda: tuple([""] * 8),
        inputs=None,
        outputs=output_boxes
    )

    # Individual run buttons for each slot.
    gr.Markdown("### 🎯 Run Individual Prompts")
    with gr.Row():
        for i in range(8):
            btn = gr.Button(f"Run #{i+1}", size="sm")
            # Factory function binds the current loop value of i as
            # gpu_id, avoiding the classic late-binding closure bug.
            def make_single_handler(gpu_id):
                def handler(prompt, mode):
                    yield from respond_single(gpu_id, prompt, mode)
                return handler
            btn.click(
                fn=make_single_handler(i),
                inputs=[input_boxes[i], mode],
                outputs=[output_boxes[i]]
            )
368
+
369
+
370
def main():
    """Entry point for the application.

    queue() must be enabled before launch so the generator-based handlers
    can stream partial results to the browser.
    """
    demo.queue()
    demo.launch()


if __name__ == "__main__":
    main()
requirements.txt ADDED
@@ -0,0 +1,5 @@
 
 
 
 
 
 
1
+ transformers>=4.40.0
2
+ accelerate>=0.30.0
3
+ gradio>=5.45.0
4
+ vllm>=0.4.0
5
+ torch>=2.2.0