| import os
|
| import time
|
| import requests
|
| from datetime import datetime, timedelta
|
| from zoneinfo import ZoneInfo
|
|
|
|
|
| CHANNEL_ID = os.getenv("CHANNEL_ID", "")
|
| GROUP_CHAT_ID = os.getenv("GROUP_CHAT_ID", "")
|
| X_ACCOUNT_TOKEN = os.getenv("X_ACCOUNT_TOKEN", "")
|
|
|
| JST = ZoneInfo("Asia/Tokyo")
|
|
|
| BASE_URL = f"https://desk-api.channel.io/desk/channels/{CHANNEL_ID}"
|
| GET_MESSAGES_URL = f"{BASE_URL}/groups/{GROUP_CHAT_ID}/messages"
|
| POST_MESSAGE_URL = GET_MESSAGES_URL
|
|
|
| HEADERS_GET = {
|
| "accept": "application/json",
|
| "accept-language": "ja",
|
| "x-account": X_ACCOUNT_TOKEN,
|
| }
|
|
|
| HEADERS_POST = {
|
| "accept": "application/json",
|
| "accept-language": "ja",
|
| "content-type": "application/json",
|
| "x-account": X_ACCOUNT_TOKEN,
|
| }
|
|
|
|
|
| RANKING_KEYWORD = "あけおめ"
|
| MAX_PARTICIPANTS = 6
|
| MONITOR_DURATION_MINUTES = 10
|
|
|
|
|
|
|
| def parse_updated_at(value):
|
| """メッセージのupdatedAtをパースしてdatetimeを返す"""
|
| if isinstance(value, (int, float)):
|
| return datetime.fromtimestamp(value / 1000, tz=JST)
|
| elif isinstance(value, str):
|
| dt = datetime.fromisoformat(value.replace("Z", "+00:00"))
|
| return dt.astimezone(JST)
|
| return None
|
|
|
|
|
| def get_messages():
|
| """グループチャットのメッセージを取得"""
|
| params = {
|
| "sortOrder": "asc",
|
| "limit": 100,
|
| "logFolded": "false",
|
| }
|
|
|
| try:
|
| res = requests.get(
|
| GET_MESSAGES_URL,
|
| headers=HEADERS_GET,
|
| params=params,
|
| timeout=30,
|
| )
|
| res.raise_for_status()
|
| return res.json().get("messages", [])
|
| except Exception as e:
|
| print(f"メッセージ取得エラー: {e}")
|
| return []
|
|
|
|
|
| def post_message(text):
|
| """グループチャットにメッセージを送信"""
|
| payload = {
|
| "requestId": f"desk-web-{int(time.time() * 1000)}",
|
| "blocks": [
|
| {"type": "text", "value": text}
|
| ],
|
| }
|
|
|
| try:
|
| res = requests.post(
|
| POST_MESSAGE_URL,
|
| headers=HEADERS_POST,
|
| json=payload,
|
| timeout=30,
|
| )
|
| res.raise_for_status()
|
| print("メッセージ送信成功")
|
| return True
|
| except Exception as e:
|
| print(f"メッセージ送信エラー: {e}")
|
| return False
|
|
|
|
|
| def create_ranking_message(participants):
|
| """ランキングメッセージを作成"""
|
| if not participants:
|
| return "🏆 あけおめランキング 🏆\n\n参加者はいませんでした。"
|
|
|
| lines = ["🏆 あけおめランキング 🏆\n"]
|
|
|
| medals = ["🥇", "🥈", "🥉"]
|
|
|
| for i, p in enumerate(participants):
|
| rank = i + 1
|
| medal = medals[rank - 1] if rank <= 3 else f"{rank}位"
|
| time_str = p["time"].strftime("%H:%M:%S")
|
| name = p.get("name", "不明")
|
| lines.append(f"{medal} {name} ({time_str})")
|
|
|
| lines.append(f"\n参加人数: {len(participants)}人")
|
|
|
| return "\n".join(lines)
|
|
|
|
|
| def get_user_name(msg):
|
| """メッセージからユーザー名を取得"""
|
|
|
|
|
| name = msg.get("personName") or msg.get("name") or "不明"
|
| return name
|
|
|
|
|
|
|
| def wait_until_midnight():
|
| """次の0:00まで待機"""
|
| now = datetime.now(JST)
|
| tomorrow = now + timedelta(days=1)
|
| midnight = tomorrow.replace(hour=0, minute=0, second=0, microsecond=0)
|
|
|
| wait_seconds = (midnight - now).total_seconds()
|
|
|
| if wait_seconds > 0:
|
| print(f"次の0:00まで {int(wait_seconds)}秒 待機します...")
|
| time.sleep(wait_seconds)
|
|
|
|
|
| def monitor_and_rank():
|
| """0:00から10分間監視し、あけおめランキングを作成"""
|
| print("あけおめ監視を開始します...")
|
|
|
| participants = []
|
| processed_ids = set()
|
| start_time = datetime.now(JST)
|
| end_time = start_time + timedelta(minutes=MONITOR_DURATION_MINUTES)
|
|
|
| print(f"監視期間: {start_time.strftime('%Y-%m-%d %H:%M:%S')} 〜 {end_time.strftime('%H:%M:%S')}")
|
|
|
| while datetime.now(JST) < end_time:
|
|
|
| now = datetime.now(JST)
|
|
|
|
|
| messages = get_messages()
|
|
|
| for msg in messages:
|
| msg_id = msg.get("id")
|
| if msg_id in processed_ids:
|
| continue
|
|
|
| plain_text = msg.get("plainText", "")
|
| updated_at = msg.get("updatedAt")
|
|
|
| if not plain_text or not updated_at:
|
| continue
|
|
|
|
|
| msg_time = parse_updated_at(updated_at)
|
| if not msg_time:
|
| continue
|
|
|
|
|
| midnight_today = now.replace(hour=0, minute=0, second=0, microsecond=0)
|
| if msg_time < midnight_today:
|
| processed_ids.add(msg_id)
|
| continue
|
|
|
|
|
| if RANKING_KEYWORD in plain_text:
|
| user_name = get_user_name(msg)
|
|
|
|
|
| if not any(p["id"] == msg_id for p in participants):
|
| participants.append({
|
| "id": msg_id,
|
| "name": user_name,
|
| "time": msg_time,
|
| })
|
| print(f"参加者を検知: {user_name} ({msg_time.strftime('%H:%M:%S')})")
|
|
|
| processed_ids.add(msg_id)
|
| else:
|
| processed_ids.add(msg_id)
|
|
|
|
|
| if len(participants) >= MAX_PARTICIPANTS:
|
| print(f"{MAX_PARTICIPANTS}人達成!ランキングを発表します。")
|
| break
|
|
|
|
|
| time.sleep(5)
|
|
|
|
|
| ranking_msg = create_ranking_message(participants)
|
| post_message(ranking_msg)
|
|
|
| print("ランキング発表完了")
|
|
|
|
|
| def check_config():
|
| """設定が正しいかチェック"""
|
| missing = []
|
|
|
| if not CHANNEL_ID:
|
| missing.append("CHANNEL_ID")
|
| if not GROUP_CHAT_ID:
|
| missing.append("GROUP_CHAT_ID")
|
| if not X_ACCOUNT_TOKEN:
|
| missing.append("X_ACCOUNT_TOKEN")
|
|
|
| if missing:
|
| print("エラー: 以下の環境変数が設定されていません:")
|
| for var in missing:
|
| print(f" - {var}")
|
| return False
|
|
|
| return True
|
|
|
|
|
| def main():
|
| """メインループ"""
|
| print("あけおめランキングボットを起動します...")
|
|
|
| if not check_config():
|
| print("設定エラーのため終了します。")
|
| return
|
|
|
| print("設定確認OK。毎日0:00からの監視を開始します。")
|
|
|
| while True:
|
| try:
|
|
|
| wait_until_midnight()
|
|
|
|
|
| monitor_and_rank()
|
|
|
|
|
| time.sleep(60)
|
|
|
| except KeyboardInterrupt:
|
| print("ボットを停止します。")
|
| break
|
| except Exception as e:
|
| print(f"エラーが発生しました: {e}")
|
| time.sleep(10)
|
|
|
|
|
| if __name__ == "__main__":
|
| main() |