import os
import threading
from datetime import datetime, timezone
from typing import Optional

import pytz
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.date import DateTrigger

from bot.monitor import fetch_latest_videos
from bot.downloader import download_video
from bot.uploader import upload_to_tiktok
from bot.logger import get_logger

logger = get_logger("scheduler")

MMT = pytz.timezone("Asia/Yangon")

# Poll interval (seconds)
POLL_INTERVAL = 150  # 2.5 minutes

# Timestamp layout shared by the log/queue/status displays.
_TS_FORMAT = "%Y-%m-%d %H:%M:%S"

# In-memory log buffer limits: lines retained / lines returned by get_logs().
_MAX_LOG_LINES = 200
_LOG_TAIL_LINES = 50


class BotScheduler:
    """Poll for new videos and run a download + upload job for each one.

    A background APScheduler instance runs ``_poll_channel`` every
    ``POLL_INTERVAL`` seconds. Each video returned by ``fetch_latest_videos``
    gets a one-shot job that calls ``_process_video`` either immediately (if
    its publish time has passed) or at its publish time.
    """

    def __init__(self):
        self.scheduler = BackgroundScheduler(timezone=MMT)
        # IDs of videos that have already been picked up by a poll.
        self.seen_ids: set[str] = set()
        # Metadata of scheduled jobs: video_id, title, scheduled_time, status.
        self.queue: list[dict] = []
        # Guards seen_ids, queue and log_buffer.
        self.lock = threading.Lock()
        self.running = False
        # Most recent timestamped log lines (capped in _log).
        self.log_buffer: list[str] = []

    # ──────────────────────────────────────────────
    # Public control
    # ──────────────────────────────────────────────

    def start(self) -> None:
        """Start the scheduler and register the polling job.

        Does nothing if the bot is already running.
        """
        if self.running:
            return
        self.running = True
        self.scheduler.start()

        # Poll job: run every POLL_INTERVAL seconds
        self.scheduler.add_job(
            self._poll_channel,
            "interval",
            seconds=POLL_INTERVAL,
            id="poll_channel",
            next_run_time=datetime.now(tz=MMT),  # run immediately on start
        )
        self._log("Bot started ✅")
        logger.info("Bot started")

    def stop(self) -> None:
        """Shut the scheduler down without waiting for running jobs.

        Does nothing if the bot is not running.
        """
        if not self.running:
            return
        self.running = False
        self.scheduler.shutdown(wait=False)
        # Reset: a fresh scheduler instance is used by the next start().
        # NOTE(review): jobs still waiting on the old scheduler are dropped
        # here, but their queue entries keep the "pending" status and their
        # ids stay in seen_ids — confirm this is the intended behaviour.
        self.scheduler = BackgroundScheduler(timezone=MMT)
        self._log("Bot stopped 🛑")
        logger.info("Bot stopped")

    # ──────────────────────────────────────────────
    # Internal
    # ──────────────────────────────────────────────

    def _poll_channel(self) -> None:
        """Check the RSS feed and schedule every newly found video."""
        self._log("🔍 Checking RSS feed...")
        new_videos = fetch_latest_videos(self.seen_ids)

        for video in new_videos:
            vid_id = video["video_id"]
            title = video["title"]

            with self.lock:
                self.seen_ids.add(vid_id)

            pub_mmt: datetime = video["published_mmt"]
            now_mmt = datetime.now(tz=MMT)

            if pub_mmt <= now_mmt:
                # Publish time has already passed: upload right away.
                run_date = now_mmt
                self._log(f"⚡ Immediate upload (past time): {title}")
            else:
                # Schedule at the original publish time.
                run_date = pub_mmt
                self._log(
                    f"📅 Scheduled: '{title}' @ "
                    f"{pub_mmt.strftime(_TS_FORMAT)} MMT"
                )

            # Register the queue entry BEFORE the job exists; an immediate
            # job could otherwise report its status before the entry is
            # there, leaving the entry stuck on "pending".
            with self.lock:
                self.queue.append({
                    "video_id": vid_id,
                    "title": title,
                    "scheduled_time": pub_mmt,
                    "status": "pending",
                })

            self.scheduler.add_job(
                self._process_video,
                trigger=DateTrigger(run_date=run_date),
                args=[video],
                id=f"upload_{vid_id}",
                replace_existing=True,
                # Never drop an upload just because the executor was busy
                # past the default misfire grace period.
                misfire_grace_time=None,
            )

    def _process_video(self, video: dict) -> None:
        """Run the download + upload pipeline for one video.

        The queue entry's status becomes "uploaded", "download_failed" or
        "upload_failed". Exceptions raised by the downloader or uploader are
        recorded as failures and then re-raised to the scheduler. The
        downloaded file is removed whether or not the upload succeeded.
        """
        vid_id = video["video_id"]
        title = video["title"]

        self._log(f"⬇️ Downloading: {title}")
        try:
            file_path = download_video(video)
        except Exception:
            self._mark_failed(video, "download")
            raise
        if not file_path:
            self._mark_failed(video, "download")
            return

        self._log(f"⬆️ Uploading to TikTok: {title}")
        try:
            success = upload_to_tiktok(video, file_path)
        except Exception:
            # Do not leave the entry on "pending" when the uploader blows up.
            self._mark_failed(video, "upload")
            raise
        else:
            if success:
                self._log(f"✅ Upload success: {title}")
                self._update_status(vid_id, "uploaded")
            else:
                self._mark_failed(video, "upload")
        finally:
            # Clean up downloaded file
            self._remove_file(file_path)

    def _mark_failed(self, video: dict, stage: str) -> None:
        """Log a failure for *stage* ("download" or "upload") and record it."""
        self._log(f"❌ {stage.capitalize()} failed: {video['title']}")
        self._update_status(video["video_id"], f"{stage}_failed")

    def _remove_file(self, file_path) -> None:
        """Best-effort removal of a downloaded file; never raises OSError."""
        try:
            os.remove(file_path)
        except FileNotFoundError:
            pass  # already gone, nothing to clean up
        except OSError as exc:
            self._log(f"⚠️ Could not remove {file_path}: {exc}")

    def _update_status(self, vid_id: str, status: str) -> None:
        """Set *status* on the first queue entry whose id is *vid_id*."""
        with self.lock:
            for item in self.queue:
                if item["video_id"] == vid_id:
                    item["status"] = status
                    break

    def _log(self, msg: str) -> None:
        """Send *msg* to the logger and append a timestamped copy to the buffer."""
        ts = datetime.now(tz=MMT).strftime("%H:%M:%S")
        line = f"[{ts}] {msg}"
        logger.info(msg)
        with self.lock:
            self.log_buffer.append(line)
            if len(self.log_buffer) > _MAX_LOG_LINES:
                self.log_buffer = self.log_buffer[-_MAX_LOG_LINES:]

    # ──────────────────────────────────────────────
    # Status helpers (for the Gradio UI)
    # ──────────────────────────────────────────────

    def get_queue_display(self) -> str:
        """Return every queue entry, one per line, ordered by scheduled time."""
        with self.lock:
            if not self.queue:
                return "Queue ထဲတွင် video မရှိသေး"
            lines = []
            for item in sorted(self.queue, key=lambda x: x["scheduled_time"]):
                t = item["scheduled_time"].strftime(_TS_FORMAT)
                lines.append(
                    f"[{item['status'].upper()}] {t} MMT — {item['title'][:60]}"
                )
        return "\n".join(lines)

    def get_upcoming_display(self) -> str:
        """Return up to ten pending entries whose scheduled time is in the future."""
        now_mmt = datetime.now(tz=MMT)
        with self.lock:
            upcoming = [
                item for item in self.queue
                if item["status"] == "pending"
                and item["scheduled_time"] > now_mmt
            ]
        if not upcoming:
            return "Upcoming schedule မရှိသေး"
        upcoming.sort(key=lambda x: x["scheduled_time"])
        lines = []
        for item in upcoming[:10]:
            t = item["scheduled_time"].strftime(_TS_FORMAT)
            lines.append(f"🕐 {t} MMT — {item['title'][:60]}")
        return "\n".join(lines)

    def get_logs(self) -> str:
        """Return the most recent buffered log lines joined by newlines."""
        with self.lock:
            return "\n".join(self.log_buffer[-_LOG_TAIL_LINES:])

    def get_status_summary(self) -> str:
        """Return a multi-line summary: run state, current time, queue counts."""
        with self.lock:
            total = len(self.queue)
            pending = sum(1 for i in self.queue if i["status"] == "pending")
            uploaded = sum(1 for i in self.queue if i["status"] == "uploaded")
            # Counts both "download_failed" and "upload_failed".
            failed = sum(1 for i in self.queue if "failed" in i["status"])

        state = "🟢 Running" if self.running else "🔴 Stopped"
        now = datetime.now(tz=MMT).strftime(_TS_FORMAT)
        return (
            f"Bot Status: {state}\n"
            f"Current Time (MMT): {now}\n"
            f"Total Queued: {total} | Pending: {pending} | "
            f"Uploaded: {uploaded} | Failed: {failed}\n"
            f"Poll Interval: {POLL_INTERVAL}s"
        )