# StingrayExplorer / modules / DataLoading / DataIngestion.py
# (Scraped page header preserved as a comment so the module stays valid Python:
#  author kartikmandar, commit 869b08d — "feat: add lazy loading for large FITS files")
# Standard Imports
import os
import stat
import copy
import logging
import numpy as np
import warnings
import tempfile
import traceback
import requests
from bokeh.models import Tooltip
# HoloViz Imports
import panel as pn
# Stingray Imports
from stingray.events import EventList
from stingray import Lightcurve
# Dashboard Classes and State Management Imports
from utils.state_manager import state_manager
from utils.app_context import AppContext
from utils.error_handler import ErrorHandler
from utils.error_recovery import ErrorRecoveryPanel, show_file_error, show_validation_error, show_success
from utils.DashboardClasses import (
MainHeader,
MainArea,
OutputBox,
WarningBox,
HelpBox,
WarningHandler,
PlotsContainer,
)
# Strings Imports
# Path to the topmost directory for loaded data
loaded_data_path = os.path.join(os.getcwd(), "files", "loaded-data")
# Create the loaded-data directory if it doesn't exist
os.makedirs(loaded_data_path, exist_ok=True)
def create_warning_handler():
    """
    Build a WarningHandler and route all Python warnings through it.

    Returns:
        WarningHandler: The handler now receiving every emitted warning.

    Side effects:
        Replaces ``warnings.showwarning`` with the handler's ``warn`` method.

    Example:
        >>> handler = create_warning_handler()
        >>> handler.warn("Test warning", category=RuntimeWarning)
    """
    handler = WarningHandler()
    warnings.showwarning = handler.warn
    return handler
""" Header Section """
def create_loadingdata_header(context: AppContext):
    """
    Build the header shown at the top of the data-loading section.

    Args:
        context (AppContext): The application context containing containers and state.

    Returns:
        MainHeader: Header whose heading widget reads 'Data Ingestion'.

    Example:
        >>> header = create_loadingdata_header(context)
        >>> header.heading.value
        'Data Ingestion'
    """
    heading_widget = pn.widgets.TextInput(name="Heading", value="Data Ingestion")
    return MainHeader(heading=heading_widget)
""" Output Box Section """
def create_loadingdata_output_box(content):
    """
    Wrap a message in an OutputBox for the output container.

    Args:
        content (str): Message text to display.

    Returns:
        OutputBox: Box whose ``output_content`` equals *content*.

    Example:
        >>> box = create_loadingdata_output_box("File loaded successfully.")
        >>> box.output_content
        'File loaded successfully.'
    """
    return OutputBox(output_content=content)
""" Warning Box Section """
def create_loadingdata_warning_box(content):
    """
    Wrap a message in a WarningBox for the warning container.

    Args:
        content (str): Warning text to display.

    Returns:
        WarningBox: Box whose ``warning_content`` equals *content*.

    Example:
        >>> box = create_loadingdata_warning_box("Invalid file format.")
        >>> box.warning_content
        'Invalid file format.'
    """
    return WarningBox(warning_content=content)
