# Standard Imports
import os
import stat
import copy
import logging
import numpy as np
import warnings
import tempfile
import traceback
import requests
from bokeh.models import Tooltip
# HoloViz Imports
import panel as pn
# Stingray Imports
from stingray.events import EventList
from stingray import Lightcurve
# Dashboard Classes and State Management Imports
from utils.state_manager import state_manager
from utils.app_context import AppContext
from utils.error_handler import ErrorHandler
from utils.error_recovery import ErrorRecoveryPanel, show_file_error, show_validation_error, show_success
from utils.DashboardClasses import (
MainHeader,
MainArea,
OutputBox,
WarningBox,
HelpBox,
WarningHandler,
PlotsContainer,
)
# Strings Imports
# Path to the topmost directory for loaded data: files/loaded-data under
# the process's current working directory at import time.
loaded_data_path = os.path.join(os.getcwd(), "files", "loaded-data")
# Create the loaded-data directory if it doesn't exist (idempotent).
os.makedirs(loaded_data_path, exist_ok=True)
def create_warning_handler():
    """
    Build a WarningHandler and route Python's warning machinery through it.

    Returns:
        WarningHandler: The handler instance that will collect warnings.

    Side effects:
        Replaces ``warnings.showwarning`` globally with the handler's
        ``warn`` method, so all subsequent warnings are captured by it.

    Example:
        >>> warning_handler = create_warning_handler()
        >>> warning_handler.warn("Test warning", category=RuntimeWarning)
    """
    handler = WarningHandler()
    # Redirect every emitted warning into our custom collector.
    warnings.showwarning = handler.warn
    return handler
""" Header Section """
def create_loadingdata_header(context: AppContext):
    """
    Build the header widget for the data-loading section.

    Args:
        context (AppContext): The application context containing containers
            and state (currently unused here, kept for a uniform signature).

    Returns:
        MainHeader: A header whose heading reads "Data Ingestion".

    Example:
        >>> header = create_loadingdata_header(context)
        >>> header.heading.value
        'Data Ingestion'
    """
    heading_widget = pn.widgets.TextInput(name="Heading", value="Data Ingestion")
    return MainHeader(heading=heading_widget)
""" Output Box Section """
def create_loadingdata_output_box(content):
    """
    Wrap a message in an OutputBox for display in the output container.

    Args:
        content (str): The text to show in the output box.

    Returns:
        OutputBox: A box carrying ``content``.

    Example:
        >>> output_box = create_loadingdata_output_box("File loaded successfully.")
        >>> output_box.output_content
        'File loaded successfully.'
    """
    return OutputBox(output_content=content)
""" Warning Box Section """
def create_loadingdata_warning_box(content):
    """
    Wrap a message in a WarningBox for display in the warning container.

    Args:
        content (str): The text to show in the warning box.

    Returns:
        WarningBox: A box carrying ``content``.

    Example:
        >>> warning_box = create_loadingdata_warning_box("Invalid file format.")
        >>> warning_box.warning_content
        'Invalid file format.'
    """
    return WarningBox(warning_content=content)
