|
|
|
|
|
""" |
|
|
Data Persistence Verification Script |
|
|
|
|
|
This script verifies data persistence across the Knowledge Assistant RAG application, |
|
|
including database integrity, vector store consistency, and backup validation. |
|
|
""" |
|
|
|
|
|
import os |
|
|
import sys |
|
|
import json |
|
|
import sqlite3 |
|
|
import asyncio |
|
|
import logging |
|
|
import argparse |
|
|
from datetime import datetime, timedelta |
|
|
from pathlib import Path |
|
|
from typing import Dict, List, Any, Optional |
|
|
|
|
|
|
|
|
# Make the project's src/ directory (a sibling of this script's parent
# directory) importable so the `core.*` modules below resolve when this
# script is run directly.
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'src'))
|
|
|
|
|
from core.database import User, DocumentMetadata |
|
|
from core.vector_store import get_qdrant_client |
|
|
from core.backup import backup_manager |
|
|
|
|
|
|
|
|
# Module-wide logging: timestamped INFO-level messages via the root logger
# (basicConfig's default stream handler writes to stderr).
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
class DataPersistenceVerifier:
    """Comprehensive data persistence verification system.

    Runs four independent checks — SQLite database integrity, Qdrant
    vector-store consistency, backup integrity, and file-system health —
    and aggregates them into a single JSON-serializable report.
    """

    def __init__(self, database_path: str = "knowledge_assistant.db"):
        """Initialize the verifier.

        Args:
            database_path: Path to the SQLite database file to verify.
        """
        self.database_path = database_path
        # Kept for backward compatibility; not populated by the checks below.
        self.verification_results = {}

    @staticmethod
    def _new_result(test_name: str) -> Dict[str, Any]:
        """Return the fresh result skeleton shared by every verification test."""
        return {
            "test": test_name,
            "status": "unknown",
            "details": {},
            "errors": []
        }

    def verify_database_integrity(self) -> Dict[str, Any]:
        """Verify SQLite database integrity.

        Checks, in order: file existence, PRAGMA integrity_check, presence
        of expected tables, row counts, orphaned documents (documents whose
        user_id has no matching users row), and duplicate file hashes.

        Returns:
            Result dict with keys: test, status ("passed"/"warning"/"failed"),
            details, errors.
        """
        logger.info("Verifying database integrity...")

        result = self._new_result("database_integrity")

        try:
            if not os.path.exists(self.database_path):
                result["status"] = "failed"
                result["errors"].append(f"Database file not found: {self.database_path}")
                return result

            conn = sqlite3.connect(self.database_path)
            try:
                cursor = conn.cursor()

                # Low-level corruption check; SQLite returns the single row
                # "ok" when the file is structurally sound.
                cursor.execute("PRAGMA integrity_check")
                integrity_result = cursor.fetchone()[0]

                if integrity_result == "ok":
                    result["details"]["integrity_check"] = "passed"
                else:
                    result["status"] = "failed"
                    result["errors"].append(f"Database integrity check failed: {integrity_result}")
                    return result

                # Schema check: every expected table must exist.
                cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
                tables = [row[0] for row in cursor.fetchall()]

                expected_tables = ["users", "documents", "alembic_version"]
                missing_tables = [table for table in expected_tables if table not in tables]

                if missing_tables:
                    result["status"] = "failed"
                    result["errors"].append(f"Missing tables: {missing_tables}")
                    return result

                result["details"]["tables"] = tables

                # Basic row counts for the report.
                cursor.execute("SELECT COUNT(*) FROM users")
                user_count = cursor.fetchone()[0]

                cursor.execute("SELECT COUNT(*) FROM documents")
                doc_count = cursor.fetchone()[0]

                result["details"]["user_count"] = user_count
                result["details"]["document_count"] = doc_count

                # Referential integrity: documents whose user_id does not
                # resolve to a users row.
                cursor.execute("""
                SELECT COUNT(*) FROM documents d
                LEFT JOIN users u ON d.user_id = u.id
                WHERE u.id IS NULL
                """)
                orphaned_docs = cursor.fetchone()[0]

                if orphaned_docs > 0:
                    result["errors"].append(f"Found {orphaned_docs} orphaned documents")

                result["details"]["orphaned_documents"] = orphaned_docs

                # Duplicate-content detection via file_hash collisions.
                cursor.execute("""
                SELECT file_hash, COUNT(*) as count
                FROM documents
                WHERE file_hash IS NOT NULL
                GROUP BY file_hash
                HAVING COUNT(*) > 1
                """)
                duplicate_hashes = cursor.fetchall()

                if duplicate_hashes:
                    result["details"]["duplicate_file_hashes"] = len(duplicate_hashes)
                    result["errors"].append(f"Found {len(duplicate_hashes)} duplicate file hashes")
                else:
                    result["details"]["duplicate_file_hashes"] = 0
            finally:
                # Guarantee the connection is released even when a query
                # raises or a check returns early (the original leaked the
                # connection on unexpected exceptions).
                conn.close()

            # Non-fatal findings downgrade "passed" to "warning".
            result["status"] = "passed" if not result["errors"] else "warning"

            logger.info(f"Database integrity check: {result['status']}")

        except Exception as e:
            result["status"] = "failed"
            result["errors"].append(f"Database verification failed: {str(e)}")
            logger.error(f"Database verification error: {str(e)}")

        return result

    async def verify_vector_store_consistency(self) -> Dict[str, Any]:
        """Verify Qdrant vector store consistency.

        Inspects every collection (point counts, a 10-point sample checked
        for payloads and vectors, and the expected payload schema on
        per-user collections), then cross-checks the number of "user_*"
        collections against distinct document owners in the database.

        Returns:
            Result dict with keys: test, status, details, errors.
        """
        logger.info("Verifying vector store consistency...")

        result = self._new_result("vector_store_consistency")

        try:
            client = get_qdrant_client()

            collections_info = client.get_collections()
            collections = collections_info.collections

            result["details"]["total_collections"] = len(collections)
            result["details"]["collections"] = []

            total_points = 0

            for collection in collections:
                collection_name = collection.name

                try:
                    collection_info = client.get_collection(collection_name)
                    points_count = collection_info.points_count

                    # Spot-check up to 10 points, including payloads and
                    # vectors, for structural sanity.
                    sample_points, _ = client.scroll(
                        collection_name=collection_name,
                        limit=10,
                        with_payload=True,
                        with_vectors=True
                    )

                    collection_details = {
                        "name": collection_name,
                        "points_count": points_count,
                        "sample_points": len(sample_points),
                        "status": "healthy"
                    }

                    if sample_points:
                        first_point = sample_points[0]

                        if not hasattr(first_point, 'payload') or not first_point.payload:
                            collection_details["status"] = "warning"
                            result["errors"].append(f"Collection {collection_name}: Points missing payload")

                        if not hasattr(first_point, 'vector') or not first_point.vector:
                            collection_details["status"] = "warning"
                            result["errors"].append(f"Collection {collection_name}: Points missing vectors")

                        # Per-user collections are expected to carry the RAG
                        # payload schema.
                        if collection_name.startswith("user_") and first_point.payload:
                            required_fields = ["text", "source", "user_id"]
                            missing_fields = [field for field in required_fields if field not in first_point.payload]

                            if missing_fields:
                                collection_details["status"] = "warning"
                                result["errors"].append(f"Collection {collection_name}: Missing payload fields: {missing_fields}")

                    result["details"]["collections"].append(collection_details)
                    total_points += points_count

                except Exception as e:
                    # Record the failure but keep scanning the remaining
                    # collections.
                    collection_details = {
                        "name": collection_name,
                        "status": "error",
                        "error": str(e)
                    }
                    result["details"]["collections"].append(collection_details)
                    result["errors"].append(f"Collection {collection_name}: {str(e)}")

            result["details"]["total_points"] = total_points

            # Cross-check: one "user_*" collection is expected per database
            # user that owns at least one document.
            if os.path.exists(self.database_path):
                conn = sqlite3.connect(self.database_path)
                try:
                    cursor = conn.cursor()
                    cursor.execute("SELECT COUNT(DISTINCT user_id) FROM documents")
                    db_users_with_docs = cursor.fetchone()[0]
                finally:
                    # Always release the connection (original leaked it when
                    # the query raised).
                    conn.close()

                user_collections = [c for c in collections if c.name.startswith("user_")]
                qdrant_user_collections = len(user_collections)

                result["details"]["db_users_with_documents"] = db_users_with_docs
                result["details"]["qdrant_user_collections"] = qdrant_user_collections

                if db_users_with_docs != qdrant_user_collections:
                    result["errors"].append(
                        f"Mismatch between database users with documents ({db_users_with_docs}) "
                        f"and Qdrant user collections ({qdrant_user_collections})"
                    )

            result["status"] = "passed" if not result["errors"] else "warning"

            logger.info(f"Vector store consistency check: {result['status']}")

        except Exception as e:
            result["status"] = "failed"
            result["errors"].append(f"Vector store verification failed: {str(e)}")
            logger.error(f"Vector store verification error: {str(e)}")

        return result

    async def verify_backup_integrity(self) -> Dict[str, Any]:
        """Verify backup integrity and completeness.

        Runs backup_manager's own integrity verification for every known
        backup and flags a stale latest backup (older than 7 days).

        Returns:
            Result dict with keys: test, status, details, errors.
        """
        logger.info("Verifying backup integrity...")

        result = self._new_result("backup_integrity")

        try:
            backups = await backup_manager.list_backups()

            result["details"]["total_backups"] = len(backups)
            result["details"]["backups"] = []

            if not backups:
                result["status"] = "warning"
                result["errors"].append("No backups found")
                return result

            verified_count = 0
            failed_count = 0

            for backup in backups:
                backup_details = {
                    "backup_id": backup.backup_id,
                    "timestamp": backup.timestamp.isoformat(),
                    "file_size_bytes": backup.file_size_bytes,
                    "status": backup.status
                }

                try:
                    is_valid = await backup_manager.verify_backup_integrity(backup.backup_id)

                    if is_valid:
                        backup_details["integrity"] = "valid"
                        verified_count += 1
                    else:
                        backup_details["integrity"] = "invalid"
                        failed_count += 1
                        result["errors"].append(f"Backup {backup.backup_id} failed integrity check")

                except Exception as e:
                    # A verification error counts as a failed backup but does
                    # not abort the loop.
                    backup_details["integrity"] = "error"
                    backup_details["error"] = str(e)
                    failed_count += 1
                    result["errors"].append(f"Backup {backup.backup_id} verification error: {str(e)}")

                result["details"]["backups"].append(backup_details)

            result["details"]["verified_backups"] = verified_count
            result["details"]["failed_backups"] = failed_count

            # Freshness check. (The empty-backups case already returned
            # above, so no extra guard is needed here.)
            latest_backup = max(backups, key=lambda b: b.timestamp)
            # NOTE(review): assumes backup.timestamp is a naive UTC datetime
            # to match datetime.utcnow() — confirm against backup_manager.
            days_since_backup = (datetime.utcnow() - latest_backup.timestamp).days

            result["details"]["days_since_latest_backup"] = days_since_backup

            if days_since_backup > 7:
                result["errors"].append(f"Latest backup is {days_since_backup} days old")

            # passed: everything verified and no other findings;
            # warning: some backups still verify; failed: all backups bad.
            if failed_count == 0 and not result["errors"]:
                result["status"] = "passed"
            elif failed_count < len(backups):
                result["status"] = "warning"
            else:
                result["status"] = "failed"

            logger.info(f"Backup integrity check: {result['status']}")

        except Exception as e:
            result["status"] = "failed"
            result["errors"].append(f"Backup verification failed: {str(e)}")
            logger.error(f"Backup verification error: {str(e)}")

        return result

    def verify_file_system_integrity(self) -> Dict[str, Any]:
        """Verify file system integrity and permissions.

        Checks existence, readability, and writability of critical paths
        (resolved relative to the current working directory) and reports
        disk usage, flagging low free space.

        Returns:
            Result dict with keys: test, status, details, errors.
        """
        logger.info("Verifying file system integrity...")

        result = self._new_result("file_system_integrity")

        try:
            # Paths the application cannot operate without.
            critical_paths = [
                {"path": ".", "type": "directory", "name": "application_root"},
                {"path": self.database_path, "type": "file", "name": "database"},
                {"path": "uploads", "type": "directory", "name": "uploads_directory"},
                {"path": "backups", "type": "directory", "name": "backups_directory"},
                {"path": "src", "type": "directory", "name": "source_code"},
            ]

            result["details"]["path_checks"] = []

            for path_info in critical_paths:
                path = path_info["path"]
                path_type = path_info["type"]
                name = path_info["name"]

                check_result = {
                    "name": name,
                    "path": path,
                    "type": path_type,
                    "exists": False,
                    "readable": False,
                    "writable": False
                }

                if os.path.exists(path):
                    check_result["exists"] = True

                    check_result["readable"] = os.access(path, os.R_OK)
                    check_result["writable"] = os.access(path, os.W_OK)

                    if path_type == "directory":
                        # Directories need the execute bit to be traversable.
                        check_result["executable"] = os.access(path, os.X_OK)

                    if path_type == "file":
                        check_result["size_bytes"] = os.path.getsize(path)
                    elif path_type == "directory":
                        try:
                            # Count regular files only (non-recursive).
                            file_count = len([f for f in os.listdir(path) if os.path.isfile(os.path.join(path, f))])
                            check_result["file_count"] = file_count
                        except PermissionError:
                            check_result["file_count"] = "permission_denied"
                else:
                    result["errors"].append(f"Critical path missing: {path} ({name})")

                result["details"]["path_checks"].append(check_result)

            # Disk usage for the filesystem containing the working directory.
            try:
                import shutil
                total, used, free = shutil.disk_usage(".")

                result["details"]["disk_usage"] = {
                    "total_bytes": total,
                    "used_bytes": used,
                    "free_bytes": free,
                    "free_gb": free / (1024**3),
                    "usage_percent": (used / total) * 100
                }

                free_gb = free / (1024**3)
                if free_gb < 1.0:
                    result["errors"].append(f"Critical: Only {free_gb:.2f} GB free disk space")
                elif free_gb < 5.0:
                    result["errors"].append(f"Warning: Only {free_gb:.2f} GB free disk space")

            except Exception as e:
                result["errors"].append(f"Could not check disk usage: {str(e)}")

            result["status"] = "passed" if not result["errors"] else "warning"

            logger.info(f"File system integrity check: {result['status']}")

        except Exception as e:
            result["status"] = "failed"
            result["errors"].append(f"File system verification failed: {str(e)}")
            logger.error(f"File system verification error: {str(e)}")

        return result

    async def run_comprehensive_verification(self) -> Dict[str, Any]:
        """Run all verification tests and aggregate a report.

        Returns:
            Dict with the start timestamp, run duration in seconds, the
            individual test results, a passed/warnings/failed summary, and
            an overall_status where the worst individual status wins.
        """
        logger.info("Starting comprehensive data persistence verification...")

        # NOTE(review): naive UTC timestamps are used throughout the report.
        start_time = datetime.utcnow()

        # The list literal evaluates each call in order, so the four checks
        # run sequentially before the report below is assembled.
        tests = [
            self.verify_database_integrity(),
            await self.verify_vector_store_consistency(),
            await self.verify_backup_integrity(),
            self.verify_file_system_integrity()
        ]

        verification_results = {
            "timestamp": start_time.isoformat(),
            "duration_seconds": (datetime.utcnow() - start_time).total_seconds(),
            "overall_status": "unknown",
            "tests": tests,
            "summary": {
                "total_tests": len(tests),
                "passed": 0,
                "warnings": 0,
                "failed": 0
            }
        }

        # Tally per-status counts ("unknown" is deliberately not counted).
        for test in tests:
            if test["status"] == "passed":
                verification_results["summary"]["passed"] += 1
            elif test["status"] == "warning":
                verification_results["summary"]["warnings"] += 1
            elif test["status"] == "failed":
                verification_results["summary"]["failed"] += 1

        # Worst status wins: any failure -> failed; else any warning -> warning.
        if verification_results["summary"]["failed"] > 0:
            verification_results["overall_status"] = "failed"
        elif verification_results["summary"]["warnings"] > 0:
            verification_results["overall_status"] = "warning"
        else:
            verification_results["overall_status"] = "passed"

        logger.info(f"Verification completed: {verification_results['overall_status']}")

        return verification_results
