File size: 4,589 Bytes
b9cc1a2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
import json
import logging
import time
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Sequence, Tuple

from youtube_transcript_api import YouTubeTranscriptApi, TranscriptsDisabled, NoTranscriptFound

# Logging setup: INFO-level records laid out as "<time> - <level> - <message>".
_LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)

# Module-level logger used by everything in this file.
logger = logging.getLogger(__name__)

def _fetch_transcript(
    video_id: str,
    languages: Sequence[str],
    max_retries: int,
    retry_delay: int,
) -> Tuple[Optional[List[Dict]], Optional[str]]:
    """Fetch the transcript of one video, retrying on unexpected errors.

    Args:
        video_id: ID of the video whose transcript is requested.
        languages: Language codes passed to the transcript API.
        max_retries: Maximum number of attempts; values below 1 are treated
            as 1 so that the API is always tried at least once.
        retry_delay: Base wait in seconds; the wait grows linearly with the
            attempt number (``retry_delay * attempt``).

    Returns:
        ``(segments, None)`` on success, or ``(None, error_message)`` when no
        usable transcript could be obtained.
    """
    attempts = max(1, max_retries)
    last_error: Optional[Exception] = None

    for attempt in range(attempts):
        try:
            segments = YouTubeTranscriptApi.get_transcript(
                video_id,
                languages=list(languages),
            )
        except (TranscriptsDisabled, NoTranscriptFound) as e:
            # Treated as permanent: the video has no transcript, so do not retry.
            return None, f"ํŠธ๋žœ์Šคํฌ๋ฆฝํŠธ ์—†์Œ: {e}"
        except Exception as e:
            # Anything else is treated as transient and retried with a
            # linearly increasing delay until the attempts are used up.
            last_error = e
            if attempt < attempts - 1:
                wait_time = retry_delay * (attempt + 1)
                logger.warning(f"์˜ค๋ฅ˜ ๋ฐœ์ƒ (์žฌ์‹œ๋„ {attempt + 1}/{attempts}), {wait_time}์ดˆ ํ›„ ์žฌ์‹œ๋„...")
                time.sleep(wait_time)
        else:
            if segments:
                return segments, None
            # An empty result is still recorded as a failure, but with an
            # explicit reason instead of a bare None.
            return None, "empty transcript returned"

    return None, f"์ตœ๋Œ€ ์žฌ์‹œ๋„ ํšŸ์ˆ˜ ์ดˆ๊ณผ: {last_error}"


