Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import psycopg2 | |
| import numpy as np | |
| import pandas as pd | |
| from datetime import datetime, timedelta, date | |
| from urllib.parse import quote | |
| import re | |
| import os | |
# Connect to the database
def connect_db():
    """Open a new psycopg2 connection configured from environment variables.

    The database name, user, password, host and port are read from the
    ``credDB_NAME``, ``credDB_USER``, ``credDB_PASS``, ``credDB_HOST`` and
    ``credDB_PORT`` environment variables respectively. A variable that is
    not set is passed on as ``None``.

    Returns:
        The connection object returned by ``psycopg2.connect``. This function
        does not close it.
    """
    settings = {
        "dbname": os.environ.get("credDB_NAME"),
        "user": os.environ.get("credDB_USER"),
        "password": os.environ.get("credDB_PASS"),
        "host": os.environ.get("credDB_HOST"),
        "port": os.environ.get("credDB_PORT"),
    }
    return psycopg2.connect(**settings)
# Fetch meeting types
def get_meeting_types():
    """Return the distinct meeting types stored in the webcasts table.

    Returns:
        list: One entry per distinct ``meeting_type`` value, in whatever
        order the database returns them (the query has no ORDER BY). No
        "No Filter" placeholder is added.
    """
    conn = connect_db()
    try:
        with conn.cursor() as cur:
            cur.execute("SELECT DISTINCT meeting_type FROM mopacdb_webcasts.webcasts;")
            return [row[0] for row in cur.fetchall()]
    finally:
        # Close the connection even when the query raises, so a failed call
        # does not leave a database session open.
        conn.close()
# Format seconds as H:MM:SS
def format_seconds(seconds):
    """Render a number of seconds as an ``H:MM:SS`` string.

    The text is ``str(timedelta(seconds=seconds))`` with any fractional part
    cut off, so the hour field is not zero-padded (65 -> ``"0:01:05"``).

    Args:
        seconds: Offset in seconds (int or float), or ``None``.

    Returns:
        The formatted string, or ``None`` when ``seconds`` is ``None``.
    """
    # Test against None explicitly: an offset of 0 is a valid position (the
    # very start of a recording) and must format as "0:00:00" instead of
    # being treated as missing.
    if seconds is None:
        return None
    return str(timedelta(seconds=seconds)).split('.')[0]
def add_duration_to_datetime(datetime_str, duration_str):
    """Shift a timestamp forward by an ``H:M:S`` duration.

    Args:
        datetime_str: Timestamp formatted as ``YYYY-MM-DD HH:MM:SS``.
        duration_str: Duration made of three colon-separated integers
            (hours, minutes, seconds).

    Returns:
        The shifted timestamp, formatted like ``datetime_str``.

    Raises:
        ValueError: If either argument does not have the expected shape.
    """
    stamp_format = "%Y-%m-%d %H:%M:%S"
    origin = datetime.strptime(datetime_str, stamp_format)
    # The generator keeps the int() conversions lazy, so they happen while
    # the three fields are being unpacked.
    h, m, s = (int(field) for field in duration_str.split(':'))
    shifted = origin + timedelta(hours=h, minutes=m, seconds=s)
    return shifted.strftime(stamp_format)
def convert_to_url_string(date_str, loc):
    """Turn a timestamp into a URL query fragment marking a clip boundary.

    The timestamp is rewritten as ``YYYY-MM-DDTHH:MM:SS`` with a fixed
    ``+00:00`` offset appended, then percent-encoded.

    Args:
        date_str: Timestamp formatted as ``YYYY-MM-DD HH:MM:SS``.
        loc: ``"start"`` or ``"end"`` (case-insensitive).

    Returns:
        ``"?in=<encoded>"`` for a start boundary, ``"&out=<encoded>"`` for an
        end boundary, and ``None`` for any other ``loc``.
    """
    parsed = datetime.strptime(date_str, "%Y-%m-%d %H:%M:%S")
    # The offset is hard-coded; the input is assumed to already be UTC.
    url_encoded = quote(parsed.strftime("%Y-%m-%dT%H:%M:%S") + "+00:00")

    position = loc.lower()
    if position == "start":
        return f"?in={url_encoded}"
    elif position == "end":
        return f"&out={url_encoded}"
    return None
