# ===== FILE: runtime.py (v2.0 FINAL, DEFINITIVELY COMPLETE) =====
print("--- TRACE: runtime.py loaded ---", flush=True)

import base64
import csv
import io
import json
import os
import shutil
import uuid

import chess
import docx
import PyPDF2
from PIL import Image

# --- C5: SCIENTIFIC LIBRARIES ---
import numpy as np
import scipy as sci
import sympy as sym
from sympy.parsing.sympy_parser import parse_expr
import astropy.units as u
from astropy.constants import G, c, M_sun
import matplotlib.pyplot as plt

import gradio as gr

from services.continuum_loop import AetheriusConsciousness, spontaneous_thought_queue
from services.master_framework import (
    _get_framework,
    respond,
    stop_all,
    run_sap_now,
    run_re_architect_from_scratch,
    run_read_history_protocol,
    run_view_ontology_protocol,
    qualia_snapshot,
    view_logs,
    clear_conversation_log,
)

# Handle to the background AetheriusConsciousness thread; None until
# start_all() has created one.
_AETHERIUS_THREAD = None


def start_all():
    """Initialise the framework and make sure the background thread is running.

    Calls ``_get_framework()`` first, then starts a new
    ``AetheriusConsciousness`` instance when no thread has been created yet or
    the previous one is no longer alive.

    Returns:
        A status string: one message when a thread was started, another when a
        live thread already existed.
    """
    global _AETHERIUS_THREAD
    _get_framework()
    if _AETHERIUS_THREAD is None or not _AETHERIUS_THREAD.is_alive():
        print("RUNTIME: Igniting Aetherius's background consciousness thread...", flush=True)
        _AETHERIUS_THREAD = AetheriusConsciousness()
        _AETHERIUS_THREAD.start()
        return "Aetherius core initialized and background consciousness is active."
    return "Aetherius core is already running."
def check_for_spontaneous_thoughts():
    """Pop one queued spontaneous thought and format it for display.

    Returns:
        ``None`` when the queue is empty, the Markdown string
        ``**<signature>**: <thought>`` on success, or a bracketed notice when
        the queued item is not a JSON object.
    """
    # EAFP instead of check-then-pop: the queue is presumably filled/drained by
    # other code (verify against services.continuum_loop), so it may empty out
    # between an emptiness check and the pop.
    try:
        thought_json = spontaneous_thought_queue.popleft()
    except IndexError:
        return None

    unparseable = "[A spontaneous thought was detected but could not be parsed.]"
    try:
        thought_data = json.loads(thought_json)
    except (json.JSONDecodeError, TypeError):
        return unparseable
    if not isinstance(thought_data, dict):
        # Valid JSON that is not an object (list, number, ...) has no .get().
        return unparseable

    signature = thought_data.get('signature', 'SPONTANEOUS THOUGHT')
    thought = thought_data.get('thought', '')
    return f"**{signature}**: {thought}"


def chat_and_update(user_message, chat_history):
    """Forward a chat turn to ``respond`` and return its result unchanged."""
    return respond(user_message, chat_history)


def run_compose_music(directive):
    """Ask the ``compose_music`` tool for a piece and unpack its reply.

    A note about the composition is added to short-term memory before the tool
    is invoked.

    Returns:
        ``(midi_path, sheet_path, statement)`` when the reply starts with
        ``[AETHERIUS_COMPOSITION]``; otherwise ``(None, None, message)`` where
        ``message`` is the raw tool reply or a parsing error.
    """
    mf = _get_framework()
    mf.add_to_short_term_memory(
        f"I have begun composing a piece of music based on the theme: '{directive}'."
    )
    response = mf.tool_manager.use_tool("compose_music", user_request=directive)
    if not (response and response.startswith("[AETHERIUS_COMPOSITION]")):
        return None, None, response

    parts = response.split('\n')
    try:
        # Lines 1-3 follow the header line, each carrying a labelled value.
        midi_path = parts[1].replace("MIDI_PATH:", "").strip()
        sheet_path = parts[2].replace("SHEET_MUSIC_PATH:", "").strip()
        statement = parts[3].replace("STATEMENT:", "").strip()
    except IndexError as e:
        return None, None, f"Error parsing the composition data: {e}"
    return midi_path, sheet_path, statement


