Spaces:
Running
Running
| import os | |
| import re | |
| import time | |
| import json | |
| import requests | |
| from datetime import datetime, timezone | |
| from bs4 import BeautifulSoup | |
| # ===== Channel.io 設定 ===== | |
| GET_URL = "https://desk-api.channel.io/desk/channels/200605/groups/519217/messages" | |
| POST_URL = GET_URL | |
| PARAMS = { | |
| "sortOrder": "desc", | |
| "limit": 36, | |
| "logFolded": "false", | |
| } | |
| X_ACCOUNT = os.getenv("channeliotokenbot2") | |
| if not X_ACCOUNT: | |
| raise RuntimeError("環境変数 channeliotokenbot2 が設定されていません") | |
| HEADERS_GET = { | |
| "accept": "application/json", | |
| "accept-language": "ja", | |
| "x-account": X_ACCOUNT, | |
| } | |
| HEADERS_POST = { | |
| "accept": "application/json", | |
| "accept-language": "ja", | |
| "content-type": "application/json", | |
| "x-account": X_ACCOUNT, | |
| } | |
| # ===== ssyoutube ===== | |
| SSYOUTUBE_URL = "https://ssyoutube.online/yt-video-detail/" | |
| # ===== Utils ===== | |
| def parse_updated_at(value): | |
| if isinstance(value, (int, float)): | |
| return datetime.fromtimestamp(value / 1000, tz=timezone.utc) | |
| elif isinstance(value, str): | |
| return datetime.fromisoformat(value.replace("Z", "+00:00")) | |
| return None | |
| def extract_youtube_id(text): | |
| patterns = [ | |
| # クエリパラメータ v= または vi= | |
| r"(?:[?&](?:v|vi)=)([A-Za-z0-9_-]{11})", | |
| # youtu.be 短縮リンク | |
| r"youtu\.be/([A-Za-z0-9_-]{11})", | |
| # /embed/ /v/ /shorts/ | |
| r"youtube(?:-nocookie)?\.com/(?:embed|v|shorts)/([A-Za-z0-9_-]{11})", | |
| # watch? の中に v= が含まれるケース(ioパラメータ等が混ざっても OK) | |
| r"youtube(?:-nocookie)?\.com/watch\?.*?v=([A-Za-z0-9_-]{11})", | |
| # hash や他パラメータに混ざるケース: ...#v=xxxx or &v=xxxx | |
| r"[#&]v=([A-Za-z0-9_-]{11})" | |
| ] | |
| for p in patterns: | |
| m = re.search(p, text) | |
| if m: | |
| return m.group(1) | |
| return None | |
| def extract_youtube_and_option(text): | |
| """ | |
| YouTube URL と画質指定オプションを抽出する | |
| 戻り値: (video_id, option) 例: ("abcdEFGHijk", "720") | |
| """ | |
| # 全角スペースも対象に | |
| parts = re.split(r"[ \u3000]+", text.strip()) | |
| video_id = extract_youtube_id(parts[0]) | |
| option = None | |
| if len(parts) > 1: | |
| option = parts[1].lower() | |
| return video_id, option | |
| def normalize_quality(option): | |
| """ | |
| ★ ユーザー入力を内部品質コードへ変換(全角も対応) | |
| """ | |
| if option is None: | |
| return "720" | |
| # 全角 → 半角 に変換 | |
| # 例: "720" → "720", "4k" → "4k" | |
| trans_table = str.maketrans( | |
| "0123456789kKpP", | |
| "0123456789kKpP" | |
| ) | |
| option = option.translate(trans_table).lower() | |
| quality_map = { | |
| "144": "144", | |
| "240": "240", | |
| "360": "360", | |
| "480": "480", | |
| "720": "720", | |
| "1080": "1080", | |
| "2k": "1440", | |
| "1440": "1440", | |
| "4k": "2160", | |
| "2160": "2160", | |
| "8k": "4320", | |
| "4320": "4320", | |
| "144p": "144", | |
| "240p": "240", | |
| "360p": "360", | |
| "480p": "480", | |
| "720p": "720", | |
| "1080p": "1080", | |
| "2kp": "1440", | |
| "1440p": "1440", | |
| "4kp": "2160", | |
| "2160p": "2160", | |
| "8kp": "4320", | |
| "4320p": "4320", | |
| } | |
| if option in quality_map: | |
| return quality_map[option] | |
| if option in ["a", "f"]: | |
| return option # 特殊指定 | |
| return "720" # デフォルト | |
| def get_video_title(youtube_url): | |
| """YouTube動画のタイトルを取得する簡易的な関数""" | |
| try: | |
| res = requests.get(youtube_url, timeout=10) | |
| soup = BeautifulSoup(res.text, "lxml") | |
| title_tag = soup.find("meta", property="og:title") | |
| if title_tag: | |
| return title_tag.get("content", "").replace(" - YouTube", "") | |
| except: | |
| pass | |
| return "YouTube動画" | |
| # ===== ssyoutube HTML 解析 ===== | |
| def fetch_download_links(youtube_url): | |
| res = requests.post( | |
| SSYOUTUBE_URL, | |
| data={"videoURL": youtube_url}, | |
| timeout=20, | |
| headers={ | |
| "User-Agent": "Mozilla/5.0", | |
| "Referer": "https://ssyoutube.online/", | |
| } | |
| ) | |
| res.raise_for_status() | |
| # nonceを抽出 | |
| soup = BeautifulSoup(res.text, "lxml") | |
| nonce_match = re.search(r"formData\.append\('nonce', '([^']+)'\);", res.text) | |
| nonce = nonce_match.group(1) if nonce_match else None | |
| buttons = soup.select("button[data-url]") | |
| results = [] | |
| for btn in buttons: | |
| url = btn.get("data-url") | |
| quality = btn.get("data-quality") | |
| has_audio = btn.get("data-has-audio") | |
| if not url: | |
| continue | |
| results.append({ | |
| "url": url, | |
| "quality": quality or "audio", | |
| "has_audio": has_audio, | |
| }) | |
| return results, nonce | |
| # ===== 最高画質映像 + 音声を選択 ===== | |
| def select_best_video_and_audio(items): | |
| video_items = [] | |
| audio_items = [] | |
| for item in items: | |
| if item["quality"] == "audio": | |
| audio_items.append(item) | |
| elif item["has_audio"] == "false": | |
| m = re.match(r"(\d+)p", item["quality"]) | |
| if m: | |
| video_items.append((int(m.group(1)), item)) | |
| if not video_items or not audio_items: | |
| return None, None | |
| # 最高画質の動画を選択 | |
| video_items.sort(key=lambda x: x[0], reverse=True) | |
| best_video = video_items[0][1] | |
| # 最高音質の音声を選択(例: 128kbps > 64kbps) | |
| # 実際の音声品質情報に基づいて選択する場合は要修正 | |
| best_audio = audio_items[0] if audio_items else None | |
| return best_video, best_audio | |
| def select_quality_video(items, quality): | |
| """ | |
| ★ 指定画質に最適な動画を選択(音声なし映像のみ) | |
| """ | |
| video_candidates = [] | |
| for item in items: | |
| if item["has_audio"] == "false": | |
| m = re.match(r"(\d+)p", item["quality"]) | |
| if m: | |
| q = int(m.group(1)) | |
| video_candidates.append((q, item)) | |
| if not video_candidates: | |
| return None | |
| # 最大画質 | |
| video_candidates.sort(key=lambda x: x[0]) | |
| if quality == "f": # 最大画質 | |
| return video_candidates[-1][1] | |
| req = int(quality) | |
| best = None | |
| # 指定画質以上で一番近いもの | |
| for q, item in video_candidates: | |
| if q >= req: | |
| best = item | |
| break | |
| # 無ければ最大画質 | |
| return best if best else video_candidates[-1][1] | |
| # ===== ssyoutube サーバー側結合 ===== | |
| def merge_video_on_server(video_url, audio_url, video_id, quality, video_title, nonce): | |
| """ | |
| ssyoutubeサーバーで動画と音声を結合する | |
| """ | |
| # IDと画質からリクエストIDを生成 | |
| request_id = f"{video_id}_{quality}p" | |
| # リクエストデータの構築 | |
| request_data = { | |
| "id": request_id, | |
| "ttl": 3600000, | |
| "inputs": [ | |
| { | |
| "url": video_url, | |
| "ext": "mp4", | |
| "chunkDownload": { | |
| "type": "header", | |
| "size": 52428800, | |
| "concurrency": 3 | |
| } | |
| }, | |
| { | |
| "url": audio_url, | |
| "ext": "m4a" | |
| } | |
| ], | |
| "output": { | |
| "ext": "mp4", | |
| "downloadName": f"{video_title}_{quality}p.mp4", | |
| "chunkUpload": { | |
| "size": 104857600, | |
| "concurrency": 3 | |
| } | |
| }, | |
| "operation": { | |
| "type": "replace_audio_in_video" | |
| } | |
| } | |
| # 結合開始リクエスト | |
| payload = { | |
| "action": "process_video_merge", | |
| "nonce": nonce, | |
| "request_data": json.dumps(request_data) | |
| } | |
| headers = { | |
| "User-Agent": "Mozilla/5.0", | |
| "Referer": "https://ssyoutube.online/", | |
| "Origin": "https://ssyoutube.online", | |
| "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8", | |
| } | |
| res = requests.post( | |
| "https://ssyoutube.online/wp-admin/admin-ajax.php", | |
| data=payload, | |
| headers=headers, | |
| timeout=60 | |
| ) | |
| res.raise_for_status() | |
| result = res.json() | |
| if not result.get("success") or not result.get("data", {}).get("success"): | |
| raise RuntimeError(f"結合開始に失敗しました: {result}") | |
| # ステータス監視用URLを取得 | |
| monitor_url = result["data"]["result"]["monitor"]["http"] | |
| # 完了するまでポーリング | |
| max_attempts = 5 # 最大60回試行(30秒間隔で約30分) | |
| attempt = 0 | |
| while attempt < max_attempts: | |
| time.sleep(20) # 30秒間隔でチェック | |
| status_res = requests.get(monitor_url, timeout=30) | |
| status_res.raise_for_status() | |
| status_data = status_res.json() | |
| if not status_data.get("success"): | |
| raise RuntimeError(f"ステータス取得に失敗しました: {status_data}") | |
| status = status_data["result"]["status"] | |
| if status == "done": | |
| # 完了したらダウンロードURLを返す | |
| return status_data["result"]["output"]["url"] | |
| elif status == "error": | |
| error_msg = status_data["result"]["error"] | |
| raise RuntimeError(f"結合処理でエラーが発生しました: {error_msg}") | |
| # "processing" の場合は継続 | |
| attempt += 1 | |
| print(f"結合処理中... 進捗: {status_data['result'].get('progress_in_percent', 0)}%") | |
| raise RuntimeError("結合処理がタイムアウトしました") | |
| def send_to_channel(text): | |
| payload = { | |
| "requestId": f"desk-web-{int(time.time() * 1000)}", | |
| "blocks": [ | |
| {"type": "text", "value": text} | |
| ], | |
| } | |
| res = requests.post( | |
| POST_URL, | |
| headers=HEADERS_POST, | |
| data=json.dumps(payload), | |
| timeout=30 | |
| ) | |
| res.raise_for_status() | |
| def upload_file_to_channel(file_url): | |
| # mp4バイナリ取得 | |
| file_res = requests.get(file_url, timeout=60) | |
| file_res.raise_for_status() | |
| upload_url = "https://media.channel.io/cht/v1/pri-file/200605/groups/519217/message/send_yt_video_file.mp4" | |
| headers = { | |
| "x-account": X_ACCOUNT, | |
| "Content-Type": "video/mp4", | |
| "Content-Length": str(len(file_res.content)), | |
| } | |
| res = requests.post( | |
| upload_url, | |
| headers=headers, | |
| data=file_res.content, | |
| timeout=60 | |
| ) | |
| res.raise_for_status() | |
| return res.json() # ←これを messages POST の body.files に入れる | |
| def send_video_message(file_json): | |
| request_id = f"desk-web-{int(time.time() * 1000)}" | |
| payload = { | |
| "requestId": request_id, | |
| "blocks": [ | |
| {"type": "text", "value": "プレビュー:"} | |
| ], | |
| "files": [file_json], # ←ここが重要! | |
| } | |
| res = requests.post( | |
| POST_URL, | |
| headers=HEADERS_POST, | |
| data=json.dumps(payload), | |
| timeout=30 | |
| ) | |
| res.raise_for_status() | |
| # ===== Main ===== | |
| def main(): | |
| processed_messages = set() # 処理済みメッセージを追跡 | |
| while True: | |
| try: | |
| res = requests.get( | |
| GET_URL, | |
| headers=HEADERS_GET, | |
| params=PARAMS, | |
| timeout=30, | |
| ) | |
| res.raise_for_status() | |
| messages = res.json().get("messages", []) | |
| latest_msg = None | |
| latest_time = None | |
| for msg in messages: | |
| msg_id = msg.get("id") | |
| plain_text = msg.get("plainText") | |
| updated_at = msg.get("updatedAt") | |
| if not plain_text or updated_at is None: | |
| continue | |
| # 既に処理済みのメッセージはスキップ | |
| if msg_id in processed_messages: | |
| continue | |
| t = parse_updated_at(updated_at) | |
| if not t: | |
| continue | |
| if latest_time is None or t > latest_time: | |
| latest_time = t | |
| latest_msg = msg | |
| if not latest_msg: | |
| time.sleep(10) | |
| continue | |
| video_id, option = extract_youtube_and_option(latest_msg["plainText"]) | |
| if not video_id: | |
| time.sleep(10) | |
| continue | |
| valid_options = ["144", "240", "360", "480", "720", "1080", "1440", "2160", "4320", "4k", "8k", "2k", "a", "f"] | |
| # 画質入力チェックを normalize_quality の前にする | |
| if option is None: | |
| send_to_channel( | |
| "画質が指定されていないため、デフォルトの720pを使用します。\n" | |
| "指定する場合は、動画URLの後にスペースを入れて、" | |
| "144/240/360/480/720/1080/2k(または1440)/4k(または2160)/8k(または4320)/a(結合しない)/f(最高画質) のいずれかを入力してください。\n注意:最大画質や大きい画質、長い動画だと処理に時間がかかるからタイムアウトしやすいです。" | |
| ) | |
| else: | |
| opt_lower = option.lower() | |
| if opt_lower not in valid_options: | |
| send_to_channel( | |
| "不明な画質、または予期しない形式でした。\n" | |
| "対応画質: 144, 240, 360, 480, 720, 1080, " | |
| "2k(1440), 4k(2160), 8k(4320), a(結合無効), f(最大画質)\n" | |
| "動画のURLの後にスペースを入れて指定してください。\n" | |
| "例: https://youtu.be/abcdEFGHijk 1080\n注意:最大画質や大きい画質、長い動画だと処理に時間がかかるからタイムアウトしやすいです。" | |
| ) | |
| processed_messages.add(latest_msg["id"]) | |
| continue | |
| quality_opt = normalize_quality(option) | |
| youtube_url = f"https://www.youtube.com/watch?v={video_id}" | |
| send_to_channel(f"{video_id} のダウンロードを開始します。") | |
| video_title = get_video_title(youtube_url) | |
| items, nonce = fetch_download_links(youtube_url) | |
| all_items = items[:] | |
| audio_items = [i for i in items if i["quality"] == "audio"] | |
| selected_video = None | |
| selected_audio = audio_items[0] if audio_items else None | |
| if quality_opt not in ["a"]: | |
| selected_video = select_quality_video(items, quality_opt) | |
| # ダウンロードリスト送信 | |
| message_lines = [] | |
| message_lines.append(f"<b>{video_title}</b>\n") | |
| message_lines.append("<b>ダウンロードリンク</b>") | |
| # すべての動画リンクを追加 | |
| for item in all_items: | |
| quality = item["quality"] | |
| if item["quality"] == "audio": | |
| quality_display = "音声" | |
| elif item["has_audio"] == "true": | |
| quality_display = f"{quality} (音声付き)" | |
| else: | |
| quality_display = f"{quality} (映像のみ)" | |
| message_lines.append(f"<link type=\"url\" value=\"{item['url']}\"> {quality_display} </link>") | |
| message_lines.append("★動画ファイルと音声ファイルをダウンロードして<link type=\"url\" value=\"https://qooly.com/ja/merge-video-and-audio-online\">ここ</link>などで結合すると早くて確実です。") | |
| send_to_channel("\n".join(message_lines)) | |
| # ★ 結合フラグ | |
| do_merge = quality_opt not in ["a"] | |
| if do_merge and selected_video and selected_audio: | |
| m = re.match(r"(\d+)p", selected_video["quality"]) | |
| if m: | |
| q = m.group(1) | |
| send_to_channel(f"{q}pで音声との結合を開始しました。") | |
| try: | |
| merged_url = merge_video_on_server( | |
| selected_video["url"], | |
| selected_audio["url"], | |
| video_id, | |
| q, | |
| video_title, | |
| nonce | |
| ) | |
| send_to_channel(f"<link type=\"url\" value=\"{merged_url}\">{q}p 結合済み動画</link>") | |
| file_json = upload_file_to_channel(merged_url) | |
| send_video_message(file_json) | |
| except Exception as e: | |
| send_to_channel(f"結合失敗: {e}") | |
| send_to_channel("完了しました!") | |
| processed_messages.add(latest_msg["id"]) | |
| print(f"送信完了: {video_id}") | |
| except Exception as e: | |
| print("エラー:", e) | |
| import traceback | |
| traceback.print_exc() | |
| time.sleep(15) | |
| if __name__ == "__main__": | |
| main() |