import os
import sys
import subprocess
import json
import re
import datetime

try:
    from google import genai
    from google.genai import types
except ImportError:
    # google-genai is optional; the image helpers report it as not installed.
    genai = None
    types = None

from rclone_installer import RCLONE_EXE, check_and_install_rclone
import quota_manager
from quota_manager import ImperialQuotaManager, QuotaExceededError

# rclone remote names
GD_REMOTE = "gdrive"
R2_REMOTE = "r2"

# Target folders
GD_FOLDER = "Guardians_Workspace/Aizen_Forge"
R2_BUCKET = "cache"  # Default bucket name for R2

# ====== R2 Multi-Account Registry (Imperial Core) ======
# Values beginning with "Imperial:" are credential *references*; they are resolved
# at runtime by _get_bw_credential (environment variable first, then the BW CLI).
# NOTE(review): the vault lifecycle references are spelled "R2_VALUT_..." rather
# than "R2_VAULT_...". Left as-is in case the secrets really are stored under that
# name — confirm. If they fail to resolve, the lifecycle-rule reader falls back
# to the regular access/secret key pair.
R2_ACCOUNTS = {
    "live": {
        "id": "live",
        "label": "๐ŸŸข Live (TaeminGames)",
        "account_id": "Imperial:R2_LIVE_ACCOUNT",
        "bucket": "cache",
        "public_url": "https://pub-9319df3ac5d34263a673eaa4ff3621d3.r2.dev",
        "access_key_id": "Imperial:R2_LIVE_ACCESS_KEY_ID",
        "secret_access_key": "Imperial:R2_LIVE_SECRET_ACCESS_KEY",
        "lifecycle_access_key_id": "Imperial:R2_LIVE_LIFECYCLE_ACCESS_KEY_ID",
        "lifecycle_secret_access_key": "Imperial:R2_LIVE_LIFECYCLE_SECRET_ACCESS_KEY"
    },
    "vault": {
        "id": "vault",
        "label": "๐ŸŸก Vault (Jarvis)",
        "account_id": "Imperial:R2_VAULT_ACCOUNT",
        "bucket": "cache",
        "public_url": "https://pub-33ef6fd6e25b486c9c703182b9782a13.r2.dev",
        "access_key_id": "Imperial:R2_VAULT_ACCESS_KEY_ID",
        "secret_access_key": "Imperial:R2_VAULT_SECRET_ACCESS_KEY",
        "lifecycle_access_key_id": "Imperial:R2_VALUT_LIFECYCLE_ACCESS_KEY_ID",
        "lifecycle_secret_access_key": "Imperial:R2_VALUT_LIFECYCLE_SECRET_ACCESS_KEY"
    }
}

# Files larger than this (100 MB) go to R2 when upload_file() is not told a storage.
LARGE_FILE_THRESHOLD = 100 * 1024 * 1024

# Local mirror root for R2 data: "<parent of this file's directory>/R2".
LOCAL_R2_ROOT = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "R2")


def get_account_id(public_url):
    """Return the unique ID embedded in an R2 public URL.

    ``https://pub-<id>.r2.dev`` yields ``<id>`` (lowercase letters and digits).
    An empty/None URL, or one without a ``pub-<id>`` segment, yields ``"default"``.
    """
    found = re.search(r"pub-([a-z0-9]+)", public_url) if public_url else None
    return found.group(1) if found else "default"
# [Imperial Aegis Cache] In-memory cache of BW (Bitwarden) credentials.
# - Once a reference is resolved, later lookups in the same process return it
#   without invoking the BW CLI (entries live until process restart or a clear).
# - On local startup, _prewarm_credentials() resolves every R2 key up front.
_BW_CREDENTIAL_CACHE: dict = {}

# [Imperial R2 List Cache] Cache of list_r2_files() responses (TTL: 30 seconds).
# - Repeating a request for the same account/bucket/path within the TTL is served
#   without an API call.
import time as _time
_R2_LIST_CACHE: dict = {}  # key: (account_id, bucket, path) -> {'files': [...], 'ts': float}
_R2_LIST_TTL = 30  # seconds

# R2 status/stats cache. Nothing in this module fills it (presumably an importing
# module does — confirm), but both purge helpers below clear it, so it must exist
# or they fail with NameError.
_R2_STATS_CACHE: dict = {}

# [Imperial Cloud Guard] Detect managed cloud runtimes (Render / Cloud Run). There
# the BW CLI is not available and credentials must come from environment variables.
IS_RENDER = os.environ.get('RENDER') == 'true'
IS_CLOUD_RUN = os.environ.get('K_SERVICE') is not None
IS_CLOUD = IS_RENDER or IS_CLOUD_RUN


def _get_bw_credential(name):
    """Resolve an ``"Imperial:<KEY>"`` credential reference to its secret value.

    Anything that is not a string containing ``"Imperial:"`` is returned unchanged,
    so literal values can sit directly in the config dicts.

    Resolution order:
      1. the in-memory cache ``_BW_CREDENTIAL_CACHE``;
      2. the environment variable ``<KEY>`` (prefix and whitespace removed);
      3. on cloud hosts (Render / Cloud Run) stop here and return None;
      4. ``bw get password "<candidate>" --raw`` for the ``"Imperial:KEY"`` and
         ``"Imperial: KEY"`` spellings.

    Successful lookups are cached. Returns None when nothing resolves.
    """
    if not isinstance(name, str) or "Imperial:" not in name:
        return name
    if name in _BW_CREDENTIAL_CACHE:
        return _BW_CREDENTIAL_CACHE[name]

    # "Imperial:KEY" and "Imperial: KEY" both map to the env var "KEY".
    env_key = name.replace("Imperial:", "").strip()
    env_val = os.environ.get(env_key)
    if env_val and "Imperial:" not in env_val:
        _BW_CREDENTIAL_CACHE[name] = env_val
        return env_val

    # Cloud Environment: BW CLI is not available, so only the environment counts.
    if IS_CLOUD:
        print(f"[๐Ÿ”ฑ Cloud Alert] Missing Environment Variable: {env_key}")
        return None

    # The BW item may have been saved with or without a space after the colon.
    name_no_space = name.replace("Imperial: ", "Imperial:").strip()
    name_with_space = name_no_space.replace("Imperial:", "Imperial: ").strip()
    candidates = list(dict.fromkeys([name, name_no_space, name_with_space]))  # ordered, unique

    for cand in candidates:
        try:
            # shell=True is kept from the original (presumably so a `bw` shell shim
            # resolves — confirm). NOTE(review): cand is interpolated into a shell
            # command; this is only safe while callers pass trusted references.
            result = subprocess.run(
                f'bw get password "{cand}" --raw',
                capture_output=True, text=True, check=True, shell=True, timeout=30
            )
        except (subprocess.SubprocessError, OSError, ValueError):
            # Non-zero exit, timeout, launch failure or undecodable output:
            # best-effort lookup, so move on to the next spelling.
            continue
        val = result.stdout.strip()
        if val:
            for key in candidates:
                _BW_CREDENTIAL_CACHE[key] = val
            return val

    print(f"[Aegis Error] Failed to fetch {name} from BW/Env. (Key: {env_key})")
    return None


