# Source path: INV/cpu/db_example.py
# Uploaded by Fred808 -- commit message "Upload 256 files" (7a0c684, verified)
"""
Database Example Script for Direct HuggingFace DuckDB Connection
"""
import duckdb
import numpy as np
from datetime import datetime
import os
# Hugging Face access token, read from the HF_TOKEN environment variable
# (None when the variable is not set).
HF_TOKEN = os.environ.get("HF_TOKEN")

# Location string that connect_to_db() hands to duckdb.connect().
DB_URL = "hf://datasets/Fred808/helium/storage.json"
def connect_to_db():
    """Open a DuckDB connection to ``DB_URL``.

    Returns:
        The connection object returned by ``duckdb.connect``, or ``None``
        when connecting (or reporting the success) raised an exception.
        The exception is printed rather than propagated.
    """
    handle = None
    try:
        # The URL is passed straight to DuckDB as the database location.
        handle = duckdb.connect(DB_URL)
        print(f"Connected to HuggingFace database: {DB_URL}")
    except Exception as exc:
        print(f"Error connecting to database: {exc}")
        return None
    return handle
def test_db_connection():
    """Connect to the database, ensure the schema exists, and print summaries.

    Steps, in order:

    1. List the tables that already exist.
    2. Create ``vram_blocks`` and ``memory_mappings`` if they are missing.
    3. Print the number of rows in ``vram_blocks``.
    4. Print every memory mapping joined to its block.
    5. Print the five largest blocks by ``size_bytes``.

    Returns:
        None. Everything is reported on stdout. If ``connect_to_db()``
        returns ``None`` the function returns without doing anything else.
        Exceptions raised while querying are caught and printed as
        ``Error: ...``; the connection is closed in every case.
    """
    conn = None
    try:
        conn = connect_to_db()
        if conn is None:
            return

        # Step 1: show what is already in the database.
        print("\nQuerying existing tables...")
        tables = conn.execute("SHOW TABLES").fetchall()
        if tables:
            print("Existing tables:")
            for table in tables:
                print(f"- {table[0]}")
        else:
            print("No existing tables found")

        # Step 2: create the schema; IF NOT EXISTS keeps this repeatable.
        print("\nCreating tables...")
        conn.execute("""
            CREATE TABLE IF NOT EXISTS vram_blocks (
                block_id VARCHAR PRIMARY KEY,
                size_bytes BIGINT,
                allocation_time TIMESTAMP,
                data BLOB,
                metadata JSON,
                status VARCHAR
            )
        """)
        conn.execute("""
            CREATE TABLE IF NOT EXISTS memory_mappings (
                virtual_address BIGINT PRIMARY KEY,
                block_id VARCHAR,
                mapping_time TIMESTAMP,
                FOREIGN KEY (block_id) REFERENCES vram_blocks(block_id)
            )
        """)
        print("Tables created successfully")

        print("\nRunning test queries...")

        # Query 1: count blocks.
        count = conn.execute("SELECT COUNT(*) FROM vram_blocks").fetchone()[0]
        print(f"\nTotal VRAM blocks: {count}")

        # Query 2: memory mappings joined to the block they point at.
        mappings = conn.execute("""
            SELECT
                m.virtual_address,
                m.block_id,
                b.size_bytes,
                b.status
            FROM memory_mappings m
            JOIN vram_blocks b ON m.block_id = b.block_id
        """).fetchall()
        if mappings:
            print("\nMemory Mappings:")
            for virtual_address, block_id, size_bytes, status in mappings:
                print(f"Virtual Address: 0x{virtual_address:X}")
                print(f"Block ID: {block_id}")
                print(f"Block Size: {size_bytes} bytes")
                print(f"Block Status: {status}")
        else:
            print("\nNo memory mappings found")

        # Query 3: the five largest blocks.
        largest_blocks = conn.execute("""
            SELECT
                block_id,
                size_bytes,
                status,
                allocation_time
            FROM vram_blocks
            ORDER BY size_bytes DESC
            LIMIT 5
        """).fetchall()
        if largest_blocks:
            print("\nLargest VRAM blocks:")
            for block_id, size_bytes, status, allocation_time in largest_blocks:
                print(f"Block ID: {block_id}")
                print(f"Size: {size_bytes} bytes")
                print(f"Status: {status}")
                print(f"Allocated: {allocation_time}")
                print()
        else:
            print("\nNo VRAM blocks found")
    except Exception as e:
        print(f"Error: {e}")
    finally:
        # Close on every exit path instead of probing locals() in the
        # exception handler; conn is None only when connecting failed.
        if conn is not None:
            conn.close()
            print("\nDatabase connection closed")
# Run the checks only when the file is executed as a script.
# NOTE(review): at this point test_db_connection is the definition above;
# the file defines connect_to_db and test_db_connection again further down.
if __name__ == "__main__":
    test_db_connection()
def connect_to_db():
    """Open a DuckDB connection to ``DB_URL``.

    Returns:
        The connection object returned by ``duckdb.connect``, or ``None``
        when connecting (or reporting the success) raised an exception.
        The exception is printed rather than propagated.
    """
    connection = None
    try:
        # The URL is passed straight to DuckDB as the database location.
        connection = duckdb.connect(DB_URL)
        print(f"Connected to HuggingFace database: {DB_URL}")
    except Exception as err:
        print(f"Error connecting to database: {err}")
        return None
    return connection
