| | from flask import Flask, request, jsonify, render_template, url_for |
| | from urllib.parse import urlencode |
| | import requests |
| | import os |
| |
|
# WSGI application object; the route decorators in this module register on it.
app = Flask(__name__)

# Endpoint of the external capture service that api_screenshot() posts to.
# Overridable through the SCREENSHOT_API environment variable (same pattern as
# HOST/PORT in the __main__ guard) so a deployment can target another instance
# without editing the source; the default is the original hard-coded URL.
SCREENSHOT_API = os.getenv(
    "SCREENSHOT_API",
    "https://corvo-ai-xx-sc.hf.space/capture",
)
| |
|
| | |
# Lookup table from user-facing interval aliases to the interval code handed to
# the chart template. Lookups are case-sensitive: "1m" is one minute, whereas
# "1M" (and, notably, a bare "m") resolve to the monthly code "M".
INTERVAL_MAP = {
    alias: code
    for code, aliases in (
        # Minute-based intervals.
        ("1", ("1m",)),
        ("3", ("3m",)),
        ("5", ("5m", "5min")),
        ("15", ("15m", "15min")),
        ("30", ("30m", "30min")),
        # Hour-based intervals, expressed in minutes.
        ("60", ("1h",)),
        ("120", ("2h",)),
        ("240", ("4h",)),
        # Daily, weekly and monthly intervals.
        ("D", ("1d", "1D", "d", "D")),
        ("W", ("1w", "1W", "w", "W")),
        ("M", ("1mth", "1M", "m", "M")),
    )
    for alias in aliases
}
| |
|
def normalize_interval(raw_interval: str) -> str:
    """Translate a user-supplied interval alias into a chart interval code.

    Args:
        raw_interval: Alias such as "1h" or "1D". Non-string values (for
            example a JSON number) are converted with ``str()`` first.

    Returns:
        The code from ``INTERVAL_MAP`` for a known alias, the stripped input
        unchanged for an unknown alias, or "D" when the input is missing,
        empty or whitespace-only.
    """
    if not raw_interval:
        return "D"
    # Coerce before stripping so a numeric JSON value does not raise.
    key = str(raw_interval).strip()
    if not key:
        # Whitespace-only input is treated like a missing interval instead of
        # leaking an empty string to the caller.
        return "D"
    return INTERVAL_MAP.get(key, key)
| |
|
def parse_indicators_param(indicators_raw) -> list:
    """Normalize the ``indicators`` parameter into a list of non-empty strings.

    Args:
        indicators_raw: ``None``, a comma-separated string, or a list whose
            items are converted with ``str()``.

    Returns:
        The indicator identifiers with surrounding whitespace removed and
        empty entries dropped. ``None`` items inside a list (JSON ``null``)
        are skipped rather than turned into the literal text "None". Any
        other input type yields an empty list.
    """
    if isinstance(indicators_raw, str):
        candidates = indicators_raw.split(",")
    elif isinstance(indicators_raw, list):
        candidates = (str(item) for item in indicators_raw if item is not None)
    else:
        return []
    # Strip once per entry, then discard the ones that end up empty.
    stripped = (candidate.strip() for candidate in candidates)
    return [entry for entry in stripped if entry]
| |
|
@app.route("/")
def index():
    """Return a JSON summary naming the service and its endpoints."""
    endpoints = {
        "chart_html": "/chart?symbol=BTCUSDT&exchange=BINANCE&interval=1h&indicators=PUB;abc,PUB;def",
        "screenshot_api": "/api/screenshot",
    }
    summary = {"message": "Chart Screenshot API", "endpoints": endpoints}
    return jsonify(summary)
| |
|
@app.route("/chart")
def chart():
    """Render ``chart.html`` from the request's query-string parameters.

    Missing parameters fall back to BTCUSDT / BINANCE / 1D / dark; the
    interval is normalized and the indicators are split into a list before
    being handed to the template.
    """
    args = request.args
    context = {
        "symbol": args.get("symbol", "BTCUSDT"),
        "exchange": args.get("exchange", "BINANCE"),
        "interval": normalize_interval(args.get("interval", "1D")),
        "indicators": parse_indicators_param(args.get("indicators")),
        "theme": args.get("theme", "dark"),
    }
    return render_template("chart.html", **context)
