apigateway / services /drive_service.py
jebin2's picture
update db
d218e46
import os
import logging
import io
import asyncio
from dotenv import load_dotenv
load_dotenv()
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build
from googleapiclient.http import MediaFileUpload, MediaIoBaseDownload
from googleapiclient.errors import HttpError
from core.database import DB_FILENAME
logger = logging.getLogger(__name__)
class DriveService:
SCOPES = [
'https://www.googleapis.com/auth/gmail.send',
'https://www.googleapis.com/auth/drive.file'
]
FOLDER_NAME = "apigateway"
# DB_FILENAME is now imported from core.database
def __init__(self):
self.creds = None
self.service = None
# Server-side credentials for Drive API
self.client_id = os.getenv('SERVER_GOOGLE_CLIENT_ID') or os.getenv('GOOGLE_CLIENT_ID')
self.client_secret = os.getenv('SERVER_GOOGLE_CLIENT_SECRET') or os.getenv('GOOGLE_CLIENT_SECRET')
self.refresh_token = os.getenv('SERVER_GOOGLE_REFRESH_TOKEN') or os.getenv('GOOGLE_REFRESH_TOKEN')
def authenticate(self):
"""Authenticate using the refresh token."""
if not all([self.client_id, self.client_secret, self.refresh_token]):
logger.error("Missing Google API credentials for Drive Service")
return False
try:
self.creds = Credentials(
None,
refresh_token=self.refresh_token,
token_uri="https://oauth2.googleapis.com/token",
client_id=self.client_id,
client_secret=self.client_secret,
scopes=self.SCOPES
)
if self.creds and self.creds.expired and self.creds.refresh_token:
self.creds.refresh(Request())
self.service = build('drive', 'v3', credentials=self.creds)
return True
except Exception as e:
logger.error(f"Failed to authenticate with Drive API: {e}")
return False
def _find_folder(self):
"""Find the 'apigateway' folder."""
try:
query = f"mimeType='application/vnd.google-apps.folder' and name='{self.FOLDER_NAME}' and trashed=false"
results = self.service.files().list(q=query, spaces='drive', fields='files(id, name)').execute()
files = results.get('files', [])
if not files:
return None
return files[0]['id']
except HttpError as error:
logger.error(f"An error occurred searching for folder: {error}")
return None
def _create_folder(self):
"""Create the 'apigateway' folder."""
try:
file_metadata = {
'name': self.FOLDER_NAME,
'mimeType': 'application/vnd.google-apps.folder'
}
file = self.service.files().create(body=file_metadata, fields='id').execute()
logger.info(f"Created folder with ID: {file.get('id')}")
return file.get('id')
except HttpError as error:
logger.error(f"An error occurred creating folder: {error}")
return None
def _get_folder_id(self):
"""Get folder ID, creating it if it doesn't exist."""
folder_id = self._find_folder()
if not folder_id:
folder_id = self._create_folder()
return folder_id
def upload_db(self):
"""Upload the database file to Google Drive."""
if not self.service and not self.authenticate():
return False
if not os.path.exists(DB_FILENAME):
logger.warning(f"Database file {DB_FILENAME} not found for upload.")
return False
folder_id = self._get_folder_id()
if not folder_id:
return False
try:
# Check if file already exists in folder
query = f"name='{DB_FILENAME}' and '{folder_id}' in parents and trashed=false"
results = self.service.files().list(q=query, spaces='drive', fields='files(id)').execute()
files = results.get('files', [])
media = MediaFileUpload(DB_FILENAME, mimetype='application/x-sqlite3', resumable=True)
if files:
# Update existing file
file_id = files[0]['id']
self.service.files().update(fileId=file_id, media_body=media).execute()
logger.info(f"Updated database file {DB_FILENAME} in Drive (ID: {file_id})")
else:
# Create new file
file_metadata = {
'name': DB_FILENAME,
'parents': [folder_id]
}
self.service.files().create(body=file_metadata, media_body=media, fields='id').execute()
logger.info(f"Uploaded new database file {DB_FILENAME} to Drive")
return True
except HttpError as error:
logger.error(f"An error occurred uploading DB: {error}")
return False
except Exception as e:
logger.error(f"Unexpected error uploading DB: {e}")
return False
async def upload_db_async(self):
"""Async wrapper for upload_db to run in a separate thread."""
return await asyncio.to_thread(self.upload_db)
def download_db(self):
"""Download the database file from Google Drive."""
if not self.service and not self.authenticate():
return False
folder_id = self._find_folder() # Don't create if not found, just return
if not folder_id:
logger.info("No 'apigateway' folder found in Drive. Starting with fresh DB.")
return False
try:
query = f"name='{DB_FILENAME}' and '{folder_id}' in parents and trashed=false"
results = self.service.files().list(q=query, spaces='drive', fields='files(id)').execute()
files = results.get('files', [])
if not files:
logger.info(f"No {DB_FILENAME} found in Drive folder. Starting with fresh DB.")
return False
file_id = files[0]['id']
request = self.service.files().get_media(fileId=file_id)
fh = io.FileIO(DB_FILENAME, 'wb')
downloader = MediaIoBaseDownload(fh, request)
done = False
while done is False:
status, done = downloader.next_chunk()
# logger.info(f"Download {int(status.progress() * 100)}%.")
logger.info(f"Successfully downloaded {DB_FILENAME} from Drive.")
return True
except HttpError as error:
logger.error(f"An error occurred downloading DB: {error}")
return False
except Exception as e:
logger.error(f"Unexpected error downloading DB: {e}")
return False