Spaces:
Running
Running
| import os | |
| import re | |
| import time | |
| import json | |
| import subprocess | |
| import requests | |
| from datetime import datetime, timezone | |
| from bs4 import BeautifulSoup | |
| # ===== Channel.io 設定 ===== | |
| GET_URL = "https://desk-api.channel.io/desk/channels/200605/groups/519217/messages" | |
| POST_URL = GET_URL | |
| PARAMS = { | |
| "sortOrder": "desc", | |
| "limit": 36, | |
| "logFolded": "false", | |
| } | |
| X_ACCOUNT = os.getenv("channeliotokenbot2") | |
| if not X_ACCOUNT: | |
| raise RuntimeError("環境変数 channeliotokenokenbot2 が設定されていません") | |
| HEADERS_GET = { | |
| "accept": "application/json", | |
| "accept-language": "ja", | |
| "x-account": X_ACCOUNT, | |
| } | |
| HEADERS_POST = { | |
| "accept": "application/json", | |
| "accept-language": "ja", | |
| "content-type": "application/json", | |
| "x-account": X_ACCOUNT, | |
| } | |
| # ===== ssyoutube ===== | |
| SSYOUTUBE_URL = "https://ssyoutube.online/yt-video-detail/" | |
| # ===== tfLink クライアント ===== | |
| from tflink import TFLinkClient | |
| tf_client = TFLinkClient() # 匿名アップロード | |
| # ===== Utils ===== | |
| def parse_updated_at(value): | |
| if isinstance(value, (int, float)): | |
| return datetime.fromtimestamp(value / 1000, tz=timezone.utc) | |
| elif isinstance(value, str): | |
| return datetime.fromisoformat(value.replace("Z", "+00:00")) | |
| return None | |
| def extract_youtube_id(text): | |
| patterns = [ | |
| r"v=([A-Za-z0-9_-]{11})", | |
| r"youtu\.be/([A-Za-z0-9_-]{11})", | |
| ] | |
| for p in patterns: | |
| m = re.search(p, text) | |
| if m: | |
| return m.group(1) | |
| return None | |
| # ===== ssyoutube HTML 解析 ===== | |
| def fetch_download_links(youtube_url): | |
| res = requests.post( | |
| SSYOUTUBE_URL, | |
| data={"videoURL": youtube_url}, | |
| timeout=30, | |
| headers={ | |
| "User-Agent": "Mozilla/5.0", | |
| "Referer": "https://ssyoutube.online/", | |
| } | |
| ) | |
| res.raise_for_status() | |
| soup = BeautifulSoup(res.text, "lxml") | |
| buttons = soup.select("button[data-url]") | |
| results = [] | |
| for btn in buttons: | |
| url = btn.get("data-url") | |
| quality = btn.get("data-quality") | |
| has_audio = btn.get("data-has-audio") | |
| if not url: | |
| continue | |
| results.append({ | |
| "url": url, | |
| "quality": quality, | |
| "has_audio": has_audio, | |
| }) | |
| return results | |
| # ===== 動画と音声の選別 ===== | |
| def choose_best_streams(items): | |
| video_only = [] | |
| audio_only = [] | |
| for item in items: | |
| url = item["url"] | |
| quality = item["quality"] or "" | |
| has_audio = item["has_audio"] | |
| if url.endswith(".m4a") or "audio" in quality.lower(): | |
| audio_only.append(item) | |
| else: | |
| video_only.append(item) | |
| if not video_only: | |
| video = None | |
| else: | |
| video = sorted(video_only, | |
| key=lambda x: int(re.sub(r"[^\d]", "", x["quality"] or "0")), | |
| reverse=True)[0] | |
| if not audio_only: | |
| audio = None | |
| else: | |
| audio = sorted(audio_only, | |
| key=lambda x: int(re.sub(r"[^\d]", "", x["quality"] or "0")), | |
| reverse=True)[0] | |
| return video, audio | |
| # ===== 結合 ===== | |
| def merge_video_audio(video_url, audio_url, out_file): | |
| cmd = [ | |
| "ffmpeg", "-y", | |
| "-i", video_url, | |
| "-i", audio_url, | |
| "-c", "copy", | |
| out_file, | |
| ] | |
| result = subprocess.run(cmd, capture_output=True) | |
| if result.returncode != 0: | |
| print("FFmpeg merge error:", result.stderr.decode()) | |
| return False | |
| return True | |
| # ===== tfLink アップロード ===== | |
| def upload_to_tflink(file_path): | |
| res = tf_client.upload(file_path) | |
| return res.download_link | |
| def build_links(items, upload_link): | |
| lines = [] | |
| for item in items: | |
| url = item["url"] | |
| quality = item["quality"] | |
| line = f'<link type="url" value="{url}"> {quality}</link>' | |
| lines.append(line) | |
| lines.append(f"結合ファイルダウンロード: {upload_link}") | |
| return "\n".join(lines) | |
| def send_to_channel(text): | |
| payload = { | |
| "requestId": f"desk-web-{int(time.time() * 1000)}", | |
| "blocks": [ | |
| { | |
| "type": "text", | |
| "value": text | |
| } | |
| ], | |
| "buttons": None, | |
| "form": None, | |
| "webPage": None, | |
| "files": None, | |
| "customPayload": None | |
| } | |
| res = requests.post( | |
| POST_URL, | |
| headers=HEADERS_POST, | |
| data=json.dumps(payload), | |
| timeout=30 | |
| ) | |
| res.raise_for_status() | |
| # ===== Main ===== | |
| def main(): | |
| while True: | |
| try: | |
| res = requests.get( | |
| GET_URL, | |
| headers=HEADERS_GET, | |
| params=PARAMS, | |
| timeout=30, | |
| ) | |
| res.raise_for_status() | |
| messages = res.json().get("messages", []) | |
| latest_msg = None | |
| latest_time = None | |
| for msg in messages: | |
| plain_text = msg.get("plainText") | |
| updated_at = msg.get("updatedAt") | |
| if not plain_text or updated_at is None: | |
| continue | |
| t = parse_updated_at(updated_at) | |
| if not t: | |
| continue | |
| if latest_time is None or t > latest_time: | |
| latest_time = t | |
| latest_msg = msg | |
| if not latest_msg: | |
| time.sleep(10) | |
| continue | |
| text = latest_msg["plainText"] | |
| youtube_id = extract_youtube_id(text) | |
| if not youtube_id: | |
| time.sleep(10) | |
| continue | |
| youtube_url = f"https://www.youtube.com/watch?v={youtube_id}" | |
| items = fetch_download_links(youtube_url) | |
| if not items: | |
| time.sleep(10) | |
| continue | |
| video_stream, audio_stream = choose_best_streams(items) | |
| if not video_stream or not audio_stream: | |
| print("映像または音声ストリームが足りません") | |
| time.sleep(10) | |
| continue | |
| # ダウンロードして結合 | |
| temp_video = "video.mp4" | |
| temp_audio = "audio.mp4" | |
| merged = "merged_output.mp4" | |
| # ダウンロード | |
| with requests.get(video_stream["url"], stream=True) as r: | |
| with open(temp_video, "wb") as f: | |
| for chunk in r.iter_content(chunk_size=8192): | |
| f.write(chunk) | |
| with requests.get(audio_stream["url"], stream=True) as r: | |
| with open(temp_audio, "wb") as f: | |
| for chunk in r.iter_content(chunk_size=8192): | |
| f.write(chunk) | |
| if not merge_video_audio(temp_video, temp_audio, merged): | |
| print("結合失敗") | |
| time.sleep(10) | |
| continue | |
| # tfLink アップロード | |
| upload_link = upload_to_tflink(merged) | |
| message_text = build_links(items, upload_link) | |
| send_to_channel(message_text) | |
| print("送信完了") | |
| except Exception as e: | |
| print("エラー:", e) | |
| time.sleep(15) | |
| if __name__ == "__main__": | |
| main() | |