| from flask import Flask, render_template, request, redirect, url_for, send_file, flash
|
| import os
|
| import json
|
| from datetime import datetime
|
| import pandas as pd
|
| from yt_dlp import YoutubeDL
|
|
|
# The Flask application object; routes below are registered on it.
app = Flask(__name__)

# Key used by Flask to sign the session cookie (flash messages rely on it).
# Prefer a value supplied through the SECRET_KEY environment variable; the
# literal below is only a placeholder fallback and should not be relied on
# in a real deployment.
app.secret_key = os.environ.get("SECRET_KEY", "change_this_to_something_random")

# JSON file holding, per profile URL, the video ids that were already exported.
HISTORY_FILE = "history.json"

# Cumulative Excel workbook that every scrape appends to.
OUTPUT_FILE = "output.xlsx"

# Upper bound passed as ``max_items`` when fetching videos for one request.
MAX_NEW = 30
|
|
|
|
|
# Start a background interval job when the RUN_SCHEDULER environment variable
# is exactly the string "true" (the default "false" leaves it disabled).
#
# NOTE(review): BackgroundScheduler, run_scheduled_scrape and
# get_schedule_hours are not imported or defined anywhere in the visible
# source. Unless they are provided elsewhere, enabling RUN_SCHEDULER would
# raise NameError while this module is being imported — confirm before use.
if os.environ.get("RUN_SCHEDULER", "false") == "true":
    scheduler = BackgroundScheduler()
    # The interval (in hours) is read once, at start-up, from get_schedule_hours().
    scheduler.add_job(run_scheduled_scrape, 'interval', hours=get_schedule_hours())
    scheduler.start()
|
|
|
|
|
def load_history(path=None):
    """Load the export history mapping from a JSON file.

    Args:
        path: File to read. Defaults to ``HISTORY_FILE`` when omitted.

    Returns:
        The decoded JSON object as a dict. An empty dict is returned when the
        file does not exist yet (first run), when it is empty or not valid
        JSON, or when its top-level value is not an object, so callers can
        always use ``.get`` on the result.
    """
    source = HISTORY_FILE if path is None else path
    try:
        with open(source, "r", encoding="utf-8") as f:
            data = json.load(f)
    except (FileNotFoundError, ValueError):
        # ValueError covers json.JSONDecodeError and undecodable bytes; the
        # history is only a de-duplication aid, so start over instead of
        # failing the request.
        return {}
    return data if isinstance(data, dict) else {}
|
|
|
def save_history(history, path=None):
    """Write the export history mapping to a JSON file.

    The data is first written to a sibling ``<path>.tmp`` file and then moved
    over the destination with ``os.replace``, so an interrupted or failed
    write never leaves a truncated history file behind.

    Args:
        history: JSON-serialisable mapping to store.
        path: Destination file. Defaults to ``HISTORY_FILE`` when omitted.

    Raises:
        TypeError: If ``history`` contains values ``json`` cannot serialise.
        OSError: If the file cannot be written or replaced.
    """
    target = HISTORY_FILE if path is None else path
    tmp_path = f"{target}.tmp"
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            # ensure_ascii=False keeps non-ASCII text readable in the file.
            json.dump(history, f, ensure_ascii=False, indent=2)
        os.replace(tmp_path, target)
    finally:
        # After a successful replace the temp file is gone; this only cleans
        # up when something above failed.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
|
|
|
def append_to_excel(rows):
    """Write newly scraped records to two Excel files.

    1. ``output_YYYYMMDD_HHMMSS.xlsx`` - a standalone file holding only this
       batch.
    2. ``OUTPUT_FILE`` - the cumulative workbook; the batch is appended to the
       rows already stored there, or the file is created if it is missing.

    Args:
        rows: Records for this batch, in a form ``pd.DataFrame`` accepts.

    Returns:
        The file name of the per-batch workbook.
    """
    batch = pd.DataFrame(rows)
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    time_file = f"output_{stamp}.xlsx"

    # Per-batch snapshot first, then the running total.
    batch.to_excel(time_file, index=False)

    cumulative = batch
    if os.path.exists(OUTPUT_FILE):
        previous = pd.read_excel(OUTPUT_FILE)
        cumulative = pd.concat([previous, batch], ignore_index=True)
    cumulative.to_excel(OUTPUT_FILE, index=False)

    return time_file
