# Page-scrape header (not part of the module): XQ | Add LLM Provider Fallback | 4d2a2da | raw | history blame | 5.41 kB
"""FastAPI application entry point."""
import logging
import os
from contextlib import asynccontextmanager
from collections.abc import AsyncIterator
from fastapi import FastAPI
from langchain_core.output_parsers import StrOutputParser
from src.config import load_settings
from src.provider import create_llm, create_llm_with_fallback, create_embeddings, create_reranker
from src.retrieval.embedder import Embedder
from src.retrieval.vector_store import VectorStore
from src.retrieval.bm25_search import BM25Search
from src.retrieval.hybrid import HybridRetriever
from src.retrieval.reranker import Reranker
from src.agent.intent_classifier import IntentClassifier
from src.agent.router import QueryRouter
from src.agent.plan_and_execute import PlanAndExecuteRouter
from src.agent.memory import ConversationMemory
from src.agent.session_store import SessionStore
from src.ingestion.pipeline import IngestionPipeline
from src.api.routes import router, set_dependencies
# Module-level logger named after this module; create_app() and its lifespan
# hook emit their startup messages through it.
logger = logging.getLogger(__name__)
def create_app() -> FastAPI:
    """Create and configure the FastAPI application.

    Loads settings, configures logging, builds the LLM and the retrieval
    stack (embedder, vector store, BM25 index, hybrid retriever, reranker),
    selects the query router according to ``settings.agent_mode``, hands all
    shared objects to the API layer via ``set_dependencies`` and mounts the
    API router.

    Returns:
        Configured FastAPI application instance.
    """
    settings = load_settings()

    # Resolve the level name case-insensitively and accept only real numeric
    # levels. A bare getattr() on e.g. "debug" would return the logging.debug
    # *function*, which makes basicConfig() raise TypeError at startup.
    resolved_level = getattr(logging, str(settings.log_level).upper(), None)
    level_is_valid = isinstance(resolved_level, int)
    logging.basicConfig(level=resolved_level if level_is_valid else logging.INFO)
    if not level_is_valid:
        logger.warning(
            "Unrecognised log level %r; defaulting to INFO", settings.log_level
        )

    react_mode = settings.agent_mode == "react"

    # React mode's ReAct sub-agent calls llm.bind_tools(...) internally, which
    # RunnableWithFallbacks does not support. Fall back chain is therefore only
    # applied in pipeline mode; in react mode we warn and use the primary only.
    if settings.llm_fallback_enabled and react_mode:
        logger.warning(
            "LLM_FALLBACK_ENABLED is set but AGENT_MODE=react; fallback chain "
            "is incompatible with tool-calling and will be DISABLED for this run."
        )
        llm = create_llm(settings)
    else:
        # NOTE(review): this branch also runs in react mode when fallback is
        # disabled; presumably create_llm_with_fallback() then returns the
        # plain primary model — confirm in src.provider.
        llm = create_llm_with_fallback(settings)

    embeddings = create_embeddings(settings)
    embedder = Embedder(embeddings=embeddings)
    vector_store = VectorStore(
        path=settings.qdrant_path,
        collection_name=settings.collection_name,
        dimension=settings.embedding_dimension,
        url=settings.qdrant_url,
    )
    bm25_search = BM25Search()

    @asynccontextmanager
    async def lifespan(_app: FastAPI) -> AsyncIterator[None]:
        """Load stored chunks from Qdrant and rebuild the BM25 index on startup."""
        chunks = vector_store.get_all_chunks()
        if chunks:
            bm25_search.index(chunks)
            logger.info("Rebuilt BM25 index with %d chunks from Qdrant", len(chunks))
        else:
            logger.info("No existing chunks in Qdrant; BM25 index is empty")
        # Nothing follows the yield, so no work is done at shutdown.
        yield

    application = FastAPI(
        title="KU Doc Assistant",
        description="RAG-based document assistant for University of Copenhagen.",
        version="0.1.0",
        lifespan=lifespan,
    )

    hybrid_retriever = HybridRetriever(
        vector_store=vector_store,
        bm25_search=bm25_search,
        embedder=embedder,
        dense_weight=settings.dense_weight,
        bm25_weight=settings.bm25_weight,
    )
    reranker = Reranker(model=create_reranker(settings.reranker_model))

    if react_mode:
        logger.info("Agent mode: Plan-and-Execute (structured multi-step agent)")
        query_router: QueryRouter | PlanAndExecuteRouter = PlanAndExecuteRouter(
            llm=llm,
            hybrid_retriever=hybrid_retriever,
            reranker=reranker,
            vector_store=vector_store,
            default_top_k=settings.top_k,
            memory=ConversationMemory(),
            token_budget_enabled=settings.token_budget_enabled,
        )
    else:
        logger.info("Agent mode: pipeline (fixed DAG)")
        intent_classifier = IntentClassifier(llm=llm, model_name=settings.generation_model)
        llm_chain = llm | StrOutputParser()
        query_router = QueryRouter(
            intent_classifier=intent_classifier,
            hybrid_retriever=hybrid_retriever,
            reranker=reranker,
            llm_chain=llm_chain,
            translate_query=settings.translate_query,
            token_budget_enabled=settings.token_budget_enabled,
        )

    # The session database path is read straight from the environment rather
    # than from settings.
    session_store = SessionStore(db_path=os.environ.get("SESSION_DB_PATH", "./data/sessions.db"))

    # The same embedder, vector store and BM25 instances used by the retriever
    # are passed to the routes, so they share one set of objects.
    set_dependencies(
        query_router=query_router,
        ingestion_pipeline=IngestionPipeline(
            strategy=_parse_strategy(settings),
            chunk_size=settings.chunk_size,
            chunk_overlap=settings.chunk_overlap,
            embeddings=embeddings,
        ),
        embedder=embedder,
        vector_store=vector_store,
        bm25_search=bm25_search,
        settings=settings,
        session_store=session_store,
    )
    application.include_router(router)
    logger.info("KU Doc Assistant application created successfully")
    return application
