# sgbaird's picture
# Refactor feedback collection flow to simplify UI integration and improve error handling
# 32563aa
import asyncio
import gradio as gr
from prefect import flow, get_run_logger, pause_flow_run, settings
from prefect.context import get_run_context
from prefect.input import RunInput
class UserFeedback(RunInput):
    """Form model used to capture a user's feedback in the Prefect UI.

    Attributes:
        rating: Integer rating; required, no default on the model itself.
        comment: Free-text remark; defaults to an empty string.
        improve_feature: Flag for requesting a feature improvement;
            defaults to ``False``.
    """

    rating: int
    comment: str = ""
    improve_feature: bool = False
@flow(name="interactive-feedback-flow")
async def collect_feedback(username: str):
    """Pause the flow run to gather a ``UserFeedback`` form via the Prefect UI.

    Args:
        username: Name of the person giving feedback; used in the log
            messages and echoed back in the result.

    Returns:
        A dict with the keys ``username``, ``rating``, ``comment`` and
        ``improve_feature``, filled from the submitted form.
    """
    log = get_run_logger()
    log.info(f"Started feedback collection for user: {username}")

    # Log a direct link to this run, but only when a flow run is present in
    # the context and a UI URL is configured.
    current_run = get_run_context().flow_run
    if current_run and settings.PREFECT_UI_URL:
        ui_base = settings.PREFECT_UI_URL.value()
        run_url = f"{ui_base}/flow-runs/flow-run/{current_run.id}"
        log.info(f"Flow run UI available at: {run_url}")

    log.info("Pausing flow to collect feedback...")

    # Form pre-populated with initial values for whoever resumes the run.
    prefilled_form = UserFeedback.with_initial_data(
        description="Please provide your feedback about our service.",
        rating=5,
        comment="",
        improve_feature=False,
    )
    # timeout is given in seconds: 300 s = 5 minutes.
    feedback = await pause_flow_run(wait_for_input=prefilled_form, timeout=300)

    # Execution continues here once the run has been resumed with input.
    log.info(f"Feedback received from {username}:")
    log.info(f"Rating: {feedback.rating}/5")
    log.info(f"Comment: {feedback.comment}")
    log.info(f"Wants feature improvement: {feedback.improve_feature}")

    summary = {
        "username": username,
        "rating": feedback.rating,
        "comment": feedback.comment,
        "improve_feature": feedback.improve_feature,
    }
    return summary
def run_feedback_flow(username) -> str:
    """Start the feedback flow in a background thread and return instructions.

    The flow itself is not awaited here: it runs in its own thread so the
    Gradio callback can return immediately, and its result is printed to the
    terminal when it finishes.

    Args:
        username: Name typed into the UI. ``None``, empty and whitespace-only
            values are rejected; surrounding whitespace is stripped.

    Returns:
        A message for the result box: a prompt to enter a name, the
        follow-up instructions after a successful start, or an error
        description if the background thread could not be started.
    """
    # A whitespace-only name is as unusable as an empty one, so normalise
    # before validating instead of launching a flow for a blank user.
    username = (username or "").strip()
    if not username:
        return "Please enter a username before starting the process."

    print(f"Starting feedback collection for user: {username}")

    # Imported locally because the module's import block does not provide it.
    import threading

    def run_flow():
        # asyncio.run gives the worker thread its own event loop for the flow.
        result = asyncio.run(collect_feedback(username))
        print(f"Flow completed with results: {result}")

    # Only the thread start can fail synchronously; errors raised inside the
    # flow happen later in the worker thread and cannot be caught here.
    try:
        threading.Thread(target=run_flow).start()
    except Exception as e:
        return f"An error occurred: {str(e)}"

    return (
        "\n"
        f"Process started for {username}!\n"
        "IMPORTANT INSTRUCTIONS:\n"
        '1. Look for a line in the terminal that says "Flow run UI available at: [URL]"\n'
        "2. Copy that URL and open it in your browser\n"
        '3. Click "Resume" in the Prefect UI\n'
        "4. Fill out the feedback form and submit\n"
        "The results will be available in the terminal after completion.\n"
    )
# Gradio interface: a name field and start button on the left, the status box
# and usage instructions on the right.
with gr.Blocks() as demo:
    gr.Markdown("# Interactive Human-in-the-Loop Workflow with Prefect")
    with gr.Row():
        with gr.Column():
            username_input = gr.Textbox(label="Your Name")
            submit_btn = gr.Button("Start Feedback Process")
        with gr.Column():
            output_text = gr.Textbox(label="Result", lines=10)
            # Step 5 points at the terminal: run_feedback_flow returns as soon
            # as the flow is started, so the final results are only printed
            # there and never reach the result box.
            flow_instructions = gr.Markdown(
                "\n"
                "## Instructions:\n"
                '1. Enter your name and click "Start Feedback Process"\n'
                "2. The flow will pause waiting for your input\n"
                "3. Go to the Prefect UI (URL will be printed in terminal)\n"
                '4. Click "Resume" and provide your feedback\n'
                "5. Results will be printed in the terminal after completion\n"
            )
    # The button passes the name to run_feedback_flow and shows its returned
    # message in the result box.
    submit_btn.click(fn=run_feedback_flow, inputs=username_input, outputs=output_text)

if __name__ == "__main__":
    demo.launch()