"""Streamlit app: semantic search over the FilledStacks YouTube channel.

Embeds a user query with a sentence-transformer, searches a Pinecone
index of transcript snippets, and renders the matches as HTML cards
with time-stamped links into the source videos.

NOTE(review): this file arrived with its formatting collapsed onto a few
physical lines and with the HTML fragments inside ``card``/``st.markdown``
stripped by extraction; the markup below is a reconstruction — confirm
against the original template.
"""
import streamlit as st
import pinecone
from sentence_transformers import SentenceTransformer
import logging
import openai
import gradio as gr  # NOTE(review): unused in this chunk; kept in case another part of the file needs it

PINECONE_KEY = st.secrets["PINECONE_KEY"]  # app.pinecone.io
OPENAI_KEY = None  # st.secrets["OPENAI_KEY"]
INDEX_ID = 'filled-stacks-search'


@st.experimental_singleton
def init_openai():
    """Configure the OpenAI client once per session (no-op while OPENAI_KEY is None)."""
    openai.api_key = OPENAI_KEY


@st.experimental_singleton
def init_pinecone():
    """Connect to Pinecone and return a handle to the search index."""
    pinecone.init(api_key=PINECONE_KEY, environment="us-west1-gcp")
    return pinecone.Index(INDEX_ID)


@st.experimental_singleton
def init_retriever():
    """Load the sentence-transformer model used to embed user queries."""
    return SentenceTransformer("multi-qa-mpnet-base-dot-v1")


def make_query(query, retriever, top_k=3, include_values=True,
               include_metadata=True, filter=None):
    """Embed ``query`` and run it against the Pinecone index.

    Retries up to 3 times, re-initializing the Pinecone connection after
    a failure (the hosted connection can go stale between reruns).

    Returns the list of match dicts, or ``[]`` if every attempt failed.
    """
    xq = retriever.encode([query]).tolist()
    logging.info(f"Query: {query}")
    matches = []
    for attempt in range(3):
        try:
            xc = st.session_state.index.query(
                xq, top_k=top_k,
                include_values=include_values,
                include_metadata=include_metadata,
                filter=filter
            )
            matches = xc['matches']
            break
        except Exception:  # was a bare `except:` — don't swallow KeyboardInterrupt/SystemExit
            logging.exception("Pinecone query failed on attempt %d, reconnecting", attempt + 1)
            # force reload of the index connection before retrying
            pinecone.init(api_key=PINECONE_KEY, environment="us-west1-gcp")
            st.session_state.index = pinecone.Index(INDEX_ID)
    if len(matches) == 0:
        logging.error("Query failed")
    return matches


def get_prompt(matches, query=None):
    """Build an OpenAI completion prompt from retrieved contexts.

    Joins as many match texts as fit under a ~3750-character budget,
    then appends the question.

    :param matches: match dicts from :func:`make_query` (uses ``metadata.text``).
    :param query: the question text; defaults to the module-level ``query``
        for backward compatibility with the original implicit-global version.
    """
    if query is None:
        # fall back to the module-level variable the original code relied on
        query = globals().get("query", "")
    contexts = [m['metadata']['text'] for m in matches]
    prompt_start = (
        "Answer the question based on the context below.\n\n"
        "Context:\n"
    )
    prompt_end = f"\n\nQuestion: {query}\nAnswer:"
    limit = 3750
    # Default: use everything. This also fixes the original UnboundLocalError
    # when there were 0 or 1 contexts (the loop below never assigned `prompt`).
    prompt = prompt_start + "\n\n--\n\n".join(contexts) + prompt_end
    for i in range(1, len(contexts)):
        if len("\n\n--\n\n".join(contexts[:i])) >= limit:
            # over budget: keep only the contexts that fit
            prompt = prompt_start + "\n\n--\n\n".join(contexts[:i - 1]) + prompt_end
            break
    return prompt


st.session_state.index = init_pinecone()
retriever = init_retriever()


