File size: 4,356 Bytes
f95a740
 
 
 
 
 
 
 
 
b91dc4e
f95a740
 
 
 
 
 
 
 
 
b91dc4e
 
 
 
 
 
 
 
f95a740
8aa5811
 
f95a740
 
 
 
 
 
 
 
 
b91dc4e
 
 
 
 
 
8726d5b
f95a740
 
 
 
 
 
 
 
 
 
 
4757927
 
 
f95a740
 
 
 
 
 
b91dc4e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f95a740
8aa5811
 
f95a740
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
import logging
import os

import dotenv
import pandas as pd
import requests


class FetchForecast:
    """Client for the forecast API.

    Depending on whether historic data is supplied, the request goes to the
    ``from_symbol`` endpoint (ticker only) or the ``from_data`` endpoint
    (ticker plus a date/close series). The API response is returned as two
    DataFrames: past values and forecasts.
    """

    def __init__(self, ticker: str, df_hist: "pd.DataFrame | None" = None, debug=False) -> None:
        """Configure logging, pick the endpoint and build the API base URL.

        Args:
            ticker: Ticker symbol to request a forecast for.
            df_hist: Optional historic data. When ``None`` (the default) the
                ``from_symbol`` endpoint is used; otherwise ``from_data``.
            debug: When true, logging is configured at DEBUG instead of INFO.

        Raises:
            RuntimeError: If ``FORECAST_API_ENV`` or ``API_URL_TEMPLATE`` is
                not set in the environment.
        """
        self.logger_level = logging.DEBUG if debug else logging.INFO
        self.logger = logging.getLogger(__name__)
        logging.basicConfig(level=self.logger_level)

        # args
        self.ticker = ticker
        self.df_hist = df_hist
        if df_hist is None:
            self.endpoint = "v1/forecast/from_symbol"
            logdatasuffix = "without data"
        else:
            self.endpoint = "v1/forecast/from_data"
            logdatasuffix = "with historic data"
        self.logger.info(f"Initialized FetchForecast for ticker: {self.ticker} {logdatasuffix}")
        # constants
        self.past_horizon = 5  # number of past business days
        # build the api-url based on env variables
        self.api_env = os.environ.get("FORECAST_API_ENV")
        api_url_temp = os.environ.get("API_URL_TEMPLATE")
        if self.api_env is None or api_url_temp is None:
            # Fail with a clear message instead of an opaque error from
            # calling/feeding ``str.replace`` with ``None``.
            raise RuntimeError(
                "Environment variables 'FORECAST_API_ENV' and 'API_URL_TEMPLATE' must both be set."
            )
        # The template contains the literal placeholder "ENV".
        self.api_url = api_url_temp.replace("ENV", self.api_env)

    def run(self):
        """Call the API and return ``(past_df, fcst_df)``; see ``call_api``."""
        past_df, fcst_df = self.call_api()
        return past_df, fcst_df

    def call_api(self) -> tuple:
        """POST the request to the configured endpoint.

        Returns:
            ``(past_df, fcst_df)`` on HTTP 200, otherwise ``(None, None)``
            after logging the status code. Exceptions raised by
            ``requests.post`` itself are not caught here.

        Raises:
            ValueError: If ``self.endpoint`` is not one of the two supported
                endpoints.
        """
        endpoint_name = self.endpoint.split("/")[-1]
        if endpoint_name == "from_symbol":
            self.logger.info(f"Sending the ticker symmbol to the forecast API ({self.api_env}) {self.endpoint} endpoint")
            pl_in = {"ticker": self.ticker, "past_horizon": self.past_horizon}
        elif endpoint_name == "from_data":
            self.logger.info(f"Formatting and sending ticker data to the forecast API ({self.api_env}) {self.endpoint} endpoint")
            pl_in = self.build_payload_with_data(ticker=self.ticker, past_horizon=self.past_horizon)
        else:
            # Without this branch the payload would be unbound below.
            raise ValueError(f"Unsupported endpoint: {self.endpoint}")
        resp = requests.post(f"{self.api_url}/{self.endpoint}", json=pl_in, timeout=30)
        if resp.status_code == 200:
            data = resp.json()
            past_df, fcst_df = self.transform_data(data)
        else:
            self.logger.error(f"Error (status: {resp.status_code}) fetching stock info for {self.ticker}.")
            past_df, fcst_df = None, None
        return past_df, fcst_df

    def transform_data(self, data) -> tuple:
        """Turn the API response into two DataFrames with a leading ``Date`` column.

        Args:
            data: Mapping with ``"past"`` and ``"forecast"`` entries, each
                convertible to a DataFrame whose ``"index"`` column holds the
                timestamps.

        Returns:
            ``(past_df, fcst_df)`` where ``Date`` is timezone-naive (parsed as
            UTC, then the timezone is dropped) and placed first.
        """
        past_df = pd.DataFrame(data["past"]).rename(columns={"index": "Date"})
        fcst_df = pd.DataFrame(data["forecast"]).rename(columns={"index": "Date"})
        # Convert to dates
        past_df["Date"] = pd.to_datetime(past_df["Date"], utc=True).dt.tz_convert(None)
        fcst_df["Date"] = pd.to_datetime(fcst_df["Date"], utc=True).dt.tz_convert(None)
        # move Date to the front
        past_df = past_df[["Date"] + [col for col in past_df.columns if col != "Date"]]
        fcst_df = fcst_df[["Date"] + [col for col in fcst_df.columns if col != "Date"]]
        return past_df, fcst_df

    def build_payload_with_data(self, ticker: str, past_horizon: int) -> dict:
        """Build the columnar JSON dict expected by the /from_data endpoint.

        ``self.df_hist`` is left untouched, so the caller's DataFrame is not
        modified and the method can be called repeatedly.

        Args:
            ticker: Ticker symbol placed in the payload.
            past_horizon: Value forwarded as ``"past_horizon"``.

        Returns:
            ``{"ticker", "series": {"date", "close"}, "past_horizon"}`` with
            dates formatted as ``YYYY-MM-DD`` and closes as floats, sorted by
            date with duplicate dates reduced to their last occurrence.

        Raises:
            ValueError: If no historic data is set, or if it lacks a ``Date``
                (column or index) or ``Close`` column.
        """
        if self.df_hist is None:
            raise ValueError("No historic data available to build the payload.")

        df = self.df_hist
        # Make 'Date' a regular column when it lives in the index. The
        # non-inplace reset returns a new frame, leaving the input as it was.
        if "Date" not in df.columns:
            df = df.reset_index()

        if "Date" not in df.columns or "Close" not in df.columns:
            raise ValueError("DataFrame must contain 'Date' and 'Close' columns.")

        df = df[["Date", "Close"]].copy()
        # Unparseable dates become NaT and are dropped below.
        df["Date"] = pd.to_datetime(df["Date"], utc=True, errors="coerce")

        # Clean + order
        df = (
            df.dropna(subset=["Date", "Close"])
            .sort_values("Date")
            .drop_duplicates(subset=["Date"], keep="last")
        )

        payload = {
            "ticker": ticker,
            "series": {
                "date": df["Date"].dt.strftime("%Y-%m-%d").tolist(),
                "close": df["Close"].astype(float).tolist(),
            },
            "past_horizon": past_horizon,
        }
        return payload
    

if __name__ == "__main__":
    # load the env variables from .env file
    dotenv.load_dotenv(dotenv.find_dotenv())
    # No historic data is supplied, so the forecast is requested by ticker
    # symbol only; df_hist is passed explicitly because it is a constructor
    # argument.
    past_df, fcst_df = FetchForecast("AAPL", df_hist=None).run()
    if past_df is None or fcst_df is None:
        # The API call reports failure by returning (None, None).
        raise SystemExit("No forecast data could be retrieved.")
    print("Last available price:\n", past_df.tail(1))
    print("Forecasts:\n", fcst_df.head())