# Page residue from the hosting site (not part of the program):
# AmirTrader's picture / Update app.py / b0d2ea1 verified
import pandas as pd
import numpy as np
import os
import io
from datetime import datetime
from datetime import timedelta
import holoviews as hvs
import panel as pn
import hvplot as hv
import hvplot.pandas
import yfinance as yf
from dotenv import load_dotenv
# from datasets import load_dataset
from utils import load_hf_dataset # ,upload_to_hf_dataset, download_from_hf_dataset
# Initialise Panel with the "bokeh" extension and the "bootstrap" page template.
pn.extension("bokeh", template="bootstrap")
def _extract_raw_data(ticker):
    """Fetch one year of daily price history for ``ticker`` via yfinance.

    The history's index is reset into ordinary columns, and two rolling
    means of ``Close`` are appended as ``SMA50`` (50-row window) and
    ``SMA200`` (200-row window). Rows before a full window is available
    hold NaN in the corresponding column.
    """
    history = yf.Ticker(ticker).history(period="1y", interval="1d")
    frame = history.reset_index()
    closes = frame["Close"]
    frame["SMA50"] = closes.rolling(window=50).mean()
    frame["SMA200"] = closes.rolling(window=200).mean()
    return frame
def _transform_data(raw_data: pd.DataFrame):
    """Reshape the raw price frame into the columns the candlestick plot uses.

    Keeps the OHLCV and moving-average columns, lower-cases the OHLCV names
    (``Date`` becomes ``time``), and adds:

    * ``time_start`` / ``time_end``: ``time`` minus / plus nine hours, the
      horizontal extent of each candle body;
    * ``positive``: 1 where close minus open is greater than zero, else 0.

    ``raw_data`` itself is not modified; a deep copy is returned.
    """
    wanted = ["Date", "Open", "High", "Low", "Close", "Volume", "SMA50", "SMA200"]
    lowercase_names = {
        "Date": "time",
        "Open": "open",
        "High": "high",
        "Low": "low",
        "Close": "close",
        "Volume": "volume",
    }
    data = raw_data[wanted].copy(deep=True).rename(columns=lowercase_names)

    # Half-width of a candle body on the time axis.
    half_width = timedelta(hours=9)
    data["time_start"] = data["time"] - half_width  # rectangles start
    data["time_end"] = data["time"] + half_width  # rectangles end

    data["positive"] = (data["close"] - data["open"]).gt(0).astype(int)
    return data
def make_candle_stick(ticker):
    """Build the price/volume panel for a single ticker.

    Returns a ``pn.Column`` holding two plots:

    1. a candlestick overlay: high/low wicks (``Segments``), open/close
       bodies (``Rectangles``, coloured by the ``positive`` flag) and the
       ``SMA50`` curve;
    2. a line plot of traded volume.

    Data comes from ``_extract_raw_data`` followed by ``_transform_data``.
    """
    data = _transform_data(raw_data=_extract_raw_data(ticker=ticker))

    wicks = hvs.Segments(data, kdims=["time", "low", "time", "high"])
    bodies = hvs.Rectangles(
        data, kdims=["time_start", "open", "time_end", "close"], vdims=["positive"]
    )
    sma50 = hvs.Curve(data, kdims=["time"], vdims=["SMA50"])

    # The columns were lower-cased by _transform_data, so the dimension to
    # relabel is "low". The previous key "Low" matched nothing, so the
    # "Values" label was never applied.
    candlestick = (wicks * bodies * sma50).redim.label(low="Values")

    price_plot = candlestick.opts(
        hvs.opts.Rectangles(
            # cmap order maps positive == 0 to red and positive == 1 to green.
            color="positive", cmap=["red", "green"], responsive=True
        ),
        hvs.opts.Segments(
            color="black", height=400, responsive=True, show_grid=True
        ),
    )
    # Volume is drawn as a line; a bar rendering was tried earlier and dropped.
    volume_plot = data.hvplot(
        x="time", y="volume", kind="line", responsive=True, height=200
    ).opts(show_grid=True)

    return pn.Column(price_plot, volume_plot)
# Serialise a DataFrame to CSV for the download buttons
def get_csv(df):
    """Return ``df`` rendered as CSV (no index column) in a rewound StringIO."""
    return io.StringIO(df.to_csv(index=False))
