shadowbrain / scripts /tools /cloud_tool.py
taemin1980's picture
🔱 Imperial Deployment: Shadow Brain Core ignition
e2e734e verified
Raw
History Blame Contribute Delete
43.4 kB
import os
import sys
import subprocess
import json
import re
import datetime
try:
from google import genai
from google.genai import types
except ImportError:
genai = None
types = None
from rclone_installer import RCLONE_EXE, check_and_install_rclone
import quota_manager
from quota_manager import ImperialQuotaManager, QuotaExceededError
# Remote Names
# Remote prefixes used to build "<remote>:<path>" targets that are handed to
# rclone by list_files / upload_file / download_file.
GD_REMOTE = "gdrive"
R2_REMOTE = "r2"
# Folders
# GD_FOLDER is the path appended to the Google Drive remote; R2_BUCKET is the
# path component used for the R2 remote in upload_file / download_file.
GD_FOLDER = "Guardians_Workspace/Aizen_Forge"
R2_BUCKET = "cache" # Default bucket name for R2
# ====== R2 Multi-Account Registry (Imperial Core) ======
# Maps an account key ("live", "vault") to its R2 settings.
# Values that start with "Imperial:" are credential *names*, not secrets; they
# are resolved at call time through _get_bw_credential (cache -> environment
# variable -> Bitwarden CLI).
# The "lifecycle_*" entries are tried first by _get_expiration_rules, which
# falls back to the regular access/secret keys when they do not resolve.
R2_ACCOUNTS = {
    "live": {
        "id": "live",
        "label": "🟒 Live (TaeminGames)",
        "account_id": "Imperial:R2_LIVE_ACCOUNT",
        "bucket": "cache",
        "public_url": "https://pub-9319df3ac5d34263a673eaa4ff3621d3.r2.dev",
        "access_key_id": "Imperial:R2_LIVE_ACCESS_KEY_ID",
        "secret_access_key": "Imperial:R2_LIVE_SECRET_ACCESS_KEY",
        "lifecycle_access_key_id": "Imperial:R2_LIVE_LIFECYCLE_ACCESS_KEY_ID",
        "lifecycle_secret_access_key": "Imperial:R2_LIVE_LIFECYCLE_SECRET_ACCESS_KEY"
    },
    "vault": {
        "id": "vault",
        "label": "🟑 Vault (Jarvis)",
        "account_id": "Imperial:R2_VAULT_ACCOUNT",
        "bucket": "cache",
        "public_url": "https://pub-33ef6fd6e25b486c9c703182b9782a13.r2.dev",
        "access_key_id": "Imperial:R2_VAULT_ACCESS_KEY_ID",
        "secret_access_key": "Imperial:R2_VAULT_SECRET_ACCESS_KEY",
        # NOTE(review): "VALUT" (not "VAULT") in the two names below looks like
        # a typo. It must match the stored credential / env var name exactly,
        # so confirm against the secret store before renaming.
        "lifecycle_access_key_id": "Imperial:R2_VALUT_LIFECYCLE_ACCESS_KEY_ID",
        "lifecycle_secret_access_key": "Imperial:R2_VALUT_LIFECYCLE_SECRET_ACCESS_KEY"
    }
}
# Threshold for large files (100MB)
# upload_file auto-selects R2 when the file is strictly larger than this and
# no storage was requested; otherwise it selects Google Drive.
LARGE_FILE_THRESHOLD = 100 * 1024 * 1024
# Local R2 Root
# "R2" directory located one level above the directory containing this file;
# sync_r2_with_local creates one sub-folder per account underneath it.
LOCAL_R2_ROOT = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "R2")
def get_account_id(public_url):
    """Return the identifier embedded in an R2 public URL.

    The identifier is the run of lowercase letters and digits that follows
    ``pub-`` (as in ``https://pub-<id>.r2.dev``).

    Args:
        public_url: Public R2 URL, or an empty/None value.

    Returns:
        The captured identifier, or ``"default"`` when ``public_url`` is
        falsy or contains no ``pub-<id>`` segment.
    """
    found = re.search(r"pub-([a-z0-9]+)", public_url) if public_url else None
    return found.group(1) if found else "default"
# [Imperial Aegis Cache] In-memory cache of resolved Bitwarden (BW) credentials.
# - Until the process restarts, a key that was already resolved is returned
#   immediately without invoking the BW CLI.
# - _prewarm_credentials() preloads the R2 account keys at startup.
_BW_CREDENTIAL_CACHE: dict = {}
# [Imperial R2 List Cache] Cache of R2 file-listing responses (TTL: 30 seconds).
# - A repeated request for the same listing is answered without an API call.
import time as _time
_R2_LIST_CACHE: dict = {} # key: (account_id, bucket, remote_path) as built in list_r2_files -> {'files': [...], 'ts': float}
_R2_LIST_TTL = 30 # seconds
# [Imperial Cloud Guard] Determine if running in cloud
# RENDER == 'true' marks Render; a set K_SERVICE marks Cloud Run.
IS_RENDER = os.environ.get('RENDER') == 'true'
IS_CLOUD_RUN = os.environ.get('K_SERVICE') is not None
IS_CLOUD = IS_RENDER or IS_CLOUD_RUN
def _get_bw_credential(name):
    """Resolve an ``Imperial:``-prefixed credential name to its value.

    Lookup order:
      1. ``name`` is returned unchanged when it is not a string containing
         ``"Imperial:"`` (it is treated as an already-literal value).
      2. The in-memory cache ``_BW_CREDENTIAL_CACHE``.
      3. The environment variable named after the key with the prefix removed.
      4. On non-cloud hosts only, the Bitwarden CLI (``bw get password``),
         trying the name as given plus its ``Imperial:KEY`` and
         ``Imperial: KEY`` spellings.

    Successful lookups are stored in ``_BW_CREDENTIAL_CACHE``.

    Args:
        name: Credential name (e.g. ``"Imperial:R2_LIVE_ACCOUNT"``) or any
            literal value.

    Returns:
        The resolved value, ``name`` itself for non-``Imperial:`` input, or
        ``None`` when the credential cannot be found.
    """
    if not isinstance(name, str) or "Imperial:" not in name:
        return name
    if name in _BW_CREDENTIAL_CACHE:
        return _BW_CREDENTIAL_CACHE[name]

    # Standardize key for Env lookup ("Imperial: KEY" and "Imperial:KEY" both
    # collapse to "KEY" once the prefix is removed and whitespace stripped).
    env_key = name.replace("Imperial:", "").strip()
    env_val = os.environ.get(env_key)
    if env_val and "Imperial:" not in str(env_val):
        _BW_CREDENTIAL_CACHE[name] = env_val
        return env_val

    # Cloud Environment: BW CLI is not available.
    if IS_CLOUD:
        print(f"[πŸ”± Cloud Alert] Missing Environment Variable: {env_key}")
        return None

    # Standardize both ways: 'Imperial: KEY' and 'Imperial:KEY'
    name_no_space = name.replace("Imperial: ", "Imperial:").strip()
    name_with_space = name_no_space.replace("Imperial:", "Imperial: ").strip()
    # dict.fromkeys de-duplicates while keeping the original try order.
    unique_candidates = list(dict.fromkeys([name, name_no_space, name_with_space]))

    for cand in unique_candidates:
        try:
            # NOTE(security): the command is a shell string. shell=True is
            # kept on purpose (the CLI is resolved through the shell), so
            # `cand` must only ever come from trusted configuration.
            result = subprocess.run(
                f'bw get password "{cand}" --raw',
                capture_output=True, text=True, check=True, shell=True, timeout=30
            )
        except (subprocess.SubprocessError, OSError, ValueError):
            # Non-zero exit, timeout, missing shell/CLI or undecodable output:
            # try the next spelling. KeyboardInterrupt/SystemExit propagate.
            continue
        val = result.stdout.strip()
        if val:
            for key in unique_candidates:
                _BW_CREDENTIAL_CACHE[key] = val
            return val

    print(f"[Aegis Error] Failed to fetch {name} from BW/Env. (Key: {env_key})")
    return None
