Spaces:
Running
Running
File size: 6,001 Bytes
688c130 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 | """Persistent cross-episode memory backed by cognee knowledge graph.
Records episode summaries, gossip ratings, and opponent statistics.
Uses in-memory stats when cognee is not installed.
"""
from __future__ import annotations
import asyncio
import threading
from typing import Any
from constant_definitions.var.meta.reputation_constants import (
COGNEE_DATASET_NAME,
COGNEE_SEARCH_TYPE,
DEFAULT_REPUTATION_SCORE_NUMERATOR,
DEFAULT_REPUTATION_SCORE_DENOMINATOR,
REPUTATION_DECAY_NUMERATOR,
REPUTATION_DECAY_DENOMINATOR,
META_KEY_COOPERATION_RATE,
META_KEY_INTERACTION_COUNT,
META_KEY_GOSSIP_HISTORY,
)
_ZERO = int()
_ONE = int(bool(True))
_DEFAULT_SCORE = (
DEFAULT_REPUTATION_SCORE_NUMERATOR / DEFAULT_REPUTATION_SCORE_DENOMINATOR
)
_DECAY = REPUTATION_DECAY_NUMERATOR / REPUTATION_DECAY_DENOMINATOR
try:
import cognee as _cognee # type: ignore[import-untyped]
_HAS_COGNEE = True
except ImportError:
_cognee = None
_HAS_COGNEE = False
class AsyncBridge:
"""Runs async coroutines from sync code via a dedicated thread."""
def __init__(self) -> None:
self._loop = asyncio.new_event_loop()
self._thread = threading.Thread(
target=self._loop.run_forever, daemon=True,
)
self._thread.start()
def run(self, coro: Any) -> Any:
"""Submit *coro* to the background loop and block for the result."""
future = asyncio.run_coroutine_threadsafe(coro, self._loop)
return future.result()
def _default_reputation() -> dict[str, Any]:
"""Return a neutral default reputation dict."""
return {
"score": _DEFAULT_SCORE,
META_KEY_COOPERATION_RATE: _DEFAULT_SCORE,
META_KEY_INTERACTION_COUNT: _ZERO,
META_KEY_GOSSIP_HISTORY: [],
}
def _format_episode_text(
agent_id: str,
opponent_id: str,
game: str,
history: list[Any],
cooperation_rate: float,
scores: tuple[float, float],
) -> str:
"""Format an episode summary for cognee ingestion."""
rounds = len(history)
p_score, o_score = scores
actions = "; ".join(
f"R{r.round_number}: {r.player_action} vs {r.opponent_action}"
for r in history
)
return (
f"Game Interaction Report\n"
f"Agent: {agent_id} | Opponent: {opponent_id} | Game: {game}\n"
f"Rounds: {rounds} | Agent Score: {p_score} | "
f"Opponent Score: {o_score}\n"
f"Cooperation Rate: {cooperation_rate}\n"
f"Actions: {actions}\n"
)
def _parse_reputation(
results: Any, stats: dict[str, Any],
) -> dict[str, Any]:
"""Merge cognee search results with in-memory stats."""
rep = dict(stats) if stats else _default_reputation()
if results:
rep["cognee_context"] = str(results)
return rep
class CogneeMemoryStore:
"""Persistent memory backed by cognee knowledge graph."""
def __init__(self) -> None:
self._bridge = AsyncBridge() if _HAS_COGNEE else None
self._stats: dict[str, dict[str, Any]] = {}
def record_episode(
self,
agent_id: str,
opponent_id: str,
game: str,
history: list[Any],
cooperation_rate: float,
scores: tuple[float, float],
) -> None:
"""Format episode as text and add to cognee, then cognify."""
text = _format_episode_text(
agent_id, opponent_id, game, history,
cooperation_rate, scores,
)
if self._bridge is not None and _HAS_COGNEE:
try:
self._bridge.run(
_cognee.add(text, dataset_name=COGNEE_DATASET_NAME),
)
self._bridge.run(_cognee.cognify())
except Exception:
pass
self._update_stats(opponent_id, cooperation_rate, scores)
def query_reputation(self, opponent_id: str) -> dict[str, Any]:
"""Query cognee for opponent reputation. Uses stats if unavailable."""
stats = self._stats.get(opponent_id, _default_reputation())
if self._bridge is None or not _HAS_COGNEE:
return stats
try:
results = self._bridge.run(
_cognee.search(
f"reputation and behavior of {opponent_id}",
search_type=COGNEE_SEARCH_TYPE,
),
)
return _parse_reputation(results, stats)
except Exception:
return stats
def record_gossip(
self, rater_id: str, target_id: str, rating: str,
) -> None:
"""Record a gossip rating in cognee."""
text = f"{rater_id} rated {target_id} as {rating}."
if self._bridge is not None and _HAS_COGNEE:
try:
self._bridge.run(
_cognee.add(text, dataset_name=COGNEE_DATASET_NAME),
)
except Exception:
pass
target_stats = self._stats.setdefault(
target_id, _default_reputation(),
)
gossip_list = target_stats.setdefault(META_KEY_GOSSIP_HISTORY, [])
gossip_list.append({"rater": rater_id, "rating": rating})
def get_stats(self, opponent_id: str) -> dict[str, Any]:
"""Fast in-memory stats (no LLM call)."""
return self._stats.get(opponent_id, _default_reputation())
def _update_stats(
self,
opponent_id: str,
coop_rate: float,
scores: tuple[float, float],
) -> None:
"""Update running statistics for an opponent."""
current = self._stats.get(opponent_id, _default_reputation())
count = current.get(META_KEY_INTERACTION_COUNT, _ZERO) + _ONE
old_coop = current.get(META_KEY_COOPERATION_RATE, _DEFAULT_SCORE)
blended = old_coop * _DECAY + coop_rate * (_ONE - _DECAY)
current["score"] = blended
current[META_KEY_COOPERATION_RATE] = blended
current[META_KEY_INTERACTION_COUNT] = count
self._stats[opponent_id] = current
|