"""HTTP API exposing a rotating pool of free proxies.

The module wires a ``ProxyPool`` to an ``Orchestrator``, schedules periodic
background jobs with APScheduler, and serves the pool through FastAPI routes.
"""

import os
import logging
import threading
from contextlib import asynccontextmanager

from fastapi import FastAPI, Query, HTTPException
from fastapi.responses import JSONResponse, PlainTextResponse
from apscheduler.schedulers.background import BackgroundScheduler

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(name)s] %(levelname)s: %(message)s",
    datefmt="%H:%M:%S",
)
logger = logging.getLogger("api")

# NOTE(review): project imports are kept after logging.basicConfig, as in the
# original; presumably so that import-time log records use the format above.
from proxy_pool import ProxyPool
from orchestrator import Orchestrator

# Base URL handed to the orchestrator; overridable through the environment.
WORKER_URL = os.environ.get(
    "WORKER_URL", "https://iptest-u2uk.onrender.com"
)

pool = ProxyPool()
orch = Orchestrator(pool, WORKER_URL)
scheduler = BackgroundScheduler(daemon=True)


def do_tick() -> None:
    """Run one orchestrator tick, logging (never propagating) any failure."""
    try:
        orch.tick()
    except Exception as e:
        # Same message as before, but logger.exception keeps the traceback so
        # orchestrator failures can actually be diagnosed.
        logger.exception("Tick: %s", e)


def ping_worker() -> None:
    """Scheduled job: delegate to ``orch.ping_worker()``."""
    orch.ping_worker()


