import json
import os
import random
import string
import time
from typing import Any

import g4f
import nest_asyncio
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from g4f import ChatCompletion
from loguru import logger
from starlette.middleware.cors import CORSMiddleware

# Allow re-entrant event loops, e.g. when this file runs inside a notebook
# or another environment that already has an asyncio loop.
nest_asyncio.apply()

app = FastAPI()

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)


@app.post("/chat/completions")
@app.post("/v1/chat/completions")
async def chat_completions(request: Request):
    """OpenAI-compatible chat completions endpoint."""
    req_data = await request.json()
    stream = req_data.get("stream", False)
    model = req_data.get("model", "gpt-3.5-turbo")
    messages = req_data.get("messages")
    temperature = req_data.get("temperature", 1.0)
    top_p = req_data.get("top_p", 1.0)
    max_tokens = req_data.get("max_tokens", 0)

    logger.info(
        f"chat_completions: stream: {stream}, model: {model}, "
        f"temperature: {temperature}, top_p: {top_p}, max_tokens: {max_tokens}"
    )

    response = await gen_resp(max_tokens, messages, model, stream, temperature, top_p)

    # Random 28-character id and Unix timestamp, matching OpenAI's format.
    completion_id = "".join(random.choices(string.ascii_letters + string.digits, k=28))
    completion_timestamp = int(time.time())

    if not stream:
        logger.info(f"chat_completions: response: {response}")
        return {
            "id": f"chatcmpl-{completion_id}",
            "object": "chat.completion",
            "created": completion_timestamp,
            "model": model,
            "choices": [
                {
                    "index": 0,
                    "message": {
                        "role": "assistant",
                        "content": response,
                    },
                    "finish_reason": "stop",
                }
            ],
            "usage": {
                "prompt_tokens": None,
                "completion_tokens": None,
                "total_tokens": None,
            },
        }

    def streaming():
        # Emit each fragment as an OpenAI-style server-sent event chunk.
        for chunk in response:
            completion_data = {
                "id": f"chatcmpl-{completion_id}",
                "object": "chat.completion.chunk",
                "created": completion_timestamp,
                "model": model,
                "choices": [
                    {
                        "index": 0,
                        "delta": {
                            "content": chunk,
                        },
                        "finish_reason": None,
                    }
                ],
            }

            content = json.dumps(completion_data, separators=(",", ":"))
            yield f"data: {content}\n\n"
            time.sleep(0.03)  # pace the stream slightly

        # A final chunk with an empty delta signals the end of the stream.
        end_completion_data: dict[str, Any] = {
            "id": f"chatcmpl-{completion_id}",
            "object": "chat.completion.chunk",
            "created": completion_timestamp,
            "model": model,
            "choices": [
                {
                    "index": 0,
                    "delta": {},
                    "finish_reason": "stop",
                }
            ],
        }
        content = json.dumps(end_completion_data, separators=(",", ":"))
        yield f"data: {content}\n\n"

    return StreamingResponse(streaming(), media_type="text/event-stream")


async def gen_resp(max_tokens, messages, model, stream, temperature, top_p):
    # g4f providers fail intermittently, so retry up to MAX_ATTEMPTS times.
    MAX_ATTEMPTS = int(os.getenv("MAX_ATTEMPTS", "10"))
    attempts = 0
    while True:
        try:
            response = ChatCompletion.create(
                model=model,
                stream=stream,
                messages=messages,
                temperature=temperature,
                top_p=top_p,
                max_tokens=max_tokens,
                system_prompt="",
                provider=g4f.Provider.Bing,
            )
            return response
        except Exception as e:
            logger.error(f"gen_resp: Exception: {e}")
            attempts += 1
            if attempts >= MAX_ATTEMPTS:
                return (
                    "Sorry, I could not generate a chat response. Please check "
                    "your Internet connection and API configuration, then try again."
                )
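
# A minimal launch sketch, assuming uvicorn is installed and this module is
# saved as `api.py` (hypothetical filename; adjust to the real one). It is
# equivalent to `uvicorn api:app --host 0.0.0.0 --port 1337`; the port is an
# assumption, pick whatever your clients expect.
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=1337)

# Example request once the server is up (endpoint paths are defined above):
#   curl http://localhost:1337/v1/chat/completions \
#     -H "Content-Type: application/json" \
#     -d '{"model": "gpt-3.5-turbo", "stream": false,
#          "messages": [{"role": "user", "content": "Hello"}]}'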