Spaces:
Sleeping
Sleeping
| from fastapi import FastAPI, HTTPException | |
| from pydantic import BaseModel | |
| import requests | |
| import time | |
| from datetime import datetime | |
| from fastapi.middleware.cors import CORSMiddleware | |
| import os | |
| app = FastAPI(title="IoT Traffic Monitor API", | |
| description="API to get the latest traffic status from IoT devices", | |
| version="1.0.0") | |
| # Enable CORS | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=["*"], | |
| allow_credentials=True, | |
| allow_methods=["*"], | |
| allow_headers=["*"], | |
| ) | |
| # Firebase configuration - same as in your HTML file | |
| firebase_config = { | |
| "apiKey": os.getenv('a'), | |
| "projectId": os.getenv('b') | |
| } | |
| class TrafficResponse(BaseModel): | |
| detected_count: int | |
| traffic_status: str | |
| timestamp: str | |
| def get_traffic_status(detected_count): | |
| """Determine traffic status based on detected count.""" | |
| if detected_count < 4: | |
| return "LIGHT TRAFFIC" | |
| elif detected_count < 7: | |
| return "MEDIUM TRAFFIC" | |
| else: | |
| return "HEAVY TRAFFIC" | |
| def fetch_latest_data(): | |
| """Fetch the latest data from the IOT collection using Firebase REST API.""" | |
| try: | |
| # Firestore REST API endpoint for querying collections | |
| base_url = f"https://firestore.googleapis.com/v1/projects/{firebase_config['projectId']}/databases/(default)/documents/IOT" | |
| # Make the request to get all documents | |
| response = requests.get(base_url, params={"key": firebase_config["apiKey"]}) | |
| if response.status_code == 200: | |
| data = response.json() | |
| if 'documents' in data and data['documents']: | |
| # Sort documents by timestamp field (if it exists) | |
| sorted_docs = sorted( | |
| data['documents'], | |
| key=lambda doc: doc.get('fields', {}).get('timestamp', {}).get('stringValue', ''), | |
| reverse=True # Most recent first | |
| ) | |
| # Get the most recent document | |
| if sorted_docs: | |
| doc_data = parse_firestore_document(sorted_docs[0]) | |
| return doc_data | |
| return None | |
| else: | |
| raise HTTPException(status_code=503, detail=f"Error fetching data from Firebase: {response.status_code}") | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") | |
| def parse_firestore_document(doc): | |
| """Parse Firestore document from REST API format to a simple dict.""" | |
| result = {} | |
| if 'fields' in doc: | |
| for key, value in doc['fields'].items(): | |
| # Extract the value based on its type | |
| for type_key, actual_value in value.items(): | |
| if type_key == 'stringValue': | |
| result[key] = actual_value | |
| elif type_key == 'integerValue': | |
| result[key] = int(actual_value) | |
| elif type_key == 'doubleValue': | |
| result[key] = float(actual_value) | |
| elif type_key == 'booleanValue': | |
| result[key] = bool(actual_value) | |
| elif type_key == 'timestampValue': | |
| result[key] = actual_value | |
| elif type_key == 'arrayValue': | |
| # Handle array values if needed | |
| result[key] = [parse_firestore_value(item) for item in actual_value.get('values', [])] | |
| elif type_key == 'mapValue': | |
| # Handle nested maps if needed | |
| if 'fields' in actual_value: | |
| result[key] = {k: parse_firestore_value(v) for k, v in actual_value['fields'].items()} | |
| else: | |
| result[key] = {} | |
| return result | |
| def parse_firestore_value(value_dict): | |
| """Helper function to parse Firestore values.""" | |
| for type_key, actual_value in value_dict.items(): | |
| if type_key == 'stringValue': | |
| return actual_value | |
| elif type_key == 'integerValue': | |
| return int(actual_value) | |
| elif type_key == 'doubleValue': | |
| return float(actual_value) | |
| elif type_key == 'booleanValue': | |
| return bool(actual_value) | |
| elif type_key == 'timestampValue': | |
| return actual_value | |
| return None | |
| async def get_traffic(): | |
| """ | |
| Get the latest traffic information | |
| Returns: | |
| - detected_count: Number of objects detected by the IoT device | |
| - traffic_status: Current traffic status classification (LIGHT/MEDIUM/HEAVY) | |
| - timestamp: Timestamp of the detection | |
| """ | |
| data = fetch_latest_data() | |
| if not data or 'detected_count' not in data: | |
| raise HTTPException(status_code=404, detail="No traffic data available") | |
| detected_count = data.get('detected_count', 0) | |
| traffic_status = get_traffic_status(detected_count) | |
| timestamp = data.get('timestamp', datetime.now().isoformat()) | |
| return TrafficResponse( | |
| detected_count=detected_count, | |
| traffic_status=traffic_status, | |
| timestamp=timestamp | |
| ) | |
| async def health_check(): | |
| """Health check endpoint""" | |
| return {"status": "healthy", "timestamp": datetime.now().isoformat()} |