Spaces:
Sleeping
Sleeping
"""Tile-grid scanner: downloads satellite tiles, runs DeepSolar-3M, clusters installations.

Tile source priority:
1. VWorld Satellite (VWORLD_API_KEY set) — Korean gov imagery, fresher for Korea
2. Google Maps satellite — global fallback

Revenue estimates use 2025 Korean market rates:
- Capacity: 80 W/m² effective (panels + row spacing)
- Generation: 1200 kWh/kWp/year (Korea avg, CF ~13.7%)
- Price: SMP ~120 + REC ~35 = 155 KRW/kWh
"""
| import math | |
| import os | |
| import time | |
| import numpy as np | |
| import requests | |
| from collections import deque | |
| from PIL import Image | |
| from io import BytesIO | |
| from api.models import ScanDetection, ScanTile, SolarInstallation, ScanResult | |
| from api.config import VWORLD_API_KEY, VWORLD_GEOCODER_URL | |
ZOOM = 19  # zoom level at which base tiles are downloaded
SPLIT = 2  # each downloaded tile is subdivided into SPLIT x SPLIT sub-tiles (no extra downloads)
EFFECTIVE_ZOOM = ZOOM + 1  # log2(SPLIT) = 1 -> effective zoom of a sub-tile (original note: ~37 m x 37 m resolution)
# Configurable via env so HF Space can use a smaller cap (CPU-only, slower)
MAX_BASE_TILES = int(os.environ.get("MAX_BASE_TILES", "150"))
# Sent with every tile request made by this module.
_HEADERS = {"User-Agent": "Mozilla/5.0"}
# Economics constants
_W_PER_M2 = 80  # effective installed power density (W/m²)
_KWH_PER_KWP = 1200  # annual generation per installed kWp (kWh/kWp)
_KRW_PER_KWH = 155  # SMP + REC unit price (KRW/kWh)
def _ll_to_tile(lat: float, lon: float, zoom: int) -> tuple[int, int]:
    """Convert a WGS84 coordinate to Web-Mercator (slippy-map) tile indices.

    Args:
        lat: Latitude in degrees. Values beyond the Mercator limit
            (about +/-85.0511 degrees) are clamped to that limit.
        lon: Longitude in degrees.
        zoom: Tile zoom level; the grid is ``2 ** zoom`` tiles per side.

    Returns:
        ``(x, y)`` tile indices, each clamped to ``[0, 2 ** zoom - 1]``.
    """
    n = 2 ** zoom
    # The projection diverges towards the poles (and log() raises a domain
    # error at exactly -90 degrees), so restrict latitude to the range the
    # square Web-Mercator world actually covers.
    max_lat = math.degrees(math.atan(math.sinh(math.pi)))
    lat_rad = math.radians(max(-max_lat, min(max_lat, lat)))
    x = int((lon + 180) / 360 * n)
    y = int(
        (1 - math.log(math.tan(lat_rad) + 1 / math.cos(lat_rad)) / math.pi)
        / 2 * n
    )
    # lon == 180 (or a clamped latitude) can land exactly on n, one past the
    # last valid index; out-of-range longitudes can go negative.
    return min(max(x, 0), n - 1), min(max(y, 0), n - 1)
def tile_center_latlon(x: int, y: int, zoom: int) -> tuple[float, float]:
    """Return the ``(lat, lon)`` in degrees of the centre of tile ``(x, y)``.

    The centre is taken at the half-way point of the tile in tile-index
    space (``x + 0.5``, ``y + 0.5``) on a ``2 ** zoom`` grid.
    """
    tiles_per_side = 2 ** zoom
    center_lon = (x + 0.5) / tiles_per_side * 360 - 180
    # Inverse Web-Mercator: map the row position back to a latitude.
    mercator_y = math.pi * (1 - 2 * (y + 0.5) / tiles_per_side)
    center_lat = math.degrees(math.atan(math.sinh(mercator_y)))
    return center_lat, center_lon
