| | import ast |
| | import contextlib |
| | import logging |
| | import os |
| | import re |
| | from typing import ClassVar, Sequence |
| |
|
| | import panel as pn |
| |
|
| | from .core import OpenFile, get_filesystem_class, split_protocol |
| | from .registry import known_implementations |
| |
|
# Module-level side effect: runs once, when this module is first imported.
# NOTE(review): presumably loads the Panel front-end resources needed for
# widgets to render in notebooks -- confirm against the Panel documentation.
pn.extension()
# Logger shared by this module; ``SigSlot._emit`` uses it to trace emitted
# signals and to report exceptions raised by callbacks.
logger = logging.getLogger("fsspec.gui")
| |
|
| |
|
class SigSlot:
    """Signal-slot mixin, for Panel event passing

    Include this class in a widget manager's superclasses to be able to
    register events and callbacks on Panel widgets managed by that class.

    The method ``_register`` should be called as widgets are added, and external
    code should call ``connect`` to associate callbacks.

    By default, all signals emit a DEBUG logging statement.
    """

    # Names of the signals this class may emit. ``_register`` refuses any
    # name that is not listed here.
    signals: ClassVar[Sequence[str]] = []
    # Names of methods meant to be used as callback targets. Nothing in this
    # class reads the list; it is declarative only.
    slots: ClassVar[Sequence[str]] = []

    def __init__(self):
        self._ignoring_events = False
        # signal name -> {"widget", "callbacks", "thing", "log"}
        self._sigs = {}
        # "<widget name>-<watched attribute>" -> signal name
        self._map = {}
        self._setup()

    def _setup(self):
        """Create GUI elements and register signals"""
        self.panel = pn.pane.PaneBase()

    @staticmethod
    def _event_key(widget, thing):
        """Build the ``_map`` key identifying a (widget, attribute) pair

        Used both when registering a widget and when an event arrives, so
        that the two sides always agree on the key.
        """
        if widget is None:
            label = "none"
        else:
            label = getattr(widget, "name", str(widget))
        return "-".join([label, thing])

    def _register(
        self, widget, name, thing="value", log_level=logging.DEBUG, auto=False
    ):
        """Watch the given attribute of a widget and assign it a named event

        This is normally called at the time a widget is instantiated, in the
        class which owns it.

        Parameters
        ----------
        widget : pn.layout.Panel or None
            Widget to watch. If None, an anonymous signal not associated with
            any widget.
        name : str
            Name of this event
        thing : str
            Attribute of the given widget to watch
        log_level : int
            When the signal is triggered, a logging event of the given level
            will be fired in this module's logger.
        auto : bool
            If True, automatically connects with a method in this class of the
            same name.

        Raises
        ------
        ValueError
            If ``name`` is not listed in ``self.signals``.
        """
        if name not in self.signals:
            raise ValueError(f"Attempt to assign an undeclared signal: {name}")
        self._sigs[name] = {
            "widget": widget,
            "callbacks": [],
            "thing": thing,
            "log": log_level,
        }
        self._map[self._event_key(widget, thing)] = name
        if widget is not None:
            widget.param.watch(self._signal, thing, onlychanged=True)
        if auto and hasattr(self, name):
            self.connect(name, getattr(self, name))

    def _repr_mimebundle_(self, *args, **kwargs):
        """Display in a notebook or a server

        Raises
        ------
        NotImplementedError
            If the underlying panel object cannot produce a mimebundle; the
            original error is attached as the cause.
        """
        try:
            return self.panel._repr_mimebundle_(*args, **kwargs)
        except (ValueError, AttributeError) as err:
            raise NotImplementedError(
                "Panel does not seem to be set up properly"
            ) from err

    def connect(self, signal, slot):
        """Associate call back with given event

        The callback must be a function which takes the "new" value of the
        watched attribute as the only parameter. If the callback return False,
        this cancels any further processing of the given event.

        Alternatively, the callback can be a string, in which case it means
        emitting the correspondingly-named event (i.e., connect to self)

        Raises
        ------
        KeyError
            If ``signal`` has not been registered with ``_register``.
        """
        self._sigs[signal]["callbacks"].append(slot)

    def _signal(self, event):
        """This is called by an action on a widget

        Within an self.ignore_events context, nothing happens.

        Tests can execute this method by directly changing the values of
        widget components.
        """
        if self._ignoring_events:
            return
        name = self._map.get(self._event_key(event.obj, event.name))
        if name is not None and name in self._sigs:
            self._emit(name, event.new)

    @contextlib.contextmanager
    def ignore_events(self):
        """Temporarily turn off events processing in this instance

        (does not propagate to children)
        """
        self._ignoring_events = True
        try:
            yield
        finally:
            self._ignoring_events = False

    def _emit(self, sig, value=None):
        """An event happened, call its callbacks

        This method can be used in tests to simulate message passing without
        directly changing visual elements.

        Calling of callbacks will halt whenever one returns False. A callback
        that raises is logged and skipped; later callbacks still run.
        """
        entry = self._sigs[sig]
        # lazy %-style arguments: the message is only built if it is emitted
        logger.log(entry["log"], "%s: %s", sig, value)
        for callback in entry["callbacks"]:
            if isinstance(callback, str):
                # chained signal: re-emitted without a value
                self._emit(callback)
                continue
            try:
                ret = callback(value)
            except Exception as e:
                logger.exception(
                    "Exception (%s) while executing callback for signal: %s",
                    e,
                    sig,
                )
                continue
            if ret is False:
                break

    def show(self, threads=False):
        """Open a new browser tab and display this instance's interface"""
        self.panel.show(threads=threads, verbose=False)
        return self