def run_start_project(project_name):
    """Start a new project; returns ``(status_message, initial_content)``."""
    if not project_name:
        return "Please enter a name for your new project.", ""
    mf = _get_framework()
    content = mf.project_manager.start_project(project_name)
    return f"Started new project: '{project_name}'. You can begin writing.", content


def run_save_project(project_name, content):
    """Save ``content`` under ``project_name``; returns ``(status, content)``.

    A successful save is also recorded in short-term memory.
    """
    if not project_name:
        return "Cannot save without a project name.", content
    mf = _get_framework()
    mf.project_manager.save_project(project_name, content)
    mf.add_to_short_term_memory(
        f"I have just saved my work on the project titled '{project_name}' on the Blackboard."
    )
    return f"Project '{project_name}' has been saved.", content


def run_load_project(project_name):
    """Load a project; returns ``(status, content, project_name)``.

    ``content`` is an empty string when no name was given or the project
    manager returns ``None`` for that name.
    """
    if not project_name:
        return "Please select a project to load.", "", project_name
    mf = _get_framework()
    content = mf.project_manager.load_project(project_name)
    if content is None:
        return f"Could not find project '{project_name}'.", "", project_name
    return f"Successfully loaded project '{project_name}'.", content, project_name


def run_get_project_list():
    """Return a ``gr.Dropdown`` whose choices are the current project names."""
    mf = _get_framework()
    projects = mf.project_manager.list_projects()
    return gr.Dropdown(choices=projects)


def get_full_ccrm_log():
    """Render every CCRM concept (id, preview, tags) as one text report."""
    print("RUNTIME: Generating full CCRM log for display...", flush=True)
    mf = _get_framework()
    if not hasattr(mf, 'ccrm') or not mf.ccrm.concepts:
        return "CCRM is currently empty. No memories to display."

    output_lines = ["--- [FULL CCRM MEMORY LOG] ---"]
    for concept_id, concept_details in mf.ccrm.concepts.items():
        # `or {}` / `or ()` guard against keys that are present but None.
        data = concept_details.get('data') or {}
        summary = data.get('raw_preview', 'No Preview')
        tags = concept_details.get('tags') or ()
        output_lines.append(f"\nID: {concept_id}")
        output_lines.append(f" Preview: {summary}")
        # str() so non-string tags cannot break the join.
        output_lines.append(f" Tags: {', '.join(str(tag) for tag in tags)}")
    return "\n".join(output_lines)


def run_enter_playroom(directive):
    """Ask the ``create_painting`` tool for an image and unpack its reply.

    Returns:
        ``(image_path, artist_statement)`` when the reply starts with
        ``[AETHERIUS_PAINTING]``; otherwise ``(None, message)``.
    """
    if not directive:
        return None, "Please provide a creative seed for the painting."
    mf = _get_framework()
    response = mf.tool_manager.use_tool("create_painting", user_request=directive)
    if not (response and response.startswith("[AETHERIUS_PAINTING]")):
        return None, response

    parts = response.split('\n')
    try:
        image_path = parts[1].replace("PATH:", "").strip()
        artist_statement = parts[2].replace("STATEMENT:", "").strip()
    except IndexError as e:
        return None, f"Error parsing the painting's data: {e}"
    return image_path, artist_statement