def tile_bounds(x: int, y: int, zoom: int) -> tuple[float, float, float, float]:
    """Return the lat/lon bounds of a tile: (lat_min, lat_max, lon_min, lon_max)."""
    n = 2 ** zoom

    def edge_lon(col: int) -> float:
        # Longitude of the vertical grid line at column index ``col``.
        return col / n * 360 - 180

    def edge_lat(row: int) -> float:
        # Latitude of the horizontal grid line at row index ``row``.
        return math.degrees(math.atan(math.sinh(math.pi * (1 - 2 * row / n))))

    west, east = edge_lon(x), edge_lon(x + 1)
    # Row indices grow southwards, so the tile's top edge is its max latitude.
    north, south = edge_lat(y), edge_lat(y + 1)
    return south, north, west, east
def tile_area_m2(lat: float, zoom: int) -> float:
    """Return the approximate ground area in square metres of one Mercator tile.

    Web Mercator is conformal: a tile is (approximately) square on the
    ground, and BOTH its width and its height shrink by ``cos(lat)`` relative
    to the equator. Scaling only the width overestimates the area by a factor
    of ``1 / cos(lat)``.

    Args:
        lat: Latitude in degrees at which the tile lies.
        zoom: Tile zoom level.

    Returns:
        Ground area of a single tile at that latitude, in m².
    """
    equator_side_m = 40075016.686 / (2 ** zoom)  # tile side length at the equator
    ground_side_m = equator_side_m * math.cos(math.radians(lat))
    return ground_side_m * ground_side_m
def bbox_to_tiles(
    lat_min: float, lat_max: float, lon_min: float, lon_max: float, zoom: int = ZOOM
) -> list[tuple[int, int]]:
    """List the tiles covering a bounding box, capped at ``MAX_BASE_TILES``.

    When the box covers more tiles than the cap, the tiles closest (by
    Manhattan distance in tile space) to the centre of the box are kept,
    ordered nearest first. Otherwise all tiles are returned in column-major
    order. A non-positive cap yields no tiles.

    Args:
        lat_min, lat_max, lon_min, lon_max: Bounding box in degrees.
        zoom: Tile zoom level.

    Returns:
        A list of ``(x, y)`` tile indices.
    """
    x0, y1 = _ll_to_tile(lat_min, lon_min, zoom)
    x1, y0 = _ll_to_tile(lat_max, lon_max, zoom)
    cap = max(MAX_BASE_TILES, 0)

    # Count arithmetically instead of materialising the grid: a large bbox at
    # a high zoom can cover millions of tiles.
    total = max(x1 - x0 + 1, 0) * max(y1 - y0 + 1, 0)
    if total <= cap:
        return [(x, y) for x in range(x0, x1 + 1) for y in range(y0, y1 + 1)]

    mx, my = (x0 + x1) // 2, (y0 + y1) // 2
    # Inside the rectangle there is at least one tile at every Manhattan
    # distance 0, 1, 2, ... from the centre, so the ``cap`` nearest tiles all
    # lie within distance ``cap`` of it. Enumerating only that window yields
    # the same selection, in the same order, with bounded memory.
    xs = range(max(x0, mx - cap), min(x1, mx + cap) + 1)
    ys = range(max(y0, my - cap), min(y1, my + cap) + 1)
    tiles = [(x, y) for x in xs for y in ys]
    tiles.sort(key=lambda t: abs(t[0] - mx) + abs(t[1] - my))
    return tiles[:cap]
def _get_tile(x: int, y: int) -> Image.Image | None:
    """Download the base tile ``(x, y)`` at ``ZOOM`` as an RGB image.

    Sources are tried in order: VWorld satellite (only when
    ``VWORLD_API_KEY`` is set), then Google satellite. A source is skipped
    when the request or image decoding raises, the HTTP status is not 200,
    or the payload is 1000 bytes or smaller.

    Returns:
        The decoded RGB image, or ``None`` when every source failed.
    """
    google_url = f"https://mt0.google.com/vt/lyrs=s&x={x}&y={y}&z={ZOOM}"
    if VWORLD_API_KEY:
        candidates = [
            f"https://api.vworld.kr/req/wmts/1.0.0/{VWORLD_API_KEY}/Satellite/{ZOOM}/{y}/{x}.jpeg",
            google_url,
        ]
    else:
        candidates = [google_url]

    for candidate in candidates:
        try:
            resp = requests.get(candidate, headers=_HEADERS, timeout=8)
            # presumably tiny payloads are error/placeholder tiles — TODO confirm
            if resp.status_code != 200 or len(resp.content) <= 1000:
                continue
            return Image.open(BytesIO(resp.content)).convert("RGB")
        except Exception:
            # Best effort: fall through to the next source.
            continue
    return None
