Spaces:
Running
Running
| # Standard library imports | |
| from collections.abc import Sequence | |
| from typing import Any | |
| # Third-party imports | |
| from composio.client.collections import AppAuthScheme | |
| from composio.client.exceptions import NoItemsFound | |
| from composio_langchain import Action, App, ComposioToolSet | |
| from langchain_core.tools import Tool | |
| from loguru import logger | |
| from typing_extensions import override | |
| # Local imports | |
| from langflow.base.langchain_utilities.model import LCToolComponent | |
| from langflow.inputs import DropdownInput, LinkInput, MessageTextInput, MultiselectInput, SecretStrInput, StrInput | |
| from langflow.io import Output | |
class ComposioAPIComponent(LCToolComponent):
    """Langflow tool component that exposes Composio actions as LangChain tools.

    The component lets the user pick a Composio app, surfaces the inputs needed
    for that app's auth mode (API key, basic auth or OAuth2) and, once the
    entity has a connection for the app, offers the app's actions for selection.
    """

    display_name: str = "Composio Tools"
    description: str = "Use Composio toolset to run actions with your agent"
    name = "ComposioAPI"
    icon = "Composio"
    documentation: str = "https://docs.composio.dev"

    inputs = [
        # Basic configuration inputs
        MessageTextInput(name="entity_id", display_name="Entity ID", value="default", advanced=True),
        SecretStrInput(
            name="api_key",
            display_name="Composio API Key",
            required=True,
            info="Refer to https://docs.composio.dev/faq/api_key/api_key",
        ),
        DropdownInput(
            name="app_names",
            display_name="App Name",
            options=list(App.__annotations__),
            value="",
            info="The app name to use. Please refresh after selecting app name",
            refresh_button=True,
        ),
        # Authentication-related inputs (initially hidden; revealed by update_build_config)
        SecretStrInput(
            name="app_credentials",
            display_name="App Credentials",
            required=False,
            dynamic=True,
            show=False,
            info="Credentials for app authentication (API Key, Password, etc)",
        ),
        MessageTextInput(
            name="username",
            display_name="Username",
            required=False,
            dynamic=True,
            show=False,
            info="Username for Basic authentication",
        ),
        LinkInput(
            name="auth_link",
            display_name="Authentication Link",
            value="",
            info="Click to authenticate with OAuth2",
            dynamic=True,
            show=False,
            placeholder="Click to authenticate",
        ),
        StrInput(
            name="auth_status",
            display_name="Auth Status",
            value="Not Connected",
            info="Current authentication status",
            dynamic=True,
            show=False,
        ),
        MultiselectInput(
            name="action_names",
            display_name="Actions to use",
            required=True,
            options=[],
            value=[],
            info="The actions to pass to agent to execute",
            dynamic=True,
            show=False,
        ),
    ]

    outputs = [
        Output(name="tools", display_name="Tools", method="build_tool"),
    ]

    def _check_for_authorization(self, app: str) -> str:
        """Report whether ``app`` is connected for the configured entity.

        Args:
            app (str): The app name to check authorization for.

        Returns:
            str: ``"<app> CONNECTED"`` when a connection exists; otherwise the
                status message or OAuth2 redirect URL produced by
                ``_handle_auth_by_scheme``; or ``"Error checking authorization"``
                when the connection lookup fails for any other reason.
        """
        toolset = self._build_wrapper()
        entity = toolset.client.get_entity(id=self.entity_id)
        try:
            entity.get_connection(app=app)
        except NoItemsFound:
            # No connection yet. The scheme may be None when the lookup failed;
            # _handle_auth_by_scheme turns that into a status message rather than
            # raising from inside this handler (where the clause below would not
            # catch it).
            auth_scheme = self._get_auth_scheme(app)
            return self._handle_auth_by_scheme(entity, app, auth_scheme)
        except Exception:  # noqa: BLE001
            logger.exception("Authorization error")
            return "Error checking authorization"
        else:
            return f"{app} CONNECTED"

    def _get_auth_scheme(self, app_name: str) -> AppAuthScheme | None:
        """Get the primary auth scheme for an app.

        Args:
            app_name (str): The name of the app to get auth scheme for. It is
                lower-cased before being passed to the toolset.

        Returns:
            AppAuthScheme | None: The auth scheme details, or ``None`` when the
                lookup raised (the exception is logged, not propagated).

        Raises:
            ValueError: If the Composio toolset cannot be built (see
                ``_build_wrapper``); this happens before the guarded lookup.
        """
        toolset = self._build_wrapper()
        try:
            return toolset.get_auth_scheme_for_app(app=app_name.lower())
        except Exception:  # noqa: BLE001
            logger.exception(f"Error getting auth scheme for {app_name}")
            return None

    def _handle_auth_by_scheme(self, entity: Any, app: str, auth_scheme: AppAuthScheme | None) -> str:
        """Handle authentication based on the auth scheme.

        Args:
            entity (Any): The entity instance.
            app (str): The app name.
            auth_scheme (AppAuthScheme | None): The auth scheme details, or
                ``None`` when they could not be retrieved.

        Returns:
            str: ``"<app> CONNECTED"`` on success or when already connected, an
                OAuth2 redirect URL, or a human-readable status/error message.
        """
        # First check if already connected.
        try:
            entity.get_connection(app=app)
        except NoItemsFound:
            pass  # Not connected yet: fall through and start a new connection.
        except Exception as e:  # noqa: BLE001
            logger.error(f"Error checking connection status: {e}")
            return f"Error: {e!s}"
        else:
            return f"{app} CONNECTED"

        if auth_scheme is None:
            # _get_auth_scheme already logged the underlying failure.
            return "Unable to determine auth scheme"

        auth_mode = auth_scheme.auth_mode
        credentials = getattr(self, "app_credentials", None)

        if auth_mode == "API_KEY":
            if not credentials:
                return "Enter API Key"
            try:
                entity.initiate_connection(
                    app_name=app,
                    auth_mode="API_KEY",
                    auth_config={"api_key": credentials},
                    use_composio_auth=False,
                    force_new_integration=True,
                )
            except Exception as e:  # noqa: BLE001
                logger.error(f"Error connecting with API Key: {e}")
                return "Invalid API Key"
            return f"{app} CONNECTED"

        if auth_mode == "BASIC":
            username = getattr(self, "username", None)
            if not (username and credentials):
                return "Enter Username and Password"
            try:
                entity.initiate_connection(
                    app_name=app,
                    auth_mode="BASIC",
                    auth_config={"username": username, "password": credentials},
                    use_composio_auth=False,
                    force_new_integration=True,
                )
            except Exception as e:  # noqa: BLE001
                logger.error(f"Error connecting with Basic Auth: {e}")
                return "Invalid credentials"
            return f"{app} CONNECTED"

        if auth_mode == "OAUTH2":
            try:
                return self._initiate_default_connection(entity, app)
            except Exception as e:  # noqa: BLE001
                logger.error(f"Error initiating OAuth2: {e}")
                return "OAuth2 initialization failed"

        return "Unsupported auth mode"

    def _initiate_default_connection(self, entity: Any, app: str) -> str:
        """Start a Composio-managed connection for ``app`` and return its redirect URL.

        Args:
            entity (Any): The entity instance used to initiate the connection.
            app (str): The app name.

        Returns:
            str: The ``redirectUrl`` of the initiated connection.
        """
        connection = entity.initiate_connection(app_name=app, use_composio_auth=True, force_new_integration=True)
        return connection.redirectUrl

    def _get_connected_app_names_for_entity(self) -> list[str]:
        """List the distinct ``appUniqueId`` values of the entity's connections.

        Returns:
            list[str]: De-duplicated app identifiers. They are collected through
                a set, so the order of the list is not defined.
        """
        toolset = self._build_wrapper()
        connections = toolset.client.get_entity(id=self.entity_id).get_connections()
        return list({connection.appUniqueId for connection in connections})

    def _get_normalized_app_name(self) -> str:
        """Get app name without connection status suffix.

        Removes every occurrence of ``" ✅"`` and ``"_connected"`` from
        ``self.app_names``.

        Returns:
            str: Normalized app name.
        """
        return self.app_names.replace(" ✅", "").replace("_connected", "")

    @staticmethod
    def _reveal_field(build_config: dict, field: str) -> None:
        """Make ``field`` visible in the main (non-advanced) view of the build config."""
        build_config[field]["show"] = True
        build_config[field]["advanced"] = False

    def _prompt_for_auth(self, build_config: dict, entity: Any, app_name: str) -> None:
        """Reveal the inputs required by the app's auth mode and set a matching status."""
        auth_scheme = self._get_auth_scheme(app_name)
        if auth_scheme is None:
            # The lookup failed and was logged; show a readable status instead of
            # failing on a None attribute access.
            build_config["auth_status"]["value"] = "Unable to determine auth scheme"
            return

        auth_mode = auth_scheme.auth_mode
        logger.info(f"Auth mode for {app_name}: {auth_mode}")

        if auth_mode == "API_KEY":
            self._reveal_field(build_config, "app_credentials")
            build_config["app_credentials"]["display_name"] = "API Key"
            build_config["auth_status"]["value"] = "Enter API Key"
        elif auth_mode == "BASIC":
            self._reveal_field(build_config, "username")
            self._reveal_field(build_config, "app_credentials")
            build_config["app_credentials"]["display_name"] = "Password"
            build_config["auth_status"]["value"] = "Enter Username and Password"
        elif auth_mode == "OAUTH2":
            self._reveal_field(build_config, "auth_link")
            build_config["auth_link"]["value"] = self._initiate_default_connection(entity, app_name)
            build_config["auth_status"]["value"] = "Click link to authenticate"
        else:
            build_config["auth_status"]["value"] = "Unsupported auth mode"

    @staticmethod
    def _refresh_action_options(build_config: dict, app_name: str) -> None:
        """Offer the actions whose name starts with ``<app_name>_`` (case-insensitive)."""
        prefix = app_name.lower() + "_"
        app_action_names = [name for name in Action.__annotations__ if name.lower().startswith(prefix)]
        action_config = build_config["action_names"]
        if action_config["options"] != app_action_names:
            action_config["options"] = app_action_names
            # Preselect the first action; an app without actions gets an empty
            # selection rather than a bogus empty-string action.
            action_config["value"] = app_action_names[:1]

    def update_build_config(self, build_config: dict, field_value: Any, field_name: str | None = None) -> dict:
        """Update the visibility and values of the dynamic inputs.

        Dynamic fields are shown only while they hold a value. When the
        ``app_names`` field triggered the update and an API key is set, the
        connection state of the selected app decides which auth inputs or action
        choices are revealed.

        Args:
            build_config (dict): The build configuration; modified in place.
            field_value (Any): The new value of the changed field. Not consulted
                here: the selected app is read from ``self.app_names``.
            field_name (str | None): The name of the field that changed.

        Returns:
            dict: The same ``build_config`` object.
        """
        # First, hide every dynamic field that has no value (and show the rest).
        for field in ("app_credentials", "username", "auth_link", "auth_status", "action_names"):
            if field not in build_config:
                continue
            value = build_config[field]["value"]
            is_empty = value is None or value == ""
            build_config[field]["show"] = not is_empty
            build_config[field]["advanced"] = is_empty  # Empty fields are kept out of the main view.

        if field_name != "app_names" or not hasattr(self, "api_key") or self.api_key == "":
            return build_config

        app_name = self.app_names
        try:
            toolset = self._build_wrapper()
            entity = toolset.client.get_entity(id=self.entity_id)
            # Always show auth_status when an app is selected.
            self._reveal_field(build_config, "auth_status")
            try:
                entity.get_connection(app=app_name)
            except NoItemsFound:
                self._prompt_for_auth(build_config, entity, app_name)
            else:
                build_config["auth_status"]["value"] = "✅"
                build_config["auth_link"]["show"] = False
                # Show action selection for connected apps.
                self._reveal_field(build_config, "action_names")
                self._refresh_action_options(build_config, app_name)
        except Exception as e:  # noqa: BLE001
            logger.error(f"Error checking auth status: {e}, app: {app_name}")
            build_config["auth_status"]["value"] = f"Error: {e!s}"
        return build_config

    def build_tool(self) -> Sequence[Tool]:
        """Build Composio tools based on selected actions.

        Returns:
            Sequence[Tool]: The tools the toolset returns for ``self.action_names``.

        Raises:
            ValueError: If the Composio toolset cannot be built (see
                ``_build_wrapper``).
        """
        composio_toolset = self._build_wrapper()
        return composio_toolset.get_tools(actions=self.action_names)

    def _build_wrapper(self) -> ComposioToolSet:
        """Build the Composio toolset wrapper.

        Returns:
            ComposioToolSet: The initialized toolset.

        Raises:
            ValueError: If the API key is missing, or if constructing the toolset
                raises ``ValueError``. The original error is logged and chained
                as the cause of a user-facing message.
        """
        try:
            if not self.api_key:
                msg = "Composio API Key is required"
                raise ValueError(msg)
            return ComposioToolSet(api_key=self.api_key)
        except ValueError as e:
            logger.error(f"Error building Composio wrapper: {e}")
            msg = "Please provide a valid Composio API Key in the component settings"
            raise ValueError(msg) from e