# any-env-code / watcher.py
# Last commit: "Update watcher.py" by izuemon (bc9968c, verified) - 9.79 kB
import os
import re
import time
import json
import shutil
import tempfile
import subprocess
import requests
from datetime import datetime, timezone
from bs4 import BeautifulSoup
# ===== Channel.io settings =====
# Messages are read from and posted to the same group endpoint.
GET_URL = "https://desk-api.channel.io/desk/channels/200605/groups/519217/messages"
POST_URL = GET_URL

# Query parameters sent with every poll: newest first, 36 messages per request.
PARAMS = dict(sortOrder="desc", limit=36, logFolded="false")

# The value of the "x-account" header is read from the environment; refuse to
# start without it.
X_ACCOUNT = os.environ.get("channeliotokenbot2")
if X_ACCOUNT is None or X_ACCOUNT == "":
    raise RuntimeError("環境変数 channeliotokenbot2 が設定されていません")

# Header fields shared by the GET and POST requests.
_COMMON_HEADERS = {
    "accept": "application/json",
    "accept-language": "ja",
}
HEADERS_GET = {**_COMMON_HEADERS, "x-account": X_ACCOUNT}
HEADERS_POST = {
    **_COMMON_HEADERS,
    "content-type": "application/json",
    "x-account": X_ACCOUNT,
}

# ===== ssyoutube =====
SSYOUTUBE_URL = "https://ssyoutube.online/yt-video-detail/"
# ===== Utils =====
def parse_updated_at(value):
    """Convert a message's ``updatedAt`` value into a timezone-aware datetime.

    Args:
        value: Either a number, interpreted as an epoch timestamp in
            milliseconds, or an ISO-8601 string (a ``Z`` suffix is accepted
            and treated as ``+00:00``).

    Returns:
        A timezone-aware ``datetime``, or ``None`` when the value has an
        unsupported type or cannot be parsed.
    """
    # bool is a subclass of int, but True/False is never a real timestamp.
    if isinstance(value, bool):
        return None
    try:
        if isinstance(value, (int, float)):
            return datetime.fromtimestamp(value / 1000, tz=timezone.utc)
        if isinstance(value, str):
            parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
            if parsed.tzinfo is None:
                # Assume UTC so the result stays comparable with the aware
                # datetimes produced by the other branches; mixing naive and
                # aware values would raise TypeError on comparison.
                parsed = parsed.replace(tzinfo=timezone.utc)
            return parsed
    except (ValueError, OverflowError, OSError):
        # Malformed string or out-of-range timestamp: report "unparsable"
        # instead of raising, so callers can simply skip the value.
        return None
    return None
# Compiled once at import time and tried in order; the first match wins.
_YOUTUBE_ID_PATTERNS = tuple(
    re.compile(pattern)
    for pattern in (
        r"v=([A-Za-z0-9_-]{11})",
        r"youtu\.be/([A-Za-z0-9_-]{11})",
        r"youtube\.com/(?:shorts|embed|live)/([A-Za-z0-9_-]{11})",
    )
)


def extract_youtube_id(text):
    """Find the first YouTube video ID mentioned in ``text``.

    Recognised forms are a ``v=<id>`` query parameter, ``youtu.be/<id>`` short
    links and ``youtube.com/shorts|embed|live/<id>`` paths, where ``<id>`` is
    11 characters from ``A-Z a-z 0-9 _ -``.

    Args:
        text: Free-form message text.

    Returns:
        The 11-character ID, or ``None`` when no ID is found or ``text`` is
        not a string.
    """
    if not isinstance(text, str):
        return None
    for pattern in _YOUTUBE_ID_PATTERNS:
        match = pattern.search(text)
        if match:
            return match.group(1)
    return None
# ===== ssyoutube HTML 解析 =====
def fetch_download_links(youtube_url):
    """Ask ssyoutube for the download options of a video and scrape the reply.

    The video URL is posted to ``SSYOUTUBE_URL`` as the ``videoURL`` form
    field and the returned HTML is inspected.

    Args:
        youtube_url: URL of the video to look up.

    Returns:
        A ``(results, nonce)`` tuple. ``results`` is a list of dicts with the
        keys ``url``, ``quality`` and ``has_audio``, one per
        ``button[data-url]`` element that has a non-empty ``data-url``;
        ``quality`` falls back to ``"audio"`` when the button carries no
        ``data-quality``. ``nonce`` is the value captured from the page's
        inline ``formData.append('nonce', '...')`` call, or ``None`` when the
        page contains no such call.

    Raises:
        requests.HTTPError: If the site answers with an error status.
    """
    request_headers = {
        "User-Agent": "Mozilla/5.0",
        "Referer": "https://ssyoutube.online/",
    }
    response = requests.post(
        SSYOUTUBE_URL,
        data={"videoURL": youtube_url},
        timeout=30,
        headers=request_headers,
    )
    response.raise_for_status()
    html = response.text
    document = BeautifulSoup(html, "lxml")

    # The nonce lives inside inline JavaScript, so it is pulled out of the raw
    # HTML with a regex rather than through the parsed document.
    nonce_hit = re.search(r"formData\.append\('nonce', '([^']+)'\);", html)
    if nonce_hit:
        nonce = nonce_hit.group(1)
    else:
        nonce = None

    results = [
        {
            "url": button.get("data-url"),
            "quality": button.get("data-quality") or "audio",
            "has_audio": button.get("data-has-audio"),
        }
        for button in document.select("button[data-url]")
        if button.get("data-url")
    ]
    return results, nonce
