# 518_yt_monitor / src / scripts / collect_transcript.py
# (snapshot metadata: atoye1 — "adding updated datafiles", commit 591c7e2)
import json
import logging
import time
from datetime import datetime
from pathlib import Path
from typing import Dict, List
from youtube_transcript_api import (
NoTranscriptFound,
TranscriptsDisabled,
YouTubeTranscriptApi,
)
# Logging setup: module-wide INFO-level logging with timestamped messages.
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
def load_existing_transcripts(file_path: Path) -> Dict:
    """Load previously collected transcripts from a JSON cache file.

    Args:
        file_path: Path to the JSON cache file.

    Returns:
        The parsed JSON object, or ``{"transcripts": []}`` when the file
        is missing, unreadable, or contains invalid JSON.
    """
    if not file_path.exists():
        return {"transcripts": []}
    try:
        with open(file_path, "r", encoding="utf-8") as f:
            return json.load(f)
    # Catch only the expected failures (I/O trouble or corrupt JSON)
    # instead of a blanket Exception that would also hide real bugs.
    except (OSError, json.JSONDecodeError) as e:
        logger.error(f"ํŠธ๋žœ์Šคํฌ๋ฆฝํŠธ ํŒŒ์ผ ๋กœ๋“œ ์‹คํŒจ: {e}")
        return {"transcripts": []}
def load_video_info(videos_file: str) -> List[Dict]:
    """Load video metadata records from a JSON file.

    Args:
        videos_file: Path of a JSON file with a top-level ``"videos"`` list.

    Returns:
        The list under the ``"videos"`` key, or an empty list when the file
        cannot be read, is invalid JSON, or lacks that key.
    """
    try:
        with open(videos_file, "r", encoding="utf-8") as f:
            videos_data = json.load(f)
        return videos_data.get("videos", [])
    # Narrowed from a blanket Exception: only I/O and JSON-decoding
    # failures are expected and recoverable here.
    except (OSError, json.JSONDecodeError) as e:
        logger.error(f"๋น„๋””์˜ค ํŒŒ์ผ ๋กœ๋“œ ์‹คํŒจ: {str(e)}")
        return []
def fetch_transcript(video_id: str, max_retries: int, retry_delay: int) -> Dict:
    """Fetch one video's transcript via the YouTube transcript API, with retries.

    Args:
        video_id: YouTube video ID to fetch.
        max_retries: Maximum number of API attempts.
        retry_delay: Base wait time in seconds; backoff grows linearly.

    Returns:
        A dict with keys ``"transcript_segments"`` (list of segment dicts,
        or None on failure) and ``"error"`` (None on success, else a message).
    """
    for attempt in range(max_retries):
        try:
            transcript_list = YouTubeTranscriptApi.get_transcript(
                video_id, languages=["ko", "en"]
            )
            return {"transcript_segments": transcript_list, "error": None}
        except (TranscriptsDisabled, NoTranscriptFound) as e:
            # Permanent conditions — retrying cannot help, so give up at once.
            return {
                "transcript_segments": None,
                "error": f"ํŠธ๋žœ์Šคํฌ๋ฆฝํŠธ ์—†์Œ: {str(e)}",
            }
        except Exception as e:
            if attempt < max_retries - 1:
                # Linear backoff: retry_delay, 2*retry_delay, 3*retry_delay, ...
                wait_time = retry_delay * (attempt + 1)
                logger.warning(
                    f"์˜ค๋ฅ˜ ๋ฐœ์ƒ (์žฌ์‹œ๋„ {attempt + 1}/{max_retries}), {wait_time}์ดˆ ํ›„ ์žฌ์‹œ๋„..."
                )
                time.sleep(wait_time)
            else:
                return {
                    "transcript_segments": None,
                    "error": f"์ตœ๋Œ€ ์žฌ์‹œ๋„ ํšŸ์ˆ˜ ์ดˆ๊ณผ: {str(e)}",
                }
    # Bug fix: with max_retries <= 0 the loop body never executed and the
    # function implicitly returned None, crashing callers that index the
    # result dict. Always return the documented shape.
    return {
        "transcript_segments": None,
        "error": f"์žฌ์‹œ๋„ ํšŸ์ˆ˜๊ฐ€ 0 ์ดํ•˜์ž…๋‹ˆ๋‹ค: max_retries={max_retries}",
    }
