| |
|
|
| |
| |
| |
| |
|
|
| import logging |
| import os |
| import shutil |
| from typing import List, Optional |
|
|
|
|
# Name the logger after the module rather than its file path, so that it sits
# in the regular dotted logger hierarchy and can be configured through it.
logger = logging.getLogger(__name__)


# iopath is optional. When it cannot be imported, IOPathManager is set to None
# and the PathManager class below uses Python builtins instead.
try:
    from iopath.common.file_io import g_pathmgr as IOPathManager

    try:
        # Optional S3 handler; it lives in a sibling module that may be absent.
        from .fb_pathhandlers import S3PathHandler

        IOPathManager.register_handler(S3PathHandler())
    except KeyError:
        # Log through the module logger: the module-level logging.warning()/
        # logging.debug() helpers implicitly call basicConfig() when the root
        # logger has no handlers, which would configure logging as a side
        # effect of merely importing this module.
        logger.warning("S3PathHandler already registered.")
    except ImportError:
        logger.debug(
            "S3PathHandler couldn't be imported. Either missing fb-only files, or boto3 module."
        )

except ImportError:
    IOPathManager = None
|
|
|
|
class PathManager:
    """
    Wrapper for insulating OSS I/O (using Python builtin operations) from
    iopath's PathManager abstraction (for transparently handling various
    internal backends).

    Every method is static. When the module-level ``IOPathManager`` is set,
    most calls are forwarded to it; otherwise they fall back to ``open``,
    ``os`` and ``shutil``. ``chmod`` and ``rename`` always use ``os``, and
    ``register_handler`` is a no-op without iopath.
    """

    @staticmethod
    def open(
        path: str,
        mode: str = "r",
        buffering: int = -1,
        encoding: Optional[str] = None,
        errors: Optional[str] = None,
        newline: Optional[str] = None,
    ):
        """Open ``path`` and return the file object.

        The arguments are passed through unchanged to ``IOPathManager.open``
        when iopath is available, and to the builtin ``open`` otherwise.
        """
        if IOPathManager:
            return IOPathManager.open(
                path=path,
                mode=mode,
                buffering=buffering,
                encoding=encoding,
                errors=errors,
                newline=newline,
            )
        return open(
            path,
            mode=mode,
            buffering=buffering,
            encoding=encoding,
            errors=errors,
            newline=newline,
        )

    @staticmethod
    def copy(src_path: str, dst_path: str, overwrite: bool = False) -> bool:
        """Copy ``src_path`` to ``dst_path``.

        Args:
            src_path: file to copy from.
            dst_path: file to copy to.
            overwrite: allow replacing an existing ``dst_path``.

        Returns:
            With iopath, whatever ``IOPathManager.copy`` returns. In the
            builtin fallback, ``True`` once the file has been copied and
            ``False`` when ``dst_path`` already exists and ``overwrite`` is
            false (nothing is copied in that case).

        Raises:
            In the builtin fallback, any error raised by ``shutil.copyfile``
            (e.g. a missing source file) propagates to the caller.
        """
        if IOPathManager:
            return IOPathManager.copy(
                src_path=src_path, dst_path=dst_path, overwrite=overwrite
            )
        if not overwrite and os.path.exists(dst_path):
            # Honour the flag instead of silently clobbering the destination.
            logging.getLogger(__name__).error(
                "Destination file %s already exists.", dst_path
            )
            return False
        # shutil.copyfile returns the destination path; report success as a
        # bool so the return value matches the annotation.
        shutil.copyfile(src_path, dst_path)
        return True

    @staticmethod
    def get_local_path(path: str, **kwargs) -> str:
        """Return a local path for ``path``.

        Forwarded to ``IOPathManager.get_local_path`` when iopath is
        available; otherwise ``path`` is returned unchanged and ``kwargs``
        are ignored.
        """
        if IOPathManager:
            return IOPathManager.get_local_path(path, **kwargs)
        return path

    @staticmethod
    def exists(path: str) -> bool:
        """Return whether ``path`` exists (``os.path.exists`` without iopath)."""
        if IOPathManager:
            return IOPathManager.exists(path)
        return os.path.exists(path)

    @staticmethod
    def isfile(path: str) -> bool:
        """Return whether ``path`` is a file (``os.path.isfile`` without iopath)."""
        if IOPathManager:
            return IOPathManager.isfile(path)
        return os.path.isfile(path)

    @staticmethod
    def ls(path: str) -> List[str]:
        """List the entries under ``path`` (``os.listdir`` without iopath)."""
        if IOPathManager:
            return IOPathManager.ls(path)
        return os.listdir(path)

    @staticmethod
    def mkdirs(path: str) -> None:
        """Create ``path`` and any missing parents.

        Without iopath this is ``os.makedirs(path, exist_ok=True)``, so an
        already existing directory is not an error.
        """
        if IOPathManager:
            return IOPathManager.mkdirs(path)
        os.makedirs(path, exist_ok=True)

    @staticmethod
    def rm(path: str) -> None:
        """Remove the file at ``path`` (``os.remove`` without iopath)."""
        if IOPathManager:
            return IOPathManager.rm(path)
        os.remove(path)

    @staticmethod
    def chmod(path: str, mode: int) -> None:
        """Apply ``os.chmod(path, mode)`` to plain local paths.

        Paths matched by ``path_requires_pathmanager`` are skipped silently.
        """
        if not PathManager.path_requires_pathmanager(path):
            os.chmod(path, mode)

    @staticmethod
    def register_handler(handler) -> None:
        """Register ``handler`` with iopath; does nothing when iopath is absent."""
        if IOPathManager:
            return IOPathManager.register_handler(handler=handler)

    @staticmethod
    def copy_from_local(
        local_path: str, dst_path: str, overwrite: bool = False, **kwargs
    ) -> None:
        """Copy the local file ``local_path`` to ``dst_path``.

        Forwarded to ``IOPathManager.copy_from_local`` when iopath is
        available. The builtin fallback uses ``shutil.copyfile``; it ignores
        ``overwrite`` and ``kwargs`` and always replaces an existing
        destination.
        """
        if IOPathManager:
            return IOPathManager.copy_from_local(
                local_path=local_path, dst_path=dst_path, overwrite=overwrite, **kwargs
            )
        # Do not leak shutil.copyfile's return value (the destination path):
        # this method is declared to return None.
        shutil.copyfile(local_path, dst_path)

    @staticmethod
    def path_requires_pathmanager(path: str) -> bool:
        """Do we require PathManager to access given path?

        True when iopath is available and ``path`` starts with one of the
        keys of ``IOPathManager._path_handlers``; False otherwise.
        """
        if IOPathManager:
            return any(path.startswith(prefix) for prefix in IOPathManager._path_handlers)
        return False

    @staticmethod
    def supports_rename(path: str) -> bool:
        """Return True when ``path`` is not matched by ``path_requires_pathmanager``."""
        return not PathManager.path_requires_pathmanager(path)

    @staticmethod
    def rename(src: str, dst: str):
        """Rename ``src`` to ``dst`` with ``os.rename``; iopath is never used here."""
        os.rename(src, dst)

    # ioPath async PathManager methods:

    @staticmethod
    def opena(
        path: str,
        mode: str = "r",
        buffering: int = -1,
        encoding: Optional[str] = None,
        errors: Optional[str] = None,
        newline: Optional[str] = None,
    ):
        """
        Return file descriptor with asynchronous write operations.

        If the module-level ``IOPathManager`` is unset, an
        ``iopath.common.file_io.PathManager`` instance is created and stored
        there before the call is forwarded to its ``opena``.

        Raises:
            Whatever exception the iopath import or construction raised, when
            that initialisation fails (it is logged first).
        """
        global IOPathManager
        if not IOPathManager:
            log = logging.getLogger(__name__)
            log.info("ioPath is initializing PathManager.")
            try:
                from iopath.common.file_io import PathManager as _IOPathManagerCls

                IOPathManager = _IOPathManagerCls()
            except Exception:
                log.exception("Failed to initialize ioPath PathManager object.")
                # Re-raise the real cause; carrying on would only fail with an
                # unrelated AttributeError on None in the call below.
                raise
        return IOPathManager.opena(
            path=path,
            mode=mode,
            buffering=buffering,
            encoding=encoding,
            errors=errors,
            newline=newline,
        )

    @staticmethod
    def async_close() -> bool:
        """
        Wait for files to be written and clean up asynchronous PathManager.
        NOTE: `PathManager.async_close()` must be called at the end of any
        script that uses `PathManager.opena(...)`.

        Returns the result of ``IOPathManager.async_close()``, or False when
        no iopath manager is set.
        """
        if IOPathManager:
            return IOPathManager.async_close()
        return False
|
|