# Spaces: Paused (status banner left over from the hosting page scrape; not part of the module)
import asyncio
import datetime
import os
import random
import subprocess
import sys
from typing import List

from httpx import AsyncClient, HTTPError, Response

from .constants import Constants
from .db import Reminder, User, db_read_users
from .message_validations import ResponseToMessage
class Events:
    """Namespace of class-level helpers that drive the bot's background work.

    Every method is a classmethod: the class is never instantiated, it only
    groups the Telegram endpoints, the hourly trigger loop, message sending,
    webhook registration and the database archive upload.
    """

    # Read once at import time. The URLs below embed the token, so a missing
    # TELEGRAM_TOKEN produces ".../botNone/..." endpoints.
    TOKEN = os.environ.get("TELEGRAM_TOKEN")
    TELEGRAM_SEND_MESSAGE_URL = f"https://api.telegram.org/bot{TOKEN}/sendMessage"
    TELEGRAM_SET_WEBHOOK_URL = f"https://api.telegram.org/bot{TOKEN}/setWebhook"
    TELEGRAM_SEND_DOCUMENT_URL = f"https://api.telegram.org/bot{TOKEN}/sendDocument"
    PORT = 8000
    HOST_URL = None
    SELF_SIGNED = False

    # Strong references to fire-and-forget tasks. The event loop only keeps
    # weak references, so an unreferenced task may be garbage-collected
    # before it finishes.
    _background_tasks: set = set()

    @classmethod
    async def main_event(cls) -> None:
        """
        Main Event Loop

        Runs forever: sleeps until the next full hour, then POSTs to the
        local ``trigger_send_user_hourly_memories`` and ``trigger_archive_db``
        endpoints. A failed request is logged and the loop keeps running, so
        one transient error does not stop all future hourly triggers.
        """
        trigger_paths = ("trigger_send_user_hourly_memories", "trigger_archive_db")
        while True:
            await asyncio.sleep(cls.get_time_until_next_hour())
            try:
                async with AsyncClient() as client:
                    for path in trigger_paths:
                        # NOTE(review): 0.0.0.0 as a *destination* address is
                        # platform dependent — confirm or switch to 127.0.0.1.
                        endpoint = f"http://0.0.0.0:{cls.PORT}/{path}/{cls.TOKEN}"
                        await client.post(url=endpoint)
            except HTTPError as err:
                print(f"main_event: hourly trigger request failed: {err!r}")

    @classmethod
    def get_time_until_next_hour(cls) -> float:
        """Return the number of seconds from now until the next full hour.

        The result is in the range (0, 3600]; it is exactly 3600 when called
        precisely on the hour.
        """
        # Ref: https://stackoverflow.com/a/52808375/15282482
        now = datetime.datetime.now()
        next_hour = (now + datetime.timedelta(hours=1)).replace(
            microsecond=0, second=0, minute=0
        )
        return (next_hour - now).total_seconds()

    @classmethod
    def send_user_hourly_memories(
        cls,
        user: User,
        hour: int,
    ) -> None:
        """
        Sends memories to user if the current_hour is in his schedule.

        ``hour`` is shifted by ``user.gmt`` and compared with the
        comma-separated ``user.scheduled_hours``. One randomly chosen reminder
        is sent for each matching entry, capped at the number of reminders the
        user has. Sending happens in background tasks, so this must be called
        while an event loop is running whenever at least one reminder is due.
        """
        hour = (hour + user.gmt) % 24
        if user.scheduled_hours == "":
            return

        # Count every matching entry; unlike an early ``break`` this does not
        # rely on the schedule being stored in sorted order.
        number_of_messages_at_this_hour = sum(
            int(str_hour) == hour
            for str_hour in user.scheduled_hours.split(",")
            if str_hour.strip()
        )
        number_of_messages_at_this_hour = min(
            len(user.reminders), number_of_messages_at_this_hour
        )

        selected_reminders = random.sample(
            user.reminders, number_of_messages_at_this_hour
        )
        for reminder in selected_reminders:  # Send the memory in background
            task = asyncio.create_task(
                cls.send_a_message_to_user(user.telegram_chat_id, reminder.reminder)
            )
            cls._background_tasks.add(task)
            task.add_done_callback(cls._background_tasks.discard)
            now = datetime.datetime.now()
            print(
                f"Created task to, {user.name}, {reminder.reminder}, hour: {hour}, gmt: {user.gmt}, now: {now}"
            )

    @classmethod
    async def send_message_list_at_background(
        cls, telegram_chat_id: int, message_list: List[str]
    ) -> bool:
        """Send each message in ``message_list`` to the chat, one after another.

        Always returns True; the result of the individual sends is not checked.
        """
        for message in message_list:
            print(f"sending the message: {message}, to chat: {telegram_chat_id} ")
            await cls.send_a_message_to_user(
                telegram_id=telegram_chat_id, message=message
            )
        return True

    @classmethod
    async def send_a_message_to_user(
        cls,
        telegram_id: int,
        message: str,
        retry_count: int = 3,
        sleep_time: float = 0.1,
    ) -> bool:
        """Send ``message`` to ``telegram_id``, retrying up to ``retry_count`` times.

        Returns True as soon as a request answers with status 200, and False
        when every attempt failed. On status 429 the method waits for the
        ``retry_after`` value reported in the response body before the next
        attempt; any other status is logged and retried immediately.
        """
        outgoing = ResponseToMessage(
            **{
                "text": message,
                "chat_id": telegram_id,
            }
        )
        # Avoid too many requests error from Telegram
        await asyncio.sleep(sleep_time)

        for retry in range(retry_count):
            print(f"Sending the message in send_a_message_to_user, count {retry}")
            response = await cls.request(cls.TELEGRAM_SEND_MESSAGE_URL, outgoing.dict())
            if response.status_code == 200:
                return True
            if response.status_code == 429:
                body = cls._response_body(response)
                try:
                    retry_after = int(body["parameters"]["retry_after"])
                except (KeyError, TypeError, ValueError):
                    # Body did not carry a usable retry_after; back off briefly.
                    retry_after = 1
                print(f"Retry After: {retry_after}, message: {outgoing}")
                await asyncio.sleep(retry_after)
            else:
                print(
                    f"Unhandled response code: {response.status_code}, response: {cls._response_body(response)}"
                )
        return False

    @classmethod
    async def broadcast_message(cls, message: str) -> None:
        """Send ``message`` concurrently to every user returned by ``db_read_users``.

        Users are read with ``limit=100000`` and ``only_active_users=False``.
        """
        users = db_read_users(limit=100000, only_active_users=False)
        sends = [
            cls.send_a_message_to_user(user.telegram_chat_id, message)
            for user in users
        ]
        await asyncio.gather(*sends)

    @staticmethod
    def _response_body(response: Response):
        """Return the decoded JSON body, or the raw text when it is not JSON."""
        try:
            return response.json()
        except ValueError:
            return response.text

    @classmethod
    async def request(cls, url: str, payload: dict, debug: bool = True) -> Response:
        """POST ``payload`` as JSON to ``url`` and return the response.

        Uses a 30-minute client timeout. When ``debug`` is true the response
        body is printed; a non-JSON body is printed as text instead of raising.
        """
        async with AsyncClient(timeout=30 * 60) as client:
            response = await client.post(url, json=payload)
            if debug:
                print(cls._response_body(response))
            return response

    @classmethod
    async def set_telegram_webhook_url(cls) -> bool:
        """Register ``{HOST_URL}/webhook/{TOKEN}`` as the bot's webhook.

        When ``SELF_SIGNED`` is set, the file named by the ``PEM_FILE``
        environment variable is uploaded alongside the URL as a multipart
        form field named ``certificate`` (a file object cannot be sent inside
        a JSON payload).
        NOTE(review): assumes setWebhook accepts the certificate as a
        multipart file upload — confirm against the Bot API docs.

        Returns True when the request answers with status 200, and False
        otherwise, including when ``PEM_FILE`` is required but not set.
        """
        print(f"webhook_url:{cls.HOST_URL}")
        webhook_url = f"{cls.HOST_URL}/webhook/{cls.TOKEN}"

        if not cls.SELF_SIGNED:
            response = await cls.request(
                cls.TELEGRAM_SET_WEBHOOK_URL, {"url": webhook_url}
            )
            return response.status_code == 200

        pem_path = os.environ.get("PEM_FILE")
        if not pem_path:
            print("PEM_FILE is not set, cannot upload the self-signed certificate")
            return False
        # Read the certificate up front so the file handle is always closed.
        with open(pem_path, "rb") as pem_file:
            certificate = pem_file.read()

        async with AsyncClient(timeout=30 * 60) as client:
            response = await client.post(
                cls.TELEGRAM_SET_WEBHOOK_URL,
                data={"url": webhook_url},
                files={"certificate": (os.path.basename(pem_path), certificate)},
            )
        print(cls._response_body(response))
        return response.status_code == 200

    @classmethod
    def archive_db(cls) -> bool:
        """Upload ``database.db`` to the broadcast chat through ``curl``.

        The command is passed as an argument list, so no shell parses the
        interpolated values. This call blocks until curl exits.

        Returns True when curl exits with status 0, and False otherwise,
        including when curl is not installed.
        """
        command = [
            "curl",
            "-v",
            "-F",
            f"chat_id={Constants.BROADCAST_CHAT_ID}",
            "-F",
            "document=@database.db",
            cls.TELEGRAM_SEND_DOCUMENT_URL,
        ]
        try:
            completed = subprocess.run(command, check=False)
        except FileNotFoundError:
            print("archive_db: curl executable not found")
            return False
        return completed.returncode == 0

    @classmethod
    async def get_public_ip(cls) -> str:
        """Return this host's public IP as reported by ipinfo.io.

        Exits the process through ``sys.exit`` when the lookup does not
        answer with status 200.
        """
        # Reference: https://pytutorial.com/python-get-public-ip
        endpoint = "https://ipinfo.io/json"

        async with AsyncClient() as client:
            response = await client.get(endpoint)

        if response.status_code != 200:
            sys.exit("Could not get the public ip, exiting!")

        return response.json()["ip"]