debaghtk committed on
Commit
28d8414
·
1 Parent(s): 8f5e88f

save transcripts with timestamps

Browse files
Files changed (2) hide show
  1. requirements.txt +2 -0
  2. youtubeTranscription.py +83 -41
requirements.txt CHANGED
@@ -11,6 +11,8 @@ langchain-openai
11
  langdetect
12
  googletrans
13
  youtube-transcript-api
 
 
14
 
15
  # dev dependencies
16
  watchdog
 
11
  langdetect
12
  googletrans
13
  youtube-transcript-api
14
+ google-api-python-client
15
+ deep-translator
16
 
17
  # dev dependencies
18
  watchdog
youtubeTranscription.py CHANGED
@@ -1,12 +1,14 @@
1
  import os
 
 
2
  from googleapiclient.discovery import build
3
  from youtube_transcript_api import YouTubeTranscriptApi
4
  from youtube_transcript_api._errors import TranscriptsDisabled, NoTranscriptAvailable, VideoUnavailable
5
  from langdetect import detect
6
- from googletrans import Translator
7
  from tqdm import tqdm
8
- import re
9
  from dotenv import load_dotenv
 
 
10
 
11
  load_dotenv()
12
 
@@ -18,9 +20,8 @@ CHANNEL_ID = os.getenv("CHANNEL_ID")
18
  youtube = build('youtube', 'v3', developerKey=YOUTUBE_API_KEY)
19
 
20
  # Initialize Translator
21
- translator = Translator()
22
 
23
- # Get all video IDs from a YouTube channel
24
  def get_all_video_ids(channel_id):
25
  video_ids = []
26
  next_page_token = None
@@ -45,9 +46,20 @@ def get_all_video_ids(channel_id):
45
 
46
  return video_ids
47
 
 
 
 
 
 
 
 
 
 
48
  # Fetch transcripts for the given video IDs
49
  def fetch_video_data(video_ids):
50
  video_data = {}
 
 
51
  # Create a progress bar
52
  pbar = tqdm(total=len(video_ids), desc="Processing videos", unit="video")
53
 
@@ -59,76 +71,113 @@ def fetch_video_data(video_ids):
59
  id=video_id
60
  ).execute()
61
 
62
- video_title = video_response['items'][0]['snippet']['title']
63
- video_description = video_response['items'][0]['snippet']['description']
64
-
 
 
 
 
 
 
 
 
65
  # Clean the description to remove social handles and links
66
  cleaned_description = clean_description(video_description)
67
 
68
  # Attempt to fetch transcript using different methods
69
- transcript_text = None
70
  for method in range(1, 4):
71
  try:
72
  if method == 1:
73
  # Method 1: Try to get all transcripts
74
  transcript_list = YouTubeTranscriptApi.list_transcripts(video_id)
75
  transcript = transcript_list.find_transcript(['en'])
76
- transcript_text = " ".join([entry['text'] for entry in transcript.fetch()])
77
  elif method == 2:
78
  # Method 2: Try to get transcript directly
79
- transcript = YouTubeTranscriptApi.get_transcript(video_id)
80
- transcript_text = " ".join([entry['text'] for entry in transcript])
81
  else:
82
  # Method 3: Try to get any available transcript and translate if necessary
83
  transcript_list = YouTubeTranscriptApi.list_transcripts(video_id)
84
  transcript = transcript_list.find_generated_transcript(['hi', 'en'])
85
- transcript_text = " ".join([entry['text'] for entry in transcript.fetch()])
86
 
87
- if transcript_text:
88
  break
89
  except Exception:
90
  continue
91
 
92
- if transcript_text:
93
  # Detect language of the transcript
 
94
  detected_language = detect(transcript_text)
95
- if detected_language != 'en': # If not in English, translate it to English
96
- transcript_text = translator.translate(transcript_text, src=detected_language, dest='en').text
97
 
98
- video_data[video_id] = {
99
- 'title': video_title,
100
- 'description': cleaned_description, # Use cleaned description
101
- 'transcript': transcript_text
102
- }
 
 
 
 
 
 
 
 
 
 
 
 
 
103
 
104
  # Create the 'transcriptions' folder if it doesn't exist
105
  os.makedirs('transcriptions', exist_ok=True)
106
 
