Upload rag_data_layer.py + JSON (raw!)

#1
by Landhoff - opened
Files changed (1)
  1. rag_data_layer.py +614 -0
rag_data_layer.py ADDED
@@ -0,0 +1,614 @@
# -*- coding: utf-8 -*-
"""RAG Data Layer.ipynb

Automatically generated by Colab.

Original file is located at
    https://colab.research.google.com/drive/17nEwUcytqDID3-27YOGBv36tRhLj3E3p
"""

"""# Task
Build a RAG data layer by loading and processing data from "knowledge_base.json", "video_transcript.json", and "youtube_transcripts (1).json", then consolidating and exporting the processed chunks into a unified "rag_data_layer.json" file.

## Load and Process Knowledge Base

### Subtask:
Load the 'knowledge_base.json' file, process each item to extract the relevant fields, derive the language and topic, assign a unique ID, and store the processed chunks.

**Reasoning**:
First, import the `json` module for JSON file operations. Then load `knowledge_base.json`, initialize an empty list for the processed chunks, and iterate through each item to extract and derive the required fields: URL, text, source type, language, topic, and a unique ID.
"""

import json

# 1. Load the content of 'knowledge_base.json'
with open('/content/knowledge_base.json', 'r') as f:
    knowledge_base_data = json.load(f)

# 2. Initialize an empty list to store the processed knowledge base chunks.
processed_kb_chunks = []

# 3. Iterate through each item and process it
for i, item in enumerate(knowledge_base_data):
    processed_chunk = {}

    # Extract 'url'
    processed_chunk['url'] = item.get('url')

    # Assign 'content' to 'text'
    processed_chunk['text'] = item.get('content')

    # Set 'source_type'
    processed_chunk['source_type'] = 'blog_article'

    # Determine 'language' ('/ca/' in the URL marks Catalan; default to Spanish)
    if processed_chunk['url'] and '/ca/' in processed_chunk['url']:
        processed_chunk['language'] = 'ca'
    else:
        processed_chunk['language'] = 'es'

    # Derive 'topic' from the URL path
    topic = None
    if processed_chunk['url']:
        # Strip 'https://blog.example.com/' or 'https://blog.example.com/ca/' to get the path
        path = processed_chunk['url'].replace('https://blog.example.com/ca/', '').replace('https://blog.example.com/', '')
        path_parts = path.split('/')

        # Try to find a non-empty path segment that could be the topic
        for part in path_parts:
            if part and '.html' not in part and part not in ('ca', 'es'):  # Skip empty strings, language codes, and file names
                topic = part
                break

        if topic is None:
            # Fallback for URLs shaped like domain/language/article-name.html:
            # take the article slug before '.html'
            if len(path_parts) > 0 and '.html' in path_parts[-1]:
                topic = path_parts[-1].split('.html')[0].replace('-', ' ')
            elif len(path_parts) > 1 and '.html' in path_parts[-2]:
                topic = path_parts[-2].split('.html')[0].replace('-', ' ')

        if topic and topic.endswith('.html'):  # Clean up if '.html' is still attached
            topic = topic.replace('.html', '')
        if topic and '-' in topic and ' ' not in topic:  # De-slugify: replace hyphens with spaces
            topic = topic.replace('-', ' ')

    processed_chunk['topic'] = topic

    # Assign a unique 'id'
    processed_chunk['id'] = f"kb_{i+1}"

    processed_kb_chunks.append(processed_chunk)

print(f"Processed {len(processed_kb_chunks)} knowledge base chunks.")
# Display the first 3 processed chunks to verify
print("\nFirst 3 processed chunks:")
for i in range(min(3, len(processed_kb_chunks))):
    print(processed_kb_chunks[i])
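
The slug-handling steps above can also be sketched as a standalone helper, which makes the URL-to-topic rule easier to test in isolation. This is a hedged sketch: `derive_topic` is a hypothetical name (not part of the notebook), it assumes the same `https://blog.example.com/[ca/]<segment>/<article>.html` layout, and it always de-slugifies the result.

```python
def derive_topic(url):
    """Derive a readable topic from a blog URL (sketch; hypothetical helper,
    assuming the https://blog.example.com/[ca/]<segment>/<article>.html layout)."""
    if not url:
        return None
    path = url.replace('https://blog.example.com/ca/', '').replace('https://blog.example.com/', '')
    parts = path.split('/')
    # Prefer the first non-language, non-file path segment
    for part in parts:
        if part and '.html' not in part and part not in ('ca', 'es'):
            return part.replace('-', ' ')
    # Otherwise fall back to the article slug before '.html'
    if parts and '.html' in parts[-1]:
        return parts[-1].split('.html')[0].replace('-', ' ')
    return None

# Example (hypothetical URLs):
print(derive_topic('https://blog.example.com/ca/ansiedad/respiracion-guiada.html'))  # ansiedad
```

Pulling the rule into one function like this would also let a single unit test cover both the segment case and the slug-fallback case.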

"""**Reasoning**:
The next step is to load and process the `video_transcript.json` file, extracting the relevant information and structuring it similarly to the `knowledge_base.json` data, while assigning a unique ID and identifying the language and topic.
"""

import re  # Import regex for advanced topic extraction
import json

# Load the content of 'video_transcript.json'
with open('/content/video_transcript.json', 'r') as f:
    video_transcript_data = json.load(f)

# Initialize an empty list to store the processed video transcript chunks.
processed_vt_chunks = []

# The video_transcript_data loaded from the file is a single dictionary,
# not a list of dictionaries. Wrap it in a list to reuse the loop structure.
items_to_process = [video_transcript_data] if isinstance(video_transcript_data, dict) else video_transcript_data

# Iterate through each item and process it
for i, item in enumerate(items_to_process):
    processed_chunk = {}

    # Extract 'url'. This data has no 'url' field, so dict.get() yields None.
    processed_chunk['url'] = item.get('url')

    # Assign 'text' from the 'text' key (not 'content')
    processed_chunk['text'] = item.get('text')

    # Set 'source_type'
    processed_chunk['source_type'] = 'video_transcript'

    # Determine 'language', preferring the item's own 'language' field if present
    if item.get('language'):
        processed_chunk['language'] = item.get('language')
    elif processed_chunk['url'] and '/ca/' in processed_chunk['url']:
        processed_chunk['language'] = 'ca'
    else:
        # Fallback if neither the 'language' key nor the URL carries language info
        processed_chunk['language'] = 'es'

    # Derive 'topic' from the URL or text
    topic = None
    if processed_chunk['url']:
        # URL parsing logic (not triggered here, since 'url' is None for this data)
        path_parts = [part for part in processed_chunk['url'].split('/') if part]
        try:
            videos_index = path_parts.index('videos')
            if len(path_parts) > videos_index + 2 and path_parts[videos_index + 1] in ['ca', 'es']:
                topic = path_parts[videos_index + 2]
            elif len(path_parts) > videos_index + 1:
                topic = path_parts[videos_index + 1]

            if topic:
                topic = topic.replace('-', ' ').replace('.html', '').strip()
        except ValueError:
            pass

    # Fallback if topic extraction from the URL fails or the URL is None
    if not topic and processed_chunk['text']:
        # Take the first sentence as a fallback topic
        first_sentence = processed_chunk['text'].split('.')[0]
        if len(first_sentence) > 5:  # Avoid very short sentences as topics
            topic = first_sentence.strip()
        else:
            topic = "General Video Topic"  # Generic fallback if the first sentence is too short

    processed_chunk['topic'] = topic

    # Assign a unique 'id'
    processed_chunk['id'] = f"vt_{i+1}"

    processed_vt_chunks.append(processed_chunk)

print(f"Processed {len(processed_vt_chunks)} video transcript chunks.")
# Display the first 3 processed chunks to verify
print("\nFirst 3 processed chunks:")
for i in range(min(3, len(processed_vt_chunks))):
    print(processed_vt_chunks[i])

import json

# Define the output file name
output_file_name = 'rag_data_layer.json'

# Load 'rag_data_layer.json' to verify it. Note: this cell only works after
# the export cell further below has written the file.
with open(output_file_name, 'r', encoding='utf-8') as f:
    rag_data_layer = json.load(f)

print(f"Total chunks loaded from {output_file_name}: {len(rag_data_layer)}")

# Display the first 5 chunks to verify the content
print("\nFirst 5 chunks from rag_data_layer.json:")
for i in range(min(5, len(rag_data_layer))):
    print(rag_data_layer[i])

"""**Reasoning**:
A first attempt at this cell failed because it iterated over the loaded dictionary as if it were a list of dictionaries, so `item` was a string (a dictionary key) with no `.get()` method. The structure of `video_transcript_data` also uses a 'text' key rather than 'content', and carries a 'language' key that should take priority. The cell above incorporates these fixes: the single dictionary is wrapped in a list for iteration, text is read from the 'text' key, and the item's own 'language' field is preferred.
"""
"""## Load and Process YouTube Transcripts

### Subtask:
Load the 'youtube_transcripts (1).json' file, process each item to extract the relevant fields, derive the language and topic, assign a unique ID, and store the processed chunks.

**Reasoning**:
Load the 'youtube_transcripts (1).json' file, extract the relevant information, and process it into the same structure as the previous data sources, assigning a unique ID, identifying the language, and deriving a topic for each transcript.
"""

import re
import json

# Load the content of 'youtube_transcripts (1).json'
with open('/content/youtube_transcripts (1).json', 'r') as f:
    youtube_transcript_data = json.load(f)

# Initialize an empty list to store the processed YouTube transcript chunks.
processed_yt_chunks = []

# Iterate through each item and process it
for i, item in enumerate(youtube_transcript_data):
    processed_chunk = {}

    # Extract 'url'
    processed_chunk['url'] = item.get('url')

    # Assign 'text' from the 'text' key
    processed_chunk['text'] = item.get('text')

    # Set 'source_type'
    processed_chunk['source_type'] = 'youtube_transcript'

    # Determine 'language', preferring the item's own 'language' field if present
    if item.get('language'):
        processed_chunk['language'] = item.get('language')
    elif processed_chunk['url'] and '/ca/' in processed_chunk['url']:
        processed_chunk['language'] = 'ca'
    else:
        # Fallback if neither the 'language' key nor the URL carries language info
        processed_chunk['language'] = 'es'

    # Derive 'topic' from the URL or text
    topic = None
    if processed_chunk['url']:
        path_parts = [part for part in processed_chunk['url'].split('/') if part]
        try:
            # YouTube watch URLs carry no descriptive path; this JSON has no
            # title/description either, so fall back to the text below.
            if 'watch?v=' in processed_chunk['url']:
                pass  # will use the text fallback
            else:
                # Look for a path segment after the language code, as with the other sources
                if len(path_parts) > 2 and path_parts[1] in ['ca', 'es']:
                    topic = path_parts[2]
                elif len(path_parts) > 1:
                    topic = path_parts[1]

                if topic:
                    topic = topic.replace('-', ' ').replace('.html', '').strip()
        except Exception:
            pass  # on any URL-parsing error, fall back to text

    # Fallback if topic extraction from the URL fails or the URL is None
    if not topic and processed_chunk['text']:
        # Take the first sentence as a fallback topic
        first_sentence = processed_chunk['text'].split('.')[0]
        if len(first_sentence) > 5:  # Avoid very short sentences as topics
            topic = first_sentence.strip()
        else:
            topic = "General YouTube Topic"  # Generic fallback

    processed_chunk['topic'] = topic

    # Assign a unique 'id'
    processed_chunk['id'] = f"yt_{i+1}"

    processed_yt_chunks.append(processed_chunk)

print(f"Processed {len(processed_yt_chunks)} YouTube transcript chunks.")
# Display the first 3 processed chunks to verify
print("\nFirst 3 processed chunks:")
for i in range(min(3, len(processed_yt_chunks))):
    print(processed_yt_chunks[i])

"""**Reasoning**:
The previous code block failed to extract 'url' and 'text' because the items actually use the keys 'video_url' and 'content'. Update the key names to match the structure of `youtube_transcript_data`, and improve topic extraction for YouTube URLs, where no descriptive topic appears in the URL path.
"""

import re
import json

# Load the content of 'youtube_transcripts (1).json'
with open('/content/youtube_transcripts (1).json', 'r') as f:
    youtube_transcript_data = json.load(f)

# Initialize an empty list to store the processed YouTube transcript chunks.
processed_yt_chunks = []

# Iterate through each item and process it
for i, item in enumerate(youtube_transcript_data):
    processed_chunk = {}

    # Extract 'video_url' and assign it to 'url'
    processed_chunk['url'] = item.get('video_url')

    # Assign 'content' to 'text'
    processed_chunk['text'] = item.get('content')

    # Set 'source_type'
    processed_chunk['source_type'] = 'youtube_transcript'

    # Determine 'language', preferring the item's own 'language' field if present.
    # The example data has no 'language' key and no '/ca/' in the URL, so this defaults to 'es'.
    if item.get('language'):
        processed_chunk['language'] = item.get('language')
    elif processed_chunk['url'] and '/ca/' in processed_chunk['url']:
        processed_chunk['language'] = 'ca'
    else:
        processed_chunk['language'] = 'es'

    # Derive 'topic' from the URL or text
    topic = None
    if processed_chunk['url']:
        # YouTube URLs carry no descriptive path segments, so extract the
        # video ID as a placeholder topic.
        match = re.search(r'v=([\w-]+)', processed_chunk['url'])
        if match:
            topic = f"YouTube Video ID: {match.group(1)}"
        # Could be refined further if video titles become available.

    # Fallback if topic extraction from the URL fails or the URL is None
    if not topic and processed_chunk['text']:
        # Take the first sentence as a fallback topic
        first_sentence = processed_chunk['text'].split('.')[0]
        if len(first_sentence) > 5:  # Avoid very short sentences as topics
            topic = first_sentence.strip()
        else:
            topic = "General YouTube Topic"  # Generic fallback

    processed_chunk['topic'] = topic

    # Assign a unique 'id'
    processed_chunk['id'] = f"yt_{i+1}"

    processed_yt_chunks.append(processed_chunk)

print(f"Processed {len(processed_yt_chunks)} YouTube transcript chunks.")
# Display the first 3 processed chunks to verify
print("\nFirst 3 processed chunks:")
for i in range(min(3, len(processed_yt_chunks))):
    print(processed_yt_chunks[i])

"""**Reasoning**:
All data sources have been processed individually. The next step is to combine the chunks from the knowledge base, video transcripts, and YouTube transcripts into a single unified list, ready for export as the final RAG data layer.
"""

all_processed_chunks = []
all_processed_chunks.extend(processed_kb_chunks)
all_processed_chunks.extend(processed_vt_chunks)
all_processed_chunks.extend(processed_yt_chunks)

print(f"Total processed chunks: {len(all_processed_chunks)}")
# Display the first 5 unified chunks to verify
print("\nFirst 5 unified chunks:")
for i in range(min(5, len(all_processed_chunks))):
    print(all_processed_chunks[i])

"""**Reasoning**:
With the data from all sources unified into a single list, the final step is to export the consolidated chunks to a JSON file, completing the RAG data layer.
"""

import json

# Define the output file name
output_file = 'rag_data_layer.json'

# Export the unified list of processed chunks to a JSON file
with open(output_file, 'w', encoding='utf-8') as f:
    json.dump(all_processed_chunks, f, ensure_ascii=False, indent=4)

print(f"Successfully exported {len(all_processed_chunks)} chunks to {output_file}")

"""## Process Breathing Transcript

### Subtask:
Load the 'video_transcript.json' file, extract its 'text' content, and assign fixed values for 'url', 'source_type', 'language', and 'topic', along with a unique ID.

**Reasoning**:
Load 'video_transcript.json', extract its 'text' content, assign the fixed values for 'url', 'source_type', 'language', and 'topic' plus a unique ID, and store the result in a new list.
"""

import json

# 1. Load the content of 'video_transcript.json'
with open('/content/video_transcript.json', 'r') as f:
    breathing_transcript_data = json.load(f)

# 2. Initialize an empty list to store the processed breathing chunks.
processed_breathing_chunks = []

# 3. Create a dictionary for the processed chunk and extract 'text'.
processed_chunk = {}
processed_chunk['text'] = breathing_transcript_data.get('text')

# 4. Assign the fixed 'url'
processed_chunk['url'] = 'https://video.local/breathing_exercise'

# 5. Assign the fixed 'source_type'
processed_chunk['source_type'] = 'breathing_video'

# 6. Assign the fixed 'language'
processed_chunk['language'] = 'es'

# 7. Assign the fixed 'topic'
processed_chunk['topic'] = 'anxiety'

# 8. Assign a unique 'id'
processed_chunk['id'] = 'vt_1'

# 9. Append the processed chunk to the list.
processed_breathing_chunks.append(processed_chunk)

# 10. Print the count and the first chunk to verify.
print(f"Processed {len(processed_breathing_chunks)} breathing transcript chunks.")
print("\nFirst processed chunk:")
print(processed_breathing_chunks[0])

"""## Process Legal Webinar Transcript

### Subtask:
Load the 'youtube_transcripts (1).json' file, extract the 'url' and 'text' content, and assign fixed values for 'source_type', 'language', and 'topic', along with a unique ID.

**Reasoning**:
Load 'youtube_transcripts (1).json', extract the content, assign the fixed metadata fields, and build a list of processed legal webinar chunks with unique IDs.
"""

import json

# 1. Load the content of 'youtube_transcripts (1).json'
with open('/content/youtube_transcripts (1).json', 'r') as f:
    legal_webinar_data = json.load(f)

# 2. Initialize an empty list to store the processed legal webinar chunks.
processed_legal_webinar_chunks = []

# 3. Iterate through each item in the loaded YouTube transcript data.
for i, item in enumerate(legal_webinar_data):
    processed_chunk = {}

    # 4. Map 'video_url' to 'url' and 'content' to 'text'.
    processed_chunk['url'] = item.get('video_url')
    processed_chunk['text'] = item.get('content')

    # 5. Assign the fixed metadata.
    processed_chunk['source_type'] = 'legal_webinar'
    processed_chunk['language'] = 'es'
    processed_chunk['topic'] = 'legal'

    # 6. Assign a unique identifier ('lw_1', 'lw_2', ...).
    processed_chunk['id'] = f"lw_{i+1}"

    # 7. Append the processed chunk to the list.
    processed_legal_webinar_chunks.append(processed_chunk)

# 8. Print the count and the first few chunks to verify.
print(f"Processed {len(processed_legal_webinar_chunks)} legal webinar chunks.")
print("\nFirst 3 processed chunks:")
for i in range(min(3, len(processed_legal_webinar_chunks))):
    print(processed_legal_webinar_chunks[i])

"""## Consolidate and Export All Corrected Chunks

### Subtask:
Combine the corrected chunks from the knowledge base, breathing transcript, and legal webinar into a single list, then export the unified list to a new JSON file.

**Reasoning**:
Initialize an empty list, extend it with each of the processed chunk lists, print the total count for verification, and save the combined list to 'rag_data_layer.json' with proper JSON formatting.
"""

import json

# 1. Initialize an empty list called final_rag_data_layer.
final_rag_data_layer = []

# 2.-4. Extend it with each set of processed chunks.
final_rag_data_layer.extend(processed_kb_chunks)
final_rag_data_layer.extend(processed_breathing_chunks)
final_rag_data_layer.extend(processed_legal_webinar_chunks)

# 5. Print the total number of chunks to verify the count.
print(f"Total chunks in final RAG data layer: {len(final_rag_data_layer)}")

# 6. Save final_rag_data_layer to 'rag_data_layer.json' with proper formatting.
output_file_name = 'rag_data_layer.json'
with open(output_file_name, 'w', encoding='utf-8') as f:
    json.dump(final_rag_data_layer, f, ensure_ascii=False, indent=4)

print(f"Successfully exported {len(final_rag_data_layer)} chunks to {output_file_name}")

# Display the first few chunks to verify the content
print("\nFirst 3 unified chunks:")
for i in range(min(3, len(final_rag_data_layer))):
    print(final_rag_data_layer[i])

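
To show the exported layer is directly usable downstream, here is a minimal sketch of a consumer that loads `rag_data_layer.json` and filters chunks by language and topic. The helper names (`load_chunks`, `filter_chunks`) and the filter values are illustrative, not part of the notebook.

```python
import json

def load_chunks(path='rag_data_layer.json'):
    """Load the unified RAG data layer exported above (hypothetical helper)."""
    with open(path, 'r', encoding='utf-8') as f:
        return json.load(f)

def filter_chunks(chunks, language=None, topic=None):
    """Keep only chunks matching the given language and/or topic."""
    result = []
    for chunk in chunks:
        if language and chunk.get('language') != language:
            continue
        if topic and chunk.get('topic') != topic:
            continue
        result.append(chunk)
    return result

# Example usage (assumes the export cell above has already run):
# chunks = load_chunks()
# spanish_legal = filter_chunks(chunks, language='es', topic='legal')
# print(len(spanish_legal))
```

Because every chunk shares the same keys (`url`, `text`, `source_type`, `language`, `topic`, `id`), a retriever can filter on metadata before embedding or ranking the `text` field.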

"""## Final Task

### Subtask:
Confirm that the RAG data layer has been successfully built and exported into a unified JSON file, ready for further use in the hackathon.

## Summary:

### Q&A
Yes, the RAG data layer has been built and exported into a unified JSON file named `rag_data_layer.json`, containing all processed data and ready for further use in the hackathon.

### Data Analysis Key Findings
* **Knowledge Base Processing:** `knowledge_base.json` was loaded and processed to extract URLs, content (mapped to 'text'), source type ('blog_article'), language (derived from the URL as 'ca' or 'es'), and a derived topic. 28 knowledge base chunks were processed.
* **Breathing Transcript Processing:** `video_transcript.json` was processed as a "breathing transcript": its 'text' content was extracted, and fixed metadata was assigned (`url` 'https://video.local/breathing_exercise', `source_type` 'breathing_video', `language` 'es', `topic` 'anxiety'). One breathing transcript chunk was processed.
* **Legal Webinar Transcript Processing:** `youtube_transcripts (1).json` was processed as a "legal webinar transcript": 'video_url' was mapped to 'url' and 'content' to 'text', with fixed metadata (`source_type` 'legal_webinar', `language` 'es', `topic` 'legal'). One legal webinar chunk was processed.
* **Consolidation and Export:** All chunks from the knowledge base, breathing transcript, and legal webinar were consolidated into a single list of 30 chunks.
* **Final Output:** The consolidated data layer of 30 chunks was exported to `rag_data_layer.json` as well-formatted JSON.

### Insights or Next Steps
* Consider a more robust topic-extraction method, potentially using NLP models, to reduce reliance on URL patterns or fixed assignments as data sources grow.
* Centralize the metadata schema and processing rules so that consistency and maintenance are easier across diverse data sources, instead of source-specific hardcoded values.
"""
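
As a concrete starting point for centralizing the schema, the per-source processing loops could collapse into one table-driven function. This is a hedged sketch under stated assumptions: `SOURCE_SPECS` and `process_items` are hypothetical names (not from the notebook), and topic derivation is left as a plug-in point.

```python
# Hypothetical centralized schema: one spec per source instead of per-source loops.
SOURCE_SPECS = {
    'blog_article':    {'url_key': 'url',       'text_key': 'content', 'id_prefix': 'kb'},
    'breathing_video': {'url_key': None,        'text_key': 'text',    'id_prefix': 'vt'},
    'legal_webinar':   {'url_key': 'video_url', 'text_key': 'content', 'id_prefix': 'lw'},
}

def process_items(items, source_type, default_language='es'):
    """Normalize raw items from any source into the unified chunk schema."""
    spec = SOURCE_SPECS[source_type]
    chunks = []
    for i, item in enumerate(items):
        url = item.get(spec['url_key']) if spec['url_key'] else None
        # Prefer the item's own language, then the '/ca/' URL marker, then the default
        language = item.get('language') or ('ca' if url and '/ca/' in url else default_language)
        chunks.append({
            'url': url,
            'text': item.get(spec['text_key']),
            'source_type': source_type,
            'language': language,
            'topic': item.get('topic'),  # topic derivation would plug in here
            'id': f"{spec['id_prefix']}_{i+1}",
        })
    return chunks

# Example (hypothetical input):
sample = [{'video_url': 'https://www.youtube.com/watch?v=abc123', 'content': 'Texto del webinar legal.'}]
print(process_items(sample, 'legal_webinar')[0]['id'])  # lw_1
```

Adding a new source would then mean adding one `SOURCE_SPECS` entry rather than copying and editing a processing cell.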