def clear_bw_credential_cache():
    """Empty the credential cache together with the R2 list and stats caches.

    Call this after changing a secret in Bitwarden so the next lookup refetches it.
    """
    count = len(_BW_CREDENTIAL_CACHE)
    _BW_CREDENTIAL_CACHE.clear()
    _R2_LIST_CACHE.clear()
    _R2_STATS_CACHE.clear()
    print(f"[Aegis Cache] {count}๊ฐœ ์ž๊ฒฉ์ฆ๋ช… + R2 ๋ชฉ๋ก/์ƒํƒœ ์บ์‹œ ์ „๋ฉด ์ •ํ™” ์™„๋ฃŒ.")


def clear_r2_list_cache(account_id=None, bucket=None, remote_path=None):
    """Purge the R2 file-list and stats caches.

    The arguments are accepted for API compatibility but ignored. Partial
    invalidation is prone to path-format mismatches with the cache keys, so every
    upload/delete purges the whole cache instead.
    """
    _R2_LIST_CACHE.clear()
    _R2_STATS_CACHE.clear()


def _prewarm_credentials():
    """Resolve every R2 account's access key, secret key and account ID up front.

    Skipped on cloud hosts: there credentials come from the environment anyway, and
    a prewarm would only print missing-variable alerts.
    """
    if IS_CLOUD:
        return

    all_keys = {
        acc.get(field, "")
        for acc in R2_ACCOUNTS.values()
        for field in ("access_key_id", "secret_access_key", "account_id")
        if "Imperial:" in str(acc.get(field, ""))
    }

    loaded = 0
    for key in all_keys:
        if key in _BW_CREDENTIAL_CACHE:
            continue
        val = _get_bw_credential(key)
        if val and "Imperial:" not in str(val):
            loaded += 1
    if loaded > 0:
        print(f"[Aegis Pre-warm] R2 ์ž๊ฒฉ์ฆ๋ช… {loaded}/{len(all_keys)}๊ฐœ ์„ ์ œ ๋กœ๋“œ ์™„๋ฃŒ.")


def _prewarm_async():
    """Run _prewarm_credentials() on a daemon thread (local hosts only), so that
    importing this module does not block on the BW CLI."""
    if IS_CLOUD:
        return
    import threading
    t = threading.Thread(target=_prewarm_credentials, name="aegis-prewarm", daemon=True)
    t.start()


# Automatic pre-warm when the server/module starts (local only).
_prewarm_async()


def get_rclone_args(account_config):
    """Build rclone flags that target one R2 account without an rclone config file.

    Credentials are resolved through ``_get_bw_credential``. Returns the
    ``--s3-provider/--s3-access-key-id/--s3-secret-access-key/--s3-endpoint`` flags.
    """
    access_key = _get_bw_credential(account_config.get("access_key_id"))
    secret_key = _get_bw_credential(account_config.get("secret_access_key"))
    account_id = _get_bw_credential(account_config.get("account_id"))
    return [
        "--s3-provider", "Cloudflare",
        "--s3-access-key-id", access_key,
        "--s3-secret-access-key", secret_key,
        "--s3-endpoint", f"https://{account_id}.r2.cloudflarestorage.com",
    ]


def run_rclone(args, account_config=None):
    """Run rclone with ``args`` and return ``{"success", "output", "error"}``.

    When ``account_config`` is given, that account's R2 flags are placed before
    ``args``. A missing binary, a non-zero exit, or an exception while running the
    process is reported as ``success=False`` rather than raised.
    """
    if not os.path.exists(RCLONE_EXE):
        return {"success": False, "output": None, "error": "rclone not installed"}

    base_args = [RCLONE_EXE]
    if account_config:
        base_args.extend(get_rclone_args(account_config))
    cmd = base_args + args

    try:
        result = subprocess.run(cmd, capture_output=True, text=True, check=True, encoding='utf-8')
        return {"success": True, "output": result.stdout, "error": None}
    except subprocess.CalledProcessError as e:
        print(f"[Cloud Error] rclone failed: {e.stderr}")
        return {"success": False, "output": None, "error": e.stderr}
    except Exception as e:
        print(f"[Cloud Error] Unexpected error: {e}")
        return {"success": False, "output": None, "error": str(e)}


_R2_LIFECYCLE_CACHE: dict = {}  # key: (account_id, bucket) -> {'rules': [...], 'ts': float}
_R2_LIFECYCLE_TTL = 60  # seconds


