AnalyzrAI / apps /core /tasks.py
thejagstudio's picture
Upload 92 files
0310410 verified
import logging
import threading
import time
from django.core.cache import cache
from django.utils import timezone
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# --- TradingView pagination -------------------------------------------------
# Rows requested per scan call, and the pause between consecutive calls.
BATCH_SIZE = 500
BATCH_DELAY_SECONDS = 1.5

# --- Scheduling --------------------------------------------------------------
# Default pause between fetch cycles, in seconds (10 minutes). The fetcher
# loop lets ``settings.STOCK_FETCHER_INTERVAL`` override it.
FETCH_INTERVAL = 10 * 60

# --- Cache -------------------------------------------------------------------
# Key and time-to-live (seconds, 15 minutes) for the full stock list stored
# through Django's cache framework.
REDIS_ALL_STOCKS_KEY = "screener:all_stocks"
REDIS_ALL_STOCKS_TTL = 15 * 60

# --- Background-thread bookkeeping ------------------------------------------
# ``_fetcher_started`` records whether the fetcher thread was already
# launched in this process; ``_fetcher_lock`` guards reads/writes of it.
_fetcher_started = False
_fetcher_lock = threading.Lock()
def fetch_all_stocks():
    """Fetch all primary listings from TradingView, then cache and persist them.

    Pages through ``_scan_tradingview`` ``BATCH_SIZE`` rows at a time (sorted
    by ``market_cap_basic`` descending), normalising every row with
    ``_normalize_tradingview_row`` and dropping rows that normalise to a
    falsy value. Pagination stops when a page is empty, when the offset
    reaches the reported total, or when a ``TradingViewError`` is raised; in
    the error case whatever was collected so far is still stored
    (best-effort).

    If any rows were collected they are written to the Django cache under
    ``REDIS_ALL_STOCKS_KEY`` and upserted into ``ScreenerSymbolCache`` keyed
    by upper-cased symbol.

    Returns:
        dict: ``{"fetched": <number of normalised rows>,
        "total_reported": <totalCount from the first successful response>}``.
        ``total_reported`` is ``None`` when no request succeeded and ``0``
        when the response carried no usable ``totalCount``.
    """
    # Imported at call time rather than at module level (presumably to avoid
    # an import cycle with the views module -- confirm before hoisting).
    from .models import ScreenerSymbolCache
    from .views import (
        TradingViewError,
        _build_tradingview_scan_payload,
        _normalize_tradingview_row,
        _scan_tradingview,
    )

    filter_list = [{"left": "is_primary", "operation": "equal", "right": True}]
    all_rows = []
    offset = 0
    total_count = None

    while True:
        payload = _build_tradingview_scan_payload(
            filter_list,
            offset=offset,
            limit=BATCH_SIZE,
            sort_by="market_cap_basic",
            sort_order="desc",
        )
        try:
            result = _scan_tradingview(payload)
        except TradingViewError as exc:
            # Best-effort: keep what we already have instead of discarding it.
            logger.warning("TradingView error at offset %d: %s", offset, exc)
            break

        if total_count is None:
            # ``or 0`` also covers a present-but-null "totalCount", which
            # would otherwise break the %d format below and re-enter this
            # branch on every page.
            total_count = result.get("totalCount") or 0
            logger.info("TradingView reports %d total stocks", total_count)

        data = result.get("data") or []
        if not data:
            break

        for raw_row in data:
            normalized = _normalize_tradingview_row(raw_row)
            if normalized:
                all_rows.append(normalized)

        offset += BATCH_SIZE
        # A total of 0 means "unknown"; in that case keep paging until an
        # empty page is returned.
        if total_count and offset >= total_count:
            break
        time.sleep(BATCH_DELAY_SECONDS)

    if all_rows:
        cache.set(REDIS_ALL_STOCKS_KEY, all_rows, timeout=REDIS_ALL_STOCKS_TTL)

        now = timezone.now()
        # De-duplicate by symbol before upserting. Offset pagination over a
        # live, sorted result set can return the same symbol on two pages,
        # and a conflict-updating bulk insert must not contain the same
        # conflict key twice in one statement (PostgreSQL rejects it).
        # The last occurrence wins, matching what sequential upserts would do.
        objects_by_symbol = {}
        for row in all_rows:
            symbol = (row.get("symbol") or "").strip().upper()
            if not symbol:
                continue
            objects_by_symbol[symbol] = ScreenerSymbolCache(
                symbol=symbol,
                data=row,
                last_fetched_at=now,
            )
        db_objects = list(objects_by_symbol.values())

        chunk_size = 500
        for start in range(0, len(db_objects), chunk_size):
            ScreenerSymbolCache.objects.bulk_create(
                db_objects[start : start + chunk_size],
                update_conflicts=True,
                unique_fields=["symbol"],
                update_fields=["data", "last_fetched_at"],
            )
        logger.info("Stored %d stocks in Redis + DB", len(all_rows))

    return {"fetched": len(all_rows), "total_reported": total_count}
def _fetcher_loop():
    """Run ``fetch_all_stocks`` forever, sleeping between cycles.

    The pause between cycles is ``settings.STOCK_FETCHER_INTERVAL`` when that
    setting exists, otherwise ``FETCH_INTERVAL``. Any exception raised by a
    cycle is logged with its traceback and the loop carries on, so a single
    failure never stops the fetcher. This function does not return.
    """
    from django.conf import settings
    from django.db import close_old_connections

    interval = getattr(settings, "STOCK_FETCHER_INTERVAL", FETCH_INTERVAL)
    time.sleep(5)  # let Django finish startup
    while True:
        try:
            # This loop runs outside the request/response cycle, so Django
            # never recycles this thread's database connection on its own.
            # Discard a stale or broken connection before the cycle, and
            # release an expired one afterwards, so one dropped connection
            # cannot make every later cycle fail.
            close_old_connections()
            try:
                result = fetch_all_stocks()
            finally:
                close_old_connections()
            logger.info("Stock fetcher completed: %s", result)
        except Exception:
            logger.exception("Stock fetcher cycle failed")
        time.sleep(interval)
def start_stock_fetcher():
    """Start the background stock-fetcher thread once per process.

    The first call launches a daemon thread named ``"stock-fetcher"`` that
    runs ``_fetcher_loop``. Later calls return immediately. The
    check-and-start sequence is performed while holding ``_fetcher_lock``.
    """
    global _fetcher_started
    from django.conf import settings

    with _fetcher_lock:
        if _fetcher_started:
            return
        thread = threading.Thread(
            target=_fetcher_loop,
            daemon=True,
            name="stock-fetcher",
        )
        thread.start()
        # Set the flag only after the thread really started, so a failed
        # start can be retried by a later call.
        _fetcher_started = True

    # Report the interval the loop will actually use: it honours the
    # STOCK_FETCHER_INTERVAL setting and falls back to FETCH_INTERVAL.
    interval = getattr(settings, "STOCK_FETCHER_INTERVAL", FETCH_INTERVAL)
    logger.info("Background stock fetcher started (interval=%ss)", interval)