maolin.liu
commited on
Commit
·
ef7f04e
1
Parent(s):
475a8bc
[bugfix]Bugfix.
Browse files- consumer/base.py +2 -28
consumer/base.py
CHANGED
|
@@ -38,6 +38,8 @@ class Headers(BaseModel):
|
|
| 38 |
@field_validator('priority', mode='before')
|
| 39 |
@classmethod
|
| 40 |
def _convert_priority(cls, value):
|
|
|
|
|
|
|
| 41 |
return Priority[value]
|
| 42 |
|
| 43 |
|
|
@@ -176,8 +178,6 @@ class BasicMessageReceiver(BasicPikaClient):
|
|
| 176 |
|
| 177 |
def decode_message(self, body):
|
| 178 |
if type(body) == bytes:
|
| 179 |
-
return msgpack.unpackb(body)
|
| 180 |
-
elif type(body) == str:
|
| 181 |
return json.loads(body)
|
| 182 |
else:
|
| 183 |
raise NotImplementedError
|
|
@@ -207,29 +207,3 @@ class BasicMessageReceiver(BasicPikaClient):
|
|
| 207 |
self.channel_tag = None
|
| 208 |
else:
|
| 209 |
logger.error("Do not cancel a non-existing job")
|
| 210 |
-
|
| 211 |
-
|
| 212 |
-
class ExtractFaceFramesConsumer(BasicMessageReceiver):
|
| 213 |
-
@sync
|
| 214 |
-
async def consume(self, channel, method, properties, body):
|
| 215 |
-
body = self.decode_message(body=body)
|
| 216 |
-
file_content = await self._download_image(img_url=body["url"])
|
| 217 |
-
# consume message logic ...
|
| 218 |
-
|
| 219 |
-
async def _download_image(self, img_url):
|
| 220 |
-
# do some async stuff here
|
| 221 |
-
pass
|
| 222 |
-
|
| 223 |
-
|
| 224 |
-
def create_consumer():
|
| 225 |
-
worker = ExtractFaceFramesConsumer()
|
| 226 |
-
worker.declare_queue(queue_name="myqueue")
|
| 227 |
-
worker.declare_exchange(exchange_name="myexchange")
|
| 228 |
-
worker.bind_queue(
|
| 229 |
-
exchange_name="myexchange", queue_name="myqueue", routing_key="randomkey"
|
| 230 |
-
)
|
| 231 |
-
worker.consume_messages(queue="myqueue", callback=worker.consume)
|
| 232 |
-
|
| 233 |
-
|
| 234 |
-
if __name__ == "__main__":
|
| 235 |
-
create_consumer()
|
|
|
|
| 38 |
@field_validator('priority', mode='before')
|
| 39 |
@classmethod
|
| 40 |
def _convert_priority(cls, value):
|
| 41 |
+
if isinstance(value, Priority):
|
| 42 |
+
return value
|
| 43 |
return Priority[value]
|
| 44 |
|
| 45 |
|
|
|
|
| 178 |
|
| 179 |
def decode_message(self, body):
|
| 180 |
if type(body) == bytes:
|
|
|
|
|
|
|
| 181 |
return json.loads(body)
|
| 182 |
else:
|
| 183 |
raise NotImplementedError
|
|
|
|
| 207 |
self.channel_tag = None
|
| 208 |
else:
|
| 209 |
logger.error("Do not cancel a non-existing job")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|