def _get_expiration_rules(account_config, bucket_name, account_id):
    """Return the bucket's enabled lifecycle Expiration rules as ``[{"prefix", "days"}]``.

    Results, including empty results after a failure, are cached per
    ``(account_id, bucket_name)`` for ``_R2_LIFECYCLE_TTL`` seconds. The dedicated
    lifecycle (admin) key pair is preferred, with fallback to the regular pair.
    Missing credentials or any API failure are logged and yield an empty list, so
    the file listing itself never fails because of this lookup.
    """
    cache_key = (account_id, bucket_name)
    cached = _R2_LIFECYCLE_CACHE.get(cache_key)
    if cached and (_time.time() - cached['ts']) < _R2_LIFECYCLE_TTL:
        return cached['rules']

    rules = []
    try:
        import boto3
        from botocore.config import Config

        lc_access = _get_bw_credential(account_config.get("lifecycle_access_key_id")) or _get_bw_credential(account_config.get("access_key_id"))
        lc_secret = _get_bw_credential(account_config.get("lifecycle_secret_access_key")) or _get_bw_credential(account_config.get("secret_access_key"))

        s3_admin = boto3.client(
            service_name='s3',
            endpoint_url=f'https://{account_id}.r2.cloudflarestorage.com',
            aws_access_key_id=lc_access,
            aws_secret_access_key=lc_secret,
            region_name='auto',
            config=Config(signature_version='s3v4')
        )
        resp = s3_admin.get_bucket_lifecycle_configuration(Bucket=bucket_name)
        for r in resp.get("Rules", []) or []:
            exp = r.get("Expiration")
            if not exp or "Days" not in exp:
                continue
            # The prefix may be in Filter.Prefix, Filter.And.Prefix, or the legacy top-level Prefix.
            flt = r.get("Filter") or {}
            r_prefix = flt.get("Prefix")
            if r_prefix is None and isinstance(flt.get("And"), dict):
                r_prefix = flt["And"].get("Prefix")
            if r_prefix is None:
                r_prefix = r.get("Prefix", "")
            if r.get("Status") != "Enabled":
                continue
            rules.append({"prefix": r_prefix or "", "days": exp["Days"]})
    except Exception as e:
        print(f"[R2 Lifecycle] ๋งŒ๋ฃŒ ๊ทœ์น™ ์กฐํšŒ ์‹คํŒจ(๋ฌด์‹œ): {e}")
        rules = []

    _R2_LIFECYCLE_CACHE[cache_key] = {'rules': rules, 'ts': _time.time()}
    return rules


def _match_expiration_rule(key, rules):
    """Return the rule with the longest prefix matching ``key``, or None if none match."""
    best = None
    for r in rules:
        if key.startswith(r["prefix"]):
            if best is None or len(r["prefix"]) > len(best["prefix"]):
                best = r
    return best


def _list_r2_files_boto3(account_config, remote_path="", bucket_name="cache"):
    """List one directory level of an R2 bucket with boto3 (used for browsing).

    Returns rclone-``lsjson``-shaped dicts (Path/Name/Size/IsDir/MimeType/ModTime)
    plus ``ExpireDays``: whole days left before a matching lifecycle rule expires
    the object, or None when no rule applies.
    """
    import boto3
    import datetime as _dt
    from botocore.config import Config

    access_key = _get_bw_credential(account_config.get("access_key_id"))
    secret_key = _get_bw_credential(account_config.get("secret_access_key"))
    account_id = _get_bw_credential(account_config.get("account_id"))

    # [Imperial Debug] Print a masked account ID to confirm the env var was loaded.
    masked_id = f"{account_id[:4]}...{account_id[-4:]}" if (account_id and len(account_id) > 8) else str(account_id)
    print(f"[R2 Debug] Using Account ID: {masked_id}")

    s3 = boto3.client(
        service_name='s3',
        endpoint_url=f'https://{account_id}.r2.cloudflarestorage.com',
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
        region_name='auto',
        config=Config(signature_version='s3v4')
    )

    expiration_rules = _get_expiration_rules(account_config, bucket_name, account_id)

    prefix = remote_path.strip("/") + "/" if remote_path else ""
    kwargs = {"Bucket": bucket_name, "Delimiter": "/"}
    if prefix:
        kwargs["Prefix"] = prefix

    files = []
    while True:
        resp = s3.list_objects_v2(**kwargs)

        # Directories (common prefixes)
        for cp in resp.get("CommonPrefixes") or []:
            name = cp["Prefix"].rstrip("/").split("/")[-1]
            files.append({"Path": cp["Prefix"], "Name": name, "Size": 0, "IsDir": True,
                          "MimeType": "inode/directory", "ModTime": None, "ExpireDays": None})

        # Files
        for obj in resp.get("Contents") or []:
            key = obj["Key"]
            if key == prefix:  # skip the folder placeholder object itself
                continue
            name = key.split("/")[-1]
            last_modified = obj["LastModified"]
            mod_time = last_modified.strftime("%Y-%m-%dT%H:%M:%S.000Z")
            expire_days = None
            rule = _match_expiration_rule(key, expiration_rules)
            if rule:
                expire_at = last_modified + _dt.timedelta(days=rule["days"])
                now = _dt.datetime.now(last_modified.tzinfo)
                expire_days = max(0, (expire_at - now).days)
            files.append({"Path": key, "Name": name, "Size": obj["Size"], "IsDir": False,
                          "MimeType": "application/octet-stream", "ModTime": mod_time,
                          "ExpireDays": expire_days})

        if resp.get("IsTruncated"):
            kwargs["ContinuationToken"] = resp["NextContinuationToken"]
        else:
            break

    return files


def _list_r2_all_files_boto3(account_config):
    """Return every object key in the account's bucket (recursive), skipping
    keys that end with '/' (folder placeholders)."""
    import boto3
    access_key = _get_bw_credential(account_config.get("access_key_id"))
    secret_key = _get_bw_credential(account_config.get("secret_access_key"))
    account_id = _get_bw_credential(account_config.get("account_id"))
    bucket_name = account_config.get("bucket", "cache")

    s3 = boto3.client(
        service_name='s3',
        endpoint_url=f'https://{account_id}.r2.cloudflarestorage.com',
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
    )

    file_paths = []
    continuation_token = None
    while True:
        list_args = {"Bucket": bucket_name}
        if continuation_token:
            list_args["ContinuationToken"] = continuation_token
        resp = s3.list_objects_v2(**list_args)
        for obj in resp.get("Contents", []):
            key = obj["Key"]
            if not key.endswith('/'):  # Skip directory placeholders
                file_paths.append(key)
        if resp.get("IsTruncated"):
            continuation_token = resp.get("NextContinuationToken")
        else:
            break
    return file_paths


