File size: 12,406 Bytes
aceb1b2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
"""
Spreadsheet Format Handler

Extracts data from spreadsheet files (Excel, CSV, TSV) with row/cell
coordinate mapping for annotation.

Usage:
    from potato.format_handlers.spreadsheet_handler import SpreadsheetHandler

    handler = SpreadsheetHandler()
    output = handler.extract("data.xlsx", {
        "annotation_mode": "row",  # or "cell"
        "max_rows": 1000,
    })
"""

from typing import Dict, List, Any, Optional
from pathlib import Path
import html
import logging
import csv

from .base import BaseFormatHandler, FormatOutput
from .coordinate_mapping import (
    CoordinateMapper,
    SpreadsheetCoordinate,
    get_cell_reference,
)

logger = logging.getLogger(__name__)

# Optional dependencies. Each import is attempted once at module load and the
# outcome is recorded in a module-level flag, so a missing package does not
# prevent this module from being imported.
try:
    import openpyxl
    OPENPYXL_AVAILABLE = True
except ImportError:
    # Keep the name bound (to None) so references to `openpyxl` still resolve;
    # code must check OPENPYXL_AVAILABLE before using it.
    OPENPYXL_AVAILABLE = False
    openpyxl = None

try:
    import pandas as pd
    PANDAS_AVAILABLE = True
except ImportError:
    # pandas is only the preferred CSV/TSV reader; the handler falls back to
    # the stdlib csv module when PANDAS_AVAILABLE is False.
    PANDAS_AVAILABLE = False
    pd = None


