# HotTrack/backend/app/api/tasks.py
# Last commit 9d27c36 (by chanfasf): "Feat: add revenue calculation button and better logging"
# File size: 13.1 kB
from fastapi import APIRouter, Depends, Query, BackgroundTasks
from sqlalchemy.orm import Session
from datetime import datetime
from app.database import get_db
from app.crawlers.youtube import YouTubeCrawler
from app.crawlers.tiktok import TikTokCrawler
from app.analyzers.viral import ViralAnalyzer
from app.analyzers.revenue import RevenueAnalyzer
from app.config import settings
# Router collecting the crawl/analysis endpoints defined in this module;
# every route registered on it is served under the "/crawl" prefix.
router = APIRouter(prefix="/crawl", tags=["crawl"])
@router.get("/config-status")
def get_config_status():
    """Report which crawler credentials are present in the settings.

    Returns:
        A dict with, for YouTube and TikHub respectively, whether an API key
        is set, the key's length (0 when unset), and the matching "enabled"
        flag read from settings. The key values themselves are not returned.
    """
    status = {}

    # YouTube section.
    status["youtube_api_key_configured"] = bool(settings.YOUTUBE_API_KEY)
    if settings.YOUTUBE_API_KEY:
        status["youtube_api_key_length"] = len(settings.YOUTUBE_API_KEY)
    else:
        status["youtube_api_key_length"] = 0
    status["youtube_api_enabled"] = settings.YOUTUBE_API_ENABLED

    # TikHub section.
    status["tikhub_api_key_configured"] = bool(settings.TIKHUB_API_KEY)
    if settings.TIKHUB_API_KEY:
        status["tikhub_api_key_length"] = len(settings.TIKHUB_API_KEY)
    else:
        status["tikhub_api_key_length"] = 0
    status["tikhub_enabled"] = settings.TIKHUB_ENABLED

    return status
@router.get("/test-youtube-api")
def test_youtube_api():
    """Smoke-test the YouTube crawler with a small trending request.

    Returns:
        ``{"success": True, "count": ..., "sample": {...}}`` when videos come
        back, where the sample holds the first video's title (cut to 50
        characters) and view count. Otherwise ``{"success": False,
        "error": ...}`` for a missing key, an empty result, or any exception
        raised along the way.
    """
    if not settings.YOUTUBE_API_KEY:
        return {"success": False, "error": "YouTube API Key 未配置"}

    try:
        videos = YouTubeCrawler().get_trending_videos(region_code="US", max_results=5)
        if not videos:
            return {"success": False, "error": "未获取到数据,请检查 API Key 是否有效"}

        count = len(videos)
        first = videos[0]
        sample = {
            "title": first.get("title", "")[:50],
            "views": first.get("view_count", 0),
        }
        return {"success": True, "count": count, "sample": sample}
    except Exception as exc:
        # Any failure is reported in the payload rather than as an HTTP error.
        return {"success": False, "error": str(exc)}
@router.post("/calculate-revenue")
def calculate_revenue(db: Session = Depends(get_db)):
    """Run the batch revenue estimation on demand.

    Args:
        db: Database session injected by FastAPI.

    Returns:
        A success payload carrying the count returned by the analyzer, or
        ``{"success": False, "error": ...}`` if anything raises.
    """
    try:
        count = RevenueAnalyzer(db).batch_estimate_revenue()
        payload = {
            "success": True,
            "message": f"成功估算 {count} 条视频收入",
            "count": count,
        }
        return payload
    except Exception as exc:
        return {"success": False, "error": str(exc)}
