Spaces:
Running
Running
| """Persistent cross-episode memory backed by cognee knowledge graph. | |
| Records episode summaries, gossip ratings, and opponent statistics. | |
| Uses in-memory stats when cognee is not installed. | |
| """ | |
| from __future__ import annotations | |
| import asyncio | |
| import threading | |
| from typing import Any | |
| from constant_definitions.var.meta.reputation_constants import ( | |
| COGNEE_DATASET_NAME, | |
| COGNEE_SEARCH_TYPE, | |
| DEFAULT_REPUTATION_SCORE_NUMERATOR, | |
| DEFAULT_REPUTATION_SCORE_DENOMINATOR, | |
| REPUTATION_DECAY_NUMERATOR, | |
| REPUTATION_DECAY_DENOMINATOR, | |
| META_KEY_COOPERATION_RATE, | |
| META_KEY_INTERACTION_COUNT, | |
| META_KEY_GOSSIP_HISTORY, | |
| ) | |
| _ZERO = int() | |
| _ONE = int(bool(True)) | |
| _DEFAULT_SCORE = ( | |
| DEFAULT_REPUTATION_SCORE_NUMERATOR / DEFAULT_REPUTATION_SCORE_DENOMINATOR | |
| ) | |
| _DECAY = REPUTATION_DECAY_NUMERATOR / REPUTATION_DECAY_DENOMINATOR | |
| try: | |
| import cognee as _cognee # type: ignore[import-untyped] | |
| _HAS_COGNEE = True | |
| except ImportError: | |
| _cognee = None | |
| _HAS_COGNEE = False | |
| class AsyncBridge: | |
| """Runs async coroutines from sync code via a dedicated thread.""" | |
| def __init__(self) -> None: | |
| self._loop = asyncio.new_event_loop() | |
| self._thread = threading.Thread( | |
| target=self._loop.run_forever, daemon=True, | |
| ) | |
| self._thread.start() | |
| def run(self, coro: Any) -> Any: | |
| """Submit *coro* to the background loop and block for the result.""" | |
| future = asyncio.run_coroutine_threadsafe(coro, self._loop) | |
| return future.result() | |
| def _default_reputation() -> dict[str, Any]: | |
| """Return a neutral default reputation dict.""" | |
| return { | |
| "score": _DEFAULT_SCORE, | |
| META_KEY_COOPERATION_RATE: _DEFAULT_SCORE, | |
| META_KEY_INTERACTION_COUNT: _ZERO, | |
| META_KEY_GOSSIP_HISTORY: [], | |
| } | |
| def _format_episode_text( | |
| agent_id: str, | |
| opponent_id: str, | |
| game: str, | |
| history: list[Any], | |
| cooperation_rate: float, | |
| scores: tuple[float, float], | |
| ) -> str: | |
| """Format an episode summary for cognee ingestion.""" | |
| rounds = len(history) | |
| p_score, o_score = scores | |
| actions = "; ".join( | |
| f"R{r.round_number}: {r.player_action} vs {r.opponent_action}" | |
| for r in history | |
| ) | |
| return ( | |
| f"Game Interaction Report\n" | |
| f"Agent: {agent_id} | Opponent: {opponent_id} | Game: {game}\n" | |
| f"Rounds: {rounds} | Agent Score: {p_score} | " | |
| f"Opponent Score: {o_score}\n" | |
| f"Cooperation Rate: {cooperation_rate}\n" | |
| f"Actions: {actions}\n" | |
| ) | |
| def _parse_reputation( | |
| results: Any, stats: dict[str, Any], | |
| ) -> dict[str, Any]: | |
| """Merge cognee search results with in-memory stats.""" | |
| rep = dict(stats) if stats else _default_reputation() | |
| if results: | |
| rep["cognee_context"] = str(results) | |
| return rep | |
| class CogneeMemoryStore: | |
| """Persistent memory backed by cognee knowledge graph.""" | |
| def __init__(self) -> None: | |
| self._bridge = AsyncBridge() if _HAS_COGNEE else None | |
| self._stats: dict[str, dict[str, Any]] = {} | |
| def record_episode( | |
| self, | |
| agent_id: str, | |
| opponent_id: str, | |
| game: str, | |
| history: list[Any], | |
| cooperation_rate: float, | |
| scores: tuple[float, float], | |
| ) -> None: | |
| """Format episode as text and add to cognee, then cognify.""" | |
| text = _format_episode_text( | |
| agent_id, opponent_id, game, history, | |
| cooperation_rate, scores, | |
| ) | |
| if self._bridge is not None and _HAS_COGNEE: | |
| try: | |
| self._bridge.run( | |
| _cognee.add(text, dataset_name=COGNEE_DATASET_NAME), | |
| ) | |
| self._bridge.run(_cognee.cognify()) | |
| except Exception: | |
| pass | |
| self._update_stats(opponent_id, cooperation_rate, scores) | |
| def query_reputation(self, opponent_id: str) -> dict[str, Any]: | |
| """Query cognee for opponent reputation. Uses stats if unavailable.""" | |
| stats = self._stats.get(opponent_id, _default_reputation()) | |
| if self._bridge is None or not _HAS_COGNEE: | |
| return stats | |
| try: | |
| results = self._bridge.run( | |
| _cognee.search( | |
| f"reputation and behavior of {opponent_id}", | |
| search_type=COGNEE_SEARCH_TYPE, | |
| ), | |
| ) | |
| return _parse_reputation(results, stats) | |
| except Exception: | |
| return stats | |
| def record_gossip( | |
| self, rater_id: str, target_id: str, rating: str, | |
| ) -> None: | |
| """Record a gossip rating in cognee.""" | |
| text = f"{rater_id} rated {target_id} as {rating}." | |
| if self._bridge is not None and _HAS_COGNEE: | |
| try: | |
| self._bridge.run( | |
| _cognee.add(text, dataset_name=COGNEE_DATASET_NAME), | |
| ) | |
| except Exception: | |
| pass | |
| target_stats = self._stats.setdefault( | |
| target_id, _default_reputation(), | |
| ) | |
| gossip_list = target_stats.setdefault(META_KEY_GOSSIP_HISTORY, []) | |
| gossip_list.append({"rater": rater_id, "rating": rating}) | |
| def get_stats(self, opponent_id: str) -> dict[str, Any]: | |
| """Fast in-memory stats (no LLM call).""" | |
| return self._stats.get(opponent_id, _default_reputation()) | |
| def _update_stats( | |
| self, | |
| opponent_id: str, | |
| coop_rate: float, | |
| scores: tuple[float, float], | |
| ) -> None: | |
| """Update running statistics for an opponent.""" | |
| current = self._stats.get(opponent_id, _default_reputation()) | |
| count = current.get(META_KEY_INTERACTION_COUNT, _ZERO) + _ONE | |
| old_coop = current.get(META_KEY_COOPERATION_RATE, _DEFAULT_SCORE) | |
| blended = old_coop * _DECAY + coop_rate * (_ONE - _DECAY) | |
| current["score"] = blended | |
| current[META_KEY_COOPERATION_RATE] = blended | |
| current[META_KEY_INTERACTION_COUNT] = count | |
| self._stats[opponent_id] = current | |