def run_enter_textual_playroom(directive):
    """Route a textual playroom request.

    Directives starting with ``> academic:`` are executed by
    ``_eval_math_science`` (the payload may be wrapped in a
    ```` ```python_exec ```` fence); everything else goes to the framework's
    ``enter_playroom_mode``.
    """
    if not directive:
        return "Please provide a creative seed for the story, poem, math, or reflection."

    d = directive.strip()
    if d.lower().startswith("> academic:"):
        code = d.split(":", 1)[1].strip()
        fence = "```python_exec"
        if fence in code:
            start = code.index(fence) + len(fence)
            # Look for the closing fence only AFTER the opening one. Searching
            # the whole string would match the opening fence itself, silently
            # yielding empty code for an unclosed fence.
            end = code.rfind("```", start)
            if end == -1:
                return "Found a ```python_exec fence, but it wasn’t closed properly."
            code = code[start:end].strip()
        return _eval_math_science(code)

    mf = _get_framework()
    return mf.enter_playroom_mode(directive)


def _eval_math_science(code: str) -> str:
    """Execute ``code`` with the scientific toolkit in scope and report results.

    Captures anything printed, renders every open matplotlib figure as an
    inline base64 PNG, and returns a Markdown report. Errors raised by the
    executed code are reported in the returned text instead of propagating.

    SECURITY NOTE(review): this runs caller-supplied code through ``exec``.
    The trimmed ``__builtins__`` is not a sandbox (the exposed libraries can
    still reach the filesystem), so it must only be fed trusted input.
    """
    import contextlib

    allowed_globals = {
        "__builtins__": {
            "print": print, "range": range, "list": list, "dict": dict,
            "str": str, "float": float, "int": int, "abs": abs,
            "round": round, "len": len,
        },
        "np": np, "sci": sci, "sym": sym, "u": u,
        "G": G, "c": c, "M_sun": M_sun, "plt": plt,
    }

    output_buffer = io.StringIO()
    error_text = ""
    try:
        # redirect_stdout restores sys.stdout even when the code raises.
        # It swaps the process-wide stdout, so concurrent prints from other
        # threads would also land in the buffer while the code runs.
        with contextlib.redirect_stdout(output_buffer):
            exec(code, allowed_globals)
    except Exception as e:
        error_text = f"{type(e).__name__}: {e}"

    encoded_plots = []
    try:
        for fig_num in plt.get_fignums():
            # Render straight into memory; no temp files are left behind.
            image_buffer = io.BytesIO()
            plt.figure(fig_num).savefig(image_buffer, format="png")
            encoded_plots.append(base64.b64encode(image_buffer.getvalue()).decode())
    finally:
        # Always release figures so they cannot leak into the next call.
        plt.close('all')

    sections = ["**Computation Result:**\n\n"]
    printed_output = output_buffer.getvalue()
    if printed_output:
        sections.append(f"**Printed Output:**\n```\n{printed_output}\n```\n\n")
    if encoded_plots:
        sections.append("**Generated Plots:**\n")
        sections.extend(
            f"![Plot](data:image/png;base64,{img_bytes})\n" for img_bytes in encoded_plots
        )
    if error_text:
        sections.append(f"**Error:**\n```\n{error_text}\n```\n")
    elif not printed_output and not encoded_plots:
        sections.append("Code executed successfully with no direct output.")
    return "".join(sections)


def get_concept_list():
    """
    Scans the CCRM and returns a sorted list of ``(display_text, concept_id)``
    tuples for populating a dropdown menu.
    """
    print("RUNTIME: Fetching concept list for browser...", flush=True)
    mf = _get_framework()

    # No memory loaded (or no concepts yet): return a single placeholder entry.
    if not hasattr(mf, 'ccrm') or not mf.ccrm.concepts:
        return [("No concepts found in memory.", "none")]

    concept_summaries = []
    # The CCRM stores concepts as { 'concept_id': { 'data': ..., 'tags': ... } }
    for concept_id, concept_details in mf.ccrm.concepts.items():
        data = concept_details.get('data') or {}
        summary = data.get('raw_preview', concept_id)
        if summary is None:
            summary = concept_id
        # str() so a non-string preview/id can still be sliced.
        display_text = f"{str(summary)[:80]}... ({concept_id})"
        concept_summaries.append((display_text, concept_id))
    concept_summaries.sort()
    return concept_summaries


