# Scraped page header -- Spaces status: Runtime error; file size: 6,682 bytes.
import streamlit as st
import pinecone
from sentence_transformers import SentenceTransformer
import logging
import openai
import gradio as gr
# Pinecone API key, read from the Streamlit secrets store.
PINECONE_KEY = st.secrets["PINECONE_KEY"] # app.pinecone.io
# OpenAI key is currently unset; the secrets lookup that would supply it is
# left commented out on the next line.
OPENAI_KEY = None
# st.secrets["OPENAI_KEY"]
# Name of the Pinecone index this app queries.
INDEX_ID = 'filled-stacks-search'
@st.experimental_singleton
def init_openai():
    """Assign the module-level ``OPENAI_KEY`` to ``openai.api_key``.

    Returns nothing; the only effect is setting the attribute on the
    ``openai`` module.
    """
    openai.api_key = OPENAI_KEY
@st.experimental_singleton
def init_pinecone():
    """Initialise the Pinecone client and return a handle to ``INDEX_ID``.

    Returns:
        The ``pinecone.Index`` object for the configured index name.
    """
    pinecone.init(api_key=PINECONE_KEY, environment="us-west1-gcp")
    index = pinecone.Index(INDEX_ID)
    return index
@st.experimental_singleton
def init_retriever():
    """Build the sentence-embedding model used to encode search queries.

    Returns:
        A ``SentenceTransformer`` constructed from the
        ``multi-qa-mpnet-base-dot-v1`` model name.
    """
    model_name = "multi-qa-mpnet-base-dot-v1"
    return SentenceTransformer(model_name)
def make_query(query, retriever, top_k=3, include_values=True, include_metadata=True, filter=None, max_attempts=3):
    """Embed ``query`` and search the Pinecone index, retrying on failure.

    The index handle is read from ``st.session_state.index``. When a query
    attempt raises, the Pinecone client is re-initialised, the handle in
    session state is replaced, and the query is tried again.

    Args:
        query: Text to search for.
        retriever: Object whose ``encode([query])`` result has ``.tolist()``.
        top_k: Number of matches requested from the index.
        include_values: Forwarded to the index ``query`` call.
        include_metadata: Forwarded to the index ``query`` call.
        filter: Forwarded to the index ``query`` call.
        max_attempts: How many query attempts to make before giving up.

    Returns:
        The ``'matches'`` entry of the query response, or an empty list when
        every attempt failed.
    """
    xq = retriever.encode([query]).tolist()
    logging.info(f"Query: {query}")
    matches = []
    for attempt in range(1, max_attempts + 1):
        try:
            xc = st.session_state.index.query(
                xq,
                top_k=top_k,
                include_values=include_values,
                include_metadata=include_metadata,
                filter=filter,
            )
            matches = xc['matches']
            break
        except Exception:
            # Catch Exception rather than using a bare ``except`` so that
            # KeyboardInterrupt / SystemExit and other BaseException
            # subclasses are not swallowed by the retry loop.
            logging.warning(
                "Pinecone query attempt %d of %d failed",
                attempt, max_attempts, exc_info=True,
            )
            # force reload
            pinecone.init(api_key=PINECONE_KEY, environment="us-west1-gcp")
            st.session_state.index = pinecone.Index(INDEX_ID)
    if not matches:
        logging.error("Query failed")
    return matches
def get_prompt(matches, question=None, limit=3750):
    """Build a question-answering prompt from the retrieved matches.

    Context passages are taken in order from ``x['metadata']['text']`` of
    each match and joined with a ``--`` separator. Passages are added only
    while the joined context stays strictly shorter than ``limit``
    characters; the first passage that would reach the limit, and all
    passages after it, are left out.

    Args:
        matches: Iterable of mappings with a ``['metadata']['text']`` entry.
        question: Question text placed at the end of the prompt. When
            omitted, the module-level ``query`` variable is used.
        limit: Upper bound (exclusive) on the joined context length.

    Returns:
        The prompt string. With no matches (or when nothing fits) the
        context section is empty rather than the call failing.
    """
    if question is None:
        # Backward-compatible fallback: callers that pass only ``matches``
        # rely on the module-level ``query`` set by the text input.
        question = query
    contexts = [x['metadata']['text'] for x in matches]
    prompt_start = (
        "Answer the question based on the context below.\n\n"
        "Context:\n"
    )
    prompt_end = f"\n\nQuestion: {question}\nAnswer:"
    separator = "\n\n--\n\n"

    kept = []
    used = 0  # length of separator.join(kept), tracked incrementally
    for text in contexts:
        added = len(text) + (len(separator) if kept else 0)
        if used + added >= limit:
            break
        kept.append(text)
        used += added
    return prompt_start + separator.join(kept) + prompt_end
# Keep the Pinecone index handle in session state; make_query reads it from
# there and replaces it after a failed query attempt.
st.session_state.index = init_pinecone()
# Embedding model passed to make_query to encode the user's question.
retriever = init_retriever()
def _format_timestamp(seconds):
    """Return ``seconds`` as a zero-padded ``MM:SS`` string (minutes may exceed 99)."""
    mins, secs = divmod(int(seconds), 60)
    return f"{mins:02d}:{secs:02d}"


