import os import re import time import json import requests from datetime import datetime, timezone from bs4 import BeautifulSoup # ===== Channel.io 設定 ===== GET_URL = "https://desk-api.channel.io/desk/channels/200605/groups/519217/messages" POST_URL = GET_URL PARAMS = { "sortOrder": "desc", "limit": 36, "logFolded": "false", } X_ACCOUNT = os.getenv("channeliotokenbot2") if not X_ACCOUNT: raise RuntimeError("環境変数 channeliotokenbot2 が設定されていません") HEADERS_GET = { "accept": "application/json", "accept-language": "ja", "x-account": X_ACCOUNT, } HEADERS_POST = { "accept": "application/json", "accept-language": "ja", "content-type": "application/json", "x-account": X_ACCOUNT, } # ===== ssyoutube ===== SSYOUTUBE_URL = "https://ssyoutube.online/yt-video-detail/" # ===== Utils ===== def parse_updated_at(value): if isinstance(value, (int, float)): return datetime.fromtimestamp(value / 1000, tz=timezone.utc) elif isinstance(value, str): return datetime.fromisoformat(value.replace("Z", "+00:00")) return None def extract_youtube_id(text): _patterns = [ # クエリパラメータ v= または vi= r"(?:[?&](?:v|vi)=)([A-Za-z0-9_-]{11})", # youtu.be 短縮リンク r"youtu\.be/([A-Za-z0-9_-]{11})", # /embed/ /v/ /shorts/ r"youtube(?:-nocookie)?\.com/(?:embed|v|shorts)/([A-Za-z0-9_-]{11})", # watch? の中に v= が含まれるケース(ioパラメータ等が混ざっても OK) r"youtube(?:-nocookie)?\.com/watch\?.*?v=([A-Za-z0-9_-]{11})", # hash や他パラメータに混ざるケース: ...#v=xxxx or &v=xxxx r"[#&]v=([A-Za-z0-9_-]{11})" ] for p in patterns: m = re.search(p, text) if m: return m.group(1) return None def get_video_title(youtube_url): """YouTube動画のタイトルを取得する簡易的な関数""" try: res = requests.get(youtube_url, timeout=10) soup = BeautifulSoup(res.text, "lxml") title_tag = soup.find("meta", property="og:title") if title_tag: return title_tag.get("content", "").replace(" - YouTube", "") except: pass return "YouTube動画" # ===== ssyoutube HTML 解析 ===== def fetch_download_links(youtube_url): res = requests.post( SSYOUTUBE_URL, data={"videoURL": youtube_url}, timeout=30, headers={ "User-Agent": "Mozilla/5.0", "Referer": "https://ssyoutube.online/", } ) res.raise_for_status() # nonceを抽出 soup = BeautifulSoup(res.text, "lxml") nonce_match = re.search(r"formData\.append\('nonce', '([^']+)'\);", res.text) nonce = nonce_match.group(1) if nonce_match else None buttons = soup.select("button[data-url]") results = [] for btn in buttons: url = btn.get("data-url") quality = btn.get("data-quality") has_audio = btn.get("data-has-audio") if not url: continue results.append({ "url": url, "quality": quality or "audio", "has_audio": has_audio, }) return results, nonce # ===== 最高画質映像 + 音声を選択 ===== def select_best_video_and_audio(items): video_items = [] audio_items = [] for item in items: if item["quality"] == "audio": audio_items.append(item) elif item["has_audio"] == "false": m = re.match(r"(\d+)p", item["quality"]) if m: video_items.append((int(m.group(1)), item)) if not video_items or not audio_items: return None, None # 最高画質の動画を選択 video_items.sort(key=lambda x: x[0], reverse=True) best_video = video_items[0][1] # 最高音質の音声を選択(例: 128kbps > 64kbps) # 実際の音声品質情報に基づいて選択する場合は要修正 best_audio = audio_items[0] if audio_items else None return best_video, best_audio # ===== ssyoutube サーバー側結合 ===== def merge_video_on_server(video_url, audio_url, video_id, quality, video_title, nonce): """ ssyoutubeサーバーで動画と音声を結合する """ # IDと画質からリクエストIDを生成 request_id = f"{video_id}_{quality}p" # リクエストデータの構築 request_data = { "id": request_id, "ttl": 3600000, "inputs": [ { "url": video_url, "ext": "mp4", "chunkDownload": { "type": "header", "size": 52428800, "concurrency": 3 } }, { "url": audio_url, "ext": "m4a" } ], "output": { "ext": "mp4", "downloadName": f"{video_title}_{quality}p.mp4", "chunkUpload": { "size": 104857600, "concurrency": 3 } }, "operation": { "type": "replace_audio_in_video" } } # 結合開始リクエスト payload = { "action": "process_video_merge", "nonce": nonce, "request_data": json.dumps(request_data) } headers = { "User-Agent": "Mozilla/5.0", "Referer": "https://ssyoutube.online/", "Origin": "https://ssyoutube.online", "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8", } res = requests.post( "https://ssyoutube.online/wp-admin/admin-ajax.php", data=payload, headers=headers, timeout=30 ) res.raise_for_status() result = res.json() if not result.get("success") or not result.get("data", {}).get("success"): raise RuntimeError(f"結合開始に失敗しました: {result}") # ステータス監視用URLを取得 monitor_url = result["data"]["result"]["monitor"]["http"] # 完了するまでポーリング max_attempts = 60 # 最大60回試行(30秒間隔で約30分) attempt = 0 while attempt < max_attempts: time.sleep(30) # 30秒間隔でチェック status_res = requests.get(monitor_url, timeout=30) status_res.raise_for_status() status_data = status_res.json() if not status_data.get("success"): raise RuntimeError(f"ステータス取得に失敗しました: {status_data}") status = status_data["result"]["status"] if status == "done": # 完了したらダウンロードURLを返す return status_data["result"]["output"]["url"] elif status == "error": error_msg = status_data["result"]["error"] raise RuntimeError(f"結合処理でエラーが発生しました: {error_msg}") # "processing" の場合は継続 attempt += 1 print(f"結合処理中... 進捗: {status_data['result'].get('progress_in_percent', 0)}%") raise RuntimeError("結合処理がタイムアウトしました") def send_to_channel(text): payload = { "requestId": f"desk-web-{int(time.time() * 1000)}", "blocks": [ {"type": "text", "value": text} ], } res = requests.post( POST_URL, headers=HEADERS_POST, data=json.dumps(payload), timeout=30 ) res.raise_for_status() # ===== Main ===== def main(): processed_messages = set() # 処理済みメッセージを追跡 while True: try: res = requests.get( GET_URL, headers=HEADERS_GET, params=PARAMS, timeout=30, ) res.raise_for_status() messages = res.json().get("messages", []) latest_msg = None latest_time = None for msg in messages: msg_id = msg.get("id") plain_text = msg.get("plainText") updated_at = msg.get("updatedAt") if not plain_text or updated_at is None: continue # 既に処理済みのメッセージはスキップ if msg_id in processed_messages: continue t = parse_updated_at(updated_at) if not t: continue if latest_time is None or t > latest_time: latest_time = t latest_msg = msg if not latest_msg: time.sleep(10) continue youtube_id = extract_youtube_id(latest_msg["plainText"]) if not youtube_id: time.sleep(10) continue youtube_url = f"https://www.youtube.com/watch?v={youtube_id}" send_to_channel(f"{youtube_id}のダウンロードを開始します。") # 動画タイトルを取得 video_title = get_video_title(youtube_url) items, nonce = fetch_download_links(youtube_url) if not nonce: print("nonceの取得に失敗しました") time.sleep(10) continue # すべてのダウンロードリンクを収集 video_items = [] audio_items = [] all_items = [] for item in items: all_items.append(item) if item["quality"] == "audio": audio_items.append(item) elif item["has_audio"] == "false": video_items.append(item) # 最高画質の動画と音声を選択 best_video, best_audio = select_best_video_and_audio(items) # メッセージを構築 message_lines = [] message_lines.append(f"{video_title}\n") message_lines.append("ダウンロードリンク") # すべての動画リンクを追加 for item in all_items: quality = item["quality"] if item["quality"] == "audio": quality_display = "音声" elif item["has_audio"] == "true": quality_display = f"{quality} (音声付き)" else: quality_display = f"{quality} (映像のみ)" message_lines.append(f" {quality_display} ") # 最高画質の結合動画を作成 if best_video and best_audio and nonce: message_lines.append("\n 最高画質結合動画") # 画質情報を取得 quality_match = re.match(r"(\d+)p", best_video["quality"]) if quality_match: quality = quality_match.group(1) # サーバー側で結合 print(f"サーバー側での結合を開始します: {youtube_id} {quality}p") try: merged_url = merge_video_on_server( best_video["url"], best_audio["url"], youtube_id, quality, video_title, nonce ) message_lines.append(f"{quality}p 結合済み動画") except Exception as e: print(f"結合処理エラー: {e}") message_lines.append(f"結合処理に失敗: {str(e)}") message_lines.append(f"ダウンロードが完了しました。\nAPIの変更により、一次的に使用できなくなっておりました。今後は維持していけるように頑張ります。\nこのツールの使い方:ダウンロードしたいYoutube動画のURLを貼り付けると、自動でボットがダウンロードURLを生成します。\nシステムについて:このシステムは、Huggingfaceのサーバーより、自動で送信されています。本人がいなくても多分いつでも使えます。動画は、最高画質のものに音声を付けるようにしています。音声を付ける処理に時間がかかりますが、この方法しかありませんでした。") # メッセージを送信 message = "\n".join(message_lines) send_to_channel(message) # 処理済みとしてマーク processed_messages.add(latest_msg.get("id")) print(f"送信完了: {youtube_id}") except Exception as e: print("エラー:", e) import traceback traceback.print_exc() time.sleep(15) if __name__ == "__main__": main()