def read_event_data(
    event,
    file_selector,
    filename_input,
    format_input,
    format_checkbox,
    rmf_file_dropper,
    additional_columns_input,
    use_lazy_loading,
    use_preview_mode,
    preview_duration_input,
    context: AppContext,
    warning_handler,
):
    """
    Load event data from selected files with extended EventList.read functionality,
    supporting FileDropper for RMF files and additional columns.

    Args:
        event: The event object triggering the function.
        file_selector: The file selector widget.
        filename_input: Text input for filenames (comma-separated).
        format_input: Text input for file formats (comma-separated).
        format_checkbox: Checkbox selecting the default "ogip" format.
        rmf_file_dropper: File dropper for an optional RMF file.
        additional_columns_input: Text input for additional columns.
        use_lazy_loading: Checkbox enabling chunked (lazy) loading.
        use_preview_mode: Checkbox enabling preview-only loading.
        preview_duration_input: Float input with the preview duration (seconds).
        context (AppContext): The application context containing containers and state.
        warning_handler: The handler for warnings.

    Side effects:
        Updates the output/warning containers; may write a temporary .rmf file.
    """
    # Validation for required inputs
    if not file_selector.value:
        context.update_container('output_box',
            create_loadingdata_output_box(
                "No file selected. Please select a file to upload."
            )
        )
        return

    # Single retry callback shared by every error panel below.  It re-invokes
    # this function with the exact argument list it was called with.  (The
    # previous implementation referenced a nonexistent function with a
    # mismatched signature, so pressing "Retry" raised a NameError.)
    def retry_load():
        read_event_data(
            event, file_selector, filename_input, format_input,
            format_checkbox, rmf_file_dropper, additional_columns_input,
            use_lazy_loading, use_preview_mode, preview_duration_input,
            context, warning_handler,
        )

    try:
        # Parse file paths and the optional comma-separated display names.
        file_paths = file_selector.value
        filenames = (
            [name.strip() for name in filename_input.value.split(",")]
            if filename_input.value
            else []
        )
    except Exception as e:
        user_msg, tech_msg = ErrorHandler.handle_error(
            e,
            context="Parsing file paths and names",
            file_count=len(file_selector.value) if file_selector.value else 0
        )
        error_panel = ErrorRecoveryPanel.create_error_panel(
            error_message=user_msg,
            error_type="error",
            retry_callback=retry_load,
            help_text="Check that file paths and filenames are correctly formatted (comma-separated if multiple)",
            technical_details=tech_msg
        )
        context.update_container('warning_box', error_panel)
        return

    try:
        # Parse file formats; the checkbox forces "ogip" for every file.
        formats = (
            [fmt.strip() for fmt in format_input.value.split(",")]
            if format_input.value
            else []
        )
        if format_checkbox.value:
            formats = ["ogip" for _ in range(len(file_paths))]
    except Exception as e:
        user_msg, tech_msg = ErrorHandler.handle_error(
            e,
            context="Parsing file formats",
            format_input=format_input.value if format_input.value else "None"
        )
        error_panel = ErrorRecoveryPanel.create_error_panel(
            error_message=user_msg,
            error_type="error",
            retry_callback=retry_load,
            help_text="Supported formats: ogip, hea, fits (comma-separated if multiple files)",
            technical_details=tech_msg
        )
        context.update_container('warning_box', error_panel)
        return

    # Temp copy of the RMF payload, if one was provided.  Initialized to
    # None so it can always be passed straight to the loaders.
    tmp_file_path = None
    try:
        if rmf_file_dropper.value:
            rmf_file = list(rmf_file_dropper.value.values())[0]
            # FileDropper hands us raw bytes; the loaders want a path, so
            # spill the payload to a temporary file.
            with tempfile.NamedTemporaryFile(delete=False, suffix=".rmf") as tmp_file:
                tmp_file.write(rmf_file)
                tmp_file_path = tmp_file.name
    except Exception as e:
        user_msg, tech_msg = ErrorHandler.handle_error(
            e,
            context="Processing RMF file",
            has_rmf=bool(rmf_file_dropper.value)
        )
        # Clear callback to reset the RMF file dropper.
        def clear_rmf():
            rmf_file_dropper.value = None
            context.update_container('warning_box',
                pn.pane.Markdown("*RMF file cleared. Ready to try again.*")
            )
        error_panel = ErrorRecoveryPanel.create_error_panel(
            error_message=user_msg,
            error_type="error",
            clear_callback=clear_rmf,
            help_text="Make sure the RMF file is valid and in the correct format (.rmf extension)",
            technical_details=tech_msg
        )
        context.update_container('warning_box', error_panel)
        return

    try:
        # Parse additional columns (None means "no extra columns").
        additional_columns = (
            [col.strip() for col in additional_columns_input.value.split(",")]
            if additional_columns_input.value
            else None
        )
    except Exception as e:
        user_msg, tech_msg = ErrorHandler.handle_error(
            e,
            context="Parsing additional columns",
            columns_input=additional_columns_input.value if additional_columns_input.value else "None"
        )
        # Clear callback to reset the additional-columns input.
        def clear_columns():
            additional_columns_input.value = ""
            context.update_container('warning_box',
                pn.pane.Markdown("*Additional columns cleared. Ready to try again.*")
            )
        error_panel = ErrorRecoveryPanel.create_error_panel(
            error_message=user_msg,
            error_type="error",
            retry_callback=retry_load,
            clear_callback=clear_columns,
            help_text="Provide column names as comma-separated values (e.g., 'PI, ENERGY')",
            technical_details=tech_msg
        )
        context.update_container('warning_box', error_panel)
        return

    # NOTE(review): if no names (or no formats, with the default-format
    # checkbox unchecked) were supplied, zip() yields nothing and no file is
    # loaded — preserved from the original; confirm this is intended.
    loaded_files = []
    for file_path, file_name, file_format in zip(file_paths, filenames, formats):
        # Choose loading method based on mode selection.
        if use_preview_mode.value:
            # Preview mode for extremely large files: first segment only.
            result = context.services.data.load_event_list_preview(
                file_path=file_path,
                name=file_name,
                preview_duration=preview_duration_input.value,
                rmf_file=tmp_file_path,
                additional_columns=additional_columns
            )
        elif use_lazy_loading.value:
            # Lazy (chunked) loading; supports RMF and additional columns.
            result = context.services.data.load_event_list_lazy(
                file_path=file_path,
                name=file_name,
                safety_margin=0.5,
                rmf_file=tmp_file_path,
                additional_columns=additional_columns
            )
        else:
            # Standard in-memory loading.
            result = context.services.data.load_event_list(
                file_path=file_path,
                name=file_name,
                fmt=file_format,
                rmf_file=tmp_file_path,
                additional_columns=additional_columns
            )
        if result["success"]:
            # Annotate the message when the service reports a risky load.
            method_info = result.get("metadata", {}).get("method", "standard")
            message = result["message"]
            if method_info == "standard_risky":
                message += " ⚠️ (Loaded despite memory risk)"
            loaded_files.append(message)
        else:
            # Loading failed: show an error panel with a retry option.
            error_panel = ErrorRecoveryPanel.create_error_panel(
                error_message=result['message'],
                error_type="error",
                retry_callback=retry_load,
                help_text="Check the file format and try again, or select different files",
                technical_details=result.get('error', 'No technical details available')
            )
            context.update_container('output_box', error_panel)
            return

    # Show success panel; one bullet per loaded file.  (The original had a
    # literal newline inside the string quotes — a syntax error.)
    success_message = f"Successfully loaded {len(loaded_files)} file(s)"
    details = "\n".join([f"• {msg}" for msg in loaded_files])
    success_panel = ErrorRecoveryPanel.create_success_panel(
        success_message=success_message,
        details=details
    )
    context.update_container('output_box', success_panel)

    # Show warnings if any, then clear them so they are reported only once.
    if warning_handler.warnings:
        context.update_container('warning_box',
            create_loadingdata_warning_box("\n".join(warning_handler.warnings))
        )
    else:
        context.update_container('warning_box', create_loadingdata_warning_box("No warnings."))
    warning_handler.warnings.clear()
