LIBRE / scripts /mock_iot_sender.py
RyZ
feat: adding full working local ETL Pipeline
e391a84
Raw
History Blame Contribute Delete
5.02 kB
"""
scripts/mock_iot_sender.py
───────────────────────────
CLI script that simulates an IoT device sending PPG signals to the API.
Usage:
python scripts/mock_iot_sender.py
python scripts/mock_iot_sender.py --url http://localhost:7860 --count 5
python scripts/mock_iot_sender.py --user patient-42 --rate 250
Useful for:
• Local testing without a real IoT sensor
• Load testing the ingest endpoint
• Seeding the message queue for consumer testing
"""
from __future__ import annotations
import argparse
import asyncio
import math
import random
import time
import uuid
from typing import Optional
import httpx
def generate_ppg_signal(
sampling_rate: float = 125.0,
duration_seconds: float = 10.0,
heart_rate_bpm: float = 70.0,
noise_level: float = 0.05,
) -> list[float]:
"""
Generate a synthetic PPG waveform (sinusoidal approximation).
Args:
sampling_rate: Samples per second (Hz).
duration_seconds: Signal duration in seconds.
heart_rate_bpm: Simulated heart rate (beats per minute).
noise_level: Gaussian noise standard deviation.
Returns:
List of float PPG amplitude values.
"""
n_samples = int(sampling_rate * duration_seconds)
heart_rate_hz = heart_rate_bpm / 60.0
values = []
for i in range(n_samples):
t = i / sampling_rate
# Main pulsatile component (fundamental frequency)
fundamental = math.sin(2 * math.pi * heart_rate_hz * t)
# Second harmonic (dicrotic notch approximation)
harmonic = 0.3 * math.sin(2 * math.pi * 2 * heart_rate_hz * t - 0.5)
# Gaussian noise
noise = random.gauss(0, noise_level)
ppg = fundamental + harmonic + noise
values.append(round(ppg, 6))
return values
async def send_ppg(
client: httpx.AsyncClient,
url: str,
device_id: str,
user_id: str,
sampling_rate: float,
duration: float,
index: int,
) -> None:
"""Send one PPG signal to the ingest endpoint."""
hr = random.uniform(60, 100)
ppg_values = generate_ppg_signal(
sampling_rate=sampling_rate,
duration_seconds=duration,
heart_rate_bpm=hr,
)
payload = {
"device_id": device_id,
"user_id": user_id,
"sampling_rate": sampling_rate,
"ppg_values": ppg_values,
"duration_seconds": duration,
}
start = time.perf_counter()
try:
response = await client.post(f"{url}/api/v1/ppg/ingest", json=payload, timeout=30.0)
elapsed = (time.perf_counter() - start) * 1000
if response.status_code == 201:
data = response.json()
print(
f"[{index:3d}] ✅ signal_id={data['signal_id'][:8]}… "
f"samples={data['num_samples']} queued={data['queued']} "
f"({elapsed:.0f}ms)"
)
else:
print(f"[{index:3d}] ❌ HTTP {response.status_code}: {response.text[:100]}")
except Exception as exc:
print(f"[{index:3d}] 💥 Error: {exc}")
async def main(
url: str,
count: int,
user_id: str,
device_id: str,
rate: float,
duration: float,
interval: float,
) -> None:
"""Send `count` PPG signals with optional interval between each."""
print(f"🩺 Mock IoT Sender")
print(f" Target : {url}")
print(f" User : {user_id}")
print(f" Device : {device_id}")
print(f" Count : {count}")
print(f" Rate : {rate} Hz")
print(f" Duration: {duration}s")
print(f" Interval: {interval}s")
print("-" * 60)
async with httpx.AsyncClient() as client:
for i in range(1, count + 1):
await send_ppg(client, url, device_id, user_id, rate, duration, i)
if i < count and interval > 0:
await asyncio.sleep(interval)
print("-" * 60)
print(f"✅ Done — sent {count} signals.")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Mock IoT PPG Signal Sender")
parser.add_argument("--url", default="http://localhost:7860", help="API base URL")
parser.add_argument("--count", type=int, default=5, help="Number of signals to send")
parser.add_argument("--user", default=f"user-{uuid.uuid4().hex[:6]}", help="User ID")
parser.add_argument("--device", default="mock-sensor-001", help="Device ID")
parser.add_argument("--rate", type=float, default=125.0, help="Sampling rate (Hz)")
parser.add_argument("--duration", type=float, default=10.0, help="Signal duration (seconds)")
parser.add_argument("--interval", type=float, default=1.0, help="Seconds between signals")
args = parser.parse_args()
asyncio.run(
main(
url=args.url,
count=args.count,
user_id=args.user,
device_id=args.device,
rate=args.rate,
duration=args.duration,
interval=args.interval,
)
)