|
|
|
|
|
from __future__ import annotations |
|
|
import asyncio, json, time, uuid, math, logging |
|
|
from typing import Any, AsyncIterable, Dict, List, Optional |
|
|
|
|
|
import aio_pika |
|
|
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
|
|
|
|
|
|
def _now() -> int: |
|
|
return int(time.time()) |
|
|
|
|
|
def _chunk_text(s: str, sz: int = 120) -> List[str]: |
|
|
if not s: |
|
|
return [] |
|
|
return [s[i:i+sz] for i in range(0, len(s), sz)] |
|
|
|
|
|
def _last_user_text(messages: List[Dict[str, Any]]) -> str: |
|
|
|
|
|
for m in reversed(messages or []): |
|
|
if (m or {}).get("role") == "user": |
|
|
c = m.get("content", "") |
|
|
if isinstance(c, str): |
|
|
return c |
|
|
if isinstance(c, list): |
|
|
texts = [p.get("text","") for p in c if p.get("type") == "text"] |
|
|
return " ".join([t for t in texts if t]) |
|
|
return "" |
|
|
|
|
|
|
|
|
|
|
|
class ChatBackend:
    """Interface for chat backends that stream OpenAI-shaped chunk dicts.

    Implementations yield ``chat.completion.chunk``-shaped dicts for a
    Chat Completions request dict; this base class only defines the contract.
    """

    async def stream(self, request: Dict[str, Any]) -> AsyncIterable[Dict[str, Any]]:
        """Yield streaming response chunks for *request* (must be overridden)."""
        raise NotImplementedError
|
|
|
|
|
class DummyChatBackend(ChatBackend):
    """Echo backend: streams OpenAI-shaped chunks without calling any model."""

    async def stream(self, request: Dict[str, Any]) -> AsyncIterable[Dict[str, Any]]:
        """Yield ``chat.completion.chunk`` frames echoing the last user message.

        Frame order mimics the OpenAI streaming wire format:
          1. a role frame (``{"role": "assistant"}``),
          2. one content frame per 140-character slice of the echoed text,
          3. a terminal frame with ``finish_reason="stop"``.
        No tool_calls are emitted (keeps the server simple).
        """
        completion_id = f"chatcmpl-{uuid.uuid4().hex[:12]}"
        model_name = request.get("model", "gpt-4o-mini")
        created = _now()
        prompt = _last_user_text(request.get("messages", [])) or "(empty)"
        echoed = f"Echo (RabbitMQ): {prompt}"

        def frame(delta: Dict[str, Any], finish: Optional[str]) -> Dict[str, Any]:
            # Every frame shares one envelope; only delta/finish_reason vary.
            return {
                "id": completion_id,
                "object": "chat.completion.chunk",
                "created": created,
                "model": model_name,
                "choices": [{"index": 0, "delta": delta, "finish_reason": finish}],
            }

        yield frame({"role": "assistant"}, None)
        for piece in _chunk_text(echoed, 140):
            yield frame({"content": piece}, None)
        yield frame({}, "stop")
|
|
|
|
|
class ImagesBackend:
    """Stub image backend that always returns one fixed placeholder image."""

    # Minimal placeholder PNG, base64-encoded (starts with the PNG magic).
    _PLACEHOLDER_PNG_B64 = "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAQAAAC1HAwCAAAAC0lEQVR4nGP4BwQACfsD/etCJH0AAAAASUVORK5CYII="

    async def generate_b64(self, request: Dict[str, Any]) -> str:
        """Return a base64 image string for *request*.

        Stub: the request is ignored and a fixed placeholder PNG is returned.
        Replace with a real image generator (e.g., SDXL, OpenAI gpt-image-1).
        """
        return self._PLACEHOLDER_PNG_B64