|
|
|
def _format_entry_date(entry):
    """Return an entry's publish date as ``YYYY-MM-DD`` when it can be derived.

    The Unix ``timestamp`` field (read as UTC) is preferred; otherwise
    ``upload_date`` is parsed as ``YYYYMMDD``. An ``upload_date`` that does
    not match that format is returned as its plain string, and "" is returned
    when neither field is usable.
    """
    from datetime import timezone  # the module itself only imports `datetime`

    timestamp = entry.get("timestamp")
    if timestamp:
        try:
            moment = datetime.fromtimestamp(timestamp, tz=timezone.utc)
        except (OverflowError, OSError, TypeError, ValueError):
            # One malformed timestamp should not abort the whole listing;
            # fall through to upload_date instead.
            pass
        else:
            return moment.strftime("%Y-%m-%d")

    upload_date = entry.get("upload_date")
    if not upload_date:
        return ""
    try:
        return datetime.strptime(str(upload_date), "%Y%m%d").strftime("%Y-%m-%d")
    except ValueError:
        return str(upload_date)


def _date_sort_key(item):
    """Sort key: the parsed ``date`` of a normalized record, oldest if unparseable."""
    try:
        return datetime.strptime(item["date"], "%Y-%m-%d")
    except (TypeError, ValueError):
        return datetime.min


def fetch_latest_videos(profile_url, max_items=10):
    """Extract video metadata for a profile URL and return the newest items.

    Metadata is obtained with ``YoutubeDL.extract_info(..., download=False)``.
    If the result has a non-empty ``entries`` list, each non-empty entry is
    used; otherwise the result itself is treated as a single video.

    Args:
        profile_url: URL handed to yt-dlp.
        max_items: Maximum number of records to return.

    Returns:
        A list of dicts with the keys ``id``, ``title``, ``url``, ``date``
        (``YYYY-MM-DD`` or a fallback string), ``views`` and ``likes``,
        sorted by date, newest first, and truncated to ``max_items``.
        An empty list is returned when nothing could be extracted.
    """
    ydl_opts = {
        "ignoreerrors": True,
        "quiet": True,
        "skip_download": True,
    }
    with YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(profile_url, download=False)
        if not info:
            return []
        if info.get("entries"):
            # Entries may contain empty placeholders (e.g. for items that
            # failed while ignoreerrors is on); drop them.
            entries = [e for e in info["entries"] if e]
        else:
            entries = [info]

    normalized = []
    for e in entries:
        view_count = e.get("view_count")
        if view_count is None:
            view_count = e.get("views")
        like_count = e.get("like_count")
        if like_count is None:
            like_count = e.get("likes")
        normalized.append({
            "id": e.get("id") or e.get("display_id") or e.get("url"),
            "title": e.get("title", ""),
            "url": e.get("webpage_url") or e.get("url"),
            "date": _format_entry_date(e),
            "views": view_count,
            "likes": like_count,
        })

    # Stable sort: records with equal or unknown dates keep extraction order.
    normalized.sort(key=_date_sort_key, reverse=True)
    return normalized[:max_items]
