| import os |
| import re |
| import time |
| import json |
| import uuid |
| import hashlib |
| import pathlib |
| import tempfile |
| import traceback |
| import subprocess |
| from concurrent.futures import ThreadPoolExecutor |
|
|
| import requests |
| import telebot |
| from telebot import apihelper |
| from telebot.types import InlineKeyboardMarkup, InlineKeyboardButton |
| import yt_dlp |
|
|
| from xhs_extract import extract_xhs |
| from douyin_extract import extract_douyin |
| from big_send import send_big_video, send_photos_mtproto |
|
|
| |
# Send Bot API calls and file downloads to a custom workers.dev endpoint
# instead of telebot's default host (presumably a reverse proxy in front of
# the Telegram API — confirm against the deployment).
apihelper.API_URL = "https://nine7.linlizhi0210.workers.dev/bot{0}/{1}"
apihelper.FILE_URL = "https://nine7.linlizhi0210.workers.dev/file/bot{0}/{1}"

# telebot request tuning: retry on error and connect/read timeouts
# (values are presumably seconds — confirm in the telebot docs).
apihelper.RETRY_ON_ERROR = True
apihelper.RETRY_TIMEOUT = 3
apihelper.CONNECT_TIMEOUT = 15
apihelper.READ_TIMEOUT = 30

# Required: a missing BOT_TOKEN environment variable raises KeyError on import.
BOT_TOKEN = os.environ["BOT_TOKEN"]
# Size thresholds in bytes. send_video_auto uploads files up to BOT_API_LIMIT
# with bot.send_video, larger ones with send_big_video, and rejects anything
# above MTPROTO_LIMIT.
BOT_API_LIMIT = 50 * 1024 * 1024
MTPROTO_LIMIT = 2000 * 1024 * 1024

# Shared bot instance; messages are parsed as HTML by default.
bot = telebot.TeleBot(BOT_TOKEN, parse_mode="HTML")

# Thread pool that runs the _process_link jobs submitted by the handlers.
executor = ThreadPoolExecutor(max_workers=3)

# In-memory cache keyed by _cache_key(url); each value is a dict with
# "ts", "file_id" and "caption". Entries older than CACHE_TTL seconds are
# treated as expired by _get_cache / _set_cache.
_cache = {}
CACHE_TTL = 3600
|
|
| def _cache_key(url: str) -> str: |
| return hashlib.md5(url.encode()).hexdigest() |
|
|
def _get_cache(url: str):
    """Return the cached entry for *url*, or None if absent or expired.

    An entry older than CACHE_TTL seconds is removed from the cache as a
    side effect of the lookup.
    """
    key = _cache_key(url)
    entry = _cache.get(key)
    if entry is None:
        return None
    age = time.time() - entry["ts"]
    if age < CACHE_TTL:
        return entry
    # Expired: evict so the stale file_id is not reused.
    _cache.pop(key)
    return None
|
|
def _set_cache(url: str, file_id: str, caption: str):
    """Store *file_id* and *caption* for *url*, then evict expired entries."""
    _cache[_cache_key(url)] = {
        "ts": time.time(),
        "file_id": file_id,
        "caption": caption,
    }
    # Collect the stale keys first; the dict must not change size while
    # it is being iterated.
    reference = time.time()
    stale_keys = []
    for key, entry in _cache.items():
        if reference - entry["ts"] > CACHE_TTL:
            stale_keys.append(key)
    for key in stale_keys:
        _cache.pop(key)
|
|
| |
# Directory that receives the cookie files written by _dump_cookie.
# It is created at import time.
COOKIE_DIR = pathlib.Path("/tmp/cookies")
COOKIE_DIR.mkdir(parents=True, exist_ok=True)

# First line of a Netscape-format cookie file; _to_netscape adds it when the
# input does not already start with it.
NETSCAPE_HEADER = "# Netscape HTTP Cookie File\n"

# Cookie domain per platform key, used by _dump_cookie when converting a raw
# "name=value; ..." cookie string into Netscape rows. Keys missing here fall
# back to ".<name>.com".
DEFAULT_DOMAIN = {
    "bilibili": ".bilibili.com",
    "douyin": ".douyin.com",
    "youtube": ".youtube.com",
    "xhs": ".xiaohongshu.com",
    "tiktok": ".tiktok.com",
    "weibo": ".weibo.com",
    "kuaishou": ".kuaishou.com",
    "ph": ".pornhub.com",
}
|
|
| def _extract_cookie_line(raw: str) -> str: |
| for line in raw.splitlines(): |
| s = line.strip() |
| if s.lower().startswith("cookie:"): |
| return s.split(":", 1)[1].strip() |
| return raw.strip() |
|
|
def _to_netscape(raw: str, domain: str) -> str:
    """Convert cookie text into Netscape cookie-file format.

    Input that already looks like a Netscape file (starts with the header or
    contains tab-separated TRUE/FALSE columns) is returned as is, with the
    header added if missing; *domain* is not used in that case. Otherwise
    *raw* is treated as a ``name=value; ...`` string (optionally inside a
    ``Cookie:`` header line) and every pair becomes one row for *domain*
    expiring 30 days from now.
    """
    text = raw.strip()
    has_header = text.startswith("# Netscape")
    if has_header or "\tTRUE\t" in text or "\tFALSE\t" in text:
        if not has_header:
            text = NETSCAPE_HEADER + text
        return text + "\n"

    expiry = int(time.time()) + 3600 * 24 * 30
    rows = [NETSCAPE_HEADER.rstrip()]
    for fragment in _extract_cookie_line(text).split(";"):
        name, separator, value = fragment.strip().partition("=")
        name = name.strip()
        # Skip fragments with no "=" or with an empty cookie name.
        if not separator or not name:
            continue
        rows.append(f"{domain}\tTRUE\t/\tFALSE\t{expiry}\t{name}\t{value.strip()}")
    return "\n".join(rows) + "\n"
