File size: 4,690 Bytes
d8fd28f
 
 
83f6eac
d8fd28f
 
 
 
 
 
 
 
 
 
 
83f6eac
 
 
 
 
 
 
 
 
 
 
 
 
d8fd28f
 
 
 
 
 
 
 
 
 
a4af32a
d8fd28f
a4af32a
 
 
 
d8fd28f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
83f6eac
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# src/gdrive_manager.py
import os
import io
import json
import logging
from googleapiclient.discovery import build
from googleapiclient.http import MediaIoBaseDownload, MediaFileUpload
from googleapiclient.errors import HttpError
from google.oauth2 import service_account

logger = logging.getLogger(__name__)

class GoogleDriveManager:
    """Small convenience wrapper around the Google Drive v3 API.

    Authentication uses a service account. All folder listings follow
    ``nextPageToken`` so that results are not truncated to the first page.
    """

    SCOPES = ['https://www.googleapis.com/auth/drive']

    def __init__(self):
        """Build the Drive service client.

        Credentials are read from the ``GCP_CREDENTIALS`` environment
        variable (a JSON document) when it is set and non-empty; otherwise
        they are loaded from ``credentials.json`` in the working directory.
        """
        gcp_credentials = os.getenv("GCP_CREDENTIALS")
        if gcp_credentials:
            cred_data = json.loads(gcp_credentials)
            creds = service_account.Credentials.from_service_account_info(
                cred_data, scopes=self.SCOPES
            )
        else:
            # Fallback to file-based credentials
            creds = service_account.Credentials.from_service_account_file(
                "credentials.json", scopes=self.SCOPES
            )
        self.service = build('drive', 'v3', credentials=creds)

    @staticmethod
    def _escape_query_value(value):
        """Escape backslashes and single quotes for a quoted Drive query literal."""
        return str(value).replace("\\", "\\\\").replace("'", "\\'")

    def _list_all(self, query, file_fields, order_by=None):
        """Run a ``files.list`` query and return the files from every page.

        Args:
            query: Drive search query passed as ``q``.
            file_fields: Comma-separated per-file fields, e.g. ``"id, name"``.
            order_by: Optional ``orderBy`` value for the request.

        Returns:
            A list of file resource dicts accumulated across all pages.
        """
        params = {
            'q': query,
            # nextPageToken must be requested explicitly once `fields` is set.
            'fields': f"nextPageToken, files({file_fields})",
        }
        if order_by:
            params['orderBy'] = order_by

        collected = []
        page_token = None
        while True:
            if page_token:
                params['pageToken'] = page_token
            response = self.service.files().list(**params).execute()
            collected.extend(response.get('files', []))
            page_token = response.get('nextPageToken')
            if not page_token:
                return collected

    def get_folder_id(self, url):
        """Extract a folder ID from a Google Drive URL.

        Handles ``.../folders/<id>`` and ``...?id=<id>`` forms, dropping any
        trailing path segment, query string or fragment. A value matching
        neither form is assumed to already be an ID and is returned stripped
        of surrounding whitespace.
        """
        url = url.strip()
        if 'folders/' in url:
            tail = url.split('folders/')[-1]
        elif 'id=' in url:
            tail = url.split('id=')[-1]
        else:
            return url
        # The ID ends at the first URL delimiter, whichever kind it is.
        for delimiter in '/?&#':
            tail = tail.split(delimiter, 1)[0]
        return tail

    def list_files(self, folder_id, file_types=('video/mp4', 'video/x-matroska')):
        """List files of the given MIME types in a Google Drive folder.

        Args:
            folder_id: ID of the parent folder.
            file_types: A single MIME type string or an iterable of them.

        Returns:
            A list of dicts with ``id``, ``name`` and ``mimeType`` keys.
        """
        if isinstance(file_types, str):
            file_types = [file_types]
        type_query = " or ".join(
            f"mimeType='{self._escape_query_value(ftype)}'" for ftype in file_types
        )
        parent = self._escape_query_value(folder_id)
        query = f"('{parent}' in parents) and ({type_query}) and trashed=false"
        return self._list_all(query, "id, name, mimeType")

    def download_file(self, file_id, destination):
        """Download a Drive file to a local path.

        Args:
            file_id: ID of the file to download.
            destination: Local path the content is written to.

        Returns:
            The ``destination`` path.
        """
        logger.info("Downloading file %s to %s", file_id, destination)
        request = self.service.files().get_media(fileId=file_id)
        # The context manager closes the handle even if a chunk request fails.
        with io.FileIO(destination, 'wb') as fh:
            downloader = MediaIoBaseDownload(fh, request)
            done = False
            while not done:
                status, done = downloader.next_chunk()
                logger.info("Download %d%%", int(status.progress() * 100))
        logger.info("Download complete: %s", destination)
        return destination

    def upload_file(self, local_path, drive_folder_id, mime_type):
        """Upload a local file into a Drive folder.

        Args:
            local_path: Path of the file to upload; its basename becomes the
                Drive file name.
            drive_folder_id: ID of the destination folder.
            mime_type: MIME type passed to the upload.

        Returns:
            The ID of the created Drive file.
        """
        file_metadata = {
            'name': os.path.basename(local_path),
            'parents': [drive_folder_id],
        }
        media = MediaFileUpload(local_path, mimetype=mime_type)
        created = self.service.files().create(
            body=file_metadata,
            media_body=media,
            fields='id'
        ).execute()
        logger.info("Uploaded %s to Drive folder %s", local_path, drive_folder_id)
        return created.get('id')

    def delete_file(self, file_id):
        """Delete a file from Google Drive.

        Returns:
            True if the delete call succeeded, False if it raised ``HttpError``
            (the error is logged, not propagated).
        """
        try:
            self.service.files().delete(fileId=file_id).execute()
        except HttpError as error:
            logger.error("An error occurred: %s", error)
            return False
        logger.info("Deleted file %s from Drive", file_id)
        return True

    def find_file_by_name(self, folder_id, filename):
        """Find a file by exact name in a folder.

        Returns:
            The ID of the first matching file, or None when nothing matches.
        """
        # Escape so that names containing apostrophes do not break the query.
        parent = self._escape_query_value(folder_id)
        name = self._escape_query_value(filename)
        query = f"'{parent}' in parents and name='{name}' and trashed=false"
        files = self._list_all(query, "id")
        return files[0]['id'] if files else None

    def list_txt_files(self, folder_id):
        """List all ``text/plain`` files in a Google Drive folder.

        Returns:
            A list of dicts with ``id`` and ``name`` keys.
        """
        parent = self._escape_query_value(folder_id)
        query = f"'{parent}' in parents and mimeType='text/plain' and trashed=false"
        return self._list_all(query, "id, name")

    def remove_duplicates_by_name(self, folder_id):
        """Remove duplicate text files (by name) in a folder, keeping the latest.

        Files are requested newest-first by modification time, so the first
        file seen for each name is kept and every later one is deleted.
        """
        parent = self._escape_query_value(folder_id)
        query = f"'{parent}' in parents and mimeType='text/plain' and trashed=false"
        files = self._list_all(
            query, "id, name, modifiedTime", order_by="modifiedTime desc"
        )
        seen_names = set()
        for file in files:
            if file['name'] not in seen_names:
                seen_names.add(file['name'])
                continue
            # Only report a deletion that actually happened.
            if self.delete_file(file['id']):
                logger.info(
                    "Deleted duplicate file: %s (%s)", file['name'], file['id']
                )