# NOTE: the three lines below were HuggingFace Spaces page-header residue
# ("Spaces: Sleeping Sleeping") captured by the scrape; kept as a comment
# so the file remains valid Python.
import gradio as gr
import pandas as pd
import numpy as np
from datetime import datetime
import os
from utils import upload_to_hf_dataset, download_from_hf_dataset
import dotenv

# Load environment variables from a local .env file (expects HF_TOKEN).
dotenv.load_dotenv()
HF_TOKEN = os.getenv("HF_TOKEN")

# Date window covered by the pre-built parquet snapshot on HuggingFace.
startdate = "2020-01-01"
enddate = "2025-07-01"

# Filename of the parquet file on HuggingFace.
# NOTE(review): the base name already ends in ".parquet", so the final name
# contains the extension twice ("...parquet_with_...parquet"). This appears
# to match the file actually stored in the dataset repo -- confirm against
# the repo before "fixing" it.
file_path = f"marketsession_polygon_{startdate}_{enddate}.parquet"
file_path = f"{os.path.basename(file_path)}_with_premarketvolume900K_marketcap1B.parquet"

# Download the parquet file from the HuggingFace dataset repo into the CWD.
download_from_hf_dataset(
    file_path=file_path,
    dataset_name="AmirTrader/PennyStocks",
    token=HF_TOKEN,
    repo_type="dataset",
)

# Load the parquet file into a DataFrame.
df_org = pd.read_parquet(file_path)

# Columns shown in the UI table. NOTE: "perviousday" is a typo baked into
# the dataset's actual column names, so it must be kept as-is.
displayCols = ['Ticker', 'premarket_volume', 'marketcap(M$)', 'SharesFloat(M)',
               'Rotation', 'datetime', 'Sector',
               'premarket_change_from_perviousday_perc']

# Columns pulled from the raw dataframe before cleaning/renaming.
preferedCols = ['Ticker', 'premarket_volume', 'marketcap', 'Shares Float', 'Rotation', 'datetime',
                'Sector',
                'premarket_change_from_perviousday_perc',
                'premarket_change_from_perviousday_high_perc',
                'high_closepermarketperc', 'low_closepermarketperc',
                'close_closepermarketperc', 'marketsession_3min_closepermarketperc',
                'marketsession_5min_closepermarketperc',
                'marketsession_10min_closepermarketperc',
                'marketsession_15min_closepermarketperc',
                'marketsession_30min_closepermarketperc',
                'marketsession_60min_closepermarketperc',
                'marketsession_120min_closepermarketperc'
                ]

# FIX: take an explicit copy. The original assigned a column-selection view
# and then wrote new columns into it, which triggers pandas'
# SettingWithCopyWarning and can silently fail to write under copy-on-write.
df = df_org[preferedCols].copy()

# 'Shares Float' arrives as text with thousands separators:
# Step 1: strip commas; Step 2: coerce to numeric (bad values -> NaN);
# Step 3: express in millions with 3-decimal precision.
df['Shares Float'] = df['Shares Float'].replace(',', '', regex=True)
df['Shares Float'] = pd.to_numeric(df['Shares Float'], errors='coerce')
df['SharesFloat(M)'] = (df['Shares Float'] / 1_000_000).round(3)

# Every percentage column: coerce to numeric, scale from percent to
# fraction (divide by 100), round to 1 decimal.
perc_columns = [col for col in df.columns if 'perc' in col.lower()]
for col in perc_columns:
    df[col] = pd.to_numeric(df[col], errors='coerce')  # ensure numeric
    df[col] = (df[col] / 100).round(1)

# Normalize the timestamp column (unparseable values become NaT).
df['datetime'] = pd.to_datetime(df['datetime'], errors='coerce')

# Rotation shown with 2-decimal precision.
df['Rotation'] = pd.to_numeric(df['Rotation'], errors='coerce').round(2)

# Rename so the display column name carries its unit.
df.rename(columns={'marketcap': 'marketcap(M$)'}, inplace=True)

# Mutable UI state shared by the filter/pagination callbacks below.
current_page = 0
filtered_df = None
current_query = ""
def get_total_pages(page_size=20):
    """Return how many pages are needed to show the current filtered rows.

    Args:
        page_size: rows per page; defaults to the app-wide page size of 20
            (generalized from the previously hard-coded constant).

    Returns:
        int: at least 1, so the pager always has a valid page to display,
        even when no data has been filtered yet.
    """
    global filtered_df
    if filtered_df is None or len(filtered_df) == 0:
        return 1
    # Ceiling division: partial trailing pages still count as a page.
    return (len(filtered_df) + page_size - 1) // page_size
