# cpu_grid_manager.py
"""
DuckDB Schema and State Management for Massive CPU Grid
"""
import asyncio
import json
import logging
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional

import duckdb

from config import get_hf_token_cached
# get_hf_token_cached() reads the HuggingFace token on demand (e.g. from .env)
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
class CPUStateManager:
    """DuckDB-backed state manager for a simulated massive CPU grid.

    Persists per-thread CPU state, thread allocations, memory segments and
    an instruction queue, and provides an async loop that drains the queue.

    NOTE(review): the default db_path is an hf:// dataset path; opening it
    directly with duckdb.connect() assumes the httpfs extension can serve
    it — confirm against the deployment environment.
    """

    def __init__(self, db_path: str = "hf://datasets/Fred808/helium/storage.json"):
        """Open (or create) the database and install the schema.

        Args:
            db_path: DuckDB database location. Defaults to a HuggingFace
                dataset path served through the S3-compatible endpoint.
        """
        # Bug fix: initialize_db() referenced self.HF_TOKEN, but nothing
        # ever assigned it (AttributeError). Fetch it from the cached
        # config helper imported at the top of the file.
        self.HF_TOKEN = get_hf_token_cached()
        self.db_path = db_path
        self.con = self.initialize_db()

    def initialize_db(self) -> duckdb.DuckDBPyConnection:
        """Initialize DuckDB with our schema and return the connection."""
        con = duckdb.connect(self.db_path)
        # Configure HuggingFace access via the S3-compatible endpoint.
        con.execute("INSTALL httpfs;")
        con.execute("LOAD httpfs;")
        con.execute("SET s3_endpoint='hf.co';")
        con.execute("SET s3_use_ssl=true;")
        con.execute("SET s3_url_style='path';")
        # SET statements cannot take bound parameters; escape single
        # quotes so the token cannot break out of the string literal.
        token = (self.HF_TOKEN or "").replace("'", "''")
        con.execute(f"SET s3_access_key_id='{token}';")
        con.execute(f"SET s3_secret_access_key='{token}';")
        # CPU States Table: one row per (cpu, core, thread), JSON state blob.
        con.execute("""
            CREATE TABLE IF NOT EXISTS cpu_states (
                cpu_id INTEGER,
                core_id INTEGER,
                thread_id INTEGER,
                state JSON,
                last_instruction BIGINT,
                timestamp TIMESTAMP,
                PRIMARY KEY (cpu_id, core_id, thread_id)
            )
        """)
        # Thread Allocation Table: global thread ids and their task group.
        con.execute("""
            CREATE TABLE IF NOT EXISTS thread_allocation (
                thread_id BIGINT,
                cpu_id INTEGER,
                core_id INTEGER,
                thread_local_id INTEGER,
                task_type VARCHAR, -- UI, Compute, IO, Background
                is_active BOOLEAN,
                created_at TIMESTAMP,
                last_active TIMESTAMP,
                PRIMARY KEY (thread_id)
            )
        """)
        # Memory Segments Table: simulated address-space bookkeeping.
        con.execute("""
            CREATE TABLE IF NOT EXISTS memory_segments (
                segment_id BIGINT,
                start_address BIGINT,
                size BIGINT,
                cpu_id INTEGER,
                core_id INTEGER,
                thread_id INTEGER,
                allocation_type VARCHAR,
                is_shared BOOLEAN,
                created_at TIMESTAMP,
                PRIMARY KEY (segment_id)
            )
        """)
        # Instruction Queue Table: work items with lifecycle timestamps.
        con.execute("""
            CREATE TABLE IF NOT EXISTS instruction_queue (
                instruction_id BIGINT,
                cpu_id INTEGER,
                core_id INTEGER,
                thread_id INTEGER,
                instruction JSON,
                priority INTEGER,
                status VARCHAR, -- pending, running, completed, failed
                created_at TIMESTAMP,
                started_at TIMESTAMP,
                completed_at TIMESTAMP,
                PRIMARY KEY (instruction_id)
            )
        """)
        # Bug fix: queue_instruction() calls nextval('instruction_seq'),
        # but the sequence was never created anywhere.
        con.execute("CREATE SEQUENCE IF NOT EXISTS instruction_seq;")
        # Create indexes for performance
        con.execute("CREATE INDEX IF NOT EXISTS idx_cpu_states ON cpu_states(cpu_id, timestamp)")
        con.execute("CREATE INDEX IF NOT EXISTS idx_thread_allocation ON thread_allocation(cpu_id, is_active)")
        con.execute("CREATE INDEX IF NOT EXISTS idx_instruction_queue ON instruction_queue(status, priority)")
        return con

    def allocate_cpu_group(self, start_id: int, end_id: int, task_type: str):
        """Allocate a group of CPUs (inclusive id range) for a task type.

        Each CPU gets 50 cores x 100 threads; the global thread_id is the
        composite cpu_id * 5000 + core_id * 100 + thread_local_id, which is
        unique because core_id * 100 + thread_local_id < 5000.
        """
        # Bug fix: in DuckDB, scalar generate_series(a, b) yields a single
        # LIST value, so the original cross join produced one row of lists
        # instead of the cartesian product. unnest() expands the list into
        # one row per element.
        self.con.execute("""
            INSERT INTO thread_allocation (
                thread_id, cpu_id, core_id, thread_local_id,
                task_type, is_active, created_at, last_active
            )
            SELECT
                (c.cpu_id * 5000 + co.core_id * 100 + t.thread_id) AS thread_id,
                c.cpu_id,
                co.core_id,
                t.thread_id AS thread_local_id,
                ?,
                TRUE,
                CURRENT_TIMESTAMP,
                CURRENT_TIMESTAMP
            FROM
                (SELECT unnest(generate_series(?, ?)) AS cpu_id) c
                CROSS JOIN (SELECT unnest(generate_series(0, 49)) AS core_id) co
                CROSS JOIN (SELECT unnest(generate_series(0, 99)) AS thread_id) t
        """, (task_type, start_id, end_id))
        logger.info(f"Allocated CPUs {start_id}-{end_id} for {task_type}")

    async def update_cpu_state(self, cpu_id: int, core_id: int, thread_id: int, state: Dict):
        """Upsert the JSON state for a specific CPU thread.

        last_instruction is mirrored from state['instruction_ptr'] (0 when
        absent) so it can be queried without parsing the JSON blob.
        NOTE(review): the DB call is synchronous despite the async
        signature; it blocks the event loop for the duration of the query.
        """
        self.con.execute("""
            INSERT INTO cpu_states (cpu_id, core_id, thread_id, state, last_instruction, timestamp)
            VALUES (?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
            ON CONFLICT (cpu_id, core_id, thread_id) DO UPDATE
            SET state = EXCLUDED.state,
                last_instruction = EXCLUDED.last_instruction,
                timestamp = EXCLUDED.timestamp
        """, (cpu_id, core_id, thread_id, json.dumps(state), state.get('instruction_ptr', 0)))

    async def get_cpu_state(self, cpu_id: int, core_id: int, thread_id: int) -> Optional[Dict]:
        """Return the stored state dict for a CPU thread, or None if absent."""
        result = self.con.execute("""
            SELECT state
            FROM cpu_states
            WHERE cpu_id = ? AND core_id = ? AND thread_id = ?
        """, (cpu_id, core_id, thread_id)).fetchone()
        return json.loads(result[0]) if result else None

    async def queue_instruction(self, cpu_id: int, instruction: Dict, priority: int = 0):
        """Queue an instruction on one random active thread of the given CPU.

        Silently inserts nothing if the CPU has no active threads.
        """
        self.con.execute("""
            INSERT INTO instruction_queue (
                instruction_id, cpu_id, core_id, thread_id,
                instruction, priority, status, created_at
            )
            SELECT
                nextval('instruction_seq'),
                ?,
                core_id,
                thread_id,
                ?,
                ?,
                'pending',
                CURRENT_TIMESTAMP
            FROM thread_allocation
            WHERE cpu_id = ? AND is_active = TRUE
            ORDER BY RANDOM()
            LIMIT 1
        """, (cpu_id, json.dumps(instruction), priority, cpu_id))

    async def process_instruction_queue(self):
        """Drain pending instructions forever, highest priority first.

        Each instruction transitions pending -> running -> completed/failed;
        failures are logged and recorded but do not stop the loop.
        """
        while True:
            # Get next batch of instructions
            instructions = self.con.execute("""
                SELECT
                    instruction_id, cpu_id, core_id, thread_id, instruction
                FROM instruction_queue
                WHERE status = 'pending'
                ORDER BY priority DESC, created_at ASC
                LIMIT 1000
            """).fetchall()
            if not instructions:
                await asyncio.sleep(0.1)
                continue
            # Process instructions
            for inst in instructions:
                instruction_id, cpu_id, core_id, thread_id, instruction = inst
                try:
                    # Mark as running
                    self.con.execute("""
                        UPDATE instruction_queue
                        SET status = 'running', started_at = CURRENT_TIMESTAMP
                        WHERE instruction_id = ?
                    """, (instruction_id,))
                    # Execute instruction (placeholder)
                    await self.execute_instruction(cpu_id, core_id, thread_id, json.loads(instruction))
                    # Mark as completed
                    self.con.execute("""
                        UPDATE instruction_queue
                        SET status = 'completed', completed_at = CURRENT_TIMESTAMP
                        WHERE instruction_id = ?
                    """, (instruction_id,))
                except Exception as e:
                    logger.error(f"Error processing instruction {instruction_id}: {e}")
                    self.con.execute("""
                        UPDATE instruction_queue
                        SET status = 'failed', completed_at = CURRENT_TIMESTAMP
                        WHERE instruction_id = ?
                    """, (instruction_id,))
            await asyncio.sleep(0.01)  # Prevent tight loop

    async def execute_instruction(self, cpu_id: int, core_id: int, thread_id: int, instruction: Dict):
        """Execute a single instruction (placeholder).

        Currently just advances the instruction pointer and records the
        instruction in the thread's state blob.
        """
        # Get current state
        state = await self.get_cpu_state(cpu_id, core_id, thread_id) or {}
        # Update state based on instruction
        state['instruction_ptr'] = state.get('instruction_ptr', 0) + 1
        state['last_instruction'] = instruction
        # Save updated state
        await self.update_cpu_state(cpu_id, core_id, thread_id, state)

    def initialize_cpu_grid(self):
        """Initialize the complete CPU grid with task assignments."""
        # UI/Display Operations (0-499)
        self.allocate_cpu_group(0, 499, "UI")
        # Computation/Processing (500-999)
        self.allocate_cpu_group(500, 999, "COMPUTE")
        # I/O & Storage Operations (1000-1499)
        self.allocate_cpu_group(1000, 1499, "IO")
        # Background/System Tasks (1500-1999)
        self.allocate_cpu_group(1500, 1999, "BACKGROUND")
        logger.info("CPU grid initialized with all task groups")
async def main():
    """Smoke-test the CPU state manager: build the grid, start the queue
    processor, enqueue two sample instructions, then idle forever."""
    manager = CPUStateManager()
    manager.initialize_cpu_grid()
    # Bug fix: keep a reference to the background task. The event loop
    # holds only weak references to tasks, so an unreferenced task created
    # by asyncio.create_task() can be garbage-collected mid-flight.
    processor = asyncio.create_task(manager.process_instruction_queue())
    # Test queueing some instructions
    await manager.queue_instruction(0, {"type": "UI_UPDATE", "component": "window1"}, priority=1)
    await manager.queue_instruction(500, {"type": "MATRIX_MULTIPLY", "size": 1000}, priority=2)
    # Keep running until interrupted; the processor task runs concurrently.
    while True:
        await asyncio.sleep(1)


if __name__ == "__main__":
    asyncio.run(main())