# chaofeng / app.py
# chaofenghui's picture
# Upload folder using huggingface_hub
# c9eb688 verified
"""
超峰海外代理 (Chaofeng Overseas Proxy)
HuggingFace Space — FastAPI proxy for accessing overseas resources from China.
Capabilities:
1. US stock quotes/klines (Yahoo Finance)
2. Global financial news (Finnhub)
3. File/model download proxy (HuggingFace, GitHub, etc.)
4. General HTTP fetch proxy for blocked sites
Used by: 金融家 (JinRongJia), Narnia, and other projects needing overseas access.
"""
import hashlib
import io
import logging
import os
import tempfile
import time
from typing import Optional
from urllib.parse import urlparse
import requests
from fastapi import FastAPI, Query, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, StreamingResponse, Response
import yfinance as yf
# Root logging is configured at INFO; the module logger is named "chaofeng".
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("chaofeng")

# ASGI application object; the route handlers below register themselves on it.
app = FastAPI(title="超峰海外代理", version="0.2.2")

# CORS is fully open: every origin, method and request header is accepted.
app.add_middleware(
    CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"]
)

# Process-wide store of yfinance Ticker objects, keyed by the symbol string.
TICKER_CACHE: dict[str, yf.Ticker] = dict()
# Safety: the proxy endpoints only contact hosts listed here by default.
ALLOWED_DOMAINS = {
    # Hugging Face, its mirror and LFS CDNs
    "huggingface.co",
    "hf-mirror.com",
    "cdn-lfs.hf.co",
    "cdn-lfs-us-1.hf.co",
    # GitHub
    "github.com",
    "api.github.com",
    "raw.githubusercontent.com",
    "github-releases.githubusercontent.com",
    "objects.githubusercontent.com",
    # Python packaging
    "pypi.org",
    "files.pythonhosted.org",
    # Google Cloud Storage
    "storage.googleapis.com",
    # Yahoo Finance
    "query1.finance.yahoo.com",
    "query2.finance.yahoo.com",
    "fc.yahoo.com",
    "finance.yahoo.com",
    # Finnhub
    "finnhub.io",
    # arXiv
    "arxiv.org",
}

# Largest download the proxy will relay: 500 MiB, expressed in bytes.
MAX_DOWNLOAD_SIZE = 500 * 1024 ** 2
# Upper bound on cached symbols. Symbols come straight from request input, so
# without a cap the cache would keep growing for the lifetime of the process.
_TICKER_CACHE_MAX = 512
# Seconds a cached Ticker is reused before a fresh one is built.
_TICKER_CACHE_TTL = 60.0
# time.monotonic() value at which each TICKER_CACHE entry was created.
_TICKER_CREATED_AT: dict[str, float] = {}


def _get_ticker(symbol: str) -> yf.Ticker:
    """Return a yfinance Ticker for ``symbol``, reusing a recent cached one.

    A cached instance is reused only while it is younger than
    ``_TICKER_CACHE_TTL`` seconds; after that a new ``yf.Ticker`` replaces it.
    NOTE(review): the expiry assumes Ticker objects memoize data they have
    already fetched (e.g. ``fast_info``, ``news``), which would make a
    never-expiring cache serve stale quotes — confirm against the installed
    yfinance version.

    When the cache reaches ``_TICKER_CACHE_MAX`` entries it is emptied in one
    step rather than evicting entry by entry: handlers may run concurrently in
    worker threads, and ``dict.clear()`` needs no iteration over a dict that
    another thread might be mutating.
    """
    now = time.monotonic()
    cached = TICKER_CACHE.get(symbol)
    created = _TICKER_CREATED_AT.get(symbol)
    if cached is not None and created is not None and now - created < _TICKER_CACHE_TTL:
        return cached

    if symbol not in TICKER_CACHE and len(TICKER_CACHE) >= _TICKER_CACHE_MAX:
        TICKER_CACHE.clear()
        _TICKER_CREATED_AT.clear()

    ticker = yf.Ticker(symbol)
    TICKER_CACHE[symbol] = ticker
    _TICKER_CREATED_AT[symbol] = now
    return ticker
def _is_allowed(url: str) -> bool:
    """Report whether the URL's host is an allowlisted domain or a subdomain of one.

    Returns False when the URL has no hostname, and also when parsing or
    matching raises any exception.
    """
    try:
        hostname = urlparse(url).hostname
        if hostname is None:
            return False
        for domain in ALLOWED_DOMAINS:
            # Exact match, or a dot-separated subdomain of the allowed domain.
            if hostname == domain or hostname.endswith("." + domain):
                return True
        return False
    except Exception:
        return False
