import time
import hashlib
import json
import re
from pathlib import Path
from collections import defaultdict
import math
import threading
import queue

import gradio as gr
import networkx as nx
import numpy as np
from scipy import sparse
from scipy.sparse import linalg
import sympy as sp

# --- Dependency Check & Hardware Imports ---
try:
    from pypdf import PdfReader
    from docx import Document
except ImportError as e:
    print(f"[WARNING]: Missing dependencies for file parsing: {e}")

# --- Hardware Acceleration Subsystem ---
try:
    import torch
    HAS_TORCH = True
except ImportError:
    HAS_TORCH = False


class HardwareAccelerator:
    """Picks the fastest available math backend for centrality computation.

    Prefers CUDA via torch when present; otherwise falls back to SciPy
    sparse routines on the CPU, with NetworkX as a final safety net.
    """

    def __init__(self):
        self.device_name = "CPU (SciPy Optimized)"
        self.device = None
        self.enabled = False
        # Only try to load CUDA if Torch is actually present AND CUDA is available
        if HAS_TORCH and torch.cuda.is_available():
            try:
                self.device = torch.device("cuda")
                self.device_name = f"NVIDIA GPU (CUDA) - {torch.cuda.get_device_name(0)}"
                self.enabled = True
            except Exception:
                # Narrowed from a bare except: any CUDA probe failure simply
                # disables the GPU path rather than crashing startup.
                self.enabled = False

    def compute_eigenvector_centrality(self, logic_map, tol=1e-06, max_iter=1000):
        """Calculate eigenvector centrality for a weighted directed graph.

        ``logic_map`` is ``{node: {neighbor: weight}}``. Returns
        ``{node: score}``. Defaults to SciPy (CPU fast path) if GPU is
        unavailable; falls back to NetworkX if the sparse solver fails.
        """
        if not logic_map:
            return {}
        nodes = list(logic_map.keys())
        node_to_idx = {node: i for i, node in enumerate(nodes)}
        n = len(nodes)

        # Build COO triplets for the adjacency matrix. Edges pointing at
        # nodes that are not themselves keys of logic_map are dropped.
        row, col, data = [], [], []
        for u, neighbors in logic_map.items():
            u_idx = node_to_idx[u]
            for v, weight in neighbors.items():
                if v in node_to_idx:
                    row.append(u_idx)
                    col.append(node_to_idx[v])
                    data.append(float(weight))
        if not data:
            return {node: 0.0 for node in nodes}

        # --- PATH A: GPU ACCELERATION (Only if Torch + CUDA active) ---
        if self.enabled:
            try:
                idx = torch.LongTensor([row, col]).to(self.device)
                val = torch.FloatTensor(data).to(self.device)
                adj_matrix = torch.sparse_coo_tensor(idx, val, (n, n)).to(self.device)
                # Power iteration: x_{k+1} = normalize(A @ x_k).
                x = torch.ones((n, 1), device=self.device) / n
                for _ in range(max_iter):
                    x_prev = x.clone()
                    x = torch.sparse.mm(adj_matrix, x)
                    norm = torch.norm(x)
                    if norm == 0:
                        break
                    x = x / norm
                    if torch.norm(x - x_prev) < tol:
                        break
                scores = x.flatten().cpu().numpy().tolist()
                return {nodes[i]: float(scores[i]) for i in range(n)}
            except Exception as e:
                print(f"GPU Math Error: {e}. Switching to SciPy.")
                self.enabled = False  # Fall through to Path B

        # --- PATH B: SCIPY SPARSE (Fast CPU) ---
        # This is the default path for Spaces without GPU
        try:
            adj_sparse = sparse.csr_matrix((data, (row, col)), shape=(n, n))
            if n > 5:
                # ARPACK: one eigenpair with the largest real part.
                eigenvalues, eigenvectors = linalg.eigs(
                    adj_sparse, k=1, which='LR', tol=tol, maxiter=max_iter
                )
                scores = np.abs(eigenvectors.flatten())
            else:
                # FIX: the previous "dummy fallback" built a self-loop-only
                # graph via zip(nodes, nodes) and returned meaningless values.
                # For tiny matrices (where ARPACK may complain) a dense power
                # iteration on the real adjacency is cheap and correct.
                dense = adj_sparse.toarray()
                vec = np.full(n, 1.0 / n)
                for _ in range(max_iter):
                    prev = vec
                    vec = dense @ vec
                    nrm = np.linalg.norm(vec)
                    if nrm == 0:
                        break
                    vec = vec / nrm
                    if np.linalg.norm(vec - prev) < tol:
                        break
                scores = np.abs(vec)
            # Normalize scores to sum to 1 so thresholds are scale-free.
            norm = np.sum(scores)
            if norm > 0:
                scores = scores / norm
            return {nodes[i]: float(scores[i]) for i in range(n)}
        except Exception:
            # Final Safety Net: NetworkX (undirected, as before).
            G = nx.Graph()
            for i in range(len(data)):
                G.add_edge(nodes[row[i]], nodes[col[i]], weight=data[i])
            try:
                return nx.eigenvector_centrality(G, max_iter=max_iter, tol=tol)
            except Exception:
                # Eigenvector centrality may fail to converge; degree
                # centrality is always defined.
                return nx.degree_centrality(G)


