# main.py — FastAPI chat proxy service (session-based chat over g4f / OpenAI-compatible backends)
from fastapi import FastAPI, HTTPException, Query
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
import uuid
import os
from dotenv import load_dotenv
import config
from g4f.client import Client
import time
import json
from typing import Optional, Dict, List
# Load environment variables (e.g. OPENAI_BASE_URL / OPENAI_API_KEY) from a .env file.
load_dotenv()

app = FastAPI(
    title=config.APP_NAME,
    description=config.APP_DESCRIPTION,
    version="1.0.0"
)

# In-memory storage for chat sessions, keyed by session name.
# NOTE(review): all state is process-local and lost on restart; it is also not
# safe across multiple worker processes — confirm single-worker deployment.
chat_sessions: Dict[str, List[Dict]] = {}      # session -> list of {"role", "content"} messages
session_timestamps: Dict[str, List[float]] = {}  # session -> request timestamps for rate limiting
session_models: Dict[str, str] = {}            # session -> currently selected model name
class ChatMessage(BaseModel):
    """Request body shape for a chat message (model selection optional)."""
    message: str
    model: Optional[str] = None
class ChatResponse(BaseModel):
    """Response returned by the /chat endpoint."""
    response: str
    session_id: str
    model: str
class SessionInfo(BaseModel):
    """Response returned by GET /sessions/{session_name}."""
    session_id: str
    message_count: int
    model: str
    history: List[Dict]  # full message list, including the seeded greeting
def get_or_create_session(session_name: str) -> str:
    """Ensure a session named *session_name* exists and return its id.

    A brand-new session is seeded with a greeting message, an empty
    rate-limit timestamp list, and the default model.
    """
    if session_name in chat_sessions:
        return session_name
    greeting = {"role": "assistant", "content": "Hello! I am ready to help you."}
    chat_sessions[session_name] = [greeting]
    session_timestamps[session_name] = []
    session_models[session_name] = config.DEFAULT_MODEL
    return session_name
def check_rate_limit(session_name: str) -> bool:
    """Return True if the session may make another request right now.

    Enforces three rolling windows: at most config.RPM requests per minute,
    config.RPH per hour, and config.RPD per day. Accepting a request records
    its timestamp; timestamps older than one day are pruned so the list
    stays bounded.

    Bug fix: the original recorded the timestamp *before* checking the
    limits, so rejected requests consumed quota — a client retrying while
    throttled could stay locked out indefinitely. We now check first and
    record only accepted requests.
    """
    current_time = time.time()
    timestamps = session_timestamps.setdefault(session_name, [])

    # Prune entries older than the largest (one-day) window.
    one_day_ago = current_time - 86400
    timestamps[:] = [ts for ts in timestamps if ts > one_day_ago]

    one_minute_ago = current_time - 60
    one_hour_ago = current_time - 3600
    within_minute = sum(1 for ts in timestamps if ts > one_minute_ago)
    within_hour = sum(1 for ts in timestamps if ts > one_hour_ago)

    if (within_minute >= config.RPM or
            within_hour >= config.RPH or
            len(timestamps) >= config.RPD):
        return False

    # Only accepted requests count against the quota.
    timestamps.append(current_time)
    return True