|
|
def _dump_cookie(name: str, env: str):
    """Write the cookies stored in environment variable *env* to a file.

    The file is ``COOKIE_DIR/<name>.txt`` in Netscape format. For "douyin"
    the cookies are written for three related domains; every other platform
    uses its DEFAULT_DOMAIN entry (or ``.<name>.com``).

    Returns the file path as a string, or None when the variable is unset
    or empty.
    """
    data = os.environ.get(env)
    if not data:
        return None
    if name == "douyin":
        domains = [".douyin.com", ".iesdouyin.com", ".snssdk.com"]
    else:
        domains = [DEFAULT_DOMAIN.get(name, f".{name}.com")]

    chunks = []
    seen_bodies = set()
    for domain in domains:
        text = _to_netscape(data, domain)
        body = text.replace(NETSCAPE_HEADER, "", 1)
        # _to_netscape ignores the domain when the input is already in
        # Netscape format, so each domain would produce the same rows again.
        # Write each distinct body once.
        if body in seen_bodies:
            continue
        seen_bodies.add(body)
        # Only the first chunk keeps the file header.
        chunks.append(body if chunks else text)

    target = COOKIE_DIR / f"{name}.txt"
    target.write_text("".join(chunks))
    return str(target)
|
|
def _build_cookie_table() -> dict:
    """Build the host -> cookie-file mapping, dumping each platform once.

    Several hosts share one platform's cookie file, so the file is generated
    once per platform and the resulting path is reused for every alias.
    """
    sources = (
        ("bilibili", "BILI_COOKIES", ("bilibili.com", "b23.tv")),
        ("douyin", "DOUYIN_COOKIES", ("douyin.com", "iesdouyin.com")),
        ("youtube", "YT_COOKIES", ("youtube.com", "youtu.be")),
        ("xhs", "XHS_COOKIES", ("xiaohongshu.com", "xhslink.com")),
        ("tiktok", "TIKTOK_COOKIES", ("tiktok.com",)),
        ("weibo", "WEIBO_COOKIES", ("weibo.com",)),
        ("kuaishou", "KS_COOKIES", ("kuaishou.com",)),
        ("ph", "PH_COOKIES", ("pornhub.com",)),
    )
    table = {}
    for name, env, hosts in sources:
        path = _dump_cookie(name, env)
        for host in hosts:
            table[host] = path
    return table


# Host fragment -> cookie file path (None when the env variable is not set).
# Insertion order matters: pick_cookie returns the first matching host.
COOKIES = _build_cookie_table()
|
|
def pick_cookie(url: str):
    """Return the cookie file for the first COOKIES host contained in *url*.

    Hosts whose cookie file is missing (None) are skipped. Returns None when
    nothing matches.
    """
    matches = (path for host, path in COOKIES.items() if host in url and path)
    return next(matches, None)
