"""Streamlit demo: synthetic daily price series with IPA-based anomaly flags.

The page has three sections:

1. "Create Price Data"  - widgets that parameterise a random-walk price series.
2. "Show Price Data"    - the generated series as a table and a line chart.
3. "Anomaly IPA Analysis" - rolling-window IPA scores for a monthly and a
   quarterly window, blended with ``gamma`` and bucketed into a 0/1/2
   ``Warning`` level that is plotted over the price.
"""
import datetime

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import streamlit as st
from plotly.subplots import make_subplots

st.set_page_config(layout="wide")

# Thresholds on the blended IPA score that raise the warning level to 1 / 2.
WARNING_THRESHOLD = 0.5
ALERT_THRESHOLD = 1.0


# NOTE(review): because the result is cached on its arguments, pressing the
# "Create Time-series data" button with unchanged inputs returns the same
# series instead of drawing a new one - confirm this is intended.
@st.cache_data
def create_datafram(start_date, num_prices, base_price, price_ratio):
    """Build a daily random-walk price series.

    Each price is the previous price plus a standard-normal draw scaled by
    ``price_ratio``; the first price is ``base_price``.

    Args:
        start_date: First timestamp of the series (daily frequency).
        num_prices: Number of rows to generate; must be at least 1.
        base_price: Price of the first row.
        price_ratio: Scale applied to every random increment.

    Returns:
        DataFrame with columns ``timestamp`` and ``price``.

    Raises:
        ValueError: If ``num_prices`` is smaller than 1.
    """
    num_prices = int(num_prices)
    if num_prices < 1:
        raise ValueError("num_prices must be at least 1")

    dates = pd.date_range(start=start_date, periods=num_prices)
    increments = np.random.randn(num_prices - 1) * price_ratio
    # Cumulative sum of [base, step_1, step_2, ...] is the random walk itself.
    prices = np.cumsum(np.concatenate(([float(base_price)], increments)))
    return pd.DataFrame({'timestamp': dates, 'price': prices})


st.header('Create Price Data', divider=True)

col1, col2, col3, col4 = st.columns(4)
with col1:
    date = st.date_input("Date Start", datetime.date(2019, 7, 6))
with col2:
    steps = st.number_input("Time steps (days)", value=365, min_value=1)
with col3:
    price = st.number_input("Base Price", value=100)
with col4:
    ratio = st.number_input("Fluctuation factor [0 - 1]", value=0.9)

# Generate once per session, then only again when the user asks for it, so
# that unrelated widget changes do not replace the series being analysed.
if 'data' not in st.session_state:
    st.session_state['data'] = create_datafram(date, steps, price, ratio)

if st.button('Create Time-series data'):
    st.session_state.data = create_datafram(date, steps, price, ratio)

st.header('Show Price Data', divider=True)

col5, col6 = st.columns((1, 2))
with col5:
    st.dataframe(st.session_state.data, use_container_width=True)
with col6:
    fig = px.line(
        st.session_state.data,
        x="timestamp",
        y="price",
        title='Time Series Price Data',
    )
    st.plotly_chart(fig, use_container_width=True)


def cal_ipa(df_in: pd.DataFrame,
            window_size: int = 120,
            std_adjust: float = 2.0,
            ipa_col: str = 'IPA',
            adj_price: bool = False):
    """Add a rolling IPA score column to a price frame.

    The score is ``|price - rolling_mean| / (rolling_std * std_adjust)``
    computed over ``window_size`` rows. The first ``window_size - 1`` rows
    are NaN because the rolling window is not yet full.

    Args:
        df_in: Frame with a numeric ``price`` column. It is not modified.
        window_size: Rolling window length in rows.
        std_adjust: Multiplier applied to the rolling standard deviation.
        ipa_col: Name of the column that receives the score.
        adj_price: When true, ``price`` is first replaced by
            ``price - (1 - rolling_std)`` and the score is computed on the
            replaced values.

    Returns:
        A copy of ``df_in`` with ``ipa_col`` added (and ``price`` replaced
        when ``adj_price`` is set).
    """
    # Work on a copy so the caller's frame is never mutated behind its back.
    df_out = df_in.copy()
    price = df_out['price']

    if adj_price:
        price = price - (1 - price.rolling(window=window_size).std())
        df_out['price'] = price

    rolling = price.rolling(window=window_size)
    deviation = price - rolling.mean()
    df_out[ipa_col] = (deviation / (rolling.std() * std_adjust)).abs()
    return df_out


