# CapStoneRAG10 / rebuild_sqlite_direct.py
# Initial commit for HuggingFace Spaces - RAG Capstone Project with Qdrant Cloud
# (commit 1d10b0a, author: Developer)
"""Direct SQLite rebuild - populate collections table from existing folders."""
import os
import sys
import sqlite3
import uuid as uuid_lib
import json
import pickle
from datetime import datetime
def extract_collection_name(collection_path, collection_id):
    """Best-effort recovery of a collection's human-readable name.

    Tries multiple approaches, in order:
    1. Read from metadata.json
    2. Parse ChromaDB index_metadata.pickle
    3. Query from segments table (done later by the caller)
    4. Generate a name from the UUID as a last resort

    Args:
        collection_path: Filesystem path of the collection folder.
        collection_id: Collection UUID string; used for the fallback name.

    Returns:
        str: The recovered name, or ``collection_<first-8-uuid-chars>``
        if no metadata source yields one.
    """
    # 1) metadata.json -- cheapest and most explicit source.
    metadata_file = os.path.join(collection_path, "metadata.json")
    if os.path.exists(metadata_file):
        try:
            # Explicit encoding avoids platform-dependent default codecs.
            with open(metadata_file, 'r', encoding='utf-8') as f:
                metadata = json.load(f)
            if isinstance(metadata, dict) and 'name' in metadata:
                name = metadata['name']
                print(f" 📖 Found in metadata.json: {name}")
                return name
        except Exception as e:
            # Best-effort: a corrupt/unreadable file just moves us to the
            # next strategy rather than aborting the rebuild.
            print(f" ⚠️ Error reading metadata.json: {e}")

    # 2) ChromaDB's index_metadata.pickle.
    # SECURITY NOTE: pickle.load executes arbitrary code from the file.
    # Acceptable here only because the file is produced locally by ChromaDB;
    # never point this at untrusted data.
    index_file = os.path.join(collection_path, "index_metadata.pickle")
    if os.path.exists(index_file):
        try:
            with open(index_file, 'rb') as f:
                index_data = pickle.load(f)
            # Check if it contains a collection name.
            if isinstance(index_data, dict):
                if 'name' in index_data:
                    name = index_data['name']
                    print(f" 📖 Found in pickle: {name}")
                    return name
                # Log a few available keys to aid debugging when 'name'
                # is absent from the pickle payload.
                keys = list(index_data.keys())[:5]
                print(f" 📋 Pickle keys: {keys}")
        except Exception as e:
            print(f" ⚠️ Error reading pickle: {e}")

    # 4) Last resort: derive a stable name from the UUID prefix.
    generated_name = f"collection_{collection_id[:8]}"
    print(f" 📝 Using generated name: {generated_name}")
    return generated_name
