ai-agent1 / AI_Agent /coordinator.py
curiouscurrent's picture
Update AI_Agent/coordinator.py
33f0ebb verified
# AI_Agent/coordinator.py
from AI_Agent.chains.retrieval_chain import RetrievalChain
from AI_Agent.chains.reasoning_chain import ReasoningChain
from AI_Agent.chains.synthesis_chain import SynthesisChain
from AI_Agent.chains.task_decomposer_chain import TaskDecomposerChain
from AI_Agent.chains.task_assigner_chain import TaskAssignerChain
from AI_Agent.llm_adapters.hf_adapter import HuggingFaceAdapter
from AI_Agent.vector_store.vector_store import SimpleVectorStore
import asyncio
class Coordinator:
def __init__(self):
# Initialize vector store with sample documents
self.vector_store = SimpleVectorStore()
self.vector_store.add_documents([
"Doc 1: Notes on building retrieval chains.",
"Doc 2: Sample LLM reasoning methods.",
"Doc 3: Multi-chain orchestration tips."
])
# Use the CPU-friendly Google Gemma 3n model
self.llm = HuggingFaceAdapter(model_name="EleutherAI/gpt-neo-125M")
# Initialize all chains
self.task_decomposer_chain = TaskDecomposerChain(self.llm)
self.task_assigner_chain = TaskAssignerChain(self.llm)
self.retrieval_chain = RetrievalChain(self.vector_store)
self.reasoning_chain = ReasoningChain(self.llm)
self.synthesis_chain = SynthesisChain(self.llm)
async def run_task(self, brief: str):
# Step 1: Task decomposition
decomposition_result = await self.task_decomposer_chain.run(brief)
tasks_text = decomposition_result["tasks_text"]
# Step 2: Task assignment
assignment_result = await self.task_assigner_chain.run(tasks_text)
assigned_tasks_text = assignment_result["assigned_tasks_text"]
# Step 3: Retrieval
retrieval_result = await self.retrieval_chain.run(brief)
contexts = retrieval_result["contexts"]
# Step 4: Reasoning
reasoning_result = await self.reasoning_chain.run(brief, contexts)
reasoning_text = reasoning_result["reasoning"]
# Step 5: Synthesis
synthesis_result = await self.synthesis_chain.run(brief, reasoning_text, contexts)
return {
"decomposition": decomposition_result,
"assignment": assignment_result,
"retrieval": retrieval_result,
"reasoning": reasoning_result,
"synthesis": synthesis_result
}
if __name__ == "__main__":
coord = Coordinator()
brief = "Build a task management app with user authentication and task sharing"
result = asyncio.run(coord.run_task(brief))
print("Decomposed tasks:\n", result["decomposition"]["tasks_text"])
print("Assigned tasks:\n", result["assignment"]["assigned_tasks_text"])
print("Final synthesis:\n", result["synthesis"]["answer"])