|
|
|
def list_output_files():
    """List every ``output*.xlsx`` file in the current directory.

    Returns:
        A list of dicts with ``name``, ``mtime`` (local time formatted as
        ``YYYY-MM-DD HH:MM:SS``) and ``size`` (e.g. ``"12.3 KB"``), ordered
        by ``mtime`` with the most recently modified file first.
    """
    candidates = [
        name
        for name in os.listdir(".")
        if name.startswith("output") and name.endswith(".xlsx")
    ]

    listing = []
    for name in candidates:
        kilobytes = os.path.getsize(name) / 1024
        modified = datetime.fromtimestamp(os.path.getmtime(name))
        listing.append({
            "name": name,
            "mtime": modified.strftime("%Y-%m-%d %H:%M:%S"),
            "size": f"{kilobytes:.1f} KB",
        })

    # The formatted timestamp sorts chronologically as plain text.
    return sorted(listing, key=lambda item: item["mtime"], reverse=True)
|
|
|
@app.route("/", methods=["GET", "POST"])
def index():
    """Serve the main page and handle scrape submissions.

    GET renders ``index.html`` with the list of exported workbooks.

    POST reads ``profile_url`` from the form, fetches up to ``MAX_NEW``
    videos for it, skips ids already recorded for that URL in the history,
    appends the remaining rows to the Excel outputs, stores the updated
    history, and redirects back to this page with a flash message that
    summarises the outcome.
    """
    if request.method != "POST":
        return render_template("index.html", files=list_output_files())

    profile_url = request.form.get("profile_url", "").strip()
    if not profile_url:
        flash("请先输入 TikTok 或 YouTube 的主页链接。")
        return redirect(url_for("index"))

    try:
        entries = fetch_latest_videos(profile_url, max_items=MAX_NEW)
    except Exception as ex:
        flash(f"抓取失败:{ex}")
        return redirect(url_for("index"))

    if not entries:
        flash("未能提取到视频信息,请确认主页链接是否有效。")
        return redirect(url_for("index"))

    # History is keyed by the submitted profile URL.
    history = load_history()
    key = profile_url
    seen = set(history.get(key, []))

    new_rows = []
    skipped = 0
    for entry in entries:
        vid = entry.get("id") or entry.get("url")
        if not vid:
            # Nothing to de-duplicate on; ignore the entry entirely.
            continue
        if vid in seen:
            skipped += 1
            continue
        seen.add(vid)
        new_rows.append({
            "source_profile": profile_url,
            "video_id": vid,
            "date": entry.get("date", ""),
            "title": entry.get("title", ""),
            "video_url": entry.get("url", ""),
            "views": entry.get("views"),
            "likes": entry.get("likes"),
        })
    added = len(new_rows)

    # Only touch the Excel files when there is something new to write.
    time_file = append_to_excel(new_rows) if new_rows else None

    history[key] = list(seen)
    save_history(history)

    if new_rows:
        flash(f"抓取完成:新增 {added} 条,跳过 {skipped} 条。"
              f"已保存至 output.xlsx 及 {time_file}。")
    else:
        flash(f"没有发现新视频,跳过 {skipped} 条。")

    return redirect(url_for("index"))
|
|
|
@app.route("/download/<filename>")
def download(filename):
    """Send one of the exported workbooks as a file download.

    Only plain file names of the form ``output*.xlsx`` (the same pattern the
    file listing uses) are served. Without this check the route would hand
    out any file in the working directory, such as the history file or the
    application source.

    Args:
        filename: Name taken from the URL path.

    Returns:
        The file as an attachment, or a redirect to the index page with a
        flash message when the name is not allowed or the file is missing.
    """
    is_export_name = (
        filename.startswith("output")
        and filename.endswith(".xlsx")
        # Reject anything carrying a directory component, including
        # Windows-style separators that the URL converter lets through.
        and os.path.basename(filename) == filename
        and "\\" not in filename
    )
    if is_export_name and os.path.isfile(filename):
        # Pass an absolute path so the file that was checked is the file
        # that is sent, independent of how relative paths get resolved.
        return send_file(os.path.abspath(filename), as_attachment=True)

    flash("文件不存在。")
    return redirect(url_for("index"))
|
|
|
if __name__ == "__main__":
    # Run the built-in server on all interfaces, port 7860, debugger off.
    app.run(debug=False, port=7860, host="0.0.0.0")