def read_event_data(
    event,
    file_selector,
    filename_input,
    format_input,
    format_checkbox,
    rmf_file_dropper,
    additional_columns_input,
    use_lazy_loading,
    use_preview_mode,
    preview_duration_input,
    context: AppContext,
    warning_handler,
):
    """
    Load event data from selected files with extended EventList.read functionality,
    supporting FileDropper for RMF files and additional columns.

    Args:
        event: The event object triggering the function.
        file_selector: The file selector widget.
        filename_input: Text input for filenames (comma-separated; defaults to
            the files' base names when empty).
        format_input: Text input for file formats (comma-separated).
        format_checkbox: Checkbox for default format ("ogip").
        rmf_file_dropper: File dropper for RMF files.
        additional_columns_input: Text input for additional columns.
        use_lazy_loading: Checkbox enabling chunked (lazy) loading.
        use_preview_mode: Checkbox enabling preview (first-segment) loading.
        preview_duration_input: Float input for the preview duration in seconds.
        context (AppContext): The application context containing containers and state.
        warning_handler: The handler for warnings.

    Side effects:
        Registers loaded event lists via ``context.services.data`` and updates
        the output/warning containers with success or error panels.
    """
    # Validation for required inputs
    if not file_selector.value:
        context.update_container('output_box',
            create_loadingdata_output_box(
                "No file selected. Please select a file to upload."
            )
        )
        return

    # One retry callback shared by every error panel below.  The previous
    # implementation defined per-step callbacks that invoked a non-existent
    # ``load_event_lists_from_file`` (or this function with a stale argument
    # list), so clicking "Retry" raised NameError/TypeError instead of retrying.
    def retry_load():
        read_event_data(
            event, file_selector, filename_input, format_input,
            format_checkbox, rmf_file_dropper, additional_columns_input,
            use_lazy_loading, use_preview_mode, preview_duration_input,
            context, warning_handler,
        )

    try:
        # Parse file paths and (optional) user-supplied names.
        file_paths = file_selector.value
        filenames = (
            [name.strip() for name in filename_input.value.split(",")]
            if filename_input.value
            else []
        )
    except Exception as e:
        user_msg, tech_msg = ErrorHandler.handle_error(
            e,
            context="Parsing file paths and names",
            file_count=len(file_selector.value) if file_selector.value else 0
        )
        error_panel = ErrorRecoveryPanel.create_error_panel(
            error_message=user_msg,
            error_type="error",
            retry_callback=retry_load,
            help_text="Check that file paths and filenames are correctly formatted (comma-separated if multiple)",
            technical_details=tech_msg
        )
        context.update_container('warning_box', error_panel)
        return

    # Default the names to the files' base names.  Previously an empty name
    # list made the zip() below empty, so nothing was loaded yet a
    # "Successfully loaded 0 file(s)" panel was still shown.
    if not filenames:
        filenames = [
            os.path.splitext(os.path.basename(path))[0] for path in file_paths
        ]
    if len(filenames) != len(file_paths):
        context.update_container('output_box',
            create_loadingdata_output_box(
                "Please ensure that the number of names matches the number of selected files."
            )
        )
        return

    try:
        # Parse file formats
        formats = (
            [fmt.strip() for fmt in format_input.value.split(",")]
            if format_input.value
            else []
        )
        # Use the default format when requested, or when no formats were
        # given at all (an empty list would otherwise silently truncate the
        # zip() below to zero files).
        if format_checkbox.value or not formats:
            formats = ["ogip" for _ in range(len(file_paths))]
    except Exception as e:
        user_msg, tech_msg = ErrorHandler.handle_error(
            e,
            context="Parsing file formats",
            format_input=format_input.value if format_input.value else "None"
        )
        error_panel = ErrorRecoveryPanel.create_error_panel(
            error_message=user_msg,
            error_type="error",
            retry_callback=retry_load,
            help_text="Supported formats: ogip, hea, fits (comma-separated if multiple files)",
            technical_details=tech_msg
        )
        context.update_container('warning_box', error_panel)
        return
    if len(formats) != len(file_paths):
        context.update_container('output_box',
            create_loadingdata_output_box(
                "Please ensure that the number of formats matches the number of selected files."
            )
        )
        return

    # Temp copy of the uploaded RMF file, if any.  Initialized to None so the
    # service calls below can pass it unconditionally.
    tmp_file_path = None
    try:
        # Retrieve the RMF file from FileDropper (if any)
        if rmf_file_dropper.value:
            rmf_file = list(rmf_file_dropper.value.values())[0]
            # Save the file data to a temporary file
            with tempfile.NamedTemporaryFile(delete=False, suffix=".rmf") as tmp_file:
                tmp_file.write(rmf_file)
                tmp_file_path = tmp_file.name
    except Exception as e:
        user_msg, tech_msg = ErrorHandler.handle_error(
            e,
            context="Processing RMF file",
            has_rmf=bool(rmf_file_dropper.value)
        )

        # Clear callback to reset the RMF file dropper.
        def clear_rmf():
            rmf_file_dropper.value = None
            context.update_container('warning_box',
                pn.pane.Markdown("*RMF file cleared. Ready to try again.*")
            )

        error_panel = ErrorRecoveryPanel.create_error_panel(
            error_message=user_msg,
            error_type="error",
            clear_callback=clear_rmf,
            help_text="Make sure the RMF file is valid and in the correct format (.rmf extension)",
            technical_details=tech_msg
        )
        context.update_container('warning_box', error_panel)
        return

    try:
        # Parse additional columns
        additional_columns = (
            [col.strip() for col in additional_columns_input.value.split(",")]
            if additional_columns_input.value
            else None
        )
    except Exception as e:
        user_msg, tech_msg = ErrorHandler.handle_error(
            e,
            context="Parsing additional columns",
            columns_input=additional_columns_input.value if additional_columns_input.value else "None"
        )

        # Clear callback to reset the additional-columns input.
        def clear_columns():
            additional_columns_input.value = ""
            context.update_container('warning_box',
                pn.pane.Markdown("*Additional columns cleared. Ready to try again.*")
            )

        error_panel = ErrorRecoveryPanel.create_error_panel(
            error_message=user_msg,
            error_type="error",
            retry_callback=retry_load,
            clear_callback=clear_columns,
            help_text="Provide column names as comma-separated values (e.g., 'PI, ENERGY')",
            technical_details=tech_msg
        )
        context.update_container('warning_box', error_panel)
        return

    # Use data service to load files
    loaded_files = []
    for file_path, file_name, file_format in zip(file_paths, filenames, formats):
        # Choose loading method based on mode selection
        if use_preview_mode.value:
            # Preview mode: load only the first segment of extremely large files.
            result = context.services.data.load_event_list_preview(
                file_path=file_path,
                name=file_name,
                preview_duration=preview_duration_input.value,
                rmf_file=tmp_file_path,
                additional_columns=additional_columns
            )
        elif use_lazy_loading.value:
            # Lazy loading: chunked reads (supports RMF and additional columns).
            result = context.services.data.load_event_list_lazy(
                file_path=file_path,
                name=file_name,
                safety_margin=0.5,
                rmf_file=tmp_file_path,
                additional_columns=additional_columns
            )
        else:
            # Standard in-memory loading.
            result = context.services.data.load_event_list(
                file_path=file_path,
                name=file_name,
                fmt=file_format,
                rmf_file=tmp_file_path,
                additional_columns=additional_columns
            )
        if result["success"]:
            # Surface the loading method reported by the service, if any.
            method_info = result.get("metadata", {}).get("method", "standard")
            message = result["message"]
            if method_info == "standard_risky":
                message += " ⚠️ (Loaded despite memory risk)"
            loaded_files.append(message)
        else:
            # Loading failed: offer a retry with the same inputs.
            error_panel = ErrorRecoveryPanel.create_error_panel(
                error_message=result['message'],
                error_type="error",
                retry_callback=retry_load,
                help_text="Check the file format and try again, or select different files",
                technical_details=result.get('error', 'No technical details available')
            )
            context.update_container('output_box', error_panel)
            return

    # Show success panel
    success_message = f"Successfully loaded {len(loaded_files)} file(s)"
    details = "<br>".join([f"• {msg}" for msg in loaded_files])
    success_panel = ErrorRecoveryPanel.create_success_panel(
        success_message=success_message,
        details=details
    )
    context.update_container('output_box', success_panel)
    # Show warnings if any
    if warning_handler.warnings:
        context.update_container('warning_box',
            create_loadingdata_warning_box("\n".join(warning_handler.warnings))
        )
    else:
        context.update_container('warning_box', create_loadingdata_warning_box("No warnings."))
    # Clear the warnings after displaying them
    warning_handler.warnings.clear()
