Spaces:
Sleeping
Sleeping
| """ | |
| scan.py β Core analysis engine. | |
| Takes a drone image, splits it into a GRID_ROWS Γ GRID_COLS grid, | |
| simulates disease prediction for each patch (seeded from image content | |
| so the same image always produces the same result), and returns the | |
| full zone grid with metadata. | |
| """ | |
| import hashlib | |
| import random | |
| import math | |
| from PIL import Image | |
| import io | |
| from disease_data import DISEASES, DISEASE_MAP | |
| GRID_ROWS = 8 | |
| GRID_COLS = 8 | |
| # Diseases that can appear as hotspot clusters in a scan | |
| FIELD_DISEASES = [d for d in DISEASES if d["name"] != "Healthy"] | |
| def _image_seed(image_bytes: bytes) -> int: | |
| """Derive a stable integer seed from the image content.""" | |
| digest = hashlib.md5(image_bytes).hexdigest() | |
| return int(digest[:8], 16) | |
| def _get_image_size(image_bytes: bytes): | |
| img = Image.open(io.BytesIO(image_bytes)) | |
| return img.width, img.height | |
| def _severity_label(s: float) -> str: | |
| if s >= 0.75: return "critical" | |
| if s >= 0.45: return "moderate" | |
| if s >= 0.20: return "low" | |
| return "healthy" | |
| def _pick_diseases_for_field(rng: random.Random) -> list: | |
| """ | |
| Pick 1β2 diseases that will appear as hotspot clusters in this field. | |
| Weighted: cassava diseases appear more often (West African context). | |
| """ | |
| cassava = [d for d in FIELD_DISEASES if d["crop"] == "cassava"] | |
| others = [d for d in FIELD_DISEASES if d["crop"] != "cassava"] | |
| primary = rng.choice(cassava) | |
| secondary = rng.choice(others) if rng.random() > 0.4 else None | |
| return [d for d in [primary, secondary] if d is not None] | |
| def _build_severity_grid(rng: random.Random) -> list[list[float]]: | |
| """ | |
| Generate a realistic severity field using gaussian hotspot clusters. | |
| Returns GRID_ROWS Γ GRID_COLS grid of severity values (0.0β1.0). | |
| """ | |
| grid = [[0.0] * GRID_COLS for _ in range(GRID_ROWS)] | |
| # Place 2β4 hotspot centres | |
| n_hotspots = rng.randint(2, 4) | |
| hotspots = [] | |
| for _ in range(n_hotspots): | |
| r = rng.uniform(0, GRID_ROWS - 1) | |
| c = rng.uniform(0, GRID_COLS - 1) | |
| strength = rng.uniform(0.7, 1.0) | |
| spread = rng.uniform(1.2, 2.8) | |
| hotspots.append((r, c, strength, spread)) | |
| for row in range(GRID_ROWS): | |
| for col in range(GRID_COLS): | |
| val = 0.0 | |
| for (hr, hc, strength, spread) in hotspots: | |
| dist = math.sqrt((row - hr) ** 2 + (col - hc) ** 2) | |
| val = max(val, strength * math.exp(-(dist ** 2) / (2 * spread ** 2))) | |
| # Add small background noise | |
| val = min(1.0, val + rng.uniform(0, 0.08)) | |
| grid[row][col] = round(val, 3) | |
| return grid | |
| def _assign_diseases(severity_grid, diseases: list, rng: random.Random) -> list[list[dict]]: | |
| """ | |
| Map severity values to disease labels. | |
| High-severity cells β primary disease. | |
| Mid-severity cells β secondary disease (if exists). | |
| Low-severity cells β Healthy. | |
| """ | |
| primary = diseases[0] | |
| secondary = diseases[1] if len(diseases) > 1 else None | |
| healthy = DISEASE_MAP["Healthy"] | |
| zones = [] | |
| for row in range(GRID_ROWS): | |
| row_zones = [] | |
| for col in range(GRID_COLS): | |
| s = severity_grid[row][col] | |
| label = _severity_label(s) | |
| if label in ("critical", "moderate"): | |
| disease = primary | |
| elif label == "low" and secondary: | |
| disease = secondary if rng.random() > 0.5 else primary | |
| elif label == "low": | |
| disease = primary | |
| else: | |
| disease = healthy | |
| confidence = round(rng.uniform(0.78, 0.97), 2) if label != "healthy" else round(rng.uniform(0.88, 0.99), 2) | |
| row_zones.append({ | |
| "row": row, | |
| "col": col, | |
| "severity": s, | |
| "severity_label": label, | |
| "disease": disease["name"], | |
| "confidence": confidence, | |
| "treatments": disease["treatments"], | |
| "yield_impact": disease["yield_impact"], | |
| "spread_note": disease["spread"], | |
| }) | |
| zones.append(row_zones) | |
| return zones | |
| def analyze_image(image_bytes: bytes, filename: str = "scan.jpg") -> dict: | |
| """ | |
| Main entry point. Returns full scan result dict ready for JSON response. | |
| """ | |
| seed = _image_seed(image_bytes) | |
| rng = random.Random(seed) | |
| width, height = _get_image_size(image_bytes) | |
| diseases = _pick_diseases_for_field(rng) | |
| severity_grid = _build_severity_grid(rng) | |
| zone_grid = _assign_diseases(severity_grid, diseases, rng) | |
| # Flatten zones for response | |
| flat_zones = [zone for row in zone_grid for zone in row] | |
| # Summary stats | |
| critical = sum(1 for z in flat_zones if z["severity_label"] == "critical") | |
| moderate = sum(1 for z in flat_zones if z["severity_label"] == "moderate") | |
| low = sum(1 for z in flat_zones if z["severity_label"] == "low") | |
| healthy = sum(1 for z in flat_zones if z["severity_label"] == "healthy") | |
| total = len(flat_zones) | |
| affected_pct = round((critical + moderate + low) / total * 100, 1) | |
| unique_diseases = list({z["disease"] for z in flat_zones if z["disease"] != "Healthy"}) | |
| return { | |
| "scan_id": f"SCN-{seed % 99999:05d}", | |
| "filename": filename, | |
| "image_width": width, | |
| "image_height": height, | |
| "grid_rows": GRID_ROWS, | |
| "grid_cols": GRID_COLS, | |
| "zones": flat_zones, | |
| "summary": { | |
| "critical_zones": critical, | |
| "moderate_zones": moderate, | |
| "low_zones": low, | |
| "healthy_zones": healthy, | |
| "total_zones": total, | |
| "affected_pct": affected_pct, | |
| "diseases_detected": unique_diseases, | |
| "recommended_action": ( | |
| "Immediate intervention required" if critical > 8 else | |
| "Targeted treatment recommended" if moderate > 8 else | |
| "Monitor closely" if low > 10 else | |
| "Field is largely healthy" | |
| ), | |
| }, | |
| } | |