@router.get("/schedule")
def get_schedule_status():
    """List the scheduler's jobs together with their next run times.

    Returns:
        A dict with a fixed timezone label, a fixed schedule description and
        a ``jobs`` list. Each job entry carries its id, its name (falling
        back to the job function's ``__name__``), the next run time as an ISO
        string and as a formatted ``US_EASTERN`` time, and the trigger
        rendered with ``str()``. Both time fields are ``None`` when the job
        has no next run time.
    """
    # Imported lazily, presumably to avoid a circular import with app.main.
    from app.main import scheduler, US_EASTERN

    def describe(job):
        # A job may not have a `next_run_time` attribute at all yet (for
        # example APScheduler jobs that are still pending because the
        # scheduler has not been started). Treat that like "no run scheduled"
        # instead of letting the endpoint fail with AttributeError.
        next_run = getattr(job, "next_run_time", None)
        if next_run is None:
            next_run_iso = None
            next_run_eastern = None
        else:
            next_run_iso = next_run.isoformat()
            next_run_eastern = next_run.astimezone(US_EASTERN).strftime(
                "%Y-%m-%d %H:%M:%S %Z"
            )
        return {
            "id": job.id,
            "name": job.name or job.func.__name__,
            "next_run": next_run_iso,
            "next_run_eastern": next_run_eastern,
            "trigger": str(job.trigger),
        }

    return {
        "timezone": "America/New_York (北美东部时间)",
        "schedule": "每天 00:00 执行",
        "jobs": [describe(job) for job in scheduler.get_jobs()],
    }
@router.post("/trigger")
def trigger_crawl_now(db: Session = Depends(get_db)):
    """Run the scheduled crawl job immediately.

    The job is called synchronously, so the response is only produced after
    ``run_scheduled_crawl`` returns.

    Args:
        db: Database session injected by FastAPI (not used by this handler).

    Returns:
        A confirmation message and the local time at which the call finished.
    """
    from app.scheduler.jobs import run_scheduled_crawl

    run_scheduled_crawl()

    finished_at = datetime.now().isoformat()
    return {"message": "数据更新已触发", "timestamp": finished_at}
def _run_post_crawl_analysis(db: "Session"):
    """Recompute viral scores and revenue estimates after a crawl.

    The two analysis steps are independent and best-effort: a failure in one
    is logged together with its traceback and does not stop the other from
    running. Nothing is raised to the caller.

    Args:
        db: Database session handed to both analyzers.
    """
    import logging

    logger = logging.getLogger(__name__)

    try:
        count = ViralAnalyzer(db).calculate_viral_scores()
        logger.info("[分析] 爆红指数计算完成: %s 条", count)
    except Exception as e:
        # logger.exception emits the same ERROR message as before but also
        # records the traceback, which would otherwise be lost because the
        # exception is swallowed here.
        logger.exception("[分析] 爆红指数计算失败: %s", e)

    try:
        count = RevenueAnalyzer(db).batch_estimate_revenue()
        logger.info("[分析] 收入估算完成: %s 条", count)
    except Exception as e:
        logger.exception("[分析] 收入估算失败: %s", e)
@router.post("/youtube")
def crawl_youtube(
    type: str = Query("popular", description="采集类型: popular, trending, category"),
    region: str = Query("US", description="区域代码"),
    max_results: int = Query(50, ge=1, le=50),
    db: Session = Depends(get_db),
):
    """Crawl YouTube videos of the requested kind, store them and re-run analysis.

    Args:
        type: "trending" fetches trending videos, "category" fetches trending
            videos with category id "10", anything else fetches most-viewed
            videos.
        region: Region code passed to the crawler.
        max_results: Number of videos to request (1-50).
        db: Database session injected by FastAPI.

    Returns:
        A dict with a message and the number of saved videos. When nothing is
        fetched, or when any step raises, ``count`` is 0 and the message
        explains why.
    """
    import logging

    logger = logging.getLogger(__name__)

    def fetch(crawler):
        # `type` is the public query-parameter name, hence the builtin shadowing.
        if type == "trending":
            return crawler.get_trending_videos(region_code=region, max_results=max_results)
        if type == "category":
            return crawler.get_trending_videos(region_code=region, category_id="10", max_results=max_results)
        return crawler.get_most_viewed_videos(region_code=region, max_results=max_results)

    logger.info(f"[YouTube 采集] 开始采集, type={type}, region={region}, max_results={max_results}")
    logger.info(f"[YouTube 采集] YOUTUBE_API_ENABLED={settings.YOUTUBE_API_ENABLED}")
    logger.info(f"[YouTube 采集] YOUTUBE_API_KEY 长度={len(settings.YOUTUBE_API_KEY) if settings.YOUTUBE_API_KEY else 0}")

    try:
        crawler = YouTubeCrawler()
        videos = fetch(crawler)
        logger.info(f"[YouTube 采集] 获取到 {len(videos)} 条视频")

        if videos:
            saved = crawler.save_videos_to_db(videos, db)
            _run_post_crawl_analysis(db)
            return {"message": f"成功获取并保存 {len(saved)} 条 YouTube 视频", "count": len(saved)}

        return {
            "message": "未获取到数据,请检查 YouTube API Key 是否有效或是否已启用 YouTube Data API v3",
            "count": 0,
            "api_key_configured": settings.YOUTUBE_API_ENABLED,
        }
    except Exception as e:
        logger.error(f"[YouTube 采集] 失败: {e}")
        return {"message": f"YouTube 采集失败: {str(e)}", "count": 0}
