sgbaird commited on
Commit
836766a
·
1 Parent(s): 4ad5b9d

Enhance feedback collection flow with Prefect Cloud integration and improved error handling

Browse files
Files changed (1) hide show
  1. app.py +106 -19
app.py CHANGED
@@ -1,8 +1,11 @@
1
  import asyncio
 
2
  import gradio as gr
3
  from prefect import flow, get_run_logger, pause_flow_run, settings
4
  from prefect.context import get_run_context
5
  from prefect.input import RunInput
 
 
6
 
7
 
8
  class UserFeedback(RunInput):
@@ -15,22 +18,47 @@ class UserFeedback(RunInput):
15
 
16
  @flow(name="interactive-feedback-flow")
17
  async def collect_feedback(username: str):
18
- """A flow that pauses to collect user feedback via the Prefect UI."""
19
  logger = get_run_logger()
20
 
 
 
 
21
  logger.info(f"Started feedback collection for user: {username}")
22
 
23
  # Construct a message that would typically be sent as a notification
24
- message = f"Collecting feedback from user: {username}"
25
-
26
- # Get flow run context for UI URL
27
  flow_run = get_run_context().flow_run
28
  flow_run_url = ""
29
 
30
- if flow_run and settings.PREFECT_UI_URL:
31
- flow_run_url = (
32
- f"{settings.PREFECT_UI_URL.value()}/flow-runs/flow-run/{flow_run.id}"
33
- )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
34
  logger.info(f"Flow run UI available at: {flow_run_url}")
35
 
36
  # Pause the flow and wait for input
@@ -42,7 +70,7 @@ async def collect_feedback(username: str):
42
  comment="",
43
  improve_feature=False,
44
  ),
45
- timeout=300, # 5 minutes timeout
46
  )
47
 
48
  # Process the input received after the flow is resumed
@@ -66,21 +94,80 @@ async def collect_feedback(username: str):
66
  def run_feedback_flow(username):
67
  """Run the Prefect flow and handle the UI interaction."""
68
  try:
69
- # Run the flow and get the result
70
- result = asyncio.run(collect_feedback(username))
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
71
 
72
- # Format the result as a string for display
73
- message = f"""
74
- Thank you, {result['username']}!
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
75
 
76
- Your feedback has been recorded:
77
- - Rating: {result['rating']}/5
78
- - Comment: {result['comment']}
79
- - Feature improvement requested: {result['improve_feature']}
 
 
 
 
 
 
 
80
  """
81
 
82
- return message
83
  except Exception as e:
 
 
 
 
84
  return f"An error occurred: {str(e)}"
85
 
86
 
 
1
  import asyncio
2
+ import os
3
  import gradio as gr
4
  from prefect import flow, get_run_logger, pause_flow_run, settings
5
  from prefect.context import get_run_context
6
  from prefect.input import RunInput
7
+ from prefect.deployments import Deployment
8
+ from prefect.server.schemas.schedules import IntervalSchedule
9
 
10
 
11
  class UserFeedback(RunInput):
 
18
 
19
  @flow(name="interactive-feedback-flow")
20
  async def collect_feedback(username: str):
21
+ """A flow that pauses to collect user feedback via the Prefect Cloud UI."""
22
  logger = get_run_logger()
23
 
24
+ # Ensure we're using Prefect Cloud
25
+ logger.info("Running with Prefect Cloud configuration")
26
+
27
  logger.info(f"Started feedback collection for user: {username}")
28
 
29
  # Construct a message that would typically be sent as a notification
30
+ message = (
31
+ f"Collecting feedback from user: {username}" # Get flow run context for UI URL
32
+ )
33
  flow_run = get_run_context().flow_run
34
  flow_run_url = ""
35
 
