|
|
"""
|
|
|
DuckDB Schema and State Management for Massive CPU Grid
|
|
|
"""
|
|
|
|
|
|
import asyncio
import json
import logging
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional

import duckdb

from config import get_hf_token_cached
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
logging.basicConfig(level=logging.DEBUG)
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class CPUStateManager:
    """Persist and query state for a simulated massive CPU grid in DuckDB.

    Tables maintained by :meth:`initialize_db`:
        cpu_states         latest JSON state per (cpu_id, core_id, thread_id),
                           upserted by :meth:`update_cpu_state`
        thread_allocation  logical thread -> CPU mapping plus task type
                           (UI / COMPUTE / IO / BACKGROUND)
        memory_segments    simulated memory allocations (schema only; nothing
                           in this class writes to it yet)
        instruction_queue  instructions with a pending -> running ->
                           completed/failed lifecycle, drained by
                           :meth:`process_instruction_queue`
    """

    def __init__(self, db_path: str = "hf://datasets/Fred808/helium/storage.json"):
        """Open the database and install the schema.

        Args:
            db_path: Handed straight to ``duckdb.connect()``.
                NOTE(review): duckdb.connect() normally expects a local file
                path or ':memory:'; an hf:// URL as the *database file* most
                likely fails at connect time -- confirm the intended target.
        """
        self.db_path = db_path
        # Bug fix: initialize_db() reads self.HF_TOKEN for the s3 credential
        # SET statements below, but the attribute was never assigned anywhere
        # (get_hf_token_cached was imported yet never called).
        self.HF_TOKEN = get_hf_token_cached()
        self.con = self.initialize_db()

    def initialize_db(self) -> duckdb.DuckDBPyConnection:
        """Initialize DuckDB with our schema, remote-storage config and indexes."""
        con = duckdb.connect(self.db_path)

        # httpfs gives DuckDB s3-style remote storage access; the Hugging
        # Face hub is addressed through its s3-compatible endpoint.
        con.execute("INSTALL httpfs;")
        con.execute("LOAD httpfs;")
        con.execute("SET s3_endpoint='hf.co';")
        con.execute("SET s3_use_ssl=true;")
        con.execute("SET s3_url_style='path';")
        # SET statements cannot take prepared-statement parameters, hence the
        # f-strings; the token comes from our own config, not untrusted input.
        con.execute(f"SET s3_access_key_id='{self.HF_TOKEN}';")
        con.execute(f"SET s3_secret_access_key='{self.HF_TOKEN}';")

        # Bug fix: queue_instruction() mints ids via nextval('instruction_seq'),
        # but the sequence was never created, so every enqueue raised an error.
        con.execute("CREATE SEQUENCE IF NOT EXISTS instruction_seq")

        # Latest state snapshot per hardware thread.
        con.execute("""
            CREATE TABLE IF NOT EXISTS cpu_states (
                cpu_id INTEGER,
                core_id INTEGER,
                thread_id INTEGER,
                state JSON,
                last_instruction BIGINT,
                timestamp TIMESTAMP,
                PRIMARY KEY (cpu_id, core_id, thread_id)
            )
        """)

        # Which logical threads exist, where they live, and what kind of
        # work they are reserved for.
        con.execute("""
            CREATE TABLE IF NOT EXISTS thread_allocation (
                thread_id BIGINT,
                cpu_id INTEGER,
                core_id INTEGER,
                thread_local_id INTEGER,
                task_type VARCHAR, -- UI, Compute, IO, Background
                is_active BOOLEAN,
                created_at TIMESTAMP,
                last_active TIMESTAMP,
                PRIMARY KEY (thread_id)
            )
        """)

        # Schema only: no method in this class currently writes this table.
        con.execute("""
            CREATE TABLE IF NOT EXISTS memory_segments (
                segment_id BIGINT,
                start_address BIGINT,
                size BIGINT,
                cpu_id INTEGER,
                core_id INTEGER,
                thread_id INTEGER,
                allocation_type VARCHAR,
                is_shared BOOLEAN,
                created_at TIMESTAMP,
                PRIMARY KEY (segment_id)
            )
        """)

        # Work queue drained by process_instruction_queue().
        con.execute("""
            CREATE TABLE IF NOT EXISTS instruction_queue (
                instruction_id BIGINT,
                cpu_id INTEGER,
                core_id INTEGER,
                thread_id INTEGER,
                instruction JSON,
                priority INTEGER,
                status VARCHAR, -- pending, running, completed, failed
                created_at TIMESTAMP,
                started_at TIMESTAMP,
                completed_at TIMESTAMP,
                PRIMARY KEY (instruction_id)
            )
        """)

        # Indexes matching the access patterns below (state lookups by CPU,
        # active-thread scans, and pending-by-priority queue polls).
        con.execute("CREATE INDEX IF NOT EXISTS idx_cpu_states ON cpu_states(cpu_id, timestamp)")
        con.execute("CREATE INDEX IF NOT EXISTS idx_thread_allocation ON thread_allocation(cpu_id, is_active)")
        con.execute("CREATE INDEX IF NOT EXISTS idx_instruction_queue ON instruction_queue(status, priority)")

        return con

    def allocate_cpu_group(self, start_id: int, end_id: int, task_type: str):
        """Allocate a group of CPUs (inclusive id range) for one task type.

        Inserts 50 cores x 100 threads = 5000 thread_allocation rows per CPU.
        The global thread_id is encoded as
        ``cpu_id*5000 + core_id*100 + thread_local_id``, which is
        collision-free because ``core_id*100 + thread_local_id < 5000``.

        Note: these are plain INSERTs, so re-allocating an already-allocated
        CPU range raises a primary-key conflict.
        """
        self.con.execute("""
            INSERT INTO thread_allocation (
                thread_id, cpu_id, core_id, thread_local_id,
                task_type, is_active, created_at, last_active
            )
            SELECT
                (c.cpu_id * 5000 + co.core_id * 100 + t.thread_id) as thread_id,
                c.cpu_id,
                co.core_id,
                t.thread_id as thread_local_id,
                ?,
                TRUE,
                CURRENT_TIMESTAMP,
                CURRENT_TIMESTAMP
            FROM
                (SELECT generate_series(?, ?) as cpu_id) c
                CROSS JOIN (SELECT generate_series(0, 49) as core_id) co
                CROSS JOIN (SELECT generate_series(0, 99) as thread_id) t
        """, (task_type, start_id, end_id))

        logger.info(f"Allocated CPUs {start_id}-{end_id} for {task_type}")

    async def update_cpu_state(self, cpu_id: int, core_id: int, thread_id: int, state: Dict):
        """Upsert the JSON state snapshot for a specific CPU thread.

        ``last_instruction`` is mirrored out of ``state['instruction_ptr']``
        (defaulting to 0) so it can be queried without parsing the JSON.

        NOTE(review): the DuckDB call is synchronous and will block the event
        loop for its duration, despite the async signature.
        """
        self.con.execute("""
            INSERT INTO cpu_states (cpu_id, core_id, thread_id, state, last_instruction, timestamp)
            VALUES (?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
            ON CONFLICT (cpu_id, core_id, thread_id) DO UPDATE
            SET state = EXCLUDED.state,
                last_instruction = EXCLUDED.last_instruction,
                timestamp = EXCLUDED.timestamp
        """, (cpu_id, core_id, thread_id, json.dumps(state), state.get('instruction_ptr', 0)))

    async def get_cpu_state(self, cpu_id: int, core_id: int, thread_id: int) -> Optional[Dict]:
        """Return the decoded state for a CPU thread, or None if none stored."""
        result = self.con.execute("""
            SELECT state
            FROM cpu_states
            WHERE cpu_id = ? AND core_id = ? AND thread_id = ?
        """, (cpu_id, core_id, thread_id)).fetchone()

        return json.loads(result[0]) if result else None

    async def queue_instruction(self, cpu_id: int, instruction: Dict, priority: int = 0):
        """Queue an instruction on a random active thread of the given CPU.

        The target (core_id, thread_id) is picked with ORDER BY RANDOM()
        from the CPU's active thread_allocation rows; if the CPU has no
        active threads, the INSERT..SELECT inserts nothing (silent no-op).
        Requires the 'instruction_seq' sequence created in initialize_db().
        """
        self.con.execute("""
            INSERT INTO instruction_queue (
                instruction_id, cpu_id, core_id, thread_id,
                instruction, priority, status, created_at
            )
            SELECT
                nextval('instruction_seq'),
                ?,
                core_id,
                thread_id,
                ?,
                ?,
                'pending',
                CURRENT_TIMESTAMP
            FROM thread_allocation
            WHERE cpu_id = ? AND is_active = TRUE
            ORDER BY RANDOM()
            LIMIT 1
        """, (cpu_id, json.dumps(instruction), priority, cpu_id))

    async def process_instruction_queue(self):
        """Drain pending instructions forever (highest priority, oldest first).

        Each instruction is marked 'running', executed, then marked
        'completed' -- or 'failed' if execute_instruction() raises. Sleeps
        briefly when the queue is empty so the loop yields to other tasks.
        """
        while True:
            instructions = self.con.execute("""
                SELECT
                    instruction_id, cpu_id, core_id, thread_id, instruction
                FROM instruction_queue
                WHERE status = 'pending'
                ORDER BY priority DESC, created_at ASC
                LIMIT 1000
            """).fetchall()

            if not instructions:
                await asyncio.sleep(0.1)
                continue

            for inst in instructions:
                instruction_id, cpu_id, core_id, thread_id, instruction = inst

                try:
                    # Claim the row before executing so its state is visible
                    # to observers while it runs.
                    self.con.execute("""
                        UPDATE instruction_queue
                        SET status = 'running', started_at = CURRENT_TIMESTAMP
                        WHERE instruction_id = ?
                    """, (instruction_id,))

                    await self.execute_instruction(cpu_id, core_id, thread_id, json.loads(instruction))

                    self.con.execute("""
                        UPDATE instruction_queue
                        SET status = 'completed', completed_at = CURRENT_TIMESTAMP
                        WHERE instruction_id = ?
                    """, (instruction_id,))

                except Exception as e:
                    # Record the failure but keep draining the batch.
                    logger.error(f"Error processing instruction {instruction_id}: {e}")
                    self.con.execute("""
                        UPDATE instruction_queue
                        SET status = 'failed', completed_at = CURRENT_TIMESTAMP
                        WHERE instruction_id = ?
                    """, (instruction_id,))

            # Small pause between batches to avoid starving other tasks.
            await asyncio.sleep(0.01)

    async def execute_instruction(self, cpu_id: int, core_id: int, thread_id: int, instruction: Dict):
        """Execute a single instruction (placeholder).

        Currently just advances the thread's instruction pointer and records
        the instruction in its state snapshot.
        """
        state = await self.get_cpu_state(cpu_id, core_id, thread_id) or {}

        state['instruction_ptr'] = state.get('instruction_ptr', 0) + 1
        state['last_instruction'] = instruction

        await self.update_cpu_state(cpu_id, core_id, thread_id, state)

    def initialize_cpu_grid(self):
        """Initialize the complete CPU grid with task assignments.

        Four groups of 500 CPUs each; at 5000 threads per CPU this writes
        10 million thread_allocation rows in total.
        """
        self.allocate_cpu_group(0, 499, "UI")

        self.allocate_cpu_group(500, 999, "COMPUTE")

        self.allocate_cpu_group(1000, 1499, "IO")

        self.allocate_cpu_group(1500, 1999, "BACKGROUND")

        logger.info("CPU grid initialized with all task groups")
|
|
|
|
|
|
async def main():
    """Smoke-test the CPU state manager end to end."""
    manager = CPUStateManager()
    manager.initialize_cpu_grid()

    # Bug fix: keep a reference to the task. The event loop holds only weak
    # references to tasks, so an unreferenced background task can be
    # garbage-collected before (or while) it runs.
    worker = asyncio.create_task(manager.process_instruction_queue())

    # Enqueue one instruction into the UI group and one into COMPUTE.
    await manager.queue_instruction(0, {"type": "UI_UPDATE", "component": "window1"}, priority=1)
    await manager.queue_instruction(500, {"type": "MATRIX_MULTIPLY", "size": 1000}, priority=2)

    # Run forever; the worker task drains the queue in the background.
    while True:
        await asyncio.sleep(1)


if __name__ == "__main__":
    asyncio.run(main())
|
|
|
|