# Multi-LLM-API-Gateway / app/polymarket.py
# Uploaded by Alibrown — commit "Update app/polymarket.py" (d33822e, verified)
# HuggingFace blob-view metadata (raw / history / blame) — 21.6 kB
# =============================================================================
# app/polymarket.py
# Polymarket Analysis Tool — Single File Module
# Part of: Universal MCP Hub (Sandboxed) - based on PyFundaments Architecture
# Copyright 2026 - Volkan Kücükbudak
# Apache License V. 2 + ESOL 1.1
# Repo: https://github.com/VolkanSah/Universal-MCP-Hub-sandboxed
# =============================================================================
# ARCHITECTURE NOTE:
# This file lives exclusively in app/ and is ONLY registered by app/mcp.py.
# NO direct access to fundaments/*, .env, or Guardian (main.py).
# All config comes from app/.pyfun via app/config.py.
#
# LAZY INIT PRINCIPLE:
# initialize() is called on first tool use — NOT during startup.
# This keeps startup fast and avoids crashes if Gamma API is unreachable.
#
# SANDBOX RULES:
# - SQLite cache is app/* internal state — NOT postgresql.py (Guardian-only)
# - LLM calls are optional — gracefully skipped if no provider key is set
# - No global state leaks outside this file
# - Read-Only access to Polymarket — no transactions, no auth needed
# =============================================================================
import asyncio
import json
import logging
import os
import re

from datetime import datetime, timezone
from typing import Optional

import aiohttp
import aiosqlite
logger = logging.getLogger("polymarket")
# =============================================================================
# Constants
# =============================================================================
GAMMA_API = "https://gamma-api.polymarket.com"
# FIX: the original assigned the literal string "SQLITE_PATH" as the DB
# filename, which would create a file literally named "SQLITE_PATH" in the
# working directory. Resolve the path from the SQLITE_PATH environment
# variable instead, with a sensible local fallback.
# NOTE(review): the architecture header says config comes from app/.pyfun via
# app/config.py — confirm whether that layer exports SQLITE_PATH as an env var.
CACHE_DB = os.getenv("SQLITE_PATH", "polymarket_cache.db")
FETCH_INTERVAL = 300   # seconds — 5 min between API pulls
MARKET_LIMIT = 100     # max markets per fetch (rate limit friendly)
PRICE_DECIMALS = 2     # rounding for probability display
# Gamma API uses its own tag/category system.
# We map their tags to our simplified categories for filtering.
CATEGORY_MAP = {
    "politics": ["politics", "elections", "government", "trump", "us-politics"],
    "crypto": ["crypto", "bitcoin", "ethereum", "defi", "web3"],
    "economics": ["economics", "inflation", "fed", "interest-rates", "finance"],
    "energy": ["energy", "oil", "gas", "renewables", "climate"],
    "tech": ["technology", "ai", "spacex", "elon-musk", "science"],
    "sports": ["sports", "football", "soccer", "nba", "nfl", "esports"],
    "world": ["world", "geopolitics", "war", "nato", "china", "russia"],
}
# =============================================================================
# Internal State — lazy init guard
# =============================================================================
_initialized = False    # set True after first successful initialize()
_scheduler_task = None  # asyncio background task handle
# =============================================================================
# SECTION 1 — Init (Lazy)
# =============================================================================
async def initialize() -> None:
    """
    Lazy initializer — invoked on first tool use, never at startup.

    Creates the SQLite cache schema and launches the background fetch
    scheduler. Idempotent: once setup has completed, later calls return
    immediately.
    """
    global _initialized, _scheduler_task
    if _initialized:
        return
    logger.info("Polymarket module initializing (lazy)...")
    await _init_cache()
    # Spawn (or respawn) the background poller as an asyncio task.
    needs_scheduler = _scheduler_task is None or _scheduler_task.done()
    if needs_scheduler:
        _scheduler_task = asyncio.create_task(_scheduler_loop())
        logger.info(f"Scheduler started — fetching every {FETCH_INTERVAL}s.")
    _initialized = True
    logger.info("Polymarket module ready.")
