Spaces:
Sleeping
Sleeping
File size: 5,312 Bytes
89e6f26 dd6e252 89e6f26 ae9550f dd6e252 6581e5a dd6e252 89e6f26 6581e5a 89e6f26 6581e5a 89e6f26 dd6e252 ae9550f dd6e252 89e6f26 dd6e252 89e6f26 dd6e252 89e6f26 dd6e252 6581e5a dd6e252 6581e5a dd6e252 6581e5a dd6e252 6581e5a dd6e252 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 |
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import requests
import time
from datetime import datetime
from fastapi.middleware.cors import CORSMiddleware
import os
# Application instance with its title, description and version metadata.
app = FastAPI(
    title="IoT Traffic Monitor API",
    description="API to get the latest traffic status from IoT devices",
    version="1.0.0",
)

# Enable CORS for every origin, method and header.
# NOTE(review): FastAPI's CORS documentation advises against combining
# wildcard origins with allow_credentials=True -- confirm this is intended.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Firebase configuration, read from the environment variables named "a"
# (API key) and "b" (project id). Either value is None when the variable
# is unset. Per the original author's note these are expected to match the
# values used by the accompanying HTML client.
firebase_config = dict(
    apiKey=os.environ.get('a'),
    projectId=os.environ.get('b'),
)
class TrafficResponse(BaseModel):
    """Response body of the root ("/") traffic endpoint."""

    # Number of objects reported in the latest IoT record.
    detected_count: int
    # Classification string derived from detected_count.
    traffic_status: str
    # Timestamp taken from the record, or generated when the record has none.
    timestamp: str
def get_traffic_status(detected_count, *, light_below=4, medium_below=7):
    """Classify a detection count as light, medium or heavy traffic.

    Args:
        detected_count: Number of detected objects; compared numerically
            against the two thresholds.
        light_below: Counts strictly below this value are light traffic.
        medium_below: Counts from ``light_below`` up to, but excluding,
            this value are medium traffic; anything else is heavy.

    Returns:
        "LIGHT TRAFFIC", "MEDIUM TRAFFIC" or "HEAVY TRAFFIC".
    """
    if detected_count < light_below:
        return "LIGHT TRAFFIC"
    if detected_count < medium_below:
        return "MEDIUM TRAFFIC"
    return "HEAVY TRAFFIC"
def _timestamp_sort_key(doc):
    """Return the string used to order a raw Firestore document by time."""
    timestamp_field = doc.get('fields', {}).get('timestamp', {})
    # Accept both a plain string field and a native Firestore timestamp;
    # documents without either sort first (empty string).
    # NOTE(review): ordering is lexicographic, so it is only chronological
    # when the stored values are ISO-8601-like strings -- confirm.
    return (
        timestamp_field.get('stringValue')
        or timestamp_field.get('timestampValue')
        or ''
    )


def fetch_latest_data(timeout=10):
    """Fetch the latest record from the IOT collection via the Firebase REST API.

    Args:
        timeout: Seconds to wait for Firestore before giving up, so a
            stalled connection cannot block the caller indefinitely.

    Returns:
        The most recent document as a plain dict (see
        ``parse_firestore_document``), or ``None`` when the response lists
        no documents.

    Raises:
        HTTPException: 503 when Firestore answers with a non-200 status;
            500 for any other failure (network error, invalid JSON, ...).
    """
    # Firestore REST API endpoint listing the documents of the IOT collection
    base_url = (
        "https://firestore.googleapis.com/v1/projects/"
        f"{firebase_config['projectId']}/databases/(default)/documents/IOT"
    )
    try:
        response = requests.get(
            base_url,
            params={"key": firebase_config["apiKey"]},
            timeout=timeout,
        )
        if response.status_code != 200:
            raise HTTPException(status_code=503, detail=f"Error fetching data from Firebase: {response.status_code}")

        data = response.json()
        documents = data.get('documents') if isinstance(data, dict) else None
        if not documents:
            return None

        # NOTE(review): only the single response returned by this request is
        # inspected; if the list endpoint paginates, newer documents on later
        # pages would be missed -- consider an ordered, limited query.
        # max() returns the same document a descending stable sort would
        # put first, without sorting the whole list.
        latest_doc = max(documents, key=_timestamp_sort_key)
        return parse_firestore_document(latest_doc)
    except HTTPException:
        # Let the deliberate 503 above through instead of re-wrapping it as 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") from e
