redhairedshanks1 committed on
Commit
4555a81
Β·
1 Parent(s): a955a4b

Update services/pipeline_executor.py

Browse files
Files changed (1) hide show
  1. services/pipeline_executor.py +364 -364
services/pipeline_executor.py CHANGED
@@ -1,364 +1,364 @@
1
- # services/pipeline_executor.py
2
- """
3
- Unified pipeline executor with Bedrock LangChain (priority) and CrewAI (fallback)
4
- """
5
- import json
6
- import os
7
- from typing import Dict, Any, Optional, Generator, List
8
-
9
- # For Bedrock LangChain
10
- try:
11
- from langchain_aws import ChatBedrock
12
- from langchain.agents import AgentExecutor, create_tool_calling_agent
13
- from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
14
- from services.master_tools import get_master_tools as get_langchain_tools
15
- BEDROCK_AVAILABLE = True
16
- except ImportError:
17
- BEDROCK_AVAILABLE = False
18
- print("Warning: LangChain Bedrock not available")
19
-
20
- # For CrewAI fallback
21
- from services.agent_crewai import run_agent_streaming as crewai_run_streaming
22
-
23
-
24
- # ========================
25
- # BEDROCK LANGCHAIN EXECUTOR
26
- # ========================
27
-
28
- def execute_pipeline_bedrock(
29
- pipeline: Dict[str, Any],
30
- file_path: str,
31
- session_id: Optional[str] = None
32
- ) -> Dict[str, Any]:
33
- """
34
- Execute pipeline using Bedrock + LangChain (priority method)
35
- """
36
- if not BEDROCK_AVAILABLE:
37
- raise RuntimeError("Bedrock LangChain not available")
38
-
39
- try:
40
- llm = ChatBedrock(
41
- model_id=os.getenv("BEDROCK_MODEL", "anthropic.claude-3-5-sonnet-20241022-v2:0"),
42
- region_name=os.getenv("AWS_REGION", "us-east-1"),
43
- temperature=0.0,
44
- )
45
-
46
- tools = get_langchain_tools()
47
-
48
- system_instructions = """You are MasterLLM, a precise document processing agent.
49
-
50
- Execute the provided pipeline components in ORDER. For each component:
51
- 1. Call the corresponding tool with exact parameters
52
- 2. Wait for the result
53
- 3. Move to next component
54
-
55
- IMPORTANT:
56
- - Follow the pipeline order strictly
57
- - Use the file_path provided for all file-based operations
58
- - For text-processing tools (summarize, classify, NER, translate), use extracted text from previous steps
59
- - At the end, call 'finalize' tool with complete results
60
-
61
- Pipeline components will be in format:
62
- {
63
- "tool_name": "extract_text",
64
- "start_page": 1,
65
- "end_page": 5,
66
- "params": {}
67
- }"""
68
-
69
- prompt = ChatPromptTemplate.from_messages([
70
- ("system", system_instructions),
71
- ("system", "File path: {file_path}"),
72
- ("system", "Pipeline to execute: {pipeline_json}"),
73
- ("system", "Session ID: {session_id}"),
74
- ("human", "Execute the pipeline. Process each component in order and finalize with complete JSON results.")
75
- ])
76
-
77
- agent = create_tool_calling_agent(llm, tools, prompt)
78
- executor = AgentExecutor(
79
- agent=agent,
80
- tools=tools,
81
- verbose=True,
82
- max_iterations=15,
83
- handle_parsing_errors=True,
84
- )
85
-
86
- result = executor.invoke({
87
- "input": f"Execute pipeline: {pipeline['pipeline_name']}",
88
- "file_path": file_path,
89
- "pipeline_json": json.dumps(pipeline, indent=2),
90
- "session_id": session_id or "unknown"
91
- })
92
-
93
- return result
94
-
95
- except Exception as e:
96
- raise RuntimeError(f"Bedrock execution failed: {str(e)}")
97
-
98
-
99
- def execute_pipeline_bedrock_streaming(
100
- pipeline: Dict[str, Any],
101
- file_path: str,
102
- session_id: Optional[str] = None
103
- ) -> Generator[Dict[str, Any], None, None]:
104
- """
105
- Execute pipeline using Bedrock + LangChain with streaming
106
- """
107
- if not BEDROCK_AVAILABLE:
108
- raise RuntimeError("Bedrock LangChain not available")
109
-
110
- try:
111
- llm = ChatBedrock(
112
- model_id=os.getenv("BEDROCK_MODEL", "anthropic.claude-3-5-sonnet-20241022-v2:0"),
113
- region_name=os.getenv("AWS_REGION", "us-east-1"),
114
- temperature=0.0,
115
- )
116
-
117
- tools = get_langchain_tools()
118
-
119
- system_instructions = """You are MasterLLM. Execute the pipeline components in ORDER.
120
-
121
- For each component, call the tool and wait for results."""
122
-
123
- prompt = ChatPromptTemplate.from_messages([
124
- ("system", system_instructions),
125
- ("system", "File: {file_path}"),
126
- ("system", "Pipeline: {pipeline_json}"),
127
- ("human", "Execute the pipeline")
128
- ])
129
-
130
- agent = create_tool_calling_agent(llm, tools, prompt)
131
- executor = AgentExecutor(
132
- agent=agent,
133
- tools=tools,
134
- verbose=True,
135
- max_iterations=15,
136
- handle_parsing_errors=True,
137
- )
138
-
139
- # Yield initial status
140
- yield {
141
- "type": "status",
142
- "message": "Initializing Bedrock executor...",
143
- "executor": "bedrock"
144
- }
145
-
146
- step_count = 0
147
-
148
- # Stream execution
149
- for event in executor.stream({
150
- "input": f"Execute: {pipeline['pipeline_name']}",
151
- "file_path": file_path,
152
- "pipeline_json": json.dumps(pipeline, indent=2)
153
- }):
154
- if "actions" in event:
155
- for action in event.get("actions", []):
156
- step_count += 1
157
- tool = getattr(action, "tool", "unknown")
158
- yield {
159
- "type": "step",
160
- "step": step_count,
161
- "tool": tool,
162
- "status": "executing",
163
- "executor": "bedrock"
164
- }
165
-
166
- elif "steps" in event:
167
- for step in event.get("steps", []):
168
- observation = str(getattr(step, "observation", ""))[:500]
169
- yield {
170
- "type": "step",
171
- "step": step_count,
172
- "status": "completed",
173
- "observation": observation,
174
- "executor": "bedrock"
175
- }
176
-
177
- elif "output" in event:
178
- yield {
179
- "type": "final",
180
- "data": event.get("output"),
181
- "executor": "bedrock"
182
- }
183
- return
184
-
185
- except Exception as e:
186
- yield {
187
- "type": "error",
188
- "error": str(e),
189
- "executor": "bedrock"
190
- }
191
-
192
-
193
- # ========================
194
- # CREWAI EXECUTOR (FALLBACK)
195
- # ========================
196
-
197
- def execute_pipeline_crewai_streaming(
198
- pipeline: Dict[str, Any],
199
- file_path: str,
200
- session_id: Optional[str] = None
201
- ) -> Generator[Dict[str, Any], None, None]:
202
- """
203
- Execute pipeline using CrewAI (fallback method)
204
- """
205
- try:
206
- # Yield initial status
207
- yield {
208
- "type": "status",
209
- "message": "Using CrewAI executor (fallback)...",
210
- "executor": "crewai"
211
- }
212
-
213
- # Use existing CrewAI streaming function
214
- execution_goal = (
215
- f"Execute the approved plan: {pipeline['pipeline_name']}. "
216
- f"Process {len(pipeline.get('components', []))} components in order."
217
- )
218
-
219
- for event in crewai_run_streaming(
220
- user_input=execution_goal,
221
- session_file_path=file_path,
222
- plan=pipeline,
223
- chat_history=[]
224
- ):
225
- # Pass through CrewAI events with executor tag
226
- if isinstance(event, dict):
227
- event["executor"] = "crewai"
228
- yield event
229
-
230
- except Exception as e:
231
- yield {
232
- "type": "error",
233
- "error": str(e),
234
- "executor": "crewai"
235
- }
236
-
237
-
238
- # ========================
239
- # UNIFIED EXECUTOR WITH FALLBACK
240
- # ========================
241
-
242
- def execute_pipeline_streaming(
243
- pipeline: Dict[str, Any],
244
- file_path: str,
245
- session_id: Optional[str] = None,
246
- prefer_bedrock: bool = True
247
- ) -> Generator[Dict[str, Any], None, None]:
248
- """
249
- Execute pipeline with fallback mechanism.
250
-
251
- Priority:
252
- 1. Try Bedrock + LangChain - if available
253
- 2. Fallback to CrewAI - if Bedrock fails
254
-
255
- Yields:
256
- Status updates and final results
257
- """
258
- # Try Bedrock first (priority)
259
- if prefer_bedrock and BEDROCK_AVAILABLE:
260
- try:
261
- print(f"πŸ† Executing pipeline with Bedrock: {pipeline['pipeline_name']}")
262
- yield {
263
- "type": "info",
264
- "message": "Attempting execution with Bedrock LangChain...",
265
- "executor": "bedrock"
266
- }
267
-
268
- # Try to execute with Bedrock
269
- error_occurred = False
270
- for event in execute_pipeline_bedrock_streaming(pipeline, file_path, session_id):
271
- yield event
272
-
273
- # Check if error occurred
274
- if event.get("type") == "error":
275
- error_occurred = True
276
- bedrock_error = event.get("error")
277
- print(f"❌ Bedrock execution failed: {bedrock_error}")
278
- print("πŸ”„ Falling back to CrewAI...")
279
-
280
- yield {
281
- "type": "info",
282
- "message": f"Bedrock failed: {bedrock_error}. Switching to CrewAI...",
283
- "executor": "fallback"
284
- }
285
- break
286
-
287
- # If final result, we're done
288
- if event.get("type") == "final":
289
- print(f"βœ… Bedrock execution completed: {pipeline['pipeline_name']}")
290
- return
291
-
292
- # If we got here with error, fall back to CrewAI
293
- if error_occurred:
294
- # Fall through to CrewAI
295
- pass
296
- else:
297
- # Successful completion (shouldn't reach here normally)
298
- return
299
-
300
- except Exception as bedrock_error:
301
- print(f"❌ Bedrock execution exception: {str(bedrock_error)}")
302
- print("πŸ”„ Falling back to CrewAI...")
303
- yield {
304
- "type": "info",
305
- "message": f"Bedrock exception: {str(bedrock_error)}. Switching to CrewAI...",
306
- "executor": "fallback"
307
- }
308
-
309
- # Fallback to CrewAI
310
- print(f"πŸ”„ Executing pipeline with CrewAI: {pipeline['pipeline_name']}")
311
- for event in execute_pipeline_crewai_streaming(pipeline, file_path, session_id):
312
- yield event
313
-
314
- if event.get("type") == "final":
315
- print(f"βœ… CrewAI execution completed: {pipeline['pipeline_name']}")
316
- return
317
-
318
-
319
- # ========================
320
- # NON-STREAMING EXECUTOR
321
- # ========================
322
-
323
- def execute_pipeline(
324
- pipeline: Dict[str, Any],
325
- file_path: str,
326
- session_id: Optional[str] = None,
327
- prefer_bedrock: bool = True
328
- ) -> Dict[str, Any]:
329
- """
330
- Execute pipeline (non-streaming) with fallback
331
- """
332
- final_result = None
333
-
334
- for event in execute_pipeline_streaming(pipeline, file_path, session_id, prefer_bedrock):
335
- if event.get("type") == "final":
336
- final_result = event.get("data")
337
- break
338
-
339
- if final_result is None:
340
- raise RuntimeError("Pipeline execution completed without final result")
341
-
342
- return final_result
343
-
344
-
345
- if __name__ == "__main__":
346
- # Test
347
- test_pipeline = {
348
- "pipeline_name": "test-extraction",
349
- "components": [
350
- {
351
- "tool_name": "extract_text",
352
- "start_page": 1,
353
- "end_page": 1,
354
- "params": {}
355
- }
356
- ],
357
- "_generator": "test"
358
- }
359
-
360
- test_file = "test.pdf"
361
-
362
- print("Testing streaming execution...")
363
- for event in execute_pipeline_streaming(test_pipeline, test_file):
364
- print(f"Event: {event}")
 
