any-env-code / watcher.py
izuemon's picture
Update watcher.py
677e8ae verified
raw
history blame
6.13 kB
import os
import re
import time
import json
import requests
from datetime import datetime, timezone
from bs4 import BeautifulSoup
import ffmpeg
from tflink import TFLinkClient
# ===== Channel.io 設定 =====
GET_URL = "https://desk-api.channel.io/desk/channels/200605/groups/519217/messages"
POST_URL = GET_URL
PARAMS = {
"sortOrder": "desc",
"limit": 36,
"logFolded": "false",
}
X_ACCOUNT = os.getenv("channeliotokenbot2")
if not X_ACCOUNT:
raise RuntimeError("環境変数 channeliotokenbot2 が設定されていません")
HEADERS_GET = {
"accept": "application/json",
"accept-language": "ja",
"x-account": X_ACCOUNT,
}
HEADERS_POST = {
"accept": "application/json",
"accept-language": "ja",
"content-type": "application/json",
"x-account": X_ACCOUNT,
}
# ===== ssyoutube =====
SSYOUTUBE_URL = "https://ssyoutube.online/yt-video-detail/"
# ===== Utils =====
def parse_updated_at(value):
if isinstance(value, (int, float)):
return datetime.fromtimestamp(value / 1000, tz=timezone.utc)
elif isinstance(value, str):
return datetime.fromisoformat(value.replace("Z", "+00:00"))
return None
def extract_youtube_id(text):
patterns = [
r"v=([A-Za-z0-9_-]{11})",
r"youtu\.be/([A-Za-z0-9_-]{11})",
]
for p in patterns:
m = re.search(p, text)
if m:
return m.group(1)
return None
# ===== ssyoutube HTML 解析 =====
def fetch_download_links(youtube_url):
res = requests.post(
SSYOUTUBE_URL,
data={"videoURL": youtube_url},
timeout=30,
headers={
"User-Agent": "Mozilla/5.0",
"Referer": "https://ssyoutube.online/",
}
)
res.raise_for_status()
soup = BeautifulSoup(res.text, "lxml")
buttons = soup.select("button[data-url]")
results = []
for btn in buttons:
url = btn.get("data-url")
quality = btn.get("data-quality") # 例: 1080p / 720p / None
has_audio = btn.get("data-has-audio") # "true" / "false" / None
if not url:
continue
results.append({
"url": url,
"quality": quality or "audio",
"has_audio": has_audio,
})
return results
def download_file(url, filename):
with requests.get(url, stream=True) as r:
r.raise_for_status()
with open(filename, 'wb') as f:
for chunk in r.iter_content(chunk_size=8192):
f.write(chunk)
return filename
def merge_video_audio(video_file, audio_file, output_file):
(
ffmpeg
.input(video_file)
.output(audio_file, output_file, vcodec='copy', acodec='aac', strict='experimental')
.run(overwrite_output=True)
)
return output_file
def upload_to_tflink(file_path):
client = TFLinkClient()
result = client.upload(file_path)
return result.download_link
def send_to_channel(text):
payload = {
"requestId": f"desk-web-{int(time.time() * 1000)}",
"blocks": [
{
"type": "text",
"value": text
}
],
"buttons": None,
"form": None,
"webPage": None,
"files": None,
"customPayload": None
}
res = requests.post(
POST_URL,
headers=HEADERS_POST,
data=json.dumps(payload),
timeout=30
)
res.raise_for_status()
def main():
while True:
try:
res = requests.get(
GET_URL,
headers=HEADERS_GET,
params=PARAMS,
timeout=30,
)
res.raise_for_status()
messages = res.json().get("messages", [])
latest_msg = None
latest_time = None
for msg in messages:
plain_text = msg.get("plainText")
updated_at = msg.get("updatedAt")
if not plain_text or updated_at is None:
continue
t = parse_updated_at(updated_at)
if not t:
continue
if latest_time is None or t > latest_time:
latest_time = t
latest_msg = msg
if not latest_msg:
time.sleep(10)
continue
text = latest_msg["plainText"]
youtube_id = extract_youtube_id(text)
if not youtube_id:
time.sleep(10)
continue
youtube_url = f"https://www.youtube.com/watch?v={youtube_id}"
items = fetch_download_links(youtube_url)
if not items:
print("ダウンロードリンクが取得できませんでした")
time.sleep(10)
continue
# ===== 最も高画質の動画と音声を選択 =====
video_items = [i for i in items if i["has_audio"] == "false"]
audio_items = [i for i in items if i["has_audio"] == "true" and i["quality"] == "audio"]
if not video_items or not audio_items:
print("動画または音声が見つかりません")
time.sleep(10)
continue
# 解像度順にソートして最初を取得
video_items.sort(key=lambda x: int(re.sub(r"\D", "", x["quality"] or "0")), reverse=True)
video_url = video_items[0]["url"]
audio_url = audio_items[0]["url"]
video_file = download_file(video_url, "video.mp4")
audio_file = download_file(audio_url, "audio.m4a")
output_file = "merged.mp4"
merge_video_audio(video_file, audio_file, output_file)
# tflink にアップロード
tflink_url = upload_to_tflink(output_file)
message_text = f"結合動画のダウンロード: {tflink_url}"
send_to_channel(message_text)
print("送信完了:", tflink_url)
except Exception as e:
print("エラー:", e)
time.sleep(15)
if __name__ == "__main__":
main()