Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| import requests | |
| import pandas as pd | |
| import yfinance as yf | |
| from plotly.subplots import make_subplots | |
| import plotly.graph_objects as go | |
| import os | |
| # Global API key (hidden from users) | |
| API_KEY = os.getenv("FMP_API_KEY") | |
| # ------------------------------- | |
| # Helper function to fetch JSON safely | |
| # ------------------------------- | |
| def safe_get_json(url, log_list=None, dimension_label=""): | |
| try: | |
| response = requests.get(url) | |
| data = response.json() | |
| return data | |
| except Exception: | |
| msg = f"Unable to retrieve historical data for {dimension_label}." | |
| if log_list is not None: | |
| log_list.append(msg) | |
| else: | |
| st.error("An error occurred while retrieving historical data. Please try again later.") | |
| return None | |
| # ------------------------------- | |
| # Dimension Functions | |
| # ------------------------------- | |
| def dimension_1_positive_roa(symbol, years_back=1, log_list=None): | |
| limit_needed = years_back + 1 | |
| income_url = ( | |
| f"https://financialmodelingprep.com/api/v3/income-statement/{symbol}" | |
| f"?limit={limit_needed}&period=annual&apikey={API_KEY}" | |
| ) | |
| balance_url = ( | |
| f"https://financialmodelingprep.com/api/v3/balance-sheet-statement/{symbol}" | |
| f"?limit={limit_needed}&period=annual&apikey={API_KEY}" | |
| ) | |
| income_data = safe_get_json(income_url, log_list, "Dimension 1") | |
| balance_data = safe_get_json(balance_url, log_list, "Dimension 1") | |
| if income_data is None or balance_data is None: | |
| return [] | |
| if len(income_data) < limit_needed or len(balance_data) < limit_needed: | |
| msg = f"Not enough historical data available to calculate the metric for {years_back} year(s)." | |
| if log_list is not None: | |
| log_list.append(msg) | |
| else: | |
| st.error(msg) | |
| return [] | |
| results = [] | |
| for i in range(years_back): | |
| current_income = income_data[i] | |
| current_balance = balance_data[i] | |
| year_or_date = current_income.get("calendarYear") or current_income.get("date", f"N/A_{i}") | |
| net_income_current = current_income.get("netIncome", 0) | |
| ta_current = current_balance.get("totalAssets", 0) | |
| ta_previous = balance_data[i+1].get("totalAssets", 0) if i+1 < len(balance_data) else 0 | |
| avg_assets = (ta_current + ta_previous) / 2 if ta_previous else 0 | |
| roa_current = net_income_current / avg_assets if avg_assets else 0 | |
| score = 1 if roa_current > 0 else 0 | |
| log_message = ( | |
| f"Dimension 1 (Positive ROA) | Year={year_or_date}: {score} => " | |
| f"NetIncome={net_income_current}, AvgAssets={int(avg_assets)}, ROA={roa_current:.4f}" | |
| ) | |
| if log_list is not None: | |
| log_list.append(log_message) | |
| results.append({"year": str(year_or_date), "score": score}) | |
| return results | |
| def dimension_2_positive_cfo(symbol, years_back=1, log_list=None): | |
| limit_needed = years_back | |
| cf_url = ( | |
| f"https://financialmodelingprep.com/api/v3/cash-flow-statement/{symbol}" | |
| f"?limit={limit_needed}&period=annual&apikey={API_KEY}" | |
| ) | |
| cf_data = safe_get_json(cf_url, log_list, "Dimension 2") | |
| if cf_data is None: | |
| return [] | |
| if len(cf_data) < limit_needed: | |
| msg = f"Not enough historical data available to calculate the metric for {years_back} year(s)." | |
| if log_list is not None: | |
| log_list.append(msg) | |
| return [] | |
| results = [] | |
| for i in range(years_back): | |
| record = cf_data[i] | |
| year_or_date = record.get("calendarYear") or record.get("date", f"N/A_{i}") | |
| cfo_current = record.get("operatingCashFlow", 0) | |
| score = 1 if cfo_current > 0 else 0 | |
| log_message = f"Dimension 2 (Positive CFO) | Year={year_or_date}: {score} => CFO={cfo_current}" | |
| if log_list is not None: | |
| log_list.append(log_message) | |
| results.append({"year": str(year_or_date), "score": score}) | |
| return results | |
| def dimension_3_improved_roa(symbol, years_back=1, log_list=None): | |
| limit_needed = years_back + 1 | |
| income_url = ( | |
| f"https://financialmodelingprep.com/api/v3/income-statement/{symbol}" | |
| f"?limit={limit_needed}&period=annual&apikey={API_KEY}" | |
| ) | |
| balance_url = ( | |
| f"https://financialmodelingprep.com/api/v3/balance-sheet-statement/{symbol}" | |
| f"?limit={limit_needed}&period=annual&apikey={API_KEY}" | |
| ) | |
| income_data = safe_get_json(income_url, log_list, "Dimension 3") | |
| balance_data = safe_get_json(balance_url, log_list, "Dimension 3") | |
| if income_data is None or balance_data is None: | |
| return [] | |
| if len(income_data) < limit_needed or len(balance_data) < limit_needed: | |
| msg = f"Not enough historical data available to calculate the metric for {years_back} year(s)." | |
| if log_list is not None: | |
| log_list.append(msg) | |
| return [] | |
| results = [] | |
| for i in range(years_back): | |
| current_income = income_data[i] | |
| current_balance = balance_data[i] | |
| year_or_date = current_income.get("calendarYear") or current_income.get("date", f"N/A_{i}") | |
| net_income_current = current_income.get("netIncome", 0) | |
| ta_current = current_balance.get("totalAssets", 0) | |
| if i+1 < len(income_data): | |
| net_income_previous = income_data[i+1].get("netIncome", 0) | |
| ta_previous = balance_data[i+1].get("totalAssets", 0) | |
| else: | |
| net_income_previous = 0 | |
| ta_previous = 0 | |
| avg_current = (ta_current + ta_previous) / 2 if ta_previous else 0 | |
| roa_current = net_income_current / avg_current if avg_current else 0 | |
| roa_previous = (net_income_previous / ta_previous) if ta_previous else 0 | |
| score = 1 if roa_current > roa_previous else 0 | |
| log_message = ( | |
| f"Dimension 3 (ROA Improvement) | Year={year_or_date}: {score} => " | |
| f"ROA_current={roa_current:.4f}, ROA_previous={roa_previous:.4f}" | |
| ) | |
| if log_list is not None: | |
| log_list.append(log_message) | |
| results.append({"year": str(year_or_date), "score": score}) | |
| return results | |
| def dimension_4_cfo_exceeds_net_income(symbol, years_back=1, log_list=None): | |
| limit_needed = years_back | |
| cf_url = ( | |
| f"https://financialmodelingprep.com/api/v3/cash-flow-statement/{symbol}" | |
| f"?limit={limit_needed}&period=annual&apikey={API_KEY}" | |
| ) | |
| income_url = ( | |
| f"https://financialmodelingprep.com/api/v3/income-statement/{symbol}" | |
| f"?limit={limit_needed}&period=annual&apikey={API_KEY}" | |
| ) | |
| cf_data = safe_get_json(cf_url, log_list, "Dimension 4") | |
| income_data = safe_get_json(income_url, log_list, "Dimension 4") | |
| if cf_data is None or income_data is None: | |
| return [] | |
| if len(cf_data) < limit_needed or len(income_data) < limit_needed: | |
| msg = f"Not enough historical data available to calculate the metric for {years_back} year(s)." | |
| if log_list is not None: | |
| log_list.append(msg) | |
| return [] | |
| results = [] | |
| for i in range(years_back): | |
| c = cf_data[i] | |
| inc = income_data[i] | |
| year_or_date = c.get("calendarYear") or c.get("date", f"N/A_{i}") | |
| cfo_current = c.get("operatingCashFlow", 0) | |
| net_income_current = inc.get("netIncome", 0) | |
| score = 1 if cfo_current > net_income_current else 0 | |
| log_message = ( | |
| f"Dimension 4 (CFO > Net Income) | Year={year_or_date}: {score} => " | |
| f"CFO={cfo_current}, NetIncome={net_income_current}" | |
| ) | |
| if log_list is not None: | |
| log_list.append(log_message) | |
| results.append({"year": str(year_or_date), "score": score}) | |
| return results | |
| def dimension_5_lower_leverage(symbol, years_back=1, log_list=None): | |
| limit_needed = years_back + 1 | |
| bal_url = ( | |
| f"https://financialmodelingprep.com/api/v3/balance-sheet-statement/{symbol}" | |
| f"?limit={limit_needed}&period=annual&apikey={API_KEY}" | |
| ) | |
| balance_data = safe_get_json(bal_url, log_list, "Dimension 5") | |
| if balance_data is None: | |
| return [] | |
| if len(balance_data) < limit_needed: | |
| msg = f"Not enough historical data available to calculate the metric for {years_back} year(s)." | |
| if log_list is not None: | |
| log_list.append(msg) | |
| return [] | |
| results = [] | |
| for i in range(years_back): | |
| current_bal = balance_data[i] | |
| year_or_date = current_bal.get("calendarYear") or current_bal.get("date", f"N/A_{i}") | |
| ltd_current = current_bal.get("longTermDebt", 0) | |
| ta_current = current_bal.get("totalAssets", 0) | |
| if i+1 < len(balance_data): | |
| ltd_previous = balance_data[i+1].get("longTermDebt", 0) | |
| ta_previous = balance_data[i+1].get("totalAssets", 0) | |
| else: | |
| ltd_previous = 0 | |
| ta_previous = 0 | |
| ratio_current = ltd_current / ta_current if ta_current else 0 | |
| ratio_previous = ltd_previous / ta_previous if ta_previous else 0 | |
| score = 1 if ratio_current < ratio_previous else 0 | |
| log_message = ( | |
| f"Dimension 5 (Lower Debt Ratio) | Year={year_or_date}: {score} => " | |
| f"DebtRatio_current={ratio_current:.4f}, DebtRatio_previous={ratio_previous:.4f}" | |
| ) | |
| if log_list is not None: | |
| log_list.append(log_message) | |
| results.append({"year": str(year_or_date), "score": score}) | |
| return results | |
| def dimension_6_higher_current_ratio(symbol, years_back=1, log_list=None): | |
| limit_needed = years_back + 1 | |
| bal_url = ( | |
| f"https://financialmodelingprep.com/api/v3/balance-sheet-statement/{symbol}" | |
| f"?limit={limit_needed}&period=annual&apikey={API_KEY}" | |
| ) | |
| balance_data = safe_get_json(bal_url, log_list, "Dimension 6") | |
| if balance_data is None: | |
| return [] | |
| if len(balance_data) < limit_needed: | |
| msg = f"Not enough historical data available to calculate the metric for {years_back} year(s)." | |
| if log_list is not None: | |
| log_list.append(msg) | |
| return [] | |
| results = [] | |
| for i in range(years_back): | |
| current_bal = balance_data[i] | |
| year_or_date = current_bal.get("calendarYear") or current_bal.get("date", f"N/A_{i}") | |
| ca_current = current_bal.get("totalCurrentAssets", 0) | |
| cl_current = current_bal.get("totalCurrentLiabilities", 0) | |
| cr_current = ca_current / cl_current if cl_current else 0 | |
| if i+1 < len(balance_data): | |
| ca_previous = balance_data[i+1].get("totalCurrentAssets", 0) | |
| cl_previous = balance_data[i+1].get("totalCurrentLiabilities", 0) | |
| cr_previous = ca_previous / cl_previous if cl_previous else 0 | |
| else: | |
| cr_previous = 0 | |
| score = 1 if cr_current > cr_previous else 0 | |
| log_message = ( | |
| f"Dimension 6 (Higher Current Ratio) | Year={year_or_date}: {score} => " | |
| f"CR_current={cr_current:.4f}, CR_previous={cr_previous:.4f}" | |
| ) | |
| if log_list is not None: | |
| log_list.append(log_message) | |
| results.append({"year": str(year_or_date), "score": score}) | |
| return results | |
| def dimension_7_no_new_shares(symbol, years_back=1, log_list=None): | |
| limit_needed = years_back + 1 | |
| inc_url = ( | |
| f"https://financialmodelingprep.com/api/v3/income-statement/{symbol}" | |
| f"?limit={limit_needed}&period=annual&apikey={API_KEY}" | |
| ) | |
| inc_data = safe_get_json(inc_url, log_list, "Dimension 7") | |
| if inc_data is None: | |
| return [] | |
| if len(inc_data) < limit_needed: | |
| msg = f"Not enough historical data available to calculate the metric for {years_back} year(s)." | |
| if log_list is not None: | |
| log_list.append(msg) | |
| return [] | |
| results = [] | |
| for i in range(years_back): | |
| current_inc = inc_data[i] | |
| year_or_date = current_inc.get("calendarYear") or current_inc.get("date", f"N/A_{i}") | |
| shares_current = current_inc.get("weightedAverageShsOut", 0) | |
| shares_previous = inc_data[i+1].get("weightedAverageShsOut", 0) if i+1 < len(inc_data) else 0 | |
| score = 1 if shares_current <= shares_previous else 0 | |
| log_message = ( | |
| f"Dimension 7 (No New Shares) | Year={year_or_date}: {score} => " | |
| f"Shares_current={shares_current}, Shares_previous={shares_previous}" | |
| ) | |
| if log_list is not None: | |
| log_list.append(log_message) | |
| results.append({"year": str(year_or_date), "score": score}) | |
| return results | |
| def dimension_8_improved_gross_margin(symbol, years_back=1, log_list=None): | |
| limit_needed = years_back + 1 | |
| inc_url = ( | |
| f"https://financialmodelingprep.com/api/v3/income-statement/{symbol}" | |
| f"?limit={limit_needed}&period=annual&apikey={API_KEY}" | |
| ) | |
| inc_data = safe_get_json(inc_url, log_list, "Dimension 8") | |
| if inc_data is None: | |
| return [] | |
| if len(inc_data) < limit_needed: | |
| msg = f"Not enough historical data available to calculate the metric for {years_back} year(s)." | |
| if log_list is not None: | |
| log_list.append(msg) | |
| return [] | |
| results = [] | |
| for i in range(years_back): | |
| current_inc = inc_data[i] | |
| year_or_date = current_inc.get("calendarYear") or current_inc.get("date", f"N/A_{i}") | |
| rev_current = current_inc.get("revenue", 0) | |
| gp_current = current_inc.get("grossProfit", 0) | |
| gm_current = gp_current / rev_current if rev_current else 0 | |
| if i+1 < len(inc_data): | |
| rev_previous = inc_data[i+1].get("revenue", 0) | |
| gp_previous = inc_data[i+1].get("grossProfit", 0) | |
| gm_previous = gp_previous / rev_previous if rev_previous else 0 | |
| else: | |
| gm_previous = 0 | |
| score = 1 if gm_current > gm_previous else 0 | |
| log_message = ( | |
| f"Dimension 8 (Gross Margin Up) | Year={year_or_date}: {score} => " | |
| f"GM_current={gm_current:.4f}, GM_previous={gm_previous:.4f}" | |
| ) | |
| if log_list is not None: | |
| log_list.append(log_message) | |
| results.append({"year": str(year_or_date), "score": score}) | |
| return results | |
| def dimension_9_improved_ato(symbol, years_back=1, log_list=None): | |
| limit_needed = years_back + 1 | |
| inc_url = ( | |
| f"https://financialmodelingprep.com/api/v3/income-statement/{symbol}" | |
| f"?limit={limit_needed}&period=annual&apikey={API_KEY}" | |
| ) | |
| bal_url = ( | |
| f"https://financialmodelingprep.com/api/v3/balance-sheet-statement/{symbol}" | |
| f"?limit={limit_needed}&period=annual&apikey={API_KEY}" | |
| ) | |
| inc_data = safe_get_json(inc_url, log_list, "Dimension 9") | |
| bal_data = safe_get_json(bal_url, log_list, "Dimension 9") | |
| if inc_data is None or bal_data is None: | |
| return [] | |
| if len(inc_data) < limit_needed or len(bal_data) < limit_needed: | |
| msg = f"Not enough historical data available to calculate the metric for {years_back} year(s)." | |
| if log_list is not None: | |
| log_list.append(msg) | |
| return [] | |
| results = [] | |
| for i in range(years_back): | |
| inc_current = inc_data[i] | |
| bal_current = bal_data[i] | |
| year_or_date = inc_current.get("calendarYear") or inc_current.get("date", f"N/A_{i}") | |
| rev_current = inc_current.get("revenue", 0) | |
| ta_current = bal_current.get("totalAssets", 0) | |
| ta_prev_for_cur = bal_data[i+1].get("totalAssets", 0) if i+1 < len(bal_data) else 0 | |
| avg_assets_current = (ta_current + ta_prev_for_cur) / 2 if ta_prev_for_cur else 0 | |
| ato_current = rev_current / avg_assets_current if avg_assets_current else 0 | |
| if i+1 < len(inc_data) and i+2 < len(bal_data): | |
| rev_previous = inc_data[i+1].get("revenue", 0) | |
| ta_previous = bal_data[i+1].get("totalAssets", 0) | |
| ato_previous = rev_previous / ta_previous if ta_previous else 0 | |
| else: | |
| ato_previous = 0 | |
| score = 1 if ato_current > ato_previous else 0 | |
| log_message = ( | |
| f"Dimension 9 (Asset Turnover Up) | Year={year_or_date}: {score} => " | |
| f"ATO_current={ato_current:.4f}, ATO_previous={ato_previous:.4f}" | |
| ) | |
| if log_list is not None: | |
| log_list.append(log_message) | |
| results.append({"year": str(year_or_date), "score": score}) | |
| return results | |
| # ------------------------------- | |
| # Aggregator Function: Combine all dimensions over time | |
| # ------------------------------- | |
| def calculate_piotroski_scores_over_time(symbol, years_back=5, log_list=None): | |
| d1_list = dimension_1_positive_roa(symbol, years_back, log_list=log_list) | |
| d2_list = dimension_2_positive_cfo(symbol, years_back, log_list=log_list) | |
| d3_list = dimension_3_improved_roa(symbol, years_back, log_list=log_list) | |
| d4_list = dimension_4_cfo_exceeds_net_income(symbol, years_back, log_list=log_list) | |
| d5_list = dimension_5_lower_leverage(symbol, years_back, log_list=log_list) | |
| d6_list = dimension_6_higher_current_ratio(symbol, years_back, log_list=log_list) | |
| d7_list = dimension_7_no_new_shares(symbol, years_back, log_list=log_list) | |
| d8_list = dimension_8_improved_gross_margin(symbol, years_back, log_list=log_list) | |
| d9_list = dimension_9_improved_ato(symbol, years_back, log_list=log_list) | |
| rows = [] | |
| for i in range(years_back): | |
| year_str = d1_list[i]["year"] if i < len(d1_list) else f"N/A_{i}" | |
| dim1 = d1_list[i]["score"] if i < len(d1_list) else 0 | |
| dim2 = d2_list[i]["score"] if i < len(d2_list) else 0 | |
| dim3 = d3_list[i]["score"] if i < len(d3_list) else 0 | |
| dim4 = d4_list[i]["score"] if i < len(d4_list) else 0 | |
| dim5 = d5_list[i]["score"] if i < len(d5_list) else 0 | |
| dim6 = d6_list[i]["score"] if i < len(d6_list) else 0 | |
| dim7 = d7_list[i]["score"] if i < len(d7_list) else 0 | |
| dim8 = d8_list[i]["score"] if i < len(d8_list) else 0 | |
| dim9 = d9_list[i]["score"] if i < len(d9_list) else 0 | |
| total_score = sum([dim1, dim2, dim3, dim4, dim5, dim6, dim7, dim8, dim9]) | |
| rows.append({ | |
| "year": year_str, | |
| "dim1_roa": dim1, | |
| "dim2_cfo": dim2, | |
| "dim3_roa_improvement": dim3, | |
| "dim4_cfo_over_ni": dim4, | |
| "dim5_lower_debt_ratio": dim5, | |
| "dim6_higher_current_ratio": dim6, | |
| "dim7_no_new_shares": dim7, | |
| "dim8_gross_margin_up": dim8, | |
| "dim9_asset_turnover_up": dim9, | |
| "total_score": total_score | |
| }) | |
| df = pd.DataFrame(rows) | |
| return df | |
| # ------------------------------- | |
| # Fetch annual stock prices using yfinance | |
| # ------------------------------- | |
| def fetch_stock_prices_for_years(symbol, df_scores): | |
| try: | |
| df_scores["year_int"] = df_scores["year"].astype(int) | |
| except Exception: | |
| st.error("Error processing year values.") | |
| return df_scores | |
| min_year = df_scores["year_int"].min() | |
| max_year = df_scores["year_int"].max() | |
| start_date = f"{min_year}-01-01" | |
| end_date = f"{max_year}-12-31" | |
| try: | |
| ticker_obj = yf.Ticker(symbol) | |
| hist = ticker_obj.history(start=start_date, end=end_date) | |
| except Exception: | |
| st.error("Error retrieving stock price data.") | |
| return df_scores | |
| year_to_price = {} | |
| for y in df_scores["year_int"].unique(): | |
| try: | |
| data_y = hist.loc[str(y)] if str(y) in hist.index.strftime("%Y") else pd.DataFrame() | |
| except Exception: | |
| data_y = pd.DataFrame() | |
| if data_y.empty: | |
| year_to_price[y] = None | |
| else: | |
| last_close = data_y["Close"].iloc[-1] | |
| year_to_price[y] = float(f"{last_close:.2f}") | |
| df_scores["stock_price"] = df_scores["year_int"].map(year_to_price) | |
| return df_scores | |
| # ------------------------------- | |
| # Set wide layout and page title | |
| # ------------------------------- | |
| st.set_page_config(page_title="Piotroski Score Analysis", layout="wide") | |
| st.title("Piotroski Score Analysis") | |
| st.markdown( | |
| """ | |
| This tool calculates the Piotroski F-Score over time for a given stock to investigate its financial health and performance trends. | |
| Simply adjust the parameters in the sidebar and click **Run Analysis** to view detailed scores, its decomposition, and interactive visualizations. | |
| """ | |
| ) | |
| # ------------------------------- | |
| # Explanation of Calculations Expander | |
| # ------------------------------- | |
| with st.expander("F-Score Calculations", expanded=False): | |
| st.markdown( | |
| """ | |
| The Piotroski F-Score is a nine-point system designed to identify financially strong companies. | |
| Each of the nine dimensions is binary (1 if favorable, 0 if not) and falls into groups like Profitability, Leverage & Liquidity, and Operational Efficiency. | |
| """ | |
| ) | |
| st.markdown("##### 1. Positive Return on Assets (ROA)") | |
| st.markdown( | |
| """ | |
| Measures how effectively a company uses its assets to generate net income. | |
| Calculated as: | |
| """ | |
| ) | |
| st.latex(r"\text{ROA} = \frac{\text{Net Income}}{\frac{\text{Total Assets}_{\text{current}} + \text{Total Assets}_{\text{previous}}}{2}}") | |
| st.markdown("A positive ROA indicates the company is profitable relative to its asset base.") | |
| st.markdown("##### 2. Positive Operating Cash Flow (CFO)") | |
| st.markdown( | |
| """ | |
| Evaluates whether the company generates cash from its core operations. | |
| Expressed simply as: | |
| """ | |
| ) | |
| st.latex(r"\text{CFO} > 0") | |
| st.markdown("A positive CFO suggests sustainable business operations.") | |
| st.markdown("##### 3. Improvement in ROA") | |
| st.markdown( | |
| """ | |
| Compares the current year's ROA to the previous year's to indicate improving profitability. | |
| In formula form: | |
| """ | |
| ) | |
| st.latex(r"\Delta\text{ROA} = \text{ROA}_{\text{current}} - \text{ROA}_{\text{previous}} > 0") | |
| st.markdown("If the difference is positive, the score is 1.") | |
| st.markdown("##### 4. CFO Exceeds Net Income") | |
| st.markdown( | |
| """ | |
| Checks that the cash flow from operations is greater than net income, implying high earnings quality. | |
| Expressed as: | |
| """ | |
| ) | |
| st.latex(r"\text{CFO} > \text{Net Income}") | |
| st.markdown("If true, the indicator receives a score of 1.") | |
| st.markdown("##### 5. Decrease in Long-Term Debt Ratio") | |
| st.markdown( | |
| """ | |
| Evaluates whether the company is reducing its financial leverage over time. | |
| Calculated as: | |
| """ | |
| ) | |
| st.latex(r"\text{Debt Ratio} = \frac{\text{Long-Term Debt}}{\text{Total Assets}}") | |
| st.markdown("A lower debt ratio in the current year versus the previous year scores 1.") | |
| st.markdown("##### 6. Improvement in Current Ratio") | |
| st.markdown( | |
| """ | |
| Assesses short-term liquidity by comparing current assets to current liabilities. | |
| Calculated as: | |
| """ | |
| ) | |
| st.latex(r"\text{Current Ratio} = \frac{\text{Total Current Assets}}{\text{Total Current Liabilities}}") | |
| st.markdown("An increase in the current ratio year-over-year signals stronger liquidity.") | |
| st.markdown("##### 7. No New Shares Issued") | |
| st.markdown( | |
| """ | |
| Checks that the weighted average shares outstanding have not increased, avoiding dilution. | |
| Expressed as: | |
| """ | |
| ) | |
| st.latex(r"\text{Weighted Average Shares}_{\text{current}} \leq \text{Weighted Average Shares}_{\text{previous}}") | |
| st.markdown("If true, the score is 1.") | |
| st.markdown("##### 8. Improvement in Gross Margin") | |
| st.markdown("Gross Margin is defined as:") | |
| st.latex(r"\text{Gross Margin} = \frac{\text{Gross Profit}}{\text{Revenue}}") | |
| st.markdown("An increase in gross margin indicates better cost management or pricing power.") | |
| st.markdown("##### 9. Improvement in Asset Turnover") | |
| st.markdown( | |
| """ | |
| Measures how efficiently a company uses its assets to generate revenue. | |
| Calculated as: | |
| """ | |
| ) | |
| st.latex(r"\text{Asset Turnover} = \frac{\text{Revenue}}{\frac{\text{Total Assets}_{\text{current}} + \text{Total Assets}_{\text{previous}}}{2}}") | |
| st.markdown("An increase in asset turnover indicates more efficient use of assets.") | |
| # ------------------------------- | |
| # Sidebar: Parameters Expander | |
| # ------------------------------- | |
| with st.sidebar.expander("Parameters", expanded=True): | |
| ticker = st.text_input("Ticker Symbol", value="MSFT", | |
| help="Enter the stock ticker symbol (e.g., MSFT)") | |
| years_back = st.slider("Number of Years", min_value=1, max_value=20, value=10, help="Set how many past years to analyze") | |
| run_analysis = st.button("Run Analysis") | |
| # ------------------------------- | |
| # Run the analysis on button click | |
| # ------------------------------- | |
| if run_analysis: | |
| with st.spinner("Running analysis. Please wait..."): | |
| raw_logs = [] | |
| df_scores = calculate_piotroski_scores_over_time(ticker, years_back, log_list=raw_logs) | |
| df_scores = fetch_stock_prices_for_years(ticker, df_scores) | |
| dim_cols = [ | |
| "dim1_roa", "dim2_cfo", "dim3_roa_improvement", | |
| "dim4_cfo_over_ni", "dim5_lower_debt_ratio", | |
| "dim6_higher_current_ratio", "dim7_no_new_shares", | |
| "dim8_gross_margin_up", "dim9_asset_turnover_up" | |
| ] | |
| df_plot = df_scores.sort_values(by="year", ascending=True) | |
| # Create Plotly figure with secondary y-axis | |
| fig = make_subplots(specs=[[{"secondary_y": True}]]) | |
| for col in dim_cols: | |
| fig.add_trace( | |
| go.Bar( | |
| x=df_plot["year"], | |
| y=df_plot[col], | |
| name=col, | |
| text=df_plot[col], | |
| textposition="inside" | |
| ), | |
| secondary_y=False | |
| ) | |
| # Add annotations for total score above each bar | |
| for idx, row in df_plot.iterrows(): | |
| fig.add_annotation( | |
| x=row["year"], | |
| y=row["total_score"] + 0.1, | |
| text=f"Score={int(row['total_score'])}", | |
| showarrow=False, | |
| font=dict(color="white", size=10) | |
| ) | |
| fig.add_trace( | |
| go.Scatter( | |
| x=df_plot["year"], | |
| y=df_plot["stock_price"], | |
| mode="lines+markers", | |
| name="Stock Price", | |
| marker=dict(color="red"), | |
| line=dict(width=2) | |
| ), | |
| secondary_y=True | |
| ) | |
| fig.update_xaxes( | |
| tickmode='array', | |
| tickvals=df_plot["year"].tolist(), | |
| ticktext=df_plot["year"].tolist() | |
| ) | |
| fig.update_layout( | |
| height=800, | |
| barmode="stack", | |
| title_text=f"Piotroski Dimensions for {ticker} with Stock Price", | |
| xaxis_title="Year", | |
| yaxis_title="Dimension Scores (Stacked)", | |
| legend=dict(orientation="h", yanchor="bottom", y=1.20), | |
| margin=dict(b=150) | |
| ) | |
| fig.update_yaxes(title_text="Stock Price (USD)", secondary_y=True) | |
| st.subheader("Results") | |
| with st.expander("Raw Calculation Logs", expanded=False): | |
| st.markdown("Below are the raw logs for each metric's calculation:") | |
| for log in raw_logs: | |
| st.text(log) | |
| st.markdown("##### DataFrame") | |
| with st.expander("DataFrame", expanded=False): | |
| st.dataframe(df_scores) | |
| st.markdown("##### Time Series Plot") | |
| st.plotly_chart(fig, use_container_width=True) | |
| st.markdown("##### Interpretation of the results") | |
| with st.expander("interpretation of results", expanded=False): | |
| for idx, row in df_scores.iterrows(): | |
| year_label = row["year"] | |
| st.markdown(f"##### {year_label}") | |
| weaknesses = [] | |
| if row["dim1_roa"] == 0: | |
| weaknesses.append("ROA is not positive. This may indicate lower profit relative to assets.") | |
| if row["dim2_cfo"] == 0: | |
| weaknesses.append("CFO is negative or zero. Operations did not produce sufficient cash flow.") | |
| if row["dim3_roa_improvement"] == 0: | |
| weaknesses.append("ROA did not improve. Asset profitability may be stagnant.") | |
| if row["dim4_cfo_over_ni"] == 0: | |
| weaknesses.append("CFO is not higher than net income. Earnings quality could be weak.") | |
| if row["dim5_lower_debt_ratio"] == 0: | |
| weaknesses.append("Debt ratio did not decrease. Leverage has not improved.") | |
| if row["dim6_higher_current_ratio"] == 0: | |
| weaknesses.append("Current ratio is not higher than before. Short-term liquidity did not improve.") | |
| if row["dim7_no_new_shares"] == 0: | |
| weaknesses.append("Shares outstanding increased. This may dilute existing shareholders.") | |
| if row["dim8_gross_margin_up"] == 0: | |
| weaknesses.append("Gross margin did not rise. Cost or pricing factors may need attention.") | |
| if row["dim9_asset_turnover_up"] == 0: | |
| weaknesses.append("Asset turnover did not increase. Efficiency in using assets could be better.") | |
| if weaknesses: | |
| weakness_text = "; ".join(weaknesses) | |
| st.markdown(f"**Key Weaknesses:** {weakness_text}") | |
| else: | |
| st.markdown("No identified weaknesses in this year's metrics. Scores suggest strong performance.") | |
| st.markdown("---") | |
| hide_streamlit_style = """ | |
| <style> | |
| #MainMenu {visibility: hidden;} | |
| footer {visibility: hidden;} | |
| </style> | |
| """ | |
| st.markdown(hide_streamlit_style, unsafe_allow_html=True) | |