Spaces:
Sleeping
Sleeping
| import os | |
| import json | |
| import pickle | |
| import base64 | |
| from pathlib import Path | |
| from typing import Optional, Dict, Any | |
| from cryptography.fernet import Fernet | |
| import google.auth.transport.requests | |
| import google_auth_oauthlib.flow | |
| import googleapiclient.discovery | |
| from google.oauth2.credentials import Credentials | |
| from google.auth.transport.requests import Request | |
| import webbrowser | |
| import threading | |
| import time | |
| from http.server import HTTPServer, BaseHTTPRequestHandler | |
| from urllib.parse import urlparse,parse_qs | |
| from logger import logger | |
| from dotenv import load_dotenv | |
# Load variables from a local .env file into the process environment, then
# read the redirect URI that is handed to Google during the OAuth flow.
load_dotenv()
redirect_uri = os.environ.get("GOOGLE_REDIRECT_URI")
class OAuthCallbackHandler(BaseHTTPRequestHandler):
    """HTTP request handler for the OAuth callback.

    A GET request whose query string contains a ``code`` parameter stores that
    code on the owning server object as ``server.auth_code`` and answers with
    a success page (HTTP 200). Any other GET request is answered with an
    error page (HTTP 400).
    """

    _SUCCESS_HTML = """
    <html>
    <head><title>Authentication Successful</title></head>
    <body style="font-family: Arial, sans-serif; text-align: center; padding: 50px;">
    <h1 style="color: #4CAF50;">✅ Authentication Successful!</h1>
    <p>You have successfully authenticated with Gmail.</p>
    <p>You can now close this window and return to Claude Desktop.</p>
    <script>
    setTimeout(function() {
    window.close();
    }, 3000);
    </script>
    </body>
    </html>
    """

    _ERROR_HTML = """
    <html>
    <head><title>Authentication Error</title></head>
    <body style="font-family: Arial, sans-serif; text-align: center; padding: 50px;">
    <h1 style="color: #f44336;">❌ Authentication Failed</h1>
    <p>There was an error during authentication.</p>
    <p>Please try again.</p>
    </body>
    </html>
    """

    def do_GET(self):
        """Handle GET request (OAuth callback)."""
        # ``urlparse`` and ``parse_qs`` are the functions imported from
        # urllib.parse, so they are called directly (``urlparse.urlparse``
        # would raise AttributeError on every request).
        query_params = parse_qs(urlparse(self.path).query)
        codes = query_params.get('code')
        if codes:
            # Store the authorization code for whoever owns the server
            self.server.auth_code = codes[0]
            self._send_html(200, self._SUCCESS_HTML)
        else:
            self._send_html(400, self._ERROR_HTML)

    def _send_html(self, status, html):
        """Send ``html`` as a complete UTF-8 response with the given status."""
        self.send_response(status)
        # The pages contain emoji, so the charset must be declared explicitly
        self.send_header('Content-type', 'text/html; charset=utf-8')
        self.end_headers()
        self.wfile.write(html.encode('utf-8'))

    def log_message(self, format, *args):
        """Suppress server log messages."""
        pass
class GmailOAuthManager:
    """Handle Gmail OAuth 2.0 sign-in and encrypted token storage for several accounts"""

    # OAuth scopes requested for the Gmail API
    SCOPES = [
        'https://www.googleapis.com/auth/gmail.readonly',
        'https://www.googleapis.com/auth/gmail.modify',
    ]
def __init__(self, credentials_dir: Optional[str] = None):
    """Initialize OAuth manager

    Creates the credentials directory if needed, sets up encryption and
    loads the previously selected account, if any.

    Args:
        credentials_dir: Directory to store credentials (defaults to ~/.mailquery_oauth)
    """
    if credentials_dir is None:
        credentials_dir = os.path.expanduser("~/.mailquery_oauth")
    self.credentials_dir = Path(credentials_dir)
    # parents=True so that a nested, not-yet-existing directory also works
    self.credentials_dir.mkdir(parents=True, exist_ok=True)

    # File paths
    self.client_secrets_file = self.credentials_dir / "client_secret.json"
    self.accounts_file = self.credentials_dir / "accounts.json"
    self.encryption_key_file = self.credentials_dir / "key.key"
    self.current_account_file = self.credentials_dir / "current_account.txt"

    # Initialize encryption
    self._init_encryption()

    # OAuth flow settings (module-level value read from GOOGLE_REDIRECT_URI)
    self.redirect_uri = redirect_uri

    # State of a manual authentication attempt; set explicitly so the
    # attributes always exist
    self._pending_auth_url = None
    self._auth_completed = False

    # Current account
    self.current_account_email = self._load_current_account()