def clear_bw_credential_cache():
    """Manually empty the credential cache and the R2 listing/stats caches.

    Call this after a Bitwarden password has been changed so that stale
    values are no longer served from memory.

    Returns:
        None. A summary line with the number of dropped credentials is printed.
    """
    count = len(_BW_CREDENTIAL_CACHE)
    _BW_CREDENTIAL_CACHE.clear()
    _R2_LIST_CACHE.clear()
    # _R2_STATS_CACHE is not guaranteed to exist in this module; looking it up
    # defensively avoids a NameError when it has never been defined.
    stats_cache = globals().get("_R2_STATS_CACHE")
    if stats_cache is not None:
        stats_cache.clear()
    print(f"[Aegis Cache] {count}개 자격증λͺ… + R2 λͺ©λ‘/μƒνƒœ μΊμ‹œ μ „λ©΄ μ •ν™” μ™„λ£Œ.")
def clear_r2_list_cache(account_id=None, bucket=None, remote_path=None):
    """Drop the cached R2 file listings (and stats cache, when present).

    The arguments are accepted for API compatibility but are not used: to
    avoid bugs caused by mismatched path formats in partial invalidation, the
    whole cache is purged on every call (e.g. after an upload or delete).

    Args:
        account_id: Unused.
        bucket: Unused.
        remote_path: Unused.

    Returns:
        None.
    """
    _R2_LIST_CACHE.clear()
    # _R2_STATS_CACHE is not guaranteed to exist in this module; looking it up
    # defensively avoids a NameError when it has never been defined.
    stats_cache = globals().get("_R2_STATS_CACHE")
    if stats_cache is not None:
        stats_cache.clear()
    return
def _prewarm_credentials():
    """Preload the credentials of every registered R2 account.

    Collects the ``access_key_id``, ``secret_access_key`` and ``account_id``
    entries of each account in ``R2_ACCOUNTS`` that are ``Imperial:`` names
    and resolves those not yet cached through ``_get_bw_credential``.

    Does nothing in cloud environments: there the values come from
    environment variables anyway, so pre-warming would be unnecessary or
    would only produce error messages.
    """
    if IS_CLOUD:
        return

    credential_fields = ("access_key_id", "secret_access_key", "account_id")
    pending = set()
    for account in R2_ACCOUNTS.values():
        for field in credential_fields:
            reference = account.get(field, "")
            if reference and "Imperial:" in str(reference):
                pending.add(reference)

    loaded = 0
    for reference in pending:
        if reference in _BW_CREDENTIAL_CACHE:
            continue
        resolved = _get_bw_credential(reference)
        if resolved and "Imperial:" not in str(resolved):
            loaded += 1

    if loaded > 0:
        print(f"[Aegis Pre-warm] R2 자격증λͺ… {loaded}/{len(pending)}개 μ„ μ œ λ‘œλ“œ μ™„λ£Œ.")
# πŸ”₯ μ„œλ²„/λͺ¨λ“ˆ 기동 μ‹œ μžλ™ Pre-warm (둜컬 μ „μš©)
def _prewarm_async():
    """Start ``_prewarm_credentials`` on a daemon thread (non-cloud hosts only)."""
    if not IS_CLOUD:
        import threading
        worker = threading.Thread(
            target=_prewarm_credentials,
            name="aegis-prewarm",
            daemon=True,
        )
        worker.start()


# Runs at import time so the credential cache starts filling in the background.
_prewarm_async()
def get_rclone_args(account_config):
    """Build the rclone S3 flags for one R2 account.

    Passing the credentials as flags avoids any dependency on an rclone
    config file.

    Args:
        account_config: Account entry providing ``access_key_id``,
            ``secret_access_key`` and ``account_id`` (each resolved through
            ``_get_bw_credential``).

    Returns:
        A flat list of flag/value pairs: provider, access key id, secret
        access key and the account's ``r2.cloudflarestorage.com`` endpoint.
    """
    resolved = {
        field: _get_bw_credential(account_config.get(field))
        for field in ("access_key_id", "secret_access_key", "account_id")
    }
    endpoint = f"https://{resolved['account_id']}.r2.cloudflarestorage.com"
    return [
        "--s3-provider", "Cloudflare",
        "--s3-access-key-id", resolved["access_key_id"],
        "--s3-secret-access-key", resolved["secret_access_key"],
        "--s3-endpoint", endpoint,
    ]
def run_rclone(args, account_config=None):
    """Run rclone and report the outcome as a standard result dict.

    Args:
        args: List of rclone arguments (sub-command first).
        account_config: Optional R2 account entry; when given, the flags from
            ``get_rclone_args`` are inserted right after the executable.

    Returns:
        ``{"success": bool, "output": stdout-or-None, "error": message-or-None}``.
        A missing executable, a non-zero exit status and any other exception
        raised while running the process are all reported with
        ``success=False``.
    """
    if not os.path.exists(RCLONE_EXE):
        return {"success": False, "output": None, "error": "rclone not installed"}

    account_flags = get_rclone_args(account_config) if account_config else []
    command = [RCLONE_EXE, *account_flags] + args

    try:
        completed = subprocess.run(
            command, capture_output=True, text=True, check=True, encoding='utf-8'
        )
    except subprocess.CalledProcessError as failure:
        print(f"[Cloud Error] rclone failed: {failure.stderr}")
        return {"success": False, "output": None, "error": failure.stderr}
    except Exception as unexpected:
        print(f"[Cloud Error] Unexpected error: {unexpected}")
        return {"success": False, "output": None, "error": str(unexpected)}
    return {"success": True, "output": completed.stdout, "error": None}