# ═══════════════════════════════════════════════════════════════
# Health
# ═══════════════════════════════════════════════════════════════
@app.get("/")
def health():
    """Health check: report service name, version and a list of endpoint examples."""
    endpoint_examples = [
        "GET /api/us/quotes?symbols=AAPL,TSLA",
        "GET /api/us/klines/{symbol}?period=day&count=250",
        "GET /api/us/news/{symbol}?limit=20",
        "GET /api/us/news/bulk?symbols=AAPL,QCOM&limit=10",
        "GET /api/us/earnings/{symbol}",
        "GET /api/news/global?limit=50",
        "GET /api/proxy/fetch?url=...",
        "GET /api/proxy/download?url=...&filename=...",
    ]
    payload = {"status": "ok"}
    payload["service"] = "chaofeng-overseas-proxy"
    payload["version"] = "0.2.2"
    payload["endpoints"] = endpoint_examples
    return payload
# ═══════════════════════════════════════════════════════════════
# US Stock Quotes
# ═══════════════════════════════════════════════════════════════
@app.get("/api/us/quotes")
def us_quotes(symbols: str = Query(..., description="Comma-separated ticker symbols")):
    """Return a quote snapshot for every symbol in a comma-separated list.

    Each symbol is looked up independently through ``fast_info``; a symbol
    whose lookup raises yields ``{"symbol", "error"}`` instead of a quote, so
    one failure does not affect the others. Responds 400 when the list
    contains no non-blank symbol.
    """
    requested = [part.strip() for part in symbols.split(",") if part.strip()]
    if not requested:
        return JSONResponse({"error": "No symbols provided"}, status_code=400)

    quotes = []
    for symbol in requested:
        try:
            fast = _get_ticker(symbol).fast_info

            def as_float(key):
                # Missing or falsy values collapse to 0 before conversion.
                return float(fast.get(key, 0) or 0)

            last = as_float("lastPrice")
            prev_close = as_float("previousClose")
            if last and prev_close:
                change = last - prev_close
            else:
                change = 0
            if prev_close:
                change_pct = change / prev_close * 100
            else:
                change_pct = 0
            entry = {
                "symbol": symbol,
                "price": last,
                "open": as_float("open"),
                "high": as_float("dayHigh"),
                "low": as_float("dayLow"),
                "pre_close": prev_close,
                "change": round(change, 4),
                "change_percent": round(change_pct, 2),
                "volume": int(fast.get("lastVolume", 0) or 0),
            }
        except Exception as exc:
            logger.warning(f"Quote fetch failed for {symbol}: {exc}")
            entry = {"symbol": symbol, "error": str(exc)}
        quotes.append(entry)

    return {"count": len(quotes), "quotes": quotes}
# ═══════════════════════════════════════════════════════════════
# US Stock Klines
# ═══════════════════════════════════════════════════════════════
# Conservative estimate of how many bars each yfinance ``period`` value yields
# for each bar interval. Used to pick the shortest range that still covers the
# requested number of bars. NOTE(review): figures are approximations based on
# ~250 trading days per year; adjust if exact coverage matters.
_KLINE_RANGE_BARS = (
    ("5d", {"1d": 5, "1wk": 1, "1mo": 0}),
    ("1mo", {"1d": 19, "1wk": 4, "1mo": 1}),
    ("3mo", {"1d": 60, "1wk": 12, "1mo": 3}),
    ("6mo", {"1d": 120, "1wk": 25, "1mo": 6}),
    ("1y", {"1d": 250, "1wk": 51, "1mo": 12}),
    ("2y", {"1d": 500, "1wk": 103, "1mo": 24}),
    ("5y", {"1d": 1250, "1wk": 259, "1mo": 60}),
    ("10y", {"1d": 2500, "1wk": 519, "1mo": 120}),
)


def _kline_range(count: int, interval: str) -> str:
    """Pick the shortest history range expected to hold ``count`` bars of ``interval``."""
    for range_str, bars in _KLINE_RANGE_BARS:
        if bars[interval] >= count:
            return range_str
    return "max"