def _cluster_tiles(
    positives: list[tuple[int, int, float]]
) -> list[list[tuple[int, int, float]]]:
    """Group adjacent positive tiles into sites using an 8-direction BFS.

    Args:
        positives: ``(x, y, prob)`` triples. If a coordinate appears more
            than once, the last probability wins.

    Returns:
        One list of ``(x, y, prob)`` per connected component. Components are
        ordered by first appearance in ``positives``; tiles within a
        component are in breadth-first discovery order.
    """
    prob_at: dict[tuple[int, int], float] = {(x, y): p for x, y, p in positives}
    offsets = [
        (dx, dy) for dx in (-1, 0, 1) for dy in (-1, 0, 1) if (dx, dy) != (0, 0)
    ]
    seen: set[tuple[int, int]] = set()
    groups: list[list[tuple[int, int, float]]] = []

    for x, y, _ in positives:
        start = (x, y)
        if start in seen:
            continue
        seen.add(start)
        # ``order`` doubles as the FIFO queue: ``head`` marks the next tile to
        # expand, so discovery order equals breadth-first dequeue order.
        order = [start]
        head = 0
        while head < len(order):
            cx, cy = order[head]
            head += 1
            for dx, dy in offsets:
                neighbour = (cx + dx, cy + dy)
                if neighbour in seen or neighbour not in prob_at:
                    continue
                seen.add(neighbour)
                order.append(neighbour)
        groups.append([(tx, ty, prob_at[(tx, ty)]) for tx, ty in order])
    return groups
def _reverse_geocode(lat: float, lon: float) -> str | None:
    """Reverse-geocode a point to a parcel (lot-number) address via VWorld.

    Best effort: returns ``None`` when no API key is configured, when the
    request or response parsing raises, or when the response has no results.
    """
    if not VWORLD_API_KEY:
        return None
    try:
        response = requests.get(
            VWORLD_GEOCODER_URL,
            params={
                "service": "address",
                "request": "getAddress",
                "version": "2.0",
                "crs": "epsg:4326",
                "point": f"{lon},{lat}",
                "type": "parcel",
                "key": VWORLD_API_KEY,
                "format": "json",
            },
            timeout=6,
        )
        hits = response.json().get("response", {}).get("result", [])
        return hits[0].get("text") if hits else None
    except Exception:
        # A missing address must not abort the scan.
        return None
def _make_installation(
    cluster: list[tuple[int, int, float]],
    zoom: int = EFFECTIVE_ZOOM,
) -> SolarInstallation:
    """Build a ``SolarInstallation`` summary from one cluster of tiles.

    Args:
        cluster: Non-empty list of ``(x, y, prob)`` tiles; ``x``/``y`` are
            tile indices at ``zoom``.
        zoom: Zoom level the tile indices refer to. Defaults to
            ``EFFECTIVE_ZOOM`` because ``scan_region`` clusters sub-tiles in
            that coordinate space; decoding them at ``ZOOM`` would misplace
            the centroid and count each tile as four times its real area.

    Returns:
        The installation with centroid, summed area, derived capacity,
        annual generation, revenue, peak probability and (if available) a
        reverse-geocoded address.
    """
    centers = [tile_center_latlon(x, y, zoom) for x, y, _ in cluster]
    lats = [lat for lat, _ in centers]
    lons = [lon for _, lon in centers]
    probs = [p for _, _, p in cluster]

    center_lat = sum(lats) / len(lats)
    center_lon = sum(lons) / len(lons)

    # Area: sum of each tile's ground area at its own latitude.
    area = sum(tile_area_m2(lat, zoom) for lat in lats)
    capacity_kw = area * _W_PER_M2 / 1000
    annual_kwh = capacity_kw * _KWH_PER_KWP
    revenue_krw = int(annual_kwh * _KRW_PER_KWH)

    # Network call; returns None when no address could be resolved.
    address = _reverse_geocode(center_lat, center_lon)

    return SolarInstallation(
        lat=round(center_lat, 6),
        lon=round(center_lon, 6),
        tile_count=len(cluster),
        area_m2=round(area),
        capacity_kw=round(capacity_kw, 1),
        annual_kwh=round(annual_kwh),
        revenue_krw=revenue_krw,
        max_prob=round(max(probs), 3),
        address=address,
    )
