Spaces:
Paused
Paused
| """ | |
| Google Sheets data source. | |
| This module provides data loading from Google Sheets spreadsheets, | |
| supporting service account authentication. | |
| """ | |
| import logging | |
| from typing import Any, Dict, Iterator, List, Optional | |
| from potato.data_sources.base import DataSource, SourceConfig | |
| logger = logging.getLogger(__name__) | |
| class GoogleSheetsSource(DataSource): | |
| """ | |
| Data source for Google Sheets. | |
| Loads data from Google Sheets using the Sheets API with | |
| service account authentication. | |
| Configuration: | |
| type: google_sheets | |
| spreadsheet_id: "1BxiMVs0XRA5nFMdKvBdBZjgmUUqptlbs74OgvE2upms" | |
| sheet_name: "Sheet1" # Optional: sheet name (default: first sheet) | |
| range: "A:Z" # Optional: range to read | |
| credentials_file: "credentials/service_account.json" | |
| # Header options | |
| header_row: 1 # Row containing headers (1-indexed) | |
| skip_rows: 0 # Rows to skip after header | |
| Note: Requires google-api-python-client: | |
| pip install google-api-python-client google-auth | |
| """ | |
| # Check for optional dependencies | |
| _HAS_GOOGLE_API = None | |
| def _check_dependencies(cls) -> bool: | |
| """Check if Google API dependencies are available.""" | |
| if cls._HAS_GOOGLE_API is None: | |
| try: | |
| from google.oauth2 import service_account | |
| from googleapiclient.discovery import build | |
| cls._HAS_GOOGLE_API = True | |
| except ImportError: | |
| cls._HAS_GOOGLE_API = False | |
| return cls._HAS_GOOGLE_API | |
| def __init__(self, config: SourceConfig): | |
| """Initialize the Google Sheets source.""" | |
| super().__init__(config) | |
| self._spreadsheet_id = config.config.get("spreadsheet_id", "") | |
| self._sheet_name = config.config.get("sheet_name") | |
| self._range = config.config.get("range", "A:Z") | |
| self._credentials_file = config.config.get("credentials_file") | |
| self._header_row = config.config.get("header_row", 1) | |
| self._skip_rows = config.config.get("skip_rows", 0) | |
| self._service = None | |
| self._cached_data: Optional[List[Dict]] = None | |
| def get_source_id(self) -> str: | |
| """Get unique identifier.""" | |
| return self._source_id | |
| def validate_config(self) -> List[str]: | |
| """Validate source configuration.""" | |
| errors = [] | |
| if not self._spreadsheet_id: | |
| errors.append("'spreadsheet_id' is required for Google Sheets source") | |
| if not self._credentials_file: | |
| errors.append("'credentials_file' is required for Google Sheets source") | |
| return errors | |
| def is_available(self) -> bool: | |
| """Check if the source is available.""" | |
| if not self._check_dependencies(): | |
| logger.warning( | |
| "Google API dependencies not installed. " | |
| "Install with: pip install google-api-python-client google-auth" | |
| ) | |
| return False | |
| import os | |
| if self._credentials_file and not os.path.exists(self._credentials_file): | |
| logger.warning(f"Credentials file not found: {self._credentials_file}") | |
| return False | |
| return True | |
| def _get_service(self): | |
| """Get or create the Sheets API service.""" | |
| if self._service: | |
| return self._service | |
| from google.oauth2 import service_account | |
| from googleapiclient.discovery import build | |
| credentials = service_account.Credentials.from_service_account_file( | |
| self._credentials_file, | |
| scopes=['https://www.googleapis.com/auth/spreadsheets.readonly'] | |
| ) | |
| self._service = build('sheets', 'v4', credentials=credentials) | |
| return self._service | |
| def _fetch_data(self) -> List[Dict[str, Any]]: | |
| """Fetch and parse data from Google Sheets.""" | |
| service = self._get_service() | |
| # Construct the range | |
| if self._sheet_name: | |
| range_notation = f"'{self._sheet_name}'!{self._range}" | |
| else: | |
| range_notation = self._range | |
| try: | |
| result = service.spreadsheets().values().get( | |
| spreadsheetId=self._spreadsheet_id, | |
| range=range_notation, | |
| valueRenderOption='UNFORMATTED_VALUE', | |
| dateTimeRenderOption='FORMATTED_STRING' | |
| ).execute() | |
| values = result.get('values', []) | |
| if not values: | |
| logger.warning("No data found in spreadsheet") | |
| return [] | |
| # Extract headers | |
| header_index = self._header_row - 1 # Convert to 0-indexed | |
| if header_index >= len(values): | |
| raise ValueError( | |
| f"Header row {self._header_row} is beyond data range" | |
| ) | |
| headers = values[header_index] | |
| # Clean up headers | |
| headers = [str(h).strip() if h else f"column_{i}" | |
| for i, h in enumerate(headers)] | |
| # Extract data rows | |
| data_start = header_index + 1 + self._skip_rows | |
| data_rows = values[data_start:] | |
| # Convert to list of dictionaries | |
| items = [] | |
| for row_index, row in enumerate(data_rows): | |
| # Skip empty rows | |
| if not row or all(cell == '' or cell is None for cell in row): | |
| continue | |
| # Pad row if shorter than headers | |
| while len(row) < len(headers): | |
| row.append('') | |
| item = {headers[i]: row[i] for i in range(len(headers))} | |
| # Add row number as fallback ID if no 'id' column | |
| if 'id' not in item: | |
| item['_row_number'] = data_start + row_index + 1 | |
| items.append(item) | |
| logger.info( | |
| f"Loaded {len(items)} rows from spreadsheet " | |
| f"(sheet={self._sheet_name or 'first'}, " | |
| f"columns={len(headers)})" | |
| ) | |
| return items | |
| except Exception as e: | |
| raise RuntimeError(f"Failed to fetch spreadsheet data: {e}") | |
| def read_items( | |
| self, | |
| start: int = 0, | |
| count: Optional[int] = None | |
| ) -> Iterator[Dict[str, Any]]: | |
| """Read items from Google Sheets.""" | |
| if self._cached_data is None: | |
| self._cached_data = self._fetch_data() | |
| items = self._cached_data[start:] | |
| if count is not None: | |
| items = items[:count] | |
| yield from items | |
| def get_total_count(self) -> Optional[int]: | |
| """Get total number of rows.""" | |
| if self._cached_data is None: | |
| try: | |
| self._cached_data = self._fetch_data() | |
| except Exception as e: | |
| logger.error(f"Error fetching data: {e}") | |
| return None | |
| return len(self._cached_data) | |
| def supports_partial_reading(self) -> bool: | |
| """Partial reading is supported after initial fetch.""" | |
| return True | |
| def refresh(self) -> bool: | |
| """Refresh by clearing cached data.""" | |
| self._cached_data = None | |
| return True | |
| def get_status(self) -> Dict[str, Any]: | |
| """Get source status.""" | |
| status = super().get_status() | |
| status["spreadsheet_id"] = self._spreadsheet_id | |
| status["sheet_name"] = self._sheet_name | |
| status["range"] = self._range | |
| status["cached"] = self._cached_data is not None | |
| return status | |
| def close(self) -> None: | |
| """Close the source.""" | |
| self._service = None | |
| self._cached_data = None | |