# NOTE: "Spaces: Runtime error" banner below is Hugging Face Spaces page residue, not part of the source.
| # prioritized todos | |
| # supabase to store index (apparently can't rely on vector db to do it?) and user's curations / popular curations | |
| # - paused after 1 week inactivity (and i believe pinecone index DELETED after some days of inactivity?!) | |
| # - - TODO backup both pinecone and supabase daily (this should count as the activity), and make publicly accessible | |
| # TODO add discord/github/google auth...via custom js? see supabase docs | |
| # - make users maintainers of their own curations, restrict add perms, introduce edit/delete/clone perms | |
| # - add stars to curations+users profile -> display starred curations first, then sort by most popular | |
| # - securely store user's openai key in supabase for convenience | |
| # TODO better ai arch | |
# - eg let user customize instr (via langchain's jinja support?)
| # - better: make easy to experiment with langchain's chains/agents | |
| # - maybe something like model_laboratory with gradio's Parallel block? | |
| # - account for mpnet's limit of 384 word pieces per chunk (is it done already?) | |
| # - - more deliberate chunking strat in general | |
| # TODO summarize a vid (and optionally add to curation) | |
| # TODO support yt playlists and yt channels in addition to just one-off videos | |
| # - can i make this really easy to add via a well designed api? | |
| # unprioritized todos | |
| # TODO some inline todos below that should reduce need to reset/rollback DBs | |
| # - how to easily rollback bad data? | |
| # TODO harrison thinks editing vectorDB abstraction to consume Embedding class vs func is a good approach -> need to PR this | |
| # TODO can i generalize the query filter approach (add to langchain?) to remove coupling to pinecone? | |
| # - i believe elastic8.5 supports rdb and vdb, but need nontrivial specs to run it i think | |
| # TODO user prefs data model (their curations) | |
| # TODO finalize deployment strategy | |
| # - supabase free tier for db + blob storage of transcripts | |
| # - hf space to host model computations (langchain bits need to run here) | |
# - replit or supabase to host edge functions to call hf space
| # TODO gradio global state to track recently asked questions from everyone | |
| # TODO create pinecone index without indexing text metadata field for performance: https://docs.pinecone.io/docs/manage-indexes#selective-metadata-indexing | |
| # TODO could use pinecone namespace per embedding model | |
| # TODO deploy txtai to fly.io free tier? not sure compute reqs | |
| # - or haystack? | |
| # - both these come with many features out of the box | |
| import os | |
| import json | |
| import uuid | |
| import openai | |
| import spacy | |
| import en_core_web_sm | |
| import gradio as gr | |
| from gradio import blocks | |
| import pinecone | |
| from supabase import create_client, Client | |
| from langchain.vectorstores import Pinecone | |
| from langchain.text_splitter import SpacyTextSplitter | |
| from langchain.embeddings import HuggingFaceEmbeddings | |
| from pytube import YouTube | |
| from youtube_transcript_api import YouTubeTranscriptApi | |
| from typing import Any, Callable, Dict, Iterable, List, Optional | |
# Shared sentence-embedding model. Instantiate ONCE: each HuggingFaceEmbeddings()
# construction loads the underlying sentence-transformers model, so the original
# double construction loaded the weights twice at startup.
_embeddings = HuggingFaceEmbeddings()
Embedder = _embeddings.embed_query   # Callable[[str], List[float]]
Model_name = _embeddings.model_name  # recorded in supabase per ingested transcript

# Credentials are injected via environment variables (HF Space secrets);
# they may be None locally, in which case client creation below will fail loudly.
PINECONE_APIKEY: str = os.environ.get("PINECONE_APIKEY")
SUPABASE_URL: str = os.environ.get("SUPABASE_URL")
SUPABASE_KEY: str = os.environ.get("SUPABASE_KEY")

supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY)
pinecone.init(
    api_key=PINECONE_APIKEY,
    environment='us-west1-gcp'  # only option for free tier
)
class MyPinecone(Pinecone):
    """Pinecone vectorstore that calls a plain embedding *function* per text.

    Overrides ``add_texts`` so the embedding callable (``Embedder``) is used
    directly instead of whatever embedding interface the base class assumes.
    """

    def add_texts(
        self, texts: Iterable[str], metadatas: Optional[List[dict]] = None
    ) -> List[str]:
        """Run more texts through the embeddings and add to the vectorstore.

        Args:
            texts: Iterable of strings to add to the vectorstore.
            metadatas: Optional list of metadatas associated with the texts.

        Returns:
            List of ids from adding the texts into the vectorstore.
        """
        # Embed and create the documents
        docs = []
        ids = []
        for i, text in enumerate(texts):
            # fresh uuid per chunk; `doc_id` avoids shadowing the builtin `id`
            doc_id = str(uuid.uuid4())
            embedding = self._embedding_function(text)  # .tolist() not needed for this embedder
            metadata = metadatas[i] if metadatas else {}
            # store the raw chunk text in metadata so queries can recover it
            metadata[self._text_key] = text
            docs.append((doc_id, embedding, metadata))
            ids.append(doc_id)
        # single batched upsert to Pinecone
        self._index.upsert(vectors=docs)
        return ids
# Handle to the pre-created Pinecone index holding all transcript-chunk vectors.
Pinecone_index = pinecone.Index('semantic-curations')
# Vectorstore wrapper: embeds via Embedder and stores chunk text under the "text" metadata key.
Vdb = MyPinecone(Pinecone_index, Embedder, "text")
def supa_all(supa_data) -> List[dict]:
    """Extract the list of row dicts from a supabase query response."""
    payload = json.loads(supa_data.json())
    return payload['data']
def transcript2chunks(transcript):
    """Split a raw transcript string into ~2000-char chunks on sentence boundaries."""
    print("starting transcript2chunks")
    # TODO what's a good chunk_size?
    # TODO should store as metadata in dbs
    chunks = SpacyTextSplitter(chunk_size=2000).split_text(transcript)
    print("finished chunking")
    return chunks
def video_id_to_media_id(video_id: str) -> Optional[str]:
    """Look up the media_id of an already-ingested youtube video (None if not ingested)."""
    response = supabase.table('ingested_youtube_videos').select('media_id').eq('video_id', video_id).execute()
    rows = supa_all(response)
    print(rows)
    return rows[0]['media_id'] if len(rows) == 1 else None
# returns curation_ids that already have the video_id
def check_curations_with_video(video_id: str) -> List[str]:
    media_id = video_id_to_media_id(video_id)
    print(f"media_id {media_id}")
    if media_id is None:
        # video was never ingested, so no curation can contain it
        return []
    rows = supa_all(
        supabase.table("junction_curations").select("curation_id").eq('media_id', media_id).execute()
    )
    return [row['curation_id'] for row in rows]
def yt2transcript(video_id):
    """Fetch and flatten the transcript for a youtube video id into one string."""
    print(f"\n\nstarting yt2transcript on id: {video_id}")
    # fragments look like [{'text': 'hey friends welcome to one little coder', 'start': 0.84, 'duration': 4.38}, ...]
    fragments = YouTubeTranscriptApi.get_transcript(video_id)
    transcript = ' '.join(fragment['text'] for fragment in fragments)
    print("got transcript")
    # TODO if there is no transcript (how likely is this?), run through whisper-large on hf (but 30k free characters per month)
    # TODO ought to store timestamp of chunks in metadata for better Sources.
    # - instead of splitting transcript into chunks, can i merge these fragments into approp size? langchain has merge func
    return transcript
def yt_id2name(video_id: str) -> str:
    """Resolve a youtube video id to its title via pytube."""
    return YouTube(f"https://www.youtube.com/watch?v={video_id}").title
