# Source: knowledge-assistant-backend/scripts/verify-data-persistence.py
# Commit: edd9bd7 — "Add deployment changes for hosting" (AnuragShirke)
# NOTE(review): these lines are repo-browser residue converted to comments so the
# file parses; they should be removed entirely so the shebang below is line 1.
#!/usr/bin/env python3
"""
Data Persistence Verification Script
This script verifies data persistence across the Knowledge Assistant RAG application,
including database integrity, vector store consistency, and backup validation.
"""
import os
import sys
import json
import sqlite3
import asyncio
import logging
import argparse
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Any, Optional
# Add the src directory to the Python path so the `core.*` imports below resolve
# when this script is executed directly (it lives in scripts/, next to ../src).
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'src'))
from core.database import User, DocumentMetadata
from core.vector_store import get_qdrant_client
from core.backup import backup_manager
# Configure logging for the whole script: INFO by default, bumped to DEBUG
# by the --verbose flag in main().
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
# Module-level logger used by DataPersistenceVerifier and main().
logger = logging.getLogger(__name__)
class DataPersistenceVerifier:
    """Comprehensive data persistence verification system.

    Bundles four independent health checks — SQLite integrity, Qdrant vector
    store consistency, backup integrity, and file-system checks. Every check
    returns a result dict with the keys:

    * ``test``    -- check identifier
    * ``status``  -- "passed", "warning" or "failed" ("unknown" before run)
    * ``details`` -- check-specific metrics
    * ``errors``  -- human-readable problem descriptions
    """

    def __init__(self, database_path: str = "knowledge_assistant.db"):
        """Create a verifier for the given SQLite database file."""
        self.database_path = database_path
        # Scratch space for callers that want to keep results on the instance;
        # the verification methods themselves return their results directly.
        self.verification_results: Dict[str, Any] = {}

    def verify_database_integrity(self) -> Dict[str, Any]:
        """Verify SQLite database integrity.

        Runs ``PRAGMA integrity_check``, confirms the expected tables exist,
        and looks for orphaned documents and duplicate file hashes.

        Returns:
            Result dict (see class docstring). Never raises.
        """
        logger.info("Verifying database integrity...")
        result: Dict[str, Any] = {
            "test": "database_integrity",
            "status": "unknown",
            "details": {},
            "errors": []
        }
        conn: Optional[sqlite3.Connection] = None
        try:
            if not os.path.exists(self.database_path):
                result["status"] = "failed"
                result["errors"].append(f"Database file not found: {self.database_path}")
                return result
            conn = sqlite3.connect(self.database_path)
            cursor = conn.cursor()
            # Low-level corruption check; SQLite returns the single row "ok"
            # when the file is healthy.
            cursor.execute("PRAGMA integrity_check")
            integrity_result = cursor.fetchone()[0]
            if integrity_result == "ok":
                result["details"]["integrity_check"] = "passed"
            else:
                result["status"] = "failed"
                result["errors"].append(f"Database integrity check failed: {integrity_result}")
                return result
            # Schema check: all expected tables must be present.
            cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
            tables = [row[0] for row in cursor.fetchall()]
            expected_tables = ["users", "documents", "alembic_version"]
            missing_tables = [table for table in expected_tables if table not in tables]
            if missing_tables:
                result["status"] = "failed"
                result["errors"].append(f"Missing tables: {missing_tables}")
                return result
            result["details"]["tables"] = tables
            # Basic record counts.
            cursor.execute("SELECT COUNT(*) FROM users")
            result["details"]["user_count"] = cursor.fetchone()[0]
            cursor.execute("SELECT COUNT(*) FROM documents")
            result["details"]["document_count"] = cursor.fetchone()[0]
            # Referential check: documents whose user_id matches no user row.
            cursor.execute("""
                SELECT COUNT(*) FROM documents d
                LEFT JOIN users u ON d.user_id = u.id
                WHERE u.id IS NULL
            """)
            orphaned_docs = cursor.fetchone()[0]
            if orphaned_docs > 0:
                result["errors"].append(f"Found {orphaned_docs} orphaned documents")
            result["details"]["orphaned_documents"] = orphaned_docs
            # Deduplication check: the same file_hash stored more than once.
            cursor.execute("""
                SELECT file_hash, COUNT(*) as count
                FROM documents
                WHERE file_hash IS NOT NULL
                GROUP BY file_hash
                HAVING COUNT(*) > 1
            """)
            duplicate_hashes = cursor.fetchall()
            result["details"]["duplicate_file_hashes"] = len(duplicate_hashes)
            if duplicate_hashes:
                result["errors"].append(f"Found {len(duplicate_hashes)} duplicate file hashes")
            result["status"] = "passed" if not result["errors"] else "warning"
            logger.info(f"Database integrity check: {result['status']}")
        except Exception as e:
            result["status"] = "failed"
            result["errors"].append(f"Database verification failed: {str(e)}")
            logger.error(f"Database verification error: {str(e)}")
        finally:
            # BUGFIX: the original only closed the connection on the happy and
            # early-return paths, leaking it when an exception was raised
            # mid-check; always close here.
            if conn is not None:
                conn.close()
        return result

    async def verify_vector_store_consistency(self) -> Dict[str, Any]:
        """Verify Qdrant vector store consistency.

        Inspects every collection, samples points to validate payload/vector
        structure, and cross-references the number of per-user collections
        against the users that own documents in the database.

        Returns:
            Result dict (see class docstring). Never raises.
        """
        logger.info("Verifying vector store consistency...")
        result: Dict[str, Any] = {
            "test": "vector_store_consistency",
            "status": "unknown",
            "details": {},
            "errors": []
        }
        try:
            client = get_qdrant_client()
            collections = client.get_collections().collections
            result["details"]["total_collections"] = len(collections)
            result["details"]["collections"] = []
            total_points = 0
            for collection in collections:
                collection_name = collection.name
                try:
                    collection_info = client.get_collection(collection_name)
                    points_count = collection_info.points_count
                    # Pull a small sample so we can sanity-check point structure.
                    sample_points, _ = client.scroll(
                        collection_name=collection_name,
                        limit=10,
                        with_payload=True,
                        with_vectors=True
                    )
                    collection_details = {
                        "name": collection_name,
                        "points_count": points_count,
                        "sample_points": len(sample_points),
                        "status": "healthy"
                    }
                    if sample_points:
                        first_point = sample_points[0]
                        if not hasattr(first_point, 'payload') or not first_point.payload:
                            collection_details["status"] = "warning"
                            result["errors"].append(f"Collection {collection_name}: Points missing payload")
                        if not hasattr(first_point, 'vector') or not first_point.vector:
                            collection_details["status"] = "warning"
                            result["errors"].append(f"Collection {collection_name}: Points missing vectors")
                        # Per-user collections must carry the payload fields the
                        # RAG pipeline relies on.
                        if collection_name.startswith("user_") and first_point.payload:
                            required_fields = ["text", "source", "user_id"]
                            missing_fields = [field for field in required_fields if field not in first_point.payload]
                            if missing_fields:
                                collection_details["status"] = "warning"
                                result["errors"].append(f"Collection {collection_name}: Missing payload fields: {missing_fields}")
                    result["details"]["collections"].append(collection_details)
                    total_points += points_count
                except Exception as e:
                    # Record per-collection failures but keep checking the rest.
                    result["details"]["collections"].append({
                        "name": collection_name,
                        "status": "error",
                        "error": str(e)
                    })
                    result["errors"].append(f"Collection {collection_name}: {str(e)}")
            result["details"]["total_points"] = total_points
            # Cross-reference: every user with documents should have a matching
            # "user_*" collection in Qdrant.
            if os.path.exists(self.database_path):
                conn = sqlite3.connect(self.database_path)
                try:
                    cursor = conn.cursor()
                    cursor.execute("SELECT COUNT(DISTINCT user_id) FROM documents")
                    db_users_with_docs = cursor.fetchone()[0]
                finally:
                    # BUGFIX: the original leaked this connection if the query
                    # raised (the outer except caught the error but never
                    # closed the handle).
                    conn.close()
                user_collections = [c for c in collections if c.name.startswith("user_")]
                qdrant_user_collections = len(user_collections)
                result["details"]["db_users_with_documents"] = db_users_with_docs
                result["details"]["qdrant_user_collections"] = qdrant_user_collections
                if db_users_with_docs != qdrant_user_collections:
                    result["errors"].append(
                        f"Mismatch between database users with documents ({db_users_with_docs}) "
                        f"and Qdrant user collections ({qdrant_user_collections})"
                    )
            result["status"] = "passed" if not result["errors"] else "warning"
            logger.info(f"Vector store consistency check: {result['status']}")
        except Exception as e:
            result["status"] = "failed"
            result["errors"].append(f"Vector store verification failed: {str(e)}")
            logger.error(f"Vector store verification error: {str(e)}")
        return result

    async def verify_backup_integrity(self) -> Dict[str, Any]:
        """Verify backup integrity and completeness.

        Validates every backup known to the backup manager and flags a stale
        latest backup (older than seven days).

        Returns:
            Result dict (see class docstring). Never raises.
        """
        logger.info("Verifying backup integrity...")
        result: Dict[str, Any] = {
            "test": "backup_integrity",
            "status": "unknown",
            "details": {},
            "errors": []
        }
        try:
            backups = await backup_manager.list_backups()
            result["details"]["total_backups"] = len(backups)
            result["details"]["backups"] = []
            if not backups:
                result["status"] = "warning"
                result["errors"].append("No backups found")
                return result
            verified_count = 0
            failed_count = 0
            for backup in backups:
                backup_details = {
                    "backup_id": backup.backup_id,
                    "timestamp": backup.timestamp.isoformat(),
                    "file_size_bytes": backup.file_size_bytes,
                    "status": backup.status
                }
                try:
                    is_valid = await backup_manager.verify_backup_integrity(backup.backup_id)
                    if is_valid:
                        backup_details["integrity"] = "valid"
                        verified_count += 1
                    else:
                        backup_details["integrity"] = "invalid"
                        failed_count += 1
                        result["errors"].append(f"Backup {backup.backup_id} failed integrity check")
                except Exception as e:
                    backup_details["integrity"] = "error"
                    backup_details["error"] = str(e)
                    failed_count += 1
                    result["errors"].append(f"Backup {backup.backup_id} verification error: {str(e)}")
                result["details"]["backups"].append(backup_details)
            result["details"]["verified_backups"] = verified_count
            result["details"]["failed_backups"] = failed_count
            # Freshness check on the most recent backup.
            # NOTE(review): assumes backup.timestamp is a naive UTC datetime to
            # match datetime.utcnow(); a tz-aware timestamp would raise here and
            # surface as a failed check — confirm against the backup manager.
            latest_backup = max(backups, key=lambda b: b.timestamp)
            days_since_backup = (datetime.utcnow() - latest_backup.timestamp).days
            result["details"]["days_since_latest_backup"] = days_since_backup
            if days_since_backup > 7:
                result["errors"].append(f"Latest backup is {days_since_backup} days old")
            # passed: all backups valid and no other findings;
            # warning: some (not all) backups failed, or only freshness issues;
            # failed: every backup failed verification.
            if failed_count == 0 and not result["errors"]:
                result["status"] = "passed"
            elif failed_count < len(backups):
                result["status"] = "warning"
            else:
                result["status"] = "failed"
            logger.info(f"Backup integrity check: {result['status']}")
        except Exception as e:
            result["status"] = "failed"
            result["errors"].append(f"Backup verification failed: {str(e)}")
            logger.error(f"Backup verification error: {str(e)}")
        return result

    def verify_file_system_integrity(self) -> Dict[str, Any]:
        """Verify file system integrity and permissions.

        Confirms critical paths exist with usable permissions and that free
        disk space is not critically low.

        Returns:
            Result dict (see class docstring). Never raises.
        """
        logger.info("Verifying file system integrity...")
        result: Dict[str, Any] = {
            "test": "file_system_integrity",
            "status": "unknown",
            "details": {},
            "errors": []
        }
        try:
            # Paths the application cannot operate without (relative to CWD).
            critical_paths = [
                {"path": ".", "type": "directory", "name": "application_root"},
                {"path": self.database_path, "type": "file", "name": "database"},
                {"path": "uploads", "type": "directory", "name": "uploads_directory"},
                {"path": "backups", "type": "directory", "name": "backups_directory"},
                {"path": "src", "type": "directory", "name": "source_code"},
            ]
            result["details"]["path_checks"] = []
            for path_info in critical_paths:
                path = path_info["path"]
                path_type = path_info["type"]
                name = path_info["name"]
                check_result: Dict[str, Any] = {
                    "name": name,
                    "path": path,
                    "type": path_type,
                    "exists": False,
                    "readable": False,
                    "writable": False
                }
                if os.path.exists(path):
                    check_result["exists"] = True
                    check_result["readable"] = os.access(path, os.R_OK)
                    check_result["writable"] = os.access(path, os.W_OK)
                    if path_type == "directory":
                        # Directories need the execute bit to be traversable.
                        check_result["executable"] = os.access(path, os.X_OK)
                    if path_type == "file":
                        check_result["size_bytes"] = os.path.getsize(path)
                    elif path_type == "directory":
                        try:
                            # Count regular files directly in the directory
                            # (non-recursive).
                            file_count = len([f for f in os.listdir(path) if os.path.isfile(os.path.join(path, f))])
                            check_result["file_count"] = file_count
                        except PermissionError:
                            check_result["file_count"] = "permission_denied"
                else:
                    result["errors"].append(f"Critical path missing: {path} ({name})")
                result["details"]["path_checks"].append(check_result)
            # Disk-space check (best effort; a failure here is reported as an
            # error string rather than aborting the whole check).
            try:
                import shutil
                total, used, free = shutil.disk_usage(".")
                result["details"]["disk_usage"] = {
                    "total_bytes": total,
                    "used_bytes": used,
                    "free_bytes": free,
                    "free_gb": free / (1024**3),
                    "usage_percent": (used / total) * 100
                }
                free_gb = free / (1024**3)
                if free_gb < 1.0:
                    result["errors"].append(f"Critical: Only {free_gb:.2f} GB free disk space")
                elif free_gb < 5.0:
                    result["errors"].append(f"Warning: Only {free_gb:.2f} GB free disk space")
            except Exception as e:
                result["errors"].append(f"Could not check disk usage: {str(e)}")
            result["status"] = "passed" if not result["errors"] else "warning"
            logger.info(f"File system integrity check: {result['status']}")
        except Exception as e:
            result["status"] = "failed"
            result["errors"].append(f"File system verification failed: {str(e)}")
            logger.error(f"File system verification error: {str(e)}")
        return result

    async def run_comprehensive_verification(self) -> Dict[str, Any]:
        """Run all verification tests and aggregate an overall status.

        Overall status is "failed" if any test failed, else "warning" if any
        test produced warnings, else "passed".

        Returns:
            Aggregate dict with timestamp, duration, per-test results, and a
            pass/warning/fail summary.
        """
        logger.info("Starting comprehensive data persistence verification...")
        start_time = datetime.utcnow()
        # Tests run sequentially; each one catches its own exceptions and
        # reports them through its "status"/"errors" fields.
        tests = [
            self.verify_database_integrity(),
            await self.verify_vector_store_consistency(),
            await self.verify_backup_integrity(),
            self.verify_file_system_integrity()
        ]
        verification_results: Dict[str, Any] = {
            "timestamp": start_time.isoformat(),
            "duration_seconds": (datetime.utcnow() - start_time).total_seconds(),
            "overall_status": "unknown",
            "tests": tests,
            "summary": {
                "total_tests": len(tests),
                "passed": 0,
                "warnings": 0,
                "failed": 0
            }
        }
        summary = verification_results["summary"]
        for test in tests:
            if test["status"] == "passed":
                summary["passed"] += 1
            elif test["status"] == "warning":
                summary["warnings"] += 1
            elif test["status"] == "failed":
                summary["failed"] += 1
        if summary["failed"] > 0:
            verification_results["overall_status"] = "failed"
        elif summary["warnings"] > 0:
            verification_results["overall_status"] = "warning"
        else:
            verification_results["overall_status"] = "passed"
        logger.info(f"Verification completed: {verification_results['overall_status']}")
        return verification_results
