""" Worklist panel. Displays the worklist as a sortable table with scoring results. Users can add sequences from the registry, remove items, run analysis, select rows to create new worklists, and export as CSV. """ from __future__ import annotations import io import logging from typing import TYPE_CHECKING, List import pandas as pd import panel as pn import param if TYPE_CHECKING: from ui.state import AppState from ui.components.analysis_settings import AnalysisSettingsPanel logger = logging.getLogger(__name__) class WorklistView(param.Parameterized): """Worklist management panel.""" def __init__(self, state: "AppState", **params: object) -> None: super().__init__(**params) self._state = state self._table_widget = None self._create_wl_section = pn.Column(visible=False, sizing_mode="stretch_width") self._settings_visible = False self._settings_section = pn.Column(visible=False, sizing_mode="stretch_width") # Lazy-init settings panel on first toggle self._settings_panel: "AnalysisSettingsPanel | None" = None def _to_dataframe(self) -> pd.DataFrame: wl = self._state.worklist rows = [] for item in wl.items: seq = item.sequence row = { "id": item.id, "Name": seq.name, "Source": seq.source, "Length (nt)": seq.length, "Origin": item.origin, } # Add base analysis metrics if available if "base_analysis" in item.analyses: base = item.analyses["base_analysis"] row["GC%"] = f"{base.get('gc_content', 0):.1f}" if base.get('gc_content') else "—" row["CAI"] = f"{base.get('cai', 0):.3f}" if base.get('cai') else "—" row["Homopolymers"] = base.get('homopolymer_count', 0) row["Restriction Sites"] = base.get('restriction_site_count', 0) verdict = base.get('liability_verdict') score = base.get('liability_score') row["QC"] = f"{verdict.title()} · {score}" if verdict is not None else "—" row["Liabilities"] = base.get('liability_flag_count', 0) if base.get('liability_flag_count') is not None else "—" else: row["GC%"] = "—" row["CAI"] = "—" row["Homopolymers"] = "—" row["Restriction Sites"] = "—" row["QC"] = "—" row["Liabilities"] = "—" # Add model score columns from analyses for analysis_name, analysis_data in item.analyses.items(): if analysis_name != "base_analysis" and isinstance(analysis_data, dict) and "score" in analysis_data: row[analysis_name] = f"{analysis_data['score']:.1f}" row["Notes"] = item.notes or "" rows.append(row) return pd.DataFrame(rows) if rows else pd.DataFrame( columns=["Name", "Source", "Length (nt)", "Origin", "GC%", "CAI"] ) @param.depends("_state.worklist", "_state.model_registry") def panel(self) -> pn.Column: wl = self._state.worklist df = self._to_dataframe() # Toolbar export_btn = pn.widgets.FileDownload( callback=lambda: self._make_csv(), filename=f"{wl.name.replace(' ', '_')}_export.csv", button_type="light", label="Export CSV", margin=(4, 4), ) # Build analysis options: base analysis + loaded models + "Run All" analysis_options = ["Base Analysis"] # Add loaded scoring models if self._state.model_registry: for model_reg in self._state.model_registry.scoring_models: analysis_options.append(model_reg.model.name) analysis_options.append("Run All Analyses") analysis_select = pn.widgets.Select( name="Analysis", options=analysis_options, value="Base Analysis", width=180, margin=(4, 4), ) run_analysis_btn = pn.widgets.Button( name="Run", button_type="success", width=80, margin=(4, 4), ) run_analysis_btn.on_click(lambda event: self._run_selected_analysis(analysis_select.value)) # Analysis settings gear button settings_btn = pn.widgets.Button( name="Settings", button_type="light", width=70, margin=(4, 0), stylesheets=[""" :host .bk-btn { font-size: 11px; padding: 4px; border-radius: 4px; } """], ) def _open_settings(e): self._settings_visible = not self._settings_visible if self._settings_visible and self._settings_panel is None: from ui.components.analysis_settings import AnalysisSettingsPanel self._settings_panel = AnalysisSettingsPanel(self._state) self._settings_section.append(self._settings_panel.panel()) self._settings_section.visible = self._settings_visible settings_btn.on_click(_open_settings) # Create worklist from selection button create_wl_btn = pn.widgets.Button( name="Create Worklist from Selection", button_type="light", width=220, margin=(4, 4), stylesheets=[""" :host .bk-btn { font-size: 11px; color: #0F766E; border: 1px solid #0F766E; border-radius: 4px; } :host .bk-btn:hover { background: #F0FDFA; } """], ) toolbar = pn.Row( pn.pane.HTML( f'
' f'{wl.name} ({wl.count} items)
' ), pn.layout.HSpacer(), create_wl_btn, analysis_select, run_analysis_btn, settings_btn, export_btn, sizing_mode="stretch_width", ) # Create worklist inline form self._create_wl_section = pn.Column(visible=False, sizing_mode="stretch_width") wl_name_input = pn.widgets.TextInput( name="New Worklist Name", value=f"Selection from {wl.name}", width=300, margin=(4, 4), ) create_confirm_btn = pn.widgets.Button(name="Create", button_type="success", width=80, margin=(4, 4)) create_cancel_btn = pn.widgets.Button(name="Cancel", button_type="light", width=80, margin=(4, 4)) self._create_wl_section.extend([ pn.Row( wl_name_input, create_confirm_btn, create_cancel_btn, sizing_mode="stretch_width", styles={"background": "#F0FDFA", "padding": "8px", "border-radius": "6px", "margin": "4px 0"}, ), ]) if wl.count == 0: table_or_empty: pn.viewable.Viewable = pn.pane.HTML( '
' 'Worklist is empty. Import sequences from a database to get started.
' ) create_wl_btn.visible = False else: # Create table with checkbox selection row_height = 35 header_height = 40 calculated_height = min(len(df) * row_height + header_height, 500) table_widget = pn.widgets.Tabulator( df, hidden_columns=["id"], sizing_mode="stretch_width", show_index=False, height=calculated_height, selectable="checkbox", page_size=20, editors={col: None for col in df.columns}, ) self._table_widget = table_widget # Handle row click for inspection def on_row_click(event: object) -> None: logger.info(f"Row click event fired: {event}") if event.row is not None: row_data = df.iloc[event.row] item_id = row_data["id"] item = next((i for i in wl.items if i.id == item_id), None) if item: self._state.active_sequence = item.sequence logger.debug("Row clicked: %s", item.sequence.name) table_widget.on_click(on_row_click) # Create worklist button handler def on_create_wl(event): self._create_wl_section.visible = True create_wl_btn.on_click(on_create_wl) def on_create_confirm(event): self._create_worklist_from_selection( wl_name_input.value, table_widget.selection, df, ) self._create_wl_section.visible = False def on_create_cancel(event): self._create_wl_section.visible = False create_confirm_btn.on_click(on_create_confirm) create_cancel_btn.on_click(on_create_cancel) table_or_empty = table_widget # Origin summary chips origin_counts = df["Origin"].value_counts() if "Origin" in df.columns else {} origin_chips = " ".join( f'{k}: {v}' for k, v in origin_counts.items() ) return pn.Column( toolbar, self._settings_section, self._create_wl_section, pn.pane.HTML(f'
{origin_chips}
') if origin_chips else pn.pane.HTML(""), table_or_empty, self._liability_detail, sizing_mode="stretch_width", styles={"padding": "8px 16px"}, ) @param.depends("_state.active_sequence") def _liability_detail(self) -> pn.viewable.Viewable: """Liability / QC breakdown for the currently selected sequence.""" seq = self._state.active_sequence if seq is None: return pn.pane.HTML( '
' 'Click a row to see its liability / QC breakdown.
' ) try: from core.analysis.analyzer import SequenceAnalyzer from ui.components.analysis_dashboard import render_liability_panel report = SequenceAnalyzer().run_full_analysis(seq) return pn.Column( pn.pane.HTML( f'
' f'Liabilities — {seq.name}
' ), render_liability_panel(report), sizing_mode="stretch_width", ) except Exception as e: # noqa: BLE001 — surface any analysis error inline return pn.pane.HTML( f'
' f'Liability analysis error: {e}
' ) def _create_worklist_from_selection(self, name: str, selection: List[int], df: pd.DataFrame) -> None: """Create a new worklist from selected rows.""" if not selection: self._state.set_status("No rows selected. Use checkboxes to select sequences.") return from core.models.worklist import Worklist wl = self._state.worklist new_wl = Worklist(name=name or f"Selection ({len(selection)} items)") for idx in selection: if 0 <= idx < len(df): item_id = df.iloc[idx]["id"] item = next((i for i in wl.items if i.id == item_id), None) if item: new_wl.add(item.sequence, origin=item.origin) # Add to worklists list worklists = list(self._state.worklists) if not worklists: # Add current worklist first worklists.append(wl) worklists.append(new_wl) self._state.worklists = worklists self._state.active_worklist_index = len(worklists) - 1 self._state.worklist = new_wl self._state.set_status(f"Created worklist '{name}' with {new_wl.count} sequences") def _make_csv(self) -> io.BytesIO: """Build CSV bytes for the FileDownload widget.""" df = self._to_dataframe().drop(columns=["id"], errors="ignore") buf = io.BytesIO() df.to_csv(buf, index=False) buf.seek(0) return buf def _run_selected_analysis(self, analysis_name: str) -> None: """Run the selected analysis on all sequences that haven't been analyzed yet.""" logger.info(f"Running analysis: {analysis_name}") if analysis_name == "Run All Analyses": self._run_base_analysis() for model_reg in self._state.model_registry.scoring_models: self._run_model_analysis(model_reg.model.name) elif analysis_name == "Base Analysis": self._run_base_analysis() else: self._run_model_analysis(analysis_name) def _run_base_analysis(self) -> None: """Run base analysis on sequences that haven't been analyzed, using current settings.""" from core.analysis.analyzer import SequenceAnalyzer # Get settings from state settings = self._state.analysis_settings or {} # Build analyzer with settings analyzer = SequenceAnalyzer( gc_window=settings.get("gc_window", 100), gc_step=settings.get("gc_step", 1), homopolymer_min_run=settings.get("homopolymer_min_run", 5), restriction_enzymes=settings.get("restriction_enzymes"), cai_organism=settings.get("cai_organism", "human").lower().replace(" ", "").replace(".", ""), ) analyzed_count = 0 skipped_count = 0 for item in self._state.worklist.items: if "base_analysis" in item.analyses: skipped_count += 1 continue try: report = analyzer.run_full_analysis(item.sequence) item.analyses["base_analysis"] = report.to_dict() analyzed_count += 1 except Exception as e: item.status = "error" item.notes = str(e) self._state.param.trigger("worklist") self._state.set_status( f"Base analysis complete: {analyzed_count} analyzed, {skipped_count} skipped (already analyzed)" ) def _run_model_analysis(self, model_name: str) -> None: """Run a scoring model on sequences that haven't been scored by this model.""" analyzed_count = 0 skipped_count = 0 model_reg = next( (m for m in self._state.model_registry.all_models if m.model.name == model_name), None ) if not model_reg: self._state.set_status(f"Model '{model_name}' not found") return for item in self._state.worklist.items: if model_name in item.analyses: skipped_count += 1 continue try: score = model_reg.model.score(item.sequence) item.analyses[model_name] = {"score": score} analyzed_count += 1 except Exception as e: item.status = "error" item.notes = f"{model_name}: {str(e)}" # Record this scoring run for experiment / version tracking self._record_run(model_reg) self._state.param.trigger("worklist") self._state.set_status( f"{model_name} complete: {analyzed_count} scored, {skipped_count} skipped (already scored)" ) def _record_run(self, model_reg: object) -> None: """Capture a ModelRun snapshot of the current scores for this model.""" from datetime import datetime from models.runs import summarize_run name = model_reg.model.name # gather all current scores for this model across the worklist scores = {} for item in self._state.worklist.items: data = item.analyses.get(name) if isinstance(data, dict) and "score" in data: scores[item.sequence.id] = data["score"] if not scores: return try: version = model_reg.model.version except Exception: version = "1.0" run = summarize_run( model_name=name, model_version=str(version), model_source=getattr(model_reg, "source", ""), worklist_name=self._state.worklist.name, scores=scores, timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S"), ) self._state.run_history.add(run) self._state.param.trigger("run_history")