File size: 2,938 Bytes
cf450f7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
import asyncio
import logging
import os
from contextlib import asynccontextmanager

from fastapi import (
    BackgroundTasks,
    FastAPI,
    File,
    HTTPException,
    UploadFile,
)

from kaig.db import DB

from . import flow
from .db import init_db
from .handlers.upload import upload_handler
from .ingestion import ingestion_loop

# Database selection: the database name is mandatory and is read from the
# environment. Importing this module fails fast when it is missing or empty.
db_name = os.getenv("DB_NAME")
if not db_name:
    raise ValueError("DB_NAME environment variable is not set")

# Quieten the HTTP client libraries: only WARNING and above are emitted.
logging.getLogger("httpcore").setLevel(logging.WARNING)
logging.getLogger("httpx").setLevel(logging.WARNING)

# Module-level logger used by the server below.
logger = logging.getLogger(__name__)


class Server:
    """Build the FastAPI application together with the resources it shares.

    Constructing a ``Server`` initialises the database handle, creates a
    flow executor bound to it, and assembles the FastAPI app (``self.app``)
    with its lifespan hook and routes.
    """

    def __init__(self, db_name: str):
        """Create the DB handle, the executor and the FastAPI app.

        Args:
            db_name: Database name forwarded to ``init_db``.
        """
        # NOTE(review): the meaning of the leading ``True`` flag is defined by
        # ``init_db`` and is not visible from this module.
        self.db: DB = init_db(True, db_name)
        # Executor handed to the ingestion loop and stopped on shutdown.
        self.exe: flow.Executor = flow.Executor(self.db)

        # Ingestion is opt-in: only an explicit truthy value of
        # KG_ENABLE_INGESTION (compared case-insensitively) enables it.
        ingestion_enabled = os.getenv(
            "KG_ENABLE_INGESTION", "false"
        ).lower() in {"1", "true", "yes", "on"}

        @asynccontextmanager
        async def lifespan(_app: FastAPI):
            """Run the ingestion loop for the lifetime of the application."""
            logger.info("Application is starting up...")
            task = None
            if ingestion_enabled:
                task = asyncio.create_task(ingestion_loop(self.exe))
            else:
                logger.info("Ingestion disabled (set KG_ENABLE_INGESTION=true)")

            yield  # --- This is the point where the application runs ---

            logger.info("Application is shutting down...")

            if task is None:
                return

            # Ask the executor to stop instead of cancelling the task, then
            # wait for the background loop to finish.
            self.exe.stop()

            try:
                await task
            except asyncio.CancelledError:
                logger.info(
                    "Background loop was cancelled during shutdown."
                )
            except Exception:
                # A failure inside the ingestion loop only surfaces here, when
                # the task is awaited. Log it with its traceback rather than
                # letting it abort the rest of the shutdown sequence.
                logger.exception("Ingestion loop terminated with an error.")

        # ----------------------------------------------------------------------
        self.app: FastAPI = FastAPI(lifespan=lifespan)

        # ----------------------------------------------------------------------
        # Routes

        @self.app.get("/")
        def read_root():  # pyright: ignore[reportUnusedFunction]
            """Return a static greeting payload."""
            return {"Hello": "World"}

        @self.app.post("/upload")
        async def upload(  # pyright: ignore[reportUnusedFunction]
            background_tasks: BackgroundTasks,
            file: UploadFile = File(...),  # pyright: ignore[reportCallInDefaultInitializer]
        ):
            """Queue an uploaded file for processing after the response.

            Raises:
                HTTPException: 400 when the upload carries no filename.
            """
            # Reject an empty filename as well as a missing one: a form
            # submitted without choosing a file can carry filename="".
            if not file.filename:
                raise HTTPException(status_code=400, detail="No file selected")

            def async_handler() -> None:
                # The task is deliberately a plain function: it drives the
                # upload coroutine to completion on its own event loop via
                # asyncio.run instead of being awaited by the server.
                # NOTE(review): `file` is consumed after the response has been
                # sent; some FastAPI versions may already have closed the
                # upload by then - confirm against the pinned version.
                cr = upload_handler(self.db, file)
                asyncio.run(cr)

            background_tasks.add_task(async_handler)


# ------------------------------------------------------------------------------
# FastAPI app: module-level application object built from the configured server

_server = Server(db_name)
app = _server.app