from fastapi import FastAPI, HTTPException, Response import json import random import math import pandas as pd import folium from folium.plugins import HeatMap from typing import Dict, Any, List app = FastAPI(title="GeoJSON and Heatmap API", description="API for random coordinates, worker path simulation, and heatmap HTML from PS data") # Global variable to store the last selected coordinate last_coordinate: List[float] = None # Polygon bounds for Singrauli POLYGON_BOUNDS = [(82.5065, 22.3105), (82.628, 22.3105), (82.628, 22.3421), (82.5065, 22.3421)] # Load GeoJSON data from file def load_geojson_data(file_path: str = "synthetic_ps_points.geojson") -> Dict[str, Any]: try: with open(file_path, 'r') as file: return json.load(file) except FileNotFoundError: raise HTTPException(status_code=404, detail="GeoJSON file not found") except json.JSONDecodeError: raise HTTPException(status_code=400, detail="Invalid GeoJSON format") # Load CSV data for heatmap def load_csv_data(file_path: str = "synthetic_ps_points.csv") -> pd.DataFrame: try: return pd.read_csv(file_path) except FileNotFoundError: raise HTTPException(status_code=404, detail="CSV file not found") except pd.errors.EmptyDataError: raise HTTPException(status_code=400, detail="Invalid or empty CSV file") # Calculate Euclidean distance between two coordinates def calculate_distance(coord1: List[float], coord2: List[float]) -> float: return math.sqrt((coord2[0] - coord1[0]) ** 2 + (coord2[1] - coord1[1]) ** 2) # Point-in-polygon check using ray-casting algorithm def is_point_in_polygon(point: List[float], polygon: List[tuple]) -> bool: x, y = point[0], point[1] n = len(polygon) inside = False j = n - 1 for i in range(n): if ((polygon[i][1] > y) != (polygon[j][1] > y)) and \ (x < (polygon[j][0] - polygon[i][0]) * (y - polygon[i][1]) / (polygon[j][1] - polygon[i][1]) + polygon[i][0]): inside = not inside j = i return inside # Find the closest feature to a given coordinate def find_closest_feature(coord: List[float], features: List[Dict]) -> Dict: min_distance = float('inf') closest_feature = None for feature in features: feature_coord = feature["geometry"]["coordinates"] distance = calculate_distance(coord, feature_coord) if distance < min_distance: min_distance = distance closest_feature = feature return closest_feature # Generate a linear path between two points with more steps for better separation def generate_path(start_coord: List[float], end_coord: List[float], num_steps: int = 20) -> List[Dict]: path = [] for i in range(num_steps): t = i / (num_steps - 1) # Interpolation factor lon = start_coord[0] + t * (end_coord[0] - start_coord[0]) lat = start_coord[1] + t * (end_coord[1] - start_coord[1]) path.append({"step": i, "coordinates": [lon, lat]}) return path # Endpoint to get a single random coordinate, ensuring wide separation from the last coordinate @app.get("/get-coordinates", response_model=dict) async def get_random_coordinates(min_distance: float = 0.05): """ Returns a single random coordinate within the Singrauli polygon, ensuring a minimum distance from the last selected coordinate. Parameters: - min_distance: Minimum distance from the last coordinate in degrees (default: 0.05, ~5.5 km) """ global last_coordinate data = load_geojson_data() features = data.get("features", []) if not features: raise HTTPException(status_code=400, detail="No features found in GeoJSON data") # Filter features within the Singrauli polygon valid_features = [f for f in features if is_point_in_polygon(f["geometry"]["coordinates"], POLYGON_BOUNDS)] if not valid_features: raise HTTPException(status_code=400, detail="No features found within the Singrauli polygon") selected_feature = None attempts = 0 max_attempts = 200 # Increased to handle sparse valid selections while attempts < max_attempts: random_feature = random.choice(valid_features) random_coord = random_feature["geometry"]["coordinates"] # Check distance from last coordinate (if it exists) is_valid = True if last_coordinate is not None: distance = calculate_distance(random_coord, last_coordinate) if distance < min_distance: is_valid = False if is_valid: selected_feature = random_feature last_coordinate = random_coord # Update last coordinate break attempts += 1 if selected_feature is None: raise HTTPException(status_code=400, detail="Could not find a point with specified minimum distance from the last coordinate") coordinates = selected_feature["geometry"]["coordinates"] properties = selected_feature["properties"] return { "ps_id": properties["ps_id"], "coordinates": { "longitude": coordinates[0], "latitude": coordinates[1] }, "velocity_mm_yr": properties["velocity_mm_yr"], "risk": properties["risk"] } # Endpoint to simulate a worker's path from normal to high risk @app.get("/simulate-worker-path", response_model=dict) async def simulate_worker_path(): data = load_geojson_data() features = data.get("features", []) if not features: raise HTTPException(status_code=400, detail="No features found in GeoJSON data") normal_risk_features = [f for f in features if f["properties"]["risk"] == "Normal"] high_risk_features = [f for f in features if f["properties"]["risk"] == "High"] if not normal_risk_features or not high_risk_features: raise HTTPException(status_code=400, detail="Insufficient normal or high risk features for path simulation") start_feature = random.choice(normal_risk_features) end_feature = random.choice(high_risk_features) start_coord = start_feature["geometry"]["coordinates"] end_coord = end_feature["geometry"]["coordinates"] path = generate_path(start_coord, end_coord, num_steps=20) path_with_risk = [] for point in path: closest_feature = find_closest_feature(point["coordinates"], features) path_with_risk.append({ "step": point["step"], "coordinates": { "longitude": point["coordinates"][0], "latitude": point["coordinates"][1] }, "risk": closest_feature["properties"]["risk"] }) return { "start": { "ps_id": start_feature["properties"]["ps_id"], "coordinates": {"longitude": start_coord[0], "latitude": start_coord[1]}, "risk": start_feature["properties"]["risk"] }, "end": { "ps_id": end_feature["properties"]["ps_id"], "coordinates": {"longitude": end_coord[0], "latitude": end_coord[1]}, "risk": end_feature["properties"]["risk"] }, "path": path_with_risk } # Endpoint to generate and return raw HTML heatmap @app.get("/heatmap", response_class=Response) async def get_heatmap(): # Load CSV data ps_data = load_csv_data() # Polygon bounds for Singrauli polygon_coords = [[(82.5065, 22.3105), (82.628, 22.3105), (82.628, 22.3421), (82.5065, 22.3421), (82.5065, 22.3105)]] # Center for map center_lat = (22.3105 + 22.3421) / 2 center_lon = (82.5065 + 82.628) / 2 # Create base map m = folium.Map(location=[center_lat, center_lon], zoom_start=12, tiles="OpenStreetMap") # Heatmap using velocity heat_data = [[row['lat'], row['lon'], abs(row['velocity_mm_yr'])] for _, row in ps_data.iterrows()] HeatMap(heat_data, radius=15, gradient={0.2: 'blue', 0.4: 'green', 0.6: 'yellow', 1: 'red'}).add_to(m) # Add polygon boundary folium.Polygon( locations=[(lat, lon) for lon, lat in polygon_coords[0]], color="white", fill=False, weight=2 ).add_to(m) # Get HTML content html_content = m.get_root().render() return Response(content=html_content, media_type="text/html") # Root endpoint for API info @app.get("/") async def root(): return { "message": "Welcome to the GeoJSON and Heatmap API", "endpoints": { "/get-coordinates": "Returns a single random coordinate within the Singrauli polygon, widely spaced from the last coordinate", "/simulate-worker-path": "Simulates a worker's path from a normal risk to a high risk zone", "/heatmap": "Returns raw HTML for a Folium heatmap of PS data" } } if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=7860)