"""
OAuth Token Persistence for Hugging Face Spaces

Stores refresh tokens in Google Cloud Secret Manager programmatically
"""
import json
import logging
import os
from datetime import datetime, timezone
from typing import Any, Dict, Optional

# The Google Cloud client libraries are optional: when they cannot be imported
# the names are bound to None so the rest of the module can detect that.
try:
    from google.cloud import secretmanager
    from google.oauth2 import service_account
except ImportError:
    secretmanager = None
    service_account = None

logger = logging.getLogger(__name__)
class OAuthTokenManager:
    """Manage OAuth refresh tokens with Secret Manager persistence.

    All users share one secret (``self.secret_name``) whose latest version holds
    a JSON object mapping user e-mail to
    ``{"refresh_token": <str>, "stored_at": <ISO timestamp>}``.

    When no Secret Manager client is available, or a Secret Manager call fails,
    tokens are written to / read from ``OAUTH_TOKEN_*`` environment variables of
    the current process instead (not persistent).

    The coroutine methods call the Secret Manager client without awaiting it,
    so each request runs synchronously inside the calling coroutine.
    """

    def __init__(self):
        # Project id comes from the environment, with a hard-coded default.
        self.project_id = os.getenv('GOOGLE_CLOUD_PROJECT_ID', 'chatcal-voice')
        self.secret_name = "oauth-refresh-tokens"
        self.client = None

        # Initialize Secret Manager client
        self._init_secret_manager()

    def _init_secret_manager(self):
        """Create the Secret Manager client; ``self.client`` stays None on failure."""
        if secretmanager is None:
            logger.warning("google-cloud-secret-manager not available")
            return

        try:
            # No explicit credentials are passed; the client library resolves them.
            self.client = secretmanager.SecretManagerServiceClient()
        except Exception as e:
            logger.warning("❌ Failed to initialize Secret Manager: %s", e)
        else:
            logger.info("✅ Secret Manager client initialized")

    async def store_refresh_token(self, user_email: str, refresh_token: str) -> bool:
        """Store *refresh_token* for *user_email*.

        The token is merged into the shared token map and written as a new
        secret version. If the client is missing or any step fails, the token
        is kept in fallback storage instead.

        Returns:
            True when the token was stored (in Secret Manager or in fallback
            storage), False when even fallback storage failed.
        """
        if not self.client:
            logger.warning("Secret Manager not available, using fallback storage")
            return self._store_fallback(user_email, refresh_token)

        try:
            parent = f"projects/{self.project_id}"

            # A secret that was just created cannot hold tokens yet. Otherwise
            # the existing map must be readable: writing without it would
            # publish a new "latest" version that drops every other user.
            if self._create_secret_if_missing(parent):
                tokens = {}
            else:
                tokens = self._load_tokens_for_update()

            tokens[user_email] = {
                "refresh_token": refresh_token,
                "stored_at": self._get_timestamp(),
            }

            # Add new version
            self.client.add_secret_version(
                request={
                    "parent": f"{parent}/secrets/{self.secret_name}",
                    "payload": {"data": json.dumps(tokens).encode("UTF-8")},
                }
            )
        except Exception as e:
            logger.error("❌ Failed to store refresh token: %s", e)
            return self._store_fallback(user_email, refresh_token)

        logger.info("✅ Stored refresh token for %s", user_email)
        return True

    async def get_refresh_token(self, user_email: str) -> Optional[str]:
        """Return the refresh token stored for *user_email*, or None.

        Secret Manager is consulted first. Fallback storage is consulted when
        the client is missing, when the read fails, and when the secret has no
        token for this user (the token may have been stored in fallback
        storage after an earlier failed write).
        """
        if not self.client:
            return self._get_fallback(user_email)

        try:
            user_data = self._read_tokens().get(user_email, {})
            refresh_token = user_data.get("refresh_token")
        except Exception as e:
            logger.error("❌ Failed to retrieve refresh token: %s", e)
            return self._get_fallback(user_email)

        if refresh_token:
            logger.info("✅ Retrieved refresh token for %s", user_email)
            return refresh_token

        logger.warning("⚠️ No refresh token found for %s", user_email)
        return self._get_fallback(user_email)

    async def get_all_tokens(self) -> Dict[str, Any]:
        """Return the whole token map, or ``{}`` when it cannot be read."""
        if not self.client:
            return {}

        try:
            return self._read_tokens()
        except Exception as e:
            # Best-effort API: callers get an empty map, but the reason is logged.
            logger.warning("Could not read stored tokens: %s", e)
            return {}

    def _read_tokens(self) -> Dict[str, Any]:
        """Fetch and decode the latest secret version; raise on any failure."""
        secret_path = f"projects/{self.project_id}/secrets/{self.secret_name}/versions/latest"
        response = self.client.access_secret_version(request={"name": secret_path})
        tokens = json.loads(response.payload.data.decode("UTF-8"))
        if not isinstance(tokens, dict):
            raise ValueError("token secret payload is not a JSON object")
        return tokens

    def _load_tokens_for_update(self) -> Dict[str, Any]:
        """Read the current token map ahead of a write.

        Returns an empty map only when the API reports "not found" (for example
        the secret has no version yet). Any other error propagates so the
        caller does not overwrite tokens it failed to read.
        """
        try:
            return self._read_tokens()
        except Exception as e:
            # NOTE(review): assumes google-api-core errors expose the HTTP
            # status as ``code`` (404 for NotFound) - confirm against the
            # installed library version.
            if getattr(e, "code", None) == 404:
                return {}
            raise

    def _create_secret_if_missing(self, parent: str) -> bool:
        """Try to create the token secret; return True only if it was created."""
        try:
            self.client.create_secret(
                request={
                    "parent": parent,
                    "secret_id": self.secret_name,
                    "secret": {"replication": {"automatic": {}}},
                }
            )
        except Exception as e:
            # Presumably the secret already exists. Any other cause is expected
            # to resurface on the read/write that follows, so it is recorded
            # here rather than raised.
            logger.debug("Secret %s was not created: %s", self.secret_name, e)
            return False

        logger.info("Created new secret: %s", self.secret_name)
        return True

    @staticmethod
    def _fallback_key(user_email: str) -> str:
        """Return the environment variable name used for *user_email*."""
        return f"OAUTH_TOKEN_{user_email.replace('@', '_').replace('.', '_')}"

    def _store_fallback(self, user_email: str, refresh_token: str) -> bool:
        """Fallback storage using environment variables (not persistent)."""
        try:
            # Store in environment for current session only
            os.environ[self._fallback_key(user_email)] = refresh_token
        except (TypeError, ValueError, OSError) as e:
            # Raised for non-string values or names/values the OS rejects.
            logger.error("❌ Fallback storage failed: %s", e)
            return False

        logger.warning("⚠️ Using fallback storage for %s (not persistent)", user_email)
        return True

    def _get_fallback(self, user_email: str) -> Optional[str]:
        """Fallback retrieval from environment variables."""
        token = os.getenv(self._fallback_key(user_email))
        if token:
            logger.warning("⚠️ Using fallback token for %s", user_email)
        return token

    def _get_timestamp(self) -> str:
        """Return the current UTC time as a naive ISO-8601 string."""
        # Same text as the deprecated datetime.utcnow().isoformat(): take an
        # aware UTC time and drop the tzinfo so no "+00:00" suffix is emitted.
        return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