| |
|
@app.route("/api/screenshot", methods=["POST"])
def api_screenshot():
    """Capture a screenshot of the /chart page through the capture service.

    Request JSON:
    {
      "symbol": "BTCUSDT",                 // required
      "exchange": "BINANCE",               // optional, default BINANCE
      "interval": "1h",                    // optional, default 1D
      "indicators": ["PUB;id1","PUB;id2"], // optional, or comma-separated string
      "theme": "dark",                     // optional, default "dark"
      "width": 1080,                       // optional
      "height": 1920,                      // optional
      "fullPage": false                    // optional
    }

    Response JSON:
    {
      "imageUrl": "https://...png",
      "htmlUrl": "https://...html",
      "screenshotMeta": {...}
    }

    Error responses:
      400 - the body is not a JSON object, or "symbol" is missing/empty.
      502 - the capture service is unreachable, answers with a non-200
            status, returns a body that is not the expected JSON shape,
            reports errors, or yields no usable result.
    """
    data = request.get_json(silent=True) or {}
    if not isinstance(data, dict):
        # A JSON array or scalar has no .get(); reject it instead of crashing.
        return jsonify({"error": "JSON body must be an object"}), 400

    symbol = data.get("symbol")
    if not symbol:
        return jsonify({"error": "symbol is required"}), 400

    # Query string for the /chart page that the capture service will load.
    query = {
        "symbol": symbol,
        "exchange": data.get("exchange", "BINANCE"),
        "interval": normalize_interval(data.get("interval", "1D")),
        "theme": data.get("theme", "dark"),
    }
    indicators = parse_indicators_param(data.get("indicators"))
    if indicators:
        query["indicators"] = ",".join(indicators)

    # _external=True yields scheme + host + script root + path in one step.
    # Concatenating request.url_root with url_for() would repeat the script
    # root when the app is mounted under a URL prefix.
    chart_url = url_for("chart", _external=True) + "?" + urlencode(query)

    payload = {
        "urls": [chart_url],
        "width": data.get("width", 1080),
        "height": data.get("height", 1920),
        "fullPage": bool(data.get("fullPage", False)),
    }

    try:
        resp = requests.post(SCREENSHOT_API, json=payload, timeout=60)
    except requests.RequestException as e:
        return jsonify({"error": "Failed to reach screenshot service", "details": str(e)}), 502

    if resp.status_code != 200:
        return jsonify({"error": "Screenshot service error", "status": resp.status_code, "body": resp.text}), 502

    try:
        body = resp.json()
    except ValueError:
        # requests raises a ValueError subclass when the body is not JSON.
        return jsonify({"error": "Screenshot service returned invalid JSON", "body": resp.text}), 502
    if not isinstance(body, dict):
        return jsonify({"error": "Screenshot service returned unexpected payload", "body": resp.text}), 502

    errors = body.get("errors") or []
    if errors:
        return jsonify({"error": "Screenshot service reported errors", "details": errors}), 502

    results = body.get("results") or []
    if not isinstance(results, list) or not results:
        return jsonify({"error": "No results from screenshot service"}), 502

    # Only one URL was submitted, so only the first result is relevant.
    first = results[0]
    output = first.get("output") if isinstance(first, dict) else None
    if not isinstance(output, dict):
        output = {}
    image_url = output.get("imageUrl")

    if not image_url:
        return jsonify({"error": "Screenshot result missing imageUrl", "raw": first}), 502

    return jsonify({
        "imageUrl": image_url,
        "htmlUrl": output.get("htmlUrl"),
        "screenshotMeta": first.get("meta", {}),
    })
| |
|
if __name__ == "__main__":
    # Bind address and port come from the environment, with local defaults.
    host = os.getenv("HOST", "0.0.0.0")
    port = int(os.getenv("PORT", "7860"))
    # The interactive Werkzeug debugger lets anyone who can reach the server
    # execute arbitrary code, so it must not be on by default while binding
    # to all interfaces. Opt in explicitly with FLASK_DEBUG=1 during
    # local development.
    debug = os.getenv("FLASK_DEBUG", "").strip().lower() in {"1", "true", "yes", "on"}
    app.run(host=host, port=port, debug=debug)