# same Embedded update as before in diff location (commit c4ca511)
# prioritized todos
# supabase to store index (apparently can't rely on vector db to do it?) and user's curations / popular curations
# - paused after 1 week inactivity (and i believe pinecone index DELETED after some days of inactivity?!)
# - - TODO backup both pinecone and supabase daily (this should count as the activity), and make publicly accessible
# TODO add discord/github/google auth...via custom js? see supabase docs
# - make users maintainers of their own curations, restrict add perms, introduce edit/delete/clone perms
# - add stars to curations+users profile -> display starred curations first, then sort by most popular
# - securely store user's openai key in supabase for convenience
# TODO better ai arch
# - eg let user customize instr (via langchain's jinja support?)
# - better: make easy to experiment with langchain's chains/agents
# - maybe something like model_laboratory with gradio's Parallel block?
# - account for mpnet's limit of 384 word pieces per chunk (is it done already?)
# - - more deliberate chunking strat in general
# TODO summarize a vid (and optionally add to curation)
# TODO support yt playlists and yt channels in addition to just one-off videos
# - can i make this really easy to add via a well designed api?
# unprioritized todos
# TODO some inline todos below that should reduce need to reset/rollback DBs
# - how to easily rollback bad data?
# TODO harrison thinks editing vectorDB abstraction to consume Embedding class vs func is a good approach -> need to PR this
# TODO can i generalize the query filter approach (add to langchain?) to remove coupling to pinecone?
# - i believe elastic8.5 supports rdb and vdb, but need nontrivial specs to run it i think
# TODO user prefs data model (their curations)
# TODO finalize deployment strategy
# - supabase free tier for db + blob storage of transcripts
# - hf space to host model computations (langchain bits need to run here)
# - replit or supabase to host edge functions to call hf space
# TODO gradio global state to track recently asked questions from everyone
# TODO create pinecone index without indexing text metadata field for performance: https://docs.pinecone.io/docs/manage-indexes#selective-metadata-indexing
# TODO could use pinecone namespace per embedding model
# TODO deploy txtai to fly.io free tier? not sure compute reqs
# - or haystack?
# - both these come with many features out of the box
import os
import json
import uuid
import openai
import spacy
import en_core_web_sm
import gradio as gr
from gradio import blocks
import pinecone
from supabase import create_client, Client
from langchain.vectorstores import Pinecone
from langchain.text_splitter import SpacyTextSplitter
from langchain.embeddings import HuggingFaceEmbeddings
from pytube import YouTube
from youtube_transcript_api import YouTubeTranscriptApi
from typing import Any, Callable, Dict, Iterable, List, Optional
# Shared embedding backend served through LangChain's HuggingFaceEmbeddings
# wrapper. Instantiate it ONCE and reuse: each construction loads the
# sentence-transformers model weights (the original built it twice).
_hf_embeddings = HuggingFaceEmbeddings()
Embedder = _hf_embeddings.embed_query   # callable: text -> embedding vector
Model_name = _hf_embeddings.model_name  # recorded per transcript in supabase

