ttup / scheduler.py
Phoe2004's picture
Upload 8 files
9b2524c verified
import os
import threading
from datetime import datetime, timezone
from typing import Optional
import pytz
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.date import DateTrigger
from bot.monitor import fetch_latest_videos
from bot.downloader import download_video
from bot.uploader import upload_to_tiktok
from bot.logger import get_logger
logger = get_logger("scheduler")
MMT = pytz.timezone("Asia/Yangon")
# Poll interval (seconds)
POLL_INTERVAL = 150 # 2.5 minutes
class BotScheduler:
def __init__(self):
self.scheduler = BackgroundScheduler(timezone=MMT)
self.seen_ids: set = set()
self.queue: list[dict] = [] # scheduled jobs metadata
self.lock = threading.Lock()
self.running = False
self.log_buffer: list[str] = []
# โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
# Public control
# โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
def start(self):
if self.running:
return
self.running = True
self.scheduler.start()
# Poll job: run every POLL_INTERVAL seconds
self.scheduler.add_job(
self._poll_channel,
"interval",
seconds=POLL_INTERVAL,
id="poll_channel",
next_run_time=datetime.now(tz=MMT), # run immediately on start
)
self._log("Bot started โœ…")
logger.info("Bot started")
def stop(self):
if not self.running:
return
self.running = False
self.scheduler.shutdown(wait=False)
self.scheduler = BackgroundScheduler(timezone=MMT) # reset
self._log("Bot stopped ๐Ÿ›‘")
logger.info("Bot stopped")
# โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
# Internal
# โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
def _poll_channel(self):
"""RSS แ€€แ€ญแ€ฏ แ€…แ€…แ€บแ€†แ€ฑแ€ธแ€•แ€ผแ€ฎแ€ธ แ€กแ€žแ€…แ€บแ€แ€„แ€บแ€žแ€ฑแ€ฌ video แ€™แ€ปแ€ฌแ€ธแ€€แ€ญแ€ฏ schedule แ€œแ€ฏแ€•แ€บแ€žแ€Šแ€บ"""
self._log("๐Ÿ” Checking RSS feed...")
new_videos = fetch_latest_videos(self.seen_ids)
for video in new_videos:
with self.lock:
self.seen_ids.add(video["video_id"])
pub_mmt: datetime = video["published_mmt"]
now_mmt = datetime.now(tz=MMT)
# แ€กแ€แ€ปแ€ญแ€”แ€บแ€€แ€ปแ€ฑแ€ฌแ€บแ€žแ€ฝแ€ฌแ€ธแ€•แ€ผแ€ฎแ€†แ€ญแ€ฏแ€œแ€ปแ€พแ€„แ€บ แ€แ€ปแ€€แ€บแ€แ€ปแ€„แ€บแ€ธแ€แ€„แ€บ
if pub_mmt <= now_mmt:
self._log(f"โšก Immediate upload (past time): {video['title']}")
self.scheduler.add_job(
self._process_video,
trigger=DateTrigger(run_date=now_mmt),
args=[video],
id=f"upload_{video['video_id']}",
replace_existing=True,
)
else:
# Original publish time แ€กแ€แ€ญแ€ฏแ€„แ€บแ€ธ schedule
self._log(
f"๐Ÿ“… Scheduled: '{video['title']}' @ "
f"{pub_mmt.strftime('%Y-%m-%d %H:%M:%S')} MMT"
)
self.scheduler.add_job(
self._process_video,
trigger=DateTrigger(run_date=pub_mmt),
args=[video],
id=f"upload_{video['video_id']}",
replace_existing=True,
)
with self.lock:
self.queue.append({
"video_id": video["video_id"],
"title": video["title"],
"scheduled_time": pub_mmt,
"status": "pending",
})
def _process_video(self, video: dict):
"""Download + Upload pipeline"""
vid_id = video["video_id"]
self._log(f"โฌ‡๏ธ Downloading: {video['title']}")
file_path = download_video(video)
if not file_path:
self._log(f"โŒ Download failed: {video['title']}")
self._update_status(vid_id, "download_failed")
return
self._log(f"โฌ†๏ธ Uploading to TikTok: {video['title']}")
success = upload_to_tiktok(video, file_path)
if success:
self._log(f"โœ… Upload success: {video['title']}")
self._update_status(vid_id, "uploaded")
else:
self._log(f"โŒ Upload failed: {video['title']}")
self._update_status(vid_id, "upload_failed")
# Clean up downloaded file
try:
os.remove(file_path)
except Exception:
pass
def _update_status(self, vid_id: str, status: str):
with self.lock:
for item in self.queue:
if item["video_id"] == vid_id:
item["status"] = status
break
def _log(self, msg: str):
ts = datetime.now(tz=MMT).strftime("%H:%M:%S")
line = f"[{ts}] {msg}"
logger.info(msg)
with self.lock:
self.log_buffer.append(line)
if len(self.log_buffer) > 200:
self.log_buffer = self.log_buffer[-200:]
# โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
# Status helpers (Gradio UI แ€กแ€แ€ฝแ€€แ€บ)
# โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
def get_queue_display(self) -> str:
with self.lock:
if not self.queue:
return "Queue แ€‘แ€ฒแ€แ€ฝแ€„แ€บ video แ€™แ€›แ€พแ€ญแ€žแ€ฑแ€ธ"
lines = []
for item in sorted(self.queue, key=lambda x: x["scheduled_time"]):
t = item["scheduled_time"].strftime("%Y-%m-%d %H:%M:%S")
lines.append(
f"[{item['status'].upper()}] {t} MMT โ€” {item['title'][:60]}"
)
return "\n".join(lines)
def get_upcoming_display(self) -> str:
now_mmt = datetime.now(tz=MMT)
with self.lock:
upcoming = [
item for item in self.queue
if item["status"] == "pending" and item["scheduled_time"] > now_mmt
]
if not upcoming:
return "Upcoming schedule แ€™แ€›แ€พแ€ญแ€žแ€ฑแ€ธ"
upcoming.sort(key=lambda x: x["scheduled_time"])
lines = []
for item in upcoming[:10]:
t = item["scheduled_time"].strftime("%Y-%m-%d %H:%M:%S")
lines.append(f"๐Ÿ• {t} MMT โ€” {item['title'][:60]}")
return "\n".join(lines)
def get_logs(self) -> str:
with self.lock:
return "\n".join(self.log_buffer[-50:])
def get_status_summary(self) -> str:
with self.lock:
total = len(self.queue)
pending = sum(1 for i in self.queue if i["status"] == "pending")
uploaded = sum(1 for i in self.queue if i["status"] == "uploaded")
failed = sum(1 for i in self.queue if "failed" in i["status"])
state = "๐ŸŸข Running" if self.running else "๐Ÿ”ด Stopped"
now = datetime.now(tz=MMT).strftime("%Y-%m-%d %H:%M:%S")
return (
f"Bot Status: {state}\n"
f"Current Time (MMT): {now}\n"
f"Total Queued: {total} | Pending: {pending} | "
f"Uploaded: {uploaded} | Failed: {failed}\n"
f"Poll Interval: {POLL_INTERVAL}s"
)