# tracker-test / google_drive_manager.py
# umangchaudhry's picture
# Upload 11 files
# b03a18c verified
import os
import json
import tempfile
import time
from typing import Dict, List, Optional, Any
from google.oauth2 import service_account
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
import streamlit as st
import ssl
def retry_on_ssl_error(max_retries=3, delay=1):
    """Build a decorator that retries a call on SSL/connection failures.

    The wrapped callable is retried when it raises ``ssl.SSLError``,
    ``ConnectionError`` or any other ``OSError``; every other exception
    propagates immediately without a retry.

    Args:
        max_retries: Total number of attempts. Values below 1 are treated
            as 1 so the wrapped function is always called at least once.
        delay: Base sleep in seconds. The pause before retry ``k``
            (0-based) is ``delay * 2 ** k``.

    Returns:
        A decorator. The decorated function returns whatever the wrapped
        function returns and re-raises the last retryable error once all
        attempts are exhausted.
    """
    # Function-scope import so this block does not depend on the file's
    # top-level import list.
    import functools

    # Guard against max_retries <= 0, which would otherwise skip the call
    # entirely and silently hand back None.
    attempts = max(1, max_retries)

    def decorator(func):
        @functools.wraps(func)  # keep the wrapped function's name/docstring
        def wrapper(*args, **kwargs):
            for attempt in range(attempts):
                try:
                    return func(*args, **kwargs)
                except (ssl.SSLError, ConnectionError, OSError) as e:
                    if attempt == attempts - 1:
                        print(f"SSL/Connection error after {attempts} attempts: {e}")
                        raise
                    print(f"SSL/Connection error (attempt {attempt + 1}/{attempts}): {e}")
                    # Exponential backoff: delay, 2*delay, 4*delay, ...
                    time.sleep(delay * 2 ** attempt)
        return wrapper
    return decorator
