Spaces:
Sleeping
Sleeping
| """ | |
| PolyMarket API Integration | |
| Handles interactions with Polymarket's Gamma API and CLOB API | |
| """ | |
| import requests | |
| from typing import Dict, Optional, List | |
| import re | |
| class PolymarketAPI: | |
| """Class to handle Polymarket API operations""" | |
| GAMMA_API_BASE = "https://gamma-api.polymarket.com" | |
| CLOB_API_BASE = "https://clob.polymarket.com" | |
| def __init__(self): | |
| pass | |
| def extract_slug_from_url(self, url: str) -> Optional[str]: | |
| """ | |
| Extract the slug from a Polymarket URL | |
| Args: | |
| url: Polymarket URL (e.g., https://polymarket.com/event/will-the-supreme-court-rule-in-favor-of-trumps-tariffs) | |
| Returns: | |
| The slug string or None if not found | |
| """ | |
| # Handle different URL patterns | |
| # Pattern 1: https://polymarket.com/event/slug-here | |
| # Pattern 2: https://polymarket.com/market/slug-here | |
| patterns = [ | |
| r'polymarket\.com/event/([a-z0-9\-]+)', | |
| r'polymarket\.com/market/([a-z0-9\-]+)', | |
| r'polymarket\.com/([a-z0-9\-]+)', | |
| ] | |
| for pattern in patterns: | |
| match = re.search(pattern, url.lower()) | |
| if match: | |
| return match.group(1) | |
| return None | |
| def get_market_info(self, url: str) -> Optional[Dict]: | |
| """ | |
| Get market information from Polymarket using the URL | |
| Args: | |
| url: Polymarket URL | |
| Returns: | |
| Dictionary containing market information including: | |
| - id: market id | |
| - question: market question | |
| - conditionId: condition id | |
| - clobTokenIds: array of token ids | |
| """ | |
| slug = self.extract_slug_from_url(url) | |
| if not slug: | |
| raise ValueError(f"Could not extract slug from URL: {url}") | |
| # Call Gamma API with the slug | |
| endpoint = f"{self.GAMMA_API_BASE}/markets" | |
| params = {"slug": slug} | |
| try: | |
| response = requests.get(endpoint, params=params, timeout=10) | |
| response.raise_for_status() | |
| data = response.json() | |
| # The API returns a singleton array | |
| if isinstance(data, list) and len(data) > 0: | |
| market_info = data[0] | |
| # Validate required fields | |
| required_fields = ['id', 'question', 'conditionId', 'clobTokenIds'] | |
| for field in required_fields: | |
| if field not in market_info: | |
| raise ValueError(f"Missing required field: {field}") | |
| return market_info | |
| else: | |
| raise ValueError(f"No market found for slug: {slug}") | |
| except requests.exceptions.RequestException as e: | |
| raise Exception(f"Error calling Gamma API: {str(e)}") | |
| def get_first_token_id(self, market_info: Dict) -> str: | |
| """ | |
| Extract the first token ID from market information | |
| Args: | |
| market_info: Dictionary containing market information | |
| Returns: | |
| The first clobTokenId as a string | |
| """ | |
| clob_token_ids = market_info.get('clobTokenIds', []) | |
| if isinstance(clob_token_ids, str): | |
| # Sometimes it might be a string representation of an array | |
| import json | |
| try: | |
| clob_token_ids = json.loads(clob_token_ids) | |
| except: | |
| pass | |
| if not clob_token_ids or len(clob_token_ids) == 0: | |
| raise ValueError("No clobTokenIds found in market info") | |
| return str(clob_token_ids[0]) | |
| def get_price_history( | |
| self, | |
| token_id: str, | |
| start_unix: int, | |
| end_unix: int, | |
| fidelity: int = 1 | |
| ) -> List[Dict]: | |
| """ | |
| Get price history from CLOB API | |
| If the requested time period is longer than 5 days, this method automatically | |
| breaks it into chunks of up to 5 days and combines the results. | |
| Args: | |
| token_id: The clobTokenId to fetch prices for | |
| start_unix: Start time as Unix timestamp | |
| end_unix: End time as Unix timestamp | |
| fidelity: Time granularity in minutes (default: 1 minute) | |
| Returns: | |
| List of price points, each containing: | |
| - t: Unix timestamp | |
| - p: Price (0-1 probability) | |
| Example: | |
| [ | |
| {"t": 1763121606, "p": 0.26}, | |
| {"t": 1763121664, "p": 0.26}, | |
| ... | |
| ] | |
| """ | |
| # Maximum interval is 5 days (in seconds) | |
| MAX_INTERVAL_SECONDS = 5 * 24 * 60 * 60 # 432000 seconds | |
| # Calculate the time difference | |
| time_diff = end_unix - start_unix | |
| # If the interval is within the limit, make a single API call | |
| if time_diff <= MAX_INTERVAL_SECONDS: | |
| return self._fetch_price_history_chunk(token_id, start_unix, end_unix, fidelity) | |
| # Otherwise, break it into chunks | |
| all_history = [] | |
| current_start = start_unix | |
| while current_start < end_unix: | |
| # Calculate chunk end (current_start + 5 days, or end_unix if smaller) | |
| chunk_end = min(current_start + MAX_INTERVAL_SECONDS, end_unix) | |
| # Fetch this chunk | |
| chunk_history = self._fetch_price_history_chunk( | |
| token_id, | |
| current_start, | |
| chunk_end, | |
| fidelity | |
| ) | |
| # Add to combined results | |
| all_history.extend(chunk_history) | |
| # Move to next chunk (start from the end of this chunk) | |
| current_start = chunk_end | |
| # Sort by timestamp and remove duplicates | |
| # Use a set to track seen timestamps to avoid duplicates | |
| seen_timestamps = set() | |
| unique_history = [] | |
| for point in sorted(all_history, key=lambda x: x['t']): | |
| if point['t'] not in seen_timestamps: | |
| seen_timestamps.add(point['t']) | |
| unique_history.append(point) | |
| return unique_history | |
| def _fetch_price_history_chunk( | |
| self, | |
| token_id: str, | |
| start_unix: int, | |
| end_unix: int, | |
| fidelity: int | |
| ) -> List[Dict]: | |
| """ | |
| Internal method to fetch a single chunk of price history from CLOB API | |
| Args: | |
| token_id: The clobTokenId to fetch prices for | |
| start_unix: Start time as Unix timestamp | |
| end_unix: End time as Unix timestamp | |
| fidelity: Time granularity in minutes | |
| Returns: | |
| List of price points | |
| """ | |
| endpoint = f"{self.CLOB_API_BASE}/prices-history" | |
| params = { | |
| "market": token_id, | |
| "startTs": start_unix, | |
| "endTs": end_unix, | |
| "fidelity": fidelity | |
| } | |
| try: | |
| response = requests.get(endpoint, params=params, timeout=30) | |
| response.raise_for_status() | |
| data = response.json() | |
| # Extract history array | |
| if 'history' not in data: | |
| raise ValueError("No 'history' field in API response") | |
| history = data['history'] | |
| if not isinstance(history, list): | |
| raise ValueError("'history' field is not a list") | |
| # Validate data points | |
| for point in history: | |
| if 't' not in point or 'p' not in point: | |
| raise ValueError("Invalid data point format in history") | |
| return history | |
| except requests.exceptions.RequestException as e: | |
| raise Exception(f"Error calling CLOB API: {str(e)}") | |
| def reconstruct_timeseries( | |
| self, | |
| history: List[Dict], | |
| start_unix: int, | |
| end_unix: int, | |
| granularity_minutes: int = 1 | |
| ) -> List[Dict]: | |
| """ | |
| Reconstruct a complete time series from sparse price history data | |
| The API may return sparse data points. This method fills in gaps by | |
| forward-filling prices (last known price carries forward). | |
| Args: | |
| history: List of price points from get_price_history() | |
| start_unix: Start time as Unix timestamp | |
| end_unix: End time as Unix timestamp | |
| granularity_minutes: Desired time granularity in minutes | |
| Returns: | |
| List of complete time series data points: | |
| [ | |
| {"timestamp": 1763121600, "price": 0.26, "datetime": "..."}, | |
| {"timestamp": 1763121660, "price": 0.26, "datetime": "..."}, | |
| ... | |
| ] | |
| """ | |
| if not history: | |
| return [] | |
| # Sort history by timestamp | |
| history = sorted(history, key=lambda x: x['t']) | |
| # Initialize result | |
| timeseries = [] | |
| # Convert granularity to seconds | |
| granularity_seconds = granularity_minutes * 60 | |
| # Current price (forward fill) | |
| current_price = None | |
| history_idx = 0 | |
| # Iterate through each time point | |
| current_time = start_unix | |
| while current_time <= end_unix: | |
| # Update current price if we have new data points | |
| while history_idx < len(history) and history[history_idx]['t'] <= current_time: | |
| current_price = history[history_idx]['p'] | |
| history_idx += 1 | |
| # Add data point if we have a price | |
| if current_price is not None: | |
| timeseries.append({ | |
| "timestamp": current_time, | |
| "price": current_price | |
| }) | |
| # Move to next time point | |
| current_time += granularity_seconds | |
| return timeseries | |
| # Example usage | |
| if __name__ == "__main__": | |
| api = PolymarketAPI() | |
| # Test with example URL | |
| test_url = "https://polymarket.com/event/will-the-supreme-court-rule-in-favor-of-trumps-tariffs" | |
| try: | |
| print(f"Testing with URL: {test_url}") | |
| market_info = api.get_market_info(test_url) | |
| print(f"\nMarket Info:") | |
| print(f" ID: {market_info['id']}") | |
| print(f" Question: {market_info['question']}") | |
| print(f" Condition ID: {market_info['conditionId']}") | |
| token_id = api.get_first_token_id(market_info) | |
| print(f"\nFirst Token ID: {token_id}") | |
| # Test price history (example timestamps) | |
| # This is just for demonstration - use real timestamps when testing | |
| print("\nTo test price history, use:") | |
| print(f" history = api.get_price_history('{token_id}', start_unix, end_unix, fidelity=1)") | |
| print(f" timeseries = api.reconstruct_timeseries(history, start_unix, end_unix, 1)") | |
| except Exception as e: | |
| print(f"Error: {str(e)}") | |