import gradio as gr
import psycopg2
import numpy as np
import pandas as pd
from datetime import datetime, timedelta, date
from urllib.parse import quote
import re
import os
# Connect to the database
def connect_db():
    """Open a new PostgreSQL connection configured from environment variables.

    The connection parameters are read from ``credDB_NAME``, ``credDB_USER``,
    ``credDB_PASS``, ``credDB_HOST`` and ``credDB_PORT``; a variable that is
    not set is forwarded to psycopg2 as ``None``.

    Returns:
        The object returned by ``psycopg2.connect``. It is not closed here.
    """
    # Maps each psycopg2.connect keyword to the environment variable feeding it.
    env_by_keyword = {
        "dbname": "credDB_NAME",
        "user": "credDB_USER",
        "password": "credDB_PASS",
        "host": "credDB_HOST",
        "port": "credDB_PORT",
    }
    settings = {
        keyword: os.getenv(env_name)
        for keyword, env_name in env_by_keyword.items()
    }
    return psycopg2.connect(**settings)
# Fetch meeting types
def get_meeting_types():
    """Return the distinct ``meeting_type`` values stored for webcasts.

    Returns:
        A list with one entry per distinct ``meeting_type`` in
        ``mopacdb_webcasts.webcasts``, in whatever order the database
        returns them. No "No Filter" placeholder entry is added.
    """
    conn = connect_db()
    try:
        # The cursor context manager closes the cursor; the connection is
        # closed in ``finally`` so a failing query cannot leak it.
        with conn.cursor() as cur:
            cur.execute("SELECT DISTINCT meeting_type FROM mopacdb_webcasts.webcasts;")
            return [row[0] for row in cur.fetchall()]
    finally:
        conn.close()
# Format a number of seconds as an H:MM:SS string
def format_seconds(seconds):
    """Format a number of seconds as an ``H:MM:SS`` string.

    Args:
        seconds: Offset in seconds (int or float), or ``None``.

    Returns:
        ``str(timedelta(seconds=seconds))`` with any fractional part dropped,
        e.g. ``75`` -> ``"0:01:15"``; ``None`` when ``seconds`` is ``None``.
    """
    # Only a missing value maps to None. An offset of 0 is a real timestamp
    # (the first segment of a webcast) and must format as "0:00:00".
    if seconds is None:
        return None
    return str(timedelta(seconds=seconds)).split('.')[0]
def add_duration_to_datetime(datetime_str, duration_str):
    """Shift a timestamp forward by an ``H:MM:SS`` duration.

    Args:
        datetime_str: Timestamp formatted as ``%Y-%m-%d %H:%M:%S``.
        duration_str: Three colon-separated integers (hours, minutes, seconds).

    Returns:
        The shifted timestamp, formatted like ``datetime_str``.

    Raises:
        ValueError: If either argument does not have the expected shape.
    """
    stamp_format = "%Y-%m-%d %H:%M:%S"
    origin = datetime.strptime(datetime_str, stamp_format)

    # Exactly three integer fields are expected; anything else raises ValueError.
    hrs, mins, secs = [int(piece) for piece in duration_str.split(':')]
    offset = timedelta(hours=hrs, minutes=mins, seconds=secs)

    return (origin + offset).strftime(stamp_format)
def convert_to_url_string(date_str, loc):
    """Turn a timestamp into a URL query fragment.

    The timestamp is rewritten in ISO 8601 form with a fixed ``+00:00``
    offset appended (the input is assumed to already be UTC) and then
    percent-encoded.

    Args:
        date_str: Timestamp formatted as ``%Y-%m-%d %H:%M:%S``.
        loc: ``"start"`` or ``"end"`` (case-insensitive).

    Returns:
        ``"?in=<encoded>"`` for ``"start"``, ``"&out=<encoded>"`` for
        ``"end"``, and ``None`` for any other ``loc``.
    """
    moment = datetime.strptime(date_str, "%Y-%m-%d %H:%M:%S")
    url_encoded = quote(moment.strftime("%Y-%m-%dT%H:%M:%S") + "+00:00")

    position = loc.lower()
    if position == "start":
        return f"?in={url_encoded}"
    if position == "end":
        return f"&out={url_encoded}"
    # Unrecognised position: no fragment is produced.
    return None
