# PROXPY / app.py
# Voxxium: "Update app.py" (commit b0a2828, verified)
import os
import logging
import threading
from contextlib import asynccontextmanager
from fastapi import FastAPI, Query, HTTPException
from fastapi.responses import JSONResponse, PlainTextResponse
from apscheduler.schedulers.background import BackgroundScheduler
# Configure root logging here, ahead of the project imports that follow --
# presumably so records emitted while those modules load use this format.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(name)s] %(levelname)s: %(message)s",
    datefmt="%H:%M:%S",
)
# Logger used by everything in this module.
logger = logging.getLogger("api")
from proxy_pool import ProxyPool
from orchestrator import Orchestrator
# URL handed to the Orchestrator; the WORKER_URL environment variable
# overrides the built-in default.
WORKER_URL = os.environ.get(
    "WORKER_URL",
    "https://iptest-u2uk.onrender.com"
)
# Module-level singletons shared by the scheduled jobs and the route handlers.
pool = ProxyPool()
orch = Orchestrator(pool, WORKER_URL)
# Daemon scheduler; its jobs are registered and started inside lifespan().
scheduler = BackgroundScheduler(daemon=True)
def do_tick():
    """Run one orchestrator tick, logging any failure instead of raising.

    This function is the outermost frame both for the scheduled "tick" job
    and for the ad-hoc threads started elsewhere in this module, so errors
    are caught and logged here.
    """
    try:
        orch.tick()
    except Exception as e:
        # logger.exception keeps the "Tick: ..." message at ERROR level and
        # appends the traceback, which a plain error() call would drop.
        logger.exception("Tick: %s", e)
def ping_worker():
    """Scheduled job: delegate to the orchestrator's ``ping_worker``."""
    orch.ping_worker()
def self_ping():
    """Request this app's own ``/health`` endpoint over HTTPS.

    The host name is read from the ``SPACE_HOST`` environment variable; when
    it is unset or empty the function returns without doing anything.

    The request is best-effort: any failure is logged as a warning and never
    raised to the caller.
    """
    host = os.environ.get("SPACE_HOST", "")
    if not host:
        return
    try:
        # Imported inside the try block so that a missing ``requests``
        # package is handled like any other failed ping.
        import requests

        requests.get(f"https://{host}/health", timeout=5)
    except Exception as e:
        # Still best-effort, but no longer silent: leave a trace in the log.
        logger.warning("Self-ping failed: %s", e)
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan: restore the pool, run the jobs, stop them on exit.

    Startup:
      * tries to load a saved pool through ``github_store.load_from_github``
        and import it into ``pool``; a failure is logged and startup goes on;
      * registers the tick (30 s), worker-ping (10 min) and self-ping (4 min)
        interval jobs and starts the scheduler;
      * starts one immediate ``do_tick`` in a daemon thread.

    Shutdown: stops the scheduler without waiting for running jobs. This
    happens in a ``finally`` clause, so it also runs when an exception is
    thrown into the generator at the ``yield``.
    """
    logger.info("🚀 Proxy API v3")
    logger.info(f" Worker: {WORKER_URL}")

    # Load the GitHub backup at startup.
    try:
        from github_store import load_from_github

        saved = load_from_github()
        if saved:
            pool.import_json(saved)
            logger.info(f"📂 Loaded {pool.size} proxies from GitHub")
    except Exception as e:
        # Not fatal: the app starts without the backup. The traceback is
        # logged so import or parsing problems can be diagnosed.
        logger.exception("Backup load: %s", e)

    scheduler.add_job(
        do_tick, "interval", seconds=30,
        id="tick", max_instances=1, coalesce=True,
    )
    scheduler.add_job(
        ping_worker, "interval", minutes=10,
        id="wpng", max_instances=1,
    )
    scheduler.add_job(
        self_ping, "interval", minutes=4,
        id="spng", max_instances=1,
    )
    scheduler.start()
    logger.info("⏰ Tick/30s | WorkerPing/10m | SelfPing/4m")

    # Run a first tick right away instead of waiting for the 30 s interval.
    threading.Thread(target=do_tick, daemon=True).start()

    try:
        yield
    finally:
        scheduler.shutdown(wait=False)
# The ASGI application; startup and shutdown work is done by `lifespan`.
app = FastAPI(
    title="🔌 Free Proxy API",
    version="3.0.0",
    lifespan=lifespan,
)
def _resp(entry, strategy):
    """Turn a pool entry into a response dict tagged with its strategy.

    Args:
        entry: object exposing ``to_dict()``, or ``None`` when nothing was found.
        strategy: value stored under the ``"strategy"`` key of the result.

    Returns:
        The dict returned by ``entry.to_dict()`` with ``"strategy"`` set on it.

    Raises:
        HTTPException: status 503 when ``entry`` is ``None``.
    """
    if entry is not None:
        payload = entry.to_dict()
        payload["strategy"] = strategy
        return payload
    raise HTTPException(503, "No proxy available")
@app.get("/")
async def root():
    """Service summary: name, pool size, worker flag, phase and docs path."""
    summary = {"name": "Free Proxy API"}
    summary["pool"] = pool.size
    summary["worker"] = orch._worker_ok
    summary["phase"] = orch._phase.value
    summary["docs"] = "/docs"
    return summary
@app.get("/health")
async def health():
    """Health probe.

    ``status`` is ``"ok"`` when the pool size is above zero and ``"warming"``
    otherwise; the pool size and the orchestrator's worker flag are included.
    """
    status = "ok" if pool.size > 0 else "warming"
    return {"status": status, "pool": pool.size, "worker": orch._worker_ok}
@app.get("/stats")
async def stats():
    """Return ``pool.get_stats()`` with ``orch.status`` under "orchestrator"."""
    report = pool.get_stats()
    report["orchestrator"] = orch.status
    return report
@app.get("/proxy")
async def get_proxy(
    protocol: str = Query(None),
    verified: bool = Query(False),
    strategy: str = Query("round-robin"),
):
    """Return one proxy picked with the requested selection strategy.

    ``strategy`` must be one of "round-robin", "random", "fastest" or
    "least-used"; anything else yields HTTP 400. ``protocol`` and
    ``verified`` are passed through to the pool's selector. HTTP 503 is
    raised (by ``_resp``) when the selector returns ``None``.
    """
    selectors = {
        "round-robin": pool.get_round_robin,
        "random": pool.get_random,
        "fastest": pool.get_fastest,
        "least-used": pool.get_least_used,
    }
    if strategy not in selectors:
        raise HTTPException(400, "Bad strategy")
    chosen = selectors[strategy](protocol, verified)
    return _resp(chosen, strategy)
@app.get("/proxy/random")
async def rand(protocol: str = Query(None), verified: bool = Query(False)):
    """Return the pool's ``get_random`` pick, tagged with strategy "random"."""
    entry = pool.get_random(protocol, verified)
    return _resp(entry, "random")
