File size: 14,440 Bytes
34471b9 033ce79 c20adb3 34471b9 f20025d 05104b9 3bba70c 34471b9 033ce79 34471b9 033ce79 34471b9 033ce79 9c42c43 033ce79 71d3806 033ce79 503d4ac 34471b9 9c42c43 c2fc9ca 34471b9 befd6e1 033ce79 34471b9 c2fc9ca 34471b9 503d4ac 34471b9 9c42c43 f50bedb 9c42c43 f50bedb 9c42c43 3688015 f50bedb 3688015 8348f26 f50bedb 8348f26 3688015 8348f26 3688015 f50bedb 9c42c43 34471b9 3bba70c 34471b9 033ce79 503d4ac 033ce79 503d4ac 34471b9 c2fc9ca 503d4ac c2fc9ca 34471b9 503d4ac f67c2aa 34471b9 503d4ac 34471b9 fce9bf7 34471b9 503d4ac 34471b9 503d4ac 34471b9 503d4ac 34471b9 503d4ac 34471b9 503d4ac 34471b9 503d4ac 34471b9 f67c2aa 34471b9 f67c2aa 34471b9 f67c2aa 34471b9 503d4ac c2fc9ca 34471b9 503d4ac 34471b9 f67c2aa 34471b9 f67c2aa 34471b9 f67c2aa 503d4ac f67c2aa 503d4ac f67c2aa 503d4ac f67c2aa c2fc9ca f67c2aa 34471b9 503d4ac 34471b9 503d4ac 34471b9 013de99 503d4ac 013de99 34471b9 5f00d5a 34471b9 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 |
import os
import csv
import tempfile
from pathlib import Path
from typing import Optional, TYPE_CHECKING
import pandas as pd
import gspread
from google.auth import default
from google.auth.credentials import Credentials as BaseCredentials
from src.logger_config import logger
from src.utils import get_temp_dir
from src.config import get_config_value
if TYPE_CHECKING:
from google_src import GCloudWrapper
class GoogleSheetReader:
    """
    Read from and write to a Google Sheet.

    Authenticates with Application Default Credentials when running
    locally, or with Workload Identity Federation in CI.
    """

    # OAuth scopes attached when re-scoping user ADC credentials.
    SCOPES = [
        "https://www.googleapis.com/auth/spreadsheets",
        "https://www.googleapis.com/auth/drive.file",
    ]
def __init__(
    self,
    worksheet_name: str | None = None,
    gcloud_wrapper: Optional["GCloudWrapper"] = None,
    account_id: str = "default",
    sheet_id: Optional[str] = None,
):
    """
    Initialize GoogleSheetReader.

    Resolves the spreadsheet ID (explicit ``sheet_id`` wins over the
    ``GSHEET_ID`` config value), authorizes a client, opens the target
    worksheet, and prepares a temp directory for exports.

    Raises:
        RuntimeError: if no spreadsheet ID can be resolved.
    """
    logger.debug("Initializing GoogleSheetReader")
    # Explicit argument takes precedence over configuration.
    self.sheet_id = sheet_id if sheet_id is not None else get_config_value("GSHEET_ID")
    if not self.sheet_id:
        logger.error("Must provide GSHEET_ID")
        raise RuntimeError("Must provide GSHEET_ID")
    self.worksheet_name = worksheet_name
    self._gcloud_wrapper = gcloud_wrapper
    self._account_id = account_id
    if self.worksheet_name:
        logger.debug(
            "Sheet config | id=%s | worksheet=%s | wrapper=%s",
            self.sheet_id,
            self.worksheet_name,
            "yes" if gcloud_wrapper else "no (using ADC)",
        )
    self.client = self._authorize()
    self.sheet = self._open_sheet()
    # Temp directory for CSV exports and scratch files.
    self.temp_dir = self._init_temp_dir()
    logger.debug("Using temp directory: %s", self.temp_dir)
