# Source: Tools / src/google_src/gcloud_wrapper.py
# Last commit (jebin2, f20025d): refactor: Centralize logger import to src.logger_config across various modules.
"""
Google Cloud wrapper for switching between multiple authenticated accounts.
Supports both Cloud Storage and Google Sheets via gspread.

Credential Sources (in order of priority):
    1. Explicit service account JSON file
    2. ADC (Application Default Credentials) - local or WIF

Usage:
    from google_src.gcloud_wrapper import GCloudWrapper, GCloudAccount

    # Initialize with explicit credentials
    gcloud = GCloudWrapper({
        'account1': GCloudAccount('Account 1', credentials_path='./gcloud-key1.json'),
        'account2': GCloudAccount('Account 2', credentials_path='./gcloud-key2.json'),
    })

    # Or use ADC (works with local gcloud auth AND Workload Identity Federation)
    gcloud = GCloudWrapper({
        'default': GCloudAccount('Default ADC', use_adc=True),
    })

    # Get a storage client
    storage_client = gcloud.get_storage_client('account1')
    bucket = storage_client.bucket('my-bucket')

    # Get a gspread client for Sheets
    sheets_client = gcloud.get_sheets_client('account1')
    spreadsheet = sheets_client.open('My Sheet')

    # Convenience: get blob directly
    blob = gcloud.get_blob('account1', 'bucket-name', 'path/to/file.txt')
"""
import os
from dataclasses import dataclass, field
from typing import Dict, Optional, List
from contextlib import contextmanager
from google.auth import default as google_auth_default
from google.auth import load_credentials_from_file
from google.auth.credentials import Credentials as BaseCredentials
from google.oauth2 import service_account
from google.cloud import storage
import gspread
from src.config import get_config_value
from src.logger_config import logger
# Default scopes for Google APIs.
# Copied into every GCloudAccount whose `scopes` field is not given explicitly.
# NOTE(review): drive.file is presumably a per-file Drive scope; confirm it is
# sufficient for opening spreadsheets by title via gspread.
DEFAULT_SCOPES = [
    "https://www.googleapis.com/auth/cloud-platform",  # Storage
    "https://www.googleapis.com/auth/spreadsheets",  # Sheets
    "https://www.googleapis.com/auth/drive.file",  # Drive (for Sheets)
]
@dataclass
class GCloudAccount:
    """
    Represents a Google Cloud account configuration.

    Exactly how credentials are obtained is decided in ``get_credentials``:
    when ``use_adc`` is True, ADC is used even if ``credentials_path`` is
    also set; otherwise the credentials file is loaded.

    Args:
        name: Human-readable name for the account
        credentials_path: Path to service account JSON file (optional if
            use_adc=True). A leading ``~`` is expanded and relative paths
            are made absolute at construction time.
        use_adc: If True, use Application Default Credentials (ADC/WIF)
        project_id: Optional project ID override
        scopes: Optional custom scopes (defaults to storage + sheets + drive)

    Raises:
        ValueError: If neither credentials_path nor use_adc is provided.
        FileNotFoundError: If credentials_path does not point to a file.
    """

    name: str
    credentials_path: Optional[str] = None
    use_adc: bool = False
    project_id: Optional[str] = None
    scopes: List[str] = field(default_factory=lambda: DEFAULT_SCOPES.copy())

    def __post_init__(self) -> None:
        """Validate the configuration and normalize ``credentials_path``."""
        # Validate: must have either credentials_path or use_adc
        if not self.credentials_path and not self.use_adc:
            raise ValueError(
                f"Account '{self.name}': Must provide either credentials_path or set use_adc=True"
            )

        if not self.credentials_path:
            return

        # Expand "~" first: abspath() alone would turn "~/key.json" into
        # "<cwd>/~/key.json" and the file would never be found.
        resolved = os.path.expanduser(self.credentials_path)
        # Convert relative path to absolute if needed
        if not os.path.isabs(resolved):
            resolved = os.path.abspath(resolved)
        self.credentials_path = resolved

        # A directory "exists" but cannot be loaded as credentials, so
        # require a regular file here instead of failing later on load.
        if not os.path.isfile(self.credentials_path):
            raise FileNotFoundError(
                f"Credentials file not found for account '{self.name}': {self.credentials_path}"
            )

    def get_credentials(self) -> BaseCredentials:
        """
        Get Google credentials for this account.

        Returns:
            Credentials from ADC when ``use_adc`` is True, otherwise
            credentials loaded from ``credentials_path``.

        Side effects:
            In the ADC branch, ``project_id`` is filled in from the ADC
            project when it was not set explicitly.
        """
        if self.use_adc:
            # Use Application Default Credentials (works with local ADC and WIF)
            creds, project = google_auth_default(scopes=self.scopes)
            # Capture project from ADC if not explicitly set
            if not self.project_id and project:
                self.project_id = project
            # Re-scope if needed (for user credentials from gcloud auth)
            if hasattr(creds, "with_scopes") and not creds.scopes:
                creds = creds.with_scopes(self.scopes)
            return creds

        # Use load_credentials_from_file which supports both:
        # 1. Standard Service Account JSON
        # 2. External Account Credentials (WIF configuration)
        creds, _ = load_credentials_from_file(self.credentials_path, scopes=self.scopes)
        return creds