# --- Memory Subsystem ---
class ProtogenMemory:
    """JSON-file persistence for the core state and the ontology graph."""

    def __init__(self, protogen_root_path: Path):
        self.protogen_root_path = protogen_root_path
        self.protogen_root_path.mkdir(parents=True, exist_ok=True)
        self.paths = {
            "memory": self.protogen_root_path / "memory_core.json",
            "ontology": self.protogen_root_path / "ontology_sqt.json",
        }
        self._initialize_storage()
        self.core_state = self._load_json(self.paths["memory"])
        self.ontology_data = self._load_json(self.paths["ontology"])

    def _initialize_storage(self):
        """Create the backing JSON files with defaults on first run."""
        defaults = {
            "memory": {},
            "ontology": {
                "logic_map": {},
                "symbols": {},
                "reasoning_patterns": [],
                "graph_metrics": {"eigenvector_centrality": {}, "shannon_entropy": 0.0},
            },
        }
        for key, path in self.paths.items():
            if not path.exists():
                with open(path, 'w', encoding='utf-8') as f:
                    json.dump(defaults[key], f)

    def _load_json(self, path):
        """Load a JSON file, returning {} on a missing or corrupt file."""
        try:
            with open(path, 'r', encoding='utf-8') as f:
                return json.load(f)
        except (OSError, json.JSONDecodeError):
            # Narrowed from a bare except; same fallback behavior.
            return {}

    def _save_json(self, data, path):
        with open(path, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=4)

    def load_core_state(self):
        return self.core_state

    def save_core_state(self, state):
        # Merge-then-persist: existing keys not in `state` are kept.
        self.core_state.update(state)
        self._save_json(self.core_state, self.paths["memory"])

    def load_ontology(self):
        return self.ontology_data

    def save_ontology(self, ontology):
        self.ontology_data = ontology
        self._save_json(self.ontology_data, self.paths["ontology"])