def save_loaded_files(
    event,
    filename_input,
    format_input,
    format_checkbox,
    context: AppContext,
    warning_handler,
):
    """
    Persist all currently loaded event lists to disk.

    Args:
        event: The event object triggering the function.
        filename_input (TextInput): The input widget for filenames.
        format_input (TextInput): The input widget for formats.
        format_checkbox (Checkbox): The checkbox for the default "hdf5" format.
        context (AppContext): The application context containing containers and state.
        warning_handler (WarningHandler): The handler for warnings.

    Side effects:
        - Saves files under ``loaded_data_path`` via the export service.
        - Updates the output and warning containers with messages.

    Restrictions:
        - The number of filenames and formats must match the number of loaded
          files unless the default format is used.
    """
    # Pull everything currently loaded from the state manager.
    all_event_data = context.state.get_event_data()
    if not all_event_data:
        context.update_container('output_box',
            create_loadingdata_output_box("No files loaded to save.")
        )
        return

    # Fall back to the stored names when the user typed none.
    if filename_input.value:
        filenames = [part.strip() for part in filename_input.value.split(",")]
    else:
        filenames = [stored_name for stored_name, _ in all_event_data]

    # TODO: ADD checks for valid formats
    if format_checkbox.value:
        formats = ["hdf5"] * len(all_event_data)
    elif format_input.value:
        formats = [part.strip() for part in format_input.value.split(",")]
    else:
        formats = []

    total = len(all_event_data)
    if len(filenames) < total:
        context.update_container('output_box',
            create_loadingdata_output_box("Please specify names for all loaded files.")
        )
        return
    if len(filenames) != total:
        context.update_container('output_box',
            create_loadingdata_output_box(
                "Please ensure that the number of names matches the number of loaded files."
            )
        )
        return
    if len(formats) < total:
        context.update_container('output_box',
            create_loadingdata_output_box(
                "Please specify formats for all loaded files or check the default format option."
            )
        )
        return

    saved_files = []
    try:
        for (loaded_name, event_list), target_name, target_fmt in zip(
            all_event_data, filenames, formats
        ):
            save_path = os.path.join(loaded_data_path, f"{target_name}.{target_fmt}")
            # Refuse to overwrite an existing file of the same name/format.
            if os.path.exists(save_path):
                context.update_container('output_box',
                    create_loadingdata_output_box(
                        f"A file with the name '{target_name}' already exists. Please provide a different name."
                    )
                )
                return
            # Delegate the actual write to the export service.
            result = context.services.export.export_event_list(
                name=target_name,
                file_path=save_path,
                fmt=target_fmt
            )
            if result["success"]:
                saved_files.append(result["message"])
            else:
                saved_files.append(f"Error saving '{target_name}': {result['message']}")
        context.update_container('output_box',
            create_loadingdata_output_box("\n".join(saved_files))
        )
        if warning_handler.warnings:
            context.update_container('warning_box',
                create_loadingdata_warning_box("\n".join(warning_handler.warnings))
            )
        else:
            context.update_container('warning_box', create_loadingdata_warning_box("No warnings."))
    except Exception as e:
        user_msg, tech_msg = ErrorHandler.handle_error(
            e,
            context="Saving loaded files",
            save_directory=loaded_data_path
        )
        context.update_container('warning_box',
            create_loadingdata_warning_box(f"Error: {user_msg}")
        )
    # Clear the warnings after displaying them.
    warning_handler.warnings.clear()
