""" Test script to verify Redis job store functionality """ import sys import os # Add the project root to the Python path sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) from backend.utils.redis_job_store import RedisJobStore def test_redis_job_store(): """Test the Redis job store functionality.""" print("Testing Redis Job Store...") # Initialize Redis job store try: redis_job_store = RedisJobStore('redis://localhost:6379/0') print("[OK] Redis job store initialized successfully") except Exception as e: print(f"[ERROR] Failed to initialize Redis job store: {e}") return False # Test creating a job with default status try: job_id = redis_job_store.create_job(initial_status='pending', initial_data={'test': True}) print(f"[OK] Job created successfully with ID: {job_id}") except Exception as e: print(f"[ERROR] Failed to create job: {e}") return False # Test getting the job try: job = redis_job_store.get_job(job_id) if job and job['status'] == 'pending': print(f"[OK] Job retrieved successfully: {job}") else: print(f"[ERROR] Job not found or incorrect status: {job}") return False except Exception as e: print(f"[ERROR] Failed to get job: {e}") return False # Test updating the job try: success = redis_job_store.update_job(job_id, status='completed', result={'data': 'test result'}) if success: print("[OK] Job updated successfully") else: print("[ERROR] Job update failed") return False except Exception as e: print(f"[ERROR] Failed to update job: {e}") return False # Test getting the updated job try: job = redis_job_store.get_job(job_id) if job and job['status'] == 'completed' and job['result']['data'] == 'test result': print(f"[OK] Updated job retrieved successfully: {job}") else: print(f"[ERROR] Job not updated correctly: {job}") return False except Exception as e: print(f"[ERROR] Failed to get updated job: {e}") return False # Test deleting the job try: success = redis_job_store.delete_job(job_id) if success: print("[OK] Job deleted successfully") else: print("[ERROR] Job deletion failed") return False except Exception as e: print(f"[ERROR] Failed to delete job: {e}") return False # Test validation functionality try: # Test invalid status try: redis_job_store.create_job(initial_status='invalid_status') print("[ERROR] Should have failed with invalid status") return False except ValueError: print("[OK] Correctly rejected invalid status") # Test invalid job ID format try: redis_job_store.get_job("invalid job id with spaces") print("[ERROR] Should have failed with invalid job ID format") return False except ValueError: print("[OK] Correctly rejected invalid job ID format") except Exception as e: print(f"[ERROR] Validation tests failed: {e}") return False print("[OK] All Redis job store tests passed!") return True if __name__ == "__main__": success = test_redis_job_store() if success: print("\nRedis job store is working correctly!") else: print("\nRedis job store tests failed!") sys.exit(1)