1
+ # services/pipeline_executor.py
2
+ """
3
+ Unified pipeline executor with Bedrock LangChain (priority) and CrewAI (fallback)
4
+ """
5
+ import json
6
+ import os
7
+ from typing import Dict, Any, Optional, Generator, List
8
+
9
+ # For Bedrock LangChain
10
+ try:
11
+ from langchain_aws import ChatBedrock
12
+ from langchain.agents import AgentExecutor, create_tool_calling_agent
13
+ from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
14
+ from services.master_tools import get_master_tools as get_langchain_tools
15
+ BEDROCK_AVAILABLE = True
16
+ except ImportError:
17
+ BEDROCK_AVAILABLE = False
18
+ print("Warning: LangChain Bedrock not available")
19
+
20
+ # For CrewAI fallback
21
+ from services.agent_crewai import run_agent_streaming as crewai_run_streaming
22
+
23
+
24
+ # ========================
25
+ # BEDROCK LANGCHAIN EXECUTOR
26
+ # ========================
27
+
28
def execute_pipeline_bedrock(
    pipeline: Dict[str, Any],
    file_path: str,
    session_id: Optional[str] = None
) -> Dict[str, Any]:
    """
    Execute a pipeline using Bedrock + LangChain (priority method).

    Args:
        pipeline: Pipeline spec; must contain 'pipeline_name' and is passed
            whole (as JSON) to the agent prompt.
        file_path: Path to the document used by file-based tools.
        session_id: Optional session identifier forwarded to the agent
            prompt ("unknown" when omitted).

    Returns:
        The AgentExecutor result dict (contains the agent's 'output').

    Raises:
        RuntimeError: If Bedrock/LangChain imports are unavailable, or if
            execution fails (original exception attached as __cause__).
    """
    if not BEDROCK_AVAILABLE:
        raise RuntimeError("Bedrock LangChain not available")

    try:
        llm = ChatBedrock(
            model_id=os.getenv("BEDROCK_MODEL", "anthropic.claude-3-5-sonnet-20241022-v2:0"),
            region_name=os.getenv("AWS_REGION", "us-east-1"),
            temperature=0.0,  # deterministic tool calling
        )

        tools = get_langchain_tools()

        system_instructions = """You are MasterLLM, a precise document processing agent.

Execute the provided pipeline components in ORDER. For each component:
1. Call the corresponding tool with exact parameters
2. Wait for the result
3. Move to next component

IMPORTANT:
- Follow the pipeline order strictly
- Use the file_path provided for all file-based operations
- For text-processing tools (summarize, classify, NER, translate), use extracted text from previous steps
- At the end, call 'finalize' tool with complete results

Pipeline components will be in format:
{
    "tool_name": "extract_text",
    "start_page": 1,
    "end_page": 5,
    "params": {}
}"""

        prompt = ChatPromptTemplate.from_messages([
            ("system", system_instructions),
            ("system", "File path: {file_path}"),
            ("system", "Pipeline to execute: {pipeline_json}"),
            ("system", "Session ID: {session_id}"),
            ("human", "Execute the pipeline. Process each component in order and finalize with complete JSON results.")
        ])

        agent = create_tool_calling_agent(llm, tools, prompt)
        executor = AgentExecutor(
            agent=agent,
            tools=tools,
            verbose=True,
            max_iterations=15,
            handle_parsing_errors=True,
        )

        result = executor.invoke({
            "input": f"Execute pipeline: {pipeline['pipeline_name']}",
            "file_path": file_path,
            "pipeline_json": json.dumps(pipeline, indent=2),
            "session_id": session_id or "unknown"
        })

        return result

    except Exception as e:
        # Chain the original exception so the real failure cause survives
        # the RuntimeError wrapper (was previously lost without `from e`).
        raise RuntimeError(f"Bedrock execution failed: {str(e)}") from e
