# Hugging Face Spaces application (scraped Space status banner: "Sleeping")
| import time | |
| import hashlib | |
| import json | |
| import re | |
| from pathlib import Path | |
| from collections import defaultdict | |
| import math | |
| import threading | |
| import queue | |
| import gradio as gr | |
| import networkx as nx | |
| import numpy as np | |
| from scipy import sparse | |
| from scipy.sparse import linalg | |
| import sympy as sp | |
| # --- Dependency Check & Hardware Imports --- | |
| try: | |
| from pypdf import PdfReader | |
| from docx import Document | |
| except ImportError as e: | |
| print(f"[WARNING]: Missing dependencies for file parsing: {e}") | |
| # --- Hardware Acceleration Subsystem --- | |
| try: | |
| import torch | |
| HAS_TORCH = True | |
| except ImportError: | |
| HAS_TORCH = False | |
class HardwareAccelerator:
    """Backend selector for eigenvector-centrality computation.

    Prefers an NVIDIA GPU via torch (when torch is importable AND CUDA is
    available), otherwise falls back to SciPy sparse solvers on the CPU.
    """

    def __init__(self):
        # Default to the CPU path; upgraded below if a CUDA device is usable.
        self.device_name = "CPU (SciPy Optimized)"
        self.device = None
        self.enabled = False
        # Only try to load CUDA if Torch is actually present AND CUDA is available
        if HAS_TORCH and torch.cuda.is_available():
            try:
                self.device = torch.device("cuda")
                self.device_name = f"NVIDIA GPU (CUDA) - {torch.cuda.get_device_name(0)}"
                self.enabled = True
            except Exception:
                # Narrowed from bare except: driver/device probing can fail;
                # stay on the CPU path rather than crash at startup.
                self.enabled = False

    def compute_eigenvector_centrality(self, logic_map, tol=1e-06, max_iter=1000):
        """Calculate eigenvector centrality of a weighted directed graph.

        Args:
            logic_map: dict mapping node -> {neighbor: weight}. Neighbors that
                are not themselves keys of logic_map are ignored.
            tol: convergence tolerance for the iterative solvers.
            max_iter: iteration cap for power iteration / ARPACK.

        Returns:
            dict mapping every key of logic_map to a non-negative float score
            (sum-normalized on the CPU paths). Empty input yields {}.
        """
        if not logic_map:
            return {}
        nodes = list(logic_map.keys())
        node_to_idx = {node: i for i, node in enumerate(nodes)}
        n = len(nodes)
        # Assemble COO triplets for the adjacency matrix.
        row, col, data = [], [], []
        for u, neighbors in logic_map.items():
            u_idx = node_to_idx[u]
            for v, weight in neighbors.items():
                if v in node_to_idx:
                    row.append(u_idx)
                    col.append(node_to_idx[v])
                    data.append(float(weight))
        if not data:
            # Graph has nodes but no (resolvable) edges: all scores are zero.
            return {node: 0.0 for node in nodes}
        # --- PATH A: GPU ACCELERATION (Only if Torch + CUDA active) ---
        if self.enabled:
            try:
                i = torch.LongTensor([row, col]).to(self.device)
                v = torch.FloatTensor(data).to(self.device)
                adj_matrix = torch.sparse_coo_tensor(i, v, (n, n)).to(self.device)
                x = torch.ones((n, 1), device=self.device) / n
                for _ in range(max_iter):
                    x_prev = x.clone()
                    x = torch.sparse.mm(adj_matrix, x)
                    norm = torch.norm(x)
                    if norm == 0:
                        break
                    x = x / norm
                    if torch.norm(x - x_prev) < tol:
                        break
                scores = x.flatten().cpu().numpy().tolist()
                return {nodes[i]: float(scores[i]) for i in range(n)}
            except Exception as e:
                print(f"GPU Math Error: {e}. Switching to SciPy.")
                self.enabled = False
                # Fall through to Path B
        # --- PATH B: SCIPY SPARSE (Fast CPU) ---
        # This is the default path for Spaces without GPU.
        try:
            adj_sparse = sparse.csr_matrix((data, (row, col)), shape=(n, n))
            if n > 5:
                # ARPACK: dominant eigenvector (largest real part).
                eigenvalues, eigenvectors = linalg.eigs(
                    adj_sparse, k=1, which='LR', tol=tol, maxiter=max_iter
                )
                scores = np.abs(eigenvectors.flatten())
            else:
                # ARPACK requires k < n - 1, so tiny graphs use dense power
                # iteration on the REAL adjacency matrix. (The previous
                # fallback built a dummy self-loop graph unrelated to the
                # input and returned meaningless scores.)
                scores = self._dense_power_iteration(adj_sparse.toarray(), n, tol, max_iter)
            total = np.sum(scores)
            if total > 0:
                scores = scores / total
            return {nodes[i]: float(scores[i]) for i in range(n)}
        except Exception:
            # Final safety net: dense power iteration cannot raise for finite
            # input, so no external graph library is needed here.
            dense = np.zeros((n, n))
            for k in range(len(data)):
                dense[row[k], col[k]] = data[k]
            scores = self._dense_power_iteration(dense, n, tol, max_iter)
            total = np.sum(scores)
            if total > 0:
                scores = scores / total
            return {nodes[i]: float(scores[i]) for i in range(n)}

    @staticmethod
    def _dense_power_iteration(matrix, n, tol, max_iter):
        """Return |dominant eigenvector| of a dense (n, n) matrix via power iteration."""
        x = np.ones(n) / n
        for _ in range(max_iter):
            x_next = matrix @ x
            norm = np.linalg.norm(x_next)
            if norm == 0:
                # Nilpotent-style collapse: keep the last non-zero iterate.
                return np.abs(x)
            x_next = x_next / norm
            if np.linalg.norm(x_next - x) < tol:
                x = x_next
                break
            x = x_next
        return np.abs(x)
