|
|
""" |
|
|
AETHER Proto-AGI v2.2 - Core Infrastructure Module |
|
|
AI/데이터 인프라, 유틸리티, 외부 서비스 통합 |
|
|
""" |
|
|
|
|
|
import json |
|
|
import sqlite3 |
|
|
import hashlib |
|
|
import numpy as np |
|
|
import requests |
|
|
from datetime import datetime, timedelta |
|
|
from dataclasses import dataclass, field, asdict |
|
|
from typing import Optional, List, Dict, Any, Generator, Tuple |
|
|
from enum import Enum |
|
|
from pathlib import Path |
|
|
from itertools import product |
|
|
import os |
|
|
import re |
|
|
import random |
|
|
import shutil |
|
|
from urllib.parse import urlparse, urljoin |
|
|
import time as time_module |
|
|
import tempfile |
|
|
|
|
|
try: |
|
|
from bs4 import BeautifulSoup |
|
|
HAS_BS4 = True |
|
|
except ImportError: |
|
|
HAS_BS4 = False |
|
|
print("⚠️ beautifulsoup4 미설치. pip install beautifulsoup4") |
|
|
|
|
|
try: |
|
|
from groq import Groq |
|
|
HAS_GROQ = True |
|
|
except ImportError: |
|
|
HAS_GROQ = False |
|
|
print("⚠️ groq 미설치. pip install groq") |
|
|
|
|
|
try: |
|
|
import PyPDF2 |
|
|
HAS_PYPDF2 = True |
|
|
except ImportError: |
|
|
HAS_PYPDF2 = False |
|
|
print("⚠️ PyPDF2 미설치. pip install PyPDF2") |
|
|
|
|
|
try: |
|
|
from docx import Document as DocxDocument |
|
|
HAS_DOCX = True |
|
|
except ImportError: |
|
|
HAS_DOCX = False |
|
|
print("⚠️ python-docx 미설치. pip install python-docx") |
|
|
|
|
|
|
|
|
|
|
|
# Storage locations: HF Spaces persistent mount, with a local fallback.
PERSISTENT_DIR = "/data"
LOCAL_FALLBACK_DIR = "./data"
BACKUP_INTERVAL_MINUTES = 30  # expected backup cadence (scheduler not visible in this chunk)

# Runtime sizing knobs.
VECTOR_DIM = 384          # embedding dimensionality (consumer not visible here — TODO confirm)
MAX_CONCURRENT_USERS = 10
MAX_QUEUE_SIZE = 30
STATUS_UPDATE_RATE = 10

# Tiered memory retention policy: capacity + time-to-live per tier.
MEMORY_CONFIG = {
    "short_term": {"max_items": 50, "ttl_hours": 24},
    "mid_term": {"max_items": 200, "ttl_days": 30},
    "long_term": {"max_items": 1000, "ttl_days": 365}
}

# Per-element web-search intent: why each of the five elements (오행) searches,
# plus the prefix prepended to its queries.
BRAVE_SEARCH_PURPOSES = {
    "土": {"purpose": "목표 관련 최신 동향 및 전체 맥락", "query_prefix": "latest trends"},
    "金": {"purpose": "팩트체크 및 반론 근거 검증", "query_prefix": "fact check verify"},
    "水": {"purpose": "심층 자료 조사 및 근거 수집", "query_prefix": "research data evidence"},
    "木": {"purpose": "유사 사례 및 영감 소스 탐색", "query_prefix": "innovative examples case study"},
    "火": {"purpose": "구현 방법 및 기술 문서 검색", "query_prefix": "how to implement tutorial"}
}
|
|
|
|
|
COMIC_CSS = """ |
|
|
@import url('https://fonts.googleapis.com/css2?family=Bangers&family=Comic+Neue:wght@400;700&family=Noto+Sans+KR:wght@400;700&display=swap'); |
|
|
.gradio-container { background-color: #FEF9C3 !important; background-image: radial-gradient(#1F2937 1px, transparent 1px) !important; background-size: 20px 20px !important; min-height: 100vh !important; font-family: 'Noto Sans KR', 'Comic Neue', cursive, sans-serif !important; } |
|
|
footer, .footer, .gradio-container footer, .built-with, [class*="footer"], .gradio-footer, a[href*="gradio.app"] { display: none !important; visibility: hidden !important; height: 0 !important; } |
|
|
.header-container { text-align: center; padding: 25px 20px; background: #3B82F6; border: 4px solid #1F2937; border-radius: 12px; margin-bottom: 20px; box-shadow: 8px 8px 0 #1F2937; } |
|
|
.header-title { font-family: 'Bangers', cursive !important; color: #FFF !important; font-size: 3rem !important; text-shadow: 3px 3px 0 #1F2937 !important; letter-spacing: 3px !important; margin: 0 !important; } |
|
|
.header-subtitle { font-family: 'Noto Sans KR', sans-serif !important; font-size: 1.1rem !important; color: #FEF9C3 !important; margin-top: 8px !important; font-weight: 700 !important; } |
|
|
.element-badge { display: inline-block; padding: 8px 16px; border-radius: 20px; font-size: 0.95rem; margin: 4px; font-weight: 700; border: 3px solid #1F2937; box-shadow: 3px 3px 0 #1F2937; font-family: 'Noto Sans KR', sans-serif !important; } |
|
|
.badge-earth { background: linear-gradient(135deg, #D2691E, #8B4513); color: #FFF; } |
|
|
.badge-metal { background: linear-gradient(135deg, #E8E8E8, #C0C0C0); color: #1F2937; } |
|
|
.badge-water { background: linear-gradient(135deg, #00BFFF, #1E90FF); color: #FFF; } |
|
|
.badge-wood { background: linear-gradient(135deg, #32CD32, #228B22); color: #FFF; } |
|
|
.badge-fire { background: linear-gradient(135deg, #FF6347, #FF4500); color: #FFF; } |
|
|
.gr-panel, .gr-box, .gr-form, .block, .gr-group { background: #FFF !important; border: 3px solid #1F2937 !important; border-radius: 8px !important; box-shadow: 5px 5px 0 #1F2937 !important; } |
|
|
.gr-button-primary, button.primary, .gr-button.primary { background: #EF4444 !important; border: 3px solid #1F2937 !important; border-radius: 8px !important; color: #FFF !important; font-family: 'Bangers', cursive !important; font-size: 1.2rem !important; letter-spacing: 2px !important; padding: 12px 24px !important; box-shadow: 4px 4px 0 #1F2937 !important; text-shadow: 1px 1px 0 #1F2937 !important; transition: all 0.2s !important; } |
|
|
.gr-button-primary:hover, button.primary:hover { background: #DC2626 !important; transform: translate(-2px, -2px) !important; box-shadow: 6px 6px 0 #1F2937 !important; } |
|
|
.gr-button-secondary, button.secondary { background: #10B981 !important; border: 3px solid #1F2937 !important; border-radius: 8px !important; color: #FFF !important; font-weight: 700 !important; } |
|
|
textarea, input[type="text"], input[type="number"] { background: #FFF !important; border: 3px solid #1F2937 !important; border-radius: 8px !important; color: #1F2937 !important; font-family: 'Noto Sans KR', sans-serif !important; font-weight: 700 !important; } |
|
|
textarea:focus, input[type="text"]:focus { border-color: #3B82F6 !important; box-shadow: 3px 3px 0 #3B82F6 !important; } |
|
|
.info-box { background: #FACC15 !important; border: 3px solid #1F2937 !important; border-radius: 8px !important; padding: 12px 15px !important; margin: 10px 0 !important; box-shadow: 4px 4px 0 #1F2937 !important; font-family: 'Noto Sans KR', sans-serif !important; font-weight: 700 !important; color: #1F2937 !important; } |
|
|
.final-report-box { background: #ECFDF5 !important; border: 4px solid #10B981 !important; border-radius: 12px !important; box-shadow: 6px 6px 0 #059669 !important; padding: 5px !important; } |
|
|
.orchestration-log textarea { background: #1F2937 !important; color: #10B981 !important; font-family: 'Courier New', monospace !important; border: 3px solid #374151 !important; border-radius: 8px !important; font-size: 0.85rem !important; line-height: 1.4 !important; } |
|
|
label, .gr-input-label, .gr-block-label { color: #1F2937 !important; font-family: 'Noto Sans KR', sans-serif !important; font-weight: 700 !important; } |
|
|
.gr-accordion { background: #E0F2FE !important; border: 3px solid #1F2937 !important; border-radius: 8px !important; box-shadow: 4px 4px 0 #1F2937 !important; } |
|
|
.tab-nav button { font-family: 'Noto Sans KR', sans-serif !important; font-weight: 700 !important; border: 2px solid #1F2937 !important; margin: 2px !important; background: #FFF !important; transition: all 0.2s !important; } |
|
|
.tab-nav button.selected { background: #3B82F6 !important; color: #FFF !important; box-shadow: 3px 3px 0 #1F2937 !important; } |
|
|
.footer-comic { text-align: center; padding: 20px; background: #3B82F6; border: 4px solid #1F2937; border-radius: 12px; margin-top: 20px; box-shadow: 6px 6px 0 #1F2937; } |
|
|
.footer-comic p { font-family: 'Noto Sans KR', sans-serif !important; color: #FFF !important; margin: 5px 0 !important; font-weight: 700 !important; } |
|
|
::-webkit-scrollbar { width: 12px; height: 12px; } |
|
|
::-webkit-scrollbar-track { background: #FEF9C3; border: 2px solid #1F2937; } |
|
|
::-webkit-scrollbar-thumb { background: #3B82F6; border: 2px solid #1F2937; border-radius: 6px; } |
|
|
::-webkit-scrollbar-thumb:hover { background: #EF4444; } |
|
|
::selection { background: #FACC15; color: #1F2937; } |
|
|
.gr-slider input[type="range"] { accent-color: #3B82F6 !important; } |
|
|
.gr-json { background: #FFF !important; border: 3px solid #1F2937 !important; border-radius: 8px !important; } |
|
|
.gr-plot { background: #FFF !important; border: 3px solid #1F2937 !important; border-radius: 8px !important; box-shadow: 4px 4px 0 #1F2937 !important; } |
|
|
.huggingface-space-link, a[href*="huggingface.co/spaces"], button[class*="share"], .share-button, [class*="hf-logo"], .gr-share-btn, #hf-logo, .hf-icon, svg[class*="hf"], div[class*="huggingface"], a[class*="huggingface"], .svelte-1rjryqp, header a[href*="huggingface"], .space-header { display: none !important; visibility: hidden !important; opacity: 0 !important; pointer-events: none !important; } |
|
|
.gr-radio label { padding: 12px 16px !important; border: 2px solid #1F2937 !important; border-radius: 8px !important; margin: 4px 0 !important; background: #FFF !important; transition: all 0.2s ease !important; cursor: pointer !important; } |
|
|
.gr-radio label:hover { background: #FEF3C7 !important; transform: translateX(3px) !important; } |
|
|
.gr-radio input:checked + label { background: linear-gradient(135deg, #3B82F6, #1E40AF) !important; color: #FFF !important; box-shadow: 3px 3px 0 #1F2937 !important; } |
|
|
.gr-radio input:disabled + label { opacity: 0.5 !important; cursor: not-allowed !important; background: #E5E7EB !important; color: #9CA3AF !important; } |
|
|
.model-info-box { background: #FEF3C7 !important; border: 2px dashed #F59E0B !important; border-radius: 8px !important; padding: 8px 12px !important; font-size: 0.9rem !important; } |
|
|
.upload-box { background: #F0FDF4 !important; border: 3px dashed #10B981 !important; border-radius: 12px !important; padding: 20px !important; } |
|
|
.download-btn { background: #8B5CF6 !important; color: white !important; } |
|
|
""" |
|
|
|
|
|
|
|
|
|
|
|
class UncertaintyLevel(Enum):
    """Three-bucket uncertainty classification; values are JSON-friendly strings."""
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"
|
|
|
|
|
@dataclass
class QualityAssessment:
    """Metacognitive self-evaluation of one analysis output.

    Field semantics are inferred from names — confirm against the producer
    (MetaCognition). Scores are floats, presumably in [0, 1] — TODO confirm.
    """
    factual_confidence: float   # how well claims appear fact-backed
    logical_coherence: float    # internal consistency of the reasoning
    completeness: float         # coverage of the question
    specificity: float          # concreteness vs. vagueness
    overall_score: float        # aggregate quality score
    uncertainty_flags: List[str] = field(default_factory=list)   # spans flagged as uncertain
    needs_verification: List[str] = field(default_factory=list)  # claims needing external checks
    recommendations: List[str] = field(default_factory=list)     # suggested improvements
|
|
|
|
|
@dataclass
class GoalClarity:
    """Assessment of how well-specified a user goal is.

    Field semantics inferred from names — confirm against the producer.
    """
    clarity_score: float                  # presumably 0..1 — TODO confirm
    ambiguous_terms: List[str]            # terms that need disambiguation
    missing_context: List[str]            # context the goal statement lacks
    suggested_clarifications: List[str]   # questions to ask the user
    is_actionable: bool                   # whether the goal can be acted on as-is
    goal_type: str = "general"            # coarse category label
|
|
|
|
|
@dataclass
class Memory:
    """One stored memory item for the tiered memory system (see MEMORY_CONFIG)."""
    id: str                # unique key (derivation not visible in this chunk)
    content: str           # the remembered text
    memory_type: str       # presumably one of short_term/mid_term/long_term — TODO confirm
    element: str           # owning element symbol (土/金/水/木/火)
    goal_context: str      # goal the memory was created under
    importance: float      # retention weight
    access_count: int      # read counter
    created_at: str        # timestamp string (format set by the writer)
    last_accessed: str     # timestamp string of last read
    embedding: Optional[List[float]] = None   # vector (VECTOR_DIM expected — TODO confirm)
    metadata: Dict[str, Any] = field(default_factory=dict)  # free-form extras
|
|
|
|
|
@dataclass
class SearchResult:
    """Cached result of one element's web search."""
    query: str            # the executed query string
    element: str          # element symbol that issued the search
    results: List[Dict]   # raw result entries (schema set by the search client)
    timestamp: str        # when the search ran
|
|
|
|
|
@dataclass
class Knowledge:
    """Persisted knowledge record linking a goal/query to its processed outputs."""
    id: str                  # unique key
    goal: str                # originating goal
    query: str               # search query used
    element: str             # element symbol that produced it
    search_results: str      # serialized raw search results
    agent_output: str        # intermediate agent output
    final_result: str        # final synthesized result
    embedding: List[float]   # vector for similarity lookup
    created_at: str          # creation timestamp
    access_count: int = 0    # read counter
|
|
|
|
|
|
|
|
|
|
|
# Cached storage root; resolved lazily (exactly once) by _determine_storage_path().
_STORAGE_BASE_PATH = None
|
|
|
|
|
def _try_enable_persistent_storage_via_api() -> bool:
    """Try to enable HF Spaces persistent storage through the Hub API.

    Reads HF_TOKEN / SPACE_ID from the environment. Returns True when storage
    is already active or an activation request was submitted; False otherwise.
    Best-effort: every failure path logs and returns instead of raising.
    """
    hf_token = os.getenv("HF_TOKEN")
    space_id = os.getenv("SPACE_ID")
    if not hf_token:
        print("⚠️ HF_TOKEN 환경변수 없음 - API 활성화 불가")
        return False
    if not space_id:
        print("⚠️ SPACE_ID 환경변수 없음 - HF Spaces 환경이 아닌 것 같습니다")
        return False
    print(f"🔄 Persistent Storage API 활성화 시도...")
    print(f" Space ID: {space_id}")
    try:
        # Imported lazily: huggingface_hub may not be installed.
        from huggingface_hub import HfApi, SpaceStorage
        api = HfApi(token=hf_token)
        try:
            # Check first whether storage is already provisioned.
            runtime = api.get_space_runtime(repo_id=space_id)
            current_storage = getattr(runtime, 'storage', None)
            if current_storage:
                print(f"✅ Persistent Storage 이미 활성화됨: {current_storage}")
                return True
            else:
                print("📦 Persistent Storage 비활성화 상태 - 활성화 시도...")
        except Exception as e:
            # Status check failure is non-fatal; still attempt the request below.
            print(f"⚠️ Space 상태 확인 실패: {e}")
        api.request_space_storage(repo_id=space_id, storage=SpaceStorage.SMALL)
        print("✅ Persistent Storage SMALL 활성화 요청 완료!")
        print("⏳ Space가 재시작됩니다. 잠시 후 /data 디렉토리가 생성됩니다.")
        return True
    except ImportError:
        print("⚠️ huggingface_hub 라이브러리 없음")
        return False
    except Exception as e:
        # Classify the failure from the error text (best-effort heuristics).
        error_msg = str(e)
        if "already" in error_msg.lower() or "exists" in error_msg.lower():
            print(f"✅ Persistent Storage 이미 활성화됨")
            return True
        elif "payment" in error_msg.lower() or "billing" in error_msg.lower():
            print(f"⚠️ 결제 정보 필요: {error_msg}")
        else:
            print(f"⚠️ API 활성화 실패: {error_msg}")
        return False
