Spaces:
Sleeping
Sleeping
File size: 5,410 Bytes
36dd04b |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 |
"""
HTTP request handling features
"""
import json
import logging
from datetime import datetime
from browser.driver import get_driver, cleanup_driver
from config.settings import MAX_REQUEST_HISTORY
logger = logging.getLogger(__name__)
# Request history storage
_request_history = []
def make_http_request(url: str, method: str = "GET", headers: str = "", data: str = "", use_persistent: bool = False) -> str:
"""Make HTTP request with custom method, headers, and data (like Postman)"""
global _request_history
driver = None
try:
driver = get_driver(None, use_persistent) # Don't navigate yet
# Parse headers if provided
headers_dict = {}
if headers:
try:
headers_dict = json.loads(headers)
except:
# Try to parse as key:value lines
for line in headers.strip().split('\n'):
if ':' in line:
key, value = line.split(':', 1)
headers_dict[key.strip()] = value.strip()
# Build the JavaScript to make the request
js_code = f"""
async function makeRequest() {{
const options = {{
method: '{method}',
headers: {json.dumps(headers_dict)},
}};
if (['{method}'].includes('POST') || ['{method}'].includes('PUT') || ['{method}'].includes('PATCH')) {{
options.body = {json.dumps(data) if data else '""'};
}}
try {{
const startTime = performance.now();
const response = await fetch('{url}', options);
const endTime = performance.now();
const responseHeaders = {{}};
response.headers.forEach((value, key) => {{
responseHeaders[key] = value;
}});
let responseBody;
const contentType = response.headers.get('content-type');
if (contentType && contentType.includes('application/json')) {{
responseBody = await response.json();
}} else {{
responseBody = await response.text();
}}
return {{
status: response.status,
statusText: response.statusText,
headers: responseHeaders,
body: responseBody,
url: response.url,
type: response.type,
redirected: response.redirected,
responseTime: Math.round(endTime - startTime)
}};
}} catch (error) {{
return {{
error: error.message,
type: 'NetworkError'
}};
}}
}}
return makeRequest();
"""
# Navigate to a blank page first to avoid CORS issues
driver.get("about:blank")
# Execute the request
result = driver.execute_script(js_code)
# If result is a promise, wait for it
if isinstance(result, dict) and 'then' in str(type(result)):
result = driver.execute_async_script("""
const callback = arguments[arguments.length - 1];
arguments[0].then(callback).catch(e => callback({error: e.toString()}));
""", result)
# Save to history
_request_history.append({
"timestamp": datetime.now().isoformat(),
"method": method,
"url": url,
"headers": headers_dict,
"data": data,
"response": result
})
# Keep only last N requests
if len(_request_history) > MAX_REQUEST_HISTORY:
_request_history = _request_history[-MAX_REQUEST_HISTORY:]
return json.dumps(result, indent=2, default=str)
except Exception as e:
logger.error(f"Error in make_http_request: {e}")
return f"Error: {e}"
finally:
cleanup_driver(driver, use_persistent)
def export_as_curl(method: str, url: str, headers: str = "", data: str = "") -> str:
"""Convert HTTP request to cURL command"""
try:
curl_cmd = f"curl -X {method}"
# Add headers
if headers:
headers_dict = {}
try:
headers_dict = json.loads(headers)
except:
for line in headers.strip().split('\n'):
if ':' in line:
key, value = line.split(':', 1)
headers_dict[key.strip()] = value.strip()
for key, value in headers_dict.items():
curl_cmd += f' -H "{key}: {value}"'
# Add data
if data and method in ['POST', 'PUT', 'PATCH']:
curl_cmd += f" -d '{data}'"
# Add URL
curl_cmd += f' "{url}"'
return curl_cmd
except Exception as e:
return f"Error generating cURL command: {e}"
def get_request_history() -> str:
"""Get history of HTTP requests"""
if not _request_history:
return "No requests in history"
return json.dumps(_request_history, indent=2, default=str) |