| # --- Memory Subsystem --- | |
class ProtogenMemory:
    """JSON-file-backed persistence layer for the operative's state.

    Two files live under *protogen_root_path*:
      - memory_core.json ("memory"): identity hash + thresholds.
      - ontology_sqt.json ("ontology"): logic map, symbols, reasoning
        patterns, and cached graph metrics.
    """

    def __init__(self, protogen_root_path: Path):
        self.protogen_root_path = protogen_root_path
        self.protogen_root_path.mkdir(parents=True, exist_ok=True)
        self.paths = {
            "memory": self.protogen_root_path / "memory_core.json",
            "ontology": self.protogen_root_path / "ontology_sqt.json",
        }
        self._initialize_storage()
        self.core_state = self._load_json(self.paths["memory"])
        self.ontology_data = self._load_json(self.paths["ontology"])

    def _initialize_storage(self):
        """Create any missing storage files with empty defaults (idempotent)."""
        defaults = {
            "memory": {},
            "ontology": {
                "logic_map": {}, "symbols": {}, "reasoning_patterns": [],
                "graph_metrics": {"eigenvector_centrality": {}, "shannon_entropy": 0.0},
            }
        }
        for key, path in self.paths.items():
            if not path.exists():
                with open(path, 'w', encoding='utf-8') as f:
                    json.dump(defaults[key], f)

    def _load_json(self, path):
        """Load JSON from *path*; return {} for missing or corrupt files."""
        try:
            with open(path, 'r', encoding='utf-8') as f:
                return json.load(f)
        except (OSError, json.JSONDecodeError):
            # Narrowed from a bare except: only I/O and parse failures are
            # expected here; anything else should surface as a real bug.
            return {}

    def _save_json(self, data, path):
        """Serialize *data* to *path* with pretty-printing."""
        with open(path, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=4)

    def load_core_state(self):
        """Return the in-memory core-state dict (loaded at construction)."""
        return self.core_state

    def save_core_state(self, state):
        """Merge *state* into the core state (keeps unrelated keys) and persist."""
        self.core_state.update(state)
        self._save_json(self.core_state, self.paths["memory"])

    def load_ontology(self):
        """Return the in-memory ontology dict (loaded at construction)."""
        return self.ontology_data

    def save_ontology(self, ontology):
        """Replace the ontology wholesale and persist it."""
        self.ontology_data = ontology
        self._save_json(self.ontology_data, self.paths["ontology"])