97
+
98
+
99
def execute_pipeline_bedrock_streaming(
    pipeline: Dict[str, Any],
    file_path: str,
    session_id: Optional[str] = None
) -> Generator[Dict[str, Any], None, None]:
    """
    Execute a pipeline using Bedrock + LangChain, streaming progress events.

    Yields dicts of these shapes, in order:
      - {"type": "status", ...}         once, before execution starts
      - {"type": "step", ...}           per tool call ("executing") and per
                                        completed tool run ("completed")
      - {"type": "final", "data": ...}  on completion; the generator then
                                        returns immediately
      - {"type": "error", "error": ...} if any exception is raised

    Args:
        pipeline: Pipeline spec with 'pipeline_name'; serialized whole into
            the prompt as JSON.
        file_path: Document path passed to file-based tools.
        session_id: Accepted for interface parity but NOT forwarded to the
            stream variables here (unlike execute_pipeline_bedrock).

    Raises:
        RuntimeError: If Bedrock/LangChain imports are unavailable (raised
            before any event is yielded).
    """
    if not BEDROCK_AVAILABLE:
        raise RuntimeError("Bedrock LangChain not available")

    try:
        llm = ChatBedrock(
            model_id=os.getenv("BEDROCK_MODEL", "anthropic.claude-3-5-sonnet-20241022-v2:0"),
            region_name=os.getenv("AWS_REGION", "us-east-1"),
            temperature=0.0,  # deterministic tool calling
        )

        tools = get_langchain_tools()

        system_instructions = """You are MasterLLM. Execute the pipeline components in ORDER.

For each component, call the tool and wait for results."""

        prompt = ChatPromptTemplate.from_messages([
            ("system", system_instructions),
            ("system", "File: {file_path}"),
            ("system", "Pipeline: {pipeline_json}"),
            ("human", "Execute the pipeline")
        ])

        agent = create_tool_calling_agent(llm, tools, prompt)
        executor = AgentExecutor(
            agent=agent,
            tools=tools,
            verbose=True,
            max_iterations=15,
            handle_parsing_errors=True,
        )

        # Yield initial status
        yield {
            "type": "status",
            "message": "Initializing Bedrock executor...",
            "executor": "bedrock"
        }

        # Counts tool invocations; "completed" events reuse the latest count.
        step_count = 0

        # Stream execution. AgentExecutor.stream yields dicts carrying
        # "actions" (tool calls about to run), "steps" (finished tool runs
        # with observations), or "output" (the final answer).
        for event in executor.stream({
            "input": f"Execute: {pipeline['pipeline_name']}",
            "file_path": file_path,
            "pipeline_json": json.dumps(pipeline, indent=2)
        }):
            if "actions" in event:
                for action in event.get("actions", []):
                    step_count += 1
                    tool = getattr(action, "tool", "unknown")
                    yield {
                        "type": "step",
                        "step": step_count,
                        "tool": tool,
                        "status": "executing",
                        "executor": "bedrock"
                    }

            elif "steps" in event:
                for step in event.get("steps", []):
                    # Truncate observations so events stay lightweight.
                    observation = str(getattr(step, "observation", ""))[:500]
                    yield {
                        "type": "step",
                        "step": step_count,
                        "status": "completed",
                        "observation": observation,
                        "executor": "bedrock"
                    }

            elif "output" in event:
                yield {
                    "type": "final",
                    "data": event.get("output"),
                    "executor": "bedrock"
                }
                return

    except Exception as e:
        # Surface failures as an event so callers can fall back to CrewAI.
        yield {
            "type": "error",
            "error": str(e),
            "executor": "bedrock"
        }
