| | """Unit tests for comfy_execution/jobs.py""" |
| |
|
| | from comfy_execution.jobs import ( |
| | JobStatus, |
| | is_previewable, |
| | normalize_queue_item, |
| | normalize_history_item, |
| | get_outputs_summary, |
| | apply_sorting, |
| | ) |
| |
|
| |
|
class TestJobStatus:
    """Checks for the JobStatus string constants."""

    def test_status_values(self):
        """Every status constant equals its expected string value."""
        expected_pairs = (
            (JobStatus.PENDING, 'pending'),
            (JobStatus.IN_PROGRESS, 'in_progress'),
            (JobStatus.COMPLETED, 'completed'),
            (JobStatus.FAILED, 'failed'),
            (JobStatus.CANCELLED, 'cancelled'),
        )
        for actual, wanted in expected_pairs:
            assert actual == wanted

    def test_all_contains_all_statuses(self):
        """JobStatus.ALL holds each status and has exactly five entries."""
        every_status = (
            JobStatus.PENDING,
            JobStatus.IN_PROGRESS,
            JobStatus.COMPLETED,
            JobStatus.FAILED,
            JobStatus.CANCELLED,
        )
        for status in every_status:
            assert status in JobStatus.ALL
        assert len(JobStatus.ALL) == 5
| |
|
| |
|
class TestIsPreviewable:
    """Unit tests for is_previewable()"""

    def test_previewable_media_types(self):
        """Images, video, audio media types should be previewable."""
        for media_type in ['images', 'video', 'audio']:
            # The message names the failing iteration; a bare assert in a
            # loop would not say which media type broke.
            assert is_previewable(media_type, {}) is True, \
                f"{media_type} should be previewable"

    def test_non_previewable_media_types(self):
        """Other media types should not be previewable."""
        for media_type in ['latents', 'text', 'metadata', 'files']:
            assert is_previewable(media_type, {}) is False, \
                f"{media_type} should not be previewable"

    def test_3d_extensions_previewable(self):
        """3D file extensions should be previewable under the 'files' media type."""
        for ext in ['.obj', '.fbx', '.gltf', '.glb']:
            item = {'filename': f'model{ext}'}
            assert is_previewable('files', item) is True, \
                f"3D file {ext} should be previewable"

    def test_3d_extensions_case_insensitive(self):
        """3D extension check should be case insensitive."""
        item = {'filename': 'MODEL.GLB'}
        assert is_previewable('files', item) is True

    def test_video_format_previewable(self):
        """Items with video/ format should be previewable."""
        item = {'format': 'video/mp4'}
        assert is_previewable('files', item) is True

    def test_audio_format_previewable(self):
        """Items with audio/ format should be previewable."""
        item = {'format': 'audio/wav'}
        assert is_previewable('files', item) is True

    def test_other_format_not_previewable(self):
        """Items with other format should not be previewable."""
        item = {'format': 'application/json'}
        assert is_previewable('files', item) is False
| |
|
| |
|
class TestGetOutputsSummary:
    """Unit tests for get_outputs_summary()

    get_outputs_summary() is called with a mapping of node id to that node's
    outputs and is unpacked here as a ``(count, preview)`` pair. Tests that
    check only one half of the pair bind the other half to ``_``.
    """

    def test_empty_outputs(self):
        """Empty outputs should return 0 count and None preview."""
        count, preview = get_outputs_summary({})
        assert count == 0
        assert preview is None

    def test_counts_across_multiple_nodes(self):
        """Outputs from multiple nodes should all be counted."""
        outputs = {
            'node1': {'images': [{'filename': 'a.png', 'type': 'output'}]},
            'node2': {'images': [{'filename': 'b.png', 'type': 'output'}]},
            'node3': {'images': [
                {'filename': 'c.png', 'type': 'output'},
                {'filename': 'd.png', 'type': 'output'}
            ]}
        }
        count, _ = get_outputs_summary(outputs)
        assert count == 4

    def test_skips_animated_key_and_non_list_values(self):
        """The 'animated' key and non-list values should be skipped."""
        outputs = {
            'node1': {
                'images': [{'filename': 'test.png', 'type': 'output'}],
                'animated': [True],
                'metadata': 'string',
                'count': 42
            }
        }
        count, _ = get_outputs_summary(outputs)
        assert count == 1

    def test_preview_prefers_type_output(self):
        """Items with type='output' should be preferred for preview."""
        outputs = {
            'node1': {
                'images': [
                    {'filename': 'temp.png', 'type': 'temp'},
                    {'filename': 'output.png', 'type': 'output'}
                ]
            }
        }
        count, preview = get_outputs_summary(outputs)
        assert count == 2
        assert preview['filename'] == 'output.png'

    def test_preview_fallback_when_no_output_type(self):
        """If no type='output', should use first previewable."""
        outputs = {
            'node1': {
                'images': [
                    {'filename': 'temp1.png', 'type': 'temp'},
                    {'filename': 'temp2.png', 'type': 'temp'}
                ]
            }
        }
        _, preview = get_outputs_summary(outputs)
        assert preview['filename'] == 'temp1.png'

    def test_non_previewable_media_types_counted_but_no_preview(self):
        """Non-previewable media types should be counted but not used as preview."""
        outputs = {
            'node1': {
                'latents': [
                    {'filename': 'latent1.safetensors'},
                    {'filename': 'latent2.safetensors'}
                ]
            }
        }
        count, preview = get_outputs_summary(outputs)
        assert count == 2
        assert preview is None

    def test_previewable_media_types(self):
        """Images, video, and audio media types should be previewable."""
        for media_type in ['images', 'video', 'audio']:
            outputs = {
                'node1': {
                    media_type: [{'filename': 'test.file', 'type': 'output'}]
                }
            }
            _, preview = get_outputs_summary(outputs)
            assert preview is not None, f"{media_type} should be previewable"

    def test_3d_files_previewable(self):
        """3D file extensions should be previewable."""
        for ext in ['.obj', '.fbx', '.gltf', '.glb']:
            outputs = {
                'node1': {
                    'files': [{'filename': f'model{ext}', 'type': 'output'}]
                }
            }
            _, preview = get_outputs_summary(outputs)
            assert preview is not None, f"3D file {ext} should be previewable"

    def test_format_mime_type_previewable(self):
        """Files with video/ or audio/ format should be previewable."""
        for fmt in ['video/x-custom', 'audio/x-custom']:
            outputs = {
                'node1': {
                    'files': [{'filename': 'file.custom', 'format': fmt, 'type': 'output'}]
                }
            }
            _, preview = get_outputs_summary(outputs)
            assert preview is not None, f"Format {fmt} should be previewable"

    def test_preview_enriched_with_node_metadata(self):
        """Preview should include nodeId, mediaType, and original fields."""
        outputs = {
            'node123': {
                'images': [{'filename': 'test.png', 'type': 'output', 'subfolder': 'outputs'}]
            }
        }
        _, preview = get_outputs_summary(outputs)
        assert preview['nodeId'] == 'node123'
        assert preview['mediaType'] == 'images'
        assert preview['subfolder'] == 'outputs'
