|
|
import os |
|
|
import time |
|
|
import uuid |
|
|
from typing import List, Dict, Optional, Union, Generator, Any |
|
|
|
|
|
from fastapi import FastAPI, HTTPException, Request, status |
|
|
from fastapi.responses import StreamingResponse, JSONResponse |
|
|
from pydantic import BaseModel, Field |
|
|
import uvicorn |
|
|
|
|
|
from hugchat import hugchat |
|
|
from hugchat.login import Login |
|
|
|
|
|
|
|
|
|
|
|
# Credentials are read from the environment. Never hard-code account
# credentials in source control — the previous revision embedded a real
# email/password pair here, which must be considered leaked and rotated.
HF_EMAIL = os.environ.get("HUGGINGFACE_EMAIL", "")
HF_PASSWD = os.environ.get("HUGGINGFACE_PASSWD", "")
# Directory where hugchat persists login cookies between runs.
COOKIE_PATH_DIR = "./hugchat_cookies/"

# Missing credentials are not fatal: startup_event() falls back to any
# previously-saved cookies in COOKIE_PATH_DIR.
if not HF_EMAIL or not HF_PASSWD:
    print("Warning: HUGGINGFACE_EMAIL or HUGGINGFACE_PASSWD environment variables not set.")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# --- Mutable, process-wide server state ---
# Single shared HugChat session; stays None until startup_event() succeeds,
# and endpoints answer 503 while it is None.
chatbot: Optional[hugchat.ChatBot] = None
# Model names reported by HugChat, in the order the backend returned them.
available_models_list: List[str] = []
# Model name -> list index; chatbot.switch_llm() takes the index.
available_models_map: Dict[str, int] = {}
# Name of the model the shared chatbot is currently switched to.
current_llm_model_on_chatbot: Optional[str] = None
# Unix time the process started; reused as the "created" stamp on model cards.
server_start_time: int = int(time.time())
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class ModelCard(BaseModel):
    """OpenAI-compatible description of one model (entries of GET /v1/models)."""
    # HugChat model identifier, e.g. "org/model-name".
    id: str
    object: str = "model"
    # All cards report the server boot time as their creation timestamp.
    created: int = Field(default_factory=lambda: server_start_time)
    # Overridden with the "org" prefix of the id when the id contains "/".
    owned_by: str = "huggingface"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class ModelList(BaseModel):
    """Envelope for GET /v1/models, mirroring OpenAI's list response shape."""
    object: str = "list"
    data: List[ModelCard]
|
|
|
|
|
|
|
|
class ChatMessage(BaseModel):
    """One message of the incoming conversation payload."""
    # Not validated here; the endpoint only acts on "user" and "system" roles.
    role: str
    content: str
|
|
|
|
|
|
|
|
class ChatCompletionRequest(BaseModel):
    """Body of POST /v1/chat/completions (subset of the OpenAI chat schema)."""
    # Must match one of the names in available_models_list.
    model: str
    messages: List[ChatMessage]
    stream: Optional[bool] = False
    # Sampling knobs are accepted (and range-validated) for OpenAI
    # compatibility; the endpoint only logs temperature/max_tokens and
    # forwards none of them to HugChat.
    temperature: Optional[float] = Field(None, ge=0.0, le=2.0)
    top_p: Optional[float] = Field(None, ge=0.0, le=1.0)
    n: Optional[int] = Field(None, ge=1)
    max_tokens: Optional[int] = Field(None, ge=1)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class DeltaMessage(BaseModel):
    """Incremental payload inside one streaming chunk.

    Fields left as None are dropped on serialization (the endpoint dumps
    chunks with exclude_none=True).
    """
    role: Optional[str] = None
    content: Optional[str] = None
|
|
|
|
|
class ChatCompletionChunkChoice(BaseModel):
    """Single choice entry within a streaming chunk."""
    index: int = 0
    delta: DeltaMessage
    # None for intermediate chunks; "stop" (or "error") on terminal chunks.
    finish_reason: Optional[str] = None