# Cache of bucket lifecycle expiration rules, read and written by
# _get_expiration_rules.
_R2_LIFECYCLE_CACHE: dict = {} # key: (account_id, bucket) -> {'rules': [...], 'ts': float}
_R2_LIFECYCLE_TTL = 60 # seconds
def _get_expiration_rules(account_config, bucket_name, account_id):
    """Return the bucket's enabled lifecycle rules that expire objects by days.

    Results (including the empty list produced by a failure) are cached per
    ``(account_id, bucket_name)`` for ``_R2_LIFECYCLE_TTL`` seconds.

    The lifecycle credentials are tried first, falling back to the regular
    access/secret keys. If credentials are missing or the lookup fails, an
    empty list is returned quietly so that file listing itself never fails
    because of this.

    Args:
        account_config: Account entry holding the credential names.
        bucket_name: Bucket whose lifecycle configuration is read.
        account_id: Resolved account id used for the endpoint and cache key.

    Returns:
        List of ``{"prefix": str, "days": ...}`` dicts.
    """
    cache_key = (account_id, bucket_name)
    entry = _R2_LIFECYCLE_CACHE.get(cache_key)
    if entry and (_time.time() - entry['ts']) < _R2_LIFECYCLE_TTL:
        return entry['rules']

    rules = []
    try:
        import boto3
        from botocore.config import Config

        lc_access = (
            _get_bw_credential(account_config.get("lifecycle_access_key_id"))
            or _get_bw_credential(account_config.get("access_key_id"))
        )
        lc_secret = (
            _get_bw_credential(account_config.get("lifecycle_secret_access_key"))
            or _get_bw_credential(account_config.get("secret_access_key"))
        )
        admin_client = boto3.client(
            service_name='s3',
            endpoint_url=f'https://{account_id}.r2.cloudflarestorage.com',
            aws_access_key_id=lc_access,
            aws_secret_access_key=lc_secret,
            region_name='auto',
            config=Config(signature_version='s3v4')
        )
        response = admin_client.get_bucket_lifecycle_configuration(Bucket=bucket_name)
        for rule in response.get("Rules", []) or []:
            expiration = rule.get("Expiration")
            if not expiration or "Days" not in expiration:
                continue
            # The prefix may live in Filter.Prefix, Filter.And.Prefix or the
            # rule's own Prefix key; try them in that order.
            rule_filter = rule.get("Filter") or {}
            rule_prefix = rule_filter.get("Prefix")
            if rule_prefix is None and isinstance(rule_filter.get("And"), dict):
                rule_prefix = rule_filter["And"].get("Prefix")
            if rule_prefix is None:
                rule_prefix = rule.get("Prefix", "")
            if rule.get("Status") != "Enabled":
                continue
            rules.append({"prefix": rule_prefix or "", "days": expiration["Days"]})
    except Exception as e:
        print(f"[R2 Lifecycle] 만료 κ·œμΉ™ 쑰회 μ‹€νŒ¨(λ¬΄μ‹œ): {e}")
        rules = []

    _R2_LIFECYCLE_CACHE[cache_key] = {'rules': rules, 'ts': _time.time()}
    return rules
def _match_expiration_rule(key, rules):
    """Return the rule whose prefix is the longest one matching ``key``.

    Args:
        key: Object key to test.
        rules: Iterable of dicts with a ``"prefix"`` entry.

    Returns:
        The matching rule with the longest prefix (the earliest one on a
        tie), or ``None`` when no rule's prefix matches.
    """
    matching = [rule for rule in rules if key.startswith(rule["prefix"])]
    # max() keeps the first maximal element, same as a strict ">" scan.
    return max(matching, key=lambda rule: len(rule["prefix"]), default=None)
def _list_r2_files_boto3(account_config, remote_path="", bucket_name="cache"):
    """List one "directory" level of an R2 bucket with boto3 (for browsing).

    Args:
        account_config: Account entry holding the credential names.
        remote_path: Folder to list; leading/trailing slashes are stripped.
            Empty means the bucket root.
        bucket_name: Bucket to list.

    Returns:
        A list of dicts with the keys ``Path``, ``Name``, ``Size``, ``IsDir``,
        ``MimeType``, ``ModTime`` and ``ExpireDays``. Directories come from
        the listing's common prefixes; ``ExpireDays`` is the number of whole
        days left under the best-matching expiration rule (never below 0), or
        ``None`` when no rule matches.
    """
    import boto3
    import datetime as _dt

    access_key = _get_bw_credential(account_config.get("access_key_id"))
    secret_key = _get_bw_credential(account_config.get("secret_access_key"))
    account_id = _get_bw_credential(account_config.get("account_id"))
    from botocore.config import Config

    # [Imperial Debug] Print a masked id to confirm the credential was loaded.
    if account_id and len(account_id) > 8:
        masked_id = f"{account_id[:4]}...{account_id[-4:]}"
    else:
        masked_id = str(account_id)
    print(f"[R2 Debug] Using Account ID: {masked_id}")

    s3 = boto3.client(
        service_name='s3',
        endpoint_url=f'https://{account_id}.r2.cloudflarestorage.com',
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
        region_name='auto',
        config=Config(signature_version='s3v4')
    )
    expiration_rules = _get_expiration_rules(account_config, bucket_name, account_id)

    prefix = (remote_path.strip("/") + "/") if remote_path else ""
    request = {"Bucket": bucket_name, "Delimiter": "/"}
    if prefix:
        request["Prefix"] = prefix

    entries = []
    while True:
        page = s3.list_objects_v2(**request)

        # Directories (common prefixes)
        for common in page.get("CommonPrefixes") or []:
            folder_key = common["Prefix"]
            entries.append({
                "Path": folder_key,
                "Name": folder_key.rstrip("/").split("/")[-1],
                "Size": 0,
                "IsDir": True,
                "MimeType": "inode/directory",
                "ModTime": None,
                "ExpireDays": None,
            })

        # Files
        for obj in page.get("Contents") or []:
            key = obj["Key"]
            if key == prefix:  # skip the folder placeholder itself
                continue
            last_modified = obj["LastModified"]
            mod_time = last_modified.strftime("%Y-%m-%dT%H:%M:%S.000Z")
            expire_days = None
            rule = _match_expiration_rule(key, expiration_rules)
            if rule:
                expire_at = last_modified + _dt.timedelta(days=rule["days"])
                now = _dt.datetime.now(last_modified.tzinfo)
                expire_days = max(0, (expire_at - now).days)
            entries.append({
                "Path": key,
                "Name": key.split("/")[-1],
                "Size": obj["Size"],
                "IsDir": False,
                "MimeType": "application/octet-stream",
                "ModTime": mod_time,
                "ExpireDays": expire_days,
            })

        if not page.get("IsTruncated"):
            break
        request["ContinuationToken"] = page["NextContinuationToken"]
    return entries
def _list_r2_all_files_boto3(account_config):
    """List every object key in the account's bucket with boto3.

    Args:
        account_config: Account entry holding the credential names and an
            optional ``bucket`` (defaults to ``"cache"``).

    Returns:
        List of object keys, excluding keys that end with ``/`` (directory
        placeholders).
    """
    import boto3

    access_key = _get_bw_credential(account_config.get("access_key_id"))
    secret_key = _get_bw_credential(account_config.get("secret_access_key"))
    account_id = _get_bw_credential(account_config.get("account_id"))
    bucket_name = account_config.get("bucket", "cache")
    s3 = boto3.client(
        service_name='s3',
        endpoint_url=f'https://{account_id}.r2.cloudflarestorage.com',
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
    )

    keys = []
    token = None
    while True:
        request = {"Bucket": bucket_name}
        if token:
            request["ContinuationToken"] = token
        page = s3.list_objects_v2(**request)
        keys.extend(
            obj["Key"]
            for obj in page.get("Contents", [])
            if not obj["Key"].endswith('/')  # Skip directory placeholders
        )
        if not page.get("IsTruncated"):
            return keys
        token = page.get("NextContinuationToken")
