Dk / app.py
alige's picture
Upload 8 files
5efe4e4 verified
from flask import Flask, render_template, request, redirect, url_for, send_file, flash
import os
import json
from datetime import datetime
import pandas as pd
from yt_dlp import YoutubeDL
app = Flask(__name__)
# Key used to sign the session cookie that flash() relies on. It can be
# overridden with the SECRET_KEY environment variable; the literal fallback
# keeps the previous behaviour when the variable is unset or empty.
app.secret_key = os.environ.get("SECRET_KEY") or "change_this_to_something_random"

HISTORY_FILE = "history.json"  # JSON object: profile URL -> list of seen video ids
OUTPUT_FILE = "output.xlsx"  # cumulative workbook that every batch is appended to
MAX_NEW = 30  # at most the latest 30 videos are collected per request

# Optional periodic scraping, enabled by setting RUN_SCHEDULER=true.
# NOTE(review): BackgroundScheduler, run_scheduled_scrape and
# get_schedule_hours are neither imported nor defined in the visible part of
# this file, so enabling the flag raises NameError unless they are provided
# elsewhere -- confirm before turning it on.
if os.environ.get("RUN_SCHEDULER", "false") == "true":
    scheduler = BackgroundScheduler()
    scheduler.add_job(run_scheduled_scrape, 'interval', hours=get_schedule_hours())
    scheduler.start()
def load_history():
    """Load the scrape history from HISTORY_FILE.

    Returns:
        dict: the JSON object stored in the file. An empty dict is returned
        when the file does not exist yet, does not contain valid JSON, or
        contains something other than a JSON object, so callers can always
        use ``history.get(...)``.
    """
    try:
        with open(HISTORY_FILE, "r", encoding="utf-8") as f:
            history = json.load(f)
    except (FileNotFoundError, json.JSONDecodeError):
        # First run (no file yet) or an unreadable file: start from scratch
        # instead of failing every request.
        return {}
    return history if isinstance(history, dict) else {}
