Spaces:
Running
Running
| """ | |
| Nuclear Intelligence v5.0 - HuggingFace Space Optimized ⚛️ | |
| ═══════════════════════════════════════════════════════════════════ | |
| ✅ LLM-driven research (no random fake scores) | |
| ✅ Real multi-layer evaluation | |
| ✅ Real PoW mining with difficulty adjustment | |
| ✅ Live question generation (not from a fixed pool) | |
| ✅ Auto-sync with HF Dataset & GitHub | |
| ✅ 7 free LLM providers with intelligent fallback | |
| ✅ Production-grade error handling | |
| Author: QalamHipHop | License: MIT | |
| ═══════════════════════════════════════════════════════════════════ | |
| """ | |
| import os | |
| import sys | |
| import json | |
| import threading | |
| import time | |
| import hashlib | |
| import random | |
| import re | |
| from datetime import datetime | |
| from pathlib import Path | |
| from typing import Optional, Dict, List, Any, Tuple | |
# ─── Environment Detection ───────────────────────────────────────
# Either variable being set to a non-empty value marks a HuggingFace Space.
IS_HF_SPACE = bool(os.getenv("SPACE_ID") or os.getenv("HF_SPACE"))
try:
    PORT = int(os.getenv("GRADIO_PORT", "7860"))
except ValueError:
    # A malformed GRADIO_PORT must not abort the import; use the default port.
    PORT = 7860

# ─── Try Imports with Fallbacks ─────────────────────────────────
# Bind every optional dependency up front so later code can test `is None`
# instead of hitting NameError when one of the imports below fails part-way.
gradio_available = False
gr = None
pd = None
px = None
logger = None  # replaced by loguru below, or by the stdlib fallback at the end

try:
    import gradio as gr
    import pandas as pd
    from loguru import logger as _loguru_logger
    logger = _loguru_logger
    try:
        import plotly.express as px
    except Exception as _e:
        # Plotly is optional: charts are skipped, the app still counts as usable.
        print(f"WARNING: plotly import failed: {_e}")
        px = None
    gradio_available = True
except ImportError as e:
    print(f"WARNING: Missing dependency: {e}")
    print("Install with: pip install gradio pandas loguru plotly")
    gradio_available = False
except Exception as e:
    # HF Space often hits: huggingface_hub.HfFolder removed in >=0.26
    # Gradio 4.36 oauth.py imports HfFolder, so we patch around it.
    print(f"WARNING: Gradio import crashed: {type(e).__name__}: {e}")
    try:
        # Kept from the original recovery path; rebinding these is harmless.
        import sys
        import importlib
        # Pre-stub huggingface_hub.HfFolder so the oauth import doesn't fail.
        try:
            import huggingface_hub as _hf
            if not hasattr(_hf, 'HfFolder'):
                class _HfFolder:
                    """Minimal stand-in exposing the two methods the shim provides."""

                    # staticmethod so the shim works whether it is called on
                    # the class or on an instance.
                    @staticmethod
                    def get_token():
                        return None

                    @staticmethod
                    def save_token(token):
                        pass
                _hf.HfFolder = _HfFolder
        except Exception:
            pass
        # Try the actual import again now that the attribute exists.
        import gradio as gr
        # Each remaining dependency is optional on this recovery path.
        try:
            import pandas as pd
        except Exception:
            pd = None
        try:
            from loguru import logger as _loguru_logger
            logger = _loguru_logger
        except Exception:
            pass
        try:
            import plotly.express as px
        except Exception:
            px = None
        gradio_available = True
        print(f"INFO: Gradio recovered via HfFolder shim (version={gr.__version__})")
    except Exception as e2:
        print(f"FATAL: Could not import Gradio even with shim: {type(e2).__name__}: {e2}")
        gradio_available = False

# python-dotenv is optional; a missing package or unreadable .env is ignored.
try:
    from dotenv import load_dotenv
    load_dotenv()
except Exception:
    pass

# ── Logger fallback (must exist even if loguru import failed) ────
if logger is None:
    import logging as _logging
    logger = _logging.getLogger("hf_deploy")
    if not logger.handlers:
        _h = _logging.StreamHandler()
        _h.setFormatter(_logging.Formatter("%(asctime)s | %(levelname)s | %(message)s"))
        logger.addHandler(_h)
    logger.setLevel(_logging.INFO)
| # ═══════════════════════════════════════════════════════════════════ | |
| # LLM ENGINE — Real multi-provider with intelligent fallback | |
| # ═══════════════════════════════════════════════════════════════════ | |
class LRUCache:
    """Thread-safe least-recently-used cache for LLM responses.

    Entries are keyed by a SHA-256 digest of the full prompt plus a model
    (namespace) string. When the cache is full, the entry that was read or
    written least recently is evicted.
    """

    def __init__(self, max_size: int = 200):
        # Plain dicts keep insertion order, so the first key is always the
        # least recently used one as long as hits are re-inserted at the end.
        self.cache: Dict[str, Any] = {}
        self.max_size = max_size
        self.hits = 0
        self.misses = 0
        self._lock = threading.Lock()

    def _make_key(self, prompt: str, model: str) -> str:
        """Return the cache key for a prompt/model pair."""
        # Hash the whole prompt: truncating it would make long prompts that
        # share a prefix collide and return each other's responses.
        return hashlib.sha256(f"{prompt}:{model}".encode()).hexdigest()

    def get(self, prompt: str, model: str):
        """Return the cached value, or None on a miss. A hit refreshes recency."""
        key = self._make_key(prompt, model)
        with self._lock:
            if key in self.cache:
                self.hits += 1
                # Pop and re-insert to move the entry to the "most recent" end.
                value = self.cache.pop(key)
                self.cache[key] = value
                return value
            self.misses += 1
            return None

    def set(self, prompt: str, model: str, value):
        """Store a value, evicting least-recently-used entries if needed."""
        if self.max_size <= 0:
            return  # a zero-capacity cache stores nothing
        key = self._make_key(prompt, model)
        with self._lock:
            # Overwriting an existing key must not evict an unrelated entry.
            self.cache.pop(key, None)
            while len(self.cache) >= self.max_size:
                oldest = next(iter(self.cache))
                del self.cache[oldest]
            self.cache[key] = value

    def stats(self):
        """Return hit/miss counters, the hit rate as a percentage string, and size."""
        with self._lock:
            hits, misses, size = self.hits, self.misses, len(self.cache)
        total = hits + misses
        return {
            "hits": hits,
            "misses": misses,
            "hit_rate": f"{(hits/max(total,1)*100):.1f}%",
            "size": size,
        }
