any-env-code / jihou.py
izuemon's picture
Update jihou.py
af09506 verified
raw
history blame
3.01 kB
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import time
import requests
from typing import List, Dict, Any
# ===== 設定 =====
X_ACCOUNT = os.getenv("dmsendertoken")
if not X_ACCOUNT:
raise RuntimeError("環境変数 dmsendertoken が設定されていません")
BASE = "https://desk-api.channel.io/desk/channels/200605"
GROUP_ID = 463667
CHECK_PERSON_ID = "604730"
HEADERS = {
"accept": "application/json",
"accept-language": "ja",
"content-type": "application/json",
"x-account": X_ACCOUNT,
}
GET_PARAMS = {
"sortOrder": "desc",
"limit": 34,
"logFolded": "false",
}
WELCOME_TEXT = "こんにちは。ITRSAにようこそ。自動でいくつかの部屋に招待しています。"
INVITE_GROUPS = [519217, 536194, 534868]
# ===== API =====
def get_messages() -> List[Dict[str, Any]]:
url = f"{BASE}/groups/{GROUP_ID}/messages"
r = requests.get(url, headers=HEADERS, params=GET_PARAMS, timeout=20)
r.raise_for_status()
return r.json().get("messages", [])
def post_welcome_message() -> None:
url = f"{BASE}/groups/{GROUP_ID}/messages"
payload = {
"requestId": f"desk-web-{int(time.time() * 1000)}",
"blocks": [
{"type": "text", "value": WELCOME_TEXT}
]
}
requests.post(url, headers=HEADERS, json=payload, timeout=20).raise_for_status()
print("[INFO] 歓迎メッセージを送信しました")
def invite_person(group_id: int, person_id: str) -> None:
url = f"{BASE}/groups/{group_id}/invite"
params = {"managerIds": person_id}
requests.post(url, headers=HEADERS, params=params, timeout=20).raise_for_status()
print(f"[INFO] personId={person_id} を group {group_id} に招待しました")
# ===== ロジック =====
def process():
messages = get_messages()
# personId=599642 の最新発言時刻
latest_599642 = max(
(int(m.get("createdAt", 0))
for m in messages
if str(m.get("personId")) == CHECK_PERSON_ID),
default=0
)
# 条件を満たす join メッセージを抽出
join_targets = []
for m in messages:
log = m.get("log") or {}
if log.get("action") != "join":
continue
created_at = int(m.get("createdAt", 0))
person_id = str(m.get("personId", ""))
# join 以降に 599642 の発言が無い
if created_at > latest_599642:
join_targets.append(person_id)
if not join_targets:
return
# 重複排除
join_targets = list(set(join_targets))
# 歓迎メッセージは1回だけ
post_welcome_message()
# 全員を各グループへ招待
for pid in join_targets:
for gid in INVITE_GROUPS:
invite_person(gid, pid)
def main():
print("[INFO] Bot 起動(10秒間隔)")
while True:
try:
process()
except Exception as e:
print(f"[ERROR] {e}")
time.sleep(10)
if __name__ == "__main__":
main()