|
|
| def _read_cookie_header(env: str) -> str: |
| raw = os.environ.get(env, "") |
| if not raw: |
| return "" |
| for line in raw.splitlines(): |
| s = line.strip() |
| if s.lower().startswith("cookie:"): |
| return s.split(":", 1)[1].strip() |
| if "=" in raw and ";" in raw: |
| return raw.strip() |
| return "" |
|
|
| |
# Display label (emoji + name) per platform key returned by _detect_platform.
# Used in captions, and _process_link takes the text after the last space as
# the platform name for its status message.
PLATFORM_EMOJI = {
    "douyin": "🎵 抖音",
    "bilibili": "📺 B站",
    "youtube": "▶️ YouTube",
    "tiktok": "🎵 TikTok",
    "xhs": "📕 小红书",
    "weibo": "🔥 微博",
    "kuaishou": "⚡ 快手",
    "twitter": "🐦 X",
    "instagram": "📷 Instagram",
}
|
|
| def _detect_platform(url: str) -> str: |
| if "douyin.com" in url or "iesdouyin.com" in url: |
| return "douyin" |
| if "bilibili.com" in url or "b23.tv" in url: |
| return "bilibili" |
| if "youtube.com" in url or "youtu.be" in url: |
| return "youtube" |
| if "tiktok.com" in url: |
| return "tiktok" |
| if "xiaohongshu.com" in url or "xhslink.com" in url: |
| return "xhs" |
| if "weibo.com" in url: |
| return "weibo" |
| if "kuaishou.com" in url: |
| return "kuaishou" |
| if "twitter.com" in url or "x.com" in url: |
| return "twitter" |
| if "instagram.com" in url: |
| return "instagram" |
| return "" |
|
|
| |
| def _format_duration(sec): |
| sec = int(sec) |
| if sec <= 0: |
| return "" |
| if sec < 60: |
| return f"0:{sec:02d}" |
| if sec < 3600: |
| return f"{sec // 60}:{sec % 60:02d}" |
| return f"{sec // 3600}:{(sec % 3600) // 60:02d}:{sec % 60:02d}" |
|
|
| def _format_resolution(w, h): |
| if not w or not h: |
| return "" |
| short = min(w, h) |
| if short >= 2160: return "4K" |
| if short >= 1440: return "2K" |
| if short >= 1080: return "1080p" |
| if short >= 720: return "720p" |
| if short >= 480: return "480p" |
| return f"{w}×{h}" |
|
|
| def _format_filesize(size_bytes): |
| if size_bytes < 1024 * 1024: |
| return f"{size_bytes / 1024:.0f}KB" |
| if size_bytes < 1024 * 1024 * 1024: |
| return f"{size_bytes / 1024 / 1024:.1f}MB" |
| return f"{size_bytes / 1024 / 1024 / 1024:.2f}GB" |
|
|
| |
| def _clean_tags_generic(tags: list) -> list: |
| cleaned = [] |
| for t in tags: |
| t = t.strip() |
| t = re.sub(r"\[.*?\]", "", t) |
| t = t.rstrip("#").strip() |
| if t and not t.startswith("#"): |
| t = f"#{t}" |
| if t and t != "#": |
| cleaned.append(t) |
| return cleaned |
|
|
| |
# Matches an http(s) URL up to the next whitespace character.
URL_RE = re.compile(r"https?://[^\s]+", re.IGNORECASE)

# Domains the bot reacts to; extract_supported_url returns the first URL in a
# message that matches one of them.
SUPPORTED_HINTS = (
    "bilibili.com", "b23.tv",
    "douyin.com", "iesdouyin.com",
    "tiktok.com",
    "pornhub.com",
    "youtube.com", "youtu.be",
    "xiaohongshu.com", "xhslink.com",
    "kuaishou.com",
    "weibo.com",
    "twitter.com", "x.com",
    "instagram.com",
    "facebook.com", "fb.watch",
)

# Short-link hosts; handle_link resolves these with _expand_short_url before
# processing.
SHORT_LINK_HOSTS = (
    "xhslink.com", "xhs.link",
    "b23.tv",
    "v.douyin.com",
    "vt.tiktok.com", "vm.tiktok.com",
    "v.kuaishou.com",
)
|
|
def extract_supported_url(text: str):
    """Return the first URL in *text* whose host is a supported site, else None.

    Trailing punctuation is stripped from each candidate. A URL is supported
    when its hostname equals, or is a subdomain of, an entry in
    SUPPORTED_HINTS. Matching on the hostname rather than on any substring of
    the URL prevents false positives such as "netflix.com" (contains "x.com")
    or a supported domain that only appears in a query string.
    """
    from urllib.parse import urlsplit

    if not text:
        return None
    for match in URL_RE.finditer(text):
        url = match.group(0).rstrip("))),,。.!!??")
        try:
            host = (urlsplit(url).hostname or "").lower()
        except ValueError:
            # Malformed authority part; this cannot be a usable link.
            continue
        if any(host == hint or host.endswith("." + hint) for hint in SUPPORTED_HINTS):
            return url
    return None
|
|
def _expand_short_url(url: str) -> str:
    """Follow the redirects of *url* with a HEAD request and return the final URL.

    A mobile Safari User-Agent is sent. On any error the failure is printed
    and the original *url* is returned unchanged.
    """
    mobile_agent = (
        "Mozilla/5.0 (iPhone; CPU iPhone OS 16_6 like Mac OS X) "
        "AppleWebKit/605.1.15 (KHTML, like Gecko) "
        "Version/16.6 Mobile/15E148 Safari/604.1"
    )
    try:
        response = requests.head(
            url,
            allow_redirects=True,
            timeout=10,
            headers={"User-Agent": mobile_agent},
        )
        resolved = response.url
    except Exception as e:
        print("expand short url failed:", e)
        return url
    return resolved or url