def _download_from_r2_boto3(account_config, files_to_download, local_base_dir, progress_callback=None):
    """Download the given keys into ``local_base_dir`` (mirroring their paths) with boto3.

    Per-file failures are logged and counted, not raised. Always returns
    ``{"success": True, "downloaded": n, "errors": m}``.
    """
    import boto3
    access_key = _get_bw_credential(account_config.get("access_key_id"))
    secret_key = _get_bw_credential(account_config.get("secret_access_key"))
    account_id = _get_bw_credential(account_config.get("account_id"))
    bucket_name = account_config.get("bucket", "cache")

    s3 = boto3.client(
        service_name='s3',
        endpoint_url=f'https://{account_id}.r2.cloudflarestorage.com',
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
    )

    downloaded = 0
    errors = 0
    for i, file_path in enumerate(files_to_download):
        local_path = os.path.join(local_base_dir, file_path.replace('/', os.sep))
        os.makedirs(os.path.dirname(local_path), exist_ok=True)
        try:
            s3.download_file(bucket_name, file_path, local_path)
            downloaded += 1
        except Exception as e:
            errors += 1
            print(f"[R2 Download Error (boto3)] {file_path}: {e}")
        if progress_callback:
            progress_callback(i + 1, len(files_to_download))

    return {"success": True, "downloaded": downloaded, "errors": errors}


def list_r2_files(account_config, remote_path="", bucket_name="cache"):
    """List one directory level of R2 via boto3 (rclone is no longer used here).

    Responses are cached for ``_R2_LIST_TTL`` (30) seconds per
    (account reference, bucket, path).

    Raises:
        RuntimeError: when the boto3 listing fails.
    """
    cache_key = (account_config.get("account_id", "default"), bucket_name, remote_path)
    now = _time.time()
    cached = _R2_LIST_CACHE.get(cache_key)
    if cached and (now - cached['ts']) < _R2_LIST_TTL:
        return cached['files']

    # Imperial convention: rclone is retired here; boto3 only.
    try:
        files = _list_r2_files_boto3(account_config, remote_path, bucket_name)
    except Exception as e:
        raise RuntimeError(f"R2 Listing Failed (boto3 fallback): {e}")

    if files is not None:
        _R2_LIST_CACHE[cache_key] = {'files': files, 'ts': now}
    return files


def get_r2_bucket_size(account_config, bucket_name="cache"):
    """Return ``{'count', 'bytes', 'formatted'}`` for the whole bucket (recursive, boto3).

    Intended only for the heartbeat usage display; browser UIs should use
    ``list_r2_files``. Folder placeholder keys (ending with '/') are excluded.

    Raises:
        RuntimeError: when credentials are missing or the listing fails.
    """
    # Imperial convention: rclone is retired; recursive boto3 listing.
    try:
        import boto3
        access_key = _get_bw_credential(account_config.get("access_key_id"))
        secret_key = _get_bw_credential(account_config.get("secret_access_key"))
        account_id = _get_bw_credential(account_config.get("account_id"))

        if not all([access_key, secret_key, account_id]):
            missing = []
            if not access_key:
                missing.append("ACCESS_KEY_ID")
            if not secret_key:
                missing.append("SECRET_ACCESS_KEY")
            if not account_id:
                missing.append("ACCOUNT_ID")
            raise RuntimeError(f"Missing R2 Credentials in Environment: {', '.join(missing)}")

        from botocore.config import Config
        s3 = boto3.client(
            service_name='s3',
            endpoint_url=f'https://{account_id}.r2.cloudflarestorage.com',
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_key,
            region_name='auto',
            config=Config(signature_version='s3v4')
        )

        total_bytes = 0
        total_count = 0
        paginator = s3.get_paginator("list_objects_v2")
        for page in paginator.paginate(Bucket=bucket_name):
            for obj in page.get("Contents", []):
                if not obj["Key"].endswith("/"):  # exclude folder placeholders
                    total_bytes += obj["Size"]
                    total_count += 1

        formatted = f"{total_bytes / (1024*1024):.2f} MB" if total_bytes < 1024**3 else f"{total_bytes / (1024**3):.2f} GB"
        return {"count": total_count, "bytes": total_bytes, "formatted": formatted}
    except Exception as e:
        raise RuntimeError(f"get_r2_bucket_size failed: {e}")


def sync_r2_with_local(account_config, direction="to_cloud", bucket_name="cache"):
    """Copy between the account's local mirror and its R2 bucket using rclone.

    The local folder is ``LOCAL_R2_ROOT/<id from public_url>``. ``direction`` is
    "to_cloud" or "from_cloud"; any other value behaves like "to_cloud". rclone
    ``copy`` is always used, so nothing is deleted on either side.

    Raises:
        RuntimeError: when rclone reports failure.
    """
    account_id = get_account_id(account_config.get("public_url"))
    local_path = os.path.join(LOCAL_R2_ROOT, account_id)
    os.makedirs(local_path, exist_ok=True)

    remote_path = f":s3:{bucket_name}"

    if direction == "to_cloud":
        args = ["copy", local_path, remote_path, "--progress"]
    elif direction == "from_cloud":
        args = ["copy", remote_path, local_path, "--progress"]
    else:
        # Dangerous: sync can delete files. We use 'copy' for safety unless 'sync' is forced.
        args = ["copy", local_path, remote_path, "--progress"]

    result = run_rclone(args, account_config)
    if not result["success"]:
        raise RuntimeError(f"R2 Sync Failed: {result['error']}")
    return result


def _list_files_boto3(storage="r2", remote_path=""):
    """boto3 listing fallback for list_files(); R2 only ("live" account), gdrive unsupported."""
    if storage == "gdrive":
        return {"success": False, "error": "boto3 fallback only supports R2. gdrive needs rclone."}

    # R2 browsing uses the "live" account by default.
    config = R2_ACCOUNTS.get("live")
    return {"success": True, "files": _list_r2_files_boto3(config, remote_path), "storage": storage}