|
|
|
|
|
class ChatCompletionChunk(BaseModel):
    """One SSE frame of a streaming chat completion."""
    # Shared "chatcmpl-..." id across every chunk of one reply.
    id: str
    object: str = "chat.completion.chunk"
    created: int = Field(default_factory=lambda: int(time.time()))
    model: str

    choices: List[ChatCompletionChunkChoice]
|
|
|
|
|
class ResponseMessage(BaseModel):
    """Assistant message returned by a non-streaming completion."""
    role: str
    content: str
|
|
|
|
|
|
|
|
class ChatCompletionChoice(BaseModel):
    """Single choice of a non-streaming completion (this API returns one)."""
    index: int = 0
    message: ResponseMessage
    finish_reason: str = "stop"
|
|
|
|
|
|
|
|
class UsageInfo(BaseModel):
    """Token accounting; HugChat exposes no counts, so all values stay zero."""
    prompt_tokens: int = 0
    completion_tokens: int = 0
    total_tokens: int = 0
|
|
|
|
|
class ChatCompletionResponse(BaseModel):
    """Non-streaming response body for POST /v1/chat/completions."""
    # "chatcmpl-<uuid>" assigned per request.
    id: str
    object: str = "chat.completion"
    created: int = Field(default_factory=lambda: int(time.time()))
    model: str

    choices: List[ChatCompletionChoice]
    # Token counts are unavailable from HugChat, so usage defaults to zeros.
    # default_factory takes the callable directly; the previous
    # `lambda: UsageInfo()` wrapper was redundant.
    usage: Optional[UsageInfo] = Field(default_factory=UsageInfo)
|
|
|
|
|
|
|
|
|
|
|
# FastAPI application exposing an OpenAI-compatible surface over HugChat.
app = FastAPI(
    title="HugChat OpenAI-Compatible API",
    description="An OpenAI-compatible API wrapper for HuggingChat.",
    version="0.1.1"
)
|
|
|
|
|
@app.on_event("startup")
async def startup_event():
    """Log in to HuggingChat once at startup and cache the model catalogue.

    Populates the module-level `chatbot`, `available_models_list`,
    `available_models_map` and `current_llm_model_on_chatbot`. On any failure
    `chatbot` is left as None so endpoints can answer 503 instead of crashing
    the server.
    """
    global chatbot, available_models_list, available_models_map, current_llm_model_on_chatbot
    print("Initializing HugChatBot...")
    try:
        if not os.path.exists(COOKIE_PATH_DIR):
            os.makedirs(COOKIE_PATH_DIR)

        if not HF_EMAIL or not HF_PASSWD:
            print("Attempting to load cookies directly as credentials are not fully set.")
            # No usable credentials: try to reuse cookies saved by a previous
            # successful login instead of authenticating.
            # NOTE(review): relies on hugchat's Login accepting a None
            # password and on loadCookiesFromDir returning a falsy value when
            # nothing is found — confirm against the installed hugchat version.
            temp_sign = Login(HF_EMAIL or "dummy_email", None)
            cookies = temp_sign.loadCookiesFromDir(cookie_dir_path=COOKIE_PATH_DIR)
            if not cookies:
                raise ValueError("Credentials not set and no saved cookies found. Please set HUGGINGFACE_EMAIL and HUGGINGFACE_PASSWD or ensure cookies are present.")
            print("Loaded cookies from disk.")
        else:
            # Full credentials present: perform a real login and persist the
            # cookies so future cookie-only startups work.
            sign = Login(HF_EMAIL, HF_PASSWD)
            cookies = sign.login(cookie_dir_path=COOKIE_PATH_DIR, save_cookies=True)

        chatbot = hugchat.ChatBot(cookies=cookies.get_dict())
        print("HugChatBot initialized successfully.")

        models_raw = chatbot.get_available_llm_models()
        if not models_raw:
            print("Warning: No available LLM models found from HugChat.")
            return

        # switch_llm() takes a positional index, so remember each model's
        # position alongside its name.
        available_models_list = [str(model_name) for model_name in models_raw]
        available_models_map = {name: i for i, name in enumerate(available_models_list)}
        print(f"Available models: {available_models_list}")

        if available_models_list:
            # Default to the first model HugChat reported and open an initial
            # conversation on it.
            default_model_index = 0
            chatbot.switch_llm(default_model_index)
            current_llm_model_on_chatbot = available_models_list[default_model_index]
            chatbot.new_conversation(switch_to=True)
            print(f"Default model set to: {current_llm_model_on_chatbot}")
        else:
            print("No models available to set a default.")

    except Exception as e:
        # Keep the server up but degraded; endpoints check `chatbot is None`.
        print(f"Error during HugChatBot initialization: {e}")
        chatbot = None