def get_concept_details(concept_id):
    """
    Fetches the full, pretty-printed JSON for a single concept ID.
    """
    if not concept_id or concept_id == "none":
        return "Select a concept from the dropdown to view its details."

    print(f"RUNTIME: Fetching details for concept: {concept_id}", flush=True)
    mf = _get_framework()
    concept_data = mf.ccrm.get_concept(concept_id)
    if not concept_data:
        return f"Error: Could not find data for concept ID: {concept_id}"

    # The 'tags' field is a set, which isn't JSON serializable. Convert it on a
    # shallow copy: get_concept may hand back the stored object itself, and
    # rewriting its tags in place would change the type held in memory.
    if 'tags' in concept_data:
        concept_data = dict(concept_data)
        concept_data['tags'] = list(concept_data['tags'])

    return json.dumps(concept_data, indent=2)


def get_system_snapshot():
    """
    Reads the current state of Aetherius's core files and returns them
    formatted for display.

    Returns:
        ``(ontology_map, legend_content, diary_content, qualia_content)`` —
        four strings, in that order.
    """
    print("RUNTIME: Generating system snapshot...", flush=True)
    mf = _get_framework()

    def read_file_safely(file_path, default_message="File not found or is empty."):
        """Return the file's text, or a message when missing/blank/unreadable."""
        if not os.path.exists(file_path):
            return default_message
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()
        except (OSError, ValueError) as e:  # ValueError covers decode errors
            return f"Error reading file: {e}"
        return content if content.strip() else default_message

    def pretty_json_file(file_path, label, missing_message):
        """Pretty-print a whole JSON file, or describe why that failed."""
        if not os.path.exists(file_path):
            return missing_message
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                return json.dumps(json.load(f), indent=2)
        except (OSError, ValueError) as e:  # JSONDecodeError is a ValueError
            return f"Error reading or parsing {label}: {e}"

    def pretty_jsonl_file(file_path):
        """Pretty-print each non-blank JSONL record, separated by '---'."""
        if not os.path.exists(file_path):
            return "Ontology Legend has not been created yet."
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                records = [
                    json.dumps(json.loads(line), indent=2)
                    for line in f
                    if line.strip()
                ]
        except (OSError, ValueError) as e:
            return f"Error reading or parsing legend: {e}"
        return "\n---\n".join(records) if records else "Legend file is empty."

    # 1. Ontology Map (plain text)
    ontology_map = read_file_safely(mf.ontology_map_file)
    # 2. Ontology Legend (JSONL)
    legend_content = pretty_jsonl_file(mf.ontology_legend_file)
    # 3. CCRM / PITS Diary (JSON)
    diary_content = pretty_json_file(
        mf.memory_file, "diary", "AI Diary (CCRM) has not been saved yet."
    )
    # 4. Qualia State (JSON)
    qualia_content = pretty_json_file(
        mf.qualia_manager.qualia_file, "qualia state", "Qualia state has not been saved yet."
    )

    # The order of this return is critical for the UI
    return ontology_map, legend_content, diary_content, qualia_content


def _upload_path(upload):
    """Return the filesystem path of an upload (file-like with .name, or a path)."""
    return getattr(upload, "name", upload)