# TODO: ADD better comments, error handlling and docstrings
def delete_selected_files(
    event,
    file_selector,
    context: AppContext,
    warning_handler,
):
    """
    Delete the files currently selected in the file selector.

    Args:
        event: The event object triggering the function.
        file_selector (FileSelector): The file selector widget.
        context (AppContext): The application context containing containers and state.
        warning_handler (WarningHandler): The handler for warnings.

    Side effects:
        - Removes files from the file system.
        - Updates the output and warning containers with messages.

    Restrictions:
        - Only files with a whitelisted data extension may be deleted
          (source files such as `.py` are refused for safety).
    """
    # Whitelist of extensions that may be deleted.
    allowed_extensions = (
        ".pkl", ".pickle", ".fits", ".evt", ".h5", ".hdf5", ".ecsv",
        ".txt", ".dat", ".csv", ".vot", ".tex", ".html", ".gz",
    )

    if not file_selector.value:
        context.update_container('output_box',
            create_loadingdata_output_box(
                "No file selected. Please select a file to delete."
            )
        )
        return

    results = []
    for file_path in file_selector.value:
        # str.endswith accepts a tuple: refuse anything outside the whitelist.
        if not file_path.endswith(allowed_extensions):
            results.append(
                f"Cannot delete file '{file_path}': File type is not allowed for deletion."
            )
            continue
        try:
            # Make the file writable first so read-only files can be removed.
            os.chmod(file_path, stat.S_IWUSR | stat.S_IREAD | stat.S_IWRITE)
            os.remove(file_path)
            results.append(f"File '{file_path}' deleted successfully.")
        except Exception as e:
            user_msg, tech_msg = ErrorHandler.handle_error(
                e,
                context="Deleting file",
                file_path=file_path,
                log_level=logging.WARNING
            )
            results.append(f"Error deleting '{file_path}': {user_msg}")

    context.update_container('output_box', create_loadingdata_output_box("\n".join(results)))
    if warning_handler.warnings:
        context.update_container('warning_box',
            create_loadingdata_warning_box("\n".join(warning_handler.warnings))
        )
    else:
        context.update_container('warning_box', create_loadingdata_warning_box("No warnings."))
    warning_handler.warnings.clear()
