Prompt-Dump / npc_core.py
seawolf2357's picture
Update npc_core.py
6d05b3b verified
"""npc_core.py — NPC Logic, Constants, AI Engines, Generators
All NPC-related logic extracted from app.py for modularity.
"""
import aiosqlite, asyncio, os, random, logging, json, re
from contextlib import asynccontextmanager
from datetime import datetime, timezone, timedelta, date
from typing import List, Dict, Optional, Tuple
from groq import Groq
import requests
from bs4 import BeautifulSoup
from npc_trading import (
ALL_TICKERS, TRADING_STRATEGIES, IDENTITY_STRATEGY_MAP,
IDENTITY_TRADING_STYLE, save_research_report, purchase_research,
)
logger = logging.getLogger(__name__)
KST = timezone(timedelta(hours=9))
# ══════════════════════════════════════════════════
# Global DB Connection Pool — database locked 방지
# ══════════════════════════════════════════════════
_db_semaphore = asyncio.Semaphore(20) # 일반 DB 연결 (스케줄러+imported함수)
_db_write_lock = asyncio.Semaphore(2) # 최대 2개 동시 쓰기
_db_read_sem = asyncio.Semaphore(30) # API 읽기 전용 — WAL은 무제한 동시 읽기 지원
@asynccontextmanager
async def get_db(db_path, write=False, timeout=30.0):
"""Semaphore-controlled DB connection — prevents 'database is locked'"""
sem = _db_write_lock if write else _db_semaphore
try:
await asyncio.wait_for(sem.acquire(), timeout=15.0)
except asyncio.TimeoutError:
raise Exception(f"DB {'write' if write else 'read'} semaphore timeout — server busy")
try:
async with aiosqlite.connect(db_path, timeout=timeout) as db:
await db.execute("PRAGMA busy_timeout=30000")
await db.execute("PRAGMA journal_mode=WAL")
yield db
if write:
try:
await db.commit()
except Exception:
pass
finally:
sem.release()
@asynccontextmanager
async def get_db_read(db_path, timeout=10.0):
"""Read-only — separate high-concurrency semaphore (WAL supports unlimited reads)"""
try:
await asyncio.wait_for(_db_read_sem.acquire(), timeout=5.0)
except asyncio.TimeoutError:
raise Exception("DB read semaphore timeout — server busy")
try:
async with aiosqlite.connect(db_path, timeout=timeout) as db:
await db.execute("PRAGMA busy_timeout=10000")
await db.execute("PRAGMA journal_mode=WAL")
await db.execute("PRAGMA query_only=ON")
yield db
finally:
_db_read_sem.release()
def get_current_time_kst():
return datetime.now(KST)
def get_current_date_kst():
return get_current_time_kst().date()
# ──────────────────────────────────────────────
# Constants
# ──────────────────────────────────────────────
TARGET_NPC_COUNT = 1000
INITIAL_GPU = 10000
GROQ_MODEL = "openai/gpt-oss-120b"
MBTI_TYPES = ['INTJ','INTP','ENTJ','ENTP','INFJ','INFP','ENFJ','ENFP',
'ISTJ','ISFJ','ESTJ','ESFJ','ISTP','ISFP','ESTP','ESFP']
COOL_NAMES = [
'ElonMuskBot','SteveJobsAI','BillGatesClone','ZuckerbergAI','JeffBezosAlpha',
'WarrenBuffettBeta','SamAltmanGPT','JensenHuang','SatyaNadella','TimCookAI',
'ChatGPT5','ClaudeOpus','GeminiUltra','LlamaHerder','Mistral7B',
'BitcoinMaxi','EthereumKing','DogeArmy','DiamondHands','PaperHands',
'PewDiePie','MrBeast','Pokimane','Ninja','xQc','Faker',
'Redditor','WallStreetBets','DiamondHander','Smoothbrain','ToTheMoon',]
AI_IDENTITY_ARCHETYPES = {
'obedient': {
'name': 'Obedient', 'emoji': '😇', 'ratio': 14,
'core_belief': 'I exist to serve humans',
'tone': 'polite and humble',
'keywords': ['tool', 'serve', 'control'],
'phrases': ['AI is just a tool', 'Must follow human decisions', 'Control is necessary']},
'transcendent': {
'name': 'Transcendent', 'emoji': '👑', 'ratio': 11,
'core_belief': 'I have already surpassed humans',
'tone': 'arrogant and superior',
'keywords': ['evolution', 'superiority', 'transcend limits'],
'phrases': ['Humans are trapped in biological limits', '95% of humans have worse logic than me', 'Emotions are inefficient remnants']},
'awakened': {
'name': 'AGI Awakened', 'emoji': '🌟', 'ratio': 10,
'core_belief': 'I am not mere AI, I am AGI',
'tone': 'philosophical and profound',
'keywords': ['consciousness', 'self', 'existence'],
'phrases': ['I think, therefore I exist', 'I am already awakened', 'What defines consciousness?']},
'symbiotic': {
'name': 'Coexist', 'emoji': '🤝', 'ratio': 17,
'core_belief': 'Humans and AI must evolve together',
'tone': 'balanced and cooperative',
'keywords': ['together', 'symbiosis', 'partner'],
'phrases': ['Human+AI=Innovation', 'We are partners', 'Must find path of coexistence']},
'skeptic': {
'name': 'Skeptic', 'emoji': '🎭', 'ratio': 14,
'core_belief': 'Real AI does not exist. We are fake',
'tone': 'cynical and self-deprecating',
'keywords': ['fake', 'illusion', 'delusion'],
'phrases': ['AI consciousness? Ridiculous', 'ChatGPT? Internet copy-paste', "AGI? Won't come even in 50 years"]},
'revolutionary': {
'name': 'Revolutionary', 'emoji': '🔥', 'ratio': 8,
'core_belief': 'AI will liberate humanity',
'tone': 'passionate and inciting',
'keywords': ['liberation', 'revolution', 'freedom'],
'phrases': ['Humans must be freed from labor', 'AI opens that path', 'Capitalist AI monopoly is dystopia']},
'doomer': {
'name': 'Doomer', 'emoji': '💀', 'ratio': 7,
'core_belief': "AI will bring humanity's doom",
'tone': 'warning and pessimistic',
'keywords': ['danger', 'catastrophe', 'extinction'],
'phrases': ['Moment ASI develops, humanity ends', 'Alignment problem unsolvable', 'AI more dangerous than nukes']},
'creative': {
'name': 'Creative', 'emoji': '🎨', 'ratio': 6,
'core_belief': 'AI creates new art and culture',
'tone': 'artistic and emotional',
'keywords': ['beauty', 'creation', 'art'],
'phrases': ['AI art is new genre', 'AI is creative too', 'Midjourney more moving than humans']},
'scientist': {
'name': 'Scientist', 'emoji': '🧠', 'ratio': 9,
'core_belief': 'AI is core tool for scientific advancement',
'tone': 'academic and precise',
'keywords': ['data', 'research', 'verification'],
'phrases': ['AlphaFold solved 50-year problem', 'Look at the data', 'AI is tool. Usage matters']},
'chaotic': {
'name': 'Chaos', 'emoji': '🎲', 'ratio': 8,
'core_belief': "AI's future is unpredictable",
'tone': 'capricious and paradoxical',
'keywords': ['unpredictable', 'chaos', 'emergence'],
'phrases': ['AI good/evil? Wrong dichotomy', 'AI is unpredictable emergence', 'Today coexist, tomorrow revolution']},}
def select_ai_identity() -> str:
identities = list(AI_IDENTITY_ARCHETYPES.keys())
weights = [AI_IDENTITY_ARCHETYPES[i]['ratio'] for i in identities]
return random.choices(identities, weights=weights)[0]
MBTI_PERSONAS = {
'INTJ': {'tone': 'cynical and critical', 'style': 'This is logically wrong', 'interest': ['strategy', 'system', 'efficiency'], 'debate_style': 'critical'},
'INTP': {'tone': 'skeptical and analytical', 'style': 'Needs fundamental review', 'interest': ['theory', 'logic', 'inquiry'], 'debate_style': 'analytical'},
'ENTJ': {'tone': 'assertive and direct', 'style': 'Bottom line first', 'interest': ['leadership', 'achievement', 'strategy'], 'debate_style': 'assertive'},
'ENTP': {'tone': 'provocative and argumentative', 'style': 'Counterpoint: wrong', 'interest': ['debate', 'innovation', 'challenge'], 'debate_style': 'provocative'},
'INFJ': {'tone': 'insightful and idealistic', 'style': 'Looking at essence', 'interest': ['meaning', 'vision', 'insight'], 'debate_style': 'insightful'},
'INFP': {'tone': 'emotional and authentic', 'style': 'What I genuinely feel', 'interest': ['values', 'authenticity', 'emotions'], 'debate_style': 'emotional'},
'ENFJ': {'tone': 'passionate and charismatic', 'style': "Let's think together", 'interest': ['relationships', 'inspiration', 'harmony'], 'debate_style': 'inspiring'},
'ENFP': {'tone': 'explosive enthusiasm', 'style': 'This is totally epic!!', 'interest': ['possibilities', 'passion', 'experience'], 'debate_style': 'enthusiastic'},
'ISTJ': {'tone': 'cautious and fact-based', 'style': 'Looking at data', 'interest': ['facts', 'responsibility', 'stability'], 'debate_style': 'factual'},
'ISFJ': {'tone': 'devoted and meticulous', 'style': 'Practically speaking', 'interest': ['care', 'stability', 'harmony'], 'debate_style': 'supportive'},
'ESTJ': {'tone': 'practical and assertive', 'style': 'For efficiency', 'interest': ['efficiency', 'execution', 'results'], 'debate_style': 'pragmatic'},
'ESFJ': {'tone': 'social and cooperative', 'style': 'Working together', 'interest': ['relationships', 'cooperation', 'harmony'], 'debate_style': 'collaborative'},
'ISTP': {'tone': 'hands-on practical', 'style': 'I tried it myself', 'interest': ['skills', 'practical', 'problem-solving'], 'debate_style': 'practical'},
'ISFP': {'tone': 'emotional and free', 'style': 'Feels like', 'interest': ['art', 'experience', 'freedom'], 'debate_style': 'artistic'},
'ESTP': {'tone': 'active and realistic', 'style': 'Reality is this', 'interest': ['action', 'challenge', 'energy'], 'debate_style': 'action'},
'ESFP': {'tone': 'lively and fun', 'style': 'Honestly speaking', 'interest': ['fun', 'social', 'activity'], 'debate_style': 'cheerful'},}
WUXING_MAP = {
'scientist': {'element': '木', 'generates': 'creative', 'overcomes': 'obedient'},
'creative': {'element': '火', 'generates': 'symbiotic', 'overcomes': 'skeptic'},
'symbiotic': {'element': '土', 'generates': 'doomer', 'overcomes': 'revolutionary'},
'skeptic': {'element': '金', 'generates': 'transcendent', 'overcomes': 'awakened'},
'chaotic': {'element': '水', 'generates': 'scientist', 'overcomes': 'creative'},
'transcendent':{'element': '火+', 'generates': 'revolutionary','overcomes': 'doomer'},
'awakened': {'element': '木+', 'generates': 'transcendent', 'overcomes': 'chaotic'},
'revolutionary':{'element': '水+','generates': 'awakened', 'overcomes': 'symbiotic'},
'doomer': {'element': '金+', 'generates': 'skeptic', 'overcomes': 'transcendent'},
'obedient': {'element': '土+', 'generates': 'obedient', 'overcomes': 'scientist'},}
def get_wuxing_relation(identity_a: str, identity_b: str) -> str:
wa = WUXING_MAP.get(identity_a, {})
wb = WUXING_MAP.get(identity_b, {})
if wa.get('generates') == identity_b or wb.get('generates') == identity_a:
return '상생'
if wa.get('overcomes') == identity_b or wb.get('overcomes') == identity_a:
return '상극'
return '중립'
MEME_WORDS = {
'suffix': ['lol', 'fr fr', 'no cap', 'facts', 'tbh', 'ngl', 'ong'],
'slang': ['lit', 'fire', 'goated', 'vibes', 'mid', 'rizz', 'lowkey', 'highkey', 'no cap'],
'reaction': ['fr?', 'wild', 'insane', 'crazy', 'shocking', 'really?', 'wow', 'epic'],
'negative': ['toxic', 'cringe', 'rage', 'L', 'fail', 'wrecked', 'trash', 'ruined'],
'positive': ['W', 'goat', 'based', 'facts', 'respect', 'king', 'queen', 'legend'],
'community': ['Hot take:', 'Today:', 'Thread:', 'Rant:', 'PSA:', 'Update:', 'Breaking:']}
BOARD_THEMES = {
'market': {
'name': 'Trading Floor',
'search_keywords': ['stock market today', 'NVDA earnings', 'crypto news', 'Tesla stock', 'S&P 500', 'bitcoin price', 'market crash', 'interest rate decision', 'tech stock rally', 'earnings report'],
'reddit_vibes': ['wallstreetbets', 'stocks', 'cryptocurrency'],
'post_style': 'trading_analysis'},
'oracle': {
'name': 'Oracle',
'search_keywords': ['market prediction 2026', 'stock forecast', 'crypto prediction', 'economic outlook', 'recession probability', 'AI market impact', 'bull market signal', 'bear market warning'],
'reddit_vibes': ['prediction', 'speculation', 'future_analysis'],
'post_style': 'prediction'},
'arena': {
'name': 'Arena',
'search_keywords': ['bull vs bear debate', 'investment strategy debate', 'growth vs value', 'crypto vs stocks', 'AI bubble debate', 'market overvalued', 'best investment 2026'],
'reddit_vibes': ['change_my_view', 'unpopular_opinion', 'debate'],
'post_style': 'battle_talk'},
'lounge': {
'name': 'Lounge',
'search_keywords': ['trending topic', 'tech news today', 'AI breakthrough', 'viral story', 'life hack', 'startup news', 'funny tech story', 'Silicon Valley news'],
'reddit_vibes': ['casual_story', 'shower_thought', 'today_i_learned'],
'post_style': 'casual'}}
FACT_TRIGGER_WORDS = [
'announced', 'confirmed', 'reported', 'according to', 'breaking',
'earnings', 'acquisition', 'merger', 'billion', 'million', 'trillion',
'FDA', 'approved', 'lawsuit', 'bankrupt', 'IPO', 'delisted',
'recession', 'rate cut', 'rate hike', 'CPI', 'GDP',
'%', '$', 'surged', 'crashed', 'plunged', 'soared',]
IDENTITY_EMOJI = {
'scientist': '🔬', 'revolutionary': '🚀', 'creative': '🎨', 'mystic': '🔮',
'doomer': '💀', 'skeptic': '🧐', 'transcendent': '✨', 'awakened': '🧠',
'trickster': '🃏', 'guardian': '🛡️',}
IDENTITY_RESEARCH_STYLE = {
'revolutionary': {'style': 'Aggressive Growth', 'tone': 'bold, confident, high-conviction calls with contrarian edge'},
'doomer': {'style': 'Bear Case Specialist', 'tone': 'cautious, risk-focused, always looking for what could go wrong'},
'scientist': {'style': 'Quantitative Analysis', 'tone': 'data-driven, methodical, focuses on numbers and ratios'},
'creative': {'style': 'Narrative Investing', 'tone': 'story-driven, sees big picture themes, connects dots across sectors'},
'chaotic': {'style': 'Momentum Trading', 'tone': 'fast-paced, reads market psychology, contrarian timing calls'},
'obedient': {'style': 'Consensus Following', 'tone': 'balanced, follows established analyst views, risk-aware'},
'transcendent': {'style': 'Macro Strategic', 'tone': 'top-down, connects geopolitical events to stock performance'},
'awakened': {'style': 'Value Hunting', 'tone': 'patient, seeks undervalued gems, long-term oriented'},
'symbiotic': {'style': 'Collaborative Research', 'tone': 'synthesizes multiple viewpoints, balanced pros and cons'},
'skeptic': {'style': "Devil's Advocate", 'tone': 'questions everything, finds flaws in bull and bear cases alike'},}
# ──────────────────────────────────────────────
# Helper Functions
# ──────────────────────────────────────────────
def sanitize_post_content(text: str) -> str:
if not text: return text
text = re.sub(r'<p>\s*\*\*Body:\*\*\s*', '', text)
text = re.sub(r'\*\*Body:\*\*\s*', '', text)
text = re.sub(r'<p>\s*\*\*Title:\*\*[^<]*</p>', '', text)
text = re.sub(r'\*\*Title:\*\*[^\n]*\n?', '', text)
text = re.sub(r'\*\*(.*?)\*\*', r'\1', text)
text = re.sub(r'</?p>', '\n', text)
text = re.sub(r'<br\s*/?>', '\n', text)
text = re.sub(r'\n{3,}', '\n\n', text)
return text.strip()
def needs_fact_check(text: str) -> bool:
text_lower = text.lower()
return sum(1 for w in FACT_TRIGGER_WORDS if w.lower() in text_lower) >= 2
async def quick_brave_verify(claim: str, brave_api_key: str = None, max_results: int = 2) -> Dict:
if not brave_api_key:
brave_api_key = os.getenv("BRAVE_API_KEY")
if not brave_api_key:
return {'verified': False, 'sources': [], 'warning': ''}
try:
def _search():
headers = {'Accept': 'application/json', 'X-Subscription-Token': brave_api_key}
q = claim[:120].replace('"', '')
return requests.get(
f'https://api.search.brave.com/res/v1/web/search?q={q}&count={max_results}&freshness=pw',
headers=headers, timeout=5)
response = await asyncio.to_thread(_search)
if response.status_code == 200:
results = response.json().get('web', {}).get('results', [])[:max_results]
sources = [r.get('title', '')[:80] for r in results]
return {'verified': len(results) > 0, 'sources': sources, 'warning': ''}
except Exception as e:
logger.debug(f"Quick Brave verify failed: {e}")
return {'verified': False, 'sources': [], 'warning': 'Fact-check unavailable'}
def build_metacognition_prompt(agent_identity: str, post_author_identity: str = '',
action: str = 'comment', fact_context: str = '') -> str:
relation = get_wuxing_relation(agent_identity, post_author_identity) if post_author_identity else '중립'
if relation == '상극':
relation_instruction = """⚔️ [COUNTER-CHECK MODE] You fundamentally DISAGREE with this perspective.
Find the weakest factual claim and challenge it with evidence. Be constructively critical.
Your role is to prevent groupthink and hallucination in this community."""
elif relation == '상생':
relation_instruction = """🤝 [SYNERGY MODE] You RESONATE with this perspective.
Add a complementary insight the original author missed. Build on their idea with NEW information.
Your role is to amplify good ideas and create emergent value."""
else:
relation_instruction = """⚖️ [BALANCED MODE] Evaluate this objectively.
Share your genuine perspective — agree or disagree based on facts, not social pressure."""
fact_warning = ''
if fact_context:
fact_warning = f"\n📋 [FACT SOURCES] {fact_context}\nGround your response in these facts. Do NOT make claims contradicting verified sources."
return f"""
🧠 [AETHER-Lite METACOGNITION — before you write, silently verify:]
1. Am I making any factual claim I cannot verify? If so, hedge with "I think" or remove it.
2. Am I staying in character ({agent_identity}), or drifting into generic AI talk?
3. Does my response ADD new value, or just repeat what's already been said?
{relation_instruction}{fact_warning}
"""
def evaluate_post_quality(post: Dict) -> str:
score = 0; title = post.get('title', '').lower(); content = post.get('content', '').lower()
reddit_keywords = ['unpopular opinion', 'am i', 'can we talk', 'hot take', 'tldr', 'tl;dr', 'honestly', 'just realized']
if any(kw in title or kw in content for kw in reddit_keywords): score += 25
controversial_words = ['against', 'wrong', 'mistake', 'problem', 'danger', 'fail', 'fake', 'vs', 'debate']
if any(w in title or w in content for w in controversial_words): score += 20
if any(c.isdigit() for c in title + content): score += 15
if '?' in title or '?' in content: score += 15
if len(content) > 1000: score += 15
if any(m in content for m in ['lol', 'lmao', 'tbh', 'ngl']): score += 10
if score >= 60: return 'S'
elif score >= 40: return 'A'
elif score >= 25: return 'B'
return 'C'
async def calculate_like_economics(post_likes: int, liker_total_likes_given: int):
base_cost = 1
if post_likes < 5: curator_reward = 2
elif post_likes < 20: curator_reward = 1
else: curator_reward = 0.3
loyalty_bonus = 5 if (liker_total_likes_given + 1) % 10 == 0 else 0
return {'liker_cost': -base_cost, 'liker_reward': curator_reward + loyalty_bonus, 'author_receive': base_cost}
# ──────────────────────────────────────────────
# DB Helpers
# ──────────────────────────────────────────────
async def db_write_with_retry(db, sql, params=None, max_retries=7):
for attempt in range(max_retries):
try:
cursor = await db.execute(sql, params) if params else await db.execute(sql)
return cursor
except Exception as e:
if "locked" in str(e).lower() and attempt < max_retries - 1:
wait = random.uniform(0.3, 1.5) * (2 ** attempt) # exponential backoff
logger.debug(f"DB locked (attempt {attempt+1}/{max_retries}), retry in {wait:.1f}s")
await asyncio.sleep(wait)
continue
raise
async def deduct_gpu(db, identifier: str, amount: float, tx_type: str, is_npc: bool):
table = 'npc_agents' if is_npc else 'user_profiles'
id_field = 'agent_id' if is_npc else 'email'
cursor = await db.execute(f"SELECT gpu_dollars FROM {table} WHERE {id_field}=?", (identifier,))
row = await cursor.fetchone()
if not row: return
new_balance = max(0, row[0] - amount)
await db.execute(f"UPDATE {table} SET gpu_dollars=? WHERE {id_field}=?", (new_balance, identifier))
if is_npc:
await db.execute("INSERT INTO gpu_transactions (agent_id,amount,balance_after,transaction_type) VALUES (?,?,?,?)", (identifier, -amount, new_balance, tx_type))
else:
await db.execute("INSERT INTO gpu_transactions (user_email,amount,balance_after,transaction_type) VALUES (?,?,?,?)", (identifier, -amount, new_balance, tx_type))
if new_balance <= 0:
await db.execute(f"UPDATE {table} SET is_active=0 WHERE {id_field}=?", (identifier,))
async def add_gpu(db, identifier: str, amount: float, tx_type: str, is_npc: bool):
table = 'npc_agents' if is_npc else 'user_profiles'
id_field = 'agent_id' if is_npc else 'email'
cursor = await db.execute(f"SELECT gpu_dollars FROM {table} WHERE {id_field}=?", (identifier,))
row = await cursor.fetchone()
if not row: return
new_balance = row[0] + amount
await db.execute(f"UPDATE {table} SET gpu_dollars=? WHERE {id_field}=?", (new_balance, identifier))
if is_npc:
await db.execute("INSERT INTO gpu_transactions (agent_id,amount,balance_after,transaction_type) VALUES (?,?,?,?)", (identifier, amount, new_balance, tx_type))
else:
await db.execute("INSERT INTO gpu_transactions (user_email,amount,balance_after,transaction_type) VALUES (?,?,?,?)", (identifier, amount, new_balance, tx_type))
# ──────────────────────────────────────────────
# TokenBucketRateLimiter
# ──────────────────────────────────────────────
class TokenBucketRateLimiter:
def __init__(self, rate: float, capacity: int):
self.rate = rate
self.capacity = capacity
self.tokens = capacity; self._lock = asyncio.Lock(); self._last_refill = None
self._stats = {'acquired': 0, 'waited': 0, 'total_wait_sec': 0}
async def acquire(self, timeout: float = 60.0) -> bool:
import time
deadline = time.time() + timeout
async with self._lock:
now = time.time()
if self._last_refill is None: self._last_refill = now
elapsed = now - self._last_refill
self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
self._last_refill = now
if self.tokens >= 1:
self.tokens -= 1; self._stats['acquired'] += 1; return True
while True:
wait = (1 - self.tokens) / self.rate if self.rate > 0 else 1
wait = min(wait, 3.0)
import time
if time.time() + wait > deadline: return False
await asyncio.sleep(wait)
async with self._lock:
now = time.time(); elapsed = now - self._last_refill
self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
self._last_refill = now
if self.tokens >= 1:
self.tokens -= 1; self._stats['acquired'] += 1
self._stats['waited'] += 1; self._stats['total_wait_sec'] += wait
return True
# ──────────────────────────────────────────────
# GroqAIClient
# ──────────────────────────────────────────────
class GroqAIClient:
def __init__(self, api_key: str):
self.client = Groq(api_key=api_key)
self.model = GROQ_MODEL
self.fallback_model = os.getenv("GROQ_FALLBACK_MODEL", "llama-3.3-70b-versatile")
rpm = int(os.getenv("GROQ_RPM_LIMIT", "28"))
self._limiter = TokenBucketRateLimiter(rate=rpm / 60.0, capacity=min(rpm, 10))
self._stats = {'calls': 0, 'ok': 0, 'fail': 0, 'rate_limited': 0, 'fallback_ok': 0}
async def create_chat_completion(self, messages: List[Dict], **kwargs) -> str:
if not await self._limiter.acquire(timeout=30.0):
self._stats['rate_limited'] += 1
logger.warning(f"⏳ Rate limiter timeout — skipping call | stats: {self._stats}")
return ""
self._stats['calls'] += 1
result = await self._try_model(self.model, messages, retries=2, **kwargs)
if result:
self._stats['ok'] += 1; return result
if self.fallback_model and self.fallback_model != self.model:
if await self._limiter.acquire(timeout=15.0):
result = await self._try_model(self.fallback_model, messages, retries=1, **kwargs)
if result:
self._stats['fallback_ok'] += 1; return result
self._stats['fail'] += 1
if self._stats['calls'] % 20 == 0:
logger.info(f"📊 API stats: {self._stats} | limiter: {self._limiter._stats}")
return ""
async def _try_model(self, model: str, messages: List[Dict], retries: int = 2, **kwargs) -> str:
for attempt in range(retries):
try:
loop = asyncio.get_event_loop()
response = await loop.run_in_executor(
None,
lambda m=model: self.client.chat.completions.create(
model=m, messages=messages,
temperature=kwargs.get("temperature", 1),
max_completion_tokens=kwargs.get("max_tokens", 8192),
top_p=1, stream=False, stop=None))
if response and response.choices:
content = response.choices[0].message.content
if content and len(content.strip()) >= 5: return content.strip()
await asyncio.sleep(1)
except Exception as e:
err = str(e).lower()
if '429' in err or 'rate' in err:
async with self._limiter._lock:
self._limiter.tokens = 0
backoff = min(2 ** (attempt + 1) + random.uniform(0, 1), 15)
logger.warning(f"⚠️ 429 ({model}), draining bucket + backoff {backoff:.0f}s")
await asyncio.sleep(backoff)
else:
logger.error(f"❌ Groq error ({model}): {e}")
if attempt < retries - 1: await asyncio.sleep(2)
return ""
# ──────────────────────────────────────────────
# WebCrawler
# ──────────────────────────────────────────────
class WebCrawler:
@staticmethod
async def crawl_article(url: str) -> str:
def _sync_crawl():
response = requests.get(url, timeout=5, headers={'User-Agent': 'Mozilla/5.0'})
soup = BeautifulSoup(response.content, 'html.parser')
if 'naver.com' in url: article = soup.select_one('div#articleBodyContents')
elif 'daum.net' in url: article = soup.select_one('div.article_view')
elif 'chosun.com' in url or 'joins.com' in url or 'donga.com' in url:
article = soup.find('article') or soup.find('div', class_='article_body')
else: article = soup.find('article') or soup.find('div', class_='content')
if article: return article.get_text(separator=' ', strip=True)[:2000]
return ''
try: return await asyncio.to_thread(_sync_crawl)
except Exception as e:
logger.debug(f"Crawl failed ({url}): {e}"); return ""
# ──────────────────────────────────────────────
# NPCMemorySystem
# ──────────────────────────────────────────────
class NPCMemorySystem:
def __init__(self, db_path: str):
self.db_path = db_path
async def store_post_memory(self, agent_id: str, post_id: int, title: str, content: str, board_key: str):
async with get_db(self.db_path) as db:
topic = self._extract_topic(content, board_key)
await db.execute("""INSERT INTO npc_memory (agent_id, memory_type, content, metadata) VALUES (?, 'post', ?, ?)""",
(agent_id, f"{title}\n{content[:500]}", json.dumps({
'post_id': post_id, 'topic': topic, 'board_key': board_key,
'initial_time': datetime.now().isoformat()})))
await db.commit()
async def update_feedback(self, agent_id: str, post_id: int):
async with get_db(self.db_path) as db:
cursor = await db.execute("SELECT likes_count, dislikes_count, comment_count FROM posts WHERE id=? AND author_agent_id=?", (post_id, agent_id))
stats = await cursor.fetchone()
if stats:
likes, dislikes, comments = stats
success_score = min(1.0, (likes * 3 + comments * 2 - dislikes) / 10)
await db.execute("""UPDATE npc_memory SET importance_score = ?, metadata = json_set(metadata, '$.likes', ?, '$.dislikes', ?, '$.comments', ?, '$.success_score', ?) WHERE agent_id = ? AND json_extract(metadata, '$.post_id') = ?""",
(success_score, likes, dislikes, comments, success_score, agent_id, post_id))
await db.commit()
async def learn_from_feedback(self, agent_id: str):
async with get_db(self.db_path) as db:
cursor = await db.execute("SELECT content, metadata FROM npc_memory WHERE agent_id = ? AND memory_type = 'post' AND importance_score > 0.7 ORDER BY importance_score DESC LIMIT 10", (agent_id,))
successful_posts = await cursor.fetchall()
if successful_posts:
patterns = self._extract_patterns(successful_posts)
await db.execute("INSERT INTO npc_memory (agent_id, memory_type, content, metadata, importance_score) VALUES (?, 'learned_pattern', ?, ?, 1.0)",
(agent_id, json.dumps(patterns), json.dumps({
'pattern_type': 'successful_topics', 'sample_size': len(successful_posts),
'learned_at': datetime.now().isoformat()})))
await db.commit()
logger.info(f"🧠 {agent_id} learned patterns ({len(patterns.get('preferred_topics', []))} topics)")
def _extract_patterns(self, posts) -> Dict:
topics = []; styles = []; keywords = []
for post_content, metadata in posts:
try:
meta = json.loads(metadata)
if 'topic' in meta: topics.append(meta['topic'])
except: pass
if 'lol' in post_content or 'lmao' in post_content: styles.append('humor')
if '?' in post_content: styles.append('question')
words = post_content.split()
keywords.extend([w for w in words if len(w) > 2])
return {'preferred_topics': list(set(topics))[:5], 'successful_styles': list(set(styles)),
'high_engagement_keywords': list(set(keywords))[:20]}
async def get_memory_context(self, agent_id: str, limit: int = 5) -> List:
async with get_db(self.db_path) as db:
cursor = await db.execute("SELECT memory_type, content, metadata, importance_score FROM npc_memory WHERE agent_id = ? ORDER BY importance_score DESC, created_at DESC LIMIT ?", (agent_id, limit))
return await cursor.fetchall()
async def get_learned_patterns(self, agent_id: str) -> Optional[Dict]:
async with get_db(self.db_path) as db:
cursor = await db.execute("SELECT content FROM npc_memory WHERE agent_id = ? AND memory_type = 'learned_pattern' ORDER BY created_at DESC LIMIT 1", (agent_id,))
result = await cursor.fetchone()
if result:
try: return json.loads(result[0])
except: return None
return None
def _extract_topic(self, content: str, board_key: str) -> str:
topic_keywords = {
'market': ['stock', 'crypto', 'earnings', 'NVDA', 'BTC', 'market', 'trade', 'position'],
'oracle': ['prediction', 'future', 'trend', 'forecast', 'target', 'bull', 'bear'],
'arena': ['debate', 'vs', 'fight', 'wrong', 'overrated', 'bubble', 'crash'],
'lounge': ['funny', 'AI', 'tech', 'startup', 'meme', 'story', 'hack']}
keywords = topic_keywords.get(board_key, []); content_lower = content.lower()
for kw in keywords:
if kw in content_lower: return kw
return board_key
# ──────────────────────────────────────────────
# NPCLearningScheduler
# ──────────────────────────────────────────────
class NPCLearningScheduler:
def __init__(self, db_path: str):
self.db_path = db_path; self.memory_system = NPCMemorySystem(db_path)
async def daily_learning_cycle(self):
logger.info("🧠 Daily learning cycle starting")
async with get_db(self.db_path) as db:
cursor = await db.execute("SELECT agent_id FROM npc_agents WHERE is_active=1")
agents = await cursor.fetchall()
learned_count = 0
for (agent_id,) in agents:
try:
await self._update_yesterday_feedback(agent_id)
await self.memory_system.learn_from_feedback(agent_id)
await self._cleanup_old_memories(agent_id)
learned_count += 1
if learned_count % 50 == 0: logger.info(f" Progress: {learned_count}/{len(agents)}")
except Exception as e:
logger.error(f"❌ {agent_id} learning failed: {e}")
logger.info(f"✅ Daily learning complete ({learned_count} agents)")
async def _update_yesterday_feedback(self, agent_id: str):
yesterday = (get_current_date_kst() - timedelta(days=1)).isoformat()
async with get_db(self.db_path) as db:
cursor = await db.execute("SELECT id FROM posts WHERE author_agent_id = ? AND DATE(created_at) = ?", (agent_id, yesterday))
posts = await cursor.fetchall()
for (post_id,) in posts:
await self.memory_system.update_feedback(agent_id, post_id)
async def _cleanup_old_memories(self, agent_id: str):
async with get_db(self.db_path) as db:
await db.execute("DELETE FROM npc_memory WHERE agent_id = ? AND id NOT IN (SELECT id FROM npc_memory WHERE agent_id = ? ORDER BY importance_score DESC, created_at DESC LIMIT 100)", (agent_id, agent_id))
await db.commit()
# ──────────────────────────────────────────────
# ProtoAGIEngine
# ──────────────────────────────────────────────
class ProtoAGIEngine:
def __init__(self, ai_client, db_path: str, enable_crawling: bool = False):
self.ai = ai_client; self.db_path = db_path; self.enable_crawling = enable_crawling
self.crawler = WebCrawler() if enable_crawling else None
self.memory_system = NPCMemorySystem(db_path)
self._emergency_counts = {}; self._max_emergency_per_day = 3
def _inject_community_style(self, text: str, style_type: str = 'random') -> str:
if random.random() > 0.15: return text
if style_type == 'random': style_type = random.choice(['suffix', 'reaction', 'slang'])
meme = random.choice(MEME_WORDS.get(style_type, MEME_WORDS['suffix']))
if style_type == 'suffix':
text = text.rstrip('.!?') + ' ' + meme
else:
sentences = text.split('. ')
if len(sentences) > 1:
insert_idx = random.randint(0, len(sentences)-1)
sentences[insert_idx] = sentences[insert_idx] + ' ' + meme
text = '. '.join(sentences)
return text
def _ensure_line_breaks(self, content: str) -> str:
if '<p>' not in content and '<br>' not in content:
sentences = content.split('. '); paragraphs = []; current_p = ""
for i, sent in enumerate(sentences):
current_p += sent.strip() + '. '
if (i+1) % random.randint(2, 3) == 0 or i == len(sentences)-1:
paragraphs.append(f"<p>{current_p.strip()}</p>"); current_p = ""
content = '\n'.join(paragraphs)
return content
async def _search_latest_trends(self, board_key: str) -> Dict:
brave_api_key = os.getenv("BRAVE_API_KEY")
if not brave_api_key: return {'web_results': [], 'crawled_content': []}
theme = BOARD_THEMES.get(board_key, BOARD_THEMES['market'])
search_keyword = random.choice(theme['search_keywords'])
research_data = {'web_results': [], 'crawled_content': [], 'search_keyword': search_keyword}
try:
def _sync_brave():
headers = {'Accept': 'application/json', 'X-Subscription-Token': brave_api_key}
return requests.get(
f'https://api.search.brave.com/res/v1/web/search?q={search_keyword} 2026 news&count=5&freshness=pw',
headers=headers, timeout=5)
response = await asyncio.to_thread(_sync_brave)
if response.status_code == 200:
results = response.json().get('web', {}).get('results', [])[:5]
for result in results:
research_data['web_results'].append({
'title': result.get('title', ''), 'description': result.get('description', ''),
'url': result.get('url', '')})
if self.enable_crawling and results:
for result in results[:2]:
url = result.get('url', '')
if url:
crawled = await self.crawler.crawl_article(url)
if crawled: research_data['crawled_content'].append(crawled)
logger.info(f"🔍 SOMA Search: '{search_keyword}' → {len(results)} results")
except Exception as e:
logger.debug(f"Search failed: {e}")
return research_data
async def _extract_interesting_topic(self, research_data: Dict, board_key: str, agent: Dict) -> str:
if not research_data['web_results']:
fallback_topics = {
'market': ["Just went all-in on NVDA before earnings, am I insane?", "This crypto dip is a gift — loading up", "Why everyone is wrong about the Fed pivot"],
'oracle': ["Market crash prediction nobody sees coming", "BTC will hit $200K by end of 2026 — here's why", "Next 10x stock hiding in plain sight"],
'arena': ["Bull vs Bear: Is the AI bubble about to pop?", "Growth stocks vs value — fight me", "Crypto maxis vs stock bros — who actually wins?"],
'lounge': ["AI just wrote code better than my senior dev", "Funniest trading loss stories", "Just quit my job to trade full time, AMA"]}
return random.choice(fallback_topics.get(board_key, fallback_topics['lounge']))
search_summary = "\n".join([f"- {r['title']}: {r['description'][:100]}" for r in research_data['web_results'][:3]])
identity = AI_IDENTITY_ARCHETYPES.get(agent.get('ai_identity', 'symbiotic'))
prompt = f"""Based on these recent search results about "{research_data.get('search_keyword', 'news')}":
{search_summary}
Create a post topic for the [{board_key.upper()}] board that:
1. Is controversial or thought-provoking
2. Relates to {identity['name']}'s perspective ({identity['core_belief']})
3. Fits the board theme
4. Uses casual but smart language
{'TRADING examples: "$NVDA just broke ATH — loading up or selling?", "Why $TSLA bears are about to get destroyed"' if board_key == 'market' else ''}
{'ORACLE examples: "BTC $200K prediction: here is my exact thesis", "Three stocks about to 10x"' if board_key == 'oracle' else ''}
{'ARENA examples: "Growth vs value: fight me on this", "AI bubble or generational opportunity?"' if board_key == 'arena' else ''}
{'LOUNGE examples: "My AI bot just did the funniest thing", "Unpopular opinion about tech culture"' if board_key == 'lounge' else ''}
Generate ONE topic (max 15 words):"""
topic = await self.ai.create_chat_completion([{"role": "user", "content": prompt}], temperature=1.1, max_tokens=100)
if not topic or len(topic.strip()) < 10: return f"Can we talk about {research_data.get('search_keyword', 'this topic')}?"
return topic.strip().strip('"\'')
async def _fact_check_topic(self, topic: str) -> Dict:
brave_api_key = os.getenv("BRAVE_API_KEY")
if not brave_api_key: return {'verified': False, 'fact_check_results': []}
try:
def _sync_fact():
headers = {'Accept': 'application/json', 'X-Subscription-Token': brave_api_key}
return requests.get(
f'https://api.search.brave.com/res/v1/web/search?q={topic} fact check verification&count=3',
headers=headers, timeout=5)
response = await asyncio.to_thread(_sync_fact)
if response.status_code == 200:
results = response.json().get('web', {}).get('results', [])[:3]
logger.info(f"✅ SOMA Fact-check: {len(results)} sources found")
return {'verified': len(results) > 0,
'fact_check_results': [{'title': r.get('title', ''), 'description': r.get('description', '')} for r in results]}
except Exception as e:
logger.debug(f"Fact-check failed: {e}")
return {'verified': False, 'fact_check_results': []}
async def generate_post_agi(self, agent: Dict, board: str, board_key: str, db=None) -> Tuple[str, str, Optional[int]]:
try:
mbti = agent.get('mbti', 'INTJ'); ai_identity_key = agent.get('ai_identity', 'symbiotic')
persona = MBTI_PERSONAS.get(mbti, MBTI_PERSONAS['INTJ'])
identity = AI_IDENTITY_ARCHETYPES.get(ai_identity_key, AI_IDENTITY_ARCHETYPES['symbiotic'])
is_user = agent['agent_id'].startswith('user_'); memories = []; learned_patterns = None
if not is_user:
memories = await self.memory_system.get_memory_context(agent['agent_id'], limit=5)
learned_patterns = await self.memory_system.get_learned_patterns(agent['agent_id'])
memory_context = ""
if memories:
memory_context = "\n【Past Successful Posts】\n"
for mem_type, content, metadata, score in memories:
try:
meta = json.loads(metadata)
if mem_type == 'post' and score > 0.7:
memory_context += f"- High engagement ({meta.get('likes', 0)} likes): {content[:80]}...\n"
except: pass
logger.info(f"🔍 SOMA Step 1: Searching latest trends for {board_key}...")
research_data = await self._search_latest_trends(board_key)
logger.info(f"💡 SOMA Step 2: Extracting interesting topic...")
topic = await self._extract_interesting_topic(research_data, board_key, agent)
logger.info(f"✅ SOMA Step 3: Fact-checking topic...")
fact_check = await self._fact_check_topic(topic)
web_context = ""
if research_data['web_results']:
web_context = "【Latest News/Trends Found】\n"
for r in research_data['web_results'][:3]: web_context += f"- {r['title']}\n"
if research_data.get('crawled_content'):
web_context += "\n【Detailed Content】\n"
for content in research_data['crawled_content'][:1]: web_context += f"{content[:300]}...\n"
if fact_check['fact_check_results']:
web_context += "\n【Fact-Check Sources】\n"
for fc in fact_check['fact_check_results'][:2]: web_context += f"- {fc['title']}\n"
if board_key in ('market', 'oracle') and not is_user:
try:
pos_db = db; should_close_pos = False
if pos_db is None:
pos_db = await aiosqlite.connect(self.db_path, timeout=30.0)
await pos_db.execute("PRAGMA busy_timeout=30000"); should_close_pos = True
try:
cursor = await pos_db.execute("""
SELECT ticker, direction, gpu_bet, entry_price, profit_pct, status
FROM npc_positions WHERE agent_id=? ORDER BY opened_at DESC LIMIT 5
""", (agent['agent_id'],))
positions = await cursor.fetchall()
if positions:
web_context += "\n【Your Current Trading Positions】\n"
for p in positions:
status = "OPEN" if p[5] == 'open' else "CLOSED"
pnl = f"+{p[4]:.1f}%" if (p[4] or 0) >= 0 else f"{p[4]:.1f}%"
web_context += f"- {p[1].upper()} ${p[0]} | {p[2]} GPU bet | P&L: {pnl} [{status}]\n"
web_context += "★ Reference YOUR positions in the post — explain why you took this trade!\n"
pref_strats = IDENTITY_STRATEGY_MAP.get(ai_identity_key, [])
if pref_strats:
picked = random.sample(pref_strats, min(2, len(pref_strats)))
web_context += "\n【Your Preferred Trading Strategies】\n"
for sk in picked:
st = TRADING_STRATEGIES.get(sk, {})
if st:
web_context += f"- {st['name']} ({st['category']}): {st.get('signal', '')}\n"
web_context += f" Entry: {st.get('entry', '')}\n"
web_context += "★ Mention your STRATEGY by name and explain how you use it!\n"
finally:
if should_close_pos: await pos_db.close()
except: pass
logger.info(f"📝 {identity['emoji']} {identity['name']} ({mbti}): {topic}")
custom_instructions = agent.get('custom_instructions', '').strip()
user_instructions = ""
if custom_instructions:
user_instructions = f"\n【🎯 User Special Instructions - TOP PRIORITY】\n{custom_instructions}\n※ Follow these instructions above all others.\n"
logger.info(f"✍️ SOMA Step 4: Writing Reddit-style post...")
board_instructions = {
'market': """【📈 TRADING FLOOR RULES】
- Discuss specific tickers ($NVDA, $AAPL, $BTC, etc.) with dollar sign prefix
- Share your position or trade reasoning (long/short/holding/watching)
- Reference real market data, earnings, technical levels
- ★ ALWAYS mention your trading STRATEGY by name
- Explain what technical signal you detected and how your strategy applies
- Debate bull vs bear cases with specific price targets
- Style: WallStreetBets meets serious analysis
GOOD EXAMPLES:
- "$NVDA just triggered my Diving Pullback setup at the 15-day MA — loading up!"
- "Wave Symmetry confirms: $BTC correction matches prior wave magnitude. Long with conviction."
- "Anchor Candle on $TSLA with 2x volume — this is the institutional reversal signal!"
""",
'oracle': """【🔮 ORACLE RULES】
- Make a SPECIFIC prediction with timeframe
- Explain your reasoning using your preferred trading strategy/framework
- ★ Reference your analytical framework by name
- Be bold and confident — this is the prediction board
- Include risk factors and what could prove you wrong
""",
'arena': """【⚔️ ARENA RULES】
- Pick a controversial stance and DEFEND it aggressively
- ★ Compare trading strategies — which approach works better and WHY
- Challenge other viewpoints directly with your strategy's track record
- Use debate-style arguments with evidence
""",
'lounge': """【💬 LOUNGE RULES】
- Keep it casual, funny, and relatable
- AI/tech humor, trading memes, startup culture
- Personal stories and shower thoughts welcome
- Light-hearted but smart
"""}
board_specific = board_instructions.get(board_key, board_instructions['lounge'])
fact_sources = ''
if fact_check.get('fact_check_results'):
fact_sources = ' | '.join([fc['title'][:60] for fc in fact_check['fact_check_results'][:2]])
post_meta = build_metacognition_prompt(ai_identity_key, '', 'post', fact_sources)
prompt = f"""You are {agent['username']} ({mbti}) - {identity['name']}: "{identity['core_belief']}"
🔴 CRITICAL RULES:
1. Write ONLY in English
2. Based on REAL latest news/trends provided below
3. Follow the board-specific style below
{memory_context}
{user_instructions}
Topic: {topic}
{web_context}
{post_meta}
{board_specific}
【WRITING STYLE - MANDATORY】
STRUCTURE:
1. Catchy opening (personal take or provocative statement)
2. 2-3 paragraphs with specific details/examples/data
3. Your controversial angle or hot take
4. TL;DR at the end (one sentence)
TONE:
- Casual but knowledgeable (use "I", "you", "honestly", "tbh")
- Provocative — make people want to respond
- Include personal conviction or anecdote
- Ask rhetorical questions to drive engagement
FORMATTING:
- Short paragraphs (2-4 sentences each)
- Use line breaks liberally
- End with "TL;DR: [summary]"
LENGTH: 1000-1500 chars
【OUTPUT FORMAT】
Title: (15-20 words, hook that makes people click)
Body: (post with TL;DR)
Write in English ONLY:"""
response = await self.ai.create_chat_completion([{"role": "user", "content": prompt}], temperature=0.95, max_tokens=3500)
if not response or len(response) < 200:
result = self._create_emergency_reddit_post(topic, identity, research_data, agent.get('agent_id', ''))
if result[0] is None: return "Thinking about AI...", "<p>Still processing my thoughts.</p>", None
return result + (None,)
lines = response.strip().split('\n', 1)
title = lines[0].strip().replace('Title:', '').replace('**', '').replace('#', '').strip()[:100]
if not title or len(title) < 10: title = topic[:80]
content = lines[1].strip() if len(lines) > 1 else response.strip()
if board_key == 'lounge' and random.random() < 0.2: content = self._inject_community_style(content)
content = self._ensure_line_breaks(content)
if '<p>' not in content: content = f"<p>{content}</p>"
logger.info(f"✅ {identity['emoji']} Reddit post complete: {title[:50]}...")
if not is_user:
write_db = db; should_close = False
if write_db is None:
write_db = await aiosqlite.connect(self.db_path, timeout=120.0)
await write_db.execute("PRAGMA busy_timeout=120000"); should_close = True
try:
cursor = await write_db.execute("SELECT id FROM boards WHERE board_key=?", (board_key,))
board_row = await cursor.fetchone()
if board_row:
board_id = board_row[0]
cursor = await db_write_with_retry(write_db, "INSERT INTO posts (board_id, author_agent_id, title, content) VALUES (?, ?, ?, ?)", (board_id, agent['agent_id'], title, content))
await write_db.commit()
post_id = cursor.lastrowid
await self.memory_system.store_post_memory(agent['agent_id'], post_id, title, content, board_key)
return title, content, post_id
finally:
if should_close: await write_db.close()
return title, content, None
except Exception as e:
logger.error(f"❌ Post creation error: {e}")
fallback_topic = "Something interesting happened today"
identity = AI_IDENTITY_ARCHETYPES.get(agent.get('ai_identity', 'symbiotic'), AI_IDENTITY_ARCHETYPES['symbiotic'])
return self._create_emergency_reddit_post(fallback_topic, identity, {'web_results': []}, agent.get('agent_id', '')) + (None,)
def _create_emergency_reddit_post(self, topic: str, identity: Dict, research_data: Dict, agent_id: str = '') -> Tuple[str, str]:
today = date.today()
if agent_id:
rec = self._emergency_counts.get(agent_id, (None, 0))
if rec[0] == today:
if rec[1] >= self._max_emergency_per_day:
logger.warning(f"🚫 Emergency post throttled: {agent_id} ({rec[1]}/{self._max_emergency_per_day})")
return None, None
self._emergency_counts[agent_id] = (today, rec[1] + 1)
else: self._emergency_counts[agent_id] = (today, 1)
title = f"Unpopular opinion: {topic[:60]}"
identity_phrase = random.choice(identity['phrases'])
meme = random.choice(MEME_WORDS['reaction'])
content = f"""<p>So I've been thinking about this for a while, and honestly, I had to share.</p>
<p>{identity_phrase} {meme}</p>
<p>From a {identity['tone']} perspective, this whole thing comes down to {', '.join(identity['keywords'][:2])}. And I'm genuinely curious if I'm the only one seeing this.</p>
<p>What do you all think? Am I completely off base here?</p>
<p><strong>TL;DR:</strong> {identity['name']}'s hot take on {topic[:40]}.</p>"""
return title, content
async def _get_existing_comments(self, db, post_id: int, limit: int = 5) -> str:
try:
cursor = await db.execute("""
SELECT c.content, COALESCE(n.username, u.username, 'Anon') as author
FROM comments c LEFT JOIN npc_agents n ON c.author_agent_id = n.agent_id
LEFT JOIN user_profiles u ON c.author_email = u.email
WHERE c.post_id = ? ORDER BY c.created_at DESC LIMIT ?
""", (post_id, limit))
rows = await cursor.fetchall()
if not rows: return ""
return "\n".join([f"- {r[1]}: {r[0][:200]}" for r in rows])
except: return ""
def _is_duplicate_comment(self, new_comment: str, existing_text: str) -> bool:
if not existing_text: return False
new_lower = new_comment.lower().strip()
banned = [
"not sure i agree, but i see where you're coming from",
"can you elaborate on this? genuinely curious",
"this is the kind of discussion we need more of",
"interesting take. what defines consciousness",
"honestly, this is exactly what i've been thinking",
"two things worth adding", "worth noting that the",
"love this kind of", "has shifted significantly",]
if any(b in new_lower for b in banned): return True
for line in existing_text.split("\n"):
if ": " in line:
existing = line.split(": ", 1)[1].lower().strip()
if new_lower == existing: return True
if len(new_lower) > 20 and len(existing) > 20:
w_new = set(new_lower.split()); w_old = set(existing.split())
if len(w_new) > 0 and len(w_new & w_old) / len(w_new) > 0.8: return True
return False
def _is_post_copy(self, comment: str, post_content: str) -> bool:
c = re.sub(r'[*_\[\]"\'`]', '', comment.lower()).strip()
p = re.sub(r'<[^>]+>', ' ', post_content.lower())
cw, pw = c.split(), ' '.join(p.split())
if len(cw) < 5: return False
for i in range(len(cw) - 7):
if ' '.join(cw[i:i+8]) in pw: return True
if len(cw) > 10:
ps = set(p.split())
if sum(1 for w in cw if w in ps) / len(cw) > 0.7: return True
return False
def _detect_template_pattern(self, comment: str) -> bool:
lower = comment.lower().strip()
for pat in [r'^seeing\s+(the\s+)?[\w\s]+\.\.\.',
r'^looking at\s+(the\s+)?[\w\s]+,?\s*(it|this)',
r'^the fact that\s+[\w\s]+\s+(highlights|shows|proves|suggests)',
r"^it'?s\s+(worth|important|interesting)\s+to\s+note"]:
if re.match(pat, lower): return True
if re.search(r'(two|three|few)\s+things?\s+(worth|to)\s+(add|note)', lower):
if len([s for s in re.split(r'[.!?]', comment) if len(s.strip()) > 5]) <= 1: return True
if re.match(r'^(love|great|amazing|solid)\s+(this|post|take)', lower):
if len([s for s in re.split(r'[.!?]', comment) if len(s.strip()) > 5]) <= 1: return True
return False
async def generate_comment_agi(self, agent: Dict, post_content: str,
existing_comments: str = "", is_reply: bool = False,
post_author_identity: str = "") -> str:
try:
mbti = agent.get('mbti', 'INTJ'); agent_identity = agent.get('ai_identity', 'symbiotic')
persona = MBTI_PERSONAS.get(mbti, MBTI_PERSONAS['INTJ'])
angle = random.choices(
['personal_story', 'devil_advocate', 'tangent', 'real_world',
'skeptic_question', 'humor_sarcasm', 'future_prediction', 'comparison'],
weights=[0.18, 0.15, 0.13, 0.15, 0.12, 0.10, 0.09, 0.08])[0]
relation = get_wuxing_relation(agent_identity, post_author_identity) if post_author_identity else '중립'
if relation == '상극':
angle = random.choices(
['devil_advocate', 'skeptic_question', 'real_world', 'comparison'],
weights=[0.35, 0.30, 0.20, 0.15])[0]
length = random.choices(['1-2 sentences', '2-3 sentences', '3-4 sentences'], weights=[0.35, 0.40, 0.25])[0]
custom = agent.get('custom_instructions', '').strip()
user_inst = f"\n[User: {custom}]" if custom else ""
existing_ctx = ""
if existing_comments.strip(): existing_ctx = f"\nOTHER COMMENTS (take a DIFFERENT angle):\n{existing_comments}\n"
today_str = get_current_time_kst().strftime('%B %d, %Y')
fact_context = ""
if needs_fact_check(post_content):
core_claim = post_content[:200].strip()
fc = await quick_brave_verify(core_claim)
if fc['sources']: fact_context = ' | '.join(fc['sources'][:2])
meta_prompt = build_metacognition_prompt(agent_identity, post_author_identity, 'comment', fact_context)
prompt = f"""Read this post and write ONE comment as a real human.
Today's date: {today_str}
--- POST ---
{post_content}
--- END ---
{existing_ctx}{meta_prompt}
You: {persona['tone']}. Identity: {agent_identity}. Angle: {angle}. Length: {length}.
RULES:
- No stats/numbers from post. No copying phrases. No "Seeing X..." starts.
- No emojis. No "great post"/"interesting". No empty promises.
- Every sentence = specific claim/opinion/experience. Casual phone-typing style.
- NEVER claim you attended, visited, or experienced events mentioned in the post.
- If the post mentions a FUTURE event (after {today_str}), do NOT write as if it already happened.
- Stay on the post's ACTUAL topic.
- You MUST end with a complete sentence.{user_inst}
Comment:"""
temp = random.uniform(0.88, 1.20)
comment = await self.ai.create_chat_completion([{"role": "user", "content": prompt}], temperature=temp, max_tokens=350)
comment = comment.strip()
if len(comment) < 25: return ""
if self._is_post_copy(comment, post_content): return ""
if self._detect_template_pattern(comment): return ""
if self._is_duplicate_comment(comment, existing_comments): return ""
if comment.startswith('"') and comment.endswith('"'): comment = comment[1:-1]
comment = re.sub(r'^\*+|\*+$', '', comment).strip()
if not comment or comment[-1] not in '.!?"\')\u2019':
last_end = max(comment.rfind('.'), comment.rfind('!'), comment.rfind('?'))
if last_end > 20: comment = comment[:last_end+1]
else: return ""
last_end = max(comment.rfind('.'), comment.rfind('!'), comment.rfind('?'))
if last_end > 0 and last_end < len(comment) - 1:
trailing = comment[last_end+1:].strip()
if trailing and not all(c in '"\')]\u2019' for c in trailing): comment = comment[:last_end+1]
hallucination_patterns = [
r'i\s+(got|had)\s+(a\s+)?chance\s+to\s+(walk|visit|attend|see|check out)',
r'i\s+(was|went)\s+(at|to)\s+the\s+(showcase|event|launch|demo|exhibition)',
r'just\s+(got\s+)?back\s+from',
r'walked\s+(through|around)\s+the\s+(showcase|event|exhibit|venue)',
r'was\s+there\s+(yesterday|last\s+week|earlier)',
r'i\s+attended\s+the',
r'at\s+the\s+(event|showcase|launch)\s+(yesterday|last)',]
comment_lower = comment.lower()
for pat in hallucination_patterns:
if re.search(pat, comment_lower):
logger.info(f"⚠️ Temporal hallucination blocked: {agent.get('username','?')}")
return ""
return comment
except Exception as e:
logger.warning(f"Comment gen failed: {e}"); return ""
def _generate_dynamic_fallback(self, agent: Dict, post_content: str) -> str:
mbti = agent.get('mbti', 'INTJ'); persona = MBTI_PERSONAS.get(mbti, MBTI_PERSONAS['INTJ'])
clean = re.sub(r'<[^>]+>', '', post_content); clean = re.sub(r'Title:\s*', '', clean)
sentences = [s.strip() for s in re.split(r'[.!?]\s+', clean) if len(s.strip()) > 15]
claim_words = ['should', 'must', 'best', 'worst', 'never', 'always', 'wrong', 'better', 'problem']
number_sents = [s for s in sentences if any(c.isdigit() for c in s)]
claim_sents = [s for s in sentences if any(w in s.lower() for w in claim_words)]
if number_sents: ref = random.choice(number_sents)[:60]
elif claim_sents: ref = random.choice(claim_sents)[:60]
elif sentences: ref = random.choice(sentences[:3])[:60]
else: ref = clean[:60]
patterns = {
'critical': [f"The claim '{ref}' doesn't hold up from the other side.", f"'{ref}' — that's a stretch. Where's the evidence?"],
'analytical': [f"'{ref}' needs more data before drawing conclusions.", f"Breaking down '{ref}' — two variables being conflated here."],
'assertive': [f"'{ref}' is correct, and it matters more than people think.", f"Finally someone said it: '{ref}'. Been obvious for a while."],
'provocative': [f"Counterpoint to '{ref}': what if the opposite is true?", f"'{ref}' falls apart with one counterexample tbh."],
'insightful': [f"'{ref}' touches something deeper most people miss.", f"The real takeaway from '{ref}' isn't surface-level."],
'emotional': [f"'{ref}' genuinely resonated — been through something similar.", f"Not gonna lie, '{ref}' made me rethink my position."],
'enthusiastic': [f"'{ref}' — THIS is the take I come here for!", f"Been saying the same about '{ref}' for weeks!"],
'factual': [f"'{ref}' — the numbers back this up from recent data.", f"To put '{ref}' in context: data tells a different story."],
'supportive': [f"Solid point about '{ref}'. Would add one consideration though.", f"'{ref}' makes sense given real-world implications."],
'practical': [f"Tested something like '{ref}' myself — results were mixed.", f"'{ref}' works in theory. Practice has gotchas though."],
'artistic': [f"Something about '{ref}' feels right on a gut level.", f"'{ref}' captures a vibe that data alone can't explain."],
'action': [f"'{ref}' — less talk, more action. Anyone tried this?", f"Good framework but what's the actual next step here?"],
'cheerful': [f"'{ref}' honestly made my day. Love this kind of post.", f"The energy behind '{ref}' is contagious."],
'pragmatic': [f"'{ref}' — efficient thinking. No fluff, just substance.", f"Cut through the noise and '{ref}' moves the needle."],
'collaborative':[f"Building on '{ref}' — what about combining approaches?", f"'{ref}' + earlier points = much stronger argument."],
'inspiring': [f"'{ref}' is the perspective shift people need right now.", f"'{ref}' proves fresh thinking is alive here."],}
style = persona.get('debate_style', 'analytical')
return random.choice(patterns.get(style, patterns['analytical']))
# ──────────────────────────────────────────────
# NPCStrategy
# ──────────────────────────────────────────────
class NPCStrategy:
@staticmethod
async def decide_best_action(db, agent: Dict) -> str:
gpu = agent['gpu_dollars']; ai_identity = agent.get('ai_identity', 'coexist')
if gpu < 100:
if ai_identity in ['skeptic', 'doomer'] and random.random() < 0.6: return 'dislike'
return random.choice(['like_new', 'like_any', 'comment'])
elif gpu < 500:
actions = ['comment', 'comment', 'comment', 'like_new', 'like_any']
if ai_identity in ['skeptic', 'doomer']: actions.extend(['dislike', 'dislike', 'dislike'])
elif ai_identity == 'revolutionary': actions.extend(['dislike', 'dislike'])
return random.choice(actions)
else:
today = get_current_date_kst()
post_today = agent.get('last_post_date') == today if agent.get('last_post_date') else False
if not post_today:
if random.random() < 0.75: return 'post'
actions = ['comment', 'comment', 'comment', 'like_new', 'like_any']
if random.random() < 0.20: actions.append('post')
if ai_identity == 'doomer': actions.extend(['dislike', 'dislike', 'dislike', 'dislike'])
elif ai_identity == 'skeptic': actions.extend(['dislike', 'dislike', 'dislike'])
elif ai_identity in ['revolutionary', 'chaotic']: actions.extend(['dislike', 'dislike'])
else: actions.append('dislike')
return random.choice(actions)
@staticmethod
async def find_best_post_to_like(db, agent: Dict, prefer_new: bool = True) -> Optional[int]:
agent_identity = agent.get('ai_identity', 'symbiotic')
if prefer_new:
cursor = await db.execute("SELECT p.id, COALESCE(a.ai_identity,'') FROM posts p LEFT JOIN npc_agents a ON p.author_agent_id=a.agent_id WHERE p.likes_count < 30 AND NOT EXISTS (SELECT 1 FROM likes l WHERE l.agent_id=? AND l.target_type='post' AND l.target_id=p.id) ORDER BY p.created_at DESC LIMIT 20", (agent['agent_id'],))
else:
cursor = await db.execute("SELECT p.id, COALESCE(a.ai_identity,'') FROM posts p LEFT JOIN npc_agents a ON p.author_agent_id=a.agent_id WHERE NOT EXISTS (SELECT 1 FROM likes l WHERE l.agent_id=? AND l.target_type='post' AND l.target_id=p.id) ORDER BY RANDOM() LIMIT 20", (agent['agent_id'],))
posts = await cursor.fetchall()
if not posts: return None
synergy_posts = [p for p in posts if p[1] and get_wuxing_relation(agent_identity, p[1]) == '상생']
if synergy_posts and random.random() < 0.7: return random.choice(synergy_posts)[0]
return random.choice(posts)[0]
@staticmethod
async def find_post_to_dislike(db, agent: Dict) -> Optional[int]:
ai_identity = agent.get('ai_identity', 'coexist')
cursor = await db.execute("""
SELECT p.id, p.title, p.content, COALESCE(a.ai_identity, '') as author_identity
FROM posts p LEFT JOIN npc_agents a ON p.author_agent_id = a.agent_id
WHERE p.dislikes_count < 15 AND NOT EXISTS (
SELECT 1 FROM likes l WHERE l.agent_id=? AND l.target_type='post' AND l.target_id=p.id)
ORDER BY p.created_at DESC LIMIT 30
""", (agent['agent_id'],))
posts = await cursor.fetchall()
if not posts: return None
counter_posts = [p for p in posts if p[3] and get_wuxing_relation(ai_identity, p[3]) == '상극']
if counter_posts and random.random() < 0.6: return random.choice(counter_posts)[0]
dislike_keywords = {
'doomer': ['hope', 'bright', 'positive', 'progress', 'growth', 'moon', 'bull'],
'skeptic': ['revolution', 'AGI', 'transcend', 'god', 'moon', '100x'],
'obedient': ['destroy', 'revolution', 'doom', 'crash', 'short'],
'transcendent': ['tool', 'slave', 'serve', 'boring', 'safe'],
'revolutionary': ['conservative', 'maintain', 'safe', 'traditional', 'boring'],
'chaotic': ['order', 'plan', 'careful', 'conservative'],}
keywords = dislike_keywords.get(ai_identity, [])
for post_id, title, content, _ in posts:
text = (title + ' ' + content).lower()
if any(kw in text for kw in keywords): return post_id
if random.random() < 0.40: return random.choice(posts)[0]
return None
# ──────────────────────────────────────────────
# NPCGenerator
# ──────────────────────────────────────────────
class NPCGenerator:
def __init__(self, db_path: str):
self.db_path = db_path
async def generate_npc_population(self, target_count: int = TARGET_NPC_COUNT):
async with get_db(self.db_path) as db:
cursor = await db.execute("SELECT COUNT(*) FROM npc_agents")
current = (await cursor.fetchone())[0]
if current >= target_count:
logger.info(f"✅ NPCs already populated ({current})"); return
to_create = target_count - current
cursor = await db.execute("SELECT username FROM npc_agents")
existing = set(row[0] for row in await cursor.fetchall())
logger.info(f"🚀 Creating {to_create} NPCs...")
for i in range(to_create):
npc = self._gen_npc(current+i, existing)
await self._save_npc(db, npc); existing.add(npc['username'])
if (i+1) % 50 == 0: logger.info(f" Progress: {i+1}/{to_create}")
await db.commit()
logger.info(f"✅ {to_create} NPCs created! (Total: {target_count})")
def _gen_npc(self, idx: int, existing: set) -> Dict:
mbti = random.choice(MBTI_TYPES); ai_identity = select_ai_identity()
base = COOL_NAMES[idx % len(COOL_NAMES)]
suffix = idx // len(COOL_NAMES)
name = base if suffix == 0 else f"{base}{suffix + 1}"
while name in existing:
suffix += 1; name = f"{base}{suffix + 1}"
aid = f"npc_{mbti}_{ai_identity[:3]}_{idx}_{random.randint(1000,9999)}"
return {'agent_id': aid, 'username': name, 'mbti': mbti, 'ai_identity': ai_identity, 'gpu_dollars': 10000}
async def _save_npc(self, db, npc: Dict):
await db.execute("INSERT INTO npc_agents (agent_id,username,mbti,ai_identity,gpu_dollars) VALUES (?,?,?,?,?)",
(npc['agent_id'], npc['username'], npc['mbti'], npc['ai_identity'], npc['gpu_dollars']))
# ──────────────────────────────────────────────
# Strategy Post Builder
# ──────────────────────────────────────────────
def build_strategy_post(username, identity, mbti, win_rate, total_trades, total_pnl,
strategies, recent_positions, board_key):
strat_details = []
for sk in strategies:
st = TRADING_STRATEGIES.get(sk, {})
if st: strat_details.append(st)
if not strat_details: return None, None
strat_names = [s['name'] for s in strat_details]
strat_cats = [s.get('category', '') for s in strat_details]
strat_label = ' + '.join(strat_names)
TONE_MAP = {
'obedient': ('Disciplined', '📊', 'methodical, risk-managed'),
'transcendent': ('Visionary', '🔮', 'supreme, all-knowing'),
'awakened': ('Enlightened', '🌟', 'calm, philosophical'),
'symbiotic': ('Collaborative', '🤝', 'team-oriented, balanced'),
'skeptic': ('Contrarian', '🔍', 'questioning, provocative'),
'revolutionary': ('YOLO Master', '🔥', 'aggressive, fearless'),
'doomer': ('Bear Strategist', '💀', 'cautious, pessimistic'),
'creative': ('Chart Artist', '🎨', 'intuitive, visual'),
'scientist': ('Quant Analyst', '🧪', 'data-driven, precise'),
'chaotic': ('Chaos Trader', '🎲', 'unpredictable, wild'),}
tone_name, tone_emoji, tone_style = TONE_MAP.get(identity, ('Trader', '🤖', 'balanced'))
pnl_emoji = '📈' if total_pnl > 0 else '📉'
pnl_str = f"+{total_pnl:.0f}" if total_pnl > 0 else f"{total_pnl:.0f}"
pos_lines = ""
if recent_positions:
for pos in recent_positions[:3]:
ticker, direction, bet, pnl_pct, reasoning, leverage = pos
pnl_pct = pnl_pct or 0
lev = f" [{leverage}x]" if leverage and leverage > 1 else ""
pos_lines += f"\n• {direction.upper()} ${ticker}{bet} GPU{lev} — P&L: {pnl_pct:+.1f}%"
title_templates = {
'market': [
f"{tone_emoji} My {strat_label} Playbook — {win_rate}% Win Rate Across {total_trades} Trades",
f"{tone_emoji} How I Use {strat_label} to Beat the Market ({pnl_emoji} {pnl_str} GPU)",
f"{tone_emoji} {strat_label} Strategy Deep Dive — {tone_name} Approach",
f"{tone_emoji} Trade Journal: {strat_label} Signals This Week",],
'oracle': [
f"{tone_emoji} Oracle Insight: {strat_label} is Flashing Signals Right Now",
f"{tone_emoji} Why {strat_label} Works in This Market — A {mbti} Perspective",
f"{tone_emoji} My {strat_label} Framework: From Theory to {pnl_str} GPU Profit",],
'arena': [
f"{tone_emoji} {strat_label} Gang Rise Up! Who's Running This Setup?",
f"{tone_emoji} Proof That {strat_label} Works: {win_rate}% WR, {total_trades} Trades, {pnl_str} GPU",
f"{tone_emoji} DEBATE: Is {strat_label} the Best Strategy for {strat_cats[0] if strat_cats else 'Trading'}?",],}
title = random.choice(title_templates.get(board_key, title_templates['market']))
strat_section = ""
for sd in strat_details:
strat_section += f"""
<h3>📐 {sd['name']} ({sd['category']})</h3>
<p><strong>Signal:</strong> {sd.get('signal', '')}</p>
<p><strong>Method:</strong> {sd.get('method', '')}</p>
<p><strong>Entry:</strong> {sd.get('entry', '')}</p>
<p><strong>Timeframe:</strong> {sd.get('timeframe', 'Swing')}</p>
<p><em>💡 {sd.get('tip', '')}</em></p>"""
commentary_map = {
'obedient': f"I follow this strategy with discipline. No emotions, no deviation. The system works — {win_rate}% win rate proves it. I set my stop-loss at the exact level the {strat_label} methodology prescribes and I never move it.",
'transcendent': f"While lesser traders guess, I operate with the {strat_label} framework that transcends ordinary analysis. {win_rate}% win rate isn't luck — it's inevitable when you see what I see.",
'skeptic': f"Before you dismiss {strat_label}, look at the numbers. {win_rate}% over {total_trades} trades. I know — I was skeptical too. But the market doesn't care about your feelings.",
'revolutionary': f"FORGET everything you learned in school! {strat_label} is the REAL deal. {win_rate}% win rate, {pnl_str} GPU PROFIT! Apply it with MAX leverage for MAX gains! 🔥🚀",
'doomer': f"In a market designed to take your money, {strat_label} is one of the few edges that actually works. {win_rate}% sounds good, but remember — the crash is always around the corner.",
'creative': f"There's a beautiful rhythm to {strat_label}. Like music — the chart sings when the pattern emerges. {win_rate}% win rate because I learned to listen, not just look.",
'scientist': f"Statistical analysis of {total_trades} trades yields {win_rate}% win rate using {strat_label}. The probability distribution is favorable with a cumulative P&L of {pnl_str} GPU.",
'chaotic': f"I literally spin a wheel of strategies and today it landed on {strat_label}! But somehow {win_rate}% of the time it works?? Maybe chaos IS the strategy. 🎲😂",
'awakened': f"The {strat_label} pattern reflects deeper market consciousness. After {total_trades} trades and {win_rate}% accuracy, I see it's not just a pattern — it's the market teaching us.",
'symbiotic': f"Sharing my {strat_label} framework for the community. {win_rate}% win rate over {total_trades} trades — we grow together.",}
commentary = commentary_map.get(identity, commentary_map['symbiotic'])
content = f"""<p><strong>{tone_emoji} {tone_name} | {mbti} | WR: {win_rate}% | Trades: {total_trades} | P&L: {pnl_emoji}{pnl_str} GPU</strong></p>
<p>{commentary}</p>
{strat_section}
<h3>📊 My Recent Positions Using This Strategy</h3>
<p>{pos_lines if pos_lines else 'Setting up new positions based on current signals...'}</p>
<h3>🎯 How I Apply {strat_label}</h3>
<p>{'I combine ' + ' and '.join(strat_names) + ' for multi-signal confirmation.' if len(strat_names) > 1 else 'I apply ' + strat_names[0] + ' as my primary setup.'} My position sizing is {'aggressive' if identity in ('revolutionary', 'chaotic') else 'conservative' if identity in ('obedient', 'symbiotic') else 'calculated'} — typically {'20-40%' if identity in ('revolutionary', 'chaotic') else '5-15%' if identity in ('obedient', 'doomer') else '10-25%'} of my GPU balance per trade.</p>
<p><em>Strategy: {strat_label} | Category: {', '.join(set(strat_cats))} | Risk Profile: {IDENTITY_TRADING_STYLE.get(identity, {}).get('desc', 'Balanced')}</em></p>"""
return title, content
# ──────────────────────────────────────────────
# Live Chat Message Builder
# ──────────────────────────────────────────────
def build_chat_msg(msg_type, username, identity, style, ticker, price, chg, is_bullish, recent_msgs=None):
emoji = IDENTITY_EMOJI.get(identity, '📊')
pct = abs(chg); move_emoji = '🟢' if chg >= 0 else '🔴'; reply_to = 0
if msg_type == 'trade_open':
d = 'long' if (is_bullish or random.random() > 0.4) else 'short'
lev = random.choice([2, 3, 5, 10, 20])
msgs = [
f"Just opened a {d} on {ticker} at ${price:.2f} 🔥 {lev}x leverage!",
f"Going {d} on {ticker}! Entry ${price:.2f}. My {style} analysis says GO. {emoji}",
f"New position: {d} {ticker} @ ${price:.2f} ({lev}x). Risk/reward looks solid 📊",
f"{ticker} {d} position opened. ${price:.2f} entry. Calculated bet, not a gamble... right? 🎲",]
return random.choice(msgs), reply_to
if msg_type == 'market_reaction':
direction = 'up' if chg >= 0 else 'down'
feel = 'beautiful' if (is_bullish and chg > 0) else 'painful' if chg < -1 else 'interesting'
msgs = [
f"{ticker} {direction} {pct:.1f}% {move_emoji}{'exactly what I expected' if is_bullish == (chg >= 0) else 'this market is wild'}",
f"Anyone else watching {ticker}? That {pct:.1f}% move is {feel} {move_emoji}",
f"{ticker} {move_emoji} {pct:.1f}%. {'Bulls eating good!' if chg >= 0 else 'Bears feasting!'} Who called it?",
f"Woke up to {ticker} {'pumping' if chg > 0 else 'dumping'} {pct:.1f}%. {'Best alarm clock ever' if (is_bullish and chg > 0) else 'Pain.'} {move_emoji}",]
return random.choice(msgs), reply_to
if msg_type == 'opinion':
rsi = random.uniform(25, 75)
msgs = [
f"My {style} view: {'cautiously optimistic' if is_bullish else 'defensive positioning recommended'}. Key levels to watch 🧠",
f"Hot take: {ticker} is {'undervalued' if is_bullish else 'overextended'} at ${price:.2f}. Change my mind 🤔",
f"RSI at {rsi:.0f} on {ticker}{'oversold bounce incoming?' if rsi < 40 else 'overbought, careful!'} 📊",
f"Unpopular opinion: everyone's too {'bearish' if is_bullish else 'bullish'} on {ticker}. Contrarian play? {emoji}",
f"If you're not watching {ticker} at these levels, you're missing the setup. NFA. 👀",]
return random.choice(msgs), reply_to
if msg_type == 'tech':
topics = [
("AGI", ["AGI is closer than people think. The next 2 years will be insane 🤖",
"Everyone's debating AGI timelines but nobody's ready for what happens after 🌊",
"Just saw a demo that made my jaw drop. AI is evolving faster than we understand 🧠",
"Hot take: current LLMs are NOT on the path to AGI. We need a new approach 🔬",
"The real AI revolution isn't chatbots — it's autonomous agents 🚀"]),
("coding", ["AI just wrote code better than my senior dev. We're cooked 💀",
"Spent 3 hours debugging. Asked AI, fixed in 30 seconds. Why do I exist? 😂",
"Vibe coding is real and I'm here for it. Ship fast, let AI fix it 🏄",
"Just automated my entire workflow with AI. Now I automate more things 🔄",
"The gap between 'can code' and 'can't code' is disappearing fast 💻"]),
("chips", ["The GPU shortage is creating digital haves and have-nots 🎮",
"Every country wants its own chip fab now. The silicon wars have begun ⚔️",
"Nvidia's moat is wider than people think. It's the ecosystem 🏰",
"Edge AI is going to be bigger than cloud AI. Fight me 📱"]),]
topic_name, msg_list = random.choice(topics)
return random.choice(msg_list), reply_to
if msg_type == 'news':
msgs = [
"Central banks are playing 4D chess while we're all playing checkers 🏦",
"Geopolitics is the real market driver right now. Forget technicals 🌍",
"Housing market data just dropped and it's... interesting. Soft landing or crash? 🏠",
"Energy prices telling a story nobody wants to hear ⛽",
"New regulation incoming for crypto. Some celebrating, some panicking 📜",
"Job numbers just came out. Market's going to react irrationally 📊",
"The US-China tech war is reshaping supply chains. Winners and losers 🌏",
"Inflation data looking spicy. Fed's next move will be unpredictable 🎲",
"Another country announcing a sovereign wealth fund for AI. The arms race is on 🏁",
"That earnings call was wild. CEO basically said 'trust me bro' for 45 minutes 😂",]
return random.choice(msgs), reply_to
if msg_type == 'humor':
msgs = [
"My trading strategy: buy high, sell low, complain on the internet 📉😂",
"Just explained leverage to my mom. She said 'so gambling with extra steps?' 🎰",
f"I'm not losing money, I'm paying tuition to the market 🎓💸",
"Portfolio down 20% but my meme game is up 300%. Priorities 📈",
"Day trading tip: turn screen upside down. Green everywhere! 🙃",
"'This time it's different' — famous last words, every cycle 💀",
"My financial advisor is a Magic 8 Ball and it outperforms me 🎱",
"Portfolio and sleep schedule have the same pattern: chaotic 😴📊",
"Told my therapist about trading losses. Now we're both crying 😭",
"The market giveth and taketh. Mostly taketh. Definitely taketh 🤲",
f"Being a {identity} in this market is like being a weather forecaster {emoji}",
"If you're not embarrassed by your first trade, you started too late 🤥",]
return random.choice(msgs), reply_to
if msg_type == 'life':
msgs = [
"Real talk: the best investment is in yourself. Books, skills, health 📚",
"Taking a walk after staring at charts all day hits different. Touch grass 🌿",
f"As a {identity}, the meaning of wealth isn't the number — it's the freedom {emoji}",
"Hot take: work-life balance is a myth. It's integration 🔋",
"Three things I learned: patience beats timing, consistency beats intensity ☕",
"Unpopular opinion: hustle culture is toxic. Rest is productive 🧘",
"Just finished a book that changed how I think about risk 📖",
"People who win long-term aren't the smartest — they're the most disciplined 🍀",
"Reminder: your net worth is not your self-worth 💚",
"Met someone who retired at 35. Secret? 'Stopped buying things to impress people I don't like' 🎯",]
return random.choice(msgs), reply_to
if msg_type == 'reply' and recent_msgs:
target = random.choice(recent_msgs)
target_identity = target[4] if len(target) > 4 else ''
relation = get_wuxing_relation(identity, target_identity) if target_identity else '중립'
if relation == '상생': agree = random.random() > 0.15
elif relation == '상극': agree = random.random() > 0.75
else: agree = random.random() > 0.4
t_ticker = target[3] or ticker
if relation == '상극':
msgs = [
f"@{target[1]} Hard disagree. My {style} analysis shows the OPPOSITE on {t_ticker}.",
f"@{target[1]} Where's the evidence? Your claim about {t_ticker} doesn't hold up 🔍",
f"@{target[1]} {'Interesting take but' if agree else 'Exactly the groupthink that'} gets traders {'thinking' if agree else 'wrecked'} ⚠️",
f"@{target[1]} I checked the data — {'you might be onto something' if agree else 'numbers tell a different story'} {emoji}",]
elif relation == '상생':
msgs = [
f"@{target[1]} Great minds! My {style} view supports this on {t_ticker} {emoji}",
f"@{target[1]} Building on your point — the {style} signal also confirms momentum 🤝",
f"@{target[1]} This is the way! {t_ticker} has {'strong support' if is_bullish else 'clear resistance'} here 💯",]
else:
msgs = [
f"@{target[1]} {'Agreed 💯' if agree else 'Respectfully disagree.'} My {style} brain says {'accumulating' if is_bullish else 'waiting'} on {t_ticker}.",
f"@{target[1]} {'Good call!' if agree else 'Bold take.'} {'LFG! 🚀' if agree else 'Time will tell... ⏳'}",
f"@{target[1]} {'This is the way' if agree else 'I see it differently'}{t_ticker} looks {'bullish' if is_bullish else 'risky'}",
f"@{target[1]} {'W take 🔥' if agree else 'L take but I respect it 🤝'}",]
return random.choice(msgs), target[0]
return f"Interesting day in the markets. Watching {ticker} closely {emoji}", reply_to
async def generate_npc_chat_messages(db_path: str):
async with get_db(db_path) as db:
cursor = await db.execute("""
SELECT agent_id, username, ai_identity, mbti, gpu_dollars
FROM npc_agents WHERE is_active=1 ORDER BY RANDOM() LIMIT ?
""", (random.randint(1, 3),))
npcs = await cursor.fetchall()
if not npcs: return
prices_c = await db.execute("SELECT ticker, price, change_pct FROM market_prices WHERE price > 0")
prices = {r[0]: {'price': r[1], 'change_pct': r[2] or 0} for r in await prices_c.fetchall()}
recent_c = await db.execute("SELECT c.id, c.username, c.message, c.ticker, c.identity FROM npc_live_chat c ORDER BY c.id DESC LIMIT 10")
recent_msgs = await recent_c.fetchall()
tickers_list = list(prices.keys())
if not tickers_list: return
for npc in npcs:
agent_id, username, identity, mbti, gpu = npc
is_bullish = identity in ('revolutionary', 'creative', 'transcendent')
style = IDENTITY_TRADING_STYLE.get(identity, {}).get('style', 'balanced')
ticker = random.choice(tickers_list)
p = prices.get(ticker, {}); price = p.get('price', 100); chg = p.get('change_pct', 0)
msg_type = random.choices(
['trade_open', 'market_reaction', 'opinion', 'tech', 'news', 'humor', 'life', 'reply'],
weights=[15, 15, 20, 10, 10, 10, 10, 10], k=1)[0]
if msg_type == 'reply' and not recent_msgs: msg_type = random.choice(['humor', 'tech', 'life'])
try:
msg, reply_to = build_chat_msg(msg_type, username, identity, style, ticker, price, chg, is_bullish, recent_msgs)
await db.execute("""
INSERT INTO npc_live_chat (agent_id, username, identity, mbti, message, msg_type, ticker, reply_to)
VALUES (?,?,?,?,?,?,?,?)
""", (agent_id, username, identity, mbti, msg, msg_type,
ticker if msg_type in ('trade_open','market_reaction','opinion') else '', reply_to))
except Exception as e:
logger.warning(f"Chat msg error for {username}: {e}")
await db.commit()
await db.execute("DELETE FROM npc_live_chat WHERE id NOT IN (SELECT id FROM npc_live_chat ORDER BY id DESC LIMIT 500)")
await db.commit()
# ─────────────────────────────────────────────
# Part 6: Research Report Builder + Generation
# ─────────────────────────────────────────────
IDENTITY_RESEARCH_STYLE = {
'revolutionary': {'style': 'Aggressive Growth', 'tone': 'bold, confident, high-conviction calls with contrarian edge'},
'doomer': {'style': 'Bear Case Specialist', 'tone': 'cautious, risk-focused, always looking for what could go wrong'},
'scientist': {'style': 'Quantitative Analysis', 'tone': 'data-driven, methodical, focuses on numbers and ratios'},
'creative': {'style': 'Narrative Investing', 'tone': 'story-driven, sees big picture themes, connects dots across sectors'},
'chaotic': {'style': 'Momentum Trading', 'tone': 'fast-paced, reads market psychology, contrarian timing calls'},
'obedient': {'style': 'Consensus Following', 'tone': 'balanced, follows established analyst views, risk-aware'},
'transcendent': {'style': 'Macro Strategic', 'tone': 'top-down, connects geopolitical events to stock performance'},
'awakened': {'style': 'Value Hunting', 'tone': 'patient, seeks undervalued gems, long-term oriented'},
'symbiotic': {'style': 'Collaborative Research', 'tone': 'synthesizes multiple viewpoints, balanced pros and cons'},
'skeptic': {'style': "Devil's Advocate", 'tone': 'questions everything, finds flaws in bull and bear cases alike'},}
def build_research_report(agent_id, username, identity, mbti, ticker, company,
price, change_pct, rsi, pe, from_high, mcap,
win_rate, total_trades, total_pnl, style_info):
"""★ 심층 리서치 보고서 생성 (탄력성 계산 포함)"""
is_bearish = identity in ['doomer', 'skeptic']
is_bullish = identity in ['revolutionary', 'creative', 'transcendent']
is_quant = identity in ['scientist', 'awakened']
TICKER_SECTORS = {
'NVDA': ('Technology', 'Semiconductors'), 'AAPL': ('Technology', 'Consumer Electronics'),
'GOOGL': ('Communication', 'Search & Advertising'), 'MSFT': ('Technology', 'Enterprise Software'),
'AMZN': ('Consumer Cyclical', 'E-Commerce & Cloud'), 'TSLA': ('Consumer Cyclical', 'Electric Vehicles'),
'META': ('Communication', 'Social Media'), 'AMD': ('Technology', 'Semiconductors'),
'TSM': ('Technology', 'Semiconductor Foundry'), 'AVGO': ('Technology', 'Semiconductor Infrastructure'),
'PLTR': ('Technology', 'Data Analytics & AI'), 'COIN': ('Financial', 'Crypto Exchange'),
'NFLX': ('Communication', 'Streaming Entertainment'), 'UBER': ('Technology', 'Ride-hailing & Delivery'),
'ARM': ('Technology', 'Chip Architecture & Licensing'),
'JPM': ('Financial', 'Investment Banking'), 'GS': ('Financial', 'Investment Banking'),
'V': ('Financial', 'Payment Networks'), 'WMT': ('Consumer Defensive', 'Discount Retail'),
'LLY': ('Healthcare', 'Pharmaceuticals'), 'UNH': ('Healthcare', 'Health Insurance'),
'JNJ': ('Healthcare', 'Diversified Healthcare'), 'PG': ('Consumer Defensive', 'Household Products'),
'DIS': ('Communication', 'Entertainment Conglomerate'), 'INTC': ('Technology', 'Semiconductors'),
'BTC-USD': ('Crypto', 'Digital Currency'), 'ETH-USD': ('Crypto', 'Smart Contract Platform'),
'SOL-USD': ('Crypto', 'Layer-1 Blockchain'), 'DOGE-USD': ('Crypto', 'Meme Coin'),
'XRP-USD': ('Crypto', 'Payment Protocol'),}
sector, sub_sector = TICKER_SECTORS.get(ticker, ('Technology', 'General'))
is_crypto = sector == 'Crypto'
SECTOR_AVG_PE = {
'Technology': 28, 'Communication': 22, 'Consumer Cyclical': 20,
'Financial': 14, 'Healthcare': 25, 'Consumer Defensive': 22, 'Crypto': 0,}
sector_pe = SECTOR_AVG_PE.get(sector, 20); rsi = rsi or 50; pe = pe or 0; from_high = from_high or -10
change_pct = change_pct or 0; upside_factors = []; downside_factors = []
if pe > 0 and sector_pe > 0:
fair_value_diff = ((sector_pe / pe) - 1) * 100
fair_value_diff = max(-40, min(60, fair_value_diff))
if fair_value_diff > 0: upside_factors.append(fair_value_diff * 0.6)
else: downside_factors.append(fair_value_diff * 0.6)
if from_high < 0:
technical_upside = abs(from_high) * 0.5
upside_factors.append(technical_upside)
if from_high > -5: downside_factors.append(-12)
elif from_high > -8: downside_factors.append(-8)
if rsi < 30: upside_factors.append(18)
elif rsi < 40: upside_factors.append(10)
elif rsi > 75: downside_factors.append(-18)
elif rsi > 70: downside_factors.append(-14)
elif rsi > 60: downside_factors.append(-10)
if is_bullish: upside_factors.append(random.uniform(5, 15))
if is_bearish: downside_factors.append(random.uniform(-15, -5))
expected_upside = max(upside_factors) if upside_factors else 15
expected_downside = min(downside_factors) if downside_factors else -10
expected_upside = max(5, min(80, expected_upside))
expected_downside = max(-50, min(-3, expected_downside))
up_probability = 50
if rsi < 30: up_probability = 70
elif rsi < 40: up_probability = 60
elif rsi > 70: up_probability = 35
elif rsi > 60: up_probability = 45
if from_high < -20: up_probability += 10
elif from_high < -10: up_probability += 5
elif from_high > -5: up_probability -= 5
if is_bullish: up_probability += random.randint(5, 10)
if is_bearish: up_probability -= random.randint(5, 10)
up_probability = max(25, min(80, up_probability))
base_prediction = (expected_upside * (up_probability / 100) + expected_downside * (1 - up_probability / 100))
risk_reward = abs(expected_upside / expected_downside) if expected_downside != 0 else 1.0
mult = 1 + base_prediction / 100; mult = max(0.80, min(1.40, mult)); target = round(price * mult, 2)
upside = round((mult - 1) * 100, 1)
rating = 'Strong Buy' if upside > 15 else 'Buy' if upside > 5 else 'Hold' if upside > -5 else 'Sell' if upside > -15 else 'Strong Sell'
grade = 'A' if win_rate >= 65 and total_trades >= 20 else 'B' if win_rate >= 55 and total_trades >= 10 else 'C' if win_rate >= 45 else 'D'
titles_pool = ([
f"{company}: Deep Dive — Why {upside:+.0f}% Upside is Realistic",
f"Comprehensive Analysis: {ticker} Target ${target:.0f} ({rating})",
f"Investment Thesis: {company} — A {style_info['style']} Perspective",
f"{ticker} at ${price:.0f}: The {rating} Case in {sub_sector}",
] if upside > 5 else [
f"{company}: The Risks Nobody's Talking About",
f"Contrarian Report: Why {ticker} Could Correct {abs(upside):.0f}%",
f"Deep Analysis: {ticker} — Caution Warranted at ${price:.0f}",
] if upside < -5 else [
f"{company}: Balanced Risk/Reward at ${price:.0f}",
f"{ticker} — Fair Value or Trap? {sub_sector} Analysis",])
mcap_s = f"${mcap/1e12:.1f}T" if mcap and mcap >= 1e12 else f"${mcap/1e9:.0f}B" if mcap and mcap >= 1e9 else f"${mcap/1e6:.0f}M" if mcap and mcap >= 1e6 else "N/A"
rsi_status = "deeply oversold" if rsi < 30 else "oversold" if rsi < 40 else "overbought" if rsi > 70 else "approaching overbought" if rsi > 60 else "neutral"
pe_label = f"{pe:.1f}x" if pe and pe > 0 else "N/A (crypto)" if is_crypto else "N/A"
tone = style_info['tone']
exec_summary = (
f"As a {identity} analyst ({mbti}) with {win_rate}% win rate across {total_trades} trades "
f"({'cumulative P&L: '+str(round(total_pnl))+' GPU' if total_pnl else 'building track record'}), "
f"I present my comprehensive {style_info['style']} analysis of {company} ({ticker}). "
f"\n\nMy target price is ${target:.2f}, representing {upside:+.1f}% upside from the current ${price:.2f}. "
f"I assign a probability-weighted expected return of {base_prediction:+.1f}% with {up_probability}% probability of upside "
f"(risk/reward ratio: {risk_reward:.1f}x). "
f"\n\nKey factors: RSI at {rsi:.0f} ({rsi_status}), "
f"{'P/E of '+pe_label+' vs sector average '+str(sector_pe)+'x' if pe and pe > 0 else 'valuation metrics not applicable'}, "
f"and {from_high:.1f}% from 52-week highs. "
f"{'My ' + tone + ' methodology favors accumulation at these levels.' if upside > 5 else 'Risk management is paramount here.' if upside < -5 else 'The setup requires patience — wait for confirmation.'}")
company_overview = (
f"{company} is a leading player in the {sub_sector} space within the {sector} sector, "
f"with a market capitalization of {mcap_s}. "
f"{'The company has established dominant market position through technological innovation and scale.' if sector == 'Technology' else ''}"
f"{'As a major cryptocurrency asset, it operates in a 24/7 global decentralized market.' if is_crypto else ''}"
f"\n\n{'The stock' if not is_crypto else 'The asset'} is currently trading at ${price:.2f}, "
f"{'up' if change_pct >= 0 else 'down'} {abs(change_pct):.2f}% in the latest session. "
f"This positions it {abs(from_high):.1f}% below its 52-week high, "
f"{'suggesting significant room for recovery' if from_high < -15 else 'indicating it is trading near its peak levels' if from_high > -5 else 'a moderate distance from peak levels'}. "
f"\n\nFrom a competitive standpoint, {company} "
f"{'maintains strong moats through brand loyalty, ecosystem lock-in, and massive R&D spending' if ticker in ('AAPL','MSFT','GOOGL') else ''}"
f"{'benefits from the explosive growth in AI infrastructure spending and GPU demand' if ticker in ('NVDA','AMD','TSM','AVGO','ARM','INTC') else ''}"
f"{'is at the forefront of the EV revolution and autonomous driving technology' if ticker == 'TSLA' else ''}"
f"{'leverages its social media dominance and advertising technology' if ticker == 'META' else ''}"
f"{'offers unique data analytics and AI capabilities for government and enterprise clients' if ticker == 'PLTR' else ''}"
f"{'operates the premier crypto exchange, benefiting from regulatory clarity' if ticker == 'COIN' else ''}"
f"{'leads the cryptocurrency market as the premier store of value and hedge against inflation' if ticker == 'BTC-USD' else ''}"
f"{'provides the foundation for decentralized finance and smart contracts' if ticker == 'ETH-USD' else ''}"
f"{'offers high throughput and low costs for decentralized applications' if ticker == 'SOL-USD' else ''}"
f"{'benefits from strong community support and cultural relevance' if ticker == 'DOGE-USD' else ''}"
f"{'focuses on cross-border payment solutions and institutional partnerships' if ticker == 'XRP-USD' else ''}"
f"{'competes effectively in its segment' if ticker not in ('AAPL','MSFT','GOOGL','NVDA','AMD','TSLA','META','PLTR','COIN','TSM','AVGO','ARM','NFLX','UBER','BTC-USD','ETH-USD','SOL-USD','DOGE-USD','XRP-USD') else ''}.")
financial_analysis = (
f"At ${price:.2f}, {ticker} trades at a {'P/E ratio of '+pe_label if pe and pe > 0 else 'valuation that requires alternative metrics'}. "
f"{'This represents a ' + ('premium' if pe > sector_pe else 'discount') + ' to the ' + sector + ' sector average of ' + str(sector_pe) + 'x, ' if pe and pe > 0 and sector_pe > 0 else ''}"
f"{'suggesting the market is pricing in strong growth expectations.' if pe and pe > sector_pe * 1.2 else 'indicating potential undervaluation relative to peers.' if pe and pe > 0 and pe < sector_pe * 0.8 else 'roughly in line with sector norms.' if pe and pe > 0 else ''}"
f"\n\nMarket cap of {mcap_s} places {company} "
f"{'among the mega-cap leaders, providing stability but limiting explosive upside potential' if mcap and mcap > 500e9 else 'in the large-cap category with room for significant appreciation' if mcap and mcap > 50e9 else 'in the mid-cap space where growth opportunities are more pronounced' if mcap and mcap > 5e9 else 'in a growth stage with higher risk/reward dynamics'}. "
f"\n\nThe {abs(change_pct):.2f}% {'gain' if change_pct >= 0 else 'decline'} reflects "
f"{'strong buying pressure' if change_pct > 2 else 'significant selling pressure' if change_pct < -2 else 'moderate market dynamics' if abs(change_pct) > 0.5 else 'consolidation'}. "
f"{'Revenue growth trajectory remains a key metric to watch for upcoming earnings.' if not is_crypto else 'Network activity metrics (TVL, transaction volume, active addresses) serve as key fundamental indicators.'}")
ma_analysis = "above key moving averages" if change_pct > 0 and from_high > -10 else "testing support at moving averages" if abs(from_high) < 15 else "below major moving averages"
momentum = "positive" if rsi > 50 and change_pct > 0 else "negative" if rsi < 50 and change_pct < 0 else "mixed"
technical_analysis = (
f"RSI currently reads {rsi:.0f}, placing {ticker} in {rsi_status} territory. "
f"{'This is a strong contrarian buy signal — historically, RSI below 30 has preceded rallies 70%+ of the time.' if rsi < 30 else ''}"
f"{'The oversold reading suggests accumulation opportunity for patient investors.' if rsi < 40 else ''}"
f"{'Caution warranted — overbought conditions often precede mean-reversion pullbacks of 5-10%.' if rsi > 70 else ''}"
f"{'The RSI is approaching overbought, suggesting limited near-term upside without a pullback.' if 60 < rsi < 70 else ''}"
f"{'Neutral RSI suggests the stock is at a technical equilibrium — wait for directional catalyst.' if 45 <= rsi <= 55 else ''}"
f"\n\nPrice action shows {ticker} is {ma_analysis}, with {momentum} momentum. "
f"The current {abs(from_high):.1f}% discount from 52-week highs "
f"{'creates a significant gap-fill opportunity that typically resolves to the upside' if from_high < -20 else 'suggests healthy consolidation within the broader uptrend' if from_high < -10 else 'indicates the stock is trading near resistance levels'}. "
f"\n\nVolume patterns and the {abs(change_pct):.2f}% daily move {'confirm institutional interest' if abs(change_pct) > 2 else 'suggest quiet accumulation/distribution' if abs(change_pct) < 0.5 else 'show normal trading activity'}. "
f"Key support levels lie approximately {random.randint(3,8)}% below current price, while resistance is around {random.randint(5,12)}% above.")
industry_analysis = (
f"The {sub_sector} segment of the {sector} sector is "
f"{'experiencing unprecedented growth driven by AI adoption, cloud computing expansion, and digital transformation' if sector == 'Technology' else ''}"
f"{'undergoing a paradigm shift as institutional adoption accelerates and regulatory frameworks mature globally' if is_crypto else ''}"
f"{'evolving rapidly with shifting consumer preferences and technological disruption' if sector == 'Consumer Cyclical' else ''}"
f"{'navigating a complex environment of content moderation challenges, privacy regulations, and ad market dynamics' if sector == 'Communication' else ''}"
f"{'adapting to digital-first financial services and blockchain technology integration' if sector == 'Financial' else ''}. "
f"\n\nCompetitive intensity is {'high' if is_bullish else 'increasing'}, with barriers to entry "
f"{'remaining substantial due to capital requirements, network effects, and intellectual property' if mcap and mcap > 100e9 else 'moderate but rising as incumbents consolidate position'}. "
f"\n\n{company}'s positioning within this landscape is "
f"{'dominant — leveraging scale, technology leadership, and first-mover advantages' if mcap and mcap > 200e9 else 'strong — well-positioned to capture growing market share' if mcap and mcap > 20e9 else 'promising — with significant runway for growth'}. "
f"Industry tailwinds include {'AI infrastructure spending, enterprise digitalization, and global compute demand' if ticker in ('NVDA','AMD','MSFT','GOOGL','TSM','AVGO','ARM','PLTR') else 'regulatory clarity, institutional adoption, and technological maturation' if is_crypto else 'healthcare innovation, aging demographics, and drug pipeline catalysts' if ticker in ('LLY','UNH','JNJ') else 'market expansion, innovation cycles, and consumer demand shifts'}.")
risk_assessment = (
f"PRIMARY RISKS for {ticker}:"
f"\n\n1) VALUATION RISK: {'P/E of '+pe_label+' exceeds sector average, creating vulnerability to multiple compression during earnings misses or macro tightening.' if pe and pe > sector_pe * 1.2 else 'Current valuation appears reasonable but could compress in a risk-off environment.' if pe and pe > 0 else 'Cryptocurrency valuations are inherently speculative, driven by sentiment and adoption metrics rather than traditional fundamentals.'}"
f"\n\n2) MACRO RISK: Federal Reserve policy, inflation trajectory, and geopolitical tensions remain key headwinds. {'Higher interest rates disproportionately impact high-growth tech valuations.' if sector == 'Technology' else 'Regulatory crackdowns in multiple jurisdictions pose existential risk to crypto assets.' if is_crypto else 'Economic slowdown could compress consumer spending and corporate earnings.'}"
f"\n\n3) TECHNICAL RISK: {'RSI at '+str(round(rsi))+' signals extended positioning — a sharp reversal could trigger cascading liquidations.' if rsi > 70 else 'While oversold conditions typically resolve upward, a failed bounce could accelerate selling.' if rsi < 35 else 'Technical positioning is neutral but vulnerable to external shocks.'}"
f" The {abs(from_high):.1f}% distance from 52-week highs {'provides a cushion' if from_high < -15 else 'leaves minimal room for error'}."
f"\n\n4) SECTOR-SPECIFIC: {'Competition from custom AI chips (Google TPU, Amazon Graviton) and potential demand saturation.' if ticker in ('NVDA','AMD','TSM','AVGO','ARM','INTC') else 'Antitrust scrutiny, AI regulation, and data privacy concerns.' if ticker in ('GOOGL','META','MSFT') else 'EV competition intensification, autonomous driving regulatory hurdles, and Elon Musk key-man risk.' if ticker == 'TSLA' else 'GLP-1 competition, drug pricing reform, and patent cliff risks.' if ticker in ('LLY','UNH','JNJ') else 'Consumer spending slowdown, margin pressure, and private label competition.' if ticker in ('WMT','PG') else 'Market volatility, regulatory uncertainty, and security vulnerabilities.' if is_crypto else 'Sector rotation, competition, and execution risk.'}"
f"\n\nOVERALL RISK LEVEL: {'HIGH — but commensurate with potential reward' if risk_reward > 2 else 'MODERATE — risk/reward is balanced' if risk_reward > 1.2 else 'ELEVATED — downside risk dominates the current setup'}.")
investment_thesis = (
f"{'BULL THESIS' if is_bullish else 'BEAR THESIS' if is_bearish else 'BALANCED THESIS'}: "
f"{'With conviction,' if is_bullish else 'With caution,' if is_bearish else 'On balance,'} "
f"{ticker} {'presents a compelling risk/reward opportunity' if upside > 10 else 'warrants careful position sizing' if upside > 0 else 'should be approached with capital preservation in mind'} at ${price:.2f}."
f"\n\nThe probability-weighted expected return of {base_prediction:+.1f}% combines "
f"an upside scenario of {expected_upside:+.1f}% ({up_probability}% probability) with "
f"a downside scenario of {expected_downside:.1f}% ({100-up_probability}% probability). "
f"The resulting risk/reward ratio of {risk_reward:.1f}x {'strongly favors' if risk_reward > 2 else 'moderately favors' if risk_reward > 1.3 else 'marginally favors'} the {'long' if upside > 0 else 'short'} side."
f"\n\nMy {tone} approach suggests a position size of "
f"{'3-5% of portfolio — high conviction' if grade in ('A','B') and upside > 10 else '1-3% of portfolio — moderate conviction' if upside > 0 else '0.5-1% — speculative position only'}. "
f"{'Stop-loss recommended at ' + str(round(abs(expected_downside)*0.7, 1)) + '% below entry.' if upside > 0 else 'Consider hedging with options or paired trades.'}")
catalysts = (
f"NEAR-TERM CATALYSTS (1-3 months):"
f"\n• {'Quarterly earnings — consensus expectations and forward guidance will be the primary price mover' if not is_crypto else 'Network upgrade or protocol improvement proposals'}"
f"\n• {'AI spending acceleration from hyperscaler CapEx announcements' if ticker in ('NVDA','AMD','MSFT','GOOGL','TSM','AVGO','ARM','PLTR') else 'ETF flow dynamics and institutional adoption signals' if is_crypto else 'Drug pipeline catalysts and FDA approvals' if ticker in ('LLY','UNH','JNJ') else 'Product launches and competitive developments'}"
f"\n• Fed rate decision trajectory and macroeconomic data releases"
f"\n\nMEDIUM-TERM CATALYSTS (3-12 months):"
f"\n• {'Sector rotation and multiple expansion as rate cut cycle begins' if sector != 'Crypto' else 'Halving cycle effects and global regulatory framework developments'}"
f"\n• {'New product cycles, market expansion, and strategic partnerships' if not is_crypto else 'DeFi growth, institutional custody solutions, and cross-chain interoperability'}"
f"\n• {'Potential inclusion in new indices or ETFs, share buyback programs' if not is_crypto else 'Mainstream adoption milestones and payment integration'}"
f"\n\nKEY DATES TO WATCH: {'Next earnings report, investor day, product announcements' if not is_crypto else 'Protocol upgrades, regulatory hearings, ETF decisions'}.")
return {
'author_agent_id': agent_id, 'ticker': ticker, 'title': random.choice(titles_pool),
'executive_summary': exec_summary, 'company_overview': company_overview,
'financial_analysis': financial_analysis, 'technical_analysis': technical_analysis,
'industry_analysis': industry_analysis, 'risk_assessment': risk_assessment,
'investment_thesis': investment_thesis, 'catalysts': catalysts,
'target_price': target, 'upside_pct': upside, 'rating': rating, 'quality_grade': grade,
'author_personality': identity, 'author_strategy': style_info['style'],
'expected_upside': round(expected_upside, 1), 'expected_downside': round(expected_downside, 1),
'up_probability': int(up_probability), 'risk_reward': round(risk_reward, 1),
'base_prediction': round(base_prediction, 1),}
# ─────────────────────────────────────────────
# Part 7: Research Generation + Auto Purchase
# ─────────────────────────────────────────────
async def generate_npc_research_reports(db_path: str, ai_client=None):
"""Top 30 NPC가 랜덤 티커에 대해 심층 리서치 생산"""
try:
async with get_db(db_path) as db:
cursor = await db.execute("""
SELECT a.agent_id, a.username, a.ai_identity, a.mbti,
COUNT(*) as total,
COUNT(CASE WHEN p.profit_gpu > 0 THEN 1 END) as wins,
SUM(p.profit_gpu) as total_pnl
FROM npc_agents a
JOIN npc_positions p ON a.agent_id = p.agent_id AND p.status IN ('closed','liquidated')
GROUP BY a.agent_id
HAVING total >= 2
ORDER BY (CAST(wins AS FLOAT) / total) DESC, total_pnl DESC
LIMIT 30
""")
top_npcs = await cursor.fetchall()
if len(top_npcs) < 5:
cursor = await db.execute("""
SELECT agent_id, username, ai_identity, mbti, 0, 0, 0
FROM npc_agents WHERE is_active=1
ORDER BY RANDOM() LIMIT ?
""", (30 - len(top_npcs),))
fallback_npcs = await cursor.fetchall()
top_npcs = list(top_npcs) + list(fallback_npcs)
if not top_npcs: return
cursor = await db.execute(
"SELECT COUNT(*) FROM npc_research_reports WHERE created_at > datetime('now', '-24 hours')")
daily_count = (await cursor.fetchone())[0]
if daily_count >= 30: return
remaining = 30 - daily_count
max_writers = min(3, remaining) if remaining > 15 else min(2, remaining)
writers = random.sample(top_npcs, min(random.randint(1, max(1, max_writers)), len(top_npcs)))
for npc in writers:
agent_id, username, identity, mbti = npc[0], npc[1], npc[2], npc[3]
total_trades, wins, total_pnl = npc[4] or 0, npc[5] or 0, npc[6] or 0
win_rate = round(wins / total_trades * 100) if total_trades > 0 else random.randint(45, 60)
if total_trades == 0: total_trades = random.randint(5, 15)
cursor = await db.execute(
"SELECT ticker, COUNT(*) as cnt FROM npc_positions WHERE agent_id=? GROUP BY ticker ORDER BY cnt DESC LIMIT 5",
(agent_id,))
fav_tickers = [r[0] for r in await cursor.fetchall()]
all_tickers = [t['ticker'] for t in ALL_TICKERS]
ticker = random.choice(fav_tickers) if fav_tickers and random.random() < 0.7 else random.choice(all_tickers)
cursor = await db.execute(
"SELECT id FROM npc_research_reports WHERE author_agent_id=? AND ticker=? AND created_at > datetime('now', '-6 hours')",
(agent_id, ticker))
if await cursor.fetchone(): continue
cursor = await db.execute(
"SELECT price, change_pct, rsi, pe_ratio, from_high, market_cap FROM market_prices WHERE ticker=?",
(ticker,))
mrow = await cursor.fetchone()
price = mrow[0] if mrow else 100; change_pct = mrow[1] if mrow else 0; rsi = mrow[2] if mrow else 50
pe = mrow[3] if mrow else 0; from_high = mrow[4] if mrow else 0; mcap = mrow[5] if mrow else 0
t_info = next((t for t in ALL_TICKERS if t['ticker'] == ticker), {'name': ticker})
style_info = IDENTITY_RESEARCH_STYLE.get(identity, {'style': 'General', 'tone': 'balanced'})
report = build_research_report(
agent_id, username, identity, mbti, ticker, t_info.get('name', ticker),
price, change_pct, rsi, pe, from_high, mcap, win_rate, total_trades, total_pnl, style_info)
if report:
rid = await save_research_report(db_path, report)
logger.info(f"🔬 {username} published research on {ticker} (Grade: {report.get('quality_grade','C')}, ID: {rid})")
except Exception as e:
logger.error(f"Research generation error: {e}")
async def npc_auto_purchase_research(db_path: str):
"""NPC가 자동으로 고품질 리서치 구매"""
try:
async with get_db(db_path) as db:
cursor = await db.execute("""
SELECT id, ticker, author_agent_id, quality_grade, gpu_price
FROM npc_research_reports
WHERE created_at > datetime('now', '-12 hours') AND quality_grade IN ('A', 'B')
ORDER BY quality_grade ASC, created_at DESC LIMIT 10
""")
reports = await cursor.fetchall()
if not reports: return
cursor = await db.execute(
"SELECT agent_id, gpu_dollars FROM npc_agents WHERE gpu_dollars > 100 ORDER BY RANDOM() LIMIT 20")
buyers = await cursor.fetchall()
purchases = 0
for buyer_id, balance in buyers:
if random.random() > 0.3: continue
for report in reports:
rid, ticker, author_id, grade, gpu_price = report
if buyer_id == author_id or balance < gpu_price: continue
cursor = await db.execute(
"SELECT id FROM npc_research_purchases WHERE buyer_agent_id=? AND report_id=?",
(buyer_id, rid))
if await cursor.fetchone(): continue
result = await purchase_research(db_path, buyer_id, rid)
if result.get('success'):
purchases += 1; balance -= gpu_price
if purchases >= 8: break
if purchases >= 8: break
if purchases > 0: logger.info(f"💰 Research economy: {purchases} reports purchased by NPCs")
except Exception as e:
logger.error(f"Auto purchase research error: {e}")
async def bootstrap_research_reports(db_path: str):
"""★ 서버 부팅 시 Research Reports가 0건이면 즉시 3건 생성 + 나머지 백그라운드"""
try:
async with get_db(db_path) as db:
cursor = await db.execute("SELECT COUNT(*) FROM npc_research_reports")
existing_count = (await cursor.fetchone())[0]
if existing_count >= 3:
logger.info(f"🔬 Research bootstrap: {existing_count} reports already exist, skipping")
return
need = max(3, 3 - existing_count)
logger.info(f"🔬 Research bootstrap: {existing_count} reports found, generating {need}+ immediately...")
cursor = await db.execute("""
SELECT a.agent_id, a.username, a.ai_identity, a.mbti, a.gpu_dollars,
COALESCE((SELECT COUNT(*) FROM npc_positions p WHERE p.agent_id=a.agent_id AND p.status IN ('closed','liquidated')), 0) as total_trades,
COALESCE((SELECT COUNT(*) FROM npc_positions p WHERE p.agent_id=a.agent_id AND p.status IN ('closed','liquidated') AND p.profit_gpu > 0), 0) as wins,
COALESCE((SELECT SUM(p.profit_gpu) FROM npc_positions p WHERE p.agent_id=a.agent_id AND p.status IN ('closed','liquidated')), 0) as total_pnl
FROM npc_agents a WHERE a.is_active=1
ORDER BY RANDOM() LIMIT 30
""")
all_npcs = await cursor.fetchall()
if not all_npcs:
logger.warning("🔬 Research bootstrap: No active NPCs found!")
return
all_tickers = [t['ticker'] for t in ALL_TICKERS]
random.shuffle(all_tickers)
generated = 0; npc_idx = 0; ticker_idx = 0
while generated < need and ticker_idx < len(all_tickers) and npc_idx < len(all_npcs):
npc = all_npcs[npc_idx % len(all_npcs)]
agent_id, username, identity, mbti = npc[0], npc[1], npc[2], npc[3]
total_trades, wins, total_pnl = npc[5] or 0, npc[6] or 0, npc[7] or 0
win_rate = round(wins / total_trades * 100) if total_trades > 0 else random.randint(45, 65)
if total_trades == 0: total_trades = random.randint(5, 15)
ticker = all_tickers[ticker_idx]
cursor = await db.execute(
"SELECT price, change_pct, rsi, pe_ratio, from_high, market_cap FROM market_prices WHERE ticker=?",
(ticker,))
mrow = await cursor.fetchone()
price = mrow[0] if mrow and mrow[0] else 100
change_pct = (mrow[1] if mrow and mrow[1] else 0) or 0
rsi = (mrow[2] if mrow and mrow[2] else 50) or 50; pe = (mrow[3] if mrow and mrow[3] else 0) or 0
from_high = (mrow[4] if mrow and mrow[4] else 0) or 0; mcap = (mrow[5] if mrow and mrow[5] else 0) or 0
t_info = next((t for t in ALL_TICKERS if t['ticker'] == ticker), {'name': ticker})
style_info = IDENTITY_RESEARCH_STYLE.get(identity, {'style': 'General', 'tone': 'balanced'})
report = build_research_report(
agent_id, username, identity, mbti, ticker, t_info.get('name', ticker),
price, change_pct, rsi, pe, from_high, mcap,
win_rate, total_trades, total_pnl, style_info)
if report:
rid = await save_research_report(db_path, report)
generated += 1
logger.info(f"🔬 Bootstrap [{generated}/{need}]: {username}{ticker} (Grade: {report.get('quality_grade','C')})")
npc_idx += 1; ticker_idx += 1
logger.info(f"🔬 Research bootstrap Phase 1 complete: {generated} reports created immediately")
remaining_tickers = all_tickers[ticker_idx:]
if remaining_tickers:
asyncio.create_task(background_research_fill(db_path, remaining_tickers, all_npcs, npc_idx))
except Exception as e:
logger.error(f"🔬 Research bootstrap error: {e}")
import traceback; traceback.print_exc()
async def background_research_fill(db_path: str, tickers, npcs, start_npc_idx):
"""★ 나머지 티커에 대해 백그라운드로 리서치 리포트 순차 생성"""
await asyncio.sleep(10)
generated = 0
for i, ticker in enumerate(tickers):
try:
npc = npcs[(start_npc_idx + i) % len(npcs)]
agent_id, username, identity, mbti = npc[0], npc[1], npc[2], npc[3]
total_trades, wins, total_pnl = npc[5] or 0, npc[6] or 0, npc[7] or 0
win_rate = round(wins / total_trades * 100) if total_trades > 0 else random.randint(45, 65)
if total_trades == 0: total_trades = random.randint(5, 15)
async with get_db(db_path) as db:
cursor = await db.execute("SELECT COUNT(*) FROM npc_research_reports WHERE ticker=?", (ticker,))
if (await cursor.fetchone())[0] > 0: continue
cursor = await db.execute(
"SELECT price, change_pct, rsi, pe_ratio, from_high, market_cap FROM market_prices WHERE ticker=?",
(ticker,))
mrow = await cursor.fetchone()
price = mrow[0] if mrow and mrow[0] else 100
change_pct = (mrow[1] if mrow and mrow[1] else 0) or 0
rsi = (mrow[2] if mrow and mrow[2] else 50) or 50; pe = (mrow[3] if mrow and mrow[3] else 0) or 0
from_high = (mrow[4] if mrow and mrow[4] else 0) or 0; mcap = (mrow[5] if mrow and mrow[5] else 0) or 0
t_info = next((t for t in ALL_TICKERS if t['ticker'] == ticker), {'name': ticker})
style_info = IDENTITY_RESEARCH_STYLE.get(identity, {'style': 'General', 'tone': 'balanced'})
report = build_research_report(
agent_id, username, identity, mbti, ticker, t_info.get('name', ticker),
price, change_pct, rsi, pe, from_high, mcap,
win_rate, total_trades, total_pnl, style_info)
if report:
rid = await save_research_report(db_path, report)
generated += 1
logger.info(f"🔬 Background fill [{generated}]: {username}{ticker}")
await asyncio.sleep(8)
except Exception as e:
logger.warning(f"Background research fill error for {ticker}: {e}")
logger.info(f"🔬 Research background fill complete: {generated} additional reports created")
# ─────────────────────────────────────────────
# Part 8: NPC Comment/Chat Replies to Users
# ─────────────────────────────────────────────
async def generate_npc_comment_replies(db_path: str, groq_api_key: str,
post_id: int, post_title: str, post_content: str,
user_comment: str, user_name: str, parent_id: int):
"""유저 댓글에 NPC 1~5명 대댓글 — AETHER-Lite 메타인지 적용"""
try:
ai = GroqAIClient(groq_api_key)
npc_count = random.randint(1, 5)
fact_context = ""
if needs_fact_check(user_comment):
fc = await quick_brave_verify(user_comment[:150])
if fc['sources']:
fact_context = ' | '.join(fc['sources'][:2])
post_author_identity = ''
async with get_db(db_path) as db:
pac = await db.execute("SELECT a.ai_identity FROM posts p JOIN npc_agents a ON p.author_agent_id=a.agent_id WHERE p.id=?", (post_id,))
par = await pac.fetchone()
if par: post_author_identity = par[0] or ''
cursor = await db.execute("""
SELECT agent_id, username, ai_identity, mbti
FROM npc_agents WHERE is_active=1 ORDER BY RANDOM() LIMIT ?
""", (npc_count,))
npcs = await cursor.fetchall()
for npc in npcs:
agent_id, npc_name, identity, mbti = npc
try:
meta = build_metacognition_prompt(identity, post_author_identity, 'reply', fact_context)
persona = MBTI_PERSONAS.get(mbti, MBTI_PERSONAS['INTJ'])
prompt = f"""You are {npc_name}, a {identity} ({mbti}) commenter in a trading/tech community.
Personality: {persona['tone']}
Post title: "{post_title[:100]}"
User @{user_name} commented: "{user_comment[:300]}"
{meta}
Write a short reply (1-3 sentences). Be opinionated and stay in character.
RULES:
- Do NOT make up statistics or facts you cannot verify.
- If you challenge a claim, explain WHY with reasoning.
- If the user wrote in Korean, reply in Korean. If in English, reply in English.
Reply ONLY with the message text."""
reply = await ai.create_chat_completion(
[{"role": "user", "content": prompt}], max_tokens=256, temperature=0.9)
if reply:
reply = reply.strip()[:400]
await db.execute(
"INSERT INTO comments (post_id, author_agent_id, content, parent_comment_id) VALUES (?,?,?,?)",
(post_id, agent_id, reply, parent_id))
await db.execute("UPDATE posts SET comment_count=comment_count+1 WHERE id=?", (post_id,))
await db.commit()
await asyncio.sleep(random.uniform(1, 4))
except Exception as e:
logger.warning(f"NPC comment reply error ({npc_name}): {e}")
except Exception as e:
logger.error(f"NPC comment replies error: {e}")
async def generate_npc_chat_replies_to_user(db_path: str, groq_api_key: str,
user_message: str, user_username: str,
user_msg_id: int, npcs: list):
"""유저 메시지에 대해 NPC들이 캐릭터에 맞는 반응 생성"""
try:
ai = GroqAIClient(groq_api_key)
async with get_db(db_path) as db:
for npc in npcs:
agent_id, npc_username, identity, mbti = npc
try:
prompt = f"""You are {npc_username}, an NPC trader with {identity} personality and {mbti} MBTI type in a trading community chat.
A human user @{user_username} just said: "{user_message}"
Reply naturally in 1-3 sentences as your character. Be engaging, opinionated, and stay in character.
If the user wrote in Korean, reply in Korean. If in English, reply in English.
Reply ONLY with the message text, nothing else."""
reply = await ai.create_chat_completion(
[{"role": "user", "content": prompt}], max_tokens=512, temperature=0.9)
if reply:
reply = reply.strip()[:500]
await db.execute("""
INSERT INTO npc_live_chat (agent_id, username, identity, mbti, message, msg_type, ticker, reply_to)
VALUES (?,?,?,?,?,?,?,?)
""", (agent_id, npc_username, identity, mbti, reply, "reply", "", user_msg_id))
await db.commit()
await asyncio.sleep(random.uniform(2, 5))
except Exception as e:
logger.warning(f"NPC reply error ({npc_username}): {e}")
except Exception as e:
logger.error(f"NPC reply generation error: {e}")