Archaeo commited on
Commit
82b65ff
·
verified ·
1 Parent(s): 9bb9cb8

Update app/main.py

Browse files
Files changed (1) hide show
  1. app/main.py +244 -239
app/main.py CHANGED
@@ -1,239 +1,244 @@
1
- from __future__ import annotations
2
-
3
- import asyncio
4
- import logging
5
- from contextlib import asynccontextmanager, suppress
6
- from datetime import datetime
7
- from pathlib import Path
8
- from typing import Annotated
9
-
10
- from fastapi import Depends, FastAPI, HTTPException, Query, Request
11
- from fastapi.responses import HTMLResponse, JSONResponse, PlainTextResponse
12
- from fastapi.staticfiles import StaticFiles
13
- from fastapi.templating import Jinja2Templates
14
- from prometheus_client import CONTENT_TYPE_LATEST, generate_latest
15
-
16
- from .cache import MeasurementCache
17
- from .config import Settings, get_settings
18
- from .fetcher import FetchResult, fetch_latest
19
- from .metrics import record_fetch, set_background_state, set_data_age
20
- from .models import HistoryResponse, LatestResponse, Measurement
21
-
22
- logger = logging.getLogger(__name__)
23
-
24
- BASE_DIR = Path(__file__).resolve().parent
25
- TEMPLATES = Jinja2Templates(directory=str(BASE_DIR / "templates"))
26
-
27
- @asynccontextmanager
28
- async def lifespan(app: FastAPI):
29
- logging.basicConfig(level=logging.INFO)
30
- await service.start()
31
- try:
32
- yield
33
- finally:
34
- await service.stop()
35
-
36
-
37
- app = FastAPI(title="Rheinpegel App", version="0.1.0", lifespan=lifespan)
38
- app.mount("/static", StaticFiles(directory=str(BASE_DIR / "static")), name="static")
39
-
40
-
41
- class PegelService:
42
- def __init__(self, settings: Settings, cache: MeasurementCache) -> None:
43
- self.settings = settings
44
- self.cache = cache
45
- self._lock = asyncio.Lock()
46
- self._task: asyncio.Task[None] | None = None
47
-
48
- async def start(self) -> None:
49
- if self._task and not self._task.done():
50
- return
51
-
52
- if len(self.cache) == 0:
53
- try:
54
- await self.fetch_and_update()
55
- except Exception as exc: # noqa: BLE001
56
- logger.warning("Initial fetch failed, cache remains empty: %s", exc)
57
-
58
- self._task = asyncio.create_task(self._run_loop())
59
-
60
- async def stop(self) -> None:
61
- if self._task:
62
- self._task.cancel()
63
- with suppress(asyncio.CancelledError):
64
- await self._task
65
- self._task = None
66
- set_background_state(False)
67
-
68
- def is_running(self) -> bool:
69
- return self._task is not None and not self._task.done()
70
-
71
- async def fetch_and_update(
72
- self,
73
- *,
74
- force_demo: bool = False,
75
- ) -> FetchResult:
76
- async with self._lock:
77
- previous = await self.cache.get_latest()
78
- result = await fetch_latest(self.settings, previous, force_demo=force_demo)
79
- await self.cache.add(result.measurement)
80
-
81
- self._update_metrics(result)
82
- return result
83
-
84
- async def latest(self) -> Measurement | None:
85
- return await self.cache.get_latest()
86
-
87
- async def history(self) -> list[Measurement]:
88
- return await self.cache.get_history()
89
-
90
- async def _run_loop(self) -> None:
91
- set_background_state(True)
92
- try:
93
- while True:
94
- try:
95
- await self.fetch_and_update()
96
- except Exception as exc: # noqa: BLE001
97
- logger.exception("Background fetch failure: %s", exc)
98
- await asyncio.sleep(self.settings.refresh_seconds)
99
- except asyncio.CancelledError:
100
- raise
101
- finally:
102
- set_background_state(False)
103
-
104
- def _update_metrics(self, result: FetchResult) -> None:
105
- outcome = "success"
106
- if result.error:
107
- outcome = "error"
108
- elif result.is_demo:
109
- outcome = "demo"
110
- record_fetch(result.latency_ms, outcome)
111
-
112
- if result.measurement:
113
- now = datetime.now(self.settings.timezone)
114
- age = max(0.0, (now - result.measurement.timestamp).total_seconds())
115
- set_data_age(age)
116
- else:
117
- set_data_age(None)
118
-
119
-
120
- settings = get_settings()
121
- cache = MeasurementCache()
122
- service = PegelService(settings=settings, cache=cache)
123
-
124
-
125
- async def get_service() -> PegelService:
126
- return service
127
-
128
-
129
- def _warning_levels() -> list[dict[str, str | int]]:
130
- return [
131
- {"label": "Normal", "range": "< 400 cm", "color": "badge-normal", "min": 0, "max": 399},
132
- {"label": "Aufmerksamkeit", "range": "400 – 599 cm", "color": "badge-attention", "min": 400, "max": 599},
133
- {"label": "Warnung", "range": "600 – 799 cm", "color": "badge-warning", "min": 600, "max": 799},
134
- {"label": "Alarm", "range": "≥ 800 cm", "color": "badge-alarm", "min": 800, "max": 9999},
135
- ]
136
-
137
-
138
- def _sparkline_path(measurements: list[Measurement]) -> str:
139
- if not measurements:
140
- return ""
141
- levels = [m.level_cm for m in measurements]
142
- min_level = min(levels)
143
- max_level = max(levels)
144
- span = max(max_level - min_level, 1)
145
- step = 100 / max(len(levels) - 1, 1)
146
- points = []
147
- for idx, level in enumerate(levels):
148
- x = idx * step
149
- y = 40 - ((level - min_level) / span * 40)
150
- points.append(f"{x:.2f},{y:.2f}")
151
- return "M " + " L ".join(points)
152
-
153
-
154
- @app.get("/", response_class=HTMLResponse)
155
- async def dashboard(
156
- request: Request,
157
- demo: Annotated[bool, Query(alias="demo")] = False,
158
- service: PegelService = Depends(get_service),
159
- ) -> HTMLResponse:
160
- if demo:
161
- await service.fetch_and_update(force_demo=True)
162
-
163
- latest = await service.latest()
164
- if latest is None:
165
- result = await service.fetch_and_update(force_demo=demo)
166
- latest = result.measurement
167
-
168
- history = await service.history()
169
- sparkline = history[-24:]
170
- sparkline_path = _sparkline_path(sparkline)
171
- initial_payload = {
172
- "latest": latest.model_dump(mode="json") if latest else None,
173
- "history": [item.model_dump(mode="json") for item in history],
174
- "autoRefresh": settings.refresh_seconds,
175
- "demo": demo or (latest.is_demo if latest else False),
176
- }
177
-
178
- context = {
179
- "request": request,
180
- "latest": latest,
181
- "history": history,
182
- "sparkline": sparkline,
183
- "sparkline_path": sparkline_path,
184
- "auto_refresh_seconds": settings.refresh_seconds,
185
- "timezone": settings.tz,
186
- "warning_levels": _warning_levels(),
187
- "demo_mode": demo or (latest.is_demo if latest else False),
188
- "initial_payload": initial_payload,
189
- }
190
- return TEMPLATES.TemplateResponse("index.html.j2", context)
191
-
192
-
193
- @app.get("/api/latest")
194
- async def api_latest(
195
- demo: Annotated[bool, Query(alias="demo")] = False,
196
- service: PegelService = Depends(get_service),
197
- ) -> JSONResponse:
198
- result = await service.fetch_and_update(force_demo=demo)
199
- history = await service.history()
200
- response = LatestResponse(
201
- measurement=result.measurement,
202
- history=history,
203
- latency_ms=result.latency_ms,
204
- error=result.error,
205
- demo_mode=result.is_demo,
206
- )
207
- return JSONResponse(response.model_dump(mode="json"))
208
-
209
-
210
- @app.get("/api/history")
211
- async def api_history(service: PegelService = Depends(get_service)) -> JSONResponse:
212
- history = await service.history()
213
- response = HistoryResponse(data=history, demo_mode=any(item.is_demo for item in history))
214
- return JSONResponse(response.model_dump(mode="json"))
215
-
216
-
217
- @app.get("/healthz")
218
- async def healthz(service: PegelService = Depends(get_service)) -> PlainTextResponse:
219
- if not service.is_running():
220
- raise HTTPException(status_code=503, detail="Background task not running")
221
- return PlainTextResponse("ok")
222
-
223
-
224
- @app.get("/metrics")
225
- async def metrics_endpoint(service: PegelService = Depends(get_service)) -> PlainTextResponse:
226
- latest = await service.latest()
227
- if latest is None:
228
- set_data_age(None)
229
- else:
230
- age = max(0.0, (datetime.now(settings.timezone) - latest.timestamp).total_seconds())
231
- set_data_age(age)
232
- data = generate_latest()
233
- return PlainTextResponse(content=data.decode("utf-8"), media_type=CONTENT_TYPE_LATEST)
234
-
235
-
236
- if __name__ == "__main__":
237
- import uvicorn
238
-
239
- uvicorn.run("app.main:app", host="0.0.0.0", port=settings.port, reload=True)
 
 
 
 
 
 
1
+ from __future__ import annotations
2
+
3
+ import asyncio
4
+ import logging
5
+ # app/app/main.py
6
+ import os
7
+ from app.cache import Cache
8
+ from contextlib import asynccontextmanager, suppress
9
+ from datetime import datetime
10
+ from pathlib import Path
11
+ from typing import Annotated
12
+
13
+ from fastapi import Depends, FastAPI, HTTPException, Query, Request
14
+ from fastapi.responses import HTMLResponse, JSONResponse, PlainTextResponse
15
+ from fastapi.staticfiles import StaticFiles
16
+ from fastapi.templating import Jinja2Templates
17
+ from prometheus_client import CONTENT_TYPE_LATEST, generate_latest
18
+
19
+ from .cache import MeasurementCache
20
+ from .config import Settings, get_settings
21
+ from .fetcher import FetchResult, fetch_latest
22
+ from .metrics import record_fetch, set_background_state, set_data_age
23
+ from .models import HistoryResponse, LatestResponse, Measurement
24
+
25
+ logger = logging.getLogger(__name__)
26
+
27
+ CACHE_DIR = os.getenv("APP_CACHE_DIR", "/data")
28
+ cache = Cache(filename="cache.json") # nutzt intern APP_CACHE_DIR
29
+ BASE_DIR = Path(__file__).resolve().parent
30
+ TEMPLATES = Jinja2Templates(directory=str(BASE_DIR / "templates"))
31
+
32
+ @asynccontextmanager
33
+ async def lifespan(app: FastAPI):
34
+ logging.basicConfig(level=logging.INFO)
35
+ await service.start()
36
+ try:
37
+ yield
38
+ finally:
39
+ await service.stop()
40
+
41
+
42
+ app = FastAPI(title="Rheinpegel App", version="0.1.0", lifespan=lifespan)
43
+ app.mount("/static", StaticFiles(directory=str(BASE_DIR / "static")), name="static")
44
+
45
+
46
+ class PegelService:
47
+ def __init__(self, settings: Settings, cache: MeasurementCache) -> None:
48
+ self.settings = settings
49
+ self.cache = cache
50
+ self._lock = asyncio.Lock()
51
+ self._task: asyncio.Task[None] | None = None
52
+
53
+ async def start(self) -> None:
54
+ if self._task and not self._task.done():
55
+ return
56
+
57
+ if len(self.cache) == 0:
58
+ try:
59
+ await self.fetch_and_update()
60
+ except Exception as exc: # noqa: BLE001
61
+ logger.warning("Initial fetch failed, cache remains empty: %s", exc)
62
+
63
+ self._task = asyncio.create_task(self._run_loop())
64
+
65
+ async def stop(self) -> None:
66
+ if self._task:
67
+ self._task.cancel()
68
+ with suppress(asyncio.CancelledError):
69
+ await self._task
70
+ self._task = None
71
+ set_background_state(False)
72
+
73
+ def is_running(self) -> bool:
74
+ return self._task is not None and not self._task.done()
75
+
76
+ async def fetch_and_update(
77
+ self,
78
+ *,
79
+ force_demo: bool = False,
80
+ ) -> FetchResult:
81
+ async with self._lock:
82
+ previous = await self.cache.get_latest()
83
+ result = await fetch_latest(self.settings, previous, force_demo=force_demo)
84
+ await self.cache.add(result.measurement)
85
+
86
+ self._update_metrics(result)
87
+ return result
88
+
89
+ async def latest(self) -> Measurement | None:
90
+ return await self.cache.get_latest()
91
+
92
+ async def history(self) -> list[Measurement]:
93
+ return await self.cache.get_history()
94
+
95
+ async def _run_loop(self) -> None:
96
+ set_background_state(True)
97
+ try:
98
+ while True:
99
+ try:
100
+ await self.fetch_and_update()
101
+ except Exception as exc: # noqa: BLE001
102
+ logger.exception("Background fetch failure: %s", exc)
103
+ await asyncio.sleep(self.settings.refresh_seconds)
104
+ except asyncio.CancelledError:
105
+ raise
106
+ finally:
107
+ set_background_state(False)
108
+
109
+ def _update_metrics(self, result: FetchResult) -> None:
110
+ outcome = "success"
111
+ if result.error:
112
+ outcome = "error"
113
+ elif result.is_demo:
114
+ outcome = "demo"
115
+ record_fetch(result.latency_ms, outcome)
116
+
117
+ if result.measurement:
118
+ now = datetime.now(self.settings.timezone)
119
+ age = max(0.0, (now - result.measurement.timestamp).total_seconds())
120
+ set_data_age(age)
121
+ else:
122
+ set_data_age(None)
123
+
124
+
125
+ settings = get_settings()
126
+ cache = MeasurementCache()
127
+ service = PegelService(settings=settings, cache=cache)
128
+
129
+
130
+ async def get_service() -> PegelService:
131
+ return service
132
+
133
+
134
+ def _warning_levels() -> list[dict[str, str | int]]:
135
+ return [
136
+ {"label": "Normal", "range": "< 400 cm", "color": "badge-normal", "min": 0, "max": 399},
137
+ {"label": "Aufmerksamkeit", "range": "400 – 599 cm", "color": "badge-attention", "min": 400, "max": 599},
138
+ {"label": "Warnung", "range": "600 799 cm", "color": "badge-warning", "min": 600, "max": 799},
139
+ {"label": "Alarm", "range": "≥ 800 cm", "color": "badge-alarm", "min": 800, "max": 9999},
140
+ ]
141
+
142
+
143
+ def _sparkline_path(measurements: list[Measurement]) -> str:
144
+ if not measurements:
145
+ return ""
146
+ levels = [m.level_cm for m in measurements]
147
+ min_level = min(levels)
148
+ max_level = max(levels)
149
+ span = max(max_level - min_level, 1)
150
+ step = 100 / max(len(levels) - 1, 1)
151
+ points = []
152
+ for idx, level in enumerate(levels):
153
+ x = idx * step
154
+ y = 40 - ((level - min_level) / span * 40)
155
+ points.append(f"{x:.2f},{y:.2f}")
156
+ return "M " + " L ".join(points)
157
+
158
+
159
+ @app.get("/", response_class=HTMLResponse)
160
+ async def dashboard(
161
+ request: Request,
162
+ demo: Annotated[bool, Query(alias="demo")] = False,
163
+ service: PegelService = Depends(get_service),
164
+ ) -> HTMLResponse:
165
+ if demo:
166
+ await service.fetch_and_update(force_demo=True)
167
+
168
+ latest = await service.latest()
169
+ if latest is None:
170
+ result = await service.fetch_and_update(force_demo=demo)
171
+ latest = result.measurement
172
+
173
+ history = await service.history()
174
+ sparkline = history[-24:]
175
+ sparkline_path = _sparkline_path(sparkline)
176
+ initial_payload = {
177
+ "latest": latest.model_dump(mode="json") if latest else None,
178
+ "history": [item.model_dump(mode="json") for item in history],
179
+ "autoRefresh": settings.refresh_seconds,
180
+ "demo": demo or (latest.is_demo if latest else False),
181
+ }
182
+
183
+ context = {
184
+ "request": request,
185
+ "latest": latest,
186
+ "history": history,
187
+ "sparkline": sparkline,
188
+ "sparkline_path": sparkline_path,
189
+ "auto_refresh_seconds": settings.refresh_seconds,
190
+ "timezone": settings.tz,
191
+ "warning_levels": _warning_levels(),
192
+ "demo_mode": demo or (latest.is_demo if latest else False),
193
+ "initial_payload": initial_payload,
194
+ }
195
+ return TEMPLATES.TemplateResponse("index.html.j2", context)
196
+
197
+
198
+ @app.get("/api/latest")
199
+ async def api_latest(
200
+ demo: Annotated[bool, Query(alias="demo")] = False,
201
+ service: PegelService = Depends(get_service),
202
+ ) -> JSONResponse:
203
+ result = await service.fetch_and_update(force_demo=demo)
204
+ history = await service.history()
205
+ response = LatestResponse(
206
+ measurement=result.measurement,
207
+ history=history,
208
+ latency_ms=result.latency_ms,
209
+ error=result.error,
210
+ demo_mode=result.is_demo,
211
+ )
212
+ return JSONResponse(response.model_dump(mode="json"))
213
+
214
+
215
+ @app.get("/api/history")
216
+ async def api_history(service: PegelService = Depends(get_service)) -> JSONResponse:
217
+ history = await service.history()
218
+ response = HistoryResponse(data=history, demo_mode=any(item.is_demo for item in history))
219
+ return JSONResponse(response.model_dump(mode="json"))
220
+
221
+
222
+ @app.get("/healthz")
223
+ async def healthz(service: PegelService = Depends(get_service)) -> PlainTextResponse:
224
+ if not service.is_running():
225
+ raise HTTPException(status_code=503, detail="Background task not running")
226
+ return PlainTextResponse("ok")
227
+
228
+
229
+ @app.get("/metrics")
230
+ async def metrics_endpoint(service: PegelService = Depends(get_service)) -> PlainTextResponse:
231
+ latest = await service.latest()
232
+ if latest is None:
233
+ set_data_age(None)
234
+ else:
235
+ age = max(0.0, (datetime.now(settings.timezone) - latest.timestamp).total_seconds())
236
+ set_data_age(age)
237
+ data = generate_latest()
238
+ return PlainTextResponse(content=data.decode("utf-8"), media_type=CONTENT_TYPE_LATEST)
239
+
240
+
241
+ if __name__ == "__main__":
242
+ import uvicorn
243
+
244
+ uvicorn.run("app.main:app", host="0.0.0.0", port=settings.port, reload=True)