|
|
| def _normalize_url(url: str) -> str: |
| m = re.search(r"iesdouyin\.com/share/video/(\d+)", url) |
| if m: |
| return f"https://www.douyin.com/video/{m.group(1)}" |
| m = re.search(r"douyin\.com/share/video/(\d+)", url) |
| if m: |
| return f"https://www.douyin.com/video/{m.group(1)}" |
| m = re.search(r"iesdouyin\.com/share/user/[^?]+\?.*?aweme_id=(\d+)", url) |
| if m: |
| return f"https://www.douyin.com/video/{m.group(1)}" |
| return url |
|
|
| |
# Headers for every URL that is neither Xiaohongshu nor Douyin (see
# pick_headers): desktop Chrome User-Agent with a bilibili Referer.
# NOTE(review): the bilibili Referer is also sent to unrelated sites such as
# YouTube — confirm this is intended.
DEFAULT_HEADERS = {
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/124.0.0.0 Safari/537.36"
    ),
    "Referer": "https://www.bilibili.com/",
}

# Headers for Xiaohongshu URLs: desktop Safari User-Agent.
XHS_HEADERS_UA = {
    "User-Agent": (
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
        "AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.0 Safari/605.1.15"
    ),
    "Referer": "https://www.xiaohongshu.com/",
}

# Headers for Douyin URLs: mobile Safari User-Agent. _download_direct reuses
# this User-Agent for every direct download.
DOUYIN_HEADERS = {
    "User-Agent": (
        "Mozilla/5.0 (iPhone; CPU iPhone OS 16_6 like Mac OS X) "
        "AppleWebKit/605.1.15 (KHTML, like Gecko) "
        "Version/16.6 Mobile/15E148 Safari/604.1"
    ),
    "Referer": "https://www.douyin.com/",
}
|
|
def pick_headers(url: str):
    """Return the HTTP header set to use when downloading *url*.

    Xiaohongshu and Douyin links get their own headers; everything else gets
    DEFAULT_HEADERS. Xiaohongshu is checked first.
    """
    rules = (
        (("xiaohongshu.com", "xhslink.com"), XHS_HEADERS_UA),
        (("douyin.com", "iesdouyin.com"), DOUYIN_HEADERS),
    )
    for needles, headers in rules:
        if any(needle in url for needle in needles):
            return headers
    return DEFAULT_HEADERS
|
|
| |
def _download_direct(url: str, workdir: str, referer: str = "") -> str:
    """Stream *url* into a randomly named ``.mp4`` file inside *workdir*.

    The request uses the Douyin mobile User-Agent and, when given, *referer*.
    Returns the path of the written file. HTTP error statuses raise via
    ``raise_for_status`` before any file is created.
    """
    target = os.path.join(workdir, uuid.uuid4().hex + ".mp4")
    request_headers = {"User-Agent": DOUYIN_HEADERS["User-Agent"]}
    if referer:
        request_headers["Referer"] = referer

    with requests.get(url, headers=request_headers, stream=True, timeout=300) as response:
        response.raise_for_status()
        with open(target, "wb") as sink:
            # filter(None, ...) skips empty chunks.
            sink.writelines(filter(None, response.iter_content(1024 * 256)))
    return target
|
|
| |
| def _ensure_h264_aac(path: str) -> str: |
| fixed = path + ".fast.mp4" |
| try: |
| subprocess.run( |
| ["ffmpeg", "-y", "-i", path, "-c", "copy", |
| "-movflags", "+faststart", fixed], |
| check=True, timeout=600, |
| stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, |
| ) |
| return fixed |
| except Exception as e: |
| print("remux failed, use original:", e) |
| return path |
|
|
| |
| def _probe_video_meta(path: str) -> dict: |
| try: |
| out = subprocess.check_output( |
| ["ffprobe", "-v", "error", |
| "-select_streams", "v:0", |
| "-show_entries", "stream=width,height", |
| "-show_entries", "format=duration", |
| "-of", "json", path], |
| timeout=30, |
| ) |
| meta = json.loads(out) |
| vs = (meta.get("streams") or [{}])[0] |
| fmt = meta.get("format") or {} |
| return { |
| "width": int(vs.get("width") or 0), |
| "height": int(vs.get("height") or 0), |
| "duration": float(fmt.get("duration") or 0), |
| } |
| except Exception as e: |
| print("probe meta failed:", e) |
| return {"width": 0, "height": 0, "duration": 0} |
|
|
| |
| def _extract_thumbnail(path: str, workdir: str) -> str: |
| thumb = os.path.join(workdir, "thumb.jpg") |
| try: |
| subprocess.run( |
| ["ffmpeg", "-y", "-i", path, "-ss", "00:00:01", |
| "-vframes", "1", "-vf", "scale=320:-1", "-q:v", "5", thumb], |
| check=True, timeout=30, |
| stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, |
| ) |
| if os.path.exists(thumb) and os.path.getsize(thumb) > 0: |
| return thumb |
| except Exception: |
| pass |
| return "" |
|
|
| |
def download_video(url: str, workdir: str):
    """Download *url* with yt-dlp into *workdir* and return ``(path, info)``.

    The best video+audio combination is requested and merged to MP4, with
    aria2c as the external downloader. Site-specific headers and, when
    available, a cookie file are applied. The downloaded file is passed
    through ``_ensure_h264_aac`` before being returned.

    Raises:
        RuntimeError: if yt-dlp returns no metadata.
        Exception: whatever yt-dlp raises on extraction or download failure.
    """
    ydl_opts = {
        "outtmpl": os.path.join(workdir, "%(id).80s.%(ext)s"),
        "format": "bv*+ba/b",
        "format_sort": ["res", "fps", "br", "size"],
        "merge_output_format": "mp4",
        "quiet": True,
        "no_warnings": True,
        "noplaylist": True,
        "retries": 3,
        "concurrent_fragment_downloads": 8,
        "http_chunk_size": 10 * 1024 * 1024,
        "external_downloader": "aria2c",
        "external_downloader_args": {
            "aria2c": [
                "-x", "16", "-s", "16", "-k", "1M",
                "--min-split-size=1M",
                "--max-connection-per-server=16",
                "--summary-interval=0",
                "--console-log-level=warn",
            ],
        },
        "http_headers": pick_headers(url),
    }
    cookie_file = pick_cookie(url)
    if cookie_file:
        ydl_opts["cookiefile"] = cookie_file

    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(url, download=True)
        if not info:
            raise RuntimeError("未获取到视频信息")

        # Playlist-like results: use the first entry that has data.
        entries = info.get("entries")
        if entries:
            first = next((entry for entry in entries if entry), None)
            if first is not None:
                info = first

        # Prefer the final path yt-dlp reports after post-processing; this is
        # the merged file regardless of the pre-merge extension.
        downloads = info.get("requested_downloads") or []
        fp = next((d.get("filepath") for d in downloads if d.get("filepath")), None)
        if not fp or not os.path.exists(fp):
            # Fallback: derive the name from the template and check for the
            # merged .mp4 next to it.
            fp = ydl.prepare_filename(info)
            base, _ = os.path.splitext(fp)
            if os.path.exists(base + ".mp4"):
                fp = base + ".mp4"

    fp = _ensure_h264_aac(fp)
    return fp, info
