import os
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional

import requests
|
|
|
|
# API key for the metals price service, read from the process environment.
# The value is None when the variable is not set.
# NOTE(review): the variable name "metail_api" may be a typo for "metal_api";
# confirm against the deployment configuration before renaming.
api_key_metal = os.environ.get("metail_api")
|
|
class MetalsPrice:
    """Small HTTP client for fetching metal price history.

    Stores the API key, endpoint URL and request timeout that
    ``get_price_history`` uses for each request.
    """

    def __init__(self, api_key: str, base_url: str, timeout: int = 10):
        """Store the connection settings.

        Args:
            api_key: Value sent in the ``x-api-key`` request header.
            base_url: URL the GET request is sent to.
            timeout: Timeout in seconds passed to ``requests.get``.
        """
        self.api_key = api_key
        self.base_url = base_url
        self.timeout = timeout

    def get_price_history(
        self,
        symbol: str,
        start_timestamp: Optional[int] = None,
        end_timestamp: Optional[int] = None,
        group_by: str = "day",
        aggregation: str = "max_price",
        order_by: str = "desc",
    ) -> List[Dict[str, Any]]:
        """Fetch price history for a given asset from Gold API.

        Args:
            symbol: Asset symbol, sent as the ``symbol`` query parameter.
            start_timestamp: Start of the range as a Unix timestamp in
                seconds. Defaults to 730 days before the time of the call.
            end_timestamp: End of the range as a Unix timestamp in seconds.
                Defaults to the time of the call.
            group_by: Sent as the ``groupBy`` query parameter.
            aggregation: Sent as the ``aggregation`` query parameter.
            order_by: Sent as the ``orderBy`` query parameter.

        Returns:
            The decoded JSON body of the response, or an empty list when the
            request fails, the server answers with an HTTP error status, or
            the body is not valid JSON. Failures are reported via ``print``.
        """
        # Resolve the default window at call time. Defaults written in the
        # signature are evaluated only once, when the function is defined,
        # which would freeze "now" at import time.
        # A timezone-aware "now" is used because calling .timestamp() on a
        # naive UTC datetime interprets it as local time and skews the epoch.
        now = datetime.now(timezone.utc)
        if end_timestamp is None:
            end_timestamp = int(now.timestamp())
        if start_timestamp is None:
            start_timestamp = int((now - timedelta(days=730)).timestamp())

        headers = {
            "x-api-key": self.api_key,
            "Content-Type": "application/json",
        }

        params = {
            "symbol": symbol,
            "startTimestamp": start_timestamp,
            "endTimestamp": end_timestamp,
            "groupBy": group_by,
            "aggregation": aggregation,
            "orderBy": order_by,
        }

        try:
            response = requests.get(
                self.base_url,
                headers=headers,
                params=params,
                timeout=self.timeout,
            )
            # Without this call a 4xx/5xx response never raises HTTPError and
            # its body would be returned as if it were price data.
            response.raise_for_status()
        except requests.exceptions.HTTPError as err:
            print(f"HTTP Error: {err}")
        except requests.exceptions.Timeout:
            print("Request timed out.")
        except requests.exceptions.RequestException as err:
            print(f"Request failed: {err}")
        else:
            # Decode separately so that a malformed body is reported as such
            # and is not confused with ValueError-derived request errors
            # (e.g. an invalid URL).
            try:
                return response.json()
            except ValueError as err:
                print(f"Invalid JSON in response: {err}")

        return []
|
|
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |