redhairedshanks1 commited on
Commit
e6e087d
Β·
1 Parent(s): 51231a3

Update services/pipeline_executor.py

Browse files
Files changed (1) hide show
  1. services/pipeline_executor.py +104 -24
services/pipeline_executor.py CHANGED
@@ -536,6 +536,8 @@ def get_tool_executor(tool_name: str) -> Optional[Any]:
536
  # UNIFIED EXECUTOR WITH FALLBACK (UPDATED)
537
  # ========================
538
 
 
 
539
  def execute_pipeline_streaming(
540
  pipeline: Dict[str, Any],
541
  file_path: str,
@@ -544,6 +546,7 @@ def execute_pipeline_streaming(
544
  ) -> Generator[Dict[str, Any], None, None]:
545
  """
546
  Execute pipeline with fallback mechanism using master_tools.
 
547
  """
548
  components_executed = []
549
  final_output = None
@@ -554,13 +557,30 @@ def execute_pipeline_streaming(
554
  # Initialize pipeline info
555
  pipeline_id = pipeline.get("pipeline_id")
556
  pipeline_name = pipeline.get("pipeline_name", "Unnamed Pipeline")
557
- steps = pipeline.get("pipeline_steps", [])
558
 
559
- yield {
560
- "type": "info",
561
- "message": f"Starting pipeline: {pipeline_name}",
562
- "executor": "initializing"
563
- }
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
564
 
565
  # Check if tools are available
566
  if not TOOL_REGISTRY:
@@ -578,6 +598,15 @@ def execute_pipeline_streaming(
578
  }
579
  return
580
 
 
 
 
 
 
 
 
 
 
581
  # Try Bedrock first (priority)
582
  if prefer_bedrock and BEDROCK_AVAILABLE:
583
  try:
@@ -588,9 +617,13 @@ def execute_pipeline_streaming(
588
  "executor": "bedrock"
589
  }
590
 
 
 
 
 
591
  # Execute step by step with Bedrock
592
  for step_num, step_def in enumerate(steps, 1):
593
- tool_name = step_def.get("tool_name", "unknown")
594
 
595
  yield {
596
  "type": "step",
@@ -608,14 +641,17 @@ def execute_pipeline_streaming(
608
  step_num=step_num,
609
  total_steps=len(steps),
610
  session_id=session_id,
611
- prefer_bedrock=True
 
612
  )
613
 
614
  executor_used = "bedrock"
615
 
616
  # Create component result object
617
  component_result = {
618
- **step_def,
 
 
619
  "result": result.get("output"),
620
  "status": "completed",
621
  "executor": executor_used,
@@ -641,8 +677,12 @@ def execute_pipeline_streaming(
641
  file_path = _update_file_path(file_path, result)
642
 
643
  except Exception as step_error:
 
 
644
  # Create failed component result
645
  component_result = {
 
 
646
  **step_def,
647
  "result": {"error": str(step_error)},
648
  "status": "failed",
@@ -704,7 +744,7 @@ def execute_pipeline_streaming(
704
 
705
  for step_num in range(start_step, len(steps) + 1):
706
  step_def = steps[step_num - 1]
707
- tool_name = step_def.get("tool_name", "unknown")
708
 
709
  yield {
710
  "type": "step",
@@ -722,13 +762,16 @@ def execute_pipeline_streaming(
722
  step_num=step_num,
723
  total_steps=len(steps),
724
  session_id=session_id,
725
- prefer_bedrock=False
 
726
  )
727
 
728
  executor_used = "crewai"
729
 
730
  # Create component result object
731
  component_result = {
 
 
732
  **step_def,
733
  "result": result.get("output"),
734
  "status": "completed",
@@ -759,8 +802,12 @@ def execute_pipeline_streaming(
759
  file_path = _update_file_path(file_path, result)
760
 
761
  except Exception as step_error:
 
 
762
  # Create failed component result
763
  component_result = {
 
 
764
  **step_def,
765
  "result": {"error": str(step_error)},
766
  "status": "failed",
@@ -820,22 +867,30 @@ def _execute_step_with_master_tool(
820
  step_num: int,
821
  total_steps: int,
822
  session_id: Optional[str] = None,
823
- prefer_bedrock: bool = True
 
824
  ) -> Dict[str, Any]:
825
  """
826
  Execute a pipeline step using master_tools.
 
827
  """
828
  import time
829
  import inspect
830
 
831
- tool_name = step_def.get("tool_name", "unknown")
 
832
  start_time = time.time()
833
 
 
 
 
834
  # Get tool from registry
835
  tool = get_tool_executor(tool_name)
836
 
837
  if not tool:
838
- raise ValueError(f"Tool '{tool_name}' not found in registry. Available tools: {list(TOOL_REGISTRY.keys())}")
 
 
839
 
840
  # Prepare arguments
841
  args = {}
@@ -857,18 +912,25 @@ def _execute_step_with_master_tool(
857
  elif field_name == "session_id" and session_id:
858
  args[field_name] = session_id
859
  # Handle text parameter if not provided but we have previous output
860
- elif field_name == "text" and field_name not in step_def and step_num > 1:
861
  # Try to get text from previous step's output
862
- if components_executed and len(components_executed) >= step_num - 1:
863
- prev_result = components_executed[step_num - 2].get("result")
864
  if isinstance(prev_result, dict) and "text" in prev_result:
865
  args["text"] = prev_result["text"]
 
 
 
 
866
 
867
  try:
868
  # Execute the tool
 
869
  output = tool.invoke(args)
870
  execution_time = time.time() - start_time
871
 
 
 
872
  return {
873
  "output": output,
874
  "executor": "bedrock" if prefer_bedrock else "crewai",
@@ -918,17 +980,22 @@ def _execute_step_with_master_tool(
918
  elif param_name == "session_id" and session_id:
919
  call_args[param_name] = session_id
920
  # Handle text parameter
921
- elif param_name == "text" and param_name not in step_def and step_num > 1:
922
  # Try to get text from previous step
923
- if components_executed and len(components_executed) >= step_num - 1:
924
- prev_result = components_executed[step_num - 2].get("result")
925
  if isinstance(prev_result, dict) and "text" in prev_result:
926
  call_args["text"] = prev_result["text"]
 
 
927
 
928
  # Execute the function
 
929
  output = tool(**call_args)
930
  execution_time = time.time() - start_time
931
 
 
 
932
  return {
933
  "output": output,
934
  "executor": "bedrock" if prefer_bedrock else "crewai",
@@ -972,14 +1039,21 @@ def _build_final_output(
972
  ) -> Dict[str, Any]:
973
  """
974
  Build final output with components_executed array.
 
975
  """
 
 
 
976
  # Find the finalize step result if present
977
  final_result = None
978
  for component in components_executed:
979
- if component.get("tool_name") == "finalize":
980
  final_result = component.get("result")
981
  break
982
 
 
 
 
983
  final_output = {
984
  "pipeline_id": pipeline.get("pipeline_id"),
985
  "pipeline_name": pipeline.get("pipeline_name"),
@@ -987,8 +1061,8 @@ def _build_final_output(
987
  "components_executed": components_executed,
988
  "executor": executor_used,
989
  "summary": f"Pipeline execution {status} with {executor_used}",
990
- "total_steps": len(pipeline.get("pipeline_steps", [])),
991
- "completed_steps": len([c for c in components_executed if c.get("status") == "completed"]),
992
  "final_result": final_result
993
  }
994
 
@@ -1005,10 +1079,16 @@ def _build_final_output(
1005
  final_output["text"] = result
1006
  break
1007
  elif isinstance(result, dict):
1008
- for field in ["text", "summary", "content", "translation"]:
1009
  if field in result and isinstance(result[field], str):
1010
  final_output["text"] = result[field]
1011
  break
 
 
 
 
 
 
1012
 
1013
  return final_output
1014
 
 
536
  # UNIFIED EXECUTOR WITH FALLBACK (UPDATED)
537
  # ========================
538
 
539
+ # Update the execute_pipeline_streaming function:
540
+
541
  def execute_pipeline_streaming(
542
  pipeline: Dict[str, Any],
543
  file_path: str,
 
546
  ) -> Generator[Dict[str, Any], None, None]:
547
  """
548
  Execute pipeline with fallback mechanism using master_tools.
549
+ FIXED: Handle both 'components' and 'pipeline_steps' formats
550
  """
551
  components_executed = []
552
  final_output = None
 
557
  # Initialize pipeline info
558
  pipeline_id = pipeline.get("pipeline_id")
559
  pipeline_name = pipeline.get("pipeline_name", "Unnamed Pipeline")
 
560
 
561
+ # FIX: Get steps from either 'components' or 'pipeline_steps'
562
+ steps = []
563
+ if "pipeline_steps" in pipeline:
564
+ steps = pipeline.get("pipeline_steps", [])
565
+ elif "components" in pipeline:
566
+ steps = pipeline.get("components", [])
567
+ # Also update the pipeline to have both for consistency
568
+ pipeline["pipeline_steps"] = steps
569
+
570
+ if not steps:
571
+ error_msg = f"No steps/components found in pipeline: {pipeline_name}"
572
+ yield {
573
+ "type": "error",
574
+ "error": error_msg,
575
+ "data": {
576
+ "pipeline_id": pipeline_id,
577
+ "pipeline_name": pipeline_name,
578
+ "status": "failed",
579
+ "components_executed": [],
580
+ "error": error_msg
581
+ }
582
+ }
583
+ return
584
 
585
  # Check if tools are available
586
  if not TOOL_REGISTRY:
 
598
  }
599
  return
600
 
601
+ print(f"πŸ† Executing pipeline '{pipeline_name}' with {len(steps)} steps")
602
+ print(f" Steps format: {[s.get('tool_name', s.get('tool', 'unknown')) for s in steps]}")
603
+
604
+ yield {
605
+ "type": "info",
606
+ "message": f"Starting pipeline: {pipeline_name} with {len(steps)} steps",
607
+ "executor": "initializing"
608
+ }
609
+
610
  # Try Bedrock first (priority)
611
  if prefer_bedrock and BEDROCK_AVAILABLE:
612
  try:
 
617
  "executor": "bedrock"
618
  }
619
 
620
+ # Global components_executed list for step-by-step execution
621
+ # NOTE: This needs to be declared as nonlocal or passed to helper functions
622
+ components_executed = [] # Reset for Bedrock execution
623
+
624
  # Execute step by step with Bedrock
625
  for step_num, step_def in enumerate(steps, 1):
626
+ tool_name = step_def.get("tool_name", step_def.get("tool", "unknown"))
627
 
628
  yield {
629
  "type": "step",
 
641
  step_num=step_num,
642
  total_steps=len(steps),
643
  session_id=session_id,
644
+ prefer_bedrock=True,
645
+ previous_results=components_executed # Pass previous results
646
  )
647
 
648
  executor_used = "bedrock"
649
 
650
  # Create component result object
651
  component_result = {
652
+ "tool_name": tool_name,
653
+ "tool": tool_name, # For compatibility
654
+ **step_def, # Include all step definition fields
655
  "result": result.get("output"),
656
  "status": "completed",
657
  "executor": executor_used,
 
677
  file_path = _update_file_path(file_path, result)
678
 
679
  except Exception as step_error:
680
+ print(f"❌ Step {step_num} failed with Bedrock: {str(step_error)}")
681
+
682
  # Create failed component result
683
  component_result = {
684
+ "tool_name": tool_name,
685
+ "tool": tool_name,
686
  **step_def,
687
  "result": {"error": str(step_error)},
688
  "status": "failed",
 
744
 
745
  for step_num in range(start_step, len(steps) + 1):
746
  step_def = steps[step_num - 1]
747
+ tool_name = step_def.get("tool_name", step_def.get("tool", "unknown"))
748
 
749
  yield {
750
  "type": "step",
 
762
  step_num=step_num,
763
  total_steps=len(steps),
764
  session_id=session_id,
765
+ prefer_bedrock=False,
766
+ previous_results=components_executed
767
  )
768
 
769
  executor_used = "crewai"
770
 
771
  # Create component result object
772
  component_result = {
773
+ "tool_name": tool_name,
774
+ "tool": tool_name,
775
  **step_def,
776
  "result": result.get("output"),
777
  "status": "completed",
 
802
  file_path = _update_file_path(file_path, result)
803
 
804
  except Exception as step_error:
805
+ print(f"❌ Step {step_num} failed with CrewAI: {str(step_error)}")
806
+
807
  # Create failed component result
808
  component_result = {
809
+ "tool_name": tool_name,
810
+ "tool": tool_name,
811
  **step_def,
812
  "result": {"error": str(step_error)},
813
  "status": "failed",
 
867
  step_num: int,
868
  total_steps: int,
869
  session_id: Optional[str] = None,
870
+ prefer_bedrock: bool = True,
871
+ previous_results: List[Dict[str, Any]] = None
872
  ) -> Dict[str, Any]:
873
  """
874
  Execute a pipeline step using master_tools.
875
+ FIXED: Handle step_def with either 'tool_name' or 'tool' field
876
  """
877
  import time
878
  import inspect
879
 
880
+ # FIX: Get tool name from either 'tool_name' or 'tool' field
881
+ tool_name = step_def.get("tool_name", step_def.get("tool", "unknown"))
882
  start_time = time.time()
883
 
884
+ print(f" πŸ”¨ Executing step {step_num}/{total_steps}: {tool_name}")
885
+ print(f" Step definition: {step_def}")
886
+
887
  # Get tool from registry
888
  tool = get_tool_executor(tool_name)
889
 
890
  if not tool:
891
+ error_msg = f"Tool '{tool_name}' not found in registry. Available tools: {list(TOOL_REGISTRY.keys())}"
892
+ print(f" ❌ {error_msg}")
893
+ raise ValueError(error_msg)
894
 
895
  # Prepare arguments
896
  args = {}
 
912
  elif field_name == "session_id" and session_id:
913
  args[field_name] = session_id
914
  # Handle text parameter if not provided but we have previous output
915
+ elif field_name == "text" and field_name not in step_def and previous_results:
916
  # Try to get text from previous step's output
917
+ if step_num > 1 and len(previous_results) >= step_num - 1:
918
+ prev_result = previous_results[step_num - 2].get("result")
919
  if isinstance(prev_result, dict) and "text" in prev_result:
920
  args["text"] = prev_result["text"]
921
+ print(f" πŸ“ Using text from previous step: {args['text'][:100]}...")
922
+ elif isinstance(prev_result, str):
923
+ args["text"] = prev_result
924
+ print(f" πŸ“ Using text from previous step: {args['text'][:100]}...")
925
 
926
  try:
927
  # Execute the tool
928
+ print(f" πŸš€ Invoking tool {tool_name} with args: {args}")
929
  output = tool.invoke(args)
930
  execution_time = time.time() - start_time
931
 
932
+ print(f" βœ… Step {step_num} completed in {execution_time:.2f}s")
933
+
934
  return {
935
  "output": output,
936
  "executor": "bedrock" if prefer_bedrock else "crewai",
 
980
  elif param_name == "session_id" and session_id:
981
  call_args[param_name] = session_id
982
  # Handle text parameter
983
+ elif param_name == "text" and param_name not in step_def and previous_results:
984
  # Try to get text from previous step
985
+ if step_num > 1 and len(previous_results) >= step_num - 1:
986
+ prev_result = previous_results[step_num - 2].get("result")
987
  if isinstance(prev_result, dict) and "text" in prev_result:
988
  call_args["text"] = prev_result["text"]
989
+ elif isinstance(prev_result, str):
990
+ call_args["text"] = prev_result
991
 
992
  # Execute the function
993
+ print(f" πŸš€ Calling function {tool_name} with args: {call_args}")
994
  output = tool(**call_args)
995
  execution_time = time.time() - start_time
996
 
997
+ print(f" βœ… Step {step_num} completed in {execution_time:.2f}s")
998
+
999
  return {
1000
  "output": output,
1001
  "executor": "bedrock" if prefer_bedrock else "crewai",
 
1039
  ) -> Dict[str, Any]:
1040
  """
1041
  Build final output with components_executed array.
1042
+ FIXED: Handle both component formats
1043
  """
1044
+ # Get steps count from pipeline
1045
+ steps = pipeline.get("pipeline_steps", pipeline.get("components", []))
1046
+
1047
  # Find the finalize step result if present
1048
  final_result = None
1049
  for component in components_executed:
1050
+ if component.get("tool_name") == "finalize" or component.get("tool") == "finalize":
1051
  final_result = component.get("result")
1052
  break
1053
 
1054
+ # Count completed steps
1055
+ completed_steps = len([c for c in components_executed if c.get("status") == "completed"])
1056
+
1057
  final_output = {
1058
  "pipeline_id": pipeline.get("pipeline_id"),
1059
  "pipeline_name": pipeline.get("pipeline_name"),
 
1061
  "components_executed": components_executed,
1062
  "executor": executor_used,
1063
  "summary": f"Pipeline execution {status} with {executor_used}",
1064
+ "total_steps": len(steps),
1065
+ "completed_steps": completed_steps,
1066
  "final_result": final_result
1067
  }
1068
 
 
1079
  final_output["text"] = result
1080
  break
1081
  elif isinstance(result, dict):
1082
+ for field in ["text", "summary", "content", "translation", "output"]:
1083
  if field in result and isinstance(result[field], str):
1084
  final_output["text"] = result[field]
1085
  break
1086
+ # If no text field found but dict has string values
1087
+ if "text" not in final_output:
1088
+ for key, value in result.items():
1089
+ if isinstance(value, str) and len(value) > 10:
1090
+ final_output["text"] = value
1091
+ break
1092
 
1093
  return final_output
1094