| # --- Main Operative Class --- | |
class OperativeProtogen:
    """Top-level operative: owns the bigram logic graph, persistence, the
    hardware accelerator, and a background loop that recomputes graph metrics.

    NOTE(review): the constructor starts a daemon thread, so merely
    instantiating this class causes periodic disk writes (every ~30 s).
    """

    def __init__(self, root_dir="protogen_core"):
        self.root = Path(root_dir)
        self.library_path = self.root / "library"
        self.library_path.mkdir(parents=True, exist_ok=True)
        self.accelerator = HardwareAccelerator()
        self.memory_manager = ProtogenMemory(self.root)
        self.core_state = self.memory_manager.load_core_state()
        # First run (no persisted state): mint an identity + default thresholds.
        if not self.core_state:
            self._initial_genesis()
            self.core_state = self.memory_manager.load_core_state()
        self.identity_hash = self.core_state.get("identity", {}).get("hash", "UNKNOWN")
        # Tunables; the same defaults are also written by _initial_genesis.
        self.thresholds = self.core_state.get("thresholds", {
            "min_token_len": 3, "reflection_trigger": 2, "shannon_entropy_threshold": 16.0,
            "eigenvector_threshold": 0.001, "axiom_alignment_threshold": 0.5
        })
        self.ontology = self.memory_manager.load_ontology()
        # logic_map: word -> {next_word: co-occurrence count} (directed bigram graph).
        self.logic_map = self.ontology.get("logic_map", {})
        self.symbols = self.ontology.get("symbols", {})
        self.reasoning_patterns = self.ontology.get("reasoning_patterns", [])
        self.graph_metrics = self.ontology.get("graph_metrics", {"eigenvector_centrality": {}, "shannon_entropy": 0.0})
        # node -> sympy.Symbol for high-centrality nodes; rebuilt by sync(),
        # never persisted (starts empty on every launch).
        self.symbolic_anchors = {}
        # Non-reentrant lock guarding logic_map / graph_metrics mutations.
        self.lock = threading.Lock()
        self.is_syncing = threading.Event()
        self.sync_thread = threading.Thread(target=self._autonomic_sync_loop, daemon=True)
        self.sync_thread.start()

    def _initial_genesis(self):
        """Create and persist a first-run identity (SHA-256 of the current
        nanosecond clock) together with the default thresholds."""
        genesis_state = {
            "identity": {"hash": hashlib.sha256(str(time.time_ns()).encode()).hexdigest()},
            "thresholds": {
                "min_token_len": 3, "reflection_trigger": 2, "shannon_entropy_threshold": 16.0,
                "eigenvector_threshold": 0.001, "axiom_alignment_threshold": 0.5
            }
        }
        self.memory_manager.save_core_state(genesis_state)

    def _save_memory(self):
        """Flush the in-memory ontology views back to disk (lock-guarded)."""
        with self.lock:
            self.ontology.update({
                "logic_map": self.logic_map, "symbols": self.symbols,
                "reasoning_patterns": self.reasoning_patterns,
                "graph_metrics": self.graph_metrics
            })
            self.memory_manager.save_ontology(self.ontology)

    def _calculate_shannon_entropy(self, text=None) -> float:
        """Shannon entropy (bits) of a word-frequency distribution.

        With *text*: entropy of the words in the string (punctuation stripped,
        lowercased). Without *text*: entropy over logic_map nodes, each node
        weighted by its total outgoing edge weight (takes self.lock — callers
        must NOT already hold it, the lock is non-reentrant).
        """
        if text:
            clean_text = re.sub(r'[^\w\s]', '', text.lower())
            words = clean_text.split()
            if not words: return 0.0
            word_counts = defaultdict(int)
            for w in words: word_counts[w] += 1
            total = len(words)
        else:
            with self.lock:
                if not self.logic_map: return 0.0
                word_counts = defaultdict(int)
                for w, n in self.logic_map.items():
                    word_counts[w] += sum(n.values())
                total = sum(word_counts.values())
        if total == 0: return 0.0
        entropy = 0.0
        for count in word_counts.values():
            p = count / total
            entropy -= p * math.log2(p)
        return entropy

    def _autonomic_sync_loop(self):
        """Daemon-thread body: resync metrics forever, pausing 30 s between runs."""
        while True:
            self.is_syncing.set()
            self.sync()
            self.is_syncing.clear()
            time.sleep(30)

    def sync(self):
        """Recompute graph metrics, refresh symbolic anchors, and persist.

        NOTE(review): logic_map is read by the centrality computation WITHOUT
        holding self.lock, so a concurrent _process_text_content may mutate it
        mid-computation — confirm whether this race is acceptable.
        """
        # 1. Update Graph Metrics
        centrality = self.accelerator.compute_eigenvector_centrality(self.logic_map)
        with self.lock:
            self.graph_metrics["eigenvector_centrality"] = centrality
        # Entropy call is outside the lock on purpose: it acquires self.lock
        # itself and the lock is non-reentrant (would deadlock otherwise).
        self.graph_metrics["shannon_entropy"] = self._calculate_shannon_entropy()
        # 2. SYMPY Integration
        for node, score in centrality.items():
            if score > self.thresholds.get("axiom_alignment_threshold", 0.5):
                if node not in self.symbolic_anchors:
                    # Create valid sympy symbol from node string
                    clean_sym = re.sub(r'[^a-zA-Z0-9]', '_', node)
                    if clean_sym:
                        self.symbolic_anchors[node] = sp.Symbol(clean_sym)
        self._save_memory()

    def process_file_live(self, file_obj):
        """Ingest one uploaded file (.txt / .pdf / .docx) into the logic graph.

        Returns a human-readable status string and never raises: all errors
        are reported through the return value so the UI can display them.
        """
        # Gradio passes a NamedString or similar object, use .name for path
        try:
            fp = Path(file_obj.name)
            content = ""
            if fp.suffix == ".txt": content = fp.read_text(encoding='utf-8', errors='ignore')
            elif fp.suffix == ".pdf":
                r = PdfReader(fp)
                for p in r.pages: content += p.extract_text() + " "
            elif fp.suffix == ".docx":
                d = Document(fp)
                for p in d.paragraphs: content += p.text + " "
            if content:
                # Entropy gate: reject inputs with an overly chaotic word mix.
                input_entropy = self._calculate_shannon_entropy(content)
                if input_entropy > self.thresholds.get("shannon_entropy_threshold", 16.0):
                    return f"Error: High Entropy ({input_entropy:.2f})."
                self._process_text_content(content)
                return f"Success: Processed {fp.name}."
            return "Error: Empty file."
        except Exception as e:
            return f"Error: {str(e)}"

    def _process_text_content(self, content):
        """Add directed bigram edges (word_i -> word_{i+1}) to the logic map.

        Tokens of length <= min_token_len are dropped BEFORE pairing, so edges
        connect consecutive surviving tokens.
        """
        clean_content = re.sub(r'[^\w\s]', '', content.lower())
        words = [t for t in clean_content.split() if len(t) > self.thresholds.get("min_token_len", 3)]
        if not words: return
        with self.lock:
            for i in range(len(words)-1):
                w1, w2 = words[i], words[i+1]
                if w1 not in self.logic_map: self.logic_map[w1] = {}
                self.logic_map[w1][w2] = self.logic_map[w1].get(w2, 0) + 1

    def chat(self, user_in):
        """Handle one chat turn: entropy-gate the input, learn from it, then
        reply with any recognized symbolic anchor and the strongest logic link."""
        ent = self._calculate_shannon_entropy(user_in)
        if ent > self.thresholds.get("shannon_entropy_threshold", 16.0):
            return f"[SYSTEM]: Entropy rejection ({ent:.2f})."
        self._process_text_content(user_in)
        resp = f"[{self.accelerator.device_name}]: "
        with self.lock:
            # Note: tokens here are NOT length-filtered, unlike ingestion.
            clean_in = re.sub(r'[^\w\s]', '', user_in.lower()).split()
            matches = [w for w in clean_in if w in self.symbolic_anchors]
            if matches:
                resp += f"Symbolic Anchor: '{matches[0]}'. "
            words = [w for w in clean_in if w in self.logic_map]
            if words:
                associations = self.logic_map[words[0]]
                if associations:
                    # Strongest outgoing edge from the first recognized word.
                    best = max(associations.items(), key=lambda x: x[1])[0]
                    resp += f"Logic link: '{words[0]}' -> '{best}'. "
        return resp + "Input integrated."
