Spaces:
Sleeping
Sleeping
| import os | |
| import json | |
| import tempfile | |
| import time | |
| from typing import Dict, List, Optional, Any | |
| from google.oauth2 import service_account | |
| from google.oauth2.credentials import Credentials | |
| from googleapiclient.discovery import build | |
| from googleapiclient.errors import HttpError | |
| import streamlit as st | |
| import ssl | |
def retry_on_ssl_error(max_retries=3, delay=1):
    """Build a decorator that retries a call on SSL/connection failures.

    Only ``ssl.SSLError``, ``ConnectionError`` and ``OSError`` trigger another
    attempt; every other exception propagates immediately. When the last
    attempt also fails, the triggering exception is re-raised.

    Args:
        max_retries: Total number of attempts (not the number of *extra*
            tries).
        delay: Base pause in seconds. The pause after attempt ``k`` is
            ``delay * k``, i.e. the back-off grows linearly.

    Returns:
        A decorator. The decorated callable returns whatever the wrapped
        function returns, or ``None`` without calling it when
        ``max_retries`` is less than 1.
    """
    # Imported locally so the block does not depend on a new file-level import.
    import functools

    def decorator(func):
        @functools.wraps(func)  # keep the wrapped function's name and docstring
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_retries + 1):
                try:
                    return func(*args, **kwargs)
                except (ssl.SSLError, ConnectionError, OSError) as e:
                    if attempt == max_retries:
                        print(f"SSL/Connection error after {max_retries} attempts: {e}")
                        raise
                    print(f"SSL/Connection error (attempt {attempt}/{max_retries}): {e}")
                    time.sleep(delay * attempt)  # Linear backoff
            # Only reached when max_retries < 1 and the loop never ran.
            return None
        return wrapper
    return decorator
