# any-env-code / watcher.py
# izuemon's picture
# Update watcher.py
# a8c8026 verified
# raw
# history blame
# 7.27 kB
import json
import os
import re
import subprocess
import tempfile
import time
from datetime import datetime, timezone

import requests
from bs4 import BeautifulSoup
# ===== Channel.io settings =====
# Messages endpoint of one fixed channel/group on the Channel.io desk API.
# The same URL is used both for reading (GET) and for posting (POST).
GET_URL = "https://desk-api.channel.io/desk/channels/200605/groups/519217/messages"
POST_URL = GET_URL

# Query-string parameters sent with every GET request to GET_URL.
PARAMS = dict(
    sortOrder="desc",
    limit=36,
    logFolded="false",
)
# Value sent as the "x-account" request header, read from the environment
# variable `channeliotokenbot2`. The script refuses to start without it.
X_ACCOUNT = os.getenv("channeliotokenbot2")
if not X_ACCOUNT:
    # The message must name the variable that is actually read above
    # (it previously said "channeliotokenokenbot2").
    raise RuntimeError("環境変数 channeliotokenbot2 が設定されていません")
# Header entries shared by both request kinds.
_COMMON_HEADERS = {
    "accept": "application/json",
    "accept-language": "ja",
}

# Headers for GET requests against the messages endpoint.
HEADERS_GET = {
    **_COMMON_HEADERS,
    "x-account": X_ACCOUNT,
}

# Headers for POST requests; identical plus the JSON content type.
HEADERS_POST = {
    **_COMMON_HEADERS,
    "content-type": "application/json",
    "x-account": X_ACCOUNT,
}
# ===== ssyoutube =====
# Endpoint that fetch_download_links() POSTs the YouTube URL to.
SSYOUTUBE_URL = "https://ssyoutube.online/yt-video-detail/"
# ===== tfLink client =====
from tflink import TFLinkClient
# Module-wide client used by upload_to_tflink(); constructed without arguments.
tf_client = TFLinkClient() # anonymous upload
# ===== Utils =====
def parse_updated_at(value):
    """Convert an ``updatedAt`` value into a timezone-aware ``datetime``.

    Args:
        value: Either a number, interpreted as milliseconds since the Unix
            epoch, or an ISO-8601 string (a trailing ``Z`` is accepted).

    Returns:
        A timezone-aware ``datetime``, or ``None`` when *value* has an
        unsupported type or cannot be parsed.
    """
    try:
        if isinstance(value, (int, float)):
            return datetime.fromtimestamp(value / 1000, tz=timezone.utc)
        if isinstance(value, str):
            parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
            if parsed.tzinfo is None:
                # A naive result cannot be compared with the aware datetimes
                # produced by the numeric branch (TypeError).
                # NOTE(review): assumes offset-less strings are UTC - confirm.
                parsed = parsed.replace(tzinfo=timezone.utc)
            return parsed
    except (ValueError, OverflowError, OSError):
        # Malformed string or out-of-range timestamp: report "unparseable"
        # the same way unsupported types are reported, so that one bad
        # message does not abort the caller's whole loop.
        return None
    return None
# Patterns are tried in this order; each one captures the 11-character video
# ID in group 1. The first two are the original patterns, the third adds the
# path-style URLs (/shorts/, /embed/, /live/).
_YOUTUBE_ID_PATTERNS = tuple(
    re.compile(pattern)
    for pattern in (
        r"v=([A-Za-z0-9_-]{11})",
        r"youtu\.be/([A-Za-z0-9_-]{11})",
        r"youtube(?:-nocookie)?\.com/(?:shorts|embed|live)/([A-Za-z0-9_-]{11})",
    )
)


def extract_youtube_id(text):
    """Return the first YouTube video ID found in *text*.

    Args:
        text: Free-form text that may contain a YouTube URL.

    Returns:
        The 11-character ID captured by the first pattern that matches
        (patterns are tried in declaration order), or ``None`` when *text*
        is empty or no pattern matches.
    """
    if not text:
        return None
    for pattern in _YOUTUBE_ID_PATTERNS:
        match = pattern.search(text)
        if match:
            return match.group(1)
    return None