|
|
|
|
|
|
|
|
def main():
    """Parse CLI arguments, run the verification suite, and exit.

    Exit codes: 0 on success, 1 on failure (or unexpected error),
    2 when the run completed with warnings.
    """
    parser = argparse.ArgumentParser(description="Data Persistence Verification Tool")
    parser.add_argument("--database", default="knowledge_assistant.db", help="Database file path")
    parser.add_argument("--output", help="Output file for results (JSON)")
    parser.add_argument("--verbose", "-v", action="store_true", help="Verbose output")
    args = parser.parse_args()

    if args.verbose:
        # Raise the root logger to DEBUG so all module loggers get verbose.
        logging.getLogger().setLevel(logging.DEBUG)

    verifier = DataPersistenceVerifier(args.database)

    try:
        results = asyncio.run(verifier.run_comprehensive_verification())

        # Emit the report: either to the requested file or to stdout.
        if args.output:
            with open(args.output, 'w') as handle:
                json.dump(results, handle, indent=2)
            print(f"Results saved to: {args.output}")
        else:
            print(json.dumps(results, indent=2))

        # Map the overall status onto a process exit code.
        status_to_code = {"failed": 1, "warning": 2}
        sys.exit(status_to_code.get(results["overall_status"], 0))

    except Exception as e:
        # sys.exit raises SystemExit (a BaseException), so the exits above
        # pass through this handler untouched.
        logger.error(f"Verification failed: {str(e)}")
        sys.exit(1)
|
|
|
|
|
|
|
|
# Run the CLI entry point only when executed as a script, not on import.
if __name__ == "__main__":
    main()