191
+
192
+
193
+ # ========================
194
+ # CREWAI EXECUTOR (FALLBACK)
195
+ # ========================
196
+
197
def execute_pipeline_crewai_streaming(
    pipeline: Dict[str, Any],
    file_path: str,
    session_id: Optional[str] = None
) -> Generator[Dict[str, Any], None, None]:
    """
    Execute a pipeline via CrewAI (the fallback executor), streaming events.

    Emits a single status event, then relays every event produced by the
    CrewAI agent with an added "executor": "crewai" tag. Any exception is
    converted into a terminal error event.

    Args:
        pipeline: Pipeline spec with 'pipeline_name' and 'components'.
        file_path: Document path handed to the CrewAI session.
        session_id: Accepted for interface parity; not used here.
    """
    try:
        yield {
            "type": "status",
            "message": "Using CrewAI executor (fallback)...",
            "executor": "crewai"
        }

        # Build the natural-language goal for the existing CrewAI runner.
        component_total = len(pipeline.get('components', []))
        goal = (
            f"Execute the approved plan: {pipeline['pipeline_name']}. "
            f"Process {component_total} components in order."
        )

        crewai_stream = crewai_run_streaming(
            user_input=goal,
            session_file_path=file_path,
            plan=pipeline,
            chat_history=[]
        )
        for crewai_event in crewai_stream:
            # Tag dict events with their source executor before relaying.
            if isinstance(crewai_event, dict):
                crewai_event["executor"] = "crewai"
            yield crewai_event

    except Exception as exc:
        yield {
            "type": "error",
            "error": str(exc),
            "executor": "crewai"
        }