def test_db_connection():
    """Exercise the database end to end with one generated test block.

    Steps, in order:

    1. Create ``vram_blocks`` and ``memory_mappings`` if they are missing.
    2. Insert a random 100x100 float32 array as block ``test_block_1``.
    3. Print the block's columns, including two fields read from its JSON
       metadata.
    4. Read the BLOB back and rebuild the numpy array from it.
    5. Print both table schemas.
    6. Insert a mapping for address ``0x1000000`` and print all mappings
       joined to their blocks.

    Both inserts use ``INSERT OR IGNORE`` so that re-running against a
    database that already holds the test rows does not fail on the primary
    key; in that case the previously stored rows are the ones printed.

    Returns:
        None. Everything is reported on stdout. If ``connect_to_db()``
        returns ``None`` the function returns without doing anything else.
        Exceptions raised along the way are caught and printed as
        ``Error: ...``; the connection is closed in every case.
    """
    conn = None
    try:
        conn = connect_to_db()
        if conn is None:
            return

        # Step 1: schema. IF NOT EXISTS keeps this repeatable.
        conn.execute("""
            CREATE TABLE IF NOT EXISTS vram_blocks (
                block_id VARCHAR PRIMARY KEY,
                size_bytes BIGINT,
                allocation_time TIMESTAMP,
                data BLOB,
                metadata JSON,
                status VARCHAR
            )
        """)
        conn.execute("""
            CREATE TABLE IF NOT EXISTS memory_mappings (
                virtual_address BIGINT PRIMARY KEY,
                block_id VARCHAR,
                mapping_time TIMESTAMP,
                FOREIGN KEY (block_id) REFERENCES vram_blocks(block_id)
            )
        """)

        # Step 2: build and store the test block.
        test_data = np.random.rand(100, 100).astype(np.float32)
        metadata = {
            "shape": [100, 100],
            "dtype": "float32",
            "description": "Test array",
        }
        print("\nInserting test data...")
        conn.execute("""
            INSERT OR IGNORE INTO vram_blocks (
                block_id,
                size_bytes,
                allocation_time,
                data,
                metadata,
                status
            )
            VALUES (?, ?, ?, ?, ?, ?)
        """, [
            "test_block_1",
            test_data.nbytes,
            datetime.now(),
            test_data.tobytes(),  # raw array bytes go into the BLOB column
            metadata,
            "allocated",
        ])

        print("\nRunning test queries...")

        # Query 1: block columns plus two fields pulled out of the metadata.
        print("\nQuery 1: Block Information")
        result = conn.execute("""
            SELECT
                block_id,
                size_bytes,
                allocation_time,
                metadata->>'shape' as array_shape,
                metadata->>'dtype' as data_type,
                status
            FROM vram_blocks
            WHERE block_id = 'test_block_1'
        """).fetchall()
        for row in result:
            print(f"Block ID: {row[0]}")
            print(f"Size: {row[1]} bytes")
            print(f"Allocated: {row[2]}")
            print(f"Array Shape: {row[3]}")
            print(f"Data Type: {row[4]}")
            print(f"Status: {row[5]}")

        # Query 2: read the BLOB back and rebuild the array from its bytes.
        print("\nQuery 2: Data Retrieval Test")
        binary_result = conn.execute("""
            SELECT data
            FROM vram_blocks
            WHERE block_id = 'test_block_1'
        """).fetchone()
        if binary_result:
            restored_array = np.frombuffer(
                binary_result[0], dtype=test_data.dtype
            ).reshape(test_data.shape)
            print("Successfully restored numpy array:")
            print(f"Shape: {restored_array.shape}")
            print(f"Data type: {restored_array.dtype}")
            print(f"Sample values:\n{restored_array[:2, :2]}")

        # Query 3: column names and types of both tables.
        print("\nQuery 3: Database Schema")
        print("\nvram_blocks table schema:")
        for col in conn.execute("DESCRIBE vram_blocks").fetchall():
            print(f"Column: {col[0]}, Type: {col[1]}")
        print("\nmemory_mappings table schema:")
        for col in conn.execute("DESCRIBE memory_mappings").fetchall():
            print(f"Column: {col[0]}, Type: {col[1]}")

        # Query 4: map an example address to the test block and list mappings.
        print("\nQuery 4: Testing Memory Mapping")
        conn.execute("""
            INSERT OR IGNORE INTO memory_mappings (
                virtual_address,
                block_id,
                mapping_time
            )
            VALUES (?, ?, ?)
        """, [
            0x1000000,  # Example virtual address
            "test_block_1",
            datetime.now(),
        ])
        mappings = conn.execute("""
            SELECT
                m.virtual_address,
                m.block_id,
                b.size_bytes,
                b.status
            FROM memory_mappings m
            JOIN vram_blocks b ON m.block_id = b.block_id
        """).fetchall()
        print("\nMemory Mappings:")
        for virtual_address, block_id, size_bytes, status in mappings:
            print(f"Virtual Address: 0x{virtual_address:X}")
            print(f"Block ID: {block_id}")
            print(f"Block Size: {size_bytes} bytes")
            print(f"Block Status: {status}")
    except Exception as e:
        print(f"Error: {e}")
    finally:
        # Close on every exit path instead of probing locals() in the
        # exception handler; conn is None only when connecting failed.
        if conn is not None:
            conn.close()
            print("\nDatabase connection closed")
# Script entry point: run the database checks when executed directly.
if __name__ == "__main__":
    test_db_connection()