| import asyncio |
| import time |
| import os |
| from contextlib import asynccontextmanager |
| from fastapi import FastAPI, Request, HTTPException, status |
| from fastapi.responses import JSONResponse |
|
|
| |
| MAX_CONCURRENT_REQUESTS = 100 |
| request_semaphore = None |
|
|
| @asynccontextmanager |
| async def lifespan(app: FastAPI): |
| """ |
| অ্যাপ্লিকেশন স্টার্টআপ এবং শাটডাউন ইভেন্ট হ্যান্ডলার। |
| এখানে আমরা সেমাফোর ইনিশিয়ালাইজ করছি যাতে একসাথে অতিরিক্ত রিকোয়েস্ট না আসে। |
| """ |
| global request_semaphore |
| request_semaphore = asyncio.Semaphore(MAX_CONCURRENT_REQUESTS) |
| print("🚀 FastAPI Server Starting up... Protection Mechanisms Enabled.") |
| yield |
| print("🛑 FastAPI Server Shutting down gracefully...") |
|
|
| |
| app = FastAPI( |
| title="High-Performance Protected API", |
| description="Designed for High-Load Shared Environments", |
| version="1.0.0", |
| lifespan=lifespan |
| ) |
|
|
| @app.middleware("http") |
| async def limit_concurrency_and_timeout(request: Request, call_next): |
| """ |
| এই মিডলওয়্যারটি দুটি কাজ করে: |
| ১. কনকারেন্সি লিমিট চেক করে (Semaphore) |
| ২. রিকোয়েস্ট প্রসেস হতে কত সময় লাগছে তা মাপে |
| """ |
| start_time = time.time() |
| |
| |
| try: |
| async with request_semaphore: |
| |
| |
| response = await call_next(request) |
| |
| process_time = time.time() - start_time |
| |
| response.headers["X-Process-Time"] = str(round(process_time, 4)) |
| |
| |
| if process_time > 2.0: |
| print(f"⚠️ Warning: Slow request detected on {request.url.path} ({process_time:.2f}s)") |
| |
| return response |
| |
| except Exception as e: |
| |
| return JSONResponse( |
| status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, |
| content={"detail": "Internal Server Error or Server Overloaded", "error": str(e)} |
| ) |
|
|
| def get_system_metrics(): |
| """ |
| সরাসরি লিনাক্স কার্নেল থেকে সিস্টেমের অবস্থা জানার ফাংশন। |
| এটি আপনার ১৩+ লোড এভারেজ ট্র্যাক করতে সাহায্য করবে। |
| """ |
| metrics = {"load_average": None, "memory_mb": {}} |
| try: |
| |
| load1, load5, load15 = os.getloadavg() |
| metrics["load_average"] = {"1_min": load1, "5_min": load5, "15_min": load15} |
| |
| |
| with open('/proc/meminfo', 'r') as f: |
| meminfo = {} |
| for line in f.readlines()[:5]: |
| parts = line.split(':') |
| if len(parts) == 2: |
| key = parts[0].strip() |
| |
| val = int(parts[1].strip().replace(' kB', '')) // 1024 |
| meminfo[key] = val |
| metrics["memory_mb"] = meminfo |
| except Exception: |
| pass |
| |
| return metrics |
|
|
| @app.get("/") |
| async def root(): |
| """বেসিক হেলথ এবং ওয়েলকাম এন্ডপয়েন্ট""" |
| return { |
| "message": "Welcome to the Protected FastAPI Server", |
| "status": "Running smoothly despite the ghost load!" |
| } |
|
|
| @app.get("/system-health") |
| async def system_health(): |
| """ |
| আপনার কন্টেইনার এবং মেইন হোস্টের বর্তমান স্বাস্থ্য পরীক্ষা করার এন্ডপয়েন্ট। |
| """ |
| metrics = get_system_metrics() |
| |
| |
| is_overloaded = False |
| if metrics["load_average"] and metrics["load_average"]["1_min"] > 15.0: |
| is_overloaded = True |
|
|
| return { |
| "status": "warning" if is_overloaded else "healthy", |
| "host_overloaded": is_overloaded, |
| "metrics": metrics, |
| "advice": "If host_overloaded is true, expect slower API response times due to shared context switching." |
| } |
|
|
| @app.get("/heavy-task") |
| async def simulate_heavy_task(): |
| """ |
| একটি নন-ব্লকিং টাস্কের উদাহরণ। |
| টাইম ডট স্লিপ (time.sleep) ব্যবহার না করে asyncio.sleep ব্যবহার করা হয়েছে, |
| যাতে হাই কন্টেক্সট সুইচিংয়ের সময় অন্যান্য রিকোয়েস্ট ব্লক না হয়। |
| """ |
| |
| await asyncio.sleep(3) |
| return {"message": "Heavy task completed successfully without blocking the CPU."} |