import os import re import time import json import requests from datetime import datetime, timezone from bs4 import BeautifulSoup # ===== Channel.io 設定 ===== GET_URL = "https://desk-api.channel.io/desk/channels/200605/groups/519217/messages" POST_URL = GET_URL PARAMS = { "sortOrder": "desc", "limit": 36, "logFolded": "false", } X_ACCOUNT = os.getenv("channeliotokenbot2") if not X_ACCOUNT: raise RuntimeError("環境変数 channeliotokenbot2 が設定されていません") HEADERS_GET = { "accept": "application/json", "accept-language": "ja", "x-account": X_ACCOUNT, } HEADERS_POST = { "accept": "application/json", "accept-language": "ja", "content-type": "application/json", "x-account": X_ACCOUNT, } # ===== ssyoutube ===== SSYOUTUBE_URL = "https://ssyoutube.online/yt-video-detail/" # ===== Utils ===== def parse_updated_at(value): if isinstance(value, (int, float)): return datetime.fromtimestamp(value / 1000, tz=timezone.utc) elif isinstance(value, str): return datetime.fromisoformat(value.replace("Z", "+00:00")) return None def extract_youtube_id(text): patterns = [ # クエリパラメータ v= または vi= r"(?:[?&](?:v|vi)=)([A-Za-z0-9_-]{11})", # youtu.be 短縮リンク r"youtu\.be/([A-Za-z0-9_-]{11})", # /embed/ /v/ /shorts/ r"youtube(?:-nocookie)?\.com/(?:embed|v|shorts)/([A-Za-z0-9_-]{11})", # watch? の中に v= が含まれるケース(ioパラメータ等が混ざっても OK) r"youtube(?:-nocookie)?\.com/watch\?.*?v=([A-Za-z0-9_-]{11})", # hash や他パラメータに混ざるケース: ...#v=xxxx or &v=xxxx r"[#&]v=([A-Za-z0-9_-]{11})" ] for p in patterns: m = re.search(p, text) if m: return m.group(1) return None def extract_youtube_and_option(text): """ YouTube URL と画質指定オプションを抽出する 戻り値: (video_id, option) 例: ("abcdEFGHijk", "720") """ # 全角スペースも対象に parts = re.split(r"[ \u3000]+", text.strip()) video_id = extract_youtube_id(parts[0]) option = None if len(parts) > 1: option = parts[1].lower() return video_id, option def normalize_quality(option): """ ★ ユーザー入力を内部品質コードへ変換(全角も対応) """ if option is None: return "720" # 全角 → 半角 に変換 # 例: "720" → "720", "4k" → "4k" trans_table = str.maketrans( "0123456789kKpP", "0123456789kKpP" ) option = option.translate(trans_table).lower() quality_map = { "144": "144", "240": "240", "360": "360", "480": "480", "720": "720", "1080": "1080", "2k": "1440", "1440": "1440", "4k": "2160", "2160": "2160", "8k": "4320", "4320": "4320", "144p": "144", "240p": "240", "360p": "360", "480p": "480", "720p": "720", "1080p": "1080", "2kp": "1440", "1440p": "1440", "4kp": "2160", "2160p": "2160", "8kp": "4320", "4320p": "4320", } if option in quality_map: return quality_map[option] if option in ["a", "f"]: return option # 特殊指定 return "720" # デフォルト def get_video_title(youtube_url): """YouTube動画のタイトルを取得する簡易的な関数""" try: res = requests.get(youtube_url, timeout=10) soup = BeautifulSoup(res.text, "lxml") title_tag = soup.find("meta", property="og:title") if title_tag: return title_tag.get("content", "").replace(" - YouTube", "") except: pass return "YouTube動画" # ===== ssyoutube HTML 解析 ===== def fetch_download_links(youtube_url): res = requests.post( SSYOUTUBE_URL, data={"videoURL": youtube_url}, timeout=20, headers={ "User-Agent": "Mozilla/5.0", "Referer": "https://ssyoutube.online/", } ) res.raise_for_status() # nonceを抽出 soup = BeautifulSoup(res.text, "lxml") nonce_match = re.search(r"formData\.append\('nonce', '([^']+)'\);", res.text) nonce = nonce_match.group(1) if nonce_match else None buttons = soup.select("button[data-url]") results = [] for btn in buttons: url = btn.get("data-url") quality = btn.get("data-quality") has_audio = btn.get("data-has-audio") if not url: continue results.append({ "url": url, "quality": quality or "audio", "has_audio": has_audio, }) return results, nonce # ===== 最高画質映像 + 音声を選択 ===== def select_best_video_and_audio(items): video_items = [] audio_items = [] for item in items: if item["quality"] == "audio": audio_items.append(item) elif item["has_audio"] == "false": m = re.match(r"(\d+)p", item["quality"]) if m: video_items.append((int(m.group(1)), item)) if not video_items or not audio_items: return None, None # 最高画質の動画を選択 video_items.sort(key=lambda x: x[0], reverse=True) best_video = video_items[0][1] # 最高音質の音声を選択(例: 128kbps > 64kbps) # 実際の音声品質情報に基づいて選択する場合は要修正 best_audio = audio_items[0] if audio_items else None return best_video, best_audio def select_quality_video(items, quality): """ ★ 指定画質に最適な動画を選択(音声なし映像のみ) """ video_candidates = [] for item in items: if item["has_audio"] == "false": m = re.match(r"(\d+)p", item["quality"]) if m: q = int(m.group(1)) video_candidates.append((q, item)) if not video_candidates: return None # 最大画質 video_candidates.sort(key=lambda x: x[0]) if quality == "f": # 最大画質 return video_candidates[-1][1] req = int(quality) best = None # 指定画質以上で一番近いもの for q, item in video_candidates: if q >= req: best = item break # 無ければ最大画質 return best if best else video_candidates[-1][1] # ===== ssyoutube サーバー側結合 ===== def merge_video_on_server(video_url, audio_url, video_id, quality, video_title, nonce): """ ssyoutubeサーバーで動画と音声を結合する """ # IDと画質からリクエストIDを生成 request_id = f"{video_id}_{quality}p" # リクエストデータの構築 request_data = { "id": request_id, "ttl": 3600000, "inputs": [ { "url": video_url, "ext": "mp4", "chunkDownload": { "type": "header", "size": 52428800, "concurrency": 3 } }, { "url": audio_url, "ext": "m4a" } ], "output": { "ext": "mp4", "downloadName": f"{video_title}_{quality}p.mp4", "chunkUpload": { "size": 104857600, "concurrency": 3 } }, "operation": { "type": "replace_audio_in_video" } } # 結合開始リクエスト payload = { "action": "process_video_merge", "nonce": nonce, "request_data": json.dumps(request_data) } headers = { "User-Agent": "Mozilla/5.0", "Referer": "https://ssyoutube.online/", "Origin": "https://ssyoutube.online", "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8", } res = requests.post( "https://ssyoutube.online/wp-admin/admin-ajax.php", data=payload, headers=headers, timeout=60 ) res.raise_for_status() result = res.json() if not result.get("success") or not result.get("data", {}).get("success"): raise RuntimeError(f"結合開始に失敗しました: {result}") # ステータス監視用URLを取得 monitor_url = result["data"]["result"]["monitor"]["http"] # 完了するまでポーリング max_attempts = 5 # 最大60回試行(30秒間隔で約30分) attempt = 0 while attempt < max_attempts: time.sleep(20) # 30秒間隔でチェック status_res = requests.get(monitor_url, timeout=30) status_res.raise_for_status() status_data = status_res.json() if not status_data.get("success"): raise RuntimeError(f"ステータス取得に失敗しました: {status_data}") status = status_data["result"]["status"] if status == "done": # 完了したらダウンロードURLを返す return status_data["result"]["output"]["url"] elif status == "error": error_msg = status_data["result"]["error"] raise RuntimeError(f"結合処理でエラーが発生しました: {error_msg}") # "processing" の場合は継続 attempt += 1 print(f"結合処理中... 進捗: {status_data['result'].get('progress_in_percent', 0)}%") raise RuntimeError("結合処理がタイムアウトしました") def send_to_channel(text): payload = { "requestId": f"desk-web-{int(time.time() * 1000)}", "blocks": [ {"type": "text", "value": text} ], } res = requests.post( POST_URL, headers=HEADERS_POST, data=json.dumps(payload), timeout=30 ) res.raise_for_status() def upload_file_to_channel(file_url): # mp4バイナリ取得 file_res = requests.get(file_url, timeout=60) file_res.raise_for_status() upload_url = "https://media.channel.io/cht/v1/pri-file/200605/groups/519217/message/send_yt_video_file.mp4" headers = { "x-account": X_ACCOUNT, "Content-Type": "video/mp4", "Content-Length": str(len(file_res.content)), } res = requests.post( upload_url, headers=headers, data=file_res.content, timeout=60 ) res.raise_for_status() return res.json() # ←これを messages POST の body.files に入れる def send_video_message(file_json): request_id = f"desk-web-{int(time.time() * 1000)}" payload = { "requestId": request_id, "blocks": [ {"type": "text", "value": "プレビュー:"} ], "files": [file_json], # ←ここが重要! } res = requests.post( POST_URL, headers=HEADERS_POST, data=json.dumps(payload), timeout=30 ) res.raise_for_status() # ===== Main ===== def main(): processed_messages = set() # 処理済みメッセージを追跡 while True: try: res = requests.get( GET_URL, headers=HEADERS_GET, params=PARAMS, timeout=30, ) res.raise_for_status() messages = res.json().get("messages", []) latest_msg = None latest_time = None for msg in messages: msg_id = msg.get("id") plain_text = msg.get("plainText") updated_at = msg.get("updatedAt") if not plain_text or updated_at is None: continue # 既に処理済みのメッセージはスキップ if msg_id in processed_messages: continue t = parse_updated_at(updated_at) if not t: continue if latest_time is None or t > latest_time: latest_time = t latest_msg = msg if not latest_msg: time.sleep(10) continue video_id, option = extract_youtube_and_option(latest_msg["plainText"]) if not video_id: time.sleep(10) continue valid_options = ["144", "240", "360", "480", "720", "1080", "1440", "2160", "4320", "4k", "8k", "2k", "a", "f"] # 画質入力チェックを normalize_quality の前にする if option is None: send_to_channel( "画質が指定されていないため、デフォルトの720pを使用します。\n" "指定する場合は、動画URLの後にスペースを入れて、" "144/240/360/480/720/1080/2k(または1440)/4k(または2160)/8k(または4320)/a(結合しない)/f(最高画質) のいずれかを入力してください。\n注意:最大画質や大きい画質、長い動画だと処理に時間がかかるからタイムアウトしやすいです。" ) else: opt_lower = option.lower() if opt_lower not in valid_options: send_to_channel( "不明な画質、または予期しない形式でした。\n" "対応画質: 144, 240, 360, 480, 720, 1080, " "2k(1440), 4k(2160), 8k(4320), a(結合無効), f(最大画質)\n" "動画のURLの後にスペースを入れて指定してください。\n" "例: https://youtu.be/abcdEFGHijk 1080\n注意:最大画質や大きい画質、長い動画だと処理に時間がかかるからタイムアウトしやすいです。" ) processed_messages.add(latest_msg["id"]) continue quality_opt = normalize_quality(option) youtube_url = f"https://www.youtube.com/watch?v={video_id}" send_to_channel(f"{video_id} のダウンロードを開始します。") video_title = get_video_title(youtube_url) items, nonce = fetch_download_links(youtube_url) all_items = items[:] audio_items = [i for i in items if i["quality"] == "audio"] selected_video = None selected_audio = audio_items[0] if audio_items else None if quality_opt not in ["a"]: selected_video = select_quality_video(items, quality_opt) # ダウンロードリスト送信 message_lines = [] message_lines.append(f"{video_title}\n") message_lines.append("ダウンロードリンク") # すべての動画リンクを追加 for item in all_items: quality = item["quality"] if item["quality"] == "audio": quality_display = "音声" elif item["has_audio"] == "true": quality_display = f"{quality} (音声付き)" else: quality_display = f"{quality} (映像のみ)" message_lines.append(f" {quality_display} ") message_lines.append("★動画ファイルと音声ファイルをダウンロードしてここなどで結合すると早くて確実です。") send_to_channel("\n".join(message_lines)) # ★ 結合フラグ do_merge = quality_opt not in ["a"] if do_merge and selected_video and selected_audio: m = re.match(r"(\d+)p", selected_video["quality"]) if m: q = m.group(1) send_to_channel(f"{q}pで音声との結合を開始しました。") try: merged_url = merge_video_on_server( selected_video["url"], selected_audio["url"], video_id, q, video_title, nonce ) send_to_channel(f"{q}p 結合済み動画") file_json = upload_file_to_channel(merged_url) send_video_message(file_json) except Exception as e: send_to_channel(f"結合失敗: {e}") send_to_channel("完了しました!") processed_messages.add(latest_msg["id"]) print(f"送信完了: {video_id}") except Exception as e: print("エラー:", e) import traceback traceback.print_exc() time.sleep(15) if __name__ == "__main__": main()