any-env-code / yt.py
izuemon's picture
Rename watcher.py to yt.py
7567b6e verified
raw
history blame
17.3 kB
import os
import re
import time
import json
import requests
from datetime import datetime, timezone
from bs4 import BeautifulSoup
# ===== Channel.io 設定 =====
GET_URL = "https://desk-api.channel.io/desk/channels/200605/groups/519217/messages"
POST_URL = GET_URL
PARAMS = {
"sortOrder": "desc",
"limit": 36,
"logFolded": "false",
}
X_ACCOUNT = os.getenv("channeliotokenbot2")
if not X_ACCOUNT:
raise RuntimeError("環境変数 channeliotokenbot2 が設定されていません")
HEADERS_GET = {
"accept": "application/json",
"accept-language": "ja",
"x-account": X_ACCOUNT,
}
HEADERS_POST = {
"accept": "application/json",
"accept-language": "ja",
"content-type": "application/json",
"x-account": X_ACCOUNT,
}
# ===== ssyoutube =====
SSYOUTUBE_URL = "https://ssyoutube.online/yt-video-detail/"
# ===== Utils =====
def parse_updated_at(value):
if isinstance(value, (int, float)):
return datetime.fromtimestamp(value / 1000, tz=timezone.utc)
elif isinstance(value, str):
return datetime.fromisoformat(value.replace("Z", "+00:00"))
return None
def extract_youtube_id(text):
patterns = [
# クエリパラメータ v= または vi=
r"(?:[?&](?:v|vi)=)([A-Za-z0-9_-]{11})",
# youtu.be 短縮リンク
r"youtu\.be/([A-Za-z0-9_-]{11})",
# /embed/ /v/ /shorts/
r"youtube(?:-nocookie)?\.com/(?:embed|v|shorts)/([A-Za-z0-9_-]{11})",
# watch? の中に v= が含まれるケース(ioパラメータ等が混ざっても OK)
r"youtube(?:-nocookie)?\.com/watch\?.*?v=([A-Za-z0-9_-]{11})",
# hash や他パラメータに混ざるケース: ...#v=xxxx or &v=xxxx
r"[#&]v=([A-Za-z0-9_-]{11})"
]
for p in patterns:
m = re.search(p, text)
if m:
return m.group(1)
return None
def extract_youtube_and_option(text):
"""
YouTube URL と画質指定オプションを抽出する
戻り値: (video_id, option) 例: ("abcdEFGHijk", "720")
"""
# 全角スペースも対象に
parts = re.split(r"[ \u3000]+", text.strip())
video_id = extract_youtube_id(parts[0])
option = None
if len(parts) > 1:
option = parts[1].lower()
return video_id, option
def normalize_quality(option):
"""
★ ユーザー入力を内部品質コードへ変換(全角も対応)
"""
if option is None:
return "720"
# 全角 → 半角 に変換
# 例: "720" → "720", "4k" → "4k"
trans_table = str.maketrans(
"0123456789kKpP",
"0123456789kKpP"
)
option = option.translate(trans_table).lower()
quality_map = {
"144": "144",
"240": "240",
"360": "360",
"480": "480",
"720": "720",
"1080": "1080",
"2k": "1440",
"1440": "1440",
"4k": "2160",
"2160": "2160",
"8k": "4320",
"4320": "4320",
"144p": "144",
"240p": "240",
"360p": "360",
"480p": "480",
"720p": "720",
"1080p": "1080",
"2kp": "1440",
"1440p": "1440",
"4kp": "2160",
"2160p": "2160",
"8kp": "4320",
"4320p": "4320",
}
if option in quality_map:
return quality_map[option]
if option in ["a", "f"]:
return option # 特殊指定
return "720" # デフォルト
def get_video_title(youtube_url):
"""YouTube動画のタイトルを取得する簡易的な関数"""
try:
res = requests.get(youtube_url, timeout=10)
soup = BeautifulSoup(res.text, "lxml")
title_tag = soup.find("meta", property="og:title")
if title_tag:
return title_tag.get("content", "").replace(" - YouTube", "")
except:
pass
return "YouTube動画"
# ===== ssyoutube HTML 解析 =====
def fetch_download_links(youtube_url):
res = requests.post(
SSYOUTUBE_URL,
data={"videoURL": youtube_url},
timeout=20,
headers={
"User-Agent": "Mozilla/5.0",
"Referer": "https://ssyoutube.online/",
}
)
res.raise_for_status()
# nonceを抽出
soup = BeautifulSoup(res.text, "lxml")
nonce_match = re.search(r"formData\.append\('nonce', '([^']+)'\);", res.text)
nonce = nonce_match.group(1) if nonce_match else None
buttons = soup.select("button[data-url]")
results = []
for btn in buttons:
url = btn.get("data-url")
quality = btn.get("data-quality")
has_audio = btn.get("data-has-audio")
if not url:
continue
results.append({
"url": url,
"quality": quality or "audio",
"has_audio": has_audio,
})
return results, nonce
# ===== 最高画質映像 + 音声を選択 =====
def select_best_video_and_audio(items):
video_items = []
audio_items = []
for item in items:
if item["quality"] == "audio":
audio_items.append(item)
elif item["has_audio"] == "false":
m = re.match(r"(\d+)p", item["quality"])
if m:
video_items.append((int(m.group(1)), item))
if not video_items or not audio_items:
return None, None
# 最高画質の動画を選択
video_items.sort(key=lambda x: x[0], reverse=True)
best_video = video_items[0][1]
# 最高音質の音声を選択(例: 128kbps > 64kbps)
# 実際の音声品質情報に基づいて選択する場合は要修正
best_audio = audio_items[0] if audio_items else None
return best_video, best_audio
def select_quality_video(items, quality):
"""
★ 指定画質に最適な動画を選択(音声なし映像のみ)
"""
video_candidates = []
for item in items:
if item["has_audio"] == "false":
m = re.match(r"(\d+)p", item["quality"])
if m:
q = int(m.group(1))
video_candidates.append((q, item))
if not video_candidates:
return None
# 最大画質
video_candidates.sort(key=lambda x: x[0])
if quality == "f": # 最大画質
return video_candidates[-1][1]
req = int(quality)
best = None
# 指定画質以上で一番近いもの
for q, item in video_candidates:
if q >= req:
best = item
break
# 無ければ最大画質
return best if best else video_candidates[-1][1]
# ===== ssyoutube サーバー側結合 =====
def merge_video_on_server(video_url, audio_url, video_id, quality, video_title, nonce):
"""
ssyoutubeサーバーで動画と音声を結合する
"""
# IDと画質からリクエストIDを生成
request_id = f"{video_id}_{quality}p"
# リクエストデータの構築
request_data = {
"id": request_id,
"ttl": 3600000,
"inputs": [
{
"url": video_url,
"ext": "mp4",
"chunkDownload": {
"type": "header",
"size": 52428800,
"concurrency": 3
}
},
{
"url": audio_url,
"ext": "m4a"
}
],
"output": {
"ext": "mp4",
"downloadName": f"{video_title}_{quality}p.mp4",
"chunkUpload": {
"size": 104857600,
"concurrency": 3
}
},
"operation": {
"type": "replace_audio_in_video"
}
}
# 結合開始リクエスト
payload = {
"action": "process_video_merge",
"nonce": nonce,
"request_data": json.dumps(request_data)
}
headers = {
"User-Agent": "Mozilla/5.0",
"Referer": "https://ssyoutube.online/",
"Origin": "https://ssyoutube.online",
"Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
}
res = requests.post(
"https://ssyoutube.online/wp-admin/admin-ajax.php",
data=payload,
headers=headers,
timeout=60
)
res.raise_for_status()
result = res.json()
if not result.get("success") or not result.get("data", {}).get("success"):
raise RuntimeError(f"結合開始に失敗しました: {result}")
# ステータス監視用URLを取得
monitor_url = result["data"]["result"]["monitor"]["http"]
# 完了するまでポーリング
max_attempts = 5 # 最大60回試行(30秒間隔で約30分)
attempt = 0
while attempt < max_attempts:
time.sleep(20) # 30秒間隔でチェック
status_res = requests.get(monitor_url, timeout=30)
status_res.raise_for_status()
status_data = status_res.json()
if not status_data.get("success"):
raise RuntimeError(f"ステータス取得に失敗しました: {status_data}")
status = status_data["result"]["status"]
if status == "done":
# 完了したらダウンロードURLを返す
return status_data["result"]["output"]["url"]
elif status == "error":
error_msg = status_data["result"]["error"]
raise RuntimeError(f"結合処理でエラーが発生しました: {error_msg}")
# "processing" の場合は継続
attempt += 1
print(f"結合処理中... 進捗: {status_data['result'].get('progress_in_percent', 0)}%")
raise RuntimeError("結合処理がタイムアウトしました")
def send_to_channel(text):
payload = {
"requestId": f"desk-web-{int(time.time() * 1000)}",
"blocks": [
{"type": "text", "value": text}
],
}
res = requests.post(
POST_URL,
headers=HEADERS_POST,
data=json.dumps(payload),
timeout=30
)
res.raise_for_status()
def upload_file_to_channel(file_url):
# mp4バイナリ取得
file_res = requests.get(file_url, timeout=60)
file_res.raise_for_status()
upload_url = "https://media.channel.io/cht/v1/pri-file/200605/groups/519217/message/send_yt_video_file.mp4"
headers = {
"x-account": X_ACCOUNT,
"Content-Type": "video/mp4",
"Content-Length": str(len(file_res.content)),
}
res = requests.post(
upload_url,
headers=headers,
data=file_res.content,
timeout=60
)
res.raise_for_status()
return res.json() # ←これを messages POST の body.files に入れる
def send_video_message(file_json):
request_id = f"desk-web-{int(time.time() * 1000)}"
payload = {
"requestId": request_id,
"blocks": [
{"type": "text", "value": "プレビュー:"}
],
"files": [file_json], # ←ここが重要!
}
res = requests.post(
POST_URL,
headers=HEADERS_POST,
data=json.dumps(payload),
timeout=30
)
res.raise_for_status()
# ===== Main =====
def main():
processed_messages = set() # 処理済みメッセージを追跡
while True:
try:
res = requests.get(
GET_URL,
headers=HEADERS_GET,
params=PARAMS,
timeout=30,
)
res.raise_for_status()
messages = res.json().get("messages", [])
latest_msg = None
latest_time = None
for msg in messages:
msg_id = msg.get("id")
plain_text = msg.get("plainText")
updated_at = msg.get("updatedAt")
if not plain_text or updated_at is None:
continue
# 既に処理済みのメッセージはスキップ
if msg_id in processed_messages:
continue
t = parse_updated_at(updated_at)
if not t:
continue
if latest_time is None or t > latest_time:
latest_time = t
latest_msg = msg
if not latest_msg:
time.sleep(10)
continue
video_id, option = extract_youtube_and_option(latest_msg["plainText"])
if not video_id:
time.sleep(10)
continue
valid_options = ["144", "240", "360", "480", "720", "1080", "1440", "2160", "4320", "4k", "8k", "2k", "a", "f"]
# 画質入力チェックを normalize_quality の前にする
if option is None:
send_to_channel(
"画質が指定されていないため、デフォルトの720pを使用します。\n"
"指定する場合は、動画URLの後にスペースを入れて、"
"144/240/360/480/720/1080/2k(または1440)/4k(または2160)/8k(または4320)/a(結合しない)/f(最高画質) のいずれかを入力してください。\n注意:最大画質や大きい画質、長い動画だと処理に時間がかかるからタイムアウトしやすいです。"
)
else:
opt_lower = option.lower()
if opt_lower not in valid_options:
send_to_channel(
"不明な画質、または予期しない形式でした。\n"
"対応画質: 144, 240, 360, 480, 720, 1080, "
"2k(1440), 4k(2160), 8k(4320), a(結合無効), f(最大画質)\n"
"動画のURLの後にスペースを入れて指定してください。\n"
"例: https://youtu.be/abcdEFGHijk 1080\n注意:最大画質や大きい画質、長い動画だと処理に時間がかかるからタイムアウトしやすいです。"
)
processed_messages.add(latest_msg["id"])
continue
quality_opt = normalize_quality(option)
youtube_url = f"https://www.youtube.com/watch?v={video_id}"
send_to_channel(f"{video_id} のダウンロードを開始します。")
video_title = get_video_title(youtube_url)
items, nonce = fetch_download_links(youtube_url)
all_items = items[:]
audio_items = [i for i in items if i["quality"] == "audio"]
selected_video = None
selected_audio = audio_items[0] if audio_items else None
if quality_opt not in ["a"]:
selected_video = select_quality_video(items, quality_opt)
# ダウンロードリスト送信
message_lines = []
message_lines.append(f"<b>{video_title}</b>\n")
message_lines.append("<b>ダウンロードリンク</b>")
# すべての動画リンクを追加
for item in all_items:
quality = item["quality"]
if item["quality"] == "audio":
quality_display = "音声"
elif item["has_audio"] == "true":
quality_display = f"{quality} (音声付き)"
else:
quality_display = f"{quality} (映像のみ)"
message_lines.append(f"<link type=\"url\" value=\"{item['url']}\"> {quality_display} </link>")
message_lines.append("★動画ファイルと音声ファイルをダウンロードして<link type=\"url\" value=\"https://qooly.com/ja/merge-video-and-audio-online\">ここ</link>などで結合すると早くて確実です。")
send_to_channel("\n".join(message_lines))
# ★ 結合フラグ
do_merge = quality_opt not in ["a"]
if do_merge and selected_video and selected_audio:
m = re.match(r"(\d+)p", selected_video["quality"])
if m:
q = m.group(1)
send_to_channel(f"{q}pで音声との結合を開始しました。")
try:
merged_url = merge_video_on_server(
selected_video["url"],
selected_audio["url"],
video_id,
q,
video_title,
nonce
)
send_to_channel(f"<link type=\"url\" value=\"{merged_url}\">{q}p 結合済み動画</link>")
file_json = upload_file_to_channel(merged_url)
send_video_message(file_json)
except Exception as e:
send_to_channel(f"結合失敗: {e}")
send_to_channel("完了しました!")
processed_messages.add(latest_msg["id"])
print(f"送信完了: {video_id}")
except Exception as e:
print("エラー:", e)
import traceback
traceback.print_exc()
time.sleep(15)
if __name__ == "__main__":
main()