import pytest

import server
from src.FileManaging import ImageSaver


@pytest.mark.asyncio
async def test_process_group_chunks_large_batch(monkeypatch):
    """When many small requests are coalesced, the buffer should split them
    into smaller pipeline runs and merge results correctly."""

    # Keep the chunk size small for the test; using monkeypatch ensures
    # the original value is restored when the test finishes.
    monkeypatch.setattr(server, "LD_MAX_IMAGES_PER_GROUP", 3)

    # Run asyncio.to_thread targets synchronously so the fake pipeline
    # executes inline and the test stays deterministic and single-threaded.
    async def immediate_to_thread(func, /, *args, **kwargs):
        return func(*args, **kwargs)

    monkeypatch.setattr(server.asyncio, "to_thread", immediate_to_thread)

    # Prepare a simple fake pipeline that records saved images in the
    # in-memory byte buffer and returns a batched_results mapping.
    def fake_pipeline(**kwargs):
        per_sample_info = kwargs.get("per_sample_info", [])
        # Each chunked run must be capped at the configured group size
        assert kwargs["batch"] <= server.LD_MAX_IMAGES_PER_GROUP
        results = {}
        for info in per_sample_info:
            rid = info["request_id"]
            filename = f"{rid}_img.png"
            # Simulate in-memory bytes being stored by SaveImage
            ImageSaver.store_image_bytes(f"LD-REQ-{rid}", filename, "Classic", b"PNGDATA")
            results.setdefault(rid, []).append({"filename": filename, "subfolder": "Classic"})
        return {"batched_results": results}

    monkeypatch.setattr(server, "pipeline", fake_pipeline)

    # Build 7 pending requests; with a group size of 3 they chunk as 3, 3, 1
    items = []
    for i in range(7):
        req = server.GenerateRequest(prompt=f"p{i}", num_images=1, batch_size=3)
        pr = server.PendingRequest(req, request_id=f"r{i:03d}")
        items.append(pr)

    buf = server.GenerationBuffer()

    # Call the internal _process_group directly
    await buf._process_group(items)

    # Ensure all futures were completed successfully and have images
    for p in items:
        assert p.future.done()
        res = p.future.result()
        # Each should resolve to a single image dict
        assert isinstance(res, dict)
        assert ("image" in res) or ("images" in res)

    # Verify that the in-memory buffer is empty for these prefixes
    for i in range(7):
        assert ImageSaver.pop_image_bytes(f"LD-REQ-r{i:03d}") == []
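

# -- Chunking sketch ---------------------------------------------------------
# The splitting behavior exercised above, in isolation. This assumes
# _process_group slices pending requests into runs of at most
# LD_MAX_IMAGES_PER_GROUP items (inferred from this test, not from server's
# source); `_chunks` below is a hypothetical helper with that contract.
def _chunks(items, size):
    """Yield successive slices of `items` with at most `size` elements each."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def test_chunking_sketch():
    # Seven requests with a group size of 3 split as 3, 3, 1,
    # matching the chunking the large-batch test relies on.
    assert [len(c) for c in _chunks(list(range(7)), 3)] == [3, 3, 1]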