File size: 6,703 Bytes
99f834c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
"""
Abstract database connector interface and SchemaMapper.

Every database backend (SQLite, PostgreSQL, CSV) implements DatabaseConnector.
SchemaMapper translates arbitrary column names to the mRNASequence model fields.
"""
from __future__ import annotations

import logging
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional

import pandas as pd

from core.models.sequence import mRNASequence


# Names of the mRNASequence fields that a database column may be mapped onto.
# FieldMapping rejects any target field that is not listed here.
SEQUENCE_FIELDS = {
    # identification
    "name",
    # individual sequence regions
    "five_prime_utr", "kozak", "cds", "three_prime_utr", "poly_a",
    # whole transcript
    "full_mrna",
}


@dataclass
class ConnectionConfig:
    """
    Generic connection configuration (fields vary by backend).

    Attributes
    ----------
    backend : str
        Backend identifier: "sqlite", "postgres", "csv" or "excel".
    display_name : str
        User-facing label for the connection.
    params : dict
        Backend-specific settings; defaults to a fresh empty dict per
        instance. Examples:

        * sqlite:   {"path": "/data/seqs.db"}
        * postgres: {"host": "...", "port": 5432, "dbname": "...",
                     "user": "...", "password": "..."}
        * csv:      {"path": "/data/seqs.csv"}
    """

    backend: str
    display_name: str
    params: Dict[str, Any] = field(default_factory=dict)


class DatabaseConnector(ABC):
    """
    Base class for all database connectors; one instance per active connection.

    Concrete backends must implement ``connect``, ``disconnect``,
    ``list_tables``, ``get_records`` and ``get_columns``. The base class only
    initialises ``_connected`` to False and never changes it afterwards, so
    ``is_connected`` reflects whatever the implementation stores there.
    """

    def __init__(self, config: ConnectionConfig) -> None:
        self._connected = False
        self.config = config

    # ── Concrete helpers ────────────────────────────────────────────────────

    @property
    def is_connected(self) -> bool:
        """Current value of the internal connection flag."""
        return self._connected

    @property
    def name(self) -> str:
        """The configuration's user-facing display name."""
        return self.config.display_name

    def __repr__(self) -> str:
        if self._connected:
            state = "connected"
        else:
            state = "disconnected"
        cls_name = type(self).__name__
        return f"{cls_name}({self.name!r}, {state})"

    # ── Interface every backend implements ──────────────────────────────────

    @abstractmethod
    def connect(self) -> None:
        """Open the connection. Raises ConnectionError on failure."""

    @abstractmethod
    def disconnect(self) -> None:
        """Close the connection."""

    @abstractmethod
    def list_tables(self) -> List[str]:
        """Return available table / sheet names."""

    @abstractmethod
    def get_records(
        self,
        table: str,
        query: Optional[str] = None,
        limit: Optional[int] = None,
    ) -> pd.DataFrame:
        """
        Fetch records from a table as a DataFrame.

        Parameters
        ----------
        table : str
            Table name, as returned by list_tables.
        query : str, optional
            Backend-specific filter string: a SQL WHERE clause for SQL
            backends, a pandas query string for file backends.
        limit : int, optional
            Maximum number of rows to return.
        """

    @abstractmethod
    def get_columns(self, table: str) -> List[str]:
        """Return column names for a table."""


# ── Schema Mapper ────────────────────────────────────────────────────────────

@dataclass
class FieldMapping:
    """
    One rule linking a database column to a mRNASequence field.

    source_column : str
        Name of the column in the database.
    target_field : str
        Name of the mRNASequence field to fill. Must be in SEQUENCE_FIELDS;
        anything else raises ValueError at construction time.
    transform : callable, optional
        Applied to the raw value before it is assigned,
        e.g. str.upper or lambda x: x.replace(" ", "").
    """

    source_column: str
    target_field: str
    transform: Optional[Any] = None   # callable or None

    def __post_init__(self) -> None:
        # Guard clause: a known target field needs no further handling.
        if self.target_field in SEQUENCE_FIELDS:
            return
        valid_fields = sorted(SEQUENCE_FIELDS)
        raise ValueError(
            f"'{self.target_field}' is not a valid mRNASequence field. "
            f"Valid fields: {valid_fields}"
        )


