| import streamlit as st |
| from ai_config_faiss import get_ai_assistant |
| from ttv_web_scraper import db_load_metadata_sets |
| import json |
| from datetime import datetime |
| import os |
| import base64 |
|
|
| |
# Seed the Streamlit session state on the first run of a session. Keys that
# already exist are left untouched so user selections survive script reruns.
# The tuple is rebuilt on every run, so the mutable defaults ({} and []) are
# fresh objects each time and are never shared between sessions.
for state_key, initial_value in (
    ('results', None),
    ('where', {}),
    ('num_results', 3),
    ('favorites', {}),
    ('show_filters', True),
    ('selected_company', []),
    ('selected_speaker', []),
    ('selected_subjects', []),
):
    if state_key not in st.session_state:
        st.session_state[state_key] = initial_value
|
|
|
|
@st.cache_resource
def get_assistant():
    """Return the AI assistant built by ``get_ai_assistant``.

    The function is wrapped in ``st.cache_resource``, so the assistant is
    handed to Streamlit's resource cache rather than rebuilt by this code.
    """
    assistant = get_ai_assistant()
    return assistant
|
|
|
|
def format_timestamp(timestamp):
    """Shorten an ``HH:MM:SS`` timestamp for display.

    Args:
        timestamp: A string in ``%H:%M:%S`` format.

    Returns:
        ``MM:SS`` when the hour component is zero, ``HH:MM:SS`` otherwise
        (so positions past the first hour are not misreported). Input that
        cannot be parsed, including non-string values such as ``None``, is
        returned unchanged.
    """
    try:
        parsed = datetime.strptime(timestamp, "%H:%M:%S")
    except (TypeError, ValueError):
        # TypeError: strptime was given a non-string (e.g. None).
        # ValueError: the string does not match the expected format.
        return timestamp
    if parsed.hour:
        return parsed.strftime("%H:%M:%S")
    return parsed.strftime("%M:%S")
|
|
|
|
def get_file_content(file_path):
    """Read a file and return its raw bytes.

    Args:
        file_path: Path of the file to read.

    Returns:
        The file's contents as ``bytes``, or ``None`` when ``file_path`` is
        ``None`` or the file cannot be opened or read (missing file,
        directory, permission problem, ...).
    """
    if file_path is None:
        return None
    # Open directly instead of checking existence first: the file can vanish
    # between the check and the open, and a path that exists but is not a
    # readable file (e.g. a directory) would otherwise raise.
    try:
        with open(file_path, "rb") as file:
            return file.read()
    except OSError:
        return None
|
|
|
|
def create_markdown_download_link(markdown_content):
    """Build an HTML anchor that downloads the given Markdown text.

    The text is encoded to bytes, base64-encoded and embedded in a
    ``data:text/markdown`` URI; the anchor's ``download`` attribute names the
    file ``favorites.md``.

    Args:
        markdown_content: The Markdown text to offer for download.

    Returns:
        The ``<a>`` element as a string.
    """
    raw_bytes = markdown_content.encode()
    payload = base64.b64encode(raw_bytes).decode()
    return (
        '<a href="data:text/markdown;base64,'
        + payload
        + '" download="favorites.md">Download Favorites</a>'
    )
|
|
|
|
def update_filter(filter_type, item):
    """Toggle ``item`` in the selection list for ``filter_type``.

    The item is appended to ``st.session_state['selected_<filter_type>']``
    when absent and removed when present; ``update_where`` is then called to
    rebuild the active filters.

    Args:
        filter_type: Suffix of the ``selected_*`` session-state key to edit.
        item: The value to add to or remove from that selection list.
    """
    selected = st.session_state[f'selected_{filter_type}']
    if item not in selected:
        selected.append(item)
    else:
        selected.remove(item)
    update_where()
|
|
|
|
def update_where():
    """Rebuild ``st.session_state.where`` from the current selections.

    Each of the ``company``, ``speaker`` and ``subjects`` filter types is
    included only when its ``selected_*`` list is non-empty; the stored value
    is that same list object.
    """
    active_filters = {}
    for name in ('company', 'speaker', 'subjects'):
        chosen = st.session_state[f'selected_{name}']
        if chosen:
            active_filters[name] = chosen
    st.session_state.where = active_filters
|
|
|
|
def toggle_show_filters():
    """Invert the ``show_filters`` flag kept in the session state."""
    currently_shown = st.session_state.show_filters
    st.session_state.show_filters = not currently_shown
