IOTcloudDB / main.py
Rakshitjan's picture
Update main.py
ae9550f verified
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import requests
import time
from datetime import datetime
from fastapi.middleware.cors import CORSMiddleware
import os
# Application object; every route and middleware below is registered on it.
app = FastAPI(
    title="IoT Traffic Monitor API",
    description="API to get the latest traffic status from IoT devices",
    version="1.0.0",
)
# Enable CORS so browser clients served from any origin can call this API.
# Credentials stay disabled: the FastAPI CORS documentation states that
# wildcard origins/methods/headers must not be combined with
# allow_credentials=True, and no endpoint in this file reads cookies or
# authorization headers.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Firebase settings (the same values the HTML client uses). Both are read from
# environment variables that are literally named 'a' and 'b'; a value is None
# when its variable is not set.
firebase_config = dict(
    apiKey=os.environ.get('a'),
    projectId=os.environ.get('b'),
)
# Response body of the "/" endpoint (declared there as its response_model).
class TrafficResponse(BaseModel):
    # Value of the 'detected_count' field of the latest IOT document.
    detected_count: int
    # Label produced by get_traffic_status() for that count.
    traffic_status: str
    # The document's 'timestamp' field, or the server's current time when the
    # document has none.
    timestamp: str
def get_traffic_status(detected_count):
    """Classify a detection count as light, medium or heavy traffic.

    Counts below 4 are light, counts below 7 are medium, and everything else
    is heavy.
    """
    # Ordered (exclusive upper bound, label) pairs; the first bound that the
    # count falls under decides the label.
    bands = (
        (4, "LIGHT TRAFFIC"),
        (7, "MEDIUM TRAFFIC"),
    )
    for upper_bound, label in bands:
        if detected_count < upper_bound:
            return label
    return "HEAVY TRAFFIC"
def fetch_latest_data():
    """Fetch the latest document of the IOT collection via the Firestore REST API.

    The documents returned by the list call are compared on the string value
    of their ``timestamp`` field (a missing field counts as ``''``) and the
    one with the greatest value is parsed with ``parse_firestore_document``.

    Returns:
        The parsed document as a dict, or ``None`` when the response contains
        no documents.

    Raises:
        HTTPException: 503 when Firestore answers with a non-200 status;
            500 for any other failure (network error, timeout, invalid JSON,
            parsing error).
    """
    try:
        # Firestore REST API endpoint for listing the collection's documents.
        base_url = (
            "https://firestore.googleapis.com/v1/projects/"
            f"{firebase_config['projectId']}/databases/(default)/documents/IOT"
        )
        # A timeout keeps a stalled connection from hanging the request forever.
        response = requests.get(
            base_url,
            params={"key": firebase_config["apiKey"]},
            timeout=10,
        )
        if response.status_code != 200:
            raise HTTPException(
                status_code=503,
                detail=f"Error fetching data from Firebase: {response.status_code}",
            )

        # NOTE(review): only the documents in this single response are
        # considered; no paging token is followed - confirm the collection
        # fits in one page.
        documents = response.json().get('documents')
        if not documents:
            return None

        # max() picks the first document with the greatest timestamp, which is
        # the same element a stable descending sort would put first.
        latest = max(
            documents,
            key=lambda doc: doc.get('fields', {}).get('timestamp', {}).get('stringValue', ''),
        )
        return parse_firestore_document(latest)
    except HTTPException:
        # Keep the deliberate 503 above instead of re-wrapping it as a 500.
        raise
    except Exception as e:
        # NOTE(review): str(e) of a requests error may include the request
        # URL with its query string - confirm exposing it to clients is fine.
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") from e
def parse_firestore_document(doc):
    """Flatten a Firestore REST document into a plain ``{field: value}`` dict.

    Each entry of ``doc['fields']`` is a ``{type_key: raw_value}`` wrapper.
    String and timestamp values are kept as they are, integer/double/boolean
    values are converted with ``int``/``float``/``bool``, array and map
    members are converted with ``parse_firestore_value``. Fields wrapped in
    any other type key are left out of the result. A document without
    ``fields`` yields an empty dict.
    """
    # Converters for the type keys that need no recursion.
    scalar_casts = {
        'stringValue': lambda raw: raw,
        'integerValue': int,
        'doubleValue': float,
        'booleanValue': bool,
        'timestampValue': lambda raw: raw,
    }

    parsed = {}
    if 'fields' not in doc:
        return parsed

    for field_name, typed_value in doc['fields'].items():
        for kind, raw in typed_value.items():
            cast = scalar_casts.get(kind)
            if cast is not None:
                parsed[field_name] = cast(raw)
            elif kind == 'arrayValue':
                # An array wrapper without 'values' is treated as empty.
                parsed[field_name] = [
                    parse_firestore_value(element)
                    for element in raw.get('values', [])
                ]
            elif kind == 'mapValue':
                # A map wrapper without 'fields' becomes an empty dict.
                nested = raw['fields'] if 'fields' in raw else {}
                parsed[field_name] = {
                    name: parse_firestore_value(inner)
                    for name, inner in nested.items()
                }
    return parsed
