"""Streamlit app: semantic search over YouTube video transcripts.

Encodes the user's query with a sentence-transformer, retrieves matching
transcript segments from a Pinecone vector index, and renders each video
as a card with timestamped context links.
"""

import logging

import pinecone
import streamlit as st
from sentence_transformers import SentenceTransformer

PINECONE_KEY = st.secrets["PINECONE_KEY"]  # app.pinecone.io
INDEX_ID = 'youtube-search'

# NOTE(review): the markdown payload (likely inline CSS) appears to have been
# stripped during extraction; preserving the empty string — TODO restore.
st.markdown("", unsafe_allow_html=True)


@st.experimental_singleton
def init_pinecone():
    """Create the Pinecone index connection once per session."""
    pinecone.init(api_key=PINECONE_KEY, environment="us-west1-gcp")
    return pinecone.Index(INDEX_ID)


@st.experimental_singleton
def init_retriever():
    """Load the sentence-transformer query encoder once per session."""
    return SentenceTransformer("multi-qa-mpnet-base-dot-v1")


def make_query(query, retriever, top_k=10, include_values=True,
               include_metadata=True, filter=None):
    """Encode *query* and search the Pinecone index.

    Retries up to three times, re-initialising the index connection after
    each failure.

    Args:
        query: free-text search string.
        retriever: SentenceTransformer used to embed the query.
        top_k: number of matches to return.
        include_values / include_metadata: passed through to Pinecone.
        filter: optional Pinecone metadata filter dict.
            (Parameter name kept for caller compatibility even though it
            shadows the builtin.)

    Returns:
        The list of Pinecone matches; empty when every attempt fails.
    """
    xq = retriever.encode([query]).tolist()
    logging.info("Query: %s", query)
    matches = []
    for attempt in range(3):
        try:
            xc = st.session_state.index.query(
                xq,
                top_k=top_k,
                include_values=include_values,
                include_metadata=include_metadata,
                filter=filter
            )
            matches = xc['matches']
            break
        except Exception:  # was a bare except: — narrow it and log the cause
            logging.exception("Query attempt %d failed; reconnecting", attempt + 1)
            # force reload of the index connection before retrying
            pinecone.init(api_key=PINECONE_KEY, environment="us-west1-gcp")
            st.session_state.index = pinecone.Index(INDEX_ID)
    if not matches:
        logging.error("Query failed")
    return matches


st.session_state.index = init_pinecone()
retriever = init_retriever()


def card(thumbnail: str, title: str, urls: list, contexts: list,
         starts: list, ends: list, publication: str):
    """Render one video result as an HTML card with timestamped snippets.

    The parallel lists *urls*, *contexts*, *starts*, *ends* describe the
    matched transcript segments of a single video; overlapping segments are
    merged into a running sentence.
    """
    # sort segments chronologically (by end second, then start second)
    meta = [(e, s, u, c) for e, s, u, c in zip(ends, starts, urls, contexts)]
    meta.sort(reverse=False)
    text_content = []
    current_start = 0
    current_end = 0
    for end, start, url, context in meta:
        # reformat seconds to an MM:SS timestamp; divmod fixes the old
        # string-slicing arithmetic that mangled videos over 99 minutes
        mins, secs = divmod(int(start), 60)
        timestamp = f"{mins:02d}:{secs:02d}"
        if current_start < start < current_end:
            # continuation of the previous sentence: trim the overlapping
            # tail of the prior snippet before appending this one
            text_content[-1][0] = text_content[-1][0].split(context[:10])[0]
            text_content.append([f"[{timestamp}] {context.capitalize()}", url])
        else:
            text_content.append(["xxLINEBREAKxx", ""])
            text_content.append([f"[{timestamp}] {context}", url])
        current_start = start
        current_end = end
    html_text = ""
    for text, url in text_content:
        if text == "xxLINEBREAKxx":
            # NOTE(review): the original break markup appears stripped during
            # extraction; preserving a bare newline — TODO confirm (<br>?)
            html_text += "\n"
        else:
            html_text += f"{text.strip()}... "
    # NOTE(review): the surrounding HTML tags (card/thumbnail/link markup)
    # appear stripped during extraction — TODO restore against the original.
    html = f"""

{title}

{publication}

{html_text}

"""
    return st.markdown(html, unsafe_allow_html=True)


# maps the display names shown in the UI to the metadata values stored in
# the index (currently identical, kept for future renaming flexibility)
publication_map = {
    'los angeles times': 'los angeles times',
    'breitbart': 'breitbart',
    'vox': 'vox',
    'cnn': 'cnn',
    'new york post': 'new york post',
    'new york times': 'new york times'
}

st.write("""
# Example
""")

st.info("""
YouTube search built as [explained here](https://pinecone.io/learn/openai-whisper)!

*The current search scope is limited to a few videos talking about ML, NLP, and vector search*.

Add requests for channels to include in the [*Community* tab](https://huggingface.co/spaces/jamescalam/ask-youtube/discussions).
""")

# NOTE(review): this markdown payload also appears stripped during extraction.
st.markdown("""
""", unsafe_allow_html=True)

query = st.text_input("Search!", "")

with st.expander("Advanced Options"):
    publication_options = st.multiselect(
        'Publications to Search',
        ['los angeles times', 'breitbart', 'vox', 'new york post', 'cnn', 'new york times'],
        ['los angeles times', 'breitbart', 'vox', 'new york post', 'cnn', 'new york times']
    )

if query != "":
    publications = [publication_map[name] for name in publication_options]
    logging.info("query: %s", query)
    # renamed from `filter` to avoid shadowing the builtin at module scope
    metadata_filter = {'$and': [
        {'publication': {'$in': publications}}
        # {'category': {'$in': ['longform', 'newspaper']}}  # optional extra clause
    ]}
    matches = make_query(
        query, retriever, top_k=5, filter=metadata_filter
    )
    # group the flat match list by video, preserving retrieval order
    results = {}
    order = []
    for context in matches:
        video_id = context['metadata']['url'].split('/')[-1]
        if video_id not in results:
            results[video_id] = {
                'title': context['metadata']['title'],
                'thumbnail': context['metadata']['thumbnail'],
                'urls': [f"{context['metadata']['url']}"],
                'contexts': [context['metadata']['text']],
                'starts': [int(context['metadata']['start_second'])],
                'ends': [int(context['metadata']['end_second'])],
                'publication': context['metadata']['publication'],
                'category': context['metadata']['category']
            }
            order.append(video_id)
        else:
            results[video_id]['urls'].append(
                f"{context['metadata']['url']}"
            )
            results[video_id]['contexts'].append(
                context['metadata']['text']
            )
            results[video_id]['starts'].append(int(context['metadata']['start_second']))
            results[video_id]['ends'].append(int(context['metadata']['end_second']))
    # now display one card per video
    for video_id in order:
        card(
            thumbnail=results[video_id]['thumbnail'],
            title=results[video_id]['title'],
            urls=results[video_id]['urls'],
            contexts=results[video_id]['contexts'],
            starts=results[video_id]['starts'],
            ends=results[video_id]['ends'],
            publication=results[video_id]['publication']
        )