def _init_encryption(self):
    """Initialize encryption for secure credential storage

    Reads the Fernet key from ``self.encryption_key_file`` when that file
    exists; otherwise generates a new key and stores it there. Sets
    ``self.encryption_key`` and ``self.cipher_suite``.
    """
    if self.encryption_key_file.exists():
        with open(self.encryption_key_file, 'rb') as key_file:
            self.encryption_key = key_file.read()
    else:
        self.encryption_key = Fernet.generate_key()
        # Create the file owner-only from the start, so the key is never on
        # disk with default (possibly world-readable) permissions.
        fd = os.open(
            self.encryption_key_file,
            os.O_WRONLY | os.O_CREAT | os.O_TRUNC,
            0o600,
        )
        with os.fdopen(fd, 'wb') as key_file:
            key_file.write(self.encryption_key)
        # Still enforce the mode in case the file already existed
        os.chmod(self.encryption_key_file, 0o600)
    self.cipher_suite = Fernet(self.encryption_key)
| def _load_current_account(self) -> Optional[str]: | |
| """Load the currently selected account""" | |
| if self.current_account_file.exists(): | |
| try: | |
| with open(self.current_account_file, 'r') as f: | |
| return f.read().strip() | |
| except Exception as e: | |
| logger.error(f"Failed to load current account: {e}") | |
| return None | |
def _save_current_account(self, email: str):
    """Persist ``email`` as the selected account and remember it on the instance.

    Failures are logged, not raised.
    """
    try:
        with self.current_account_file.open('w') as handle:
            handle.write(email)
        # Only update the in-memory value once the write succeeded
        self.current_account_email = email
        logger.info(f"Set current account to: {email}")
    except Exception as e:
        logger.error(f"Failed to save current account: {e}")
def setup_client_secrets(self, client_id: str, client_secret: str):
    """Setup OAuth client secrets

    Writes a "web" client configuration to ``self.client_secrets_file``
    using ``self.redirect_uri`` as the only redirect URI.

    Args:
        client_id: Google OAuth 2.0 client ID
        client_secret: Google OAuth 2.0 client secret
    """
    client_config = {
        "web": {
            "client_id": client_id,
            "client_secret": client_secret,
            "auth_uri": "https://accounts.google.com/o/oauth2/auth",
            "token_uri": "https://oauth2.googleapis.com/token",
            "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
            "redirect_uris": [self.redirect_uri]
        }
    }
    with open(self.client_secrets_file, 'w') as f:
        json.dump(client_config, f, indent=2)
    # The file holds the client secret: make it readable only by the owner,
    # like the key and accounts files.
    os.chmod(self.client_secrets_file, 0o600)
    logger.info("Client secrets saved successfully")
