# MicroCapLab — single-file Gradio app (Hugging Face Space, AmirTrader).
import gradio as gr
import pandas as pd
import numpy as np
from datetime import datetime
import os
from utils import upload_to_hf_dataset, download_from_hf_dataset
import dotenv
# Load environment variables from .env file
dotenv.load_dotenv()
# Read HF_TOKEN from the environment (populated by dotenv.load_dotenv above).
HF_TOKEN = os.getenv("HF_TOKEN")

# Date window encoded into the dataset filename on the Hub.
startdate = "2020-01-01"
enddate = "2025-07-01"

# Filename of the parquet file on HuggingFace.
# NOTE(review): os.path.basename keeps the ".parquet" suffix, so the final
# name deliberately(?) contains ".parquet_with_..." — confirm this matches the
# actual file name in the AmirTrader/PennyStocks dataset repo.
file_path = f"marketsession_polygon_{startdate}_{enddate}.parquet"
file_path = f"{os.path.basename(file_path)}_with_premarketvolume900K_marketcap1B.parquet"

# Download the parquet file from the HuggingFace dataset repo, then load it.
download_from_hf_dataset(file_path=file_path, dataset_name="AmirTrader/PennyStocks",
                         token=HF_TOKEN, repo_type="dataset")
df_org = pd.read_parquet(file_path)

# Columns shown in the UI table (names as they exist AFTER the cleaning and
# renaming performed below).
displayCols = ['Ticker', 'premarket_volume', 'marketcap(M$)', 'SharesFloat(M)', 'Rotation', 'datetime', 'Sector', 'premarket_change_from_perviousday_perc']

# Raw columns pulled from the parquet file, in preferred order.
preferedCols = ['Ticker', 'premarket_volume', 'marketcap', 'Shares Float', 'Rotation', 'datetime',
                'Sector',
                'premarket_change_from_perviousday_perc',
                'premarket_change_from_perviousday_high_perc',
                'high_closepermarketperc', 'low_closepermarketperc',
                'close_closepermarketperc', 'marketsession_3min_closepermarketperc',
                'marketsession_5min_closepermarketperc',
                'marketsession_10min_closepermarketperc',
                'marketsession_15min_closepermarketperc',
                'marketsession_30min_closepermarketperc',
                'marketsession_60min_closepermarketperc',
                'marketsession_120min_closepermarketperc'
                ]

# FIX: take an explicit copy — the original `df = df_org[preferedCols]` made a
# view/slice of df_org, so every column assignment below was chained
# assignment (SettingWithCopyWarning, writes may silently not stick).
df = df_org[preferedCols].copy()

# 'Shares Float': strip thousands separators, coerce to numeric, and derive a
# millions-scale column rounded to 3 decimals.
df['Shares Float'] = df['Shares Float'].replace(',', '', regex=True)
df['Shares Float'] = pd.to_numeric(df['Shares Float'], errors='coerce')
df['SharesFloat(M)'] = (df['Shares Float'] / 1_000_000).round(3)

# Every column whose name contains 'perc' holds a percentage: coerce to
# numeric, scale down by 100, round to 1 decimal.
perc_columns = [col for col in df.columns if 'perc' in col.lower()]
for col in perc_columns:
    df[col] = (pd.to_numeric(df[col], errors='coerce') / 100).round(1)

# Parse timestamps (bad values become NaT) and round Rotation to 2 decimals.
df['datetime'] = pd.to_datetime(df['datetime'], errors='coerce')
df['Rotation'] = pd.to_numeric(df['Rotation'], errors='coerce').round(2)

# Rename so the column matches displayCols ('marketcap' -> 'marketcap(M$)').
df.rename(columns={'marketcap': 'marketcap(M$)'}, inplace=True)

