import os
import re
import time
import json
import requests
from datetime import datetime, timezone
from bs4 import BeautifulSoup
# ===== Channel.io 設定 =====
GET_URL = "https://desk-api.channel.io/desk/channels/200605/groups/519217/messages"
POST_URL = GET_URL
PARAMS = {
"sortOrder": "desc",
"limit": 36,
"logFolded": "false",
}
X_ACCOUNT = os.getenv("channeliotokenbot2")
if not X_ACCOUNT:
raise RuntimeError("環境変数 channeliotokenbot2 が設定されていません")
HEADERS_GET = {
"accept": "application/json",
"accept-language": "ja",
"x-account": X_ACCOUNT,
}
HEADERS_POST = {
"accept": "application/json",
"accept-language": "ja",
"content-type": "application/json",
"x-account": X_ACCOUNT,
}
# ===== ssyoutube =====
SSYOUTUBE_URL = "https://ssyoutube.online/yt-video-detail/"
# ===== Utils =====
def parse_updated_at(value):
if isinstance(value, (int, float)):
return datetime.fromtimestamp(value / 1000, tz=timezone.utc)
elif isinstance(value, str):
return datetime.fromisoformat(value.replace("Z", "+00:00"))
return None
def extract_youtube_id(text):
_patterns = [
# クエリパラメータ v= または vi=
r"(?:[?&](?:v|vi)=)([A-Za-z0-9_-]{11})",
# youtu.be 短縮リンク
r"youtu\.be/([A-Za-z0-9_-]{11})",
# /embed/ /v/ /shorts/
r"youtube(?:-nocookie)?\.com/(?:embed|v|shorts)/([A-Za-z0-9_-]{11})",
# watch? の中に v= が含まれるケース(ioパラメータ等が混ざっても OK)
r"youtube(?:-nocookie)?\.com/watch\?.*?v=([A-Za-z0-9_-]{11})",
# hash や他パラメータに混ざるケース: ...#v=xxxx or &v=xxxx
r"[#&]v=([A-Za-z0-9_-]{11})"
]
for p in patterns:
m = re.search(p, text)
if m:
return m.group(1)
return None
def get_video_title(youtube_url):
"""YouTube動画のタイトルを取得する簡易的な関数"""
try:
res = requests.get(youtube_url, timeout=10)
soup = BeautifulSoup(res.text, "lxml")
title_tag = soup.find("meta", property="og:title")
if title_tag:
return title_tag.get("content", "").replace(" - YouTube", "")
except:
pass
return "YouTube動画"
# ===== ssyoutube HTML 解析 =====
def fetch_download_links(youtube_url):
res = requests.post(
SSYOUTUBE_URL,
data={"videoURL": youtube_url},
timeout=30,
headers={
"User-Agent": "Mozilla/5.0",
"Referer": "https://ssyoutube.online/",
}
)
res.raise_for_status()
# nonceを抽出
soup = BeautifulSoup(res.text, "lxml")
nonce_match = re.search(r"formData\.append\('nonce', '([^']+)'\);", res.text)
nonce = nonce_match.group(1) if nonce_match else None
buttons = soup.select("button[data-url]")
results = []
for btn in buttons:
url = btn.get("data-url")
quality = btn.get("data-quality")
has_audio = btn.get("data-has-audio")
if not url:
continue
results.append({
"url": url,
"quality": quality or "audio",
"has_audio": has_audio,
})
return results, nonce
# ===== 最高画質映像 + 音声を選択 =====
def select_best_video_and_audio(items):
video_items = []
audio_items = []
for item in items:
if item["quality"] == "audio":
audio_items.append(item)
elif item["has_audio"] == "false":
m = re.match(r"(\d+)p", item["quality"])
if m:
video_items.append((int(m.group(1)), item))
if not video_items or not audio_items:
return None, None
# 最高画質の動画を選択
video_items.sort(key=lambda x: x[0], reverse=True)
best_video = video_items[0][1]
# 最高音質の音声を選択(例: 128kbps > 64kbps)
# 実際の音声品質情報に基づいて選択する場合は要修正
best_audio = audio_items[0] if audio_items else None
return best_video, best_audio
# ===== ssyoutube サーバー側結合 =====
def merge_video_on_server(video_url, audio_url, video_id, quality, video_title, nonce):
"""
ssyoutubeサーバーで動画と音声を結合する
"""
# IDと画質からリクエストIDを生成
request_id = f"{video_id}_{quality}p"
# リクエストデータの構築
request_data = {
"id": request_id,
"ttl": 3600000,
"inputs": [
{
"url": video_url,
"ext": "mp4",
"chunkDownload": {
"type": "header",
"size": 52428800,
"concurrency": 3
}
},
{
"url": audio_url,
"ext": "m4a"
}
],
"output": {
"ext": "mp4",
"downloadName": f"{video_title}_{quality}p.mp4",
"chunkUpload": {
"size": 104857600,
"concurrency": 3
}
},
"operation": {
"type": "replace_audio_in_video"
}
}
# 結合開始リクエスト
payload = {
"action": "process_video_merge",
"nonce": nonce,
"request_data": json.dumps(request_data)
}
headers = {
"User-Agent": "Mozilla/5.0",
"Referer": "https://ssyoutube.online/",
"Origin": "https://ssyoutube.online",
"Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
}
res = requests.post(
"https://ssyoutube.online/wp-admin/admin-ajax.php",
data=payload,
headers=headers,
timeout=30
)
res.raise_for_status()
result = res.json()
if not result.get("success") or not result.get("data", {}).get("success"):
raise RuntimeError(f"結合開始に失敗しました: {result}")
# ステータス監視用URLを取得
monitor_url = result["data"]["result"]["monitor"]["http"]
# 完了するまでポーリング
max_attempts = 60 # 最大60回試行(30秒間隔で約30分)
attempt = 0
while attempt < max_attempts:
time.sleep(30) # 30秒間隔でチェック
status_res = requests.get(monitor_url, timeout=30)
status_res.raise_for_status()
status_data = status_res.json()
if not status_data.get("success"):
raise RuntimeError(f"ステータス取得に失敗しました: {status_data}")
status = status_data["result"]["status"]
if status == "done":
# 完了したらダウンロードURLを返す
return status_data["result"]["output"]["url"]
elif status == "error":
error_msg = status_data["result"]["error"]
raise RuntimeError(f"結合処理でエラーが発生しました: {error_msg}")
# "processing" の場合は継続
attempt += 1
print(f"結合処理中... 進捗: {status_data['result'].get('progress_in_percent', 0)}%")
raise RuntimeError("結合処理がタイムアウトしました")
def send_to_channel(text):
payload = {
"requestId": f"desk-web-{int(time.time() * 1000)}",
"blocks": [
{"type": "text", "value": text}
],
}
res = requests.post(
POST_URL,
headers=HEADERS_POST,
data=json.dumps(payload),
timeout=30
)
res.raise_for_status()
# ===== Main =====
def main():
processed_messages = set() # 処理済みメッセージを追跡
while True:
try:
res = requests.get(
GET_URL,
headers=HEADERS_GET,
params=PARAMS,
timeout=30,
)
res.raise_for_status()
messages = res.json().get("messages", [])
latest_msg = None
latest_time = None
for msg in messages:
msg_id = msg.get("id")
plain_text = msg.get("plainText")
updated_at = msg.get("updatedAt")
if not plain_text or updated_at is None:
continue
# 既に処理済みのメッセージはスキップ
if msg_id in processed_messages:
continue
t = parse_updated_at(updated_at)
if not t:
continue
if latest_time is None or t > latest_time:
latest_time = t
latest_msg = msg
if not latest_msg:
time.sleep(10)
continue
youtube_id = extract_youtube_id(latest_msg["plainText"])
if not youtube_id:
time.sleep(10)
continue
youtube_url = f"https://www.youtube.com/watch?v={youtube_id}"
send_to_channel(f"{youtube_id}のダウンロードを開始します。")
# 動画タイトルを取得
video_title = get_video_title(youtube_url)
items, nonce = fetch_download_links(youtube_url)
if not nonce:
print("nonceの取得に失敗しました")
time.sleep(10)
continue
# すべてのダウンロードリンクを収集
video_items = []
audio_items = []
all_items = []
for item in items:
all_items.append(item)
if item["quality"] == "audio":
audio_items.append(item)
elif item["has_audio"] == "false":
video_items.append(item)
# 最高画質の動画と音声を選択
best_video, best_audio = select_best_video_and_audio(items)
# メッセージを構築
message_lines = []
message_lines.append(f"{video_title}\n")
message_lines.append("ダウンロードリンク")
# すべての動画リンクを追加
for item in all_items:
quality = item["quality"]
if item["quality"] == "audio":
quality_display = "音声"
elif item["has_audio"] == "true":
quality_display = f"{quality} (音声付き)"
else:
quality_display = f"{quality} (映像のみ)"
message_lines.append(f" {quality_display} ")
# 最高画質の結合動画を作成
if best_video and best_audio and nonce:
message_lines.append("\n 最高画質結合動画")
# 画質情報を取得
quality_match = re.match(r"(\d+)p", best_video["quality"])
if quality_match:
quality = quality_match.group(1)
# サーバー側で結合
print(f"サーバー側での結合を開始します: {youtube_id} {quality}p")
try:
merged_url = merge_video_on_server(
best_video["url"],
best_audio["url"],
youtube_id,
quality,
video_title,
nonce
)
message_lines.append(f"{quality}p 結合済み動画")
except Exception as e:
print(f"結合処理エラー: {e}")
message_lines.append(f"結合処理に失敗: {str(e)}")
message_lines.append(f"ダウンロードが完了しました。\nAPIの変更により、一次的に使用できなくなっておりました。今後は維持していけるように頑張ります。\nこのツールの使い方:ダウンロードしたいYoutube動画のURLを貼り付けると、自動でボットがダウンロードURLを生成します。\nシステムについて:このシステムは、Huggingfaceのサーバーより、自動で送信されています。本人がいなくても多分いつでも使えます。動画は、最高画質のものに音声を付けるようにしています。音声を付ける処理に時間がかかりますが、この方法しかありませんでした。")
# メッセージを送信
message = "\n".join(message_lines)
send_to_channel(message)
# 処理済みとしてマーク
processed_messages.add(latest_msg.get("id"))
print(f"送信完了: {youtube_id}")
except Exception as e:
print("エラー:", e)
import traceback
traceback.print_exc()
time.sleep(15)
if __name__ == "__main__":
main()