# Search function
def search_transcripts(query, meeting_type, start_date, end_date):
    """Search transcript segments by text, meeting type and date range.

    Args:
        query: Text to look for inside the transcript (case-insensitive
            substring match). Empty or ``None`` matches every segment.
        meeting_type: List of meeting types to keep; falsy means no filter.
        start_date: Earliest meeting date to include, or falsy for no bound.
        end_date: Latest meeting date to include, or falsy for no bound.

    Returns:
        tuple: ``(displayed_data, full_data)``. Each ``full_data`` row is
        ``[meeting_type, meeting_date, start, end, speaker, transcript, url,
        time_origin, transcript_id]`` with start/end passed through
        ``format_seconds``. ``displayed_data`` holds the same rows without
        the last three columns.
    """
    sql = """
        SELECT w.meeting_type, w.meeting_date, t.start_time, t.end_time,
               REPLACE(t.speaker, 'speaker ', ''), t.transcript, w.url, w.time_origin, t.id
        FROM mopacdb_webcasts.transcripts t
        JOIN mopacdb_webcasts.webcasts w ON t.webcast_id = w.id
        WHERE t.transcript ILIKE %s
    """
    # A missing query must behave like an empty one; formatting None directly
    # would search for the literal text "None".
    # NOTE(review): '%' and '_' typed by the user act as ILIKE wildcards here;
    # confirm whether they should be escaped.
    params = [f"%{query or ''}%"]

    if meeting_type:  # only apply if the user actually picked something
        sql += " AND w.meeting_type = ANY(%s)"
        params.append(meeting_type)

    if start_date and end_date:
        sql += " AND w.meeting_date BETWEEN %s AND %s"
        params.extend([start_date, end_date])
    elif start_date:
        sql += " AND w.meeting_date >= %s"
        params.append(start_date)
    elif end_date:
        sql += " AND w.meeting_date <= %s"
        params.append(end_date)

    # ORDER BY has to come after every filter has been appended.
    sql += " ORDER BY w.meeting_date DESC, t.start_time ASC"

    conn = connect_db()
    try:
        with conn.cursor() as cur:
            cur.execute(sql, params)
            rows = cur.fetchall()
    finally:
        # Close the connection even when the query raises.
        conn.close()

    full_data = [
        [row[0], row[1], format_seconds(row[2]), format_seconds(row[3]),
         row[4], row[5], row[6], row[7], row[8]]
        for row in rows
    ]
    # The table shows only the first six columns; url, time_origin and the
    # transcript id are kept in full_data only.
    displayed_data = [row[:-3] for row in full_data]
    return displayed_data, full_data
# Function to highlight query in the transcript
def highlight_query(query, transcript):
    """Surround each occurrence of ``query`` in ``transcript`` with markers.

    Matching is case-insensitive and treats ``query`` as literal text (it is
    regex-escaped). The matched text keeps its original casing.

    Args:
        query: Search text; falsy disables highlighting.
        transcript: Text to annotate; falsy values are returned unchanged.

    Returns:
        The annotated transcript, or ``transcript`` itself when either
        argument is falsy.
    """
    if not (query and transcript):
        return transcript
    literal_pattern = f"({re.escape(query)})"
    # NOTE(review): the marker glyphs below look like a mis-decoded emoji;
    # confirm the file's encoding before changing them.
    return re.sub(literal_pattern, r'๐ถ\1๐ถ', transcript, flags=re.IGNORECASE)
# Function to identify the hidden table row index
def get_matching_index(full_data: list, selected_row_values: list):
    """Return the index of the ``full_data`` row shown as ``selected_row_values``.

    A row matches when its first six columns equal the selected values. The
    selected meeting date (position 1) is converted to a ``date`` first when
    it arrives as a ``YYYY-MM-DD`` string or as a ``datetime``.

    Args:
        full_data: Stored result rows; only the first six columns of each
            row take part in the comparison.
        selected_row_values: Values of the selected table row, or a falsy
            value when nothing is selected.

    Returns:
        The index of the first matching row, or ``None`` when there is no
        selection, no stored data, or no match.

    Raises:
        ValueError: If the selected date is a string that is not in
            ``YYYY-MM-DD`` form.
    """
    # Callers pass None for an empty selection; that is simply "no match".
    if not full_data or not selected_row_values:
        return None

    # Compare against a copy so the caller's list is never modified.
    target = list(selected_row_values)
    if len(target) > 1:
        meeting_date = target[1]
        if isinstance(meeting_date, str):
            target[1] = datetime.strptime(meeting_date, "%Y-%m-%d").date()
        elif isinstance(meeting_date, datetime):
            target[1] = meeting_date.date()

    return next(
        (i for i, row in enumerate(full_data) if list(row[:6]) == target),
        None,
    )
