# prioritized todos
# supabase to store index (apparently can't rely on vector db to do it?) and user's curations / popular curations
# - paused after 1 week inactivity (and i believe pinecone index DELETED after some days of inactivity?!)
# - - TODO backup both pinecone and supabase daily (this should count as the activity), and make publicly accessible
# TODO add discord/github/google auth...via custom js? see supabase docs
# - make users maintainers of their own curations, restrict add perms, introduce edit/delete/clone perms
# - add stars to curations+users profile -> display starred curations first, then sort by most popular
# - securely store user's openai key in supabase for convenience
# TODO better ai arch
# - eg let user customize instr (via langchain's jinja support?)
# - better: make easy to experiment with langchain's chains/agents
# - maybe something like model_laboratory with gradio's Parallel block?
# - account for mpnet's limit of 384 word pieces per chunk (is it done already?)
# - - more deliberate chunking strat in general
# TODO summarize a vid (and optionally add to curation)
# TODO support yt playlists and yt channels in addition to just one-off videos
# - can i make this really easy to add via a well designed api?
# unprioritized todos
# TODO some inline todos below that should reduce need to reset/rollback DBs
# - how to easily rollback bad data?
# TODO harrison thinks editing vectorDB abstraction to consume Embedding class vs func is a good approach -> need to PR this
# TODO can i generalize the query filter approach (add to langchain?) to remove coupling to pinecone?
# - i believe elastic8.5 supports rdb and vdb, but need nontrivial specs to run it i think # TODO user prefs data model (their curations) # TODO finalize deployment strategy # - supabase free tier for db + blob storage of transcripts # - hf space to host model computations (langchain bits need to run here) # - replit or supabase to host edge functiosn to call hf space # TODO gradio global state to track recently asked questions from everyone # TODO create pinecone index without indexing text metadata field for performance: https://docs.pinecone.io/docs/manage-indexes#selective-metadata-indexing # TODO could use pinecone namespace per embedding model # TODO deploy txtai to fly.io free tier? not sure compute reqs # - or haystack? # - both these come with many features out of the box import os import json import uuid import openai import spacy import en_core_web_sm import gradio as gr from gradio import blocks import pinecone from supabase import create_client, Client from langchain.vectorstores import Pinecone from langchain.text_splitter import SpacyTextSplitter from langchain.embeddings import HuggingFaceEmbeddings from pytube import YouTube from youtube_transcript_api import YouTubeTranscriptApi from typing import Any, Callable, Dict, Iterable, List, Optional Embedder = HuggingFaceEmbeddings().embed_query Model_name = HuggingFaceEmbeddings().model_name PINECONE_APIKEY: str = os.environ.get("PINECONE_APIKEY") SUPABASE_URL: str = os.environ.get("SUPABASE_URL") SUPABASE_KEY: str = os.environ.get("SUPABASE_KEY") supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY) pinecone.init( api_key=PINECONE_APIKEY, environment='us-west1-gcp' # only option for for free tier ) class MyPinecone(Pinecone): def add_texts( self, texts: Iterable[str], metadatas: Optional[List[dict]] = None ) -> List[str]: """Run more texts through the embeddings and add to the vectorstore. Args: texts: Iterable of strings to add to the vectorstore. 
metadatas: Optional list of metadatas associated with the texts. Returns: List of ids from adding the texts into the vectorstore. """ # Embed and create the documents docs = [] ids = [] for i, text in enumerate(texts): id = str(uuid.uuid4()) embedding = self._embedding_function(text)#.tolist() metadata = metadatas[i] if metadatas else {} metadata[self._text_key] = text docs.append((id, embedding, metadata)) ids.append(id) # upsert to Pinecone self._index.upsert(vectors=docs) return ids Pinecone_index = pinecone.Index('semantic-curations') Vdb = MyPinecone(Pinecone_index, Embedder, "text") def supa_all(supa_data) -> List[dict]: datajson = json.loads(supa_data.json()) return datajson['data'] def transcript2chunks(transcript): print("starting transcript2chunks") # TODO what's a good chunk_size? # TODO should store as metadata in dbs r = SpacyTextSplitter(chunk_size = 2000).split_text(transcript) print("finished chunking") return r def video_id_to_media_id(video_id: str) -> Optional[str]: rows = supa_all(supabase.table('ingested_youtube_videos').select('media_id').eq('video_id', video_id).execute()) print(rows) if len(rows) == 1: return rows[0]['media_id'] else: return None # returns curation_ids that already have the video_id def check_curations_with_video(video_id: str) -> List[str]: media_id = video_id_to_media_id(video_id) print(f"media_id {media_id}") if media_id is None: return [] data = supa_all(supabase.table("junction_curations").select("curation_id").eq('media_id', media_id).execute()) in_curations = [r['curation_id'] for r in data] return in_curations def yt2transcript(video_id): print(f"\n\nstarting yt2transcript on id: {video_id}") # data looks like [{'text': 'hey friends welcome to one little coder', 'start': 0.84, 'duration': 4.38}, ...] 
data = YouTubeTranscriptApi.get_transcript(video_id) transcript = ' '.join([x['text'] for x in data]) print("got transcript") # TODO if there is no transcript (how likely is this?), run through whisper-large on hf (but 30k free characters per month) # TODO ought to store timestamp of chunks in metadata for better Sources. # - instead of splitting transcript into chunks, can i merge these fragments into approp size? langchain has merge func return transcript def yt_id2name(video_id: str) -> str: video = YouTube(f"https://www.youtube.com/watch?v={video_id}") return video.title # db guarantees name is unique across rows def curation_name2id() -> dict: rows = supa_all(supabase.table("curations_metadata").select("curation_id, name").execute()) c = {} for r in rows: c[r['name']] = r['curation_id'] return c def get_curation_names(): d = curation_name2id() return list(d.keys()) def get_curations_and_videos(): rows = supa_all(supabase.table("curations_metadata").select("curation_id, name, media_id:ingested_youtube_videos ( video_name )").execute()) row_d = {} for r in rows: for m in r['media_id']: row_d.setdefault(r['name'], []).append(m['video_name']) return row_d def gen_curation_md(): output = "" for curation_name,video_names in get_curations_and_videos().items(): output += f"\n## {curation_name}\n" output += "1. " + "\n1. 
".join(video_names) return output def ingest_video(video_id: str, selected_curation_names: List[str], new_curation: str = ""): video_id = video_id.strip() if new_curation: curcur = curation_name2id() if new_curation in curcur.keys(): return "dupe curation name", gr.update(), gr.update(), gr.update() # add to db here, which will autogen the id supabase.table("curations_metadata").insert({"name": new_curation}).execute() selected_curation_names.append(new_curation) if not selected_curation_names: # contains new_curation at this point return "need >=1 curations", gr.update(), gr.update(), gr.update() cur_dict = curation_name2id() selected_curation_ids = [cur_dict[n] for n in selected_curation_names] existing_curations_with_video = check_curations_with_video(video_id) curations_to_add_video_to = list(set(selected_curation_ids).difference(set(existing_curations_with_video))) goal_curations_with_video = existing_curations_with_video + curations_to_add_video_to if not curations_to_add_video_to: # video already in all selected curations return "dupe video", gr.update(), gr.update(), gr.update() if len(existing_curations_with_video) == 0: # no curations have the video, we need to add it to vector db assert(goal_curations_with_video == curations_to_add_video_to) # this should be true in this case print("new video, processing\n") try: video_name = yt_id2name(video_id) except Exception as e: # TODO undo new_curation create supabase.table("curations_metadata").insert({"name": new_curation}).execute() # - in all try/catches. maybe have upper try/catch to do this in one place. extract return f"Error loading video with id '{video_id}'. Exception: {e}", gr.update(), gr.update(), gr.update() try: transcript = yt2transcript(video_id) except Exception as e: return f"Error fetching transcripts for video with id '{video_id}'. 
Exception: {e}", gr.update(), gr.update(), gr.update() chunks = transcript2chunks(transcript) metadatas = [{'video_id': video_id, 'video_name': video_name, 'curation_ids': goal_curations_with_video} for c in chunks] # *len() was buggy? #import pprint #for i, c in enumerate(chunks): # print(f"{i}: {c}") #print(metadata) print("embedding & uploading to vector db TODO how to get progress from langchain?\n") # TODO consider storing chunk text in supabase - maybe get more storage out of pinecone's s1 if supabase's free tier is sufficient chunk_ids = Vdb.add_texts(chunks, metadatas) print("bookkeeping supabase with new video\n") inserted_row = supa_all(supabase.table("ingested_youtube_videos").insert({"video_id": video_id, "video_name": video_name}).execute())[0] data = supabase.table("ingested_transcripts").insert({'source_id': inserted_row['media_id'], 'num_chunks': len(chunks), 'embedding_model': str(Model_name), 'transcribed_by': 'youtube_transcript_api'}).execute() print("\t- transcripts\n") data = supabase.table('junction_curations').insert([{'curation_id': c, 'media_id': inserted_row['media_id']} for c in goal_curations_with_video]).execute() print("\t- curations\n") data = supabase.table('junction_vectors').insert( [{'chunk_id': c, 'media_id': inserted_row['media_id']} for c in chunk_ids ]).execute() print("\t- vectors\n") else: # some curations already ahve video, so no need to chunk+embed+insert into vector db. 
just adjust bookkeeping in vector db + supa print("video already in vector db, updating metadata to include selected curations\n") # get media_id of given video media_id = video_id_to_media_id(video_id) # get chunk_ids for the video chunk_rows = supa_all(supabase.table("junction_vectors").select("chunk_id").eq('media_id', media_id).execute()) # then update metadata of both supabase and vectorDB to include new curations for r in chunk_rows: update_response = Pinecone_index.update( id=r['chunk_id'], set_metadata={'curation_ids': goal_curations_with_video} ) # TODO error check update_response data = supabase.table('junction_curations').insert([{'curation_id': c, 'media_id': media_id} for c in curations_to_add_video_to]).execute() #curation_ids = [cur_dict[name] for name in curations_to_add_video_to] status = "Status: Done! Video added, thanks for contributing :D" return status, gr.update(choices=get_curation_names()), gr.update(choices=get_curation_names()), gr.update(value=gen_curation_md()) def query_llm(prompt): response = openai.Completion.create( prompt=prompt, temperature=0, max_tokens=400, top_p=1, frequency_penalty=0, presence_penalty=0, #stop=stop_sequence, model=f'text-davinci-003' ) #print(response) return response["choices"][0]["text"].strip() # this needn't be in hf space, as it will just call out to openai and the db # but why not host it here since it's free vs replits 2 cents/day def ask_question(question: str, openai_apikey: str, curation_names: List[str]): if not question or not openai_apikey or not curation_names: return "error: need all inputs", "" openai.api_key = openai_apikey # query vector db for topk chunks # can't use langchain bc we are using pinecone metadata filtering q_embedding = Embedder(question)#.tolist() curations_dict = curation_name2id() curation_ids = [curations_dict[name] for name in curation_names] results = Pinecone_index.query(vector=q_embedding, filter={'curation_ids': {"$in": curation_ids}}, top_k=5, include_metadata=True) 
#pprint.pprint(results) # TODO add filters to langchain's pinecone impl? sources = {} chunks = [] for r in results['matches']: chunk = r['metadata']['text'] chunks.append(chunk) video_name = r['metadata']['video_name'] sources.setdefault(video_name, []).append(chunk) sources_md = "## Sources\n" + "\n\n".join([f"### {name}\n" + "\n\n---\n\n".join([f'{c}' for c in chunks]) for name, chunks in sources.items()]) # format prompt (textwrap to guarantee length?) instr = "Answer the question based on the context below, and if the question can't be answered based on the context, say 'I don't know'.\n\nContext:\n- " prompt = instr + "\n- ".join(chunks) + f"\n\nQuestion: {question}\n\nAnswer:" #pprint.pprint(prompt) try: answer = "## Answer\n" + query_llm(prompt) except Exception as e: answer = f"Error: {e}" # query llm and return output and topk return answer, sources_md with gr.Blocks() as demo: curations_from_db = get_curation_names() refresh_button = gr.Button("Synchronize data (with other user's changes)") with gr.Tab("Ask a question"): q = gr.Textbox(label="Your question") openai_apikey = gr.Textbox(label="OpenAI API Key", type="password") curation_names_1 = gr.CheckboxGroup(choices=curations_from_db, label="Curations to query") button = gr.Button("Submit") answer = gr.Markdown(value="") sources = gr.Markdown(value="") button.click(ask_question, inputs=[q, openai_apikey, curation_names_1], outputs=[answer, sources]) with gr.Tab("Browse & Organize Curations"): def refresh_curation_accordion(): output = gen_curation_md() return gr.update(value=output) #md.change(fn=refresh_curation_accordion, inputs=[curation_names_1], outputs=[md]) # for name,id in curation_name2id().items(): # print(id,name,rows) # accordions_state[name] = {'gr_obj': gr.Accordion(name), 'rows': []} # with accordions_state[name]['gr_obj']: # for i,medium in enumerate(row_d[id]): # accordions_state[name]['rows'].append(gr.Row(variant='compact')) # with accordions_state[name]['rows'][i]: # 
gr.Markdown(medium['video_name']) #delete_button = gr.Button("Delete from Curation") #delete_button.click(...) #refresh_button = gr.Button("Refresh curations") md = gr.Markdown(gen_curation_md()) #refresh_button.click(fn=refresh_curation_accordion, inputs=[], outputs=[md]) with gr.Tab("Add data to Curations"): gr.Markdown("An hour's worth of video seems to take about a minute to upload (ymmv).") video_id = gr.Textbox(label="Youtube video id (NOT full url)", placeholder="lvh3g7eszVQ") curation_names_2 = gr.CheckboxGroup(choices=curations_from_db, label="Add to existing Curations") new_curation = gr.Textbox(label="and/or add to new Curation") button = gr.Button("Submit") status_field = gr.Markdown() submit_click = button.click(ingest_video, inputs=[video_id, curation_names_2, new_curation], outputs=[status_field, curation_names_1, curation_names_2, md]) # TODO need to undo rdb and vdb state if cancel clicked #cancel_button = gr.Button("Cancel", cancels=[submit_click]) def refresh_all_curation_lists(): return gr.update(choices=get_curation_names()), gr.update(choices=get_curation_names()), gr.update(value=gen_curation_md()) refresh_button.click(fn=refresh_all_curation_lists, inputs=[], outputs=[curation_names_1, curation_names_2, md]) demo.launch()