@app.get("/api/us/klines/{symbol}")
def us_klines(
    symbol: str,
    period: str = Query("day"),
    count: int = Query(250),
):
    """Return up to ``count`` most recent OHLCV bars for ``symbol``.

    Args:
        symbol: Ticker symbol passed to yfinance.
        period: Bar size, one of "day", "week" or "month"; any other value is
            treated as "day".
        count: Number of most recent bars wanted. A value <= 0 disables
            trimming and fetches the shortest range.

    Returns:
        ``{"symbol", "period", "count", "klines"}`` where each kline carries a
        millisecond ``timestamp`` plus open/high/low/close/volume. On any
        failure, a 500 JSON response carrying the error text.

    The history range is chosen per bar interval. Previously it was derived
    from ``count`` as if every bar were one calendar day, so e.g. 250 weekly
    bars fetched only one year (~52 bars).
    """
    import math

    interval = {"day": "1d", "week": "1wk", "month": "1mo"}.get(period, "1d")
    range_str = _kline_range(count, interval)
    try:
        t = _get_ticker(symbol)
        df = t.history(period=range_str, interval=interval)
        if df.empty:
            return {"symbol": symbol, "period": period, "count": 0, "klines": []}
        results = []
        for idx, row in df.iterrows():
            prices = [float(row[col]) for col in ("Open", "High", "Low", "Close")]
            # A bar with a missing price is skipped: NaN is not valid JSON and
            # would otherwise make the whole response fail to serialize.
            if not all(math.isfinite(p) for p in prices):
                continue
            volume = float(row["Volume"])
            results.append({
                "timestamp": int(idx.timestamp() * 1000),
                "open": prices[0],
                "high": prices[1],
                "low": prices[2],
                "close": prices[3],
                "volume": int(volume) if math.isfinite(volume) else 0,
            })
        # Keep only the newest ``count`` bars; a non-positive count keeps all.
        if count > 0 and len(results) > count:
            results = results[-count:]
        return {"symbol": symbol, "period": period, "count": len(results), "klines": results}
    except Exception as e:
        logger.warning(f"Kline fetch failed for {symbol}: {e}")
        return JSONResponse({"symbol": symbol, "error": str(e)}, status_code=500)
# ═══════════════════════════════════════════════════════════════
# Global Financial News
# ═══════════════════════════════════════════════════════════════
@app.get("/api/news/global")
def global_news(
    limit: int = Query(50, ge=1, le=100),
    category: str = Query("general"),
):
    """Return up to ``limit`` global financial news articles.

    Finnhub is tried first. If that yields no articles (request failure or an
    empty list), yfinance news for a fixed set of large US tickers is used
    instead.

    Args:
        limit: Maximum number of articles to return (1-100).
        category: Finnhub news category.

    Returns:
        ``{"count", "articles"}``.
    """
    articles = []
    # Try Finnhub first
    try:
        # NOTE(review): no API token is sent with this request. Finnhub's docs
        # describe a token as required, in which case this call always fails
        # and the fallback below is what actually serves traffic — confirm.
        resp = requests.get(
            "https://finnhub.io/api/v1/news",
            params={"category": category},
            timeout=10,
        )
        resp.raise_for_status()
        for item in resp.json()[:limit]:
            articles.append({
                "id": f"finnhub_{item.get('id', '')}",
                "source": "finnhub",
                "title": item.get("headline", ""),
                "content": item.get("summary", ""),
                "url": item.get("url", ""),
                # ``or 0`` also covers an explicit null, which would otherwise
                # raise on the multiplication and abort the loop.
                "publish_time": (item.get("datetime") or 0) * 1000,
                "related_symbols": item.get("related", ""),
                "category": item.get("category", category),
            })
    except Exception as e:
        logger.warning(f"Finnhub news failed: {e}")
    # Fallback: yfinance news for major US tickers
    if not articles:
        fallback_symbols = ["AAPL", "MSFT", "GOOGL", "AMZN", "NVDA", "META", "TSLA"]
        # _fetch_yf_news applies ``limit`` per symbol, so cap the combined
        # list here to honour this endpoint's overall limit.
        articles = _fetch_yf_news(fallback_symbols, limit)[:limit]
    return {"count": len(articles), "articles": articles}
@app.get("/api/us/news/bulk")
def us_stock_news_bulk(
    symbols: str = Query(..., description="Comma-separated US stock symbols"),
    limit: int = Query(10, ge=1, le=30),
):
    """Fetch recent news for several US stocks with a single request.

    ``symbols`` is split on commas and blank entries are dropped. ``limit`` is
    handed to ``_fetch_yf_news`` unchanged, and the raw ``symbols`` string is
    echoed back in the response.
    """
    wanted = []
    for raw in symbols.split(","):
        sym = raw.strip()
        if sym:
            wanted.append(sym)
    found = _fetch_yf_news(wanted, limit)
    return {"symbols": symbols, "count": len(found), "articles": found}
@app.get("/api/us/news/{symbol}")
def us_stock_news(
    symbol: str,
    limit: int = Query(20, ge=1, le=50),
):
    """Fetch recent yfinance news for one US stock symbol."""
    found = _fetch_yf_news([symbol], limit)
    response = {"symbol": symbol}
    response["count"] = len(found)
    response["articles"] = found
    return response
