Spaces:
Sleeping
Sleeping
Commit
·
db5afca
1
Parent(s):
a5c898f
Update services/pipeline_executor.py
Browse files- services/pipeline_executor.py +510 -44
services/pipeline_executor.py
CHANGED
|
@@ -479,6 +479,63 @@ def execute_pipeline_crewai_streaming(
|
|
| 479 |
# UNIFIED EXECUTOR WITH FALLBACK
|
| 480 |
# ========================
|
| 481 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 482 |
def execute_pipeline_streaming(
|
| 483 |
pipeline: Dict[str, Any],
|
| 484 |
file_path: str,
|
|
@@ -486,74 +543,474 @@ def execute_pipeline_streaming(
|
|
| 486 |
prefer_bedrock: bool = True
|
| 487 |
) -> Generator[Dict[str, Any], None, None]:
|
| 488 |
"""
|
| 489 |
-
Execute pipeline with fallback mechanism.
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 490 |
|
| 491 |
-
|
| 492 |
-
|
| 493 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 494 |
|
| 495 |
-
Yields:
|
| 496 |
-
Status updates and final results
|
| 497 |
-
"""
|
| 498 |
# Try Bedrock first (priority)
|
| 499 |
if prefer_bedrock and BEDROCK_AVAILABLE:
|
| 500 |
try:
|
| 501 |
-
print(f"🏆 Executing pipeline with Bedrock: {
|
| 502 |
yield {
|
| 503 |
"type": "info",
|
| 504 |
"message": "Attempting execution with Bedrock LangChain...",
|
| 505 |
"executor": "bedrock"
|
| 506 |
}
|
| 507 |
|
| 508 |
-
#
|
| 509 |
-
|
| 510 |
-
|
| 511 |
-
yield event
|
| 512 |
|
| 513 |
-
|
| 514 |
-
|
| 515 |
-
|
| 516 |
-
|
| 517 |
-
|
| 518 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 519 |
|
| 520 |
yield {
|
| 521 |
-
"type": "
|
| 522 |
-
"
|
| 523 |
-
"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 524 |
}
|
|
|
|
|
|
|
| 525 |
break
|
| 526 |
-
|
| 527 |
-
# If final result, we're done
|
| 528 |
-
if event.get("type") == "final":
|
| 529 |
-
print(f"✅ Bedrock execution completed: {pipeline['pipeline_name']}")
|
| 530 |
-
return
|
| 531 |
|
| 532 |
-
# If we
|
| 533 |
-
if
|
| 534 |
-
|
| 535 |
-
|
| 536 |
-
|
| 537 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
| 538 |
return
|
| 539 |
|
| 540 |
-
except Exception as
|
| 541 |
-
print(f"❌ Bedrock execution exception: {str(
|
| 542 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 543 |
yield {
|
| 544 |
"type": "info",
|
| 545 |
-
"message": f"Bedrock
|
| 546 |
"executor": "fallback"
|
| 547 |
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 548 |
|
| 549 |
-
#
|
| 550 |
-
|
| 551 |
-
|
| 552 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 553 |
|
| 554 |
-
|
| 555 |
-
|
| 556 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 557 |
|
| 558 |
|
| 559 |
# ========================
|
|
@@ -575,9 +1032,18 @@ def execute_pipeline(
|
|
| 575 |
if event.get("type") == "final":
|
| 576 |
final_result = event.get("data")
|
| 577 |
break
|
|
|
|
|
|
|
|
|
|
| 578 |
|
| 579 |
if final_result is None:
|
| 580 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 581 |
|
| 582 |
return final_result
|
| 583 |
|
|
|
|
| 479 |
# UNIFIED EXECUTOR WITH FALLBACK
|
| 480 |
# ========================
|
| 481 |
|
| 482 |
+
# ========================
|
| 483 |
+
# TOOL REGISTRY & DYNAMIC EXECUTION (UPDATED)
|
| 484 |
+
# ========================
|
| 485 |
+
|
| 486 |
+
# Import the master tools
|
| 487 |
+
# Bind both names up front so they always exist at module level, even when
# the optional master_tools dependency cannot be imported.
MASTER_TOOLS = []
TOOL_REGISTRY = {}

try:
    from services.master_tools import get_master_tools
    # Not referenced in this block; kept so that a missing langchain_core
    # still takes the ImportError path below, exactly as before.
    from langchain_core.tools import StructuredTool

    # Get all tools from master_tools (tolerate a None return).
    MASTER_TOOLS = get_master_tools() or []

    # Map each tool to its public name: LangChain-style tools expose `.name`,
    # plain functions expose `__name__`. Tools without a usable name are
    # skipped instead of being registered under None / "".
    for tool in MASTER_TOOLS:
        registry_name = getattr(tool, "name", None) or getattr(tool, "__name__", None)
        if registry_name:
            TOOL_REGISTRY[registry_name] = tool

    print(f"✅ Loaded {len(TOOL_REGISTRY)} tools from master_tools.py")

except ImportError as e:
    print(f"⚠️ Could not import master_tools: {e}")
    TOOL_REGISTRY = {}
|
| 507 |
+
|
| 508 |
+
|
| 509 |
+
def get_tool_executor(tool_name: str) -> Optional[Any]:
    """Look up a tool in ``TOOL_REGISTRY`` by name, with lenient matching.

    Matching is attempted in this order:

    1. exact registry key;
    2. the name with a ``_tool`` suffix, then the name with underscores removed;
    3. substring match in either direction, first hit in registry order.

    Args:
        tool_name: Name of the tool requested by a pipeline step.

    Returns:
        The registered tool object, or ``None`` when nothing matches or
        ``tool_name`` is empty / not a string.
    """
    # An empty name is a substring of every key, so it would otherwise
    # "match" an arbitrary tool in the partial-match pass below.
    if not isinstance(tool_name, str) or not tool_name:
        return None

    # Direct match
    if tool_name in TOOL_REGISTRY:
        return TOOL_REGISTRY[tool_name]

    # Try variations
    for variation in (f"{tool_name}_tool", tool_name.replace("_", "")):
        if variation in TOOL_REGISTRY:
            return TOOL_REGISTRY[variation]

    # Check partial matches (skip empty keys for the same reason as above)
    for registered_name, tool in TOOL_REGISTRY.items():
        if not registered_name:
            continue
        if tool_name in registered_name or registered_name in tool_name:
            return tool

    return None
|
| 533 |
+
|
| 534 |
+
|
| 535 |
+
# ========================
|
| 536 |
+
# UNIFIED EXECUTOR WITH FALLBACK (UPDATED)
|
| 537 |
+
# ========================
|
| 538 |
+
|
| 539 |
def _latest_component_text(components_executed: List[Dict[str, Any]]) -> Optional[Any]:
    """Return the ``text`` entry of the most recent component result, if any."""
    if not components_executed:
        return None
    previous_result = components_executed[-1].get("result")
    if isinstance(previous_result, dict) and "text" in previous_result:
        return previous_result["text"]
    return None


def _run_pipeline_steps(
    steps: List[Dict[str, Any]],
    start_step: int,
    file_path: str,
    session_id: Optional[str],
    executor: str,
    components_executed: List[Dict[str, Any]],
) -> Generator[Dict[str, Any], None, Dict[str, Any]]:
    """Run ``steps[start_step - 1:]`` in order, yielding progress events.

    Each step yields an "executing" event and then either a "completed"
    event or an "error" event; execution stops at the first failure. A
    record for every attempted step is appended to ``components_executed``.

    Returns (as the generator's return value) a dict with:
        ``file_path``: the file path after the last successful step,
        ``failed``: whether a step raised,
        ``error``: the failure message (``None`` on success),
        ``completed``: number of steps completed by this call.
    """
    total_steps = len(steps)
    executor_label = "Bedrock" if executor == "bedrock" else "CrewAI"
    completed_here = 0

    for step_num in range(start_step, total_steps + 1):
        step_def = steps[step_num - 1]
        tool_name = step_def.get("tool_name", "unknown")

        yield {
            "type": "step",
            "step": step_num,
            "tool": tool_name,
            "status": "executing",
            "executor": executor
        }

        # Forward the previous step's text when this step does not supply its
        # own. The step helper has no access to earlier results, so the value
        # is handed over through the step definition it receives.
        call_def = step_def
        if step_num > 1 and "text" not in step_def:
            previous_text = _latest_component_text(components_executed)
            if previous_text is not None:
                call_def = {**step_def, "text": previous_text}

        try:
            # Execute the step using master_tools
            result = _execute_step_with_master_tool(
                step_def=call_def,
                file_path=file_path,
                step_num=step_num,
                total_steps=total_steps,
                session_id=session_id,
                prefer_bedrock=(executor == "bedrock")
            )
            component_result = {
                **step_def,
                "result": result.get("output"),
                "status": "completed",
                "executor": executor,
                "execution_time": result.get("execution_time"),
                "step_number": step_num,
                "success": True,
                "tool_version": result.get("tool_version", "1.0")
            }
            # Some tools produce a new file that later steps must consume.
            next_file_path = _update_file_path(file_path, result)
        except Exception as step_error:
            error_text = str(step_error)
            components_executed.append({
                **step_def,
                "result": {"error": error_text},
                "status": "failed",
                "error": error_text,
                "step_number": step_num,
                "success": False
            })
            yield {
                "type": "error",
                "step": step_num,
                "tool": tool_name,
                "error": error_text,
                "message": f"Step {step_num} failed with {executor_label}"
            }
            return {
                "file_path": file_path,
                "failed": True,
                "error": error_text,
                "completed": completed_here
            }

        components_executed.append(component_result)
        completed_here += 1

        yield {
            "type": "step",
            "step": step_num,
            "tool": tool_name,
            "status": "completed",
            "observation": component_result["result"],
            "input": step_def,
            "executor": executor
        }

        file_path = next_file_path

    return {
        "file_path": file_path,
        "failed": False,
        "error": None,
        "completed": completed_here
    }


def execute_pipeline_streaming(
    pipeline: Dict[str, Any],
    file_path: str,
    # NOTE(review): this parameter line is hidden in the diff view; it is
    # reconstructed from the body's use of `session_id` - confirm the default.
    session_id: Optional[str] = None,
    prefer_bedrock: bool = True
) -> Generator[Dict[str, Any], None, None]:
    """
    Execute pipeline with fallback mechanism using master_tools.

    Bedrock is tried first when ``prefer_bedrock`` is true and
    ``BEDROCK_AVAILABLE`` is set. If Bedrock is not preferred, not available,
    or fails, execution continues with CrewAI from the first step that has
    not completed (a step that failed under Bedrock is retried).

    Args:
        pipeline: Pipeline definition; ``pipeline_id``, ``pipeline_name`` and
            ``pipeline_steps`` are read from it.
        file_path: Input file for the first step; later steps receive the
            path reported by the previous step's output when it names one.
        session_id: Optional session identifier handed to each step.
        prefer_bedrock: Try the Bedrock executor before CrewAI.

    Yields:
        Event dicts with ``type`` of "info", "step", "error" or "final".
        The stream ends with either a "final" event or an "error" event
        carrying ``data`` (the partial output).
    """
    components_executed: List[Dict[str, Any]] = []
    executor_used = "unknown"
    bedrock_failed = False
    bedrock_error = None

    # Initialize pipeline info
    pipeline_id = pipeline.get("pipeline_id")
    pipeline_name = pipeline.get("pipeline_name", "Unnamed Pipeline")
    steps = pipeline.get("pipeline_steps") or []

    yield {
        "type": "info",
        "message": f"Starting pipeline: {pipeline_name}",
        "executor": "initializing"
    }

    # Check if tools are available
    if not TOOL_REGISTRY:
        error_msg = "No tools available. master_tools.py not loaded correctly."
        yield {
            "type": "error",
            "error": error_msg,
            "data": {
                "pipeline_id": pipeline_id,
                "pipeline_name": pipeline_name,
                "status": "failed",
                "components_executed": [],
                "error": error_msg
            }
        }
        return

    # Try Bedrock first (priority)
    if prefer_bedrock and BEDROCK_AVAILABLE:
        try:
            print(f"🏆 Executing pipeline with Bedrock: {pipeline_name}")
            yield {
                "type": "info",
                "message": "Attempting execution with Bedrock LangChain...",
                "executor": "bedrock"
            }

            outcome = yield from _run_pipeline_steps(
                steps, 1, file_path, session_id, "bedrock", components_executed
            )
            file_path = outcome["file_path"]
            if outcome["completed"]:
                executor_used = "bedrock"

            if not outcome["failed"]:
                # Every step completed with Bedrock
                final_output = _build_final_output(pipeline, components_executed, executor_used, "completed")
                yield {
                    "type": "final",
                    "data": final_output,
                    "executor": executor_used
                }
                print(f"✅ Bedrock execution completed: {pipeline_name}")
                return

            bedrock_failed = True
            bedrock_error = outcome["error"]

        except Exception as bedrock_exception:
            print(f"❌ Bedrock execution exception: {str(bedrock_exception)}")
            bedrock_failed = True
            bedrock_error = str(bedrock_exception)

    # Reaching this point means Bedrock was not preferred, not available, or
    # failed. In every one of those cases the pipeline must still run, so
    # CrewAI is used unconditionally.
    print(f"🔄 Executing pipeline with CrewAI: {pipeline_name}")

    if bedrock_failed and bedrock_error:
        yield {
            "type": "info",
            "message": f"Bedrock failed: {bedrock_error}. Switching to CrewAI...",
            "executor": "fallback"
        }
    else:
        yield {
            "type": "info",
            "message": "Using CrewAI execution...",
            "executor": "crewai"
        }

    # Resume from the first step that did not complete. The failed record
    # left by Bedrock is dropped so the retried step replaces it instead of
    # being skipped.
    completed_prefix = 0
    for component in components_executed:
        if component.get("status") != "completed":
            break
        completed_prefix += 1
    del components_executed[completed_prefix:]

    outcome = yield from _run_pipeline_steps(
        steps, completed_prefix + 1, file_path, session_id, "crewai", components_executed
    )
    if outcome["completed"]:
        executor_used = "crewai"

    # Check if we completed all steps
    completed_steps = [c for c in components_executed if c.get("status") == "completed"]

    if len(completed_steps) == len(steps):
        # All steps completed
        final_output = _build_final_output(pipeline, components_executed, executor_used, "completed")

        yield {
            "type": "final",
            "data": final_output,
            "executor": executor_used
        }
        print(f"✅ CrewAI execution completed: {pipeline_name}")
    else:
        # Partial completion or failure
        final_output = _build_final_output(pipeline, components_executed, executor_used, "partial")
        final_output["error"] = f"Pipeline execution incomplete. Completed {len(completed_steps)} of {len(steps)} steps."

        yield {
            "type": "error",
            "error": "Pipeline execution incomplete",
            "data": final_output
        }
        print(f"⚠️ CrewAI execution incomplete for: {pipeline_name}")
|
| 811 |
+
|
| 812 |
+
|
| 813 |
+
# ========================
|
| 814 |
+
# DYNAMIC STEP EXECUTION WITH MASTER_TOOLS
|
| 815 |
+
# ========================
|
| 816 |
+
|
| 817 |
+
def _schema_field_names(args_schema: Any) -> List[str]:
    """Return the argument names declared by a tool's ``args_schema``.

    Handles a missing schema, a dict schema with a ``properties`` mapping,
    and model classes exposing ``model_fields`` or the older ``__fields__``.
    """
    if args_schema is None:
        return []
    if isinstance(args_schema, dict):
        return list(args_schema.get("properties", {}))
    # Prefer `model_fields`; fall back to `__fields__` for older models.
    fields = getattr(args_schema, "model_fields", None)
    if not isinstance(fields, dict):
        fields = getattr(args_schema, "__fields__", None)
    return list(fields) if isinstance(fields, dict) else []


def _text_from_previous_step(
    components_executed: Optional[List[Dict[str, Any]]],
    step_num: int
) -> Optional[Any]:
    """Return the ``text`` entry from the previous step's result, if present."""
    if step_num < 2 or not components_executed:
        return None
    if len(components_executed) < step_num - 1:
        return None
    prev_result = components_executed[step_num - 2].get("result")
    if isinstance(prev_result, dict) and "text" in prev_result:
        return prev_result["text"]
    return None


def _collect_tool_arguments(
    param_names: List[str],
    step_def: Dict[str, Any],
    file_path: str,
    session_id: Optional[str],
    step_num: int,
    components_executed: Optional[List[Dict[str, Any]]]
) -> Dict[str, Any]:
    """Build the keyword arguments for a tool from the step definition.

    Values given in ``step_def`` win; ``file_path`` and ``session_id`` are
    filled from the call context, and ``text`` from the previous step.
    """
    arguments: Dict[str, Any] = {}
    for name in param_names:
        if name in step_def:
            arguments[name] = step_def[name]
        elif name == "file_path" and file_path:
            arguments[name] = file_path
        elif name == "session_id" and session_id:
            arguments[name] = session_id
        elif name == "text" and step_num > 1:
            previous_text = _text_from_previous_step(components_executed, step_num)
            if previous_text is not None:
                arguments[name] = previous_text
    return arguments


def _execute_step_with_master_tool(
    step_def: Dict[str, Any],
    file_path: str,
    step_num: int,
    total_steps: int,
    session_id: Optional[str] = None,
    prefer_bedrock: bool = True,
    components_executed: Optional[List[Dict[str, Any]]] = None
) -> Dict[str, Any]:
    """
    Execute a pipeline step using master_tools.

    Args:
        step_def: Step definition; ``tool_name`` selects the tool and any
            other key matching a tool parameter is passed through.
        file_path: Current input file, used for a ``file_path`` parameter.
        step_num: 1-based position of this step in the pipeline.
        total_steps: Total number of steps (not used by this function).
        session_id: Optional session id, used for a ``session_id`` parameter.
        prefer_bedrock: Only selects the ``executor`` label in the result.
        components_executed: Optional results of earlier steps; when given,
            a missing ``text`` argument is taken from the previous step.

    Returns:
        Dict with ``output``, ``executor``, ``execution_time``,
        ``tool_version`` and ``args_used`` (plus ``warning`` when the
        minimal-argument retry was used).

    Raises:
        ValueError: The tool is not registered or is not invocable.
        RuntimeError: The tool raised during execution.
    """
    import time
    import inspect

    tool_name = step_def.get("tool_name", "unknown")
    start_time = time.time()
    executor = "bedrock" if prefer_bedrock else "crewai"

    # Get tool from registry
    tool = get_tool_executor(tool_name)

    if not tool:
        raise ValueError(f"Tool '{tool_name}' not found in registry. Available tools: {list(TOOL_REGISTRY.keys())}")

    # For StructuredTool (LangChain tools)
    if hasattr(tool, 'args_schema') and hasattr(tool, 'invoke'):
        field_names = _schema_field_names(tool.args_schema)
        args = _collect_tool_arguments(
            field_names, step_def, file_path, session_id, step_num, components_executed
        )

        try:
            # Execute the tool
            output = tool.invoke(args)
        except Exception as e:
            print(f"⚠️ Tool {tool_name} failed with full args, trying minimal: {e}")

            # Without a file_path parameter there is nothing simpler to try.
            # Surface the failure rather than falling through and returning
            # None to the caller.
            if not (file_path and "file_path" in field_names):
                raise RuntimeError(f"Tool '{tool_name}' failed: {e}") from e

            # Try with just file_path
            minimal_args = {"file_path": file_path}
            try:
                output = tool.invoke(minimal_args)
            except Exception as inner_error:
                raise RuntimeError(
                    f"Tool '{tool_name}' failed with minimal args: {inner_error}"
                ) from inner_error

            return {
                "output": output,
                "executor": executor,
                "execution_time": time.time() - start_time,
                "tool_version": "master_tools_1.0",
                "args_used": list(minimal_args.keys()),
                "warning": "Used minimal arguments"
            }

        return {
            "output": output,
            "executor": executor,
            "execution_time": time.time() - start_time,
            "tool_version": "master_tools_1.0",
            "args_used": list(args.keys())
        }

    # For regular Python functions
    if callable(tool):
        try:
            # Build arguments based on the function signature
            sig = inspect.signature(tool)
            call_args = _collect_tool_arguments(
                list(sig.parameters), step_def, file_path, session_id, step_num, components_executed
            )

            # Execute the function
            output = tool(**call_args)
        except Exception as e:
            raise RuntimeError(f"Failed to execute function {tool_name}: {e}") from e

        return {
            "output": output,
            "executor": executor,
            "execution_time": time.time() - start_time,
            "tool_version": "function_1.0",
            "args_used": list(call_args.keys())
        }

    raise ValueError(f"Tool '{tool_name}' is not callable or a valid StructuredTool")
|
| 945 |
+
|
| 946 |
+
|
| 947 |
+
def _update_file_path(current_file_path: str, result: Dict[str, Any]) -> str:
    """
    Update file path based on tool result.

    Some tools might generate new files. When the step's ``output`` is a
    dict, the first non-empty string found under ``file_path``,
    ``output_file``, ``new_file`` or ``generated_file`` (checked in that
    order) becomes the new path.

    Args:
        current_file_path: Path used by the step that just ran.
        result: Step result; only its ``output`` entry is inspected.

    Returns:
        The new file path, or ``current_file_path`` when the output names
        none.
    """
    output = result.get("output") if isinstance(result, dict) else None

    if isinstance(output, dict):
        # Check for file references in output
        for key in ("file_path", "output_file", "new_file", "generated_file"):
            candidate = output.get(key)
            # An empty string is not a usable path; keep the current one
            # instead of handing "" to the next step.
            if isinstance(candidate, str) and candidate.strip():
                return candidate

    return current_file_path
|
| 961 |
+
|
| 962 |
+
|
| 963 |
+
# ========================
|
| 964 |
+
# HELPER FUNCTIONS
|
| 965 |
+
# ========================
|
| 966 |
+
|
| 967 |
+
def _build_final_output(
    pipeline: Dict[str, Any],
    components_executed: List[Dict[str, Any]],
    executor_used: str,
    status: str
) -> Dict[str, Any]:
    """
    Build final output with components_executed array.

    Args:
        pipeline: Pipeline definition; ``pipeline_id``, ``pipeline_name`` and
            ``pipeline_steps`` are read from it.
        components_executed: Per-step result records, in execution order.
        executor_used: Executor label stored in the output and its summary.
        status: Overall status string stored in the output and its summary.

    Returns:
        The final output dict. ``text`` is set to the ``finalize`` step's
        result when that is truthy; otherwise to the text of the most recent
        completed component that provides one (a string result, or a string
        under ``text``, ``summary``, ``content`` or ``translation``). When
        no text is found the key is absent.
    """
    # Find the finalize step result if present
    finalize_component = next(
        (c for c in components_executed if c.get("tool_name") == "finalize"),
        None
    )
    final_result = finalize_component.get("result") if finalize_component is not None else None

    completed = [c for c in components_executed if c.get("status") == "completed"]

    final_output = {
        "pipeline_id": pipeline.get("pipeline_id"),
        "pipeline_name": pipeline.get("pipeline_name"),
        "status": status,
        "components_executed": components_executed,
        "executor": executor_used,
        "summary": f"Pipeline execution {status} with {executor_used}",
        "total_steps": len(pipeline.get("pipeline_steps") or []),
        "completed_steps": len(completed),
        "final_result": final_result
    }

    # Extract text for user-facing output
    if final_result:
        # Use finalize tool's output
        final_output["text"] = final_result
        return final_output

    # Walk backwards and stop at the first component that yields text, so an
    # earlier step can never overwrite the latest step's text.
    for component in reversed(completed):
        result = component.get("result")
        if not result:
            continue
        if isinstance(result, str):
            final_output["text"] = result
            break
        if isinstance(result, dict):
            text = next(
                (
                    result[field]
                    for field in ("text", "summary", "content", "translation")
                    if isinstance(result.get(field), str)
                ),
                None
            )
            if text is not None:
                final_output["text"] = text
                break

    return final_output
|
| 1014 |
|
| 1015 |
|
| 1016 |
# ========================
|
|
|
|
| 1032 |
if event.get("type") == "final":
|
| 1033 |
final_result = event.get("data")
|
| 1034 |
break
|
| 1035 |
+
elif event.get("type") == "error" and event.get("data"):
|
| 1036 |
+
final_result = event.get("data")
|
| 1037 |
+
break
|
| 1038 |
|
| 1039 |
if final_result is None:
|
| 1040 |
+
final_result = {
|
| 1041 |
+
"pipeline_id": pipeline.get("pipeline_id"),
|
| 1042 |
+
"pipeline_name": pipeline.get("pipeline_name"),
|
| 1043 |
+
"status": "failed",
|
| 1044 |
+
"components_executed": [],
|
| 1045 |
+
"error": "Pipeline execution completed without final result"
|
| 1046 |
+
}
|
| 1047 |
|
| 1048 |
return final_result
|
| 1049 |
|