Spaces:
Sleeping
Sleeping
File size: 3,843 Bytes
f396c16 0a6956c f396c16 0a6956c f396c16 0a6956c f396c16 0a6956c f396c16 0a6956c f396c16 c47e024 f396c16 0a6956c f396c16 0a6956c f396c16 0a6956c f396c16 0a6956c f396c16 0a6956c f396c16 0a6956c f396c16 0a6956c f396c16 0a6956c f396c16 0a6956c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 |
from fastapi import FastAPI
from pydantic import BaseModel
import numpy as np
import pandas as pd
import time
import joblib
from huggingface_hub import hf_hub_download
from prometheus_client import (
Counter,
Histogram,
generate_latest,
CONTENT_TYPE_LATEST
)
from fastapi.responses import Response
# =====================================================
# App
# =====================================================
app = FastAPI(title="Investor Sentiment Inference API")
# =====================================================
# Prometheus Metrics
# =====================================================
REQUEST_COUNT = Counter(
"prediction_requests_total",
"Total number of prediction requests"
)
REQUEST_LATENCY = Histogram(
"prediction_latency_seconds",
"Prediction latency in seconds"
)
SENTIMENT_DISTRIBUTION = Histogram(
"sentiment_score_distribution",
"Distribution of sentiment scores",
buckets=(-1, -0.5, 0, 0.5, 1)
)
# =====================================================
# Load model dynamically from Hugging Face Hub
# =====================================================
HF_MODEL_REPO = "Mayur-cinderace/investor-mlopsmodels"
TICKER = "AAPL"
def load_model():
model_path = hf_hub_download(
repo_id=HF_MODEL_REPO,
filename=f"{TICKER}/rf.joblib"
)
scaler_path = hf_hub_download(
repo_id=HF_MODEL_REPO,
filename=f"{TICKER}/scaler_x.joblib"
)
model = joblib.load(model_path)
scaler = joblib.load(scaler_path)
return model, scaler
model, scaler_x = load_model()
# =====================================================
# Sentiment Logic
# =====================================================
POS_WORDS = {
"good", "buy", "up", "rise", "gain", "bull",
"profit", "growth", "bullish", "strong"
}
NEG_WORDS = {
"bad", "sell", "down", "fall", "loss",
"bear", "risk", "crash", "bearish", "weak"
}
def simple_sentiment(text: str) -> float:
words = text.lower().split()
pos = sum(w in POS_WORDS for w in words)
neg = sum(w in NEG_WORDS for w in words)
return (pos - neg) / (pos + neg) if (pos + neg) > 0 else 0.0
# =====================================================
# Input Schema
# =====================================================
class InputText(BaseModel):
sentence: str
# =====================================================
# Market Context (latest available features)
# =====================================================
def get_latest_market_context():
df = pd.read_csv("data/processed/merged_features.csv")
last = df[df["Ticker"] == TICKER].iloc[-1]
return last["return_lag1"], last["volume_lag1"]
# =====================================================
# Prediction Endpoint
# =====================================================
@app.post("/predict")
def predict(data: InputText):
start_time = time.time()
REQUEST_COUNT.inc()
sentiment = simple_sentiment(data.sentence)
SENTIMENT_DISTRIBUTION.observe(sentiment)
return_lag1, volume_lag1 = get_latest_market_context()
X = np.array([[return_lag1, volume_lag1, sentiment]])
Xs = scaler_x.transform(X)
prediction = model.predict(Xs)[0]
REQUEST_LATENCY.observe(time.time() - start_time)
return {
"sentence": data.sentence,
"sentiment_score": sentiment,
"predicted_return": float(prediction)
}
# =====================================================
# Prometheus Metrics Endpoint
# =====================================================
@app.get("/metrics")
def metrics():
return Response(generate_latest(), media_type=CONTENT_TYPE_LATEST)
# =====================================================
# Health Check
# =====================================================
@app.get("/health")
def health():
return {"status": "ok"}
|