Spaces:
Sleeping
Sleeping
| # AI_Agent/coordinator.py | |
| from AI_Agent.chains.retrieval_chain import RetrievalChain | |
| from AI_Agent.chains.reasoning_chain import ReasoningChain | |
| from AI_Agent.chains.synthesis_chain import SynthesisChain | |
| from AI_Agent.chains.task_decomposer_chain import TaskDecomposerChain | |
| from AI_Agent.chains.task_assigner_chain import TaskAssignerChain | |
| from AI_Agent.llm_adapters.hf_adapter import HuggingFaceAdapter | |
| from AI_Agent.vector_store.vector_store import SimpleVectorStore | |
| import asyncio | |
class Coordinator:
    """Wire the agent's chains together and run them as one pipeline.

    The coordinator owns a vector store, a single LLM adapter shared by every
    LLM-backed chain, and the five chains that ``run_task`` executes in order.
    """

    # Model loaded when the caller does not pick one (a small 125M-parameter
    # GPT-Neo checkpoint).
    DEFAULT_MODEL_NAME = "EleutherAI/gpt-neo-125M"

    # Sample documents used to seed the vector store by default.
    DEFAULT_DOCUMENTS = (
        "Doc 1: Notes on building retrieval chains.",
        "Doc 2: Sample LLM reasoning methods.",
        "Doc 3: Multi-chain orchestration tips.",
    )

    def __init__(self, model_name: str = DEFAULT_MODEL_NAME, documents=None):
        """Build the vector store, the LLM adapter and all chains.

        Args:
            model_name: Model identifier passed to ``HuggingFaceAdapter``.
                Defaults to ``DEFAULT_MODEL_NAME``.
            documents: Iterable of strings to load into the vector store.
                ``None`` (the default) loads ``DEFAULT_DOCUMENTS``; an empty
                iterable leaves the store empty.
        """
        if documents is None:
            documents = self.DEFAULT_DOCUMENTS
        # Always hand the store a fresh list, never the caller's own object.
        seed_documents = list(documents)

        self.vector_store = SimpleVectorStore()
        if seed_documents:
            self.vector_store.add_documents(seed_documents)

        self.llm = HuggingFaceAdapter(model_name=model_name)

        # Every LLM-backed chain shares the same adapter instance; retrieval
        # only needs the vector store.
        self.task_decomposer_chain = TaskDecomposerChain(self.llm)
        self.task_assigner_chain = TaskAssignerChain(self.llm)
        self.retrieval_chain = RetrievalChain(self.vector_store)
        self.reasoning_chain = ReasoningChain(self.llm)
        self.synthesis_chain = SynthesisChain(self.llm)

    async def run_task(self, brief: str):
        """Run the full pipeline for ``brief`` and return every stage's result.

        The chains are awaited strictly one after another: decomposition,
        assignment, retrieval, reasoning, synthesis.

        Args:
            brief: Free-text description of the work to plan.

        Returns:
            A dict with the raw result of each chain under the keys
            ``"decomposition"``, ``"assignment"``, ``"retrieval"``,
            ``"reasoning"`` and ``"synthesis"``.

        Raises:
            KeyError: If a chain result lacks the key this method reads from
                it (``"tasks_text"``, ``"assigned_tasks_text"``,
                ``"contexts"`` or ``"reasoning"``).
        """
        # Step 1: task decomposition.
        decomposition_result = await self.task_decomposer_chain.run(brief)
        tasks_text = decomposition_result["tasks_text"]

        # Step 2: task assignment.
        assignment_result = await self.task_assigner_chain.run(tasks_text)
        # No later step consumes this value; the lookup is kept so that a
        # malformed assigner result fails here rather than in the caller.
        _ = assignment_result["assigned_tasks_text"]

        # Step 3: retrieval, driven by the original brief (not the task list).
        retrieval_result = await self.retrieval_chain.run(brief)
        contexts = retrieval_result["contexts"]

        # Step 4: reasoning over the brief and the retrieved contexts.
        reasoning_result = await self.reasoning_chain.run(brief, contexts)
        reasoning_text = reasoning_result["reasoning"]

        # Step 5: synthesis.
        synthesis_result = await self.synthesis_chain.run(
            brief, reasoning_text, contexts
        )

        return {
            "decomposition": decomposition_result,
            "assignment": assignment_result,
            "retrieval": retrieval_result,
            "reasoning": reasoning_result,
            "synthesis": synthesis_result,
        }
def _main():
    """Run the coordinator once on a sample brief and print selected outputs."""
    coordinator = Coordinator()
    sample_brief = (
        "Build a task management app with user authentication and task sharing"
    )
    outcome = asyncio.run(coordinator.run_task(sample_brief))

    # (heading, stage key in the result dict, field to print from that stage)
    report = (
        ("Decomposed tasks:\n", "decomposition", "tasks_text"),
        ("Assigned tasks:\n", "assignment", "assigned_tasks_text"),
        ("Final synthesis:\n", "synthesis", "answer"),
    )
    for heading, stage, field in report:
        print(heading, outcome[stage][field])


if __name__ == "__main__":
    _main()