File size: 3,679 Bytes
9a4172d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import asyncio
import functools
import inspect

import fsspec
from fsspec.asyn import AsyncFileSystem, running_async


def async_wrapper(func, obj=None, semaphore=None):
    """
    Wraps a synchronous function to make it awaitable.

    Parameters
    ----------
    func : callable
        The synchronous function to wrap.
    obj : object, optional
        The instance to bind the function to, if applicable.
    semaphore : asyncio.Semaphore, optional
        A semaphore to limit concurrent calls.

    Returns
    -------
    coroutine
        An awaitable version of the function.
    """

    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        if semaphore:
            async with semaphore:
                return await asyncio.to_thread(func, *args, **kwargs)
        return await asyncio.to_thread(func, *args, **kwargs)

    return wrapper


class AsyncFileSystemWrapper(AsyncFileSystem):
    """
    A wrapper class to convert a synchronous filesystem into an asynchronous one.

    This class takes an existing synchronous filesystem implementation and wraps
    its public methods to provide an asynchronous interface. Each wrapped method
    is stored on the instance under the original name prefixed with an
    underscore (for example ``ls`` becomes ``_ls``).

    Parameters
    ----------
    fs : AbstractFileSystem, optional
        The synchronous filesystem instance to wrap. When omitted, one is
        created with ``fsspec.filesystem(target_protocol, **target_options)``.
    asynchronous : bool, optional
        Forwarded to ``AsyncFileSystem``. When ``None``, the value returned by
        ``running_async()`` is used.
    target_protocol : str, optional
        Protocol of the filesystem to create when ``fs`` is not given.
    target_options : dict, optional
        Keyword arguments for the filesystem created from ``target_protocol``.
        ``None`` is treated as no options.
    semaphore : asyncio.Semaphore, optional
        Passed to ``async_wrapper`` for every wrapped method to limit
        concurrent calls.
    max_concurrent_tasks : int, optional
        Accepted for interface compatibility; this class does not read it.
    **kwargs
        Forwarded to ``AsyncFileSystem.__init__``.
    """

    protocol = "asyncwrapper", "async_wrapper"
    cachable = False

    def __init__(
        self,
        fs=None,
        asynchronous=None,
        target_protocol=None,
        target_options=None,
        semaphore=None,
        max_concurrent_tasks=None,
        **kwargs,
    ):
        if asynchronous is None:
            asynchronous = running_async()
        super().__init__(asynchronous=asynchronous, **kwargs)
        if fs is not None:
            self.sync_fs = fs
        else:
            # target_options defaults to None, and ``**None`` raises TypeError,
            # so fall back to an empty mapping.
            self.sync_fs = fsspec.filesystem(
                target_protocol, **(target_options or {})
            )
        # Report the wrapped filesystem's protocol instead of the class-level one.
        self.protocol = self.sync_fs.protocol
        self.semaphore = semaphore
        self._wrap_all_sync_methods()

    @property
    def fsid(self):
        """Identifier built from the wrapped filesystem's ``fsid`` with an ``async_`` prefix."""
        return f"async_{self.sync_fs.fsid}"

    def _wrap_all_sync_methods(self):
        """
        Wrap all synchronous methods of the underlying filesystem with asynchronous versions.

        Names starting with an underscore, ``open``, properties and coroutine
        functions are skipped. Every other callable attribute is wrapped with
        ``async_wrapper`` and set on this instance as ``_<name>``.
        """
        excluded_methods = {"open"}
        for method_name in dir(self.sync_fs):
            if method_name.startswith("_") or method_name in excluded_methods:
                continue

            # Look the attribute up statically so that properties are detected
            # without being evaluated.
            attr = inspect.getattr_static(self.sync_fs, method_name)
            if isinstance(attr, property):
                continue

            method = getattr(self.sync_fs, method_name)
            if callable(method) and not inspect.iscoroutinefunction(method):
                async_method = async_wrapper(method, obj=self, semaphore=self.semaphore)
                setattr(self, f"_{method_name}", async_method)

    @classmethod
    def wrap_class(cls, sync_fs_class):
        """
        Create a new class that can be used to instantiate an AsyncFileSystemWrapper
        with lazy instantiation of the underlying synchronous filesystem.

        The generated class forwards all constructor arguments to
        ``sync_fs_class`` and wraps the resulting instance.

        Parameters
        ----------
        sync_fs_class : type
            The class of the synchronous filesystem to wrap.

        Returns
        -------
        type
            A new class that wraps the provided synchronous filesystem class,
            named ``Async<sync_fs_class.__name__>Wrapper``.
        """

        class GeneratedAsyncFileSystemWrapper(cls):
            def __init__(self, *args, **kwargs):
                # The synchronous filesystem is only built when the generated
                # class itself is instantiated.
                sync_fs = sync_fs_class(*args, **kwargs)
                super().__init__(sync_fs)

        GeneratedAsyncFileSystemWrapper.__name__ = (
            f"Async{sync_fs_class.__name__}Wrapper"
        )
        return GeneratedAsyncFileSystemWrapper