# NOTE(review): the three lines below were a pasted GitHub commit header
# (author, commit message, short hash) — not Python. Kept as a comment so
# the module parses.
# Mayur-cinderace — "Rename requirements_api.txt to requirements.txt for HF build" (c47e024)
import re
import time

import joblib
import numpy as np
import pandas as pd
from fastapi import FastAPI
from fastapi.responses import Response
from huggingface_hub import hf_hub_download
from prometheus_client import (
    Counter,
    Histogram,
    generate_latest,
    CONTENT_TYPE_LATEST,
)
from pydantic import BaseModel
# =====================================================
# App
# =====================================================
app = FastAPI(title="Investor Sentiment Inference API")
# =====================================================
# Prometheus Metrics
# =====================================================
# Total number of /predict calls served (incremented once per request).
REQUEST_COUNT = Counter(
    "prediction_requests_total",
    "Total number of prediction requests"
)
# End-to-end latency of the /predict handler, in seconds.
REQUEST_LATENCY = Histogram(
    "prediction_latency_seconds",
    "Prediction latency in seconds"
)
# Sentiment scores lie in [-1, 1]; buckets split that range evenly
# (prometheus_client appends a +Inf bucket automatically).
SENTIMENT_DISTRIBUTION = Histogram(
    "sentiment_score_distribution",
    "Distribution of sentiment scores",
    buckets=(-1, -0.5, 0, 0.5, 1)
)
# =====================================================
# Load model dynamically from Hugging Face Hub
# =====================================================
HF_MODEL_REPO = "Mayur-cinderace/investor-mlopsmodels"
TICKER = "AAPL"

def load_model(ticker: str = TICKER):
    """Download and deserialize the model and feature scaler for *ticker*.

    Fetches ``{ticker}/rf.joblib`` and ``{ticker}/scaler_x.joblib`` from
    the Hugging Face Hub repo ``HF_MODEL_REPO`` (cached locally by
    ``hf_hub_download``) and loads both with joblib.

    Args:
        ticker: ticker symbol naming the artifact folder on the Hub;
            defaults to the module-level TICKER.

    Returns:
        ``(model, scaler)`` tuple.
    """
    model_path = hf_hub_download(
        repo_id=HF_MODEL_REPO,
        filename=f"{ticker}/rf.joblib",
    )
    scaler_path = hf_hub_download(
        repo_id=HF_MODEL_REPO,
        filename=f"{ticker}/scaler_x.joblib",
    )
    return joblib.load(model_path), joblib.load(scaler_path)

# Loaded once at import time; restart the service to pick up new artifacts.
model, scaler_x = load_model()
# =====================================================
# Sentiment Logic
# =====================================================
POS_WORDS = {
    "good", "buy", "up", "rise", "gain", "bull",
    "profit", "growth", "bullish", "strong",
}
NEG_WORDS = {
    "bad", "sell", "down", "fall", "loss",
    "bear", "risk", "crash", "bearish", "weak",
}

# Word tokenizer. A plain whitespace split would leave punctuation attached
# ("buy!", "good,") so those tokens would never match the lexicons.
_WORD_RE = re.compile(r"[a-z']+")

def simple_sentiment(text: str) -> float:
    """Score *text* in [-1, 1] via lexicon keyword counts.

    Counts positive and negative lexicon words among the lowercase word
    tokens of *text* and returns ``(pos - neg) / (pos + neg)``.

    Returns:
        0.0 when no lexicon word occurs; otherwise a value in [-1, 1].
    """
    tokens = _WORD_RE.findall(text.lower())
    pos = sum(t in POS_WORDS for t in tokens)
    neg = sum(t in NEG_WORDS for t in tokens)
    total = pos + neg
    return (pos - neg) / total if total else 0.0
# =====================================================
# Input Schema
# =====================================================
class InputText(BaseModel):
    """Request body for /predict."""
    # Free-text sentence to be sentiment-scored (e.g. a news headline).
    sentence: str
# =====================================================
# Market Context (latest available features)
# =====================================================
def get_latest_market_context(ticker=None,
                              csv_path="data/processed/merged_features.csv"):
    """Return the most recent ``(return_lag1, volume_lag1)`` for *ticker*.

    Reads the merged-features CSV on every call so the service always sees
    the latest processed data.

    Args:
        ticker: ticker symbol to filter on; defaults to the module-level
            TICKER.
        csv_path: path to a CSV with "Ticker", "return_lag1" and
            "volume_lag1" columns.

    Returns:
        ``(return_lag1, volume_lag1)`` as floats, from the last row for
        the ticker.

    Raises:
        ValueError: if the CSV contains no rows for *ticker*.
    """
    if ticker is None:
        ticker = TICKER
    df = pd.read_csv(csv_path)
    rows = df[df["Ticker"] == ticker]
    if rows.empty:
        # iloc[-1] on an empty frame raises an opaque IndexError; fail clearly.
        raise ValueError(f"no rows for ticker {ticker!r} in {csv_path}")
    last = rows.iloc[-1]
    return float(last["return_lag1"]), float(last["volume_lag1"])
# =====================================================
# Prediction Endpoint
# =====================================================
@app.post("/predict")
def predict(data: InputText):
    """Predict the next-period return for TICKER from a sentiment sentence.

    Scores the sentence with the keyword lexicon, combines the score with
    the latest lagged market features, scales the feature vector, and runs
    it through the pre-loaded model.

    Returns:
        dict with the echoed sentence, the sentiment score, and the
        predicted return.
    """
    REQUEST_COUNT.inc()
    # Histogram.time() records latency even when feature loading or
    # inference raises, unlike manual start/stop timing which was skipped
    # on any exception.
    with REQUEST_LATENCY.time():
        sentiment = simple_sentiment(data.sentence)
        SENTIMENT_DISTRIBUTION.observe(sentiment)

        return_lag1, volume_lag1 = get_latest_market_context()
        # Feature order must match the order used when fitting scaler_x.
        X = np.array([[return_lag1, volume_lag1, sentiment]])
        Xs = scaler_x.transform(X)
        prediction = model.predict(Xs)[0]

        return {
            "sentence": data.sentence,
            "sentiment_score": sentiment,
            "predicted_return": float(prediction),
        }
# =====================================================
# Prometheus Metrics Endpoint
# =====================================================
@app.get("/metrics")
def metrics():
    """Expose all registered Prometheus metrics in text exposition format."""
    payload = generate_latest()
    return Response(content=payload, media_type=CONTENT_TYPE_LATEST)
# =====================================================
# Health Check
# =====================================================
@app.get("/health")
def health():
    """Liveness probe: report that the service process is up."""
    status = {"status": "ok"}
    return status