voiceCal / oauth_persistence.py
Peter Michael Gits
feat: Deploy complete VoiceCal application with all files v0.5.6
5e8a657
"""
OAuth Token Persistence for Hugging Face Spaces
Stores refresh tokens in Google Cloud Secret Manager programmatically
"""
import logging
from typing import Optional, Dict, Any
import json
import os
try:
from google.cloud import secretmanager
from google.oauth2 import service_account
except ImportError:
secretmanager = None
service_account = None
logger = logging.getLogger(__name__)
class OAuthTokenManager:
"""Manages OAuth tokens with Secret Manager persistence"""
def __init__(self):
self.project_id = os.getenv('GOOGLE_CLOUD_PROJECT_ID', 'chatcal-voice')
self.secret_name = "oauth-refresh-tokens"
self.client = None
# Initialize Secret Manager client
self._init_secret_manager()
def _init_secret_manager(self):
"""Initialize Google Cloud Secret Manager client"""
try:
if secretmanager is None:
logger.warning("google-cloud-secret-manager not available")
return
# Try to initialize with default credentials or service account
self.client = secretmanager.SecretManagerServiceClient()
logger.info("βœ… Secret Manager client initialized")
except Exception as e:
logger.warning(f"❌ Failed to initialize Secret Manager: {e}")
async def store_refresh_token(self, user_email: str, refresh_token: str) -> bool:
"""Store refresh token in Secret Manager"""
if not self.client:
logger.warning("Secret Manager not available, using fallback storage")
return self._store_fallback(user_email, refresh_token)
try:
# Get existing tokens
existing_tokens = await self.get_all_tokens()
# Update with new token
existing_tokens[user_email] = {
"refresh_token": refresh_token,
"stored_at": self._get_timestamp()
}
# Store back to Secret Manager
secret_value = json.dumps(existing_tokens)
parent = f"projects/{self.project_id}"
secret_id = self.secret_name
# Create secret if it doesn't exist
try:
self.client.create_secret(
request={
"parent": parent,
"secret_id": secret_id,
"secret": {"replication": {"automatic": {}}},
}
)
logger.info(f"Created new secret: {secret_id}")
except Exception:
# Secret already exists
pass
# Add new version
self.client.add_secret_version(
request={
"parent": f"{parent}/secrets/{secret_id}",
"payload": {"data": secret_value.encode("UTF-8")},
}
)
logger.info(f"βœ… Stored refresh token for {user_email}")
return True
except Exception as e:
logger.error(f"❌ Failed to store refresh token: {e}")
return self._store_fallback(user_email, refresh_token)
async def get_refresh_token(self, user_email: str) -> Optional[str]:
"""Retrieve refresh token from Secret Manager"""
if not self.client:
return self._get_fallback(user_email)
try:
secret_path = f"projects/{self.project_id}/secrets/{self.secret_name}/versions/latest"
response = self.client.access_secret_version(request={"name": secret_path})
secret_value = response.payload.data.decode("UTF-8")
tokens = json.loads(secret_value)
user_data = tokens.get(user_email, {})
refresh_token = user_data.get("refresh_token")
if refresh_token:
logger.info(f"βœ… Retrieved refresh token for {user_email}")
return refresh_token
else:
logger.warning(f"⚠️ No refresh token found for {user_email}")
return None
except Exception as e:
logger.error(f"❌ Failed to retrieve refresh token: {e}")
return self._get_fallback(user_email)
async def get_all_tokens(self) -> Dict[str, Any]:
"""Get all stored tokens"""
if not self.client:
return {}
try:
secret_path = f"projects/{self.project_id}/secrets/{self.secret_name}/versions/latest"
response = self.client.access_secret_version(request={"name": secret_path})
secret_value = response.payload.data.decode("UTF-8")
return json.loads(secret_value)
except Exception:
return {}
def _store_fallback(self, user_email: str, refresh_token: str) -> bool:
"""Fallback storage using environment variables (not persistent)"""
try:
# Store in environment for current session only
os.environ[f"OAUTH_TOKEN_{user_email.replace('@', '_').replace('.', '_')}"] = refresh_token
logger.warning(f"⚠️ Using fallback storage for {user_email} (not persistent)")
return True
except Exception as e:
logger.error(f"❌ Fallback storage failed: {e}")
return False
def _get_fallback(self, user_email: str) -> Optional[str]:
"""Fallback retrieval from environment variables"""
env_key = f"OAUTH_TOKEN_{user_email.replace('@', '_').replace('.', '_')}"
token = os.getenv(env_key)
if token:
logger.warning(f"⚠️ Using fallback token for {user_email}")
return token
def _get_timestamp(self) -> str:
"""Get current timestamp"""
from datetime import datetime
return datetime.utcnow().isoformat()
# Global instance
oauth_manager = OAuthTokenManager()
# Usage example for integration:
async def save_oauth_token_after_auth(user_email: str, credentials):
"""Call this after successful OAuth flow"""
if hasattr(credentials, 'refresh_token') and credentials.refresh_token:
success = await oauth_manager.store_refresh_token(user_email, credentials.refresh_token)
if success:
logger.info(f"OAuth token saved for {user_email}")
else:
logger.error(f"Failed to save OAuth token for {user_email}")
async def load_oauth_token_on_startup(user_email: str):
"""Call this on app startup to restore tokens"""
refresh_token = await oauth_manager.get_refresh_token(user_email)
if refresh_token:
logger.info(f"OAuth token restored for {user_email}")
return refresh_token
else:
logger.warning(f"No stored OAuth token for {user_email}")
return None