|
|
import json |
|
|
from typing import Any |
|
|
from urllib.parse import urljoin |
|
|
|
|
|
import httpx |
|
|
from langchain_core.tools import StructuredTool, ToolException |
|
|
from pydantic import BaseModel |
|
|
from pydantic.v1 import Field |
|
|
|
|
|
from langflow.base.langchain_utilities.model import LCToolComponent |
|
|
from langflow.field_typing import Tool |
|
|
from langflow.inputs import IntInput, MultilineInput, NestedDictInput, SecretStrInput, StrInput |
|
|
from langflow.schema import Data |
|
|
|
|
|
|
|
|
class GleanSearchAPISchema(BaseModel):
    # Argument schema handed to the structured Glean search tool: it names the
    # three arguments (query, page_size, request_options) the tool accepts.
    #
    # NOTE(review): `Field` is imported from `pydantic.v1` while `BaseModel`
    # is imported from `pydantic` -- confirm that this combination behaves as
    # intended with the installed pydantic version.

    # Search text; declared with the Ellipsis default, i.e. no default value.
    query: str = Field(
        default=...,
        description="The search query",
    )

    # Maximum number of results to request; defaults to 10.
    page_size: int = Field(
        default=10,
        description="Maximum number of results to return",
    )

    # Optional extra request options; a fresh empty dict when omitted.
    request_options: dict[str, Any] | None = Field(
        default_factory=dict,
        description="Request Options",
    )
|
|
|
|
|
|
|
|
class GleanAPIWrapper(BaseModel):
    """Wrapper around Glean API."""

    # Base URL of the Glean API; the "search" path segment is joined onto it.
    glean_api_url: str
    # Token sent as a Bearer credential in the Authorization header.
    glean_access_token: str
    # Value sent in the X-Scio-ActAs request header.
    act_as: str = "langflow-component@datastax.com"

    def _prepare_request(
        self,
        query: str,
        page_size: int = 10,
        request_options: dict[str, Any] | None = None,
    ) -> dict:
        """Assemble the URL, headers and JSON payload for a search request.

        Args:
            query: The search query text.
            page_size: Value sent as ``pageSize`` in the payload.
            request_options: Value sent as ``requestOptions`` in the payload
                (passed through unchanged, including ``None``).

        Returns:
            A dict with the keys ``"url"``, ``"headers"`` and ``"payload"``.
        """
        base_url = self.glean_api_url
        # Without a trailing slash urljoin would replace the last path segment
        # of the base URL instead of appending "search" to it.
        if not base_url.endswith("/"):
            base_url += "/"

        return {
            "url": urljoin(base_url, "search"),
            "headers": {
                "Authorization": f"Bearer {self.glean_access_token}",
                "X-Scio-ActAs": self.act_as,
            },
            "payload": {
                "query": query,
                "pageSize": page_size,
                "requestOptions": request_options,
            },
        }

    def results(self, query: str, **kwargs: Any) -> list[dict[str, Any]]:
        """Return the raw search results for ``query``.

        Args:
            query: The search query text.
            **kwargs: Forwarded to ``_prepare_request`` (``page_size``,
                ``request_options``).

        Raises:
            AssertionError: If the search produced no results.
        """
        results = self._search_api_results(query, **kwargs)

        # A truthiness test (rather than len() == 0) also covers a None value,
        # so the caller gets the intended message instead of a TypeError.
        if not results:
            msg = "No good Glean Search Result was found"
            raise AssertionError(msg)

        return results

    def run(self, query: str, **kwargs: Any) -> list[dict[str, Any]]:
        """Search Glean and make sure titled results carry a snippet text.

        Args:
            query: The search query text.
            **kwargs: Forwarded to ``results``.

        Returns:
            The result dicts; every result that has a ``"title"`` also has a
            non-empty ``"snippets"`` list whose first entry has a ``"text"``.

        Raises:
            ToolException: Wraps any exception raised while searching or
                post-processing (including the "no results" error).
        """
        try:
            results = self.results(query, **kwargs)
            processed_results = [self._ensure_snippet_text(result) for result in results]
        except Exception as e:
            error_message = f"Error in Glean Search API: {e!s}"
            raise ToolException(error_message) from e

        return processed_results

    @staticmethod
    def _ensure_snippet_text(result: dict[str, Any]) -> dict[str, Any]:
        """Give a titled result a first snippet with a ``"text"`` entry (in place)."""
        if "title" not in result:
            # Nothing to fall back on; results without a title pass through unchanged.
            return result

        title = result["title"]
        snippets = result.get("snippets")
        if not snippets:
            # Covers a missing key as well as a null or empty list, which
            # previously made the [0] lookup below fail.
            snippets = [{"snippet": {"text": title}}]
        result["snippets"] = snippets

        first_snippet = snippets[0]
        if "text" not in first_snippet:
            first_snippet["text"] = title

        return result

    def _search_api_results(self, query: str, **kwargs: Any) -> list[dict[str, Any]]:
        """POST the search request and return the ``"results"`` list of the response.

        Raises:
            httpx.HTTPStatusError: Via ``raise_for_status`` on an error status.
        """
        request_details = self._prepare_request(query, **kwargs)

        response = httpx.post(
            request_details["url"],
            json=request_details["payload"],
            headers=request_details["headers"],
        )
        response.raise_for_status()

        # `or []` keeps the declared list return type even when the response
        # carries an explicit null for "results".
        return response.json().get("results") or []

    @staticmethod
    def _result_as_string(result: dict) -> str:
        """Serialize ``result`` as JSON indented by four spaces."""
        return json.dumps(result, indent=4)