# --- Main Operative Class ---
class OperativeProtogen:
    """Core engine: ingests text into a word-adjacency 'logic map', tracks
    graph metrics, and answers chat queries from the learned associations.

    A daemon thread resyncs metrics every 30 seconds.
    """

    def __init__(self, root_dir="protogen_core"):
        self.root = Path(root_dir)
        self.library_path = self.root / "library"
        self.library_path.mkdir(parents=True, exist_ok=True)
        self.accelerator = HardwareAccelerator()
        self.memory_manager = ProtogenMemory(self.root)
        self.core_state = self.memory_manager.load_core_state()
        if not self.core_state:
            self._initial_genesis()
            self.core_state = self.memory_manager.load_core_state()
        self.identity_hash = self.core_state.get("identity", {}).get("hash", "UNKNOWN")
        self.thresholds = self.core_state.get("thresholds", {
            "min_token_len": 3,
            "reflection_trigger": 2,
            "shannon_entropy_threshold": 16.0,
            "eigenvector_threshold": 0.001,
            "axiom_alignment_threshold": 0.5,
        })
        self.ontology = self.memory_manager.load_ontology()
        self.logic_map = self.ontology.get("logic_map", {})
        self.symbols = self.ontology.get("symbols", {})
        self.reasoning_patterns = self.ontology.get("reasoning_patterns", [])
        self.graph_metrics = self.ontology.get(
            "graph_metrics", {"eigenvector_centrality": {}, "shannon_entropy": 0.0}
        )
        self.symbolic_anchors = {}
        # Guards logic_map / graph_metrics shared between the Gradio request
        # threads and the autonomic sync thread.
        self.lock = threading.Lock()
        self.is_syncing = threading.Event()
        self.sync_thread = threading.Thread(target=self._autonomic_sync_loop, daemon=True)
        self.sync_thread.start()

    def _initial_genesis(self):
        """Persist a fresh identity hash and default thresholds (first run)."""
        genesis_state = {
            "identity": {"hash": hashlib.sha256(str(time.time_ns()).encode()).hexdigest()},
            "thresholds": {
                "min_token_len": 3,
                "reflection_trigger": 2,
                "shannon_entropy_threshold": 16.0,
                "eigenvector_threshold": 0.001,
                "axiom_alignment_threshold": 0.5,
            },
        }
        self.memory_manager.save_core_state(genesis_state)

    def _save_memory(self):
        """Flush the in-memory ontology back to disk (thread-safe)."""
        with self.lock:
            self.ontology.update({
                "logic_map": self.logic_map,
                "symbols": self.symbols,
                "reasoning_patterns": self.reasoning_patterns,
                "graph_metrics": self.graph_metrics,
            })
            self.memory_manager.save_ontology(self.ontology)

    def _calculate_shannon_entropy(self, text=None) -> float:
        """Shannon entropy (bits) of a word distribution.

        With ``text``: entropy of that text's word frequencies.
        Without: entropy of the logic map's outgoing edge-weight totals
        (acquires the lock itself — do NOT call while holding ``self.lock``).
        """
        if text:
            clean_text = re.sub(r'[^\w\s]', '', text.lower())
            words = clean_text.split()
            if not words:
                return 0.0
            word_counts = defaultdict(int)
            for w in words:
                word_counts[w] += 1
            total = len(words)
        else:
            with self.lock:
                if not self.logic_map:
                    return 0.0
                word_counts = defaultdict(int)
                for w, n in self.logic_map.items():
                    word_counts[w] += sum(n.values())
                total = sum(word_counts.values())
        if total == 0:
            return 0.0
        entropy = 0.0
        for count in word_counts.values():
            p = count / total
            entropy -= p * math.log2(p)
        return entropy

    def _autonomic_sync_loop(self):
        """Daemon loop: resync metrics every 30 seconds, forever."""
        while True:
            self.is_syncing.set()
            self.sync()
            self.is_syncing.clear()
            time.sleep(30)

    def sync(self):
        """Recompute graph metrics and promote central nodes to sympy anchors."""
        # 1. Update Graph Metrics
        # FIX: snapshot the logic map under the lock — the ingest path mutates
        # it concurrently, and iterating a dict while it grows raises.
        with self.lock:
            snapshot = {u: dict(nbrs) for u, nbrs in self.logic_map.items()}
        centrality = self.accelerator.compute_eigenvector_centrality(snapshot)
        # FIX: compute entropy BEFORE taking the lock — the helper acquires
        # self.lock itself and threading.Lock is not reentrant (deadlock risk).
        entropy = self._calculate_shannon_entropy()
        with self.lock:
            self.graph_metrics["eigenvector_centrality"] = centrality
            self.graph_metrics["shannon_entropy"] = entropy
        # 2. SYMPY Integration
        for node, score in centrality.items():
            if score > self.thresholds.get("axiom_alignment_threshold", 0.5):
                if node not in self.symbolic_anchors:
                    # Create valid sympy symbol from node string
                    clean_sym = re.sub(r'[^a-zA-Z0-9]', '_', node)
                    if clean_sym:
                        self.symbolic_anchors[node] = sp.Symbol(clean_sym)
        self._save_memory()

    def process_file_live(self, file_obj):
        """Ingest an uploaded .txt/.pdf/.docx file; returns a status string."""
        # Gradio passes a NamedString or similar object, use .name for path
        try:
            fp = Path(file_obj.name)
            content = ""
            if fp.suffix == ".txt":
                content = fp.read_text(encoding='utf-8', errors='ignore')
            elif fp.suffix == ".pdf":
                r = PdfReader(fp)
                for p in r.pages:
                    # FIX: extract_text() may return None (e.g. image-only
                    # pages); concatenating None would raise TypeError.
                    content += (p.extract_text() or "") + " "
            elif fp.suffix == ".docx":
                d = Document(fp)
                for p in d.paragraphs:
                    content += p.text + " "
            if content:
                input_entropy = self._calculate_shannon_entropy(content)
                if input_entropy > self.thresholds.get("shannon_entropy_threshold", 16.0):
                    return f"Error: High Entropy ({input_entropy:.2f})."
                self._process_text_content(content)
                return f"Success: Processed {fp.name}."
            return "Error: Empty file."
        except Exception as e:
            return f"Error: {str(e)}"

    def _process_text_content(self, content):
        """Tokenize ``content`` and record adjacent-word bigrams as weighted
        edges in the logic map. Tokens at or below min_token_len are dropped."""
        clean_content = re.sub(r'[^\w\s]', '', content.lower())
        words = [t for t in clean_content.split()
                 if len(t) > self.thresholds.get("min_token_len", 3)]
        if not words:
            return
        with self.lock:
            for i in range(len(words) - 1):
                w1, w2 = words[i], words[i + 1]
                if w1 not in self.logic_map:
                    self.logic_map[w1] = {}
                self.logic_map[w1][w2] = self.logic_map[w1].get(w2, 0) + 1

    def chat(self, user_in):
        """Handle one chat turn: reject high-entropy input, ingest the rest,
        and reply with any symbolic anchor / strongest logic-map association."""
        ent = self._calculate_shannon_entropy(user_in)
        if ent > self.thresholds.get("shannon_entropy_threshold", 16.0):
            return f"[SYSTEM]: Entropy rejection ({ent:.2f})."
        self._process_text_content(user_in)
        resp = f"[{self.accelerator.device_name}]: "
        with self.lock:
            clean_in = re.sub(r'[^\w\s]', '', user_in.lower()).split()
            matches = [w for w in clean_in if w in self.symbolic_anchors]
            if matches:
                resp += f"Symbolic Anchor: '{matches[0]}'. "
            words = [w for w in clean_in if w in self.logic_map]
            if words:
                associations = self.logic_map[words[0]]
                if associations:
                    # Strongest (highest-weight) outgoing edge wins.
                    best = max(associations.items(), key=lambda x: x[1])[0]
                    resp += f"Logic link: '{words[0]}' -> '{best}'. "
        return resp + "Input integrated."