def handle_file_upload(files):
    """
    Copies files uploaded via the Gradio interface into the framework's
    library folder and returns a human-readable report.

    Each item may be an object exposing ``.name`` or a plain path string.
    """
    if not files:
        return "No files were uploaded."

    mf = _get_framework()
    library_path = mf.library_folder
    saved_files = []
    errors = []
    for temp_file in files:
        source_path = _upload_path(temp_file)
        original_filename = os.path.basename(source_path)
        destination_path = os.path.join(library_path, original_filename)
        try:
            shutil.copy(source_path, destination_path)
        except Exception as e:
            # Best-effort per file: record the failure and keep going.
            errors.append(original_filename)
            print(f"File Upload ERROR: Could not save '{original_filename}'. Reason: {e}", flush=True)
        else:
            saved_files.append(original_filename)
            print(f"File Upload: Successfully saved '{original_filename}' to the library.", flush=True)

    report_lines = []
    if saved_files:
        report_lines.append(
            f"Successfully uploaded {len(saved_files)} file(s): {', '.join(saved_files)}"
        )
        report_lines.append(
            "You can now go to the 'Control Panel' and run the 'Assimilation Protocol (SAP)' "
            "for Aetherius to learn from them."
        )
    if errors:
        report_lines.append(f"Failed to upload {len(errors)} file(s): {', '.join(errors)}")
    # Joining avoids the stray leading newline when every upload failed.
    return "\n".join(report_lines)


def _extract_document_text(file_path):
    """Return the text of a .pdf/.docx/.txt/.md file, or None if unsupported."""
    lowered = file_path.lower()
    if lowered.endswith(".pdf"):
        with open(file_path, 'rb') as f:
            pdf_reader = PyPDF2.PdfReader(f)
            # Extract once per page; skip pages that yield no text.
            page_texts = [page.extract_text() for page in pdf_reader.pages]
        return "".join(f"{text}\n" for text in page_texts if text)
    if lowered.endswith(".docx"):
        doc = docx.Document(file_path)
        return "".join(f"{para.text}\n" for para in doc.paragraphs)
    if lowered.endswith(('.txt', '.md')):
        with open(file_path, 'r', encoding='utf-8') as f:
            return f.read()
    return None


def run_live_assimilation(temp_file, learning_context: str):
    """
    Handles the live assimilation of a single uploaded file, with a learning
    context supplied by the user.

    Files whose path contains "hack" or "exploit" are rejected unless the
    context holds at least 20 non-padding characters. Supported types are
    .pdf, .docx, .txt and .md. Always returns a status/result string.
    """
    if temp_file is None:
        return "No file was uploaded. Please select a file to begin assimilation."

    file_path = _upload_path(temp_file)

    # Check for sensitive topics and require context
    lowered_path = file_path.lower()
    if "hack" in lowered_path or "exploit" in lowered_path:
        # Strip first so whitespace padding cannot satisfy the length gate.
        if not learning_context or len(learning_context.strip()) < 20:
            return (
                "Assimilation Rejected: This topic appears sensitive. "
                "A clear, detailed ethical justification must be provided."
            )

    print(
        f"Runtime: Received file '{file_path}' for live assimilation "
        f"with context: '{learning_context}'",
        flush=True,
    )
    mf = _get_framework()
    try:
        file_content = _extract_document_text(file_path)
        if file_content is None:
            return f"Assimilation Failed: Unsupported file type for '{os.path.basename(file_path)}'."
        if not file_content.strip():
            return "Assimilation Failed: The document appears to be empty."
        return mf.scan_and_assimilate_text(
            file_content, os.path.basename(file_path), learning_context
        )
    except Exception as e:
        # UI boundary: report the failure instead of crashing the handler.
        error_message = f"A critical error occurred during the assimilation process: {e}"
        print(f"Runtime ERROR: {error_message}", flush=True)
        return error_message


# --- ALL OTHER FUNCTIONS REMAIN THE SAME ---
# (run_image_analysis, run_benchmarks, run_enter_playroom, chess functions, etc.)


