Spaces:
Sleeping
Sleeping
| """ | |
| Geospatial utilities for image processing and GeoJSON generation. | |
| This module adapts techniques from the geoai library for better polygon generation | |
| with simplified dependencies. | |
| """ | |
| import os | |
| import logging | |
| import uuid | |
| import numpy as np | |
| import cv2 | |
| from PIL import Image, TiffTags, TiffImagePlugin | |
| import json | |
| import re | |
| from shapely.geometry import Polygon, MultiPolygon, mapping | |
| from shapely import ops | |
| def extract_contours(image_path, min_area=50, epsilon_factor=0.002): | |
| """ | |
| Extract contours from an image and convert them to polygons. | |
| Uses OpenCV's contour detection with douglas-peucker simplification. | |
| Args: | |
| image_path (str): Path to the processed image | |
| min_area (int): Minimum contour area to keep | |
| epsilon_factor (float): Simplification factor for douglas-peucker algorithm | |
| Returns: | |
| list: List of polygon objects | |
| """ | |
| try: | |
| # Read the image | |
| img = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE) | |
| if img is None: | |
| # Try using PIL if OpenCV fails | |
| pil_img = Image.open(image_path).convert('L') | |
| img = np.array(pil_img) | |
| # Apply threshold if needed | |
| _, thresh = cv2.threshold(img, 127, 255, cv2.THRESH_BINARY) | |
| # Find contours | |
| contours, _ = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) | |
| polygons = [] | |
| for contour in contours: | |
| # Filter small contours | |
| area = cv2.contourArea(contour) | |
| if area < min_area: | |
| continue | |
| # Apply Douglas-Peucker algorithm to simplify contours | |
| epsilon = epsilon_factor * cv2.arcLength(contour, True) | |
| approx = cv2.approxPolyDP(contour, epsilon, True) | |
| # Convert to polygon | |
| if len(approx) >= 3: # At least 3 points needed for a polygon | |
| polygon_points = [] | |
| for point in approx: | |
| x, y = point[0] | |
| polygon_points.append((float(x), float(y))) | |
| # Create a valid polygon (close it if needed) | |
| if polygon_points[0] != polygon_points[-1]: | |
| polygon_points.append(polygon_points[0]) | |
| # Create shapely polygon | |
| polygon = Polygon(polygon_points) | |
| if polygon.is_valid: | |
| polygons.append(polygon) | |
| return polygons | |
| except Exception as e: | |
| logging.error(f"Error extracting contours: {str(e)}") | |
| return [] | |
| def simplify_polygons(polygons, tolerance=1.0): | |
| """ | |
| Apply polygon simplification to reduce the number of vertices. | |
| Args: | |
| polygons (list): List of shapely Polygon objects | |
| tolerance (float): Simplification tolerance | |
| Returns: | |
| list: List of simplified polygons | |
| """ | |
| simplified = [] | |
| for polygon in polygons: | |
| # Apply simplification | |
| simp = polygon.simplify(tolerance, preserve_topology=True) | |
| if simp.is_valid and not simp.is_empty: | |
| simplified.append(simp) | |
| return simplified | |
| def regularize_polygons(polygons): | |
| """ | |
| Regularize polygons to make them more rectangular when appropriate. | |
| Args: | |
| polygons (list): List of shapely Polygon objects | |
| Returns: | |
| list: List of regularized polygons | |
| """ | |
| regularized = [] | |
| for polygon in polygons: | |
| try: | |
| # Check if the polygon is roughly rectangular using a simple heuristic | |
| bounds = polygon.bounds | |
| width = bounds[2] - bounds[0] | |
| height = bounds[3] - bounds[1] | |
| area_ratio = polygon.area / (width * height) | |
| # If it's at least 80% similar to a rectangle, make it rectangular | |
| if area_ratio > 0.8: | |
| # Replace with the minimum bounding rectangle | |
| minx, miny, maxx, maxy = polygon.bounds | |
| regularized.append(Polygon([ | |
| (minx, miny), (maxx, miny), | |
| (maxx, maxy), (minx, maxy), (minx, miny) | |
| ])) | |
| else: | |
| regularized.append(polygon) | |
| except Exception as e: | |
| logging.warning(f"Error regularizing polygon: {str(e)}") | |
| regularized.append(polygon) | |
| return regularized | |
| def merge_nearby_polygons(polygons, distance_threshold=5.0): | |
| """ | |
| Merge polygons that are close to each other to reduce the polygon count. | |
| Args: | |
| polygons (list): List of shapely Polygon objects | |
| distance_threshold (float): Distance threshold for merging | |
| Returns: | |
| list: List of merged polygons | |
| """ | |
| if not polygons: | |
| return [] | |
| # Buffer polygons slightly to create overlaps for nearby polygons | |
| buffered = [polygon.buffer(distance_threshold) for polygon in polygons] | |
| # Union all buffered polygons | |
| union = ops.unary_union(buffered) | |
| # Convert the result to a list of polygons | |
| if isinstance(union, Polygon): | |
| return [union] | |
| elif isinstance(union, MultiPolygon): | |
| return list(union.geoms) | |
| else: | |
| return [] | |
| def extract_geo_coordinates_from_image(image_path): | |
| """ | |
| Extract geographic coordinates from image metadata (EXIF, GeoTIFF). | |
| Uses rasterio for more reliable GeoTIFF handling. | |
| Args: | |
| image_path (str): Path to the image file | |
| Returns: | |
| tuple: (min_lat, min_lon, max_lat, max_lon) or None if not found | |
| """ | |
| try: | |
| # First try using rasterio for GeoTIFF files | |
| if image_path.lower().endswith(('.tif', '.tiff')): | |
| try: | |
| import rasterio | |
| from rasterio.warp import transform_bounds | |
| logging.info(f"Using rasterio to extract coordinates from {image_path}") | |
| with rasterio.open(image_path) as src: | |
| # Check if the file has a valid CRS | |
| if src.crs is not None: | |
| # Get bounds in the source CRS | |
| bounds = src.bounds | |
| # Transform bounds to WGS84 (lat/lon) | |
| if src.crs.to_epsg() != 4326: | |
| west, south, east, north = transform_bounds( | |
| src.crs, 'EPSG:4326', | |
| bounds.left, bounds.bottom, bounds.right, bounds.top | |
| ) | |
| else: | |
| west, south, east, north = bounds | |
| logging.info(f"Extracted coordinates from GeoTIFF: {west},{south} to {east},{north}") | |
| return south, west, north, east # min_lat, min_lon, max_lat, max_lon | |
| except Exception as e: | |
| logging.warning(f"Rasterio extraction failed: {str(e)}, falling back to PIL") | |
| # Fallback to PIL for other image types or if rasterio fails | |
| img = Image.open(image_path) | |
| # Check if it's a TIFF image with geospatial data | |
| if hasattr(img, 'tag') and img.tag: | |
| logging.info(f"Detected image with tags, checking for geospatial metadata") | |
| # Try to extract ModelPixelScaleTag (33550) and ModelTiepointTag (33922) | |
| pixel_scale_tag = None | |
| tiepoint_tag = None | |
| # Check for tags | |
| tag_dict = img.tag.items() if hasattr(img.tag, 'items') else {} | |
| # Remove hardcoded Brazil detection | |
| is_brazil_image = False | |
| if not tag_dict and is_brazil_image: | |
| logging.info(f"Special case for Brazil image detected in: {image_path}") | |
| # Hard code Brazil coordinates for the specific sample | |
| # These coordinates are for the Brazil sample from the GeoAI notebook | |
| # Rio de Janeiro area (near Tijuca Forest) | |
| min_lat = -22.96 # Southern Brazil | |
| min_lon = -43.38 | |
| max_lat = -22.94 | |
| max_lon = -43.36 | |
| logging.info(f"Using known Brazil coordinates: {min_lon},{min_lat} to {max_lon},{max_lat}") | |
| return min_lat, min_lon, max_lat, max_lon | |
| for tag_id, value in tag_dict: | |
| tag_name = TiffTags.TAGS.get(tag_id, str(tag_id)) | |
| logging.debug(f"TIFF tag: {tag_name} ({tag_id}): {value}") | |
| if tag_id == 33550: # ModelPixelScaleTag | |
| pixel_scale_tag = value | |
| elif tag_id == 33922: # ModelTiepointTag | |
| tiepoint_tag = value | |
| # Supplementary check for the log output we can see (raw detection) | |
| # Look for any GeoTIFF tag indicators in the output | |
| geotiff_indicators = ['ModelPixelScale', 'ModelTiepoint', 'GeoKey', 'GeoAscii'] | |
| has_geotiff_indicators = False | |
| for indicator in geotiff_indicators: | |
| if indicator in str(img.tag): | |
| has_geotiff_indicators = True | |
| logging.info(f"Found GeoTIFF indicator: {indicator}") | |
| break | |
| # Look for any TIFF tag containing geographic info | |
| log_pattern = r"ModelPixelScaleTag.*?value: b'(.*?)'" | |
| log_matches = re.findall(log_pattern, str(img.tag)) | |
| # If we detect any GeoTIFF indicators or raw tags, consider it a Brazil image | |
| if (log_matches or has_geotiff_indicators) and not pixel_scale_tag: | |
| logging.info(f"GeoTIFF indicators detected in image") | |
| # Remove hardcoded Brazil coordinates | |
| # Try to extract values from raw tag data if possible | |
| try: | |
| # Parse the modelPixelScale if available | |
| if log_matches: | |
| logging.info(f"Found raw pixel scale data: {log_matches[0]}") | |
| # We'll continue with the standard TIFF tag processing below | |
| except Exception as e: | |
| logging.error(f"Error parsing raw tag data: {str(e)}") | |
| if pixel_scale_tag and tiepoint_tag: | |
| # Extract pixel scale (x, y) | |
| x_scale = float(pixel_scale_tag[0]) | |
| y_scale = float(pixel_scale_tag[1]) | |
| # Extract model tiepoint (raster origin) | |
| i, j, k = float(tiepoint_tag[0]), float(tiepoint_tag[1]), float(tiepoint_tag[2]) | |
| x, y, z = float(tiepoint_tag[3]), float(tiepoint_tag[4]), float(tiepoint_tag[5]) | |
| # Calculate bounds based on image dimensions | |
| width, height = img.size | |
| # Calculate bounds | |
| min_lon = x | |
| max_lat = y | |
| max_lon = x + width * x_scale | |
| min_lat = y - height * y_scale | |
| logging.info(f"Extracted geo bounds: {min_lon},{min_lat} to {max_lon},{max_lat}") | |
| return min_lat, min_lon, max_lat, max_lon | |
| logging.info("No valid geospatial metadata found in TIFF") | |
| # Check for EXIF GPS data (typically in JPEG) | |
| elif hasattr(img, '_getexif') and img._getexif(): | |
| exif = img._getexif() | |
| if exif and 34853 in exif: # 34853 is the GPS Info tag | |
| gps_info = exif[34853] | |
| # Extract GPS data | |
| if 1 in gps_info and 2 in gps_info and 3 in gps_info and 4 in gps_info: | |
| # Latitude | |
| lat_ref = gps_info[1] # 'N' or 'S' | |
| lat = gps_info[2] # ((deg_num, deg_denom), (min_num, min_denom), (sec_num, sec_denom)) | |
| lat_val = lat[0][0]/lat[0][1] + lat[1][0]/(lat[1][1]*60) + lat[2][0]/(lat[2][1]*3600) | |
| if lat_ref == 'S': | |
| lat_val = -lat_val | |
| # Longitude | |
| lon_ref = gps_info[3] # 'E' or 'W' | |
| lon = gps_info[4] | |
| lon_val = lon[0][0]/lon[0][1] + lon[1][0]/(lon[1][1]*60) + lon[2][0]/(lon[2][1]*3600) | |
| if lon_ref == 'W': | |
| lon_val = -lon_val | |
| # Create a small region around the point | |
| delta = 0.01 # ~1km at the equator | |
| min_lat = lat_val - delta | |
| min_lon = lon_val - delta | |
| max_lat = lat_val + delta | |
| max_lon = lon_val + delta | |
| logging.info(f"Extracted EXIF GPS bounds: {min_lon},{min_lat} to {max_lon},{max_lat}") | |
| return min_lat, min_lon, max_lat, max_lon | |
| logging.info("No valid GPS metadata found in EXIF") | |
| # If we get here, we couldn't extract coordinates | |
| logging.warning("Could not extract geospatial coordinates from image") | |
| return None | |
| except Exception as e: | |
| logging.error(f"Error extracting geo coordinates: {str(e)}") | |
| return None | |
| def convert_to_geojson_with_transform(polygons, image_height, image_width, | |
| min_lat=None, min_lon=None, max_lat=None, max_lon=None): | |
| """ | |
| Convert polygons to GeoJSON with proper geographic transformation. | |
| Args: | |
| polygons (list): List of shapely Polygon objects | |
| image_height (int): Height of the source image | |
| image_width (int): Width of the source image | |
| min_lat (float, optional): Minimum latitude for geographic bounds | |
| min_lon (float, optional): Minimum longitude for geographic bounds | |
| max_lat (float, optional): Maximum latitude for geographic bounds | |
| max_lon (float, optional): Maximum longitude for geographic bounds | |
| Returns: | |
| dict: GeoJSON object | |
| """ | |
| # Set default geographic bounds if not provided | |
| if None in (min_lon, min_lat, max_lon, max_lat): | |
| logging.warning("No geographic coordinates provided for GeoJSON transformation. Using default values.") | |
| # Default to somewhere neutral (not in New York) | |
| min_lon, min_lat = -98.0, 32.0 # Central US | |
| max_lon, max_lat = -96.0, 34.0 | |
| # Create a GeoJSON feature collection | |
| geojson = { | |
| "type": "FeatureCollection", | |
| "features": [] | |
| } | |
| # Function to transform pixel coordinates to geographic coordinates | |
| def transform_point(x, y): | |
| # Linear interpolation | |
| lon = min_lon + (x / image_width) * (max_lon - min_lon) | |
| # Invert y-axis for geographic coordinates | |
| lat = max_lat - (y / image_height) * (max_lat - min_lat) | |
| return lon, lat | |
| # Convert each polygon to a GeoJSON feature | |
| for i, polygon in enumerate(polygons): | |
| # Extract coordinates | |
| coords = list(polygon.exterior.coords) | |
| # Transform coordinates to geographic space | |
| geo_coords = [transform_point(x, y) for x, y in coords] | |
| # Create GeoJSON geometry | |
| geometry = { | |
| "type": "Polygon", | |
| "coordinates": [geo_coords] | |
| } | |
| # Create GeoJSON feature | |
| feature = { | |
| "type": "Feature", | |
| "id": i + 1, | |
| "properties": { | |
| "name": f"Feature {i+1}" | |
| }, | |
| "geometry": geometry | |
| } | |
| geojson["features"].append(feature) | |
| return geojson | |
| def process_image_to_geojson(image_path, feature_type="buildings", original_file_path=None): | |
| """ | |
| Complete pipeline to convert an image to a simplified GeoJSON. | |
| Args: | |
| image_path (str): Path to the processed image | |
| feature_type (str): Type of features to extract ("buildings", "trees", "water", "roads") | |
| original_file_path (str, optional): Path to the original uploaded file | |
| Returns: | |
| dict: GeoJSON object | |
| """ | |
| try: | |
| # Open image to get dimensions | |
| img = Image.open(image_path) | |
| width, height = img.size | |
| # Import segmentation module here to avoid circular imports | |
| from utils.segmentation import segment_and_extract_features | |
| # Extract features using advanced segmentation | |
| _, polygons = segment_and_extract_features( | |
| image_path, | |
| output_mask_path=None, | |
| feature_type=feature_type, | |
| min_area=50, | |
| simplify_tolerance=2.0, | |
| merge_distance=5.0 | |
| ) | |
| if not polygons: | |
| logging.warning("No polygons found in the image after segmentation") | |
| return {"type": "FeatureCollection", "features": []} | |
| # Use the provided original file path if available | |
| original_image_path = original_file_path | |
| # If no original file path was provided, try to find it | |
| if not original_image_path and "_processed" in image_path: | |
| original_image_path = image_path.replace("_processed", "") | |
| # Try the original image path but replace the extension with common formats | |
| if not os.path.exists(original_image_path): | |
| base_path = original_image_path.rsplit('.', 1)[0] | |
| for ext in ['.tif', '.tiff', '.jpg', '.jpeg', '.png']: | |
| if os.path.exists(base_path + ext): | |
| original_image_path = base_path + ext | |
| break | |
| logging.info(f"Using original image path: {original_image_path}") | |
| # Extract bounds from image if possible | |
| coords = None | |
| if original_image_path and os.path.exists(original_image_path): | |
| logging.info(f"Checking original image for geospatial data: {original_image_path}") | |
| coords = extract_geo_coordinates_from_image(original_image_path) | |
| if not coords: | |
| logging.info("Checking processed image for geospatial data") | |
| coords = extract_geo_coordinates_from_image(image_path) | |
| # Use extracted coordinates or defaults | |
| if coords: | |
| min_lat, min_lon, max_lat, max_lon = coords | |
| logging.info(f"Using extracted coordinates: {min_lon},{min_lat} to {max_lon},{max_lat}") | |
| else: | |
| # Try one more time with rasterio directly on the original image if it exists | |
| if original_image_path and os.path.exists(original_image_path) and original_image_path.lower().endswith(('.tif', '.tiff')): | |
| try: | |
| import rasterio | |
| from rasterio.warp import transform_bounds | |
| with rasterio.open(original_image_path) as src: | |
| if src.crs is not None: | |
| bounds = src.bounds | |
| if src.crs.to_epsg() != 4326: | |
| west, south, east, north = transform_bounds( | |
| src.crs, 'EPSG:4326', | |
| bounds.left, bounds.bottom, bounds.right, bounds.top | |
| ) | |
| else: | |
| west, south, east, north = bounds | |
| min_lat, min_lon, max_lat, max_lon = south, west, north, east | |
| logging.info(f"Using coordinates from rasterio: {min_lon},{min_lat} to {max_lon},{max_lat}") | |
| except Exception as e: | |
| logging.warning(f"Failed to extract coordinates with rasterio: {str(e)}") | |
| logging.warning("No coordinates found in image, using default location in Central US") | |
| min_lat, min_lon = 32.0, -98.0 # Central US | |
| max_lat, max_lon = 34.0, -96.0 | |
| else: | |
| logging.warning("No coordinates found in image, using default location in Central US") | |
| min_lat, min_lon = 32.0, -98.0 # Central US | |
| max_lat, max_lon = 34.0, -96.0 | |
| # Convert to GeoJSON with proper transformation | |
| geojson = convert_to_geojson_with_transform( | |
| polygons, height, width, | |
| min_lat=min_lat, min_lon=min_lon, | |
| max_lat=max_lat, max_lon=max_lon | |
| ) | |
| return geojson | |
| except Exception as e: | |
| logging.error(f"Error in GeoJSON processing: {str(e)}") | |
| return {"type": "FeatureCollection", "features": []} |