def _upload_file_boto3(local_path, storage="r2", remote_name=None):
    """boto3 upload fallback to the "live" R2 bucket; gdrive unsupported.

    The object key is ``remote_name`` or the local file's basename.
    """
    import boto3
    if storage == "gdrive":
        return {"success": False, "error": "boto3 fallback only supports R2. gdrive needs rclone."}

    config = R2_ACCOUNTS.get("live")
    access_key = _get_bw_credential(config.get("access_key_id"))
    secret_key = _get_bw_credential(config.get("secret_access_key"))
    account_id = _get_bw_credential(config.get("account_id"))
    bucket_name = config.get("bucket", "cache")

    s3 = boto3.client(
        service_name='s3',
        endpoint_url=f'https://{account_id}.r2.cloudflarestorage.com',
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
    )

    target_key = remote_name or os.path.basename(local_path)
    try:
        s3.upload_file(local_path, bucket_name, target_key)
        return {"success": True, "output": f"Uploaded {target_key} to R2", "error": None}
    except Exception as e:
        return {"success": False, "output": None, "error": str(e)}


def _download_file_boto3(remote_name, local_path, storage="r2"):
    """boto3 download fallback from the "live" R2 bucket; gdrive unsupported."""
    import boto3
    if storage == "gdrive":
        return {"success": False, "error": "boto3 fallback only supports R2. gdrive needs rclone."}

    config = R2_ACCOUNTS.get("live")
    access_key = _get_bw_credential(config.get("access_key_id"))
    secret_key = _get_bw_credential(config.get("secret_access_key"))
    account_id = _get_bw_credential(config.get("account_id"))
    bucket_name = config.get("bucket", "cache")

    s3 = boto3.client(
        service_name='s3',
        endpoint_url=f'https://{account_id}.r2.cloudflarestorage.com',
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
    )

    try:
        s3.download_file(bucket_name, remote_name, local_path)
        return {"success": True, "output": f"Downloaded {remote_name} from R2", "error": None}
    except Exception as e:
        return {"success": False, "output": None, "error": str(e)}


def list_files(storage="gdrive"):
    """List the top level of the configured folder in ``storage`` ("gdrive" or "r2").

    Uses ``rclone lsjson --max-depth 1`` when rclone is installed. Unparseable
    rclone output is returned under "raw". If rclone is missing or fails, gdrive
    returns the failure while R2 falls back to the boto3 listing.
    """
    remote = GD_REMOTE if storage == "gdrive" else R2_REMOTE
    folder = GD_FOLDER if storage == "gdrive" else ""
    remote_path = f"{remote}:{folder}"

    if os.path.exists(RCLONE_EXE):
        result = run_rclone(["lsjson", remote_path, "--max-depth", "1"])
        if result["success"] and isinstance(result["output"], str):
            try:
                files = json.loads(result["output"])
                return {"success": True, "files": files, "storage": storage}
            except ValueError:  # json.JSONDecodeError
                return {"success": True, "raw": result["output"], "storage": storage}
        if storage == "gdrive":
            return result

    # Fallback to boto3 for R2
    return _list_files_boto3(storage)


def upload_file(local_path, storage=None, remote_name=None):
    """Upload a local file to cloud storage.

    When ``storage`` is None, files over ``LARGE_FILE_THRESHOLD`` (100 MB) go to
    "r2" and smaller ones to "gdrive". rclone is used when installed; otherwise the
    boto3 fallback is used (R2 only).
    """
    if not os.path.exists(local_path):
        return {"success": False, "error": f"Local file not found: {local_path}"}

    file_size = os.path.getsize(local_path)
    if storage is None:
        storage = "r2" if file_size > LARGE_FILE_THRESHOLD else "gdrive"
        print(f"๐Ÿ’ก File size: {file_size/1024/1024:.1f}MB. Auto-selecting {storage}...")

    remote = GD_REMOTE if storage == "gdrive" else R2_REMOTE
    folder = GD_FOLDER if storage == "gdrive" else R2_BUCKET
    target_name = remote_name or os.path.basename(local_path)
    remote_path = f"{remote}:{folder}/{target_name}"

    print(f"โณ Uploading to {storage}: {target_name}...")
    if os.path.exists(RCLONE_EXE):
        return run_rclone(["copyto", local_path, remote_path, "--progress"])

    return _upload_file_boto3(local_path, storage, target_name)


def generate_image_google(prompt, model_id="imagen-4.0-generate-001", aspect_ratio="1:1", output_file="generated_image.png"):
    """Generate one PNG with Imagen via the Google GenAI SDK and write it to ``output_file``.

    The quota is validated before the call and usage is tracked after a response
    with images. Returns ``{"success", "output_file", "model"}`` or
    ``{"success": False, "error"}``; exceptions are reported, not raised.
    """
    if genai is None:
        return {"success": False, "error": "google-genai SDK 2.0+ is not installed."}

    api_key = _get_bw_credential("Imperial: GEMINI_API_KEY")
    if not api_key or "Imperial:" in str(api_key):
        return {"success": False, "error": "Failed to fetch GEMINI_API_KEY from Bitwarden."}

    try:
        client = genai.Client(api_key=api_key)

        # Imperial Quota Guard: check the quota before calling the API.
        try:
            ImperialQuotaManager.validate_quota(model_id)
        except QuotaExceededError as qe:
            return {"success": False, "error": str(qe)}

        full_model_id = f"models/{model_id}" if not model_id.startswith("models/") else model_id
        response = client.models.generate_images(
            model=full_model_id,
            prompt=prompt,
            config=types.GenerateImagesConfig(
                number_of_images=1,
                aspect_ratio=aspect_ratio,
                output_mime_type="image/png"
            )
        )

        if response.generated_images:
            # Track Usage in DB
            ImperialQuotaManager.track_usage(model_id)

            img = response.generated_images[0]
            image_data = None
            if hasattr(img, 'image') and hasattr(img.image, 'image_bytes'):
                image_data = img.image.image_bytes
            elif hasattr(img, 'image_bytes'):
                image_data = img.image_bytes

            if image_data:
                with open(output_file, "wb") as f:
                    f.write(image_data)
                return {
                    "success": True,
                    "output_file": output_file,
                    "model": full_model_id
                }

        return {"success": False, "error": "No images were generated in response."}
    except Exception as e:
        return {"success": False, "error": str(e)}


def generate_image_nano_banana(prompt, input_image_path=None, model_id="gemini-2.5-flash-image", output_file="nano_banana_output.png"):
    """Generate or edit an image with a Gemini image model ("Nano Banana") via generateContent.

    Supports: gemini-2.5-flash-image, gemini-3.1-flash-image-preview,
    gemini-3-pro-image-preview. If ``input_image_path`` exists, it is sent
    alongside the prompt. The first inline image part in the response is written
    to ``output_file``.
    """
    import mimetypes

    if genai is None:
        return {"success": False, "error": "google-genai SDK 2.0+ is not installed."}

    api_key = _get_bw_credential("Imperial: GEMINI_API_KEY")
    if not api_key:
        return {"success": False, "error": "Failed to fetch GEMINI_API_KEY."}

    try:
        client = genai.Client(api_key=api_key)

        contents = []
        parts = [{"text": prompt}]
        if input_image_path and os.path.exists(input_image_path):
            with open(input_image_path, "rb") as f:
                image_bytes = f.read()
            # Declare the real image type (e.g. JPEG) instead of always claiming PNG.
            guessed = mimetypes.guess_type(input_image_path)[0]
            mime_type = guessed if guessed and guessed.startswith("image/") else "image/png"
            parts.append(types.Part.from_bytes(data=image_bytes, mime_type=mime_type))

        contents.append(types.Content(role="user", parts=parts))

        response = client.models.generate_content(
            model=model_id,
            contents=contents,
            config=types.GenerateContentConfig(
                response_modalities=['IMAGE']
            )
        )

        # Extract the image from the multimodal response.
        for part in response.candidates[0].content.parts:
            if part.inline_data:
                with open(output_file, "wb") as f:
                    f.write(part.inline_data.data)
                return {"success": True, "output_file": output_file, "model": model_id}

        return {"success": False, "error": "No image data found in Nano Banana response."}
    except Exception as e:
        return {"success": False, "error": str(e)}


def download_file(remote_name, local_path, storage="gdrive"):
    """Download ``remote_name`` from cloud storage to ``local_path``.

    Uses rclone when installed; otherwise the boto3 fallback (R2 only).
    """
    remote = GD_REMOTE if storage == "gdrive" else R2_REMOTE
    folder = GD_FOLDER if storage == "gdrive" else R2_BUCKET
    remote_path = f"{remote}:{folder}/{remote_name}"

    if os.path.exists(RCLONE_EXE):
        return run_rclone(["copyto", remote_path, local_path, "--progress"])

    return _download_file_boto3(remote_name, local_path, storage)


def get_db_connection(db_url):
    """Open a psycopg2 connection from a PostgreSQL URL or an "Imperial:" reference to one.

    Surrounding quotes are stripped. ``postgresql://`` / ``postgres://`` URLs are
    split into keyword arguments (dbname, user, password, host, port); any query
    parameters are therefore not passed on. Other strings go to psycopg2 as-is.

    Raises:
        RuntimeError: wrapping any failure (unresolvable URL, import, connect).
    """
    try:
        import psycopg2
        from urllib.parse import urlparse, unquote

        # [Imperial Aegis] Resolve a Bitwarden reference (Imperial:) to the real value.
        if db_url and "Imperial:" in str(db_url):
            db_url = _get_bw_credential(str(db_url))
        if not db_url:
            raise ValueError("database URL is empty or could not be resolved")

        # 1. Strip extra quotes
        db_url = db_url.strip('"').strip("'")

        # 2. Parse URI and use keyword arguments (Safer for psycopg2)
        if db_url.startswith('postgresql://') or db_url.startswith('postgres://'):
            parsed = urlparse(db_url)
            # urlparse does not percent-decode credentials; decode them explicitly.
            return psycopg2.connect(
                dbname=parsed.path.lstrip('/'),
                user=unquote(parsed.username) if parsed.username else None,
                password=unquote(parsed.password) if parsed.password else None,
                host=parsed.hostname,
                port=parsed.port
            )

        return psycopg2.connect(db_url)
    except Exception as e:
        raise RuntimeError(f"Database Connection Error: {str(e)}")


def analyze_r2_db_gap(db_url, account_id, actual_file_list):
    """Compare DB index rows for ``account_id`` with ``actual_file_list``; deletes nothing.

    Returns counts of DB rows absent from the list ("orphans_in_db") and of listed
    files absent from the DB (reported under the historical key "missing_in_r2").
    """
    conn = None
    try:
        conn = get_db_connection(db_url)
        cur = conn.cursor()

        # 1. Find orphans in DB (to be deleted)
        cur.execute("SELECT file_path FROM taemingames.r2_storage_index WHERE account_id = %s", (account_id,))
        db_files = set(row[0] for row in cur.fetchall())
        actual_set = set(actual_file_list)
        orphans = list(db_files - actual_set)

        # 2. Find files missing in the DB (to be registered)
        missing = list(actual_set - db_files)

        cur.close()
        return {
            "success": True,
            "orphans_in_db": len(orphans),
            "missing_in_r2": len(missing),
            "final_r2_count": len(actual_file_list)
        }
    except Exception as e:
        return {"success": False, "error": str(e)}
    finally:
        if conn:
            conn.close()


def list_r2_all_files(account_id):
    """List all object keys (recursive) in a registered account's bucket via boto3.

    Double slashes in keys are collapsed (e.g. ``images/portraits//file.png``).
    Returns ``{"success", "files", "count"}`` or ``{"success": False, "error"}``.
    """
    config = R2_ACCOUNTS.get(account_id)
    if not config:
        return {"success": False, "error": f"Account {account_id} not found"}

    bucket = config.get("bucket")
    # Imperial convention: rclone is retired; fetch from the cloud with boto3 only.
    try:
        file_paths = [p.replace('//', '/') for p in _list_r2_all_files_boto3(config)]
        return {"success": True, "files": file_paths, "count": len(file_paths)}
    except Exception as e:
        return {"success": False, "error": f"R2 Cloud Scan Failed (boto3): {str(e)}"}