class LLMEngine:
    """Multi-provider LLM engine with JSON output, caching, and fallback.

    Providers are tried in the order they appear in ``PROVIDERS``; the first
    one that returns non-empty content wins. When no provider has a usable
    API key, the engine runs in "demo" mode and returns a canned message.
    """

    PROVIDERS = {
        "huggingface": {
            "name": "HuggingFace Router (Llama 70B)",
            "env": "HF_TOKEN",
            "base": "https://router.huggingface.co/v1",
            "model": "meta-llama/Llama-3.3-70B-Instruct",
            "priority": 0,
            "max_tokens": 4096,
            "color": "🟣",
        },
        "huggingface_free": {
            "name": "HuggingFace Router (Qwen 72B)",
            "env": "HF_TOKEN",
            "base": "https://router.huggingface.co/v1",
            "model": "Qwen/Qwen2.5-72B-Instruct",
            "priority": 1,
            "max_tokens": 4096,
            "color": "🟤",
        },
        "groq": {
            "name": "Groq LPU",
            "env": "GROQ_API_KEY",
            "base": "https://api.groq.com/openai/v1",
            "model": "llama-3.3-70b-versatile",
            "priority": 2,
            "max_tokens": 4096,
            "color": "⚡",
        },
        "deepseek": {
            "name": "DeepSeek V3",
            "env": "DEEPSEEK_API_KEY",
            "base": "https://api.deepseek.com/v1",
            "model": "deepseek-chat",
            "priority": 3,
            "max_tokens": 4096,
            "color": "🟢",
        },
        "gemini": {
            "name": "Gemini 2.0 Flash",
            "env": "GEMINI_API_KEY",
            "base": "https://generativelanguage.googleapis.com/v1beta",
            "model": "gemini-2.0-flash",
            "priority": 4,
            "max_tokens": 4096,
            "color": "🟡",
        },
        "aimlapi": {
            "name": "AIMLAPI GPT-4o",
            "env": "AIMLAPI_API_KEY",
            "base": "https://api.aimlapi.com/v1",
            "model": "gpt-4o",
            "priority": 5,
            "max_tokens": 4096,
            "color": "🔵",
        },
        "together": {
            "name": "Together AI",
            "env": "TOGETHER_API_KEY",
            "base": "https://api.together.xyz/v1",
            "model": "meta-llama/Llama-3.3-70B-Instruct-Turbo",
            "priority": 6,
            "max_tokens": 4096,
            "color": "🟠",
        },
        "fireworks": {
            "name": "Fireworks AI",
            "env": "FIREWORKS_API_KEY",
            "base": "https://api.fireworks.ai/inference/v1",
            "model": "accounts/fireworks/models/llama-v3p3-70b-instruct",
            "priority": 7,
            "max_tokens": 4096,
            "color": "🔥",
        },
    }

    # Responses are cached independently of which provider produced them, so
    # lookups and stores must share one fixed namespace.
    _CACHE_NAMESPACE = "chat"

    def __init__(self):
        self._available: List[str] = []
        self._stats = {"requests": 0, "successes": 0, "failures": 0, "by_provider": {}}
        self._current: Optional[str] = None
        self._last_error: Optional[str] = None
        self.cache = LRUCache()
        self._health: Dict[str, Dict] = {}
        self._init_providers()

    @staticmethod
    def _key_looks_valid(name: str, key: str) -> bool:
        """Cheap format check of an API key for the given provider name."""
        if not key or len(key) < 10 or key.startswith("placeholder"):
            return False
        if name == "groq":
            return key.startswith("gsk_")
        if name == "deepseek":
            return key.startswith(("sk-", "sk_"))
        if name in ("huggingface", "huggingface_free"):
            return key.startswith("hf_")
        if name in ("gemini", "aimlapi"):
            return len(key) >= 20
        return True

    def _init_providers(self):
        """Populate ``_available`` and ``_health`` from environment variables."""
        for name, cfg in self.PROVIDERS.items():
            key = os.getenv(cfg["env"], "").strip()
            if self._key_looks_valid(name, key) and name not in self._available:
                self._available.append(name)
                self._health[name] = {"status": "active", "failures": 0, "latency": 0}
        # On HF Space, HF_TOKEN is always available — make sure it's in the list
        if not self._available:
            hf_key = os.getenv("HF_TOKEN", "").strip()
            if hf_key and hf_key.startswith("hf_"):
                self._available = ["huggingface", "huggingface_free"]
                for n in self._available:
                    self._health[n] = {"status": "active", "failures": 0, "latency": 0}
        if not self._available:
            self._available = ["demo"]
            logger.warning("⚠️ No LLM providers configured. Add API keys to enable real research.")
        logger.info(f"🔌 LLM providers ready: {self._available}")

    def chat(
        self,
        prompt: str,
        system: str = "",
        temperature: float = 0.7,
        max_tokens: Optional[int] = None,
        json_mode: bool = False,
    ) -> Optional[str]:
        """Send a chat request.

        Args:
            prompt: User message.
            system: Optional system instruction.
            temperature: Sampling temperature forwarded to the provider.
            max_tokens: Output cap; falls back to the provider's configured cap.
            json_mode: Ask the provider for a JSON object response.

        Returns:
            The response text, the demo message when no provider is
            configured, or None when every provider failed.
        """
        # Digest the complete request so the key stays short and no request
        # field can be lost to truncation inside the cache.
        cache_key = hashlib.sha256(
            json.dumps(
                {"p": prompt, "s": system, "t": temperature, "m": max_tokens, "j": json_mode},
                sort_keys=True,
            ).encode()
        ).hexdigest()
        cached = self.cache.get(cache_key, self._CACHE_NAMESPACE)
        if cached is not None:
            return cached

        counted = False
        for provider in self._available:
            if provider == "demo":
                return self._demo_response(prompt, system)
            if not counted:
                # One request per chat() call that reaches a real provider,
                # regardless of how many fallbacks it takes.
                self._stats["requests"] += 1
                counted = True
            try:
                content = self._call_provider(provider, prompt, system, temperature, max_tokens, json_mode)
                if content:
                    self._record_success(provider, content)
                    self.cache.set(cache_key, self._CACHE_NAMESPACE, content)
                    return content
            except Exception as e:
                if provider in self._health:
                    self._health[provider]["failures"] += 1
                logger.warning(f"Provider {provider} failed: {str(e)[:200]}")
                self._last_error = f"{provider}: {str(e)[:100]}"
                continue
        self._stats["failures"] += 1
        return None

    def _call_provider(
        self,
        provider: str,
        prompt: str,
        system: str,
        temperature: float,
        max_tokens: Optional[int],
        json_mode: bool,
    ) -> Optional[str]:
        """Call one provider and return its text, or None if it sent no choices.

        Raises:
            RuntimeError: Gemini answered with a non-200 status.
            Exception: Whatever the HTTP/OpenAI client raises; ``chat`` treats
                any exception as a provider failure and moves on.
        """
        import requests
        cfg = self.PROVIDERS[provider]
        api_key = os.getenv(cfg["env"])
        max_t = max_tokens or cfg["max_tokens"]

        # Gemini uses a different API
        if provider == "gemini":
            url = f"{cfg['base']}/models/{cfg['model']}:generateContent"
            # Send the key as a header, not a query parameter: request
            # exceptions embed the URL and chat() writes them to the log.
            headers = {"x-goog-api-key": api_key or ""}
            generation_config: Dict[str, Any] = {
                "temperature": temperature,
                "maxOutputTokens": max_t,
            }
            if json_mode:
                generation_config["responseMimeType"] = "application/json"
            payload: Dict[str, Any] = {
                "contents": [{"role": "user", "parts": [{"text": prompt}]}],
                "generationConfig": generation_config,
            }
            if system:
                # Dedicated field for system prompts instead of a fake user turn.
                payload["systemInstruction"] = {"parts": [{"text": system}]}
            resp = requests.post(url, headers=headers, json=payload, timeout=120)
            if resp.status_code == 200:
                data = resp.json()
                return data["candidates"][0]["content"]["parts"][0]["text"]
            raise RuntimeError(f"HTTP {resp.status_code}: {resp.text[:200]}")

        # OpenAI-compatible providers (incl. HF router)
        from openai import OpenAI
        client = OpenAI(api_key=api_key, base_url=cfg["base"], timeout=120.0)
        messages = []
        if system:
            messages.append({"role": "system", "content": system})
        messages.append({"role": "user", "content": prompt})
        kwargs: Dict[str, Any] = {
            "model": cfg["model"],
            "messages": messages,
            "temperature": temperature,
            "max_tokens": max_t,
        }
        if json_mode:
            kwargs["response_format"] = {"type": "json_object"}
        resp = client.chat.completions.create(**kwargs)
        if resp.choices:
            return resp.choices[0].message.content
        return None

    def _demo_response(self, prompt: str, system: str) -> str:
        """Fallback when no API keys are configured. NOT used in production."""
        return (
            "🤖 Demo Mode: Configure LLM API keys (HF_TOKEN recommended) to enable real research.\n\n"
            f"Prompt received: {prompt[:200]}"
        )

    def _record_success(self, provider: str, content: str):
        """Update counters after a provider returned content."""
        # "requests" is counted in chat(); counting it here too would make it
        # indistinguishable from "successes".
        self._stats["successes"] += 1
        self._stats["by_provider"][provider] = self._stats["by_provider"].get(provider, 0) + 1
        self._current = provider
        if provider in self._health:
            self._health[provider]["failures"] = 0

    def get_stats(self):
        """Return request counters plus provider and cache information."""
        return {
            **self._stats,
            "available_providers": self._available,
            "current_provider": self._current,
            "last_error": self._last_error,
            "cache": self.cache.stats(),
        }

    def health_check(self):
        """Summarise every known provider's configuration and failure state."""
        providers = {}
        for name, cfg in self.PROVIDERS.items():
            health = self._health.get(name, {"status": "unavailable", "failures": 0})
            configured = name in self._available
            failures = health.get("failures", 0)
            if not configured:
                # A provider without a usable key can never serve a request,
                # so it must not be reported as healthy.
                status = "unavailable"
            elif failures == 0:
                status = "healthy"
            elif failures < 5:
                status = "degraded"
            else:
                status = "unavailable"
            providers[cfg["name"]] = {
                "configured": configured,
                "status": status,
                "priority": cfg["priority"],
                "is_free": True,
            }
        return {
            "providers": providers,
            "active_provider": self._current,
            "total_available": len([p for p in self._available if p != "demo"]),
        }
| # ═══════════════════════════════════════════════════════════════════ | |
| # JSON PARSING — robust extraction of JSON from LLM output | |
| # ═══════════════════════════════════════════════════════════════════ | |
def parse_json_response(text: str) -> Tuple[Optional[Dict], bool]:
    """Extract a JSON object from an LLM response.

    Handles bare JSON, JSON inside a Markdown code fence, and JSON embedded
    in surrounding prose. Only JSON *objects* are accepted: callers use the
    result as a dict, so a top-level list, number, or string counts as a
    parse failure.

    Args:
        text: Raw response text (may be empty or None).

    Returns:
        ``(parsed_dict, False)`` on success, ``(None, True)`` otherwise.
    """
    if not text or not isinstance(text, str):
        return None, True
    content = text.strip()

    # Candidate strings, most specific first: the body of the first code
    # fence (closed or not), then the whole response.
    candidates = []
    fence = re.search(r"```[A-Za-z]*\s*([\s\S]*?)(?:```|$)", content)
    if fence and fence.group(1).strip():
        candidates.append(fence.group(1).strip())
    candidates.append(content)

    decoder = json.JSONDecoder()
    for candidate in candidates:
        # Try direct parse
        try:
            parsed = json.loads(candidate)
        except json.JSONDecodeError:
            parsed = None
        if isinstance(parsed, dict):
            return parsed, False

        # Otherwise decode starting at each "{" in turn. Unlike a greedy
        # first-brace-to-last-brace match, this tolerates trailing prose that
        # itself contains braces.
        start = candidate.find("{")
        while start != -1:
            try:
                obj, _end = decoder.raw_decode(candidate[start:])
            except json.JSONDecodeError:
                obj = None
            if isinstance(obj, dict):
                return obj, False
            start = candidate.find("{", start + 1)
    return None, True
