PrimoGreedy-Agent / vps /catalyst_poll.py
CiscsoPonce's picture
feat: Sprint 9 — Execution & Quality Control
645673f
"""Intraday Catalyst Polling Daemon for PrimoGreedy.
Runs as a systemd timer on the VPS (every 15 minutes during market hours,
9:00-16:30 EST on weekdays). When a trigger fires, dispatches a GitHub
Actions ``repository_dispatch`` event to run the pipeline for the triggered
ticker.
Trigger conditions (any one fires):
1. Volume > 3x average daily volume for a seed ticker
2. New Form 4 insider purchase > $50K for a tracked ticker
3. Price move > 10% intraday for a seed ticker
Usage:
python catalyst_poll.py
Environment variables:
FINNHUB_API_KEY — Finnhub REST API key
GITHUB_TOKEN — GitHub PAT with repo scope
GITHUB_REPO — e.g. "CiscoPonce/primogreedy"
VPS_API_URL — PrimoGreedy data API URL
VPS_API_KEY — Data API key
"""
import os
import sys
import time
import logging
from datetime import datetime, timezone
import requests
from dotenv import load_dotenv
load_dotenv()
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [catalyst_poll] %(levelname)s: %(message)s",
)
logger = logging.getLogger("catalyst_poll")
FINNHUB_API_KEY = os.getenv("FINNHUB_API_KEY", "")
GITHUB_TOKEN = os.getenv("GITHUB_TOKEN", "")
GITHUB_REPO = os.getenv("GITHUB_REPO", "CiscoPonce/primogreedy")
VPS_API_URL = os.getenv("VPS_API_URL", "").rstrip("/")
VPS_API_KEY = os.getenv("VPS_API_KEY", "")
VOLUME_MULTIPLIER = 3.0
INSIDER_MIN_VALUE = 50_000
PRICE_MOVE_PCT = 10.0
SEED_TICKERS = [
"HNRA", "TLSS", "MNTS", "BMTX", "KORE", "RVSN", "DRUG", "HYSR",
"AAON", "CASS", "GIII", "PATK", "MGRC", "LCNB", "CHCO",
]
def _finnhub_get(path: str, params: dict) -> dict:
"""Wrapper for Finnhub REST calls."""
params["token"] = FINNHUB_API_KEY
try:
resp = requests.get(
f"https://finnhub.io/api/v1/{path}",
params=params,
timeout=10,
)
resp.raise_for_status()
return resp.json()
except Exception as exc:
logger.warning("Finnhub %s error: %s", path, exc)
return {}
def check_unusual_volume() -> list[str]:
"""Check seed tickers for unusual volume (>3x average)."""
triggered = []
for ticker in SEED_TICKERS:
try:
quote = _finnhub_get("quote", {"symbol": ticker})
if not quote or not quote.get("v"):
continue
profile = _finnhub_get("stock/metric", {
"symbol": ticker,
"metric": "all",
})
metrics = profile.get("metric", {})
avg_vol = metrics.get("10DayAverageTradingVolume", 0)
if avg_vol and avg_vol > 0:
avg_vol_shares = avg_vol * 1_000_000
current_vol = quote.get("v", 0)
if current_vol > avg_vol_shares * VOLUME_MULTIPLIER:
logger.info(
"VOLUME TRIGGER: %s — current %s vs avg %s (%.1fx)",
ticker, f"{current_vol:,.0f}", f"{avg_vol_shares:,.0f}",
current_vol / avg_vol_shares,
)
triggered.append(ticker)
except Exception as exc:
logger.warning("Volume check error for %s: %s", ticker, exc)
return triggered
def check_price_moves() -> list[str]:
"""Check seed tickers for >10% intraday price moves."""
triggered = []
for ticker in SEED_TICKERS:
try:
quote = _finnhub_get("quote", {"symbol": ticker})
if not quote:
continue
prev_close = quote.get("pc", 0)
current = quote.get("c", 0)
if prev_close > 0 and current > 0:
pct_change = abs((current - prev_close) / prev_close) * 100
if pct_change >= PRICE_MOVE_PCT:
direction = "UP" if current > prev_close else "DOWN"
logger.info(
"PRICE TRIGGER: %s — %s %.1f%% ($%.2f -> $%.2f)",
ticker, direction, pct_change, prev_close, current,
)
triggered.append(ticker)
except Exception as exc:
logger.warning("Price check error for %s: %s", ticker, exc)
return triggered
def check_insider_filings() -> list[str]:
"""Check SEC EDGAR for new Form 4 insider purchases > $50K."""
triggered = []
try:
headers = {
"User-Agent": "PrimoGreedy/1.0 (contact@primogreedy.com)",
"Accept": "application/json",
}
resp = requests.get(
"https://efts.sec.gov/LATEST/search-index",
params={
"q": '"acquired" AND "Form 4"',
"dateRange": "custom",
"startdt": datetime.now(timezone.utc).strftime("%Y-%m-%d"),
"enddt": datetime.now(timezone.utc).strftime("%Y-%m-%d"),
"forms": "4",
},
headers=headers,
timeout=10,
)
if resp.status_code != 200:
logger.info("SEC EDGAR returned %d, skipping insider check", resp.status_code)
return []
data = resp.json()
hits = data.get("hits", {}).get("hits", [])[:30]
for hit in hits:
src = hit.get("_source", {})
tickers = src.get("tickers", [])
if tickers:
ticker = tickers[0].upper()
if ticker in SEED_TICKERS:
logger.info("INSIDER TRIGGER: Form 4 filing for %s", ticker)
triggered.append(ticker)
except Exception as exc:
logger.warning("SEC insider check error: %s", exc)
return triggered
def dispatch_github_workflow(ticker: str) -> bool:
"""Fire a GitHub Actions repository_dispatch event for a specific ticker."""
if not GITHUB_TOKEN:
logger.warning("GITHUB_TOKEN not set — cannot dispatch")
return False
url = f"https://api.github.com/repos/{GITHUB_REPO}/dispatches"
headers = {
"Authorization": f"Bearer {GITHUB_TOKEN}",
"Accept": "application/vnd.github.v3+json",
}
payload = {
"event_type": "catalyst-alert",
"client_payload": {
"ticker": ticker,
"triggered_at": datetime.now(timezone.utc).isoformat(),
},
}
try:
resp = requests.post(url, json=payload, headers=headers, timeout=10)
if resp.status_code == 204:
logger.info("Dispatched catalyst alert for %s", ticker)
return True
logger.warning("GitHub dispatch failed (%d): %s", resp.status_code, resp.text)
return False
except Exception as exc:
logger.error("GitHub dispatch error: %s", exc)
return False
def log_trigger_to_vps(ticker: str, trigger_type: str) -> None:
"""Log the catalyst trigger to the VPS agent_runs table."""
if not VPS_API_URL:
return
try:
import uuid
requests.post(
f"{VPS_API_URL}/runs",
headers={"X-API-Key": VPS_API_KEY, "Content-Type": "application/json"},
json={
"id": str(uuid.uuid4()),
"ticker": ticker,
"status": f"CATALYST:{trigger_type}",
"model": "catalyst_poll",
"latency_ms": 0,
"region": "USA",
},
timeout=5,
)
except Exception as exc:
logger.warning("VPS log error: %s", exc)
def is_market_hours() -> bool:
"""Check if we're within US market hours (9:00-16:30 EST, weekdays)."""
from datetime import timezone, timedelta
est = timezone(timedelta(hours=-5))
now = datetime.now(est)
if now.weekday() >= 5:
return False
market_open = now.replace(hour=9, minute=0, second=0, microsecond=0)
market_close = now.replace(hour=16, minute=30, second=0, microsecond=0)
return market_open <= now <= market_close
def main():
"""Run one polling cycle: check all triggers, dispatch if needed."""
if not is_market_hours():
logger.info("Outside market hours — skipping")
return
logger.info("Starting catalyst poll cycle...")
all_triggered = set()
volume_triggers = check_unusual_volume()
all_triggered.update(volume_triggers)
price_triggers = check_price_moves()
all_triggered.update(price_triggers)
insider_triggers = check_insider_filings()
all_triggered.update(insider_triggers)
if not all_triggered:
logger.info("No catalysts detected this cycle")
return
logger.info("Catalysts detected for: %s", ", ".join(all_triggered))
for ticker in all_triggered:
trigger_type = []
if ticker in volume_triggers:
trigger_type.append("VOLUME")
if ticker in price_triggers:
trigger_type.append("PRICE")
if ticker in insider_triggers:
trigger_type.append("INSIDER")
dispatch_github_workflow(ticker)
log_trigger_to_vps(ticker, "+".join(trigger_type))
logger.info("Catalyst poll cycle complete — %d triggers fired", len(all_triggered))
if __name__ == "__main__":
main()