# ===== 最高画質映像 + 音声を選択 =====
def select_best_video_and_audio(items):
    """Pick the highest-resolution video-only entry and an audio entry.

    Args:
        items: Iterable of dicts as produced by ``fetch_download_links``
            (keys ``quality`` and ``has_audio``). An entry whose ``quality``
            is ``"audio"`` is the audio track; an entry whose ``has_audio``
            is the string ``"false"`` and whose ``quality`` starts with
            ``<digits>p`` is a video-only candidate. Entries with missing
            keys are ignored.

    Returns:
        ``(video_item, audio_item)``. The video is the candidate with the
        largest numeric resolution (the first one wins on ties); the audio is
        the last audio entry seen. ``(None, None)`` is returned when either
        of the two is missing.
    """
    best_height = -1
    best_video = None
    audio_item = None
    for item in items:
        quality = item.get("quality")
        if quality == "audio":
            audio_item = item
            continue
        # Only streams explicitly flagged as lacking audio are candidates.
        if item.get("has_audio") != "false" or not isinstance(quality, str):
            continue
        height_match = re.match(r"(\d+)p", quality)
        if not height_match:
            continue
        height = int(height_match.group(1))
        # Strict '>' keeps the earliest entry among equal resolutions.
        if height > best_height:
            best_height = height
            best_video = item

    if best_video is None or audio_item is None:
        return None, None
    return best_video, audio_item
# ===== ssyoutube サーバー側結合 =====
def merge_video_on_server(video_url, audio_url, video_id, quality, video_title,
                          nonce, *, poll_interval=30, max_attempts=60):
    """Have the ssyoutube server merge a video stream with an audio stream.

    A ``process_video_merge`` action is posted to the site's
    ``admin-ajax.php`` endpoint; the monitor URL from the reply is then polled
    until the job reports ``done`` or ``error``.

    Args:
        video_url: URL of the video stream (sent as the ``mp4`` input).
        audio_url: URL of the audio stream (sent as the ``m4a`` input).
        video_id: Video ID; combined with ``quality`` to form the request ID.
        quality: Resolution digits without the trailing ``p``.
        video_title: Used to build the output's download file name.
        nonce: Nonce value sent along with the merge request.
        poll_interval: Seconds to wait before each status check.
        max_attempts: Number of status checks before giving up. With the
            defaults this is 60 checks 30 seconds apart, about 30 minutes.

    Returns:
        The output URL reported by the finished job.

    Raises:
        RuntimeError: If the merge cannot be started, a status check reports
            failure, the job ends in ``error``, or it does not finish within
            ``max_attempts`` checks.
        requests.HTTPError: If any request returns an error status.
    """
    # The request ID is derived from the video ID and the resolution.
    request_id = f"{video_id}_{quality}p"

    request_data = {
        "id": request_id,
        "ttl": 3600000,
        "inputs": [
            {
                "url": video_url,
                "ext": "mp4",
                "chunkDownload": {
                    "type": "header",
                    "size": 52428800,
                    "concurrency": 3
                }
            },
            {
                "url": audio_url,
                "ext": "m4a"
            }
        ],
        "output": {
            "ext": "mp4",
            "downloadName": f"{video_title}_{quality}p.mp4",
            "chunkUpload": {
                "size": 104857600,
                "concurrency": 3
            }
        },
        "operation": {
            "type": "replace_audio_in_video"
        }
    }

    # Request that starts the merge; the job description travels as a JSON
    # string inside a form field.
    payload = {
        "action": "process_video_merge",
        "nonce": nonce,
        "request_data": json.dumps(request_data)
    }
    headers = {
        "User-Agent": "Mozilla/5.0",
        "Referer": "https://ssyoutube.online/",
        "Origin": "https://ssyoutube.online",
        "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
    }
    res = requests.post(
        "https://ssyoutube.online/wp-admin/admin-ajax.php",
        data=payload,
        headers=headers,
        timeout=30
    )
    res.raise_for_status()
    result = res.json()

    # The endpoint may answer with a bare JSON scalar (e.g. 0 or -1) or with
    # "data": null; treat every non-conforming reply as a failed start instead
    # of crashing with AttributeError.
    data = result.get("data") if isinstance(result, dict) else None
    if (
        not isinstance(data, dict)
        or not result.get("success")
        or not data.get("success")
    ):
        raise RuntimeError(f"結合開始に失敗しました: {result}")

    # URL used to monitor the job status.
    try:
        monitor_url = data["result"]["monitor"]["http"]
    except (KeyError, TypeError) as err:
        raise RuntimeError(f"結合開始に失敗しました: {result}") from err

    # Poll until the job finishes.
    for _ in range(max_attempts):
        time.sleep(poll_interval)
        status_res = requests.get(monitor_url, timeout=30)
        status_res.raise_for_status()
        status_data = status_res.json()
        if not isinstance(status_data, dict) or not status_data.get("success"):
            raise RuntimeError(f"ステータス取得に失敗しました: {status_data}")

        job = status_data["result"]
        status = job["status"]
        if status == "done":
            # Finished: hand back the download URL.
            return job["output"]["url"]
        if status == "error":
            error_msg = job.get("error")
            raise RuntimeError(f"結合処理でエラーが発生しました: {error_msg}")

        # Any other status (e.g. "processing") means keep waiting.
        print(f"結合処理中... 進捗: {status_data['result'].get('progress_in_percent', 0)}%")

    raise RuntimeError("結合処理がタイムアウトしました")