|
|
|
|
def update_num_results():
    """Copy the ``num_results_slider`` widget value into ``num_results``."""
    slider_value = st.session_state.num_results_slider
    st.session_state.num_results = slider_value
|
|
|
|
def submit_query():
    """Query the assistant with the active filters and store the results.

    Shows a warning and returns early when no filter is selected. Otherwise
    the assistant is queried with an empty query string, the configured
    number of results and the active filters, and the response is parsed as
    JSON into ``st.session_state.results``.

    If the response cannot be parsed, an error is shown and any previously
    stored results are left untouched.
    """
    active_filters = st.session_state.where
    if not active_filters:
        st.warning("Please select at least one filter before submitting.")
        return

    assistant = get_assistant()
    with st.spinner("Thinking..."):
        response = assistant.query(
            "",
            num_results=st.session_state.num_results,
            filters=active_filters,
        )

    try:
        parsed = json.loads(response)
    except (json.JSONDecodeError, TypeError):
        # TypeError covers a response that is not str/bytes (e.g. None),
        # which json.loads rejects before any decoding is attempted.
        st.error("Failed to parse the response. Please try again.")
    else:
        st.session_state.results = parsed
|
|
|
|
def update_favorite(result_id):
    """Toggle the favorite state of the result identified by ``result_id``.

    The result is looked up in the current search results first and, failing
    that, among the stored favorites. The fallback matters because favorites
    persist across queries: an entry saved from an earlier search is no
    longer in ``st.session_state.results`` and could otherwise never be
    un-favorited.

    Args:
        result_id: The ``id`` value of the result to toggle.
    """
    favorites = st.session_state.favorites
    # results is None until the first successful query.
    current_results = st.session_state.results or []
    result = next((r for r in current_results if r['id'] == result_id), None)
    if not result:
        result = favorites.get(result_id)
    if not result:
        return

    result['favorite'] = not result.get('favorite', False)
    if result['favorite']:
        favorites[result_id] = result
    else:
        favorites.pop(result_id, None)
|
|
|
|
def clear_favorites():
    """Empty the favorites mapping in place and show a confirmation."""
    stored_favorites = st.session_state.favorites
    stored_favorites.clear()
    st.success("All favorites have been cleared.")
|
|
|
|
def _favorite_to_markdown(fav):
    """Render one favorite result as a Markdown section ending in a rule."""
    meta = fav['metadata']
    start = format_timestamp(meta['start_timestamp'])
    end = format_timestamp(meta['end_timestamp'])
    parts = [
        f"## {meta['title']}\n\n",
        f"**Speaker:** {meta['speaker']} ({meta['company']})\n\n",
        f"**Date:** {meta['date']}\n\n",
        f"**Time:** {start} - {end}\n\n",
        f"**Transcript:** {fav['content']}\n\n",
    ]
    play_link = meta['play']
    # Skip the link when there is no video, matching the falsy-link guard
    # used when results are displayed.
    if play_link:
        # Start a query string if the URL has none; otherwise extend it.
        separator = '&' if '?' in play_link else '?'
        modified_play_link = (
            f"{play_link}{separator}controls=1&showinfo=0&modestbranding=1"
        )
        parts.append(f"**Video Link:** [{play_link}]({modified_play_link})\n\n")
    if meta['subjects']:
        parts.append(f"**Subjects:** {', '.join(meta['subjects'])}\n\n")
    parts.append("---\n\n")
    return "".join(parts)


def save_favorites():
    """Offer the stored favorites as a downloadable Markdown document.

    Builds one Markdown section per favorite under a ``# Favorites`` heading
    and renders a download link for it. Shows a warning instead when there
    are no favorites.
    """
    favorites = st.session_state.favorites
    if not favorites:
        st.warning("No favorites selected.")
        return

    sections = [_favorite_to_markdown(fav) for fav in favorites.values()]
    markdown_content = "# Favorites\n\n" + "".join(sections)
    st.markdown(create_markdown_download_link(markdown_content), unsafe_allow_html=True)
