#!/usr/bin/env python3
"""
Google OAuth Token Refresh Script for VoiceCal.ai

This script automatically refreshes Google OAuth access tokens and updates:
1. Local .env file
2. HuggingFace Spaces secrets
3. Credentials JSON file (if exists)

Designed for cron job execution with logging and email notifications.

Usage:
    python3 scripts/refresh_token_only.py [--notify-email EMAIL] [--verbose]
"""

import json
import os
import sys
import logging
import argparse
from datetime import datetime, timedelta, timezone
from pathlib import Path


def setup_logging(verbose: bool = False) -> Path:
    """Configure root logging to a timestamped file and to stdout.

    The ``logs`` directory is created relative to the current working
    directory if it does not exist yet.

    Args:
        verbose: When true, log at DEBUG level instead of INFO.

    Returns:
        Path of the log file created for this run.
    """
    log_dir = Path("logs")
    log_dir.mkdir(exist_ok=True)

    # One log file per run, named after the local start time.
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    log_file = log_dir / f"token_refresh_{timestamp}.log"

    log_format = "%(asctime)s | %(levelname)s | %(message)s"
    log_level = logging.DEBUG if verbose else logging.INFO

    logging.basicConfig(
        level=log_level,
        format=log_format,
        handlers=[
            # Messages contain emoji; pin the encoding so the file handler
            # does not depend on the locale the job happens to run under.
            logging.FileHandler(log_file, encoding="utf-8"),
            logging.StreamHandler(sys.stdout),
        ],
    )

    return log_file


def _mask_secret(secret: str, visible: int = 6) -> str:
    """Return a log-safe preview of *secret*: a short prefix plus its length."""
    return f"{secret[:visible]}... ({len(secret)} chars)"


def load_credentials() -> dict:
    """Load OAuth credentials from environment variables.

    If a ``.env`` file exists in the current working directory it is loaded
    first via ``python-dotenv`` (imported only in that case).

    Returns:
        Dict with the keys ``refresh_token``, ``client_id`` and
        ``client_secret``.

    Raises:
        ValueError: If any of GOOGLE_REFRESH_TOKEN, GOOGLE_CLIENT_ID or
            GOOGLE_CLIENT_SECRET is missing or empty.
    """
    logging.info("🔍 Loading OAuth credentials...")

    env_path = Path(".env")
    if env_path.exists():
        logging.info("📝 Loading from environment variables")
        from dotenv import load_dotenv
        load_dotenv()

    refresh_token = os.environ.get('GOOGLE_REFRESH_TOKEN')
    client_id = os.environ.get('GOOGLE_CLIENT_ID')
    client_secret = os.environ.get('GOOGLE_CLIENT_SECRET')

    if not all([refresh_token, client_id, client_secret]):
        logging.error("❌ Missing required environment variables:")
        logging.error(f"   GOOGLE_REFRESH_TOKEN: {'✓' if refresh_token else '✗'}")
        logging.error(f"   GOOGLE_CLIENT_ID: {'✓' if client_id else '✗'}")
        logging.error(f"   GOOGLE_CLIENT_SECRET: {'✓' if client_secret else '✗'}")
        raise ValueError("Missing required environment variables")

    logging.info("✅ Credentials loaded")
    logging.info(f"   Client ID: {client_id[:20]}...")
    # The refresh token is a long-lived secret: log only a short preview.
    logging.info(f"   Refresh Token: {_mask_secret(refresh_token)}")

    return {
        'refresh_token': refresh_token,
        'client_id': client_id,
        'client_secret': client_secret,
    }


def _extract_oauth_error(response) -> str:
    """Return the most specific error text available in a failed token response.

    Prefers the ``error_description`` / ``error`` fields of a JSON error
    body and falls back to the raw response text.
    """
    try:
        payload = response.json()
    except ValueError:
        payload = None

    if isinstance(payload, dict):
        detail = payload.get('error_description') or payload.get('error')
        if detail:
            return str(detail)

    return response.text or f"HTTP {response.status_code}"