def _yf_news_article(item: dict) -> dict:
    """Convert one raw yfinance news item into this service's article dict.

    Handles the nested shape ``{'id': ..., 'content': {title, summary, pubDate,
    canonicalUrl, relatedTickers, ...}}``. When ``content`` is absent or not a
    dict, the item itself is read as the content.
    NOTE(review): the ``uuid`` / ``link`` / ``providerPublishTime`` fallbacks
    assume the older flat yfinance item layout — confirm against the installed
    yfinance version.
    """
    from datetime import datetime, timezone

    content = item.get("content")
    inner = content if isinstance(content, dict) else item

    title = inner.get("title") or ""
    desc = inner.get("summary") or inner.get("description") or ""

    url = ""
    canonical = inner.get("canonicalUrl")
    if isinstance(canonical, dict):
        url = canonical.get("url") or ""
    elif isinstance(inner.get("link"), str):
        url = inner["link"]

    # Prefer the upstream identifier; otherwise derive a stable one from
    # url + title so the same story is still recognised as a duplicate.
    art_id = str(
        item.get("id")
        or item.get("uuid")
        or hashlib.md5((url + title).encode()).hexdigest()
    )

    # pubDate is an ISO 8601 string like "2026-05-07T21:04:15Z".
    pub_ms = 0
    pub_date = inner.get("pubDate")
    if isinstance(pub_date, str) and pub_date:
        try:
            parsed = datetime.fromisoformat(pub_date.replace("Z", "+00:00"))
            if parsed.tzinfo is None:
                # Treat a zone-less timestamp as UTC instead of server-local time.
                parsed = parsed.replace(tzinfo=timezone.utc)
            pub_ms = int(parsed.timestamp() * 1000)
        except ValueError:
            pub_ms = 0
    elif isinstance(inner.get("providerPublishTime"), (int, float)):
        pub_ms = int(inner["providerPublishTime"] * 1000)

    related = inner.get("relatedTickers", [])
    if isinstance(related, list):
        related = ",".join(str(ticker) for ticker in related)

    return {
        "id": f"yf_{art_id[:16]}",
        "source": "yahoo_finance",
        "title": title,
        "content": desc,
        "url": url,
        "publish_time": pub_ms,
        "related_symbols": str(related) if related else "",
        "category": inner.get("contentType", "news"),
        # Full identifier, used only for de-duplication and removed before
        # the article is returned to callers.
        "_dedupe_key": art_id,
    }


def _fetch_yf_news(symbols: list[str], limit: int) -> list[dict]:
    """Fetch news for the given symbols via yfinance.

    Args:
        symbols: Ticker symbols to query, in order.
        limit: Maximum number of raw news items read per symbol.

    Returns:
        Article dicts (id, source, title, content, url, publish_time,
        related_symbols, category), de-duplicated across symbols by article
        id. A symbol whose lookup fails is logged and skipped, and a single
        malformed item is skipped without discarding the rest of that
        symbol's news.
    """
    articles = []
    seen_ids = set()
    for sym in symbols:
        try:
            raw_news = _get_ticker(sym).news
        except Exception as e:
            logger.warning(f"yfinance news for {sym} failed: {e}")
            continue
        if not raw_news:
            continue
        for item in raw_news[:limit]:
            if not isinstance(item, dict):
                continue
            try:
                article = _yf_news_article(item)
            except Exception as e:
                logger.warning(f"yfinance news for {sym} failed: {e}")
                continue
            key = article.pop("_dedupe_key")
            if key in seen_ids:
                continue
            seen_ids.add(key)
            articles.append(article)
    return articles[:limit * len(symbols)]
# ═══════════════════════════════════════════════════════════════
# Earnings / Financial Reports
# ═══════════════════════════════════════════════════════════════
def _finite_float(value, default=0.0):
    """Coerce ``value`` to a finite float; return ``default`` for None, NaN, inf or junk."""
    import math

    try:
        number = float(value)
    except (TypeError, ValueError):
        return default
    return number if math.isfinite(number) else default


