"""HTTP API that serves electricity cost predictions from a pre-loaded ML model.

Request flow for ``POST /predict``:
JSON body -> ``ElectricityInput`` (pydantic validation) -> dict
-> ``preprocess_input`` -> ``model.predict`` -> JSON response.
"""

import logging
import os

import joblib
import pandas as pd
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field

from Preprocessing import model, preprocess_input

logger = logging.getLogger(__name__)

app = FastAPI(
    title="Electricity Cost Prediction API",
    description="Predicts electricity cost based on facility and operational parameters",
)

# Fail fast at import time: without a model every /predict call would fail,
# so refuse to start the application at all.
if model is None:
    raise RuntimeError(
        "Critical Error: ML model failed to load from external source during application startup."
    )


class ElectricityInput(BaseModel):
    """Request body for ``POST /predict``; every field is required.

    The field names are the JSON keys clients send and the dict keys handed to
    ``preprocess_input``.
    """

    site_area: float = Field(..., description="Area of the site in square units")
    structure_type: str = Field(
        ..., description="Type of structure (e.g., 'residential', 'commercial')"
    )
    water_consumption: float = Field(..., description="Daily/monthly water consumption")
    recycling_rate: float = Field(..., description="Percentage of waste recycled")
    utilisation_rate: float = Field(..., description="Rate of facility utilization")
    # NOTE(review): "air_qality_index" and "issue_reolution_time" are misspelled,
    # but they are part of the wire contract and presumably match the keys
    # preprocess_input expects -- rename only together with clients and
    # preprocessing.
    air_qality_index: float = Field(..., description="Air quality index")
    issue_reolution_time: float = Field(
        ..., description="Time taken to resolve issues (e.g., in hours)"
    )
    resident_count: int = Field(..., description="Number of residents/occupants")


@app.post("/predict")
async def predict_electricity_cost(data: ElectricityInput):
    """Predicts the total electricity cost based on the provided input features.

    The validated input is dumped to a plain dict, passed through
    ``preprocess_input``, and the first value returned by ``model.predict`` is
    rounded to two decimals.

    Returns:
        ``{"predicted_electricity_cost": <float>}``.

    Raises:
        HTTPException: status 500 when preprocessing or prediction fails
            unexpectedly. An ``HTTPException`` raised by the pipeline itself
            is propagated unchanged.
    """
    # NOTE(review): preprocess_input / model.predict run synchronously inside
    # this coroutine, so a slow prediction blocks the event loop (including
    # /health) until it finishes. Consider a threadpool if latency matters.
    try:
        input_data_dict = data.model_dump()
        processed_df = preprocess_input(input_data_dict)
        prediction = model.predict(processed_df)[0]
        predicted_cost = round(float(prediction), 2)
    except HTTPException:
        # Keep a deliberate status code from the pipeline instead of
        # rewrapping it as a generic 500.
        raise
    except Exception as e:
        # logger.exception records the full traceback, which print() lost.
        logger.exception("An unexpected error occurred during prediction")
        # NOTE(review): the detail exposes the raw exception text to the
        # client; kept for backward compatibility, consider a generic message.
        raise HTTPException(
            status_code=500,  # Internal Server Error
            detail=f"An internal server error occurred during prediction. \nError : {e}",
        ) from e

    return {"predicted_electricity_cost": predicted_cost}


@app.get("/health")
async def health_check():
    """Liveness probe: return a static payload showing the app is serving."""
    return {
        "status": "ok",
        "message": "Electricity Cost Prediction API is running accurately!",
    }


# Deployment: per the original author's note, a Dockerfile and .dockerignore
# in this folder package the service into a container.