def refresh_access_token(credentials: dict) -> dict:
    """Refresh Google OAuth access token.

    Args:
        credentials: Dict with ``client_id``, ``client_secret`` and
            ``refresh_token`` (as returned by ``load_credentials``).

    Returns:
        Dict with ``access_token`` and ``expires_in`` (seconds; 3600 is used
        when the response does not include it).

    Raises:
        requests.exceptions.RequestException: On network-level failures.
        RuntimeError: If the token endpoint answers with a non-200 status.
        ValueError, KeyError, TypeError: If a 200 response does not contain
            a usable ``access_token``.
    """
    # Imported here rather than at module level: this is the only user, and
    # a missing dependency is then raised inside main()'s error handling
    # (logged and, if configured, emailed) instead of before logging exists.
    import requests

    logging.info("🔄 Refreshing access token...")

    data = {
        'client_id': credentials['client_id'],
        'client_secret': credentials['client_secret'],
        'refresh_token': credentials['refresh_token'],
        'grant_type': 'refresh_token',
    }

    try:
        response = requests.post(
            'https://oauth2.googleapis.com/token', data=data, timeout=30
        )
    except requests.exceptions.RequestException as e:
        logging.error(f"❌ Network error during token refresh: {e}")
        raise

    if response.status_code != 200:
        # Report what the server actually said instead of assuming every
        # 400 means "expired or revoked".
        error_msg = _extract_oauth_error(response)
        logging.error(f"❌ Failed to refresh token: {error_msg}")
        logging.error(f"   Response: {response.text if response.text else 'No response'}")
        raise RuntimeError(f"Failed to refresh token: {error_msg}")

    try:
        tokens = response.json()
        access_token = tokens['access_token']
    except (ValueError, KeyError, TypeError) as e:
        logging.error(f"❌ Token refresh failed: {e}")
        raise

    expires_in = tokens.get('expires_in', 3600)

    logging.info("✅ Access token refreshed successfully!")
    # Never write a large part of a bearer token to the log file.
    logging.info(f"   New token: {_mask_secret(access_token)}")
    logging.info(f"   Expires in: {expires_in} seconds ({expires_in/3600:.1f} hours)")

    return {
        'access_token': access_token,
        'expires_in': expires_in,
    }


def update_env_file(access_token: str, expiry_seconds: int) -> bool:
    """Update .env file with new access token.

    Existing ``GOOGLE_ACCESS_TOKEN=`` / ``GOOGLE_TOKEN_EXPIRY=`` lines are
    replaced in place; missing ones are appended. All other lines are kept.

    Args:
        access_token: New access token value.
        expiry_seconds: Token lifetime in seconds, counted from now (UTC).

    Returns:
        True if the file was rewritten, False if ``.env`` does not exist.
    """
    logging.info("💾 Updating .env file...")

    env_path = Path(".env")
    if not env_path.exists():
        logging.warning("⚠️ .env file not found")
        return False

    # Timezone-aware so the stored ISO string carries its UTC offset.
    expiry_time = datetime.now(timezone.utc) + timedelta(seconds=expiry_seconds)
    token_line = f'GOOGLE_ACCESS_TOKEN={access_token}\n'
    expiry_line = f'GOOGLE_TOKEN_EXPIRY={expiry_time.isoformat()}\n'

    with open(env_path, 'r', encoding='utf-8') as f:
        lines = f.readlines()

    updated_lines = []
    found_access_token = False
    found_token_expiry = False

    for line in lines:
        if line.startswith('GOOGLE_ACCESS_TOKEN='):
            updated_lines.append(token_line)
            found_access_token = True
        elif line.startswith('GOOGLE_TOKEN_EXPIRY='):
            updated_lines.append(expiry_line)
            found_token_expiry = True
        else:
            updated_lines.append(line)

    missing_lines = []
    if not found_access_token:
        missing_lines.append(token_line)
    if not found_token_expiry:
        missing_lines.append(expiry_line)

    if missing_lines:
        # A file whose last line has no trailing newline would otherwise get
        # the new key glued onto that line ("FOO=barGOOGLE_ACCESS_TOKEN=...").
        if updated_lines and not updated_lines[-1].endswith('\n'):
            updated_lines[-1] += '\n'
        updated_lines.extend(missing_lines)

    with open(env_path, 'w', encoding='utf-8') as f:
        f.writelines(updated_lines)

    logging.info("✅ .env file updated")
    return True


