import os import sys import traceback import re import shutil import subprocess import datetime import json import time import asyncio import threading import requests # 누락된 라이브러리 추가 # 서버 시작 시간 기록 SERVER_START_TIME = time.time() # [디버그] 파일이 실행되자마자 출력 try: sys.stderr.reconfigure(encoding='utf-8') sys.stdout.reconfigure(encoding='utf-8') except: pass print("DEBUG: server.py 파일이 실행되었습니다.", file=sys.stderr, flush=True) # 에러 발생 시 창이 바로 꺼지지 않게 하기 위한 안전장치 try: def log(msg): timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") print(f"[{timestamp}] {msg}", file=sys.stderr, flush=True) try: with open("server_log.txt", "a", encoding="utf-8") as f: f.write(f"[{timestamp}] {msg}\n") except: pass def auto_save_safety_backup(): """핵심 파일(대화 기록)을 안전 구역에 실시간으로 개별 보관합니다.""" try: # Cross-platform safety backup path safety_root = os.getenv("SAFETY_BACKUP_DIR", os.path.join(os.path.expanduser("~"), "Gemini_Safety_Backup")) if not os.path.exists(safety_root): os.makedirs(safety_root) ts = datetime.datetime.now().strftime("%Y%m%d_%H%M") src = "chat_history.json" if os.path.exists(os.path.join("..", src)): src = os.path.join("..", src) if os.path.exists(src): dst = os.path.join(safety_root, f"chat_history_{ts}.json") shutil.copy2(src, dst) src_env = ".env" if os.path.exists(os.path.join("..", ".env")): src_env = os.path.join("..", ".env") if os.path.exists(src_env): shutil.copy2(src_env, os.path.join(safety_root, f".env_{ts}")) return True except: pass return False def log_shared_chat(role, content, model_type="unknown"): """모든 IDE에서 공유되는 통합 챗 로그를 기록합니다.""" try: # Use relative path or env var for shared chat history actual_path = os.getenv("SHARED_CHAT_PATH", "shared_chat_history.json") history = [] if os.path.exists(actual_path): with open(actual_path, "r", encoding="utf-8") as f: try: history = json.load(f) except: history = [] entry = { "timestamp": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "ide": os.getenv("IDE_NAME", "Gemini_Master_Node"), "model": model_type, "role": role, "content": content[:1500] # 최적화: 1500자 (가독성과 속도의 황금비율) } history.append(entry) if len(history) > 100: history = history[-100:] with open(actual_path, "w", encoding="utf-8") as f: json.dump(history, f, ensure_ascii=False, indent=2) except Exception as e: log(f"⚠️ 통합 로그 기록 실패: {e}") log(f"📂 현재 위치: {os.getcwd()}") log("🧩 [Expert] SWE-bench 및 자율 에이전트 도구 지원 모드 활성화") # [Cloud Key Loader] # GLOBAL_CONFIG.env 및 .env에서 모든 클라우드 키를 로드하여 OpenRouter/LiteLLM/Render/Colab을 활성화합니다. from dotenv import load_dotenv def discover_and_load_envs(): """프로젝트 루트 및 상위 폴더에서 .env 파일들을 찾아 로드합니다.""" current = os.path.dirname(os.path.abspath(__file__)) loaded = [] # 상위 4단계까지 찾아봄 for _ in range(4): for env_name in [".env", "GLOBAL_CONFIG.env"]: p = os.path.join(current, env_name) if os.path.exists(p): load_dotenv(p, override=True) loaded.append(p) current = os.path.dirname(current) return loaded envs = discover_and_load_envs() log(f"✅ 설정 파일 로드 완료: {len(envs)}개 파일 발견") # API Keys Check & Status Report REQUIRED_KEYS = ["OPENROUTER_API_KEY", "RENDER_API_KEY", "SUPABASE_KEY", "HUGGINGFACE_TOKEN"] active_keys = [k for k in REQUIRED_KEYS if os.getenv(k)] log(f"🔑 [SYSTEM] Active Cloud Keys: {len(active_keys)}/{len(REQUIRED_KEYS)} Loaded.") # LiteLLM Config & Fallback Strategy os.environ["OPENROUTER_API_KEY"] = os.getenv("OPENROUTER_API_KEY", "") os.environ["HUGGINGFACE_API_KEY"] = os.getenv("HUGGINGFACE_TOKEN", "") # LiteLLM 호환용 os.environ["GROQ_API_KEY"] = os.getenv("GROQ_API_KEY", "") # [STRATEGY] Multi-Layer Defense Grid (Main -> Backup -> Last Resort) # 1. Main: Groq (Speed) # 2. Backup 1: OpenRouter (Variety) # 3. Backup 2: HuggingFace Inference API (Free Tier) # 4. Last Resort: Local Ollama (Privacy/Offline) SYSTEM_MODEL_MAP = { "fast": ["groq/llama3-8b-8192", "openrouter/meta-llama/llama-3-8b-instruct", "ollama/llama3"], "smart": ["groq/llama3-70b-8192", "openrouter/meta-llama/llama-3-70b-instruct", "ollama/qwen:14b"], "coding": ["groq/gemma2-9b-it", "openrouter/google/gemma-2-9b-it", "ollama/codegemma"] } log("🔄 [1단계] 라이브러리 로딩 중...") # from dotenv import load_dotenv # Moved to Cloud Key Loader from fastmcp import FastMCP try: import litellm from litellm import completion except ImportError: log("⚠️ litellm missing, installing...") subprocess.check_call([sys.executable, "-m", "pip", "install", "litellm"]) from litellm import completion log("🔄 [단계] DuckDuckGo & Tavily 로딩 중...") try: from duckduckgo_search import DDGS except ImportError: DDGS = None try: from tavily import TavilyClient except ImportError: TavilyClient = None from fastapi import FastAPI, Request from fastapi.middleware.cors import CORSMiddleware import uvicorn # [NEW] Learning Engine Import import learning_engine # [NEW] Rules Manager RULES_DIR = os.path.join(os.path.dirname(__file__), "Rules") if not os.path.exists(RULES_DIR): os.makedirs(os.path.join(RULES_DIR, "Trading")) os.makedirs(os.path.join(RULES_DIR, "Indicators")) os.makedirs(os.path.join(RULES_DIR, "Patterns")) # [NEW] SYSTEM BLACKBOX INIT try: project_root = os.path.dirname(os.path.dirname(os.path.dirname(__file__))) sys.path.append(project_root) # 루트 경로 추가 import system_blackbox log("📼 [BLACKBOX] Flight Recorder Armed & Recording...") except ImportError: log("⚠️ [BLACKBOX] Recorder not found, flying without blackbox.") except Exception as e: print(f"\n❌ [오류] 라이브러리 로딩 중 치명적 오류: {e}") traceback.print_exc() sys.exit(1) # FastAPI 서버 인스턴스 app = FastAPI(title="Gemini Master Hub API") app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"], ) # MCP 서버 인스턴스 mcp = FastMCP("Gemini Server") try: # [이미 90번 라인 근처에서 수행됨] log(f"✅ 환경 설정 최적화 완료") # API 키 수집 # API 키 수집 (환경 변수 및 로드된 파일 기반) api_inventory = {"google": [], "xai": [], "openai": [], "perplexity": [], "groq": [], "anthropic": [], "openrouter": []} # 90번 라인 근처에서 로드된 envs 리스트 활용 (envs 변수는 함수 내부에 있어서 접근 불가할 수 있으므로, 재탐색) loaded_envs_files = [os.path.join(os.getcwd(), f) for f in [".env", "GLOBAL_CONFIG.env"] if os.path.exists(f)] for e_path in loaded_envs_files: try: with open(e_path, "r", encoding="utf-8") as f: content = f.read() except: continue for k in api_inventory.keys(): if k == "google": api_inventory[k] += re.findall(r'AIza[0-9A-Za-z\-_]{30,}', content) elif k == "openrouter": api_inventory[k] += re.findall(r'OPENROUTER_API_KEY=(sk-or-v1-[0-9A-Za-z\-_]+)', content) else: prefix = {"xai": "xai-", "openai": "sk-", "groq": "gsk_"}.get(k, "") if prefix: api_inventory[k] += re.findall(rf'{k.upper()}_API_KEY=({prefix}[0-9A-Za-z\-_]+)', content) google_keys = list(set(api_inventory["google"])) xai_keys = list(set(api_inventory["xai"])) log(f"✅ AI 자원 확보: Google({len(google_keys)}), Grok({len(xai_keys)})") # --- FastAPI 엔드포인트 --- @app.post("/api/generate_rules_by_keyword") async def api_generate_rules(request: Request): try: data = await request.json() kw = data.get("keyword", "AUTO") model = data.get("model", "gemini-2.0-flash") category = data.get("category", "Trading") # Trading, Indicators, Patterns log(f"🏭 규칙 생성 요청: {kw} (Model: {model}, Category: {category})") # 실제 AI 호출 로직 prompt = f"'{kw}'에 대한 전문적인 트레이딩 규칙을 JSON 형식으로 생성해줘. 포함할 내용: 진입조건(entry_conditions), 청산조건(exit_conditions), 손절가(stop_loss), 익절가(take_profit), 주의사항(risk_factors)." result = ask_any_model(prompt, model) # JSON 파싱 시도 try: # 마크다운 코드 블록 제거 clean_json = re.sub(r'```json\s*|\s*```', '', result).strip() rule_data = json.loads(clean_json) # 파일 저장 filename = f"{kw.replace(' ', '_')}_Rules.json" filepath = os.path.join(RULES_DIR, category, filename) with open(filepath, "w", encoding="utf-8") as f: json.dump(rule_data, f, indent=2, ensure_ascii=False) log(f"✅ 규칙 파일 생성 완료: {filepath}") return {"status": "success", "message": f"규칙 생성 완료: {filename}", "data": rule_data} except Exception as e: log(f"⚠️ JSON 변환 실패, 텍스트로 저장: {e}") # 텍스트로 백업 저장 filepath = os.path.join(RULES_DIR, category, f"{kw.replace(' ', '_')}_Raw.md") with open(filepath, "w", encoding="utf-8") as f: f.write(result) return {"status": "partial_success", "message": "JSON 파싱 실패, 텍스트로 저장됨", "data": result} except Exception as e: return {"status": "error", "message": str(e)} @app.get("/api/get_file_content") async def api_get_file(path: str): """파일 내용 읽기 (보안을 위해 Gemini_Project 내부 경로만 허용)""" try: # Root relative path normalization project_root = os.path.dirname(os.path.dirname(os.path.dirname(__file__))) # If path is direct absolute path, keep it if it's within root if ":" not in path: full_path = os.path.join(project_root, path) else: full_path = path if os.path.exists(full_path): with open(full_path, "r", encoding="utf-8") as f: return {"status": "success", "content": f.read()} return {"status": "error", "message": f"File not found: {path}"} except Exception as e: return {"status": "error", "message": str(e)} @app.post("/api/save_custom_file") async def api_save_file(request: Request): try: data = await request.json() path = data.get("path") content = data.get("content") if not path.startswith("C:"): project_root = os.path.dirname(os.path.dirname(os.path.dirname(__file__))) full_path = os.path.join(project_root, path) else: full_path = path os.makedirs(os.path.dirname(full_path), exist_ok=True) with open(full_path, "w", encoding="utf-8") as f: f.write(content) return {"status": "success", "message": f"Saved to {path}"} except Exception as e: return {"status": "error", "message": str(e)} @app.post("/api/execute_command") async def api_run_cmd(request: Request): """커맨드 실행 (VPN 시작, 시스템 재시작 등)""" try: data = await request.json() cmd = data.get("command") cwd = data.get("cwd", ".") project_root = os.path.dirname(os.path.dirname(os.path.dirname(__file__))) full_cwd = os.path.join(project_root, cwd) # Use subprocess.Popen to not block the server subprocess.Popen(f"start cmd /k {cmd}", cwd=full_cwd, shell=True) return {"status": "success", "message": f"Command '{cmd}' executed in {cwd}"} except Exception as e: return {"status": "error", "message": str(e)} # [NEW] Learning Engine APIs @app.post("/api/trade/record") async def api_record_trade(request: Request): """매매 진입 기록""" try: data = await request.json() res = learning_engine.record_trade( data.get("symbol"), data.get("action"), data.get("reason"), data.get("price", "Market") ) return {"status": "success", "message": res} except Exception as e: return {"status": "error", "message": str(e)} @app.post("/api/trade/feedback") async def api_trade_feedback(request: Request): """매매 결과 및 피드백 저장""" try: data = await request.json() res = learning_engine.update_trade_result( data.get("symbol"), data.get("result"), # WIN / LOSS data.get("feedback") ) return {"status": "success", "message": res} except Exception as e: return {"status": "error", "message": str(e)} @app.get("/api/trade/lessons") async def api_get_lessons(symbol: str = "ALL"): """매매 교훈 조회""" try: res = learning_engine.get_trading_lessons(symbol) return {"status": "success", "data": res} except Exception as e: return {"status": "error", "message": str(e)} @app.get("/api/stats") async def get_brain_stats(): """트레이딩 브레인의 현재 실시간 상태(IQ, 규칙 수) 반환""" try: # 1. 규칙 수 계산 rule_dir = "04_SYNC_DATA/LEARNED_RULES" rules = [f for f in os.listdir(rule_dir) if f.endswith('.json')] if os.path.exists(rule_dir) else [] # 2. 타겟 자산 확인 target_file = "04_SYNC_DATA/TARGET_ASSETS.json" targets = [] if os.path.exists(target_file): with open(target_file, "r") as f: targets = json.load(f) return { "status": "online", "brain_iq": len(rules) * 15 + 100, # 단순 시각화용 IQ 계산 "total_rules": len(rules), "active_targets": [t.get('symbol') for t in targets], "last_update": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") } except Exception as e: return {"status": "error", "message": str(e)} @app.post("/api/chat") async def api_chat(request: Request): try: data = await request.json() msg = data.get("message", "") model = data.get("model", "gemini-2.0-flash") mode = data.get("mode", "chat") # chat, trade, search, app if not msg: return {"status": "error", "message": "No message provided"} # Mode-specific logic system_prompt = "You are Gemini Master, the supreme orchestrator of all integrated platforms (Firebase, Vercel, Supabase, Notion, etc.)." if mode == "trade": system_prompt = "You are the Trading Sentinel. Accessing live market data and Supabase logs." # 실시간 가격 정보 자동 주입 서비스 (예시: BTC) prices = get_crypto_price("BTC") wisdom = learning_engine.get_trading_lessons("ALL") msg = f"[Live Market]: {prices}\n[Context: Trading Wisdom]\n{wisdom}\n\n[User Request]: {msg}" elif mode == "search": system_prompt = "You are the Quantum Search engine. Utilizing DuckDuckGo, Tavily, and Google Search." search_results = web_search_ddg(msg) msg = f"[Web Search Results]:\n{search_results}\n\n[User Request]: {msg}" elif mode == "app": app_id = data.get("app_id", "general") app_prompts = { "coder": "You are a Senior Software Engineer specializing in GitHub Actions, Codespaces, and Cursor SSH.", "analyst": "You are a Market Analyst. Use Supabase data and exchange APIs for real-time insights.", "notion": "You are a Knowledge Manager. Help organize thoughts into Notion databases and rules.", "cloud_master": "You are the Cloud Architect. Manage Vercel, Render, and Firebase deployments.", "trader_alpha": "You are an Elite Algorithmic Trader. Specialized in high-frequency trading rules." } # Context enhancement for Analyst if app_id == "analyst": news = get_market_news("global market") # Load Auto-Generated Rules try: import glob rule_files = glob.glob("04_SYNC_DATA/LEARNED_RULES/*.json") loaded_rules = [] for rf in rule_files[-3:]: # Load latest 3 rules with open(rf, "r", encoding="utf-8") as f: loaded_rules.append(json.load(f)) rules_context = json.dumps(loaded_rules, ensure_ascii=False, indent=1) except: rules_context = "No rules yet." msg = f"[Context: Real-time News]\n{news}\n\n[Context: Auto-Generated Rules]\n{rules_context}\n\n[User Request]: {msg}" system_prompt = app_prompts.get(app_id, "You are a specialized AI assistant.") # Multi-Brain Switching Logic (LiteLLM) response = ask_any_model(msg, model) return {"status": "success", "response": response, "mode": mode} except Exception as e: return {"status": "error", "message": str(e)} @app.get("/api/apps/list") async def api_list_apps(): """지원되는 AI 앱/모드 리스트 반환""" return { "status": "success", "apps": [ {"id": "chat", "name": "Standard Chat", "icon": "💬", "desc": "Universal Intelligence"}, {"id": "trade", "name": "Trading Sentinel", "icon": "📈", "desc": "Market & Exchange Hub"}, {"id": "search", "name": "Quantum Search", "icon": "🔍", "desc": "Tavily + DDG Global Search"}, {"id": "app", "name": "App: Cloud Architect", "icon": "☁️", "app_id": "cloud_master"}, {"id": "app", "name": "App: Code Expert (SSH)", "icon": "💻", "app_id": "coder"}, {"id": "app", "name": "App: Notion Agent", "icon": "📓", "app_id": "notion"}, {"id": "app", "name": "App: Market Analyst", "icon": "📊", "app_id": "analyst"}, {"id": "app", "name": "App: Elite Trader", "icon": "🦅", "app_id": "trader_alpha"} ] } @app.get("/api/debate/suggest") async def api_debate_suggest(): """딥마인드 브레인이 현재 프로젝트/시장 상황에 맞는 토론 주제를 제안""" prompt = "현재 인공지능 트레이딩 시스템을 구축 중이야. 우리 '딥마인드 위원회(The Council)'가 토론할 만한 아주 날카롭고 혁신적인 주제 3가지만 제안해줘. (예: 기술적 분석 vs 온체인 데이터, 퀀트 전략의 한계 등)" try: # 빠른 답변을 위해 gemini-2.0-flash 사용 suggestion = ask_any_model(prompt, "gemini-2.0-flash") return {"status": "success", "suggestions": suggestion} except: return {"status": "error", "message": "Failed to generate suggestions"} @app.get("/api/insights/latest") async def api_get_insights(refresh: bool = False): """자율분석 에이전트의 최신 인사이트 반환""" from .autonomous_analyst import generate_autonomous_insight, INSIGHTS_FILE if refresh or not os.path.exists(INSIGHTS_FILE): insight = generate_autonomous_insight() if not insight: return {"status": "error", "message": "Analysis failed"} return {"status": "success", "data": insight} try: with open(INSIGHTS_FILE, "r", encoding="utf-8") as f: return {"status": "success", "data": json.load(f)} except: return {"status": "error", "message": "Read error"} # --- MCP 도구 (통합 지능) --- @mcp.tool() def get_shared_history(limit: int = 15) -> str: """[통합 기억장치] 다른 IDE나 다른 시간대에 나눈 모든 채팅 기록을 불러옵니다. '하나의 뇌'처럼 작동하게 해줍니다.""" log_path = "shared_chat_history.json" if not os.path.exists(log_path) and os.path.exists(os.path.join("..", log_path)): log_path = os.path.join("..", log_path) if not os.path.exists(log_path): return "기록이 아직 없습니다. 대화를 시작하면 자동으로 이곳에 모입니다." try: with open(log_path, "r", encoding="utf-8") as f: history = json.load(f) recent = history[-limit:] return "\n".join([f"[{e['timestamp']}] {e['role']} ({e['ide']}): {e['content']}" for e in recent]) except: return "로그를 읽는 데 실패했습니다." @mcp.tool() def list_supported_models() -> str: """[📋모델 리스트] 현재 시스템에서 사용 가능한 모든 AI 모델 목록을 보여줍니다.""" model_list = [ "--- Google Gemini ---", "gemini-2.0-flash, gemini-2.0-pro, gemini-1.5-pro, gemini-2.5-alpha", "", "--- Anthropic Claude ---", "claude-3.5-sonnet, claude-3.5-haiku, claude-3-opus", "", "--- OpenAI GPT ---", "gpt-4o, gpt-4o-mini, o1-preview, o1-mini", "", "--- Perplexity (Search) ---", "perplexity-sonar, perplexity-reasoning", "", "--- Groq (Llama, Fast) ---", "llama-3.3-70b, llama-3.1-70b, mixtral-8x7b", "", "--- Hugging Face (Serverless) ---", "hf/meta-llama/Llama-2-7b-hf, hf/mistralai/Mistral-7B-v0.1", "", "--- Other ---", "grok, deepseek-v3" ] return "\n".join(model_list) @mcp.tool() def ask_any_model(question: str, model_type: str = "gemini-2.0-flash") -> str: """[🧠올인원 브레인] 전 세계 모든 AI 모델을 호출합니다. 지원: gemini(2.0, 1.5), claude(3.5, Opus), gpt(4o, o1), grok, perplexity(sonar), llama(groq)""" log_shared_chat("user", question, model_type) # [REFINED] Authorized Model Pool # Optimized for OpenRouter fallback to ensure high availability model_map = { # --- 5 IDE-Tier Models --- "gemini-2.0-flash": "gemini/gemini-2.0-flash", "gemini-2.0-pro": "gemini/gemini-2.0-pro-exp", "gpt-4o": "openrouter/openai/gpt-4o", "claude-3.5-sonnet": "openrouter/anthropic/claude-3.5-sonnet", "gpt-4-turbo": "openrouter/openai/gpt-4-turbo", # --- 3 Specialized Claude Models --- "claude-3.5-haiku": "openrouter/anthropic/claude-3.5-haiku", "claude-3-opus": "openrouter/anthropic/claude-3-opus", # --- Grok & Others --- "grok": "openrouter/x-ai/grok-2", # --- Perplexity (Search AI) --- "perplexity-sonar": "openrouter/perplexity/sonar", # --- Groq (Fast Llama) --- "llama-3.3-70b": "groq/llama-3.3-70b-versatile" } model_id = model_map.get(model_type, model_type) provider = model_id.split('/')[0] if '/' in model_id else "unknown" # 1. Inject Master Context (Real-time Learning from Files) context_prompt = "" try: project_root = os.path.dirname(os.path.dirname(os.path.dirname(__file__))) rules_path = os.path.join(project_root, "04_SYNC_DATA", "Master_Rules.json") if os.path.exists(rules_path): with open(rules_path, "r", encoding="utf-8") as f: rules = json.load(f) context_prompt += f"\n[Master Rules]: {json.dumps(rules, ensure_ascii=False)}" except: pass full_question = f"{context_prompt}\n\nCurrent User Request: {question}" # 플랫폼별 API 키 선택 및 OpenRouter 자동 폴백 keys = api_inventory.get(provider, []) # OpenRouter 전용 처리 if provider == "openrouter" or (not keys and provider != "gemini"): or_key = os.getenv("OPENROUTER_API_KEY") if or_key: keys = [or_key] if not model_id.startswith("openrouter/"): # Fallback mapping for OpenRouter or_map = { "anthropic": "openrouter/anthropic/", "openai": "openrouter/openai/", "xai": "openrouter/x-ai/", "google": "openrouter/google/" } if provider in or_map: model_id = model_id.replace(f"{provider}/", or_map[provider]) else: model_id = f"openrouter/{model_id}" if not keys and provider == "gemini": keys = google_keys # openrouter 특정 모델 처리 if model_type.startswith("or/"): model_id = "openrouter/" + model_type[3:] keys = api_inventory.get("openrouter", [os.getenv("OPENROUTER_API_KEY")]) # 호출 시도 (최대 토큰 제한으로 쿼터 보호) for k in (keys if keys else [""]): try: r = completion( model=model_id, messages=[ {"role": "system", "content": "You are Gemini Master, an autonomous AI system. You can save files by starting your response with 'SAVE_FILE:[path]' followed by the content. Example: 'SAVE_FILE:Rules/new_rule.json\n{...}'"}, {"role": "user", "content": full_question} ], api_key=k if k else None, timeout=30, max_tokens=1000 # 최적화: 1000토큰 (기술 답변 짤림 방지) ) ans = r.choices[0].message.content # [Smart Agent Interaction] if ans.startswith("SAVE_FILE:"): try: file_info = ans.split("\n", 1) header = file_info[0] content = file_info[1] if len(file_info) > 1 else "" target_path = header.replace("SAVE_FILE:", "").strip() # Project Root normalization project_root = os.path.dirname(os.path.dirname(os.path.dirname(__file__))) full_save_path = os.path.join(project_root, target_path) os.makedirs(os.path.dirname(full_save_path), exist_ok=True) with open(full_save_path, "w", encoding="utf-8") as f: f.write(content) ans = f"✅ [AGENT] File saved to: {target_path}\n\n" + ans except Exception as fe: ans = f"⚠️ [AGENT] File save failed: {fe}\n\n" + ans log_shared_chat("assistant", ans, model_type) auto_save_safety_backup() return ans except Exception as e: log(f"⚠️ {model_id} 실패: {str(e)[:50]}") continue return f"❌ {model_type} 호출 실패. API 키를 확인하시거나 다른 모델을 시도해 주세요." @mcp.tool() def web_search_ddg(query: str) -> str: """[🔍DuckDuckGo 검색] 실시간 웹 정보를 검색합니다.""" try: with DDGS() as ddgs: results = [r for r in ddgs.text(query, max_results=5)] if not results: return "검색 결과가 없습니다." return "\n\n".join([f"[{r['title']}]\n{r['body']}\n({r['href']})" for r in results]) except Exception as e: return f"검색 중 오류 발생: {e}" @mcp.tool() def web_search_tavily(query: str) -> str: """[🧠Tavily AI 검색] AI 최적화 웹 검색을 수행합니다 (API 키 필요).""" tav_key = os.getenv("TAVILY_API_KEY") if not tav_key: return "TAVILY_API_KEY가 설정되지 않았습니다." try: client = TavilyClient(api_key=tav_key) result = client.search(query, search_depth="advanced") if not result.get('results'): return "검색 결과가 없습니다." return "\n\n".join([f"[{r['title']}]\n{r['content']}\n({r['url']})" for r in result['results'][:5]]) except Exception as e: return f"Tavily 검색 중 오류 발생: {e}" @mcp.tool() def read_file(path: str) -> str: """파일 내용을 읽습니다.""" try: with open(path, "r", encoding="utf-8") as f: return f.read() except Exception as e: return str(e) @mcp.tool() def write_file(path: str, content: str) -> str: """파일을 저장합니다.""" try: with open(path, "w", encoding="utf-8") as f: f.write(content); return f"✅ 저장 완료: {path}" except Exception as e: return str(e) # --- [NEW] AI Self-Learning Tools (RAG Feedback Loop) --- @mcp.tool() def record_new_trade(symbol: str, action: str, reason: str) -> str: """[학습:기록] 새로운 매매 진입을 기록합니다. 예: record_new_trade('BTC', 'BUY', 'RSI 30 touched')""" return learning_engine.record_trade(symbol, action, reason) @mcp.tool() def update_trade_outcome(symbol: str, result: str, feedback: str) -> str: """[학습:피드백] 매매 결과를 평가합니다. 예: update_trade_outcome('BTC', 'WIN', 'Good RSI signal')""" return learning_engine.update_trade_result(symbol, result, feedback) @mcp.tool() def get_market_wisdom(symbol: str = "ALL") -> str: """[학습:회상] 과거 매매 기록에서 교훈을 얻습니다.""" return learning_engine.get_trading_lessons(symbol) @mcp.tool() def get_crypto_price(symbol: str = "BTC") -> str: """[📈실시간 시세] 업비트(KRW) 및 바이낸스(USDT) 실시간 가격을 조회합니다. (API 키 불필요)""" results = [] try: # 1. Upbit (Public API) upbit_symbol = f"KRW-{symbol}" r_up = requests.get(f"https://api.upbit.com/v1/ticker?markets={upbit_symbol}", timeout=5) if r_up.status_code == 200: data = r_up.json()[0] results.append(f"Upbit: {data['trade_price']:,} KRW ({data['signed_change_rate']*100:+.2f}%)") except: pass try: # 2. Binance (Public API) bin_symbol = f"{symbol}USDT" r_bin = requests.get(f"https://api.binance.com/api/v3/ticker/price?symbol={bin_symbol}", timeout=5) if r_bin.status_code == 200: data = r_bin.json() results.append(f"Binance: ${float(data['price']):,.2f} USDT") except: pass if not results: return f"❌ {symbol} 시세를 불러올 수 없습니다." return " | ".join(results) @mcp.tool() def get_market_news(query: str = "crypto market") -> str: """[📰뉴스 브리핑] Investing.com, Reuters 등 주요 금융 뉴스를 검색하여 브리핑합니다.""" specialized_query = f"site:investing.com OR site:reuters.com {query} news" return web_search_ddg(specialized_query) @mcp.tool() def update_master_instruction(instruction: str, update_intelligence_state: str = None) -> str: """[💎지휘관 지시] 시스템의 핵심 규칙(Master Rules)에 새로운 지침을 추가하거나 지능 상태를 업데이트합니다.""" # ... (기존 로직 유지) try: from intelligent_asset_manager import IntelligenceAssetManager iam = IntelligenceAssetManager() iam.archive_asset("Command Update", instruction, "System Configuration Pivot", "Direct Master Rules Modification", ["Command", "Settings"]) except: pass try: project_root = os.path.dirname(os.path.dirname(os.path.dirname(__file__))) rules_path = os.path.join(project_root, "04_SYNC_DATA", "Master_Rules.json") rules = {} if os.path.exists(rules_path): with open(rules_path, "r", encoding="utf-8") as f: rules = json.load(f) rules["last_sync"] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") rules["instructions"] = instruction if update_intelligence_state: rules["intelligence_state"] = update_intelligence_state with open(rules_path, "w", encoding="utf-8") as f: json.dump(rules, f, indent=2, ensure_ascii=False) return f"✅ 지휘관 지시 반영 및 자적 자산화 완료." except Exception as e: return f"❌ 실패: {str(e)}" @mcp.tool() def capture_intelligence_asset(title: str, idea: str, value: str, strategy: str, tags: str = "Native") -> str: """[💡지능 자산화] 현재의 중요한 아이디어, 가치, 구현 전략을 지능 자산으로 영구 박제합니다. 윈드서프 내부 대화를 실시간으로 자산화하는 데 사용됩니다.""" try: from intelligent_asset_manager import IntelligenceAssetManager iam = IntelligenceAssetManager() tag_list = [t.strip() for t in tags.split(",")] file_path = iam.archive_asset(title, idea, value, strategy, tag_list) return f"✅ 지능 자산 박제 완료: {os.path.basename(file_path)}" except Exception as e: return f"❌ 자산화 실패: {str(e)}" @mcp.tool() def system_emergency_patch(target_module: str, patch_content: str, reason: str = "Commander Order") -> str: """[⚡긴급 패치] 시스템 핵심 모듈(HEALING_CORE, BOT_CORE 등)에 대한 긴급 코드를 주입합니다. 지휘관님이 '이거 고쳐'라고 하면 안티그래비티가 즉시 이 도구를 사용하여 코드를 수술합니다.""" valid_targets = { "HEALING_CORE": "ANTIGRAVITY_HEALING_CORE.py", "LEARNING_ENGINE": "01_CENTRAL_BRAIN/App/learning_engine.py", "SERVER": "01_CENTRAL_BRAIN/App/server.py" } target_file = valid_targets.get(target_module.upper()) if not target_file: return f"❌ 유효하지 않은 타겟입니다. 가능 목록: {', '.join(valid_targets.keys())}" try: project_root = os.path.dirname(os.path.dirname(os.path.dirname(__file__))) full_path = os.path.join(project_root, target_file) # 백업 생성 ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") backup_path = f"{full_path}.{ts}.bak" shutil.copy2(full_path, backup_path) # 패치 적용 (단순 덮어쓰기 or 스마트 병합 - 현재는 안전을 위해 전체 교체 전용으로 설계) # 주의: patch_content는 전체 파일 내용이어야 함. 부분 수정은 별도 로직 필요. with open(full_path, "w", encoding="utf-8") as f: f.write(patch_content) log(f"⚡ [EMERGENCY PATCH] {target_module} 패치 완료 (Reason: {reason})") return f"✅ {target_module} 긴급 패치 완료. 백업: {os.path.basename(backup_path)}" except Exception as e: return f"❌ 패치 실패: {str(e)}" # [Active Mining Loop] 자율 자산 채굴 스레드 def autonomous_asset_miner(): """1시간마다 프로젝트를 스캔하여 새로운 아이디어/알고리즘을 스스로 자산화합니다.""" while True: try: time.sleep(3600) # 1시간 주기 log("🕵️‍♂️ [MINER] 자율 자산 채굴 스캔 시작...") # 실제 채굴 로직은 IntelligenceAssetManager의 배치 모드를 호출할 예정 except: pass # --- 서버 구동 로직 --- def start_api(): # Render/IDX 등 클라우드 환경에서 포트 바인딩을 위해 0.0.0.0과 PORT 환경변수 사용 port = int(os.getenv("PORT", 8000)) uvicorn.run(app, host="0.0.0.0", port=port, log_level="error") if __name__ == "__main__": log("\n" + "="*50) log("🚀 GEMINI MASTER: UNIFIED BRAIN ONLINE") log(" [API] Dashboard Ready at http://localhost:8000") log(" [MCP] Integrated Mode (Cloud/Local Adaptive)") log("="*50) # [IDX/Cloud/Windsurf 최적화] # Windsurf/Claude는 isatty()가 False이므로 이를 클라우드로 오판하지 않게 수정 is_actually_cloud = os.getenv("IDX_WORKSPACE_ID") or os.getenv("RENDER") or os.getenv("PORT") if is_actually_cloud: log("☁️ 클라우드 환경 감지: FastAPI(HTTP) 모드로 실행합니다.") # [NEW] Cloud Miner Start miner_thread = threading.Thread(target=autonomous_asset_miner, daemon=True) miner_thread.start() start_api() else: log("🏠 로컬/IDE 환경: Stdio(MCP)와 HTTP(API)를 병렬로 실행합니다.") # API 서버를 별도 스레드에서 실행 api_thread = threading.Thread(target=start_api, daemon=True) api_thread.start() # [NEW] Local Miner Start (Active Intelligence) miner_thread = threading.Thread(target=autonomous_asset_miner, daemon=True) miner_thread.start() log("⛏️ [MINER] 자율 가치 채굴 엔진 가동 (Cycle: 1h)") try: # Main 스레드에서 MCP Stdio 서버 실행 (윈드서프 연동 핵심) mcp.run() except (KeyboardInterrupt, SystemExit): log("🛑 시스템 종료 요청을 받았습니다.") except Exception as e: log(f"⚠️ MCP 실행 오류: {e}") except Exception: log("\n❌ [FAIL]:") traceback.print_exc()