| # ═══════════════════════════════════════════════════════════════════ | |
| # KNOWLEDGE GRAPH | |
| # ═══════════════════════════════════════════════════════════════════ | |
class KnowledgeGraph:
    """JSON-file-backed store of question/answer entities with keyword search."""

    def __init__(self, path: str = "knowledge_base/knowledge_graph.json"):
        self.path = path
        self.graph: Dict[str, Any] = {
            "entities": {},
            "relationships": [],
            "metadata": {"version": "5.0", "created": datetime.now().isoformat()},
        }
        self._lock = threading.Lock()
        self._load()

    def _ensure_parent_dir(self):
        """Create the directory that holds ``self.path`` if there is one."""
        parent = os.path.dirname(self.path)
        # A bare filename has an empty dirname, and os.makedirs("") raises.
        if parent:
            os.makedirs(parent, exist_ok=True)

    def _load(self):
        """Load the graph from disk, or create the file when it is missing."""
        if os.path.exists(self.path):
            try:
                with open(self.path, "r", encoding="utf-8") as f:
                    loaded = json.load(f)
                if not isinstance(loaded, dict):
                    raise ValueError("top-level JSON value is not an object")
                self.graph = loaded
            except Exception as e:
                logger.warning(f"KG load failed: {e}")
            # Files written by other versions or edited by hand may lack
            # sections the rest of the class indexes directly.
            if not isinstance(self.graph.get("entities"), dict):
                self.graph["entities"] = {}
            if not isinstance(self.graph.get("relationships"), list):
                self.graph["relationships"] = []
        else:
            self._save()

    def _save(self):
        """Write the graph atomically (temp file + replace); errors are logged."""
        try:
            self._ensure_parent_dir()
            tmp = self.path + ".tmp"
            with open(tmp, "w", encoding="utf-8") as f:
                json.dump(self.graph, f, ensure_ascii=False, indent=4)
            os.replace(tmp, self.path)
        except Exception as e:
            logger.error(f"KG save failed: {e}")

    def add(self, question: str, answer: str, metadata: dict) -> str:
        """Add a knowledge entity. Returns the entity ID.

        The ID is the first 16 hex characters of the question's SHA-256, so
        adding the same question again overwrites the previous entity.
        """
        eid = hashlib.sha256(question.encode()).hexdigest()[:16]
        with self._lock:
            self.graph["entities"][eid] = {
                "id": eid,
                "question": question,
                "answer": answer,
                "metadata": metadata,
                "created": datetime.now().isoformat(),
            }
            self._save()
        return eid

    def search(self, query: str, limit: int = 10) -> List[Dict]:
        """Search the knowledge graph. Returns up to `limit` matches with relevance score.

        Scoring: +50 / +20 when the whole query occurs in the question /
        answer, and +10 / +3 for each query token (longer than two
        characters) found in the question / answer. Each result is a copy of
        the entity with an added ``_score`` key, best match first.
        """
        q = query.lower()
        tokens = [t for t in q.split() if len(t) > 2]
        # Snapshot under the lock so a concurrent add() cannot resize the
        # dict while it is being iterated.
        with self._lock:
            entities = list(self.graph.get("entities", {}).values())
        results = []
        for e in entities:
            question_l = e.get("question", "").lower()
            answer_l = e.get("answer", "").lower()
            score = 0
            if q in question_l:
                score += 50
            if q in answer_l:
                score += 20
            for token in tokens:
                if token in question_l:
                    score += 10
                if token in answer_l:
                    score += 3
            if score > 0:
                results.append({**e, "_score": score})
        results.sort(key=lambda x: x["_score"], reverse=True)
        return results[:limit]

    def get_stats(self) -> Dict[str, Any]:
        """Return entity/relationship counts and mean accuracy/novelty.

        Averages only include entities whose metadata holds a numeric
        ``accuracy`` / ``novelty`` value; they are 0 when none do.
        """
        with self._lock:
            entities = list(self.graph.get("entities", {}).values())
            relationship_count = len(self.graph.get("relationships", []))
        if not entities:
            return {"total_entities": 0, "total_relationships": 0, "avg_accuracy": 0, "avg_novelty": 0}
        accs = []
        novs = []
        for e in entities:
            m = e.get("metadata", {})
            if isinstance(m.get("accuracy"), (int, float)):
                accs.append(m["accuracy"])
            if isinstance(m.get("novelty"), (int, float)):
                novs.append(m["novelty"])
        return {
            "total_entities": len(entities),
            "total_relationships": relationship_count,
            "avg_accuracy": sum(accs) / len(accs) if accs else 0,
            "avg_novelty": sum(novs) / len(novs) if novs else 0,
        }
| # ═══════════════════════════════════════════════════════════════════ | |
| # BLOCKCHAIN LEDGER — Real SHA-3 PoW mining | |
| # ═══════════════════════════════════════════════════════════════════ | |
class VirtualLedger:
    """Virtual blockchain ledger with real SHA-3-256 Proof-of-Work mining.

    Each ``mint`` call creates one transaction, mines a block containing it,
    appends the block to the chain, and persists the ledger as JSON.
    """

    GENESIS = "0x0000000000000000000000000000000000000000"
    SYSTEM = "0xNuclearIntelligenceSystem"
    NES_PER_MINT = 1.0

    def __init__(self, path: str = "knowledge_base/virtual_ledger.json", difficulty: int = 3):
        self.path = path
        self.difficulty = difficulty  # number of leading hex zeros required
        self.chain: List[Dict] = []
        self.pending: List[Dict] = []
        self.nes_supply: float = 0.0
        self.total_transactions: int = 0
        self.stats = {"total_mining_time": 0.0, "blocks_mined": 0, "total_hashes": 0}
        self._lock = threading.Lock()
        self._load()

    # ---- persistence ----
    def _load(self):
        """Load and verify the stored ledger; start from genesis if unusable."""
        if os.path.exists(self.path):
            try:
                with open(self.path, "r", encoding="utf-8") as f:
                    data = json.load(f)
                self.chain = data.get("chain", [])
                self.nes_supply = data.get("nes_supply", 0.0)
                self.total_transactions = data.get("total_transactions", 0)
                # Fall back to the constructor's difficulty, not a literal.
                self.difficulty = data.get("difficulty", self.difficulty)
                # Verify chain on load; if invalid, reset
                if self.chain and not self.is_valid():
                    logger.warning("Stored chain failed verification, rebuilding genesis")
                    self.chain = []
            except Exception as e:
                logger.warning(f"Ledger load failed: {e}")
                # Never keep a chain that could not be read and verified.
                self.chain = []
        if not self.chain:
            self._create_genesis()
            self._save()

    def _save(self):
        """Write the ledger atomically (temp file + replace); errors are logged."""
        try:
            parent = os.path.dirname(self.path)
            # A bare filename has an empty dirname, and os.makedirs("") raises.
            if parent:
                os.makedirs(parent, exist_ok=True)
            tmp = self.path + ".tmp"
            with open(tmp, "w", encoding="utf-8") as f:
                json.dump(
                    {
                        "chain": self.chain,
                        "nes_supply": self.nes_supply,
                        "total_transactions": self.total_transactions,
                        "difficulty": self.difficulty,
                        "saved_at": datetime.now().isoformat(),
                    },
                    f,
                    ensure_ascii=False,
                    indent=4,
                )
            os.replace(tmp, self.path)
        except Exception as e:
            logger.error(f"Ledger save failed: {e}")

    # ---- genesis & blocks ----
    def _create_genesis(self):
        """Mine the first block (difficulty 1) and append it to the chain."""
        genesis = {
            "index": 0,
            "timestamp": datetime.now().isoformat(),
            "transactions": [
                {
                    "tx_id": hashlib.sha3_256(b"genesis-nuclear-intelligence").hexdigest()[:24],
                    "sender": self.GENESIS,
                    "recipient": self.SYSTEM,
                    "amount": 0.0,
                    "timestamp": datetime.now().isoformat(),
                    "metadata": {
                        "type": "genesis",
                        "version": "5.0",
                        "note": "Nuclear Intelligence Genesis Block",
                    },
                }
            ],
            "prev_hash": "0" * 64,
            "nonce": 0,
            "difficulty": 1,
        }
        # Mine genesis with low difficulty
        genesis = self._mine_block_dict(genesis, target_difficulty=1, max_attempts=200000)
        self.chain.append(genesis)
        logger.info(f"🧬 Genesis block: {genesis['hash'][:16]}... (nonce={genesis['nonce']})")

    def _hash_block(self, block: Dict) -> str:
        """Return the SHA-3-256 hex digest of the block's hashed fields.

        The stored ``hash`` field itself is deliberately excluded.
        """
        payload = {
            "index": block["index"],
            "timestamp": block["timestamp"],
            "transactions": block["transactions"],
            "prev_hash": block["prev_hash"],
            "nonce": block["nonce"],
            "difficulty": block.get("difficulty", self.difficulty),
        }
        return hashlib.sha3_256(json.dumps(payload, sort_keys=True).encode()).hexdigest()

    def _mine_block_dict(self, block: Dict, target_difficulty: int, max_attempts: int = 500000) -> Dict:
        """Real PoW mining. Tries nonces until hash starts with N zeros.

        If ``max_attempts`` nonces are exhausted, the difficulty is lowered
        by one and mining restarts. The returned block therefore always
        satisfies the difficulty recorded in it (difficulty 0 accepts any
        hash), which keeps the chain verifiable by ``is_valid``.

        Args:
            block: Block dict to mine; mutated in place and returned.
            target_difficulty: Desired number of leading hex zeros.
            max_attempts: Nonces to try per difficulty level.
        """
        attempts = max(int(max_attempts), 1)
        for difficulty in range(max(int(target_difficulty), 0), -1, -1):
            target = "0" * difficulty
            block["difficulty"] = difficulty
            for nonce in range(attempts):
                block["nonce"] = nonce
                h = self._hash_block(block)
                if h.startswith(target):
                    block["hash"] = h
                    return block
        # Not reachable: difficulty 0 matches on the first nonce. Kept so the
        # function can never return a block without a hash.
        block["hash"] = self._hash_block(block)
        return block

    # ---- transactions ----
    def mint(self, metadata: dict) -> str:
        """Mint a new NES token. Returns the tx_id.

        Mining and saving happen while the ledger lock is held, so concurrent
        callers are serialised.
        """
        with self._lock:
            tx_id = hashlib.sha3_256(
                f"{datetime.now().isoformat()}-{random.random()}".encode()
            ).hexdigest()[:24]
            tx = {
                "tx_id": tx_id,
                "sender": self.GENESIS,
                "recipient": self.SYSTEM,
                "amount": self.NES_PER_MINT,
                "timestamp": datetime.now().isoformat(),
                "metadata": {**metadata, "type": "nes_mint", "version": "5.0"},
            }
            self.pending.append(tx)
            self.total_transactions += 1
            last = self.chain[-1]
            new_block = {
                "index": last["index"] + 1,
                "timestamp": datetime.now().isoformat(),
                "transactions": list(self.pending),
                "prev_hash": last["hash"],
                "nonce": 0,
                "difficulty": self.difficulty,
            }
            self.pending = []
            start = time.time()
            new_block = self._mine_block_dict(new_block, self.difficulty)
            mining_time = time.time() - start
            self.stats["total_mining_time"] += mining_time
            self.stats["blocks_mined"] += 1
            self.stats["total_hashes"] += new_block["nonce"]
            self.chain.append(new_block)
            self.nes_supply += self.NES_PER_MINT
            self._save()
            logger.info(
                f"⛏️ Block #{new_block['index']} mined | "
                f"hash={new_block['hash'][:16]} nonce={new_block['nonce']} "
                f"time={mining_time:.2f}s diff={self.difficulty}"
            )
            return tx_id

    def is_valid(self) -> bool:
        """Check linkage, proof-of-work, and hash integrity of every non-genesis block.

        Returns False (rather than raising) when a block is malformed, so a
        damaged ledger file is rejected instead of silently kept.
        """
        try:
            for previous, current in zip(self.chain, self.chain[1:]):
                if current["prev_hash"] != previous["hash"]:
                    return False
                if not current["hash"].startswith("0" * current["difficulty"]):
                    return False
                if self._hash_block(current) != current["hash"]:
                    return False
        except (KeyError, TypeError, AttributeError):
            return False
        return True

    def get_stats(self) -> Dict[str, Any]:
        """Return chain length, supply, validity, and mining counters."""
        return {
            "chain_length": len(self.chain),
            "nes_supply": self.nes_supply,
            "total_transactions": self.total_transactions,
            "difficulty": self.difficulty,
            "chain_valid": self.is_valid(),
            "latest_hash": self.chain[-1]["hash"][:16] + "..." if self.chain else "N/A",
            "genesis_hash": self.chain[0]["hash"][:16] + "..." if self.chain else "N/A",
            "total_mining_time": f"{self.stats['total_mining_time']:.1f}s",
            "blocks_mined": self.stats["blocks_mined"],
        }
