from fastapi import APIRouter, Depends, Query, BackgroundTasks from sqlalchemy.orm import Session from datetime import datetime from app.database import get_db from app.crawlers.youtube import YouTubeCrawler from app.crawlers.tiktok import TikTokCrawler from app.analyzers.viral import ViralAnalyzer from app.analyzers.revenue import RevenueAnalyzer from app.config import settings router = APIRouter(prefix="/crawl", tags=["crawl"]) @router.get("/config-status") def get_config_status(): return { "youtube_api_key_configured": bool(settings.YOUTUBE_API_KEY), "youtube_api_key_length": len(settings.YOUTUBE_API_KEY) if settings.YOUTUBE_API_KEY else 0, "youtube_api_enabled": settings.YOUTUBE_API_ENABLED, "tikhub_api_key_configured": bool(settings.TIKHUB_API_KEY), "tikhub_api_key_length": len(settings.TIKHUB_API_KEY) if settings.TIKHUB_API_KEY else 0, "tikhub_enabled": settings.TIKHUB_ENABLED, } @router.get("/test-youtube-api") def test_youtube_api(): if not settings.YOUTUBE_API_KEY: return {"success": False, "error": "YouTube API Key 未配置"} try: crawler = YouTubeCrawler() videos = crawler.get_trending_videos(region_code="US", max_results=5) if videos: return { "success": True, "count": len(videos), "sample": { "title": videos[0].get("title", "")[:50], "views": videos[0].get("view_count", 0), } } return {"success": False, "error": "未获取到数据,请检查 API Key 是否有效"} except Exception as e: return {"success": False, "error": str(e)} @router.post("/calculate-revenue") def calculate_revenue(db: Session = Depends(get_db)): try: revenue_analyzer = RevenueAnalyzer(db) count = revenue_analyzer.batch_estimate_revenue() return {"success": True, "message": f"成功估算 {count} 条视频收入", "count": count} except Exception as e: return {"success": False, "error": str(e)} @router.get("/schedule") def get_schedule_status(): from app.main import scheduler, US_EASTERN jobs = scheduler.get_jobs() result = [] for job in jobs: next_run = job.next_run_time result.append({ "id": job.id, "name": job.name or job.func.__name__, "next_run": next_run.isoformat() if next_run else None, "next_run_eastern": next_run.astimezone(US_EASTERN).strftime("%Y-%m-%d %H:%M:%S %Z") if next_run else None, "trigger": str(job.trigger), }) return { "timezone": "America/New_York (北美东部时间)", "schedule": "每天 00:00 执行", "jobs": result, } @router.post("/trigger") def trigger_crawl_now(db: Session = Depends(get_db)): from app.scheduler.jobs import run_scheduled_crawl run_scheduled_crawl() return {"message": "数据更新已触发", "timestamp": datetime.now().isoformat()} def _run_post_crawl_analysis(db: Session): import logging logger = logging.getLogger(__name__) try: viral_analyzer = ViralAnalyzer(db) count = viral_analyzer.calculate_viral_scores() logger.info(f"[分析] 爆红指数计算完成: {count} 条") except Exception as e: logger.error(f"[分析] 爆红指数计算失败: {e}") try: revenue_analyzer = RevenueAnalyzer(db) count = revenue_analyzer.batch_estimate_revenue() logger.info(f"[分析] 收入估算完成: {count} 条") except Exception as e: logger.error(f"[分析] 收入估算失败: {e}") @router.post("/youtube") def crawl_youtube( type: str = Query("popular", description="采集类型: popular, trending, category"), region: str = Query("US", description="区域代码"), max_results: int = Query(50, ge=1, le=50), db: Session = Depends(get_db), ): import logging logger = logging.getLogger(__name__) logger.info(f"[YouTube 采集] 开始采集, type={type}, region={region}, max_results={max_results}") logger.info(f"[YouTube 采集] YOUTUBE_API_ENABLED={settings.YOUTUBE_API_ENABLED}") logger.info(f"[YouTube 采集] YOUTUBE_API_KEY 长度={len(settings.YOUTUBE_API_KEY) if settings.YOUTUBE_API_KEY else 0}") try: crawler = YouTubeCrawler() videos = [] if type == "trending": videos = crawler.get_trending_videos(region_code=region, max_results=max_results) elif type == "category": videos = crawler.get_trending_videos(region_code=region, category_id="10", max_results=max_results) else: videos = crawler.get_most_viewed_videos(region_code=region, max_results=max_results) logger.info(f"[YouTube 采集] 获取到 {len(videos)} 条视频") if not videos: return {"message": "未获取到数据,请检查 YouTube API Key 是否有效或是否已启用 YouTube Data API v3", "count": 0, "api_key_configured": settings.YOUTUBE_API_ENABLED} saved = crawler.save_videos_to_db(videos, db) _run_post_crawl_analysis(db) return {"message": f"成功获取并保存 {len(saved)} 条 YouTube 视频", "count": len(saved)} except Exception as e: logger.error(f"[YouTube 采集] 失败: {e}") return {"message": f"YouTube 采集失败: {str(e)}", "count": 0} @router.post("/tiktok") def crawl_tiktok( type: str = Query("trending", description="采集类型: trending, hashtag, user"), keyword: str = Query("", description="关键词/标签"), max_results: int = Query(50, ge=1, le=50), db: Session = Depends(get_db), ): crawler = TikTokCrawler(use_free_first=True) videos = [] if type == "hashtag" and keyword: videos = crawler.get_most_viewed_videos(hashtag=keyword, count=max_results) elif type == "user" and keyword: videos = crawler.search_viral_candidates(keyword=keyword, count=max_results) else: videos = crawler.get_trending_videos(count=max_results) if not videos: return {"message": "未获取到数据,TikTok 爬虫可能被反爬限制", "count": 0} saved = crawler.save_videos_to_db(videos, db) _run_post_crawl_analysis(db) return {"message": f"成功获取并保存 {len(saved)} 条 TikTok 视频", "count": len(saved)} @router.post("/youtube/trending") def crawl_youtube_trending( region_code: str = Query("US", description="区域代码,如 US, TW, JP"), max_results: int = Query(50, ge=1, le=50), db: Session = Depends(get_db), ): crawler = YouTubeCrawler() videos = crawler.get_trending_videos(region_code=region_code, max_results=max_results) if not videos: return {"message": "未获取到数据,请检查 YouTube API Key 是否配置", "count": 0} saved = crawler.save_videos_to_db(videos, db) _run_post_crawl_analysis(db) return {"message": f"成功获取并保存 {len(saved)} 条 YouTube 热门视频", "count": len(saved)} @router.post("/youtube/most-viewed") def crawl_youtube_most_viewed( query: str = Query("", description="搜索关键词"), region_code: str = Query("US", description="区域代码"), max_results: int = Query(50, ge=1, le=50), db: Session = Depends(get_db), ): crawler = YouTubeCrawler() videos = crawler.get_most_viewed_videos(query=query, region_code=region_code, max_results=max_results) if not videos: return {"message": "未获取到数据", "count": 0} saved = crawler.save_videos_to_db(videos, db) _run_post_crawl_analysis(db) return {"message": f"成功获取并保存 {len(saved)} 条 YouTube 高播放量视频", "count": len(saved)} @router.post("/youtube/viral-candidates") def crawl_youtube_viral( query: str = Query("", description="搜索关键词"), days: int = Query(7, ge=1, le=30, description="最近几天发布的视频"), max_results: int = Query(50, ge=1, le=50), db: Session = Depends(get_db), ): from datetime import datetime, timezone, timedelta published_after = (datetime.now(timezone.utc) - timedelta(days=days)).isoformat() + "Z" crawler = YouTubeCrawler() videos = crawler.search_viral_candidates(query=query, published_after=published_after, max_results=max_results) if not videos: return {"message": "未获取到数据", "count": 0} saved = crawler.save_videos_to_db(videos, db) _run_post_crawl_analysis(db) return {"message": f"成功获取并保存 {len(saved)} 条 YouTube 爆红候选视频", "count": len(saved)} @router.post("/tiktok/trending") def crawl_tiktok_trending( count: int = Query(50, ge=1, le=50), db: Session = Depends(get_db), ): crawler = TikTokCrawler(use_free_first=True) videos = crawler.get_trending_videos(count=count) if not videos: return {"message": "未获取到数据,TikTok 爬虫可能被反爬限制", "count": 0} saved = crawler.save_videos_to_db(videos, db) _run_post_crawl_analysis(db) return {"message": f"成功获取并保存 {len(saved)} 条 TikTok 热门视频", "count": len(saved)} @router.post("/tiktok/most-viewed") def crawl_tiktok_most_viewed( hashtag: str = Query("", description="话题标签"), count: int = Query(50, ge=1, le=50), db: Session = Depends(get_db), ): crawler = TikTokCrawler(use_free_first=True) videos = crawler.get_most_viewed_videos(hashtag=hashtag, count=count) if not videos: return {"message": "未获取到数据", "count": 0} saved = crawler.save_videos_to_db(videos, db) _run_post_crawl_analysis(db) return {"message": f"成功获取并保存 {len(saved)} 条 TikTok 高播放量视频", "count": len(saved)} @router.post("/tiktok/viral-candidates") def crawl_tiktok_viral( keyword: str = Query("trending", description="搜索关键词"), count: int = Query(50, ge=1, le=50), db: Session = Depends(get_db), ): crawler = TikTokCrawler(use_free_first=True) videos = crawler.search_viral_candidates(keyword=keyword, count=count) if not videos: return {"message": "未获取到数据", "count": 0} saved = crawler.save_videos_to_db(videos, db) _run_post_crawl_analysis(db) return {"message": f"成功获取并保存 {len(saved)} 条 TikTok 爆红候选视频", "count": len(saved)} @router.post("/all") def crawl_all_platforms( region_code: str = Query("US", description="YouTube 区域代码"), db: Session = Depends(get_db), ): results = {} yt_crawler = YouTubeCrawler() yt_trending = yt_crawler.get_trending_videos(region_code=region_code) if yt_trending: yt_saved = yt_crawler.save_videos_to_db(yt_trending, db) results["youtube_trending"] = len(yt_saved) tt_crawler = TikTokCrawler(use_free_first=True) tt_trending = tt_crawler.get_trending_videos() if tt_trending: tt_saved = tt_crawler.save_videos_to_db(tt_trending, db) results["tiktok_trending"] = len(tt_saved) _run_post_crawl_analysis(db) return {"message": "全平台数据采集完成", "results": results} @router.post("/reset-and-crawl") def reset_and_crawl_real_data(db: Session = Depends(get_db)): from app.models.video import Video, VideoSnapshot, RevenueEstimate, ViralScore from app.config import settings db.query(ViralScore).delete() db.query(RevenueEstimate).delete() db.query(VideoSnapshot).delete() db.query(Video).delete() db.commit() results = { "youtube": 0, "tiktok": 0, "errors": [] } if settings.YOUTUBE_API_ENABLED: try: yt_crawler = YouTubeCrawler() yt_videos = yt_crawler.get_trending_videos(region_code="US", max_results=50) if yt_videos: saved = yt_crawler.save_videos_to_db(yt_videos, db) results["youtube"] = len(saved) except Exception as e: results["errors"].append(f"YouTube: {str(e)}") try: tt_crawler = TikTokCrawler(tikhub_api_key=settings.TIKHUB_API_KEY, use_free_first=True) tt_videos = tt_crawler.get_trending_videos(count=50) if tt_videos: saved = tt_crawler.save_videos_to_db(tt_videos, db) results["tiktok"] = len(saved) except Exception as e: results["errors"].append(f"TikTok: {str(e)}") if results["youtube"] > 0 or results["tiktok"] > 0: _run_post_crawl_analysis(db) return { "message": "数据库已重置并重新采集", "youtube_api_enabled": settings.YOUTUBE_API_ENABLED, "tikhub_enabled": settings.TIKHUB_ENABLED, "results": results } @router.post("/seed-demo") def seed_demo_data(db: Session = Depends(get_db)): from app.demo.seed import generate_demo_data count = generate_demo_data(db) _run_post_crawl_analysis(db) return {"message": f"已生成 {count} 条演示数据", "count": count}