"""
scripts/mock_iot_sender.py
───────────────────────────
CLI script that simulates an IoT device sending PPG (photoplethysmography)
signals to the API.

Usage:
    python scripts/mock_iot_sender.py
    python scripts/mock_iot_sender.py --url http://localhost:7860 --count 5
    python scripts/mock_iot_sender.py --user patient-42 --rate 250

Useful for:
  • Local testing without a real IoT sensor
  • Load testing the ingest endpoint
  • Seeding the message queue for consumer testing
"""
| from __future__ import annotations |
|
|
| import argparse |
| import asyncio |
| import math |
| import random |
| import time |
| import uuid |
| from typing import Optional |
|
|
| import httpx |
|
|
|
|
def generate_ppg_signal(
    sampling_rate: float = 125.0,
    duration_seconds: float = 10.0,
    heart_rate_bpm: float = 70.0,
    noise_level: float = 0.05,
    rng: Optional[random.Random] = None,
) -> list[float]:
    """
    Generate a synthetic PPG waveform (sinusoidal approximation).

    Each sample is the sum of a unit-amplitude sine at the heart-rate
    frequency, a second harmonic (amplitude 0.3, phase offset -0.5 rad)
    and zero-mean Gaussian noise, rounded to 6 decimal places.

    Args:
        sampling_rate: Samples per second (Hz).
        duration_seconds: Signal duration in seconds.
        heart_rate_bpm: Simulated heart rate (beats per minute).
        noise_level: Gaussian noise standard deviation.
        rng: Optional ``random.Random`` instance used to draw the noise.
            Pass a seeded instance for reproducible signals; when omitted
            the module-level ``random`` generator is used.

    Returns:
        List of float PPG amplitude values. The list is empty when
        ``sampling_rate * duration_seconds`` is not positive.
    """
    # A product such as 100 * 0.29 evaluates to 28.999999999999996; plain
    # int() would truncate it to 28 and silently drop the last sample.
    # Snapping to 6 decimals first absorbs that representation error while
    # still truncating genuine fractions (87.5 -> 87).
    n_samples = int(round(sampling_rate * duration_seconds, 6))

    gauss = random.gauss if rng is None else rng.gauss

    # Loop-invariant angular frequencies, hoisted out of the per-sample work.
    heart_rate_hz = heart_rate_bpm / 60.0
    omega_fundamental = 2 * math.pi * heart_rate_hz
    omega_harmonic = 2 * math.pi * 2 * heart_rate_hz

    return [
        round(
            math.sin(omega_fundamental * t)
            + 0.3 * math.sin(omega_harmonic * t - 0.5)
            + gauss(0, noise_level),
            6,
        )
        for t in (i / sampling_rate for i in range(n_samples))
    ]
|
|
|
|
async def send_ppg(
    client: httpx.AsyncClient,
    url: str,
    device_id: str,
    user_id: str,
    sampling_rate: float,
    duration: float,
    index: int,
) -> bool:
    """
    Send one synthetic PPG signal to the ingest endpoint and print the outcome.

    A heart rate is drawn uniformly from 60-100 bpm, a waveform is generated
    with ``generate_ppg_signal`` and POSTed as JSON to
    ``{url}/api/v1/ppg/ingest`` with a 30 second timeout.

    Args:
        client: Shared HTTP client used to issue the request.
        url: API base URL; trailing slashes are ignored.
        device_id: Value sent as ``device_id`` in the payload.
        user_id: Value sent as ``user_id`` in the payload.
        sampling_rate: Sampling rate (Hz) of the generated signal.
        duration: Duration (seconds) of the generated signal.
        index: Sequence number shown in the printed status line.

    Returns:
        True when the API answered HTTP 201 and the response body could be
        reported; False on any other status code or on an error.
    """
    hr = random.uniform(60, 100)
    ppg_values = generate_ppg_signal(
        sampling_rate=sampling_rate,
        duration_seconds=duration,
        heart_rate_bpm=hr,
    )

    payload = {
        "device_id": device_id,
        "user_id": user_id,
        "sampling_rate": sampling_rate,
        "ppg_values": ppg_values,
        "duration_seconds": duration,
    }

    # Tolerate a base URL given with a trailing slash ("http://host/"),
    # which would otherwise produce ".../ /api" style double slashes.
    endpoint = f"{url.rstrip('/')}/api/v1/ppg/ingest"

    start = time.perf_counter()
    try:
        response = await client.post(endpoint, json=payload, timeout=30.0)
        elapsed = (time.perf_counter() - start) * 1000

        if response.status_code != 201:
            print(f"[{index:3d}] ❌ HTTP {response.status_code}: {response.text[:100]}")
            return False

        data = response.json()
        print(
            f"[{index:3d}] ✅ signal_id={data['signal_id'][:8]}… "
            f"samples={data['num_samples']} queued={data['queued']} "
            f"({elapsed:.0f}ms)"
        )
        return True
    except Exception as exc:
        # Deliberate best-effort boundary: a failed request or an unexpected
        # response body must not abort the remaining sends of the run.
        print(f"[{index:3d}] 💥 Error: {exc}")
        return False