@app.get("/api/us/earnings/{symbol}")
def us_earnings(symbol: str):
    """Get quarterly earnings data for a US stock via yfinance.

    Three sources are tried in order until one yields rows:
    ``earnings_history`` (up to three attempts), ``quarterly_earnings`` and
    ``get_earnings_dates``. Upcoming-earnings calendar fields are added on a
    best-effort basis.

    Returns:
        ``{"symbol", "quarterly_earnings", "next_earnings_date",
        "revenue_estimate_avg/low/high", "debug"}``; ``debug`` records what
        each source returned or raised. On an unexpected failure, a 500 JSON
        response with the error text.

    Missing or NaN numeric values are reported as 0.0 (revenue estimates as
    null). NaN is not valid JSON, so letting it through would make the whole
    response fail to serialize.
    """
    try:
        t = _get_ticker(symbol)
        quarterly = []
        debug = {}

        # Method 1: earnings_history (quoteSummary API) with retry
        for attempt in range(3):
            try:
                hist = t.earnings_history
                debug["hist_type"] = str(type(hist))
                if hist is not None and not hist.empty:
                    debug["hist_len"] = len(hist)
                    debug["hist_cols"] = list(hist.columns)
                    # Build into a local list first so a failure part-way
                    # through cannot leave partial rows behind that a retry
                    # would then duplicate.
                    rows = []
                    for idx, row in hist.iterrows():
                        rows.append({
                            "quarter": str(idx),
                            "eps_estimate": _finite_float(row.get("epsEstimate", row.get("EPS Estimate", 0))),
                            "reported_eps": _finite_float(row.get("epsActual", row.get("Reported EPS", 0))),
                            "surprise_pct": round(_finite_float(row.get("surprisePercent", row.get("Surprise(%)", 0))) * 100, 2),
                        })
                    quarterly.extend(rows)
                    break  # success
                debug["hist_empty_attempt_%d" % attempt] = True
            except Exception as e:
                debug["hist_error_attempt_%d" % attempt] = str(e)
                # Back off only for rate limiting; other errors retry at once.
                if "rate limit" in str(e).lower() and attempt < 2:
                    time.sleep(2 * (attempt + 1))

        # Method 2: quarterly_earnings (income statement data)
        if not quarterly:
            try:
                qe = t.quarterly_earnings
                debug["qe_type"] = str(type(qe))
                if qe is not None and hasattr(qe, "empty") and not qe.empty:
                    debug["qe_len"] = len(qe)
                    debug["qe_cols"] = list(qe.columns) if hasattr(qe, "columns") else "no_cols"
                    for idx, row in qe.iterrows():
                        quarterly.append({
                            "quarter": str(idx),
                            "eps_estimate": 0.0,
                            "reported_eps": _finite_float(row.get("Diluted EPS", 0)),
                            "surprise_pct": 0.0,
                        })
            except Exception as e:
                debug["qe_error"] = str(e)

        # Method 3: earnings_dates (scraper — last resort)
        if not quarterly:
            try:
                df = t.get_earnings_dates(limit=12)
                debug["df_type"] = str(type(df))
                if df is not None and not df.empty:
                    debug["df_len"] = len(df)
                    debug["df_cols"] = list(df.columns)
                    for idx, row in df.iterrows():
                        row_dict = row.to_dict() if hasattr(row, "to_dict") else dict(row)
                        quarterly.append({
                            "quarter": str(idx),
                            "eps_estimate": _finite_float(row_dict.get("EPS Estimate", 0)),
                            "reported_eps": _finite_float(row_dict.get("Reported EPS", 0)),
                            "surprise_pct": _finite_float(row_dict.get("Surprise(%)", 0)),
                        })
            except Exception as e:
                debug["df_error"] = str(e)
                debug["df_trace"] = repr(e)

        # Best-effort: upcoming earnings calendar
        next_ed = None
        rev_avg = None
        rev_low = None
        rev_high = None
        try:
            cal = t.calendar
            if cal and isinstance(cal, dict):
                ed = cal.get("Earnings Date")
                if isinstance(ed, list) and ed:
                    first = ed[0]
                    if hasattr(first, "isoformat"):
                        next_ed = first.isoformat()
                    elif isinstance(first, str):
                        next_ed = first
                rev_avg = cal.get("Revenue Average")
                rev_low = cal.get("Revenue Low")
                rev_high = cal.get("Revenue High")
        except Exception as e:
            # Still best-effort, but leave a trace instead of failing silently.
            debug["calendar_error"] = str(e)

        return {
            "symbol": symbol,
            "quarterly_earnings": quarterly,
            "next_earnings_date": next_ed,
            "revenue_estimate_avg": _finite_float(rev_avg, None),
            "revenue_estimate_low": _finite_float(rev_low, None),
            "revenue_estimate_high": _finite_float(rev_high, None),
            "debug": debug,
        }
    except Exception as e:
        logger.exception(f"Earnings fetch failed for {symbol}")
        return JSONResponse({"symbol": symbol, "error": str(e), "trace": repr(e)}, status_code=500)
