mandala-for-us / src /utils /_google_sheet_client.py
kanha-upadhyay's picture
add documentation comments
adb221d
import asyncio
import os
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
import gspread
class GoogleSheetClient:
"""
A client for interacting with Google Sheets using gspread.
Methods
-------
__aenter__():
Asynchronous context manager entry.
__aexit__(exc_type, exc_val, exc_tb):
Asynchronous context manager exit.
_get_content_delivery_data():
Asynchronously retrieves all records from the worksheet.
update_worksheet(data: dict):
Asynchronously updates the worksheet with the provided data.
get_undelivered_content():
Asynchronously retrieves records that have not been marked as delivered.
"""
def __init__(self):
googleSheetClient = gspread.service_account(
filename=os.environ.get("SERVICE_ACCOUNT_CREDENTIAL_FILE_PATH")
)
workbook = googleSheetClient.open_by_key(os.environ.get("SPREAD_SHEET_ID"))
self.worksheet = workbook.worksheet(os.environ.get("WORKSHEET_NAME"))
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
pass
async def _get_content_delivery_data(self):
loop = asyncio.get_event_loop()
with ThreadPoolExecutor() as pool:
records = await loop.run_in_executor(
pool, lambda: self.worksheet.get_all_records()
)
return records
async def update_worksheet(self, data: dict):
loop = asyncio.get_event_loop()
with ThreadPoolExecutor() as pool:
list_of_scheduled_content = await self._get_content_delivery_data()
for item in list_of_scheduled_content:
data_time = datetime.strptime(
f"{data['Date']} {data['Time']}", "%m-%d-%Y %H:%M"
)
item_time = datetime.strptime(
f"{item['Date']} {item['Time']}", "%m-%d-%Y %H:%M"
)
if item["Message"] == data["Message"] and item_time == data_time:
list_of_scheduled_content[list_of_scheduled_content.index(item)][
"Delivered"
] = "TRUE"
values = list(map(lambda x: list(x.values()), list_of_scheduled_content))
header = (
await loop.run_in_executor(pool, lambda: self.worksheet.row_values(1))
if self.worksheet.row_count > 0
else []
)
update_values = [header]
update_values.extend(values)
res = loop.run_in_executor(
pool, lambda: self.worksheet.update(values=update_values, raw=False)
)
return res
async def get_undelivered_content(self):
records = await self._get_content_delivery_data()
return list(filter(lambda x: x["Delivered"] != "TRUE", records))