# --- Gradio Interface ---
protogen = OperativeProtogen()


def protogen_chat(message, history):
    return protogen.chat(message)


def handle_file_upload(files):
    results = []
    if files:
        for file in files:
            status = protogen.process_file_live(file)
            results.append(status)
    return "\n".join(results)


def get_stats():
    # Helper to safely serialize stats
    try:
        ent = protogen.graph_metrics.get('shannon_entropy', 0.0)
        return {
            "Identity": protogen.identity_hash[:8],
            "Nodes": len(protogen.logic_map),
            "Symbolic Anchors": len(protogen.symbolic_anchors),
            "Entropy": f"{ent:.2f}",
            "Math Engine": protogen.accelerator.device_name,
        }
    except Exception:
        # Narrowed from a bare except; shown while the engine boots.
        return {"Status": "Initializing..."}


with gr.Blocks(theme=gr.themes.Soft()) as demo:
    gr.Markdown("# PROTOGEN V4.0.9 - Scientific Operative Interface")
    with gr.Row():
        with gr.Column(scale=3):
            gr.ChatInterface(fn=protogen_chat, title="Architect Link")
        with gr.Column(scale=1):
            gr.Markdown("### Live Ingestion")
            file_output = gr.Textbox(label="Status", interactive=False)
            upload_btn = gr.File(label="Upload", file_count="multiple")
            upload_btn.upload(fn=handle_file_upload, inputs=upload_btn, outputs=file_output)
            gr.Markdown("### Math Telemetry")
            stats_display = gr.JSON(value=get_stats, label="System Metrics")
            refresh_btn = gr.Button("Refresh Telemetry")
            refresh_btn.click(fn=get_stats, outputs=stats_display)

if __name__ == "__main__":
    demo.launch()