def save_loaded_files(
    event,
    filename_input,
    format_input,
    format_checkbox,
    context: AppContext,
    warning_handler,
):
    """
    Save loaded event data to specified file formats.

    Args:
        event: The event object triggering the function.
        filename_input (TextInput): The input widget for filenames.
        format_input (TextInput): The input widget for formats.
        format_checkbox (Checkbox): The checkbox for default format ("hdf5").
        context (AppContext): The application context containing containers and state.
        warning_handler (WarningHandler): The handler for warnings.

    Side effects:
        - Saves files to disk in the specified formats (refuses to overwrite).
        - Updates the output and warning containers with messages.

    Restrictions:
        - The number of filenames and formats must match the number of loaded
          files unless defaults are used.

    Example:
        >>> save_loaded_files(event, filename_input, format_input, format_checkbox, context, warning_handler)
        >>> os.path.exists('/path/to/saved/file.hdf5')
        True  # Assuming the file was saved successfully
    """
    # Get all event data from state manager
    all_event_data = context.state.get_event_data()
    if not all_event_data:
        context.update_container('output_box',
            create_loadingdata_output_box("No files loaded to save.")
        )
        return
    # Default to the names the event lists were loaded under.  (Loop variable
    # renamed so it no longer shadows the ``event`` parameter.)
    filenames = (
        [name.strip() for name in filename_input.value.split(",")]
        if filename_input.value
        else [entry[0] for entry in all_event_data]
    )
    # TODO: ADD checks for valid formats
    formats = (
        [fmt.strip() for fmt in format_input.value.split(",")]
        if format_input.value
        else []
    )
    if format_checkbox.value:
        formats = ["hdf5" for _ in range(len(all_event_data))]
    # A single equality check replaces the previous overlapping "<" and "!="
    # checks (the "<" branch made the "!=" branch unreachable for short lists).
    if len(filenames) != len(all_event_data):
        context.update_container('output_box',
            create_loadingdata_output_box(
                "Please ensure that the number of names matches the number of loaded files."
            )
        )
        return
    if len(formats) < len(all_event_data):
        context.update_container('output_box',
            create_loadingdata_output_box(
                "Please specify formats for all loaded files or check the default format option."
            )
        )
        return
    saved_files = []
    try:
        for (loaded_name, event_list), file_name, file_format in zip(
            all_event_data, filenames, formats
        ):
            # Build the target path once; refuse to overwrite existing files.
            save_path = os.path.join(loaded_data_path, f"{file_name}.{file_format}")
            if os.path.exists(save_path):
                context.update_container('output_box',
                    create_loadingdata_output_box(
                        f"A file with the name '{file_name}' already exists. Please provide a different name."
                    )
                )
                return
            # Use export service to save the event list.
            # NOTE(review): the export is keyed by the *save* name, not by
            # ``loaded_name`` — confirm the service looks up event lists by
            # this name.
            result = context.services.export.export_event_list(
                name=file_name,
                file_path=save_path,
                fmt=file_format
            )
            if result["success"]:
                saved_files.append(result["message"])
            else:
                saved_files.append(f"Error saving '{file_name}': {result['message']}")
        context.update_container('output_box',
            create_loadingdata_output_box("\n".join(saved_files))
        )
        if warning_handler.warnings:
            context.update_container('warning_box',
                create_loadingdata_warning_box("\n".join(warning_handler.warnings))
            )
        else:
            context.update_container('warning_box', create_loadingdata_warning_box("No warnings."))
    except Exception as e:
        user_msg, tech_msg = ErrorHandler.handle_error(
            e,
            context="Saving loaded files",
            save_directory=loaded_data_path
        )
        context.update_container('warning_box',
            create_loadingdata_warning_box(f"Error: {user_msg}")
        )
    # Clear the warnings after displaying them
    warning_handler.warnings.clear()
# TODO: ADD better comments, error handlling and docstrings
def delete_selected_files(
    event,
    file_selector,
    context: AppContext,
    warning_handler,
):
    """
    Remove the files currently selected in the file selector from disk.

    Only known data-file extensions are eligible; anything else (e.g. ``.py``
    sources) is refused for safety.

    Args:
        event: The event object triggering the function.
        file_selector (FileSelector): The file selector widget.
        context (AppContext): The application context containing containers and state.
        warning_handler (WarningHandler): The handler for warnings.

    Side effects:
        - Deletes files from the file system.
        - Updates the output and warning containers with per-file messages.

    Example:
        >>> delete_selected_files(event, file_selector, context, warning_handler)
        >>> os.path.exists('/path/to/deleted/file')
        False  # Assuming the file was deleted successfully
    """
    # Extensions that may be deleted through the UI.
    allowed_extensions = {
        ".pkl",
        ".pickle",
        ".fits",
        ".evt",
        ".h5",
        ".hdf5",
        ".ecsv",
        ".txt",
        ".dat",
        ".csv",
        ".vot",
        ".tex",
        ".html",
        ".gz",
    }
    if not file_selector.value:
        context.update_container('output_box',
            create_loadingdata_output_box(
                "No file selected. Please select a file to delete."
            )
        )
        return
    deletable_suffixes = tuple(allowed_extensions)
    messages = []
    for path in file_selector.value:
        if not path.endswith(deletable_suffixes):
            messages.append(
                f"Cannot delete file '{path}': File type is not allowed for deletion."
            )
            continue
        try:
            # Make sure the file is writable before unlinking it.
            os.chmod(path, stat.S_IWUSR | stat.S_IREAD | stat.S_IWRITE)
            os.remove(path)
            messages.append(f"File '{path}' deleted successfully.")
        except Exception as e:
            user_msg, tech_msg = ErrorHandler.handle_error(
                e,
                context="Deleting file",
                file_path=path,
                log_level=logging.WARNING
            )
            messages.append(f"Error deleting '{path}': {user_msg}")
    context.update_container('output_box', create_loadingdata_output_box("\n".join(messages)))
    if warning_handler.warnings:
        context.update_container('warning_box',
            create_loadingdata_warning_box("\n".join(warning_handler.warnings))
        )
    else:
        context.update_container('warning_box', create_loadingdata_warning_box("No warnings."))
    warning_handler.warnings.clear()
