|
|
import asyncio
import threading

import gradio as gr
from prefect import flow, get_run_logger, pause_flow_run, settings
from prefect.context import get_run_context
from prefect.input import RunInput
|
|
|
|
|
|
|
|
class UserFeedback(RunInput):
    """Input model for capturing user feedback in the Prefect UI."""

    # Required score. The flow in this file logs it as "<rating>/5" and
    # pre-fills the form with 5, but no range is enforced on the field.
    rating: int
    # Optional free-text remark; empty when the user leaves it blank.
    comment: str = ""
    # Whether the user asked for the feature to be improved.
    improve_feature: bool = False
|
|
|
|
|
|
|
|
@flow(name="interactive-feedback-flow")
async def collect_feedback(username: str) -> dict:
    """Pause the flow run until feedback is submitted through the Prefect UI.

    The flow logs a link to its own run page (when a UI URL is configured),
    pauses with a ``UserFeedback`` form pre-filled with default values, and
    logs the submitted answers once the run has been resumed.

    Args:
        username: Name of the person giving feedback. It is only used in
            log messages and echoed back in the result.

    Returns:
        A dict with the keys ``username``, ``rating``, ``comment`` and
        ``improve_feature`` holding the submitted values.

    NOTE(review): what happens when the 300-second pause timeout elapses is
    decided by ``pause_flow_run``; this flow does not handle that case.
    """
    logger = get_run_logger()
    logger.info(f"Started feedback collection for user: {username}")

    # Read the setting's value once instead of relying on the truthiness of
    # the setting object itself, then reuse that value to build the link.
    flow_run = get_run_context().flow_run
    ui_url = settings.PREFECT_UI_URL.value()
    flow_run_url = ""

    if flow_run and ui_url:
        # Strip a trailing slash so the link never contains "//flow-runs".
        base_url = str(ui_url).rstrip("/")
        flow_run_url = f"{base_url}/flow-runs/flow-run/{flow_run.id}"
        logger.info(f"Flow run UI available at: {flow_run_url}")

    logger.info("Pausing flow to collect feedback...")
    user_feedback = await pause_flow_run(
        wait_for_input=UserFeedback.with_initial_data(
            description="Please provide your feedback about our service.",
            rating=5,
            comment="",
            improve_feature=False,
        ),
        timeout=300,
    )

    logger.info(f"Feedback received from {username}:")
    logger.info(f"Rating: {user_feedback.rating}/5")
    logger.info(f"Comment: {user_feedback.comment}")
    logger.info(f"Wants feature improvement: {user_feedback.improve_feature}")

    return {
        "username": username,
        "rating": user_feedback.rating,
        "comment": user_feedback.comment,
        "improve_feature": user_feedback.improve_feature,
    }
|
|
|
|
|
|
|
|
def run_feedback_flow(username: str) -> str:
    """Start the feedback flow in a background thread and return instructions.

    The flow pauses until a human resumes it, so it is run on a worker
    thread and this function returns immediately with the text to show in
    the UI. The flow's result (or its failure) is printed to the terminal.

    Args:
        username: Name typed into the UI. ``None``, empty and
            whitespace-only values are rejected; surrounding whitespace is
            stripped before the name is passed to the flow.

    Returns:
        A prompt to enter a name, the follow-up instructions, or an
        ``"An error occurred: ..."`` message if the worker thread could not
        be started.
    """
    name = (username or "").strip()
    if not name:
        return "Please enter a username before starting the process."

    print(f"Starting feedback collection for user: {name}")

    def run_flow() -> None:
        # Runs on the worker thread. An exception raised here can never
        # reach the caller's handler, so it has to be reported here.
        try:
            result = asyncio.run(collect_feedback(name))
        except Exception as exc:
            print(f"Feedback flow for {name} failed: {exc!r}")
        else:
            print(f"Flow completed with results: {result}")

    # Only starting the thread can fail synchronously; keep the guarded
    # region that small.
    try:
        threading.Thread(target=run_flow).start()
    except Exception as e:
        return f"An error occurred: {str(e)}"

    # Built from separate literals so the text shown in the UI does not pick
    # up the indentation of this source file.
    return (
        "\n"
        f"Process started for {name}!\n"
        "\n"
        "IMPORTANT INSTRUCTIONS:\n"
        '1. Look for a line in the terminal that says "Flow run UI available at: [URL]"\n'
        "2. Copy that URL and open it in your browser\n"
        '3. Click "Resume" in the Prefect UI\n'
        "4. Fill out the feedback form and submit\n"
        "\n"
        "The results will be available in the terminal after completion.\n"
    )
|
|
|
|
|
|
|
|
|
|
|
# Gradio front end: a name field and start button on the left, the callback's
# message and static instructions on the right.
with gr.Blocks() as demo:
    gr.Markdown("# Interactive Human-in-the-Loop Workflow with Prefect")

    with gr.Row():
        with gr.Column():
            username_input = gr.Textbox(label="Your Name")
            submit_btn = gr.Button("Start Feedback Process")

        with gr.Column():
            output_text = gr.Textbox(label="Result", lines=10)
            # Built from unindented literals so the Markdown never inherits
            # source indentation. Step 5 points at the terminal because the
            # click handler returns as soon as the flow is started; the
            # flow's result is only printed, never sent back to this page.
            flow_instructions = gr.Markdown(
                "\n"
                "## Instructions:\n"
                '1. Enter your name and click "Start Feedback Process"\n'
                "2. The flow will pause waiting for your input\n"
                "3. Go to the Prefect UI (URL will be printed in terminal)\n"
                '4. Click "Resume" and provide your feedback\n'
                "5. Results will be printed in the terminal after completion\n"
            )

    # The handler returns immediately with follow-up instructions.
    submit_btn.click(fn=run_feedback_flow, inputs=username_input, outputs=output_text)


if __name__ == "__main__":
    demo.launch()
|
|
|