|
|
|
|
|
|
|
|
|
|
|
class ChatCompletionsServer:
    """
    Consumes OpenAI Chat Completions requests from exchange `oa.chat.create`,
    routing-key `default`, and streams OpenAI-shaped chunks back to `reply_to`.

    RPC contract: every chunk is published to the request's `reply_to` queue
    with its `correlation_id`, and the stream is always terminated with a
    `{"object": "stream.end"}` frame so the client knows when to stop reading.
    """

    def __init__(self, amqp_url: str, *, exchange_name: str = "oa.chat.create", routing_key: str = "default", backend: Optional[ChatBackend] = None):
        """Store connection parameters; no I/O happens until start()."""
        self._amqp_url = amqp_url
        self._exchange_name = exchange_name
        self._routing_key = routing_key
        self._backend = backend or DummyChatBackend()
        self._conn: Optional[aio_pika.RobustConnection] = None
        self._ch: Optional[aio_pika.RobustChannel] = None
        self._ex: Optional[aio_pika.Exchange] = None
        # One durable queue per (exchange, routing_key) pair.
        self._queue_name = f"{exchange_name}.{routing_key}"

    async def start(self):
        """Connect to the broker, declare/bind the durable queue, start consuming."""
        self._conn = await aio_pika.connect_robust(self._amqp_url)
        self._ch = await self._conn.channel()
        self._ex = await self._ch.declare_exchange(self._exchange_name, aio_pika.ExchangeType.DIRECT, durable=True)
        q = await self._ch.declare_queue(self._queue_name, durable=True)
        await q.bind(self._ex, routing_key=self._routing_key)
        await q.consume(self._on_message)
        # NOTE: fixed mojibake in the original log message ("β" was a garbled arrow).
        logger.info("ChatCompletionsServer listening on %s/%s -> %s", self._exchange_name, self._routing_key, self._queue_name)

    async def _reply(self, reply_to: str, corr_id: str, body: bytes, *, delivery_mode: Optional[aio_pika.DeliveryMode] = None) -> None:
        """Publish one JSON frame to the client's reply queue via the default exchange."""
        await self._ch.default_exchange.publish(
            aio_pika.Message(
                body=body,
                correlation_id=corr_id,
                content_type="application/json",
                delivery_mode=delivery_mode,
            ),
            routing_key=reply_to,
        )

    async def _on_message(self, msg: aio_pika.IncomingMessage):
        """Handle one RPC request end-to-end.

        Parses the JSON body, streams backend chunks to `reply_to`, and always
        publishes a terminal `stream.end` frame -- even when parsing or the
        backend fails mid-stream.
        """
        async with msg.process(ignore_processed=True):
            reply_to = msg.reply_to
            corr_id = msg.correlation_id
            if not reply_to or not corr_id:
                # No way to answer the caller; drop the request.
                logger.warning("Missing reply_to/correlation_id; dropping.")
                return
            try:
                req = json.loads(msg.body.decode("utf-8", errors="replace"))
                async for chunk in self._backend.stream(req):
                    # Chunks are transient; reply queues are short-lived.
                    await self._reply(
                        reply_to,
                        corr_id,
                        json.dumps(chunk).encode("utf-8"),
                        delivery_mode=aio_pika.DeliveryMode.NOT_PERSISTENT,
                    )
            except Exception:
                logger.exception("ChatCompletionsServer: failed to process message")
            finally:
                # BUGFIX: the original skipped the terminal frame on any error,
                # leaving the RPC client blocked forever; always send it now.
                try:
                    await self._reply(reply_to, corr_id, b'{"object":"stream.end"}')
                except Exception:
                    logger.exception("ChatCompletionsServer: failed to send stream.end")
|
|
|
|
|
class ImagesServer:
    """
    Consumes OpenAI Images API requests from exchange `oa.images.generate`,
    routing-key `default`, and replies once with {data:[{b64_json:...}], created:...}.

    RPC contract: exactly one reply is published per well-formed request.
    On backend failure the reply carries an OpenAI-style `error` object
    instead of `data`, so the client is not left waiting until its timeout.
    """

    def __init__(self, amqp_url: str, *, exchange_name: str = "oa.images.generate", routing_key: str = "default", backend: Optional[ImagesBackend] = None):
        """Store connection parameters; no I/O happens until start()."""
        self._amqp_url = amqp_url
        self._exchange_name = exchange_name
        self._routing_key = routing_key
        self._backend = backend or ImagesBackend()
        self._conn: Optional[aio_pika.RobustConnection] = None
        self._ch: Optional[aio_pika.RobustChannel] = None
        self._ex: Optional[aio_pika.Exchange] = None
        # One durable queue per (exchange, routing_key) pair.
        self._queue_name = f"{exchange_name}.{routing_key}"

    async def start(self):
        """Connect to the broker, declare/bind the durable queue, start consuming."""
        self._conn = await aio_pika.connect_robust(self._amqp_url)
        self._ch = await self._conn.channel()
        self._ex = await self._ch.declare_exchange(self._exchange_name, aio_pika.ExchangeType.DIRECT, durable=True)
        q = await self._ch.declare_queue(self._queue_name, durable=True)
        await q.bind(self._ex, routing_key=self._routing_key)
        await q.consume(self._on_message)
        # NOTE: fixed mojibake in the original log message ("β" was a garbled arrow).
        logger.info("ImagesServer listening on %s/%s -> %s", self._exchange_name, self._routing_key, self._queue_name)

    async def _on_message(self, msg: aio_pika.IncomingMessage):
        """Handle one image-generation RPC request and publish exactly one reply.

        BUGFIX: the original published nothing when parsing or the backend
        raised, so the client blocked until its timeout; it now receives an
        OpenAI-style error payload instead.
        """
        async with msg.process(ignore_processed=True):
            reply_to = msg.reply_to
            corr_id = msg.correlation_id
            if not reply_to or not corr_id:
                # No way to answer the caller; drop the request.
                logger.warning("Missing reply_to/correlation_id; dropping.")
                return
            try:
                req = json.loads(msg.body.decode("utf-8", errors="replace"))
                b64_img = await self._backend.generate_b64(req)
                resp = {"created": _now(), "data": [{"b64_json": b64_img}]}
            except Exception:
                logger.exception("ImagesServer: failed to process message")
                resp = {
                    "created": _now(),
                    "error": {"message": "image generation failed", "type": "server_error"},
                }
            try:
                await self._ch.default_exchange.publish(
                    aio_pika.Message(
                        body=json.dumps(resp).encode("utf-8"),
                        correlation_id=corr_id,
                        content_type="application/json",
                    ),
                    routing_key=reply_to,
                )
            except Exception:
                logger.exception("ImagesServer: failed to publish reply")
|
|
|