# AI_Agent/coordinator.py from AI_Agent.chains.retrieval_chain import RetrievalChain from AI_Agent.chains.reasoning_chain import ReasoningChain from AI_Agent.chains.synthesis_chain import SynthesisChain from AI_Agent.chains.task_decomposer_chain import TaskDecomposerChain from AI_Agent.chains.task_assigner_chain import TaskAssignerChain from AI_Agent.llm_adapters.hf_adapter import HuggingFaceAdapter from AI_Agent.vector_store.vector_store import SimpleVectorStore import asyncio class Coordinator: def __init__(self): # Initialize vector store with sample documents self.vector_store = SimpleVectorStore() self.vector_store.add_documents([ "Doc 1: Notes on building retrieval chains.", "Doc 2: Sample LLM reasoning methods.", "Doc 3: Multi-chain orchestration tips." ]) # Use the CPU-friendly Google Gemma 3n model self.llm = HuggingFaceAdapter(model_name="EleutherAI/gpt-neo-125M") # Initialize all chains self.task_decomposer_chain = TaskDecomposerChain(self.llm) self.task_assigner_chain = TaskAssignerChain(self.llm) self.retrieval_chain = RetrievalChain(self.vector_store) self.reasoning_chain = ReasoningChain(self.llm) self.synthesis_chain = SynthesisChain(self.llm) async def run_task(self, brief: str): # Step 1: Task decomposition decomposition_result = await self.task_decomposer_chain.run(brief) tasks_text = decomposition_result["tasks_text"] # Step 2: Task assignment assignment_result = await self.task_assigner_chain.run(tasks_text) assigned_tasks_text = assignment_result["assigned_tasks_text"] # Step 3: Retrieval retrieval_result = await self.retrieval_chain.run(brief) contexts = retrieval_result["contexts"] # Step 4: Reasoning reasoning_result = await self.reasoning_chain.run(brief, contexts) reasoning_text = reasoning_result["reasoning"] # Step 5: Synthesis synthesis_result = await self.synthesis_chain.run(brief, reasoning_text, contexts) return { "decomposition": decomposition_result, "assignment": assignment_result, "retrieval": retrieval_result, "reasoning": reasoning_result, "synthesis": synthesis_result } if __name__ == "__main__": coord = Coordinator() brief = "Build a task management app with user authentication and task sharing" result = asyncio.run(coord.run_task(brief)) print("Decomposed tasks:\n", result["decomposition"]["tasks_text"]) print("Assigned tasks:\n", result["assignment"]["assigned_tasks_text"]) print("Final synthesis:\n", result["synthesis"]["answer"])