def _parse_strategy(settings: "Settings") -> "ChunkStrategy":  # noqa: F821
    """Return the chunking strategy from config, defaulting to SEMANTIC.

    Reads ``settings.chunk_strategy`` (``"semantic"`` when the attribute is
    missing). The value is tried as-is first; a string that does not match is
    retried stripped and lower-cased so that e.g. ``" Semantic "`` still
    resolves. Falls back to SEMANTIC when nothing matches, logging a warning
    unless the value was simply unset or blank.

    Args:
        settings: Settings object; only its ``chunk_strategy`` attribute is read.

    Returns:
        The matching ``ChunkStrategy`` member, or ``ChunkStrategy.SEMANTIC``.
    """
    # Imported here rather than at module level, as in the original code.
    from src.models import ChunkStrategy

    raw = getattr(settings, "chunk_strategy", "semantic")

    candidates = [raw]
    if isinstance(raw, str):
        # Assumes enum values are lower-case like the "semantic" default —
        # TODO confirm; the retry is harmless if they are not.
        normalized = raw.strip().lower()
        if normalized != raw:
            candidates.append(normalized)

    for candidate in candidates:
        try:
            return ChunkStrategy(candidate)
        except ValueError:
            continue

    # Unset/blank is the documented default path; anything else is most
    # likely a typo and should not be swallowed silently.
    is_blank = raw is None or (isinstance(raw, str) and not raw.strip())
    if not is_blank:
        logger.warning("Unknown chunk strategy %r; falling back to SEMANTIC", raw)
    return ChunkStrategy.SEMANTIC
# Build the application once at import time so that `app` is available as a
# module attribute. Importing this module therefore runs create_app() in full.
# NOTE(review): presumably this is the ASGI server's import target — confirm
# against the deployment command.
app: FastAPI = create_app()