class SpreadsheetHandler(BaseFormatHandler):
    """
    Handler for spreadsheet files.

    Excel workbooks are read with openpyxl; CSV/TSV files are read with
    pandas when it is installed and with the stdlib ``csv`` module otherwise.
    Provides row-based and cell-based annotation modes.

    NOTE(review): ``.xls`` is listed as a supported extension and is routed
    to openpyxl, but openpyxl documents support for the OOXML formats only;
    legacy binary ``.xls`` files are expected to fail at load time. Confirm
    and either convert such files to ``.xlsx`` or drop the extension.
    """

    format_name = "spreadsheet"
    supported_extensions = [".csv", ".tsv", ".xlsx", ".xls"]
    description = "Spreadsheet extraction with row/cell coordinate mapping"
    requires_dependencies = ["openpyxl"]

    def get_default_options(self) -> Dict[str, Any]:
        """Get default extraction options."""
        return {
            "annotation_mode": "row",  # "row", "cell", or "range"
            "max_rows": 1000,
            "header_row": 0,  # Row index for headers (None for no headers)
            "sheet_name": None,  # Sheet to extract (None for first/active)
            "skip_empty_rows": True,
            "text_columns": None,  # Columns to include (None for all)
            "row_separator": "\n",
            "cell_separator": "\t",
        }

    @staticmethod
    def _is_blank_row(values) -> bool:
        """Return True when every cell is None or whitespace-only (or there are no cells)."""
        return all(v is None or str(v).strip() == "" for v in values)

    def extract(
        self,
        file_path: str,
        options: Optional[Dict[str, Any]] = None
    ) -> FormatOutput:
        """
        Extract data from a spreadsheet file.

        The plain-text output is every row's cells joined by
        ``cell_separator``, with rows joined by ``row_separator``. All
        ``data-start``/``data-end`` attributes and coordinate mappings are
        character offsets into that text.

        Args:
            file_path: Path to the spreadsheet file
            options: Extraction options (see ``get_default_options``):
                - annotation_mode: "row" (annotate rows) or "cell" (annotate cells)
                - max_rows: Maximum data rows to process
                - header_row: Row index for column headers (None for no headers)
                - sheet_name: Sheet to extract (for Excel files)
                - skip_empty_rows: Drop rows whose cells are all empty
                - text_columns: Column indices to keep (None for all)
                - row_separator / cell_separator: Joiners for the text output

        Returns:
            FormatOutput with extracted text, HTML table, and coordinate mappings

        Raises:
            ImportError: If an Excel file is given and openpyxl is not installed.
            ValueError: If the requested Excel sheet does not exist.
        """
        opts = self.merge_options(options)
        ext = Path(file_path).suffix.lower()

        # Load data based on file type; anything that is not Excel is
        # treated as delimited text.
        if ext in (".xlsx", ".xls"):
            if not OPENPYXL_AVAILABLE:
                raise ImportError(
                    "openpyxl is required for Excel extraction. "
                    "Install with: pip install openpyxl"
                )
            data, headers, sheet_name = self._load_excel(file_path, opts)
        else:
            data, headers = self._load_csv(file_path, opts)
            sheet_name = None

        annotation_mode = opts["annotation_mode"]
        row_separator = opts["row_separator"]
        cell_separator = opts["cell_separator"]

        metadata = {
            "format": "spreadsheet",
            "source_file": str(file_path),
            "file_type": ext[1:],  # Remove dot
            "row_count": len(data),
            "column_count": len(headers) if headers else (len(data[0]) if data else 0),
            "headers": headers,
            "sheet_name": sheet_name,
            "annotation_mode": annotation_mode,
        }

        mapper = CoordinateMapper()
        text_parts = []
        current_offset = 0

        # Build HTML table. The mode comes from caller-supplied options, so
        # it is escaped before being placed inside an attribute value.
        html_parts = [
            '<div class="spreadsheet-container" '
            f'data-mode="{html.escape(str(annotation_mode), quote=True)}">',
            '<table class="spreadsheet-table">',
        ]

        # Header row (headers are display-only and not part of the text).
        if headers:
            html_parts.append('<thead><tr>')
            for col_idx, header in enumerate(headers):
                html_parts.append(
                    f'<th data-col="{col_idx}">{html.escape(str(header))}</th>'
                )
            html_parts.append('</tr></thead>')

        # Data rows
        html_parts.append('<tbody>')

        for row_idx, row in enumerate(data):
            row_start = current_offset
            row_texts = []
            last_col = len(row) - 1

            html_parts.append(
                f'<tr class="spreadsheet-row" data-row="{row_idx}" '
                f'data-start="{row_start}">'
            )

            for col_idx, cell_value in enumerate(row):
                cell_text = "" if cell_value is None else str(cell_value)
                cell_start = current_offset
                cell_end = cell_start + len(cell_text)
                row_texts.append(cell_text)

                cell_ref = get_cell_reference(row_idx, col_idx)

                # Per-cell mappings are only registered in "cell" mode.
                if annotation_mode == "cell":
                    mapper.add_mapping(
                        cell_start,
                        cell_end,
                        SpreadsheetCoordinate(
                            row=row_idx,
                            col=col_idx,
                            cell_ref=cell_ref,
                            sheet=sheet_name,
                        )
                    )

                # Build cell HTML
                data_attrs = (
                    f'data-row="{row_idx}" '
                    f'data-col="{col_idx}" '
                    f'data-cell-ref="{cell_ref}" '
                    f'data-start="{cell_start}" '
                    f'data-end="{cell_end}"'
                )
                html_parts.append(
                    f'<td class="spreadsheet-cell" {data_attrs}>'
                    f'{html.escape(cell_text)}</td>'
                )

                # Advance past the cell, plus the separator that the join
                # below inserts between (not after) cells.
                current_offset = cell_end
                if col_idx < last_col:
                    current_offset += len(cell_separator)

            html_parts.append('</tr>')
            text_parts.append(cell_separator.join(row_texts))

            # Per-row mappings are only registered in "row" mode; the span
            # excludes the trailing row separator.
            if annotation_mode == "row":
                mapper.add_mapping(
                    row_start,
                    current_offset,
                    SpreadsheetCoordinate(
                        row=row_idx,
                        sheet=sheet_name,
                    )
                )

            # Account for the row separator inserted by the final join.
            current_offset += len(row_separator)

        html_parts.append('</tbody>')
        html_parts.append('</table>')
        html_parts.append('</div>')

        full_text = row_separator.join(text_parts)
        full_html = "\n".join(html_parts)

        coord_dict = mapper.to_dict()
        # The bound lookup method is stored next to the serialized mappings,
        # so this dict holds a callable in addition to to_dict()'s content.
        coord_dict["get_coords_for_range"] = mapper.get_coords_for_range

        return FormatOutput(
            text=full_text,
            rendered_html=full_html,
            coordinate_map=coord_dict,
            metadata=metadata,
            format_name=self.format_name,
            source_path=str(file_path),
        )

    def _load_excel(
        self,
        file_path: str,
        opts: Dict[str, Any]
    ) -> tuple:
        """
        Load data from an Excel file using openpyxl.

        Args:
            file_path: Path to the workbook.
            opts: Extraction options; reads ``sheet_name``, ``header_row``,
                ``max_rows`` (None for no limit), ``skip_empty_rows`` and
                ``text_columns``.

        Returns:
            Tuple of (data_rows, headers, sheet_name)

        Raises:
            ValueError: If ``sheet_name`` is given but not in the workbook.
        """
        wb = openpyxl.load_workbook(file_path, read_only=True, data_only=True)
        try:
            # Select sheet
            sheet_name = opts.get("sheet_name")
            if sheet_name:
                if sheet_name not in wb.sheetnames:
                    raise ValueError(
                        f"Sheet '{sheet_name}' not found. "
                        f"Available: {', '.join(wb.sheetnames)}"
                    )
                ws = wb[sheet_name]
            else:
                ws = wb.active
                sheet_name = ws.title

            data = []
            headers = None
            header_row = opts.get("header_row")
            max_rows = opts.get("max_rows", 1000)
            skip_empty = opts.get("skip_empty_rows", True)
            text_columns = opts.get("text_columns")

            # Iterate without a sheet-row cap: max_rows limits *data* rows,
            # so skipped empty rows and the header row must not count
            # against it. The loop stops as soon as enough rows are kept.
            for row_idx, row in enumerate(ws.iter_rows()):
                row_values = [cell.value for cell in row]

                # Filter columns if specified
                if text_columns:
                    row_values = [
                        row_values[i] for i in text_columns if i < len(row_values)
                    ]

                # Check for empty rows
                if skip_empty and self._is_blank_row(row_values):
                    continue

                # Handle header row; only truly missing cells get a
                # placeholder, so a header value such as 0 is preserved.
                if header_row is not None and row_idx == header_row:
                    headers = [
                        str(v) if v is not None and v != "" else f"Column_{i}"
                        for i, v in enumerate(row_values)
                    ]
                    continue

                data.append(row_values)

                if max_rows is not None and len(data) >= max_rows:
                    break
        finally:
            # Release the file handle even when sheet lookup or reading fails.
            wb.close()

        return data, headers, sheet_name

    def _load_csv(
        self,
        file_path: str,
        opts: Dict[str, Any]
    ) -> tuple:
        """
        Load data from a CSV/TSV file.

        pandas is tried first when available; any failure there is logged at
        debug level and the stdlib ``csv`` reader is used instead. Both paths
        return cell values as strings, with empty cells as ``""``, so the
        result does not depend on whether pandas is installed.

        Args:
            file_path: Path to the delimited text file. A ``.tsv`` suffix
                selects tab as the delimiter; anything else uses a comma.
            opts: Extraction options; reads ``header_row``, ``max_rows``
                (None for no limit), ``skip_empty_rows`` and ``text_columns``.

        Returns:
            Tuple of (data_rows, headers)
        """
        ext = Path(file_path).suffix.lower()

        # Determine delimiter
        delimiter = "\t" if ext == ".tsv" else ","

        header_row = opts.get("header_row")
        max_rows = opts.get("max_rows", 1000)
        skip_empty = opts.get("skip_empty_rows", True)
        text_columns = opts.get("text_columns")

        # Try pandas first if available
        if PANDAS_AVAILABLE:
            try:
                df = pd.read_csv(
                    file_path,
                    delimiter=delimiter,
                    header=header_row,
                    nrows=max_rows,
                    skip_blank_lines=skip_empty,
                    # An empty selection means "all columns", matching the
                    # stdlib path below.
                    usecols=text_columns or None,
                    # Keep the cell text exactly as written: no numeric
                    # inference and no conversion of empty cells to NaN.
                    dtype=str,
                    na_filter=False,
                )
            except Exception as e:
                # Deliberate best-effort: any pandas failure falls through
                # to the stdlib reader.
                logger.debug("Pandas read failed, falling back to csv: %s", e)
            else:
                # Without a header row pandas invents integer column labels;
                # report "no headers" instead of rendering them.
                headers = (
                    [str(c) for c in df.columns] if header_row is not None else None
                )
                # Scrub any remaining missing markers (e.g. padding of short
                # rows) so they do not surface as the text "nan".
                data = [
                    ["" if pd.isna(v) else v for v in record]
                    for record in df.values.tolist()
                ]
                if skip_empty:
                    data = [r for r in data if not self._is_blank_row(r)]
                return data, headers

        # Fall back to stdlib csv
        data = []
        headers = None

        # utf-8-sig reads plain UTF-8 and also strips a leading BOM, which
        # would otherwise end up in the first header or cell.
        with open(file_path, "r", encoding="utf-8-sig", newline="") as f:
            reader = csv.reader(f, delimiter=delimiter)

            for row_idx, row in enumerate(reader):
                # Filter columns if specified
                if text_columns:
                    row = [row[i] for i in text_columns if i < len(row)]

                # Check for empty rows
                if skip_empty and self._is_blank_row(row):
                    continue

                # Handle header row
                if header_row is not None and row_idx == header_row:
                    headers = [v if v else f"Column_{i}" for i, v in enumerate(row)]
                    continue

                data.append(row)

                if max_rows is not None and len(data) >= max_rows:
                    break

        return data, headers

    def get_sheet_names(self, file_path: str) -> List[str]:
        """
        Get list of sheet names in an Excel file.

        Args:
            file_path: Path to Excel file

        Returns:
            List of sheet names

        Raises:
            ImportError: If openpyxl is not installed.
        """
        if not OPENPYXL_AVAILABLE:
            raise ImportError("openpyxl is required")

        wb = openpyxl.load_workbook(file_path, read_only=True)
        try:
            return wb.sheetnames
        finally:
            # Close the workbook even if reading the sheet list fails.
            wb.close()

    def extract_sheet(
        self,
        file_path: str,
        sheet_name: str,
        options: Optional[Dict[str, Any]] = None
    ) -> FormatOutput:
        """
        Extract a specific sheet from an Excel file.

        Convenience wrapper around ``extract`` that forces the
        ``sheet_name`` option.

        Args:
            file_path: Path to Excel file
            sheet_name: Name of sheet to extract
            options: Extraction options

        Returns:
            FormatOutput for the specified sheet
        """
        opts = self.merge_options(options) if options else self.get_default_options()
        opts["sheet_name"] = sheet_name
        return self.extract(file_path, opts)