|
|
|
|
|
|
|
|
def not_supported_response(feature: str):
    """Build a 501 JSON response, in OpenAI error shape, for an
    unimplemented feature."""
    error_body = {
        "message": f"The '{feature}' feature is not supported by this HugChat-backed API.",
        "type": "not_supported_error",
        "param": None,
        "code": None,
    }
    return JSONResponse(
        status_code=status.HTTP_501_NOT_IMPLEMENTED,
        content={"error": error_body},
    )
|
|
|
|
|
|
|
|
|
|
|
@app.get("/v1/models", response_model=ModelList)
async def list_models():
    """Return every HugChat LLM as an OpenAI-style model card.

    Raises:
        HTTPException: 503 when the chatbot is uninitialized or no models
            were discovered at startup.
    """
    if chatbot is None or not available_models_list:
        raise HTTPException(status_code=503, detail="Models list not available. HugChatBot might not be initialized or no models found.")

    def _owner_of(name: str) -> str:
        # "org/model" ids attribute ownership to the org prefix; anything
        # else (or an empty prefix) falls back to "huggingface".
        if "/" not in name:
            return "huggingface"
        return name.split("/")[0] or "huggingface"

    cards = [
        ModelCard(id=name, owned_by=_owner_of(name), created=server_start_time)
        for name in available_models_list
    ]
    return ModelList(data=cards)
|
|
|
|
|
@app.get("/v1/models/{model_id}", response_model=ModelCard)
async def retrieve_model(model_id: str):
    """Look up a single model card by id.

    Raises:
        HTTPException: 503 when the chatbot is uninitialized; 404 when the
            id is not in the discovered model list.
    """
    if chatbot is None or not available_models_list:
        raise HTTPException(status_code=503, detail="Model information not available. HugChatBot might not be initialized.")

    if model_id not in available_models_list:
        raise HTTPException(status_code=404, detail=f"Model '{model_id}' not found.")

    # "org/model" ids attribute ownership to the org prefix; an absent or
    # empty prefix falls back to "huggingface".
    owner = "huggingface"
    if "/" in model_id:
        owner = model_id.split("/")[0] or "huggingface"
    return ModelCard(id=model_id, owned_by=owner, created=server_start_time)