# TODO: ADD better comments, error handlling and docstrings
def preview_loaded_files(
    event,
    context: AppContext,
    warning_handler,
    time_limit=10,
):
    """
    Preview the loaded event data files and light curves.

    Builds a plain-text summary of every EventList and Lightcurve held by the
    state manager and displays it in the output container.

    Args:
        event: The event object triggering the function.
        context (AppContext): The application context containing containers and state.
        warning_handler (WarningHandler): The handler for warnings.
        time_limit (int): The number of array entries to show per attribute.

    Side Effects:
        Updates the output and warning containers with preview information and
        clears ``warning_handler.warnings`` afterwards.

    Example:
        >>> preview_loaded_files(event, context, warning_handler)
        "Event List - my_event_list:\nTimes (first 10): [0.1, 0.2, ...]\nMJDREF: 58000"
    """
    preview_data = []
    # Get all data from state manager
    all_event_data = context.state.get_event_data()
    all_light_curves = context.state.get_light_curve()
    # Add a summary of loaded files and their names
    if all_event_data:
        preview_data.append(
            f"Loaded Event Files: {len(all_event_data)}\n"
            f"Event File Names: {[file_name for file_name, _ in all_event_data]}\n"
        )
    else:
        preview_data.append("No Event Files Loaded.\n")
    if all_light_curves:
        preview_data.append(
            f"Loaded Light Curves: {len(all_light_curves)}\n"
            f"Light Curve Names: {[lc_name for lc_name, _ in all_light_curves]}\n"
        )
    else:
        preview_data.append("No Light Curves Loaded.\n")
    # Preview EventList data
    if all_event_data:
        for file_name, event_list in all_event_data:
            try:
                # Gather available attributes dynamically; getattr() defaults
                # keep the preview working when a mission's file lacks an
                # optional attribute.
                attributes = [
                    ("Times (first entries)", event_list.time[:time_limit]),
                    ("Energy (keV)", getattr(event_list, "energy", "Not available")),
                    ("PI Channels", getattr(event_list, "pi", "Not available")),
                    ("MJDREF", event_list.mjdref),
                    ("Good Time Intervals (GTIs)", event_list.gti),
                    ("Mission", getattr(event_list, "mission", "Not available")),
                    ("Instrument", getattr(event_list, "instr", "Not available")),
                    (
                        "Detector IDs",
                        getattr(event_list, "detector_id", "Not available"),
                    ),
                    ("Ephemeris", getattr(event_list, "ephem", "Not available")),
                    ("Time Reference", getattr(event_list, "timeref", "Not available")),
                    ("Time System", getattr(event_list, "timesys", "Not available")),
                    ("Header", getattr(event_list, "header", "Not available")),
                ]
                # Format preview data
                event_preview = "\n\n\n----------------------\n"
                event_preview += f"Event List - {file_name}:\n"
                for attr_name, attr_value in attributes:
                    if isinstance(
                        attr_value, np.ndarray
                    ):  # Show limited entries for arrays
                        attr_value = attr_value[:time_limit]
                    event_preview += f"{attr_name}: {attr_value}\n\n"
                event_preview += "----------------------\n\n\n"
                preview_data.append(event_preview)
            except Exception as e:
                # A broken attribute on one event list must not abort the
                # whole preview; record it as a warning instead.
                user_msg = ErrorHandler.handle_warning(
                    str(e),
                    context="Generating event list preview",
                    file_name=file_name
                )
                warning_handler.warn(user_msg, category=RuntimeWarning)
    # Preview Lightcurve data
    if all_light_curves:
        for lc_name, lightcurve in all_light_curves:
            try:
                attributes = [
                    ("Times (first entries)", lightcurve.time[:time_limit]),
                    ("Counts (first entries)", lightcurve.counts[:time_limit]),
                    (
                        "Count Errors (first entries)",
                        getattr(lightcurve, "counts_err", "Not available"),
                    ),
                    (
                        "Background Counts",
                        getattr(lightcurve, "bg_counts", "Not available"),
                    ),
                    (
                        "Background Ratio",
                        getattr(lightcurve, "bg_ratio", "Not available"),
                    ),
                    (
                        "Fractional Exposure",
                        getattr(lightcurve, "frac_exp", "Not available"),
                    ),
                    ("Mean Rate", getattr(lightcurve, "meanrate", "Not available")),
                    ("Mean Counts", getattr(lightcurve, "meancounts", "Not available")),
                    ("Number of Points", getattr(lightcurve, "n", "Not available")),
                    ("Time Resolution (dt)", lightcurve.dt),
                    ("MJDREF", lightcurve.mjdref),
                    ("Good Time Intervals (GTIs)", lightcurve.gti),
                    ("Duration (tseg)", getattr(lightcurve, "tseg", "Not available")),
                    (
                        "Start Time (tstart)",
                        getattr(lightcurve, "tstart", "Not available"),
                    ),
                    (
                        "Error Distribution",
                        getattr(lightcurve, "err_dist", "Not available"),
                    ),
                    ("Mission", getattr(lightcurve, "mission", "Not available")),
                    ("Instrument", getattr(lightcurve, "instr", "Not available")),
                ]
                lightcurve_preview = "\n\n----------------------\n"
                lightcurve_preview += f"Light Curve - {lc_name}:\n"
                for attr_name, attr_value in attributes:
                    # Truncate numpy arrays to the first ``time_limit`` entries.
                    if isinstance(attr_value, np.ndarray):
                        attr_value = attr_value[:time_limit]
                    lightcurve_preview += f"{attr_name}: {attr_value}\n"
                lightcurve_preview += "----------------------\n\n"
                preview_data.append(lightcurve_preview)
            except Exception as e:
                user_msg = ErrorHandler.handle_warning(
                    str(e),
                    context="Generating lightcurve preview",
                    lc_name=lc_name
                )
                warning_handler.warn(user_msg, category=RuntimeWarning)
    # Display preview data or message if no data available
    if preview_data:
        context.update_container('output_box',
            create_loadingdata_output_box("\n\n".join(preview_data))
        )
    else:
        context.update_container('output_box',
            create_loadingdata_output_box(
                "No valid files or light curves loaded for preview."
            )
        )
    if warning_handler.warnings:
        context.update_container('warning_box',
            create_loadingdata_warning_box("\n".join(warning_handler.warnings))
        )
    else:
        context.update_container('warning_box', create_loadingdata_warning_box("No warnings."))
    warning_handler.warnings.clear()
# TODO: ADD better comments, error handlling and docstrings
def clear_loaded_files(event, context: AppContext):
    """
    Clear all loaded event data files and light curves from memory.

    Args:
        event: The event object triggering the function.
        context (AppContext): The application context containing containers and state.

    Side effects:
        - Clears event data and light curves from the state manager.
        - Updates the output container with a summary of what was cleared.

    Example:
        >>> clear_loaded_files(event, context)
        "Loaded event files have been cleared."
    """
    notices = []
    # Clear EventList data when any is present.
    if len(context.state.get_event_data()) > 0:
        context.state.clear_event_data()
        notices.append("Loaded event files have been cleared.")
    # Clear Lightcurve data when any is present.
    if len(context.state.get_light_curve()) > 0:
        context.state.clear_light_curves()
        notices.append("Loaded light curves have been cleared.")
    # Nothing was loaded in the first place.
    if not notices:
        notices.append("No files or light curves loaded to clear.")
    # Publish the summary and reset the warning box.
    context.update_container('output_box', create_loadingdata_output_box("\n".join(notices)))
    context.update_container('warning_box', create_loadingdata_warning_box("No warnings."))
