"""FastAPI service that posts open, not-yet-overdue ClickUp tasks to Telegram."""

import os
from collections import defaultdict
from datetime import datetime

import pytz
import requests
from dotenv import load_dotenv
from fastapi import FastAPI

load_dotenv()

app = FastAPI()

# Credentials and the target chat are read from the environment (.env supported).
clickup_api = os.getenv('CLICKUP_API_KEY')
telegram_bot_token = os.getenv('TELEGRAM_BOT_TOKEN')
telegram_chat_id = os.getenv('TELEGRAM_CHAT_ID')

TIMEZONE = 'Asia/Dhaka'

# ClickUp lists to report on, with the heading used for each Telegram message.
LISTS = [
    {"id": '901800830748', "title": "*Developer Task Board:*"},
    {"id": '901800835985', "title": "*Designer Task Board:*"},
]

# Seconds to wait on any single HTTP call before giving up; without a timeout
# `requests` can block forever on a stalled connection.
REQUEST_TIMEOUT = 15

# Upper bound on pages fetched per list, so a misbehaving API cannot loop forever.
MAX_PAGES = 50

# Status names (lower-cased) that mark a task as finished and therefore skipped.
_CLOSED_STATUSES = frozenset({"done", "complete", "completed"})

_TZ = pytz.timezone(TIMEZONE)


def _status_name(task, default):
    """Return the task's status label, or *default* when it is missing or null."""
    status = task.get("status") or {}
    return status.get("status") or default


def _due_date(due_ts):
    """Convert a millisecond epoch timestamp into a datetime in TIMEZONE."""
    return datetime.fromtimestamp(int(due_ts) / 1000, _TZ)


def _fetch_all_tasks(list_id):
    """Download every task of a ClickUp list, following pagination."""
    headers = {"Authorization": clickup_api}
    url = f"https://api.clickup.com/api/v2/list/{list_id}/task"

    tasks = []
    for page in range(MAX_PAGES):
        resp = requests.get(
            url,
            headers=headers,
            params={"page": page},
            timeout=REQUEST_TIMEOUT,
        )
        # Fail loudly: an error body has no "tasks" key and would otherwise be
        # reported to the chat as "no tasks".
        resp.raise_for_status()
        data = resp.json()

        batch = data.get("tasks", [])
        tasks.extend(batch)
        # If the API does not report `last_page`, stop after the first page.
        if not batch or data.get("last_page", True):
            break
    return tasks


def get_tasks(list_id):
    """Return the open tasks of a ClickUp list that are due today or later.

    Tasks without a due date, and tasks whose status is one of
    ``done``/``complete``/``completed`` (case-insensitive), are excluded.
    "Today" is evaluated in TIMEZONE.

    Args:
        list_id: ClickUp list identifier.

    Returns:
        A list of task dicts as returned by the ClickUp API.

    Raises:
        requests.RequestException: on network failure, timeout, or a non-2xx
            response from ClickUp.
    """
    today = datetime.now(_TZ).date()

    filtered_tasks = []
    for task in _fetch_all_tasks(list_id):
        if _status_name(task, "").lower() in _CLOSED_STATUSES:
            continue
        due_ts = task.get("due_date")
        if not due_ts:
            continue
        if _due_date(due_ts).date() >= today:
            filtered_tasks.append(task)
    return filtered_tasks


def send_to_telegram(message):
    """Send *message* to the configured Telegram chat using Markdown parsing.

    Args:
        message: Text to send.

    Returns:
        The ``requests.Response`` from the Telegram ``sendMessage`` call. The
        HTTP status is not checked here; callers inspect the response.

    Raises:
        requests.RequestException: on network failure or timeout.
    """
    url = f"https://api.telegram.org/bot{telegram_bot_token}/sendMessage"
    payload = {
        "chat_id": telegram_chat_id,
        "text": message,
        "parse_mode": "Markdown",
    }
    return requests.post(url, data=payload, timeout=REQUEST_TIMEOUT)


def format_and_send(list_id, board_title):
    """Build the per-assignee task summary for one board and send it.

    Args:
        list_id: ClickUp list identifier.
        board_title: Heading placed at the top of the message.

    Returns:
        The ``requests.Response`` from Telegram.

    Raises:
        requests.RequestException: propagated from the ClickUp or Telegram call.
    """
    tasks = get_tasks(list_id)
    if not tasks:
        return send_to_telegram(f"{board_title}\nNo tasks due today!")

    today_str = datetime.now(_TZ).strftime("%B %d, %Y")

    # Group formatted task lines by assignee; a task with several assignees is
    # listed under each of them. Insertion order is preserved.
    # NOTE(review): names/statuses are interpolated into Markdown unescaped;
    # presumably characters such as "_" or "*" can make Telegram reject the
    # message - confirm against the Bot API formatting rules.
    task_map = defaultdict(list)
    for task in tasks:
        task_name = task.get("name", "No title")
        task_url = task.get("url", "")
        due_ts = task.get("due_date")
        status = _status_name(task, "Unknown")
        due_str = _due_date(due_ts).strftime("%B %d, %Y") if due_ts else "No due time"
        line = f"- [{task_name}]({task_url}) ({due_str}) - *{status}*"

        assignees = task.get("assignees", []) or [{"name": "Unassigned"}]
        for a in assignees:
            name = a.get("name") or a.get("username") or a.get("email") or "Unassigned"
            task_map[name].append(line)

    parts = [f"{board_title} ({today_str})\n"]
    for name, items in task_map.items():
        parts.append(f"\n*{name}* is assigned at:\n" + "\n".join(items) + "\n")
    parts.append("\n")
    return send_to_telegram("".join(parts))


@app.get("/")
def run_notifier():
    """Send one Telegram summary per configured board.

    Returns:
        A dict with ``status`` set to ``"sent"`` and ``results`` holding the
        decoded JSON body of each Telegram response, in LISTS order.
    """
    results = []
    for board in LISTS:
        resp = format_and_send(board["id"], board["title"])
        results.append(resp.json())
    return {"status": "sent", "results": results}