Spaces:
Running
Running
| import os | |
| import sys | |
| import subprocess | |
| import json | |
| from datetime import datetime | |
try:
    from rclone_installer import RCLONE_EXE, check_and_install_rclone
except ImportError:
    # Fallback used when rclone_installer cannot be imported: a hard-coded
    # executable path and a stand-in installer check that does nothing.
    RCLONE_EXE = r"C:\rclone\rclone.exe"

    def check_and_install_rclone():
        """No-op replacement for the real installer check."""
        return None


# Remote and folder used to build rclone paths of the form
# "<REMOTE_NAME>:<REMOTE_FOLDER>[/<file>]".
REMOTE_NAME = "gdrive"
REMOTE_FOLDER = "Guardians_Workspace/Aizen_Forge"
def run_rclone(args):
    """Run the rclone executable with the given arguments.

    Args:
        args: Iterable of command-line arguments placed after the
            executable path (for example ``["lsjson", "gdrive:folder"]``).

    Returns:
        ``{"success": True, "output": <stripped stdout>}`` when rclone exits
        with status 0. Otherwise ``{"success": False, "error": <message>}``,
        where the message is the stripped stderr, ``"Exit code N"`` when
        stderr is empty, or the text of the exception raised while running
        the process.
    """
    # Unpacking accepts any iterable (list, tuple, generator), not only lists.
    cmd = [RCLONE_EXE, *args]
    try:
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            encoding="utf-8",
            # Undecodable bytes in rclone's output must not turn a finished
            # command into a reported failure; substitute them instead.
            errors="replace",
        )
    except Exception as e:
        # Deliberate boundary: every launch problem (missing executable,
        # permission error, bad argument type) is reported through the
        # result dict rather than raised to the caller.
        return {"success": False, "error": str(e)}

    if result.returncode != 0:
        message = result.stderr.strip() or f"Exit code {result.returncode}"
        return {"success": False, "error": message}
    return {"success": True, "output": result.stdout.strip()}
def list_files():
    """List the entries of the remote folder using ``rclone lsjson``.

    Returns:
        ``{"success": True, "files": <parsed JSON>}`` when the output parses
        as JSON; ``{"success": True, "raw": <output text>}`` when rclone
        succeeded but its output is not valid JSON; otherwise the failure
        dict returned by ``run_rclone`` unchanged.
    """
    remote_path = f"{REMOTE_NAME}:{REMOTE_FOLDER}"
    # lsjson is used because its output is machine-readable.
    result = run_rclone(["lsjson", remote_path])
    if not result["success"]:
        return result

    try:
        files = json.loads(result["output"])
    except json.JSONDecodeError:
        # Only a parse failure falls back to the raw text; a bare ``except``
        # would also swallow KeyboardInterrupt and SystemExit.
        return {"success": True, "raw": result["output"]}
    return {"success": True, "files": files}
def upload_file(local_path, remote_name=None):
    """Copy a local path into the remote folder with ``rclone copyto``.

    Args:
        local_path: Path on the local machine; it must exist.
        remote_name: Name to store the file under. When omitted or empty,
            the base name of ``local_path`` is used.

    Returns:
        The result dict from ``run_rclone``, or a failure dict with a
        "Local file not found" error when ``local_path`` does not exist.
    """
    if os.path.exists(local_path):
        destination_name = remote_name if remote_name else os.path.basename(local_path)
        destination = f"{REMOTE_NAME}:{REMOTE_FOLDER}/{destination_name}"
        return run_rclone(["copyto", local_path, destination])
    return {"success": False, "error": f"Local file not found: {local_path}"}
def download_file(remote_name, local_path):
    """Copy one entry of the remote folder to a local path.

    Args:
        remote_name: Name of the entry inside the remote folder.
        local_path: Destination path on the local machine.

    Returns:
        The result dict from ``run_rclone`` for the ``copyto`` command.
    """
    source = "{}:{}/{}".format(REMOTE_NAME, REMOTE_FOLDER, remote_name)
    rclone_args = ["copyto", source, local_path]
    return run_rclone(rclone_args)
def sync_dir(local_dir, direction="to_cloud"):
    """Run ``rclone sync`` between a local directory and the remote folder.

    Args:
        local_dir: Local directory taking part in the sync.
        direction: ``"to_cloud"`` syncs ``local_dir`` to the remote folder.
            Any other value syncs the remote folder to ``local_dir``.

    Returns:
        The result dict from ``run_rclone`` for the ``sync`` command.
    """
    # NOTE(review): the direction is not validated, so a mistyped value
    # silently selects remote -> local. rclone documents `sync` as making
    # the destination match the source; consider rejecting unknown values.
    cloud_root = f"{REMOTE_NAME}:{REMOTE_FOLDER}"
    if direction == "to_cloud":
        source, destination = local_dir, cloud_root
    else:
        source, destination = cloud_root, local_dir
    return run_rclone(["sync", source, destination])
if __name__ == "__main__":
    # Command-line entry point:
    #   list
    #   upload <local_path>
    #   download <remote_name> <local_path>
    #   sync <local_dir> [to_cloud|from_cloud]
    # Exits with status 1 on a failed operation or an invalid invocation.
    if len(sys.argv) < 2:
        print("Usage: python gdrive_tool.py [list|upload|download|sync] <args>")
        sys.exit(1)

    check_and_install_rclone()

    action = sys.argv[1]
    extra = sys.argv[2:]
    ok = False  # becomes True only when the requested operation succeeds

    if action == "list":
        res = list_files()
        ok = res["success"]
        if not ok:
            print(f"Error: {res['error']}")
        elif "files" in res:
            for entry in res["files"]:
                print(f"[{entry.get('Size', 0)} bytes] {entry.get('Name')} ({entry.get('ModTime')})")
        else:
            # lsjson succeeded but its output could not be parsed as JSON.
            print(res["raw"])
    elif action == "upload" and len(extra) >= 1:
        res = upload_file(extra[0])
        ok = res["success"]
        print("Upload successful" if ok else f"Upload failed: {res['error']}")
    elif action == "download" and len(extra) >= 2:
        res = download_file(extra[0], extra[1])
        ok = res["success"]
        print("Download successful" if ok else f"Download failed: {res['error']}")
    elif action == "sync" and len(extra) >= 1:
        # The usage text advertises "sync"; previously no branch handled it.
        direction = extra[1] if len(extra) >= 2 else "to_cloud"
        if direction not in ("to_cloud", "from_cloud"):
            # sync_dir treats every non-"to_cloud" value as remote -> local,
            # so reject typos here instead of syncing the wrong way.
            print(f"Invalid sync direction: {direction} (expected to_cloud or from_cloud)")
        else:
            res = sync_dir(extra[0], direction)
            ok = res["success"]
            print("Sync successful" if ok else f"Sync failed: {res['error']}")
    else:
        print("Invalid action or missing arguments.")

    if not ok:
        # Report failure to the calling shell or script.
        sys.exit(1)