def _download_from_r2_boto3(account_config, files_to_download, local_base_dir, progress_callback=None):
    """Download the given object keys from R2 into a local directory (boto3).

    Args:
        account_config: Account entry holding the credential names and an
            optional ``bucket`` (defaults to ``"cache"``).
        files_to_download: Object keys to fetch; ``/`` in a key becomes the
            local path separator.
        local_base_dir: Directory under which the keys are materialised.
        progress_callback: Optional ``callback(done, total)`` invoked after
            every file, whether it succeeded or not.

    Returns:
        ``{"success": True, "downloaded": int, "errors": int}``; individual
        download failures are printed and counted, not raised.
    """
    import boto3

    access_key = _get_bw_credential(account_config.get("access_key_id"))
    secret_key = _get_bw_credential(account_config.get("secret_access_key"))
    account_id = _get_bw_credential(account_config.get("account_id"))
    bucket_name = account_config.get("bucket", "cache")
    s3 = boto3.client(
        service_name='s3',
        endpoint_url=f'https://{account_id}.r2.cloudflarestorage.com',
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
    )

    tally = {"downloaded": 0, "errors": 0}
    for position, key in enumerate(files_to_download, start=1):
        target = os.path.join(local_base_dir, key.replace('/', os.sep))
        os.makedirs(os.path.dirname(target), exist_ok=True)
        try:
            s3.download_file(bucket_name, key, target)
        except Exception as e:
            tally["errors"] += 1
            print(f"[R2 Download Error (boto3)] {key}: {e}")
        else:
            tally["downloaded"] += 1
        if progress_callback:
            progress_callback(position, len(files_to_download))
    return {"success": True, "downloaded": tally["downloaded"], "errors": tally["errors"]}
def list_r2_files(account_config, remote_path="", bucket_name="cache"):
    """List files in R2 through boto3, with a short-lived response cache.

    A listing for the same account id, bucket and path is served from
    ``_R2_LIST_CACHE`` for ``_R2_LIST_TTL`` seconds without a new API call.

    Args:
        account_config: Account entry; its ``account_id`` is part of the
            cache key.
        remote_path: Folder to list.
        bucket_name: Bucket to list.

    Returns:
        The list produced by ``_list_r2_files_boto3``.

    Raises:
        RuntimeError: When the boto3 listing raises.
    """
    cache_key = (account_config.get("account_id", "default"), bucket_name, remote_path)
    now = _time.time()
    hit = _R2_LIST_CACHE.get(cache_key)
    still_fresh = bool(hit) and (now - hit['ts']) < _R2_LIST_TTL
    if still_fresh:
        return hit['files']

    # Imperial rule: rclone is no longer used here, boto3 only.
    try:
        listing = _list_r2_files_boto3(account_config, remote_path, bucket_name)
    except Exception as e:
        raise RuntimeError(f"R2 Listing Failed (boto3 fallback): {e}")

    if listing is not None:
        _R2_LIST_CACHE[cache_key] = {'files': listing, 'ts': now}
    return listing
def get_r2_bucket_size(account_config, bucket_name="cache"):
    """Return object count and total size of the ENTIRE bucket (recursive).

    Uses boto3 only (rclone is obsolete here). This function is meant for the
    heartbeat usage display; use ``list_r2_files`` for the browser UI.

    Args:
        account_config: Account entry holding the credential names.
        bucket_name: Bucket to measure.

    Returns:
        ``{"count": int, "bytes": int, "formatted": str}`` where
        ``formatted`` is in MB below 1 GiB and in GB otherwise. Keys ending
        with ``/`` are not counted.

    Raises:
        RuntimeError: On any failure, including missing credentials.
    """
    try:
        import boto3

        access_key = _get_bw_credential(account_config.get("access_key_id"))
        secret_key = _get_bw_credential(account_config.get("secret_access_key"))
        account_id = _get_bw_credential(account_config.get("account_id"))

        labelled = (
            ("ACCESS_KEY_ID", access_key),
            ("SECRET_ACCESS_KEY", secret_key),
            ("ACCOUNT_ID", account_id),
        )
        missing = [label for label, value in labelled if not value]
        if missing:
            raise RuntimeError(f"Missing R2 Credentials in Environment: {', '.join(missing)}")

        from botocore.config import Config
        s3 = boto3.client(
            service_name='s3',
            endpoint_url=f'https://{account_id}.r2.cloudflarestorage.com',
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_key,
            region_name='auto',
            config=Config(signature_version='s3v4')
        )

        sizes = [
            obj["Size"]
            for page in s3.get_paginator("list_objects_v2").paginate(Bucket=bucket_name)
            for obj in page.get("Contents", [])
            if not obj["Key"].endswith("/")  # exclude folder placeholders
        ]
        total_bytes = sum(sizes)
        total_count = len(sizes)

        if total_bytes < 1024**3:
            formatted = f"{total_bytes / (1024*1024):.2f} MB"
        else:
            formatted = f"{total_bytes / (1024**3):.2f} GB"
        return {"count": total_count, "bytes": total_bytes, "formatted": formatted}
    except Exception as e:
        raise RuntimeError(f"get_r2_bucket_size failed: {e}")
def sync_r2_with_local(account_config, direction="to_cloud", bucket_name="cache"):
    """Copy files between the account's local folder and its R2 bucket.

    The local folder is ``LOCAL_R2_ROOT/<id>`` where ``<id>`` is derived from
    the account's ``public_url``; it is created when missing.

    Args:
        account_config: Account entry (``public_url`` plus credential names).
        direction: ``"from_cloud"`` copies bucket -> local. ``"to_cloud"`` and
            any other value copy local -> bucket.
        bucket_name: Bucket to copy to/from.

    Returns:
        The result dict from ``run_rclone``.

    Raises:
        RuntimeError: When rclone reports a failure.
    """
    local_path = os.path.join(LOCAL_R2_ROOT, get_account_id(account_config.get("public_url")))
    os.makedirs(local_path, exist_ok=True)
    remote_path = f":s3:{bucket_name}"

    if direction == "from_cloud":
        source, destination = remote_path, local_path
    else:
        # rclone "sync" can delete files, so every other direction
        # (including the default "to_cloud") uses a plain upload "copy".
        source, destination = local_path, remote_path

    outcome = run_rclone(["copy", source, destination, "--progress"], account_config)
    if not outcome["success"]:
        raise RuntimeError(f"R2 Sync Failed: {outcome['error']}")
    return outcome
def _list_files_boto3(storage="r2", remote_path=""):
    """List files with boto3 when rclone is unavailable (R2 only).

    Args:
        storage: ``"gdrive"`` is rejected; anything else lists R2.
        remote_path: Folder inside the bucket to list.

    Returns:
        An error dict for ``"gdrive"``; otherwise
        ``{"success": True, "files": [...], "storage": storage}`` listed with
        the ``"live"`` account from ``R2_ACCOUNTS``.
    """
    if storage == "gdrive":
        return {"success": False, "error": "boto3 fallback only supports R2. gdrive needs rclone."}

    # R2 browsing uses the "live" account as its default.
    live_account = R2_ACCOUNTS.get("live")
    listing = _list_r2_files_boto3(live_account, remote_path)
    return {"success": True, "files": listing, "storage": storage}