def filter_dataframe(start_dt, end_dt, query_text=""):
    """Apply the date window and optional pandas query expression.

    Returns a 3-tuple for the UI: (current page DataFrame, page-info
    label, query status message). Also resets pagination and stores the
    result in the module-level ``filtered_df``.
    """
    global filtered_df, current_page, current_query
    current_page = 0  # filtering always restarts at the first page
    current_query = query_text
    try:
        candidate = df.copy()

        # The date filter only applies when BOTH endpoints are supplied.
        if start_dt and end_dt:
            start = pd.to_datetime(start_dt) if isinstance(start_dt, str) else start_dt
            end = pd.to_datetime(end_dt) if isinstance(end_dt, str) else end_dt
            # Reject an inverted range before touching the data.
            if start > end:
                return pd.DataFrame({"Error": ["Start date must be before end date"]}), "Page 1 of 1", ""
            in_range = (candidate['datetime'] >= start) & (candidate['datetime'] <= end)
            candidate = candidate.loc[in_range]

        query_status = ""
        if query_text and query_text.strip():
            try:
                candidate = candidate.query(query_text.strip())
                query_status = f"β Query executed successfully. Found {len(candidate)} rows."
            except Exception as query_error:
                # Surface the query error but keep the date-filtered rows.
                query_status = f"β Query error: {str(query_error)}"

        # Reduce to the display columns for the table view.
        filtered_df = candidate[displayCols].copy() if not candidate.empty else pd.DataFrame()
        return paginate_data(), get_page_info(), query_status
    except Exception as e:
        return pd.DataFrame({"Error": [f"Error processing request: {str(e)}"]}), "Error", f"β Error: {str(e)}"
def execute_query_only(query_text):
    """Run a pandas ``query`` expression against the full dataset.

    NOTE(review): despite the docstring of the original ("without changing
    date filters"), this always queries the FULL dataset -- the date-filter
    state is not stored anywhere, so it cannot be re-applied here.

    Returns a 3-tuple for the UI: (current page DataFrame, page-info
    label, query status message).
    """
    global filtered_df, current_page, current_query
    current_page = 0  # querying always restarts at the first page
    current_query = query_text
    try:
        # FIX: the original had an if/else on filtered_df whose two branches
        # both assigned ``working_df = df.copy()`` -- dead, misleading code.
        # Collapsed to the single assignment (behavior unchanged).
        working_df = df.copy()

        if query_text and query_text.strip():
            try:
                working_df = working_df.query(query_text.strip())
                query_status = f"β Query executed successfully. Found {len(working_df)} rows."
            except Exception as query_error:
                query_status = f"β Query error: {str(query_error)}"
                # On query failure, leave filtered_df untouched so the user
                # keeps whatever data was already displayed.
                return paginate_data(), get_page_info(), query_status
        else:
            query_status = ""

        # Reduce to the display columns for the table view.
        filtered_df = working_df[displayCols].copy() if not working_df.empty else pd.DataFrame()
        return paginate_data(), get_page_info(), query_status
    except Exception as e:
        return paginate_data(), get_page_info(), f"β Error: {str(e)}"
def paginate_data():
    """Return the 20-row slice of ``filtered_df`` for the current page.

    Clamps ``current_page`` into the valid range first, and returns an
    empty DataFrame when there is nothing to show.
    """
    global filtered_df, current_page
    if filtered_df is None or len(filtered_df) == 0:
        return pd.DataFrame()
    page_size = 20
    # Keep the page index inside [0, total_pages - 1].
    last_page = get_total_pages() - 1
    current_page = min(max(current_page, 0), last_page)
    lo = current_page * page_size
    hi = lo + page_size
    return filtered_df.iloc[lo:hi].reset_index(drop=True)
def get_page_info():
    """Build the 'Page X of Y (Total rows: N)' label for the pager."""
    global current_page
    pages = get_total_pages()
    rows = 0 if filtered_df is None else len(filtered_df)
    return f"Page {current_page + 1} of {pages} (Total rows: {rows})"
def go_previous():
    """Step one page back (no-op on the first page) and refresh the view."""
    global current_page
    # max() clamps at 0, matching the original "only decrement if > 0".
    current_page = max(current_page - 1, 0)
    return paginate_data(), get_page_info()
def go_next():
    """Step one page forward (no-op on the last page) and refresh the view."""
    global current_page
    last_page = get_total_pages() - 1
    if current_page < last_page:
        current_page += 1
    return paginate_data(), get_page_info()