| # ═══════════════════════════════════════════════════════════════════ | |
| # NUCLEAR INTELLIGENCE CORE — Real LLM-driven pipeline | |
| # ═══════════════════════════════════════════════════════════════════ | |
# Category labels accepted by the pipeline. A category outside this list
# that comes back from the question generator is replaced with "Physics".
NUCLEAR_CATEGORIES = [
"Physics", "Engineering", "Safety", "Economics", "Fusion",
"Chemistry", "Materials", "Medicine", "Waste", "AI-Nuclear",
"Fuel Cycle", "Reactor Design", "Plasma Physics", "Neutronics",
"Thermal Hydraulics", "Materials Science", "Policy", "Regulation",
]
# System prompt for question generation. The JSON shape below lists the keys
# generate_question() reads: question, category, difficulty, keywords.
QUESTION_GEN_SYSTEM = """You are the Nuclear Intelligence Architect — an elite AI researcher specializing in nuclear energy, reactor engineering, fusion science, nuclear safety, and energy economics.
Generate ONE cutting-edge, high-impact research question in the nuclear energy domain that is NOT a rehash of common introductory topics. Push toward emerging areas: advanced reactors (SMR, Gen IV, MSR), fusion breakthroughs, AI-assisted design, novel fuel cycles, waste transmutation, nuclear medicine advances, and safety systems.
Return ONLY valid JSON (no markdown, no commentary) in this exact shape:
{
"question": "the research question text",
"category": "one of: Physics|Engineering|Safety|Economics|Fusion|Chemistry|Materials|Medicine|Waste|AI-Nuclear|Fuel Cycle|Reactor Design|Plasma Physics|Neutronics|Thermal Hydraulics|Materials Science|Policy|Regulation",
"difficulty": 1-10,
"keywords": ["keyword1", "keyword2", "keyword3", "keyword4", "keyword5"]
}"""
# System prompt for the research step. research() reads answer, citations,
# novelty_score, accuracy_score and sources from the reply.
# NOTE(review): research() also reads "usefulness_score", which this prompt
# does not request, so that field falls back to its default — confirm intent.
RESEARCHER_SYSTEM = """You are a senior nuclear scientist with deep expertise across nuclear physics, reactor engineering, fusion research, and nuclear economics. Provide a rigorous, peer-review-level analysis.
Return ONLY valid JSON (no markdown, no commentary) in this exact shape:
{
"answer": "comprehensive scientific answer (500-1500 words) with technical depth, mechanisms, equations where appropriate, and practical examples",
"citations": ["source or reference 1", "source or reference 2", "source or reference 3"],
"novelty_score": 0-100,
"accuracy_score": 0-100,
"sources": [
{"title": "...", "url": "...", "type": "arxiv|web|paper|database"}
]
}
The novelty_score and accuracy_score are YOUR self-assessment of the answer you just wrote."""
# System prompt for the evaluation step: asks for per-dimension 0-100 scores,
# a consistency flag, and a short justification, all as one JSON object.
EVALUATOR_SYSTEM = """You are an independent scientific auditor. Evaluate the given nuclear research Q&A on multiple dimensions with strict scoring.
Score guidelines:
- scientific_accuracy: how factually correct the answer is (0-100). 100 = textbook-perfect, 70 = mostly correct with minor errors, <50 = factual problems.
- novelty_score: how novel or non-obvious the insights are (0-100). 90+ = new research-level insight, 70+ = useful synthesis, <50 = common knowledge.
- usefulness_score: practical value to the nuclear community (0-100).
- completeness: how thoroughly the question is answered (0-100).
- self_consistency_check: true if the answer is internally consistent, false if it contradicts itself.
Be strict but fair. Return ONLY valid JSON:
{
"scientific_accuracy": 0-100,
"novelty_score": 0-100,
"usefulness_score": 0-100,
"completeness": 0-100,
"self_consistency_check": true,
"justification": "brief reasoning for the scores"
}"""
| class NuclearIntelligenceCore: | |
| """Real LLM-driven research-to-tokenization pipeline.""" | |
def __init__(self):
    """Wire up the LLM engine, knowledge graph and ledger, and reset counters.

    Minting thresholds are read from the MIN_ACCURACY, MIN_NOVELTY,
    MIN_USEFULNESS and MIN_OVERALL environment variables, defaulting to
    70, 60, 60 and 65 respectively.
    """
    self.llm = LLMEngine()
    self.kg = KnowledgeGraph()
    self.ledger = VirtualLedger()

    # Pipeline counters, all starting at zero.
    counter_names = (
        "questions",
        "researches",
        "evaluations",
        "tokens_minted",
        "rejected",
        "errors",
    )
    self.stats = dict.fromkeys(counter_names, 0)
    self.history: List[Dict] = []
    self._lock = threading.Lock()

    # Thresholds for minting: (threshold key, environment variable, default).
    threshold_sources = (
        ("min_accuracy", "MIN_ACCURACY", "70"),
        ("min_novelty", "MIN_NOVELTY", "60"),
        ("min_usefulness", "MIN_USEFULNESS", "60"),
        ("min_overall", "MIN_OVERALL", "65"),
    )
    self.thresholds = {
        key: float(os.getenv(env_name, fallback))
        for key, env_name, fallback in threshold_sources
    }
def generate_question(self) -> Dict[str, Any]:
    """Generate a research question using the LLM, with a template fallback.

    Returns:
        A dict with ``question``, ``category`` (always a member of
        NUCLEAR_CATEGORIES), ``difficulty`` (int, 1-10), ``keywords``
        (list of at most 8 strings) and ``source`` (``"llm"`` or
        ``"template_fallback"``).
    """

    def _coerce_difficulty(raw, default=5):
        # LLMs sometimes answer "7/10", null, or a float; never crash on it.
        try:
            value = int(float(raw))
        except (TypeError, ValueError, OverflowError):
            return default
        return max(1, min(10, value))

    def _coerce_keywords(raw):
        # A single comma-separated string is split; list() on it would yield
        # one "keyword" per character. Anything else non-list is dropped.
        if isinstance(raw, str):
            raw = re.split(r"[,;]", raw)
        if not isinstance(raw, (list, tuple)):
            return []
        cleaned = [str(item).strip() for item in raw]
        return [item for item in cleaned if item][:8]

    # Build a hint from existing KG to avoid repetition
    kg_entities = self.kg.graph.get("entities", {})
    existing_topics = [
        str(e.get("question", ""))[:80]
        for e in list(kg_entities.values())[-10:]
        if isinstance(e, dict) and e.get("question")
    ]
    hint = "Recent topics (avoid these exact questions):\n" + "\n".join(f"- {t}" for t in existing_topics) if existing_topics else "No prior topics."
    prompt = f"Generate a new research question in the nuclear energy domain.\n\n{hint}"
    result = self.llm.chat(
        prompt=prompt,
        system=QUESTION_GEN_SYSTEM,
        temperature=0.9,
        max_tokens=512,
        json_mode=True,
    )
    parsed, parse_err = parse_json_response(result) if result else (None, True)
    question_text = ""
    if isinstance(parsed, dict):
        question_text = str(parsed.get("question") or "").strip()
    if question_text:
        cat = parsed.get("category", "Physics")
        if cat not in NUCLEAR_CATEGORIES:
            cat = "Physics"
        with self._lock:
            self.stats["questions"] += 1
        return {
            "question": question_text,
            "category": cat,
            "difficulty": _coerce_difficulty(parsed.get("difficulty", 5)),
            "keywords": _coerce_keywords(parsed.get("keywords", [])),
            "source": "llm",
        }

    # Real fallback: domain-aware prompt template, NOT a fixed pool
    templates = [
        "What are the current technical barriers to deploying {tech} at commercial scale, and what recent advances address them?",
        "How do {aspect} trade-offs in {system} impact overall reactor performance and safety?",
        "What is the state of the art in {domain}, and what breakthroughs in the past 2 years have changed the field?",
        "Compare and contrast {alt1} versus {alt2} for {application}: which is more promising and why?",
        "What novel materials or computational methods are enabling new capabilities in {area}?",
    ]
    topics = {
        "tech": ["molten salt reactors", "tokamak fusion devices", "small modular reactors", "traveling wave reactors", "thorium fuel cycles"],
        "aspect": ["neutron economy", "thermal-hydraulic", "fuel burnup", "activation products", "tritium breeding"],
        "system": ["SFR", "MSR", "LFR", "HTR", "tokamak", "stellarator"],
        "domain": ["inertial confinement fusion", "magnetic confinement fusion", "plasma-facing materials", "nuclear waste transmutation", "accelerator-driven systems"],
        "alt1": ["molten salt coolant", "sodium coolant", "lead-bismuth", "gas cooling", "supercritical CO2"],
        "alt2": ["pressurized water", "boiling water", "heavy water", "organic cooling"],
        "application": ["next-generation reactors", "space propulsion", "process heat", "hydrogen production"],
        "area": ["fusion blankets", "nuclear data analysis", "reactor digital twins", "fuel fabrication", "decommissioning robotics"],
    }
    template = random.choice(templates)
    try:
        # Extra keyword arguments are ignored by str.format, so the loose
        # substring test only needs to cover every placeholder in use.
        question = template.format(**{k: random.choice(v) for k, v in topics.items() if k in template})
    except (KeyError, IndexError):
        question = "What are the most promising emerging nuclear energy technologies, and what is the realistic timeline for commercial deployment?"
    with self._lock:
        self.stats["questions"] += 1
    return {
        "question": question,
        "category": random.choice(NUCLEAR_CATEGORIES),
        "difficulty": random.randint(6, 9),
        "keywords": question.split()[:5],
        "source": "template_fallback",
    }
