import os
import time
import uuid
from typing import List, Dict, Optional, Union, Generator, Any

from fastapi import FastAPI, HTTPException, Request, status
from fastapi.responses import StreamingResponse, JSONResponse
from pydantic import BaseModel, Field
import uvicorn

from hugchat import hugchat
from hugchat.login import Login
# from hugchat.types.message import MessageNode # For type hinting if needed

# --- Configuration ---
HF_EMAIL = os.getenv("HUGGINGFACE_EMAIL")
HF_PASSWD = os.getenv("HUGGINGFACE_PASSWD")
COOKIE_PATH_DIR = "./hugchat_cookies/"

if not HF_EMAIL or not HF_PASSWD:
    print("Warning: HUGGINGFACE_EMAIL or HUGGINGFACE_PASSWD environment variables not set.")
    # Allow running without credentials if cookies already exist, for example.
    # The startup logic will handle login/cookie loading.
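
# Minimal setup sketch (assumption: a POSIX shell); export credentials before
# starting the server so the login flow can run on first start:
#   export HUGGINGFACE_EMAIL="you@example.com"
#   export HUGGINGFACE_PASSWD="your-password"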

# --- Global HugChatBot instance and model info ---
chatbot: Optional[hugchat.ChatBot] = None
available_models_list: List[str] = []
available_models_map: Dict[str, int] = {} # Maps model name to index
current_llm_model_on_chatbot: Optional[str] = None
server_start_time = int(time.time()) # For 'created' timestamps

# --- Pydantic Models for OpenAI Compatibility ---

# Model for /v1/models
class ModelCard(BaseModel):
    id: str
    object: str = "model"
    created: int = Field(default_factory=lambda: server_start_time)
    owned_by: str = "huggingface" # Or parse from model ID if possible
    # Add other common fields if desired, often with default/null values
    # permission: Optional[List[Any]] = None
    # root: Optional[str] = None
    # parent: Optional[str] = None

class ModelList(BaseModel):
    object: str = "list"
    data: List[ModelCard]

# Models for /v1/chat/completions (from previous example)
class ChatMessage(BaseModel):
    role: str
    content: str
    # name: Optional[str] = None # For function calling, not directly supported by hugchat

class ChatCompletionRequest(BaseModel):
    model: str
    messages: List[ChatMessage]
    stream: Optional[bool] = False
    # hugchat may not support these sampling params directly; they are accepted
    # for client compatibility and logged if set (see the endpoint below).
    temperature: Optional[float] = Field(None, ge=0.0, le=2.0)
    top_p: Optional[float] = Field(None, ge=0.0, le=1.0)
    n: Optional[int] = Field(None, ge=1)  # chat clients usually send n=1
    max_tokens: Optional[int] = Field(None, ge=1)
    # presence_penalty: Optional[float] = None
    # frequency_penalty: Optional[float] = None
    # logit_bias: Optional[Dict[str, float]] = None
    # user: Optional[str] = None # For tracking, not used by hugchat
    # stop: Optional[Union[str, List[str]]] = None # hugchat handles its own stop

class DeltaMessage(BaseModel):
    role: Optional[str] = None
    content: Optional[str] = None

class ChatCompletionChunkChoice(BaseModel):
    index: int = 0
    delta: DeltaMessage
    finish_reason: Optional[str] = None # "stop", "length", "content_filter", "tool_calls"

class ChatCompletionChunk(BaseModel):
    id: str
    object: str = "chat.completion.chunk"
    created: int = Field(default_factory=lambda: int(time.time()))
    model: str
    # system_fingerprint: Optional[str] = None # OpenAI specific
    choices: List[ChatCompletionChunkChoice]

class ResponseMessage(BaseModel):
    role: str
    content: str
    # tool_calls: Optional[List[Any]] = None # For function/tool calling

class ChatCompletionChoice(BaseModel):
    index: int = 0
    message: ResponseMessage
    finish_reason: str = "stop"
    # logprobs: Optional[Any] = None

class UsageInfo(BaseModel): # Mocked, as hugchat doesn't provide token counts
    prompt_tokens: int = 0
    completion_tokens: int = 0
    total_tokens: int = 0

class ChatCompletionResponse(BaseModel):
    id: str
    object: str = "chat.completion"
    created: int = Field(default_factory=lambda: int(time.time()))
    model: str
    # system_fingerprint: Optional[str] = None
    choices: List[ChatCompletionChoice]
    usage: Optional[UsageInfo] = Field(default_factory=UsageInfo)
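
# Shape of a serialized non-streaming response (values illustrative):
#   {
#     "id": "chatcmpl-...",
#     "object": "chat.completion",
#     "created": 1700000000,
#     "model": "<model id>",
#     "choices": [{"index": 0,
#                  "message": {"role": "assistant", "content": "..."},
#                  "finish_reason": "stop"}],
#     "usage": {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0}
#   }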


# --- FastAPI App ---
app = FastAPI(
    title="HugChat OpenAI-Compatible API",
    description="An OpenAI-compatible API wrapper for HuggingChat.",
    version="0.1.1" # Incremented version
)

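# Note: FastAPI's "on_event" startup hooks are deprecated in newer releases in
# favor of lifespan handlers; kept here for simplicity.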
@app.on_event("startup")
async def startup_event():
    global chatbot, available_models_list, available_models_map, current_llm_model_on_chatbot
    print("Initializing HugChatBot...")
    try:
        if not os.path.exists(COOKIE_PATH_DIR):
            os.makedirs(COOKIE_PATH_DIR)

        if not HF_EMAIL or not HF_PASSWD:
            print("Attempting to load cookies directly as credentials are not fully set.")
            # Try to create a Login object just to access cookie loading methods
            # This part might need adjustment based on how Login handles missing credentials
            temp_sign = Login(HF_EMAIL or "dummy_email", None) # Pass dummy email if HF_EMAIL is None
            cookies = temp_sign.loadCookiesFromDir(cookie_dir_path=COOKIE_PATH_DIR)
            if not cookies:
                raise ValueError("Credentials not set and no saved cookies found. Please set HUGGINGFACE_EMAIL and HUGGINGFACE_PASSWD or ensure cookies are present.")
            print("Loaded cookies from disk.")
        else:
            sign = Login(HF_EMAIL, HF_PASSWD)
            cookies = sign.login(cookie_dir_path=COOKIE_PATH_DIR, save_cookies=True)
        
        chatbot = hugchat.ChatBot(cookies=cookies.get_dict())
        print("HugChatBot initialized successfully.")

        models_raw = chatbot.get_available_llm_models()
        if not models_raw:
            print("Warning: No available LLM models found from HugChat.")
            return

        # hugchat returns Model objects; prefer their `id` field when present
        # (an assumption about the hugchat Model shape), falling back to str().
        available_models_list = [getattr(m, "id", None) or str(m) for m in models_raw]
        available_models_map = {name: i for i, name in enumerate(available_models_list)}
        print(f"Available models: {available_models_list}")

        if available_models_list:
            default_model_index = 0
            chatbot.switch_llm(default_model_index)
            current_llm_model_on_chatbot = available_models_list[default_model_index]
            chatbot.new_conversation(switch_to=True) # Ensure new convo uses this model
            print(f"Default model set to: {current_llm_model_on_chatbot}")
        else:
            print("No models available to set a default.")

    except Exception as e:
        print(f"Error during HugChatBot initialization: {e}")
        chatbot = None

# --- Helper for Unsupported Endpoints ---
def not_supported_response(feature: str):
    return JSONResponse(
        status_code=status.HTTP_501_NOT_IMPLEMENTED,
        content={"error": {
            "message": f"The '{feature}' feature is not supported by this HugChat-backed API.",
            "type": "not_supported_error",
            "param": None,
            "code": None
        }}
    )

# --- API Endpoints ---

@app.get("/v1/models", response_model=ModelList)
async def list_models():
    if chatbot is None or not available_models_list:
        raise HTTPException(status_code=503, detail="Models list not available. HugChatBot might not be initialized or no models found.")
    
    model_cards = []
    for model_id_str in available_models_list:
        owned_by = "huggingface" # Default
        if "/" in model_id_str:
            # Try to extract owner from "owner/model_name" format
            possible_owner = model_id_str.split('/')[0]
            if possible_owner: # Basic check
                owned_by = possible_owner

        model_cards.append(ModelCard(id=model_id_str, owned_by=owned_by, created=server_start_time))
    
    return ModelList(data=model_cards)

@app.get("/v1/models/{model_id}", response_model=ModelCard)
async def retrieve_model(model_id: str):
    if chatbot is None or not available_models_list:
        raise HTTPException(status_code=503, detail="Model information not available. HugChatBot might not be initialized.")
    
    if model_id in available_models_list:
        owned_by = "huggingface"
        if "/" in model_id:
            possible_owner = model_id.split('/')[0]
            if possible_owner:
                owned_by = possible_owner
        return ModelCard(id=model_id, owned_by=owned_by, created=server_start_time)
    else:
        raise HTTPException(status_code=404, detail=f"Model '{model_id}' not found.")


@app.post("/v1/chat/completions") # response_model removed for StreamingResponse flexibility
async def chat_completions_endpoint(request: ChatCompletionRequest):
    global chatbot, current_llm_model_on_chatbot

    if chatbot is None:
        raise HTTPException(status_code=503, detail="HugChatBot is not available. Check server logs.")
    if not available_models_map:
        raise HTTPException(status_code=503, detail="No LLM models loaded from HugChat.")

    requested_model = request.model
    if requested_model not in available_models_map:
        raise HTTPException(
            status_code=400,
            detail=f"Model '{requested_model}' not found. Available models: {', '.join(available_models_list)}"
        )

    if current_llm_model_on_chatbot != requested_model:
        print(f"Switching model from '{current_llm_model_on_chatbot}' to '{requested_model}'...")
        try:
            model_index = available_models_map[requested_model]
            chatbot.switch_llm(model_index)
            current_llm_model_on_chatbot = requested_model
            print(f"Model switched. Creating new conversation for model: {current_llm_model_on_chatbot}")
        except Exception as e:
            raise HTTPException(status_code=500, detail=f"Failed to switch model: {e}")

    try:
        chatbot.new_conversation(switch_to=True) # Ensure new conversation for this request
        # convo_info = chatbot.get_conversation_info()
        # print(f"New conversation started. Active model: {convo_info.model}")
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to create new conversation: {e}")

    last_user_message_content = ""
    # OpenAI typically expects a sequence. We'll primarily use the last user message for hugchat.
    # For a more complex setup, one could try to feed prior messages if hugchat supported it explicitly
    # in a single `chat` call beyond its internal memory.
    for msg in reversed(request.messages):
        if msg.role == "user":
            last_user_message_content = msg.content
            break
    
    if not last_user_message_content:
        # Check for system prompt if no user prompt and it's the only message.
        # Though typically OpenAI clients send at least one user message.
        if len(request.messages) == 1 and request.messages[0].role == "system":
            last_user_message_content = request.messages[0].content  # Use system as prompt
        else:
            raise HTTPException(status_code=400, detail="No user message found or suitable prompt in the request.")

    prompt = last_user_message_content
    chat_id = f"chatcmpl-{uuid.uuid4().hex}"
    request_time = int(time.time())

    # Handle unsupported parameters (informatively, but hugchat will ignore them)
    if request.temperature is not None and request.temperature != 1.0: # Default OpenAI temp
        print(f"Info: 'temperature' parameter ({request.temperature}) received but may not be supported by HugChat.")
    if request.max_tokens is not None:
        print(f"Info: 'max_tokens' parameter ({request.max_tokens}) received but may not be supported by HugChat.")
    # ... (similar for other params like top_p, n, etc.)

    if request.stream:
        async def stream_generator():
            try:
                first_chunk_data = ChatCompletionChunk(
                    id=chat_id,
                    created=request_time,
                    model=current_llm_model_on_chatbot,
                    choices=[ChatCompletionChunkChoice(delta=DeltaMessage(role="assistant"))]
                )
                yield f"data: {first_chunk_data.model_dump_json(exclude_none=True)}\n\n"

                full_response_text = ""
                # The hugchat stream yields text chunks
                for chunk in chatbot.chat(prompt, stream=True):
                    piece: Optional[str] = None
                    if isinstance(chunk, str):
                        piece = chunk
                    elif isinstance(chunk, dict) and "token" in chunk:
                        # Assumption: some hugchat versions yield dict chunks
                        # with a "token" field rather than plain strings.
                        piece = chunk["token"]
                    if piece:
                        full_response_text += piece
                        chunk_data = ChatCompletionChunk(
                            id=chat_id,
                            created=request_time,
                            model=current_llm_model_on_chatbot,
                            choices=[ChatCompletionChunkChoice(delta=DeltaMessage(content=piece))]
                        )
                        yield f"data: {chunk_data.model_dump_json(exclude_none=True)}\n\n"
                
                # print(f"Stream complete. Full text for chat {chat_id}: {full_response_text[:100]}...")

                final_chunk_data = ChatCompletionChunk(
                    id=chat_id,
                    created=request_time,
                    model=current_llm_model_on_chatbot,
                    choices=[ChatCompletionChunkChoice(delta=DeltaMessage(), finish_reason="stop")]
                )
                yield f"data: {final_chunk_data.model_dump_json(exclude_none=True)}\n\n"
                yield "data: [DONE]\n\n"
            except Exception as e:
                print(f"Error during streaming for chat {chat_id}: {e}")
                # Attempt to send an error in the stream if possible (before [DONE])
                # This is non-standard for OpenAI, but useful for debugging
                error_content = f"Error during stream: {str(e)}"
                error_delta = DeltaMessage(content=error_content)
                error_choice = ChatCompletionChunkChoice(delta=error_delta, finish_reason="error") # Custom
                error_chunk = ChatCompletionChunk(
                    id=chat_id, created=request_time, model=current_llm_model_on_chatbot, choices=[error_choice]
                )
                try:
                    yield f"data: {error_chunk.model_dump_json(exclude_none=True)}\n\n"
                except Exception: # If stream already broken
                    pass
                yield "data: [DONE]\n\n" # Always end with [DONE]

        return StreamingResponse(stream_generator(), media_type="text/event-stream")
    else: # Non-streaming
        try:
            # Assuming chatbot.chat() with stream=False returns a result object
            # that has wait_until_done() or .text attribute.
            message_result = chatbot.chat(prompt) # hugchat's non-stream returns a Message object

            response_text: str
            if hasattr(message_result, 'wait_until_done'): # If it's a generator-like object
                response_text = message_result.wait_until_done()
            elif hasattr(message_result, 'text'): # If it's a MessageNode or similar
                response_text = message_result.text
            elif isinstance(message_result, str): # Direct string response
                response_text = message_result
            else:
                print(f"Warning: Unexpected response type from chatbot.chat() (non-stream): {type(message_result)}")
                # Attempt to convert to string as a fallback
                try:
                    response_text = str(message_result)
                except Exception:
                    raise ValueError("Could not extract text from HugChat response.")

            # print(f"Non-streamed response for chat {chat_id} / model {current_llm_model_on_chatbot}: {response_text[:100]}...")
            return ChatCompletionResponse(
                id=chat_id,
                created=request_time,
                model=current_llm_model_on_chatbot,
                choices=[
                    ChatCompletionChoice(
                        message=ResponseMessage(role="assistant", content=response_text)
                    )
                ],
                usage=UsageInfo() # Mocked usage
            )
        except Exception as e:
            print(f"Error processing non-streaming chat {chat_id}: {e}")
            raise HTTPException(status_code=500, detail=f"Error processing non-streaming chat: {e}")
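
# Streaming responses use the OpenAI SSE framing built in stream_generator():
# each event is a "data: <json chunk>" line followed by a blank line, and the
# stream ends with a literal "data: [DONE]" sentinel, e.g. (values illustrative):
#   data: {"id":"chatcmpl-...","object":"chat.completion.chunk","choices":[{"index":0,"delta":{"content":"Hi"}}]}
#   data: [DONE]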


# --- Placeholder/Not Implemented Endpoints ---
@app.post("/v1/completions")
async def completions_legacy():
    return not_supported_response("Legacy completions (/v1/completions)")

@app.post("/v1/embeddings")
async def create_embeddings():
    return not_supported_response("Embeddings (/v1/embeddings)")

@app.post("/v1/audio/transcriptions")
async def audio_transcriptions():
    return not_supported_response("Audio transcriptions")

@app.post("/v1/audio/translations")
async def audio_translations():
    return not_supported_response("Audio translations")

@app.post("/v1/images/generations")
async def image_generations():
    # Note: HuggingChat *can* have image generation assistants.
    # A more advanced version could try to map this if a specific assistant ID is known
    # and the request format can be adapted. For now, marking as generally not supported.
    return not_supported_response("Image generations (generic API, specific assistants might work via chat)")

@app.get("/v1/files")
async def list_files_openai(): # Renamed to avoid conflict if you had other /files
    return not_supported_response("File listing/management")

@app.post("/v1/files")
async def upload_file_openai():
    return not_supported_response("File upload")

# ... (add more placeholders for fine-tuning, moderations etc. as needed)

if __name__ == "__main__":
    if not os.path.exists(COOKIE_PATH_DIR):
        try:
            os.makedirs(COOKIE_PATH_DIR)
            print(f"Created directory: {COOKIE_PATH_DIR}")
        except OSError as e:
            print(f"Error creating directory {COOKIE_PATH_DIR}: {e}")
            # Decide if to exit or continue if dir creation fails
            # exit(1)

    print("Starting Uvicorn server...")
    print(f"Credentials: EMAIL={'SET' if HF_EMAIL else 'NOT SET'}, PASSWORD={'SET' if HF_PASSWD else 'NOT SET'}")
    print(f"Cookie Path: {os.path.abspath(COOKIE_PATH_DIR)}")
    uvicorn.run(app, host="0.0.0.0", port=7860)
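
# Client-side usage sketch (assumptions: the server is running locally on port
# 7860 and the `openai` Python package v1+ is installed; substitute a real
# model id returned by GET /v1/models):
#
#   from openai import OpenAI
#   client = OpenAI(base_url="http://localhost:7860/v1", api_key="unused")
#   resp = client.chat.completions.create(
#       model="<a model id from /v1/models>",
#       messages=[{"role": "user", "content": "Hello!"}],
#   )
#   print(resp.choices[0].message.content)
#
# Or simply list the available models with curl:
#   curl http://localhost:7860/v1/models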