|
|
""" |
|
|
Google Cloud wrapper for switching between multiple authenticated accounts. |
|
|
Supports both Cloud Storage and Google Sheets via gspread. |
|
|
|
|
|
Credential sources (selected per account):
    1. Explicit service account JSON file (credentials_path)
    2. ADC (Application Default Credentials) - local or WIF (use_adc=True;
       when both are given for one account, ADC is the one that is used)
|
|
|
|
|
Usage: |
|
|
from google_src.gcloud_wrapper import GCloudWrapper, GCloudAccount |
|
|
|
|
|
# Initialize with explicit credentials |
|
|
gcloud = GCloudWrapper({ |
|
|
'account1': GCloudAccount('Account 1', credentials_path='./gcloud-key1.json'), |
|
|
'account2': GCloudAccount('Account 2', credentials_path='./gcloud-key2.json'), |
|
|
}) |
|
|
|
|
|
# Or use ADC (works with local gcloud auth AND Workload Identity Federation) |
|
|
gcloud = GCloudWrapper({ |
|
|
'default': GCloudAccount('Default ADC', use_adc=True), |
|
|
}) |
|
|
|
|
|
# Get a storage client |
|
|
storage_client = gcloud.get_storage_client('account1') |
|
|
bucket = storage_client.bucket('my-bucket') |
|
|
|
|
|
# Get a gspread client for Sheets |
|
|
sheets_client = gcloud.get_sheets_client('account1') |
|
|
spreadsheet = sheets_client.open('My Sheet') |
|
|
|
|
|
# Convenience: get blob directly |
|
|
blob = gcloud.get_blob('account1', 'bucket-name', 'path/to/file.txt') |
|
|
""" |
|
|
|
|
|
import os |
|
|
from dataclasses import dataclass, field |
|
|
from typing import Dict, Optional, List |
|
|
from contextlib import contextmanager |
|
|
|
|
|
from google.auth import default as google_auth_default |
|
|
from google.auth import load_credentials_from_file |
|
|
from google.auth.credentials import Credentials as BaseCredentials |
|
|
from google.oauth2 import service_account |
|
|
from google.cloud import storage |
|
|
import gspread |
|
|
from src.config import get_config_value |
|
|
from src.logger_config import logger |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# OAuth scopes requested for an account that does not supply its own list.
# GCloudAccount takes a copy of this list for each instance, so editing one
# account's scopes does not affect this module-level default.
DEFAULT_SCOPES: List[str] = [
    "https://www.googleapis.com/auth/cloud-platform",
    "https://www.googleapis.com/auth/spreadsheets",
    "https://www.googleapis.com/auth/drive.file",
]
|
|
|
|
|
|
|
|
@dataclass
class GCloudAccount:
    """
    Configuration for a single Google Cloud account.

    ``use_adc`` decides how credentials are obtained: when it is True,
    Application Default Credentials are used (even if ``credentials_path`` is
    also set); otherwise the JSON file at ``credentials_path`` is loaded.

    Args:
        name: Human-readable name for the account
        credentials_path: Path to service account JSON file (optional if
            use_adc=True). A leading ``~`` is expanded and a relative path is
            made absolute when the instance is created.
        use_adc: If True, use Application Default Credentials (ADC/WIF)
        project_id: Optional project ID override. When left empty it is
            filled in from the loaded credentials, if they report a project.
        scopes: Optional custom scopes (defaults to a copy of DEFAULT_SCOPES)

    Raises:
        ValueError: If neither credentials_path nor use_adc is provided.
        FileNotFoundError: If credentials_path does not exist.
    """

    name: str
    credentials_path: Optional[str] = None
    use_adc: bool = False
    project_id: Optional[str] = None
    scopes: List[str] = field(default_factory=lambda: DEFAULT_SCOPES.copy())

    def __post_init__(self):
        """Validate the configuration and normalise ``credentials_path``."""
        if not self.credentials_path and not self.use_adc:
            raise ValueError(
                f"Account '{self.name}': Must provide either credentials_path or set use_adc=True"
            )

        if self.credentials_path:
            # Expand "~" before resolving: paths coming from environment
            # variables or config files are not expanded by a shell, and
            # abspath() alone would turn "~/key.json" into "<cwd>/~/key.json".
            resolved = os.path.expanduser(self.credentials_path)
            if not os.path.isabs(resolved):
                resolved = os.path.abspath(resolved)
            self.credentials_path = resolved

            if not os.path.exists(self.credentials_path):
                raise FileNotFoundError(
                    f"Credentials file not found for account '{self.name}': {self.credentials_path}"
                )

    def _adopt_project(self, detected_project: Optional[str]) -> None:
        """Store the project reported by the credentials unless one was set explicitly."""
        if not self.project_id and detected_project:
            self.project_id = detected_project

    def get_credentials(self) -> BaseCredentials:
        """
        Get Google credentials for this account.

        As a side effect, ``project_id`` is filled in from the credential
        source when it was not set explicitly, for both ADC and file-based
        accounts.

        Returns:
            Credentials created with this account's scopes.
        """
        if self.use_adc:
            creds, detected_project = google_auth_default(scopes=self.scopes)
            self._adopt_project(detected_project)

            # Credentials that support scoping but came back without any
            # scopes get this account's scopes applied explicitly.
            if hasattr(creds, "with_scopes") and not creds.scopes:
                creds = creds.with_scopes(self.scopes)

            return creds

        # The loader also returns the project ID found in the file; keep it so
        # file-based accounts behave like ADC accounts instead of depending on
        # whatever project the surrounding environment happens to provide.
        creds, detected_project = load_credentials_from_file(
            self.credentials_path, scopes=self.scopes
        )
        self._adopt_project(detected_project)
        return creds
