Lin / test_redis_job_store.py
Zelyanoth's picture
add redis for job queuing
48e5de1
"""
Test script to verify Redis job store functionality
"""
import sys
import os
# Add the project root to the Python path
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from backend.utils.redis_job_store import RedisJobStore
def test_redis_job_store():
"""Test the Redis job store functionality."""
print("Testing Redis Job Store...")
# Initialize Redis job store
try:
redis_job_store = RedisJobStore('redis://localhost:6379/0')
print("[OK] Redis job store initialized successfully")
except Exception as e:
print(f"[ERROR] Failed to initialize Redis job store: {e}")
return False
# Test creating a job with default status
try:
job_id = redis_job_store.create_job(initial_status='pending', initial_data={'test': True})
print(f"[OK] Job created successfully with ID: {job_id}")
except Exception as e:
print(f"[ERROR] Failed to create job: {e}")
return False
# Test getting the job
try:
job = redis_job_store.get_job(job_id)
if job and job['status'] == 'pending':
print(f"[OK] Job retrieved successfully: {job}")
else:
print(f"[ERROR] Job not found or incorrect status: {job}")
return False
except Exception as e:
print(f"[ERROR] Failed to get job: {e}")
return False
# Test updating the job
try:
success = redis_job_store.update_job(job_id, status='completed', result={'data': 'test result'})
if success:
print("[OK] Job updated successfully")
else:
print("[ERROR] Job update failed")
return False
except Exception as e:
print(f"[ERROR] Failed to update job: {e}")
return False
# Test getting the updated job
try:
job = redis_job_store.get_job(job_id)
if job and job['status'] == 'completed' and job['result']['data'] == 'test result':
print(f"[OK] Updated job retrieved successfully: {job}")
else:
print(f"[ERROR] Job not updated correctly: {job}")
return False
except Exception as e:
print(f"[ERROR] Failed to get updated job: {e}")
return False
# Test deleting the job
try:
success = redis_job_store.delete_job(job_id)
if success:
print("[OK] Job deleted successfully")
else:
print("[ERROR] Job deletion failed")
return False
except Exception as e:
print(f"[ERROR] Failed to delete job: {e}")
return False
# Test validation functionality
try:
# Test invalid status
try:
redis_job_store.create_job(initial_status='invalid_status')
print("[ERROR] Should have failed with invalid status")
return False
except ValueError:
print("[OK] Correctly rejected invalid status")
# Test invalid job ID format
try:
redis_job_store.get_job("invalid job id with spaces")
print("[ERROR] Should have failed with invalid job ID format")
return False
except ValueError:
print("[OK] Correctly rejected invalid job ID format")
except Exception as e:
print(f"[ERROR] Validation tests failed: {e}")
return False
print("[OK] All Redis job store tests passed!")
return True
if __name__ == "__main__":
success = test_redis_job_store()
if success:
print("\nRedis job store is working correctly!")
else:
print("\nRedis job store tests failed!")
sys.exit(1)