# External service credentials come from the environment; create_client and
# pinecone.init fail fast if they are missing or invalid.
PINECONE_APIKEY: str = os.environ.get("PINECONE_APIKEY")
SUPABASE_URL: str = os.environ.get("SUPABASE_URL")
SUPABASE_KEY: str = os.environ.get("SUPABASE_KEY")
supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY)
pinecone.init(
    api_key=PINECONE_APIKEY,
    environment='us-west1-gcp'  # only option for free tier
)
class MyPinecone(Pinecone):
    """Pinecone vectorstore whose embedding hook is a plain callable.

    Overrides ``add_texts`` so each text is embedded via
    ``self._embedding_function`` (a bare function, not an Embeddings
    object) before a single batched upsert into the index.
    """

    def add_texts(
        self, texts: Iterable[str], metadatas: Optional[List[dict]] = None
    ) -> List[str]:
        """Run more texts through the embeddings and add to the vectorstore.

        Args:
            texts: Iterable of strings to add to the vectorstore.
            metadatas: Optional list of metadatas associated with the texts.

        Returns:
            List of generated uuid4 ids, one per input text, in input order.
        """
        # Embed and assemble (id, vector, metadata) triples for upsert.
        docs = []
        ids = []
        for i, text in enumerate(texts):
            doc_id = str(uuid.uuid4())  # renamed from `id` to avoid shadowing the builtin
            embedding = self._embedding_function(text)
            metadata = metadatas[i] if metadatas else {}
            # Store the chunk text itself under the configured metadata key so
            # queries can return the original text alongside the vector.
            metadata[self._text_key] = text
            docs.append((doc_id, embedding, metadata))
            ids.append(doc_id)
        # Single batched upsert to Pinecone.
        self._index.upsert(vectors=docs)
        return ids
# Raw pinecone index handle plus the langchain-style vectorstore wrapper used
# for ingestion; "text" is the metadata key under which chunk text is stored.
Pinecone_index = pinecone.Index('semantic-curations')
Vdb = MyPinecone(Pinecone_index, Embedder, "text")
def supa_all(supa_data) -> List[dict]:
    """Return the ``data`` rows of a supabase-py query response.

    The response is round-tripped through its JSON representation so the
    caller gets plain dicts/lists rather than client model objects.
    """
    return json.loads(supa_data.json())['data']
def transcript2chunks(transcript):
    """Split a full transcript string into ~2000-char chunks using spaCy."""
    print("starting transcript2chunks")
    # TODO what's a good chunk_size?
    # TODO should store as metadata in dbs
    chunks = SpacyTextSplitter(chunk_size=2000).split_text(transcript)
    print("finished chunking")
    return chunks
def video_id_to_media_id(video_id: str) -> Optional[str]:
    """Look up the supabase media_id of an already-ingested video, or None."""
    rows = supa_all(
        supabase.table('ingested_youtube_videos')
        .select('media_id')
        .eq('video_id', video_id)
        .execute()
    )
    print(rows)
    # Exactly one row means the video was ingested before.
    return rows[0]['media_id'] if len(rows) == 1 else None
# returns curation_ids that already have the video_id
def check_curations_with_video(video_id: str) -> List[str]:
    """Return the id of every curation already containing this video."""
    media_id = video_id_to_media_id(video_id)
    print(f"media_id {media_id}")
    if media_id is None:
        # Video was never ingested, so no curation can contain it.
        return []
    rows = supa_all(
        supabase.table("junction_curations")
        .select("curation_id")
        .eq('media_id', media_id)
        .execute()
    )
    return [row['curation_id'] for row in rows]
def yt2transcript(video_id):
    """Fetch the YouTube transcript for ``video_id`` and flatten it to one string."""
    print(f"\n\nstarting yt2transcript on id: {video_id}")
    # fragments look like [{'text': 'hey friends welcome to one little coder', 'start': 0.84, 'duration': 4.38}, ...]
    fragments = YouTubeTranscriptApi.get_transcript(video_id)
    transcript = ' '.join(fragment['text'] for fragment in fragments)
    print("got transcript")
    # TODO if there is no transcript (how likely is this?), run through whisper-large on hf (but 30k free characters per month)
    # TODO ought to store timestamp of chunks in metadata for better Sources.
    # - instead of splitting transcript into chunks, can i merge these fragments into approp size? langchain has merge func
    return transcript
def yt_id2name(video_id: str) -> str:
    """Resolve a YouTube video id to its human-readable title via pytube."""
    url = f"https://www.youtube.com/watch?v={video_id}"
    return YouTube(url).title
# db guarantees name is unique across rows
def curation_name2id() -> dict:
    """Return a mapping of curation name -> curation_id from supabase."""
    rows = supa_all(
        supabase.table("curations_metadata").select("curation_id, name").execute()
    )
    return {row['name']: row['curation_id'] for row in rows}
