acc_api / api /services /analyzer_service.py
Adit1Sharma's picture
Initial commit: FastAPI Incident Analyzer for HF Spaces
8e6f164
Raw
History Blame Contribute Delete
5.8 kB
class IncidentAnalyzer:
@staticmethod
def classify_incident(labels: set[str]) -> str:
"""
Determines the specific incident classification type based on the detected labels.
"""
vehicles = {
"car",
"truck",
"bus",
"motorcycle",
"damaged vehicle"
}
vehicle_count = len(labels.intersection(vehicles))
# ==========================
# FIRE RELATED
# ==========================
if "fire" in labels and "building" in labels:
return "building_fire"
if "fire" in labels and vehicle_count > 0:
return "vehicle_fire"
if "fire" in labels:
return "fire_incident"
if "smoke" in labels and "building" in labels:
return "possible_building_fire"
if "smoke" in labels:
return "smoke_hazard"
# ==========================
# FLOOD RELATED
# ==========================
if "water" in labels and "road" in labels:
return "road_flooding"
if "water" in labels and "building" in labels:
return "urban_flooding"
if "water" in labels and "tree" in labels:
return "storm_damage"
if "water" in labels:
return "water_hazard"
# ==========================
# BUILDING DAMAGE
# ==========================
if "collapsed structure" in labels and "person" in labels:
return "major_building_collapse"
if "collapsed structure" in labels:
return "building_collapse"
# ==========================
# ROAD ACCIDENTS
# ==========================
if "damaged vehicle" in labels and "ambulance" in labels:
return "critical_road_accident"
if vehicle_count >= 2 and "person" in labels:
return "road_accident"
if vehicle_count >= 2:
return "possible_vehicle_collision"
if "motorcycle" in labels and "ambulance" in labels:
return "motorcycle_accident"
if "truck" in labels and "ambulance" in labels:
return "truck_accident"
if "bus" in labels and "ambulance" in labels:
return "bus_accident"
# ==========================
# TRAFFIC RELATED
# ==========================
if "traffic congestion" in labels and vehicle_count >= 2:
return "heavy_traffic"
if "construction barrier" in labels and "road" in labels:
return "road_construction"
# ==========================
# ROAD BLOCKAGE
# ==========================
if "tree" in labels and "road" in labels:
return "fallen_tree_blockage"
if "debris" in labels and "road" in labels:
return "road_debris"
if "tree" in labels and "debris" in labels:
return "storm_road_blockage"
# ==========================
# MEDICAL
# ==========================
if "ambulance" in labels and "person" in labels:
return "medical_emergency"
# ==========================
# LAW ENFORCEMENT
# ==========================
if "police vehicle" in labels and vehicle_count > 0:
return "traffic_enforcement"
if "police vehicle" in labels:
return "police_activity"
# ==========================
# GENERAL EVENTS
# ==========================
if vehicle_count > 0:
return "vehicle_activity"
if "building" in labels:
return "building_related_event"
if "person" in labels:
return "crowd_activity"
return "unknown_incident"
@classmethod
def analyze(cls, labels_found: list[str]) -> dict:
"""
Calculates severity scores, severity categorizations, and determines
the incident type classification.
"""
labels = set(x.lower().strip() for x in labels_found)
score = 0
# Object weight mapping for severity scoring
weights = {
"ambulance": 35,
"fire": 40,
"smoke": 20,
"water": 20,
"person": 5,
"debris": 15,
"tree": 10,
"police vehicle": 15,
"damaged vehicle": 25
}
# Accumulate weights of detected labels
for label in labels:
score += weights.get(label, 0)
vehicles = {
"car",
"truck",
"bus",
"motorcycle",
"damaged vehicle"
}
vehicle_count = len(labels.intersection(vehicles))
# Scoring modifiers based on incident context
# Fire
if "fire" in labels:
score += 20
# Flood
elif "water" in labels and "road" in labels:
score += 20
# Accident
elif vehicle_count >= 2 and "person" in labels:
score += 25
elif "ambulance" in labels and vehicle_count >= 1:
score += 30
# Obstruction
elif "tree" in labels and "road" in labels:
score += 15
# Cap score at 100
score = min(score, 100)
# Categorize severity based on score thresholds
if score >= 80:
severity = "critical"
elif score >= 60:
severity = "high"
elif score >= 30:
severity = "medium"
else:
severity = "low"
# Get classified incident type
incident_type = cls.classify_incident(labels)
return {
"incident_type": incident_type,
"severity": severity,
"severity_score": score
}