from fastapi import FastAPI, Request, HTTPException from fastapi.responses import JSONResponse, PlainTextResponse, HTMLResponse from fastapi.middleware.cors import CORSMiddleware import httpx import json import markdown import asyncio from typing import Optional, Dict, Any import logging from urllib.parse import urljoin, urlparse import os # Setup logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) app = FastAPI( title="Universal Proxy Server", description="Proxy server yang mendukung GET/POST dengan response JSON, Markdown, dan HTML", version="1.0.0" ) # CORS middleware app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # Global HTTP client client = httpx.AsyncClient(timeout=30.0) def is_valid_url(url: str) -> bool: """Validasi URL""" try: result = urlparse(url) return all([result.scheme, result.netloc]) except Exception: return False def convert_to_markdown(data: Any) -> str: """Convert data to markdown format""" if isinstance(data, dict): md_content = "# Response Data\n\n" for key, value in data.items(): md_content += f"## {key}\n\n" if isinstance(value, (dict, list)): md_content += f"```json\n{json.dumps(value, indent=2)}\n```\n\n" else: md_content += f"{value}\n\n" return md_content elif isinstance(data, list): md_content = "# Response Data\n\n" for i, item in enumerate(data): md_content += f"## Item {i+1}\n\n" if isinstance(item, (dict, list)): md_content += f"```json\n{json.dumps(item, indent=2)}\n```\n\n" else: md_content += f"{item}\n\n" return md_content else: return f"# Response\n\n{str(data)}" def convert_to_html(data: Any) -> str: """Convert data to HTML format""" html_template = """ Proxy Response

Proxy Response

{content}
""" if isinstance(data, (dict, list)): content = json.dumps(data, indent=2) else: content = str(data) return html_template.format(content=content) @app.get("/") async def root(): """Root endpoint dengan dokumentasi""" return { "message": "Universal Proxy Server", "endpoints": { "GET": "/proxy/get?url=&format=", "POST": "/proxy/post?url=&format=", "Health": "/health", "Ping": "/ping (GET/HEAD)", "HEAD Health": "/health (HEAD)" }, "formats": ["json", "markdown", "html"], "example": "/proxy/get?url=https://jsonplaceholder.typicode.com/posts/1&format=json" } @app.get("/health") async def health_check(): """Health check endpoint""" return {"status": "healthy", "service": "proxy-server"} @app.head("/health") async def health_check_head(): """Health check endpoint for HEAD requests (Uptime Robot compatible)""" return PlainTextResponse("OK", status_code=200) @app.head("/ping") async def ping_head(): """Ping endpoint for HEAD requests (Uptime Robot monitoring)""" return PlainTextResponse("OK", status_code=200) @app.get("/ping") async def ping(): """Ping endpoint for uptime monitoring""" return { "status": "alive", "service": "proxy-server", "timestamp": asyncio.get_event_loop().time(), "uptime": "running" } @app.get("/proxy/get") async def proxy_get( url: str, format: str = "json", headers: Optional[str] = None ): """ Proxy GET request Args: url: Target URL format: Response format (json, markdown, html) headers: Optional headers as JSON string """ if not is_valid_url(url): raise HTTPException(status_code=400, detail="Invalid URL provided") try: # Parse headers if provided request_headers = {} if headers: try: request_headers = json.loads(headers) except json.JSONDecodeError: logger.warning("Invalid headers format, using default") # Add default headers request_headers.setdefault("User-Agent", "Universal-Proxy-Server/1.0") # Make request logger.info(f"Making GET request to: {url}") response = await client.get(url, headers=request_headers) # Parse response try: data = response.json() except Exception: data = {"content": response.text, "status_code": response.status_code} # Return based on format if format.lower() == "markdown": md_content = convert_to_markdown(data) return PlainTextResponse(md_content, media_type="text/markdown") elif format.lower() == "html": html_content = convert_to_html(data) return HTMLResponse(html_content) else: return JSONResponse({ "success": True, "status_code": response.status_code, "data": data, "headers": dict(response.headers) }) except httpx.RequestError as e: logger.error(f"Request error: {e}") raise HTTPException(status_code=500, detail=f"Request failed: {str(e)}") except Exception as e: logger.error(f"Unexpected error: {e}") raise HTTPException(status_code=500, detail=f"Internal error: {str(e)}") @app.post("/proxy/post") async def proxy_post( request: Request, url: str, format: str = "json", headers: Optional[str] = None ): """ Proxy POST request Args: url: Target URL format: Response format (json, markdown, html) headers: Optional headers as JSON string """ if not is_valid_url(url): raise HTTPException(status_code=400, detail="Invalid URL provided") try: # Get request body body = await request.body() # Parse headers if provided request_headers = {} if headers: try: request_headers = json.loads(headers) except json.JSONDecodeError: logger.warning("Invalid headers format, using default") # Add default headers request_headers.setdefault("User-Agent", "Universal-Proxy-Server/1.0") request_headers.setdefault("Content-Type", "application/json") # Make request logger.info(f"Making POST request to: {url}") response = await client.post(url, content=body, headers=request_headers) # Parse response try: data = response.json() except Exception: data = {"content": response.text, "status_code": response.status_code} # Return based on format if format.lower() == "markdown": md_content = convert_to_markdown(data) return PlainTextResponse(md_content, media_type="text/markdown") elif format.lower() == "html": html_content = convert_to_html(data) return HTMLResponse(html_content) else: return JSONResponse({ "success": True, "status_code": response.status_code, "data": data, "headers": dict(response.headers) }) except httpx.RequestError as e: logger.error(f"Request error: {e}") raise HTTPException(status_code=500, detail=f"Request failed: {str(e)}") except Exception as e: logger.error(f"Unexpected error: {e}") raise HTTPException(status_code=500, detail=f"Internal error: {str(e)}") @app.on_event("shutdown") async def shutdown_event(): """Cleanup on shutdown""" await client.aclose() if __name__ == "__main__": import uvicorn port = int(os.getenv("PORT", 7860)) uvicorn.run(app, host="0.0.0.0", port=port)