chenchaoyun commited on
Commit
71f998e
·
1 Parent(s): f27f78b
Files changed (2) hide show
  1. config.py +5 -0
  2. utils.py +40 -16
config.py CHANGED
@@ -219,6 +219,11 @@ else:
219
  BOS_BUCKET_NAME,
220
  ]
221
  )
 
 
 
 
 
222
  APP_SECRET_TOKEN = os.environ.get("APP_SECRET_TOKEN", "******")
223
  HOSTNAME = os.environ.get("HOSTNAME", "default-hostname")
224
  MODELS_PATH = os.path.abspath(
 
219
  BOS_BUCKET_NAME,
220
  ]
221
  )
222
+ _bos_async_env = os.environ.get("BOS_UPLOAD_ASYNC")
223
+ if _bos_async_env is None or not _bos_async_env.strip():
224
+ BOS_UPLOAD_ASYNC = True
225
+ else:
226
+ BOS_UPLOAD_ASYNC = _bos_async_env.strip().lower() in ("1", "true", "on", "yes")
227
  APP_SECRET_TOKEN = os.environ.get("APP_SECRET_TOKEN", "******")
228
  HOSTNAME = os.environ.get("HOSTNAME", "default-hostname")
229
  MODELS_PATH = os.path.abspath(
utils.py CHANGED
@@ -32,6 +32,7 @@ from config import (
32
  BOS_BUCKET_NAME,
33
  BOS_IMAGE_DIR,
34
  BOS_UPLOAD_ENABLED,
 
35
  BOS_DOWNLOAD_TARGETS,
36
  HUGGINGFACE_REPO_ID,
37
  HUGGINGFACE_REVISION,
@@ -46,6 +47,7 @@ _BOS_DOWNLOAD_LOCK = threading.Lock()
46
  _BOS_DOWNLOAD_COMPLETED = False
47
  _BOS_BACKGROUND_EXECUTOR = None
48
  _BOS_BACKGROUND_FUTURES = []
 
49
  _IMAGES_DIR_ABS = os.path.abspath(os.path.expanduser(IMAGES_DIR))
50
  _BOS_UPLOAD_CACHE = OrderedDict()
51
  _BOS_UPLOAD_CACHE_LOCK = threading.Lock()
@@ -255,6 +257,13 @@ def _get_background_executor() -> ThreadPoolExecutor:
255
  return _BOS_BACKGROUND_EXECUTOR
256
 
257
 
 
 
 
 
 
 
 
258
  def ensure_huggingface_models(force_download: bool = False) -> bool:
259
  """确保 HuggingFace 模型仓库同步到本地 MODELS_PATH。"""
260
  repo_id = (HUGGINGFACE_REPO_ID or "").strip()
@@ -443,7 +452,7 @@ def ensure_bos_resources(force_download: bool = False, include_background: bool
443
 
444
  def upload_file_to_bos(file_path: str, object_name: str | None = None) -> bool:
445
  """
446
- 将指定文件上传到 BOS,失败不会抛出异常。
447
  :param file_path: 本地文件路径
448
  :param object_name: BOS 对象名称(可选)
449
  :return: 是否成功上传
@@ -465,8 +474,7 @@ def upload_file_to_bos(file_path: str, object_name: str | None = None) -> bool:
465
  except OSError:
466
  return False
467
 
468
- client = _get_bos_client()
469
- if client is None:
470
  return False
471
 
472
  # 生成对象名称
@@ -495,19 +503,35 @@ def upload_file_to_bos(file_path: str, object_name: str | None = None) -> bool:
495
  logger.info("文件已同步至 BOS(跳过重复上传,耗时 %.1f ms): %s", elapsed_ms, object_key)
496
  return True
497
 
498
- try:
499
- client.upload_file(expanded_path, BOS_BUCKET_NAME, object_key)
500
- elapsed_ms = (time.perf_counter() - start_time) * 1000
501
- logger.info("文件已同步至 BOS(耗时 %.1f ms): %s", elapsed_ms, object_key)
502
- with _BOS_UPLOAD_CACHE_LOCK:
503
- _BOS_UPLOAD_CACHE[cache_key] = cache_signature
504
- _BOS_UPLOAD_CACHE.move_to_end(cache_key)
505
- while len(_BOS_UPLOAD_CACHE) > _BOS_UPLOAD_CACHE_MAX:
506
- _BOS_UPLOAD_CACHE.popitem(last=False)
507
- return True
508
- except (ClientError, BotoCoreError, Exception) as e:
509
- logger.warning(f"上传到 BOS 失败({object_key}): {e}")
510
- return False
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
511
 
512
 
513
  def delete_file_from_bos(file_path: str | None = None,
 
32
  BOS_BUCKET_NAME,
33
  BOS_IMAGE_DIR,
34
  BOS_UPLOAD_ENABLED,
35
+ BOS_UPLOAD_ASYNC,
36
  BOS_DOWNLOAD_TARGETS,
37
  HUGGINGFACE_REPO_ID,
38
  HUGGINGFACE_REVISION,
 
47
  _BOS_DOWNLOAD_COMPLETED = False
48
  _BOS_BACKGROUND_EXECUTOR = None
49
  _BOS_BACKGROUND_FUTURES = []
50
+ _BOS_UPLOAD_EXECUTOR = None
51
  _IMAGES_DIR_ABS = os.path.abspath(os.path.expanduser(IMAGES_DIR))
52
  _BOS_UPLOAD_CACHE = OrderedDict()
53
  _BOS_UPLOAD_CACHE_LOCK = threading.Lock()
 
257
  return _BOS_BACKGROUND_EXECUTOR
258
 
259
 
260
+ def _get_upload_executor() -> ThreadPoolExecutor:
261
+ global _BOS_UPLOAD_EXECUTOR
262
+ if _BOS_UPLOAD_EXECUTOR is None:
263
+ _BOS_UPLOAD_EXECUTOR = ThreadPoolExecutor(max_workers=2, thread_name_prefix="bos-upload")
264
+ return _BOS_UPLOAD_EXECUTOR
265
+
266
+
267
  def ensure_huggingface_models(force_download: bool = False) -> bool:
268
  """确保 HuggingFace 模型仓库同步到本地 MODELS_PATH。"""
269
  repo_id = (HUGGINGFACE_REPO_ID or "").strip()
 
452
 
453
  def upload_file_to_bos(file_path: str, object_name: str | None = None) -> bool:
454
  """
455
+ 将指定文件上传到 BOS,失败不会抛出异常。默认使用后台线程异步上传。
456
  :param file_path: 本地文件路径
457
  :param object_name: BOS 对象名称(可选)
458
  :return: 是否成功上传
 
474
  except OSError:
475
  return False
476
 
477
+ if _get_bos_client() is None:
 
478
  return False
479
 
480
  # 生成对象名称
 
503
  logger.info("文件已同步至 BOS(跳过重复上传,耗时 %.1f ms): %s", elapsed_ms, object_key)
504
  return True
505
 
506
+ def _do_upload(mode_label: str) -> bool:
507
+ client_inner = _get_bos_client()
508
+ if client_inner is None:
509
+ return False
510
+ upload_start = time.perf_counter()
511
+ try:
512
+ client_inner.upload_file(expanded_path, BOS_BUCKET_NAME, object_key)
513
+ elapsed_ms = (time.perf_counter() - upload_start) * 1000
514
+ logger.info("文件已同步至 BOS(%s,耗时 %.1f ms): %s", mode_label, elapsed_ms, object_key)
515
+ with _BOS_UPLOAD_CACHE_LOCK:
516
+ _BOS_UPLOAD_CACHE[cache_key] = cache_signature
517
+ _BOS_UPLOAD_CACHE.move_to_end(cache_key)
518
+ while len(_BOS_UPLOAD_CACHE) > _BOS_UPLOAD_CACHE_MAX:
519
+ _BOS_UPLOAD_CACHE.popitem(last=False)
520
+ return True
521
+ except (ClientError, BotoCoreError, Exception) as exc:
522
+ logger.warning("上传到 BOS 失败(%s,%s): %s", object_key, mode_label, exc)
523
+ return False
524
+
525
+ if BOS_UPLOAD_ASYNC:
526
+ try:
527
+ executor = _get_upload_executor()
528
+ executor.submit(_do_upload, "异步")
529
+ return True
530
+ except Exception as exc:
531
+ logger.warning("调度 BOS 异步上传失败,将改为同步执行: %s", exc)
532
+ return _do_upload("同步")
533
+
534
+ return _do_upload("同步")
535
 
536
 
537
  def delete_file_from_bos(file_path: str | None = None,