chawin.chen commited on
Commit
1055306
·
1 Parent(s): 5f103ce
Files changed (3) hide show
  1. api_routes.py +99 -0
  2. config.py +6 -5
  3. utils.py +87 -41
api_routes.py CHANGED
@@ -178,6 +178,8 @@ from utils import (
178
  compress_image_by_file_size,
179
  convert_image_format,
180
  upload_file_to_bos,
 
 
181
  )
182
  from cleanup_scheduler import get_cleanup_status, manual_cleanup
183
 
@@ -1763,6 +1765,103 @@ async def get_available_models():
1763
  }
1764
 
1765
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1766
  @api_router.post("/restore")
1767
  @log_api_params
1768
  async def restore_old_photo(
 
178
  compress_image_by_file_size,
179
  convert_image_format,
180
  upload_file_to_bos,
181
+ ensure_bos_resources,
182
+ download_bos_directory,
183
  )
184
  from cleanup_scheduler import get_cleanup_status, manual_cleanup
185
 
 
1765
  }
1766
 
1767
 
1768
+ @api_router.post("/sync_resources", tags=["系统维护"])
1769
+ @log_api_params
1770
+ async def sync_bos_resources(
1771
+ force_download: bool = Query(False, description="是否强制重新下载已存在的文件"),
1772
+ include_background: bool = Query(
1773
+ False, description="是否同步配置中标记为后台的资源"
1774
+ ),
1775
+ bos_prefix: str | None = Query(
1776
+ None, description="自定义 BOS 前缀,例如 20220620/models"
1777
+ ),
1778
+ destination_dir: str | None = Query(
1779
+ None, description="自定义本地目录,例如 /opt/models/custom"
1780
+ ),
1781
+ background: bool = Query(
1782
+ False, description="与自定义前缀搭配使用时,是否在后台异步下载"
1783
+ ),
1784
+ ):
1785
+ """
1786
+ 手动触发 BOS 资源同步。
1787
+ - 若提供 bos_prefix 与 destination_dir,则按指定路径同步;
1788
+ - 否则根据配置的 BOS_DOWNLOAD_TARGETS 执行批量同步。
1789
+ """
1790
+ start_time = time.perf_counter()
1791
+
1792
+ if (bos_prefix and not destination_dir) or (destination_dir and not bos_prefix):
1793
+ raise HTTPException(status_code=400, detail="bos_prefix 和 destination_dir 需要同时提供")
1794
+
1795
+ if bos_prefix and destination_dir:
1796
+ dest_path = os.path.abspath(os.path.expanduser(destination_dir.strip()))
1797
+
1798
+ async def _sync_single():
1799
+ return await asyncio.to_thread(
1800
+ download_bos_directory,
1801
+ bos_prefix.strip(),
1802
+ dest_path,
1803
+ force_download=force_download,
1804
+ )
1805
+
1806
+ if background:
1807
+ async def _background_task():
1808
+ success = await _sync_single()
1809
+ if success:
1810
+ logger.info(
1811
+ "后台 BOS 下载完成: prefix=%s -> %s", bos_prefix, dest_path
1812
+ )
1813
+ else:
1814
+ logger.warning(
1815
+ "后台 BOS 下载失败: prefix=%s -> %s", bos_prefix, dest_path
1816
+ )
1817
+
1818
+ asyncio.create_task(_background_task())
1819
+ elapsed = time.perf_counter() - start_time
1820
+ return {
1821
+ "success": True,
1822
+ "force_download": force_download,
1823
+ "include_background": False,
1824
+ "bos_prefix": bos_prefix,
1825
+ "destination_dir": dest_path,
1826
+ "elapsed_seconds": round(elapsed, 3),
1827
+ "message": "后台下载任务已启动",
1828
+ }
1829
+
1830
+ success = await _sync_single()
1831
+ elapsed = time.perf_counter() - start_time
1832
+ return {
1833
+ "success": bool(success),
1834
+ "force_download": force_download,
1835
+ "include_background": False,
1836
+ "bos_prefix": bos_prefix,
1837
+ "destination_dir": dest_path,
1838
+ "elapsed_seconds": round(elapsed, 3),
1839
+ "message": "资源同步完成" if success else "资源同步失败,请查看日志",
1840
+ }
1841
+
1842
+ # 未指定前缀时,按配置批量同步
1843
+ success = await asyncio.to_thread(
1844
+ ensure_bos_resources,
1845
+ force_download,
1846
+ include_background,
1847
+ )
1848
+ elapsed = time.perf_counter() - start_time
1849
+ message = (
1850
+ "后台下载任务已启动,将在后台继续运行"
1851
+ if not include_background
1852
+ else "资源同步完成"
1853
+ )
1854
+ return {
1855
+ "success": bool(success),
1856
+ "force_download": force_download,
1857
+ "include_background": include_background,
1858
+ "elapsed_seconds": round(elapsed, 3),
1859
+ "message": message,
1860
+ "bos_prefix": None,
1861
+ "destination_dir": None,
1862
+ }
1863
+
1864
+
1865
  @api_router.post("/restore")