# TODO: ADD better comments, error handlling and docstrings
def create_loading_tab(context: AppContext, warning_handler):
    """
    Create the tab for loading event data files.

    Builds the full two-column layout: file selection, name/format inputs,
    RMF upload, lazy-loading/preview options, a live file-size info pane,
    and the action buttons wired to the load/save/delete/preview/clear
    handlers in this module.

    Args:
        context (AppContext): The application context containing containers and state.
        warning_handler (WarningHandler): The handler for warnings.

    Returns:
        Column: A Panel Row containing the two-column layout for the loading tab.

    Example:
        >>> tab = create_loading_tab(context, warning_handler)
        >>> isinstance(tab, pn.Column)
        True
    """
    # Get the user's home directory — the file selector starts browsing there.
    home_directory = os.path.expanduser("~")
    file_selector = pn.widgets.FileSelector(
        home_directory, only_files=True, name="Select File", show_hidden=True
    )
    filename_input = pn.widgets.TextInput(
        name="Enter File Names",
        placeholder="Enter file names, comma-separated",
        width=400,
    )
    format_input = pn.widgets.TextInput(
        name="Enter Formats",
        placeholder="Enter formats (e.g., ogip, pickle, hdf5), comma-separated",
        width=400,
    )
    format_checkbox = pn.widgets.Checkbox(
        name='Use default format ("ogip" for reading, "hdf5" for writing/saving)',
        value=False,
    )
    # Action buttons (wired to handlers further below).
    load_button = pn.widgets.Button(name="Read as EventLists", button_type="primary")
    save_button = pn.widgets.Button(
        name="Save loaded EventLists", button_type="success"
    )
    delete_button = pn.widgets.Button(
        name="Delete Selected Files", button_type="danger"
    )
    preview_button = pn.widgets.Button(
        name="Preview loaded EventLists", button_type="default"
    )
    clear_button = pn.widgets.Button(
        name="Clear Loaded EventLists", button_type="warning"
    )
    # Inline help tooltips for the individual inputs.
    tooltip_format = pn.widgets.TooltipIcon(
        value=Tooltip(
            content="""For HEASoft-supported missions, use 'ogip'. Using 'fits' directly might cause issues with Astropy tables.""",
            position="bottom",
        )
    )
    tooltip_file = pn.widgets.TooltipIcon(
        value=Tooltip(
            content="""Ensure the file contains at least a 'time' column.""",
            position="bottom",
        )
    )
    tooltip_rmf = pn.widgets.TooltipIcon(
        value=Tooltip(
            content="""Calibrates PI(Pulse invariant) values to physical energy.""",
            position="bottom",
        )
    )
    tooltip_additional_columns = pn.widgets.TooltipIcon(
        value=Tooltip(
            content="""Any further keyword arguments to be passed for reading in event lists in OGIP/HEASOFT format""",
            position="bottom",
        )
    )
    # FileDropper for RMF file
    rmf_file_dropper = pn.widgets.FileDropper(
        # accepted_filetypes=['.rmf', '.fits'], # Accept RMF files or compatible FITS files
        multiple=False,  # Only allow a single file
        name="Upload RMF(Response Matrix File) File (optional)",
        max_file_size="1000MB",  # Limit file size
        layout="compact",  # Layout style
    )
    additional_columns_input = pn.widgets.TextInput(
        name="Additional Columns (optional)", placeholder="Comma-separated column names"
    )
    # Lazy loading controls
    use_lazy_loading = pn.widgets.Checkbox(
        name="Use lazy loading (recommended for files >1GB)",
        value=False,
    )
    tooltip_lazy = pn.widgets.TooltipIcon(
        value=Tooltip(
            content="""Lazy loading reads large files in chunks without loading everything into memory.
            Recommended for files >1GB. Prevents memory crashes but some operations may be slower.""",
            position="bottom",
        )
    )
    # Preview mode controls (for extremely large files)
    use_preview_mode = pn.widgets.Checkbox(
        name="Preview mode (load only first segment)",
        value=False,
    )
    preview_duration_input = pn.widgets.FloatInput(
        name="Preview duration (seconds)",
        value=100.0,
        start=10.0,
        end=1000.0,
        step=10.0,
    )
    tooltip_preview = pn.widgets.TooltipIcon(
        value=Tooltip(
            content="""Preview mode loads only the first segment of data for extremely large files.
            Useful when file is too large to fit in memory even with lazy loading.
            You can analyze the preview and decide on next steps.""",
            position="bottom",
        ),
    )
    # File size info pane (updated dynamically)
    file_size_info = pn.pane.Markdown("", sizing_mode="stretch_width")

    def update_file_size_info(event=None):
        """Update file size info when file selection changes."""
        # No selection: blank the pane and switch lazy loading back off.
        if not file_selector.value:
            file_size_info.object = ""
            use_lazy_loading.value = False
            return
        try:
            # Only the first selected file is sized here — TODO confirm
            # multi-select sizing is intentionally limited to the first entry.
            file_path = file_selector.value[0] if isinstance(file_selector.value, list) else file_selector.value
            # Check file size using data service
            result = context.services.data.check_file_size(file_path)
            if result["success"]:
                data = result["data"]
                risk_level = data["risk_level"]
                file_size_mb = data["file_size_mb"]
                file_size_gb = data["file_size_gb"]
                estimated_mem_mb = data["estimated_memory_mb"]
                memory_info = data["memory_info"]
                recommend_lazy = data["recommend_lazy"]
                # Color code based on risk
                color_map = {
                    'safe': 'green',
                    'caution': 'orange',
                    'risky': 'darkorange',
                    'critical': 'red'
                }
                color = color_map.get(risk_level, 'black')
                # Auto-enable lazy loading for large/risky files
                if recommend_lazy and not use_lazy_loading.value:
                    use_lazy_loading.value = True
                # Create info message
                recommendation_text = "Use lazy loading" if recommend_lazy else "Standard loading OK"
                # Add preview mode suggestion for critical/extremely large files
                show_preview_warning = (risk_level == 'critical') or (file_size_gb > 5.0)
                info_md = f"""
                **File Size Info:**
                - **File Size**: {file_size_gb:.2f} GB ({file_size_mb:.1f} MB)
                - **Estimated Memory**: ~{estimated_mem_mb:.1f} MB
                - **Risk Level**: <span style="color:{color}; font-weight:bold">{risk_level.upper()}</span>
                - **Available RAM**: {memory_info['available_mb']:.0f} MB ({100-memory_info['percent']:.1f}% free)
                - **Recommendation**: {recommendation_text}
                """
                if show_preview_warning:
                    info_md += "\n- **CRITICAL**: File may be too large for full load. Consider using Preview Mode!"
                file_size_info.object = info_md
            else:
                file_size_info.object = f"**Error checking file size:** {result['message']}"
        except Exception as e:
            # Surface any sizing failure directly in the pane rather than raising.
            file_size_info.object = f"**Error:** {str(e)}"

    # Update file size info when file selection changes
    file_selector.param.watch(update_file_size_info, 'value')

    def on_load_click(event):
        # Clear previous outputs and warnings
        context.update_container('output_box', create_loadingdata_output_box("N.A."))
        context.update_container('warning_box', create_loadingdata_warning_box("N.A."))
        warning_handler.warnings.clear()
        warnings.resetwarnings()
        read_event_data(
            event,
            file_selector,
            filename_input,
            format_input,
            format_checkbox,
            rmf_file_dropper,
            additional_columns_input,
            use_lazy_loading,
            use_preview_mode,
            preview_duration_input,
            context,
            warning_handler,
        )

    def on_save_click(event):
        # Clear previous outputs and warnings
        context.update_container('output_box', create_loadingdata_output_box("N.A."))
        context.update_container('warning_box', create_loadingdata_warning_box("N.A."))
        warning_handler.warnings.clear()
        warnings.resetwarnings()
        save_loaded_files(
            event,
            filename_input,
            format_input,
            format_checkbox,
            context,
            warning_handler,
        )

    def on_delete_click(event):
        # Clear previous outputs and warnings
        context.update_container('warning_box', create_loadingdata_warning_box("N.A."))
        context.update_container('output_box', create_loadingdata_output_box("N.A."))
        warning_handler.warnings.clear()
        warnings.resetwarnings()
        delete_selected_files(
            event,
            file_selector,
            context,
            warning_handler,
        )

    def on_preview_click(event):
        # Clear previous outputs and warnings
        context.update_container('output_box', create_loadingdata_output_box("N.A."))
        context.update_container('warning_box', create_loadingdata_warning_box("N.A."))
        warning_handler.warnings.clear()
        warnings.resetwarnings()
        preview_loaded_files(
            event, context, warning_handler
        )

    def on_clear_click(event):
        # Clear the loaded files list
        context.update_container('output_box', create_loadingdata_output_box("N.A."))
        context.update_container('warning_box', create_loadingdata_warning_box("N.A."))
        warning_handler.warnings.clear()
        warnings.resetwarnings()
        clear_loaded_files(event, context)

    # Wire the buttons to their click handlers.
    load_button.on_click(on_load_click)
    save_button.on_click(on_save_click)
    delete_button.on_click(on_delete_click)
    preview_button.on_click(on_preview_click)
    clear_button.on_click(on_clear_click)
    # Left column: Basic file selection and configuration
    left_column = pn.Column(
        pn.Row(
            pn.pane.Markdown("<h2> Read an EventList object from File</h2>"),
            pn.widgets.TooltipIcon(
                value=Tooltip(
                    content="Supported Formats: pickle, hea or ogip, any other astropy.table.Table(ascii.ecsv, hdf5, etc.)",
                    position="bottom",
                )
            ),
        ),
        file_selector,
        file_size_info,  # Show file size and memory info
        pn.pane.Markdown("---"),  # Separator
        pn.Row(filename_input, tooltip_file),
        pn.Row(format_input, tooltip_format),
        format_checkbox,
        width_policy="min",
    )
    # Right column: Advanced options and actions
    right_column = pn.Column(
        pn.pane.Markdown("<h3>Advanced Options</h3>"),
        pn.Row(rmf_file_dropper, tooltip_rmf),
        pn.Row(additional_columns_input, tooltip_additional_columns),
        pn.pane.Markdown("---"),  # Separator
        pn.pane.Markdown("<h3>Loading Options</h3>"),
        pn.Row(use_lazy_loading, tooltip_lazy),
        pn.Row(use_preview_mode, tooltip_preview),
        preview_duration_input,
        pn.pane.Markdown("---"),  # Separator
        pn.pane.Markdown("<h3>Actions</h3>"),
        pn.Row(load_button, save_button, delete_button),
        pn.Row(preview_button, clear_button),
        width_policy="min",
    )
    # Two-column layout
    tab_content = pn.Row(
        left_column,
        right_column,
        width_policy="max",
    )
    return tab_content