|
|
|
|
|
def _determine_storage_path() -> str:
    """Resolve (once) the base directory for all persisted files.

    Prefers the HF Spaces /data mount when it exists and is writable;
    otherwise tries to provision it via the Hub API and falls back to
    ./data for the current process. The result is cached in the
    module-level _STORAGE_BASE_PATH, so the probing runs only once.
    """
    global _STORAGE_BASE_PATH
    if _STORAGE_BASE_PATH is not None:
        return _STORAGE_BASE_PATH  # already resolved earlier in this process
    print("\n" + "=" * 60)
    print("🔍 스토리지 초기화 중...")
    print("=" * 60)
    if os.path.exists(PERSISTENT_DIR):
        try:
            # Probe writability: /data can exist but be read-only.
            test_file = os.path.join(PERSISTENT_DIR, ".write_test")
            with open(test_file, "w") as f:
                f.write("test")
            os.remove(test_file)
            _STORAGE_BASE_PATH = PERSISTENT_DIR
            print(f"✅ HF Spaces 영구 스토리지 활성화: {PERSISTENT_DIR}")
            existing_files = [f for f in os.listdir(PERSISTENT_DIR) if f.endswith('.db') or f.endswith('.json')]
            if existing_files:
                print(f"📁 기존 파일 발견: {existing_files}")
            else:
                print("📁 기존 파일 없음 (새로운 스토리지)")
            print("=" * 60 + "\n")
            return _STORAGE_BASE_PATH
        except Exception as e:
            print(f"⚠️ /data 쓰기 테스트 실패: {e}")
    else:
        print(f"⚠️ {PERSISTENT_DIR} 디렉토리 없음")
    # /data unusable: ask the Hub API to provision it (takes effect after restart)...
    print("\n🚀 API로 Persistent Storage 활성화 시도...")
    if _try_enable_persistent_storage_via_api():
        print("💡 API 요청 완료. Space 재시작 후 /data 사용 가능")
    # ...and use the local fallback for the remainder of this process.
    os.makedirs(LOCAL_FALLBACK_DIR, exist_ok=True)
    _STORAGE_BASE_PATH = LOCAL_FALLBACK_DIR
    print(f"\n🟡 현재 로컬 스토리지 사용: {LOCAL_FALLBACK_DIR}")
    print(" (Persistent Storage 활성화 후 Space 재시작 필요)")
    print("=" * 60 + "\n")
    return _STORAGE_BASE_PATH
|
|
|
|
|
def get_persistent_path(filename: str) -> str:
    """Join *filename* onto the lazily-resolved storage base directory."""
    return os.path.join(_determine_storage_path(), filename)
|
|
|
|
|
def get_storage_info() -> dict:
    """Snapshot of the current storage location: paths, DB presence/size, file list."""
    root = _determine_storage_path()
    database = os.path.join(root, "soma_ohaeng.db")
    db_present = os.path.exists(database)
    snapshot = {
        "base_path": root,
        "is_persistent": root == PERSISTENT_DIR,
        "db_path": database,
        "db_exists": db_present,
        # getsize only when the file exists; 0 mirrors the "no DB yet" case.
        "db_size": os.path.getsize(database) if db_present else 0,
        "files": [],
    }
    if os.path.exists(root):
        # Hidden files (".write_test" probes etc.) are excluded on purpose.
        snapshot["files"] = [name for name in os.listdir(root) if not name.startswith('.')]
    return snapshot
|
|
|
|
|
def ensure_persistent_storage():
    """Create the storage base directory if needed.

    Returns True when the base is the persistent /data mount, False for the
    local fallback.
    """
    root = _determine_storage_path()
    os.makedirs(root, exist_ok=True)
    return root == PERSISTENT_DIR
|
|
|
|
|
def migrate_to_persistent_storage():
    """Copy legacy data files from old locations into the resolved storage base.

    For each known file: skip if it already exists at the target, otherwise
    probe the CWD, ./data and /tmp in order and copy the first match.
    Best-effort — copy failures are logged and the loop continues.
    """
    base_path = _determine_storage_path()
    local_files = ["soma_ohaeng.db", "creat.json"]
    for filename in local_files:
        target_path = os.path.join(base_path, filename)
        if os.path.exists(target_path):
            size = os.path.getsize(target_path)
            # BUGFIX: this message previously printed a literal placeholder
            # instead of the file name, making the log useless for diagnostics.
            print(f"📁 {filename} 이미 존재 ({size:,} bytes)")
            continue
        source_paths = [
            filename,
            os.path.join("./data", filename),
            os.path.join("/tmp", filename),
        ]
        for source_path in source_paths:
            if os.path.exists(source_path) and source_path != target_path:
                try:
                    shutil.copy2(source_path, target_path)
                    print(f"✅ 마이그레이션: {source_path} → {target_path}")
                    break
                except Exception as e:
                    print(f"⚠️ 마이그레이션 실패 {source_path}: {e}")
|
|
|
|
|
def backup_database():
    """Copy the main SQLite DB to a timestamped backup (minute resolution).

    Returns True on success; False (falls through) when the DB is missing or
    the copy fails. Old backups are pruned afterwards via cleanup_old_backups().
    """
    db_path = get_persistent_path("soma_ohaeng.db")
    if os.path.exists(db_path):
        # Minute-resolution stamp: repeated backups within a minute overwrite.
        backup_name = f"soma_ohaeng_backup_{datetime.now().strftime('%Y%m%d_%H%M')}.db"
        backup_path = get_persistent_path(backup_name)
        try:
            shutil.copy2(db_path, backup_path)
            print(f"✅ DB 백업 완료: {backup_path}")
            cleanup_old_backups()
            return True
        except Exception as e:
            print(f"⚠️ 백업 실패: {e}")
    return False
|
|
|
|
|
def cleanup_old_backups():
    """Keep only the five most recent DB backup files in the storage base.

    Backup names embed a sortable timestamp, so a reverse lexical sort puts
    the newest first. Best-effort: filesystem errors are logged (previously
    they were silently swallowed by bare ``except: pass``), never raised.
    """
    base_path = _determine_storage_path()
    try:
        backup_files = sorted(
            (f for f in os.listdir(base_path)
             if f.startswith("soma_ohaeng_backup_") and f.endswith(".db")),
            reverse=True,
        )
    except OSError as e:
        # Listing failed (base gone / permissions): nothing to clean.
        print(f"⚠️ 백업 목록 조회 실패: {e}")
        return
    # Everything past the newest five gets deleted.
    for old_backup in backup_files[5:]:
        try:
            os.remove(os.path.join(base_path, old_backup))
            print(f"🗑️ 오래된 백업 삭제: {old_backup}")
        except OSError as e:
            print(f"⚠️ 백업 삭제 실패 {old_backup}: {e}")
|
|
|
|
|
def verify_db_persistence():
    """Print a diagnostic summary of the main SQLite DB (path, size, row counts).

    Read-only sanity check run at startup. Fixes over the original: the table
    name is quoted in the COUNT query (names come from sqlite_master, but
    unusual names would otherwise break the statement), and the connection is
    closed in a ``finally`` so it cannot leak when a query fails.
    """
    db_path = get_persistent_path("soma_ohaeng.db")
    print(f"\n🔍 DB 영속성 검증:")
    print(f" 경로: {db_path}")
    print(f" 존재: {os.path.exists(db_path)}")
    if os.path.exists(db_path):
        print(f" 크기: {os.path.getsize(db_path):,} bytes")
        try:
            conn = sqlite3.connect(db_path)
            try:
                c = conn.cursor()
                c.execute("SELECT name FROM sqlite_master WHERE type='table'")
                tables = [row[0] for row in c.fetchall()]
                print(f" 테이블: {tables}")
                for table in tables:
                    # Identifiers cannot be bound as parameters; quote instead.
                    c.execute(f'SELECT COUNT(*) FROM "{table}"')
                    count = c.fetchone()[0]
                    print(f" - {table}: {count}개 레코드")
            finally:
                conn.close()
        except Exception as e:
            print(f" DB 읽기 오류: {e}")
    print()
|
|
|
|
|
|
|
|
# --- Module import-time initialization ---
# Resolve the storage base, pull in any legacy files, then sanity-check the DB.
ensure_persistent_storage()
migrate_to_persistent_storage()
verify_db_persistence()

# Canonical file paths, resolved against /data when persistent storage is
# available, ./data otherwise. (Purpose of creat.json not visible in this chunk.)
DB_PATH = get_persistent_path("soma_ohaeng.db")
CREAT_JSON_PATH = get_persistent_path("creat.json")
|
|
|
|
|
|
|
|
|
|
|
def ensure_string(value: Any) -> str:
    """Coerce *value* to a string: None -> "", containers -> JSON, else str()."""
    if value is None:
        return ""
    if isinstance(value, str):
        return value
    # Lists/dicts are serialized as JSON so Korean text survives un-escaped.
    if isinstance(value, (list, dict)):
        return json.dumps(value, ensure_ascii=False)
    return str(value)
|
|
|
|
|
def safe_float(value: Any, default: float = 0.0) -> float:
    """float(value), or *default* when value is None or not convertible."""
    if value is None:
        return default
    try:
        converted = float(value)
    except (ValueError, TypeError):
        return default
    return converted
|
|
|
|
|
|
|
|
|
|
|
class TimeAwareness:
    """Wall-clock helpers: formatted timestamps and Korean time-of-day context."""

    # Korean weekday names indexed by datetime.weekday() (0 = Monday).
    _WEEKDAYS_KR = ["월요일", "화요일", "수요일", "목요일", "금요일", "토요일", "일요일"]

    @staticmethod
    def now() -> datetime:
        """Current local time."""
        return datetime.now()

    @staticmethod
    def get_formatted_time() -> str:
        """Timestamp like '2024년 01월 02일 (화요일) 03:04:05'."""
        current = datetime.now()
        day_name = TimeAwareness._WEEKDAYS_KR[current.weekday()]
        return current.strftime(f"%Y년 %m월 %d일 ({day_name}) %H:%M:%S")

    @staticmethod
    def get_context_time() -> Dict:
        """Structured snapshot of 'now': date parts, time-of-day label, greeting, quarter."""
        current = datetime.now()
        hour = current.hour
        # Time-of-day bucket + matching Korean greeting.
        if 5 <= hour < 12:
            time_of_day, greeting = "오전", "좋은 아침입니다"
        elif 12 <= hour < 14:
            time_of_day, greeting = "점심", "점심 시간입니다"
        elif 14 <= hour < 18:
            time_of_day, greeting = "오후", "좋은 오후입니다"
        elif 18 <= hour < 22:
            time_of_day, greeting = "저녁", "좋은 저녁입니다"
        else:
            time_of_day, greeting = "밤", "밤늦게까지 수고하십니다"
        # Calendar quarter / half-year labels.
        month = current.month
        if month <= 3:
            quarter, half = "1분기", "상반기"
        elif month <= 6:
            quarter, half = "2분기", "상반기"
        elif month <= 9:
            quarter, half = "3분기", "하반기"
        else:
            quarter, half = "4분기", "하반기"
        return {
            "datetime": current,
            "formatted": TimeAwareness.get_formatted_time(),
            "year": current.year,
            "month": current.month,
            "day": current.day,
            "hour": current.hour,
            "minute": current.minute,
            "weekday": current.strftime("%A"),
            "weekday_kr": TimeAwareness._WEEKDAYS_KR[current.weekday()],
            "time_of_day": time_of_day,
            "greeting": greeting,
            "quarter": quarter,
            "half": half,
            "timestamp": current.timestamp(),
        }

    @staticmethod
    def get_time_prompt() -> str:
        """LLM prompt fragment carrying the current time context."""
        ctx = TimeAwareness.get_context_time()
        return f"""[현재 시간 정보]
- 일시: {ctx['formatted']}
- 시간대: {ctx['time_of_day']}
- 분기: {ctx['year']}년 {ctx['quarter']} ({ctx['half']})
"""
|
|
|
|
|
|
|
|
|
|
|
class FileProcessor:
    """Extract plain text from uploaded PDF / DOCX / text files.

    All extractors return a string; failures are reported as bracketed
    Korean error strings instead of raising, so callers can display them.
    Relies on the module-level HAS_PYPDF2 / HAS_DOCX availability flags.
    """

    @staticmethod
    def extract_text_from_pdf(file_path: str) -> str:
        """Return page texts joined by blank lines, each prefixed with its page number."""
        if not HAS_PYPDF2:
            return "[ERROR] PyPDF2가 설치되지 않았습니다. pip install PyPDF2"
        try:
            text_content = []
            with open(file_path, 'rb') as file:
                pdf_reader = PyPDF2.PdfReader(file)
                for page_num, page in enumerate(pdf_reader.pages):
                    page_text = page.extract_text()
                    if page_text:  # skip image-only / empty pages
                        text_content.append(f"[페이지 {page_num + 1}]\n{page_text}")
            return "\n\n".join(text_content) if text_content else "[PDF에서 텍스트를 추출할 수 없습니다]"
        except Exception as e:
            return f"[PDF 읽기 오류: {str(e)}]"

    @staticmethod
    def extract_text_from_docx(file_path: str) -> str:
        """Return non-empty paragraphs of a .docx joined by blank lines."""
        if not HAS_DOCX:
            return "[ERROR] python-docx가 설치되지 않았습니다. pip install python-docx"
        try:
            doc = DocxDocument(file_path)
            paragraphs = [para.text for para in doc.paragraphs if para.text.strip()]
            return "\n\n".join(paragraphs) if paragraphs else "[문서에서 텍스트를 추출할 수 없습니다]"
        except Exception as e:
            return f"[DOCX 읽기 오류: {str(e)}]"

    @staticmethod
    def extract_text_from_txt(file_path: str) -> str:
        """Read a text file as UTF-8, falling back to CP949 (Korean Windows)."""
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                return f.read()
        except UnicodeDecodeError:
            try:
                with open(file_path, 'r', encoding='cp949') as f:
                    return f.read()
            # Was a bare ``except:``; narrowed while keeping the best-effort result.
            except (UnicodeDecodeError, OSError, ValueError):
                return "[텍스트 파일 인코딩 오류]"
        except Exception as e:
            return f"[파일 읽기 오류: {str(e)}]"

    @staticmethod
    def process_uploaded_file(file) -> Tuple[str, str]:
        """Dispatch on file extension and return (extracted_text, display_label).

        *file* may be a Gradio upload object (uses ``.name``) or a plain path.
        Text longer than 50,000 characters is truncated with a notice.
        Returns ("", "") for no file.
        """
        if file is None:
            return "", ""
        file_path = file.name if hasattr(file, 'name') else str(file)
        file_name = os.path.basename(file_path)
        file_ext = os.path.splitext(file_name)[1].lower()
        if file_ext == '.pdf':
            text = FileProcessor.extract_text_from_pdf(file_path)
            file_info = f"📄 PDF 파일: {file_name}"
        elif file_ext in ['.docx', '.doc']:
            text = FileProcessor.extract_text_from_docx(file_path)
            file_info = f"📝 Word 문서: {file_name}"
        elif file_ext in ['.txt', '.md', '.csv']:
            text = FileProcessor.extract_text_from_txt(file_path)
            file_info = f"📃 텍스트 파일: {file_name}"
        elif file_ext == '.json':
            # JSON is read as plain text; parsing is left to the caller.
            text = FileProcessor.extract_text_from_txt(file_path)
            file_info = f"📋 JSON 파일: {file_name}"
        else:
            text = f"[지원하지 않는 파일 형식: {file_ext}]"
            file_info = f"❌ 지원 불가: {file_name}"
        if len(text) > 50000:
            # len(text) in the notice is evaluated before truncation is assigned.
            text = text[:50000] + f"\n\n[... 총 {len(text):,}자 중 50,000자만 표시 ...]"
        return text, file_info