# Sentinel for value types this module does not decode (distinct from None,
# which callers of parse_firestore_value already receive for such values).
_UNSUPPORTED = object()

# Converters for scalar Firestore value types, keyed by the REST type tag.
_SCALAR_DECODERS = {
    'stringValue': lambda raw: raw,
    'integerValue': int,  # converted with int(); may arrive as a string
    'doubleValue': float,
    'booleanValue': bool,
    'timestampValue': lambda raw: raw,  # kept as the string it arrives as
}


def _decode_firestore_value(value_dict):
    """Decode one typed Firestore value, or return ``_UNSUPPORTED``."""
    for type_key, raw in value_dict.items():
        decoder = _SCALAR_DECODERS.get(type_key)
        if decoder is not None:
            return decoder(raw)
        if type_key == 'arrayValue':
            # An empty array is sent without a 'values' key.
            return [parse_firestore_value(item) for item in raw.get('values', [])]
        if type_key == 'mapValue':
            # An empty map is sent without a 'fields' key.
            return {
                name: parse_firestore_value(inner)
                for name, inner in raw.get('fields', {}).items()
            }
    return _UNSUPPORTED


def parse_firestore_document(doc):
    """Parse a Firestore document from REST API format to a simple dict.

    Args:
        doc: A document dict as returned by the REST API; its ``fields``
            entry maps field names to typed value dicts.

    Returns:
        A dict mapping each field name to its decoded Python value.
        Fields whose type is not supported are omitted; a document without
        ``fields`` yields an empty dict.
    """
    result = {}
    for key, value in doc.get('fields', {}).items():
        decoded = _decode_firestore_value(value)
        if decoded is not _UNSUPPORTED:
            result[key] = decoded
    return result


def parse_firestore_value(value_dict):
    """Parse a single typed Firestore value into a Python value.

    Strings and timestamps are returned unchanged, integers and doubles are
    converted with ``int``/``float``, booleans with ``bool``. Arrays and
    maps are decoded recursively, so nested structures are preserved.

    Returns:
        The decoded value, or ``None`` for an unsupported value type.
    """
    decoded = _decode_firestore_value(value_dict)
    return None if decoded is _UNSUPPORTED else decoded
@app.get("/", response_model=TrafficResponse)
async def get_traffic():
    """
    Get the latest traffic information

    Returns:
    - detected_count: Number of objects detected by the IoT device
    - traffic_status: Current traffic status classification (LIGHT/MEDIUM/HEAVY)
    - timestamp: Timestamp of the detection

    Raises:
    - HTTPException 404 when no record with a detected_count is available
    - HTTPException 500 when the stored detected_count is not numeric
    """
    # Function-scope import keeps this block self-contained; the helper
    # ships with FastAPI, which this module already depends on.
    from fastapi.concurrency import run_in_threadpool

    # fetch_latest_data performs a blocking HTTP request; run it in a worker
    # thread so it does not stall the event loop for other requests.
    data = await run_in_threadpool(fetch_latest_data)
    if not data or 'detected_count' not in data:
        raise HTTPException(status_code=404, detail="No traffic data available")

    # The record may carry the count as a string or float; normalise it
    # before it is compared against the integer thresholds.
    try:
        detected_count = int(data['detected_count'])
    except (TypeError, ValueError, OverflowError) as exc:
        raise HTTPException(
            status_code=500,
            detail="Latest traffic record has a non-numeric detected_count",
        ) from exc

    # Fall back to the current server time when the record has no timestamp.
    timestamp = data.get('timestamp', datetime.now().isoformat())
    return TrafficResponse(
        detected_count=detected_count,
        traffic_status=get_traffic_status(detected_count),
        timestamp=str(timestamp),
    )
@app.get("/health")
async def health_check():
    """Health check endpoint.

    Returns:
        A dict with a fixed "healthy" status and the server's current
        local time in ISO 8601 format.
    """
    return {"status": "healthy", "timestamp": datetime.now().isoformat()}