""" Database Example Script for Direct HuggingFace DuckDB Connection """ import duckdb import numpy as np from datetime import datetime import os # Initialize HuggingFace token from environment HF_TOKEN = os.getenv("HF_TOKEN") DB_URL = "hf://datasets/Fred808/helium/storage.json" def connect_to_db(): """Connect directly to the HuggingFace DuckDB database""" try: # Connect directly to HuggingFace URL conn = duckdb.connect(DB_URL) print(f"Connected to HuggingFace database: {DB_URL}") return conn except Exception as e: print(f"Error connecting to database: {e}") return None def test_db_connection(): """Test connecting to the database and performing basic operations""" try: # Connect directly to HuggingFace database conn = connect_to_db() if not conn: return # Query existing tables print("\nQuerying existing tables...") tables = conn.execute("SHOW TABLES").fetchall() if tables: print("Existing tables:") for table in tables: print(f"- {table[0]}") else: print("No existing tables found") # Create tables if they don't exist print("\nCreating tables...") conn.execute(""" CREATE TABLE IF NOT EXISTS vram_blocks ( block_id VARCHAR PRIMARY KEY, size_bytes BIGINT, allocation_time TIMESTAMP, data BLOB, metadata JSON, status VARCHAR ) """) conn.execute(""" CREATE TABLE IF NOT EXISTS memory_mappings ( virtual_address BIGINT PRIMARY KEY, block_id VARCHAR, mapping_time TIMESTAMP, FOREIGN KEY (block_id) REFERENCES vram_blocks(block_id) ) """) print("Tables created successfully") # Run some test queries print("\nRunning test queries...") # Query 1: Count blocks count = conn.execute("SELECT COUNT(*) FROM vram_blocks").fetchone()[0] print(f"\nTotal VRAM blocks: {count}") # Query 2: Show memory mappings mappings = conn.execute(""" SELECT m.virtual_address, m.block_id, b.size_bytes, b.status FROM memory_mappings m JOIN vram_blocks b ON m.block_id = b.block_id """).fetchall() if mappings: print("\nMemory Mappings:") for mapping in mappings: print(f"Virtual Address: 0x{mapping[0]:X}") print(f"Block ID: {mapping[1]}") print(f"Block Size: {mapping[2]} bytes") print(f"Block Status: {mapping[3]}") else: print("\nNo memory mappings found") # Query 3: Get largest blocks largest_blocks = conn.execute(""" SELECT block_id, size_bytes, status, allocation_time FROM vram_blocks ORDER BY size_bytes DESC LIMIT 5 """).fetchall() if largest_blocks: print("\nLargest VRAM blocks:") for block in largest_blocks: print(f"Block ID: {block[0]}") print(f"Size: {block[1]} bytes") print(f"Status: {block[2]}") print(f"Allocated: {block[3]}") print() else: print("\nNo VRAM blocks found") # Cleanup conn.close() print("\nDatabase connection closed") except Exception as e: print(f"Error: {str(e)}") if 'conn' in locals(): conn.close() if __name__ == "__main__": test_db_connection() def connect_to_db(): """Connect directly to the HuggingFace DuckDB database""" try: # Connect directly to HuggingFace URL conn = duckdb.connect(DB_URL) print(f"Connected to HuggingFace database: {DB_URL}") return conn except Exception as e: print(f"Error connecting to database: {e}") return None def test_db_connection(): """Test connecting to the database and performing basic operations""" try: # Connect directly to HuggingFace database conn = connect_to_db() if not conn: return # Create a test table for VRAM blocks conn.execute(""" CREATE TABLE IF NOT EXISTS vram_blocks ( block_id VARCHAR PRIMARY KEY, size_bytes BIGINT, allocation_time TIMESTAMP, data BLOB, metadata JSON, status VARCHAR ) """) # Create a test table for memory mappings conn.execute(""" CREATE TABLE IF NOT EXISTS memory_mappings ( virtual_address BIGINT PRIMARY KEY, block_id VARCHAR, mapping_time TIMESTAMP, FOREIGN KEY (block_id) REFERENCES vram_blocks(block_id) ) """) # Insert some test data test_data = np.random.rand(100, 100).astype(np.float32) metadata = { "shape": [100, 100], "dtype": "float32", "description": "Test array" } print("\nInserting test data...") conn.execute(""" INSERT INTO vram_blocks ( block_id, size_bytes, allocation_time, data, metadata, status ) VALUES (?, ?, ?, ?, ?, ?) """, [ "test_block_1", test_data.nbytes, datetime.now(), test_data.tobytes(), metadata, "allocated" ]) # Test some queries print("\nRunning test queries...") # Query 1: Get block info print("\nQuery 1: Block Information") result = conn.execute(""" SELECT block_id, size_bytes, allocation_time, metadata->>'shape' as array_shape, metadata->>'dtype' as data_type, status FROM vram_blocks WHERE block_id = 'test_block_1' """).fetchall() for row in result: print(f"Block ID: {row[0]}") print(f"Size: {row[1]} bytes") print(f"Allocated: {row[2]}") print(f"Array Shape: {row[3]}") print(f"Data Type: {row[4]}") print(f"Status: {row[5]}") # Query 2: Test retrieving and reconstructing numpy array print("\nQuery 2: Data Retrieval Test") binary_result = conn.execute(""" SELECT data FROM vram_blocks WHERE block_id = 'test_block_1' """).fetchone() if binary_result: restored_array = np.frombuffer(binary_result[0], dtype=np.float32).reshape(100, 100) print("Successfully restored numpy array:") print(f"Shape: {restored_array.shape}") print(f"Data type: {restored_array.dtype}") print(f"Sample values:\n{restored_array[:2, :2]}") # Query 3: Show table schemas print("\nQuery 3: Database Schema") print("\nvram_blocks table schema:") schema = conn.execute("DESCRIBE vram_blocks").fetchall() for col in schema: print(f"Column: {col[0]}, Type: {col[1]}") print("\nmemory_mappings table schema:") schema = conn.execute("DESCRIBE memory_mappings").fetchall() for col in schema: print(f"Column: {col[0]}, Type: {col[1]}") # Query 4: Test memory mapping print("\nQuery 4: Testing Memory Mapping") conn.execute(""" INSERT INTO memory_mappings ( virtual_address, block_id, mapping_time ) VALUES (?, ?, ?) """, [ 0x1000000, # Example virtual address "test_block_1", datetime.now() ]) mappings = conn.execute(""" SELECT m.virtual_address, m.block_id, b.size_bytes, b.status FROM memory_mappings m JOIN vram_blocks b ON m.block_id = b.block_id """).fetchall() print("\nMemory Mappings:") for mapping in mappings: print(f"Virtual Address: 0x{mapping[0]:X}") print(f"Block ID: {mapping[1]}") print(f"Block Size: {mapping[2]} bytes") print(f"Block Status: {mapping[3]}") # Cleanup conn.close() print("\nDatabase connection closed") except Exception as e: print(f"Error: {str(e)}") if 'conn' in locals(): conn.close() if __name__ == "__main__": test_db_connection()