import asyncio import time import os from contextlib import asynccontextmanager from fastapi import FastAPI, Request, HTTPException, status from fastapi.responses import JSONResponse # গ্লোবাল ভেরিয়েবল সেটআপ MAX_CONCURRENT_REQUESTS = 100 # আপনার "Unlimited" সিস্টেমকে কন্ট্রোল করার জন্য লিমিট request_semaphore = None @asynccontextmanager async def lifespan(app: FastAPI): """ অ্যাপ্লিকেশন স্টার্টআপ এবং শাটডাউন ইভেন্ট হ্যান্ডলার। এখানে আমরা সেমাফোর ইনিশিয়ালাইজ করছি যাতে একসাথে অতিরিক্ত রিকোয়েস্ট না আসে। """ global request_semaphore request_semaphore = asyncio.Semaphore(MAX_CONCURRENT_REQUESTS) print("🚀 FastAPI Server Starting up... Protection Mechanisms Enabled.") yield print("🛑 FastAPI Server Shutting down gracefully...") # FastAPI ইনস্ট্যান্স তৈরি app = FastAPI( title="High-Performance Protected API", description="Designed for High-Load Shared Environments", version="1.0.0", lifespan=lifespan ) @app.middleware("http") async def limit_concurrency_and_timeout(request: Request, call_next): """ এই মিডলওয়্যারটি দুটি কাজ করে: ১. কনকারেন্সি লিমিট চেক করে (Semaphore) ২. রিকোয়েস্ট প্রসেস হতে কত সময় লাগছে তা মাপে """ start_time = time.time() # সেমাফোর দিয়ে কনকারেন্সি কন্ট্রোল try: async with request_semaphore: # যদি সার্ভার স্লো থাকে, তবে একটি টাইমআউট মেকানিজম (যেমন ৫ সেকেন্ড) # এখানে asyncio.wait_for ব্যবহার করা যেতে পারে, তবে সাধারণ ব্যবহারের জন্য call_next যথেষ্ট response = await call_next(request) process_time = time.time() - start_time # হেডারে রেসপন্স টাইম যুক্ত করে দেওয়া হলো response.headers["X-Process-Time"] = str(round(process_time, 4)) # যদি রিকোয়েস্ট খুব বেশি সময় নেয় (যেমন ২ সেকেন্ডের বেশি), তাহলে ওয়ার্নিং দিতে পারি (লগিংয়ের জন্য) if process_time > 2.0: print(f"⚠️ Warning: Slow request detected on {request.url.path} ({process_time:.2f}s)") return response except Exception as e: # কোনো কারণে এরর হলে ৫০০ স্ট্যাটাস রিটার্ন করবে return JSONResponse( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, content={"detail": "Internal Server Error or Server Overloaded", "error": str(e)} ) def get_system_metrics(): """ সরাসরি লিনাক্স কার্নেল থেকে সিস্টেমের অবস্থা জানার ফাংশন। এটি আপনার ১৩+ লোড এভারেজ ট্র্যাক করতে সাহায্য করবে। """ metrics = {"load_average": None, "memory_mb": {}} try: # Load Average চেক (1 min, 5 min, 15 min) load1, load5, load15 = os.getloadavg() metrics["load_average"] = {"1_min": load1, "5_min": load5, "15_min": load15} # Memory চেক (/proc/meminfo থেকে) with open('/proc/meminfo', 'r') as f: meminfo = {} for line in f.readlines()[:5]: # প্রথম ৫ লাইন যথেষ্ট parts = line.split(':') if len(parts) == 2: key = parts[0].strip() # KB থেকে MB তে কনভার্ট val = int(parts[1].strip().replace(' kB', '')) // 1024 meminfo[key] = val metrics["memory_mb"] = meminfo except Exception: pass return metrics @app.get("/") async def root(): """বেসিক হেলথ এবং ওয়েলকাম এন্ডপয়েন্ট""" return { "message": "Welcome to the Protected FastAPI Server", "status": "Running smoothly despite the ghost load!" } @app.get("/system-health") async def system_health(): """ আপনার কন্টেইনার এবং মেইন হোস্টের বর্তমান স্বাস্থ্য পরীক্ষা করার এন্ডপয়েন্ট। """ metrics = get_system_metrics() # যদি লোড ১৫ এর বেশি হয়, তবে সতর্কবার্তা দেবে is_overloaded = False if metrics["load_average"] and metrics["load_average"]["1_min"] > 15.0: is_overloaded = True return { "status": "warning" if is_overloaded else "healthy", "host_overloaded": is_overloaded, "metrics": metrics, "advice": "If host_overloaded is true, expect slower API response times due to shared context switching." } @app.get("/heavy-task") async def simulate_heavy_task(): """ একটি নন-ব্লকিং টাস্কের উদাহরণ। টাইম ডট স্লিপ (time.sleep) ব্যবহার না করে asyncio.sleep ব্যবহার করা হয়েছে, যাতে হাই কন্টেক্সট সুইচিংয়ের সময় অন্যান্য রিকোয়েস্ট ব্লক না হয়। """ # ৩ সেকেন্ডের একটি এসিনক্রোনাস ডিলে (ডাটাবেস কল বা এপিআই কলের বিকল্প) await asyncio.sleep(3) return {"message": "Heavy task completed successfully without blocking the CPU."}