class GCloudWrapper:
    """
    Wrapper class for managing multiple Google Cloud accounts.
    Provides unified access to Storage and Sheets clients.

    Credentials, storage clients and sheets clients are each cached per
    account ID; adding or removing an account drops its cached entries.
    """

    def __init__(self, accounts: Optional[Dict[str, GCloudAccount]] = None):
        """
        Initialize the wrapper with optional accounts.

        Args:
            accounts: Dictionary mapping account IDs to GCloudAccount objects.
        """
        self._accounts: Dict[str, GCloudAccount] = accounts or {}
        self._credentials_cache: Dict[str, BaseCredentials] = {}
        self._storage_clients: Dict[str, storage.Client] = {}
        self._sheets_clients: Dict[str, gspread.Client] = {}

    # ------------------ Account Management ------------------
    def add_account(
        self,
        account_id: str,
        name: str,
        credentials_path: Optional[str] = None,
        use_adc: bool = False,
        project_id: Optional[str] = None,
        scopes: Optional[List[str]] = None,
    ) -> None:
        """
        Add a new account to the wrapper (replacing any account with the same ID).

        Args:
            account_id: Unique identifier for this account
            name: Human-readable name for the account
            credentials_path: Path to the JSON credentials file
            use_adc: If True, use ADC instead of explicit credentials
            project_id: Optional project ID override
            scopes: Optional custom scopes; when None the GCloudAccount
                defaults are used

        Raises:
            ValueError: If neither credentials_path nor use_adc is given.
            FileNotFoundError: If the credentials file does not exist.
        """
        account_kwargs = {
            "name": name,
            "credentials_path": credentials_path,
            "use_adc": use_adc,
            "project_id": project_id,
        }
        # Only forward scopes when supplied so the dataclass default applies
        # otherwise; copy so later caller-side mutation cannot leak in.
        if scopes is not None:
            account_kwargs["scopes"] = list(scopes)

        self._accounts[account_id] = GCloudAccount(**account_kwargs)
        # Clear cached clients for this account
        self._clear_account_cache(account_id)

    def add_adc_account(
        self,
        account_id: str,
        name: str,
        project_id: Optional[str] = None,
        scopes: Optional[List[str]] = None,
    ) -> None:
        """
        Convenience method to add an ADC-based account.

        Args:
            account_id: Unique identifier for this account
            name: Human-readable name for the account
            project_id: Optional project ID override
            scopes: Optional custom scopes; when None the GCloudAccount
                defaults are used
        """
        self.add_account(
            account_id=account_id,
            name=name,
            use_adc=True,
            project_id=project_id,
            scopes=scopes,
        )

    def remove_account(self, account_id: str) -> None:
        """Remove an account (and its cached clients) from the wrapper. Unknown IDs are ignored."""
        self._accounts.pop(account_id, None)
        self._clear_account_cache(account_id)

    def list_accounts(self) -> Dict[str, str]:
        """Return a dictionary of account IDs to account names."""
        return {aid: acc.name for aid, acc in self._accounts.items()}

    def _clear_account_cache(self, account_id: str) -> None:
        """Clear all cached credentials and clients for an account."""
        self._credentials_cache.pop(account_id, None)
        self._storage_clients.pop(account_id, None)
        self._sheets_clients.pop(account_id, None)

    def _get_account(self, account_id: str) -> GCloudAccount:
        """
        Get account by ID with validation.

        Raises:
            KeyError: If the account ID is not registered.
        """
        if account_id not in self._accounts:
            raise KeyError(
                f"Account '{account_id}' not found. Available: {list(self._accounts.keys())}"
            )
        return self._accounts[account_id]

    def _get_credentials(self, account_id: str) -> BaseCredentials:
        """Get (cached) credentials for an account, loading them on first use."""
        if account_id not in self._credentials_cache:
            account = self._get_account(account_id)
            self._credentials_cache[account_id] = account.get_credentials()
        return self._credentials_cache[account_id]

    # ------------------ Storage Client ------------------
    def get_storage_client(self, account_id: str, force_new: bool = False) -> storage.Client:
        """
        Get a Cloud Storage client for the specified account.

        Args:
            account_id: The account ID to get the client for
            force_new: If True, create a new client even if cached
                (the cached credentials are still reused)

        Returns:
            A google.cloud.storage.Client configured for the account

        Raises:
            KeyError: If the account ID is not registered.
        """
        if force_new or account_id not in self._storage_clients:
            account = self._get_account(account_id)
            # Resolve credentials before reading project_id: the ADC path
            # may fill in account.project_id as a side effect.
            creds = self._get_credentials(account_id)
            self._storage_clients[account_id] = storage.Client(
                credentials=creds,
                project=account.project_id,
            )
        return self._storage_clients[account_id]

    # ------------------ Sheets Client ------------------
    def get_sheets_client(self, account_id: str, force_new: bool = False) -> gspread.Client:
        """
        Get a gspread client for Google Sheets for the specified account.

        Args:
            account_id: The account ID to get the client for
            force_new: If True, create a new client even if cached
                (the cached credentials are still reused)

        Returns:
            A gspread.Client configured for the account

        Raises:
            KeyError: If the account ID is not registered.
        """
        if force_new or account_id not in self._sheets_clients:
            creds = self._get_credentials(account_id)
            self._sheets_clients[account_id] = gspread.authorize(creds)
        return self._sheets_clients[account_id]

    # ------------------ Context Manager ------------------
    @contextmanager
    def use_account(self, account_id: str):
        """
        Context manager that yields both storage and sheets clients.

        The yielded clients are the cached ones; nothing is closed or
        released on exit.

        Usage:
            with gcloud.use_account('account1') as (storage_client, sheets_client):
                bucket = storage_client.bucket('my-bucket')
                sheet = sheets_client.open('My Sheet')
        """
        storage_client = self.get_storage_client(account_id)
        sheets_client = self.get_sheets_client(account_id)
        yield storage_client, sheets_client

    # ------------------ Storage Convenience Methods ------------------
    def get_bucket(self, account_id: str, bucket_name: str) -> storage.Bucket:
        """Get a bucket object for the specified account and bucket name."""
        client = self.get_storage_client(account_id)
        return client.bucket(bucket_name)

    def get_blob(self, account_id: str, bucket_name: str, blob_path: str) -> storage.Blob:
        """Get a blob object for the specified account, bucket, and path."""
        bucket = self.get_bucket(account_id, bucket_name)
        return bucket.blob(blob_path)

    def download_blob(
        self,
        account_id: str,
        bucket_name: str,
        blob_path: str,
        destination_path: str,
    ) -> str:
        """Download a blob to a local file. Returns the destination path."""
        blob = self.get_blob(account_id, bucket_name, blob_path)
        blob.download_to_filename(destination_path)
        return destination_path

    def upload_blob(
        self,
        account_id: str,
        bucket_name: str,
        blob_path: str,
        source_path: str,
        content_type: Optional[str] = None,
    ) -> storage.Blob:
        """Upload a local file to a blob. Returns the Blob object."""
        blob = self.get_blob(account_id, bucket_name, blob_path)
        blob.upload_from_filename(source_path, content_type=content_type)
        return blob

    def list_blobs(
        self,
        account_id: str,
        bucket_name: str,
        prefix: Optional[str] = None,
    ) -> list:
        """List blobs in a bucket, optionally restricted to a name prefix."""
        # Go through get_bucket like the other storage helpers.
        bucket = self.get_bucket(account_id, bucket_name)
        return list(bucket.list_blobs(prefix=prefix))

    # ------------------ Sheets Convenience Methods ------------------
    def open_spreadsheet(
        self,
        account_id: str,
        sheet_name: Optional[str] = None,
        sheet_id: Optional[str] = None,
    ) -> gspread.Spreadsheet:
        """
        Open a spreadsheet by name or ID.

        Args:
            account_id: The account ID
            sheet_name: Name of the spreadsheet (if opening by name)
            sheet_id: ID of the spreadsheet (if opening by ID, preferred;
                wins when both are given)

        Returns:
            A gspread.Spreadsheet object

        Raises:
            ValueError: If neither sheet_name nor sheet_id is provided.
        """
        if not sheet_name and not sheet_id:
            raise ValueError("Must provide either sheet_name or sheet_id")
        client = self.get_sheets_client(account_id)
        if sheet_id:
            return client.open_by_key(sheet_id)
        return client.open(sheet_name)

    def open_worksheet(
        self,
        account_id: str,
        worksheet_name: str,
        sheet_name: Optional[str] = None,
        sheet_id: Optional[str] = None,
    ) -> gspread.Worksheet:
        """
        Open a specific worksheet from a spreadsheet.

        Args:
            account_id: The account ID
            worksheet_name: Name of the worksheet tab
            sheet_name: Name of the spreadsheet
            sheet_id: ID of the spreadsheet (preferred)

        Returns:
            A gspread.Worksheet object
        """
        spreadsheet = self.open_spreadsheet(account_id, sheet_name, sheet_id)
        return spreadsheet.worksheet(worksheet_name)