def collect_video_transcripts(
    max_retries: int = 3,
    retry_delay: int = 5,
    videos_file: str = "data/videos.json",
    *,
    output_dir: str = "data",
    languages: Sequence[str] = ("ko", "en"),
) -> List[Dict]:
    """Collect transcripts for every video listed in a JSON file.

    The input file is expected to be a JSON object with a ``videos`` list;
    each entry is read for ``video_id``, ``channel_id``, ``channel_handle``
    and ``title``. A report containing the collected transcripts and the
    failures is written to ``<output_dir>/transcripts_<YYYYmmdd_HHMMSS>.json``.

    Entries that are malformed (not an object, or without a ``video_id``) are
    recorded as failures instead of aborting the whole run.

    Args:
        max_retries: Maximum number of attempts per video when the API call
            fails.
        retry_delay: Base wait time between retries, in seconds.
        videos_file: Path of the JSON file holding the video information.
        output_dir: Directory the report file is written to (created if
            missing).
        languages: Language codes passed to the transcript API.

    Returns:
        The list of successfully collected transcript records. An empty list
        is returned when the input file cannot be loaded. A failure to write
        the report is logged and does not affect the return value.
    """
    out_dir = Path(output_dir)
    out_dir.mkdir(parents=True, exist_ok=True)

    # Load the video information.
    try:
        with open(videos_file, 'r', encoding='utf-8') as f:
            videos_data = json.load(f)
            # "videos": null is treated the same as a missing key.
            videos = videos_data.get('videos') or []
    except Exception as e:
        logger.error(f"๋น„๋””์˜ค ํŒŒ์ผ ๋กœ๋“œ ์‹คํŒจ: {e}")
        return []

    if not isinstance(videos, list):
        logger.error("'videos' must be a list, got %s", type(videos).__name__)
        return []

    # Accumulators for the report.
    all_transcripts = []
    failed_videos = []

    # Collect the transcript of each video.
    total_videos = len(videos)
    for idx, video in enumerate(videos, 1):
        # Read fields defensively so one malformed entry cannot abort the run
        # and discard everything collected so far.
        entry = video if isinstance(video, dict) else {}
        video_id = entry.get('video_id')
        title = entry.get('title')
        channel_handle = entry.get('channel_handle')
        logger.info(f"\n[{idx}/{total_videos}] ํŠธ๋žœ์Šคํฌ๋ฆฝํŠธ ์ˆ˜์ง‘ ์‹œ๋„: {video_id} - {title}")

        if video_id:
            transcript_segments, error_message = _fetch_transcript(
                video_id, languages, max_retries, retry_delay
            )
        else:
            transcript_segments = None
            error_message = "missing 'video_id' in video entry"

        if transcript_segments:
            all_transcripts.append({
                'video_id': video_id,
                'channel_id': entry.get('channel_id'),
                'channel_handle': channel_handle,
                'title': title,
                'transcript_segments': transcript_segments,
                'collected_at': datetime.now().isoformat(),
            })
            logger.info("ํŠธ๋žœ์Šคํฌ๋ฆฝํŠธ ์ˆ˜์ง‘ ์„ฑ๊ณต")
        else:
            failed_videos.append({
                'video_id': video_id,
                'channel_handle': channel_handle,
                'title': title,
                'error': error_message,
            })
            logger.warning(f"ํŠธ๋žœ์Šคํฌ๋ฆฝํŠธ ์ˆ˜์ง‘ ์‹คํŒจ: {error_message}")

        # Pause to protect the API quota; only needed when this entry
        # actually hit the API and more videos remain.
        if video_id and idx < total_videos:
            time.sleep(1)

    # Build the report; a single clock reading keeps the file name and the
    # "collected_at" field consistent with each other.
    now = datetime.now()
    timestamp = now.strftime("%Y%m%d_%H%M%S")
    result = {
        "collected_at": now.isoformat(),
        "total_videos": total_videos,
        "successful_videos": len(all_transcripts),
        "failed_videos": len(failed_videos),
        "transcripts": all_transcripts,
        "failures": failed_videos,
    }

    # Save the report. Saving is best-effort: the collected transcripts are
    # returned to the caller even if the file cannot be written.
    output_file = out_dir / f"transcripts_{timestamp}.json"
    try:
        with open(output_file, "w", encoding="utf-8") as f:
            json.dump(result, f, ensure_ascii=False, indent=2)
    except Exception as e:
        logger.error(f"๊ฒฐ๊ณผ ํŒŒ์ผ ์ €์žฅ ์ค‘ ์˜ค๋ฅ˜ ๋ฐœ์ƒ: {e}")
    else:
        logger.info(f"\n๊ฒฐ๊ณผ ์ €์žฅ ์™„๋ฃŒ: {output_file}")
        logger.info(f"์ด {len(all_transcripts)}๊ฐœ ํŠธ๋žœ์Šคํฌ๋ฆฝํŠธ ์ˆ˜์ง‘ ์™„๋ฃŒ (์‹คํŒจ: {len(failed_videos)}๊ฐœ)")

        if failed_videos:
            logger.warning("\n์‹คํŒจํ•œ ๋น„๋””์˜ค๋“ค:")
            for fail in failed_videos:
                logger.warning(f"- [{fail['channel_handle']}] {fail['title']}: {fail['error']}")

    return all_transcripts

# Script entry point: run one collection pass with the default arguments.
if __name__ == "__main__":
    collect_video_transcripts()