Spaces:
Sleeping
Sleeping
| from fastapi import FastAPI, HTTPException, Response | |
| import json | |
| import random | |
| import math | |
| import pandas as pd | |
| import folium | |
| from folium.plugins import HeatMap | |
| from typing import Dict, Any, List | |
| app = FastAPI(title="GeoJSON and Heatmap API", description="API for random coordinates, worker path simulation, and heatmap HTML from PS data") | |
| # Global variable to store the last selected coordinate | |
| last_coordinate: List[float] = None | |
| # Polygon bounds for Singrauli | |
| POLYGON_BOUNDS = [(82.5065, 22.3105), (82.628, 22.3105), (82.628, 22.3421), (82.5065, 22.3421)] | |
| # Load GeoJSON data from file | |
| def load_geojson_data(file_path: str = "synthetic_ps_points.geojson") -> Dict[str, Any]: | |
| try: | |
| with open(file_path, 'r') as file: | |
| return json.load(file) | |
| except FileNotFoundError: | |
| raise HTTPException(status_code=404, detail="GeoJSON file not found") | |
| except json.JSONDecodeError: | |
| raise HTTPException(status_code=400, detail="Invalid GeoJSON format") | |
| # Load CSV data for heatmap | |
| def load_csv_data(file_path: str = "synthetic_ps_points.csv") -> pd.DataFrame: | |
| try: | |
| return pd.read_csv(file_path) | |
| except FileNotFoundError: | |
| raise HTTPException(status_code=404, detail="CSV file not found") | |
| except pd.errors.EmptyDataError: | |
| raise HTTPException(status_code=400, detail="Invalid or empty CSV file") | |
| # Calculate Euclidean distance between two coordinates | |
| def calculate_distance(coord1: List[float], coord2: List[float]) -> float: | |
| return math.sqrt((coord2[0] - coord1[0]) ** 2 + (coord2[1] - coord1[1]) ** 2) | |
| # Point-in-polygon check using ray-casting algorithm | |
| def is_point_in_polygon(point: List[float], polygon: List[tuple]) -> bool: | |
| x, y = point[0], point[1] | |
| n = len(polygon) | |
| inside = False | |
| j = n - 1 | |
| for i in range(n): | |
| if ((polygon[i][1] > y) != (polygon[j][1] > y)) and \ | |
| (x < (polygon[j][0] - polygon[i][0]) * (y - polygon[i][1]) / (polygon[j][1] - polygon[i][1]) + polygon[i][0]): | |
| inside = not inside | |
| j = i | |
| return inside | |
| # Find the closest feature to a given coordinate | |
| def find_closest_feature(coord: List[float], features: List[Dict]) -> Dict: | |
| min_distance = float('inf') | |
| closest_feature = None | |
| for feature in features: | |
| feature_coord = feature["geometry"]["coordinates"] | |
| distance = calculate_distance(coord, feature_coord) | |
| if distance < min_distance: | |
| min_distance = distance | |
| closest_feature = feature | |
| return closest_feature | |
| # Generate a linear path between two points with more steps for better separation | |
| def generate_path(start_coord: List[float], end_coord: List[float], num_steps: int = 20) -> List[Dict]: | |
| path = [] | |
| for i in range(num_steps): | |
| t = i / (num_steps - 1) # Interpolation factor | |
| lon = start_coord[0] + t * (end_coord[0] - start_coord[0]) | |
| lat = start_coord[1] + t * (end_coord[1] - start_coord[1]) | |
| path.append({"step": i, "coordinates": [lon, lat]}) | |
| return path | |
| # Endpoint to get a single random coordinate, ensuring wide separation from the last coordinate | |
| async def get_random_coordinates(min_distance: float = 0.05): | |
| """ | |
| Returns a single random coordinate within the Singrauli polygon, ensuring a minimum distance from the last selected coordinate. | |
| Parameters: | |
| - min_distance: Minimum distance from the last coordinate in degrees (default: 0.05, ~5.5 km) | |
| """ | |
| global last_coordinate | |
| data = load_geojson_data() | |
| features = data.get("features", []) | |
| if not features: | |
| raise HTTPException(status_code=400, detail="No features found in GeoJSON data") | |
| # Filter features within the Singrauli polygon | |
| valid_features = [f for f in features if is_point_in_polygon(f["geometry"]["coordinates"], POLYGON_BOUNDS)] | |
| if not valid_features: | |
| raise HTTPException(status_code=400, detail="No features found within the Singrauli polygon") | |
| selected_feature = None | |
| attempts = 0 | |
| max_attempts = 200 # Increased to handle sparse valid selections | |
| while attempts < max_attempts: | |
| random_feature = random.choice(valid_features) | |
| random_coord = random_feature["geometry"]["coordinates"] | |
| # Check distance from last coordinate (if it exists) | |
| is_valid = True | |
| if last_coordinate is not None: | |
| distance = calculate_distance(random_coord, last_coordinate) | |
| if distance < min_distance: | |
| is_valid = False | |
| if is_valid: | |
| selected_feature = random_feature | |
| last_coordinate = random_coord # Update last coordinate | |
| break | |
| attempts += 1 | |
| if selected_feature is None: | |
| raise HTTPException(status_code=400, detail="Could not find a point with specified minimum distance from the last coordinate") | |
| coordinates = selected_feature["geometry"]["coordinates"] | |
| properties = selected_feature["properties"] | |
| return { | |
| "ps_id": properties["ps_id"], | |
| "coordinates": { | |
| "longitude": coordinates[0], | |
| "latitude": coordinates[1] | |
| }, | |
| "velocity_mm_yr": properties["velocity_mm_yr"], | |
| "risk": properties["risk"] | |
| } | |
| # Endpoint to simulate a worker's path from normal to high risk | |
| async def simulate_worker_path(): | |
| data = load_geojson_data() | |
| features = data.get("features", []) | |
| if not features: | |
| raise HTTPException(status_code=400, detail="No features found in GeoJSON data") | |
| normal_risk_features = [f for f in features if f["properties"]["risk"] == "Normal"] | |
| high_risk_features = [f for f in features if f["properties"]["risk"] == "High"] | |
| if not normal_risk_features or not high_risk_features: | |
| raise HTTPException(status_code=400, detail="Insufficient normal or high risk features for path simulation") | |
| start_feature = random.choice(normal_risk_features) | |
| end_feature = random.choice(high_risk_features) | |
| start_coord = start_feature["geometry"]["coordinates"] | |
| end_coord = end_feature["geometry"]["coordinates"] | |
| path = generate_path(start_coord, end_coord, num_steps=20) | |
| path_with_risk = [] | |
| for point in path: | |
| closest_feature = find_closest_feature(point["coordinates"], features) | |
| path_with_risk.append({ | |
| "step": point["step"], | |
| "coordinates": { | |
| "longitude": point["coordinates"][0], | |
| "latitude": point["coordinates"][1] | |
| }, | |
| "risk": closest_feature["properties"]["risk"] | |
| }) | |
| return { | |
| "start": { | |
| "ps_id": start_feature["properties"]["ps_id"], | |
| "coordinates": {"longitude": start_coord[0], "latitude": start_coord[1]}, | |
| "risk": start_feature["properties"]["risk"] | |
| }, | |
| "end": { | |
| "ps_id": end_feature["properties"]["ps_id"], | |
| "coordinates": {"longitude": end_coord[0], "latitude": end_coord[1]}, | |
| "risk": end_feature["properties"]["risk"] | |
| }, | |
| "path": path_with_risk | |
| } | |
| # Endpoint to generate and return raw HTML heatmap | |
| async def get_heatmap(): | |
| # Load CSV data | |
| ps_data = load_csv_data() | |
| # Polygon bounds for Singrauli | |
| polygon_coords = [[(82.5065, 22.3105), (82.628, 22.3105), (82.628, 22.3421), (82.5065, 22.3421), (82.5065, 22.3105)]] | |
| # Center for map | |
| center_lat = (22.3105 + 22.3421) / 2 | |
| center_lon = (82.5065 + 82.628) / 2 | |
| # Create base map | |
| m = folium.Map(location=[center_lat, center_lon], zoom_start=12, tiles="OpenStreetMap") | |
| # Heatmap using velocity | |
| heat_data = [[row['lat'], row['lon'], abs(row['velocity_mm_yr'])] for _, row in ps_data.iterrows()] | |
| HeatMap(heat_data, radius=15, gradient={0.2: 'blue', 0.4: 'green', 0.6: 'yellow', 1: 'red'}).add_to(m) | |
| # Add polygon boundary | |
| folium.Polygon( | |
| locations=[(lat, lon) for lon, lat in polygon_coords[0]], | |
| color="white", | |
| fill=False, | |
| weight=2 | |
| ).add_to(m) | |
| # Get HTML content | |
| html_content = m.get_root().render() | |
| return Response(content=html_content, media_type="text/html") | |
| # Root endpoint for API info | |
| async def root(): | |
| return { | |
| "message": "Welcome to the GeoJSON and Heatmap API", | |
| "endpoints": { | |
| "/get-coordinates": "Returns a single random coordinate within the Singrauli polygon, widely spaced from the last coordinate", | |
| "/simulate-worker-path": "Simulates a worker's path from a normal risk to a high risk zone", | |
| "/heatmap": "Returns raw HTML for a Folium heatmap of PS data" | |
| } | |
| } | |
| if __name__ == "__main__": | |
| import uvicorn | |
| uvicorn.run(app, host="0.0.0.0", port=7860) |