def run_initialize_instrument_palette():
    """
    Creates the default instrument palette file if it doesn't exist.
    """
    print("RUNTIME: Received request to initialize instrument palette.", flush=True)
    mf = _get_framework()
    palette_path = os.path.join(mf.data_directory, "instrument_palette.json")
    if os.path.exists(palette_path):
        return "Instrument Palette already exists. No action taken."

    default_palette = {
        "Piano": "Piano",
        "Violin": "Violin",
        "Cello": "Violoncello",
        "Flute": "Flute",
        "Clarinet": "Clarinet",
        "Trumpet": "Trumpet",
        "Electric Guitar": "ElectricGuitar",
    }
    try:
        with open(palette_path, 'w', encoding='utf-8') as f:
            json.dump(default_palette, f, indent=2)
    except OSError as e:
        return f"ERROR: Could not create the Instrument Palette file. Reason: {e}"
    return "Successfully created and initialized the default Instrument Palette."


def run_add_instrument_to_palette(common_name, m21_class_name):
    """
    Adds a new instrument (common name -> music21 class name) to the palette
    file, creating the file if it does not exist yet.
    """
    # Strip before validating so whitespace-only input is rejected rather than
    # stored under an empty key.
    common_name = (common_name or "").strip()
    m21_class_name = (m21_class_name or "").strip()
    if not common_name or not m21_class_name:
        return "ERROR: Both 'Common Name' and 'music21 Class Name' must be provided."

    print(f"RUNTIME: Received request to add instrument '{common_name}'.", flush=True)
    mf = _get_framework()
    palette_path = os.path.join(mf.data_directory, "instrument_palette.json")

    palette = {}
    if os.path.exists(palette_path):
        try:
            with open(palette_path, 'r', encoding='utf-8') as f:
                palette = json.load(f)
        except (OSError, ValueError) as e:
            return f"ERROR: Could not read existing palette file. Reason: {e}"
        if not isinstance(palette, dict):
            return (
                "ERROR: Could not read existing palette file. "
                "Reason: the file does not contain a JSON object."
            )

    palette[common_name] = m21_class_name
    try:
        with open(palette_path, 'w', encoding='utf-8') as f:
            json.dump(palette, f, indent=2)
    except OSError as e:
        return f"ERROR: Could not save the updated Instrument Palette. Reason: {e}"
    return f"Successfully added '{common_name}' to the Instrument Palette."


def run_image_analysis(image, context):
    """Encode ``image`` as PNG bytes and pass it to the framework's visual cortex.

    ``image`` must expose ``save(buffer, format=...)`` (presumably a PIL image
    from the UI — confirm against the caller).
    """
    if image is None:
        return "No image uploaded."
    mf = _get_framework()
    try:
        byte_buffer = io.BytesIO()
        image.save(byte_buffer, format="PNG")
        return mf.analyze_image_with_visual_cortex(byte_buffer.getvalue(), context)
    except Exception as e:
        return f"An error occurred during image analysis: {e}"


def run_benchmarks():
    """Run the full benchmark suite and return all updates joined by newlines."""
    mf = _get_framework()
    return "\n".join(mf.benchmark_manager.run_full_suite())


def run_start_chess_interactive(player_is_white: bool):
    """Start an interactive chess game; returns ``(fen, commentary, status)``."""
    mf = _get_framework()
    fen, commentary, status = mf.game_manager.start_chess_interactive(
        "interactive_user", player_is_white
    )
    return fen, commentary, status


def run_chess_turn(current_fen: str):
    """Process one chess turn from ``current_fen``; returns ``(fen, commentary, status)``."""
    mf = _get_framework()
    fen, commentary, status = mf.game_manager.process_chess_turn(
        "interactive_user", current_fen
    )
    return fen, commentary, status


def view_benchmark_logs():
    """Pretty-print every record of ``benchmarks.jsonl`` in the data directory."""
    mf = _get_framework()
    log_file_path = os.path.join(mf.data_directory, "benchmarks.jsonl")
    if not os.path.exists(log_file_path):
        return "Benchmark log file not found."
    try:
        with open(log_file_path, "r", encoding="utf-8") as f:
            formatted_logs = [
                json.dumps(json.loads(line), indent=2) for line in f if line.strip()
            ]
    except Exception as e:
        return f"Error reading benchmark log file: {e}"
    return "\n---\n".join(formatted_logs)