Spaces:
Sleeping
Sleeping
Commit
·
b476e3c
1
Parent(s):
06c32c7
pr fixes
Browse files- validators/stream_manager.py +2 -2
- validators/streamer.py +7 -10
validators/stream_manager.py
CHANGED
|
@@ -33,9 +33,9 @@ class StreamManager:
|
|
| 33 |
|
| 34 |
await self.log_database.add_streams_to_db(completed_streams)
|
| 35 |
# Gets the first stream that acquired the lock, meaning the first stream that was able to return a non-empty chunk
|
| 36 |
-
|
| 37 |
(
|
| 38 |
-
|
| 39 |
for streamer, completed_stream in zip(streamers, completed_streams)
|
| 40 |
if streamer.lock_acquired
|
| 41 |
),
|
|
|
|
| 33 |
|
| 34 |
await self.log_database.add_streams_to_db(completed_streams)
|
| 35 |
# Gets the first stream that acquired the lock, meaning the first stream that was able to return a non-empty chunk
|
| 36 |
+
selected_stream = next(
|
| 37 |
(
|
| 38 |
+
completed_stream
|
| 39 |
for streamer, completed_stream in zip(streamers, completed_streams)
|
| 40 |
if streamer.lock_acquired
|
| 41 |
),
|
validators/streamer.py
CHANGED
|
@@ -93,10 +93,6 @@ class AsyncResponseDataStreamer:
|
|
| 93 |
return initiated_response
|
| 94 |
|
| 95 |
async def stream(self, request: web.Request) -> ProcessedStreamResponse:
|
| 96 |
-
# response = web_response.StreamResponse(status=200, reason="OK")
|
| 97 |
-
# response.headers["Content-Type"] = "application/json"
|
| 98 |
-
# await response.prepare(request) # Prepare and send the headers
|
| 99 |
-
|
| 100 |
try:
|
| 101 |
start_time = time.time()
|
| 102 |
client_response: web.Response = None
|
|
@@ -104,20 +100,21 @@ class AsyncResponseDataStreamer:
|
|
| 104 |
|
| 105 |
async for chunk in self.async_iterator:
|
| 106 |
if isinstance(chunk, str):
|
| 107 |
-
#
|
| 108 |
-
|
| 109 |
-
|
|
|
|
|
|
|
| 110 |
self.accumulated_chunks_timings.append(time.time() - start_time)
|
| 111 |
# Gets new response state
|
| 112 |
self.sequence_number += 1
|
| 113 |
new_response_state = self._create_chunk_response(
|
| 114 |
-
|
| 115 |
)
|
| 116 |
-
# Writes the new response state to the response
|
| 117 |
client_response = await self.write_to_stream(
|
| 118 |
request, client_response, new_response_state, self.lock
|
| 119 |
)
|
| 120 |
-
# await response.write(new_response_state.encode('utf-8'))
|
| 121 |
|
| 122 |
if chunk is not None and isinstance(chunk, StreamPromptingSynapse):
|
| 123 |
if len(self.accumulated_chunks) == 0:
|
|
|
|
| 93 |
return initiated_response
|
| 94 |
|
| 95 |
async def stream(self, request: web.Request) -> ProcessedStreamResponse:
|
|
|
|
|
|
|
|
|
|
|
|
|
| 96 |
try:
|
| 97 |
start_time = time.time()
|
| 98 |
client_response: web.Response = None
|
|
|
|
| 100 |
|
| 101 |
async for chunk in self.async_iterator:
|
| 102 |
if isinstance(chunk, str):
|
| 103 |
+
# If chunk is empty, skip
|
| 104 |
+
if not chunk:
|
| 105 |
+
continue
|
| 106 |
+
|
| 107 |
+
self.accumulated_chunks.append(chunk)
|
| 108 |
self.accumulated_chunks_timings.append(time.time() - start_time)
|
| 109 |
# Gets new response state
|
| 110 |
self.sequence_number += 1
|
| 111 |
new_response_state = self._create_chunk_response(
|
| 112 |
+
chunk
|
| 113 |
)
|
| 114 |
+
# Writes the new response state to the response
|
| 115 |
client_response = await self.write_to_stream(
|
| 116 |
request, client_response, new_response_state, self.lock
|
| 117 |
)
|
|
|
|
| 118 |
|
| 119 |
if chunk is not None and isinstance(chunk, StreamPromptingSynapse):
|
| 120 |
if len(self.accumulated_chunks) == 0:
|