def send_to_channel(text):
    """Post ``text`` as a single text block to ``POST_URL``.

    The request ID is ``desk-web-`` followed by the current time in
    milliseconds.

    Args:
        text: Message body to send.

    Raises:
        requests.HTTPError: If the API answers with an error status.
    """
    now_ms = int(time.time() * 1000)
    body = json.dumps(
        {
            "requestId": f"desk-web-{now_ms}",
            "blocks": [{"type": "text", "value": text}],
        }
    )
    response = requests.post(
        POST_URL,
        headers=HEADERS_POST,
        data=body,
        timeout=30,
    )
    response.raise_for_status()
# ===== Main =====
def _find_latest_unprocessed(messages, processed_messages):
    """Return the most recently updated message that still needs handling.

    Messages without text, without a parsable ``updatedAt``, or whose ID is
    already in ``processed_messages`` are ignored. Returns ``None`` when
    nothing qualifies.
    """
    latest_msg = None
    latest_time = None
    for msg in messages:
        if not msg.get("plainText") or msg.get("updatedAt") is None:
            continue
        # Skip messages that were already handled.
        if msg.get("id") in processed_messages:
            continue
        updated = parse_updated_at(msg.get("updatedAt"))
        if not updated:
            continue
        if latest_time is None or updated > latest_time:
            latest_time = updated
            latest_msg = msg
    return latest_msg


def _merge_and_reply(msg):
    """Merge the video referenced by ``msg`` and post the result URL.

    Returns ``True`` when the merged URL was sent, ``False`` when the message
    was skipped (no YouTube ID, no nonce, no usable streams, or no resolution
    in the selected video's quality label).
    """
    youtube_id = extract_youtube_id(msg["plainText"])
    if not youtube_id:
        return False

    youtube_url = f"https://www.youtube.com/watch?v={youtube_id}"
    items, nonce = fetch_download_links(youtube_url)
    if not nonce:
        print("nonceの取得に失敗しました")
        return False

    video_item, audio_item = select_best_video_and_audio(items)
    if not video_item or not audio_item:
        print("適切な動画または音声が見つかりません")
        return False

    # Resolution digits of the selected video.
    quality_match = re.match(r"(\d+)p", video_item["quality"])
    if not quality_match:
        print("画質情報の取得に失敗しました")
        return False
    quality = quality_match.group(1)

    # Simplified title; fetching the real one from YouTube would be better.
    video_title = f"youtube_video_{youtube_id}"

    # Merge on the server side.
    print(f"サーバー側での結合を開始します: {youtube_id} {quality}p")
    merged_url = merge_video_on_server(
        video_item["url"],
        audio_item["url"],
        youtube_id,
        quality,
        video_title,
        nonce
    )

    # Send the result.
    send_to_channel(f"🎬 結合済み動画({quality}p)\n{merged_url}")
    print("送信完了:", merged_url)
    return True


def main():
    """Poll the channel forever and answer YouTube links with a merged video.

    Each iteration fetches the recent messages, takes the newest one that has
    not been handled yet and, if it contains a YouTube ID, merges the best
    video and audio streams and posts the resulting URL. The loop waits 10
    seconds when there is nothing to do and 15 seconds after an error; errors
    are printed with a traceback and never stop the loop.
    """
    # traceback is not imported at module level; it is only used for error
    # reports in this function.
    import traceback

    processed_messages = set()  # IDs of messages that were answered
    while True:
        try:
            res = requests.get(
                GET_URL,
                headers=HEADERS_GET,
                params=PARAMS,
                timeout=30,
            )
            res.raise_for_status()
            # "or []" also covers an explicit "messages": null in the reply.
            messages = res.json().get("messages") or []

            latest_msg = _find_latest_unprocessed(messages, processed_messages)
            if not latest_msg or not _merge_and_reply(latest_msg):
                time.sleep(10)
                continue

            # Mark as handled only after the reply was sent successfully.
            processed_messages.add(latest_msg.get("id"))
        except Exception as e:
            print("エラー:", e)
            traceback.print_exc()
            time.sleep(15)


if __name__ == "__main__":
    main()