def research(self, question: Dict) -> Dict[str, Any]:
    """Conduct research on ``question`` using the LLM.

    Args:
        question: Mapping with ``question``, ``category`` and ``difficulty``
            keys, plus an optional ``keywords`` list.

    Returns:
        A dict with ``answer``, ``citations``, ``accuracy``, ``novelty``,
        ``usefulness``, ``sources``, ``provider`` and ``parse_error``. When
        the LLM yields nothing usable, a placeholder answer with every score
        at ``0.0`` and ``parse_error=True`` is returned instead.
    """

    def _score(value: Any) -> float:
        # LLM JSON is untrusted: a value such as "high" or None must not
        # crash the cycle. Fall back to the same neutral 50 that is used
        # when the field is missing altogether.
        try:
            return float(value)
        except (TypeError, ValueError):
            return 50.0

    def _listing(value: Any) -> List[Any]:
        # list("some text") would split a lone string into characters and
        # list(None) would raise, so normalise the shape explicitly.
        if isinstance(value, (list, tuple)):
            return list(value)[:8]
        if isinstance(value, str) and value.strip():
            return [value]
        return []

    with self._lock:
        self.stats["researches"] += 1

    keywords = ", ".join(str(k) for k in question.get("keywords", []))
    prompt = (
        f"Research Question: {question['question']}\n"
        f"Category: {question['category']}\n"
        f"Difficulty: {question['difficulty']}/10\n"
        f"Keywords: {keywords}\n\n"
        "Provide a comprehensive, technically rigorous answer with mechanisms, equations, "
        "and concrete examples. Cite real sources where possible."
    )
    result = self.llm.chat(
        prompt=prompt,
        system=RESEARCHER_SYSTEM,
        temperature=0.6,
        max_tokens=2500,
        json_mode=True,
    )
    parsed, parse_err = parse_json_response(result) if result else (None, True)

    # Only a JSON object carrying a non-empty "answer" counts as research.
    if isinstance(parsed, dict) and parsed.get("answer"):
        return {
            "answer": str(parsed["answer"]),
            "citations": _listing(parsed.get("citations", [])),
            "accuracy": _score(parsed.get("accuracy_score", 50)),
            "novelty": _score(parsed.get("novelty_score", 50)),
            "usefulness": _score(parsed.get("usefulness_score", 50)),
            "sources": _listing(parsed.get("sources", [])),
            "provider": self.llm._current or "unknown",
            "parse_error": parse_err,
        }

    # Real fallback: acknowledge we couldn't research this
    return {
        "answer": (
            f"[Research unavailable] The LLM could not produce a verified answer for: "
            f"{question['question']}. This question has been logged for later review."
        ),
        "citations": [],
        "accuracy": 0.0,
        "novelty": 0.0,
        "usefulness": 0.0,
        "sources": [],
        "provider": "unavailable",
        "parse_error": True,
    }
def evaluate(self, question: Dict, research_result: Dict) -> Dict[str, Any]:
    """Score a research result with a separate LLM call.

    Args:
        question: Mapping containing at least the ``question`` text.
        research_result: Output of ``research``; ``parse_error``,
            ``accuracy`` and ``answer`` are read from it.

    Returns:
        A dict with ``scientific_accuracy``, ``novelty_score``,
        ``usefulness_score``, ``completeness`` (floats),
        ``self_consistency_check`` (bool) and ``justification`` (str).
        All scores are ``0.0`` when research failed or the evaluator
        output could not be used.
    """

    def _failed(reason: str) -> Dict[str, Any]:
        # Single definition of the "no usable evaluation" result.
        return {
            "scientific_accuracy": 0.0,
            "novelty_score": 0.0,
            "usefulness_score": 0.0,
            "completeness": 0.0,
            "self_consistency_check": False,
            "justification": reason,
        }

    def _score(value: Any) -> float:
        # Evaluator JSON is untrusted; a non-numeric score counts as 0.
        try:
            return float(value)
        except (TypeError, ValueError):
            return 0.0

    def _flag(value: Any) -> bool:
        # bool("false") is True, so string flags need explicit parsing;
        # otherwise a textual "false" would pass the consistency gate.
        if isinstance(value, str):
            return value.strip().lower() in ("true", "yes", "1")
        return bool(value)

    with self._lock:
        self.stats["evaluations"] += 1

    # If research itself failed, short-circuit
    if research_result.get("parse_error") or research_result["accuracy"] == 0:
        return _failed("Research output unavailable or unparseable.")

    prompt = (
        f"Question: {question['question']}\n\n"
        f"Answer: {research_result['answer'][:3000]}\n\n"
        "Evaluate this answer on scientific accuracy, novelty, usefulness, completeness, and self-consistency."
    )
    result = self.llm.chat(
        prompt=prompt,
        system=EVALUATOR_SYSTEM,
        temperature=0.2,
        max_tokens=800,
        json_mode=True,
    )
    parsed, parse_err = parse_json_response(result) if result else (None, True)

    if isinstance(parsed, dict) and parsed and not parse_err:
        return {
            "scientific_accuracy": _score(parsed.get("scientific_accuracy", 0)),
            "novelty_score": _score(parsed.get("novelty_score", 0)),
            "usefulness_score": _score(parsed.get("usefulness_score", 0)),
            "completeness": _score(parsed.get("completeness", 0)),
            "self_consistency_check": _flag(parsed.get("self_consistency_check", False)),
            "justification": str(parsed.get("justification", ""))[:500],
        }
    return _failed("Evaluation failed (LLM unavailable or invalid output).")
def should_mint(self, evaluation: Dict) -> Tuple[bool, float]:
    """Decide whether an evaluation clears every mint threshold.

    Returns:
        ``(ok, overall)`` where ``overall`` is the weighted score
        (accuracy 45%, novelty 25%, usefulness 20%, completeness 10%) and
        ``ok`` is truthy only when each per-metric threshold, the overall
        threshold and the self-consistency check all pass.
    """
    accuracy = evaluation["scientific_accuracy"]
    novelty = evaluation["novelty_score"]
    usefulness = evaluation["usefulness_score"]
    overall = (
        accuracy * 0.45
        + novelty * 0.25
        + usefulness * 0.20
        + evaluation["completeness"] * 0.10
    )

    limits = self.thresholds
    # Guard clauses are checked in the same order as the original chain.
    if not (accuracy >= limits["min_accuracy"]):
        return False, overall
    if not (novelty >= limits["min_novelty"]):
        return False, overall
    if not (usefulness >= limits["min_usefulness"]):
        return False, overall
    if not (overall >= limits["min_overall"]):
        return False, overall
    return evaluation["self_consistency_check"], overall
def run_cycle(self, dev_mode: bool = False) -> Dict[str, Any]:
    """Execute one full research → evaluate → mint pipeline.

    Args:
        dev_mode: Accepted for interface compatibility; not read here.

    Returns:
        On success, a dict with ``cycle_id``, ``timestamp``, ``question``,
        ``research``, ``evaluation``, ``overall``, ``minted``, ``tx_hash``
        and ``execution_time_seconds``. On failure, a dict with
        ``cycle_id``, ``timestamp``, ``error``, ``minted=False`` and
        ``execution_time_seconds``. Both kinds are appended to
        ``self.history`` so failed cycles stay visible to consumers of the
        history.
    """
    start = time.time()
    cycle_id = hashlib.sha256(f"{datetime.now().isoformat()}-{random.random()}".encode()).hexdigest()[:16]
    try:
        question = self.generate_question()
        research = self.research(question)
        evaluation = self.evaluate(question, research)
        ok, overall = self.should_mint(evaluation)
        tx_hash = None
        if ok:
            metadata = {
                "cycle_id": cycle_id,
                "question_id": hashlib.sha256(question["question"].encode()).hexdigest()[:16],
                "category": question["category"],
                "difficulty": question["difficulty"],
                "provider": research["provider"],
                "overall_score": round(overall, 2),
                "accuracy": evaluation["scientific_accuracy"],
                "novelty": evaluation["novelty_score"],
                "usefulness": evaluation["usefulness_score"],
                "completeness": evaluation["completeness"],
                "justification": evaluation["justification"][:200],
                "question_text": question["question"][:300],
            }
            self.kg.add(question["question"], research["answer"], metadata)
            tx_hash = self.ledger.mint(metadata)
            with self._lock:
                self.stats["tokens_minted"] += 1
        else:
            with self._lock:
                self.stats["rejected"] += 1
        result = {
            "cycle_id": cycle_id,
            "timestamp": datetime.now().isoformat(),
            "question": question,
            "research": research,
            "evaluation": evaluation,
            "overall": round(overall, 2),
            "minted": ok,
            "tx_hash": tx_hash,
            "execution_time_seconds": round(time.time() - start, 2),
        }
    except Exception as e:
        with self._lock:
            self.stats["errors"] += 1
        logger.error(f"Cycle error: {e}")
        result = {
            "cycle_id": cycle_id,
            "timestamp": datetime.now().isoformat(),
            "error": str(e),
            "minted": False,
            "execution_time_seconds": round(time.time() - start, 2),
        }

    # Record every cycle, including failures: previously error results were
    # returned without being stored, so they never showed up in the history.
    with self._lock:
        self.history.append(result)
        # Keep history bounded
        if len(self.history) > 500:
            self.history = self.history[-500:]
    return result
def ask_question(self, question: str, dev_mode: bool = False) -> Dict[str, Any]:
    """Answer a user-supplied question without minting anything.

    The text is wrapped in a question record (category ``"User Query"``,
    difficulty 5, first five words as keywords), then researched and
    evaluated. ``dev_mode`` is accepted but not read here.
    """
    record = dict(
        question=question,
        category="User Query",
        difficulty=5,
        keywords=question.split()[:5],
    )
    findings = self.research(record)
    verdict = self.evaluate(record, findings)

    reply: Dict[str, Any] = {"question": question}
    reply["answer"] = findings["answer"]
    reply["citations"] = findings["citations"]
    reply["evaluation"] = verdict
    reply["provider"] = findings["provider"]
    return reply
