"""Flask app that records the latest videos of a TikTok/YouTube profile.

A profile URL is submitted through the index page, the newest videos are
looked up with yt-dlp, and videos not seen before are written to Excel files
that can be downloaded from the same page.
"""
import json
import os
from datetime import datetime, timezone

import pandas as pd
from flask import Flask, render_template, request, redirect, url_for, send_file, flash
from yt_dlp import YoutubeDL

app = Flask(__name__)
# The literal is only a placeholder default; set SECRET_KEY in the environment
# for any real deployment so session/flash cookies cannot be forged.
app.secret_key = os.environ.get("SECRET_KEY", "change_this_to_something_random")

HISTORY_FILE = "history.json"
OUTPUT_FILE = "output.xlsx"
MAX_NEW = 30  # collect at most the 30 newest videos per request

# NOTE(review): BackgroundScheduler, run_scheduled_scrape and
# get_schedule_hours are neither imported nor defined in this file, so setting
# RUN_SCHEDULER=true raises NameError at import time. The statements are kept
# unchanged until the missing definitions are supplied.
if os.environ.get("RUN_SCHEDULER", "false") == "true":
    scheduler = BackgroundScheduler()
    scheduler.add_job(run_scheduled_scrape, 'interval', hours=get_schedule_hours())
    scheduler.start()


def load_history():
    """Return the history mapping (profile URL -> list of seen video ids).

    A missing history file is treated as an empty history so the very first
    request works without any manual setup. A file that exists but is not
    valid JSON still raises, so a damaged history is never silently
    overwritten by the next save.
    """
    try:
        with open(HISTORY_FILE, "r", encoding="utf-8") as f:
            return json.load(f)
    except FileNotFoundError:
        return {}


def save_history(history):
    """Write *history* to HISTORY_FILE as indented UTF-8 JSON."""
    with open(HISTORY_FILE, "w", encoding="utf-8") as f:
        json.dump(history, f, ensure_ascii=False, indent=2)


def append_to_excel(rows):
    """Write newly collected rows to two workbooks.

    1. ``output.xlsx`` - cumulative file; the rows are appended to it.
    2. ``output_YYYYMMDD_HHMMSS.xlsx`` - standalone file for this batch.

    Args:
        rows: list of row dicts, one per video.

    Returns:
        The name of the per-batch file that was written.
    """
    df = pd.DataFrame(rows)
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    time_file = f"output_{timestamp}.xlsx"

    # Standalone file for the current batch.
    df.to_excel(time_file, index=False)

    # Cumulative file: append to the existing content when there is any.
    if os.path.exists(OUTPUT_FILE):
        old = pd.read_excel(OUTPUT_FILE)
        combined = pd.concat([old, df], ignore_index=True)
        combined.to_excel(OUTPUT_FILE, index=False)
    else:
        df.to_excel(OUTPUT_FILE, index=False)

    return time_file


def _iter_video_entries(info):
    """Yield the individual video dicts contained in a yt-dlp info dict.

    A dict without an ``entries`` value is treated as a single video. A dict
    with ``entries`` is treated as a playlist: falsy entries are skipped and
    nested playlists are expanded, so an empty playlist yields nothing
    instead of being reported as a video itself.
    """
    entries = info.get("entries")
    if entries is None:
        yield info
        return
    for entry in entries:
        if entry:
            yield from _iter_video_entries(entry)


def _format_date(timestamp, upload_date):
    """Return a ``YYYY-MM-DD`` string from *timestamp* or *upload_date*.

    *timestamp* (epoch seconds, interpreted as UTC) is preferred. When it is
    absent or cannot be converted, *upload_date* (``YYYYMMDD``) is used; an
    unparseable *upload_date* is returned as-is. Returns ``""`` when neither
    value is available.
    """
    if timestamp:
        try:
            moment = datetime.fromtimestamp(timestamp, tz=timezone.utc)
            return moment.strftime("%Y-%m-%d")
        except (OverflowError, OSError, ValueError, TypeError):
            pass  # unusable timestamp: fall back to upload_date below
    if upload_date:
        try:
            return datetime.strptime(str(upload_date), "%Y%m%d").strftime("%Y-%m-%d")
        except ValueError:
            return str(upload_date)
    return ""


def _first_not_none(entry, primary, fallback):
    """Return ``entry[primary]`` unless it is missing/None, else ``entry[fallback]``."""
    value = entry.get(primary)
    return value if value is not None else entry.get(fallback)


