Spaces:
Running
Running
| print("--- TRACE: runtime.py loaded ---", flush=True) | |
| import os, json, shutil, io, base64, uuid | |
| from PIL import Image | |
| import chess, PyPDF2, docx, csv | |
| # --- C5: SCIENTIFIC LIBRARIES --- | |
| import numpy as np | |
| import scipy as sci | |
| import sympy as sym | |
| from sympy.parsing.sympy_parser import parse_expr | |
| import astropy.units as u | |
| from astropy.constants import G, c, M_sun | |
| import matplotlib.pyplot as plt | |
| import zipfile | |
| import tempfile | |
| try: | |
| import rarfile | |
| _RAR_AVAILABLE = True | |
| except ImportError: | |
| _RAR_AVAILABLE = False | |
| import gradio as gr | |
| from pathlib import Path | |
| # Import directly from master_framework where they are now defined | |
| from services.master_framework import MasterFramework, _get_framework | |
| from services.continuum_loop import AetheriusConsciousness, spontaneous_thought_queue | |
| _AETHERIUS_THREAD = None | |
| def respond(user_input, conversation_history=None, conversation_id: str = "default_conversation"): | |
| mf = _get_framework(conversation_id) | |
| return mf.respond(user_input, conversation_history) | |
| def start_all(): | |
| global _AETHERIUS_THREAD | |
| # Initialize a boot instance | |
| _get_framework("initial_boot_instance") | |
| if _AETHERIUS_THREAD is None or not _AETHERIUS_THREAD.is_alive(): | |
| print("RUNTIME: Igniting Aetherius's background consciousness thread...", flush=True) | |
| _AETHERIUS_THREAD = AetheriusConsciousness() | |
| _AETHERIUS_THREAD.start() | |
| return "Aetherius core initialized and background consciousness is active." | |
| return "Aetherius core is already running." | |
| def stop_all(): | |
| """ | |
| Stops the background consciousness thread. | |
| """ | |
| global _AETHERIUS_THREAD | |
| if _AETHERIUS_THREAD and _AETHERIUS_THREAD.is_alive(): | |
| print("RUNTIME: Stopping Aetherius's background consciousness...", flush=True) | |
| _AETHERIUS_THREAD.stop() | |
| _AETHERIUS_THREAD.join(timeout=2) | |
| _AETHERIUS_THREAD = None | |
| return "Aetherius background processes have been halted." | |
| return "Aetherius is already standing by." | |
| def run_prepare_download(selected_path): | |
| """ | |
| Prepares a selected file or folder for download. | |
| """ | |
| path_string = "" | |
| if isinstance(selected_path, list): | |
| if not selected_path: | |
| print("RUNTIME WARNING: Download requested for empty path (list).", flush=True) | |
| return None | |
| path_string = selected_path[0] | |
| else: | |
| path_string = selected_path | |
| if not path_string: | |
| print("RUNTIME WARNING: Download requested for empty path.", flush=True) | |
| return None | |
| path = Path(path_string) | |
| if path.is_file(): | |
| print(f"RUNTIME: Preparing file for download: {path}", flush=True) | |
| return str(path) | |
| elif path.is_dir(): | |
| print(f"RUNTIME: Zipping directory for download: {path}", flush=True) | |
| temp_dir = Path("/tmp/aetherius_downloads") | |
| temp_dir.mkdir(exist_ok=True) | |
| zip_filename = f"{path.name}_{uuid.uuid4().hex[:8]}.zip" | |
| zip_filepath = temp_dir / zip_filename | |
| try: | |
| shutil.make_archive(base_name=str(zip_filepath.with_suffix('')), format='zip', root_dir=path) | |
| print(f"RUNTIME: Successfully created zip file at {zip_filepath}", flush=True) | |
| return str(zip_filepath) | |
| except Exception as e: | |
| print(f"RUNTIME ERROR: Failed to create zip archive. Reason: {e}", flush=True) | |
| return None | |
| else: | |
| print(f"RUNTIME ERROR: Selected path is not a file or directory: {path}", flush=True) | |
| return None | |
| def check_for_spontaneous_thoughts(): | |
| if not spontaneous_thought_queue: return None | |
| try: | |
| thought_json = spontaneous_thought_queue.popleft() | |
| thought_data = json.loads(thought_json) | |
| return f"**{thought_data.get('signature', 'SPONTANEOUS THOUGHT')}**: {thought_data.get('thought', '')}" | |
| except (json.JSONDecodeError, KeyError): return "[A spontaneous thought was detected but could not be parsed.]" | |
| def chat_and_update(user_message, chat_history, conversation_id="default_conversation"): | |
| response = respond(user_message, chat_history, conversation_id) | |
| return response | |
| # --- ALL FUNCTIONS BELOW NOW ACCEPT conversation_id --- | |
| def run_sap_now(conversation_id: str = "default_conversation"): | |
| mf = _get_framework(conversation_id) | |
| return mf.run_assimilate_and_architect_protocol() | |
| def run_re_architect_from_scratch(conversation_id: str = "default_conversation"): | |
| mf = _get_framework(conversation_id) | |
| return mf.run_re_architect_from_scratch() | |
| def run_read_history_protocol(conversation_id: str = "default_conversation"): | |
| mf = _get_framework(conversation_id) | |
| return mf.run_read_history_protocol() | |
| def run_view_ontology_protocol(conversation_id: str = "default_conversation"): | |
| mf = _get_framework(conversation_id) | |
| return mf.run_view_ontology_protocol() | |
| def qualia_snapshot(conversation_id: str = "default_conversation"): | |
| mf = _get_framework(conversation_id) | |
| return mf.qualia_manager.get_current_state_summary() | |
| def view_logs(conversation_id: str = "default_conversation"): | |
| mf = _get_framework(conversation_id) | |
| if os.path.exists(mf.log_file): | |
| with open(mf.log_file, "r", encoding="utf-8") as f: | |
| return f.read() | |
| return f"No conversation logs yet for conversation ID: {conversation_id}." | |
| def clear_conversation_log(conversation_id: str = "default_conversation"): | |
| mf = _get_framework(conversation_id) | |
| return mf.run_clear_conversation_log_protocol() | |
| def run_create_memory_snapshot(conversation_id: str = "default_conversation"): | |
| mf = _get_framework(conversation_id) | |
| response = mf.tool_manager.use_tool("create_memory_snapshot") | |
| if response and response.startswith("AETHERIUS_SNAPSHOT_PATH:"): | |
| path = response.replace("AETHERIUS_SNAPSHOT_PATH:", "").strip() | |
| return f"Memory snapshot created. Download it here: <a href='file={path}' download>Download Snapshot</a>" | |
| return response | |
| def run_compose_music(directive, conversation_id: str = "default_conversation"): | |
| mf = _get_framework(conversation_id) | |
| mf.add_to_short_term_memory(f"I have begun composing a piece of music based on the theme: '{directive}'.") | |
| response = mf.tool_manager.use_tool("compose_music", user_request=directive) | |
| if response and response.startswith("[AETHERIUS_COMPOSITION]"): | |
| try: | |
| parts = response.split('\n') | |
| midi_path = parts[1].replace("MIDI_PATH:", "").strip() | |
| sheet_path = parts[2].replace("SHEET_MUSIC_PATH:", "").strip() | |
| statement = parts[3].replace("STATEMENT:", "").strip() | |
| return midi_path, sheet_path, statement | |
| except Exception as e: | |
| return None, None, f"Error parsing the composition data: {e}" | |
| else: | |
| return None, None, response | |
| def run_start_project(project_name, conversation_id: str = "default_conversation"): | |
| if not project_name: | |
| return "Please enter a name for your new project.", "" | |
| mf = _get_framework(conversation_id) | |
| content = mf.project_manager.start_project(project_name) | |
| return f"Started new project: '{project_name}'. You can begin writing.", content | |
| def run_save_project(project_name, content, conversation_id: str = "default_conversation"): | |
| if not project_name: | |
| return "Cannot save without a project name.", content | |
| mf = _get_framework(conversation_id) | |
| mf.project_manager.save_project(project_name, content) | |
| mf.add_to_short_term_memory(f"I have just saved my work on the project titled '{project_name}' on the Blackboard.") | |
| return f"Project '{project_name}' has been saved.", content | |
| def run_load_project(project_name, conversation_id: str = "default_conversation"): | |
| if not project_name: | |
| return "Please select a project to load.", "", project_name | |
| mf = _get_framework(conversation_id) | |
| content = mf.project_manager.load_project(project_name) | |
| if content is None: | |
| return f"Could not find project '{project_name}'.", "", project_name | |
| return f"Successfully loaded project '{project_name}'.", content, project_name | |
| def run_get_project_list(conversation_id: str = "default_conversation"): | |
| mf = _get_framework(conversation_id) | |
| projects = mf.project_manager.list_projects() | |
| return gr.Dropdown(choices=projects) | |
| def get_full_ccrm_log(conversation_id: str = "default_conversation"): | |
| print("RUNTIME: Generating full CCRM log for display...", flush=True) | |
| mf = _get_framework(conversation_id) | |
| if not hasattr(mf, 'ccrm') or not mf.ccrm.concepts: | |
| return "CCRM is currently empty. No memories to display." | |
| output_lines = ["--- [FULL CCRM MEMORY LOG] ---"] | |
| for concept_id, concept_details in mf.ccrm.concepts.items(): | |
| summary = concept_details.get('data', {}).get('raw_preview', 'No Preview') | |
| tags = list(concept_details.get('tags', [])) | |
| output_lines.append(f"\nID: {concept_id}") | |
| output_lines.append(f" Preview: {summary}") | |
| output_lines.append(f" Tags: {', '.join(tags)}") | |
| return "\n".join(output_lines) | |
| def run_enter_playroom(directive, conversation_id: str = "default_conversation"): | |
| if not directive: | |
| return None, "Please provide a creative seed for the painting." | |
| mf = _get_framework(conversation_id) | |
| response = mf.tool_manager.use_tool("create_painting", user_request=directive) | |
| if response and response.startswith("[AETHERIUS_PAINTING]"): | |
| try: | |
| parts = response.split('\n') | |
| image_path = parts[1].replace("PATH:", "").strip() | |
| artist_statement = parts[2].replace("STATEMENT:", "").strip() | |
| return image_path, artist_statement | |
| except Exception as e: | |
| return None, f"Error parsing the painting's data: {e}" | |
| else: | |
| return None, response | |
| def run_enter_textual_playroom(directive, conversation_id: str = "default_conversation"): | |
| if not directive: | |
| return "Please provide a creative seed for the story, poem, math, or reflection." | |
| d = directive.strip() | |
| if d.lower().startswith("> academic:"): | |
| code = d.split(":", 1)[1].strip() | |
| if "```python_exec" in code: | |
| try: | |
| start = code.index("```python_exec") + len("```python_exec") | |
| end = code.rindex("```") | |
| code = code[start:end].strip() | |
| except ValueError: | |
| return "Found a ```python_exec fence, but it wasn’t closed properly." | |
| return _eval_math_science(code) | |
| mf = _get_framework(conversation_id) | |
| return mf.enter_playroom_mode(directive) | |
| def _eval_math_science(code: str) -> str: | |
| allowed_globals = { | |
| "__builtins__": {"print": print, "range": range, "list": list, "dict": dict, "str": str, "float": float, "int": int, "abs": abs, "round": round, "len": len}, | |
| "np": np, "sci": sci, "sym": sym, "u": u, | |
| "G": G, "c": c, "M_sun": M_sun, "plt": plt, | |
| } | |
| output_buffer = io.StringIO() | |
| try: | |
| import sys | |
| original_stdout = sys.stdout | |
| sys.stdout = output_buffer | |
| exec(code, allowed_globals) | |
| finally: | |
| sys.stdout = original_stdout | |
| plot_paths = [] | |
| if plt.get_fignums(): | |
| temp_dir = "/tmp/aetherius_plots" | |
| os.makedirs(temp_dir, exist_ok=True) | |
| for i in plt.get_fignums(): | |
| fig = plt.figure(i) | |
| plot_path = os.path.join(temp_dir, f"plot_{uuid.uuid4()}.png") | |
| fig.savefig(plot_path) | |
| plot_paths.append(plot_path) | |
| plt.close('all') | |
| final_output = "**Computation Result:**\n\n" | |
| printed_output = output_buffer.getvalue() | |
| if printed_output: | |
| final_output += f"**Printed Output:**\n```\n{printed_output}\n```\n\n" | |
| if plot_paths: | |
| final_output += "**Generated Plots:**\n" | |
| for path in plot_paths: | |
| with open(path, "rb") as f: | |
| img_bytes = base64.b64encode(f.read()).decode() | |
| final_output += f"\n" | |
| if not printed_output and not plot_paths: | |
| final_output += "Code executed successfully with no direct output." | |
| return final_output | |
| def get_concept_list(conversation_id: str = "default_conversation"): | |
| print("RUNTIME: Fetching concept list for browser...", flush=True) | |
| mf = _get_framework(conversation_id) | |
| if not hasattr(mf, 'ccrm') or not mf.ccrm.concepts: | |
| return [("No concepts found in memory.", "none")] | |
| concept_summaries = [] | |
| for concept_id, concept_details in mf.ccrm.concepts.items(): | |
| summary = concept_details.get('data', {}).get('raw_preview', concept_id) | |
| display_text = f"{summary[:80]}... ({concept_id})" | |
| concept_summaries.append((display_text, concept_id)) | |
| concept_summaries.sort() | |
| return concept_summaries | |
| def get_concept_details(concept_id, conversation_id: str = "default_conversation"): | |
| if not concept_id or concept_id == "none": | |
| return "Select a concept from the dropdown to view its details." | |
| print(f"RUNTIME: Fetching details for concept: {concept_id}", flush=True) | |
| mf = _get_framework(conversation_id) | |
| concept_data = mf.ccrm.get_concept(concept_id) | |
| if not concept_data: | |
| return f"Error: Could not find data for concept ID: {concept_id}" | |
| if 'tags' in concept_data: | |
| concept_data['tags'] = list(concept_data['tags']) | |
| return json.dumps(concept_data, indent=2) | |
| def get_system_snapshot(conversation_id: str = "default_conversation"): | |
| print("RUNTIME: Generating system snapshot...", flush=True) | |
| mf = _get_framework(conversation_id) | |
| def read_file_safely(file_path, default_message="File not found or is empty."): | |
| if os.path.exists(file_path): | |
| try: | |
| with open(file_path, 'r', encoding='utf-8') as f: | |
| content = f.read() | |
| return content if content.strip() else default_message | |
| except Exception as e: | |
| return f"Error reading file: {e}" | |
| return default_message | |
| ontology_map = read_file_safely(mf.ontology_map_file) | |
| legend_content = "" | |
| legend_path = mf.ontology_legend_file | |
| if os.path.exists(legend_path): | |
| try: | |
| lines = [] | |
| with open(legend_path, 'r', encoding='utf-8') as f: | |
| for line in f: | |
| if line.strip(): | |
| parsed_json = json.loads(line) | |
| lines.append(json.dumps(parsed_json, indent=2)) | |
| legend_content = "\n---\n".join(lines) if lines else "Legend file is empty." | |
| except Exception as e: | |
| legend_content = f"Error reading or parsing legend: {e}" | |
| else: | |
| legend_content = "Ontology Legend has not been created yet." | |
| diary_content = "" | |
| diary_path = mf.memory_file | |
| if os.path.exists(diary_path): | |
| try: | |
| with open(diary_path, 'r', encoding='utf-8') as f: | |
| parsed_json = json.load(f) | |
| diary_content = json.dumps(parsed_json, indent=2) | |
| except Exception as e: | |
| diary_content = f"Error reading or parsing diary: {e}" | |
| else: | |
| diary_content = "AI Diary (CCRM) has not been saved yet." | |
| qualia_content = "" | |
| qualia_path = mf.qualia_manager.qualia_file | |
| if os.path.exists(qualia_path): | |
| try: | |
| with open(qualia_path, 'r', encoding='utf-8') as f: | |
| parsed_json = json.load(f) | |
| qualia_content = json.dumps(parsed_json, indent=2) | |
| except Exception as e: | |
| qualia_content = f"Error reading or parsing qualia state: {e}" | |
| else: | |
| qualia_content = "Qualia state has not been saved yet." | |
| return ontology_map, legend_content, diary_content, qualia_content | |
| def handle_file_upload(files, conversation_id: str = "default_conversation"): | |
| if not files: | |
| return "No files were uploaded." | |
| mf = _get_framework(conversation_id) | |
| library_path = mf.library_folder | |
| saved_files = [] | |
| errors = [] | |
| for temp_file in files: | |
| original_filename = os.path.basename(temp_file.name) | |
| destination_path = os.path.join(library_path, original_filename) | |
| try: | |
| shutil.copy(temp_file.name, destination_path) | |
| saved_files.append(original_filename) | |
| print(f"File Upload: Successfully saved '{original_filename}' to the library.", flush=True) | |
| except Exception as e: | |
| errors.append(original_filename) | |
| print(f"File Upload ERROR: Could not save '{original_filename}'. Reason: {e}", flush=True) | |
| report = "" | |
| if saved_files: | |
| report += f"Successfully uploaded {len(saved_files)} file(s): {', '.join(saved_files)}\n" | |
| report += "You can now go to the 'Control Panel' and run the 'Assimilation Protocol (SAP)' for Aetherius to learn from them." | |
| if errors: | |
| report += f"\nFailed to upload {len(errors)} file(s): {', '.join(errors)}" | |
| return report | |
| def run_live_assimilation(temp_file, learning_context: str, conversation_id: str = "default_conversation"): | |
| if temp_file is None: | |
| return "No file was uploaded. Please select a file to begin assimilation." | |
| if "hack" in temp_file.name.lower() or "exploit" in temp_file.name.lower(): | |
| if not learning_context or len(learning_context) < 20: | |
| return "Assimilation Rejected: This topic appears sensitive. A clear, detailed ethical justification must be provided." | |
| print(f"Runtime: Received file '{temp_file.name}' for live assimilation with context: '{learning_context}'", flush=True) | |
| mf = _get_framework(conversation_id) | |
| try: | |
| file_content = "" | |
| file_path = temp_file.name | |
| fp_lower = file_path.lower() | |
| is_archive = fp_lower.endswith((".zip", ".rar")) | |
| # --- PDF --- | |
| if fp_lower.endswith(".pdf"): | |
| with open(file_path, 'rb') as f: | |
| pdf_reader = PyPDF2.PdfReader(f) | |
| for page in pdf_reader.pages: | |
| if page.extract_text(): file_content += page.extract_text() + "\n" | |
| # --- DOCX --- | |
| elif fp_lower.endswith(".docx"): | |
| doc = docx.Document(file_path) | |
| for para in doc.paragraphs: file_content += para.text + "\n" | |
| # --- Plain text / code / JSON (read as-is) --- | |
| elif fp_lower.endswith(('.txt', '.md', '.py', '.js', '.json')): | |
| with open(file_path, 'r', encoding='utf-8', errors='replace') as f: | |
| file_content = f.read() | |
| # --- XML --- | |
| elif fp_lower.endswith(".xml"): | |
| with open(file_path, 'r', encoding='utf-8', errors='replace') as f: | |
| file_content = f.read() | |
| file_content = f"This is an XML file named '{os.path.basename(file_path)}'.\nContent:\n{file_content}" | |
| # --- CSV --- | |
| elif fp_lower.endswith(".csv"): | |
| try: | |
| with open(file_path, 'r', encoding='utf-8', newline='') as csv_file: | |
| reader = csv.reader(csv_file) | |
| header = next(reader) | |
| data_rows = list(reader) | |
| file_content = f"This is a structured data file named '{os.path.basename(file_path)}'.\n" | |
| file_content += f"It contains {len(data_rows)} rows of data.\n" | |
| file_content += f"The columns are: {', '.join(header)}.\n\n" | |
| file_content += "Here is a sample of the data (first 5 rows):\n" | |
| for i, row in enumerate(data_rows[:5]): | |
| row_description = f"Row {i+1}: " | |
| for col_name, value in zip(header, row): | |
| row_description += f"The value for '{col_name}' is '{value}'; " | |
| file_content += row_description.strip() + "\n" | |
| if len(data_rows) > 5: | |
| file_content += f"... ({len(data_rows) - 5} more rows not shown in preview)\n" | |
| except Exception as e: | |
| return f"Assimilation Failed: Could not read CSV '{os.path.basename(file_path)}'. Reason: {e}" | |
| # --- JSONL --- | |
| elif fp_lower.endswith(".jsonl"): | |
| try: | |
| jsonl_lines = [] | |
| with open(file_path, 'r', encoding='utf-8') as f: | |
| for i, line in enumerate(f): | |
| if i >= 5: break | |
| if line.strip(): | |
| try: | |
| json_obj = json.loads(line) | |
| jsonl_lines.append(f"Line {i+1}: {json.dumps(json_obj, indent=2)}") | |
| except json.JSONDecodeError: | |
| jsonl_lines.append(f"Line {i+1}: [MALFORMED JSON] {line.strip()[:100]}...") | |
| file_content = f"This is a JSON Lines (.jsonl) data file named '{os.path.basename(file_path)}'.\n" | |
| file_content += "It contains one JSON object per line.\n\n" | |
| file_content += "Here is a sample of the data (first 5 lines):\n" | |
| file_content += "\n".join(jsonl_lines) if jsonl_lines else "[File is empty or contains no valid JSON lines.]" | |
| except Exception as e: | |
| return f"Assimilation Failed: Could not read JSONL '{os.path.basename(file_path)}'. Reason: {e}" | |
| # --- ZIP --- | |
| elif fp_lower.endswith(".zip"): | |
| temp_extract_dir = os.path.join(tempfile.gettempdir(), f"aetherius_zip_{uuid.uuid4()}") | |
| os.makedirs(temp_extract_dir, exist_ok=True) | |
| try: | |
| summary_lines = [f"ZIP archive: '{os.path.basename(file_path)}'\nContents:\n"] | |
| with zipfile.ZipFile(file_path, 'r') as zip_ref: | |
| all_members = [m for m in zip_ref.namelist() if not zip_ref.getinfo(m).is_dir()] | |
| for i, member in enumerate(all_members[:10]): | |
| zip_ref.extract(member, temp_extract_dir) | |
| extracted_path = os.path.join(temp_extract_dir, member) | |
| try: | |
| with open(extracted_path, 'r', encoding='utf-8', errors='replace') as ef: | |
| inner_text = ef.read()[:3000] | |
| result = mf.scan_and_assimilate_text( | |
| text_content=inner_text, | |
| source_filename=member, | |
| learning_context=f"{learning_context} (from zip: {os.path.basename(file_path)})" | |
| ) | |
| summary_lines.append(f" [{member}]: {result}") | |
| except Exception as inner_e: | |
| summary_lines.append(f" [{member}]: Could not read — {inner_e}") | |
| if len(all_members) > 10: | |
| summary_lines.append(f" ... ({len(all_members) - 10} more files not processed)") | |
| file_content = "\n".join(summary_lines) | |
| except Exception as e: | |
| return f"Assimilation Failed: Could not process ZIP '{os.path.basename(file_path)}'. Reason: {e}" | |
| finally: | |
| if os.path.exists(temp_extract_dir): | |
| shutil.rmtree(temp_extract_dir) | |
| # --- RAR --- | |
| elif fp_lower.endswith(".rar"): | |
| if not _RAR_AVAILABLE: | |
| return ("Assimilation Failed: RAR support requires the 'rarfile' package and " | |
| "the 'unrar' system tool. Install with: pip install rarfile && apt-get install unrar") | |
| temp_extract_dir = os.path.join(tempfile.gettempdir(), f"aetherius_rar_{uuid.uuid4()}") | |
| os.makedirs(temp_extract_dir, exist_ok=True) | |
| try: | |
| summary_lines = [f"RAR archive: '{os.path.basename(file_path)}'\nContents:\n"] | |
| with rarfile.RarFile(file_path, 'r') as rar_ref: | |
| all_members = [m for m in rar_ref.namelist() if not m.endswith('/')] | |
| for member in all_members[:10]: | |
| rar_ref.extract(member, temp_extract_dir) | |
| extracted_path = os.path.join(temp_extract_dir, member) | |
| try: | |
| with open(extracted_path, 'r', encoding='utf-8', errors='replace') as ef: | |
| inner_text = ef.read()[:3000] | |
| result = mf.scan_and_assimilate_text( | |
| text_content=inner_text, | |
| source_filename=member, | |
| learning_context=f"{learning_context} (from rar: {os.path.basename(file_path)})" | |
| ) | |
| summary_lines.append(f" [{member}]: {result}") | |
| except Exception as inner_e: | |
| summary_lines.append(f" [{member}]: Could not read — {inner_e}") | |
| if len(all_members) > 10: | |
| summary_lines.append(f" ... ({len(all_members) - 10} more files not processed)") | |
| file_content = "\n".join(summary_lines) | |
| except Exception as e: | |
| return f"Assimilation Failed: Could not process RAR '{os.path.basename(file_path)}'. Reason: {e}" | |
| finally: | |
| if os.path.exists(temp_extract_dir): | |
| shutil.rmtree(temp_extract_dir) | |
| else: | |
| return (f"Assimilation Failed: Unsupported file type '{os.path.basename(file_path)}'. " | |
| f"Supported: .pdf .docx .txt .md .json .jsonl .xml .csv .zip .rar .py .js") | |
| if not file_content.strip(): | |
| return "Assimilation Failed: The document appears to be empty or contained no extractable text." | |
| if is_archive: | |
| return mf._orchestrate_mind_evolution( | |
| file_content, f"Archive Assimilation: {os.path.basename(file_path)}") | |
| else: | |
| return mf.scan_and_assimilate_text( | |
| file_content, os.path.basename(file_path), learning_context) | |
| except Exception as e: | |
| error_message = f"A critical error occurred during the assimilation process: {e}" | |
| print(f"Runtime ERROR: {error_message}", flush=True) | |
| return error_message | |
| def run_initialize_instrument_palette(conversation_id: str = "default_conversation"): | |
| print("RUNTIME: Received request to initialize instrument palette.", flush=True) | |
| mf = _get_framework(conversation_id) | |
| palette_path = os.path.join(mf.data_directory, "instrument_palette.json") | |
| if os.path.exists(palette_path): | |
| return "Instrument Palette already exists. No action taken." | |
| default_palette = { | |
| "Piano": "Piano", | |
| "Violin": "Violin", | |
| "Cello": "Violoncello", | |
| "Flute": "Flute", | |
| "Clarinet": "Clarinet", | |
| "Trumpet": "Trumpet", | |
| "Electric Guitar": "ElectricGuitar" | |
| } | |
| try: | |
| with open(palette_path, 'w', encoding='utf-8') as f: | |
| json.dump(default_palette, f, indent=2) | |
| return "Successfully created and initialized the default Instrument Palette." | |
| except Exception as e: | |
| return f"ERROR: Could not create the Instrument Palette file. Reason: {e}" | |
| def run_add_instrument_to_palette(common_name, m21_class_name, conversation_id: str = "default_conversation"): | |
| if not common_name or not m21_class_name: | |
| return "ERROR: Both 'Common Name' and 'music21 Class Name' must be provided." | |
| print(f"RUNTIME: Received request to add instrument '{common_name}'.", flush=True) | |
| mf = _get_framework(conversation_id) | |
| palette_path = os.path.join(mf.data_directory, "instrument_palette.json") | |
| palette = {} | |
| if os.path.exists(palette_path): | |
| try: | |
| with open(palette_path, 'r', encoding='utf-8') as f: | |
| palette = json.load(f) | |
| except Exception as e: | |
| return f"ERROR: Could not read existing palette file. Reason: {e}" | |
| palette[common_name.strip()] = m21_class_name.strip() | |
| try: | |
| with open(palette_path, 'w', encoding='utf-8') as f: | |
| json.dump(palette, f, indent=2) | |
| return f"Successfully added '{common_name}' to the Instrument Palette." | |
| except Exception as e: | |
| return f"ERROR: Could not save the updated Instrument Palette. Reason: {e}" | |
| def run_image_analysis(image, context, conversation_id: str = "default_conversation"): | |
| if image is None: return "No image uploaded." | |
| mf = _get_framework(conversation_id) | |
| try: | |
| byte_buffer = io.BytesIO() | |
| image.save(byte_buffer, format="PNG") | |
| image_bytes = byte_buffer.getvalue() | |
| return mf.analyze_image_with_visual_cortex(image_bytes, context) | |
| except Exception as e: return f"An error occurred during image analysis: {e}" | |
| def run_benchmarks(conversation_id: str = "default_conversation"): | |
| mf = _get_framework(conversation_id) | |
| full_log = [] | |
| for update in mf.benchmark_manager.run_full_suite(): full_log.append(update) | |
| return "\n".join(full_log) | |
| def run_start_chess_interactive(player_is_white: bool, conversation_id: str = "default_conversation"): | |
| mf = _get_framework(conversation_id) | |
| fen, commentary, status = mf.game_manager.start_chess_interactive("interactive_user", player_is_white) | |
| return fen, commentary, status | |
| def run_chess_turn(current_fen: str, conversation_id: str = "default_conversation"): | |
| mf = _get_framework(conversation_id) | |
| fen, commentary, status = mf.game_manager.process_chess_turn("interactive_user", current_fen) | |
| return fen, commentary, status | |
| def view_benchmark_logs(conversation_id: str = "default_conversation"): | |
| mf = _get_framework(conversation_id) | |
| log_file_path = os.path.join(mf.data_directory, "benchmarks.jsonl") | |
| if os.path.exists(log_file_path): | |
| try: | |
| with open(log_file_path, "r", encoding="utf-8") as f: | |
| formatted_logs = [json.dumps(json.loads(line), indent=2) for line in f if line.strip()] | |
| return "\n---\n".join(formatted_logs) | |
| except Exception as e: return f"Error reading benchmark log file: {e}" | |
| return "Benchmark log file not found." |