def cal_anomaly(df_in):
    """Placeholder that is not implemented yet; currently returns ``...``."""
    return ...


st.header('Anomaly IPA Analysis', divider=True)

a_col1, a_col2, a_col3, a_col4 = st.columns(4)
with a_col1:
    window_quater = st.number_input("Quater window (days)", value=120, min_value=2)
with a_col2:
    window_month = st.number_input("Monthly window (days)", value=30, min_value=2)
with a_col3:
    std_adj = st.number_input("Standard Deviation Adjustment", value=2.5)
with a_col4:
    adj_price = st.toggle("Adjust Price")

b_col1, b_col2 = st.columns(2)
with b_col1:
    gamma = st.number_input("Gamma adjustment", value=0.8)

# Always start from the stored price series so toggles never compound across
# reruns.
# NOTE(review): with "Adjust Price" on, the second call adjusts the price that
# the first call already adjusted - confirm this cumulative effect is wanted.
ipa_df = cal_ipa(st.session_state.data, window_size=window_month,
                 std_adjust=std_adj, ipa_col='IPA_m', adj_price=adj_price)
ipa_df = cal_ipa(ipa_df, window_size=window_quater,
                 std_adjust=std_adj, ipa_col='IPA_q', adj_price=adj_price)

# Size the random damping factor from the frame itself: the "Time steps"
# widget may have changed since the stored series was generated.
w_adj = np.random.uniform(low=0.9, high=1.0, size=len(ipa_df))
ipa_df['IPA_q'] = ipa_df['IPA_q'] * w_adj
ipa_df['IPA_m'] = ipa_df['IPA_m'] * w_adj
ipa_df['ipa_adj'] = gamma * ipa_df['IPA_m'] + (1 - gamma) * ipa_df['IPA_q']

# 0 = normal, 1 = warning, 2 = alert; NaN scores stay at 0.
ipa_df['Warning'] = np.select(
    [ipa_df['ipa_adj'] >= ALERT_THRESHOLD, ipa_df['ipa_adj'] >= WARNING_THRESHOLD],
    [2, 1],
    default=0,
)
st.session_state['ipa_df'] = ipa_df

c_col1, c_col2 = st.columns((1, 2))
with c_col1:
    st.dataframe(ipa_df)
with c_col2:
    fig = go.Figure()
    fig.add_trace(go.Scatter(
        x=ipa_df['timestamp'],
        y=ipa_df['price'],
        hoverinfo='x+y',
        mode='lines',
        line=dict(width=1.0, color='rgb(100, 100, 100)'),
        name='Price',
    ))
    fig.add_trace(go.Scatter(
        x=ipa_df['timestamp'],
        y=ipa_df['ipa_adj'],
        hoverinfo='x+y',
        mode='lines',
        line=dict(width=1.0, color='rgb(0, 230, 230)'),
        stackgroup='one',
        name='IPA Value',
        yaxis='y2',
    ))
    fig.add_trace(go.Scatter(
        x=ipa_df['timestamp'],
        y=ipa_df['Warning'],
        hoverinfo='x+y',
        mode='lines',
        line=dict(width=1.0, color='rgb(230, 172, 0)'),
        stackgroup='one',
        name='Anomaly Index',
        yaxis='y3',
    ))

    # Series.min()/max() skip the NaN rows that price adjustment introduces;
    # the builtin min()/max() would propagate NaN into the axis range.
    price_floor = ipa_df['price'].min() * 0.99
    price_ceiling = ipa_df['price'].max() * 1.01

    fig.update_layout(
        legend=dict(orientation="h"),
        yaxis=dict(
            title=dict(text="Price"),
            side="left",
            range=(price_floor, price_ceiling),
        ),
        yaxis2=dict(
            title='',
            side="right",
            range=(0, 2),
            overlaying="y",
            showgrid=False,
        ),
        yaxis3=dict(
            title='Anomaly Index',
            side="right",
            range=(0, 2),
            overlaying="y",
            showgrid=False,
        ),
    )
    fig.update_layout(
        title_text="IPA Anomaly detection chart",
    )
    st.plotly_chart(fig)