# TODO: ADD better comments, error handlling and docstrings
def preview_loaded_files(
    event,
    context: AppContext,
    warning_handler,
    time_limit=10,
):
    """
    Preview the loaded event data files and light curves.

    Args:
        event: The event object triggering the function.
        context (AppContext): The application context containing containers and state.
        warning_handler (WarningHandler): The handler for warnings.
        time_limit (int): The number of entries shown for array-valued attributes.

    Side Effects:
        Updates the output and warning containers with preview information.

    Example:
        >>> preview_loaded_files(event, context, warning_handler)
        "Event List - my_event_list:\nTimes (first 10): [0.1, 0.2, ...]\nMJDREF: 58000"
    """
    def _render(prefix, title, attrs, entry_suffix, suffix):
        # Render one preview section; numpy arrays are truncated to the
        # first `time_limit` entries before formatting.
        parts = [prefix, title]
        for attr_name, attr_value in attrs:
            if isinstance(attr_value, np.ndarray):
                attr_value = attr_value[:time_limit]
            parts.append(f"{attr_name}: {attr_value}{entry_suffix}")
        parts.append(suffix)
        return "".join(parts)

    # Fetch everything currently held by the state manager.
    all_event_data = context.state.get_event_data()
    all_light_curves = context.state.get_light_curve()
    preview_data = []

    # Summary of what is loaded.
    if all_event_data:
        preview_data.append(
            f"Loaded Event Files: {len(all_event_data)}\n"
            f"Event File Names: {[file_name for file_name, _ in all_event_data]}\n"
        )
    else:
        preview_data.append("No Event Files Loaded.\n")
    if all_light_curves:
        preview_data.append(
            f"Loaded Light Curves: {len(all_light_curves)}\n"
            f"Light Curve Names: {[lc_name for lc_name, _ in all_light_curves]}\n"
        )
    else:
        preview_data.append("No Light Curves Loaded.\n")

    # Per-EventList details (attributes gathered dynamically).
    for file_name, event_list in (all_event_data or []):
        try:
            attributes = [
                ("Times (first entries)", event_list.time[:time_limit]),
                ("Energy (keV)", getattr(event_list, "energy", "Not available")),
                ("PI Channels", getattr(event_list, "pi", "Not available")),
                ("MJDREF", event_list.mjdref),
                ("Good Time Intervals (GTIs)", event_list.gti),
                ("Mission", getattr(event_list, "mission", "Not available")),
                ("Instrument", getattr(event_list, "instr", "Not available")),
                ("Detector IDs", getattr(event_list, "detector_id", "Not available")),
                ("Ephemeris", getattr(event_list, "ephem", "Not available")),
                ("Time Reference", getattr(event_list, "timeref", "Not available")),
                ("Time System", getattr(event_list, "timesys", "Not available")),
                ("Header", getattr(event_list, "header", "Not available")),
            ]
            preview_data.append(
                _render(
                    "\n\n\n----------------------\n",
                    f"Event List - {file_name}:\n",
                    attributes,
                    "\n\n",
                    "----------------------\n\n\n",
                )
            )
        except Exception as e:
            user_msg = ErrorHandler.handle_warning(
                str(e),
                context="Generating event list preview",
                file_name=file_name
            )
            warning_handler.warn(user_msg, category=RuntimeWarning)

    # Per-Lightcurve details.
    for lc_name, lightcurve in (all_light_curves or []):
        try:
            attributes = [
                ("Times (first entries)", lightcurve.time[:time_limit]),
                ("Counts (first entries)", lightcurve.counts[:time_limit]),
                ("Count Errors (first entries)", getattr(lightcurve, "counts_err", "Not available")),
                ("Background Counts", getattr(lightcurve, "bg_counts", "Not available")),
                ("Background Ratio", getattr(lightcurve, "bg_ratio", "Not available")),
                ("Fractional Exposure", getattr(lightcurve, "frac_exp", "Not available")),
                ("Mean Rate", getattr(lightcurve, "meanrate", "Not available")),
                ("Mean Counts", getattr(lightcurve, "meancounts", "Not available")),
                ("Number of Points", getattr(lightcurve, "n", "Not available")),
                ("Time Resolution (dt)", lightcurve.dt),
                ("MJDREF", lightcurve.mjdref),
                ("Good Time Intervals (GTIs)", lightcurve.gti),
                ("Duration (tseg)", getattr(lightcurve, "tseg", "Not available")),
                ("Start Time (tstart)", getattr(lightcurve, "tstart", "Not available")),
                ("Error Distribution", getattr(lightcurve, "err_dist", "Not available")),
                ("Mission", getattr(lightcurve, "mission", "Not available")),
                ("Instrument", getattr(lightcurve, "instr", "Not available")),
            ]
            preview_data.append(
                _render(
                    "\n\n----------------------\n",
                    f"Light Curve - {lc_name}:\n",
                    attributes,
                    "\n",
                    "----------------------\n\n",
                )
            )
        except Exception as e:
            user_msg = ErrorHandler.handle_warning(
                str(e),
                context="Generating lightcurve preview",
                lc_name=lc_name
            )
            warning_handler.warn(user_msg, category=RuntimeWarning)

    # Display the assembled preview, or a placeholder when nothing is loaded.
    if preview_data:
        context.update_container('output_box',
            create_loadingdata_output_box("\n\n".join(preview_data))
        )
    else:
        context.update_container('output_box',
            create_loadingdata_output_box(
                "No valid files or light curves loaded for preview."
            )
        )
    if warning_handler.warnings:
        context.update_container('warning_box',
            create_loadingdata_warning_box("\n".join(warning_handler.warnings))
        )
    else:
        context.update_container('warning_box', create_loadingdata_warning_box("No warnings."))
    warning_handler.warnings.clear()