def main():
    """Command-line entry point.

    Parses arguments, runs the full verification suite, emits the results as
    JSON (to --output or stdout), and exits with 0 (passed), 2 (warnings) or
    1 (failed / unexpected error).
    """
    parser = argparse.ArgumentParser(description="Data Persistence Verification Tool")
    parser.add_argument("--database", default="knowledge_assistant.db", help="Database file path")
    parser.add_argument("--output", help="Output file for results (JSON)")
    parser.add_argument("--verbose", "-v", action="store_true", help="Verbose output")
    args = parser.parse_args()

    # --verbose raises the root logger to DEBUG for all handlers.
    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)

    verifier = DataPersistenceVerifier(args.database)

    try:
        results = asyncio.run(verifier.run_comprehensive_verification())
        rendered = json.dumps(results, indent=2)
        if args.output:
            with open(args.output, 'w') as f:
                f.write(rendered)
            print(f"Results saved to: {args.output}")
        else:
            print(rendered)
        # Map overall status to the process exit code (unknown statuses -> 0,
        # matching the original else-branch). sys.exit raises SystemExit,
        # which deliberately escapes the except Exception below.
        exit_codes = {"failed": 1, "warning": 2}
        sys.exit(exit_codes.get(results["overall_status"], 0))
    except Exception as e:
        logger.error(f"Verification failed: {str(e)}")
        sys.exit(1)
# Standard entry-point guard: allows importing this module without running
# the verification suite as a side effect.
if __name__ == "__main__":
    main()