CallMeDaniel Claude Opus 4.6 (1M context) commited on
Commit
923cd86
·
1 Parent(s): a79f922

feat: add memory recall/remember, planning, and collaboration to Flow

Browse files

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

Files changed (2) hide show
  1. agents/agent_flow.py +56 -4
  2. tests/test_agent_flow.py +87 -0
agents/agent_flow.py CHANGED
@@ -275,6 +275,36 @@ class AgentDispatchFlow(Flow[AgentFlowState]):
275
 
276
  # ── Private helpers ──────────────────────────────────────────────
277
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
278
  def _build_llm(self):
279
  from crewai import LLM
280
  return LLM(model=self.state.model_str, temperature=settings.temperature)
@@ -345,12 +375,18 @@ class AgentDispatchFlow(Flow[AgentFlowState]):
345
  llm=llm,
346
  tools=tools,
347
  verbose=False,
348
- allow_delegation=False,
349
  knowledge_sources=knowledge_sources if knowledge_sources else None,
350
  )
351
 
352
- task_description = (
353
- f"{self.state.context}\n\n"
 
 
 
 
 
 
354
  f"As the {agent_def.role}, respond to the user's latest message. "
355
  f"Keep your response concise (2-4 sentences). "
356
  f"Do NOT repeat anything from the conversation history. "
@@ -415,6 +451,8 @@ class AgentDispatchFlow(Flow[AgentFlowState]):
415
  agents=[p[0] for p in pairs],
416
  tasks=[p[1] for p in pairs],
417
  process=Process.sequential,
 
 
418
  verbose=False,
419
  )
420
  crew_result = crew.kickoff()
@@ -424,6 +462,7 @@ class AgentDispatchFlow(Flow[AgentFlowState]):
424
  raw = str(task_outputs[i]) if i < len(task_outputs) else (str(crew_result) if i == 0 else "")
425
  if raw.strip():
426
  responses.append(AgentResponse.from_agent(agent_id, raw.strip()))
 
427
  return responses
428
 
429
  def _run_single_agent_crew(self, agent_id):
@@ -435,7 +474,14 @@ class AgentDispatchFlow(Flow[AgentFlowState]):
435
  from crewai import Crew, Process
436
  llm = self._build_llm()
437
  crew_agent, task = self._build_crew_agent(agent_id, llm)
438
- crew = Crew(agents=[crew_agent], tasks=[task], process=Process.sequential, verbose=False)
 
 
 
 
 
 
 
439
  crew_result = crew.kickoff()
440
  task_outputs = crew_result.tasks_output if hasattr(crew_result, 'tasks_output') else []
441
  if not task_outputs:
@@ -462,6 +508,9 @@ class AgentDispatchFlow(Flow[AgentFlowState]):
462
  else:
463
  self.state.cad_response = AgentResponse.from_agent("cad", raw_output)
464
 
 
 
 
465
  def _run_cam_step(self):
466
  if "cam" not in self.state.active_agent_ids:
467
  return
@@ -483,3 +532,6 @@ class AgentDispatchFlow(Flow[AgentFlowState]):
483
  )
484
  except (json.JSONDecodeError, ValueError, KeyError):
485
  self.state.cam_response = AgentResponse.from_agent("cam", raw_output)
 
 
 
 
275
 
276
  # ── Private helpers ──────────────────────────────────────────────
277
 
278
+ _memory = None # Set by CrewOrchestrator before kickoff
279
+
280
+ def _recall_for_agent(self, agent_id: str) -> str:
281
+ """Recall relevant memories for this agent, formatted as context."""
282
+ if self._memory is None:
283
+ return ""
284
+ try:
285
+ from config.settings import settings
286
+ matches = self._memory.recall(
287
+ self.state.message,
288
+ scope=f"/agent/{agent_id}",
289
+ limit=settings.memory.recall_limit,
290
+ depth=settings.memory.recall_depth,
291
+ )
292
+ except Exception:
293
+ return ""
294
+ if not matches:
295
+ return ""
296
+ lines = [f"- {m.record.content}" for m in matches]
297
+ return "## Relevant context from prior turns\n" + "\n".join(lines)
298
+
299
+ def _remember_response(self, agent_id: str, content: str):
300
+ """Store an agent's response in its scoped memory."""
301
+ if self._memory is None:
302
+ return
303
+ try:
304
+ self._memory.remember(content, scope=f"/agent/{agent_id}")
305
+ except Exception:
306
+ pass
307
+
308
  def _build_llm(self):
309
  from crewai import LLM
310
  return LLM(model=self.state.model_str, temperature=settings.temperature)
 
375
  llm=llm,
376
  tools=tools,
377
  verbose=False,
378
+ allow_delegation=settings.crew.collaboration and agent_id in ADVISOR_IDS,
379
  knowledge_sources=knowledge_sources if knowledge_sources else None,
380
  )
381
 
382
+ memories = self._recall_for_agent(agent_id)
383
+
384
+ context_parts = [self.state.context]
385
+ if memories:
386
+ context_parts.append(memories)
387
+
388
+ task_description = "\n\n".join(context_parts) + "\n\n"
389
+ task_description += (
390
  f"As the {agent_def.role}, respond to the user's latest message. "
391
  f"Keep your response concise (2-4 sentences). "
392
  f"Do NOT repeat anything from the conversation history. "
 
451
  agents=[p[0] for p in pairs],
452
  tasks=[p[1] for p in pairs],
453
  process=Process.sequential,
454
+ planning=settings.crew.planning,
455
+ planning_llm=self._build_llm(),
456
  verbose=False,
457
  )
