"""FastAPI service exposing a crop-yield regression model.

The pickled pipeline (``CropYieldPredictor.pkl``) references the
preprocessing functions and the custom selector defined below, so their
definitions (and the ``sys.modules['__main__']`` registration) must stay
importable under the same names for joblib to unpickle the model.
"""

import sys

import joblib
import numpy as np
import pandas as pd
from fastapi import FastAPI
from pydantic import BaseModel
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.impute import SimpleImputer
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import FunctionTransformer, OrdinalEncoder, StandardScaler
from sklearn.utils.validation import check_X_y, check_array, check_is_fitted

# ================================
# 1️⃣ Custom Preprocessing Functions
# ================================

def temp_cat(X):
    """Append a categorical temperature band column derived from ``avg_temp``.

    Returns the input (coerced to a DataFrame) with an extra
    ``avg_temp_cat`` column binned into five labeled temperature ranges.
    """
    X = pd.DataFrame(X)
    X['avg_temp_cat'] = pd.cut(
        X['avg_temp'],
        bins=[0, 5, 10, 20, 30, np.inf],
        labels=['very_cold', 'cold', 'warm', 'hot', 'very_hot'],
    )
    return X


def clean(X):
    """Drop rows containing any NaN.

    NOTE(review): dropping rows inside a pipeline transformer changes the
    row count, which can misalign X with y during ``fit`` — confirm the
    training pipeline accounts for this before reusing it.
    """
    return pd.DataFrame(X).dropna()


def proxy_humidity(X):
    """Append a rough humidity proxy: rainfall divided by (temperature + 1).

    The ``+ 1`` guards against division by zero at 0 °C (but not at -1 °C).
    """
    X = pd.DataFrame(X)
    X["proxy_humidity"] = X["average_rain_fall_mm_per_year"] / (X["avg_temp"] + 1)
    return X


# ================================
# 2️⃣ Transformers and Pipelines
# ================================

# Temperature banding followed by ordinal encoding; unknown categories at
# inference time are mapped to -1 instead of raising.
temp_cat_pipeline = make_pipeline(
    FunctionTransformer(temp_cat),
    OrdinalEncoder(handle_unknown='use_encoded_value', unknown_value=-1),
)

clean_pipeline = make_pipeline(
    FunctionTransformer(clean),
    StandardScaler(),
)

cat_pipeline = make_pipeline(
    SimpleImputer(strategy="most_frequent"),
    OrdinalEncoder(handle_unknown='use_encoded_value', unknown_value=-1),
)

proxy_humidity_pipeline = make_pipeline(
    FunctionTransformer(proxy_humidity),
    StandardScaler(),
)

square_pipeline = make_pipeline(FunctionTransformer(np.square), StandardScaler())
log_pipeline = make_pipeline(FunctionTransformer(np.log1p), StandardScaler())
default_num_pipeline = make_pipeline(StandardScaler())

# ================================
# 3️⃣ Custom Feature Selector
# ================================