def _upload_file_boto3(local_path, storage="r2", remote_name=None):
    """Upload one file to R2 with boto3 when rclone is unavailable.

    Always targets the ``"live"`` account from ``R2_ACCOUNTS``.

    Args:
        local_path: File to upload.
        storage: ``"gdrive"`` is rejected; anything else uploads to R2.
        remote_name: Object key; defaults to the file's base name.

    Returns:
        A ``{"success", "output", "error"}`` dict, or a
        ``{"success": False, "error": ...}`` dict for ``"gdrive"``.
    """
    import boto3

    if storage == "gdrive":
        return {"success": False, "error": "boto3 fallback only supports R2. gdrive needs rclone."}

    live_account = R2_ACCOUNTS.get("live")
    access_key = _get_bw_credential(live_account.get("access_key_id"))
    secret_key = _get_bw_credential(live_account.get("secret_access_key"))
    account_id = _get_bw_credential(live_account.get("account_id"))
    bucket_name = live_account.get("bucket", "cache")
    s3 = boto3.client(
        service_name='s3',
        endpoint_url=f'https://{account_id}.r2.cloudflarestorage.com',
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
    )

    target_key = remote_name if remote_name else os.path.basename(local_path)
    try:
        s3.upload_file(local_path, bucket_name, target_key)
    except Exception as e:
        return {"success": False, "output": None, "error": str(e)}
    return {"success": True, "output": f"Uploaded {target_key} to R2", "error": None}
def _download_file_boto3(remote_name, local_path, storage="r2"):
    """Download one object from R2 with boto3 when rclone is unavailable.

    Always reads from the ``"live"`` account in ``R2_ACCOUNTS``.

    Args:
        remote_name: Object key to download.
        local_path: Destination file path.
        storage: ``"gdrive"`` is rejected; anything else downloads from R2.

    Returns:
        A ``{"success", "output", "error"}`` dict, or a
        ``{"success": False, "error": ...}`` dict for ``"gdrive"``.
    """
    import boto3

    if storage == "gdrive":
        return {"success": False, "error": "boto3 fallback only supports R2. gdrive needs rclone."}

    live_account = R2_ACCOUNTS.get("live")
    access_key = _get_bw_credential(live_account.get("access_key_id"))
    secret_key = _get_bw_credential(live_account.get("secret_access_key"))
    account_id = _get_bw_credential(live_account.get("account_id"))
    bucket_name = live_account.get("bucket", "cache")
    s3 = boto3.client(
        service_name='s3',
        endpoint_url=f'https://{account_id}.r2.cloudflarestorage.com',
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
    )

    try:
        s3.download_file(bucket_name, remote_name, local_path)
    except Exception as e:
        return {"success": False, "output": None, "error": str(e)}
    return {"success": True, "output": f"Downloaded {remote_name} from R2", "error": None}
def list_files(storage="gdrive"):
    """List the files in the given cloud storage.

    With rclone installed, runs ``lsjson`` (depth 1) on the storage's remote.
    For R2, a failed or unavailable rclone falls back to the boto3 listing.

    Args:
        storage: ``"gdrive"`` or anything else for R2.

    Returns:
        * ``{"success": True, "files": [...], "storage": storage}`` when
          rclone output parses as JSON;
        * ``{"success": True, "raw": <stdout>, "storage": storage}`` when it
          does not;
        * the rclone failure dict for gdrive;
        * otherwise whatever ``_list_files_boto3(storage)`` returns (for
          gdrive without rclone that is an explanatory error dict).
    """
    remote = GD_REMOTE if storage == "gdrive" else R2_REMOTE
    folder = GD_FOLDER if storage == "gdrive" else ""
    remote_path = f"{remote}:{folder}"

    # Stays None when rclone is not installed, so it is never read unbound.
    result = None
    if os.path.exists(RCLONE_EXE):
        result = run_rclone(["lsjson", remote_path, "--max-depth", "1"])
        if result["success"] and isinstance(result["output"], str):
            try:
                files = json.loads(result["output"])
            except ValueError:
                # json.JSONDecodeError is a ValueError: hand back the raw text
                # instead of hiding unrelated errors behind a bare except.
                return {"success": True, "raw": result["output"], "storage": storage}
            return {"success": True, "files": files, "storage": storage}

    if result is not None and storage == "gdrive":
        return result

    # Fallback to boto3 for R2
    return _list_files_boto3(storage)
def upload_file(local_path, storage=None, remote_name=None):
    """Upload a local file to cloud storage.

    Args:
        local_path: File to upload.
        storage: ``"gdrive"``, ``"r2"`` or ``None``. With ``None`` the target
            is chosen by size: R2 for files larger than
            ``LARGE_FILE_THRESHOLD``, Google Drive otherwise.
        remote_name: Remote file name; defaults to the local base name.

    Returns:
        An error dict when ``local_path`` does not exist; otherwise the
        result of the rclone ``copyto`` call, or of the boto3 fallback when
        rclone is not installed.
    """
    if not os.path.exists(local_path):
        return {"success": False, "error": f"Local file not found: {local_path}"}

    file_size = os.path.getsize(local_path)
    if storage is None:
        storage = "r2" if file_size > LARGE_FILE_THRESHOLD else "gdrive"
        print(f"πŸ’‘ File size: {file_size/1024/1024:.1f}MB. Auto-selecting {storage}...")

    if storage == "gdrive":
        remote, folder = GD_REMOTE, GD_FOLDER
    else:
        remote, folder = R2_REMOTE, R2_BUCKET
    target_name = remote_name or os.path.basename(local_path)
    print(f"⏳ Uploading to {storage}: {target_name}...")

    if not os.path.exists(RCLONE_EXE):
        return _upload_file_boto3(local_path, storage, target_name)
    destination = f"{remote}:{folder}/{target_name}"
    return run_rclone(["copyto", local_path, destination, "--progress"])
def generate_image_google(prompt, model_id="imagen-4.0-generate-001", aspect_ratio="1:1", output_file="generated_image.png"):
    """Generate one PNG image with the Google GenAI SDK (``generate_images``).

    Args:
        prompt: Text prompt.
        model_id: Model name; ``models/`` is prepended when missing.
        aspect_ratio: Aspect ratio passed to the image config.
        output_file: Path the image bytes are written to.

    Returns:
        ``{"success": True, "output_file": ..., "model": ...}`` on success,
        otherwise ``{"success": False, "error": ...}`` (SDK missing, API key
        unavailable, quota exceeded, no image returned, or any exception).
    """
    if genai is None:
        return {"success": False, "error": "google-genai SDK 2.0+ is not installed."}

    api_key = _get_bw_credential("Imperial: GEMINI_API_KEY")
    if not api_key or "Imperial:" in str(api_key):
        return {"success": False, "error": "Failed to fetch GEMINI_API_KEY from Bitwarden."}

    try:
        client = genai.Client(api_key=api_key)

        # Imperial Quota Guard: check the quota before calling the API.
        try:
            ImperialQuotaManager.validate_quota(model_id)
        except QuotaExceededError as quota_error:
            return {"success": False, "error": str(quota_error)}

        if model_id.startswith("models/"):
            full_model_id = model_id
        else:
            full_model_id = f"models/{model_id}"

        image_config = types.GenerateImagesConfig(
            number_of_images=1,
            aspect_ratio=aspect_ratio,
            output_mime_type="image/png"
        )
        response = client.models.generate_images(
            model=full_model_id,
            prompt=prompt,
            config=image_config
        )

        if response.generated_images:
            # Track usage in DB
            ImperialQuotaManager.track_usage(model_id)
            first = response.generated_images[0]

            # The bytes may sit on a nested ``image`` object or on the item itself.
            image_data = None
            if hasattr(first, 'image') and hasattr(first.image, 'image_bytes'):
                image_data = first.image.image_bytes
            elif hasattr(first, 'image_bytes'):
                image_data = first.image_bytes

            if image_data:
                with open(output_file, "wb") as sink:
                    sink.write(image_data)
                return {
                    "success": True,
                    "output_file": output_file,
                    "model": full_model_id
                }
        return {"success": False, "error": "No images were generated in response."}
    except Exception as e:
        return {"success": False, "error": str(e)}