|
|
|
|
def display_result(result, favorite_tab=False):
    """Render a single result: details, video, download, subjects, favorite.

    Args:
        result: A result dict with ``id``, ``content``, ``favorite`` and a
            ``metadata`` dict (``title``, ``speaker``, ``company``, ``date``,
            ``start_timestamp``, ``end_timestamp``, ``play``, ``subjects``
            and optionally ``download``).
        favorite_tab: True when rendering inside the Favorites tab. It only
            changes the widget keys, so the same result can be shown in both
            tabs without key collisions.
    """
    import html  # local import: only needed to escape the subject tags

    meta = result['metadata']
    st.markdown(f"### {meta['title']}")

    details_col, media_col = st.columns([3, 2])
    with details_col:
        st.markdown(f"**Speaker:** {meta['speaker']} ({meta['company']})")
        st.markdown(f"**Date:** {meta['date']}")
        st.markdown("**Transcript:**")
        st.markdown(result['content'])

    with media_col:
        start_time = format_timestamp(meta['start_timestamp'])
        end_time = format_timestamp(meta['end_timestamp'])
        st.markdown(f"**Time:** {start_time} - {end_time}")

        play_url = meta['play']
        if play_url:
            st.components.v1.iframe(src=play_url, width=300, height=169, scrolling=True)
        else:
            st.warning("No video found")

        if 'download' in meta:
            download_path = meta['download']
            file_name = os.path.basename(download_path)
            file_content = get_file_content(download_path)
            if file_content:
                prefix = "fav_dl_" if favorite_tab else "dl_"
                st.download_button(
                    label="Download Clip",
                    data=file_content,
                    file_name=file_name,
                    mime="video/mp4",
                    key=f"{prefix}{result['id']}",
                )
            else:
                st.warning(f"Clip file not found: {file_name}")

    if meta['subjects']:
        st.markdown("**Subjects:**")
        # The tags are rendered as raw HTML, so escape each subject to keep
        # markup characters in the data from being interpreted as HTML.
        subject_tags = ' '.join(
            "<span style='background-color: #f0f0f0; color:black; "
            "padding: 2px 6px; margin: 2px; border-radius: 10px;'>"
            f"{html.escape(str(subject))}</span>"
            for subject in meta['subjects']
        )
        st.markdown(subject_tags, unsafe_allow_html=True)

    favorite_key = f"fav_{favorite_tab}_{result['id']}"
    st.checkbox(
        "Favorite",
        value=result['favorite'],
        key=favorite_key,
        on_change=update_favorite,
        args=(result['id'],),
    )
    st.markdown("---")
|
|
|
|
def main():
    """Build the page: a Search tab with filters/results and a Favorites tab."""
    st.title("Telecom TV Video Expert")
    st.markdown("Trained on data from [here](https://www.telecomtv.com/content/dsp-leaders-forum-videos/)")

    # Only the third and fifth sets are used here; the rest are discarded.
    # NOTE(review): `companies` is presumably a mapping of company name to a
    # collection of speakers (it is used via .keys() and .values()) - confirm
    # against db_load_metadata_sets.
    _, _, companies, _, subjects = db_load_metadata_sets()

    tab1, tab2 = st.tabs(["Search", "Favorites"])

    with tab1:
        st.header("Filter Options")
        st.checkbox("Show Filters", value=st.session_state.show_filters, on_change=toggle_show_filters)

        if st.session_state.show_filters:
            filter_options = [
                ('company', companies.keys()),
                ('speaker', set().union(*companies.values())),
                ('subjects', subjects),
            ]
            # One column per filter type, paired up in order.
            for column, (filter_type, items) in zip(st.columns(3), filter_options):
                with column:
                    st.subheader(filter_type.capitalize())
                    selected = st.session_state[f'selected_{filter_type}']
                    for item in sorted(items):
                        st.checkbox(
                            item,
                            key=f'{filter_type}_{item}',
                            value=item in selected,
                            on_change=update_filter,
                            args=(filter_type, item),
                        )

        st.slider(
            "Number of relevant transcript excerpts to show:",
            min_value=1,
            max_value=500,
            value=st.session_state.num_results,
            step=1,
            key='num_results_slider',
            on_change=update_num_results,
        )
        st.button("Submit", on_click=submit_query)

        if st.session_state.results:
            for result in st.session_state.results:
                # Re-sync the flag so it reflects the current favorites.
                result['favorite'] = result['id'] in st.session_state.favorites
                display_result(result)

    with tab2:
        st.header("Favorites")
        save_col, clear_col = st.columns(2)
        with save_col:
            st.button("Save Favorites", on_click=save_favorites)
        with clear_col:
            st.button("Clear Favorites", on_click=clear_favorites)

        for fav in st.session_state.favorites.values():
            display_result(fav, favorite_tab=True)
|
|
|
|
# Build the page only when this file is executed as the main script, not
# when it is imported as a module.
if __name__ == "__main__":
    main()
|
|