|
|
""" |
|
|
Test script to verify Redis job store functionality |
|
|
""" |
|
|
import sys |
|
|
import os |
|
|
|
|
|
|
|
|
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) |
|
|
|
|
|
from backend.utils.redis_job_store import RedisJobStore |
|
|
|
|
|
def _discard_job(store, job_id):
    """Best-effort removal of a job left behind by a failed test step.

    Errors are reported with a ``[WARN]`` line instead of being raised, so a
    cleanup problem never hides the failure that triggered the cleanup.
    """
    try:
        store.delete_job(job_id)
    except Exception as e:
        print(f"[WARN] Could not clean up test job {job_id}: {e}")


def test_redis_job_store():
    """Exercise the RedisJobStore create/get/update/delete cycle and its validation.

    Builds a ``RedisJobStore`` for ``redis://localhost:6379/0`` (presumably a
    Redis server must be reachable there), then walks one job through
    create -> get -> update -> get -> delete and finally checks that an invalid
    status and an invalid job ID are rejected with ``ValueError``. Every step
    prints an ``[OK]`` or ``[ERROR]`` line and the first failing step ends the
    run.

    If a step fails after the job was created, the job is deleted on the way
    out so that failed runs do not accumulate test jobs in the database.

    Returns:
        bool: True when every step passed, False otherwise.

    NOTE(review): the outcome is reported through the return value rather than
    through assertions; a test runner that ignores return values will not
    treat a False result as a failure -- confirm how this is meant to be run.
    """
    print("Testing Redis Job Store...")

    # Step 1: construct the store.
    try:
        redis_job_store = RedisJobStore('redis://localhost:6379/0')
        print("[OK] Redis job store initialized successfully")
    except Exception as e:
        print(f"[ERROR] Failed to initialize Redis job store: {e}")
        return False

    # Step 2: create the job used by all lifecycle checks below.
    try:
        job_id = redis_job_store.create_job(initial_status='pending', initial_data={'test': True})
        print(f"[OK] Job created successfully with ID: {job_id}")
    except Exception as e:
        print(f"[ERROR] Failed to create job: {e}")
        return False

    # From here on the job exists; the finally clause removes it whenever a
    # step bails out before the regular delete step has succeeded.
    job_deleted = False
    try:
        # Step 3: the freshly created job must be readable and still pending.
        try:
            job = redis_job_store.get_job(job_id)
            if job and job['status'] == 'pending':
                print(f"[OK] Job retrieved successfully: {job}")
            else:
                print(f"[ERROR] Job not found or incorrect status: {job}")
                return False
        except Exception as e:
            print(f"[ERROR] Failed to get job: {e}")
            return False

        # Step 4: update status and result.
        try:
            success = redis_job_store.update_job(job_id, status='completed', result={'data': 'test result'})
            if success:
                print("[OK] Job updated successfully")
            else:
                print("[ERROR] Job update failed")
                return False
        except Exception as e:
            print(f"[ERROR] Failed to update job: {e}")
            return False

        # Step 5: the update must be visible on the next read.
        try:
            job = redis_job_store.get_job(job_id)
            if job and job['status'] == 'completed' and job['result']['data'] == 'test result':
                print(f"[OK] Updated job retrieved successfully: {job}")
            else:
                print(f"[ERROR] Job not updated correctly: {job}")
                return False
        except Exception as e:
            print(f"[ERROR] Failed to get updated job: {e}")
            return False

        # Step 6: delete the job.
        try:
            success = redis_job_store.delete_job(job_id)
            if success:
                job_deleted = True
                print("[OK] Job deleted successfully")
            else:
                print("[ERROR] Job deletion failed")
                return False
        except Exception as e:
            print(f"[ERROR] Failed to delete job: {e}")
            return False
    finally:
        if not job_deleted:
            _discard_job(redis_job_store, job_id)

    # Step 7: input validation. Anything other than ValueError is treated as a
    # failure of the validation tests by the outer handler.
    try:
        try:
            stray_job_id = redis_job_store.create_job(initial_status='invalid_status')
        except ValueError:
            print("[OK] Correctly rejected invalid status")
        else:
            print("[ERROR] Should have failed with invalid status")
            # The store accepted the bad status, so a job now exists; remove it.
            _discard_job(redis_job_store, stray_job_id)
            return False

        try:
            redis_job_store.get_job("invalid job id with spaces")
        except ValueError:
            print("[OK] Correctly rejected invalid job ID format")
        else:
            print("[ERROR] Should have failed with invalid job ID format")
            return False
    except Exception as e:
        print(f"[ERROR] Validation tests failed: {e}")
        return False

    print("[OK] All Redis job store tests passed!")
    return True
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: run the checks and signal failure to the calling
    # shell through a non-zero exit status.
    passed = test_redis_job_store()
    if not passed:
        print("\nRedis job store tests failed!")
        sys.exit(1)
    print("\nRedis job store is working correctly!")