KantBench / common /meta /memory_store.py
jtowarek's picture
Upload folder using huggingface_hub
688c130 verified
"""Persistent cross-episode memory backed by cognee knowledge graph.
Records episode summaries, gossip ratings, and opponent statistics.
Uses in-memory stats when cognee is not installed.
"""
from __future__ import annotations
import asyncio
import threading
from typing import Any
from constant_definitions.var.meta.reputation_constants import (
COGNEE_DATASET_NAME,
COGNEE_SEARCH_TYPE,
DEFAULT_REPUTATION_SCORE_NUMERATOR,
DEFAULT_REPUTATION_SCORE_DENOMINATOR,
REPUTATION_DECAY_NUMERATOR,
REPUTATION_DECAY_DENOMINATOR,
META_KEY_COOPERATION_RATE,
META_KEY_INTERACTION_COUNT,
META_KEY_GOSSIP_HISTORY,
)
_ZERO = int()
_ONE = int(bool(True))
_DEFAULT_SCORE = (
DEFAULT_REPUTATION_SCORE_NUMERATOR / DEFAULT_REPUTATION_SCORE_DENOMINATOR
)
_DECAY = REPUTATION_DECAY_NUMERATOR / REPUTATION_DECAY_DENOMINATOR
try:
import cognee as _cognee # type: ignore[import-untyped]
_HAS_COGNEE = True
except ImportError:
_cognee = None
_HAS_COGNEE = False
class AsyncBridge:
"""Runs async coroutines from sync code via a dedicated thread."""
def __init__(self) -> None:
self._loop = asyncio.new_event_loop()
self._thread = threading.Thread(
target=self._loop.run_forever, daemon=True,
)
self._thread.start()
def run(self, coro: Any) -> Any:
"""Submit *coro* to the background loop and block for the result."""
future = asyncio.run_coroutine_threadsafe(coro, self._loop)
return future.result()
def _default_reputation() -> dict[str, Any]:
"""Return a neutral default reputation dict."""
return {
"score": _DEFAULT_SCORE,
META_KEY_COOPERATION_RATE: _DEFAULT_SCORE,
META_KEY_INTERACTION_COUNT: _ZERO,
META_KEY_GOSSIP_HISTORY: [],
}
def _format_episode_text(
agent_id: str,
opponent_id: str,
game: str,
history: list[Any],
cooperation_rate: float,
scores: tuple[float, float],
) -> str:
"""Format an episode summary for cognee ingestion."""
rounds = len(history)
p_score, o_score = scores
actions = "; ".join(
f"R{r.round_number}: {r.player_action} vs {r.opponent_action}"
for r in history
)
return (
f"Game Interaction Report\n"
f"Agent: {agent_id} | Opponent: {opponent_id} | Game: {game}\n"
f"Rounds: {rounds} | Agent Score: {p_score} | "
f"Opponent Score: {o_score}\n"
f"Cooperation Rate: {cooperation_rate}\n"
f"Actions: {actions}\n"
)
def _parse_reputation(
results: Any, stats: dict[str, Any],
) -> dict[str, Any]:
"""Merge cognee search results with in-memory stats."""
rep = dict(stats) if stats else _default_reputation()
if results:
rep["cognee_context"] = str(results)
return rep
class CogneeMemoryStore:
"""Persistent memory backed by cognee knowledge graph."""
def __init__(self) -> None:
self._bridge = AsyncBridge() if _HAS_COGNEE else None
self._stats: dict[str, dict[str, Any]] = {}
def record_episode(
self,
agent_id: str,
opponent_id: str,
game: str,
history: list[Any],
cooperation_rate: float,
scores: tuple[float, float],
) -> None:
"""Format episode as text and add to cognee, then cognify."""
text = _format_episode_text(
agent_id, opponent_id, game, history,
cooperation_rate, scores,
)
if self._bridge is not None and _HAS_COGNEE:
try:
self._bridge.run(
_cognee.add(text, dataset_name=COGNEE_DATASET_NAME),
)
self._bridge.run(_cognee.cognify())
except Exception:
pass
self._update_stats(opponent_id, cooperation_rate, scores)
def query_reputation(self, opponent_id: str) -> dict[str, Any]:
"""Query cognee for opponent reputation. Uses stats if unavailable."""
stats = self._stats.get(opponent_id, _default_reputation())
if self._bridge is None or not _HAS_COGNEE:
return stats
try:
results = self._bridge.run(
_cognee.search(
f"reputation and behavior of {opponent_id}",
search_type=COGNEE_SEARCH_TYPE,
),
)
return _parse_reputation(results, stats)
except Exception:
return stats
def record_gossip(
self, rater_id: str, target_id: str, rating: str,
) -> None:
"""Record a gossip rating in cognee."""
text = f"{rater_id} rated {target_id} as {rating}."
if self._bridge is not None and _HAS_COGNEE:
try:
self._bridge.run(
_cognee.add(text, dataset_name=COGNEE_DATASET_NAME),
)
except Exception:
pass
target_stats = self._stats.setdefault(
target_id, _default_reputation(),
)
gossip_list = target_stats.setdefault(META_KEY_GOSSIP_HISTORY, [])
gossip_list.append({"rater": rater_id, "rating": rating})
def get_stats(self, opponent_id: str) -> dict[str, Any]:
"""Fast in-memory stats (no LLM call)."""
return self._stats.get(opponent_id, _default_reputation())
def _update_stats(
self,
opponent_id: str,
coop_rate: float,
scores: tuple[float, float],
) -> None:
"""Update running statistics for an opponent."""
current = self._stats.get(opponent_id, _default_reputation())
count = current.get(META_KEY_INTERACTION_COUNT, _ZERO) + _ONE
old_coop = current.get(META_KEY_COOPERATION_RATE, _DEFAULT_SCORE)
blended = old_coop * _DECAY + coop_rate * (_ONE - _DECAY)
current["score"] = blended
current[META_KEY_COOPERATION_RATE] = blended
current[META_KEY_INTERACTION_COUNT] = count
self._stats[opponent_id] = current