import pickle

import dotenv
import pandas as pd
import requests
import tiktoken
from openai import OpenAI
from tqdm.auto import tqdm
from typarse import BaseParser

from core import get_batch_embeddings, Chunk, Dataset
class Parser(BaseParser):
    chunk_size: int = 4000
    save_path: str = "dataset.pkl"

    _abbrev = {
        "chunk_size": "c",
        "save_path": "s",
    }

    _help = {
        "chunk_size": "The maximum number of tokens per chunk",
        "save_path": "The path to save the dataset",
    }
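# Example invocation (hypothetical script name; typarse derives the CLI flags
# from the annotated attribute names above, with _abbrev giving short forms):
#
#   python build_dataset.py --chunk_size 2000 --save_path podcast.pkl
#   python build_dataset.py -c 2000 -s podcast.pkl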
def get_youtube_title(url: str) -> str | None:
    """
    Get the title of a YouTube video from its URL, using the public
    oEmbed endpoint. Returns None if the lookup fails.
    """
    # Naive parsing: assumes a standard watch URL of the form ...?v=<id>
    video_id = url.split("v=")[-1]
    api_url = f"https://www.youtube.com/oembed?url=http://www.youtube.com/watch?v={video_id}&format=json"
    response = requests.get(api_url)
    if response.status_code == 200:
        data = response.json()
        return data["title"]
    else:
        return None
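# Usage sketch (illustrative URL; returns None when the oEmbed lookup fails,
# e.g. for a private or deleted video):
#
#   title = get_youtube_title("https://www.youtube.com/watch?v=<video_id>")
#   if title is None:
#       print("Could not resolve the video title")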
def num_tokens_from_string(string: str, encoding_name: str) -> int:
    """
    Calculate the number of tokens in a string under the given tiktoken encoding.
    """
    encoding = tiktoken.get_encoding(encoding_name)
    num_tokens = len(encoding.encode(string))
    return num_tokens
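# Quick sanity check; counts are specific to the chosen encoding:
#
#   >>> num_tokens_from_string("hello world", "cl100k_base")
#   2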
def required_chunks(
    text: str, max_tokens: int = 8191, encoding_name: str = "cl100k_base"
) -> int:
    """
    Calculate the number of chunks required to split a text into chunks
    of at most max_tokens tokens. The default of 8191 matches the input
    limit of OpenAI's embedding models.
    """
    num_tokens = num_tokens_from_string(text, encoding_name)
    # Ceiling division: any remainder spills into one extra chunk
    num_chunks = num_tokens // max_tokens
    if num_tokens % max_tokens != 0:
        num_chunks += 1
    return num_chunks
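# Equivalent ceiling-division one-liner, for reference:
#
#   import math
#   math.ceil(num_tokens_from_string(text, encoding_name) / max_tokens)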
def split_in_chunks(
    text: str, max_tokens: int = 8191, encoding_name: str = "cl100k_base"
) -> list[str]:
    """
    Split a long text into chunks of at most max_tokens tokens.
    """
    encoding = tiktoken.get_encoding(encoding_name)
    tokens = encoding.encode(text)

    chunks: list[str] = []
    current_chunk: list[int] = []
    current_chunk_size = 0

    for token in tokens:
        # Flush the current chunk before it would exceed the token budget
        if current_chunk_size + 1 > max_tokens:
            chunks.append(encoding.decode(current_chunk))
            current_chunk = []
            current_chunk_size = 0
        current_chunk.append(token)
        current_chunk_size += 1

    # Keep the final, possibly partial, chunk
    if current_chunk:
        chunks.append(encoding.decode(current_chunk))

    return chunks
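# Alternative sketch: the same split can be written by slicing the token list
# in fixed strides. Behaviorally equivalent to split_in_chunks above; shown
# for reference only and not used below.
def split_in_chunks_sliced(
    text: str, max_tokens: int = 8191, encoding_name: str = "cl100k_base"
) -> list[str]:
    encoding = tiktoken.get_encoding(encoding_name)
    tokens = encoding.encode(text)
    return [
        encoding.decode(tokens[i : i + max_tokens])
        for i in range(0, len(tokens), max_tokens)
    ]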
if __name__ == "__main__":
    dotenv.load_dotenv()
    client = OpenAI()

    args = Parser()
    chunk_size = args.chunk_size

    num_episodes = 17  # transcripts/0.vtt .. transcripts/16.vtt

    links = pd.read_csv("links.csv").URL.tolist()
    titles = [get_youtube_title(link) for link in tqdm(links)]

    # Read all transcripts from disk
    episodes = []
    for i in range(num_episodes):
        filename = f"transcripts/{i}.vtt"
        with open(filename, "r") as file:
            episodes.append(file.read())

    # Split every episode into token-bounded chunks
    episode_chunks = [
        split_in_chunks(episode, max_tokens=chunk_size) for episode in episodes
    ]

    # Attach metadata (title, source video index, link) to each chunk
    chunk_metadata = [
        Chunk(
            title=titles[i],
            video_idx=i,
            text=episode_chunks[i][j],
            link=links[i],
        )
        for i in range(num_episodes)
        for j in range(len(episode_chunks[i]))
    ]

    # Embed all chunk texts in batches, then persist the dataset
    chunk_texts = [chunk.text for chunk in chunk_metadata]
    embeddings = get_batch_embeddings(client, chunk_texts)

    dataset = Dataset(chunks=chunk_metadata, embeddings=embeddings)
    with open(args.save_path, "wb") as file:
        pickle.dump(dataset, file)
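# Loading the pickled dataset back later (sketch; Dataset must be importable
# from core for unpickling to succeed):
#
#   import pickle
#   with open("dataset.pkl", "rb") as file:
#       dataset = pickle.load(file)
#   print(len(dataset.chunks))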