Spaces:
Sleeping
Sleeping
Commit
·
f6cfbcb
1
Parent(s):
89a4057
adds documentation to files
Browse files- validators/database.py +45 -0
- validators/stream_manager.py +28 -0
- validators/streamer.py +77 -0
validators/database.py
CHANGED
|
@@ -6,11 +6,40 @@ from .streamer import ProcessedStreamResponse
|
|
| 6 |
|
| 7 |
|
| 8 |
class LogDatabase:
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 9 |
def __init__(self, log_database_path: str):
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 10 |
self.log_database_path = log_database_path
|
| 11 |
self.ensure_db_exists(log_database_path)
|
| 12 |
|
| 13 |
def ensure_db_exists(self, file_path):
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 14 |
if not os.path.exists(file_path):
|
| 15 |
# Create an empty JSONL file
|
| 16 |
with open(file_path, "w") as file:
|
|
@@ -21,6 +50,15 @@ class LogDatabase:
|
|
| 21 |
bt.logging.info(f"File '{file_path}' already exists.")
|
| 22 |
|
| 23 |
async def add_streams_to_db(self, stream_responses: ProcessedStreamResponse):
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 24 |
bt.logging.info(f"Writing streams to the database...")
|
| 25 |
try:
|
| 26 |
stream_responses_dict = [
|
|
@@ -35,6 +73,13 @@ class LogDatabase:
|
|
| 35 |
raise e
|
| 36 |
|
| 37 |
async def append_dicts_to_file(self, file_path, dictionaries):
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 38 |
async with aiofiles.open(file_path, mode="a") as file:
|
| 39 |
for dictionary in dictionaries:
|
| 40 |
await file.write(json.dumps(dictionary) + "\n")
|
|
|
|
| 6 |
|
| 7 |
|
| 8 |
class LogDatabase:
|
| 9 |
+
"""
|
| 10 |
+
A class to manage a log database stored as a JSONL (JSON Lines) file.
|
| 11 |
+
|
| 12 |
+
Attributes:
|
| 13 |
+
log_database_path (str): The path to the log database file.
|
| 14 |
+
|
| 15 |
+
Methods:
|
| 16 |
+
ensure_db_exists(file_path):
|
| 17 |
+
Ensures that the log database file exists. If it doesn't, an empty file is created.
|
| 18 |
+
|
| 19 |
+
add_streams_to_db(stream_responses: ProcessedStreamResponse):
|
| 20 |
+
Asynchronously adds stream responses to the log database.
|
| 21 |
+
|
| 22 |
+
append_dicts_to_file(file_path, dictionaries):
|
| 23 |
+
Asynchronously appends a list of dictionaries to the specified file.
|
| 24 |
+
"""
|
| 25 |
+
|
| 26 |
def __init__(self, log_database_path: str):
|
| 27 |
+
"""
|
| 28 |
+
Initializes the LogDatabase with the given log database file path.
|
| 29 |
+
|
| 30 |
+
Args:
|
| 31 |
+
log_database_path (str): The path to the log database file.
|
| 32 |
+
"""
|
| 33 |
self.log_database_path = log_database_path
|
| 34 |
self.ensure_db_exists(log_database_path)
|
| 35 |
|
| 36 |
def ensure_db_exists(self, file_path):
|
| 37 |
+
"""
|
| 38 |
+
Ensures that the log database file exists. If it doesn't, creates an empty JSONL file.
|
| 39 |
+
|
| 40 |
+
Args:
|
| 41 |
+
file_path (str): The path to the log database file.
|
| 42 |
+
"""
|
| 43 |
if not os.path.exists(file_path):
|
| 44 |
# Create an empty JSONL file
|
| 45 |
with open(file_path, "w") as file:
|
|
|
|
| 50 |
bt.logging.info(f"File '{file_path}' already exists.")
|
| 51 |
|
| 52 |
async def add_streams_to_db(self, stream_responses: ProcessedStreamResponse):
|
| 53 |
+
"""
|
| 54 |
+
Asynchronously adds stream responses to the log database.
|
| 55 |
+
|
| 56 |
+
Args:
|
| 57 |
+
stream_responses (ProcessedStreamResponse): A list of processed stream responses to add to the log database.
|
| 58 |
+
|
| 59 |
+
Raises:
|
| 60 |
+
Exception: If an error occurs while adding streams to the database.
|
| 61 |
+
"""
|
| 62 |
bt.logging.info(f"Writing streams to the database...")
|
| 63 |
try:
|
| 64 |
stream_responses_dict = [
|
|
|
|
| 73 |
raise e
|
| 74 |
|
| 75 |
async def append_dicts_to_file(self, file_path, dictionaries):
|
| 76 |
+
"""
|
| 77 |
+
Asynchronously appends a list of dictionaries to the specified file.
|
| 78 |
+
|
| 79 |
+
Args:
|
| 80 |
+
file_path (str): The path to the file where dictionaries will be appended.
|
| 81 |
+
dictionaries (list): A list of dictionaries to append to the file.
|
| 82 |
+
"""
|
| 83 |
async with aiofiles.open(file_path, mode="a") as file:
|
| 84 |
for dictionary in dictionaries:
|
| 85 |
await file.write(json.dumps(dictionary) + "\n")
|
validators/stream_manager.py
CHANGED
|
@@ -7,7 +7,24 @@ from aiohttp.web import Request
|
|
| 7 |
|
| 8 |
|
| 9 |
class StreamManager:
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 10 |
def __init__(self, log_database_path: str = "requests_db.jsonl"):
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 11 |
self.log_database = LogDatabase(log_database_path)
|
| 12 |
|
| 13 |
async def process_streams(
|
|
@@ -16,6 +33,17 @@ class StreamManager:
|
|
| 16 |
streams_responses: List[AsyncIterator],
|
| 17 |
stream_uids: List[int],
|
| 18 |
):
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 19 |
lock = asyncio.Lock()
|
| 20 |
|
| 21 |
streamers = [
|
|
|
|
| 7 |
|
| 8 |
|
| 9 |
class StreamManager:
|
| 10 |
+
"""
|
| 11 |
+
A class to manage the processing of multiple asynchronous data streams and log their responses.
|
| 12 |
+
|
| 13 |
+
Attributes:
|
| 14 |
+
log_database (LogDatabase): The log database to store stream responses.
|
| 15 |
+
|
| 16 |
+
Methods:
|
| 17 |
+
process_streams(request, streams_responses, stream_uids):
|
| 18 |
+
Processes multiple asynchronous streams, logs their responses, and returns the selected stream response.
|
| 19 |
+
"""
|
| 20 |
+
|
| 21 |
def __init__(self, log_database_path: str = "requests_db.jsonl"):
|
| 22 |
+
"""
|
| 23 |
+
Initializes the StreamManager with the given log database file path.
|
| 24 |
+
|
| 25 |
+
Args:
|
| 26 |
+
log_database_path (str): The path to the log database file, defaults to "requests_db.jsonl".
|
| 27 |
+
"""
|
| 28 |
self.log_database = LogDatabase(log_database_path)
|
| 29 |
|
| 30 |
async def process_streams(
|
|
|
|
| 33 |
streams_responses: List[AsyncIterator],
|
| 34 |
stream_uids: List[int],
|
| 35 |
):
|
| 36 |
+
"""
|
| 37 |
+
Processes multiple asynchronous streams, logs their responses, and returns the selected stream response (stream from first non-empty chunk).
|
| 38 |
+
|
| 39 |
+
Args:
|
| 40 |
+
request (Request): The web request object.
|
| 41 |
+
streams_responses (List[AsyncIterator]): A list of asynchronous iterators representing the streams.
|
| 42 |
+
stream_uids (List[int]): A list of unique IDs for the streams.
|
| 43 |
+
|
| 44 |
+
Returns:
|
| 45 |
+
ProcessedStreamResponse: The response from the selected stream.
|
| 46 |
+
"""
|
| 47 |
lock = asyncio.Lock()
|
| 48 |
|
| 49 |
streamers = [
|
validators/streamer.py
CHANGED
|
@@ -11,6 +11,18 @@ from prompting.protocol import StreamPromptingSynapse
|
|
| 11 |
|
| 12 |
|
| 13 |
class StreamChunk(BaseModel):
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 14 |
delta: str
|
| 15 |
finish_reason: Optional[str]
|
| 16 |
accumulated_chunks: List[str]
|
|
@@ -20,11 +32,29 @@ class StreamChunk(BaseModel):
|
|
| 20 |
selected_uid: int
|
| 21 |
|
| 22 |
def encode(self, encoding: str) -> bytes:
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 23 |
data = json.dumps(self.dict(), indent=4)
|
| 24 |
return data.encode(encoding)
|
| 25 |
|
| 26 |
|
| 27 |
class StreamError(BaseModel):
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 28 |
error: str
|
| 29 |
timestamp: str
|
| 30 |
sequence_number: int
|
|
@@ -39,6 +69,20 @@ ProcessedStreamResponse = Union[StreamChunk, StreamError]
|
|
| 39 |
|
| 40 |
|
| 41 |
class AsyncResponseDataStreamer:
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 42 |
def __init__(
|
| 43 |
self,
|
| 44 |
async_iterator: AsyncIterator,
|
|
@@ -59,6 +103,15 @@ class AsyncResponseDataStreamer:
|
|
| 59 |
def ensure_response_is_created(
|
| 60 |
self, initiated_response: web.StreamResponse
|
| 61 |
) -> web.StreamResponse:
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 62 |
# Creates response if it was not created
|
| 63 |
if initiated_response == None:
|
| 64 |
initiated_response = web_response.StreamResponse(status=200, reason="OK")
|
|
@@ -74,6 +127,18 @@ class AsyncResponseDataStreamer:
|
|
| 74 |
stream_chunk: StreamChunk,
|
| 75 |
lock: asyncio.Lock,
|
| 76 |
) -> web.StreamResponse:
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 77 |
# Try to acquire the lock and sets the lock_acquired flag. Only the stream that acquires the lock should write to the response
|
| 78 |
if lock.locked() == False:
|
| 79 |
self.lock_acquired = await lock.acquire()
|
|
@@ -93,6 +158,18 @@ class AsyncResponseDataStreamer:
|
|
| 93 |
return initiated_response
|
| 94 |
|
| 95 |
async def stream(self, request: web.Request) -> ProcessedStreamResponse:
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 96 |
try:
|
| 97 |
start_time = time.time()
|
| 98 |
client_response: web.Response = None
|
|
|
|
| 11 |
|
| 12 |
|
| 13 |
class StreamChunk(BaseModel):
|
| 14 |
+
"""
|
| 15 |
+
A model representing a chunk of streaming data.
|
| 16 |
+
|
| 17 |
+
Attributes:
|
| 18 |
+
delta (str): The change in the stream.
|
| 19 |
+
finish_reason (Optional[str]): The reason for finishing the stream.
|
| 20 |
+
accumulated_chunks (List[str]): List of accumulated chunks.
|
| 21 |
+
accumulated_chunks_timings (List[float]): Timings for the accumulated chunks.
|
| 22 |
+
timestamp (str): The timestamp of the chunk.
|
| 23 |
+
sequence_number (int): The sequence number of the chunk.
|
| 24 |
+
selected_uid (int): The selected user ID.
|
| 25 |
+
"""
|
| 26 |
delta: str
|
| 27 |
finish_reason: Optional[str]
|
| 28 |
accumulated_chunks: List[str]
|
|
|
|
| 32 |
selected_uid: int
|
| 33 |
|
| 34 |
def encode(self, encoding: str) -> bytes:
|
| 35 |
+
"""
|
| 36 |
+
Encodes the StreamChunk instance to a JSON-formatted bytes object.
|
| 37 |
+
|
| 38 |
+
Args:
|
| 39 |
+
encoding (str): The encoding to use.
|
| 40 |
+
|
| 41 |
+
Returns:
|
| 42 |
+
bytes: The encoded JSON data.
|
| 43 |
+
"""
|
| 44 |
data = json.dumps(self.dict(), indent=4)
|
| 45 |
return data.encode(encoding)
|
| 46 |
|
| 47 |
|
| 48 |
class StreamError(BaseModel):
|
| 49 |
+
"""
|
| 50 |
+
A model representing an error in the streaming data.
|
| 51 |
+
|
| 52 |
+
Attributes:
|
| 53 |
+
error (str): The error message.
|
| 54 |
+
timestamp (str): The timestamp of the error.
|
| 55 |
+
sequence_number (int): The sequence number at the time of error.
|
| 56 |
+
finish_reason (str): The reason for finishing the stream, defaults to "error".
|
| 57 |
+
"""
|
| 58 |
error: str
|
| 59 |
timestamp: str
|
| 60 |
sequence_number: int
|
|
|
|
| 69 |
|
| 70 |
|
| 71 |
class AsyncResponseDataStreamer:
|
| 72 |
+
"""
|
| 73 |
+
A class to manage asynchronous streaming of response data.
|
| 74 |
+
|
| 75 |
+
Attributes:
|
| 76 |
+
async_iterator (AsyncIterator): An asynchronous iterator for streaming data.
|
| 77 |
+
selected_uid (int): The selected user ID.
|
| 78 |
+
lock (asyncio.Lock): An asyncio lock to ensure exclusive access.
|
| 79 |
+
delay (float): Delay between processing chunks, defaults to 0.1 seconds.
|
| 80 |
+
accumulated_chunks (List[str]): List of accumulated chunks.
|
| 81 |
+
accumulated_chunks_timings (List[float]): Timings for the accumulated chunks.
|
| 82 |
+
finish_reason (str): The reason for finishing the stream.
|
| 83 |
+
sequence_number (int): The sequence number of the stream.
|
| 84 |
+
lock_acquired (bool): Flag indicating if the lock was acquired.
|
| 85 |
+
"""
|
| 86 |
def __init__(
|
| 87 |
self,
|
| 88 |
async_iterator: AsyncIterator,
|
|
|
|
| 103 |
def ensure_response_is_created(
|
| 104 |
self, initiated_response: web.StreamResponse
|
| 105 |
) -> web.StreamResponse:
|
| 106 |
+
"""
|
| 107 |
+
Ensures that a StreamResponse is created if it does not already exist.
|
| 108 |
+
|
| 109 |
+
Args:
|
| 110 |
+
initiated_response (web.StreamResponse): The initiated response.
|
| 111 |
+
|
| 112 |
+
Returns:
|
| 113 |
+
web.StreamResponse: The ensured response.
|
| 114 |
+
"""
|
| 115 |
# Creates response if it was not created
|
| 116 |
if initiated_response == None:
|
| 117 |
initiated_response = web_response.StreamResponse(status=200, reason="OK")
|
|
|
|
| 127 |
stream_chunk: StreamChunk,
|
| 128 |
lock: asyncio.Lock,
|
| 129 |
) -> web.StreamResponse:
|
| 130 |
+
"""
|
| 131 |
+
Writes a stream chunk to the response if the lock is acquired.
|
| 132 |
+
|
| 133 |
+
Args:
|
| 134 |
+
request (web.Request): The web request object.
|
| 135 |
+
initiated_response (web.StreamResponse): The initiated response.
|
| 136 |
+
stream_chunk (StreamChunk): The chunk of stream data to write.
|
| 137 |
+
lock (asyncio.Lock): The lock to ensure exclusive access.
|
| 138 |
+
|
| 139 |
+
Returns:
|
| 140 |
+
web.StreamResponse: The response with the written chunk.
|
| 141 |
+
"""
|
| 142 |
# Try to acquire the lock and sets the lock_acquired flag. Only the stream that acquires the lock should write to the response
|
| 143 |
if lock.locked() == False:
|
| 144 |
self.lock_acquired = await lock.acquire()
|
|
|
|
| 158 |
return initiated_response
|
| 159 |
|
| 160 |
async def stream(self, request: web.Request) -> ProcessedStreamResponse:
|
| 161 |
+
"""
|
| 162 |
+
Streams data from the async iterator and writes it to the response.
|
| 163 |
+
|
| 164 |
+
Args:
|
| 165 |
+
request (web.Request): The web request object.
|
| 166 |
+
|
| 167 |
+
Returns:
|
| 168 |
+
ProcessedStreamResponse: The final processed stream response.
|
| 169 |
+
|
| 170 |
+
Raises:
|
| 171 |
+
ValueError: If the stream does not return a valid synapse.
|
| 172 |
+
"""
|
| 173 |
try:
|
| 174 |
start_time = time.time()
|
| 175 |
client_response: web.Response = None
|