darkbit1001 committed on
Commit b8f54c5 · 1 Parent(s): 8572c72

made unet parsed params more deterministic

Files changed (2)
  1. lcm_server.py +176 -108
  2. rknnlcm.py +66 -31
lcm_server.py CHANGED
@@ -1,3 +1,26 @@
1
  import io
2
  import os
3
  import json
@@ -6,7 +29,8 @@ import queue
6
  import threading
7
  from dataclasses import dataclass
8
  from concurrent.futures import Future
9
- from typing import Optional, List, Tuple
 
10
 
11
  import numpy as np
12
  from fastapi import FastAPI, Response, HTTPException
@@ -17,34 +41,34 @@ from transformers import CLIPTokenizer
17
 
18
  from rknnlcm import RKNN2Model, RKNN2LatentConsistencyPipeline
19
 
20
- # --- Your imports (as in your script) ---
21
- # from your_pkg import RKNN2LatentConsistencyPipeline, RKNN2Model
22
- # NOTE: keep these as-is in your project.
23
-
24
 
25
  # -----------------------------
26
  # Request schema (HTTP)
27
  # -----------------------------
28
  class GenerateRequest(BaseModel):
29
  prompt: str
30
- size: str = Field(default="512x512", pattern=r"^\d+x\d+$")
31
- num_inference_steps: int = 4
32
- guidance_scale: float = 1.0
33
- seed: int = 1234
34
 
35
 
36
- @dataclass
37
  class ModelPaths:
38
- root: str # args.i
 
39
  @property
40
  def scheduler_config(self) -> str:
41
- return os.path.join(self.root, "scheduler/scheduler_config.json")
 
42
  @property
43
  def text_encoder(self) -> str:
44
  return os.path.join(self.root, "text_encoder")
 
45
  @property
46
  def unet(self) -> str:
47
  return os.path.join(self.root, "unet")
 
48
  @property
49
  def vae_decoder(self) -> str:
50
  return os.path.join(self.root, "vae_decoder")
@@ -57,69 +81,118 @@ class Job:
57
  submitted_at: float
58
 
59
 
60
  # -----------------------------
61
  # Pipeline Worker
62
  # -----------------------------
63
  class PipelineWorker:
64
  """
65
- Owns ONE pipeline instance. Run this in a dedicated thread.
66
  """
 
67
  def __init__(
68
  self,
69
  worker_id: int,
70
  paths: ModelPaths,
71
- scheduler: LCMScheduler,
72
  tokenizer: CLIPTokenizer,
73
- rknn_context_cfg: dict,
 
74
  ):
75
  self.worker_id = worker_id
76
  self.paths = paths
77
- self.scheduler = scheduler
78
  self.tokenizer = tokenizer
79
- self.rknn_context_cfg = rknn_context_cfg
 
80
 
81
- self.pipe = None # built in init()
82
  self._init_pipeline()
83
 
 
 
 
 
 
 
 
 
 
84
  def _init_pipeline(self):
85
- # IMPORTANT: Each worker gets its *own* RKNN runtime context.
86
- # You must map rknn_context_cfg to however your RKNN2Model supports it.
87
- #
88
- # Examples you might support in RKNN2Model:
89
- # RKNN2Model(path, core_mask=..., multi_context=True, device_id=..., ...)
90
- # RKNN2Model(path, runtime_options={...})
91
- #
92
- # Here: we pass **rknn_context_cfg as a flexible hook.
93
  self.pipe = RKNN2LatentConsistencyPipeline(
94
- text_encoder=RKNN2Model(self.paths.text_encoder, **self.rknn_context_cfg),
95
- unet=RKNN2Model(self.paths.unet, **self.rknn_context_cfg),
96
- vae_decoder=RKNN2Model(self.paths.vae_decoder, **self.rknn_context_cfg),
97
- scheduler=self.scheduler,
98
  tokenizer=self.tokenizer,
99
  )
100
 
101
- def run_job(self, job: Job) -> bytes:
102
- h, w = (int(x) for x in job.req.size.split("x"))
 
103
 
104
- # Deterministic per-request random generator
105
- rng = np.random.RandomState(job.req.seed)
106
-
107
- print("seed ", job.req.seed)
108
- print("rng", rng)
109
 
110
  result = self.pipe(
111
  prompt=job.req.prompt,
112
- height=h,
113
- width=w,
114
  num_inference_steps=job.req.num_inference_steps,
115
  guidance_scale=job.req.guidance_scale,
116
  generator=rng,
117
- )
118
 
119
  pil_image = result["images"][0]
120
  buf = io.BytesIO()
121
  pil_image.save(buf, format="PNG")
122
- return buf.getvalue()
123
 
124
 
125
  # -----------------------------
@@ -128,54 +201,57 @@ class PipelineWorker:
128
  class PipelineService:
129
  """
130
  Singleton-ish service that:
131
- - loads scheduler/tokenizer once
132
  - starts N worker threads
133
- - provides a queued submit() API
134
  """
 
135
  _instance = None
136
  _instance_lock = threading.Lock()
137
 
138
  def __init__(
139
  self,
140
  paths: ModelPaths,
141
- num_workers: int = 3,
142
- queue_max: int = 64,
143
  rknn_context_cfgs: Optional[List[dict]] = None,
 
144
  ):
145
  self.paths = paths
146
- self.num_workers = num_workers
147
- self.q: queue.Queue[Job] = queue.Queue(maxsize=queue_max)
148
 
149
- # Load once (shared immutable objects)
150
  with open(self.paths.scheduler_config, "r") as f:
151
- scheduler_config = json.load(f)
152
- self.scheduler = LCMScheduler.from_config(scheduler_config)
 
153
  self.tokenizer = CLIPTokenizer.from_pretrained("openai/clip-vit-base-patch16")
154
 
155
- # Build per-worker RKNN context configs
156
- # If not provided, create N identical configs with multi_context enabled.
157
  if rknn_context_cfgs is None:
158
- rknn_context_cfgs = [{"multi_context": True, "worker_id": i} for i in range(num_workers)]
159
- if len(rknn_context_cfgs) != num_workers:
160
  raise ValueError("rknn_context_cfgs must match num_workers length")
161
 
162
  self.workers: List[PipelineWorker] = []
163
  self.threads: List[threading.Thread] = []
164
  self._stop = threading.Event()
165
 
166
- # Create worker-owned pipelines
167
- for i in range(num_workers):
168
- worker = PipelineWorker(
169
  worker_id=i,
170
  paths=self.paths,
171
- scheduler=self.scheduler,
172
  tokenizer=self.tokenizer,
173
  rknn_context_cfg=rknn_context_cfgs[i],
 
174
  )
175
- self.workers.append(worker)
176
 
177
- # Start threads
178
- for i in range(num_workers):
179
  t = threading.Thread(target=self._worker_loop, args=(i,), daemon=True)
180
  t.start()
181
  self.threads.append(t)
@@ -184,9 +260,10 @@ class PipelineService:
184
  def get_instance(
185
  cls,
186
  paths: ModelPaths,
187
- num_workers: int = 3,
188
- queue_max: int = 64,
189
  rknn_context_cfgs: Optional[List[dict]] = None,
 
190
  ) -> "PipelineService":
191
  with cls._instance_lock:
192
  if cls._instance is None:
@@ -195,12 +272,13 @@ class PipelineService:
195
  num_workers=num_workers,
196
  queue_max=queue_max,
197
  rknn_context_cfgs=rknn_context_cfgs,
 
198
  )
199
  return cls._instance
200
 
201
  def shutdown(self):
202
  self._stop.set()
203
- # Optionally drain queue with errors
204
  while True:
205
  try:
206
  job = self.q.get_nowait()
@@ -210,10 +288,9 @@ class PipelineService:
210
  job.fut.set_exception(RuntimeError("Service shutting down"))
211
  self.q.task_done()
212
 
213
- def submit(self, req: GenerateRequest, timeout_s: float = 0.5) -> Future:
214
  fut: Future = Future()
215
  job = Job(req=req, fut=fut, submitted_at=time.time())
216
-
217
  try:
218
  self.q.put(job, timeout=timeout_s)
219
  except queue.Full:
@@ -233,9 +310,9 @@ class PipelineService:
233
  continue
234
 
235
  try:
236
- png = worker.run_job(job)
237
  if not job.fut.done():
238
- job.fut.set_result(png)
239
  except Exception as e:
240
  if not job.fut.done():
241
  job.fut.set_exception(e)
@@ -243,58 +320,46 @@ class PipelineService:
243
  self.q.task_done()
244
 
245
 
246
- # -----------------------------
247
- # RKNN multi-context configuration
248
- # -----------------------------
249
- def build_rknn_context_cfgs_for_rk3588(num_workers: int) -> List[dict]:
250
- """
251
- Plug this into your RKNN2Model wrapper.
252
- Typical approach on RK3588:
253
- - bind each worker to a different NPU core (0/1/2)
254
- - enable multi_context so each model instance has its own runtime context
255
-
256
- You must map these fields inside RKNN2Model.
257
- """
258
- core_masks = ["NPU_CORE_0", "NPU_CORE_1", "NPU_CORE_2"]
259
- cfgs = []
260
- for i in range(num_workers):
261
- cfgs.append({
262
- "multi_context": True,
263
- '''"core_mask": core_masks[i % len(core_masks)],'''
264
- "core_mask": "NPU_CORE_AUTO",
265
- "context_name": f"w{i}",
266
- "worker_id": i,
267
- })
268
- return cfgs
269
-
270
-
271
  # -----------------------------
272
  # FastAPI server
273
  # -----------------------------
274
- app = FastAPI()
275
-
276
- # Configure these for your deployment
277
  MODEL_ROOT = os.environ.get("MODEL_ROOT", "/models/lcm_rknn")
278
- NUM_WORKERS = int(os.environ.get("NUM_WORKERS", "3"))
279
  QUEUE_MAX = int(os.environ.get("QUEUE_MAX", "64"))
280
 
281
  paths = ModelPaths(root=MODEL_ROOT)
282
 
283
- # Create singleton service at import time (fastest first request).
284
- service = PipelineService.get_instance(
285
- paths=paths,
286
- num_workers=NUM_WORKERS,
287
- queue_max=QUEUE_MAX,
288
- rknn_context_cfgs=build_rknn_context_cfgs_for_rk3588(NUM_WORKERS),
289
- )
290
 
291
 
292
  @app.post("/generate", responses={200: {"content": {"image/png": {}}}})
293
  def generate(req: GenerateRequest):
294
- fut = service.submit(req, timeout_s=0.25)
295
 
 
296
  try:
297
- png_bytes = fut.result(timeout=120) # you can tune this
298
  except Exception as e:
299
  msg = str(e)
300
  if "Queue full" in msg:
@@ -306,14 +371,17 @@ def generate(req: GenerateRequest):
306
  media_type="image/png",
307
  headers={
308
  "Cache-Control": "no-store",
 
309
  },
310
  )
311
 
 
312
  if __name__ == "__main__":
313
  import uvicorn
 
314
  uvicorn.run(
315
  app,
316
  host="0.0.0.0",
317
- port=int(os.environ.get("PORT", "4200")),
318
- log_config=None, # <-- key
319
  )
 
1
+ """
2
+ lcm_server.py — RKNN LCM Stable Diffusion FastAPI server (queued, multi-worker safe)
3
+
4
+ Key goals:
5
+ - One pipeline per worker thread (no shared RKNN objects across threads)
6
+ - Determinism guarantee: per-request seed -> np.random.RandomState
7
+ - Deterministic input ordering handled in RKNN2Model (recommended)
8
+ - Explicit data_format per model (UNet + VAE commonly NHWC on RKNN)
9
+ - Queue backpressure (429 on overflow)
10
+ - Clean startup/shutdown (FastAPI lifespan)
11
+ - Returns PNG bytes + X-Seed header
12
+
13
+ Env:
14
+ MODEL_ROOT=/models/lcm_rknn
15
+ PORT=4200
16
+ NUM_WORKERS=1..3
17
+ QUEUE_MAX=64
18
+ DEFAULT_SIZE=512x512
19
+ DEFAULT_STEPS=4
20
+ DEFAULT_GUIDANCE=1.0
21
+ DEFAULT_TIMEOUT=120
22
+ """
23
+
24
  import io
25
  import os
26
  import json
 
29
  import threading
30
  from dataclasses import dataclass
31
  from concurrent.futures import Future
32
+ from typing import Optional, List, Dict, Tuple
33
+ from contextlib import asynccontextmanager
34
 
35
  import numpy as np
36
  from fastapi import FastAPI, Response, HTTPException
 
41
 
42
  from rknnlcm import RKNN2Model, RKNN2LatentConsistencyPipeline
43
 
 
 
45
  # -----------------------------
46
  # Request schema (HTTP)
47
  # -----------------------------
48
  class GenerateRequest(BaseModel):
49
  prompt: str
50
+ size: str = Field(default=os.environ.get("DEFAULT_SIZE", "512x512"), pattern=r"^\d+x\d+$")
51
+ num_inference_steps: int = Field(default=int(os.environ.get("DEFAULT_STEPS", "4")), ge=1, le=50)
52
+ guidance_scale: float = Field(default=float(os.environ.get("DEFAULT_GUIDANCE", "1.0")), ge=0.0, le=20.0)
53
+ seed: Optional[int] = Field(default=None, ge=0, le=2**31 - 1)
54
 
55
 
56
+ @dataclass(frozen=True)
57
  class ModelPaths:
58
+ root: str
59
+
60
  @property
61
  def scheduler_config(self) -> str:
62
+ return os.path.join(self.root, "scheduler", "scheduler_config.json")
63
+
64
  @property
65
  def text_encoder(self) -> str:
66
  return os.path.join(self.root, "text_encoder")
67
+
68
  @property
69
  def unet(self) -> str:
70
  return os.path.join(self.root, "unet")
71
+
72
  @property
73
  def vae_decoder(self) -> str:
74
  return os.path.join(self.root, "vae_decoder")
 
81
  submitted_at: float
82
 
83
 
84
+ # -----------------------------
85
+ # RKNN multi-context configuration
86
+ # -----------------------------
87
+ def build_rknn_context_cfgs_for_rk3588(num_workers: int) -> List[dict]:
88
+ """
89
+ You must map these fields inside RKNN2Model if you actually support them.
90
+ If your RKNN2Model does NOT accept these kwargs, set USE_RKNN_CONTEXT_CFGS=0.
91
+ """
92
+ core_masks = ["NPU_CORE_0", "NPU_CORE_1", "NPU_CORE_2"]
93
+ cfgs = []
94
+ for i in range(num_workers):
95
+ cfgs.append(
96
+ {
97
+ "multi_context": True,
98
+ # binding per-core is optional; if unstable, keep AUTO
99
+ "core_mask": core_masks[i % len(core_masks)],
100
+ # "core_mask": "NPU_CORE_AUTO",
101
+ "context_name": f"w{i}",
102
+ "worker_id": i,
103
+ }
104
+ )
105
+ return cfgs
106
+
107
+
108
+ def parse_size(size_str: str) -> Tuple[int, int]:
109
+ """
110
+ Parse 'WIDTHxHEIGHT' -> (width, height)
111
+ """
112
+ w_str, h_str = size_str.lower().split("x")
113
+ w, h = int(w_str), int(h_str)
114
+ if w <= 0 or h <= 0:
115
+ raise ValueError("size must be positive")
116
+ return w, h
117
+
118
+
119
+ def gen_seed_8_digits() -> int:
120
+ # 0..99,999,999 inclusive
121
+ return int(np.random.randint(0, 100_000_000))
122
+
123
+
124
  # -----------------------------
125
  # Pipeline Worker
126
  # -----------------------------
127
  class PipelineWorker:
128
  """
129
+ Owns ONE pipeline instance. Execute jobs sequentially on this worker.
130
  """
131
+
132
  def __init__(
133
  self,
134
  worker_id: int,
135
  paths: ModelPaths,
136
+ scheduler_config: Dict,
137
  tokenizer: CLIPTokenizer,
138
+ rknn_context_cfg: Optional[dict] = None,
139
+ use_rknn_context_cfgs: bool = True,
140
  ):
141
  self.worker_id = worker_id
142
  self.paths = paths
143
+ self.scheduler_config = scheduler_config
144
  self.tokenizer = tokenizer
145
+ self.rknn_context_cfg = rknn_context_cfg or {}
146
+ self.use_rknn_context_cfgs = use_rknn_context_cfgs
147
 
148
+ self.pipe = None
149
  self._init_pipeline()
150
 
151
+ def _mk_model(self, model_path: str, *, data_format: str) -> RKNN2Model:
152
+ """
153
+ Create one RKNN2Model with explicit data_format.
154
+ If your RKNN2Model supports multi_context/core_mask/etc, it will receive them.
155
+ """
156
+ if self.use_rknn_context_cfgs:
157
+ return RKNN2Model(model_path, data_format=data_format, **self.rknn_context_cfg)
158
+ return RKNN2Model(model_path, data_format=data_format)
159
+
160
  def _init_pipeline(self):
161
+ # IMPORTANT: per-worker scheduler instance (avoid shared mutable state)
162
+ scheduler = LCMScheduler.from_config(self.scheduler_config)
163
+
164
+ # Per-model explicit formats:
165
+ # - text encoder is token/embedding, format mostly irrelevant; keep nchw
166
+ # - unet + vae_decoder commonly require nhwc on RKNN
167
  self.pipe = RKNN2LatentConsistencyPipeline(
168
+ text_encoder=self._mk_model(self.paths.text_encoder, data_format="nchw"),
169
+ unet=self._mk_model(self.paths.unet, data_format="nhwc"),
170
+ vae_decoder=self._mk_model(self.paths.vae_decoder, data_format="nhwc"),
171
+ scheduler=scheduler,
172
  tokenizer=self.tokenizer,
173
  )
174
 
175
+ def run_job(self, job: Job) -> Tuple[bytes, int]:
176
+ # Parse WIDTHxHEIGHT
177
+ width, height = parse_size(job.req.size)
178
 
179
+ # Deterministic per-request RNG
180
+ seed = job.req.seed if job.req.seed is not None else gen_seed_8_digits()
181
+ rng = np.random.RandomState(seed)
182
 
183
  result = self.pipe(
184
  prompt=job.req.prompt,
185
+ height=height,
186
+ width=width,
187
  num_inference_steps=job.req.num_inference_steps,
188
  guidance_scale=job.req.guidance_scale,
189
  generator=rng,
190
+ )
191
 
192
  pil_image = result["images"][0]
193
  buf = io.BytesIO()
194
  pil_image.save(buf, format="PNG")
195
+ return buf.getvalue(), seed
196
 
197
 
198
  # -----------------------------
 
201
  class PipelineService:
202
  """
203
  Singleton-ish service that:
204
+ - loads scheduler_config + tokenizer once
205
  - starts N worker threads
206
+ - queues requests and runs them on worker-owned pipelines
207
  """
208
+
209
  _instance = None
210
  _instance_lock = threading.Lock()
211
 
212
  def __init__(
213
  self,
214
  paths: ModelPaths,
215
+ num_workers: int,
216
+ queue_max: int,
217
  rknn_context_cfgs: Optional[List[dict]] = None,
218
+ use_rknn_context_cfgs: bool = True,
219
  ):
220
  self.paths = paths
221
+ self.num_workers = max(1, int(num_workers))
222
+ self.q: "queue.Queue[Job]" = queue.Queue(maxsize=int(queue_max))
223
 
224
+ # Load scheduler config once (immutable dict)
225
  with open(self.paths.scheduler_config, "r") as f:
226
+ self.scheduler_config = json.load(f)
227
+
228
+ # Tokenizer is safe to share (read-only)
229
  self.tokenizer = CLIPTokenizer.from_pretrained("openai/clip-vit-base-patch16")
230
 
231
+ # Worker RKNN configs
 
232
  if rknn_context_cfgs is None:
233
+ rknn_context_cfgs = build_rknn_context_cfgs_for_rk3588(self.num_workers)
234
+ if len(rknn_context_cfgs) != self.num_workers:
235
  raise ValueError("rknn_context_cfgs must match num_workers length")
236
 
237
  self.workers: List[PipelineWorker] = []
238
  self.threads: List[threading.Thread] = []
239
  self._stop = threading.Event()
240
 
241
+ # Create worker pipelines
242
+ for i in range(self.num_workers):
243
+ w = PipelineWorker(
244
  worker_id=i,
245
  paths=self.paths,
246
+ scheduler_config=self.scheduler_config,
247
  tokenizer=self.tokenizer,
248
  rknn_context_cfg=rknn_context_cfgs[i],
249
+ use_rknn_context_cfgs=use_rknn_context_cfgs,
250
  )
251
+ self.workers.append(w)
252
 
253
+ # Start worker threads
254
+ for i in range(self.num_workers):
255
  t = threading.Thread(target=self._worker_loop, args=(i,), daemon=True)
256
  t.start()
257
  self.threads.append(t)
 
260
  def get_instance(
261
  cls,
262
  paths: ModelPaths,
263
+ num_workers: int,
264
+ queue_max: int,
265
  rknn_context_cfgs: Optional[List[dict]] = None,
266
+ use_rknn_context_cfgs: bool = True,
267
  ) -> "PipelineService":
268
  with cls._instance_lock:
269
  if cls._instance is None:
 
272
  num_workers=num_workers,
273
  queue_max=queue_max,
274
  rknn_context_cfgs=rknn_context_cfgs,
275
+ use_rknn_context_cfgs=use_rknn_context_cfgs,
276
  )
277
  return cls._instance
278
 
279
  def shutdown(self):
280
  self._stop.set()
281
+ # Drain queue with errors
282
  while True:
283
  try:
284
  job = self.q.get_nowait()
 
288
  job.fut.set_exception(RuntimeError("Service shutting down"))
289
  self.q.task_done()
290
 
291
+ def submit(self, req: GenerateRequest, timeout_s: float = 0.25) -> Future:
292
  fut: Future = Future()
293
  job = Job(req=req, fut=fut, submitted_at=time.time())
 
294
  try:
295
  self.q.put(job, timeout=timeout_s)
296
  except queue.Full:
 
310
  continue
311
 
312
  try:
313
+ png, seed = worker.run_job(job)
314
  if not job.fut.done():
315
+ job.fut.set_result((png, seed))
316
  except Exception as e:
317
  if not job.fut.done():
318
  job.fut.set_exception(e)
 
320
  self.q.task_done()
321
 
322
 
323
  # -----------------------------
324
  # FastAPI server
325
  # -----------------------------
326
  MODEL_ROOT = os.environ.get("MODEL_ROOT", "/models/lcm_rknn")
327
+ NUM_WORKERS = int(os.environ.get("NUM_WORKERS", "1"))
328
  QUEUE_MAX = int(os.environ.get("QUEUE_MAX", "64"))
329
+ PORT = int(os.environ.get("PORT", "4200"))
330
+ REQUEST_TIMEOUT = float(os.environ.get("DEFAULT_TIMEOUT", "120"))
331
+
332
+ # If your RKNN2Model does NOT accept multi_context/core_mask kwargs, set this to 0.
333
+ USE_RKNN_CONTEXT_CFGS = os.environ.get("USE_RKNN_CONTEXT_CFGS", "1") not in ("0", "false", "False")
334
 
335
  paths = ModelPaths(root=MODEL_ROOT)
336
 
337
+
338
+ @asynccontextmanager
339
+ async def lifespan(app: FastAPI):
340
+ # Create singleton service at startup
341
+ app.state.service = PipelineService.get_instance(
342
+ paths=paths,
343
+ num_workers=NUM_WORKERS,
344
+ queue_max=QUEUE_MAX,
345
+ rknn_context_cfgs=build_rknn_context_cfgs_for_rk3588(NUM_WORKERS),
346
+ use_rknn_context_cfgs=USE_RKNN_CONTEXT_CFGS,
347
+ )
348
+ yield
349
+ # Shutdown on app stop
350
+ app.state.service.shutdown()
351
+
352
+
353
+ app = FastAPI(lifespan=lifespan)
354
 
355
 
356
  @app.post("/generate", responses={200: {"content": {"image/png": {}}}})
357
  def generate(req: GenerateRequest):
358
+ service: PipelineService = app.state.service
359
 
360
+ fut = service.submit(req, timeout_s=0.25)
361
  try:
362
+ png_bytes, seed = fut.result(timeout=REQUEST_TIMEOUT)
363
  except Exception as e:
364
  msg = str(e)
365
  if "Queue full" in msg:
 
371
  media_type="image/png",
372
  headers={
373
  "Cache-Control": "no-store",
374
+ "X-Seed": str(seed),
375
  },
376
  )
377
 
378
+
379
  if __name__ == "__main__":
380
  import uvicorn
381
+
382
  uvicorn.run(
383
  app,
384
  host="0.0.0.0",
385
+ port=PORT,
386
+ log_config=None, # avoids logger dictConfig surprises
387
  )
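
A quick way to exercise the new endpoint is a small client script. This is a minimal sketch, not part of the commit: it assumes the server from lcm_server.py above is listening on localhost:4200 and that the requests package is installed. The server answers 429 when the job queue is full and echoes the seed it used in the X-Seed response header.

import requests

# Minimal client for the /generate endpoint (illustrative sketch).
resp = requests.post(
    "http://localhost:4200/generate",
    json={
        "prompt": "a photo of an astronaut riding a horse",
        "size": "512x512",
        "num_inference_steps": 4,
        "guidance_scale": 1.0,
        # omit "seed" so the server draws one; it is returned in X-Seed
    },
    timeout=130,  # slightly above the server's DEFAULT_TIMEOUT of 120s
)
resp.raise_for_status()
seed = resp.headers.get("X-Seed")
print("seed used:", seed)
with open("out.png", "wb") as f:
    f.write(resp.content)
# Re-posting the same body with "seed": int(seed) should reproduce the image.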
rknnlcm.py CHANGED
@@ -69,6 +69,15 @@ class RKNN2Model:
69
  self.verbose_shapes = verbose_shapes
70
  self.multi_context = multi_context
71
  self.runtime_kwargs = runtime_kwargs or {}
72
 
73
  logger.info(f"Loading {model_dir}")
74
  start = time.time()
@@ -125,32 +134,46 @@ class RKNN2Model:
125
 
126
  raise TypeError(f"core_mask must be None, int, or str; got {type(core_mask)}")
127
 
128
- def __call__(self, **kwargs) -> List[np.ndarray]:
129
- # TODO We need deterministic ordering
130
- input_list = [self._prep(v) for v in kwargs.values()]
131
- results = self.rknnlite.inference(inputs=input_list, data_format=self.data_format)
132
-
133
- logger.info("%s out[0] shape=%s dtype=%s",
134
- self.modelname, results[0].shape, results[0].dtype)
135
 
136
- return results
137
 
138
- def _prep(self, x):
139
- import numpy as np
140
- if isinstance(x, np.ndarray):
141
- # dtype safety
142
- if self.force_fp32 and x.dtype in (np.float64, np.float16):
143
- x = x.astype(np.float32, copy=False)
144
 
145
- # layout safety for 4D tensors
146
- if x.ndim == 4:
147
- if self.data_format == "nhwc" and x.shape[1] in (1, 3, 4): # likely NCHW
148
- x = x.transpose(0, 2, 3, 1)
149
- elif self.data_format == "nchw" and x.shape[-1] in (1, 3, 4): # likely NHWC
150
- x = x.transpose(0, 3, 1, 2)
151
 
152
- x = np.ascontiguousarray(x)
153
- return x
154
 
155
  class RKNN2LatentConsistencyPipeline(DiffusionPipeline):
156
 
@@ -554,7 +577,8 @@ class RKNN2LatentConsistencyPipeline(DiffusionPipeline):
554
  )
555
 
556
  # Adapted from diffusers to extend it for other runtimes than ORT
557
- timestep_dtype = np.int64
 
558
 
559
  num_warmup_steps = len(timesteps) - num_inference_steps * self.scheduler.order
560
 
@@ -586,12 +610,18 @@ class RKNN2LatentConsistencyPipeline(DiffusionPipeline):
586
  image = denoised
587
  has_nsfw_concept = None
588
  else:
 
589
  denoised /= self.vae_decoder.config["scaling_factor"]
590
- # it seems likes there is a strange result for using half-precision vae decoder if batchsize>1
591
- image = np.concatenate(
592
- [self.vae_decoder(latent_sample=denoised[i : i + 1])[0] for i in range(denoised.shape[0])]
593
- )
594
- # image, has_nsfw_concept = self.run_safety_checker(image)
595
  has_nsfw_concept = None # skip safety checker
596
 
597
  if has_nsfw_concept is None:
@@ -599,7 +629,12 @@ class RKNN2LatentConsistencyPipeline(DiffusionPipeline):
599
  else:
600
  do_denormalize = [not has_nsfw for has_nsfw in has_nsfw_concept]
601
 
 
602
  image = self.postprocess(image, output_type=output_type, do_denormalize=do_denormalize)
603
  decode_time = time.time() - decode_start
604
  print(f"Decode time: {decode_time:.2f}s")
605
 
@@ -672,9 +707,9 @@ def generate_png_bytes(args):
672
  user_specified_scheduler = LCMScheduler.from_config(scheduler_config)
673
 
674
  pipe = RKNN2LatentConsistencyPipeline(
675
- text_encoder = RKNN2Model(os.path.join(args.i, "text_encoder"), data_format="nchw"), # probably irrelevant
676
- unet = RKNN2Model(os.path.join(args.i, "unet"), data_format="nhwc"), # important
677
- vae_decoder = RKNN2Model(os.path.join(args.i, "vae_decoder"), data_format="nhwc"), # important
678
  scheduler=user_specified_scheduler,
679
  tokenizer=CLIPTokenizer.from_pretrained("openai/clip-vit-base-patch16"),
680
  )
 
69
  self.verbose_shapes = verbose_shapes
70
  self.multi_context = multi_context
71
  self.runtime_kwargs = runtime_kwargs or {}
72
+ self.modelname = os.path.basename(model_dir.rstrip("/"))
73
+
74
+
75
+ # Known-good key orders (fallback)
76
+ self.key_orders = {
77
+ "text_encoder": ("input_ids",),
78
+ "unet": ("sample", "timestep", "encoder_hidden_states", "timestep_cond"),
79
+ "vae_decoder": ("latent_sample",), # change to match your call
80
+ }
81
 
82
  logger.info(f"Loading {model_dir}")
83
  start = time.time()
 
134
 
135
  raise TypeError(f"core_mask must be None, int, or str; got {type(core_mask)}")
136
 
137
+ def __call__(self, **kwargs) -> List[np.ndarray]:
138
+ import numpy as np
139
 
140
+ def prep(x):
141
+ if isinstance(x, np.ndarray):
142
+ # dtype safety
143
+ if x.dtype == np.float64:
144
+ x = x.astype(np.float32, copy=False)
145
+ elif x.dtype == np.float16:
146
+ x = x.astype(np.float32, copy=False)
147
+
148
+ # layout safety: only transpose 4D tensors at RKNN boundary
149
+ if x.ndim == 4:
150
+ if self.data_format == "nhwc" and x.shape[1] in (1, 3, 4): # NCHW -> NHWC
151
+ x = x.transpose(0, 2, 3, 1)
152
+ elif self.data_format == "nchw" and x.shape[-1] in (1, 3, 4): # NHWC -> NCHW
153
+ x = x.transpose(0, 3, 1, 2)
154
+
155
+ x = np.ascontiguousarray(x)
156
+ return x
157
+
158
+ # deterministic per-model input ordering
159
+ if self.modelname == "text_encoder":
160
+ order = ("input_ids",)
161
+ elif self.modelname == "unet":
162
+ order = ("sample", "timestep", "encoder_hidden_states", "timestep_cond")
163
+ elif self.modelname == "vae_decoder":
164
+ order = ("latent_sample",)
165
+ else:
166
+ order = tuple(sorted(kwargs.keys()))
167
 
168
+ input_list = [prep(kwargs[k]) for k in order]
169
 
170
+ if self.modelname == "vae_decoder":
171
+ x = input_list[0]
172
+ logger.info("vae in[0] shape=%s dtype=%s contiguous=%s", x.shape, x.dtype, x.flags['C_CONTIGUOUS'])
173
+ results = self.rknnlite.inference(inputs=input_list, data_format=self.data_format)
174
 
175
+ logger.info("%s out[0] shape=%s dtype=%s", self.modelname, results[0].shape, results[0].dtype)
176
+ return results
177
 
178
  class RKNN2LatentConsistencyPipeline(DiffusionPipeline):
179
 
 
577
  )
578
 
579
  # Adapted from diffusers to extend it for other runtimes than ORT
580
+ # timestep_dtype = np.int64
581
+ timestep_dtype = np.int32
582
 
583
  num_warmup_steps = len(timesteps) - num_inference_steps * self.scheduler.order
584
 
 
610
  image = denoised
611
  has_nsfw_concept = None
612
  else:
613
+ t0 = time.time()
614
  denoised /= self.vae_decoder.config["scaling_factor"]
615
+ t1 = time.time()
616
+
617
+ t_inf0 = time.time()
618
+ outs = [self.vae_decoder(latent_sample=denoised[i:i+1])[0] for i in range(denoised.shape[0])]
619
+ t_inf1 = time.time()
620
+
621
+ t_cat0 = time.time()
622
+ image = np.concatenate(outs)
623
+ t_cat1 = time.time()
624
+
625
  has_nsfw_concept = None # skip safety checker
626
 
627
  if has_nsfw_concept is None:
 
629
  else:
630
  do_denormalize = [not has_nsfw for has_nsfw in has_nsfw_concept]
631
 
632
+ t_post0 = time.time()
633
  image = self.postprocess(image, output_type=output_type, do_denormalize=do_denormalize)
634
+ t_post1 = time.time()
635
+
636
+ print("scale:", t1-t0, "vae_inf:", t_inf1-t_inf0, "concat:", t_cat1-t_cat0, "post:", t_post1-t_post0)
637
+
638
  decode_time = time.time() - decode_start
639
  print(f"Decode time: {decode_time:.2f}s")
640
 
 
707
  user_specified_scheduler = LCMScheduler.from_config(scheduler_config)
708
 
709
  pipe = RKNN2LatentConsistencyPipeline(
710
+ text_encoder=RKNN2Model(os.path.join(args.i, "text_encoder"), data_format="nchw"),
711
+ unet=RKNN2Model(os.path.join(args.i, "unet"), data_format="nhwc"),
712
+ vae_decoder=RKNN2Model(os.path.join(args.i, "vae_decoder"), data_format="nhwc"),
713
  scheduler=user_specified_scheduler,
714
  tokenizer=CLIPTokenizer.from_pretrained("openai/clip-vit-base-patch16"),
715
  )
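
The determinism fix in RKNN2Model.__call__ comes down to mapping kwargs onto a fixed per-model key order before handing the inputs to rknnlite.inference(), instead of relying on kwargs iteration order. A standalone sketch of that ordering rule, with resolve_input_order as a hypothetical helper name (in the diff the table lives on the model as self.key_orders):

from typing import Dict, Tuple

# Mirrors self.key_orders from the diff above.
KEY_ORDERS: Dict[str, Tuple[str, ...]] = {
    "text_encoder": ("input_ids",),
    "unet": ("sample", "timestep", "encoder_hidden_states", "timestep_cond"),
    "vae_decoder": ("latent_sample",),
}

def resolve_input_order(modelname: str, kwargs: dict) -> Tuple[str, ...]:
    # Known models get their fixed order; unknown models fall back to
    # sorted keys, which is at least stable from call to call.
    return KEY_ORDERS.get(modelname, tuple(sorted(kwargs.keys())))

# However the caller orders its kwargs, UNet inputs always reach the
# runtime as (sample, timestep, encoder_hidden_states, timestep_cond).
kwargs = {"timestep_cond": 0, "encoder_hidden_states": 1, "sample": 2, "timestep": 3}
assert resolve_input_order("unet", kwargs) == (
    "sample", "timestep", "encoder_hidden_states", "timestep_cond"
)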