File size: 5,021 Bytes
"""
scripts/mock_iot_sender.py
───────────────────────────
CLI script that simulates an IoT device sending PPG signals to the API.

Usage:
    python scripts/mock_iot_sender.py
    python scripts/mock_iot_sender.py --url http://localhost:7860 --count 5
    python scripts/mock_iot_sender.py --user patient-42 --rate 250

Useful for:
  • Local testing without a real IoT sensor
  • Load testing the ingest endpoint
  • Seeding the message queue for consumer testing
"""
from __future__ import annotations
import argparse
import asyncio
import math
import random
import time
import uuid
from typing import Optional
import httpx
def generate_ppg_signal(
    sampling_rate: float = 125.0,
    duration_seconds: float = 10.0,
    heart_rate_bpm: float = 70.0,
    noise_level: float = 0.05,
) -> list[float]:
    """
    Generate a synthetic PPG waveform (sinusoidal approximation).

    Each sample is the sum of a unit-amplitude sine at the heart-rate
    frequency, a 0.3-amplitude second harmonic shifted by 0.5 rad, and
    zero-mean Gaussian noise; the sum is rounded to 6 decimal places.

    Args:
        sampling_rate: Samples per second (Hz).
        duration_seconds: Signal duration in seconds.
        heart_rate_bpm: Simulated heart rate (beats per minute).
        noise_level: Gaussian noise standard deviation.

    Returns:
        List of ``int(sampling_rate * duration_seconds)`` float PPG amplitude
        values. The list is empty when that product truncates to zero or is
        negative.
    """
    n_samples = int(sampling_rate * duration_seconds)
    heart_rate_hz = heart_rate_bpm / 60.0

    # Loop-invariant angular frequencies, hoisted out of the per-sample work.
    # The multiplication order is kept as-is so results stay bit-identical.
    omega_fundamental = 2 * math.pi * heart_rate_hz
    omega_harmonic = 2 * math.pi * 2 * heart_rate_hz

    return [
        round(
            # Main pulsatile component (fundamental frequency)
            math.sin(omega_fundamental * t)
            # Second harmonic (dicrotic notch approximation)
            + 0.3 * math.sin(omega_harmonic * t - 0.5)
            # Gaussian noise: exactly one draw per sample, in sample order
            + random.gauss(0, noise_level),
            6,
        )
        for t in (i / sampling_rate for i in range(n_samples))
    ]
async def send_ppg(
    client: httpx.AsyncClient,
    url: str,
    device_id: str,
    user_id: str,
    sampling_rate: float,
    duration: float,
    index: int,
) -> None:
    """
    Send one PPG signal to the ingest endpoint and print the outcome.

    A fresh synthetic signal is generated with a heart rate drawn uniformly
    from 60-100 bpm, POSTed as JSON to ``{url}/api/v1/ppg/ingest``, and a
    one-line status is printed. Failures are reported on stdout instead of
    being raised.

    Args:
        client: Open HTTP client used to issue the request.
        url: API base URL; a trailing slash is tolerated.
        device_id: Device identifier placed in the payload.
        user_id: User identifier placed in the payload.
        sampling_rate: Sampling rate (Hz) of the generated signal.
        duration: Duration of the generated signal in seconds.
        index: Sequence number shown in the printed status line.
    """
    hr = random.uniform(60, 100)
    ppg_values = generate_ppg_signal(
        sampling_rate=sampling_rate,
        duration_seconds=duration,
        heart_rate_bpm=hr,
    )
    payload = {
        "device_id": device_id,
        "user_id": user_id,
        "sampling_rate": sampling_rate,
        "ppg_values": ppg_values,
        "duration_seconds": duration,
    }

    # Strip a trailing slash so "http://host/" does not become "http://host//api/...".
    base_url = url.rstrip("/")
    endpoint = f"{base_url}/api/v1/ppg/ingest"

    start = time.perf_counter()
    try:
        response = await client.post(endpoint, json=payload, timeout=30.0)
        elapsed = (time.perf_counter() - start) * 1000  # milliseconds
        if response.status_code == 201:
            # The success body is read for signal_id, num_samples and queued.
            data = response.json()
            print(
                f"[{index:3d}] ✅ signal_id={data['signal_id'][:8]}… "
                f"samples={data['num_samples']} queued={data['queued']} "
                f"({elapsed:.0f}ms)"
            )
        else:
            print(f"[{index:3d}] ❌ HTTP {response.status_code}: {response.text[:100]}")
    except Exception as exc:
        # Deliberately broad: one failed send (network error, malformed
        # response body) must not abort the rest of the run.
        print(f"[{index:3d}] 💥 Error: {exc}")
async def main(
    url: str,
    count: int,
    user_id: str,
    device_id: str,
    rate: float,
    duration: float,
    interval: float,
) -> None:
    """
    Send `count` PPG signals with optional interval between each.

    Prints a header describing the run, sends the signals one after another
    over a single shared HTTP client, and prints a closing summary line.

    Args:
        url: API base URL.
        count: Number of signals to send.
        user_id: User ID placed in every payload.
        device_id: Device ID placed in every payload.
        rate: Sampling rate (Hz) of each generated signal.
        duration: Duration of each generated signal in seconds.
        interval: Seconds to wait between consecutive sends; no wait
            happens when this is zero or negative, or after the last send.
    """
    print("🩺 Mock IoT Sender")
    print(f" Target : {url}")
    print(f" User : {user_id}")
    print(f" Device : {device_id}")
    print(f" Count : {count}")
    print(f" Rate : {rate} Hz")
    print(f" Duration: {duration}s")
    print(f" Interval: {interval}s")
    print("-" * 60)

    async with httpx.AsyncClient() as client:
        for i in range(1, count + 1):
            await send_ppg(client, url, device_id, user_id, rate, duration, i)
            # Pause between sends only; skip the wait after the final one.
            if i < count and interval > 0:
                await asyncio.sleep(interval)

    print("-" * 60)
    # NOTE: this reports the number of sends attempted; per-signal success or
    # failure is printed by send_ppg and is not tallied here.
    print(f"✅ Done — sent {count} signals.")
if __name__ == "__main__":
    # Script entry point: describe the CLI options, parse them, and run the
    # async sender to completion.
    cli = argparse.ArgumentParser(description="Mock IoT PPG Signal Sender")

    # (flag, add_argument keyword options), registered in display order.
    option_specs = (
        ("--url", {"default": "http://localhost:7860", "help": "API base URL"}),
        ("--count", {"type": int, "default": 5, "help": "Number of signals to send"}),
        # A random per-run user ID is used unless one is given explicitly.
        ("--user", {"default": f"user-{uuid.uuid4().hex[:6]}", "help": "User ID"}),
        ("--device", {"default": "mock-sensor-001", "help": "Device ID"}),
        ("--rate", {"type": float, "default": 125.0, "help": "Sampling rate (Hz)"}),
        ("--duration", {"type": float, "default": 10.0, "help": "Signal duration (seconds)"}),
        ("--interval", {"type": float, "default": 1.0, "help": "Seconds between signals"}),
    )
    for flag, spec in option_specs:
        cli.add_argument(flag, **spec)

    opts = cli.parse_args()

    # Map the parsed options onto main()'s parameter names.
    run = main(
        url=opts.url,
        count=opts.count,
        user_id=opts.user,
        device_id=opts.device,
        rate=opts.rate,
        duration=opts.duration,
        interval=opts.interval,
    )
    asyncio.run(run)
|