Spaces:
Paused
Paused
| from __future__ import annotations | |
| import json | |
| import tempfile | |
| import time | |
| import unittest | |
| from pathlib import Path | |
| from services.image_task_service import ImageTaskService | |
| OWNER = {"id": "owner-1", "name": "Owner", "role": "admin"} | |
| OTHER_OWNER = {"id": "owner-2", "name": "Other", "role": "user"} | |
| def wait_for_task(service: ImageTaskService, identity: dict[str, object], task_id: str, status: str, timeout: float = 2.0): | |
| deadline = time.time() + timeout | |
| last = None | |
| while time.time() < deadline: | |
| result = service.list_tasks(identity, [task_id]) | |
| last = (result.get("items") or [None])[0] | |
| if last and last.get("status") == status: | |
| return last | |
| time.sleep(0.02) | |
| raise AssertionError(f"task {task_id} did not reach {status}, last={last}") | |
| class ImageTaskServiceTests(unittest.TestCase): | |
| def make_service(self, path: Path, handler=None) -> ImageTaskService: | |
| return ImageTaskService( | |
| path, | |
| generation_handler=handler or (lambda _payload: {"data": [{"url": "http://example.test/image.png"}]}), | |
| edit_handler=handler or (lambda _payload: {"data": [{"url": "http://example.test/edit.png"}]}), | |
| retention_days_getter=lambda: 30, | |
| ) | |
| def test_duplicate_submit_uses_existing_task(self): | |
| with tempfile.TemporaryDirectory() as tmp_dir: | |
| calls = 0 | |
| def handler(_payload): | |
| nonlocal calls | |
| calls += 1 | |
| time.sleep(0.05) | |
| return {"data": [{"url": "http://example.test/image.png"}]} | |
| service = self.make_service(Path(tmp_dir) / "image_tasks.json", handler) | |
| first = service.submit_generation( | |
| OWNER, | |
| client_task_id="task-1", | |
| prompt="cat", | |
| model="gpt-image-2", | |
| size=None, | |
| base_url="http://local.test", | |
| ) | |
| second = service.submit_generation( | |
| OWNER, | |
| client_task_id="task-1", | |
| prompt="cat", | |
| model="gpt-image-2", | |
| size=None, | |
| base_url="http://local.test", | |
| ) | |
| self.assertEqual(first["id"], "task-1") | |
| self.assertEqual(second["id"], "task-1") | |
| task = wait_for_task(service, OWNER, "task-1", "success") | |
| self.assertEqual(task["data"][0]["url"], "http://example.test/image.png") | |
| self.assertEqual(calls, 1) | |
| def test_different_owner_cannot_query_task(self): | |
| with tempfile.TemporaryDirectory() as tmp_dir: | |
| service = self.make_service(Path(tmp_dir) / "image_tasks.json") | |
| service.submit_generation( | |
| OWNER, | |
| client_task_id="private-task", | |
| prompt="cat", | |
| model="gpt-image-2", | |
| size=None, | |
| base_url="http://local.test", | |
| ) | |
| wait_for_task(service, OWNER, "private-task", "success") | |
| result = service.list_tasks(OTHER_OWNER, ["private-task"]) | |
| self.assertEqual(result["items"], []) | |
| self.assertEqual(result["missing_ids"], ["private-task"]) | |
| def test_success_task_persists_to_new_service_instance(self): | |
| with tempfile.TemporaryDirectory() as tmp_dir: | |
| path = Path(tmp_dir) / "image_tasks.json" | |
| service = self.make_service(path) | |
| service.submit_generation( | |
| OWNER, | |
| client_task_id="persisted-task", | |
| prompt="cat", | |
| model="gpt-image-2", | |
| size=None, | |
| base_url="http://local.test", | |
| ) | |
| wait_for_task(service, OWNER, "persisted-task", "success") | |
| reloaded = self.make_service(path) | |
| result = reloaded.list_tasks(OWNER, ["persisted-task"]) | |
| self.assertEqual(result["missing_ids"], []) | |
| self.assertEqual(result["items"][0]["status"], "success") | |
| self.assertEqual(result["items"][0]["data"][0]["url"], "http://example.test/image.png") | |
| def test_startup_marks_unfinished_tasks_as_error(self): | |
| with tempfile.TemporaryDirectory() as tmp_dir: | |
| path = Path(tmp_dir) / "image_tasks.json" | |
| path.write_text( | |
| json.dumps( | |
| { | |
| "tasks": [ | |
| { | |
| "id": "queued-task", | |
| "owner_id": "owner-1", | |
| "status": "queued", | |
| "mode": "generate", | |
| "model": "gpt-image-2", | |
| "created_at": "2099-01-01 00:00:00", | |
| "updated_at": "2099-01-01 00:00:00", | |
| }, | |
| { | |
| "id": "running-task", | |
| "owner_id": "owner-1", | |
| "status": "running", | |
| "mode": "generate", | |
| "model": "gpt-image-2", | |
| "created_at": "2099-01-01 00:00:00", | |
| "updated_at": "2099-01-01 00:00:00", | |
| }, | |
| ] | |
| } | |
| ), | |
| encoding="utf-8", | |
| ) | |
| service = self.make_service(path) | |
| result = service.list_tasks(OWNER, ["queued-task", "running-task"]) | |
| self.assertEqual([item["status"] for item in result["items"]], ["error", "error"]) | |
| self.assertTrue(all("已中断" in item.get("error", "") for item in result["items"])) | |
| if __name__ == "__main__": | |
| unittest.main() | |