class GoogleDriveManager:
    """Sync small JSON/YAML/text files with a single Google Drive folder.

    File names are either plain (``'config.json'``) or carry one sub-folder
    level (``'laraandumang/wedding_config.json'``); the part before the first
    ``/`` is looked up as a folder directly under ``folder_id``.

    Drive API errors are reported through Streamlit (``st.error`` /
    ``st.warning``) and turned into ``None`` / ``False`` / ``[]`` results
    instead of being raised.
    """

    _FOLDER_MIME_TYPE = 'application/vnd.google-apps.folder'
    _DRIVE_SCOPES = ['https://www.googleapis.com/auth/drive']

    def __init__(self):
        """Detect the runtime environment and create the local cache directory."""
        self.service = None
        self.folder_id = None
        # SPACE_ID being set is used as the marker for running on Hugging Face.
        self.is_huggingface = os.getenv('SPACE_ID') is not None
        self.temp_dir = "/tmp/wedding_data" if self.is_huggingface else "temp_data"
        # Ensure temp directory exists
        os.makedirs(self.temp_dir, exist_ok=True)

    def initialize(self, folder_id: Optional[str] = None):
        """Initialize Google Drive service and set folder ID.

        Args:
            folder_id: Drive folder to work in. When omitted, the
                ``GOOGLE_DRIVE_FOLDER_ID`` environment variable is used.

        Returns:
            ``True`` when authentication setup did not raise and a folder ID
            is known, ``False`` otherwise (the reason is shown via
            ``st.error``).
        """
        try:
            if self.is_huggingface:
                self._setup_huggingface_auth()
            else:
                self._setup_local_auth()

            # An explicit argument wins over the environment variable.
            self.folder_id = folder_id if folder_id else os.getenv('GOOGLE_DRIVE_FOLDER_ID')
            if not self.folder_id:
                st.error("Google Drive folder ID not found. Please set GOOGLE_DRIVE_FOLDER_ID environment variable.")
                return False
            return True
        except Exception as e:
            st.error(f"Failed to initialize Google Drive: {str(e)}")
            return False

    def _setup_huggingface_auth(self):
        """Set up authentication for Hugging Face Spaces.

        Builds service-account credentials from ``GOOGLE_*`` environment
        variables and stores the resulting Drive client in ``self.service``.

        Raises:
            ValueError: If any required credential variable is missing/empty.
        """
        client_email = os.getenv('GOOGLE_CLIENT_EMAIL')
        service_account_info = {
            "type": "service_account",
            "project_id": os.getenv('GOOGLE_PROJECT_ID'),
            "private_key_id": os.getenv('GOOGLE_PRIVATE_KEY_ID'),
            # The key is stored with literal "\n" sequences; restore newlines.
            "private_key": os.getenv('GOOGLE_PRIVATE_KEY', '').replace('\\n', '\n'),
            "client_email": client_email,
            "client_id": os.getenv('GOOGLE_CLIENT_ID'),
            "auth_uri": "https://accounts.google.com/o/oauth2/auth",
            "token_uri": "https://oauth2.googleapis.com/token",
            "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
            "client_x509_cert_url": f"https://www.googleapis.com/robot/v1/metadata/x509/{client_email}",
        }

        # Validate that all required fields are present
        required_fields = ['project_id', 'private_key_id', 'private_key', 'client_email', 'client_id']
        missing_fields = [field for field in required_fields if not service_account_info.get(field)]
        if missing_fields:
            raise ValueError(f"Missing Google service account credentials: {missing_fields}")

        credentials = service_account.Credentials.from_service_account_info(
            service_account_info,
            scopes=self._DRIVE_SCOPES,
        )
        self.service = build('drive', 'v3', credentials=credentials)

    def _setup_local_auth(self):
        """Set up authentication for local development.

        Loads the service-account file named by ``GOOGLE_SERVICE_ACCOUNT_PATH``.
        When the variable is unset or the file does not exist, a warning is
        shown and ``self.service`` stays ``None`` (local data only).
        """
        service_account_path = os.getenv('GOOGLE_SERVICE_ACCOUNT_PATH')
        if service_account_path and os.path.exists(service_account_path):
            credentials = service_account.Credentials.from_service_account_file(
                service_account_path,
                scopes=self._DRIVE_SCOPES,
            )
            self.service = build('drive', 'v3', credentials=credentials)
        else:
            st.warning("Google service account file not found. Using local data only.")
            self.service = None

    @staticmethod
    def _escape_query_value(value: str) -> str:
        """Escape ``\\`` and ``'`` so *value* is safe inside a quoted Drive query term."""
        return value.replace('\\', '\\\\').replace("'", "\\'")

    def _find_children(self, name: str, parent_id: str, folders_only: bool = False) -> List[Dict[str, Any]]:
        """Return the non-trashed entries called *name* directly under *parent_id*."""
        query = (
            f"name='{self._escape_query_value(name)}' "
            f"and '{self._escape_query_value(parent_id)}' in parents "
            f"and trashed=false"
        )
        if folders_only:
            query += f" and mimeType='{self._FOLDER_MIME_TYPE}'"
        return self.service.files().list(q=query).execute().get('files', [])

    def _resolve_parent(self, file_name: str, create_missing: bool = False) -> tuple:
        """Split ``'sub/leaf'`` into ``(parent_folder_id, leaf_name)``.

        A name without ``/`` resolves to the root folder. When the sub-folder
        does not exist it is created if *create_missing* is true; otherwise
        the returned parent id is ``None``.
        """
        if '/' not in file_name:
            return self.folder_id, file_name

        folder_name, leaf_name = file_name.split('/', 1)
        folders = self._find_children(folder_name, self.folder_id, folders_only=True)
        if folders:
            return folders[0]['id'], leaf_name
        if not create_missing:
            return None, leaf_name

        created_folder = self.service.files().create(
            body={
                'name': folder_name,
                'mimeType': self._FOLDER_MIME_TYPE,
                'parents': [self.folder_id],
            },
            fields='id',
        ).execute()
        return created_folder.get('id'), leaf_name

    def list_files(self) -> List[Dict[str, Any]]:
        """List all files in the Google Drive folder.

        Follows ``nextPageToken`` so folders with more entries than one
        result page are returned completely.

        Returns:
            A list of dicts with ``id``, ``name``, ``modifiedTime`` and
            ``size`` as provided by the API; ``[]`` when offline or on
            ``HttpError``.
        """
        if not self.service or not self.folder_id:
            return []
        try:
            query = f"'{self._escape_query_value(self.folder_id)}' in parents and trashed=false"
            files: List[Dict[str, Any]] = []
            page_token = None
            while True:
                response = self.service.files().list(
                    q=query,
                    fields="nextPageToken, files(id, name, modifiedTime, size)",
                    pageToken=page_token,
                ).execute()
                files.extend(response.get('files', []))
                page_token = response.get('nextPageToken')
                if not page_token:
                    return files
        except HttpError as e:
            st.error(f"Error listing files: {str(e)}")
            return []

    def download_file(self, file_name: str) -> Optional[Dict[str, Any]]:
        """Download a file from Google Drive and return its content.

        Args:
            file_name: Plain name or ``'folder/name'`` relative to the root
                folder.

        Returns:
            The parsed JSON value when the content is valid JSON, otherwise
            the UTF-8 decoded text. ``None`` when offline, when the file (or
            its sub-folder) is not found, or on ``HttpError``.
        """
        if not self.service or not self.folder_id:
            return None
        try:
            parent_id, leaf_name = self._resolve_parent(file_name)
            files = self._find_children(leaf_name, parent_id) if parent_id else []
            if not files:
                st.warning(f"File '{file_name}' not found in Google Drive")
                return None

            # Download file content
            raw = self.service.files().get_media(fileId=files[0]['id']).execute()
            text = raw.decode('utf-8')
            try:
                return json.loads(text)
            except json.JSONDecodeError:
                # If not JSON, return as string
                return text
        except HttpError as e:
            st.error(f"Error downloading file '{file_name}': {str(e)}")
            return None

    def upload_file(self, file_name: str, content: Any) -> bool:
        """Upload a file to Google Drive, replacing an existing one of the same name.

        ``dict``/``list`` content is serialised as YAML for ``.yaml``/``.yml``
        names and as JSON otherwise; anything else is written as ``str(content)``.
        A missing sub-folder in ``'folder/name'`` is created.

        Returns:
            ``True`` on success, ``False`` when offline or on any error
            (reported via ``st.error``).
        """
        if not self.service or not self.folder_id:
            return False
        try:
            is_yaml = file_name.endswith(('.yaml', '.yml'))

            # Convert content to appropriate string format
            if not isinstance(content, (dict, list)):
                content_str = str(content)
            elif is_yaml:
                import yaml
                content_str = yaml.dump(content, default_flow_style=False, sort_keys=False)
            else:
                content_str = json.dumps(content, indent=2)

            # Local imports: the module header does not provide these names.
            from io import BytesIO
            from googleapiclient.http import MediaIoBaseUpload

            media = MediaIoBaseUpload(
                BytesIO(content_str.encode('utf-8')),
                mimetype='text/yaml' if is_yaml else 'application/json',
                resumable=True,
            )

            target_folder_id, actual_file_name = self._resolve_parent(file_name, create_missing=True)

            # Update in place when the file already exists, otherwise create it.
            existing_files = self._find_children(actual_file_name, target_folder_id)
            if existing_files:
                self.service.files().update(
                    fileId=existing_files[0]['id'],
                    media_body=media,
                ).execute()
            else:
                self.service.files().create(
                    body={'name': actual_file_name, 'parents': [target_folder_id]},
                    media_body=media,
                ).execute()
            return True
        except HttpError as e:
            st.error(f"Error uploading file '{file_name}': {str(e)}")
            return False
        except Exception as e:
            st.error(f"Unexpected error uploading file '{file_name}': {str(e)}")
            return False

    def sync_from_drive(self, file_names: List[str]) -> Dict[str, Any]:
        """Download multiple files from Google Drive and mirror them under ``temp_dir``.

        Returns:
            Mapping of file name to downloaded content for every file that
            could be fetched; files that failed are omitted.
        """
        synced_files = {}
        for file_name in file_names:
            content = self.download_file(file_name)
            if content is None:
                continue
            synced_files[file_name] = content

            # Save to local temp directory; 'folder/name' needs its folder first.
            local_path = os.path.join(self.temp_dir, file_name)
            os.makedirs(os.path.dirname(local_path), exist_ok=True)
            with open(local_path, 'w', encoding='utf-8') as f:
                if isinstance(content, (dict, list)):
                    json.dump(content, f, indent=2)
                else:
                    f.write(str(content))
        return synced_files

    def sync_to_drive(self, file_names: List[str], local_data: Dict[str, Any]) -> bool:
        """Upload multiple files to Google Drive.

        Names missing from *local_data* are skipped. Every present file is
        attempted even after a failure.

        Returns:
            ``True`` only if no attempted upload failed.
        """
        success = True
        for file_name in file_names:
            if file_name in local_data and not self.upload_file(file_name, local_data[file_name]):
                success = False
        return success

    def get_file_info(self, file_name: str) -> Optional[Dict[str, Any]]:
        """Get metadata for a specific file.

        Accepts the same plain or ``'folder/name'`` form as ``download_file``.

        Returns:
            The first matching entry as returned by the API, or ``None`` when
            offline, not found, or on ``HttpError``.
        """
        if not self.service or not self.folder_id:
            return None
        try:
            parent_id, leaf_name = self._resolve_parent(file_name)
            if not parent_id:
                return None
            files = self._find_children(leaf_name, parent_id)
            return files[0] if files else None
        except HttpError as e:
            st.error(f"Error getting file info for '{file_name}': {str(e)}")
            return None

    def is_online(self) -> bool:
        """Check if Google Drive service is available (client built and folder ID set)."""
        return self.service is not None and self.folder_id is not None