class SchemaMapper:
    """
    Maps a DataFrame (from any DatabaseConnector) to a list of mRNASequence
    objects using a user-configured field mapping.

    Parameters
    ----------
    mappings : list of FieldMapping
        Column-to-field rules, applied in order. When several mappings share
        a target field, the last one whose column holds a non-missing value
        wins.
    db_source : str, optional
        Label passed to every produced mRNASequence as ``db_source``.

    Raises
    ------
    ValueError
        If no mapping targets the 'name' field.

    Example
    -------
    mapper = SchemaMapper(
        [
            FieldMapping("mrna_sequence", "full_mrna"),
            FieldMapping("gene_name", "name"),
            FieldMapping("utr5_sequence", "five_prime_utr", transform=str.upper),
        ],
        db_source="my_lims",
    )
    sequences = mapper.map_dataframe(df)
    """

    def __init__(self, mappings: List[FieldMapping], db_source: str = "") -> None:
        self.mappings = mappings
        self.db_source = db_source
        # Validate: at least one mapping targeting 'name' must exist
        if not any(m.target_field == "name" for m in mappings):
            raise ValueError(
                "SchemaMapper requires at least one FieldMapping targeting 'name'."
            )

    @staticmethod
    def _is_missing(value: Any) -> bool:
        """Return True for None and pandas' scalar missing markers (NaN, NA, NaT)."""
        if value is None:
            return True
        # pd.isna() on a list/array returns an element-wise result whose truth
        # value is ambiguous, so only scalars are tested; containers count as
        # present.
        if not pd.api.types.is_scalar(value):
            return False
        return bool(pd.isna(value))

    def map_row(self, row: Dict[str, Any]) -> mRNASequence:
        """
        Map a single row dict to an mRNASequence.

        Parameters
        ----------
        row : dict
            Column name -> raw value. A copy is passed on as ``raw_metadata``.

        Returns
        -------
        mRNASequence
            Built with ``source="database"``, this mapper's ``db_source`` and
            one keyword per mapping whose column holds a non-missing value.
        """
        kwargs: Dict[str, Any] = {
            "source": "database",
            "db_source": self.db_source,
            "raw_metadata": dict(row),
        }
        for mapping in self.mappings:
            value = row.get(mapping.source_column)
            # Skip absent columns and missing values (pandas returns NaN / NA /
            # NaT for SQL NULL depending on the column dtype)
            if self._is_missing(value):
                continue
            if mapping.transform is not None:
                try:
                    value = mapping.transform(value)
                except Exception:
                    # Best effort: one bad value must not abort the import, so
                    # the raw value is kept, but the failure is reported
                    # instead of being silently discarded.
                    logging.getLogger(__name__).warning(
                        "Transform for column %r -> field %r failed; "
                        "keeping the untransformed value.",
                        mapping.source_column,
                        mapping.target_field,
                        exc_info=True,
                    )
            kwargs[mapping.target_field] = value
        # name is required — fall back to the first non-empty string value in
        # the row (truncated to 80 characters), or "unnamed" if there is none
        if not kwargs.get("name"):
            kwargs["name"] = next(
                (
                    v.strip()[:80]
                    for v in row.values()
                    if isinstance(v, str) and v.strip()
                ),
                "unnamed",
            )
        return mRNASequence(**kwargs)  # type: ignore[arg-type]

    def map_dataframe(self, df: pd.DataFrame) -> List[mRNASequence]:
        """Map every row in df to an mRNASequence, preserving row order."""
        return [self.map_row(row.to_dict()) for _, row in df.iterrows()]

    @classmethod
    def from_dict(cls, mapping_dict: Dict[str, str], db_source: str = "") -> "SchemaMapper":
        """
        Convenience constructor from a plain {db_column: sequence_field} dict.

        No transforms are attached to the generated mappings. Raises
        ValueError under the same condition as the regular constructor
        (no entry targeting 'name').

        Example
        -------
        mapper = SchemaMapper.from_dict({
            "gene_name": "name",
            "mrna_seq": "full_mrna",
            "utr": "five_prime_utr",
        })
        """
        mappings = [
            FieldMapping(source_column=col, target_field=field_)
            for col, field_ in mapping_dict.items()
        ]
        return cls(mappings, db_source=db_source)