Clickup-Notifier / main.py
notionhive-ai's picture
Update main.py
3af2c8f verified
from fastapi import FastAPI
import os, requests
from datetime import datetime
import pytz
from dotenv import load_dotenv
# Load variables via python-dotenv before the os.getenv() reads below.
load_dotenv()
# FastAPI application object; routes are registered on it with @app.get.
app = FastAPI()
# Credentials and target chat, read from the environment. Each is None when
# the corresponding variable is not set.
clickup_api = os.getenv('CLICKUP_API_KEY')
telegram_bot_token = os.getenv('TELEGRAM_BOT_TOKEN')
telegram_chat_id = os.getenv('TELEGRAM_CHAT_ID')
# Timezone name (passed to pytz.timezone) used for "today" and for rendering
# due dates.
TIMEZONE = 'Asia/Dhaka'
# ClickUp lists to report on. "id" is interpolated into the ClickUp list URL;
# "title" becomes the message heading (the asterisks are Markdown markup, as
# messages are sent with parse_mode "Markdown").
LISTS = [
{"id": '901800830748', "title": "*Developer Task Board:*"},
{"id": '901800835985', "title": "*Designer Task Board:*"}
]
def get_tasks(list_id):
    """Fetch the open tasks of a ClickUp list that are due today or later.

    Args:
        list_id: ID of the ClickUp list to query.

    Returns:
        A list of task dicts, exactly as returned by the ClickUp API, whose
        status is not done/complete/completed and whose due date (evaluated
        in ``TIMEZONE``) is today or in the future. Tasks without a due date
        are skipped.

    Raises:
        requests.HTTPError: If ClickUp answers with a 4xx/5xx status.
        requests.RequestException: On connection failures or timeouts.
        ValueError: If the response body is not valid JSON.
    """
    headers = {"Authorization": clickup_api}
    url = f"https://api.clickup.com/api/v2/list/{list_id}/task"
    closed_statuses = {"done", "complete", "completed"}

    tz = pytz.timezone(TIMEZONE)
    today = datetime.now(tz).date()

    filtered_tasks = []
    page = 0
    while True:
        # A timeout keeps a stalled connection from blocking the worker
        # forever; requests waits indefinitely by default.
        resp = requests.get(
            url, headers=headers, params={"page": page}, timeout=30
        )
        # Fail loudly on auth/rate-limit errors: their JSON body has no
        # "tasks" key and would otherwise be reported as "no tasks".
        resp.raise_for_status()
        data = resp.json()
        tasks = data.get("tasks") or []

        for task in tasks:
            # Tolerate a missing or null status object / status name.
            status = ((task.get("status") or {}).get("status") or "").lower()
            if status in closed_statuses:
                continue
            due_ts = task.get("due_date")
            if not due_ts:
                continue
            # due_date is a millisecond Unix timestamp.
            due_dt = datetime.fromtimestamp(int(due_ts) / 1000, tz).date()
            if due_dt >= today:
                filtered_tasks.append(task)

        # The endpoint is paginated; keep going until ClickUp flags the last
        # page. A missing flag or an empty page also stops the loop so it
        # can never spin forever.
        if not tasks or data.get("last_page", True):
            break
        page += 1

    return filtered_tasks
def send_to_telegram(message):
    """Send *message* to the configured Telegram chat via the Bot API.

    The text is sent with ``parse_mode`` set to ``"Markdown"``, so Markdown
    markup inside *message* is interpreted by Telegram.

    Args:
        message: Text of the message to send.

    Returns:
        The ``requests.Response`` of the ``sendMessage`` call. The HTTP
        status is not checked here; callers inspect the response themselves.

    Raises:
        requests.RequestException: On connection failures or timeouts.
    """
    url = f"https://api.telegram.org/bot{telegram_bot_token}/sendMessage"
    payload = {
        "chat_id": telegram_chat_id,
        "text": message,
        "parse_mode": "Markdown",
    }
    # Without a timeout requests waits indefinitely, which would hang the
    # HTTP handler that triggered the notification.
    return requests.post(url, data=payload, timeout=30)
def format_and_send(list_id, board_title):
    """Build the per-assignee task digest for one board and send it.

    Tasks come from ``get_tasks(list_id)``. When there are none, a short
    "No tasks due today!" notice is sent instead. Otherwise each task is
    rendered as a Markdown bullet (linked name, due date, status) and listed
    under every one of its assignees; task and assignee names are
    interpolated into the Markdown text as-is.

    Args:
        list_id: ID of the ClickUp list to report on.
        board_title: Heading placed at the top of the message.

    Returns:
        Whatever ``send_to_telegram`` returns for the message that was sent.
    """
    due_tasks = get_tasks(list_id)
    if not due_tasks:
        return send_to_telegram(f"{board_title}\nNo tasks due today!")

    zone = pytz.timezone(TIMEZONE)
    date_format = "%B %d, %Y"
    header_date = datetime.now(zone).strftime(date_format)

    # Assignee display name -> bullet lines, in first-seen order.
    lines_by_assignee = {}
    for item in due_tasks:
        title = item.get("name", "No title")
        link = item.get("url", "")
        raw_due = item.get("due_date")
        state = item.get("status", {}).get("status", "Unknown")

        if raw_due:
            # due_date is a millisecond Unix timestamp.
            due_moment = datetime.fromtimestamp(int(raw_due) / 1000, zone)
            due_label = due_moment.strftime(date_format)
        else:
            due_label = "No due time"

        bullet = f"- [{title}]({link}) ({due_label}) - *{state}*"

        # A task with no assignees is filed under a synthetic "Unassigned".
        people = item.get("assignees", []) or [{"name": "Unassigned"}]
        for person in people:
            who = (
                person.get("name")
                or person.get("username")
                or person.get("email")
                or "Unassigned"
            )
            if who not in lines_by_assignee:
                lines_by_assignee[who] = []
            lines_by_assignee[who].append(bullet)

    parts = [f"{board_title} ({header_date})\n"]
    for who, bullets in lines_by_assignee.items():
        parts.append(f"\n*{who}* is assigned at:\n" + "\n".join(bullets) + "\n")
    parts.append("\n")
    return send_to_telegram("".join(parts))
@app.get("/")
def run_notifier():
    """Send the task digest for every board in ``LISTS``.

    Each board is processed independently, so a failure on one board does
    not prevent the remaining boards from being notified.

    Returns:
        A dict with:
          * ``"status"``: ``"sent"`` when every board's Telegram call
            reported ``ok``, ``"partial"`` when only some did, and
            ``"failed"`` when none did.
          * ``"results"``: one entry per board, in ``LISTS`` order. An entry
            is the decoded Telegram response, or
            ``{"ok": False, "description": <exception type name>}`` when the
            request raised or the response was not JSON.
    """
    results = []
    for board in LISTS:
        try:
            resp = format_and_send(board["id"], board["title"])
            result = resp.json()
        except (requests.RequestException, ValueError) as exc:
            # Report only the exception type: requests' error messages can
            # include the request URL, and the Telegram URL contains the
            # bot token, which must not leak into an HTTP response.
            result = {"ok": False, "description": type(exc).__name__}
        results.append(result)

    # Telegram responses carry a boolean "ok" field; count the successes.
    ok_count = sum(
        1 for item in results if isinstance(item, dict) and item.get("ok")
    )
    if ok_count == len(results):
        status = "sent"
    elif ok_count:
        status = "partial"
    else:
        status = "failed"
    return {"status": status, "results": results}