| |
|
| |
|
class SingleSelect(SigSlot):
    """A multiselect which only allows you to select one item for an event"""

    signals = ["_selected", "selected"]
    slots = ["set_options", "set_selection", "add", "clear", "select"]

    def __init__(self, **kwargs):
        # Stored before the base initialiser runs, because that initialiser
        # calls ``_setup``, which reads ``self.kwargs``.
        self.kwargs = kwargs
        super().__init__()

    def _setup(self):
        """Create the MultiSelect widget and wire up its two signals"""
        widget = pn.widgets.MultiSelect(**self.kwargs)
        self.panel = widget
        # "_selected" follows the raw widget value; "selected" is the
        # anonymous signal emitted once the value has been trimmed to one item
        self._register(widget, "_selected", "value")
        self._register(None, "selected")
        self.connect("_selected", self.select_one)

    def _signal(self, *args, **kwargs):
        """Forward widget events to the base-class handler unchanged"""
        super()._signal(*args, **kwargs)

    def select_one(self, *_):
        """Reduce the widget's value to its last item, then emit "selected" """
        with self.ignore_events():
            # events are suppressed so this assignment does not re-trigger us
            current = self.panel.value
            if current:
                self.panel.value = [current[-1]]
            else:
                self.panel.value = []
        self._emit("selected", self.panel.value)

    def set_options(self, options):
        """Replace the entries offered by the widget"""
        self.panel.options = options

    def clear(self):
        """Remove every entry from the widget"""
        self.panel.options = []

    @property
    def value(self):
        """Current value of the underlying widget"""
        return self.panel.value

    def set_selection(self, selection):
        """Make ``selection`` the single selected item"""
        self.panel.value = [selection]