|
|
| |
def _extract_tags_from_info(info: dict) -> list:
    """Return up to 15 cleaned ``#tag`` strings from ``info["tags"]``.

    Non-string and blank tags are ignored before the limit is applied; the
    remaining tags go through ``_clean_tags_generic``.
    """
    collected = []
    for item in info.get("tags") or []:
        if not isinstance(item, str):
            continue
        label = item.strip()
        if not label:
            continue
        collected.append(label if label.startswith("#") else f"#{label}")
    return _clean_tags_generic(collected[:15])
|
|
| |
def _build_caption(title, tags, uploader, uploader_url, sender, url,
                   platform="", duration=0, width=0, height=0, filesize=0):
    """Build the HTML caption for a post, at most 1024 characters long.

    Layout, one item per line, each omitted when empty:
      * block quote with the title and the space-joined tags
      * platform label, resolution, duration and file size joined by " · "
      * uploader (linked when *uploader_url* is given)
      * sender of the original message
      * link back to *url*

    URLs are HTML-escaped before being placed in ``href`` attributes. When
    the caption is too long, the quoted title/tag text is shortened (with a
    trailing ellipsis) instead of slicing the finished markup, so tags and
    entities are never cut in half. The limit is applied to the markup
    length, which is never shorter than the visible text.
    """
    import html

    limit = 1024
    esc = telebot.util.escape

    def attr(value):
        # Quotes must be escaped too: the value sits inside href='...'.
        return html.escape(value or "", quote=True)

    # Everything below the block quote has a bounded size; build it first.
    tail = []

    meta = []
    plat = PLATFORM_EMOJI.get(platform, "")
    if plat:
        meta.append(plat)
    res = _format_resolution(width, height)
    if res:
        meta.append(res)
    dur = _format_duration(duration)
    if dur:
        meta.append(dur)
    if filesize:
        meta.append(_format_filesize(filesize))
    if meta:
        tail.append(" · ".join(meta))

    if uploader:
        if uploader_url:
            tail.append(f"👤 <a href='{attr(uploader_url)}'>{esc(uploader)}</a>")
        else:
            tail.append(f"👤 {esc(uploader)}")

    if sender:
        tail.append(f"📤 来自 {esc(sender)}")

    tail.append(f"🔗 <a href='{attr(url)}'>原链接</a>")

    quote_lines = []
    if title:
        quote_lines.append(title)
    if tags:
        quote_lines.append(" ".join(tags))
    quote_text = "\n".join(quote_lines)

    def assemble(quote):
        head = [f"<blockquote>{esc(quote)}</blockquote>"] if quote else []
        return "\n".join(head + tail)

    caption = assemble(quote_text)
    # Shorten the unescaped quote text until the markup fits. Escaping only
    # lengthens text, so dropping N raw characters removes at least N
    # characters of markup and the loop always terminates.
    while len(caption) > limit and quote_text:
        overflow = len(caption) - limit
        keep = max(0, len(quote_text) - overflow - 1)
        quote_text = quote_text[:keep].rstrip()
        caption = assemble(quote_text + "…" if quote_text else "")

    if len(caption) > limit:
        # Still too long with no quote (e.g. an extremely long URL): keep
        # only the complete lines that fit.
        kept = []
        for part in tail:
            if len("\n".join(kept + [part])) <= limit:
                kept.append(part)
        caption = "\n".join(kept)

    return caption
