|
|
from contextlib import suppress |
|
|
from io import TextIOWrapper |
|
|
|
|
|
from . import abc |
|
|
|
|
|
|
|
|
class SpecLoaderAdapter: |
|
|
""" |
|
|
Adapt a package spec to adapt the underlying loader. |
|
|
""" |
|
|
|
|
|
def __init__(self, spec, adapter=lambda spec: spec.loader): |
|
|
self.spec = spec |
|
|
self.loader = adapter(spec) |
|
|
|
|
|
def __getattr__(self, name): |
|
|
return getattr(self.spec, name) |
|
|
|
|
|
|
|
|
class TraversableResourcesLoader: |
|
|
""" |
|
|
Adapt a loader to provide TraversableResources. |
|
|
""" |
|
|
|
|
|
def __init__(self, spec): |
|
|
self.spec = spec |
|
|
|
|
|
def get_resource_reader(self, name): |
|
|
return CompatibilityFiles(self.spec)._native() |
|
|
|
|
|
|
|
|
def _io_wrapper(file, mode='r', *args, **kwargs): |
|
|
if mode == 'r': |
|
|
return TextIOWrapper(file, *args, **kwargs) |
|
|
elif mode == 'rb': |
|
|
return file |
|
|
raise ValueError( |
|
|
"Invalid mode value '{}', only 'r' and 'rb' are supported".format(mode) |
|
|
) |
|
|
|
|
|
|
|
|
class CompatibilityFiles: |
|
|
""" |
|
|
Adapter for an existing or non-existent resource reader |
|
|
to provide a compatibility .files(). |
|
|
""" |
|
|
|
|
|
class SpecPath(abc.Traversable): |
|
|
""" |
|
|
Path tied to a module spec. |
|
|
Can be read and exposes the resource reader children. |
|
|
""" |
|
|
|
|
|
def __init__(self, spec, reader): |
|
|
self._spec = spec |
|
|
self._reader = reader |
|
|
|
|
|
def iterdir(self): |
|
|
if not self._reader: |
|
|
return iter(()) |
|
|
return iter( |
|
|
CompatibilityFiles.ChildPath(self._reader, path) |
|
|
for path in self._reader.contents() |
|
|
) |
|
|
|
|
|
def is_file(self): |
|
|
return False |
|
|
|
|
|
is_dir = is_file |
|
|
|
|
|
def joinpath(self, other): |
|
|
if not self._reader: |
|
|
return CompatibilityFiles.OrphanPath(other) |
|
|
return CompatibilityFiles.ChildPath(self._reader, other) |
|
|
|
|
|
@property |
|
|
def name(self): |
|
|
return self._spec.name |
|
|
|
|
|
def open(self, mode='r', *args, **kwargs): |
|
|
return _io_wrapper(self._reader.open_resource(None), mode, *args, **kwargs) |
|
|
|
|
|
class ChildPath(abc.Traversable): |
|
|
""" |
|
|
Path tied to a resource reader child. |
|
|
Can be read but doesn't expose any meaningful children. |
|
|
""" |
|
|
|
|
|
def __init__(self, reader, name): |
|
|
self._reader = reader |
|
|
self._name = name |
|
|
|
|
|
def iterdir(self): |
|
|
return iter(()) |
|
|
|
|
|
def is_file(self): |
|
|
return self._reader.is_resource(self.name) |
|
|
|
|
|
def is_dir(self): |
|
|
return not self.is_file() |
|
|
|
|
|
def joinpath(self, other): |
|
|
return CompatibilityFiles.OrphanPath(self.name, other) |
|
|
|
|
|
@property |
|
|
def name(self): |
|
|
return self._name |
|
|
|
|
|
def open(self, mode='r', *args, **kwargs): |
|
|
return _io_wrapper( |
|
|
self._reader.open_resource(self.name), mode, *args, **kwargs |
|
|
) |
|
|
|
|
|
class OrphanPath(abc.Traversable): |
|
|
""" |
|
|
Orphan path, not tied to a module spec or resource reader. |
|
|
Can't be read and doesn't expose any meaningful children. |
|
|
""" |
|
|
|
|
|
def __init__(self, *path_parts): |
|
|
if len(path_parts) < 1: |
|
|
raise ValueError('Need at least one path part to construct a path') |
|
|
self._path = path_parts |
|
|
|
|
|
def iterdir(self): |
|
|
return iter(()) |
|
|
|
|
|
def is_file(self): |
|
|
return False |
|
|
|
|
|
is_dir = is_file |
|
|
|
|
|
def joinpath(self, other): |
|
|
return CompatibilityFiles.OrphanPath(*self._path, other) |
|
|
|
|
|
@property |
|
|
def name(self): |
|
|
return self._path[-1] |
|
|
|
|
|
def open(self, mode='r', *args, **kwargs): |
|
|
raise FileNotFoundError("Can't open orphan path") |
|
|
|
|
|
def __init__(self, spec): |
|
|
self.spec = spec |
|
|
|
|
|
@property |
|
|
def _reader(self): |
|
|
with suppress(AttributeError): |
|
|
return self.spec.loader.get_resource_reader(self.spec.name) |
|
|
|
|
|
def _native(self): |
|
|
""" |
|
|
Return the native reader if it supports files(). |
|
|
""" |
|
|
reader = self._reader |
|
|
return reader if hasattr(reader, 'files') else self |
|
|
|
|
|
def __getattr__(self, attr): |
|
|
return getattr(self._reader, attr) |
|
|
|
|
|
def files(self): |
|
|
return CompatibilityFiles.SpecPath(self.spec, self._reader) |
|
|
|
|
|
|
|
|
def wrap_spec(package): |
|
|
""" |
|
|
Construct a package spec with traversable compatibility |
|
|
on the spec/loader/reader. |
|
|
""" |
|
|
return SpecLoaderAdapter(package.__spec__, TraversableResourcesLoader) |
|
|
|