1866
  @log_api_params
1867
  async def restore_old_photo(
config.py CHANGED
@@ -306,11 +306,12 @@ CLEANUP_AGE_HOURS = float(os.environ.get("CLEANUP_AGE_HOURS", 12.0)) # 清理
306
 
307
  # BOS 自动同步清单:定义 BOS 路径和本地目录的映射,启动时可迭代该结构完成批量下载
308
  BOS_DOWNLOAD_TARGETS = [
309
- {
310
- "description": "明星图库数据集",
311
- "bos_prefix": BOS_CELEBRITY_PREFIX,
312
- "destination": CELEBRITY_DATASET_DIR,
313
- },
 
314
  {
315
  "description": "AI 模型权重",
316
  "bos_prefix": BOS_MODELS_PREFIX,
 
306
 
307
  # BOS 自动同步清单:定义 BOS 路径和本地目录的映射,启动时可迭代该结构完成批量下载
308
  BOS_DOWNLOAD_TARGETS = [
309
+ # {
310
+ # "description": "明星图库数据集",
311
+ # "bos_prefix": BOS_CELEBRITY_PREFIX,
312
+ # "destination": CELEBRITY_DATASET_DIR,
313
+ # "background": True,
314
+ # },
315
  {
316
  "description": "AI 模型权重",
317
  "bos_prefix": BOS_MODELS_PREFIX,
utils.py CHANGED
@@ -37,6 +37,8 @@ _BOS_CLIENT_INITIALIZED = False
37
  _BOS_CLIENT_LOCK = threading.Lock()
38
  _BOS_DOWNLOAD_LOCK = threading.Lock()
39
  _BOS_DOWNLOAD_COMPLETED = False
 
 
40
  _IMAGES_DIR_ABS = os.path.abspath(os.path.expanduser(IMAGES_DIR))
41
 
42
 
@@ -236,16 +238,24 @@ def download_bos_directory(prefix: str, destination_dir: str, *, force_download:
236
  return downloaded > 0 or skipped > 0
237
 
238
 
239
- def ensure_bos_resources(force_download: bool = False) -> bool:
 
 
 
 
 
 
 
240
  """
241
  根据配置的 BOS_DOWNLOAD_TARGETS 同步启动所需的模型与数据资源。
242
  :param force_download: 是否强制重新同步所有资源
 
243
  :return: 资源是否已准备就绪
244
  """
245
- global _BOS_DOWNLOAD_COMPLETED
246
 
247
  with _BOS_DOWNLOAD_LOCK:
248
- if _BOS_DOWNLOAD_COMPLETED and not force_download:
249
  return True
250
 
251
  targets = BOS_DOWNLOAD_TARGETS or []
@@ -255,6 +265,7 @@ def ensure_bos_resources(force_download: bool = False) -> bool:
255
  return True
256
 
257
  download_jobs = []
 
258
  for target in targets:
259
  if not isinstance(target, dict):
260
  logger.warning("无效的 BOS 下载配置项: %r", target)
@@ -263,34 +274,87 @@ def ensure_bos_resources(force_download: bool = False) -> bool:
263
  prefix = target.get("bos_prefix")
264
  destination = target.get("destination")
265
  description = target.get("description") or prefix or "<unnamed>"
 
266
 
267
  if not prefix or not destination:
268
  logger.warning("缺少必要字段,无法处理 BOS 下载配置: %r", target)
269
  continue
270
 
271
- download_jobs.append(
272
- {
273
- "description": description,
274
- "prefix": prefix,
275
- "destination": destination,
276
- }
277
- )
278
 
279
- if not download_jobs:
280
- logger.info("未找到有效的 BOS 下载目标,跳过资源同步")
281
- _BOS_DOWNLOAD_COMPLETED = True
282
- return True
283
 
284
  results = []
285
- max_workers = min(len(download_jobs), max(os.cpu_count() or 1, 1))
286
- if max_workers <= 0:
287
- max_workers = 1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
288
 
289
- with ThreadPoolExecutor(max_workers=max_workers, thread_name_prefix="bos-sync") as executor:
290
- future_to_job = {}
291
- for job in download_jobs:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
292
  logger.info(
293
- "准备同步 BOS 资源: %s (prefix=%s -> %s)",
294
  job["description"],
295
  job["prefix"],
296
  job["destination"],
@@ -301,26 +365,8 @@ def ensure_bos_resources(force_download: bool = False) -> bool:
301
  job["destination"],
302
  force_download=force_download,
303
  )
304
- future_to_job[future] = job
305
-
306
- for future in as_completed(future_to_job):
307
- job = future_to_job[future]
308
- description = job["description"]
309
- try:
310
- success = future.result()
311
- except Exception as exc:
312
- logger.warning("BOS 资源同步异常: %s (%s)", description, exc)
313
- success = False
314
-
315
- if success:
316
- logger.info("BOS 资源已就绪: %s", description)
317
- else:
318
- logger.warning("BOS 资源同步失败: %s", description)
319
- results.append(success)
320
-
321
- all_ready = all(results) if results else True
322
- if all_ready:
323
- _BOS_DOWNLOAD_COMPLETED = True
324
 
325
  return all_ready
326
 
 
37
  _BOS_CLIENT_LOCK = threading.Lock()
38
  _BOS_DOWNLOAD_LOCK = threading.Lock()
39
  _BOS_DOWNLOAD_COMPLETED = False
40
+ _BOS_BACKGROUND_EXECUTOR = None
41
+ _BOS_BACKGROUND_FUTURES = []
42
  _IMAGES_DIR_ABS = os.path.abspath(os.path.expanduser(IMAGES_DIR))
43
 
44
 
 
238
  return downloaded > 0 or skipped > 0
239
 
240
 
241
+ def _get_background_executor() -> ThreadPoolExecutor:
242
+ global _BOS_BACKGROUND_EXECUTOR
243
+ if _BOS_BACKGROUND_EXECUTOR is None:
244
+ _BOS_BACKGROUND_EXECUTOR = ThreadPoolExecutor(max_workers=2, thread_name_prefix="bos-bg")
245
+ return _BOS_BACKGROUND_EXECUTOR
246
+
247
+
248
+ def ensure_bos_resources(force_download: bool = False, include_background: bool = False) -> bool:
249
  """
250
  根据配置的 BOS_DOWNLOAD_TARGETS 同步启动所需的模型与数据资源。
251
  :param force_download: 是否强制重新同步所有资源
252
+ :param include_background: 是否将标记为后台任务的目标也同步为阻塞任务
253
  :return: 资源是否已准备就绪
254
  """
255
+ global _BOS_DOWNLOAD_COMPLETED, _BOS_BACKGROUND_FUTURES
256
 
257
  with _BOS_DOWNLOAD_LOCK:
258
+ if _BOS_DOWNLOAD_COMPLETED and not force_download and not include_background:
259
  return True
260
 
261
  targets = BOS_DOWNLOAD_TARGETS or []
 
265
  return True
266
 
267
  download_jobs = []
268
+ background_jobs = []
269
  for target in targets:
270
  if not isinstance(target, dict):
271
  logger.warning("无效的 BOS 下载配置项: %r", target)
 
274
  prefix = target.get("bos_prefix")
275
  destination = target.get("destination")
276
  description = target.get("description") or prefix or "<unnamed>"
277
+ background_flag = bool(target.get("background"))
278
 
279
  if not prefix or not destination:
280
  logger.warning("缺少必要字段,无法处理 BOS 下载配置: %r", target)
281
  continue
282
 
283
+ job = {
284
+ "description": description,
285
+ "prefix": prefix,
286
+ "destination": destination,
287
+ }
 
 
288
 
289
+ if background_flag and not include_background:
290
+ background_jobs.append(job)
291
+ else:
292
+ download_jobs.append(job)
293
 
294
  results = []
295
+ if download_jobs:
296
+ max_workers = min(len(download_jobs), max(os.cpu_count() or 1, 1))
297
+ if max_workers <= 0:
298
+ max_workers = 1
299
+
300
+ with ThreadPoolExecutor(max_workers=max_workers, thread_name_prefix="bos-sync") as executor:
301
+ future_to_job = {}
302
+ for job in download_jobs:
303
+ logger.info(
304
+ "准备同步 BOS 资源: %s (prefix=%s -> %s)",
305
+ job["description"],
306
+ job["prefix"],
307
+ job["destination"],
308
+ )
309
+ future = executor.submit(
310
+ download_bos_directory,
311
+ job["prefix"],
312
+ job["destination"],
313
+ force_download=force_download,
314
+ )
315
+ future_to_job[future] = job
316
+
317
+ for future in as_completed(future_to_job):
318
+ job = future_to_job[future]
319
+ description = job["description"]
320
+ try:
321
+ success = future.result()
322
+ except Exception as exc:
323
+ logger.warning("BOS 资源同步异常: %s (%s)", description, exc)
324
+ success = False
325
+
326
+ if success:
327
+ logger.info("BOS 资源已就绪: %s", description)
328
+ else:
329
+ logger.warning("BOS 资源同步失败: %s", description)
330
+ results.append(success)
331
 
332
+ all_ready = all(results) if results else True
333
+ if all_ready:
334
+ _BOS_DOWNLOAD_COMPLETED = True
335
+
336
+ if background_jobs:
337
+ executor = _get_background_executor()
338
+
339
+ def _make_callback(description: str):
340
+ def _background_done(fut):
341
+ try:
342
+ success = fut.result()
343
+ if success:
344
+ logger.info("后台 BOS 资源已就绪: %s", description)
345
+ else:
346
+ logger.warning("后台 BOS 资源同步失败: %s", description)
347
+ except Exception as exc:
348
+ logger.warning("后台 BOS 资源同步异常: %s (%s)", description, exc)
349
+ finally:
350
+ with _BOS_DOWNLOAD_LOCK:
351
+ if fut in _BOS_BACKGROUND_FUTURES:
352
+ _BOS_BACKGROUND_FUTURES.remove(fut)
353
+ return _background_done
354
+
355
+ for job in background_jobs:
356
  logger.info(
357
+ "后台同步 BOS 资源: %s (prefix=%s -> %s)",
358
  job["description"],
359
  job["prefix"],
360
  job["destination"],
 
365
  job["destination"],
366
  force_download=force_download,
367
  )
368
+ future.add_done_callback(_make_callback(job["description"]))
369
+ _BOS_BACKGROUND_FUTURES.append(future)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
370
 
371
  return all_ready
372