"""Investor sentiment inference API.

Serves a FastAPI application that scores a sentence with a keyword-based
sentiment heuristic, combines that score with the most recent market
features read from a CSV file, and returns the prediction of a model
downloaded from the Hugging Face Hub. Prometheus metrics are exposed on
``/metrics`` and a liveness probe on ``/health``.
"""

import time

import joblib
import numpy as np
import pandas as pd
from fastapi import FastAPI
from fastapi.responses import Response
from huggingface_hub import hf_hub_download
from prometheus_client import (
    CONTENT_TYPE_LATEST,
    Counter,
    Histogram,
    generate_latest,
)
from pydantic import BaseModel

# =====================================================
# App
# =====================================================
app = FastAPI(title="Investor Sentiment Inference API")

# =====================================================
# Prometheus Metrics
# =====================================================
REQUEST_COUNT = Counter(
    "prediction_requests_total",
    "Total number of prediction requests",
)

REQUEST_LATENCY = Histogram(
    "prediction_latency_seconds",
    "Prediction latency in seconds",
)

SENTIMENT_DISTRIBUTION = Histogram(
    "sentiment_score_distribution",
    "Distribution of sentiment scores",
    buckets=(-1, -0.5, 0, 0.5, 1),
)

# =====================================================
# Load model dynamically from Hugging Face Hub
# =====================================================
HF_MODEL_REPO = "Mayur-cinderace/investor-mlopsmodels"
TICKER = "AAPL"


def load_model():
    """Download and deserialize the model and feature scaler for ``TICKER``.

    Both artifacts are fetched from ``HF_MODEL_REPO`` with
    ``hf_hub_download`` and loaded with ``joblib.load``.

    Returns:
        A ``(model, scaler)`` tuple.
    """
    model_path = hf_hub_download(
        repo_id=HF_MODEL_REPO,
        filename=f"{TICKER}/rf.joblib",
    )
    scaler_path = hf_hub_download(
        repo_id=HF_MODEL_REPO,
        filename=f"{TICKER}/scaler_x.joblib",
    )

    # SECURITY: joblib.load unpickles the file and can therefore execute
    # arbitrary code. Only point HF_MODEL_REPO at a repository you trust.
    model = joblib.load(model_path)
    scaler = joblib.load(scaler_path)
    return model, scaler


# Loaded once at import time so every request reuses the same objects.
model, scaler_x = load_model()

# =====================================================
# Sentiment Logic
# =====================================================
POS_WORDS = {
    "good", "buy", "up", "rise", "gain",
    "bull", "profit", "growth", "bullish", "strong",
}

NEG_WORDS = {
    "bad", "sell", "down", "fall", "loss",
    "bear", "risk", "crash", "bearish", "weak",
}


def simple_sentiment(text: str) -> float:
    """Score *text* by counting positive and negative keywords.

    The text is lower-cased and split on whitespace; each token is matched
    exactly against ``POS_WORDS`` and ``NEG_WORDS`` (tokens with attached
    punctuation, such as ``"good,"``, do not match).

    Returns:
        ``(pos - neg) / (pos + neg)``, a value in ``[-1.0, 1.0]``, or
        ``0.0`` when no keyword is found.
    """
    words = text.lower().split()
    pos = sum(w in POS_WORDS for w in words)
    neg = sum(w in NEG_WORDS for w in words)
    total = pos + neg
    return (pos - neg) / total if total > 0 else 0.0


# =====================================================
# Input Schema
# =====================================================
class InputText(BaseModel):
    """Request body for ``/predict``."""

    # Free-form text to be scored by simple_sentiment.
    sentence: str


# =====================================================
# Market Context (latest available features)
# =====================================================
def get_latest_market_context():
    """Return the lag features from the last CSV row for ``TICKER``.

    The CSV is re-read on every call, so changes to the file are picked up
    without restarting the service.

    Returns:
        A ``(return_lag1, volume_lag1)`` tuple taken from the last row of
        ``data/processed/merged_features.csv`` whose ``Ticker`` column
        equals ``TICKER``.

    Raises:
        IndexError: If the file contains no rows for ``TICKER``.
    """
    df = pd.read_csv("data/processed/merged_features.csv")
    ticker_rows = df[df["Ticker"] == TICKER]
    if ticker_rows.empty:
        # Same exception type that .iloc[-1] would raise on an empty frame,
        # but with a message that says what is actually missing.
        raise IndexError(f"no rows for ticker {TICKER!r} in merged features")
    last = ticker_rows.iloc[-1]
    return last["return_lag1"], last["volume_lag1"]


# =====================================================
# Prediction Endpoint
# =====================================================
@app.post("/predict")
def predict(data: InputText):
    """Predict a return from the sentence sentiment and latest market data.

    Increments the request counter, records the sentiment score, and
    records the request latency whether or not the prediction succeeds.

    Returns:
        A dict with the original ``sentence``, its ``sentiment_score`` and
        the model's ``predicted_return`` as a float.
    """
    # perf_counter is monotonic, so the measured duration cannot be skewed
    # by wall-clock adjustments the way time.time() can.
    start_time = time.perf_counter()
    REQUEST_COUNT.inc()

    try:
        sentiment = simple_sentiment(data.sentence)
        SENTIMENT_DISTRIBUTION.observe(sentiment)

        return_lag1, volume_lag1 = get_latest_market_context()

        # NOTE(review): assumes the scaler and model were fitted on columns
        # in exactly this order - confirm against the training pipeline.
        X = np.array([[return_lag1, volume_lag1, sentiment]])
        Xs = scaler_x.transform(X)

        prediction = model.predict(Xs)[0]

        return {
            "sentence": data.sentence,
            "sentiment_score": sentiment,
            "predicted_return": float(prediction),
        }
    finally:
        # Observe in `finally` so failed requests, which are already counted
        # in REQUEST_COUNT, also appear in the latency histogram.
        REQUEST_LATENCY.observe(time.perf_counter() - start_time)


# =====================================================
# Prometheus Metrics Endpoint
# =====================================================
@app.get("/metrics")
def metrics():
    """Expose all registered Prometheus metrics in text exposition format."""
    return Response(generate_latest(), media_type=CONTENT_TYPE_LATEST)


# =====================================================
# Health Check
# =====================================================
@app.get("/health")
def health():
    """Liveness probe; always reports ``ok``."""
    return {"status": "ok"}