Spaces:
Sleeping
Sleeping
| # app.py | |
| from fastapi import FastAPI, Request | |
| from fastapi.responses import RedirectResponse, JSONResponse | |
| # Імпорти внутрішніх модулів | |
| # (файли мають існувати: ci_trigger_engine.py, ci_sync_monitor.py) | |
| from ci_trigger_engine import trigger_engine | |
| from ci_sync_monitor import sync_monitor | |
| app = FastAPI(title="ci_inputs_system", version="0.1.0") | |
| # Локальне сховище отриманих подій (для демо) | |
| inputs_store: list[dict] = [] | |
| # --- Системні маршрути --- | |
| def root(): | |
| # редірект на інтерактивну документацію | |
| return RedirectResponse(url="/docs") | |
| def healthcheck(): | |
| return JSONResponse({"status": "ok"}) | |
| # --- Базові API --- | |
| async def receive_input(req: Request): | |
| data = await req.json() | |
| inputs_store.append(data) | |
| # Безпечно викликаємо обробники, якщо вони реалізовані | |
| add_input = getattr(trigger_engine, "add_input", None) | |
| if callable(add_input): | |
| add_input(data) | |
| log_event = getattr(sync_monitor, "log_event", None) | |
| if callable(log_event): | |
| log_event(data) | |
| check_alignment = getattr(sync_monitor, "check_alignment", None) | |
| if callable(check_alignment): | |
| check_alignment(data) | |
| return {"status": "received"} | |
| async def get_all(): | |
| return inputs_store |