def parse_firestore_value(value_dict):
    """Convert one Firestore REST ``{type_key: raw_value}`` wrapper to Python.

    String and timestamp values are returned unchanged; integer, double and
    boolean values are converted with ``int``, ``float`` and ``bool``. Array
    and map values are decoded recursively, so nested structures are kept
    instead of collapsing to ``None``.

    Returns:
        The converted value, or ``None`` when the wrapper is empty or holds
        only type keys this function does not handle (for example
        ``nullValue``).
    """
    for type_key, actual_value in value_dict.items():
        if type_key in ('stringValue', 'timestampValue'):
            return actual_value
        if type_key == 'integerValue':
            return int(actual_value)
        if type_key == 'doubleValue':
            return float(actual_value)
        if type_key == 'booleanValue':
            return bool(actual_value)
        if type_key == 'arrayValue':
            # An array wrapper without 'values' is treated as empty.
            return [
                parse_firestore_value(item)
                for item in actual_value.get('values', [])
            ]
        if type_key == 'mapValue':
            # A map wrapper without 'fields' becomes an empty dict.
            return {
                name: parse_firestore_value(inner)
                for name, inner in actual_value.get('fields', {}).items()
            }
    return None
@app.get("/", response_model=TrafficResponse)
async def get_traffic():
    """
    Get the latest traffic information
    Returns:
    - detected_count: Number of objects detected by the IoT device
    - traffic_status: Current traffic status classification (LIGHT/MEDIUM/HEAVY)
    - timestamp: Timestamp of the detection
    """
    # Imported locally so this handler does not depend on a file-level import.
    from fastapi.concurrency import run_in_threadpool

    # fetch_latest_data() performs a blocking HTTP request; running it in the
    # thread pool keeps the event loop free to serve other requests meanwhile.
    # Any HTTPException it raises propagates unchanged through the await.
    data = await run_in_threadpool(fetch_latest_data)

    # No document at all, or a document without a count, is reported as 404.
    if not data or 'detected_count' not in data:
        raise HTTPException(status_code=404, detail="No traffic data available")

    detected_count = data['detected_count']
    traffic_status = get_traffic_status(detected_count)
    # Fall back to the server's current time when the document has no timestamp.
    timestamp = data.get('timestamp', datetime.now().isoformat())

    return TrafficResponse(
        detected_count=detected_count,
        traffic_status=traffic_status,
        timestamp=timestamp,
    )
@app.get("/health")
async def health_check():
    """Health check endpoint"""
    # Always reports "healthy" together with the server's current local time;
    # nothing else (such as Firestore reachability) is checked here.
    checked_at = datetime.now().isoformat()
    return dict(status="healthy", timestamp=checked_at)