# Search function
def search_transcripts(query, meeting_type, start_date, end_date):
    """Search transcript segments and return display rows plus full rows.

    Args:
        query: Text matched case-insensitively (``ILIKE``) anywhere in the
            transcript. ``None`` or an empty string matches every segment.
            ``%`` and ``_`` in the text keep their SQL wildcard meaning.
        meeting_type: Sequence of meeting types to keep; falsy means no filter.
        start_date: Inclusive lower bound on ``meeting_date``, or falsy.
        end_date: Inclusive upper bound on ``meeting_date``, or falsy.

    Returns:
        ``(displayed_data, full_data)``. Each ``full_data`` row is
        ``[meeting_type, meeting_date, start, end, speaker, transcript,
        url, time_origin, transcript_id]`` with start/end formatted by
        ``format_seconds``. Each ``displayed_data`` row is the same row
        without its last three columns.
    """
    sql = """
        SELECT w.meeting_type, w.meeting_date, t.start_time, t.end_time,
               REPLACE(t.speaker, 'speaker ', ''), t.transcript, w.url, w.time_origin, t.id
        FROM mopacdb_webcasts.transcripts t
        JOIN mopacdb_webcasts.webcasts w ON t.webcast_id = w.id
        WHERE t.transcript ILIKE %s
    """
    # A missing query must match everything rather than the literal text "None".
    params = [f"%{query or ''}%"]

    if meeting_type:  # only apply if the user actually picked something
        sql += " AND w.meeting_type = ANY(%s)"
        params.append(meeting_type)

    if start_date and end_date:
        sql += " AND w.meeting_date BETWEEN %s AND %s"
        params.extend([start_date, end_date])
    elif start_date:
        sql += " AND w.meeting_date >= %s"
        params.append(start_date)
    elif end_date:
        sql += " AND w.meeting_date <= %s"
        params.append(end_date)

    # ORDER BY goes last, after every optional filter has been appended.
    sql += " ORDER BY w.meeting_date DESC, t.start_time ASC"

    conn = connect_db()
    try:
        # Close the cursor and the connection even when the query fails.
        with conn.cursor() as cur:
            cur.execute(sql, params)
            rows = cur.fetchall()
    finally:
        conn.close()

    full_data = [
        [row[0], row[1], format_seconds(row[2]), format_seconds(row[3]),
         row[4], row[5], row[6], row[7], row[8]]
        for row in rows
    ]
    # The table shows only the first six columns; url, time_origin and the
    # transcript id stay in full_data for the row-selection callbacks.
    displayed_data = [row[:-3] for row in full_data]
    return displayed_data, full_data
# Function to highlight query in the transcript
def highlight_query(query, transcript):
    """Wrap every case-insensitive occurrence of ``query`` in 🔶 markers.

    Args:
        query: Literal text to mark; regex metacharacters are escaped.
        transcript: Text to search.

    Returns:
        The marked-up text, or ``transcript`` unchanged when either
        argument is empty or ``None``.
    """
    if not (query and transcript):
        return transcript

    # The capture group lets the replacement keep the original casing.
    needle = re.compile(f"({re.escape(query)})", flags=re.IGNORECASE)
    return needle.sub(r'🔶\1🔶', transcript)
# Function to identify the hidden table row index
def get_matching_index(full_data: list, selected_row_values: list):
    """Return the index of the ``full_data`` row matching a selected table row.

    The selected row carries its meeting date as a ``YYYY-MM-DD`` string,
    while ``full_data`` holds ``date`` objects. The selected date is
    converted before the first six columns are compared.

    Args:
        full_data: Stored result rows; only columns 0-5 are compared.
        selected_row_values: The six displayed values of the clicked row,
            or ``None``/empty when nothing usable was selected.

    Returns:
        The index of the first matching row, or ``None`` when there is no
        selection or no row matches.

    Raises:
        ValueError: If the selected date is a string not in ``YYYY-MM-DD``
            form.
    """
    # No usable selection: report "no match" instead of raising TypeError.
    if not selected_row_values or len(selected_row_values) < 2:
        return None

    # Work on a copy so the caller's list (the event payload) is untouched.
    wanted = list(selected_row_values)
    picked_date = wanted[1]
    if isinstance(picked_date, str):
        wanted[1] = datetime.strptime(picked_date, "%Y-%m-%d").date()
    elif isinstance(picked_date, datetime):
        wanted[1] = picked_date.date()
    # A plain date object is already comparable and is left as is.

    return next(
        (i for i, row in enumerate(full_data) if list(row[:6]) == wanted),
        None,
    )