def _img_features(img: Image.Image) -> dict:
    """Compute simple colour statistics for one tile image.

    Returns:
        A dict with the overall mean (``brightness``) and variance of all
        pixel values, plus the mean of channels 0, 1 and 2 as ``mean_r``,
        ``mean_g`` and ``mean_b``. All values are Python floats.
    """
    pixels = np.asarray(img, dtype=np.float32)
    features = {
        'brightness': float(pixels.mean()),
        'variance': float(pixels.var()),
    }
    # Per-channel means along the last axis.
    features['mean_r'] = float(pixels[:, :, 0].mean())
    features['mean_g'] = float(pixels[:, :, 1].mean())
    features['mean_b'] = float(pixels[:, :, 2].mean())
    return features
def _is_solar_like(feat: dict, seed_feats: list[dict]) -> bool:
    """Decide whether a tile looks visually similar to any seed tile.

    Comparison is relative to the seeds rather than against absolute colour
    ratios (panel colour can vary with satellite imagery conditions).

    Args:
        feat: Feature dict of the candidate tile (see ``_img_features``).
        seed_feats: Feature dicts of the high-confidence seed tiles.

    Returns:
        ``True`` when the tile passes the absolute brightness/variance gates
        and is within tolerance of at least one seed on every compared key.
    """
    # Too bright -> road/building; high variance -> complex terrain/trees.
    if feat['brightness'] > 125 or feat['variance'] > 2500:
        return False

    # Maximum allowed absolute difference from a seed, per feature.
    tolerances = (
        ('brightness', 40),
        ('mean_r', 30),
        ('mean_g', 30),
        ('mean_b', 30),
    )
    return any(
        all(not (abs(feat[key] - seed[key]) > limit) for key, limit in tolerances)
        for seed in seed_feats
    )
# Minimum probability for tiles inside a solar site (0.004~0.02) vs
# farmland / general terrain (~0.001~0.003)
_GROW_MIN_PROB = 0.004
def _region_grow(
    tile_data: dict[tuple[int, int], tuple[float, dict]],
    seed_threshold: float,
) -> set[tuple[int, int]]:
    """Expand high-confidence seeds by BFS, gated on ML probability and visual similarity.

    Args:
        tile_data: Maps ``(x, y)`` to ``(probability, feature dict)``.
        seed_threshold: Tiles with probability at or above this are seeds.

    Returns:
        The set of detected coordinates: all seeds plus every 8-connected
        neighbour (transitively) whose probability is at least
        ``_GROW_MIN_PROB`` and which ``_is_solar_like`` accepts against the
        seed features. Empty when there are no seeds.
    """
    seeds = {xy for xy, (prob, _) in tile_data.items() if prob >= seed_threshold}
    if not seeds:
        return seeds

    seed_feats = [tile_data[xy][1] for xy in seeds]
    offsets = [
        (dx, dy) for dx in (-1, 0, 1) for dy in (-1, 0, 1) if (dx, dy) != (0, 0)
    ]
    detected: set[tuple[int, int]] = set(seeds)
    pending: deque[tuple[int, int]] = deque(seeds)

    while pending:
        cx, cy = pending.popleft()
        for dx, dy in offsets:
            neighbour = (cx + dx, cy + dy)
            if neighbour in detected:
                continue
            entry = tile_data.get(neighbour)
            if entry is None:
                # Tile was never scanned (outside the grid or download failed).
                continue
            nb_prob, nb_feat = entry
            # Both the ML probability floor and the visual similarity must hold.
            if nb_prob >= _GROW_MIN_PROB and _is_solar_like(nb_feat, seed_feats):
                detected.add(neighbour)
                pending.append(neighbour)
    return detected