def card(thumbnail: str, title: str, urls: list, contexts: list,
         starts: list, ends: list):
    """Render one video result as an HTML card of time-stamped snippets.

    Parallel lists ``urls``/``contexts``/``starts``/``ends`` describe each
    matched snippet; overlapping snippets are merged into a running passage.
    """
    meta = [(e, s, u, c) for e, s, u, c in zip(ends, starts, urls, contexts)]
    meta.sort(reverse=False)  # chronological order (sorted by end time first)
    text_content = []
    current_start = 0
    current_end = 0
    for end, start, url, context in meta:
        # Reformat seconds into an MM:SS timestamp. divmod is exact; the
        # original float arithmetic could truncate 3-digit minute counts
        # and round up to an impossible ":60".
        mins, secs = divmod(int(start), 60)
        timestamp = f"{mins:02d}:{secs:02d}"
        if current_start < start < current_end:
            # Continuation of the previous snippet: cut the overlapping
            # tail off the previous entry before appending this one.
            text_content[-1][0] = text_content[-1][0].split(context[:10])[0]
            text_content.append([f"[{timestamp}] {context.capitalize()}", url])
        else:
            # New passage: start it on a fresh line.
            text_content.append(["xxLINEBREAKxx", ""])
            text_content.append([f"[{timestamp}] {context}", url])
        current_start = start
        current_end = end
    html_text = ""
    for text, url in text_content:
        if text == "xxLINEBREAKxx":
            # NOTE(review): original markup stripped by extraction;
            # a paragraph break is the presumed intent — confirm.
            html_text += "<br><br>"
        else:
            # NOTE(review): snippet link markup reconstructed — confirm.
            html_text += f"<small><a href={url}>{text.strip()}... </a></small>"
    # NOTE(review): the card template was lost in extraction; this is a
    # reconstruction (thumbnail linked to the first timestamp, the video
    # title, then the snippet text) — confirm against the original.
    html = f"""
    <div class="container-fluid">
        <div class="row align-items-start">
            <div class="col-md-4 col-sm-4">
                <div class="row position-relative">
                    <a href={urls[0]}><img src={thumbnail} class="img-fluid" style="width:192px; height:106px"></a>
                </div>
            </div>
            <div class="col-md-8 col-sm-8">
                <h2>{title}</h2>
            </div>
        </div>
        <div>
            {html_text}
        </div>
    </div>
    """
    return st.markdown(html, unsafe_allow_html=True)


st.write("""
# FilledStacks Search
""")

st.info("""
Ask a question about the FilledStacks YouTube Channel
""")

# NOTE(review): this block originally injected CSS/markup (lost in
# extraction) — restore the stylesheet content if available.
st.markdown("""
""", unsafe_allow_html=True)

query = st.text_input("", "", placeholder="e.g.: how does stacked work?")

if query != "":
    print(f"query: {query}")
    matches = make_query(
        query, retriever, top_k=5,
    )
    # Optional answer summarization, disabled until an OpenAI key is set.
    # if st.session_state.summarize:
    #     if OPENAI_KEY is not None:
    #         prompt = get_prompt(matches)
    #         res = openai.Completion.create(
    #             engine='text-davinci-003',
    #             prompt=prompt,
    #             temperature=0,
    #             max_tokens=300,
    #             top_p=1,
    #             frequency_penalty=0,
    #             presence_penalty=0,
    #             stop=".",
    #         )
    #         summary = res['choices'][0]['text'].strip()
    #         st.info(f"Summary:\n{summary}")
    #     else:
    #         st.info("Please enter your OpenAI key to generate a summary")

    # Group matches by video, preserving first-match order for display.
    results = {}
    order = []
    for context in matches:
        meta = context['metadata']
        video_id = meta['url'].split('/')[-1]
        start = int(meta['start'])
        timestamped_url = f"{meta['url']}?t={start}"
        if video_id not in results:
            results[video_id] = {
                'title': meta['title'],
                'urls': [timestamped_url],
                'contexts': [meta['text']],
                'starts': [start],
                'ends': [int(meta['end'])]
            }
            order.append(video_id)
        else:
            results[video_id]['urls'].append(timestamped_url)
            results[video_id]['contexts'].append(meta['text'])
            results[video_id]['starts'].append(start)
            results[video_id]['ends'].append(int(meta['end']))
    # now display the cards
    for video_id in order:
        card(
            thumbnail=f"https://img.youtube.com/vi/{video_id}/maxresdefault.jpg",
            title=results[video_id]['title'],
            urls=results[video_id]['urls'],
            contexts=results[video_id]['contexts'],
            starts=results[video_id]['starts'],
            ends=results[video_id]['ends']
        )