@router.post("/tiktok")
def crawl_tiktok(
    type: str = Query("trending", description="采集类型: trending, hashtag, user"),
    keyword: str = Query("", description="关键词/标签"),
    max_results: int = Query(50, ge=1, le=50),
    db: Session = Depends(get_db),
):
    """Crawl TikTok videos of the requested kind, store them and re-run analysis.

    Args:
        type: With a non-empty keyword, "hashtag" fetches most-viewed videos
            for that hashtag and "user" searches viral candidates for that
            keyword. Every other combination fetches trending videos.
        keyword: Hashtag or search keyword; ignored for the trending fallback.
        max_results: Number of videos to request (1-50).
        db: Database session injected by FastAPI.

    Returns:
        A dict with a message and the number of saved videos (0 when nothing
        was fetched).
    """
    crawler = TikTokCrawler(use_free_first=True)

    def fetch():
        if keyword and type == "hashtag":
            return crawler.get_most_viewed_videos(hashtag=keyword, count=max_results)
        if keyword and type == "user":
            return crawler.search_viral_candidates(keyword=keyword, count=max_results)
        return crawler.get_trending_videos(count=max_results)

    videos = fetch()
    if videos:
        saved = crawler.save_videos_to_db(videos, db)
        _run_post_crawl_analysis(db)
        return {"message": f"成功获取并保存 {len(saved)} 条 TikTok 视频", "count": len(saved)}

    return {"message": "未获取到数据,TikTok 爬虫可能被反爬限制", "count": 0}
@router.post("/youtube/trending")
def crawl_youtube_trending(
    region_code: str = Query("US", description="区域代码,如 US, TW, JP"),
    max_results: int = Query(50, ge=1, le=50),
    db: Session = Depends(get_db),
):
    """Fetch YouTube trending videos for a region, store them and re-run analysis.

    Args:
        region_code: Region code passed to the crawler.
        max_results: Number of videos to request (1-50).
        db: Database session injected by FastAPI.

    Returns:
        A dict with a message and the number of saved videos (0 when nothing
        was fetched).
    """
    crawler = YouTubeCrawler()
    videos = crawler.get_trending_videos(region_code=region_code, max_results=max_results)

    if videos:
        saved = crawler.save_videos_to_db(videos, db)
        _run_post_crawl_analysis(db)
        return {
            "message": f"成功获取并保存 {len(saved)} 条 YouTube 热门视频",
            "count": len(saved),
        }

    return {"message": "未获取到数据,请检查 YouTube API Key 是否配置", "count": 0}
@router.post("/youtube/most-viewed")
def crawl_youtube_most_viewed(
    query: str = Query("", description="搜索关键词"),
    region_code: str = Query("US", description="区域代码"),
    max_results: int = Query(50, ge=1, le=50),
    db: Session = Depends(get_db),
):
    """Fetch most-viewed YouTube videos, store them and re-run analysis.

    Args:
        query: Search keyword passed to the crawler.
        region_code: Region code passed to the crawler.
        max_results: Number of videos to request (1-50).
        db: Database session injected by FastAPI.

    Returns:
        A dict with a message and the number of saved videos (0 when nothing
        was fetched).
    """
    crawler = YouTubeCrawler()
    videos = crawler.get_most_viewed_videos(
        query=query,
        region_code=region_code,
        max_results=max_results,
    )

    if videos:
        saved = crawler.save_videos_to_db(videos, db)
        _run_post_crawl_analysis(db)
        return {
            "message": f"成功获取并保存 {len(saved)} 条 YouTube 高播放量视频",
            "count": len(saved),
        }

    return {"message": "未获取到数据", "count": 0}
