Spaces:
Running
Running
| import json | |
| import os | |
| from pathlib import Path | |
| from typing import Optional | |
| from r2_storage import R2Storage | |
# Directory containing this module; get_local_settings_path() resolves
# settings file names relative to it.
ROOT_DIR = Path(__file__).resolve().parent
def build_settings_prefix() -> str:
    """Return the R2 key prefix under which this cinema's settings live.

    The cinema identifier is read from the ``CINEMA_ID`` environment
    variable. A missing, empty, or whitespace-only value falls back to
    ``"default"``.
    """
    cinema_id = os.environ.get("CINEMA_ID", "").strip()
    if not cinema_id:
        cinema_id = "default"
    return f"{cinema_id}/app_settings"
def build_settings_key(file_name: str) -> str:
    """Return the full R2 object key for the settings file *file_name*.

    Trailing slashes on the prefix and leading slashes on the file name
    are stripped so the two parts are joined by exactly one ``/``.
    """
    prefix = build_settings_prefix().rstrip("/")
    relative_name = str(file_name).lstrip("/")
    return "/".join((prefix, relative_name))
def get_local_settings_path(file_name: str | Path) -> Path:
    """Return the local path of *file_name*, resolved against ``ROOT_DIR``."""
    return ROOT_DIR.joinpath(str(file_name))
def get_r2_storage_or_none() -> Optional[R2Storage]:
    """Return an ``R2Storage`` client, or ``None`` if one cannot be built.

    Any exception raised by ``R2Storage()`` is treated as "R2 is not
    available", so callers can fall back to local-only behaviour. The
    exception is no longer discarded silently: it is logged at DEBUG level
    with its traceback so a misconfiguration can be diagnosed.

    Returns:
        A new ``R2Storage`` instance, or ``None`` when construction failed.
    """
    # Function-scope import keeps this block self-contained; the module's
    # import section is left unchanged.
    import logging

    try:
        return R2Storage()
    except Exception:
        # Deliberately best-effort: report the reason, but never raise.
        logging.getLogger(__name__).debug(
            "R2 storage is unavailable; continuing without it", exc_info=True
        )
        return None
def upload_settings_file_to_r2(file_name: str | Path) -> bool:
    """Upload a local settings file to R2.

    Args:
        file_name: Settings file name, resolved with
            ``get_local_settings_path``. Only its final path component is
            used to build the remote key.

    Returns:
        ``True`` if the upload call completed without raising. ``False`` if
        R2 is unavailable, the local file does not exist, or the upload
        raised.
    """
    client = get_r2_storage_or_none()
    if client is None:
        return False

    source = get_local_settings_path(file_name)
    if not source.exists():
        return False

    try:
        remote_key = build_settings_key(source.name)
        client.upload_file(source, remote_key, content_type="application/json")
    except Exception:
        return False
    return True
def ensure_settings_file_from_r2(file_name: str | Path, force_download: bool = False) -> bool:
    """Fetch a settings file from R2 when it is missing locally (or forced).

    Args:
        file_name: Settings file name, resolved with
            ``get_local_settings_path``. Only its final path component is
            used to build the remote key.
        force_download: When true, download even if a local copy already
            exists.

    Returns:
        ``True`` only if the file was downloaded during this call. ``False``
        in every other case: a local copy exists and the download was not
        forced, R2 is unavailable, the remote object does not exist, or the
        existence check or the download raised.
    """
    local_path = get_local_settings_path(file_name)

    # Cheapest check first: when a local copy exists and no refresh was
    # requested the result is False regardless of the remote state, so skip
    # building the client and the remote round trip entirely.
    if local_path.exists() and not force_download:
        return False

    storage = get_r2_storage_or_none()
    if storage is None:
        return False

    key = build_settings_key(local_path.name)
    try:
        if not storage.exists(key):
            return False
        storage.download_file(key, local_path)
    except Exception:
        # Best-effort: a failed check or download is reported as "not fetched".
        return False
    return True
def ensure_settings_file_synced(file_name: str | Path) -> str:
    """Make sure a settings file exists both locally and in R2.

    Whichever side is missing is filled from the other. File contents are
    never compared: ``"already_synced"`` only means that both copies exist.

    Args:
        file_name: Settings file name, resolved with
            ``get_local_settings_path``. Only its final path component is
            used to build the remote key.

    Returns:
        One of the status strings ``"r2_unavailable"``,
        ``"r2_check_failed"``, ``"downloaded"``, ``"download_failed"``,
        ``"uploaded"``, ``"upload_failed"``, ``"already_synced"`` or
        ``"missing_both"``.
    """
    client = get_r2_storage_or_none()
    if client is None:
        return "r2_unavailable"

    target = get_local_settings_path(file_name)
    remote_key = build_settings_key(target.name)

    try:
        on_remote = client.exists(remote_key)
    except Exception:
        return "r2_check_failed"
    on_disk = target.exists()

    # Only the remote copy exists: pull it down.
    if on_remote and not on_disk:
        try:
            client.download_file(remote_key, target)
        except Exception:
            return "download_failed"
        return "downloaded"

    # Only the local copy exists: push it up.
    if on_disk and not on_remote:
        try:
            client.upload_file(target, remote_key, content_type="application/json")
        except Exception:
            return "upload_failed"
        return "uploaded"

    # Remaining cases: both copies exist, or neither does.
    return "already_synced" if on_disk else "missing_both"
def write_json_file(file_name: str | Path, payload: dict) -> Path:
    """Serialise *payload* as JSON into the local settings file *file_name*.

    The JSON is written as UTF-8 with a 4-space indent and non-ASCII
    characters left unescaped.

    Returns:
        The path of the file that was written.
    """
    destination = get_local_settings_path(file_name)
    serialized = json.dumps(payload, ensure_ascii=False, indent=4)
    destination.write_text(serialized, encoding="utf-8")
    return destination