| import os |
| from datetime import datetime, timezone |
| from apscheduler.schedulers.asyncio import AsyncIOScheduler |
| from apscheduler.triggers.date import DateTrigger |
| import asyncio |
|
|
| from db import SessionLocal, Post |
|
|
| scheduler = AsyncIOScheduler(timezone="UTC") |
|
|
| def ensure_scheduler_started(sched: AsyncIOScheduler): |
| if not sched.running: |
| sched.start() |
|
|
| def schedule_post_job(post_id: int, platform: str, text: str, image_prompt: str | None, scheduled_at_iso: str): |
| """予約投稿ジョブ登録""" |
| when = _parse_iso(scheduled_at_iso) |
| trigger = DateTrigger(run_date=when) |
| scheduler.add_job(_execute_post_job, trigger, args=[post_id, platform, text, image_prompt], id=f"post:{post_id}", replace_existing=True) |
|
|
| def _parse_iso(s: str) -> datetime: |
| |
| s = s.strip() |
| if s.endswith("Z"): |
| s = s[:-1] |
| dt = datetime.fromisoformat(s).replace(tzinfo=timezone.utc) |
| else: |
| dt = datetime.fromisoformat(s) |
| if dt.tzinfo is None: |
| dt = dt.replace(tzinfo=timezone.utc) |
| return dt |
|
|
| async def _execute_post_job(post_id: int, platform: str, text: str, image_prompt: str | None): |
| """実投稿(アダプタに委譲)。成功したらDBをpublishedに。""" |
| ok = await _post_to_platform(platform, text, image_prompt) |
| session = SessionLocal() |
| try: |
| post = session.get(Post, post_id) |
| if post: |
| post.status = "published" if ok else "failed" |
| session.commit() |
| finally: |
| session.close() |
|
|
| async def _post_to_platform(platform: str, text: str, image_prompt: str | None) -> bool: |
| |
| try: |
| if platform == "x": |
| from .social_adapters.x_adapter import post_to_x |
| return await post_to_x(text, image_prompt) |
| elif platform == "instagram": |
| from .social_adapters.instagram_adapter import post_to_instagram |
| return await post_to_instagram(text, image_prompt) |
| else: |
| return False |
| except Exception: |
| return False |
|
|