|
|
|
|
|
|
|
|
class GCloudWrapper:
    """
    Wrapper class for managing multiple Google Cloud accounts.

    Provides unified access to Storage and Sheets clients. Credentials and
    clients are created on first request for an account ID and cached per
    account ID afterwards.
    """

    def __init__(self, accounts: Optional[Dict[str, GCloudAccount]] = None):
        """
        Initialize the wrapper with optional accounts.

        Args:
            accounts: Dictionary mapping account IDs to GCloudAccount objects.
                The mapping is copied, so later add/remove calls on the
                wrapper never modify the caller's dictionary.
        """
        self._accounts: Dict[str, GCloudAccount] = dict(accounts) if accounts else {}
        self._credentials_cache: Dict[str, BaseCredentials] = {}
        self._storage_clients: Dict[str, storage.Client] = {}
        self._sheets_clients: Dict[str, gspread.Client] = {}

    # ------------------------------------------------------------------
    # Account management
    # ------------------------------------------------------------------

    def add_account(
        self,
        account_id: str,
        name: str,
        credentials_path: Optional[str] = None,
        use_adc: bool = False,
        project_id: Optional[str] = None,
        scopes: Optional[List[str]] = None,
    ) -> None:
        """
        Add a new account to the wrapper, replacing any account with the same ID.

        Args:
            account_id: Unique identifier for this account
            name: Human-readable name for the account
            credentials_path: Path to the JSON credentials file
            use_adc: If True, use ADC instead of explicit credentials
            project_id: Optional project ID override
            scopes: Optional custom scopes; when omitted the account uses
                GCloudAccount's default scopes

        Raises:
            ValueError: If neither credentials_path nor use_adc is given.
            FileNotFoundError: If credentials_path does not exist.
        """
        account_kwargs = {
            "name": name,
            "credentials_path": credentials_path,
            "use_adc": use_adc,
            "project_id": project_id,
        }
        # Only forward scopes when given so the dataclass default still applies.
        if scopes is not None:
            account_kwargs["scopes"] = list(scopes)

        self._accounts[account_id] = GCloudAccount(**account_kwargs)

        # Anything cached for a previous account under this ID is now stale.
        self._clear_account_cache(account_id)

    def add_adc_account(
        self,
        account_id: str,
        name: str,
        project_id: Optional[str] = None,
        scopes: Optional[List[str]] = None,
    ) -> None:
        """
        Convenience method to add an ADC-based account.

        Args:
            account_id: Unique identifier for this account
            name: Human-readable name for the account
            project_id: Optional project ID override
            scopes: Optional custom scopes
        """
        self.add_account(
            account_id=account_id,
            name=name,
            use_adc=True,
            project_id=project_id,
            scopes=scopes,
        )

    def remove_account(self, account_id: str) -> None:
        """Remove an account and its cached clients; unknown IDs are ignored."""
        if account_id in self._accounts:
            del self._accounts[account_id]
            self._clear_account_cache(account_id)

    def list_accounts(self) -> Dict[str, str]:
        """Return a dictionary of account IDs to account names."""
        return {aid: acc.name for aid, acc in self._accounts.items()}

    def _clear_account_cache(self, account_id: str) -> None:
        """Drop cached credentials and clients for an account."""
        for cache in (self._credentials_cache, self._storage_clients, self._sheets_clients):
            cache.pop(account_id, None)

    def _get_account(self, account_id: str) -> GCloudAccount:
        """
        Get account by ID with validation.

        Raises:
            KeyError: If no account is registered under account_id.
        """
        if account_id not in self._accounts:
            raise KeyError(
                f"Account '{account_id}' not found. Available: {list(self._accounts.keys())}"
            )
        return self._accounts[account_id]

    def _get_credentials(self, account_id: str) -> BaseCredentials:
        """Get credentials for an account, loading them on first use."""
        if account_id not in self._credentials_cache:
            account = self._get_account(account_id)
            self._credentials_cache[account_id] = account.get_credentials()
        return self._credentials_cache[account_id]

    # ------------------------------------------------------------------
    # Clients
    # ------------------------------------------------------------------

    def get_storage_client(self, account_id: str, force_new: bool = False) -> storage.Client:
        """
        Get a Cloud Storage client for the specified account.

        Args:
            account_id: The account ID to get the client for
            force_new: If True, create a new client even if cached (the
                cached credentials are still reused)

        Returns:
            A google.cloud.storage.Client configured for the account
        """
        if force_new or account_id not in self._storage_clients:
            account = self._get_account(account_id)
            # Load credentials before reading account.project_id: loading may
            # fill in the project when none was configured explicitly.
            creds = self._get_credentials(account_id)

            self._storage_clients[account_id] = storage.Client(
                credentials=creds,
                project=account.project_id,
            )

        return self._storage_clients[account_id]

    def get_sheets_client(self, account_id: str, force_new: bool = False) -> gspread.Client:
        """
        Get a gspread client for Google Sheets for the specified account.

        Args:
            account_id: The account ID to get the client for
            force_new: If True, create a new client even if cached

        Returns:
            A gspread.Client configured for the account
        """
        if force_new or account_id not in self._sheets_clients:
            creds = self._get_credentials(account_id)
            self._sheets_clients[account_id] = gspread.authorize(creds)

        return self._sheets_clients[account_id]

    @contextmanager
    def use_account(self, account_id: str):
        """
        Context manager that yields both storage and sheets clients.

        The yielded clients are the cached ones; nothing is closed on exit.

        Usage:
            with gcloud.use_account('account1') as (storage_client, sheets_client):
                bucket = storage_client.bucket('my-bucket')
                sheet = sheets_client.open('My Sheet')
        """
        storage_client = self.get_storage_client(account_id)
        sheets_client = self.get_sheets_client(account_id)
        yield storage_client, sheets_client

    # ------------------------------------------------------------------
    # Cloud Storage helpers
    # ------------------------------------------------------------------

    def get_bucket(self, account_id: str, bucket_name: str) -> storage.Bucket:
        """Get a bucket object for the specified account and bucket name."""
        return self.get_storage_client(account_id).bucket(bucket_name)

    def get_blob(self, account_id: str, bucket_name: str, blob_path: str) -> storage.Blob:
        """Get a blob object for the specified account, bucket, and path."""
        return self.get_bucket(account_id, bucket_name).blob(blob_path)

    def download_blob(
        self,
        account_id: str,
        bucket_name: str,
        blob_path: str,
        destination_path: str,
    ) -> str:
        """Download a blob to a local file. Returns the destination path."""
        blob = self.get_blob(account_id, bucket_name, blob_path)
        blob.download_to_filename(destination_path)
        return destination_path

    def upload_blob(
        self,
        account_id: str,
        bucket_name: str,
        blob_path: str,
        source_path: str,
        content_type: Optional[str] = None,
    ) -> storage.Blob:
        """Upload a local file to a blob. Returns the Blob object."""
        blob = self.get_blob(account_id, bucket_name, blob_path)
        blob.upload_from_filename(source_path, content_type=content_type)
        return blob

    def list_blobs(
        self,
        account_id: str,
        bucket_name: str,
        prefix: Optional[str] = None,
    ) -> list:
        """List blobs in a bucket, optionally restricted to a name prefix."""
        bucket = self.get_bucket(account_id, bucket_name)
        return list(bucket.list_blobs(prefix=prefix))

    # ------------------------------------------------------------------
    # Google Sheets helpers
    # ------------------------------------------------------------------

    def open_spreadsheet(
        self,
        account_id: str,
        sheet_name: Optional[str] = None,
        sheet_id: Optional[str] = None,
    ) -> gspread.Spreadsheet:
        """
        Open a spreadsheet by name or ID.

        Args:
            account_id: The account ID
            sheet_name: Name of the spreadsheet (if opening by name)
            sheet_id: ID of the spreadsheet (if opening by ID, preferred);
                takes precedence when both are given

        Returns:
            A gspread.Spreadsheet object

        Raises:
            ValueError: If neither sheet_name nor sheet_id is provided.
        """
        if not sheet_name and not sheet_id:
            raise ValueError("Must provide either sheet_name or sheet_id")

        client = self.get_sheets_client(account_id)

        if sheet_id:
            return client.open_by_key(sheet_id)
        return client.open(sheet_name)

    def open_worksheet(
        self,
        account_id: str,
        worksheet_name: str,
        sheet_name: Optional[str] = None,
        sheet_id: Optional[str] = None,
    ) -> gspread.Worksheet:
        """
        Open a specific worksheet from a spreadsheet.

        Args:
            account_id: The account ID
            worksheet_name: Name of the worksheet tab
            sheet_name: Name of the spreadsheet
            sheet_id: ID of the spreadsheet (preferred)

        Returns:
            A gspread.Worksheet object
        """
        spreadsheet = self.open_spreadsheet(
            account_id, sheet_name=sheet_name, sheet_id=sheet_id
        )
        return spreadsheet.worksheet(worksheet_name)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def create_adc_wrapper(account_id: str = "default", name: str = "Default ADC") -> GCloudWrapper:
    """
    Build a wrapper holding exactly one account backed by ADC.

    Works with local gcloud auth AND Workload Identity Federation.

    Args:
        account_id: ID under which the ADC account is registered
        name: Human-readable name for the account

    Usage:
        gcloud = create_adc_wrapper()
        storage = gcloud.get_storage_client('default')
        sheets = gcloud.get_sheets_client('default')
    """
    adc_account = GCloudAccount(name=name, use_adc=True)
    registry = {account_id: adc_account}
    return GCloudWrapper(registry)
