File size: 3,397 Bytes
cebfc1b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
from datetime import datetime
from pathlib import Path
from urllib.parse import parse_qs, urlparse

from youtube_transcript_api import YouTubeTranscriptApi

from Data.get_video_link import video_links_main

# Project root, resolved at import time by stepping two levels up from this
# module's own path (the module is expected to live in <root>/Data/).
PROJECT_ROOT = Path(__file__).resolve().parent.parent
# Folder that holds one .txt file per saved transcript.
TRANSCRIPTS_FOLDER = PROJECT_ROOT.joinpath("Data", "transcripts")

def save_transcript(video_id, transcript_text):
    """
    Write one transcript to a timestamped text file in the transcripts folder.

    Args:
        video_id: Identifier used as the file-name prefix.
        transcript_text: Iterable of strings; they are joined with newlines,
            so each entry starts on its own line in the file.

    Returns:
        Path of the file that was written, named
        "<video_id>_<YYYYmmddHHMMSS>.txt" using the current local time.
    """
    # Create the destination folder (and any missing parents) on first use.
    TRANSCRIPTS_FOLDER.mkdir(parents=True, exist_ok=True)

    timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
    destination = TRANSCRIPTS_FOLDER.joinpath(f"{video_id}_{timestamp}.txt")

    # Build the full content before opening the file so that a failure while
    # joining never leaves an empty file behind.
    content = '\n'.join(transcript_text)
    with destination.open("w", encoding="utf-8") as handle:
        handle.write(content)

    return destination


def _extract_video_id(link):
    """Return the video ID embedded in a single YouTube link."""
    try:
        parsed = urlparse(link)
        host = (parsed.hostname or "").lower()
    except ValueError:
        # Malformed URL (e.g. a broken IPv6 literal): skip structured parsing.
        parsed, host = None, ""

    if parsed is not None:
        # Short links carry the ID as the first path segment:
        # https://youtu.be/<id>
        if host == "youtu.be":
            short_id = parsed.path.strip("/").split("/")[0]
            if short_id:
                return short_id

        # Watch links carry the ID in the "v" query parameter. Parsing the
        # query drops trailing parameters such as "&t=42s" or "&list=...".
        ids = parse_qs(parsed.query).get("v")
        if ids:
            return ids[0]

    # Anything else (for example a bare ID) goes through the plain prefix
    # strip, so such inputs are returned unchanged.
    return link.replace("https://www.youtube.com/watch?v=", "")


def get_video_id(video_links_list):
    """
    Extract the video ID from each YouTube link.

    Handles watch URLs on any scheme/host (the ID is read from the "v" query
    parameter, so extra parameters are ignored) and youtu.be short links.
    Inputs that match neither form are passed through a plain strip of the
    "https://www.youtube.com/watch?v=" prefix, which leaves bare IDs as-is.

    Args:
        video_links_list: Iterable of link strings.

    Returns:
        List of video ID strings, in the same order as the input.
    """
    return [_extract_video_id(link) for link in video_links_list]


def fetch_yt_transcript(video_ids):
    """
    Fetches YouTube transcripts using video IDs and saves each one to disk.

    Processing is best-effort: a failure for one video is reported on stdout
    and does not stop the remaining videos from being processed.

    Args:
        video_ids: Iterable of YouTube video ID strings.

    Returns:
        Dict keyed by video ID. Each value is a dict with:
            'text': list of caption strings (empty if the fetch failed),
            'file_path': path of the saved file as a string, or None if the
                transcript could not be fetched or could not be saved.
    """
    video_transcripts = {}

    for video_id in video_ids:
        print(f"Fetching transcript for: {video_id}")

        # NOTE(review): recent youtube-transcript-api releases appear to have
        # replaced the static get_transcript() with an instance fetch() API;
        # confirm against the installed version. The cause is printed below
        # so such a mismatch is visible instead of looking like a missing
        # transcript.
        try:
            output = YouTubeTranscriptApi.get_transcript(video_id)
            transcript_text = [item['text'] for item in output]
        except Exception as e:
            # Deliberately broad: this is the per-video boundary, and any
            # error raised while fetching or parsing should only skip this
            # video rather than abort the whole run.
            print(
                f"Transcript not available for video: {video_id} "
                f"({type(e).__name__}: {e})"
            )
            video_transcripts[video_id] = {
                'text': [],
                'file_path': None
            }
            continue

        # Saving is handled separately so that a disk problem is not reported
        # as a missing transcript and the fetched text is not thrown away.
        try:
            file_path = save_transcript(video_id, transcript_text)
        except (OSError, UnicodeError) as e:
            print(
                f"Could not save transcript for video: {video_id} "
                f"({type(e).__name__}: {e})"
            )
            video_transcripts[video_id] = {
                'text': transcript_text,
                'file_path': None
            }
            continue

        video_transcripts[video_id] = {
            'text': transcript_text,
            'file_path': str(file_path)
        }
        print(f"Transcript saved to: {file_path}")

    return video_transcripts


def all_video_transcript_pipeline():
    """
    Handles fetching and storing transcripts, checking for new videos.
    """
    print(f"Looking for transcripts in: {TRANSCRIPTS_FOLDER}")
    video_links_list, new_video_added, new_videos_link = video_links_main()
    video_transcripts = {}

    # Always load existing transcripts
    if TRANSCRIPTS_FOLDER.exists():
        existing_files = list(TRANSCRIPTS_FOLDER.glob("*.txt"))
        print(f"Found {len(existing_files)} transcript files.")

        for file in existing_files:
            video_id = file.stem.split("_")[0]  # Extract video ID
            try:
                transcript_text = file.read_text(encoding="utf-8").splitlines()
                video_transcripts[video_id] = {
                    'text': transcript_text,
                    'file_path': str(file)
                }
                print(f"Loaded transcript for video: {video_id}")
            except Exception as e:
                print(f"Error loading transcript {file.name}: {e}")
    else:
        print(f"Transcripts folder not found at: {TRANSCRIPTS_FOLDER}, creating it.")
        TRANSCRIPTS_FOLDER.mkdir(parents=True, exist_ok=True)

    # Fetch new transcripts if needed
    if new_video_added and new_videos_link:
        print("New videos detected... Fetching transcripts.")
        new_video_ids = [url.split("v=")[1] for url in new_videos_link]  # Extract video IDs
        new_transcripts = fetch_yt_transcript(new_video_ids)

    print(f"Total transcripts loaded: {len(video_transcripts)}")