458
  crew_result = crew.kickoff()
 
462
  raw = str(task_outputs[i]) if i < len(task_outputs) else (str(crew_result) if i == 0 else "")
463
  if raw.strip():
464
  responses.append(AgentResponse.from_agent(agent_id, raw.strip()))
465
+ self._remember_response(agent_id, raw.strip())
466
  return responses
467
 
468
  def _run_single_agent_crew(self, agent_id):
 
474
  from crewai import Crew, Process
475
  llm = self._build_llm()
476
  crew_agent, task = self._build_crew_agent(agent_id, llm)
477
+ crew = Crew(
478
+ agents=[crew_agent],
479
+ tasks=[task],
480
+ process=Process.sequential,
481
+ planning=settings.crew.planning,
482
+ planning_llm=self._build_llm(),
483
+ verbose=False,
484
+ )
485
  crew_result = crew.kickoff()
486
  task_outputs = crew_result.tasks_output if hasattr(crew_result, 'tasks_output') else []
487
  if not task_outputs:
 
508
  else:
509
  self.state.cad_response = AgentResponse.from_agent("cad", raw_output)
510
 
511
+ if self.state.cad_response is not None:
512
+ self._remember_response("cad", raw_output)
513
+
514
  def _run_cam_step(self):
515
  if "cam" not in self.state.active_agent_ids:
516
  return
 
532
  )
533
  except (json.JSONDecodeError, ValueError, KeyError):
534
  self.state.cam_response = AgentResponse.from_agent("cam", raw_output)
535
+
536
+ if self.state.cam_response is not None:
537
+ self._remember_response("cam", raw_output)
tests/test_agent_flow.py CHANGED
@@ -421,3 +421,90 @@ class TestAgentDispatchFlow:
421
  )
422
  assert len(results) >= 2 # advisor + cad
423
  assert flow.state.cad_code is not None
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
421
  )
422
  assert len(results) >= 2 # advisor + cad
423
  assert flow.state.cad_code is not None
424
+
425
+
426
+ class TestMemoryHelpers:
427
+ def test_recall_returns_empty_when_no_memory(self):
428
+ flow = AgentDispatchFlow(initial_state=AgentFlowState(
429
+ message="bracket design",
430
+ model_str="gemini/gemini-2.5-flash",
431
+ ))
432
+ flow._memory = None
433
+ result = flow._recall_for_agent("design")
434
+ assert result == ""
435
+
436
+ def test_recall_formats_matches(self):
437
+ mock_memory = MagicMock()
438
+ mock_match = MagicMock()
439
+ mock_match.record.content = "L-bracket with fillets"
440
+ mock_memory.recall.return_value = [mock_match]
441
+
442
+ flow = AgentDispatchFlow(initial_state=AgentFlowState(
443
+ message="bracket",
444
+ model_str="gemini/gemini-2.5-flash",
445
+ ))
446
+ flow._memory = mock_memory
447
+ result = flow._recall_for_agent("design")
448
+ assert "## Relevant context from prior turns" in result
449
+ assert "L-bracket with fillets" in result
450
+ mock_memory.recall.assert_called_once()
451
+
452
+ def test_recall_returns_empty_when_no_matches(self):
453
+ mock_memory = MagicMock()
454
+ mock_memory.recall.return_value = []
455
+
456
+ flow = AgentDispatchFlow(initial_state=AgentFlowState(
457
+ message="bracket",
458
+ model_str="gemini/gemini-2.5-flash",
459
+ ))
460
+ flow._memory = mock_memory
461
+ result = flow._recall_for_agent("design")
462
+ assert result == ""
463
+
464
+ def test_remember_stores_with_scope(self):
465
+ mock_memory = MagicMock()
466
+
467
+ flow = AgentDispatchFlow(initial_state=AgentFlowState(
468
+ message="test",
469
+ model_str="gemini/gemini-2.5-flash",
470
+ ))
471
+ flow._memory = mock_memory
472
+ flow._remember_response("engineering", "Use 3mm walls in aluminum.")
473
+ mock_memory.remember.assert_called_once_with(
474
+ "Use 3mm walls in aluminum.",
475
+ scope="/agent/engineering",
476
+ )
477
+
478
+ def test_remember_noop_when_no_memory(self):
479
+ flow = AgentDispatchFlow(initial_state=AgentFlowState(
480
+ message="test",
481
+ model_str="gemini/gemini-2.5-flash",
482
+ ))
483
+ flow._memory = None
484
+ flow._remember_response("design", "test") # Should not raise
485
+
486
+
487
+ class TestCollaborationFlag:
488
+ def test_advisors_get_delegation(self):
489
+ flow = AgentDispatchFlow(initial_state=AgentFlowState(
490
+ message="test",
491
+ context="",
492
+ model_str="gemini/gemini-2.5-flash",
493
+ ))
494
+ flow._memory = None
495
+ from crewai import LLM
496
+ llm = LLM(model="gemini/gemini-2.5-flash", temperature=0.2)
497
+ agent, task = flow._build_crew_agent("design", llm)
498
+ assert agent.allow_delegation is True
499
+
500
+ def test_generators_no_delegation(self):
501
+ flow = AgentDispatchFlow(initial_state=AgentFlowState(
502
+ message="test",
503
+ context="",
504
+ model_str="gemini/gemini-2.5-flash",
505
+ ))
506
+ flow._memory = None
507
+ from crewai import LLM
508
+ llm = LLM(model="gemini/gemini-2.5-flash", temperature=0.2)
509
+ agent, task = flow._build_crew_agent("cad", llm)
510
+ assert agent.allow_delegation is False