truthtaicom's picture
Upload folder using huggingface_hub
4b0794d verified
import asyncio
import json
import mimetypes
import re
import tempfile
from datetime import datetime
from pathlib import Path
from typing import Any
from urllib.parse import parse_qsl, urlencode, urlparse, urlunparse
from zoneinfo import ZoneInfo
import httpx
import validators
from langflow.base.curl.parse import parse_context
from langflow.custom import Component
from langflow.io import BoolInput, DataInput, DropdownInput, IntInput, MessageTextInput, NestedDictInput, Output
from langflow.schema import Data
from langflow.schema.dotdict import dotdict
class APIRequestComponent(Component):
display_name = "API Request"
description = (
"This component allows you to make HTTP requests to one or more URLs. "
"You can provide headers and body as either dictionaries or Data objects. "
"Additionally, you can append query parameters to the URLs.\n\n"
"**Note:** Check advanced options for more settings."
)
icon = "Globe"
name = "APIRequest"
inputs = [
MessageTextInput(
name="urls",
display_name="URLs",
list=True,
info="Enter one or more URLs, separated by commas.",
),
MessageTextInput(
name="curl",
display_name="cURL",
info="Paste a curl command to populate the fields. "
"This will fill in the dictionary fields for headers and body.",
advanced=False,
refresh_button=True,
real_time_refresh=True,
tool_mode=True,
),
DropdownInput(
name="method",
display_name="Method",
options=["GET", "POST", "PATCH", "PUT"],
value="GET",
info="The HTTP method to use (GET, POST, PATCH, PUT).",
),
NestedDictInput(
name="headers",
display_name="Headers",
info="The headers to send with the request as a dictionary. This is populated when using the CURL field.",
input_types=["Data"],
),
NestedDictInput(
name="body",
display_name="Body",
info="The body to send with the request as a dictionary (for POST, PATCH, PUT). "
"This is populated when using the CURL field.",
input_types=["Data"],
),
DataInput(
name="query_params",
display_name="Query Parameters",
info="The query parameters to append to the URL.",
tool_mode=True,
),
IntInput(
name="timeout",
display_name="Timeout",
value=5,
info="The timeout to use for the request.",
),
BoolInput(
name="follow_redirects",
display_name="Follow Redirects",
value=True,
info="Whether to follow http redirects.",
advanced=True,
),
BoolInput(
name="save_to_file",
display_name="Save to File",
value=False,
info="Save the API response to a temporary file",
advanced=True,
),
BoolInput(
name="include_httpx_metadata",
display_name="Include HTTPx Metadata",
value=False,
info=(
"Include properties such as headers, status_code, response_headers, "
"and redirection_history in the output."
),
advanced=True,
),
]
outputs = [
Output(display_name="Data", name="data", method="make_requests"),
]
def parse_curl(self, curl: str, build_config: dotdict) -> dotdict:
try:
parsed = parse_context(curl)
build_config["urls"]["value"] = [parsed.url]
build_config["method"]["value"] = parsed.method.upper()
build_config["headers"]["value"] = dict(parsed.headers)
if parsed.data:
try:
json_data = json.loads(parsed.data)
build_config["body"]["value"] = json_data
except json.JSONDecodeError:
self.log("Error decoding JSON data")
else:
build_config["body"]["value"] = {}
except Exception as exc:
msg = f"Error parsing curl: {exc}"
self.log(msg)
raise ValueError(msg) from exc
return build_config
def update_build_config(self, build_config: dotdict, field_value: Any, field_name: str | None = None):
if field_name == "curl" and field_value:
build_config = self.parse_curl(field_value, build_config)
return build_config
async def make_request(
self,
client: httpx.AsyncClient,
method: str,
url: str,
headers: dict | None = None,
body: dict | None = None,
timeout: int = 5,
*,
follow_redirects: bool = True,
save_to_file: bool = False,
include_httpx_metadata: bool = False,
) -> Data:
method = method.upper()
if method not in {"GET", "POST", "PATCH", "PUT", "DELETE"}:
msg = f"Unsupported method: {method}"
raise ValueError(msg)
if isinstance(body, str) and body:
try:
body = json.loads(body)
except Exception as e:
msg = f"Error decoding JSON data: {e}"
self.log.exception(msg)
body = None
raise ValueError(msg) from e
data = body or None
redirection_history = []
try:
response = await client.request(
method,
url,
headers=headers,
json=data,
timeout=timeout,
follow_redirects=follow_redirects,
)
redirection_history = [
{"url": str(redirect.url), "status_code": redirect.status_code} for redirect in response.history
]
if response.is_redirect:
redirection_history.append({"url": str(response.url), "status_code": response.status_code})
is_binary, file_path = self._response_info(response, with_file_path=save_to_file)
response_headers = self._headers_to_dict(response.headers)
metadata: dict[str, Any] = {
"source": url,
}
if save_to_file:
mode = "wb" if is_binary else "w"
encoding = response.encoding if mode == "w" else None
if file_path:
with file_path.open(mode, encoding=encoding) as f:
f.write(response.content if is_binary else response.text)
if include_httpx_metadata:
metadata.update(
{
"file_path": str(file_path),
"headers": headers,
"status_code": response.status_code,
"response_headers": response_headers,
**({"redirection_history": redirection_history} if redirection_history else {}),
}
)
return Data(data=metadata)
# Populate result when not saving to a file
if is_binary:
result = response.content
else:
try:
result = response.json()
except Exception: # noqa: BLE001
self.log("Error decoding JSON response")
result = response.text.encode("utf-8")
# Add result to metadata
metadata.update({"result": result})
# Add metadata to the output
if include_httpx_metadata:
metadata.update(
{
"headers": headers,
"status_code": response.status_code,
"response_headers": response_headers,
**({"redirection_history": redirection_history} if redirection_history else {}),
}
)
return Data(data=metadata)
except httpx.TimeoutException:
return Data(
data={
"source": url,
"headers": headers,
"status_code": 408,
"error": "Request timed out",
},
)
except Exception as exc: # noqa: BLE001
self.log(f"Error making request to {url}")
return Data(
data={
"source": url,
"headers": headers,
"status_code": 500,
"error": str(exc),
**({"redirection_history": redirection_history} if redirection_history else {}),
},
)
def add_query_params(self, url: str, params: dict) -> str:
url_parts = list(urlparse(url))
query = dict(parse_qsl(url_parts[4]))
query.update(params)
url_parts[4] = urlencode(query)
return urlunparse(url_parts)
async def make_requests(self) -> list[Data]:
method = self.method
urls = [url.strip() for url in self.urls if url.strip()]
curl = self.curl
headers = self.headers or {}
body = self.body or {}
timeout = self.timeout
follow_redirects = self.follow_redirects
save_to_file = self.save_to_file
include_httpx_metadata = self.include_httpx_metadata
invalid_urls = [url for url in urls if not validators.url(url)]
if invalid_urls:
msg = f"Invalid URLs provided: {invalid_urls}"
raise ValueError(msg)
if isinstance(self.query_params, str):
query_params = dict(parse_qsl(self.query_params))
else:
query_params = self.query_params.data if self.query_params else {}
if curl:
self._build_config = self.parse_curl(curl, dotdict())
if isinstance(headers, Data):
headers = headers.data
if isinstance(body, Data):
body = body.data
bodies = [body] * len(urls)
urls = [self.add_query_params(url, query_params) for url in urls]
async with httpx.AsyncClient() as client:
results = await asyncio.gather(
*[
self.make_request(
client,
method,
u,
headers,
rec,
timeout,
follow_redirects=follow_redirects,
save_to_file=save_to_file,
include_httpx_metadata=include_httpx_metadata,
)
for u, rec in zip(urls, bodies, strict=True)
]
)
self.status = results
return results
def _response_info(self, response: httpx.Response, *, with_file_path: bool = False) -> tuple[bool, Path | None]:
"""Determine the file path and whether the response content is binary.
Args:
response (Response): The HTTP response object.
with_file_path (bool): Whether to save the response content to a file.
Returns:
Tuple[bool, Path | None]:
A tuple containing a boolean indicating if the content is binary and the full file path (if applicable).
"""
# Determine if the content is binary
content_type = response.headers.get("Content-Type", "")
is_binary = "application/octet-stream" in content_type or "application/binary" in content_type
if not with_file_path:
return is_binary, None
# Step 1: Set up a subdirectory for the component in the OS temp directory
component_temp_dir = Path(tempfile.gettempdir()) / self.__class__.__name__
component_temp_dir.mkdir(parents=True, exist_ok=True)
# Step 2: Extract filename from Content-Disposition
filename = None
if "Content-Disposition" in response.headers:
content_disposition = response.headers["Content-Disposition"]
filename_match = re.search(r'filename="(.+?)"', content_disposition)
if not filename_match: # Try to match RFC 5987 style
filename_match = re.search(r"filename\*=(?:UTF-8'')?(.+)", content_disposition)
if filename_match:
extracted_filename = filename_match.group(1)
# Ensure the filename is unique
if (component_temp_dir / extracted_filename).exists():
timestamp = datetime.now(ZoneInfo("UTC")).strftime("%Y%m%d%H%M%S%f")
filename = f"{timestamp}-{extracted_filename}"
else:
filename = extracted_filename
# Step 3: Infer file extension or use part of the request URL if no filename
if not filename:
# Extract the last segment of the URL path
url_path = urlparse(str(response.request.url)).path
base_name = Path(url_path).name # Get the last segment of the path
if not base_name: # If the path ends with a slash or is empty
base_name = "response"
# Infer file extension
extension = mimetypes.guess_extension(content_type.split(";")[0]) if content_type else None
if not extension:
extension = ".bin" if is_binary else ".txt" # Default extensions
# Combine the base name with timestamp and extension
timestamp = datetime.now(ZoneInfo("UTC")).strftime("%Y%m%d%H%M%S%f")
filename = f"{timestamp}-{base_name}{extension}"
# Step 4: Define the full file path
file_path = component_temp_dir / filename
return is_binary, file_path
def _headers_to_dict(self, headers: httpx.Headers) -> dict[str, str]:
"""Convert HTTP headers to a dictionary with lowercased keys."""
return {k.lower(): v for k, v in headers.items()}