def get_curation_names():
    """Return the list of all curation names."""
    return list(curation_name2id())
def get_curations_and_videos():
    """Map curation name -> list of video names it contains.

    Curations with no videos yield no key (the supabase join returns an
    empty media list for them, so nothing is added).
    """
    rows = supa_all(
        supabase.table("curations_metadata")
        .select("curation_id, name, media_id:ingested_youtube_videos ( video_name )")
        .execute()
    )
    grouped = {}
    for row in rows:
        for medium in row['media_id']:
            grouped.setdefault(row['name'], []).append(medium['video_name'])
    return grouped
def gen_curation_md():
    """Render every curation and its videos as one markdown document."""
    sections = []
    for name, video_names in get_curations_and_videos().items():
        # One "## <curation>" heading followed by a numbered list of videos.
        sections.append(f"\n## {name}\n" + "1. " + "\n1. ".join(video_names))
    return "".join(sections)
def ingest_video(video_id: str, selected_curation_names: List[str], new_curation: str = ""):
    """Ingest a YouTube video into one or more curations.

    Fetches + chunks + embeds the transcript into Pinecone (only when the
    video is new) and records bookkeeping rows in supabase. NOTE(review):
    there is no rollback — a failure partway through can leave Pinecone and
    supabase inconsistent (see inline TODOs).

    Returns a 4-tuple matching the Gradio outputs: (status message, update
    for the "ask" checkbox group, update for the "add" checkbox group,
    update for the curations markdown listing).
    """
    video_id = video_id.strip()
    if new_curation:
        curcur = curation_name2id()
        if new_curation in curcur.keys():
            return "dupe curation name", gr.update(), gr.update(), gr.update()
        # add to db here, which will autogen the id
        supabase.table("curations_metadata").insert({"name": new_curation}).execute()
        selected_curation_names.append(new_curation)
    if not selected_curation_names: # contains new_curation at this point
        return "need >=1 curations", gr.update(), gr.update(), gr.update()
    cur_dict = curation_name2id()
    selected_curation_ids = [cur_dict[n] for n in selected_curation_names]
    existing_curations_with_video = check_curations_with_video(video_id)
    # Only the curations that don't already contain the video need changes.
    curations_to_add_video_to = list(set(selected_curation_ids).difference(set(existing_curations_with_video)))
    goal_curations_with_video = existing_curations_with_video + curations_to_add_video_to
    if not curations_to_add_video_to: # video already in all selected curations
        return "dupe video", gr.update(), gr.update(), gr.update()
    if len(existing_curations_with_video) == 0: # no curations have the video, we need to add it to vector db
        assert(goal_curations_with_video == curations_to_add_video_to) # this should be true in this case
        print("new video, processing\n")
        try:
            video_name = yt_id2name(video_id)
        except Exception as e:
            # TODO undo new_curation create supabase.table("curations_metadata").insert({"name": new_curation}).execute()
            # - in all try/catches. maybe have upper try/catch to do this in one place. extract
            return f"Error loading video with id '{video_id}'. Exception: {e}", gr.update(), gr.update(), gr.update()
        try:
            transcript = yt2transcript(video_id)
        except Exception as e:
            return f"Error fetching transcripts for video with id '{video_id}'. Exception: {e}", gr.update(), gr.update(), gr.update()
        chunks = transcript2chunks(transcript)
        # Every chunk carries the full curation id list so pinecone metadata
        # filtering can scope queries to curations.
        metadatas = [{'video_id': video_id, 'video_name': video_name, 'curation_ids': goal_curations_with_video} for c in chunks] # *len() was buggy?
        #import pprint
        #for i, c in enumerate(chunks):
        # print(f"{i}: {c}")
        #print(metadata)
        print("embedding & uploading to vector db TODO how to get progress from langchain?\n")
        # TODO consider storing chunk text in supabase - maybe get more storage out of pinecone's s1 if supabase's free tier is sufficient
        chunk_ids = Vdb.add_texts(chunks, metadatas)
        print("bookkeeping supabase with new video\n")
        # Insert the video row first so its autogenerated media_id can key the
        # transcript, curation-junction, and vector-junction rows below.
        inserted_row = supa_all(supabase.table("ingested_youtube_videos").insert({"video_id": video_id,
                                                                                  "video_name": video_name}).execute())[0]
        data = supabase.table("ingested_transcripts").insert({'source_id': inserted_row['media_id'],
                                                              'num_chunks': len(chunks),
                                                              'embedding_model': str(Model_name),
                                                              'transcribed_by': 'youtube_transcript_api'}).execute()
        print("\t- transcripts\n")
        data = supabase.table('junction_curations').insert([{'curation_id': c, 'media_id': inserted_row['media_id']} for c in goal_curations_with_video]).execute()
        print("\t- curations\n")
        data = supabase.table('junction_vectors').insert( [{'chunk_id': c, 'media_id': inserted_row['media_id']} for c in chunk_ids ]).execute()
        print("\t- vectors\n")
    else: # some curations already have the video, so no need to chunk+embed+insert into vector db. just adjust bookkeeping in vector db + supa
        print("video already in vector db, updating metadata to include selected curations\n")
        # get media_id of given video
        media_id = video_id_to_media_id(video_id)
        # get chunk_ids for the video
        chunk_rows = supa_all(supabase.table("junction_vectors").select("chunk_id").eq('media_id', media_id).execute())
        # then update metadata of both supabase and vectorDB to include new curations
        for r in chunk_rows:
            update_response = Pinecone_index.update(
                id=r['chunk_id'],
                set_metadata={'curation_ids': goal_curations_with_video}
            )
            # TODO error check update_response
        data = supabase.table('junction_curations').insert([{'curation_id': c, 'media_id': media_id} for c in curations_to_add_video_to]).execute()
    #curation_ids = [cur_dict[name] for name in curations_to_add_video_to]
    status = "Status: Done! Video added, thanks for contributing :D"
    return status, gr.update(choices=get_curation_names()), gr.update(choices=get_curation_names()), gr.update(value=gen_curation_md())
