import os
import time
import json
import requests
import hashlib
from datetime import datetime, timezone
from bs4 import BeautifulSoup
import xml.etree.ElementTree as ET
# ===== Channel.io settings =====
CHANNEL_ID = "200605"
GROUP_ID = "534457"
# Desk API endpoint for posting messages into the group chat above.
POST_URL = f"https://desk-api.channel.io/desk/channels/{CHANNEL_ID}/groups/{GROUP_ID}/messages"
# API credential comes from the environment; fail fast at import time if absent.
X_ACCOUNT = os.getenv("channeliotokenbot2")
if not X_ACCOUNT:
    raise RuntimeError("環境変数 channeliotokenbot2 が設定されていません")
# Headers sent with every POST; the token rides in "x-account".
HEADERS_POST = {
    "accept": "application/json",
    "accept-language": "ja",
    "content-type": "application/json",
    "x-account": X_ACCOUNT,
}
# ===== RSS feed =====
RSS_URL = "https://www.nippon.com/ja/rss-all/"
# ===== Polling settings =====
INTERVAL_SECONDS = 60 * 60 * 12  # 12 hours between fetch cycles
SENT_LOG_FILE = "sent_nippon_news.json"  # JSON array of sent-link hashes
# ===== Utils =====
def load_sent_log():
    """Load the set of already-sent link hashes from SENT_LOG_FILE.

    Returns:
        set[str]: SHA-256 hex digests of links already posted. An empty
        set is returned when the file does not exist OR cannot be read /
        parsed — main() calls this outside its retry loop, so a corrupted
        log file must not crash the bot at startup.
    """
    if not os.path.exists(SENT_LOG_FILE):
        return set()
    try:
        with open(SENT_LOG_FILE, "r", encoding="utf-8") as f:
            return set(json.load(f))
    except (OSError, json.JSONDecodeError):
        # A damaged log only means some items may be re-sent once;
        # starting fresh is safer than aborting the whole process.
        return set()
def save_sent_log(sent_set):
    """Persist the set of sent-link hashes to SENT_LOG_FILE as a JSON array."""
    serialized = json.dumps(list(sent_set), ensure_ascii=False, indent=2)
    with open(SENT_LOG_FILE, "w", encoding="utf-8") as f:
        f.write(serialized)
def hash_link(link: str) -> str:
    """Return the SHA-256 hex digest of *link*, used as a dedup key."""
    digest = hashlib.sha256()
    digest.update(link.encode("utf-8"))
    return digest.hexdigest()
def send_to_channel(text):
    """POST *text* as a single text block to the Channel.io group.

    Raises:
        requests.HTTPError: when the API answers with an error status.
    """
    body = json.dumps({
        "requestId": f"desk-web-{int(time.time() * 1000)}",
        "blocks": [
            {"type": "text", "value": text},
        ],
    })
    response = requests.post(POST_URL, headers=HEADERS_POST, data=body, timeout=30)
    response.raise_for_status()
def fetch_rss_items():
    """Download the RSS feed at RSS_URL and return its items.

    Returns:
        list[dict]: one dict per feed item with keys "title", "link"
        and "description". Items missing a title or link are skipped;
        the description's HTML (carried inside CDATA) is flattened to
        plain text. Returns [] when the feed has no <channel> element.

    Raises:
        requests.HTTPError: when the feed request fails.
    """
    response = requests.get(RSS_URL, timeout=30)
    response.raise_for_status()
    root = ET.fromstring(response.content)
    channel = root.find("channel")
    if channel is None:
        return []
    results = []
    for entry in channel.findall("item"):
        title = entry.findtext("title", "").strip()
        link = entry.findtext("link", "").strip()
        if not (title and link):
            continue
        raw_description = entry.findtext("description", "").strip()
        # Description is HTML inside CDATA; strip the markup to bare text.
        description = BeautifulSoup(raw_description, "lxml").get_text(strip=True)
        results.append({
            "title": title,
            "link": link,
            "description": description,
        })
    return results
# ===== Main =====
def main():
    """Poll the RSS feed forever, posting unseen items to Channel.io.

    One fetch/send cycle runs every INTERVAL_SECONDS. Items are
    de-duplicated via SHA-256 hashes of their links, kept in memory
    and persisted to SENT_LOG_FILE.

    Fix vs. previous version: the log is now saved in a ``finally``
    clause, so items successfully sent before a mid-cycle failure are
    still recorded and will not be re-posted after a restart.
    """
    sent_log = load_sent_log()
    while True:
        new_count = 0
        try:
            for item in fetch_rss_items():
                link_hash = hash_link(item["link"])
                if link_hash in sent_log:
                    continue
                # Channel.io rich-text link markup, then the plain description.
                message = (
                    f"<link type=\"url\" value=\"{item['link']}\">"
                    f"{item['title']}"
                    f"</link>\n\n"
                    f"{item['description']}"
                )
                send_to_channel(message)
                sent_log.add(link_hash)
                new_count += 1
                time.sleep(1)  # throttle to avoid flooding the API
            if new_count > 0:
                print(f"{new_count} 件のニュースを送信しました")
            else:
                print("新しいニュースはありません")
        except Exception as e:
            # Best-effort loop: log and try again next cycle.
            print("エラー:", e)
        finally:
            # Persist even after a partial failure so already-sent items
            # are not duplicated when the process restarts.
            if new_count > 0:
                save_sent_log(sent_log)
        time.sleep(INTERVAL_SECONDS)
if __name__ == "__main__":
    # Script entry point: run the polling loop forever.
    main()