def get_stats(self) -> Dict[str, Any]:
    """Return the pipeline counters merged with LLM, KG, ledger stats and thresholds."""
    snapshot: Dict[str, Any] = dict(self.stats)
    snapshot["llm_stats"] = self.llm.get_stats()
    snapshot["kg_stats"] = self.kg.get_stats()
    snapshot["ledger_stats"] = self.ledger.get_stats()
    snapshot["thresholds"] = self.thresholds
    return snapshot
| # ═══════════════════════════════════════════════════════════════════ | |
| # HUGGINGFACE DATASET SYNC — public, readable, no auth needed to read | |
| # ═══════════════════════════════════════════════════════════════════ | |
def sync_to_hf_dataset(report: Dict) -> bool:
    """Sync a cycle report to the public HF dataset.

    The report is written to ``reports/cycle_<cycle_id>.json`` locally and
    uploaded under the same path to the dataset repo named by
    ``HF_DATASET_REPO`` (default ``Qalam/nuclear-intelligence-dataset``).

    Args:
        report: Cycle result; must contain ``cycle_id`` and be JSON-serialisable.

    Returns:
        ``True`` when the upload succeeded, ``False`` when ``HF_TOKEN`` is
        missing or malformed or any step failed. Never raises.
    """
    # Check configuration first: without a token there is nothing to do, so
    # skip the optional huggingface_hub import (and its failure warning).
    hf_token = os.getenv("HF_TOKEN", "").strip()
    if not hf_token.startswith("hf_"):
        return False

    try:
        from huggingface_hub import HfApi, create_repo

        api = HfApi(token=hf_token)
        dataset_repo = os.getenv("HF_DATASET_REPO", "Qalam/nuclear-intelligence-dataset")
        try:
            create_repo(repo_id=dataset_repo, repo_type="dataset", token=hf_token, exist_ok=True, private=False)
        except Exception as e:
            # Best effort: the upload below may still work without it, but
            # leave a trace rather than hiding the failure.
            logger.debug(f"HF dataset repo ensure failed (continuing): {e}")

        # Save to a local file, then upload that file
        os.makedirs("reports", exist_ok=True)
        fname = f"cycle_{report['cycle_id']}.json"
        local_path = os.path.join("reports", fname)
        with open(local_path, "w", encoding="utf-8") as f:
            json.dump(report, f, ensure_ascii=False, indent=4)
        api.upload_file(
            path_or_fileobj=local_path,
            path_in_repo=f"reports/{fname}",
            repo_id=dataset_repo,
            repo_type="dataset",
            commit_message=f"Auto: NES cycle {report['cycle_id']}",
        )
        return True
    except Exception as e:
        logger.warning(f"HF dataset sync failed: {e}")
        return False
| # ═══════════════════════════════════════════════════════════════════ | |
| # INITIALIZE CORE | |
| # ═══════════════════════════════════════════════════════════════════ | |
# Module-wide engine instance; stays None when Gradio is unavailable or
# construction fails, and every UI function checks for that.
core: Optional[NuclearIntelligenceCore] = None
if gradio_available:
    try:
        core = NuclearIntelligenceCore()
        # "demo" is excluded so the log lists only the non-demo providers.
        real_providers = list(filter(lambda name: name != "demo", core.llm._available))
        logger.info("⚛️ Nuclear Intelligence v5.0 initialized")
        provider_label = real_providers or 'demo (no API keys)'
        logger.info(f"   LLM providers: {provider_label}")
        logger.info(f"   NES supply: {core.ledger.nes_supply}")
    except Exception as e:
        logger.error(f"Initialization error: {e}")
| # ═══════════════════════════════════════════════════════════════════ | |
| # UI FUNCTIONS | |
| # ═══════════════════════════════════════════════════════════════════ | |
def get_llm_status():
    """Return a Markdown summary of the LLM engine for the status panel.

    Returns a placeholder string while ``core`` is not initialised.
    """
    if not core:
        return "**⚠️ Core initializing...**"

    # The health-check result itself is not displayed. The call is kept
    # because this function backs the "Check Providers" button.
    # NOTE(review): presumably health_check() refreshes provider state —
    # confirm. It now runs before the stats snapshot so that any such
    # refresh is reflected in the numbers shown below.
    core.llm.health_check()
    stats = core.llm.get_stats()

    lines = [
        "**🔌 LLM Engine Status**",
        f"Active: `{stats.get('current_provider', 'none')}`",
        f"Available: {stats.get('available_providers', [])}",
        f"Total Requests: {stats.get('requests', 0):,}",
        f"Successes: {stats.get('successes', 0):,}",
        f"Failures: {stats.get('failures', 0):,}",
        f"Cache Hit: {stats.get('cache', {}).get('hit_rate', 'N/A')}",
    ]
    last_error = stats.get("last_error")
    if last_error:
        # str() first: the stored error may be an exception object, which
        # cannot be sliced.
        lines.append(f"\n⚠️ Last error: `{str(last_error)[:100]}`")
    return "\n".join(lines)
def get_system_stats():
    """Render pipeline, blockchain, knowledge-graph and threshold stats as Markdown."""
    if not core:
        return "**⚠️ Initializing...**"

    s = core.get_stats()
    ledger = s['ledger_stats']
    kg = s['kg_stats']
    limits = s['thresholds']
    validity = '✅' if ledger['chain_valid'] else '❌'

    report = [
        "## ⚛️ Nuclear Intelligence v5.0",
        "**Pipeline:**",
        f"- Questions generated: {s['questions']:,}",
        f"- Researches conducted: {s['researches']:,}",
        f"- Evaluations done: {s['evaluations']:,}",
        f"- 🪙 Tokens minted: {s['tokens_minted']:,}",
        f"- ❌ Rejected: {s['rejected']:,}",
        f"- Errors: {s['errors']:,}",
        "**Blockchain:**",
        f"- Chain: {ledger['chain_length']} blocks",
        f"- NES supply: {ledger['nes_supply']:,.1f}",
        f"- Valid: {validity}",
        f"- Mining time: {ledger['total_mining_time']}",
        "**Knowledge Graph:**",
        f"- Entities: {kg['total_entities']}",
        f"- Avg accuracy: {kg['avg_accuracy']:.1f}%",
        f"- Avg novelty: {kg['avg_novelty']:.1f}%",
        (
            f"**Mint thresholds:** acc≥{limits['min_accuracy']} nov≥{limits['min_novelty']} "
            f"use≥{limits['min_usefulness']} overall≥{limits['min_overall']}"
        ),
    ]
    # The original template ends with a newline after the last line.
    return "\n".join(report) + "\n"
def get_chain_df():
    """Build a table of transactions from the 50 most recent blocks, newest block first."""
    if not core:
        return pd.DataFrame([{"Status": "Initializing..."}])

    def _as_row(blk, tx):
        # One table row per transaction, tagged with its block's index/time.
        meta = tx.get("metadata", {})
        return {
            "Block": blk["index"],
            "Time": blk["timestamp"][:19],
            "TX ID": tx.get("tx_id", "")[:16],
            "Amount (NES)": tx.get("amount", 0),
            "Category": meta.get("category", "N/A"),
            "Score": meta.get("overall_score", "N/A"),
            "Provider": meta.get("provider", "N/A"),
        }

    recent_blocks = core.ledger.chain[-50:]
    rows = [
        _as_row(blk, tx)
        for blk in reversed(recent_blocks)
        for tx in blk.get("transactions", [])
    ]
    if not rows:
        return pd.DataFrame([{"Message": "No transactions yet"}])
    return pd.DataFrame(rows)
def get_entities_df():
    """Build a table of the last 50 knowledge-graph entities, newest first."""
    if not core:
        return pd.DataFrame([{"Status": "Initializing..."}])

    entities = core.kg.graph.get("entities", {})
    newest_first = list(entities.items())[-50:][::-1]

    rows = []
    for entity_id, entity in newest_first:
        meta = entity.get("metadata", {})
        row = {"ID": entity_id[:12]}
        row["Question"] = entity.get("question", "")[:60]
        row["Category"] = meta.get("category", "N/A")
        row["Acc"] = f"{meta.get('accuracy', 0):.0f}"
        row["Nov"] = f"{meta.get('novelty', 0):.0f}"
        row["Created"] = entity.get("created", "")[:10]
        rows.append(row)

    if not rows:
        return pd.DataFrame([{"Message": "No entities yet"}])
    return pd.DataFrame(rows)
def get_history_df():
    """Build a table of the last 50 recorded cycles, newest first."""
    if not core:
        return pd.DataFrame([{"Message": "No cycles yet"}])

    def _status(cycle):
        # Minted wins; otherwise a cycle that has an evaluation was
        # rejected, and one without an evaluation is shown as an error.
        if cycle.get("minted"):
            return "✅ Minted"
        if cycle.get("evaluation"):
            return "❌ Rejected"
        return "⚠️ Error"

    rows = [
        {
            "ID": cycle["cycle_id"][:12],
            "Time": cycle["timestamp"][:19],
            "Status": _status(cycle),
            "Overall": f"{cycle.get('overall', 0):.1f}%",
            "Provider": cycle.get("research", {}).get("provider", "N/A"),
        }
        for cycle in core.history[-50:][::-1]
    ]
    if not rows:
        return pd.DataFrame([{"Message": "No cycles yet"}])
    return pd.DataFrame(rows)
def get_score_chart():
    """Return a line chart of evaluation scores for the last 30 cycles.

    Returns ``None`` (empty plot) when the core is not ready, plotly could
    not be imported, or no cycle in the window has an evaluation.
    """
    # px is set to None at import time when plotly fails to load; calling
    # px.line on it would raise AttributeError, so bail out early.
    if not core or not core.history or px is None:
        return None

    rows = []
    for c in core.history[-30:]:
        ev = c.get("evaluation", {})
        if ev:
            rows.append({
                # Keep the time of day (YYYY-MM-DDTHH:MM:SS). Truncating to
                # the date put every same-day cycle on one x position.
                "Cycle": c["timestamp"][:19],
                "Accuracy": ev.get("scientific_accuracy", 0),
                "Novelty": ev.get("novelty_score", 0),
                "Usefulness": ev.get("usefulness_score", 0),
                "Overall": c.get("overall", 0),
            })
    if not rows:
        return None

    df = pd.DataFrame(rows)
    fig = px.line(df, x="Cycle", y=["Accuracy", "Novelty", "Usefulness", "Overall"],
                  title="Score Trends (Last 30 Cycles)")
    fig.update_layout(height=350, margin=dict(l=20, r=20, t=50, b=20))
    return fig
