Spaces:
Running
Running
| import panel as pn | |
| import holoviews as hv | |
| from utils.app_context import AppContext | |
| import pandas as pd | |
| import warnings | |
| import hvplot.pandas | |
| import holoviews.operation.datashader as hd | |
| from utils.DashboardClasses import ( | |
| MainHeader, | |
| MainArea, | |
| OutputBox, | |
| WarningBox, | |
| HelpBox, | |
| Footer, | |
| WarningHandler, | |
| FloatingPlot, | |
| PlotsContainer, | |
| ) | |
| from stingray import AveragedPowerspectrum | |
| colors = [ | |
| "#1f77b4", | |
| "#ff7f0e", | |
| "#2ca02c", | |
| "#d62728", | |
| "#9467bd", | |
| "#8c564b", | |
| "#e377c2", | |
| "#7f7f7f", | |
| "#bcbd22", | |
| "#17becf", | |
| "#aec7e8", | |
| "#ffbb78", | |
| "#98df8a", | |
| "#ff9896", | |
| "#c5b0d5", | |
| "#c49c94", | |
| "#f7b6d2", | |
| "#c7c7c7", | |
| "#dbdb8d", | |
| "#9edae5", | |
| ] | |
| # Create a warning handler | |
| def create_warning_handler(): | |
| warning_handler = WarningHandler() | |
| warnings.showwarning = warning_handler.warn | |
| return warning_handler | |
| """ Header Section """ | |
| def create_quicklook_avg_powerspectrum_header(context: AppContext): | |
| home_heading_input = pn.widgets.TextInput( | |
| name="Heading", value="QuickLook Averaged Power Spectrum" | |
| ) | |
| home_subheading_input = pn.widgets.TextInput(name="Subheading", value="") | |
| return MainHeader(heading=home_heading_input, subheading=home_subheading_input) | |
| """ Output Box Section """ | |
| def create_loadingdata_output_box(content): | |
| return OutputBox(output_content=content) | |
| """ Warning Box Section """ | |
| def create_loadingdata_warning_box(content): | |
| return WarningBox(warning_content=content) | |
| """ Float Panel """ | |
| def create_floatpanel_area(content, title): | |
| return FloatingPlot(content=content, title=title) | |
| """ Main Area Section """ | |
| def create_avg_powerspectrum_tab( | |
| context: AppContext, | |
| warning_handler, | |
| ): | |
| # Define Widgets | |
| event_list_dropdown = pn.widgets.Select( | |
| name="Select Event List(s)", | |
| options={name: i for i, (name, event) in enumerate(context.state.get_event_data())}, | |
| ) | |
| dt_input = pn.widgets.FloatInput( | |
| name="Select dt", | |
| value=1.0, | |
| step=0.0001, | |
| start=0.0000000001, | |
| end=1000.0, | |
| ) | |
| norm_select = pn.widgets.Select( | |
| name="Normalization", | |
| options=["frac", "leahy", "abs", "none"], | |
| value="leahy", | |
| ) | |
| segment_size_input = pn.widgets.FloatInput(name="Segment Size", value=10, step=1) | |
| multi_event_select = pn.widgets.MultiSelect( | |
| name="Or Select Event List(s) to Combine", | |
| options={name: i for i, (name, event) in enumerate(context.state.get_event_data())}, | |
| size=8, | |
| ) | |
| floatpanel_plots_checkbox = pn.widgets.Checkbox( | |
| name="Add Plot to FloatingPanel", value=True | |
| ) | |
| dataframe_checkbox = pn.widgets.Checkbox( | |
| name="Add DataFrame to FloatingPanel", value=False | |
| ) | |
| rasterize_checkbox = pn.widgets.Checkbox(name="Rasterize Plots", value=True) | |
| # New Checkboxes for Rebinning | |
| linear_rebin_checkbox = pn.widgets.Checkbox(name="Linear Rebinning", value=False) | |
| log_rebin_checkbox = pn.widgets.Checkbox(name="Logarithmic Rebinning", value=False) | |
| rebin_with_original_checkbox = pn.widgets.Checkbox( | |
| name="Plot Rebin with Original", value=False | |
| ) | |
| # Input for Rebin Size | |
| rebin_size_input = pn.widgets.FloatInput( | |
| name="Rebin Size", | |
| value=0.1, | |
| step=0.000001, | |
| start=0.01, | |
| end=1000.0, | |
| ) | |
| time_info_pane = pn.pane.Markdown( | |
| "Select an event list to see time range", width=600 | |
| ) | |
| # Update time info when an event list is selected | |
| def update_time_info(event): | |
| selected_index = event_list_dropdown.value | |
| if selected_index is not None: | |
| event_list_name = context.state.get_event_data()[selected_index][0] | |
| event_list = context.state.get_event_data()[selected_index][1] | |
| start_time = event_list.time[0] | |
| end_time = event_list.time[-1] | |
| time_info_pane.object = ( | |
| f"**Event List:** {event_list_name} \n" | |
| f"**Start Time:** {start_time} \n" | |
| f"**End Time:** {end_time}" | |
| ) | |
| else: | |
| time_info_pane.object = "Select an event list to see time range" | |
| # Internal functions to encapsulate functionality | |
| def create_dataframe(selected_event_list_index, dt, norm, segment_size): | |
| if selected_event_list_index is not None: | |
| event_list = context.state.get_event_data()[selected_event_list_index][1] | |
| # Use spectrum service to create averaged power spectrum | |
| result = context.services.spectrum.create_averaged_power_spectrum( | |
| event_list=event_list, | |
| dt=dt, | |
| segment_size=segment_size, | |
| norm=norm | |
| ) | |
| if not result["success"]: | |
| output_box_container[:] = [ | |
| create_loadingdata_output_box(f"Error: {result['message']}") | |
| ] | |
| return None, None | |
| ps = result["data"] | |
| # Use export service to convert to DataFrame | |
| df_result = context.services.export.to_dataframe_power_spectrum(ps) | |
| if df_result["success"]: | |
| return df_result["data"], ps | |
| else: | |
| output_box_container[:] = [ | |
| create_loadingdata_output_box(f"Error: {df_result['message']}") | |
| ] | |
| return None, None | |
| return None, None | |
| def create_holoviews_panes(plot): | |
| return pn.pane.HoloViews(plot, width=600, height=600) | |
| def create_holoviews_plots(ps, title, dt, norm, segment_size, color_key=None): | |
| label = f"{title} (dt={dt}, norm={norm}, segment={segment_size})" | |
| plot = hv.Curve((ps.freq, ps.power), label=label).opts( | |
| xlabel="Frequency (Hz)", | |
| ylabel="Power", | |
| width=600, | |
| height=600, | |
| shared_axes=False, | |
| ) | |
| if color_key: | |
| if rasterize_checkbox.value: | |
| return hd.rasterize(plot, line_width=3, pixel_ratio=2).opts( | |
| tools=["hover"], | |
| cmap=[color_key], | |
| width=600, | |
| height=600, | |
| colorbar=True, | |
| ) | |
| else: | |
| return plot | |
| else: | |
| if rasterize_checkbox.value: | |
| return hd.rasterize(plot, line_width=3, pixel_ratio=2).opts( | |
| tools=["hover"], | |
| width=600, | |
| height=600, | |
| colorbar=True, | |
| cmap="Viridis", | |
| ) | |
| else: | |
| return plot | |
| def rebin_powerspectrum(ps): | |
| rebin_size = rebin_size_input.value | |
| if linear_rebin_checkbox.value: | |
| return ps.rebin(rebin_size, method="mean") | |
| elif log_rebin_checkbox.value: | |
| return ps.rebin_log(f=rebin_size) | |
| return None | |
| def create_holoviews_plots_no_colorbar( | |
| ps, title, dt, norm, segment_size, color_key=None | |
| ): | |
| label = f"{title} (dt={dt}, norm={norm}, segment={segment_size})" | |
| plot = hv.Curve((ps.freq, ps.power), label=label).opts( | |
| xlabel="Frequency (Hz)", | |
| ylabel="Power", | |
| width=600, | |
| height=600, | |
| shared_axes=False, | |
| ) | |
| if color_key: | |
| if rasterize_checkbox.value: | |
| return hd.rasterize(plot, line_width=3, pixel_ratio=2).opts( | |
| tools=["hover"], | |
| cmap=[color_key], | |
| width=600, | |
| height=600, | |
| colorbar=False, | |
| ) | |
| else: | |
| return plot | |
| else: | |
| if rasterize_checkbox.value: | |
| return hd.rasterize(plot, line_width=3, pixel_ratio=2).opts( | |
| tools=["hover"], | |
| width=600, | |
| height=600, | |
| colorbar=False, | |
| cmap="Viridis", | |
| ) | |
| else: | |
| return plot | |
| def create_dataframe_panes(df, title, dt, norm, segment_size): | |
| return pn.FlexBox( | |
| pn.pane.Markdown( | |
| f"**{title} (dt={dt}, norm={norm}, segment={segment_size})**" | |
| ), | |
| pn.pane.DataFrame(df, width=600, height=600), | |
| align_items="center", | |
| justify_content="center", | |
| flex_wrap="nowrap", | |
| flex_direction="column", | |
| ) | |
| def generate_avg_powerspectrum(event=None): | |
| if not context.state.get_event_data(): | |
| context.update_container('output_box', | |
| create_loadingdata_output_box("No loaded event data available.") | |
| ) | |
| return | |
| selected_event_list_index = event_list_dropdown.value | |
| if selected_event_list_index is None: | |
| context.update_container('output_box', | |
| create_loadingdata_output_box("No event list selected.") | |
| ) | |
| return | |
| dt = dt_input.value | |
| norm = norm_select.value | |
| segment_size = segment_size_input.value | |
| df, ps = create_dataframe(selected_event_list_index, dt, norm, segment_size) | |
| if df is not None: | |
| plot_title = f"Averaged Power Spectrum for {context.state.get_event_data()[selected_event_list_index][0]}" | |
| plot_hv = create_holoviews_plots( | |
| ps, title=plot_title, dt=dt, norm=norm, segment_size=segment_size | |
| ) | |
| holoviews_output = create_holoviews_panes(plot_hv) | |
| if floatpanel_plots_checkbox.value: | |
| context.append_to_container('float_panel', | |
| create_floatpanel_area(content=holoviews_output, title=plot_title) | |
| ) | |
| else: | |
| markdown_content = f"## {plot_title}" | |
| context.append_to_container('plots', | |
| pn.FlexBox( | |
| pn.pane.Markdown(markdown_content), | |
| holoviews_output, | |
| align_items="center", | |
| justify_content="center", | |
| flex_wrap="nowrap", | |
| flex_direction="column", | |
| ) | |
| ) | |
| context.update_container('output_box', | |
| create_loadingdata_output_box( | |
| "Averaged Power Spectrum generated successfully." | |
| ) | |
| ) | |
| else: | |
| context.update_container('output_box', | |
| create_loadingdata_output_box( | |
| "Failed to create averaged power spectrum." | |
| ) | |
| ) | |
| def show_dataframe(event=None): | |
| if not context.state.get_event_data(): | |
| context.update_container('output_box', | |
| create_loadingdata_output_box("No loaded event data available.") | |
| ) | |
| return | |
| selected_event_list_index = event_list_dropdown.value | |
| if selected_event_list_index is None: | |
| context.update_container('output_box', | |
| create_loadingdata_output_box("No event list selected.") | |
| ) | |
| return | |
| dt = dt_input.value | |
| norm = norm_select.value | |
| segment_size = segment_size_input.value | |
| df, ps = create_dataframe(selected_event_list_index, dt, norm, segment_size) | |
| if df is not None: | |
| dataframe_output = create_dataframe_panes( | |
| df, | |
| f"{context.state.get_event_data()[selected_event_list_index][0]}", | |
| dt, | |
| norm, | |
| segment_size, | |
| ) | |
| if dataframe_checkbox.value: | |
| context.append_to_container('float_panel', | |
| create_floatpanel_area( | |
| content=dataframe_output, | |
| title=f"DataFrame for {context.state.get_event_data()[selected_event_list_index][0]}", | |
| ) | |
| ) | |
| else: | |
| context.append_to_container('plots',dataframe_output) | |
| context.update_container('output_box', | |
| create_loadingdata_output_box("DataFrame generated successfully.") | |
| ) | |
| else: | |
| context.update_container('output_box', | |
| create_loadingdata_output_box("Failed to create dataframe.") | |
| ) | |
| def combine_selected_plots(event=None): | |
| selected_event_list_indices = multi_event_select.value | |
| if not selected_event_list_indices: | |
| context.update_container('output_box', | |
| create_loadingdata_output_box("No event lists selected.") | |
| ) | |
| return | |
| combined_plots = [] | |
| combined_title = [] | |
| for index in selected_event_list_indices: | |
| dt = dt_input.value | |
| norm = norm_select.value | |
| segment_size = segment_size_input.value | |
| df, ps = create_dataframe(index, dt, norm, segment_size) | |
| if df is not None: | |
| event_list_name = context.state.get_event_data()[index][0] | |
| plot_hv = create_holoviews_plots_no_colorbar( | |
| ps, | |
| title=event_list_name, | |
| dt=dt, | |
| norm=norm, | |
| segment_size=segment_size, | |
| ) | |
| combined_plots.append(plot_hv) | |
| combined_title.append(event_list_name) | |
| if combined_plots: | |
| combined_plot = hv.Overlay(combined_plots).opts(shared_axes=False) | |
| combined_pane = create_holoviews_panes(combined_plot) | |
| combined_title_str = " + ".join(combined_title) | |
| if floatpanel_plots_checkbox.value: | |
| context.append_to_container('float_panel', | |
| create_floatpanel_area( | |
| content=combined_pane, title=combined_title_str | |
| ) | |
| ) | |
| else: | |
| markdown_content = f"## {combined_title_str}" | |
| context.append_to_container('plots', | |
| pn.FlexBox( | |
| pn.pane.Markdown(markdown_content), | |
| combined_pane, | |
| align_items="center", | |
| justify_content="center", | |
| flex_wrap="nowrap", | |
| flex_direction="column", | |
| ) | |
| ) | |
| context.update_container('output_box', | |
| create_loadingdata_output_box("Combined plots generated successfully.") | |
| ) | |
| else: | |
| context.update_container('output_box', | |
| create_loadingdata_output_box("Failed to combine plots.") | |
| ) | |
| generate_powerspectrum_button = pn.widgets.Button( | |
| name="Generate Averaged Power Spectrum", button_type="primary" | |
| ) | |
| generate_powerspectrum_button.on_click(generate_avg_powerspectrum) | |
| combine_plots_button = pn.widgets.Button( | |
| name="Combine Selected Plots", button_type="success" | |
| ) | |
| combine_plots_button.on_click(combine_selected_plots) | |
| show_dataframe_button = pn.widgets.Button( | |
| name="Show DataFrame", button_type="primary" | |
| ) | |
| show_dataframe_button.on_click(show_dataframe) | |
| tab_content = pn.Column( | |
| event_list_dropdown, | |
| dt_input, | |
| norm_select, | |
| segment_size_input, | |
| multi_event_select, | |
| floatpanel_plots_checkbox, | |
| dataframe_checkbox, | |
| rasterize_checkbox, | |
| pn.Row( | |
| generate_powerspectrum_button, show_dataframe_button, combine_plots_button | |
| ), | |
| ) | |
| return tab_content | |
| def create_quicklook_avg_powerspectrum_main_area(context: AppContext): | |
| warning_handler = create_warning_handler() | |
| tabs_content = { | |
| "Averaged Power Spectrum": create_avg_powerspectrum_tab( | |
| context=context, | |
| warning_handler=warning_handler, | |
| ), | |
| } | |
| return MainArea(tabs_content=tabs_content) | |
| def create_quicklook_avg_powerspectrum_area(): | |
| """ | |
| Create the plots area for the quicklook averaged power spectrum tab. | |
| Returns: | |
| PlotsContainer: An instance of PlotsContainer with the plots for the quicklook averaged power spectrum tab. | |
| """ | |
| return PlotsContainer() | |