107
- # Write the file in the 'transcriptions' folder
108
- with open(os.path.join('transcriptions', f"{video_id}_transcription.txt"), "w", encoding='utf-8') as f:
109
- f.write(f"Title: {video_title}\n\nDescription: {cleaned_description}\n\nTranscript: {transcript_text}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
110
  else:
111
  pbar.write(f"Could not retrieve transcript for Video ID: {video_id}")
 
112
 
113
  except Exception as e:
114
  pbar.write(f"An error occurred while processing video {video_id}: {e}")
115
-
 
 
 
116
  finally:
117
  # Update the progress bar
118
  pbar.update(1)
119
 
120
  # Close the progress bar
121
  pbar.close()
122
- return video_data
123
 
124
- def clean_description(description):
125
- # Remove URLs
126
- description = re.sub(r'http[s]?://\S+', '', description)
127
- # Remove social media handles (e.g., @username)
128
- description = re.sub(r'@\w+', '', description)
129
- # Remove any remaining unwanted characters (optional)
130
- description = re.sub(r'\s+', ' ', description).strip()
131
- return description
132
 
133
  # Main Function
134
  if __name__ == "__main__":
@@ -137,10 +186,3 @@ if __name__ == "__main__":
137
 
138
  # Step 2: Fetch transcripts for the videos
139
  video_data = fetch_video_data(video_ids)
140
-
141
- # Optional: Print or process the video data
142
- # for video_id, data in video_data.items():
143
- # print(f"\nData for Video ID {video_id}:")
144
- # print(f"Title: {data['title']}")
145
- # print(f"Description: {data['description']}")
146
- # print(f"Transcript: {data['transcript']}")
 
1
  import os
2
+ import re
3
+ import time
4
  from googleapiclient.discovery import build
5
  from youtube_transcript_api import YouTubeTranscriptApi
6
  from youtube_transcript_api._errors import TranscriptsDisabled, NoTranscriptAvailable, VideoUnavailable
7
  from langdetect import detect
 
8
  from tqdm import tqdm
 
9
  from dotenv import load_dotenv
10
+ # Use deep-translator instead of googletrans
11
+ from deep_translator import GoogleTranslator
12
 
13
  load_dotenv()
14
 
 
20
  youtube = build('youtube', 'v3', developerKey=YOUTUBE_API_KEY)
21
 
22
  # Initialize Translator
23
+ translator = GoogleTranslator(source='auto', target='en')
24
 
 
25
  def get_all_video_ids(channel_id):
26
  video_ids = []
27
  next_page_token = None
 
46
 
47
  return video_ids
48
 
49
def clean_description(description):
    """Strip links and social-media handles from a video description.

    Args:
        description: Raw description text. ``None`` or an empty string is
            treated as "no description".

    Returns:
        The description with URLs and ``@handle`` mentions removed, every
        run of whitespace collapsed to a single space, and the ends trimmed.
        Returns ``''`` when there is nothing to clean.
    """
    # Guard against a missing description so re.sub never sees None.
    if not description:
        return ''
    # Remove URLs
    description = re.sub(r'https?://\S+', '', description)
    # Remove social media handles (e.g., @username). The lookbehind skips an
    # '@' that directly follows a word character, so e-mail addresses such
    # as "bob@example.com" are left intact instead of becoming "bob.com".
    description = re.sub(r'(?<!\w)@\w+', '', description)
    # Collapse the whitespace left behind by the removals and trim the ends
    description = re.sub(r'\s+', ' ', description).strip()
    return description
57
+
58
  # Fetch transcripts for the given video IDs
59
  def fetch_video_data(video_ids):
60
  video_data = {}
61
+ failed_videos = [] # List to store video IDs for which transcription failed
62
+
63
  # Create a progress bar
64
  pbar = tqdm(total=len(video_ids), desc="Processing videos", unit="video")
65
 
 
71
  id=video_id
72
  ).execute()
73
 
74
+ if not video_response['items']:
75
+ pbar.write(f"No video details found for Video ID: {video_id}")
76
+ failed_videos.append(video_id)
77
+ continue # Skip to the next video
78
+
79
+ video_item = video_response['items'][0]
80
+ snippet = video_item.get('snippet', {})
81
+
82
+ video_title = snippet.get('title', '')
83
+ video_description = snippet.get('description', '')
84
+
85
  # Clean the description to remove social handles and links
