# File: llm-ready-data/app/api/server.py
# Last change: "add vector store" (commit c2bb116) by light-infer-chat, 6.19 kB.
# (This header was copied from a file-viewer page; it is kept as comments so the module parses.)
from __future__ import annotations
import asyncio
from contextlib import asynccontextmanager
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.gzip import GZipMiddleware
from app.config import get_settings
from app.core.auth.deps import init_auth_db
from app.core.database import pool_manager
from app.core.logger import get_logger
from app.core.redis_client import create_redis_client, close_redis
from app.core.scripts import load_scripts
from app.core.vector_store.deps import init_vector_store_db
from app.services.embeddings_service import EmbeddingService
from app.services.vector_store_service import VectorStoreService
from app.api.v1.router import api_v1_router
# Module-level singletons shared by lifespan() and the route handlers below.
_logger = get_logger(__name__)
_settings = get_settings()

# The vector store service is built around this same EmbeddingService
# instance, which lifespan() later loads the model into.
_embedding_service: EmbeddingService = EmbeddingService()
_vector_store_service: VectorStoreService = VectorStoreService(
    _embedding_service
)
async def _self_ping(interval: float = 900.0) -> None:
    """Periodically GET ``_settings.self_ping_url`` and log the outcome.

    NOTE(review): presumably this keeps the hosting platform from idling the
    app. Confirm before changing the cadence.

    The loop runs until the task is cancelled. Request failures are logged
    and swallowed, so transient network errors never end the loop. If no URL
    is configured, it logs a warning once and returns, instead of logging an
    error on every round.

    Args:
        interval: Seconds to sleep between pings. Defaults to 900
            (15 minutes), the previously hard-coded value.
    """
    import httpx  # deferred: only needed once the background task starts

    health_url = _settings.self_ping_url
    if not health_url:
        _logger.warning("Self-ping disabled: self_ping_url is not configured")
        return

    # One client for the task's whole lifetime. This reuses its connection
    # pool instead of rebuilding a client every round. `async with` closes it
    # when the task is cancelled: CancelledError is a BaseException, so the
    # `except Exception` below does not intercept it.
    async with httpx.AsyncClient(timeout=30.0) as client:
        while True:
            try:
                response = await client.get(health_url)
                if response.status_code == 200:
                    _logger.info("Self-ping successful: %s", health_url)
                else:
                    _logger.warning("Self-ping returned: %s - %s", health_url, response.status_code)
            except Exception as exc:  # best-effort background loop: log and keep going
                _logger.error("Self-ping error: %s", exc)
            await asyncio.sleep(interval)
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Initialise shared resources for *app*, then release them on shutdown.

    Startup runs in this order:

    - the auth database;
    - the vector store database and service;
    - the 384-dim embedding model, loaded in the default executor;
    - Redis and its Lua scripts, only when ``redis_url`` is configured
      (stored on ``app.state.redis`` / ``app.state.scripts``);
    - the background self-ping task.

    Shutdown runs in a ``finally``, so it also happens when an error unwinds
    through ``yield``. It cancels and awaits the self-ping task, then closes
    Redis, the vector store service and the DB pools. Each close step runs
    even if an earlier one raises.
    """
    _logger.info("Initializing authentication database...")
    await init_auth_db()
    _logger.info("Authentication database initialized")

    _logger.info("Initializing vector store database...")
    await init_vector_store_db()
    await _vector_store_service.init_db()
    store_count = len(_vector_store_service.list_stores())
    _logger.info("Vector store database initialized with %d stores", store_count)

    _logger.info("Initializing embedding service (loading 384-dim model)...")
    loop = asyncio.get_running_loop()
    # The model load is dispatched to the default executor so the event loop
    # stays responsive while it runs.
    # NOTE(review): 384 is hard-coded here and in root_health's is_loaded(384),
    # while /health reports _settings.embedding_dimension. Confirm they agree.
    await loop.run_in_executor(None, _embedding_service.load_model, 384)
    # await loop.run_in_executor(None, _embedding_service.load_vision_model) # DISABLED (OOM mitigation)
    _logger.info("Embedding service initialized with dims: %s", _embedding_service.loaded_dimensions)
    _logger.info("Vector store service initialized with %d existing stores", store_count)

    redis = create_redis_client(_settings.redis_url) if _settings.redis_url else None
    scripts = await load_scripts(redis) if redis else {}
    app.state.redis = redis
    app.state.scripts = scripts
    if redis:
        _logger.info("Redis and Lua scripts initialized")
    else:
        _logger.warning("Redis not configured, running in degraded mode")

    # Keep a strong reference: the event loop holds only weak references to
    # tasks, so a discarded task can be garbage-collected mid-run.
    self_ping_task = asyncio.create_task(_self_ping())
    try:
        yield
    finally:
        _logger.info("Shutting down...")
        self_ping_task.cancel()
        try:
            await self_ping_task
        except asyncio.CancelledError:
            pass  # expected: we just cancelled it
        except Exception:
            _logger.exception("Self-ping task failed")
        # Nested finally blocks: a failure in one close step must not skip the rest.
        try:
            if redis is not None:
                await close_redis(redis)
        finally:
            try:
                await _vector_store_service.close_all()
            finally:
                await pool_manager.close_all()
def create_application() -> FastAPI:
app = FastAPI(
title=_settings.app_name,
description="All API Collection",
version=_settings.app_version,
docs_url="/docs",
redoc_url="/redoc",
openapi_tags=[
{"name": "Convert", "description": "Single-file and single-URL conversion"},
{"name": "Batch", "description": "Bulk conversion of files and URLs"},
{"name": "System", "description": "Health, info, and supported formats"},
{"name": "Embeddings", "description": "Text embedding generation using transformer models"},
{"name": "Verify", "description": "Phone number and identity verification"},
{"name": "Vector Stores", "description": "Create, manage, and search vector stores for RAG"},
],
lifespan=lifespan,
)
app.add_middleware(GZipMiddleware, minimum_size=1000)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_methods=["*"],
allow_headers=["*"],
)
app.include_router(api_v1_router, prefix="/api/v1")
@app.get("/", include_in_schema=False)
async def root(request: Request):
from collections import defaultdict
routes_by_tag: dict[str, list[dict]] = defaultdict(list)
for route in app.routes:
if not hasattr(route, "methods") or not hasattr(route, "path"):
continue
if route.path in ("/", "/health", "/ping", "/openapi.json", "/docs", "/redoc", "/docs/oauth2-redirect"):
continue
tags = getattr(route, "tags", None) or ["default"]
for tag in tags:
routes_by_tag[tag].append({
"method": list(route.methods - {"HEAD", "OPTIONS"}),
"path": route.path,
"summary": getattr(route, "summary", ""),
})
return {
"name": _settings.app_name,
"version": _settings.app_version,
"docs": {
"swagger": str(request.base_url) + "docs",
"redoc": str(request.base_url) + "redoc",
},
"endpoints": [
{"tag": tag, "routes": sorted(routes, key=lambda r: r["path"])}
for tag, routes in sorted(routes_by_tag.items())
],
}
@app.get("/health", include_in_schema=False)
async def root_health():
store_count = len(_vector_store_service.list_stores())
doc_count = await _vector_store_service.get_total_document_count()
return {
"success": True,
"app_name": _settings.app_name,
"version": _settings.app_version,
"embedding_dimension": _settings.embedding_dimension,
"vector_store_count": store_count,
"total_documents": doc_count,
"model_loaded": _embedding_service.is_loaded(384),
}
@app.get("/ping", include_in_schema=False)
async def ping():
return {"name": f"{_settings.app_name}", "version": _settings.app_version}
return app
app = create_application()