Tools / src /google_src /google_sheet.py
jebin2's picture
refactor: Centralize logger import to src.logger_config across various modules.
f20025d
import os
import csv
import tempfile
from pathlib import Path
from typing import Optional, TYPE_CHECKING
import pandas as pd
import gspread
from google.auth import default
from google.auth.credentials import Credentials as BaseCredentials
from src.logger_config import logger
from src.utils import get_temp_dir
from src.config import get_config_value
if TYPE_CHECKING:
from google_src import GCloudWrapper
class GoogleSheetReader:
"""
Utility class to read & write Google Sheets using ADC (Local) / WIF (CI).
"""
SCOPES = [
"https://www.googleapis.com/auth/spreadsheets",
"https://www.googleapis.com/auth/drive.file",
]
def __init__(
self,
worksheet_name: str | None = None,
gcloud_wrapper: Optional["GCloudWrapper"] = None,
account_id: str = "default",
sheet_id: Optional[str] = None,
):
"""
Initialize GoogleSheetReader.
"""
logger.debug("Initializing GoogleSheetReader")
if sheet_id is not None:
self.sheet_id = sheet_id
else:
self.sheet_id = get_config_value("GSHEET_ID")
if not self.sheet_id:
logger.error("Must provide GSHEET_ID")
raise RuntimeError("Must provide GSHEET_ID")
self.worksheet_name = worksheet_name
self._gcloud_wrapper = gcloud_wrapper
self._account_id = account_id
if self.worksheet_name:
logger.debug(
"Sheet config | id=%s | worksheet=%s | wrapper=%s",
self.sheet_id,
self.worksheet_name,
"yes" if gcloud_wrapper else "no (using ADC)",
)
self.client = self._authorize()
self.sheet = self._open_sheet()
# -------- Temp directory handling --------
self.temp_dir = self._init_temp_dir()
logger.debug("Using temp directory: %s", self.temp_dir)
@staticmethod
def get_log_reader(worksheet_name: str, initial_header: Optional[list[str]] = None) -> "GoogleSheetReader":
"""
Factory method to get a reader for the Logs spreadsheet.
Derives log sheet name from the current configured sheet's name.
Creates the log sheet if it doesn't exist.
If initial_header is provided, ensures it exists on the sheet.
"""
log_sheet_id = get_config_value("LOG_GSHEET_ID")
reader = None
if log_sheet_id:
try:
reader = GoogleSheetReader(
worksheet_name=worksheet_name,
sheet_id=log_sheet_id
)
except Exception as e:
logger.error("Failed to open configured LOG_GSHEET_ID %s: %s", log_sheet_id, e)
if not reader:
try:
default_reader = GoogleSheetReader(worksheet_name=None)
current_sheet_id = default_reader.sheet.spreadsheet.id
reader = GoogleSheetReader(
worksheet_name=worksheet_name,
sheet_id=current_sheet_id
)
except Exception as e:
raise RuntimeError(f"Could not open default sheet to get ID: {e}")
if reader and initial_header:
reader._ensure_header(reader.sheet, initial_header)
return reader
# ------------------ Internal helpers ------------------
def _get_env(self, key: str) -> str:
value = get_config_value(key)
if not value:
logger.error("Missing required env variable: %s", key)
raise RuntimeError(f"Missing required env variable: {key}")
return value
def _authorize(self):
# Use wrapper if provided, otherwise fall back to ADC
if self._gcloud_wrapper:
logger.debug("Authorizing via GCloudWrapper (account: %s)", self._account_id)
return self._gcloud_wrapper.get_sheets_client(self._account_id)
logger.debug("Authorizing with Application Default Credentials (ADC)")
creds, _ = default()
# IMPORTANT: re-scope user ADC credentials
if isinstance(creds, BaseCredentials) and hasattr(creds, "with_scopes"):
creds = creds.with_scopes(self.SCOPES)
return gspread.authorize(creds)
def _open_sheet(self):
"""Open spreadsheet by ID"""
logger.debug("Opening spreadsheet by ID: %s", self.sheet_id)
spreadsheet = self.client.open_by_key(self.sheet_id)
if self.worksheet_name:
logger.debug("Opening worksheet: %s", self.worksheet_name)
try:
return spreadsheet.worksheet(self.worksheet_name)
except gspread.WorksheetNotFound:
logger.warning("Worksheet not found. Creating: %s", self.worksheet_name)
return spreadsheet.add_worksheet(
title=self.worksheet_name,
rows=1000,
cols=26,
)
logger.debug("Opening default worksheet (sheet1)")
return spreadsheet.sheet1
def _init_temp_dir(self) -> Path:
"""
Creates a temp directory.
Uses fixed path during test automation if configured.
"""
return get_temp_dir(prefix="gsheet_")
# ------------------ Sheet creation helpers ------------------
def _get_or_create_spreadsheet(self, sheet_name: str):
try:
logger.debug("Opening spreadsheet: %s", sheet_name)
return self.client.open(sheet_name)
except gspread.SpreadsheetNotFound:
logger.warning("Spreadsheet not found. Creating new one: %s", sheet_name)
return self.client.create(sheet_name)
def _get_or_create_worksheet(
self,
spreadsheet,
worksheet_name: str,
rows: int = 1000,
cols: int = 26,
):
try:
logger.debug("Opening worksheet: %s", worksheet_name)
return spreadsheet.worksheet(worksheet_name)
except gspread.WorksheetNotFound:
logger.warning("Worksheet not found. Creating: %s", worksheet_name)
return spreadsheet.add_worksheet(
title=worksheet_name,
rows=rows,
cols=cols,
)
def _ensure_header(self, worksheet, header: list[str]) -> list[str]:
"""
Ensures header row exists and contains all required columns.
Returns final header order.
"""
existing = worksheet.row_values(1)
# Case 1: Empty sheet
if not existing:
logger.debug("Sheet empty. Writing header.")
worksheet.insert_row(header, index=1)
return header
# Case 2: Header exists, check for missing columns
updated = False
final_header = existing.copy()
for col in header:
if col not in final_header:
logger.debug("Adding missing header column: %s", col)
final_header.append(col)
updated = True
if updated:
worksheet.update(values=[final_header], range_name="1:1")
return final_header
# ------------------ Core APIs ------------------
def get_all_values(self) -> list:
logger.debug("Fetching all values from sheet")
return self.sheet.get_all_values()
def get_dataframe(self):
logger.debug("Converting sheet to DataFrame")
data = self.get_all_values()
return pd.DataFrame(data[1:], columns=data[0])
# ------------------ Filtering APIs ------------------
def filter_rows(
self,
column_name: str = "STATUS",
match_value: str = "SELECTED",
case_insensitive: bool = True,
) -> list:
logger.debug(
"Filtering rows | column=%s | value=%s | case_insensitive=%s",
column_name,
match_value,
case_insensitive,
)
rows = self.get_all_values()
if not rows:
logger.warning("Sheet is empty")
return []
header = rows[0]
try:
col_idx = header.index(column_name)
except ValueError:
logger.error("Column '%s' not found in sheet", column_name)
raise RuntimeError(f"Column '{column_name}' not found in sheet")
def _match(val: str) -> bool:
if case_insensitive:
return val.strip().lower() == match_value.lower()
return val == match_value
filtered = [
row for row in rows[1:]
if len(row) > col_idx and _match(row[col_idx])
]
logger.debug(
"Filtered rows: %d / %d",
len(filtered),
max(len(rows) - 1, 0),
)
return [header] + filtered
def get_filtered_dataframe(
self,
column_name: str = "STATUS",
match_value: str = "SELECTED",
case_insensitive: bool = True,
):
logger.debug("Creating filtered DataFrame")
rows = self.filter_rows(
column_name=column_name,
match_value=match_value,
case_insensitive=case_insensitive,
)
if len(rows) <= 1:
logger.warning("No matching rows found")
return pd.DataFrame(columns=rows[0] if rows else [])
return pd.DataFrame(rows[1:], columns=rows[0])
def create_or_update_sheet(
self,
worksheet_name: str | None = None,
header: list[str] = None,
values: list[dict] = None,
target_row: int | None = None,
):
"""
Create or update a sheet + worksheet.
Ensures headers exist and appends/inserts values.
If sheet_name or worksheet_name not provided, uses instance's sheet.
values: List of dicts where keys match header names
target_row: Optional 1-indexed row to write to. If row has data, inserts below it.
If None, appends to end of sheet.
"""
if not values:
logger.warning("No values provided. Nothing to append.")
return
logger.debug("Using instance sheet by ID: %s", self.sheet_id)
spreadsheet = self.client.open_by_key(self.sheet_id)
# Use instance's worksheet if not specified
if worksheet_name is None:
worksheet_name = self.worksheet_name or "Sheet1"
logger.debug("Using worksheet name: %s", worksheet_name)
worksheet = self._get_or_create_worksheet(spreadsheet, worksheet_name)
final_header = self._ensure_header(worksheet, header)
# Convert dict rows -> ordered list rows
rows_to_write = []
for item in values:
row = [item.get(col, "") for col in final_header]
rows_to_write.append(row)
if target_row is not None:
# Check if target row has data
try:
existing_row = worksheet.row_values(target_row)
except Exception:
existing_row = []
if existing_row and any(cell.strip() for cell in existing_row):
# Row has data, insert new row below it
worksheet.insert_rows(rows_to_write, row=target_row + 1, value_input_option="USER_ENTERED")
logger.debug(f"Inserted {len(rows_to_write)} rows below row {target_row}")
else:
# Row is empty, write directly to it
cell_range = f"A{target_row}"
worksheet.update(values=rows_to_write, range_name=cell_range, value_input_option="USER_ENTERED")
logger.debug(f"Updated row {target_row}")
else:
# No target row, append to end
logger.debug(
"Appending %d rows to %s / %s",
len(rows_to_write),
f"ID:{self.sheet_id}",
worksheet_name,
)
worksheet.append_rows(
rows_to_write,
value_input_option="USER_ENTERED",
)
# ------------------ CSV Export ------------------
def export_csv(
self,
output_path: str | None = None,
column_name: str | None = None,
match_value: str = "SELECTED",
case_insensitive: bool = True,
) -> Path:
"""
Export sheet (or filtered rows) to CSV.
If output_path is None, file is written inside temp_dir.
"""
if output_path:
output_path = Path(output_path)
else:
output_path = self.temp_dir / "sheet_export.csv"
logger.debug("Exporting CSV to %s", output_path)
if column_name:
rows = self.filter_rows(
column_name=column_name,
match_value=match_value,
case_insensitive=case_insensitive,
)
else:
rows = self.get_all_values()
output_path.parent.mkdir(parents=True, exist_ok=True)
with output_path.open("w", newline="", encoding="utf-8") as f:
writer = csv.writer(f)
writer.writerows(rows)
logger.debug("CSV export completed | rows=%d", len(rows) - 1)
return output_path
def delete_rows(self, row_indices: list[int]):
"""
Delete multiple rows by their 1-based index.
Deletes from bottom to top to maintain index validity.
"""
if not row_indices:
return
# Sort descending so we delete from bottom up
sorted_indices = sorted(row_indices, reverse=True)
logger.debug(f"Deleting {len(sorted_indices)} rows from sheet...")
for idx in sorted_indices:
try:
self.sheet.delete_rows(idx)
logger.debug(f"Deleted row {idx}")
except Exception as e:
logger.error(f"Failed to delete row {idx}: {e}")
# ------------------ CLI entrypoint ------------------
def main():
"""
Examples are controlled via env flags:
- EXAMPLE_WRITE=true
- EXAMPLE_FILTER_EXPORT=true
"""
try:
from dotenv import load_dotenv
load_dotenv()
except ImportError:
pass
reader = GoogleSheetReader(worksheet_name=get_config_value("video_library_gsheet_worksheet"))
if __name__ == "__main__":
main()