| |
|
| |
|
class TestApplySorting:
    """Unit tests for apply_sorting()"""

    def test_sort_by_create_time_desc(self):
        """Sorting by 'created_at' descending puts the newest job first."""
        rows = [('a', 100), ('b', 300), ('c', 200)]
        jobs = [{'id': job_id, 'create_time': created} for job_id, created in rows]

        ordered = apply_sorting(jobs, 'created_at', 'desc')

        ordered_ids = [job['id'] for job in ordered]
        assert ordered_ids == ['b', 'c', 'a']

    def test_sort_by_create_time_asc(self):
        """Sorting by 'created_at' ascending puts the oldest job first."""
        rows = [('a', 100), ('b', 300), ('c', 200)]
        jobs = [{'id': job_id, 'create_time': created} for job_id, created in rows]

        ordered = apply_sorting(jobs, 'created_at', 'asc')

        ordered_ids = [job['id'] for job in ordered]
        assert ordered_ids == ['a', 'c', 'b']

    def test_sort_by_execution_duration(self):
        """Sorting by 'execution_duration' descending puts the longest run first."""
        # (id, create_time, execution_start_time, execution_end_time)
        timings = [
            ('a', 100, 100, 5100),
            ('b', 300, 300, 1300),
            ('c', 200, 200, 3200),
        ]
        jobs = [
            {
                'id': job_id,
                'create_time': created,
                'execution_start_time': started,
                'execution_end_time': ended,
            }
            for job_id, created, started, ended in timings
        ]

        ordered = apply_sorting(jobs, 'execution_duration', 'desc')

        ordered_ids = [job['id'] for job in ordered]
        assert ordered_ids == ['a', 'c', 'b']

    def test_sort_with_none_values(self):
        """A job with None execution times comes first when sorting ascending."""
        # (id, create_time, execution_start_time, execution_end_time)
        timings = [
            ('a', 100, 100, 5100),
            ('b', 300, None, None),
            ('c', 200, 200, 3200),
        ]
        jobs = [
            {
                'id': job_id,
                'create_time': created,
                'execution_start_time': started,
                'execution_end_time': ended,
            }
            for job_id, created, started, ended in timings
        ]

        ordered = apply_sorting(jobs, 'execution_duration', 'asc')

        first_job = ordered[0]
        assert first_job['id'] == 'b'
| |
|
| |
|
class TestNormalizeQueueItem:
    """Unit tests for normalize_queue_item()"""

    def test_basic_normalization(self):
        """A pending queue tuple is converted into a job dict."""
        extra_data = {
            'create_time': 1234567890,
            'extra_pnginfo': {'workflow': {'id': 'workflow-abc'}},
        }
        queue_entry = (10, 'prompt-123', {'nodes': {}}, extra_data, ['node1'])

        job = normalize_queue_item(queue_entry, JobStatus.PENDING)

        # Fields that must be present with these exact values.
        expected_fields = {
            'id': 'prompt-123',
            'status': 'pending',
            'priority': 10,
            'create_time': 1234567890,
            'outputs_count': 0,
            'workflow_id': 'workflow-abc',
        }
        for key, wanted in expected_fields.items():
            assert job[key] == wanted

        # Keys that must be absent from the job built from this pending item.
        absent_keys = (
            'execution_start_time',
            'execution_end_time',
            'execution_error',
            'preview_output',
        )
        for key in absent_keys:
            assert key not in job
