agentsay commited on
Commit
5ceeabf
·
verified ·
1 Parent(s): 31b00d1

Update main.py

Browse files
Files changed (1) hide show
  1. main.py +211 -173
main.py CHANGED
@@ -1,174 +1,212 @@
1
- from fastapi import FastAPI, HTTPException, Response
2
- import json
3
- import random
4
- import math
5
- import pandas as pd
6
- import folium
7
- from folium.plugins import HeatMap
8
- from typing import Dict, Any, List
9
-
10
- app = FastAPI(title="GeoJSON and Heatmap API", description="API for random coordinates, worker path simulation, and heatmap HTML from PS data")
11
-
12
- # Load GeoJSON data from file
13
- def load_geojson_data(file_path: str = "synthetic_ps_points.geojson") -> Dict[str, Any]:
14
- try:
15
- with open(file_path, 'r') as file:
16
- return json.load(file)
17
- except FileNotFoundError:
18
- raise HTTPException(status_code=404, detail="GeoJSON file not found")
19
- except json.JSONDecodeError:
20
- raise HTTPException(status_code=400, detail="Invalid GeoJSON format")
21
-
22
- # Load CSV data for heatmap
23
- def load_csv_data(file_path: str = "synthetic_ps_points.csv") -> pd.DataFrame:
24
- try:
25
- return pd.read_csv(file_path)
26
- except FileNotFoundError:
27
- raise HTTPException(status_code=404, detail="CSV file not found")
28
- except pd.errors.EmptyDataError:
29
- raise HTTPException(status_code=400, detail="Invalid or empty CSV file")
30
-
31
- # Calculate Euclidean distance between two coordinates
32
- def calculate_distance(coord1: List[float], coord2: List[float]) -> float:
33
- return math.sqrt((coord2[0] - coord1[0]) ** 2 + (coord2[1] - coord1[1]) ** 2)
34
-
35
- # Find the closest feature to a given coordinate
36
- def find_closest_feature(coord: List[float], features: List[Dict]) -> Dict:
37
- min_distance = float('inf')
38
- closest_feature = None
39
- for feature in features:
40
- feature_coord = feature["geometry"]["coordinates"]
41
- distance = calculate_distance(coord, feature_coord)
42
- if distance < min_distance:
43
- min_distance = distance
44
- closest_feature = feature
45
- return closest_feature
46
-
47
- # Generate a linear path between two points
48
- def generate_path(start_coord: List[float], end_coord: List[float], num_steps: int = 11) -> List[Dict]:
49
- path = []
50
- for i in range(num_steps):
51
- t = i / (num_steps - 1) # Interpolation factor
52
- lon = start_coord[0] + t * (end_coord[0] - start_coord[0])
53
- lat = start_coord[1] + t * (end_coord[1] - start_coord[1])
54
- path.append({"step": i, "coordinates": [lon, lat]})
55
- return path
56
-
57
- # Endpoint to get random coordinates
58
- @app.get("/get-coordinates", response_model=dict)
59
- async def get_random_coordinates():
60
- data = load_geojson_data()
61
- features = data.get("features", [])
62
- if not features:
63
- raise HTTPException(status_code=400, detail="No features found in GeoJSON data")
64
-
65
- random_feature = random.choice(features)
66
- coordinates = random_feature["geometry"]["coordinates"]
67
- properties = random_feature["properties"]
68
-
69
- return {
70
- "ps_id": properties["ps_id"],
71
- "coordinates": {
72
- "longitude": coordinates[0],
73
- "latitude": coordinates[1]
74
- },
75
- "velocity_mm_yr": properties["velocity_mm_yr"],
76
- "risk": properties["risk"]
77
- }
78
-
79
- # Endpoint to simulate a worker's path from normal to high risk
80
- @app.get("/simulate-worker-path", response_model=dict)
81
- async def simulate_worker_path():
82
- data = load_geojson_data()
83
- features = data.get("features", [])
84
- if not features:
85
- raise HTTPException(status_code=400, detail="No features found in GeoJSON data")
86
-
87
- normal_risk_features = [f for f in features if f["properties"]["risk"] == "Normal"]
88
- high_risk_features = [f for f in features if f["properties"]["risk"] == "High"]
89
-
90
- if not normal_risk_features or not high_risk_features:
91
- raise HTTPException(status_code=400, detail="Insufficient normal or high risk features for path simulation")
92
-
93
- start_feature = random.choice(normal_risk_features)
94
- end_feature = random.choice(high_risk_features)
95
-
96
- start_coord = start_feature["geometry"]["coordinates"]
97
- end_coord = end_feature["geometry"]["coordinates"]
98
-
99
- path = generate_path(start_coord, end_coord, num_steps=11)
100
-
101
- path_with_risk = []
102
- for point in path:
103
- closest_feature = find_closest_feature(point["coordinates"], features)
104
- path_with_risk.append({
105
- "step": point["step"],
106
- "coordinates": {
107
- "longitude": point["coordinates"][0],
108
- "latitude": point["coordinates"][1]
109
- },
110
- "risk": closest_feature["properties"]["risk"]
111
- })
112
-
113
- return {
114
- "start": {
115
- "ps_id": start_feature["properties"]["ps_id"],
116
- "coordinates": {"longitude": start_coord[0], "latitude": start_coord[1]},
117
- "risk": start_feature["properties"]["risk"]
118
- },
119
- "end": {
120
- "ps_id": end_feature["properties"]["ps_id"],
121
- "coordinates": {"longitude": end_coord[0], "latitude": end_coord[1]},
122
- "risk": end_feature["properties"]["risk"]
123
- },
124
- "path": path_with_risk
125
- }
126
-
127
- # Endpoint to generate and return raw HTML heatmap
128
- @app.get("/heatmap", response_class=Response)
129
- async def get_heatmap():
130
- # Load CSV data
131
- ps_data = load_csv_data()
132
-
133
- # Polygon bounds for Singrauli
134
- polygon_coords = [[(82.5065, 22.3105), (82.628, 22.3105), (82.628, 22.3421), (82.5065, 22.3421), (82.5065, 22.3105)]]
135
-
136
- # Center for map
137
- center_lat = (22.3105 + 22.3421) / 2
138
- center_lon = (82.5065 + 82.628) / 2
139
-
140
- # Create base map
141
- m = folium.Map(location=[center_lat, center_lon], zoom_start=12, tiles="OpenStreetMap")
142
-
143
- # Heatmap using velocity
144
- heat_data = [[row['lat'], row['lon'], abs(row['velocity_mm_yr'])] for _, row in ps_data.iterrows()]
145
- HeatMap(heat_data, radius=15, gradient={0.2: 'blue', 0.4: 'green', 0.6: 'yellow', 1: 'red'}).add_to(m)
146
-
147
- # Add polygon boundary
148
- folium.Polygon(
149
- locations=[(lat, lon) for lon, lat in polygon_coords[0]],
150
- color="white",
151
- fill=False,
152
- weight=2
153
- ).add_to(m)
154
-
155
- # Get HTML content
156
- html_content = m.get_root().render()
157
-
158
- return Response(content=html_content, media_type="text/html")
159
-
160
- # Root endpoint for API info
161
- @app.get("/")
162
- async def root():
163
- return {
164
- "message": "Welcome to the GeoJSON and Heatmap API",
165
- "endpoints": {
166
- "/get-coordinates": "Returns a random coordinate pair with associated properties",
167
- "/simulate-worker-path": "Simulates a worker's path from a normal risk to a high risk zone",
168
- "/heatmap": "Returns raw HTML for a Folium heatmap of PS data"
169
- }
170
- }
171
-
172
- if __name__ == "__main__":
173
- import uvicorn
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
174
  uvicorn.run(app, host="0.0.0.0", port=7860)
 