def update_credentials_file(access_token: str, expiry_seconds: int, credentials: dict) -> bool:
    """Update Google Calendar credentials JSON file.

    Sets the ``token`` and ``expiry`` keys of
    ``app/credentials/google_calendar_credentials.json`` and leaves every
    other key untouched.

    Args:
        access_token: New access token value.
        expiry_seconds: Token lifetime in seconds, counted from now (UTC).
        credentials: Accepted for call compatibility; not used here.

    Returns:
        True if the file was rewritten, False if it does not exist.
    """
    logging.info("📝 Updating credentials file...")

    creds_path = Path("app/credentials/google_calendar_credentials.json")
    if not creds_path.exists():
        logging.warning(f"⚠️ Credentials file not found: {creds_path}")
        return False

    expiry_time = datetime.now(timezone.utc) + timedelta(seconds=expiry_seconds)

    with open(creds_path, 'r', encoding='utf-8') as f:
        creds_data = json.load(f)

    creds_data['token'] = access_token
    creds_data['expiry'] = expiry_time.isoformat()

    with open(creds_path, 'w', encoding='utf-8') as f:
        json.dump(creds_data, f, indent=2)

    logging.info("✅ Credentials file updated")
    return True


def update_huggingface_secrets(access_token: str, expiry_seconds: int) -> bool:
    """Update HuggingFace Spaces secrets with new OAuth tokens.

    Best-effort: every failure is logged and reported through the return
    value instead of being raised.

    Args:
        access_token: New access token value.
        expiry_seconds: Token lifetime in seconds, counted from now (UTC).

    Returns:
        True if both secrets were updated; False if ``huggingface_hub`` is
        not installed, ``HF_TOKEN`` is unset, or the API call failed.
    """
    logging.info("☁️ Updating HuggingFace Spaces secrets...")

    try:
        from huggingface_hub import HfApi

        hf_token = os.environ.get("HF_TOKEN")
        if not hf_token:
            logging.warning("⚠️ No HF_TOKEN found - skipping HuggingFace Secrets update")
            return False

        api = HfApi(token=hf_token)

        # Space whose secrets are updated.
        repo_id = "pgits/voiceCal-ai-v3"

        expiry_time = datetime.now(timezone.utc) + timedelta(seconds=expiry_seconds)

        api.add_space_secret(
            repo_id=repo_id,
            key="GOOGLE_ACCESS_TOKEN",
            value=access_token,
        )
        logging.info("✅ Updated GOOGLE_ACCESS_TOKEN in HF Secrets")

        # Expiry is stored as an ISO 8601 string.
        api.add_space_secret(
            repo_id=repo_id,
            key="GOOGLE_TOKEN_EXPIRY",
            value=expiry_time.isoformat(),
        )
        logging.info("✅ Updated GOOGLE_TOKEN_EXPIRY in HF Secrets")

        logging.info("🎉 HuggingFace Secrets updated successfully!")
        return True

    except ImportError:
        logging.warning("⚠️ huggingface_hub not installed - skipping HF Secrets update")
        return False
    except Exception as e:
        logging.error(f"❌ Failed to update HuggingFace Secrets: {e}")
        return False