# Function to convert the 'Ticker' column to a comma-separated string in a text file
def get_text(df):
    """Return the ``Ticker`` column of ``df`` as one comma-separated line.

    Missing tickers (None/NaN) are skipped and every remaining value is
    converted to ``str`` first, so a stray NaN or numeric symbol no longer
    makes ``str.join`` raise ``TypeError``.

    Returns:
        io.StringIO positioned at the start, ready to be read by the
        download widget. Empty when there are no tickers.
    """
    symbols = df["Ticker"].dropna().astype(str)
    return io.StringIO(",".join(symbols))
# Pull settings from a local .env file (if any) into the process environment.
load_dotenv()

# Hugging Face API token; None when HF_TOKEN is not set in .env or the environment.
HF_TOKEN = os.environ.get("HF_TOKEN")

# Names of the Hugging Face datasets each provider's CSV is read from
# (BeatingMarket/MarketBeat, TradingView, GuruFocus, FinViz and TipRanks).
# Each is None when its variable is unset.
dataset_name_BeatingMarket_input = os.environ.get("dataset_name_BeatingMarket_input")
dataset_name_TradingView_input = os.environ.get("dataset_name_TradingView_input")
dataset_name_GuruFocus_input = os.environ.get("dataset_name_GuruFocus_input")
dataset_name_FinViz_input = os.environ.get("dataset_name_FinViz_input")
dataset_name_TipRanks_input = os.environ.get("dataset_name_TipRanks_input")
@pn.cache()
def get_tradingview(current_datetime):
    """Load the latest TradingView snapshot, always stored as ``america.csv``.

    ``current_datetime`` is not read in the body; it is only an argument of
    the cached call, so a different value produces a fresh load.
    """
    tradingview_data = load_hf_dataset(
        "america.csv", HF_TOKEN, dataset_name_TradingView_input
    )
    return tradingview_data
@pn.cache()
def get_marketbeat(current_datetime):
    """Load the MarketBeat snapshot stored as ``beatingmarket.csv``.

    ``current_datetime`` is not read in the body; it is only an argument of
    the cached call, so a different value produces a fresh load.
    """
    marketbeat_data = load_hf_dataset(
        "beatingmarket.csv", HF_TOKEN, dataset_name_BeatingMarket_input
    )
    return marketbeat_data
@pn.cache()
def get_gurufocus(current_datetime):
    """Load the latest GuruFocus snapshot, always stored as ``GuruFocus_merged.csv``.

    ``current_datetime`` is not read in the body; it is only an argument of
    the cached call, so a different value produces a fresh load.
    """
    gurufocus_data = load_hf_dataset(
        "GuruFocus_merged.csv", HF_TOKEN, dataset_name_GuruFocus_input
    )
    return gurufocus_data
@pn.cache()
def get_finviz(current_datetime):
    """Load the latest FinViz snapshot, always stored as ``FinViz.csv``.

    ``current_datetime`` is not read in the body; it is only an argument of
    the cached call, so a different value produces a fresh load.
    """
    finviz_data = load_hf_dataset("FinViz.csv", HF_TOKEN, dataset_name_FinViz_input)
    return finviz_data
@pn.cache()
def get_tipranks(current_datetime):
    """Load the latest TipRanks snapshot, always stored as ``tipranks.csv``.

    ``current_datetime`` is not read in the body; it is only an argument of
    the cached call, so a different value produces a fresh load.
    """
    tipranks_data = load_hf_dataset(
        "tipranks.csv", HF_TOKEN, dataset_name_TipRanks_input
    )
    return tipranks_data
# Read the GuruFocus, FinViz, MarketBeat and TipRanks snapshots produced by the
# crawling pipelines. The date string is the argument of the cached loaders,
# so a new calendar day triggers a fresh load.
current_datetime = datetime.now().strftime("%Y-%m-%d")
daily_gurufocus_DF = get_gurufocus(current_datetime)
daily_finviz_DF = get_finviz(current_datetime)
daily_marketbeat = get_marketbeat(current_datetime)
monthly_tiprank_DF = get_tipranks(current_datetime)