# TODO: Add better comments, error handlling and docstrings and increase the functionality
def create_fetch_eventlist_tab(context: AppContext, warning_handler):
    """
    Create the tab for fetching EventList data from a remote link.

    The tab offers a URL input, a name for the resulting EventList, a format
    selector, and a fetch button. On click, the URL is first downloaded to a
    temporary file (as a reachability check), then handed to the data service
    for actual loading; the temporary file is always cleaned up afterwards.

    Args:
        context (AppContext): The application context containing containers and state.
        warning_handler (WarningHandler): The handler used to record warnings.

    Returns:
        Column: A Panel Column containing the widgets and layout for the fetch tab.
    """
    link_input = pn.widgets.TextInput(
        name="Enter File Link",
        placeholder="Enter the URL to the EventList file",
        width=400,
    )
    filename_input = pn.widgets.TextInput(
        name="File Name",
        placeholder="Provide a name for the EventList",
        width=400,
    )
    format_select = pn.widgets.Select(
        name="File Format",
        options=["ogip", "hdf5", "ascii.ecsv", "fits", "pickle"],
        value="ogip",
    )
    fetch_button = pn.widgets.Button(
        name="Fetch and Load EventList",
        button_type="primary",
    )
    tooltip_link = pn.widgets.TooltipIcon(
        value=Tooltip(
            content="""When using urls from github use raw links.""",
            position="bottom",
        )
    )

    def fetch_eventlist(event):
        # Both a URL and a target name are required before doing any work.
        if not link_input.value or not filename_input.value:
            context.update_container('output_box',
                create_loadingdata_output_box(
                    "Error: Please provide both the link and file name."
                )
            )
            return

        # Initialize before the try block so the finally clause is always
        # safe to run. Previously this name was only bound inside the try,
        # so an exception raised before NamedTemporaryFile succeeded caused
        # a NameError inside finally.
        temp_filename = None
        try:
            link = link_input.value.strip()

            # Download the file to a temporary file first; this validates
            # that the URL is reachable before invoking the data service.
            with tempfile.NamedTemporaryFile(delete=False) as tmp_file:
                temp_filename = tmp_file.name

            # A timeout keeps an unresponsive host from hanging the UI
            # callback indefinitely.
            response = requests.get(link, stream=True, timeout=60)
            if response.status_code != 200:
                raise ValueError(f"Failed to download file. Status code: {response.status_code}")

            # Stream the payload to disk in chunks to keep memory bounded.
            with open(temp_filename, "wb") as f:
                for chunk in response.iter_content(chunk_size=1024):
                    if chunk:
                        f.write(chunk)

            # Use the data service to load from the URL. Pass the stripped
            # link so the service sees the same URL that was downloaded
            # (the raw widget value may carry stray whitespace).
            result = context.services.data.load_event_list_from_url(
                url=link,
                name=filename_input.value.strip(),
                fmt=format_select.value
            )

            if result["success"]:
                context.update_container('output_box',
                    create_loadingdata_output_box(result["message"])
                )
            else:
                warning_handler.warn(result["error"], category=RuntimeWarning)
                context.update_container('output_box',
                    create_loadingdata_output_box(f"Error: {result['message']}")
                )
        except Exception as e:
            user_msg, tech_msg = ErrorHandler.handle_error(
                e,
                context="Loading event list from URL",
                url=link_input.value,
                filename=filename_input.value
            )
            warning_handler.warn(tech_msg, category=RuntimeWarning)
            context.update_container('output_box',
                create_loadingdata_output_box(f"Error: {user_msg}")
            )
        finally:
            # Ensure the temporary file is deleted after processing,
            # whether or not it was ever created.
            if temp_filename and os.path.exists(temp_filename):
                os.remove(temp_filename)

    fetch_button.on_click(fetch_eventlist)

    tab_content = pn.Column(
        pn.pane.Markdown("### Fetch EventList from Link"),
        pn.Row(link_input, tooltip_link),
        filename_input,
        format_select,
        fetch_button,
    )
    return tab_content
