| import os |
| import threading |
| from datetime import datetime, timezone |
| from typing import Optional |
| import pytz |
| from apscheduler.schedulers.background import BackgroundScheduler |
| from apscheduler.triggers.date import DateTrigger |
|
|
| from bot.monitor import fetch_latest_videos |
| from bot.downloader import download_video |
| from bot.uploader import upload_to_tiktok |
| from bot.logger import get_logger |
|
|
# Module-level logger; BotScheduler writes to it in addition to its own
# in-memory log buffer.
logger = get_logger("scheduler")

# Time zone used for every timestamp and trigger in this module.
MMT = pytz.timezone("Asia/Yangon")

# Seconds between two runs of the recurring poll job.
POLL_INTERVAL: int = 150
|
|
|
|
class BotScheduler:
    """Poll a video feed and run a download-then-upload job per new video.

    A background APScheduler instance runs a recurring poll job (every
    ``POLL_INTERVAL`` seconds) plus one one-shot job for each discovered
    video.  ``queue``, ``seen_ids`` and ``log_buffer`` are touched both by
    scheduler jobs and by the ``get_*`` reporting methods, so this class
    mutates and reads them only while holding ``lock``.

    NOTE(review): the non-ASCII characters in the log/UI strings below look
    mis-encoded (probably UTF-8 emoji and Burmese text decoded with the wrong
    code page).  They are reproduced as found; restore them from a clean copy
    of the file.
    """

    _LOG_BUFFER_SIZE = 200  # lines retained in ``log_buffer``
    _LOG_TAIL = 50  # lines returned by ``get_logs``
    _TIME_FORMAT = "%Y-%m-%d %H:%M:%S"

    def __init__(self):
        self.scheduler = BackgroundScheduler(timezone=MMT)
        # Ids of videos that already have (or had) an upload job.
        self.seen_ids: set = set()
        # One dict per video: video_id, title, scheduled_time, status.
        self.queue: list[dict] = []
        # Plain (non-reentrant) lock: never call ``_log`` while holding it.
        self.lock = threading.Lock()
        self.running = False
        self.log_buffer: list[str] = []

    def start(self):
        """Start the scheduler and register the recurring poll job.

        Does nothing when the bot is already running.
        """
        if self.running:
            return
        self.running = True
        self.scheduler.start()

        self.scheduler.add_job(
            self._poll_channel,
            "interval",
            seconds=POLL_INTERVAL,
            id="poll_channel",
            # Run the first poll right away instead of after one interval.
            next_run_time=datetime.now(tz=MMT),
        )
        self._log("Bot started โ")
        logger.info("Bot started")

    def stop(self):
        """Shut the scheduler down and prepare a fresh one for a later start.

        Does nothing when the bot is not running.

        NOTE(review): jobs registered on the old scheduler instance are
        dropped with it, while ``queue`` and ``seen_ids`` are left untouched;
        entries still "pending" at stop time therefore have no job after a
        later ``start()``.  Confirm whether that is intended.
        """
        if not self.running:
            return
        self.running = False
        self.scheduler.shutdown(wait=False)
        # A scheduler that has been shut down is replaced, not reused.
        self.scheduler = BackgroundScheduler(timezone=MMT)
        self._log("Bot stopped ๐")
        logger.info("Bot stopped")

    def _poll_channel(self):
        """Fetch new videos and register one upload job for each of them.

        A video whose publish time is already past is run immediately; any
        other video is run at its publish time.

        Assumes each video dict carries ``video_id``, ``title`` and a
        timezone-aware ``published_mmt`` datetime -- TODO confirm against
        ``fetch_latest_videos``.
        """
        self._log("๐ Checking RSS feed...")
        new_videos = fetch_latest_videos(self.seen_ids)

        for video in new_videos:
            vid_id = video["video_id"]
            with self.lock:
                self.seen_ids.add(vid_id)

            pub_mmt: datetime = video["published_mmt"]
            now_mmt = datetime.now(tz=MMT)

            # Register the queue entry BEFORE the job exists, so a job that
            # fails quickly always finds its entry in ``_update_status``.
            with self.lock:
                self.queue.append({
                    "video_id": vid_id,
                    "title": video["title"],
                    "scheduled_time": pub_mmt,
                    "status": "pending",
                })

            if pub_mmt <= now_mmt:
                self._log(f"โก Immediate upload (past time): {video['title']}")
                run_date = now_mmt
            else:
                self._log(
                    f"๐ Scheduled: '{video['title']}' @ "
                    f"{pub_mmt.strftime(self._TIME_FORMAT)} MMT"
                )
                run_date = pub_mmt

            self.scheduler.add_job(
                self._process_video,
                trigger=DateTrigger(run_date=run_date),
                args=[video],
                id=f"upload_{vid_id}",
                replace_existing=True,
                # APScheduler skips a run that starts later than
                # ``misfire_grace_time`` seconds after its run date (1 second
                # by default).  None lets this one-shot job run no matter how
                # late, so an upload is delayed rather than silently dropped.
                misfire_grace_time=None,
            )

    def _process_video(self, video: dict):
        """Download ``video``, upload it, and record the outcome in the queue.

        The queue status becomes "download_failed", "upload_failed" or
        "uploaded".  The downloaded file is removed whether the upload
        succeeds, fails, or raises.

        Raises:
            Exception: whatever ``download_video`` or ``upload_to_tiktok``
                raises is re-raised after the status has been recorded, so
                the scheduler still sees the job as failed.
        """
        vid_id = video["video_id"]
        title = video["title"]
        self._log(f"โฌ๏ธ Downloading: {title}")

        try:
            file_path = download_video(video)
        except Exception:
            self._log(f"โ Download failed: {title}")
            self._update_status(vid_id, "download_failed")
            raise
        if not file_path:
            self._log(f"โ Download failed: {title}")
            self._update_status(vid_id, "download_failed")
            return

        try:
            self._log(f"โฌ๏ธ Uploading to TikTok: {title}")
            success = upload_to_tiktok(video, file_path)
        except Exception:
            self._log(f"โ Upload failed: {title}")
            self._update_status(vid_id, "upload_failed")
            raise
        else:
            if success:
                self._log(f"โ Upload success: {title}")
                self._update_status(vid_id, "uploaded")
            else:
                self._log(f"โ Upload failed: {title}")
                self._update_status(vid_id, "upload_failed")
        finally:
            # Always drop the local copy, even when the upload raised.
            self._remove_file(file_path)

    @staticmethod
    def _remove_file(file_path):
        """Delete ``file_path`` on a best-effort basis; never raises."""
        try:
            os.remove(file_path)
        except FileNotFoundError:
            # Already gone: nothing left to clean up.
            pass
        except (OSError, TypeError, ValueError) as exc:
            # Cleanup must not fail the job, but a leftover file is worth
            # knowing about.
            logger.warning("Could not remove %r: %s", file_path, exc)

    def _update_status(self, vid_id: str, status: str):
        """Set ``status`` on the first queue entry whose id is ``vid_id``.

        Does nothing when no entry matches.
        """
        with self.lock:
            for entry in self.queue:
                if entry["video_id"] == vid_id:
                    entry["status"] = status
                    break

    def _log(self, msg: str):
        """Send ``msg`` to the module logger and the in-memory log buffer.

        The buffered line is prefixed with the current MMT time; the buffer
        keeps only the most recent ``_LOG_BUFFER_SIZE`` lines.
        """
        ts = datetime.now(tz=MMT).strftime("%H:%M:%S")
        line = f"[{ts}] {msg}"
        logger.info(msg)
        with self.lock:
            self.log_buffer.append(line)
            if len(self.log_buffer) > self._LOG_BUFFER_SIZE:
                self.log_buffer = self.log_buffer[-self._LOG_BUFFER_SIZE:]

    def get_queue_display(self) -> str:
        """Return every queue entry, one per line, ordered by scheduled time.

        Returns a fixed placeholder message when the queue is empty.
        """
        with self.lock:
            if not self.queue:
                return "Queue แแฒแแฝแแบ video แแแพแญแแฑแธ"
            ordered = sorted(self.queue, key=lambda entry: entry["scheduled_time"])
            lines = []
            for item in ordered:
                t = item["scheduled_time"].strftime(self._TIME_FORMAT)
                lines.append(
                    f"[{item['status'].upper()}] {t} MMT โ {item['title'][:60]}"
                )
        return "\n".join(lines)

    def get_upcoming_display(self) -> str:
        """Return up to ten pending entries scheduled after now, earliest first.

        Returns a fixed placeholder message when there is none.
        """
        now_mmt = datetime.now(tz=MMT)
        with self.lock:
            upcoming = sorted(
                (
                    entry for entry in self.queue
                    if entry["status"] == "pending"
                    and entry["scheduled_time"] > now_mmt
                ),
                key=lambda entry: entry["scheduled_time"],
            )
            if not upcoming:
                return "Upcoming schedule แแแพแญแแฑแธ"
            lines = []
            for item in upcoming[:10]:
                t = item["scheduled_time"].strftime(self._TIME_FORMAT)
                lines.append(f"๐ {t} MMT โ {item['title'][:60]}")
        return "\n".join(lines)

    def get_logs(self) -> str:
        """Return the most recent ``_LOG_TAIL`` buffered log lines."""
        with self.lock:
            return "\n".join(self.log_buffer[-self._LOG_TAIL:])

    def get_status_summary(self) -> str:
        """Return a multi-line summary: run state, current time, queue counts."""
        with self.lock:
            statuses = [entry["status"] for entry in self.queue]
        total = len(statuses)
        pending = statuses.count("pending")
        uploaded = statuses.count("uploaded")
        # Covers both "download_failed" and "upload_failed".
        failed = sum(1 for status in statuses if "failed" in status)
        state = "๐ข Running" if self.running else "๐ด Stopped"
        now = datetime.now(tz=MMT).strftime(self._TIME_FORMAT)
        return (
            f"Bot Status: {state}\n"
            f"Current Time (MMT): {now}\n"
            f"Total Queued: {total} | Pending: {pending} | "
            f"Uploaded: {uploaded} | Failed: {failed}\n"
            f"Poll Interval: {POLL_INTERVAL}s"
        )
|
|