# ===== ssyoutube HTML parsing =====
def fetch_download_links(youtube_url):
    """Request the ssyoutube detail page for a video and list its download buttons.

    The URL is POSTed as the form field ``videoURL`` to ``SSYOUTUBE_URL``; the
    returned HTML is parsed with the ``lxml`` parser and every
    ``<button data-url=...>`` element is collected.

    Args:
        youtube_url: URL of the YouTube video.

    Returns:
        A list of dicts with the keys ``url``, ``quality`` and ``has_audio``,
        taken from the ``data-url``, ``data-quality`` and ``data-has-audio``
        attributes. Buttons whose ``data-url`` is empty are left out.

    Raises:
        requests.HTTPError: if the site answers with an error status.
    """
    request_headers = {
        "User-Agent": "Mozilla/5.0",
        "Referer": "https://ssyoutube.online/",
    }
    response = requests.post(
        SSYOUTUBE_URL,
        data={"videoURL": youtube_url},
        timeout=30,
        headers=request_headers,
    )
    response.raise_for_status()

    document = BeautifulSoup(response.text, "lxml")
    return [
        {
            "url": button.get("data-url"),
            "quality": button.get("data-quality"),
            "has_audio": button.get("data-has-audio"),
        }
        for button in document.select("button[data-url]")
        if button.get("data-url")
    ]
# ===== Video / audio stream selection =====
def _quality_rank(item):
    """Return all digits of ``item["quality"]`` joined into an int, or 0 if there are none."""
    digits = re.sub(r"\D", "", item["quality"] or "")
    # A label without digits (e.g. "audio") leaves an empty string, and
    # int("") raises ValueError, so rank such labels lowest.
    return int(digits) if digits else 0


def choose_best_streams(items):
    """Pick the highest-ranked video entry and the highest-ranked audio entry.

    An entry counts as audio when its URL ends with ``.m4a`` or its quality
    label contains ``audio`` (case-insensitive); every other entry counts as
    video. Within each group the entry whose quality label carries the
    largest number wins; on a tie the earliest entry in *items* is kept.

    Args:
        items: Dicts with at least the keys ``url`` and ``quality``
            (``quality`` may be ``None``).

    Returns:
        A ``(video, audio)`` tuple; either element is ``None`` when its
        group is empty.
    """
    video_only = []
    audio_only = []
    for item in items:
        quality = item["quality"] or ""
        if item["url"].endswith(".m4a") or "audio" in quality.lower():
            audio_only.append(item)
        else:
            video_only.append(item)

    # max() returns the first maximal element, which matches the previous
    # stable descending sort followed by [0].
    video = max(video_only, key=_quality_rank, default=None)
    audio = max(audio_only, key=_quality_rank, default=None)
    return video, audio
# ===== Merge =====
def merge_video_audio(video_url, audio_url, out_file):
    """Combine a video input and an audio input into *out_file* with ffmpeg.

    Runs ``ffmpeg -y -i <video> -i <audio> -c copy <out_file>``: streams are
    copied without re-encoding and an existing output file is overwritten.

    Args:
        video_url: Path or URL passed to ffmpeg as the first input.
        audio_url: Path or URL passed to ffmpeg as the second input.
        out_file: Destination file.

    Returns:
        ``True`` when ffmpeg exits with status 0. ``False`` when it exits
        with any other status or cannot be started at all; the reason is
        printed in both cases.
    """
    cmd = [
        "ffmpeg", "-y",
        "-i", video_url,
        "-i", audio_url,
        "-c", "copy",
        out_file,
    ]
    try:
        result = subprocess.run(cmd, capture_output=True)
    except OSError as exc:
        # ffmpeg is not installed or not executable: report failure through
        # the boolean result like any other merge error.
        print("FFmpeg merge error:", exc)
        return False
    if result.returncode != 0:
        # ffmpeg may echo arbitrary bytes (file names, metadata); never let
        # the error report itself fail on undecodable output.
        print("FFmpeg merge error:", result.stderr.decode(errors="replace"))
        return False
    return True
# ===== tfLink upload =====
def upload_to_tflink(file_path):
    """Upload *file_path* with the module-level ``tf_client`` and return the
    ``download_link`` attribute of the upload result."""
    return tf_client.upload(file_path).download_link
def build_links(items, upload_link):
    """Render the message text: one ``<link>`` line per item, then the upload link.

    Args:
        items: Dicts with the keys ``url`` and ``quality``.
        upload_link: Download URL of the merged file, placed on the last line.

    Returns:
        All lines joined with newlines.
    """
    rendered = [
        f'<link type="url" value="{entry["url"]}"> {entry["quality"]}</link>'
        for entry in items
    ]
    rendered.append(f"結合ファイルダウンロード: {upload_link}")
    return "\n".join(rendered)