# Callback for transcript selection
def df_select_callback(full_data: list, evt: gr.SelectData, query=""):
    """Return the selected row's transcript with the search query highlighted.

    Args:
        full_data: Stored result rows; column 5 holds the transcript text.
        evt: Gradio select event; only ``evt.row_value`` is read.
        query: Search text to highlight inside the transcript.

    Returns:
        The highlighted transcript, or ``""`` when the event carries no row
        values or the row cannot be found in ``full_data``.
    """
    selected_row_values = evt.row_value
    # An empty selection cannot be matched; skip the lookup entirely.
    if not selected_row_values:
        return ""

    # A copy is passed so the lookup cannot alter the event payload.
    matching_index = get_matching_index(full_data, list(selected_row_values))
    if matching_index is None:
        return ""

    transcript = str(full_data[matching_index][5])
    return highlight_query(query, transcript)
# Callback for video iframe
def df_video_callback(full_data: list, evt: gr.SelectData):
    # Select-event callback that returns an HTML string for the clicked
    # results row, or "" when the row cannot be matched in `full_data`.
    #   full_data -- stored result rows; this function reads columns 2 and 3
    #                (formatted start/end offsets), 6 and 7.
    #   evt       -- Gradio select event; only `evt.row_value` is read.
    selected_row_values = evt.row_value if evt.row_value else None
    # Use the helper function to find the matching row index.
    # NOTE(review): None is forwarded when evt.row_value is empty - confirm
    # get_matching_index tolerates that.
    matching_index = get_matching_index(full_data, selected_row_values)
    if matching_index is not None:
        identifier = str(full_data[matching_index][6]) # Access identifier from stored data (column 6)
        # The value "replace" (any casing) is treated as "no video";
        # presumably a placeholder for archived webcasts - confirm.
        if identifier.lower() == "replace":
            # NOTE(review): as it appears here this literal is split across
            # two lines, which is not valid for a single-quoted string;
            # markup may have been lost from it - confirm against the
            # deployed file.
            return "
Video currently not availabile for this transcript due to GLA archiving policies."
        # Column 7 is used as the origin timestamp of the webcast.
        timestamp = str(full_data[matching_index][7])
        # Add the segment's start and end offsets to the origin and encode
        # them as "?in=" / "&out=" query fragments.
        ts_start = convert_to_url_string(add_duration_to_datetime(timestamp, str(full_data[matching_index][2])), "start")
        ts_end = convert_to_url_string(add_duration_to_datetime(timestamp, str(full_data[matching_index][3])), "end")
        # NOTE(review): the f-string below is empty, so identifier, ts_start
        # and ts_end never reach the returned value; presumably it held the
        # player embed markup - confirm.
        return f""
    return ""