# --- Gradio Interface ---
# Module-level singleton: constructing OperativeProtogen also starts its
# background sync daemon, so syncing begins as soon as this module is imported.
protogen = OperativeProtogen()
def protogen_chat(message, history):
    # ChatInterface callback; `history` is supplied by Gradio but unused here.
    return protogen.chat(message)
def handle_file_upload(files):
    """Ingest every uploaded file and return one status line per file."""
    if not files:
        # No selection: empty status string (same as joining zero results).
        return ""
    statuses = [protogen.process_file_live(uploaded) for uploaded in files]
    return "\n".join(statuses)
def get_stats():
    """Snapshot system metrics as a JSON-serializable dict for the telemetry panel.

    Returns:
        dict with identity prefix, node/anchor counts, entropy, and the math
        engine name — or a placeholder dict while `protogen` is initializing.
    """
    try:
        ent = protogen.graph_metrics.get('shannon_entropy', 0.0)
        return {
            "Identity": protogen.identity_hash[:8],
            "Nodes": len(protogen.logic_map),
            "Symbolic Anchors": len(protogen.symbolic_anchors),
            "Entropy": f"{ent:.2f}",
            "Math Engine": protogen.accelerator.device_name
        }
    except Exception:
        # Narrowed from a bare except; still deliberately broad because this
        # UI-boundary helper must never raise into Gradio.
        return {"Status": "Initializing..."}
# Declarative Gradio layout: chat panel (left) + ingestion/telemetry (right).
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    gr.Markdown("# PROTOGEN V4.0.9 - Scientific Operative Interface")
    with gr.Row():
        with gr.Column(scale=3):
            # Main chat panel wired to the module-level singleton.
            gr.ChatInterface(fn=protogen_chat, title="Architect Link")
        with gr.Column(scale=1):
            gr.Markdown("### Live Ingestion")
            file_output = gr.Textbox(label="Status", interactive=False)
            upload_btn = gr.File(label="Upload", file_count="multiple")
            # Each upload batch is processed immediately; statuses shown above.
            upload_btn.upload(fn=handle_file_upload, inputs=upload_btn, outputs=file_output)
            gr.Markdown("### Math Telemetry")
            # Passing the callable (not its result) lets Gradio re-evaluate
            # the stats when the UI loads.
            stats_display = gr.JSON(value=get_stats, label="System Metrics")
            refresh_btn = gr.Button("Refresh Telemetry")
            refresh_btn.click(fn=get_stats, outputs=stats_display)
if __name__ == "__main__":
    demo.launch()