polymarket-bot-testing / external_data_daemon.py
frank0957's picture
Update external_data_daemon.py
dae1d5f verified
Raw
History Blame Contribute Delete
10.3 kB
# external_data_daemon.py
# Runs once every 24 hours.
# 1. Economic calendar data is provided by the backup GitHub repo
# (pulled by backup daemon). No live fetch is attempted.
# 2. Resolves open flags:
# a. Polymarket‑based – using /app/flag_mappings.json
# b. Price‑based – using yfinance (with retries & backoff)
# All comments in English.
import os, json, time, re, urllib.request, urllib.parse
from datetime import datetime, timezone, timedelta
import yfinance as yf
FLAGS_FILE = "/app/flags.json"
MAPPINGS_FILE = "/app/flag_mappings.json"
ECON_DATA_FILE = "/app/economic_calendar_data.json"
TELEGRAM_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN")
TELEGRAM_CHAT_ID = os.getenv("TELEGRAM_CHAT_ID")
def send_telegram(text: str):
if not TELEGRAM_TOKEN or not TELEGRAM_CHAT_ID:
return False
url = f"https://api.telegram.org/bot{TELEGRAM_TOKEN}/sendMessage"
data = urllib.parse.urlencode({"chat_id": TELEGRAM_CHAT_ID, "text": text}).encode()
try:
req = urllib.request.Request(url, data=data, method="POST")
with urllib.request.urlopen(req, timeout=15):
return True
except:
return False
def read_json(path, default=None):
try:
if os.path.exists(path):
with open(path, "r") as f:
return json.load(f)
except:
pass
return default
def write_json(path, data):
with open(path, "w") as f:
json.dump(data, f, indent=2)
# ── Flag resolution ──────────────────────────────────────────────
POLYMARKET_API = "https://data-api.polymarket.com/markets"
def resolve_polymarket_flags(flags, mappings):
updated = False
resolutions = []
now_iso = datetime.now(timezone.utc).isoformat()
for flag in flags:
if flag.get("status") != "open":
continue
desc = flag.get("description", "").lower()
matched_slug = None
for mapping in mappings:
keyword = mapping.get("flag_keywords", "").lower()
if keyword and keyword in desc:
matched_slug = mapping.get("market_slug")
break
if not matched_slug:
continue
try:
resp = urllib.request.urlopen(
f"{POLYMARKET_API}?slug={urllib.parse.quote(matched_slug)}", timeout=10
)
data = json.loads(resp.read().decode())
if not isinstance(data, list) or not data:
continue
market = data[0]
if not market.get("closed"):
continue
outcome_prices = market.get("outcomePrices", [])
if len(outcome_prices) < 2:
continue
winner = "Yes" if float(outcome_prices[0]) == 1.0 else "No"
prob_str = flag.get("probability", "")
if "YES" in prob_str.upper():
predicted = "Yes"
elif "NO" in prob_str.upper():
predicted = "No"
else:
continue
result = "correct" if predicted == winner else "incorrect"
flag["status"] = "closed"
flag["closed_at"] = now_iso
flag["result"] = result
flag["resolved_by"] = "polymarket"
updated = True
resolutions.append(f"{desc}{result} (market: {matched_slug}, winner: {winner})")
except Exception as e:
print(f"[ExternalData] Error checking {matched_slug}: {e}", flush=True)
return flags, updated, resolutions
def parse_price_rule(description: str):
desc = description.lower()
symbol_map = {
"gold": "GC=F", "wti": "CL=F", "brent": "BZ=F", "crude": "CL=F",
"btc": "BTC-USD", "bitcoin": "BTC-USD", "spx": "^GSPC",
"s&p 500": "^GSPC", "nasdaq": "^IXIC", "dow": "^DJI",
"usd/jpy": "JPY=X", "eur": "EURUSD=X"
}
symbol = None
for name, sym in symbol_map.items():
if name in desc:
symbol = sym
break
if not symbol:
return None
price_match = re.search(r"\$(\d{1,3}(?:,\d{3})*(?:\.\d+)?)", description)
if not price_match:
return None
target_price = float(price_match.group(1).replace(",", ""))
condition = None
through_mode = False
if "stays above" in desc or "stay above" in desc:
condition = "above"; through_mode = True
elif "rebounds above" in desc or "trades above" in desc or "above" in desc:
condition = "above"; through_mode = False
elif "below" in desc:
condition = "below"; through_mode = False
if condition is None:
return None
deadline = None
date_patterns = [
r"by\s+(\w+)\s+(\d{1,2})(?:st|nd|rd|th)?",
r"through\s+end\s+of\s+(\w+)",
r"through\s+(\w+)\s+(\d{1,2})",
]
for pat in date_patterns:
m = re.search(pat, desc)
if m:
groups = m.groups()
if len(groups) == 2:
month_str, day_str = groups
else:
month_str, day_str = groups[0], "last"
try:
month_num = datetime.strptime(month_str, "%B").month
if day_str == "last":
next_month = month_num % 12 + 1
year = datetime.now(timezone.utc).year
if month_num == 12:
next_month = 1
year += 1
deadline = datetime(year, next_month, 1, tzinfo=timezone.utc) - timedelta(days=1)
else:
day = int(day_str)
today = datetime.now(timezone.utc)
deadline = datetime(today.year, month_num, day, tzinfo=timezone.utc)
if deadline < today:
deadline = datetime(today.year + 1, month_num, day, tzinfo=timezone.utc)
except:
pass
break
if deadline is None:
return None
return {
"symbol": symbol, "target_price": target_price,
"deadline": deadline, "condition": condition, "through_mode": through_mode
}
def download_symbol_with_retry(symbol, start_date, end_date, max_retries=3):
"""
Download yfinance data with retry and exponential backoff.
Returns closes series or None if all attempts fail.
"""
delays = [10, 30, 60] # seconds between retries
for attempt in range(max_retries):
try:
df = yf.download(symbol, start=start_date, end=end_date, progress=False)
if df.empty:
return None
return df["Close"].dropna()
except Exception as e:
if "rate" in str(e).lower() and attempt < max_retries - 1:
wait = delays[attempt]
print(f"[ExternalData] Rate limited on {symbol}, retrying in {wait}s...", flush=True)
time.sleep(wait)
else:
print(f"[ExternalData] Failed to download {symbol} after {attempt+1} attempt(s): {e}", flush=True)
return None
def resolve_price_flags(flags):
updated = False
resolutions = []
now_utc = datetime.now(timezone.utc)
now_iso = now_utc.isoformat()
downloaded = {}
for flag in flags:
if flag.get("status") != "open":
continue
rule = parse_price_rule(flag.get("description", ""))
if not rule:
continue
if now_utc < rule["deadline"]:
continue
symbol = rule["symbol"]
target = rule["target_price"]
deadline = rule["deadline"]
through = rule["through_mode"]
condition = rule["condition"]
if symbol in downloaded:
closes = downloaded[symbol]
else:
start_date = (datetime.fromisoformat(flag.get("opened_at", now_iso)) - timedelta(days=2)).strftime("%Y-%m-%d")
end_date = (deadline + timedelta(days=1)).strftime("%Y-%m-%d")
closes = download_symbol_with_retry(symbol, start_date, end_date)
if closes is None:
continue # skip this flag, will retry next cycle
downloaded[symbol] = closes
if through:
if condition == "above":
passed = all(closes > target)
else:
passed = all(closes < target)
else:
if condition == "above":
passed = any(closes > target)
else:
passed = any(closes < target)
result = "correct" if passed else "incorrect"
flag["status"] = "closed"
flag["closed_at"] = now_iso
flag["result"] = result
flag["resolved_by"] = "yfinance"
updated = True
resolutions.append(f"{flag['description']}{result} ({symbol} vs ${target})")
return flags, updated, resolutions
def resolve_all_flags():
flags = read_json(FLAGS_FILE, [])
if not flags:
return
mappings = read_json(MAPPINGS_FILE, [])
total_closed = []
flags, updated_poly, poly_res = resolve_polymarket_flags(flags, mappings)
total_closed.extend(poly_res)
flags, updated_price, price_res = resolve_price_flags(flags)
total_closed.extend(price_res)
if updated_poly or updated_price:
write_json(FLAGS_FILE, flags)
try:
from llm_wiki import save_entry
content = "Auto‑resolved flags:\n" + "\n".join(total_closed)
save_entry(topic="Auto-Flag-Resolution", content=content, tags=["auto", "flags"])
except:
pass
if total_closed:
msg = "🏁 Auto‑closed flags:\n" + "\n".join(total_closed[:10])
send_telegram(msg)
print(f"[ExternalData] Flag resolution: {len(total_closed)} flags closed.", flush=True)
def run():
print("[ExternalData] Daemon started (calendar provided by backup sync).", flush=True)
# Short initial delay to avoid cold‑start rate limits
time.sleep(60)
while True:
try:
resolve_all_flags()
except Exception as e:
print(f"[ExternalData] Daily run failed: {e}", flush=True)
time.sleep(86400)
if __name__ == "__main__":
run()