PixelForge / imageforge /backend /tests /test_job_manager.py
Gregorfun's picture
Initial commit
32c5da4
import time
from pathlib import Path
from backend.app.api.schemas import GenerateRequest, JobStatus
from backend.app.jobs.manager import JobManager
from backend.app.providers.factory import ProviderRegistry
from backend.app.storage.history import PromptHistoryStore
def test_registry_contains_localai() -> None:
registry = ProviderRegistry()
provider = registry.get("localai")
assert provider.id == "localai"
def test_job_lifecycle(tmp_path: Path, monkeypatch) -> None:
monkeypatch.setattr("backend.app.jobs.manager.OUTPUT_DIR", tmp_path)
manager = JobManager(ProviderRegistry(), PromptHistoryStore(), state_file=tmp_path / "jobs_state.json")
req = GenerateRequest(
prompt="a mountain",
negative_prompt="",
model="dummy",
size="1024x1024",
count=1,
random_seed=True,
steps=10,
guidance=7.5,
)
job_id = manager.submit(req)
deadline = time.time() + 10
while time.time() < deadline:
state = manager.get(job_id)
assert state is not None
if state.status in {JobStatus.DONE, JobStatus.ERROR}:
break
time.sleep(0.1)
state = manager.get(job_id)
assert state is not None
assert state.status == JobStatus.DONE
assert state.progress == 100
assert state.image_paths
def test_job_cancel_from_queue(tmp_path: Path) -> None:
manager = JobManager(ProviderRegistry(), PromptHistoryStore(), state_file=tmp_path / "jobs_state.json")
req = GenerateRequest(prompt="cancel me", model="dummy")
job_id = manager.submit(req)
assert manager.cancel(job_id)
state = manager.get(job_id)
assert state is not None
assert state.cancel_requested
def test_job_stats_shape(tmp_path: Path) -> None:
manager = JobManager(ProviderRegistry(), PromptHistoryStore(), state_file=tmp_path / "jobs_state.json")
stats = manager.stats()
assert set(stats.keys()) == {
"queued",
"running",
"done",
"error",
"cancelled",
"total",
"last_24h",
}
def test_job_falls_back_when_primary_provider_fails(tmp_path: Path, monkeypatch) -> None:
monkeypatch.setattr("backend.app.jobs.manager.OUTPUT_DIR", tmp_path)
monkeypatch.setenv("IMAGEFORGE_ENABLE_AUTO_FALLBACK", "1")
monkeypatch.setenv("IMAGEFORGE_FALLBACK_MODELS", "localai,dummy")
monkeypatch.setenv("IMAGEFORGE_FALLBACK_MAX_STEPS", "12")
class FailingProvider:
id = "a1111"
name = "Failing A1111"
description = ""
def is_available(self) -> bool:
return True
def generate(self, request, output_dir, progress, is_cancelled): # noqa: ANN001
raise RuntimeError("primary failure")
class WorkingProvider:
id = "localai"
name = "Working LocalAI"
description = ""
def __init__(self) -> None:
self.last_steps = None
def is_available(self) -> bool:
return True
def generate(self, request, output_dir, progress, is_cancelled): # noqa: ANN001
self.last_steps = request.steps
output_dir.mkdir(parents=True, exist_ok=True)
path = output_dir / "image_01.png"
path.write_bytes(b"fallback")
progress(100, "ok")
from backend.app.providers.interface import ProviderResult
return ProviderResult(image_paths=[path])
class FakeRegistry:
def __init__(self) -> None:
self.working = WorkingProvider()
self.providers = {
"a1111": FailingProvider(),
"localai": self.working,
"dummy": self.working,
}
def get(self, provider_id: str):
return self.providers[provider_id]
def list(self):
return list(self.providers.values())
registry = FakeRegistry()
manager = JobManager(registry, PromptHistoryStore(), state_file=tmp_path / "jobs_state.json")
req = GenerateRequest(prompt="fallback", model="a1111", steps=30, guidance=9.0)
job_id = manager.submit(req)
deadline = time.time() + 10
while time.time() < deadline:
state = manager.get(job_id)
assert state is not None
if state.status in {JobStatus.DONE, JobStatus.ERROR}:
break
time.sleep(0.1)
state = manager.get(job_id)
assert state is not None
assert state.status == JobStatus.DONE
assert state.image_paths
assert registry.working.last_steps == 12