class CorrelationThresholdSelector(BaseEstimator, TransformerMixin):
    """Drop low-variance features and prune clusters of inter-correlated ones.

    Features whose pairwise |correlation| exceeds ``threshold`` are grouped;
    from each group only the feature most correlated with the target is kept
    (ties broken by variance). If ``target_threshold`` > 0 and even the best
    feature of a group falls below it, the entire group is dropped.

    Parameters
    ----------
    threshold : float
        Absolute pairwise-correlation cutoff for clustering features.
    target_threshold : float
        Minimum |correlation| with the target required to keep a cluster's
        best feature (0.0 disables this filter).
    method : str
        Correlation method passed to pandas (``pearson`` etc.).
    min_variance : float
        Features with variance <= this value are dropped outright.
    """

    def __init__(self, threshold=0.9, target_threshold=0.0, method="pearson",
                 min_variance=0.0):
        self.threshold = threshold
        self.target_threshold = target_threshold
        self.method = method
        self.min_variance = min_variance

    def fit(self, X, y):
        """Learn which feature indices to keep; stores ``selected_features_``."""
        X_arr, y_arr = check_X_y(X, y, accept_sparse=False, dtype=np.float64)
        n_features = X_arr.shape[1]
        # Preserve incoming column names when X is a DataFrame; otherwise
        # synthesize f0..fN-1 so the correlation matrices stay labeled.
        self.feature_names_in_ = np.array(
            getattr(X, "columns", [f"f{i}" for i in range(n_features)])
        )
        X_df = pd.DataFrame(X_arr, columns=self.feature_names_in_)

        variances = X_df.var(numeric_only=True)
        # Set for O(1) membership tests inside the clustering loop below.
        low_var_idx = set(np.where(variances <= self.min_variance)[0].tolist())

        corr_mat = X_df.corr(method=self.method).abs().values
        np.fill_diagonal(corr_mat, 0.0)  # a feature never clusters with itself

        y_series = pd.Series(y_arr)
        target_corr = (
            X_df.corrwith(y_series, method=self.method).abs().fillna(0.0).values
        )

        visited, drops = set(), set()
        for i in range(n_features):
            if i in visited or i in low_var_idx:
                continue
            correlated_idx = set(np.where(corr_mat[i] > self.threshold)[0].tolist())
            cluster = {i} | correlated_idx
            visited |= cluster
            if len(cluster) > 1:
                # Keep the cluster member most correlated with the target;
                # break ties by higher variance.
                best = max(
                    cluster,
                    key=lambda idx: (target_corr[idx], X_df.iloc[:, idx].var()),
                )
                if self.target_threshold > 0 and target_corr[best] < self.target_threshold:
                    drops |= cluster  # whole cluster is uninformative
                else:
                    cluster.remove(best)
                    drops |= cluster
        drops |= low_var_idx
        self.selected_features_ = np.array(
            sorted(set(range(n_features)) - drops), dtype=int
        )
        return self

    def transform(self, X):
        """Return only the columns chosen during ``fit``."""
        check_is_fitted(self, "selected_features_")
        X_arr = check_array(X, accept_sparse=False, dtype=np.float64)
        return X_arr[:, self.selected_features_]


# ================================
# 4️⃣ Register Custom Functions for joblib
# ================================
# The model was pickled from a __main__ context, so joblib resolves these
# names via __main__ at load time; register them explicitly.
sys.modules['__main__'].temp_cat = temp_cat
sys.modules['__main__'].clean = clean
sys.modules['__main__'].proxy_humidity = proxy_humidity
sys.modules['__main__'].CorrelationThresholdSelector = CorrelationThresholdSelector

# ================================
# 5️⃣ Initialize FastAPI
# ================================
app = FastAPI(title="🌾 Crop Yield Predictor API", version="1.0")

# ================================
# 6️⃣ Load Model
# ================================
# SECURITY NOTE: joblib.load unpickles arbitrary code — only ever load a
# model file from a trusted source.
try:
    model = joblib.load("CropYieldPredictor.pkl")
    print("✅ Model loaded successfully!")
except Exception as e:
    # Keep the app up so "/" still responds; /predict reports the failure.
    print(f"❌ Error loading model: {e}")
    model = None

# ================================
# 7️⃣ Define Input Schema
# ================================

class CropInput(BaseModel):
    """Request payload for /predict — one observation of crop conditions."""

    Area: str
    Item: str
    Year: int
    average_rain_fall_mm_per_year: float
    pesticides_tonnes: float
    avg_temp: float


# ================================
# 8️⃣ Routes
# ================================

@app.get("/")
def home():
    """Liveness check."""
    return {"message": "🌾 Crop Yield Predictor API is live and running!"}


@app.post("/predict")
def predict_yield(data: CropInput):
    """Run the loaded pipeline on one input row and return yield predictions.

    Returns the raw model output (hg/ha) plus a kg/ha conversion
    (1 hg = 0.1 kg), or an error payload if the model is unavailable or
    preprocessing fails.
    """
    if model is None:
        return {"error": "Model not loaded properly!"}
    try:
        # model_dump() is the Pydantic v2 API; fall back to .dict() on v1.
        payload = data.model_dump() if hasattr(data, "model_dump") else data.dict()
        input_df = pd.DataFrame([payload])
        prediction = model.predict(input_df)[0]
        predicted_yield_kg_ha = prediction * 0.1  # hg/ha -> kg/ha
        return {
            "predicted_yield_hg_per_ha": float(prediction),
            "predicted_yield_kg_per_ha": float(predicted_yield_kg_ha),
            "message": "✅ Prediction successful!",
        }
    except Exception as e:
        return {
            "error": str(e),
            "message": "❌ Prediction failed due to preprocessing or feature mismatch.",
        }


# ================================
# 9️⃣ Local or Hugging Face Run
# ================================
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=7860)