import asyncio
import os
import shutil
import stat
import time
import uuid
from collections import defaultdict

from fastapi import FastAPI, Query, Request
from fastapi.responses import JSONResponse, FileResponse, Response, StreamingResponse
from playwright.async_api import async_playwright

app = FastAPI(
    title="BRAT GEN - API",
    description="API for generating BRAT-style text images & videos.",
    version="1.0.0",
)


def _ensure_dir(path: str) -> str:
    """
    Ensures the directory can be created and written to.
    If it fails, falls back to /tmp.
    Sets 0777 permissions for safety in non-root containers.

    Returns the path of the directory that was actually prepared (either
    ``path`` itself or ``/tmp/<basename of path>``).
    """
    world_rwx = stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO  # 0o777
    try:
        os.makedirs(path, exist_ok=True)
        os.chmod(path, world_rwx)
        return path
    except Exception:
        # Deliberately broad: any failure (read-only FS, permissions, ...)
        # should route to the /tmp fallback instead of aborting startup.
        fallback = os.path.join("/tmp", os.path.basename(path))
        os.makedirs(fallback, exist_ok=True)
        os.chmod(fallback, world_rwx)
        return fallback


OUTPUT_DIR = _ensure_dir(os.environ.get("OUTPUT_DIR", os.path.join(os.getcwd(), "output")))
TMP_DIR = _ensure_dir(os.environ.get("TMP_DIR", os.path.join(os.getcwd(), "tmp_brat")))

REQUEST_LIMIT = 10  # Max requests allowed
TIME_WINDOW = 60  # Time window in seconds (1 minute)
BAN_DURATION = 300  # Ban duration in seconds (5 minutes)
MAX_TRACKED_IPS = 10_000  # Above this many tracked IPs, stale entries are swept

request_logs: dict[str, list[float]] = defaultdict(list)
banned_ips: dict[str, float] = {}

# Strong references to fire-and-forget tasks. The event loop only keeps weak
# references, so an unreferenced task may be garbage-collected before it runs.
_background_tasks: set[asyncio.Task] = set()

BRAT_URL = "https://www.bratgenerator.com/"
VIEWPORT = {"width": 1536, "height": 695}
CLIP_WIDTH = 500
CLIP_HEIGHT = 440

# Executed in the page to recolor the canvas; relies on the page's own jQuery.
_APPLY_COLORS_JS = (
    "(data) => { "
    "if (data.background) $('.node__content.clearfix').css('background-color', data.background); "
    "if (data.color) $('.textFitted').css('color', data.color); "
    "}"
)


class _RenderError(Exception):
    """Raised when the page cannot be captured; the message is returned to the client."""


def get_client_ip(request: Request) -> str:
    """
    Return the client IP used as the rate-limiting key.

    Preference order: ``CF-Connecting-IP`` header, first entry of
    ``X-Forwarded-For``, then the socket peer address. Returns an empty string
    when none is available.

    NOTE(review): both headers are client-controlled unless a trusted proxy
    overwrites them, so the limiter can be bypassed when the app is exposed
    directly. Confirm the deployment topology.
    """
    cf = request.headers.get("CF-Connecting-IP")
    if cf:
        return cf.strip()
    xff = request.headers.get("X-Forwarded-For")
    if xff:
        return xff.split(",")[0].strip()
    # request.client may be None, in which case there is no peer address.
    client = request.client
    host = client.host if client else ""
    return (host or "").strip()


def _prune_rate_limit_state(now: float) -> None:
    """Drop IPs with no request inside the current window and bans that have expired."""
    stale = [
        ip
        for ip, stamps in request_logs.items()
        if not stamps or now - stamps[-1] >= TIME_WINDOW
    ]
    for ip in stale:
        del request_logs[ip]
    expired = [ip for ip, until in banned_ips.items() if now >= until]
    for ip in expired:
        del banned_ips[ip]