# ------------------ Factory Functions ------------------
def create_adc_wrapper(account_id: str = "default", name: str = "Default ADC") -> GCloudWrapper:
    """
    Build a wrapper that holds exactly one account backed by ADC.

    Suitable for local ``gcloud auth`` setups as well as Workload Identity
    Federation.

    Args:
        account_id: ID under which the ADC account is registered
        name: Human-readable name for the account

    Usage:
        gcloud = create_adc_wrapper()
        storage = gcloud.get_storage_client('default')
        sheets = gcloud.get_sheets_client('default')
    """
    adc_account = GCloudAccount(name=name, use_adc=True)
    single_account_map = {account_id: adc_account}
    return GCloudWrapper(single_account_map)
def create_wrapper_from_env(
    env_prefix: str = "GCLOUD_ACCOUNT_",
    default_to_adc: bool = True,
) -> GCloudWrapper:
    """
    Create a GCloudWrapper from environment variables.

    Looks for environment variables like:
        GCLOUD_ACCOUNT_1_NAME=Account 1
        GCLOUD_ACCOUNT_1_CREDENTIALS=./gcloud-key1.json
        GCLOUD_ACCOUNT_2_NAME=Account 2
        GCLOUD_ACCOUNT_2_CREDENTIALS=./gcloud-key2.json

    Or for ADC-based accounts:
        GCLOUD_ACCOUNT_DEFAULT_NAME=Default
        GCLOUD_ACCOUNT_DEFAULT_ADC=true

    An optional ``<prefix><ID>_PROJECT`` variable sets the project ID.
    The account ID is everything between the prefix and one of the known
    suffixes (_NAME, _CREDENTIALS, _ADC, _PROJECT), so IDs may contain
    underscores (e.g. GCLOUD_ACCOUNT_FINAL_DATA_CREDENTIALS -> "final_data").
    Accounts are registered under the lower-cased ID. When both ADC and a
    credentials path are configured for an ID, ADC is used.

    Args:
        env_prefix: Prefix for environment variables
        default_to_adc: If True and no accounts found, create a default ADC account

    Returns:
        Configured GCloudWrapper instance
    """
    known_suffixes = ("_NAME", "_CREDENTIALS", "_ADC", "_PROJECT")
    truthy_values = ("true", "1", "yes")

    wrapper = GCloudWrapper()

    # Find all account identifiers in environment. Matching on the known
    # suffix (rather than splitting at the first "_") keeps IDs that contain
    # underscores intact and ignores unrelated variables sharing the prefix.
    account_ids = set()
    for key in os.environ:
        if not key.startswith(env_prefix):
            continue
        remainder = key[len(env_prefix):]
        for suffix in known_suffixes:
            # Require a non-empty ID in front of the suffix.
            if remainder.endswith(suffix) and len(remainder) > len(suffix):
                account_ids.add(remainder[: -len(suffix)])
                break

    # Create accounts (sorted for deterministic registration order)
    for aid in sorted(account_ids):
        name = os.environ.get(f"{env_prefix}{aid}_NAME", f"Account {aid}")
        creds = os.environ.get(f"{env_prefix}{aid}_CREDENTIALS")
        use_adc = os.environ.get(f"{env_prefix}{aid}_ADC", "").strip().lower() in truthy_values
        project = os.environ.get(f"{env_prefix}{aid}_PROJECT")

        if not use_adc and not creds:
            # Previously skipped silently; surface the misconfiguration.
            logger.warning(
                f"Skipping account '{aid}': set {env_prefix}{aid}_CREDENTIALS "
                f"or {env_prefix}{aid}_ADC=true"
            )
            continue

        try:
            if use_adc:
                wrapper.add_adc_account(aid.lower(), name, project_id=project)
            else:
                wrapper.add_account(aid.lower(), name, credentials_path=creds, project_id=project)
        except (FileNotFoundError, ValueError) as e:
            # Best effort: one bad account must not prevent the others.
            logger.warning(f"Could not add account '{aid}': {e}")

    # Default to ADC if no accounts were found
    if default_to_adc and not wrapper.list_accounts():
        wrapper.add_adc_account("default", "Default ADC")
    return wrapper
