import logging
import queue
import threading
import time
import gradio as gr
import plotly.graph_objects as go
from dotenv import load_dotenv
from typing import Tuple
import os
### Internal classes
from deal_agent_framework import DealAgentFramework
from log_utils import reformat
load_dotenv(override=True)
class QueueHandler(logging.Handler):
def __init__(self, log_queue):
super().__init__()
self.log_queue = log_queue
def emit(self, record):
self.log_queue.put(self.format(record))
def html_for(log_data):
output = "
".join(log_data[-18:])
return f"""
{output}
"""
def setup_logging(log_queue):
"""
Register and configure for logging
"""
logger = logging.getLogger()
### Remove previous QueueHandlers to avoid duplication and UI floods (handler duplication)
for h in list(logger.handlers):
if isinstance(h, QueueHandler):
logger.removeHandler(h)
handler = QueueHandler(log_queue)
formatter = logging.Formatter(
"[%(asctime)s] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S %z",
)
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(logging.INFO)
class App:
def __init__(self):
### lazy initialization
self.agent_framework = None
self.timer_enabled = False
### And assign it here
def get_agent_framework(self):
if not self.agent_framework:
self.agent_framework = DealAgentFramework()
return self.agent_framework
def run(self):
with gr.Blocks(
title="WorthBrain",
fill_width=True,
css="footer {visibility: hidden}"
) as ui:
log_data = gr.State([])
def table_for(opps):
"""
Format the initial/final_result
"""
return [
[
opp.deal.product_description,
f"${opp.deal.price:.2f}",
f"${opp.estimate:.2f}",
f"${opp.discount:.2f}",
opp.deal.url,
]
for opp in opps
]
def stream_ui_updates(log_data, log_queue, result_queue):
"""
A generator loop that streams log messages and opportunities data for the data frame in the UI
"""
initial_result = table_for(self.get_agent_framework().memory)
final_result = None
while True:
try:
message = log_queue.get_nowait()
log_data.append(reformat(message))
yield log_data, html_for(log_data), final_result or initial_result
except queue.Empty:
try:
final_result = result_queue.get_nowait()
yield log_data, html_for(log_data), final_result or initial_result
except queue.Empty:
if final_result is not None:
break
time.sleep(0.1)
def get_plot():
"""
Create 3D scatter plot for the result data
"""
documents, vectors, colors = DealAgentFramework.get_plot_data(max_datapoints=800)
fig = go.Figure(
data=[
go.Scatter3d(
x=vectors[:, 0],
y=vectors[:, 1],
z=vectors[:, 2],
mode="markers",
marker=dict(size=2, color=colors, opacity=0.7),
)
]
)
fig.update_layout(
scene=dict(
xaxis_title="x",
yaxis_title="y",
zaxis_title="z",
aspectmode="manual",
aspectratio=dict(x=2.2, y=2.2, z=1), # Make x-axis twice as long
camera=dict(
eye=dict(x=1.6, y=1.6, z=0.8) # Adjust camera position
),
),
height=400,
margin=dict(r=5, b=1, l=5, t=2),
)
return fig
def do_run():
new_opportunities = self.get_agent_framework().run()
table = table_for(new_opportunities)
return table
def run_with_logging(initial_log_data):
"""
Load event handler that starts a background worker thread, streaming log updates
to the UI and yields incremental UI updates.
:yield:
Tuple containing:
- log_data (persistent log state)
- output (HTML-formatted log message)
- final_result (table data for opportunities)
"""
log_queue = queue.Queue()
result_queue = queue.Queue()
setup_logging(log_queue)
def worker():
"""
Background task that executes the agent framework,
sends final results to result_queue, and emits logs
through the logging system.
"""
result = do_run()
result_queue.put(result)
thread = threading.Thread(target=worker, daemon=True)
thread.start()
for log_data, html_output, final_result in stream_ui_updates(
initial_log_data, log_queue, result_queue
):
yield log_data, html_output, final_result
with gr.Row():
gr.Markdown(
'WorthBrain - Autonomous Agent Framework that hunts for deals
'
)
with gr.Row():
gr.Markdown(
'A proprietary fine-tuned LLM deployed on Modal and a RAG pipeline with a frontier model collaborate to send push notifications with great online deals.
'
)
with gr.Row():
gr.Markdown(
"""
The Autonomous Team of Agents Automatically Finds a Deal
Every 10 Minutes!
"""
)
with gr.Row():
opportunities_dataframe = gr.Dataframe(
headers=["Deals found so far", "Price", "Estimate", "Discount", "URL"],
wrap=True,
column_widths=[6, 1, 1, 1, 3],
row_count=10,
column_count=5,
max_height=400,
)
with gr.Row():
with gr.Column(scale=1):
logs = gr.HTML()
with gr.Column(scale=1):
plot = gr.Plot(value=get_plot(), show_label=False)
### Footer
with gr.Row():
gr.Markdown(
"""
© 2026 WorthBrain · Autonomous Deal Hunting Intelligence
"""
)
ui.load(
### connect a load event handler
run_with_logging,
### set inputs
inputs=[log_data],
### set outputs
outputs=[log_data, logs, opportunities_dataframe]
)
### Set timer to re-run the program every N-minutes
timer = gr.Timer(value=600, active=self.timer_enabled)
timer.tick(
run_with_logging,
inputs=[log_data],
outputs=[log_data, logs, opportunities_dataframe],
)
port = int(os.getenv("PORT", 7860))
ui.launch(inbrowser=True, server_name="0.0.0.0", server_port=port)
if __name__ == "__main__":
App().run()