# (Removed Hugging Face Spaces status banner — scrape artifact, not part of the module.)
# =============================================================================
# app/polymarket.py
# Polymarket Analysis Tool — Single File Module
# Part of: Universal MCP Hub (Sandboxed) — based on PyFundaments Architecture
# Copyright 2026 - Volkan Kücükbudak
# Apache License v2.0 + ESOL 1.1
# Repo: https://github.com/VolkanSah/Universal-MCP-Hub-sandboxed
# =============================================================================
# ARCHITECTURE NOTE:
# This file lives exclusively in app/ and is ONLY registered by app/mcp.py.
# NO direct access to fundaments/*, .env, or Guardian (main.py).
# All config comes from app/.pyfun via app/config.py.
#
# LAZY INIT PRINCIPLE:
# initialize() is called on first tool use — NOT during startup.
# This keeps startup fast and avoids crashes if the Gamma API is unreachable.
#
# SANDBOX RULES:
# - SQLite cache is app/* internal state — NOT postgresql.py (Guardian-only)
# - LLM calls are optional — gracefully skipped if no provider key is set
# - No global state leaks outside this file
# - Read-only access to Polymarket — no transactions, no auth needed
# =============================================================================
import asyncio
import json
import logging
import os
import re
from datetime import datetime, timezone
from typing import Optional

import aiohttp
import aiosqlite
| logger = logging.getLogger("polymarket") | |
# =============================================================================
# Constants
# =============================================================================
GAMMA_API = "https://gamma-api.polymarket.com"
# Cache DB path is configurable via the SQLITE_PATH env var (the architecture
# note says config arrives via app/config.py — TODO confirm the exact channel).
# The previous hard-coded literal "SQLITE_PATH" looked like an unresolved
# placeholder; it is kept as the fallback so existing deployments keep
# reading/writing the same file.
CACHE_DB = os.getenv("SQLITE_PATH", "SQLITE_PATH")
FETCH_INTERVAL = 300   # seconds — 5 min between API pulls
MARKET_LIMIT = 100     # max markets per fetch (rate limit friendly)
PRICE_DECIMALS = 2     # rounding for probability display
# Gamma API uses its own tag/category system.
# We map their tags to our simplified categories for filtering.
CATEGORY_MAP = {
    "politics": ["politics", "elections", "government", "trump", "us-politics"],
    "crypto": ["crypto", "bitcoin", "ethereum", "defi", "web3"],
    "economics": ["economics", "inflation", "fed", "interest-rates", "finance"],
    "energy": ["energy", "oil", "gas", "renewables", "climate"],
    "tech": ["technology", "ai", "spacex", "elon-musk", "science"],
    "sports": ["sports", "football", "soccer", "nba", "nfl", "esports"],
    "world": ["world", "geopolitics", "war", "nato", "china", "russia"],
}
# =============================================================================
# Internal State — lazy init guard
# =============================================================================
_initialized = False      # flipped to True once initialize() has completed
_scheduler_task = None    # asyncio background task handle
# =============================================================================
# SECTION 1 — Init (Lazy)
# =============================================================================
async def initialize() -> None:
    """
    Lazy initializer — called on first tool use.

    Sets up the SQLite cache and launches the background fetch scheduler.
    Idempotent: repeated calls are no-ops once initialization succeeded.
    """
    global _initialized, _scheduler_task
    if _initialized:
        return
    logger.info("Polymarket module initializing (lazy)...")
    await _init_cache()
    # Spawn the polling loop unless a live task already exists.
    needs_task = _scheduler_task is None or _scheduler_task.done()
    if needs_task:
        _scheduler_task = asyncio.create_task(_scheduler_loop())
        logger.info(f"Scheduler started — fetching every {FETCH_INTERVAL}s.")
    _initialized = True
    logger.info("Polymarket module ready.")
# =============================================================================
# SECTION 2 — SQLite Cache (app/* internal, NOT postgresql.py!)
# =============================================================================
async def _init_cache() -> None:
    """Create the SQLite schema (markets table + indexes) if missing."""
    ddl_statements = (
        """
        CREATE TABLE IF NOT EXISTS markets (
            id TEXT PRIMARY KEY,
            slug TEXT,
            question TEXT,
            category TEXT,
            probability REAL,
            volume REAL,
            liquidity REAL,
            end_date TEXT,
            active INTEGER,
            data TEXT,
            fetched_at TEXT
        )
        """,
        "CREATE INDEX IF NOT EXISTS idx_category ON markets(category);",
        "CREATE INDEX IF NOT EXISTS idx_active ON markets(active);",
    )
    async with aiosqlite.connect(CACHE_DB) as db:
        # All statements are idempotent (IF NOT EXISTS), so re-running is safe.
        for stmt in ddl_statements:
            await db.execute(stmt)
        await db.commit()
    logger.info("Cache initialized.")