def send_email_notification(recipient_email: str, subject: str, body: str) -> bool:
    """Send email notification using SMTP.

    Connection settings come from SMTP_HOST (default ``smtp.gmail.com``),
    SMTP_PORT (default ``587``), SMTP_USERNAME and SMTP_PASSWORD. The
    connection is upgraded with STARTTLS before logging in.

    Args:
        recipient_email: Address the message is sent to.
        subject: Message subject.
        body: Plain-text message body.

    Returns:
        True if the message was sent; False if SMTP credentials are not
        configured or sending failed (the error is logged, never raised).
    """
    try:
        import smtplib
        from email.mime.text import MIMEText
        from email.mime.multipart import MIMEMultipart

        smtp_host = os.environ.get("SMTP_HOST", "smtp.gmail.com")
        smtp_port = int(os.environ.get("SMTP_PORT", "587"))
        smtp_username = os.environ.get("SMTP_USERNAME")
        smtp_password = os.environ.get("SMTP_PASSWORD")

        if not all([smtp_username, smtp_password]):
            logging.warning("⚠️ SMTP credentials not configured - skipping email notification")
            return False

        msg = MIMEMultipart()
        msg['From'] = smtp_username
        msg['To'] = recipient_email
        msg['Subject'] = subject
        msg.attach(MIMEText(body, 'plain'))

        # Without a timeout an unreachable mail server would block the
        # (unattended) job indefinitely.
        with smtplib.SMTP(smtp_host, smtp_port, timeout=30) as server:
            server.starttls()
            server.login(smtp_username, smtp_password)
            server.send_message(msg)

        logging.info(f"✅ Email notification sent to {recipient_email}")
        return True

    except Exception as e:
        logging.error(f"❌ Failed to send email notification: {e}")
        return False


def main() -> None:
    """Main execution function.

    Parses the command line, refreshes the access token, pushes it to the
    .env file, the credentials JSON file and the HuggingFace Space secrets,
    and exits with status 0 on success or 1 on failure. On failure an email
    is sent when ``--notify-email`` was given.
    """
    parser = argparse.ArgumentParser(description='Refresh Google OAuth tokens')
    parser.add_argument('--notify-email', help='Email address for failure notifications')
    parser.add_argument('--verbose', action='store_true', help='Enable verbose logging')
    args = parser.parse_args()

    log_file = setup_logging(args.verbose)

    logging.info("=" * 80)
    logging.info("VoiceCal.ai OAuth Token Refresh Script")
    logging.info(f"Started at: {datetime.now(timezone.utc).isoformat()}")
    logging.info(f"Log file: {log_file}")
    logging.info("=" * 80)

    success = False
    error_message = None

    try:
        credentials = load_credentials()
        tokens = refresh_access_token(credentials)

        # The three targets below report problems through their return value
        # and their own log lines; a skipped target does not fail the run.
        update_env_file(tokens['access_token'], tokens['expires_in'])
        update_credentials_file(tokens['access_token'], tokens['expires_in'], credentials)
        update_huggingface_secrets(tokens['access_token'], tokens['expires_in'])

        success = True

    except Exception as e:
        error_message = str(e)
        logging.error(f"❌ Token refresh failed: {error_message}")

        if args.notify_email:
            logging.info(f"📧 Sending failure notification to {args.notify_email}...")

            subject = "⚠️ VoiceCal.ai OAuth Token Refresh Failed"
            body = f"""
VoiceCal.ai OAuth token refresh failed.

Error: {error_message}
Timestamp: {datetime.now(timezone.utc).isoformat()}
Log file: {log_file}

Please re-authenticate by visiting:
http://localhost:8080/auth/login

--
VoiceCal.ai Automated Token Refresh
""".strip()

            send_email_notification(args.notify_email, subject, body)

    logging.info("=" * 80)
    logging.info(f"Completed at: {datetime.now(timezone.utc).isoformat()}")
    logging.info(f"Status: {'✅ SUCCESS' if success else '❌ FAILED'}")
    logging.info("=" * 80)

    sys.exit(0 if success else 1)


if __name__ == "__main__":
    main()