Spaces:
Sleeping
Sleeping
import asyncio
import json
import logging
import os

import websockets
from fastapi import FastAPI
from huggingface_hub import InferenceClient
from pydantic import BaseModel
# Module-wide logging: INFO and above goes to the root handler.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# ASGI application served by uvicorn (see the __main__ guard at the bottom).
app = FastAPI()

# Hugging Face inference client bound to the Mixtral instruct model.
client = InferenceClient("mistralai/Mixtral-8x7B-Instruct-v0.1")

# Endpoint the worker connects to for prompts. The default is a tunnel
# hostname baked into the source; set the WEBSOCKET_URL environment variable
# to point the worker at a different server without editing the code.
WEBSOCKET_URL = (
    os.environ.get("WEBSOCKET_URL")
    or "wss://4d24-196-75-8-109.ngrok-free.app/ws"
)
class Item(BaseModel):
    # Parameters for a single text-generation call.

    # Text of the current user turn.
    prompt: str
    # Earlier turns; format_prompt() iterates this as
    # (user_prompt, bot_response) pairs.
    history: list = list()
    # Optional instruction text that generate_stream() puts in front of
    # the prompt.
    system_prompt: str = ""
    # Sampling temperature; generate_stream() raises it to at least 1e-2.
    temperature: float = 0.0
    # Upper bound on the number of generated tokens.
    max_new_tokens: int = 1048
    # Nucleus-sampling cutoff forwarded to the inference client.
    top_p: float = 0.15
    # Repetition penalty forwarded to the inference client.
    repetition_penalty: float = 1.0
def format_prompt(message, history):
    """Build the instruct-style prompt string for a conversation.

    Args:
        message: Text of the new user turn.
        history: Iterable of ``(user_prompt, bot_response)`` pairs for the
            turns that came before ``message``.

    Returns:
        ``"<s>"`` followed by one ``[INST] ... [/INST] reply</s> `` segment
        per past turn, then the new message wrapped in ``[INST] ... [/INST]``.
    """
    past_turns = [
        f"[INST] {asked} [/INST] {answered}</s> "
        for asked, answered in history
    ]
    return "<s>" + "".join(past_turns) + f"[INST] {message} [/INST]"
async def generate_stream(item: Item):
    """Stream the generated text for ``item`` one token at a time.

    Args:
        item: Generation request (prompt, history, system prompt and
            sampling parameters).

    Yields:
        The text of each token as the inference client produces it.

    Raises:
        Whatever ``client.text_generation`` or iterating its stream raises;
        errors are not handled here.
    """
    # Temperature is floored at 1e-2 (presumably because the backend does
    # not accept 0 when sampling is enabled -- TODO confirm).
    temperature = max(float(item.temperature), 1e-2)
    generate_kwargs = dict(
        temperature=temperature,
        max_new_tokens=item.max_new_tokens,
        top_p=float(item.top_p),
        repetition_penalty=item.repetition_penalty,
        do_sample=True,
        seed=42,  # fixed seed: identical requests sample identically
    )

    # Only prepend the system prompt when there is one; joining
    # unconditionally would start the message with a stray ", ".
    if item.system_prompt:
        message = f"{item.system_prompt}, {item.prompt}"
    else:
        message = item.prompt
    formatted_prompt = format_prompt(message, item.history)

    # The client is synchronous: both opening the stream and pulling each
    # token block on network I/O. Run those calls in the default executor so
    # the event loop stays free for other coroutines while we wait.
    loop = asyncio.get_running_loop()
    stream = await loop.run_in_executor(
        None,
        lambda: client.text_generation(
            formatted_prompt,
            **generate_kwargs,
            stream=True,
            details=True,
            return_full_text=False,
        ),
    )

    token_iter = iter(stream)
    # Sentinel default for next(): a StopIteration must not be raised
    # through an executor future.
    exhausted = object()
    while True:
        response = await loop.run_in_executor(None, next, token_iter, exhausted)
        if response is exhausted:
            break
        yield response.token.text
