import asyncio import os from concurrent.futures import ThreadPoolExecutor from datetime import datetime import gspread class GoogleSheetClient: """ A client for interacting with Google Sheets using gspread. Methods ------- __aenter__(): Asynchronous context manager entry. __aexit__(exc_type, exc_val, exc_tb): Asynchronous context manager exit. _get_content_delivery_data(): Asynchronously retrieves all records from the worksheet. update_worksheet(data: dict): Asynchronously updates the worksheet with the provided data. get_undelivered_content(): Asynchronously retrieves records that have not been marked as delivered. """ def __init__(self): googleSheetClient = gspread.service_account( filename=os.environ.get("SERVICE_ACCOUNT_CREDENTIAL_FILE_PATH") ) workbook = googleSheetClient.open_by_key(os.environ.get("SPREAD_SHEET_ID")) self.worksheet = workbook.worksheet(os.environ.get("WORKSHEET_NAME")) async def __aenter__(self): return self async def __aexit__(self, exc_type, exc_val, exc_tb): pass async def _get_content_delivery_data(self): loop = asyncio.get_event_loop() with ThreadPoolExecutor() as pool: records = await loop.run_in_executor( pool, lambda: self.worksheet.get_all_records() ) return records async def update_worksheet(self, data: dict): loop = asyncio.get_event_loop() with ThreadPoolExecutor() as pool: list_of_scheduled_content = await self._get_content_delivery_data() for item in list_of_scheduled_content: data_time = datetime.strptime( f"{data['Date']} {data['Time']}", "%m-%d-%Y %H:%M" ) item_time = datetime.strptime( f"{item['Date']} {item['Time']}", "%m-%d-%Y %H:%M" ) if item["Message"] == data["Message"] and item_time == data_time: list_of_scheduled_content[list_of_scheduled_content.index(item)][ "Delivered" ] = "TRUE" values = list(map(lambda x: list(x.values()), list_of_scheduled_content)) header = ( await loop.run_in_executor(pool, lambda: self.worksheet.row_values(1)) if self.worksheet.row_count > 0 else [] ) update_values = [header] update_values.extend(values) res = loop.run_in_executor( pool, lambda: self.worksheet.update(values=update_values, raw=False) ) return res async def get_undelivered_content(self): records = await self._get_content_delivery_data() return list(filter(lambda x: x["Delivered"] != "TRUE", records))