# ═══════════════════════════════════════════════════════════════
# General-purpose HTTP fetch proxy
# ═══════════════════════════════════════════════════════════════
@app.get("/api/proxy/fetch")
def proxy_fetch(
    url: str = Query(..., description="Target URL to fetch"),
    timeout: int = Query(30, ge=5, le=120),
):
    """
    Fetch content from an overseas URL and return it.

    For JSON APIs, returns parsed JSON. For HTML/text, returns raw content.
    Other (binary) content is returned base64-encoded inside a JSON envelope
    when it is at most 10MB; larger bodies get a metadata-only reply pointing
    at /api/proxy/download. Domain must be in the allowlist.

    Args:
        url: Target URL; its host must pass ``_is_allowed``.
        timeout: Upstream timeout in seconds (5-120).

    Returns:
        403 when the domain is not allowed, 504 on upstream timeout, 502 on
        any other failure, otherwise the proxied content as described above.
    """
    if not _is_allowed(url):
        return JSONResponse({
            "error": f"Domain not allowed: {urlparse(url).hostname}",
            "allowed_domains": sorted(ALLOWED_DOMAINS),
        }, status_code=403)

    max_inline = 10 * 1024 * 1024
    try:
        # NOTE(review): redirects are followed without re-checking the
        # allowlist, so an allowed host can bounce the request to any other
        # host. Consider validating every hop if that matters here.
        #
        # stream=True defers the body download, so an oversized binary can be
        # rejected without first buffering all of it in memory. The ``with``
        # block releases the connection on every exit path.
        with requests.get(
            url,
            headers={"User-Agent": "Mozilla/5.0 (compatible; chaofeng-proxy/0.2)"},
            timeout=timeout,
            allow_redirects=True,
            stream=True,
        ) as resp:
            resp.raise_for_status()
            content_type = resp.headers.get("content-type", "")
            # Return JSON directly
            if "json" in content_type:
                return JSONResponse(resp.json())
            # Return text
            if any(t in content_type for t in ("text/", "application/xml", "application/javascript")):
                return Response(
                    content=resp.text,
                    media_type=content_type,
                    headers={"X-Proxied-Url": url},
                )

            # Binary — return as base64 with metadata
            declared = resp.headers.get("content-length", "").strip()
            declared_size = int(declared) if declared.isdigit() else None
            too_large = declared_size is not None and declared_size > max_inline
            reported_size = declared_size
            payload = bytearray()
            if not too_large:
                for chunk in resp.iter_content(chunk_size=64 * 1024):
                    payload.extend(chunk)
                    if len(payload) > max_inline:
                        # Stop reading at the cap; the size reported below is
                        # then a lower bound, not the full length.
                        too_large = True
                        reported_size = len(payload)
                        break
            if too_large:
                return JSONResponse({
                    "url": url,
                    "content_type": content_type,
                    "size": reported_size,
                    "note": "Binary content too large (>10MB). Use /api/proxy/download instead.",
                })
            import base64
            return JSONResponse({
                "url": url,
                "content_type": content_type,
                "size": len(payload),
                "data_base64": base64.b64encode(payload).decode("ascii"),
            })
    except requests.Timeout:
        return JSONResponse({"error": f"Request timed out after {timeout}s"}, status_code=504)
    except Exception as e:
        return JSONResponse({"error": str(e)}, status_code=502)