@app.get("/chat", response_model=ChatResponse)
async def chat(
    message: str = Query(..., description="The message to send"),
    session: str = Query(default="default", description="Session name"),
    model: Optional[str] = Query(None, description="AI model to use")
):
    """Send a message to the chat API and return the complete response.

    The conversation history for *session* is sent to the backend so the
    model has context. Raises 429 when the session is rate-limited, 400 for
    an unknown model name, and 500 if the backend call fails.

    Fix: the original accepted any `model` string without validation (an
    unknown name silently fell back to the default model); this now rejects
    unknown models with 400, consistent with /chat/stream.
    """
    # Get or create session
    session_id = get_or_create_session(session)

    # Check rate limit
    if not check_rate_limit(session_id):
        raise HTTPException(
            status_code=429,
            detail="Rate limit exceeded. Please wait before sending another message."
        )

    # Validate and update model if provided (consistent with /chat/stream).
    if model:
        if model in config.MODELS:
            session_models[session_id] = model
        else:
            available_models = list(config.MODELS.keys())
            raise HTTPException(
                status_code=400,
                detail=f"Invalid model. Available models: {available_models}"
            )

    # Add user message to session
    chat_sessions[session_id].append({"role": "user", "content": message})

    try:
        # Build client; OpenAI-compatible backends take base URL / key from env.
        client_params = {}
        if config.ALGORITHM == 'openai':
            openai_base_url = os.getenv("OPENAI_BASE_URL")
            openai_api_key = os.getenv("OPENAI_API_KEY")
            if openai_base_url:
                client_params['base_url'] = openai_base_url
            if openai_api_key:
                client_params['api_key'] = openai_api_key
        client = Client(**client_params)

        # Map the friendly model name to the backend's model identifier.
        current_model = session_models[session_id]
        api_model_name = config.MODELS.get(current_model, config.DEFAULT_MODEL)

        # Send the full conversation history for context.
        messages_for_api = [
            {"role": msg["role"], "content": msg["content"]}
            for msg in chat_sessions[session_id]
        ]

        response = client.chat.completions.create(
            model=api_model_name,
            messages=messages_for_api,
            stream=False,
            web_search=False
        )
        assistant_response = response.choices[0].message.content

        # Record the assistant's reply so future turns have it as context.
        chat_sessions[session_id].append({
            "role": "assistant",
            "content": assistant_response
        })

        return ChatResponse(
            response=assistant_response,
            session_id=session_id,
            model=current_model
        )
    except Exception as e:
        # Record the failure in the history, then surface it as a 500.
        error_message = f"Error generating response: {str(e)}"
        chat_sessions[session_id].append({
            "role": "assistant",
            "content": error_message
        })
        raise HTTPException(status_code=500, detail=error_message)
@app.get("/chat/stream")
async def chat_stream(
    message: str = Query(..., description="The message to send"),
    session: str = Query(default="default", description="Session name"),
    model: Optional[str] = Query(None, description="AI model to use")
):
    """Send a message to the chat API with streaming response.

    Streams the reply as server-sent-event-style `data: {...}` JSON lines:
    content chunks, then a final `{"done": true}` marker (or an `{"error":
    ...}` payload on failure). Raises 429 when rate-limited and 400 for an
    unknown model name.
    """
    # Get or create session
    session_id = get_or_create_session(session)

    # Check rate limit
    if not check_rate_limit(session_id):
        raise HTTPException(
            status_code=429,
            detail="Rate limit exceeded. Please wait before sending another message."
        )

    # Update model if provided; unknown names are rejected up front with 400.
    if model:
        if model in config.MODELS:
            session_models[session_id] = model
        else:
            available_models = list(config.MODELS.keys())
            raise HTTPException(
                status_code=400,
                detail=f"Invalid model. Available models: {available_models}"
            )

    # Add user message to session
    chat_sessions[session_id].append({"role": "user", "content": message})

    async def generate_stream():
        # Errors inside the generator cannot become an HTTP status (headers
        # are already sent), so they are emitted as an in-stream error event.
        try:
            # Build client; OpenAI-compatible backends take base URL / key from env.
            client_params = {}
            if config.ALGORITHM == 'openai':
                openai_base_url = os.getenv("OPENAI_BASE_URL")
                openai_api_key = os.getenv("OPENAI_API_KEY")
                if openai_base_url:
                    client_params['base_url'] = openai_base_url
                if openai_api_key:
                    client_params['api_key'] = openai_api_key
            client = Client(**client_params)

            # Map the friendly model name to the backend's model identifier.
            current_model = session_models[session_id]
            api_model_name = config.MODELS.get(current_model, config.DEFAULT_MODEL)

            # Send the full conversation history for context.
            messages_for_api = [
                {"role": msg["role"], "content": msg["content"]}
                for msg in chat_sessions[session_id]
            ]

            # Generate streaming response
            stream = client.chat.completions.create(
                model=api_model_name,
                messages=messages_for_api,
                stream=True,
                web_search=False
            )

            # Accumulate the chunks so the full reply can be stored in history.
            full_response = ""
            for chunk in stream:
                if chunk.choices[0].delta.content is not None:
                    content = chunk.choices[0].delta.content
                    full_response += content
                    yield f"data: {json.dumps({'content': content, 'session_id': session_id})}\n\n"

            # Add complete response to session
            chat_sessions[session_id].append({
                "role": "assistant",
                "content": full_response
            })
            yield f"data: {json.dumps({'done': True, 'session_id': session_id})}\n\n"
        except Exception as e:
            # Record the failure in history and report it in-stream.
            error_message = f"Error generating response: {str(e)}"
            chat_sessions[session_id].append({
                "role": "assistant",
                "content": error_message
            })
            yield f"data: {json.dumps({'error': error_message, 'session_id': session_id})}\n\n"

    return StreamingResponse(
        generate_stream(),
        media_type="text/plain",
        headers={"Cache-Control": "no-cache", "Connection": "keep-alive"}
    )