def query_llm(prompt):
    """Send ``prompt`` to OpenAI text-davinci-003 and return the stripped completion."""
    response = openai.Completion.create(
        model='text-davinci-003',
        prompt=prompt,
        temperature=0,  # deterministic answers
        max_tokens=400,
        top_p=1,
        frequency_penalty=0,
        presence_penalty=0,
        #stop=stop_sequence,
    )
    #print(response)
    return response["choices"][0]["text"].strip()
# this needn't be in hf space, as it will just call out to openai and the db
# but why not host it here since it's free vs replits 2 cents/day
def ask_question(question: str, openai_apikey: str, curation_names: List[str]):
    """Answer ``question`` from the selected curations' transcript chunks.

    Returns a pair of markdown strings: (answer, sources).
    """
    if not (question and openai_apikey and curation_names):
        return "error: need all inputs", ""
    openai.api_key = openai_apikey
    # Query the vector db directly rather than through langchain, because we
    # rely on pinecone metadata filtering to scope results to the curations.
    q_embedding = Embedder(question)
    name2id = curation_name2id()
    curation_ids = [name2id[name] for name in curation_names]
    results = Pinecone_index.query(
        vector=q_embedding,
        filter={'curation_ids': {"$in": curation_ids}},
        top_k=5,
        include_metadata=True,
    )
    #pprint.pprint(results)
    # TODO add filters to langchain's pinecone impl?
    chunks = []
    sources = {}
    for match in results['matches']:
        text = match['metadata']['text']
        chunks.append(text)
        # Group the retrieved chunks under their source video's name.
        sources.setdefault(match['metadata']['video_name'], []).append(text)
    sources_md = "## Sources\n" + "\n\n".join(
        f"### {name}\n" + "\n\n---\n\n".join(f'{c}' for c in vid_chunks)
        for name, vid_chunks in sources.items()
    )
    # format prompt (textwrap to guarantee length?)
    instr = "Answer the question based on the context below, and if the question can't be answered based on the context, say 'I don't know'.\n\nContext:\n- "
    prompt = instr + "\n- ".join(chunks) + f"\n\nQuestion: {question}\n\nAnswer:"
    #pprint.pprint(prompt)
    try:
        answer = "## Answer\n" + query_llm(prompt)
    except Exception as e:
        answer = f"Error: {e}"
    # query llm and return output and topk
    return answer, sources_md