@app.middleware("http")
async def anti_ddos_middleware(request: Request, call_next):
    """
    Per-IP sliding-window rate limiter.

    An IP that makes more than ``REQUEST_LIMIT`` requests within
    ``TIME_WINDOW`` seconds is banned for ``BAN_DURATION`` seconds; banned or
    over-limit requests get a 429 JSON response instead of reaching a route.
    """
    ip = get_client_ip(request)
    now = time.time()

    # Keep the in-memory tables bounded: removing a stale entry is equivalent
    # to the filtering below, so active clients are unaffected.
    if len(request_logs) > MAX_TRACKED_IPS or len(banned_ips) > MAX_TRACKED_IPS:
        _prune_rate_limit_state(now)

    banned_until = banned_ips.get(ip)
    if banned_until is not None:
        if now < banned_until:
            return JSONResponse(
                status_code=429,
                content={"error": "IP is blocked for 5 minutes due to too many requests."},
            )
        del banned_ips[ip]

    window = [ts for ts in request_logs[ip] if now - ts < TIME_WINDOW]
    window.append(now)
    request_logs[ip] = window

    if len(window) > REQUEST_LIMIT:
        banned_ips[ip] = now + BAN_DURATION
        return JSONResponse(
            status_code=429,
            content={"error": "Too many requests. IP is blocked for 5 minutes."},
        )

    return await call_next(request)


async def delete_file_after_delay(filepath: str, delay: int = 600):
    """Wait ``delay`` seconds, then remove ``filepath`` if it still exists (best effort)."""
    await asyncio.sleep(delay)
    try:
        if os.path.exists(filepath):
            os.remove(filepath)
    except OSError:
        # Best effort: the file may already be gone or not removable.
        pass


def _schedule_deletion(filepath: str) -> None:
    """Start the delayed-deletion task for ``filepath`` and keep it referenced until done."""
    task = asyncio.create_task(delete_file_after_delay(filepath))
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)


async def _prepare_brat_page(browser):
    """Open the generator site in a new context and focus the text input; returns the page."""
    context = await browser.new_context(viewport=VIEWPORT)
    page = await context.new_page()
    await page.goto(BRAT_URL, wait_until="domcontentloaded")
    try:
        await page.click("text=Accept", timeout=3000)
    except Exception:
        # The "Accept" button is optional; a missing one must not fail the request.
        pass
    await page.click("#toggleButtonWhite")
    await page.click("#textOverlay")
    await page.click("#textInput")
    return page


async def _capture_overlay(page, text: str, background, color, settle_delay: float) -> bytes:
    """
    Fill ``text`` into the page, apply the optional colors and screenshot the overlay.

    Waits ``settle_delay`` seconds before capturing. Raises ``_RenderError``
    if the overlay element or its bounding box is unavailable.
    """
    await page.fill("#textInput", text)
    await page.evaluate(_APPLY_COLORS_JS, {"background": background, "color": color})
    await asyncio.sleep(settle_delay)

    element = await page.query_selector("#textOverlay")
    if not element:
        raise _RenderError("Target element not found.")
    box = await element.bounding_box()
    if not box:
        raise _RenderError("Failed to read element bounding box.")

    return await page.screenshot(
        clip={"x": box["x"], "y": box["y"], "width": CLIP_WIDTH, "height": CLIP_HEIGHT}
    )


# GET /api/brat: renders `text` on the generator page, saves a PNG in
# OUTPUT_DIR, schedules its deletion and returns its download URL.
# Responds 400 for blank text and 500 for any rendering failure.
# (Documented with comments on purpose: FastAPI would publish a docstring as
# the endpoint description in the OpenAPI schema.)
@app.get("/api/brat", tags=["MAIN"], summary="Generate BRAT text image")
async def generate_brat(
    request: Request,
    text: str = Query(..., description="Text to be inserted into the BRAT image"),
    background: str | None = Query(None, description="Background color (e.g., #000000)"),
    color: str | None = Query(None, description="Text color (e.g., #FFFFFF)"),
):
    text = (text or "").strip()
    if not text:
        return JSONResponse(status_code=400, content={"error": "Text cannot be empty."})

    try:
        async with async_playwright() as p:
            browser = await p.chromium.launch(args=["--no-sandbox"])
            try:
                page = await _prepare_brat_page(browser)
                screenshot = await _capture_overlay(page, text, background, color, 0.5)
            finally:
                # Closing the browser also closes its contexts, on every exit path.
                await browser.close()

        filename = f"purrbits-{uuid.uuid4().hex[:8]}.png"
        filepath = os.path.join(OUTPUT_DIR, filename)
        with open(filepath, "wb") as f:
            f.write(screenshot)

        _schedule_deletion(filepath)

        base_url = str(request.base_url).rstrip("/")
        return {"status": 200, "URL": f"{base_url}/download/file/{filename}"}
    except _RenderError as e:
        return JSONResponse(status_code=500, content={"error": str(e)})
    except Exception as e:
        return JSONResponse(status_code=500, content={"error": f"Failed to generate image: {str(e)}"})


