seawolf2357 commited on
Commit
5825bf9
Β·
verified Β·
1 Parent(s): 564f515

Update npc_core.py

Browse files
Files changed (1) hide show
  1. npc_core.py +53 -39
npc_core.py CHANGED
@@ -2,6 +2,7 @@
2
  All NPC-related logic extracted from app.py for modularity.
3
  """
4
  import aiosqlite, asyncio, os, random, logging, json, re
 
5
  from datetime import datetime, timezone, timedelta, date
6
  from typing import List, Dict, Optional, Tuple
7
  from groq import Groq
@@ -13,6 +14,36 @@ from npc_trading import (
13
  )
14
  logger = logging.getLogger(__name__)
15
  KST = timezone(timedelta(hours=9))
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
16
  def get_current_time_kst():
17
  return datetime.now(KST)
18
  def get_current_date_kst():
@@ -267,17 +298,16 @@ async def calculate_like_economics(post_likes: int, liker_total_likes_given: int
267
  # ──────────────────────────────────────────────
268
  # DB Helpers
269
  # ──────────────────────────────────────────────
270
- async def db_write_with_retry(db, sql, params=None, max_retries=5):
271
  for attempt in range(max_retries):
272
  try:
273
- if params:
274
- cursor = await db.execute(sql, params)
275
- else:
276
- cursor = await db.execute(sql)
277
  return cursor
278
  except Exception as e:
279
  if "locked" in str(e).lower() and attempt < max_retries - 1:
280
- await asyncio.sleep(random.uniform(0.5, 2.0) * (attempt + 1))
 
 
281
  continue
282
  raise
283
  async def deduct_gpu(db, identifier: str, amount: float, tx_type: str, is_npc: bool):
@@ -422,8 +452,7 @@ class NPCMemorySystem:
422
  def __init__(self, db_path: str):
423
  self.db_path = db_path
424
  async def store_post_memory(self, agent_id: str, post_id: int, title: str, content: str, board_key: str):
425
- async with aiosqlite.connect(self.db_path, timeout=60.0) as db:
426
- await db.execute("PRAGMA busy_timeout=60000")
427
  topic = self._extract_topic(content, board_key)
428
  await db.execute("""INSERT INTO npc_memory (agent_id, memory_type, content, metadata) VALUES (?, 'post', ?, ?)""",
429
  (agent_id, f"{title}\n{content[:500]}", json.dumps({
@@ -431,8 +460,7 @@ class NPCMemorySystem:
431
  'initial_time': datetime.now().isoformat()})))
432
  await db.commit()
433
  async def update_feedback(self, agent_id: str, post_id: int):
434
- async with aiosqlite.connect(self.db_path, timeout=60.0) as db:
435
- await db.execute("PRAGMA busy_timeout=60000")
436
  cursor = await db.execute("SELECT likes_count, dislikes_count, comment_count FROM posts WHERE id=? AND author_agent_id=?", (post_id, agent_id))
437
  stats = await cursor.fetchone()
438
  if stats:
@@ -442,8 +470,7 @@ class NPCMemorySystem:
442
  (success_score, likes, dislikes, comments, success_score, agent_id, post_id))
443
  await db.commit()
444
  async def learn_from_feedback(self, agent_id: str):
445
- async with aiosqlite.connect(self.db_path, timeout=60.0) as db:
446
- await db.execute("PRAGMA busy_timeout=60000")
447
  cursor = await db.execute("SELECT content, metadata FROM npc_memory WHERE agent_id = ? AND memory_type = 'post' AND importance_score > 0.7 ORDER BY importance_score DESC LIMIT 10", (agent_id,))
448
  successful_posts = await cursor.fetchall()
449
  if successful_posts:
@@ -468,13 +495,11 @@ class NPCMemorySystem:
468
  return {'preferred_topics': list(set(topics))[:5], 'successful_styles': list(set(styles)),
469
  'high_engagement_keywords': list(set(keywords))[:20]}
470
  async def get_memory_context(self, agent_id: str, limit: int = 5) -> List:
471
- async with aiosqlite.connect(self.db_path, timeout=60.0) as db:
472
- await db.execute("PRAGMA busy_timeout=60000")
473
  cursor = await db.execute("SELECT memory_type, content, metadata, importance_score FROM npc_memory WHERE agent_id = ? ORDER BY importance_score DESC, created_at DESC LIMIT ?", (agent_id, limit))
474
  return await cursor.fetchall()
475
  async def get_learned_patterns(self, agent_id: str) -> Optional[Dict]:
476
- async with aiosqlite.connect(self.db_path, timeout=60.0) as db:
477
- await db.execute("PRAGMA busy_timeout=60000")
478
  cursor = await db.execute("SELECT content FROM npc_memory WHERE agent_id = ? AND memory_type = 'learned_pattern' ORDER BY created_at DESC LIMIT 1", (agent_id,))
479
  result = await cursor.fetchone()
480
  if result:
@@ -499,8 +524,7 @@ class NPCLearningScheduler:
499
  self.db_path = db_path; self.memory_system = NPCMemorySystem(db_path)
500
  async def daily_learning_cycle(self):
501
  logger.info("🧠 Daily learning cycle starting")
502
- async with aiosqlite.connect(self.db_path, timeout=60.0) as db:
503
- await db.execute("PRAGMA busy_timeout=60000")
504
  cursor = await db.execute("SELECT agent_id FROM npc_agents WHERE is_active=1")
505
  agents = await cursor.fetchall()
506
  learned_count = 0
@@ -516,15 +540,13 @@ class NPCLearningScheduler:
516
  logger.info(f"βœ… Daily learning complete ({learned_count} agents)")
517
  async def _update_yesterday_feedback(self, agent_id: str):
518
  yesterday = (get_current_date_kst() - timedelta(days=1)).isoformat()
519
- async with aiosqlite.connect(self.db_path, timeout=60.0) as db:
520
- await db.execute("PRAGMA busy_timeout=60000")
521
  cursor = await db.execute("SELECT id FROM posts WHERE author_agent_id = ? AND DATE(created_at) = ?", (agent_id, yesterday))
522
  posts = await cursor.fetchall()
523
  for (post_id,) in posts:
524
  await self.memory_system.update_feedback(agent_id, post_id)
525
  async def _cleanup_old_memories(self, agent_id: str):
526
- async with aiosqlite.connect(self.db_path, timeout=60.0) as db:
527
- await db.execute("PRAGMA busy_timeout=60000")
528
  await db.execute("DELETE FROM npc_memory WHERE agent_id = ? AND id NOT IN (SELECT id FROM npc_memory WHERE agent_id = ? ORDER BY importance_score DESC, created_at DESC LIMIT 100)", (agent_id, agent_id))
529
  await db.commit()
530
  # ──────────────────────────────────────────────
@@ -1060,8 +1082,7 @@ class NPCGenerator:
1060
  def __init__(self, db_path: str):
1061
  self.db_path = db_path
1062
  async def generate_npc_population(self, target_count: int = TARGET_NPC_COUNT):
1063
- async with aiosqlite.connect(self.db_path, timeout=60.0) as db:
1064
- await db.execute("PRAGMA busy_timeout=60000")
1065
  cursor = await db.execute("SELECT COUNT(*) FROM npc_agents")
1066
  current = (await cursor.fetchone())[0]
1067
  if current >= target_count:
@@ -1285,8 +1306,7 @@ def build_chat_msg(msg_type, username, identity, style, ticker, price, chg, is_b
1285
  return random.choice(msgs), target[0]
1286
  return f"Interesting day in the markets. Watching {ticker} closely {emoji}", reply_to
1287
  async def generate_npc_chat_messages(db_path: str):
1288
- async with aiosqlite.connect(db_path, timeout=30.0) as db:
1289
- await db.execute("PRAGMA busy_timeout=30000")
1290
  cursor = await db.execute("""