# Callback for transcript selection
def df_select_callback(full_data: list, evt: gr.SelectData, query=""):
    """Return the selected row's transcript with the search query highlighted.

    Args:
        full_data: Stored result rows; column 5 holds the transcript text.
        evt: Selection event; ``evt.row_value`` carries the displayed values
            of the selected row.
        query: Current search text, forwarded to ``highlight_query``.

    Returns:
        The highlighted transcript, or ``""`` when nothing is selected or
        the selected row cannot be found in ``full_data``.
    """
    selected_row_values = evt.row_value
    # An empty selection has nothing to look up; bail out before the helper
    # tries to index into it.
    if not selected_row_values:
        return ""

    # Hand over a copy so the event payload is left untouched.
    matching_index = get_matching_index(full_data, list(selected_row_values))
    if matching_index is None:
        return ""

    transcript = str(full_data[matching_index][5])
    return highlight_query(query, transcript)
# Callback for video iframe
def df_video_callback(full_data: list, evt: gr.SelectData):
    """Build the embedded-player HTML for the selected transcript segment.

    Args:
        full_data: Stored result rows. Columns used: 2 and 3 (start/end
            offsets as ``H:MM:SS`` text), 6 (player identifier) and 7 (origin
            timestamp of the recording).
        evt: Selection event; ``evt.row_value`` carries the displayed values
            of the selected row.

    Returns:
        An ``<iframe>`` pointing at the clip, a red notice when the
        identifier is the placeholder ``"replace"``, or ``""`` when nothing
        is selected or the row cannot be found.
    """
    selected_row_values = evt.row_value
    # An empty selection has nothing to look up; bail out before the helper
    # tries to index into it.
    if not selected_row_values:
        return ""

    # Hand over a copy so the event payload is left untouched.
    matching_index = get_matching_index(full_data, list(selected_row_values))
    if matching_index is None:
        return ""

    row = full_data[matching_index]
    identifier = str(row[6])
    if identifier.lower() == "replace":
        return "<center><span style='color: red;'>Video currently not available for this transcript due to GLA archiving policies.</span></center>"

    # Origin timestamp of the recording; the offsets are added to it.
    timestamp = str(row[7])

    # A stored offset may be None, and str(None) does not parse as H:MM:SS.
    # A missing start means "from the origin"; a missing end leaves the clip
    # open-ended.
    start_offset = "0:00:00" if row[2] is None else str(row[2])
    ts_start = convert_to_url_string(add_duration_to_datetime(timestamp, start_offset), "start")
    if row[3] is None:
        ts_end = ""
    else:
        ts_end = convert_to_url_string(add_duration_to_datetime(timestamp, str(row[3])), "end")

    return f"<iframe src='https://player.london.gov.uk/Player/Index/{identifier}{ts_start}{ts_end}' width='100%' height='360' frameborder='0' scrolling='no' allowfullscreen allow='encrypted-media; autoplay; fullscreen'></iframe>"