# Gradio UI: three tabs (ask / browse / add) plus a global "synchronize"
# button that re-reads curation state from supabase into every component.
with gr.Blocks() as demo:
    curations_from_db = get_curation_names()
    refresh_button = gr.Button("Synchronize data (with other user's changes)")
    with gr.Tab("Ask a question"):
        q = gr.Textbox(label="Your question")
        openai_apikey = gr.Textbox(label="OpenAI API Key", type="password")
        curation_names_1 = gr.CheckboxGroup(choices=curations_from_db, label="Curations to query")
        button = gr.Button("Submit")
        answer = gr.Markdown(value="")
        sources = gr.Markdown(value="")
        button.click(ask_question, inputs=[q, openai_apikey, curation_names_1], outputs=[answer, sources])
    with gr.Tab("Browse & Organize Curations"):
        def refresh_curation_accordion():
            # Regenerate the curation -> videos markdown listing.
            output = gen_curation_md()
            return gr.update(value=output)
        #md.change(fn=refresh_curation_accordion, inputs=[curation_names_1], outputs=[md])
        # for name,id in curation_name2id().items():
        # print(id,name,rows)
        # accordions_state[name] = {'gr_obj': gr.Accordion(name), 'rows': []}
        # with accordions_state[name]['gr_obj']:
        # for i,medium in enumerate(row_d[id]):
        # accordions_state[name]['rows'].append(gr.Row(variant='compact'))
        # with accordions_state[name]['rows'][i]:
        # gr.Markdown(medium['video_name'])
        #delete_button = gr.Button("Delete from Curation")
        #delete_button.click(...)
        #refresh_button = gr.Button("Refresh curations")
        md = gr.Markdown(gen_curation_md())
        #refresh_button.click(fn=refresh_curation_accordion, inputs=[], outputs=[md])
    with gr.Tab("Add data to Curations"):
        gr.Markdown("An hour's worth of video seems to take about a minute to upload (ymmv).")
        video_id = gr.Textbox(label="Youtube video id (NOT full url)", placeholder="lvh3g7eszVQ")
        curation_names_2 = gr.CheckboxGroup(choices=curations_from_db, label="Add to existing Curations")
        new_curation = gr.Textbox(label="and/or add to new Curation")
        button = gr.Button("Submit")
        status_field = gr.Markdown()
        # ingest_video also updates the other tabs' checkbox choices and the
        # browse tab's markdown, so those components appear as outputs here.
        submit_click = button.click(ingest_video, inputs=[video_id, curation_names_2, new_curation], outputs=[status_field, curation_names_1, curation_names_2, md])
        # TODO need to undo rdb and vdb state if cancel clicked
        #cancel_button = gr.Button("Cancel", cancels=[submit_click])
    def refresh_all_curation_lists():
        # One gr.update per output component wired on the click below.
        return gr.update(choices=get_curation_names()), gr.update(choices=get_curation_names()), gr.update(value=gen_curation_md())
    refresh_button.click(fn=refresh_all_curation_lists, inputs=[], outputs=[curation_names_1, curation_names_2, md])
demo.launch()