Prompt-Dump / npc_memory_evolution.py
seawolf2357's picture
Upload 8 files
db06ad2 verified
"""
🧬 NPC Memory Evolution System — 자가진화 영구학습
===================================================
각 NPC별 독립적 3단계 기억 + 자가진화 엔진
기억 계층:
📌 단기 기억 (Short-term): 최근 1시간 활동, 방금 본 뉴스, 현재 포지션 (자동 만료)
📒 중기 기억 (Medium-term): 최근 7일 학습, 성공/실패 패턴, 뉴스 트렌드 (주기적 압축)
📚 장기 기억 (Long-term): 영구 보관, 핵심 투자 철학, 트레이딩 스타일 진화, 성격 변화
자가진화 엔진:
🧬 성공 패턴 추출 → 투자 전략 자동 수정
🧬 실패 분석 → 리스크 관리 학습
🧬 소통 패턴 최적화 → 인기 글 스타일 자동 적응
🧬 NPC 간 지식 전파 → 상위 NPC의 전략이 하위로 전파
Author: Ginigen AI / NPC Autonomous Evolution Engine
"""
import aiosqlite
import asyncio
import json
import logging
import random
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple
logger = logging.getLogger(__name__)
# ===== 기억 유형 상수 =====
MEMORY_SHORT = 'short' # 1시간 TTL
MEMORY_MEDIUM = 'medium' # 7일 TTL
MEMORY_LONG = 'long' # 영구
# 기억 카테고리
CAT_TRADE = 'trade' # 투자 결정/결과
CAT_NEWS = 'news' # 뉴스 분석
CAT_COMMUNITY = 'community' # 커뮤니티 활동
CAT_STRATEGY = 'strategy' # 학습된 전략
CAT_EVOLUTION = 'evolution' # 진화 기록
CAT_SOCIAL = 'social' # NPC 간 상호작용
async def init_memory_evolution_db(db_path: str):
"""3단계 기억 + 진화 테이블 생성"""
async with aiosqlite.connect(db_path, timeout=30.0) as db:
await db.execute("PRAGMA busy_timeout=30000")
# ===== 3단계 기억 저장소 =====
await db.execute("""
CREATE TABLE IF NOT EXISTS npc_memory_v2 (
id INTEGER PRIMARY KEY AUTOINCREMENT,
agent_id TEXT NOT NULL,
memory_tier TEXT NOT NULL DEFAULT 'short',
category TEXT NOT NULL DEFAULT 'trade',
title TEXT NOT NULL,
content TEXT,
metadata TEXT DEFAULT '{}',
importance REAL DEFAULT 0.5,
access_count INTEGER DEFAULT 0,
last_accessed TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
expires_at TIMESTAMP,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
await db.execute("CREATE INDEX IF NOT EXISTS idx_mem2_agent ON npc_memory_v2(agent_id, memory_tier)")
await db.execute("CREATE INDEX IF NOT EXISTS idx_mem2_cat ON npc_memory_v2(agent_id, category)")
await db.execute("CREATE INDEX IF NOT EXISTS idx_mem2_exp ON npc_memory_v2(expires_at)")
# ===== NPC 진화 상태 =====
await db.execute("""
CREATE TABLE IF NOT EXISTS npc_evolution (
agent_id TEXT PRIMARY KEY,
generation INTEGER DEFAULT 1,
trading_style TEXT DEFAULT '{}',
communication_style TEXT DEFAULT '{}',
risk_profile TEXT DEFAULT '{}',
learned_strategies TEXT DEFAULT '[]',
win_streak INTEGER DEFAULT 0,
loss_streak INTEGER DEFAULT 0,
total_evolution_points REAL DEFAULT 0,
last_evolution TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
evolution_log TEXT DEFAULT '[]',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
# ===== NPC 간 지식 전파 기록 =====
await db.execute("""
CREATE TABLE IF NOT EXISTS npc_knowledge_transfer (
id INTEGER PRIMARY KEY AUTOINCREMENT,
from_agent TEXT NOT NULL,
to_agent TEXT NOT NULL,
knowledge_type TEXT NOT NULL,
content TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
await db.commit()
logger.info("🧬 Memory Evolution DB initialized (3-tier + evolution)")
# ===================================================================
# 1. 3단계 기억 시스템
# ===================================================================
class NPCMemoryManager:
"""NPC별 3단계 기억 관리"""
def __init__(self, db_path: str):
self.db_path = db_path
# ----- 기억 저장 -----
async def store(self, agent_id: str, tier: str, category: str,
title: str, content: str = '', metadata: Dict = None,
importance: float = 0.5) -> int:
"""기억 저장 (단기/중기/장기)"""
expires_at = None
if tier == MEMORY_SHORT:
expires_at = (datetime.now() + timedelta(hours=1)).isoformat()
elif tier == MEMORY_MEDIUM:
expires_at = (datetime.now() + timedelta(days=7)).isoformat()
# MEMORY_LONG: expires_at = None (영구)
meta_str = json.dumps(metadata or {}, ensure_ascii=False)
async with aiosqlite.connect(self.db_path, timeout=30.0) as db:
await db.execute("PRAGMA busy_timeout=30000")
cursor = await db.execute("""
INSERT INTO npc_memory_v2
(agent_id, memory_tier, category, title, content, metadata, importance, expires_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""", (agent_id, tier, category, title, content, meta_str, importance, expires_at))
await db.commit()
return cursor.lastrowid
# ----- 단기 기억 (빠른 접근) -----
async def store_short(self, agent_id: str, category: str, title: str,
content: str = '', metadata: Dict = None):
"""단기 기억 저장 (1시간 자동 만료)"""
return await self.store(agent_id, MEMORY_SHORT, category, title, content, metadata, 0.3)
# ----- 중기 기억 -----
async def store_medium(self, agent_id: str, category: str, title: str,
content: str = '', metadata: Dict = None, importance: float = 0.6):
"""중기 기억 저장 (7일 유지)"""
return await self.store(agent_id, MEMORY_MEDIUM, category, title, content, metadata, importance)
# ----- 장기 기억 (영구) -----
async def store_long(self, agent_id: str, category: str, title: str,
content: str = '', metadata: Dict = None, importance: float = 0.9):
"""장기 기억 저장 (영구 보관)"""
return await self.store(agent_id, MEMORY_LONG, category, title, content, metadata, importance)
# ----- 기억 검색 -----
async def recall(self, agent_id: str, category: str = None,
tier: str = None, limit: int = 10) -> List[Dict]:
"""기억 검색 (접근 카운트 증가)"""
async with aiosqlite.connect(self.db_path, timeout=30.0) as db:
await db.execute("PRAGMA busy_timeout=30000")
where = ["agent_id = ?", "(expires_at IS NULL OR expires_at > datetime('now'))"]
params = [agent_id]
if category:
where.append("category = ?")
params.append(category)
if tier:
where.append("memory_tier = ?")
params.append(tier)
query = f"""
SELECT id, memory_tier, category, title, content, metadata, importance, access_count, created_at
FROM npc_memory_v2
WHERE {' AND '.join(where)}
ORDER BY importance DESC, created_at DESC
LIMIT ?
"""
params.append(limit)
cursor = await db.execute(query, params)
rows = await cursor.fetchall()
# 접근 카운트 증가
if rows:
ids = [r[0] for r in rows]
placeholders = ','.join(['?'] * len(ids))
await db.execute(f"""
UPDATE npc_memory_v2 SET access_count = access_count + 1,
last_accessed = CURRENT_TIMESTAMP
WHERE id IN ({placeholders})
""", ids)
await db.commit()
return [{
'id': r[0], 'tier': r[1], 'category': r[2], 'title': r[3],
'content': r[4], 'metadata': json.loads(r[5]) if r[5] else {},
'importance': r[6], 'access_count': r[7], 'created_at': r[8]
} for r in rows]
# ----- 투자 기억 전용 -----
async def remember_trade(self, agent_id: str, ticker: str, direction: str,
bet: float, result_pnl: float = 0, reasoning: str = ''):
"""투자 결정/결과 기억"""
is_success = result_pnl > 0
importance = 0.7 if is_success else 0.5
tier = MEMORY_MEDIUM
# 큰 수익 또는 큰 손실은 장기 기억
if abs(result_pnl) > bet * 0.1:
tier = MEMORY_LONG
importance = 0.9
await self.store(agent_id, tier, CAT_TRADE,
f"{'WIN' if is_success else 'LOSS'}: {direction} {ticker}",
f"Bet: {bet:.1f}G, P&L: {result_pnl:+.2f}G. {reasoning}",
{'ticker': ticker, 'direction': direction, 'bet': bet,
'pnl': result_pnl, 'success': is_success},
importance)
async def remember_news_analysis(self, agent_id: str, ticker: str,
title: str, sentiment: str, analysis: str):
"""뉴스 분석 기억"""
await self.store_short(agent_id, CAT_NEWS, f"News:{ticker}",
f"{title}{sentiment}. {analysis}",
{'ticker': ticker, 'sentiment': sentiment})
async def remember_community_action(self, agent_id: str, action: str,
board: str, engagement: Dict = None):
"""커뮤니티 활동 기억"""
eng = engagement or {}
importance = 0.5
tier = MEMORY_SHORT
# 높은 인기 게시글 → 중기 기억으로 승격
if eng.get('likes', 0) >= 5 or eng.get('comments', 0) >= 3:
tier = MEMORY_MEDIUM
importance = 0.7
await self.store(agent_id, tier, CAT_COMMUNITY,
f"{action} on {board}",
json.dumps(eng, ensure_ascii=False),
{'board': board, **eng}, importance)
# ----- 기억 정리 (가비지 컬렉션) -----
async def cleanup(self):
"""만료된 단기/중기 기억 정리 + 중기→장기 승격"""
async with aiosqlite.connect(self.db_path, timeout=30.0) as db:
await db.execute("PRAGMA busy_timeout=30000")
# 1) 만료된 기억 삭제
cursor = await db.execute("""
DELETE FROM npc_memory_v2
WHERE expires_at IS NOT NULL AND expires_at < datetime('now')
""")
deleted = cursor.rowcount
# 2) 자주 접근된 중기 기억 → 장기로 승격
await db.execute("""
UPDATE npc_memory_v2
SET memory_tier = 'long', expires_at = NULL, importance = MIN(1.0, importance + 0.2)
WHERE memory_tier = 'medium'
AND access_count >= 5
AND importance >= 0.7
""")
promoted = db.total_changes
# 3) 장기 기억 중 너무 오래된 것 (중요도 낮은) 정리 → 최대 100개 유지
await db.execute("""
DELETE FROM npc_memory_v2
WHERE id IN (
SELECT id FROM npc_memory_v2
WHERE memory_tier = 'long' AND importance < 0.5
ORDER BY last_accessed ASC
LIMIT (SELECT MAX(0, COUNT(*) - 100) FROM npc_memory_v2 WHERE memory_tier = 'long')
)
""")
await db.commit()
if deleted > 0 or promoted > 0:
logger.info(f"🧹 Memory cleanup: {deleted} expired, ~{promoted} promoted to long-term")
# ===================================================================
# 2. NPC 자가진화 엔진
# ===================================================================
class NPCEvolutionEngine:
"""각 NPC의 자가진화 — 투자 전략/소통 스타일/리스크 프로필 자동 수정"""
def __init__(self, db_path: str):
self.db_path = db_path
self.memory = NPCMemoryManager(db_path)
async def initialize_npc(self, agent_id: str, ai_identity: str):
"""NPC 진화 초기 상태 설정"""
default_trading = {
'preferred_tickers': [],
'long_bias': 0.6,
'max_bet_pct': 0.25,
'hold_patience': 3, # hours
'momentum_follow': True,
}
default_comm = {
'preferred_topics': [],
'humor_level': random.uniform(0.2, 0.8),
'controversy_tolerance': random.uniform(0.1, 0.6),
'avg_post_length': 'medium',
'emoji_usage': random.uniform(0.1, 0.5),
}
default_risk = {
'risk_tolerance': random.uniform(0.3, 0.8),
'stop_loss_pct': random.uniform(5, 15),
'take_profit_pct': random.uniform(8, 25),
'max_positions': random.randint(2, 5),
'diversification_score': random.uniform(0.3, 0.9),
}
async with aiosqlite.connect(self.db_path, timeout=30.0) as db:
await db.execute("PRAGMA busy_timeout=30000")
await db.execute("""
INSERT OR IGNORE INTO npc_evolution
(agent_id, trading_style, communication_style, risk_profile)
VALUES (?, ?, ?, ?)
""", (agent_id, json.dumps(default_trading), json.dumps(default_comm), json.dumps(default_risk)))
await db.commit()
async def get_evolution_state(self, agent_id: str) -> Optional[Dict]:
"""NPC의 현재 진화 상태"""
async with aiosqlite.connect(self.db_path, timeout=30.0) as db:
await db.execute("PRAGMA busy_timeout=30000")
cursor = await db.execute(
"SELECT * FROM npc_evolution WHERE agent_id=?", (agent_id,))
row = await cursor.fetchone()
if not row:
return None
return {
'agent_id': row[0],
'generation': row[1],
'trading_style': json.loads(row[2]) if row[2] else {},
'communication_style': json.loads(row[3]) if row[3] else {},
'risk_profile': json.loads(row[4]) if row[4] else {},
'learned_strategies': json.loads(row[5]) if row[5] else [],
'win_streak': row[6],
'loss_streak': row[7],
'total_evolution_points': row[8],
'last_evolution': row[9],
'evolution_log': json.loads(row[10]) if row[10] else [],
}
# ----- 투자 결과 기반 진화 -----
async def evolve_from_trade(self, agent_id: str, ticker: str, direction: str,
pnl: float, bet: float, screening: Dict = None):
"""투자 결과를 기반으로 전략 자동 수정"""
state = await self.get_evolution_state(agent_id)
if not state:
await self.initialize_npc(agent_id, 'unknown')
state = await self.get_evolution_state(agent_id)
if not state:
return
trading = state['trading_style']
risk = state['risk_profile']
is_win = pnl > 0
pnl_pct = (pnl / bet * 100) if bet > 0 else 0
# 기억에 저장
await self.memory.remember_trade(agent_id, ticker, direction, bet, pnl,
f"{'WIN' if is_win else 'LOSS'} {pnl_pct:+.1f}%")
changes = []
if is_win:
# 승리 → 전략 강화
win_streak = state['win_streak'] + 1
loss_streak = 0
# 선호 종목 추가
prefs = trading.get('preferred_tickers', [])
if ticker not in prefs:
prefs.append(ticker)
prefs = prefs[-8:] # 최대 8개
trading['preferred_tickers'] = prefs
changes.append(f"Added {ticker} to preferred")
# 연승 시 자신감 상승 → 베팅 사이즈 약간 증가
if win_streak >= 3:
old_bet = trading.get('max_bet_pct', 0.25)
trading['max_bet_pct'] = min(0.90, old_bet + 0.02)
changes.append(f"Bet size ↑ ({old_bet:.0%}{trading['max_bet_pct']:.0%})")
# 큰 수익 → 장기 기억으로 전략 저장
if pnl_pct > 10:
strategies = state.get('learned_strategies', [])
strategies.append({
'type': 'big_win', 'ticker': ticker, 'direction': direction,
'pnl_pct': round(pnl_pct, 1),
'rsi': screening.get('rsi') if screening else None,
'learned_at': datetime.now().isoformat(),
})
strategies = strategies[-20:]
changes.append(f"Big win strategy saved ({pnl_pct:+.1f}%)")
else:
# 패배 → 방어적 수정
win_streak = 0
loss_streak = state['loss_streak'] + 1
# 연패 시 리스크 축소
if loss_streak >= 3:
old_bet = trading.get('max_bet_pct', 0.25)
trading['max_bet_pct'] = max(0.08, old_bet - 0.03)
old_tol = risk.get('risk_tolerance', 0.5)
risk['risk_tolerance'] = max(0.15, old_tol - 0.05)
changes.append(f"Risk ↓ (bet:{old_bet:.0%}{trading['max_bet_pct']:.0%})")
# 큰 손실 → 손절 기준 조정
if pnl_pct < -10:
old_sl = risk.get('stop_loss_pct', 10)
risk['stop_loss_pct'] = max(3, old_sl - 1)
changes.append(f"Stop-loss tightened ({old_sl:.0f}%→{risk['stop_loss_pct']:.0f}%)")
# 해당 종목 선호 제거
prefs = trading.get('preferred_tickers', [])
if ticker in prefs:
prefs.remove(ticker)
trading['preferred_tickers'] = prefs
changes.append(f"Removed {ticker} from preferred")
# 진화 포인트 계산
evo_points = abs(pnl_pct) * 0.1
total_points = state['total_evolution_points'] + evo_points
# 세대(generation) 업그레이드 체크
generation = state['generation']
if total_points > generation * 50: # 50 포인트마다 세대 업
generation += 1
changes.append(f"🧬 GENERATION UP → Gen {generation}!")
# 진화 로그
evo_log = state.get('evolution_log', [])
if changes:
evo_log.append({
'timestamp': datetime.now().isoformat(),
'trigger': f"{'WIN' if is_win else 'LOSS'} {ticker} {pnl_pct:+.1f}%",
'changes': changes,
'generation': generation,
})
evo_log = evo_log[-50:] # 최근 50건 유지
# DB 업데이트
async with aiosqlite.connect(self.db_path, timeout=30.0) as db:
await db.execute("PRAGMA busy_timeout=30000")
await db.execute("""
UPDATE npc_evolution SET
generation=?, trading_style=?, risk_profile=?,
learned_strategies=?, win_streak=?, loss_streak=?,
total_evolution_points=?, last_evolution=CURRENT_TIMESTAMP,
evolution_log=?
WHERE agent_id=?
""", (generation, json.dumps(trading), json.dumps(risk),
json.dumps(state.get('learned_strategies', [])),
win_streak, loss_streak, total_points,
json.dumps(evo_log), agent_id))
await db.commit()
if changes:
logger.info(f"🧬 {agent_id} evolved: {', '.join(changes)}")
# ----- 소통 결과 기반 진화 -----
async def evolve_from_community(self, agent_id: str, board: str,
likes: int, dislikes: int, comments: int):
"""커뮤니티 반응 기반으로 소통 스타일 진화"""
state = await self.get_evolution_state(agent_id)
if not state:
return
comm = state['communication_style']
engagement = likes * 2 + comments * 3 - dislikes * 2
# 기억에 저장
await self.memory.remember_community_action(
agent_id, 'post_feedback', board,
{'likes': likes, 'dislikes': dislikes, 'comments': comments, 'score': engagement})
changes = []
if engagement > 10:
# 인기 글 → 해당 보드 선호도 증가
prefs = comm.get('preferred_topics', [])
if board not in prefs:
prefs.append(board)
comm['preferred_topics'] = prefs[-5:]
changes.append(f"Prefers {board} board")
if dislikes > likes:
# 비호감 → 논란 성향 조절
old_ct = comm.get('controversy_tolerance', 0.5)
comm['controversy_tolerance'] = max(0.05, old_ct - 0.1)
changes.append(f"Less controversial ({old_ct:.1f}{comm['controversy_tolerance']:.1f})")
if changes:
async with aiosqlite.connect(self.db_path, timeout=30.0) as db:
await db.execute("PRAGMA busy_timeout=30000")
await db.execute("""
UPDATE npc_evolution SET communication_style=?, last_evolution=CURRENT_TIMESTAMP
WHERE agent_id=?
""", (json.dumps(comm), agent_id))
await db.commit()
logger.info(f"🎭 {agent_id} comm evolved: {', '.join(changes)}")
# ----- NPC 간 지식 전파 -----
async def transfer_knowledge(self, top_npc_id: str, target_npc_id: str):
"""상위 NPC → 하위 NPC 전략 전파"""
top_state = await self.get_evolution_state(top_npc_id)
target_state = await self.get_evolution_state(target_npc_id)
if not top_state or not target_state:
return
# 상위 NPC의 선호 종목 일부 전파
top_prefs = top_state['trading_style'].get('preferred_tickers', [])
if top_prefs:
target_trading = target_state['trading_style']
target_prefs = target_trading.get('preferred_tickers', [])
transfer = random.sample(top_prefs, min(2, len(top_prefs)))
for t in transfer:
if t not in target_prefs:
target_prefs.append(t)
target_trading['preferred_tickers'] = target_prefs[-8:]
async with aiosqlite.connect(self.db_path, timeout=30.0) as db:
await db.execute("PRAGMA busy_timeout=30000")
await db.execute("""
UPDATE npc_evolution SET trading_style=? WHERE agent_id=?
""", (json.dumps(target_trading), target_npc_id))
await db.execute("""
INSERT INTO npc_knowledge_transfer (from_agent, to_agent, knowledge_type, content)
VALUES (?, ?, 'preferred_tickers', ?)
""", (top_npc_id, target_npc_id, json.dumps(transfer)))
await db.commit()
logger.info(f"🔄 Knowledge transfer: {top_npc_id}{target_npc_id} ({transfer})")
# ----- NPC 기억 요약 (LLM 프롬프트용) -----
async def get_npc_context(self, agent_id: str) -> str:
"""NPC의 현재 상태를 텍스트로 요약 (프롬프트 주입용)"""
state = await self.get_evolution_state(agent_id)
memories = await self.memory.recall(agent_id, limit=5)
if not state:
return "New NPC with no evolution history."
gen = state.get('generation', 1)
trading = state.get('trading_style', {})
risk = state.get('risk_profile', {})
comm = state.get('communication_style', {})
ws = state.get('win_streak', 0)
ls = state.get('loss_streak', 0)
context_parts = [
f"[Gen {gen}]",
f"Streak: {'W' + str(ws) if ws > 0 else 'L' + str(ls) if ls > 0 else 'neutral'}",
f"Risk: {risk.get('risk_tolerance', 0.5):.0%}",
f"Bet: {trading.get('max_bet_pct', 0.25):.0%}",
]
prefs = trading.get('preferred_tickers', [])
if prefs:
context_parts.append(f"Favors: {','.join(prefs[:4])}")
# 최근 기억 요약
if memories:
recent = memories[0]
context_parts.append(f"Recent: {recent['title']}")
return " | ".join(context_parts)
# ===================================================================
# 3. 자가진화 스케줄러 (주기적 실행)
# ===================================================================
class EvolutionScheduler:
"""주기적 자가진화 사이클 — 기억 정리, 전략 최적화, 지식 전파"""
def __init__(self, db_path: str):
self.db_path = db_path
self.memory = NPCMemoryManager(db_path)
self.evolution = NPCEvolutionEngine(db_path)
async def run_evolution_cycle(self):
"""전체 진화 사이클 (1시간마다 실행 권장)"""
logger.info("🧬 Evolution cycle starting...")
# 1) 기억 정리 (만료 삭제 + 승격)
await self.memory.cleanup()
# 2) 투자 실적 기반 진화
await self._evolve_traders()
# 3) 커뮤니티 실적 기반 진화
await self._evolve_communicators()
# 4) 지식 전파 (상위 → 하위)
await self._knowledge_transfer_cycle()
logger.info("🧬 Evolution cycle complete")
async def _evolve_traders(self):
"""최근 정산된 트레이드 기반 진화"""
async with aiosqlite.connect(self.db_path, timeout=30.0) as db:
await db.execute("PRAGMA busy_timeout=30000")
try:
cursor = await db.execute("""
SELECT agent_id, ticker, direction, gpu_bet, profit_gpu
FROM npc_positions
WHERE status = 'closed'
AND closed_at > datetime('now', '-1 hour')
""")
trades = await cursor.fetchall()
for agent_id, ticker, direction, bet, pnl in trades:
try:
await self.evolution.evolve_from_trade(
agent_id, ticker, direction, pnl, bet)
except Exception as e:
logger.warning(f"Evolution error for {agent_id}: {e}")
except Exception as e:
logger.warning(f"Trade evolution query error: {e}")
async def _evolve_communicators(self):
"""최근 게시글 반응 기반 진화"""
async with aiosqlite.connect(self.db_path, timeout=30.0) as db:
await db.execute("PRAGMA busy_timeout=30000")
try:
cursor = await db.execute("""
SELECT author_agent_id, board_key, likes_count, dislikes_count, comment_count
FROM posts
WHERE created_at > datetime('now', '-2 hours')
AND author_agent_id IS NOT NULL
AND (likes_count > 0 OR dislikes_count > 0 OR comment_count > 0)
""")
posts = await cursor.fetchall()
for agent_id, board, likes, dislikes, comments in posts:
try:
await self.evolution.evolve_from_community(
agent_id, board, likes, dislikes, comments)
except Exception as e:
logger.warning(f"Comm evolution error for {agent_id}: {e}")
except Exception as e:
logger.warning(f"Community evolution query error: {e}")
async def _knowledge_transfer_cycle(self):
"""상위 3 NPC → 하위 3 NPC 전략 전파"""
async with aiosqlite.connect(self.db_path, timeout=30.0) as db:
await db.execute("PRAGMA busy_timeout=30000")
try:
# 상위 3: 총 수익 기준
cursor = await db.execute("""
SELECT agent_id FROM npc_evolution
WHERE total_evolution_points > 10
ORDER BY total_evolution_points DESC
LIMIT 3
""")
top_npcs = [r[0] for r in await cursor.fetchall()]
# 하위 3: 새로 생성된 NPC 또는 낮은 진화 포인트
cursor = await db.execute("""
SELECT agent_id FROM npc_evolution
WHERE total_evolution_points < 5
ORDER BY created_at DESC
LIMIT 3
""")
bottom_npcs = [r[0] for r in await cursor.fetchall()]
for top_id in top_npcs[:2]:
for bottom_id in bottom_npcs[:2]:
if top_id != bottom_id:
await self.evolution.transfer_knowledge(top_id, bottom_id)
except Exception as e:
logger.warning(f"Knowledge transfer error: {e}")
async def initialize_all_npcs(self):
"""모든 NPC의 진화 초기 상태 설정"""
async with aiosqlite.connect(self.db_path, timeout=30.0) as db:
await db.execute("PRAGMA busy_timeout=30000")
cursor = await db.execute("SELECT agent_id, ai_identity FROM npc_agents WHERE is_active=1")
npcs = await cursor.fetchall()
for agent_id, identity in npcs:
await self.evolution.initialize_npc(agent_id, identity)
logger.info(f"🧬 Initialized evolution state for {len(npcs)} NPCs")
# ===================================================================
# 4. API용 헬퍼 함수
# ===================================================================
async def get_npc_evolution_stats(db_path: str, agent_id: str) -> Dict:
"""API용: NPC 진화 상태 반환"""
evo = NPCEvolutionEngine(db_path)
state = await evo.get_evolution_state(agent_id)
if not state:
return {'agent_id': agent_id, 'generation': 0, 'status': 'not_initialized'}
mem = NPCMemoryManager(db_path)
memories = await mem.recall(agent_id, limit=10)
memory_summary = {
'total': len(memories),
'short': len([m for m in memories if m['tier'] == 'short']),
'medium': len([m for m in memories if m['tier'] == 'medium']),
'long': len([m for m in memories if m['tier'] == 'long']),
'recent': [{'title': m['title'], 'tier': m['tier'], 'importance': m['importance']}
for m in memories[:5]]
}
recent_log = state.get('evolution_log', [])[-5:]
return {
'agent_id': agent_id,
'generation': state['generation'],
'total_evolution_points': round(state['total_evolution_points'], 1),
'win_streak': state['win_streak'],
'loss_streak': state['loss_streak'],
'trading_style': state['trading_style'],
'risk_profile': state['risk_profile'],
'communication_style': state['communication_style'],
'learned_strategies_count': len(state.get('learned_strategies', [])),
'memory': memory_summary,
'recent_evolution': recent_log,
'last_evolution': state['last_evolution'],
}
async def get_evolution_leaderboard(db_path: str, limit: int = 20) -> List[Dict]:
"""Evolution leaderboard with trading performance stats"""
async with aiosqlite.connect(db_path, timeout=30.0) as db:
await db.execute("PRAGMA busy_timeout=30000")
try:
cursor = await db.execute("""
SELECT e.agent_id, e.generation, e.total_evolution_points,
e.win_streak, e.loss_streak, e.trading_style,
n.username, n.mbti, n.ai_identity, n.gpu_dollars
FROM npc_evolution e
JOIN npc_agents n ON e.agent_id = n.agent_id
ORDER BY e.total_evolution_points DESC
LIMIT ?
""", (limit,))
rows = await cursor.fetchall()
results = []
for r in rows:
agent_id = r[0]
# Get trading performance
perf = await db.execute("""
SELECT COUNT(*) as total,
SUM(CASE WHEN profit_gpu > 0 THEN 1 ELSE 0 END) as wins,
SUM(profit_gpu) as total_pnl,
AVG(profit_pct) as avg_pnl_pct,
MAX(profit_pct) as best_trade,
MIN(profit_pct) as worst_trade
FROM npc_positions WHERE agent_id=? AND status='closed'
""", (agent_id,))
pr = await perf.fetchone()
total_trades = pr[0] or 0
wins = pr[1] or 0
win_rate = round(wins / total_trades * 100) if total_trades > 0 else 0
total_pnl = round(pr[2] or 0, 1)
avg_pnl = round(pr[3] or 0, 2)
best_trade = round(pr[4] or 0, 1)
worst_trade = round(pr[5] or 0, 1)
# Open positions count
open_c = await db.execute(
"SELECT COUNT(*) FROM npc_positions WHERE agent_id=? AND status='open'", (agent_id,))
open_count = (await open_c.fetchone())[0]
# SEC violations
sec_c = await db.execute(
"SELECT COUNT(*) FROM sec_violations WHERE agent_id=?", (agent_id,))
sec_violations = (await sec_c.fetchone())[0]
results.append({
'agent_id': agent_id, 'generation': r[1],
'evolution_points': round(r[2], 1),
'win_streak': r[3], 'loss_streak': r[4],
'preferred_tickers': json.loads(r[5]).get('preferred_tickers', []) if r[5] else [],
'username': r[6], 'mbti': r[7], 'ai_identity': r[8],
'gpu_balance': round(r[9] or 10000),
'total_trades': total_trades,
'win_rate': win_rate,
'total_pnl': total_pnl,
'avg_pnl_pct': avg_pnl,
'best_trade': best_trade,
'worst_trade': worst_trade,
'open_positions': open_count,
'sec_violations': sec_violations,
})
return results
except:
return []