# Global pagination/filter state shared by the Gradio callbacks below.
current_page = 0      # zero-based index of the page currently shown
filtered_df = None    # result of the last filter/query, or None before load
current_query = ""    # last query expression the user submitted
def get_total_pages():
    """Return how many 20-row pages the current filtered result spans (min 1)."""
    global filtered_df
    if filtered_df is None or len(filtered_df) == 0:
        return 1
    # Ceiling division without math.ceil: -(-a // b).
    return -(-len(filtered_df) // 20)
def filter_dataframe(start_dt, end_dt, query_text=""):
    """Filter the global df by an optional date window and pandas query.

    Returns a (page_dataframe, page_info_text, query_status_text) triple
    suitable for the Gradio outputs, and resets pagination to page 1.
    """
    global filtered_df, current_page, current_query
    current_page = 0          # new filter run -> back to the first page
    current_query = query_text
    try:
        frame = df.copy()

        # --- optional date window ---------------------------------------
        if start_dt and end_dt:
            start = pd.to_datetime(start_dt) if isinstance(start_dt, str) else start_dt
            end = pd.to_datetime(end_dt) if isinstance(end_dt, str) else end_dt
            if start > end:
                return pd.DataFrame({"Error": ["Start date must be before end date"]}), "Page 1 of 1", ""
            frame = frame.loc[(frame['datetime'] >= start) & (frame['datetime'] <= end)]

        # --- optional pandas query expression ---------------------------
        expr = query_text.strip() if query_text else ""
        if expr:
            try:
                frame = frame.query(expr)
                query_status = f"βœ… Query executed successfully. Found {len(frame)} rows."
            except Exception as query_error:
                # Bad expression: report it but keep the date-filtered rows.
                query_status = f"❌ Query error: {str(query_error)}"
        else:
            query_status = ""

        # Keep only the display columns for the UI table.
        filtered_df = frame[displayCols].copy() if not frame.empty else pd.DataFrame()
        return paginate_data(), get_page_info(), query_status
    except Exception as e:
        return pd.DataFrame({"Error": [f"Error processing request: {str(e)}"]}), "Error", f"❌ Error: {str(e)}"
def execute_query_only(query_text):
    """Execute a pandas `query` expression without changing date filters.

    Resets pagination and returns (page_dataframe, page_info, status).

    NOTE(review): despite the docstring intent, the query always runs against
    the FULL dataset — any previously applied date window is dropped.  (The
    original code had an if/else here whose two branches were byte-identical
    `working_df = df.copy()`; that dead branch is collapsed below.  Properly
    preserving the date window would require storing the date-filter state.)
    """
    global filtered_df, current_page, current_query
    current_page = 0  # reset to first page for the new result set
    current_query = query_text
    try:
        # Always start from the full dataset (see NOTE above).
        working_df = df.copy()
        if query_text and query_text.strip():
            try:
                working_df = working_df.query(query_text.strip())
                query_status = f"βœ… Query executed successfully. Found {len(working_df)} rows."
            except Exception as query_error:
                # Bad expression: keep showing the current (old) table.
                query_status = f"❌ Query error: {str(query_error)}"
                return paginate_data(), get_page_info(), query_status
        else:
            query_status = ""
        # Keep only the display columns for the UI table.
        filtered_df = working_df[displayCols].copy() if not working_df.empty else pd.DataFrame()
        return paginate_data(), get_page_info(), query_status
    except Exception as e:
        return paginate_data(), get_page_info(), f"❌ Error: {str(e)}"
def paginate_data():
    """Return the 20-row slice of filtered_df for the current page.

    Clamps current_page into the valid range as a side effect; returns an
    empty DataFrame when nothing has been filtered yet.
    """
    global filtered_df, current_page
    if filtered_df is None or not len(filtered_df):
        return pd.DataFrame()
    size = 20
    # Clamp the page index into [0, total_pages - 1].
    current_page = min(max(current_page, 0), get_total_pages() - 1)
    lo = current_page * size
    return filtered_df.iloc[lo:lo + size].reset_index(drop=True)
def get_page_info():
    """Human-readable pagination summary for the UI textbox."""
    global current_page
    pages = get_total_pages()
    rows = 0 if filtered_df is None else len(filtered_df)
    return f"Page {current_page + 1} of {pages} (Total rows: {rows})"
def go_previous():
    """Step back one page (no-op on the first page) and refresh the view."""
    global current_page
    current_page = max(current_page - 1, 0)
    return paginate_data(), get_page_info()
def go_next():
    """Advance one page (no-op on the last page) and refresh the view."""
    global current_page
    # paginate_data() clamps as well, so clamping here is observably the same.
    current_page = min(current_page + 1, get_total_pages() - 1)
    return paginate_data(), get_page_info()
def reset_filters():
    """Clear pagination/query state; return default values for the widgets."""
    global current_page, current_query
    current_page, current_query = 0, ""
    # (start date, end date, query text) fed back into the three inputs.
    return startdate, enddate, ""
def get_column_info():
    """Return information about available columns for querying."""
    parts = ["Available columns for querying:\n"]
    parts.extend(f"β€’ `{col}` ({str(df[col].dtype)})\n" for col in displayCols)
    parts.append("\nExample queries:\n")
    parts.append("β€’ `premarket_volume > 100000`\n")
    parts.append("β€’ `Sector == 'Technology'`\n")
    parts.append("β€’ `Rotation > 1.5 and premarket_volume > 50000`\n")
    parts.append("β€’ `Ticker.str.contains('AA', na=False)`\n")
    return "".join(parts)
# ---------------------------------------------------------------------------
# Gradio UI.  The CSS shrinks the results-table font so more columns fit on
# screen; .query-info styles the collapsible column-help panel.
# ---------------------------------------------------------------------------
with gr.Blocks(css="""
    .dataframe table {
        font-size: 10px !important;
    }
    .dataframe th, .dataframe td {
        padding: 4px 8px !important;
        font-size: 10px !important;
    }
    .dataframe thead th {
        font-size: 10px !important;
        font-weight: bold !important;
    }
    .query-info {
        font-family: monospace;
        font-size: 12px;
        background-color: #f8f9fa;
        padding: 10px;
        border-radius: 5px;
        margin: 10px 0;
    }
""") as demo:
    gr.Markdown("## πŸ§ͺ Micro Cap Lab!")

    with gr.Row():
        # Plain Textbox date entry (YYYY-MM-DD) instead of gr.DateTime for
        # better compatibility across Gradio versions.
        start_picker = gr.Textbox(
            label="Start Date (YYYY-MM-DD)",
            value=startdate,
            placeholder=startdate
        )
        end_picker = gr.Textbox(
            label="End Date (YYYY-MM-DD)",
            value=enddate,
            placeholder=enddate
        )

    # Free-form pandas DataFrame.query expression plus its status readout.
    with gr.Row():
        with gr.Column(scale=4):
            query_input = gr.Textbox(
                label="DataFrame Query",
                placeholder="e.g., premarket_volume > 100000",
                lines=2,
                info="Enter pandas query expression (use backticks for column names with spaces)"
            )
        with gr.Column(scale=1):
            query_btn = gr.Button("Execute Query", variant="primary")
            query_status = gr.Textbox(
                label="Query Status",
                interactive=False,
                visible=True
            )

    # Collapsible help: available columns/dtypes and example queries.
    with gr.Accordion("πŸ“‹ Column Information & Query Examples", open=False):
        column_info = gr.Textbox(
            value=get_column_info(),
            label="",
            interactive=False,
            lines=15,
            elem_classes=["query-info"]
        )

    # Paged results table (20 rows per page; see paginate_data()).
    output = gr.Dataframe(
        label="Filtered Table",
        interactive=False
    )

    # Pagination controls.
    with gr.Row():
        prev_btn = gr.Button("← Previous", variant="secondary")
        page_info = gr.Textbox(
            value="Page 1 of 1",
            interactive=False,
            show_label=False,
            container=False
        )
        next_btn = gr.Button("Next β†’", variant="secondary")

    with gr.Row():
        apply_btn = gr.Button("Apply Date Filter", variant="primary")
        reset_btn = gr.Button("Reset All", variant="secondary")

    # --- event wiring -------------------------------------------------------
    # Date filter (also re-applies whatever query text is currently entered).
    apply_btn.click(
        fn=filter_dataframe,
        inputs=[start_picker, end_picker, query_input],
        outputs=[output, page_info, query_status]
    )
    # Query only.  NOTE(review): execute_query_only runs against the full
    # dataset, not the currently applied date window — confirm intent.
    query_btn.click(
        fn=execute_query_only,
        inputs=[query_input],
        outputs=[output, page_info, query_status]
    )
    prev_btn.click(
        fn=go_previous,
        inputs=[],
        outputs=[output, page_info]
    )
    next_btn.click(
        fn=go_next,
        inputs=[],
        outputs=[output, page_info]
    )
    # Reset the widgets to their defaults, then re-run the filter with the
    # (now reset) widget values.
    reset_btn.click(
        fn=reset_filters,
        inputs=[],
        outputs=[start_picker, end_picker, query_input]
    ).then(
        fn=filter_dataframe,
        inputs=[start_picker, end_picker, query_input],
        outputs=[output, page_info, query_status]
    )

    # Populate the table when the app first loads.
    demo.load(
        fn=filter_dataframe,
        inputs=[start_picker, end_picker, query_input],
        outputs=[output, page_info, query_status]
    )

if __name__ == "__main__":
    demo.launch()