def analyze_r2_sync_full(db_url, account_id, local_base_dir):
    """Three-way comparison of R2 cloud, local cache and DB index (cloud is the source of truth).

    Local files starting with '.' or under a 'node_modules' path are ignored. On
    failure the result has ``success=False`` and an "error" message.
    """
    result = {
        "success": False,
        "cloud_count": 0,
        "local_count": 0,
        "db_count": 0,
        "to_download": 0,
        "orphans_in_db": 0,
        "missing_in_db": 0,
        "local_only": 0,
        "final_r2_count": 0,
    }

    # 1. R2 Cloud listing
    cloud_result = list_r2_all_files(account_id)
    if not cloud_result.get("success"):
        result["error"] = f"R2 ํด๋ผ์šฐ๋“œ ์กฐํšŒ ์‹คํŒจ: {cloud_result.get('error')}"
        return result
    cloud_set = set(cloud_result["files"])
    result["cloud_count"] = len(cloud_set)

    # 2. Local cache scan
    local_files = []
    if os.path.exists(local_base_dir):
        for root, dirs, files in os.walk(local_base_dir):
            for file in files:
                if file.startswith('.') or 'node_modules' in root:
                    continue
                src_path = os.path.join(root, file)
                rel_path = os.path.relpath(src_path, local_base_dir).replace('\\', '/')
                local_files.append(rel_path)
    local_set = set(local_files)
    result["local_count"] = len(local_set)

    # 3. DB records
    conn = None
    try:
        conn = get_db_connection(db_url)
        cur = conn.cursor()
        cur.execute("SELECT file_path FROM taemingames.r2_storage_index WHERE account_id = %s", (account_id,))
        db_set = set(row[0] for row in cur.fetchall())
        cur.close()
    except Exception as e:
        result["error"] = f"DB ์กฐํšŒ ์‹คํŒจ: {str(e)}"
        return result
    finally:
        if conn:
            conn.close()
    result["db_count"] = len(db_set)

    # 4. Compute differences (cloud is the source of truth)
    result["to_download"] = len(cloud_set - local_set)    # in cloud, not local
    result["orphans_in_db"] = len(db_set - cloud_set)     # in DB, not in cloud
    result["missing_in_db"] = len(cloud_set - db_set)     # in cloud, not in DB
    result["local_only"] = len(local_set - cloud_set)     # local only (not in cloud)
    result["final_r2_count"] = len(cloud_set)             # file count after sync
    result["success"] = True
    return result


def cleanup_local_orphans(local_base_dir, cloud_file_set):
    """Delete local files whose relative '/'-separated path is not in ``cloud_file_set``.

    Dot-files and anything under a 'node_modules' path are left alone. Directories
    left empty afterwards are removed. Returns ``{"success", "removed", "errors"}``.
    """
    removed = 0
    errors = 0

    if not os.path.exists(local_base_dir):
        return {"success": True, "removed": 0, "errors": 0}

    for root, dirs, files in os.walk(local_base_dir):
        for file in files:
            if file.startswith('.') or 'node_modules' in root:
                continue
            src_path = os.path.join(root, file)
            rel_path = os.path.relpath(src_path, local_base_dir).replace('\\', '/')
            if rel_path not in cloud_file_set:
                try:
                    os.remove(src_path)
                    removed += 1
                except Exception as e:
                    print(f"[Local Cleanup Error] {rel_path}: {e}")
                    errors += 1

    # Remove empty directories left behind (bottom-up, so nested empties go first).
    for root, dirs, files in os.walk(local_base_dir, topdown=False):
        for d in dirs:
            dir_path = os.path.join(root, d)
            try:
                if not os.listdir(dir_path):
                    os.rmdir(dir_path)
            except Exception:
                pass  # best-effort tidy-up

    return {"success": True, "removed": removed, "errors": errors}


def download_from_r2(account_id, file_paths, local_base_dir, progress_cb=None):
    """[Imperial Boto3 Direct Download] Download the given keys from R2 without rclone.

    Each key is saved under ``local_base_dir`` with its path mirrored. This is the
    sync routine that keeps the local cache consistent with the cloud. Per-file
    failures are counted, and progress-callback errors are ignored.
    """
    config = R2_ACCOUNTS.get(account_id)
    if not config:
        return {"success": False, "error": f"Account '{account_id}' not found in registry."}

    try:
        import boto3

        # The registry stores an "Imperial:" reference; resolve it like every other
        # R2 helper does, otherwise the endpoint host would contain the raw reference.
        raw_account_id = _get_bw_credential(config.get("account_id", account_id))
        bucket = config.get("bucket", "cache")
        access_key = _get_bw_credential(config.get("access_key_id", ""))
        secret_key = _get_bw_credential(config.get("secret_access_key", ""))

        # Any credential left unresolved (or still an 'Imperial:' reference) means loading failed.
        if not all([access_key, secret_key, raw_account_id]) or any(
            "Imperial:" in str(v) for v in (access_key, secret_key, raw_account_id)
        ):
            return {"success": False, "error": "R2 ์ž๊ฒฉ์ฆ๋ช…์„ ๋กœ๋“œํ•  ์ˆ˜ ์—†์Šต๋‹ˆ๋‹ค (Bitwarden ํ™•์ธ ํ•„์š”)."}

        endpoint = f"https://{raw_account_id}.r2.cloudflarestorage.com"
        s3 = boto3.client(
            "s3",
            endpoint_url=endpoint,
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_key,
            region_name="auto",
        )

        downloaded = 0
        errors = 0
        total = len(file_paths)

        for i, rel_path in enumerate(file_paths):
            dest_path = os.path.join(local_base_dir, rel_path.replace("/", os.sep))
            os.makedirs(os.path.dirname(dest_path), exist_ok=True)
            try:
                s3.download_file(bucket, rel_path, dest_path)
                downloaded += 1
                print(f"[R2 Download] โœ… ({i+1}/{total}) {rel_path}")
            except Exception as e:
                errors += 1
                print(f"[R2 Download] โŒ ({i+1}/{total}) {rel_path}: {e}")

            if progress_cb:
                try:
                    progress_cb(i + 1, total)
                except Exception:
                    pass

        return {"success": True, "downloaded": downloaded, "errors": errors, "total": total}

    except Exception as e:
        import traceback
        traceback.print_exc()
        return {"success": False, "error": str(e)}