def create_default_wrapper() -> GCloudWrapper:
    """
    Create default wrapper with two accounts for Elvoro:
    - final_data: Production data (from GCLOUD_FINAL_DATA_CREDENTIALS or ADC)
    - test_data: Test data (from GCLOUD_TEST_DATA_CREDENTIALS or ADC)

    The credentials path is read through ``get_config_value`` and the project
    ID from the GCP_PROJECT_FINAL / GCP_PROJECT_TEST environment variables.
    A ``default`` entry is registered as an alias of ``final_data``.
    """
    # (account id, name with explicit creds, name with ADC,
    #  credentials config key, project env var)
    account_specs = (
        (
            "final_data",
            "Final Data",
            "Final Data (ADC)",
            "GCLOUD_FINAL_DATA_CREDENTIALS",
            "GCP_PROJECT_FINAL",
        ),
        (
            "test_data",
            "Test Data",
            "Test Data (ADC)",
            "GCLOUD_TEST_DATA_CREDENTIALS",
            "GCP_PROJECT_TEST",
        ),
    )

    accounts = {}
    for account_id, explicit_name, adc_name, creds_key, project_var in account_specs:
        creds_path = get_config_value(creds_key)
        project = os.environ.get(project_var)
        if creds_path:
            account = GCloudAccount(explicit_name, credentials_path=creds_path, project_id=project)
        else:
            account = GCloudAccount(adc_name, use_adc=True, project_id=project)
        accounts[account_id] = account

    # Alias 'default' to 'final_data' for convenience (same object, not a copy)
    accounts["default"] = accounts["final_data"]
    return GCloudWrapper(accounts)