@router.post("/youtube/viral-candidates")
def crawl_youtube_viral(
    query: str = Query("", description="搜索关键词"),
    days: int = Query(7, ge=1, le=30, description="最近几天发布的视频"),
    max_results: int = Query(50, ge=1, le=50),
    db: Session = Depends(get_db),
):
    """Search recently published YouTube videos as viral candidates and store them.

    Args:
        query: Search keyword passed to the crawler.
        days: Only videos published within this many days are requested (1-30).
        max_results: Number of videos to request (1-50).
        db: Database session injected by FastAPI.

    Returns:
        A dict with a message and the number of saved videos (0 when nothing
        was fetched).
    """
    from datetime import datetime, timezone, timedelta

    # Build the cutoff as an RFC 3339 UTC timestamp ("YYYY-MM-DDTHH:MM:SSZ").
    # The previous `isoformat() + "Z"` on a timezone-aware datetime produced
    # "...+00:00Z", which carries two UTC designators and is not a valid
    # timestamp.
    # NOTE(review): presumably forwarded to the YouTube search API's
    # `publishedAfter` parameter — confirm in YouTubeCrawler.
    cutoff = datetime.now(timezone.utc) - timedelta(days=days)
    published_after = cutoff.strftime("%Y-%m-%dT%H:%M:%SZ")

    crawler = YouTubeCrawler()
    videos = crawler.search_viral_candidates(
        query=query,
        published_after=published_after,
        max_results=max_results,
    )
    if not videos:
        return {"message": "未获取到数据", "count": 0}

    saved = crawler.save_videos_to_db(videos, db)
    _run_post_crawl_analysis(db)
    return {"message": f"成功获取并保存 {len(saved)} 条 YouTube 爆红候选视频", "count": len(saved)}
@router.post("/tiktok/trending")
def crawl_tiktok_trending(
    count: int = Query(50, ge=1, le=50),
    db: Session = Depends(get_db),
):
    """Fetch trending TikTok videos, store them and re-run analysis.

    Args:
        count: Number of videos to request (1-50).
        db: Database session injected by FastAPI.

    Returns:
        A dict with a message and the number of saved videos (0 when nothing
        was fetched).
    """
    crawler = TikTokCrawler(use_free_first=True)
    videos = crawler.get_trending_videos(count=count)

    if videos:
        saved = crawler.save_videos_to_db(videos, db)
        _run_post_crawl_analysis(db)
        return {
            "message": f"成功获取并保存 {len(saved)} 条 TikTok 热门视频",
            "count": len(saved),
        }

    return {"message": "未获取到数据,TikTok 爬虫可能被反爬限制", "count": 0}
@router.post("/tiktok/most-viewed")
def crawl_tiktok_most_viewed(
    hashtag: str = Query("", description="话题标签"),
    count: int = Query(50, ge=1, le=50),
    db: Session = Depends(get_db),
):
    """Fetch most-viewed TikTok videos for a hashtag, store them and re-run analysis.

    Args:
        hashtag: Hashtag passed to the crawler.
        count: Number of videos to request (1-50).
        db: Database session injected by FastAPI.

    Returns:
        A dict with a message and the number of saved videos (0 when nothing
        was fetched).
    """
    crawler = TikTokCrawler(use_free_first=True)
    videos = crawler.get_most_viewed_videos(hashtag=hashtag, count=count)

    if videos:
        saved = crawler.save_videos_to_db(videos, db)
        _run_post_crawl_analysis(db)
        return {
            "message": f"成功获取并保存 {len(saved)} 条 TikTok 高播放量视频",
            "count": len(saved),
        }

    return {"message": "未获取到数据", "count": 0}
@router.post("/tiktok/viral-candidates")
def crawl_tiktok_viral(
    keyword: str = Query("trending", description="搜索关键词"),
    count: int = Query(50, ge=1, le=50),
    db: Session = Depends(get_db),
):
    """Search TikTok viral candidates by keyword, store them and re-run analysis.

    Args:
        keyword: Search keyword passed to the crawler.
        count: Number of videos to request (1-50).
        db: Database session injected by FastAPI.

    Returns:
        A dict with a message and the number of saved videos (0 when nothing
        was fetched).
    """
    crawler = TikTokCrawler(use_free_first=True)
    videos = crawler.search_viral_candidates(keyword=keyword, count=count)

    if videos:
        saved = crawler.save_videos_to_db(videos, db)
        _run_post_crawl_analysis(db)
        return {
            "message": f"成功获取并保存 {len(saved)} 条 TikTok 爆红候选视频",
            "count": len(saved),
        }

    return {"message": "未获取到数据", "count": 0}
