AI_SNS / services /scheduler.py
Corin1998's picture
Create scheduler.py
c65ba31 verified
import os
from datetime import datetime, timezone
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.date import DateTrigger
import asyncio
from db import SessionLocal, Post
# Module-wide scheduler instance, configured with the UTC timezone.
# schedule_post_job() registers its jobs on this object.
scheduler = AsyncIOScheduler(timezone="UTC")
def ensure_scheduler_started(sched: AsyncIOScheduler):
    """Start *sched* unless it already reports itself as running.

    Args:
        sched: The scheduler to start; its ``running`` attribute is checked
            first so that ``start()`` is never called on a running scheduler.
    """
    if sched.running:
        return
    sched.start()
def schedule_post_job(post_id: int, platform: str, text: str, image_prompt: str | None, scheduled_at_iso: str):
    """Register a one-shot job that publishes a post at the requested time.

    Args:
        post_id: Identifier of the post; also used to build the job id.
        platform: Target platform name, forwarded to the job.
        text: Post body, forwarded to the job.
        image_prompt: Optional image prompt, forwarded to the job.
        scheduled_at_iso: ISO 8601 timestamp at which the job should fire.
    """
    run_at = _parse_iso(scheduled_at_iso)
    one_shot = DateTrigger(run_date=run_at)
    # The job id is derived from the post id and replace_existing=True is set,
    # so scheduling the same post again replaces its earlier job.
    scheduler.add_job(
        _execute_post_job,
        one_shot,
        args=[post_id, platform, text, image_prompt],
        id=f"post:{post_id}",
        replace_existing=True,
    )
def _parse_iso(s: str) -> datetime:
# 簡易ISOパース(末尾Z対応)
s = s.strip()
if s.endswith("Z"):
s = s[:-1]
dt = datetime.fromisoformat(s).replace(tzinfo=timezone.utc)
else:
dt = datetime.fromisoformat(s)
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt
async def _execute_post_job(post_id: int, platform: str, text: str, image_prompt: str | None):
    """Publish the post through the platform adapter and record the outcome.

    The actual posting is delegated to ``_post_to_platform``. Afterwards the
    matching ``Post`` row, if one exists, has its status set to "published"
    on success or "failed" otherwise.

    Args:
        post_id: Primary key of the ``Post`` row to update.
        platform: Target platform name passed to the adapter dispatcher.
        text: Post body.
        image_prompt: Optional image prompt.
    """
    delivered = await _post_to_platform(platform, text, image_prompt)
    db = SessionLocal()
    try:
        record = db.get(Post, post_id)
        if not record:
            # Nothing to update; the finally clause still closes the session.
            return
        if delivered:
            record.status = "published"
        else:
            record.status = "failed"
        db.commit()
    finally:
        db.close()
async def _post_to_platform(platform: str, text: str, image_prompt: str | None) -> bool:
# 実運用: 本アダプタを呼ぶ
try:
if platform == "x":
from .social_adapters.x_adapter import post_to_x
return await post_to_x(text, image_prompt)
elif platform == "instagram":
from .social_adapters.instagram_adapter import post_to_instagram
return await post_to_instagram(text, image_prompt)
else:
return False
except Exception:
return False