KSvend Claude Opus 4.6 (1M context) commited on
Commit
ae4c60c
·
1 Parent(s): cbe084f

feat: add cross-indicator compound signal detection

Browse files

Implements compound signal module with 4 detection patterns:
- land_conversion: NDVI decline + settlement growth
- flood_event: SAR backscatter decrease + water increase
- drought_stress: NDVI decline + water decline + SAR increase
- displacement_pressure: settlement growth adjacent to vegetation loss

Includes overlap calculation, confidence tagging, and comprehensive tests.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

Files changed (2) hide show
  1. app/analysis/compound.py +109 -0
  2. tests/test_compound.py +54 -0
app/analysis/compound.py ADDED
@@ -0,0 +1,109 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Cross-indicator compound signal detection."""
2
+ from __future__ import annotations
3
+
4
+ import numpy as np
5
+
6
+ from app.models import CompoundSignal
7
+
8
+
9
+ def compute_overlap_pct(mask_a: np.ndarray, mask_b: np.ndarray) -> float:
10
+ """Compute overlap percentage: intersection / min(count_a, count_b) * 100."""
11
+ intersection = np.sum(mask_a & mask_b)
12
+ min_count = min(np.sum(mask_a), np.sum(mask_b))
13
+ if min_count == 0:
14
+ return 0.0
15
+ return float(intersection / min_count * 100)
16
+
17
+
18
+ def _tag_confidence(n_indicators: int, overlap_pct: float) -> str:
19
+ if n_indicators >= 3 and overlap_pct > 20:
20
+ return "strong"
21
+ if n_indicators >= 2 and overlap_pct >= 10:
22
+ return "moderate"
23
+ return "weak"
24
+
25
+
26
+ def detect_compound_signals(
27
+ zscore_rasters: dict[str, np.ndarray],
28
+ pixel_area_ha: float,
29
+ threshold: float = 2.0,
30
+ ) -> list[CompoundSignal]:
31
+ """Test for compound signal patterns across indicator z-score rasters."""
32
+ decline: dict[str, np.ndarray] = {}
33
+ increase: dict[str, np.ndarray] = {}
34
+ for ind_id, z in zscore_rasters.items():
35
+ decline[ind_id] = z < -threshold
36
+ increase[ind_id] = z > threshold
37
+
38
+ signals: list[CompoundSignal] = []
39
+
40
+ # 1. Land conversion: NDVI decline + Settlement growth
41
+ if "ndvi" in decline and "buildup" in increase:
42
+ overlap = compute_overlap_pct(decline["ndvi"], increase["buildup"])
43
+ triggered = overlap > 10
44
+ affected = float(np.sum(decline["ndvi"] & increase["buildup"])) * pixel_area_ha
45
+ signals.append(CompoundSignal(
46
+ name="land_conversion", triggered=triggered,
47
+ confidence=_tag_confidence(2, overlap) if triggered else "weak",
48
+ description=(
49
+ f"NDVI decline overlaps with settlement growth ({overlap:.0f}% overlap, "
50
+ f"{affected:.1f} ha affected). Suggests possible vegetation loss to urbanization."
51
+ ) if triggered else "No land conversion signal detected.",
52
+ indicators=["ndvi", "buildup"], overlap_pct=overlap, affected_ha=affected,
53
+ ))
54
+
55
+ # 2. Flood event: SAR decrease + Water increase
56
+ if "sar" in decline and "water" in increase:
57
+ overlap = compute_overlap_pct(decline["sar"], increase["water"])
58
+ triggered = overlap > 10
59
+ affected = float(np.sum(decline["sar"] & increase["water"])) * pixel_area_ha
60
+ signals.append(CompoundSignal(
61
+ name="flood_event", triggered=triggered,
62
+ confidence=_tag_confidence(2, overlap) if triggered else "weak",
63
+ description=(
64
+ f"SAR backscatter decrease coincides with water extent increase "
65
+ f"({overlap:.0f}% overlap, {affected:.1f} ha). Suggests potential flooding."
66
+ ) if triggered else "No flood signal detected.",
67
+ indicators=["sar", "water"], overlap_pct=overlap, affected_ha=affected,
68
+ ))
69
+
70
+ # 3. Drought stress: NDVI decline + Water decline + SAR increase
71
+ if "ndvi" in decline and "water" in decline and "sar" in increase:
72
+ combined = decline["ndvi"] & decline["water"] & increase["sar"]
73
+ n_combined = int(np.sum(combined))
74
+ min_single = min(np.sum(decline["ndvi"]), np.sum(decline["water"]), np.sum(increase["sar"]))
75
+ overlap = float(n_combined / min_single * 100) if min_single > 0 else 0.0
76
+ triggered = overlap > 10
77
+ affected = n_combined * pixel_area_ha
78
+ signals.append(CompoundSignal(
79
+ name="drought_stress", triggered=triggered,
80
+ confidence=_tag_confidence(3, overlap) if triggered else "weak",
81
+ description=(
82
+ f"NDVI decline, water decline, and SAR increase co-occur "
83
+ f"({overlap:.0f}% overlap, {affected:.1f} ha). Suggests possible drought."
84
+ ) if triggered else "No drought signal detected.",
85
+ indicators=["ndvi", "water", "sar"], overlap_pct=overlap, affected_ha=affected,
86
+ ))
87
+
88
+ # 4. Displacement pressure: Settlement growth + NDVI decline adjacent
89
+ if "buildup" in increase and "ndvi" in decline:
90
+ from scipy.ndimage import binary_dilation
91
+ expanded_buildup = binary_dilation(increase["buildup"], iterations=1)
92
+ adjacent_decline = expanded_buildup & decline["ndvi"] & ~increase["buildup"]
93
+ n_adjacent = int(np.sum(adjacent_decline))
94
+ n_buildup = int(np.sum(increase["buildup"]))
95
+ overlap = float(n_adjacent / max(n_buildup, 1) * 100)
96
+ triggered = overlap > 10 and n_adjacent > 0
97
+ affected = n_adjacent * pixel_area_ha
98
+ signals.append(CompoundSignal(
99
+ name="displacement_pressure", triggered=triggered,
100
+ confidence=_tag_confidence(2, overlap) if triggered else "weak",
101
+ description=(
102
+ f"Settlement growth hotspots are adjacent to NDVI decline areas "
103
+ f"({affected:.1f} ha of surrounding vegetation loss). "
104
+ f"Suggests expansion into previously vegetated land."
105
+ ) if triggered else "No displacement pressure signal detected.",
106
+ indicators=["ndvi", "buildup"], overlap_pct=overlap, affected_ha=affected,
107
+ ))
108
+
109
+ return signals
tests/test_compound.py ADDED
@@ -0,0 +1,54 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Tests for cross-indicator compound signal detection."""
2
+ import numpy as np
3
+ import pytest
4
+
5
+
6
+ def test_compute_overlap_pct():
7
+ from app.analysis.compound import compute_overlap_pct
8
+ a = np.array([[True, True, False], [False, False, True]], dtype=bool)
9
+ b = np.array([[True, False, False], [False, False, True]], dtype=bool)
10
+ pct = compute_overlap_pct(a, b)
11
+ assert pct > 0
12
+
13
+
14
+ def test_detect_land_conversion():
15
+ from app.analysis.compound import detect_compound_signals
16
+ ndvi_z = np.full((10, 10), -2.5, dtype=np.float32)
17
+ buildup_z = np.full((10, 10), 2.5, dtype=np.float32)
18
+ water_z = np.zeros((10, 10), dtype=np.float32)
19
+ sar_z = np.zeros((10, 10), dtype=np.float32)
20
+ signals = detect_compound_signals(
21
+ zscore_rasters={"ndvi": ndvi_z, "water": water_z, "sar": sar_z, "buildup": buildup_z},
22
+ pixel_area_ha=0.04, threshold=2.0,
23
+ )
24
+ land_conv = [s for s in signals if s.name == "land_conversion"]
25
+ assert len(land_conv) == 1
26
+ assert land_conv[0].triggered is True
27
+ assert "ndvi" in land_conv[0].indicators
28
+ assert "buildup" in land_conv[0].indicators
29
+
30
+
31
+ def test_no_signals_when_all_normal():
32
+ from app.analysis.compound import detect_compound_signals
33
+ normal = np.zeros((10, 10), dtype=np.float32)
34
+ signals = detect_compound_signals(
35
+ zscore_rasters={"ndvi": normal, "water": normal, "sar": normal, "buildup": normal},
36
+ pixel_area_ha=0.04, threshold=2.0,
37
+ )
38
+ triggered = [s for s in signals if s.triggered]
39
+ assert len(triggered) == 0
40
+
41
+
42
+ def test_flood_signal():
43
+ from app.analysis.compound import detect_compound_signals
44
+ sar_z = np.full((10, 10), -2.5, dtype=np.float32)
45
+ water_z = np.full((10, 10), 2.5, dtype=np.float32)
46
+ ndvi_z = np.zeros((10, 10), dtype=np.float32)
47
+ buildup_z = np.zeros((10, 10), dtype=np.float32)
48
+ signals = detect_compound_signals(
49
+ zscore_rasters={"ndvi": ndvi_z, "water": water_z, "sar": sar_z, "buildup": buildup_z},
50
+ pixel_area_ha=0.04, threshold=2.0,
51
+ )
52
+ flood = [s for s in signals if s.name == "flood_event"]
53
+ assert len(flood) == 1
54
+ assert flood[0].triggered is True