# TipRanks: keep only the ticker and its SmartScore. The explicit copy makes
# the column assignment below operate on an independent frame rather than on
# a slice of the loaded one (avoids pandas' chained-assignment warning).
monthly_tiprank_DF = monthly_tiprank_DF[["Ticker", "SmartScore"]].copy()
# A missing score is encoded as -1, which is also the lower end of the
# Smart Score slider.
monthly_tiprank_DF["SmartScore"] = (
    monthly_tiprank_DF["SmartScore"].fillna(-1).astype(float)
)
# FinViz: numeric price/target columns and the implied upside in percent.
# Values that cannot be parsed as numbers become 0 in the two numeric columns.
daily_finviz_DF["FinVizPrice"] = (
    pd.to_numeric(daily_finviz_DF["Price"], errors="coerce").fillna(0).astype(float)
)
daily_finviz_DF["FinVizTarget"] = (
    pd.to_numeric(daily_finviz_DF["Target Price"], errors="coerce")
    .fillna(0)
    .astype(float)
)
# A zero (i.e. missing) price must not be used as a divisor: it used to yield
# +/-inf, which then passed every "greater than" filter. Dividing by NaN
# instead leaves the percentage undefined, so such rows are filtered out.
daily_finviz_DF["FinVizTargetpercent"] = (
    100
    * (daily_finviz_DF["FinVizTarget"] - daily_finviz_DF["FinVizPrice"])
    / daily_finviz_DF["FinVizPrice"].replace(0, np.nan)
).round(2)
# Columns carried into the merged ranking table ("Price" is the raw column).
daily_finviz_DF_filtered = daily_finviz_DF[
    ["Ticker", "Price", "FinVizTarget", "FinVizTargetpercent"]
]
# GuruFocus columns that go into the ranking table.
_gurufocus_columns = ["Ticker", "GFValue", "Market Capitalization", "Sector", "Industry"]
DFgurufocus = daily_gurufocus_DF[_gurufocus_columns]

# Default (inner) merges on the shared columns: GuruFocus + TipRanks first,
# then the FinViz selection.
DFmerge_tipranks_gurufocus = pd.merge(DFgurufocus, monthly_tiprank_DF)
DFmerge_tipranks_gurufocus = pd.merge(
    DFmerge_tipranks_gurufocus, daily_finviz_DF_filtered
)

# MarketBeat columns used here: 'AverageTarget', 'MaxTarget', 'LowTarget', 'CurrentPrice'.
# MarketBeat is 1 only when the max, low and average targets all exceed the current price.
_marketbeat_price = daily_marketbeat["CurrentPrice"]
daily_marketbeat["MarketBeat"] = (
    daily_marketbeat["MaxTarget"].gt(_marketbeat_price)
    & daily_marketbeat["LowTarget"].gt(_marketbeat_price)
    & daily_marketbeat["AverageTarget"].gt(_marketbeat_price)
).astype(int)

# Debug output: column sets of both frames right before the MarketBeat merge.
print("&" * 80)
print(DFmerge_tipranks_gurufocus.columns)
print(daily_marketbeat.columns)

# Left merge keeps tickers that have no MarketBeat row.
DFmerge_tipranks_gurufocus = pd.merge(
    DFmerge_tipranks_gurufocus, daily_marketbeat, how="left", on="Ticker"
)
# GuruFocus upside in percent relative to the FinViz price.
# "Price" and "GFValue" are always present here (they are selected explicitly
# before the merges), so the former `if ... in columns` guard was dead code; had
# it ever been false, the column selection below would have raised KeyError.
# The price is coerced to numeric the same way the FinViz columns are, and a
# zero price is treated as missing so the ratio is NaN rather than +/-inf.
_gf_price = pd.to_numeric(
    DFmerge_tipranks_gurufocus["Price"], errors="coerce"
).replace(0, np.nan)
DFmerge_tipranks_gurufocus["GFValuepercent"] = (
    100 * (DFmerge_tipranks_gurufocus["GFValue"] - _gf_price) / _gf_price
).round(2)

# Market capitalisation in billions, one decimal place.
DFmerge_tipranks_gurufocus["MarketCap"] = (
    DFmerge_tipranks_gurufocus["Market Capitalization"] / 1e9
).round(1)