# Module-level cache holding the shared default wrapper (None until first use)
_default_wrapper_instance = None


def get_default_wrapper() -> GCloudWrapper:
    """
    Return the shared default wrapper, building it on the first call.

    The wrapper is created via ``create_default_wrapper`` once and stored in
    a module-level variable; later calls hand back that same instance.
    """
    global _default_wrapper_instance
    cached = _default_wrapper_instance
    if cached is not None:
        return cached
    _default_wrapper_instance = create_default_wrapper()
    return _default_wrapper_instance
if __name__ == "__main__":
    # Manual smoke test: build an ADC-backed wrapper and try both clients.
    print("GCloud Wrapper - Example Usage")
    print("-" * 40)

    # ADC works locally and in CI with WIF
    demo_wrapper = create_adc_wrapper()
    print("Available accounts:", demo_wrapper.list_accounts())

    # Any failure here is reported with a hint instead of a traceback.
    try:
        demo_storage = demo_wrapper.get_storage_client("default")
        print(f"Storage client project: {demo_storage.project}")
        demo_sheets = demo_wrapper.get_sheets_client("default")
        print("Sheets client ready!")
    except Exception as exc:
        print(f"Error: {exc}")
        print("\nTo use ADC locally, run:")
        print(" gcloud auth application-default login")