|
|
|
|
|
|
|
|
def create_wrapper_from_env(
    env_prefix: str = "GCLOUD_ACCOUNT_",
    default_to_adc: bool = True,
) -> GCloudWrapper:
    """
    Create a GCloudWrapper from environment variables.

    Looks for environment variables like:
        GCLOUD_ACCOUNT_1_NAME=Account 1
        GCLOUD_ACCOUNT_1_CREDENTIALS=./gcloud-key1.json
        GCLOUD_ACCOUNT_2_NAME=Account 2
        GCLOUD_ACCOUNT_2_CREDENTIALS=./gcloud-key2.json

    Or for ADC-based accounts:
        GCLOUD_ACCOUNT_DEFAULT_NAME=Default
        GCLOUD_ACCOUNT_DEFAULT_ADC=true

    An optional ``<prefix><ID>_PROJECT`` variable sets the project ID.
    The account ID is everything between the prefix and one of the suffixes
    ``_NAME``, ``_CREDENTIALS``, ``_ADC`` or ``_PROJECT``, so IDs may contain
    underscores (e.g. ``GCLOUD_ACCOUNT_FINAL_DATA_CREDENTIALS`` defines the
    account ``final_data``). Accounts are registered under the lower-cased ID.
    If both ``_ADC`` and ``_CREDENTIALS`` are set for an ID, ADC is used.

    Args:
        env_prefix: Prefix for environment variables
        default_to_adc: If True and no accounts found, create a default ADC account

    Returns:
        Configured GCloudWrapper instance
    """
    wrapper = GCloudWrapper()

    # Derive IDs by stripping a recognised suffix rather than splitting on the
    # first underscore, which would truncate IDs such as "FINAL_DATA".
    known_suffixes = ("_NAME", "_CREDENTIALS", "_ADC", "_PROJECT")
    account_ids = set()
    for key in os.environ:
        if not key.startswith(env_prefix):
            continue
        remainder = key[len(env_prefix):]
        for suffix in known_suffixes:
            # Require a non-empty ID in front of the suffix.
            if remainder.endswith(suffix) and len(remainder) > len(suffix):
                account_ids.add(remainder[: -len(suffix)])
                break

    # Sorted so accounts are registered in a deterministic order.
    for aid in sorted(account_ids):
        name = os.environ.get(f"{env_prefix}{aid}_NAME", f"Account {aid}")
        creds = os.environ.get(f"{env_prefix}{aid}_CREDENTIALS")
        use_adc = os.environ.get(f"{env_prefix}{aid}_ADC", "").lower() in ("true", "1", "yes")
        project = os.environ.get(f"{env_prefix}{aid}_PROJECT")

        if not use_adc and not creds:
            # Nothing to authenticate with; report it instead of skipping silently.
            logger.warning(
                f"Skipping account '{aid}': neither {env_prefix}{aid}_CREDENTIALS "
                f"nor {env_prefix}{aid}_ADC is set"
            )
            continue

        try:
            if use_adc:
                wrapper.add_adc_account(aid.lower(), name, project_id=project)
            else:
                wrapper.add_account(aid.lower(), name, credentials_path=creds, project_id=project)
        except (FileNotFoundError, ValueError) as e:
            # A single bad account must not prevent the others from loading.
            logger.warning(f"Could not add account '{aid}': {e}")

    if not wrapper.list_accounts() and default_to_adc:
        wrapper.add_adc_account("default", "Default ADC")

    return wrapper
