Spaces:
Sleeping
Sleeping
File size: 2,938 Bytes
cf450f7 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 |
import asyncio
import logging
import os
from contextlib import asynccontextmanager
from fastapi import (
BackgroundTasks,
FastAPI,
File,
HTTPException,
UploadFile,
)
from kaig.db import DB
from . import flow
from .db import init_db
from .handlers.upload import upload_handler
from .ingestion import ingestion_loop
# DB selection
db_name = os.environ.get("DB_NAME")
if not db_name:
raise ValueError("DB_NAME environment variable is not set")
# configure logging for httpx and httpcore to WARNING
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("httpcore").setLevel(logging.WARNING)
logger = logging.getLogger(__name__)
class Server:
def __init__(self, db_name: str):
self.db: DB = init_db(True, db_name)
self.exe: flow.Executor = flow.Executor(self.db)
ingestion_enabled = os.getenv(
"KG_ENABLE_INGESTION", "false"
).lower() in {
"1",
"true",
"yes",
"on",
}
@asynccontextmanager
async def lifespan(_app: FastAPI):
logger.info("Application is starting up...")
task = None
if ingestion_enabled:
task = asyncio.create_task(ingestion_loop(self.exe))
else:
logger.info("Ingestion disabled (set KG_ENABLE_INGESTION=true)")
yield # --- This is the point where the application runs ---
logger.info("Application is shutting down...")
if task is not None:
# _ = task.cancel()
# Call stop instead of cancelling the task
self.exe.stop()
try:
await task
except asyncio.CancelledError:
logger.info(
"Background loop was cancelled during shutdown."
)
# ----------------------------------------------------------------------
self.app: FastAPI = FastAPI(lifespan=lifespan)
# ----------------------------------------------------------------------
# Routes
@self.app.get("/")
def read_root(): # pyright: ignore[reportUnusedFunction]
return {"Hello": "World"}
@self.app.post("/upload")
async def upload( # pyright: ignore[reportUnusedFunction]
background_tasks: BackgroundTasks,
file: UploadFile = File(...), # pyright: ignore[reportCallInDefaultInitializer]
):
if file.filename is None:
raise HTTPException(status_code=400, detail="No file selected")
def async_handler() -> None:
cr = upload_handler(self.db, file)
asyncio.run(cr)
background_tasks.add_task(async_handler)
# ------------------------------------------------------------------------------
# FastAPI app
app = Server(db_name).app
|