---
title: BP Monitoring Pipeline
emoji: π©Ί
colorFrom: red
colorTo: blue
sdk: docker
app_port: 7860
pinned: false
---
# BP Monitoring Pipeline
### Clean Architecture & System Design Documentation





*Dokumen ini menjelaskan arsitektur, design patterns, dan keputusan teknis yang digunakan dalam pipeline data monitoring tekanan darah berbasis sinyal PPG (Photoplethysmography) dari perangkat IoT.*
---
**Status**: `Production-Ready Design` Β· **Versi**: `1.0.0` Β· **Terakhir Diperbarui**: `30 Mei 2026`
---
## π Daftar Isi
- [1. Executive Summary](#1-executive-summary)
- [2. Architecture Overview](#2-architecture-overview)
- [2.1. High-Level System Architecture](#21-high-level-system-architecture)
- [2.2. Clean Architecture Layers](#22-clean-architecture-layers)
- [2.3. Dependency Rule](#23-dependency-rule)
- [3. Layer-by-Layer Breakdown](#3-layer-by-layer-breakdown)
- [3.1. Domain Layer](#31-domain-layer)
- [3.2. Application Layer](#32-application-layer)
- [3.3. Infrastructure Layer](#33-infrastructure-layer)
- [3.4. Interface Layer](#34-interface-layer)
- [3.5. Shared Module](#35-shared-module)
- [4. Design Principles](#4-design-principles)
- [4.1. SOLID Principles](#41-solid-principles)
- [4.2. DRY (Don't Repeat Yourself)](#42-dry-dont-repeat-yourself)
- [5. Design Patterns](#5-design-patterns)
- [6. Data Flow & ETL Pipeline](#6-data-flow--etl-pipeline)
- [6.1. ETL #1 β Data Ingestion](#61-etl-1--data-ingestion)
- [6.2. ETL #2 β Processing & Inference](#62-etl-2--processing--inference)
- [6.3. End-to-End Data Flow](#63-end-to-end-data-flow)
- [7. Project Structure](#7-project-structure)
- [8. Deployment Architecture](#8-deployment-architecture)
- [8.1. Deployment Topology](#81-deployment-topology)
- [8.2. Technology Stack](#82-technology-stack)
- [8.3. Entry Points](#83-entry-points)
- [9. Configuration Management](#9-configuration-management)
- [10. Testing Strategy](#10-testing-strategy)
- [11. Glossary](#11-glossary)
- [12. References](#12-references)
---
## 1. Executive Summary
**BP Monitoring Pipeline** adalah sistem data pipeline yang dirancang untuk memproses sinyal **PPG (Photoplethysmography)** dari perangkat IoT melalui smartphone, kemudian melakukan inferensi menggunakan model deep learning (**GAN** dan **VGTL-Net**) untuk memprediksi **ABP (Arterial Blood Pressure)** secara non-invasif.
### Tujuan Utama
| # | Tujuan | Deskripsi |
|---|--------|-----------|
| 1 | **Reliability** | Menjamin data PPG dari sensor IoT tidak hilang selama proses transmisi dan penyimpanan |
| 2 | **Scalability** | Arsitektur yang dapat berkembang seiring bertambahnya jumlah sensor dan pengguna |
| 3 | **Maintainability** | Codebase yang mudah dipahami, dimodifikasi, dan di-debug oleh seluruh anggota tim |
| 4 | **Replaceability** | Setiap komponen eksternal (database, message broker, model AI) dapat diganti tanpa mengubah business logic |
| 5 | **Testability** | Setiap layer dapat di-test secara independen melalui unit test dan integration test |
### Keputusan Arsitektural Utama
| Keputusan | Pilihan | Alasan |
|-----------|---------|--------|
| Architecture Style | Clean Architecture | Separation of concerns, dependency inversion, framework independence |
| Communication Pattern | Asynchronous Messaging (Message Queue) | Decoupling antara Data Node dan Model Node |
| Data Pipeline Pattern | ETL (Extract-Transform-Load) | Data harus di-transform sebelum masuk ke downstream (model dan database) |
| Deployment Strategy | Multi-platform (HF Spaces + Kaggle) | Cost optimization ($0) dengan memanfaatkan free tier |
---
## 2. Architecture Overview
### 2.1. High-Level System Architecture
Sistem terdiri dari **tiga node** utama yang berkomunikasi secara asinkron:
```
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β SYSTEM ARCHITECTURE β
β β
β ββββββββββββ ββββββββββββββββ ββββββββββββββββββββ β
β β IoT Node ββββββββββ Data Node ββββββββββ Model Node β β
β β (Sensor β PPG β (ETL #1) β Queue β (ETL #2) β β
β β + HP) β Signal β β β β β
β ββββββββββββ β ββββββββββ β β ββββββββββββββ β β
β β βFastAPI β β β β GAN β β β
β β β+ ETL β β β β + VGTL-Net β β β
β β ββββββββββ β β ββββββββββββββ β β
β ββββββββ¬ββββββββ ββββββββββ¬ββββββββββ β
β β β β
β βΌ βΌ β
β ββββββββββββββββββββββββββββββββββββ β
β β Database β β
β β (Raw PPG + Predictions) β β
β ββββββββββββββββββββββββββββββββββββ β
β β² β
β β β
β ββββββββ΄ββββββββ β
β β Application β β
β β Node β β
β β (Frontend + β β
β β Backend) β β
β ββββββββββββββββ β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
```
### 2.2. Clean Architecture Layers
Seluruh codebase diorganisasi ke dalam **empat layer konsentris** sesuai prinsip Clean Architecture oleh Robert C. Martin:
```
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β β
β βββββββββββββββββββββββββββββββββββββββββββββ β
β β INTERFACE LAYER β β
β β (FastAPI Routes, CLI, Consumer) β β
β β β β
β β βββββββββββββββββββββββββββββββββββββ β β
β β β INFRASTRUCTURE LAYER β β β
β β β (PostgreSQL, RabbitMQ, SciPy, β β β
β β β Model Service Implementations) β β β
β β β β β β
β β β βββββββββββββββββββββββββββββ β β β
β β β β APPLICATION LAYER β β β β
β β β β (Use Cases, DTOs) β β β β
β β β β β β β β
β β β β βββββββββββββββββββ β β β β
β β β β β DOMAIN LAYER β β β β β
β β β β β (Entities, β β β β β
β β β β β Interfaces, β β β β β
β β β β β Value Objects)β β β β β
β β β β βββββββββββββββββββ β β β β
β β β βββββββββββββββββββββββββββββ β β β
β β βββββββββββββββββββββββββββββββββββββ β β
β βββββββββββββββββββββββββββββββββββββββββββββ β
β β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
```
### 2.3. Dependency Rule
> **"Source code dependencies harus selalu mengarah KE DALAM (ke arah Domain Layer)."**
> β Robert C. Martin, *Clean Architecture* (2017)
```
Interface ββdepends onβββ Application ββdepends onβββ Domain
β²
Infrastructure ββββββββββββββimplementsβββββββββββββββββ
```
**Implikasi penting:**
- Domain Layer **tidak mengetahui** keberadaan PostgreSQL, RabbitMQ, FastAPI, atau framework apapun.
- Application Layer (Use Cases) bergantung pada **interfaces** yang didefinisikan di Domain, bukan pada implementasi konkret.
- Infrastructure Layer **mengimplementasikan** interfaces Domain, sehingga bisa di-swap tanpa mengubah business logic.
- Interface Layer bertindak sebagai **entry point** β hanya melakukan wiring/composition.
---
## 3. Layer-by-Layer Breakdown
### 3.1. Domain Layer
**Lokasi**: `src/domain/`
Domain Layer adalah **inti dari sistem** β berisi business rules murni yang tidak bergantung pada teknologi apapun. Layer ini harus bisa berjalan **tanpa framework, tanpa database, tanpa network**.
#### 3.1.1. Entities
Entity merepresentasikan objek bisnis utama dengan identitas dan lifecycle.
| Entity | File | Deskripsi | Key Properties |
|--------|------|-----------|----------------|
| `PPGSignal` | `entities/ppg_signal.py` | Representasi sinyal PPG yang masuk dari sensor | `device_id`, `user_id`, `timestamp`, `sampling_rate`, `ppg_values`, `duration_seconds` |
| `BPPrediction` | `entities/prediction.py` | Hasil prediksi tekanan darah dari model AI | `predicted_sbp`, `predicted_dbp`, `model_version`, `inference_time_ms` |
**Karakteristik Entity:**
- Menggunakan `@dataclass` murni Python β **bukan** SQLAlchemy model, **bukan** Pydantic model.
- Berisi **domain validation** (`validate()`) yang merepresentasikan business rules.
- Berisi **computed properties** (e.g., `num_samples`, `mean_arterial_pressure`).
#### 3.1.2. Value Objects
Value Object adalah objek immutable yang didefinisikan oleh atributnya, bukan identitasnya.
| Value Object | File | Deskripsi |
|-------------|------|-----------|
| `DeviceInfo` | `value_objects/device_info.py` | Informasi perangkat sensor IoT |
| `SignalMetadata` | `value_objects/signal_metadata.py` | Metadata sinyal (sampling rate, durasi, jumlah sampel) |
#### 3.1.3. Interfaces (Contracts)
Interfaces mendefinisikan **kontrak** yang harus dipenuhi oleh Infrastructure Layer. Menggunakan Abstract Base Class (ABC).
| Interface | File | Kontrak |
|-----------|------|---------|
| `BaseRepository[T]` | `interfaces/repositories/base_repository.py` | Generic CRUD: `add()`, `get_by_id()`, `get_all()`, `delete()` |
| `PPGRepository` | `interfaces/repositories/ppg_repository.py` | Extends `BaseRepository[PPGSignal]` + `get_by_user()`, `get_by_device()` |
| `PredictionRepository` | `interfaces/repositories/prediction_repository.py` | Extends `BaseRepository[BPPrediction]` + query spesifik prediksi |
| `MessageBroker` | `interfaces/services/message_broker.py` | `publish()`, `consume()`, `connect()`, `disconnect()` |
| `SignalProcessor` | `interfaces/services/signal_processor.py` | `filter_signal()`, `normalize()`, `segment()`, `process()` |
| `ModelService` | `interfaces/services/model_service.py` | `load_model()`, `predict()` |
> **π Catatan Penting**: Interface `MessageBroker` tidak menyebut "RabbitMQ" dimanapun. Jika di kemudian hari ingin berpindah ke **Apache Kafka** atau **Redis Streams**, cukup buat implementasi baru tanpa mengubah satu baris pun di Domain atau Application Layer.
#### 3.1.4. Domain Exceptions
```
DomainException (base)
βββ InvalidSignalError
βββ PredictionOutOfRangeError
βββ EntityNotFoundError
```
---
### 3.2. Application Layer
**Lokasi**: `src/application/`
Application Layer berisi **use cases** β orkestrasi langkah-langkah business logic untuk memenuhi satu kebutuhan pengguna yang spesifik.
#### 3.2.1. Use Cases
| Use Case | File | Deskripsi | Input β Output |
|----------|------|-----------|----------------|
| `IngestPPGUseCase` | `use_cases/ingest_ppg.py` | ETL #1: Terima PPG β Store β Publish | `PPGIngestRequest` β `PPGIngestResponse` |
| `ProcessAndPredictUseCase` | `use_cases/process_and_predict.py` | ETL #2: Consume β Preprocess β Predict β Store | `dict (message)` β `BPPrediction` |
| `GetPredictionHistoryUseCase` | `use_cases/get_prediction_history.py` | Query riwayat prediksi per user | `user_id, date_range` β `list[BPPrediction]` |
**Prinsip Use Case:**
- Setiap use case memiliki **satu method publik**: `execute()`.
- Dependencies di-inject melalui **constructor** (Dependency Inversion).
- Use case **tidak mengetahui** dari mana dia dipanggil (API? CLI? Consumer?).
```
ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β IngestPPGUseCase β
β β
β __init__(ppg_repo: PPGRepository, broker: MessageBroker) β
β β² β² β
β β interface β interface β
β β β β
β execute(request: PPGIngestRequest) β PPGIngestResponse β
β 1. Create PPGSignal entity β
β 2. entity.validate() β
β 3. ppg_repo.add(entity) β
β 4. broker.publish("ppg_queue", message) β
β 5. Return response DTO β
ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
```
#### 3.2.2. Data Transfer Objects (DTOs)
DTO digunakan untuk transfer data antar layer boundary. Menggunakan **Pydantic** untuk validasi otomatis.
| DTO | Tipe | Deskripsi |
|-----|------|-----------|
| `PPGIngestRequest` | Input | Data yang diterima dari HP via API |
| `PPGIngestResponse` | Output | Response setelah ingest berhasil |
| `PredictionResponse` | Output | Hasil prediksi untuk ditampilkan ke frontend |
> **Kenapa bukan Entity langsung?**
> Entity adalah representasi domain murni. DTO adalah representasi "bentuk data" yang masuk/keluar dari sistem. Memisahkan keduanya mencegah domain entity terkontaminasi oleh kebutuhan serialization/validation framework.
---
### 3.3. Infrastructure Layer
**Lokasi**: `src/infrastructure/`
Infrastructure Layer berisi **implementasi konkret** dari interfaces yang didefinisikan di Domain Layer. Layer ini berhubungan langsung dengan teknologi pihak ketiga.
#### 3.3.1. Database
| Komponen | File | Deskripsi |
|----------|------|-----------|
| `connection.py` | `database/connection.py` | SQLAlchemy async engine + session factory |
| `Base` | `database/models/base.py` | Declarative base dengan `id` dan `created_at` otomatis |
| `PPGModel` | `database/models/ppg_model.py` | ORM model untuk tabel `raw_ppg` |
| `PredictionModel` | `database/models/prediction_model.py` | ORM model untuk tabel `predictions` |
| `SQLAlchemyBaseRepository` | `database/repositories/sqlalchemy_base.py` | Generic CRUD implementation (DRY) |
| `SQLAlchemyPPGRepository` | `database/repositories/ppg_repository.py` | Implementasi `PPGRepository` |
| `SQLAlchemyPredictionRepository` | `database/repositories/prediction_repository.py` | Implementasi `PredictionRepository` |
**Mapping Pattern:**
```
Domain Entity (PPGSignal) ββββ _to_entity() βββ ORM Model (PPGModel)
βββ _to_model() ββββ
```
Setiap repository konkret wajib mengimplementasikan dua method mapping:
- `_to_entity(model) β entity` β Mengkonversi ORM model ke domain entity.
- `_to_model(entity) β model` β Mengkonversi domain entity ke ORM model.
#### 3.3.2. Messaging
| Komponen | File | Deskripsi |
|----------|------|-----------|
| `RabbitMQBroker` | `messaging/rabbitmq_broker.py` | Implementasi `MessageBroker` menggunakan `aio-pika` |
Mendukung koneksi **AMQP** (lokal) dan **AMQPS** (CloudAMQP β SSL) melalui konfigurasi URL.
#### 3.3.3. Signal Processing
| Komponen | File | Deskripsi |
|----------|------|-----------|
| `ScipySignalProcessor` | `processing/scipy_signal_processor.py` | Implementasi `SignalProcessor` menggunakan SciPy |
Pipeline pemrosesan sinyal:
```
Raw PPG Signal
β
βΌ
βββββββββββββββββββ βββββββββββββββββββ βββββββββββββββββββ
β Bandpass Filter βββββββ Normalization βββββββ Segmentation β
β (0.5 β 8.0 Hz) β β (Z-score) β β (8s windows) β
β Butterworth 4th β β β β β
βββββββββββββββββββ βββββββββββββββββββ βββββββββββββββββββ
β
βΌ
Processed Segments
shape: (N, window_size)
```
#### 3.3.4. Model Service
| Komponen | File | Deskripsi |
|----------|------|-----------|
| `MockModelService` | `model/mock_model_service.py` | Mock implementation untuk testing tanpa model asli |
| `GANVGTLNetService` | `model/gan_vgtlnet_service.py` | Implementasi production: GAN + VGTL-Net inference |
---
### 3.4. Interface Layer
**Lokasi**: `src/interface/`
Interface Layer adalah **entry point** ke sistem. Layer ini bertanggung jawab atas:
- Menerima input dari dunia luar (HTTP request, message queue, CLI command).
- Melakukan **dependency wiring** (composition root).
- Memanggil use case yang sesuai.
- Mengembalikan output dalam format yang diharapkan.
#### 3.4.1. API (Entry Point 1 β HF Spaces)
| Komponen | File | Deskripsi |
|----------|------|-----------|
| `create_app()` | `api/app.py` | FastAPI application factory |
| `dependencies.py` | `api/dependencies.py` | Dependency injection wiring menggunakan `Depends()` |
| `ppg_routes.py` | `api/routes/ppg_routes.py` | REST endpoint untuk ingest PPG |
| `prediction_routes.py` | `api/routes/prediction_routes.py` | REST endpoint untuk query predictions |
#### 3.4.2. Consumer (Entry Point 2 β Kaggle)
| Komponen | File | Deskripsi |
|----------|------|-----------|
| `run_consumer.py` | `consumer/run_consumer.py` | Standalone consumer service |
| `colab_notebook.ipynb` | `consumer/colab_notebook.ipynb` | Jupyter notebook untuk menjalankan consumer di Colab |
> **π Kedua entry point menggunakan use case, repository, dan interface yang PERSIS SAMA.** Perbedaannya hanya pada cara bootstrapping dan dependency wiring.
---
### 3.5. Shared Module
**Lokasi**: `src/shared/`
Cross-cutting concerns yang digunakan oleh seluruh layer.
| Komponen | File | Deskripsi |
|----------|------|-----------|
| `Settings` | `config.py` | Konfigurasi aplikasi dari environment variables (Pydantic Settings) |
| `get_logger()` | `logger.py` | Logging factory dengan format konsisten |
| `constants.py` | `constants.py` | Named constants (menghilangkan magic numbers) |
---
## 4. Design Principles
### 4.1. SOLID Principles
#### **S β Single Responsibility Principle**
> *"Sebuah class harus memiliki satu, dan hanya satu, alasan untuk berubah."*
| Komponen | Responsibility Tunggal |
|----------|----------------------|
| `PPGSignal` | Merepresentasikan sinyal PPG dan validasi domain-nya |
| `IngestPPGUseCase` | Mengorkestrasikan alur ingest data PPG |
| `ScipySignalProcessor` | Memproses sinyal menggunakan SciPy |
| `SQLAlchemyPPGRepository` | Mempersist PPGSignal ke PostgreSQL |
| `RabbitMQBroker` | Mengirim/menerima pesan via RabbitMQ |
| `ppg_routes.py` | Menangani HTTP routing untuk endpoint PPG |
#### **O β Open/Closed Principle**
> *"Entitas software harus terbuka untuk ekstensi, tertutup untuk modifikasi."*
| Skenario | Solusi (Tanpa Modifikasi) |
|----------|--------------------------|
| Ganti PostgreSQL β MongoDB | Buat `MongoDBPPGRepository` yang implement `PPGRepository`. Use case **tidak berubah**. |
| Ganti RabbitMQ β Kafka | Buat `KafkaBroker` yang implement `MessageBroker`. Use case **tidak berubah**. |
| Ganti SciPy β PyTorch preprocessing | Buat `TorchSignalProcessor` yang implement `SignalProcessor`. Use case **tidak berubah**. |
| Tambah model baru selain GAN+VGTL-Net | Buat `NewModelService` yang implement `ModelService`. Use case **tidak berubah**. |
#### **L β Liskov Substitution Principle**
> *"Objek dari superclass harus bisa digantikan oleh objek subclass tanpa merusak program."*
```python
# Keduanya bisa digunakan di mana pun PPGRepository dibutuhkan
repo: PPGRepository = SQLAlchemyPPGRepository(session) # β
Production
repo: PPGRepository = InMemoryPPGRepository() # β
Testing
# Keduanya bisa digunakan di mana pun MessageBroker dibutuhkan
broker: MessageBroker = RabbitMQBroker() # β
Production
broker: MessageBroker = InMemoryBroker() # β
Testing
```
#### **I β Interface Segregation Principle**
> *"Client tidak boleh dipaksa bergantung pada interface yang tidak mereka gunakan."*
```
βββββ PPGRepository βββββ
β get_by_user() β
BaseRepository[T] β get_by_device() β
add() ββββββββββββββββββββββββ
get_by_id()
get_all() βββ PredictionRepository βββ
delete() β get_by_user_latest() β
β get_by_date_range() β
ββββββββββββββββββββββββββββ
```
- `IngestPPGUseCase` hanya bergantung pada `PPGRepository` dan `MessageBroker` β **bukan** pada `PredictionRepository` atau `SignalProcessor`.
- `ProcessAndPredictUseCase` bergantung pada `PPGRepository`, `PredictionRepository`, `SignalProcessor`, dan `ModelService` β masing-masing interface kecil dan fokus.
#### **D β Dependency Inversion Principle**
> *"Module high-level tidak boleh bergantung pada module low-level. Keduanya harus bergantung pada abstraksi."*
```
ββββββββββββββββββββ ββββββββββββββββββββββββ
β Use Case β β Interface (ABC) β
β (High-Level) βββββββββ PPGRepository β
β β β MessageBroker β
ββββββββββββββββββββ ββββββββββββ¬ββββββββββββ
β
β implements
β
ββββββββββββΌββββββββββββ
β Concrete Class β
β (Low-Level) β
β SQLAlchemyPPGRepo β
β RabbitMQBroker β
ββββββββββββββββββββββββ
```
Use case menerima dependency melalui **constructor injection**:
```python
class IngestPPGUseCase:
def __init__(
self,
ppg_repo: PPGRepository, # β interface, bukan SQLAlchemyPPGRepository
broker: MessageBroker, # β interface, bukan RabbitMQBroker
):
self._ppg_repo = ppg_repo
self._broker = broker
```
---
### 4.2. DRY (Don't Repeat Yourself)
| Penerapan | Lokasi | Sebelum (Redundant) | Sesudah (DRY) |
|-----------|--------|---------------------|---------------|
| Generic CRUD | `SQLAlchemyBaseRepository` | Setiap repository menulis `add()`, `get_by_id()`, `delete()` sendiri-sendiri | Tulis sekali di base class, semua repo inherit |
| ORM Base Model | `database/models/base.py` | Setiap ORM model mendefinisikan `id` dan `created_at` | Definisi di `Base`, semua model inherit |
| Magic Numbers | `shared/constants.py` | `0.5` dan `8.0` tersebar di banyak file | `PPG_BANDPASS_LOW = 0.5` didefinisikan sekali |
| Signal Processing Pipeline | `SignalProcessor.process()` | Urutan `filter β normalize β segment` ditulis ulang di setiap pemanggil | Template Method di base class: `process()` memanggil tiga langkah secara berurutan |
| Logger Creation | `shared/logger.py` | `logging.getLogger()` + handler setup di setiap file | `get_logger(__name__)` β factory function |
---
## 5. Design Patterns
| Pattern | Lokasi | Penggunaan |
|---------|--------|------------|
| **Repository Pattern** | `domain/interfaces/repositories/` | Abstraksi akses data, memisahkan domain dari persistence layer |
| **Factory Pattern** | `interface/api/app.py` β `create_app()` | Membuat instance FastAPI application dengan konfigurasi yang konsisten |
| **Dependency Injection** | `interface/api/dependencies.py` | Wiring dependencies menggunakan FastAPI `Depends()` |
| **Template Method** | `SignalProcessor.process()` | Mendefinisikan skeleton algoritma; subclass override langkah-langkah spesifik |
| **Data Transfer Object (DTO)** | `application/dto/` | Transfer data antar layer boundary tanpa mengekspos domain entity |
| **Data Mapper** | `_to_entity()` / `_to_model()` | Mapping antara domain entity dan ORM model |
| **Observer (Pub/Sub)** | `MessageBroker` + RabbitMQ | Publisher (ETL #1) dan Consumer (ETL #2) loosely coupled melalui message queue |
| **Strategy Pattern** | `SignalProcessor`, `ModelService` | Algoritma preprocessing dan model inference dapat di-swap melalui interface |
| **Singleton** | `_broker` di `dependencies.py` | Satu instance broker di-share ke seluruh request |
---
## 6. Data Flow & ETL Pipeline
### 6.1. ETL #1 β Data Ingestion
**Dijalankan di**: π€ Hugging Face Spaces
**Trigger**: HTTP POST request dari mobile app
```
βββββββββββ ββββββββββββββββββββββββββββββββββββββββββββββββββ
β Mobile β β ETL #1 (HF Spaces) β
β App β β β
β (HP) ββββββββ βββββββββββββ ββββββββββββββββ ββββββββββ β
β β POST β β EXTRACT ββ β TRANSFORM ββ β LOAD β β
βββββββββββ β β β β β β β β
β β Parse β β Create β β ββββββ β β
β β JSON body β β PPGSignal β β β DB β β β
β β Validate β β entity β β ββββββ β β
β β DTO β β domain β β ββββββ β β
β β β β validation β β β MQ β β β
β βββββββββββββ ββββββββββββββββ β ββββββ β β
β ββββββββββ β
ββββββββββββββββββββββββββββββββββββββββββββββββββ
```
| Tahap | Aksi | Output |
|-------|------|--------|
| **Extract** | Parse JSON request body, validasi melalui Pydantic DTO | `PPGIngestRequest` object |
| **Transform** | Buat `PPGSignal` entity, jalankan `entity.validate()` | Validated `PPGSignal` entity |
| **Load** | `ppg_repo.add()` β Supabase; `broker.publish()` β CloudAMQP | Stored record + queued message |
### 6.2. ETL #2 β Processing & Inference
**Dijalankan di**: π§ͺ Kaggle (GPU)
**Trigger**: Message baru di RabbitMQ queue
```
βββββββββββββ βββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β CloudAMQP β β ETL #2 (Kaggle) β
β (Queue) β β β
β ββββββββ βββββββββββββ ββββββββββββββββ ββββββββββββββββ β
β β msg β β EXTRACT ββ β TRANSFORM ββ β LOAD β β
βββββββββββββ β β β β β β β β
β β Consume β β Bandpass β β GAN predict β β
β β message β β filter β β VGTL-Net β β
β β Fetch PPG β β Normalize β β predict β β
β β from DB β β Segment β β Store to DB β β
β βββββββββββββ ββββββββββββββββ ββββββββββββββββ β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββ
```
| Tahap | Aksi | Output |
|-------|------|--------|
| **Extract** | Consume message dari queue, fetch full PPG signal dari database | `PPGSignal` entity |
| **Transform** | Bandpass filter β Z-score normalization β 8-second segmentation | `np.ndarray` shape `(N, window_size)` |
| **Load** | GAN inference β VGTL-Net inference β `prediction_repo.add()` | `BPPrediction` stored in database |
### 6.3. End-to-End Data Flow
```
π± HP π€ HF Spaces π° CloudAMQP π§ͺ Kaggle βοΈ Supabase
β β β β β
β POST /ppg/ingest β β β β
ββββββββββββββββββββββββββ β β β
β β β β β
β βββ Store raw PPG ββββββββββββββββββββββββββββββββββββββββββββββββ
β β β β β
β βββ Publish message βββββ β β
β β β β β
β β 200 OK (response) β β β β
ββββββββββββββββββββββββββ β β β
β β β β β
β β βββ Consume βββββββββββ β
β β β β β
β β β βββ Fetch raw PPG ββββ
β β β ββββ PPG data ββββββββ
β β β β β
β β β β [ETL #2] β
β β β β Filter β Norm β
β β β β β Segment β
β β β β β
β β β β [Model] β
β β β β GAN β VGTL-Net β
β β β β β
β β β βββ Store predictionββ
β β β β β
```
---
## 7. Project Structure
```
bp-monitoring-pipeline/
β
βββ README.md # Project overview
βββ ARCHITECTURE.md # β¬
Dokumen ini
βββ docker-compose.yml # Local development stack
βββ docker-compose.prod.yml # Production stack
βββ Dockerfile # HF Spaces deployment
βββ pyproject.toml # Project metadata & dependencies
βββ alembic.ini # Database migration config
βββ .env.example # Environment variables template
β
βββ src/
β βββ __init__.py
β β
β βββ domain/ # π’ LAYER 1: Domain (Innermost)
β β βββ __init__.py #
β β βββ entities/ # Core business objects
β β β βββ __init__.py #
β β β βββ ppg_signal.py # PPG signal entity
β β β βββ prediction.py # Blood pressure prediction entity
β β βββ value_objects/ # Immutable value types
β β β βββ __init__.py #
β β β βββ device_info.py # IoT device information
β β β βββ signal_metadata.py # Signal metadata (rate, duration)
β β βββ interfaces/ # Abstract contracts (ABC)
β β β βββ __init__.py #
β β β βββ repositories/ # Data persistence contracts
β β β β βββ __init__.py #
β β β β βββ base_repository.py # Generic CRUD interface
β β β β βββ ppg_repository.py # PPG-specific queries
β β β β βββ prediction_repository.py # Prediction-specific queries
β β β βββ services/ # External service contracts
β β β βββ __init__.py #
β β β βββ message_broker.py # Pub/Sub messaging
β β β βββ signal_processor.py # Signal preprocessing
β β β βββ model_service.py # AI model inference
β β βββ exceptions/ # Domain-specific errors
β β βββ __init__.py #
β β βββ domain_exceptions.py # InvalidSignalError, etc.
β β
β βββ application/ # π΅ LAYER 2: Application
β β βββ __init__.py #
β β βββ use_cases/ # Business use case orchestrators
β β β βββ __init__.py #
β β β βββ ingest_ppg.py # ETL #1: Ingest from mobile
β β β βββ process_and_predict.py # ETL #2: Process + AI predict
β β β βββ get_prediction_history.py # Query historical predictions
β β βββ dto/ # Data Transfer Objects
β β βββ __init__.py #
β β βββ ppg_dto.py # Request/Response for PPG
β β βββ prediction_dto.py # Request/Response for predictions
β β
β βββ infrastructure/ # π LAYER 3: Infrastructure
β β βββ __init__.py #
β β βββ database/ # PostgreSQL implementation
β β β βββ __init__.py #
β β β βββ connection.py # Engine & session factory
β β β βββ models/ # SQLAlchemy ORM models
β β β β βββ __init__.py #
β β β β βββ base.py # Shared base (id, created_at)
β β β β βββ ppg_model.py # raw_ppg table
β β β β βββ prediction_model.py # predictions table
β β β βββ repositories/ # Concrete repository implementations
β β β βββ __init__.py #
β β β βββ sqlalchemy_base.py # Generic CRUD impl (DRY)
β β β βββ ppg_repository.py # PPGRepository impl
β β β βββ prediction_repository.py # PredictionRepository impl
β β βββ messaging/ # RabbitMQ implementation
β β β βββ __init__.py #
β β β βββ rabbitmq_broker.py # MessageBroker impl
β β βββ processing/ # Signal processing implementation
β β β βββ __init__.py #
β β β βββ scipy_signal_processor.py # SignalProcessor impl (SciPy)
β β βββ model/ # AI model implementation
β β βββ __init__.py #
β β βββ mock_model_service.py # Mock for testing
β β βββ gan_vgtlnet_service.py # Production GAN + VGTL-Net
β β
β βββ interface/ # π΄ LAYER 4: Interface (Outermost)
β β βββ __init__.py #
β β βββ api/ # Entry Point 1: REST API (HF Spaces)
β β β βββ __init__.py #
β β β βββ app.py # FastAPI application factory
β β β βββ dependencies.py # Dependency injection wiring
β β β βββ routes/ # HTTP route handlers
β β β βββ __init__.py #
β β β βββ ppg_routes.py # POST /api/v1/ppg/ingest
β β β βββ prediction_routes.py # GET /api/v1/predictions
β β βββ consumer/ # Entry Point 2: MQ Consumer (Colab)
β β βββ __init__.py #
β β βββ run_consumer.py # Standalone async consumer
β β βββ colab_notebook.ipynb # Jupyter notebook for Colab
β β
β βββ shared/ # π£ Cross-cutting Concerns
β βββ __init__.py #
β βββ config.py # Settings from .env (Pydantic)
β βββ logger.py # Logging factory
β βββ constants.py # Named constants (no magic numbers)
β
βββ tests/ # Test suite
β βββ __init__.py #
β βββ unit/ # Unit tests (no external deps)
β β βββ test_entities.py # Domain entity tests
β β βββ test_use_cases.py # Use case tests (mocked deps)
β β βββ test_signal_processor.py # Signal processing tests
β βββ integration/ # Integration tests
β βββ test_api.py # API endpoint tests
β βββ test_rabbitmq.py # Message broker tests
β
βββ migrations/ # Alembic database migrations
β βββ env.py #
β βββ versions/ #
β
βββ scripts/ # Utility scripts
βββ mock_iot_sender.py # Simulate IoT data
βββ seed_database.py # Seed initial data
```
---
## 8. Deployment Architecture
### 8.1. Deployment Topology
```
ββββββββββββββββββββββββββββββββββββ
β GitHub Repository β
β bp-monitoring-pipeline β
βββββββββββββ¬βββββββββββ¬ββββββββββββ
β β
git push β β git clone
(subset) β β (full repo)
β β
ββββββββββββββββββββΌβββ ββββββΌβββββββββββββββββββ
β π€ Hugging Face β β π§ͺ Kaggle β
β Spaces β β (GPU: T4) β
β β β β
β ββββββββββββββββββ β β ββββββββββββββββββββ β
β β Docker β β β β run_consumer.py β β
β β ββββββββββββ β β β β β β
β β β FastAPI β β β β β ETL #2 β β
β β β ETL #1 β β β β β + GAN β β
β β β port 7860β β β β β + VGTL-Net β β
β β ββββββββββββ β β β ββββββββββββββββββββ β
β ββββββββββββββββββ β ββββββββββββββββββββββββββ
βββββββββ¬βββββ¬βββββββββ β β
β β β β
ββββββββββββΌβ ββΌβββββββββββ βββββββΌβββ ββββΌββββββββ
β βοΈSupabase β βπ°CloudAMQPβ ββοΈSupa- β βπ°Cloud- β
β PostgreSQLβ β RabbitMQ β β base β β AMQP β
β (Store) β β (Publish) β β (R/W) β β(Consume)β
βββββββββββββ ββββββββββββββ ββββββββββ βββββββββββ
```
### 8.2. Technology Stack
| Layer | Teknologi | Versi | Justifikasi |
|-------|-----------|-------|-------------|
| **Runtime** | Python | 3.11+ | Async support, type hints, ecosystem ML matang |
| **Web Framework** | FastAPI | β₯ 0.110 | Async-native, auto-docs (Swagger), Pydantic integration |
| **ORM** | SQLAlchemy | β₯ 2.0 | Async support, mature, database-agnostic |
| **Database Driver** | asyncpg | β₯ 0.29 | Fastest Python PostgreSQL async driver |
| **Database** | PostgreSQL (Supabase) | 15 | JSONB support, reliability, free tier 500MB |
| **Message Broker** | RabbitMQ (CloudAMQP) | 3.x | AMQP protocol, persistent messages, free tier 1M msg/month |
| **Message Client** | aio-pika | β₯ 9.4 | Async RabbitMQ client, SSL support |
| **Signal Processing** | SciPy | β₯ 1.12 | Industry-standard scientific computing |
| **Deep Learning** | PyTorch | β₯ 2.0 | GPU inference, GAN + VGTL-Net compatibility |
| **Validation** | Pydantic | β₯ 2.0 | DTO validation, settings management |
| **Containerization** | Docker | β | Reproducible deployments |
| **Hosting (API)** | Hugging Face Spaces | Free | Docker SDK, 2 vCPU, 16GB RAM |
| **Hosting (GPU)** | Kaggle | Free | T4 GPU, 15GB VRAM |
### 8.3. Entry Points
| # | Entry Point | Platform | Command | Fungsi |
|---|-------------|----------|---------|--------|
| 1 | FastAPI Server | HF Spaces | `uvicorn src.interface.api.app:create_app` | Terima PPG dari HP, store, publish |
| 2 | MQ Consumer | Kaggle | `python -m src.interface.consumer.run_consumer` | Consume, preprocess, predict, store |
> **π Kedua entry point berbagi ~85% codebase yang sama.** Yang berbeda hanyalah bootstrapping dan dependency wiring.
---
## 9. Configuration Management
Seluruh konfigurasi dikelola melalui **environment variables** menggunakan Pydantic Settings. Tidak ada credentials yang di-hardcode.
### Environment Variables
| Variable | Deskripsi | Contoh | Wajib |
|----------|-----------|--------|:-----:|
| `DATABASE_URL` | PostgreSQL connection string (async) | `postgresql+asyncpg://user:pass@host:6543/db` | β
|
| `RABBITMQ_URL` | RabbitMQ/CloudAMQP connection string | `amqps://user:pass@host/vhost` | β
|
| `APP_PORT` | Port untuk FastAPI (default: 7860 untuk HF) | `7860` | β |
| `APP_HOST` | Host binding (default: 0.0.0.0) | `0.0.0.0` | β |
| `DEBUG` | Enable debug mode | `false` | β |
| `LOG_LEVEL` | Logging level | `INFO` | β |
### Secrets Management per Platform
| Platform | Metode | Lokasi |
|----------|--------|--------|
| HF Spaces | Space Secrets | Settings β Variables and Secrets |
| Kaggle | Colab Secrets | π Sidebar β Secrets |
| Local Development | `.env` file | Root project directory |
---
## 10. Testing Strategy
### Testing Pyramid
```
β±β²
β± β² E2E Tests
β± β² (curl β HF Spaces β Supabase β Colab)
β±βββββββ²
β± β² Integration Tests
β± β² (API + real DB, Consumer + real Queue)
β±βββββββββββββ²
β± β² Unit Tests
β± β² (Entities, Use Cases, Processors β mocked deps)
β±βββββββββββββββββββ²
```
### Test Categories
| Kategori | Scope | Dependencies | Tools |
|----------|-------|-------------|-------|
| **Unit** | Domain entities, value objects | Tidak ada | `pytest` |
| **Unit** | Use cases | Mocked repositories & services | `pytest`, `unittest.mock` |
| **Unit** | Signal processor | Tidak ada (pure NumPy/SciPy) | `pytest`, `numpy.testing` |
| **Integration** | API endpoints | Test database (SQLite/PostgreSQL) | `pytest`, `httpx.AsyncClient` |
| **Integration** | Message broker | Test RabbitMQ (Docker) | `pytest`, `aio-pika` |
| **E2E** | Full pipeline | All services running | `curl`, `pytest` |
### Testability by Layer
| Layer | Testable Without External Deps? | Bagaimana |
|-------|:-------------------------------:|-----------|
| Domain | β
Ya | Pure Python β tidak ada dependency |
| Application | β
Ya | Inject mock repositories & services |
| Infrastructure | β οΈ Perlu test DB/Queue | Gunakan testcontainers atau in-memory substitutes |
| Interface | β οΈ Perlu running app | Gunakan `TestClient` (FastAPI) |
---
## 11. Glossary
| Istilah | Definisi |
|---------|----------|
| **PPG** | Photoplethysmography β teknik optik non-invasif untuk mengukur perubahan volume darah dalam jaringan mikrovaskular |
| **ABP** | Arterial Blood Pressure β tekanan darah arteri yang diprediksi oleh model |
| **SBP** | Systolic Blood Pressure β tekanan darah saat jantung berkontraksi (angka atas) |
| **DBP** | Diastolic Blood Pressure β tekanan darah saat jantung berelaksasi (angka bawah) |
| **GAN** | Generative Adversarial Network β arsitektur deep learning untuk mentranslasikan sinyal PPG ke ECG |
| **VGTL-Net** | Model deep learning yang menerima PPG + ECG untuk memprediksi ABP |
| **ETL** | Extract-Transform-Load β pola pipeline data |
| **Entity** | Objek domain dengan identitas unik dan lifecycle |
| **Value Object** | Objek domain immutable yang didefinisikan oleh atributnya |
| **DTO** | Data Transfer Object β objek untuk transfer data antar layer boundary |
| **Repository** | Abstraksi akses data yang menyembunyikan detail persistence |
| **Use Case** | Orkestrasi satu unit business logic yang spesifik |
| **Message Broker** | Middleware yang mengelola antrian pesan antara producer dan consumer |
| **Dependency Injection** | Teknik menyediakan dependency dari luar, bukan membuat di dalam class |
---
## 12. References
| # | Referensi | Penulis |
|---|-----------|---------|
| 1 | *Clean Architecture: A Craftsman's Guide to Software Structure and Design* | Robert C. Martin (2017) |
| 2 | *Design Patterns: Elements of Reusable Object-Oriented Software* | Gamma, Helm, Johnson, Vlissides (1994) |
| 3 | *Patterns of Enterprise Application Architecture* | Martin Fowler (2002) |
| 4 | *Domain-Driven Design: Tackling Complexity in the Heart of Software* | Eric Evans (2003) |
| 5 | FastAPI Documentation | https://fastapi.tiangolo.com |
| 6 | SQLAlchemy 2.0 Documentation | https://docs.sqlalchemy.org |
| 7 | RabbitMQ Tutorials | https://www.rabbitmq.com/tutorials |
| 8 | Hugging Face Spaces Docker SDK | https://huggingface.co/docs/hub/spaces-sdks-docker |
---
---
*Dokumen ini di-maintain oleh Data Engineering Team.*
*Untuk pertanyaan, silakan buat issue di repository GitHub.*
**Β© 2026 BP Monitoring Pipeline β All rights reserved.**