def df_video_button_callback(full_data: list, evt: gr.SelectData):
    # Select-event callback that returns an HTML string for the clicked
    # results row, or "" when the row cannot be matched or its identifier
    # is "replace".
    #   full_data -- stored result rows; this function reads columns 2 and 3
    #                (formatted start/end offsets), 6 and 7.
    #   evt       -- Gradio select event; only `evt.row_value` is read.
    selected_row_values = evt.row_value if evt.row_value else None
    # Use the helper function to find the matching row index.
    # NOTE(review): None is forwarded when evt.row_value is empty - confirm
    # get_matching_index tolerates that.
    matching_index = get_matching_index(full_data, selected_row_values)
    if matching_index is not None:
        identifier = str(full_data[matching_index][6]) # Access identifier from stored data (column 6)
        # "replace" (any casing) means there is nothing to link to.
        if identifier.lower() == "replace":
            return ""
        # Column 7 is used as the origin timestamp of the webcast.
        timestamp = str(full_data[matching_index][7])
        # Add the segment's start and end offsets to the origin and encode
        # them as "?in=" / "&out=" query fragments.
        ts_start = convert_to_url_string(add_duration_to_datetime(timestamp, str(full_data[matching_index][2])), "start")
        ts_end = convert_to_url_string(add_duration_to_datetime(timestamp, str(full_data[matching_index][3])), "end")
        # url_1 is the player page for the whole webcast; url_2 is the same
        # page with the in/out fragments appended.
        url_1 = f"https://player.london.gov.uk/Player/Index/{identifier}"
        url_2 = f"https://player.london.gov.uk/Player/Index/{identifier}{ts_start}{ts_end}"
        # NOTE(review): the returned f-string contains only a newline, so
        # url_1 and url_2 are unused; presumably it held link/button markup
        # - confirm against the deployed file.
        return f"""
"""
    return ""
# Callback for the transcript segment preceding the selected one
def df_prior_callback(full_data: list, evt: gr.SelectData):
    """Return the transcript segment just before the selected one.

    "Before" means the segment of the same webcast with the greatest
    ``start_time`` that is still smaller than the selected segment's.

    Args:
        full_data: Stored result rows; the last column is the transcript id.
        evt: Gradio select event; only ``evt.row_value`` is read.

    Returns:
        The preceding segment's transcript, or ``""`` when nothing is
        selected, the row cannot be matched, or there is no earlier segment.
    """
    selected_row_values = evt.row_value
    if not selected_row_values:
        return ""

    # A copy is passed so the lookup cannot alter the event payload.
    matching_index = get_matching_index(full_data, list(selected_row_values))
    if matching_index is None:
        # Without a match there is no transcript id to query with.
        return ""

    transcript_id = int(full_data[matching_index][-1])
    sql = """
        WITH target_transcript AS (
            SELECT webcast_id, start_time
            FROM mopacdb_webcasts.transcripts
            WHERE id = %s
        )
        SELECT transcript
        FROM mopacdb_webcasts.transcripts
        WHERE webcast_id = (SELECT webcast_id FROM target_transcript)
          AND start_time < (SELECT start_time FROM target_transcript)
        ORDER BY start_time DESC
        LIMIT 1;
    """

    conn = connect_db()
    try:
        # Close the cursor and the connection even when the query fails.
        with conn.cursor() as cur:
            cur.execute(sql, (transcript_id,))
            prior_row = cur.fetchone()
    finally:
        conn.close()

    # No earlier segment: return an empty string so the textbox is cleared.
    if not prior_row:
        return ""
    return prior_row[0]
# Callback for the transcript segment following the selected one
def df_posterior_callback(full_data: list, evt: gr.SelectData):
    """Return the transcript segment just after the selected one.

    "After" means the segment of the same webcast with the smallest
    ``start_time`` that is still greater than the selected segment's.

    Args:
        full_data: Stored result rows; the last column is the transcript id.
        evt: Gradio select event; only ``evt.row_value`` is read.

    Returns:
        The following segment's transcript, or ``""`` when nothing is
        selected, the row cannot be matched, or there is no later segment.
    """
    selected_row_values = evt.row_value
    if not selected_row_values:
        return ""

    # A copy is passed so the lookup cannot alter the event payload.
    matching_index = get_matching_index(full_data, list(selected_row_values))
    if matching_index is None:
        # Without a match there is no transcript id to query with.
        return ""

    transcript_id = int(full_data[matching_index][-1])
    sql = """
        WITH target_transcript AS (
            SELECT webcast_id, start_time
            FROM mopacdb_webcasts.transcripts
            WHERE id = %s
        )
        SELECT transcript
        FROM mopacdb_webcasts.transcripts
        WHERE webcast_id = (SELECT webcast_id FROM target_transcript)
          AND start_time > (SELECT start_time FROM target_transcript)
        ORDER BY start_time ASC
        LIMIT 1;
    """

    conn = connect_db()
    try:
        # Close the cursor and the connection even when the query fails.
        with conn.cursor() as cur:
            cur.execute(sql, (transcript_id,))
            posterior_row = cur.fetchone()
    finally:
        conn.close()

    # No later segment: return an empty string so the textbox is cleared.
    if not posterior_row:
        return ""
    return posterior_row[0]