# db guarantees name is unique across rows
def curation_name2id() -> dict:
    """Map curation name -> curation_id for every curation in the db."""
    rows = supa_all(supabase.table("curations_metadata").select("curation_id, name").execute())
    return {row['name']: row['curation_id'] for row in rows}
def get_curation_names():
    """All curation names currently in the db."""
    return list(curation_name2id())
def get_curations_and_videos():
    """Map curation name -> list of video names it contains (join through media table)."""
    rows = supa_all(supabase.table("curations_metadata").select("curation_id, name, media_id:ingested_youtube_videos ( video_name )").execute())
    by_name = {}
    for row in rows:
        # `media_id` here is the joined list of video rows, not a scalar id
        for medium in row['media_id']:
            by_name.setdefault(row['name'], []).append(medium['video_name'])
    return by_name
def gen_curation_md():
    """Render every curation and its videos as a markdown outline."""
    sections = []
    for curation_name, video_names in get_curations_and_videos().items():
        sections.append(f"\n## {curation_name}\n" + "1. " + "\n1. ".join(video_names))
    return "".join(sections)
def ingest_video(video_id: str, selected_curation_names: List[str], new_curation: str = ""):
    # Ingest a youtube video into the selected (and/or newly created) curations.
    # Returns a 4-tuple for gradio: (status markdown, update for the "ask" checkbox
    # group, update for the "add" checkbox group, update for the overview markdown).
    # Side effects: may insert rows in supabase (curations_metadata,
    # ingested_youtube_videos, ingested_transcripts, junction_curations,
    # junction_vectors) and upsert/update vectors in Pinecone.
    video_id = video_id.strip()
    if new_curation:
        curcur = curation_name2id()
        if new_curation in curcur.keys():
            return "dupe curation name", gr.update(), gr.update(), gr.update()
        # add to db here, which will autogen the id
        supabase.table("curations_metadata").insert({"name": new_curation}).execute()
        selected_curation_names.append(new_curation)
    if not selected_curation_names:  # contains new_curation at this point
        return "need >=1 curations", gr.update(), gr.update(), gr.update()
    cur_dict = curation_name2id()
    selected_curation_ids = [cur_dict[n] for n in selected_curation_names]
    existing_curations_with_video = check_curations_with_video(video_id)
    # only add the video to selected curations that don't already contain it
    curations_to_add_video_to = list(set(selected_curation_ids).difference(set(existing_curations_with_video)))
    goal_curations_with_video = existing_curations_with_video + curations_to_add_video_to
    if not curations_to_add_video_to:  # video already in all selected curations
        return "dupe video", gr.update(), gr.update(), gr.update()
    if len(existing_curations_with_video) == 0:  # no curations have the video, we need to add it to vector db
        assert(goal_curations_with_video == curations_to_add_video_to)  # this should be true in this case
        print("new video, processing\n")
        try:
            video_name = yt_id2name(video_id)
        except Exception as e:
            # TODO undo new_curation create supabase.table("curations_metadata").insert({"name": new_curation}).execute()
            # - in all try/catches. maybe have upper try/catch to do this in one place. extract
            return f"Error loading video with id '{video_id}'. Exception: {e}", gr.update(), gr.update(), gr.update()
        try:
            transcript = yt2transcript(video_id)
        except Exception as e:
            return f"Error fetching transcripts for video with id '{video_id}'. Exception: {e}", gr.update(), gr.update(), gr.update()
        chunks = transcript2chunks(transcript)
        # every chunk carries the same metadata, incl. all curations the video belongs to
        metadatas = [{'video_id': video_id, 'video_name': video_name, 'curation_ids': goal_curations_with_video} for c in chunks]  # *len() was buggy?
        #import pprint
        #for i, c in enumerate(chunks):
        #    print(f"{i}: {c}")
        #print(metadata)
        print("embedding & uploading to vector db TODO how to get progress from langchain?\n")
        # TODO consider storing chunk text in supabase - maybe get more storage out of pinecone's s1 if supabase's free tier is sufficient
        chunk_ids = Vdb.add_texts(chunks, metadatas)
        # mirror the new video into supabase bookkeeping tables
        print("bookkeeping supabase with new video\n")
        inserted_row = supa_all(supabase.table("ingested_youtube_videos").insert({"video_id": video_id,
                                                                                 "video_name": video_name}).execute())[0]
        data = supabase.table("ingested_transcripts").insert({'source_id': inserted_row['media_id'],
                                                              'num_chunks': len(chunks),
                                                              'embedding_model': str(Model_name),
                                                              'transcribed_by': 'youtube_transcript_api'}).execute()
        print("\t- transcripts\n")
        data = supabase.table('junction_curations').insert([{'curation_id': c, 'media_id': inserted_row['media_id']} for c in goal_curations_with_video]).execute()
        print("\t- curations\n")
        data = supabase.table('junction_vectors').insert( [{'chunk_id': c, 'media_id': inserted_row['media_id']} for c in chunk_ids ]).execute()
        print("\t- vectors\n")
    else:  # some curations already have the video, so no need to chunk+embed+insert into vector db. just adjust bookkeeping in vector db + supa
        print("video already in vector db, updating metadata to include selected curations\n")
        # get media_id of given video
        media_id = video_id_to_media_id(video_id)
        # get chunk_ids for the video
        chunk_rows = supa_all(supabase.table("junction_vectors").select("chunk_id").eq('media_id', media_id).execute())
        # then update metadata of both supabase and vectorDB to include new curations
        for r in chunk_rows:
            update_response = Pinecone_index.update(
                id=r['chunk_id'],
                set_metadata={'curation_ids': goal_curations_with_video}
            )
            # TODO error check update_response
        data = supabase.table('junction_curations').insert([{'curation_id': c, 'media_id': media_id} for c in curations_to_add_video_to]).execute()
        #curation_ids = [cur_dict[name] for name in curations_to_add_video_to]
    status = "Status: Done! Video added, thanks for contributing :D"
    return status, gr.update(choices=get_curation_names()), gr.update(choices=get_curation_names()), gr.update(value=gen_curation_md())
