Spaces:
Runtime error
Runtime error
import logging
import os

import joblib
import pandas as pd
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field

from Preprocessing import preprocess_input, model
# Application object; title and description are the metadata handed to FastAPI.
app = FastAPI(
    title="Electricity Cost Prediction API",
    description="Predicts electricity cost based on facility and operational parameters",
)

# Fail fast at import time: `model` comes from the Preprocessing module and the
# prediction handler calls `model.predict`, so a missing model is unrecoverable.
if model is None:
    raise RuntimeError(
        "Critical Error: ML model failed to load from external source during application startup."
    )
class ElectricityInput(BaseModel):
    # Request schema for one prediction. Every field is required (no defaults).
    # Field order is kept as declared because the dumped dict is handed to
    # `preprocess_input` as-is.
    # NOTE(review): `air_qality_index` and `issue_reolution_time` look
    # misspelled, but they are the request keys; renaming them would change
    # the API contract, so they are left untouched here.
    site_area: float = Field(
        ...,
        description="Area of the site in square units",
    )
    structure_type: str = Field(
        ...,
        description="Type of structure (e.g., 'residential', 'commercial')",
    )
    water_consumption: float = Field(
        ...,
        description="Daily/monthly water consumption",
    )
    recycling_rate: float = Field(
        ...,
        description="Percentage of waste recycled",
    )
    utilisation_rate: float = Field(
        ...,
        description="Rate of facility utilization",
    )
    air_qality_index: float = Field(
        ...,
        description="Air quality index",
    )
    issue_reolution_time: float = Field(
        ...,
        description="Time taken to resolve issues (e.g., in hours)",
    )
    resident_count: int = Field(
        ...,
        description="Number of residents/occupants",
    )
# Incoming request fields are checked (validated) and converted into the Pydantic
# object defined above before they are used, which keeps the input safe and well-formed.
# Main API code follows: the prediction handler that feeds the input to the model.
async def predict_electricity_cost(data: ElectricityInput):
    """Predict the total electricity cost based on the provided input features.

    The validated request is dumped to a plain dict, passed through
    ``preprocess_input``, and the first element returned by ``model.predict``
    is converted to ``float`` and rounded to two decimal places.

    Args:
        data: Validated request payload.

    Returns:
        A dict of the form ``{"predicted_electricity_cost": <float>}``.

    Raises:
        HTTPException: Re-raised unchanged if the pipeline itself raised one;
            otherwise status 500 wrapping any unexpected failure.
    """
    # NOTE(review): no route decorator is visible on this function in this
    # view; confirm that it is registered on `app`.
    # NOTE(review): `model.predict` is called synchronously inside an async
    # handler; confirm that blocking the event loop here is acceptable.
    try:
        input_data_dict = data.model_dump()
        processed_df = preprocess_input(input_data_dict)
        prediction = model.predict(processed_df)[0]
        predicted_cost = round(float(prediction), 2)
        return {"predicted_electricity_cost": predicted_cost}
    except HTTPException:
        # A deliberate HTTP error raised inside the pipeline must reach the
        # client with its own status code instead of being rewritten as a 500.
        raise
    except Exception as e:
        # Log with the full traceback; a bare print would lose it.
        logging.getLogger(__name__).exception(
            "An unexpected error occurred during prediction"
        )
        # NOTE(review): the raw exception text is echoed to the client, as in
        # the original; confirm this is intended for a public deployment.
        raise HTTPException(
            status_code=500,  # Internal server error
            detail=f"An internal server error occurred during prediction. Error : {e}",
        ) from e
# Overall data flow in this API file:
# user input (JSON) -> Pydantic object -> dictionary -> DataFrame -> model.predict()
async def health_check():
    # Health probe: takes no input and returns a fixed status payload.
    # Declared async like the prediction handler; nothing is awaited here.
    # NOTE(review): no route decorator is visible on this function in this
    # view; confirm that it is registered on `app`.
    payload = {
        "status": "ok",
        "message": "Electricity Cost Prediction API is running accurately!",
    }
    return payload
# That is the complete API implementation. A Dockerfile and a .dockerignore are
# also included in this folder so the service can be packaged into a container
# and deployed. Likely error points are marked throughout the code so that
# incoming issues can be resolved quickly.