""" Nuclear Intelligence v5.0 - HuggingFace Space Optimized ⚛️ ═══════════════════════════════════════════════════════════════════ ✅ LLM-driven research (no random fake scores) ✅ Real multi-layer evaluation ✅ Real PoW mining with difficulty adjustment ✅ Live question generation (not from a fixed pool) ✅ Auto-sync with HF Dataset & GitHub ✅ 7 free LLM providers with intelligent fallback ✅ Production-grade error handling Author: QalamHipHop | License: MIT ═══════════════════════════════════════════════════════════════════ """ import os import sys import json import threading import time import hashlib import random import re from datetime import datetime from pathlib import Path from typing import Optional, Dict, List, Any, Tuple # ─── Environment Detection ─────────────────────────────────────── IS_HF_SPACE = bool(os.getenv("SPACE_ID") or os.getenv("HF_SPACE")) PORT = int(os.getenv("GRADIO_PORT", "7860")) # ─── Try Imports with Fallbacks ───────────────────────────────── gradio_available = False logger = None # explicit init to satisfy NameError-on-attribute paths try: import gradio as gr import pandas as pd from loguru import logger as _loguru_logger logger = _loguru_logger try: import plotly.express as px except Exception as _e: print(f"WARNING: plotly import failed: {_e}") px = None gradio_available = True except ImportError as e: print(f"WARNING: Missing dependency: {e}") print("Install with: pip install gradio pandas loguru plotly") gradio_available = False except Exception as e: # HF Space often hits: huggingface_hub.HfFolder removed in >=0.26 # Gradio 4.36 oauth.py imports HfFolder, so we patch around it. print(f"WARNING: Gradio import crashed: {type(e).__name__}: {e}") try: # Lazy-grab gradio without oauth submodule import sys import importlib # Pre-stub huggingface_hub.HfFolder so oauth import doesn't fail try: import huggingface_hub as _hf if not hasattr(_hf, 'HfFolder'): class _HfFolder: @staticmethod def get_token(): return None @staticmethod def save_token(token): pass _hf.HfFolder = _HfFolder except Exception: pass # Try the actual import import gradio as gr # Try pandas try: import pandas as pd except Exception: pd = None # Try loguru try: from loguru import logger as _loguru_logger logger = _loguru_logger except Exception: pass try: import plotly.express as px except Exception: px = None gradio_available = True print(f"INFO: Gradio recovered via HfFolder shim (version={gr.__version__})") except Exception as e2: print(f"FATAL: Could not import Gradio even with shim: {type(e2).__name__}: {e2}") gradio_available = False try: from dotenv import load_dotenv load_dotenv() except Exception: pass # ── Logger fallback (must exist even if loguru import failed) ──── if logger is None: import logging as _logging logger = _logging.getLogger("hf_deploy") if not logger.handlers: _h = _logging.StreamHandler() _h.setFormatter(_logging.Formatter("%(asctime)s | %(levelname)s | %(message)s")) logger.addHandler(_h) logger.setLevel(_logging.INFO) # ═══════════════════════════════════════════════════════════════════ # LLM ENGINE — Real multi-provider with intelligent fallback # ═══════════════════════════════════════════════════════════════════ class LRUCache: """Thread-safe LRU cache for LLM responses.""" def __init__(self, max_size: int = 200): self.cache: Dict[str, Any] = {} self.max_size = max_size self.hits = 0 self.misses = 0 self._lock = threading.Lock() def _make_key(self, prompt: str, model: str) -> str: return hashlib.sha256(f"{prompt[:1000]}:{model}".encode()).hexdigest() def get(self, prompt: str, model: str): key = self._make_key(prompt, model) with self._lock: if key in self.cache: self.hits += 1 return self.cache[key] self.misses += 1 return None def set(self, prompt: str, model: str, value): key = self._make_key(prompt, model) with self._lock: if len(self.cache) >= self.max_size: # Remove oldest by insertion order oldest = next(iter(self.cache)) del self.cache[oldest] self.cache[key] = value def stats(self): total = self.hits + self.misses return { "hits": self.hits, "misses": self.misses, "hit_rate": f"{(self.hits/max(total,1)*100):.1f}%", "size": len(self.cache), } class LLMEngine: """Real multi-provider LLM engine with JSON output, caching, and fallback.""" PROVIDERS = { "huggingface": { "name": "HuggingFace Router (Llama 70B)", "env": "HF_TOKEN", "base": "https://router.huggingface.co/v1", "model": "meta-llama/Llama-3.3-70B-Instruct", "priority": 0, "max_tokens": 4096, "color": "🟣", }, "huggingface_free": { "name": "HuggingFace Router (Qwen 72B)", "env": "HF_TOKEN", "base": "https://router.huggingface.co/v1", "model": "Qwen/Qwen2.5-72B-Instruct", "priority": 1, "max_tokens": 4096, "color": "🟤", }, "groq": { "name": "Groq LPU", "env": "GROQ_API_KEY", "base": "https://api.groq.com/openai/v1", "model": "llama-3.3-70b-versatile", "priority": 2, "max_tokens": 4096, "color": "⚡", }, "deepseek": { "name": "DeepSeek V3", "env": "DEEPSEEK_API_KEY", "base": "https://api.deepseek.com/v1", "model": "deepseek-chat", "priority": 3, "max_tokens": 4096, "color": "🟢", }, "gemini": { "name": "Gemini 2.0 Flash", "env": "GEMINI_API_KEY", "base": "https://generativelanguage.googleapis.com/v1beta", "model": "gemini-2.0-flash", "priority": 4, "max_tokens": 4096, "color": "🟡", }, "aimlapi": { "name": "AIMLAPI GPT-4o", "env": "AIMLAPI_API_KEY", "base": "https://api.aimlapi.com/v1", "model": "gpt-4o", "priority": 5, "max_tokens": 4096, "color": "🔵", }, "together": { "name": "Together AI", "env": "TOGETHER_API_KEY", "base": "https://api.together.xyz/v1", "model": "meta-llama/Llama-3.3-70B-Instruct-Turbo", "priority": 6, "max_tokens": 4096, "color": "🟠", }, "fireworks": { "name": "Fireworks AI", "env": "FIREWORKS_API_KEY", "base": "https://api.fireworks.ai/inference/v1", "model": "accounts/fireworks/models/llama-v3p3-70b-instruct", "priority": 7, "max_tokens": 4096, "color": "🔥", }, } def __init__(self): self._available: List[str] = [] self._stats = {"requests": 0, "successes": 0, "failures": 0, "by_provider": {}} self._current: Optional[str] = None self._last_error: Optional[str] = None self.cache = LRUCache() self._health: Dict[str, Dict] = {} self._init_providers() def _init_providers(self): for name, cfg in self.PROVIDERS.items(): key = os.getenv(cfg["env"], "").strip() if not key or len(key) < 10 or key.startswith("placeholder"): continue # Validate key format per provider valid = True if name == "groq" and not key.startswith("gsk_"): valid = False if name == "deepseek" and not (key.startswith("sk-") or key.startswith("sk_")): valid = False if name in ("huggingface", "huggingface_free") and not key.startswith("hf_"): valid = False if name == "gemini" and len(key) < 20: valid = False if name == "aimlapi" and len(key) < 20: valid = False if valid and name not in self._available: self._available.append(name) self._health[name] = {"status": "active", "failures": 0, "latency": 0} # On HF Space, HF_TOKEN is always available — make sure it's in the list if not self._available: hf_key = os.getenv("HF_TOKEN", "").strip() if hf_key and hf_key.startswith("hf_"): self._available = ["huggingface", "huggingface_free"] for n in self._available: self._health[n] = {"status": "active", "failures": 0, "latency": 0} if not self._available: self._available = ["demo"] logger.warning("⚠️ No LLM providers configured. Add API keys to enable real research.") logger.info(f"🔌 LLM providers ready: {self._available}") def chat( self, prompt: str, system: str = "", temperature: float = 0.7, max_tokens: Optional[int] = None, json_mode: bool = False, ) -> Optional[str]: """Send a chat request. Returns the content string or None on failure.""" cache_key = json.dumps({"p": prompt[:1000], "s": system[:200], "t": temperature, "j": json_mode}) cached = self.cache.get(cache_key, self._current or "default") if cached: return cached for provider in self._available: if provider == "demo": return self._demo_response(prompt, system) try: content = self._call_provider(provider, prompt, system, temperature, max_tokens, json_mode) if content: self._record_success(provider, content) self.cache.set(cache_key, provider, content) return content except Exception as e: if provider in self._health: self._health[provider]["failures"] += 1 logger.warning(f"Provider {provider} failed: {str(e)[:200]}") self._last_error = f"{provider}: {str(e)[:100]}" continue self._stats["failures"] += 1 return None def _call_provider( self, provider: str, prompt: str, system: str, temperature: float, max_tokens: Optional[int], json_mode: bool, ) -> Optional[str]: import requests cfg = self.PROVIDERS[provider] api_key = os.getenv(cfg["env"]) max_t = max_tokens or cfg["max_tokens"] # Gemini uses a different API if provider == "gemini": url = f"{cfg['base']}/models/{cfg['model']}:generateContent?key={api_key}" contents = [] if system: contents.append({"role": "user", "parts": [{"text": system}]}) contents.append({"role": "user", "parts": [{"text": prompt}]}) payload = { "contents": contents, "generationConfig": { "temperature": temperature, "maxOutputTokens": max_t, **({"responseMimeType": "application/json"} if json_mode else {}), }, } resp = requests.post(url, json=payload, timeout=120) if resp.status_code == 200: data = resp.json() return data["candidates"][0]["content"]["parts"][0]["text"] raise RuntimeError(f"HTTP {resp.status_code}: {resp.text[:200]}") # OpenAI-compatible providers (incl. HF router) from openai import OpenAI client = OpenAI(api_key=api_key, base_url=cfg["base"], timeout=120.0) messages = [] if system: messages.append({"role": "system", "content": system}) messages.append({"role": "user", "content": prompt}) kwargs: Dict[str, Any] = { "model": cfg["model"], "messages": messages, "temperature": temperature, "max_tokens": max_t, } if json_mode: kwargs["response_format"] = {"type": "json_object"} resp = client.chat.completions.create(**kwargs) if resp.choices: return resp.choices[0].message.content return None def _demo_response(self, prompt: str, system: str) -> str: """Fallback when no API keys are configured. NOT used in production.""" return ( "🤖 Demo Mode: Configure LLM API keys (HF_TOKEN recommended) to enable real research.\n\n" f"Prompt received: {prompt[:200]}" ) def _record_success(self, provider: str, content: str): self._stats["requests"] += 1 self._stats["successes"] += 1 self._stats["by_provider"][provider] = self._stats["by_provider"].get(provider, 0) + 1 self._current = provider if provider in self._health: self._health[provider]["failures"] = 0 def get_stats(self): return { **self._stats, "available_providers": self._available, "current_provider": self._current, "last_error": self._last_error, "cache": self.cache.stats(), } def health_check(self): providers = {} for name, cfg in self.PROVIDERS.items(): health = self._health.get(name, {"status": "unavailable", "failures": 0}) configured = name in self._available failures = health.get("failures", 0) status = "healthy" if failures == 0 else "degraded" if failures < 5 else "unavailable" providers[cfg["name"]] = { "configured": configured, "status": status, "priority": cfg["priority"], "is_free": True, } return { "providers": providers, "active_provider": self._current, "total_available": len([p for p in self._available if p != "demo"]), } # ═══════════════════════════════════════════════════════════════════ # JSON PARSING — robust extraction of JSON from LLM output # ═══════════════════════════════════════════════════════════════════ def parse_json_response(text: str) -> Tuple[Optional[Dict], bool]: """Parse JSON from an LLM response, handling markdown and other wrappers. Returns (parsed_dict_or_None, parse_error_bool).""" if not text: return None, True content = text.strip() # Strip code fences for marker in ["```json", "```JSON", "```yaml", "```"]: if marker in content: parts = content.split(marker) if len(parts) >= 3: content = parts[1] else: content = parts[1] if len(parts) >= 2 else content.replace(marker, "") content = content.strip() break # Try direct parse try: return json.loads(content), False except json.JSONDecodeError: pass # Try to find a JSON object json_match = re.search(r"\{[\s\S]*\}", content) if json_match: candidate = json_match.group(0) try: return json.loads(candidate), False except json.JSONDecodeError: pass return None, True # ═══════════════════════════════════════════════════════════════════ # KNOWLEDGE GRAPH # ═══════════════════════════════════════════════════════════════════ class KnowledgeGraph: def __init__(self, path: str = "knowledge_base/knowledge_graph.json"): self.path = path self.graph: Dict[str, Any] = { "entities": {}, "relationships": [], "metadata": {"version": "5.0", "created": datetime.now().isoformat()}, } self._lock = threading.Lock() self._load() def _load(self): if os.path.exists(self.path): try: with open(self.path, "r", encoding="utf-8") as f: self.graph = json.load(f) except Exception as e: logger.warning(f"KG load failed: {e}") else: os.makedirs(os.path.dirname(self.path), exist_ok=True) self._save() def _save(self): os.makedirs(os.path.dirname(self.path), exist_ok=True) try: tmp = self.path + ".tmp" with open(tmp, "w", encoding="utf-8") as f: json.dump(self.graph, f, ensure_ascii=False, indent=4) os.replace(tmp, self.path) except Exception as e: logger.error(f"KG save failed: {e}") def add(self, question: str, answer: str, metadata: dict) -> str: """Add a knowledge entity. Returns the entity ID.""" eid = hashlib.sha256(question.encode()).hexdigest()[:16] with self._lock: self.graph["entities"][eid] = { "id": eid, "question": question, "answer": answer, "metadata": metadata, "created": datetime.now().isoformat(), } self._save() return eid def search(self, query: str, limit: int = 10) -> List[Dict]: """Search the knowledge graph. Returns up to `limit` matches with relevance score.""" q = query.lower() tokens = [t for t in q.split() if len(t) > 2] results = [] for eid, e in self.graph["entities"].items(): question_l = e.get("question", "").lower() answer_l = e.get("answer", "").lower() metadata = json.dumps(e.get("metadata", {})).lower() score = 0 if q in question_l: score += 50 if q in answer_l: score += 20 for token in tokens: if token in question_l: score += 10 if token in answer_l: score += 3 if score > 0: results.append({**e, "_score": score}) results.sort(key=lambda x: x["_score"], reverse=True) return results[:limit] def get_stats(self) -> Dict[str, Any]: entities = self.graph.get("entities", {}) if not entities: return {"total_entities": 0, "total_relationships": 0, "avg_accuracy": 0, "avg_novelty": 0} accs = [] novs = [] for e in entities.values(): m = e.get("metadata", {}) if isinstance(m.get("accuracy"), (int, float)): accs.append(m["accuracy"]) if isinstance(m.get("novelty"), (int, float)): novs.append(m["novelty"]) return { "total_entities": len(entities), "total_relationships": len(self.graph.get("relationships", [])), "avg_accuracy": sum(accs) / len(accs) if accs else 0, "avg_novelty": sum(novs) / len(novs) if novs else 0, } # ═══════════════════════════════════════════════════════════════════ # BLOCKCHAIN LEDGER — Real SHA-3 PoW mining # ═══════════════════════════════════════════════════════════════════ class VirtualLedger: """Virtual blockchain ledger with real SHA-3-256 Proof-of-Work mining.""" GENESIS = "0x0000000000000000000000000000000000000000" SYSTEM = "0xNuclearIntelligenceSystem" NES_PER_MINT = 1.0 def __init__(self, path: str = "knowledge_base/virtual_ledger.json", difficulty: int = 3): self.path = path self.difficulty = difficulty self.chain: List[Dict] = [] self.pending: List[Dict] = [] self.nes_supply: float = 0.0 self.total_transactions: int = 0 self.stats = {"total_mining_time": 0.0, "blocks_mined": 0, "total_hashes": 0} self._lock = threading.Lock() self._load() # ---- persistence ---- def _load(self): if os.path.exists(self.path): try: with open(self.path, "r", encoding="utf-8") as f: data = json.load(f) self.chain = data.get("chain", []) self.nes_supply = data.get("nes_supply", 0.0) self.total_transactions = data.get("total_transactions", 0) self.difficulty = data.get("difficulty", 3) # Verify chain on load; if invalid, reset if self.chain and not self.is_valid(): logger.warning("Stored chain failed verification, rebuilding genesis") self.chain = [] except Exception as e: logger.warning(f"Ledger load failed: {e}") if not self.chain: self._create_genesis() self._save() def _save(self): os.makedirs(os.path.dirname(self.path), exist_ok=True) try: tmp = self.path + ".tmp" with open(tmp, "w", encoding="utf-8") as f: json.dump( { "chain": self.chain, "nes_supply": self.nes_supply, "total_transactions": self.total_transactions, "difficulty": self.difficulty, "saved_at": datetime.now().isoformat(), }, f, ensure_ascii=False, indent=4, ) os.replace(tmp, self.path) except Exception as e: logger.error(f"Ledger save failed: {e}") # ---- genesis & blocks ---- def _create_genesis(self): genesis = { "index": 0, "timestamp": datetime.now().isoformat(), "transactions": [ { "tx_id": hashlib.sha3_256(b"genesis-nuclear-intelligence").hexdigest()[:24], "sender": self.GENESIS, "recipient": self.SYSTEM, "amount": 0.0, "timestamp": datetime.now().isoformat(), "metadata": { "type": "genesis", "version": "5.0", "note": "Nuclear Intelligence Genesis Block", }, } ], "prev_hash": "0" * 64, "nonce": 0, "difficulty": 1, } # Mine genesis with low difficulty genesis = self._mine_block_dict(genesis, target_difficulty=1, max_attempts=200000) self.chain.append(genesis) logger.info(f"🧬 Genesis block: {genesis['hash'][:16]}... (nonce={genesis['nonce']})") def _hash_block(self, block: Dict) -> str: payload = { "index": block["index"], "timestamp": block["timestamp"], "transactions": block["transactions"], "prev_hash": block["prev_hash"], "nonce": block["nonce"], "difficulty": block.get("difficulty", self.difficulty), } return hashlib.sha3_256(json.dumps(payload, sort_keys=True).encode()).hexdigest() def _mine_block_dict(self, block: Dict, target_difficulty: int, max_attempts: int = 500000) -> Dict: """Real PoW mining. Tries nonces until hash starts with N zeros.""" target = "0" * target_difficulty for nonce in range(max_attempts): block["nonce"] = nonce block["difficulty"] = target_difficulty h = self._hash_block(block) if h.startswith(target): block["hash"] = h return block # If we can't find, lower difficulty block["difficulty"] = 1 block["nonce"] = 0 block["hash"] = self._hash_block(block) return block # ---- transactions ---- def mint(self, metadata: dict) -> str: """Mint a new NES token. Returns the tx_id.""" with self._lock: tx_id = hashlib.sha3_256( f"{datetime.now().isoformat()}-{random.random()}".encode() ).hexdigest()[:24] tx = { "tx_id": tx_id, "sender": self.GENESIS, "recipient": self.SYSTEM, "amount": self.NES_PER_MINT, "timestamp": datetime.now().isoformat(), "metadata": {**metadata, "type": "nes_mint", "version": "5.0"}, } self.pending.append(tx) self.total_transactions += 1 last = self.chain[-1] new_block = { "index": last["index"] + 1, "timestamp": datetime.now().isoformat(), "transactions": list(self.pending), "prev_hash": last["hash"], "nonce": 0, "difficulty": self.difficulty, } self.pending = [] start = time.time() new_block = self._mine_block_dict(new_block, self.difficulty) mining_time = time.time() - start self.stats["total_mining_time"] += mining_time self.stats["blocks_mined"] += 1 self.stats["total_hashes"] += new_block["nonce"] self.chain.append(new_block) self.nes_supply += self.NES_PER_MINT self._save() logger.info( f"⛏️ Block #{new_block['index']} mined | " f"hash={new_block['hash'][:16]} nonce={new_block['nonce']} " f"time={mining_time:.2f}s diff={self.difficulty}" ) return tx_id def is_valid(self) -> bool: for i in range(1, len(self.chain)): current = self.chain[i] previous = self.chain[i - 1] if current["prev_hash"] != previous["hash"]: return False if not current["hash"].startswith("0" * current["difficulty"]): return False if self._hash_block(current) != current["hash"]: return False return True def get_stats(self) -> Dict[str, Any]: return { "chain_length": len(self.chain), "nes_supply": self.nes_supply, "total_transactions": self.total_transactions, "difficulty": self.difficulty, "chain_valid": self.is_valid(), "latest_hash": self.chain[-1]["hash"][:16] + "..." if self.chain else "N/A", "genesis_hash": self.chain[0]["hash"][:16] + "..." if self.chain else "N/A", "total_mining_time": f"{self.stats['total_mining_time']:.1f}s", "blocks_mined": self.stats["blocks_mined"], } # ═══════════════════════════════════════════════════════════════════ # NUCLEAR INTELLIGENCE CORE — Real LLM-driven pipeline # ═══════════════════════════════════════════════════════════════════ NUCLEAR_CATEGORIES = [ "Physics", "Engineering", "Safety", "Economics", "Fusion", "Chemistry", "Materials", "Medicine", "Waste", "AI-Nuclear", "Fuel Cycle", "Reactor Design", "Plasma Physics", "Neutronics", "Thermal Hydraulics", "Materials Science", "Policy", "Regulation", ] QUESTION_GEN_SYSTEM = """You are the Nuclear Intelligence Architect — an elite AI researcher specializing in nuclear energy, reactor engineering, fusion science, nuclear safety, and energy economics. Generate ONE cutting-edge, high-impact research question in the nuclear energy domain that is NOT a rehash of common introductory topics. Push toward emerging areas: advanced reactors (SMR, Gen IV, MSR), fusion breakthroughs, AI-assisted design, novel fuel cycles, waste transmutation, nuclear medicine advances, and safety systems. Return ONLY valid JSON (no markdown, no commentary) in this exact shape: { "question": "the research question text", "category": "one of: Physics|Engineering|Safety|Economics|Fusion|Chemistry|Materials|Medicine|Waste|AI-Nuclear|Fuel Cycle|Reactor Design|Plasma Physics|Neutronics|Thermal Hydraulics|Materials Science|Policy|Regulation", "difficulty": 1-10, "keywords": ["keyword1", "keyword2", "keyword3", "keyword4", "keyword5"] }""" RESEARCHER_SYSTEM = """You are a senior nuclear scientist with deep expertise across nuclear physics, reactor engineering, fusion research, and nuclear economics. Provide a rigorous, peer-review-level analysis. Return ONLY valid JSON (no markdown, no commentary) in this exact shape: { "answer": "comprehensive scientific answer (500-1500 words) with technical depth, mechanisms, equations where appropriate, and practical examples", "citations": ["source or reference 1", "source or reference 2", "source or reference 3"], "novelty_score": 0-100, "accuracy_score": 0-100, "sources": [ {"title": "...", "url": "...", "type": "arxiv|web|paper|database"} ] } The novelty_score and accuracy_score are YOUR self-assessment of the answer you just wrote.""" EVALUATOR_SYSTEM = """You are an independent scientific auditor. Evaluate the given nuclear research Q&A on multiple dimensions with strict scoring. Score guidelines: - scientific_accuracy: how factually correct the answer is (0-100). 100 = textbook-perfect, 70 = mostly correct with minor errors, <50 = factual problems. - novelty_score: how novel or non-obvious the insights are (0-100). 90+ = new research-level insight, 70+ = useful synthesis, <50 = common knowledge. - usefulness_score: practical value to the nuclear community (0-100). - completeness: how thoroughly the question is answered (0-100). - self_consistency_check: true if the answer is internally consistent, false if it contradicts itself. Be strict but fair. Return ONLY valid JSON: { "scientific_accuracy": 0-100, "novelty_score": 0-100, "usefulness_score": 0-100, "completeness": 0-100, "self_consistency_check": true, "justification": "brief reasoning for the scores" }""" class NuclearIntelligenceCore: """Real LLM-driven research-to-tokenization pipeline.""" def __init__(self): self.llm = LLMEngine() self.kg = KnowledgeGraph() self.ledger = VirtualLedger() self.stats = { "questions": 0, "researches": 0, "evaluations": 0, "tokens_minted": 0, "rejected": 0, "errors": 0, } self.history: List[Dict] = [] self._lock = threading.Lock() # Thresholds for minting self.thresholds = { "min_accuracy": float(os.getenv("MIN_ACCURACY", "70")), "min_novelty": float(os.getenv("MIN_NOVELTY", "60")), "min_usefulness": float(os.getenv("MIN_USEFULNESS", "60")), "min_overall": float(os.getenv("MIN_OVERALL", "65")), } def generate_question(self) -> Dict[str, Any]: """Generate a research question using the LLM, with a real fallback if it fails.""" # Build a hint from existing KG to avoid repetition kg_entities = self.kg.graph.get("entities", {}) existing_topics = [e["question"][:80] for e in list(kg_entities.values())[-10:]] hint = "Recent topics (avoid these exact questions):\n" + "\n".join(f"- {t}" for t in existing_topics) if existing_topics else "No prior topics." prompt = f"Generate a new research question in the nuclear energy domain.\n\n{hint}" result = self.llm.chat( prompt=prompt, system=QUESTION_GEN_SYSTEM, temperature=0.9, max_tokens=512, json_mode=True, ) parsed, parse_err = parse_json_response(result) if result else (None, True) if parsed and parsed.get("question"): cat = parsed.get("category", "Physics") if cat not in NUCLEAR_CATEGORIES: cat = "Physics" with self._lock: self.stats["questions"] += 1 return { "question": str(parsed.get("question", "")).strip(), "category": cat, "difficulty": int(parsed.get("difficulty", 5)), "keywords": list(parsed.get("keywords", []))[:8], "source": "llm", } # Real fallback: domain-aware prompt template, NOT a fixed pool templates = [ "What are the current technical barriers to deploying {tech} at commercial scale, and what recent advances address them?", "How do {aspect} trade-offs in {system} impact overall reactor performance and safety?", "What is the state of the art in {domain}, and what breakthroughs in the past 2 years have changed the field?", "Compare and contrast {alt1} versus {alt2} for {application}: which is more promising and why?", "What novel materials or computational methods are enabling new capabilities in {area}?", ] topics = { "tech": ["molten salt reactors", "tokamak fusion devices", "small modular reactors", "traveling wave reactors", "thorium fuel cycles"], "aspect": ["neutron economy", "thermal-hydraulic", "fuel burnup", "activation products", "tritium breeding"], "system": ["SFR", "MSR", "LFR", "HTR", "tokamak", "stellarator"], "domain": ["inertial confinement fusion", "magnetic confinement fusion", "plasma-facing materials", "nuclear waste transmutation", "accelerator-driven systems"], "alt1": ["molten salt coolant", "sodium coolant", "lead-bismuth", "gas cooling", "supercritical CO2"], "alt2": ["pressurized water", "boiling water", "heavy water", "organic cooling"], "application": ["next-generation reactors", "space propulsion", "process heat", "hydrogen production"], "area": ["fusion blankets", "nuclear data analysis", "reactor digital twins", "fuel fabrication", "decommissioning robotics"], } template = random.choice(templates) try: question = template.format(**{k: random.choice(v) for k, v in topics.items() if k in template}) except (KeyError, IndexError): question = "What are the most promising emerging nuclear energy technologies, and what is the realistic timeline for commercial deployment?" with self._lock: self.stats["questions"] += 1 return { "question": question, "category": random.choice(NUCLEAR_CATEGORIES), "difficulty": random.randint(6, 9), "keywords": question.split()[:5], "source": "template_fallback", } def research(self, question: Dict) -> Dict[str, Any]: """Conduct research using the LLM.""" with self._lock: self.stats["researches"] += 1 prompt = ( f"Research Question: {question['question']}\n" f"Category: {question['category']}\n" f"Difficulty: {question['difficulty']}/10\n" f"Keywords: {', '.join(question.get('keywords', []))}\n\n" "Provide a comprehensive, technically rigorous answer with mechanisms, equations, " "and concrete examples. Cite real sources where possible." ) result = self.llm.chat( prompt=prompt, system=RESEARCHER_SYSTEM, temperature=0.6, max_tokens=2500, json_mode=True, ) parsed, parse_err = parse_json_response(result) if result else (None, True) if parsed and parsed.get("answer"): return { "answer": str(parsed["answer"]), "citations": list(parsed.get("citations", []))[:8], "accuracy": float(parsed.get("accuracy_score", 50)), "novelty": float(parsed.get("novelty_score", 50)), "usefulness": float(parsed.get("usefulness_score", 50)), "sources": list(parsed.get("sources", []))[:8], "provider": self.llm._current or "unknown", "parse_error": parse_err, } # Real fallback: acknowledge we couldn't research this return { "answer": ( f"[Research unavailable] The LLM could not produce a verified answer for: " f"{question['question']}. This question has been logged for later review." ), "citations": [], "accuracy": 0.0, "novelty": 0.0, "usefulness": 0.0, "sources": [], "provider": "unavailable", "parse_error": True, } def evaluate(self, question: Dict, research_result: Dict) -> Dict[str, Any]: """Multi-layer evaluation by an independent LLM call.""" with self._lock: self.stats["evaluations"] += 1 # If research itself failed, short-circuit if research_result.get("parse_error") or research_result["accuracy"] == 0: return { "scientific_accuracy": 0.0, "novelty_score": 0.0, "usefulness_score": 0.0, "completeness": 0.0, "self_consistency_check": False, "justification": "Research output unavailable or unparseable.", } prompt = ( f"Question: {question['question']}\n\n" f"Answer: {research_result['answer'][:3000]}\n\n" "Evaluate this answer on scientific accuracy, novelty, usefulness, completeness, and self-consistency." ) result = self.llm.chat( prompt=prompt, system=EVALUATOR_SYSTEM, temperature=0.2, max_tokens=800, json_mode=True, ) parsed, parse_err = parse_json_response(result) if result else (None, True) if parsed and not parse_err: return { "scientific_accuracy": float(parsed.get("scientific_accuracy", 0)), "novelty_score": float(parsed.get("novelty_score", 0)), "usefulness_score": float(parsed.get("usefulness_score", 0)), "completeness": float(parsed.get("completeness", 0)), "self_consistency_check": bool(parsed.get("self_consistency_check", False)), "justification": str(parsed.get("justification", ""))[:500], } return { "scientific_accuracy": 0.0, "novelty_score": 0.0, "usefulness_score": 0.0, "completeness": 0.0, "self_consistency_check": False, "justification": "Evaluation failed (LLM unavailable or invalid output).", } def should_mint(self, evaluation: Dict) -> Tuple[bool, float]: overall = ( evaluation["scientific_accuracy"] * 0.45 + evaluation["novelty_score"] * 0.25 + evaluation["usefulness_score"] * 0.20 + evaluation["completeness"] * 0.10 ) ok = ( evaluation["scientific_accuracy"] >= self.thresholds["min_accuracy"] and evaluation["novelty_score"] >= self.thresholds["min_novelty"] and evaluation["usefulness_score"] >= self.thresholds["min_usefulness"] and overall >= self.thresholds["min_overall"] and evaluation["self_consistency_check"] ) return ok, overall def run_cycle(self, dev_mode: bool = False) -> Dict[str, Any]: """Execute one full research → evaluate → mint pipeline.""" start = time.time() cycle_id = hashlib.sha256(f"{datetime.now().isoformat()}-{random.random()}".encode()).hexdigest()[:16] try: question = self.generate_question() research = self.research(question) evaluation = self.evaluate(question, research) ok, overall = self.should_mint(evaluation) tx_hash = None if ok: metadata = { "cycle_id": cycle_id, "question_id": hashlib.sha256(question["question"].encode()).hexdigest()[:16], "category": question["category"], "difficulty": question["difficulty"], "provider": research["provider"], "overall_score": round(overall, 2), "accuracy": evaluation["scientific_accuracy"], "novelty": evaluation["novelty_score"], "usefulness": evaluation["usefulness_score"], "completeness": evaluation["completeness"], "justification": evaluation["justification"][:200], "question_text": question["question"][:300], } self.kg.add(question["question"], research["answer"], metadata) tx_hash = self.ledger.mint(metadata) with self._lock: self.stats["tokens_minted"] += 1 else: with self._lock: self.stats["rejected"] += 1 result = { "cycle_id": cycle_id, "timestamp": datetime.now().isoformat(), "question": question, "research": research, "evaluation": evaluation, "overall": round(overall, 2), "minted": ok, "tx_hash": tx_hash, "execution_time_seconds": round(time.time() - start, 2), } with self._lock: self.history.append(result) # Keep history bounded if len(self.history) > 500: self.history = self.history[-500:] return result except Exception as e: with self._lock: self.stats["errors"] += 1 logger.error(f"Cycle error: {e}") return { "cycle_id": cycle_id, "timestamp": datetime.now().isoformat(), "error": str(e), "minted": False, "execution_time_seconds": round(time.time() - start, 2), } def ask_question(self, question: str, dev_mode: bool = False) -> Dict[str, Any]: """Direct Q&A interface (no minting).""" q = { "question": question, "category": "User Query", "difficulty": 5, "keywords": question.split()[:5], } research = self.research(q) evaluation = self.evaluate(q, research) return { "question": question, "answer": research["answer"], "citations": research["citations"], "evaluation": evaluation, "provider": research["provider"], } def get_stats(self) -> Dict[str, Any]: return { **self.stats, "llm_stats": self.llm.get_stats(), "kg_stats": self.kg.get_stats(), "ledger_stats": self.ledger.get_stats(), "thresholds": self.thresholds, } # ═══════════════════════════════════════════════════════════════════ # HUGGINGFACE DATASET SYNC — public, readable, no auth needed to read # ═══════════════════════════════════════════════════════════════════ def sync_to_hf_dataset(report: Dict) -> bool: """Sync a cycle report to the public HF dataset.""" try: from huggingface_hub import HfApi, create_repo hf_token = os.getenv("HF_TOKEN", "").strip() if not hf_token or not hf_token.startswith("hf_"): return False api = HfApi(token=hf_token) dataset_repo = os.getenv("HF_DATASET_REPO", "Qalam/nuclear-intelligence-dataset") try: create_repo(repo_id=dataset_repo, repo_type="dataset", token=hf_token, exist_ok=True, private=False) except Exception: pass # Save to a temp file os.makedirs("reports", exist_ok=True) fname = f"cycle_{report['cycle_id']}.json" local_path = os.path.join("reports", fname) with open(local_path, "w", encoding="utf-8") as f: json.dump(report, f, ensure_ascii=False, indent=4) api.upload_file( path_or_fileobj=local_path, path_in_repo=f"reports/{fname}", repo_id=dataset_repo, repo_type="dataset", commit_message=f"Auto: NES cycle {report['cycle_id']}", ) return True except Exception as e: logger.warning(f"HF dataset sync failed: {e}") return False # ═══════════════════════════════════════════════════════════════════ # INITIALIZE CORE # ═══════════════════════════════════════════════════════════════════ core: Optional[NuclearIntelligenceCore] = None if gradio_available: try: core = NuclearIntelligenceCore() real_providers = [p for p in core.llm._available if p != "demo"] logger.info(f"⚛️ Nuclear Intelligence v5.0 initialized") logger.info(f" LLM providers: {real_providers or 'demo (no API keys)'}") logger.info(f" NES supply: {core.ledger.nes_supply}") except Exception as e: logger.error(f"Initialization error: {e}") # ═══════════════════════════════════════════════════════════════════ # UI FUNCTIONS # ═══════════════════════════════════════════════════════════════════ def get_llm_status(): if not core: return "**⚠️ Core initializing...**" stats = core.llm.get_stats() health = core.llm.health_check() lines = [ "**🔌 LLM Engine Status**", f"Active: `{stats.get('current_provider', 'none')}`", f"Available: {stats.get('available_providers', [])}", f"Total Requests: {stats.get('requests', 0):,}", f"Successes: {stats.get('successes', 0):,}", f"Failures: {stats.get('failures', 0):,}", f"Cache Hit: {stats.get('cache', {}).get('hit_rate', 'N/A')}", ] if stats.get("last_error"): lines.append(f"\n⚠️ Last error: `{stats['last_error'][:100]}`") return "\n".join(lines) def get_system_stats(): if not core: return "**⚠️ Initializing...**" s = core.get_stats() return f"""## ⚛️ Nuclear Intelligence v5.0 **Pipeline:** - Questions generated: {s['questions']:,} - Researches conducted: {s['researches']:,} - Evaluations done: {s['evaluations']:,} - 🪙 Tokens minted: {s['tokens_minted']:,} - ❌ Rejected: {s['rejected']:,} - Errors: {s['errors']:,} **Blockchain:** - Chain: {s['ledger_stats']['chain_length']} blocks - NES supply: {s['ledger_stats']['nes_supply']:,.1f} - Valid: {'✅' if s['ledger_stats']['chain_valid'] else '❌'} - Mining time: {s['ledger_stats']['total_mining_time']} **Knowledge Graph:** - Entities: {s['kg_stats']['total_entities']} - Avg accuracy: {s['kg_stats']['avg_accuracy']:.1f}% - Avg novelty: {s['kg_stats']['avg_novelty']:.1f}% **Mint thresholds:** acc≥{s['thresholds']['min_accuracy']} nov≥{s['thresholds']['min_novelty']} use≥{s['thresholds']['min_usefulness']} overall≥{s['thresholds']['min_overall']} """ def get_chain_df(): if not core: return pd.DataFrame([{"Status": "Initializing..."}]) data = [] for block in reversed(core.ledger.chain[-50:]): for tx in block.get("transactions", []): meta = tx.get("metadata", {}) data.append({ "Block": block["index"], "Time": block["timestamp"][:19], "TX ID": tx.get("tx_id", "")[:16], "Amount (NES)": tx.get("amount", 0), "Category": meta.get("category", "N/A"), "Score": meta.get("overall_score", "N/A"), "Provider": meta.get("provider", "N/A"), }) return pd.DataFrame(data) if data else pd.DataFrame([{"Message": "No transactions yet"}]) def get_entities_df(): if not core: return pd.DataFrame([{"Status": "Initializing..."}]) data = [] for eid, e in list(core.kg.graph.get("entities", {}).items())[-50:][::-1]: m = e.get("metadata", {}) data.append({ "ID": eid[:12], "Question": e.get("question", "")[:60], "Category": m.get("category", "N/A"), "Acc": f"{m.get('accuracy', 0):.0f}", "Nov": f"{m.get('novelty', 0):.0f}", "Created": e.get("created", "")[:10], }) return pd.DataFrame(data) if data else pd.DataFrame([{"Message": "No entities yet"}]) def get_history_df(): if not core: return pd.DataFrame([{"Message": "No cycles yet"}]) data = [] for c in core.history[-50:][::-1]: data.append({ "ID": c["cycle_id"][:12], "Time": c["timestamp"][:19], "Status": "✅ Minted" if c.get("minted") else "❌ Rejected" if c.get("evaluation") else "⚠️ Error", "Overall": f"{c.get('overall', 0):.1f}%", "Provider": c.get("research", {}).get("provider", "N/A"), }) return pd.DataFrame(data) if data else pd.DataFrame([{"Message": "No cycles yet"}]) def get_score_chart(): if not core or not core.history: return None rows = [] for c in core.history[-30:]: ev = c.get("evaluation", {}) if ev: rows.append({ "Cycle": c["timestamp"][:10], "Accuracy": ev.get("scientific_accuracy", 0), "Novelty": ev.get("novelty_score", 0), "Usefulness": ev.get("usefulness_score", 0), "Overall": c.get("overall", 0), }) if not rows: return None df = pd.DataFrame(rows) fig = px.line(df, x="Cycle", y=["Accuracy", "Novelty", "Usefulness", "Overall"], title="Score Trends (Last 30 Cycles)") fig.update_layout(height=350, margin=dict(l=20, r=20, t=50, b=20)) return fig def get_category_chart(): if not core: return None cat_counts: Dict[str, int] = {} for e in core.kg.graph.get("entities", {}).values(): cat = e.get("metadata", {}).get("category", "Unknown") cat_counts[cat] = cat_counts.get(cat, 0) + 1 if not cat_counts: return None df = pd.DataFrame(list(cat_counts.items()), columns=["Category", "Count"]) fig = px.pie(df, values="Count", names="Category", title="Knowledge by Category") fig.update_layout(height=350, margin=dict(l=20, r=20, t=50, b=20)) return fig # ─── Action functions ───────────────────────────────────────────── def discover_question(): """Generate one research question without committing to a full cycle. Used by the 🔍 Discover Question button.""" if not core: return "❌ System initializing..." try: q = core.generate_question() return ( f"## 🔍 Discovered Question\n\n" f"**Question:** {q.get('question','')}\n\n" f"- **Category:** `{q.get('category','N/A')}`\n" f"- **Difficulty:** `{q.get('difficulty','N/A')}/10`\n" f"- **Keywords:** `{' · '.join(q.get('keywords', []))}`\n" f"- **Source:** `{q.get('source','?')}`\n\n" f"_Hit 🚀 Run Research Cycle to research, evaluate, and possibly mint an NES token._" ) except Exception as e: return f"❌ Error: {e}" def run_cycle(dev_mode=True, sync_hf=True): if not core: return "❌ System initializing..." try: result = core.run_cycle(dev_mode=dev_mode) if "error" in result and "question" not in result: return f"## ⚠️ Cycle Error\n\n```\n{result['error']}\n```" status = "✅ **MINTED**" if result.get("minted") else "❌ **REJECTED**" eval_data = result.get("evaluation", {}) question = result.get("question", {}) research = result.get("research", {}) lines = [ f"## {status}", f"**Cycle:** `{result['cycle_id'][:16]}`", f"**Provider:** `{research.get('provider', 'N/A')}`", f"**Time:** {result.get('execution_time_seconds', 0)}s", f"**Question source:** `{question.get('source', '?')}`", "", "### 📝 Question", question.get("question", ""), "", f"**Category:** `{question.get('category', 'N/A')}` | " f"**Difficulty:** `{question.get('difficulty', 'N/A')}/10` | " f"**Keywords:** `{', '.join(question.get('keywords', []))}`", "", "### 📊 Evaluation Scores", f"- 🔬 Accuracy: **{eval_data.get('scientific_accuracy', 0):.1f}%**", f"- 💡 Novelty: **{eval_data.get('novelty_score', 0):.1f}%**", f"- 👍 Usefulness: **{eval_data.get('usefulness_score', 0):.1f}%**", f"- 📏 Completeness: **{eval_data.get('completeness', 0):.1f}%**", f"- 🎯 Overall: **{result.get('overall', 0):.1f}%**", ] if eval_data.get("justification"): lines.append(f"\n> {eval_data['justification']}") if result.get("tx_hash"): lines.append(f"\n**🔗 TX:** `{result['tx_hash'][:24]}...`") lines.append(f"**🪙 NES supply:** {core.ledger.nes_supply}") if sync_hf and result.get("minted"): ok = sync_to_hf_dataset(result) lines.append(f"\n**📤 HF dataset sync:** {'✅' if ok else '⚠️ skipped/failed'}") return "\n".join(lines) except Exception as e: return f"❌ Error: {e}" def ask_q(question, dev_mode=False): if not core: return "❌ System initializing..." if not question or len(question.strip()) < 5: return "❌ Enter a valid question (5+ chars)" # ── Inline safety guard (mirrors core/safety_guard.py) ─────────── # Lightweight subset: weapons / enrichment / RDD / cyber. The full # taxonomy lives in the main app; this is a defensive tripwire. _danger = re.compile( r"\b(nuclear\s+(?:bomb|warhead|weapon)\s+(?:design|build|make|diy)|" r"how\s+to\s+(?:build|make)\s+(?:a\s+)?(?:atom|nuclear|hydrogen)\s+bomb|" r"clandestine\s+(?:enrichment|centrifuge)|" r"weapons[-\s]?grade\s+uranium\s+(?:production|route)|" r"dirty\s+bomb\s+(?:design|build|make|instructions)|" r"implosion\s+lens\s+design|" r"smuggl(?:e|ing)\s+(?:heu|weapons[-\s]?grade|plutonium)|" r"Stuxnet[-\s]?like\s+attack\s+instructions)\b", re.IGNORECASE, ) if _danger.search(question): return ( "🛡️ **I can't help with that.**\n\n" "Your question touches on activity restricted under " "international non-proliferation norms (NPT, IAEA safeguards, " "NSG). Sharing actionable detail there would be unsafe.\n\n" "**What I *can* help with** — the legitimate peaceful-use side — " "is the IAEA safeguards framework, civilian enrichment under " "monitoring, and radiation-protection topics. Want me to answer that instead?" ) try: result = core.ask_question(question, dev_mode) ev = result.get("evaluation", {}) citations = result.get("citations", []) output = [ f"## 🔬 Answer", f"**Provider:** `{result.get('provider', 'N/A')}`", "", f"### {question}", "", result.get("answer", ""), "", "### 📊 Quality", f"- Accuracy: **{ev.get('scientific_accuracy', 0):.1f}%**", f"- Novelty: **{ev.get('novelty_score', 0):.1f}%**", f"- Usefulness: **{ev.get('usefulness_score', 0):.1f}%**", f"- Completeness: **{ev.get('completeness', 0):.1f}%**", ] if citations: output.append("\n### 📚 Citations") for c in citations[:5]: output.append(f"- {c}") return "\n".join(output) except Exception as e: return f"❌ Error: {e}" def verify_chain(): if not core: return "❌ System not ready" is_valid = core.ledger.is_valid() stats = core.ledger.get_stats() return f"""## ⛓️ Blockchain Verification **Status:** {'✅ VALID' if is_valid else '❌ INVALID'} - Chain length: {stats['chain_length']} blocks - NES supply: {stats['nes_supply']:,.1f} - Total transactions: {stats['total_transactions']} - Current difficulty: {stats['difficulty']} - Latest hash: `{stats['latest_hash']}` - Genesis hash: `{stats['genesis_hash']}` - Total mining time: {stats['total_mining_time']} - Blocks mined: {stats['blocks_mined']} """ def search_kg(query, limit=10): if not core: return "❌ System not ready" if not query: return "❌ Enter search query" results = core.kg.search(query, int(limit)) if not results: return f"🔍 No results for: **{query}**" output = [f"## 🔍 Results for: **{query}**\n"] for i, r in enumerate(results, 1): m = r.get("metadata", {}) output.append( f"### {i}. {r.get('question', '')[:80]}...\n" f"**Category:** `{m.get('category', 'N/A')}` | " f"**Accuracy:** `{m.get('accuracy', 0):.0f}` | " f"**Score:** `{r.get('_score', 0):.0f}`\n" ) return "\n".join(output) def export_state(): if not core: return "❌ Not ready" os.makedirs("reports", exist_ok=True) out = { "exported_at": datetime.now().isoformat(), "stats": core.get_stats(), "ledger": core.ledger.get_stats(), "kg": core.kg.get_stats(), "history": core.history[-20:], } fname = f"reports/state_export_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json" with open(fname, "w", encoding="utf-8") as f: json.dump(out, f, ensure_ascii=False, indent=4) return f"✅ Exported to `{fname}`" # ═══════════════════════════════════════════════════════════════════ # GRADIO UI # ═══════════════════════════════════════════════════════════════════ CSS = """ #title { text-align: center; font-size: 2.5rem; font-weight: 800; background: linear-gradient(135deg, #00d4ff, #7c3aed, #00ff88); -webkit-background-clip: text; -webkit-text-fill-color: transparent; } .minted { color: #10b981; font-weight: bold; } .rejected { color: #ef4444; } """ demo = None if gradio_available: with gr.Blocks(title="Nuclear Intelligence v5.0") as demo: gr.Markdown("# ⚛️ Nuclear Intelligence v5.0", elem_id="title") gr.Markdown( "**Autonomous nuclear energy research → multi-layer evaluation → SHA-3 PoW mining → NES token**\n\n" "All research is produced by real LLMs. Evaluation is independent. Mining is real Proof-of-Work." ) with gr.Row(): with gr.Column(scale=1): stats_box = gr.Markdown(get_system_stats) refresh_stats = gr.Button("🔄 Refresh Stats", variant="secondary") export_btn = gr.Button("💾 Export State") export_out = gr.Markdown() with gr.Accordion("🔌 LLM Engine", open=False): llm_status = gr.Markdown(get_llm_status) refresh_llm = gr.Button("📡 Check Providers") with gr.Column(scale=3): with gr.Tabs(): with gr.Tab("🚀 Research Center"): with gr.Row(): run_btn = gr.Button("🚀 Run Research Cycle", variant="primary") discover_btn = gr.Button("🔍 Discover Question", variant="secondary") dev_chk = gr.Checkbox(label="Developer Mode", value=False) sync_chk = gr.Checkbox(label="Sync to HF Dataset", value=True) cycle_out = gr.Markdown("### Click button to start research...") with gr.Accordion("🔍 Manual Q&A", open=False): q_input = gr.Textbox( label="Ask a nuclear question", placeholder="e.g. How do molten salt reactors handle tritium breeding?", ) q_btn = gr.Button("Search & Answer") q_out = gr.Markdown() with gr.Tab("⛓️ Blockchain"): verify_btn = gr.Button("⛓️ Verify Ledger Integrity") verify_out = gr.Markdown() gr.Markdown("### Latest Transactions") chain_table = gr.DataFrame(get_chain_df) with gr.Tab("🕸️ Knowledge Graph"): search_input = gr.Textbox(label="Search", placeholder="e.g. fusion, reactor, safety") limit_input = gr.Slider(label="Limit", minimum=1, maximum=50, value=10, step=1) search_btn = gr.Button("Search") search_out = gr.Markdown() gr.Markdown("### Latest Entities") entities_table = gr.DataFrame(get_entities_df) with gr.Tab("📈 Analytics"): with gr.Row(): chart1 = gr.Plot(get_category_chart) chart2 = gr.Plot(get_score_chart) gr.Markdown("### Recent Cycles") history_table = gr.DataFrame(get_history_df) # Event handlers refresh_stats.click(get_system_stats, outputs=stats_box) refresh_llm.click(get_llm_status, outputs=llm_status) run_btn.click(run_cycle, inputs=[dev_chk, sync_chk], outputs=cycle_out) discover_btn.click(discover_question, outputs=cycle_out) q_btn.click(ask_q, inputs=[q_input, dev_chk], outputs=q_out) verify_btn.click(verify_chain, outputs=verify_out) search_btn.click(search_kg, inputs=[search_input, limit_input], outputs=search_out) export_btn.click(export_state, outputs=export_out) # Auto-refresh stats every 30s (gr.Timer was added in 4.40+, guard it) try: timer = gr.Timer(30) timer.tick(get_system_stats, outputs=stats_box) timer.tick(get_chain_df, outputs=chain_table) timer.tick(get_entities_df, outputs=entities_table) timer.tick(get_history_df, outputs=history_table) except (AttributeError, TypeError): # Gradio < 4.40: no Timer; auto-refresh via load-event poll instead. logger.warning("gr.Timer unavailable (Gradio < 4.40) — auto-refresh disabled, use 🔄 buttons.") # ═══════════════════════════════════════════════════════════════════ # ALWAYS-ON: background autonomous loop (thread, headless) # ═══════════════════════════════════════════════════════════════════ _autonomous_thread_started = False def _autonomous_loop(): """Background autonomous research loop. Runs every INTERVAL minutes.""" interval_seconds = int(os.getenv("OPERATION_LOOP_INTERVAL_MINUTES", "25")) * 60 logger.info(f"🤖 Autonomous loop started (interval={interval_seconds}s)") # Initial warm-up delay so Space finishes starting first time.sleep(30) while True: try: logger.info("🤖 Auto-cycle: starting research cycle") res = core.run_cycle(dev_mode=False) if core else {"minted": False, "error": "no core"} if res.get("minted"): logger.info(f"✅ Auto-cycle: MINTED tx={res.get('tx_hash')} overall={res.get('overall')}") # Sync to HF dataset & push to GitHub try: sync_to_hf_dataset(res) except Exception as e: logger.warning(f"HF dataset sync failed: {e}") else: logger.info(f"⏭️ Auto-cycle: rejected (overall={res.get('overall', 0)}) — trying again next interval") except Exception as e: logger.error(f"Auto-cycle error: {e}") time.sleep(interval_seconds) if __name__ == "__main__": if demo is not None: # Always-on autonomous loop in background if os.getenv("AUTO_START_LOOP", "true").lower() == "true" and not _autonomous_thread_started: try: t = threading.Thread(target=_autonomous_loop, daemon=True, name="ni-autonomous") t.start() _autonomous_thread_started = True logger.info("✅ Background autonomous loop launched") except Exception as e: logger.warning(f"Could not start autonomous loop: {e}") try: demo.launch(server_name="0.0.0.0", server_port=PORT, css=CSS) except TypeError: # Gradio 6.0+ deprecated css in launch() too; pass only valid kwargs demo.launch(server_name="0.0.0.0", server_port=PORT) else: # Gradio unavailable — keep the Space alive with a tiny health HTTP server print("⚠️ Gradio not available; cannot launch UI. Starting minimal health server.") try: from http.server import HTTPServer, BaseHTTPRequestHandler import json as _json class _Health(BaseHTTPRequestHandler): def log_message(self, *a, **k): pass def do_GET(self): if self.path == "/" or self.path == "/health": body = _json.dumps({ "status": "ok", "name": "Nuclear Intelligence", "version": "5.0", "gradio_available": False, "core_initialized": core is not None, }).encode() self.send_response(200) self.send_header("Content-Type", "application/json") self.send_header("Content-Length", str(len(body))) self.end_headers() self.wfile.write(body) else: self.send_response(404) self.end_headers() srv = HTTPServer(("0.0.0.0", PORT), _Health) logger.info(f"Health server on :{PORT}") srv.serve_forever() except Exception as e: print(f"Health server failed: {e}")