def generate_image_nano_banana(prompt, input_image_path=None, model_id="gemini-2.5-flash-image", output_file="nano_banana_output.png"):
    """Generate or edit an image via the Gemini ``generate_content`` flow.

    Supports: gemini-2.5-flash-image, gemini-3.1-flash-image-preview,
    gemini-3-pro-image-preview.

    Args:
        prompt: Text instruction.
        input_image_path: Optional image to edit; ignored when the path does
            not exist. Its MIME type is guessed from the file name and falls
            back to ``image/png``.
        model_id: Model name passed to the SDK.
        output_file: Path the first returned image is written to.

    Returns:
        ``{"success": True, "output_file": ..., "model": ...}`` on success,
        otherwise ``{"success": False, "error": ...}``.
    """
    if genai is None:
        return {"success": False, "error": "google-genai SDK 2.0+ is not installed."}

    api_key = _get_bw_credential("Imperial: GEMINI_API_KEY")
    if not api_key:
        return {"success": False, "error": "Failed to fetch GEMINI_API_KEY."}

    try:
        client = genai.Client(api_key=api_key)

        parts = [{"text": prompt}]
        if input_image_path and os.path.exists(input_image_path):
            import mimetypes

            with open(input_image_path, "rb") as f:
                image_bytes = f.read()
            # Label the bytes with their real type (e.g. image/jpeg) instead
            # of always claiming PNG; unknown types keep the old PNG default.
            guessed_type, _ = mimetypes.guess_type(input_image_path)
            if guessed_type and guessed_type.startswith("image/"):
                mime_type = guessed_type
            else:
                mime_type = "image/png"
            parts.append(types.Part.from_bytes(data=image_bytes, mime_type=mime_type))

        contents = [types.Content(role="user", parts=parts)]

        # Nano Banana (Gemini 2.5 Flash Image) specific call
        response = client.models.generate_content(
            model=model_id,
            contents=contents,
            config=types.GenerateContentConfig(
                response_modalities=['IMAGE']
            )
        )

        # Extract image from multimodal response. An empty candidate list or
        # missing content is reported as "no image" rather than as an opaque
        # IndexError/AttributeError message.
        candidates = response.candidates or []
        content = candidates[0].content if candidates else None
        for part in (getattr(content, "parts", None) or []):
            if part.inline_data:
                with open(output_file, "wb") as f:
                    f.write(part.inline_data.data)
                return {"success": True, "output_file": output_file, "model": model_id}
        return {"success": False, "error": "No image data found in Nano Banana response."}
    except Exception as e:
        return {"success": False, "error": str(e)}
def download_file(remote_name, local_path, storage="gdrive"):
    """Download one file from cloud storage to a local path.

    Args:
        remote_name: File name inside the storage's folder/bucket.
        local_path: Destination path.
        storage: ``"gdrive"`` or anything else for R2.

    Returns:
        The rclone ``copyto`` result, or the boto3 fallback result when
        rclone is not installed.
    """
    if storage == "gdrive":
        remote, folder = GD_REMOTE, GD_FOLDER
    else:
        remote, folder = R2_REMOTE, R2_BUCKET
    source = f"{remote}:{folder}/{remote_name}"

    if not os.path.exists(RCLONE_EXE):
        return _download_file_boto3(remote_name, local_path, storage)
    return run_rclone(["copyto", source, local_path, "--progress"])
def get_db_connection(db_url):
    """Open a psycopg2 connection from a URL or an ``Imperial:`` credential name.

    ``postgresql://`` / ``postgres://`` URLs are parsed and passed to
    psycopg2 as keyword arguments, so query-string parameters are dropped
    (this is how non-standard parameters are sanitised). Percent-encoded
    user, password and database name are decoded. Any other string is handed
    to ``psycopg2.connect`` unchanged.

    Args:
        db_url: Connection URL/DSN, optionally wrapped in quotes, or a
            credential name containing ``"Imperial:"``.

    Returns:
        An open psycopg2 connection.

    Raises:
        RuntimeError: On any failure (driver missing, URL empty or
            unresolved, connection error); the original exception is chained.
    """
    try:
        import psycopg2
        from urllib.parse import urlparse, unquote

        # [Imperial Aegis] A Bitwarden key (Imperial:) is resolved to its value.
        if db_url and "Imperial:" in str(db_url):
            db_url = _get_bw_credential(str(db_url))
        if not db_url:
            # Clear message instead of "'NoneType' object has no attribute 'strip'".
            raise ValueError("database URL is empty or could not be resolved")

        # 1. Strip extra quotes
        db_url = db_url.strip('"').strip("'")

        # 2. Parse URI and use keyword arguments (Safer for psycopg2)
        if db_url.startswith(('postgresql://', 'postgres://')):
            parsed = urlparse(db_url)
            return psycopg2.connect(
                dbname=unquote(parsed.path.lstrip('/')),
                user=unquote(parsed.username) if parsed.username else None,
                password=unquote(parsed.password) if parsed.password else None,
                host=parsed.hostname,
                port=parsed.port
            )
        return psycopg2.connect(db_url)
    except Exception as e:
        raise RuntimeError(f"Database Connection Error: {str(e)}") from e
def analyze_r2_db_gap(db_url, account_id, actual_file_list):
    """Compare the DB index with an actual file list, without deleting anything.

    Args:
        db_url: Database URL or credential name for ``get_db_connection``.
        account_id: Account whose rows of ``taemingames.r2_storage_index``
            are compared.
        actual_file_list: File paths that really exist.

    Returns:
        On success ``{"success": True, "orphans_in_db": n, "missing_in_r2": m,
        "final_r2_count": k}`` where ``orphans_in_db`` counts DB paths absent
        from the list, ``missing_in_r2`` counts listed paths absent from the
        DB, and ``final_r2_count`` is ``len(actual_file_list)``. On failure
        ``{"success": False, "error": ...}``.
    """
    conn = None
    try:
        conn = get_db_connection(db_url)
        cursor = conn.cursor()
        cursor.execute(
            "SELECT file_path FROM taemingames.r2_storage_index WHERE account_id = %s",
            (account_id,),
        )
        indexed = {row[0] for row in cursor.fetchall()}
        actual = set(actual_file_list)
        # 1. Rows in the DB without a real file (candidates for deletion)
        orphan_count = len(indexed - actual)
        # 2. Real files without a DB row (candidates for registration)
        unregistered_count = len(actual - indexed)
        cursor.close()
        return {
            "success": True,
            "orphans_in_db": orphan_count,
            "missing_in_r2": unregistered_count,
            "final_r2_count": len(actual_file_list)
        }
    except Exception as e:
        return {"success": False, "error": str(e)}
    finally:
        if conn:
            conn.close()
