Spaces:
Running
Running
| """FastAPI application entry point.""" | |
| import logging | |
| import os | |
| from contextlib import asynccontextmanager | |
| from collections.abc import AsyncIterator | |
| from fastapi import FastAPI | |
| from langchain_core.output_parsers import StrOutputParser | |
| from src.config import load_settings | |
| from src.provider import create_llm, create_llm_with_fallback, create_embeddings, create_reranker | |
| from src.retrieval.embedder import Embedder | |
| from src.retrieval.vector_store import VectorStore | |
| from src.retrieval.bm25_search import BM25Search | |
| from src.retrieval.hybrid import HybridRetriever | |
| from src.retrieval.reranker import Reranker | |
| from src.agent.intent_classifier import IntentClassifier | |
| from src.agent.router import QueryRouter | |
| from src.agent.plan_and_execute import PlanAndExecuteRouter | |
| from src.agent.memory import ConversationMemory | |
| from src.agent.session_store import SessionStore | |
| from src.ingestion.pipeline import IngestionPipeline | |
| from src.api.routes import router, set_dependencies | |
| logger = logging.getLogger(__name__) | |
def create_app() -> FastAPI:
    """Create and configure the FastAPI application.

    Loads settings, builds the LLM, retrieval and agent components, hands
    them to the API layer through ``set_dependencies`` and mounts the router.

    Returns:
        Configured FastAPI application instance.
    """
    settings = load_settings()

    # Normalise the configured level name before resolving it: a lowercase
    # value such as "debug" would resolve to the ``logging.debug`` function
    # (not an int) and make ``basicConfig`` raise TypeError. Anything that
    # does not resolve to a numeric level falls back to INFO.
    log_level = getattr(logging, str(settings.log_level).upper(), logging.INFO)
    if not isinstance(log_level, int):
        log_level = logging.INFO
    logging.basicConfig(level=log_level)

    # React mode's ReAct sub-agent calls llm.bind_tools(...) internally, which
    # RunnableWithFallbacks does not support. The fallback chain is therefore
    # only applied in pipeline mode; in react mode we warn and use the primary
    # model only.
    if settings.llm_fallback_enabled and settings.agent_mode == "react":
        logger.warning(
            "LLM_FALLBACK_ENABLED is set but AGENT_MODE=react; fallback chain "
            "is incompatible with tool-calling and will be DISABLED for this run."
        )
        llm = create_llm(settings)
    else:
        llm = create_llm_with_fallback(settings)

    embeddings = create_embeddings(settings)
    embedder = Embedder(embeddings=embeddings)
    vector_store = VectorStore(
        path=settings.qdrant_path,
        collection_name=settings.collection_name,
        dimension=settings.embedding_dimension,
        url=settings.qdrant_url,
    )
    bm25_search = BM25Search()

    # Starlette expects the lifespan to be an async context manager; passing a
    # bare async generator function only works through a deprecated
    # compatibility path, so wrap it explicitly.
    @asynccontextmanager
    async def lifespan(_app: FastAPI) -> AsyncIterator[None]:
        """Load stored chunks from Qdrant and rebuild the BM25 index on startup."""
        chunks = vector_store.get_all_chunks()
        if chunks:
            bm25_search.index(chunks)
            logger.info("Rebuilt BM25 index with %d chunks from Qdrant", len(chunks))
        else:
            logger.info("No existing chunks in Qdrant; BM25 index is empty")
        yield

    application = FastAPI(
        title="KU Doc Assistant",
        description="RAG-based document assistant for University of Copenhagen.",
        version="0.1.0",
        lifespan=lifespan,
    )

    hybrid_retriever = HybridRetriever(
        vector_store=vector_store,
        bm25_search=bm25_search,
        embedder=embedder,
        dense_weight=settings.dense_weight,
        bm25_weight=settings.bm25_weight,
    )
    reranker = Reranker(model=create_reranker(settings.reranker_model))

    # Pick the query router implementation from the configured agent mode.
    query_router: QueryRouter | PlanAndExecuteRouter
    if settings.agent_mode == "react":
        logger.info("Agent mode: Plan-and-Execute (structured multi-step agent)")
        query_router = PlanAndExecuteRouter(
            llm=llm,
            hybrid_retriever=hybrid_retriever,
            reranker=reranker,
            vector_store=vector_store,
            default_top_k=settings.top_k,
            memory=ConversationMemory(),
            token_budget_enabled=settings.token_budget_enabled,
        )
    else:
        logger.info("Agent mode: pipeline (fixed DAG)")
        intent_classifier = IntentClassifier(llm=llm, model_name=settings.generation_model)
        llm_chain = llm | StrOutputParser()
        query_router = QueryRouter(
            intent_classifier=intent_classifier,
            hybrid_retriever=hybrid_retriever,
            reranker=reranker,
            llm_chain=llm_chain,
            translate_query=settings.translate_query,
            token_budget_enabled=settings.token_budget_enabled,
        )

    # The session database location is read straight from the environment
    # rather than from settings.
    session_store = SessionStore(db_path=os.environ.get("SESSION_DB_PATH", "./data/sessions.db"))

    # Hand the shared component instances to the routes module before the
    # router is mounted on the application.
    set_dependencies(
        query_router=query_router,
        ingestion_pipeline=IngestionPipeline(
            strategy=_parse_strategy(settings),
            chunk_size=settings.chunk_size,
            chunk_overlap=settings.chunk_overlap,
            embeddings=embeddings,
        ),
        embedder=embedder,
        vector_store=vector_store,
        bm25_search=bm25_search,
        settings=settings,
        session_store=session_store,
    )
    application.include_router(router)
    logger.info("KU Doc Assistant application created successfully")
    return application
def _parse_strategy(settings: "Settings") -> "ChunkStrategy":  # noqa: F821
    """Return the chunking strategy from settings, defaulting to SEMANTIC.

    Reads ``settings.chunk_strategy`` (treated as ``"semantic"`` when the
    attribute is missing) and converts it to a ``ChunkStrategy``. Any value
    that ``ChunkStrategy`` rejects falls back to ``ChunkStrategy.SEMANTIC``;
    a non-empty rejected value is logged so a misconfiguration is visible.

    Args:
        settings: Application settings object.

    Returns:
        The configured ``ChunkStrategy``, or ``ChunkStrategy.SEMANTIC`` on
        a missing, empty or unrecognised value.
    """
    from src.models import ChunkStrategy

    raw = getattr(settings, "chunk_strategy", "semantic")
    try:
        return ChunkStrategy(raw)
    except ValueError:
        # An unset/empty value is an expected "use the default" case; only a
        # real but unrecognised value deserves a warning.
        if raw:
            logger.warning(
                "Unrecognised chunk strategy %r; falling back to SEMANTIC", raw
            )
        return ChunkStrategy.SEMANTIC
# Module-level application instance, built once when this module is imported.
app: FastAPI = create_app()