@staticmethod
def get_log_reader(worksheet_name: str, initial_header: Optional[list[str]] = None) -> "GoogleSheetReader":
    """
    Factory method to get a reader for the Logs spreadsheet.

    Prefers the ``LOG_GSHEET_ID`` config value; if it is unset or fails
    to open, falls back to the ID of the default configured sheet.
    Creates the log worksheet if it doesn't exist. If ``initial_header``
    is provided, ensures those columns exist on the sheet.

    Raises:
        RuntimeError: if neither the configured log sheet nor the
            default sheet can be opened.
    """
    log_sheet_id = get_config_value("LOG_GSHEET_ID")
    reader: Optional["GoogleSheetReader"] = None
    if log_sheet_id:
        try:
            reader = GoogleSheetReader(
                worksheet_name=worksheet_name,
                sheet_id=log_sheet_id,
            )
        except Exception as e:
            # Best-effort: fall through to the default sheet below.
            logger.error("Failed to open configured LOG_GSHEET_ID %s: %s", log_sheet_id, e)
    if not reader:
        try:
            default_reader = GoogleSheetReader(worksheet_name=None)
            current_sheet_id = default_reader.sheet.spreadsheet.id
            reader = GoogleSheetReader(
                worksheet_name=worksheet_name,
                sheet_id=current_sheet_id,
            )
        except Exception as e:
            # Chain the original exception so the root cause is preserved
            # in the traceback (was previously dropped).
            raise RuntimeError(f"Could not open default sheet to get ID: {e}") from e
    # reader is guaranteed non-None here (otherwise we raised above).
    if initial_header:
        reader._ensure_header(reader.sheet, initial_header)
    return reader
# ------------------ Internal helpers ------------------
def _get_env(self, key: str) -> str:
    """Return a required config value, raising RuntimeError when unset."""
    value = get_config_value(key)
    if value:
        return value
    logger.error("Missing required env variable: %s", key)
    raise RuntimeError(f"Missing required env variable: {key}")
def _authorize(self):
    """Build a gspread client via the wrapper when present, else ADC."""
    wrapper = self._gcloud_wrapper
    if wrapper:
        logger.debug("Authorizing via GCloudWrapper (account: %s)", self._account_id)
        return wrapper.get_sheets_client(self._account_id)
    logger.debug("Authorizing with Application Default Credentials (ADC)")
    creds, _ = default()
    # User ADC credentials come back unscoped; attach the Sheets/Drive
    # scopes before handing them to gspread.
    if isinstance(creds, BaseCredentials) and hasattr(creds, "with_scopes"):
        creds = creds.with_scopes(self.SCOPES)
    return gspread.authorize(creds)
def _open_sheet(self):
    """Open the spreadsheet by ID and return the target worksheet."""
    logger.debug("Opening spreadsheet by ID: %s", self.sheet_id)
    book = self.client.open_by_key(self.sheet_id)
    if not self.worksheet_name:
        logger.debug("Opening default worksheet (sheet1)")
        return book.sheet1
    logger.debug("Opening worksheet: %s", self.worksheet_name)
    try:
        return book.worksheet(self.worksheet_name)
    except gspread.WorksheetNotFound:
        # Missing worksheet is not fatal: create it with default size.
        logger.warning("Worksheet not found. Creating: %s", self.worksheet_name)
        return book.add_worksheet(title=self.worksheet_name, rows=1000, cols=26)
def _init_temp_dir(self) -> Path:
    """
    Create and return a temp directory for this reader.

    Delegates to get_temp_dir, which may use a fixed path during
    test automation if so configured.
    """
    return get_temp_dir(prefix="gsheet_")
# ------------------ Sheet creation helpers ------------------
def _get_or_create_spreadsheet(self, sheet_name: str):
    """Open a spreadsheet by name, creating it when it does not exist."""
    logger.debug("Opening spreadsheet: %s", sheet_name)
    try:
        return self.client.open(sheet_name)
    except gspread.SpreadsheetNotFound:
        logger.warning("Spreadsheet not found. Creating new one: %s", sheet_name)
        return self.client.create(sheet_name)
def _get_or_create_worksheet(
    self,
    spreadsheet,
    worksheet_name: str,
    rows: int = 1000,
    cols: int = 26,
):
    """Open a worksheet by name, creating it (rows x cols) when absent."""
    logger.debug("Opening worksheet: %s", worksheet_name)
    try:
        return spreadsheet.worksheet(worksheet_name)
    except gspread.WorksheetNotFound:
        logger.warning("Worksheet not found. Creating: %s", worksheet_name)
        return spreadsheet.add_worksheet(title=worksheet_name, rows=rows, cols=cols)
