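# NOTE: the next three lines appear to be file-viewer residue (uploader avatar
# caption, commit message, short commit hash) rather than part of the module.
# AlirezaX2's picture
# Initial clean deploy
# a84c47e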
import os
import pandas as pd
from huggingface_hub import HfApi, hf_hub_download
from dotenv import load_dotenv
# Read a .env file (if one is found) into the process environment at import
# time, so the os.getenv() lookups below (HF_TOKEN, TARGET_FILE) can see
# values defined there.
load_dotenv()
def download_from_hf_dataset(file_path, dataset_name, token=None, repo_type="dataset"):
    """
    Download a file from a Hugging Face dataset repository.

    The file is fetched with ``hf_hub_download`` into the current working
    directory. If the download fails for any reason, an already existing
    local file at ``file_path`` is used instead.

    Args:
        file_path: Name of the file inside the repository. Also the local
            path checked as a fallback when the download fails.
        dataset_name: Repository id passed to ``hf_hub_download`` as
            ``repo_id``.
        token: Access token. When ``None``, the ``HF_TOKEN`` environment
            variable is used (which may itself be unset).
        repo_type: Repository type forwarded to ``hf_hub_download``.

    Returns:
        The path returned by ``hf_hub_download`` on success, ``file_path``
        when the download failed but a local copy exists, otherwise ``None``.
    """
    if token is None:
        token = os.getenv("HF_TOKEN")

    try:
        local_path = hf_hub_download(
            repo_id=dataset_name,
            filename=file_path,
            repo_type=repo_type,
            local_dir=".",
            token=token,
        )
    except Exception as e:
        # Deliberately broad: any failure (network, auth, missing file, ...)
        # is reported and then handled by falling back to a local copy.
        print(f"Error downloading file: {e}")
        # Check if file exists locally as fallback
        if os.path.exists(file_path):
            print(f"Found local copy of {file_path}, using that.")
            return file_path
        return None

    print(f"Successfully downloaded {file_path} from {dataset_name} to {local_path}")
    return local_path
# Default pre-filter, written as a pandas ``DataFrame.query`` expression; it is
# the default value of ``load_data``'s ``filter_query`` argument. Backticks
# quote column names that contain spaces. The identifiers are kept exactly as
# written (including "perviousday") — presumably they match the dataset's
# column names; verify against the parquet schema before renaming anything.
DEFAULT_FILTER_QUERY = " and ".join(
    [
        "premarket_change_from_perviousday_perc > 8",
        "premarket_close > 2",
        "`Shares Float`>1e6",
        "`Market Capitalization`<100e6",
    ]
)
def load_data(filter_query=DEFAULT_FILTER_QUERY):
    """
    Loads and preprocesses the specific penny stock dataset.

    The parquet file named by the ``TARGET_FILE`` environment variable (or a
    built-in default name) is fetched from the ``AmirTrader/PennyStocks``
    dataset repository via ``download_from_hf_dataset`` and read with pandas.

    Args:
        filter_query: ``DataFrame.query`` expression used to pre-filter the
            rows. A falsy value (``None`` or ``""``) skips filtering.

    Returns:
        The (optionally filtered) DataFrame. When a ``datetime`` column is
        present it is passed through ``pd.to_datetime`` and a ``date`` column
        derived from it is added.

    Raises:
        FileNotFoundError: If the file could neither be downloaded nor found
            locally.
        Exception: Any error raised while applying ``filter_query`` is printed
            and then re-raised unchanged.
    """
    token = os.getenv("HF_TOKEN")
    dataset_name = "AmirTrader/PennyStocks"

    # Get file name from environment variable or use default
    default_file = "marketsession_post_polygon_2020-01-01_2025-12-01.parquet_with_premarketvolume900K_marketcap1B.parquet"
    target_file = os.getenv("TARGET_FILE", default_file)

    # Attempt download (falls back to a local copy inside the helper)
    local_path = download_from_hf_dataset(
        file_path=target_file, dataset_name=dataset_name, token=token
    )
    if not local_path or not os.path.exists(local_path):
        raise FileNotFoundError(f"Could not find or download dataset: {target_file}")

    df = pd.read_parquet(local_path)

    # Pre-filtering: this queries the "universe" of stocks
    if filter_query:
        try:
            # .copy() gives the filtered result its own data before columns
            # are assigned below.
            df = df.query(filter_query).copy()
        except Exception as e:
            print(f"Error applying query '{filter_query}': {e}")
            # Re-raise so the caller (e.g. a dashboard) is notified; a bare
            # `raise` keeps the original traceback intact.
            raise
    # With no filter the freshly read frame is already owned by this function,
    # so no extra copy is made.

    # Ensure datetime
    if "datetime" in df.columns:
        # Likely already datetime in the parquet file; converting
        # unconditionally keeps the dtype guaranteed either way.
        df["datetime"] = pd.to_datetime(df["datetime"])
        df["date"] = df["datetime"].dt.date

    return df