verd / scan.py
JibexBanks's picture
made changes to the code
8115669
"""
scan.py β€” Core analysis engine.
Takes a drone image, splits it into a GRID_ROWS Γ— GRID_COLS grid,
simulates disease prediction for each patch (seeded from image content
so the same image always produces the same result), and returns the
full zone grid with metadata.
"""
import hashlib
import random
import math
from PIL import Image
import io
from disease_data import DISEASES, DISEASE_MAP
GRID_ROWS = 8
GRID_COLS = 8
# Diseases that can appear as hotspot clusters in a scan
FIELD_DISEASES = [d for d in DISEASES if d["name"] != "Healthy"]
def _image_seed(image_bytes: bytes) -> int:
"""Derive a stable integer seed from the image content."""
digest = hashlib.md5(image_bytes).hexdigest()
return int(digest[:8], 16)
def _get_image_size(image_bytes: bytes):
img = Image.open(io.BytesIO(image_bytes))
return img.width, img.height
def _severity_label(s: float) -> str:
if s >= 0.75: return "critical"
if s >= 0.45: return "moderate"
if s >= 0.20: return "low"
return "healthy"
def _pick_diseases_for_field(rng: random.Random) -> list:
"""
Pick 1–2 diseases that will appear as hotspot clusters in this field.
Weighted: cassava diseases appear more often (West African context).
"""
cassava = [d for d in FIELD_DISEASES if d["crop"] == "cassava"]
others = [d for d in FIELD_DISEASES if d["crop"] != "cassava"]
primary = rng.choice(cassava)
secondary = rng.choice(others) if rng.random() > 0.4 else None
return [d for d in [primary, secondary] if d is not None]
def _build_severity_grid(rng: random.Random) -> list[list[float]]:
"""
Generate a realistic severity field using gaussian hotspot clusters.
Returns GRID_ROWS Γ— GRID_COLS grid of severity values (0.0–1.0).
"""
grid = [[0.0] * GRID_COLS for _ in range(GRID_ROWS)]
# Place 2–4 hotspot centres
n_hotspots = rng.randint(2, 4)
hotspots = []
for _ in range(n_hotspots):
r = rng.uniform(0, GRID_ROWS - 1)
c = rng.uniform(0, GRID_COLS - 1)
strength = rng.uniform(0.7, 1.0)
spread = rng.uniform(1.2, 2.8)
hotspots.append((r, c, strength, spread))
for row in range(GRID_ROWS):
for col in range(GRID_COLS):
val = 0.0
for (hr, hc, strength, spread) in hotspots:
dist = math.sqrt((row - hr) ** 2 + (col - hc) ** 2)
val = max(val, strength * math.exp(-(dist ** 2) / (2 * spread ** 2)))
# Add small background noise
val = min(1.0, val + rng.uniform(0, 0.08))
grid[row][col] = round(val, 3)
return grid
def _assign_diseases(severity_grid, diseases: list, rng: random.Random) -> list[list[dict]]:
"""
Map severity values to disease labels.
High-severity cells β†’ primary disease.
Mid-severity cells β†’ secondary disease (if exists).
Low-severity cells β†’ Healthy.
"""
primary = diseases[0]
secondary = diseases[1] if len(diseases) > 1 else None
healthy = DISEASE_MAP["Healthy"]
zones = []
for row in range(GRID_ROWS):
row_zones = []
for col in range(GRID_COLS):
s = severity_grid[row][col]
label = _severity_label(s)
if label in ("critical", "moderate"):
disease = primary
elif label == "low" and secondary:
disease = secondary if rng.random() > 0.5 else primary
elif label == "low":
disease = primary
else:
disease = healthy
confidence = round(rng.uniform(0.78, 0.97), 2) if label != "healthy" else round(rng.uniform(0.88, 0.99), 2)
row_zones.append({
"row": row,
"col": col,
"severity": s,
"severity_label": label,
"disease": disease["name"],
"confidence": confidence,
"treatments": disease["treatments"],
"yield_impact": disease["yield_impact"],
"spread_note": disease["spread"],
})
zones.append(row_zones)
return zones
def analyze_image(image_bytes: bytes, filename: str = "scan.jpg") -> dict:
"""
Main entry point. Returns full scan result dict ready for JSON response.
"""
seed = _image_seed(image_bytes)
rng = random.Random(seed)
width, height = _get_image_size(image_bytes)
diseases = _pick_diseases_for_field(rng)
severity_grid = _build_severity_grid(rng)
zone_grid = _assign_diseases(severity_grid, diseases, rng)
# Flatten zones for response
flat_zones = [zone for row in zone_grid for zone in row]
# Summary stats
critical = sum(1 for z in flat_zones if z["severity_label"] == "critical")
moderate = sum(1 for z in flat_zones if z["severity_label"] == "moderate")
low = sum(1 for z in flat_zones if z["severity_label"] == "low")
healthy = sum(1 for z in flat_zones if z["severity_label"] == "healthy")
total = len(flat_zones)
affected_pct = round((critical + moderate + low) / total * 100, 1)
unique_diseases = list({z["disease"] for z in flat_zones if z["disease"] != "Healthy"})
return {
"scan_id": f"SCN-{seed % 99999:05d}",
"filename": filename,
"image_width": width,
"image_height": height,
"grid_rows": GRID_ROWS,
"grid_cols": GRID_COLS,
"zones": flat_zones,
"summary": {
"critical_zones": critical,
"moderate_zones": moderate,
"low_zones": low,
"healthy_zones": healthy,
"total_zones": total,
"affected_pct": affected_pct,
"diseases_detected": unique_diseases,
"recommended_action": (
"Immediate intervention required" if critical > 8 else
"Targeted treatment recommended" if moderate > 8 else
"Monitor closely" if low > 10 else
"Field is largely healthy"
),
},
}