class GoogleDriveManager:
    """Read and write files inside a single Google Drive folder.

    The Drive v3 client is stored in ``self.service`` and the root folder
    in ``self.folder_id``; both stay ``None`` until ``initialize()``
    succeeds. File names passed to the public methods may take the form
    ``"subfolder/name"``: only the first ``/`` is treated as a separator,
    and the subfolder is looked up directly under the root folder.
    """

    # MIME type Drive uses to mark an entry as a folder.
    _FOLDER_MIME_TYPE = 'application/vnd.google-apps.folder'

    def __init__(self):
        self.service = None
        self.folder_id = None
        # SPACE_ID being set is used as the "running on Hugging Face" signal.
        self.is_huggingface = os.getenv('SPACE_ID') is not None
        self.temp_dir = "/tmp/wedding_data" if self.is_huggingface else "temp_data"
        # Ensure temp directory exists
        os.makedirs(self.temp_dir, exist_ok=True)

    def initialize(self, folder_id: Optional[str] = None):
        """Authenticate and select the Drive folder to work in.

        Args:
            folder_id: Folder to use. When omitted, the
                ``GOOGLE_DRIVE_FOLDER_ID`` environment variable is read.

        Returns:
            True on success; False (after reporting through ``st.error``)
            when authentication raises or no folder ID can be determined.
        """
        try:
            if self.is_huggingface:
                self._setup_huggingface_auth()
            else:
                self._setup_local_auth()

            if folder_id:
                self.folder_id = folder_id
            else:
                # Try to get folder ID from environment
                self.folder_id = os.getenv('GOOGLE_DRIVE_FOLDER_ID')

            if not self.folder_id:
                st.error("Google Drive folder ID not found. Please set GOOGLE_DRIVE_FOLDER_ID environment variable.")
                return False
            return True
        except Exception as e:
            st.error(f"Failed to initialize Google Drive: {str(e)}")
            return False

    def _setup_huggingface_auth(self):
        """Build the Drive client from service-account environment variables.

        Raises:
            ValueError: If any of the required ``GOOGLE_*`` variables is
                missing or empty.
        """
        client_email = os.getenv('GOOGLE_CLIENT_EMAIL')
        service_account_info = {
            "type": "service_account",
            "project_id": os.getenv('GOOGLE_PROJECT_ID'),
            "private_key_id": os.getenv('GOOGLE_PRIVATE_KEY_ID'),
            # The key is stored with literal "\n" sequences; restore newlines.
            "private_key": os.getenv('GOOGLE_PRIVATE_KEY', '').replace('\\n', '\n'),
            "client_email": client_email,
            "client_id": os.getenv('GOOGLE_CLIENT_ID'),
            "auth_uri": "https://accounts.google.com/o/oauth2/auth",
            "token_uri": "https://oauth2.googleapis.com/token",
            "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
            "client_x509_cert_url": f"https://www.googleapis.com/robot/v1/metadata/x509/{client_email}"
        }

        # Validate that all required fields are present
        required_fields = ['project_id', 'private_key_id', 'private_key', 'client_email', 'client_id']
        missing_fields = [field for field in required_fields if not service_account_info.get(field)]
        if missing_fields:
            raise ValueError(f"Missing Google service account credentials: {missing_fields}")

        credentials = service_account.Credentials.from_service_account_info(
            service_account_info,
            scopes=['https://www.googleapis.com/auth/drive']
        )
        self.service = build('drive', 'v3', credentials=credentials)

    def _setup_local_auth(self):
        """Build the Drive client from a local service-account key file.

        The file path is read from ``GOOGLE_SERVICE_ACCOUNT_PATH``. When the
        variable is unset or the file does not exist, a warning is shown and
        ``self.service`` is left as ``None``.
        """
        service_account_path = os.getenv('GOOGLE_SERVICE_ACCOUNT_PATH')
        if service_account_path and os.path.exists(service_account_path):
            credentials = service_account.Credentials.from_service_account_file(
                service_account_path,
                scopes=['https://www.googleapis.com/auth/drive']
            )
            self.service = build('drive', 'v3', credentials=credentials)
        else:
            st.warning("Google service account file not found. Using local data only.")
            self.service = None

    @staticmethod
    def _escape_query_value(value: str) -> str:
        """Escape a string for use inside a single-quoted Drive query literal."""
        # Backslashes first, so the ones added for quotes are not doubled.
        return value.replace('\\', '\\\\').replace("'", "\\'")

    def _find_in_folder(self, name: str, parent_id: str) -> Optional[Dict[str, Any]]:
        """Return the first non-trashed entry called ``name`` in ``parent_id``, or None."""
        query = f"name='{self._escape_query_value(name)}' and '{parent_id}' in parents and trashed=false"
        matches = self.service.files().list(q=query).execute().get('files', [])
        return matches[0] if matches else None

    def _resolve_parent(self, file_name: str, create_missing: bool = False) -> tuple:
        """Split ``file_name`` into ``(parent_folder_id, leaf_name)``.

        Names without ``/`` live directly in the root folder. Otherwise the
        part before the first ``/`` names a subfolder of the root; when that
        subfolder does not exist it is created if ``create_missing`` is true,
        else the returned parent id is ``None``.
        """
        if '/' not in file_name:
            return self.folder_id, file_name

        folder_name, leaf_name = file_name.split('/', 1)
        folder_query = (
            f"name='{self._escape_query_value(folder_name)}' and '{self.folder_id}' in parents "
            f"and trashed=false and mimeType='{self._FOLDER_MIME_TYPE}'"
        )
        folders = self.service.files().list(q=folder_query).execute().get('files', [])
        if folders:
            return folders[0]['id'], leaf_name
        if not create_missing:
            return None, leaf_name

        folder_metadata = {
            'name': folder_name,
            'mimeType': self._FOLDER_MIME_TYPE,
            'parents': [self.folder_id]
        }
        created_folder = self.service.files().create(
            body=folder_metadata,
            fields='id'
        ).execute()
        return created_folder.get('id'), leaf_name

    def _find_file(self, file_name: str) -> Optional[Dict[str, Any]]:
        """Locate ``file_name`` (optionally ``"subfolder/name"``); None if absent."""
        parent_id, leaf_name = self._resolve_parent(file_name, create_missing=False)
        if parent_id is None:
            return None
        return self._find_in_folder(leaf_name, parent_id)

    @retry_on_ssl_error(max_retries=3, delay=1)
    def list_files(self) -> List[Dict[str, Any]]:
        """List all non-trashed files directly inside the Drive folder.

        Follows ``nextPageToken`` so folders with more entries than one
        result page are returned completely.

        Returns:
            A list of dicts with ``id``, ``name``, ``modifiedTime`` and
            ``size`` as returned by the API; ``[]`` when the manager is not
            initialized or the API reports an ``HttpError``.
        """
        if not self.service or not self.folder_id:
            return []

        try:
            query = f"'{self.folder_id}' in parents and trashed=false"
            collected: List[Dict[str, Any]] = []
            page_token = None
            while True:
                response = self.service.files().list(
                    q=query,
                    fields="nextPageToken, files(id, name, modifiedTime, size)",
                    pageToken=page_token
                ).execute()
                collected.extend(response.get('files', []))
                page_token = response.get('nextPageToken')
                if not page_token:
                    return collected
        except HttpError as e:
            st.error(f"Error listing files: {str(e)}")
            return []

    @retry_on_ssl_error(max_retries=3, delay=1)
    def download_file(self, file_name: str) -> Optional[Any]:
        """Download a file from Google Drive and return its content.

        Args:
            file_name: Name in the root folder, or ``"subfolder/name"``.

        Returns:
            The parsed JSON value when the content is valid JSON, otherwise
            the decoded text. ``None`` when the manager is not initialized,
            the file (or its subfolder) is not found, or the API reports an
            ``HttpError``.

        Raises:
            UnicodeDecodeError: If the content is not valid UTF-8.
        """
        if not self.service or not self.folder_id:
            return None

        try:
            entry = self._find_file(file_name)
            if entry is None:
                st.warning(f"File '{file_name}' not found in Google Drive")
                return None

            raw = self.service.files().get_media(fileId=entry['id']).execute()
            text = raw.decode('utf-8')
            try:
                return json.loads(text)
            except json.JSONDecodeError:
                # If not JSON, return as string
                return text
        except HttpError as e:
            st.error(f"Error downloading file '{file_name}': {str(e)}")
            return None

    @staticmethod
    def _serialize_content(file_name: str, content: Any) -> tuple:
        """Return ``(payload_bytes, mimetype)`` for uploading ``content``.

        dict/list content is dumped as YAML for ``.yaml``/``.yml`` names and
        as indented JSON otherwise; anything else is passed through ``str``.
        """
        is_yaml = file_name.endswith(('.yaml', '.yml'))
        if isinstance(content, (dict, list)):
            if is_yaml:
                # Imported lazily so PyYAML is only needed for YAML uploads.
                import yaml
                text = yaml.dump(content, default_flow_style=False, sort_keys=False)
            else:
                text = json.dumps(content, indent=2)
        else:
            text = str(content)
        mimetype = 'text/yaml' if is_yaml else 'application/json'
        return text.encode('utf-8'), mimetype

    @retry_on_ssl_error(max_retries=3, delay=1)
    def _upload_bytes(self, file_name: str, payload: bytes, mimetype: str) -> None:
        """Create or overwrite ``file_name`` with ``payload``.

        Kept separate from ``upload_file`` so SSL/connection errors reach
        the retry decorator before ``upload_file`` turns them into ``False``.
        """
        from io import BytesIO
        from googleapiclient.http import MediaIoBaseUpload

        # A missing subfolder is created on demand.
        parent_id, leaf_name = self._resolve_parent(file_name, create_missing=True)
        existing = self._find_in_folder(leaf_name, parent_id)

        # Build a fresh stream per attempt so a retry starts from byte 0.
        media = MediaIoBaseUpload(BytesIO(payload), mimetype=mimetype, resumable=True)
        if existing:
            # Update existing file
            self.service.files().update(
                fileId=existing['id'],
                media_body=media
            ).execute()
        else:
            # Create new file
            file_metadata = {
                'name': leaf_name,
                'parents': [parent_id]
            }
            self.service.files().create(
                body=file_metadata,
                media_body=media
            ).execute()

    def upload_file(self, file_name: str, content: Any) -> bool:
        """Upload a file to Google Drive, replacing any existing copy.

        Args:
            file_name: Name in the root folder, or ``"subfolder/name"``.
            content: dict/list (serialized as JSON, or YAML for
                ``.yaml``/``.yml`` names) or any other value (``str()``-ed).

        Returns:
            True on success. False when the manager is not initialized or
            any error occurs; errors are reported through ``st.error`` and
            never raised. SSL/connection errors are retried first.
        """
        if not self.service or not self.folder_id:
            return False

        try:
            payload, mimetype = self._serialize_content(file_name, content)
            self._upload_bytes(file_name, payload, mimetype)
            return True
        except HttpError as e:
            st.error(f"Error uploading file '{file_name}': {str(e)}")
            return False
        except Exception as e:
            st.error(f"Unexpected error uploading file '{file_name}': {str(e)}")
            return False

    def sync_from_drive(self, file_names: List[str]) -> Dict[str, Any]:
        """Download multiple files and mirror them into ``self.temp_dir``.

        Returns:
            Mapping of file name to downloaded content for every file that
            was found; missing files are omitted.
        """
        synced_files = {}
        for file_name in file_names:
            content = self.download_file(file_name)
            if content is None:
                continue
            synced_files[file_name] = content

            # Save to local temp directory
            local_path = os.path.join(self.temp_dir, file_name)
            # "subfolder/name" paths need their local directory created first.
            os.makedirs(os.path.dirname(local_path), exist_ok=True)
            with open(local_path, 'w', encoding='utf-8') as f:
                if isinstance(content, (dict, list)):
                    json.dump(content, f, indent=2)
                else:
                    f.write(str(content))
        return synced_files

    def sync_to_drive(self, file_names: List[str], local_data: Dict[str, Any]) -> bool:
        """Upload every name in ``file_names`` that has an entry in ``local_data``.

        All uploads are attempted even after a failure.

        Returns:
            True only if no attempted upload failed.
        """
        success = True
        for file_name in file_names:
            if file_name in local_data and not self.upload_file(file_name, local_data[file_name]):
                success = False
        return success

    def get_file_info(self, file_name: str) -> Optional[Dict[str, Any]]:
        """Get metadata for a specific file.

        Accepts the same ``"subfolder/name"`` form as ``download_file`` and
        ``upload_file``.

        Returns:
            The first matching entry as returned by the API, or ``None``
            when the manager is not initialized, nothing matches, or the
            API reports an ``HttpError``.
        """
        if not self.service or not self.folder_id:
            return None

        try:
            return self._find_file(file_name)
        except HttpError as e:
            st.error(f"Error getting file info for '{file_name}': {str(e)}")
            return None

    def is_online(self) -> bool:
        """Return True when both the Drive client and the folder ID are set."""
        return self.service is not None and self.folder_id is not None