# Final column set and order of the ranking table; every other column
# (including the raw "Market Capitalization") is dropped by this selection.
DFmerge_tipranks_gurufocus = DFmerge_tipranks_gurufocus[
    [
        "Ticker",
        "Sector",
        "Industry",
        "MarketCap",
        "SmartScore",
        "Price",
        "GFValue",
        "GFValuepercent",
        "FinVizTarget",
        "FinVizTargetpercent",
        "MarketBeat",
    ]
]
# Ticker widget: pre-filled from the ?ticker=... URL parameter when present,
# otherwise "ALL" (the full ranking table).
tickername = pn.state.location.query_params.get("ticker", "No ticker provided")
_initial_ticker = "ALL" if tickername == "No ticker provided" else f"{tickername}"
ticker = pn.widgets.AutocompleteInput(
    name="Ticker",
    options=list(DFmerge_tipranks_gurufocus.Ticker),
    placeholder="Write Ticker here همین جا",
    value=_initial_ticker,
    restrict=False,  # free text is accepted, not only the listed tickers
)
# Filter widgets. Their values feed the bound table/download callbacks.
SmartScore = pn.widgets.EditableRangeSlider(
    name="Smart Score", start=-1, end=10, value=(-1, 10), step=1
)

# Industry and Sector start with every distinct value selected.
_industry_choices = list(set(DFmerge_tipranks_gurufocus.Industry))
Industry = pn.widgets.CheckBoxGroup(
    name="Select Industry",
    value=list(_industry_choices),
    options=_industry_choices,
    inline=True,
)
_sector_choices = list(set(DFmerge_tipranks_gurufocus.Sector))
Sector = pn.widgets.MultiSelect(
    name="Select Sector",
    value=list(_sector_choices),
    options=_sector_choices,
)

# Both percentage sliders share the same range, step and starting value.
_percent_slider = dict(start=-100, end=100, step=1, value=12.0)
GFValuepercent = pn.widgets.FloatSlider(name="GF Value %", **_percent_slider)
FinVizTargetpercent = pn.widgets.FloatSlider(
    name="FinViz Target %", **_percent_slider
)

MarketCap = pn.widgets.FloatSlider(
    name="Market Capital (B$)", start=0, end=1000, step=1, value=10
)
MarketBeat_radio_group = pn.widgets.RadioButtonGroup(
    name="MarketBeat",
    options=["Default Targets", "Above all Targets"],
    button_type="success",
    value="Above all Targets",
)
# Download buttons for the four source tables.
def _csv_download_button(prefix, frame, label, button_type):
    """Build a 300px FileDownload that serves ``frame`` as ``<prefix>_<date>.csv``."""
    return pn.widgets.FileDownload(
        filename=f"{prefix}_{current_datetime}.csv",
        callback=pn.bind(get_csv, frame),
        button_type=button_type,
        label=label,
        width=300,
    )


download_button_GuruFocus = _csv_download_button(
    "GuruFocus_merged", daily_gurufocus_DF, "Download GuruFocus", "primary"
)
download_button_FinViz = _csv_download_button(
    "FinViz", daily_finviz_DF, "Download FinViz", "primary"
)
download_button_MarketBeat = _csv_download_button(
    "MarketBeat", daily_marketbeat, "Download MarketBeat", "warning"
)
download_button_TipRanks = _csv_download_button(
    "TipRanks", monthly_tiprank_DF, "Download TipRanks", "warning"
)
def get_DF_filter(
    DF, ticker, SmartScore, GFValuepercent, FinVizTargetpercent, Sector, MarketCap
):
    """Return the rows of ``DF`` that pass the current filter settings.

    A row is kept when all of the following hold:

    * ``SmartScore`` lies within ``SmartScore[0]``..``SmartScore[1]`` (inclusive);
    * ``GFValuepercent`` is at least the given threshold;
    * ``FinVizTargetpercent`` is strictly above the given threshold;
    * ``Sector`` is one of the given sectors;
    * ``MarketCap`` is strictly above the given threshold.

    ``ticker`` is accepted for signature parity with the widgets but is not
    used by the filter. The MarketBeat setting is not part of this filter.
    """
    score_low, score_high = SmartScore[0], SmartScore[1]
    keep = (
        (DF["SmartScore"] >= score_low)
        & (DF["SmartScore"] <= score_high)
        & (DF["GFValuepercent"] >= GFValuepercent)
        & (DF["FinVizTargetpercent"] > FinVizTargetpercent)
        & DF["Sector"].isin(Sector)
        & (DF["MarketCap"] > MarketCap)
    )
    return DF.loc[keep]
