import os import re import io import json import math import datetime as dt from typing import Optional, Tuple, Dict import requests import numpy as np import cv2 from mgrs import MGRS from PIL import Image import matplotlib matplotlib.use("Agg") import matplotlib.pyplot as plt import gradio as gr # -------------------------- # Константы и хелперы # -------------------------- S2_BASE = "https://sentinel-s2-l1c.s3.amazonaws.com/tiles" UA = {"User-Agent": "s2-ndvi-gradio-tool/1.0 (+https://example.com)"} # Sentinel-2 тайлы 10м — 10980 px (около 109.8 км), 100 км поле внутри с ~490 px отступом TILE_SIZE_10M = 10980 GRID_100KM_M = 100_000 MARGIN_PX_10M = (TILE_SIZE_10M - GRID_100KM_M // 10) // 2 # ~490 # -------------------------- # Геокодирование / парсинг # -------------------------- def parse_latlon(text: str) -> Optional[Tuple[float, float]]: if not text: return None m = re.match(r"^\s*([+-]?\d+(?:\.\d+)?)\s*[,; ]\s*([+-]?\d+(?:\.\d+)?)\s*$", text) if not m: return None lat = float(m.group(1)) lon = float(m.group(2)) if not (-90 <= lat <= 90 and -180 <= lon <= 180): return None return lat, lon def geocode_nominatim(query: str, lang: str = "ru") -> Optional[Tuple[float, float, str]]: url = "https://nominatim.openstreetmap.org/search" params = {"q": query, "format": "jsonv2", "limit": 1, "accept-language": lang} try: r = requests.get(url, params=params, headers=UA, timeout=20) if r.status_code != 200: return None data = r.json() if not data: return None lat = float(data[0]["lat"]) lon = float(data[0]["lon"]) display = data[0].get("display_name", query) return lat, lon, display except Exception: return None # -------------------------- # MGRS / индексация тайла # -------------------------- def latlon_to_mgrs(lat: float, lon: float) -> Dict[str, any]: """ Возвращает словарь с полями: - zone (int), band (str), square (str) — компоненты пути S3 - easting_m, northing_m — координаты внутри 100 км квадрата (метры) - mgrs_str — полный MGRS (с 5+5 цифрами) """ m = MGRS() s = m.toMGRS(lat, lon, MGRSPrecision=5) # Пример: '31UDQ4825111980' # Найдём границу цифр/букв i = 0 while i < len(s) and s[i].isdigit(): i += 1 zone = int(s[:i]) band = s[i] square = s[i + 1:i + 3] digits = s[i + 3:] e_str = digits[:5] n_str = digits[5:10] easting_m = int(e_str) northing_m = int(n_str) return { "zone": zone, "band": band, "square": square, "easting_m": easting_m, "northing_m": northing_m, "mgrs_str": s } def build_s2_tile_prefix(zone: int, band: str, square: str, d: dt.date, seq: int) -> str: # В AWS путь вида: /tiles/{utm_zone}/{lat_band}/{grid_square}/{YYYY}/{M}/{D}/{sequence}/ return f"{S2_BASE}/{zone}/{band}/{square}/{d.year}/{d.month}/{d.day}/{seq}" def url_band(zone: int, band: str, square: str, d: dt.date, seq: int, band_name: str) -> str: # band_name: 'B04', 'B08', 'TCI', 'QA60' и т.д. return f"{build_s2_tile_prefix(zone, band, square, d, seq)}/{band_name}.jp2" def url_preview(zone: int, band: str, square: str, d: dt.date, seq: int) -> str: return f"{build_s2_tile_prefix(zone, band, square, d, seq)}/preview.jpg" def head_exists(url: str) -> bool: try: r = requests.head(url, headers=UA, timeout=15) return r.status_code == 200 except Exception: return False def find_sequence_with_data(zone: int, band: str, square: str, d: dt.date, test_band="B04") -> Optional[int]: # Пробуем seq 0..3 пока найдём объект for seq in range(4): if head_exists(url_band(zone, band, square, d, seq, test_band)): return seq return None # -------------------------- # Загрузка и декодирование JP2 # -------------------------- def http_get(url: str) -> Optional[bytes]: try: r = requests.get(url, headers=UA, timeout=60) if r.status_code != 200: return None return r.content except Exception: return None def decode_jp2_to_np(jp2_bytes: bytes, flags=cv2.IMREAD_UNCHANGED): # Возвращает numpy массив (H, W) или (H, W, C); 16-bit для каналов, 8/16-bit для TCI arr = np.frombuffer(jp2_bytes, dtype=np.uint8) img = cv2.imdecode(arr, flags) return img # -------------------------- # Геометрия ROI внутри тайла # -------------------------- def pixel_from_mgrs(easting_m: int, northing_m: int) -> Tuple[int, int]: """ Перевод положения внутри 100 км квадрата (метры от юго-западного угла) в пиксели 10 м данных тайла (с учётом ~490 px отступа). Возвращает (col_x, row_y). """ x_px = MARGIN_PX_10M + int(round(easting_m / 10.0)) # y: сверху 0, внутри квадрата от южной границы -> переворачиваем y_px = MARGIN_PX_10M + int(round((GRID_100KM_M - northing_m) / 10.0)) # страхуемся в пределах допустимого x_px = max(0, min(TILE_SIZE_10M - 1, x_px)) y_px = max(0, min(TILE_SIZE_10M - 1, y_px)) return x_px, y_px def crop_roi(img: np.ndarray, cx: int, cy: int, rad_px: int) -> np.ndarray: h, w = img.shape[:2] x0 = max(0, cx - rad_px) x1 = min(w, cx + rad_px) y0 = max(0, cy - rad_px) y1 = min(h, cy + rad_px) return img[y0:y1, x0:x1].copy(), (x0, y0, x1, y1) # -------------------------- # NDVI и отрисовка # -------------------------- def compute_ndvi(b08_10m: np.ndarray, b04_10m: np.ndarray, qa60_10m: Optional[np.ndarray] = None) -> Tuple[np.ndarray, Optional[np.ndarray]]: b08 = b08_10m.astype(np.float32) b04 = b04_10m.astype(np.float32) denom = (b08 + b04) ndvi = np.where(denom > 0, (b08 - b04) / denom, np.nan) cloud_mask = None if qa60_10m is not None: # QA60: биты 10 (opaque clouds) и 11 (cirrus) qa = qa60_10m.astype(np.uint16) clouds = (((qa >> 10) & 1) | ((qa >> 11) & 1)) > 0 ndvi = np.where(clouds, np.nan, ndvi) cloud_mask = clouds return ndvi, cloud_mask def colorize_ndvi(ndvi: np.ndarray, cloud_mask: Optional[np.ndarray] = None, vmin: float = -0.2, vmax: float = 0.9) -> np.ndarray: x = ndvi.copy() # Отдельно отметим NaN (облака/нет данных) nan_mask = np.isnan(x) x = np.clip(x, vmin, vmax) norm = (x - vmin) / (vmax - vmin + 1e-9) cmap = plt.get_cmap("RdYlGn") # красный->желтый->зеленый rgb = (cmap(norm)[..., :3] * 255).astype(np.uint8) # Заштрихуем NaN серым if cloud_mask is not None: nan_mask = nan_mask | cloud_mask rgb[nan_mask] = np.array([180, 180, 180], dtype=np.uint8) return rgb def stretch_16_to_8(img: np.ndarray, p_low=2, p_high=98) -> np.ndarray: # Клип и линейная растяжка для лучшего контраста if img.ndim == 2: lo, hi = np.percentile(img[img > 0], [p_low, p_high]) if np.any(img > 0) else (0, 1) hi = max(hi, lo + 1) out = (np.clip(img, lo, hi) - lo) / (hi - lo + 1e-6) return (out * 255).astype(np.uint8) else: out = np.zeros_like(img, dtype=np.uint8) for c in range(img.shape[2]): chan = img[..., c] if np.any(chan > 0): lo, hi = np.percentile(chan[chan > 0], [p_low, p_high]) hi = max(hi, lo + 1) out[..., c] = ((np.clip(chan, lo, hi) - lo) / (hi - lo + 1e-6) * 255).astype(np.uint8) return out # -------------------------- # Основная логика анализа # -------------------------- def analyze_vegetation( address_or_coords: str, date_str: str, radius_m: int, ) -> Tuple[np.ndarray, np.ndarray, str]: # 1) Получаем координаты latlon = parse_latlon(address_or_coords.strip()) resolved_label = None if latlon is None: geo = geocode_nominatim(address_or_coords.strip()) if geo is None: raise gr.Error("Не удалось геокодировать адрес или распознать координаты. Попробуйте формат '55.75, 37.61' или уточните адрес.") lat, lon, resolved_label = geo else: lat, lon = latlon resolved_label = f"Координаты: {lat:.6f}, {lon:.6f}" # 2) MGRS тайл + положение tile = latlon_to_mgrs(lat, lon) zone = tile["zone"] band = tile["band"] square = tile["square"] easting_m = tile["easting_m"] northing_m = tile["northing_m"] mgrs_str = tile["mgrs_str"] cx, cy = pixel_from_mgrs(easting_m, northing_m) rad_px = max(5, int(radius_m / 10)) # 10 м/пикс # 3) Дата и последовательность if date_str: try: d = dt.datetime.strptime(date_str.strip(), "%Y-%m-%d").date() except Exception: raise gr.Error("Дата должна быть в формате ГГГГ-ММ-ДД (например, 2024-07-15).") else: d = dt.date.today() seq = find_sequence_with_data(zone, band, square, d, test_band="B04") if seq is None: # Попробуем отмотать назад до 10 дней found = False for back in range(1, 11): dd = d - dt.timedelta(days=back) seq_try = find_sequence_with_data(zone, band, square, dd, test_band="B04") if seq_try is not None: d = dd seq = seq_try found = True break if not found: raise gr.Error("Не нашёл подходящих сцен Sentinel-2 для этой локации и последних 10 дней.") # 4) Скачиваем каналы: B04 (red), B08 (nir), QA60 (облака), TCI (true color) либо B02/B03/B04 url_b04 = url_band(zone, band, square, d, seq, "B04") url_b08 = url_band(zone, band, square, d, seq, "B08") url_qa60 = url_band(zone, band, square, d, seq, "QA60") url_tci = url_band(zone, band, square, d, seq, "TCI") b04_bytes = http_get(url_b04) b08_bytes = http_get(url_b08) if b04_bytes is None or b08_bytes is None: raise gr.Error("Не удалось скачать каналы B04/B08. Попробуйте другую дату.") qa60_bytes = http_get(url_qa60) # может быть None tci_bytes = http_get(url_tci) # может быть None # 5) Декодирование b04 = decode_jp2_to_np(b04_bytes, cv2.IMREAD_UNCHANGED) # (10980, 10980), uint16 b08 = decode_jp2_to_np(b08_bytes, cv2.IMREAD_UNCHANGED) if b04 is None or b08 is None or b04.shape != b08.shape: raise gr.Error("Ошибка чтения JP2 или несовпадение размеров каналов B04/B08.") # QA60 обычно 60м (1830x1830) — апсемплим до 10м qa10m = None cloud_pct_roi = None if qa60_bytes is not None: qa = decode_jp2_to_np(qa60_bytes, cv2.IMREAD_UNCHANGED) if qa is not None: qa10m = cv2.resize(qa, (TILE_SIZE_10M, TILE_SIZE_10M), interpolation=cv2.INTER_NEAREST) # 6) Кадрируем ROI b04_roi, (x0, y0, x1, y1) = crop_roi(b04, cx, cy, rad_px) b08_roi, _ = crop_roi(b08, cx, cy, rad_px) qa_roi = None if qa10m is not None: qa_roi, _ = crop_roi(qa10m, cx, cy, rad_px) # 7) NDVI + маска облаков ndvi_roi, cloud_mask = compute_ndvi(b08_roi, b04_roi, qa_roi) # 8) Статистика valid = ~np.isnan(ndvi_roi) if np.any(valid): mean_ndvi = float(np.nanmean(ndvi_roi)) med_ndvi = float(np.nanmedian(ndvi_roi)) pct_veg_good = float(np.nanmean(ndvi_roi > 0.5) * 100.0) pct_veg_med = float(np.nanmean((ndvi_roi > 0.3) & (ndvi_roi <= 0.5)) * 100.0) else: mean_ndvi = float("nan") med_ndvi = float("nan") pct_veg_good = 0.0 pct_veg_med = 0.0 if cloud_mask is not None: cloud_pct_roi = float(np.mean(cloud_mask) * 100.0) else: cloud_pct_roi = None # 9) Рендер NDVI ndvi_rgb = colorize_ndvi(ndvi_roi, cloud_mask) # 10) True color ROI # a) Сначала пытаемся TCI tci_roi_rgb = None if tci_bytes is not None: tci = decode_jp2_to_np(tci_bytes, cv2.IMREAD_UNCHANGED) if tci is not None: if tci.ndim == 3: # cv2 читает BGR — переведём в RGB if tci.shape[2] == 3: tci = cv2.cvtColor(tci, cv2.COLOR_BGR2RGB) tci_roi, _ = crop_roi(tci, cx, cy, rad_px) tci_roi_rgb = stretch_16_to_8(tci_roi) else: # бывает моно — не используем tci_roi_rgb = None # b) Если TCI нет — соберём из B02/B03/B04 if tci_roi_rgb is None: url_b02 = url_band(zone, band, square, d, seq, "B02") url_b03 = url_band(zone, band, square, d, seq, "B03") b02_bytes = http_get(url_b02) b03_bytes = http_get(url_b03) if b02_bytes is not None and b03_bytes is not None: b02 = decode_jp2_to_np(b02_bytes, cv2.IMREAD_UNCHANGED) b03 = decode_jp2_to_np(b03_bytes, cv2.IMREAD_UNCHANGED) if b02 is not None and b03 is not None and b02.shape == b03.shape == b04.shape: b02_roi, _ = crop_roi(b02, cx, cy, rad_px) b03_roi, _ = crop_roi(b03, cx, cy, rad_px) rgb16 = np.dstack([b04_roi, b03_roi, b02_roi]) tci_roi_rgb = stretch_16_to_8(rgb16) # если не получилось — сделаем псевдоцвет из B08/B04 if tci_roi_rgb is None: pseudo = np.dstack([b08_roi, b04_roi, b04_roi]) tci_roi_rgb = stretch_16_to_8(pseudo) # 11) Текстовый вывод/диагностика url_used_preview = url_preview(zone, band, square, d, seq) info_lines = [] info_lines.append(f"Адрес/точка: {resolved_label}") info_lines.append(f"MGRS тайл: {mgrs_str} (zone={zone}, band={band}, square={square})") info_lines.append(f"Дата съёмки: {d.isoformat()} (seq={seq})") if cloud_pct_roi is not None: info_lines.append(f"Облака в ROI: {cloud_pct_roi:.1f}%") if not np.isnan(mean_ndvi): info_lines.append(f"Средний NDVI (ROI): {mean_ndvi:.3f}") info_lines.append(f"Медианный NDVI (ROI): {med_ndvi:.3f}") info_lines.append(f"Доля пикселей NDVI>0.5: {pct_veg_good:.1f}%") info_lines.append(f"Доля пикселей 0.3