def df_transcript_callback(full_data: list, evt: gr.SelectData):
    """Export the full transcript of the selected row's webcast to a CSV file.

    All segments of the webcast that owns the selected segment are written,
    ordered by ``start_time``, to ``"<meeting_type> - <meeting_date>.csv"``
    in the current working directory.

    Args:
        full_data: Stored result rows; the last column is the transcript id.
        evt: Gradio select event; only ``evt.row_value`` is read.

    Returns:
        The path of the CSV file, or ``None`` when nothing is selected, the
        row cannot be matched, or the webcast has no segments.
    """
    selected_row_values = evt.row_value
    if not selected_row_values:
        return None

    # A copy is passed so the lookup cannot alter the event payload.
    matching_index = get_matching_index(full_data, list(selected_row_values))
    if matching_index is None:
        # Without a match there is no transcript id to query with.
        return None

    transcript_id = int(full_data[matching_index][-1])

    segments_sql = """
        SELECT start_time, end_time, speaker, transcript
        FROM mopacdb_webcasts.transcripts
        WHERE webcast_id = (
            SELECT webcast_id
            FROM mopacdb_webcasts.transcripts
            WHERE id = %s
        )
        ORDER BY start_time ASC;
    """
    filename_sql = """
        SELECT CONCAT(meeting_type, ' - ', meeting_date) AS meeting_details
        FROM mopacdb_webcasts.webcasts
        WHERE id = (
            SELECT webcast_id
            FROM mopacdb_webcasts.transcripts
            WHERE id = %s
        );
    """

    # One connection serves both queries and is always closed, even on error.
    conn = connect_db()
    try:
        with conn.cursor() as cur:
            cur.execute(segments_sql, (transcript_id,))
            rows = cur.fetchall()
            if not rows:
                return None  # nothing to export
            cur.execute(filename_sql, (transcript_id,))
            meeting_row = cur.fetchone()
    finally:
        conn.close()

    df = pd.DataFrame(rows, columns=["start_time", "end_time", "speaker", "transcript"])

    # Fall back to an id-based name if the webcast row is missing.
    filename = meeting_row[0] if meeting_row else f"transcript_{transcript_id}"
    # A path separator in the meeting type would make to_csv target a
    # non-existent directory, so separators are replaced.
    safe_name = re.sub(r"[\\/]", "-", str(filename))
    csv_file_path = safe_name + ".csv"
    df.to_csv(csv_file_path, index=False)
    return csv_file_path
# Gradio interface function
def gradio_interface(query, meeting_type, start_date, end_date):
    """Run a search from the raw UI inputs and shape the outputs for Gradio.

    Args:
        query: Text to search for.
        meeting_type: Selected meeting types (may be empty).
        start_date: Float timestamp, ``YYYY-MM-DD`` string, or falsy.
        end_date: Float timestamp, ``YYYY-MM-DD`` string, or falsy.

    Returns:
        ``(displayed_results, full_results, no_results_message)`` where the
        message is empty unless the search produced no rows.
    """

    def _as_datetime(raw):
        # Falsy values mean "no bound" and are passed through untouched.
        if not raw:
            return raw
        if isinstance(raw, float):
            return datetime.fromtimestamp(raw)
        return datetime.strptime(raw, "%Y-%m-%d")

    start_date = _as_datetime(start_date)
    end_date = _as_datetime(end_date)

    displayed_results, full_results = search_transcripts(query, meeting_type, start_date, end_date)

    if displayed_results:
        no_results_message = ""
    else:
        no_results_message = "No results found. Please try a different search."
    return displayed_results, full_results, no_results_message