async def main(
    url: str,
    count: int,
    user_id: str,
    device_id: str,
    rate: float,
    duration: float,
    interval: float,
) -> None:
    """
    Send `count` PPG signals with optional interval between each.

    Signals are sent one after another over a single HTTP client. A banner
    with the run parameters is printed first, and a summary reporting how
    many signals were actually accepted is printed at the end.

    Args:
        url: API base URL.
        count: Number of signals to send.
        user_id: User ID placed in every payload.
        device_id: Device ID placed in every payload.
        rate: Sampling rate (Hz) of each signal.
        duration: Duration (seconds) of each signal.
        interval: Seconds to sleep between consecutive signals; no sleep
            happens after the last signal or when the value is not positive.
    """
    print("🩺 Mock IoT Sender")
    print(f" Target : {url}")
    print(f" User : {user_id}")
    print(f" Device : {device_id}")
    print(f" Count : {count}")
    print(f" Rate : {rate} Hz")
    print(f" Duration: {duration}s")
    print(f" Interval: {interval}s")
    print("-" * 60)

    sent = 0
    async with httpx.AsyncClient() as client:
        for i in range(1, count + 1):
            if await send_ppg(client, url, device_id, user_id, rate, duration, i):
                sent += 1
            if i < count and interval > 0:
                await asyncio.sleep(interval)

    print("-" * 60)
    # Report what really happened instead of assuming every send succeeded.
    attempted = max(count, 0)
    if sent == attempted:
        print(f"✅ Done — sent {sent} signals.")
    else:
        print(
            f"⚠️ Done — sent {sent} of {attempted} signals "
            f"({attempted - sent} failed)."
        )
|
|
|
|
if __name__ == "__main__":
    # Command-line options, listed in the order they appear in --help.
    # The default user ID gets a fresh random 6-hex-digit suffix on each run.
    option_table = (
        ("--url", {"default": "http://localhost:7860", "help": "API base URL"}),
        ("--count", {"type": int, "default": 5, "help": "Number of signals to send"}),
        ("--user", {"default": f"user-{uuid.uuid4().hex[:6]}", "help": "User ID"}),
        ("--device", {"default": "mock-sensor-001", "help": "Device ID"}),
        ("--rate", {"type": float, "default": 125.0, "help": "Sampling rate (Hz)"}),
        ("--duration", {"type": float, "default": 10.0, "help": "Signal duration (seconds)"}),
        ("--interval", {"type": float, "default": 1.0, "help": "Seconds between signals"}),
    )

    cli = argparse.ArgumentParser(description="Mock IoT PPG Signal Sender")
    for flag, spec in option_table:
        cli.add_argument(flag, **spec)
    opts = cli.parse_args()

    # Map the CLI option names onto main()'s parameter names and run the
    # coroutine to completion on a fresh event loop.
    asyncio.run(
        main(
            url=opts.url,
            count=opts.count,
            user_id=opts.user,
            device_id=opts.device,
            rate=opts.rate,
            duration=opts.duration,
            interval=opts.interval,
        )
    )
|
|