| |
|
| |
|
class FileSelector(SigSlot):
    """Panel-based graphical file selector widget

    Instances of this widget are interactive and can be displayed in jupyter by having
    them as the output of a cell, or in a separate browser tab using ``.show()``.
    """

    signals = [
        "protocol_changed",
        "selection_changed",
        "directory_entered",
        "home_clicked",
        "up_clicked",
        "go_clicked",
        "filters_changed",
    ]
    slots = ["set_filters", "go_home"]

    def __init__(self, url=None, filters=None, ignore=None, kwargs=None):
        """

        Parameters
        ----------
        url : str (optional)
            Initial value of the URL to populate the dialog; should include protocol
        filters : list(str) (optional)
            File endings to include in the listings. If not included, all files are
            allowed. Does not affect directories.
            If given, the endings will appear as checkboxes in the interface
        ignore : list(str) (optional)
            Regex(s) of file basename patterns to ignore, e.g., "\\." for typical
            hidden files on posix
        kwargs : dict (optional)
            To pass to file system instance
        """
        if url:
            protocol, url = split_protocol(url)
            # a URL without a protocol would otherwise leave ``None`` here and
            # produce "None://..." in ``urlpath``
            self.init_protocol = protocol or "file"
        else:
            self.init_protocol, url = "file", os.getcwd()
        self.init_url = url
        # The kwargs box holds text which ``storage_options`` later parses.
        # ``None`` must become "{}" and not the literal text "None".
        if not isinstance(kwargs, str):
            kwargs = str(kwargs or {})
        self.init_kwargs = kwargs or "{}"
        self.filters = filters
        self.ignore = [re.compile(i) for i in ignore or []]
        self._fs = None
        super().__init__()

    def _setup(self):
        """Build the widgets, register their signals and show the first listing"""
        self.url = pn.widgets.TextInput(
            name="url",
            value=self.init_url,
            align="end",
            sizing_mode="stretch_width",
            width_policy="max",
        )
        self.protocol = pn.widgets.Select(
            options=sorted(known_implementations),
            value=self.init_protocol,
            name="protocol",
            align="center",
        )
        self.kwargs = pn.widgets.TextInput(
            name="kwargs", value=self.init_kwargs, align="center"
        )
        self.go = pn.widgets.Button(name="⇨", align="end", width=45)
        self.main = SingleSelect(size=10)
        self.home = pn.widgets.Button(name="🏠", width=40, height=30, align="end")
        self.up = pn.widgets.Button(name="‹", width=30, height=30, align="end")

        # auto=True connects each signal to the same-named method of this class
        self._register(self.protocol, "protocol_changed", auto=True)
        self._register(self.go, "go_clicked", "clicks", auto=True)
        self._register(self.up, "up_clicked", "clicks", auto=True)
        self._register(self.home, "home_clicked", "clicks", auto=True)
        self._register(None, "selection_changed")
        self.main.connect("selected", self.selection_changed)
        self._register(None, "directory_entered")
        # remembered so that ``go_clicked`` can tell when the filesystem
        # instance has to be rebuilt
        self.prev_protocol = self.protocol.value
        self.prev_kwargs = self.storage_options

        self.filter_sel = pn.widgets.CheckBoxGroup(
            value=[], options=[], inline=False, align="end", width_policy="min"
        )
        self._register(self.filter_sel, "filters_changed", auto=True)

        self.panel = pn.Column(
            pn.Row(self.protocol, self.kwargs),
            pn.Row(self.home, self.up, self.url, self.go, self.filter_sel),
            self.main.panel,
        )
        self.set_filters(self.filters)
        self.go_clicked()

    def set_filters(self, filters=None):
        """Set the file endings offered, and ticked, in the filter checkboxes"""
        self.filters = filters
        if filters:
            self.filter_sel.options = filters
            self.filter_sel.value = filters
        else:
            self.filter_sel.options = []
            self.filter_sel.value = []

    @property
    def storage_options(self):
        """Value of the kwargs box as a dictionary"""
        return ast.literal_eval(self.kwargs.value) or {}

    @property
    def fs(self):
        """Current filesystem instance"""
        if self._fs is None:
            cls = get_filesystem_class(self.protocol.value)
            self._fs = cls(**self.storage_options)
        return self._fs

    @property
    def urlpath(self):
        """URL of currently selected item, or None if nothing is selected"""
        selected = self.main.value
        if not selected:
            return None
        return f"{self.protocol.value}://{selected[0]}"

    def open_file(self, mode="rb", compression=None, encoding=None):
        """Create OpenFile instance for the currently selected item

        For example, in a notebook you might do something like

        .. code-block::

            [ ]: sel = FileSelector(); sel

            # user selects their file

            [ ]: with sel.open_file('rb') as f:
            ...      out = f.read()

        Parameters
        ----------
        mode: str (optional)
            Open mode for the file.
        compression: str (optional)
            Interact with the file as compressed. Set to 'infer' to guess
            compression from the file ending
        encoding: str (optional)
            If using text mode, use this encoding; defaults to UTF8.

        Raises
        ------
        ValueError
            If no item is currently selected.
        """
        if self.urlpath is None:
            raise ValueError("No file selected")
        return OpenFile(self.fs, self.urlpath, mode, compression, encoding)

    def filters_changed(self, values):
        """Apply the newly ticked file endings and refresh the listing"""
        self.filters = values
        self.go_clicked()

    def selection_changed(self, *_):
        """Descend into the selected item if it is a directory"""
        if self.urlpath is None:
            return
        if self.fs.isdir(self.urlpath):
            self.url.value = self.fs._strip_protocol(self.urlpath)
        self.go_clicked()

    def go_clicked(self, *_):
        """List the current URL and refresh the entries of the main selector"""
        protocol = self.protocol.value
        options = self.storage_options
        if self.prev_protocol != protocol or self.prev_kwargs != options:
            self._fs = None  # forces ``fs`` to build a new instance
            self.prev_protocol = protocol
            self.prev_kwargs = options
        listing = sorted(
            self.fs.ls(self.url.value, detail=True), key=lambda x: x["name"]
        )
        # pair each entry with its basename, dropping those that match any
        # of the ignore patterns
        entries = []
        for info in listing:
            base = info["name"].rsplit("/", 1)[-1]
            if not any(pattern.match(base) for pattern in self.ignore):
                entries.append((base, info))
        folders = {
            "📁 " + base: info["name"]
            for base, info in entries
            if info["type"] == "directory"
        }
        files = {
            "📄 " + base: info["name"]
            for base, info in entries
            if info["type"] == "file"
        }
        if self.filters:
            # filters apply to files only; directories are always shown
            endings = tuple(self.filters)
            files = {k: v for k, v in files.items() if v.endswith(endings)}
        self.main.set_options(dict(**folders, **files))

    def protocol_changed(self, *_):
        """Reset the filesystem, the listing and the URL for a new protocol"""
        self._fs = None
        # ``self.main`` is a SingleSelect wrapper: assigning ``options`` on it
        # would not reach the widget, so clear through its own method
        self.main.clear()
        self.url.value = ""

    def home_clicked(self, *_):
        """Return to the protocol, kwargs and URL given at construction"""
        self.protocol.value = self.init_protocol
        self.kwargs.value = self.init_kwargs
        self.url.value = self.init_url
        self.go_clicked()

    def up_clicked(self, *_):
        """Move to the parent of the current URL and refresh the listing"""
        self.url.value = self.fs._parent(self.url.value)
        self.go_clicked()
| |
|