def save_transcripts_to_file(transcripts: Dict, output_file: Path) -> None:
    """Write a transcript payload to a JSON file (UTF-8, pretty-printed).

    Args:
        transcripts: JSON-serializable payload. Note: both call sites pass a
            dict (e.g. ``{"transcripts": [...]}``); the original annotation
            of ``List[Dict]`` was incorrect and has been fixed.
        output_file: Destination path for the JSON file.
    """
    try:
        with open(output_file, "w", encoding="utf-8") as f:
            # ensure_ascii=False keeps Korean text readable in the output.
            json.dump(transcripts, f, ensure_ascii=False, indent=2)
        logger.info(f"\n๊ฒฐ๊ณผ ์ €์žฅ ์™„๋ฃŒ: {output_file}")
    # OSError covers file-system failures; TypeError/ValueError cover
    # non-serializable payloads — no need for a blanket Exception.
    except (OSError, TypeError, ValueError) as e:
        logger.error(f"๊ฒฐ๊ณผ ํŒŒ์ผ ์ €์žฅ ์ค‘ ์˜ค๋ฅ˜ ๋ฐœ์ƒ: {str(e)}")
def collect_video_transcripts(
    max_retries: int = 3, retry_delay: int = 5, videos_file: str = "data/videos.json"
) -> List[Dict]:
    """Collect transcripts for every video listed in *videos_file*.

    Videos already present in the cache (data/transcripts_cache.json) are
    skipped. Progress is checkpointed to the cache every 50 videos, and a
    full run summary is written to a timestamped JSON file at the end.

    Args:
        max_retries: Maximum number of attempts per failing API call.
        retry_delay: Base wait time in seconds between retries.
        videos_file: Path of the JSON file holding video metadata.

    Returns:
        The accumulated list of transcript records (cached + newly fetched).
    """
    output_dir = Path("data")
    output_dir.mkdir(parents=True, exist_ok=True)
    output_file = output_dir / "transcripts_cache.json"
    all_transcripts = load_existing_transcripts(output_file).get("transcripts", [])
    # Load the video metadata to iterate over.
    videos = load_video_info(videos_file)
    # Videos whose transcript could not be fetched, for the final summary.
    failed_videos = []
    # IDs already in the cache — used to skip redundant API calls below.
    collected_video_ids = {transcript["video_id"] for transcript in all_transcripts}
    total_videos = len(videos)
    for idx, video in enumerate(videos, 1):
        video_id = video["video_id"]
        # Skip videos whose transcript was collected on a previous run.
        if video_id in collected_video_ids:
            logger.info(f"\n[{idx}/{total_videos}] ์ด๋ฏธ ์ˆ˜์ง‘๋œ ๋น„๋””์˜ค: {video_id} - {video['title']}")
            continue
        logger.info(
            f"\n[{idx}/{total_videos}] ํŠธ๋žœ์Šคํฌ๋ฆฝํŠธ ์ˆ˜์ง‘ ์‹œ๋„: {video_id} - {video['title']}"
        )
        result = fetch_transcript(video_id, max_retries, retry_delay)
        transcript_segments = result["transcript_segments"]
        error_message = result["error"]
        if transcript_segments:
            transcript_info = {
                "video_id": video_id,
                "channel_id": video["channel_id"],
                "channel_handle": video["channel_handle"],
                "title": video["title"],
                "transcript_segments": transcript_segments,
                "collected_at": datetime.now().isoformat(),
            }
            all_transcripts.append(transcript_info)
            logger.info("ํŠธ๋žœ์Šคํฌ๋ฆฝํŠธ ์ˆ˜์ง‘ ์„ฑ๊ณต")
        else:
            failed_videos.append(
                {
                    "video_id": video_id,
                    "channel_handle": video["channel_handle"],
                    "title": video["title"],
                    "error": error_message,
                }
            )
            logger.warning(f"ํŠธ๋žœ์Šคํฌ๋ฆฝํŠธ ์ˆ˜์ง‘ ์‹คํŒจ: {error_message}")
        # Checkpoint the cache every 50 videos so a crash loses little work.
        if idx % 50 == 0:
            save_transcripts_to_file({"transcripts": all_transcripts}, output_file)
        # Throttle requests to protect the API quota.
        time.sleep(0.2)
    # Bug fix: persist the final cache state. The original only wrote the
    # cache at 50-video checkpoints, so transcripts collected after the last
    # checkpoint were never cached and would be refetched on the next run.
    save_transcripts_to_file({"transcripts": all_transcripts}, output_file)
    # Write the full run summary to a timestamped file.
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    final_output_file = output_dir / f"transcripts_{timestamp}.json"
    result = {
        "collected_at": datetime.now().isoformat(),
        "total_videos": total_videos,
        "successful_videos": len(all_transcripts),
        "failed_videos": len(failed_videos),
        "transcripts": all_transcripts,
        "failures": failed_videos,
    }
    save_transcripts_to_file(result, final_output_file)
    if failed_videos:
        logger.warning("\n์‹คํŒจํ•œ ๋น„๋””์˜ค๋“ค:")
        for fail in failed_videos:
            logger.warning(
                f"- [{fail['channel_handle']}] {fail['title']}: {fail['error']}"
            )
    return all_transcripts
if __name__ == "__main__":
    # Script entry point: run one full collection pass with default settings.
    collect_video_transcripts()