Recording-QC-Bot / src /preprocessing /gdrive_manager.py
varund2003's picture
added dspy, to allow .mkv files, upload multiple slides and notebooks, remove base name matching in mentor materials
a4af32a
# src/preprocessing/gdrive_manager.py
import os
import io
import json
import logging
from googleapiclient.discovery import build
from googleapiclient.http import MediaIoBaseDownload, MediaFileUpload
from googleapiclient.errors import HttpError
from google.oauth2 import service_account
# Module-level logger, named after this module so output can be filtered per module.
logger = logging.getLogger(__name__)
class GoogleDriveManager:
    """Small wrapper around the Google Drive v3 API using a service account.

    Credentials are taken from the ``GCP_CREDENTIALS`` environment variable
    (a JSON document) when it is set, otherwise from a key file on disk.
    """

    SCOPES = ['https://www.googleapis.com/auth/drive']

    def __init__(self, credentials_path="credentials.json"):
        """Build the Drive service client.

        Args:
            credentials_path: Service-account key file used only when the
                ``GCP_CREDENTIALS`` environment variable is unset or empty.

        Raises:
            json.JSONDecodeError: If ``GCP_CREDENTIALS`` is set but is not
                valid JSON.
        """
        # Prefer full credentials supplied through the environment.
        gcp_credentials = os.getenv("GCP_CREDENTIALS")
        if gcp_credentials:
            cred_data = json.loads(gcp_credentials)
            creds = service_account.Credentials.from_service_account_info(
                cred_data, scopes=self.SCOPES
            )
        else:
            # Fall back to file-based credentials.
            creds = service_account.Credentials.from_service_account_file(
                credentials_path, scopes=self.SCOPES
            )
        self.service = build('drive', 'v3', credentials=creds)

    @staticmethod
    def _escape_query_value(value):
        """Escape *value* for use inside a single-quoted Drive query literal.

        Backslashes are escaped first so the backslash added in front of a
        quote is not itself doubled.
        """
        return str(value).replace("\\", "\\\\").replace("'", "\\'")

    def _list_all(self, query, fields):
        """Return every file matching *query*, following ``nextPageToken``.

        A single ``files.list`` call returns only one page of results, so the
        request is repeated until the API stops returning a page token.

        Args:
            query: Drive search query (the ``q`` parameter).
            fields: Comma-separated file fields to return, e.g. ``"id, name"``.

        Returns:
            A list of file resource dicts (possibly empty).
        """
        collected = []
        page_token = None
        while True:
            response = self.service.files().list(
                q=query,
                fields=f"nextPageToken, files({fields})",
                pageToken=page_token,
            ).execute()
            collected.extend(response.get('files', []))
            page_token = response.get('nextPageToken')
            if not page_token:
                return collected

    def _txt_files_query(self, folder_id):
        """Build the query selecting non-trashed ``text/plain`` files in a folder."""
        parent = self._escape_query_value(folder_id)
        return f"'{parent}' in parents and mimeType='text/plain' and trashed=false"

    def get_folder_id(self, url):
        """Extract the folder ID from a Google Drive URL.

        Handles ``.../folders/<id>`` and ``...?id=<id>`` forms, ignoring any
        trailing slash, query string or fragment. A string matching neither
        form is assumed to already be an ID and is returned (stripped of
        surrounding whitespace).
        """
        url = url.strip()
        if 'folders/' in url:
            tail = url.split('folders/')[-1]
        elif 'id=' in url:
            tail = url.split('id=')[-1]
        else:
            return url
        # Cut at the first delimiter that can follow the ID.
        for delimiter in ('?', '&', '#', '/'):
            tail = tail.split(delimiter)[0]
        return tail

    def list_files(self, folder_id, file_types=('video/mp4', 'video/x-matroska')):
        """List non-trashed files in a Drive folder, filtered by MIME type.

        Args:
            folder_id: ID of the parent folder.
            file_types: A MIME type string or an iterable of MIME types.
                An empty or ``None`` value disables the MIME filter.

        Returns:
            A list of dicts with ``id``, ``name`` and ``mimeType`` keys.
        """
        if isinstance(file_types, str):
            file_types = [file_types]
        escape = self._escape_query_value
        clauses = [f"('{escape(folder_id)}' in parents)"]
        if file_types:
            type_query = " or ".join(
                f"mimeType='{escape(ftype)}'" for ftype in file_types
            )
            clauses.append(f"({type_query})")
        clauses.append("trashed=false")
        return self._list_all(" and ".join(clauses), "id, name, mimeType")

    def download_file(self, file_id, destination):
        """Download a Drive file to a local path.

        Args:
            file_id: ID of the file to download.
            destination: Local path to write; overwritten if it exists.

        Returns:
            The *destination* path.
        """
        logger.info("Downloading file %s to %s", file_id, destination)
        request = self.service.files().get_media(fileId=file_id)
        # The context manager closes the handle even if a chunk fails.
        with io.FileIO(destination, 'wb') as fh:
            downloader = MediaIoBaseDownload(fh, request)
            done = False
            while not done:
                status, done = downloader.next_chunk()
                logger.info("Download %d%%", int(status.progress() * 100))
        logger.info("Download complete: %s", destination)
        return destination

    def upload_file(self, local_path, drive_folder_id, mime_type):
        """Upload a local file into a Drive folder.

        The Drive file is named after the basename of *local_path*.

        Args:
            local_path: Path of the file to upload.
            drive_folder_id: ID of the destination folder.
            mime_type: MIME type to send with the upload.

        Returns:
            The ID of the created Drive file.
        """
        file_metadata = {
            'name': os.path.basename(local_path),
            'parents': [drive_folder_id],
        }
        media = MediaFileUpload(local_path, mimetype=mime_type)
        created = self.service.files().create(
            body=file_metadata,
            media_body=media,
            fields='id',
        ).execute()
        logger.info("Uploaded %s to Drive folder %s", local_path, drive_folder_id)
        return created.get('id')

    def delete_file(self, file_id):
        """Delete a file from Drive.

        Returns:
            ``True`` on success, ``False`` if the API raised ``HttpError``
            (the error is logged, not re-raised).
        """
        try:
            self.service.files().delete(fileId=file_id).execute()
        except HttpError as error:
            logger.error("An error occurred: %s", error)
            return False
        logger.info("Deleted file %s from Drive", file_id)
        return True

    def find_file_by_name(self, folder_id, filename):
        """Find a non-trashed file by exact name in a folder.

        Quotes and backslashes in *filename* are escaped so such names do not
        break the query.

        Returns:
            The ID of the first matching file, or ``None`` if none matches.
        """
        escape = self._escape_query_value
        query = (
            f"'{escape(folder_id)}' in parents and name='{escape(filename)}' "
            "and trashed=false"
        )
        matches = self._list_all(query, "id")
        return matches[0]['id'] if matches else None

    def list_txt_files(self, folder_id):
        """List all non-trashed ``text/plain`` files in a Drive folder.

        Returns:
            A list of dicts with ``id`` and ``name`` keys.
        """
        return self._list_all(self._txt_files_query(folder_id), "id, name")

    def remove_duplicates_by_name(self, folder_id):
        """Remove duplicate ``text/plain`` files (by name) in a Drive folder.

        For each name, the copy with the greatest ``modifiedTime`` is kept and
        every other copy is deleted. A deletion is only logged as done when
        ``delete_file`` reports success.
        """
        files = self._list_all(
            self._txt_files_query(folder_id), "id, name, modifiedTime"
        )
        newest_by_name = {}
        stale = []
        for entry in files:
            kept = newest_by_name.get(entry['name'])
            if kept is None:
                newest_by_name[entry['name']] = entry
            # Drive timestamps are RFC 3339 strings, compared here as text.
            elif entry.get('modifiedTime', '') > kept.get('modifiedTime', ''):
                stale.append(kept)
                newest_by_name[entry['name']] = entry
            else:
                stale.append(entry)
        for entry in stale:
            if self.delete_file(entry['id']):
                logger.info(
                    "Deleted duplicate file: %s (%s)", entry['name'], entry['id']
                )