KATHIR2006 commited on
Commit
bcd9f7f
·
verified ·
1 Parent(s): 20ad03e

Upload backend/ml/zone_detector.py with huggingface_hub

Browse files
Files changed (1) hide show
  1. backend/ml/zone_detector.py +244 -0
backend/ml/zone_detector.py ADDED
@@ -0,0 +1,244 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Real Prohibited Zone Detection with Shapely Polygon Support
3
+ Integrates with 159 Indian fishing prohibited zones dataset
4
+ """
5
+
6
+ import json
7
+ import os
8
+ from shapely.geometry import Point, Polygon
9
+ import numpy as np
10
+ from typing import Dict, List, Tuple, Optional
11
+
12
+ class ZoneDetector:
13
+ def __init__(self, zones_file='../../datasets/indian_fishing_prohibited_zones.json'):
14
+ """Initialize zone detector with prohibited zones data"""
15
+ # Load zones from JSON file
16
+ zones_path = os.path.join(os.path.dirname(__file__), zones_file)
17
+ with open(zones_path, 'r') as f:
18
+ self.zones = json.load(f)
19
+
20
+ # Create Shapely polygons for complex zones
21
+ self.zone_polygons = {}
22
+ for zone in self.zones:
23
+ if zone.get('polygon'):
24
+ # Polygon coordinates in format [[lng, lat], ...]
25
+ coords = [(p[1], p[0]) for p in zone['polygon']] # Swap to (lat, lng)
26
+ self.zone_polygons[zone['id']] = Polygon(coords)
27
+
28
+ print(f"✅ Loaded {len(self.zones)} prohibited zones")
29
+ print(f" 📐 {len(self.zone_polygons)} zones with polygon boundaries")
30
+
31
+ def check_zone_violation(self, lat: float, lng: float) -> Dict:
32
+ """Check if vessel is in prohibited zone"""
33
+ vessel_point = Point(lat, lng) # Shapely Point (lat, lng)
34
+
35
+ violations = []
36
+ warnings = []
37
+
38
+ for zone in self.zones:
39
+ zone_id = zone['zone_id']
40
+
41
+ # Check polygon-based zones first (most accurate)
42
+ if zone_id in self.zone_polygons:
43
+ polygon = self.zone_polygons[zone_id]
44
+ if polygon.contains(vessel_point):
45
+ violations.append({
46
+ 'zone_id': zone_id,
47
+ 'zone_name': zone['name'],
48
+ 'zone_type': zone.get('zone_type', 'prohibited'),
49
+ 'enforcement_agency': zone.get('enforcement_agency', 'Unknown'),
50
+ 'penalty': zone.get('penalty', 'Legal action'),
51
+ 'distance': 0, # Inside zone
52
+ 'severity': 'critical'
53
+ })
54
+ continue
55
+
56
+ # Fallback to radius-based detection
57
+ if zone.get('radius_km'):
58
+ distance = self.haversine_distance(
59
+ lat, lng,
60
+ zone['latitude'], zone['longitude']
61
+ )
62
+
63
+ # Inside prohibited zone
64
+ if distance <= zone['radius_km']:
65
+ violations.append({
66
+ 'zone_id': zone_id,
67
+ 'zone_name': zone['name'],
68
+ 'zone_type': zone.get('zone_type', 'prohibited'),
69
+ 'enforcement_agency': zone.get('enforcement_agency', 'Unknown'),
70
+ 'penalty': zone.get('penalty', 'Legal action'),
71
+ 'distance': distance,
72
+ 'severity': 'critical'
73
+ })
74
+ # Near zone (within 5km buffer)
75
+ elif distance <= zone['radius_km'] + 5:
76
+ warnings.append({
77
+ 'zone_id': zone_id,
78
+ 'zone_name': zone['name'],
79
+ 'distance_to_zone': distance - zone['radius_km'],
80
+ 'severity': 'warning'
81
+ })
82
+
83
+ return {
84
+ 'violations': violations,
85
+ 'warnings': warnings,
86
+ 'isViolating': len(violations) > 0,
87
+ 'isNearZone': len(warnings) > 0
88
+ }
89
+
90
+ def predict_trajectory_violation(
91
+ self,
92
+ lat: float,
93
+ lng: float,
94
+ heading: float,
95
+ speed_knots: float,
96
+ time_horizon_minutes: int = 15
97
+ ) -> Dict:
98
+ """Predict if vessel will enter prohibited zone based on current trajectory"""
99
+ # Convert speed to km/h
100
+ speed_kmh = speed_knots * 1.852
101
+
102
+ # Project position ahead
103
+ time_delta = time_horizon_minutes / 60 # hours
104
+ distance_km = speed_kmh * time_delta
105
+
106
+ # Calculate new position based on heading
107
+ new_lat, new_lng = self.project_position(lat, lng, heading, distance_km)
108
+
109
+ # Check if projected position violates zones
110
+ future_check = self.check_zone_violation(new_lat, new_lng)
111
+
112
+ if future_check['violations']:
113
+ # Calculate actual time to violation (more accurate)
114
+ target_zone = future_check['violations'][0]
115
+ time_to_violation = self.calculate_time_to_zone(
116
+ lat, lng, heading, speed_kmh,
117
+ target_zone['zone_id']
118
+ )
119
+
120
+ return {
121
+ 'willViolate': True,
122
+ 'timeToViolation': time_to_violation, # minutes
123
+ 'targetZone': target_zone,
124
+ 'projectedPosition': {
125
+ 'lat': new_lat,
126
+ 'lng': new_lng,
127
+ 'timeAhead': time_horizon_minutes
128
+ },
129
+ 'severity': 'high',
130
+ 'confidence': self.calculate_trajectory_confidence(speed_kmh, heading)
131
+ }
132
+
133
+ return {'willViolate': False, 'confidence': 0.0}
134
+
135
+ def calculate_time_to_zone(
136
+ self,
137
+ lat: float,
138
+ lng: float,
139
+ heading: float,
140
+ speed_kmh: float,
141
+ zone_id: str
142
+ ) -> Optional[float]:
143
+ """Calculate estimated time (in minutes) until vessel enters zone"""
144
+ zone = next((z for z in self.zones if z['zone_id'] == zone_id), None)
145
+ if not zone:
146
+ return None
147
+
148
+ # Distance to zone center
149
+ distance_km = self.haversine_distance(lat, lng, zone['latitude'], zone['longitude'])
150
+
151
+ # Subtract zone radius
152
+ distance_to_boundary = distance_km - zone.get('radius_km', 0)
153
+
154
+ if distance_to_boundary <= 0 or speed_kmh == 0:
155
+ return 0 # Already inside or not moving
156
+
157
+ # Time = distance / speed (in hours) * 60 (to minutes)
158
+ time_hours = distance_to_boundary / speed_kmh
159
+ return time_hours * 60
160
+
161
+ def calculate_trajectory_confidence(self, speed_kmh: float, heading: float) -> float:
162
+ """Calculate confidence score for trajectory prediction (0-1)"""
163
+ # Higher speed = more reliable trajectory prediction
164
+ speed_confidence = min(speed_kmh / 30.0, 1.0) # Max confidence at 30 km/h
165
+
166
+ # Valid heading check
167
+ heading_confidence = 1.0 if 0 <= heading <= 360 else 0.5
168
+
169
+ # Combined confidence
170
+ return (speed_confidence + heading_confidence) / 2
171
+
172
+ def project_position(
173
+ self,
174
+ lat: float,
175
+ lng: float,
176
+ heading: float,
177
+ distance_km: float
178
+ ) -> Tuple[float, float]:
179
+ """Project position based on heading and distance (Great Circle calculation)"""
180
+ R = 6371 # Earth radius in km
181
+
182
+ bearing = np.radians(heading)
183
+ lat_rad = np.radians(lat)
184
+ lng_rad = np.radians(lng)
185
+
186
+ # Haversine formula for new position
187
+ new_lat_rad = np.arcsin(
188
+ np.sin(lat_rad) * np.cos(distance_km / R) +
189
+ np.cos(lat_rad) * np.sin(distance_km / R) * np.cos(bearing)
190
+ )
191
+
192
+ new_lng_rad = lng_rad + np.arctan2(
193
+ np.sin(bearing) * np.sin(distance_km / R) * np.cos(lat_rad),
194
+ np.cos(distance_km / R) - np.sin(lat_rad) * np.sin(new_lat_rad)
195
+ )
196
+
197
+ return np.degrees(new_lat_rad), np.degrees(new_lng_rad)
198
+
199
+ def haversine_distance(
200
+ self,
201
+ lat1: float,
202
+ lon1: float,
203
+ lat2: float,
204
+ lon2: float
205
+ ) -> float:
206
+ """Calculate distance between two points in km"""
207
+ R = 6371 # Earth radius in km
208
+ dlat = np.radians(lat2 - lat1)
209
+ dlon = np.radians(lon2 - lon1)
210
+ a = (np.sin(dlat/2)**2 +
211
+ np.cos(np.radians(lat1)) * np.cos(np.radians(lat2)) * np.sin(dlon/2)**2)
212
+ c = 2 * np.arctan2(np.sqrt(a), np.sqrt(1-a))
213
+ return R * c
214
+
215
+ def get_zone_by_id(self, zone_id: str) -> Optional[Dict]:
216
+ """Get zone details by ID"""
217
+ return next((z for z in self.zones if z['zone_id'] == zone_id), None)
218
+
219
+ def get_all_zones(self) -> List[Dict]:
220
+ """Get all prohibited zones"""
221
+ return self.zones
222
+
223
+ def get_zones_in_area(
224
+ self,
225
+ center_lat: float,
226
+ center_lng: float,
227
+ radius_km: float
228
+ ) -> List[Dict]:
229
+ """Get all zones within specified radius of a point"""
230
+ nearby_zones = []
231
+
232
+ for zone in self.zones:
233
+ distance = self.haversine_distance(
234
+ center_lat, center_lng,
235
+ zone['latitude'], zone['longitude']
236
+ )
237
+
238
+ if distance <= radius_km:
239
+ nearby_zones.append({
240
+ **zone,
241
+ 'distance_from_center': distance
242
+ })
243
+
244
+ return sorted(nearby_zones, key=lambda x: x['distance_from_center'])