File size: 5,649 Bytes
565e754 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 |
from typing import Optional, List, Dict, Any
from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException, Query
from pydantic import BaseModel
from src import RAG
from src.db_utils.history_utils import (
init_history_table,
log_query,
get_all_history,
get_history_by_dialogue,
search_history,
get_history_stats,
delete_history,
get_recent_dialogues
)
# --- Lifespan для инициализации при старте ---
@asynccontextmanager
async def lifespan(app: FastAPI):
# Startup: инициализация таблицы истории
try:
init_history_table()
except Exception as e:
print(f"⚠️ Не удалось инициализировать таблицу истории: {e}")
yield
# Shutdown: ничего не делаем
app = FastAPI(
title="RAG API",
version="1.0.0",
lifespan=lifespan,
)
# --- Инициализация RAG один раз при старте ---
rag = RAG(
embed_model_name="deepvk/USER-bge-m3",
embed_index_name="recursive_USER-bge-m3",
)
# --- Request / Response схемы ---
class QueryRequest(BaseModel):
query: str
dialogue_id: Optional[str] = None
history: Optional[List[Dict[str, Any]]] = None # История диалога для контекста
class QueryResponse(BaseModel):
answer: str
reason: str
query_id: Optional[int] = None # ID записи в истории
class HistoryEntry(BaseModel):
id: int
timestamp: str
dialogue_id: str
query: str
answer: str
reason: Optional[str] = None
search_period: Optional[Dict[str, Any]] = None
metadata: Optional[Dict[str, Any]] = None
class HistoryStats(BaseModel):
total_queries: int
unique_dialogues: int
last_query_time: Optional[str] = None
first_query_time: Optional[str] = None
class DialogueInfo(BaseModel):
dialogue_id: str
message_count: int
started_at: Optional[str] = None
last_message_at: Optional[str] = None
# --- RAG Endpoint ---
@app.post("/rag", response_model=QueryResponse)
def rag_query(request: QueryRequest):
"""Основной endpoint для запросов к RAG. Логирует запрос после получения ответа."""
# Если передан dialogue_id, загружаем историю
history = None
if request.dialogue_id and not request.history:
history = get_history_by_dialogue(request.dialogue_id)
elif request.history:
history = request.history
# Получаем ответ от RAG с историей (история используется для обогащения вопроса)
result = rag.invoke(request.query, history=history)
# Логируем в историю
query_id = log_query(
query=request.query,
answer=result.get("answer", ""),
reason=result.get("reason", ""),
dialogue_id=request.dialogue_id
)
return QueryResponse(
answer=result.get("answer", ""),
reason=result.get("reason", ""),
query_id=query_id
)
# except Exception as e:
# raise HTTPException(
# status_code=500,
# detail=str(e)
# )
# --- History Endpoints ---
@app.get("/history", response_model=List[HistoryEntry])
def get_history(
limit: int = Query(default=100, ge=1, le=1000),
offset: int = Query(default=0, ge=0)
):
"""Получить историю запросов"""
return get_all_history(limit=limit, offset=offset)
@app.get("/history/stats", response_model=HistoryStats)
def get_stats():
"""Получить статистику по истории"""
stats = get_history_stats()
return HistoryStats(
total_queries=stats.get("total_queries", 0),
unique_dialogues=stats.get("unique_dialogues", 0),
last_query_time=stats.get("last_query_time"),
first_query_time=stats.get("first_query_time")
)
@app.get("/history/search", response_model=List[HistoryEntry])
def search_in_history(
q: str = Query(..., min_length=1, description="Текст для поиска"),
limit: int = Query(default=50, ge=1, le=500)
):
"""Поиск по истории запросов"""
return search_history(search_text=q, limit=limit)
@app.get("/history/dialogues", response_model=List[DialogueInfo])
def get_dialogues(
limit: int = Query(default=10, ge=1, le=100)
):
"""Получить список последних диалогов"""
return get_recent_dialogues(limit=limit)
@app.get("/history/dialogue/{dialogue_id}", response_model=List[HistoryEntry])
def get_dialogue(dialogue_id: str):
"""Получить историю конкретного диалога"""
return get_history_by_dialogue(dialogue_id)
@app.delete("/history")
def clear_history(dialogue_id: Optional[str] = None):
"""Удалить историю (всю или конкретного диалога)"""
try:
delete_history(dialogue_id=dialogue_id)
if dialogue_id:
return {"message": f"История диалога {dialogue_id} удалена"}
return {"message": "Вся история удалена"}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
# --- Healthcheck ---
@app.get("/health")
def health():
return {"status": "ok"}
# --- Entry point ---
if __name__ == "__main__":
import uvicorn
uvicorn.run(
"server:app",
host="0.0.0.0",
port=8000,
reload=True,
)
|