# app.py — FastAPI proxy for the TypeGPT chat-completions API
# (Hugging Face Space: paola1/proxy, commit 7657e87)
import asyncio
import json
import os
import threading
import time

import requests
import schedule
from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect
from fastapi.responses import StreamingResponse
from huggingface_hub import HfApi
from pydantic import BaseModel
app = FastAPI()
# Define the request model
class ChatRequest(BaseModel):
messages: list = [{"role": "user", "content": "Lol full form"}]
model: str = "gemini-1.5-pro-latest"
temperature: float = 1.0
top_p: float = 0.8
max_tokens: int = 4000
# Define the URL and headers
url = "https://chat.typegpt.net/api/openai/v1/chat/completions"
headers = {
"Accept": "application/json, text/event-stream",
"Content-Type": "application/json",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36 Edg/130.0.0.0",
}
def restart_hf_space():
HF_TOKEN = os.environ.get("HF_TOKEN", None)
api = HfApi(token=HF_TOKEN)
api.restart_space('paola1/proxy', factory_reboot=False)
# Schedule the task to run every hour
schedule.every(6).hours.do(restart_hf_space)
def run_schedule():
while True:
schedule.run_pending()
time.sleep(1)
# Run the scheduled tasks in a separate thread
thread = threading.Thread(target=run_schedule)
thread.daemon = True # Set as daemon thread so it exits when main thread exits
thread.start()
@app.post("/chat")
async def chat(request: ChatRequest):
# Define the payload
payload = {
"messages": request.messages,
"stream": True,
"model": request.model,
"temperature": request.temperature,
"top_p": request.top_p,
"max_tokens": request.max_tokens
}
# Make the POST request with streaming
try:
response = requests.post(url, headers=headers, data=json.dumps(payload), stream=True)
# Check if the request was successful
if response.status_code == 200:
async def event_stream():
for line in response.iter_lines():
if line:
decoded_line = line.decode('utf-8')
if decoded_line.startswith("data: "):
try:
data = json.loads(decoded_line[len('data: '):])
content = data.get("choices", [{}])[0].get("delta", {}).get("content", '')
if content:
yield f"{json.dumps({'response': content})}\n\n"
except json.JSONDecodeError:
continue
return StreamingResponse(event_stream(), media_type="text/event-stream")
else:
raise HTTPException(status_code=response.status_code, detail=response.text)
except Exception as e:
print(e)
raise HTTPException(status_code=500, detail=str(e))
@app.websocket("/ws/chat")
async def websocket_chat(websocket: WebSocket):
await websocket.accept()
while True: # Keep the WebSocket connection alive
try:
# Receive the chat request from the client
data = await websocket.receive_text()
try:
request = ChatRequest.parse_raw(data)
except Exception as e:
await websocket.send_text(json.dumps({"error": "Invalid request format"}))
continue
# Define the payload
payload = {
"messages": request.messages,
"stream": True,
"model": request.model,
"temperature": request.temperature,
"top_p": request.top_p,
"max_tokens": request.max_tokens
}
# Make the POST request with streaming
response = requests.post(url, headers=headers, data=json.dumps(payload), stream=True)
# Check if the request was successful
if response.status_code == 200:
# Stream the response line by line
for line in response.iter_lines():
if line:
decoded_line = line.decode('utf-8')
if decoded_line.startswith("data: "):
try:
data = json.loads(decoded_line[len('data: '):])
content = data.get("choices", [{}])[0].get("delta", {}).get("content", '')
if content:
await websocket.send_text(json.dumps({"response": content}))
except json.JSONDecodeError:
continue
else:
await websocket.send_text(json.dumps({"error": "Failed to get a valid response from the server"}))
except Exception as e:
print(e)
await websocket.send_text(json.dumps({"error": str(e)}))
break # Exit the loop if an error occurs
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8083)