| def _encrypt_data(self, data: Any) -> bytes: | |
| """Encrypt data using Fernet encryption""" | |
| serialized_data = pickle.dumps(data) | |
| return self.cipher_suite.encrypt(serialized_data) | |
| def _decrypt_data(self, encrypted_data: bytes) -> Any: | |
| """Decrypt data using Fernet encryption""" | |
| decrypted_data = self.cipher_suite.decrypt(encrypted_data) | |
| return pickle.loads(decrypted_data) | |
| def get_authorization_url(self) -> str: | |
| """Get the authorization URL for OAuth flow | |
| Returns: | |
| Authorization URL that user should visit | |
| """ | |
| if not self.client_secrets_file.exists(): | |
| raise ValueError("Client secrets not found. Please run setup first.") | |
| flow = google_auth_oauthlib.flow.Flow.from_client_secrets_file( | |
| str(self.client_secrets_file), | |
| scopes=self.SCOPES | |
| ) | |
| flow.redirect_uri = self.redirect_uri | |
| print("👉 redirect_uri being sent to Google:", self.redirect_uri, flush=True) | |
| auth_url, _ = flow.authorization_url( | |
| access_type='offline', | |
| include_granted_scopes='true', | |
| prompt='consent' # Force consent to get refresh token | |
| ) | |
| return auth_url | |
def authenticate_interactive(self, timeout: float = 10, poll_interval: float = 2) -> bool:
    """Interactive authentication flow for Hugging Face Spaces

    Generates the authorization URL, stores it in ``self._pending_auth_url``,
    prints it, and then polls until authentication is observed or the
    timeout expires. This method does not exchange the authorization code
    itself; it only watches for ``self._auth_completed`` being set or for
    valid stored credentials appearing.

    Args:
        timeout: Seconds to wait for completion (default 10)
        poll_interval: Seconds to sleep between checks (default 2)

    Returns:
        True if authentication successful, False otherwise
    """
    try:
        # Check if already authenticated
        if self.is_authenticated():
            logger.info("Already authenticated")
            return True

        # Get authorization URL
        auth_url = self.get_authorization_url()
        logger.info("Running on Hugging Face Spaces")
        logger.info(f"Authentication URL generated: {auth_url}")
        logger.info("User must visit the URL manually to complete authentication")

        # Store the auth URL for the Gradio interface to use
        self._pending_auth_url = auth_url
        self._auth_completed = False

        # For setup_oauth.py and testing contexts, print the URL and wait
        # briefly to see if authentication completes
        print("\n🌐 Please visit this URL to authenticate:")
        print(f" {auth_url}")
        print("\n⏳ Waiting for authentication completion...")

        # A monotonic clock keeps the deadline unaffected by wall-clock changes
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            # Check if authentication was completed via callback
            if getattr(self, '_auth_completed', False):
                logger.info("Authentication completed successfully!")
                return True
            # Check if user is now authenticated (credentials were saved)
            if self.is_authenticated():
                self._auth_completed = True
                logger.info("Authentication verified successful!")
                return True
            time.sleep(poll_interval)

        # Timeout reached - authentication not completed
        logger.info("Authentication timeout. Please complete authentication via the provided URL.")
        return False
    except Exception as e:
        logger.error(f"Authentication failed: {e}")
        return False
| def complete_hf_spaces_auth(self, auth_code: str) -> bool: | |
| """Complete authentication for HF Spaces with received auth code | |
| Args: | |
| auth_code: Authorization code received from OAuth callback | |
| Returns: | |
| True if authentication successful, False otherwise | |
| """ | |
| try: | |
| success = self._exchange_code_for_credentials(auth_code) | |
| if success: | |
| # Mark authentication as completed | |
| self._auth_completed = True | |
| logger.info("HF Spaces authentication marked as completed") | |
| return success | |
| except Exception as e: | |
| logger.error(f"Failed to complete HF Spaces authentication: {e}") | |
| return False | |
def _exchange_code_for_credentials(self, auth_code: str) -> bool:
    """Trade an authorization code for credentials and store them

    On success the credentials are saved under the account's email address
    and that account becomes the current one.

    Args:
        auth_code: Authorization code from OAuth flow

    Returns:
        True if successful, False otherwise
    """
    try:
        oauth_flow = google_auth_oauthlib.flow.Flow.from_client_secrets_file(
            str(self.client_secrets_file),
            scopes=self.SCOPES
        )
        oauth_flow.redirect_uri = self.redirect_uri
        oauth_flow.fetch_token(code=auth_code)
        new_credentials = oauth_flow.credentials

        # The account is keyed by the email address the credentials belong to
        account_email = self._get_email_from_credentials(new_credentials)
        if account_email:
            self._save_credentials(account_email, new_credentials)
            self._save_current_account(account_email)
            logger.info("Authentication successful!")
            return True

        logger.error("Failed to get user email from credentials")
        return False
    except Exception as e:
        logger.error(f"Failed to exchange code for credentials: {e}")
        return False