| |
|
| |
|
class TestNormalizeHistoryItem:
    """Unit tests for normalize_history_item()"""

    def test_completed_job(self):
        """A successful history item yields 'completed' with times taken from messages."""
        prompt_id = 'prompt-456'
        started_at = 1234567890500
        finished_at = 1234567893000

        extra_data = {
            'create_time': 1234567890000,
            'extra_pnginfo': {'workflow': {'id': 'workflow-xyz'}},
        }
        prompt = (5, prompt_id, {'nodes': {}}, extra_data, ['node1'])
        messages = [
            ('execution_start', {'prompt_id': prompt_id, 'timestamp': started_at}),
            ('execution_success', {'prompt_id': prompt_id, 'timestamp': finished_at}),
        ]
        status = {'status_str': 'success', 'completed': True, 'messages': messages}
        history_item = {'prompt': prompt, 'status': status, 'outputs': {}}

        job = normalize_history_item(prompt_id, history_item)

        assert job['id'] == 'prompt-456'
        assert job['status'] == 'completed'
        assert job['priority'] == 5
        assert job['execution_start_time'] == started_at
        assert job['execution_end_time'] == finished_at
        assert job['workflow_id'] == 'workflow-xyz'

    def test_failed_job(self):
        """An errored history item yields 'failed' plus error details from messages."""
        prompt_id = 'prompt-789'
        started_at = 1234567890500
        failed_at = 1234567891000

        error_payload = {
            'prompt_id': prompt_id,
            'node_id': '5',
            'node_type': 'KSampler',
            'exception_message': 'CUDA out of memory',
            'exception_type': 'RuntimeError',
            'traceback': ['Traceback...', 'RuntimeError: CUDA out of memory'],
            'timestamp': failed_at,
        }
        prompt = (5, prompt_id, {'nodes': {}}, {'create_time': 1234567890000}, ['node1'])
        messages = [
            ('execution_start', {'prompt_id': prompt_id, 'timestamp': started_at}),
            ('execution_error', error_payload),
        ]
        status = {'status_str': 'error', 'completed': False, 'messages': messages}
        history_item = {'prompt': prompt, 'status': status, 'outputs': {}}

        job = normalize_history_item(prompt_id, history_item)

        assert job['status'] == 'failed'
        assert job['execution_start_time'] == started_at
        assert job['execution_end_time'] == failed_at

        reported_error = job['execution_error']
        assert reported_error['node_id'] == '5'
        assert reported_error['node_type'] == 'KSampler'
        assert reported_error['exception_message'] == 'CUDA out of memory'

    def test_cancelled_job(self):
        """An interrupted history item yields 'cancelled' status."""
        prompt_id = 'prompt-cancelled'
        started_at = 1234567890500
        interrupted_at = 1234567891000

        interrupt_payload = {
            'prompt_id': prompt_id,
            'node_id': '5',
            'node_type': 'KSampler',
            'executed': ['1', '2', '3'],
            'timestamp': interrupted_at,
        }
        prompt = (5, prompt_id, {'nodes': {}}, {'create_time': 1234567890000}, ['node1'])
        messages = [
            ('execution_start', {'prompt_id': prompt_id, 'timestamp': started_at}),
            ('execution_interrupted', interrupt_payload),
        ]
        status = {'status_str': 'error', 'completed': False, 'messages': messages}
        history_item = {'prompt': prompt, 'status': status, 'outputs': {}}

        job = normalize_history_item(prompt_id, history_item)

        assert job['status'] == 'cancelled'
        assert job['execution_start_time'] == started_at
        assert job['execution_end_time'] == interrupted_at
        # An interruption must not be reported as an execution error.
        assert 'execution_error' not in job

    def test_include_outputs(self):
        """With include_outputs=True the job carries outputs, workflow and execution_status."""
        prompt = (
            5,
            'prompt-123',
            {'nodes': {'1': {}}},
            {'create_time': 1234567890, 'client_id': 'abc'},
            ['node1'],
        )
        status = {'status_str': 'success', 'completed': True, 'messages': []}
        history_item = {
            'prompt': prompt,
            'status': status,
            'outputs': {'node1': {'images': [{'filename': 'test.png'}]}},
        }

        job = normalize_history_item('prompt-123', history_item, include_outputs=True)

        for key in ('outputs', 'workflow', 'execution_status'):
            assert key in job

        # Expected values are fresh literals (not the input objects) so the
        # comparison still detects in-place mutation of the inputs.
        expected_outputs = {'node1': {'images': [{'filename': 'test.png'}]}}
        expected_workflow = {
            'prompt': {'nodes': {'1': {}}},
            'extra_data': {'create_time': 1234567890, 'client_id': 'abc'},
        }
        assert job['outputs'] == expected_outputs
        assert job['workflow'] == expected_workflow
| |
|