def card(thumbnail: str, title: str, urls: list, contexts: list, starts: list, ends: list):
    """Render one video result as an HTML card via ``st.markdown``.

    The parallel lists describe the matched snippets of a single video.
    Snippets are ordered by end time; a snippet that starts inside the
    previous snippet's time span is treated as its continuation, and the
    previous text is cut where the new snippet's first ten characters begin.

    Args:
        thumbnail: Image URL shown on the card.
        title: Heading text of the card.
        urls: Link target for each snippet; ``urls[0]`` is also used for
            the thumbnail link.
        contexts: Snippet texts.
        starts: Snippet start times in seconds.
        ends: Snippet end times in seconds.

    Returns:
        Whatever ``st.markdown`` returns for the rendered HTML.

    Raises:
        IndexError: If ``urls`` is empty.
    """
    # Function-scope import so the block does not depend on a new
    # file-level import.
    from html import escape

    # Tuples lead with the end time, so sorting orders snippets by end time.
    meta = sorted(zip(ends, starts, urls, contexts))

    text_content = []
    current_start = 0
    current_end = 0
    for end, start, url, context in meta:
        timestamp = _format_timestamp(start)
        if current_start < start < current_end:
            # this means it is a continuation of the previous sentence
            overlap_marker = context[:10]
            if overlap_marker:
                # str.split("") raises ValueError, so only trim when the
                # snippet actually has text to search for.
                text_content[-1][0] = text_content[-1][0].split(overlap_marker)[0]
            text_content.append([f"[{timestamp}] {context.capitalize()}", url])
        else:
            text_content.append(["xxLINEBREAKxx", ""])
            text_content.append([f"[{timestamp}] {context}", url])
        current_start = start
        current_end = end

    parts = []
    for text, url in text_content:
        if text == "xxLINEBREAKxx":
            parts.append("<br>")
        else:
            # Escape index-supplied text and quote the attribute so markup
            # characters in a snippet or URL cannot break the card HTML.
            parts.append(
                f'<small><a href="{escape(url)}">{escape(text.strip())}... </a></small>'
            )
        print(text)
    html_text = "".join(parts)

    first_url = escape(urls[0])
    thumb_src = escape(thumbnail)
    safe_title = escape(title)
    # The markup lines are kept unindented inside the string literal.
    card_html = f"""
<div class="container-fluid">
<div class="row align-items-start">
<div class="col-md-4 col-sm-4">
<div class="position-relative">
<a href="{first_url}"><img src="{thumb_src}" class="img-fluid" style="width: 192px; height: 106px"></a>
</div>
</div>
<div class="col-md-8 col-sm-8">
<h2>{safe_title}</h2>
</div>
<div>
{html_text}
<br><br>
</div>
</div>
</div>
"""
    return st.markdown(card_html, unsafe_allow_html=True)
# Page title, written as a Markdown heading.
st.write("""
# FilledStacks Search
""")
# Short usage hint shown in an info box.
st.info("""
Ask a question about the FilledStacks YouTube Channel
""")
# Pull in the Bootstrap 4 stylesheet from the jsDelivr CDN; the result cards
# built by card() use Bootstrap class names. unsafe_allow_html=True is passed
# so the <link> tag is emitted as HTML.
st.markdown("""
<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/bootstrap@4.0.0/dist/css/bootstrap.min.css" integrity="sha384-Gn5384xqQ1aoWXA+058RXPxPg6fy4IWvTNh0E263XmFcJlSAwiGgFAW/dAiS6JXm" crossorigin="anonymous">
""", unsafe_allow_html=True)
# Search box: empty label and empty initial value, with a placeholder hint.
query = st.text_input("", "", placeholder="e.g.: how does stacked work?")
if query:
    print(f"query: {query}")
    matches = make_query(query, retriever, top_k=5)
    # Disabled: OpenAI summary of the retrieved contexts.
    # if st.session_state.summarize:
    #     if OPENAI_KEY is not None:
    #         prompt = get_prompt(matches)
    #         res = openai.Completion.create(
    #             engine='text-davinci-003',
    #             prompt=prompt,
    #             temperature=0,
    #             max_tokens=300,
    #             top_p=1,
    #             frequency_penalty=0,
    #             presence_penalty=0,
    #             stop=".",
    #         )
    #         summary = res['choices'][0]['text'].strip()
    #         st.info(f"Summary:\n{summary}")
    #     else:
    #         st.info("Please enter your OpenAI key to generate a summary")

    # Group the matched snippets by video, remembering the order in which
    # each video first appeared in the results.
    results = {}
    order = []
    for context in matches:
        metadata = context['metadata']
        video_id = metadata['url'].split('/')[-1]
        if video_id not in results:
            results[video_id] = {
                'title': metadata['title'],
                'urls': [],
                'contexts': [],
                'starts': [],
                'ends': [],
            }
            order.append(video_id)
        entry = results[video_id]
        entry['urls'].append(f"{metadata['url']}?t={int(metadata['start'])}")
        entry['contexts'].append(metadata['text'])
        entry['starts'].append(int(metadata['start']))
        entry['ends'].append(int(metadata['end']))

    # now display cards
    for video_id in order:
        entry = results[video_id]
        card(
            thumbnail=f"https://img.youtube.com/vi/{video_id}/maxresdefault.jpg",
            title=entry['title'],
            urls=entry['urls'],
            contexts=entry['contexts'],
            starts=entry['starts'],
            ends=entry['ends'],
        )