RobotPai / src /agents /crew_workflow.py
atr0p05's picture
Upload 291 files
8a682b5 verified
from typing import Dict, Any, List
import logging
# Placeholder imports – replace with real CrewAI components when available
try:
from crewai import Crew, Agent as CrewAgent, Task as CrewTask # type: ignore
except ImportError:
Crew = None # type: ignore
CrewAgent = object # type: ignore
CrewTask = object # type: ignore
logger = logging.getLogger(__name__)
def run_crew_workflow(query: str, tools: Dict[str, Any]) -> Dict[str, Any]:
"""Lightweight wrapper that executes a 3-agent CrewAI workflow.
1. ResearcherAgent – web & semantic search
2. ExecutionAgent – python / file / media tools
3. SynthesisAgent – synthesise final answer only
Returns a dict with keys: output, intermediate_steps
"""
if Crew is None:
logger.warning("CrewAI not installed – falling back to empty result")
return {"output": "CrewAI not available", "intermediate_steps": []}
# --- Define Crewmates ---
researcher = CrewAgent(
role="ResearcherAgent",
goal="Gather facts and sources needed to answer user questions.",
tools=[tools.get("web_researcher"), tools.get("semantic_search_tool")],
backstory="A diligent internet researcher with quick search skills.",
allow_delegation=False,
)
executor = CrewAgent(
role="ExecutionAgent",
goal="Run code and parse files or media as requested by the researcher.",
tools=[tools.get("python_interpreter"), tools.get("file_reader"), tools.get("audio_transcriber")],
backstory="A Python REPL in agent form, validating all inputs before execution.",
allow_delegation=False,
)
synthesiser = CrewAgent(
role="SynthesisAgent",
goal="Craft the direct, concise final answer using gathered evidence only.",
tools=[],
backstory="A precise writer that outputs only what the user asked for.",
allow_delegation=False,
)
# --- Define tasks ---
t1 = CrewTask(
description=f"Research the answer to: {query}",
agent=researcher,
expected_output="A fact sheet with citations",
)
t2 = CrewTask(
description="If any code execution or parsing is needed, perform it and store results.",
agent=executor,
expected_output="Parsed / computed data ready for final answer",
context=[t1],
)
t3 = CrewTask(
description="Write the final answer in the required minimal format (number, name, etc.).",
agent=synthesiser,
expected_output="Final answer only",
context=[t1, t2],
)
crew = Crew(
agents=[researcher, executor, synthesiser],
tasks=[t1, t2, t3],
verbose=False,
)
try:
result = crew.kickoff()
except Exception as e:
logger.error(f"Crew execution failed: {e}")
return {"output": f"Crew error: {str(e)}", "intermediate_steps": []}
return {
"output": result.get("task_results", {}).get(t3.id, ""),
"intermediate_steps": result.get("task_results", {})
}