|
|
|
|
|
|
|
|
def create_default_wrapper() -> GCloudWrapper:
    """
    Build the standard wrapper for Elvoro with these account IDs:

    - final_data: Production data (from GCLOUD_FINAL_DATA_CREDENTIALS or ADC)
    - test_data: Test data (from GCLOUD_TEST_DATA_CREDENTIALS or ADC)
    - default: the same account object as final_data

    For each account the credentials path is read via get_config_value; when
    it is empty the account falls back to ADC. Project IDs come from the
    GCP_PROJECT_FINAL / GCP_PROJECT_TEST environment variables.
    """
    # (account id, name with key file, name with ADC, credentials setting, project env var)
    account_specs = (
        (
            "final_data",
            "Final Data",
            "Final Data (ADC)",
            "GCLOUD_FINAL_DATA_CREDENTIALS",
            "GCP_PROJECT_FINAL",
        ),
        (
            "test_data",
            "Test Data",
            "Test Data (ADC)",
            "GCLOUD_TEST_DATA_CREDENTIALS",
            "GCP_PROJECT_TEST",
        ),
    )

    accounts = {}
    for account_id, file_label, adc_label, creds_setting, project_var in account_specs:
        key_path = get_config_value(creds_setting)
        project = os.environ.get(project_var)

        if key_path:
            account = GCloudAccount(file_label, credentials_path=key_path, project_id=project)
        else:
            account = GCloudAccount(adc_label, use_adc=True, project_id=project)
        accounts[account_id] = account

    # "default" is an alias for the production account.
    accounts["default"] = accounts["final_data"]

    return GCloudWrapper(accounts)