def df_video_button_callback(full_data: list, evt: gr.SelectData):
    """Build the "full video" / "share clip" link buttons for the selected row.

    Args:
        full_data: Stored result rows. Columns used: 2 and 3 (start/end
            offsets as ``H:MM:SS`` text), 6 (player identifier) and 7 (origin
            timestamp of the recording).
        evt: Selection event; ``evt.row_value`` carries the displayed values
            of the selected row.

    Returns:
        HTML with two links (whole recording and the selected clip), or
        ``""`` when nothing is selected, the row cannot be found, or the
        identifier is the placeholder ``"replace"``.
    """
    selected_row_values = evt.row_value
    # An empty selection has nothing to look up; bail out before the helper
    # tries to index into it.
    if not selected_row_values:
        return ""

    # Hand over a copy so the event payload is left untouched.
    matching_index = get_matching_index(full_data, list(selected_row_values))
    if matching_index is None:
        return ""

    row = full_data[matching_index]
    identifier = str(row[6])
    if identifier.lower() == "replace":
        return ""

    # Origin timestamp of the recording; the offsets are added to it.
    timestamp = str(row[7])

    # A stored offset may be None, and str(None) does not parse as H:MM:SS.
    # A missing start means "from the origin"; a missing end leaves the clip
    # open-ended.
    start_offset = "0:00:00" if row[2] is None else str(row[2])
    ts_start = convert_to_url_string(add_duration_to_datetime(timestamp, start_offset), "start")
    if row[3] is None:
        ts_end = ""
    else:
        ts_end = convert_to_url_string(add_duration_to_datetime(timestamp, str(row[3])), "end")

    # url_1 opens the whole recording, url_2 only the selected clip.
    url_1 = f"https://player.london.gov.uk/Player/Index/{identifier}"
    url_2 = f"https://player.london.gov.uk/Player/Index/{identifier}{ts_start}{ts_end}"
    return f"""
    <div style="display: flex; justify-content: space-between; margin: 20px;">
    <a href="{url_1}" target="_blank" style="text-decoration: none; width: 48%;">
    <button class="lg secondary svelte-1ixn6qd"
    style="width: 100%; text-align: center; padding: 10px; font-size: 16px;">
    Access the Full Video
    </button>
    </a>
    <a href="{url_2}" target="_blank" style="text-decoration: none; width: 48%;">
    <button class="lg secondary svelte-1ixn6qd"
    style="width: 100%; text-align: center; padding: 10px; font-size: 16px;">
    Share this Clip
    </button>
    </a>
    </div>
    """
# Callback for the segment preceding the selected one
def df_prior_callback(full_data: list, evt: gr.SelectData):
    """Return the transcript of the segment just before the selected one.

    The preceding segment is the row of the same webcast with the greatest
    ``start_time`` that is still lower than the selected segment's.

    Args:
        full_data: Stored result rows; the last column holds the transcript
            row id.
        evt: Selection event; ``evt.row_value`` carries the displayed values
            of the selected row.

    Returns:
        The preceding transcript text, or ``""`` when nothing is selected,
        the row cannot be found, or no earlier segment exists.
    """
    selected_row_values = evt.row_value
    # An empty selection has nothing to look up; bail out before the helper
    # tries to index into it.
    if not selected_row_values:
        return ""

    # Hand over a copy so the event payload is left untouched.
    matching_index = get_matching_index(full_data, list(selected_row_values))
    if matching_index is None:
        return ""

    transcript_id = int(full_data[matching_index][-1])
    sql = """
        WITH target_transcript AS (
            SELECT webcast_id, start_time
            FROM mopacdb_webcasts.transcripts
            WHERE id = %s
        )
        SELECT transcript
        FROM mopacdb_webcasts.transcripts
        WHERE webcast_id = (SELECT webcast_id FROM target_transcript)
        AND start_time < (SELECT start_time FROM target_transcript)
        ORDER BY start_time DESC
        LIMIT 1;
    """
    conn = connect_db()
    try:
        with conn.cursor() as cur:
            cur.execute(sql, (transcript_id,))
            prior_transcript = cur.fetchone()
    finally:
        # Close the connection even when the query raises.
        conn.close()

    # No earlier segment: show an empty textbox.
    return prior_transcript[0] if prior_transcript else ""
# Callback for the segment following the selected one
def df_posterior_callback(full_data: list, evt: gr.SelectData):
    """Return the transcript of the segment just after the selected one.

    The following segment is the row of the same webcast with the smallest
    ``start_time`` that is still greater than the selected segment's.

    Args:
        full_data: Stored result rows; the last column holds the transcript
            row id.
        evt: Selection event; ``evt.row_value`` carries the displayed values
            of the selected row.

    Returns:
        The following transcript text, or ``""`` when nothing is selected,
        the row cannot be found, or no later segment exists.
    """
    selected_row_values = evt.row_value
    # An empty selection has nothing to look up; bail out before the helper
    # tries to index into it.
    if not selected_row_values:
        return ""

    # Hand over a copy so the event payload is left untouched.
    matching_index = get_matching_index(full_data, list(selected_row_values))
    if matching_index is None:
        return ""

    transcript_id = int(full_data[matching_index][-1])
    sql = """
        WITH target_transcript AS (
            SELECT webcast_id, start_time
            FROM mopacdb_webcasts.transcripts
            WHERE id = %s
        )
        SELECT transcript
        FROM mopacdb_webcasts.transcripts
        WHERE webcast_id = (SELECT webcast_id FROM target_transcript)
        AND start_time > (SELECT start_time FROM target_transcript)
        ORDER BY start_time ASC
        LIMIT 1;
    """
    conn = connect_db()
    try:
        with conn.cursor() as cur:
            cur.execute(sql, (transcript_id,))
            next_transcript = cur.fetchone()
    finally:
        # Close the connection even when the query raises.
        conn.close()

    # No later segment: show an empty textbox.
    return next_transcript[0] if next_transcript else ""