def create_loadingdata_main_area(context: AppContext):
    """
    Assemble the main area for the data loading tab from its sub-tabs.

    A single shared warning handler is created and passed to every sub-tab
    so that warnings raised anywhere in the loading workflow are collected
    in one place.

    Args:
        context (AppContext): The application context containing containers and state.

    Returns:
        MainArea: An instance of MainArea with all the necessary tabs for data loading.

    Example:
        >>> main_area = create_loadingdata_main_area(context)
        >>> isinstance(main_area, MainArea)
        True
    """
    shared_warning_handler = create_warning_handler()

    tabs = {}
    tabs["Read Event List from File"] = create_loading_tab(
        context=context,
        warning_handler=shared_warning_handler,
    )
    tabs["Fetch EventList from Link"] = create_fetch_eventlist_tab(
        context=context,
        warning_handler=shared_warning_handler,
    )

    return MainArea(tabs_content=tabs)
def create_loadingdata_help_area():
    """
    Create the help area for the data loading tab.

    Builds two static Markdown help pages — an introduction to event lists
    and a usage guide for ``EventList.read`` — and wraps them as tabs in a
    HelpBox. The content is purely informational; no widgets or state are
    involved.

    Returns:
        HelpBox: An instance of HelpBox with the help content.
    """
    # Content for "Introduction to Event Lists" (background/theory page)
    intro_content = """
## Introduction to Event Lists
### What are Event Lists?
In X-ray astronomy, an **Event List** represents a record of individual photon detection events as observed by a telescope. Each event corresponds to the detection of a photon and includes attributes like:
- **Time of Arrival (TOA)**: The exact time when the photon was detected.
- **Photon Energy**: Derived from the pulse height or energy channel recorded.
- **Good Time Intervals (GTIs)**: Periods during which the instrument was actively recording valid data.
- **Pulse Invariant (PI) Channel**: A standardized representation of photon energy.
Event Lists are typically the starting point for data analysis in high-energy astrophysics. They provide unbinned, high-precision information about individual photon arrivals, enabling various scientific analyses such as timing, spectral, and correlation studies.
### Scientific Significance of Event Lists
Event Lists allow astronomers to study the variability of astrophysical sources across a wide range of timescales:
- **Fast Transients**: Sources like X-ray bursts, magnetar flares, or fast radio bursts, which brighten and dim on millisecond-to-minute scales.
- **Quasi-Periodic Oscillations (QPOs)**: Oscillations in black hole and neutron star systems that vary unpredictably around a central frequency.
- **Stochastic Variability**: Random fluctuations in brightness, often associated with accretion processes.
Additionally, Event Lists are fundamental for studying:
- **Time Lags**: Delays between high- and low-energy photon emissions due to processes like reflection or turbulent flows in accretion disks.
- **Spectral Timing**: Techniques that combine time and energy data to probe the physical processes near compact objects.
### Anatomy of an Event List
An Event List is often stored as a FITS (Flexible Image Transport System) file, with each row in the table corresponding to a single detected photon. The table contains columns for various attributes:
- **Time**: Precise timestamp of the event (e.g., in seconds or Modified Julian Date).
- **Energy or PI Channel**: Photon energy or pulse invariant channel.
- **GTIs**: Intervals of valid observation time.
- **Spatial Information** (optional): Detector coordinates or celestial coordinates.
### How Event Lists are Used
Event Lists are typically processed and filtered to remove invalid events or background noise. They can then be converted into:
- **Light Curves**: Binned time series of photon counts.
- **Spectra**: Energy distributions of detected photons.
- **Power Spectra**: Frequency-domain representations of variability.
### Key Terms in Event Lists
- **Photon Time of Arrival (TOA)**: The recorded time when a photon hits the detector.
- **Good Time Intervals (GTIs)**: Periods when the instrument was actively recording valid data.
- **Pulse Invariant (PI) Channel**: A detector-specific channel number that maps to the photon’s energy.
- **RMF File**: Response Matrix File, used to calibrate PI channels into physical energy values (e.g., keV).
- **FITS Format**: The standard file format for Event Lists in high-energy astrophysics.
### Example: Event List Data Structure
A typical Event List in FITS format contains columns like:
```
TIME            PI      ENERGY  GTI
---------------------------------
0.0012          12      2.3 keV [0, 100]
0.0034          15      3.1 keV [0, 100]
0.0048          10      1.8 keV [0, 100]
```
### Advantages of Event Lists
- **High Precision**: Tracks individual photon events without binning, preserving maximum information.
- **Flexibility**: Can be transformed into various forms (e.g., light curves, spectra) for different analyses.
- **Time-Energy Data**: Enables advanced spectral-timing techniques.
### Challenges and Considerations
- **Dead Time**: Time intervals when the detector cannot record new events, affecting variability measurements.
- **Instrumental Noise**: False events caused by electronics or background radiation.
- **Time Resolution**: Limited by the instrument's precision in recording photon arrival times.
By understanding Event Lists, astronomers gain insight into the underlying physical processes driving variability in high-energy astrophysical sources.
### References
- van der Klis, M. (2006). "Rapid X-ray Variability."
- Miniutti, G., et al. (2019). "Quasi-Periodic Eruptions in AGN."
- Galloway, D., & Keek, L. (2021). "X-ray Bursts: Physics and Observations."
- HEASARC Guidelines for FITS Event List Formats.
<br><br>
    """

    # Content for "Reading EventList" (practical EventList.read usage guide)
    eventlist_read_content = """
## Reading EventList
The `EventList.read` method is used to read event data files and load them as `EventList` objects in Stingray.
This process involves parsing photon event data, such as arrival times, PI (Pulse Invariant) channels, and energy values.
### Supported File Formats
- **`pickle`**: Serialized Python objects (not recommended for long-term storage).
- **`hea`** / **`ogip`**: FITS event files (commonly used in X-ray astronomy).
- **Other Table-supported formats**: e.g., `hdf5`, `ascii.ecsv`, etc.
### Parameters
- **`filename` (str)**: Path to the file containing the event data.
- **`fmt` (str)**: File format. Supported formats include:
  - `'pickle'`
  - `'hea'` or `'ogip'`
  - Table-compatible formats like `'hdf5'`, `'ascii.ecsv'`.
  - If `fmt` is not specified, the method attempts to infer the format based on the file extension.
- **`rmf_file` (str, default=None)**:
  - Path to the RMF (Response Matrix File) for energy calibration.
  - Behavior:
    1. **If `fmt="hea"` or `fmt="ogip"`**:
       - `rmf_file` is ignored during the `read` process.
       - You must apply it manually after loading using `convert_pi_to_energy`.
    2. **If `fmt` is not `hea` or `ogip`**:
       - `rmf_file` can be directly specified in the `read` method for automatic energy calibration.
- **`kwargs` (dict)**:
  - Additional parameters passed to the FITS reader (`load_events_and_gtis`) for reading OGIP/HEASOFT-compatible event lists.
  - Example: `additional_columns` for specifying extra data columns to read.
### Attributes in the Loaded EventList
- **`time`**: Array of photon arrival times in seconds relative to `mjdref`.
- **`energy`**: Array of photon energy values (if calibrated using `rmf_file`).
- **`pi`**: Array of Pulse Invariant (PI) channels.
- **`mjdref`**: Reference time (Modified Julian Date).
- **`gtis`**: Good Time Intervals, defining valid observation periods.
### Stingray Classes and Functions in Use
Below are the key classes and methods from Stingray that are used during this process:
#### Class: `EventList`
```python
from stingray.events import EventList
class EventList:
    def __init__(self, time=None, energy=None, pi=None, gti=None, mjdref=0, rmf_file=None):
        # Initializes the event list with time, energy, PI channels, and other parameters
```
#### Method: `EventList.read`
```python
@classmethod
def read(cls, filename, fmt=None, rmf_file=None, **kwargs):
    if fmt in ("hea", "ogip"):
        evt = FITSTimeseriesReader(filename, output_class=EventList, **kwargs)[:]
        if rmf_file:
            evt.convert_pi_to_energy(rmf_file)  # Must be applied manually for hea/ogip
        return evt
    return super().read(filename, fmt=fmt)
```
#### Function: `convert_pi_to_energy`
```python
def convert_pi_to_energy(self, rmf_file):
    self.energy = pi_to_energy(self.pi, rmf_file)
```
### Example Usage
```python
from stingray.events import EventList
# Reading an OGIP-compatible FITS file
event_list = EventList.read("example.evt", fmt="ogip")
# Applying RMF manually after reading
event_list.convert_pi_to_energy("example.rmf")
# Reading an HDF5 file with direct RMF calibration
event_list = EventList.read("example.hdf5", fmt="hdf5", rmf_file="example.rmf")
# Accessing attributes
print(event_list.time)  # Photon arrival times
print(event_list.energy)  # Calibrated energy values (if rmf_file used)
print(event_list.pi)  # PI channels
print(event_list.gtis)  # Good Time Intervals
```
### Important Notes
1. **FITS Event Files (`hea` or `ogip`)**:
   - `rmf_file` must be applied manually after loading:
     ```python
     event_list.convert_pi_to_energy("example.rmf")
     ```
2. **Energy Calibration**:
   - Ensure the file contains PI channel data for energy calibration.
   - Without PI channels, RMF calibration will not work, and energy values will remain `None`.
3. **Good Time Intervals (GTIs)**:
   - GTIs define valid observation periods and are automatically extracted from compatible files.
### Common Issues
- **Unsupported File Format**:
  Ensure the file extension and format (`fmt`) match.
- **Energy Not Calibrated**:
  Check for PI channels and provide an RMF file if needed.
- **Missing Columns**:
  For OGIP/HEASOFT-compatible files, ensure required columns (e.g., `time`, `PI`) are available.
### Additional Parameters for Advanced Use
- **`additional_columns`**:
  Specify extra columns to read from the file.
  Example:
  ```python
  event_list = EventList.read("example.fits", fmt="hea", additional_columns=["detector_id"])
  ```
<br><br>
    """

    # Create the help box; each entry becomes one tab of static Markdown.
    return HelpBox(
        title="Help Section",
        tabs_content={
            "Event Lists": pn.pane.Markdown(intro_content),
            "Reading EventList": pn.pane.Markdown(eventlist_read_content),
        },
    )
def create_loadingdata_plots_area():
    """
    Build the plots area for the data loading tab.

    No plots exist at construction time, so an empty container is returned;
    plots are added to it later by the loading workflow.

    Returns:
        PlotsContainer: An instance of PlotsContainer with the plots for the data loading tab.

    Example:
        >>> plots_area = create_loadingdata_plots_area()
        >>> isinstance(plots_area, PlotsContainer)
        True
    """
    plots_area = PlotsContainer()
    return plots_area