def get_category_chart():
    """Return a pie chart of knowledge-graph entities per category.

    Returns ``None`` (empty plot) when the core is not ready, plotly could
    not be imported, or the graph has no entities.
    """
    # px is None when the plotly import failed; px.pie would then raise.
    if not core or px is None:
        return None

    cat_counts: Dict[str, int] = {}
    for e in core.kg.graph.get("entities", {}).values():
        cat = e.get("metadata", {}).get("category", "Unknown")
        cat_counts[cat] = cat_counts.get(cat, 0) + 1
    if not cat_counts:
        return None

    df = pd.DataFrame(list(cat_counts.items()), columns=["Category", "Count"])
    fig = px.pie(df, values="Count", names="Category", title="Knowledge by Category")
    fig.update_layout(height=350, margin=dict(l=20, r=20, t=50, b=20))
    return fig
| # ─── Action functions ───────────────────────────────────────────── | |
def discover_question():
    """Generate a single research question and render it as Markdown.

    Backs the 🔍 Discover Question button; no research, evaluation or
    minting is triggered. Any exception is reported in the returned text.
    """
    if not core:
        return "❌ System initializing..."
    try:
        q = core.generate_question()
        pieces = [
            f"## 🔍 Discovered Question\n\n",
            f"**Question:** {q.get('question','')}\n\n",
            f"- **Category:** `{q.get('category','N/A')}`\n",
            f"- **Difficulty:** `{q.get('difficulty','N/A')}/10`\n",
            f"- **Keywords:** `{' · '.join(q.get('keywords', []))}`\n",
            f"- **Source:** `{q.get('source','?')}`\n\n",
            f"_Hit 🚀 Run Research Cycle to research, evaluate, and possibly mint an NES token._",
        ]
        return "".join(pieces)
    except Exception as e:
        return f"❌ Error: {e}"
def run_cycle(dev_mode=True, sync_hf=True):
    """Run one pipeline cycle and render the outcome as Markdown.

    Backs the 🚀 Run Research Cycle button. When ``sync_hf`` is set and the
    cycle minted a token, the result is also pushed via
    ``sync_to_hf_dataset``. Any exception is reported in the returned text.
    """
    if not core:
        return "❌ System initializing..."
    try:
        result = core.run_cycle(dev_mode=dev_mode)

        # A result carrying only an error (no question) means the cycle crashed.
        if "error" in result and "question" not in result:
            return f"## ⚠️ Cycle Error\n\n```\n{result['error']}\n```"

        minted = result.get("minted")
        status = "✅ **MINTED**" if minted else "❌ **REJECTED**"
        eval_data = result.get("evaluation", {})
        question = result.get("question", {})
        research = result.get("research", {})

        lines = [f"## {status}"]
        lines.append(f"**Cycle:** `{result['cycle_id'][:16]}`")
        lines.append(f"**Provider:** `{research.get('provider', 'N/A')}`")
        lines.append(f"**Time:** {result.get('execution_time_seconds', 0)}s")
        lines.append(f"**Question source:** `{question.get('source', '?')}`")
        lines.extend(["", "### 📝 Question", question.get("question", ""), ""])
        lines.append(
            f"**Category:** `{question.get('category', 'N/A')}` | "
            f"**Difficulty:** `{question.get('difficulty', 'N/A')}/10` | "
            f"**Keywords:** `{', '.join(question.get('keywords', []))}`"
        )
        lines.extend(["", "### 📊 Evaluation Scores"])
        lines.append(f"- 🔬 Accuracy: **{eval_data.get('scientific_accuracy', 0):.1f}%**")
        lines.append(f"- 💡 Novelty: **{eval_data.get('novelty_score', 0):.1f}%**")
        lines.append(f"- 👍 Usefulness: **{eval_data.get('usefulness_score', 0):.1f}%**")
        lines.append(f"- 📏 Completeness: **{eval_data.get('completeness', 0):.1f}%**")
        lines.append(f"- 🎯 Overall: **{result.get('overall', 0):.1f}%**")

        if eval_data.get("justification"):
            lines.append(f"\n> {eval_data['justification']}")
        if result.get("tx_hash"):
            lines.append(f"\n**🔗 TX:** `{result['tx_hash'][:24]}...`")
            lines.append(f"**🪙 NES supply:** {core.ledger.nes_supply}")
        if sync_hf and result.get("minted"):
            synced = sync_to_hf_dataset(result)
            lines.append(f"\n**📤 HF dataset sync:** {'✅' if synced else '⚠️ skipped/failed'}")
        return "\n".join(lines)
    except Exception as e:
        return f"❌ Error: {e}"
# ── Inline safety guard (mirrors core/safety_guard.py) ───────────
# Lightweight subset: weapons / enrichment / RDD / cyber. The full
# taxonomy lives in the main app; this is a defensive tripwire.
_ASK_Q_DANGER = re.compile(
    r"\b(nuclear\s+(?:bomb|warhead|weapon)\s+(?:design|build|make|diy)|"
    r"how\s+to\s+(?:build|make)\s+(?:a\s+)?(?:atom|nuclear|hydrogen)\s+bomb|"
    r"clandestine\s+(?:enrichment|centrifuge)|"
    r"weapons[-\s]?grade\s+uranium\s+(?:production|route)|"
    r"dirty\s+bomb\s+(?:design|build|make|instructions)|"
    r"implosion\s+lens\s+design|"
    r"smuggl(?:e|ing)\s+(?:heu|weapons[-\s]?grade|plutonium)|"
    r"Stuxnet[-\s]?like\s+attack\s+instructions)\b",
    re.IGNORECASE,
)


def ask_q(question, dev_mode=False):
    """Answer a manually entered question and render the result as Markdown.

    Rejects inputs shorter than five characters and refuses questions that
    match the inline safety pattern. Otherwise delegates to
    ``core.ask_question``; any exception is reported in the returned text.
    """
    if not core:
        return "❌ System initializing..."
    if not question or len(question.strip()) < 5:
        return "❌ Enter a valid question (5+ chars)"

    if _ASK_Q_DANGER.search(question):
        refusal = (
            "🛡️ **I can't help with that.**\n\n"
            "Your question touches on activity restricted under "
            "international non-proliferation norms (NPT, IAEA safeguards, "
            "NSG). Sharing actionable detail there would be unsafe.\n\n"
            "**What I *can* help with** — the legitimate peaceful-use side — "
            "is the IAEA safeguards framework, civilian enrichment under "
            "monitoring, and radiation-protection topics. Want me to answer that instead?"
        )
        return refusal

    try:
        result = core.ask_question(question, dev_mode)
        ev = result.get("evaluation", {})
        citations = result.get("citations", [])

        output = ["## 🔬 Answer"]
        output.append(f"**Provider:** `{result.get('provider', 'N/A')}`")
        output.extend(["", f"### {question}", "", result.get("answer", ""), ""])
        output.append("### 📊 Quality")
        output.append(f"- Accuracy: **{ev.get('scientific_accuracy', 0):.1f}%**")
        output.append(f"- Novelty: **{ev.get('novelty_score', 0):.1f}%**")
        output.append(f"- Usefulness: **{ev.get('usefulness_score', 0):.1f}%**")
        output.append(f"- Completeness: **{ev.get('completeness', 0):.1f}%**")

        if citations:
            output.append("\n### 📚 Citations")
            # At most five citations are shown.
            output.extend(f"- {c}" for c in citations[:5])
        return "\n".join(output)
    except Exception as e:
        return f"❌ Error: {e}"
def verify_chain():
    """Validate the ledger and render its statistics as Markdown."""
    if not core:
        return "❌ System not ready"

    is_valid = core.ledger.is_valid()
    stats = core.ledger.get_stats()
    verdict = '✅ VALID' if is_valid else '❌ INVALID'

    report = [
        "## ⛓️ Blockchain Verification",
        f"**Status:** {verdict}",
        f"- Chain length: {stats['chain_length']} blocks",
        f"- NES supply: {stats['nes_supply']:,.1f}",
        f"- Total transactions: {stats['total_transactions']}",
        f"- Current difficulty: {stats['difficulty']}",
        f"- Latest hash: `{stats['latest_hash']}`",
        f"- Genesis hash: `{stats['genesis_hash']}`",
        f"- Total mining time: {stats['total_mining_time']}",
        f"- Blocks mined: {stats['blocks_mined']}",
    ]
    # The original template ends with a newline after the last line.
    return "\n".join(report) + "\n"
def search_kg(query, limit=10):
    """Search the knowledge graph and render the hits as Markdown.

    ``limit`` is converted to ``int`` before being passed to
    ``core.kg.search``.
    """
    if not core:
        return "❌ System not ready"
    if not query:
        return "❌ Enter search query"

    hits = core.kg.search(query, int(limit))
    if not hits:
        return f"🔍 No results for: **{query}**"

    def _entry(rank, hit):
        meta = hit.get("metadata", {})
        return (
            f"### {rank}. {hit.get('question', '')[:80]}...\n"
            f"**Category:** `{meta.get('category', 'N/A')}` | "
            f"**Accuracy:** `{meta.get('accuracy', 0):.0f}` | "
            f"**Score:** `{hit.get('_score', 0):.0f}`\n"
        )

    sections = [f"## 🔍 Results for: **{query}**\n"]
    sections.extend(_entry(rank, hit) for rank, hit in enumerate(hits, 1))
    return "\n".join(sections)
