Spaces:
Sleeping
Sleeping
| import asyncio | |
| import functools | |
| import inspect | |
| import fsspec | |
| from fsspec.asyn import AsyncFileSystem, running_async | |
| def async_wrapper(func, obj=None): | |
| """ | |
| Wraps a synchronous function to make it awaitable. | |
| Parameters | |
| ---------- | |
| func : callable | |
| The synchronous function to wrap. | |
| obj : object, optional | |
| The instance to bind the function to, if applicable. | |
| Returns | |
| ------- | |
| coroutine | |
| An awaitable version of the function. | |
| """ | |
| async def wrapper(*args, **kwargs): | |
| return await asyncio.to_thread(func, *args, **kwargs) | |
| return wrapper | |
| class AsyncFileSystemWrapper(AsyncFileSystem): | |
| """ | |
| A wrapper class to convert a synchronous filesystem into an asynchronous one. | |
| This class takes an existing synchronous filesystem implementation and wraps all | |
| its methods to provide an asynchronous interface. | |
| Parameters | |
| ---------- | |
| sync_fs : AbstractFileSystem | |
| The synchronous filesystem instance to wrap. | |
| """ | |
| protocol = "asyncwrapper", "async_wrapper" | |
| cachable = False | |
| def __init__( | |
| self, | |
| fs=None, | |
| asynchronous=None, | |
| target_protocol=None, | |
| target_options=None, | |
| **kwargs, | |
| ): | |
| if asynchronous is None: | |
| asynchronous = running_async() | |
| super().__init__(asynchronous=asynchronous, **kwargs) | |
| if fs is not None: | |
| self.sync_fs = fs | |
| else: | |
| self.sync_fs = fsspec.filesystem(target_protocol, **target_options) | |
| self.protocol = self.sync_fs.protocol | |
| self._wrap_all_sync_methods() | |
| def fsid(self): | |
| return f"async_{self.sync_fs.fsid}" | |
| def _wrap_all_sync_methods(self): | |
| """ | |
| Wrap all synchronous methods of the underlying filesystem with asynchronous versions. | |
| """ | |
| excluded_methods = {"open"} | |
| for method_name in dir(self.sync_fs): | |
| if method_name.startswith("_") or method_name in excluded_methods: | |
| continue | |
| attr = inspect.getattr_static(self.sync_fs, method_name) | |
| if isinstance(attr, property): | |
| continue | |
| method = getattr(self.sync_fs, method_name) | |
| if callable(method) and not inspect.iscoroutinefunction(method): | |
| async_method = async_wrapper(method, obj=self) | |
| setattr(self, f"_{method_name}", async_method) | |
| def wrap_class(cls, sync_fs_class): | |
| """ | |
| Create a new class that can be used to instantiate an AsyncFileSystemWrapper | |
| with lazy instantiation of the underlying synchronous filesystem. | |
| Parameters | |
| ---------- | |
| sync_fs_class : type | |
| The class of the synchronous filesystem to wrap. | |
| Returns | |
| ------- | |
| type | |
| A new class that wraps the provided synchronous filesystem class. | |
| """ | |
| class GeneratedAsyncFileSystemWrapper(cls): | |
| def __init__(self, *args, **kwargs): | |
| sync_fs = sync_fs_class(*args, **kwargs) | |
| super().__init__(sync_fs) | |
| GeneratedAsyncFileSystemWrapper.__name__ = ( | |
| f"Async{sync_fs_class.__name__}Wrapper" | |
| ) | |
| return GeneratedAsyncFileSystemWrapper | |