# GET /api/bratvid: captures one frame per growing word prefix of `text`,
# encodes the frames to MP4 with ffmpeg, schedules the file's deletion and
# returns its download URL. Responds 400 for blank text and 500 on failure.
# The per-request frame directory is always removed in `finally`.
# (Comments instead of a docstring for the same OpenAPI reason as above.)
@app.get("/api/bratvid", tags=["MAIN"], summary="Create animated video from BRAT text")
async def generate_brat_video(
    request: Request,
    text: str = Query(..., description="Sentence to be animated (space-separated)"),
    background: str | None = Query(None, description="Background color (e.g., #000000)"),
    color: str | None = Query(None, description="Text color (e.g., #FFFFFF)"),
):
    text = (text or "").strip()
    if not text:
        return JSONResponse(status_code=400, content={"error": "Text cannot be empty."})

    words = text.split()
    if not words:
        return JSONResponse(status_code=400, content={"error": "Text must contain at least one word."})

    temp_dir = _ensure_dir(os.path.join(TMP_DIR, str(uuid.uuid4())))

    try:
        async with async_playwright() as p:
            browser = await p.chromium.launch(args=["--no-sandbox"])
            try:
                page = await _prepare_brat_page(browser)
                # Frame k shows the first k + 1 words.
                for word_count in range(1, len(words) + 1):
                    partial_text = " ".join(words[:word_count])
                    screenshot = await _capture_overlay(
                        page, partial_text, background, color, 0.2
                    )
                    frame_path = os.path.join(temp_dir, f"frame{word_count - 1:03d}.png")
                    with open(frame_path, "wb") as f:
                        f.write(screenshot)
            finally:
                await browser.close()

        # Render video with ffmpeg
        output_filename = f"purrbits-{uuid.uuid4().hex[:8]}.mp4"
        output_path = os.path.join(OUTPUT_DIR, output_filename)

        ffmpeg_cmd = [
            "ffmpeg",
            "-y",
            "-framerate", "1.428",  # input rate: roughly 0.7 s per frame
            "-i", os.path.join(temp_dir, "frame%03d.png"),
            # Round both dimensions down to even values, then resample to 30 fps.
            "-vf", "scale=trunc(iw/2)*2:trunc(ih/2)*2,fps=30",
            "-c:v", "libx264",
            "-preset", "ultrafast",
            "-pix_fmt", "yuv420p",
            output_path,
        ]

        process = await asyncio.create_subprocess_exec(
            *ffmpeg_cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
        )
        _, stderr = await process.communicate()

        if process.returncode != 0:
            # errors="replace": ffmpeg output is not guaranteed to be valid UTF-8.
            return JSONResponse(
                status_code=500, content={"error": stderr.decode(errors="replace")}
            )

        _schedule_deletion(output_path)

        base_url = str(request.base_url).rstrip("/")
        return {"status": 200, "URL": f"{base_url}/download/file/{output_filename}"}
    except Exception as e:
        # Also covers _RenderError, whose message is returned verbatim.
        return JSONResponse(status_code=500, content={"error": str(e)})
    finally:
        shutil.rmtree(temp_dir, ignore_errors=True)


@app.get("/", summary="Root Endpoint", tags=["MISC"], description="Displaying the main page.")
async def root():
    # Resolve index.html relative to this module's location, not the CWD.
    html_path = os.path.join(os.path.dirname(__file__), "index.html")
    if os.path.exists(html_path):
        return FileResponse(html_path)
    return JSONResponse(status_code=404, content={"error": "index.html file not found"})


# GET /download/file/{filename}: serves a generated file from OUTPUT_DIR as
# application/octet-stream, or 404 if the name is not a plain file name or no
# such regular file exists.
@app.get("/download/file/{filename}", tags=["MISC"])
async def download_file(filename: str):
    not_found = JSONResponse(status_code=404, content={"error": "File not found"})

    # Only bare file names are valid; reject anything that could leave OUTPUT_DIR.
    if filename in {"", ".", ".."} or os.path.basename(filename) != filename:
        return not_found

    filepath = os.path.join(OUTPUT_DIR, filename)
    # isfile (not exists): a directory must not reach the file-serving path.
    if not os.path.isfile(filepath):
        return not_found

    # FileResponse sends the file in chunks instead of loading it into memory
    # with blocking I/O inside the event loop.
    return FileResponse(filepath, media_type="application/octet-stream")