def df_transcript_callback(full_data: list, evt: gr.SelectData):
    """Export the whole transcript of the selected segment's webcast to CSV.

    Every segment of the webcast is written, ordered by ``start_time``, with
    the columns ``start_time``, ``end_time``, ``speaker`` and ``transcript``.
    The file is created in the current working directory and named
    ``"<meeting_type> - <meeting_date>.csv"``.

    Args:
        full_data: Stored result rows; the last column holds the transcript
            row id.
        evt: Selection event; ``evt.row_value`` carries the displayed values
            of the selected row.

    Returns:
        Path of the written CSV file, or ``None`` when nothing is selected,
        the row cannot be found, or the webcast has no segments.
    """
    selected_row_values = evt.row_value
    # An empty selection has nothing to look up; bail out before the helper
    # tries to index into it.
    if not selected_row_values:
        return None

    # Hand over a copy so the event payload is left untouched.
    matching_index = get_matching_index(full_data, list(selected_row_values))
    if matching_index is None:
        return None

    transcript_id = int(full_data[matching_index][-1])
    segments_sql = """
        SELECT start_time, end_time, speaker, transcript
        FROM mopacdb_webcasts.transcripts
        WHERE webcast_id = (
            SELECT webcast_id
            FROM mopacdb_webcasts.transcripts
            WHERE id = %s
        )
        ORDER BY start_time ASC;
    """
    filename_sql = """
        SELECT CONCAT(meeting_type, ' - ', meeting_date) AS meeting_details
        FROM mopacdb_webcasts.webcasts
        WHERE id = (
            SELECT webcast_id
            FROM mopacdb_webcasts.transcripts
            WHERE id = %s
        );
    """

    # Both lookups share one connection, closed even if a query raises.
    conn = connect_db()
    try:
        with conn.cursor() as cur:
            cur.execute(segments_sql, (transcript_id,))
            rows = cur.fetchall()
            if not rows:
                return None  # nothing to export
            cur.execute(filename_sql, (transcript_id,))
            name_row = cur.fetchone()
    finally:
        conn.close()

    df = pd.DataFrame(rows, columns=["start_time", "end_time", "speaker", "transcript"])

    # Fall back to an id-based name if the webcast row cannot be found.
    filename = name_row[0] if name_row else f"transcript_{transcript_id}"
    # The name comes from database text; swap characters that are unsafe in
    # file names (path separators and friends) for underscores.
    safe_name = re.sub(r'[\\/:*?"<>|]+', "_", str(filename))
    csv_file_path = safe_name + ".csv"
    df.to_csv(csv_file_path, index=False)
    return csv_file_path
# Gradio interface function
def gradio_interface(query, meeting_type, start_date, end_date):
    """Run a search from the UI inputs and shape the outputs for the UI.

    Args:
        query: Search text.
        meeting_type: Selected meeting types (may be empty).
        start_date: Lower date bound as a float timestamp or a
            ``YYYY-MM-DD`` string; falsy means no bound.
        end_date: Upper date bound, same accepted forms as ``start_date``.

    Returns:
        tuple: ``(displayed_results, full_results, no_results_message)``
        where the message is a red HTML notice when nothing was found and
        ``""`` otherwise.
    """
    def _as_datetime(value):
        # Falsy values mean "no bound" and are passed through untouched.
        if not value:
            return value
        if isinstance(value, float):
            return datetime.fromtimestamp(value)
        return datetime.strptime(value, "%Y-%m-%d")

    start_date = _as_datetime(start_date)
    end_date = _as_datetime(end_date)

    displayed_results, full_results = search_transcripts(query, meeting_type, start_date, end_date)

    if displayed_results:
        no_results_message = ""
    else:
        no_results_message = "<span style='color: red;'>No results found. Please try a different search.</span>"
    return displayed_results, full_results, no_results_message
