File size: 5,766 Bytes
05fdb87
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass
from pathlib import Path
from anndata import AnnData
import datetime


@dataclass
class DataSource:
    """Represents an h5ad data source, loaded or registered for lazy loading.

    ``adata`` and ``loaded_at`` are ``None`` while the dataset has not been
    loaded; ``n_obs`` and ``n_vars`` default to 0.
    """
    id: str  # Unique identifier
    name: str  # Display name
    source_type: str  # 'demo', 'url', 'upload'
    source_path: str  # Original source (URL, file path, etc.)
    # Quoted forward reference: the annotation is kept as a string so it is
    # not evaluated when the class is created.
    adata: Optional["AnnData"]  # The loaded AnnData object (None for lazy loading)
    loaded_at: Optional[datetime.datetime]  # When it was loaded (None if not loaded)
    n_obs: int = 0  # Number of observations
    n_vars: int = 0  # Number of variables

    def get_display_name(self) -> str:
        """Get formatted display name with metadata.

        Returns:
            ``"<name> (<n_obs> cells, <n_vars> genes)"`` when ``adata`` is
            set, otherwise ``"<name> (Not loaded)"``.
        """
        if self.adata is None:
            return f"{self.name} (Not loaded)"
        return f"{self.name} ({self.n_obs:,} cells, {self.n_vars:,} genes)"

    def get_info(self) -> str:
        """Get detailed, multi-line information string.

        Returns:
            Five lines: dataset name, source type, cell/spot count, gene
            count and load timestamp. The timestamp line reads
            ``"Loaded: Not loaded"`` when ``loaded_at`` is ``None``.
        """
        # loaded_at is Optional; calling strftime on None would raise
        # AttributeError for sources that were registered but never loaded.
        if self.loaded_at is None:
            loaded_text = "Not loaded"
        else:
            loaded_text = self.loaded_at.strftime('%Y-%m-%d %H:%M:%S')
        return (
            f"Dataset: {self.name}\n"
            f"Source: {self.source_type}\n"
            f"Cells/Spots: {self.n_obs:,}\n"
            f"Genes: {self.n_vars:,}\n"
            f"Loaded: {loaded_text}"
        )


class DataSourceManager:
    """
    Registry of h5ad datasets

    Responsibilities:
    - Keeping every registered dataset, keyed by a generated ID
    - Remembering which dataset is currently active
    - Exposing dataset metadata for display
    """

    def __init__(self):
        # Maps generated IDs ("ds_<n>") to their DataSource entries
        self.sources: Dict[str, DataSource] = {}
        # ID of the active dataset, or None when nothing is registered
        self.current_id: Optional[str] = None
        # Next numeric suffix to use when generating an ID
        self._id_counter = 0

    def add_source(
        self,
        name: str,
        source_type: str,
        source_path: str,
        adata: Optional[AnnData] = None
    ) -> str:
        """
        Register a data source, or reuse the entry with the same source_path

        If an entry with the same ``source_path`` already exists, no new
        entry is created. That entry is filled in with ``adata`` only when
        it has no AnnData yet and ``adata`` is given.

        Args:
            name: Display name for the dataset
            source_type: Type of source ('demo', 'url', 'upload')
            source_path: Original source location
            adata: Optional loaded AnnData object

        Returns:
            ID of the new source, or of the existing one with this path
        """
        for key, entry in self.sources.items():
            if entry.source_path != source_path:
                continue
            # Same path already registered: attach the data if it was
            # still missing, then hand back the existing ID.
            if entry.adata is None and adata is not None:
                entry.adata = adata
                entry.loaded_at = datetime.datetime.now()
                entry.n_obs = adata.n_obs
                entry.n_vars = adata.n_vars
            return key

        # No match: allocate the next ID
        new_id = f"ds_{self._id_counter}"
        self._id_counter += 1

        # Metadata is only meaningful once an AnnData object is attached
        if adata is None:
            loaded_at = None
            n_obs = 0
            n_vars = 0
        else:
            loaded_at = datetime.datetime.now()
            n_obs = adata.n_obs
            n_vars = adata.n_vars

        self.sources[new_id] = DataSource(
            id=new_id,
            name=name,
            source_type=source_type,
            source_path=source_path,
            adata=adata,
            loaded_at=loaded_at,
            n_obs=n_obs,
            n_vars=n_vars,
        )

        # The first registered source becomes the active one
        if self.current_id is None:
            self.current_id = new_id

        return new_id

    def get_source(self, source_id: str) -> Optional[DataSource]:
        """Look up a data source by ID (None if the ID is unknown)"""
        return self.sources.get(source_id)

    def get_current_source(self) -> Optional[DataSource]:
        """Return the active data source, or None if there is none"""
        active_id = self.current_id
        return None if active_id is None else self.sources.get(active_id)

    def set_current(self, source_id: str) -> bool:
        """
        Make the given source the active one

        Args:
            source_id: ID of the source to activate

        Returns:
            True if the ID exists and was activated, False otherwise
        """
        if source_id not in self.sources:
            return False
        self.current_id = source_id
        return True

    def get_all_sources(self) -> List[DataSource]:
        """Return all registered data sources as a new list"""
        return [*self.sources.values()]

    def get_source_choices(self) -> List[Tuple[str, str]]:
        """
        Build the entries for a dropdown/radio selection

        Returns:
            List of (display_name, source_id) tuples
        """
        choices = []
        for entry in self.sources.values():
            choices.append((entry.get_display_name(), entry.id))
        return choices

    def get_source_names(self) -> List[str]:
        """Return the ``name`` of every registered source"""
        names = []
        for entry in self.sources.values():
            names.append(entry.name)
        return names

    def remove_source(self, source_id: str) -> bool:
        """
        Remove a data source

        If the removed source was the active one, the first remaining
        source becomes active (or none, when the registry is now empty).

        Args:
            source_id: ID of source to remove

        Returns:
            True if removed, False if not found
        """
        if source_id not in self.sources:
            return False

        del self.sources[source_id]

        if self.current_id == source_id:
            # Fall back to the first remaining key, or None when empty
            self.current_id = next(iter(self.sources), None)

        return True

    def has_sources(self) -> bool:
        """Tell whether at least one source is registered"""
        return bool(self.sources)

    def count_sources(self) -> int:
        """Return how many sources are registered"""
        return len(self.sources)

    def clear_all(self):
        """Drop every data source and reset the active ID and ID counter"""
        self._id_counter = 0
        self.current_id = None
        self.sources.clear()