# ground-zero/src/iot/sensor_bridge.py
# jefffffff9 - Initial commit: Sahel-Agri Voice AI (76db545)
# File size: 4.77 kB
"""
Fetches sensor data (soil, weather, irrigation, pest traps) from the IoT backend API.
Falls back to synthetic mock data when SENSOR_API_URL is not configured.
"""
from __future__ import annotations
import logging
import random
from dataclasses import dataclass, field
from datetime import datetime
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from src.iot.intent_parser import Intent
# Module-level logger, named after this module so log output can be filtered per module.
logger = logging.getLogger(__name__)
@dataclass
class SensorData:
    """One reading returned by the sensor bridge.

    Attributes:
        sensor_type: Kind of sensor the reading belongs to (e.g. ``"soil"``).
        values: Mapping of measurement name to numeric value.
        timestamp: ISO-8601 timestamp string associated with the reading.
        unit: Optional unit label; empty when not provided.
    """

    sensor_type: str
    values: dict[str, float]
    timestamp: str
    # Explicit field() spelling of the empty-string default.
    unit: str = field(default="")
class SensorBridge:
"""Async bridge to IoT sensor API. Uses mock data when no API URL is configured."""
def __init__(self, sensor_api_url: str | None = None, timeout_s: float = 5.0) -> None:
self.sensor_api_url = sensor_api_url
self.timeout_s = timeout_s
self._mock_mode = not sensor_api_url
if self._mock_mode:
logger.info("SensorBridge: running in MOCK mode (set SENSOR_API_URL to use real sensors).")
async def fetch(self, intent: "Intent", field_id: str | None = None) -> SensorData:
"""Dispatch to the correct sensor fetch method based on intent entity."""
action = intent.action
if action == "check_soil":
return await self.get_soil_data(field_id or "default")
elif action == "check_weather":
return await self.get_weather(field_id or "default")
elif action == "irrigation_status":
return await self.get_irrigation(field_id or "default")
elif action == "pest_alert":
return await self.get_pest_status(field_id or "default")
else:
return SensorData(
sensor_type="unknown",
values={},
timestamp=datetime.utcnow().isoformat(),
)
async def get_soil_data(self, location_id: str) -> SensorData:
if self._mock_mode:
return SensorData(
sensor_type="soil",
values={
"moisture_pct": round(random.uniform(25, 65), 1),
"ph": round(random.uniform(5.5, 7.5), 1),
"nitrogen_ppm": round(random.uniform(10, 40), 1),
"temperature_c": round(random.uniform(24, 35), 1),
},
timestamp=datetime.utcnow().isoformat(),
)
return await self._get(f"/sensors/soil/{location_id}", "soil")
async def get_weather(self, location_id: str) -> SensorData:
if self._mock_mode:
return SensorData(
sensor_type="weather",
values={
"temperature_c": round(random.uniform(28, 42), 1),
"humidity_pct": round(random.uniform(20, 80), 1),
"wind_speed_kmh": round(random.uniform(0, 25), 1),
"rain_probability_pct": round(random.uniform(0, 100), 1),
},
timestamp=datetime.utcnow().isoformat(),
)
return await self._get(f"/sensors/weather/{location_id}", "weather")
async def get_irrigation(self, field_id: str) -> SensorData:
if self._mock_mode:
return SensorData(
sensor_type="irrigation",
values={
"flow_rate_lph": round(random.uniform(0, 500), 1),
"pressure_bar": round(random.uniform(1.0, 4.0), 2),
"active": float(random.choice([0, 1])),
"last_irrigation_h_ago": round(random.uniform(1, 48), 1),
},
timestamp=datetime.utcnow().isoformat(),
)
return await self._get(f"/sensors/irrigation/{field_id}", "irrigation")
async def get_pest_status(self, field_id: str) -> SensorData:
if self._mock_mode:
return SensorData(
sensor_type="pest",
values={
"trap_count_24h": float(random.randint(0, 50)),
"alert_level": float(random.randint(0, 3)), # 0=none 1=low 2=medium 3=high
},
timestamp=datetime.utcnow().isoformat(),
)
return await self._get(f"/sensors/pest/{field_id}", "pest")
async def _get(self, path: str, sensor_type: str) -> SensorData:
import httpx
url = f"{self.sensor_api_url}{path}"
async with httpx.AsyncClient(timeout=self.timeout_s) as client:
response = await client.get(url)
response.raise_for_status()
data = response.json()
return SensorData(
sensor_type=sensor_type,
values=data.get("values", data),
timestamp=data.get("timestamp", datetime.utcnow().isoformat()),
)