@router.post("/all")
def crawl_all_platforms(
    region_code: str = Query("US", description="YouTube 区域代码"),
    db: Session = Depends(get_db),
):
    """Crawl trending videos from YouTube and then TikTok, then re-run analysis.

    Args:
        region_code: Region code used for the YouTube trending request.
        db: Database session injected by FastAPI.

    Returns:
        A dict with a completion message and ``results``, which maps
        "youtube_trending" / "tiktok_trending" to the number of saved videos.
        A platform that returned nothing has no entry.
    """
    # (result key, crawler factory, fetch call), processed strictly in order.
    sources = (
        (
            "youtube_trending",
            lambda: YouTubeCrawler(),
            lambda crawler: crawler.get_trending_videos(region_code=region_code),
        ),
        (
            "tiktok_trending",
            lambda: TikTokCrawler(use_free_first=True),
            lambda crawler: crawler.get_trending_videos(),
        ),
    )

    results = {}
    for key, make_crawler, fetch in sources:
        crawler = make_crawler()
        videos = fetch(crawler)
        if videos:
            results[key] = len(crawler.save_videos_to_db(videos, db))

    # Analysis runs once, whether or not anything was saved.
    _run_post_crawl_analysis(db)
    return {"message": "全平台数据采集完成", "results": results}
@router.post("/reset-and-crawl")
def reset_and_crawl_real_data(db: Session = Depends(get_db)):
    """Delete all stored video data, then crawl fresh trending data.

    Rows are removed from the viral-score, revenue-estimate, snapshot and
    video tables (in that order) and the deletion is committed before any
    crawling starts. YouTube is crawled only when
    ``settings.YOUTUBE_API_ENABLED`` is truthy; TikTok is always attempted.
    Crawler exceptions are collected in the response instead of being raised.

    Args:
        db: Database session injected by FastAPI.

    Returns:
        A dict with a message, the YouTube/TikHub enabled flags, and
        ``results`` holding per-platform saved counts and error strings.
    """
    from app.models.video import Video, VideoSnapshot, RevenueEstimate, ViralScore
    from app.config import settings

    # Same deletion order as listed above, with the video table last.
    for model in (ViralScore, RevenueEstimate, VideoSnapshot, Video):
        db.query(model).delete()
    db.commit()

    results = {"youtube": 0, "tiktok": 0, "errors": []}

    if settings.YOUTUBE_API_ENABLED:
        try:
            yt_crawler = YouTubeCrawler()
            yt_videos = yt_crawler.get_trending_videos(region_code="US", max_results=50)
            if yt_videos:
                results["youtube"] = len(yt_crawler.save_videos_to_db(yt_videos, db))
        except Exception as e:
            results["errors"].append(f"YouTube: {str(e)}")

    try:
        tt_crawler = TikTokCrawler(
            tikhub_api_key=settings.TIKHUB_API_KEY,
            use_free_first=True,
        )
        tt_videos = tt_crawler.get_trending_videos(count=50)
        if tt_videos:
            results["tiktok"] = len(tt_crawler.save_videos_to_db(tt_videos, db))
    except Exception as e:
        results["errors"].append(f"TikTok: {str(e)}")

    # Only analyse when at least one platform actually stored something.
    if any(results[platform] > 0 for platform in ("youtube", "tiktok")):
        _run_post_crawl_analysis(db)

    return {
        "message": "数据库已重置并重新采集",
        "youtube_api_enabled": settings.YOUTUBE_API_ENABLED,
        "tikhub_enabled": settings.TIKHUB_ENABLED,
        "results": results,
    }
@router.post("/seed-demo")
def seed_demo_data(db: Session = Depends(get_db)):
    """Generate demo data and run the post-crawl analysis on it.

    Args:
        db: Database session injected by FastAPI.

    Returns:
        A dict with a message and the count returned by the demo generator.
    """
    from app.demo.seed import generate_demo_data

    count = generate_demo_data(db)
    _run_post_crawl_analysis(db)

    response = {"message": f"已生成 {count} 条演示数据", "count": count}
    return response