def self_ping() -> None:
    """Best-effort GET of this service's own ``/health`` endpoint.

    Does nothing when the ``SPACE_HOST`` environment variable is unset or
    empty. Failures are never raised; they are only logged at debug level.
    """
    host = os.environ.get("SPACE_HOST", "")
    if not host:
        return
    try:
        # Imported lazily so a missing ``requests`` package only disables the
        # self-ping instead of breaking module import.
        import requests

        requests.get(f"https://{host}/health", timeout=5)
    except Exception as e:
        # Still best-effort, but no longer completely silent.
        logger.debug("Self-ping failed: %s", e)


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan: restore backup, start scheduler, stop it on exit.

    On startup it tries to load a saved pool via ``github_store``, registers
    the three interval jobs, starts the scheduler and fires one immediate tick
    in a daemon thread. The scheduler is shut down when the context exits,
    including when the body after ``scheduler.start()`` raises.
    """
    logger.info("🚀 Proxy API v3")
    logger.info(" Worker: %s", WORKER_URL)

    # Load the GitHub backup at startup; a failure here must not block boot.
    try:
        from github_store import load_from_github

        saved = load_from_github()
        if saved:
            pool.import_json(saved)
            logger.info("📂 Loaded %s proxies from GitHub", pool.size)
    except Exception as e:
        logger.error("Backup load: %s", e)

    scheduler.add_job(
        do_tick,
        "interval",
        seconds=30,
        id="tick",
        max_instances=1,
        coalesce=True,
    )
    scheduler.add_job(
        ping_worker,
        "interval",
        minutes=10,
        id="wpng",
        max_instances=1,
    )
    scheduler.add_job(
        self_ping,
        "interval",
        minutes=4,
        id="spng",
        max_instances=1,
    )
    scheduler.start()
    try:
        logger.info("⏰ Tick/30s | WorkerPing/10m | SelfPing/4m")
        # Kick off a first tick right away instead of waiting 30 seconds.
        threading.Thread(target=do_tick, daemon=True).start()
        yield
    finally:
        # Always stop the scheduler once it has been started.
        scheduler.shutdown(wait=False)


app = FastAPI(
    title="🔌 Free Proxy API",
    version="3.0.0",
    lifespan=lifespan,
)


def _resp(entry, strategy):
    """Serialize a pool entry and tag it with the strategy that selected it.

    Args:
        entry: Object exposing ``to_dict()``, or ``None`` when the pool had
            nothing to offer.
        strategy: Strategy label stored under the ``"strategy"`` key.

    Returns:
        The dict from ``entry.to_dict()`` with ``"strategy"`` added.

    Raises:
        HTTPException: 503 when ``entry`` is ``None``.
    """
    if entry is None:
        raise HTTPException(503, "No proxy available")
    d = entry.to_dict()
    d["strategy"] = strategy
    return d


@app.get("/")
async def root():
    """Return a short service summary: pool size, worker flag and phase."""
    return {
        "name": "Free Proxy API",
        "pool": pool.size,
        "worker": orch._worker_ok,
        "phase": orch._phase.value,
        "docs": "/docs",
    }


@app.get("/health")
async def health():
    """Report ``"ok"`` when the pool is non-empty, otherwise ``"warming"``."""
    return {
        "status": "ok" if pool.size > 0 else "warming",
        "pool": pool.size,
        "worker": orch._worker_ok,
    }


@app.get("/stats")
async def stats():
    """Return pool statistics with the orchestrator status attached."""
    s = pool.get_stats()
    s["orchestrator"] = orch.status
    return s


@app.get("/proxy")
async def get_proxy(
    protocol: str = Query(None),
    verified: bool = Query(False),
    strategy: str = Query("round-robin"),
):
    """Return one proxy chosen by the requested selection strategy.

    Raises:
        HTTPException: 400 for an unknown ``strategy``; 503 when the pool
            returns no entry.
    """
    funcs = {
        "round-robin": pool.get_round_robin,
        "random": pool.get_random,
        "fastest": pool.get_fastest,
        "least-used": pool.get_least_used,
    }
    f = funcs.get(strategy)
    if not f:
        raise HTTPException(400, "Bad strategy")
    return _resp(f(protocol, verified), strategy)


@app.get("/proxy/random")
async def rand(protocol: str = Query(None), verified: bool = Query(False)):
    """Return one proxy picked with ``pool.get_random``."""
    return _resp(pool.get_random(protocol, verified), "random")


@app.get("/proxy/best")
async def best(protocol: str = Query(None), verified: bool = Query(False)):
    """Return one proxy picked with ``pool.get_fastest``."""
    return _resp(pool.get_fastest(protocol, verified), "fastest")


@app.get("/proxy/least")
async def least(protocol: str = Query(None), verified: bool = Query(False)):
    """Return one proxy picked with ``pool.get_least_used``."""
    return _resp(pool.get_least_used(protocol, verified), "least-used")


@app.get("/rotate")
async def rotate(
    count: int = Query(5, ge=1, le=100),
    protocol: str = Query(None),
    verified: bool = Query(False),
):
    """Return up to ``count`` distinct proxies drawn round-robin.

    Raises:
        HTTPException: 503 when no proxy could be collected.
    """
    results, seen = [], set()
    # Allow up to three draws per requested proxy so that repeated entries
    # from the round-robin do not immediately starve the result.
    for _ in range(count * 3):
        p = pool.get_round_robin(protocol, verified)
        if not p:
            break
        if p.proxy_url not in seen:
            seen.add(p.proxy_url)
            results.append(p.to_dict())
        if len(results) >= count:
            break
    if not results:
        raise HTTPException(503, "No proxies")
    return {"count": len(results), "proxies": results}


@app.get("/all")
async def get_all(
    protocol: str = Query(None),
    verified: bool = Query(False),
    # ge=0 rejects negative limits, which previously reached pool.get_all
    # unchecked even though an upper bound was enforced.
    limit: int = Query(200, ge=0, le=1000),
):
    """Return the proxies from ``pool.get_all`` together with their count."""
    px = pool.get_all(protocol, verified, limit)
    return {"count": len(px), "proxies": px}
@app.get("/plain")
async def plain(
    protocol: str = Query(None),
    verified: bool = Query(False),
    # ge=0 rejects negative limits, which previously reached pool.get_all
    # unchecked even though an upper bound was enforced.
    limit: int = Query(200, ge=0, le=1000),
):
    """Return the ``proxy_url`` of each pool entry as newline-separated text."""
    px = pool.get_all(protocol, verified, limit)
    return PlainTextResponse("\n".join(p["proxy_url"] for p in px))


@app.post("/feedback")
async def feedback(proxy_url: str = Query(...), success: bool = Query(...)):
    """Record a client-reported success or failure for ``proxy_url``."""
    if success:
        pool.report_success(proxy_url)
    else:
        pool.report_failure(proxy_url)
    return {"ok": True}


@app.post("/force-check")
async def force():
    """Start an out-of-schedule tick unless the orchestrator is busy.

    Returns the current phase name when the orchestrator is not ``"idle"``;
    otherwise launches ``do_tick`` in a daemon thread and reports
    ``"started"``.
    """
    if orch._phase.value != "idle":
        return {"status": orch._phase.value}
    # Run in a thread so the request returns without waiting for the tick.
    threading.Thread(target=do_tick, daemon=True).start()
    return {"status": "started"}


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=7860)