|
|
|
|
|
|
|
|
|
|
|
# Module-level cache for the wrapper returned by get_default_wrapper().
_default_wrapper_instance = None


def get_default_wrapper() -> GCloudWrapper:
    """
    Return the shared default wrapper (singleton).

    The wrapper is built with create_default_wrapper() on the first call and
    the same instance is returned on every later call.
    """
    global _default_wrapper_instance

    if _default_wrapper_instance is not None:
        return _default_wrapper_instance

    _default_wrapper_instance = create_default_wrapper()
    return _default_wrapper_instance
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Manual smoke test: build an ADC-backed wrapper and try to create both
    # client types, printing setup advice if anything fails.
    rule = "-" * 40
    print("GCloud Wrapper - Example Usage")
    print(rule)

    gcloud = create_adc_wrapper()

    print("Available accounts:", gcloud.list_accounts())

    try:
        storage_client = gcloud.get_storage_client("default")
        print(f"Storage client project: {storage_client.project}")

        # Created only to confirm that Sheets authorisation works.
        sheets_client = gcloud.get_sheets_client("default")
        print("Sheets client ready!")

    except Exception as e:
        # Top-level boundary of a demo script: report the failure and a hint.
        for message in (
            f"Error: {e}",
            "\nTo use ADC locally, run:",
            "  gcloud auth application-default login",
        ):
            print(message)
|
|
|