|
|
|
|
|
|
|
|
@app.post("/v1/chat/completions")
async def chat_completions_endpoint(request: ChatCompletionRequest):
    """Serve OpenAI-style chat completions from the shared HugChat session.

    Switches the shared chatbot to the requested model when needed, starts a
    fresh HugChat conversation per request (prior messages in the payload are
    NOT replayed), and sends only the newest user message as the prompt.
    Supports both SSE streaming and plain JSON responses.

    Raises:
        HTTPException: 503 if the bot/models are unavailable, 400 for an
            unknown model or missing user prompt, 500 on backend failures.
    """
    global chatbot, current_llm_model_on_chatbot

    if chatbot is None:
        raise HTTPException(status_code=503, detail="HugChatBot is not available. Check server logs.")
    if not available_models_map:
        raise HTTPException(status_code=503, detail="No LLM models loaded from HugChat.")

    requested_model = request.model
    if requested_model not in available_models_map:
        raise HTTPException(
            status_code=400,
            detail=f"Model '{requested_model}' not found. Available models: {', '.join(available_models_list)}"
        )

    # Switch the shared session's LLM only when it differs from the request.
    # NOTE(review): this mutates process-global state, so concurrent requests
    # for different models can race each other — confirm single-client use.
    if current_llm_model_on_chatbot != requested_model:
        print(f"Switching model from '{current_llm_model_on_chatbot}' to '{requested_model}'...")
        try:
            model_index = available_models_map[requested_model]
            chatbot.switch_llm(model_index)
            current_llm_model_on_chatbot = requested_model
            print(f"Model switched. Creating new conversation for model: {current_llm_model_on_chatbot}")
        except Exception as e:
            raise HTTPException(status_code=500, detail=f"Failed to switch model: {e}")

    # Every request gets a brand-new conversation (stateless API surface).
    try:
        chatbot.new_conversation(switch_to=True)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to create new conversation: {e}")

    # Use the newest user message as the prompt; fall back to a lone system
    # message so "system prompt only" requests still work.
    last_user_message_content = ""
    for msg in reversed(request.messages):
        if msg.role == "user":
            last_user_message_content = msg.content
            break

    if not last_user_message_content:
        if len(request.messages) == 1 and request.messages[0].role == "system":
            last_user_message_content = request.messages[0].content
        else:
            raise HTTPException(status_code=400, detail="No user message found or suitable prompt in the request.")

    prompt = last_user_message_content
    chat_id = f"chatcmpl-{uuid.uuid4().hex}"
    request_time = int(time.time())

    # Sampling parameters are accepted for OpenAI compatibility but cannot be
    # forwarded to HugChat; surface that in the server log.
    if request.temperature is not None and request.temperature != 1.0:
        print(f"Info: 'temperature' parameter ({request.temperature}) received but may not be supported by HugChat.")
    if request.max_tokens is not None:
        print(f"Info: 'max_tokens' parameter ({request.max_tokens}) received but may not be supported by HugChat.")

    if request.stream:
        async def stream_generator():
            """Yield OpenAI-format SSE frames from the HugChat stream."""
            try:
                # First chunk carries only the assistant role, per the OpenAI
                # streaming convention.
                first_chunk_data = ChatCompletionChunk(
                    id=chat_id,
                    created=request_time,
                    model=current_llm_model_on_chatbot,
                    choices=[ChatCompletionChunkChoice(delta=DeltaMessage(role="assistant"))]
                )
                yield f"data: {first_chunk_data.model_dump_json(exclude_none=True)}\n\n"

                # NOTE(review): chatbot.chat() looks synchronous, so iterating
                # it here blocks the event loop while tokens arrive — consider
                # run_in_executor if concurrency matters.
                # (The previous revision also accumulated the chunks into an
                # unused full_response_text variable; removed.)
                for chunk_text in chatbot.chat(prompt, stream=True):
                    if isinstance(chunk_text, str):
                        chunk_data = ChatCompletionChunk(
                            id=chat_id,
                            created=request_time,
                            model=current_llm_model_on_chatbot,
                            choices=[ChatCompletionChunkChoice(delta=DeltaMessage(content=chunk_text))]
                        )
                        yield f"data: {chunk_data.model_dump_json(exclude_none=True)}\n\n"

                # Terminal chunk with finish_reason, then the [DONE] sentinel.
                final_chunk_data = ChatCompletionChunk(
                    id=chat_id,
                    created=request_time,
                    model=current_llm_model_on_chatbot,
                    choices=[ChatCompletionChunkChoice(delta=DeltaMessage(), finish_reason="stop")]
                )
                yield f"data: {final_chunk_data.model_dump_json(exclude_none=True)}\n\n"
                yield "data: [DONE]\n\n"
            except Exception as e:
                # Headers were already sent mid-stream, so report the failure
                # as a final content chunk instead of an HTTP error status.
                print(f"Error during streaming for chat {chat_id}: {e}")
                error_content = f"Error during stream: {str(e)}"
                error_delta = DeltaMessage(content=error_content)
                error_choice = ChatCompletionChunkChoice(delta=error_delta, finish_reason="error")
                error_chunk = ChatCompletionChunk(
                    id=chat_id, created=request_time, model=current_llm_model_on_chatbot, choices=[error_choice]
                )
                try:
                    yield f"data: {error_chunk.model_dump_json(exclude_none=True)}\n\n"
                except Exception:
                    pass
                yield "data: [DONE]\n\n"

        return StreamingResponse(stream_generator(), media_type="text/event-stream")
    else:
        try:
            message_result = chatbot.chat(prompt)

            # hugchat's return type has varied across versions; probe the
            # known shapes before falling back to str().
            response_text: str
            if hasattr(message_result, 'wait_until_done'):
                response_text = message_result.wait_until_done()
            elif hasattr(message_result, 'text'):
                response_text = message_result.text
            elif isinstance(message_result, str):
                response_text = message_result
            else:
                print(f"Warning: Unexpected response type from chatbot.chat() (non-stream): {type(message_result)}")
                try:
                    response_text = str(message_result)
                except Exception:
                    # Was a bare `except:` before, which would also have
                    # swallowed KeyboardInterrupt/SystemExit.
                    raise ValueError("Could not extract text from HugChat response.")

            return ChatCompletionResponse(
                id=chat_id,
                created=request_time,
                model=current_llm_model_on_chatbot,
                choices=[
                    ChatCompletionChoice(
                        message=ResponseMessage(role="assistant", content=response_text)
                    )
                ],
                usage=UsageInfo()
            )
        except Exception as e:
            print(f"Error processing non-streaming chat {chat_id}: {e}")
            raise HTTPException(status_code=500, detail=f"Error processing non-streaming chat: {e}")
