codebook / potato /data_sources /sources /gsheets_source.py
davidjurgens's picture
Deploy: Potato — Codebook Annotation
aceb1b2 verified
Raw
History Blame Contribute Delete
7.69 kB
"""
Google Sheets data source.
This module provides data loading from Google Sheets spreadsheets,
supporting service account authentication.
"""
import logging
from typing import Any, Dict, Iterator, List, Optional
from potato.data_sources.base import DataSource, SourceConfig
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
class GoogleSheetsSource(DataSource):
    """
    Data source for Google Sheets.

    Loads data from Google Sheets using the Sheets API with
    service account authentication. The requested range is fetched in a
    single call, converted to a list of dictionaries keyed by the header
    row, and cached in memory until ``refresh()`` or ``close()`` is called.

    Configuration:
        type: google_sheets
        spreadsheet_id: "1BxiMVs0XRA5nFMdKvBdBZjgmUUqptlbs74OgvE2upms"
        sheet_name: "Sheet1"  # Optional: sheet name (default: first sheet)
        range: "A:Z"  # Optional: range to read
        credentials_file: "credentials/service_account.json"

        # Header options
        header_row: 1  # Row containing headers (1-indexed)
        skip_rows: 0  # Rows to skip after header

    Note: Requires google-api-python-client:
        pip install google-api-python-client google-auth
    """

    # Tri-state cache for the optional-dependency probe: None means "not
    # checked yet", True/False is the remembered result of the import test.
    _HAS_GOOGLE_API: Optional[bool] = None

    @classmethod
    def _check_dependencies(cls) -> bool:
        """Check if Google API dependencies are available.

        The import probe runs once per class; the result is stored in
        ``_HAS_GOOGLE_API`` and reused by later calls.

        Returns:
            True if both ``google.oauth2.service_account`` and
            ``googleapiclient.discovery.build`` can be imported.
        """
        if cls._HAS_GOOGLE_API is None:
            try:
                # Imported only to test availability; the names are unused.
                from google.oauth2 import service_account  # noqa: F401
                from googleapiclient.discovery import build  # noqa: F401
                cls._HAS_GOOGLE_API = True
            except ImportError:
                cls._HAS_GOOGLE_API = False
        return cls._HAS_GOOGLE_API

    def __init__(self, config: SourceConfig):
        """Initialize the Google Sheets source.

        Args:
            config: Source configuration; the Sheets-specific options are
                read from ``config.config`` (see the class docstring).
        """
        super().__init__(config)
        options = config.config
        self._spreadsheet_id = options.get("spreadsheet_id", "")
        self._sheet_name = options.get("sheet_name")
        self._range = options.get("range", "A:Z")
        self._credentials_file = options.get("credentials_file")
        self._header_row = options.get("header_row", 1)
        self._skip_rows = options.get("skip_rows", 0)
        # Both are created lazily: the API client on first fetch, the
        # parsed rows on first read/count.
        self._service = None
        self._cached_data: Optional[List[Dict]] = None

    def get_source_id(self) -> str:
        """Get unique identifier.

        Returns ``self._source_id``, which is not assigned in this class
        (presumably set by the base class constructor -- verify there).
        """
        return self._source_id

    @staticmethod
    def _is_plain_int(value: Any) -> bool:
        """Return True for ints, excluding bools (which subclass int)."""
        return isinstance(value, int) and not isinstance(value, bool)

    def validate_config(self) -> List[str]:
        """Validate source configuration.

        Returns:
            A list of human-readable error messages; empty when the
            configuration is usable.
        """
        errors = []
        if not self._spreadsheet_id:
            errors.append("'spreadsheet_id' is required for Google Sheets source")
        if not self._credentials_file:
            errors.append("'credentials_file' is required for Google Sheets source")
        # header_row is 1-indexed: 0 or a negative value would otherwise be
        # turned into a negative list index and silently pick a row counted
        # from the END of the data.
        if not self._is_plain_int(self._header_row) or self._header_row < 1:
            errors.append("'header_row' must be an integer >= 1")
        # A negative skip would move the data start back onto or before
        # the header row.
        if not self._is_plain_int(self._skip_rows) or self._skip_rows < 0:
            errors.append("'skip_rows' must be an integer >= 0")
        return errors

    def is_available(self) -> bool:
        """Check if the source is available.

        Returns:
            False (after logging a warning) when the Google API packages
            are not importable, or when a credentials file is configured
            but does not exist on disk; True otherwise. A missing
            ``credentials_file`` setting is not checked here; it is
            reported by ``validate_config()``.
        """
        if not self._check_dependencies():
            logger.warning(
                "Google API dependencies not installed. "
                "Install with: pip install google-api-python-client google-auth"
            )
            return False

        import os

        if self._credentials_file and not os.path.exists(self._credentials_file):
            logger.warning(f"Credentials file not found: {self._credentials_file}")
            return False
        return True

    def _get_service(self):
        """Get or create the Sheets API service.

        The client is built on first use with read-only spreadsheet scope
        and reused for later calls until ``close()`` discards it.
        """
        if self._service is not None:
            return self._service

        # Imported lazily so the module can be loaded without the optional
        # Google packages installed.
        from google.oauth2 import service_account
        from googleapiclient.discovery import build

        credentials = service_account.Credentials.from_service_account_file(
            self._credentials_file,
            scopes=['https://www.googleapis.com/auth/spreadsheets.readonly']
        )
        self._service = build('sheets', 'v4', credentials=credentials)
        return self._service

    def _parse_rows(self, values: List[List[Any]]):
        """Convert raw row lists into header-keyed dictionaries.

        Args:
            values: Non-empty list of rows as returned by the Sheets API
                for the requested range.

        Returns:
            A ``(headers, items)`` tuple: the cleaned header names and one
            dict per non-empty data row.

        Raises:
            ValueError: If ``header_row`` is below 1 or beyond the fetched
                rows, or if ``skip_rows`` is negative.
        """
        if self._header_row < 1:
            raise ValueError(
                f"Header row {self._header_row} must be >= 1"
            )
        if self._skip_rows < 0:
            raise ValueError(
                f"skip_rows {self._skip_rows} must be >= 0"
            )

        header_index = self._header_row - 1  # Convert to 0-indexed
        if header_index >= len(values):
            raise ValueError(
                f"Header row {self._header_row} is beyond data range"
            )

        # Strip before testing for emptiness so that whitespace-only cells
        # get a placeholder name, and a numeric header such as 0 keeps its
        # text instead of being treated as missing.
        headers = []
        for position, raw_header in enumerate(values[header_index]):
            name = "" if raw_header is None else str(raw_header).strip()
            headers.append(name or f"column_{position}")

        duplicates = sorted({h for h in headers if headers.count(h) > 1})
        if duplicates:
            # Later columns overwrite earlier ones in the item dict.
            logger.warning(
                f"Duplicate column headers in spreadsheet: {duplicates}; "
                "only the last column with each name is kept"
            )

        data_start = header_index + 1 + self._skip_rows
        items = []
        # row_number is 1-indexed relative to the fetched range.
        for row_number, row in enumerate(values[data_start:], start=data_start + 1):
            # Skip empty rows
            if not row or all(cell == '' or cell is None for cell in row):
                continue

            # Rows can be shorter than the header row; pad a copy (leaving
            # the API response untouched). zip() drops any cells beyond
            # the last header.
            padded = list(row) + [''] * (len(headers) - len(row))
            item = dict(zip(headers, padded))

            # Add row number as fallback ID if no 'id' column
            if 'id' not in item:
                item['_row_number'] = row_number
            items.append(item)

        return headers, items

    def _fetch_data(self) -> List[Dict[str, Any]]:
        """Fetch and parse data from Google Sheets.

        Returns:
            One dict per non-empty data row, keyed by header name; an
            empty list when the range contains no values.

        Raises:
            RuntimeError: If the API call or the parsing of its result
                fails; the original exception is attached as the cause.
        """
        service = self._get_service()

        # Construct the range
        if self._sheet_name:
            range_notation = f"'{self._sheet_name}'!{self._range}"
        else:
            range_notation = self._range

        try:
            result = service.spreadsheets().values().get(
                spreadsheetId=self._spreadsheet_id,
                range=range_notation,
                valueRenderOption='UNFORMATTED_VALUE',
                dateTimeRenderOption='FORMATTED_STRING'
            ).execute()

            values = result.get('values', [])
            if not values:
                logger.warning("No data found in spreadsheet")
                return []

            headers, items = self._parse_rows(values)
            logger.info(
                f"Loaded {len(items)} rows from spreadsheet "
                f"(sheet={self._sheet_name or 'first'}, "
                f"columns={len(headers)})"
            )
            return items
        except Exception as e:
            # Keep the single RuntimeError type callers already see, but
            # chain the cause so the underlying traceback is not lost.
            raise RuntimeError(f"Failed to fetch spreadsheet data: {e}") from e

    def read_items(
        self,
        start: int = 0,
        count: Optional[int] = None
    ) -> Iterator[Dict[str, Any]]:
        """Read items from Google Sheets.

        The spreadsheet is fetched on the first iteration and served from
        the in-memory cache afterwards.

        Args:
            start: Index of the first item to yield.
            count: Maximum number of items to yield; None yields all
                remaining items.

        Yields:
            Row dictionaries keyed by header name.
        """
        if self._cached_data is None:
            self._cached_data = self._fetch_data()

        items = self._cached_data[start:]
        if count is not None:
            items = items[:count]
        yield from items

    def get_total_count(self) -> Optional[int]:
        """Get total number of rows.

        Returns:
            The number of cached items, fetching them first if needed, or
            None if the fetch fails (the error is logged).
        """
        if self._cached_data is None:
            try:
                self._cached_data = self._fetch_data()
            except Exception as e:
                logger.error(f"Error fetching data: {e}")
                return None
        return len(self._cached_data)

    def supports_partial_reading(self) -> bool:
        """Partial reading is supported after initial fetch."""
        return True

    def refresh(self) -> bool:
        """Refresh by clearing cached data.

        The next read or count triggers a new fetch. Always returns True.
        """
        self._cached_data = None
        return True

    def get_status(self) -> Dict[str, Any]:
        """Get source status.

        Extends the base class status with the spreadsheet id, sheet name,
        range, and whether data is currently cached.
        """
        status = super().get_status()
        status["spreadsheet_id"] = self._spreadsheet_id
        status["sheet_name"] = self._sheet_name
        status["range"] = self._range
        status["cached"] = self._cached_data is not None
        return status

    def close(self) -> None:
        """Close the source.

        Drops the cached rows and the API client. If the client exposes a
        ``close()`` method it is called so its HTTP connections are
        released; a failure there is logged rather than raised.
        """
        service, self._service = self._service, None
        self._cached_data = None

        close_service = getattr(service, "close", None)
        if callable(close_service):
            try:
                close_service()
            except Exception as e:
                logger.warning(f"Error closing Sheets API service: {e}")