def save_history(history):
    """Persist ``history`` to HISTORY_FILE as indented UTF-8 JSON.

    The data is serialized before any file is touched and then written to a
    temporary file in the same directory that is swapped in with
    ``os.replace``, so a serialization error or an interrupted write cannot
    leave a truncated history file behind.

    Args:
        history: JSON-serializable object (the app stores a dict mapping a
            profile URL to a list of video ids).

    Raises:
        TypeError: if ``history`` is not JSON-serializable.
        OSError: if the file cannot be written.
    """
    import tempfile  # local import: only this function needs it

    # Serialize first: a failure here must not clobber the existing file.
    payload = json.dumps(history, ensure_ascii=False, indent=2)

    # The temp file has to live next to the target for os.replace to be a
    # same-filesystem rename.
    target_dir = os.path.dirname(os.path.abspath(HISTORY_FILE))
    fd, tmp_path = tempfile.mkstemp(dir=target_dir, prefix="history_", suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            f.write(payload)
        os.replace(tmp_path, HISTORY_FILE)
    except BaseException:
        # Do not leave the temp file behind; the original error is re-raised.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        raise
def append_to_excel(rows):
    """
    Write a batch of newly scraped records to two workbooks:
    1. output.xlsx (cumulative; the batch is appended to any existing rows)
    2. output_YYYYMMDD_HHMMSS.xlsx (this batch only)

    Returns the name of the per-batch file.
    """
    batch = pd.DataFrame(rows)
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    batch_file = f"output_{stamp}.xlsx"

    # Stand-alone workbook for the current batch.
    batch.to_excel(batch_file, index=False)

    # Cumulative workbook: previous rows (if the file exists) followed by
    # the current batch.
    if os.path.exists(OUTPUT_FILE):
        previous = pd.read_excel(OUTPUT_FILE)
        cumulative = pd.concat([previous, batch], ignore_index=True)
    else:
        cumulative = batch
    cumulative.to_excel(OUTPUT_FILE, index=False)

    return batch_file
def _entry_date(entry):
    """Return the entry's date as YYYY-MM-DD, or "" when none is available."""
    from datetime import timezone  # local import: the module only imports `datetime`

    timestamp = entry.get("timestamp")
    if timestamp:
        try:
            # Interpreted as UTC epoch seconds, as the previous
            # utcfromtimestamp() call did.
            moment = datetime.fromtimestamp(timestamp, tz=timezone.utc)
            return moment.strftime("%Y-%m-%d")
        except (OverflowError, OSError, ValueError, TypeError):
            pass  # unusable timestamp: fall back to upload_date below

    upload_date = entry.get("upload_date")
    if not upload_date:
        return ""
    try:
        return datetime.strptime(str(upload_date), "%Y%m%d").strftime("%Y-%m-%d")
    except ValueError:
        # Not in YYYYMMDD form: keep the raw value rather than dropping it.
        return str(upload_date)


def _normalize_entry(entry):
    """Reduce one extracted entry dict to the fields the app stores."""
    views = entry.get("view_count")
    if views is None:
        views = entry.get("views")
    likes = entry.get("like_count")
    if likes is None:
        likes = entry.get("likes")
    return {
        "id": entry.get("id") or entry.get("display_id") or entry.get("url"),
        "title": entry.get("title", ""),
        "url": entry.get("webpage_url") or entry.get("url"),
        "date": _entry_date(entry),
        "views": views,
        "likes": likes,
    }


def fetch_latest_videos(profile_url, max_items=10):
    """Extract video metadata for ``profile_url`` without downloading media.

    Args:
        profile_url: URL handed to ``YoutubeDL.extract_info``.
        max_items: maximum number of records to return.

    Returns:
        list[dict]: records with the keys ``id``, ``title``, ``url``,
        ``date`` (YYYY-MM-DD when it can be parsed), ``views`` and ``likes``,
        sorted newest first and truncated to ``max_items``. Records whose
        date cannot be parsed sort last. An empty list is returned when
        nothing could be extracted.
    """
    ydl_opts = {
        "ignoreerrors": True,
        "quiet": True,
        "skip_download": True,
    }
    with YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(profile_url, download=False)
    if not info:
        return []

    if "entries" in info:
        # Multi-entry result: keep the usable entries. An empty list stays
        # empty instead of treating the container itself as a video.
        entries = [e for e in (info["entries"] or []) if e]
    else:
        # No "entries" key: the result describes a single video.
        entries = [info]

    normalized = [_normalize_entry(e) for e in entries]

    def sort_key(item):
        try:
            return datetime.strptime(item["date"], "%Y-%m-%d")
        except (ValueError, TypeError):
            return datetime.min  # undated records go to the end

    normalized.sort(key=sort_key, reverse=True)
    return normalized[:max_items]
def list_output_files():
    """List the output*.xlsx files in the current working directory.

    Returns:
        list[dict]: one dict per regular file whose name starts with
        ``output`` and ends with ``.xlsx``, newest first. Each dict has
        ``name``, ``mtime`` (local time, ``YYYY-MM-DD HH:MM:SS``) and ``size``
        (e.g. ``"12.3 KB"``).
    """
    found = []
    with os.scandir(".") as entries:
        for entry in entries:
            name = entry.name
            if not (name.startswith("output") and name.endswith(".xlsx")):
                continue
            try:
                if not entry.is_file():
                    continue  # a directory with a matching name is not downloadable
                info = entry.stat()
            except OSError:
                continue  # the file vanished while the directory was being scanned
            found.append((info.st_mtime, name, info.st_size))

    # Newest first, ordered on the numeric mtime rather than on its formatted
    # local-time text.
    found.sort(key=lambda item: item[0], reverse=True)
    return [
        {
            "name": name,
            "mtime": datetime.fromtimestamp(mtime).strftime("%Y-%m-%d %H:%M:%S"),
            "size": f"{size / 1024:.1f} KB",
        }
        for mtime, name, size in found
    ]
@app.route("/", methods=["GET", "POST"])
def index():
    """Render the home page (GET) or scrape a profile for new videos (POST).

    A POST reads ``profile_url`` from the form, fetches up to MAX_NEW of the
    latest videos, skips ids already recorded for that profile in the history
    file, writes the remaining rows to the Excel outputs, stores the updated
    id list and redirects back to this page with a flash message describing
    the outcome. A GET renders ``index.html`` with the list of output files.
    """
    if request.method != "POST":
        return render_template("index.html", files=list_output_files())

    profile_url = request.form.get("profile_url", "").strip()
    if not profile_url:
        flash("请先输入 TikTok 或 YouTube 的主页链接。")
        return redirect(url_for("index"))

    try:
        entries = fetch_latest_videos(profile_url, max_items=MAX_NEW)
    except Exception as ex:
        # Request boundary: report any extraction failure to the user
        # instead of returning a 500.
        flash(f"抓取失败:{ex}")
        return redirect(url_for("index"))
    if not entries:
        flash("未能提取到视频信息,请确认主页链接是否有效。")
        return redirect(url_for("index"))

    history = load_history()
    # Ids already exported for this profile; `or []` tolerates a null value.
    seen = set(history.get(profile_url) or [])
    new_rows = []
    skipped = 0
    for e in entries:
        vid = e.get("id") or e.get("url")
        if not vid:
            continue  # nothing to deduplicate on
        if vid in seen:
            skipped += 1
            continue
        new_rows.append({
            "source_profile": profile_url,
            "video_id": vid,
            "date": e.get("date", ""),
            "title": e.get("title", ""),
            "video_url": e.get("url", ""),
            "views": e.get("views"),
            "likes": e.get("likes"),
        })
        seen.add(vid)  # also guards against duplicates within this batch

    time_file = append_to_excel(new_rows) if new_rows else None
    history[profile_url] = list(seen)
    save_history(history)

    if new_rows:
        flash(f"抓取完成:新增 {len(new_rows)} 条,跳过 {skipped} 条。"
              f"已保存至 output.xlsx 及 {time_file}。")
    else:
        flash(f"没有发现新视频,跳过 {skipped} 条。")
    return redirect(url_for("index"))
@app.route("/download/<filename>")
def download(filename):
    """Send one of the generated output workbooks as an attachment.

    Only plain file names of the form ``output*.xlsx`` that exist as regular
    files in the current working directory are served -- the same set the
    index page lists -- so the route cannot be used to fetch other files such
    as the application source or the history file. Any other request flashes
    a "file not found" message and redirects to the index page.
    """
    is_output_name = (
        filename == os.path.basename(filename)  # no directory component
        and filename.startswith("output")
        and filename.endswith(".xlsx")
    )
    if is_output_name and os.path.isfile(filename):
        # Absolute path: the file that is served is exactly the one checked
        # above, however send_file resolves relative paths.
        return send_file(os.path.abspath(filename), as_attachment=True)

    flash("文件不存在。")
    return redirect(url_for("index"))
# Script entry point: serve the app on all interfaces, port 7860, with the
# debugger disabled.
if __name__ == "__main__":
    app.run(
        host="0.0.0.0",
        port=7860,
        debug=False,
    )