File size: 4,193 Bytes
8a37e0a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
from abc import ABC, abstractmethod
from typing import Optional

from invokeai.app.services.session_queue.session_queue_common import (
    QUEUE_ITEM_STATUS,
    Batch,
    BatchStatus,
    CancelByBatchIDsResult,
    CancelByDestinationResult,
    CancelByQueueIDResult,
    ClearResult,
    EnqueueBatchResult,
    IsEmptyResult,
    IsFullResult,
    PruneResult,
    SessionQueueCountsByDestination,
    SessionQueueItem,
    SessionQueueItemDTO,
    SessionQueueStatus,
)
from invokeai.app.services.shared.graph import GraphExecutionState
from invokeai.app.services.shared.pagination import CursorPaginatedResults


class SessionQueueBase(ABC):
    """Base class for session queue"""

    @abstractmethod
    def dequeue(self) -> Optional[SessionQueueItem]:
        """Dequeues the next session queue item."""
        pass

    @abstractmethod
    def enqueue_batch(self, queue_id: str, batch: Batch, prepend: bool) -> EnqueueBatchResult:
        """Enqueues all permutations of a batch for execution."""
        pass

    @abstractmethod
    def get_current(self, queue_id: str) -> Optional[SessionQueueItem]:
        """Gets the currently-executing session queue item"""
        pass

    @abstractmethod
    def get_next(self, queue_id: str) -> Optional[SessionQueueItem]:
        """Gets the next session queue item (does not dequeue it)"""
        pass

    @abstractmethod
    def clear(self, queue_id: str) -> ClearResult:
        """Deletes all session queue items"""
        pass

    @abstractmethod
    def prune(self, queue_id: str) -> PruneResult:
        """Deletes all completed and errored session queue items"""
        pass

    @abstractmethod
    def is_empty(self, queue_id: str) -> IsEmptyResult:
        """Checks if the queue is empty"""
        pass

    @abstractmethod
    def is_full(self, queue_id: str) -> IsFullResult:
        """Checks if the queue is empty"""
        pass

    @abstractmethod
    def get_queue_status(self, queue_id: str) -> SessionQueueStatus:
        """Gets the status of the queue"""
        pass

    @abstractmethod
    def get_counts_by_destination(self, queue_id: str, destination: str) -> SessionQueueCountsByDestination:
        """Gets the counts of queue items by destination"""
        pass

    @abstractmethod
    def get_batch_status(self, queue_id: str, batch_id: str) -> BatchStatus:
        """Gets the status of a batch"""
        pass

    @abstractmethod
    def complete_queue_item(self, item_id: int) -> SessionQueueItem:
        """Completes a session queue item"""
        pass

    @abstractmethod
    def cancel_queue_item(self, item_id: int) -> SessionQueueItem:
        """Cancels a session queue item"""
        pass

    @abstractmethod
    def fail_queue_item(
        self, item_id: int, error_type: str, error_message: str, error_traceback: str
    ) -> SessionQueueItem:
        """Fails a session queue item"""
        pass

    @abstractmethod
    def cancel_by_batch_ids(self, queue_id: str, batch_ids: list[str]) -> CancelByBatchIDsResult:
        """Cancels all queue items with matching batch IDs"""
        pass

    @abstractmethod
    def cancel_by_destination(self, queue_id: str, destination: str) -> CancelByDestinationResult:
        """Cancels all queue items with the given batch destination"""
        pass

    @abstractmethod
    def cancel_by_queue_id(self, queue_id: str) -> CancelByQueueIDResult:
        """Cancels all queue items with matching queue ID"""
        pass

    @abstractmethod
    def list_queue_items(
        self,
        queue_id: str,
        limit: int,
        priority: int,
        cursor: Optional[int] = None,
        status: Optional[QUEUE_ITEM_STATUS] = None,
    ) -> CursorPaginatedResults[SessionQueueItemDTO]:
        """Gets a page of session queue items"""
        pass

    @abstractmethod
    def get_queue_item(self, item_id: int) -> SessionQueueItem:
        """Gets a session queue item by ID"""
        pass

    @abstractmethod
    def set_queue_item_session(self, item_id: int, session: GraphExecutionState) -> SessionQueueItem:
        """Sets the session for a session queue item. Use this to update the session state."""
        pass