# The filtered table, re-evaluated from the current widget values whenever a
# download is requested; shared by the CSV and the watchlist buttons.
_filtered_frame = pn.bind(
    get_DF_filter,
    DF=DFmerge_tipranks_gurufocus,
    ticker=ticker,
    SmartScore=SmartScore,
    GFValuepercent=GFValuepercent,
    FinVizTargetpercent=FinVizTargetpercent,
    Sector=Sector,
    MarketCap=MarketCap,
)

# Full filtered table as CSV.
download_button_Filter = pn.widgets.FileDownload(
    filename=f"Filter_{current_datetime}.csv",
    callback=pn.bind(get_csv, _filtered_frame),
    button_type="danger",
    label="Download your Filter",
    width=300,
)

# Only the tickers of the filtered table, comma-separated, as a text file.
download_button_watchlist = pn.widgets.FileDownload(
    filename=f"watchlist_{current_datetime}.txt",
    callback=pn.bind(get_text, _filtered_frame),
    button_type="success",
    label="Download Watchlist",
    width=300,
)

# Decorative images for the sidebar (both rendered through the GIF pane).
_sidebar_animation_url = "https://cdn.dribbble.com/userupload/23265870/file/original-619180a5b218d92d799a2d60617a09fb.gif"
_logo_url = "https://www.mywallstreetadvisor.com/_next/image?url=%2FMyWallStreetAdvisorLOGO.png&w=64&q=75"
png_pane = pn.pane.GIF(_sidebar_animation_url, width=300)
png_pane2 = pn.pane.GIF(_logo_url, width=50)
# Load Panel's ECharts extension (needed by pn.indicators.Gauge, used in get_gauge).
pn.extension('echarts')
def get_gauge(text, value, low, high):
    """Build a 300x300 gauge indicator for ``value`` on the scale ``low``..``high``.

    Args:
        text: Title shown on the gauge.
        value: Number to display.
        low, high: Bounds of the gauge scale.

    Returns:
        A ``pn.indicators.Gauge``, or ``None`` when ``value`` is missing
        (``None`` or NaN).

    A value of exactly 0 is a legitimate reading and gets a gauge; the former
    truthiness test (``if value:``) silently dropped it, while letting NaN
    through.
    """
    if pd.isna(value):
        return None
    return pn.indicators.Gauge(
        name=f'{text}',
        value=value,
        bounds=(low, high),
        format='{value}',
        # Colour bands by fraction of the scale: red to 20%, gold to 80%, then green.
        colors=[(0.2, 'red'), (0.8, 'gold'), (1, 'green')],
        width=300,
        height=300,
    )
def _link_button(label, button_type, url):
    """Return a 250px button that opens ``url`` in a new browser tab."""
    import json

    button = pn.widgets.Button(name=label, button_type=button_type, width=250)
    # json.dumps yields a correctly escaped JavaScript string literal, so the
    # URL cannot break out of the window.open(...) call.
    button.js_on_click(code=f"window.open({json.dumps(url)}, '_blank')")
    return button