|
|
| |
def _make_buttons(url: str):
    """Return the inline keyboard (delete / retry) attached to a sent video.

    The retry button carries the cache key of *url*, not the URL itself;
    cb_retry resolves it through ``_url_map``.
    """
    delete_button = InlineKeyboardButton("🗑 删除", callback_data="del")
    retry_button = InlineKeyboardButton(
        "🔄 重试", callback_data=f"retry|{_cache_key(url)}"
    )
    keyboard = InlineKeyboardMarkup()
    keyboard.row(delete_button, retry_button)
    return keyboard
|
|
# Maps _cache_key(url) -> url so the retry button's callback data can be
# resolved back to the original link. Filled by _process_link, read by
# cb_retry. Entries are never removed.
_url_map = {}
|
|
| |
def _update_status(status, text):
    """Replace the text of the progress message *status* with *text*.

    Does nothing when *status* is falsy. Failures are ignored because the
    progress message is informational only.
    """
    if status:
        try:
            bot.edit_message_text(text, status.chat.id, status.message_id)
        except Exception:
            pass
|
|
| |
def send_video_auto(chat_id, fp, caption, duration, width, height,
                    thumb_path="", buttons=None):
    """Send the video file *fp* to *chat_id*, choosing the transport by size.

    Files up to BOT_API_LIMIT go through ``bot.send_video`` (with *buttons*
    attached). Larger files go through ``send_big_video``, which receives no
    buttons.

    Returns:
        The message returned by ``bot.send_video``, or None when the file
        was sent with ``send_big_video``.

    Raises:
        RuntimeError: if the file is larger than MTPROTO_LIMIT.
    """
    duration = int(duration) if duration else 0
    width = int(width) if width else 0
    height = int(height) if height else 0
    size = os.path.getsize(fp)
    has_thumb = bool(thumb_path) and os.path.exists(thumb_path)

    if size > BOT_API_LIMIT:
        if size > MTPROTO_LIMIT:
            raise RuntimeError(f"文件 {size / 1024 / 1024:.1f}MB 超过 2GB 上限")
        # send_big_video takes the thumbnail as a path, so no file handle is
        # opened on this branch.
        send_big_video(
            chat_id, fp, caption=caption,
            duration=duration, width=width, height=height,
            thumbnail=thumb_path if has_thumb else None,
        )
        return None

    options = dict(
        caption=caption,
        supports_streaming=True,
        duration=duration or None,
        width=width or None,
        height=height or None,
        reply_markup=buttons,
        timeout=600,
    )
    with open(fp, "rb") as video:
        if not has_thumb:
            return bot.send_video(chat_id, video, thumbnail=None, **options)
        with open(thumb_path, "rb") as thumb:
            return bot.send_video(chat_id, video, thumbnail=thumb, **options)
|
|
| |
def _send_images(chat_id, images, caption, buttons=None):
    """Send up to ten *images* as one album, with *caption* on the first photo.

    If the Bot API call fails for any reason, the full image list is sent
    through ``send_photos_mtproto`` instead. *buttons* is accepted but not
    used.
    """
    try:
        album = []
        for link in images[:10]:
            album.append(telebot.types.InputMediaPhoto(link))
        cover = album[0]
        cover.caption = caption
        cover.parse_mode = "HTML"
        bot.send_media_group(chat_id, album, timeout=120)
    except Exception as e:
        print("bot api send_media_group failed, fallback mtproto:", e)
        send_photos_mtproto(chat_id, images, caption)
|
|
| |
@bot.message_handler(commands=["start", "help"])
def cmd_start(m):
    """Reply to /start and /help with a short usage description."""
    usage = (
        "发送 B站 / 抖音 / YouTube / TikTok / 小红书 / 微博 / 快手 等链接,"
        "自动解析并发送视频/图集(最大 2GB)。\n\n"
        "群组中请关闭隐私模式或将我设为管理员。"
    )
    bot.reply_to(m, usage)
|
|
@bot.message_handler(commands=["ping"])
def cmd_ping(m):
    """Reply to /ping with a fixed message."""
    reply = "pong 🏓"
    bot.reply_to(m, reply)
|
|
| |
@bot.callback_query_handler(func=lambda call: call.data == "del")
def cb_delete(call):
    """Handle the delete button: remove the bot's message and confirm."""
    target = call.message
    try:
        bot.delete_message(target.chat.id, target.message_id)
    except Exception:
        # Best-effort: the message may already be gone or not deletable.
        pass
    bot.answer_callback_query(call.id, "已删除 ✓")
