any-env-code / yt.py
izuemon's picture
Rename watcher.py to yt.py
7567b6e verified
import os
import re
import time
import json
import requests
from datetime import datetime, timezone
from bs4 import BeautifulSoup
# ===== Channel.io 設定 =====
GET_URL = "https://desk-api.channel.io/desk/channels/200605/groups/519217/messages"
POST_URL = GET_URL
PARAMS = {
"sortOrder": "desc",
"limit": 36,
"logFolded": "false",
}
X_ACCOUNT = os.getenv("channeliotokenbot2")
if not X_ACCOUNT:
raise RuntimeError("環境変数 channeliotokenbot2 が設定されていません")
HEADERS_GET = {
"accept": "application/json",
"accept-language": "ja",
"x-account": X_ACCOUNT,
}
HEADERS_POST = {
"accept": "application/json",
"accept-language": "ja",
"content-type": "application/json",
"x-account": X_ACCOUNT,
}
# ===== ssyoutube =====
SSYOUTUBE_URL = "https://ssyoutube.online/yt-video-detail/"
# ===== Utils =====
def parse_updated_at(value):
if isinstance(value, (int, float)):
return datetime.fromtimestamp(value / 1000, tz=timezone.utc)
elif isinstance(value, str):
return datetime.fromisoformat(value.replace("Z", "+00:00"))
return None
def extract_youtube_id(text):
patterns = [
# クエリパラメータ v= または vi=
r"(?:[?&](?:v|vi)=)([A-Za-z0-9_-]{11})",
# youtu.be 短縮リンク
r"youtu\.be/([A-Za-z0-9_-]{11})",
# /embed/ /v/ /shorts/
r"youtube(?:-nocookie)?\.com/(?:embed|v|shorts)/([A-Za-z0-9_-]{11})",
# watch? の中に v= が含まれるケース(ioパラメータ等が混ざっても OK)
r"youtube(?:-nocookie)?\.com/watch\?.*?v=([A-Za-z0-9_-]{11})",
# hash や他パラメータに混ざるケース: ...#v=xxxx or &v=xxxx
r"[#&]v=([A-Za-z0-9_-]{11})"
]
for p in patterns:
m = re.search(p, text)
if m:
return m.group(1)
return None
def extract_youtube_and_option(text):
"""
YouTube URL と画質指定オプションを抽出する
戻り値: (video_id, option) 例: ("abcdEFGHijk", "720")
"""
# 全角スペースも対象に
parts = re.split(r"[ \u3000]+", text.strip())
video_id = extract_youtube_id(parts[0])
option = None
if len(parts) > 1:
option = parts[1].lower()
return video_id, option
def normalize_quality(option):
"""
★ ユーザー入力を内部品質コードへ変換(全角も対応)
"""
if option is None:
return "720"
# 全角 → 半角 に変換
# 例: "720" → "720", "4k" → "4k"
trans_table = str.maketrans(
"0123456789kKpP",
"0123456789kKpP"
)
option = option.translate(trans_table).lower()
quality_map = {
"144": "144",
"240": "240",
"360": "360",
"480": "480",
"720": "720",
"1080": "1080",
"2k": "1440",
"1440": "1440",
"4k": "2160",
"2160": "2160",
"8k": "4320",
"4320": "4320",
"144p": "144",
"240p": "240",
"360p": "360",
"480p": "480",
"720p": "720",
"1080p": "1080",
"2kp": "1440",
"1440p": "1440",
"4kp": "2160",
"2160p": "2160",
"8kp": "4320",
"4320p": "4320",
}
if option in quality_map:
return quality_map[option]
if option in ["a", "f"]:
return option # 特殊指定
return "720" # デフォルト
def get_video_title(youtube_url):
"""YouTube動画のタイトルを取得する簡易的な関数"""
try:
res = requests.get(youtube_url, timeout=10)
soup = BeautifulSoup(res.text, "lxml")
title_tag = soup.find("meta", property="og:title")
if title_tag:
return title_tag.get("content", "").replace(" - YouTube", "")
except:
pass
return "YouTube動画"
# ===== ssyoutube HTML 解析 =====
def fetch_download_links(youtube_url):
res = requests.post(
SSYOUTUBE_URL,
data={"videoURL": youtube_url},
timeout=20,
headers={
"User-Agent": "Mozilla/5.0",
"Referer": "https://ssyoutube.online/",
}
)
res.raise_for_status()
# nonceを抽出
soup = BeautifulSoup(res.text, "lxml")
nonce_match = re.search(r"formData\.append\('nonce', '([^']+)'\);", res.text)
nonce = nonce_match.group(1) if nonce_match else None
buttons = soup.select("button[data-url]")
results = []
for btn in buttons:
url = btn.get("data-url")
quality = btn.get("data-quality")
has_audio = btn.get("data-has-audio")
if not url:
continue
results.append({
"url": url,
"quality": quality or "audio",
"has_audio": has_audio,
})
return results, nonce
# ===== 最高画質映像 + 音声を選択 =====
def select_best_video_and_audio(items):
video_items = []
audio_items = []
for item in items:
if item["quality"] == "audio":
audio_items.append(item)
elif item["has_audio"] == "false":
m = re.match(r"(\d+)p", item["quality"])
if m:
video_items.append((int(m.group(1)), item))
if not video_items or not audio_items:
return None, None
# 最高画質の動画を選択
video_items.sort(key=lambda x: x[0], reverse=True)
best_video = video_items[0][1]
# 最高音質の音声を選択(例: 128kbps > 64kbps)
# 実際の音声品質情報に基づいて選択する場合は要修正
best_audio = audio_items[0] if audio_items else None
return best_video, best_audio
def select_quality_video(items, quality):
"""
★ 指定画質に最適な動画を選択(音声なし映像のみ)
"""
video_candidates = []
for item in items:
if item["has_audio"] == "false":
m = re.match(r"(\d+)p", item["quality"])
if m:
q = int(m.group(1))
video_candidates.append((q, item))
if not video_candidates:
return None
# 最大画質
video_candidates.sort(key=lambda x: x[0])
if quality == "f": # 最大画質
return video_candidates[-1][1]
req = int(quality)
best = None
# 指定画質以上で一番近いもの
for q, item in video_candidates:
if q >= req:
best = item
break
# 無ければ最大画質
return best if best else video_candidates[-1][1]
# ===== ssyoutube サーバー側結合 =====
def merge_video_on_server(video_url, audio_url, video_id, quality, video_title, nonce):
"""
ssyoutubeサーバーで動画と音声を結合する
"""
# IDと画質からリクエストIDを生成
request_id = f"{video_id}_{quality}p"
# リクエストデータの構築
request_data = {
"id": request_id,
"ttl": 3600000,
"inputs": [
{
"url": video_url,
"ext": "mp4",
"chunkDownload": {
"type": "header",
"size": 52428800,
"concurrency": 3
}
},
{
"url": audio_url,
"ext": "m4a"
}
],
"output": {
"ext": "mp4",
"downloadName": f"{video_title}_{quality}p.mp4",
"chunkUpload": {
"size": 104857600,
"concurrency": 3
}
},
"operation": {
"type": "replace_audio_in_video"
}
}
# 結合開始リクエスト
payload = {
"action": "process_video_merge",
"nonce": nonce,
"request_data": json.dumps(request_data)
}
headers = {
"User-Agent": "Mozilla/5.0",
"Referer": "https://ssyoutube.online/",
"Origin": "https://ssyoutube.online",
"Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
}
res = requests.post(
"https://ssyoutube.online/wp-admin/admin-ajax.php",
data=payload,
headers=headers,
timeout=60
)
res.raise_for_status()
result = res.json()
if not result.get("success") or not result.get("data", {}).get("success"):
raise RuntimeError(f"結合開始に失敗しました: {result}")
# ステータス監視用URLを取得
monitor_url = result["data"]["result"]["monitor"]["http"]
# 完了するまでポーリング
max_attempts = 5 # 最大60回試行(30秒間隔で約30分)
attempt = 0
while attempt < max_attempts:
time.sleep(20) # 30秒間隔でチェック
status_res = requests.get(monitor_url, timeout=30)
status_res.raise_for_status()
status_data = status_res.json()
if not status_data.get("success"):
raise RuntimeError(f"ステータス取得に失敗しました: {status_data}")
status = status_data["result"]["status"]
if status == "done":
# 完了したらダウンロードURLを返す
return status_data["result"]["output"]["url"]
elif status == "error":
error_msg = status_data["result"]["error"]
raise RuntimeError(f"結合処理でエラーが発生しました: {error_msg}")
# "processing" の場合は継続
attempt += 1
print(f"結合処理中... 進捗: {status_data['result'].get('progress_in_percent', 0)}%")
raise RuntimeError("結合処理がタイムアウトしました")
def send_to_channel(text):
payload = {
"requestId": f"desk-web-{int(time.time() * 1000)}",
"blocks": [
{"type": "text", "value": text}
],
}
res = requests.post(
POST_URL,
headers=HEADERS_POST,
data=json.dumps(payload),
timeout=30
)
res.raise_for_status()
def upload_file_to_channel(file_url):
# mp4バイナリ取得
file_res = requests.get(file_url, timeout=60)
file_res.raise_for_status()
upload_url = "https://media.channel.io/cht/v1/pri-file/200605/groups/519217/message/send_yt_video_file.mp4"
headers = {
"x-account": X_ACCOUNT,
"Content-Type": "video/mp4",
"Content-Length": str(len(file_res.content)),
}
res = requests.post(
upload_url,
headers=headers,
data=file_res.content,
timeout=60
)
res.raise_for_status()
return res.json() # ←これを messages POST の body.files に入れる
def send_video_message(file_json):
request_id = f"desk-web-{int(time.time() * 1000)}"
payload = {
"requestId": request_id,
"blocks": [
{"type": "text", "value": "プレビュー:"}
],
"files": [file_json], # ←ここが重要!
}
res = requests.post(
POST_URL,
headers=HEADERS_POST,
data=json.dumps(payload),
timeout=30
)
res.raise_for_status()
# ===== Main =====
def main():
processed_messages = set() # 処理済みメッセージを追跡
while True:
try:
res = requests.get(
GET_URL,
headers=HEADERS_GET,
params=PARAMS,
timeout=30,
)
res.raise_for_status()
messages = res.json().get("messages", [])
latest_msg = None
latest_time = None
for msg in messages:
msg_id = msg.get("id")
plain_text = msg.get("plainText")
updated_at = msg.get("updatedAt")
if not plain_text or updated_at is None:
continue
# 既に処理済みのメッセージはスキップ
if msg_id in processed_messages:
continue
t = parse_updated_at(updated_at)
if not t:
continue
if latest_time is None or t > latest_time:
latest_time = t
latest_msg = msg
if not latest_msg:
time.sleep(10)
continue
video_id, option = extract_youtube_and_option(latest_msg["plainText"])
if not video_id:
time.sleep(10)
continue
valid_options = ["144", "240", "360", "480", "720", "1080", "1440", "2160", "4320", "4k", "8k", "2k", "a", "f"]
# 画質入力チェックを normalize_quality の前にする
if option is None:
send_to_channel(
"画質が指定されていないため、デフォルトの720pを使用します。\n"
"指定する場合は、動画URLの後にスペースを入れて、"
"144/240/360/480/720/1080/2k(または1440)/4k(または2160)/8k(または4320)/a(結合しない)/f(最高画質) のいずれかを入力してください。\n注意:最大画質や大きい画質、長い動画だと処理に時間がかかるからタイムアウトしやすいです。"
)
else:
opt_lower = option.lower()
if opt_lower not in valid_options:
send_to_channel(
"不明な画質、または予期しない形式でした。\n"
"対応画質: 144, 240, 360, 480, 720, 1080, "
"2k(1440), 4k(2160), 8k(4320), a(結合無効), f(最大画質)\n"
"動画のURLの後にスペースを入れて指定してください。\n"
"例: https://youtu.be/abcdEFGHijk 1080\n注意:最大画質や大きい画質、長い動画だと処理に時間がかかるからタイムアウトしやすいです。"
)
processed_messages.add(latest_msg["id"])
continue
quality_opt = normalize_quality(option)
youtube_url = f"https://www.youtube.com/watch?v={video_id}"
send_to_channel(f"{video_id} のダウンロードを開始します。")
video_title = get_video_title(youtube_url)
items, nonce = fetch_download_links(youtube_url)
all_items = items[:]
audio_items = [i for i in items if i["quality"] == "audio"]
selected_video = None
selected_audio = audio_items[0] if audio_items else None
if quality_opt not in ["a"]:
selected_video = select_quality_video(items, quality_opt)
# ダウンロードリスト送信
message_lines = []
message_lines.append(f"<b>{video_title}</b>\n")
message_lines.append("<b>ダウンロードリンク</b>")
# すべての動画リンクを追加
for item in all_items:
quality = item["quality"]
if item["quality"] == "audio":
quality_display = "音声"
elif item["has_audio"] == "true":
quality_display = f"{quality} (音声付き)"
else:
quality_display = f"{quality} (映像のみ)"
message_lines.append(f"<link type=\"url\" value=\"{item['url']}\"> {quality_display} </link>")
message_lines.append("★動画ファイルと音声ファイルをダウンロードして<link type=\"url\" value=\"https://qooly.com/ja/merge-video-and-audio-online\">ここ</link>などで結合すると早くて確実です。")
send_to_channel("\n".join(message_lines))
# ★ 結合フラグ
do_merge = quality_opt not in ["a"]
if do_merge and selected_video and selected_audio:
m = re.match(r"(\d+)p", selected_video["quality"])
if m:
q = m.group(1)
send_to_channel(f"{q}pで音声との結合を開始しました。")
try:
merged_url = merge_video_on_server(
selected_video["url"],
selected_audio["url"],
video_id,
q,
video_title,
nonce
)
send_to_channel(f"<link type=\"url\" value=\"{merged_url}\">{q}p 結合済み動画</link>")
file_json = upload_file_to_channel(merged_url)
send_video_message(file_json)
except Exception as e:
send_to_channel(f"結合失敗: {e}")
send_to_channel("完了しました!")
processed_messages.add(latest_msg["id"])
print(f"送信完了: {video_id}")
except Exception as e:
print("エラー:", e)
import traceback
traceback.print_exc()
time.sleep(15)
if __name__ == "__main__":
main()