| def get_pending_auth_url(self) -> str: | |
| """Get the pending authentication URL for manual completion | |
| Returns: | |
| Authentication URL string or None if not available | |
| """ | |
| return getattr(self, '_pending_auth_url', None) | |
| def get_hf_redirect_uri(self) -> str: | |
| """Get the Hugging Face Spaces redirect URI | |
| Returns: | |
| Redirect URI string | |
| # """ | |
| # space_id = os.getenv('SPACE_ID') | |
| # space_author = os.getenv('SPACE_AUTHOR', 'username') | |
| return redirect_uri | |
# For running on a local machine, use the commented-out variant below instead
# (it listens for the OAuth callback on localhost:8080 and opens a browser).
| # def authenticate_interactive(self) -> bool: | |
| # """Interactive authentication flow that opens browser | |
| # Returns: | |
| # True if authentication successful, False otherwise | |
| # """ | |
| # try: | |
| # # Start local HTTP server for OAuth callback | |
| # server = HTTPServer(('localhost', 8080), OAuthCallbackHandler) | |
| # server.auth_code = None | |
| # # Get authorization URL | |
| # auth_url = self.get_authorization_url() | |
| # logger.info("Opening browser for authentication...") | |
| # logger.info(f"If browser doesn't open, visit: {auth_url}") | |
| # # Open browser | |
| # webbrowser.open(auth_url) | |
| # # Start server in background thread | |
| # server_thread = threading.Thread(target=server.handle_request) | |
| # server_thread.daemon = True | |
| # server_thread.start() | |
| # # Wait for callback (max 5 minutes) | |
| # timeout = 300 # 5 minutes | |
| # start_time = time.time() | |
| # while server.auth_code is None and (time.time() - start_time) < timeout: | |
| # time.sleep(1) | |
| # if server.auth_code is None: | |
| # logger.error("Authentication timed out") | |
| # return False | |
| # # Exchange authorization code for credentials | |
| # flow = google_auth_oauthlib.flow.Flow.from_client_secrets_file( | |
| # str(self.client_secrets_file), | |
| # scopes=self.SCOPES | |
| # ) | |
| # flow.redirect_uri = self.redirect_uri | |
| # flow.fetch_token(code=server.auth_code) | |
| # credentials = flow.credentials | |
| # # Get user email from credentials | |
| # user_email = self._get_email_from_credentials(credentials) | |
| # if not user_email: | |
| # logger.error("Failed to get user email from credentials") | |
| # return False | |
| # # Save encrypted credentials for this account | |
| # self._save_credentials(user_email, credentials) | |
| # # Set as current account | |
| # self._save_current_account(user_email) | |
| # logger.info("Authentication successful!") | |
| # return True | |
| # except Exception as e: | |
| # logger.error(f"Authentication failed: {e}") | |
| # return False | |
def _get_email_from_credentials(self, credentials: Credentials) -> Optional[str]:
    """Look up the email address of the account that owns ``credentials``

    Queries the Gmail profile of user 'me'.

    Returns:
        The profile's 'emailAddress' value, or None if the lookup failed
    """
    try:
        gmail = googleapiclient.discovery.build(
            'gmail', 'v1', credentials=credentials
        )
        return gmail.users().getProfile(userId='me').execute().get('emailAddress')
    except Exception as e:
        logger.error(f"Failed to get email from credentials: {e}")
        return None
def _save_credentials(self, email: str, credentials: Credentials):
    """Save encrypted credentials for a specific account

    The credentials are encrypted, base64-encoded and stored under ``email``
    in the accounts JSON file.

    Args:
        email: Account the credentials belong to
        credentials: Credentials object to store

    Raises:
        Exception: Any error while encrypting or writing is logged and re-raised
    """
    try:
        # Load existing accounts
        accounts = self._load_accounts()

        # Encrypt and store credentials
        encrypted_credentials = self._encrypt_data(credentials)
        accounts[email] = base64.b64encode(encrypted_credentials).decode('utf-8')

        # Create the file owner-only from the start, so tokens are never on
        # disk with default (possibly world-readable) permissions.
        fd = os.open(
            self.accounts_file,
            os.O_WRONLY | os.O_CREAT | os.O_TRUNC,
            0o600,
        )
        with os.fdopen(fd, 'w') as f:
            json.dump(accounts, f, indent=2)

        # Still enforce the mode in case the file already existed
        os.chmod(self.accounts_file, 0o600)
        logger.info(f"Credentials saved for account: {email}")
    except Exception as e:
        logger.error(f"Failed to save credentials for {email}: {e}")
        raise