def scan_region(
    lat_min: float, lat_max: float, lon_min: float, lon_max: float, threshold: float = 0.12
) -> ScanResult:
    """Scan a bounding box for solar installations.

    Steps:
    1. Download ZOOM tiles (at most MAX_BASE_TILES).
    2. Subdivide each tile SPLIT x SPLIT -> effective resolution EFFECTIVE_ZOOM.
    3. Run DeepSolar inference on every sub-tile.
    4. Region growing (seeds + ML probability gate + visual similarity).
    5. Clustering -> per-site analysis.

    Args:
        lat_min, lat_max, lon_min, lon_max: Bounding box in degrees.
        threshold: Probability at or above which a sub-tile is a seed.

    Returns:
        A ``ScanResult`` with per-tile results, detections, installations
        and aggregate totals.
    """
    from api.clients.deepsolar import infer_batch

    # --- Steps 1 + 2: download base tiles and cut them into sub-tiles -------
    # Sub-tile coordinates live at EFFECTIVE_ZOOM:
    # base tile (x, y) -> sub-tiles (x*SPLIT + col, y*SPLIT + row).
    sub_images: list = []
    sub_keys: list[tuple[int, int]] = []
    sub_feats: list[dict] = []
    for base_x, base_y in bbox_to_tiles(lat_min, lat_max, lon_min, lon_max, zoom=ZOOM):
        base_img = _get_tile(base_x, base_y)
        if base_img is None:
            continue
        full_w, full_h = base_img.size
        cell_w, cell_h = full_w // SPLIT, full_h // SPLIT
        for row in range(SPLIT):  # row 0 = top
            top = row * cell_h
            for col in range(SPLIT):  # col 0 = left
                left = col * cell_w
                patch = base_img.crop((left, top, left + cell_w, top + cell_h))
                patch = patch.resize((256, 256), Image.LANCZOS)
                sub_images.append(patch)
                sub_keys.append((base_x * SPLIT + col, base_y * SPLIT + row))
                sub_feats.append(_img_features(patch))

    # --- Step 3: inference in mini-batches under a time budget --------------
    # Time budget 50 s: once exceeded, remaining tiles get prob=0 (avoids
    # a request timeout).
    budget_s = 50.0
    started = time.monotonic()
    tile_data: dict[tuple[int, int], tuple[float, dict]] = {}
    if sub_images:
        scores: list[float] = []
        step = 8
        total = len(sub_images)
        for offset in range(0, total, step):
            if time.monotonic() - started > budget_s:
                scores.extend([0.0] * (total - len(scores)))
                break
            scores.extend(infer_batch(sub_images[offset : offset + step]))
        tile_data = {
            key: (score, feat)
            for key, score, feat in zip(sub_keys, scores, sub_feats)
        }

    # --- Step 4: region growing in the EFFECTIVE_ZOOM coordinate space ------
    grown = _region_grow(tile_data, seed_threshold=threshold)

    # --- Assemble per-tile results ------------------------------------------
    scan_tiles: list[ScanTile] = []
    positives: list[tuple[int, int, float]] = []
    for (vx, vy), (score, _feat) in tile_data.items():
        is_hit = (vx, vy) in grown
        south, north, west, east = tile_bounds(vx, vy, EFFECTIVE_ZOOM)
        scan_tiles.append(
            ScanTile(
                lat_min=round(south, 7),
                lat_max=round(north, 7),
                lon_min=round(west, 7),
                lon_max=round(east, 7),
                prob=round(score, 3),
                detected=is_hit,
            )
        )
        if is_hit:
            positives.append((vx, vy, score))

    detections = []
    for vx, vy, score in positives:
        hit_lat, hit_lon = tile_center_latlon(vx, vy, EFFECTIVE_ZOOM)
        detections.append(ScanDetection(lat=hit_lat, lon=hit_lon, prob=round(score, 3)))

    # --- Step 5: cluster detections into installations and total them -------
    installations = [_make_installation(group) for group in _cluster_tiles(positives)]
    area_sum = sum(site.area_m2 for site in installations)
    capacity_sum = sum(site.capacity_kw for site in installations)
    revenue_sum = sum(site.revenue_krw for site in installations)

    return ScanResult(
        detections=detections,
        installations=installations,
        tiles=scan_tiles,
        tiles_scanned=len(tile_data),
        tiles_positive=len(positives),
        total_area_m2=round(area_sum),
        total_capacity_kw=round(capacity_sum, 1),
        total_revenue_krw=revenue_sum,
    )