def list_r2_all_files(account_id):
    """List ALL object keys of a registered R2 account (recursive, boto3 only).

    Args:
        account_id: Key into ``R2_ACCOUNTS`` (e.g. ``"live"``).

    Returns:
        ``{"success": True, "files": [...], "count": n}`` with ``//`` in keys
        collapsed to ``/``, or ``{"success": False, "error": ...}`` when the
        account is unknown or the scan fails.
    """
    config = R2_ACCOUNTS.get(account_id)
    if not config:
        return {"success": False, "error": f"Account {account_id} not found"}

    try:
        raw_keys = _list_r2_all_files_boto3(config)
        # Normalise double-slash paths such as images/portraits//file.png.
        normalised = [key.replace('//', '/') for key in raw_keys]
    except Exception as e:
        return {"success": False, "error": f"R2 Cloud Scan Failed (boto3): {str(e)}"}
    return {"success": True, "files": normalised, "count": len(normalised)}
def analyze_r2_sync_full(db_url, account_id, local_base_dir):
    """Run the full 3-way comparison: R2 cloud vs local cache vs DB.

    The cloud listing is treated as the source of truth.

    Args:
        db_url: Database URL or credential name for ``get_db_connection``.
        account_id: Registered R2 account key.
        local_base_dir: Root of the local cache. Files whose name starts
            with ``.`` and files under a path containing ``node_modules``
            are ignored.

    Returns:
        A dict with ``success`` and the counters ``cloud_count``,
        ``local_count``, ``db_count``, ``to_download``, ``orphans_in_db``,
        ``missing_in_db``, ``local_only`` and ``final_r2_count``. When the
        cloud listing or the DB query fails, ``success`` stays ``False`` and
        an ``error`` message is added.
    """
    result = {
        "success": False,
        "cloud_count": 0,
        "local_count": 0,
        "db_count": 0,
        "to_download": 0,
        "orphans_in_db": 0,
        "missing_in_db": 0,
        "local_only": 0,
        "final_r2_count": 0,
    }

    # 1. R2 Cloud listing
    cloud_result = list_r2_all_files(account_id)
    if not cloud_result.get("success"):
        result["error"] = f"R2 ν΄λΌμš°λ“œ 쑰회 μ‹€νŒ¨: {cloud_result.get('error')}"
        return result
    cloud_set = set(cloud_result["files"])
    result["cloud_count"] = len(cloud_set)

    # 2. Local cache scan (paths relative to the base dir, "/"-separated)
    local_set = set()
    if os.path.exists(local_base_dir):
        for folder, _subdirs, names in os.walk(local_base_dir):
            if 'node_modules' in folder:
                continue
            local_set.update(
                os.path.relpath(os.path.join(folder, name), local_base_dir).replace('\\', '/')
                for name in names
                if not name.startswith('.')
            )
    result["local_count"] = len(local_set)

    # 3. DB records
    conn = None
    try:
        conn = get_db_connection(db_url)
        cursor = conn.cursor()
        cursor.execute(
            "SELECT file_path FROM taemingames.r2_storage_index WHERE account_id = %s",
            (account_id,),
        )
        db_set = {row[0] for row in cursor.fetchall()}
        cursor.close()
    except Exception as e:
        result["error"] = f"DB 쑰회 μ‹€νŒ¨: {str(e)}"
        return result
    finally:
        if conn:
            conn.close()
    result["db_count"] = len(db_set)

    # 4. Compute differences (cloud is the source of truth)
    result.update({
        "to_download": len(cloud_set - local_set),    # in cloud, not local
        "orphans_in_db": len(db_set - cloud_set),     # in DB, not in cloud
        "missing_in_db": len(cloud_set - db_set),     # in cloud, not in DB
        "local_only": len(local_set - cloud_set),     # local only
        "final_r2_count": len(cloud_set),             # file count after sync
        "success": True,
    })
    return result
def cleanup_local_orphans(local_base_dir, cloud_file_set):
    """Delete local files that do not exist in the R2 cloud listing.

    Files whose name starts with ``.`` and files under a path containing
    ``node_modules`` are never touched. Directories left empty are removed
    afterwards (best effort).

    Args:
        local_base_dir: Root of the local cache.
        cloud_file_set: Collection of ``/``-separated relative paths that
            exist in the cloud.

    Returns:
        ``{"success": True, "removed": n, "errors": m}``; a missing base
        directory yields zero for both counters.
    """
    if not os.path.exists(local_base_dir):
        return {"success": True, "removed": 0, "errors": 0}

    removed = 0
    errors = 0
    for folder, _subdirs, names in os.walk(local_base_dir):
        if 'node_modules' in folder:
            continue
        for name in names:
            if name.startswith('.'):
                continue
            full_path = os.path.join(folder, name)
            rel_path = os.path.relpath(full_path, local_base_dir).replace('\\', '/')
            if rel_path in cloud_file_set:
                continue
            try:
                os.remove(full_path)
            except Exception as e:
                print(f"[Local Cleanup Error] {rel_path}: {e}")
                errors += 1
            else:
                removed += 1

    # Remove empty directories left behind (deepest first)
    for folder, subdirs, _names in os.walk(local_base_dir, topdown=False):
        for subdir in subdirs:
            candidate = os.path.join(folder, subdir)
            try:
                if not os.listdir(candidate):
                    os.rmdir(candidate)
            except Exception:
                pass
    return {"success": True, "removed": removed, "errors": errors}