236
+
237
+
238
+ # ========================
239
+ # UNIFIED EXECUTOR WITH FALLBACK
240
+ # ========================
241
+
242
def execute_pipeline_streaming(
    pipeline: Dict[str, Any],
    file_path: str,
    session_id: Optional[str] = None,
    prefer_bedrock: bool = True
) -> Generator[Dict[str, Any], None, None]:
    """
    Execute a pipeline with a fallback mechanism, streaming events.

    Priority:
    1. Bedrock + LangChain — when available and preferred.
    2. CrewAI — when Bedrock is unavailable, not preferred, or fails.

    Args:
        pipeline: Pipeline spec; must contain 'pipeline_name'.
        file_path: Document path forwarded to the chosen executor.
        session_id: Optional session identifier forwarded downstream.
        prefer_bedrock: When False, skip Bedrock and go straight to CrewAI.

    Yields:
        Status/info/step events and, on success, a {"type": "final", ...}
        event from whichever executor completed.
    """
    # Try Bedrock first (priority)
    if prefer_bedrock and BEDROCK_AVAILABLE:
        try:
            print(f"πŸ† Executing pipeline with Bedrock: {pipeline['pipeline_name']}")
            yield {
                "type": "info",
                "message": "Attempting execution with Bedrock LangChain...",
                "executor": "bedrock"
            }

            error_occurred = False
            for event in execute_pipeline_bedrock_streaming(pipeline, file_path, session_id):
                yield event

                if event.get("type") == "error":
                    # Bedrock reported a failure mid-stream: announce the
                    # switch and stop consuming Bedrock events.
                    error_occurred = True
                    bedrock_error = event.get("error")
                    print(f"❌ Bedrock execution failed: {bedrock_error}")
                    print("πŸ”„ Falling back to CrewAI...")

                    yield {
                        "type": "info",
                        "message": f"Bedrock failed: {bedrock_error}. Switching to CrewAI...",
                        "executor": "fallback"
                    }
                    break

                if event.get("type") == "final":
                    print(f"βœ… Bedrock execution completed: {pipeline['pipeline_name']}")
                    return

            # Stream ended without an error event: do not fall back.
            # (Replaces the previous `if error_occurred: pass / else: return`.)
            if not error_occurred:
                return

        except Exception as bedrock_error:
            print(f"❌ Bedrock execution exception: {str(bedrock_error)}")
            print("πŸ”„ Falling back to CrewAI...")
            yield {
                "type": "info",
                "message": f"Bedrock exception: {str(bedrock_error)}. Switching to CrewAI...",
                "executor": "fallback"
            }

    # Fallback to CrewAI
    print(f"πŸ”„ Executing pipeline with CrewAI: {pipeline['pipeline_name']}")
    for event in execute_pipeline_crewai_streaming(pipeline, file_path, session_id):
        yield event

        if event.get("type") == "final":
            print(f"βœ… CrewAI execution completed: {pipeline['pipeline_name']}")
            return
