"""Read, filter, write and export Google Sheets worksheets via gspread."""

import os
import csv
import tempfile
from pathlib import Path
from typing import Optional, TYPE_CHECKING

import pandas as pd
import gspread
from google.auth import default
from google.auth.credentials import Credentials as BaseCredentials

from src.logger_config import logger
from src.utils import get_temp_dir
from src.config import get_config_value

if TYPE_CHECKING:
    from google_src import GCloudWrapper


class GoogleSheetReader:
    """
    Utility class to read & write Google Sheets using ADC (Local) / WIF (CI).

    An instance is bound to one spreadsheet (``sheet_id``) and one worksheet
    (``worksheet_name``, or the spreadsheet's first sheet when no name is
    given). Constructing an instance authorizes a gspread client and opens
    the worksheet immediately.
    """

    SCOPES = [
        "https://www.googleapis.com/auth/spreadsheets",
        "https://www.googleapis.com/auth/drive.file",
    ]

    def __init__(
        self,
        worksheet_name: str | None = None,
        gcloud_wrapper: Optional["GCloudWrapper"] = None,
        account_id: str = "default",
        sheet_id: Optional[str] = None,
    ):
        """
        Initialize GoogleSheetReader.

        Args:
            worksheet_name: Worksheet (tab) to open; created if it does not
                exist. When None, the spreadsheet's first sheet is used.
            gcloud_wrapper: Optional wrapper that supplies an authorized
                sheets client. When omitted, Application Default Credentials
                are used.
            account_id: Account identifier passed to the wrapper.
            sheet_id: Spreadsheet ID. When None, the ``GSHEET_ID`` config
                value is used.

        Raises:
            RuntimeError: If no spreadsheet ID is available.
        """
        logger.debug("Initializing GoogleSheetReader")

        if sheet_id is not None:
            self.sheet_id = sheet_id
        else:
            self.sheet_id = get_config_value("GSHEET_ID")

        # Validate after resolving, so an explicitly passed empty ID is
        # rejected here rather than failing later when opening the sheet.
        if not self.sheet_id:
            logger.error("Must provide GSHEET_ID")
            raise RuntimeError("Must provide GSHEET_ID")

        self.worksheet_name = worksheet_name
        self._gcloud_wrapper = gcloud_wrapper
        self._account_id = account_id

        if self.worksheet_name:
            logger.debug(
                "Sheet config | id=%s | worksheet=%s | wrapper=%s",
                self.sheet_id,
                self.worksheet_name,
                "yes" if gcloud_wrapper else "no (using ADC)",
            )

        self.client = self._authorize()
        self.sheet = self._open_sheet()

        # -------- Temp directory handling --------
        self.temp_dir = self._init_temp_dir()
        logger.debug("Using temp directory: %s", self.temp_dir)

    @staticmethod
    def get_log_reader(
        worksheet_name: str,
        initial_header: Optional[list[str]] = None,
    ) -> "GoogleSheetReader":
        """
        Factory method to get a reader for the Logs spreadsheet.

        Tries the spreadsheet configured as ``LOG_GSHEET_ID`` first. If that
        value is unset or the spreadsheet cannot be opened, falls back to the
        default configured spreadsheet. The worksheet is created if it does
        not exist. If ``initial_header`` is provided, those columns are
        ensured on the worksheet's first row.

        Args:
            worksheet_name: Worksheet (tab) to open or create.
            initial_header: Optional column names that must be present.

        Returns:
            A reader bound to the log worksheet.

        Raises:
            RuntimeError: If the fallback spreadsheet cannot be opened.
        """
        log_sheet_id = get_config_value("LOG_GSHEET_ID")
        reader = None

        if log_sheet_id:
            try:
                reader = GoogleSheetReader(
                    worksheet_name=worksheet_name,
                    sheet_id=log_sheet_id,
                )
            except Exception as e:
                # Best effort: fall through to the default spreadsheet below.
                logger.error(
                    "Failed to open configured LOG_GSHEET_ID %s: %s",
                    log_sheet_id,
                    e,
                )

        if reader is None:
            try:
                default_reader = GoogleSheetReader(worksheet_name=None)
                current_sheet_id = default_reader.sheet.spreadsheet.id
                reader = GoogleSheetReader(
                    worksheet_name=worksheet_name,
                    sheet_id=current_sheet_id,
                )
            except Exception as e:
                # Keep the original cause attached for debugging.
                raise RuntimeError(
                    f"Could not open default sheet to get ID: {e}"
                ) from e

        if initial_header:
            reader._ensure_header(reader.sheet, initial_header)

        return reader

    # ------------------ Internal helpers ------------------
    def _get_env(self, key: str) -> str:
        """Return the config value for ``key``; raise RuntimeError if unset."""
        value = get_config_value(key)
        if not value:
            logger.error("Missing required env variable: %s", key)
            raise RuntimeError(f"Missing required env variable: {key}")
        return value

    def _authorize(self):
        """
        Return an authorized sheets client.

        Uses the GCloudWrapper when one was supplied, otherwise falls back
        to Application Default Credentials scoped with ``SCOPES``.
        """
        # Use wrapper if provided, otherwise fall back to ADC
        if self._gcloud_wrapper:
            logger.debug(
                "Authorizing via GCloudWrapper (account: %s)", self._account_id
            )
            return self._gcloud_wrapper.get_sheets_client(self._account_id)

        logger.debug("Authorizing with Application Default Credentials (ADC)")
        creds, _ = default()

        # IMPORTANT: re-scope user ADC credentials
        if isinstance(creds, BaseCredentials) and hasattr(creds, "with_scopes"):
            creds = creds.with_scopes(self.SCOPES)

        return gspread.authorize(creds)

    def _open_sheet(self):
        """
        Open the spreadsheet by ID and return the target worksheet.

        Returns the named worksheet (creating it when missing) or, when no
        worksheet name was configured, the spreadsheet's first sheet.
        """
        logger.debug("Opening spreadsheet by ID: %s", self.sheet_id)
        spreadsheet = self.client.open_by_key(self.sheet_id)

        if self.worksheet_name:
            # Shared get-or-create logic (same log messages and dimensions).
            return self._get_or_create_worksheet(spreadsheet, self.worksheet_name)

        logger.debug("Opening default worksheet (sheet1)")
        return spreadsheet.sheet1

    def _init_temp_dir(self) -> Path:
        """
        Creates a temp directory via ``get_temp_dir`` with a ``gsheet_`` prefix.

        NOTE(review): per the original note, ``get_temp_dir`` uses a fixed
        path during test automation if configured - confirm in src.utils.
        """
        return get_temp_dir(prefix="gsheet_")

    # ------------------ Sheet creation helpers ------------------
    def _get_or_create_spreadsheet(self, sheet_name: str):
        """Open the spreadsheet titled ``sheet_name``, creating it if missing."""
        try:
            logger.debug("Opening spreadsheet: %s", sheet_name)
            return self.client.open(sheet_name)
        except gspread.SpreadsheetNotFound:
            logger.warning("Spreadsheet not found. Creating new one: %s", sheet_name)
            return self.client.create(sheet_name)

    def _get_or_create_worksheet(
        self,
        spreadsheet,
        worksheet_name: str,
        rows: int = 1000,
        cols: int = 26,
    ):
        """
        Return the worksheet named ``worksheet_name``, creating it if missing.

        Args:
            spreadsheet: gspread spreadsheet to look in.
            worksheet_name: Title of the worksheet.
            rows: Row count used only when the worksheet is created.
            cols: Column count used only when the worksheet is created.
        """
        try:
            logger.debug("Opening worksheet: %s", worksheet_name)
            return spreadsheet.worksheet(worksheet_name)
        except gspread.WorksheetNotFound:
            logger.warning("Worksheet not found. Creating: %s", worksheet_name)
            return spreadsheet.add_worksheet(
                title=worksheet_name,
                rows=rows,
                cols=cols,
            )

    def _ensure_header(self, worksheet, header: list[str]) -> list[str]:
        """
        Ensures header row exists and contains all required columns.

        Existing columns keep their position; missing ones are appended to
        the right in the order given.

        Returns final header order.
        """
        existing = worksheet.row_values(1)

        # Case 1: Empty sheet
        if not existing:
            logger.debug("Sheet empty. Writing header.")
            worksheet.insert_row(header, index=1)
            return header

        # Case 2: Header exists, check for missing columns
        final_header = existing.copy()
        for col in header:
            if col not in final_header:
                logger.debug("Adding missing header column: %s", col)
                final_header.append(col)

        # Only touch the sheet when at least one column was added.
        if len(final_header) != len(existing):
            worksheet.update(values=[final_header], range_name="1:1")

        return final_header

    @staticmethod
    def _header_from_values(values: list[dict]) -> list[str]:
        """Return the keys of all row dicts, de-duplicated in first-seen order."""
        return list(dict.fromkeys(key for item in values for key in item))

    # ------------------ Core APIs ------------------
    def get_all_values(self) -> list[list[str]]:
        """Return every row of the worksheet as a list of cell-value lists."""
        logger.debug("Fetching all values from sheet")
        return self.sheet.get_all_values()

    def get_dataframe(self) -> pd.DataFrame:
        """
        Return the worksheet as a DataFrame, using the first row as columns.

        An empty worksheet yields an empty DataFrame.
        """
        logger.debug("Converting sheet to DataFrame")
        data = self.get_all_values()
        if not data:
            # No header row to index into.
            return pd.DataFrame()
        return pd.DataFrame(data[1:], columns=data[0])

    # ------------------ Filtering APIs ------------------
    def filter_rows(
        self,
        column_name: str = "STATUS",
        match_value: str = "SELECTED",
        case_insensitive: bool = True,
    ) -> list:
        """
        Return the header row plus every data row whose cell matches.

        Args:
            column_name: Header name of the column to test.
            match_value: Value the cell must equal.
            case_insensitive: When True, the cell is stripped and compared
                ignoring case; when False, the comparison is exact.

        Returns:
            ``[header, *matching_rows]``, or ``[]`` if the sheet is empty.

        Raises:
            RuntimeError: If ``column_name`` is not in the header row.
        """
        logger.debug(
            "Filtering rows | column=%s | value=%s | case_insensitive=%s",
            column_name,
            match_value,
            case_insensitive,
        )

        rows = self.get_all_values()
        if not rows:
            logger.warning("Sheet is empty")
            return []

        header = rows[0]
        try:
            col_idx = header.index(column_name)
        except ValueError:
            logger.error("Column '%s' not found in sheet", column_name)
            raise RuntimeError(
                f"Column '{column_name}' not found in sheet"
            ) from None

        # Normalise the target once instead of once per row.
        lowered_target = match_value.lower()

        def _match(val: str) -> bool:
            if case_insensitive:
                return val.strip().lower() == lowered_target
            return val == match_value

        # Short rows simply lack the column and can never match.
        filtered = [
            row for row in rows[1:]
            if len(row) > col_idx and _match(row[col_idx])
        ]

        logger.debug(
            "Filtered rows: %d / %d",
            len(filtered),
            max(len(rows) - 1, 0),
        )
        return [header] + filtered

    def get_filtered_dataframe(
        self,
        column_name: str = "STATUS",
        match_value: str = "SELECTED",
        case_insensitive: bool = True,
    ) -> pd.DataFrame:
        """
        Return the rows selected by ``filter_rows`` as a DataFrame.

        When nothing matches, the result is an empty DataFrame that still
        carries the sheet's columns (or no columns for an empty sheet).
        """
        logger.debug("Creating filtered DataFrame")
        rows = self.filter_rows(
            column_name=column_name,
            match_value=match_value,
            case_insensitive=case_insensitive,
        )

        if len(rows) <= 1:
            logger.warning("No matching rows found")
            return pd.DataFrame(columns=rows[0] if rows else [])

        return pd.DataFrame(rows[1:], columns=rows[0])

    def create_or_update_sheet(
        self,
        worksheet_name: str | None = None,
        header: list[str] | None = None,
        values: list[dict] | None = None,
        target_row: int | None = None,
    ):
        """
        Create or update a worksheet in this instance's spreadsheet.

        Ensures headers exist and appends/inserts values.

        Args:
            worksheet_name: Worksheet to write to. If not provided, uses the
                instance's worksheet name, or "Sheet1" when that is unset.
            header: Column names that must exist on the sheet. If omitted or
                empty, the keys of ``values`` are used in first-seen order.
            values: List of dicts where keys match header names. Keys missing
                from a dict are written as empty cells.
            target_row: Optional 1-indexed row to write to. If row has data,
                inserts below it. If None, appends to end of sheet.
        """
        if not values:
            logger.warning("No values provided. Nothing to append.")
            return

        logger.debug("Using instance sheet by ID: %s", self.sheet_id)
        spreadsheet = self.client.open_by_key(self.sheet_id)

        # Use instance's worksheet if not specified
        if worksheet_name is None:
            worksheet_name = self.worksheet_name or "Sheet1"
            logger.debug("Using worksheet name: %s", worksheet_name)

        worksheet = self._get_or_create_worksheet(spreadsheet, worksheet_name)

        # Without a header there is nothing to order the cells by; previously
        # a missing header crashed inside _ensure_header.
        if not header:
            header = self._header_from_values(values)

        final_header = self._ensure_header(worksheet, header)

        # Convert dict rows -> ordered list rows
        rows_to_write = [
            [item.get(col, "") for col in final_header]
            for item in values
        ]

        if target_row is None:
            # No target row, append to end
            logger.debug(
                "Appending %d rows to %s / %s",
                len(rows_to_write),
                f"ID:{self.sheet_id}",
                worksheet_name,
            )
            worksheet.append_rows(
                rows_to_write,
                value_input_option="USER_ENTERED",
            )
            return

        # Check if target row has data
        try:
            existing_row = worksheet.row_values(target_row)
        except Exception as e:
            # Best effort: an unreadable row is treated as empty, but the
            # failure is no longer silent.
            logger.warning(
                "Could not read row %s; treating it as empty: %s", target_row, e
            )
            existing_row = []

        if existing_row and any(cell.strip() for cell in existing_row):
            # Row has data, insert new row below it
            worksheet.insert_rows(
                rows_to_write,
                row=target_row + 1,
                value_input_option="USER_ENTERED",
            )
            logger.debug(
                "Inserted %s rows below row %s", len(rows_to_write), target_row
            )
        else:
            # Row is empty, write directly to it
            # NOTE(review): with more than one row in `values` this looks
            # like it also writes into the rows below target_row - confirm
            # that overwriting them is intended.
            cell_range = f"A{target_row}"
            worksheet.update(
                values=rows_to_write,
                range_name=cell_range,
                value_input_option="USER_ENTERED",
            )
            logger.debug("Updated row %s", target_row)

    # ------------------ CSV Export ------------------
    def export_csv(
        self,
        output_path: str | None = None,
        column_name: str | None = None,
        match_value: str = "SELECTED",
        case_insensitive: bool = True,
    ) -> Path:
        """
        Export sheet (or filtered rows) to CSV.

        Args:
            output_path: Destination file. If None, the file is written as
                ``sheet_export.csv`` inside ``temp_dir``.
            column_name: When given, only rows matching ``match_value`` in
                this column are exported (see ``filter_rows``).
            match_value: Value to match when filtering.
            case_insensitive: Whether filtering ignores case.

        Returns:
            Path of the written CSV file.
        """
        if output_path:
            output_path = Path(output_path)
        else:
            output_path = self.temp_dir / "sheet_export.csv"

        logger.debug("Exporting CSV to %s", output_path)

        if column_name:
            rows = self.filter_rows(
                column_name=column_name,
                match_value=match_value,
                case_insensitive=case_insensitive,
            )
        else:
            rows = self.get_all_values()

        output_path.parent.mkdir(parents=True, exist_ok=True)
        with output_path.open("w", newline="", encoding="utf-8") as f:
            csv.writer(f).writerows(rows)

        # Exclude the header from the count; never report a negative number
        # for an empty sheet.
        logger.debug("CSV export completed | rows=%d", max(len(rows) - 1, 0))
        return output_path

    def delete_rows(self, row_indices: list[int]):
        """
        Delete multiple rows by their 1-based index.

        Deletes from bottom to top to maintain index validity. Duplicate
        indices are collapsed so each requested row is deleted once; a
        repeated index would otherwise remove the row that shifted up into
        its place. A failure on one row is logged and the remaining rows are
        still attempted.
        """
        if not row_indices:
            return

        # Sort descending so we delete from bottom up
        sorted_indices = sorted(set(row_indices), reverse=True)
        logger.debug("Deleting %s rows from sheet...", len(sorted_indices))

        for idx in sorted_indices:
            try:
                self.sheet.delete_rows(idx)
                logger.debug("Deleted row %s", idx)
            except Exception as e:
                logger.error("Failed to delete row %s: %s", idx, e)


# ------------------ CLI entrypoint ------------------
def main():
    """
    Open the worksheet named by the ``video_library_gsheet_worksheet`` config.

    Loads a ``.env`` file first when python-dotenv is installed. The
    EXAMPLE_WRITE / EXAMPLE_FILTER_EXPORT flags mentioned in earlier
    revisions are not read by this function.
    """
    try:
        from dotenv import load_dotenv
    except ImportError:
        # python-dotenv is optional.
        pass
    else:
        load_dotenv()

    GoogleSheetReader(
        worksheet_name=get_config_value("video_library_gsheet_worksheet")
    )


if __name__ == "__main__":
    main()