1
+ from fastapi import FastAPI, HTTPException, Response
2
+ import json
3
+ import random
4
+ import math
5
+ import pandas as pd
6
+ import folium
7
+ from folium.plugins import HeatMap
8
+ from typing import Dict, Any, List
9
+
10
+ app = FastAPI(title="GeoJSON and Heatmap API", description="API for random coordinates, worker path simulation, and heatmap HTML from PS data")
11
+
12
+ # Load GeoJSON data from file
13
+ def load_geojson_data(file_path: str = "synthetic_ps_points.geojson") -> Dict[str, Any]:
14
+ try:
15
+ with open(file_path, 'r') as file:
16
+ return json.load(file)
17
+ except FileNotFoundError:
18
+ raise HTTPException(status_code=404, detail="GeoJSON file not found")
19
+ except json.JSONDecodeError:
20
+ raise HTTPException(status_code=400, detail="Invalid GeoJSON format")
21
+
22
+ # Load CSV data for heatmap
23
+ def load_csv_data(file_path: str = "synthetic_ps_points.csv") -> pd.DataFrame:
24
+ try:
25
+ return pd.read_csv(file_path)
26
+ except FileNotFoundError:
27
+ raise HTTPException(status_code=404, detail="CSV file not found")
28
+ except pd.errors.EmptyDataError:
29
+ raise HTTPException(status_code=400, detail="Invalid or empty CSV file")
30
+
31
+ # Calculate Euclidean distance between two coordinates
32
+ def calculate_distance(coord1: List[float], coord2: List[float]) -> float:
33
+ return math.sqrt((coord2[0] - coord1[0]) ** 2 + (coord2[1] - coord1[1]) ** 2)
34
+
35
+ # Find the closest feature to a given coordinate
36
+ def find_closest_feature(coord: List[float], features: List[Dict]) -> Dict:
37
+ min_distance = float('inf')
38
+ closest_feature = None
39
+ for feature in features:
40
+ feature_coord = feature["geometry"]["coordinates"]
41
+ distance = calculate_distance(coord, feature_coord)
42
+ if distance < min_distance:
43
+ min_distance = distance
44
+ closest_feature = feature
45
+ return closest_feature
46
+
47
+ # Generate a linear path between two points with more steps for better separation
48
+ def generate_path(start_coord: List[float], end_coord: List[float], num_steps: int = 20) -> List[Dict]:
49
+ path = []
50
+ for i in range(num_steps):
51
+ t = i / (num_steps - 1) # Interpolation factor
52
+ lon = start_coord[0] + t * (end_coord[0] - start_coord[0])
53
+ lat = start_coord[1] + t * (end_coord[1] - start_coord[1])
54
+ path.append({"step": i, "coordinates": [lon, lat]})
55
+ return path
56
+
57
+ # Endpoint to get multiple random coordinates with minimum separation
58
+ @app.get("/get-coordinates", response_model=List[Dict])
59
+ async def get_random_coordinates(num_points: int = 3, min_distance: float = 0.01):
60
+ """
61
+ Returns a list of random coordinates with associated properties, ensuring a minimum distance between them.
62
+
63
+ Parameters:
64
+ - num_points: Number of coordinates to return (default: 3)
65
+ - min_distance: Minimum distance between coordinates in degrees (default: 0.01)
66
+ """
67
+ data = load_geojson_data()
68
+ features = data.get("features", [])
69
+ if not features:
70
+ raise HTTPException(status_code=400, detail="No features found in GeoJSON data")
71
+
72
+ if num_points > len(features):
73
+ raise HTTPException(status_code=400, detail=f"Requested {num_points} points, but only {len(features)} available")
74
+
75
+ selected_features = []
76
+ attempts = 0
77
+ max_attempts = 100 # Prevent infinite loops
78
+
79
+ while len(selected_features) < num_points and attempts < max_attempts:
80
+ random_feature = random.choice(features)
81
+
82
+ # Check if the feature is sufficiently far from all selected features
83
+ is_valid = True
84
+ for selected in selected_features:
85
+ distance = calculate_distance(
86
+ random_feature["geometry"]["coordinates"],
87
+ selected["geometry"]["coordinates"]
88
+ )
89
+ if distance < min_distance:
90
+ is_valid = False
91
+ break
92
+
93
+ if is_valid:
94
+ selected_features.append(random_feature)
95
+
96
+ attempts += 1
97
+
98
+ if len(selected_features) < num_points:
99
+ raise HTTPException(status_code=400, detail="Could not find enough points with specified minimum distance")
100
+
101
+ # Format the response
102
+ result = [
103
+ {
104
+ "ps_id": feature["properties"]["ps_id"],
105
+ "coordinates": {
106
+ "longitude": feature["geometry"]["coordinates"][0],
107
+ "latitude": feature["geometry"]["coordinates"][1]
108
+ },
109
+ "velocity_mm_yr": feature["properties"]["velocity_mm_yr"],
110
+ "risk": feature["properties"]["risk"]
111
+ }
112
+ for feature in selected_features
113
+ ]
114
+
115
+ return result
116
+
117
+ # Endpoint to simulate a worker's path from normal to high risk
118
+ @app.get("/simulate-worker-path", response_model=dict)
119
+ async def simulate_worker_path():
120
+ data = load_geojson_data()
121
+ features = data.get("features", [])
122
+ if not features:
123
+ raise HTTPException(status_code=400, detail="No features found in GeoJSON data")
124
+
125
+ normal_risk_features = [f for f in features if f["properties"]["risk"] == "Normal"]
126
+ high_risk_features = [f for f in features if f["properties"]["risk"] == "High"]
127
+
128
+ if not normal_risk_features or not high_risk_features:
129
+ raise HTTPException(status_code=400, detail="Insufficient normal or high risk features for path simulation")
130
+
131
+ start_feature = random.choice(normal_risk_features)
132
+ end_feature = random.choice(high_risk_features)
133
+
134
+ start_coord = start_feature["geometry"]["coordinates"]
135
+ end_coord = end_feature["geometry"]["coordinates"]
136
+
137
+ path = generate_path(start_coord, end_coord, num_steps=20)
138
+
139
+ path_with_risk = []
140
+ for point in path:
141
+ closest_feature = find_closest_feature(point["coordinates"], features)
142
+ path_with_risk.append({
143
+ "step": point["step"],
144
+ "coordinates": {
145
+ "longitude": point["coordinates"][0],
146
+ "latitude": point["coordinates"][1]
147
+ },
148
+ "risk": closest_feature["properties"]["risk"]
149
+ })
150
+
151
+ return {
152
+ "start": {
153
+ "ps_id": start_feature["properties"]["ps_id"],
154
+ "coordinates": {"longitude": start_coord[0], "latitude": start_coord[1]},
155
+ "risk": start_feature["properties"]["risk"]
156
+ },
157
+ "end": {
158
+ "ps_id": end_feature["properties"]["ps_id"],
159
+ "coordinates": {"longitude": end_coord[0], "latitude": end_coord[1]},
160
+ "risk": end_feature["properties"]["risk"]
161
+ },
162
+ "path": path_with_risk
163
+ }
164
+
165
+ # Endpoint to generate and return raw HTML heatmap
166
+ @app.get("/heatmap", response_class=Response)
167
+ async def get_heatmap():
168
+ # Load CSV data
169
+ ps_data = load_csv_data()
170
+
171
+ # Polygon bounds for Singrauli
172
+ polygon_coords = [[(82.5065, 22.3105), (82.628, 22.3105), (82.628, 22.3421), (82.5065, 22.3421), (82.5065, 22.3105)]]
173
+
174
+ # Center for map
175
+ center_lat = (22.3105 + 22.3421) / 2
176
+ center_lon = (82.5065 + 82.628) / 2
177
+
178
+ # Create base map
179
+ m = folium.Map(location=[center_lat, center_lon], zoom_start=12, tiles="OpenStreetMap")
180
+
181
+ # Heatmap using velocity
182
+ heat_data = [[row['lat'], row['lon'], abs(row['velocity_mm_yr'])] for _, row in ps_data.iterrows()]
183
+ HeatMap(heat_data, radius=15, gradient={0.2: 'blue', 0.4: 'green', 0.6: 'yellow', 1: 'red'}).add_to(m)
184
+
185
+ # Add polygon boundary
186
+ folium.Polygon(
187
+ locations=[(lat, lon) for lon, lat in polygon_coords[0]],
188
+ color="white",
189
+ fill=False,
190
+ weight=2
191
+ ).add_to(m)
192
+
193
+ # Get HTML content
194
+ html_content = m.get_root().render()
195
+
196
+ return Response(content=html_content, media_type="text/html")
197
+
198
+ # Root endpoint for API info
199
+ @app.get("/")
200
+ async def root():
201
+ return {
202
+ "message": "Welcome to the GeoJSON and Heatmap API",
203
+ "endpoints": {
204
+ "/get-coordinates": "Returns a list of random coordinates with associated properties, ensuring minimum separation",
205
+ "/simulate-worker-path": "Simulates a worker's path from a normal risk to a high risk zone",
206
+ "/heatmap": "Returns raw HTML for a Folium heatmap of PS data"
207
+ }
208
+ }
209
+
210
+ if __name__ == "__main__":
211
+ import uvicorn
212
  uvicorn.run(app, host="0.0.0.0", port=7860)