def cleanup_r2_db(db_url, account_id, actual_file_list):
    """Delete DB index rows for ``account_id`` whose file_path is not in ``actual_file_list``.

    Refuses to run with an empty list, which would otherwise wipe every row.
    Returns ``{"success", "count"}`` with the number of deleted rows.
    """
    if not actual_file_list:
        return {"success": False, "error": "Actual file list is empty. Aborting to prevent total wipe."}

    conn = None
    try:
        conn = get_db_connection(db_url)
        cur = conn.cursor()
        # Parameterized: psycopg2 expands the tuple into an IN list.
        query = "DELETE FROM taemingames.r2_storage_index WHERE account_id = %s AND file_path NOT IN %s"
        cur.execute(query, (account_id, tuple(actual_file_list),))
        row_count = cur.rowcount
        conn.commit()
        cur.close()
        return {"success": True, "count": row_count}
    except Exception as e:
        return {"success": False, "error": str(e)}
    finally:
        if conn:
            conn.close()


def cleanup_r2_physical(account_id, actual_file_list):
    """Delete objects from an account's R2 bucket that are NOT in ``actual_file_list``.

    This reclaims physical storage taken by "ghost" objects.

    Args:
        account_id: Registry key in ``R2_ACCOUNTS`` (e.g. "live").
        actual_file_list: Object keys to keep. A bucket key is kept when it, or
            its ``//``-collapsed form (as reported by ``list_r2_all_files``), is
            in this list.

    Returns:
        ``{"success": True, "count", "total_scanned", "errors"}`` after deleting,
        ``{"success": True, "count": 0, "message"}`` when nothing needs deleting,
        or ``{"success": False, "error"}``.
    """
    config = R2_ACCOUNTS.get(account_id)
    if not config:
        return {"success": False, "error": f"Account {account_id} not found in registry."}
    # Same safety net as cleanup_r2_db: an empty keep-list would wipe the bucket.
    if not actual_file_list:
        return {"success": False, "error": "Actual file list is empty. Aborting to prevent total wipe."}

    bucket = config.get("bucket", "cache")

    # 1. Full recursive listing of THIS account's bucket. Raw keys (not normalized)
    #    are kept so deletions target the exact stored objects.
    try:
        cloud_files = _list_r2_all_files_boto3(config)
    except Exception as e:
        return {"success": False, "error": f"R2 Cloud Scan Failed (boto3): {e}"}

    actual_set = set(actual_file_list)

    # 2. Identify cloud orphans (files in the cloud but not in our master list).
    cloud_orphans = [
        key for key in cloud_files
        if key not in actual_set and key.replace('//', '/') not in actual_set
    ]
    if not cloud_orphans:
        return {"success": True, "count": 0, "message": "No physical orphans found in R2."}

    print(f"๐Ÿงน [Imperial Cleanup] Deleting {len(cloud_orphans)} ghost files from R2:{bucket}...")

    # 3. Delete one object at a time with boto3 (no rclone dependency).
    import boto3
    from botocore.config import Config
    access_key = _get_bw_credential(config.get("access_key_id"))
    secret_key = _get_bw_credential(config.get("secret_access_key"))
    r2_account = _get_bw_credential(config.get("account_id"))
    s3 = boto3.client(
        service_name='s3',
        endpoint_url=f'https://{r2_account}.r2.cloudflarestorage.com',
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
        region_name='auto',
        config=Config(signature_version='s3v4')
    )

    count = 0
    errors = 0
    for key in cloud_orphans:
        try:
            s3.delete_object(Bucket=bucket, Key=key)
            count += 1
        except Exception as e:
            errors += 1
            print(f"[R2 Cleanup Error] {key}: {e}")

    if count:
        # Cached listings may still show the deleted objects.
        clear_r2_list_cache()

    return {"success": True, "count": count, "total_scanned": len(cloud_files), "errors": errors}


if __name__ == "__main__":
    if len(sys.argv) < 2:
        print("Usage: python cloud_tool.py [list|upload|download] [storage:gdrive/r2] [args...]")
        sys.exit(1)

    check_and_install_rclone()

    action = sys.argv[1]

    if action == "list":
        storage = sys.argv[2] if len(sys.argv) > 2 else "gdrive"
        res = list_files(storage)
        if res["success"]:
            print(f"\n--- {storage.upper()} File List ---")
            files = res.get("files")
            if files and isinstance(files, list):
                for f in files:
                    size = f.get('Size', 0)
                    print(f"[{size} bytes] {f.get('Name')} ({f.get('ModTime', 'N/A')})")
            else:
                print(res.get("raw") or "No files found.")
        else:
            print(f"โŒ Error: {res['error']}")
            if "r2" in str(res['error']).lower():
                print("๐Ÿ’ก R2 remote might not be configured. Try 'rclone config' to add R2.")

    elif action == "upload":
        if len(sys.argv) < 3:
            print("Usage: python cloud_tool.py upload <local_path> [storage:gdrive/r2] [remote_name]")
            sys.exit(1)
        local_path = sys.argv[2]
        storage = sys.argv[3] if len(sys.argv) > 3 else None
        remote_name = sys.argv[4] if len(sys.argv) > 4 else None
        res = upload_file(local_path, storage, remote_name)
        if res["success"]:
            print(f"โœ… Upload successful!")
        else:
            print(f"โŒ Upload failed: {res['error']}")

    elif action == "download":
        if len(sys.argv) < 4:
            print("Usage: python cloud_tool.py download <remote_name> <local_path> [storage:gdrive/r2]")
            sys.exit(1)
        remote_name = sys.argv[2]
        local_target = sys.argv[3]
        storage = sys.argv[4] if len(sys.argv) > 4 else "gdrive"
        res = download_file(remote_name, local_target, storage)
        if res["success"]:
            print(f"โœ… Download successful!")
        else:
            print(f"โŒ Download failed: {res['error']}")