def send_to_channel(text):
    """Post *text* to ``POST_URL`` as a message consisting of one text block.

    The request id is ``desk-web-`` followed by the current time in
    milliseconds; all optional payload fields are sent as JSON ``null``.

    Raises:
        requests.HTTPError: if the API answers with an error status.
    """
    request_id = f"desk-web-{int(time.time() * 1000)}"
    text_block = {"type": "text", "value": text}
    payload = {
        "requestId": request_id,
        "blocks": [text_block],
        # Remaining fields are present but empty, in this exact order.
        **dict.fromkeys(("buttons", "form", "webPage", "files", "customPayload")),
    }
    response = requests.post(
        POST_URL,
        headers=HEADERS_POST,
        data=json.dumps(payload),
        timeout=30,
    )
    response.raise_for_status()
# ===== Main =====
def _find_latest_message(messages):
    """Return the message with the newest parseable ``updatedAt``, or ``None``.

    Messages without ``plainText`` or without a usable ``updatedAt`` are ignored.
    """
    latest_msg = None
    latest_time = None
    for msg in messages:
        plain_text = msg.get("plainText")
        updated_at = msg.get("updatedAt")
        if not plain_text or updated_at is None:
            continue
        try:
            t = parse_updated_at(updated_at)
        except ValueError:
            # One malformed timestamp must not hide the other messages.
            continue
        if not t:
            continue
        if latest_time is None or t > latest_time:
            latest_time = t
            latest_msg = msg
    return latest_msg


def _download_to_file(url, dest_path):
    """Stream *url* into *dest_path*, raising ``requests.HTTPError`` on an error status."""
    with requests.get(url, stream=True, timeout=30) as response:
        # Without this check an HTML error page would be saved as "media".
        response.raise_for_status()
        with open(dest_path, "wb") as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)


def main():
    """Poll the Channel.io group forever and answer YouTube links.

    Each cycle fetches the recent messages, takes the most recently updated
    one that has text, extracts a YouTube ID from it, asks ssyoutube for
    download links, downloads the chosen video and audio streams, merges
    them with ffmpeg, uploads the result to tfLink and posts all links back
    to the group. Any exception is printed and the loop carries on.
    """
    while True:
        try:
            res = requests.get(
                GET_URL,
                headers=HEADERS_GET,
                params=PARAMS,
                timeout=30,
            )
            res.raise_for_status()
            # "or []" also covers a response whose "messages" value is null.
            messages = res.json().get("messages", []) or []

            latest_msg = _find_latest_message(messages)
            if not latest_msg:
                time.sleep(10)
                continue

            # NOTE(review): nothing records which message was already handled,
            # so the same latest message is processed again on every cycle
            # until a newer one arrives - confirm whether that is intended.
            youtube_id = extract_youtube_id(latest_msg["plainText"])
            if not youtube_id:
                time.sleep(10)
                continue

            youtube_url = f"https://www.youtube.com/watch?v={youtube_id}"
            items = fetch_download_links(youtube_url)
            if not items:
                time.sleep(10)
                continue

            video_stream, audio_stream = choose_best_streams(items)
            if not video_stream or not audio_stream:
                print("映像または音声ストリームが足りません")
                time.sleep(10)
                continue

            # Download and merge inside a private temporary directory so that
            # the intermediate files are always removed, even on errors.
            upload_link = None
            with tempfile.TemporaryDirectory() as work_dir:
                temp_video = os.path.join(work_dir, "video.mp4")
                temp_audio = os.path.join(work_dir, "audio.mp4")
                merged = os.path.join(work_dir, "merged_output.mp4")

                _download_to_file(video_stream["url"], temp_video)
                _download_to_file(audio_stream["url"], temp_audio)

                merged_ok = merge_video_audio(temp_video, temp_audio, merged)
                if merged_ok:
                    # Upload while the merged file still exists.
                    upload_link = upload_to_tflink(merged)

            if not merged_ok:
                print("結合失敗")
                time.sleep(10)
                continue

            message_text = build_links(items, upload_link)
            send_to_channel(message_text)
            print("送信完了")
        except Exception as e:
            # Top-level boundary of a long-running loop: report and keep polling.
            print("エラー:", e)
        # Pause before the next poll, after success and after an error alike.
        time.sleep(15)


if __name__ == "__main__":
    main()