| |
| from __future__ import annotations |
|
|
| from typing import TYPE_CHECKING |
|
|
| from pandas.compat._optional import import_optional_dependency |
| from pandas.util._decorators import doc |
|
|
| from pandas.core.shared_docs import _shared_docs |
|
|
| from pandas.io.excel._base import BaseExcelReader |
|
|
| if TYPE_CHECKING: |
| from pyxlsb import Workbook |
|
|
| from pandas._typing import ( |
| FilePath, |
| ReadBuffer, |
| Scalar, |
| StorageOptions, |
| ) |
|
|
|
|
| class PyxlsbReader(BaseExcelReader["Workbook"]): |
| @doc(storage_options=_shared_docs["storage_options"]) |
| def __init__( |
| self, |
| filepath_or_buffer: FilePath | ReadBuffer[bytes], |
| storage_options: StorageOptions | None = None, |
| engine_kwargs: dict | None = None, |
| ) -> None: |
| """ |
| Reader using pyxlsb engine. |
| |
| Parameters |
| ---------- |
| filepath_or_buffer : str, path object, or Workbook |
| Object to be parsed. |
| {storage_options} |
| engine_kwargs : dict, optional |
| Arbitrary keyword arguments passed to excel engine. |
| """ |
| import_optional_dependency("pyxlsb") |
| |
| |
| super().__init__( |
| filepath_or_buffer, |
| storage_options=storage_options, |
| engine_kwargs=engine_kwargs, |
| ) |
|
|
| @property |
| def _workbook_class(self) -> type[Workbook]: |
| from pyxlsb import Workbook |
|
|
| return Workbook |
|
|
| def load_workbook( |
| self, filepath_or_buffer: FilePath | ReadBuffer[bytes], engine_kwargs |
| ) -> Workbook: |
| from pyxlsb import open_workbook |
|
|
| |
| |
| |
|
|
| return open_workbook(filepath_or_buffer, **engine_kwargs) |
|
|
| @property |
| def sheet_names(self) -> list[str]: |
| return self.book.sheets |
|
|
| def get_sheet_by_name(self, name: str): |
| self.raise_if_bad_sheet_by_name(name) |
| return self.book.get_sheet(name) |
|
|
| def get_sheet_by_index(self, index: int): |
| self.raise_if_bad_sheet_by_index(index) |
| |
| |
| return self.book.get_sheet(index + 1) |
|
|
| def _convert_cell(self, cell) -> Scalar: |
| |
| |
| if cell.v is None: |
| return "" |
| if isinstance(cell.v, float): |
| val = int(cell.v) |
| if val == cell.v: |
| return val |
| else: |
| return float(cell.v) |
|
|
| return cell.v |
|
|
| def get_sheet_data( |
| self, |
| sheet, |
| file_rows_needed: int | None = None, |
| ) -> list[list[Scalar]]: |
| data: list[list[Scalar]] = [] |
| previous_row_number = -1 |
| |
| |
| for row in sheet.rows(sparse=True): |
| row_number = row[0].r |
| converted_row = [self._convert_cell(cell) for cell in row] |
| while converted_row and converted_row[-1] == "": |
| |
| converted_row.pop() |
| if converted_row: |
| data.extend([[]] * (row_number - previous_row_number - 1)) |
| data.append(converted_row) |
| previous_row_number = row_number |
| if file_rows_needed is not None and len(data) >= file_rows_needed: |
| break |
| if data: |
| |
| max_width = max(len(data_row) for data_row in data) |
| if min(len(data_row) for data_row in data) < max_width: |
| empty_cell: list[Scalar] = [""] |
| data = [ |
| data_row + (max_width - len(data_row)) * empty_cell |
| for data_row in data |
| ] |
| return data |
|
|