# Set custom CSS
# Raw <style> block that is injected into the page through gr.HTML when the
# app is built. The #purple-textbox / #yellow-textbox selectors match the
# elem_id values given to the transcript textboxes.
custom_css = """
<style>
#purple-textbox textarea {
background-color: #F3E5F5 !important;
}
#yellow-textbox textarea {
background-color: #FFF2D1 !important;
}
textarea {
font-family: Arial, sans-serif;
font-size: 14px;
white-space: pre-wrap;
color: black;
}
</style>
"""
# Build the Gradio app
with gr.Blocks() as app:
    # Page banner.
    # NOTE(review): the glyph after "DS -" looks like a mis-decoded emoji;
    # confirm the file's encoding.
    gr.Markdown(
        """
<div style="background-color: #4B23C0; color: white; padding: 20px; text-align: left; font-size: 24px; font-weight: bold; margin: 0;">
MOPAC | DS - ๐ Webcast Transcript Search Tool
</div>
""",
        sanitize_html=False
    )
    # Load TextBox CSS
    gr.HTML(custom_css)
    # Dropdown choices are read from the database once, while the UI is built.
    meeting_types = get_meeting_types()
    with gr.Row():
        query = gr.Textbox(label="Search Query (optional)")
        meeting_type = gr.Dropdown(label="Meeting Type (optional)", choices=meeting_types, multiselect=True)
        start_date = gr.DateTime(label="Start Date (optional)", include_time=False, info="Enter Date in YYYY-MM-DD Format")
        end_date = gr.DateTime(label="End Date (optional)", include_time=False, info="Enter Date in YYYY-MM-DD Format", interactive = True, value=date.today().isoformat())
    search_btn = gr.Button("Search")
    # Table for results (shows the first six columns only; url, time origin
    # and transcript id stay in the state below)
    results_table = gr.DataFrame(
        headers=["Meeting Type", "Date", "Start Time", "End Time", "Speaker", "Transcript"],
        datatype=["str", "date", "str", "str", "str", "str"],
        interactive=False
    )
    no_results_text = gr.Markdown()
    # Store the full rows (including the hidden columns) separately
    full_results_state = gr.State([])
    # Search button updates the table and stores full results
    search_btn.click(fn=gradio_interface, inputs=[query, meeting_type, start_date, end_date], outputs=[results_table, full_results_state, no_results_text])
    # Submitting the query textbox runs the same search as the button
    query.submit(fn=gradio_interface, inputs=[query, meeting_type, start_date, end_date], outputs=[results_table, full_results_state, no_results_text])
    with gr.Row():
        with gr.Column(scale=2):
            transcript_prior = gr.Textbox(label="Prior Transcript", interactive=False, elem_id="purple-textbox")
            selected_transcript = gr.Textbox(label="Selected Transcript",
                                             info ="Search Queries will appear demarcated using orange markers (๐ถ).",
                                             interactive=False, elem_id="yellow-textbox")
            transcript_posterior = gr.Textbox(label="Posterior Transcript", interactive=False, elem_id="purple-textbox")
            gr.Markdown("**The full transcript of any selected speech segment will appear available for download here...**")
            transcript_bttn = gr.File(label="Download Full Transcript")
        with gr.Column(scale=3):
            video_iframe = gr.HTML("")
            video_bttn_full = gr.HTML("")
    # Selecting a table row fires each listener below; every callback looks
    # the row up in the stored full data and fills one output component.
    results_table.select(fn=df_select_callback, inputs=[full_results_state, query], outputs=[selected_transcript])
    results_table.select(fn=df_prior_callback, inputs=[full_results_state], outputs=[transcript_prior])
    results_table.select(fn=df_posterior_callback, inputs=[full_results_state], outputs=[transcript_posterior])
    results_table.select(fn=df_video_callback, inputs=[full_results_state], outputs=[video_iframe])
    results_table.select(fn=df_video_button_callback, inputs=[full_results_state], outputs=[video_bttn_full])
    results_table.select(fn=df_transcript_callback, inputs=[full_results_state], outputs=[transcript_bttn])
app.launch()