#Set custom CSS
# HTML snippet passed to gr.HTML(custom_css) when the app is built.
# NOTE(review): as it appears here the value is only a newline, so nothing
# is injected; the app assigns elem_ids "purple-textbox" and
# "yellow-textbox", so presumably a style block belongs here - confirm
# against the deployed file.
custom_css = """
"""
# Build the Gradio app
# NOTE(review): the nesting of the layout containers below is inferred from
# the order of the statements; confirm which components sit inside each
# gr.Row / gr.Column (in particular search_btn) against the deployed file.
with gr.Blocks() as app:
    # Page header. NOTE(review): the text carries no markup here even though
    # sanitize_html=False is passed - presumably heading tags were lost.
    gr.Markdown(
        """
MOPAC | DS - 🔍 Webcast Transcript Search Tool
""",
        sanitize_html=False
    )
    #Load TextBox CSS
    gr.HTML(custom_css)
    # Dropdown choices are read from the database once, while the UI is built.
    meeting_types = get_meeting_types()
    # Search inputs
    with gr.Row():
        query = gr.Textbox(label="Search Query (optional)")
        meeting_type = gr.Dropdown(label="Meeting Type (optional)", choices=meeting_types, multiselect=True)
        start_date = gr.DateTime(label="Start Date (optional)", include_time=False, info="Enter Date in YYYY-MM-DD Format")
        # The end date is pre-filled with today's date.
        end_date = gr.DateTime(label="End Date (optional)", include_time=False, info="Enter Date in YYYY-MM-DD Format", interactive = True, value=date.today().isoformat())
    search_btn = gr.Button("Search")
    # Table for results (shows the six display columns only)
    results_table = gr.DataFrame(
        headers=["Meeting Type", "Date", "Start Time", "End Time", "Speaker", "Transcript"],
        datatype=["str", "date", "str", "str", "str", "str"],
        interactive=False
    )
    # Receives the "No results found" message from gradio_interface.
    no_results_text = gr.Markdown()
    # Store the full rows (display columns plus url, time_origin and id) separately
    full_results_state = gr.State([])
    # Search button updates the table and stores full results
    search_btn.click(fn=gradio_interface, inputs=[query, meeting_type, start_date, end_date], outputs=[results_table, full_results_state, no_results_text])
    # Submitting the query textbox (Enter key) runs the same search
    query.submit(fn=gradio_interface, inputs=[query, meeting_type, start_date, end_date], outputs=[results_table, full_results_state, no_results_text])
    with gr.Row():
        # Left column: previous / selected / next segment text and the download
        with gr.Column(scale=2):
            transcript_prior = gr.Textbox(label="Prior Transcript", interactive=False, elem_id="purple-textbox")
            selected_transcript = gr.Textbox(label="Selected Transcript",
                info ="Search Queries will appear demarcated using orange markers (🔶).",
                interactive=False, elem_id="yellow-textbox")
            # NOTE(review): reuses elem_id "purple-textbox" from the prior
            # textbox, so two elements share one id - confirm this is intended.
            transcript_posterior = gr.Textbox(label="Posterior Transcript", interactive=False, elem_id="purple-textbox")
            gr.Markdown("**The full transcript of any selected speech segment will appear available for download here...**")
            transcript_bttn = gr.File(label="Download Full Transcript")
        # Right column: HTML outputs of the two video callbacks
        with gr.Column(scale=3):
            video_iframe = gr.HTML("")
            video_bttn_full = gr.HTML("")
    # Selecting a table cell triggers six independent callbacks, each of which
    # looks the row up again in the stored full data.
    results_table.select(fn=df_select_callback, inputs=[full_results_state, query], outputs=[selected_transcript])
    results_table.select(fn=df_prior_callback, inputs=[full_results_state], outputs=[transcript_prior])
    results_table.select(fn=df_posterior_callback, inputs=[full_results_state], outputs=[transcript_posterior])
    results_table.select(fn=df_video_callback, inputs=[full_results_state], outputs=[video_iframe])
    results_table.select(fn=df_video_button_callback, inputs=[full_results_state], outputs=[video_bttn_full])
    results_table.select(fn=df_transcript_callback, inputs=[full_results_state], outputs=[transcript_bttn])
# Starts the server as soon as this module is executed.
app.launch()