from fastapi import FastAPI, HTTPException, Query
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
import uuid
import os
from dotenv import load_dotenv
import config
from g4f.client import Client
import time
import json
from typing import Optional, Dict, List

load_dotenv()
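
# The imported config module is expected to provide the attributes referenced
# below: APP_NAME, APP_DESCRIPTION, DEFAULT_MODEL, MODELS (a mapping from a
# friendly model name to the provider's model id), ALGORITHM, and the rate
# limits RPM, RPH and RPD (requests per minute/hour/day). A minimal sketch of
# such a config.py, with illustrative values only (not the actual settings of
# this deployment):
#
#     APP_NAME = "Chat API"
#     APP_DESCRIPTION = "Simple chat API"
#     ALGORITHM = "g4f"            # or "openai" to route via an OpenAI-compatible endpoint
#     DEFAULT_MODEL = "gpt-4o-mini"
#     MODELS = {"gpt-4o-mini": "gpt-4o-mini"}
#     RPM, RPH, RPD = 10, 100, 500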

app = FastAPI(
    title=config.APP_NAME,
    description=config.APP_DESCRIPTION,
    version="1.0.0"
)

# In-memory storage for chat sessions
chat_sessions: Dict[str, List[Dict]] = {}
session_timestamps: Dict[str, List[float]] = {}
session_models: Dict[str, str] = {}
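
# Note: all session state lives in process memory. It is lost on every restart
# and is not shared across worker processes, so this setup suits a single-worker
# deployment; a persistent or shared store would be needed otherwise.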


class ChatMessage(BaseModel):
    message: str
    model: Optional[str] = None


class ChatResponse(BaseModel):
    response: str
    session_id: str
    model: str


class SessionInfo(BaseModel):
    session_id: str
    message_count: int
    model: str
    history: List[Dict]


def get_or_create_session(session_name: str) -> str:
    """Get existing session or create new one"""
    if session_name not in chat_sessions:
        chat_sessions[session_name] = [
            {"role": "assistant", "content": "Hello! I am ready to help you."}
        ]
        session_timestamps[session_name] = []
        session_models[session_name] = config.DEFAULT_MODEL
    return session_name


def check_rate_limit(session_name: str) -> bool:
    """Check if session has exceeded rate limits"""
    current_time = time.time()
    if session_name not in session_timestamps:
        session_timestamps[session_name] = []
    session_timestamps[session_name].append(current_time)

    one_minute_ago = current_time - 60
    one_hour_ago = current_time - 3600
    one_day_ago = current_time - 86400

    recent_requests_minute = [ts for ts in session_timestamps[session_name] if ts > one_minute_ago]
    recent_requests_hour = [ts for ts in session_timestamps[session_name] if ts > one_hour_ago]
    recent_requests_day = [ts for ts in session_timestamps[session_name] if ts > one_day_ago]

    # Clean up old timestamps
    session_timestamps[session_name] = recent_requests_day

    return (len(recent_requests_minute) <= config.RPM and
            len(recent_requests_hour) <= config.RPH and
            len(recent_requests_day) <= config.RPD)
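
# Worked example: with config.RPM = 3, the first three requests of a session in
# any 60-second window pass (the current timestamp is appended before counting,
# and 3 <= 3 holds), while a fourth request in the same window yields 4 > 3 and
# the check returns False. The hourly and daily windows behave the same way.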


@app.post("/chat")
async def chat(
    message: str = Query(..., description="The message to send"),
    session: str = Query(default="default", description="Session name"),
    model: Optional[str] = Query(None, description="AI model to use")
):
    """Send a message to the chat API"""
    # Get or create session
    session_id = get_or_create_session(session)

    # Check rate limit
    if not check_rate_limit(session_id):
        raise HTTPException(
            status_code=429,
            detail="Rate limit exceeded. Please wait before sending another message."
        )

    # Update model if provided
    if model:
        session_models[session_id] = model

    # Add user message to session
    chat_sessions[session_id].append({"role": "user", "content": message})

    try:
        # Prepare client
        client_params = {}
        if config.ALGORITHM == 'openai':
            openai_base_url = os.getenv("OPENAI_BASE_URL")
            openai_api_key = os.getenv("OPENAI_API_KEY")
            if openai_base_url:
                client_params['base_url'] = openai_base_url
            if openai_api_key:
                client_params['api_key'] = openai_api_key
        client = Client(**client_params)

        # Get API model name
        current_model = session_models[session_id]
        api_model_name = config.MODELS.get(current_model, config.DEFAULT_MODEL)

        # Prepare messages for API
        messages_for_api = [
            {"role": msg["role"], "content": msg["content"]}
            for msg in chat_sessions[session_id]
        ]

        # Generate response
        response = client.chat.completions.create(
            model=api_model_name,
            messages=messages_for_api,
            stream=False,
            web_search=False
        )
        assistant_response = response.choices[0].message.content

        # Add assistant response to session
        chat_sessions[session_id].append({
            "role": "assistant",
            "content": assistant_response
        })

        return ChatResponse(
            response=assistant_response,
            session_id=session_id,
            model=current_model
        )
    except Exception as e:
        error_message = f"Error generating response: {str(e)}"
        chat_sessions[session_id].append({
            "role": "assistant",
            "content": error_message
        })
        raise HTTPException(status_code=500, detail=error_message)
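
# Example call against a locally running instance (route and port as documented
# in the root endpoint and the __main__ block below; adjust host/port as needed):
#
#     curl -X POST "http://localhost:8000/chat?message=Hello&session=demo"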


@app.post("/chat/stream")
async def chat_stream(
    message: str = Query(..., description="The message to send"),
    session: str = Query(default="default", description="Session name"),
    model: Optional[str] = Query(None, description="AI model to use")
):
    """Send a message to the chat API with streaming response"""
    # Get or create session
    session_id = get_or_create_session(session)

    # Check rate limit
    if not check_rate_limit(session_id):
        raise HTTPException(
            status_code=429,
            detail="Rate limit exceeded. Please wait before sending another message."
        )

    # Update model if provided
    if model:
        if model in config.MODELS:
            session_models[session_id] = model
        else:
            available_models = list(config.MODELS.keys())
            raise HTTPException(
                status_code=400,
                detail=f"Invalid model. Available models: {available_models}"
            )

    # Add user message to session
    chat_sessions[session_id].append({"role": "user", "content": message})

    async def generate_stream():
        try:
            # Prepare client
            client_params = {}
            if config.ALGORITHM == 'openai':
                openai_base_url = os.getenv("OPENAI_BASE_URL")
                openai_api_key = os.getenv("OPENAI_API_KEY")
                if openai_base_url:
                    client_params['base_url'] = openai_base_url
                if openai_api_key:
                    client_params['api_key'] = openai_api_key
            client = Client(**client_params)

            # Get API model name
            current_model = session_models[session_id]
            api_model_name = config.MODELS.get(current_model, config.DEFAULT_MODEL)

            # Prepare messages for API
            messages_for_api = [
                {"role": msg["role"], "content": msg["content"]}
                for msg in chat_sessions[session_id]
            ]

            # Generate streaming response
            stream = client.chat.completions.create(
                model=api_model_name,
                messages=messages_for_api,
                stream=True,
                web_search=False
            )

            # Relay the chunks as SSE-style "data: ..." lines
            full_response = ""
            for chunk in stream:
                if chunk.choices[0].delta.content is not None:
                    content = chunk.choices[0].delta.content
                    full_response += content
                    yield f"data: {json.dumps({'content': content, 'session_id': session_id})}\n\n"

            # Add complete response to session
            chat_sessions[session_id].append({
                "role": "assistant",
                "content": full_response
            })
            yield f"data: {json.dumps({'done': True, 'session_id': session_id})}\n\n"
        except Exception as e:
            error_message = f"Error generating response: {str(e)}"
            chat_sessions[session_id].append({
                "role": "assistant",
                "content": error_message
            })
            yield f"data: {json.dumps({'error': error_message, 'session_id': session_id})}\n\n"

    return StreamingResponse(
        generate_stream(),
        media_type="text/plain",
        headers={"Cache-Control": "no-cache", "Connection": "keep-alive"}
    )
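
# Example streaming call (same assumptions as above; -N disables curl's output
# buffering so chunks appear as they arrive):
#
#     curl -N -X POST "http://localhost:8000/chat/stream?message=Hello&session=demo"
#
# Each chunk is a JSON payload on a "data:" line; the final line carries
# {"done": true, ...}, or an "error" field if generation failed.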


@app.get("/sessions/{session_name}")
async def get_session_info(session_name: str):
    """Get information about a specific session"""
    if session_name not in chat_sessions:
        raise HTTPException(status_code=404, detail="Session not found")
    return SessionInfo(
        session_id=session_name,
        message_count=len(chat_sessions[session_name]),
        model=session_models.get(session_name, config.DEFAULT_MODEL),
        history=chat_sessions[session_name]
    )


@app.get("/sessions")
async def list_sessions():
    """List all active sessions"""
    sessions_info = []
    for session_name in chat_sessions:
        sessions_info.append({
            "session_id": session_name,
            "message_count": len(chat_sessions[session_name]),
            "model": session_models.get(session_name, config.DEFAULT_MODEL)
        })
    return {"sessions": sessions_info}


@app.delete("/sessions/{session_name}")
async def delete_session(session_name: str):
    """Delete a specific session"""
    if session_name not in chat_sessions:
        raise HTTPException(status_code=404, detail="Session not found")
    del chat_sessions[session_name]
    if session_name in session_timestamps:
        del session_timestamps[session_name]
    if session_name in session_models:
        del session_models[session_name]
    return {"message": f"Session {session_name} deleted successfully"}


@app.post("/sessions/{session_name}/new")
async def create_new_session(session_name: str):
    """Create a new session or reset existing one"""
    chat_sessions[session_name] = [
        {"role": "assistant", "content": "Hello! I am ready to help you."}
    ]
    session_timestamps[session_name] = []
    session_models[session_name] = config.DEFAULT_MODEL
    return {"message": f"Session {session_name} created/reset successfully"}


@app.get("/models")
async def get_available_models():
    """Get list of available models"""
    return {
        "models": list(config.MODELS.keys()),
        "default_model": config.DEFAULT_MODEL
    }


@app.get("/")
async def root():
    """API root endpoint"""
    return {
        "message": f"Welcome to {config.APP_NAME}",
        "description": config.APP_DESCRIPTION,
        "endpoints": {
            "POST /chat": "Send a message (with ?session=name parameter)",
            "POST /chat/stream": "Send a message with streaming response",
            "GET /sessions": "List all sessions",
            "GET /sessions/{name}": "Get session info",
            "DELETE /sessions/{name}": "Delete a session",
            "POST /sessions/{name}/new": "Create/reset a session",
            "GET /models": "Get available models"
        }
    }


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)