async def _store_markets(markets: list) -> None:
    """
    Upsert a batch of markets into the SQLite cache.

    Each raw Gamma API market dict is categorized, has its YES probability
    extracted, and is stored with the full payload serialized as JSON.
    No-op for an empty batch.

    Args:
        markets: Raw market dicts as returned by the Gamma API.
    """
    if not markets:
        return
    now = datetime.now(timezone.utc).isoformat()
    # Build every row up front so the writes go through one executemany()
    # call in a single transaction instead of one execute() per market.
    rows = [
        (
            str(m.get("id", "")),
            m.get("slug", ""),
            m.get("question", ""),
            _categorize_market(m),
            _extract_probability(m),
            float(m.get("volume", 0) or 0),      # "or 0" guards None/"" values
            float(m.get("liquidity", 0) or 0),
            m.get("endDate", ""),
            1 if m.get("active", False) else 0,  # SQLite has no BOOL type
            json.dumps(m),
            now,
        )
        for m in markets
    ]
    async with aiosqlite.connect(CACHE_DB) as db:
        await db.executemany("""
            INSERT OR REPLACE INTO markets
            (id, slug, question, category, probability, volume, liquidity, end_date, active, data, fetched_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, rows)
        await db.commit()
    logger.info(f"Cached {len(markets)} markets.")
async def _get_cached_markets(
    category: Optional[str] = None,
    active_only: bool = True,
    limit: int = 50
) -> list:
    """
    Read markets from the SQLite cache with optional filters.

    Args:
        category: Only rows of this category (must be a CATEGORY_MAP key;
            unknown values are silently ignored).
        active_only: Restrict to rows flagged active.
        limit: Maximum rows, ordered by volume descending.

    Returns:
        List of plain dicts (one per cached market row).
    """
    where_parts: list = []
    params: list = []
    if active_only:
        where_parts.append("active = 1")
    if category and category.lower() in CATEGORY_MAP:
        where_parts.append("category = ?")
        params.append(category.lower())
    where = f"WHERE {' AND '.join(where_parts)}" if where_parts else ""
    params.append(limit)
    query = f"SELECT * FROM markets {where} ORDER BY volume DESC LIMIT ?"
    async with aiosqlite.connect(CACHE_DB) as db:
        db.row_factory = aiosqlite.Row
        cursor = await db.execute(query, params)
        fetched = await cursor.fetchall()
        return [dict(row) for row in fetched]
async def _get_market_by_id(market_id: str) -> Optional[dict]:
    """Look up one cached market row by its ID; None when absent."""
    async with aiosqlite.connect(CACHE_DB) as db:
        db.row_factory = aiosqlite.Row
        cur = await db.execute(
            "SELECT * FROM markets WHERE id = ?", (market_id,)
        )
        hit = await cur.fetchone()
    return dict(hit) if hit else None
async def _get_cache_stats() -> dict:
    """Return basic cache statistics: active row count and last fetch time."""
    async with aiosqlite.connect(CACHE_DB) as db:
        count_row = await (await db.execute(
            "SELECT COUNT(*) FROM markets WHERE active = 1"
        )).fetchone()
        last_row = await (await db.execute(
            "SELECT MAX(fetched_at) FROM markets"
        )).fetchone()
    # last_fetch is None when the table is still empty (MAX over zero rows).
    return {"active_markets": count_row[0], "last_fetch": last_row[0]}
# =============================================================================
# SECTION 3 — Fetcher (Gamma API — Read Only, no auth)
# =============================================================================
async def _fetch_from_api(limit: int = MARKET_LIMIT) -> list:
    """
    Pull the top active markets from the Polymarket Gamma API.

    Read-only endpoint — no auth token, no transactions.

    Args:
        limit: Maximum number of markets to request.

    Returns:
        Decoded JSON list of market dicts.

    Raises:
        aiohttp.ClientError: on HTTP/network failure (raise_for_status).
    """
    query = {
        "active": "true",
        "archived": "false",
        "closed": "false",
        "limit": limit,
        "order": "volume",
        "ascending": "false",
    }
    timeout = aiohttp.ClientTimeout(total=30)
    async with aiohttp.ClientSession() as session:
        async with session.get(
            f"{GAMMA_API}/markets", params=query, timeout=timeout
        ) as resp:
            resp.raise_for_status()
            payload = await resp.json()
    logger.info(f"Fetched {len(payload)} markets from Gamma API.")
    return payload
async def _scheduler_loop() -> None:
    """
    Endless background poller: fetch + cache every FETCH_INTERVAL seconds.

    Network failures are logged as warnings, anything else as errors; the
    loop itself never exits (only task cancellation stops it).
    """
    logger.info("Scheduler loop started.")
    while True:
        try:
            await _store_markets(await _fetch_from_api())
        except aiohttp.ClientError as exc:
            logger.warning(f"Gamma API fetch failed (network): {exc}")
        except Exception as exc:
            logger.error(f"Scheduler error: {exc}")
        await asyncio.sleep(FETCH_INTERVAL)
# =============================================================================
# SECTION 4 — Filter & Categorization
# =============================================================================
def _categorize_market(market: dict) -> str:
    """
    Map a Gamma API market onto our simplified category system.

    Checks the market's tags first (Gamma returns a list of dicts or
    strings), then falls back to whole-word keyword matching in the
    question text. Returns 'other' when nothing matches.

    Args:
        market: Raw market dict from the Gamma API.

    Returns:
        One of the CATEGORY_MAP keys, or "other".
    """
    tags = set()
    for t in market.get("tags", []):
        if isinstance(t, dict):
            tags.add(t.get("slug", "").lower())
            tags.add(t.get("label", "").lower())
        elif isinstance(t, str):
            tags.add(t.lower())
    question = market.get("question", "").lower()
    for category, keywords in CATEGORY_MAP.items():
        for kw in keywords:
            # Whole-word match in the question text — a bare substring test
            # misfires badly (e.g. "ai" inside "rain", "oil" in "spoiler").
            if kw in tags or re.search(rf"\b{re.escape(kw)}\b", question):
                return category
    return "other"
def _extract_probability(market: dict) -> float:
    """
    Pull the YES probability out of a raw market dict.

    Gamma stores outcome prices in the 0.0–1.0 range, often as a
    JSON-encoded string like '["0.73", "0.27"]'; the first entry is the
    YES price. Returns the value scaled to percent (0–100), rounded to
    PRICE_DECIMALS, or 0.0 when the field is missing or unparsable.
    """
    raw = market.get("outcomePrices")
    try:
        prices = json.loads(raw) if isinstance(raw, str) else raw
        if prices:
            return round(float(prices[0]) * 100, PRICE_DECIMALS)
    except (ValueError, TypeError, json.JSONDecodeError):
        pass
    return 0.0
def _format_market_simple(market: dict) -> dict:
    """
    Render a cached market row as a human-readable dict.

    Shared by all public tools so their output stays consistent. The
    German sentiment label buckets the YES probability into five
    plain-language bands (>=80, >=60, >=40, >=20, below 20).
    """
    prob = market.get("probability", 0.0)
    # Threshold table replaces an if/elif ladder; first matching band wins.
    bands = (
        (80, "sehr wahrscheinlich"),
        (60, "wahrscheinlich"),
        (40, "ungewiss"),
        (20, "unwahrscheinlich"),
    )
    sentiment = "sehr unwahrscheinlich"
    for threshold, label in bands:
        if prob >= threshold:
            sentiment = label
            break
    slug = market.get("slug", "")
    return {
        "id": market.get("id"),
        "question": market.get("question"),
        "category": market.get("category"),
        "probability": f"{prob}%",
        "sentiment": sentiment,
        "volume_usd": f"${market.get('volume', 0):,.0f}",
        "liquidity": f"${market.get('liquidity', 0):,.0f}",
        "end_date": market.get("end_date", ""),
        "slug": slug,
        "url": f"https://polymarket.com/event/{slug}",
    }
# =============================================================================
# SECTION 5 — LLM Adapter (Optional — graceful fallback if no key)
# =============================================================================
async def _post_chat(url: str, headers: dict, payload: dict,
                     total_timeout: int = 30) -> dict:
    """
    POST a JSON chat request and return the decoded JSON response.

    Shared transport for all providers; raises on non-2xx status so the
    caller can fall through to the next provider.
    """
    async with aiohttp.ClientSession() as session:
        async with session.post(
            url,
            headers=headers,
            json=payload,
            timeout=aiohttp.ClientTimeout(total=total_timeout),
        ) as resp:
            resp.raise_for_status()
            return await resp.json()


async def _llm_analyze(prompt: str) -> Optional[str]:
    """
    Send prompt to the first available LLM provider.

    Providers are tried in order: Anthropic → OpenRouter → HuggingFace,
    gated by which API key env var is set. Each failure is logged as a
    warning and the next provider is tried. Returns None when no provider
    succeeds (or none is configured) — the caller handles the fallback.

    Args:
        prompt: User-role prompt text.

    Returns:
        The model's reply text, or None.
    """
    messages = [{"role": "user", "content": prompt}]
    # --- Anthropic Claude ---
    anthropic_key = os.getenv("ANTHROPIC_API_KEY")
    if anthropic_key:
        try:
            data = await _post_chat(
                "https://api.anthropic.com/v1/messages",
                headers={
                    "x-api-key": anthropic_key,
                    "anthropic-version": "2023-06-01",
                    "content-type": "application/json",
                },
                payload={
                    "model": "claude-haiku-4-5-20251001",
                    "max_tokens": 512,
                    "messages": messages,
                },
            )
            # Anthropic wraps the reply in a list of content blocks.
            return data["content"][0]["text"]
        except Exception as e:
            logger.warning(f"Anthropic LLM call failed: {e}")
    # --- OpenRouter fallback ---
    openrouter_key = os.getenv("OPENROUTER_API_KEY")
    if openrouter_key:
        try:
            data = await _post_chat(
                "https://openrouter.ai/api/v1/chat/completions",
                headers={
                    "Authorization": f"Bearer {openrouter_key}",
                    "content-type": "application/json",
                },
                payload={
                    "model": "mistralai/mistral-7b-instruct",
                    "max_tokens": 512,
                    "messages": messages,
                },
            )
            return data["choices"][0]["message"]["content"]
        except Exception as e:
            logger.warning(f"OpenRouter LLM call failed: {e}")
    # --- HuggingFace fallback ---
    hf_key = os.getenv("HF_API_KEY")
    if hf_key:
        try:
            model = "mistralai/Mistral-7B-Instruct-v0.3"
            data = await _post_chat(
                f"https://api-inference.huggingface.co/models/{model}/v1/chat/completions",
                headers={
                    "Authorization": f"Bearer {hf_key}",
                    "content-type": "application/json",
                },
                payload={
                    "model": model,
                    "max_tokens": 512,
                    "messages": messages,
                },
                total_timeout=60,  # HF cold starts are slower
            )
            return data["choices"][0]["message"]["content"]
        except Exception as e:
            logger.warning(f"HuggingFace LLM call failed: {e}")
    logger.info("No LLM provider available — returning None.")
    return None
# =============================================================================
# SECTION 6 — Public Tools (registered by mcp.py)
# =============================================================================
async def get_markets(
    category: Optional[str] = None,
    limit: int = 20
) -> list:
    """
    MCP Tool: List active prediction markets from the local cache.

    Args:
        category: Optional filter — one of politics, crypto, economics,
            energy, tech, sports, world, other. None returns everything.
        limit: Maximum results (default 20, hard-capped at 100).

    Returns:
        Formatted market dicts with human-readable probability, or a
        single info dict while the cache is still warming up.
    """
    await initialize()
    capped = min(limit, 100)
    rows = await _get_cached_markets(category=category, limit=capped)
    if not rows:
        return [{"info": "No markets in cache yet. Try again in 30 seconds."}]
    return [_format_market_simple(row) for row in rows]
async def trending_markets(limit: int = 10) -> list:
    """
    MCP Tool: Top trending markets ranked by trading volume.

    Args:
        limit: How many markets to return (default 10).

    Returns:
        Markets sorted by volume descending, or a single info dict while
        the cache is empty.
    """
    await initialize()
    rows = await _get_cached_markets(active_only=True, limit=limit)
    if not rows:
        return [{"info": "No markets in cache yet. Try again in 30 seconds."}]
    return [_format_market_simple(row) for row in rows]
async def analyze_market(market_id: str) -> dict:
    """
    MCP Tool: LLM analysis of a single prediction market.

    Falls back to a structured text summary when no LLM provider key is
    configured.

    Args:
        market_id: Polymarket market ID as returned by get_markets().

    Returns:
        Formatted market dict with an added "analysis" field, or an error
        dict for unknown IDs.
    """
    await initialize()
    market = await _get_market_by_id(market_id)
    if market is None:
        return {"error": f"Market '{market_id}' not found in cache."}
    formatted = _format_market_simple(market)
    # Build the (German) LLM prompt from cached fields.
    prompt = (
        f"Analysiere diesen Prediction Market kurz und präzise:\n\n"
        f"Frage: {market.get('question')}\n"
        f"Wahrscheinlichkeit YES: {formatted['probability']}\n"
        f"Handelsvolumen: {formatted['volume_usd']}\n"
        f"Kategorie: {market.get('category')}\n"
        f"Läuft bis: {market.get('end_date', 'unbekannt')}\n\n"
        f"Erkläre in 2-3 Sätzen was der Markt aussagt und was das für "
        f"Alltagsentscheidungen bedeuten könnte. Keine Finanzberatung."
    )
    analysis = await _llm_analyze(prompt)
    if not analysis:
        # Structured fallback — works without any LLM key configured.
        prob = market.get("probability", 0)
        analysis = (
            f"Der Markt bewertet '{market.get('question')}' mit "
            f"{prob}% Wahrscheinlichkeit. "
            f"Für eine KI-Analyse bitte LLM Provider API Key konfigurieren."
        )
    formatted["analysis"] = analysis
    return formatted
async def summary_report(category: Optional[str] = None) -> dict:
    """
    MCP Tool: Summary report over a category (or all active markets).

    Always returns structured statistics; adds an LLM narrative field
    when a provider key is configured.

    Args:
        category: Category to summarize; None covers all active markets.

    Returns:
        Dict with market count, average probability, total volume, the
        top-3 markets, a timestamp, and an optional "narrative".
    """
    await initialize()
    markets = await _get_cached_markets(category=category, limit=50)
    if not markets:
        return {"error": "No markets in cache yet. Try again in 30 seconds."}
    # Statistics are always computed — no LLM needed for these.
    probs = [m["probability"] for m in markets if m.get("probability")]
    avg_prob = round(sum(probs) / len(probs), 1) if probs else 0
    total_vol = sum(m.get("volume", 0) for m in markets)
    stats = {
        "category": category or "all",
        "market_count": len(markets),
        "avg_probability": f"{avg_prob}%",
        "total_volume": f"${total_vol:,.0f}",
        "top_markets": [_format_market_simple(m) for m in markets[:3]],
        "generated_at": datetime.now(timezone.utc).isoformat(),
    }
    # Optional LLM narrative over the ten largest markets.
    bullet_lines = [
        f"- {m.get('question')} ({m.get('probability', 0)}% YES, Vol: ${m.get('volume', 0):,.0f})"
        for m in markets[:10]
    ]
    market_list = "\n".join(bullet_lines)
    prompt = (
        f"Erstelle eine kurze Zusammenfassung (3-4 Sätze) der aktuellen "
        f"Prediction Market Lage{' für ' + category if category else ''}:\n\n"
        f"{market_list}\n\n"
        f"Was sind die auffälligsten Trends? Sachlich, keine Finanzberatung."
    )
    narrative = await _llm_analyze(prompt)
    if narrative:
        stats["narrative"] = narrative
    return stats
async def get_cache_info() -> dict:
    """
    MCP Tool: Report cache status plus available categories.

    Handy for debugging and monitoring.

    Returns:
        Dict with cache stats, fetch interval, category list, the Gamma
        API base URL, and whether any LLM provider key is configured.
    """
    await initialize()
    info = dict(await _get_cache_stats())
    llm_env_vars = ("ANTHROPIC_API_KEY", "OPENROUTER_API_KEY", "HF_API_KEY")
    info.update({
        "fetch_interval_seconds": FETCH_INTERVAL,
        "available_categories": list(CATEGORY_MAP.keys()) + ["other"],
        "gamma_api": GAMMA_API,
        "llm_available": any(os.getenv(name) for name in llm_env_vars),
    })
    return info
# =============================================================================
# Direct execution guard
# =============================================================================
# This module is an MCP plugin registered by app/mcp.py; running it as a
# script starts nothing, so just warn and exit.
if __name__ == "__main__":
    print("WARNING: Run via main.py → app.py → mcp.py, not directly.")