# ═══════════════════════════════════════════════════════════════
# File / Model download proxy (streaming)
# ═══════════════════════════════════════════════════════════════
@app.get("/api/proxy/download")
def proxy_download(
    url: str = Query(..., description="File URL to download"),
    filename: str = Query("", description="Optional filename override"),
):
    """
    Stream-download a file from an overseas URL.

    Supports large files (models, datasets) up to 500MB.
    Domain must be in the allowlist.

    Args:
        url: File URL; its host must pass ``_is_allowed``.
        filename: Name for the Content-Disposition header. When empty it is
            taken from the upstream Content-Disposition header, else from the
            URL path, else "download".

    Returns:
        A streaming response carrying the file; 403 when the domain is not
        allowed, 413 when the declared size exceeds ``MAX_DOWNLOAD_SIZE``,
        504 on upstream timeout, 502 on any other failure before streaming
        starts.
    """
    from urllib.parse import quote

    if not _is_allowed(url):
        return JSONResponse({
            "error": f"Domain not allowed: {urlparse(url).hostname}",
        }, status_code=403)

    resp = None
    try:
        resp = requests.get(
            url,
            headers={"User-Agent": "Mozilla/5.0 (compatible; chaofeng-proxy/0.2)"},
            stream=True,
            timeout=30,
            allow_redirects=True,
        )
        resp.raise_for_status()
        content_length = resp.headers.get("content-length")
        declared_size = None
        if content_length and content_length.strip().isdigit():
            declared_size = int(content_length)
        if declared_size is not None and declared_size > MAX_DOWNLOAD_SIZE:
            resp.close()
            return JSONResponse({
                "error": f"File too large ({declared_size} bytes). Max: {MAX_DOWNLOAD_SIZE}",
            }, status_code=413)

        # Determine filename
        if not filename:
            disposition = resp.headers.get("content-disposition", "")
            if "filename=" in disposition:
                # Keep only this parameter's value; further parameters may
                # follow after a semicolon.
                filename = disposition.split("filename=")[-1].split(";")[0].strip('"\' ')
            else:
                filename = os.path.basename(urlparse(url).path) or "download"

        # The name ends up inside a quoted header value: drop control
        # characters (CR/LF would allow header injection) and neutralise
        # quotes and path separators.
        cleaned = "".join(
            "_" if ch in '"\\/' else ch for ch in filename if ch.isprintable()
        ).strip() or "download"
        # Header values must be latin-1 encodable. Send an ASCII fallback and,
        # for non-ASCII names, the real name via the RFC 5987 ``filename*`` form.
        ascii_name = cleaned.encode("ascii", "replace").decode("ascii")
        content_disposition = f'attachment; filename="{ascii_name}"'
        if ascii_name != cleaned:
            content_disposition += f"; filename*=UTF-8''{quote(cleaned, safe='')}"

        content_type = resp.headers.get("content-type", "application/octet-stream")
        upstream = resp

        def iter_chunks():
            sent = 0
            try:
                for chunk in upstream.iter_content(chunk_size=64 * 1024):
                    sent += len(chunk)
                    if sent > MAX_DOWNLOAD_SIZE:
                        # Reached only when upstream did not declare a length
                        # or declared a smaller one. Abort rather than end
                        # cleanly, so the client cannot mistake a truncated
                        # file for a complete one.
                        logger.warning(f"Download exceeded {MAX_DOWNLOAD_SIZE} bytes, aborting: {url}")
                        raise RuntimeError("Download exceeded maximum allowed size")
                    yield chunk
            finally:
                # Release the upstream connection whether the stream finished,
                # failed, or the client went away.
                upstream.close()

        return StreamingResponse(
            iter_chunks(),
            media_type=content_type,
            headers={
                "Content-Disposition": content_disposition,
                "X-Proxied-Url": url,
                "X-Content-Length": content_length or "unknown",
            },
        )
    except requests.Timeout:
        if resp is not None:
            resp.close()
        return JSONResponse({"error": "Download timed out"}, status_code=504)
    except Exception as e:
        if resp is not None:
            resp.close()
        return JSONResponse({"error": str(e)}, status_code=502)
# ═══════════════════════════════════════════════════════════════
# HuggingFace model download helper
# ═══════════════════════════════════════════════════════════════
@app.get("/api/proxy/hf-info/{repo_id:path}")
def hf_model_info(repo_id: str):
    """
    Get HuggingFace model/repo info (files, sizes).

    Useful for checking model availability before downloading.
    Example: /api/proxy/hf-info/google/mobilebert-uncased

    Returns:
        ``{"repo_id", "model_id", "pipeline_tag", "tags", "files",
        "total_files"}`` with ``files`` sorted by size, largest first; 404
        when the repo does not exist, 502 on any other failure.
    """
    try:
        # Get repo info from HF API.
        # NOTE(review): ``blobs=true`` asks the Hub to include per-file
        # metadata (size) in ``siblings``; without it every file would be
        # reported with size 0 — confirm against the Hub API docs.
        resp = requests.get(
            f"https://huggingface.co/api/models/{repo_id}",
            params={"blobs": "true"},
            timeout=15,
        )
        if resp.status_code == 404:
            return JSONResponse({"error": f"Repo not found: {repo_id}"}, status_code=404)
        resp.raise_for_status()
        data = resp.json()
        # List files; a missing or null size becomes 0 so sorting cannot fail
        # on a None/int comparison.
        files = [
            {"filename": sib.get("rfilename"), "size": sib.get("size") or 0}
            for sib in data.get("siblings", [])
        ]
        return {
            "repo_id": repo_id,
            "model_id": data.get("modelId", repo_id),
            "pipeline_tag": data.get("pipeline_tag"),
            "tags": data.get("tags", []),
            "files": sorted(files, key=lambda f: f["size"], reverse=True),
            "total_files": len(files),
        }
    except Exception as e:
        return JSONResponse({"error": str(e)}, status_code=502)