|
|
# `call.data` may be None, so guard it before calling startswith() in the filter.
@bot.callback_query_handler(func=lambda call: (call.data or "").startswith("retry|"))
def cb_retry(call):
    """Handle the retry button: drop the cached result and process the link again.

    The callback data carries the URL's cache key, which is resolved through
    ``_url_map``. If the key is unknown the user is asked to send the link
    again.
    """
    url_hash = call.data.split("|", 1)[1]
    url = _url_map.get(url_hash, "")
    if not url:
        bot.answer_callback_query(call.id, "链接已过期,请重新发送")
        return
    bot.answer_callback_query(call.id, "重新解析中...")
    # Force a fresh download rather than re-sending the cached file_id.
    _cache.pop(url_hash, None)
    try:
        bot.delete_message(call.message.chat.id, call.message.message_id)
    except Exception:
        # Best-effort: failing to remove the old message must not block the retry.
        pass
    executor.submit(_process_link, call.message.chat.id,
                    call.from_user, url, None)
|
|
| |
def _sender_name(from_user) -> str:
    """Return the user's full name, else username, else "" (whitespace-stripped)."""
    if not from_user:
        return ""
    name = getattr(from_user, "full_name", "") or getattr(from_user, "username", "") or ""
    return name.strip()


def _delete_quietly(chat_id, message_id):
    """Delete one message, ignoring any failure (deletion is best-effort)."""
    try:
        bot.delete_message(chat_id, message_id)
    except Exception:
        pass


def _clear_progress(status, chat_id, original_msg_id):
    """Remove the progress message and the user's original link message, if any."""
    if status:
        _delete_quietly(status.chat.id, status.message_id)
    if original_msg_id:
        _delete_quietly(chat_id, original_msg_id)


def _process_link(chat_id, from_user, url, original_msg_id):
    """Resolve *url*, send the resulting video or image album to *chat_id*.

    Steps:
      1. If a file_id is cached for the URL, re-send it and stop.
      2. Post a progress message (best-effort).
      3. For Xiaohongshu / Douyin links, try the dedicated extractor. Image
         posts are sent as an album and processing stops; videos are
         downloaded directly.
      4. If no file was obtained, download with yt-dlp.
      5. Probe missing metadata, build the caption, upload, cache the
         file_id, then delete the progress and original messages.

    Any exception after step 2 is reported in the chat; nothing is raised.
    *original_msg_id* may be None (retry path), in which case no user message
    is deleted.
    """
    platform = _detect_platform(url)

    # Step 1: cached file_id.
    cached = _get_cache(url)
    if cached and cached.get("file_id"):
        try:
            bot.send_video(
                chat_id,
                cached["file_id"],
                caption=cached.get("caption", ""),
                supports_streaming=True,
                reply_markup=_make_buttons(url),
                timeout=60,
            )
        except Exception as e:
            # The file_id may no longer be valid; fall through to a fresh download.
            print("cached send failed, re-downloading:", e)
        else:
            if original_msg_id:
                _delete_quietly(chat_id, original_msg_id)
            return

    # Step 2: progress message.
    status = None
    try:
        status = bot.send_message(chat_id, "🔍 正在解析...")
    except Exception as e:
        print("send status failed:", e)

    try:
        # TemporaryDirectory(dir=...) requires the parent directory to exist.
        os.makedirs("/tmp/dl", exist_ok=True)
        with tempfile.TemporaryDirectory(dir="/tmp/dl") as tmp:
            fp = None
            info = {}
            tags = []

            # Step 3: site-specific extractors, tried in this order.
            # Fields: log key, display label, extractor, cookie env var, referer.
            sites = []
            if "xiaohongshu.com" in url or "xhslink.com" in url:
                sites.append(("xhs", "小红书", extract_xhs, "XHS_COOKIES",
                              "https://www.xiaohongshu.com/"))
            if "douyin.com" in url or "iesdouyin.com" in url:
                sites.append(("douyin", "抖音", extract_douyin, "DOUYIN_COOKIES",
                              "https://www.douyin.com/"))

            for site_key, label, extractor, cookie_env, referer in sites:
                if fp is not None:
                    break
                _update_status(status, f"🔍 正在解析{label}...")
                try:
                    data = extractor(url, _read_cookie_header(cookie_env))
                except Exception as e:
                    print(f"{site_key} extract failed:", e)
                    data = None
                if not data:
                    continue

                kind = data.get("type")
                if kind == "image" and data.get("images"):
                    _update_status(status, "🖼 正在发送图集...")
                    cap = _build_caption(
                        data.get("title", ""), data.get("tags", []),
                        data.get("author", "") or label,
                        data.get("author_url", ""),
                        _sender_name(from_user), url,
                        platform=platform,
                    )
                    _send_images(chat_id, data["images"], cap)
                    _clear_progress(status, chat_id, original_msg_id)
                    return

                if kind == "video" and data.get("video_url"):
                    _update_status(status, "⏬ 正在下载视频...")
                    try:
                        fp = _ensure_h264_aac(
                            _download_direct(data["video_url"], tmp, referer=referer)
                        )
                    except Exception as e:
                        # Both sites fall back to yt-dlp when the direct
                        # download fails.
                        print(f"{site_key} direct download failed, fallback yt-dlp:", e)
                        fp = None
                    else:
                        info = {
                            "title": data.get("title", ""),
                            "uploader": data.get("author", "") or label,
                            "uploader_url": data.get("author_url", ""),
                        }
                        tags = data.get("tags", [])

            # Step 4: generic yt-dlp download.
            if fp is None:
                plat_name = PLATFORM_EMOJI.get(platform, "").split(" ")[-1] if platform else "视频"
                _update_status(status, f"⏬ 正在下载{plat_name}...")
                fp, info = download_video(url, tmp)
                tags = _extract_tags_from_info(info)

            if not os.path.exists(fp):
                raise RuntimeError("下载完成但未找到文件")

            filesize = os.path.getsize(fp)
            if filesize > MTPROTO_LIMIT:
                raise RuntimeError(f"文件 {filesize / 1024 / 1024:.1f}MB 超过 2GB 上限")

            # Step 5: metadata, caption, upload.
            title = (info.get("title") or "").strip()
            uploader = (info.get("uploader") or info.get("channel") or "").strip()
            uploader_url = (info.get("uploader_url") or info.get("channel_url") or "").strip()
            duration = info.get("duration") or 0
            width = info.get("width") or 0
            height = info.get("height") or 0

            # Fill in whatever the extractor did not report.
            if not width or not height or not duration:
                probed = _probe_video_meta(fp)
                width = width or probed["width"]
                height = height or probed["height"]
                duration = duration or probed["duration"]

            caption = _build_caption(
                title, tags, uploader, uploader_url, _sender_name(from_user), url,
                platform=platform, duration=duration,
                width=width, height=height, filesize=filesize,
            )

            _update_status(status, "📤 正在上传到 Telegram...")
            thumb_path = _extract_thumbnail(fp, tmp)

            # Register the URL so the retry button can be resolved later.
            _url_map[_cache_key(url)] = url

            sent = send_video_auto(
                chat_id, fp, caption, duration, width, height,
                thumb_path=thumb_path, buttons=_make_buttons(url),
            )

            # Only Bot API uploads return a message carrying a reusable file_id.
            if sent and hasattr(sent, "video") and sent.video:
                _set_cache(url, sent.video.file_id, caption)

            _clear_progress(status, chat_id, original_msg_id)

    except Exception as e:
        traceback.print_exc()
        err = str(e)
        if len(err) > 300:
            err = err[:300] + "..."
        try:
            if status:
                bot.edit_message_text(
                    f"❌ 解析失败:{err}",
                    status.chat.id, status.message_id,
                )
            else:
                bot.send_message(chat_id, f"❌ 解析失败:{err}")
        except Exception:
            # Reporting the failure is itself best-effort.
            pass