def download_from_r2(account_id, file_paths, local_base_dir, progress_cb=None):
    """Download the given files from R2 to a local directory using boto3.

    [Imperial Boto3 Direct Download] Works without rclone; intended for the
    synchronisation flow that keeps local cache and cloud consistent.

    Args:
        account_id: Key into ``R2_ACCOUNTS``.
        file_paths: Object keys to download; ``/`` becomes the local path
            separator.
        local_base_dir: Directory under which the files are stored.
        progress_cb: Optional ``callback(done, total)``; exceptions raised by
            it are ignored.

    Returns:
        ``{"success": True, "downloaded": n, "errors": m, "total": t}``, or
        ``{"success": False, "error": ...}`` when the account is unknown, the
        credentials cannot be resolved, or setup fails. Per-file failures are
        printed and counted, not raised.
    """
    config = R2_ACCOUNTS.get(account_id)
    if not config:
        return {"success": False, "error": f"Account '{account_id}' not found in registry."}

    try:
        import boto3

        bucket = config.get("bucket", "cache")

        # The registry stores credential *names* ("Imperial:..."); all three,
        # including the account id used in the endpoint host, must be resolved.
        access_key = _get_bw_credential(config.get("access_key_id", ""))
        secret_key = _get_bw_credential(config.get("secret_access_key", ""))
        resolved_account_id = _get_bw_credential(config.get("account_id", account_id))

        # A missing value, or one still carrying the 'Imperial:' prefix, means
        # the credential could not be loaded.
        for credential in (access_key, secret_key, resolved_account_id):
            if not credential or "Imperial:" in str(credential):
                return {"success": False, "error": "R2 자격증λͺ…을 λ‘œλ“œν•  수 μ—†μŠ΅λ‹ˆλ‹€ (Bitwarden 확인 ν•„μš”)."}

        endpoint = f"https://{resolved_account_id}.r2.cloudflarestorage.com"
        s3 = boto3.client(
            "s3",
            endpoint_url=endpoint,
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_key,
            region_name="auto",
        )

        downloaded = 0
        errors = 0
        total = len(file_paths)
        for i, rel_path in enumerate(file_paths):
            dest_path = os.path.join(local_base_dir, rel_path.replace("/", os.sep))
            os.makedirs(os.path.dirname(dest_path), exist_ok=True)
            try:
                s3.download_file(bucket, rel_path, dest_path)
                downloaded += 1
                print(f"[R2 Download] βœ… ({i+1}/{total}) {rel_path}")
            except Exception as e:
                errors += 1
                print(f"[R2 Download] ❌ ({i+1}/{total}) {rel_path}: {e}")
            if progress_cb:
                try:
                    progress_cb(i + 1, total)
                except Exception:
                    # Progress reporting is best effort; never abort the sync.
                    pass
        return {"success": True, "downloaded": downloaded, "errors": errors, "total": total}
    except Exception as e:
        import traceback
        traceback.print_exc()
        return {"success": False, "error": str(e)}
def cleanup_r2_db(db_url, account_id, actual_file_list):
    """Delete DB index rows whose file is not in the actual file list.

    Args:
        db_url: Database URL or credential name for ``get_db_connection``.
        account_id: Account whose rows are cleaned.
        actual_file_list: Paths to keep. An empty list aborts the operation
            so that the whole index is never wiped by accident.

    Returns:
        ``{"success": True, "count": deleted_rows}`` after commit, or
        ``{"success": False, "error": ...}``.
    """
    if not actual_file_list:
        return {"success": False, "error": "Actual file list is empty. Aborting to prevent total wipe."}

    conn = None
    try:
        conn = get_db_connection(db_url)
        cursor = conn.cursor()
        # Values are bound by the driver; the kept paths are passed as a tuple.
        cursor.execute(
            "DELETE FROM taemingames.r2_storage_index WHERE account_id = %s AND file_path NOT IN %s",
            (account_id, tuple(actual_file_list)),
        )
        deleted = cursor.rowcount
        conn.commit()
        cursor.close()
    except Exception as e:
        return {"success": False, "error": str(e)}
    else:
        return {"success": True, "count": deleted}
    finally:
        if conn:
            conn.close()
def cleanup_r2_physical(account_id, actual_file_list):
    """Delete objects from R2 that are NOT in the given master file list.

    This reduces physical storage usage by removing "ghost" files.

    Args:
        account_id: Key into ``R2_ACCOUNTS``.
        actual_file_list: Paths that must be kept. An empty list aborts the
            operation so the bucket is never wiped by accident.

    Returns:
        ``{"success": True, "count": deleted, "total_scanned": n}``, a
        ``{"success": True, "count": 0, "message": ...}`` dict when nothing
        is orphaned, or ``{"success": False, "error": ...}``.
    """
    config = R2_ACCOUNTS.get(account_id)
    if not config:
        return {"success": False, "error": f"Account {account_id} not found in registry."}
    if not actual_file_list:
        # Same safety rule as cleanup_r2_db: an empty keep-list would mark
        # every object in the bucket as an orphan.
        return {"success": False, "error": "Actual file list is empty. Aborting to prevent total wipe."}

    bucket = config.get("bucket")
    remote = f":s3:{bucket}"

    # 1. Get current cloud file list. list_files() takes only a storage name
    #    and lists a single level, so the recursive per-account lister is used.
    try:
        cloud_files = _list_r2_all_files_boto3(config)
    except Exception as e:
        return {"success": False, "error": f"R2 Cloud Scan Failed (boto3): {str(e)}"}

    actual_set = set(actual_file_list)

    # 2. Identify cloud orphans (files in cloud but not in our master list).
    #    A key is also kept when its '//'-collapsed form is listed, because
    #    list_r2_all_files reports keys in that normalised form.
    cloud_orphans = [
        key for key in cloud_files
        if key not in actual_set and key.replace('//', '/') not in actual_set
    ]
    if not cloud_orphans:
        return {"success": True, "count": 0, "message": "No physical orphans found in R2."}

    print(f"🧹 [Imperial Cleanup] Deleting {len(cloud_orphans)} ghost files from R2:{bucket}...")

    # 3. Execute deletion one by one; the account flags are resolved once.
    account_flags = get_rclone_args(config)
    count = 0
    for file_path in cloud_orphans:
        res = run_rclone(["deletefile", f"{remote}/{file_path}"] + account_flags)
        if res["success"]:
            count += 1
    return {"success": True, "count": count, "total_scanned": len(cloud_files)}
# Command-line entry point: list / upload / download against gdrive or r2.
if __name__ == "__main__":
    cli_args = sys.argv[1:]
    if not cli_args:
        print("Usage: python cloud_tool.py [list|upload|download] [storage:gdrive/r2] [args...]")
        sys.exit(1)

    check_and_install_rclone()
    action = cli_args[0]

    if action == "list":
        storage = cli_args[1] if len(cli_args) > 1 else "gdrive"
        res = list_files(storage)
        if not res["success"]:
            print(f"❌ Error: {res['error']}")
            if "r2" in str(res['error']).lower():
                print("πŸ’‘ R2 remote might not be configured. Try 'rclone config' to add R2.")
        else:
            print(f"\n--- {storage.upper()} File List ---")
            files = res.get("files")
            if files and isinstance(files, list):
                for entry in files:
                    size = entry.get('Size', 0)
                    print(f"[{size} bytes] {entry.get('Name')} ({entry.get('ModTime', 'N/A')})")
            else:
                # Either unparsed rclone output or nothing at all.
                print(res.get("raw") or "No files found.")

    elif action == "upload":
        if len(cli_args) < 2:
            print("Usage: python cloud_tool.py upload <local_path> [storage:gdrive/r2] [remote_name]")
            sys.exit(1)
        local_path = cli_args[1]
        storage = cli_args[2] if len(cli_args) > 2 else None
        remote_name = cli_args[3] if len(cli_args) > 3 else None
        res = upload_file(local_path, storage, remote_name)
        if not res["success"]:
            print(f"❌ Upload failed: {res['error']}")
        else:
            print("βœ… Upload successful!")

    elif action == "download":
        if len(cli_args) < 3:
            print("Usage: python cloud_tool.py download <remote_name> <local_path> [storage:gdrive/r2]")
            sys.exit(1)
        remote_name = cli_args[1]
        local_target = cli_args[2]
        storage = cli_args[3] if len(cli_args) > 3 else "gdrive"
        res = download_file(remote_name, local_target, storage)
        if not res["success"]:
            print(f"❌ Download failed: {res['error']}")
        else:
            print("βœ… Download successful!")