|
|
|
|
|
|
|
|
class GleanSearchAPIComponent(LCToolComponent):
    # Component that exposes the Glean search call both as a structured tool
    # (build_tool) and as a direct run returning Data objects (run_model).
    display_name = "Glean Search API"
    description = "Call Glean Search API"
    name = "GleanAPI"
    icon = "Glean"

    inputs = [
        StrInput(
            name="glean_api_url",
            display_name="Glean API URL",
            required=True,
        ),
        SecretStrInput(name="glean_access_token", display_name="Glean Access Token", required=True),
        MultilineInput(name="query", display_name="Query", required=True),
        IntInput(name="page_size", display_name="Page Size", value=10),
        NestedDictInput(name="request_options", display_name="Request Options", required=False),
    ]

    def build_tool(self) -> Tool:
        """Create the ``glean_search_api`` structured tool bound to this component's credentials."""
        wrapper = self._build_wrapper(
            glean_api_url=self.glean_api_url,
            glean_access_token=self.glean_access_token,
        )

        tool = StructuredTool.from_function(
            name="glean_search_api",
            description="Search Glean for relevant results.",
            func=wrapper.run,
            args_schema=GleanSearchAPISchema,
        )

        self.status = "Glean Search API Tool for Langchain"

        return tool

    def run_model(self) -> list[Data]:
        """Run the search with the component inputs and wrap each result in ``Data``.

        Returns:
            One ``Data`` per result, with the result dict as ``data`` and its
            first snippet text (or a fallback, see ``_snippet_text``) as ``text``.
        """
        tool = self.build_tool()

        results = tool.run(
            {
                "query": self.query,
                "page_size": self.page_size,
                "request_options": self.request_options,
            }
        )

        data = [Data(data=result, text=self._snippet_text(result)) for result in results]
        self.status = data

        return data

    @staticmethod
    def _snippet_text(result: dict[str, Any]) -> Any:
        """Return the first snippet's text, else the result title, else ``""``.

        The fallback matters because results without a "title" are returned
        by the wrapper unchanged and may therefore carry no snippets at all;
        indexing them directly would raise KeyError/IndexError.
        """
        snippets = result.get("snippets")
        if snippets:
            first_snippet = snippets[0]
            if isinstance(first_snippet, dict) and "text" in first_snippet:
                return first_snippet["text"]
        return result.get("title", "")

    def _build_wrapper(
        self,
        glean_api_url: str,
        glean_access_token: str,
    ) -> GleanAPIWrapper:
        """Instantiate the API wrapper for the given URL and access token."""
        return GleanAPIWrapper(
            glean_api_url=glean_api_url,
            glean_access_token=glean_access_token,
        )
|
|
|