# =============================================================================
# SECTION 2 — SQLite Cache (app/* internal, NOT postgresql.py!)
# =============================================================================
async def _init_cache() -> None:
    """Create the SQLite schema (markets table plus indexes) if missing."""
    ddl_statements = (
        """
        CREATE TABLE IF NOT EXISTS markets (
            id TEXT PRIMARY KEY,
            slug TEXT,
            question TEXT,
            category TEXT,
            probability REAL,
            volume REAL,
            liquidity REAL,
            end_date TEXT,
            active INTEGER,
            data TEXT,
            fetched_at TEXT
        )
        """,
        "CREATE INDEX IF NOT EXISTS idx_category ON markets(category);",
        "CREATE INDEX IF NOT EXISTS idx_active ON markets(active);",
    )
    async with aiosqlite.connect(CACHE_DB) as conn:
        for ddl in ddl_statements:
            await conn.execute(ddl)
        await conn.commit()
    logger.info("Cache initialized.")
async def _store_markets(markets: list) -> None:
    """Upsert fetched markets into the SQLite cache (no-op on empty input)."""
    if not markets:
        return
    fetched_at = datetime.now(timezone.utc).isoformat()
    # Pre-build all rows so a single executemany handles the upsert batch.
    rows = [
        (
            str(m.get("id", "")),
            m.get("slug", ""),
            m.get("question", ""),
            _categorize_market(m),
            _extract_probability(m),
            float(m.get("volume", 0) or 0),
            float(m.get("liquidity", 0) or 0),
            m.get("endDate", ""),
            1 if m.get("active", False) else 0,
            json.dumps(m),  # full raw record kept for later inspection
            fetched_at,
        )
        for m in markets
    ]
    async with aiosqlite.connect(CACHE_DB) as conn:
        await conn.executemany(
            """
            INSERT OR REPLACE INTO markets
            (id, slug, question, category, probability, volume, liquidity, end_date, active, data, fetched_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            rows,
        )
        await conn.commit()
    logger.info(f"Cached {len(markets)} markets.")
async def _get_cached_markets(
    category: Optional[str] = None,
    active_only: bool = True,
    limit: int = 50
) -> list:
    """Read markets from the cache, optionally filtered, ordered by volume desc."""
    clauses = []
    args: list = []
    if active_only:
        clauses.append("active = 1")
    # Only filter on categories we actually know; unknown values mean "all".
    if category and category.lower() in CATEGORY_MAP:
        clauses.append("category = ?")
        args.append(category.lower())
    where_sql = f"WHERE {' AND '.join(clauses)}" if clauses else ""
    args.append(limit)
    query = f"SELECT * FROM markets {where_sql} ORDER BY volume DESC LIMIT ?"
    async with aiosqlite.connect(CACHE_DB) as conn:
        conn.row_factory = aiosqlite.Row
        cursor = await conn.execute(query, args)
        rows = await cursor.fetchall()
    return [dict(row) for row in rows]
async def _get_market_by_id(market_id: str) -> Optional[dict]:
    """Look up a single cached market by its Polymarket ID; None if absent."""
    async with aiosqlite.connect(CACHE_DB) as conn:
        conn.row_factory = aiosqlite.Row
        cursor = await conn.execute(
            "SELECT * FROM markets WHERE id = ?", (market_id,)
        )
        row = await cursor.fetchone()
    if row is None:
        return None
    return dict(row)
async def _get_cache_stats() -> dict:
    """Return the active-market count and the timestamp of the latest fetch."""
    async with aiosqlite.connect(CACHE_DB) as conn:
        active_row = await (await conn.execute(
            "SELECT COUNT(*) FROM markets WHERE active = 1"
        )).fetchone()
        last_row = await (await conn.execute(
            "SELECT MAX(fetched_at) FROM markets"
        )).fetchone()
    # last_fetch is None when the cache is empty (MAX over zero rows).
    return {"active_markets": active_row[0], "last_fetch": last_row[0]}
# =============================================================================
# SECTION 3 — Fetcher (Gamma API — Read Only, no auth)
# =============================================================================
async def _fetch_from_api(limit: int = MARKET_LIMIT) -> list:
    """
    Pull the top active markets (highest volume first) from the Gamma API.

    Read-only endpoint — no auth, no transactions. Raises aiohttp client
    errors on network/HTTP failure; the scheduler loop handles those.
    """
    query = {
        "active": "true",
        "archived": "false",
        "closed": "false",
        "limit": limit,
        "order": "volume",
        "ascending": "false",
    }
    timeout = aiohttp.ClientTimeout(total=30)
    async with aiohttp.ClientSession() as session:
        async with session.get(
            f"{GAMMA_API}/markets", params=query, timeout=timeout
        ) as resp:
            resp.raise_for_status()
            data = await resp.json()
    logger.info(f"Fetched {len(data)} markets from Gamma API.")
    return data
async def _scheduler_loop() -> None:
    """
    Background poller: fetch markets, cache them, sleep FETCH_INTERVAL, repeat.

    Runs until the owning asyncio task is cancelled; per-cycle errors are
    logged and never kill the loop.
    """
    logger.info("Scheduler loop started.")
    while True:
        try:
            await _store_markets(await _fetch_from_api())
        except aiohttp.ClientError as e:
            # Transient network issues are expected — warn and retry next cycle.
            logger.warning(f"Gamma API fetch failed (network): {e}")
        except Exception as e:
            logger.error(f"Scheduler error: {e}")
        await asyncio.sleep(FETCH_INTERVAL)
# =============================================================================
# SECTION 4 — Filter & Categorization
# =============================================================================
def _categorize_market(market: dict) -> str:
    """
    Map a Polymarket market to our simplified category system.

    Matching sources, in CATEGORY_MAP iteration order:
      1. Gamma API tags — exact match against the lowercased slug/label.
      2. Question text — whole-word keyword match.

    FIX: the original used a plain substring test on the question text
    (`kw in question`), which produced false positives such as the keyword
    "ai" matching "rain"/"chair" or "fed" matching "federal". The question
    match now requires word boundaries via a regex.

    Returns:
        The matched category name, or "other" when nothing matches.
    """
    tags = set()
    # Gamma API returns tags as a list of dicts or plain strings.
    for t in market.get("tags", []):
        if isinstance(t, dict):
            tags.add(t.get("slug", "").lower())
            tags.add(t.get("label", "").lower())
        elif isinstance(t, str):
            tags.add(t.lower())
    question = market.get("question", "").lower()
    for category, keywords in CATEGORY_MAP.items():
        for kw in keywords:
            # Exact tag hit, or whole-word hit in the question text.
            if kw in tags or re.search(rf"\b{re.escape(kw)}\b", question):
                return category
    return "other"
def _extract_probability(market: dict) -> float:
    """
    Extract the YES probability from a Gamma market record.

    Gamma stores outcome prices as a JSON-encoded string such as
    '["0.73", "0.27"]'; the first entry is the YES price (0.0–1.0).

    Returns:
        The YES price scaled to a percentage (0–100), rounded to
        PRICE_DECIMALS, or 0.0 on missing/unparseable data.
    """
    prices = market.get("outcomePrices")
    try:
        if isinstance(prices, str):
            prices = json.loads(prices)
        if prices:
            return round(float(prices[0]) * 100, PRICE_DECIMALS)
    except (ValueError, TypeError, json.JSONDecodeError):
        # Malformed price data — fall through to the neutral default.
        pass
    return 0.0
def _format_market_simple(market: dict) -> dict:
"""
Format a market for human-readable output.
Used by all public tools for consistent output.
"""
prob = market.get("probability", 0.0)
# Simple plain-language probability label
if prob >= 80:
sentiment = "sehr wahrscheinlich"
elif prob >= 60:
sentiment = "wahrscheinlich"
elif prob >= 40:
sentiment = "ungewiss"
elif prob >= 20:
sentiment = "unwahrscheinlich"
else:
sentiment = "sehr unwahrscheinlich"
return {
"id": market.get("id"),
"question": market.get("question"),
"category": market.get("category"),
"probability": f"{prob}%",
"sentiment": sentiment,
"volume_usd": f"${market.get('volume', 0):,.0f}",
"liquidity": f"${market.get('liquidity', 0):,.0f}",
"end_date": market.get("end_date", ""),
"slug": market.get("slug", ""),
"url": f"https://polymarket.com/event/{market.get('slug', '')}",
}
# =============================================================================
# SECTION 5 — LLM Adapter (Optional — graceful fallback if no key)
# =============================================================================
async def _post_chat(url: str, headers: dict, payload: dict, timeout_s: int) -> dict:
    """
    POST a JSON payload to an LLM HTTP endpoint and return the parsed JSON.

    Raises aiohttp client errors (including HTTP status errors via
    raise_for_status) — the caller's fallback chain handles them.
    """
    async with aiohttp.ClientSession() as session:
        async with session.post(
            url,
            headers=headers,
            json=payload,
            timeout=aiohttp.ClientTimeout(total=timeout_s),
        ) as resp:
            resp.raise_for_status()
            return await resp.json()

async def _llm_analyze(prompt: str) -> Optional[str]:
    """
    Send prompt to the first available LLM provider.

    Checks for API keys in order: Anthropic → OpenRouter → HuggingFace.
    Each provider failure is logged and the next one is tried.
    Returns None if no provider is available — caller handles fallback.

    REFACTOR: the three copy-pasted request blocks now share _post_chat;
    only the endpoint, headers, and response shape differ per provider.
    """
    messages = [{"role": "user", "content": prompt}]
    # --- Anthropic Claude ---
    anthropic_key = os.getenv("ANTHROPIC_API_KEY")
    if anthropic_key:
        try:
            data = await _post_chat(
                "https://api.anthropic.com/v1/messages",
                headers={
                    "x-api-key": anthropic_key,
                    "anthropic-version": "2023-06-01",
                    "content-type": "application/json",
                },
                payload={
                    "model": "claude-haiku-4-5-20251001",
                    "max_tokens": 512,
                    "messages": messages,
                },
                timeout_s=30,
            )
            # Anthropic Messages API shape: content is a list of blocks.
            return data["content"][0]["text"]
        except Exception as e:
            logger.warning(f"Anthropic LLM call failed: {e}")
    # --- OpenRouter fallback ---
    openrouter_key = os.getenv("OPENROUTER_API_KEY")
    if openrouter_key:
        try:
            data = await _post_chat(
                "https://openrouter.ai/api/v1/chat/completions",
                headers={
                    "Authorization": f"Bearer {openrouter_key}",
                    "content-type": "application/json",
                },
                payload={
                    "model": "mistralai/mistral-7b-instruct",
                    "max_tokens": 512,
                    "messages": messages,
                },
                timeout_s=30,
            )
            # OpenAI-compatible response shape.
            return data["choices"][0]["message"]["content"]
        except Exception as e:
            logger.warning(f"OpenRouter LLM call failed: {e}")
    # --- HuggingFace fallback ---
    hf_key = os.getenv("HF_API_KEY")
    if hf_key:
        try:
            model = "mistralai/Mistral-7B-Instruct-v0.3"
            data = await _post_chat(
                f"https://api-inference.huggingface.co/models/{model}/v1/chat/completions",
                headers={
                    "Authorization": f"Bearer {hf_key}",
                    "content-type": "application/json",
                },
                payload={
                    "model": model,
                    "max_tokens": 512,
                    "messages": messages,
                },
                timeout_s=60,  # HF cold starts are slow — longer budget
            )
            return data["choices"][0]["message"]["content"]
        except Exception as e:
            logger.warning(f"HuggingFace LLM call failed: {e}")
    logger.info("No LLM provider available — returning None.")
    return None
# =============================================================================
# SECTION 6 — Public Tools (registered by mcp.py)
# =============================================================================
async def get_markets(
    category: Optional[str] = None,
    limit: int = 20
) -> list:
    """
    MCP Tool: Get active prediction markets from cache.
    Args:
        category: Filter by category. Options: politics, crypto, economics,
                  energy, tech, sports, world, other. None = all categories.
        limit: Max number of markets to return (default 20, max 100).
    Returns:
        List of formatted market dicts with human-readable probability.
    """
    await initialize()
    # Cap the page size regardless of what the caller asks for.
    capped = min(limit, 100)
    rows = await _get_cached_markets(category=category, limit=capped)
    if rows:
        return [_format_market_simple(row) for row in rows]
    return [{"info": "No markets in cache yet. Try again in 30 seconds."}]
async def trending_markets(limit: int = 10) -> list:
    """
    MCP Tool: Get top trending markets by trading volume.
    Args:
        limit: Number of trending markets to return (default 10).
    Returns:
        List of top markets sorted by volume descending.
    """
    await initialize()
    # Cache query already sorts by volume desc, so the top rows are "trending".
    rows = await _get_cached_markets(active_only=True, limit=limit)
    if rows:
        return [_format_market_simple(row) for row in rows]
    return [{"info": "No markets in cache yet. Try again in 30 seconds."}]
async def analyze_market(market_id: str) -> dict:
    """
    MCP Tool: Get LLM analysis of a single prediction market.
    Falls back to a structured data summary if no LLM key is configured.
    Args:
        market_id: Polymarket market ID from get_markets() results.
    Returns:
        Dict with market data + LLM analysis (or structured fallback).
    """
    await initialize()
    cached = await _get_market_by_id(market_id)
    if cached is None:
        return {"error": f"Market '{market_id}' not found in cache."}
    result = _format_market_simple(cached)
    # Compose the (German-language) analysis prompt from cached fields.
    prompt = (
        f"Analysiere diesen Prediction Market kurz und präzise:\n\n"
        f"Frage: {cached.get('question')}\n"
        f"Wahrscheinlichkeit YES: {result['probability']}\n"
        f"Handelsvolumen: {result['volume_usd']}\n"
        f"Kategorie: {cached.get('category')}\n"
        f"Läuft bis: {cached.get('end_date', 'unbekannt')}\n\n"
        f"Erkläre in 2-3 Sätzen was der Markt aussagt und was das für "
        f"Alltagsentscheidungen bedeuten könnte. Keine Finanzberatung."
    )
    analysis = await _llm_analyze(prompt)
    if not analysis:
        # Structured fallback — no LLM needed.
        yes_prob = cached.get("probability", 0)
        analysis = (
            f"Der Markt bewertet '{cached.get('question')}' mit "
            f"{yes_prob}% Wahrscheinlichkeit. "
            f"Für eine KI-Analyse bitte LLM Provider API Key konfigurieren."
        )
    result["analysis"] = analysis
    return result
async def summary_report(category: Optional[str] = None) -> dict:
    """
    MCP Tool: Generate a summary report for a category or all markets.
    Uses LLM if available, falls back to structured statistics.
    Args:
        category: Category to summarize. None = all active markets.
    Returns:
        Dict with statistics and optional LLM narrative summary.
    """
    await initialize()
    markets = await _get_cached_markets(category=category, limit=50)
    if not markets:
        return {"error": "No markets in cache yet. Try again in 30 seconds."}
    # --- Statistics (always available, no LLM needed) ---
    probabilities = [m["probability"] for m in markets if m.get("probability")]
    if probabilities:
        avg_prob = round(sum(probabilities) / len(probabilities), 1)
    else:
        avg_prob = 0
    total_volume = sum(m.get("volume", 0) for m in markets)
    report = {
        "category": category or "all",
        "market_count": len(markets),
        "avg_probability": f"{avg_prob}%",
        "total_volume": f"${total_volume:,.0f}",
        # Rows come back volume-sorted, so the head is the top-3 by volume.
        "top_markets": [_format_market_simple(m) for m in markets[:3]],
        "generated_at": datetime.now(timezone.utc).isoformat(),
    }
    # --- LLM narrative (optional) ---
    bullet_lines = [
        f"- {m.get('question')} ({m.get('probability', 0)}% YES, Vol: ${m.get('volume', 0):,.0f})"
        for m in markets[:10]
    ]
    market_list = "\n".join(bullet_lines)
    prompt = (
        f"Erstelle eine kurze Zusammenfassung (3-4 Sätze) der aktuellen "
        f"Prediction Market Lage{' für ' + category if category else ''}:\n\n"
        f"{market_list}\n\n"
        f"Was sind die auffälligsten Trends? Sachlich, keine Finanzberatung."
    )
    narrative = await _llm_analyze(prompt)
    if narrative:
        report["narrative"] = narrative
    return report
async def get_cache_info() -> dict:
    """
    MCP Tool: Get cache status and available categories.
    Useful for debugging and monitoring.
    Returns:
        Dict with cache stats and available category list.
    """
    await initialize()
    info = dict(await _get_cache_stats())
    # Any provider key present means the LLM analysis path is usable.
    has_llm_key = any(
        os.getenv(var)
        for var in ("ANTHROPIC_API_KEY", "OPENROUTER_API_KEY", "HF_API_KEY")
    )
    info.update(
        fetch_interval_seconds=FETCH_INTERVAL,
        available_categories=list(CATEGORY_MAP.keys()) + ["other"],
        gamma_api=GAMMA_API,
        llm_available=has_llm_key,
    )
    return info
# =============================================================================
# Direct execution guard
# =============================================================================
if __name__ == "__main__":
    # Library module — registered via app/mcp.py; running it directly only
    # prints this warning and does nothing else.
    print("WARNING: Run via main.py → app.py → mcp.py, not directly.")