|
|
import os |
|
|
import csv |
|
|
import tempfile |
|
|
from pathlib import Path |
|
|
from typing import Optional, TYPE_CHECKING |
|
|
import pandas as pd |
|
|
|
|
|
import gspread |
|
|
from google.auth import default |
|
|
from google.auth.credentials import Credentials as BaseCredentials |
|
|
|
|
|
from src.logger_config import logger |
|
|
from src.utils import get_temp_dir |
|
|
from src.config import get_config_value |
|
|
|
|
|
|
|
|
if TYPE_CHECKING: |
|
|
from google_src import GCloudWrapper |
|
|
|
|
|
|
|
|
class GoogleSheetReader: |
|
|
""" |
|
|
Utility class to read & write Google Sheets using ADC (Local) / WIF (CI). |
|
|
|
|
|
""" |
|
|
|
|
|
SCOPES = [ |
|
|
"https://www.googleapis.com/auth/spreadsheets", |
|
|
"https://www.googleapis.com/auth/drive.file", |
|
|
] |
|
|
|
|
|
def __init__( |
|
|
self, |
|
|
worksheet_name: str | None = None, |
|
|
gcloud_wrapper: Optional["GCloudWrapper"] = None, |
|
|
account_id: str = "default", |
|
|
sheet_id: Optional[str] = None, |
|
|
): |
|
|
""" |
|
|
Initialize GoogleSheetReader. |
|
|
""" |
|
|
logger.debug("Initializing GoogleSheetReader") |
|
|
|
|
|
if sheet_id is not None: |
|
|
self.sheet_id = sheet_id |
|
|
else: |
|
|
self.sheet_id = get_config_value("GSHEET_ID") |
|
|
|
|
|
if not self.sheet_id: |
|
|
logger.error("Must provide GSHEET_ID") |
|
|
raise RuntimeError("Must provide GSHEET_ID") |
|
|
|
|
|
self.worksheet_name = worksheet_name |
|
|
self._gcloud_wrapper = gcloud_wrapper |
|
|
self._account_id = account_id |
|
|
|
|
|
if self.worksheet_name: |
|
|
logger.debug( |
|
|
"Sheet config | id=%s | worksheet=%s | wrapper=%s", |
|
|
self.sheet_id, |
|
|
self.worksheet_name, |
|
|
"yes" if gcloud_wrapper else "no (using ADC)", |
|
|
) |
|
|
|
|
|
self.client = self._authorize() |
|
|
self.sheet = self._open_sheet() |
|
|
|
|
|
|
|
|
self.temp_dir = self._init_temp_dir() |
|
|
logger.debug("Using temp directory: %s", self.temp_dir) |
|
|
|
|
|
@staticmethod |
|
|
def get_log_reader(worksheet_name: str, initial_header: Optional[list[str]] = None) -> "GoogleSheetReader": |
|
|
""" |
|
|
Factory method to get a reader for the Logs spreadsheet. |
|
|
Derives log sheet name from the current configured sheet's name. |
|
|
Creates the log sheet if it doesn't exist. |
|
|
If initial_header is provided, ensures it exists on the sheet. |
|
|
""" |
|
|
log_sheet_id = get_config_value("LOG_GSHEET_ID") |
|
|
reader = None |
|
|
|
|
|
if log_sheet_id: |
|
|
try: |
|
|
reader = GoogleSheetReader( |
|
|
worksheet_name=worksheet_name, |
|
|
sheet_id=log_sheet_id |
|
|
) |
|
|
except Exception as e: |
|
|
logger.error("Failed to open configured LOG_GSHEET_ID %s: %s", log_sheet_id, e) |
|
|
|
|
|
if not reader: |
|
|
try: |
|
|
default_reader = GoogleSheetReader(worksheet_name=None) |
|
|
current_sheet_id = default_reader.sheet.spreadsheet.id |
|
|
reader = GoogleSheetReader( |
|
|
worksheet_name=worksheet_name, |
|
|
sheet_id=current_sheet_id |
|
|
) |
|
|
except Exception as e: |
|
|
raise RuntimeError(f"Could not open default sheet to get ID: {e}") |
|
|
|
|
|
if reader and initial_header: |
|
|
reader._ensure_header(reader.sheet, initial_header) |
|
|
|
|
|
return reader |
|
|
|
|
|
|
|
|
|
|
|
def _get_env(self, key: str) -> str: |
|
|
value = get_config_value(key) |
|
|
if not value: |
|
|
logger.error("Missing required env variable: %s", key) |
|
|
raise RuntimeError(f"Missing required env variable: {key}") |
|
|
return value |
|
|
|
|
|
def _authorize(self): |
|
|
|
|
|
if self._gcloud_wrapper: |
|
|
logger.debug("Authorizing via GCloudWrapper (account: %s)", self._account_id) |
|
|
return self._gcloud_wrapper.get_sheets_client(self._account_id) |
|
|
|
|
|
logger.debug("Authorizing with Application Default Credentials (ADC)") |
|
|
creds, _ = default() |
|
|
|
|
|
|
|
|
if isinstance(creds, BaseCredentials) and hasattr(creds, "with_scopes"): |
|
|
creds = creds.with_scopes(self.SCOPES) |
|
|
|
|
|
return gspread.authorize(creds) |
|
|
|
|
|
def _open_sheet(self): |
|
|
"""Open spreadsheet by ID""" |
|
|
logger.debug("Opening spreadsheet by ID: %s", self.sheet_id) |
|
|
spreadsheet = self.client.open_by_key(self.sheet_id) |
|
|
|
|
|
if self.worksheet_name: |
|
|
logger.debug("Opening worksheet: %s", self.worksheet_name) |
|
|
try: |
|
|
return spreadsheet.worksheet(self.worksheet_name) |
|
|
except gspread.WorksheetNotFound: |
|
|
logger.warning("Worksheet not found. Creating: %s", self.worksheet_name) |
|
|
return spreadsheet.add_worksheet( |
|
|
title=self.worksheet_name, |
|
|
rows=1000, |
|
|
cols=26, |
|
|
) |
|
|
|
|
|
logger.debug("Opening default worksheet (sheet1)") |
|
|
return spreadsheet.sheet1 |
|
|
|
|
|
def _init_temp_dir(self) -> Path: |
|
|
""" |
|
|
Creates a temp directory. |
|
|
Uses fixed path during test automation if configured. |
|
|
""" |
|
|
return get_temp_dir(prefix="gsheet_") |
|
|
|
|
|
|
|
|
|
|
|
def _get_or_create_spreadsheet(self, sheet_name: str): |
|
|
try: |
|
|
logger.debug("Opening spreadsheet: %s", sheet_name) |
|
|
return self.client.open(sheet_name) |
|
|
except gspread.SpreadsheetNotFound: |
|
|
logger.warning("Spreadsheet not found. Creating new one: %s", sheet_name) |
|
|
return self.client.create(sheet_name) |
|
|
|
|
|
def _get_or_create_worksheet( |
|
|
self, |
|
|
spreadsheet, |
|
|
worksheet_name: str, |
|
|
rows: int = 1000, |
|
|
cols: int = 26, |
|
|
): |
|
|
try: |
|
|
logger.debug("Opening worksheet: %s", worksheet_name) |
|
|
return spreadsheet.worksheet(worksheet_name) |
|
|
except gspread.WorksheetNotFound: |
|
|
logger.warning("Worksheet not found. Creating: %s", worksheet_name) |
|
|
return spreadsheet.add_worksheet( |
|
|
title=worksheet_name, |
|
|
rows=rows, |
|
|
cols=cols, |
|
|
) |
|
|
|
|
|
def _ensure_header(self, worksheet, header: list[str]) -> list[str]: |
|
|
""" |
|
|
Ensures header row exists and contains all required columns. |
|
|
Returns final header order. |
|
|
""" |
|
|
existing = worksheet.row_values(1) |
|
|
|
|
|
|
|
|
if not existing: |
|
|
logger.debug("Sheet empty. Writing header.") |
|
|
worksheet.insert_row(header, index=1) |
|
|
return header |
|
|
|
|
|
|
|
|
updated = False |
|
|
final_header = existing.copy() |
|
|
|
|
|
for col in header: |
|
|
if col not in final_header: |
|
|
logger.debug("Adding missing header column: %s", col) |
|
|
final_header.append(col) |
|
|
updated = True |
|
|
|
|
|
if updated: |
|
|
worksheet.update(values=[final_header], range_name="1:1") |
|
|
|
|
|
return final_header |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_all_values(self) -> list: |
|
|
logger.debug("Fetching all values from sheet") |
|
|
return self.sheet.get_all_values() |
|
|
|
|
|
def get_dataframe(self): |
|
|
logger.debug("Converting sheet to DataFrame") |
|
|
|
|
|
data = self.get_all_values() |
|
|
return pd.DataFrame(data[1:], columns=data[0]) |
|
|
|
|
|
|
|
|
|
|
|
def filter_rows( |
|
|
self, |
|
|
column_name: str = "STATUS", |
|
|
match_value: str = "SELECTED", |
|
|
case_insensitive: bool = True, |
|
|
) -> list: |
|
|
logger.debug( |
|
|
"Filtering rows | column=%s | value=%s | case_insensitive=%s", |
|
|
column_name, |
|
|
match_value, |
|
|
case_insensitive, |
|
|
) |
|
|
|
|
|
rows = self.get_all_values() |
|
|
if not rows: |
|
|
logger.warning("Sheet is empty") |
|
|
return [] |
|
|
|
|
|
header = rows[0] |
|
|
|
|
|
try: |
|
|
col_idx = header.index(column_name) |
|
|
except ValueError: |
|
|
logger.error("Column '%s' not found in sheet", column_name) |
|
|
raise RuntimeError(f"Column '{column_name}' not found in sheet") |
|
|
|
|
|
def _match(val: str) -> bool: |
|
|
if case_insensitive: |
|
|
return val.strip().lower() == match_value.lower() |
|
|
return val == match_value |
|
|
|
|
|
filtered = [ |
|
|
row for row in rows[1:] |
|
|
if len(row) > col_idx and _match(row[col_idx]) |
|
|
] |
|
|
|
|
|
logger.debug( |
|
|
"Filtered rows: %d / %d", |
|
|
len(filtered), |
|
|
max(len(rows) - 1, 0), |
|
|
) |
|
|
|
|
|
return [header] + filtered |
|
|
|
|
|
def get_filtered_dataframe( |
|
|
self, |
|
|
column_name: str = "STATUS", |
|
|
match_value: str = "SELECTED", |
|
|
case_insensitive: bool = True, |
|
|
): |
|
|
logger.debug("Creating filtered DataFrame") |
|
|
|
|
|
rows = self.filter_rows( |
|
|
column_name=column_name, |
|
|
match_value=match_value, |
|
|
case_insensitive=case_insensitive, |
|
|
) |
|
|
|
|
|
if len(rows) <= 1: |
|
|
logger.warning("No matching rows found") |
|
|
return pd.DataFrame(columns=rows[0] if rows else []) |
|
|
|
|
|
return pd.DataFrame(rows[1:], columns=rows[0]) |
|
|
|
|
|
def create_or_update_sheet( |
|
|
self, |
|
|
worksheet_name: str | None = None, |
|
|
header: list[str] = None, |
|
|
values: list[dict] = None, |
|
|
target_row: int | None = None, |
|
|
): |
|
|
""" |
|
|
Create or update a sheet + worksheet. |
|
|
Ensures headers exist and appends/inserts values. |
|
|
|
|
|
If sheet_name or worksheet_name not provided, uses instance's sheet. |
|
|
|
|
|
values: List of dicts where keys match header names |
|
|
target_row: Optional 1-indexed row to write to. If row has data, inserts below it. |
|
|
If None, appends to end of sheet. |
|
|
""" |
|
|
|
|
|
if not values: |
|
|
logger.warning("No values provided. Nothing to append.") |
|
|
return |
|
|
|
|
|
logger.debug("Using instance sheet by ID: %s", self.sheet_id) |
|
|
spreadsheet = self.client.open_by_key(self.sheet_id) |
|
|
|
|
|
|
|
|
if worksheet_name is None: |
|
|
worksheet_name = self.worksheet_name or "Sheet1" |
|
|
logger.debug("Using worksheet name: %s", worksheet_name) |
|
|
|
|
|
worksheet = self._get_or_create_worksheet(spreadsheet, worksheet_name) |
|
|
|
|
|
final_header = self._ensure_header(worksheet, header) |
|
|
|
|
|
|
|
|
rows_to_write = [] |
|
|
for item in values: |
|
|
row = [item.get(col, "") for col in final_header] |
|
|
rows_to_write.append(row) |
|
|
|
|
|
if target_row is not None: |
|
|
|
|
|
try: |
|
|
existing_row = worksheet.row_values(target_row) |
|
|
except Exception: |
|
|
existing_row = [] |
|
|
|
|
|
if existing_row and any(cell.strip() for cell in existing_row): |
|
|
|
|
|
worksheet.insert_rows(rows_to_write, row=target_row + 1, value_input_option="USER_ENTERED") |
|
|
logger.debug(f"Inserted {len(rows_to_write)} rows below row {target_row}") |
|
|
else: |
|
|
|
|
|
cell_range = f"A{target_row}" |
|
|
worksheet.update(values=rows_to_write, range_name=cell_range, value_input_option="USER_ENTERED") |
|
|
logger.debug(f"Updated row {target_row}") |
|
|
else: |
|
|
|
|
|
logger.debug( |
|
|
"Appending %d rows to %s / %s", |
|
|
len(rows_to_write), |
|
|
f"ID:{self.sheet_id}", |
|
|
worksheet_name, |
|
|
) |
|
|
worksheet.append_rows( |
|
|
rows_to_write, |
|
|
value_input_option="USER_ENTERED", |
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def export_csv( |
|
|
self, |
|
|
output_path: str | None = None, |
|
|
column_name: str | None = None, |
|
|
match_value: str = "SELECTED", |
|
|
case_insensitive: bool = True, |
|
|
) -> Path: |
|
|
""" |
|
|
Export sheet (or filtered rows) to CSV. |
|
|
If output_path is None, file is written inside temp_dir. |
|
|
""" |
|
|
if output_path: |
|
|
output_path = Path(output_path) |
|
|
else: |
|
|
output_path = self.temp_dir / "sheet_export.csv" |
|
|
|
|
|
logger.debug("Exporting CSV to %s", output_path) |
|
|
|
|
|
if column_name: |
|
|
rows = self.filter_rows( |
|
|
column_name=column_name, |
|
|
match_value=match_value, |
|
|
case_insensitive=case_insensitive, |
|
|
) |
|
|
else: |
|
|
rows = self.get_all_values() |
|
|
|
|
|
output_path.parent.mkdir(parents=True, exist_ok=True) |
|
|
|
|
|
with output_path.open("w", newline="", encoding="utf-8") as f: |
|
|
writer = csv.writer(f) |
|
|
writer.writerows(rows) |
|
|
|
|
|
logger.debug("CSV export completed | rows=%d", len(rows) - 1) |
|
|
return output_path |
|
|
|
|
|
|
|
|
def delete_rows(self, row_indices: list[int]): |
|
|
""" |
|
|
Delete multiple rows by their 1-based index. |
|
|
Deletes from bottom to top to maintain index validity. |
|
|
""" |
|
|
if not row_indices: |
|
|
return |
|
|
|
|
|
|
|
|
sorted_indices = sorted(row_indices, reverse=True) |
|
|
|
|
|
logger.debug(f"Deleting {len(sorted_indices)} rows from sheet...") |
|
|
|
|
|
for idx in sorted_indices: |
|
|
try: |
|
|
self.sheet.delete_rows(idx) |
|
|
logger.debug(f"Deleted row {idx}") |
|
|
except Exception as e: |
|
|
logger.error(f"Failed to delete row {idx}: {e}") |
|
|
|
|
|
|
|
|
def main(): |
|
|
""" |
|
|
Examples are controlled via env flags: |
|
|
- EXAMPLE_WRITE=true |
|
|
- EXAMPLE_FILTER_EXPORT=true |
|
|
""" |
|
|
|
|
|
try: |
|
|
from dotenv import load_dotenv |
|
|
load_dotenv() |
|
|
except ImportError: |
|
|
pass |
|
|
|
|
|
reader = GoogleSheetReader(worksheet_name=get_config_value("video_library_gsheet_worksheet")) |
|
|
|
|
|
if __name__ == "__main__": |
|
|
main() |