def query_llm(prompt):
    """Send `prompt` to text-davinci-003 and return the stripped completion text.

    Assumes openai.api_key has already been set by the caller."""
    response = openai.Completion.create(
        model='text-davinci-003',
        prompt=prompt,
        temperature=0,      # deterministic answers
        max_tokens=400,
        top_p=1,
        frequency_penalty=0,
        presence_penalty=0,
        #stop=stop_sequence,
    )
    return response["choices"][0]["text"].strip()
# this needn't be in hf space, as it will just call out to openai and the db
# but why not host it here since it's free vs replits 2 cents/day
def ask_question(question: str, openai_apikey: str, curation_names: List[str]):
    """Answer `question` from the top-k transcript chunks of the selected curations.

    Returns (answer_markdown, sources_markdown)."""
    if not question or not openai_apikey or not curation_names:
        return "error: need all inputs", ""
    openai.api_key = openai_apikey
    # query vector db for topk chunks
    # can't use langchain bc we are using pinecone metadata filtering
    q_embedding = Embedder(question)
    name2id = curation_name2id()
    curation_ids = [name2id[name] for name in curation_names]
    results = Pinecone_index.query(vector=q_embedding, filter={'curation_ids': {"$in": curation_ids}}, top_k=5, include_metadata=True)
    # TODO add filters to langchain's pinecone impl?
    sources = {}
    chunks = []
    for match in results['matches']:
        chunk_text = match['metadata']['text']
        chunks.append(chunk_text)
        # group chunks by the video they came from for the Sources section
        sources.setdefault(match['metadata']['video_name'], []).append(chunk_text)
    source_sections = []
    for name, source_chunks in sources.items():
        source_sections.append(f"### {name}\n" + "\n\n---\n\n".join(str(c) for c in source_chunks))
    sources_md = "## Sources\n" + "\n\n".join(source_sections)
    # format prompt (textwrap to guarantee length?)
    instr = "Answer the question based on the context below, and if the question can't be answered based on the context, say 'I don't know'.\n\nContext:\n- "
    prompt = instr + "\n- ".join(chunks) + f"\n\nQuestion: {question}\n\nAnswer:"
    try:
        answer = "## Answer\n" + query_llm(prompt)
    except Exception as e:
        answer = f"Error: {e}"
    # query llm and return output and topk
    return answer, sources_md