| def _load_accounts(self) -> Dict[str, str]: | |
| """Load accounts data""" | |
| if not self.accounts_file.exists(): | |
| return {} | |
| try: | |
| with open(self.accounts_file, 'r') as f: | |
| return json.load(f) | |
| except Exception as e: | |
| logger.error(f"Failed to load accounts: {e}") | |
| return {} | |
| def _load_credentials(self, email: str) -> Optional[Credentials]: | |
| """Load and decrypt credentials for a specific account""" | |
| accounts = self._load_accounts() | |
| if email not in accounts: | |
| return None | |
| try: | |
| encrypted_credentials = base64.b64decode(accounts[email]) | |
| credentials = self._decrypt_data(encrypted_credentials) | |
| return credentials | |
| except Exception as e: | |
| logger.error(f"Failed to load credentials for {email}: {e}") | |
| return None | |
| def get_valid_credentials(self, email: str = None) -> Optional[Credentials]: | |
| """Get valid credentials for an account, refreshing if necessary | |
| Args: | |
| email: Email address of account (uses current account if None) | |
| Returns: | |
| Valid Credentials object or None if authentication required | |
| """ | |
| if email is None: | |
| email = self.current_account_email | |
| if not email: | |
| logger.warning("No current account set") | |
| return None | |
| credentials = self._load_credentials(email) | |
| if not credentials: | |
| logger.warning(f"No stored credentials found for {email}") | |
| return None | |
| # Refresh if expired | |
| if credentials.expired and credentials.refresh_token: | |
| try: | |
| logger.info(f"Refreshing expired credentials for {email}...") | |
| credentials.refresh(Request()) | |
| self._save_credentials(email, credentials) | |
| logger.info("Credentials refreshed successfully") | |
| except Exception as e: | |
| logger.error(f"Failed to refresh credentials for {email}: {e}") | |
| return None | |
| if not credentials.valid: | |
| logger.warning(f"Credentials are not valid for {email}") | |
| return None | |
| return credentials | |
| def is_authenticated(self, email: str = None) -> bool: | |
| """Check if user is authenticated | |
| Args: | |
| email: Email address to check (uses current account if None) | |
| Returns: | |
| True if valid credentials exist, False otherwise | |
| """ | |
| return self.get_valid_credentials(email) is not None | |
def switch_account(self, email: str) -> bool:
    """Make another authenticated account the current one

    Args:
        email: Email address to switch to

    Returns:
        True if switch successful, False if account not found or not authenticated
    """
    if not self.is_authenticated(email):
        logger.error(f"Account {email} is not authenticated")
        return False

    self._save_current_account(email)
    logger.info(f"Switched to account: {email}")
    return True
| def list_accounts(self) -> Dict[str, bool]: | |
| """List all stored accounts and their authentication status | |
| Returns: | |
| Dictionary mapping email addresses to authentication status | |
| """ | |
| accounts = self._load_accounts() | |
| result = {} | |
| for email in accounts.keys(): | |
| result[email] = self.is_authenticated(email) | |
| return result | |
def remove_account(self, email: str):
    """Delete an account's stored credentials

    If the removed account was the current one, the current-account file is
    deleted and the in-memory selection is cleared.

    Args:
        email: Email address to remove
    """
    accounts = self._load_accounts()
    if email not in accounts:
        logger.warning(f"Account {email} not found")
        return

    accounts.pop(email)

    # Write the remaining accounts back
    with open(self.accounts_file, 'w') as f:
        json.dump(accounts, f, indent=2)

    was_current = self.current_account_email == email
    if was_current:
        if self.current_account_file.exists():
            self.current_account_file.unlink()
        self.current_account_email = None

    logger.info(f"Removed account: {email}")
def clear_credentials(self):
    """Delete the accounts file and the current-account file, and forget the selection"""
    for stale_file in (self.accounts_file, self.current_account_file):
        if stale_file.exists():
            stale_file.unlink()
    self.current_account_email = None
    logger.info("All credentials cleared")
| def get_gmail_service(self, email: str = None): | |
| """Get authenticated Gmail service object | |
| Args: | |
| email: Email address (uses current account if None) | |
| Returns: | |
| Gmail service object or None if not authenticated | |
| """ | |
| credentials = self.get_valid_credentials(email) | |
| if not credentials: | |
| return None | |
| try: | |
| service = googleapiclient.discovery.build( | |
| 'gmail', 'v1', credentials=credentials | |
| ) | |
| return service | |
| except Exception as e: | |
| logger.error(f"Failed to build Gmail service: {e}") | |
| return None | |
| def get_user_email(self, email: str = None) -> Optional[str]: | |
| """Get the authenticated user's email address | |
| Args: | |
| email: Email address (uses current account if None) | |
| Returns: | |
| User's email address or None if not authenticated | |
| """ | |
| if email is None: | |
| return self.current_account_email | |
| return email if self.is_authenticated(email) else None | |
| def get_current_account(self) -> Optional[str]: | |
| """Get the currently selected account | |
| Returns: | |
| Current account email or None if no account selected | |
| """ | |
| return self.current_account_email | |
# Shared module-level manager; created at import time and stores its files
# in the relative directory "secure_data".
oauth_manager = GmailOAuthManager("secure_data")