def rebuild_sqlite_directly(chroma_path="./chroma_db"):
    """Directly rebuild the SQLite ``collections`` table from collection folders.

    This function:
    1. Scans ``chroma_path`` for UUID-named collection folders
    2. Inserts matching entries into the SQLite ``collections`` table
    3. Verifies the rows are present so ChromaDB can find them

    Args:
        chroma_path: Root directory of the ChromaDB store. Defaults to
            ``"./chroma_db"`` (the previously hard-coded location), so
            existing callers are unaffected.

    Returns:
        bool: ``True`` on success (including "nothing to do"), ``False``
        on any failure.
    """
    print("\n" + "=" * 80)
    print("🔧 Direct SQLite Collections Table Rebuild")
    print("=" * 80)
    sqlite_path = os.path.join(chroma_path, "chroma.sqlite3")

    # Step 1: Find all collection folders
    print("\n📁 Step 1: Scanning collection folders...")
    print("-" * 80)
    collections = []
    try:
        for item in os.listdir(chroma_path):
            item_path = os.path.join(chroma_path, item)
            # Heuristic UUID check: 36 chars containing exactly 4 dashes
            # (the canonical 8-4-4-4-12 layout ChromaDB uses for folders).
            if os.path.isdir(item_path) and len(item) == 36 and item.count('-') == 4:
                print(f"\n✅ Found collection: {item}")
                # Extract the actual collection name
                collection_name = extract_collection_name(item_path, item)
                collections.append({
                    'uuid': item,
                    'name': collection_name,
                    'path': item_path,
                    'files': os.listdir(item_path)
                })
    except Exception as e:
        print(f"❌ Error scanning: {e}")
        return False

    if not collections:
        # Nothing to rebuild is not an error.
        print("⚠️ No collections found")
        return True
    print(f"\n✅ Total collections: {len(collections)}")

    # Step 2: Connect to SQLite
    print("\n💾 Step 2: Connecting to SQLite...")
    print("-" * 80)
    if not os.path.exists(sqlite_path):
        print("❌ ERROR: chroma.sqlite3 not found!")
        print(" Run 'streamlit run streamlit_app.py' first to create it")
        return False
    try:
        conn = sqlite3.connect(sqlite_path)
        cursor = conn.cursor()
        print("✅ Connected to SQLite")
    except Exception as e:
        print(f"❌ Error connecting to SQLite: {e}")
        return False

    # Step 3: Check collections table schema (informational: confirms the
    # table exists and shows its columns before we write to it).
    print("\n🔍 Step 3: Checking collections table schema...")
    print("-" * 80)
    try:
        cursor.execute("PRAGMA table_info(collections)")
        columns = cursor.fetchall()
        print("✅ Collections table schema:")
        for col in columns:
            col_id, col_name, col_type, not_null, default, pk = col
            print(f" • {col_name} ({col_type})")
    except Exception as e:
        print(f"❌ Error reading schema: {e}")
        conn.close()
        return False

    # Step 4: Get database and tenant IDs (collections rows must reference
    # a database, which in turn references a tenant).
    print("\n🔐 Step 4: Getting database and tenant IDs...")
    print("-" * 80)
    try:
        # Get default tenant
        cursor.execute("SELECT id FROM tenants LIMIT 1")
        tenant_result = cursor.fetchone()
        if tenant_result:
            tenant_id = tenant_result[0]
            print(f"✅ Tenant ID: {tenant_id}")
        else:
            # Create default tenant
            tenant_id = str(uuid_lib.uuid4())
            cursor.execute("INSERT INTO tenants (id, name) VALUES (?, ?)",
                           (tenant_id, 'default'))
            conn.commit()
            print(f"✅ Created Tenant ID: {tenant_id}")
        # Get default database
        cursor.execute("SELECT id FROM databases WHERE tenant_id = ? LIMIT 1", (tenant_id,))
        db_result = cursor.fetchone()
        if db_result:
            db_id = db_result[0]
            print(f"✅ Database ID: {db_id}")
        else:
            # Create default database
            db_id = str(uuid_lib.uuid4())
            cursor.execute(
                "INSERT INTO databases (id, name, tenant_id) VALUES (?, ?, ?)",
                (db_id, 'default', tenant_id)
            )
            conn.commit()
            print(f"✅ Created Database ID: {db_id}")
    except Exception as e:
        print(f"❌ Error getting IDs: {e}")
        conn.close()
        return False

    # Step 4b: Try to extract collection names from the segments table;
    # names found here are more authoritative than folder-level metadata.
    print("\n🔍 Step 4b: Checking for collection names in segments table...")
    print("-" * 80)
    segment_names = {}
    try:
        # Placeholders are generated to match the number of collections
        # (parameterized -- no values are interpolated into the SQL).
        cursor.execute("""
            SELECT DISTINCT collection_id, metadata
            FROM segment_metadata
            WHERE collection_id IN ({})
        """.format(','.join(['?' for _ in collections])),
            [c['uuid'] for c in collections])
        for collection_id, metadata_str in cursor.fetchall():
            try:
                if metadata_str:
                    metadata = json.loads(metadata_str)
                    if isinstance(metadata, dict) and 'name' in metadata:
                        segment_names[collection_id] = metadata['name']
                        print(f"✅ Found name in segments: {metadata['name']}")
            except (json.JSONDecodeError, TypeError):
                # Fixed: was a bare `except:` that swallowed everything
                # (including KeyboardInterrupt). Only malformed metadata
                # should be skipped here.
                pass
    except Exception as e:
        # The segment_metadata table/columns may not exist in this schema
        # version; names from Step 1 remain in effect.
        print(f"ℹ️ Could not query segments: {e}")

    # Update collection names from segments if found
    for collection in collections:
        if collection['uuid'] in segment_names:
            collection['name'] = segment_names[collection['uuid']]
            print(f" Updated {collection['uuid'][:8]} -> {collection['name']}")

    # Step 5: Insert collection records
    print("\n📝 Step 5: Inserting collection records into SQLite...")
    print("-" * 80)
    inserted_count = 0
    for collection in collections:
        collection_id = collection['uuid']
        collection_name = collection['name']  # Use extracted/generated name
        try:
            # OR REPLACE makes this idempotent across reruns.
            cursor.execute("""
                INSERT OR REPLACE INTO collections
                (id, name, database_id)
                VALUES (?, ?, ?)
            """, (collection_id, collection_name, db_id))
            inserted_count += 1
            print(f"✅ Inserted: {collection_name}")
            print(f" ID: {collection_id}")
        except Exception as e:
            # A single bad row should not abort the whole rebuild.
            print(f"⚠️ Could not insert {collection_id}: {e}")

    # Step 6: Commit changes
    print("\n💾 Step 6: Committing changes to SQLite...")
    print("-" * 80)
    try:
        conn.commit()
        print(f"✅ Committed {inserted_count} collection record(s)")
    except Exception as e:
        print(f"❌ Error committing: {e}")
        conn.close()
        return False

    # Step 7: Verify the rows actually landed.
    print("\n✅ Step 7: Verifying collections in SQLite...")
    print("-" * 80)
    try:
        cursor.execute("SELECT id, name FROM collections")
        verified = cursor.fetchall()
        print(f"✅ Collections in SQLite: {len(verified)}")
        for collection_id, name in verified:
            print(f" • {name}")
        conn.close()
        return len(verified) > 0
    except Exception as e:
        print(f"❌ Error verifying: {e}")
        conn.close()
        return False
def main():
    """CLI entry point: run the rebuild, report the outcome, set exit code."""
    try:
        ok = rebuild_sqlite_directly()
        print("\n" + "=" * 80)
        # Select the outcome banner and the matching exit status up front,
        # then emit the lines in one pass.
        if ok:
            status = 0
            report = (
                "✅ SQLITE REBUILD COMPLETE!",
                "\n📝 Next steps:",
                " 1. Restart Streamlit: streamlit run streamlit_app.py",
                " 2. Check 'Existing Collections' dropdown",
                " 3. Your collections should now appear",
                " 4. Load a collection and verify data is intact",
            )
        else:
            status = 1
            report = (
                "❌ SQLITE REBUILD FAILED",
                "\n💡 Try manual approach:",
                " 1. Delete chroma.sqlite3",
                " 2. Restart Streamlit to create fresh database",
                " 3. Manually re-upload datasets to recreate collections",
            )
        for line in report:
            print(line)
        print("=" * 80 + "\n")
        # SystemExit is not an Exception subclass, so this escapes the
        # handler below -- exactly as in the original control flow.
        sys.exit(status)
    except Exception as e:
        print(f"\n❌ FATAL ERROR: {e}")
        import traceback
        traceback.print_exc()
        sys.exit(1)


if __name__ == "__main__":
    main()