def _ensure_header(self, worksheet, header: list[str]) -> list[str]:
    """
    Ensure row 1 exists and contains every column in ``header``.

    Missing columns are appended after the existing ones; the sheet is
    only rewritten when something was actually added.

    Returns:
        The final header order as it appears on the sheet.
    """
    existing = worksheet.row_values(1)
    # Empty sheet: write the requested header verbatim.
    if not existing:
        logger.debug("Sheet empty. Writing header.")
        worksheet.insert_row(header, index=1)
        return header
    # Header present: append any columns we need that aren't there yet.
    final_header = list(existing)
    updated = False
    for col in header:
        if col in final_header:
            continue
        logger.debug("Adding missing header column: %s", col)
        final_header.append(col)
        updated = True
    if updated:
        worksheet.update(values=[final_header], range_name="1:1")
    return final_header
# ------------------ Core APIs ------------------
def get_all_values(self) -> list:
    """Return every row of the worksheet as a list of row lists."""
    logger.debug("Fetching all values from sheet")
    return self.sheet.get_all_values()
def get_dataframe(self):
    """
    Return the whole sheet as a DataFrame, using row 1 as the columns.

    Returns an empty DataFrame when the sheet has no rows at all
    (previously this raised IndexError on ``data[0]``).
    """
    logger.debug("Converting sheet to DataFrame")
    data = self.get_all_values()
    if not data:
        logger.warning("Sheet is empty")
        return pd.DataFrame()
    return pd.DataFrame(data[1:], columns=data[0])
# ------------------ Filtering APIs ------------------
def filter_rows(
    self,
    column_name: str = "STATUS",
    match_value: str = "SELECTED",
    case_insensitive: bool = True,
) -> list:
    """
    Return [header] + rows whose ``column_name`` cell equals ``match_value``.

    Case-insensitive comparison also strips surrounding whitespace from
    the cell value. Rows too short to contain the column are skipped.

    Raises:
        RuntimeError: if ``column_name`` is not present in the header.
    """
    logger.debug(
        "Filtering rows | column=%s | value=%s | case_insensitive=%s",
        column_name,
        match_value,
        case_insensitive,
    )
    rows = self.get_all_values()
    if not rows:
        logger.warning("Sheet is empty")
        return []
    header = rows[0]
    if column_name not in header:
        logger.error("Column '%s' not found in sheet", column_name)
        raise RuntimeError(f"Column '{column_name}' not found in sheet")
    col_idx = header.index(column_name)
    # Hoist the lowercase target out of the loop.
    target = match_value.lower() if case_insensitive else match_value
    filtered = []
    for row in rows[1:]:
        if len(row) <= col_idx:
            continue
        cell = row[col_idx]
        hit = cell.strip().lower() == target if case_insensitive else cell == target
        if hit:
            filtered.append(row)
    logger.debug(
        "Filtered rows: %d / %d",
        len(filtered),
        max(len(rows) - 1, 0),
    )
    return [header] + filtered
def get_filtered_dataframe(
    self,
    column_name: str = "STATUS",
    match_value: str = "SELECTED",
    case_insensitive: bool = True,
):
    """
    Return filter_rows() output as a DataFrame.

    When no data rows match, returns an empty DataFrame that still
    carries the header columns (or no columns if the sheet was empty).
    """
    logger.debug("Creating filtered DataFrame")
    rows = self.filter_rows(
        column_name=column_name,
        match_value=match_value,
        case_insensitive=case_insensitive,
    )
    if len(rows) > 1:
        return pd.DataFrame(rows[1:], columns=rows[0])
    logger.warning("No matching rows found")
    return pd.DataFrame(columns=rows[0] if rows else [])
