""" DuckDB Schema and State Management for Massive CPU Grid """ import duckdb import json from typing import Dict, List from pathlib import Path import asyncio import logging from datetime import datetime from config import get_hf_token_cached # Initialize token from .env logging.basicConfig(level=logging.DEBUG) logger = logging.getLogger(__name__) class CPUStateManager: def __init__(self, db_path: str = "hf://datasets/Fred808/helium/storage.json"): self.db_path = db_path self.con = self.initialize_db() def initialize_db(self) -> duckdb.DuckDBPyConnection: """Initialize DuckDB with our schema""" con = duckdb.connect(self.db_path) # Configure HuggingFace access con.execute("INSTALL httpfs;") con.execute("LOAD httpfs;") con.execute("SET s3_endpoint='hf.co';") con.execute("SET s3_use_ssl=true;") con.execute("SET s3_url_style='path';") con.execute(f"SET s3_access_key_id='{self.HF_TOKEN}';") con.execute(f"SET s3_secret_access_key='{self.HF_TOKEN}';") # CPU States Table con.execute(""" CREATE TABLE IF NOT EXISTS cpu_states ( cpu_id INTEGER, core_id INTEGER, thread_id INTEGER, state JSON, last_instruction BIGINT, timestamp TIMESTAMP, PRIMARY KEY (cpu_id, core_id, thread_id) ) """) # Thread Allocation Table con.execute(""" CREATE TABLE IF NOT EXISTS thread_allocation ( thread_id BIGINT, cpu_id INTEGER, core_id INTEGER, thread_local_id INTEGER, task_type VARCHAR, -- UI, Compute, IO, Background is_active BOOLEAN, created_at TIMESTAMP, last_active TIMESTAMP, PRIMARY KEY (thread_id) ) """) # Memory Segments Table con.execute(""" CREATE TABLE IF NOT EXISTS memory_segments ( segment_id BIGINT, start_address BIGINT, size BIGINT, cpu_id INTEGER, core_id INTEGER, thread_id INTEGER, allocation_type VARCHAR, is_shared BOOLEAN, created_at TIMESTAMP, PRIMARY KEY (segment_id) ) """) # Instruction Queue Table con.execute(""" CREATE TABLE IF NOT EXISTS instruction_queue ( instruction_id BIGINT, cpu_id INTEGER, core_id INTEGER, thread_id INTEGER, instruction JSON, priority INTEGER, status VARCHAR, -- pending, running, completed, failed created_at TIMESTAMP, started_at TIMESTAMP, completed_at TIMESTAMP, PRIMARY KEY (instruction_id) ) """) # Create indexes for performance con.execute("CREATE INDEX IF NOT EXISTS idx_cpu_states ON cpu_states(cpu_id, timestamp)") con.execute("CREATE INDEX IF NOT EXISTS idx_thread_allocation ON thread_allocation(cpu_id, is_active)") con.execute("CREATE INDEX IF NOT EXISTS idx_instruction_queue ON instruction_queue(status, priority)") return con def allocate_cpu_group(self, start_id: int, end_id: int, task_type: str): """Allocate a group of CPUs for specific task type""" self.con.execute(""" INSERT INTO thread_allocation ( thread_id, cpu_id, core_id, thread_local_id, task_type, is_active, created_at, last_active ) SELECT (c.cpu_id * 5000 + co.core_id * 100 + t.thread_id) as thread_id, c.cpu_id, co.core_id, t.thread_id as thread_local_id, ?, TRUE, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP FROM (SELECT generate_series(?, ?) as cpu_id) c CROSS JOIN (SELECT generate_series(0, 49) as core_id) co CROSS JOIN (SELECT generate_series(0, 99) as thread_id) t """, (task_type, start_id, end_id)) logger.info(f"Allocated CPUs {start_id}-{end_id} for {task_type}") async def update_cpu_state(self, cpu_id: int, core_id: int, thread_id: int, state: Dict): """Update state for a specific CPU thread""" self.con.execute(""" INSERT INTO cpu_states (cpu_id, core_id, thread_id, state, last_instruction, timestamp) VALUES (?, ?, ?, ?, ?, CURRENT_TIMESTAMP) ON CONFLICT (cpu_id, core_id, thread_id) DO UPDATE SET state = EXCLUDED.state, last_instruction = EXCLUDED.last_instruction, timestamp = EXCLUDED.timestamp """, (cpu_id, core_id, thread_id, json.dumps(state), state.get('instruction_ptr', 0))) async def get_cpu_state(self, cpu_id: int, core_id: int, thread_id: int) -> Dict: """Get current state for a specific CPU thread""" result = self.con.execute(""" SELECT state FROM cpu_states WHERE cpu_id = ? AND core_id = ? AND thread_id = ? """, (cpu_id, core_id, thread_id)).fetchone() return json.loads(result[0]) if result else None async def queue_instruction(self, cpu_id: int, instruction: Dict, priority: int = 0): """Queue an instruction for execution""" self.con.execute(""" INSERT INTO instruction_queue ( instruction_id, cpu_id, core_id, thread_id, instruction, priority, status, created_at ) SELECT nextval('instruction_seq'), ?, core_id, thread_id, ?, ?, 'pending', CURRENT_TIMESTAMP FROM thread_allocation WHERE cpu_id = ? AND is_active = TRUE ORDER BY RANDOM() LIMIT 1 """, (cpu_id, json.dumps(instruction), priority, cpu_id)) async def process_instruction_queue(self): """Process pending instructions""" while True: # Get next batch of instructions instructions = self.con.execute(""" SELECT instruction_id, cpu_id, core_id, thread_id, instruction FROM instruction_queue WHERE status = 'pending' ORDER BY priority DESC, created_at ASC LIMIT 1000 """).fetchall() if not instructions: await asyncio.sleep(0.1) continue # Process instructions for inst in instructions: instruction_id, cpu_id, core_id, thread_id, instruction = inst try: # Mark as running self.con.execute(""" UPDATE instruction_queue SET status = 'running', started_at = CURRENT_TIMESTAMP WHERE instruction_id = ? """, (instruction_id,)) # Execute instruction (placeholder) await self.execute_instruction(cpu_id, core_id, thread_id, json.loads(instruction)) # Mark as completed self.con.execute(""" UPDATE instruction_queue SET status = 'completed', completed_at = CURRENT_TIMESTAMP WHERE instruction_id = ? """, (instruction_id,)) except Exception as e: logger.error(f"Error processing instruction {instruction_id}: {e}") self.con.execute(""" UPDATE instruction_queue SET status = 'failed', completed_at = CURRENT_TIMESTAMP WHERE instruction_id = ? """, (instruction_id,)) await asyncio.sleep(0.01) # Prevent tight loop async def execute_instruction(self, cpu_id: int, core_id: int, thread_id: int, instruction: Dict): """Execute a single instruction (placeholder)""" # Get current state state = await self.get_cpu_state(cpu_id, core_id, thread_id) or {} # Update state based on instruction state['instruction_ptr'] = state.get('instruction_ptr', 0) + 1 state['last_instruction'] = instruction # Save updated state await self.update_cpu_state(cpu_id, core_id, thread_id, state) def initialize_cpu_grid(self): """Initialize the complete CPU grid with task assignments""" # UI/Display Operations (0-499) self.allocate_cpu_group(0, 499, "UI") # Computation/Processing (500-999) self.allocate_cpu_group(500, 999, "COMPUTE") # I/O & Storage Operations (1000-1499) self.allocate_cpu_group(1000, 1499, "IO") # Background/System Tasks (1500-1999) self.allocate_cpu_group(1500, 1999, "BACKGROUND") logger.info("CPU grid initialized with all task groups") async def main(): """Test the CPU state manager""" manager = CPUStateManager() manager.initialize_cpu_grid() # Start instruction processor asyncio.create_task(manager.process_instruction_queue()) # Test queueing some instructions await manager.queue_instruction(0, {"type": "UI_UPDATE", "component": "window1"}, priority=1) await manager.queue_instruction(500, {"type": "MATRIX_MULTIPLY", "size": 1000}, priority=2) # Keep running while True: await asyncio.sleep(1) if __name__ == "__main__": asyncio.run(main())