|
|
import os |
|
|
import json |
|
|
import pickle |
|
|
import pandas as pd |
|
|
from datetime import datetime, timedelta |
|
|
from typing import Any |
|
|
import yfinance as yf |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
BASE_DIR = "./data/store" |
|
|
os.makedirs(BASE_DIR, exist_ok=True) |
|
|
|
|
|
|
|
|
VALIDITY_MAP = { |
|
|
"result": {"days": 7}, |
|
|
"qresult": {"days": 7}, |
|
|
"bhav": {"days": 1}, |
|
|
"intraday": {"minutes": 15}, |
|
|
"eq": {"hours": 1}, |
|
|
"daily": {"days": 1}, |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _ts(): |
|
|
return datetime.now().strftime("%Y_%m_%d_%H_%M_%S") |
|
|
|
|
|
def _path(filename: str): |
|
|
return os.path.join(BASE_DIR, filename) |
|
|
|
|
|
def _list_files(): |
|
|
return os.listdir(BASE_DIR) |
|
|
|
|
|
def _latest(prefix: str, ext: str): |
|
|
files = [ |
|
|
f for f in _list_files() |
|
|
if f.startswith(prefix + "_") and f.endswith("." + ext) |
|
|
] |
|
|
return max(files) if files else None |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def save(name: str, data: Any, ftype: str) -> bool: |
|
|
ts = _ts() |
|
|
filename = f"{name}_{ts}.{ftype}" |
|
|
path = _path(filename) |
|
|
try: |
|
|
if ftype == "csv": |
|
|
if not isinstance(data, pd.DataFrame): |
|
|
print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] [SAVE FAILED] CSV requires pandas DataFrame for {filename}") |
|
|
return False |
|
|
data.to_csv(path, index=False) |
|
|
elif ftype == "json": |
|
|
with open(path, "w", encoding="utf-8") as f: |
|
|
json.dump(data, f, indent=2) |
|
|
elif ftype == "html": |
|
|
with open(path, "w", encoding="utf-8") as f: |
|
|
f.write(str(data)) |
|
|
elif ftype == "pkl": |
|
|
with open(path, "wb") as f: |
|
|
pickle.dump(data, f) |
|
|
else: |
|
|
print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] [SAVE FAILED] Unsupported file type: {ftype} for {filename}") |
|
|
return False |
|
|
print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] [SAVE OK] {filename}") |
|
|
return True |
|
|
except Exception as e: |
|
|
print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] [SAVE FAILED] {filename} - Exception: {e}") |
|
|
return False |
|
|
|
|
|
def load(name: str, ftype: str): |
|
|
filename = _latest(name, ftype) if "." not in name else name |
|
|
path = _path(filename) |
|
|
if not os.path.exists(path): |
|
|
print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] [LOAD FAILED] File does not exist: {filename}") |
|
|
return False |
|
|
try: |
|
|
if filename.endswith(".csv"): |
|
|
df = pd.read_csv(path) |
|
|
elif filename.endswith(".json"): |
|
|
with open(path, "r", encoding="utf-8") as f: |
|
|
df = json.load(f) |
|
|
elif filename.endswith(".html"): |
|
|
with open(path, "r", encoding="utf-8") as f: |
|
|
df = f.read() |
|
|
elif filename.endswith(".pkl"): |
|
|
with open(path, "rb") as f: |
|
|
df = pickle.load(f) |
|
|
else: |
|
|
print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] [LOAD FAILED] Unsupported file type: {filename}") |
|
|
return False |
|
|
print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] [LOAD OK] {filename}") |
|
|
return df |
|
|
except Exception as e: |
|
|
print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] [LOAD FAILED] {filename} - Exception: {e}") |
|
|
return False |
|
|
|
|
|
def exists(name: str, ftype: str) -> bool: |
|
|
""" |
|
|
Checks if a file exists AND is valid within TTL based on parent type + symbol. |
|
|
Example: |
|
|
name = INTRADAY_RELIANCE |
|
|
matches any file starting with INTRADAY_RELIANCE_YYYY_MM_DD_HH_MM_SS.csv |
|
|
TTL applied according to parent type (INTRADAY -> 15 minutes) |
|
|
""" |
|
|
filename = _latest(name, ftype) |
|
|
if not filename: |
|
|
print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] [EXISTS] No file found for {name}.{ftype}") |
|
|
return False |
|
|
|
|
|
path = _path(filename) |
|
|
if not os.path.exists(path): |
|
|
print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] [EXISTS] File not present: {filename}") |
|
|
return False |
|
|
|
|
|
parent_func = name.split("_")[0].lower() |
|
|
validity = VALIDITY_MAP.get(parent_func) |
|
|
if not validity: |
|
|
print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] [EXISTS] File exists with no TTL: {filename}") |
|
|
return True |
|
|
|
|
|
mtime = datetime.fromtimestamp(os.path.getmtime(path)) |
|
|
now = datetime.now() |
|
|
is_valid = True |
|
|
if "minutes" in validity: |
|
|
is_valid = (now - mtime) <= timedelta(minutes=validity["minutes"]) |
|
|
elif "hours" in validity: |
|
|
is_valid = (now - mtime) <= timedelta(hours=validity["hours"]) |
|
|
elif "days" in validity: |
|
|
is_valid = (now - mtime) <= timedelta(days=validity["days"]) |
|
|
|
|
|
if is_valid: |
|
|
print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] [EXISTS] File exists and valid: {filename}") |
|
|
else: |
|
|
print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] [EXISTS] File expired: {filename}") |
|
|
|
|
|
return is_valid |
|
|
|
|
|
def list_files(name=None, ftype=None): |
|
|
files = sorted(_list_files()) |
|
|
if name: |
|
|
files = [f for f in files if f.startswith(name + "_")] |
|
|
if ftype: |
|
|
files = [f for f in files if f.endswith("." + ftype)] |
|
|
return files |
|
|
|
|
|
|
|
|
|