# ═══════════════════════════════════════════════════════════════
# HuggingFace API relay (for hf upload / hf CLI through GFW)
# ═══════════════════════════════════════════════════════════════
@app.api_route("/api/hf-proxy/{path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH"])
async def hf_api_proxy(path: str, request: Request):
    """
    Generic HuggingFace proxy.

    Forwards any request to https://huggingface.co/{path} (the raw path, not
    only /api/...), preserving method, query string, headers and body.
    Used by hf CLI when HF_ENDPOINT is set to this proxy.

    Returns:
        The upstream status, headers and body; 502 when the upstream request
        itself fails.
    """
    import asyncio

    HF_API_BASE = "https://huggingface.co"
    # HF_ENDPOINT is set to https://chaofenghui-chaofeng.hf.space/api/hf-proxy
    # The hf CLI replaces https://huggingface.co with HF_ENDPOINT, so:
    # api/models → hf.co/api/models
    # spaces/X.git/info/lfs/... → hf.co/spaces/X.git/info/lfs/...
    # Just forward the raw path to huggingface.co
    target_url = f"{HF_API_BASE}/{path}"
    if request.url.query:
        target_url += f"?{request.url.query}"

    headers = dict(request.headers)
    # host/content-length are recomputed for the upstream request.
    # accept-encoding is dropped so ``requests`` advertises only encodings it
    # can decode itself: the body is relayed decoded, with Content-Encoding
    # stripped, so an encoding it cannot decode would reach the client
    # unlabelled.
    for name in ("host", "content-length", "accept-encoding"):
        headers.pop(name, None)
    body = await request.body()
    try:
        # ``requests`` is blocking; run it in a worker thread so a slow
        # upstream (timeout is 120s) does not stall the event loop for every
        # other request.
        resp = await asyncio.to_thread(
            requests.request,
            method=request.method,
            url=target_url,
            headers=headers,
            data=body if body else None,
            timeout=120,
            allow_redirects=True,
        )
        # Forward response headers from HF, except those describing the wire
        # framing of the upstream body, which no longer apply to the copy.
        response_headers = {}
        for key, value in resp.headers.items():
            if key.lower() not in ("transfer-encoding", "content-encoding", "content-length"):
                response_headers[key] = value
        return Response(
            content=resp.content,
            status_code=resp.status_code,
            headers=response_headers,
        )
    except Exception as e:
        logger.error(f"HF proxy error: {e}")
        return JSONResponse({"error": str(e)}, status_code=502)
# ═══════════════════════════════════════════════════════════════
# Short Track Dashboard API Proxy
# Forwards requests from WeChat Mini Program → Flask master server
# WeChat requires HTTPS, so we proxy through this HF Space
# ═══════════════════════════════════════════════════════════════
# Base URL of the short track dashboard master server that requests are forwarded to.
ST_API_BASE = "http://122.51.80.140:5000"


@app.api_route("/api/st-proxy/{path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS"])
async def st_api_proxy(path: str, request: Request):
    """Forward API requests to the short track dashboard master server.

    The request is replayed against ``{ST_API_BASE}/api/{path}`` with the same
    method, query string, headers and body. Redirects are not followed; the
    upstream status, headers and body are returned as-is, with
    ``access-control-allow-origin: *`` set on the reply.

    Returns:
        The upstream response, or a 502 JSON error when the upstream request
        itself fails.
    """
    import asyncio

    target_url = f"{ST_API_BASE}/api/{path}"
    if request.url.query:
        target_url += f"?{request.url.query}"

    headers = dict(request.headers)
    # host/content-length are recomputed for the upstream request.
    # accept-encoding is dropped so ``requests`` advertises only encodings it
    # can decode, since the body is relayed decoded with Content-Encoding
    # stripped.
    for name in ("host", "content-length", "accept-encoding"):
        headers.pop(name, None)
    # Keep the client's Content-Type (important for POST JSON); default to
    # JSON only when the client sent none.
    headers.setdefault("content-type", "application/json")

    body = await request.body()
    try:
        # ``requests`` is blocking; run it in a worker thread so a slow
        # upstream does not stall the event loop.
        resp = await asyncio.to_thread(
            requests.request,
            method=request.method,
            url=target_url,
            headers=headers,
            data=body if body else None,
            timeout=30,
            allow_redirects=False,
        )
        # Build response, preserving headers the client needs. Framing headers
        # no longer describe the relayed body. An upstream CORS origin header
        # is dropped (in any letter case) so the value set below is the only
        # one sent; two copies would otherwise appear.
        skipped = (
            "transfer-encoding",
            "content-encoding",
            "content-length",
            "access-control-allow-origin",
        )
        response_headers = {
            key: value for key, value in resp.headers.items() if key.lower() not in skipped
        }
        response_headers["access-control-allow-origin"] = "*"
        return Response(
            content=resp.content,
            status_code=resp.status_code,
            headers=response_headers,
        )
    except Exception as e:
        logger.error(f"ST proxy error: {e}")
        return JSONResponse({"error": str(e)}, status_code=502)