@app.get("/sessions/{session_name}", response_model=SessionInfo)
async def get_session_info(session_name: str):
    """Return metadata and full message history for one session (404 if absent)."""
    history = chat_sessions.get(session_name)
    if history is None:
        raise HTTPException(status_code=404, detail="Session not found")
    return SessionInfo(
        session_id=session_name,
        message_count=len(history),
        model=session_models.get(session_name, config.DEFAULT_MODEL),
        history=history
    )
@app.get("/sessions")
async def list_sessions():
    """Return a summary (id, message count, model) for every active session."""
    return {
        "sessions": [
            {
                "session_id": name,
                "message_count": len(history),
                "model": session_models.get(name, config.DEFAULT_MODEL)
            }
            for name, history in chat_sessions.items()
        ]
    }
@app.delete("/sessions/{session_name}")
async def delete_session(session_name: str):
    """Delete a session and all of its associated state (404 if absent)."""
    if session_name not in chat_sessions:
        raise HTTPException(status_code=404, detail="Session not found")
    del chat_sessions[session_name]
    # Auxiliary state may be missing; pop with a default instead of check-then-del.
    session_timestamps.pop(session_name, None)
    session_models.pop(session_name, None)
    return {"message": f"Session {session_name} deleted successfully"}
@app.post("/sessions/{session_name}/new")
async def create_new_session(session_name: str):
    """Create a session, or reset an existing one back to its initial state."""
    greeting = {"role": "assistant", "content": "Hello! I am ready to help you."}
    chat_sessions[session_name] = [greeting]
    session_timestamps[session_name] = []
    session_models[session_name] = config.DEFAULT_MODEL
    return {"message": f"Session {session_name} created/reset successfully"}
@app.get("/models")
async def get_available_models():
    """Expose the configured model names and the default selection."""
    model_names = list(config.MODELS.keys())
    return {"models": model_names, "default_model": config.DEFAULT_MODEL}
@app.get("/")
async def root():
    """API root endpoint: welcome message plus a self-describing endpoint map.

    Fix: the endpoint map claimed /chat and /chat/stream are POST routes,
    but both are registered with @app.get above — the returned documentation
    now matches the actual HTTP methods.
    """
    return {
        "message": f"Welcome to {config.APP_NAME}",
        "description": config.APP_DESCRIPTION,
        "endpoints": {
            "GET /chat": "Send a message (with ?session=name parameter)",
            "GET /chat/stream": "Send a message with streaming response",
            "GET /sessions": "List all sessions",
            "GET /sessions/{name}": "Get session info",
            "DELETE /sessions/{name}": "Delete a session",
            "POST /sessions/{name}/new": "Create/reset a session",
            "GET /models": "Get available models"
        }
    }
# Development entry point: run with uvicorn when executed directly.
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)