niknok / bot.py
ljx77qaq's picture
Upload 4 files
b0b698f verified
import os
import re
import time
import json
import uuid
import hashlib
import pathlib
import tempfile
import traceback
import subprocess
from concurrent.futures import ThreadPoolExecutor
import requests
import telebot
from telebot import apihelper
from telebot.types import InlineKeyboardMarkup, InlineKeyboardButton
import yt_dlp
from xhs_extract import extract_xhs
from douyin_extract import extract_douyin
from big_send import send_big_video, send_photos_mtproto
# ========== 反代 ==========
apihelper.API_URL = "https://nine7.linlizhi0210.workers.dev/bot{0}/{1}"
apihelper.FILE_URL = "https://nine7.linlizhi0210.workers.dev/file/bot{0}/{1}"
# ========== telebot 网络重试 ==========
apihelper.RETRY_ON_ERROR = True
apihelper.RETRY_TIMEOUT = 3
apihelper.CONNECT_TIMEOUT = 15
apihelper.READ_TIMEOUT = 30
# ========== 基本配置 ==========
BOT_TOKEN = os.environ["BOT_TOKEN"]
BOT_API_LIMIT = 50 * 1024 * 1024
MTPROTO_LIMIT = 2000 * 1024 * 1024
bot = telebot.TeleBot(BOT_TOKEN, parse_mode="HTML")
# ========== 并发处理 ==========
executor = ThreadPoolExecutor(max_workers=3)
# ========== 防重复缓存 ==========
_cache = {}
CACHE_TTL = 3600
def _cache_key(url: str) -> str:
return hashlib.md5(url.encode()).hexdigest()
def _get_cache(url: str):
k = _cache_key(url)
if k in _cache and time.time() - _cache[k]["ts"] < CACHE_TTL:
return _cache[k]
if k in _cache:
del _cache[k]
return None
def _set_cache(url: str, file_id: str, caption: str):
k = _cache_key(url)
_cache[k] = {"ts": time.time(), "file_id": file_id, "caption": caption}
now = time.time()
expired = [key for key, val in _cache.items() if now - val["ts"] > CACHE_TTL]
for key in expired:
del _cache[key]
# ========== Cookies 处理 ==========
COOKIE_DIR = pathlib.Path("/tmp/cookies")
COOKIE_DIR.mkdir(parents=True, exist_ok=True)
NETSCAPE_HEADER = "# Netscape HTTP Cookie File\n"
DEFAULT_DOMAIN = {
"bilibili": ".bilibili.com",
"douyin": ".douyin.com",
"youtube": ".youtube.com",
"xhs": ".xiaohongshu.com",
"tiktok": ".tiktok.com",
"weibo": ".weibo.com",
"kuaishou": ".kuaishou.com",
"ph": ".pornhub.com",
}
def _extract_cookie_line(raw: str) -> str:
for line in raw.splitlines():
s = line.strip()
if s.lower().startswith("cookie:"):
return s.split(":", 1)[1].strip()
return raw.strip()
def _to_netscape(raw: str, domain: str) -> str:
raw = raw.strip()
if raw.startswith("# Netscape") or "\tTRUE\t" in raw or "\tFALSE\t" in raw:
if not raw.startswith("# Netscape"):
raw = NETSCAPE_HEADER + raw
return raw + "\n"
cookie_str = _extract_cookie_line(raw)
lines = [NETSCAPE_HEADER.rstrip()]
expire = int(time.time()) + 3600 * 24 * 30
for pair in cookie_str.split(";"):
pair = pair.strip()
if not pair or "=" not in pair:
continue
k, v = pair.split("=", 1)
k, v = k.strip(), v.strip()
if not k:
continue
lines.append(f"{domain}\tTRUE\t/\tFALSE\t{expire}\t{k}\t{v}")
return "\n".join(lines) + "\n"
def _dump_cookie(name: str, env: str):
data = os.environ.get(env)
if not data:
return None
if name == "douyin":
domains = [".douyin.com", ".iesdouyin.com", ".snssdk.com"]
else:
domains = [DEFAULT_DOMAIN.get(name, f".{name}.com")]
p = COOKIE_DIR / f"{name}.txt"
parts = []
for d in domains:
text = _to_netscape(data, d)
if parts:
text = text.replace(NETSCAPE_HEADER, "", 1)
parts.append(text)
p.write_text("".join(parts))
return str(p)
COOKIES = {
"bilibili.com": _dump_cookie("bilibili", "BILI_COOKIES"),
"b23.tv": _dump_cookie("bilibili", "BILI_COOKIES"),
"douyin.com": _dump_cookie("douyin", "DOUYIN_COOKIES"),
"iesdouyin.com": _dump_cookie("douyin", "DOUYIN_COOKIES"),
"youtube.com": _dump_cookie("youtube", "YT_COOKIES"),
"youtu.be": _dump_cookie("youtube", "YT_COOKIES"),
"xiaohongshu.com": _dump_cookie("xhs", "XHS_COOKIES"),
"xhslink.com": _dump_cookie("xhs", "XHS_COOKIES"),
"tiktok.com": _dump_cookie("tiktok", "TIKTOK_COOKIES"),
"weibo.com": _dump_cookie("weibo", "WEIBO_COOKIES"),
"kuaishou.com": _dump_cookie("kuaishou", "KS_COOKIES"),
"pornhub.com": _dump_cookie("ph", "PH_COOKIES"),
}
def pick_cookie(url: str):
for host, path in COOKIES.items():
if host in url and path:
return path
return None
def _read_cookie_header(env: str) -> str:
raw = os.environ.get(env, "")
if not raw:
return ""
for line in raw.splitlines():
s = line.strip()
if s.lower().startswith("cookie:"):
return s.split(":", 1)[1].strip()
if "=" in raw and ";" in raw:
return raw.strip()
return ""
# ========== 平台识别 ==========
PLATFORM_EMOJI = {
"douyin": "🎵 抖音",
"bilibili": "📺 B站",
"youtube": "▶️ YouTube",
"tiktok": "🎵 TikTok",
"xhs": "📕 小红书",
"weibo": "🔥 微博",
"kuaishou": "⚡ 快手",
"twitter": "🐦 X",
"instagram": "📷 Instagram",
}
def _detect_platform(url: str) -> str:
if "douyin.com" in url or "iesdouyin.com" in url:
return "douyin"
if "bilibili.com" in url or "b23.tv" in url:
return "bilibili"
if "youtube.com" in url or "youtu.be" in url:
return "youtube"
if "tiktok.com" in url:
return "tiktok"
if "xiaohongshu.com" in url or "xhslink.com" in url:
return "xhs"
if "weibo.com" in url:
return "weibo"
if "kuaishou.com" in url:
return "kuaishou"
if "twitter.com" in url or "x.com" in url:
return "twitter"
if "instagram.com" in url:
return "instagram"
return ""
# ========== 格式化工具 ==========
def _format_duration(sec):
sec = int(sec)
if sec <= 0:
return ""
if sec < 60:
return f"0:{sec:02d}"
if sec < 3600:
return f"{sec // 60}:{sec % 60:02d}"
return f"{sec // 3600}:{(sec % 3600) // 60:02d}:{sec % 60:02d}"
def _format_resolution(w, h):
if not w or not h:
return ""
short = min(w, h)
if short >= 2160: return "4K"
if short >= 1440: return "2K"
if short >= 1080: return "1080p"
if short >= 720: return "720p"
if short >= 480: return "480p"
return f"{w}×{h}"
def _format_filesize(size_bytes):
if size_bytes < 1024 * 1024:
return f"{size_bytes / 1024:.0f}KB"
if size_bytes < 1024 * 1024 * 1024:
return f"{size_bytes / 1024 / 1024:.1f}MB"
return f"{size_bytes / 1024 / 1024 / 1024:.2f}GB"
# ========== 标签清理(通用) ==========
def _clean_tags_generic(tags: list) -> list:
cleaned = []
for t in tags:
t = t.strip()
t = re.sub(r"\[.*?\]", "", t) # 去 [话题] [收藏] 等
t = t.rstrip("#").strip() # 去尾部 #
if t and not t.startswith("#"):
t = f"#{t}"
if t and t != "#":
cleaned.append(t)
return cleaned
# ========== URL 识别 ==========
URL_RE = re.compile(r"https?://[^\s]+", re.IGNORECASE)
SUPPORTED_HINTS = (
"bilibili.com", "b23.tv",
"douyin.com", "iesdouyin.com",
"tiktok.com",
"pornhub.com",
"youtube.com", "youtu.be",
"xiaohongshu.com", "xhslink.com",
"kuaishou.com",
"weibo.com",
"twitter.com", "x.com",
"instagram.com",
"facebook.com", "fb.watch",
)
SHORT_LINK_HOSTS = (
"xhslink.com", "xhs.link",
"b23.tv",
"v.douyin.com",
"vt.tiktok.com", "vm.tiktok.com",
"v.kuaishou.com",
)
def extract_supported_url(text: str):
if not text:
return None
for m in URL_RE.finditer(text):
url = m.group(0).rstrip("))),,。.!!??")
if any(h in url for h in SUPPORTED_HINTS):
return url
return None
def _expand_short_url(url: str) -> str:
try:
r = requests.head(
url, allow_redirects=True, timeout=10,
headers={
"User-Agent":
"Mozilla/5.0 (iPhone; CPU iPhone OS 16_6 like Mac OS X) "
"AppleWebKit/605.1.15 (KHTML, like Gecko) "
"Version/16.6 Mobile/15E148 Safari/604.1"
},
)
return r.url or url
except Exception as e:
print("expand short url failed:", e)
return url
def _normalize_url(url: str) -> str:
m = re.search(r"iesdouyin\.com/share/video/(\d+)", url)
if m:
return f"https://www.douyin.com/video/{m.group(1)}"
m = re.search(r"douyin\.com/share/video/(\d+)", url)
if m:
return f"https://www.douyin.com/video/{m.group(1)}"
m = re.search(r"iesdouyin\.com/share/user/[^?]+\?.*?aweme_id=(\d+)", url)
if m:
return f"https://www.douyin.com/video/{m.group(1)}"
return url
# ========== HTTP Headers ==========
DEFAULT_HEADERS = {
"User-Agent": (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/124.0.0.0 Safari/537.36"
),
"Referer": "https://www.bilibili.com/",
}
XHS_HEADERS_UA = {
"User-Agent": (
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
"AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.0 Safari/605.1.15"
),
"Referer": "https://www.xiaohongshu.com/",
}
DOUYIN_HEADERS = {
"User-Agent": (
"Mozilla/5.0 (iPhone; CPU iPhone OS 16_6 like Mac OS X) "
"AppleWebKit/605.1.15 (KHTML, like Gecko) "
"Version/16.6 Mobile/15E148 Safari/604.1"
),
"Referer": "https://www.douyin.com/",
}
def pick_headers(url: str):
if "xiaohongshu.com" in url or "xhslink.com" in url:
return XHS_HEADERS_UA
if "douyin.com" in url or "iesdouyin.com" in url:
return DOUYIN_HEADERS
return DEFAULT_HEADERS
# ========== 直链下载 ==========
def _download_direct(url: str, workdir: str, referer: str = "") -> str:
fp = os.path.join(workdir, f"{uuid.uuid4().hex}.mp4")
headers = {"User-Agent": DOUYIN_HEADERS["User-Agent"]}
if referer:
headers["Referer"] = referer
with requests.get(url, headers=headers, stream=True, timeout=300) as r:
r.raise_for_status()
with open(fp, "wb") as f:
for chunk in r.iter_content(1024 * 256):
if chunk:
f.write(chunk)
return fp
# ========== 只 remux 不转码 ==========
def _ensure_h264_aac(path: str) -> str:
fixed = path + ".fast.mp4"
try:
subprocess.run(
["ffmpeg", "-y", "-i", path, "-c", "copy",
"-movflags", "+faststart", fixed],
check=True, timeout=600,
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
)
return fixed
except Exception as e:
print("remux failed, use original:", e)
return path
# ========== ffprobe ==========
def _probe_video_meta(path: str) -> dict:
try:
out = subprocess.check_output(
["ffprobe", "-v", "error",
"-select_streams", "v:0",
"-show_entries", "stream=width,height",
"-show_entries", "format=duration",
"-of", "json", path],
timeout=30,
)
meta = json.loads(out)
vs = (meta.get("streams") or [{}])[0]
fmt = meta.get("format") or {}
return {
"width": int(vs.get("width") or 0),
"height": int(vs.get("height") or 0),
"duration": float(fmt.get("duration") or 0),
}
except Exception as e:
print("probe meta failed:", e)
return {"width": 0, "height": 0, "duration": 0}
# ========== 缩略图 ==========
def _extract_thumbnail(path: str, workdir: str) -> str:
thumb = os.path.join(workdir, "thumb.jpg")
try:
subprocess.run(
["ffmpeg", "-y", "-i", path, "-ss", "00:00:01",
"-vframes", "1", "-vf", "scale=320:-1", "-q:v", "5", thumb],
check=True, timeout=30,
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
)
if os.path.exists(thumb) and os.path.getsize(thumb) > 0:
return thumb
except Exception:
pass
return ""
# ========== yt-dlp 下载 ==========
def download_video(url: str, workdir: str):
ydl_opts = {
"outtmpl": os.path.join(workdir, "%(id).80s.%(ext)s"),
"format": "bv*+ba/b",
"format_sort": ["res", "fps", "br", "size"],
"merge_output_format": "mp4",
"quiet": True,
"no_warnings": True,
"noplaylist": True,
"retries": 3,
"concurrent_fragment_downloads": 8,
"http_chunk_size": 10 * 1024 * 1024,
"external_downloader": "aria2c",
"external_downloader_args": {
"aria2c": [
"-x", "16", "-s", "16", "-k", "1M",
"--min-split-size=1M",
"--max-connection-per-server=16",
"--summary-interval=0",
"--console-log-level=warn",
],
},
"http_headers": pick_headers(url),
}
cookie_file = pick_cookie(url)
if cookie_file:
ydl_opts["cookiefile"] = cookie_file
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
info = ydl.extract_info(url, download=True)
if "entries" in info and info["entries"]:
info = info["entries"][0]
fp = ydl.prepare_filename(info)
base, _ = os.path.splitext(fp)
if os.path.exists(base + ".mp4"):
fp = base + ".mp4"
fp = _ensure_h264_aac(fp)
return fp, info
# ========== 从 yt-dlp info 提取标签 ==========
def _extract_tags_from_info(info: dict) -> list:
tags = []
for t in (info.get("tags") or []):
if isinstance(t, str):
tag = t.strip()
if tag and not tag.startswith("#"):
tag = f"#{tag}"
if tag:
tags.append(tag)
return _clean_tags_generic(tags[:15])
# ========== 构建 caption ==========
def _build_caption(title, tags, uploader, uploader_url, sender, url,
platform="", duration=0, width=0, height=0, filesize=0):
esc = telebot.util.escape
parts = []
# 文案 + 标签(同一个 blockquote)
quote_lines = []
if title:
quote_lines.append(esc(title))
if tags:
tag_str = " ".join(tags)
quote_lines.append(esc(tag_str))
if quote_lines:
parts.append(f"<blockquote>{chr(10).join(quote_lines)}</blockquote>")
# 平台 · 分辨率 · 时长 · 大小
meta = []
plat = PLATFORM_EMOJI.get(platform, "")
if plat:
meta.append(plat)
res = _format_resolution(width, height)
if res:
meta.append(res)
dur = _format_duration(duration)
if dur:
meta.append(dur)
if filesize:
meta.append(_format_filesize(filesize))
if meta:
parts.append(" · ".join(meta))
# 作者
if uploader:
if uploader_url:
parts.append(f"👤 <a href='{uploader_url}'>{esc(uploader)}</a>")
else:
parts.append(f"👤 {esc(uploader)}")
# 来自
if sender:
parts.append(f"📤 来自 {esc(sender)}")
# 原链接
parts.append(f"🔗 <a href='{url}'>原链接</a>")
return "\n".join(parts)[:1024]
# ========== 按钮 ==========
def _make_buttons(url: str):
kb = InlineKeyboardMarkup()
url_hash = _cache_key(url)
kb.row(
InlineKeyboardButton("🗑 删除", callback_data="del"),
InlineKeyboardButton("🔄 重试", callback_data=f"retry|{url_hash}"),
)
return kb
_url_map = {}
# ========== 状态更新 ==========
def _update_status(status, text):
if not status:
return
try:
bot.edit_message_text(text, status.chat.id, status.message_id)
except Exception:
pass
# ========== 统一发送 ==========
def send_video_auto(chat_id, fp, caption, duration, width, height,
thumb_path="", buttons=None):
duration = int(duration) if duration else 0
width = int(width) if width else 0
height = int(height) if height else 0
size = os.path.getsize(fp)
thumb_file = None
if thumb_path and os.path.exists(thumb_path):
thumb_file = open(thumb_path, "rb")
sent = None
try:
if size <= BOT_API_LIMIT:
with open(fp, "rb") as f:
sent = bot.send_video(
chat_id, f,
caption=caption,
supports_streaming=True,
duration=duration or None,
width=width or None,
height=height or None,
thumbnail=thumb_file,
reply_markup=buttons,
timeout=600,
)
else:
if size > MTPROTO_LIMIT:
raise RuntimeError(f"文件 {size / 1024 / 1024:.1f}MB 超过 2GB 上限")
send_big_video(
chat_id, fp, caption=caption,
duration=duration, width=width, height=height,
thumbnail=thumb_path if thumb_path and os.path.exists(thumb_path) else None,
)
finally:
if thumb_file:
thumb_file.close()
return sent
# ========== 图集发送 ==========
def _send_images(chat_id, images, caption, buttons=None):
try:
media = [telebot.types.InputMediaPhoto(u) for u in images[:10]]
media[0].caption = caption
media[0].parse_mode = "HTML"
bot.send_media_group(chat_id, media, timeout=120)
except Exception as e:
print("bot api send_media_group failed, fallback mtproto:", e)
send_photos_mtproto(chat_id, images, caption)
# ========== 命令 ==========
@bot.message_handler(commands=["start", "help"])
def cmd_start(m):
bot.reply_to(
m,
"发送 B站 / 抖音 / YouTube / TikTok / 小红书 / 微博 / 快手 等链接,"
"自动解析并发送视频/图集(最大 2GB)。\n\n"
"群组中请关闭隐私模式或将我设为管理员。",
)
@bot.message_handler(commands=["ping"])
def cmd_ping(m):
bot.reply_to(m, "pong 🏓")
# ========== 回调按钮 ==========
@bot.callback_query_handler(func=lambda call: call.data == "del")
def cb_delete(call):
try:
bot.delete_message(call.message.chat.id, call.message.message_id)
except Exception:
pass
bot.answer_callback_query(call.id, "已删除 ✓")
@bot.callback_query_handler(func=lambda call: call.data.startswith("retry|"))
def cb_retry(call):
url_hash = call.data.split("|", 1)[1]
url = _url_map.get(url_hash, "")
if not url:
bot.answer_callback_query(call.id, "链接已过期,请重新发送")
return
bot.answer_callback_query(call.id, "重新解析中...")
if url_hash in _cache:
del _cache[url_hash]
try:
bot.delete_message(call.message.chat.id, call.message.message_id)
except Exception:
pass
executor.submit(_process_link, call.message.chat.id,
call.from_user, url, None)
# ========== 核心处理 ==========
def _process_link(chat_id, from_user, url, original_msg_id):
platform = _detect_platform(url)
# 缓存检查
cached = _get_cache(url)
if cached and cached.get("file_id"):
try:
bot.send_video(
chat_id,
cached["file_id"],
caption=cached.get("caption", ""),
supports_streaming=True,
reply_markup=_make_buttons(url),
timeout=60,
)
if original_msg_id:
try:
bot.delete_message(chat_id, original_msg_id)
except Exception:
pass
return
except Exception:
pass
status = None
try:
status_msg = bot.send_message(chat_id, "🔍 正在解析...")
status = status_msg
except Exception as e:
print("send status failed:", e)
try:
with tempfile.TemporaryDirectory(dir="/tmp/dl") as tmp:
fp = None
info = {}
tags = []
is_xhs = "xiaohongshu.com" in url or "xhslink.com" in url
is_dy = "douyin.com" in url or "iesdouyin.com" in url
# ---------- 小红书 ----------
if is_xhs:
_update_status(status, "🔍 正在解析小红书...")
try:
data = extract_xhs(url, _read_cookie_header("XHS_COOKIES"))
except Exception as e:
print("xhs extract failed:", e)
data = None
if data and data.get("type") == "video" and data.get("video_url"):
_update_status(status, "⏬ 正在下载视频...")
fp = _download_direct(
data["video_url"], tmp,
referer="https://www.xiaohongshu.com/",
)
fp = _ensure_h264_aac(fp)
info = {
"title": data.get("title", ""),
"uploader": data.get("author", "") or "小红书",
"uploader_url": data.get("author_url", ""),
}
tags = data.get("tags", [])
elif data and data.get("type") == "image" and data.get("images"):
_update_status(status, "🖼 正在发送图集...")
sender = ""
if from_user:
sender = (getattr(from_user, "full_name", "") or
getattr(from_user, "username", "") or "").strip()
cap = _build_caption(
data.get("title", ""), data.get("tags", []),
data.get("author", "") or "小红书",
data.get("author_url", ""),
sender, url,
platform=platform,
)
_send_images(chat_id, data["images"], cap)
try:
if status:
bot.delete_message(status.chat.id, status.message_id)
except Exception:
pass
if original_msg_id:
try:
bot.delete_message(chat_id, original_msg_id)
except Exception:
pass
return
# ---------- 抖音 ----------
if fp is None and is_dy:
_update_status(status, "🔍 正在解析抖音...")
try:
data = extract_douyin(url, _read_cookie_header("DOUYIN_COOKIES"))
except Exception as e:
print("douyin extract failed:", e)
data = None
if data and data.get("type") == "image" and data.get("images"):
_update_status(status, "🖼 正在发送图集...")
sender = ""
if from_user:
sender = (getattr(from_user, "full_name", "") or
getattr(from_user, "username", "") or "").strip()
cap = _build_caption(
data.get("title", ""), data.get("tags", []),
data.get("author", "") or "抖音",
data.get("author_url", ""),
sender, url, platform=platform,
)
_send_images(chat_id, data["images"], cap)
try:
if status:
bot.delete_message(status.chat.id, status.message_id)
except Exception:
pass
if original_msg_id:
try:
bot.delete_message(chat_id, original_msg_id)
except Exception:
pass
return
if data and data.get("type") == "video" and data.get("video_url"):
_update_status(status, "⏬ 正在下载视频...")
try:
fp = _download_direct(
data["video_url"], tmp,
referer="https://www.douyin.com/",
)
fp = _ensure_h264_aac(fp)
info = {
"title": data.get("title", ""),
"uploader": data.get("author", "") or "抖音",
"uploader_url": data.get("author_url", ""),
}
tags = data.get("tags", [])
except Exception as e:
print("douyin direct download failed, fallback yt-dlp:", e)
fp = None
# ---------- 兜底:yt-dlp ----------
if fp is None:
plat_name = PLATFORM_EMOJI.get(platform, "").split(" ")[-1] if platform else "视频"
_update_status(status, f"⏬ 正在下载{plat_name}...")
fp, info = download_video(url, tmp)
tags = _extract_tags_from_info(info)
if not os.path.exists(fp):
raise RuntimeError("下载完成但未找到文件")
filesize = os.path.getsize(fp)
if filesize > MTPROTO_LIMIT:
raise RuntimeError(f"文件 {filesize / 1024 / 1024:.1f}MB 超过 2GB 上限")
title = (info.get("title") or "").strip()
uploader = (info.get("uploader") or info.get("channel") or "").strip()
uploader_url = (info.get("uploader_url") or info.get("channel_url") or "").strip()
duration = info.get("duration") or 0
width = info.get("width") or 0
height = info.get("height") or 0
if not width or not height or not duration:
probed = _probe_video_meta(fp)
width = width or probed["width"]
height = height or probed["height"]
duration = duration or probed["duration"]
sender = ""
if from_user:
sender = (getattr(from_user, "full_name", "") or
getattr(from_user, "username", "") or "").strip()
caption = _build_caption(
title, tags, uploader, uploader_url, sender, url,
platform=platform, duration=duration,
width=width, height=height, filesize=filesize,
)
_update_status(status, "📤 正在上传到 Telegram...")
thumb_path = _extract_thumbnail(fp, tmp)
url_hash = _cache_key(url)
_url_map[url_hash] = url
buttons = _make_buttons(url)
sent = send_video_auto(
chat_id, fp, caption, duration, width, height,
thumb_path=thumb_path, buttons=buttons,
)
if sent and hasattr(sent, "video") and sent.video:
_set_cache(url, sent.video.file_id, caption)
try:
if status:
bot.delete_message(status.chat.id, status.message_id)
except Exception:
pass
if original_msg_id:
try:
bot.delete_message(chat_id, original_msg_id)
except Exception:
pass
except Exception as e:
traceback.print_exc()
err = str(e)
if len(err) > 300:
err = err[:300] + "..."
try:
if status:
bot.edit_message_text(
f"❌ 解析失败:{err}",
status.chat.id, status.message_id,
)
else:
bot.send_message(chat_id, f"❌ 解析失败:{err}")
except Exception:
pass
# ========== 链接入口 ==========
@bot.message_handler(func=lambda m: bool(extract_supported_url(m.text or m.caption or "")),
content_types=["text"])
def handle_link(m):
url = extract_supported_url(m.text or "")
if not url:
return
if any(h in url for h in SHORT_LINK_HOSTS):
url = _expand_short_url(url)
url = _normalize_url(url)
executor.submit(_process_link, m.chat.id, m.from_user, url, m.message_id)
# ========== 启动 ==========
def _delete_webhook():
try:
u = apihelper.API_URL.format(BOT_TOKEN, "deleteWebhook")
requests.get(u, params={"drop_pending_updates": "true"}, timeout=10)
print("deleteWebhook done")
except Exception as e:
print("deleteWebhook failed:", e)
def start_bot():
print("Bot starting...")
_delete_webhook()
while True:
try:
bot.infinity_polling(timeout=30, long_polling_timeout=30, skip_pending=True)
except telebot.apihelper.ApiTelegramException as e:
if "Conflict" in str(e):
print("409 conflict, sleep 15s and retry...")
time.sleep(15)
continue
print("telegram api error:", e)
time.sleep(5)
except Exception as e:
print("polling error:", e)
time.sleep(5)