# Global instance shared by the helper coroutines in this module.
# Constructing it here runs OAuthTokenManager.__init__ at import time, which
# attempts to create the Secret Manager client.
oauth_manager = OAuthTokenManager()
# Usage example for integration:
async def save_oauth_token_after_auth(user_email: str, credentials):
    """Persist the refresh token carried by *credentials* after a successful OAuth flow.

    Nothing happens when ``credentials`` has no truthy ``refresh_token``
    attribute. The outcome of the store call is only logged; the coroutine
    returns None either way.
    """
    # Guard: skip credentials that carry no refresh token.
    if not hasattr(credentials, 'refresh_token') or not credentials.refresh_token:
        return

    stored = await oauth_manager.store_refresh_token(user_email, credentials.refresh_token)
    if not stored:
        logger.error(f"Failed to save OAuth token for {user_email}")
        return
    logger.info(f"OAuth token saved for {user_email}")
async def load_oauth_token_on_startup(user_email: str):
    """Look up the stored refresh token for *user_email* at app startup.

    Returns the token when ``oauth_manager`` has one, otherwise None. The
    result is logged in both cases.
    """
    token = await oauth_manager.get_refresh_token(user_email)
    if not token:
        logger.warning(f"No stored OAuth token for {user_email}")
        return None

    logger.info(f"OAuth token restored for {user_email}")
    return token