|
|
|
|
|
|
|
|
|
|
|
@app.post("/v1/completions")
async def completions_legacy():
    """Legacy text-completions endpoint — not backed by HugChat; always 501."""
    feature = "Legacy completions (/v1/completions)"
    return not_supported_response(feature)
|
|
|
|
|
@app.post("/v1/embeddings")
async def create_embeddings():
    """Embeddings endpoint — not backed by HugChat; always 501."""
    feature = "Embeddings (/v1/embeddings)"
    return not_supported_response(feature)
|
|
|
|
|
@app.post("/v1/audio/transcriptions")
async def audio_transcriptions():
    """Audio transcription endpoint — not backed by HugChat; always 501."""
    feature = "Audio transcriptions"
    return not_supported_response(feature)
|
|
|
|
|
@app.post("/v1/audio/translations")
async def audio_translations():
    """Audio translation endpoint — not backed by HugChat; always 501."""
    feature = "Audio translations"
    return not_supported_response(feature)
|
|
|
|
|
@app.post("/v1/images/generations")
async def image_generations():
    """Image generation endpoint — not implemented here; always 501."""
    feature = "Image generations (generic API, specific assistants might work via chat)"
    return not_supported_response(feature)
|
|
|
|
|
@app.get("/v1/files")
async def list_files_openai():
    """File-listing endpoint — not backed by HugChat; always 501."""
    feature = "File listing/management"
    return not_supported_response(feature)
|
|
|
|
|
@app.post("/v1/files")
async def upload_file_openai():
    """File-upload endpoint — not backed by HugChat; always 501."""
    feature = "File upload"
    return not_supported_response(feature)
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Ensure the cookie directory exists before the server starts.
    # exist_ok=True closes the check-then-create race of the original code
    # (another process could create the directory between the exists() check
    # and makedirs()); the outer exists() check is kept only so the
    # "Created directory" message is printed just when we actually create it.
    if not os.path.exists(COOKIE_PATH_DIR):
        try:
            os.makedirs(COOKIE_PATH_DIR, exist_ok=True)
            print(f"Created directory: {COOKIE_PATH_DIR}")
        except OSError as e:
            print(f"Error creating directory {COOKIE_PATH_DIR}: {e}")

    print("Starting Uvicorn server...")
    # Log only whether credentials exist — never their values.
    print(f"Credentials: EMAIL={'SET' if HF_EMAIL else 'NOT SET'}, PASSWORD={'SET' if HF_PASSWD else 'NOT SET'}")
    print(f"Cookie Path: {os.path.abspath(COOKIE_PATH_DIR)}")
    uvicorn.run(app, host="0.0.0.0", port=7860)