# TODO: ADD better comments, error handlling and docstrings
def clear_loaded_files(event, context: AppContext):
    """
    Clear all loaded event data files and light curves from memory.

    Args:
        event: The event object triggering the function.
        context (AppContext): The application context containing containers and state.

    Side effects:
        - Clears event data and light curves from the state manager.
        - Updates the output and warning containers with messages.

    Example:
        >>> clear_loaded_files(event, context)
        "Loaded event files have been cleared."
    """
    # Record what is present before clearing, so we can report accurately.
    had_events = len(context.state.get_event_data()) > 0
    had_light_curves = len(context.state.get_light_curve()) > 0

    if had_events:
        context.state.clear_event_data()
    if had_light_curves:
        context.state.clear_light_curves()

    # Build the status message from what was actually cleared.
    messages = []
    if had_events:
        messages.append("Loaded event files have been cleared.")
    if had_light_curves:
        messages.append("Loaded light curves have been cleared.")
    if not messages:
        messages.append("No files or light curves loaded to clear.")

    context.update_container('output_box', create_loadingdata_output_box("\n".join(messages)))
    context.update_container('warning_box', create_loadingdata_warning_box("No warnings."))
# TODO: ADD better comments, error handlling and docstrings
def create_loading_tab(context: AppContext, warning_handler):
"""
Create the tab for loading event data files.
Args:
context (AppContext): The application context containing containers and state.
warning_handler (WarningHandler): The handler for warnings.
Returns:
Column: A Panel Column containing the widgets and layout for the loading tab.
Example:
>>> tab = create_loading_tab(context, warning_handler)
>>> isinstance(tab, pn.Column)
True
"""
# Get the user's home directory
home_directory = os.path.expanduser("~")
file_selector = pn.widgets.FileSelector(
home_directory, only_files=True, name="Select File", show_hidden=True
)
filename_input = pn.widgets.TextInput(
name="Enter File Names",
placeholder="Enter file names, comma-separated",
width=400,
)
format_input = pn.widgets.TextInput(
name="Enter Formats",
placeholder="Enter formats (e.g., ogip, pickle, hdf5), comma-separated",
width=400,
)
format_checkbox = pn.widgets.Checkbox(
name='Use default format ("ogip" for reading, "hdf5" for writing/saving)',
value=False,
)
load_button = pn.widgets.Button(name="Read as EventLists", button_type="primary")
save_button = pn.widgets.Button(
name="Save loaded EventLists", button_type="success"
)
delete_button = pn.widgets.Button(
name="Delete Selected Files", button_type="danger"
)
preview_button = pn.widgets.Button(
name="Preview loaded EventLists", button_type="default"
)
clear_button = pn.widgets.Button(
name="Clear Loaded EventLists", button_type="warning"
)
tooltip_format = pn.widgets.TooltipIcon(
value=Tooltip(
content="""For HEASoft-supported missions, use 'ogip'. Using 'fits' directly might cause issues with Astropy tables.""",
position="bottom",
)
)
tooltip_file = pn.widgets.TooltipIcon(
value=Tooltip(
content="""Ensure the file contains at least a 'time' column.""",
position="bottom",
)
)
tooltip_rmf = pn.widgets.TooltipIcon(
value=Tooltip(
content="""Calibrates PI(Pulse invariant) values to physical energy.""",
position="bottom",
)
)
tooltip_additional_columns = pn.widgets.TooltipIcon(
value=Tooltip(
content="""Any further keyword arguments to be passed for reading in event lists in OGIP/HEASOFT format""",
position="bottom",
)
)
# FileDropper for RMF file
rmf_file_dropper = pn.widgets.FileDropper(
# accepted_filetypes=['.rmf', '.fits'], # Accept RMF files or compatible FITS files
multiple=False, # Only allow a single file
name="Upload RMF(Response Matrix File) File (optional)",
max_file_size="1000MB", # Limit file size
layout="compact", # Layout style
)
additional_columns_input = pn.widgets.TextInput(
name="Additional Columns (optional)", placeholder="Comma-separated column names"
)
# Lazy loading controls
use_lazy_loading = pn.widgets.Checkbox(
name="Use lazy loading (recommended for files >1GB)",
value=False,
)
tooltip_lazy = pn.widgets.TooltipIcon(
value=Tooltip(
content="""Lazy loading reads large files in chunks without loading everything into memory.
Recommended for files >1GB. Prevents memory crashes but some operations may be slower.""",
position="bottom",
)
)
# Preview mode controls (for extremely large files)
use_preview_mode = pn.widgets.Checkbox(
name="Preview mode (load only first segment)",
value=False,
)
preview_duration_input = pn.widgets.FloatInput(
name="Preview duration (seconds)",
value=100.0,
start=10.0,
end=1000.0,
step=10.0,
)
tooltip_preview = pn.widgets.TooltipIcon(
value=Tooltip(
content="""Preview mode loads only the first segment of data for extremely large files.
Useful when file is too large to fit in memory even with lazy loading.
You can analyze the preview and decide on next steps.""",
position="bottom",
),
)
# File size info pane (updated dynamically)
file_size_info = pn.pane.Markdown("", sizing_mode="stretch_width")
def update_file_size_info(event=None):
"""Update file size info when file selection changes."""
if not file_selector.value:
file_size_info.object = ""
use_lazy_loading.value = False
return
try:
file_path = file_selector.value[0] if isinstance(file_selector.value, list) else file_selector.value
# Check file size using data service
result = context.services.data.check_file_size(file_path)
if result["success"]:
data = result["data"]
risk_level = data["risk_level"]
file_size_mb = data["file_size_mb"]
file_size_gb = data["file_size_gb"]
estimated_mem_mb = data["estimated_memory_mb"]
memory_info = data["memory_info"]
recommend_lazy = data["recommend_lazy"]
# Color code based on risk
color_map = {
'safe': 'green',
'caution': 'orange',
'risky': 'darkorange',
'critical': 'red'
}
color = color_map.get(risk_level, 'black')
# Auto-enable lazy loading for large/risky files
if recommend_lazy and not use_lazy_loading.value:
use_lazy_loading.value = True
# Create info message
recommendation_text = "Use lazy loading" if recommend_lazy else "Standard loading OK"
# Add preview mode suggestion for critical/extremely large files
show_preview_warning = (risk_level == 'critical') or (file_size_gb > 5.0)
info_md = f"""
**File Size Info:**
- **File Size**: {file_size_gb:.2f} GB ({file_size_mb:.1f} MB)
- **Estimated Memory**: ~{estimated_mem_mb:.1f} MB
- **Risk Level**: {risk_level.upper()}
- **Available RAM**: {memory_info['available_mb']:.0f} MB ({100-memory_info['percent']:.1f}% free)
- **Recommendation**: {recommendation_text}
"""
if show_preview_warning:
info_md += "\n- **CRITICAL**: File may be too large for full load. Consider using Preview Mode!"
file_size_info.object = info_md
else:
file_size_info.object = f"**Error checking file size:** {result['message']}"
except Exception as e:
file_size_info.object = f"**Error:** {str(e)}"
# Update file size info when file selection changes
file_selector.param.watch(update_file_size_info, 'value')
def on_load_click(event):
# Clear previous outputs and warnings
context.update_container('output_box', create_loadingdata_output_box("N.A."))
context.update_container('warning_box', create_loadingdata_warning_box("N.A."))
warning_handler.warnings.clear()
warnings.resetwarnings()
read_event_data(
event,
file_selector,
filename_input,
format_input,
format_checkbox,
rmf_file_dropper,
additional_columns_input,
use_lazy_loading,
use_preview_mode,
preview_duration_input,
context,
warning_handler,
)
def on_save_click(event):
# Clear previous outputs and warnings
context.update_container('output_box', create_loadingdata_output_box("N.A."))
context.update_container('warning_box', create_loadingdata_warning_box("N.A."))
warning_handler.warnings.clear()
warnings.resetwarnings()
save_loaded_files(
event,
filename_input,
format_input,
format_checkbox,
context,
warning_handler,
)
def on_delete_click(event):
# Clear previous outputs and warnings
context.update_container('warning_box', create_loadingdata_warning_box("N.A."))
context.update_container('output_box', create_loadingdata_output_box("N.A."))
warning_handler.warnings.clear()
warnings.resetwarnings()
delete_selected_files(
event,
file_selector,
context,
warning_handler,
)
def on_preview_click(event):
# Clear previous outputs and warnings
context.update_container('output_box', create_loadingdata_output_box("N.A."))
context.update_container('warning_box', create_loadingdata_warning_box("N.A."))
warning_handler.warnings.clear()
warnings.resetwarnings()
preview_loaded_files(
event, context, warning_handler
)
def on_clear_click(event):
# Clear the loaded files list
context.update_container('output_box', create_loadingdata_output_box("N.A."))
context.update_container('warning_box', create_loadingdata_warning_box("N.A."))
warning_handler.warnings.clear()
warnings.resetwarnings()
clear_loaded_files(event, context)
load_button.on_click(on_load_click)
save_button.on_click(on_save_click)
delete_button.on_click(on_delete_click)
preview_button.on_click(on_preview_click)
clear_button.on_click(on_clear_click)
# Left column: Basic file selection and configuration
left_column = pn.Column(
pn.Row(
pn.pane.Markdown("