def get_DF(
    DF,
    ticker,
    SmartScore,
    GFValuepercent,
    FinVizTargetpercent,
    Sector,
    MarketCap,
    MarketBeat,
):
    """Render the main view for the current widget state.

    Args:
        DF: Merged ranking table.
        ticker: Selected ticker; empty or ``"ALL"`` selects the table view.
        SmartScore: ``(low, high)`` inclusive SmartScore range.
        GFValuepercent: Minimum GuruFocus upside (inclusive).
        FinVizTargetpercent: Minimum FinViz upside (exclusive).
        Sector: Sectors to keep.
        MarketCap: Minimum market cap in billions (exclusive).
        MarketBeat: ``"Above all Targets"`` additionally requires
            ``MarketBeat == 1``; any other value applies no MarketBeat filter.

    Returns:
        * Table view (``ticker`` empty or ``"ALL"``): a read-only Tabulator of
          the rows passing the filters.
        * Ticker view: a ``pn.Column`` with that ticker's row, its candlestick
          chart, link buttons to external sites and three gauges. When the
          ticker is not present in ``DF`` (free text is allowed by the input
          widget and the value may come from the URL) the gauges are omitted
          instead of raising ``IndexError``.
    """
    from urllib.parse import quote

    if not ticker or ticker == "ALL":
        # The @names refer to this function's parameters; DataFrame.query
        # resolves them from the calling frame.
        criteria = "SmartScore>=@SmartScore[0] & SmartScore <= @SmartScore[1] & GFValuepercent>=@GFValuepercent & FinVizTargetpercent>@FinVizTargetpercent & Sector in @Sector & MarketCap>@MarketCap"
        if MarketBeat == "Above all Targets":
            criteria += " & MarketBeat==1"
        return pn.widgets.Tabulator(
            DF.query(criteria),
            height=800,
            widths=200,
            show_index=False,
            theme="modern",
            disabled=True,
        )

    # Look the ticker up once and reuse the result for the table and gauges.
    selected = DF.query("Ticker == @ticker")
    table1 = pn.widgets.Tabulator(
        selected, height=200, widths=200, show_index=False, disabled=True
    )
    chart1 = make_candle_stick(ticker)

    # The ticker can originate from the URL query string, so percent-encode it
    # before placing it in outbound links.
    symbol = quote(str(ticker), safe="")
    # (site label, button type, url) in display order.
    link_specs = [
        ("FinViz", "primary", f"https://finviz.com/quote.ashx?t={symbol}&p=d#"),
        ("Benzinga", "primary", f"https://www.benzinga.com/quote/{symbol}"),
        ("GuruFocus", "warning", f"https://gurufocus.com/stock/{symbol}"),
        ("TipRanks", "warning", f"https://www.tipranks.com/stocks/{symbol}/forecast"),
        ("BarChart", "success", f"https://www.barchart.com/stocks/quotes/{symbol}/expected-move"),
        ("optionchart", "danger", f"https://optioncharts.io/options/{symbol}"),
        ("tradingview", "light", f"https://www.tradingview.com/symbols/NASDAQ-{symbol}/technicals/"),
        ("marketwatch", "danger", f"https://www.marketwatch.com/investing/stock/{symbol}"),
        ("ChartMill TA", "danger", f"https://www.chartmill.com/stock/quote/{symbol}/technical-analysis"),
        ("ChartMill FA", "danger", f"https://www.chartmill.com/stock/quote/{symbol}/fundamental-analysis"),
        ("seekingalpha", "light", f"https://seekingalpha.com/symbol/{symbol}"),
    ]
    buttons = [
        _link_button(f"{ticker} on {site}", button_type, url)
        for site, button_type, url in link_specs
    ]
    # The first eight links form the top row, the remaining three the second.
    top_links, bottom_links = buttons[:8], buttons[8:]

    gauges = []
    if not selected.empty:
        row = selected.iloc[0]
        gauges = [
            get_gauge("SmartScore", float(row["SmartScore"]), 0, 10),
            get_gauge("GuruFocus %", float(row["GFValuepercent"]), -100, 100),
            get_gauge("FinViz %", float(row["FinVizTargetpercent"]), -100, 100),
        ]

    return pn.Column(
        table1,
        chart1,
        pn.Column(
            pn.Row(*top_links),
            pn.Row(*bottom_links),
            pn.Row(*gauges, align="center"),
        ),
    )
# Load Panel's Tabulator extension for the table widgets created in get_DF.
pn.extension("tabulator")

# Main view: re-rendered by get_DF whenever one of the bound widgets changes.
bound_plot = pn.bind(
    get_DF,
    DF=DFmerge_tipranks_gurufocus,
    ticker=ticker,
    SmartScore=SmartScore,
    GFValuepercent=GFValuepercent,
    FinVizTargetpercent=FinVizTargetpercent,
    Sector=Sector,
    MarketCap=MarketCap,
    MarketBeat=MarketBeat_radio_group,
)

# Left column: logo, filter controls, animation and download buttons.
_controls = pn.Column(
    pn.Row(png_pane2, align="center"),
    ticker,
    SmartScore,
    GFValuepercent,
    FinVizTargetpercent,
    MarketCap,
    pn.Row(MarketBeat_radio_group, align="center"),
    png_pane,
    download_button_GuruFocus,
    download_button_FinViz,
    download_button_MarketBeat,
    download_button_TipRanks,
    download_button_Filter,
    download_button_watchlist,
)
# Right side: the bound table/ticker view.
_main_view = pn.Row(bound_plot, align="center")

_dashboard = pn.Column(pn.Row(_controls, _main_view))
_dashboard.servable(
    title="Fair Value Ranking - Merged Gurufocus & Tipranks & FinViz & MorningStars & MarketBeat"
)