File size: 3,621 Bytes
48e5de1 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 | """
Test script to verify Redis job store functionality
"""
import sys
import os
# Add the project root to the Python path
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from backend.utils.redis_job_store import RedisJobStore
def test_redis_job_store():
"""Test the Redis job store functionality."""
print("Testing Redis Job Store...")
# Initialize Redis job store
try:
redis_job_store = RedisJobStore('redis://localhost:6379/0')
print("[OK] Redis job store initialized successfully")
except Exception as e:
print(f"[ERROR] Failed to initialize Redis job store: {e}")
return False
# Test creating a job with default status
try:
job_id = redis_job_store.create_job(initial_status='pending', initial_data={'test': True})
print(f"[OK] Job created successfully with ID: {job_id}")
except Exception as e:
print(f"[ERROR] Failed to create job: {e}")
return False
# Test getting the job
try:
job = redis_job_store.get_job(job_id)
if job and job['status'] == 'pending':
print(f"[OK] Job retrieved successfully: {job}")
else:
print(f"[ERROR] Job not found or incorrect status: {job}")
return False
except Exception as e:
print(f"[ERROR] Failed to get job: {e}")
return False
# Test updating the job
try:
success = redis_job_store.update_job(job_id, status='completed', result={'data': 'test result'})
if success:
print("[OK] Job updated successfully")
else:
print("[ERROR] Job update failed")
return False
except Exception as e:
print(f"[ERROR] Failed to update job: {e}")
return False
# Test getting the updated job
try:
job = redis_job_store.get_job(job_id)
if job and job['status'] == 'completed' and job['result']['data'] == 'test result':
print(f"[OK] Updated job retrieved successfully: {job}")
else:
print(f"[ERROR] Job not updated correctly: {job}")
return False
except Exception as e:
print(f"[ERROR] Failed to get updated job: {e}")
return False
# Test deleting the job
try:
success = redis_job_store.delete_job(job_id)
if success:
print("[OK] Job deleted successfully")
else:
print("[ERROR] Job deletion failed")
return False
except Exception as e:
print(f"[ERROR] Failed to delete job: {e}")
return False
# Test validation functionality
try:
# Test invalid status
try:
redis_job_store.create_job(initial_status='invalid_status')
print("[ERROR] Should have failed with invalid status")
return False
except ValueError:
print("[OK] Correctly rejected invalid status")
# Test invalid job ID format
try:
redis_job_store.get_job("invalid job id with spaces")
print("[ERROR] Should have failed with invalid job ID format")
return False
except ValueError:
print("[OK] Correctly rejected invalid job ID format")
except Exception as e:
print(f"[ERROR] Validation tests failed: {e}")
return False
print("[OK] All Redis job store tests passed!")
return True
if __name__ == "__main__":
success = test_redis_job_store()
if success:
print("\nRedis job store is working correctly!")
else:
print("\nRedis job store tests failed!")
sys.exit(1) |