36
+ if flow_run:
37
+ # For Prefect Cloud, use the PREFECT_API_URL to determine the Cloud UI URL
38
+ if os.environ.get("PREFECT_API_URL") and "api.prefect.cloud" in os.environ.get(
39
+ "PREFECT_API_URL", ""
40
+ ):
41
+ # Extract account_id and workspace_name from the API URL
42
+ # Format: https://api.prefect.cloud/api/accounts/{account_id}/workspaces/{workspace_name}/
43
+ api_url = os.environ["PREFECT_API_URL"]
44
+ parts = api_url.split("/")
45
+
46
+ if (
47
+ len(parts) >= 7
48
+ ): # Ensure we have enough parts to extract account_id and workspace_name
49
+ account_id = parts[5]
50
+ workspace_name = parts[7]
51
+ cloud_ui_url = f"https://app.prefect.cloud/account/{account_id}/workspace/{workspace_name}"
52
+ flow_run_url = f"{cloud_ui_url}/flow-runs/flow-run/{flow_run.id}"
53
+ else:
54
+ # Fallback to standard URL formation if we can't parse the API URL
55
+ if settings.PREFECT_UI_URL:
56
+ flow_run_url = f"{settings.PREFECT_UI_URL.value()}/flow-runs/flow-run/{flow_run.id}"
57
+ else:
58
+ # Fallback for local Prefect or other setups
59
+ if settings.PREFECT_UI_URL:
60
+ flow_run_url = f"{settings.PREFECT_UI_URL.value()}/flow-runs/flow-run/{flow_run.id}"
61
+
62
  logger.info(f"Flow run UI available at: {flow_run_url}")
63
 
64
  # Pause the flow and wait for input
 
70
  comment="",
71
  improve_feature=False,
72
  ),
73
+ timeout=60, # 5 minutes timeout
74
  )
75
 
76
  # Process the input received after the flow is resumed
 
94
  def run_feedback_flow(username):
95
  """Run the Prefect flow and handle the UI interaction."""
96
  try:
97
+ if not username:
98
+ return "Please enter a username before starting the process."
99
+
100
+ print(f"Starting feedback collection for user: {username}")
101
+
102
+ # Create a deployment of the flow to ensure it runs properly in Prefect Cloud
103
+ from prefect.deployments import Deployment
104
+
105
+ # Create or update the deployment
106
+ deployment = Deployment.build_from_flow(
107
+ flow=collect_feedback,
108
+ name="interactive-feedback-deployment",
109
+ version=None, # Auto-version
110
+ work_queue_name="default",
111
+ tags=["interactive", "feedback"],
112
+ parameters={"username": username},
113
+ enforce_parameter_schema=False,
114
+ )
115
+
116
+ # Apply the deployment
117
+ deployment_id = deployment.apply()
118
+ print(f"Created deployment with ID: {deployment_id}")
119
+
120
+ # Run the deployment
121
+ from prefect.deployments import run_deployment
122
 
123
+ flow_run = run_deployment(
124
+ name="interactive-feedback-flow/interactive-feedback-deployment",
125
+ parameters={"username": username},
126
+ )
127
+
128
+ print(f"Flow run created with ID: {flow_run.id}")
129
+
130
+ # Get the URL
131
+ flow_run_url = ""
132
+ if settings.PREFECT_UI_URL:
133
+ flow_run_url = (
134
+ f"{settings.PREFECT_UI_URL.value()}/flow-runs/flow-run/{flow_run.id}"
135
+ )
136
+ elif os.environ.get(
137
+ "PREFECT_API_URL"
138
+ ) and "api.prefect.cloud" in os.environ.get("PREFECT_API_URL", ""):
139
+ # Extract account_id and workspace_name from the API URL
140
+ api_url = os.environ["PREFECT_API_URL"]
141
+ parts = api_url.split("/")
142
+
143
+ if len(parts) >= 7:
144
+ account_id = parts[5]
145
+ workspace_name = parts[7]
146
+ flow_run_url = f"https://app.prefect.cloud/account/{account_id}/workspace/{workspace_name}/flow-runs/flow-run/{flow_run.id}"
147
+
148
+ print(f"Flow run UI available at: {flow_run_url}")
149
+
150
+ return f"""
151
+ Process started for {username}!
152
 
153
+ IMPORTANT INSTRUCTIONS:
154
+
155
+ Go to this URL to provide your feedback:
156
+ {flow_run_url}
157
+
158
+ 1. Click the "Resume" button in the Prefect UI
159
+ 2. Fill out the feedback form and submit
160
+ 3. Your input will be processed by the workflow
161
+
162
+ Note: This window will not automatically update with the results.
163
+ The complete results will be available in the Prefect UI and terminal.
164
  """
165
 
 
166
  except Exception as e:
167
+ import traceback
168
+
169
+ print(f"Error: {str(e)}")
170
+ print(traceback.format_exc())
171
  return f"An error occurred: {str(e)}"
172
 
173