| import asyncio |
| import time |
|
|
| import aiohttp |
| from fastapi import FastAPI |
|
|
| TARGET_URL = "https://dontspam.this.superhot.pics/vote" |
| CONCURRENT_REQUESTS = 200 |
| TOTAL_REQUESTS = 1000000 |
|
|
| HEADERS = { |
| "User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:145.0) Gecko/20100101 Firefox/145.0", |
| "Accept": "application/json", |
| "Referer": "https://america.remy.cam/", |
| "Origin": "https://america.remy.cam", |
| "Sec-GPC": "1", |
| "Sec-Fetch-Dest": "empty", |
| "Sec-Fetch-Mode": "cors", |
| "Sec-Fetch-Site": "cross-site", |
| } |
|
|
| success = 0 |
| failed = 0 |
|
|
|
|
| async def send_vote(session, semaphore): |
| global success, failed |
|
|
| async with semaphore: |
| try: |
| async with session.post(TARGET_URL, headers=HEADERS, timeout=5) as resp: |
| if resp.status == 200: |
| success += 1 |
| else: |
| failed += 1 |
| except: |
| failed += 1 |
|
|
| if (success + failed) % 1000 == 0: |
| print( |
| f"Progress: {success + failed} | Success: {success} | Failed: {failed}" |
| ) |
|
|
|
|
| async def main(): |
| semaphore = asyncio.Semaphore(CONCURRENT_REQUESTS) |
|
|
| async with aiohttp.ClientSession() as session: |
| tasks = [send_vote(session, semaphore) for _ in range(TOTAL_REQUESTS)] |
| await asyncio.gather(*tasks) |
|
|
|
|
| app = FastAPI() |
|
|
|
|
| @app.get("/spam") |
| def spam(): |
| start = time.time() |
| asyncio.run(main()) |
| return { |
| "Hello": f"\nFini en {time.time() - start:.2f}s | Succès: {success} | Échecs: {failed}" |
| } |
|
|
|
|
| @app.get("/") |
| def greet_json(): |
| return {"success": success, "failed": failed} |
|
|