|
|
| |
@bot.message_handler(func=lambda m: bool(extract_supported_url(m.text or m.caption or "")),
                     content_types=["text"])
def handle_link(m):
    """Queue processing for the first supported link in a text message.

    Short links are expanded first, then Douyin share links are normalised,
    and the job is handed to the worker pool.
    """
    link = extract_supported_url(m.text or "")
    if not link:
        return
    is_short = any(host in link for host in SHORT_LINK_HOSTS)
    if is_short:
        link = _expand_short_url(link)
    executor.submit(_process_link, m.chat.id, m.from_user,
                    _normalize_url(link), m.message_id)
|
|
| |
def _delete_webhook():
    """Call deleteWebhook with ``drop_pending_updates`` before polling starts.

    Failures are printed and otherwise ignored. The bot token is removed
    from the printed message because exception text from ``requests`` can
    include the request URL, which contains the token.
    """
    try:
        endpoint = apihelper.API_URL.format(BOT_TOKEN, "deleteWebhook")
        response = requests.get(
            endpoint, params={"drop_pending_updates": "true"}, timeout=10
        )
        # Without this, an HTTP 4xx/5xx answer would be reported as success.
        response.raise_for_status()
        print("deleteWebhook done")
    except Exception as e:
        print("deleteWebhook failed:", str(e).replace(BOT_TOKEN, "<token>"))
|
|
def start_bot():
    """Delete any webhook, then run long polling until it is stopped.

    Polling is restarted after an exception: 15 seconds after a "Conflict"
    API error, 5 seconds after anything else. When ``infinity_polling``
    returns without raising, the loop ends and the function returns.
    """
    print("Bot starting...")
    _delete_webhook()
    while True:
        try:
            bot.infinity_polling(timeout=30, long_polling_timeout=30, skip_pending=True)
        except telebot.apihelper.ApiTelegramException as e:
            if "Conflict" in str(e):
                print("409 conflict, sleep 15s and retry...")
                time.sleep(15)
                continue
            print("telegram api error:", e)
            time.sleep(5)
        except Exception as e:
            print("polling error:", e)
            time.sleep(5)
        else:
            # NOTE(review): a normal return is taken to mean polling was
            # stopped on purpose (stop request or Ctrl+C) — confirm against
            # the installed telebot version. Restarting here would make the
            # bot impossible to stop.
            print("polling stopped")
            break
|
|