# Gradio UI: three tabs (ask / browse / ingest) plus a global "synchronize" button
# that re-pulls curation data other users may have changed.
with gr.Blocks() as demo:
    # snapshot of curation names at app start; the refresh button re-syncs them
    curations_from_db = get_curation_names()
    refresh_button = gr.Button("Synchronize data (with other user's changes)")
    with gr.Tab("Ask a question"):
        q = gr.Textbox(label="Your question")
        openai_apikey = gr.Textbox(label="OpenAI API Key", type="password")
        curation_names_1 = gr.CheckboxGroup(choices=curations_from_db, label="Curations to query")
        button = gr.Button("Submit")
        answer = gr.Markdown(value="")
        sources = gr.Markdown(value="")
        button.click(ask_question, inputs=[q, openai_apikey, curation_names_1], outputs=[answer, sources])
    with gr.Tab("Browse & Organize Curations"):
        def refresh_curation_accordion():
            # regenerate the markdown overview of all curations
            output = gen_curation_md()
            return gr.update(value=output)
        #md.change(fn=refresh_curation_accordion, inputs=[curation_names_1], outputs=[md])
        # for name,id in curation_name2id().items():
        #     print(id,name,rows)
        #     accordions_state[name] = {'gr_obj': gr.Accordion(name), 'rows': []}
        #     with accordions_state[name]['gr_obj']:
        #         for i,medium in enumerate(row_d[id]):
        #             accordions_state[name]['rows'].append(gr.Row(variant='compact'))
        #             with accordions_state[name]['rows'][i]:
        #                 gr.Markdown(medium['video_name'])
        #delete_button = gr.Button("Delete from Curation")
        #delete_button.click(...)
        #refresh_button = gr.Button("Refresh curations")
        md = gr.Markdown(gen_curation_md())
        #refresh_button.click(fn=refresh_curation_accordion, inputs=[], outputs=[md])
    with gr.Tab("Add data to Curations"):
        gr.Markdown("An hour's worth of video seems to take about a minute to upload (ymmv).")
        video_id = gr.Textbox(label="Youtube video id (NOT full url)", placeholder="lvh3g7eszVQ")
        curation_names_2 = gr.CheckboxGroup(choices=curations_from_db, label="Add to existing Curations")
        new_curation = gr.Textbox(label="and/or add to new Curation")
        button = gr.Button("Submit")
        status_field = gr.Markdown()
        # ingest_video also pushes refreshed choices into both checkbox groups and the overview md
        submit_click = button.click(ingest_video, inputs=[video_id, curation_names_2, new_curation], outputs=[status_field, curation_names_1, curation_names_2, md])
        # TODO need to undo rdb and vdb state if cancel clicked
        #cancel_button = gr.Button("Cancel", cancels=[submit_click])
    def refresh_all_curation_lists():
        # re-pull curations for both checkbox groups plus the browse-tab markdown
        return gr.update(choices=get_curation_names()), gr.update(choices=get_curation_names()), gr.update(value=gen_curation_md())
    refresh_button.click(fn=refresh_all_curation_lists, inputs=[], outputs=[curation_names_1, curation_names_2, md])
demo.launch()