| """CSV and Excel flat-file connector.""" |
| from __future__ import annotations |
|
|
| from pathlib import Path |
| from typing import Dict, List, Optional |
|
|
| import pandas as pd |
|
|
| from core.database.base import ConnectionConfig, DatabaseConnector |
|
|
|
|
class CSVConnector(DatabaseConnector):
    """Connector that exposes CSV and Excel flat files as in-memory tables.

    The ``path`` entry of ``config.params`` selects what gets loaded:

    * a ``.csv`` file becomes one table named after the filename stem;
    * an ``.xlsx`` / ``.xls`` workbook yields one table per worksheet, named
      after the sheet;
    * a directory yields one table per CSV file directly inside it, each
      named after its filename stem.

    All tables are read eagerly into pandas DataFrames by :meth:`connect`
    and kept in memory until :meth:`disconnect` is called.
    """

    def __init__(self, config: ConnectionConfig) -> None:
        """Store *config* via the base class and start with no tables loaded."""
        super().__init__(config)
        # Table name -> loaded data. Filled by connect(), emptied by disconnect().
        self._dataframes: Dict[str, pd.DataFrame] = {}

    def connect(self) -> None:
        """Load every table found at the configured path.

        The tables are read into a temporary mapping and only installed on
        the connector once everything has loaded, so a failed call leaves
        any previously loaded tables and the connection flag untouched.

        Raises:
            ValueError: If the config has no ``path``, the file type is not
                supported, a directory contains no CSV files, or two CSV
                files in a directory map to the same table name.
            FileNotFoundError: If the configured path does not exist.
        """
        path_str = self.config.params.get("path")
        if not path_str:
            raise ValueError("CSV/Excel config must include 'path'.")
        path = Path(path_str)
        if not path.exists():
            raise FileNotFoundError(f"File not found: {path}")

        # Commit only after a complete, successful load: a parse error half-way
        # through must not leave the connector "connected" to partial data.
        loaded = self._read_tables(path)
        self._dataframes = loaded
        self._connected = True

    def disconnect(self) -> None:
        """Drop all loaded tables and mark the connector as disconnected."""
        self._dataframes.clear()
        self._connected = False

    def list_tables(self) -> List[str]:
        """Return the names of the loaded tables.

        No connection check is made here; the list is simply empty when
        nothing has been loaded.
        """
        return list(self._dataframes.keys())

    def get_columns(self, table: str) -> List[str]:
        """Return the column labels of *table*.

        Raises:
            RuntimeError: If the connector is not connected.
            KeyError: If *table* is not a loaded table.
        """
        self._require_connected()
        df = self._get_table(table)
        return list(df.columns)

    def get_records(
        self,
        table: str,
        query: Optional[str] = None,
        limit: Optional[int] = None,
    ) -> pd.DataFrame:
        """Return rows of *table*, optionally filtered and truncated.

        Args:
            table: Name of a loaded table.
            query: Expression handed to ``DataFrame.query``. ``None`` or an
                empty string means no filtering.
            limit: Row count handed to ``DataFrame.head`` after filtering.
                ``None`` means no limit; ``0`` yields an empty result.

        Returns:
            A DataFrame derived from a copy of the stored table, with its
            index reset to a fresh default index.

        Raises:
            RuntimeError: If the connector is not connected.
            KeyError: If *table* is not a loaded table.
            ValueError: If evaluating *query* fails for any reason.
        """
        self._require_connected()
        # Work on a copy so nothing done here can touch the cached table.
        df = self._get_table(table).copy()
        if query:
            # NOTE(review): DataFrame.query evaluates the string as an
            # expression - confirm callers never pass untrusted input here.
            try:
                df = df.query(query)
            except Exception as e:
                raise ValueError(f"Query error: {e}") from e
        # Compare against None rather than truthiness: a limit of 0 is a real
        # request for zero rows, not "no limit".
        if limit is not None:
            df = df.head(limit)
        return df.reset_index(drop=True)

    def _read_tables(self, path: Path) -> Dict[str, pd.DataFrame]:
        """Read all tables at *path* into a new name -> DataFrame mapping."""
        if path.is_dir():
            return self._read_csv_directory(path)

        suffix = path.suffix.lower()
        if suffix in (".xlsx", ".xls"):
            # The context manager closes the workbook handle even if parsing
            # one of the sheets raises.
            with pd.ExcelFile(path) as workbook:
                return {
                    sheet: workbook.parse(sheet)
                    for sheet in workbook.sheet_names
                }
        if suffix == ".csv":
            return {path.stem: pd.read_csv(path)}

        raise ValueError(
            f"Unsupported file type: {path.suffix}. Use .csv, .xlsx, or .xls."
        )

    def _read_csv_directory(self, directory: Path) -> Dict[str, pd.DataFrame]:
        """Read each CSV file directly inside *directory* as its own table."""
        tables: Dict[str, pd.DataFrame] = {}
        # Sorted for a deterministic table order. The suffix is compared
        # case-insensitively to match the single-file branch, and non-files
        # (e.g. a sub-directory named "x.csv") are skipped.
        for entry in sorted(directory.iterdir()):
            if not entry.is_file() or entry.suffix.lower() != ".csv":
                continue
            if entry.stem in tables:
                # e.g. "data.csv" next to "data.CSV": refuse to silently
                # replace one table with the other.
                raise ValueError(
                    f"Duplicate table name '{entry.stem}' in directory: "
                    f"{directory}"
                )
            tables[entry.stem] = pd.read_csv(entry)
        if not tables:
            raise ValueError(f"No CSV files found in directory: {directory}")
        return tables

    def _get_table(self, table: str) -> pd.DataFrame:
        """Return the stored DataFrame for *table*.

        Raises:
            KeyError: If *table* is not loaded; the message lists the
                available table names.
        """
        if table not in self._dataframes:
            raise KeyError(
                f"Table '{table}' not found. Available: {self.list_tables()}"
            )
        return self._dataframes[table]

    def _require_connected(self) -> None:
        """Raise ``RuntimeError`` unless the connector is marked connected."""
        # NOTE(review): `_connected` is not initialised in this class's
        # __init__; presumably DatabaseConnector.__init__ sets it - confirm.
        if not self._connected:
            raise RuntimeError("Not connected. Call connect() first.")
|
|