|
|
|
|
|
|
|
|
|
|
|
class ExportManager:
    """Write analysis artifacts (markdown / JSON / plain-text log) into the temp dir."""

    @staticmethod
    def _timestamped(prefix: str, ext: str) -> str:
        """Temp-dir path of the form '<tmp>/<prefix>_YYYYmmdd_HHMMSS.<ext>'."""
        stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        return os.path.join(tempfile.gettempdir(), f"{prefix}_{stamp}.{ext}")

    @staticmethod
    def export_to_markdown(report: str, goal: str) -> str:
        """Render the final report as a standalone markdown document; return its path."""
        destination = ExportManager._timestamped("aether_report", "md")
        body = f"""# AETHER Proto-AGI 분석 보고서

**생성 일시**: {datetime.now().strftime("%Y년 %m월 %d일 %H:%M:%S")}

**분석 목표**: {goal}

---

{report}

---

*이 보고서는 AETHER Proto-AGI (SOMA 오행 순환 · SLAI 자기학습 · MAIA 창발)에 의해 자동 생성되었습니다.*
"""
        with open(destination, 'w', encoding='utf-8') as fh:
            fh.write(body)
        return destination

    @staticmethod
    def export_to_json(report: str, goal: str, log: str, element_outputs: Dict) -> str:
        """Dump report + log + per-element outputs as structured JSON; return the path."""
        destination = ExportManager._timestamped("aether_report", "json")
        payload = {
            "meta": {
                "generator": "AETHER Proto-AGI",
                "version": "2.2",
                "timestamp": datetime.now().isoformat(),
                "goal": goal,
            },
            "report": report,
            "orchestration_log": log,
            "element_outputs": element_outputs,
            "analysis_summary": {
                "total_elements": len(element_outputs),
                "elements_processed": list(element_outputs.keys()),
            },
        }
        with open(destination, 'w', encoding='utf-8') as fh:
            json.dump(payload, fh, ensure_ascii=False, indent=2)
        return destination

    @staticmethod
    def export_log_to_txt(log: str) -> str:
        """Write the orchestration log with a short header; return the file path."""
        destination = ExportManager._timestamped("aether_log", "txt")
        with open(destination, 'w', encoding='utf-8') as fh:
            fh.write(f"AETHER Proto-AGI 오케스트레이션 로그\n")
            fh.write(f"생성 시간: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
            fh.write("=" * 60 + "\n\n")
            fh.write(log)
        return destination
|
|
|
|
|
|
|
|
|
|
|
class OutputFormatter:
    """Compact emoji / box-drawing formatting for the orchestration log stream."""

    # Display metadata per element symbol: bullet emoji, Korean role name, icon.
    ELEMENT_STYLES = {
        "土": {"emoji": "🟤", "name": "감독", "icon": "🏛️"},
        "金": {"emoji": "⚪", "name": "비평", "icon": "⚔️"},
        "水": {"emoji": "🔵", "name": "리서치", "icon": "🔬"},
        "木": {"emoji": "🟢", "name": "창발", "icon": "💡"},
        "火": {"emoji": "🔴", "name": "실행", "icon": "🚀"}
    }

    @staticmethod
    def header(title: str, emoji: str = "🌀") -> str:
        """Heavy horizontal-rule section header."""
        return f"\n{'━' * 50}\n {emoji} {title}\n{'━' * 50}\n"

    @staticmethod
    def element_header(element_name: str) -> str:
        """Boxed header line for one element; unknown symbols fall back to neutral glyphs."""
        style = OutputFormatter.ELEMENT_STYLES.get(element_name, {})
        emoji = style.get("emoji", "●")
        name = style.get("name", element_name)
        icon = style.get("icon", "")
        return f"\n┏━━ {emoji} {element_name}({name}) {icon} ━━┓\n"

    @staticmethod
    def search_result(count: int, cached: bool = False) -> str:
        """One-line search summary with a 10-slot bar (counts above 10 saturate)."""
        status = "📦캐시" if cached else "🔍검색"
        bar = "█" * min(count, 10) + "░" * (10 - min(count, 10))
        return f" {status} │{bar}│ {count}건\n"

    @staticmethod
    def satisfaction_bar(score: float) -> str:
        """20-slot progress bar for a 0..1 score, with a status emoji/label."""
        filled = int(score * 20)
        bar = "█" * filled + "░" * (20 - filled)
        pct = score * 100
        if score >= 0.85: emoji, status = "🎯", "달성"
        elif score >= 0.7: emoji, status = "📈", "양호"
        elif score >= 0.5: emoji, status = "🔄", "진행"
        else: emoji, status = "⚠️", "개선"
        return f" {emoji} 만족도: {bar} {pct:.0f}% ({status})\n"

    @staticmethod
    def cycle_start(iteration: int, time_str: str) -> str:
        """Banner printed at the start of each orchestration cycle."""
        return f"\n╔══ 🔄 순환 #{iteration} ══╗\n║ ⏰ {time_str}\n╚{'═' * 30}╝\n"

    @staticmethod
    def compact_json(parsed: Dict, element_name: str) -> str:
        """Summarize one element's parsed JSON output as a few truncated log lines.

        Each element surfaces its own keys (土: assessment/direction/fact_check;
        金: critiques/risks/verified_facts; 水: findings/evidence/gaps;
        木: ideas/selected_top3/novel_connections; 火: result/deliverables/
        implementation). Values are str()-ed and sliced to keep lines short.
        Returns "" when nothing recognizable is present.
        """
        lines = []
        style = OutputFormatter.ELEMENT_STYLES.get(element_name, {})
        emoji = style.get("emoji", "●")
        if element_name == "土":
            if "assessment" in parsed:
                assessment = str(parsed['assessment'])[:200]
                lines.append(f" {emoji} 평가: {assessment}...")
            if "direction" in parsed:
                lines.append(f" → 방향: {str(parsed['direction'])[:150]}")
            if "fact_check" in parsed:
                lines.append(f" ✓ 팩트: {str(parsed['fact_check'])[:100]}")
        elif element_name == "金":
            if "critiques" in parsed and parsed["critiques"]:
                lines.append(f" {emoji} 비판점 {len(parsed['critiques'])}개:")
                for c in parsed["critiques"][:2]:
                    lines.append(f" • {str(c)[:80]}")
            if "risks" in parsed and parsed["risks"]:
                lines.append(f" ⚠️ 리스크 {len(parsed['risks'])}개:")
                for r in parsed["risks"][:2]:
                    lines.append(f" • {str(r)[:80]}")
            if "verified_facts" in parsed and parsed["verified_facts"]:
                lines.append(f" ✅ 검증 {len(parsed['verified_facts'])}개")
        elif element_name == "水":
            if "findings" in parsed and parsed["findings"]:
                lines.append(f" {emoji} 발견 {len(parsed['findings'])}건:")
                for f in parsed["findings"][:3]:
                    lines.append(f" • {str(f)[:80]}")
            if "evidence" in parsed and parsed["evidence"]:
                lines.append(f" 📚 근거 {len(parsed['evidence'])}건")
            if "gaps" in parsed and parsed["gaps"]:
                lines.append(f" ❓ 추가조사 {len(parsed['gaps'])}건")
        elif element_name == "木":
            if "ideas" in parsed and parsed["ideas"]:
                lines.append(f" {emoji} 아이디어 {len(parsed['ideas'])}개 생성")
            if "selected_top3" in parsed and parsed["selected_top3"]:
                lines.append(f" 🏆 TOP 3:")
                for i, idea in enumerate(parsed["selected_top3"][:3], 1):
                    lines.append(f" {i}. {str(idea)[:70]}...")
            if "novel_connections" in parsed and parsed["novel_connections"]:
                lines.append(f" 🔗 새로운 연결 {len(parsed['novel_connections'])}개")
        elif element_name == "火":
            if "result" in parsed:
                result = str(parsed['result'])[:300]
                lines.append(f" {emoji} 결과:")
                # Show up to four sentence-ish chunks of the result.
                for i, chunk in enumerate(result.split('. ')[:4]):
                    if chunk.strip():
                        lines.append(f" {chunk.strip()}.")
            if "deliverables" in parsed and parsed["deliverables"]:
                lines.append(f" 📦 산출물 {len(parsed['deliverables'])}개:")
                for d in parsed["deliverables"][:3]:
                    lines.append(f" • {str(d)[:60]}")
            if "implementation" in parsed:
                lines.append(f" 🔧 구현: {str(parsed['implementation'])[:100]}...")
        return "\n".join(lines) + "\n" if lines else ""

    @staticmethod
    def final_marker() -> str:
        """Closing banner emitted when the goal is reached."""
        return "\n" + "═" * 50 + "\n 🎉 목표 달성 완료!\n" + "═" * 50 + "\n"
|
|
|
|
|
|
|
|
|
|
|
class ReportGenerator:
    """Builds the final and in-progress markdown reports from orchestration state."""

    @staticmethod
    def generate(state, stats: Dict) -> str:
        """Render the final markdown report.

        Pulls each element's parsed output from state.element_outputs and emits
        only sections for which data exists. ``stats`` is not referenced in
        this body — presumably kept for interface stability (TODO confirm).
        """
        goal = state.goal
        score = state.satisfaction_score
        iterations = state.iteration
        element_outputs = state.element_outputs
        earth_data = element_outputs.get("土", {})
        metal_data = element_outputs.get("金", {})
        water_data = element_outputs.get("水", {})
        wood_data = element_outputs.get("木", {})
        fire_data = element_outputs.get("火", {})
        # 火 (execution) carries the main answer; 木 (emergence) the key insight.
        main_result = fire_data.get("result", "")
        key_insight = wood_data.get("key_insight", wood_data.get("reframe", ""))
        report = f"""# 🎯 분석 결과

---

## 📋 질문

> **{goal}**

---

## 💡 핵심 답변

"""
        if main_result:
            report += f"{main_result}\n\n"
        else:
            # No execution result: fall back to the supervisor's assessment.
            assessment = earth_data.get("assessment", "")
            if assessment:
                report += f"{assessment}\n\n"
            else:
                report += "*분석이 완료되지 않았습니다. 순환 횟수를 늘려 다시 시도해주세요.*\n\n"
        if key_insight:
            report += f"""---

## 🔑 핵심 인사이트

{key_insight}

"""
        top3 = wood_data.get("selected_top3", [])
        if top3:
            report += """---

## 💡 주요 아이디어

"""
            for i, idea in enumerate(top3[:3], 1):
                report += f"**{i}.** {idea}\n\n"
        verified = metal_data.get("verified_facts", [])
        if verified:
            report += """---

## ✅ 검증된 사실

"""
            for fact in verified[:5]:
                report += f"- {fact}\n"
            report += "\n"
        risks = metal_data.get("risks", [])
        if risks:
            report += """---

## ⚠️ 주요 리스크

"""
            for risk in risks[:3]:
                report += f"- {risk}\n"
            report += "\n"
        findings = water_data.get("findings", [])
        if findings:
            report += """---

## 📊 주요 발견

"""
            for finding in findings[:5]:
                report += f"- {finding}\n"
            report += "\n"
        # Optional metacognition section (only when assessments were recorded).
        if hasattr(state, 'metacog_assessments') and state.metacog_assessments:
            avg_quality = sum(a.overall_score for a in state.metacog_assessments) / len(state.metacog_assessments)
            report += f"""---

## 🧠 메타인지 품질 평가

| 지표 | 평균 점수 |
|------|-----------|
| 종합 품질 | {'█' * int(avg_quality * 10)}{'░' * (10 - int(avg_quality * 10))} {avg_quality:.0%} |
| 분석 단계 | {len(state.metacog_assessments)}회 평가 |

"""
        report += f"""---

## 📊 분석 정보

| 항목 | 값 |
|------|-----|
| 분석 신뢰도 | {'█' * int(score * 10)}{'░' * (10 - int(score * 10))} {score:.0%} |
| 분석 순환 | {iterations}회 |
| 세션 ID | `{state.session_id}` |
| 생성 시간 | {TimeAwareness.get_formatted_time()} |

"""
        return report

    @staticmethod
    def generate_progress(state) -> str:
        """Render the interim 'analysis in progress' markdown shown between cycles."""
        progress_bar = '█' * int(state.satisfaction_score * 10) + '░' * (10 - int(state.satisfaction_score * 10))
        current_element = ""
        if state.history:
            # Label the most recently executed element for display.
            last = state.history[-1]
            elem_name = last.get("element", "")
            elem_map = {
                "土": "🟤 土 감독", "金": "⚪ 金 비평", "水": "🔵 水 리서치",
                "木": "🟢 木 창발", "火": "🔴 火 실행"
            }
            current_element = elem_map.get(elem_name, elem_name)
        return f"""# ⏳ 분석 진행 중...

## 📋 질문

> **{state.goal}**

## 📊 현재 상태

| 항목 | 상태 |
|------|------|
| 진행 순환 | {state.iteration}회 |
| 현재 단계 | {current_element} |
| 진행도 | {progress_bar} {state.satisfaction_score:.0%} |

---

💡 **오케스트레이션 로그**를 펼쳐서 실시간 분석 과정을 확인하세요.

*土(감독) → 金(비평) → 水(리서치) → 木(창발) → 火(실행) 순환 중...*

"""
|
|
|
|
|
|
|
|
|
|
|
class MetaCognition:
    """Heuristic meta-cognitive evaluator for LLM-generated analysis.

    Scores responses on factual grounding, logical coherence, completeness
    and specificity using keyword/regex heuristics (Korean + English), and
    judges whether a user goal is clear enough to act on.  The class-level
    marker lists and the numeric weights in the methods are tuned,
    interdependent constants — change them together, not in isolation.
    """

    # Phrases that signal hedged or uncertain claims (Korean + English).
    UNCERTAINTY_MARKERS = [
        "아마", "maybe", "perhaps", "possibly", "might", "could be",
        "불확실", "uncertain", "unclear", "추측", "guess", "assume",
        "~일 수 있", "~할 수도", "정확하지 않", "확인 필요",
        "것 같", "듯하", "보임", "추정"
    ]

    # Softening qualifiers.  NOTE(review): not referenced by any method
    # visible in this class — possibly used elsewhere or dead; confirm.
    HEDGE_WORDS = [
        "일반적으로", "보통", "대체로", "주로", "often", "usually",
        "typically", "generally", "sometimes", "occasionally",
        "대부분", "많은 경우"
    ]

    # Regex patterns whose presence suggests concrete, checkable facts:
    # years, percentages, money amounts, quoted text, source tags, dates.
    FACTUAL_INDICATORS = [
        r'\d{4}년', r'\d+%', r'\d+억', r'\d+조', r'\$\d+',
        r'[A-Z]{2,}', r'「.+」', r'".+"', r'\[출처\]',
        r'\d+월', r'\d+일', r'약 \d+', r'총 \d+'
    ]

    # Connectives indicating explicit cause/effect reasoning.
    LOGIC_CONNECTORS = [
        "따라서", "그러므로", "때문에", "결과적으로", "왜냐하면",
        "therefore", "because", "consequently", "thus", "hence",
        "이로 인해", "그 결과", "이에 따라"
    ]

    # Buzzword-style abstractions that usually hide missing concrete detail.
    ABSTRACT_WARNINGS = [
        "플랫폼", "시스템", "프레임워크", "프로그램", "메커니즘",
        "글로벌 거버넌스", "국제 협력", "전략적 파트너십",
        "~를 개발", "~를 구축", "~를 설계"
    ]

    def __init__(self):
        # NOTE(review): the three dicts/lists below are never written by
        # methods visible in this class — presumably populated externally.
        self.confidence_history: Dict[str, List[float]] = {}
        self.failure_patterns: List[Dict] = []
        self.domain_confidence: Dict[str, float] = {}
        # Below this clarity score, should_ask_clarification() asks back.
        self.uncertainty_threshold = 0.35
        # NOTE(review): unused in the visible methods; confirm callers.
        self.min_specificity_threshold = 0.4

    def assess_response_quality(self, response: str, goal: str, context: Dict = None) -> QualityAssessment:
        """Score a generated response against `goal` on four weighted axes.

        Weights: factual 0.3, logic 0.2, completeness 0.3, specificity 0.2;
        the aggregate is then penalised for heavy hedging and for abstract
        buzzwords.  `context` is accepted but not used here — presumably
        reserved for future callers; confirm.
        """
        if not response or not response.strip():
            # Empty response: zero everything and flag for regeneration.
            return QualityAssessment(
                factual_confidence=0.0, logical_coherence=0.0,
                completeness=0.0, specificity=0.0, overall_score=0.0,
                uncertainty_flags=["빈 응답"],
                needs_verification=["전체 재생성 필요"],
                recommendations=["응답 재생성 필요"]
            )
        factual = self._assess_factual_grounding(response)
        logic = self._assess_logical_coherence(response)
        complete = self._assess_completeness(response, goal)
        specific = self._assess_specificity(response)
        abstract_penalty = self._check_abstract_expressions(response)
        uncertainties = self._identify_uncertainties(response)
        verifications = self._identify_verification_needs(response, goal)
        # Axis weights sum to 1.0.
        weights = {"factual": 0.3, "logic": 0.2, "complete": 0.3, "specific": 0.2}
        overall = (factual * weights["factual"] +
                   logic * weights["logic"] +
                   complete * weights["complete"] +
                   specific * weights["specific"])
        if len(uncertainties) > 3:
            # Many hedges: knock 15% off the aggregate.
            overall *= 0.85
        # Up to a further 15% penalty when abstract buzzwords dominate.
        overall *= (1 - abstract_penalty * 0.15)
        recommendations = self._generate_recommendations(
            factual, logic, complete, specific, uncertainties, abstract_penalty
        )
        return QualityAssessment(
            factual_confidence=round(factual, 3),
            logical_coherence=round(logic, 3),
            completeness=round(complete, 3),
            specificity=round(specific, 3),
            overall_score=round(max(0, min(1, overall)), 3),
            uncertainty_flags=uncertainties,
            needs_verification=verifications,
            recommendations=recommendations
        )

    def _assess_factual_grounding(self, response: str) -> float:
        """Score [0,1]: concrete indicators and citations up, hedges down."""
        score = 0.3  # baseline for a non-empty answer
        for pattern in self.FACTUAL_INDICATORS:
            matches = re.findall(pattern, response)
            # Each indicator family contributes at most 0.25.
            score += min(len(matches) * 0.05, 0.25)
        # Citation-style phrases ("according to ...") add a flat bonus each.
        source_patterns = [r'\[출처[:\s]', r'에 따르면', r'보도에 의하면', r'발표한 바', r'연구에 따르면']
        for pattern in source_patterns:
            if re.search(pattern, response):
                score += 0.1
        uncertainty_count = sum(1 for marker in self.UNCERTAINTY_MARKERS if marker in response.lower())
        score -= uncertainty_count * 0.05
        return max(0.0, min(1.0, score))

    def _assess_logical_coherence(self, response: str) -> float:
        """Score [0,1]: connectives and visible structure up, contradictions down."""
        score = 0.5  # neutral baseline
        connector_count = sum(1 for conn in self.LOGIC_CONNECTORS if conn in response)
        score += min(connector_count * 0.08, 0.25)
        sentences = re.split(r'[.!?。]', response)
        if len(sentences) > 3:
            # Multi-sentence answers get a small structure bonus.
            score += 0.1
        # Numbered lists, headings, bold text and tables all count as structure.
        structure_patterns = [r'첫째|둘째|셋째', r'1\.|2\.|3\.', r'###\s', r'\*\*[^*]+\*\*', r'\|.*\|']
        for pattern in structure_patterns:
            if re.search(pattern, response):
                score += 0.05
        contradictions = self._detect_contradictions(response)
        score -= len(contradictions) * 0.15
        return max(0.0, min(1.0, score))

    def _detect_contradictions(self, response: str) -> List[str]:
        """Flag antonym pairs appearing within a short character window."""
        contradictions = []
        # (pattern, human-readable description); {0,N} bounds the window.
        contradiction_pairs = [
            (r'증가.{0,30}감소', "증가/감소 모순"),
            (r'성공.{0,30}실패', "성공/실패 모순"),
            (r'긍정.{0,30}부정', "긍정/부정 모순"),
            (r'불가능.{0,20}가능', "가능/불가능 모순"),
            (r'강화.{0,20}약화', "강화/약화 모순"),
        ]
        for pattern, desc in contradiction_pairs:
            if re.search(pattern, response):
                contradictions.append(desc)
        return contradictions

    def _assess_completeness(self, response: str, goal: str) -> float:
        """Score [0,1]: goal-keyword coverage (70%) plus length (30%, saturates at 1500 chars)."""
        goal_keywords = self._extract_keywords(goal)
        if not goal_keywords:
            return 0.5  # nothing to check against — neutral
        response_lower = response.lower()
        covered = sum(1 for kw in goal_keywords if kw.lower() in response_lower)
        coverage_ratio = covered / len(goal_keywords) if goal_keywords else 0
        length_score = min(len(response) / 1500, 1.0) * 0.3
        return min(1.0, coverage_ratio * 0.7 + length_score)

    def _extract_keywords(self, text: str) -> List[str]:
        """Return up to 15 content words (Korean/English/digits), stopwords removed."""
        stopwords = {
            '의', '가', '이', '은', '들', '는', '좀', '잘', '를', '을', '로', '으로',
            '에', '와', '과', '도', '만', '라', '하다', '있다', '되다', '이다', '그',
            '저', '것', '수', '등', '더', '때', 'the', 'a', 'an', 'is', 'are', 'was',
            'were', 'be', 'been', 'being', 'have', 'has', 'had', 'do', 'does', 'did',
            'will', 'would', 'could', 'should', 'may', 'might', 'must', 'shall', 'can',
            'need', 'to', 'of', 'in', 'for', 'on', 'with', 'at', 'by', 'from', 'as',
            'what', 'which', 'who', 'how', 'why', 'when', 'where'
        }
        words = re.findall(r'[가-힣a-zA-Z0-9]+', text)
        keywords = [w for w in words if len(w) > 1 and w.lower() not in stopwords]
        return keywords[:15]

    def _assess_specificity(self, response: str) -> float:
        """Score [0,1]: numbers, proper nouns and dates up; vague filler words down."""
        score = 0.2  # baseline
        number_matches = re.findall(r'\d+(?:\.\d+)?(?:%|억|만|천|조|달러|원)?', response)
        score += min(len(number_matches) * 0.04, 0.25)
        # Capitalised English phrases plus a fixed list of well-known
        # countries/companies/figures count as proper nouns.
        proper_nouns = re.findall(r'[A-Z][a-z]+(?:\s[A-Z][a-z]+)*', response)
        korean_proper = re.findall(
            r'(?:미국|중국|일본|한국|유럽|러시아|인도|독일|영국|프랑스|'
            r'삼성|애플|구글|마이크로소프트|테슬라|아마존|화웨이|TSMC|엔비디아|'
            r'트럼프|바이든|시진핑|푸틴|OpenAI|Anthropic)', response
        )
        score += min((len(proper_nouns) + len(korean_proper)) * 0.03, 0.25)
        date_patterns = re.findall(r'\d{4}년|\d{1,2}월|\d{1,2}일|20\d{2}', response)
        score += min(len(date_patterns) * 0.05, 0.15)
        # Presence (not count) of each vague term costs 0.03.
        vague_terms = ['것', '등', '여러', '다양한', '많은', '일부', '어떤', 'something', 'various', 'many', 'some']
        vague_count = sum(1 for term in vague_terms if term in response)
        score -= vague_count * 0.03
        return max(0.0, min(1.0, score))

    def _check_abstract_expressions(self, response: str) -> float:
        """Return a [0,1] penalty factor: fraction of ABSTRACT_WARNINGS present, capped at 5 hits."""
        count = 0
        for expr in self.ABSTRACT_WARNINGS:
            if expr in response:
                count += 1
        return min(count / 5, 1.0)

    def _identify_uncertainties(self, response: str) -> List[str]:
        """Return up to 5 flagged hedge phrases, each with ~70 chars of surrounding context."""
        uncertainties = []
        response_lower = response.lower()
        for marker in self.UNCERTAINTY_MARKERS:
            if marker in response_lower:
                idx = response_lower.find(marker)
                # Slice the original-cased text around the first occurrence.
                context = response[max(0, idx-20):min(len(response), idx+50)]
                uncertainties.append(f"불확실: '{marker}' - ...{context}...")
                if len(uncertainties) >= 5:
                    break
        return uncertainties

    def _identify_verification_needs(self, response: str, goal: str) -> List[str]:
        """List (max 5) claim categories — numeric figures, quotations — that warrant fact-checking."""
        needs = []
        numbers = re.findall(r'\d+(?:\.\d+)?(?:%|억|조)', response)
        if numbers:
            needs.append(f"숫자 데이터 검증 필요: {', '.join(numbers[:3])}")
        quotes = re.findall(r'"([^"]+)"', response)
        if quotes:
            needs.append(f"인용문 검증 필요: {len(quotes)}개")
        return needs[:5]

    def _generate_recommendations(self, factual: float, logic: float,
                                  complete: float, specific: float,
                                  uncertainties: List[str],
                                  abstract_penalty: float) -> List[str]:
        """Map weak axis scores to at most 5 concrete improvement suggestions."""
        recommendations = []
        if factual < 0.5:
            recommendations.append("검색 결과에서 구체적 사실/수치 인용 강화")
        if logic < 0.5:
            recommendations.append("논리 연결어 사용하여 인과관계 명확화")
        if complete < 0.5:
            recommendations.append("목표의 모든 측면에 대한 답변 보완")
        if specific < 0.4:
            recommendations.append("국가명, 기업명, 수치 등 구체적 정보 추가")
        if len(uncertainties) > 2:
            recommendations.append("불확실한 표현 대신 검증된 사실로 대체")
        if abstract_penalty > 0.3:
            recommendations.append("'플랫폼', '시스템' 등 추상적 용어 대신 구체적 방안 제시")
        return recommendations[:5]

    def analyze_goal_clarity(self, goal: str) -> GoalClarity:
        """Assess whether a user goal is specific enough to analyse.

        Starts at 0.5, deducts 0.1 per ambiguous term, notes missing time
        range/context, and adds 0.1 for recognised analytic goal types.
        """
        score = 0.5
        ambiguous = []
        missing = []
        suggestions = []
        # Subjective qualifiers that lack an objective yardstick.
        ambiguous_patterns = [
            ('좋은', "좋은의 기준 모호"),
            ('최적', "최적의 기준 모호"),
            ('효과적', "효과적의 기준 모호"),
            ('적절한', "적절함의 기준 모호"),
            ('더 나은', "비교 대상 불명확"),
        ]
        for term, reason in ambiguous_patterns:
            if term in goal:
                ambiguous.append(reason)
                score -= 0.1
        if not re.search(r'\d{4}|20\d{2}', goal):
            # No 4-digit year anywhere in the goal.
            missing.append("시간 범위")
            suggestions.append("시간 범위를 명시해주세요 (예: 2025년, 향후 5년)")
        if len(goal) < 20:
            missing.append("상세 맥락")
            suggestions.append("구체적인 맥락이나 조건을 추가해주세요")
        goal_type = self._detect_goal_type(goal)
        if goal_type in ["prediction", "strategy", "comparison", "analysis"]:
            # Recognised analytic framings are easier to act on.
            score += 0.1
        is_actionable = score >= 0.5 and len(missing) <= 2
        return GoalClarity(
            clarity_score=round(max(0, min(1, score)), 3),
            ambiguous_terms=ambiguous,
            missing_context=missing,
            suggested_clarifications=suggestions[:5],
            is_actionable=is_actionable,
            goal_type=goal_type
        )

    def _detect_goal_type(self, goal: str) -> str:
        """Classify the goal by keyword; first matching category wins."""
        goal_lower = goal.lower()
        if any(kw in goal_lower for kw in ['예측', '전망', '될까', '될 것', '미래', '2025', '2026', '2027', '2028']):
            return "prediction"
        elif any(kw in goal_lower for kw in ['전략', '방법', '어떻게', '방안', '계획', '수립']):
            return "strategy"
        elif any(kw in goal_lower for kw in ['비교', '차이', 'vs', 'VS', '대', '어느 것']):
            return "comparison"
        elif any(kw in goal_lower for kw in ['분석', '왜', '원인', '영향', '현황']):
            return "analysis"
        elif any(kw in goal_lower for kw in ['특허', '발명', '아이디어', '혁신']):
            return "invention"
        elif any(kw in goal_lower for kw in ['소설', '스토리', '시나리오', '웹툰']):
            return "story"
        elif any(kw in goal_lower for kw in ['요리', '레시피', '음식']):
            return "recipe"
        else:
            return "general"

    def should_ask_clarification(self, goal: str) -> Tuple[bool, Optional[str]]:
        """Return (True, message) when the goal is too unclear to proceed, else (False, None)."""
        clarity = self.analyze_goal_clarity(goal)
        if clarity.clarity_score < self.uncertainty_threshold:
            return True, self._build_clarification_message(clarity)
        if not clarity.is_actionable:
            return True, self._build_clarification_message(clarity)
        return False, None

    def _build_clarification_message(self, clarity: GoalClarity) -> str:
        """Format a user-facing clarification request from a GoalClarity result."""
        msg_parts = ["🤔 목표를 더 명확히 해주시면 더 좋은 분석이 가능합니다:\n"]
        if clarity.ambiguous_terms:
            msg_parts.append(f"• 모호한 표현: {', '.join(clarity.ambiguous_terms[:3])}")
        if clarity.missing_context:
            msg_parts.append(f"• 누락된 맥락: {', '.join(clarity.missing_context[:3])}")
        if clarity.suggested_clarifications:
            msg_parts.append("\n💡 제안:")
            for i, sugg in enumerate(clarity.suggested_clarifications[:3], 1):
                msg_parts.append(f" {i}. {sugg}")
        return "\n".join(msg_parts)

    def format_assessment_for_log(self, assessment: QualityAssessment) -> str:
        """Render a QualityAssessment as a compact ASCII-art box for the log."""
        lines = [
            "┌─ 🧠 메타인지 평가 ─┐",
            f"│ 사실근거: {'█' * int(assessment.factual_confidence * 10)}{'░' * (10 - int(assessment.factual_confidence * 10))} {assessment.factual_confidence:.0%}",
            f"│ 논리성: {'█' * int(assessment.logical_coherence * 10)}{'░' * (10 - int(assessment.logical_coherence * 10))} {assessment.logical_coherence:.0%}",
            f"│ 완성도: {'█' * int(assessment.completeness * 10)}{'░' * (10 - int(assessment.completeness * 10))} {assessment.completeness:.0%}",
            f"│ 구체성: {'█' * int(assessment.specificity * 10)}{'░' * (10 - int(assessment.specificity * 10))} {assessment.specificity:.0%}",
            f"│ ────────────────",
            f"│ 종합: {'█' * int(assessment.overall_score * 10)}{'░' * (10 - int(assessment.overall_score * 10))} {assessment.overall_score:.0%}",
        ]
        if assessment.uncertainty_flags:
            lines.append(f"│ ⚠️ 불확실: {len(assessment.uncertainty_flags)}건")
        if assessment.recommendations:
            lines.append(f"│ 💡 권장: {assessment.recommendations[0][:25]}...")
        lines.append("└────────────────────┘")
        return "\n".join(lines)
|
|
|
|
|
|
|
|
|
|
|
class BraveSearchClient:
    """Thin wrapper around the Brave Web Search REST API.

    The API key is read from the BRAVE_API_KEY environment variable.  All
    failures are reported as {"success": False, ...} dicts rather than
    raised, so callers can degrade gracefully.
    """

    def __init__(self):
        self.api_key = os.getenv("BRAVE_API_KEY")
        self.base_url = "https://api.search.brave.com/res/v1/web/search"

    def search(self, query: str, element: str, count: int = 5) -> Dict:
        """Run an element-specific web search.

        Args:
            query: Raw user query.
            element: Five-element key (土/金/水/木/火) used to pick a
                purpose-specific query prefix from BRAVE_SEARCH_PURPOSES.
            count: Maximum number of results to request.

        Returns:
            On success: {"success": True, "query", "element", "purpose",
            "results": [{title, url, description, age}, ...], "count"}.
            On any failure: {"success": False, "error": ..., "results": []}.
        """
        if not self.api_key:
            return {"success": False, "error": "BRAVE_API_KEY 환경변수가 설정되지 않았습니다.", "results": []}
        purpose_info = BRAVE_SEARCH_PURPOSES.get(element, {})
        query_prefix = purpose_info.get("query_prefix", "")
        optimized_query = f"{query_prefix} {query}".strip()
        headers = {
            "Accept": "application/json",
            "Accept-Encoding": "gzip",
            "X-Subscription-Token": self.api_key
        }
        params = {"q": optimized_query, "count": count, "text_decorations": False, "search_lang": "ko"}
        try:
            response = requests.get(self.base_url, headers=headers, params=params, timeout=10)
            response.raise_for_status()
            data = response.json()
            results = []
            for item in data.get("web", {}).get("results", []):
                results.append({
                    "title": item.get("title", ""),
                    "url": item.get("url", ""),
                    "description": item.get("description", ""),
                    "age": item.get("age", "")
                })
            return {
                "success": True, "query": optimized_query, "element": element,
                "purpose": purpose_info.get("purpose", ""), "results": results, "count": len(results)
            }
        except requests.exceptions.RequestException as e:
            return {"success": False, "error": str(e), "results": []}
        except ValueError as e:
            # BUGFIX: response.json() raises ValueError (json.JSONDecodeError)
            # on a malformed body in older requests versions; previously this
            # escaped the method's error-dict contract and crashed the caller.
            # (Newer requests raise a RequestException subclass, caught above.)
            return {"success": False, "error": f"JSON 파싱 오류: {e}", "results": []}

    def format_for_prompt(self, search_result: Dict) -> str:
        """Render a search() result dict as markdown for inclusion in an LLM prompt."""
        if not search_result.get("success") or not search_result.get("results"):
            return "[검색 결과 없음]"
        lines = [f"[Brave Search - {search_result.get('purpose', '')}]", f"쿼리: {search_result.get('query', '')}", ""]
        # At most 5 entries; descriptions truncated to 200 chars.
        for i, result in enumerate(search_result.get("results", [])[:5], 1):
            lines.append(f"{i}. **{result.get('title', '')}**")
            lines.append(f"   {result.get('description', '')[:200]}...")
            lines.append(f"   출처: {result.get('url', '')}")
            lines.append("")
        return "\n".join(lines)
|
|
|
|
|
|
|
|
|
|
|
class URLCrawler:
    """Fetches and cleans web pages for LLM consumption.

    Uses a persistent requests.Session with a browser-like User-Agent and
    BeautifulSoup (optional dependency) for content extraction.
    """

    def __init__(self):
        self.session = requests.Session()
        # Browser-like headers reduce trivial bot blocking.
        self.session.headers.update({
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
            "Accept-Language": "ko-KR,ko;q=0.9,en-US;q=0.8,en;q=0.7"
        })

    @staticmethod
    def extract_urls(text: str) -> List[str]:
        """Extract unique URLs from free text, normalised to https://.

        Scheme-less URLs (e.g. "example.com/x") are accepted and prefixed
        with https://.  Trailing punctuation is stripped.

        Returns:
            Unique URLs in first-occurrence order.
        """
        full_url_pattern = r'(?:https?://)?(?:www\.)?(?:[a-zA-Z0-9][-a-zA-Z0-9]*\.)+[a-zA-Z]{2,}(?:/[^\s]*)?'
        urls = re.findall(full_url_pattern, text)
        cleaned_urls = []
        for url in urls:
            url = url.strip('.,;:!?()')
            if not url.startswith(('http://', 'https://')):
                url = 'https://' + url
            cleaned_urls.append(url)
        # BUGFIX: list(set(...)) produced a nondeterministic order across
        # runs; dict.fromkeys deduplicates while preserving first-seen order.
        return list(dict.fromkeys(cleaned_urls))

    def crawl(self, url: str, max_length: int = 5000) -> Dict:
        """Fetch `url` and return cleaned text plus basic page metadata.

        Strips boilerplate tags, prefers <main>/<article> content, truncates
        text to `max_length` chars, and collects up to 10 absolute links.

        Returns:
            {"success": True, url, final_url, title, meta_description,
             content, content_length, links, status_code, crawled_at}
            or {"success": False, "error": ..., "url": ...} on any failure.
        """
        if not HAS_BS4:
            return {"success": False, "error": "beautifulsoup4가 설치되지 않았습니다.", "url": url}
        try:
            if not url.startswith(('http://', 'https://')):
                url = 'https://' + url
            response = self.session.get(url, timeout=15, allow_redirects=True)
            response.raise_for_status()
            # requests defaults to ISO-8859-1 when the server omits a charset;
            # fall back to the sniffed encoding so Korean pages decode properly.
            if response.encoding is None or response.encoding == 'ISO-8859-1':
                response.encoding = response.apparent_encoding or 'utf-8'
            soup = BeautifulSoup(response.text, 'html.parser')
            # Drop navigation/boilerplate elements before text extraction.
            for element in soup(['script', 'style', 'nav', 'footer', 'header', 'aside', 'form', 'button', 'iframe', 'noscript']):
                element.decompose()
            title = soup.title.string if soup.title else ""
            meta_desc = ""
            meta_tag = soup.find('meta', attrs={'name': 'description'})
            if meta_tag:
                meta_desc = meta_tag.get('content', '')
            # Prefer semantic containers; fall back to the whole body.
            main_content = soup.find('main') or soup.find('article') or soup.find('body')
            if main_content:
                text = main_content.get_text(separator='\n', strip=True)
            else:
                text = soup.get_text(separator='\n', strip=True)
            lines = [line.strip() for line in text.split('\n') if line.strip()]
            text = '\n'.join(lines)
            if len(text) > max_length:
                text = text[:max_length] + "...[truncated]"
            links = []
            for a_tag in soup.find_all('a', href=True)[:10]:
                href = a_tag['href']
                link_text = a_tag.get_text(strip=True)
                if href.startswith('/'):
                    # Resolve site-relative links against the requested URL.
                    href = urljoin(url, href)
                if href.startswith('http'):
                    links.append({"text": link_text[:50], "url": href})
            return {
                "success": True, "url": url, "final_url": response.url, "title": title,
                "meta_description": meta_desc, "content": text, "content_length": len(text),
                "links": links, "status_code": response.status_code,
                "crawled_at": TimeAwareness.get_formatted_time()
            }
        except requests.exceptions.Timeout:
            return {"success": False, "error": "요청 시간 초과 (15초)", "url": url}
        except requests.exceptions.SSLError:
            return {"success": False, "error": "SSL 인증서 오류", "url": url}
        except requests.exceptions.RequestException as e:
            return {"success": False, "error": str(e), "url": url}
        except Exception as e:
            # Catch-all keeps a single bad page from killing the run; the
            # error string is surfaced to the orchestrator.
            return {"success": False, "error": f"크롤링 오류: {str(e)}", "url": url}
|
|
|
|
|
|
|
|
|
|
|
class SimpleVectorDB:
    """Tiny in-memory vector store with cosine-similarity search.

    Vectors live in a plain dict keyed by string id; there is no ANN index,
    so search is a linear scan — fine for the small collections used here.
    """

    def __init__(self, dimension: int = VECTOR_DIM):
        self.dimension = dimension
        self.vectors: Dict[str, np.ndarray] = {}
        self.metadata: Dict[str, Dict] = {}

    def add(self, id: str, vector: List[float], metadata: Dict = None):
        """Insert or overwrite vector `id` (stored as float32) with optional metadata."""
        self.vectors[id] = np.array(vector, dtype=np.float32)
        self.metadata[id] = metadata or {}

    def search(self, query_vector: List[float], top_k: int = 5) -> List[Tuple[str, float, Dict]]:
        """Return up to top_k (id, cosine_similarity, metadata) triples, best first."""
        if not self.vectors:
            return []
        query = np.array(query_vector, dtype=np.float32)
        # PERF: the query norm is invariant across stored vectors; previously
        # it was recomputed inside the loop on every iteration.
        query_norm = np.linalg.norm(query)
        results = []
        for id, vec in self.vectors.items():
            # +1e-8 guards against division by zero for all-zero vectors.
            similarity = np.dot(query, vec) / (query_norm * np.linalg.norm(vec) + 1e-8)
            results.append((id, float(similarity), self.metadata.get(id, {})))
        results.sort(key=lambda x: x[1], reverse=True)
        return results[:top_k]

    def delete(self, id: str):
        """Remove vector and metadata for `id` if present (no-op otherwise)."""
        self.vectors.pop(id, None)
        self.metadata.pop(id, None)

    def simple_embed(self, text: str) -> List[float]:
        """Deterministic hash-based pseudo-embedding in [-0.5, 0.5]^dimension.

        Not semantic: the 48 SHA-384 digest bytes are scaled to [-0.5, 0.5]
        and tiled up to `dimension`.  Identical texts map to identical
        vectors, which is all the callers rely on.
        """
        hash_bytes = hashlib.sha384(text.encode()).digest()
        embedding = [float(b) / 255.0 - 0.5 for b in hash_bytes]
        while len(embedding) < self.dimension:
            embedding.extend(embedding[:self.dimension - len(embedding)])
        return embedding[:self.dimension]
|
|
|
|
|
|
|
|
|
|
|
class SLAIMemorySystem: |
|
|
    def __init__(self, db_path: str = DB_PATH):
        """Open (or create) the SQLite store and warm the in-memory indexes.

        Order matters: _init_db() must run before _load_vectors(), which
        reads the freshly-created tables.
        """
        self.db_path = db_path
        # Two separate indexes: one over `memories`, one over `knowledge`.
        self.vector_db = SimpleVectorDB()
        self.knowledge_vector_db = SimpleVectorDB()
        self._init_db()
        self._load_vectors()
|
|
|
|
|
    def _init_db(self):
        """Create all SQLite tables on first run (idempotent via IF NOT EXISTS)."""
        conn = sqlite3.connect(self.db_path)
        c = conn.cursor()
        # Tiered memories (short/mid/long term); embedding is JSON text.
        c.execute("""CREATE TABLE IF NOT EXISTS memories (
                    id TEXT PRIMARY KEY, content TEXT NOT NULL, memory_type TEXT NOT NULL,
                    element TEXT, goal_context TEXT, importance REAL DEFAULT 0.5,
                    access_count INTEGER DEFAULT 0, created_at TEXT, last_accessed TEXT,
                    embedding BLOB, metadata TEXT)""")
        # Success/failure tallies behind learn_pattern()/get_learned_patterns().
        c.execute("""CREATE TABLE IF NOT EXISTS learning_patterns (
                    id TEXT PRIMARY KEY, pattern_type TEXT, input_pattern TEXT,
                    output_pattern TEXT, success_count INTEGER DEFAULT 0,
                    fail_count INTEGER DEFAULT 0, confidence REAL DEFAULT 0.5,
                    created_at TEXT, updated_at TEXT)""")
        # One row per completed orchestration run (see save_session()).
        c.execute("""CREATE TABLE IF NOT EXISTS sessions (
                    id TEXT PRIMARY KEY, goal TEXT, iterations INTEGER, satisfied INTEGER,
                    satisfaction_score REAL, final_output TEXT, created_at TEXT, history TEXT)""")
        # Long-lived knowledge artifacts, searchable via knowledge_vector_db.
        c.execute("""CREATE TABLE IF NOT EXISTS knowledge (
                    id TEXT PRIMARY KEY, goal TEXT NOT NULL, query TEXT, element TEXT,
                    search_results TEXT, agent_output TEXT, final_result TEXT,
                    embedding BLOB, created_at TEXT, access_count INTEGER DEFAULT 0,
                    tags TEXT, quality_score REAL DEFAULT 0.5)""")
        # TTL cache for external search responses (see cache_search()).
        c.execute("""CREATE TABLE IF NOT EXISTS search_cache (
                    id TEXT PRIMARY KEY, query TEXT NOT NULL, element TEXT,
                    results TEXT, created_at TEXT, expiry_at TEXT)""")
        conn.commit()
        conn.close()
        print(f"✅ DB 초기화 완료: {self.db_path}")
|
|
|
|
|
    def _load_vectors(self):
        """Rebuild both in-memory vector indexes from rows persisted in SQLite."""
        conn = sqlite3.connect(self.db_path)
        c = conn.cursor()
        c.execute("SELECT id, content, embedding, metadata FROM memories WHERE embedding IS NOT NULL")
        for row in c.fetchall():
            id, content, embedding_blob, metadata_str = row
            if embedding_blob:
                # Embeddings are stored as JSON text (see store_memory), not raw bytes.
                embedding = json.loads(embedding_blob)
                metadata = json.loads(metadata_str) if metadata_str else {}
                metadata["content"] = content
                self.vector_db.add(id, embedding, metadata)
        c.execute("SELECT id, goal, final_result, embedding FROM knowledge WHERE embedding IS NOT NULL")
        for row in c.fetchall():
            id, goal, final_result, embedding_blob = row
            if embedding_blob:
                embedding = json.loads(embedding_blob)
                # Only a 500-char preview is held in memory; full text stays in SQLite.
                self.knowledge_vector_db.add(id, embedding, {
                    "goal": goal,
                    "final_result": final_result[:500] if final_result else ""
                })
        conn.close()
|
|
|
|
|
    def store_memory(self, content: str, memory_type: str, element: str, goal_context: str, importance: float = 0.5) -> str:
        """Embed and persist one memory row; returns the new 16-char memory id.

        Also mirrors the row into the in-memory vector index and then trims
        the tier to its configured maximum size.
        """
        # Defensive coercion: upstream agents sometimes hand over non-strings.
        content = ensure_string(content)
        memory_type = ensure_string(memory_type)
        element = ensure_string(element)
        goal_context = ensure_string(goal_context)
        importance = safe_float(importance, 0.5)
        # Timestamp in the id makes identical content store as distinct rows.
        memory_id = hashlib.md5(f"{content}{datetime.now().isoformat()}".encode()).hexdigest()[:16]
        now = datetime.now().isoformat()
        embedding = self.vector_db.simple_embed(content)
        conn = sqlite3.connect(self.db_path)
        c = conn.cursor()
        c.execute("""INSERT OR REPLACE INTO memories
                    (id, content, memory_type, element, goal_context, importance, access_count,
                    created_at, last_accessed, embedding, metadata)
                    VALUES (?, ?, ?, ?, ?, ?, 0, ?, ?, ?, ?)""",
                  (memory_id, content, memory_type, element, goal_context, importance,
                   now, now, json.dumps(embedding), json.dumps({})))
        conn.commit()
        conn.close()
        # Keep the in-memory index in sync with SQLite.
        self.vector_db.add(memory_id, embedding, {
            "content": content, "memory_type": memory_type,
            "element": element, "goal_context": goal_context
        })
        self._cleanup_memories(memory_type)
        return memory_id
|
|
|
|
|
def retrieve_memories(self, query: str, memory_type: str = None, top_k: int = 5) -> List[Dict]: |
|
|
query_embedding = self.vector_db.simple_embed(query) |
|
|
results = self.vector_db.search(query_embedding, top_k * 2) |
|
|
filtered = [] |
|
|
for id, similarity, metadata in results: |
|
|
if memory_type and metadata.get("memory_type") != memory_type: |
|
|
continue |
|
|
filtered.append({ |
|
|
"id": id, "content": metadata.get("content", ""), |
|
|
"similarity": similarity, "memory_type": metadata.get("memory_type"), |
|
|
"element": metadata.get("element") |
|
|
}) |
|
|
if len(filtered) >= top_k: |
|
|
break |
|
|
self._update_access_count([r["id"] for r in filtered]) |
|
|
return filtered |
|
|
|
|
|
def promote_memory(self, memory_id: str): |
|
|
conn = sqlite3.connect(self.db_path) |
|
|
c = conn.cursor() |
|
|
c.execute("SELECT memory_type, access_count, importance FROM memories WHERE id = ?", (memory_id,)) |
|
|
row = c.fetchone() |
|
|
if row: |
|
|
current_type, access_count, importance = row |
|
|
if current_type == "short_term" and (access_count >= 3 or importance >= 0.7): |
|
|
new_type = "mid_term" |
|
|
elif current_type == "mid_term" and (access_count >= 10 or importance >= 0.85): |
|
|
new_type = "long_term" |
|
|
else: |
|
|
new_type = current_type |
|
|
if new_type != current_type: |
|
|
c.execute("UPDATE memories SET memory_type = ? WHERE id = ?", (new_type, memory_id)) |
|
|
conn.commit() |
|
|
conn.close() |
|
|
|
|
|
    def store_knowledge(self, goal: str, query: str, element: str, search_results: Dict,
                        agent_output: str, final_result: Any, quality_score: float = 0.5,
                        tags: List[str] = None) -> str:
        """Persist one knowledge artifact and index it; returns its 16-char id.

        The embedding is built from the goal plus the first 500 chars of the
        final result, so retrieval matches on both question and answer.
        """
        # Timestamp in the id keeps repeated (goal, element) runs distinct.
        knowledge_id = hashlib.md5(f"{goal}{element}{datetime.now().isoformat()}".encode()).hexdigest()[:16]
        now = datetime.now().isoformat()
        # Defensive coercion: upstream values may be non-string.
        goal = ensure_string(goal)
        query = ensure_string(query)
        element = ensure_string(element)
        agent_output = ensure_string(agent_output)
        final_result = ensure_string(final_result)
        quality_score = safe_float(quality_score, 0.5)
        # Search results are stored as JSON when structured, raw text otherwise.
        if isinstance(search_results, dict):
            search_results_str = json.dumps(search_results, ensure_ascii=False)
        else:
            search_results_str = ensure_string(search_results)
        if tags is None:
            tags = []
        tags_str = json.dumps(tags, ensure_ascii=False)
        embedding_text = f"{goal} {final_result[:500]}"
        embedding = self.knowledge_vector_db.simple_embed(embedding_text)
        conn = sqlite3.connect(self.db_path)
        c = conn.cursor()
        c.execute("""INSERT OR REPLACE INTO knowledge
                    (id, goal, query, element, search_results, agent_output, final_result,
                    embedding, created_at, access_count, tags, quality_score)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, 0, ?, ?)""",
                  (knowledge_id, goal, query, element, search_results_str, agent_output,
                   final_result, json.dumps(embedding), now, tags_str, quality_score))
        conn.commit()
        conn.close()
        # Mirror into the in-memory index (500-char preview only).
        self.knowledge_vector_db.add(knowledge_id, embedding, {
            "goal": goal, "final_result": final_result[:500]
        })
        return knowledge_id
|
|
|
|
|
def retrieve_knowledge(self, query: str, top_k: int = 5, min_similarity: float = 0.3) -> List[Dict]: |
|
|
query_embedding = self.knowledge_vector_db.simple_embed(query) |
|
|
results = self.knowledge_vector_db.search(query_embedding, top_k * 2) |
|
|
filtered = [] |
|
|
conn = sqlite3.connect(self.db_path) |
|
|
c = conn.cursor() |
|
|
for id, similarity, metadata in results: |
|
|
if similarity < min_similarity: |
|
|
continue |
|
|
c.execute("""SELECT goal, query, element, search_results, agent_output, |
|
|
final_result, quality_score, created_at FROM knowledge WHERE id = ?""", (id,)) |
|
|
row = c.fetchone() |
|
|
if row: |
|
|
try: |
|
|
search_results_parsed = json.loads(row[3]) if row[3] else {} |
|
|
except: |
|
|
search_results_parsed = {} |
|
|
filtered.append({ |
|
|
"id": id, "similarity": similarity, "goal": row[0], "query": row[1], |
|
|
"element": row[2], "search_results": search_results_parsed, |
|
|
"agent_output": row[4], "final_result": row[5], |
|
|
"quality_score": row[6], "created_at": row[7] |
|
|
}) |
|
|
c.execute("UPDATE knowledge SET access_count = access_count + 1 WHERE id = ?", (id,)) |
|
|
if len(filtered) >= top_k: |
|
|
break |
|
|
conn.commit() |
|
|
conn.close() |
|
|
return filtered |
|
|
|
|
|
def get_related_knowledge_for_element(self, goal: str, element: str, top_k: int = 3) -> List[Dict]: |
|
|
conn = sqlite3.connect(self.db_path) |
|
|
c = conn.cursor() |
|
|
query_embedding = self.knowledge_vector_db.simple_embed(goal) |
|
|
results = self.knowledge_vector_db.search(query_embedding, top_k * 3) |
|
|
filtered = [] |
|
|
for id, similarity, metadata in results: |
|
|
c.execute("SELECT element, agent_output, final_result FROM knowledge WHERE id = ?", (id,)) |
|
|
row = c.fetchone() |
|
|
if row and row[0] == element: |
|
|
filtered.append({ |
|
|
"id": id, "similarity": similarity, |
|
|
"agent_output": row[1][:300] if row[1] else "", |
|
|
"final_result": row[2][:300] if row[2] else "" |
|
|
}) |
|
|
if len(filtered) >= top_k: |
|
|
break |
|
|
conn.close() |
|
|
return filtered |
|
|
|
|
|
def cache_search(self, query: str, element: str, results: Dict, ttl_hours: int = 24) -> str: |
|
|
cache_id = hashlib.md5(f"{query}{element}".encode()).hexdigest()[:16] |
|
|
now = datetime.now() |
|
|
expiry = now + timedelta(hours=ttl_hours) |
|
|
conn = sqlite3.connect(self.db_path) |
|
|
c = conn.cursor() |
|
|
c.execute("""INSERT OR REPLACE INTO search_cache |
|
|
(id, query, element, results, created_at, expiry_at) VALUES (?, ?, ?, ?, ?, ?)""", |
|
|
(cache_id, query, element, json.dumps(results, ensure_ascii=False), |
|
|
now.isoformat(), expiry.isoformat())) |
|
|
conn.commit() |
|
|
conn.close() |
|
|
return cache_id |
|
|
|
|
|
def get_cached_search(self, query: str, element: str) -> Optional[Dict]: |
|
|
cache_id = hashlib.md5(f"{query}{element}".encode()).hexdigest()[:16] |
|
|
conn = sqlite3.connect(self.db_path) |
|
|
c = conn.cursor() |
|
|
c.execute("""SELECT results, expiry_at FROM search_cache |
|
|
WHERE id = ? AND expiry_at > ?""", (cache_id, datetime.now().isoformat())) |
|
|
row = c.fetchone() |
|
|
conn.close() |
|
|
if row: |
|
|
return json.loads(row[0]) |
|
|
return None |
|
|
|
|
|
    def learn_pattern(self, input_pattern: str, output_pattern: str, pattern_type: str, success: bool):
        """Upsert a learned input->output pattern with a smoothed success ratio.

        confidence = successes / (successes + failures + 1); the +1 keeps a
        single observation from reaching full confidence.
        NOTE(review): on update the stored output_pattern is NOT refreshed —
        the first observation wins; confirm that is intended.
        """
        input_pattern = ensure_string(input_pattern)
        output_pattern = ensure_string(output_pattern)
        pattern_type = ensure_string(pattern_type)
        # Id derives from (type, input) only, so repeats update the same row.
        pattern_id = hashlib.md5(f"{pattern_type}{input_pattern}".encode()).hexdigest()[:16]
        now = datetime.now().isoformat()
        conn = sqlite3.connect(self.db_path)
        c = conn.cursor()
        c.execute("SELECT success_count, fail_count FROM learning_patterns WHERE id = ?", (pattern_id,))
        row = c.fetchone()
        if row:
            success_count, fail_count = row
            if success:
                success_count += 1
            else:
                fail_count += 1
            confidence = success_count / (success_count + fail_count + 1)
            c.execute("""UPDATE learning_patterns
                        SET success_count = ?, fail_count = ?, confidence = ?, updated_at = ? WHERE id = ?""",
                      (success_count, fail_count, confidence, now, pattern_id))
        else:
            success_count = 1 if success else 0
            fail_count = 0 if success else 1
            confidence = success_count / (success_count + fail_count + 1)
            c.execute("""INSERT INTO learning_patterns
                        (id, pattern_type, input_pattern, output_pattern, success_count, fail_count, confidence, created_at, updated_at)
                        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
                      (pattern_id, pattern_type, input_pattern, output_pattern,
                       success_count, fail_count, confidence, now, now))
        conn.commit()
        conn.close()
|
|
|
|
|
def get_learned_patterns(self, pattern_type: str, min_confidence: float = 0.5) -> List[Dict]:
    """Return patterns of the given type at/above a confidence floor.

    Results are ordered best-first by confidence; each entry carries
    input/output/confidence keys.
    """
    conn = sqlite3.connect(self.db_path)
    rows = conn.execute(
        "SELECT input_pattern, output_pattern, confidence FROM learning_patterns "
        "WHERE pattern_type = ? AND confidence >= ? ORDER BY confidence DESC",
        (pattern_type, min_confidence),
    ).fetchall()
    conn.close()
    return [
        {"input": inp, "output": outp, "confidence": conf}
        for inp, outp, conf in rows
    ]
|
|
|
|
|
def save_session(self, state):
    """Persist (upsert) a session snapshot keyed by its session_id.

    `state` is expected to expose session_id, goal, iteration, satisfied,
    satisfaction_score, final_output and history — presumably the
    orchestrator's session-state object (defined elsewhere in this file).
    """
    record = (
        state.session_id,
        state.goal,
        state.iteration,
        1 if state.satisfied else 0,   # stored as integer flag
        state.satisfaction_score,
        ensure_string(state.final_output),
        datetime.now().isoformat(),
        json.dumps(state.history, ensure_ascii=False),
    )
    conn = sqlite3.connect(self.db_path)
    conn.execute(
        "INSERT OR REPLACE INTO sessions "
        "(id, goal, iterations, satisfied, satisfaction_score, final_output, created_at, history) "
        "VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
        record,
    )
    conn.commit()
    conn.close()
|
|
|
|
|
def _update_access_count(self, memory_ids: List[str]): |
|
|
conn = sqlite3.connect(self.db_path) |
|
|
c = conn.cursor() |
|
|
now = datetime.now().isoformat() |
|
|
for mid in memory_ids: |
|
|
c.execute("""UPDATE memories SET access_count = access_count + 1, last_accessed = ? WHERE id = ?""", |
|
|
(now, mid)) |
|
|
conn.commit() |
|
|
conn.close() |
|
|
|
|
|
def _cleanup_memories(self, memory_type: str):
    """Trim one memory tier down to its configured max_items.

    Keeps the rows with the most recent last_accessed; everything past
    the cap is deleted. `LIMIT -1 OFFSET n` is the SQLite idiom for
    "all rows after the first n".
    """
    cap = MEMORY_CONFIG.get(memory_type, {}).get("max_items", 100)
    conn = sqlite3.connect(self.db_path)
    conn.execute(
        "DELETE FROM memories WHERE id IN "
        "(SELECT id FROM memories WHERE memory_type = ? "
        "ORDER BY last_accessed DESC LIMIT -1 OFFSET ?)",
        (memory_type, cap),
    )
    conn.commit()
    conn.close()
|
|
|
|
|
def get_memory_stats(self) -> Dict:
    """Return row counts per memory tier plus the other core tables."""
    conn = sqlite3.connect(self.db_path)
    cur = conn.cursor()

    def _count(sql: str, params: tuple = ()) -> int:
        # Single-value COUNT(*) helper.
        cur.execute(sql, params)
        return cur.fetchone()[0]

    stats = {
        tier: _count("SELECT COUNT(*) FROM memories WHERE memory_type = ?", (tier,))
        for tier in ("short_term", "mid_term", "long_term")
    }
    stats["patterns"] = _count("SELECT COUNT(*) FROM learning_patterns")
    stats["sessions"] = _count("SELECT COUNT(*) FROM sessions")
    stats["knowledge"] = _count("SELECT COUNT(*) FROM knowledge")
    stats["search_cache"] = _count("SELECT COUNT(*) FROM search_cache")
    conn.close()
    return stats
|
|
|
|
|
def get_dashboard_data(self) -> Dict:
    """Aggregate every dashboard metric from SQLite in one pass.

    Returns a dict with keys: memory, knowledge, learning, sessions,
    elements, timeline, quality_distribution, element_performance,
    intelligence. All values are plain Python numbers/lists so the
    result can be fed straight into chart rendering.
    """
    conn = sqlite3.connect(self.db_path)
    c = conn.cursor()
    data = {
        "memory": {}, "knowledge": {}, "learning": {},
        "sessions": {}, "elements": {}, "timeline": {}
    }
    # --- memory: row count per tier ---
    for mem_type in ["short_term", "mid_term", "long_term"]:
        c.execute("SELECT COUNT(*) FROM memories WHERE memory_type = ?", (mem_type,))
        data["memory"][mem_type] = c.fetchone()[0]
    # --- knowledge: totals, average quality, cumulative access count ---
    c.execute("SELECT COUNT(*) FROM knowledge")
    data["knowledge"]["total"] = c.fetchone()[0]
    c.execute("SELECT AVG(quality_score) FROM knowledge")
    avg_quality = c.fetchone()[0]
    # AVG() is NULL on an empty table; fall back to 0.
    data["knowledge"]["avg_quality"] = round(avg_quality, 3) if avg_quality else 0
    c.execute("SELECT SUM(access_count) FROM knowledge")
    total_access = c.fetchone()[0]
    data["knowledge"]["total_access"] = total_access or 0
    # --- learning: pattern counts, confidence, smoothed success rate ---
    c.execute("SELECT COUNT(*) FROM learning_patterns")
    data["learning"]["total_patterns"] = c.fetchone()[0]
    c.execute("SELECT AVG(confidence) FROM learning_patterns")
    avg_conf = c.fetchone()[0]
    data["learning"]["avg_confidence"] = round(avg_conf, 3) if avg_conf else 0
    c.execute("SELECT SUM(success_count), SUM(fail_count) FROM learning_patterns")
    row = c.fetchone()
    success = row[0] or 0
    fail = row[1] or 0
    # +1 in the denominator avoids division by zero with no attempts.
    data["learning"]["success_rate"] = round(success / (success + fail + 1) * 100, 1)
    data["learning"]["total_attempts"] = success + fail
    # --- sessions: totals, completion, satisfaction, iteration averages ---
    c.execute("SELECT COUNT(*) FROM sessions")
    data["sessions"]["total"] = c.fetchone()[0]
    c.execute("SELECT COUNT(*) FROM sessions WHERE satisfied = 1")
    data["sessions"]["completed"] = c.fetchone()[0]
    c.execute("SELECT AVG(satisfaction_score) FROM sessions")
    avg_sat = c.fetchone()[0]
    data["sessions"]["avg_satisfaction"] = round(avg_sat, 3) if avg_sat else 0
    c.execute("SELECT AVG(iterations) FROM sessions")
    avg_iter = c.fetchone()[0]
    data["sessions"]["avg_iterations"] = round(avg_iter, 1) if avg_iter else 0
    # --- elements: knowledge count per Five-Elements category ---
    for element in ["土", "金", "水", "木", "火"]:
        c.execute("SELECT COUNT(*) FROM knowledge WHERE element = ?", (element,))
        data["elements"][element] = c.fetchone()[0]
    # --- timeline: per-day counts for the last 7 days (oldest first) ---
    data["timeline"]["dates"] = []
    data["timeline"]["knowledge_count"] = []
    data["timeline"]["memory_count"] = []
    data["timeline"]["session_count"] = []
    for i in range(6, -1, -1):
        date = (datetime.now() - timedelta(days=i)).strftime("%Y-%m-%d")
        # Keep only "MM-DD" for compact chart axis labels.
        data["timeline"]["dates"].append(date[-5:])
        c.execute("SELECT COUNT(*) FROM knowledge WHERE DATE(created_at) = ?", (date,))
        data["timeline"]["knowledge_count"].append(c.fetchone()[0])
        c.execute("SELECT COUNT(*) FROM memories WHERE DATE(created_at) = ?", (date,))
        data["timeline"]["memory_count"].append(c.fetchone()[0])
        c.execute("SELECT COUNT(*) FROM sessions WHERE DATE(created_at) = ?", (date,))
        data["timeline"]["session_count"].append(c.fetchone()[0])
    # --- quality distribution: histogram over five 0.2-wide buckets ---
    c.execute("SELECT quality_score FROM knowledge WHERE quality_score IS NOT NULL")
    quality_scores = [row[0] for row in c.fetchall()]
    quality_dist = {
        "very_low": sum(1 for q in quality_scores if 0 <= q < 0.2),
        "low": sum(1 for q in quality_scores if 0.2 <= q < 0.4),
        "medium": sum(1 for q in quality_scores if 0.4 <= q < 0.6),
        "high": sum(1 for q in quality_scores if 0.6 <= q < 0.8),
        "very_high": sum(1 for q in quality_scores if 0.8 <= q <= 1.0)
    }
    data["quality_distribution"] = quality_dist
    # --- per-element performance: quality scaled to 0-100, speed proxy ---
    element_performance = {}
    for element in ["土", "金", "水", "木", "火"]:
        c.execute("SELECT AVG(quality_score), COUNT(*) FROM knowledge WHERE element = ?", (element,))
        row = c.fetchone()
        avg_quality = (row[0] * 100) if row[0] else 0
        count = row[1]
        element_performance[element] = {
            "avg_quality": avg_quality,
            # "Speed" is a heuristic proxy derived from volume, capped at 100.
            "avg_speed": min(100, count * 3),
            "count": count
        }
    data["element_performance"] = element_performance
    # Derived composite IQ score computed from the aggregates above.
    intelligence_score = self._calculate_intelligence_score(data)
    data["intelligence"] = intelligence_score
    conn.close()
    return data
|
|
|
|
|
def _calculate_intelligence_score(self, data: Dict) -> Dict: |
|
|
scores = {} |
|
|
total_memory = sum(data["memory"].values()) |
|
|
long_term_ratio = data["memory"].get("long_term", 0) / max(total_memory, 1) |
|
|
scores["memory_score"] = min(100, total_memory * 2 + long_term_ratio * 50) |
|
|
knowledge_count = data["knowledge"].get("total", 0) |
|
|
avg_quality = data["knowledge"].get("avg_quality", 0) |
|
|
scores["knowledge_score"] = min(100, knowledge_count * 1.5 + avg_quality * 30) |
|
|
patterns = data["learning"].get("total_patterns", 0) |
|
|
success_rate = data["learning"].get("success_rate", 0) |
|
|
scores["learning_score"] = min(100, patterns * 3 + success_rate * 0.5) |
|
|
sessions = data["sessions"].get("total", 0) |
|
|
completed = data["sessions"].get("completed", 0) |
|
|
completion_rate = completed / max(sessions, 1) * 100 |
|
|
scores["experience_score"] = min(100, sessions * 2 + completion_rate * 0.3) |
|
|
scores["total"] = round( |
|
|
scores["memory_score"] * 0.2 + |
|
|
scores["knowledge_score"] * 0.3 + |
|
|
scores["learning_score"] * 0.25 + |
|
|
scores["experience_score"] * 0.25, 1) |
|
|
total = scores["total"] |
|
|
if total < 20: |
|
|
scores["level"], scores["level_name"] = "🌱 초보", "Novice" |
|
|
elif total < 40: |
|
|
scores["level"], scores["level_name"] = "🌿 성장", "Growing" |
|
|
elif total < 60: |
|
|
scores["level"], scores["level_name"] = "🌳 숙련", "Skilled" |
|
|
elif total < 80: |
|
|
scores["level"], scores["level_name"] = "🎋 전문", "Expert" |
|
|
else: |
|
|
scores["level"], scores["level_name"] = "🏆 마스터", "Master" |
|
|
return scores |
|
|
|
|
|
|
|
|
|
|
|
class EmergenceEngine:
    """Brute-force idea-combination ("emergence") engine.

    Classifies a context string by question type and domains via keyword
    matching, then generates scored analysis combinations from the
    creat.json matrix, cross-breeds top ideas, and renders them as a
    prompt section. Scores include random jitter, so ordering is
    stochastic run to run.
    """

    # Ordered (label, keywords) pairs; the first table hit wins.
    _QUESTION_TYPES = (
        ("예측", ["예측", "전망", "될까", "될 것", "미래", "2025", "2026", "2027", "2028"]),
        ("전략", ["전략", "방법", "어떻게", "방안", "계획", "수립"]),
        ("비교", ["비교", "차이", "vs", "VS", "대", "어느 것"]),
        ("분석", ["분석", "왜", "원인", "영향", "현황"]),
        ("발명", ["특허", "발명", "아이디어", "혁신", "신기술"]),
        ("스토리", ["소설", "스토리", "시나리오", "웹툰", "드라마", "영화", "캐릭터", "플롯"]),
        ("레시피", ["요리", "레시피", "음식", "만들기", "조리", "맛", "재료"]),
    )

    # Domain label -> trigger keywords; every matching domain is collected.
    _DOMAIN_KEYWORDS = (
        ("geopolitical", ["패권", "전쟁", "국제", "외교", "안보", "군사", "중국", "미국", "러시아", "NATO", "트럼프", "시진핑"]),
        ("economic", ["경제", "금융", "주식", "투자", "GDP", "인플레이션", "금리", "달러", "환율", "무역", "관세"]),
        ("technology", ["기술", "AI", "반도체", "테크", "혁신", "디지털", "양자", "바이오", "로봇", "자율주행"]),
        ("business", ["사업", "비즈니스", "스타트업", "마케팅", "전략", "경쟁", "시장", "매출", "고객"]),
        ("invention", ["특허", "발명", "아이디어", "혁신", "신기술", "청구항"]),
        ("story", ["소설", "스토리", "시나리오", "웹툰", "드라마", "영화", "캐릭터", "플롯", "장르"]),
        ("recipe", ["요리", "레시피", "음식", "조리", "맛", "재료", "식당", "셰프"]),
    )

    def __init__(self, creat_json_path: str = CREAT_JSON_PATH):
        self.creat_data = self._load_creat_json(creat_json_path)

    def _load_creat_json(self, path: str) -> Dict:
        """Load the emergence matrix JSON, or a minimal stub when absent."""
        if not Path(path).exists():
            return {"_meta": {"description": "기본 창발 매트릭스"}}
        with open(path, "r", encoding="utf-8") as fh:
            return json.load(fh)

    def _detect_question_type(self, context: str) -> str:
        """Classify the question; first keyword-table hit wins, else 일반."""
        for label, keywords in self._QUESTION_TYPES:
            if any(kw in context for kw in keywords):
                return label
        return "일반"

    def _detect_domains(self, context: str) -> List[str]:
        """Collect every matching domain; default to ["general"]."""
        hits = [label for label, keywords in self._DOMAIN_KEYWORDS
                if any(kw in context for kw in keywords)]
        return hits or ["general"]

    def generate_combinations(self, context: str, max_combinations: int = 30) -> List[Dict]:
        """Produce scored analysis combinations for the detected domains.

        Each matched domain contributes entries from its section of the
        creat.json data; results are sorted best-first and truncated.
        """
        question_type = self._detect_question_type(context)  # computed as before; not consumed below
        active_domains = self._detect_domains(context)
        pool: List[Dict] = []

        if "geopolitical" in active_domains and "geopolitical_analysis" in self.creat_data:
            geo_matrix = self.creat_data["geopolitical_analysis"]
            for dim in geo_matrix.get("power_dimensions", [])[:4]:
                pool.append({
                    "category": "지정학분석", "dimension": dim["type"],
                    "idea": f"'{context}'를 {dim['type']} 관점에서 분석",
                    "score": random.uniform(0.75, 0.95),
                })

        if "technology" in active_domains and "technology_trends" in self.creat_data:
            tech_matrix = self.creat_data["technology_trends"]
            for frontier in tech_matrix.get("current_frontiers", []):
                pool.append({
                    "category": "기술프론티어", "domain": frontier["domain"],
                    "idea": f"'{frontier['domain']}' 기술 관점에서 분석",
                    "score": random.uniform(0.7, 0.9),
                })

        if "business" in active_domains and "business_strategy" in self.creat_data:
            biz_matrix = self.creat_data["business_strategy"]
            for framework in biz_matrix.get("frameworks", [])[:4]:
                pool.append({
                    "category": "비즈니스프레임워크", "framework": framework["name"],
                    "idea": f"'{framework['name']}' 프레임워크 적용",
                    "score": random.uniform(0.65, 0.85),
                })

        pool.sort(key=lambda combo: combo["score"], reverse=True)
        return pool[:max_combinations]

    def cross_combine(self, ideas: List[Dict], max_cross: int = 10) -> List[Dict]:
        """Fuse pairs of top ideas drawn from different categories.

        Fused score is the pair's average boosted by 15%; generation stops
        as soon as max_cross fusions exist.
        """
        fused: List[Dict] = []
        if len(ideas) < 2:
            return fused
        candidates = ideas[:min(6, len(ideas))]
        for left_idx, first in enumerate(candidates):
            for second in candidates[left_idx + 1:]:
                if first["category"] == second["category"]:
                    continue  # only cross-category pairs produce emergence
                fused.append({
                    "category": "교차조합",
                    "sources": [first["category"], second["category"]],
                    "idea": f"[{first['category']} + {second['category']}] 융합 아이디어",
                    "score": (first["score"] + second["score"]) / 2 * 1.15,
                })
                if len(fused) >= max_cross:
                    return fused
        return fused

    def format_for_prompt(self, combinations: List[Dict]) -> str:
        """Render up to 20 combinations as a grouped, score-barred block."""
        out = ["[브루트포스 창발 매트릭스]", ""]
        grouped: Dict[str, List[Dict]] = {}
        for combo in combinations[:20]:
            grouped.setdefault(combo["category"], []).append(combo)
        counter = 1
        for category, members in grouped.items():
            out.append(f"━━ {category} ━━")
            for combo in members[:4]:  # at most four ideas per category
                filled = int(combo["score"] * 5)
                bar = "●" * filled + "○" * (5 - filled)
                out.append(f"{counter}. {bar} {combo['idea'][:80]}")
                counter += 1
            out.append("")
        return "\n".join(out)
|
|
|
|
|
|
|
|
|
|
|
class LLMClient:
    """Thin multi-provider chat client (Groq / Fireworks / Replicate).

    A model "level" (LOW/MIDDLE/HIGH) selects a provider and model via
    MODEL_LEVELS; chat() dispatches to the matching provider and yields
    text chunks. Fix over the previous revision: the two bare `except:`
    clauses in the streaming loops are narrowed to `except Exception:` —
    a bare except inside a generator also swallows GeneratorExit and
    KeyboardInterrupt, which can raise RuntimeError when the consumer
    closes the stream early.
    """

    # Level -> provider/model routing table. Names/descriptions are
    # user-facing strings; "disabled" levels are rejected by set_model_level.
    MODEL_LEVELS = {
        "LOW": {"name": "⚡ gpt-oss-120b", "provider": "groq", "model": "openai/gpt-oss-120b", "description": "빠른 응답, 기본 분석", "disabled": False},
        "MIDDLE": {"name": "🔥 GLM-4.7", "provider": "fireworks", "model": "accounts/fireworks/models/glm-4p7", "description": "균형잡힌 성능, 심층 분석", "disabled": False},
        "HIGH": {"name": "🧠 Claude 4.5 Sonnet", "provider": "replicate", "model": "anthropic/claude-4.5-sonnet", "description": "최고 품질, 심층 추론", "disabled": False}
    }

    def __init__(self, model_level: str = "LOW"):
        self.model_level = model_level
        self._setup_client()

    def _setup_client(self):
        """Resolve provider/model for the current level and load credentials.

        Unknown levels fall back to LOW. API keys come from environment
        variables; a missing key is tolerated here and reported at call time.
        """
        level_config = self.MODEL_LEVELS.get(self.model_level, self.MODEL_LEVELS["LOW"])
        self.provider = level_config["provider"]
        self.model = level_config["model"]
        self.client = None
        if self.provider == "groq":
            self.api_key = os.getenv("GROQ_API_KEY", "")
            if HAS_GROQ and self.api_key:
                try:
                    self.client = Groq(api_key=self.api_key)
                except Exception as e:
                    print(f"⚠️ Groq 클라이언트 초기화 실패: {e}")
        elif self.provider == "fireworks":
            self.api_key = os.getenv("FIREWORKS_API_KEY", "")
            self.api_url = "https://api.fireworks.ai/inference/v1/chat/completions"
        elif self.provider == "replicate":
            self.api_key = os.getenv("REPLICATE_API_TOKEN", "")
            if self.api_key:
                # The replicate SDK reads the token from the environment.
                os.environ["REPLICATE_API_TOKEN"] = self.api_key

    def set_model_level(self, level: str) -> bool:
        """Switch to another model level; returns False for unknown or
        disabled levels (current level is then left unchanged)."""
        if level in self.MODEL_LEVELS and not self.MODEL_LEVELS[level].get("disabled"):
            self.model_level = level
            self._setup_client()
            return True
        return False

    def get_model_info(self) -> str:
        """Return the display name of the currently selected model."""
        config = self.MODEL_LEVELS.get(self.model_level, {})
        return f"{config.get('name', 'Unknown')}"

    def chat(self, system_prompt: str, user_prompt: str, stream: bool = True) -> Generator[str, None, None]:
        """Dispatch a chat request to the active provider, yielding text
        chunks. Unknown providers fall back to Groq."""
        if self.provider == "groq":
            yield from self._chat_groq(system_prompt, user_prompt, stream)
        elif self.provider == "fireworks":
            yield from self._chat_fireworks(system_prompt, user_prompt, stream)
        elif self.provider == "replicate":
            yield from self._chat_replicate(system_prompt, user_prompt, stream)
        else:
            yield from self._chat_groq(system_prompt, user_prompt, stream)

    def _chat_groq(self, system_prompt: str, user_prompt: str, stream: bool = True) -> Generator[str, None, None]:
        """Groq backend. Errors are yielded as "[ERROR] ..." strings rather
        than raised, so the UI can display them inline."""
        if not HAS_GROQ:
            yield "[ERROR] groq 라이브러리 미설치"
            return
        if not self.api_key:
            yield "[ERROR] GROQ_API_KEY 미설정"
            return
        if not self.client:
            # Lazily (re)create the client if _setup_client could not.
            try:
                self.client = Groq(api_key=self.api_key)
            except Exception as e:
                yield f"[ERROR] Groq 클라이언트 생성 실패: {e}"
                return
        try:
            messages = [
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_prompt}
            ]
            if stream:
                completion = self.client.chat.completions.create(
                    model=self.model, messages=messages,
                    temperature=0.7, max_completion_tokens=8192, top_p=1, stream=True
                )
                for chunk in completion:
                    try:
                        if chunk.choices and len(chunk.choices) > 0:
                            delta = chunk.choices[0].delta
                            if delta and delta.content:
                                yield delta.content
                    except Exception:
                        # Skip malformed chunks; was a bare `except:` that
                        # also trapped GeneratorExit/KeyboardInterrupt.
                        continue
            else:
                completion = self.client.chat.completions.create(
                    model=self.model, messages=messages,
                    temperature=0.7, max_completion_tokens=8192, top_p=1, stream=False
                )
                if completion.choices and len(completion.choices) > 0:
                    yield completion.choices[0].message.content or ""
        except Exception as e:
            yield f"\n[ERROR] Groq API: {str(e)[:150]}"

    def _chat_fireworks(self, system_prompt: str, user_prompt: str, stream: bool = True) -> Generator[str, None, None]:
        """Fireworks backend via raw HTTP; streams SSE "data: ..." lines."""
        if not self.api_key:
            yield "[ERROR] FIREWORKS_API_KEY 미설정"
            return
        try:
            headers = {
                "Accept": "application/json",
                "Content-Type": "application/json",
                "Authorization": f"Bearer {self.api_key}"
            }
            messages = [
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_prompt}
            ]
            payload = {
                "model": self.model, "max_tokens": 8192, "top_p": 1, "top_k": 40,
                "presence_penalty": 0, "frequency_penalty": 0, "temperature": 0.7,
                "messages": messages, "stream": stream
            }
            if stream:
                response = requests.post(self.api_url, headers=headers, json=payload, stream=True, timeout=120)
                for line in response.iter_lines():
                    if line:
                        line_text = line.decode('utf-8')
                        if line_text.startswith("data: "):
                            data_str = line_text[6:]
                            if data_str.strip() == "[DONE]":
                                break
                            try:
                                data = json.loads(data_str)
                                if data.get("choices") and data["choices"][0].get("delta", {}).get("content"):
                                    yield data["choices"][0]["delta"]["content"]
                            except Exception:
                                # Ignore non-JSON keepalive/partial lines;
                                # was a bare `except:`.
                                continue
            else:
                response = requests.post(self.api_url, headers=headers, json=payload, timeout=120)
                result = response.json()
                if result.get("choices"):
                    yield result["choices"][0].get("message", {}).get("content", "")
        except Exception as e:
            yield f"\n[ERROR] Fireworks API: {str(e)[:150]}"

    def _chat_replicate(self, system_prompt: str, user_prompt: str, stream: bool = True) -> Generator[str, None, None]:
        """Replicate backend; prompts are flattened into one tagged string
        because the API takes a single prompt field."""
        if not self.api_key:
            yield "[ERROR] REPLICATE_API_TOKEN 미설정"
            return
        try:
            import replicate
            full_prompt = f"""<system>
{system_prompt}
</system>
<user>
{user_prompt}
</user>"""
            input_data = {"prompt": full_prompt, "max_tokens": 8192}
            if stream:
                for event in replicate.stream(self.model, input=input_data):
                    yield str(event)
            else:
                output = replicate.run(self.model, input=input_data)
                # Non-stream output may be an iterator of chunks or a plain value.
                if hasattr(output, '__iter__') and not isinstance(output, str):
                    yield "".join(str(chunk) for chunk in output)
                else:
                    yield str(output)
        except ImportError:
            yield "[ERROR] replicate 라이브러리 미설치. pip install replicate"
        except Exception as e:
            yield f"\n[ERROR] Replicate API: {str(e)[:150]}"

    def chat_sync(self, system_prompt: str, user_prompt: str) -> str:
        """Blocking convenience wrapper: run chat() non-streaming and join."""
        return "".join(self.chat(system_prompt, user_prompt, stream=False))
|
|
|
|
|
|
|
|
|
|
|
def create_dashboard_charts(data: Dict) -> Tuple:
    """Create intuitive AGI intelligence evolution dashboard charts.

    Builds six matplotlib figures from the dict produced by
    get_dashboard_data and returns them as a fixed-order tuple:
    (intelligence_timeline, quality_dist, element_heatmap,
    learning_efficiency, memory_flow, session_success).
    Each chart degrades to a "No ... Data" placeholder when its
    section of `data` is empty.
    """
    import matplotlib.pyplot as plt
    import matplotlib
    matplotlib.use('Agg')  # headless backend: render off-screen, no display needed
    plt.rcParams['font.family'] = ['DejaVu Sans', 'sans-serif']
    plt.rcParams['axes.unicode_minus'] = False

    charts = {}

    # ── Chart 1: intelligence evolution over the 7-day timeline ──────────
    fig1, ax1 = plt.subplots(figsize=(10, 5), facecolor='#FEF9C3')
    ax1.set_facecolor('#FEF9C3')
    timeline = data.get("timeline", {})
    dates = timeline.get("dates", [])
    if dates and len(dates) > 0:
        knowledge_counts = timeline.get("knowledge_count", [])
        memory_counts = timeline.get("memory_count", [])
        iq_scores = []
        # Synthetic IQ curve: start at 20 and accumulate daily growth
        # from knowledge/memory counts, capped at 100 per day.
        base_iq = 20
        for i, (k, m) in enumerate(zip(knowledge_counts, memory_counts)):
            daily_growth = (k * 2 + m * 0.5)
            base_iq += daily_growth
            iq_scores.append(min(100, base_iq))
        line1 = ax1.plot(dates, iq_scores, marker='o', linewidth=3,
                         color='#FACC15', markersize=8, label='Total IQ', zorder=3)
        ax1.fill_between(dates, iq_scores, alpha=0.3, color='#FACC15')
        if len(iq_scores) > 1:
            # Annotate absolute and percentage growth over the window.
            growth = iq_scores[-1] - iq_scores[0]
            growth_pct = (growth / iq_scores[0] * 100) if iq_scores[0] > 0 else 0
            ax1.text(0.02, 0.98, f'📈 Growth: +{growth:.1f} ({growth_pct:+.1f}%)',
                     transform=ax1.transAxes, fontsize=11, fontweight='bold',
                     va='top', bbox=dict(boxstyle='round', facecolor='#10B981', alpha=0.8, edgecolor='#1F2937'))
        ax1.set_title('🧠 Intelligence Evolution Timeline', fontsize=14, fontweight='bold', pad=15)
        ax1.set_ylabel('IQ Score', fontsize=11, fontweight='bold')
        ax1.set_xlabel('Date', fontsize=11, fontweight='bold')
        ax1.grid(True, alpha=0.2, linestyle='--')
        ax1.spines['top'].set_visible(False)
        ax1.spines['right'].set_visible(False)
        ax1.legend(loc='lower right', fontsize=10)
        ax1.set_ylim(0, 105)
    else:
        ax1.text(0.5, 0.5, 'No Timeline Data', ha='center', va='center', fontsize=14, fontweight='bold')
    plt.tight_layout()
    charts["intelligence_timeline"] = fig1

    # ── Chart 2: knowledge quality histogram (five 20% buckets) ──────────
    fig2, ax2 = plt.subplots(figsize=(6, 4), facecolor='#FEF9C3')
    ax2.set_facecolor('#FEF9C3')
    quality_dist = data.get("quality_distribution", {})
    if quality_dist:
        bins = ['0-20%', '20-40%', '40-60%', '60-80%', '80-100%']
        counts = [
            quality_dist.get('very_low', 0),
            quality_dist.get('low', 0),
            quality_dist.get('medium', 0),
            quality_dist.get('high', 0),
            quality_dist.get('very_high', 0)
        ]
        colors = ['#EF4444', '#F59E0B', '#FACC15', '#10B981', '#059669']
        bars = ax2.bar(bins, counts, color=colors, edgecolor='#1F2937', linewidth=2)
        for bar in bars:
            # Label each non-empty bar with its count.
            height = bar.get_height()
            if height > 0:
                ax2.text(bar.get_x() + bar.get_width()/2, height + 0.5,
                         int(height), ha='center', fontsize=10, fontweight='bold')
        ax2.set_title('📊 Knowledge Quality Distribution', fontsize=13, fontweight='bold')
        ax2.set_ylabel('Count', fontsize=10, fontweight='bold')
        ax2.set_xlabel('Quality Range', fontsize=10, fontweight='bold')
        ax2.spines['top'].set_visible(False)
        ax2.spines['right'].set_visible(False)
    else:
        ax2.text(0.5, 0.5, 'No Quality Data', ha='center', va='center', fontsize=12, fontweight='bold')
    plt.tight_layout()
    charts["quality_dist"] = fig2

    # ── Chart 3: five-elements performance heatmap (3 metrics x 5 elems) ─
    fig3, ax3 = plt.subplots(figsize=(7, 4), facecolor='#FEF9C3')
    ax3.set_facecolor('#FEF9C3')
    elements = data.get("elements", {})
    elem_performance = data.get("element_performance", {})
    elem_names = ['土\nEarth', '金\nMetal', '水\nWater', '木\nWood', '火\nFire']
    metrics = ['Activity', 'Quality', 'Speed']
    matrix = []
    for elem in ['土', '金', '水', '木', '火']:
        perf = elem_performance.get(elem, {})
        # Activity is derived from knowledge count (x2, capped at 100);
        # quality/speed fall back to a neutral 50 when missing.
        row = [
            min(100, elements.get(elem, 0) * 2),
            perf.get('avg_quality', 50),
            perf.get('avg_speed', 50)
        ]
        matrix.append(row)
    import numpy as np
    matrix = np.array(matrix)
    # Transposed so metrics run down the y-axis and elements across x.
    im = ax3.imshow(matrix.T, cmap='RdYlGn', aspect='auto', vmin=0, vmax=100)
    ax3.set_xticks(np.arange(len(elem_names)))
    ax3.set_yticks(np.arange(len(metrics)))
    ax3.set_xticklabels(elem_names, fontsize=10, fontweight='bold')
    ax3.set_yticklabels(metrics, fontsize=10, fontweight='bold')
    for i in range(len(metrics)):
        for j in range(len(elem_names)):
            # Overlay each cell with its numeric value.
            text = ax3.text(j, i, f'{matrix[j, i]:.0f}',
                            ha="center", va="center", color="black", fontsize=9, fontweight='bold')
    ax3.set_title('🔥 Five Elements Performance Matrix', fontsize=13, fontweight='bold', pad=10)
    cbar = plt.colorbar(im, ax=ax3, orientation='horizontal', pad=0.1, aspect=30)
    cbar.set_label('Performance Score', fontsize=9, fontweight='bold')
    plt.tight_layout()
    charts["element_heatmap"] = fig3

    # ── Chart 4: learning-efficiency bars vs 70% target line ─────────────
    fig4, ax4 = plt.subplots(figsize=(6, 4), facecolor='#FEF9C3')
    ax4.set_facecolor('#FEF9C3')
    learning = data.get("learning", {})
    success_rate = learning.get("success_rate", 0)
    categories = ['Success\nRate', 'Pattern\nConfidence', 'Knowledge\nReuse']
    # Confidence/quality are 0-1 upstream; scale to percent here.
    scores = [
        success_rate,
        learning.get("avg_confidence", 0) * 100,
        data.get("knowledge", {}).get("avg_quality", 0) * 100
    ]
    x = np.arange(len(categories))
    bars = ax4.bar(x, scores, color=['#10B981', '#3B82F6', '#8B5CF6'],
                   edgecolor='#1F2937', linewidth=2, width=0.6)
    for bar, score in zip(bars, scores):
        ax4.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 2,
                 f'{score:.1f}%', ha='center', fontsize=11, fontweight='bold')
    ax4.set_ylim(0, 110)
    ax4.set_xticks(x)
    ax4.set_xticklabels(categories, fontsize=10, fontweight='bold')
    ax4.set_ylabel('Score (%)', fontsize=10, fontweight='bold')
    ax4.set_title('⚡ Learning Efficiency Metrics', fontsize=13, fontweight='bold')
    ax4.spines['top'].set_visible(False)
    ax4.spines['right'].set_visible(False)
    ax4.axhline(y=70, color='#FACC15', linestyle='--', linewidth=1.5, alpha=0.5, label='Target: 70%')
    ax4.legend(loc='upper right', fontsize=8)
    plt.tight_layout()
    charts["learning_efficiency"] = fig4

    # ── Chart 5: stacked memory-promotion flow (short → mid → long) ──────
    fig5, ax5 = plt.subplots(figsize=(6, 4), facecolor='#FEF9C3')
    ax5.set_facecolor('#FEF9C3')
    memory_data = data.get("memory", {})
    short = memory_data.get("short_term", 0)
    mid = memory_data.get("mid_term", 0)
    long = memory_data.get("long_term", 0)
    total = short + mid + long
    if total > 0:
        stages = ['Input', 'Short→Mid', 'Mid→Long', 'Long-term\nStorage']
        # The 0.6 / 0.3 factors are illustrative promotion fractions for
        # the intermediate stages — presumably cosmetic only; confirm
        # against the promotion logic if this chart must be exact.
        short_flow = [total, short, 0, 0]
        mid_flow = [0, mid * 0.6, mid, 0]
        long_flow = [0, 0, long * 0.3, long]
        ax5.barh(stages, short_flow, color='#EF4444', edgecolor='#1F2937', linewidth=1, label='Short-term')
        ax5.barh(stages, mid_flow, left=short_flow, color='#3B82F6', edgecolor='#1F2937', linewidth=1, label='Mid-term')
        ax5.barh(stages, long_flow, left=[s+m for s,m in zip(short_flow, mid_flow)],
                 color='#10B981', edgecolor='#1F2937', linewidth=1, label='Long-term')
        ax5.set_title('💾 Memory Promotion Flow', fontsize=13, fontweight='bold')
        ax5.set_xlabel('Memory Count', fontsize=10, fontweight='bold')
        ax5.legend(loc='lower right', fontsize=9)
        ax5.spines['top'].set_visible(False)
        ax5.spines['right'].set_visible(False)
    else:
        ax5.text(0.5, 0.5, 'No Memory Data', ha='center', va='center', fontsize=12, fontweight='bold')
    plt.tight_layout()
    charts["memory_flow"] = fig5

    # ── Chart 6: session completion donut with center summary ────────────
    fig6, ax6 = plt.subplots(figsize=(5, 4), facecolor='#FEF9C3')
    ax6.set_facecolor('#FEF9C3')
    sessions = data.get("sessions", {})
    total_sess = sessions.get("total", 0)
    completed = sessions.get("completed", 0)
    avg_satisfaction = sessions.get("avg_satisfaction", 0)
    if total_sess > 0:
        completion_rate = (completed / total_sess) * 100
        sizes = [completion_rate, 100 - completion_rate]
        colors = ['#10B981', '#E5E7EB']
        wedges, texts, autotexts = ax6.pie(sizes, colors=colors, startangle=90,
                                           wedgeprops=dict(width=0.4, edgecolor='#1F2937', linewidth=2),
                                           autopct='%1.1f%%', textprops={'fontweight': 'bold', 'fontsize': 11})
        ax6.text(0, 0, f'{completed}/{total_sess}\nCompleted',
                 ha='center', va='center', fontsize=13, fontweight='bold', color='#1F2937')
        ax6.set_title(f'✅ Session Success Rate\n(Avg Satisfaction: {avg_satisfaction:.2f})',
                      fontsize=12, fontweight='bold', pad=10)
    else:
        ax6.text(0.5, 0.5, 'No Sessions', ha='center', va='center', fontsize=12, fontweight='bold')
    plt.tight_layout()
    charts["session_success"] = fig6

    # Fixed return order expected by the dashboard UI bindings.
    return (charts.get("intelligence_timeline"),
            charts.get("quality_dist"),
            charts.get("element_heatmap"),
            charts.get("learning_efficiency"),
            charts.get("memory_flow"),
            charts.get("session_success"))