File size: 5,312 Bytes
89e6f26
dd6e252
 
 
 
89e6f26
ae9550f
dd6e252
 
 
6581e5a
dd6e252
89e6f26
 
6581e5a
89e6f26
6581e5a
 
89e6f26
 
dd6e252
 
ae9550f
 
dd6e252
89e6f26
dd6e252
 
 
 
89e6f26
dd6e252
 
 
 
 
 
 
 
89e6f26
dd6e252
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6581e5a
dd6e252
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6581e5a
dd6e252
 
 
 
 
 
 
 
 
 
 
 
 
 
6581e5a
dd6e252
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6581e5a
dd6e252
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import requests
import time
from datetime import datetime
from fastapi.middleware.cors import CORSMiddleware
import os
app = FastAPI(title="IoT Traffic Monitor API", 
              description="API to get the latest traffic status from IoT devices",
              version="1.0.0")

# Enable CORS
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Firebase configuration - same as in your HTML file
firebase_config = {
    "apiKey": os.getenv('a'),
    "projectId": os.getenv('b')
}

class TrafficResponse(BaseModel):
    detected_count: int
    traffic_status: str
    timestamp: str

def get_traffic_status(detected_count):
    """Determine traffic status based on detected count."""
    if detected_count < 4:
        return "LIGHT TRAFFIC"
    elif detected_count < 7:
        return "MEDIUM TRAFFIC"
    else:
        return "HEAVY TRAFFIC"

def fetch_latest_data():
    """Fetch the latest data from the IOT collection using Firebase REST API."""
    try:
        # Firestore REST API endpoint for querying collections
        base_url = f"https://firestore.googleapis.com/v1/projects/{firebase_config['projectId']}/databases/(default)/documents/IOT"
        
        # Make the request to get all documents
        response = requests.get(base_url, params={"key": firebase_config["apiKey"]})
        
        if response.status_code == 200:
            data = response.json()
            if 'documents' in data and data['documents']:
                # Sort documents by timestamp field (if it exists)
                sorted_docs = sorted(
                    data['documents'], 
                    key=lambda doc: doc.get('fields', {}).get('timestamp', {}).get('stringValue', ''),
                    reverse=True  # Most recent first
                )
                
                # Get the most recent document
                if sorted_docs:
                    doc_data = parse_firestore_document(sorted_docs[0])
                    return doc_data
            
            return None
        else:
            raise HTTPException(status_code=503, detail=f"Error fetching data from Firebase: {response.status_code}")
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")

def parse_firestore_document(doc):
    """Parse Firestore document from REST API format to a simple dict."""
    result = {}
    
    if 'fields' in doc:
        for key, value in doc['fields'].items():
            # Extract the value based on its type
            for type_key, actual_value in value.items():
                if type_key == 'stringValue':
                    result[key] = actual_value
                elif type_key == 'integerValue':
                    result[key] = int(actual_value)
                elif type_key == 'doubleValue':
                    result[key] = float(actual_value)
                elif type_key == 'booleanValue':
                    result[key] = bool(actual_value)
                elif type_key == 'timestampValue':
                    result[key] = actual_value
                elif type_key == 'arrayValue':
                    # Handle array values if needed
                    result[key] = [parse_firestore_value(item) for item in actual_value.get('values', [])]
                elif type_key == 'mapValue':
                    # Handle nested maps if needed
                    if 'fields' in actual_value:
                        result[key] = {k: parse_firestore_value(v) for k, v in actual_value['fields'].items()}
                    else:
                        result[key] = {}
    
    return result

def parse_firestore_value(value_dict):
    """Helper function to parse Firestore values."""
    for type_key, actual_value in value_dict.items():
        if type_key == 'stringValue':
            return actual_value
        elif type_key == 'integerValue':
            return int(actual_value)
        elif type_key == 'doubleValue':
            return float(actual_value)
        elif type_key == 'booleanValue':
            return bool(actual_value)
        elif type_key == 'timestampValue':
            return actual_value
    return None

@app.get("/", response_model=TrafficResponse)
async def get_traffic():
    """
    Get the latest traffic information
    
    Returns:
        - detected_count: Number of objects detected by the IoT device
        - traffic_status: Current traffic status classification (LIGHT/MEDIUM/HEAVY)
        - timestamp: Timestamp of the detection
    """
    data = fetch_latest_data()
    
    if not data or 'detected_count' not in data:
        raise HTTPException(status_code=404, detail="No traffic data available")
    
    detected_count = data.get('detected_count', 0)
    traffic_status = get_traffic_status(detected_count)
    timestamp = data.get('timestamp', datetime.now().isoformat())
    
    return TrafficResponse(
        detected_count=detected_count,
        traffic_status=traffic_status,
        timestamp=timestamp
    )

@app.get("/health")
async def health_check():
    """Health check endpoint"""
    return {"status": "healthy", "timestamp": datetime.now().isoformat()}