import gradio as gr import psycopg2 import numpy as np import pandas as pd from datetime import datetime, timedelta, date from urllib.parse import quote import re import os # Connect to the database def connect_db(): return psycopg2.connect( dbname=os.getenv("credDB_NAME"), user=os.getenv("credDB_USER"), password=os.getenv("credDB_PASS"), host=os.getenv("credDB_HOST"), port=os.getenv("credDB_PORT") ) # Fetch meeting types def get_meeting_types(): conn = connect_db() cur = conn.cursor() cur.execute("SELECT DISTINCT meeting_type FROM mopacdb_webcasts.webcasts;") meeting_types = [row[0] for row in cur.fetchall()] cur.close() conn.close() return meeting_types # no "No Filter" # Format seconds as HH:MM:SS def format_seconds(seconds): return str(timedelta(seconds=seconds)).split('.')[0] if seconds else None def add_duration_to_datetime(datetime_str, duration_str): # Parse the input datetime string into a datetime object dt = datetime.strptime(datetime_str, "%Y-%m-%d %H:%M:%S") # Split the duration string into hours, minutes, and seconds hours, minutes, seconds = map(int, duration_str.split(':')) # Create a timedelta object from the duration duration = timedelta(hours=hours, minutes=minutes, seconds=seconds) # Add the duration to the original datetime new_datetime = dt + duration # Return the new datetime as a string in the original format return new_datetime.strftime("%Y-%m-%d %H:%M:%S") def convert_to_url_string(date_str, loc): # Parse the input string into a datetime object dt = datetime.strptime(date_str, "%Y-%m-%d %H:%M:%S") # Format the datetime object to ISO 8601 format iso_format = dt.strftime("%Y-%m-%dT%H:%M:%S") # Add timezone offset (assuming UTC+00:00) iso_format_with_tz = iso_format + "+00:00" # URL encode the string url_encoded = quote(iso_format_with_tz) # Return the final URL query string if loc.lower() == "start": return f"?in={url_encoded}" if loc.lower() == "end": return f"&out={url_encoded}" # Search function def search_transcripts(query, meeting_type, start_date, end_date): conn = connect_db() cur = conn.cursor() sql = """ SELECT w.meeting_type, w.meeting_date, t.start_time, t.end_time, REPLACE(t.speaker, 'speaker ', ''), t.transcript, w.url, w.time_origin, t.id FROM mopacdb_webcasts.transcripts t JOIN mopacdb_webcasts.webcasts w ON t.webcast_id = w.id WHERE t.transcript ILIKE %s """ params = [f"%{query}%"] if meeting_type: # only apply if user actually picked something sql += " AND w.meeting_type = ANY(%s)" params.append(meeting_type) if start_date and end_date: sql += " AND w.meeting_date BETWEEN %s AND %s" params.extend([start_date, end_date]) elif start_date: sql += " AND w.meeting_date >= %s" params.append(start_date) elif end_date: sql += " AND w.meeting_date <= %s" params.append(end_date) # Add ORDER BY clause after all the filters are applied sql += " ORDER BY w.meeting_date DESC, t.start_time ASC" cur.execute(sql, params) rows = cur.fetchall() cur.close() conn.close() # Return both the full data (including identifier) and the displayed data full_data = [ [row[0], row[1], format_seconds(row[2]), format_seconds(row[3]), row[4], row[5], row[6], row[7], row[8]] for row in rows ] displayed_data = [row[:-3] for row in full_data] # Remove identifier column for display return displayed_data, full_data # Return both datasets # Function to highlight query in the transcript def highlight_query(query, transcript): if query and transcript: # Simulate highlight by making words uppercase or surrounding them with markers highlighted_text = re.sub(f"({re.escape(query)})", r'🔶\1🔶', transcript, flags=re.IGNORECASE) return highlighted_text return transcript # Function to identify the hidden table row index def get_matching_index(full_data: list, selected_row_values: list): """ Returns the index of the row in `full_data` that matches the `selected_row_values`. Converts the selected meeting_date to a date object for comparison. """ # Convert the selected meeting_date (which is in string format) to a date object selected_row_values[1] = datetime.strptime(selected_row_values[1], "%Y-%m-%d").date() # Find and return the matching row index return next( (i for i, row in enumerate(full_data) if [row[0], row[1], row[2], row[3], row[4], row[5]] == selected_row_values), None ) # Callback for transcript selection def df_select_callback(full_data: list, evt: gr.SelectData, query = ""): selected_row_values = evt.row_value if evt.row_value else None # Use the helper function to find the matching row index matching_index = get_matching_index(full_data, selected_row_values) if matching_index is not None: transcript = str(full_data[matching_index][5]) # Extract transcript return highlight_query(query, transcript) return "" # Callback for video iframe def df_video_callback(full_data: list, evt: gr.SelectData): selected_row_values = evt.row_value if evt.row_value else None # Use the helper function to find the matching row index matching_index = get_matching_index(full_data, selected_row_values) if matching_index is not None: identifier = str(full_data[matching_index][6]) # Access identifier from stored data if identifier.lower() == "replace": return "
Video currently not availabile for this transcript due to GLA archiving policies.
" #Get Origin Timestamp timestamp = str(full_data[matching_index][7]) #Add Start Time ts_start = convert_to_url_string(add_duration_to_datetime(timestamp, str(full_data[matching_index][2])), "start") ts_end = convert_to_url_string(add_duration_to_datetime(timestamp, str(full_data[matching_index][3])), "end") return f"" return "" def df_video_button_callback(full_data: list, evt: gr.SelectData): selected_row_values = evt.row_value if evt.row_value else None # Use the helper function to find the matching row index matching_index = get_matching_index(full_data, selected_row_values) if matching_index is not None: identifier = str(full_data[matching_index][6]) # Access identifier from stored data if identifier.lower() == "replace": return "" #Get Origin Timestamp timestamp = str(full_data[matching_index][7]) #Add Start Time ts_start = convert_to_url_string(add_duration_to_datetime(timestamp, str(full_data[matching_index][2])), "start") ts_end = convert_to_url_string(add_duration_to_datetime(timestamp, str(full_data[matching_index][3])), "end") #Set URLs url_1 = f"https://player.london.gov.uk/Player/Index/{identifier}" url_2 = f"https://player.london.gov.uk/Player/Index/{identifier}{ts_start}{ts_end}" return f"""
""" return "" # Callback for transcript selection def df_prior_callback(full_data: list, evt: gr.SelectData): selected_row_values = evt.row_value if evt.row_value else None # Use the helper function to find the matching row index matching_index = get_matching_index(full_data, selected_row_values) if matching_index is not None: transcript_id = int(full_data[matching_index][-1]) # Access identifier from stored data conn = connect_db() cur = conn.cursor() sql = """ WITH target_transcript AS ( SELECT webcast_id, start_time FROM mopacdb_webcasts.transcripts WHERE id = %s ) SELECT transcript FROM mopacdb_webcasts.transcripts WHERE webcast_id = (SELECT webcast_id FROM target_transcript) AND start_time < (SELECT start_time FROM target_transcript) ORDER BY start_time DESC LIMIT 1; """ cur.execute(sql, (transcript_id,)) prior_transcript = cur.fetchone() cur.close() conn.close() # Check if rows are fetched if not prior_transcript: return "" # No data, return None to keep button hidden return prior_transcript[0] # Callback for transcript selection def df_posterior_callback(full_data: list, evt: gr.SelectData): selected_row_values = evt.row_value if evt.row_value else None # Use the helper function to find the matching row index matching_index = get_matching_index(full_data, selected_row_values) if matching_index is not None: transcript_id = int(full_data[matching_index][-1]) # Access identifier from stored data conn = connect_db() cur = conn.cursor() sql = """ WITH target_transcript AS ( SELECT webcast_id, start_time FROM mopacdb_webcasts.transcripts WHERE id = %s ) SELECT transcript FROM mopacdb_webcasts.transcripts WHERE webcast_id = (SELECT webcast_id FROM target_transcript) AND start_time > (SELECT start_time FROM target_transcript) ORDER BY start_time ASC LIMIT 1; """ cur.execute(sql, (transcript_id,)) prior_transcript = cur.fetchone() cur.close() conn.close() # Check if rows are fetched if not prior_transcript: return "" # No data, return None to keep button hidden return prior_transcript[0] def df_transcript_callback(full_data: list, evt: gr.SelectData): selected_row_values = evt.row_value if evt.row_value else None # Use the helper function to find the matching row index matching_index = get_matching_index(full_data, selected_row_values) if matching_index is not None: transcript_id = int(full_data[matching_index][-1]) # Access identifier from stored data conn = connect_db() cur = conn.cursor() sql = """ SELECT start_time, end_time, speaker, transcript FROM mopacdb_webcasts.transcripts WHERE webcast_id = ( SELECT webcast_id FROM mopacdb_webcasts.transcripts WHERE id = %s ) ORDER BY start_time ASC; """ cur.execute(sql, (transcript_id,)) rows = cur.fetchall() cur.close() conn.close() # Check if rows are fetched if not rows: return None # No data, return None to keep button hidden # Create a DataFrame and save it as a CSV df = pd.DataFrame(rows, columns=["start_time", "end_time", "speaker", "transcript"]) #Create Filename conn = connect_db() cur = conn.cursor() sql = """ SELECT CONCAT(meeting_type, ' - ', meeting_date) AS meeting_details FROM mopacdb_webcasts.webcasts WHERE id = ( SELECT webcast_id FROM mopacdb_webcasts.transcripts WHERE id = %s ); """ cur.execute(sql, (transcript_id,)) filename = cur.fetchone()[0] cur.close() conn.close() csv_file_path = str(filename)+".csv" #Set File path df.to_csv(csv_file_path, index=False) # Save to CSV return csv_file_path # This should show the button # Gradio interface function def gradio_interface(query, meeting_type, start_date, end_date): if start_date: start_date = datetime.fromtimestamp(start_date) if isinstance(start_date, float) else datetime.strptime(start_date, "%Y-%m-%d") if end_date: end_date = datetime.fromtimestamp(end_date) if isinstance(end_date, float) else datetime.strptime(end_date, "%Y-%m-%d") displayed_results, full_results = search_transcripts(query, meeting_type, start_date, end_date) no_results_message = "No results found. Please try a different search." if not displayed_results else "" return displayed_results, full_results, no_results_message #Set custom CSS custom_css = """ """ # Build the Gradio app with gr.Blocks() as app: gr.Markdown( """
MOPAC | DS  - 🔍 Webcast Transcript Search Tool
""", sanitize_html=False ) #Load TextBox CSS gr.HTML(custom_css) meeting_types = get_meeting_types() with gr.Row(): query = gr.Textbox(label="Search Query (optional)") meeting_type = gr.Dropdown(label="Meeting Type (optional)", choices=meeting_types, multiselect=True) start_date = gr.DateTime(label="Start Date (optional)", include_time=False, info="Enter Date in YYYY-MM-DD Format") end_date = gr.DateTime(label="End Date (optional)", include_time=False, info="Enter Date in YYYY-MM-DD Format", interactive = True, value=date.today().isoformat()) search_btn = gr.Button("Search") # Table for results (without "Identifier" column) results_table = gr.DataFrame( headers=["Meeting Type", "Date", "Start Time", "End Time", "Speaker", "Transcript"], datatype=["str", "date", "str", "str", "str", "str"], interactive=False ) no_results_text = gr.Markdown() # Store full data (including "Identifier") separately full_results_state = gr.State([]) # Search button updates the table and stores full results search_btn.click(fn=gradio_interface, inputs=[query, meeting_type, start_date, end_date], outputs=[results_table, full_results_state, no_results_text]) #Add Carriage Return query.submit(fn=gradio_interface, inputs=[query, meeting_type, start_date, end_date], outputs=[results_table, full_results_state, no_results_text]) with gr.Row(): with gr.Column(scale=2): transcript_prior = gr.Textbox(label="Prior Transcript", interactive=False, elem_id="purple-textbox") selected_transcript = gr.Textbox(label="Selected Transcript", info ="Search Queries will appear demarcated using orange markers (🔶).", interactive=False, elem_id="yellow-textbox") transcript_posterior = gr.Textbox(label="Posterior Transcript", interactive=False, elem_id="purple-textbox") gr.Markdown("**The full transcript of any selected speech segment will appear available for download here...**") transcript_bttn = gr.File(label="Download Full Transcript") with gr.Column(scale=3): video_iframe = gr.HTML("") video_bttn_full = gr.HTML("") # Update transcript and video using the stored full data results_table.select(fn=df_select_callback, inputs=[full_results_state, query], outputs=[selected_transcript]) results_table.select(fn=df_prior_callback, inputs=[full_results_state], outputs=[transcript_prior]) results_table.select(fn=df_posterior_callback, inputs=[full_results_state], outputs=[transcript_posterior]) results_table.select(fn=df_video_callback, inputs=[full_results_state], outputs=[video_iframe]) results_table.select(fn=df_video_button_callback, inputs=[full_results_state], outputs=[video_bttn_full]) results_table.select(fn=df_transcript_callback, inputs=[full_results_state], outputs=[transcript_bttn]) app.launch()