async def _stream_completion(websocket, prompt_id, prompt_text):
    """Generate a completion for one prompt and relay it over ``websocket``.

    Sends one ``{"type": "chunk"}`` message per generated token, followed by
    a single ``{"type": "completed"}`` message, all tagged with ``prompt_id``.
    """
    item = Item(prompt=prompt_text)
    async for chunk in generate_stream(item):
        await websocket.send(json.dumps({
            "type": "chunk",
            "id": prompt_id,
            "chunk": chunk,
        }))
        logger.info("Sent chunk: %s", chunk)
    await websocket.send(json.dumps({
        "type": "completed",
        "id": prompt_id,
    }))
    logger.info("Generation completed")


async def websocket_client():
    """Run forever as a worker that pulls prompts over a WebSocket.

    Loop, per connection to ``WEBSOCKET_URL``:

    1. send ``{"type": "getPrompt"}`` and wait for one JSON reply;
    2. on ``"prompt"``: stream the completion back (chunk messages, then a
       completed message);
    3. on ``"error"``: log it and wait 5 seconds before asking again;
    4. on anything else: log the type and ask again.

    If the connection closes or any other exception occurs, the error is
    logged and a new connection is attempted after 5 seconds. The function
    never returns on its own.
    """
    call_count = 0
    while True:
        try:
            async with websockets.connect(WEBSOCKET_URL) as websocket:
                logger.info("WebSocket connection established")
                while True:
                    # Request a prompt
                    await websocket.send(json.dumps({"type": "getPrompt"}))
                    data = json.loads(await websocket.recv())
                    logger.info("Received: %s", data)

                    message_type = data.get("type")
                    if message_type == "prompt":
                        prompt_text = data.get("prompt")
                        if prompt_text is None:
                            # A prompt message without text cannot be served.
                            # Skip it instead of letting model validation
                            # fail and tear down a healthy connection.
                            logger.error(
                                "Prompt message has no 'prompt' field: %s", data
                            )
                            await asyncio.sleep(5)
                            continue
                        call_count += 1
                        logger.info("Processing prompt: %s", prompt_text)
                        logger.info("Call count: %s", call_count)
                        await _stream_completion(
                            websocket, data.get("id"), prompt_text
                        )
                    elif message_type == "error":
                        logger.info("Received error: %s", data.get("message"))
                        # Wait a bit before requesting a new prompt
                        await asyncio.sleep(5)
                    else:
                        logger.info(
                            "Received unexpected message type: %s", message_type
                        )
        except websockets.exceptions.ConnectionClosed:
            logger.error("WebSocket connection closed. Retrying in 5 seconds...")
            await asyncio.sleep(5)
        except Exception as e:
            # Top-level boundary of the worker: keep it alive, but record the
            # full traceback so the failure can actually be diagnosed.
            logger.exception("Error: %s. Retrying in 5 seconds...", e)
            await asyncio.sleep(5)
# Strong references to fire-and-forget tasks. The event loop only keeps weak
# references to tasks, so a task nobody else references can be
# garbage-collected before it finishes.
_background_tasks = set()


@app.on_event("startup")
async def startup_event():
    """Start the WebSocket worker as a background task when the app starts.

    Registered as a startup handler on ``app`` so that serving the
    application is what launches ``websocket_client()``.
    """
    task = asyncio.create_task(websocket_client())
    _background_tasks.add(task)
    # Drop the reference once the task ends so the set does not grow.
    task.add_done_callback(_background_tasks.discard)
@app.get("/")
async def root():
    """Return a fixed status payload.

    Registered on ``app`` as the handler for ``GET /`` so the message is
    actually reachable over HTTP.

    Returns:
        ``{"message": "WebSocket client is running"}``.
    """
    return {"message": "WebSocket client is running"}
if __name__ == "__main__":
    # Script entry point: serve ``app`` on all interfaces, port 8000.
    # uvicorn is imported here because it is only needed for direct runs.
    import uvicorn

    uvicorn.run(app, port=8000, host="0.0.0.0")