|
|
|
|
|
from collections.abc import Sequence |
|
|
from typing import Any |
|
|
|
|
|
|
|
|
from composio.client.collections import AppAuthScheme |
|
|
from composio.client.exceptions import NoItemsFound |
|
|
from composio_langchain import Action, App, ComposioToolSet |
|
|
from langchain_core.tools import Tool |
|
|
from loguru import logger |
|
|
from typing_extensions import override |
|
|
|
|
|
|
|
|
from langflow.base.langchain_utilities.model import LCToolComponent |
|
|
from langflow.inputs import DropdownInput, LinkInput, MessageTextInput, MultiselectInput, SecretStrInput, StrInput |
|
|
from langflow.io import Output |
|
|
|
|
|
|
|
|
class ComposioAPIComponent(LCToolComponent):
    """Langflow component that exposes Composio actions as LangChain tools.

    The component lets the user pick a Composio app, walks them through the
    authentication flow matching the app's auth mode (API key, basic auth or
    OAuth2) by toggling the dynamic inputs below, and finally builds the
    selected actions as tools via ``ComposioToolSet.get_tools``.
    """

    display_name: str = "Composio Tools"
    description: str = "Use Composio toolset to run actions with your agent"
    name = "ComposioAPI"
    icon = "Composio"
    documentation: str = "https://docs.composio.dev"

    inputs = [
        # Basic configuration inputs.
        MessageTextInput(name="entity_id", display_name="Entity ID", value="default", advanced=True),
        SecretStrInput(
            name="api_key",
            display_name="Composio API Key",
            required=True,
            info="Refer to https://docs.composio.dev/faq/api_key/api_key",
        ),
        DropdownInput(
            name="app_names",
            display_name="App Name",
            options=list(App.__annotations__),
            value="",
            info="The app name to use. Please refresh after selecting app name",
            refresh_button=True,
        ),
        # Dynamic inputs: declared hidden (show=False) and revealed by
        # update_build_config depending on the app's auth mode / status.
        SecretStrInput(
            name="app_credentials",
            display_name="App Credentials",
            required=False,
            dynamic=True,
            show=False,
            info="Credentials for app authentication (API Key, Password, etc)",
        ),
        MessageTextInput(
            name="username",
            display_name="Username",
            required=False,
            dynamic=True,
            show=False,
            info="Username for Basic authentication",
        ),
        LinkInput(
            name="auth_link",
            display_name="Authentication Link",
            value="",
            info="Click to authenticate with OAuth2",
            dynamic=True,
            show=False,
            placeholder="Click to authenticate",
        ),
        StrInput(
            name="auth_status",
            display_name="Auth Status",
            value="Not Connected",
            info="Current authentication status",
            dynamic=True,
            show=False,
        ),
        MultiselectInput(
            name="action_names",
            display_name="Actions to use",
            required=True,
            options=[],
            value=[],
            info="The actions to pass to agent to execute",
            dynamic=True,
            show=False,
        ),
    ]

    outputs = [
        Output(name="tools", display_name="Tools", method="build_tool"),
    ]

    def _check_for_authorization(self, app: str) -> str:
        """Check whether ``app`` is connected for the configured entity.

        Args:
            app (str): The app name to check authorization for.

        Returns:
            str: ``"<app> CONNECTED"`` when a connection exists, otherwise the
                status message or redirect URL produced by
                ``_handle_auth_by_scheme``, or ``"Error checking authorization"``
                when the connection lookup fails unexpectedly.
        """
        toolset = self._build_wrapper()
        entity = toolset.client.get_entity(id=self.entity_id)
        try:
            entity.get_connection(app=app)
        except NoItemsFound:
            # No connection yet: start the flow matching the app's auth scheme.
            # _handle_auth_by_scheme copes with a missing (None) scheme, so this
            # handler cannot leak an AttributeError past the sibling handler.
            auth_scheme = self._get_auth_scheme(app)
            return self._handle_auth_by_scheme(entity, app, auth_scheme)
        except Exception:
            logger.exception("Authorization error")
            return "Error checking authorization"
        else:
            return f"{app} CONNECTED"

    def _get_auth_scheme(self, app_name: str) -> AppAuthScheme | None:
        """Get the primary auth scheme for an app.

        Args:
            app_name (str): The name of the app to get auth scheme for. It is
                lower-cased before being passed to the toolset.

        Returns:
            AppAuthScheme | None: The auth scheme details, or ``None`` when the
                lookup raised (the error is logged with its traceback).
        """
        toolset = self._build_wrapper()
        try:
            return toolset.get_auth_scheme_for_app(app=app_name.lower())
        except Exception:
            logger.exception(f"Error getting auth scheme for {app_name}")
            return None

    def _handle_auth_by_scheme(self, entity: Any, app: str, auth_scheme: AppAuthScheme | None) -> str:
        """Handle authentication based on the auth scheme.

        Args:
            entity (Any): The entity instance.
            app (str): The app name.
            auth_scheme (AppAuthScheme | None): The auth scheme details; ``None``
                when the scheme could not be determined.

        Returns:
            str: ``"<app> CONNECTED"`` if a connection exists or was created, the
                OAuth2 redirect URL, or a human-readable status / error message.
        """
        try:
            entity.get_connection(app=app)
        except NoItemsFound:
            # Not connected yet. The scheme is only needed on this path, so a
            # missing scheme never prevents reporting an existing connection.
            if auth_scheme is None:
                return f"Error: could not determine auth scheme for {app}"
            return self._connect_by_auth_mode(entity, app, auth_scheme.auth_mode)
        except Exception as e:
            logger.error(f"Error checking connection status: {e}")
            return f"Error: {e!s}"
        else:
            return f"{app} CONNECTED"

    def _connect_by_auth_mode(self, entity: Any, app: str, auth_mode: Any) -> str:
        """Try to create a connection for ``app`` using the given auth mode.

        Args:
            entity (Any): The entity instance used to initiate the connection.
            app (str): The app name.
            auth_mode (Any): Auth mode of the app; ``"API_KEY"``, ``"BASIC"`` and
                ``"OAUTH2"`` are handled, anything else is reported as
                unsupported.

        Returns:
            str: Connection status, a prompt for missing credentials, the OAuth2
                redirect URL, or an error message.
        """
        # The credential inputs are dynamic, so tolerate them being absent.
        credentials = getattr(self, "app_credentials", None)

        if auth_mode == "API_KEY":
            if not credentials:
                return "Enter API Key"
            try:
                entity.initiate_connection(
                    app_name=app,
                    auth_mode="API_KEY",
                    auth_config={"api_key": credentials},
                    use_composio_auth=False,
                    force_new_integration=True,
                )
            except Exception as e:
                logger.error(f"Error connecting with API Key: {e}")
                return "Invalid API Key"
            else:
                return f"{app} CONNECTED"

        if auth_mode == "BASIC":
            username = getattr(self, "username", None)
            if not (username and credentials):
                return "Enter Username and Password"
            try:
                entity.initiate_connection(
                    app_name=app,
                    auth_mode="BASIC",
                    auth_config={"username": username, "password": credentials},
                    use_composio_auth=False,
                    force_new_integration=True,
                )
            except Exception as e:
                logger.error(f"Error connecting with Basic Auth: {e}")
                return "Invalid credentials"
            else:
                return f"{app} CONNECTED"

        if auth_mode == "OAUTH2":
            try:
                return self._initiate_default_connection(entity, app)
            except Exception as e:
                logger.error(f"Error initiating OAuth2: {e}")
                return "OAuth2 initialization failed"

        return "Unsupported auth mode"

    def _initiate_default_connection(self, entity: Any, app: str) -> str:
        """Initiate a Composio-managed connection and return its redirect URL.

        Args:
            entity (Any): The entity instance used to initiate the connection.
            app (str): The app name.

        Returns:
            str: The ``redirectUrl`` of the connection request.
        """
        connection = entity.initiate_connection(app_name=app, use_composio_auth=True, force_new_integration=True)
        return connection.redirectUrl

    def _get_connected_app_names_for_entity(self) -> list[str]:
        """Return the distinct ``appUniqueId`` values of the entity's connections.

        Returns:
            list[str]: De-duplicated app identifiers, in no particular order.
        """
        toolset = self._build_wrapper()
        connections = toolset.client.get_entity(id=self.entity_id).get_connections()
        return list({connection.appUniqueId for connection in connections})

    def _get_normalized_app_name(self) -> str:
        """Get app name without connection status suffix.

        Returns:
            str: ``self.app_names`` with every ``" ✅"`` and ``"_connected"``
                occurrence removed.
        """
        return self.app_names.replace(" ✅", "").replace("_connected", "")

    @staticmethod
    def _set_field_visibility(build_config: dict, field: str, *, visible: bool) -> None:
        """Show or hide ``field`` by setting its ``show`` / ``advanced`` flags."""
        build_config[field]["show"] = visible
        build_config[field]["advanced"] = not visible

    @override
    def update_build_config(self, build_config: dict, field_value: Any, field_name: str | None = None) -> dict:
        """Refresh the dynamic inputs after a field change.

        Dynamic fields are first shown or hidden depending on whether they hold
        a value. When ``app_names`` changed and an API key is set, the
        connection state of the selected app is checked and the inputs needed
        for its auth mode (or the action selector, when connected) are revealed.

        Args:
            build_config (dict): The build configuration, mutated in place.
            field_value (Any): New value of the changed field (unused; the app
                name is read from ``self.app_names``).
            field_name (str | None): Name of the changed field.

        Returns:
            dict: The updated ``build_config``.
        """
        for field in ("app_credentials", "username", "auth_link", "auth_status", "action_names"):
            if field not in build_config:
                continue
            # .get: a field without a "value" key is treated as empty instead of
            # raising KeyError outside of any error handling.
            value = build_config[field].get("value")
            self._set_field_visibility(build_config, field, visible=value is not None and value != "")

        if field_name == "app_names" and hasattr(self, "api_key") and self.api_key != "":
            app_name = self.app_names
            try:
                toolset = self._build_wrapper()
                entity = toolset.client.get_entity(id=self.entity_id)

                # The status is always visible once an app has been selected.
                self._set_field_visibility(build_config, "auth_status", visible=True)

                try:
                    entity.get_connection(app=app_name)
                    build_config["auth_status"]["value"] = "✅"
                    build_config["auth_link"]["show"] = False
                    # Connected apps get the action selector.
                    self._set_field_visibility(build_config, "action_names", visible=True)
                except NoItemsFound:
                    # Not connected: reveal the inputs the auth mode requires.
                    auth_scheme = self._get_auth_scheme(app_name)
                    if auth_scheme is None:
                        # Lookup failed (already logged); report it clearly
                        # rather than failing on a None attribute access.
                        build_config["auth_status"]["value"] = (
                            f"Error: could not determine auth scheme for {app_name}"
                        )
                        return build_config
                    auth_mode = auth_scheme.auth_mode
                    logger.info(f"Auth mode for {app_name}: {auth_mode}")

                    if auth_mode == "API_KEY":
                        self._set_field_visibility(build_config, "app_credentials", visible=True)
                        build_config["app_credentials"]["display_name"] = "API Key"
                        build_config["auth_status"]["value"] = "Enter API Key"
                    elif auth_mode == "BASIC":
                        self._set_field_visibility(build_config, "username", visible=True)
                        self._set_field_visibility(build_config, "app_credentials", visible=True)
                        build_config["app_credentials"]["display_name"] = "Password"
                        build_config["auth_status"]["value"] = "Enter Username and Password"
                    elif auth_mode == "OAUTH2":
                        self._set_field_visibility(build_config, "auth_link", visible=True)
                        auth_url = self._initiate_default_connection(entity, app_name)
                        build_config["auth_link"]["value"] = auth_url
                        build_config["auth_status"]["value"] = "Click link to authenticate"
                    else:
                        build_config["auth_status"]["value"] = "Unsupported auth mode"

                # Offer the app's actions once it is connected.
                if build_config["auth_status"]["value"] == "✅":
                    prefix = app_name.lower() + "_"
                    app_action_names = [
                        action_name for action_name in Action.__annotations__ if action_name.lower().startswith(prefix)
                    ]
                    # Only reset the selection when the option list changed.
                    if build_config["action_names"]["options"] != app_action_names:
                        build_config["action_names"]["options"] = app_action_names
                        build_config["action_names"]["value"] = [app_action_names[0]] if app_action_names else [""]

            except Exception as e:
                logger.error(f"Error checking auth status: {e}, app: {app_name}")
                build_config["auth_status"]["value"] = f"Error: {e!s}"

        return build_config

    def build_tool(self) -> Sequence[Tool]:
        """Build Composio tools based on selected actions.

        Returns:
            Sequence[Tool]: List of configured Composio tools.
        """
        composio_toolset = self._build_wrapper()
        return composio_toolset.get_tools(actions=self.action_names)

    def _build_wrapper(self) -> ComposioToolSet:
        """Build the Composio toolset wrapper.

        Returns:
            ComposioToolSet: The initialized toolset.

        Raises:
            ValueError: If the API key is not found or invalid.
        """
        try:
            if not self.api_key:
                msg = "Composio API Key is required"
                raise ValueError(msg)
            return ComposioToolSet(api_key=self.api_key)
        except ValueError as e:
            # Both the missing-key check above and a ValueError raised while
            # constructing the toolset end up as one user-facing message.
            logger.error(f"Error building Composio wrapper: {e}")
            msg = "Please provide a valid Composio API Key in the component settings"
            raise ValueError(msg) from e
|
|
|