def create_or_update_sheet(
    self,
    worksheet_name: str | None = None,
    header: list[str] | None = None,
    values: list[dict] | None = None,
    target_row: int | None = None,
):
    """
    Create or update a worksheet on the instance's spreadsheet.

    Ensures headers exist and appends/inserts values.

    Args:
        worksheet_name: Target worksheet; defaults to the instance's
            worksheet (or "Sheet1").
        header: Column order. When None, it is derived from the value
            dicts' keys in first-seen order (previously ``None`` crashed
            inside _ensure_header).
        values: List of dicts whose keys match header names. Missing
            keys are written as "".
        target_row: Optional 1-indexed row to write to. If that row has
            data, rows are inserted below it; if it is empty, it is
            overwritten. When None, rows are appended to the end.
    """
    if not values:
        logger.warning("No values provided. Nothing to append.")
        return
    # Derive a header from the value dicts when the caller didn't supply one.
    if header is None:
        header = []
        for item in values:
            for key in item:
                if key not in header:
                    header.append(key)
    logger.debug("Using instance sheet by ID: %s", self.sheet_id)
    spreadsheet = self.client.open_by_key(self.sheet_id)
    if worksheet_name is None:
        worksheet_name = self.worksheet_name or "Sheet1"
    logger.debug("Using worksheet name: %s", worksheet_name)
    worksheet = self._get_or_create_worksheet(spreadsheet, worksheet_name)
    final_header = self._ensure_header(worksheet, header)
    # Convert dict rows -> list rows ordered by the sheet's final header.
    rows_to_write = [[item.get(col, "") for col in final_header] for item in values]
    if target_row is None:
        logger.debug(
            "Appending %d rows to %s / %s",
            len(rows_to_write),
            f"ID:{self.sheet_id}",
            worksheet_name,
        )
        worksheet.append_rows(
            rows_to_write,
            value_input_option="USER_ENTERED",
        )
        return
    # Targeted write: decide between insert-below and overwrite.
    try:
        existing_row = worksheet.row_values(target_row)
    except Exception:
        existing_row = []
    if existing_row and any(cell.strip() for cell in existing_row):
        # Row has data: insert the new rows below it.
        worksheet.insert_rows(rows_to_write, row=target_row + 1, value_input_option="USER_ENTERED")
        logger.debug(f"Inserted {len(rows_to_write)} rows below row {target_row}")
    else:
        # Row is empty: write directly starting at that row.
        worksheet.update(values=rows_to_write, range_name=f"A{target_row}", value_input_option="USER_ENTERED")
        logger.debug(f"Updated row {target_row}")
# ------------------ CSV Export ------------------
def export_csv(
    self,
    output_path: str | None = None,
    column_name: str | None = None,
    match_value: str = "SELECTED",
    case_insensitive: bool = True,
) -> Path:
    """
    Export the sheet (or filtered rows) to a CSV file.

    Args:
        output_path: Destination file; when None, writes
            ``sheet_export.csv`` inside the instance temp_dir.
        column_name: When given, only rows whose cell in this column
            matches ``match_value`` (via filter_rows) are exported.
        match_value: Value to match when filtering.
        case_insensitive: Passed through to filter_rows.

    Returns:
        Path to the written CSV file.
    """
    target = Path(output_path) if output_path else self.temp_dir / "sheet_export.csv"
    logger.debug("Exporting CSV to %s", target)
    if column_name:
        rows = self.filter_rows(
            column_name=column_name,
            match_value=match_value,
            case_insensitive=case_insensitive,
        )
    else:
        rows = self.get_all_values()
    target.parent.mkdir(parents=True, exist_ok=True)
    with target.open("w", newline="", encoding="utf-8") as f:
        csv.writer(f).writerows(rows)
    # Clamp to 0 so an empty sheet doesn't log "rows=-1".
    logger.debug("CSV export completed | rows=%d", max(len(rows) - 1, 0))
    return target
def delete_rows(self, row_indices: list[int]):
    """
    Delete multiple rows by their 1-based index.

    Deletion proceeds bottom-up so earlier deletes don't shift the
    indices of rows still waiting to be deleted. Failures are logged
    per-row and do not abort the remaining deletes.
    """
    if not row_indices:
        return
    pending = sorted(row_indices, reverse=True)
    logger.debug(f"Deleting {len(pending)} rows from sheet...")
    for row in pending:
        try:
            self.sheet.delete_rows(row)
            logger.debug(f"Deleted row {row}")
        except Exception as err:
            logger.error(f"Failed to delete row {row}: {err}")
# ------------------ CLI entrypoint ------------------
def main():
    """
    CLI entrypoint.

    Examples are controlled via env flags:
    - EXAMPLE_WRITE=true
    - EXAMPLE_FILTER_EXPORT=true
    """
    # Load a local .env when python-dotenv is available; optional dependency.
    try:
        from dotenv import load_dotenv
        load_dotenv()
    except ImportError:
        pass
    GoogleSheetReader(worksheet_name=get_config_value("video_library_gsheet_worksheet"))


if __name__ == "__main__":
    main()