86
  cleaned_description = clean_description(video_description)
87
 
88
  # Attempt to fetch transcript using different methods
89
+ transcript_entries = None
90
  for method in range(1, 4):
91
  try:
92
  if method == 1:
93
  # Method 1: Try to get all transcripts
94
  transcript_list = YouTubeTranscriptApi.list_transcripts(video_id)
95
  transcript = transcript_list.find_transcript(['en'])
96
+ transcript_entries = transcript.fetch()
97
  elif method == 2:
98
  # Method 2: Try to get transcript directly
99
+ transcript_entries = YouTubeTranscriptApi.get_transcript(video_id)
 
100
  else:
101
  # Method 3: Try to get any available transcript and translate if necessary
102
  transcript_list = YouTubeTranscriptApi.list_transcripts(video_id)
103
  transcript = transcript_list.find_generated_transcript(['hi', 'en'])
104
+ transcript_entries = transcript.fetch()
105
 
106
+ if transcript_entries:
107
  break
108
  except Exception:
109
  continue
110
 
111
+ if transcript_entries is not None and len(transcript_entries) > 0:
112
  # Detect language of the transcript
113
+ transcript_text = " ".join([entry['text'] for entry in transcript_entries if entry.get('text')])
114
  detected_language = detect(transcript_text)
 
 
115
 
116
+ if detected_language != 'en':
117
+ # Translate each entry individually
118
+ for entry in transcript_entries:
119
+ text_to_translate = entry.get('text', '')
120
+ if text_to_translate:
121
+ try:
122
+ # Use deep-translator
123
+ translated_text = translator.translate(text_to_translate)
124
+ entry['text'] = translated_text
125
+ except Exception as e:
126
+ print(f"Translation failed for video {video_id} at entry starting at {entry.get('start')}: {e}")
127
+ import traceback
128
+ traceback.print_exc()
129
+ entry['text'] = text_to_translate # Fallback to original text
130
+ # Include a delay to prevent rate limiting
131
+ time.sleep(0.5)
132
+ else:
133
+ entry['text'] = ''
134
 
135
  # Create the 'transcriptions' folder if it doesn't exist
136
  os.makedirs('transcriptions', exist_ok=True)
137
 
138
+ # Write the file in the 'transcriptions' folder with timestamps
139
+ transcription_file = os.path.join('transcriptions', f"{video_id}_transcription.txt")
140
+ with open(transcription_file, "w", encoding='utf-8') as f:
141
+ f.write(f"Title: {video_title}\n\nDescription: {cleaned_description}\n\nTranscript:\n")
142
+ for entry in transcript_entries:
143
+ start_time = entry.get('start')
144
+ duration = entry.get('duration')
145
+ text = entry.get('text')
146
+ if start_time is not None and duration is not None and text is not None:
147
+ f.write(f"[{start_time:.2f} - {start_time + duration:.2f}] {text}\n")
148
+
149
+ # Update video data
150
+ video_data[video_id] = {
151
+ 'title': video_title,
152
+ 'description': cleaned_description,
153
+ 'transcript_entries': transcript_entries
154
+ }
155
+
156
  else:
157
  pbar.write(f"Could not retrieve transcript for Video ID: {video_id}")
158
+ failed_videos.append(video_id) # Log the failed video ID
159
 
160
  except Exception as e:
161
  pbar.write(f"An error occurred while processing video {video_id}: {e}")
162
+ import traceback
163
+ pbar.write(traceback.format_exc()) # Output the traceback
164
+ failed_videos.append(video_id) # Log the failed video ID
165
+
166
  finally:
167
  # Update the progress bar
168
  pbar.update(1)
169
 
170
  # Close the progress bar
171
  pbar.close()
 
172
 
173
+ # Write failed video IDs to a separate file
174
+ if failed_videos:
175
+ with open('failed_videos.txt', 'w') as fv:
176
+ for vid in failed_videos:
177
+ fv.write(f"{vid}\n")
178
+ print(f"\nLogged failed video IDs to 'failed_videos.txt'.")
179
+
180
+ return video_data
181
 
182
  # Main Function
183
  if __name__ == "__main__":
 
186
 
187
  # Step 2: Fetch transcripts for the videos
188
  video_data = fetch_video_data(video_ids)