JairoDanielMT's picture
Add receipt image registration with category expiry rules
9218640
from __future__ import annotations
import httpx
from ..config import settings
def _api_url(path: str) -> str:
return f"https://api.telegram.org/bot{settings.telegram_bot_token}/{path}"
async def send_message(chat_id: str | int, text: str) -> None:
async with httpx.AsyncClient(timeout=30) as client:
response = await client.post(_api_url("sendMessage"), json={"chat_id": chat_id, "text": text})
response.raise_for_status()
async def get_file_url(file_id: str) -> str:
async with httpx.AsyncClient(timeout=30) as client:
response = await client.get(_api_url("getFile"), params={"file_id": file_id})
response.raise_for_status()
file_path = response.json()["result"]["file_path"]
return f"https://api.telegram.org/file/bot{settings.telegram_bot_token}/{file_path}"
async def download_file(file_url: str) -> bytes:
async with httpx.AsyncClient(timeout=120) as client:
response = await client.get(file_url)
response.raise_for_status()
return response.content
def guess_photo_media_type(file_path: str | None = None) -> str:
if file_path and file_path.lower().endswith(".png"):
return "image/png"
return "image/jpeg"
async def set_webhook() -> dict:
async with httpx.AsyncClient(timeout=30) as client:
response = await client.post(
_api_url("setWebhook"),
json={
"url": f"{settings.app_base_url}/telegram/webhook",
"secret_token": settings.telegram_webhook_secret or None,
},
)
response.raise_for_status()
return response.json()