@app.get("/proxy/best")
async def best(protocol: str = Query(None), verified: bool = Query(False)):
    """Return the pool's ``get_fastest`` pick, tagged with strategy "fastest"."""
    entry = pool.get_fastest(protocol, verified)
    return _resp(entry, "fastest")
@app.get("/proxy/least")
async def least(protocol: str = Query(None), verified: bool = Query(False)):
    """Return the pool's ``get_least_used`` pick, tagged "least-used"."""
    entry = pool.get_least_used(protocol, verified)
    return _resp(entry, "least-used")
@app.get("/rotate")
async def rotate(
    count: int = Query(5, ge=1, le=100),
    protocol: str = Query(None),
    verified: bool = Query(False),
):
    """Collect up to ``count`` distinct proxies from round-robin picks.

    At most ``count * 3`` picks are made. Picking stops early when the pool
    returns nothing or once ``count`` distinct ``proxy_url`` values have been
    gathered. Raises HTTP 503 when no proxy was collected at all.
    """
    # Keyed by proxy_url; insertion order is the order of first appearance.
    unique = {}
    for _attempt in range(3 * count):
        entry = pool.get_round_robin(protocol, verified)
        if not entry:
            break
        url = entry.proxy_url
        if url in unique:
            continue
        unique[url] = entry.to_dict()
        if len(unique) >= count:
            break

    if not unique:
        raise HTTPException(503, "No proxies")

    proxies = list(unique.values())
    return {"count": len(proxies), "proxies": proxies}
@app.get("/all")
async def get_all(
    protocol: str = Query(None),
    verified: bool = Query(False),
    limit: int = Query(200, ge=1, le=1000),
):
    """List proxies from the pool as JSON.

    ``protocol``, ``verified`` and ``limit`` are passed to ``pool.get_all``.
    ``limit`` is validated to lie in 1..1000, so zero or negative values are
    rejected by request validation instead of reaching the pool.

    Returns a dict with the number of entries and the entries themselves.
    """
    px = pool.get_all(protocol, verified, limit)
    return {"count": len(px), "proxies": px}
@app.get("/plain")
async def plain(
    protocol: str = Query(None),
    verified: bool = Query(False),
    limit: int = Query(200, ge=1, le=1000),
):
    """List proxy URLs as plain text, one per line.

    ``protocol``, ``verified`` and ``limit`` are passed to ``pool.get_all``.
    ``limit`` is validated to lie in 1..1000, so zero or negative values are
    rejected by request validation instead of reaching the pool.

    The body is the ``"proxy_url"`` value of every returned entry joined with
    newlines.
    """
    px = pool.get_all(protocol, verified, limit)
    return PlainTextResponse("\n".join(p["proxy_url"] for p in px))
@app.post("/feedback")
async def feedback(proxy_url: str = Query(...), success: bool = Query(...)):
    """Report the outcome of using ``proxy_url`` to the pool.

    Calls ``pool.report_success`` when ``success`` is true and
    ``pool.report_failure`` otherwise; always answers ``{"ok": True}``.
    """
    report = pool.report_success if success else pool.report_failure
    report(proxy_url)
    return {"ok": True}
@app.post("/force-check")
async def force():
    """Start a tick in a daemon thread if the orchestrator phase is "idle".

    Returns ``{"status": "started"}`` when a tick was launched; otherwise the
    current phase value is returned as the status and nothing is started.
    """
    if orch._phase.value == "idle":
        worker = threading.Thread(target=do_tick, daemon=True)
        worker.start()
        return {"status": "started"}
    return {"status": orch._phase.value}
# Script entry point: serve the app with uvicorn on all interfaces, port 7860.
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=7860)