def export_state():
    """Write a JSON snapshot of the system state under ``reports/``.

    The snapshot holds the export time, core/ledger/KG stats and the last
    20 history entries.

    Returns:
        A Markdown status line: the export path on success, ``❌ Not ready``
        when the core is missing, or ``❌ Error: …`` when serialising or
        writing the file fails.
    """
    if not core:
        return "❌ Not ready"
    try:
        out = {
            "exported_at": datetime.now().isoformat(),
            "stats": core.get_stats(),
            "ledger": core.ledger.get_stats(),
            "kg": core.kg.get_stats(),
            "history": core.history[-20:],
        }
        # Serialise before opening the file so a non-serialisable value
        # cannot leave a truncated export on disk.
        payload = json.dumps(out, ensure_ascii=False, indent=4)
        os.makedirs("reports", exist_ok=True)
        fname = f"reports/state_export_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
        with open(fname, "w", encoding="utf-8") as f:
            f.write(payload)
    except Exception as e:
        # Same reporting style as the other button handlers: show the
        # failure in the UI instead of raising into Gradio.
        return f"❌ Error: {e}"
    return f"✅ Exported to `{fname}`"
| # ═══════════════════════════════════════════════════════════════════ | |
| # GRADIO UI | |
| # ═══════════════════════════════════════════════════════════════════ | |
# Custom stylesheet for the app: a gradient-filled, centred title (the
# element with id "title") plus colour classes for minted/rejected states.
# It is passed to demo.launch(css=CSS) in the __main__ block.
CSS = """
#title {
text-align: center; font-size: 2.5rem; font-weight: 800;
background: linear-gradient(135deg, #00d4ff, #7c3aed, #00ff88);
-webkit-background-clip: text; -webkit-text-fill-color: transparent;
}
.minted { color: #10b981; font-weight: bold; }
.rejected { color: #ef4444; }
"""
# The Gradio app object; stays None when Gradio could not be imported, which
# the __main__ block uses to pick the fallback health server instead.
demo = None
if gradio_available:
    with gr.Blocks(title="Nuclear Intelligence v5.0") as demo:
        gr.Markdown("# ⚛️ Nuclear Intelligence v5.0", elem_id="title")
        gr.Markdown(
            "**Autonomous nuclear energy research → multi-layer evaluation → SHA-3 PoW mining → NES token**\n\n"
            "All research is produced by real LLMs. Evaluation is independent. Mining is real Proof-of-Work."
        )
        with gr.Row():
            # Left column: live stats, export and LLM engine status.
            with gr.Column(scale=1):
                # Passing the function (not its result) makes it the component's value source.
                stats_box = gr.Markdown(get_system_stats)
                refresh_stats = gr.Button("🔄 Refresh Stats", variant="secondary")
                export_btn = gr.Button("💾 Export State")
                export_out = gr.Markdown()
                with gr.Accordion("🔌 LLM Engine", open=False):
                    llm_status = gr.Markdown(get_llm_status)
                    refresh_llm = gr.Button("📡 Check Providers")
            # Right column: the four working tabs.
            with gr.Column(scale=3):
                with gr.Tabs():
                    with gr.Tab("🚀 Research Center"):
                        with gr.Row():
                            run_btn = gr.Button("🚀 Run Research Cycle", variant="primary")
                            discover_btn = gr.Button("🔍 Discover Question", variant="secondary")
                            dev_chk = gr.Checkbox(label="Developer Mode", value=False)
                            sync_chk = gr.Checkbox(label="Sync to HF Dataset", value=True)
                        # Shared output for both the cycle and the discover buttons.
                        cycle_out = gr.Markdown("### Click button to start research...")
                        with gr.Accordion("🔍 Manual Q&A", open=False):
                            q_input = gr.Textbox(
                                label="Ask a nuclear question",
                                placeholder="e.g. How do molten salt reactors handle tritium breeding?",
                            )
                            q_btn = gr.Button("Search & Answer")
                            q_out = gr.Markdown()
                    with gr.Tab("⛓️ Blockchain"):
                        verify_btn = gr.Button("⛓️ Verify Ledger Integrity")
                        verify_out = gr.Markdown()
                        gr.Markdown("### Latest Transactions")
                        chain_table = gr.DataFrame(get_chain_df)
                    with gr.Tab("🕸️ Knowledge Graph"):
                        search_input = gr.Textbox(label="Search", placeholder="e.g. fusion, reactor, safety")
                        limit_input = gr.Slider(label="Limit", minimum=1, maximum=50, value=10, step=1)
                        search_btn = gr.Button("Search")
                        search_out = gr.Markdown()
                        gr.Markdown("### Latest Entities")
                        entities_table = gr.DataFrame(get_entities_df)
                    with gr.Tab("📈 Analytics"):
                        with gr.Row():
                            chart1 = gr.Plot(get_category_chart)
                            chart2 = gr.Plot(get_score_chart)
                        gr.Markdown("### Recent Cycles")
                        history_table = gr.DataFrame(get_history_df)
        # Event handlers: each button calls one UI function and writes its
        # return value into a single output component.
        refresh_stats.click(get_system_stats, outputs=stats_box)
        refresh_llm.click(get_llm_status, outputs=llm_status)
        run_btn.click(run_cycle, inputs=[dev_chk, sync_chk], outputs=cycle_out)
        discover_btn.click(discover_question, outputs=cycle_out)
        # The Developer Mode checkbox is shared with the research-cycle button.
        q_btn.click(ask_q, inputs=[q_input, dev_chk], outputs=q_out)
        verify_btn.click(verify_chain, outputs=verify_out)
        search_btn.click(search_kg, inputs=[search_input, limit_input], outputs=search_out)
        export_btn.click(export_state, outputs=export_out)
        # Auto-refresh stats every 30s (gr.Timer was added in 4.40+, guard it)
        try:
            timer = gr.Timer(30)
            timer.tick(get_system_stats, outputs=stats_box)
            timer.tick(get_chain_df, outputs=chain_table)
            timer.tick(get_entities_df, outputs=entities_table)
            timer.tick(get_history_df, outputs=history_table)
        except (AttributeError, TypeError):
            # Gradio < 4.40: no Timer, so there is no auto-refresh at all;
            # users refresh manually with the 🔄 buttons.
            logger.warning("gr.Timer unavailable (Gradio < 4.40) — auto-refresh disabled, use 🔄 buttons.")
| # ═══════════════════════════════════════════════════════════════════ | |
| # ALWAYS-ON: background autonomous loop (thread, headless) | |
| # ═══════════════════════════════════════════════════════════════════ | |
# Set to True by the __main__ block once the background thread is running.
_autonomous_thread_started = False


def _autonomous_loop():
    """Background autonomous research loop. Runs every INTERVAL minutes.

    The interval comes from ``OPERATION_LOOP_INTERVAL_MINUTES`` (default 25).
    After a 30-second warm-up the loop runs one cycle per interval forever,
    syncing minted results to the HF dataset. It never returns; it is meant
    to run in a daemon thread.
    """
    # A malformed value must not kill the thread before its first cycle,
    # and a negative one would make time.sleep() raise inside the loop.
    try:
        interval_minutes = int(os.getenv("OPERATION_LOOP_INTERVAL_MINUTES", "25"))
    except ValueError:
        interval_minutes = 25
    interval_seconds = max(1, interval_minutes * 60)
    logger.info(f"🤖 Autonomous loop started (interval={interval_seconds}s)")

    # Initial warm-up delay so Space finishes starting first
    time.sleep(30)
    while True:
        try:
            logger.info("🤖 Auto-cycle: starting research cycle")
            res = core.run_cycle(dev_mode=False) if core else {"minted": False, "error": "no core"}
            if res.get("minted"):
                logger.info(f"✅ Auto-cycle: MINTED tx={res.get('tx_hash')} overall={res.get('overall')}")
                # Sync to HF dataset
                try:
                    sync_to_hf_dataset(res)
                except Exception as e:
                    logger.warning(f"HF dataset sync failed: {e}")
            elif "error" in res:
                # A failed cycle is not a rejection; report it as an error
                # instead of "rejected (overall=0)".
                logger.error(f"Auto-cycle: failed ({res['error']}) — trying again next interval")
            else:
                logger.info(f"⏭️ Auto-cycle: rejected (overall={res.get('overall', 0)}) — trying again next interval")
        except Exception as e:
            logger.error(f"Auto-cycle error: {e}")
        time.sleep(interval_seconds)
if __name__ == "__main__":
    if demo is not None:
        # Always-on autonomous loop in background (opt out with AUTO_START_LOOP=false)
        if os.getenv("AUTO_START_LOOP", "true").lower() == "true" and not _autonomous_thread_started:
            try:
                t = threading.Thread(target=_autonomous_loop, daemon=True, name="ni-autonomous")
                t.start()
                _autonomous_thread_started = True
                logger.info("✅ Background autonomous loop launched")
            except Exception as e:
                logger.warning(f"Could not start autonomous loop: {e}")
        try:
            demo.launch(server_name="0.0.0.0", server_port=PORT, css=CSS)
        except TypeError:
            # This Gradio version's launch() does not accept css=; start
            # without the custom stylesheet rather than not starting.
            # NOTE(review): confirm which Gradio versions take css in
            # launch() versus gr.Blocks().
            demo.launch(server_name="0.0.0.0", server_port=PORT)
    else:
        # Gradio unavailable — keep the Space alive with a tiny health HTTP server
        print("⚠️ Gradio not available; cannot launch UI. Starting minimal health server.")
        try:
            from http.server import HTTPServer, BaseHTTPRequestHandler
            import json as _json

            class _Health(BaseHTTPRequestHandler):
                """Serves a static JSON health document on ``/`` and ``/health``."""

                def log_message(self, *a, **k):
                    # Silence the default per-request logging.
                    pass

                def do_GET(self):
                    if self.path in ("/", "/health"):
                        body = _json.dumps({
                            "status": "ok",
                            "name": "Nuclear Intelligence",
                            "version": "5.0",
                            "gradio_available": False,
                            "core_initialized": core is not None,
                        }).encode()
                        self.send_response(200)
                        self.send_header("Content-Type", "application/json")
                        self.send_header("Content-Length", str(len(body)))
                        self.end_headers()
                        self.wfile.write(body)
                    else:
                        self.send_response(404)
                        self.end_headers()

            srv = HTTPServer(("0.0.0.0", PORT), _Health)
            # logger starts out as None and is only assigned once the
            # third-party imports succeed, so it may still be None on this
            # fallback path. Calling logger.info unguarded would raise and
            # the server would never start.
            if logger is not None:
                logger.info(f"Health server on :{PORT}")
            else:
                print(f"Health server on :{PORT}")
            srv.serve_forever()
        except Exception as e:
            print(f"Health server failed: {e}")