def fetch_latest_videos(profile_url, max_items=10):
    """Return metadata for the newest videos found at *profile_url*.

    Args:
        profile_url: URL handed to yt-dlp (nothing is downloaded).
        max_items: maximum number of videos to return.

    Returns:
        A list of dicts with the keys ``id``, ``title``, ``url``, ``date``
        (``YYYY-MM-DD`` or ``""``), ``views`` and ``likes``, sorted newest
        first. Entries without a parseable date sort last. The list is empty
        when yt-dlp returns no information.
    """
    ydl_opts = {
        "ignoreerrors": True,
        "quiet": True,
        "skip_download": True,
    }
    with YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(profile_url, download=False)

    if not info:
        return []

    normalized = []
    for e in _iter_video_entries(info):
        normalized.append({
            "id": e.get("id") or e.get("display_id") or e.get("url"),
            "title": e.get("title", ""),
            "url": e.get("webpage_url") or e.get("url"),
            "date": _format_date(e.get("timestamp"), e.get("upload_date")),
            "views": _first_not_none(e, "view_count", "views"),
            "likes": _first_not_none(e, "like_count", "likes"),
        })

    def sort_key(x):
        # Dates that are empty or not in YYYY-MM-DD form sort as the oldest.
        try:
            return datetime.strptime(x["date"], "%Y-%m-%d")
        except (ValueError, TypeError):
            return datetime.min

    normalized.sort(key=sort_key, reverse=True)
    return normalized[:max_items]


def _is_output_file(name):
    """Return True when *name* is a bare ``output*.xlsx`` file name.

    Names containing a directory component are rejected so the check can be
    used to validate client-supplied download names.
    """
    return (
        name == os.path.basename(name)
        and name.startswith("output")
        and name.endswith(".xlsx")
    )


def list_output_files():
    """List all ``output*.xlsx`` files in the current directory.

    Returns:
        A list of dicts with ``name``, ``mtime`` (``YYYY-MM-DD HH:MM:SS``)
        and ``size`` (e.g. ``"12.3 KB"``), newest first.
    """
    files = []
    for f in os.listdir("."):
        if _is_output_file(f):
            size_kb = os.path.getsize(f) / 1024
            mtime = datetime.fromtimestamp(os.path.getmtime(f)).strftime("%Y-%m-%d %H:%M:%S")
            files.append({
                "name": f,
                "mtime": mtime,
                "size": f"{size_kb:.1f} KB",
            })
    # Newest first; the fixed-width timestamp string sorts chronologically.
    files.sort(key=lambda x: x["mtime"], reverse=True)
    return files


@app.route("/", methods=["GET", "POST"])
def index():
    """Render the index page (GET) or scrape the submitted profile (POST).

    On POST the newest videos of ``profile_url`` are fetched, videos whose id
    is already recorded in the history for that profile are skipped, the rest
    are written to Excel and added to the history. The outcome is reported
    through a flash message and the client is redirected back to the page.
    """
    if request.method == "POST":
        profile_url = request.form.get("profile_url", "").strip()
        if not profile_url:
            flash("请先输入 TikTok 或 YouTube 的主页链接。")
            return redirect(url_for("index"))

        try:
            entries = fetch_latest_videos(profile_url, max_items=MAX_NEW)
        except Exception as ex:
            # Request boundary: report any scraping failure to the user.
            flash(f"抓取失败:{ex}")
            return redirect(url_for("index"))

        if not entries:
            flash("未能提取到视频信息,请确认主页链接是否有效。")
            return redirect(url_for("index"))

        history = load_history()
        key = profile_url
        known_ids = list(history.get(key, []))
        seen = set(known_ids)

        new_rows = []
        new_ids = []
        skipped = 0
        for e in entries:
            vid = e.get("id") or e.get("url")
            if not vid:
                continue
            if vid in seen:
                skipped += 1
                continue
            new_rows.append({
                "source_profile": profile_url,
                "video_id": vid,
                "date": e.get("date", ""),
                "title": e.get("title", ""),
                "video_url": e.get("url", ""),
                "views": e.get("views"),
                "likes": e.get("likes"),
            })
            new_ids.append(vid)
            seen.add(vid)
        added = len(new_rows)

        time_file = append_to_excel(new_rows) if new_rows else None

        # Keep ids de-duplicated and in a stable order (old ids first) so the
        # history file does not get reshuffled on every save.
        history[key] = list(dict.fromkeys(known_ids + new_ids))
        save_history(history)

        if new_rows:
            flash(f"抓取完成:新增 {added} 条,跳过 {skipped} 条。"
                  f"已保存至 output.xlsx 及 {time_file}。")
        else:
            flash(f"没有发现新视频,跳过 {skipped} 条。")
        return redirect(url_for("index"))

    files = list_output_files()
    return render_template("index.html", files=files)


@app.route("/download/<filename>")
def download(filename):
    """Send one of the generated ``output*.xlsx`` files as an attachment.

    Only bare file names matching what ``list_output_files`` reports are
    served; anything else (including names with directory components) is
    answered with the "file does not exist" flash message and a redirect.
    """
    if _is_output_file(filename) and os.path.isfile(filename):
        # Pass an absolute path so the file that was just checked (relative
        # to the working directory) is the one that gets sent; Flask resolves
        # relative paths against the application root instead.
        return send_file(os.path.abspath(filename), as_attachment=True)
    flash("文件不存在。")
    return redirect(url_for("index"))


if __name__ == '__main__':
    app.run(host='0.0.0.0', port=7860, debug=False)