from __future__ import annotations import json import tempfile import time import unittest from pathlib import Path from services.image_task_service import ImageTaskService OWNER = {"id": "owner-1", "name": "Owner", "role": "admin"} OTHER_OWNER = {"id": "owner-2", "name": "Other", "role": "user"} def wait_for_task(service: ImageTaskService, identity: dict[str, object], task_id: str, status: str, timeout: float = 2.0): deadline = time.time() + timeout last = None while time.time() < deadline: result = service.list_tasks(identity, [task_id]) last = (result.get("items") or [None])[0] if last and last.get("status") == status: return last time.sleep(0.02) raise AssertionError(f"task {task_id} did not reach {status}, last={last}") class ImageTaskServiceTests(unittest.TestCase): def make_service(self, path: Path, handler=None) -> ImageTaskService: return ImageTaskService( path, generation_handler=handler or (lambda _payload: {"data": [{"url": "http://example.test/image.png"}]}), edit_handler=handler or (lambda _payload: {"data": [{"url": "http://example.test/edit.png"}]}), retention_days_getter=lambda: 30, ) def test_duplicate_submit_uses_existing_task(self): with tempfile.TemporaryDirectory() as tmp_dir: calls = 0 def handler(_payload): nonlocal calls calls += 1 time.sleep(0.05) return {"data": [{"url": "http://example.test/image.png"}]} service = self.make_service(Path(tmp_dir) / "image_tasks.json", handler) first = service.submit_generation( OWNER, client_task_id="task-1", prompt="cat", model="gpt-image-2", size=None, base_url="http://local.test", ) second = service.submit_generation( OWNER, client_task_id="task-1", prompt="cat", model="gpt-image-2", size=None, base_url="http://local.test", ) self.assertEqual(first["id"], "task-1") self.assertEqual(second["id"], "task-1") task = wait_for_task(service, OWNER, "task-1", "success") self.assertEqual(task["data"][0]["url"], "http://example.test/image.png") self.assertEqual(calls, 1) def test_different_owner_cannot_query_task(self): with tempfile.TemporaryDirectory() as tmp_dir: service = self.make_service(Path(tmp_dir) / "image_tasks.json") service.submit_generation( OWNER, client_task_id="private-task", prompt="cat", model="gpt-image-2", size=None, base_url="http://local.test", ) wait_for_task(service, OWNER, "private-task", "success") result = service.list_tasks(OTHER_OWNER, ["private-task"]) self.assertEqual(result["items"], []) self.assertEqual(result["missing_ids"], ["private-task"]) def test_success_task_persists_to_new_service_instance(self): with tempfile.TemporaryDirectory() as tmp_dir: path = Path(tmp_dir) / "image_tasks.json" service = self.make_service(path) service.submit_generation( OWNER, client_task_id="persisted-task", prompt="cat", model="gpt-image-2", size=None, base_url="http://local.test", ) wait_for_task(service, OWNER, "persisted-task", "success") reloaded = self.make_service(path) result = reloaded.list_tasks(OWNER, ["persisted-task"]) self.assertEqual(result["missing_ids"], []) self.assertEqual(result["items"][0]["status"], "success") self.assertEqual(result["items"][0]["data"][0]["url"], "http://example.test/image.png") def test_startup_marks_unfinished_tasks_as_error(self): with tempfile.TemporaryDirectory() as tmp_dir: path = Path(tmp_dir) / "image_tasks.json" path.write_text( json.dumps( { "tasks": [ { "id": "queued-task", "owner_id": "owner-1", "status": "queued", "mode": "generate", "model": "gpt-image-2", "created_at": "2099-01-01 00:00:00", "updated_at": "2099-01-01 00:00:00", }, { "id": "running-task", "owner_id": "owner-1", "status": "running", "mode": "generate", "model": "gpt-image-2", "created_at": "2099-01-01 00:00:00", "updated_at": "2099-01-01 00:00:00", }, ] } ), encoding="utf-8", ) service = self.make_service(path) result = service.list_tasks(OWNER, ["queued-task", "running-task"]) self.assertEqual([item["status"] for item in result["items"]], ["error", "error"]) self.assertTrue(all("已中断" in item.get("error", "") for item in result["items"])) if __name__ == "__main__": unittest.main()