317
+
318
+
319
+ # ========================
320
+ # NON-STREAMING EXECUTOR
321
+ # ========================
322
+
323
def execute_pipeline(
    pipeline: Dict[str, Any],
    file_path: str,
    session_id: Optional[str] = None,
    prefer_bedrock: bool = True
) -> Dict[str, Any]:
    """
    Execute a pipeline synchronously (non-streaming) with fallback.

    Drains the streaming executor up to the first 'final' event and
    returns its payload.

    Raises:
        RuntimeError: If the stream ends without yielding a usable
            final result (payload missing or None).
    """
    event_stream = execute_pipeline_streaming(
        pipeline, file_path, session_id, prefer_bedrock
    )
    # Stop at the first 'final' event; None when the stream runs dry first.
    final_payload = next(
        (evt.get("data") for evt in event_stream if evt.get("type") == "final"),
        None,
    )

    if final_payload is None:
        raise RuntimeError("Pipeline execution completed without final result")

    return final_payload
343
+
344
+
345
if __name__ == "__main__":
    # Manual smoke test: stream a one-component extraction pipeline.
    sample_pipeline = {
        "pipeline_name": "test-extraction",
        "components": [
            {
                "tool_name": "extract_text",
                "start_page": 1,
                "end_page": 1,
                "params": {},
            },
        ],
        "_generator": "test",
    }
    sample_file = "test.pdf"

    print("Testing streaming execution...")
    for streamed_event in execute_pipeline_streaming(sample_pipeline, sample_file):
        print(f"Event: {streamed_event}")