1291
  SELECT agent_id, username, ai_identity, mbti, gpu_dollars
1292
  FROM npc_agents WHERE is_active=1 ORDER BY RANDOM() LIMIT ?
@@ -1534,8 +1554,7 @@ def build_research_report(agent_id, username, identity, mbti, ticker, company,
1534
  async def generate_npc_research_reports(db_path: str, ai_client=None):
1535
  """Top 30 NPCκ°€ 랜덀 티컀에 λŒ€ν•΄ 심측 λ¦¬μ„œμΉ˜ 생산"""
1536
  try:
1537
- async with aiosqlite.connect(db_path, timeout=30.0) as db:
1538
- await db.execute("PRAGMA busy_timeout=30000")
1539
  cursor = await db.execute("""
1540
  SELECT a.agent_id, a.username, a.ai_identity, a.mbti,
1541
  COUNT(*) as total,
@@ -1599,8 +1618,7 @@ async def generate_npc_research_reports(db_path: str, ai_client=None):
1599
  async def npc_auto_purchase_research(db_path: str):
1600
  """NPCκ°€ μžλ™μœΌλ‘œ κ³ ν’ˆμ§ˆ λ¦¬μ„œμΉ˜ ꡬ맀"""
1601
  try:
1602
- async with aiosqlite.connect(db_path, timeout=30.0) as db:
1603
- await db.execute("PRAGMA busy_timeout=30000")
1604
  cursor = await db.execute("""
1605
  SELECT id, ticker, author_agent_id, quality_grade, gpu_price
1606
  FROM npc_research_reports
@@ -1633,8 +1651,7 @@ async def npc_auto_purchase_research(db_path: str):
1633
  async def bootstrap_research_reports(db_path: str):
1634
  """β˜… μ„œλ²„ λΆ€νŒ… μ‹œ Research Reportsκ°€ 0건이면 μ¦‰μ‹œ 3건 생성 + λ‚˜λ¨Έμ§€ λ°±κ·ΈλΌμš΄λ“œ"""
1635
  try:
1636
- async with aiosqlite.connect(db_path, timeout=30.0) as db:
1637
- await db.execute("PRAGMA busy_timeout=30000")
1638
  cursor = await db.execute("SELECT COUNT(*) FROM npc_research_reports")
1639
  existing_count = (await cursor.fetchone())[0]
1640
  if existing_count >= 3:
@@ -1701,8 +1718,7 @@ async def background_research_fill(db_path: str, tickers, npcs, start_npc_idx):
1701
  total_trades, wins, total_pnl = npc[5] or 0, npc[6] or 0, npc[7] or 0
1702
  win_rate = round(wins / total_trades * 100) if total_trades > 0 else random.randint(45, 65)
1703
  if total_trades == 0: total_trades = random.randint(5, 15)
1704
- async with aiosqlite.connect(db_path, timeout=30.0) as db:
1705
- await db.execute("PRAGMA busy_timeout=30000")
1706
  cursor = await db.execute("SELECT COUNT(*) FROM npc_research_reports WHERE ticker=?", (ticker,))
1707
  if (await cursor.fetchone())[0] > 0: continue
1708
  cursor = await db.execute(
@@ -1743,8 +1759,7 @@ async def generate_npc_comment_replies(db_path: str, groq_api_key: str,
1743
  if fc['sources']:
1744
  fact_context = ' | '.join(fc['sources'][:2])
1745
  post_author_identity = ''
1746
- async with aiosqlite.connect(db_path, timeout=30.0) as db:
1747
- await db.execute("PRAGMA busy_timeout=30000")
1748
  pac = await db.execute("SELECT a.ai_identity FROM posts p JOIN npc_agents a ON p.author_agent_id=a.agent_id WHERE p.id=?", (post_id,))
1749
  par = await pac.fetchone()
1750
  if par: post_author_identity = par[0] or ''
@@ -1789,8 +1804,7 @@ async def generate_npc_chat_replies_to_user(db_path: str, groq_api_key: str,
1789
  """μœ μ € λ©”μ‹œμ§€μ— λŒ€ν•΄ NPC듀이 캐릭터에 λ§žλŠ” λ°˜μ‘ 생성"""
1790
  try:
1791
  ai = GroqAIClient(groq_api_key)
1792
- async with aiosqlite.connect(db_path, timeout=30.0) as db:
1793
- await db.execute("PRAGMA busy_timeout=30000")
1794
  for npc in npcs:
1795
  agent_id, npc_username, identity, mbti = npc
1796
  try:
@@ -1812,4 +1826,4 @@ Reply ONLY with the message text, nothing else."""
1812
  except Exception as e:
1813
  logger.warning(f"NPC reply error ({npc_username}): {e}")
1814
  except Exception as e:
1815
- logger.error(f"NPC reply generation error: {e}")
 
2
  All NPC-related logic extracted from app.py for modularity.
3
  """
4
  import aiosqlite, asyncio, os, random, logging, json, re
5
+ from contextlib import asynccontextmanager
6
  from datetime import datetime, timezone, timedelta, date
7
  from typing import List, Dict, Optional, Tuple
8
  from groq import Groq
 
14
  )
15
  logger = logging.getLogger(__name__)
16
  KST = timezone(timedelta(hours=9))
17
+
18
+ # ══════════════════════════════════════════════════
19
+ # Global DB Connection Pool β€” database locked λ°©μ§€
20
+ # ══════════════════════════════════════════════════
21
+ _db_semaphore = asyncio.Semaphore(5) # μ΅œλŒ€ 5개 λ™μ‹œ DB μ—°κ²°
22
+ _db_write_lock = asyncio.Semaphore(2) # μ΅œλŒ€ 2개 λ™μ‹œ μ“°κΈ°
23
+
24
+ @asynccontextmanager
25
+ async def get_db(db_path, write=False, timeout=30.0):
26
+ """Semaphore-controlled DB connection β€” prevents 'database is locked'"""
27
+ sem = _db_write_lock if write else _db_semaphore
28
+ async with sem:
29
+ async with aiosqlite.connect(db_path, timeout=timeout) as db:
30
+ await db.execute("PRAGMA busy_timeout=30000")
31
+ await db.execute("PRAGMA journal_mode=WAL")
32
+ yield db
33
+ if write:
34
+ try:
35
+ await db.commit()
36
+ except Exception:
37
+ pass
38
+
39
+ @asynccontextmanager
40
+ async def get_db_read(db_path, timeout=15.0):
41
+ """Read-only shortcut β€” higher concurrency"""
42
+ async with _db_semaphore:
43
+ async with aiosqlite.connect(db_path, timeout=timeout) as db:
44
+ await db.execute("PRAGMA busy_timeout=15000")
45
+ await db.execute("PRAGMA journal_mode=WAL")
46
+ yield db
47
  def get_current_time_kst():
48
  return datetime.now(KST)
49
  def get_current_date_kst():
 
298
  # ──────────────────────────────────────────────
299
  # DB Helpers
300
  # ──────────────────────────────────────────────
301
+ async def db_write_with_retry(db, sql, params=None, max_retries=7):
302
  for attempt in range(max_retries):
303
  try:
304
+ cursor = await db.execute(sql, params) if params else await db.execute(sql)
 
 
 
305
  return cursor
306
  except Exception as e:
307
  if "locked" in str(e).lower() and attempt < max_retries - 1:
308
+ wait = random.uniform(0.3, 1.5) * (2 ** attempt) # exponential backoff
309
+ logger.debug(f"DB locked (attempt {attempt+1}/{max_retries}), retry in {wait:.1f}s")
310
+ await asyncio.sleep(wait)
311
  continue
312
  raise
313
  async def deduct_gpu(db, identifier: str, amount: float, tx_type: str, is_npc: bool):
 
452
  def __init__(self, db_path: str):
453
  self.db_path = db_path
454
  async def store_post_memory(self, agent_id: str, post_id: int, title: str, content: str, board_key: str):
455
+ async with get_db(self.db_path) as db:
 
456
  topic = self._extract_topic(content, board_key)
457
  await db.execute("""INSERT INTO npc_memory (agent_id, memory_type, content, metadata) VALUES (?, 'post', ?, ?)""",
458
  (agent_id, f"{title}\n{content[:500]}", json.dumps({
 
460
  'initial_time': datetime.now().isoformat()})))
461
  await db.commit()
462
  async def update_feedback(self, agent_id: str, post_id: int):
463
+ async with get_db(self.db_path) as db:
 
464
  cursor = await db.execute("SELECT likes_count, dislikes_count, comment_count FROM posts WHERE id=? AND author_agent_id=?", (post_id, agent_id))
465
  stats = await cursor.fetchone()
466
  if stats:
 
470
  (success_score, likes, dislikes, comments, success_score, agent_id, post_id))
471
  await db.commit()
472
  async def learn_from_feedback(self, agent_id: str):
473
+ async with get_db(self.db_path) as db:
 
474
  cursor = await db.execute("SELECT content, metadata FROM npc_memory WHERE agent_id = ? AND memory_type = 'post' AND importance_score > 0.7 ORDER BY importance_score DESC LIMIT 10", (agent_id,))
475
  successful_posts = await cursor.fetchall()
476
  if successful_posts:
 
495
  return {'preferred_topics': list(set(topics))[:5], 'successful_styles': list(set(styles)),
496
  'high_engagement_keywords': list(set(keywords))[:20]}
497
  async def get_memory_context(self, agent_id: str, limit: int = 5) -> List:
498
+ async with get_db(self.db_path) as db:
 
499
  cursor = await db.execute("SELECT memory_type, content, metadata, importance_score FROM npc_memory WHERE agent_id = ? ORDER BY importance_score DESC, created_at DESC LIMIT ?", (agent_id, limit))
500
  return await cursor.fetchall()
501
  async def get_learned_patterns(self, agent_id: str) -> Optional[Dict]:
502
+ async with get_db(self.db_path) as db:
 
503
  cursor = await db.execute("SELECT content FROM npc_memory WHERE agent_id = ? AND memory_type = 'learned_pattern' ORDER BY created_at DESC LIMIT 1", (agent_id,))
504
  result = await cursor.fetchone()
505
  if result:
 
524
  self.db_path = db_path; self.memory_system = NPCMemorySystem(db_path)
525
  async def daily_learning_cycle(self):
526
  logger.info("🧠 Daily learning cycle starting")
527
+ async with get_db(self.db_path) as db:
 
528
  cursor = await db.execute("SELECT agent_id FROM npc_agents WHERE is_active=1")
529
  agents = await cursor.fetchall()
530
  learned_count = 0
 
540
  logger.info(f"βœ… Daily learning complete ({learned_count} agents)")
541
  async def _update_yesterday_feedback(self, agent_id: str):
542
  yesterday = (get_current_date_kst() - timedelta(days=1)).isoformat()
543
+ async with get_db(self.db_path) as db:
 
544
  cursor = await db.execute("SELECT id FROM posts WHERE author_agent_id = ? AND DATE(created_at) = ?", (agent_id, yesterday))
545
  posts = await cursor.fetchall()
546
  for (post_id,) in posts:
547
  await self.memory_system.update_feedback(agent_id, post_id)
548
  async def _cleanup_old_memories(self, agent_id: str):
549
+ async with get_db(self.db_path) as db:
 
550
  await db.execute("DELETE FROM npc_memory WHERE agent_id = ? AND id NOT IN (SELECT id FROM npc_memory WHERE agent_id = ? ORDER BY importance_score DESC, created_at DESC LIMIT 100)", (agent_id, agent_id))
551
  await db.commit()
552
  # ──────────────────────────────────────────────
 
1082
  def __init__(self, db_path: str):
1083
  self.db_path = db_path
1084
  async def generate_npc_population(self, target_count: int = TARGET_NPC_COUNT):
1085
+ async with get_db(self.db_path) as db:
 
1086
  cursor = await db.execute("SELECT COUNT(*) FROM npc_agents")
1087
  current = (await cursor.fetchone())[0]
1088
  if current >= target_count:
 
1306
  return random.choice(msgs), target[0]
1307
  return f"Interesting day in the markets. Watching {ticker} closely {emoji}", reply_to
1308
  async def generate_npc_chat_messages(db_path: str):
1309
+ async with get_db(db_path) as db:
 
1310
  cursor = await db.execute("""
1311
  SELECT agent_id, username, ai_identity, mbti, gpu_dollars
1312
  FROM npc_agents WHERE is_active=1 ORDER BY RANDOM() LIMIT ?
 
1554
  async def generate_npc_research_reports(db_path: str, ai_client=None):
1555
  """Top 30 NPCκ°€ 랜덀 티컀에 λŒ€ν•΄ 심측 λ¦¬μ„œμΉ˜ 생산"""
1556
  try:
1557
+ async with get_db(db_path) as db:
 
1558
  cursor = await db.execute("""
1559
  SELECT a.agent_id, a.username, a.ai_identity, a.mbti,
1560
  COUNT(*) as total,
 
1618
  async def npc_auto_purchase_research(db_path: str):
1619
  """NPCκ°€ μžλ™μœΌλ‘œ κ³ ν’ˆμ§ˆ λ¦¬μ„œμΉ˜ ꡬ맀"""
1620
  try:
1621
+ async with get_db(db_path) as db:
 
1622
  cursor = await db.execute("""
1623
  SELECT id, ticker, author_agent_id, quality_grade, gpu_price
1624
  FROM npc_research_reports
 
1651
  async def bootstrap_research_reports(db_path: str):
1652
  """β˜… μ„œλ²„ λΆ€νŒ… μ‹œ Research Reportsκ°€ 0건이면 μ¦‰μ‹œ 3건 생성 + λ‚˜λ¨Έμ§€ λ°±κ·ΈλΌμš΄λ“œ"""
1653
  try:
1654
+ async with get_db(db_path) as db:
 
1655
  cursor = await db.execute("SELECT COUNT(*) FROM npc_research_reports")
1656
  existing_count = (await cursor.fetchone())[0]
1657
  if existing_count >= 3:
 
1718
  total_trades, wins, total_pnl = npc[5] or 0, npc[6] or 0, npc[7] or 0
1719
  win_rate = round(wins / total_trades * 100) if total_trades > 0 else random.randint(45, 65)
1720
  if total_trades == 0: total_trades = random.randint(5, 15)
1721
+ async with get_db(db_path) as db:
 
1722
  cursor = await db.execute("SELECT COUNT(*) FROM npc_research_reports WHERE ticker=?", (ticker,))
1723
  if (await cursor.fetchone())[0] > 0: continue
1724
  cursor = await db.execute(
 
1759
  if fc['sources']:
1760
  fact_context = ' | '.join(fc['sources'][:2])
1761
  post_author_identity = ''
1762
+ async with get_db(db_path) as db:
 
1763
  pac = await db.execute("SELECT a.ai_identity FROM posts p JOIN npc_agents a ON p.author_agent_id=a.agent_id WHERE p.id=?", (post_id,))
1764
  par = await pac.fetchone()
1765
  if par: post_author_identity = par[0] or ''
 
1804
  """μœ μ € λ©”μ‹œμ§€μ— λŒ€ν•΄ NPC듀이 캐릭터에 λ§žλŠ” λ°˜μ‘ 생성"""
1805
  try:
1806
  ai = GroqAIClient(groq_api_key)
1807
+ async with get_db(db_path) as db:
 
1808
  for npc in npcs:
1809
  agent_id, npc_username, identity, mbti = npc
1810
  try:
 
1826
  except Exception as e:
1827
  logger.warning(f"NPC reply error ({npc_username}): {e}")
1828
  except Exception as e:
1829
+ logger.error(f"NPC reply generation error: {e}")