def reset_filters():
    """Restore the default date window and clear the query.

    Only returns the widget values; the caller chains a fresh
    ``filter_dataframe`` run to actually refresh the table.
    """
    global current_page, current_query
    current_page, current_query = 0, ""
    return startdate, enddate, ""
def get_column_info():
    """Assemble the help text listing queryable columns plus example queries."""
    parts = ["Available columns for querying:\n"]
    # One bullet per displayed column, annotated with its pandas dtype.
    for col in displayCols:
        parts.append(f"β’ `{col}` ({str(df[col].dtype)})\n")
    parts.append("\nExample queries:\n")
    parts.append("β’ `premarket_volume > 100000`\n")
    parts.append("β’ `Sector == 'Technology'`\n")
    parts.append("β’ `Rotation > 1.5 and premarket_volume > 50000`\n")
    parts.append("β’ `Ticker.str.contains('AA', na=False)`\n")
    return "".join(parts)
# Gradio UI: date filters, a free-form pandas query box, a paginated table,
# and the event wiring that connects them to the callbacks above.
with gr.Blocks(css="""
.dataframe table {
    font-size: 10px !important;
}
.dataframe th, .dataframe td {
    padding: 4px 8px !important;
    font-size: 10px !important;
}
.dataframe thead th {
    font-size: 10px !important;
    font-weight: bold !important;
}
.query-info {
    font-family: monospace;
    font-size: 12px;
    background-color: #f8f9fa;
    padding: 10px;
    border-radius: 5px;
    margin: 10px 0;
}
""") as demo:
    gr.Markdown("## π§ͺ Micro Cap Lab!")
    with gr.Row():
        # Use Textbox instead of DateTime for better compatibility.
        start_picker = gr.Textbox(
            label="Start Date (YYYY-MM-DD)",
            value=startdate,
            placeholder=startdate
        )
        end_picker = gr.Textbox(
            label="End Date (YYYY-MM-DD)",
            value=enddate,
            placeholder=enddate
        )
    # Query section: free-form pandas `DataFrame.query` expression.
    with gr.Row():
        with gr.Column(scale=4):
            query_input = gr.Textbox(
                label="DataFrame Query",
                placeholder="e.g., premarket_volume > 100000",
                lines=2,
                info="Enter pandas query expression (use backticks for column names with spaces)"
            )
        with gr.Column(scale=1):
            query_btn = gr.Button("Execute Query", variant="primary")
            query_status = gr.Textbox(
                label="Query Status",
                interactive=False,
                visible=True
            )
    # Column information (collapsible help panel).
    with gr.Accordion("π Column Information & Query Examples", open=False):
        column_info = gr.Textbox(
            value=get_column_info(),
            label="",
            interactive=False,
            lines=15,
            elem_classes=["query-info"]
        )
    # Results table (read-only; populated by the callbacks).
    output = gr.Dataframe(
        label="Filtered Table",
        interactive=False
    )
    # Pagination controls.
    with gr.Row():
        prev_btn = gr.Button("β Previous", variant="secondary")
        page_info = gr.Textbox(
            value="Page 1 of 1",
            interactive=False,
            show_label=False,
            container=False
        )
        next_btn = gr.Button("Next β", variant="secondary")
    with gr.Row():
        apply_btn = gr.Button("Apply Date Filter", variant="primary")
        reset_btn = gr.Button("Reset All", variant="secondary")
    # Event handlers -----------------------------------------------------
    # Date filter also re-applies any text in the query box.
    apply_btn.click(
        fn=filter_dataframe,
        inputs=[start_picker, end_picker, query_input],
        outputs=[output, page_info, query_status]
    )
    # Query-only path ignores the date pickers (queries the full dataset).
    query_btn.click(
        fn=execute_query_only,
        inputs=[query_input],
        outputs=[output, page_info, query_status]
    )
    prev_btn.click(
        fn=go_previous,
        inputs=[],
        outputs=[output, page_info]
    )
    next_btn.click(
        fn=go_next,
        inputs=[],
        outputs=[output, page_info]
    )
    # Reset first restores widget defaults, then chains a fresh filter run
    # so the table reflects the reset state.
    reset_btn.click(
        fn=reset_filters,
        inputs=[],
        outputs=[start_picker, end_picker, query_input]
    ).then(
        fn=filter_dataframe,
        inputs=[start_picker, end_picker, query_input],
        outputs=[output, page_info, query_status]
    )
    # Populate the table with the default date window on app load.
    demo.load(
        fn=filter_dataframe,
        inputs=[start_picker, end_picker, query_input],
        outputs=[output, page_info, query_status]
    )

if __name__ == "__main__":
    demo.launch()