Spaces:
Sleeping
Sleeping
| import os | |
| import requests | |
| import yaml | |
| import json | |
| import re | |
| import xml.dom.minidom | |
| import xml.etree.ElementTree as ET | |
| from pathlib import Path | |
| from urllib.parse import urlparse | |
| from typing import Dict, List, Any, Optional, Tuple, Set | |
| from collections import defaultdict | |
| class ApiSchemaGeneratorV5: | |
| """ | |
| A class to dynamically process API specifications and generate appropriate output files | |
| like datasource_plugin_meta.json and default_schema.orx files. | |
| Version 5: Added protection against circular references in schema objects | |
| """ | |
| def __init__(self, api_spec_url: str, api_name: str = None, selected_endpoints: List[str] = None): | |
| """ | |
| Initialize the ApiSchemaGeneratorV5 with an API specification URL. | |
| Args: | |
| api_spec_url: URL or path to the API specification (OpenAPI/Swagger) | |
| api_name: Optional name for the API (will be extracted from spec if not provided) | |
| selected_endpoints: Optional list of endpoint paths to include in the schema | |
| """ | |
| self.api_spec_url = api_spec_url | |
| self.api_name = api_name | |
| self.api_spec = None | |
| self.api_info = None | |
| self.auth_info = None | |
| self.endpoints = None | |
| self.schema_objects = None | |
| self.common_parameters = {} # Store common parameters across endpoints | |
| self.selected_endpoints = selected_endpoints # Store selected endpoints | |
| def fetch_api_spec(self) -> dict: | |
| """Fetch and parse the API specification""" | |
| try: | |
| if self.api_spec_url.startswith(('http://', 'https://')): | |
| response = requests.get(self.api_spec_url) | |
| response.raise_for_status() | |
| content = response.text | |
| else: | |
| with open(self.api_spec_url, 'r') as f: | |
| content = f.read() | |
| # Determine if it's JSON or YAML | |
| try: | |
| spec = json.loads(content) | |
| except json.JSONDecodeError: | |
| spec = yaml.safe_load(content) | |
| self.api_spec = spec | |
| return spec | |
| except Exception as e: | |
| print(f"Error fetching API specification: {str(e)}") | |
| return {} | |
| def extract_api_info(self) -> dict: | |
| """Extract essential information from the API specification""" | |
| if not self.api_spec: | |
| self.fetch_api_spec() | |
| if not self.api_spec: | |
| return {} | |
| api_info = { | |
| 'title': self.api_spec.get('info', {}).get('title', 'Unknown API'), | |
| 'description': self.api_spec.get('info', {}).get('description', ''), | |
| 'version': self.api_spec.get('info', {}).get('version', '1.0.0'), | |
| 'endpoints': {}, | |
| 'auth': None, | |
| 'schemas': {} | |
| } | |
| # Set API name if not provided | |
| if not self.api_name: | |
| self.api_name = self._sanitize_name(api_info['title']) | |
| # Extract authentication info if present | |
| if 'components' in self.api_spec and 'securitySchemes' in self.api_spec['components']: | |
| api_info['auth'] = self._extract_auth_info(self.api_spec) | |
| # Extract common parameters if present | |
| if 'components' in self.api_spec and 'parameters' in self.api_spec['components']: | |
| self.common_parameters = self.api_spec['components']['parameters'] | |
| # Extract paths and their parameters | |
| if 'paths' in self.api_spec: | |
| for path, methods in self.api_spec['paths'].items(): | |
| # Extract path-level parameters | |
| path_params = [] | |
| if 'parameters' in methods: | |
| path_params = methods['parameters'] | |
| api_info['endpoints'][path] = {} | |
| for method, details in methods.items(): | |
| if method.lower() in ['get', 'post', 'put', 'delete', 'patch']: | |
| # Combine path-level and method-level parameters | |
| all_params = path_params.copy() if path_params else [] | |
| if 'parameters' in details: | |
| all_params.extend(details['parameters']) | |
| # Process parameters | |
| params = [] | |
| for param in all_params: | |
| # Handle parameter references | |
| if '$ref' in param: | |
| ref = param['$ref'] | |
| param_name = ref.split('/')[-1] | |
| if param_name in self.common_parameters: | |
| param = self.common_parameters[param_name] | |
| # Extract parameter schema | |
| param_schema = param.get('schema', {}) | |
| param_type = None | |
| # Try to get the type from the schema | |
| if param_schema: | |
| param_type = param_schema.get('type') | |
| # If schema has a reference, resolve it | |
| if '$ref' in param_schema: | |
| ref_schema = self._resolve_schema_reference(param_schema['$ref']) | |
| if ref_schema: | |
| param_type = ref_schema.get('type') | |
| # Default to string if no type is found | |
| if not param_type: | |
| param_type = 'string' | |
| params.append({ | |
| 'name': param.get('name'), | |
| 'in': param.get('in'), | |
| 'required': param.get('required', False), | |
| 'type': param_type, | |
| 'enum': param_schema.get('enum'), | |
| 'description': param.get('description', '') | |
| }) | |
| # Extract requestBody if present | |
| request_body_schema = None | |
| if 'requestBody' in details: | |
| content = details['requestBody'].get('content', {}) | |
| if 'application/json' in content: | |
| request_body_schema = content['application/json'].get('schema') | |
| # Extract response schema | |
| response_schema = None | |
| if 'responses' in details: | |
| for status_code, response in details['responses'].items(): | |
| if status_code.startswith('2'): # 2xx responses | |
| if 'content' in response: | |
| for content_type, content_details in response['content'].items(): | |
| if 'schema' in content_details: | |
| response_schema = content_details['schema'] | |
| break | |
| break | |
| api_info['endpoints'][path][method] = { | |
| 'summary': details.get('summary', ''), | |
| 'description': details.get('description', ''), | |
| 'operationId': details.get('operationId', ''), | |
| 'parameters': params, | |
| 'requestBody': request_body_schema, | |
| 'responseSchema': response_schema, | |
| 'security': details.get('security') | |
| } | |
| # Extract schema objects | |
| if 'components' in self.api_spec and 'schemas' in self.api_spec['components']: | |
| api_info['schemas'] = self.api_spec['components']['schemas'] | |
| self.api_info = api_info | |
| self.auth_info = api_info['auth'] | |
| self.endpoints = api_info['endpoints'] | |
| self.schema_objects = api_info['schemas'] | |
| return api_info | |
| def _extract_auth_info(self, spec: dict) -> dict: | |
| """Extract detailed authentication information from API spec""" | |
| auth_info = {} | |
| if 'securitySchemes' in spec.get('components', {}): | |
| for scheme_name, scheme_details in spec['components']['securitySchemes'].items(): | |
| auth_type = scheme_details.get('type') | |
| # Extract common info | |
| auth_info[scheme_name] = { | |
| 'type': auth_type, | |
| 'description': scheme_details.get('description', '') | |
| } | |
| # Extract type-specific info | |
| if auth_type == 'apiKey': | |
| auth_info[scheme_name].update({ | |
| 'name': scheme_details.get('name'), | |
| 'in': scheme_details.get('in') # header, query, or cookie | |
| }) | |
| elif auth_type == 'http': | |
| auth_info[scheme_name].update({ | |
| 'scheme': scheme_details.get('scheme') # basic, bearer, etc. | |
| }) | |
| elif auth_type == 'oauth2': | |
| flows = scheme_details.get('flows', {}) | |
| auth_info[scheme_name]['flows'] = {} | |
| for flow_type, flow_details in flows.items(): | |
| auth_info[scheme_name]['flows'][flow_type] = { | |
| 'authorizationUrl': flow_details.get('authorizationUrl'), | |
| 'tokenUrl': flow_details.get('tokenUrl'), | |
| 'refreshUrl': flow_details.get('refreshUrl'), | |
| 'scopes': flow_details.get('scopes', {}) | |
| } | |
| return auth_info | |
| def _sanitize_name(self, name: str) -> str: | |
| """Sanitize a name to be used as an identifier""" | |
| if not name: | |
| return "api" | |
| # Remove special characters and replace spaces with underscores | |
| sanitized = re.sub(r'[^a-zA-Z0-9_\s]', '', name) | |
| sanitized = re.sub(r'\s+', '_', sanitized).lower() | |
| return sanitized | |
| def _extract_server_url(self) -> str: | |
| """Extract server URL from API specification""" | |
| if not self.api_spec: | |
| self.fetch_api_spec() | |
| # Extract server information | |
| servers = self.api_spec.get('servers', []) | |
| if servers and 'url' in servers[0]: | |
| return servers[0]['url'] | |
| # Default URL if no servers defined | |
| return "https://api.example.com" | |
| def _extract_maxretries(self) -> str: | |
| """Extract maximum retries from API specification""" | |
| if not self.api_spec: | |
| self.fetch_api_spec() | |
| # Look for rate limiting information in the API spec | |
| # Check in extensions | |
| if 'x-ratelimit-retries' in self.api_spec: | |
| return str(self.api_spec['x-ratelimit-retries']) | |
| # Check in info section | |
| if 'info' in self.api_spec and 'x-ratelimit-retries' in self.api_spec['info']: | |
| return str(self.api_spec['info']['x-ratelimit-retries']) | |
| # Check in components section | |
| if 'components' in self.api_spec and 'x-ratelimit-retries' in self.api_spec['components']: | |
| return str(self.api_spec['components']['x-ratelimit-retries']) | |
| # Default value if not found in API spec | |
| return "3" | |
| def _extract_timeout(self) -> str: | |
| """Extract timeout from API specification""" | |
| if not self.api_spec: | |
| self.fetch_api_spec() | |
| # Look for timeout information in the API spec | |
| # Check in extensions | |
| if 'x-timeout' in self.api_spec: | |
| return str(self.api_spec['x-timeout']) | |
| # Check in info section | |
| if 'info' in self.api_spec and 'x-timeout' in self.api_spec['info']: | |
| return str(self.api_spec['info']['x-timeout']) | |
| # Check in components section | |
| if 'components' in self.api_spec and 'x-timeout' in self.api_spec['components']: | |
| return str(self.api_spec['components']['x-timeout']) | |
| # Default value if not found in API spec | |
| return "60" | |
| def _get_auth_meta_fields(self) -> List[Dict[str, Any]]: | |
| """Generate meta fields for authentication based on the API spec""" | |
| meta_fields = [] | |
| added_fields = set() # Track added field names to avoid duplication | |
| # Use the custom API name for all field descriptions | |
| api_name_title = self.api_name.title() | |
| if not self.auth_info: | |
| # Default to basic auth if no auth info is found | |
| meta_fields.extend([ | |
| { | |
| "name": "url", | |
| "description": f"{api_name_title} URL", | |
| "sectionName": "Connection Info", | |
| "defaultValue": self._extract_server_url(), | |
| "dataType": "STRING", | |
| "isRequired": True, | |
| "regex": "" | |
| }, | |
| { | |
| "name": "username", | |
| "description": f"{api_name_title} Username", | |
| "sectionName": "Connection Info", | |
| "defaultValue": "[username]", | |
| "dataType": "STRING", | |
| "isRequired": True, | |
| "regex": "" | |
| }, | |
| { | |
| "name": "password", | |
| "description": f"{api_name_title} Password", | |
| "sectionName": "Connection Info", | |
| "defaultValue": "[password]", | |
| "dataType": "STRING", | |
| "isRequired": True, | |
| "regex": "" | |
| } | |
| ]) | |
| added_fields.update(["url", "username", "password"]) | |
| else: | |
| # Add URL field | |
| meta_fields.append({ | |
| "name": "url", | |
| "description": f"{api_name_title} URL", | |
| "sectionName": "Connection Info", | |
| "defaultValue": self._extract_server_url(), | |
| "dataType": "STRING", | |
| "isRequired": True, | |
| "regex": "" | |
| }) | |
| added_fields.add("url") | |
| # Process each auth scheme | |
| for scheme_name, scheme_details in self.auth_info.items(): | |
| auth_type = scheme_details.get('type') | |
| if auth_type == 'apiKey' and "apitoken" not in added_fields: | |
| meta_fields.append({ | |
| "name": "apitoken", | |
| "description": f"{api_name_title} API Token", | |
| "sectionName": "Connection Info", | |
| "defaultValue": "[your_api_token]", | |
| "dataType": "STRING", | |
| "isRequired": True, | |
| "regex": "" | |
| }) | |
| added_fields.add("apitoken") | |
| elif auth_type == 'http': | |
| scheme = scheme_details.get('scheme', '').lower() | |
| if scheme == 'basic': | |
| if "username" not in added_fields: | |
| meta_fields.append({ | |
| "name": "username", | |
| "description": f"{api_name_title} Username", | |
| "sectionName": "Connection Info", | |
| "defaultValue": "[username]", | |
| "dataType": "STRING", | |
| "isRequired": True, | |
| "regex": "" | |
| }) | |
| added_fields.add("username") | |
| if "password" not in added_fields: | |
| meta_fields.append({ | |
| "name": "password", | |
| "description": f"{api_name_title} Password", | |
| "sectionName": "Connection Info", | |
| "defaultValue": "[password]", | |
| "dataType": "STRING", | |
| "isRequired": True, | |
| "regex": "" | |
| }) | |
| added_fields.add("password") | |
| elif scheme == 'bearer' and "token" not in added_fields: | |
| meta_fields.append({ | |
| "name": "token", | |
| "description": f"{api_name_title} Bearer Token", | |
| "sectionName": "Connection Info", | |
| "defaultValue": "[your_bearer_token]", | |
| "dataType": "STRING", | |
| "isRequired": True, | |
| "regex": "" | |
| }) | |
| added_fields.add("token") | |
| elif auth_type == 'oauth2': | |
| flows = scheme_details.get('flows', {}) | |
| # Check for client credentials flow | |
| if 'clientCredentials' in flows: | |
| if "clientId" not in added_fields: | |
| meta_fields.append({ | |
| "name": "clientId", | |
| "description": f"{api_name_title} Client ID", | |
| "sectionName": "Connection Info", | |
| "defaultValue": "[your_client_id]", | |
| "dataType": "STRING", | |
| "isRequired": True, | |
| "regex": "" | |
| }) | |
| added_fields.add("clientId") | |
| if "clientSecret" not in added_fields: | |
| meta_fields.append({ | |
| "name": "clientSecret", | |
| "description": f"{api_name_title} Client Secret", | |
| "sectionName": "Connection Info", | |
| "defaultValue": "[your_client_secret]", | |
| "dataType": "STRING", | |
| "isRequired": True, | |
| "regex": "" | |
| }) | |
| added_fields.add("clientSecret") | |
| if "tokenUrl" not in added_fields: | |
| meta_fields.append({ | |
| "name": "tokenUrl", | |
| "description": f"{api_name_title} Token URL", | |
| "sectionName": "Connection Info", | |
| "defaultValue": flows['clientCredentials'].get('tokenUrl', ''), | |
| "dataType": "STRING", | |
| "isRequired": True, | |
| "regex": "" | |
| }) | |
| added_fields.add("tokenUrl") | |
| # Check for password flow | |
| elif 'password' in flows: | |
| if "username" not in added_fields: | |
| meta_fields.append({ | |
| "name": "username", | |
| "description": f"{api_name_title} Username", | |
| "sectionName": "Connection Info", | |
| "defaultValue": "[username]", | |
| "dataType": "STRING", | |
| "isRequired": True, | |
| "regex": "" | |
| }) | |
| added_fields.add("username") | |
| if "password" not in added_fields: | |
| meta_fields.append({ | |
| "name": "password", | |
| "description": f"{api_name_title} Password", | |
| "sectionName": "Connection Info", | |
| "defaultValue": "[password]", | |
| "dataType": "STRING", | |
| "isRequired": True, | |
| "regex": "" | |
| }) | |
| added_fields.add("password") | |
| if "clientId" not in added_fields: | |
| meta_fields.append({ | |
| "name": "clientId", | |
| "description": f"{api_name_title} Client ID", | |
| "sectionName": "Connection Info", | |
| "defaultValue": "[your_client_id]", | |
| "dataType": "STRING", | |
| "isRequired": True, | |
| "regex": "" | |
| }) | |
| added_fields.add("clientId") | |
| if "clientSecret" not in added_fields: | |
| meta_fields.append({ | |
| "name": "clientSecret", | |
| "description": f"{api_name_title} Client Secret", | |
| "sectionName": "Connection Info", | |
| "defaultValue": "[your_client_secret]", | |
| "dataType": "STRING", | |
| "isRequired": True, | |
| "regex": "" | |
| }) | |
| added_fields.add("clientSecret") | |
| if "tokenUrl" not in added_fields: | |
| meta_fields.append({ | |
| "name": "tokenUrl", | |
| "description": f"{api_name_title} Token URL", | |
| "sectionName": "Connection Info", | |
| "defaultValue": flows['password'].get('tokenUrl', ''), | |
| "dataType": "STRING", | |
| "isRequired": True, | |
| "regex": "" | |
| }) | |
| added_fields.add("tokenUrl") | |
| # Check for authorization code flow | |
| elif 'authorizationCode' in flows: | |
| if "clientId" not in added_fields: | |
| meta_fields.append({ | |
| "name": "clientId", | |
| "description": f"{api_name_title} Client ID", | |
| "sectionName": "Connection Info", | |
| "defaultValue": "[your_client_id]", | |
| "dataType": "STRING", | |
| "isRequired": True, | |
| "regex": "" | |
| }) | |
| added_fields.add("clientId") | |
| if "clientSecret" not in added_fields: | |
| meta_fields.append({ | |
| "name": "clientSecret", | |
| "description": f"{api_name_title} Client Secret", | |
| "sectionName": "Connection Info", | |
| "defaultValue": "[your_client_secret]", | |
| "dataType": "STRING", | |
| "isRequired": True, | |
| "regex": "" | |
| }) | |
| added_fields.add("clientSecret") | |
| if "authorizationUrl" not in added_fields: | |
| meta_fields.append({ | |
| "name": "authorizationUrl", | |
| "description": f"{api_name_title} Authorization URL", | |
| "sectionName": "Connection Info", | |
| "defaultValue": flows['authorizationCode'].get('authorizationUrl', ''), | |
| "dataType": "STRING", | |
| "isRequired": True, | |
| "regex": "" | |
| }) | |
| added_fields.add("authorizationUrl") | |
| if "tokenUrl" not in added_fields: | |
| meta_fields.append({ | |
| "name": "tokenUrl", | |
| "description": f"{api_name_title} Token URL", | |
| "sectionName": "Connection Info", | |
| "defaultValue": flows['authorizationCode'].get('tokenUrl', ''), | |
| "dataType": "STRING", | |
| "isRequired": True, | |
| "regex": "" | |
| }) | |
| added_fields.add("tokenUrl") | |
| # Add request parameters from API spec | |
| if "maxretries" not in added_fields: | |
| maxretries_value = self._extract_maxretries() | |
| meta_fields.append({ | |
| "name": "maxretries", | |
| "description": "Maximum Request Retries", | |
| "sectionName": "Request Metering", | |
| "defaultValue": maxretries_value, | |
| "dataType": "NUMBER", | |
| "isRequired": True | |
| }) | |
| if "timeout" not in added_fields: | |
| timeout_value = self._extract_timeout() | |
| meta_fields.append({ | |
| "name": "timeout", | |
| "description": "Request Timeout (seconds)", | |
| "sectionName": "Request Metering", | |
| "defaultValue": timeout_value, | |
| "dataType": "NUMBER", | |
| "isRequired": True | |
| }) | |
| return meta_fields | |
| def generate_datasource_plugin_meta(self) -> dict: | |
| """Generate the datasource plugin meta JSON file""" | |
| if not self.api_info: | |
| self.extract_api_info() | |
| # Handle case where API info couldn't be extracted | |
| if not self.api_info: | |
| self.api_info = { | |
| 'title': 'Unknown API', | |
| 'description': '', | |
| 'version': '1.0.0', | |
| 'endpoints': {}, | |
| 'auth': None, | |
| 'schemas': {} | |
| } | |
| # Use the custom API name for all references in the metadata | |
| api_name_title = self.api_name.title() | |
| api_name_lower = self.api_name.lower() | |
| meta_data = { | |
| "name": api_name_title, | |
| "description": self.api_info.get('description') or f"{api_name_title} Directory", | |
| "backendCategory": "custom", | |
| "userCreated": False, | |
| "icon": f"/{api_name_lower}.svg", | |
| "isSchemaExtractable": False, | |
| "meta": self._get_auth_meta_fields() | |
| } | |
| return meta_data | |
| def _resolve_schema_reference(self, ref: str) -> dict: | |
| """Resolve a schema reference to the actual schema definition""" | |
| if not ref.startswith('#/components/schemas/'): | |
| return {} | |
| schema_name = ref.split('/')[-1] | |
| if self.schema_objects and schema_name in self.schema_objects: | |
| return self.schema_objects[schema_name] | |
| return {} | |
| def _extract_properties_from_schema(self, schema: dict) -> dict: | |
| """Extract properties from a schema, handling references and composition""" | |
| if not schema or not isinstance(schema, dict): | |
| return {} | |
| # Handle direct reference | |
| if '$ref' in schema: | |
| return self._extract_properties_from_schema(self._resolve_schema_reference(schema['$ref'])) | |
| # Get direct properties | |
| properties = schema.get('properties', {}) | |
| # Handle allOf composition | |
| if 'allOf' in schema: | |
| for sub_schema in schema['allOf']: | |
| sub_properties = self._extract_properties_from_schema(sub_schema) | |
| properties.update(sub_properties) | |
| # Handle oneOf composition | |
| if 'oneOf' in schema: | |
| # For oneOf, we'll take properties from the first schema as a representative | |
| if schema['oneOf'] and isinstance(schema['oneOf'][0], dict): | |
| first_schema = schema['oneOf'][0] | |
| sub_properties = self._extract_properties_from_schema(first_schema) | |
| properties.update(sub_properties) | |
| # Handle anyOf composition | |
| if 'anyOf' in schema: | |
| # For anyOf, we'll take properties from all schemas | |
| for sub_schema in schema['anyOf']: | |
| sub_properties = self._extract_properties_from_schema(sub_schema) | |
| properties.update(sub_properties) | |
| return properties | |
| def _extract_common_parameters(self) -> Dict[str, Dict[str, Any]]: | |
| """Extract common parameters from the API spec""" | |
| # Return empty dict as we're not using common parameters anymore | |
| return {} | |
| def _extract_data_models(self) -> List[Dict[str, Any]]: | |
| """Extract data models from schema objects that are relevant to selected endpoints""" | |
| data_models = [] | |
| processed_schemas = set() | |
| relevant_schemas = set() | |
| if not self.schema_objects: | |
| return data_models | |
| # First, identify which schemas are referenced by the selected endpoints | |
| if self.selected_endpoints: | |
| # Extract endpoints based on selected paths and methods | |
| selected_paths = {} | |
| for path, method in self.selected_endpoints: | |
| if path not in selected_paths: | |
| selected_paths[path] = [] | |
| selected_paths[path].append(method.lower()) | |
| # Process endpoints to find referenced schemas | |
| for path, methods in self.endpoints.items(): | |
| if path not in selected_paths: | |
| continue | |
| for method, details in methods.items(): | |
| if method.lower() not in selected_paths.get(path, []): | |
| continue | |
| # Check response schema | |
| response_schema = details.get('responseSchema') | |
| if response_schema: | |
| self._collect_referenced_schemas(response_schema, relevant_schemas) | |
| # Check request body schema | |
| request_body = details.get('requestBody') | |
| if request_body: | |
| self._collect_referenced_schemas(request_body, relevant_schemas) | |
| # Check parameter schemas | |
| for param in details.get('parameters', []): | |
| param_schema = param.get('schema', {}) | |
| if param_schema: | |
| self._collect_referenced_schemas(param_schema, relevant_schemas) | |
| else: | |
| # If no endpoints are selected, include all schemas | |
| relevant_schemas = set(self.schema_objects.keys()) | |
| # Process only the relevant schema objects | |
| for schema_name in relevant_schemas: | |
| if schema_name in processed_schemas: | |
| continue | |
| schema_def = self.schema_objects.get(schema_name) | |
| if not schema_def: | |
| continue | |
| # Skip non-object schemas | |
| if schema_def.get('type') and schema_def.get('type') != 'object': | |
| continue | |
| table = self._create_table_from_schema(schema_name, schema_def) | |
| if table: | |
| # Add source information to the table | |
| table["Source"] = "Components/Schemas" | |
| table["Type"] = "DATA_MODEL" | |
| data_models.append(table) | |
| processed_schemas.add(schema_name) | |
| return data_models | |
| def _collect_referenced_schemas(self, schema: dict, referenced_schemas: set, visited_refs: set = None) -> None: | |
| """ | |
| Recursively collect all schema references from a schema object. | |
| Args: | |
| schema: The schema object to process | |
| referenced_schemas: Set to collect referenced schema names | |
| visited_refs: Set to track already visited references to prevent circular reference issues | |
| """ | |
| if not schema or not isinstance(schema, dict): | |
| return | |
| # Initialize visited_refs if not provided | |
| if visited_refs is None: | |
| visited_refs = set() | |
| # Handle direct reference | |
| if '$ref' in schema: | |
| ref = schema['$ref'] | |
| if ref.startswith('#/components/schemas/'): | |
| schema_name = ref.split('/')[-1] | |
| # Skip if we've already processed this reference to avoid circular references | |
| if ref in visited_refs: | |
| return | |
| # Add to referenced schemas and mark as visited | |
| referenced_schemas.add(schema_name) | |
| visited_refs.add(ref) | |
| # Recursively collect references from the referenced schema | |
| if self.schema_objects and schema_name in self.schema_objects: | |
| self._collect_referenced_schemas(self.schema_objects[schema_name], referenced_schemas, visited_refs) | |
| # Handle array items | |
| if schema.get('type') == 'array' and 'items' in schema: | |
| self._collect_referenced_schemas(schema['items'], referenced_schemas, visited_refs) | |
| # Handle object properties | |
| if schema.get('type') == 'object' and 'properties' in schema: | |
| for prop_name, prop_def in schema['properties'].items(): | |
| self._collect_referenced_schemas(prop_def, referenced_schemas, visited_refs) | |
| # Handle composition schemas | |
| for comp_key in ['allOf', 'oneOf', 'anyOf']: | |
| if comp_key in schema: | |
| for sub_schema in schema[comp_key]: | |
| self._collect_referenced_schemas(sub_schema, referenced_schemas, visited_refs) | |
| def _extract_endpoints(self) -> List[Dict[str, Any]]: | |
| """Extract endpoint information""" | |
| endpoints = [] | |
| # Extract common parameters | |
| common_params = self._extract_common_parameters() | |
| selected_paths = {} | |
| if self.selected_endpoints: | |
| for path, method in self.selected_endpoints: | |
| if path not in selected_paths: | |
| selected_paths[path] = [] | |
| selected_paths[path].append(method.lower()) | |
| # Process endpoints | |
| for path, methods in self.endpoints.items(): | |
| # Skip endpoints that are not in the selected_endpoints list if it's provided | |
| if self.selected_endpoints is not None and path not in selected_paths: | |
| continue | |
| for method, details in methods.items(): | |
| # Skip methods that are not in the selected methods for this path | |
| if self.selected_endpoints is not None and method.lower() not in selected_paths.get(path, []): | |
| continue | |
| if method.lower() in ['get', 'post', 'put', 'delete', 'patch']: | |
| endpoint = { | |
| "ID": self._sanitize_name(f"{method}_{path}"), | |
| "Name": details.get('operationId') or f"{method.upper()} {path}", | |
| "Path": path, | |
| "Method": method.upper(), | |
| "Summary": details.get('summary', ''), | |
| "Description": details.get('description', ''), | |
| "Parameters": [], | |
| "Type": "ENDPOINT" | |
| } | |
| # Process parameters | |
| for param in details.get('parameters', []): | |
| param_key = f"{param.get('name')}:{param.get('in')}" | |
| # Extract parameter schema and type | |
| param_schema = param.get('schema', {}) | |
| param_type = None | |
| # Try to get the type from the schema | |
| if param_schema: | |
| param_type = param_schema.get('type') | |
| # If schema has a reference, resolve it | |
| if '$ref' in param_schema: | |
| ref_schema = self._resolve_schema_reference(param_schema['$ref']) | |
| if ref_schema: | |
| param_type = ref_schema.get('type') | |
| # Default to string if no type is found | |
| if not param_type: | |
| param_type = 'string' | |
| # Check if this is a common parameter | |
| if param_key in common_params: | |
| endpoint["Parameters"].append({ | |
| "Name": param.get('name'), | |
| "In": param.get('in'), | |
| "Required": param.get('required', False), | |
| "Type": param_type, # Include type even for common parameters | |
| "IsCommon": True, | |
| "CommonRef": param_key | |
| }) | |
| else: | |
| endpoint["Parameters"].append({ | |
| "Name": param.get('name'), | |
| "In": param.get('in'), | |
| "Required": param.get('required', False), | |
| "Type": param_type, | |
| "Enum": param_schema.get('enum'), | |
| "Description": param.get('description', ''), | |
| "IsCommon": False | |
| }) | |
| # Process response schema | |
| response_schema = details.get('responseSchema') | |
| if response_schema: | |
| if '$ref' in response_schema: | |
| ref = response_schema['$ref'] | |
| schema_name = ref.split('/')[-1] | |
| endpoint["ResponseModel"] = schema_name | |
| endpoint["ResponseType"] = "object" | |
| elif response_schema.get('type') == 'array' and 'items' in response_schema: | |
| items = response_schema['items'] | |
| if '$ref' in items: | |
| ref = items['$ref'] | |
| schema_name = ref.split('/')[-1] | |
| endpoint["ResponseModel"] = schema_name | |
| endpoint["ResponseType"] = "array" | |
| else: | |
| # Inline schema | |
| endpoint["ResponseType"] = "array" | |
| endpoint["ResponseInlineSchema"] = items | |
| else: | |
| # Inline schema | |
| endpoint["ResponseType"] = response_schema.get('type', 'object') | |
| endpoint["ResponseInlineSchema"] = response_schema | |
| # Process request body | |
| request_body = details.get('requestBody') | |
| if request_body: | |
| if '$ref' in request_body: | |
| ref = request_body['$ref'] | |
| schema_name = ref.split('/')[-1] | |
| endpoint["RequestModel"] = schema_name | |
| else: | |
| # Inline schema | |
| endpoint["RequestInlineSchema"] = request_body | |
| endpoints.append(endpoint) | |
| return endpoints | |
| def _create_table_from_schema(self, schema_name: str, schema_def: dict) -> Optional[Dict[str, Any]]: | |
| """Create a table definition from a schema object""" | |
| if not schema_def or not isinstance(schema_def, dict): | |
| return None | |
| # Extract properties, handling references and composition | |
| properties = self._extract_properties_from_schema(schema_def) | |
| if not properties: | |
| return None | |
| # Get required properties | |
| required = schema_def.get('required', []) | |
| # Create table | |
| table = { | |
| "ID": self._sanitize_name(schema_name), | |
| "Name": schema_name, | |
| "Description": schema_def.get('description', ''), | |
| "Properties": [] | |
| } | |
| # Add properties | |
| for prop_name, prop_def in properties.items(): | |
| property_info = { | |
| "Name": prop_name, | |
| "Required": prop_name in required, | |
| "Description": prop_def.get('description', '') if isinstance(prop_def, dict) else '' | |
| } | |
| # Determine property type | |
| if isinstance(prop_def, dict): | |
| prop_type = prop_def.get('type') | |
| if prop_type: | |
| property_info["Type"] = prop_type | |
| # Handle enum values | |
| if 'enum' in prop_def: | |
| property_info["Enum"] = prop_def['enum'] | |
| # Handle array items | |
| if prop_type == 'array' and 'items' in prop_def: | |
| items = prop_def['items'] | |
| if '$ref' in items: | |
| ref = items['$ref'] | |
| schema_name = ref.split('/')[-1] | |
| property_info["ItemsType"] = "reference" | |
| property_info["ItemsRef"] = schema_name | |
| else: | |
| property_info["ItemsType"] = items.get('type', 'object') | |
| # Handle object properties | |
| if prop_type == 'object' and 'properties' in prop_def: | |
| property_info["ObjectProperties"] = [] | |
| for sub_prop_name, sub_prop_def in prop_def['properties'].items(): | |
| sub_prop_info = { | |
| "Name": sub_prop_name, | |
| "Type": sub_prop_def.get('type', 'string'), | |
| "Description": sub_prop_def.get('description', '') | |
| } | |
| property_info["ObjectProperties"].append(sub_prop_info) | |
| # Handle references | |
| if '$ref' in prop_def: | |
| ref = prop_def['$ref'] | |
| schema_name = ref.split('/')[-1] | |
| property_info["Type"] = "reference" | |
| property_info["Ref"] = schema_name | |
| table["Properties"].append(property_info) | |
| return table | |
| def _generate_datasource_value(self) -> str: | |
| """Generate a DataSource value based on API spec information""" | |
| # Extract server information | |
| servers = self.api_spec.get('servers', []) | |
| server_url = "https://api.example.com" | |
| if servers and 'url' in servers[0]: | |
| server_url = servers[0]['url'] | |
| # Extract authentication information | |
| auth_info = {} | |
| if self.auth_info: | |
| for scheme_name, scheme_details in self.auth_info.items(): | |
| auth_type = scheme_details.get('type') | |
| if auth_type == 'apiKey': | |
| auth_info['type'] = 'apiKey' | |
| auth_info['name'] = scheme_details.get('name', '') | |
| auth_info['in'] = scheme_details.get('in', 'header') | |
| elif auth_type == 'http': | |
| scheme = scheme_details.get('scheme', '').lower() | |
| auth_info['type'] = 'http' | |
| auth_info['scheme'] = scheme | |
| elif auth_type == 'oauth2': | |
| auth_info['type'] = 'oauth2' | |
| flows = scheme_details.get('flows', {}) | |
| if 'clientCredentials' in flows: | |
| auth_info['flow'] = 'clientCredentials' | |
| auth_info['tokenUrl'] = flows['clientCredentials'].get('tokenUrl', '') | |
| elif 'password' in flows: | |
| auth_info['flow'] = 'password' | |
| auth_info['tokenUrl'] = flows['password'].get('tokenUrl', '') | |
| elif 'authorizationCode' in flows: | |
| auth_info['flow'] = 'authorizationCode' | |
| auth_info['authorizationUrl'] = flows['authorizationCode'].get('authorizationUrl', '') | |
| auth_info['tokenUrl'] = flows['authorizationCode'].get('tokenUrl', '') | |
| # Create a connection string based on the extracted information | |
| connection_parts = [] | |
| connection_parts.append(f"url={server_url}") | |
| if auth_info: | |
| connection_parts.append(f"auth_type={auth_info.get('type', 'none')}") | |
| if auth_info.get('type') == 'apiKey': | |
| connection_parts.append(f"api_key_name={auth_info.get('name', '')}") | |
| connection_parts.append(f"api_key_in={auth_info.get('in', '')}") | |
| elif auth_info.get('type') == 'http': | |
| connection_parts.append(f"http_scheme={auth_info.get('scheme', '')}") | |
| elif auth_info.get('type') == 'oauth2': | |
| connection_parts.append(f"oauth2_flow={auth_info.get('flow', '')}") | |
| if 'tokenUrl' in auth_info: | |
| connection_parts.append(f"token_url={auth_info.get('tokenUrl', '')}") | |
| if 'authorizationUrl' in auth_info: | |
| connection_parts.append(f"authorization_url={auth_info.get('authorizationUrl', '')}") | |
| # Add API name and version | |
| if self.api_info: | |
| connection_parts.append(f"api_name={self.api_name}") | |
| connection_parts.append(f"api_version={self.api_info.get('version', '1.0.0')}") | |
| # Create the connection string | |
| connection_string = ";".join(connection_parts) | |
| # Simulate AES encryption with a prefix | |
| # In a real implementation, this would be properly encrypted | |
| return f"{{AES}}{connection_string}" | |
| def generate_default_schema(self) -> str: | |
| """Generate the default schema XML file""" | |
| if not self.api_info: | |
| self.extract_api_info() | |
| # Create XML structure | |
| root = ET.Element("Xml") | |
| org = ET.SubElement(root, "ORG", Name=self.api_name.lower()) | |
| ET.SubElement(org, "AccessMethod").text = "PDM" | |
| pdm = ET.SubElement(org, "PDM") | |
| ET.SubElement(pdm, "Name").text = self.api_name.lower() | |
| # Generate DataSource value from API spec information | |
| datasource_value = self._generate_datasource_value() | |
| ET.SubElement(pdm, "DataSource").text = datasource_value | |
| ET.SubElement(pdm, "DataSourceType").text = "XML" | |
| # Add tables | |
| tables = ET.SubElement(pdm, "Tables") | |
| # We no longer include common parameters section | |
| # Extract data models | |
| data_models = self._extract_data_models() | |
| # Add a section for data models | |
| if data_models: | |
| tables.append(ET.Comment(" DATA MODELS ")) | |
| for model in data_models: | |
| model_class = ET.SubElement(tables, "Class", type=model["ID"]) | |
| ET.SubElement(model_class, "Name").text = model["Name"] | |
| ET.SubElement(model_class, "Type").text = "DATA_MODEL" | |
| ET.SubElement(model_class, "Description").text = model.get("Description", "") | |
| properties = ET.SubElement(model_class, "Properties") | |
| for prop in model.get("Properties", []): | |
| prop_elem = ET.SubElement(properties, "Property", ID=self._sanitize_name(prop["Name"])) | |
| ET.SubElement(prop_elem, "Name").text = prop["Name"] | |
| ET.SubElement(prop_elem, "Type").text = prop.get("Type", "string") | |
| ET.SubElement(prop_elem, "Required").text = str(prop.get("Required", False)).lower() | |
| ET.SubElement(prop_elem, "Description").text = prop.get("Description", "") | |
| # Handle references | |
| if "Ref" in prop: | |
| ET.SubElement(prop_elem, "ClassType").text = prop["Ref"] | |
| # Handle arrays | |
| if prop.get("Type") == "array" and "ItemsType" in prop: | |
| items = ET.SubElement(prop_elem, "Items") | |
| ET.SubElement(items, "Type").text = prop["ItemsType"] | |
| if "ItemsRef" in prop: | |
| ET.SubElement(items, "ClassType").text = prop["ItemsRef"] | |
| # Handle object properties | |
| if prop.get("Type") == "object" and "ObjectProperties" in prop: | |
| obj_props = ET.SubElement(prop_elem, "ObjectProperties") | |
| for obj_prop in prop["ObjectProperties"]: | |
| obj_prop_elem = ET.SubElement(obj_props, "Property", ID=self._sanitize_name(obj_prop["Name"])) | |
| ET.SubElement(obj_prop_elem, "Name").text = obj_prop["Name"] | |
| ET.SubElement(obj_prop_elem, "Type").text = obj_prop.get("Type", "string") | |
| ET.SubElement(obj_prop_elem, "Description").text = obj_prop.get("Description", "") | |
| # Extract endpoints | |
| endpoints_list = self._extract_endpoints() | |
| # Add a section for endpoints | |
| if endpoints_list: | |
| tables.append(ET.Comment(" API ENDPOINTS ")) | |
| for endpoint in endpoints_list: | |
| endpoint_elem = ET.SubElement(tables, "Endpoint", ID=endpoint["ID"]) | |
| ET.SubElement(endpoint_elem, "Name").text = endpoint["Name"] | |
| ET.SubElement(endpoint_elem, "Type").text = "ENDPOINT" | |
| ET.SubElement(endpoint_elem, "Path").text = endpoint["Path"] | |
| ET.SubElement(endpoint_elem, "Method").text = endpoint["Method"] | |
| ET.SubElement(endpoint_elem, "Summary").text = endpoint.get("Summary", "") | |
| ET.SubElement(endpoint_elem, "Description").text = endpoint.get("Description", "") | |
| # Add parameters | |
| if endpoint.get("Parameters"): | |
| params = ET.SubElement(endpoint_elem, "Parameters") | |
| for param in endpoint["Parameters"]: | |
| param_elem = ET.SubElement(params, "Parameter", ID=self._sanitize_name(param["Name"])) | |
| ET.SubElement(param_elem, "Name").text = param["Name"] | |
| ET.SubElement(param_elem, "In").text = param["In"] | |
| ET.SubElement(param_elem, "Required").text = str(param.get("Required", False)).lower() | |
| # Always include Type for all parameters | |
| ET.SubElement(param_elem, "Type").text = param.get("Type", "string") | |
| # Handle common parameters | |
| if param.get("IsCommon", False): | |
| ET.SubElement(param_elem, "IsCommon").text = "true" | |
| ET.SubElement(param_elem, "CommonRef").text = param["CommonRef"] | |
| else: | |
| ET.SubElement(param_elem, "IsCommon").text = "false" | |
| ET.SubElement(param_elem, "Description").text = param.get("Description", "") | |
| # Add response information | |
| if "ResponseModel" in endpoint: | |
| response = ET.SubElement(endpoint_elem, "Response") | |
| ET.SubElement(response, "Type").text = endpoint.get("ResponseType", "object") | |
| ET.SubElement(response, "ClassType").text = endpoint["ResponseModel"] | |
| elif "ResponseInlineSchema" in endpoint: | |
| response = ET.SubElement(endpoint_elem, "Response") | |
| ET.SubElement(response, "Type").text = endpoint.get("ResponseType", "object") | |
| ET.SubElement(response, "InlineSchema").text = json.dumps(endpoint["ResponseInlineSchema"]) | |
| # Add request body information | |
| if "RequestModel" in endpoint: | |
| request = ET.SubElement(endpoint_elem, "Request") | |
| ET.SubElement(request, "ClassType").text = endpoint["RequestModel"] | |
| elif "RequestInlineSchema" in endpoint: | |
| request = ET.SubElement(endpoint_elem, "Request") | |
| ET.SubElement(request, "InlineSchema").text = json.dumps(endpoint["RequestInlineSchema"]) | |
| # Convert to pretty XML string | |
| rough_string = ET.tostring(root, 'utf-8') | |
| reparsed = xml.dom.minidom.parseString(rough_string) | |
| return reparsed.toprettyxml(indent=" ") | |
| def save_datasource_plugin_meta(self, output_path: str) -> None: | |
| """Save the datasource plugin meta JSON file""" | |
| meta_data = self.generate_datasource_plugin_meta() | |
| with open(output_path, 'w') as f: | |
| json.dump(meta_data, f, indent=2) | |
| print(f"Datasource plugin meta saved to: {output_path}") | |
| def save_default_schema(self, output_path: str) -> None: | |
| """Save the default schema XML file""" | |
| schema_xml = self.generate_default_schema() | |
| with open(output_path, 'w', encoding='utf-8') as f: | |
| f.write(schema_xml) | |
| print(f"Default schema saved to: {output_path}") | |
| def generate_files(self, output_dir: str = None) -> Tuple[str, str]: | |
| """Generate both the datasource plugin meta JSON and default schema XML files. | |
| Args: | |
| output_dir: Optional directory to save the files (defaults to current directory) | |
| Returns: | |
| Tuple of (meta_json_path, schema_xml_path) | |
| """ | |
| if not self.api_info: | |
| self.extract_api_info() | |
| # Set default output directory if not provided | |
| if not output_dir: | |
| output_dir = os.getcwd() | |
| else: | |
| # Create directory if it doesn't exist | |
| os.makedirs(output_dir, exist_ok=True) | |
| # Generate filenames | |
| meta_filename = f"{self.api_name.lower()}_datasource_plugin_meta.json" | |
| schema_filename = f"{self.api_name.lower()}_default_schema.orx" | |
| meta_path = os.path.join(output_dir, meta_filename) | |
| schema_path = os.path.join(output_dir, schema_filename) | |
| # Save files | |
| self.save_datasource_plugin_meta(meta_path) | |
| self.save_default_schema(schema_path) | |
| return (meta_path, schema_path) | |