Spaces:
Runtime error
Runtime error
| import asyncio | |
| import os | |
| from concurrent.futures import ThreadPoolExecutor | |
| from datetime import datetime | |
| import gspread | |
| class GoogleSheetClient: | |
| """ | |
| A client for interacting with Google Sheets using gspread. | |
| Methods | |
| ------- | |
| __aenter__(): | |
| Asynchronous context manager entry. | |
| __aexit__(exc_type, exc_val, exc_tb): | |
| Asynchronous context manager exit. | |
| _get_content_delivery_data(): | |
| Asynchronously retrieves all records from the worksheet. | |
| update_worksheet(data: dict): | |
| Asynchronously updates the worksheet with the provided data. | |
| get_undelivered_content(): | |
| Asynchronously retrieves records that have not been marked as delivered. | |
| """ | |
| def __init__(self): | |
| googleSheetClient = gspread.service_account( | |
| filename=os.environ.get("SERVICE_ACCOUNT_CREDENTIAL_FILE_PATH") | |
| ) | |
| workbook = googleSheetClient.open_by_key(os.environ.get("SPREAD_SHEET_ID")) | |
| self.worksheet = workbook.worksheet(os.environ.get("WORKSHEET_NAME")) | |
| async def __aenter__(self): | |
| return self | |
| async def __aexit__(self, exc_type, exc_val, exc_tb): | |
| pass | |
| async def _get_content_delivery_data(self): | |
| loop = asyncio.get_event_loop() | |
| with ThreadPoolExecutor() as pool: | |
| records = await loop.run_in_executor( | |
| pool, lambda: self.worksheet.get_all_records() | |
| ) | |
| return records | |
| async def update_worksheet(self, data: dict): | |
| loop = asyncio.get_event_loop() | |
| with ThreadPoolExecutor() as pool: | |
| list_of_scheduled_content = await self._get_content_delivery_data() | |
| for item in list_of_scheduled_content: | |
| data_time = datetime.strptime( | |
| f"{data['Date']} {data['Time']}", "%m-%d-%Y %H:%M" | |
| ) | |
| item_time = datetime.strptime( | |
| f"{item['Date']} {item['Time']}", "%m-%d-%Y %H:%M" | |
| ) | |
| if item["Message"] == data["Message"] and item_time == data_time: | |
| list_of_scheduled_content[list_of_scheduled_content.index(item)][ | |
| "Delivered" | |
| ] = "TRUE" | |
| values = list(map(lambda x: list(x.values()), list_of_scheduled_content)) | |
| header = ( | |
| await loop.run_in_executor(pool, lambda: self.worksheet.row_values(1)) | |
| if self.worksheet.row_count > 0 | |
| else [] | |
| ) | |
| update_values = [header] | |
| update_values.extend(values) | |
| res = loop.run_in_executor( | |
| pool, lambda: self.worksheet.update(values=update_values, raw=False) | |
| ) | |
| return res | |
| async def get_undelivered_content(self): | |
| records = await self._get_content_delivery_data() | |
| return list(filter(lambda x: x["Delivered"] != "TRUE", records)) | |